]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/perlmods/OpenSRF/Transport/SlimJabber/Client.pm
forcing int on timeout values in process and timed_read
[OpenSRF.git] / src / perlmods / OpenSRF / Transport / SlimJabber / Client.pm
1 package OpenSRF::Transport::SlimJabber::Client;
2 use strict; use warnings;
3 use OpenSRF::EX;
4 use base qw( OpenSRF );
5 use OpenSRF::Utils::Logger qw(:level);
6 use OpenSRF::Utils::Config;
7 use Time::HiRes qw(ualarm);
8 use OpenSRF::Utils::Config;
9
10 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
11 use IO::Socket::INET;
12 use IO::Socket::UNIX;
13
14 =head1 Description
15
16 OpenSRF::Transport::SlimJabber::Client
17
18 Home-brewed, slimmed-down Jabber connection agent. Supports SSL connections
19 via the following config file options:
20
21   transport->server->sslport # the ssl port
22   transport->server->ssl  # is this ssl?
23
24 =cut
25
# Name of the logging class; every log call in this package is made as a
# class-method call on it.
my $logger = 'OpenSRF::Utils::Logger';
27
# Destructor: closes the Jabber stream when the client object goes away.
#
# Perl may run DESTROY at almost any moment, so the global status variables
# are localized first; otherwise the I/O performed by disconnect() could
# overwrite a caller's $@, $! or $?.
sub DESTROY {
    my $self = shift;
    local ( $., $@, $!, $^E, $? );
    $self->disconnect;
    return;
}
32
# Ends the Jabber session: when the client holds a socket that still reports
# itself as connected, the closing stream tag is written to it and the handle
# is closed.  Otherwise nothing happens.
sub disconnect {
    my ($self) = @_;
    my $sock = $self->{_socket};

    if ( $sock && $sock->connected ) {
        print {$sock} "</stream:stream>";
        close $sock;
    }
}
41
42
=head2 new()

Creates a new Client object.  Nothing is connected yet; see initialize().

%params (all five are required -- if any of them is missing or false the
constructor returns undef):

        host
        port
        username
        resource
        password

No other keys (for example debug or log_file) are read by this constructor.

=cut

sub new {
    my $class  = shift;
    my %params = @_;

    $class = ref($class) || $class;

    # Every connection setting is mandatory; give up with undef as soon as
    # one of them is missing or false.
    for my $required (qw( port username resource password host )) {
        return undef unless $params{$required};
    }
    my ( $port, $username, $resource, $password, $host ) =
        @params{qw( port username resource password host )};

    my $self = bless {}, $class;

    # The full JID has the form user@host/resource.
    $self->jid( $username . '@' . $host . '/' . $resource );
    $self->host($host);
    $self->port($port);
    $self->username($username);
    $self->resource($resource);
    $self->password($password);

    # Holds data already read from the socket but not yet handed out.
    $self->{temp_buffer} = "";

    $logger->transport(
        "Creating Client instance: $host:$port, $username, $resource",
        $logger->INFO
    );

    return $self;
}
89
# Discards everything buffered for this connection: empties the in-memory
# temp buffer and then drains whatever is waiting on the socket.
#
# The socket is switched to non-blocking mode for the drain and put back into
# blocking mode afterwards.  When the client has no socket yet, only the temp
# buffer is cleared (the same "no socket" guard flush_socket() uses) instead
# of dying inside fcntl().
sub buffer_reset {
    my $self = shift;
    $self->{temp_buffer} = "";

    my $fh = $self->{_socket};
    return unless $fh;

    set_nonblock($fh);

    # Read and throw away until sysread reports nothing more (0 or undef).
    my $discarded = "";
    1 while sysread( $fh, $discarded, 4096 );

    set_block($fh);
}
102 # -------------------------------------------------
103
=head2 gather()

Convenience wrapper around process() with a timeout of 0: picks up whatever
is available right now and returns what process(0) returns.

