]> git.evergreen-ils.org Git - Evergreen.git/blob - OpenSRF/src/perlmods/OpenSRF/AppSession.pm
8e0b484f09cd1d7b3820ad04373f8c5934b4e6f6
[Evergreen.git] / OpenSRF / src / perlmods / OpenSRF / AppSession.pm
1 package OpenSRF::AppSession;
2 use OpenSRF::DOM;
3 #use OpenSRF::DOM::Element::userAuth;
4 use OpenSRF::DomainObject::oilsMessage;
5 use OpenSRF::DomainObject::oilsMethod;
6 use OpenSRF::DomainObject::oilsResponse qw/:status/;
7 use OpenSRF::Transport::PeerHandle;
8 use OpenSRF::Utils::Logger qw(:level);
9 use OpenSRF::Utils::SettingsClient;
10 use OpenSRF::Utils::Config;
11 use OpenSRF::EX;
12 use OpenSRF;
13 use Exporter;
14 use base qw/Exporter OpenSRF/;
15 use Time::HiRes qw( time usleep );
16 use warnings;
17 use strict;
18
19 our @EXPORT_OK = qw/CONNECTING INIT_CONNECTED CONNECTED DISCONNECTED CLIENT SERVER/;
20 our %EXPORT_TAGS = ( state => [ qw/CONNECTING INIT_CONNECTED CONNECTED DISCONNECTED/ ],
21                  endpoint => [ qw/CLIENT SERVER/ ],
22 );
23
24 my $logger = "OpenSRF::Utils::Logger";
25
26 our %_CACHE;
27 our @_CLIENT_CACHE;
28 our @_RESEND_QUEUE;
29
30 sub kill_client_session_cache {
31         for my $session ( @_CLIENT_CACHE ) {
32                 $session->kill_me;
33         }
34 }
35
36 sub CONNECTING { return 3 };
37 sub INIT_CONNECTED { return 4 };
38 sub CONNECTED { return 1 };
39 sub DISCONNECTED { return 2 };
40
41 sub CLIENT { return 2 };
42 sub SERVER { return 1 };
43
44 sub find {
45         return undef unless (defined $_[1]);
46         return $_CACHE{$_[1]} if (exists($_CACHE{$_[1]}));
47 }
48
49 sub find_client {
50         my( $self, $app ) = @_;
51         $logger->debug( "Client Cache contains: " .scalar(@_CLIENT_CACHE), INTERNAL );
52         my ($client) = grep { $_->[0] eq $app and $_->[1] == 1 } @_CLIENT_CACHE;
53         $client->[1] = 0;
54         return $client->[2];
55 }
56
57 sub transport_connected {
58         my $self = shift;
59         if( ! exists $self->{peer_handle} || ! $self->{peer_handle} ) {
60                 return 0;
61         }
62         return $self->{peer_handle}->tcp_connected();
63 }
64
65 sub connected {
66         my $self = shift;
67         return $self->state == CONNECTED;
68 }
69 # ----------------------------------------------------------------------------
70 # Clears the transport buffers
71 # call this if you are not through with the sesssion, but you want 
72 # to have a clean slate.  You shouldn't have to call this if
73 # you are correctly 'recv'ing all of the data from a request.
74 # however, if you don't want all of the data, this will
75 # slough off any excess
76 #  * * Note: This will delete data for all sessions using this transport
77 # handle.  For example, all client sessions use the same handle.
78 # ----------------------------------------------------------------------------
79 sub buffer_reset {
80
81         my $self = shift;
82         if( ! exists $self->{peer_handle} || ! $self->{peer_handle} ) { 
83                 return 0;
84         }
85         $self->{peer_handle}->buffer_reset();
86 }
87
88
89 sub client_cache {
90         my $self = shift;
91         push @_CLIENT_CACHE, [ $self->app, 1, $self ];
92 }
93
94 # when any incoming data is received, this method is called.
95 sub server_build {
96         my $class = shift;
97         $class = ref($class) || $class;
98
99         my $sess_id = shift;
100         my $remote_id = shift;
101         my $service = shift;
102
103         warn "Missing args to server_build():\n" .
104                 "sess_id: $sess_id, remote_id: $remote_id, service: $service\n" 
105                 unless ($sess_id and $remote_id and $service);
106
107         return undef unless ($sess_id and $remote_id and $service);
108
109         if ( my $thingy = $class->find($sess_id) ) {
110                 $thingy->remote_id( $remote_id );
111                 $logger->debug( "AppSession returning existing session $sess_id", DEBUG );
112                 return $thingy;
113         } else {
114                 $logger->debug( "AppSession building new server session $sess_id", DEBUG );
115         }
116
117         if( $service eq "client" ) {
118                 #throw OpenSRF::EX::PANIC ("Attempting to build a client session as a server" .
119                 #       " Session ID [$sess_id], remote_id [$remote_id]");
120
121                 warn "Attempting to build a client session as ".
122                                 "a server Session ID [$sess_id], remote_id [$remote_id]";
123
124                 $logger->debug("Attempting to build a client session as ".
125                                 "a server Session ID [$sess_id], remote_id [$remote_id]", ERROR );
126
127                 return undef;
128         }
129
130         my $config_client = OpenSRF::Utils::SettingsClient->new();
131         my $stateless = $config_client->config_value("apps", $service, "stateless");
132
133         #my $max_requests = $conf->$service->max_requests;
134         my $max_requests        = $config_client->config_value("apps",$service,"max_requests");
135         $logger->debug( "Max Requests for $service is $max_requests", INTERNAL ) if (defined $max_requests);
136
137         $logger->transport( "AppSession creating new session: $sess_id", INTERNAL );
138
139         my $self = bless { recv_queue  => [],
140                            request_queue  => [],
141                            requests  => 0,
142                            session_data  => {},
143                            callbacks  => {},
144                            endpoint    => SERVER,
145                            state       => CONNECTING, 
146                            session_id  => $sess_id,
147                            remote_id    => $remote_id,
148                                 peer_handle => OpenSRF::Transport::PeerHandle->retrieve($service),
149                                 max_requests => $max_requests,
150                                 session_threadTrace => 0,
151                                 service => $service,
152                                 stateless => $stateless,
153                          } => $class;
154
155         return $_CACHE{$sess_id} = $self;
156 }
157
158 sub session_data {
159         my $self = shift;
160         my ($name, $datum) = @_;
161
162         $self->{session_data}->{$name} = $datum if (defined $datum);
163         return $self->{session_data}->{$name};
164 }
165
166 sub service { return shift()->{service}; }
167
168 sub continue_request {
169         my $self = shift;
170         $self->{'requests'}++;
171         return 1 if (!$self->{'max_requests'});
172         return $self->{'requests'} <= $self->{'max_requests'} ? 1 : 0;
173 }
174
175 sub last_sent_payload {
176         my( $self, $payload ) = @_;
177         if( $payload ) {
178                 return $self->{'last_sent_payload'} = $payload;
179         }
180         return $self->{'last_sent_payload'};
181 }
182
183 sub last_sent_type {
184         my( $self, $type ) = @_;
185         if( $type ) {
186                 return $self->{'last_sent_type'} = $type;
187         }
188         return $self->{'last_sent_type'};
189 }
190
191 sub get_app_targets {
192         my $app = shift;
193
194         my $conf = OpenSRF::Utils::Config->current;
195         my $router_name = $conf->bootstrap->router_name || 'router';
196         my $routers = $conf->bootstrap->domains;
197
198         unless($router_name and $routers) {
199                 throw OpenSRF::EX::Config 
200                         ("Missing router config information 'router_name' and 'routers'");
201         }
202
203         my @targets;
204         for my $router (@$routers) {
205                 push @targets, "$router_name\@$router/$app";
206         }
207
208         return @targets;
209 }
210
211 sub stateless {
212         my $self = shift;
213         if($self) {return $self->{stateless};}
214 }
215
216 # When we're a client and we want to connect to a remote service
217 # create( $app, username => $user, secret => $passwd );
218 #    OR
219 # create( $app, sysname => $user, secret => $shared_secret );
220 sub create {
221         my $class = shift;
222         $class = ref($class) || $class;
223
224         my $app = shift;
225
226
227         
228         if( my $thingy = OpenSRF::AppSession->find_client( $app ) ) {
229                         $logger->debug( 
230                                 "AppSession returning existing client session for $app", DEBUG );
231                         return $thingy;
232         } else {
233                 $logger->debug( "AppSession creating new client session for $app", DEBUG );
234         }
235
236         my $stateless = 0;
237         my $c = OpenSRF::Utils::SettingsClient->new();
238         # we can get an infinite loop if we're grabbing the settings and we
239         # need the settings to grab the settings...
240         if($app ne "opensrf.settings" || $c->has_config()) { 
241                 $stateless = $c->config_value("apps", $app, "stateless");
242         }
243
244         my $sess_id = time . rand( $$ );
245         while ( $class->find($sess_id) ) {
246                 $sess_id = time . rand( $$ );
247         }
248
249         
250         my ($r_id) = get_app_targets($app);
251
252         my $peer_handle = OpenSRF::Transport::PeerHandle->retrieve("client"); 
253         if( ! $peer_handle ) {
254                 $peer_handle = OpenSRF::Transport::PeerHandle->retrieve("system_client");
255         }
256
257         my $self = bless { app_name    => $app,
258                                 #client_auth => $auth,
259                            #recv_queue  => [],
260                            request_queue  => [],
261                            endpoint    => CLIENT,
262                            state       => DISCONNECTED,#since we're init'ing
263                            session_id  => $sess_id,
264                            remote_id   => $r_id,
265                            api_level   => 1,
266                            orig_remote_id   => $r_id,
267                                 peer_handle => $peer_handle,
268                                 session_threadTrace => 0,
269                                 stateless               => $stateless,
270                          } => $class;
271
272         $self->client_cache();
273         $_CACHE{$sess_id} = $self;
274         return $self->find_client( $app );
275 }
276
277 sub api_level {
278         return shift()->{api_level};
279 }
280
281 sub app {
282         return shift()->{app_name};
283 }
284
285 sub reset {
286         my $self = shift;
287         $self->remote_id($$self{orig_remote_id});
288 }
289
290 # 'connect' can be used as a constructor if called as a class method,
291 # or used to connect a session that has disconnectd if called against
292 # an existing session that seems to be disconnected, or was just built
293 # using 'create' above.
294
295 # connect( $app, username => $user, secret => $passwd );
296 #    OR
297 # connect( $app, sysname => $user, secret => $shared_secret );
298
299 # --- Returns undef if the connect attempt times out.
300 # --- Returns the OpenSRF::EX object if one is returned by the server
301 # --- Returns self if connected
302 sub connect {
303         my $self = shift;
304         my $class = ref($self) || $self;
305
306
307         if ( ref( $self ) and  $self->state && $self->state == CONNECTED  ) {
308                 $logger->transport("AppSession already connected", DEBUG );
309         } else {
310                 $logger->transport("AppSession not connected, connecting..", DEBUG );
311         }
312         return $self if ( ref( $self ) and  $self->state && $self->state == CONNECTED  );
313
314
315         my $app = shift;
316         my $api_level = shift;
317         $api_level = 1 unless (defined $api_level);
318
319         $self = $class->create($app, @_) if (!ref($self));
320
321
322         return undef unless ($self);
323
324         $self->{api_level} = $api_level;
325
326         $self->reset;
327         $self->state(CONNECTING);
328         $self->send('CONNECT', "");
329
330         # if we want to connect to settings, we may not have 
331         # any data for the settings client to work with...
332         # just using a default for now XXX
333
334         my $time_remaining = 5;
335         
336 =head blah
337         my $client = OpenSRF::Utils::SettingsClient->new();
338         my $trans = $client->config_value("client_connection","transport_host");
339
340         if(!ref($trans)) {
341                 $time_remaining = $trans->{connect_timeout};
342         } else {
343                 # XXX for now, just use the first
344                 $time_remaining = $trans->[0]->{connect_timeout};
345         }
346 =cut
347
348         while ( $self->state != CONNECTED  and $time_remaining > 0 ) {
349                 my $starttime = time;
350                 $self->queue_wait($time_remaining);
351                 my $endtime = time;
352                 $time_remaining -= ($endtime - $starttime);
353         }
354
355         return undef unless($self->state == CONNECTED);
356
357         return $self;
358 }
359
360 sub finish {
361         my $self = shift;
362         if( ! $self->session_id ) {
363                 return 0;
364         }
365         #$self->disconnect if ($self->endpoint == CLIENT);
366         for my $ses ( @_CLIENT_CACHE ) {
367                 if ($ses->[2]->session_id eq $self->session_id) {
368                         $ses->[1] = 1;
369                 }
370         }
371 }
372
373 sub unregister_callback {
374         my $self = shift;
375         my $type = shift;
376         my $cb = shift;
377         if (exists $self->{callbacks}{$type}) {
378                 delete $self->{callbacks}{$type}{$cb};
379                 return $cb;
380         }
381         return undef;
382 }
383
384 sub register_callback {
385         my $self = shift;
386         my $type = shift;
387         my $cb = shift;
388         my $cb_key = "$cb";
389         $self->{callbacks}{$type}{$cb_key} = $cb;
390         return $cb_key;
391 }
392
393 sub kill_me {
394         my $self = shift;
395         if( ! $self->session_id ) { return 0; }
396
397         # run each 'death' callback;
398         if (exists $self->{callbacks}{death}) {
399                 for my $sub (values %{$self->{callbacks}{death}}) {
400                         $sub->($self);
401                 }
402         }
403
404         $self->disconnect;
405         $logger->transport( "AppSession killing self: " . $self->session_id(), DEBUG );
406         my @a;
407         for my $ses ( @_CLIENT_CACHE ) {
408                 push @a, $ses 
409                         if ($ses->[2]->session_id ne $self->session_id);
410         }
411         @_CLIENT_CACHE = @a;
412         delete $_CACHE{$self->session_id};
413         delete($$self{$_}) for (keys %$self);
414 }
415
416 sub disconnect {
417         my $self = shift;
418
419         # run each 'disconnect' callback;
420         if (exists $self->{callbacks}{disconnect}) {
421                 for my $sub (values %{$self->{callbacks}{disconnect}}) {
422                         $sub->($self);
423                 }
424         }
425
426         if( $self->stateless and $self->state != CONNECTED ) {
427                 $self->reset;
428                 return;
429         }
430
431         unless( $self->state == DISCONNECTED ) {
432                 $self->send('DISCONNECT', "") if ($self->endpoint == CLIENT);
433                 $self->state( DISCONNECTED ); 
434         }
435
436         $self->reset;
437 }
438
439 sub request {
440         my $self = shift;
441         my $meth = shift;
442         return unless $self;
443
444         my $method;
445         if (!ref $meth) {
446                 $method = new OpenSRF::DomainObject::oilsMethod ( method => $meth );
447         } else {
448                 $method = $meth;
449         }
450         
451         $method->params( @_ );
452
453         $self->send('REQUEST',$method);
454 }
455
456 sub full_request {
457         my $self = shift;
458         my $meth = shift;
459
460         my $method;
461         if (!ref $meth) {
462                 $method = new OpenSRF::DomainObject::oilsMethod ( method => $meth );
463         } else {
464                 $method = $meth;
465         }
466         
467         $method->params( @_ );
468
469         $self->send(CONNECT => '', REQUEST => $method, DISCONNECT => '');
470 }
471
472 sub send {
473         my $self = shift;
474         my @payload_list = @_; # this is a Domain Object
475
476         return unless ($self and $self->{peer_handle});
477
478         $logger->debug( "In send", INTERNAL );
479         
480         my $tT;
481
482         if( @payload_list % 2 ) { $tT = pop @payload_list; }
483
484         if( ! @payload_list ) {
485                 $logger->debug( "payload_list param is incomplete in AppSession::send()", ERROR );
486                 return undef; 
487         }
488
489         my @doc = ();
490
491         $logger->debug( "In send2", INTERNAL );
492
493         my $disconnect = 0;
494         my $connecting = 0;
495
496         while( @payload_list ) {
497
498                 my ($msg_type, $payload) = ( shift(@payload_list), shift(@payload_list) ); 
499
500                 if ($msg_type eq 'DISCONNECT' ) {
501                         $disconnect++;
502                         if( $self->state == DISCONNECTED && !$connecting) {
503                                 next;
504                         }
505                 }
506
507                 if( $msg_type eq "CONNECT" ) { 
508                         $connecting++; 
509                 }
510
511                 my $msg = OpenSRF::DomainObject::oilsMessage->new();
512                 $msg->type($msg_type);
513         
514                 no warnings;
515                 $msg->threadTrace( $tT || int($self->session_threadTrace) || int($self->last_threadTrace) );
516                 use warnings;
517         
518                 if ($msg->type eq 'REQUEST') {
519                         if ( !defined($tT) || $self->last_threadTrace != $tT ) {
520                                 $msg->update_threadTrace;
521                                 $self->session_threadTrace( $msg->threadTrace );
522                                 $tT = $self->session_threadTrace;
523                                 OpenSRF::AppRequest->new($self, $payload);
524                         }
525                 }
526         
527                 $msg->api_level($self->api_level);
528                 $msg->payload($payload) if $payload;
529         
530                 push @doc, $msg;
531
532         
533                 $logger->debug( "AppSession sending ".$msg->type." to ".$self->remote_id.
534                         " with threadTrace [".$msg->threadTrace."]", INFO );
535
536         }
537         
538         if ($self->endpoint == CLIENT and ! $disconnect) {
539                 $self->queue_wait(0);
540
541
542                 if($self->stateless && $self->state != CONNECTED) {
543                         $self->reset;
544                         $logger->debug("AppSession is stateless in send", DEBUG );
545                 }
546
547                 if( !$self->stateless and $self->state != CONNECTED ) {
548
549                         $logger->debug( "Sending connect before request 1", INTERNAL );
550
551                         unless (($self->state == CONNECTING && $connecting )) {
552                                 $logger->debug( "Sending connect before request 2", INTERNAL );
553                                 my $v = $self->connect();
554                                 if( ! $v ) {
555                                         $logger->debug( "Unable to connect to remote service in AppSession::send()", ERROR );
556                                         return undef;
557                                 }
558                                 if( ref($v) and $v->can("class") and $v->class->isa( "OpenSRF::EX" ) ) {
559                                         return $v;
560                                 }
561                         }
562                 }
563
564         } 
565         $logger->debug( "AppSession sending doc: " . JSON->perl2JSON(\@doc), INTERNAL );
566
567
568         $self->{peer_handle}->send( 
569                                         to     => $self->remote_id,
570                                    thread => $self->session_id,
571                                    body   => JSON->perl2JSON(\@doc) );
572
573         if( $disconnect) {
574                 $self->state( DISCONNECTED );
575         }
576
577         return $self->app_request( $tT );
578 }
579
580 sub app_request {
581         my $self = shift;
582         my $tT = shift;
583         
584         return undef unless (defined $tT);
585         my ($req) = grep { $_->threadTrace == $tT } @{ $self->{request_queue} };
586
587         return $req;
588 }
589
590 sub remove_app_request {
591         my $self = shift;
592         my $req = shift;
593         
594         my @list = grep { $_->threadTrace != $req->threadTrace } @{ $self->{request_queue} };
595
596         $self->{request_queue} = \@list;
597 }
598
599 sub endpoint {
600         return $_[0]->{endpoint};
601 }
602
603
604 sub session_id {
605         my $self = shift;
606         return $self->{session_id};
607 }
608
609 sub push_queue {
610         my $self = shift;
611         my $resp = shift;
612         my $req = $self->app_request($resp->[1]);
613         return $req->push_queue( $resp->[0] ) if ($req);
614         push @{ $self->{recv_queue} }, $resp->[0];
615 }
616
617 sub last_threadTrace {
618         my $self = shift;
619         my $new_last_threadTrace = shift;
620
621         my $old_last_threadTrace = $self->{last_threadTrace};
622         if (defined $new_last_threadTrace) {
623                 $self->{last_threadTrace} = $new_last_threadTrace;
624                 return $new_last_threadTrace unless ($old_last_threadTrace);
625         }
626
627         return $old_last_threadTrace;
628 }
629
630 sub session_threadTrace {
631         my $self = shift;
632         my $new_last_threadTrace = shift;
633
634         my $old_last_threadTrace = $self->{session_threadTrace};
635         if (defined $new_last_threadTrace) {
636                 $self->{session_threadTrace} = $new_last_threadTrace;
637                 return $new_last_threadTrace unless ($old_last_threadTrace);
638         }
639
640         return $old_last_threadTrace;
641 }
642
643 sub last_message_type {
644         my $self = shift;
645         my $new_last_message_type = shift;
646
647         my $old_last_message_type = $self->{last_message_type};
648         if (defined $new_last_message_type) {
649                 $self->{last_message_type} = $new_last_message_type;
650                 return $new_last_message_type unless ($old_last_message_type);
651         }
652
653         return $old_last_message_type;
654 }
655
656 sub last_message_api_level {
657         my $self = shift;
658         my $new_last_message_api_level = shift;
659
660         my $old_last_message_api_level = $self->{last_message_api_level};
661         if (defined $new_last_message_api_level) {
662                 $self->{last_message_api_level} = $new_last_message_api_level;
663                 return $new_last_message_api_level unless ($old_last_message_api_level);
664         }
665
666         return $old_last_message_api_level;
667 }
668
669 sub remote_id {
670         my $self = shift;
671         my $new_remote_id = shift;
672
673         my $old_remote_id = $self->{remote_id};
674         if (defined $new_remote_id) {
675                 $self->{remote_id} = $new_remote_id;
676                 return $new_remote_id unless ($old_remote_id);
677         }
678
679         return $old_remote_id;
680 }
681
682 sub client_auth {
683         return undef;
684         my $self = shift;
685         my $new_ua = shift;
686
687         my $old_ua = $self->{client_auth};
688         if (defined $new_ua) {
689                 $self->{client_auth} = $new_ua;
690                 return $new_ua unless ($old_ua);
691         }
692
693         return $old_ua->cloneNode(1);
694 }
695
696 sub state {
697         my $self = shift;
698         my $new_state = shift;
699
700         my $old_state = $self->{state};
701         if (defined $new_state) {
702                 $self->{state} = $new_state;
703                 return $new_state unless ($old_state);
704         }
705
706         return $old_state;
707 }
708
709 sub DESTROY {
710         my $self = shift;
711         delete $$self{$_} for keys %$self;
712         return undef;
713 }
714
715 sub recv {
716         my $self = shift;
717         my @proto_args = @_;
718         my %args;
719
720         if ( @proto_args ) {
721                 if ( !(@proto_args % 2) ) {
722                         %args = @proto_args;
723                 } elsif (@proto_args == 1) {
724                         %args = ( timeout => @proto_args );
725                 }
726         }
727
728         #$logger->debug( ref($self). " recv_queue before wait: " . $self->_print_queue(), INTERNAL );
729
730         if( exists( $args{timeout} ) ) {
731                 $args{timeout} = int($args{timeout});
732                 $self->{recv_timeout} = $args{timeout};
733         }
734
735         #$args{timeout} = 0 if ($self->complete);
736
737         if(defined($args{timeout})) {
738                 $logger->debug( ref($self) ."->recv with timeout " . $args{timeout}, INTERNAL );
739         }
740
741         $args{count} ||= 1;
742
743         my $avail = @{ $self->{recv_queue} };
744         $self->{remaining_recv_timeout} = $self->{recv_timeout};
745
746         while ( $self->{remaining_recv_timeout} > 0 and $avail < $args{count} ) {
747                 last if $self->complete;
748                 my $starttime = time;
749                 $self->queue_wait($self->{remaining_recv_timeout});
750                 my $endtime = time;
751                 if ($self->{timeout_reset}) {
752                         $self->{timeout_reset} = 0;
753                 } else {
754                         $self->{remaining_recv_timeout} -= ($endtime - $starttime)
755                 }
756                 $avail = @{ $self->{recv_queue} };
757         }
758
759
760         my @list;
761         while ( my $msg = shift @{ $self->{recv_queue} } ) {
762                 push @list, $msg;
763                 last if (scalar(@list) >= $args{count});
764         }
765
766         $logger->debug( "Number of matched responses: " . @list, DEBUG );
767         $self->queue_wait(0); # check for statuses
768         
769         return $list[0] unless (wantarray);
770         return @list;
771 }
772
773 sub push_resend {
774         my $self = shift;
775         push @OpenSRF::AppSession::_RESEND_QUEUE, @_;
776 }
777
778 sub flush_resend {
779         my $self = shift;
780         $logger->debug( "Resending..." . @_RESEND_QUEUE, DEBUG );
781         while ( my $req = shift @OpenSRF::AppSession::_RESEND_QUEUE ) {
782                 $req->resend unless $req->complete;
783         }
784 }
785
786
787 sub queue_wait {
788         my $self = shift;
789         if( ! $self->{peer_handle} ) { return 0; }
790         my $timeout = shift || 0;
791         $logger->debug( "Calling queue_wait($timeout)" , DEBUG );
792         $logger->debug( "Timestamp before process($timeout) : " . $logger->format_time(), INTERNAL );
793         my $o = $self->{peer_handle}->process($timeout);
794         $logger->debug( "Timestamp after  process($timeout) : " . $logger->format_time(), INTERNAL );
795         $self->flush_resend;
796         return $o;
797 }
798
799 sub _print_queue {
800         my( $self ) = @_;
801         my $string = "";
802         foreach my $msg ( @{$self->{recv_queue}} ) {
803                 $string = $string . $msg->toString(1) . "\n";
804         }
805         return $string;
806 }
807
808 sub status {
809         my $self = shift;
810         return unless $self;
811         $self->send( 'STATUS', @_ );
812 }
813
814 sub reset_request_timeout {
815         my $self = shift;
816         my $tt = shift;
817         my $req = $self->app_request($tt);
818         $req->{remaining_recv_timeout} = $req->{recv_timeout};
819         $req->{timout_reset} = 1;
820 }
821
822 #-------------------------------------------------------------------------------
823
824 package OpenSRF::AppRequest;
825 use base qw/OpenSRF::AppSession/;
826 use OpenSRF::Utils::Logger qw/:level/;
827 use OpenSRF::DomainObject::oilsResponse qw/:status/;
828
829 sub new {
830         my $class = shift;
831         $class = ref($class) || $class;
832
833         my $session = shift;
834         my $threadTrace = $session->session_threadTrace || $session->last_threadTrace;
835         my $payload = shift;
836         
837         my $self = {    session                 => $session,
838                         threadTrace             => $threadTrace,
839                         payload                 => $payload,
840                         complete                => 0,
841                         timeout_reset           => 0,
842                         recv_timeout            => 30,
843                         remaining_recv_timeout  => 30,
844                         recv_queue              => [],
845         };
846
847         bless $self => $class;
848
849         push @{ $self->session->{request_queue} }, $self;
850
851         return $self;
852 }
853
854 sub queue_size {
855         my $size = @{$_[0]->{recv_queue}};
856         return $size;
857 }
858         
859 sub send {
860         my $self = shift;
861         return unless ($self and $self->session and !$self->complete);
862         $self->session->send(@_);
863 }
864
865 sub finish {
866         my $self = shift;
867         return unless $self->session;
868         $self->session->remove_app_request($self);
869         delete($$self{$_}) for (keys %$self);
870 }
871
872 sub session {
873         return shift()->{session};
874 }
875
876 sub complete {
877         my $self = shift;
878         my $complete = shift;
879         return $self->{complete} if ($self->{complete});
880         if (defined $complete) {
881                 $self->{complete} = $complete;
882         } else {
883                 $self->session->queue_wait(0);
884         }
885         return $self->{complete};
886 }
887
888 sub wait_complete {
889         my $self = shift;
890         my $timeout = shift || 1;
891         my $time_remaining = $timeout;
892
893         while ( ! $self->complete  and $time_remaining > 0 ) {
894                 my $starttime = time;
895                 $self->queue_wait($time_remaining);
896                 my $endtime = time;
897                 $time_remaining -= ($endtime - $starttime);
898         }
899
900         return $self->complete;
901 }
902
903 sub threadTrace {
904         return shift()->{threadTrace};
905 }
906
907 sub push_queue {
908         my $self = shift;
909         my $resp = shift;
910         if( !$resp ) { return 0; }
911         if( UNIVERSAL::isa($resp, "Error")) {
912                 $self->{failed} = $resp;
913                 $self->complete(1);
914                 #return; eventually...
915         }
916         push @{ $self->{recv_queue} }, $resp;
917 }
918
919 sub failed {
920         my $self = shift;
921         return $self->{failed};
922 }
923
924 sub queue_wait {
925         my $self = shift;
926         OpenSRF::Utils::Logger->debug( "Calling queue_wait(@_)", DEBUG );
927         return $self->session->queue_wait(@_)
928 }
929
930 sub payload { return shift()->{payload}; }
931
932 sub resend {
933         my $self = shift;
934         return unless ($self and $self->session and !$self->complete);
935         OpenSRF::Utils::Logger->debug( "I'm resending the request for threadTrace ". $self->threadTrace, DEBUG);
936         return $self->session->send('REQUEST', $self->payload, $self->threadTrace );
937 }
938
939 sub status {
940         my $self = shift;
941         my $msg = shift;
942         return unless ($self and $self->session and !$self->complete);
943         $self->session->send( 'STATUS',$msg, $self->threadTrace );
944 }
945
946 sub respond {
947         my $self = shift;
948         my $msg = shift;
949         return unless ($self and $self->session and !$self->complete);
950
951         my $response;
952         if (ref($msg) && UNIVERSAL::isa($msg, 'OpenSRF::DomainObject::oilsResult')) {
953                 $response = $msg;
954         } else {
955                 $response = new OpenSRF::DomainObject::oilsResult;
956                 $response->content($msg);
957         }
958
959         $self->session->send('RESULT', $response, $self->threadTrace);
960 }
961
962 sub respond_complete {
963         my $self = shift;
964         my $msg = shift;
965         return unless ($self and $self->session and !$self->complete);
966
967         my $response;
968         if (ref($msg) && UNIVERSAL::isa($msg, 'OpenSRF::DomainObject::oilsResult')) {
969                 $response = $msg;
970         } else {
971                 $response = new OpenSRF::DomainObject::oilsResult;
972                 $response->content($msg);
973         }
974
975         my $stat = OpenSRF::DomainObject::oilsConnectStatus->new(
976                 statusCode => STATUS_COMPLETE(),
977                 status => 'Request Complete' );
978
979
980         $self->session->send( 'RESULT' => $response, 'STATUS' => $stat, $self->threadTrace);
981         $self->complete(1);
982 }
983
984 sub register_death_callback {
985         my $self = shift;
986         my $cb = shift;
987         $self->session->register_callback( death => $cb );
988 }
989
990
991 # utility method.  checks to see of the request failed.
992 # if so, throws an OpenSRF::EX::ERROR. if everything is
993 # ok, it returns the content of the request
994 sub gather {
995         my $self = shift;
996         my $finish = shift;
997         $self->wait_complete;
998         my $resp = $self->recv( timeout => 60 );
999         if( $self->failed() ) { 
1000                 throw OpenSRF::EX::ERROR
1001                         ($self->failed()->stringify());
1002         }
1003         if(!$resp) { return undef; }
1004         my $content = $resp->content;
1005         if($finish) { $self->finish();}
1006         return $content;
1007 }
1008
1009
1010 package OpenSRF::AppSubrequest;
1011
1012 sub respond {
1013         my $self = shift;
1014         my $resp = shift;
1015         push @{$$self{resp}}, $resp if (defined $resp);
1016 }
1017 sub respond_complete { respond(@_); }
1018
1019 sub new {
1020         return bless({resp => []}, 'OpenSRF::AppSubrequest');
1021 }
1022
1023 sub responses { @{$_[0]->{resp}} }
1024
1025 sub status {}
1026
1027
1028 1;
1029