]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/perl/lib/OpenSRF/Server.pm
Resolve encoding problem found by attempting to register patrons with Unicode names
[OpenSRF.git] / src / perl / lib / OpenSRF / Server.pm
1 # ----------------------------------------------------------------
2 # Copyright (C) 2010 Equinox Software, Inc.
3 # Bill Erickson <erickson@esilibrary.com>
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 # ----------------------------------------------------------------
15 package OpenSRF::Server;
16 use strict;
17 use warnings;
18 use OpenSRF::Transport;
19 use OpenSRF::Application;
20 use OpenSRF::Utils::Config;
21 use OpenSRF::Transport::PeerHandle;
22 use OpenSRF::Utils::SettingsClient;
23 use OpenSRF::Utils::Logger qw($logger);
24 use OpenSRF::Transport::SlimJabber::Client;
25 use Encode;
26 use POSIX qw/:sys_wait_h :errno_h/;
27 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
28 use IO::Select;
29 use Socket;
30 our $chatty = 1; # disable for production
31
32 use constant STATUS_PIPE_DATA_SIZE => 12;
33
34 sub new {
35     my($class, $service, %args) = @_;
36     my $self = bless(\%args, $class);
37
38     $self->{service}        = $service; # service name
39     $self->{num_children}   = 0; # number of child processes
40     $self->{osrf_handle}    = undef; # xmpp handle
41     $self->{routers}        = []; # list of registered routers
42     $self->{active_list}    = []; # list of active children
43     $self->{idle_list}      = []; # list of idle children
44     $self->{pid_map}        = {}; # map of child pid to child for cleaner access
45
46     $self->{stderr_log} = $self->{stderr_log_path} . "/${service}_stderr.log" 
47         if $self->{stderr_log_path};
48
49     $self->{min_spare_children} ||= 0;
50
51     $self->{max_spare_children} = $self->{min_spare_children} + 1 if
52         $self->{max_spare_children} and
53         $self->{max_spare_children} <= $self->{min_spare_children};
54
55     return $self;
56 }
57
58 # ----------------------------------------------------------------
59 # Disconnects from routers and waits for child processes to exit.
60 # ----------------------------------------------------------------
61 sub cleanup {
62     my $self = shift;
63     my $no_exit = shift;
64
65     $logger->info("server: shutting down and cleaning up...");
66
67     # don't get sidetracked by signals while we're cleaning up.
68     # it could result in unexpected behavior with list traversal
69     $SIG{CHLD} = 'IGNORE';
70
71     # terminate the child processes
72     $self->kill_child($_) for
73         (@{$self->{idle_list}}, @{$self->{active_list}});
74
75     # de-register routers
76     $self->unregister_routers;
77
78     $self->{osrf_handle}->disconnect;
79
80     # clean up our dead children
81     $self->reap_children(1);
82
83     exit(0) unless $no_exit;
84 }
85
86
87 # ----------------------------------------------------------------
88 # Waits on the jabber socket for inbound data from the router.
89 # Each new message is passed off to a child process for handling.
90 # At regular intervals, wake up for min/max spare child maintenance
91 # ----------------------------------------------------------------
92 sub run {
93     my $self = shift;
94
95         $logger->set_service($self->{service});
96
97     $SIG{$_} = sub { $self->cleanup; } for (qw/INT TERM QUIT/);
98     $SIG{CHLD} = sub { $self->reap_children(); };
99
100     $self->spawn_children;
101     $self->build_osrf_handle;
102     $self->register_routers;
103     my $wait_time = 1;
104
105     # main server loop
106     while(1) {
107
108         $self->check_status;
109         $self->{child_died} = 0;
110
111         my $msg = $self->{osrf_handle}->process($wait_time);
112
113         # we woke up for any reason, reset the wait time to allow
114         # for idle maintenance as necessary
115         $wait_time = 1;
116
117         if($msg) {
118
119             if(my $child = pop(@{$self->{idle_list}})) {
120
121                 # we have an idle child to handle the request
122                 $chatty and $logger->internal("server: passing request to idle child $child");
123                 push(@{$self->{active_list}}, $child);
124                 $self->write_child($child, $msg);
125
126             } elsif($self->{num_children} < $self->{max_children}) {
127
128                 # spawning a child to handle the request
129                 $chatty and $logger->internal("server: spawning child to handle request");
130                 $self->write_child($self->spawn_child(1), $msg);
131
132             } else {
133                 $logger->warn("server: no children available, waiting... consider increasing " .
134                     "max_children for this application higher than $self->{max_children} ".
135                     "in the OpenSRF configuration if this message occurs frequently");
136                 $self->check_status(1); # block until child is available
137
138                 my $child = pop(@{$self->{idle_list}});
139                 push(@{$self->{active_list}}, $child);
140                 $self->write_child($child, $msg);
141             }
142
143         } else {
144
145             # don't perform idle maint immediately when woken by SIGCHLD
146             unless($self->{child_died}) {
147
148                 # when we hit equilibrium, there's no need for regular
149                 # maintenance, so set wait_time to 'forever'
150                 $wait_time = -1 if 
151                     !$self->perform_idle_maintenance and # no maintenance performed this time
152                     @{$self->{active_list}} == 0; # no active children 
153             }
154         }
155     }
156 }
157
158 # ----------------------------------------------------------------
159 # Launch a new spare child or kill an extra spare child.  To
160 # prevent large-scale spawning or die-offs, spawn or kill only
161 # 1 process per idle maintenance loop.
162 # Returns true if any idle maintenance occurred, 0 otherwise
163 # ----------------------------------------------------------------
164 sub perform_idle_maintenance {
165     my $self = shift;
166
167     $chatty and $logger->internal(
168         sprintf(
169             "server: %d idle, %d active, %d min_spare, %d max_spare in idle maintenance",
170             scalar(@{$self->{idle_list}}), 
171             scalar(@{$self->{active_list}}),
172             $self->{min_spare_children},
173             $self->{max_spare_children}
174         )
175     );
176
177     # spawn 1 spare child per maintenance loop if necessary
178     if( $self->{min_spare_children} and
179         $self->{num_children} < $self->{max_children} and
180         scalar(@{$self->{idle_list}}) < $self->{min_spare_children} ) {
181
182         $chatty and $logger->internal("server: spawning spare child");
183         $self->spawn_child;
184         return 1;
185
186     # kill 1 excess spare child per maintenance loop if necessary
187     } elsif($self->{max_spare_children} and
188             $self->{num_children} > $self->{min_children} and
189             scalar(@{$self->{idle_list}}) > $self->{max_spare_children} ) {
190
191         $chatty and $logger->internal("server: killing spare child");
192         $self->kill_child;
193         return 1;
194     }
195
196     return 0;
197 }
198
199 sub kill_child {
200     my $self = shift;
201     my $child = shift || pop(@{$self->{idle_list}}) or return;
202     $chatty and $logger->internal("server: killing child $child");
203     kill('TERM', $child->{pid});
204 }
205
206 # ----------------------------------------------------------------
207 # Jabber connection inbound message arrive on.
208 # ----------------------------------------------------------------
209 sub build_osrf_handle {
210     my $self = shift;
211
212     my $conf = OpenSRF::Utils::Config->current;
213     my $username = $conf->bootstrap->username;
214     my $password = $conf->bootstrap->passwd;
215     my $domain = $conf->bootstrap->domain;
216     my $port = $conf->bootstrap->port;
217     my $resource = $self->{service} . '_listener_' . $conf->env->hostname;
218
219     $logger->debug("server: inbound connecting as $username\@$domain/$resource on port $port");
220
221     $self->{osrf_handle} =
222         OpenSRF::Transport::SlimJabber::Client->new(
223             username => $username,
224             resource => $resource,
225             password => $password,
226             host => $domain,
227             port => $port,
228         );
229
230     $self->{osrf_handle}->initialize;
231 }
232
233
234 # ----------------------------------------------------------------
235 # Sends request data to a child process
236 # ----------------------------------------------------------------
237 sub write_child {
238     my($self, $child, $msg) = @_;
239     my $xml = decode_utf8($msg->to_xml);
240     syswrite($child->{pipe_to_child}, encode_utf8($xml));
241 }
242
243 # ----------------------------------------------------------------
244 # Checks to see if any child process has reported its availability
245 # In blocking mode, blocks until a child has reported.
246 # ----------------------------------------------------------------
247 sub check_status {
248     my($self, $block) = @_;
249
250     return unless @{$self->{active_list}};
251
252     my $read_set = IO::Select->new;
253     $read_set->add($_->{pipe_to_child}) for @{$self->{active_list}};
254
255     my @pids;
256
257     while (1) {
258
259         # if can_read or sysread is interrupted while bloking, go back and 
260         # wait again until we have at least 1 free child
261
262         if(my @handles = $read_set->can_read(($block) ? undef : 0)) {
263             my $pid = '';
264             for my $pipe (@handles) {
265                 sysread($pipe, $pid, STATUS_PIPE_DATA_SIZE) or next;
266                 push(@pids, int($pid));
267             }
268         }
269
270         last unless $block and !@pids;
271     }
272
273     return unless @pids;
274
275     $chatty and $logger->internal("server: ".scalar(@pids)." children reporting for duty: (@pids)");
276
277     my $child;
278     my @new_actives;
279
280     # move the children from the active list to the idle list
281     for my $proc (@{$self->{active_list}}) {
282         if(grep { $_ == $proc->{pid} } @pids) {
283             push(@{$self->{idle_list}}, $proc);
284         } else {
285             push(@new_actives, $proc);
286         }
287     }
288
289     $self->{active_list} = [@new_actives];
290
291     $chatty and $logger->internal(sprintf(
292         "server: %d idle and %d active children after status update",
293             scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
294 }
295
296 # ----------------------------------------------------------------
297 # Cleans up any child processes that have exited.
298 # In shutdown mode, block until all children have washed ashore
299 # ----------------------------------------------------------------
300 sub reap_children {
301     my($self, $shutdown) = @_;
302     $self->{child_died} = 1;
303
304     while(1) {
305
306         my $pid = waitpid(-1, ($shutdown) ? 0 : WNOHANG);
307         last if $pid <= 0;
308
309         $chatty and $logger->internal("server: reaping child $pid");
310
311         my $child = $self->{pid_map}->{$pid};
312
313         close($child->{pipe_to_parent});
314         close($child->{pipe_to_child});
315
316         $self->{active_list} = [ grep { $_->{pid} != $pid } @{$self->{active_list}} ];
317         $self->{idle_list} = [ grep { $_->{pid} != $pid } @{$self->{idle_list}} ];
318
319         $self->{num_children}--;
320         delete $self->{pid_map}->{$pid};
321         delete $child->{$_} for keys %$child; # destroy with a vengeance
322     }
323
324     $self->spawn_children unless $shutdown;
325
326     $chatty and $logger->internal(sprintf(
327         "server: %d idle and %d active children after reap_children",
328             scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
329
330 }
331
332 # ----------------------------------------------------------------
333 # Spawn up to max_children processes
334 # ----------------------------------------------------------------
335 sub spawn_children {
336     my $self = shift;
337     $self->spawn_child while $self->{num_children} < $self->{min_children};
338 }
339
340 # ----------------------------------------------------------------
341 # Spawns a new child.  If $active is set, the child goes directly
342 # into the active_list.
343 # ----------------------------------------------------------------
344 sub spawn_child {
345     my($self, $active) = @_;
346
347     my $child = OpenSRF::Server::Child->new($self);
348
349     # socket for sending message data to the child
350     if(!socketpair(
351         $child->{pipe_to_child},
352         $child->{pipe_to_parent},
353         AF_UNIX, SOCK_STREAM, PF_UNSPEC)) {
354             $logger->error("server: error creating data socketpair: $!");
355             return undef;
356     }
357
358     $child->{pipe_to_child}->autoflush(1);
359     $child->{pipe_to_parent}->autoflush(1);
360
361     $child->{pid} = fork();
362
363     if($child->{pid}) { # parent process
364         $self->{num_children}++;
365         $self->{pid_map}->{$child->{pid}} = $child;
366
367         if($active) {
368             push(@{$self->{active_list}}, $child);
369         } else {
370             push(@{$self->{idle_list}}, $child);
371         }
372
373         $chatty and $logger->internal("server: server spawned child $child with ".$self->{num_children}." total children");
374
375         return $child;
376
377     } else { # child process
378
379         $SIG{$_} = 'DEFAULT' for (qw/INT TERM QUIT HUP/);
380
381         if($self->{stderr_log}) {
382
383             $chatty and $logger->internal("server: redirecting STDERR to " . $self->{stderr_log});
384
385             close STDERR;
386             unless( open(STDERR, '>>' . $self->{stderr_log}) ) {
387                 $logger->error("server: unable to open STDERR log file: " . $self->{stderr_log} . " : $@");
388                 open STDERR, '>/dev/null'; # send it back to /dev/null
389             }
390         }
391
392         $child->{pid} = $$;
393         eval {
394             $child->init;
395             $child->run;
396             OpenSRF::Transport::PeerHandle->retrieve->disconnect;
397         };
398         $logger->error("server: child process died: $@") if $@;
399         exit(0);
400     }
401 }
402
403 # ----------------------------------------------------------------
404 # Sends the register command to the configured routers
405 # ----------------------------------------------------------------
406 sub register_routers {
407     my $self = shift;
408
409     my $conf = OpenSRF::Utils::Config->current;
410     my $routers = $conf->bootstrap->routers;
411     my $router_name = $conf->bootstrap->router_name;
412     my @targets;
413
414     for my $router (@$routers) {
415         if(ref $router) {
416
417             if( !$router->{services} ||
418                 !$router->{services}->{service} ||
419                 (
420                     ref($router->{services}->{service}) eq 'ARRAY' and
421                     grep { $_ eq $self->{service} } @{$router->{services}->{service}}
422                 )  || $router->{services}->{service} eq $self->{service}) {
423
424                 my $name = $router->{name};
425                 my $domain = $router->{domain};
426                 push(@targets, "$name\@$domain/router");
427             }
428
429         } else {
430             push(@targets, "$router_name\@$router/router");
431         }
432     }
433
434     foreach (@targets) {
435         $logger->info("server: registering with router $_");
436         $self->{osrf_handle}->send(
437             to => $_,
438             body => 'registering',
439             router_command => 'register',
440             router_class => $self->{service}
441         );
442     }
443
444     $self->{routers} = \@targets;
445 }
446
447 # ----------------------------------------------------------------
448 # Sends the unregister command to any routers we have registered
449 # with.
450 # ----------------------------------------------------------------
451 sub unregister_routers {
452     my $self = shift;
453     return unless $self->{osrf_handle}->tcp_connected;
454
455         for my $router (@{$self->{routers}}) {
456         $logger->info("server: disconnecting from router $router");
457         $self->{osrf_handle}->send(
458             to => $router,
459             body => "unregistering",
460             router_command => "unregister",
461             router_class => $self->{service}
462         );
463     }
464 }
465
466
467 package OpenSRF::Server::Child;
468 use strict;
469 use warnings;
470 use OpenSRF::Transport;
471 use OpenSRF::Application;
472 use OpenSRF::Transport::PeerHandle;
473 use OpenSRF::Transport::SlimJabber::XMPPMessage;
474 use OpenSRF::Utils::Logger qw($logger);
475 use OpenSRF::DomainObject::oilsResponse qw/:status/;
476 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
477 use Time::HiRes qw(time);
478 use POSIX qw/:sys_wait_h :errno_h/;
479
480 use overload '""' => sub { return '[' . shift()->{pid} . ']'; };
481
482 sub new {
483     my($class, $parent) = @_;
484     my $self = bless({}, $class);
485     $self->{pid} = 0; # my process ID
486     $self->{parent} = $parent; # Controller parent process
487     $self->{num_requests} = 0; # total serviced requests
488     return $self;
489 }
490
491 sub set_nonblock {
492     my($self, $fh) = @_;
493     my  $flags = fcntl($fh, F_GETFL, 0);
494     fcntl($fh, F_SETFL, $flags | O_NONBLOCK);
495 }
496
497 sub set_block {
498     my($self, $fh) = @_;
499     my  $flags = fcntl($fh, F_GETFL, 0);
500     $flags &= ~O_NONBLOCK;
501     fcntl($fh, F_SETFL, $flags);
502 }
503
504 # ----------------------------------------------------------------
505 # Connects to Jabber and runs the application child_init
506 # ----------------------------------------------------------------
507 sub init {
508     my $self = shift;
509     my $service = $self->{parent}->{service};
510     $0 = "OpenSRF Drone [$service]";
511     OpenSRF::Transport::PeerHandle->construct($service);
512         OpenSRF::Application->application_implementation->child_init
513                 if (OpenSRF::Application->application_implementation->can('child_init'));
514 }
515
516 # ----------------------------------------------------------------
517 # Waits for messages from the parent process, handles the message,
518 # then goes into the keepalive loop if this is a stateful session.
519 # When max_requests is hit, the process exits.
520 # ----------------------------------------------------------------
521 sub run {
522     my $self = shift;
523     my $network = OpenSRF::Transport::PeerHandle->retrieve;
524
525     # main child run loop.  Ends when this child hits max requests.
526     while(1) {
527
528         my $data = $self->wait_for_request or next;
529
530         # Update process name to show activity
531         my $orig_name = $0;
532         $0 = "$0*";
533
534         # Discard extraneous data from the jabber socket
535         if(!$network->flush_socket()) {
536             $logger->error("server: network disconnected!  child dropping request and exiting: $data");
537             exit;
538         }
539
540         my $session = OpenSRF::Transport->handler(
541             $self->{parent}->{service},
542             OpenSRF::Transport::SlimJabber::XMPPMessage->new(xml => $data)
543         );
544
545         $self->keepalive_loop($session);
546
547         last if ++$self->{num_requests} == $self->{parent}->{max_requests};
548
549         # Tell the parent process we are available to process requests
550         $self->send_status;
551
552         # Repair process name
553         $0 = $orig_name;
554     }
555
556     $chatty and $logger->internal("server: child process shutting down after reaching max_requests");
557
558         OpenSRF::Application->application_implementation->child_exit
559                 if (OpenSRF::Application->application_implementation->can('child_exit'));
560 }
561
562 # ----------------------------------------------------------------
563 # waits for a request data on the parent pipe and returns it.
564 # ----------------------------------------------------------------
565 sub wait_for_request {
566     my $self = shift;
567
568     my $data = '';
569     my $read_size = 1024;
570     my $nonblock = 0;
571
572     while(1) {
573         # Start out blocking, when data is available, read it all
574
575         my $buf = '';
576         my $n = sysread($self->{pipe_to_parent}, $buf, $read_size);
577
578         unless(defined $n) {
579             $logger->error("server: error reading data pipe: $!") unless EAGAIN == $!; 
580             last;
581         }
582
583         last if $n <= 0; # no data left to read
584
585         $data .= $buf;
586
587         last if $n < $read_size; # done reading all data
588
589         $self->set_nonblock($self->{pipe_to_parent}) unless $nonblock;
590         $nonblock = 1;
591     }
592
593     $self->set_block($self->{pipe_to_parent}) if $nonblock;
594     return $data;
595 }
596
597
598 # ----------------------------------------------------------------
599 # If this is a stateful opensrf session, wait up to $keepalive
600 # seconds for subsequent requests from the client
601 # ----------------------------------------------------------------
602 sub keepalive_loop {
603     my($self, $session) = @_;
604     my $keepalive = $self->{parent}->{keepalive};
605
606     while($session->state and $session->state == $session->CONNECTED) {
607
608         unless( $session->queue_wait($keepalive) ) {
609
610             # client failed to disconnect before timeout
611             $logger->info("server: no request was received in $keepalive seconds, exiting stateful session");
612
613             my $res = OpenSRF::DomainObject::oilsConnectStatus->new(
614                 status => "Disconnected on timeout",
615                 statusCode => STATUS_TIMEOUT
616             );
617
618             $session->status($res);
619             $session->state($session->DISCONNECTED);
620             last;
621         }
622     }
623
624     $chatty and $logger->internal("server: child done with request(s)");
625     $session->kill_me;
626 }
627
628 # ----------------------------------------------------------------
629 # Report our availability to our parent process
630 # ----------------------------------------------------------------
631 sub send_status {
632     my $self = shift;
633     syswrite(
634         $self->{pipe_to_parent},
635         sprintf("%*s", OpenSRF::Server::STATUS_PIPE_DATA_SIZE, $self->{pid})
636     );
637 }
638
639
640 1;