]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/perlmods/OpenSRF/Transport/SlimJabber/Client.pm
uncomment some unnecessary code
[OpenSRF.git] / src / perlmods / OpenSRF / Transport / SlimJabber / Client.pm
1 package OpenSRF::Transport::SlimJabber::Client;
2 use strict; use warnings;
3 use OpenSRF::EX;
4 use base qw( OpenSRF );
5 use OpenSRF::Utils::Logger qw(:level);
6 use OpenSRF::Utils::Config;
7 use Time::HiRes qw(ualarm);
8 use OpenSRF::Utils::Config;
9
10 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
11 use IO::Socket::INET;
12 use IO::Socket::UNIX;
13
=head1 Description

OpenSRF::Transport::SlimJabber::Client

Home-brewed, slimmed-down Jabber connection agent.  Supports SSL connections
via the following config file options:

  transport->server->sslport # the ssl port
  transport->server->ssl  # is this ssl?

=cut
25
26 my $logger = "OpenSRF::Utils::Logger";
27
# Destructor: closes the Jabber stream (via disconnect) when the client
# object is garbage collected.
sub DESTROY {
    my ($client) = @_;
    return $client->disconnect;
}
32
# Sends the closing </stream:stream> tag and closes the socket, but only
# when a socket exists and is still connected.  Otherwise does nothing.
sub disconnect {
    my ($self) = @_;

    my $sock = $self->{_socket};

    if ( $sock && $sock->connected ) {
        print {$sock} "</stream:stream>";
        close $sock;
    }
}
41
42
=head2 new()

Creates a new Client object.  No connection is made here; the object
only records the connection settings.

All of the parameters below are required.  new() returns undef if any
of them is missing or false.  (debug and log_file are not read by this
constructor.)

%params:

        host
        port
        username
        resource
        password

=cut

sub new {

    my $class  = shift;
    my %params = @_;

    $class = ref( $class ) || $class;

    # Every connection setting is mandatory
    for my $required (qw( port username resource password host )) {
        return undef unless $params{$required};
    }

    my ( $host, $port, $username, $resource, $password )
        = @params{qw( host port username resource password )};

    my $self = bless {}, $class;

    # Full JID is user@host/resource
    $self->jid( join( '', $username, '@', $host, '/', $resource ) );
    $self->host( $host );
    $self->port( $port );
    $self->username( $username );
    $self->resource( $resource );
    $self->password( $password );
    $self->{temp_buffer} = "";

    $logger->transport(
        "Creating Client instance: $host:$port, $username, $resource",
        $logger->INFO );

    return $self;
}
89
# Throws away all pending input: empties the temp buffer and then
# drains whatever is currently readable on the socket.
sub buffer_reset {

    my ($self) = @_;

    $self->{temp_buffer} = "";

    my $sock    = $self->{_socket};
    my $discard = "";

    # Read in non-blocking mode so the drain stops as soon as the
    # socket has nothing more to give, then go back to blocking mode.
    set_nonblock( $sock );
    1 while sysread( $sock, $discard, 4096 );
    set_block( $sock );
}
102 # -------------------------------------------------
103
=head2 gather()

Gathers the Jabber messages sitting in the collection queue by
calling process(0).  A timeout of 0 means this call does not block.

=cut

sub gather {
    my ($self) = @_;
    return $self->process( 0 );
}
113
114 # -------------------------------------------------
115
=head2 listen()

Blocks and gathers incoming messages as they arrive, writing each one
to the Unix socket named by unix_sock().  Does not return unless an
error occurs.

Throws an OpenSRF::EX::Socket if the Unix socket cannot be opened, and
an OpenSRF::EX::Jabber if a call to process() returns undef.

=cut

