]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/perlmods/OpenSRF/Transport/SlimJabber/Client.pm
18529c54189fec7425479786aba43bb5f8b8bb06
[OpenSRF.git] / src / perlmods / OpenSRF / Transport / SlimJabber / Client.pm
1 package OpenSRF::Transport::SlimJabber::Client;
2 use strict; use warnings;
3 use OpenSRF::EX;
4 use base qw( OpenSRF );
5 use OpenSRF::Utils::Logger qw(:level);
6 use OpenSRF::Utils::Config;
7 use Time::HiRes qw(ualarm);
8 use OpenSRF::Utils::Config;
9
10 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
11 use IO::Socket::INET;
12 use IO::Socket::UNIX;
13
14 =head1 Description
15
16 OpenSRF::Transport::SlimJabber::Client
17
Home-brewed, slimmed-down Jabber connection agent. Supports SSL connections
through the following config file options:
20
21   transport->server->sslport # the ssl port
22   transport->server->ssl  # is this ssl?
23
24 =cut
25
# Logging goes through class-method calls on the logger package, so only the
# package name is kept here.
my $logger = 'OpenSRF::Utils::Logger';
27
# Destructor: closes the Jabber stream when the client object goes away.
sub DESTROY {
    my $self = shift;

    # A destructor can run while an exception is propagating or while the
    # process is exiting.  Localize the status globals so that the socket
    # I/O done by disconnect() cannot clobber the caller's $@, $! or $?.
    local ( $@, $!, $? );

    $self->disconnect;
}
32
# Closes the Jabber stream: if a socket exists and is still connected, sends
# the closing stream tag and closes the socket.  Does nothing otherwise.
sub disconnect {
    my ($self) = @_;

    my $sock = $self->{_socket};
    if ( $sock && $sock->connected ) {
        print {$sock} "</stream:stream>";
        close $sock;
    }
}
41
42
43 =head2 new()
44
45 Creates a new Client object.
46
host, port, username, resource and password are all required; new() returns
undef if any of them is missing or false.  debug and log_file are not
required if you don't care to log the activity.

%params:

        host
        port
        username
        resource        
        password
        debug    
        log_file
