Refresh child file handles on status read
[OpenSRF.git] / src / perl / lib / OpenSRF / Server.pm
1 # ----------------------------------------------------------------
2 # Copyright (C) 2010 Equinox Software, Inc.
3 # Bill Erickson <erickson@esilibrary.com>
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 # ----------------------------------------------------------------
15 package OpenSRF::Server;
16 use strict;
17 use warnings;
18 use OpenSRF::Transport;
19 use OpenSRF::Application;
20 use OpenSRF::Utils::Config;
21 use OpenSRF::Transport::PeerHandle;
22 use OpenSRF::Utils::SettingsClient;
23 use OpenSRF::Utils::Logger qw($logger);
24 use OpenSRF::Transport::SlimJabber::Client;
25 use Encode;
26 use POSIX qw/:sys_wait_h :errno_h/;
27 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
28 use Time::HiRes qw/usleep/;
29 use IO::Select;
30 use Socket;
31 our $chatty = 1; # disable for production
32
33 use constant STATUS_PIPE_DATA_SIZE => 12;
34 use constant WRITE_PIPE_DATA_SIZE  => 12;
35
36 sub new {
37     my($class, $service, %args) = @_;
38     my $self = bless(\%args, $class);
39
40     $self->{service}        = $service; # service name
41     $self->{num_children}   = 0; # number of child processes
42     $self->{osrf_handle}    = undef; # xmpp handle
43     $self->{routers}        = []; # list of registered routers
44     $self->{active_list}    = []; # list of active children
45     $self->{idle_list}      = []; # list of idle children
46     $self->{pid_map}        = {}; # map of child pid to child for cleaner access
47     $self->{sig_pipe}       = 0;  # true if last syswrite failed
48
49     $self->{stderr_log} = $self->{stderr_log_path} . "/${service}_stderr.log" 
50         if $self->{stderr_log_path};
51
52     $self->{min_spare_children} ||= 0;
53
54     $self->{max_spare_children} = $self->{min_spare_children} + 1 if
55         $self->{max_spare_children} and
56         $self->{max_spare_children} <= $self->{min_spare_children};
57
58     return $self;
59 }
60
61 # ----------------------------------------------------------------
62 # Disconnects from routers and waits for child processes to exit.
63 # ----------------------------------------------------------------
64 sub cleanup {
65     my $self = shift;
66     my $no_exit = shift;
67
68     $logger->info("server: shutting down and cleaning up...");
69
70     # don't get sidetracked by signals while we're cleaning up.
71     # it could result in unexpected behavior with list traversal
72     $SIG{CHLD} = 'IGNORE';
73
74     # terminate the child processes
75     $self->kill_child($_) for
76         (@{$self->{idle_list}}, @{$self->{active_list}});
77
78     # de-register routers
79     $self->unregister_routers;
80
81     $self->{osrf_handle}->disconnect;
82
83     # clean up our dead children
84     $self->reap_children(1);
85
86     exit(0) unless $no_exit;
87 }
88
89 # ----------------------------------------------------------------
90 # Waits on the jabber socket for inbound data from the router.
91 # Each new message is passed off to a child process for handling.
92 # At regular intervals, wake up for min/max spare child maintenance
93 # ----------------------------------------------------------------
94 sub run {
95     my $self = shift;
96
97         $logger->set_service($self->{service});
98
99     $SIG{$_} = sub { $self->cleanup; } for (qw/INT TERM QUIT/);
100     $SIG{CHLD} = sub { $self->reap_children(); };
101
102     $self->spawn_children;
103     $self->build_osrf_handle;
104     $self->register_routers;
105     my $wait_time = 1;
106
107     # main server loop
108     while(1) {
109
110         $self->check_status;
111         $self->{child_died} = 0;
112
113         my $msg = $self->{osrf_handle}->process($wait_time);
114
115         # we woke up for any reason, reset the wait time to allow
116         # for idle maintenance as necessary
117         $wait_time = 1;
118
119         if($msg) {
120
121             if(my $child = pop(@{$self->{idle_list}})) {
122
123                 # we have an idle child to handle the request
124                 $chatty and $logger->internal("server: passing request to idle child $child");
125                 push(@{$self->{active_list}}, $child);
126                 $self->write_child($child, $msg);
127
128             } elsif($self->{num_children} < $self->{max_children}) {
129
130                 # spawning a child to handle the request
131                 $chatty and $logger->internal("server: spawning child to handle request");
132                 $self->write_child($self->spawn_child(1), $msg);
133
134             } else {
135                 $logger->warn("server: no children available, waiting... consider increasing " .
136                     "max_children for this application higher than $self->{max_children} ".
137                     "in the OpenSRF configuration if this message occurs frequently");
138                 $self->check_status(1); # block until child is available
139
140                 my $child = pop(@{$self->{idle_list}});
141                 push(@{$self->{active_list}}, $child);
142                 $self->write_child($child, $msg);
143             }
144
145         } else {
146
147             # don't perform idle maint immediately when woken by SIGCHLD
148             unless($self->{child_died}) {
149
150                 # when we hit equilibrium, there's no need for regular
151                 # maintenance, so set wait_time to 'forever'
152                 $wait_time = -1 if 
153                     !$self->perform_idle_maintenance and # no maintenance performed this time
154                     @{$self->{active_list}} == 0; # no active children 
155             }
156         }
157     }
158 }
159
160 # ----------------------------------------------------------------
161 # Launch a new spare child or kill an extra spare child.  To
162 # prevent large-scale spawning or die-offs, spawn or kill only
163 # 1 process per idle maintenance loop.
164 # Returns true if any idle maintenance occurred, 0 otherwise
165 # ----------------------------------------------------------------
166 sub perform_idle_maintenance {
167     my $self = shift;
168
169     $chatty and $logger->internal(
170         sprintf(
171             "server: %d idle, %d active, %d min_spare, %d max_spare in idle maintenance",
172             scalar(@{$self->{idle_list}}), 
173             scalar(@{$self->{active_list}}),
174             $self->{min_spare_children},
175             $self->{max_spare_children}
176         )
177     );
178
179     # spawn 1 spare child per maintenance loop if necessary
180     if( $self->{min_spare_children} and
181         $self->{num_children} < $self->{max_children} and
182         scalar(@{$self->{idle_list}}) < $self->{min_spare_children} ) {
183
184         $chatty and $logger->internal("server: spawning spare child");
185         $self->spawn_child;
186         return 1;
187
188     # kill 1 excess spare child per maintenance loop if necessary
189     } elsif($self->{max_spare_children} and
190             $self->{num_children} > $self->{min_children} and
191             scalar(@{$self->{idle_list}}) > $self->{max_spare_children} ) {
192
193         $chatty and $logger->internal("server: killing spare child");
194         $self->kill_child;
195         return 1;
196     }
197
198     return 0;
199 }
200
201 sub kill_child {
202     my $self = shift;
203     my $child = shift || pop(@{$self->{idle_list}}) or return;
204     $chatty and $logger->internal("server: killing child $child");
205     kill('TERM', $child->{pid});
206 }
207
208 # ----------------------------------------------------------------
209 # Jabber connection inbound message arrive on.
210 # ----------------------------------------------------------------
211 sub build_osrf_handle {
212     my $self = shift;
213
214     my $conf = OpenSRF::Utils::Config->current;
215     my $username = $conf->bootstrap->username;
216     my $password = $conf->bootstrap->passwd;
217     my $domain = $conf->bootstrap->domain;
218     my $port = $conf->bootstrap->port;
219     my $resource = $self->{service} . '_listener_' . $conf->env->hostname;
220
221     $logger->debug("server: inbound connecting as $username\@$domain/$resource on port $port");
222
223     $self->{osrf_handle} =
224         OpenSRF::Transport::SlimJabber::Client->new(
225             username => $username,
226             resource => $resource,
227             password => $password,
228             host => $domain,
229             port => $port,
230         );
231
232     $self->{osrf_handle}->initialize;
233 }
234
235
236 # ----------------------------------------------------------------
237 # Sends request data to a child process
238 # ----------------------------------------------------------------
239 sub write_child {
240     my($self, $child, $msg) = @_;
241     my $xml = encode_utf8(decode_utf8($msg->to_xml));
242
243     # tell the child how much data to expect, minus the header
244     my $write_size;
245     {use bytes; $write_size = length($xml)}
246     $write_size = sprintf("%*s", WRITE_PIPE_DATA_SIZE, $write_size);
247
248     for (0..2) {
249
250         $self->{sig_pipe} = 0;
251         local $SIG{'PIPE'} = sub { $self->{sig_pipe} = 1; };
252
253         # send message to child data pipe
254         syswrite($child->{pipe_to_child}, $write_size . $xml);
255
256         last unless $self->{sig_pipe};
257         $logger->error("server: got SIGPIPE writing to $child, retrying...");
258         usleep(50000); # 50 msec
259     }
260
261     $logger->error("server: unable to send request message to child $child") if $self->{sig_pipe};
262 }
263
264 # ----------------------------------------------------------------
265 # Checks to see if any child process has reported its availability
266 # In blocking mode, blocks until a child has reported.
267 # ----------------------------------------------------------------
268 sub check_status {
269     my($self, $block) = @_;
270
271     return unless @{$self->{active_list}};
272
273     my @pids;
274
275     while (1) {
276
277         # if can_read or sysread is interrupted while bloking, go back and 
278         # wait again until we have at least 1 free child
279
280         # refresh the read_set handles in case we lost a child in the previous iteration
281         my $read_set = IO::Select->new;
282         $read_set->add($_->{pipe_to_child}) for @{$self->{active_list}};
283
284         if(my @handles = $read_set->can_read(($block) ? undef : 0)) {
285             my $pid = '';
286             for my $pipe (@handles) {
287                 sysread($pipe, $pid, STATUS_PIPE_DATA_SIZE) or next;
288                 push(@pids, int($pid));
289             }
290         }
291
292         last unless $block and !@pids;
293     }
294
295     return unless @pids;
296
297     $chatty and $logger->internal("server: ".scalar(@pids)." children reporting for duty: (@pids)");
298
299     my $child;
300     my @new_actives;
301
302     # move the children from the active list to the idle list
303     for my $proc (@{$self->{active_list}}) {
304         if(grep { $_ == $proc->{pid} } @pids) {
305             push(@{$self->{idle_list}}, $proc);
306         } else {
307             push(@new_actives, $proc);
308         }
309     }
310
311     $self->{active_list} = [@new_actives];
312
313     $chatty and $logger->internal(sprintf(
314         "server: %d idle and %d active children after status update",
315             scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
316 }
317
318 # ----------------------------------------------------------------
319 # Cleans up any child processes that have exited.
320 # In shutdown mode, block until all children have washed ashore
321 # ----------------------------------------------------------------
322 sub reap_children {
323     my($self, $shutdown) = @_;
324     $self->{child_died} = 1;
325
326     while(1) {
327
328         my $pid = waitpid(-1, ($shutdown) ? 0 : WNOHANG);
329         last if $pid <= 0;
330
331         $chatty and $logger->internal("server: reaping child $pid");
332
333         my $child = $self->{pid_map}->{$pid};
334
335         close($child->{pipe_to_parent});
336         close($child->{pipe_to_child});
337
338         $self->{active_list} = [ grep { $_->{pid} != $pid } @{$self->{active_list}} ];
339         $self->{idle_list} = [ grep { $_->{pid} != $pid } @{$self->{idle_list}} ];
340
341         $self->{num_children}--;
342         delete $self->{pid_map}->{$pid};
343         delete $child->{$_} for keys %$child; # destroy with a vengeance
344     }
345
346     $self->spawn_children unless $shutdown;
347
348     $chatty and $logger->internal(sprintf(
349         "server: %d idle and %d active children after reap_children",
350             scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
351
352 }
353
354 # ----------------------------------------------------------------
355 # Spawn up to max_children processes
356 # ----------------------------------------------------------------
357 sub spawn_children {
358     my $self = shift;
359     $self->spawn_child while $self->{num_children} < $self->{min_children};
360 }
361
362 # ----------------------------------------------------------------
363 # Spawns a new child.  If $active is set, the child goes directly
364 # into the active_list.
365 # ----------------------------------------------------------------
366 sub spawn_child {
367     my($self, $active) = @_;
368
369     my $child = OpenSRF::Server::Child->new($self);
370
371     # socket for sending message data to the child
372     if(!socketpair(
373         $child->{pipe_to_child},
374         $child->{pipe_to_parent},
375         AF_UNIX, SOCK_STREAM, PF_UNSPEC)) {
376             $logger->error("server: error creating data socketpair: $!");
377             return undef;
378     }
379
380     $child->{pipe_to_child}->autoflush(1);
381     $child->{pipe_to_parent}->autoflush(1);
382
383     $child->{pid} = fork();
384
385     if($child->{pid}) { # parent process
386         $self->{num_children}++;
387         $self->{pid_map}->{$child->{pid}} = $child;
388
389         if($active) {
390             push(@{$self->{active_list}}, $child);
391         } else {
392             push(@{$self->{idle_list}}, $child);
393         }
394
395         $chatty and $logger->internal("server: server spawned child $child with ".$self->{num_children}." total children");
396
397         return $child;
398
399     } else { # child process
400
401         $SIG{$_} = 'DEFAULT' for (qw/INT TERM QUIT HUP/);
402
403         if($self->{stderr_log}) {
404
405             $chatty and $logger->internal("server: redirecting STDERR to " . $self->{stderr_log});
406
407             close STDERR;
408             unless( open(STDERR, '>>' . $self->{stderr_log}) ) {
409                 $logger->error("server: unable to open STDERR log file: " . $self->{stderr_log} . " : $@");
410                 open STDERR, '>/dev/null'; # send it back to /dev/null
411             }
412         }
413
414         $child->{pid} = $$;
415         eval {
416             $child->init;
417             $child->run;
418             OpenSRF::Transport::PeerHandle->retrieve->disconnect;
419         };
420         $logger->error("server: child process died: $@") if $@;
421         exit(0);
422     }
423 }
424
425 # ----------------------------------------------------------------
426 # Sends the register command to the configured routers
427 # ----------------------------------------------------------------
428 sub register_routers {
429     my $self = shift;
430
431     my $conf = OpenSRF::Utils::Config->current;
432     my $routers = $conf->bootstrap->routers;
433     my $router_name = $conf->bootstrap->router_name;
434     my @targets;
435
436     for my $router (@$routers) {
437         if(ref $router) {
438
439             if( !$router->{services} ||
440                 !$router->{services}->{service} ||
441                 (
442                     ref($router->{services}->{service}) eq 'ARRAY' and
443                     grep { $_ eq $self->{service} } @{$router->{services}->{service}}
444                 )  || $router->{services}->{service} eq $self->{service}) {
445
446                 my $name = $router->{name};
447                 my $domain = $router->{domain};
448                 push(@targets, "$name\@$domain/router");
449             }
450
451         } else {
452             push(@targets, "$router_name\@$router/router");
453         }
454     }
455
456     foreach (@targets) {
457         $logger->info("server: registering with router $_");
458         $self->{osrf_handle}->send(
459             to => $_,
460             body => 'registering',
461             router_command => 'register',
462             router_class => $self->{service}
463         );
464     }
465
466     $self->{routers} = \@targets;
467 }
468
469 # ----------------------------------------------------------------
470 # Sends the unregister command to any routers we have registered
471 # with.
472 # ----------------------------------------------------------------
473 sub unregister_routers {
474     my $self = shift;
475     return unless $self->{osrf_handle}->tcp_connected;
476
477         for my $router (@{$self->{routers}}) {
478         $logger->info("server: disconnecting from router $router");
479         $self->{osrf_handle}->send(
480             to => $router,
481             body => "unregistering",
482             router_command => "unregister",
483             router_class => $self->{service}
484         );
485     }
486 }
487
488
489 package OpenSRF::Server::Child;
490 use strict;
491 use warnings;
492 use OpenSRF::Transport;
493 use OpenSRF::Application;
494 use OpenSRF::Transport::PeerHandle;
495 use OpenSRF::Transport::SlimJabber::XMPPMessage;
496 use OpenSRF::Utils::Logger qw($logger);
497 use OpenSRF::DomainObject::oilsResponse qw/:status/;
498 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
499 use Time::HiRes qw(time usleep);
500 use POSIX qw/:sys_wait_h :errno_h/;
501
502 use overload '""' => sub { return '[' . shift()->{pid} . ']'; };
503
504 sub new {
505     my($class, $parent) = @_;
506     my $self = bless({}, $class);
507     $self->{pid} = 0; # my process ID
508     $self->{parent} = $parent; # Controller parent process
509     $self->{num_requests} = 0; # total serviced requests
510     $self->{sig_pipe} = 0;  # true if last syswrite failed
511     return $self;
512 }
513
514 sub set_nonblock {
515     my($self, $fh) = @_;
516     my  $flags = fcntl($fh, F_GETFL, 0);
517     fcntl($fh, F_SETFL, $flags | O_NONBLOCK);
518 }
519
520 sub set_block {
521     my($self, $fh) = @_;
522     my  $flags = fcntl($fh, F_GETFL, 0);
523     $flags &= ~O_NONBLOCK;
524     fcntl($fh, F_SETFL, $flags);
525 }
526
527 # ----------------------------------------------------------------
528 # Connects to Jabber and runs the application child_init
529 # ----------------------------------------------------------------
530 sub init {
531     my $self = shift;
532     my $service = $self->{parent}->{service};
533     $0 = "OpenSRF Drone [$service]";
534     OpenSRF::Transport::PeerHandle->construct($service);
535         OpenSRF::Application->application_implementation->child_init
536                 if (OpenSRF::Application->application_implementation->can('child_init'));
537 }
538
539 # ----------------------------------------------------------------
540 # Waits for messages from the parent process, handles the message,
541 # then goes into the keepalive loop if this is a stateful session.
542 # When max_requests is hit, the process exits.
543 # ----------------------------------------------------------------
544 sub run {
545     my $self = shift;
546     my $network = OpenSRF::Transport::PeerHandle->retrieve;
547
548     # main child run loop.  Ends when this child hits max requests.
549     while(1) {
550
551         my $data = $self->wait_for_request or next;
552
553         # Update process name to show activity
554         my $orig_name = $0;
555         $0 = "$0*";
556
557         # Discard extraneous data from the jabber socket
558         if(!$network->flush_socket()) {
559             $logger->error("server: network disconnected!  child dropping request and exiting: $data");
560             exit;
561         }
562
563         my $session = OpenSRF::Transport->handler(
564             $self->{parent}->{service},
565             OpenSRF::Transport::SlimJabber::XMPPMessage->new(xml => $data)
566         );
567
568         $self->keepalive_loop($session);
569
570         last if ++$self->{num_requests} == $self->{parent}->{max_requests};
571
572         # Tell the parent process we are available to process requests
573         $self->send_status;
574
575         # Repair process name
576         $0 = $orig_name;
577     }
578
579     $chatty and $logger->internal("server: child process shutting down after reaching max_requests");
580
581         OpenSRF::Application->application_implementation->child_exit
582                 if (OpenSRF::Application->application_implementation->can('child_exit'));
583 }
584
585 # ----------------------------------------------------------------
586 # waits for a request data on the parent pipe and returns it.
587 # ----------------------------------------------------------------
588 sub wait_for_request {
589     my $self = shift;
590
591     my $data = ''; # final request data
592     my $buf_size = 4096; # default linux pipe_buf (atomic window, not total size)
593     my $read_pipe = $self->{pipe_to_parent};
594     my $bytes_needed; # size of the data we are about to receive
595     my $bytes_recvd; # number of bytes read so far
596     my $first_read = 1; # true for first loop iteration
597     my $read_error;
598
599     while (1) {
600
601         # wait for some data to start arriving
602         my $read_set = IO::Select->new;
603         $read_set->add($read_pipe);
604     
605         while (1) {
606             # if can_read is interrupted while blocking, 
607             # go back and wait again until it succeeds.
608             last if $read_set->can_read;
609         }
610
611         # parent started writing, let's start reading
612         $self->set_nonblock($read_pipe);
613
614         while (1) {
615             # read all of the available data
616
617             my $buf = '';
618             my $nbytes = sysread($self->{pipe_to_parent}, $buf, $buf_size);
619
620             unless(defined $nbytes) {
621                 if ($! != EAGAIN) {
622                     $logger->error("server: error reading data from parent: $!.  ".
623                         "bytes_needed=$bytes_needed; bytes_recvd=$bytes_recvd; data=$data");
624                     $read_error = 1;
625                 }
626                 last;
627             }
628
629             last if $nbytes <= 0; # no more data available for reading
630
631             $bytes_recvd += $nbytes;
632             $data .= $buf;
633         }
634
635         $self->set_block($self->{pipe_to_parent});
636         return undef if $read_error;
637
638         # extract the data size and remove the header from the final data
639         if ($first_read) {
640             my $wps_size = OpenSRF::Server::WRITE_PIPE_DATA_SIZE;
641             $bytes_needed = int(substr($data, 0, $wps_size)) + $wps_size;
642             $data = substr($data, $wps_size);
643             $first_read = 0;
644         }
645
646
647         if ($bytes_recvd == $bytes_needed) {
648             # we've read all the data. Nothing left to do
649             last;
650         }
651
652         $logger->info("server: child process read all available pipe data.  ".
653             "waiting for more data from parent.  bytes_needed=$bytes_needed; bytes_recvd=$bytes_recvd");
654     }
655
656     return $data;
657 }
658
659
660 # ----------------------------------------------------------------
661 # If this is a stateful opensrf session, wait up to $keepalive
662 # seconds for subsequent requests from the client
663 # ----------------------------------------------------------------
664 sub keepalive_loop {
665     my($self, $session) = @_;
666     my $keepalive = $self->{parent}->{keepalive};
667
668     while($session->state and $session->state == $session->CONNECTED) {
669
670         unless( $session->queue_wait($keepalive) ) {
671
672             # client failed to disconnect before timeout
673             $logger->info("server: no request was received in $keepalive seconds, exiting stateful session");
674
675             my $res = OpenSRF::DomainObject::oilsConnectStatus->new(
676                 status => "Disconnected on timeout",
677                 statusCode => STATUS_TIMEOUT
678             );
679
680             $session->status($res);
681             $session->state($session->DISCONNECTED);
682             last;
683         }
684     }
685
686     $chatty and $logger->internal("server: child done with request(s)");
687     $session->kill_me;
688 }
689
690 # ----------------------------------------------------------------
691 # Report our availability to our parent process
692 # ----------------------------------------------------------------
693 sub send_status {
694     my $self = shift;
695
696     for (0..2) {
697
698         $self->{sig_pipe} = 0;
699         local $SIG{'PIPE'} = sub { $self->{sig_pipe} = 1; };
700
701         syswrite(
702             $self->{pipe_to_parent},
703             sprintf("%*s", OpenSRF::Server::STATUS_PIPE_DATA_SIZE, $self->{pid})
704         );
705
706         last unless $self->{sig_pipe};
707         $logger->error("server: $self got SIGPIPE writing status to parent, retrying...");
708         usleep(50000); # 50 msec
709     }
710
711     $logger->error("server: $self unable to send status to parent") if $self->{sig_pipe};
712 }
713
714
715 1;