0d31fcfccbc783962e79d86fd28d2160e8786ab6
[OpenSRF.git] / src / perl / lib / OpenSRF / Server.pm
1 # ----------------------------------------------------------------
2 # Copyright (C) 2010 Equinox Software, Inc.
3 # Bill Erickson <erickson@esilibrary.com>
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 # ----------------------------------------------------------------
15 package OpenSRF::Server;
16 use strict;
17 use warnings;
18 use OpenSRF::Transport;
19 use OpenSRF::Application;
20 use OpenSRF::Utils::Config;
21 use OpenSRF::Transport::PeerHandle;
22 use OpenSRF::Utils::SettingsClient;
23 use OpenSRF::Utils::Logger qw($logger);
24 use OpenSRF::Transport::SlimJabber::Client;
25 use Encode;
26 use POSIX qw/:sys_wait_h :errno_h/;
27 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
28 use Time::HiRes qw/usleep/;
29 use IO::Select;
30 use Socket;
31 our $chatty = 1; # disable for production
32
33 use constant STATUS_PIPE_DATA_SIZE => 12;
34 use constant WRITE_PIPE_DATA_SIZE  => 12;
35
36 sub new {
37     my($class, $service, %args) = @_;
38     my $self = bless(\%args, $class);
39
40     $self->{service}        = $service; # service name
41     $self->{num_children}   = 0; # number of child processes
42     $self->{osrf_handle}    = undef; # xmpp handle
43     $self->{routers}        = []; # list of registered routers
44     $self->{active_list}    = []; # list of active children
45     $self->{idle_list}      = []; # list of idle children
46     $self->{sighup_pending} = [];
47     $self->{pid_map}        = {}; # map of child pid to child for cleaner access
48     $self->{sig_pipe}       = 0;  # true if last syswrite failed
49
50     $self->{stderr_log} = $self->{stderr_log_path} . "/${service}_stderr.log" 
51         if $self->{stderr_log_path};
52
53     $self->{min_spare_children} ||= 0;
54     $self->{max_backlog_queue} ||= 1000;
55
56     $self->{max_spare_children} = $self->{min_spare_children} + 1 if
57         $self->{max_spare_children} and
58         $self->{max_spare_children} <= $self->{min_spare_children};
59
60     return $self;
61 }
62
63 # ----------------------------------------------------------------
64 # Disconnects from routers and waits for child processes to exit.
65 # ----------------------------------------------------------------
66 sub cleanup {
67     my $self = shift;
68     my $no_exit = shift;
69     my $graceful = shift;
70
71     $logger->info("server: shutting down and cleaning up...");
72
73     # de-register routers
74     $self->unregister_routers;
75
76     if ($graceful) {
77         # graceful shutdown waits for all active 
78         # children to complete their in-process tasks.
79
80         while (@{$self->{active_list}}) {
81             $logger->info("server: graceful shutdown with ".
82                 @{$self->{active_list}}." active children...");
83
84             # block until a child is becomes available
85             $self->check_status(1);
86         }
87         $logger->info("server: all clear for graceful shutdown");
88     }
89
90     # don't get sidetracked by signals while we're cleaning up.
91     # it could result in unexpected behavior with list traversal
92     $SIG{CHLD} = 'IGNORE';
93
94     # terminate the child processes
95     $self->kill_child($_) for
96         (@{$self->{idle_list}}, @{$self->{active_list}});
97
98     $self->{osrf_handle}->disconnect;
99
100     # clean up our dead children
101     $self->reap_children(1);
102
103     exit(0) unless $no_exit;
104 }
105
106 # ----------------------------------------------------------------
107 # SIGHUP handler.  Kill all idle children.  Copy list of active
108 # children into sighup_pending list for later cleanup.
109 # ----------------------------------------------------------------
110 sub handle_sighup {
111     my $self = shift;
112     $logger->info("server: caught SIGHUP; reloading children");
113
114     # reload the opensrf config
115     # note: calling ::Config->load() results in ever-growing
116     # package names, which eventually causes an exception
117     OpenSRF::Utils::Config->current->_load(
118         force => 1,
119         config_file => OpenSRF::Utils::Config->current->FILE
120     );
121
122     # force-reload the logger config
123     OpenSRF::Utils::Logger::set_config(1);
124
125     # copy active list into pending list for later cleanup
126     $self->{sighup_pending} = [ @{$self->{active_list}} ];
127
128     # idle_list will be modified as children are reaped.
129     my @idle = @{$self->{idle_list}};
130
131     # idle children are the reaper's plaything
132     $self->kill_child($_) for @idle;
133 }
134
135 # ----------------------------------------------------------------
136 # Waits on the jabber socket for inbound data from the router.
137 # Each new message is passed off to a child process for handling.
138 # At regular intervals, wake up for min/max spare child maintenance
139 # ----------------------------------------------------------------
140 sub run {
141     my $self = shift;
142
143     $logger->set_service($self->{service});
144
145     $SIG{$_} = sub { $self->cleanup; } for (qw/INT QUIT/);
146     $SIG{TERM} = sub { $self->cleanup(0, 1); };
147     $SIG{CHLD} = sub { $self->reap_children(); };
148     $SIG{HUP} = sub { $self->handle_sighup(); };
149     $SIG{USR1} = sub { $self->unregister_routers; };
150     $SIG{USR2} = sub { $self->register_routers; };
151
152     $self->spawn_children;
153     $self->build_osrf_handle;
154     $self->register_routers;
155     my $wait_time = 1;
156
157     my @max_children_msg_queue;
158
159     # main server loop
160     while(1) {
161         my $from_network = 0;
162
163         $self->check_status;
164         $self->{child_died} = 0;
165
166         my $msg = shift(@max_children_msg_queue);
167
168         # no pending message, so wait for the next one forever
169         $from_network = $wait_time = -1 if (!$msg);
170         $msg ||= $self->{osrf_handle}->process($wait_time);
171
172         !$from_network and $chatty and $logger->debug("server: attempting to process previously queued message");
173         $from_network and $chatty and $logger->internal("server: no queued messages, processing due to network or signal");
174
175         # we woke up for any reason, reset the wait time to allow
176         # for idle maintenance as necessary
177         $wait_time = 1;
178
179         if($msg) {
180
181             if ($msg->type and $msg->type eq 'error') {
182                 $logger->info("server: Listener received an XMPP error ".
183                     "message.  Likely a bounced message. sender=".$msg->from);
184
185             } elsif(my $child = pop(@{$self->{idle_list}})) {
186
187                 # we have an idle child to handle the request
188                 $chatty and $logger->internal("server: passing request to idle child $child");
189                 push(@{$self->{active_list}}, $child);
190                 $self->write_child($child, $msg);
191
192             } elsif($self->{num_children} < $self->{max_children}) {
193
194                 # spawning a child to handle the request
195                 $chatty and $logger->internal("server: spawning child to handle request");
196                 $self->write_child($self->spawn_child(1), $msg);
197
198             } else {
199                 $logger->warn("server: no children available, waiting... consider increasing " .
200                     "max_children for this application higher than $self->{max_children} ".
201                     "in the OpenSRF configuration if this message occurs frequently");
202
203                 if ($from_network) {
204                     $chatty and $logger->debug("server: queuing new message");
205                     push @max_children_msg_queue, $msg;
206                 } else {
207                     $chatty and $logger->debug("server: re-queuing old message");
208                     unshift @max_children_msg_queue, $msg;
209                 }
210
211                 $logger->warn("server: backlog queue size is now ". scalar(@max_children_msg_queue));
212
213                 if (@max_children_msg_queue < $self->{max_backlog_queue}) {
214                     # We still have room on the queue. Set the wait time to
215                     # 1s, waiting for a drone to be freed up and reprocess
216                     # this (and any other) queued messages.
217                     $wait_time = 1;
218                     if (!$from_network) {
219                         # if we got here, we had retrieved a message from the queue
220                         # but couldn't process it... but also hadn't fetched any
221                         # additional messages from the network. Doing so now,
222                         # as otherwise only one message will ever get queued
223                         $msg = $self->{osrf_handle}->process($wait_time);
224                         if ($msg) {
225                             $chatty and $logger->debug("server: queuing new message after a re-queue");
226                             push @max_children_msg_queue, $msg;
227                         }
228                     }
229                 } else {
230                     # We'll just have to wait
231                     $self->check_status(1); # block until child is available
232                 }
233             }
234
235         } else {
236
237             # don't perform idle maint immediately when woken by SIGCHLD
238             unless($self->{child_died}) {
239
240                 # when we hit equilibrium, there's no need for regular
241                 # maintenance, so set wait_time to 'forever'
242                 $wait_time = -1 if 
243                     !$self->perform_idle_maintenance and # no maintenance performed this time
244                     @{$self->{active_list}} == 0; # no active children 
245             }
246         }
247     }
248 }
249
250 # ----------------------------------------------------------------
251 # Launch a new spare child or kill an extra spare child.  To
252 # prevent large-scale spawning or die-offs, spawn or kill only
253 # 1 process per idle maintenance loop.
254 # Returns true if any idle maintenance occurred, 0 otherwise
255 # ----------------------------------------------------------------
256 sub perform_idle_maintenance {
257     my $self = shift;
258
259     $chatty and $logger->internal(
260         sprintf(
261             "server: %d idle, %d active, %d min_spare, %d max_spare in idle maintenance",
262             scalar(@{$self->{idle_list}}), 
263             scalar(@{$self->{active_list}}),
264             $self->{min_spare_children},
265             $self->{max_spare_children}
266         )
267     );
268
269     # spawn 1 spare child per maintenance loop if necessary
270     if( $self->{min_spare_children} and
271         $self->{num_children} < $self->{max_children} and
272         scalar(@{$self->{idle_list}}) < $self->{min_spare_children} ) {
273
274         $chatty and $logger->internal("server: spawning spare child");
275         $self->spawn_child;
276         return 1;
277
278     # kill 1 excess spare child per maintenance loop if necessary
279     } elsif($self->{max_spare_children} and
280             $self->{num_children} > $self->{min_children} and
281             scalar(@{$self->{idle_list}}) > $self->{max_spare_children} ) {
282
283         $chatty and $logger->internal("server: killing spare child");
284         $self->kill_child;
285         return 1;
286     }
287
288     return 0;
289 }
290
291 sub kill_child {
292     my $self = shift;
293     my $child = shift || pop(@{$self->{idle_list}}) or return;
294     $chatty and $logger->internal("server: killing child $child");
295     kill('TERM', $child->{pid});
296 }
297
298 # ----------------------------------------------------------------
299 # Jabber connection inbound message arrive on.
300 # ----------------------------------------------------------------
301 sub build_osrf_handle {
302     my $self = shift;
303
304     my $conf = OpenSRF::Utils::Config->current;
305     my $username = $conf->bootstrap->username;
306     my $password = $conf->bootstrap->passwd;
307     my $domain = $conf->bootstrap->domain;
308     my $port = $conf->bootstrap->port;
309     my $resource = $self->{service} . '_listener_' . $conf->env->hostname;
310
311     $logger->debug("server: inbound connecting as $username\@$domain/$resource on port $port");
312
313     $self->{osrf_handle} =
314         OpenSRF::Transport::SlimJabber::Client->new(
315             username => $username,
316             resource => $resource,
317             password => $password,
318             host => $domain,
319             port => $port,
320         );
321
322     $self->{osrf_handle}->initialize;
323 }
324
325
326 # ----------------------------------------------------------------
327 # Sends request data to a child process
328 # ----------------------------------------------------------------
329 sub write_child {
330     my($self, $child, $msg) = @_;
331     my $xml = encode_utf8(decode_utf8($msg->to_xml));
332
333     # tell the child how much data to expect, minus the header
334     my $write_size;
335     {use bytes; $write_size = length($xml)}
336     $write_size = sprintf("%*s", WRITE_PIPE_DATA_SIZE, $write_size);
337
338     for (0..2) {
339
340         $self->{sig_pipe} = 0;
341         local $SIG{'PIPE'} = sub { $self->{sig_pipe} = 1; };
342
343         # In rare cases a child can die between creation and first
344         # write, typically a result of a jabber connect error.  Before
345         # sending data to each child, confirm it's still alive.  If it's
346         # not, log the error and drop the message to prevent the parent
347         # process from dying.
348         # When a child dies, all of its attributes are deleted,
349         # so the lack of a pid means the child is dead.
350         if (!$child->{pid}) {
351             $logger->error("server: child is dead in write_child(). ".
352                 "unable to send message: $xml");
353             return; # avoid syswrite crash
354         }
355
356         # send message to child data pipe
357         syswrite($child->{pipe_to_child}, $write_size . $xml);
358
359         last unless $self->{sig_pipe};
360         $logger->error("server: got SIGPIPE writing to $child, retrying...");
361         usleep(50000); # 50 msec
362     }
363
364     $logger->error("server: unable to send request message to child $child") if $self->{sig_pipe};
365 }
366
367 # ----------------------------------------------------------------
368 # Checks to see if any child process has reported its availability
369 # In blocking mode, blocks until a child has reported.
370 # ----------------------------------------------------------------
371 sub check_status {
372     my($self, $block) = @_;
373
374     return unless @{$self->{active_list}};
375
376     my @pids;
377
378     while (1) {
379
380         # if can_read or sysread is interrupted while bloking, go back and 
381         # wait again until we have at least 1 free child
382
383         # refresh the read_set handles in case we lost a child in the previous iteration
384         my $read_set = IO::Select->new;
385         $read_set->add($_->{pipe_to_child}) for @{$self->{active_list}};
386
387         if(my @handles = $read_set->can_read(($block) ? undef : 0)) {
388             my $pid = '';
389             for my $pipe (@handles) {
390                 sysread($pipe, $pid, STATUS_PIPE_DATA_SIZE) or next;
391                 push(@pids, int($pid));
392             }
393         }
394
395         last unless $block and !@pids;
396     }
397
398     return unless @pids;
399
400     $chatty and $logger->internal("server: ".scalar(@pids)." children reporting for duty: (@pids)");
401
402     my $child;
403     my @new_actives;
404
405     # move the children from the active list to the idle list
406     for my $proc (@{$self->{active_list}}) {
407         if(grep { $_ == $proc->{pid} } @pids) {
408             push(@{$self->{idle_list}}, $proc);
409         } else {
410             push(@new_actives, $proc);
411         }
412     }
413
414     $self->{active_list} = [@new_actives];
415
416     $chatty and $logger->internal(sprintf(
417         "server: %d idle and %d active children after status update",
418             scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
419
420     # some children just went from active to idle. let's see 
421     # if any of them need to be killed from a previous sighup.
422
423     for my $child (@{$self->{sighup_pending}}) {
424         if (grep {$_ == $child->{pid}} @pids) {
425
426             $chatty and $logger->internal(
427                 "server: killing previously-active ".
428                 "child after receiving SIGHUP: $child");
429
430             # remove the pending child
431             $self->{sighup_pending} = [
432                 grep {$_->{pid} != $child->{pid}} 
433                     @{$self->{sighup_pending}}
434             ];
435
436             # kill the pending child
437             $self->kill_child($child)
438         }
439     }
440 }
441
442 # ----------------------------------------------------------------
443 # Cleans up any child processes that have exited.
444 # In shutdown mode, block until all children have washed ashore
445 # ----------------------------------------------------------------
446 sub reap_children {
447     my($self, $shutdown) = @_;
448     $self->{child_died} = 1;
449
450     while(1) {
451
452         my $pid = waitpid(-1, ($shutdown) ? 0 : WNOHANG);
453         last if $pid <= 0;
454
455         $chatty and $logger->internal("server: reaping child $pid");
456
457         my $child = $self->{pid_map}->{$pid};
458
459         close($child->{pipe_to_parent});
460         close($child->{pipe_to_child});
461
462         $self->{active_list} = [ grep { $_->{pid} != $pid } @{$self->{active_list}} ];
463         $self->{idle_list} = [ grep { $_->{pid} != $pid } @{$self->{idle_list}} ];
464
465         $self->{num_children}--;
466         delete $self->{pid_map}->{$pid};
467         delete $child->{$_} for keys %$child; # destroy with a vengeance
468     }
469
470     $self->spawn_children unless $shutdown;
471
472     $chatty and $logger->internal(sprintf(
473         "server: %d idle and %d active children after reap_children",
474             scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
475 }
476
477 # ----------------------------------------------------------------
478 # Spawn up to max_children processes
479 # ----------------------------------------------------------------
480 sub spawn_children {
481     my $self = shift;
482     $self->spawn_child while $self->{num_children} < $self->{min_children};
483 }
484
485 # ----------------------------------------------------------------
486 # Spawns a new child.  If $active is set, the child goes directly
487 # into the active_list.
488 # ----------------------------------------------------------------
489 sub spawn_child {
490     my($self, $active) = @_;
491
492     my $child = OpenSRF::Server::Child->new($self);
493
494     # socket for sending message data to the child
495     if(!socketpair(
496         $child->{pipe_to_child},
497         $child->{pipe_to_parent},
498         AF_UNIX, SOCK_STREAM, PF_UNSPEC)) {
499             $logger->error("server: error creating data socketpair: $!");
500             return undef;
501     }
502
503     $child->{pipe_to_child}->autoflush(1);
504     $child->{pipe_to_parent}->autoflush(1);
505
506     $child->{pid} = fork();
507
508     if($child->{pid}) { # parent process
509         $self->{num_children}++;
510         $self->{pid_map}->{$child->{pid}} = $child;
511
512         if($active) {
513             push(@{$self->{active_list}}, $child);
514         } else {
515             push(@{$self->{idle_list}}, $child);
516         }
517
518         $chatty and $logger->internal("server: server spawned child $child with ".$self->{num_children}." total children");
519
520         return $child;
521
522     } else { # child process
523
524         # recover default handling for any signal whose handler 
525         # may have been adopted from the parent process.
526         $SIG{$_} = 'DEFAULT' for qw/TERM INT QUIT HUP CHLD USR1 USR2/;
527
528         if($self->{stderr_log}) {
529
530             $chatty and $logger->internal("server: redirecting STDERR to " . $self->{stderr_log});
531
532             close STDERR;
533             unless( open(STDERR, '>>' . $self->{stderr_log}) ) {
534                 $logger->error("server: unable to open STDERR log file: " . $self->{stderr_log} . " : $@");
535                 open STDERR, '>/dev/null'; # send it back to /dev/null
536             }
537         }
538
539         $child->{pid} = $$;
540         eval {
541             $child->init;
542             $child->run;
543             OpenSRF::Transport::PeerHandle->retrieve->disconnect;
544         };
545         $logger->error("server: child process died: $@") if $@;
546         exit(0);
547     }
548 }
549
550 # ----------------------------------------------------------------
551 # Sends the register command to the configured routers
552 # ----------------------------------------------------------------
553 sub register_routers {
554     my $self = shift;
555
556     my $conf = OpenSRF::Utils::Config->current;
557     my $routers = $conf->bootstrap->routers;
558     my $router_name = $conf->bootstrap->router_name;
559     my @targets;
560
561     for my $router (@$routers) {
562         if(ref $router) {
563
564             if( !$router->{services} ||
565                 !$router->{services}->{service} ||
566                 (
567                     ref($router->{services}->{service}) eq 'ARRAY' and
568                     grep { $_ eq $self->{service} } @{$router->{services}->{service}}
569                 )  || $router->{services}->{service} eq $self->{service}) {
570
571                 my $name = $router->{name};
572                 my $domain = $router->{domain};
573                 push(@targets, "$name\@$domain/router");
574             }
575
576         } else {
577             push(@targets, "$router_name\@$router/router");
578         }
579     }
580
581     foreach (@targets) {
582         $logger->info("server: registering with router $_");
583         $self->{osrf_handle}->send(
584             to => $_,
585             body => 'registering',
586             router_command => 'register',
587             router_class => $self->{service}
588         );
589     }
590
591     $self->{routers} = \@targets;
592 }
593
594 # ----------------------------------------------------------------
595 # Sends the unregister command to any routers we have registered
596 # with.
597 # ----------------------------------------------------------------
598 sub unregister_routers {
599     my $self = shift;
600     return unless $self->{osrf_handle}->tcp_connected;
601
602     for my $router (@{$self->{routers}}) {
603         $logger->info("server: disconnecting from router $router");
604         $self->{osrf_handle}->send(
605             to => $router,
606             body => "unregistering",
607             router_command => "unregister",
608             router_class => $self->{service}
609         );
610     }
611 }
612
613
614 package OpenSRF::Server::Child;
615 use strict;
616 use warnings;
617 use OpenSRF::Transport;
618 use OpenSRF::Application;
619 use OpenSRF::Transport::PeerHandle;
620 use OpenSRF::Transport::SlimJabber::XMPPMessage;
621 use OpenSRF::Utils::Logger qw($logger);
622 use OpenSRF::DomainObject::oilsResponse qw/:status/;
623 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
624 use Time::HiRes qw(time usleep);
625 use POSIX qw/:sys_wait_h :errno_h/;
626
627 use overload '""' => sub { return '[' . shift()->{pid} . ']'; };
628
629 sub new {
630     my($class, $parent) = @_;
631     my $self = bless({}, $class);
632     $self->{pid} = 0; # my process ID
633     $self->{parent} = $parent; # Controller parent process
634     $self->{num_requests} = 0; # total serviced requests
635     $self->{sig_pipe} = 0;  # true if last syswrite failed
636     return $self;
637 }
638
639 sub set_nonblock {
640     my($self, $fh) = @_;
641     my  $flags = fcntl($fh, F_GETFL, 0);
642     fcntl($fh, F_SETFL, $flags | O_NONBLOCK);
643 }
644
645 sub set_block {
646     my($self, $fh) = @_;
647     my  $flags = fcntl($fh, F_GETFL, 0);
648     $flags &= ~O_NONBLOCK;
649     fcntl($fh, F_SETFL, $flags);
650 }
651
652 # ----------------------------------------------------------------
653 # Connects to Jabber and runs the application child_init
654 # ----------------------------------------------------------------
655 sub init {
656     my $self = shift;
657     my $service = $self->{parent}->{service};
658     $0 = "OpenSRF Drone [$service]";
659     OpenSRF::Transport::PeerHandle->construct($service);
660     OpenSRF::Application->application_implementation->child_init
661         if (OpenSRF::Application->application_implementation->can('child_init'));
662 }
663
664 # ----------------------------------------------------------------
665 # Waits for messages from the parent process, handles the message,
666 # then goes into the keepalive loop if this is a stateful session.
667 # When max_requests is hit, the process exits.
668 # ----------------------------------------------------------------
669 sub run {
670     my $self = shift;
671     my $network = OpenSRF::Transport::PeerHandle->retrieve;
672
673     # main child run loop.  Ends when this child hits max requests.
674     while(1) {
675
676         my $data = $self->wait_for_request or next;
677
678         # Update process name to show activity
679         my $orig_name = $0;
680         $0 = "$0*";
681
682         # Discard extraneous data from the jabber socket
683         if(!$network->flush_socket()) {
684             $logger->error("server: network disconnected!  child dropping request and exiting: $data");
685             exit;
686         }
687
688         my $session = OpenSRF::Transport->handler(
689             $self->{parent}->{service},
690             OpenSRF::Transport::SlimJabber::XMPPMessage->new(xml => $data)
691         );
692
693         my $recycle = $self->keepalive_loop($session);
694
695         last if ++$self->{num_requests} == $self->{parent}->{max_requests};
696
697         if ($recycle) {
698             $chatty && $logger->internal(
699                 "server: child exiting early on force_recycle");
700             last;
701         }
702
703         # Tell the parent process we are available to process requests
704         $self->send_status;
705
706         # Repair process name
707         $0 = $orig_name;
708     }
709
710     $chatty and $logger->internal("server: child process shutting down after reaching max_requests");
711
712     OpenSRF::Application->application_implementation->child_exit
713         if (OpenSRF::Application->application_implementation->can('child_exit'));
714 }
715
716 # ----------------------------------------------------------------
717 # waits for a request data on the parent pipe and returns it.
718 # ----------------------------------------------------------------
719 sub wait_for_request {
720     my $self = shift;
721
722     my $data = ''; # final request data
723     my $buf_size = 4096; # default linux pipe_buf (atomic window, not total size)
724     my $read_pipe = $self->{pipe_to_parent};
725     my $bytes_needed; # size of the data we are about to receive
726     my $bytes_recvd; # number of bytes read so far
727     my $first_read = 1; # true for first loop iteration
728     my $read_error;
729
730     while (1) {
731
732         # wait for some data to start arriving
733         my $read_set = IO::Select->new;
734         $read_set->add($read_pipe);
735     
736         while (1) {
737             # if can_read is interrupted while blocking, 
738             # go back and wait again until it succeeds.
739             last if $read_set->can_read;
740         }
741
742         # parent started writing, let's start reading
743         $self->set_nonblock($read_pipe);
744
745         while (1) {
746             # read all of the available data
747
748             my $buf = '';
749             my $nbytes = sysread($self->{pipe_to_parent}, $buf, $buf_size);
750
751             unless(defined $nbytes) {
752                 if ($! != EAGAIN) {
753                     $logger->error("server: error reading data from parent: $!.  ".
754                         "bytes_needed=$bytes_needed; bytes_recvd=$bytes_recvd; data=$data");
755                     $read_error = 1;
756                 }
757                 last;
758             }
759
760             last if $nbytes <= 0; # no more data available for reading
761
762             $bytes_recvd += $nbytes;
763             $data .= $buf;
764         }
765
766         $self->set_block($self->{pipe_to_parent});
767         return undef if $read_error;
768
769         # extract the data size and remove the header from the final data
770         if ($first_read) {
771             my $wps_size = OpenSRF::Server::WRITE_PIPE_DATA_SIZE;
772             $bytes_needed = int(substr($data, 0, $wps_size)) + $wps_size;
773             $data = substr($data, $wps_size);
774             $first_read = 0;
775         }
776
777
778         if ($bytes_recvd == $bytes_needed) {
779             # we've read all the data. Nothing left to do
780             last;
781         }
782
783         $logger->info("server: child process read all available pipe data.  ".
784             "waiting for more data from parent.  bytes_needed=$bytes_needed; bytes_recvd=$bytes_recvd");
785     }
786
787     return $data;
788 }
789
790
791 # ----------------------------------------------------------------
792 # If this is a stateful opensrf session, wait up to $keepalive
793 # seconds for subsequent requests from the client
794 # ----------------------------------------------------------------
795 sub keepalive_loop {
796     my($self, $session) = @_;
797     my $keepalive = $self->{parent}->{keepalive};
798
799     while($session->state and $session->state == $session->CONNECTED) {
800
801         unless( $session->queue_wait($keepalive) ) {
802
803             # client failed to disconnect before timeout
804             $logger->info("server: no request was received in $keepalive seconds, exiting stateful session");
805
806             my $res = OpenSRF::DomainObject::oilsConnectStatus->new(
807                 status => "Disconnected on timeout",
808                 statusCode => STATUS_TIMEOUT
809             );
810
811             $session->status($res);
812             $session->state($session->DISCONNECTED);
813             last;
814         }
815     }
816
817     $chatty and $logger->internal("server: child done with request(s)");
818
819     # Capture the recycle option value before it's clobbered.
820     # The option may be set at any point along the life of the 
821     # session.  Once set, it remains set unless 
822     # $session->force_recycle(0) is explicitly called.
823     my $recycle = $session->force_recycle;
824
825     $session->kill_me;
826     return $recycle;
827 }
828
829 # ----------------------------------------------------------------
830 # Report our availability to our parent process
831 # ----------------------------------------------------------------
832 sub send_status {
833     my $self = shift;
834
835     for (0..2) {
836
837         $self->{sig_pipe} = 0;
838         local $SIG{'PIPE'} = sub { $self->{sig_pipe} = 1; };
839
840         syswrite(
841             $self->{pipe_to_parent},
842             sprintf("%*s", OpenSRF::Server::STATUS_PIPE_DATA_SIZE, $self->{pid})
843         );
844
845         last unless $self->{sig_pipe};
846         $logger->error("server: $self got SIGPIPE writing status to parent, retrying...");
847         usleep(50000); # 50 msec
848     }
849
850     $logger->error("server: $self unable to send status to parent") if $self->{sig_pipe};
851 }
852
853
854 1;