added a comment
[OpenSRF.git] / src / perlmods / OpenSRF / Transport / SlimJabber / Client.pm
1 package OpenSRF::Transport::SlimJabber::Client;
2 use strict; use warnings;
3 use OpenSRF::EX;
4 use base qw( OpenSRF );
5 use OpenSRF::Utils::Logger qw(:level);
6 use OpenSRF::Utils::Config;
7 use Time::HiRes qw(ualarm);
8 use OpenSRF::Utils::Config;
9
10 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
11 use IO::Socket::INET;
12 use IO::Socket::UNIX;
13
14 =head1 Description
15
16 OpenSRF::Transport::SlimJabber::Client
17
18 Home-brewed slimmed down jabber connection agent. Supports SSL connections
19 with a config file options:
20
21   transport->server->sslport # the ssl port
22   transport->server->ssl  # is this ssl?
23
24 =cut
25
26 my $logger = "OpenSRF::Utils::Logger";
27
28 sub DESTROY{
29         my $self = shift;
30         $self->disconnect;
31 }
32
33 sub disconnect{
34         my $self = shift;
35         my $socket = $self->{_socket};
36         if( $socket and $socket->connected() ) {
37                 print $socket "</stream:stream>";
38                 close( $socket );
39         }
40 }
41
42
43 =head2 new()
44
45 Creates a new Client object.
46
47 debug and log_file are not required if you don't care to log the activity, 
48 however all other parameters are.
49
50 %params:
51
52         username
53         resource        
54         password
55         debug    
56         log_file
57
58 =cut
59
60 sub new {
61
62         my( $class, %params ) = @_;
63
64         $class = ref( $class ) || $class;
65
66         my $port                        = $params{'port'}                       || return undef;
67         my $username    = $params{'username'}   || return undef;
68         my $resource    = $params{'resource'}   || return undef;
69         my $password    = $params{'password'}   || return undef;
70         my $host                        = $params{'host'}                       || return undef;
71
72         my $jid = "$username\@$host\/$resource";
73
74         my $self = bless {} => $class;
75
76         $self->jid( $jid );
77         $self->host( $host );
78         $self->port( $port );
79         $self->username( $username );
80         $self->resource( $resource );
81         $self->password( $password );
82         $self->{temp_buffer} = "";
83
84         $logger->transport( "Creating Client instance: $host:$port, $username, $resource",
85                         $logger->INFO );
86
87         return $self;
88 }
89
90 # clears the tmp buffer as well as the TCP buffer
91 sub buffer_reset { 
92
93         my $self = shift;
94         $self->{temp_buffer} = ""; 
95
96         my $fh = $self->{_socket};
97         set_nonblock( $fh );
98         my $t_buf = "";
99         while( sysread( $fh, $t_buf, 4096 ) ) {} 
100         set_block( $fh );
101 }
102 # -------------------------------------------------
103
104 =head2 gather()
105
106 Gathers all Jabber messages sitting in the collection queue 
107 and hands them each to their respective callbacks.  This call
108 does not block (calls Process(0))
109
110 =cut
111
112 sub gather { my $self = shift; $self->process( 0 ); }
113
114 # -------------------------------------------------
115
116 =head2 listen()
117
118 Blocks and gathers incoming messages as they arrive.  Does not return
119 unless an error occurs.
120
121 Throws an OpenSRF::EX::JabberException if the call to Process ever fails.
122
123 =cut
124 sub listen {
125         my $self = shift;
126
127         my $sock = $self->unix_sock();
128         my $socket = IO::Socket::UNIX->new( Peer => $sock  );
129         $logger->transport( "Unix Socket opened by Listener", INTERNAL );
130         
131         throw OpenSRF::EX::Socket( "Unable to connect to UnixServer: socket-file: $sock \n :=> $! " )
132                 unless ($socket->connected);
133                 
134         while(1) {
135                 my $o = $self->process( -1 );
136                 $logger->transport( "Call to process() in listener returned:\n $o", INTERNAL );
137                 if( ! defined( $o ) ) {
138                         throw OpenSRF::EX::Jabber( "Listen Loop failed at 'process()'" );
139                 }
140                 print $socket $o;
141
142         }
143         throw OpenSRF::EX::Socket( "How did we get here?!?!" );
144 }
145
146 sub set_nonblock {
147         my $fh = shift;
148         my      $flags = fcntl($fh, F_GETFL, 0)
149                 or die "Can't get flags for the socket: $!\n";
150
151         $logger->transport( "Setting NONBLOCK: original flags: $flags", INTERNAL );
152
153         fcntl($fh, F_SETFL, $flags | O_NONBLOCK)
154                 or die "Can't set flags for the socket: $!\n";
155
156         return $flags;
157 }
158
159 sub reset_fl {
160         my $fh = shift;
161         my $flags = shift;
162         $logger->transport( "Restoring BLOCK: to flags $flags", INTERNAL );
163         fcntl($fh, F_SETFL, $flags) if defined $flags;
164 }
165
166 sub set_block {
167         my $fh = shift;
168
169         my      $flags = fcntl($fh, F_GETFL, 0)
170                 or die "Can't get flags for the socket: $!\n";
171
172         $flags &= ~O_NONBLOCK;
173
174         fcntl($fh, F_SETFL, $flags)
175                 or die "Can't set flags for the socket: $!\n";
176 }
177
178
179 sub timed_read {
180         my ($self, $timeout) = @_;
181
182         $logger->transport( "Temp Buffer Contained: \n". $self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};
183         if( $self->can( "app" ) ) {
184                 $logger->transport( "timed_read called for ".$self->app.", I am: ".$self->jid, INTERNAL );
185         }
186
187         # See if there is a complete message in the temp_buffer
188         # that we can return
189         if( $self->{temp_buffer} ) {
190                 my $buffer = $self->{temp_buffer};
191                 my $complete = 0;
192                 $self->{temp_buffer} = '';
193
194                 my ($tag) = ($buffer =~ /<([^\s\?\>]+)/o);
195                 $self->{last_tag} = $tag;
196                 $logger->transport("Using tag: $tag  ", INTERNAL);
197
198                 if ( $buffer =~ /^(.*?<\/$tag>){1}(.*)/s) {
199                         $buffer = $1;
200                         $self->{temp_buffer} = $2;
201                         $complete++;
202                         $logger->transport( "completed read with $buffer", INTERNAL );
203                 } elsif ( $buffer =~ /^<$tag[^>]*?\/>(.*)/) {
204                         $self->{temp_buffer} = $1;
205                         $complete++;
206                         $logger->transport( "completed read with $buffer", INTERNAL );
207                 } else {
208                         $self->{temp_buffer} = $buffer;
209                 }
210                                 
211                 if( $buffer and $complete ) {
212                         return $buffer;
213                 }
214
215         }
216         ############
217
218         my $fh = $self->{_socket};
219
220         unless( $fh and $fh->connected ) {
221                 throw OpenSRF::EX::Socket ("Attempted read on closed socket", ERROR );
222         }
223
224         $logger->transport( "Temp Buffer After first attempt: \n ".$self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};
225
226         my $flags;
227         if (defined($timeout) && !$timeout) {
228                 $flags = set_nonblock( $fh );
229         }
230
231         $timeout ||= 0;
232         $logger->transport( "Calling timed_read with timetout $timeout", INTERNAL );
233
234
235         my $complete = 0;
236         my $first_read = 1;
237         my $xml = '';
238         eval {
239                 my $tag = '';
240                 eval {
241                         no warnings;
242                         local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
243
244                         # alarm needs a number greater => 1.
245                         my $alarm_timeout = $timeout;
246                         if( $alarm_timeout > 0 and $alarm_timeout < 1 ) {
247                                 $alarm_timeout = 1;
248                         }
249                         alarm $alarm_timeout;
250                         do {    
251
252                                 my $buffer = $self->{temp_buffer};
253                                 $self->{temp_buffer} = '';
254                                 #####
255
256                                 # This code is no longer in use
257                                 #my $ff =  fcntl($fh, F_GETFL, 0);
258                                 #if ($ff == ($ff | O_NONBLOCK) and $timeout > 0 ) {
259                                         #throw OpenSRF::EX::ERROR ("File flags are set to NONBLOCK but timeout is $timeout", ERROR );
260                                 #}
261
262                                 my $t_buf = "";
263                                 my $read_size = 1024; my $f = 0;
264                                 while( my $n = sysread( $fh, $t_buf, $read_size ) ) {
265
266                                         unless( $fh->connected ) {
267                                                 OpenSRF::EX::JabberDisconnected->throw(
268                                                         "Lost jabber client in timed_read()");
269                                         }
270
271                                         # XXX Change me to debug/internal at some point, this is for testing...
272                                         # XXX Found a race condition where reading >= $read_size bytes of data
273                                         # will fail if the log line below is removed.
274                                         $logger->info("timed_read() read $n bytes of data");
275
276
277                                         $buffer .= $t_buf;
278                                         if( $n < $read_size ) {
279                                                 #reset_fl( $fh, $f ) if $f;
280                                                 set_block( $fh );
281                                                 last;
282                                         }
283                                         # see if there is any more data to grab...
284                                         $f = set_nonblock( $fh );
285                                 }
286
287                                 #sysread($fh, $buffer, 2048, length($buffer) );
288                                 #sysread( $fh, $t_buf, 2048 );
289                                 #$buffer .= $t_buf;
290
291                                 #####
292                                 $logger->transport(" Got [$buffer] from the socket", INTERNAL);
293
294                                 if ($first_read) {
295                                         $logger->transport(" First read Buffer\n [$buffer]", INTERNAL);
296                                         ($tag) = ($buffer =~ /<([^\s\?\>\/]+){1}/o);
297                                         $self->{last_tag} = $tag;
298                                         $first_read--;
299                                         $logger->transport("Using tag: $tag  ", INTERNAL);
300                                 }
301
302                                 if (!$first_read && $buffer =~ /^(.*?<\/$tag>){1}(.*)/s) {
303                                         $buffer = $1;
304                                         $self->{temp_buffer} = $2;
305                                         $complete++;
306                                         $logger->transport( "completed read with $buffer", INTERNAL );
307                                 } elsif (!$first_read && $buffer =~ /^<$tag[^>]*?\/>(.*)/) {
308                                         $self->{temp_buffer} = $1;
309                                         $complete++;
310                                         $logger->transport( "completed read with $buffer", INTERNAL );
311                                 }
312                                 
313                                 $xml .= $buffer;
314
315                         } while (!$complete && $xml);
316                         alarm(0);
317                 };
318                 alarm(0);
319         };
320
321         $logger->transport( "XML Read: $xml", INTERNAL );
322         #reset_fl( $fh, $flags) if defined $flags;
323         set_block( $fh ) if defined $flags;
324
325         if ($complete) {
326                 return $xml;
327         }
328         if( $@ ) {
329                 return undef;
330         }
331         return "";
332 }
333
334
335 # -------------------------------------------------
336
337 sub tcp_connected {
338
339         my $self = shift;
340         return 1 if ($self->{_socket} and $self->{_socket}->connected);
341         return 0;
342 }
343
344 sub password {
345         my( $self, $password ) = @_;
346         $self->{'oils:password'} = $password if $password;
347         return $self->{'oils:password'};
348 }
349
350 # -------------------------------------------------
351
352 sub username {
353         my( $self, $username ) = @_;
354         $self->{'oils:username'} = $username if $username;
355         return $self->{'oils:username'};
356 }
357         
358 # -------------------------------------------------
359
360 sub resource {
361         my( $self, $resource ) = @_;
362         $self->{'oils:resource'} = $resource if $resource;
363         return $self->{'oils:resource'};
364 }
365
366 # -------------------------------------------------
367
368 sub jid {
369         my( $self, $jid ) = @_;
370         $self->{'oils:jid'} = $jid if $jid;
371         return $self->{'oils:jid'};
372 }
373
374 sub port {
375         my( $self, $port ) = @_;
376         $self->{'oils:port'} = $port if $port;
377         return $self->{'oils:port'};
378 }
379
380 sub host {
381         my( $self, $host ) = @_;
382         $self->{'oils:host'} = $host if $host;
383         return $self->{'oils:host'};
384 }
385
386 # -------------------------------------------------
387
388 =head2 send()
389
390         Sends a Jabber message.
391         
392         %params:
393                 to                      - The JID of the recipient
394                 thread  - The Jabber thread
395                 body            - The body of the message
396
397 =cut
398
399 sub send {
400         my $self = shift;
401         my %params = @_;
402
403         my $to = $params{'to'} || return undef;
404         my $body = $params{'body'} || return undef;
405         my $thread = $params{'thread'} || "";
406         my $router_command = $params{'router_command'} || "";
407         my $router_class = $params{'router_class'} || "";
408
409         my $msg = OpenSRF::Transport::SlimJabber::MessageWrapper->new;
410
411         $msg->setTo( $to );
412         $msg->setThread( $thread ) if $thread;
413         $msg->setBody( $body );
414         $msg->set_router_command( $router_command );
415         $msg->set_router_class( $router_class );
416
417
418         $logger->transport( 
419                         "JabberClient Sending message to $to with thread $thread and body: \n$body", INTERNAL );
420
421         my $soc = $self->{_socket};
422         unless( $soc and $soc->connected ) {
423                 throw OpenSRF::EX::Jabber ("No longer connected to jabber server");
424         }
425         print $soc $msg->toString;
426
427         $logger->transport( 
428                         "JabberClient Sent message to $to with thread $thread and body: \n$body", INTERNAL );
429 }
430
431
432 =head2 inintialize()
433
434 Connect to the server and log in.  
435
436 Throws an OpenSRF::EX::JabberException if we cannot connect
437 to the server or if the authentication fails.
438
439 =cut
440
441 # --- The logging lines have been commented out until we decide 
442 # on which log files we're using.
443
444 sub initialize {
445
446         my $self = shift;
447
448         my $jid         = $self->jid; 
449         my $host        = $self->host; 
450         my $port        = $self->port; 
451         my $username    = $self->username;
452         my $resource    = $self->resource;
453         my $password    = $self->password;
454
455         my $stream = <<"        XML";
456 <stream:stream to='$host' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>
457         XML
458
459         my $conf = OpenSRF::Utils::Config->current;
460         my $tail = "_$$";
461         if(!$conf->bootstrap->router_name && $username eq "router") {
462                 $tail = "";
463         }
464
465         my $auth = <<"  XML";
466 <iq id='123' type='set'>
467 <query xmlns='jabber:iq:auth'>
468 <username>$username</username>
469 <password>$password</password>
470 <resource>${resource}$tail</resource>
471 </query>
472 </iq>
473         XML
474
475         my $sock_type = 'IO::Socket::INET';
476         
477         # if port is a string, then we're connecting to a UNIX socket
478         unless( $port =~ /^\d+$/ ) {
479                 $sock_type = 'IO::Socket::UNIX';
480         }
481
482         # --- 5 tries to connect to the jabber server
483         my $socket;
484         for(1..5) {
485                 $socket = $sock_type->new( PeerHost => $host,
486                                            PeerPort => $port,
487                                            Peer => $port,
488                                            Proto    => 'tcp' );
489                 $logger->debug( "$jid: $_ connect attempt to $host:$port");
490                 last if ( $socket and $socket->connected );
491                 $logger->warn( "$jid: Failed to connect to server...$host:$port (Try # $_)");
492                 sleep 3;
493         }
494
495         unless ( $socket and $socket->connected ) {
496                 throw OpenSRF::EX::Jabber( " Could not connect to Jabber server: $!" );
497         }
498
499         $logger->transport( "Logging into jabber as $jid " .
500                         "from " . ref( $self ), DEBUG );
501
502         print $socket $stream;
503
504         my $buffer;
505         eval {
506                 eval {
507                         local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
508                         alarm 3;
509                         sysread($socket, $buffer, 4096);
510                         $logger->transport( "Login buffer 1: $buffer", INTERNAL );
511                         alarm(0);
512                 };
513                 alarm(0);
514         };
515
516         print $socket $auth;
517
518         if( $socket and $socket->connected() ) {
519                 $self->{_socket} = $socket;
520         } else {
521                 throw OpenSRF::EX::Jabber( " ** Unable to connect to Jabber server", ERROR );
522         }
523
524
525         $buffer = $self->timed_read(10);
526
527         if( $buffer ) {$logger->transport( "Login buffer 2: $buffer", INTERNAL );}
528
529         if( $buffer and $buffer =~ /type=["\']result["\']/ ) { 
530                 $logger->transport( " * $jid: Jabber authenticated and connected", DEBUG );
531         } else {
532                 if( !$buffer ) { $buffer = " "; }
533                 $socket->close;
534                 throw OpenSRF::EX::Jabber( " * $jid: Unable to authenticate: $buffer", ERROR );
535         }
536
537         return $self;
538 }
539
540 sub construct {
541         my( $class, $app ) = @_;
542         $logger->transport("Constructing new Jabber connection for $app, my class $class", INTERNAL );
543         $class->peer_handle( 
544                         $class->new( $app )->initialize() );
545 }
546
547 sub process {
548
549         my( $self, $timeout ) = @_;
550
551         $timeout ||= 0;
552         undef $timeout if ( $timeout == -1 );
553
554         unless( $self->{_socket}->connected ) {
555                 OpenSRF::EX::JabberDisconnected->throw( 
556                   "This JabberClient instance is no longer connected to the server " . 
557                   $self->username . " : " . $self->resource, ERROR );
558         }
559
560         my $val = $self->timed_read( $timeout );
561
562         $timeout = "FOREVER" unless ( defined $timeout );
563         
564         if ( ! defined( $val ) ) {
565                 OpenSRF::EX::Jabber->throw( 
566                   "Call to Client->timed_read( $timeout ) failed", ERROR );
567         } elsif ( ! $val ) {
568                 $logger->transport( 
569                         "Call to Client->timed_read( $timeout ) returned 0 bytes of data", INTERNAL );
570         } elsif ( $val ) {
571                 $logger->transport( 
572                         "Call to Client->timed_read( $timeout ) successfully returned data", INTERNAL );
573         }
574
575         my $t = $self->{last_tag};
576
577         if( $t and $val ) {
578                 my @msgs = $val =~ /(<$t[^>]*>.*?<\/$t>)/g;
579                 $val = shift(@msgs);
580         
581                 if (@msgs) {
582                         my $tmp = $self->{temp_buffer};
583         
584                         $self->{temp_buffer} = '';
585                         $self->{temp_buffer} .= $_ for (@msgs);
586                         $self->{temp_buffer} .= $tmp;
587                 }
588         }
589
590         return $val;
591 }
592
593
594 # --------------------------------------------------------------
595 # Sets the socket to O_NONBLOCK, reads all of the data off of
596 # the socket, the restores the sockets flags
597 # Returns 1 on success, 0 if the socket isn't connected
598 # --------------------------------------------------------------
599 sub flush_socket {
600
601         my $self = shift;
602         my $socket = $self->{_socket};
603
604         if( $socket ) {
605
606                 my $buf;
607                 my      $flags = fcntl($socket, F_GETFL, 0);
608
609                 fcntl($socket, F_SETFL, $flags | O_NONBLOCK);
610                 while( my $n = sysread( $socket, $buf, 8192 ) ) {
611                         $logger->debug("flush_socket dropped $n bytes of data");
612                         if(!$socket->connected()) {
613                                 $logger->error("flush_socket dropped data on disconnected socket: $buf");
614                         }
615                 }
616                 fcntl($socket, F_SETFL, $flags);
617
618                 return 0 unless $socket->connected();
619
620                 return 1;
621
622         } else {
623
624                 return 0;
625         }
626 }
627
628
629
630 1;