From 08ee4f993fe773e37233b139961cbcdae2fe93b8 Mon Sep 17 00:00:00 2001 From: Bill Erickson Date: Mon, 13 Feb 2012 16:53:59 -0500 Subject: [PATCH] Perl pipe reading overhaul : data size header The lockfile mechanism for preventing premature end of reads on child processes suffers from one serious flaw: if the data to write exceeds the pipe buffer size, the parent will block on syswrite and the service will lock up. It's also not as efficient (for the normal case) as the code was without the lockfile, because the writes and reads are serialized. This commit replaces the lockfile mechanism with a protocol header in the data. The first X (currently 12) bytes of data written to the child process will contain the full length of the data to be written (minus the header size). The child now reads the data in parallel with the parent as data is available. If the child reads all available data (in the pipe) but not all of the expected data, the child will go back into a select() wait pending more data from the parent. The process continues until all data is read. This same mechanism is already used to communicate status info from child processes to the parent. 
Signed-off-by: Bill Erickson Signed-off-by: Jason Stephenson Signed-off-by: Mike Rylander --- src/perl/lib/OpenSRF/Server.pm | 122 +++++++++++++-------------------- src/perl/lib/OpenSRF/System.pm | 3 +- 2 files changed, 50 insertions(+), 75 deletions(-) diff --git a/src/perl/lib/OpenSRF/Server.pm b/src/perl/lib/OpenSRF/Server.pm index 90a186d..32954f2 100644 --- a/src/perl/lib/OpenSRF/Server.pm +++ b/src/perl/lib/OpenSRF/Server.pm @@ -24,13 +24,14 @@ use OpenSRF::Utils::Logger qw($logger); use OpenSRF::Transport::SlimJabber::Client; use Encode; use POSIX qw/:sys_wait_h :errno_h/; -use Fcntl qw(:flock F_GETFL F_SETFL O_NONBLOCK); +use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK); use Time::HiRes qw/usleep/; use IO::Select; use Socket; our $chatty = 1; # disable for production use constant STATUS_PIPE_DATA_SIZE => 12; +use constant WRITE_PIPE_DATA_SIZE => 12; sub new { my($class, $service, %args) = @_; @@ -48,9 +49,6 @@ sub new { $self->{stderr_log} = $self->{stderr_log_path} . "/${service}_stderr.log" if $self->{stderr_log_path}; - $self->{lock_file} = - sprintf("%s/%s_$$.lock", $self->{lock_file_path}, $self->{service}); - $self->{min_spare_children} ||= 0; $self->{max_spare_children} = $self->{min_spare_children} + 1 if @@ -85,19 +83,9 @@ sub cleanup { # clean up our dead children $self->reap_children(1); - # clean up the lock file - close($self->{lock_file_handle}); - unlink($self->{lock_file}); - exit(0) unless $no_exit; } -sub open_lock_file { - my $self = shift; - open($self->{lock_file_handle}, '>>', $self->{lock_file}) - or die "server: cannot open lock file ".$self->{lock_file} ." : $!\n"; -} - # ---------------------------------------------------------------- # Waits on the jabber socket for inbound data from the router. # Each new message is passed off to a child process for handling. 
@@ -114,7 +102,6 @@ sub run { $self->spawn_children; $self->build_osrf_handle; $self->register_routers; - $self->open_lock_file; my $wait_time = 1; # main server loop @@ -253,8 +240,10 @@ sub write_child { my($self, $child, $msg) = @_; my $xml = encode_utf8(decode_utf8($msg->to_xml)); - flock($self->{lock_file_handle}, LOCK_EX) or - $logger->error("server: cannot flock : $!"); + # tell the child how much data to expect, minus the header + my $write_size; + {use bytes; $write_size = length($xml)} + $write_size = sprintf("%*s", WRITE_PIPE_DATA_SIZE, $write_size); for (0..2) { @@ -262,16 +251,13 @@ sub write_child { local $SIG{'PIPE'} = sub { $self->{sig_pipe} = 1; }; # send message to child data pipe - syswrite($child->{pipe_to_child}, $xml); + syswrite($child->{pipe_to_child}, $write_size . $xml); last unless $self->{sig_pipe}; $logger->error("server: got SIGPIPE writing to $child, retrying..."); usleep(50000); # 50 msec } - flock($self->{lock_file_handle}, LOCK_UN) or - $logger->error("server: cannot de-flock : $!"); - $logger->error("server: unable to send request message to child $child") if $self->{sig_pipe}; } @@ -508,7 +494,7 @@ use OpenSRF::Transport::PeerHandle; use OpenSRF::Transport::SlimJabber::XMPPMessage; use OpenSRF::Utils::Logger qw($logger); use OpenSRF::DomainObject::oilsResponse qw/:status/; -use Fcntl qw(:flock F_GETFL F_SETFL O_NONBLOCK); +use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK); use Time::HiRes qw(time usleep); use POSIX qw/:sys_wait_h :errno_h/; @@ -537,26 +523,11 @@ sub set_block { fcntl($fh, F_SETFL, $flags); } -sub open_lock_file { - my $self = shift; - - # close our copy of the parent's lock file since we won't be using it - close($self->{parent}->{lock_file_handle}) - if $self->{parent}->{lock_file_handle}; - - my $fname = $self->{parent}->{lock_file}; - unless (open($self->{lock_file_handle}, '>>', $fname)) { - $logger->error("server: child cannot open lock file $fname : $!"); - die "server: child cannot open lock file $fname : $!\n"; - 
} -} - # ---------------------------------------------------------------- # Connects to Jabber and runs the application child_init # ---------------------------------------------------------------- sub init { my $self = shift; - $self->open_lock_file; my $service = $self->{parent}->{service}; $0 = "OpenSRF Drone [$service]"; OpenSRF::Transport::PeerHandle->construct($service); @@ -617,59 +588,64 @@ sub wait_for_request { my $self = shift; my $data = ''; - my $read_size = 1024; + my $buf_size = 4096; my $nonblock = 0; my $read_pipe = $self->{pipe_to_parent}; - - # wait for some data to start arriving - my $read_set = IO::Select->new; - $read_set->add($read_pipe); + my $data_size; + my $total_read; + my $first_read = 1; while (1) { - # if can_read is interrupted while blocking, - # go back and wait again until it it succeeds. - last if $read_set->can_read; - } - # Parent has started sending data to us. - # Wait for the parent to write all data to the pipe. Then, immediately release - # the lock so the parent can start writing data to other child processes. - # Note: there is no danger of a subsequent message coming from the parent on - # the same pipe, since this child is now marked as active. - flock($self->{lock_file_handle}, LOCK_EX) or $logger->error("server: cannot flock : $!"); - flock($self->{lock_file_handle}, LOCK_UN) or $logger->error("server: cannot de-flock : $!"); - - # we have all data now so all reads can be done in non-blocking mode - $self->set_nonblock($read_pipe); + # wait for some data to start arriving + my $read_set = IO::Select->new; + $read_set->add($read_pipe); + + while (1) { + # if can_read is interrupted while blocking, + # go back and wait again until it succeeds. 
+ last if $read_set->can_read; + } - while(1) { - my $sig_pipe = 0; - local $SIG{'PIPE'} = sub { $sig_pipe = 1 }; + # parent started writing, let's start reading + $self->set_nonblock($read_pipe); - my $buf = ''; - my $n = sysread($self->{pipe_to_parent}, $buf, $read_size); + while (1) { + # pull as much data from the pipe as possible - unless(defined $n) { + my $buf = ''; + my $bytes_read = sysread($self->{pipe_to_parent}, $buf, $buf_size); - if ($sig_pipe) { - $logger->info("server: $self got SIGPIPE reading data from parent, retrying..."); - usleep(50000); # 50 msec - next; + unless(defined $bytes_read) { + $logger->error("server: error reading data pipe: $!") unless EAGAIN == $!; + last; } - $logger->error("server: error reading data pipe: $!") unless EAGAIN == $!; - last; + last if $bytes_read <= 0; # no more data available for reading + + $total_read += $bytes_read; + $data .= $buf; } - last if $n <= 0; # no data left to read + # we've read all the data currently available on the pipe. + # let's see if we're done. + + if ($first_read) { + # extract the data size and remove the size header + my $wps_size = OpenSRF::Server::WRITE_PIPE_DATA_SIZE; + $data_size = int(substr($data, 0, $wps_size)) + $wps_size; + $data = substr($data, $wps_size); + $first_read = 0; + } - $data .= $buf; + $self->set_block($self->{pipe_to_parent}); - last if $n < $read_size; # done reading all data + if ($total_read == $data_size) { + # we've read all the data. 
Nothing left to do + last; + } } - # return to blocking mode - $self->set_block($self->{pipe_to_parent}); return $data; } diff --git a/src/perl/lib/OpenSRF/System.pm b/src/perl/lib/OpenSRF/System.pm index a720d26..7fd7195 100644 --- a/src/perl/lib/OpenSRF/System.pm +++ b/src/perl/lib/OpenSRF/System.pm @@ -99,8 +99,7 @@ sub run_service { min_children => $getval->(unix_config => 'min_children') || 1, min_spare_children => $getval->(unix_config => 'min_spare_children'), max_spare_children => $getval->(unix_config => 'max_spare_children'), - stderr_log_path => $stderr_path, - lock_file_path => $pid_dir + stderr_log_path => $stderr_path ); while(1) { -- 2.43.2