sub listen {
    my $self = shift;

    # unix_sock() is not defined in this file; presumably supplied by a
    # subclass or the base class -- TODO confirm.
    my $sock   = $self->unix_sock();
    my $socket = IO::Socket::UNIX->new( Peer => $sock );

    # IO::Socket::UNIX->new() returns undef on failure, so the handle
    # itself must be tested before calling a method on it.
    unless ( $socket and $socket->connected ) {
        OpenSRF::EX::Socket->throw(
            "Unable to connect to UnixServer: socket-file: $sock \n :=> $! " );
    }

    $logger->transport( "Unix Socket opened by Listener", INTERNAL );

    while (1) {
        # -1 asks process() to wait with no timeout
        my $o = $self->process( -1 );

        if ( !defined( $o ) ) {
            OpenSRF::EX::Jabber->throw( "Listen Loop failed at 'process()'" );
        }

        $logger->transport( "Call to process() in listener returned:\n $o", INTERNAL );
        print $socket $o;
    }

    # Guard only: the loop above exits solely by throwing.
    OpenSRF::EX::Socket->throw( "How did we get here?!?!" );
}
145
# Adds O_NONBLOCK to the file status flags of the given handle.
# Returns the flags as they were before the change.
# Dies if the flags cannot be read or written.
sub set_nonblock {
    my ($handle) = @_;

    my $orig_flags = fcntl( $handle, F_GETFL, 0 );
    die "Can't get flags for the socket: $!\n" unless $orig_flags;

    $logger->transport( "Setting NONBLOCK: original flags: $orig_flags", INTERNAL );

    die "Can't set flags for the socket: $!\n"
        unless fcntl( $handle, F_SETFL, $orig_flags | O_NONBLOCK );

    return $orig_flags;
}
158
# Restores previously saved file status flags on a handle.
#   $fh    - the handle to update
#   $flags - flags as returned by set_nonblock(); when undef there is
#            nothing to restore and the call is a no-op
# Returns the result of fcntl(), or nothing when $flags is undef.
sub reset_fl {
    my ( $fh, $flags ) = @_;

    # Bail out before logging: interpolating an undefined $flags would
    # warn and claim a restore that never happens.
    return unless defined $flags;

    $logger->transport( "Restoring BLOCK: to flags $flags", INTERNAL );
    return fcntl( $fh, F_SETFL, $flags );
}
165
# Clears O_NONBLOCK from the file status flags of the given handle,
# leaving every other flag as it was.
# Returns the result of the F_SETFL call; dies if the flags cannot be
# read or written.
sub set_block {
    my ($handle) = @_;

    my $current = fcntl( $handle, F_GETFL, 0 );
    die "Can't get flags for the socket: $!\n" unless $current;

    my $blocking = $current & ~O_NONBLOCK;

    my $result = fcntl( $handle, F_SETFL, $blocking );
    die "Can't set flags for the socket: $!\n" unless $result;

    return $result;
}
177
178
# Reads one complete XML element from the connection.
#
# The element name is taken from the first tag seen and stored in
# $self->{last_tag}.  Data following the element's closing tag is kept
# in $self->{temp_buffer} for the next call.
#
#   $timeout - defined but false (0): read without blocking
#              positive:              give up after that many seconds
#                                     (fractions are raised to 1)
#              undef:                 no alarm is armed, so the read blocks
#
# Returns the XML read when a complete element was found, "" when none
# was, and undef when $@ is set after the read.
# Throws OpenSRF::EX::Socket when there is no connected socket.
sub timed_read {
    my ($self, $timeout) = @_;

    $logger->transport( "Temp Buffer Contained: \n". $self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};
    if( $self->can( "app" ) ) {
        $logger->transport( "timed_read called for ".$self->app.", I am: ".$self->jid, INTERNAL );
    }

    # See if there is a complete message in the temp_buffer
    # that we can return
    if( $self->{temp_buffer} ) {
        my $buffer = $self->{temp_buffer};
        my $complete = 0;
        $self->{temp_buffer} = '';

        my ($tag) = ($buffer =~ /<([^\s\?\>]+)/o);
        $self->{last_tag} = $tag;

        # The buffer may hold no tag at all; avoid undef warnings and
        # keep regex metacharacters in the tag name literal.
        my $shown_tag = defined $tag ? $tag : '';
        my $qtag      = quotemeta $shown_tag;
        $logger->transport("Using tag: $shown_tag  ", INTERNAL);

        if ( $buffer =~ /^(.*?<\/$qtag>){1}(.*)/s) {
            $buffer = $1;
            $self->{temp_buffer} = $2;
            $complete++;
            $logger->transport( "completed read with $buffer", INTERNAL );
        } elsif ( $buffer =~ /^<${qtag}[^>]*?\/>(.*)/) {
            # NOTE(review): unlike the branch above, $buffer is not trimmed
            # to the self-closing element here, and (.*) stops at the first
            # newline.  Left as-is; confirm the intended behavior.
            $self->{temp_buffer} = $1;
            $complete++;
            $logger->transport( "completed read with $buffer", INTERNAL );
        } else {
            $self->{temp_buffer} = $buffer;
        }

        if( $buffer and $complete ) {
            return $buffer;
        }

    }
    ############

    my $fh = $self->{_socket};

    unless( $fh and $fh->connected ) {
        OpenSRF::EX::Socket->throw( "Attempted read on closed socket", ERROR );
    }

    $logger->transport( "Temp Buffer After first attempt: \n ".$self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};

    # A timeout of exactly 0 means "don't block at all"
    my $flags;
    if (defined($timeout) && !$timeout) {
        $flags = set_nonblock( $fh );
    }

    $timeout ||= 0;
    $logger->transport( "Calling timed_read with timetout $timeout", INTERNAL );


    my $complete = 0;
    my $first_read = 1;
    my $xml = '';

    # Set once the drain loop below has switched the socket to
    # non-blocking mode, so blocking mode can be restored afterwards.
    my $went_nonblock = 0;

    eval {
        my $tag  = '';
        my $qtag = '';
        eval {
            no warnings;
            local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required

            # alarm needs a number greater => 1.
            my $alarm_timeout = $timeout;
            if( $alarm_timeout > 0 and $alarm_timeout < 1 ) {
                $alarm_timeout = 1;
            }
            alarm $alarm_timeout;
            do {

                my $buffer = $self->{temp_buffer};
                $self->{temp_buffer} = '';

                my $t_buf = "";
                my $read_size = 1024;
                while( my $n = sysread( $fh, $t_buf, $read_size ) ) {

                    unless( $fh->connected ) {
                        OpenSRF::EX::JabberDisconnected->throw(
                            "Lost jabber client in timed_read()");
                    }

                    # XXX Change me to debug/internal at some point, this is for testing...
                    $logger->info("timed_read() read $n bytes of data");

                    $buffer .= $t_buf;
                    if( $n < $read_size ) {
                        # short read: nothing more is waiting
                        set_block( $fh );
                        last;
                    }
                    # a full read: see if there is any more data to grab,
                    # without blocking if there is not
                    set_nonblock( $fh );
                    $went_nonblock = 1;
                }

                # If the drain ended because the socket ran dry rather than
                # on a short read, the socket is still non-blocking.  Unless
                # the caller asked for a non-blocking read, go back to
                # blocking mode so the next pass waits for data instead of
                # spinning.
                set_block( $fh ) if $went_nonblock and !defined $flags;

                $logger->transport(" Got [$buffer] from the socket", INTERNAL);

                if ($first_read) {
                    $logger->transport(" First read Buffer\n [$buffer]", INTERNAL);
                    ($tag) = ($buffer =~ /<([^\s\?\>\/]+){1}/o);
                    $self->{last_tag} = $tag;
                    # keep regex metacharacters in the tag name literal
                    $qtag = defined $tag ? quotemeta $tag : '';
                    $first_read--;
                    $logger->transport("Using tag: $tag  ", INTERNAL);
                }

                if (!$first_read && $buffer =~ /^(.*?<\/$qtag>){1}(.*)/s) {
                    $buffer = $1;
                    $self->{temp_buffer} = $2;
                    $complete++;
                    $logger->transport( "completed read with $buffer", INTERNAL );
                } elsif (!$first_read && $buffer =~ /^<${qtag}[^>]*?\/>(.*)/) {
                    $self->{temp_buffer} = $1;
                    $complete++;
                    $logger->transport( "completed read with $buffer", INTERNAL );
                }

                $xml .= $buffer;

            } while (!$complete && $xml);
            alarm(0);
        };
        # make sure the alarm is cancelled even when the inner eval died
        alarm(0);
    };

    $logger->transport( "XML Read: $xml", INTERNAL );

    # Leave the socket in blocking mode whenever this call made it
    # non-blocking (an alarm can interrupt the drain loop mid-way).
    set_block( $fh ) if defined $flags or $went_nonblock;

    if ($complete) {
        return $xml;
    }
    # NOTE(review): the outer eval normally completes successfully, which
    # clears $@, so this branch looks hard to reach -- confirm.
    if( $@ ) {
        return undef;
    }
    return "";
}
330
331
332 # -------------------------------------------------
333
# Returns 1 when the client holds a socket that reports itself as
# connected, 0 otherwise.
sub tcp_connected {
    my ($self) = @_;

    my $sock = $self->{_socket};

    return ( $sock && $sock->connected ) ? 1 : 0;
}
340
# Get/set accessor for the login password.  A false argument (undef,
# "" or 0) leaves the stored value untouched.  Returns the stored value.
sub password {
    my $self = shift;
    my ($new_value) = @_;

    if ($new_value) {
        $self->{'oils:password'} = $new_value;
    }

    return $self->{'oils:password'};
}
346
347 # -------------------------------------------------
348
# Get/set accessor for the login username.  A false argument (undef,
# "" or 0) leaves the stored value untouched.  Returns the stored value.
sub username {
    my $self = shift;
    my ($new_value) = @_;

    if ($new_value) {
        $self->{'oils:username'} = $new_value;
    }

    return $self->{'oils:username'};
}
354         
355 # -------------------------------------------------
356
# Get/set accessor for the Jabber resource.  A false argument (undef,
# "" or 0) leaves the stored value untouched.  Returns the stored value.
sub resource {
    my $self = shift;
    my ($new_value) = @_;

    if ($new_value) {
        $self->{'oils:resource'} = $new_value;
    }

    return $self->{'oils:resource'};
}
362
363 # -------------------------------------------------
364
# Get/set accessor for the full JID.  A false argument (undef, "" or 0)
# leaves the stored value untouched.  Returns the stored value.
sub jid {
    my $self = shift;
    my ($new_value) = @_;

    if ($new_value) {
        $self->{'oils:jid'} = $new_value;
    }

    return $self->{'oils:jid'};
}
370
# Get/set accessor for the server port.  A false argument (undef, ""
# or 0) leaves the stored value untouched.  Returns the stored value.
sub port {
    my $self = shift;
    my ($new_value) = @_;

    if ($new_value) {
        $self->{'oils:port'} = $new_value;
    }

    return $self->{'oils:port'};
}
376
# Get/set accessor for the server host.  A false argument (undef, ""
# or 0) leaves the stored value untouched.  Returns the stored value.
sub host {
    my $self = shift;
    my ($new_value) = @_;

    if ($new_value) {
        $self->{'oils:host'} = $new_value;
    }

    return $self->{'oils:host'};
}
382
383 # -------------------------------------------------
384
=head2 send()

        Builds a message and writes it to the Jabber socket.

        %params:
                to              - The JID of the recipient (required)
                body            - The body of the message (required)
                thread          - The Jabber thread
                router_command  - passed to the message's set_router_command()
                router_class    - passed to the message's set_router_class()

        Returns undef without sending when 'to' or 'body' is missing.
        Throws an OpenSRF::EX::Jabber when the socket is not connected.

