]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/perl/lib/OpenSRF/Server.pm
ba463e4052ed70cb9a6b8bba92351851e5cdf02f
[OpenSRF.git] / src / perl / lib / OpenSRF / Server.pm
1 # ----------------------------------------------------------------
2 # Copyright (C) 2010 Equinox Software, Inc.
3 # Bill Erickson <erickson@esilibrary.com>
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 # ----------------------------------------------------------------
15 package OpenSRF::Server;
16 use strict;
17 use warnings;
18 use OpenSRF::Transport;
19 use OpenSRF::Application;
20 use OpenSRF::Utils::Config;
21 use OpenSRF::Transport::PeerHandle;
22 use OpenSRF::Utils::SettingsClient;
23 use OpenSRF::Utils::Logger qw($logger);
24 use OpenSRF::Transport::SlimJabber::Client;
25 use Encode;
26 use POSIX qw/:sys_wait_h :errno_h/;
27 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
28 use Time::HiRes qw/usleep/;
29 use IO::Select;
30 use Socket;
31 our $chatty = 1; # disable for production
32
33 use constant STATUS_PIPE_DATA_SIZE => 12;
34 use constant WRITE_PIPE_DATA_SIZE  => 12;
35
36 sub new {
37     my($class, $service, %args) = @_;
38     my $self = bless(\%args, $class);
39
40     $self->{service}        = $service; # service name
41     $self->{num_children}   = 0; # number of child processes
42     $self->{osrf_handle}    = undef; # xmpp handle
43     $self->{routers}        = []; # list of registered routers
44     $self->{active_list}    = []; # list of active children
45     $self->{idle_list}      = []; # list of idle children
46     $self->{sighup_pending} = [];
47     $self->{pid_map}        = {}; # map of child pid to child for cleaner access
48     $self->{sig_pipe}       = 0;  # true if last syswrite failed
49
50     $self->{stderr_log} = $self->{stderr_log_path} . "/${service}_stderr.log" 
51         if $self->{stderr_log_path};
52
53     $self->{min_spare_children} ||= 0;
54
55     $self->{max_spare_children} = $self->{min_spare_children} + 1 if
56         $self->{max_spare_children} and
57         $self->{max_spare_children} <= $self->{min_spare_children};
58
59     return $self;
60 }
61
62 # ----------------------------------------------------------------
63 # Disconnects from routers and waits for child processes to exit.
64 # ----------------------------------------------------------------
65 sub cleanup {
66     my $self = shift;
67     my $no_exit = shift;
68     my $graceful = shift;
69
70     $logger->info("server: shutting down and cleaning up...");
71
72     # de-register routers
73     $self->unregister_routers;
74
75     if ($graceful) {
76         # graceful shutdown waits for all active 
77         # children to complete their in-process tasks.
78
79         while (@{$self->{active_list}}) {
80             $logger->info("server: graceful shutdown with ".
81                 @{$self->{active_list}}." active children...");
82
83             # block until a child is becomes available
84             $self->check_status(1);
85         }
86         $logger->info("server: all clear for graceful shutdown");
87     }
88
89     # don't get sidetracked by signals while we're cleaning up.
90     # it could result in unexpected behavior with list traversal
91     $SIG{CHLD} = 'IGNORE';
92
93     # terminate the child processes
94     $self->kill_child($_) for
95         (@{$self->{idle_list}}, @{$self->{active_list}});
96
97     $self->{osrf_handle}->disconnect;
98
99     # clean up our dead children
100     $self->reap_children(1);
101
102     exit(0) unless $no_exit;
103 }
104
105 # ----------------------------------------------------------------
106 # SIGHUP handler.  Kill all idle children.  Copy list of active
107 # children into sighup_pending list for later cleanup.
108 # ----------------------------------------------------------------
109 sub handle_sighup {
110     my $self = shift;
111     $logger->info("server: caught SIGHUP; reloading children");
112
113     # reload the opensrf config
114     # note: calling ::Config->load() results in ever-growing
115     # package names, which eventually causes an exception
116     OpenSRF::Utils::Config->current->_load(
117         force => 1,
118         config_file => OpenSRF::Utils::Config->current->FILE
119     );
120
121     # force-reload the logger config
122     OpenSRF::Utils::Logger::set_config(1);
123
124     # copy active list into pending list for later cleanup
125     $self->{sighup_pending} = [ @{$self->{active_list}} ];
126
127     # idle_list will be modified as children are reaped.
128     my @idle = @{$self->{idle_list}};
129
130     # idle children are the reaper's plaything
131     $self->kill_child($_) for @idle;
132 }
133
134 # ----------------------------------------------------------------
135 # Waits on the jabber socket for inbound data from the router.
136 # Each new message is passed off to a child process for handling.
137 # At regular intervals, wake up for min/max spare child maintenance
138 # ----------------------------------------------------------------
139 sub run {
140     my $self = shift;
141
142     $logger->set_service($self->{service});
143
144     $SIG{$_} = sub { $self->cleanup; } for (qw/INT QUIT/);
145     $SIG{TERM} = sub { $self->cleanup(0, 1); };
146     $SIG{CHLD} = sub { $self->reap_children(); };
147     $SIG{HUP} = sub { $self->handle_sighup(); };
148     $SIG{USR1} = sub { $self->unregister_routers; };
149     $SIG{USR2} = sub { $self->register_routers; };
150
151     $self->spawn_children;
152     $self->build_osrf_handle;
153     $self->register_routers;
154     my $wait_time = 1;
155
156     # main server loop
157     while(1) {
158
159         $self->check_status;
160         $self->{child_died} = 0;
161
162         my $msg = $self->{osrf_handle}->process($wait_time);
163
164         # we woke up for any reason, reset the wait time to allow
165         # for idle maintenance as necessary
166         $wait_time = 1;
167
168         if($msg) {
169
170             if ($msg->type and $msg->type eq 'error') {
171                 $logger->info("server: Listener received an XMPP error ".
172                     "message.  Likely a bounced message. sender=".$msg->from);
173
174             } elsif(my $child = pop(@{$self->{idle_list}})) {
175
176                 # we have an idle child to handle the request
177                 $chatty and $logger->internal("server: passing request to idle child $child");
178                 push(@{$self->{active_list}}, $child);
179                 $self->write_child($child, $msg);
180
181             } elsif($self->{num_children} < $self->{max_children}) {
182
183                 # spawning a child to handle the request
184                 $chatty and $logger->internal("server: spawning child to handle request");
185                 $self->write_child($self->spawn_child(1), $msg);
186
187             } else {
188                 $logger->warn("server: no children available, waiting... consider increasing " .
189                     "max_children for this application higher than $self->{max_children} ".
190                     "in the OpenSRF configuration if this message occurs frequently");
191                 $self->check_status(1); # block until child is available
192
193                 my $child = pop(@{$self->{idle_list}});
194                 push(@{$self->{active_list}}, $child);
195                 $self->write_child($child, $msg);
196             }
197
198         } else {
199
200             # don't perform idle maint immediately when woken by SIGCHLD
201             unless($self->{child_died}) {
202
203                 # when we hit equilibrium, there's no need for regular
204                 # maintenance, so set wait_time to 'forever'
205                 $wait_time = -1 if 
206                     !$self->perform_idle_maintenance and # no maintenance performed this time
207                     @{$self->{active_list}} == 0; # no active children 
208             }
209         }
210     }
211 }
212
213 # ----------------------------------------------------------------
214 # Launch a new spare child or kill an extra spare child.  To
215 # prevent large-scale spawning or die-offs, spawn or kill only
216 # 1 process per idle maintenance loop.
217 # Returns true if any idle maintenance occurred, 0 otherwise
218 # ----------------------------------------------------------------
219 sub perform_idle_maintenance {
220     my $self = shift;
221
222     $chatty and $logger->internal(
223         sprintf(
224             "server: %d idle, %d active, %d min_spare, %d max_spare in idle maintenance",
225             scalar(@{$self->{idle_list}}), 
226             scalar(@{$self->{active_list}}),
227             $self->{min_spare_children},
228             $self->{max_spare_children}
229         )
230     );
231
232     # spawn 1 spare child per maintenance loop if necessary
233     if( $self->{min_spare_children} and
234         $self->{num_children} < $self->{max_children} and
235         scalar(@{$self->{idle_list}}) < $self->{min_spare_children} ) {
236
237         $chatty and $logger->internal("server: spawning spare child");
238         $self->spawn_child;
239         return 1;
240
241     # kill 1 excess spare child per maintenance loop if necessary
242     } elsif($self->{max_spare_children} and
243             $self->{num_children} > $self->{min_children} and
244             scalar(@{$self->{idle_list}}) > $self->{max_spare_children} ) {
245
246         $chatty and $logger->internal("server: killing spare child");
247         $self->kill_child;
248         return 1;
249     }
250
251     return 0;
252 }
253
254 sub kill_child {
255     my $self = shift;
256     my $child = shift || pop(@{$self->{idle_list}}) or return;
257     $chatty and $logger->internal("server: killing child $child");
258     kill('TERM', $child->{pid});
259 }
260
261 # ----------------------------------------------------------------
262 # Jabber connection inbound message arrive on.
263 # ----------------------------------------------------------------
264 sub build_osrf_handle {
265     my $self = shift;
266
267     my $conf = OpenSRF::Utils::Config->current;
268     my $username = $conf->bootstrap->username;
269     my $password = $conf->bootstrap->passwd;
270     my $domain = $conf->bootstrap->domain;
271     my $port = $conf->bootstrap->port;
272     my $resource = $self->{service} . '_listener_' . $conf->env->hostname;
273
274     $logger->debug("server: inbound connecting as $username\@$domain/$resource on port $port");
275
276     $self->{osrf_handle} =
277         OpenSRF::Transport::SlimJabber::Client->new(
278             username => $username,
279             resource => $resource,
280             password => $password,
281             host => $domain,
282             port => $port,
283         );
284
285     $self->{osrf_handle}->initialize;
286 }
287
288
289 # ----------------------------------------------------------------
290 # Sends request data to a child process
291 # ----------------------------------------------------------------
292 sub write_child {
293     my($self, $child, $msg) = @_;
294     my $xml = encode_utf8(decode_utf8($msg->to_xml));
295
296     # tell the child how much data to expect, minus the header
297     my $write_size;
298     {use bytes; $write_size = length($xml)}
299     $write_size = sprintf("%*s", WRITE_PIPE_DATA_SIZE, $write_size);
300
301     for (0..2) {
302
303         $self->{sig_pipe} = 0;
304         local $SIG{'PIPE'} = sub { $self->{sig_pipe} = 1; };
305
306         # In rare cases a child can die between creation and first
307         # write, typically a result of a jabber connect error.  Before
308         # sending data to each child, confirm it's still alive.  If it's
309         # not, log the error and drop the message to prevent the parent
310         # process from dying.
311         # When a child dies, all of its attributes are deleted,
312         # so the lack of a pid means the child is dead.
313         if (!$child->{pid}) {
314             $logger->error("server: child is dead in write_child(). ".
315                 "unable to send message: $xml");
316             return; # avoid syswrite crash
317         }
318
319         # send message to child data pipe
320         syswrite($child->{pipe_to_child}, $write_size . $xml);
321
322         last unless $self->{sig_pipe};
323         $logger->error("server: got SIGPIPE writing to $child, retrying...");
324         usleep(50000); # 50 msec
325     }
326
327     $logger->error("server: unable to send request message to child $child") if $self->{sig_pipe};
328 }
329
330 # ----------------------------------------------------------------
331 # Checks to see if any child process has reported its availability
332 # In blocking mode, blocks until a child has reported.
333 # ----------------------------------------------------------------
334 sub check_status {
335     my($self, $block) = @_;
336
337     return unless @{$self->{active_list}};
338
339     my @pids;
340
341     while (1) {
342
343         # if can_read or sysread is interrupted while bloking, go back and 
344         # wait again until we have at least 1 free child
345
346         # refresh the read_set handles in case we lost a child in the previous iteration
347         my $read_set = IO::Select->new;
348         $read_set->add($_->{pipe_to_child}) for @{$self->{active_list}};
349
350         if(my @handles = $read_set->can_read(($block) ? undef : 0)) {
351             my $pid = '';
352             for my $pipe (@handles) {
353                 sysread($pipe, $pid, STATUS_PIPE_DATA_SIZE) or next;
354                 push(@pids, int($pid));
355             }
356         }
357
358         last unless $block and !@pids;
359     }
360
361     return unless @pids;
362
363     $chatty and $logger->internal("server: ".scalar(@pids)." children reporting for duty: (@pids)");
364
365     my $child;
366     my @new_actives;
367
368     # move the children from the active list to the idle list
369     for my $proc (@{$self->{active_list}}) {
370         if(grep { $_ == $proc->{pid} } @pids) {
371             push(@{$self->{idle_list}}, $proc);
372         } else {
373             push(@new_actives, $proc);
374         }
375     }
376
377     $self->{active_list} = [@new_actives];
378
379     $chatty and $logger->internal(sprintf(
380         "server: %d idle and %d active children after status update",
381             scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
382
383     # some children just went from active to idle. let's see 
384     # if any of them need to be killed from a previous sighup.
385
386     for my $child (@{$self->{sighup_pending}}) {
387         if (grep {$_ == $child->{pid}} @pids) {
388
389             $chatty and $logger->internal(
390                 "server: killing previously-active ".
391                 "child after receiving SIGHUP: $child");
392
393             # remove the pending child
394             $self->{sighup_pending} = [
395                 grep {$_->{pid} != $child->{pid}} 
396                     @{$self->{sighup_pending}}
397             ];
398
399             # kill the pending child
400             $self->kill_child($child)
401         }
402     }
403 }
404
405 # ----------------------------------------------------------------
406 # Cleans up any child processes that have exited.
407 # In shutdown mode, block until all children have washed ashore
408 # ----------------------------------------------------------------
409 sub reap_children {
410     my($self, $shutdown) = @_;
411     $self->{child_died} = 1;
412
413     while(1) {
414
415         my $pid = waitpid(-1, ($shutdown) ? 0 : WNOHANG);
416         last if $pid <= 0;
417
418         $chatty and $logger->internal("server: reaping child $pid");
419
420         my $child = $self->{pid_map}->{$pid};
421
422         close($child->{pipe_to_parent});
423         close($child->{pipe_to_child});
424
425         $self->{active_list} = [ grep { $_->{pid} != $pid } @{$self->{active_list}} ];
426         $self->{idle_list} = [ grep { $_->{pid} != $pid } @{$self->{idle_list}} ];
427
428         $self->{num_children}--;
429         delete $self->{pid_map}->{$pid};
430         delete $child->{$_} for keys %$child; # destroy with a vengeance
431     }
432
433     $self->spawn_children unless $shutdown;
434
435     $chatty and $logger->internal(sprintf(
436         "server: %d idle and %d active children after reap_children",
437             scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
438 }
439
440 # ----------------------------------------------------------------
441 # Spawn up to max_children processes
442 # ----------------------------------------------------------------
443 sub spawn_children {
444     my $self = shift;
445     $self->spawn_child while $self->{num_children} < $self->{min_children};
446 }
447
448 # ----------------------------------------------------------------
449 # Spawns a new child.  If $active is set, the child goes directly
450 # into the active_list.
451 # ----------------------------------------------------------------
452 sub spawn_child {
453     my($self, $active) = @_;
454
455     my $child = OpenSRF::Server::Child->new($self);
456
457     # socket for sending message data to the child
458     if(!socketpair(
459         $child->{pipe_to_child},
460         $child->{pipe_to_parent},
461         AF_UNIX, SOCK_STREAM, PF_UNSPEC)) {
462             $logger->error("server: error creating data socketpair: $!");
463             return undef;
464     }
465
466     $child->{pipe_to_child}->autoflush(1);
467     $child->{pipe_to_parent}->autoflush(1);
468
469     $child->{pid} = fork();
470
471     if($child->{pid}) { # parent process
472         $self->{num_children}++;
473         $self->{pid_map}->{$child->{pid}} = $child;
474
475         if($active) {
476             push(@{$self->{active_list}}, $child);
477         } else {
478             push(@{$self->{idle_list}}, $child);
479         }
480
481         $chatty and $logger->internal("server: server spawned child $child with ".$self->{num_children}." total children");
482
483         return $child;
484
485     } else { # child process
486
487         # recover default handling for any signal whose handler 
488         # may have been adopted from the parent process.
489         $SIG{$_} = 'DEFAULT' for qw/TERM INT QUIT HUP CHLD USR1 USR2/;
490
491         if($self->{stderr_log}) {
492
493             $chatty and $logger->internal("server: redirecting STDERR to " . $self->{stderr_log});
494
495             close STDERR;
496             unless( open(STDERR, '>>' . $self->{stderr_log}) ) {
497                 $logger->error("server: unable to open STDERR log file: " . $self->{stderr_log} . " : $@");
498                 open STDERR, '>/dev/null'; # send it back to /dev/null
499             }
500         }
501
502         $child->{pid} = $$;
503         eval {
504             $child->init;
505             $child->run;
506             OpenSRF::Transport::PeerHandle->retrieve->disconnect;
507         };
508         $logger->error("server: child process died: $@") if $@;
509         exit(0);
510     }
511 }
512
513 # ----------------------------------------------------------------
514 # Sends the register command to the configured routers
515 # ----------------------------------------------------------------
516 sub register_routers {
517     my $self = shift;
518
519     my $conf = OpenSRF::Utils::Config->current;
520     my $routers = $conf->bootstrap->routers;
521     my $router_name = $conf->bootstrap->router_name;
522     my @targets;
523
524     for my $router (@$routers) {
525         if(ref $router) {
526
527             if( !$router->{services} ||
528                 !$router->{services}->{service} ||
529                 (
530                     ref($router->{services}->{service}) eq 'ARRAY' and
531                     grep { $_ eq $self->{service} } @{$router->{services}->{service}}
532                 )  || $router->{services}->{service} eq $self->{service}) {
533
534                 my $name = $router->{name};
535                 my $domain = $router->{domain};
536                 push(@targets, "$name\@$domain/router");
537             }
538
539         } else {
540             push(@targets, "$router_name\@$router/router");
541         }
542     }
543
544     foreach (@targets) {
545         $logger->info("server: registering with router $_");
546         $self->{osrf_handle}->send(
547             to => $_,
548             body => 'registering',
549             router_command => 'register',
550             router_class => $self->{service}
551         );
552     }
553
554     $self->{routers} = \@targets;
555 }
556
557 # ----------------------------------------------------------------
558 # Sends the unregister command to any routers we have registered
559 # with.
560 # ----------------------------------------------------------------
561 sub unregister_routers {
562     my $self = shift;
563     return unless $self->{osrf_handle}->tcp_connected;
564
565     for my $router (@{$self->{routers}}) {
566         $logger->info("server: disconnecting from router $router");
567         $self->{osrf_handle}->send(
568             to => $router,
569             body => "unregistering",
570             router_command => "unregister",
571             router_class => $self->{service}
572         );
573     }
574 }
575
576
577 package OpenSRF::Server::Child;
578 use strict;
579 use warnings;
580 use OpenSRF::Transport;
581 use OpenSRF::Application;
582 use OpenSRF::Transport::PeerHandle;
583 use OpenSRF::Transport::SlimJabber::XMPPMessage;
584 use OpenSRF::Utils::Logger qw($logger);
585 use OpenSRF::DomainObject::oilsResponse qw/:status/;
586 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
587 use Time::HiRes qw(time usleep);
588 use POSIX qw/:sys_wait_h :errno_h/;
589
590 use overload '""' => sub { return '[' . shift()->{pid} . ']'; };
591
592 sub new {
593     my($class, $parent) = @_;
594     my $self = bless({}, $class);
595     $self->{pid} = 0; # my process ID
596     $self->{parent} = $parent; # Controller parent process
597     $self->{num_requests} = 0; # total serviced requests
598     $self->{sig_pipe} = 0;  # true if last syswrite failed
599     return $self;
600 }
601
602 sub set_nonblock {
603     my($self, $fh) = @_;
604     my  $flags = fcntl($fh, F_GETFL, 0);
605     fcntl($fh, F_SETFL, $flags | O_NONBLOCK);
606 }
607
608 sub set_block {
609     my($self, $fh) = @_;
610     my  $flags = fcntl($fh, F_GETFL, 0);
611     $flags &= ~O_NONBLOCK;
612     fcntl($fh, F_SETFL, $flags);
613 }
614
615 # ----------------------------------------------------------------
616 # Connects to Jabber and runs the application child_init
617 # ----------------------------------------------------------------
618 sub init {
619     my $self = shift;
620     my $service = $self->{parent}->{service};
621     $0 = "OpenSRF Drone [$service]";
622     OpenSRF::Transport::PeerHandle->construct($service);
623     OpenSRF::Application->application_implementation->child_init
624         if (OpenSRF::Application->application_implementation->can('child_init'));
625 }
626
627 # ----------------------------------------------------------------
628 # Waits for messages from the parent process, handles the message,
629 # then goes into the keepalive loop if this is a stateful session.
630 # When max_requests is hit, the process exits.
631 # ----------------------------------------------------------------
632 sub run {
633     my $self = shift;
634     my $network = OpenSRF::Transport::PeerHandle->retrieve;
635
636     # main child run loop.  Ends when this child hits max requests.
637     while(1) {
638
639         my $data = $self->wait_for_request or next;
640
641         # Update process name to show activity
642         my $orig_name = $0;
643         $0 = "$0*";
644
645         # Discard extraneous data from the jabber socket
646         if(!$network->flush_socket()) {
647             $logger->error("server: network disconnected!  child dropping request and exiting: $data");
648             exit;
649         }
650
651         my $session = OpenSRF::Transport->handler(
652             $self->{parent}->{service},
653             OpenSRF::Transport::SlimJabber::XMPPMessage->new(xml => $data)
654         );
655
656         $self->keepalive_loop($session);
657
658         last if ++$self->{num_requests} == $self->{parent}->{max_requests};
659
660         # Tell the parent process we are available to process requests
661         $self->send_status;
662
663         # Repair process name
664         $0 = $orig_name;
665     }
666
667     $chatty and $logger->internal("server: child process shutting down after reaching max_requests");
668
669     OpenSRF::Application->application_implementation->child_exit
670         if (OpenSRF::Application->application_implementation->can('child_exit'));
671 }
672
673 # ----------------------------------------------------------------
674 # waits for a request data on the parent pipe and returns it.
675 # ----------------------------------------------------------------
676 sub wait_for_request {
677     my $self = shift;
678
679     my $data = ''; # final request data
680     my $buf_size = 4096; # default linux pipe_buf (atomic window, not total size)
681     my $read_pipe = $self->{pipe_to_parent};
682     my $bytes_needed; # size of the data we are about to receive
683     my $bytes_recvd; # number of bytes read so far
684     my $first_read = 1; # true for first loop iteration
685     my $read_error;
686
687     while (1) {
688
689         # wait for some data to start arriving
690         my $read_set = IO::Select->new;
691         $read_set->add($read_pipe);
692     
693         while (1) {
694             # if can_read is interrupted while blocking, 
695             # go back and wait again until it succeeds.
696             last if $read_set->can_read;
697         }
698
699         # parent started writing, let's start reading
700         $self->set_nonblock($read_pipe);
701
702         while (1) {
703             # read all of the available data
704
705             my $buf = '';
706             my $nbytes = sysread($self->{pipe_to_parent}, $buf, $buf_size);
707
708             unless(defined $nbytes) {
709                 if ($! != EAGAIN) {
710                     $logger->error("server: error reading data from parent: $!.  ".
711                         "bytes_needed=$bytes_needed; bytes_recvd=$bytes_recvd; data=$data");
712                     $read_error = 1;
713                 }
714                 last;
715             }
716
717             last if $nbytes <= 0; # no more data available for reading
718
719             $bytes_recvd += $nbytes;
720             $data .= $buf;
721         }
722
723         $self->set_block($self->{pipe_to_parent});
724         return undef if $read_error;
725
726         # extract the data size and remove the header from the final data
727         if ($first_read) {
728             my $wps_size = OpenSRF::Server::WRITE_PIPE_DATA_SIZE;
729             $bytes_needed = int(substr($data, 0, $wps_size)) + $wps_size;
730             $data = substr($data, $wps_size);
731             $first_read = 0;
732         }
733
734
735         if ($bytes_recvd == $bytes_needed) {
736             # we've read all the data. Nothing left to do
737             last;
738         }
739
740         $logger->info("server: child process read all available pipe data.  ".
741             "waiting for more data from parent.  bytes_needed=$bytes_needed; bytes_recvd=$bytes_recvd");
742     }
743
744     return $data;
745 }
746
747
748 # ----------------------------------------------------------------
749 # If this is a stateful opensrf session, wait up to $keepalive
750 # seconds for subsequent requests from the client
751 # ----------------------------------------------------------------
752 sub keepalive_loop {
753     my($self, $session) = @_;
754     my $keepalive = $self->{parent}->{keepalive};
755
756     while($session->state and $session->state == $session->CONNECTED) {
757
758         unless( $session->queue_wait($keepalive) ) {
759
760             # client failed to disconnect before timeout
761             $logger->info("server: no request was received in $keepalive seconds, exiting stateful session");
762
763             my $res = OpenSRF::DomainObject::oilsConnectStatus->new(
764                 status => "Disconnected on timeout",
765                 statusCode => STATUS_TIMEOUT
766             );
767
768             $session->status($res);
769             $session->state($session->DISCONNECTED);
770             last;
771         }
772     }
773
774     $chatty and $logger->internal("server: child done with request(s)");
775     $session->kill_me;
776 }
777
778 # ----------------------------------------------------------------
779 # Report our availability to our parent process
780 # ----------------------------------------------------------------
781 sub send_status {
782     my $self = shift;
783
784     for (0..2) {
785
786         $self->{sig_pipe} = 0;
787         local $SIG{'PIPE'} = sub { $self->{sig_pipe} = 1; };
788
789         syswrite(
790             $self->{pipe_to_parent},
791             sprintf("%*s", OpenSRF::Server::STATUS_PIPE_DATA_SIZE, $self->{pid})
792         );
793
794         last unless $self->{sig_pipe};
795         $logger->error("server: $self got SIGPIPE writing status to parent, retrying...");
796         usleep(50000); # 50 msec
797     }
798
799     $logger->error("server: $self unable to send status to parent") if $self->{sig_pipe};
800 }
801
802
803 1;