]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/perl/lib/OpenSRF/Server.pm
Replace Net::Server with local pre-forking server
[OpenSRF.git] / src / perl / lib / OpenSRF / Server.pm
1 # ----------------------------------------------------------------
2 # Copyright (C) 2010 Equinox Software, Inc.
3 # Bill Erickson <erickson@esilibrary.com>
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 # ----------------------------------------------------------------
15 package OpenSRF::Server;
16 use strict;
17 use warnings;
18 use OpenSRF::Transport;
19 use OpenSRF::Application;
20 use OpenSRF::Utils::Config;
21 use OpenSRF::Transport::PeerHandle;
22 use OpenSRF::Utils::SettingsClient;
23 use OpenSRF::Utils::Logger qw($logger);
24 use OpenSRF::Transport::SlimJabber::Client;
25 use POSIX qw/:sys_wait_h :errno_h/;
26 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
27 use IO::Select;
28 use Socket;
29 our $chatty = 1; # disable for production
30
31 use constant STATUS_PIPE_DATA_SIZE => 12;
32
33 sub new {
34     my($class, $service, %args) = @_;
35     my $self = bless(\%args, $class);
36
37     $self->{service}         = $service; # service name
38     $self->{num_children}    = 0; # number of child processes
39     $self->{osrf_handle}     = undef; # xmpp handle
40     $self->{routers}         = []; # list of registered routers
41     $self->{active_list}     = []; # list of active children
42     $self->{idle_list}       = []; # list of idle children
43
44     $self->{min_spare_children} ||= 0;
45
46     $self->{max_spare_children} = $self->{min_spare_children} + 1 if
47         $self->{max_spare_children} and
48         $self->{max_spare_children} <= $self->{min_spare_children};
49
50     return $self;
51 }
52
53 # ----------------------------------------------------------------
54 # Disconnects from routers and waits for child processes to exit.
55 # ----------------------------------------------------------------
56 sub cleanup {
57     my $self = shift;
58     my $no_exit = shift;
59
60     $logger->info("server: shutting down and cleaning up...");
61
62     # don't get sidetracked by signals while we're cleaning up.
63     # it could result in unexpected behavior with list traversal
64     $SIG{CHLD} = 'IGNORE';
65
66     # terminate the child processes
67     $self->kill_child($_) for
68         (@{$self->{idle_list}}, @{$self->{active_list}});
69
70     # de-register routers
71     $self->unregister_routers;
72
73     $self->{osrf_handle}->disconnect;
74
75     # clean up our dead children
76     $self->reap_children(1);
77
78     exit(0) unless $no_exit;
79 }
80
81
82 # ----------------------------------------------------------------
83 # Waits on the jabber socket for inbound data from the router.
84 # Each new message is passed off to a child process for handling.
85 # At regular intervals, wake up for min/max spare child maintenance
86 # ----------------------------------------------------------------
87 sub run {
88     my $self = shift;
89
90         $logger->set_service($self->{service});
91
92     $SIG{$_} = sub { $self->cleanup; } for (qw/INT TERM QUIT/);
93     $SIG{CHLD} = sub { $self->reap_children(); };
94
95     $self->spawn_children;
96     $self->build_osrf_handle;
97     $self->register_routers;
98     my $wait_time = 1;
99
100     # main server loop
101     while(1) {
102
103         $self->check_status;
104         $self->{child_died} = 0;
105
106         my $msg = $self->{osrf_handle}->process($wait_time);
107
108         # we woke up for any reason, reset the wait time to allow
109         # for idle maintenance as necessary
110         $wait_time = 1;
111
112         if($msg) {
113
114             if(my $child = pop(@{$self->{idle_list}})) {
115
116                 # we have an idle child to handle the request
117                 $chatty and $logger->internal("server: passing request to idle child $child");
118                 push(@{$self->{active_list}}, $child);
119                 $self->write_child($child, $msg);
120
121             } elsif($self->{num_children} < $self->{max_children}) {
122
123                 # spawning a child to handle the request
124                 $chatty and $logger->internal("server: spawning child to handle request");
125                 $self->write_child($self->spawn_child(1), $msg);
126
127             } else {
128
129                 $logger->warn("server: no children available, waiting...");
130                 $self->check_status(1); # block until child is available
131
132                 my $child = pop(@{$self->{idle_list}});
133                 push(@{$self->{active_list}}, $child);
134                 $self->write_child($child, $msg);
135             }
136
137         } else {
138
139             # don't perform idle maint immediately when woken by SIGCHLD
140             unless($self->{child_died}) {
141
142                 # when we hit equilibrium, there's no need for regular
143                 # maintenance, so set wait_time to 'forever'
144                 $wait_time = -1 unless $self->perform_idle_maintenance;
145             }
146         }
147     }
148 }
149
150 # ----------------------------------------------------------------
151 # Launch a new spare child or kill an extra spare child.  To
152 # prevent large-scale spawning or die-offs, spawn or kill only
153 # 1 process per idle maintenance loop.
154 # Returns true if any idle maintenance occurred, 0 otherwise
155 # ----------------------------------------------------------------
156 sub perform_idle_maintenance {
157     my $self = shift;
158
159     # spawn 1 spare child per maintenance loop if necessary
160     if( $self->{min_spare_children} and
161         $self->{num_children} < $self->{max_children} and
162         scalar(@{$self->{idle_list}}) < $self->{min_spare_children} ) {
163
164         $chatty and $logger->internal("server: spawning spare child");
165         $self->spawn_child;
166         return 1;
167
168     # kill 1 excess spare child per maintenance loop if necessary
169     } elsif($self->{max_spare_children} and
170             $self->{num_children} > $self->{min_children} and
171             scalar(@{$self->{idle_list}}) > $self->{max_spare_children} ) {
172
173         $chatty and $logger->internal("server: killing spare child");
174         $self->kill_child;
175         return 1;
176     }
177
178     return 0;
179 }
180
181 sub kill_child {
182     my $self = shift;
183     my $child = shift || pop(@{$self->{idle_list}}) or return;
184     $chatty and $logger->internal("server: killing child $child");
185     kill('TERM', $child->{pid});
186 }
187
188 # ----------------------------------------------------------------
189 # Jabber connection inbound message arrive on.
190 # ----------------------------------------------------------------
191 sub build_osrf_handle {
192     my $self = shift;
193
194     my $conf = OpenSRF::Utils::Config->current;
195     my $username = $conf->bootstrap->username;
196     my $password = $conf->bootstrap->passwd;
197     my $domain = $conf->bootstrap->domain;
198     my $port = $conf->bootstrap->port;
199     my $resource = $self->{service} . '_listener_' . $conf->env->hostname;
200
201     $logger->debug("server: inbound connecting as $username\@$domain/$resource on port $port");
202
203     $self->{osrf_handle} =
204         OpenSRF::Transport::SlimJabber::Client->new(
205             username => $username,
206             resource => $resource,
207             password => $password,
208             host => $domain,
209             port => $port,
210         );
211
212     $self->{osrf_handle}->initialize;
213 }
214
215
216 # ----------------------------------------------------------------
217 # Sends request data to a child process
218 # ----------------------------------------------------------------
219 sub write_child {
220     my($self, $child, $msg) = @_;
221     my $xml = $msg->to_xml;
222     syswrite($child->{pipe_to_child}, $xml);
223 }
224
225 # ----------------------------------------------------------------
226 # Checks to see if any child process has reported its availability
227 # In blocking mode, blocks until a child has reported.
228 # ----------------------------------------------------------------
229 sub check_status {
230     my($self, $block) = @_;
231
232     my $read_set = IO::Select->new;
233     $read_set->add($_->{pipe_to_child}) for @{$self->{active_list}};
234
235     my @handles = $read_set->can_read(($block) ? undef : 0) or return;
236
237     my $pid = '';
238     my @pids;
239     for my $pipe (@handles) {
240         sysread($pipe, $pid, STATUS_PIPE_DATA_SIZE) or next;
241         push(@pids, int($pid));
242     }
243
244     $chatty and $logger->internal("server: ".scalar(@pids)." children reporting for duty: (@pids)");
245
246     my $child;
247     my @new_actives;
248
249     # move the children from the active list to the idle list
250     for my $proc (@{$self->{active_list}}) {
251         if(grep { $_ == $proc->{pid} } @pids) {
252             push(@{$self->{idle_list}}, $proc);
253         } else {
254             push(@new_actives, $proc);
255         }
256     }
257
258     $self->{active_list} = [@new_actives];
259
260     $chatty and $logger->internal(sprintf(
261         "server: %d idle and %d active children after status update",
262             scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
263 }
264
265 # ----------------------------------------------------------------
266 # Cleans up any child processes that have exited.
267 # In shutdown mode, block until all children have washed ashore
268 # ----------------------------------------------------------------
269 sub reap_children {
270     my($self, $shutdown) = @_;
271     $self->{child_died} = 1;
272
273     while(1) {
274
275         my $pid = waitpid(-1, ($shutdown) ? 0 : WNOHANG);
276         return if $pid <= 0;
277
278         $chatty and $logger->internal("server: reaping child $pid");
279
280         my ($child) = grep {$_->{pid} == $pid} (@{$self->{active_list}}, @{$self->{idle_list}});
281
282         close($child->{pipe_to_parent});
283         close($child->{pipe_to_child});
284         delete $child->{$_} for keys %$child; # destroy with a vengeance
285
286         $self->{num_children}--;
287         $self->{active_list} = [ grep { $_->{pid} != $pid } @{$self->{active_list}} ];
288         $self->{idle_list} = [ grep { $_->{pid} != $pid } @{$self->{idle_list}} ];
289     }
290
291     $self->spawn_children unless $shutdown;
292
293     $chatty and $logger->internal(sprintf(
294         "server: %d idle and %d active children after reap_children",
295             scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
296
297 }
298
299 # ----------------------------------------------------------------
300 # Spawn up to max_children processes
301 # ----------------------------------------------------------------
302 sub spawn_children {
303     my $self = shift;
304     $self->spawn_child while $self->{num_children} < $self->{min_children};
305 }
306
307 # ----------------------------------------------------------------
308 # Spawns a new child.  If $active is set, the child goes directly
309 # into the active_list.
310 # ----------------------------------------------------------------
311 sub spawn_child {
312     my($self, $active) = @_;
313
314     my $child = OpenSRF::Server::Child->new($self);
315
316     # socket for sending message data to the child
317     if(!socketpair(
318         $child->{pipe_to_child},
319         $child->{pipe_to_parent},
320         AF_UNIX, SOCK_STREAM, PF_UNSPEC)) {
321             $logger->error("server: error creating data socketpair: $!");
322             return undef;
323     }
324
325     $child->{pipe_to_child}->autoflush(1);
326     $child->{pipe_to_parent}->autoflush(1);
327
328     $child->{pid} = fork();
329
330     if($child->{pid}) { # parent process
331         $self->{num_children}++;
332
333
334         if($active) {
335             push(@{$self->{active_list}}, $child);
336         } else {
337             push(@{$self->{idle_list}}, $child);
338         }
339
340         $chatty and $logger->internal("server: server spawned child $child with ".$self->{num_children}." total children");
341
342         return $child;
343
344     } else { # child process
345
346         $SIG{$_} = 'DEFAULT' for (qw/INT TERM QUIT HUP/);
347
348         $child->{pid} = $$;
349         eval {
350             $child->init;
351             $child->run;
352             OpenSRF::Transport::PeerHandle->retrieve->disconnect;
353         };
354         $logger->error("server: child process died: $@") if $@;
355         exit(0);
356     }
357 }
358
359 # ----------------------------------------------------------------
360 # Sends the register command to the configured routers
361 # ----------------------------------------------------------------
362 sub register_routers {
363     my $self = shift;
364
365     my $conf = OpenSRF::Utils::Config->current;
366     my $routers = $conf->bootstrap->routers;
367     my $router_name = $conf->bootstrap->router_name;
368     my @targets;
369
370     for my $router (@$routers) {
371         if(ref $router) {
372
373             if( !$router->{services} ||
374                 !$router->{services}->{service} ||
375                 (
376                     ref($router->{services}->{service}) eq 'ARRAY' and
377                     grep { $_ eq $self->{service} } @{$router->{services}->{service}}
378                 )  || $router->{services}->{service} eq $self->{service}) {
379
380                 my $name = $router->{name};
381                 my $domain = $router->{domain};
382                 push(@targets, "$name\@$domain/router");
383             }
384
385         } else {
386             push(@targets, "$router_name\@$router/router");
387         }
388     }
389
390     foreach (@targets) {
391         $logger->info("server: registering with router $_");
392         $self->{osrf_handle}->send(
393             to => $_,
394             body => 'registering',
395             router_command => 'register',
396             router_class => $self->{service}
397         );
398     }
399
400     $self->{routers} = \@targets;
401 }
402
403 # ----------------------------------------------------------------
404 # Sends the unregister command to any routers we have registered
405 # with.
406 # ----------------------------------------------------------------
407 sub unregister_routers {
408     my $self = shift;
409     return unless $self->{osrf_handle}->tcp_connected;
410
411         for my $router (@{$self->{routers}}) {
412         $logger->info("server: disconnecting from router $router");
413         $self->{osrf_handle}->send(
414             to => $router,
415             body => "unregistering",
416             router_command => "unregister",
417             router_class => $self->{service}
418         );
419     }
420 }
421
422
423 package OpenSRF::Server::Child;
424 use strict;
425 use warnings;
426 use OpenSRF::Transport;
427 use OpenSRF::Application;
428 use OpenSRF::Transport::PeerHandle;
429 use OpenSRF::Transport::SlimJabber::XMPPMessage;
430 use OpenSRF::Utils::Logger qw($logger);
431 use OpenSRF::DomainObject::oilsResponse qw/:status/;
432 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
433 use Time::HiRes qw(time);
434 use POSIX qw/:sys_wait_h :errno_h/;
435
436 use overload '""' => sub { return '[' . shift()->{pid} . ']'; };
437
438 sub new {
439     my($class, $parent) = @_;
440     my $self = bless({}, $class);
441     $self->{pid} = 0; # my process ID
442     $self->{parent} = $parent; # Controller parent process
443     $self->{num_requests} = 0; # total serviced requests
444     return $self;
445 }
446
447 sub set_nonblock {
448     my($self, $fh) = @_;
449     my  $flags = fcntl($fh, F_GETFL, 0);
450     fcntl($fh, F_SETFL, $flags | O_NONBLOCK);
451 }
452
453 sub set_block {
454     my($self, $fh) = @_;
455     my  $flags = fcntl($fh, F_GETFL, 0);
456     $flags &= ~O_NONBLOCK;
457     fcntl($fh, F_SETFL, $flags);
458 }
459
460 # ----------------------------------------------------------------
461 # Connects to Jabber and runs the application child_init
462 # ----------------------------------------------------------------
463 sub init {
464     my $self = shift;
465     my $service = $self->{parent}->{service};
466     $0 = "OpenSRF Drone [$service]";
467     OpenSRF::Transport::PeerHandle->construct($service);
468         OpenSRF::Application->application_implementation->child_init
469                 if (OpenSRF::Application->application_implementation->can('child_init'));
470 }
471
472 # ----------------------------------------------------------------
473 # Waits for messages from the parent process, handles the message,
474 # then goes into the keepalive loop if this is a stateful session.
475 # When max_requests is hit, the process exits.
476 # ----------------------------------------------------------------
477 sub run {
478     my $self = shift;
479     my $network = OpenSRF::Transport::PeerHandle->retrieve;
480
481     # main child run loop.  Ends when this child hits max requests.
482     while(1) {
483
484         my $data = $self->wait_for_request or next;
485
486         # Update process name to show activity
487         my $orig_name = $0;
488         $0 = "$0*";
489
490         # Discard extraneous data from the jabber socket
491         if(!$network->flush_socket()) {
492             $logger->error("server: network disconnected!  child dropping request and exiting: $data");
493             exit;
494         }
495
496         my $session = OpenSRF::Transport->handler(
497             $self->{parent}->{service},
498             OpenSRF::Transport::SlimJabber::XMPPMessage->new(xml => $data)
499         );
500
501         $self->keepalive_loop($session);
502
503         last if ++$self->{num_requests} == $self->{parent}->{max_requests};
504
505         # Tell the parent process we are available to process requests
506         $self->send_status;
507
508         # Repair process name
509         $0 = $orig_name;
510     }
511
512     $chatty and $logger->internal("server: child process shutting down after reaching max_requests");
513
514         OpenSRF::Application->application_implementation->child_exit
515                 if (OpenSRF::Application->application_implementation->can('child_exit'));
516 }
517
518 # ----------------------------------------------------------------
519 # waits for a request data on the parent pipe and returns it.
520 # ----------------------------------------------------------------
521 sub wait_for_request {
522     my $self = shift;
523
524     my $data = '';
525     my $read_size = 1024;
526     my $nonblock = 0;
527
528     while(1) {
529         # Start out blocking, when data is available, read it all
530
531         my $buf = '';
532         my $n = sysread($self->{pipe_to_parent}, $buf, $read_size);
533
534         unless(defined $n) {
535             $logger->error("server: error reading data pipe: $!") unless EAGAIN == $!; 
536             last;
537         }
538
539         last if $n <= 0; # no data left to read
540
541         $data .= $buf;
542
543         last if $n < $read_size; # done reading all data
544
545         $self->set_nonblock($self->{pipe_to_parent}) unless $nonblock;
546         $nonblock = 1;
547     }
548
549     $self->set_block($self->{pipe_to_parent}) if $nonblock;
550     return $data;
551 }
552
553
554 # ----------------------------------------------------------------
555 # If this is a stateful opensrf session, wait up to $keepalive
556 # seconds for subsequent requests from the client
557 # ----------------------------------------------------------------
558 sub keepalive_loop {
559     my($self, $session) = @_;
560     my $keepalive = $self->{parent}->{keepalive};
561
562     while($session->state and $session->state == $session->CONNECTED) {
563
564         unless( $session->queue_wait($keepalive) ) {
565
566             # client failed to disconnect before timeout
567             $logger->info("server: no request was received in $keepalive seconds, exiting stateful session");
568
569             my $res = OpenSRF::DomainObject::oilsConnectStatus->new(
570                 status => "Disconnected on timeout",
571                 statusCode => STATUS_TIMEOUT
572             );
573
574             $session->status($res);
575             $session->state($session->DISCONNECTED);
576             last;
577         }
578     }
579
580     $chatty and $logger->internal("server: child done with request(s)");
581     $session->kill_me;
582 }
583
584 # ----------------------------------------------------------------
585 # Report our availability to our parent process
586 # ----------------------------------------------------------------
587 sub send_status {
588     my $self = shift;
589     syswrite(
590         $self->{pipe_to_parent},
591         sprintf("%*s", OpenSRF::Server::STATUS_PIPE_DATA_SIZE, $self->{pid})
592     );
593 }
594
595
596 1;