From 0201ca954002eb241d277c3068659bb1f8100bab Mon Sep 17 00:00:00 2001 From: Mike Rylander Date: Tue, 24 Oct 2017 13:27:37 -0400 Subject: [PATCH] LP#1729610: Allow queuing (for a while) during child backlog This patch teaches OpenSRF listeners for Perl services how to maintain a queue of requests in case no drone process is immediately available to process a requeust. Signed-off-by: Mike Rylander Signed-off-by: Galen Charlton Signed-off-by: Bill Erickson Signed-off-by: Mike Rylander --- src/perl/lib/OpenSRF/Server.pm | 40 ++++++++++++++++--- .../Transport/SlimJabber/XMPPReader.pm | 5 +-- 2 files changed, 37 insertions(+), 8 deletions(-) diff --git a/src/perl/lib/OpenSRF/Server.pm b/src/perl/lib/OpenSRF/Server.pm index 8efbc4c..0eaac40 100644 --- a/src/perl/lib/OpenSRF/Server.pm +++ b/src/perl/lib/OpenSRF/Server.pm @@ -51,6 +51,7 @@ sub new { if $self->{stderr_log_path}; $self->{min_spare_children} ||= 0; + $self->{max_backlog_queue} ||= 1000; $self->{max_spare_children} = $self->{min_spare_children} + 1 if $self->{max_spare_children} and @@ -153,13 +154,20 @@ sub run { $self->register_routers; my $wait_time = 1; + my @max_children_msg_queue; + # main server loop while(1) { + my $from_network = 0; $self->check_status; $self->{child_died} = 0; - my $msg = $self->{osrf_handle}->process($wait_time); + my $msg = shift(@max_children_msg_queue); + + # no pending message, so wait for the next one forever + $from_network = $wait_time = -1 if (!$msg); + $msg ||= $self->{osrf_handle}->process($wait_time); # we woke up for any reason, reset the wait time to allow # for idle maintenance as necessary @@ -188,11 +196,33 @@ sub run { $logger->warn("server: no children available, waiting... consider increasing " . "max_children for this application higher than $self->{max_children} ". "in the OpenSRF configuration if this message occurs frequently"); - $self->check_status(1); # block until child is available - my $child = pop(@{$self->{idle_list}}); - push(@{$self->{active_list}}, $child); - $self->write_child($child, $msg); + if ($from_network) { + push @max_children_msg_queue, $msg; + } else { + unshift @max_children_msg_queue, $msg; + } + + if (@max_children_msg_queue < $self->{max_backlog_queue}) { + # We still have room on the queue. Set the wait time to + # 1s, waiting for a drone to be freed up and reprocess + # this (and any other) queued messages. + $wait_time = 1; + if (!$from_network) { + # if we got here, we had retrieved a message from the queue + # but couldn't process it... but also hadn't fetched any + # additional messages from the network. Doing so now, + # as otherwise only one message will ever get queued + $msg = $self->{osrf_handle}->process($wait_time); + if ($msg) { + $chatty and $logger->debug("server: queuing new message after a re-queue"); + push @max_children_msg_queue, $msg; + } + } + } else { + # We'll just have to wait + $self->check_status(1); # block until child is available + } } } else { diff --git a/src/perl/lib/OpenSRF/Transport/SlimJabber/XMPPReader.pm b/src/perl/lib/OpenSRF/Transport/SlimJabber/XMPPReader.pm index 0a84ae1..766df6a 100644 --- a/src/perl/lib/OpenSRF/Transport/SlimJabber/XMPPReader.pm +++ b/src/perl/lib/OpenSRF/Transport/SlimJabber/XMPPReader.pm @@ -379,9 +379,8 @@ sub flush_socket { my $self = shift; return 0 unless $self->connected; - while ($self->wait(0)) { - # TODO remove this log line - $logger->info("flushing data from socket..."); + while (my $excess = $self->wait(0)) { + $logger->info("flushing data from socket... $excess"); } return $self->connected; -- 2.43.2