From f469ca73214b0076db2ceaf04a4e0f7a29d50b83 Mon Sep 17 00:00:00 2001 From: miker Date: Thu, 29 Jul 2010 17:40:03 +0000 Subject: [PATCH 1/1] Add response chunking support to the Perl implementation of OpenSRF Two new optional paramters to register_method are now supported: * max_chunk_size * max_chunk_count OpenSRF has always supported message bundling, but only respond_complete made use of this fact by sending the final result message and the completion status message in the same XMPP envelope. Now, on a per method basis, RESULT messages can be bundled (cached) until one of three conditions occurs: * The size of the JSON of the RESULT messages matches or exceeds max_chunk_size * The number of RESULT messages cached matches or exceeds max_chunk_count * respond_complete is called (which happens implicitly by returning from a method) Because the overhead of sending multiple XMPP messages far outweighs the caching and cache management costs of chunking, the default for max_chunk_size is set at 10240 bytes (10k). The default for max_chunk_count is 0. To turn off chunking completely, set the max_chunk_size register_method parameter to 0. git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@1985 9efc2488-bf62-4759-914b-345cdb29e865 --- src/perl/lib/OpenSRF/AppSession.pm | 44 ++++++++++++++++++++++++++++- src/perl/lib/OpenSRF/Application.pm | 15 ++++++++++ 2 files changed, 58 insertions(+), 1 deletion(-) diff --git a/src/perl/lib/OpenSRF/AppSession.pm b/src/perl/lib/OpenSRF/AppSession.pm index 851f02c..d450159 100644 --- a/src/perl/lib/OpenSRF/AppSession.pm +++ b/src/perl/lib/OpenSRF/AppSession.pm @@ -826,6 +826,12 @@ sub new { threadTrace => $threadTrace, payload => $payload, complete => 0, + resp_count => 0, + max_chunk_count => 0, + current_chunk_count => 0, + max_chunk_size => 0, + current_chunk_size => 0, + current_chunk => [], timeout_reset => 0, recv_timeout => 30, remaining_recv_timeout => 30, @@ -839,6 +845,20 @@ sub new { return $self; } +sub max_chunk_count { + my $self = shift; + my $value = shift; + $self->{max_chunk_count} = $value if (defined($value)); + return $self->{max_chunk_count}; +} + +sub max_chunk_size { + my $self = shift; + my $value = shift; + $self->{max_chunk_size} = $value if (defined($value)); + return $self->{max_chunk_size}; +} + sub recv_timeout { my $self = shift; my $timeout = shift; @@ -969,6 +989,26 @@ sub respond { $response->content($msg); } + if ($self->{max_chunk_count} > 0 or $self->{max_chunk_size} > 0) { # we are chunking, and we need to test the size or count + + $self->{current_chunk_size} += OpenSRF::Utils::JSON->perl2JSON($response); + push @{$self->{current_chunk}}, $response; + $self->{current_chunk_count}++; + + if ( + ($self->{max_chunk_size} && $self->{current_chunk_size} >= $self->{max_chunk_size} ) || + ($self->{max_chunk_count} && $self->{current_chunk_count} >= $self->{max_chunk_count}) + ) { # send chunk and reset + my $send_res = $self->session->send(( map { ('RESULT', $_) } @{$self->{current_chunk}} ), $self->threadTrace); + $self->{current_chunk} = []; + $self->{current_chunk_size} = 0; + $self->{current_chunk_count} = 0; + return $send_res; + } else { # not at a chunk yet, just queue it up + return $self->session->app_request( $self->threadTrace ); + } + } + $self->session->send('RESULT', $response, $self->threadTrace); } @@ -985,12 +1025,14 @@ sub respond_complete { $response->content($msg); } + push @{$self->{current_chunk}}, $response; + my $stat = OpenSRF::DomainObject::oilsConnectStatus->new( statusCode => STATUS_COMPLETE(), status => 'Request Complete' ); - $self->session->send( 'RESULT' => $response, 'STATUS' => $stat, $self->threadTrace); + $self->session->send( ( map { ('RESULT', $_) } @{$self->{current_chunk}} ), 'STATUS' => $stat, $self->threadTrace); $self->complete(1); } diff --git a/src/perl/lib/OpenSRF/Application.pm b/src/perl/lib/OpenSRF/Application.pm index 0329a02..d1e3d8f 100644 --- a/src/perl/lib/OpenSRF/Application.pm +++ b/src/perl/lib/OpenSRF/Application.pm @@ -48,6 +48,19 @@ sub argc { return $self->{argc}; } +sub max_chunk_size { + my $self = shift; + return 0 unless ref($self); + return $self->{max_chunk_size} if (defined($self->{max_chunk_size})); + return 10240; +} + +sub max_chunk_count { + my $self = shift; + return 0 unless ref($self); + return $self->{max_chunk_count} || 0; +} + sub api_name { my $self = shift; return 1 unless ref($self); @@ -138,6 +151,8 @@ sub handler { if (ref $coderef) { my @args = $app_msg->params; my $appreq = OpenSRF::AppRequest->new( $session ); + $appreq->max_chunk_size( $coderef->max_chunk_size ); + $appreq->max_chunk_count( $coderef->max_chunk_count ); $log->debug( "in_request = $in_request : [" . $appreq->threadTrace."]", INTERNAL ); if( $in_request ) { -- 2.43.2