]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/perl/lib/OpenSRF/Application.pm
6ebfe4b692c227ce3a42fb905607cc6b8ee26302
[OpenSRF.git] / src / perl / lib / OpenSRF / Application.pm
1 package OpenSRF::Application;
2 # vim:noet:ts=4
3 use vars qw/$_app $log @_METHODS $thunk $server_class/;
4
5 use base qw/OpenSRF/;
6 use OpenSRF::AppSession;
7 use OpenSRF::DomainObject::oilsMethod;
8 use OpenSRF::DomainObject::oilsResponse qw/:status/;
9 use OpenSRF::Utils::Logger qw/:level $logger/;
10 use Data::Dumper;
11 use Time::HiRes qw/time/;
12 use OpenSRF::EX qw/:try/;
13 use Carp;
14 use OpenSRF::Utils::JSON;
15
# Explicit no-op destructor: objects of this class need no cleanup.
# NOTE(review): presumably also keeps DESTROY from reaching an inherited
# AUTOLOAD -- confirm against the OpenSRF base class.
sub DESTROY { return; }
17
18 use strict;
19 use warnings;
20
# Logger class; every $log->debug/info/error call in this package is a class
# method call on this name.
$log = 'OpenSRF::Utils::Logger';

# Non-zero while handler() is executing a method; handler() increments it
# before running a request and decrements it afterwards.
our $in_request = 0;
# Requests that arrived while $in_request was set. handler() pushes entries
# of the form [ $appreq, \@args, $coderef ] and drains them in FIFO order.
our @pending_requests;
25
# Accessor for the 'package' field of a method object.
# Returns 1 when invoked on something that is not a reference (e.g. a class
# name) instead of a method object.
sub package {
    my ($self) = @_;
    return ref($self) ? $self->{package} : 1;
}
31
# Accessor for the 'signature' field of a method object.
# Returns 0 when invoked on a non-reference (e.g. a class name).
sub signature {
    my ($self) = @_;
    return ref($self) ? $self->{signature} : 0;
}
37
# Accessor for the 'strict' flag of a method object; handler() consults it to
# decide whether call parameters are validated.
# Returns 0 when invoked on a non-reference (e.g. a class name).
sub strict {
    my ($self) = @_;
    return ref($self) ? $self->{strict} : 0;
}
43
# Accessor for the 'argc' field (minimum argument count) of a method object.
# Returns 0 when invoked on a non-reference (e.g. a class name).
sub argc {
    my ($self) = @_;
    return ref($self) ? $self->{argc} : 0;
}
49
# Accessor for the 'max_chunk_size' field of a method object.
# An explicitly stored value (including 0) is returned as-is; when the field
# is undefined the default of 10240 applies. Non-references yield 0.
sub max_chunk_size {
    my ($self) = @_;
    return 0 unless ref($self);

    my $configured = $self->{max_chunk_size};
    return defined($configured) ? $configured : 10240;
}
56
# Accessor for the 'max_chunk_count' field of a method object.
# Any false or missing value is reported as 0; non-references also yield 0.
sub max_chunk_count {
    my ($self) = @_;
    return ref($self) ? ($self->{max_chunk_count} || 0) : 0;
}
62
# Accessor for the 'api_name' field of a method object.
# Returns 1 when invoked on a non-reference (e.g. a class name).
sub api_name {
    my ($self) = @_;
    return ref($self) ? $self->{api_name} : 1;
}
68
# Accessor for the 'api_level' field of a method object.
# Returns 1 when invoked on a non-reference (e.g. a class name), which is the
# level method_lookup() falls back to for class-level lookups.
sub api_level {
    my ($self) = @_;
    return ref($self) ? $self->{api_level} : 1;
}
74
# Combined getter/setter for the session attached to a method object.
# A true argument replaces the stored session; a false or missing argument
# leaves the stored value untouched. Always returns the stored session.
sub session {
    my ($self, $session) = @_;

    $self->{session} = $session if $session;

    return $self->{session};
}
84
# Getter/setter for the package-wide $server_class.
# The FIRST argument is treated as the new value, so this is written to be
# called as a plain function; a method-style call would store the invocant.
# A false or missing argument leaves the current value untouched.
sub server_class {
    my ($class) = @_;

    $server_class = $class if $class;

    return $server_class;
}
92
# Getter/setter for the package-wide $thunk flag, which method_lookup() reads
# to decide whether lookups may fall back across API levels.
# Any defined argument (including 0) replaces the flag.
sub thunk {
    my ($self, $flag) = @_;

    if (defined $flag) {
        $thunk = $flag;
    }

    return $thunk;
}
99
# Getter/setter for the package-wide application implementation class.
# With a defined $app the name is stored first and then loaded via its
# ->use method; a load failure (non-empty $@) is logged but not raised.
# Always returns the currently stored implementation.
sub application_implementation {
    my ($self, $app) = @_;

    return $_app unless defined $app;

    $_app = $app;
    $_app->use;
    $log->error( "Error loading application_implementation: $app -> $@", ERROR) if $@;

    return $_app;
}
115
# Validate call arguments against a method's registered argc and signature.
#   $method - method object (provides strict/argc/signature/api_name)
#   $args   - array ref of the call's parameters
# Does nothing unless the method's 'strict' flag is true. Dies with a
# descriptive message on the first violation; callers are expected to invoke
# this inside their try block so the failure becomes an error status.
sub _check_strict_params {
    my ($method, $args) = @_;

    return unless $method->strict;

    if (@$args < $method->argc) {
        die "Not enough params passed to ".
            $method->api_name." : requires ". $method->argc;
    }

    return unless @$args;

    my $sig = $method->signature;
    return unless ($sig && exists $sig->{params});

    for my $p (0 .. scalar(@{ $sig->{params} }) - 1 ) {
        my $spec = $sig->{params}->[$p];
        my $arg  = $args->[$p];

        if ($spec->{class}) {
            # lookup_hint may have no answer for this ref type; treat that
            # as a mismatch instead of comparing against undef.
            my $hint = OpenSRF::Utils::JSON->lookup_hint(ref $arg);
            unless (defined($hint) && $hint eq $spec->{class}) {
                die "Incorrect param class at position $p : should be a '$spec->{class}'";
            }
        }

        # As before, a matching class does not exempt the param from the
        # type check below.
        next unless $spec->{type};
        my $type = lc($spec->{type});

        if ($type eq 'object' && $arg !~ /HASH/) {
            die "Incorrect param type at position $p : should be an 'object'";
        } elsif ($type eq 'array' && $arg !~ /ARRAY/) {
            die "Incorrect param type at position $p : should be an 'array'";
        } elsif ($type eq 'number' && (ref($arg) || $arg !~ /^-?\d+(?:\.\d+)?$/)) {
            die "Incorrect param type at position $p : should be a 'number'";
        } elsif ($type eq 'string' && ref($arg)) {
            die "Incorrect param type at position $p : should be a 'string'";
        }
    }

    return;
}

