1 # ----------------------------------------------------------------
2 # Copyright (C) 2010 Equinox Software, Inc.
3 # Bill Erickson <erickson@esilibrary.com>
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 # ----------------------------------------------------------------
15 package OpenSRF::Server;
18 use OpenSRF::Transport;
19 use OpenSRF::Application;
20 use OpenSRF::Utils::Config;
21 use OpenSRF::Transport::PeerHandle;
22 use OpenSRF::Utils::SettingsClient;
23 use OpenSRF::Utils::Logger qw($logger);
24 use OpenSRF::Transport::SlimJabber::Client;
25 use POSIX qw/:sys_wait_h :errno_h/;
26 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
# Verbose-logging switch: the "$chatty and $logger->internal(...)" calls
# in this file only emit their message when this is true.
29 our $chatty = 1; # disable for production
# Number of bytes the parent reads from a child's status socket in one
# sysread; the child pads its pid to this width with sprintf("%*s", ...).
31 use constant STATUS_PIPE_DATA_SIZE => 12;
# Constructor body: blesses the caller-supplied %args hash into $class and
# then initialises the bookkeeping fields below. Any same-named keys passed
# in %args are overwritten by these assignments.
# NOTE(review): the enclosing sub header is not visible in this listing;
# presumably this is OpenSRF::Server->new($service, %args) -- confirm.
34 my($class, $service, %args) = @_;
35 my $self = bless(\%args, $class);
37 $self->{service} = $service; # service name
38 $self->{num_children} = 0; # number of child processes
39 $self->{osrf_handle} = undef; # xmpp handle
40 $self->{routers} = []; # list of registered routers
41 $self->{active_list} = []; # list of active children
42 $self->{idle_list} = []; # list of idle children
# Build the per-service STDERR log file name only when a log directory
# (stderr_log_path) was supplied in %args.
44 $self->{stderr_log} = $self->{stderr_log_path} . "/${service}_stderr.log"
45 if $self->{stderr_log_path};
# Default to 0 when min_spare_children is unset (or otherwise false).
47 $self->{min_spare_children} ||= 0;
# A configured max_spare_children that is not above min_spare_children is
# raised to min + 1, so the spawn and kill thresholds used by idle
# maintenance can never both apply to the same idle count.
49 $self->{max_spare_children} = $self->{min_spare_children} + 1 if
50 $self->{max_spare_children} and
51 $self->{max_spare_children} <= $self->{min_spare_children};
56 # ----------------------------------------------------------------
57 # Disconnects from routers and waits for child processes to exit.
58 # ----------------------------------------------------------------
# Shutdown sequence: signal every child, unregister from the routers,
# drop the XMPP connection, wait for the children, then exit.
# NOTE(review): $no_exit is declared on a line not visible in this listing;
# presumably it is the optional second argument of cleanup() -- confirm.
63 $logger->info("server: shutting down and cleaning up...");
65 # don't get sidetracked by signals while we're cleaning up.
66 # it could result in unexpected behavior with list traversal
67 $SIG{CHLD} = 'IGNORE';
69 # terminate the child processes
# kill_child sends SIGTERM to each child's pid; it does not wait for exit.
70 $self->kill_child($_) for
71 (@{$self->{idle_list}}, @{$self->{active_list}});
# Tell the routers we are going away while the connection is still open.
74 $self->unregister_routers;
76 $self->{osrf_handle}->disconnect;
78 # clean up our dead children
# The true argument selects shutdown mode: reap_children then calls waitpid
# without WNOHANG and does not respawn children.
79 $self->reap_children(1);
# Terminate the process unless the caller asked to keep running.
81 exit(0) unless $no_exit;
85 # ----------------------------------------------------------------
86 # Waits on the jabber socket for inbound data from the router.
87 # Each new message is passed off to a child process for handling.
88 # At regular intervals, wake up for min/max spare child maintenance
89 # ----------------------------------------------------------------
# Main listener body: install signal handlers, start the initial children,
# connect to the network, register with the routers, then dispatch each
# inbound message to a child process.
# NOTE(review): the sub header, the loop construct and the declaration of
# $wait_time are on lines not visible in this listing.
93 $logger->set_service($self->{service});
# INT/TERM/QUIT trigger a full cleanup; SIGCHLD reaps exited children
# (non-blocking, since reap_children is called without the shutdown flag).
95 $SIG{$_} = sub { $self->cleanup; } for (qw/INT TERM QUIT/);
96 $SIG{CHLD} = sub { $self->reap_children(); };
98 $self->spawn_children;
99 $self->build_osrf_handle;
100 $self->register_routers;
# Reset the flag that reap_children sets, so the check further down can tell
# whether this wake-up was caused by a child exiting.
107 $self->{child_died} = 0;
# Wait on the network handle for up to $wait_time for an inbound message.
109 my $msg = $self->{osrf_handle}->process($wait_time);
111 # we woke up for any reason, reset the wait time to allow
112 # for idle maintenance as necessary
# Dispatch order: reuse an idle child, else spawn a new one while under
# max_children, else block until a busy child reports back.
117 if(my $child = pop(@{$self->{idle_list}})) {
119 # we have an idle child to handle the request
120 $chatty and $logger->internal("server: passing request to idle child $child");
121 push(@{$self->{active_list}}, $child);
122 $self->write_child($child, $msg);
124 } elsif($self->{num_children} < $self->{max_children}) {
126 # spawning a child to handle the request
127 $chatty and $logger->internal("server: spawning child to handle request");
# spawn_child(1) places the new child directly on the active list.
# NOTE(review): the result is passed to write_child unchecked; verify what
# spawn_child returns when socketpair fails.
128 $self->write_child($self->spawn_child(1), $msg);
132 $logger->warn("server: no children available, waiting...");
133 $self->check_status(1); # block until child is available
# check_status moved the reporting children to the idle list; take one.
135 my $child = pop(@{$self->{idle_list}});
136 push(@{$self->{active_list}}, $child);
137 $self->write_child($child, $msg);
142 # don't perform idle maint immediately when woken by SIGCHLD
143 unless($self->{child_died}) {
145 # when we hit equilibrium, there's no need for regular
146 # maintenance, so set wait_time to 'forever'
147 $wait_time = -1 unless $self->perform_idle_maintenance;
153 # ----------------------------------------------------------------
154 # Launch a new spare child or kill an extra spare child. To
155 # prevent large-scale spawning or die-offs, spawn or kill only
156 # 1 process per idle maintenance loop.
157 # Returns true if any idle maintenance occurred, 0 otherwise
158 # ----------------------------------------------------------------
159 sub perform_idle_maintenance {
# NOTE(review): the "my $self" line, the spawn/kill calls and the return
# statements of this sub are on lines not visible in this listing.
162 # spawn 1 spare child per maintenance loop if necessary
# Spawn branch: spare-child management is enabled (min_spare_children is
# non-zero), the max_children cap has not been reached, and fewer than
# min_spare_children children are currently idle.
163 if( $self->{min_spare_children} and
164 $self->{num_children} < $self->{max_children} and
165 scalar(@{$self->{idle_list}}) < $self->{min_spare_children} ) {
167 $chatty and $logger->internal("server: spawning spare child");
171 # kill 1 excess spare child per maintenance loop if necessary
# Kill branch: a max_spare_children limit is set, the total stays above
# min_children, and more than max_spare_children children are idle.
172 } elsif($self->{max_spare_children} and
173 $self->{num_children} > $self->{min_children} and
174 scalar(@{$self->{idle_list}}) > $self->{max_spare_children} ) {
176 $chatty and $logger->internal("server: killing spare child");
# Picks the child to terminate: the next argument when one was passed and is
# true, otherwise a child popped off the idle list. Returns immediately when
# neither yields a child.
# NOTE(review): the sub header is not visible here; presumably $self has
# already been shifted off @_ before this line -- confirm.
186 my $child = shift || pop(@{$self->{idle_list}}) or return;
187 $chatty and $logger->internal("server: killing child $child");
# Only signals the process; nothing in the visible lines waits for it.
188 kill('TERM', $child->{pid});
191 # ----------------------------------------------------------------
192 # Builds the Jabber connection that inbound messages arrive on.
193 # ----------------------------------------------------------------
194 sub build_osrf_handle {
# Connection settings come from the bootstrap section of the current config.
197 my $conf = OpenSRF::Utils::Config->current;
198 my $username = $conf->bootstrap->username;
199 my $password = $conf->bootstrap->passwd;
200 my $domain = $conf->bootstrap->domain;
201 my $port = $conf->bootstrap->port;
# The XMPP resource is "<service>_listener_<hostname>".
202 my $resource = $self->{service} . '_listener_' . $conf->env->hostname;
204 $logger->debug("server: inbound connecting as $username\@$domain/$resource on port $port");
# Create the client and keep it on $self for process/send/disconnect calls.
# NOTE(review): the remaining constructor arguments are on lines not visible
# in this listing.
206 $self->{osrf_handle} =
207 OpenSRF::Transport::SlimJabber::Client->new(
208 username => $username,
209 resource => $resource,
210 password => $password,
215 $self->{osrf_handle}->initialize;
219 # ----------------------------------------------------------------
220 # Sends request data to a child process
221 # ----------------------------------------------------------------
# Serialises $msg to XML and writes it to the parent's end of the child's
# socket pair in a single syswrite call.
# NOTE(review): the syswrite return value is not checked, so a short write
# or a write error would go unnoticed -- confirm whether message sizes can
# exceed what one syswrite delivers on this socket.
223 my($self, $child, $msg) = @_;
224 my $xml = $msg->to_xml;
225 syswrite($child->{pipe_to_child}, $xml);
228 # ----------------------------------------------------------------
229 # Checks to see if any child process has reported its availability
230 # In blocking mode, blocks until a child has reported.
231 # ----------------------------------------------------------------
# Reads status reports from active children and moves every child that
# reported from the active list to the idle list.
#   $block - true: wait until at least one child reports; false: just poll.
# NOTE(review): $pid, @pids and @new_actives are declared on lines not
# visible in this listing.
233 my($self, $block) = @_;
# Watch the parent's end of each active child's socket pair.
235 my $read_set = IO::Select->new;
236 $read_set->add($_->{pipe_to_child}) for @{$self->{active_list}};
# Timeout undef blocks, 0 polls. The "or return" applies to the number of
# handles assigned, so the sub returns when none are readable.
238 my @handles = $read_set->can_read(($block) ? undef : 0) or return;
# Each report is the child's pid, read as one fixed-width record.
242 for my $pipe (@handles) {
243 sysread($pipe, $pid, STATUS_PIPE_DATA_SIZE) or next;
244 push(@pids, int($pid));
247 $chatty and $logger->internal("server: ".scalar(@pids)." children reporting for duty: (@pids)");
252 # move the children from the active list to the idle list
253 for my $proc (@{$self->{active_list}}) {
254 if(grep { $_ == $proc->{pid} } @pids) {
255 push(@{$self->{idle_list}}, $proc);
257 push(@new_actives, $proc);
# Replace the active list with the children that did not report.
261 $self->{active_list} = [@new_actives];
263 $chatty and $logger->internal(sprintf(
264 "server: %d idle and %d active children after status update",
265 scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
268 # ----------------------------------------------------------------
269 # Cleans up any child processes that have exited.
270 # In shutdown mode, block until all children have washed ashore
271 # ----------------------------------------------------------------
# Reaps exited child processes and removes them from the active/idle lists.
#   $shutdown - true: waitpid blocks and no replacement children are
#               spawned; false: waitpid polls with WNOHANG and the pool is
#               topped up again afterwards.
# NOTE(review): the sub header and any loop / "no more children" guard
# around waitpid are on lines not visible in this listing.
273 my($self, $shutdown) = @_;
# Tells the main loop that this wake-up was caused by a child exiting, so it
# skips idle maintenance for this iteration.
274 $self->{child_died} = 1;
278 my $pid = waitpid(-1, ($shutdown) ? 0 : WNOHANG);
281 $chatty and $logger->internal("server: reaping child $pid");
# Find the bookkeeping object for the reaped pid in either list.
283 my ($child) = grep {$_->{pid} == $pid} (@{$self->{active_list}}, @{$self->{idle_list}});
# Release the parent's copies of both ends of the child's socket pair.
285 close($child->{pipe_to_parent});
286 close($child->{pipe_to_child});
289 $self->{num_children}--;
# Filter the lists BEFORE wiping the child object. The filters compare
# $_->{pid}; wiping first leaves that pid undefined, so "undef != $pid" is
# true and the dead child would stay in the lists.
290 $self->{active_list} = [ grep { $_->{pid} != $pid } @{$self->{active_list}} ];
291 $self->{idle_list} = [ grep { $_->{pid} != $pid } @{$self->{idle_list}} ];
287 delete $child->{$_} for keys %$child; # destroy with a vengeance
# Outside shutdown, bring the pool back up to its minimum size.
294 $self->spawn_children unless $shutdown;
296 $chatty and $logger->internal(sprintf(
297 "server: %d idle and %d active children after reap_children",
298 scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
302 # ----------------------------------------------------------------
303 # Spawn children until min_children processes are running
304 # ----------------------------------------------------------------
# Keep spawning idle children until the pool holds min_children processes.
# NOTE(review): num_children is only incremented in spawn_child's parent
# branch after fork; verify this loop cannot spin if spawn_child fails
# before reaching that point (e.g. on a socketpair error).
307 $self->spawn_child while $self->{num_children} < $self->{min_children};
310 # ----------------------------------------------------------------
311 # Spawns a new child. If $active is set, the child goes directly
312 # into the active_list.
313 # ----------------------------------------------------------------
# Creates a child object with a socket pair to the parent, then forks.
#   $active - true: the parent files the new child on the active list;
#             false: on the idle list.
# In the parent the child is counted and listed. In the forked process the
# signal handlers are reset and STDERR is optionally redirected before the
# child runs.
# NOTE(review): several lines of this sub (the socketpair call head, the
# $active test, the return values, the child's init/run calls) are not
# visible in this listing.
315 my($self, $active) = @_;
317 my $child = OpenSRF::Server::Child->new($self);
319 # socket for sending message data to the child
321 $child->{pipe_to_child},
322 $child->{pipe_to_parent},
323 AF_UNIX, SOCK_STREAM, PF_UNSPEC)) {
324 $logger->error("server: error creating data socketpair: $!");
# Unbuffered handles on both ends, so messages are not held back in stdio.
328 $child->{pipe_to_child}->autoflush(1);
329 $child->{pipe_to_parent}->autoflush(1);
# NOTE(review): fork() returns undef on failure, which is false and would
# fall into the "child process" branch below -- verify failure handling.
331 $child->{pid} = fork();
333 if($child->{pid}) { # parent process
334 $self->{num_children}++;
338 push(@{$self->{active_list}}, $child);
340 push(@{$self->{idle_list}}, $child);
343 $chatty and $logger->internal("server: server spawned child $child with ".$self->{num_children}." total children");
347 } else { # child process
# The parent's cleanup handlers must not run inside the child.
349 $SIG{$_} = 'DEFAULT' for (qw/INT TERM QUIT HUP/);
351 if($self->{stderr_log}) {
353 $chatty and $logger->internal("server: redirecting STDERR to " . $self->{stderr_log});
# Three-argument open keeps the mode separate from the file name, and a
# failed open reports its reason in $! (the OS error), not $@ (eval error).
356 unless( open(STDERR, '>>', $self->{stderr_log}) ) {
357 $logger->error("server: unable to open STDERR log file: " . $self->{stderr_log} . " : $!");
358 open(STDERR, '>', '/dev/null'); # send it back to /dev/null
# Drop the child's network connection once it is done.
366 OpenSRF::Transport::PeerHandle->retrieve->disconnect;
# Here $@ is correct: it reports an exception from the child's run.
368 $logger->error("server: child process died: $@") if $@;
373 # ----------------------------------------------------------------
374 # Sends the register command to the configured routers
375 # ----------------------------------------------------------------
376 sub register_routers {
# Builds the list of router addresses for this service from the bootstrap
# config, sends each one a "register" command, and remembers the list in
# $self->{routers} for later unregistration.
# NOTE(review): @targets, the test that separates the two router entry
# forms, and the head of the send() argument list are on lines not visible
# in this listing.
379 my $conf = OpenSRF::Utils::Config->current;
380 my $routers = $conf->bootstrap->routers;
381 my $router_name = $conf->bootstrap->router_name;
384 for my $router (@$routers) {
# Hash-style entry: register when the router has no services restriction,
# or when its service entry (a list or a single name) includes this service.
387 if( !$router->{services} ||
388 !$router->{services}->{service} ||
390 ref($router->{services}->{service}) eq 'ARRAY' and
391 grep { $_ eq $self->{service} } @{$router->{services}->{service}}
392 ) || $router->{services}->{service} eq $self->{service}) {
394 my $name = $router->{name};
395 my $domain = $router->{domain};
396 push(@targets, "$name\@$domain/router");
# Plain-string entry: $router is used as the domain and is combined with
# the globally configured router_name.
400 push(@targets, "$router_name\@$router/router");
405 $logger->info("server: registering with router $_");
406 $self->{osrf_handle}->send(
408 body => 'registering',
409 router_command => 'register',
410 router_class => $self->{service}
414 $self->{routers} = \@targets;
417 # ----------------------------------------------------------------
418 # Sends the unregister command to any routers we have registered
420 # ----------------------------------------------------------------
421 sub unregister_routers {
# Nothing can be sent once the connection is gone, so bail out early.
423 return unless $self->{osrf_handle}->tcp_connected;
# $self->{routers} holds the addresses collected by register_routers.
425 for my $router (@{$self->{routers}}) {
426 $logger->info("server: disconnecting from router $router");
# NOTE(review): the first argument(s) of send() are on a line not visible
# in this listing.
427 $self->{osrf_handle}->send(
429 body => "unregistering",
430 router_command => "unregister",
431 router_class => $self->{service}
437 package OpenSRF::Server::Child;
440 use OpenSRF::Transport;
441 use OpenSRF::Application;
442 use OpenSRF::Transport::PeerHandle;
443 use OpenSRF::Transport::SlimJabber::XMPPMessage;
444 use OpenSRF::Utils::Logger qw($logger);
445 use OpenSRF::DomainObject::oilsResponse qw/:status/;
446 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
447 use Time::HiRes qw(time);
448 use POSIX qw/:sys_wait_h :errno_h/;
# A child object interpolated into a string renders as "[<pid>]"; the log
# messages that embed $child rely on this.
450 use overload '""' => sub { return '[' . shift()->{pid} . ']'; };
# Constructor body for a child bookkeeping object.
#   $parent - the server object that owns this child; its settings (service,
#             max_requests, keepalive) are read through $self->{parent}.
# NOTE(review): the sub header and return line are not visible in this
# listing.
453 my($class, $parent) = @_;
454 my $self = bless({}, $class);
455 $self->{pid} = 0; # my process ID
456 $self->{parent} = $parent; # Controller parent process
457 $self->{num_requests} = 0; # total serviced requests
# Switches the handle to non-blocking mode: read the current status flags
# and write them back with O_NONBLOCK added.
# NOTE(review): $fh is declared on a line not visible in this listing.
463 my $flags = fcntl($fh, F_GETFL, 0);
464 fcntl($fh, F_SETFL, $flags | O_NONBLOCK);
# Switches the handle back to blocking mode: read the current status flags,
# clear the O_NONBLOCK bit, and write them back.
# NOTE(review): $fh is declared on a line not visible in this listing.
469 my $flags = fcntl($fh, F_GETFL, 0);
470 $flags &= ~O_NONBLOCK;
471 fcntl($fh, F_SETFL, $flags);
474 # ----------------------------------------------------------------
475 # Connects to Jabber and runs the application child_init
476 # ----------------------------------------------------------------
# Child start-up: rename the process, open this child's own network
# connection, and run the application's child_init hook when it has one.
# NOTE(review): the sub header is not visible in this listing.
479 my $service = $self->{parent}->{service};
# Assigning to $0 changes the process name shown in process listings.
480 $0 = "OpenSRF Drone [$service]";
481 OpenSRF::Transport::PeerHandle->construct($service);
# The hook is optional; call it only if the implementation defines it.
482 OpenSRF::Application->application_implementation->child_init
483 if (OpenSRF::Application->application_implementation->can('child_init'));
486 # ----------------------------------------------------------------
487 # Waits for messages from the parent process, handles the message,
488 # then goes into the keepalive loop if this is a stateful session.
489 # When max_requests is hit, the process exits.
490 # ----------------------------------------------------------------
# Child request loop: wait for a message from the parent, hand it to the
# transport handler, service any follow-up requests of a connected session,
# and stop after max_requests requests.
# NOTE(review): the sub header, the loop construct, the process-name
# updates and the status report to the parent are on lines not visible in
# this listing.
493 my $network = OpenSRF::Transport::PeerHandle->retrieve;
495 # main child run loop. Ends when this child hits max requests.
# A false result from wait_for_request restarts the loop.
498 my $data = $self->wait_for_request or next;
500 # Update process name to show activity
504 # Discard extraneous data from the jabber socket
505 if(!$network->flush_socket()) {
506 $logger->error("server: network disconnected! child dropping request and exiting: $data");
# Wrap the raw XML in an XMPPMessage and let the transport layer handle it
# for this service.
510 my $session = OpenSRF::Transport->handler(
511 $self->{parent}->{service},
512 OpenSRF::Transport::SlimJabber::XMPPMessage->new(xml => $data)
515 $self->keepalive_loop($session);
# NOTE(review): this is an equality test; if max_requests is unset or 0 the
# counter never matches it -- confirm max_requests is always a positive
# number.
517 last if ++$self->{num_requests} == $self->{parent}->{max_requests};
519 # Tell the parent process we are available to process requests
522 # Repair process name
526 $chatty and $logger->internal("server: child process shutting down after reaching max_requests");
# Optional application hook, mirroring child_init.
528 OpenSRF::Application->application_implementation->child_exit
529 if (OpenSRF::Application->application_implementation->can('child_exit'));
532 # ----------------------------------------------------------------
533 # Waits for request data on the parent pipe and returns it.
534 # ----------------------------------------------------------------
535 sub wait_for_request {
# Reads one request from the socket shared with the parent, in chunks of
# $read_size bytes.
# NOTE(review): $buf, $nonblock, the accumulation of the data, the loop
# construct and the return statement are on lines not visible in this
# listing.
539 my $read_size = 1024;
543 # Start out blocking, when data is available, read it all
546 my $n = sysread($self->{pipe_to_parent}, $buf, $read_size);
# EAGAIN only means the non-blocking socket has no more data right now, so
# it is not logged as an error.
549 $logger->error("server: error reading data pipe: $!") unless EAGAIN == $!;
553 last if $n <= 0; # no data left to read
# A short read means the message fit in this chunk.
557 last if $n < $read_size; # done reading all data
# A full chunk may mean more data follows; switch to non-blocking so the
# next read cannot hang when the message ended exactly on a chunk boundary.
559 $self->set_nonblock($self->{pipe_to_parent}) unless $nonblock;
# Restore blocking mode so the next wait sleeps until data arrives.
563 $self->set_block($self->{pipe_to_parent}) if $nonblock;
568 # ----------------------------------------------------------------
569 # If this is a stateful opensrf session, wait up to $keepalive
570 # seconds for subsequent requests from the client
571 # ----------------------------------------------------------------
# Services follow-up requests for as long as $session stays in the
# CONNECTED state. Each wait is limited to the parent's keepalive setting;
# on timeout the client is sent a timeout status and the session is marked
# disconnected.
573 my($self, $session) = @_;
574 my $keepalive = $self->{parent}->{keepalive};
576 while($session->state and $session->state == $session->CONNECTED) {
# A false result from queue_wait is treated as "nothing arrived in time".
578 unless( $session->queue_wait($keepalive) ) {
580 # client failed to disconnect before timeout
581 $logger->info("server: no request was received in $keepalive seconds, exiting stateful session");
583 my $res = OpenSRF::DomainObject::oilsConnectStatus->new(
584 status => "Disconnected on timeout",
585 statusCode => STATUS_TIMEOUT
# Send the timeout status, then flip the state so the while test ends the
# loop.
588 $session->status($res);
589 $session->state($session->DISCONNECTED);
594 $chatty and $logger->internal("server: child done with request(s)");
598 # ----------------------------------------------------------------
599 # Report our availability to our parent process
600 # ----------------------------------------------------------------
604 $self->{pipe_to_parent},
605 sprintf("%*s", OpenSRF::Server::STATUS_PIPE_DATA_SIZE, $self->{pid})