]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/perl/lib/OpenSRF/Server.pm
JSON_v0 has been superseded, and it has insidious bugs anyway.
[OpenSRF.git] / src / perl / lib / OpenSRF / Server.pm
1 # ----------------------------------------------------------------
2 # Copyright (C) 2010 Equinox Software, Inc.
3 # Bill Erickson <erickson@esilibrary.com>
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 # ----------------------------------------------------------------
15 package OpenSRF::Server;
16 use strict;
17 use warnings;
18 use OpenSRF::Transport;
19 use OpenSRF::Application;
20 use OpenSRF::Utils::Config;
21 use OpenSRF::Transport::PeerHandle;
22 use OpenSRF::Utils::SettingsClient;
23 use OpenSRF::Utils::Logger qw($logger);
24 use OpenSRF::Transport::SlimJabber::Client;
25 use Encode;
26 use POSIX qw/:sys_wait_h :errno_h/;
27 use Fcntl qw(:flock F_GETFL F_SETFL O_NONBLOCK);
28 use Time::HiRes qw/usleep/;
29 use IO::Select;
30 use Socket;
31 our $chatty = 1; # disable for production
32
33 use constant STATUS_PIPE_DATA_SIZE => 12;
34
35 sub new {
36     my($class, $service, %args) = @_;
37     my $self = bless(\%args, $class);
38
39     $self->{service}        = $service; # service name
40     $self->{num_children}   = 0; # number of child processes
41     $self->{osrf_handle}    = undef; # xmpp handle
42     $self->{routers}        = []; # list of registered routers
43     $self->{active_list}    = []; # list of active children
44     $self->{idle_list}      = []; # list of idle children
45     $self->{pid_map}        = {}; # map of child pid to child for cleaner access
46     $self->{sig_pipe}       = 0;  # true if last syswrite failed
47
48     $self->{stderr_log} = $self->{stderr_log_path} . "/${service}_stderr.log" 
49         if $self->{stderr_log_path};
50
51     $self->{lock_file} = 
52         sprintf("%s/%s_$$.lock", $self->{lock_file_path}, $self->{service});
53
54     $self->{min_spare_children} ||= 0;
55
56     $self->{max_spare_children} = $self->{min_spare_children} + 1 if
57         $self->{max_spare_children} and
58         $self->{max_spare_children} <= $self->{min_spare_children};
59
60     return $self;
61 }
62
63 # ----------------------------------------------------------------
64 # Disconnects from routers and waits for child processes to exit.
65 # ----------------------------------------------------------------
66 sub cleanup {
67     my $self = shift;
68     my $no_exit = shift;
69
70     $logger->info("server: shutting down and cleaning up...");
71
72     # don't get sidetracked by signals while we're cleaning up.
73     # it could result in unexpected behavior with list traversal
74     $SIG{CHLD} = 'IGNORE';
75
76     # terminate the child processes
77     $self->kill_child($_) for
78         (@{$self->{idle_list}}, @{$self->{active_list}});
79
80     # de-register routers
81     $self->unregister_routers;
82
83     $self->{osrf_handle}->disconnect;
84
85     # clean up our dead children
86     $self->reap_children(1);
87
88     # clean up the lock file
89     close($self->{lock_file_handle});
90     unlink($self->{lock_file});
91
92     exit(0) unless $no_exit;
93 }
94
95 sub open_lock_file {
96     my $self = shift;
97     open($self->{lock_file_handle}, '>>', $self->{lock_file})
98         or die "server: cannot open lock file ".$self->{lock_file} ." : $!\n";
99 }
100
101 # ----------------------------------------------------------------
102 # Waits on the jabber socket for inbound data from the router.
103 # Each new message is passed off to a child process for handling.
104 # At regular intervals, wake up for min/max spare child maintenance
105 # ----------------------------------------------------------------
106 sub run {
107     my $self = shift;
108
109         $logger->set_service($self->{service});
110
111     $SIG{$_} = sub { $self->cleanup; } for (qw/INT TERM QUIT/);
112     $SIG{CHLD} = sub { $self->reap_children(); };
113
114     $self->spawn_children;
115     $self->build_osrf_handle;
116     $self->register_routers;
117     $self->open_lock_file;
118     my $wait_time = 1;
119
120     # main server loop
121     while(1) {
122
123         $self->check_status;
124         $self->{child_died} = 0;
125
126         my $msg = $self->{osrf_handle}->process($wait_time);
127
128         # we woke up for any reason, reset the wait time to allow
129         # for idle maintenance as necessary
130         $wait_time = 1;
131
132         if($msg) {
133
134             if(my $child = pop(@{$self->{idle_list}})) {
135
136                 # we have an idle child to handle the request
137                 $chatty and $logger->internal("server: passing request to idle child $child");
138                 push(@{$self->{active_list}}, $child);
139                 $self->write_child($child, $msg);
140
141             } elsif($self->{num_children} < $self->{max_children}) {
142
143                 # spawning a child to handle the request
144                 $chatty and $logger->internal("server: spawning child to handle request");
145                 $self->write_child($self->spawn_child(1), $msg);
146
147             } else {
148                 $logger->warn("server: no children available, waiting... consider increasing " .
149                     "max_children for this application higher than $self->{max_children} ".
150                     "in the OpenSRF configuration if this message occurs frequently");
151                 $self->check_status(1); # block until child is available
152
153                 my $child = pop(@{$self->{idle_list}});
154                 push(@{$self->{active_list}}, $child);
155                 $self->write_child($child, $msg);
156             }
157
158         } else {
159
160             # don't perform idle maint immediately when woken by SIGCHLD
161             unless($self->{child_died}) {
162
163                 # when we hit equilibrium, there's no need for regular
164                 # maintenance, so set wait_time to 'forever'
165                 $wait_time = -1 if 
166                     !$self->perform_idle_maintenance and # no maintenance performed this time
167                     @{$self->{active_list}} == 0; # no active children 
168             }
169         }
170     }
171 }
172
173 # ----------------------------------------------------------------
174 # Launch a new spare child or kill an extra spare child.  To
175 # prevent large-scale spawning or die-offs, spawn or kill only
176 # 1 process per idle maintenance loop.
177 # Returns true if any idle maintenance occurred, 0 otherwise
178 # ----------------------------------------------------------------
179 sub perform_idle_maintenance {
180     my $self = shift;
181
182     $chatty and $logger->internal(
183         sprintf(
184             "server: %d idle, %d active, %d min_spare, %d max_spare in idle maintenance",
185             scalar(@{$self->{idle_list}}), 
186             scalar(@{$self->{active_list}}),
187             $self->{min_spare_children},
188             $self->{max_spare_children}
189         )
190     );
191
192     # spawn 1 spare child per maintenance loop if necessary
193     if( $self->{min_spare_children} and
194         $self->{num_children} < $self->{max_children} and
195         scalar(@{$self->{idle_list}}) < $self->{min_spare_children} ) {
196
197         $chatty and $logger->internal("server: spawning spare child");
198         $self->spawn_child;
199         return 1;
200
201     # kill 1 excess spare child per maintenance loop if necessary
202     } elsif($self->{max_spare_children} and
203             $self->{num_children} > $self->{min_children} and
204             scalar(@{$self->{idle_list}}) > $self->{max_spare_children} ) {
205
206         $chatty and $logger->internal("server: killing spare child");
207         $self->kill_child;
208         return 1;
209     }
210
211     return 0;
212 }
213
214 sub kill_child {
215     my $self = shift;
216     my $child = shift || pop(@{$self->{idle_list}}) or return;
217     $chatty and $logger->internal("server: killing child $child");
218     kill('TERM', $child->{pid});
219 }
220
221 # ----------------------------------------------------------------
222 # Jabber connection inbound message arrive on.
223 # ----------------------------------------------------------------
224 sub build_osrf_handle {
225     my $self = shift;
226
227     my $conf = OpenSRF::Utils::Config->current;
228     my $username = $conf->bootstrap->username;
229     my $password = $conf->bootstrap->passwd;
230     my $domain = $conf->bootstrap->domain;
231     my $port = $conf->bootstrap->port;
232     my $resource = $self->{service} . '_listener_' . $conf->env->hostname;
233
234     $logger->debug("server: inbound connecting as $username\@$domain/$resource on port $port");
235
236     $self->{osrf_handle} =
237         OpenSRF::Transport::SlimJabber::Client->new(
238             username => $username,
239             resource => $resource,
240             password => $password,
241             host => $domain,
242             port => $port,
243         );
244
245     $self->{osrf_handle}->initialize;
246 }
247
248
249 # ----------------------------------------------------------------
250 # Sends request data to a child process
251 # ----------------------------------------------------------------
252 sub write_child {
253     my($self, $child, $msg) = @_;
254     my $xml = encode_utf8(decode_utf8($msg->to_xml));
255
256     flock($self->{lock_file_handle}, LOCK_EX) or 
257         $logger->error("server: cannot flock : $!");
258
259     for (0..2) {
260
261         $self->{sig_pipe} = 0;
262         local $SIG{'PIPE'} = sub { $self->{sig_pipe} = 1; };
263
264         # send message to child data pipe
265         syswrite($child->{pipe_to_child}, $xml);
266
267         last unless $self->{sig_pipe};
268         $logger->error("server: got SIGPIPE writing to $child, retrying...");
269         usleep(50000); # 50 msec
270     }
271
272     flock($self->{lock_file_handle}, LOCK_UN) or 
273         $logger->error("server: cannot de-flock : $!");
274
275     $logger->error("server: unable to send request message to child $child") if $self->{sig_pipe};
276 }
277
278 # ----------------------------------------------------------------
279 # Checks to see if any child process has reported its availability
280 # In blocking mode, blocks until a child has reported.
281 # ----------------------------------------------------------------
282 sub check_status {
283     my($self, $block) = @_;
284
285     return unless @{$self->{active_list}};
286
287     my $read_set = IO::Select->new;
288     $read_set->add($_->{pipe_to_child}) for @{$self->{active_list}};
289
290     my @pids;
291
292     while (1) {
293
294         # if can_read or sysread is interrupted while bloking, go back and 
295         # wait again until we have at least 1 free child
296
297         if(my @handles = $read_set->can_read(($block) ? undef : 0)) {
298             my $pid = '';
299             for my $pipe (@handles) {
300                 sysread($pipe, $pid, STATUS_PIPE_DATA_SIZE) or next;
301                 push(@pids, int($pid));
302             }
303         }
304
305         last unless $block and !@pids;
306     }
307
308     return unless @pids;
309
310     $chatty and $logger->internal("server: ".scalar(@pids)." children reporting for duty: (@pids)");
311
312     my $child;
313     my @new_actives;
314
315     # move the children from the active list to the idle list
316     for my $proc (@{$self->{active_list}}) {
317         if(grep { $_ == $proc->{pid} } @pids) {
318             push(@{$self->{idle_list}}, $proc);
319         } else {
320             push(@new_actives, $proc);
321         }
322     }
323
324     $self->{active_list} = [@new_actives];
325
326     $chatty and $logger->internal(sprintf(
327         "server: %d idle and %d active children after status update",
328             scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
329 }
330
331 # ----------------------------------------------------------------
332 # Cleans up any child processes that have exited.
333 # In shutdown mode, block until all children have washed ashore
334 # ----------------------------------------------------------------
335 sub reap_children {
336     my($self, $shutdown) = @_;
337     $self->{child_died} = 1;
338
339     while(1) {
340
341         my $pid = waitpid(-1, ($shutdown) ? 0 : WNOHANG);
342         last if $pid <= 0;
343
344         $chatty and $logger->internal("server: reaping child $pid");
345
346         my $child = $self->{pid_map}->{$pid};
347
348         close($child->{pipe_to_parent});
349         close($child->{pipe_to_child});
350
351         $self->{active_list} = [ grep { $_->{pid} != $pid } @{$self->{active_list}} ];
352         $self->{idle_list} = [ grep { $_->{pid} != $pid } @{$self->{idle_list}} ];
353
354         $self->{num_children}--;
355         delete $self->{pid_map}->{$pid};
356         delete $child->{$_} for keys %$child; # destroy with a vengeance
357     }
358
359     $self->spawn_children unless $shutdown;
360
361     $chatty and $logger->internal(sprintf(
362         "server: %d idle and %d active children after reap_children",
363             scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
364
365 }
366
367 # ----------------------------------------------------------------
368 # Spawn up to max_children processes
369 # ----------------------------------------------------------------
370 sub spawn_children {
371     my $self = shift;
372     $self->spawn_child while $self->{num_children} < $self->{min_children};
373 }
374
375 # ----------------------------------------------------------------
376 # Spawns a new child.  If $active is set, the child goes directly
377 # into the active_list.
378 # ----------------------------------------------------------------
379 sub spawn_child {
380     my($self, $active) = @_;
381
382     my $child = OpenSRF::Server::Child->new($self);
383
384     # socket for sending message data to the child
385     if(!socketpair(
386         $child->{pipe_to_child},
387         $child->{pipe_to_parent},
388         AF_UNIX, SOCK_STREAM, PF_UNSPEC)) {
389             $logger->error("server: error creating data socketpair: $!");
390             return undef;
391     }
392
393     $child->{pipe_to_child}->autoflush(1);
394     $child->{pipe_to_parent}->autoflush(1);
395
396     $child->{pid} = fork();
397
398     if($child->{pid}) { # parent process
399         $self->{num_children}++;
400         $self->{pid_map}->{$child->{pid}} = $child;
401
402         if($active) {
403             push(@{$self->{active_list}}, $child);
404         } else {
405             push(@{$self->{idle_list}}, $child);
406         }
407
408         $chatty and $logger->internal("server: server spawned child $child with ".$self->{num_children}." total children");
409
410         return $child;
411
412     } else { # child process
413
414         $SIG{$_} = 'DEFAULT' for (qw/INT TERM QUIT HUP/);
415
416         if($self->{stderr_log}) {
417
418             $chatty and $logger->internal("server: redirecting STDERR to " . $self->{stderr_log});
419
420             close STDERR;
421             unless( open(STDERR, '>>' . $self->{stderr_log}) ) {
422                 $logger->error("server: unable to open STDERR log file: " . $self->{stderr_log} . " : $@");
423                 open STDERR, '>/dev/null'; # send it back to /dev/null
424             }
425         }
426
427         $child->{pid} = $$;
428         eval {
429             $child->init;
430             $child->run;
431             OpenSRF::Transport::PeerHandle->retrieve->disconnect;
432         };
433         $logger->error("server: child process died: $@") if $@;
434         exit(0);
435     }
436 }
437
438 # ----------------------------------------------------------------
439 # Sends the register command to the configured routers
440 # ----------------------------------------------------------------
441 sub register_routers {
442     my $self = shift;
443
444     my $conf = OpenSRF::Utils::Config->current;
445     my $routers = $conf->bootstrap->routers;
446     my $router_name = $conf->bootstrap->router_name;
447     my @targets;
448
449     for my $router (@$routers) {
450         if(ref $router) {
451
452             if( !$router->{services} ||
453                 !$router->{services}->{service} ||
454                 (
455                     ref($router->{services}->{service}) eq 'ARRAY' and
456                     grep { $_ eq $self->{service} } @{$router->{services}->{service}}
457                 )  || $router->{services}->{service} eq $self->{service}) {
458
459                 my $name = $router->{name};
460                 my $domain = $router->{domain};
461                 push(@targets, "$name\@$domain/router");
462             }
463
464         } else {
465             push(@targets, "$router_name\@$router/router");
466         }
467     }
468
469     foreach (@targets) {
470         $logger->info("server: registering with router $_");
471         $self->{osrf_handle}->send(
472             to => $_,
473             body => 'registering',
474             router_command => 'register',
475             router_class => $self->{service}
476         );
477     }
478
479     $self->{routers} = \@targets;
480 }
481
482 # ----------------------------------------------------------------
483 # Sends the unregister command to any routers we have registered
484 # with.
485 # ----------------------------------------------------------------
486 sub unregister_routers {
487     my $self = shift;
488     return unless $self->{osrf_handle}->tcp_connected;
489
490         for my $router (@{$self->{routers}}) {
491         $logger->info("server: disconnecting from router $router");
492         $self->{osrf_handle}->send(
493             to => $router,
494             body => "unregistering",
495             router_command => "unregister",
496             router_class => $self->{service}
497         );
498     }
499 }
500
501
502 package OpenSRF::Server::Child;
503 use strict;
504 use warnings;
505 use OpenSRF::Transport;
506 use OpenSRF::Application;
507 use OpenSRF::Transport::PeerHandle;
508 use OpenSRF::Transport::SlimJabber::XMPPMessage;
509 use OpenSRF::Utils::Logger qw($logger);
510 use OpenSRF::DomainObject::oilsResponse qw/:status/;
511 use Fcntl qw(:flock F_GETFL F_SETFL O_NONBLOCK);
512 use Time::HiRes qw(time usleep);
513 use POSIX qw/:sys_wait_h :errno_h/;
514
515 use overload '""' => sub { return '[' . shift()->{pid} . ']'; };
516
517 sub new {
518     my($class, $parent) = @_;
519     my $self = bless({}, $class);
520     $self->{pid} = 0; # my process ID
521     $self->{parent} = $parent; # Controller parent process
522     $self->{num_requests} = 0; # total serviced requests
523     $self->{sig_pipe} = 0;  # true if last syswrite failed
524     return $self;
525 }
526
527 sub set_nonblock {
528     my($self, $fh) = @_;
529     my  $flags = fcntl($fh, F_GETFL, 0);
530     fcntl($fh, F_SETFL, $flags | O_NONBLOCK);
531 }
532
533 sub set_block {
534     my($self, $fh) = @_;
535     my  $flags = fcntl($fh, F_GETFL, 0);
536     $flags &= ~O_NONBLOCK;
537     fcntl($fh, F_SETFL, $flags);
538 }
539
540 sub open_lock_file {
541     my $self = shift;
542
543     # close our copy of the parent's lock file since we won't be using it
544     close($self->{parent}->{lock_file_handle}) 
545         if $self->{parent}->{lock_file_handle};
546
547     my $fname = $self->{parent}->{lock_file};
548     unless (open($self->{lock_file_handle}, '>>', $fname)) {
549         $logger->error("server: child cannot open lock file $fname : $!");
550         die "server: child cannot open lock file $fname : $!\n";
551     }
552 }
553
554 # ----------------------------------------------------------------
555 # Connects to Jabber and runs the application child_init
556 # ----------------------------------------------------------------
557 sub init {
558     my $self = shift;
559     $self->open_lock_file;
560     my $service = $self->{parent}->{service};
561     $0 = "OpenSRF Drone [$service]";
562     OpenSRF::Transport::PeerHandle->construct($service);
563         OpenSRF::Application->application_implementation->child_init
564                 if (OpenSRF::Application->application_implementation->can('child_init'));
565 }
566
567 # ----------------------------------------------------------------
568 # Waits for messages from the parent process, handles the message,
569 # then goes into the keepalive loop if this is a stateful session.
570 # When max_requests is hit, the process exits.
571 # ----------------------------------------------------------------
572 sub run {
573     my $self = shift;
574     my $network = OpenSRF::Transport::PeerHandle->retrieve;
575
576     # main child run loop.  Ends when this child hits max requests.
577     while(1) {
578
579         my $data = $self->wait_for_request or next;
580
581         # Update process name to show activity
582         my $orig_name = $0;
583         $0 = "$0*";
584
585         # Discard extraneous data from the jabber socket
586         if(!$network->flush_socket()) {
587             $logger->error("server: network disconnected!  child dropping request and exiting: $data");
588             exit;
589         }
590
591         my $session = OpenSRF::Transport->handler(
592             $self->{parent}->{service},
593             OpenSRF::Transport::SlimJabber::XMPPMessage->new(xml => $data)
594         );
595
596         $self->keepalive_loop($session);
597
598         last if ++$self->{num_requests} == $self->{parent}->{max_requests};
599
600         # Tell the parent process we are available to process requests
601         $self->send_status;
602
603         # Repair process name
604         $0 = $orig_name;
605     }
606
607     $chatty and $logger->internal("server: child process shutting down after reaching max_requests");
608
609         OpenSRF::Application->application_implementation->child_exit
610                 if (OpenSRF::Application->application_implementation->can('child_exit'));
611 }
612
613 # ----------------------------------------------------------------
614 # waits for a request data on the parent pipe and returns it.
615 # ----------------------------------------------------------------
616 sub wait_for_request {
617     my $self = shift;
618
619     my $data = '';
620     my $read_size = 1024;
621     my $nonblock = 0;
622     my $read_pipe = $self->{pipe_to_parent};
623
624     # wait for some data to start arriving
625     my $read_set = IO::Select->new;
626     $read_set->add($read_pipe);
627
628     while (1) {
629         # if can_read is interrupted while blocking, 
630         # go back and wait again until it it succeeds.
631         last if $read_set->can_read;
632     }
633
634     # Parent has started sending data to us.
635     # Wait for the parent to write all data to the pipe.  Then, immediately release 
636     # the lock so the parent can start writing data to other child processes.  
637     # Note: there is no danger of a subsequent message coming from the parent on 
638     # the same pipe, since this child is now marked as active.
639     flock($self->{lock_file_handle}, LOCK_EX) or $logger->error("server: cannot flock : $!");
640     flock($self->{lock_file_handle}, LOCK_UN) or $logger->error("server: cannot de-flock : $!");
641
642     # we have all data now so all reads can be done in non-blocking mode
643     $self->set_nonblock($read_pipe);
644
645     while(1) {
646         my $sig_pipe = 0;
647         local $SIG{'PIPE'} = sub { $sig_pipe = 1 };
648
649         my $buf = '';
650         my $n = sysread($self->{pipe_to_parent}, $buf, $read_size);
651
652         unless(defined $n) {
653
654             if ($sig_pipe) {
655                 $logger->info("server: $self got SIGPIPE reading data from parent, retrying...");
656                 usleep(50000); # 50 msec
657                 next;
658             }
659
660             $logger->error("server: error reading data pipe: $!") unless EAGAIN == $!; 
661             last;
662         }
663
664         last if $n <= 0; # no data left to read
665
666         $data .= $buf;
667
668         last if $n < $read_size; # done reading all data
669     }
670
671     # return to blocking mode
672     $self->set_block($self->{pipe_to_parent});
673     return $data;
674 }
675
676
677 # ----------------------------------------------------------------
678 # If this is a stateful opensrf session, wait up to $keepalive
679 # seconds for subsequent requests from the client
680 # ----------------------------------------------------------------
681 sub keepalive_loop {
682     my($self, $session) = @_;
683     my $keepalive = $self->{parent}->{keepalive};
684
685     while($session->state and $session->state == $session->CONNECTED) {
686
687         unless( $session->queue_wait($keepalive) ) {
688
689             # client failed to disconnect before timeout
690             $logger->info("server: no request was received in $keepalive seconds, exiting stateful session");
691
692             my $res = OpenSRF::DomainObject::oilsConnectStatus->new(
693                 status => "Disconnected on timeout",
694                 statusCode => STATUS_TIMEOUT
695             );
696
697             $session->status($res);
698             $session->state($session->DISCONNECTED);
699             last;
700         }
701     }
702
703     $chatty and $logger->internal("server: child done with request(s)");
704     $session->kill_me;
705 }
706
707 # ----------------------------------------------------------------
708 # Report our availability to our parent process
709 # ----------------------------------------------------------------
710 sub send_status {
711     my $self = shift;
712
713     for (0..2) {
714
715         $self->{sig_pipe} = 0;
716         local $SIG{'PIPE'} = sub { $self->{sig_pipe} = 1; };
717
718         syswrite(
719             $self->{pipe_to_parent},
720             sprintf("%*s", OpenSRF::Server::STATUS_PIPE_DATA_SIZE, $self->{pid})
721         );
722
723         last unless $self->{sig_pipe};
724         $logger->error("server: $self got SIGPIPE writing status to parent, retrying...");
725         usleep(50000); # 50 msec
726     }
727
728     $logger->error("server: $self unable to send status to parent") if $self->{sig_pipe};
729 }
730
731
732 1;