# Entry point for an incoming application message on $session.
#
# Non-REQUEST messages are pushed onto the session queue. REQUEST messages
# are resolved to a locally registered method and executed; the result is
# sent with respond_complete and failures are reported as an
# oilsMethodException status. If a request arrives while another one is
# executing ($in_request), it is queued in @pending_requests and run after
# the current one finishes. Queued requests get the same strict parameter
# validation as directly executed ones.
#
# Always returns 1.
sub handler {
    my ($self, $session, $app_msg) = @_;

    # Nothing to dispatch.
    return 1 unless $app_msg;

    my $app = $self->application_implementation;

    if ($session->last_message_type ne 'REQUEST') {
        $session->push_queue([ $app_msg, $session->last_threadTrace ]);
        $session->last_message_type('');
        $session->last_message_api_level('');
        return 1;
    }

    my @args = $app_msg->params;
    my $method_name = $app_msg->method;
    my $method_proto = $session->last_message_api_level;
    $log->info("CALL: ".$session->service." $method_name ". (@args ? join(', ',@args) : ''));

    # Local, non-recursive lookup only: remote methods are never served here.
    my $coderef = $app->method_lookup( $method_name, $method_proto, 1, 1 );

    unless ($coderef) {
        $session->status( OpenSRF::DomainObject::oilsMethodException->new(
                    statusCode => STATUS_NOTFOUND(),
                    status => "Method [$method_name] not found for $app"));
        return 1;
    }

    unless ($session->continue_request) {
        $session->status(
            OpenSRF::DomainObject::oilsConnectStatus->new(
                    statusCode => STATUS_REDIRECTED(),
                    status => 'Disconnect on max requests' ) );
        $session->kill_me;
        return 1;
    }

    unless (ref $coderef) {
        # Reached only when the lookup returned a true non-reference value.
        # NOTE(review): the message text says "non-REQUEST" although this
        # is the REQUEST path; kept as-is for log compatibility.
        $log->info("Received non-REQUEST message in Application handler");

        my $res = OpenSRF::DomainObject::oilsMethodException->new(
                status => "Received non-REQUEST message in Application handler");
        $session->send('ERROR', $res);
        $session->kill_me;
        return 1;
    }

    my $appreq = OpenSRF::AppRequest->new( $session );
    $appreq->max_chunk_size( $coderef->max_chunk_size );
    $appreq->max_chunk_count( $coderef->max_chunk_count );

    $log->debug( "in_request = $in_request : [" . $appreq->threadTrace."]", INTERNAL );
    if( $in_request ) {
        $log->debug( "Pushing onto pending requests: " . $appreq->threadTrace, DEBUG );
        push @pending_requests, [ $appreq, \@args, $coderef ];
        return 1;
    }

    $in_request++;

    $log->debug( "Executing coderef for {$method_name}", INTERNAL );

    try {
        _check_strict_params($coderef, \@args);

        my $start = time();
        my $resp = $coderef->run( $appreq, @args);
        my $time = sprintf '%.3f', time() - $start;

        $log->debug( "Method duration for [$method_name]:  ". $time );
        $appreq->respond_complete( $resp );

    } catch Error with {
        my $e = shift;
        warn "Caught error from 'run' method: $e\n";

        if(UNIVERSAL::isa($e,"Error")) {
            $e = $e->stringify();
        }
        my $sess_id = $session->session_id;
        $session->status(
            OpenSRF::DomainObject::oilsMethodException->new(
                    statusCode      => STATUS_INTERNALSERVERERROR(),
                    status          => " *** Call to [$method_name] failed for session ".
                                       "[$sess_id], thread trace ".
                                       "[".$appreq->threadTrace."]:\n$e\n"
            )
        );
    };

    # XXX may need this later
    # $_->[1] = 1 for (@OpenSRF::AppSession::_CLIENT_CACHE);

    $in_request--;

    $log->debug( "Pending Requests: " . scalar(@pending_requests), INTERNAL );

    # Drain requests that were queued while the one above was running.
    while( my $queued = shift @pending_requests ) {
        my ($queued_req, $queued_args, $queued_method) = @$queued;

        $in_request++;
        try {
            # Validate against the QUEUED request's own arguments.
            _check_strict_params($queued_method, $queued_args);

            my $start = time();
            my $response = $queued_method->run( $queued_req, @$queued_args );
            my $time = sprintf '%.3f', time() - $start;
            $log->debug( "Method duration for [".$queued_method->api_name." -> ".join(', ',@$queued_args).']:  '.$time, DEBUG );

            $queued_req->respond_complete( $response );
            $log->debug( "Executed: " . $queued_req->threadTrace, INTERNAL );

        } catch Error with {
            my $e = shift;
            if(UNIVERSAL::isa($e,"Error")) {
                $e = $e->stringify();
            }
            $session->status(
                OpenSRF::DomainObject::oilsMethodException->new(
                        statusCode => STATUS_INTERNALSERVERERROR(),
                        status => "Call to [".$queued_method->api_name."] failed:  $e"
                )
            );
        };
        $in_request--;
    }

    return 1;
}
316
# Report whether $api_name is registered at $api_level.
#   $api_name  - full API name to look for
#   $api_level - API level to search; defaults to 1 only when not supplied,
#                so level 0 (which register_method accepts) can be queried
# Returns a true value if the method is present in the registry.
sub is_registered {
    my ($self, $api_name, $api_level) = @_;

    $api_level = 1 unless defined $api_level;

    return exists($_METHODS[$api_level]{$api_name});
}
323
324
# Tidy free-form documentation text:
#   - strip leading and trailing whitespace,
#   - collapse every whitespace run (newlines included) to one space,
#   - widen each "period + space" to "period + two spaces".
# Plain function; returns the cleaned copy.
sub normalize_whitespace {
    my ($text) = @_;

    for ($text) {
        s/^\s+//gso;
        s/\s+$//gso;
        s/\s+/ /gso;
        s/\n//gso;
        s/\. /\.  /gso;
    }

    return $text;
}
336
# Parse an "@"-tagged documentation string into the array form consumed by
# parse_array_signature().
#
# The string is split on '@'. A chunk starting with "return " supplies the
# return description, a chunk starting with "param NAME " adds a
# [ NAME, description ] pair, and every other chunk is appended to the
# overall description (re-joined with '@').
#
# Returns [ description, \@params, $return ], where $return is a one-element
# array ref or undef; returns [] for a false input string.
sub parse_string_signature {
    my ($string) = @_;
    return [] unless $string;

    my (@params, $ret);
    my $desc = '';

    for my $chunk (split(/\@/smo, $string)) {
        if ($chunk =~ /^return (.+)$/so) {
            $ret = [normalize_whitespace($1)];
        } elsif ($chunk =~ /^param (\w+) \b(.+)$/so) {
            push @params, [ $1, normalize_whitespace($2) ];
        } else {
            $desc .= '@' if $desc;
            $desc .= $chunk;
        }
    }

    return [normalize_whitespace($desc),\@params, $ret];
}
358
# Convert the positional signature form [ desc, \@params, \@return ] into the
# hash form stored on method objects.
#   params entries: [ name, desc, type, class ]
#   return entry:   [ desc, type, class ]
# Returns {} when all three top-level elements are false.
sub parse_array_signature {
    my ($array) = @_;
    my ($d, $p, $r) = @$array;

    return {} unless ($d or $p or $r);

    my @param_specs = map {
        +{
            name  => $_->[0],
            desc  => $_->[1],
            type  => $_->[2],
            class => $_->[3],
        }
    } @$p;

    my %return_spec = (
        desc  => $r->[0],
        type  => $r->[1],
        class => $r->[2],
    );

    return {
        desc     => $d,
        params   => \@param_specs,
        'return' => \%return_spec,
    };
}
382
# Register an API method in the package-wide method table.
#
# Named arguments (all optional except 'method'):
#   method       - name of the implementing sub (required; throws an
#                  oilsMethodException when missing)
#   api_name     - published name; defaults to "<server_class>.<method>"
#   api_level    - defaults to 1 (0 is allowed)
#   stream, remote, argc - default to 0
#   package      - implementing package; defaults to the invocant's class
#   signature    - hash (stored as-is), array, or "@"-tagged string; when
#                  absent, a plain-string 'notes' value is parsed instead
#   object_hint  - JSON class hint; defaults to the package name with '::'
#                  replaced by '_'
#
# The argument hash itself is blessed into the invocant's class and stored
# as the method object. For streaming methods an "<api_name>.atomic" wrapper
# around make_stream_atomic is registered as well, at the SAME api_level so
# that the wrapper can find the method it wraps.
sub register_method {
    my $self = shift;
    my $app = ref($self) || $self;
    my %args = @_;

    throw OpenSRF::DomainObject::oilsMethodException unless ($args{method});

    $args{api_level} = 1 unless(defined($args{api_level}));
    $args{stream} ||= 0;
    $args{remote} ||= 0;
    $args{argc} ||= 0;
    $args{package} ||= $app;
    $args{server_class} = server_class();
    $args{api_name} ||= $args{server_class} . '.' . $args{method};

    # Normalize whatever signature form was supplied into the hash form.
    if (!$args{signature}) {
        if ($args{notes} && !ref($args{notes})) {
            $args{signature} =
                parse_array_signature( parse_string_signature( $args{notes} ) );
        }
    } elsif( !ref($args{signature}) ) {
        $args{signature} =
            parse_array_signature( parse_string_signature( $args{signature} ) );
    } elsif( ref($args{signature}) eq 'ARRAY') {
        $args{signature} =
            parse_array_signature( $args{signature} );
    }

    unless ($args{object_hint}) {
        ($args{object_hint} = $args{package}) =~ s/::/_/go;
    }

    OpenSRF::Utils::JSON->register_class_hint( name => $args{package}, hint => $args{object_hint}, type => "hash" );

    $_METHODS[$args{api_level}]{$args{api_name}} = bless \%args => $app;

    # make_stream_atomic looks the wrapped method up at the wrapper's own
    # api_level, so the wrapper must live at the same level as the original.
    __PACKAGE__->register_method(
        stream => 0,
        argc => $args{argc},
        api_level => $args{api_level},
        api_name => $args{api_name}.'.atomic',
        method => 'make_stream_atomic',
        notes => "This is a system generated method.  Please see the definition for $args{api_name}",
    ) if ($args{stream});
}
429
# Ask the router which service classes exist and pull method definitions
# from each of them (except our own server class) into the local method
# table via populate_remote_method_cache().
#   $method - optional method name/pattern to restrict the query to
# Returns undef without querying when the router connection cannot be
# established or the router does not answer; rethrows an Error response.
sub retrieve_remote_apis {
    my $method = shift;
    my $session = OpenSRF::AppSession->create('router');

    try {
        $session->connect or OpenSRF::EX::WARN->throw("Connection to router timed out");
    } catch Error with {
        my $e = shift;
        $log->debug( "Remote subrequest returned an error:\n". $e );
    };

    # A 'return' inside try/catch/finally only leaves that closure, so the
    # bail-out on a failed connect has to happen out here.
    return undef unless ($session->state == $session->CONNECTED);

    my $req = $session->request( 'opensrf.router.info.class.list' );
    my $list = $req->recv;

    if( UNIVERSAL::isa($list,"Error") ) {
        throw $list;
    }

    # recv produced nothing (e.g. timeout): clean up rather than
    # dereferencing an undefined response.
    unless ($list) {
        $log->debug( "No response to router class list request", DEBUG );
        $req->finish;
        $session->finish;
        $session->disconnect;
        return undef;
    }

    my $content = $list->content;

    $req->finish;
    $session->finish;
    $session->disconnect;

    # De-duplicate the class list.
    my %u_list = map { ($_ => 1) } @$content;

    for my $class ( keys %u_list ) {
        next if($class eq $server_class);
        populate_remote_method_cache($class, $method);
    }
}
463
# Fetch method definitions from the remote service $class and store them in
# the local method table, flagged as remote.
#   $class - service name to query
#   $meth  - optional method name/pattern; when undefined every method of
#            the service is requested
# Entries that already exist locally (same api_level and api_name) are left
# untouched. Errors are logged at debug level and otherwise ignored.
sub populate_remote_method_cache {
    my $class = shift;
    my $meth = shift;

    my $session = OpenSRF::AppSession->create($class);
    try {
        $session->connect or OpenSRF::EX::WARN->throw("Connection to $class timed out");

        # Plain conditional instead of "my ... unless ...", whose behaviour
        # is documented as undefined.
        my $call = defined($meth)
            ? 'opensrf.system.method'
            : 'opensrf.system.method.all';

        my $req = $session->request( $call, $meth );

        while (my $method = $req->recv) {
            next if (UNIVERSAL::isa($method, 'Error'));

            $method = $method->content;
            next if ( exists($_METHODS[$$method{api_level}]) &&
                exists($_METHODS[$$method{api_level}]{$$method{api_name}}) );
            $method->{remote} = 1;
            bless($method, __PACKAGE__ );
            $_METHODS[$$method{api_level}]{$$method{api_name}} = $method;
        }

        $req->finish;
        $session->finish;
        $session->disconnect;

    } catch Error with {
        my $e = shift;
        $log->debug( "Remote subrequest returned an error:\n". $e );
        return undef;
    };
}
498
# Find the method object registered under $method.
#   $method     - API name to look up
#   $proto      - API level; defaults to the invocant's api_level (0 is valid)
#   $no_recurse - when true, do not query other services on a miss
#   $no_remote  - when true, refuse to return a method flagged as remote
#
# With thunking enabled the highest level at or below $proto that defines
# the method wins; otherwise only $proto itself is consulted. On a miss
# (and unless $no_recurse) the remote method cache is refreshed and the
# lookup is retried once.
#
# Returns the method object, or undef when nothing suitable is found.
sub method_lookup {
    my $self = shift;
    my $method = shift;
    my $proto = shift;
    my $no_recurse = shift || 0;
    my $no_remote = shift || 0;

    # this instead of " || 1;" above to allow api_level 0
    $proto = $self->api_level unless (defined $proto);

    my $class = ref($self) || $self;

    $log->debug("Lookup of [$method] by [$class] in api_level [$proto]", DEBUG);
    $log->debug("Available methods\n\t".join("\n\t", keys %{ $_METHODS[$proto] }), INTERNAL);

    my $meth;
    if (__PACKAGE__->thunk) {
        # Walk downwards from the requested level and stop at the first hit
        # so the closest (highest) level wins.
        # NOTE(review): level 0 is not part of this walk -- confirm whether
        # thunking down to level 0 is intended.
        for my $p ( reverse(1 .. $proto) ) {
            if (exists $_METHODS[$p]{$method}) {
                $meth = $_METHODS[$p]{$method};
                last;
            }
        }
    } else {
        if (exists $_METHODS[$proto]{$method}) {
            $meth = $_METHODS[$proto]{$method};
        }
    }

    if (defined $meth) {
        if($no_remote and $meth->{remote}) {
            $log->debug("OH CRAP We're not supposed to return remote methods", WARN);
            return undef;
        }

    } elsif (!$no_recurse) {
        $log->debug("We didn't find [$method], asking everyone else.", DEBUG);
        retrieve_remote_apis($method);
        # Retry once without recursion, keeping the caller's remote policy.
        $meth = $self->method_lookup($method,$proto,1,$no_remote);
    }

    return $meth;
}
541
# Execute this method object.
#
# If the first argument is an OpenSRF::AppRequest the call is a top-level
# request and the remaining arguments are the method parameters. Otherwise
# the call is a subrequest: an OpenSRF::AppSubrequest is created (inheriting
# this object's session, if any) and ALL arguments are method parameters.
#
# Local methods: the implementing sub is called as ($self, $req, @params).
# Errors are logged and re-raised. A top-level call returns the sub's
# return value; a subrequest returns the collected responses.
#
# Remote methods: the call is forwarded to the owning service and every
# response is relayed through $req->respond; the collected responses are
# returned. Remote errors are logged at debug level and not re-raised.
sub run {
    my $self = shift;
    my $req = shift;

    my $resp;
    my @params = @_;

    if ( !UNIVERSAL::isa($req, 'OpenSRF::AppRequest') ) {
        $log->debug("Creating a SubRequest object", DEBUG);
        unshift @params, $req;
        $req = OpenSRF::AppSubrequest->new;
        $req->session( $self->session ) if ($self->session);

    } else {
        $log->debug("This is a top level request", DEBUG);
    }

    if (!$self->{remote}) {
        my $code = \&{$self->{package} . '::' . $self->{method}};
        my $err = undef;

        try {
            $resp = $code->($self, $req, @params);

        } catch Error with {
            $err = shift;

            # $self is a blessed hash, so test the underlying type rather
            # than comparing ref() with 'HASH' (which never matches).
            if( UNIVERSAL::isa($self, 'HASH') ) {
                $log->error("Sub $$self{package}::$$self{method} DIED!!!\n\t$err\n", ERROR);
            }
        };

        if($err) {
            if(UNIVERSAL::isa($err,"Error")) {
                throw $err;
            } else {
                # Not an Error object, so it has no stringify method.
                die $err;
            }
        }

        $log->debug("Coderef for [$$self{package}::$$self{method}] has been run", DEBUG);

        if ( ref($req) and UNIVERSAL::isa($req, 'OpenSRF::AppSubrequest') ) {
            $req->respond($resp) if (defined $resp);
            $log->debug("SubRequest object is responding with : " . join(" ",$req->responses), DEBUG);
            return $req->responses;
        } else {
            $log->debug("A top level Request object is responding $resp", DEBUG) if (defined $resp);
            return $resp;
        }
    } else {
        my $session = OpenSRF::AppSession->create($self->{server_class});
        try {
            #$session->connect or OpenSRF::EX::WARN->throw("Connection to [$$self{server_class}] timed out");
            my $remote_req = $session->request( $self->{api_name}, @params );
            while (my $remote_resp = $remote_req->recv) {
                OpenSRF::Utils::Logger->debug("Remote Subrequest Received " . $remote_resp, INTERNAL );
                if( UNIVERSAL::isa($remote_resp,"Error") ) {
                    throw $remote_resp;
                }
                $req->respond( $remote_resp->content );
            }
            $remote_req->finish();

        } catch Error with {
            my $e = shift;
            # A 'return' here would only leave this handler closure;
            # execution continues below and whatever responses were
            # collected before the error are still returned.
            $log->debug( "Remote subrequest returned an error:\n". $e );
        };

        if ($session) {
            $session->disconnect();
            $session->finish();
        }

        $log->debug( "Remote Subrequest Responses " . join(" ", $req->responses), INTERNAL );

        return $req->responses;
    }
    # huh? how'd we get here...
    return undef;
}
625
# Stream the definitions of locally registered (non-remote) methods.
#
# Called as (method-pattern, limit, offset); when this object's api_name ends
# in "all" there is no pattern and the arguments are (limit, offset).
#   pattern - used as a regular expression against each api_name
#   limit   - maximum number of definitions to send (false = unlimited)
#   offset  - number of registry entries to skip; every entry counts,
#             including remote and non-matching ones
# API levels are visited from highest down to 1, names in sorted order.
# Results are delivered through $client->respond; the return value is undef.
sub introspect {
    my ($self, $client, $method, $limit, $offset) = @_;

    if ($self->api_name =~ /all$/o) {
        ($method, $limit, $offset) = (undef, $method, $limit);
    }

    my ($seen, $returned) = (0, 0);

    for my $api_level ( reverse(1 .. $#_METHODS) ) {
        for my $api_name ( sort keys %{$_METHODS[$api_level]} ) {
            # Position of this entry in the overall walk, counted before
            # any filtering.
            my $position = $seen++;

            next if ($offset && $offset > $position);
            next if ($_METHODS[$api_level]{$api_name}{remote});
            next if (defined($method) && $api_name !~ $method);
            next if ($limit && $returned >= $limit);

            $client->respond( $_METHODS[$api_level]{$api_name} );
            $returned++;
        }
    }

    return undef;
}
# Publish introspect() as a streaming method that lists every method of the
# application (no name filter).
__PACKAGE__->register_method(
    api_name  => 'opensrf.system.method.all',
    method    => 'introspect',
    stream    => 1,
    argc      => 0,
    signature => {
        desc     => 'This method is used to introspect an entire OpenSRF Application',
        'return' => {
            type => 'object',
            desc => 'A stream of objects describing the methods available via this OpenSRF Application',
        },
    },
);
# Publish introspect() as a streaming method that returns the definitions
# matching a given method name. ('argc' was listed twice with the same
# value; it is given once here.)
__PACKAGE__->register_method(
    stream => 1,
    method => 'introspect',
    api_name => 'opensrf.system.method',
    argc => 1,
    signature => {
        desc => q/Use this method to get the definition of a single OpenSRF Method/,
        params => [
            { desc => q/The method to introspect/,
              type => 'string' },
        ],
        return => { desc => q/An object describing the method requested, or an error if it can't be found/,
                    type => 'object' }
    },
);
694
# Test method: sends every argument back to the caller as its own streamed
# response, in order. Returns undef, so no extra final value is produced.
sub echo_method {
    my ($self, $client, @args) = @_;

    for my $arg (@args) {
        $client->respond( $arg );
    }

    return undef;
}
# Publish echo_method() as the streaming test method opensrf.system.echo.
__PACKAGE__->register_method(
    api_name  => 'opensrf.system.echo',
    method    => 'echo_method',
    stream    => 1,
    argc      => 1,
    signature => {
        desc     => q{A test method that will echo back it's arguments in a streaming response},
        params   => [
            { desc => 'One or more arguments to echo back' },
        ],
        'return' => { desc => 'A stream of the arguments passed' },
    },
);
716
# Returns the current time as whole epoch seconds. CORE::time is named
# explicitly because this file imports Time::HiRes's fractional time().
sub time_method {
    my ($self, $conn) = @_;

    my $epoch_seconds = CORE::time();
    return $epoch_seconds;
}
# Publish time_method() as opensrf.system.time.
__PACKAGE__->register_method(
    api_name  => 'opensrf.system.time',
    method    => 'time_method',
    argc      => 0,
    signature => {
        desc     => 'Returns the current system time as epoch seconds',
        'return' => { desc => 'epoch seconds' },
    },
);
730
# Implementation behind every generated "<api_name>.atomic" method: runs the
# underlying streaming method as a subrequest and returns all of its
# responses at once.
#   $req  - the request being served; its session is lent to the wrapped
#           method for the duration of the call
#   @args - parameters forwarded to the wrapped method
# Returns an array ref of the collected responses. Dies if the wrapped
# method cannot be found.
sub make_stream_atomic {
    my $self = shift;
    my $req = shift;
    my @args = @_;

    (my $m_name = $self->api_name) =~ s/\.atomic$//o;
    my $m = $self->method_lookup($m_name);

    die "Unable to find method [$m_name] for atomic call" unless ($m);

    # The method object is shared by all callers. 'local' restores its
    # previous session slot when this scope exits, even if run() dies.
    # (session('') cannot be used to clear it: that setter ignores false
    # values.)
    local $m->{session} = $req->session;
    my @results = $m->run(@args);

    return \@results;
}
745
746 1;