=cut

sub gather {
    my ($self) = @_;
    return $self->process(0);
}
113
114 # -------------------------------------------------
115
=head2 listen()

Connects to the UNIX socket named by unix_sock(), then loops forever: each
message returned by process(-1) is written to that socket.  Does not return
unless an error occurs.

Throws an OpenSRF::EX::Socket if the UNIX socket cannot be opened, and an
OpenSRF::EX::Jabber if process() returns an undefined value.

=cut

sub listen {
    my $self = shift;

    my $sock   = $self->unix_sock();
    my $socket = IO::Socket::UNIX->new( Peer => $sock );

    # new() returns undef when the connection fails, so the handle itself is
    # tested before a method is called on it.  The check also runs before any
    # logging so that $! still describes the connect failure.
    unless ( $socket and $socket->connected ) {
        OpenSRF::EX::Socket->throw(
            "Unable to connect to UnixServer: socket-file: $sock \n :=> $! " );
    }
    $logger->transport( "Unix Socket opened by Listener", INTERNAL );

    # Endless relay loop; the only way out is an exception.
    while (1) {
        my $o = $self->process(-1);
        if ( !defined($o) ) {
            OpenSRF::EX::Jabber->throw("Listen Loop failed at 'process()'");
        }
        $logger->transport( "Call to process() in listener returned:\n $o", INTERNAL );
        print $socket $o;
    }
}
145
# Switches a filehandle to non-blocking mode.
#
# Takes the filehandle as its only argument.  Dies when the file status flags
# cannot be read or written.  Returns the flag value as it was *before*
# O_NONBLOCK was added.
sub set_nonblock {
    my ($fh) = @_;

    my $original = fcntl( $fh, F_GETFL, 0 );
    die "Can't get flags for the socket: $!\n" unless $original;

    $logger->transport( "Setting NONBLOCK: original flags: $original", INTERNAL );

    my $changed = fcntl( $fh, F_SETFL, $original | O_NONBLOCK );
    die "Can't set flags for the socket: $!\n" unless $changed;

    return $original;
}
158
# Restores previously saved file status flags on a filehandle.
#
# Arguments: the filehandle and the flag value to restore (typically what
# set_nonblock() returned).  When no flag value is given nothing is done --
# the guard now comes first, so an undefined value is no longer interpolated
# into the log line.
sub reset_fl {
    my ( $fh, $flags ) = @_;
    return unless defined $flags;

    $logger->transport( "Restoring BLOCK: to flags $flags", INTERNAL );
    fcntl( $fh, F_SETFL, $flags );
}
165
# Puts a filehandle back into blocking mode by clearing O_NONBLOCK from its
# file status flags.  Dies when the flags cannot be read or written; returns
# the result of the F_SETFL call.
sub set_block {
    my ($fh) = @_;

    my $current = fcntl( $fh, F_GETFL, 0 );
    die "Can't get flags for the socket: $!\n" unless $current;

    my $blocking = $current & ~O_NONBLOCK;

    my $ok = fcntl( $fh, F_SETFL, $blocking );
    die "Can't set flags for the socket: $!\n" unless $ok;

    return $ok;
}
177
178
# Reads the next complete top-level XML element for this connection.
#
# Parameters:
#   $timeout - truncated with int() when defined.
#              undef : the socket is left as it is and alarm(0) is used, so
#                      no timer is armed.
#              0     : the socket is switched to non-blocking mode before
#                      reading and put back into blocking mode afterwards.
#              N > 0 : alarm(N) is armed; its handler dies with "alarm\n",
#                      which ends the read loop.
#
# Data left over in $self->{temp_buffer} from an earlier call is examined
# first; a complete element found there is returned without touching the
# socket.  Whatever follows the returned element is stored back in
# $self->{temp_buffer}, and the element name used for matching is recorded in
# $self->{last_tag}.
#
# Returns the element text when a complete element was found, "" when no
# complete element was assembled, and undef when $@ is set after the read.
#
# Throws OpenSRF::EX::Socket when there is no connected socket.
#
# The tag name comes from data received over the network, so it is wrapped in
# \Q...\E wherever it is interpolated into a pattern: a name containing regex
# metacharacters is matched literally instead of being compiled as pattern
# syntax.
sub timed_read {
    my ( $self, $timeout ) = @_;
    $timeout = defined($timeout) ? int($timeout) : undef;

    $logger->transport( "Temp Buffer Contained: \n" . $self->{temp_buffer}, INTERNAL )
        if $self->{temp_buffer};
    if ( $self->can("app") ) {
        $logger->transport(
            "timed_read called for " . $self->app . ", I am: " . $self->jid, INTERNAL );
    }

    # See if there is a complete message in the temp_buffer
    # that we can return
    if ( $self->{temp_buffer} ) {
        my $buffer   = $self->{temp_buffer};
        my $complete = 0;
        $self->{temp_buffer} = '';

        # NOTE(review): this pattern allows "/" inside the tag name, while the
        # one used after a socket read below does not -- confirm whether the
        # difference is intended.
        my ($tag) = ( $buffer =~ /<([^\s\?\>]+)/ );
        $self->{last_tag} = $tag;

        # With no "<" in the buffer $tag is undef.  Treat it as the empty
        # string, which is what interpolating undef yields, without the
        # "uninitialized" warnings.
        my $tag_text = defined($tag) ? $tag : '';
        $logger->transport( "Using tag: $tag_text  ", INTERNAL );

        if ( $buffer =~ /^(.*?<\/\Q$tag_text\E>)(.*)/s ) {

            # <tag ...> ... </tag>: hand out everything up to and including
            # the closing tag, keep the remainder for the next call.
            $buffer = $1;
            $self->{temp_buffer} = $2;
            $complete++;
            $logger->transport( "completed read with $buffer", INTERNAL );
        }
        elsif ( $buffer =~ /^<\Q$tag_text\E[^>]*?\/>(.*)/ ) {

            # Self-closing <tag ... /> at the start of the buffer.
            # NOTE(review): $buffer is not trimmed here, so the text captured
            # into temp_buffer is also part of what gets returned -- confirm.
            $self->{temp_buffer} = $1;
            $complete++;
            $logger->transport( "completed read with $buffer", INTERNAL );
        }
        else {

            # Nothing complete yet: put the data back and go read more.
            $self->{temp_buffer} = $buffer;
        }

        if ( $buffer and $complete ) {
            return $buffer;
        }
    }
    ############

    my $fh = $self->{_socket};

    unless ( $fh and $fh->connected ) {
        OpenSRF::EX::Socket->throw( "Attempted read on closed socket", ERROR );
    }

    $logger->transport( "Temp Buffer After first attempt: \n " . $self->{temp_buffer}, INTERNAL )
        if $self->{temp_buffer};

    # A defined-but-zero timeout asks for a non-blocking read.  $flags doubles
    # as the marker that blocking mode has to be restored at the end.
    my $flags;
    if ( defined($timeout) && !$timeout ) {
        $flags = set_nonblock($fh);
    }

    $timeout ||= 0;
    $logger->transport( "Calling timed_read with timetout $timeout", INTERNAL );

    my $complete   = 0;
    my $first_read = 1;
    my $xml        = '';
    eval {
        my $tag = '';
        eval {
            no warnings;
            local $SIG{ALRM} = sub { die "alarm\n" };    # NB: \n required

            # alarm needs a number greater => 1.
            my $alarm_timeout = $timeout;
            if ( $alarm_timeout > 0 and $alarm_timeout < 1 ) {
                $alarm_timeout = 1;
            }
            alarm $alarm_timeout;
            do {

                # Start each pass with whatever was left in the temp buffer.
                my $buffer = $self->{temp_buffer};
                $self->{temp_buffer} = '';

                my $t_buf     = "";
                my $read_size = 1024;
                while ( my $n = sysread( $fh, $t_buf, $read_size ) ) {

                    unless ( $fh->connected ) {
                        OpenSRF::EX::JabberDisconnected->throw(
                            "Lost jabber client in timed_read()");
                    }

                    # XXX Change me to debug/internal at some point, this is for testing...
                    # XXX Found a race condition where reading >= $read_size bytes of data
                    # will fail if the log line below is removed.
                    $logger->info("timed_read() read $n bytes of data");

                    $buffer .= $t_buf;

                    # A short read means the socket is drained for now: go
                    # back to blocking mode and stop reading.
                    if ( $n < $read_size ) {
                        set_block($fh);
                        last;
                    }

                    # see if there is any more data to grab...
                    set_nonblock($fh);
                }

                $logger->transport( " Got [$buffer] from the socket", INTERNAL );

                # The element name is taken from the first chunk only and is
                # reused for every later pass of this loop.
                if ($first_read) {
                    $logger->transport( " First read Buffer\n [$buffer]", INTERNAL );
                    ($tag) = ( $buffer =~ /<([^\s\?\>\/]+)/ );
                    $self->{last_tag} = $tag;
                    $first_read--;
                    $logger->transport( "Using tag: $tag  ", INTERNAL );
                }

                if ( !$first_read && $buffer =~ /^(.*?<\/\Q$tag\E>)(.*)/s ) {
                    $buffer = $1;
                    $self->{temp_buffer} = $2;
                    $complete++;
                    $logger->transport( "completed read with $buffer", INTERNAL );
                }
                elsif ( !$first_read && $buffer =~ /^<\Q$tag\E[^>]*?\/>(.*)/ ) {
                    $self->{temp_buffer} = $1;
                    $complete++;
                    $logger->transport( "completed read with $buffer", INTERNAL );
                }

                $xml .= $buffer;

            } while ( !$complete && $xml );
            alarm(0);
        };
        alarm(0);
    };

    $logger->transport( "XML Read: $xml", INTERNAL );
    set_block($fh) if defined $flags;

    if ($complete) {
        return $xml;
    }

    # NOTE(review): the outer eval normally finishes without dying (the inner
    # eval has already caught any failure), and a successful eval resets $@.
    # This branch therefore looks unreachable, and timeouts and errors both
    # end up returning "" -- confirm before relying on undef here.
    if ($@) {
        return undef;
    }
    return "";
}
334
335
336 # -------------------------------------------------
337
# Reports whether the client currently holds a connected socket.
# Returns 1 when a socket exists and its connected() method returns a true
# value, 0 otherwise.
sub tcp_connected {
    my ($self) = @_;
    my $sock = $self->{_socket};
    return ( $sock && $sock->connected ) ? 1 : 0;
}
344
# Accessor for the account password.  A true argument replaces the stored
# value; with no argument (or a false one) the stored value is left alone.
# Always returns the value currently stored under 'oils:password'.
sub password {
    my $self  = shift;
    my $value = shift;
    if ($value) {
        $self->{'oils:password'} = $value;
    }
    return $self->{'oils:password'};
}
350
351 # -------------------------------------------------
352
# Accessor for the account user name.  A true argument replaces the stored
# value; with no argument (or a false one) the stored value is left alone.
# Always returns the value currently stored under 'oils:username'.
sub username {
    my $self  = shift;
    my $value = shift;
    if ($value) {
        $self->{'oils:username'} = $value;
    }
    return $self->{'oils:username'};
}
358         
359 # -------------------------------------------------
360
# Accessor for the Jabber resource name.  A true argument replaces the stored
# value; with no argument (or a false one) the stored value is left alone.
# Always returns the value currently stored under 'oils:resource'.
sub resource {
    my $self  = shift;
    my $value = shift;
    if ($value) {
        $self->{'oils:resource'} = $value;
    }
    return $self->{'oils:resource'};
}
366
367 # -------------------------------------------------
368
# Accessor for the full JID.  A true argument replaces the stored value; with
# no argument (or a false one) the stored value is left alone.
# Always returns the value currently stored under 'oils:jid'.
sub jid {
    my $self  = shift;
    my $value = shift;
    if ($value) {
        $self->{'oils:jid'} = $value;
    }
    return $self->{'oils:jid'};
}
374
# Accessor for the server port (initialize() treats a non-numeric value as a
# UNIX socket path).  A true argument replaces the stored value; with no
# argument (or a false one) the stored value is left alone.
# Always returns the value currently stored under 'oils:port'.
sub port {
    my $self  = shift;
    my $value = shift;
    if ($value) {
        $self->{'oils:port'} = $value;
    }
    return $self->{'oils:port'};
}
380
# Accessor for the server host name.  A true argument replaces the stored
# value; with no argument (or a false one) the stored value is left alone.
# Always returns the value currently stored under 'oils:host'.
sub host {
    my $self  = shift;
    my $value = shift;
    if ($value) {
        $self->{'oils:host'} = $value;
    }
    return $self->{'oils:host'};
}
386
387 # -------------------------------------------------
388
=head2 send()

