1 # ----------------------------------------------------------------
2 # Copyright (C) 2010 Equinox Software, Inc.
3 # Bill Erickson <erickson@esilibrary.com>
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 # ----------------------------------------------------------------
15 package OpenSRF::Server;
18 use OpenSRF::Transport;
19 use OpenSRF::Application;
20 use OpenSRF::Utils::Config;
21 use OpenSRF::Transport::PeerHandle;
22 use OpenSRF::Utils::SettingsClient;
23 use OpenSRF::Utils::Logger qw($logger);
24 use OpenSRF::Transport::SlimJabber::Client;
26 use POSIX qw/:sys_wait_h :errno_h/;
27 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
28 use Time::HiRes qw/usleep/;
31 our $chatty = 1; # disable for production
33 use constant STATUS_PIPE_DATA_SIZE => 12;
34 use constant WRITE_PIPE_DATA_SIZE => 12;
37 my($class, $service, %args) = @_;
38 my $self = bless(\%args, $class);
40 $self->{service} = $service; # service name
41 $self->{num_children} = 0; # number of child processes
42 $self->{osrf_handle} = undef; # xmpp handle
43 $self->{routers} = []; # list of registered routers
44 $self->{active_list} = []; # list of active children
45 $self->{idle_list} = []; # list of idle children
46 $self->{pid_map} = {}; # map of child pid to child for cleaner access
47 $self->{sig_pipe} = 0; # true if last syswrite failed
49 $self->{stderr_log} = $self->{stderr_log_path} . "/${service}_stderr.log"
50 if $self->{stderr_log_path};
52 $self->{min_spare_children} ||= 0;
54 $self->{max_spare_children} = $self->{min_spare_children} + 1 if
55 $self->{max_spare_children} and
56 $self->{max_spare_children} <= $self->{min_spare_children};
61 # ----------------------------------------------------------------
62 # Disconnects from routers and waits for child processes to exit.
63 # ----------------------------------------------------------------
68 $logger->info("server: shutting down and cleaning up...");
70 # don't get sidetracked by signals while we're cleaning up.
71 # it could result in unexpected behavior with list traversal
72 $SIG{CHLD} = 'IGNORE';
74 # terminate the child processes
75 $self->kill_child($_) for
76 (@{$self->{idle_list}}, @{$self->{active_list}});
79 $self->unregister_routers;
81 $self->{osrf_handle}->disconnect;
83 # clean up our dead children
84 $self->reap_children(1);
86 exit(0) unless $no_exit;
89 # ----------------------------------------------------------------
90 # Waits on the jabber socket for inbound data from the router.
91 # Each new message is passed off to a child process for handling.
92 # At regular intervals, wake up for min/max spare child maintenance
93 # ----------------------------------------------------------------
97 $logger->set_service($self->{service});
99 $SIG{$_} = sub { $self->cleanup; } for (qw/INT TERM QUIT/);
100 $SIG{CHLD} = sub { $self->reap_children(); };
102 $self->spawn_children;
103 $self->build_osrf_handle;
104 $self->register_routers;
111 $self->{child_died} = 0;
113 my $msg = $self->{osrf_handle}->process($wait_time);
115 # we woke up for any reason, reset the wait time to allow
116 # for idle maintenance as necessary
121 if(my $child = pop(@{$self->{idle_list}})) {
123 # we have an idle child to handle the request
124 $chatty and $logger->internal("server: passing request to idle child $child");
125 push(@{$self->{active_list}}, $child);
126 $self->write_child($child, $msg);
128 } elsif($self->{num_children} < $self->{max_children}) {
130 # spawning a child to handle the request
131 $chatty and $logger->internal("server: spawning child to handle request");
132 $self->write_child($self->spawn_child(1), $msg);
135 $logger->warn("server: no children available, waiting... consider increasing " .
136 "max_children for this application higher than $self->{max_children} ".
137 "in the OpenSRF configuration if this message occurs frequently");
138 $self->check_status(1); # block until child is available
140 my $child = pop(@{$self->{idle_list}});
141 push(@{$self->{active_list}}, $child);
142 $self->write_child($child, $msg);
147 # don't perform idle maint immediately when woken by SIGCHLD
148 unless($self->{child_died}) {
150 # when we hit equilibrium, there's no need for regular
151 # maintenance, so set wait_time to 'forever'
153 !$self->perform_idle_maintenance and # no maintenance performed this time
154 @{$self->{active_list}} == 0; # no active children
160 # ----------------------------------------------------------------
161 # Launch a new spare child or kill an extra spare child. To
162 # prevent large-scale spawning or die-offs, spawn or kill only
163 # 1 process per idle maintenance loop.
164 # Returns true if any idle maintenance occurred, 0 otherwise
165 # ----------------------------------------------------------------
166 sub perform_idle_maintenance {
169 $chatty and $logger->internal(
171 "server: %d idle, %d active, %d min_spare, %d max_spare in idle maintenance",
172 scalar(@{$self->{idle_list}}),
173 scalar(@{$self->{active_list}}),
174 $self->{min_spare_children},
175 $self->{max_spare_children}
179 # spawn 1 spare child per maintenance loop if necessary
180 if( $self->{min_spare_children} and
181 $self->{num_children} < $self->{max_children} and
182 scalar(@{$self->{idle_list}}) < $self->{min_spare_children} ) {
184 $chatty and $logger->internal("server: spawning spare child");
188 # kill 1 excess spare child per maintenance loop if necessary
189 } elsif($self->{max_spare_children} and
190 $self->{num_children} > $self->{min_children} and
191 scalar(@{$self->{idle_list}}) > $self->{max_spare_children} ) {
193 $chatty and $logger->internal("server: killing spare child");
203 my $child = shift || pop(@{$self->{idle_list}}) or return;
204 $chatty and $logger->internal("server: killing child $child");
205 kill('TERM', $child->{pid});
208 # ----------------------------------------------------------------
209 # Builds the Jabber connection that inbound messages arrive on.
210 # ----------------------------------------------------------------
211 sub build_osrf_handle {
214 my $conf = OpenSRF::Utils::Config->current;
215 my $username = $conf->bootstrap->username;
216 my $password = $conf->bootstrap->passwd;
217 my $domain = $conf->bootstrap->domain;
218 my $port = $conf->bootstrap->port;
219 my $resource = $self->{service} . '_listener_' . $conf->env->hostname;
221 $logger->debug("server: inbound connecting as $username\@$domain/$resource on port $port");
223 $self->{osrf_handle} =
224 OpenSRF::Transport::SlimJabber::Client->new(
225 username => $username,
226 resource => $resource,
227 password => $password,
232 $self->{osrf_handle}->initialize;
236 # ----------------------------------------------------------------
237 # Sends request data to a child process
238 # ----------------------------------------------------------------
240 my($self, $child, $msg) = @_;
241 my $xml = encode_utf8(decode_utf8($msg->to_xml));
243 # tell the child how much data to expect, minus the header
245 {use bytes; $write_size = length($xml)}
246 $write_size = sprintf("%*s", WRITE_PIPE_DATA_SIZE, $write_size);
250 $self->{sig_pipe} = 0;
251 local $SIG{'PIPE'} = sub { $self->{sig_pipe} = 1; };
253 # send message to child data pipe
254 syswrite($child->{pipe_to_child}, $write_size . $xml);
256 last unless $self->{sig_pipe};
257 $logger->error("server: got SIGPIPE writing to $child, retrying...");
258 usleep(50000); # 50 msec
261 $logger->error("server: unable to send request message to child $child") if $self->{sig_pipe};
264 # ----------------------------------------------------------------
265 # Checks to see if any child process has reported its availability
266 # In blocking mode, blocks until a child has reported.
267 # ----------------------------------------------------------------
269 my($self, $block) = @_;
271 return unless @{$self->{active_list}};
273 my $read_set = IO::Select->new;
274 $read_set->add($_->{pipe_to_child}) for @{$self->{active_list}};
280 # if can_read or sysread is interrupted while blocking, go back and
281 # wait again until we have at least 1 free child
283 if(my @handles = $read_set->can_read(($block) ? undef : 0)) {
285 for my $pipe (@handles) {
286 sysread($pipe, $pid, STATUS_PIPE_DATA_SIZE) or next;
287 push(@pids, int($pid));
291 last unless $block and !@pids;
296 $chatty and $logger->internal("server: ".scalar(@pids)." children reporting for duty: (@pids)");
301 # move the children from the active list to the idle list
302 for my $proc (@{$self->{active_list}}) {
303 if(grep { $_ == $proc->{pid} } @pids) {
304 push(@{$self->{idle_list}}, $proc);
306 push(@new_actives, $proc);
310 $self->{active_list} = [@new_actives];
312 $chatty and $logger->internal(sprintf(
313 "server: %d idle and %d active children after status update",
314 scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
317 # ----------------------------------------------------------------
318 # Cleans up any child processes that have exited.
319 # In shutdown mode, block until all children have washed ashore
320 # ----------------------------------------------------------------
322 my($self, $shutdown) = @_;
323 $self->{child_died} = 1;
327 my $pid = waitpid(-1, ($shutdown) ? 0 : WNOHANG);
330 $chatty and $logger->internal("server: reaping child $pid");
332 my $child = $self->{pid_map}->{$pid};
334 close($child->{pipe_to_parent});
335 close($child->{pipe_to_child});
337 $self->{active_list} = [ grep { $_->{pid} != $pid } @{$self->{active_list}} ];
338 $self->{idle_list} = [ grep { $_->{pid} != $pid } @{$self->{idle_list}} ];
340 $self->{num_children}--;
341 delete $self->{pid_map}->{$pid};
342 delete $child->{$_} for keys %$child; # destroy with a vengeance
345 $self->spawn_children unless $shutdown;
347 $chatty and $logger->internal(sprintf(
348 "server: %d idle and %d active children after reap_children",
349 scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
353 # ----------------------------------------------------------------
354 # Spawn child processes until min_children are running
355 # ----------------------------------------------------------------
358 $self->spawn_child while $self->{num_children} < $self->{min_children};
361 # ----------------------------------------------------------------
362 # Spawns a new child. If $active is set, the child goes directly
363 # into the active_list.
364 # ----------------------------------------------------------------
366 my($self, $active) = @_;
368 my $child = OpenSRF::Server::Child->new($self);
370 # socket for sending message data to the child
372 $child->{pipe_to_child},
373 $child->{pipe_to_parent},
374 AF_UNIX, SOCK_STREAM, PF_UNSPEC)) {
375 $logger->error("server: error creating data socketpair: $!");
379 $child->{pipe_to_child}->autoflush(1);
380 $child->{pipe_to_parent}->autoflush(1);
382 $child->{pid} = fork();
384 if($child->{pid}) { # parent process
385 $self->{num_children}++;
386 $self->{pid_map}->{$child->{pid}} = $child;
389 push(@{$self->{active_list}}, $child);
391 push(@{$self->{idle_list}}, $child);
394 $chatty and $logger->internal("server: server spawned child $child with ".$self->{num_children}." total children");
398 } else { # child process
400 $SIG{$_} = 'DEFAULT' for (qw/INT TERM QUIT HUP/);
402 if($self->{stderr_log}) { # only redirect when a stderr log file was configured
404 $chatty and $logger->internal("server: redirecting STDERR to " . $self->{stderr_log});
407 unless( open(STDERR, '>>', $self->{stderr_log}) ) { # 3-arg open: the path is never parsed as a mode
408 $logger->error("server: unable to open STDERR log file: " . $self->{stderr_log} . " : $!"); # open() reports its failure in $!, not $@
409 open(STDERR, '>', '/dev/null'); # send it back to /dev/null
417 OpenSRF::Transport::PeerHandle->retrieve->disconnect;
419 $logger->error("server: child process died: $@") if $@;
424 # ----------------------------------------------------------------
425 # Sends the register command to the configured routers
426 # ----------------------------------------------------------------
427 sub register_routers {
430 my $conf = OpenSRF::Utils::Config->current;
431 my $routers = $conf->bootstrap->routers;
432 my $router_name = $conf->bootstrap->router_name;
435 for my $router (@$routers) {
438 if( !$router->{services} ||
439 !$router->{services}->{service} ||
441 ref($router->{services}->{service}) eq 'ARRAY' and
442 grep { $_ eq $self->{service} } @{$router->{services}->{service}}
443 ) || $router->{services}->{service} eq $self->{service}) {
445 my $name = $router->{name};
446 my $domain = $router->{domain};
447 push(@targets, "$name\@$domain/router");
451 push(@targets, "$router_name\@$router/router");
456 $logger->info("server: registering with router $_");
457 $self->{osrf_handle}->send(
459 body => 'registering',
460 router_command => 'register',
461 router_class => $self->{service}
465 $self->{routers} = \@targets;
468 # ----------------------------------------------------------------
469 # Sends the unregister command to every router we have registered with
471 # ----------------------------------------------------------------
472 sub unregister_routers {
474 return unless $self->{osrf_handle}->tcp_connected;
476 for my $router (@{$self->{routers}}) {
477 $logger->info("server: disconnecting from router $router");
478 $self->{osrf_handle}->send(
480 body => "unregistering",
481 router_command => "unregister",
482 router_class => $self->{service}
488 package OpenSRF::Server::Child;
491 use OpenSRF::Transport;
492 use OpenSRF::Application;
493 use OpenSRF::Transport::PeerHandle;
494 use OpenSRF::Transport::SlimJabber::XMPPMessage;
495 use OpenSRF::Utils::Logger qw($logger);
496 use OpenSRF::DomainObject::oilsResponse qw/:status/;
497 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
498 use Time::HiRes qw(time usleep);
499 use POSIX qw/:sys_wait_h :errno_h/;
501 use overload '""' => sub { return '[' . shift()->{pid} . ']'; };
504 my($class, $parent) = @_;
505 my $self = bless({}, $class);
506 $self->{pid} = 0; # my process ID
507 $self->{parent} = $parent; # Controller parent process
508 $self->{num_requests} = 0; # total serviced requests
509 $self->{sig_pipe} = 0; # true if last syswrite failed
515 my $flags = fcntl($fh, F_GETFL, 0);
516 fcntl($fh, F_SETFL, $flags | O_NONBLOCK);
521 my $flags = fcntl($fh, F_GETFL, 0);
522 $flags &= ~O_NONBLOCK;
523 fcntl($fh, F_SETFL, $flags);
526 # ----------------------------------------------------------------
527 # Connects to Jabber and runs the application child_init
528 # ----------------------------------------------------------------
531 my $service = $self->{parent}->{service};
532 $0 = "OpenSRF Drone [$service]";
533 OpenSRF::Transport::PeerHandle->construct($service);
534 OpenSRF::Application->application_implementation->child_init
535 if (OpenSRF::Application->application_implementation->can('child_init'));
538 # ----------------------------------------------------------------
539 # Waits for messages from the parent process, handles the message,
540 # then goes into the keepalive loop if this is a stateful session.
541 # When max_requests is hit, the process exits.
542 # ----------------------------------------------------------------
545 my $network = OpenSRF::Transport::PeerHandle->retrieve;
547 # main child run loop. Ends when this child hits max requests.
550 my $data = $self->wait_for_request or next;
552 # Update process name to show activity
556 # Discard extraneous data from the jabber socket
557 if(!$network->flush_socket()) {
558 $logger->error("server: network disconnected! child dropping request and exiting: $data");
562 my $session = OpenSRF::Transport->handler(
563 $self->{parent}->{service},
564 OpenSRF::Transport::SlimJabber::XMPPMessage->new(xml => $data)
567 $self->keepalive_loop($session);
569 last if ++$self->{num_requests} == $self->{parent}->{max_requests};
571 # Tell the parent process we are available to process requests
574 # Repair process name
578 $chatty and $logger->internal("server: child process shutting down after reaching max_requests");
580 OpenSRF::Application->application_implementation->child_exit
581 if (OpenSRF::Application->application_implementation->can('child_exit'));
584 # ----------------------------------------------------------------
585 # Waits for request data on the parent pipe and returns it.
586 # ----------------------------------------------------------------
587 sub wait_for_request {
590 my $data = ''; # final request data
591 my $buf_size = 4096; # default linux pipe_buf (atomic window, not total size)
592 my $read_pipe = $self->{pipe_to_parent};
593 my $bytes_needed; # size of the data we are about to receive
594 my $bytes_recvd; # number of bytes read so far
595 my $first_read = 1; # true for first loop iteration
600 # wait for some data to start arriving
601 my $read_set = IO::Select->new;
602 $read_set->add($read_pipe);
605 # if can_read is interrupted while blocking,
606 # go back and wait again until it succeeds.
607 last if $read_set->can_read;
610 # parent started writing, let's start reading
611 $self->set_nonblock($read_pipe);
614 # read all of the available data
617 my $nbytes = sysread($self->{pipe_to_parent}, $buf, $buf_size);
619 unless(defined $nbytes) {
621 $logger->error("server: error reading data from parent: $!. ".
622 "bytes_needed=$bytes_needed; bytes_recvd=$bytes_recvd; data=$data");
628 last if $nbytes <= 0; # no more data available for reading
630 $bytes_recvd += $nbytes;
634 $self->set_block($self->{pipe_to_parent});
635 return undef if $read_error;
637 # extract the data size and remove the header from the final data
639 my $wps_size = OpenSRF::Server::WRITE_PIPE_DATA_SIZE;
640 $bytes_needed = int(substr($data, 0, $wps_size)) + $wps_size;
641 $data = substr($data, $wps_size);
646 if ($bytes_recvd == $bytes_needed) {
647 # we've read all the data. Nothing left to do
651 $logger->info("server: child process read all available pipe data. ".
652 "waiting for more data from parent. bytes_needed=$bytes_needed; bytes_recvd=$bytes_recvd");
659 # ----------------------------------------------------------------
660 # If this is a stateful opensrf session, wait up to $keepalive
661 # seconds for subsequent requests from the client
662 # ----------------------------------------------------------------
664 my($self, $session) = @_;
665 my $keepalive = $self->{parent}->{keepalive};
667 while($session->state and $session->state == $session->CONNECTED) {
669 unless( $session->queue_wait($keepalive) ) {
671 # client failed to disconnect before timeout
672 $logger->info("server: no request was received in $keepalive seconds, exiting stateful session");
674 my $res = OpenSRF::DomainObject::oilsConnectStatus->new(
675 status => "Disconnected on timeout",
676 statusCode => STATUS_TIMEOUT
679 $session->status($res);
680 $session->state($session->DISCONNECTED);
685 $chatty and $logger->internal("server: child done with request(s)");
689 # ----------------------------------------------------------------
690 # Report our availability to our parent process
691 # ----------------------------------------------------------------
697 $self->{sig_pipe} = 0;
698 local $SIG{'PIPE'} = sub { $self->{sig_pipe} = 1; };
701 $self->{pipe_to_parent},
702 sprintf("%*s", OpenSRF::Server::STATUS_PIPE_DATA_SIZE, $self->{pid})
705 last unless $self->{sig_pipe};
706 $logger->error("server: $self got SIGPIPE writing status to parent, retrying...");
707 usleep(50000); # 50 msec
710 $logger->error("server: $self unable to send status to parent") if $self->{sig_pipe};