57
58 =cut
59
# Constructor.  Takes a flat key/value list; port, username, resource,
# password and host must all be present and true, otherwise undef is
# returned and no object is built.  The JID is assembled as
# username@host/resource.  No connection is made here.
sub new {
    my ( $class, %params ) = @_;

    $class = ref($class) || $class;

    # Bail out before building anything if a mandatory parameter is absent.
    for my $required (qw( port username resource password host )) {
        return undef unless $params{$required};
    }

    my ( $host,     $port )     = @params{qw( host port )};
    my ( $username, $resource ) = @params{qw( username resource )};

    my $self = bless {}, $class;

    $self->jid("$username\@$host\/$resource");
    $self->host($host);
    $self->port($port);
    $self->username($username);
    $self->resource($resource);
    $self->password( $params{'password'} );
    $self->{temp_buffer} = "";

    $logger->transport(
        "Creating Client instance: $host:$port, $username, $resource",
        $logger->INFO );

    return $self;
}
89
# Clears the temp buffer and drains any unread data from the TCP socket.
#
# The socket is switched to non-blocking mode so the drain loop stops as
# soon as nothing more is available, and is put back into blocking mode
# afterwards.  When no socket has been opened yet there is nothing to drain,
# so only the temp buffer is cleared.
sub buffer_reset {

    my $self = shift;
    $self->{temp_buffer} = "";

    my $fh = $self->{_socket};
    return unless $fh;

    set_nonblock($fh);
    my $t_buf = "";
    while ( sysread( $fh, $t_buf, 4096 ) ) { }
    set_block($fh);
}
102 # -------------------------------------------------
103
104 =head2 gather()
105
106 Gathers all Jabber messages sitting in the collection queue 
107 and hands them each to their respective callbacks.  This call
does not block (calls process(0))
109
110 =cut
111
# Non-blocking read: delegates to process() with a timeout of 0 and hands
# back whatever it returns.
sub gather {
    my ($self) = @_;
    return $self->process(0);
}
113
114 # -------------------------------------------------
115
116 =head2 listen()
117
118 Blocks and gathers incoming messages as they arrive.  Does not return
119 unless an error occurs.
120
Throws an OpenSRF::EX::Jabber exception if the call to process() ever fails,
and an OpenSRF::EX::Socket exception if the Unix socket cannot be opened.
122
123 =cut
# Connects to the Unix socket named by $self->unix_sock() and then loops
# forever, blocking in process(-1) and writing every message it returns to
# that socket.
#
# Throws OpenSRF::EX::Socket if the Unix socket cannot be opened, and
# OpenSRF::EX::Jabber if process() ever returns undef.  Never returns
# normally.
sub listen {
    my $self = shift;

    my $sock   = $self->unix_sock();
    my $socket = IO::Socket::UNIX->new( Peer => $sock );

    # new() returns undef on failure, so test the handle itself before
    # calling a method on it.  This also runs before any logging, so $!
    # still describes the connect failure.
    OpenSRF::EX::Socket->throw(
        "Unable to connect to UnixServer: socket-file: $sock \n :=> $! " )
        unless ( $socket and $socket->connected );

    $logger->transport( "Unix Socket opened by Listener", INTERNAL );

    while (1) {
        my $o = $self->process(-1);
        if ( !defined($o) ) {
            OpenSRF::EX::Jabber->throw("Listen Loop failed at 'process()'");
        }
        $logger->transport( "Call to process() in listener returned:\n $o", INTERNAL );
        print $socket $o;
    }
}
145
# Plain function (not a method): turns on O_NONBLOCK for the given handle.
# Returns the flags the handle had before the change.  Dies if the flags
# cannot be read or written.
sub set_nonblock {
    my ($handle) = @_;

    my $orig_flags = fcntl( $handle, F_GETFL, 0 );
    die "Can't get flags for the socket: $!\n" unless $orig_flags;

    $logger->transport( "Setting NONBLOCK: original flags: $orig_flags", INTERNAL );

    my $nonblocking = $orig_flags | O_NONBLOCK;
    fcntl( $handle, F_SETFL, $nonblocking )
        or die "Can't set flags for the socket: $!\n";

    return $orig_flags;
}
158
# Plain function (not a method): restores previously saved fcntl flags on a
# handle.  Does nothing when no saved flags are supplied.
sub reset_fl {
    my $fh    = shift;
    my $flags = shift;

    # Check before logging so an undefined value is never interpolated into
    # the log line.
    return unless defined $flags;

    $logger->transport( "Restoring BLOCK: to flags $flags", INTERNAL );
    fcntl( $fh, F_SETFL, $flags );
}
165
# Plain function (not a method): clears O_NONBLOCK on the given handle,
# leaving its other flags untouched.  Dies if the flags cannot be read or
# written.
sub set_block {
    my ($handle) = @_;

    my $current = fcntl( $handle, F_GETFL, 0 );
    die "Can't get flags for the socket: $!\n" unless $current;

    my $blocking = $current & ~O_NONBLOCK;

    fcntl( $handle, F_SETFL, $blocking )
        or die "Can't set flags for the socket: $!\n";
}
177
178
# Reads one complete top-level XML element from the Jabber socket.
#
# $timeout (truncated to an integer):
#   undef - no alarm is armed and the socket is left in blocking mode
#   0     - the socket is switched to non-blocking mode before the first read
#   N > 0 - a SIGALRM is armed for N seconds around the read loop
#
# Data left over from an earlier read is kept in $self->{temp_buffer} and is
# consulted first.  The name of the first tag seen is remembered in
# $self->{last_tag}; an element counts as complete once its closing tag (or
# a self-closing form at the start of the buffer) shows up.  The tag name
# comes from the wire, so it is quoted with \Q...\E wherever it is put into
# a pattern.
#
# Returns the XML read on success and "" when no complete element was read.
# Throws OpenSRF::EX::Socket if the socket is missing or not connected.
#
# NOTE(review): the outer eval finishes normally after the inner one dies,
# which resets $@, so the final "return undef" looks unreachable and a
# timeout comes back as "" - confirm before changing either eval.
# NOTE(review): data already read when the alarm fires is discarded rather
# than put back into temp_buffer - confirm whether that is intended.
sub timed_read {
    my ($self, $timeout) = @_;
    $timeout = defined($timeout) ? int($timeout) : undef;

    $logger->transport( "Temp Buffer Contained: \n". $self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};
    if( $self->can( "app" ) ) {
        $logger->transport( "timed_read called for ".$self->app.", I am: ".$self->jid, INTERNAL );
    }

    # See if there is a complete message in the temp_buffer
    # that we can return
    if( $self->{temp_buffer} ) {
        my $buffer = $self->{temp_buffer};
        my $complete = 0;
        $self->{temp_buffer} = '';

        my ($tag) = ($buffer =~ /<([^\s\?\>]+)/o);
        $self->{last_tag} = $tag;
        $logger->transport("Using tag: $tag  ", INTERNAL);

        if ( $buffer =~ /^(.*?<\/\Q$tag\E>){1}(.*)/s) {
            # Everything up to the first closing tag is the message; the
            # remainder goes back into the temp buffer.
            $buffer = $1;
            $self->{temp_buffer} = $2;
            $complete++;
            $logger->transport( "completed read with $buffer", INTERNAL );
        } elsif ( $buffer =~ /^<\Q$tag\E[^>]*?\/>(.*)/) {
            # Self-closing element.  $buffer is left untouched here, so the
            # captured remainder is both returned and kept in temp_buffer.
            $self->{temp_buffer} = $1;
            $complete++;
            $logger->transport( "completed read with $buffer", INTERNAL );
        } else {
            # Incomplete: keep the data for the socket read below.
            $self->{temp_buffer} = $buffer;
        }

        if( $buffer and $complete ) {
            return $buffer;
        }

    }
    ############

    my $fh = $self->{_socket};

    unless( $fh and $fh->connected ) {
        OpenSRF::EX::Socket->throw( "Attempted read on closed socket", ERROR );
    }

    $logger->transport( "Temp Buffer After first attempt: \n ".$self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};

    # A timeout of exactly 0 means "do not wait": go non-blocking up front.
    my $flags;
    if (defined($timeout) && !$timeout) {
        $flags = set_nonblock( $fh );
    }

    $timeout ||= 0;
    $logger->transport( "Calling timed_read with timetout $timeout", INTERNAL );

    my $complete = 0;
    my $first_read = 1;
    my $xml = '';
    eval {
        my $tag = '';
        eval {
            no warnings;
            local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required

            # alarm needs a number greater => 1.
            my $alarm_timeout = $timeout;
            if( $alarm_timeout > 0 and $alarm_timeout < 1 ) {
                $alarm_timeout = 1;
            }
            alarm $alarm_timeout;
            do {

                # Start each pass with whatever was left over last time.
                my $buffer = $self->{temp_buffer};
                $self->{temp_buffer} = '';

                my $t_buf = "";
                my $read_size = 1024; my $f = 0;
                while( my $n = sysread( $fh, $t_buf, $read_size ) ) {

                    unless( $fh->connected ) {
                        OpenSRF::EX::JabberDisconnected->throw(
                            "Lost jabber client in timed_read()");
                    }

                    # XXX Change me to debug/internal at some point, this is for testing...
                    # XXX Found a race condition where reading >= $read_size bytes of data
                    # will fail if the log line below is removed.
                    $logger->info("timed_read() read $n bytes of data");

                    $buffer .= $t_buf;
                    if( $n < $read_size ) {
                        # Short read: nothing more is pending right now.
                        set_block( $fh );
                        last;
                    }
                    # see if there is any more data to grab...
                    $f = set_nonblock( $fh );
                }

                $logger->transport(" Got [$buffer] from the socket", INTERNAL);

                if ($first_read) {
                    $logger->transport(" First read Buffer\n [$buffer]", INTERNAL);
                    ($tag) = ($buffer =~ /<([^\s\?\>\/]+){1}/o);
                    $self->{last_tag} = $tag;
                    $first_read--;
                    $logger->transport("Using tag: $tag  ", INTERNAL);
                }

                if (!$first_read && $buffer =~ /^(.*?<\/\Q$tag\E>){1}(.*)/s) {
                    $buffer = $1;
                    $self->{temp_buffer} = $2;
                    $complete++;
                    $logger->transport( "completed read with $buffer", INTERNAL );
                } elsif (!$first_read && $buffer =~ /^<\Q$tag\E[^>]*?\/>(.*)/) {
                    $self->{temp_buffer} = $1;
                    $complete++;
                    $logger->transport( "completed read with $buffer", INTERNAL );
                }

                $xml .= $buffer;

            } while (!$complete && $xml);
            alarm(0);
        };
        # Cancel the alarm again in case the inner block died before it could.
        alarm(0);
    };

    $logger->transport( "XML Read: $xml", INTERNAL );
    set_block( $fh ) if defined $flags;

    if ($complete) {
        return $xml;
    }
    if( $@ ) {
        return undef;
    }
    return "";
}
334
335
336 # -------------------------------------------------
337
# Returns 1 when a socket exists and reports itself connected, 0 otherwise.
sub tcp_connected {
    my ($self) = @_;

    my $sock = $self->{_socket};
    return ( $sock && $sock->connected ) ? 1 : 0;
}
344
# Combined getter/setter for the password.  A true argument replaces the
# stored value; the stored value is always returned.
sub password {
    my $self      = shift;
    my $new_value = shift;

    if ($new_value) {
        $self->{'oils:password'} = $new_value;
    }
    return $self->{'oils:password'};
}
350
351 # -------------------------------------------------
352
# Combined getter/setter for the username.  A true argument replaces the
# stored value; the stored value is always returned.
sub username {
    my $self      = shift;
    my $new_value = shift;

    if ($new_value) {
        $self->{'oils:username'} = $new_value;
    }
    return $self->{'oils:username'};
}
358         
359 # -------------------------------------------------
360
# Combined getter/setter for the resource.  A true argument replaces the
# stored value; the stored value is always returned.
sub resource {
    my $self      = shift;
    my $new_value = shift;

    if ($new_value) {
        $self->{'oils:resource'} = $new_value;
    }
    return $self->{'oils:resource'};
}
366
367 # -------------------------------------------------
368
# Combined getter/setter for the full JID.  A true argument replaces the
# stored value; the stored value is always returned.
sub jid {
    my $self      = shift;
    my $new_value = shift;

    if ($new_value) {
        $self->{'oils:jid'} = $new_value;
    }
    return $self->{'oils:jid'};
}
374
# Combined getter/setter for the port.  A true argument replaces the stored
# value; the stored value is always returned.
sub port {
    my $self      = shift;
    my $new_value = shift;

    if ($new_value) {
        $self->{'oils:port'} = $new_value;
    }
    return $self->{'oils:port'};
}
380
# Combined getter/setter for the host.  A true argument replaces the stored
# value; the stored value is always returned.
sub host {
    my $self      = shift;
    my $new_value = shift;

    if ($new_value) {
        $self->{'oils:host'} = $new_value;
    }
    return $self->{'oils:host'};
}
386
387 # -------------------------------------------------
388
389 =head2 send()
390
391         Sends a Jabber message.
392         
393         %params:
394                 to                      - The JID of the recipient
395                 thread  - The Jabber thread
396                 body            - The body of the message
397
398 =cut
399
# Builds a MessageWrapper from the named parameters and writes it to the
# Jabber socket.
#
# %params: to and body are mandatory (undef is returned if either is missing
# or false); thread, router_command, router_class and locale are optional
# and default to "".  The logger's current osrf_xid is attached to every
# message.
#
# Throws OpenSRF::EX::Jabber if the socket is missing or not connected.
sub send {
    my ( $self, %params ) = @_;

    my $to = $params{'to'};
    return undef unless $to;

    my $body = $params{'body'};
    return undef unless $body;

    my $thread = $params{'thread'} || "";

    my $msg = OpenSRF::Transport::SlimJabber::MessageWrapper->new;

    $msg->setTo($to);
    $msg->setThread($thread) if $thread;
    $msg->setBody($body);
    $msg->set_router_command( $params{'router_command'} || "" );
    $msg->set_router_class( $params{'router_class'}     || "" );
    $msg->set_locale( $params{'locale'}                 || "" );
    $msg->set_osrf_xid( $logger->get_osrf_xid );

    $logger->transport(
        "JabberClient Sending message to $to with thread $thread and body: \n$body", INTERNAL );

    my $soc = $self->{_socket};
    unless ( $soc && $soc->connected ) {
        OpenSRF::EX::Jabber->throw("No longer connected to jabber server");
    }
    print {$soc} $msg->toString;

    $logger->transport(
        "JabberClient Sent message to $to with thread $thread and body: \n$body", INTERNAL );
}
433
434
=head2 initialize()