Sends a Jabber message.

%params:

        to              - The JID of the recipient (required)
        body            - The body of the message (required)
        thread          - The Jabber thread (optional)
        router_command  - Passed to the message's set_router_command()
        router_class    - Passed to the message's set_router_class()

Returns undef without sending anything when 'to' or 'body' is missing or
false.

Throws an OpenSRF::EX::Jabber when the client has no connected socket or
when writing the message to the socket fails.

=cut

sub send {
    my ( $self, %params ) = @_;

    my $to             = $params{'to'}             || return undef;
    my $body           = $params{'body'}           || return undef;
    my $thread         = $params{'thread'}         || "";
    my $router_command = $params{'router_command'} || "";
    my $router_class   = $params{'router_class'}   || "";

    my $msg = OpenSRF::Transport::SlimJabber::MessageWrapper->new;

    $msg->setTo($to);
    $msg->setThread($thread) if $thread;
    $msg->setBody($body);
    $msg->set_router_command($router_command);
    $msg->set_router_class($router_class);
    $msg->set_osrf_xid( $logger->get_osrf_xid );

    $logger->transport(
        "JabberClient Sending message to $to with thread $thread and body: \n$body", INTERNAL );

    my $soc = $self->{_socket};
    unless ( $soc and $soc->connected ) {
        OpenSRF::EX::Jabber->throw("No longer connected to jabber server");
    }

    # A failed write must not be reported as "Sent" below, so the result of
    # print is checked instead of being ignored.
    print {$soc} $msg->toString
        or OpenSRF::EX::Jabber->throw("Failed to write message to jabber server: $!");

    $logger->transport(
        "JabberClient Sent message to $to with thread $thread and body: \n$body", INTERNAL );
}
431
432
=head2 initialize()