=cut

sub send {
    my ( $self, %params ) = @_;

    # Recipient and body are mandatory
    my $to   = $params{'to'}   or return undef;
    my $body = $params{'body'} or return undef;

    my $thread         = $params{'thread'}         || "";
    my $router_command = $params{'router_command'} || "";
    my $router_class   = $params{'router_class'}   || "";

    my $msg = OpenSRF::Transport::SlimJabber::MessageWrapper->new;

    $msg->setTo( $to );
    $msg->setThread( $thread ) if $thread;
    $msg->setBody( $body );
    $msg->set_router_command( $router_command );
    $msg->set_router_class( $router_class );

    $logger->transport(
        "JabberClient Sending message to $to with thread $thread and body: \n$body",
        INTERNAL );

    my $soc = $self->{_socket};
    OpenSRF::EX::Jabber->throw( "No longer connected to jabber server" )
        unless $soc and $soc->connected;

    print $soc $msg->toString;

    $logger->transport(
        "JabberClient Sent message to $to with thread $thread and body: \n$body",
        INTERNAL );
}
427
428
=head2 initialize()

Connect to the server and log in.

Opens the socket (up to 5 attempts), sends the stream header followed
by a jabber:iq:auth login request, and waits for the server's reply.

Returns the client object on success.

Throws an OpenSRF::EX::Jabber if we cannot connect to the server or if
the authentication fails.

