LP#1729610: return new OpenSRF status if backlog queue fills up
[OpenSRF.git] / src / perl / lib / OpenSRF / Server.pm
1 # ----------------------------------------------------------------
2 # Copyright (C) 2010 Equinox Software, Inc.
3 # Bill Erickson <erickson@esilibrary.com>
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 # ----------------------------------------------------------------
15 package OpenSRF::Server;
16 use strict;
17 use warnings;
18 use OpenSRF::Transport;
19 use OpenSRF::Application;
20 use OpenSRF::Utils::Config;
21 use OpenSRF::Transport::PeerHandle;
22 use OpenSRF::Utils::SettingsClient;
23 use OpenSRF::Utils::Logger qw($logger);
24 use OpenSRF::DomainObject::oilsResponse qw/:status/;
25 use OpenSRF::Transport::SlimJabber::Client;
26 use Encode;
27 use POSIX qw/:sys_wait_h :errno_h/;
28 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
29 use Time::HiRes qw/usleep/;
30 use IO::Select;
31 use Socket;
32 our $chatty = 1; # disable for production
33
34 use constant STATUS_PIPE_DATA_SIZE => 12;
35 use constant WRITE_PIPE_DATA_SIZE  => 12;
36
37 sub new {
38     my($class, $service, %args) = @_;
39     my $self = bless(\%args, $class);
40
41     $self->{service}        = $service; # service name
42     $self->{num_children}   = 0; # number of child processes
43     $self->{osrf_handle}    = undef; # xmpp handle
44     $self->{routers}        = []; # list of registered routers
45     $self->{active_list}    = []; # list of active children
46     $self->{idle_list}      = []; # list of idle children
47     $self->{sighup_pending} = [];
48     $self->{pid_map}        = {}; # map of child pid to child for cleaner access
49     $self->{sig_pipe}       = 0;  # true if last syswrite failed
50
51     $self->{stderr_log} = $self->{stderr_log_path} . "/${service}_stderr.log" 
52         if $self->{stderr_log_path};
53
54     $self->{min_spare_children} ||= 0;
55     $self->{max_backlog_queue} ||= 1000;
56
57     $self->{max_spare_children} = $self->{min_spare_children} + 1 if
58         $self->{max_spare_children} and
59         $self->{max_spare_children} <= $self->{min_spare_children};
60
61     return $self;
62 }
63
64 # ----------------------------------------------------------------
65 # Disconnects from routers and waits for child processes to exit.
66 # ----------------------------------------------------------------
67 sub cleanup {
68     my $self = shift;
69     my $no_exit = shift;
70     my $graceful = shift;
71
72     $logger->info("server: shutting down and cleaning up...");
73
74     # de-register routers
75     $self->unregister_routers;
76
77     if ($graceful) {
78         # graceful shutdown waits for all active 
79         # children to complete their in-process tasks.
80
81         while (@{$self->{active_list}}) {
82             $logger->info("server: graceful shutdown with ".
83                 @{$self->{active_list}}." active children...");
84
85             # block until a child is becomes available
86             $self->check_status(1);
87         }
88         $logger->info("server: all clear for graceful shutdown");
89     }
90
91     # don't get sidetracked by signals while we're cleaning up.
92     # it could result in unexpected behavior with list traversal
93     $SIG{CHLD} = 'IGNORE';
94
95     # terminate the child processes
96     $self->kill_child($_) for
97         (@{$self->{idle_list}}, @{$self->{active_list}});
98
99     $self->{osrf_handle}->disconnect;
100
101     # clean up our dead children
102     $self->reap_children(1);
103
104     exit(0) unless $no_exit;
105 }
106
107 # ----------------------------------------------------------------
108 # SIGHUP handler.  Kill all idle children.  Copy list of active
109 # children into sighup_pending list for later cleanup.
110 # ----------------------------------------------------------------
111 sub handle_sighup {
112     my $self = shift;
113     $logger->info("server: caught SIGHUP; reloading children");
114
115     # reload the opensrf config
116     # note: calling ::Config->load() results in ever-growing
117     # package names, which eventually causes an exception
118     OpenSRF::Utils::Config->current->_load(
119         force => 1,
120         config_file => OpenSRF::Utils::Config->current->FILE
121     );
122
123     # force-reload the logger config
124     OpenSRF::Utils::Logger::set_config(1);
125
126     # copy active list into pending list for later cleanup
127     $self->{sighup_pending} = [ @{$self->{active_list}} ];
128
129     # idle_list will be modified as children are reaped.
130     my @idle = @{$self->{idle_list}};
131
132     # idle children are the reaper's plaything
133     $self->kill_child($_) for @idle;
134 }
135
136 # ----------------------------------------------------------------
137 # Waits on the jabber socket for inbound data from the router.
138 # Each new message is passed off to a child process for handling.
139 # At regular intervals, wake up for min/max spare child maintenance
140 # ----------------------------------------------------------------
141 sub run {
142     my $self = shift;
143
144     $logger->set_service($self->{service});
145
146     $SIG{$_} = sub { $self->cleanup; } for (qw/INT QUIT/);
147     $SIG{TERM} = sub { $self->cleanup(0, 1); };
148     $SIG{CHLD} = sub { $self->reap_children(); };
149     $SIG{HUP} = sub { $self->handle_sighup(); };
150     $SIG{USR1} = sub { $self->unregister_routers; };
151     $SIG{USR2} = sub { $self->register_routers; };
152
153     $self->spawn_children;
154     $self->build_osrf_handle;
155     $self->register_routers;
156     my $wait_time = 1;
157
158     my @max_children_msg_queue;
159
160     # main server loop
161     while(1) {
162         my $from_network = 0;
163
164         $self->check_status;
165         $self->{child_died} = 0;
166
167         my $msg = shift(@max_children_msg_queue);
168
169         # no pending message, so wait for the next one forever
170         $from_network = $wait_time = -1 if (!$msg);
171         $msg ||= $self->{osrf_handle}->process($wait_time);
172
173         !$from_network and $chatty and $logger->debug("server: attempting to process previously queued message");
174         $from_network and $chatty and $logger->internal("server: no queued messages, processing due to network or signal");
175
176         # we woke up for any reason, reset the wait time to allow
177         # for idle maintenance as necessary
178         $wait_time = 1;
179
180         if($msg) {
181
182             if ($msg->type and $msg->type eq 'error') {
183                 $logger->info("server: Listener received an XMPP error ".
184                     "message.  Likely a bounced message. sender=".$msg->from);
185
186             } elsif(my $child = pop(@{$self->{idle_list}})) {
187
188                 # we have an idle child to handle the request
189                 $chatty and $logger->internal("server: passing request to idle child $child");
190                 push(@{$self->{active_list}}, $child);
191                 $self->write_child($child, $msg);
192
193             } elsif($self->{num_children} < $self->{max_children}) {
194
195                 # spawning a child to handle the request
196                 $chatty and $logger->internal("server: spawning child to handle request");
197                 $self->write_child($self->spawn_child(1), $msg);
198
199             } else {
200                 $logger->warn("server: no children available, waiting... consider increasing " .
201                     "max_children for this application higher than $self->{max_children} ".
202                     "in the OpenSRF configuration if this message occurs frequently");
203
204                 if ($from_network) {
205                     $chatty and $logger->debug("server: queuing new message");
206                     push @max_children_msg_queue, $msg;
207                 } else {
208                     $chatty and $logger->debug("server: re-queuing old message");
209                     unshift @max_children_msg_queue, $msg;
210                 }
211
212                 $logger->warn("server: backlog queue size is now ". scalar(@max_children_msg_queue));
213
214                 if (@max_children_msg_queue < $self->{max_backlog_queue}) {
215                     # We still have room on the queue. Set the wait time to
216                     # 1s, waiting for a drone to be freed up and reprocess
217                     # this (and any other) queued messages.
218                     $wait_time = 1;
219                     if (!$from_network) {
220                         # if we got here, we had retrieved a message from the queue
221                         # but couldn't process it... but also hadn't fetched any
222                         # additional messages from the network. Doing so now,
223                         # as otherwise only one message will ever get queued
224                         $msg = $self->{osrf_handle}->process($wait_time);
225                         if ($msg) {
226                             $chatty and $logger->debug("server: queuing new message after a re-queue");
227                             push @max_children_msg_queue, $msg;
228                         }
229                     }
230                 } else {
231
232                     if (!$from_network) {
233                         # The queue is full, and we just requeued a message. We'll
234                         # now see if there is a request available from the network;
235                         # if so, we'll see if a child is available again or else
236                         # drop it
237                         $msg = $self->{osrf_handle}->process($wait_time);
238                         if ($msg) {
239                             $self->check_status();
240                             if (@{$self->{idle_list}}) {
241                                 # child now available, so we'll go ahead and queue it
242                                 $chatty and $logger->debug("server: queuing new message after a re-queue with a full queue");
243                                 push @max_children_msg_queue, $msg;
244                             } else {
245                                 # ok, need to drop this one
246                                 my $resp = OpenSRF::DomainObject::oilsMessage->new();
247                                 $resp->type('STATUS');
248                                 $resp->payload(
249                                     OpenSRF::DomainObject::oilsMethodException->new(
250                                         status => "Service unavailable: no available children and backlog queue at limit",
251                                         statusCode => STATUS_SERVICEUNAVAILABLE
252                                     )
253                                 );
254                                 $resp->threadTrace(1);
255
256                                 $logger->set_osrf_xid($msg->osrf_xid);
257                                 $self->{osrf_handle}->send(
258                                     to => $msg->from,
259                                     osrf_xid => $msg->osrf_xid, # Note that this is ignored, which
260                                                                 # is why we called $logger->set_osrf_xid above.
261                                                                 # We probably don't want that to be necessary
262                                                                 # if osrf_xid is explicitly set here, but that'll
263                                                                 # be a FIXME for later
264                                     thread => $msg->thread,
265                                     body => OpenSRF::Utils::JSON->perl2JSON([ $resp ])
266                                 );
267                                 $logger->warn("Backlog queue full for $self->{service}; forced to drop message " .
268                                               $msg->thread . " from " . $msg->from);
269                             }
270                         }
271                     }
272                 }
273             }
274
275         } else {
276
277             # don't perform idle maint immediately when woken by SIGCHLD
278             unless($self->{child_died}) {
279
280                 # when we hit equilibrium, there's no need for regular
281                 # maintenance, so set wait_time to 'forever'
282                 $wait_time = -1 if 
283                     !$self->perform_idle_maintenance and # no maintenance performed this time
284                     @{$self->{active_list}} == 0; # no active children 
285             }
286         }
287     }
288 }
289
290 # ----------------------------------------------------------------
291 # Launch a new spare child or kill an extra spare child.  To
292 # prevent large-scale spawning or die-offs, spawn or kill only
293 # 1 process per idle maintenance loop.
294 # Returns true if any idle maintenance occurred, 0 otherwise
295 # ----------------------------------------------------------------
296 sub perform_idle_maintenance {
297     my $self = shift;
298
299     $chatty and $logger->internal(
300         sprintf(
301             "server: %d idle, %d active, %d min_spare, %d max_spare in idle maintenance",
302             scalar(@{$self->{idle_list}}), 
303             scalar(@{$self->{active_list}}),
304             $self->{min_spare_children},
305             $self->{max_spare_children}
306         )
307     );
308
309     # spawn 1 spare child per maintenance loop if necessary
310     if( $self->{min_spare_children} and
311         $self->{num_children} < $self->{max_children} and
312         scalar(@{$self->{idle_list}}) < $self->{min_spare_children} ) {
313
314         $chatty and $logger->internal("server: spawning spare child");
315         $self->spawn_child;
316         return 1;
317
318     # kill 1 excess spare child per maintenance loop if necessary
319     } elsif($self->{max_spare_children} and
320             $self->{num_children} > $self->{min_children} and
321             scalar(@{$self->{idle_list}}) > $self->{max_spare_children} ) {
322
323         $chatty and $logger->internal("server: killing spare child");
324         $self->kill_child;
325         return 1;
326     }
327
328     return 0;
329 }
330
331 sub kill_child {
332     my $self = shift;
333     my $child = shift || pop(@{$self->{idle_list}}) or return;
334     $chatty and $logger->internal("server: killing child $child");
335     kill('TERM', $child->{pid});
336 }
337
338 # ----------------------------------------------------------------
339 # Jabber connection inbound message arrive on.
340 # ----------------------------------------------------------------
341 sub build_osrf_handle {
342     my $self = shift;
343
344     my $conf = OpenSRF::Utils::Config->current;
345     my $username = $conf->bootstrap->username;
346     my $password = $conf->bootstrap->passwd;
347     my $domain = $conf->bootstrap->domain;
348     my $port = $conf->bootstrap->port;
349     my $resource = $self->{service} . '_listener_' . $conf->env->hostname;
350
351     $logger->debug("server: inbound connecting as $username\@$domain/$resource on port $port");
352
353     $self->{osrf_handle} =
354         OpenSRF::Transport::SlimJabber::Client->new(
355             username => $username,
356             resource => $resource,
357             password => $password,
358             host => $domain,
359             port => $port,
360         );
361
362     $self->{osrf_handle}->initialize;
363 }
364
365
366 # ----------------------------------------------------------------
367 # Sends request data to a child process
368 # ----------------------------------------------------------------
369 sub write_child {
370     my($self, $child, $msg) = @_;
371     my $xml = encode_utf8(decode_utf8($msg->to_xml));
372
373     # tell the child how much data to expect, minus the header
374     my $write_size;
375     {use bytes; $write_size = length($xml)}
376     $write_size = sprintf("%*s", WRITE_PIPE_DATA_SIZE, $write_size);
377
378     for (0..2) {
379
380         $self->{sig_pipe} = 0;
381         local $SIG{'PIPE'} = sub { $self->{sig_pipe} = 1; };
382
383         # In rare cases a child can die between creation and first
384         # write, typically a result of a jabber connect error.  Before
385         # sending data to each child, confirm it's still alive.  If it's
386         # not, log the error and drop the message to prevent the parent
387         # process from dying.
388         # When a child dies, all of its attributes are deleted,
389         # so the lack of a pid means the child is dead.
390         if (!$child->{pid}) {
391             $logger->error("server: child is dead in write_child(). ".
392                 "unable to send message: $xml");
393             return; # avoid syswrite crash
394         }
395
396         # send message to child data pipe
397         syswrite($child->{pipe_to_child}, $write_size . $xml);
398
399         last unless $self->{sig_pipe};
400         $logger->error("server: got SIGPIPE writing to $child, retrying...");
401         usleep(50000); # 50 msec
402     }
403
404     $logger->error("server: unable to send request message to child $child") if $self->{sig_pipe};
405 }
406
407 # ----------------------------------------------------------------
408 # Checks to see if any child process has reported its availability
409 # In blocking mode, blocks until a child has reported.
410 # ----------------------------------------------------------------
411 sub check_status {
412     my($self, $block) = @_;
413
414     return unless @{$self->{active_list}};
415
416     my @pids;
417
418     while (1) {
419
420         # if can_read or sysread is interrupted while bloking, go back and 
421         # wait again until we have at least 1 free child
422
423         # refresh the read_set handles in case we lost a child in the previous iteration
424         my $read_set = IO::Select->new;
425         $read_set->add($_->{pipe_to_child}) for @{$self->{active_list}};
426
427         if(my @handles = $read_set->can_read(($block) ? undef : 0)) {
428             my $pid = '';
429             for my $pipe (@handles) {
430                 sysread($pipe, $pid, STATUS_PIPE_DATA_SIZE) or next;
431                 push(@pids, int($pid));
432             }
433         }
434
435         last unless $block and !@pids;
436     }
437
438     return unless @pids;
439
440     $chatty and $logger->internal("server: ".scalar(@pids)." children reporting for duty: (@pids)");
441
442     my $child;
443     my @new_actives;
444
445     # move the children from the active list to the idle list
446     for my $proc (@{$self->{active_list}}) {
447         if(grep { $_ == $proc->{pid} } @pids) {
448             push(@{$self->{idle_list}}, $proc);
449         } else {
450             push(@new_actives, $proc);
451         }
452     }
453
454     $self->{active_list} = [@new_actives];
455
456     $chatty and $logger->internal(sprintf(
457         "server: %d idle and %d active children after status update",
458             scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
459
460     # some children just went from active to idle. let's see 
461     # if any of them need to be killed from a previous sighup.
462
463     for my $child (@{$self->{sighup_pending}}) {
464         if (grep {$_ == $child->{pid}} @pids) {
465
466             $chatty and $logger->internal(
467                 "server: killing previously-active ".
468                 "child after receiving SIGHUP: $child");
469
470             # remove the pending child
471             $self->{sighup_pending} = [
472                 grep {$_->{pid} != $child->{pid}} 
473                     @{$self->{sighup_pending}}
474             ];
475
476             # kill the pending child
477             $self->kill_child($child)
478         }
479     }
480 }
481
482 # ----------------------------------------------------------------
483 # Cleans up any child processes that have exited.
484 # In shutdown mode, block until all children have washed ashore
485 # ----------------------------------------------------------------
486 sub reap_children {
487     my($self, $shutdown) = @_;
488     $self->{child_died} = 1;
489
490     while(1) {
491
492         my $pid = waitpid(-1, ($shutdown) ? 0 : WNOHANG);
493         last if $pid <= 0;
494
495         $chatty and $logger->internal("server: reaping child $pid");
496
497         my $child = $self->{pid_map}->{$pid};
498
499         close($child->{pipe_to_parent});
500         close($child->{pipe_to_child});
501
502         $self->{active_list} = [ grep { $_->{pid} != $pid } @{$self->{active_list}} ];
503         $self->{idle_list} = [ grep { $_->{pid} != $pid } @{$self->{idle_list}} ];
504
505         $self->{num_children}--;
506         delete $self->{pid_map}->{$pid};
507         delete $child->{$_} for keys %$child; # destroy with a vengeance
508     }
509
510     $self->spawn_children unless $shutdown;
511
512     $chatty and $logger->internal(sprintf(
513         "server: %d idle and %d active children after reap_children",
514             scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
515 }
516
517 # ----------------------------------------------------------------
518 # Spawn up to max_children processes
519 # ----------------------------------------------------------------
520 sub spawn_children {
521     my $self = shift;
522     $self->spawn_child while $self->{num_children} < $self->{min_children};
523 }
524
525 # ----------------------------------------------------------------
526 # Spawns a new child.  If $active is set, the child goes directly
527 # into the active_list.
528 # ----------------------------------------------------------------
529 sub spawn_child {
530     my($self, $active) = @_;
531
532     my $child = OpenSRF::Server::Child->new($self);
533
534     # socket for sending message data to the child
535     if(!socketpair(
536         $child->{pipe_to_child},
537         $child->{pipe_to_parent},
538         AF_UNIX, SOCK_STREAM, PF_UNSPEC)) {
539             $logger->error("server: error creating data socketpair: $!");
540             return undef;
541     }
542
543     $child->{pipe_to_child}->autoflush(1);
544     $child->{pipe_to_parent}->autoflush(1);
545
546     $child->{pid} = fork();
547
548     if($child->{pid}) { # parent process
549         $self->{num_children}++;
550         $self->{pid_map}->{$child->{pid}} = $child;
551
552         if($active) {
553             push(@{$self->{active_list}}, $child);
554         } else {
555             push(@{$self->{idle_list}}, $child);
556         }
557
558         $chatty and $logger->internal("server: server spawned child $child with ".$self->{num_children}." total children");
559
560         return $child;
561
562     } else { # child process
563
564         # recover default handling for any signal whose handler 
565         # may have been adopted from the parent process.
566         $SIG{$_} = 'DEFAULT' for qw/TERM INT QUIT HUP CHLD USR1 USR2/;
567
568         if($self->{stderr_log}) {
569
570             $chatty and $logger->internal("server: redirecting STDERR to " . $self->{stderr_log});
571
572             close STDERR;
573             unless( open(STDERR, '>>' . $self->{stderr_log}) ) {
574                 $logger->error("server: unable to open STDERR log file: " . $self->{stderr_log} . " : $@");
575                 open STDERR, '>/dev/null'; # send it back to /dev/null
576             }
577         }
578
579         $child->{pid} = $$;
580         eval {
581             $child->init;
582             $child->run;
583             OpenSRF::Transport::PeerHandle->retrieve->disconnect;
584         };
585         $logger->error("server: child process died: $@") if $@;
586         exit(0);
587     }
588 }
589
590 # ----------------------------------------------------------------
591 # Sends the register command to the configured routers
592 # ----------------------------------------------------------------
593 sub register_routers {
594     my $self = shift;
595
596     my $conf = OpenSRF::Utils::Config->current;
597     my $routers = $conf->bootstrap->routers;
598     my $router_name = $conf->bootstrap->router_name;
599     my @targets;
600
601     for my $router (@$routers) {
602         if(ref $router) {
603
604             if( !$router->{services} ||
605                 !$router->{services}->{service} ||
606                 (
607                     ref($router->{services}->{service}) eq 'ARRAY' and
608                     grep { $_ eq $self->{service} } @{$router->{services}->{service}}
609                 )  || $router->{services}->{service} eq $self->{service}) {
610
611                 my $name = $router->{name};
612                 my $domain = $router->{domain};
613                 push(@targets, "$name\@$domain/router");
614             }
615
616         } else {
617             push(@targets, "$router_name\@$router/router");
618         }
619     }
620
621     foreach (@targets) {
622         $logger->info("server: registering with router $_");
623         $self->{osrf_handle}->send(
624             to => $_,
625             body => 'registering',
626             router_command => 'register',
627             router_class => $self->{service}
628         );
629     }
630
631     $self->{routers} = \@targets;
632 }
633
634 # ----------------------------------------------------------------
635 # Sends the unregister command to any routers we have registered
636 # with.
637 # ----------------------------------------------------------------
638 sub unregister_routers {
639     my $self = shift;
640     return unless $self->{osrf_handle}->tcp_connected;
641
642     for my $router (@{$self->{routers}}) {
643         $logger->info("server: disconnecting from router $router");
644         $self->{osrf_handle}->send(
645             to => $router,
646             body => "unregistering",
647             router_command => "unregister",
648             router_class => $self->{service}
649         );
650     }
651 }
652
653
654 package OpenSRF::Server::Child;
655 use strict;
656 use warnings;
657 use OpenSRF::Transport;
658 use OpenSRF::Application;
659 use OpenSRF::Transport::PeerHandle;
660 use OpenSRF::Transport::SlimJabber::XMPPMessage;
661 use OpenSRF::Utils::Logger qw($logger);
662 use OpenSRF::DomainObject::oilsResponse qw/:status/;
663 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
664 use Time::HiRes qw(time usleep);
665 use POSIX qw/:sys_wait_h :errno_h/;
666
667 use overload '""' => sub { return '[' . shift()->{pid} . ']'; };
668
669 sub new {
670     my($class, $parent) = @_;
671     my $self = bless({}, $class);
672     $self->{pid} = 0; # my process ID
673     $self->{parent} = $parent; # Controller parent process
674     $self->{num_requests} = 0; # total serviced requests
675     $self->{sig_pipe} = 0;  # true if last syswrite failed
676     return $self;
677 }
678
679 sub set_nonblock {
680     my($self, $fh) = @_;
681     my  $flags = fcntl($fh, F_GETFL, 0);
682     fcntl($fh, F_SETFL, $flags | O_NONBLOCK);
683 }
684
685 sub set_block {
686     my($self, $fh) = @_;
687     my  $flags = fcntl($fh, F_GETFL, 0);
688     $flags &= ~O_NONBLOCK;
689     fcntl($fh, F_SETFL, $flags);
690 }
691
692 # ----------------------------------------------------------------
693 # Connects to Jabber and runs the application child_init
694 # ----------------------------------------------------------------
695 sub init {
696     my $self = shift;
697     my $service = $self->{parent}->{service};
698     $0 = "OpenSRF Drone [$service]";
699     OpenSRF::Transport::PeerHandle->construct($service);
700     OpenSRF::Application->application_implementation->child_init
701         if (OpenSRF::Application->application_implementation->can('child_init'));
702 }
703
704 # ----------------------------------------------------------------
705 # Waits for messages from the parent process, handles the message,
706 # then goes into the keepalive loop if this is a stateful session.
707 # When max_requests is hit, the process exits.
708 # ----------------------------------------------------------------
709 sub run {
710     my $self = shift;
711     my $network = OpenSRF::Transport::PeerHandle->retrieve;
712
713     # main child run loop.  Ends when this child hits max requests.
714     while(1) {
715
716         my $data = $self->wait_for_request or next;
717
718         # Update process name to show activity
719         my $orig_name = $0;
720         $0 = "$0*";
721
722         # Discard extraneous data from the jabber socket
723         if(!$network->flush_socket()) {
724             $logger->error("server: network disconnected!  child dropping request and exiting: $data");
725             exit;
726         }
727
728         my $session = OpenSRF::Transport->handler(
729             $self->{parent}->{service},
730             OpenSRF::Transport::SlimJabber::XMPPMessage->new(xml => $data)
731         );
732
733         my $recycle = $self->keepalive_loop($session);
734
735         last if ++$self->{num_requests} == $self->{parent}->{max_requests};
736
737         if ($recycle) {
738             $chatty && $logger->internal(
739                 "server: child exiting early on force_recycle");
740             last;
741         }
742
743         # Tell the parent process we are available to process requests
744         $self->send_status;
745
746         # Repair process name
747         $0 = $orig_name;
748     }
749
750     $chatty and $logger->internal("server: child process shutting down after reaching max_requests");
751
752     OpenSRF::Application->application_implementation->child_exit
753         if (OpenSRF::Application->application_implementation->can('child_exit'));
754 }
755
756 # ----------------------------------------------------------------
757 # waits for a request data on the parent pipe and returns it.
758 # ----------------------------------------------------------------
759 sub wait_for_request {
760     my $self = shift;
761
762     my $data = ''; # final request data
763     my $buf_size = 4096; # default linux pipe_buf (atomic window, not total size)
764     my $read_pipe = $self->{pipe_to_parent};
765     my $bytes_needed; # size of the data we are about to receive
766     my $bytes_recvd; # number of bytes read so far
767     my $first_read = 1; # true for first loop iteration
768     my $read_error;
769
770     while (1) {
771
772         # wait for some data to start arriving
773         my $read_set = IO::Select->new;
774         $read_set->add($read_pipe);
775     
776         while (1) {
777             # if can_read is interrupted while blocking, 
778             # go back and wait again until it succeeds.
779             last if $read_set->can_read;
780         }
781
782         # parent started writing, let's start reading
783         $self->set_nonblock($read_pipe);
784
785         while (1) {
786             # read all of the available data
787
788             my $buf = '';
789             my $nbytes = sysread($self->{pipe_to_parent}, $buf, $buf_size);
790
791             unless(defined $nbytes) {
792                 if ($! != EAGAIN) {
793                     $logger->error("server: error reading data from parent: $!.  ".
794                         "bytes_needed=$bytes_needed; bytes_recvd=$bytes_recvd; data=$data");
795                     $read_error = 1;
796                 }
797                 last;
798             }
799
800             last if $nbytes <= 0; # no more data available for reading
801
802             $bytes_recvd += $nbytes;
803             $data .= $buf;
804         }
805
806         $self->set_block($self->{pipe_to_parent});
807         return undef if $read_error;
808
809         # extract the data size and remove the header from the final data
810         if ($first_read) {
811             my $wps_size = OpenSRF::Server::WRITE_PIPE_DATA_SIZE;
812             $bytes_needed = int(substr($data, 0, $wps_size)) + $wps_size;
813             $data = substr($data, $wps_size);
814             $first_read = 0;
815         }
816
817
818         if ($bytes_recvd == $bytes_needed) {
819             # we've read all the data. Nothing left to do
820             last;
821         }
822
823         $logger->info("server: child process read all available pipe data.  ".
824             "waiting for more data from parent.  bytes_needed=$bytes_needed; bytes_recvd=$bytes_recvd");
825     }
826
827     return $data;
828 }
829
830
831 # ----------------------------------------------------------------
832 # If this is a stateful opensrf session, wait up to $keepalive
833 # seconds for subsequent requests from the client
834 # ----------------------------------------------------------------
835 sub keepalive_loop {
836     my($self, $session) = @_;
837     my $keepalive = $self->{parent}->{keepalive};
838
839     while($session->state and $session->state == $session->CONNECTED) {
840
841         unless( $session->queue_wait($keepalive) ) {
842
843             # client failed to disconnect before timeout
844             $logger->info("server: no request was received in $keepalive seconds, exiting stateful session");
845
846             my $res = OpenSRF::DomainObject::oilsConnectStatus->new(
847                 status => "Disconnected on timeout",
848                 statusCode => STATUS_TIMEOUT
849             );
850
851             $session->status($res);
852             $session->state($session->DISCONNECTED);
853             last;
854         }
855     }
856
857     $chatty and $logger->internal("server: child done with request(s)");
858
859     # Capture the recycle option value before it's clobbered.
860     # The option may be set at any point along the life of the 
861     # session.  Once set, it remains set unless 
862     # $session->force_recycle(0) is explicitly called.
863     my $recycle = $session->force_recycle;
864
865     $session->kill_me;
866     return $recycle;
867 }
868
869 # ----------------------------------------------------------------
870 # Report our availability to our parent process
871 # ----------------------------------------------------------------
872 sub send_status {
873     my $self = shift;
874
875     for (0..2) {
876
877         $self->{sig_pipe} = 0;
878         local $SIG{'PIPE'} = sub { $self->{sig_pipe} = 1; };
879
880         syswrite(
881             $self->{pipe_to_parent},
882             sprintf("%*s", OpenSRF::Server::STATUS_PIPE_DATA_SIZE, $self->{pid})
883         );
884
885         last unless $self->{sig_pipe};
886         $logger->error("server: $self got SIGPIPE writing status to parent, retrying...");
887         usleep(50000); # 50 msec
888     }
889
890     $logger->error("server: $self unable to send status to parent") if $self->{sig_pipe};
891 }
892
893
894 1;