Connect to the server and log in.

Opens the socket (up to 5 attempts), sends the stream header followed by the
jabber:iq:auth login request, and waits up to 10 seconds for a reply whose
type is 'result'.  Returns the client object on success.

Throws an OpenSRF::EX::Jabber if we cannot connect to the server or if the
authentication fails.

=cut

sub initialize {

    my $self = shift;

    my $jid      = $self->jid;
    my $host     = $self->host;
    my $port     = $self->port;
    my $username = $self->username;
    my $resource = $self->resource;
    my $password = $self->password;

    # Values placed inside the XML sent to the server are escaped, so that a
    # password (or any other setting) containing &, <, >, " or ' cannot
    # produce malformed XML.
    my $xml_escape = sub {
        my $text = shift;
        $text =~ s/&/&amp;/g;
        $text =~ s/</&lt;/g;
        $text =~ s/>/&gt;/g;
        $text =~ s/"/&quot;/g;
        $text =~ s/'/&apos;/g;
        return $text;
    };
    my $x_host     = $xml_escape->($host);
    my $x_username = $xml_escape->($username);
    my $x_password = $xml_escape->($password);
    my $x_resource = $xml_escape->($resource);

    my $stream = <<"XML";
<stream:stream to='$x_host' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>
XML

    # The process id is appended to the resource, except for the user
    # "router" when the bootstrap config has no router_name.
    my $conf = OpenSRF::Utils::Config->current;
    my $tail = "_$$";
    if ( !$conf->bootstrap->router_name && $username eq "router" ) {
        $tail = "";
    }

    my $auth = <<"XML";
<iq id='123' type='set'>
<query xmlns='jabber:iq:auth'>
<username>$x_username</username>
<password>$x_password</password>
<resource>${x_resource}$tail</resource>
</query>
</iq>
XML

    my $sock_type = 'IO::Socket::INET';

    # if port is a string, then we're connecting to a UNIX socket
    unless ( $port =~ /^\d+$/ ) {
        $sock_type = 'IO::Socket::UNIX';
    }

    # --- 5 tries to connect to the jabber server
    my $max_tries = 5;
    my $socket;
    for my $try ( 1 .. $max_tries ) {
        $socket = $sock_type->new(
            PeerHost => $host,
            PeerPort => $port,
            Peer     => $port,
            Proto    => 'tcp'
        );
        $logger->debug("$jid: $try connect attempt to $host:$port");
        last if ( $socket and $socket->connected );
        $logger->warn("$jid: Failed to connect to server...$host:$port (Try # $try)");

        # Pause between attempts only; there is nothing to wait for after
        # the last one.
        sleep 3 if $try < $max_tries;
    }

    unless ( $socket and $socket->connected ) {
        OpenSRF::EX::Jabber->throw(" Could not connect to Jabber server: $!");
    }

    $logger->transport( "Logging into jabber as $jid " . "from " . ref($self), DEBUG );

    print $socket $stream;

    # Read the server's answer to the stream header, giving up after three
    # seconds.  The data is only logged.
    my $buffer;
    eval {
        eval {
            local $SIG{ALRM} = sub { die "alarm\n" };    # NB: \n required
            alarm 3;
            sysread( $socket, $buffer, 4096 );
            $logger->transport( "Login buffer 1: $buffer", INTERNAL );
            alarm(0);
        };
        alarm(0);
    };

    print $socket $auth;

    if ( $socket and $socket->connected() ) {
        $self->{_socket} = $socket;
    }
    else {
        OpenSRF::EX::Jabber->throw( " ** Unable to connect to Jabber server", ERROR );
    }

    # Wait (up to 10 seconds) for the reply to the login request.
    $buffer = $self->timed_read(10);

    if ($buffer) { $logger->transport( "Login buffer 2: $buffer", INTERNAL ); }

    if ( $buffer and $buffer =~ /type=["\']result["\']/ ) {
        $logger->transport( " * $jid: Jabber authenticated and connected", DEBUG );
    }
    else {
        if ( !$buffer ) { $buffer = " "; }
        $socket->close;
        OpenSRF::EX::Jabber->throw( " * $jid: Unable to authenticate: $buffer", ERROR );
    }

    return $self;
}
540
# Class method: builds a client for $app via new(), logs it in with
# initialize(), and hands the result to the class's peer_handle() method.
# Returns whatever peer_handle() returns.
# NOTE(review): new() in this file expects key/value pairs, so passing a lone
# $app presumably relies on a subclass overriding new() -- confirm.
sub construct {
    my $class = shift;
    my $app   = shift;

    $logger->transport(
        "Constructing new Jabber connection for $app, my class $class", INTERNAL );

    my @handle = $class->new($app)->initialize();
    return $class->peer_handle(@handle);
}
547
# Reads one message from the connection and returns it.
#
# Parameters:
#   $timeout - seconds to wait, truncated with int().  A missing or false
#              value becomes 0; a negative value is turned into undef before
#              it is passed to timed_read() and is logged as "FOREVER".
#
# Returns the first element in the data read whose name equals
# $self->{last_tag}; any further such elements are put back at the front of
# $self->{temp_buffer}.  Returns the (false) value from timed_read()
# unchanged when nothing was read.  The result is undef when data was read
# but no element with that name could be found in it.
#
# Throws OpenSRF::EX::JabberDisconnected when there is no connected socket
# (including when no socket was ever opened), and OpenSRF::EX::Jabber when
# timed_read() returns undef.
sub process {

    my ( $self, $timeout ) = @_;

    $timeout ||= 0;
    $timeout = int($timeout);
    undef $timeout if ( $timeout < 0 );

    # Test the slot itself before calling connected(), so a client without a
    # socket gets the documented exception instead of a method-call error.
    unless ( $self->{_socket} and $self->{_socket}->connected ) {
        OpenSRF::EX::JabberDisconnected->throw(
            "This JabberClient instance is no longer connected to the server "
              . $self->username . " : "
              . $self->resource,
            ERROR
        );
    }

    my $val = $self->timed_read($timeout);

    $timeout = "FOREVER" unless ( defined $timeout );

    if ( !defined($val) ) {
        OpenSRF::EX::Jabber->throw(
            "Call to Client->timed_read( $timeout ) failed", ERROR );
    }
    elsif ( !$val ) {
        $logger->transport(
            "Call to Client->timed_read( $timeout ) returned 0 bytes of data", INTERNAL );
    }
    else {
        $logger->transport(
            "Call to Client->timed_read( $timeout ) successfully returned data", INTERNAL );
    }

    my $t = $self->{last_tag};

    if ( $t and $val ) {

        # Split the data into individual <tag ...>...</tag> elements.  The
        # tag name is quoted so it is matched literally, and /s lets an
        # element that contains newlines match, as it does in timed_read().
        my @msgs = $val =~ /(<\Q$t\E[^>]*>.*?<\/\Q$t\E>)/gs;
        $val = shift(@msgs);

        # Extra elements go back in front of whatever is already buffered so
        # that the original order is kept for later reads.
        if (@msgs) {
            my $tmp = $self->{temp_buffer};
            $self->{temp_buffer} = join( '', @msgs ) . $tmp;
        }
    }

    return $val;
}
594
595
# --------------------------------------------------------------
# Sets the socket to O_NONBLOCK, reads (and discards) all of the
# data waiting on the socket, then restores the socket's flags.
# Returns 1 when the socket is still connected afterwards, 0 when
# there is no socket or it is no longer connected.
# --------------------------------------------------------------
sub flush_socket {

    my $self   = shift;
    my $socket = $self->{_socket};

    return 0 unless $socket;

    my $flags = fcntl( $socket, F_GETFL, 0 );

    # Drain only when non-blocking mode was really switched on: reading from
    # a socket that is still in blocking mode would hang as soon as it is
    # empty.
    if ( $flags and fcntl( $socket, F_SETFL, $flags | O_NONBLOCK ) ) {
        my $buf;
        while ( my $n = sysread( $socket, $buf, 8192 ) ) {
            $logger->debug("flush_socket dropped $n bytes of data");
            if ( !$socket->connected() ) {
                $logger->error("flush_socket dropped data on disconnected socket: $buf");
            }
        }

        # Put the flags back exactly as they were found.
        fcntl( $socket, F_SETFL, $flags );
    }
    else {
        $logger->warn("flush_socket could not switch the socket to non-blocking mode: $!");
    }

    return 0 unless $socket->connected();

    return 1;
}
629
630
631
632 1;