Connect to the server and log in.  

Throws an OpenSRF::EX::Jabber exception if we cannot connect
to the server or if the authentication fails.
441
442 =cut
443
444 # --- The logging lines have been commented out until we decide 
445 # on which log files we're using.
446
# Opens the connection to the Jabber server and logs in.
#
# A numeric port selects a TCP connection to host:port; anything else is
# treated as the path of a Unix socket.  Up to five connection attempts are
# made, three seconds apart.  After connecting, the stream header is sent,
# the server's greeting is read (waiting at most 3 seconds), and the
# jabber:iq:auth request is sent.  The resource gets "_<pid>" appended,
# except for username "router" when the bootstrap config has no router_name.
#
# Returns $self on success.  Throws OpenSRF::EX::Jabber if the connection
# cannot be made or the server's reply is not a type="result" stanza.
#
# NOTE(review): username, password and resource are put into the XML as-is;
# values containing '<' or '&' would produce a malformed request - confirm
# whether callers are expected to pre-escape them.
sub initialize {

    my $self = shift;

    my $jid      = $self->jid;
    my $host     = $self->host;
    my $port     = $self->port;
    my $username = $self->username;
    my $resource = $self->resource;
    my $password = $self->password;

    my $stream = <<"XML";
<stream:stream to='$host' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>
XML

    my $conf = OpenSRF::Utils::Config->current;
    my $tail = "_$$";
    if(!$conf->bootstrap->router_name && $username eq "router") {
        $tail = "";
    }

    my $auth = <<"XML";
<iq id='123' type='set'>
<query xmlns='jabber:iq:auth'>
<username>$username</username>
<password>$password</password>
<resource>${resource}$tail</resource>
</query>
</iq>
XML

    my $sock_type = 'IO::Socket::INET';

    # if port is a string, then we're connecting to a UNIX socket
    unless( $port =~ /^\d+$/ ) {
        $sock_type = 'IO::Socket::UNIX';
    }

    # --- 5 tries to connect to the jabber server
    my $max_tries     = 5;
    my $connect_error = '';
    my $socket;
    for my $try ( 1 .. $max_tries ) {
        $socket = $sock_type->new( PeerHost => $host,
                                   PeerPort => $port,
                                   Peer     => $port,
                                   Proto    => 'tcp' );

        # Save the OS error right away: the logging and sleep below may
        # overwrite $! before it is reported.
        $connect_error = "$!";

        $logger->debug( "$jid: $try connect attempt to $host:$port");
        last if ( $socket and $socket->connected );
        $logger->warn( "$jid: Failed to connect to server...$host:$port (Try # $try)");

        # No point in waiting after the final attempt.
        sleep 3 if $try < $max_tries;
    }

    unless ( $socket and $socket->connected ) {
        OpenSRF::EX::Jabber->throw( " Could not connect to Jabber server: $connect_error" );
    }

    $logger->transport( "Logging into jabber as $jid " .
            "from " . ref( $self ), DEBUG );

    print $socket $stream;

    # Read the server's stream greeting, giving up after 3 seconds.  The
    # nested evals make sure the alarm is cancelled however the read ends.
    my $buffer;
    eval {
        eval {
            local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
            alarm 3;
            sysread($socket, $buffer, 4096);
            $logger->transport(
                "Login buffer 1: " . ( defined $buffer ? $buffer : '' ), INTERNAL );
            alarm(0);
        };
        alarm(0);
    };

    print $socket $auth;

    if( $socket and $socket->connected() ) {
        $self->{_socket} = $socket;
    } else {
        OpenSRF::EX::Jabber->throw( " ** Unable to connect to Jabber server", ERROR );
    }

    # Wait up to 10 seconds for the reply to the auth request.
    $buffer = $self->timed_read(10);

    if( $buffer ) {$logger->transport( "Login buffer 2: $buffer", INTERNAL );}

    if( $buffer and $buffer =~ /type=["\']result["\']/ ) {
        $logger->transport( " * $jid: Jabber authenticated and connected", DEBUG );
    } else {
        if( !$buffer ) { $buffer = " "; }
        $socket->close;
        OpenSRF::EX::Jabber->throw( " * $jid: Unable to authenticate: $buffer", ERROR );
    }

    return $self;
}
542
# Class method: builds a client via new($app), initializes it, and passes
# the result to the class's peer_handle().
sub construct {
    my ( $class, $app ) = @_;

    $logger->transport("Constructing new Jabber connection for $app, my class $class", INTERNAL );

    my @handle = $class->new($app)->initialize();
    $class->peer_handle(@handle);
}
549
# Reads the next message from the server through timed_read().
#
# $timeout is truncated to an integer; a missing or false value becomes 0
# and a negative value means "wait forever" (timed_read gets undef).
#
# When timed_read() returned data and recorded a tag name, the first
# complete <tag ...>...</tag> element is extracted and returned.  Any
# further complete elements are put at the front of the temp buffer for
# later calls.
#
# Throws OpenSRF::EX::JabberDisconnected if there is no connected socket and
# OpenSRF::EX::Jabber if timed_read() returns undef.
#
# NOTE(review): when the data holds no <tag>...</tag> pair (for example a
# self-closing element) the return value is undef - confirm callers expect
# that.
sub process {

    my( $self, $timeout ) = @_;

    $timeout ||= 0;
    $timeout = int($timeout);
    undef $timeout if ( $timeout < 0 );

    # Test the handle itself first: a client that never connected has no
    # socket object to call connected() on.
    my $socket = $self->{_socket};
    unless( $socket and $socket->connected ) {
        OpenSRF::EX::JabberDisconnected->throw(
          "This JabberClient instance is no longer connected to the server " .
          $self->username . " : " . $self->resource, ERROR );
    }

    my $val = $self->timed_read( $timeout );

    $timeout = "FOREVER" unless ( defined $timeout );

    if ( ! defined( $val ) ) {
        OpenSRF::EX::Jabber->throw(
          "Call to Client->timed_read( $timeout ) failed", ERROR );
    } elsif ( ! $val ) {
        $logger->transport(
            "Call to Client->timed_read( $timeout ) returned 0 bytes of data", INTERNAL );
    } elsif ( $val ) {
        $logger->transport(
            "Call to Client->timed_read( $timeout ) successfully returned data", INTERNAL );
    }

    my $t = $self->{last_tag};

    if( $t and $val ) {
        # The tag name came off the wire, so quote it before using it in a
        # pattern.  /s lets an element span several lines, matching the
        # patterns used in timed_read().
        my @msgs = $val =~ /(<\Q$t\E[^>]*>.*?<\/\Q$t\E>)/gs;
        $val = shift(@msgs);

        if (@msgs) {
            my $tmp = $self->{temp_buffer};

            $self->{temp_buffer} = '';
            $self->{temp_buffer} .= $_ for (@msgs);
            $self->{temp_buffer} .= $tmp;
        }
    }

    return $val;
}
596
597
# --------------------------------------------------------------
# Sets the socket to O_NONBLOCK, reads all of the data off of
# the socket, then restores the socket's flags.
# Returns 1 on success, 0 if there is no socket or it isn't connected
# --------------------------------------------------------------
sub flush_socket {
    my ($self) = @_;

    my $sock = $self->{_socket};
    return 0 unless $sock;

    my $drained;
    my $saved_flags = fcntl( $sock, F_GETFL, 0 );

    # Go non-blocking so the drain loop stops once nothing more is pending.
    fcntl( $sock, F_SETFL, $saved_flags | O_NONBLOCK );
    while ( my $count = sysread( $sock, $drained, 8192 ) ) {
        $logger->debug("flush_socket dropped $count bytes of data");
        unless ( $sock->connected() ) {
            $logger->error("flush_socket dropped data on disconnected socket: $drained");
        }
    }
    fcntl( $sock, F_SETFL, $saved_flags );

    return $sock->connected() ? 1 : 0;
}
631
632
633
634 1;