]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/perl/lib/OpenSRF/Server.pm
don't clean up child attributes until we're done with them; added pid map hash for...
[OpenSRF.git] / src / perl / lib / OpenSRF / Server.pm
1 # ----------------------------------------------------------------
2 # Copyright (C) 2010 Equinox Software, Inc.
3 # Bill Erickson <erickson@esilibrary.com>
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 # ----------------------------------------------------------------
15 package OpenSRF::Server;
16 use strict;
17 use warnings;
18 use OpenSRF::Transport;
19 use OpenSRF::Application;
20 use OpenSRF::Utils::Config;
21 use OpenSRF::Transport::PeerHandle;
22 use OpenSRF::Utils::SettingsClient;
23 use OpenSRF::Utils::Logger qw($logger);
24 use OpenSRF::Transport::SlimJabber::Client;
25 use POSIX qw/:sys_wait_h :errno_h/;
26 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
27 use IO::Select;
28 use Socket;
29 our $chatty = 1; # disable for production
30
31 use constant STATUS_PIPE_DATA_SIZE => 12;
32
33 sub new {
34     my($class, $service, %args) = @_;
35     my $self = bless(\%args, $class);
36
37     $self->{service}        = $service; # service name
38     $self->{num_children}   = 0; # number of child processes
39     $self->{osrf_handle}    = undef; # xmpp handle
40     $self->{routers}        = []; # list of registered routers
41     $self->{active_list}    = []; # list of active children
42     $self->{idle_list}      = []; # list of idle children
43     $self->{pid_map}        = {}; # map of child pid to child for cleaner access
44
45     $self->{stderr_log} = $self->{stderr_log_path} . "/${service}_stderr.log" 
46         if $self->{stderr_log_path};
47
48     $self->{min_spare_children} ||= 0;
49
50     $self->{max_spare_children} = $self->{min_spare_children} + 1 if
51         $self->{max_spare_children} and
52         $self->{max_spare_children} <= $self->{min_spare_children};
53
54     return $self;
55 }
56
57 # ----------------------------------------------------------------
58 # Disconnects from routers and waits for child processes to exit.
59 # ----------------------------------------------------------------
60 sub cleanup {
61     my $self = shift;
62     my $no_exit = shift;
63
64     $logger->info("server: shutting down and cleaning up...");
65
66     # don't get sidetracked by signals while we're cleaning up.
67     # it could result in unexpected behavior with list traversal
68     $SIG{CHLD} = 'IGNORE';
69
70     # terminate the child processes
71     $self->kill_child($_) for
72         (@{$self->{idle_list}}, @{$self->{active_list}});
73
74     # de-register routers
75     $self->unregister_routers;
76
77     $self->{osrf_handle}->disconnect;
78
79     # clean up our dead children
80     $self->reap_children(1);
81
82     exit(0) unless $no_exit;
83 }
84
85
86 # ----------------------------------------------------------------
87 # Waits on the jabber socket for inbound data from the router.
88 # Each new message is passed off to a child process for handling.
89 # At regular intervals, wake up for min/max spare child maintenance
90 # ----------------------------------------------------------------
91 sub run {
92     my $self = shift;
93
94         $logger->set_service($self->{service});
95
96     $SIG{$_} = sub { $self->cleanup; } for (qw/INT TERM QUIT/);
97     $SIG{CHLD} = sub { $self->reap_children(); };
98
99     $self->spawn_children;
100     $self->build_osrf_handle;
101     $self->register_routers;
102     my $wait_time = 1;
103
104     # main server loop
105     while(1) {
106
107         $self->check_status;
108         $self->{child_died} = 0;
109
110         my $msg = $self->{osrf_handle}->process($wait_time);
111
112         # we woke up for any reason, reset the wait time to allow
113         # for idle maintenance as necessary
114         $wait_time = 1;
115
116         if($msg) {
117
118             if(my $child = pop(@{$self->{idle_list}})) {
119
120                 # we have an idle child to handle the request
121                 $chatty and $logger->internal("server: passing request to idle child $child");
122                 push(@{$self->{active_list}}, $child);
123                 $self->write_child($child, $msg);
124
125             } elsif($self->{num_children} < $self->{max_children}) {
126
127                 # spawning a child to handle the request
128                 $chatty and $logger->internal("server: spawning child to handle request");
129                 $self->write_child($self->spawn_child(1), $msg);
130
131             } else {
132
133                 $logger->warn("server: no children available, waiting...");
134                 $self->check_status(1); # block until child is available
135
136                 my $child = pop(@{$self->{idle_list}});
137                 push(@{$self->{active_list}}, $child);
138                 $self->write_child($child, $msg);
139             }
140
141         } else {
142
143             # don't perform idle maint immediately when woken by SIGCHLD
144             unless($self->{child_died}) {
145
146                 # when we hit equilibrium, there's no need for regular
147                 # maintenance, so set wait_time to 'forever'
148                 $wait_time = -1 if 
149                     !$self->perform_idle_maintenance and # no maintenance performed this time
150                     @{$self->{active_list}} == 0; # no active children 
151             }
152         }
153     }
154 }
155
156 # ----------------------------------------------------------------
157 # Launch a new spare child or kill an extra spare child.  To
158 # prevent large-scale spawning or die-offs, spawn or kill only
159 # 1 process per idle maintenance loop.
160 # Returns true if any idle maintenance occurred, 0 otherwise
161 # ----------------------------------------------------------------
162 sub perform_idle_maintenance {
163     my $self = shift;
164
165     $chatty and $logger->internal(
166         sprintf(
167             "server: %d idle, %d active, %d min_spare, %d max_spare in idle maintenance",
168             scalar(@{$self->{idle_list}}), 
169             scalar(@{$self->{active_list}}),
170             $self->{min_spare_children},
171             $self->{max_spare_children}
172         )
173     );
174
175     # spawn 1 spare child per maintenance loop if necessary
176     if( $self->{min_spare_children} and
177         $self->{num_children} < $self->{max_children} and
178         scalar(@{$self->{idle_list}}) < $self->{min_spare_children} ) {
179
180         $chatty and $logger->internal("server: spawning spare child");
181         $self->spawn_child;
182         return 1;
183
184     # kill 1 excess spare child per maintenance loop if necessary
185     } elsif($self->{max_spare_children} and
186             $self->{num_children} > $self->{min_children} and
187             scalar(@{$self->{idle_list}}) > $self->{max_spare_children} ) {
188
189         $chatty and $logger->internal("server: killing spare child");
190         $self->kill_child;
191         return 1;
192     }
193
194     return 0;
195 }
196
197 sub kill_child {
198     my $self = shift;
199     my $child = shift || pop(@{$self->{idle_list}}) or return;
200     $chatty and $logger->internal("server: killing child $child");
201     kill('TERM', $child->{pid});
202 }
203
204 # ----------------------------------------------------------------
205 # Jabber connection inbound message arrive on.
206 # ----------------------------------------------------------------
207 sub build_osrf_handle {
208     my $self = shift;
209
210     my $conf = OpenSRF::Utils::Config->current;
211     my $username = $conf->bootstrap->username;
212     my $password = $conf->bootstrap->passwd;
213     my $domain = $conf->bootstrap->domain;
214     my $port = $conf->bootstrap->port;
215     my $resource = $self->{service} . '_listener_' . $conf->env->hostname;
216
217     $logger->debug("server: inbound connecting as $username\@$domain/$resource on port $port");
218
219     $self->{osrf_handle} =
220         OpenSRF::Transport::SlimJabber::Client->new(
221             username => $username,
222             resource => $resource,
223             password => $password,
224             host => $domain,
225             port => $port,
226         );
227
228     $self->{osrf_handle}->initialize;
229 }
230
231
232 # ----------------------------------------------------------------
233 # Sends request data to a child process
234 # ----------------------------------------------------------------
235 sub write_child {
236     my($self, $child, $msg) = @_;
237     my $xml = $msg->to_xml;
238     syswrite($child->{pipe_to_child}, $xml);
239 }
240
241 # ----------------------------------------------------------------
242 # Checks to see if any child process has reported its availability
243 # In blocking mode, blocks until a child has reported.
244 # ----------------------------------------------------------------
245 sub check_status {
246     my($self, $block) = @_;
247
248     return unless @{$self->{active_list}};
249
250     my $read_set = IO::Select->new;
251     $read_set->add($_->{pipe_to_child}) for @{$self->{active_list}};
252
253     my @pids;
254
255     while (1) {
256
257         # if can_read or sysread is interrupted while bloking, go back and 
258         # wait again until we have at least 1 free child
259
260         if(my @handles = $read_set->can_read(($block) ? undef : 0)) {
261             my $pid = '';
262             for my $pipe (@handles) {
263                 sysread($pipe, $pid, STATUS_PIPE_DATA_SIZE) or next;
264                 push(@pids, int($pid));
265             }
266         }
267
268         last unless $block and !@pids;
269     }
270
271     return unless @pids;
272
273     $chatty and $logger->internal("server: ".scalar(@pids)." children reporting for duty: (@pids)");
274
275     my $child;
276     my @new_actives;
277
278     # move the children from the active list to the idle list
279     for my $proc (@{$self->{active_list}}) {
280         if(grep { $_ == $proc->{pid} } @pids) {
281             push(@{$self->{idle_list}}, $proc);
282         } else {
283             push(@new_actives, $proc);
284         }
285     }
286
287     $self->{active_list} = [@new_actives];
288
289     $chatty and $logger->internal(sprintf(
290         "server: %d idle and %d active children after status update",
291             scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
292 }
293
294 # ----------------------------------------------------------------
295 # Cleans up any child processes that have exited.
296 # In shutdown mode, block until all children have washed ashore
297 # ----------------------------------------------------------------
298 sub reap_children {
299     my($self, $shutdown) = @_;
300     $self->{child_died} = 1;
301
302     while(1) {
303
304         my $pid = waitpid(-1, ($shutdown) ? 0 : WNOHANG);
305         last if $pid <= 0;
306
307         $chatty and $logger->internal("server: reaping child $pid");
308
309         my $child = $self->{pid_map}->{$pid};
310
311         close($child->{pipe_to_parent});
312         close($child->{pipe_to_child});
313
314         $self->{active_list} = [ grep { $_->{pid} != $pid } @{$self->{active_list}} ];
315         $self->{idle_list} = [ grep { $_->{pid} != $pid } @{$self->{idle_list}} ];
316
317         $self->{num_children}--;
318         delete $self->{pid_map}->{$pid};
319         delete $child->{$_} for keys %$child; # destroy with a vengeance
320     }
321
322     $self->spawn_children unless $shutdown;
323
324     $chatty and $logger->internal(sprintf(
325         "server: %d idle and %d active children after reap_children",
326             scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
327
328 }
329
330 # ----------------------------------------------------------------
331 # Spawn up to max_children processes
332 # ----------------------------------------------------------------
333 sub spawn_children {
334     my $self = shift;
335     $self->spawn_child while $self->{num_children} < $self->{min_children};
336 }
337
338 # ----------------------------------------------------------------
339 # Spawns a new child.  If $active is set, the child goes directly
340 # into the active_list.
341 # ----------------------------------------------------------------
342 sub spawn_child {
343     my($self, $active) = @_;
344
345     my $child = OpenSRF::Server::Child->new($self);
346
347     # socket for sending message data to the child
348     if(!socketpair(
349         $child->{pipe_to_child},
350         $child->{pipe_to_parent},
351         AF_UNIX, SOCK_STREAM, PF_UNSPEC)) {
352             $logger->error("server: error creating data socketpair: $!");
353             return undef;
354     }
355
356     $child->{pipe_to_child}->autoflush(1);
357     $child->{pipe_to_parent}->autoflush(1);
358
359     $child->{pid} = fork();
360
361     if($child->{pid}) { # parent process
362         $self->{num_children}++;
363         $self->{pid_map}->{$child->{pid}} = $child;
364
365         if($active) {
366             push(@{$self->{active_list}}, $child);
367         } else {
368             push(@{$self->{idle_list}}, $child);
369         }
370
371         $chatty and $logger->internal("server: server spawned child $child with ".$self->{num_children}." total children");
372
373         return $child;
374
375     } else { # child process
376
377         $SIG{$_} = 'DEFAULT' for (qw/INT TERM QUIT HUP/);
378
379         if($self->{stderr_log}) {
380
381             $chatty and $logger->internal("server: redirecting STDERR to " . $self->{stderr_log});
382
383             close STDERR;
384             unless( open(STDERR, '>>' . $self->{stderr_log}) ) {
385                 $logger->error("server: unable to open STDERR log file: " . $self->{stderr_log} . " : $@");
386                 open STDERR, '>/dev/null'; # send it back to /dev/null
387             }
388         }
389
390         $child->{pid} = $$;
391         eval {
392             $child->init;
393             $child->run;
394             OpenSRF::Transport::PeerHandle->retrieve->disconnect;
395         };
396         $logger->error("server: child process died: $@") if $@;
397         exit(0);
398     }
399 }
400
401 # ----------------------------------------------------------------
402 # Sends the register command to the configured routers
403 # ----------------------------------------------------------------
404 sub register_routers {
405     my $self = shift;
406
407     my $conf = OpenSRF::Utils::Config->current;
408     my $routers = $conf->bootstrap->routers;
409     my $router_name = $conf->bootstrap->router_name;
410     my @targets;
411
412     for my $router (@$routers) {
413         if(ref $router) {
414
415             if( !$router->{services} ||
416                 !$router->{services}->{service} ||
417                 (
418                     ref($router->{services}->{service}) eq 'ARRAY' and
419                     grep { $_ eq $self->{service} } @{$router->{services}->{service}}
420                 )  || $router->{services}->{service} eq $self->{service}) {
421
422                 my $name = $router->{name};
423                 my $domain = $router->{domain};
424                 push(@targets, "$name\@$domain/router");
425             }
426
427         } else {
428             push(@targets, "$router_name\@$router/router");
429         }
430     }
431
432     foreach (@targets) {
433         $logger->info("server: registering with router $_");
434         $self->{osrf_handle}->send(
435             to => $_,
436             body => 'registering',
437             router_command => 'register',
438             router_class => $self->{service}
439         );
440     }
441
442     $self->{routers} = \@targets;
443 }
444
445 # ----------------------------------------------------------------
446 # Sends the unregister command to any routers we have registered
447 # with.
448 # ----------------------------------------------------------------
449 sub unregister_routers {
450     my $self = shift;
451     return unless $self->{osrf_handle}->tcp_connected;
452
453         for my $router (@{$self->{routers}}) {
454         $logger->info("server: disconnecting from router $router");
455         $self->{osrf_handle}->send(
456             to => $router,
457             body => "unregistering",
458             router_command => "unregister",
459             router_class => $self->{service}
460         );
461     }
462 }
463
464
465 package OpenSRF::Server::Child;
466 use strict;
467 use warnings;
468 use OpenSRF::Transport;
469 use OpenSRF::Application;
470 use OpenSRF::Transport::PeerHandle;
471 use OpenSRF::Transport::SlimJabber::XMPPMessage;
472 use OpenSRF::Utils::Logger qw($logger);
473 use OpenSRF::DomainObject::oilsResponse qw/:status/;
474 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
475 use Time::HiRes qw(time);
476 use POSIX qw/:sys_wait_h :errno_h/;
477
478 use overload '""' => sub { return '[' . shift()->{pid} . ']'; };
479
480 sub new {
481     my($class, $parent) = @_;
482     my $self = bless({}, $class);
483     $self->{pid} = 0; # my process ID
484     $self->{parent} = $parent; # Controller parent process
485     $self->{num_requests} = 0; # total serviced requests
486     return $self;
487 }
488
489 sub set_nonblock {
490     my($self, $fh) = @_;
491     my  $flags = fcntl($fh, F_GETFL, 0);
492     fcntl($fh, F_SETFL, $flags | O_NONBLOCK);
493 }
494
495 sub set_block {
496     my($self, $fh) = @_;
497     my  $flags = fcntl($fh, F_GETFL, 0);
498     $flags &= ~O_NONBLOCK;
499     fcntl($fh, F_SETFL, $flags);
500 }
501
502 # ----------------------------------------------------------------
503 # Connects to Jabber and runs the application child_init
504 # ----------------------------------------------------------------
505 sub init {
506     my $self = shift;
507     my $service = $self->{parent}->{service};
508     $0 = "OpenSRF Drone [$service]";
509     OpenSRF::Transport::PeerHandle->construct($service);
510         OpenSRF::Application->application_implementation->child_init
511                 if (OpenSRF::Application->application_implementation->can('child_init'));
512 }
513
514 # ----------------------------------------------------------------
515 # Waits for messages from the parent process, handles the message,
516 # then goes into the keepalive loop if this is a stateful session.
517 # When max_requests is hit, the process exits.
518 # ----------------------------------------------------------------
519 sub run {
520     my $self = shift;
521     my $network = OpenSRF::Transport::PeerHandle->retrieve;
522
523     # main child run loop.  Ends when this child hits max requests.
524     while(1) {
525
526         my $data = $self->wait_for_request or next;
527
528         # Update process name to show activity
529         my $orig_name = $0;
530         $0 = "$0*";
531
532         # Discard extraneous data from the jabber socket
533         if(!$network->flush_socket()) {
534             $logger->error("server: network disconnected!  child dropping request and exiting: $data");
535             exit;
536         }
537
538         my $session = OpenSRF::Transport->handler(
539             $self->{parent}->{service},
540             OpenSRF::Transport::SlimJabber::XMPPMessage->new(xml => $data)
541         );
542
543         $self->keepalive_loop($session);
544
545         last if ++$self->{num_requests} == $self->{parent}->{max_requests};
546
547         # Tell the parent process we are available to process requests
548         $self->send_status;
549
550         # Repair process name
551         $0 = $orig_name;
552     }
553
554     $chatty and $logger->internal("server: child process shutting down after reaching max_requests");
555
556         OpenSRF::Application->application_implementation->child_exit
557                 if (OpenSRF::Application->application_implementation->can('child_exit'));
558 }
559
560 # ----------------------------------------------------------------
561 # waits for a request data on the parent pipe and returns it.
562 # ----------------------------------------------------------------
563 sub wait_for_request {
564     my $self = shift;
565
566     my $data = '';
567     my $read_size = 1024;
568     my $nonblock = 0;
569
570     while(1) {
571         # Start out blocking, when data is available, read it all
572
573         my $buf = '';
574         my $n = sysread($self->{pipe_to_parent}, $buf, $read_size);
575
576         unless(defined $n) {
577             $logger->error("server: error reading data pipe: $!") unless EAGAIN == $!; 
578             last;
579         }
580
581         last if $n <= 0; # no data left to read
582
583         $data .= $buf;
584
585         last if $n < $read_size; # done reading all data
586
587         $self->set_nonblock($self->{pipe_to_parent}) unless $nonblock;
588         $nonblock = 1;
589     }
590
591     $self->set_block($self->{pipe_to_parent}) if $nonblock;
592     return $data;
593 }
594
595
596 # ----------------------------------------------------------------
597 # If this is a stateful opensrf session, wait up to $keepalive
598 # seconds for subsequent requests from the client
599 # ----------------------------------------------------------------
600 sub keepalive_loop {
601     my($self, $session) = @_;
602     my $keepalive = $self->{parent}->{keepalive};
603
604     while($session->state and $session->state == $session->CONNECTED) {
605
606         unless( $session->queue_wait($keepalive) ) {
607
608             # client failed to disconnect before timeout
609             $logger->info("server: no request was received in $keepalive seconds, exiting stateful session");
610
611             my $res = OpenSRF::DomainObject::oilsConnectStatus->new(
612                 status => "Disconnected on timeout",
613                 statusCode => STATUS_TIMEOUT
614             );
615
616             $session->status($res);
617             $session->state($session->DISCONNECTED);
618             last;
619         }
620     }
621
622     $chatty and $logger->internal("server: child done with request(s)");
623     $session->kill_me;
624 }
625
626 # ----------------------------------------------------------------
627 # Report our availability to our parent process
628 # ----------------------------------------------------------------
629 sub send_status {
630     my $self = shift;
631     syswrite(
632         $self->{pipe_to_parent},
633         sprintf("%*s", OpenSRF::Server::STATUS_PIPE_DATA_SIZE, $self->{pid})
634     );
635 }
636
637
638 1;