]> git.evergreen-ils.org Git - Evergreen.git/blob - OpenSRF/src/perlmods/OpenSRF/Transport/SlimJabber/Client.pm
added error message for cached data on a closed socket
[Evergreen.git] / OpenSRF / src / perlmods / OpenSRF / Transport / SlimJabber / Client.pm
1 package OpenSRF::Transport::SlimJabber::Client;
2 use strict; use warnings;
3 use OpenSRF::EX;
4 use base qw( OpenSRF );
5 use OpenSRF::Utils::Logger qw(:level);
6 use OpenSRF::Utils::Config;
7 use Time::HiRes qw(ualarm);
8 use OpenSRF::Utils::Config;
9
10 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
11 use IO::Socket::INET;
12 use IO::Socket::UNIX;
13
14 =head1 Description
15
16 OpenSRF::Transport::SlimJabber::Client
17
18 Home-brewed slimmed down jabber connection agent. Supports SSL connections
19 with a config file options:
20
21   transport->server->sslport # the ssl port
22   transport->server->ssl  # is this ssl?
23
24 =cut
25
26 my $logger = "OpenSRF::Utils::Logger";
27
28 sub DESTROY{
29         my $self = shift;
30         $self->disconnect;
31 }
32
33 sub disconnect{
34         my $self = shift;
35         my $socket = $self->{_socket};
36         if( $socket and $socket->connected() ) {
37                 print $socket "</stream:stream>";
38                 close( $socket );
39         }
40 }
41
42
43 =head2 new()
44
45 Creates a new Client object.
46
47 debug and log_file are not required if you don't care to log the activity, 
48 however all other parameters are.
49
50 %params:
51
52         username
53         resource        
54         password
55         debug    
56         log_file
57
58 =cut
59
60 sub new {
61
62         my( $class, %params ) = @_;
63
64         $class = ref( $class ) || $class;
65
66         my $port                        = $params{'port'}                       || return undef;
67         my $username    = $params{'username'}   || return undef;
68         my $resource    = $params{'resource'}   || return undef;
69         my $password    = $params{'password'}   || return undef;
70         my $host                        = $params{'host'}                       || return undef;
71
72         my $jid = "$username\@$host\/$resource";
73
74         my $self = bless {} => $class;
75
76         $self->jid( $jid );
77         $self->host( $host );
78         $self->port( $port );
79         $self->username( $username );
80         $self->resource( $resource );
81         $self->password( $password );
82         $self->{temp_buffer} = "";
83
84         $logger->transport( "Creating Client instance: $host:$port, $username, $resource",
85                         $logger->INFO );
86
87         return $self;
88 }
89
90 # clears the tmp buffer as well as the TCP buffer
91 sub buffer_reset { 
92
93         my $self = shift;
94         $self->{temp_buffer} = ""; 
95
96         my $fh = $self->{_socket};
97         set_nonblock( $fh );
98         my $t_buf = "";
99         while( sysread( $fh, $t_buf, 4096 ) ) {} 
100         set_block( $fh );
101 }
102 # -------------------------------------------------
103
104 =head2 gather()
105
106 Gathers all Jabber messages sitting in the collection queue 
107 and hands them each to their respective callbacks.  This call
108 does not block (calls Process(0))
109
110 =cut
111
112 sub gather { my $self = shift; $self->process( 0 ); }
113
114 # -------------------------------------------------
115
116 =head2 listen()
117
118 Blocks and gathers incoming messages as they arrive.  Does not return
119 unless an error occurs.
120
121 Throws an OpenSRF::EX::JabberException if the call to Process ever fails.
122
123 =cut
124 sub listen {
125         my $self = shift;
126
127         my $sock = $self->unix_sock();
128         my $socket = IO::Socket::UNIX->new( Peer => $sock  );
129         $logger->transport( "Unix Socket opened by Listener", INTERNAL );
130         
131         throw OpenSRF::EX::Socket( "Unable to connect to UnixServer: socket-file: $sock \n :=> $! " )
132                 unless ($socket->connected);
133                 
134         while(1) {
135                 my $o = $self->process( -1 );
136                 $logger->transport( "Call to process() in listener returned:\n $o", INTERNAL );
137                 if( ! defined( $o ) ) {
138                         throw OpenSRF::EX::Jabber( "Listen Loop failed at 'process()'" );
139                 }
140                 print $socket $o;
141
142         }
143         throw OpenSRF::EX::Socket( "How did we get here?!?!" );
144 }
145
146 sub set_nonblock {
147         my $fh = shift;
148         my      $flags = fcntl($fh, F_GETFL, 0)
149                 or die "Can't get flags for the socket: $!\n";
150
151         $logger->transport( "Setting NONBLOCK: original flags: $flags", INTERNAL );
152
153         fcntl($fh, F_SETFL, $flags | O_NONBLOCK)
154                 or die "Can't set flags for the socket: $!\n";
155
156         return $flags;
157 }
158
159 sub reset_fl {
160         my $fh = shift;
161         my $flags = shift;
162         $logger->transport( "Restoring BLOCK: to flags $flags", INTERNAL );
163         fcntl($fh, F_SETFL, $flags) if defined $flags;
164 }
165
166 sub set_block {
167         my $fh = shift;
168
169         my      $flags = fcntl($fh, F_GETFL, 0)
170                 or die "Can't get flags for the socket: $!\n";
171
172         $flags &= ~O_NONBLOCK;
173
174         fcntl($fh, F_SETFL, $flags)
175                 or die "Can't set flags for the socket: $!\n";
176 }
177
178
179 sub timed_read {
180         my ($self, $timeout) = @_;
181
182         $logger->transport( "Temp Buffer Contained: \n". $self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};
183         if( $self->can( "app" ) ) {
184                 $logger->transport( "timed_read called for ".$self->app.", I am: ".$self->jid, INTERNAL );
185         }
186
187         # See if there is a complete message in the temp_buffer
188         # that we can return
189         if( $self->{temp_buffer} ) {
190                 my $buffer = $self->{temp_buffer};
191                 my $complete = 0;
192                 $self->{temp_buffer} = '';
193
194                 my ($tag) = ($buffer =~ /<([^\s\?\>]+)/o);
195                 $logger->transport("Using tag: $tag  ", INTERNAL);
196
197                 if ( $buffer =~ /^(.*?<\/$tag>)(.*)/s) {
198                         $buffer = $1;
199                         $self->{temp_buffer} = $2;
200                         $complete++;
201                         $logger->transport( "completed read with $buffer", INTERNAL );
202                 } elsif ( $buffer =~ /^<$tag[^>]*?\/>(.*)/) {
203                         $self->{temp_buffer} = $1;
204                         $complete++;
205                         $logger->transport( "completed read with $buffer", INTERNAL );
206                 } else {
207                         $self->{temp_buffer} = $buffer;
208                 }
209                                 
210                 if( $buffer and $complete ) {
211                         return $buffer;
212                 }
213
214         }
215         ############
216
217         my $fh = $self->{_socket};
218
219         unless( $fh and $fh->connected ) {
220                 throw OpenSRF::EX::Socket ("Attempted read on closed socket", ERROR );
221         }
222
223         $logger->transport( "Temp Buffer After first attempt: \n ".$self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};
224
225         my $flags;
226         if (defined($timeout) && !$timeout) {
227                 $flags = set_nonblock( $fh );
228         }
229
230         $timeout ||= 0;
231         $logger->transport( "Calling timed_read with timetout $timeout", INTERNAL );
232
233
234         my $complete = 0;
235         my $first_read = 1;
236         my $xml = '';
237         eval {
238                 my $tag = '';
239                 eval {
240                         no warnings;
241                         local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
242
243                         # alarm needs a number greater => 1.
244                         my $alarm_timeout = $timeout;
245                         if( $alarm_timeout > 0 and $alarm_timeout < 1 ) {
246                                 $alarm_timeout = 1;
247                         }
248                         alarm $alarm_timeout;
249                         do {    
250
251                                 my $buffer = $self->{temp_buffer};
252                                 $self->{temp_buffer} = '';
253                                 #####
254
255                                 my $ff =  fcntl($fh, F_GETFL, 0);
256                                 if ($ff == ($ff | O_NONBLOCK) and $timeout > 0 ) {
257                                         #throw OpenSRF::EX::ERROR ("File flags are set to NONBLOCK but timeout is $timeout", ERROR );
258                                 }
259
260                                 my $t_buf = "";
261                                 my $read_size = 1024; my $f = 0;
262                                 while( my $n = sysread( $fh, $t_buf, $read_size ) ) {
263
264                                         unless( $fh->connected ) {
265                                                 OpenSRF::EX::JabberDisconnected->throw(
266                                                         "Lost jabber client in timed_read()");
267                                         }
268
269                                         $buffer .= $t_buf;
270                                         if( $n < $read_size ) {
271                                                 #reset_fl( $fh, $f ) if $f;
272                                                 set_block( $fh );
273                                                 last;
274                                         }
275                                         # see if there is any more data to grab...
276                                         $f = set_nonblock( $fh );
277                                 }
278
279                                 #sysread($fh, $buffer, 2048, length($buffer) );
280                                 #sysread( $fh, $t_buf, 2048 );
281                                 #$buffer .= $t_buf;
282
283                                 #####
284                                 $logger->transport(" Got [$buffer] from the socket", INTERNAL);
285
286                                 if ($first_read) {
287                                         $logger->transport(" First read Buffer\n [$buffer]", INTERNAL);
288                                         ($tag) = ($buffer =~ /<([^\s\?\>]+){1}/o);
289                                         $first_read--;
290                                         $logger->transport("Using tag: $tag  ", INTERNAL);
291                                 }
292
293                                 if (!$first_read && $buffer =~ /^(.*?<\/$tag>){1}(.*)/s) {
294                                         $buffer = $1;
295                                         $self->{temp_buffer} = $2;
296                                         $complete++;
297                                         $logger->transport( "completed read with $buffer", INTERNAL );
298                                 } elsif (!$first_read && $buffer =~ /^<$tag[^>]*?\/>(.*)/) {
299                                         $self->{temp_buffer} = $1;
300                                         $complete++;
301                                         $logger->transport( "completed read with $buffer", INTERNAL );
302                                 }
303                                 
304                                 $xml .= $buffer;
305
306                         } while (!$complete && $xml);
307                         alarm(0);
308                 };
309                 alarm(0);
310         };
311
312         $logger->transport( "XML Read: $xml", INTERNAL );
313         #reset_fl( $fh, $flags) if defined $flags;
314         set_block( $fh ) if defined $flags;
315
316         if ($complete) {
317                 return $xml;
318         }
319         if( $@ ) {
320                 return undef;
321         }
322         return "";
323 }
324
325
326 # -------------------------------------------------
327
328 sub tcp_connected {
329
330         my $self = shift;
331         return 1 if ($self->{_socket} and $self->{_socket}->connected);
332         return 0;
333 }
334
335 sub password {
336         my( $self, $password ) = @_;
337         $self->{'oils:password'} = $password if $password;
338         return $self->{'oils:password'};
339 }
340
341 # -------------------------------------------------
342
343 sub username {
344         my( $self, $username ) = @_;
345         $self->{'oils:username'} = $username if $username;
346         return $self->{'oils:username'};
347 }
348         
349 # -------------------------------------------------
350
351 sub resource {
352         my( $self, $resource ) = @_;
353         $self->{'oils:resource'} = $resource if $resource;
354         return $self->{'oils:resource'};
355 }
356
357 # -------------------------------------------------
358
359 sub jid {
360         my( $self, $jid ) = @_;
361         $self->{'oils:jid'} = $jid if $jid;
362         return $self->{'oils:jid'};
363 }
364
365 sub port {
366         my( $self, $port ) = @_;
367         $self->{'oils:port'} = $port if $port;
368         return $self->{'oils:port'};
369 }
370
371 sub host {
372         my( $self, $host ) = @_;
373         $self->{'oils:host'} = $host if $host;
374         return $self->{'oils:host'};
375 }
376
377 # -------------------------------------------------
378
379 =head2 send()
380
381         Sends a Jabber message.
382         
383         %params:
384                 to                      - The JID of the recipient
385                 thread  - The Jabber thread
386                 body            - The body of the message
387
388 =cut
389
390 sub send {
391         my $self = shift;
392         my %params = @_;
393
394         my $to = $params{'to'} || return undef;
395         my $body = $params{'body'} || return undef;
396         my $thread = $params{'thread'} || "";
397         my $router_command = $params{'router_command'} || "";
398         my $router_class = $params{'router_class'} || "";
399
400         my $msg = OpenSRF::Transport::SlimJabber::MessageWrapper->new;
401
402         $msg->setTo( $to );
403         $msg->setThread( $thread ) if $thread;
404         $msg->setBody( $body );
405         $msg->set_router_command( $router_command );
406         $msg->set_router_class( $router_class );
407
408
409         $logger->transport( 
410                         "JabberClient Sending message to $to with thread $thread and body: \n$body", INTERNAL );
411
412         my $soc = $self->{_socket};
413         unless( $soc and $soc->connected ) {
414                 throw OpenSRF::EX::Jabber ("No longer connected to jabber server");
415         }
416         print $soc $msg->toString;
417
418         $logger->transport( 
419                         "JabberClient Sent message to $to with thread $thread and body: \n$body", INTERNAL );
420 }
421
422
423 =head2 inintialize()
424
425 Connect to the server and log in.  
426
427 Throws an OpenSRF::EX::JabberException if we cannot connect
428 to the server or if the authentication fails.
429
430 =cut
431
432 # --- The logging lines have been commented out until we decide 
433 # on which log files we're using.
434
435 sub initialize {
436
437         my $self = shift;
438
439         my $jid         = $self->jid; 
440         my $host        = $self->host; 
441         my $port        = $self->port; 
442         my $username    = $self->username;
443         my $resource    = $self->resource;
444         my $password    = $self->password;
445
446         my $stream = <<"        XML";
447 <stream:stream to='$host' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>
448         XML
449
450         my $conf = OpenSRF::Utils::Config->current;
451         my $tail = "_$$";
452         if(!$conf->bootstrap->router_name && $username eq "router") {
453                 $tail = "";
454         }
455
456         my $auth = <<"  XML";
457 <iq id='123' type='set'>
458 <query xmlns='jabber:iq:auth'>
459 <username>$username</username>
460 <password>$password</password>
461 <resource>${resource}$tail</resource>
462 </query>
463 </iq>
464         XML
465
466         my $sock_type = 'IO::Socket::INET';
467         
468         # if port is a string, then we're connecting to a UNIX socket
469         unless( $port =~ /^\d+$/ ) {
470                 $sock_type = 'IO::Socket::UNIX';
471         }
472
473         # --- 5 tries to connect to the jabber server
474         my $socket;
475         for(1..5) {
476                 $socket = $sock_type->new( PeerHost => $host,
477                                            PeerPort => $port,
478                                            Peer => $port,
479                                            Proto    => 'tcp' );
480                 $logger->debug( "$jid: $_ connect attempt to $host:$port");
481                 last if ( $socket and $socket->connected );
482                 $logger->warn( "$jid: Failed to connect to server...$host:$port (Try # $_)");
483                 sleep 3;
484         }
485
486         unless ( $socket and $socket->connected ) {
487                 throw OpenSRF::EX::Jabber( " Could not connect to Jabber server: $!" );
488         }
489
490         $logger->transport( "Logging into jabber as $jid " .
491                         "from " . ref( $self ), DEBUG );
492
493         print $socket $stream;
494
495         my $buffer;
496         eval {
497                 eval {
498                         local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
499                         alarm 3;
500                         sysread($socket, $buffer, 4096);
501                         $logger->transport( "Login buffer 1: $buffer", INTERNAL );
502                         alarm(0);
503                 };
504                 alarm(0);
505         };
506
507         print $socket $auth;
508
509         if( $socket and $socket->connected() ) {
510                 $self->{_socket} = $socket;
511         } else {
512                 throw OpenSRF::EX::Jabber( " ** Unable to connect to Jabber server", ERROR );
513         }
514
515
516         $buffer = $self->timed_read(10);
517
518         if( $buffer ) {$logger->transport( "Login buffer 2: $buffer", INTERNAL );}
519
520         if( $buffer and $buffer =~ /type=["\']result["\']/ ) { 
521                 $logger->transport( " * $jid: Jabber authenticated and connected", DEBUG );
522         } else {
523                 if( !$buffer ) { $buffer = " "; }
524                 $socket->close;
525                 throw OpenSRF::EX::Jabber( " * $jid: Unable to authenticate: $buffer", ERROR );
526         }
527
528         return $self;
529 }
530
531 sub construct {
532         my( $class, $app ) = @_;
533         $logger->transport("Constructing new Jabber connection for $app, my class $class", INTERNAL );
534         $class->peer_handle( 
535                         $class->new( $app )->initialize() );
536 }
537
538 sub process {
539
540         my( $self, $timeout ) = @_;
541
542         $timeout ||= 0;
543         undef $timeout if ( $timeout == -1 );
544
545         unless( $self->{_socket}->connected ) {
546                 OpenSRF::EX::JabberDisconnected->throw( 
547                   "This JabberClient instance is no longer connected to the server " . 
548                   $self->username . " : " . $self->resource, ERROR );
549         }
550
551         my $val = $self->timed_read( $timeout );
552
553         $timeout = "FOREVER" unless ( defined $timeout );
554         
555         if ( ! defined( $val ) ) {
556                 OpenSRF::EX::Jabber->throw( 
557                   "Call to Client->timed_read( $timeout ) failed", ERROR );
558         } elsif ( ! $val ) {
559                 $logger->transport( 
560                         "Call to Client->timed_read( $timeout ) returned 0 bytes of data", INTERNAL );
561         } elsif ( $val ) {
562                 $logger->transport( 
563                         "Call to Client->timed_read( $timeout ) successfully returned data", INTERNAL );
564         }
565
566         return $val;
567
568 }
569
570
571 # --------------------------------------------------------------
572 # Sets the socket to O_NONBLOCK, reads all of the data off of
573 # the socket, the restores the sockets flags
574 # Returns 1 on success, 0 if the socket isn't connected
575 # --------------------------------------------------------------
576 sub flush_socket {
577
578         my $self = shift;
579         my $socket = $self->{_socket};
580
581         if( $socket ) {
582
583                 my $buf;
584                 my      $flags = fcntl($socket, F_GETFL, 0);
585
586                 fcntl($socket, F_SETFL, $flags | O_NONBLOCK);
587                 while( my $n = sysread( $socket, $buf, 8192 ) ) {
588                         $logger->debug("flush_socket dropped $n bytes of data");
589                         if(!$socket->connected()) {
590                                 $logger->error("flush_socket dropped data on disconnected socket: $buf");
591                         }
592                 }
593                 fcntl($socket, F_SETFL, $flags);
594
595                 return 0 unless $socket->connected();
596
597                 return 1;
598
599         } else {
600
601                 return 0;
602         }
603 }
604
605
606
607 1;