chopping up dup messages in Client::process()
[Evergreen.git] / OpenSRF / src / perlmods / OpenSRF / Transport / SlimJabber / Client.pm
1 package OpenSRF::Transport::SlimJabber::Client;
2 use strict; use warnings;
3 use OpenSRF::EX;
4 use base qw( OpenSRF );
5 use OpenSRF::Utils::Logger qw(:level);
6 use OpenSRF::Utils::Config;
7 use Time::HiRes qw(ualarm);
8 use OpenSRF::Utils::Config;
9
10 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
11 use IO::Socket::INET;
12 use IO::Socket::UNIX;
13
=head1 Description

OpenSRF::Transport::SlimJabber::Client

Home-brewed, slimmed-down Jabber connection agent. Supports SSL connections
via the following config file options:

  transport->server->sslport # the ssl port
  transport->server->ssl  # is this ssl?

=cut
25
# All logging in this module goes through class-method calls on this
# package name, e.g. $logger->transport(...) or $logger->debug(...).
my $logger = 'OpenSRF::Utils::Logger';
27
# Destructor: asks the object to disconnect when it is reclaimed.
sub DESTROY {
    my ($self) = @_;
    $self->disconnect;
}
32
# Ends the Jabber session: if a socket is stored on the object and is still
# connected, writes the closing stream tag to it and closes the handle.
# Does nothing when there is no socket or it is already disconnected.
sub disconnect {
    my ($self) = @_;

    my $sock = $self->{_socket};
    if ( $sock && $sock->connected() ) {
        print {$sock} "</stream:stream>";
        close $sock;
    }
}
41
42
=head2 new()

Creates a new Client object.

debug and log_file are not required if you don't care to log the activity;
all other parameters are required, and new() returns undef if any of them
is missing.

%params:

        host
        port
        username
        resource
        password
        debug
        log_file

=cut
59
# Constructor.  Takes a flat list of key/value parameters and returns a new
# blessed object, or undef when any of port, username, resource, password
# or host is missing (or otherwise false).  No connection is made here.
sub new {
    my ( $class, %params ) = @_;

    # Allow new() to be called on an existing instance as well as on a class
    $class = ref($class) || $class;

    # Every connection parameter is mandatory
    for my $required (qw( port username resource password host )) {
        return undef unless $params{$required};
    }
    my ( $port, $username, $resource, $password, $host ) =
        @params{qw( port username resource password host )};

    my $self = bless {}, $class;

    # The full JID has the form user@host/resource
    $self->jid("$username\@$host/$resource");
    $self->host($host);
    $self->port($port);
    $self->username($username);
    $self->resource($resource);
    $self->password($password);

    # Holds data read from the socket beyond the message last returned
    $self->{temp_buffer} = "";

    $logger->transport(
        "Creating Client instance: $host:$port, $username, $resource",
        $logger->INFO );

    return $self;
}
89
# Throws away all pending input: empties the temp buffer and then drains
# whatever is currently readable on the socket.
sub buffer_reset {
    my ($self) = @_;

    $self->{temp_buffer} = "";

    my $sock = $self->{_socket};

    # Switch to non-blocking so the drain loop stops once nothing is left
    set_nonblock($sock);

    my $scratch = "";
    1 while sysread( $sock, $scratch, 4096 );

    set_block($sock);
}
102 # -------------------------------------------------
103
=head2 gather()

Gathers all Jabber messages sitting in the collection queue
and hands them each to their respective callbacks.  This call
does not block (it calls process(0)).

=cut
111
# Non-blocking read: delegates to process() with a timeout of 0 and
# returns whatever process() returns.
sub gather {
    my ($self) = @_;
    return $self->process(0);
}
113
114 # -------------------------------------------------
115
=head2 listen()

Blocks and gathers incoming messages as they arrive.  Does not return
unless an error occurs.

Throws an OpenSRF::EX::Socket if the Unix socket cannot be opened, and an
OpenSRF::EX::Jabber if the call to process() ever fails.

=cut
# Listener loop: connects to the Unix socket named by $self->unix_sock()
# and then forever reads messages with process(-1), writing each one to
# that Unix socket.
#
# NOTE(review): unix_sock() is not defined in this file; presumably a
# subclass provides it -- confirm.
#
# Throws OpenSRF::EX::Socket when the Unix socket cannot be opened and
# OpenSRF::EX::Jabber when process() returns undef.  Does not return
# normally.
sub listen {
    my $self = shift;

    my $sock   = $self->unix_sock();
    my $socket = IO::Socket::UNIX->new( Peer => $sock );

    # new() returns undef on failure, so the handle itself must be tested
    # before a method is called on it; checking before any logging also
    # keeps $! intact for the message.
    OpenSRF::EX::Socket->throw(
        "Unable to connect to UnixServer: socket-file: $sock \n :=> $! " )
        unless ( $socket and $socket->connected );

    $logger->transport( "Unix Socket opened by Listener", INTERNAL );

    while (1) {
        my $o = $self->process(-1);

        # Check for failure first so an undefined value is never interpolated
        if ( !defined($o) ) {
            OpenSRF::EX::Jabber->throw("Listen Loop failed at 'process()'");
        }
        $logger->transport(
            "Call to process() in listener returned:\n $o", INTERNAL );

        print $socket $o;
    }

    # Not reachable unless the loop above is changed
    OpenSRF::EX::Socket->throw("How did we get here?!?!");
}
145
# Turns on O_NONBLOCK for the given filehandle.
# Returns the flags the handle had before the change; dies if the flags
# cannot be read or written.
sub set_nonblock {
    my ($handle) = @_;

    my $original = fcntl( $handle, F_GETFL, 0 )
        or die "Can't get flags for the socket: $!\n";

    $logger->transport( "Setting NONBLOCK: original flags: $original", INTERNAL );

    my $wanted = $original | O_NONBLOCK;
    fcntl( $handle, F_SETFL, $wanted )
        or die "Can't set flags for the socket: $!\n";

    return $original;
}
158
# Writes a previously saved set of flags back onto the filehandle.
# The flags are left untouched when no saved value is supplied.
sub reset_fl {
    my ( $handle, $saved_flags ) = @_;

    $logger->transport( "Restoring BLOCK: to flags $saved_flags", INTERNAL );

    if ( defined $saved_flags ) {
        fcntl( $handle, F_SETFL, $saved_flags );
    }
}
165
# Clears O_NONBLOCK on the given filehandle, leaving its other flags alone.
# Dies if the flags cannot be read or written.
sub set_block {
    my ($handle) = @_;

    my $current = fcntl( $handle, F_GETFL, 0 )
        or die "Can't get flags for the socket: $!\n";

    # Mask out only the non-blocking bit
    my $blocking = $current & ~O_NONBLOCK;

    fcntl( $handle, F_SETFL, $blocking )
        or die "Can't set flags for the socket: $!\n";
}
177
178
# Returns the name of the first element opened in $buffer (for example
# "message" for "<message to=...>"), or undef when there is none.
# "?" and "/" are not allowed in the name, so a leading "<?...?>" or a
# closing tag is skipped over.
sub _first_tag_name {
    my ($buffer) = @_;
    return unless defined $buffer;
    my ($tag) = ( $buffer =~ /<([^\s\?\>\/]+)/ );
    return $tag;
}

# Splits the first complete $tag element off the front of $buffer.
# Returns ( $element, $rest ), or an empty list when $buffer does not yet
# hold a complete element.  $element runs from the start of $buffer through
# the first "</$tag>"; failing that, a self-closing "<$tag .../>" at the
# very start of $buffer is accepted.
sub _extract_element {
    my ( $buffer, $tag ) = @_;
    return unless defined $buffer and defined $tag and length $tag;

    # The tag name is quoted so it is always matched literally
    if ( $buffer =~ /^(.*?<\/\Q$tag\E>)(.*)/s ) {
        return ( $1, $2 );
    }
    if ( $buffer =~ /^(<\Q$tag\E[^>]*?\/>)(.*)/s ) {
        return ( $1, $2 );
    }
    return;
}

# Reads one XML element from the Jabber socket.
#
# $timeout:
#   undef    - the socket flags are left alone and no alarm is armed
#   0        - the socket is switched to non-blocking before reading
#   positive - handed to alarm(); values between 0 and 1 are raised to 1
#
# Data left over from an earlier read is kept in $self->{temp_buffer}; when
# it already holds a complete element, that element is returned without
# touching the socket.  The tag name used for matching is stored in
# $self->{last_tag}.
#
# Returns the text read when an element was completed, "" when it was not
# (no data, or the read was cut short), and undef if $@ is set after the
# read.  Throws OpenSRF::EX::Socket when there is no connected socket.
sub timed_read {
    my ( $self, $timeout ) = @_;

    $logger->transport( "Temp Buffer Contained: \n" . $self->{temp_buffer}, INTERNAL )
        if $self->{temp_buffer};
    if ( $self->can("app") ) {
        $logger->transport(
            "timed_read called for " . $self->app . ", I am: " . $self->jid, INTERNAL );
    }

    # See if there is a complete message in the temp_buffer
    # that we can return
    if ( $self->{temp_buffer} ) {
        my $buffer = $self->{temp_buffer};
        $self->{temp_buffer} = '';

        my $tag = _first_tag_name($buffer);
        $self->{last_tag} = $tag;
        $logger->transport( "Using tag: " . ( defined $tag ? $tag : '' ) . "  ", INTERNAL );

        my ( $element, $rest ) = _extract_element( $buffer, $tag );
        if ( defined $element ) {
            # Only the element itself is returned; everything after it stays
            # buffered, so nothing is handed out twice.
            $self->{temp_buffer} = $rest;
            $logger->transport( "completed read with $element", INTERNAL );
            return $element;
        }

        # Incomplete: put it back and continue with a socket read
        $self->{temp_buffer} = $buffer;
    }

    my $fh = $self->{_socket};

    unless ( $fh and $fh->connected ) {
        OpenSRF::EX::Socket->throw( "Attempted read on closed socket", ERROR );
    }

    $logger->transport( "Temp Buffer After first attempt: \n " . $self->{temp_buffer}, INTERNAL )
        if $self->{temp_buffer};

    # A defined-but-zero timeout means "do not wait for data"
    my $flags;
    if ( defined($timeout) && !$timeout ) {
        $flags = set_nonblock($fh);
    }

    $timeout ||= 0;
    $logger->transport( "Calling timed_read with timetout $timeout", INTERNAL );

    my $complete   = 0;
    my $first_read = 1;
    my $xml        = '';

    # The nested eval keeps a late alarm from escaping: the inner eval traps
    # the "alarm\n" die, the outer one covers the gap before alarm(0).
    eval {
        my $tag;
        eval {
            no warnings;
            local $SIG{ALRM} = sub { die "alarm\n" };    # NB: \n required

            # alarm needs a whole number of seconds, so anything between
            # 0 and 1 is raised to 1.
            my $alarm_timeout = $timeout;
            if ( $alarm_timeout > 0 and $alarm_timeout < 1 ) {
                $alarm_timeout = 1;
            }
            alarm $alarm_timeout;

            do {
                my $buffer = $self->{temp_buffer};
                $self->{temp_buffer} = '';

                my $t_buf          = "";
                my $read_size      = 1024;
                my $is_nonblocking = 0;
                while ( my $n = sysread( $fh, $t_buf, $read_size ) ) {

                    unless ( $fh->connected ) {
                        OpenSRF::EX::JabberDisconnected->throw(
                            "Lost jabber client in timed_read()");
                    }

                    $buffer .= $t_buf;
                    if ( $n < $read_size ) {
                        # Short read: nothing more is waiting
                        set_block($fh);
                        $is_nonblocking = 0;
                        last;
                    }

                    # see if there is any more data to grab...
                    set_nonblock($fh);
                    $is_nonblocking = 1;
                }

                # If the drain ended right after a full-size read, the socket
                # is still non-blocking; restore it so the next pass waits
                # for data instead of spinning.
                set_block($fh) if $is_nonblocking;

                $logger->transport( " Got [$buffer] from the socket", INTERNAL );

                if ($first_read) {
                    $logger->transport( " First read Buffer\n [$buffer]", INTERNAL );
                    $tag = _first_tag_name($buffer);
                    $self->{last_tag} = $tag;
                    $first_read--;
                    $logger->transport( "Using tag: $tag  ", INTERNAL );
                }

                # NOTE(review): completion is tested against the data from
                # this pass only, so a closing tag split across two passes
                # is not recognised.
                my ( $element, $rest ) = _extract_element( $buffer, $tag );
                if ( defined $element ) {
                    $buffer = $element;
                    $self->{temp_buffer} = $rest;
                    $complete++;
                    $logger->transport( "completed read with $buffer", INTERNAL );
                }

                $xml .= $buffer;

            } while ( !$complete && $xml );
            alarm(0);
        };
        alarm(0);
    };

    $logger->transport( "XML Read: $xml", INTERNAL );
    set_block($fh) if defined $flags;

    if ($complete) {
        return $xml;
    }

    # When the inner eval traps an error the outer eval still finishes
    # normally, so a timeout usually reaches the "" return below.
    if ($@) {
        return undef;
    }
    return "";
}
326
327
328 # -------------------------------------------------
329
# Returns 1 when a socket is stored on the object and reports itself as
# connected, 0 otherwise.
sub tcp_connected {
    my ($self) = @_;

    my $sock = $self->{_socket};
    return ( $sock && $sock->connected ) ? 1 : 0;
}
336
# Accessor for the password.  A true argument replaces the stored value;
# the current value is returned either way.
sub password {
    my $self  = shift;
    my $value = shift;

    if ($value) {
        $self->{'oils:password'} = $value;
    }
    return $self->{'oils:password'};
}
342
343 # -------------------------------------------------
344
# Accessor for the username.  A true argument replaces the stored value;
# the current value is returned either way.
sub username {
    my $self  = shift;
    my $value = shift;

    if ($value) {
        $self->{'oils:username'} = $value;
    }
    return $self->{'oils:username'};
}
350         
351 # -------------------------------------------------
352
# Accessor for the resource.  A true argument replaces the stored value;
# the current value is returned either way.
sub resource {
    my $self  = shift;
    my $value = shift;

    if ($value) {
        $self->{'oils:resource'} = $value;
    }
    return $self->{'oils:resource'};
}
358
359 # -------------------------------------------------
360
# Accessor for the full JID.  A true argument replaces the stored value;
# the current value is returned either way.
sub jid {
    my $self  = shift;
    my $value = shift;

    if ($value) {
        $self->{'oils:jid'} = $value;
    }
    return $self->{'oils:jid'};
}
366
# Accessor for the port.  A true argument replaces the stored value;
# the current value is returned either way.
sub port {
    my $self  = shift;
    my $value = shift;

    if ($value) {
        $self->{'oils:port'} = $value;
    }
    return $self->{'oils:port'};
}
372
# Accessor for the host.  A true argument replaces the stored value;
# the current value is returned either way.
sub host {
    my $self  = shift;
    my $value = shift;

    if ($value) {
        $self->{'oils:host'} = $value;
    }
    return $self->{'oils:host'};
}
378
379 # -------------------------------------------------
380
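=head2 send()

Sends a Jabber message.  Returns undef without sending anything when
C<to> or C<body> is missing.

%params:

        to              - The JID of the recipient (required)
        thread          - The Jabber thread
        body            - The body of the message (required)
        router_command  - Passed to the message's set_router_command()
        router_class    - Passed to the message's set_router_class()

=cut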
391
# Builds a MessageWrapper from the given key/value parameters and writes
# its string form to the Jabber socket.
#
# Returns undef straight away when 'to' or 'body' is missing (or false).
# Throws OpenSRF::EX::Jabber when the socket is gone or disconnected.
sub send {
    my ( $self, %params ) = @_;

    my $to   = $params{'to'}   || return undef;
    my $body = $params{'body'} || return undef;

    # The optional fields default to the empty string
    my ( $thread, $router_command, $router_class ) =
        map { $params{$_} || "" } qw( thread router_command router_class );

    my $msg = OpenSRF::Transport::SlimJabber::MessageWrapper->new;

    $msg->setTo($to);
    if ($thread) {
        $msg->setThread($thread);
    }
    $msg->setBody($body);
    $msg->set_router_command($router_command);
    $msg->set_router_class($router_class);

    $logger->transport(
        "JabberClient Sending message to $to with thread $thread and body: \n$body",
        INTERNAL );

    my $sock = $self->{_socket};
    if ( !$sock || !$sock->connected ) {
        OpenSRF::EX::Jabber->throw("No longer connected to jabber server");
    }
    print {$sock} $msg->toString;

    $logger->transport(
        "JabberClient Sent message to $to with thread $thread and body: \n$body",
        INTERNAL );
}
423
424
=head2 initialize()

Connect to the server and log in.

Throws an OpenSRF::EX::Jabber if we cannot connect
to the server or if the authentication fails.

=cut
433
434 # --- The logging lines have been commented out until we decide 
435 # on which log files we're using.
436
# Opens the connection to the Jabber server and logs in.
#
# A port made up only of digits selects a TCP connection (IO::Socket::INET);
# any other value is treated as the path of a UNIX socket.  Up to five
# connection attempts are made, three seconds apart.  Once connected, the
# stream header and a jabber:iq:auth request are sent, and the reply must
# contain type='result'.
#
# Returns $self on success.  Throws OpenSRF::EX::Jabber when no connection
# can be made or the login is not accepted.
sub initialize {

    my $self = shift;

    my $jid      = $self->jid;
    my $host     = $self->host;
    my $port     = $self->port;
    my $username = $self->username;
    my $resource = $self->resource;
    my $password = $self->password;

    # Opening tag of the XML stream
    my $stream = "<stream:stream to='$host' xmlns='jabber:client' "
        . "xmlns:stream='http://etherx.jabber.org/streams'>\n";

    # The process id is appended to the resource, except for the user
    # "router" when the bootstrap config has no router_name.
    my $conf = OpenSRF::Utils::Config->current;
    my $tail = "_$$";
    if ( !$conf->bootstrap->router_name && $username eq "router" ) {
        $tail = "";
    }

    my $auth = join '',
        "<iq id='123' type='set'>\n",
        "<query xmlns='jabber:iq:auth'>\n",
        "<username>$username</username>\n",
        "<password>$password</password>\n",
        "<resource>${resource}$tail</resource>\n",
        "</query>\n",
        "</iq>\n";

    # if port is a string, then we're connecting to a UNIX socket
    my $sock_type =
        ( $port =~ /^\d+$/ ) ? 'IO::Socket::INET' : 'IO::Socket::UNIX';

    # --- 5 tries to connect to the jabber server
    my $max_tries     = 5;
    my $connect_error = '';
    my $socket;
    for my $try ( 1 .. $max_tries ) {
        # PeerHost/PeerPort/Proto are for the INET case, Peer for UNIX
        $socket = $sock_type->new(
            PeerHost => $host,
            PeerPort => $port,
            Peer     => $port,
            Proto    => 'tcp' );

        # Save the system error now; the logging and sleep below may
        # overwrite $! before it is reported.
        $connect_error = "$!";

        $logger->debug("$jid: $try connect attempt to $host:$port");
        last if ( $socket and $socket->connected );
        $logger->warn("$jid: Failed to connect to server...$host:$port (Try # $try)");

        # No point in waiting once the final attempt has failed
        sleep 3 if $try < $max_tries;
    }

    unless ( $socket and $socket->connected ) {
        OpenSRF::EX::Jabber->throw(
            " Could not connect to Jabber server: $connect_error");
    }

    $logger->transport( "Logging into jabber as $jid " . "from " . ref($self), DEBUG );

    print $socket $stream;

    # Read the server's answer to the stream header, waiting 3 seconds at
    # most.  The content is only logged; a timeout here is not an error.
    my $buffer;
    eval {
        eval {
            local $SIG{ALRM} = sub { die "alarm\n" };    # NB: \n required
            alarm 3;
            sysread( $socket, $buffer, 4096 );
            $logger->transport(
                "Login buffer 1: " . ( defined $buffer ? $buffer : '' ), INTERNAL );
            alarm(0);
        };
        alarm(0);
    };

    print $socket $auth;

    # timed_read() below reads from $self->{_socket}
    if ( $socket and $socket->connected() ) {
        $self->{_socket} = $socket;
    }
    else {
        OpenSRF::EX::Jabber->throw( " ** Unable to connect to Jabber server", ERROR );
    }

    $buffer = $self->timed_read(10);

    if ($buffer) { $logger->transport( "Login buffer 2: $buffer", INTERNAL ); }

    if ( $buffer and $buffer =~ /type=["\']result["\']/ ) {
        $logger->transport( " * $jid: Jabber authenticated and connected", DEBUG );
    }
    else {
        if ( !$buffer ) { $buffer = " "; }
        $socket->close;
        OpenSRF::EX::Jabber->throw( " * $jid: Unable to authenticate: $buffer", ERROR );
    }

    return $self;
}
532
# Class method: creates a client for $app, connects and logs it in, and
# hands the result to the class's peer_handle().
# NOTE(review): peer_handle() is not defined in this file; presumably it
# comes from a parent or subclass -- confirm.
sub construct {
    my ( $class, $app ) = @_;

    $logger->transport(
        "Constructing new Jabber connection for $app, my class $class", INTERNAL );

    my $client = $class->new($app);
    $class->peer_handle( $client->initialize() );
}
539
# Reads from the Jabber socket and returns one message.
#
# $timeout is handed to timed_read(): -1 means wait forever (it is turned
# into undef), and a missing or false value becomes 0.
#
# The data returned by timed_read() is cut into elements named after
# $self->{last_tag}.  The first one is returned; any others are put at the
# front of $self->{temp_buffer} for later calls.  Returns undef when the
# data holds no such element.
#
# Throws OpenSRF::EX::JabberDisconnected when there is no connected socket
# and OpenSRF::EX::Jabber when timed_read() returns undef.
sub process {

    my ( $self, $timeout ) = @_;

    $timeout ||= 0;
    undef $timeout if ( $timeout == -1 );

    # Test the handle itself first, so a missing socket is reported as a
    # disconnect rather than as a method call on undef.
    my $socket = $self->{_socket};
    unless ( $socket and $socket->connected ) {
        OpenSRF::EX::JabberDisconnected->throw(
            "This JabberClient instance is no longer connected to the server "
                . $self->username . " : " . $self->resource,
            ERROR );
    }

    my $val = $self->timed_read($timeout);

    # Only used in the log and error messages below
    $timeout = "FOREVER" unless ( defined $timeout );

    if ( !defined($val) ) {
        OpenSRF::EX::Jabber->throw(
            "Call to Client->timed_read( $timeout ) failed", ERROR );
    }
    elsif ( !$val ) {
        $logger->transport(
            "Call to Client->timed_read( $timeout ) returned 0 bytes of data", INTERNAL );
    }
    else {
        $logger->transport(
            "Call to Client->timed_read( $timeout ) successfully returned data", INTERNAL );
    }

    # The tag name is quoted so it is matched literally, and /s lets a
    # message span several lines.
    my $t = $self->{last_tag};
    my @msgs;
    if ( defined $t and length $t ) {
        @msgs = $val =~ /(<\Q$t\E[^>]*>.*?<\/\Q$t\E>)/gs;
    }
    $val = shift(@msgs);

    # Extra messages go in front of whatever is already buffered, which
    # keeps them in arrival order.
    if (@msgs) {
        my $tmp = $self->{temp_buffer};

        $self->{temp_buffer} = join( '', @msgs );
        $self->{temp_buffer} .= $tmp;
    }

    return $val;

}
583
584
# --------------------------------------------------------------
# Sets the socket to O_NONBLOCK, reads all of the data off of
# the socket, then restores the socket's flags.
# Returns 1 on success, 0 if there is no socket or the socket
# isn't connected.
# --------------------------------------------------------------
sub flush_socket {
    my ($self) = @_;

    my $sock = $self->{_socket};
    return 0 unless $sock;

    # Remember the current flags so they can be put back afterwards
    my $saved_flags = fcntl( $sock, F_GETFL, 0 );
    fcntl( $sock, F_SETFL, $saved_flags | O_NONBLOCK );

    # Everything read here is discarded; it is only reported in the log
    my $chunk;
    while ( my $count = sysread( $sock, $chunk, 8192 ) ) {
        $logger->debug("flush_socket dropped $count bytes of data");
        unless ( $sock->connected() ) {
            $logger->error("flush_socket dropped data on disconnected socket: $chunk");
        }
    }

    fcntl( $sock, F_SETFL, $saved_flags );

    return $sock->connected() ? 1 : 0;
}
618
619
620
621 1;