1 # ----------------------------------------------------------------
2 # Copyright (C) 2010 Equinox Software, Inc.
3 # Bill Erickson <erickson@esilibrary.com>
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 # ----------------------------------------------------------------
15 package OpenSRF::Server;
18 use OpenSRF::Transport;
19 use OpenSRF::Application;
20 use OpenSRF::Utils::Config;
21 use OpenSRF::Transport::PeerHandle;
22 use OpenSRF::Utils::SettingsClient;
23 use OpenSRF::Utils::Logger qw($logger);
24 use OpenSRF::Transport::SlimJabber::Client;
26 use POSIX qw/:sys_wait_h :errno_h/;
27 use Fcntl qw(:flock F_GETFL F_SETFL O_NONBLOCK);
28 use Time::HiRes qw/usleep/;
# When true, the $logger->internal(...) trace calls throughout this file
# are emitted; every such call is guarded with "$chatty and ...".
31 our $chatty = 1; # disable for production
# Width in bytes of the status record exchanged over the child pipe: the
# child left-pads its PID to this width with sprintf("%*s") and the parent
# asks sysread for this many bytes per report.
33 use constant STATUS_PIPE_DATA_SIZE => 12;
# Constructor body. NOTE(review): the enclosing "sub" line and the final
# return are not visible in this listing.
# Arguments: class name, service name, then a key/value list of options.
# The options hash itself is blessed, so caller-supplied keys such as
# stderr_log_path, lock_file_path and the spare-children limits become
# object fields next to the bookkeeping fields set below.
36 my($class, $service, %args) = @_;
37 my $self = bless(\%args, $class);
39 $self->{service} = $service; # service name
40 $self->{num_children} = 0; # number of child processes
41 $self->{osrf_handle} = undef; # xmpp handle
42 $self->{routers} = []; # list of registered routers
43 $self->{active_list} = []; # list of active children
44 $self->{idle_list} = []; # list of idle children
45 $self->{pid_map} = {}; # map of child pid to child for cleaner access
46 $self->{sig_pipe} = 0; # true if last syswrite failed
# STDERR log path is only derived when a log directory was configured.
48 $self->{stderr_log} = $self->{stderr_log_path} . "/${service}_stderr.log"
49 if $self->{stderr_log_path};
# NOTE(review): the left-hand side of this statement is not visible here;
# presumably it assigns $self->{lock_file}, which other routines open and
# unlink -- confirm. The file name embeds $$ (the current process ID).
52 sprintf("%s/%s_$$.lock", $self->{lock_file_path}, $self->{service});
54 $self->{min_spare_children} ||= 0;
# A configured max_spare_children that is not strictly greater than
# min_spare_children is raised to min + 1. An unset/zero max is left alone.
56 $self->{max_spare_children} = $self->{min_spare_children} + 1 if
57 $self->{max_spare_children} and
58 $self->{max_spare_children} <= $self->{min_spare_children};
63 # ----------------------------------------------------------------
64 # Disconnects from routers and waits for child processes to exit.
# Sequence: signal every idle and active child, unregister from the
# routers, disconnect the transport handle, reap children in blocking
# (shutdown) mode, then close and unlink the lock file.
# Exits the process with status 0 unless $no_exit is true.
# NOTE(review): the sub header and the declarations of $self and
# $no_exit are not visible in this listing.
65 # ----------------------------------------------------------------
70 $logger->info("server: shutting down and cleaning up...");
72 # don't get sidetracked by signals while we're cleaning up.
73 # it could result in unexpected behavior with list traversal
74 $SIG{CHLD} = 'IGNORE';
76 # terminate the child processes
77 $self->kill_child($_) for
78 (@{$self->{idle_list}}, @{$self->{active_list}});
# unregister_routers returns early by itself when the handle is no longer
# connected, so it is called unconditionally here.
81 $self->unregister_routers;
83 $self->{osrf_handle}->disconnect;
85 # clean up our dead children
86 $self->reap_children(1);
88 # clean up the lock file
89 close($self->{lock_file_handle});
90 unlink($self->{lock_file});
92 exit(0) unless $no_exit;
# Opens the file named by $self->{lock_file} in append mode and stores the
# handle in $self->{lock_file_handle}. Dies, reporting the OS error ($!),
# when the file cannot be opened.
97 open($self->{lock_file_handle}, '>>', $self->{lock_file})
98 or die "server: cannot open lock file ".$self->{lock_file} ." : $!\n";
101 # ----------------------------------------------------------------
102 # Waits on the jabber socket for inbound data from the router.
103 # Each new message is passed off to a child process for handling.
104 # At regular intervals, wake up for min/max spare child maintenance
105 # ----------------------------------------------------------------
# NOTE(review): several short lines of this routine (sub header, loop
# construct, $wait_time bookkeeping, closing braces) are not visible in
# this listing; the comments below cover only the statements shown.
109 $logger->set_service($self->{service});
# INT/TERM/QUIT trigger an orderly shutdown; SIGCHLD reaps exited children.
111 $SIG{$_} = sub { $self->cleanup; } for (qw/INT TERM QUIT/);
112 $SIG{CHLD} = sub { $self->reap_children(); };
# Start the initial children, connect to the transport, announce this
# service to the routers, then open the lock file that write_child locks
# around its pipe writes.
114 $self->spawn_children;
115 $self->build_osrf_handle;
116 $self->register_routers;
117 $self->open_lock_file;
# Cleared before each wait; reap_children sets it back to 1 when it runs.
124 $self->{child_died} = 0;
# Ask the transport handle for the next inbound message, passing
# $wait_time as the wait limit.
126 my $msg = $self->{osrf_handle}->process($wait_time);
128 # we woke up for any reason, reset the wait time to allow
129 # for idle maintenance as necessary
# Dispatch, in order of preference: an already idle child, a freshly
# spawned child (while below max_children), or block until one frees up.
134 if(my $child = pop(@{$self->{idle_list}})) {
136 # we have an idle child to handle the request
137 $chatty and $logger->internal("server: passing request to idle child $child");
138 push(@{$self->{active_list}}, $child);
139 $self->write_child($child, $msg);
141 } elsif($self->{num_children} < $self->{max_children}) {
143 # spawning a child to handle the request
144 $chatty and $logger->internal("server: spawning child to handle request");
# spawn_child(1) places the new child directly on the active list.
# NOTE(review): spawn_child logs an error when socketpair fails; what it
# returns in that case is not visible here -- confirm write_child copes.
145 $self->write_child($self->spawn_child(1), $msg);
148 $logger->warn("server: no children available, waiting... consider increasing " .
149 "max_children for this application higher than $self->{max_children} ".
150 "in the OpenSRF configuration if this message occurs frequently");
151 $self->check_status(1); # block until child is available
# check_status moved at least one reporting child onto the idle list.
153 my $child = pop(@{$self->{idle_list}});
154 push(@{$self->{active_list}}, $child);
155 $self->write_child($child, $msg);
160 # don't perform idle maint immediately when woken by SIGCHLD
161 unless($self->{child_died}) {
163 # when we hit equilibrium, there's no need for regular
164 # maintenance, so set wait_time to 'forever'
# NOTE(review): the start of this statement is not visible; the condition
# below is true only when perform_idle_maintenance did nothing and no
# child is active.
166 !$self->perform_idle_maintenance and # no maintenance performed this time
167 @{$self->{active_list}} == 0; # no active children
173 # ----------------------------------------------------------------
174 # Launch a new spare child or kill an extra spare child. To
175 # prevent large-scale spawning or die-offs, spawn or kill only
176 # 1 process per idle maintenance loop.
177 # Returns true if any idle maintenance occurred, 0 otherwise
178 # ----------------------------------------------------------------
# NOTE(review): the statements that actually spawn/kill and the return
# statements are not visible in this listing.
179 sub perform_idle_maintenance {
# Trace the current pool state before deciding anything.
182 $chatty and $logger->internal(
184 "server: %d idle, %d active, %d min_spare, %d max_spare in idle maintenance",
185 scalar(@{$self->{idle_list}}),
186 scalar(@{$self->{active_list}}),
187 $self->{min_spare_children},
188 $self->{max_spare_children}
192 # spawn 1 spare child per maintenance loop if necessary
# Requires a non-zero min_spare_children, room below max_children, and
# fewer idle children than the configured minimum.
193 if( $self->{min_spare_children} and
194 $self->{num_children} < $self->{max_children} and
195 scalar(@{$self->{idle_list}}) < $self->{min_spare_children} ) {
197 $chatty and $logger->internal("server: spawning spare child");
201 # kill 1 excess spare child per maintenance loop if necessary
# Requires a non-zero max_spare_children, more children than min_children,
# and more idle children than the configured maximum.
202 } elsif($self->{max_spare_children} and
203 $self->{num_children} > $self->{min_children} and
204 scalar(@{$self->{idle_list}}) > $self->{max_spare_children} ) {
206 $chatty and $logger->internal("server: killing spare child");
# Sends SIGTERM to a single child process. The child is taken from the
# argument list when one is supplied, otherwise popped off the idle list;
# returns without doing anything when neither yields a child.
# NOTE(review): the sub header and the line consuming $self are not
# visible in this listing.
216 my $child = shift || pop(@{$self->{idle_list}}) or return;
217 $chatty and $logger->internal("server: killing child $child");
218 kill('TERM', $child->{pid});
221 # ----------------------------------------------------------------
222 # Builds the Jabber connection that inbound messages arrive on.
# Credentials, domain and port come from the bootstrap section of the
# current OpenSRF config; the handle is stored in $self->{osrf_handle}.
223 # ----------------------------------------------------------------
224 sub build_osrf_handle {
227 my $conf = OpenSRF::Utils::Config->current;
228 my $username = $conf->bootstrap->username;
229 my $password = $conf->bootstrap->passwd;
230 my $domain = $conf->bootstrap->domain;
231 my $port = $conf->bootstrap->port;
# Resource name: <service>_listener_<hostname>.
232 my $resource = $self->{service} . '_listener_' . $conf->env->hostname;
234 $logger->debug("server: inbound connecting as $username\@$domain/$resource on port $port");
# NOTE(review): only three constructor arguments are visible here;
# presumably $domain and $port are passed on lines missing from this
# listing -- confirm.
236 $self->{osrf_handle} =
237 OpenSRF::Transport::SlimJabber::Client->new(
238 username => $username,
239 resource => $resource,
240 password => $password,
245 $self->{osrf_handle}->initialize;
249 # ----------------------------------------------------------------
250 # Sends request data to a child process
# The message is serialized to XML and written to the child's pipe while
# an exclusive lock is held on the lock file; the child takes the same
# lock before reading, so it does not start until the write is finished.
# NOTE(review): the sub header, the retry loop construct and its bounds
# are not visible in this listing.
251 # ----------------------------------------------------------------
253 my($self, $child, $msg) = @_;
# NOTE(review): encode_utf8/decode_utf8 are not imported by any visible
# "use" line; presumably "use Encode" is on a line missing here -- confirm.
254 my $xml = encode_utf8(decode_utf8($msg->to_xml));
# A failed flock is logged but the write still proceeds.
256 flock($self->{lock_file_handle}, LOCK_EX) or
257 $logger->error("server: cannot flock : $!");
# The temporary SIGPIPE handler only records the failure so the write can
# be retried after a short sleep.
261 $self->{sig_pipe} = 0;
262 local $SIG{'PIPE'} = sub { $self->{sig_pipe} = 1; };
264 # send message to child data pipe
# NOTE(review): the return value of syswrite is not checked, so a short
# or failed write other than SIGPIPE goes unnoticed.
265 syswrite($child->{pipe_to_child}, $xml);
267 last unless $self->{sig_pipe};
268 $logger->error("server: got SIGPIPE writing to $child, retrying...");
269 usleep(50000); # 50 msec
272 flock($self->{lock_file_handle}, LOCK_UN) or
273 $logger->error("server: cannot de-flock : $!");
# sig_pipe still set here means the last write attempt hit SIGPIPE.
275 $logger->error("server: unable to send request message to child $child") if $self->{sig_pipe};
278 # ----------------------------------------------------------------
279 # Checks to see if any child process has reported its availability
280 # In blocking mode, blocks until a child has reported.
# Children that reported are moved from the active list to the idle list.
# NOTE(review): the sub header, loop constructs and the declarations of
# @pids, $pid and @new_actives are not visible in this listing.
281 # ----------------------------------------------------------------
283 my($self, $block) = @_;
# Nothing can report if no child is active.
285 return unless @{$self->{active_list}};
# Watch the parent's end of every active child's socket pair.
287 my $read_set = IO::Select->new;
288 $read_set->add($_->{pipe_to_child}) for @{$self->{active_list}};
294 # if can_read or sysread is interrupted while blocking, go back and
295 # wait again until we have at least 1 free child
# Timeout undef waits indefinitely; 0 makes this a non-blocking poll.
297 if(my @handles = $read_set->can_read(($block) ? undef : 0)) {
299 for my $pipe (@handles) {
# Each status record is a PID padded to STATUS_PIPE_DATA_SIZE characters;
# int() strips the padding.
300 sysread($pipe, $pid, STATUS_PIPE_DATA_SIZE) or next;
301 push(@pids, int($pid));
# Keep looping only in blocking mode while nothing has been read yet.
305 last unless $block and !@pids;
310 $chatty and $logger->internal("server: ".scalar(@pids)." children reporting for duty: (@pids)");
315 # move the children from the active list to the idle list
316 for my $proc (@{$self->{active_list}}) {
317 if(grep { $_ == $proc->{pid} } @pids) {
318 push(@{$self->{idle_list}}, $proc);
320 push(@new_actives, $proc);
324 $self->{active_list} = [@new_actives];
326 $chatty and $logger->internal(sprintf(
327 "server: %d idle and %d active children after status update",
328 scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
331 # ----------------------------------------------------------------
332 # Cleans up any child processes that have exited.
333 # In shutdown mode, block until all children have washed ashore
# Installed as the SIGCHLD handler by the server's run routine and called
# directly, with $shutdown true, during cleanup.
# NOTE(review): the sub header, the loop construct and its exit test are
# not visible in this listing.
334 # ----------------------------------------------------------------
336 my($self, $shutdown) = @_;
# Tells the main loop that this wake-up was caused by a child exit.
337 $self->{child_died} = 1;
# Blocking wait during shutdown, non-blocking poll otherwise.
341 my $pid = waitpid(-1, ($shutdown) ? 0 : WNOHANG);
344 $chatty and $logger->internal("server: reaping child $pid");
346 my $child = $self->{pid_map}->{$pid};
# Close both ends of the dead child's socket pair held by this process.
348 close($child->{pipe_to_parent});
349 close($child->{pipe_to_child});
# Drop the child from whichever list it was on.
351 $self->{active_list} = [ grep { $_->{pid} != $pid } @{$self->{active_list}} ];
352 $self->{idle_list} = [ grep { $_->{pid} != $pid } @{$self->{idle_list}} ];
354 $self->{num_children}--;
355 delete $self->{pid_map}->{$pid};
356 delete $child->{$_} for keys %$child; # destroy with a vengeance
# Outside of shutdown, top the pool back up to min_children.
359 $self->spawn_children unless $shutdown;
361 $chatty and $logger->internal(sprintf(
362 "server: %d idle and %d active children after reap_children",
363 scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
367 # ----------------------------------------------------------------
368 # Spawn children until min_children processes are running
369 # ----------------------------------------------------------------
# New children are spawned without the $active flag, so they start on the
# idle list.
372 $self->spawn_child while $self->{num_children} < $self->{min_children};
375 # ----------------------------------------------------------------
376 # Spawns a new child. If $active is set, the child goes directly
377 # into the active_list.
# NOTE(review): the sub header, the socketpair call itself, the $active
# test, the return statements and the child's eval/exit lines are not
# visible in this listing.
378 # ----------------------------------------------------------------
380 my($self, $active) = @_;
382 my $child = OpenSRF::Server::Child->new($self);
384 # socket for sending message data to the child
# Both ends are stored on the child object: pipe_to_child is the parent's
# end, pipe_to_parent is the child's end.
386 $child->{pipe_to_child},
387 $child->{pipe_to_parent},
388 AF_UNIX, SOCK_STREAM, PF_UNSPEC)) {
389 $logger->error("server: error creating data socketpair: $!");
393 $child->{pipe_to_child}->autoflush(1);
394 $child->{pipe_to_parent}->autoflush(1);
# NOTE(review): a failed fork() returns undef, which falls through to the
# "child process" branch below -- confirm this is handled.
396 $child->{pid} = fork();
398 if($child->{pid}) { # parent process
399 $self->{num_children}++;
400 $self->{pid_map}->{$child->{pid}} = $child;
403 push(@{$self->{active_list}}, $child);
405 push(@{$self->{idle_list}}, $child);
408 $chatty and $logger->internal("server: server spawned child $child with ".$self->{num_children}." total children");
412 } else { # child process
# Drop the signal handlers inherited from the parent.
414 $SIG{$_} = 'DEFAULT' for (qw/INT TERM QUIT HUP/);
416 if($self->{stderr_log}) {
418 $chatty and $logger->internal("server: redirecting STDERR to " . $self->{stderr_log});
# NOTE(review): this is a 2-arg open with the path concatenated onto the
# mode; a 3-arg open would be safer. The error message below interpolates
# $@, but a failed open reports its reason in $!.
421 unless( open(STDERR, '>>' . $self->{stderr_log}) ) {
422 $logger->error("server: unable to open STDERR log file: " . $self->{stderr_log} . " : $@");
423 open STDERR, '>/dev/null'; # send it back to /dev/null
# Disconnect the child's peer handle once its work is finished.
431 OpenSRF::Transport::PeerHandle->retrieve->disconnect;
433 $logger->error("server: child process died: $@") if $@;
438 # ----------------------------------------------------------------
439 # Sends the register command to the configured routers
# The addresses registered with are remembered in $self->{routers} so
# unregister_routers can contact the same set later.
# NOTE(review): the declaration of @targets, the else branch keyword, the
# loop over @targets and the "to" argument of send are not visible in
# this listing.
440 # ----------------------------------------------------------------
441 sub register_routers {
444 my $conf = OpenSRF::Utils::Config->current;
445 my $routers = $conf->bootstrap->routers;
446 my $router_name = $conf->bootstrap->router_name;
449 for my $router (@$routers) {
# A router entry is selected when it has no services restriction, when
# its service list contains this service, or when its single configured
# service equals this service.
452 if( !$router->{services} ||
453 !$router->{services}->{service} ||
455 ref($router->{services}->{service}) eq 'ARRAY' and
456 grep { $_ eq $self->{service} } @{$router->{services}->{service}}
457 ) || $router->{services}->{service} eq $self->{service}) {
459 my $name = $router->{name};
460 my $domain = $router->{domain};
461 push(@targets, "$name\@$domain/router");
# Here $router is interpolated as the domain and paired with the global
# router_name; presumably this handles entries given as plain domain
# strings -- confirm.
465 push(@targets, "$router_name\@$router/router");
470 $logger->info("server: registering with router $_");
471 $self->{osrf_handle}->send(
473 body => 'registering',
474 router_command => 'register',
475 router_class => $self->{service}
479 $self->{routers} = \@targets;
482 # ----------------------------------------------------------------
483 # Sends the unregister command to any routers we have registered
# Does nothing when the transport handle reports it is not connected.
485 # ----------------------------------------------------------------
486 sub unregister_routers {
488 return unless $self->{osrf_handle}->tcp_connected;
# $self->{routers} holds the addresses collected by register_routers.
490 for my $router (@{$self->{routers}}) {
491 $logger->info("server: disconnecting from router $router");
492 $self->{osrf_handle}->send(
494 body => "unregistering",
495 router_command => "unregister",
496 router_class => $self->{service}
502 package OpenSRF::Server::Child;
505 use OpenSRF::Transport;
506 use OpenSRF::Application;
507 use OpenSRF::Transport::PeerHandle;
508 use OpenSRF::Transport::SlimJabber::XMPPMessage;
509 use OpenSRF::Utils::Logger qw($logger);
510 use OpenSRF::DomainObject::oilsResponse qw/:status/;
511 use Fcntl qw(:flock F_GETFL F_SETFL O_NONBLOCK);
512 use Time::HiRes qw(time usleep);
513 use POSIX qw/:sys_wait_h :errno_h/;
# Stringify a child object as "[<pid>]" so it can be interpolated directly
# into log messages.
515 use overload '""' => sub { return '[' . shift()->{pid} . ']'; };
# Constructor body for a child record; takes the class name and the
# parent server object. NOTE(review): the sub header and return statement
# are not visible in this listing.
518 my($class, $parent) = @_;
519 my $self = bless({}, $class);
520 $self->{pid} = 0; # my process ID
521 $self->{parent} = $parent; # Controller parent process
522 $self->{num_requests} = 0; # total serviced requests
523 $self->{sig_pipe} = 0; # true if last syswrite failed
# Switches the file handle $fh to non-blocking mode by OR-ing O_NONBLOCK
# into its current status flags.
# NOTE(review): the sub header and the declaration of $fh are not visible
# in this listing; fcntl failures are not checked.
529 my $flags = fcntl($fh, F_GETFL, 0);
530 fcntl($fh, F_SETFL, $flags | O_NONBLOCK);
# Returns the file handle $fh to blocking mode by clearing O_NONBLOCK from
# its current status flags.
# NOTE(review): the sub header and the declaration of $fh are not visible
# in this listing; fcntl failures are not checked.
535 my $flags = fcntl($fh, F_GETFL, 0);
536 $flags &= ~O_NONBLOCK;
537 fcntl($fh, F_SETFL, $flags);
# Gives the child its own handle on the parent's lock file: the inherited
# handle is closed and the same path is reopened in append mode. Logs and
# dies when the file cannot be opened.
543 # close our copy of the parent's lock file since we won't be using it
544 close($self->{parent}->{lock_file_handle})
545 if $self->{parent}->{lock_file_handle};
547 my $fname = $self->{parent}->{lock_file};
548 unless (open($self->{lock_file_handle}, '>>', $fname)) {
549 $logger->error("server: child cannot open lock file $fname : $!");
550 die "server: child cannot open lock file $fname : $!\n";
554 # ----------------------------------------------------------------
555 # Connects to Jabber and runs the application child_init
# Also opens the child's own lock-file handle and renames the process.
# NOTE(review): the sub header is not visible in this listing.
556 # ----------------------------------------------------------------
559 $self->open_lock_file;
560 my $service = $self->{parent}->{service};
# Process title as shown by tools such as ps.
561 $0 = "OpenSRF Drone [$service]";
562 OpenSRF::Transport::PeerHandle->construct($service);
# child_init is optional; it is only called when the application
# implementation provides it.
563 OpenSRF::Application->application_implementation->child_init
564 if (OpenSRF::Application->application_implementation->can('child_init'));
567 # ----------------------------------------------------------------
568 # Waits for messages from the parent process, handles the message,
569 # then goes into the keepalive loop if this is a stateful session.
570 # When max_requests is hit, the process exits.
# NOTE(review): the sub header, the loop construct, the process-name
# updates and the status report call are not visible in this listing.
571 # ----------------------------------------------------------------
574 my $network = OpenSRF::Transport::PeerHandle->retrieve;
576 # main child run loop. Ends when this child hits max requests.
# An empty read simply restarts the wait.
579 my $data = $self->wait_for_request or next;
581 # Update process name to show activity
585 # Discard extraneous data from the jabber socket
# A false return from flush_socket is treated as a lost connection.
586 if(!$network->flush_socket()) {
587 $logger->error("server: network disconnected! child dropping request and exiting: $data");
# Hand the raw XML to the transport layer for this service.
591 my $session = OpenSRF::Transport->handler(
592 $self->{parent}->{service},
593 OpenSRF::Transport::SlimJabber::XMPPMessage->new(xml => $data)
596 $self->keepalive_loop($session);
# The comparison is ==, so the loop ends when the count reaches
# max_requests exactly.
598 last if ++$self->{num_requests} == $self->{parent}->{max_requests};
600 # Tell the parent process we are available to process requests
603 # Repair process name
607 $chatty and $logger->internal("server: child process shutting down after reaching max_requests");
# child_exit is optional, mirroring child_init.
609 OpenSRF::Application->application_implementation->child_exit
610 if (OpenSRF::Application->application_implementation->can('child_exit'));
613 # ----------------------------------------------------------------
614 # waits for a request data on the parent pipe and returns it.
# NOTE(review): the loop constructs, the declarations of $buf/$sig_pipe,
# the data accumulation and the return statement are not visible in this
# listing.
615 # ----------------------------------------------------------------
616 sub wait_for_request {
# Chunk size, in bytes, requested from each sysread call.
620 my $read_size = 1024;
622 my $read_pipe = $self->{pipe_to_parent};
624 # wait for some data to start arriving
625 my $read_set = IO::Select->new;
626 $read_set->add($read_pipe);
629 # if can_read is interrupted while blocking,
630 # go back and wait again until it succeeds.
631 last if $read_set->can_read;
634 # Parent has started sending data to us.
635 # Wait for the parent to write all data to the pipe. Then, immediately release
636 # the lock so the parent can start writing data to other child processes.
637 # Note: there is no danger of a subsequent message coming from the parent on
638 # the same pipe, since this child is now marked as active.
639 flock($self->{lock_file_handle}, LOCK_EX) or $logger->error("server: cannot flock : $!");
640 flock($self->{lock_file_handle}, LOCK_UN) or $logger->error("server: cannot de-flock : $!");
642 # we have all data now so all reads can be done in non-blocking mode
643 $self->set_nonblock($read_pipe);
# NOTE(review): SIGPIPE is normally raised on writes to a closed peer,
# not on reads -- confirm this handler can actually fire here.
647 local $SIG{'PIPE'} = sub { $sig_pipe = 1 };
650 my $n = sysread($self->{pipe_to_parent}, $buf, $read_size);
655 $logger->info("server: $self got SIGPIPE reading data from parent, retrying...");
656 usleep(50000); # 50 msec
# EAGAIN is expected on a drained non-blocking socket and is not logged.
660 $logger->error("server: error reading data pipe: $!") unless EAGAIN == $!;
664 last if $n <= 0; # no data left to read
# A short read is taken to mean the message is complete.
668 last if $n < $read_size; # done reading all data
671 # return to blocking mode
672 $self->set_block($self->{pipe_to_parent});
677 # ----------------------------------------------------------------
678 # If this is a stateful opensrf session, wait up to $keepalive
679 # seconds for subsequent requests from the client
# $keepalive comes from the parent server's "keepalive" setting.
# NOTE(review): the sub header and closing lines are not visible in this
# listing.
680 # ----------------------------------------------------------------
682 my($self, $session) = @_;
683 my $keepalive = $self->{parent}->{keepalive};
# Loop only while the session is in the CONNECTED state.
685 while($session->state and $session->state == $session->CONNECTED) {
# A false return from queue_wait is treated as a timeout.
687 unless( $session->queue_wait($keepalive) ) {
689 # client failed to disconnect before timeout
690 $logger->info("server: no request was received in $keepalive seconds, exiting stateful session");
# Send the client a timeout status, then mark the session disconnected so
# the loop condition fails.
692 my $res = OpenSRF::DomainObject::oilsConnectStatus->new(
693 status => "Disconnected on timeout",
694 statusCode => STATUS_TIMEOUT
697 $session->status($res);
698 $session->state($session->DISCONNECTED);
703 $chatty and $logger->internal("server: child done with request(s)");
707 # ----------------------------------------------------------------
708 # Report our availability to our parent process
# The report is this child's PID, left-padded to
# OpenSRF::Server::STATUS_PIPE_DATA_SIZE characters, written to
# pipe_to_parent.
# NOTE(review): the sub header, the retry loop construct and the write
# call's opening line are not visible in this listing.
709 # ----------------------------------------------------------------
# The temporary SIGPIPE handler only records the failure so the write can
# be retried after a short sleep.
715 $self->{sig_pipe} = 0;
716 local $SIG{'PIPE'} = sub { $self->{sig_pipe} = 1; };
719 $self->{pipe_to_parent},
720 sprintf("%*s", OpenSRF::Server::STATUS_PIPE_DATA_SIZE, $self->{pid})
723 last unless $self->{sig_pipe};
724 $logger->error("server: $self got SIGPIPE writing status to parent, retrying...");
725 usleep(50000); # 50 msec
# sig_pipe still set here means the last write attempt hit SIGPIPE.
728 $logger->error("server: $self unable to send status to parent") if $self->{sig_pipe};