=cut

sub initialize {

    my $self = shift;

    my $jid      = $self->jid;
    my $host     = $self->host;
    my $port     = $self->port;
    my $username = $self->username;
    my $resource = $self->resource;
    my $password = $self->password;

    # These values are embedded in XML below; escape the XML
    # metacharacters so that e.g. a password containing '&' or '<'
    # does not corrupt the stream.
    my $xml_escape = sub {
        my $text = shift;
        $text = '' unless defined $text;
        $text =~ s/&/&amp;/g;
        $text =~ s/</&lt;/g;
        $text =~ s/>/&gt;/g;
        $text =~ s/'/&apos;/g;
        $text =~ s/"/&quot;/g;
        return $text;
    };

    my $stream =
          "<stream:stream to='" . $xml_escape->($host) . "'"
        . " xmlns='jabber:client'"
        . " xmlns:stream='http://etherx.jabber.org/streams'>\n";

    # The process id is appended to the resource, except for a user
    # named "router" when the bootstrap config has no router_name.
    my $conf = OpenSRF::Utils::Config->current;
    my $tail = "_$$";
    if( !$conf->bootstrap->router_name && $username eq "router" ) {
        $tail = "";
    }

    my $auth = join( "\n",
        "<iq id='123' type='set'>",
        "<query xmlns='jabber:iq:auth'>",
        "<username>" . $xml_escape->($username) . "</username>",
        "<password>" . $xml_escape->($password) . "</password>",
        "<resource>" . $xml_escape->($resource) . $tail . "</resource>",
        "</query>",
        "</iq>",
        "" );    # trailing empty element gives the final newline

    my $sock_type = 'IO::Socket::INET';

    # if port is a string, then we're connecting to a UNIX socket
    unless( $port =~ /^\d+$/ ) {
        $sock_type = 'IO::Socket::UNIX';
    }

    # --- 5 tries to connect to the jabber server
    my $max_tries = 5;
    my $socket;
    for my $try ( 1 .. $max_tries ) {
        $socket = $sock_type->new( PeerHost => $host,
                                   PeerPort => $port,
                                   Peer     => $port,
                                   Proto    => 'tcp' );
        $logger->debug( "$jid: $try connect attempt to $host:$port");
        last if ( $socket and $socket->connected );
        $logger->warn( "$jid: Failed to connect to server...$host:$port (Try # $try)");

        # no point waiting once the last attempt has failed
        sleep 3 if $try < $max_tries;
    }

    unless ( $socket and $socket->connected ) {
        OpenSRF::EX::Jabber->throw( " Could not connect to Jabber server: $!" );
    }

    $logger->transport( "Logging into jabber as $jid " .
            "from " . ref( $self ), DEBUG );

    print $socket $stream;

    # Read the server's reply to the stream header, waiting at most
    # 3 seconds.  A timeout here is deliberately ignored.
    my $buffer;
    eval {
        eval {
            local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
            alarm 3;
            sysread($socket, $buffer, 4096);
            $logger->transport(
                "Login buffer 1: " . ( defined $buffer ? $buffer : '' ), INTERNAL );
            alarm(0);
        };
        alarm(0);
    };

    print $socket $auth;

    if( $socket and $socket->connected() ) {
        $self->{_socket} = $socket;
    } else {
        OpenSRF::EX::Jabber->throw( " ** Unable to connect to Jabber server", ERROR );
    }


    # Wait up to 10 seconds for the reply to the login request
    $buffer = $self->timed_read(10);

    if( $buffer ) {$logger->transport( "Login buffer 2: $buffer", INTERNAL );}

    if( $buffer and $buffer =~ /type=["\']result["\']/ ) {
        $logger->transport( " * $jid: Jabber authenticated and connected", DEBUG );
    } else {
        if( !$buffer ) { $buffer = " "; }
        $socket->close;
        OpenSRF::EX::Jabber->throw( " * $jid: Unable to authenticate: $buffer", ERROR );
    }

    return $self;
}
536
# Creates a client for $app with new(), logs it in with initialize(),
# and hands the result to the class's peer_handle().
sub construct {
    my $class = shift;
    my ($app) = @_;

    $logger->transport(
        "Constructing new Jabber connection for $app, my class $class",
        INTERNAL );

    $class->peer_handle( $class->new( $app )->initialize() );
}
543
# Reads the next message from the connection via timed_read().
#
#   $timeout - seconds to wait; 0 or undef means don't block, and -1
#              means wait with no timeout (timed_read is then called
#              with undef)
#
# Returns the first <tag>...</tag> element found in the data read,
# where tag is $self->{last_tag}.  Any further such elements are put
# back at the front of $self->{temp_buffer} for the next call.  When
# nothing was read the (false) value from timed_read() is returned.
#
# Throws OpenSRF::EX::JabberDisconnected when there is no connected
# socket, and OpenSRF::EX::Jabber when timed_read() returns undef.
sub process {

    my( $self, $timeout ) = @_;

    $timeout ||= 0;
    undef $timeout if ( $timeout == -1 );

    # Test the handle itself too: calling ->connected on a missing
    # socket would die with a plain Perl error, not the exception
    # callers expect.
    my $socket = $self->{_socket};
    unless( $socket and $socket->connected ) {
        OpenSRF::EX::JabberDisconnected->throw(
          "This JabberClient instance is no longer connected to the server " .
          $self->username . " : " . $self->resource, ERROR );
    }

    my $val = $self->timed_read( $timeout );

    # for the log messages below only
    $timeout = "FOREVER" unless ( defined $timeout );

    if ( ! defined( $val ) ) {
        OpenSRF::EX::Jabber->throw(
          "Call to Client->timed_read( $timeout ) failed", ERROR );
    } elsif ( ! $val ) {
        $logger->transport(
            "Call to Client->timed_read( $timeout ) returned 0 bytes of data", INTERNAL );
    } else {
        $logger->transport(
            "Call to Client->timed_read( $timeout ) successfully returned data", INTERNAL );
    }

    my $t = $self->{last_tag};

    if( $t and $val ) {
        # quotemeta keeps regex metacharacters in the tag name literal;
        # /s lets a message body span several lines.
        my $qt   = quotemeta $t;
        my @msgs = $val =~ /(<${qt}[^>]*>.*?<\/$qt>)/gs;
        $val = shift(@msgs);

        if (@msgs) {
            # extra messages go ahead of whatever was already buffered
            $self->{temp_buffer} = join( '', @msgs, $self->{temp_buffer} );
        }
    }

    return $val;
}
589
590
# --------------------------------------------------------------
# Sets the socket to O_NONBLOCK, reads (and discards) all of the
# data waiting on the socket, then restores the socket's flags.
# Returns 1 on success, 0 if there is no socket or the socket
# isn't connected
# --------------------------------------------------------------
sub flush_socket {

    my ($self) = @_;

    my $sock = $self->{_socket};
    return 0 unless $sock;

    my $junk;
    my $saved_flags = fcntl( $sock, F_GETFL, 0 );

    fcntl( $sock, F_SETFL, $saved_flags | O_NONBLOCK );

    while( my $count = sysread( $sock, $junk, 8192 ) ) {
        $logger->debug("flush_socket dropped $count bytes of data");
        $logger->error("flush_socket dropped data on disconnected socket: $junk")
            unless $sock->connected();
    }

    fcntl( $sock, F_SETFL, $saved_flags );

    return $sock->connected() ? 1 : 0;
}
624
625
626
627 1;