]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/perlmods/OpenSRF/Transport/SlimJabber/Client.pm
b61dd318c1ce847752c275080d20a3c7846e349c
[OpenSRF.git] / src / perlmods / OpenSRF / Transport / SlimJabber / Client.pm
1 package OpenSRF::Transport::SlimJabber::Client;
2 use strict; use warnings;
3 use OpenSRF::EX;
4 use base qw( OpenSRF );
5 use OpenSRF::Utils::Logger qw(:level);
6 use OpenSRF::Utils::Config;
7 use Time::HiRes qw(ualarm);
8 use OpenSRF::Utils::Config;
9
10 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
11 use IO::Socket::INET;
12 use IO::Socket::UNIX;
13
14 =head1 Description
15
16 OpenSRF::Transport::SlimJabber::Client
17
18 Home-brewed slimmed down jabber connection agent. Supports SSL connections
19 with a config file options:
20
21   transport->server->sslport # the ssl port
22   transport->server->ssl  # is this ssl?
23
24 =cut
25
26 my $logger = "OpenSRF::Utils::Logger";
27
28 sub DESTROY{
29         my $self = shift;
30         $self->disconnect;
31 }
32
33 sub disconnect{
34         my $self = shift;
35         my $socket = $self->{_socket};
36         if( $socket and $socket->connected() ) {
37                 print $socket "</stream:stream>";
38                 close( $socket );
39         }
40 }
41
42
43 =head2 new()
44
45 Creates a new Client object.
46
47 debug and log_file are not required if you don't care to log the activity, 
48 however all other parameters are.
49
50 %params:
51
52         username
53         resource        
54         password
55         debug    
56         log_file
57
58 =cut
59
60 sub new {
61
62         my( $class, %params ) = @_;
63
64         $class = ref( $class ) || $class;
65
66         my $port                        = $params{'port'}                       || return undef;
67         my $username    = $params{'username'}   || return undef;
68         my $resource    = $params{'resource'}   || return undef;
69         my $password    = $params{'password'}   || return undef;
70         my $host                        = $params{'host'}                       || return undef;
71
72         my $jid = "$username\@$host\/$resource";
73
74         my $self = bless {} => $class;
75
76         $self->jid( $jid );
77         $self->host( $host );
78         $self->port( $port );
79         $self->username( $username );
80         $self->resource( $resource );
81         $self->password( $password );
82         $self->{temp_buffer} = "";
83
84         $logger->transport( "Creating Client instance: $host:$port, $username, $resource",
85                         $logger->INFO );
86
87         return $self;
88 }
89
90 # clears the tmp buffer as well as the TCP buffer
91 sub buffer_reset { 
92
93         my $self = shift;
94         $self->{temp_buffer} = ""; 
95
96         my $fh = $self->{_socket};
97         set_nonblock( $fh );
98         my $t_buf = "";
99         while( sysread( $fh, $t_buf, 4096 ) ) {} 
100         set_block( $fh );
101 }
102 # -------------------------------------------------
103
104 =head2 gather()
105
106 Gathers all Jabber messages sitting in the collection queue 
107 and hands them each to their respective callbacks.  This call
108 does not block (calls Process(0))
109
110 =cut
111
112 sub gather { my $self = shift; $self->process( 0 ); }
113
114 # -------------------------------------------------
115
116 =head2 listen()
117
118 Blocks and gathers incoming messages as they arrive.  Does not return
119 unless an error occurs.
120
121 Throws an OpenSRF::EX::JabberException if the call to Process ever fails.
122
123 =cut
124 sub listen {
125         my $self = shift;
126
127         my $sock = $self->unix_sock();
128         my $socket = IO::Socket::UNIX->new( Peer => $sock  );
129         $logger->transport( "Unix Socket opened by Listener", INTERNAL );
130         
131         throw OpenSRF::EX::Socket( "Unable to connect to UnixServer: socket-file: $sock \n :=> $! " )
132                 unless ($socket->connected);
133                 
134         while(1) {
135                 my $o = $self->process( -1 );
136                 $logger->transport( "Call to process() in listener returned:\n $o", INTERNAL );
137                 if( ! defined( $o ) ) {
138                         throw OpenSRF::EX::Jabber( "Listen Loop failed at 'process()'" );
139                 }
140                 print $socket $o;
141
142         }
143         throw OpenSRF::EX::Socket( "How did we get here?!?!" );
144 }
145
146 sub set_nonblock {
147         my $fh = shift;
148         my      $flags = fcntl($fh, F_GETFL, 0)
149                 or die "Can't get flags for the socket: $!\n";
150
151         $logger->transport( "Setting NONBLOCK: original flags: $flags", INTERNAL );
152
153         fcntl($fh, F_SETFL, $flags | O_NONBLOCK)
154                 or die "Can't set flags for the socket: $!\n";
155
156         return $flags;
157 }
158
159 sub reset_fl {
160         my $fh = shift;
161         my $flags = shift;
162         $logger->transport( "Restoring BLOCK: to flags $flags", INTERNAL );
163         fcntl($fh, F_SETFL, $flags) if defined $flags;
164 }
165
166 sub set_block {
167         my $fh = shift;
168
169         my      $flags = fcntl($fh, F_GETFL, 0)
170                 or die "Can't get flags for the socket: $!\n";
171
172         $flags &= ~O_NONBLOCK;
173
174         fcntl($fh, F_SETFL, $flags)
175                 or die "Can't set flags for the socket: $!\n";
176 }
177
178
179 sub timed_read {
180         my ($self, $timeout) = @_;
181
182         $logger->transport( "Temp Buffer Contained: \n". $self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};
183         if( $self->can( "app" ) ) {
184                 $logger->transport( "timed_read called for ".$self->app.", I am: ".$self->jid, INTERNAL );
185         }
186
187         # See if there is a complete message in the temp_buffer
188         # that we can return
189         if( $self->{temp_buffer} ) {
190                 my $buffer = $self->{temp_buffer};
191                 my $complete = 0;
192                 $self->{temp_buffer} = '';
193
194                 my ($tag) = ($buffer =~ /<([^\s\?\>]+)/o);
195                 $self->{last_tag} = $tag;
196                 $logger->transport("Using tag: $tag  ", INTERNAL);
197
198                 if ( $buffer =~ /^(.*?<\/$tag>){1}(.*)/s) {
199                         $buffer = $1;
200                         $self->{temp_buffer} = $2;
201                         $complete++;
202                         $logger->transport( "completed read with $buffer", INTERNAL );
203                 } elsif ( $buffer =~ /^<$tag[^>]*?\/>(.*)/) {
204                         $self->{temp_buffer} = $1;
205                         $complete++;
206                         $logger->transport( "completed read with $buffer", INTERNAL );
207                 } else {
208                         $self->{temp_buffer} = $buffer;
209                 }
210                                 
211                 if( $buffer and $complete ) {
212                         return $buffer;
213                 }
214
215         }
216         ############
217
218         my $fh = $self->{_socket};
219
220         unless( $fh and $fh->connected ) {
221                 throw OpenSRF::EX::Socket ("Attempted read on closed socket", ERROR );
222         }
223
224         $logger->transport( "Temp Buffer After first attempt: \n ".$self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};
225
226         my $flags;
227         if (defined($timeout) && !$timeout) {
228                 $flags = set_nonblock( $fh );
229         }
230
231         $timeout ||= 0;
232         $logger->transport( "Calling timed_read with timetout $timeout", INTERNAL );
233
234
235         my $complete = 0;
236         my $first_read = 1;
237         my $xml = '';
238         eval {
239                 my $tag = '';
240                 eval {
241                         no warnings;
242                         local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
243
244                         # alarm needs a number greater => 1.
245                         my $alarm_timeout = $timeout;
246                         if( $alarm_timeout > 0 and $alarm_timeout < 1 ) {
247                                 $alarm_timeout = 1;
248                         }
249                         alarm $alarm_timeout;
250                         do {    
251
252                                 my $buffer = $self->{temp_buffer};
253                                 $self->{temp_buffer} = '';
254                                 #####
255
256                                 my $ff =  fcntl($fh, F_GETFL, 0);
257                                 if ($ff == ($ff | O_NONBLOCK) and $timeout > 0 ) {
258                                         #throw OpenSRF::EX::ERROR ("File flags are set to NONBLOCK but timeout is $timeout", ERROR );
259                                 }
260
261                                 my $t_buf = "";
262                                 my $read_size = 1024; my $f = 0;
263                                 while( my $n = sysread( $fh, $t_buf, $read_size ) ) {
264
265                                         unless( $fh->connected ) {
266                                                 OpenSRF::EX::JabberDisconnected->throw(
267                                                         "Lost jabber client in timed_read()");
268                                         }
269
270                                         # XXX Change me to debug/internal at some point, this is for testing...
271                                         $logger->info("timed_read() read $n bytes of data");
272
273                                         $buffer .= $t_buf;
274                                         if( $n < $read_size ) {
275                                                 #reset_fl( $fh, $f ) if $f;
276                                                 set_block( $fh );
277                                                 last;
278                                         }
279                                         # see if there is any more data to grab...
280                                         $f = set_nonblock( $fh );
281                                 }
282
283                                 #sysread($fh, $buffer, 2048, length($buffer) );
284                                 #sysread( $fh, $t_buf, 2048 );
285                                 #$buffer .= $t_buf;
286
287                                 #####
288                                 $logger->transport(" Got [$buffer] from the socket", INTERNAL);
289
290                                 if ($first_read) {
291                                         $logger->transport(" First read Buffer\n [$buffer]", INTERNAL);
292                                         ($tag) = ($buffer =~ /<([^\s\?\>\/]+){1}/o);
293                                         $self->{last_tag} = $tag;
294                                         $first_read--;
295                                         $logger->transport("Using tag: $tag  ", INTERNAL);
296                                 }
297
298                                 if (!$first_read && $buffer =~ /^(.*?<\/$tag>){1}(.*)/s) {
299                                         $buffer = $1;
300                                         $self->{temp_buffer} = $2;
301                                         $complete++;
302                                         $logger->transport( "completed read with $buffer", INTERNAL );
303                                 } elsif (!$first_read && $buffer =~ /^<$tag[^>]*?\/>(.*)/) {
304                                         $self->{temp_buffer} = $1;
305                                         $complete++;
306                                         $logger->transport( "completed read with $buffer", INTERNAL );
307                                 }
308                                 
309                                 $xml .= $buffer;
310
311                         } while (!$complete && $xml);
312                         alarm(0);
313                 };
314                 alarm(0);
315         };
316
317         $logger->transport( "XML Read: $xml", INTERNAL );
318         #reset_fl( $fh, $flags) if defined $flags;
319         set_block( $fh ) if defined $flags;
320
321         if ($complete) {
322                 return $xml;
323         }
324         if( $@ ) {
325                 return undef;
326         }
327         return "";
328 }
329
330
331 # -------------------------------------------------
332
333 sub tcp_connected {
334
335         my $self = shift;
336         return 1 if ($self->{_socket} and $self->{_socket}->connected);
337         return 0;
338 }
339
340 sub password {
341         my( $self, $password ) = @_;
342         $self->{'oils:password'} = $password if $password;
343         return $self->{'oils:password'};
344 }
345
346 # -------------------------------------------------
347
348 sub username {
349         my( $self, $username ) = @_;
350         $self->{'oils:username'} = $username if $username;
351         return $self->{'oils:username'};
352 }
353         
354 # -------------------------------------------------
355
356 sub resource {
357         my( $self, $resource ) = @_;
358         $self->{'oils:resource'} = $resource if $resource;
359         return $self->{'oils:resource'};
360 }
361
362 # -------------------------------------------------
363
364 sub jid {
365         my( $self, $jid ) = @_;
366         $self->{'oils:jid'} = $jid if $jid;
367         return $self->{'oils:jid'};
368 }
369
370 sub port {
371         my( $self, $port ) = @_;
372         $self->{'oils:port'} = $port if $port;
373         return $self->{'oils:port'};
374 }
375
376 sub host {
377         my( $self, $host ) = @_;
378         $self->{'oils:host'} = $host if $host;
379         return $self->{'oils:host'};
380 }
381
382 # -------------------------------------------------
383
384 =head2 send()
385
386         Sends a Jabber message.
387         
388         %params:
389                 to                      - The JID of the recipient
390                 thread  - The Jabber thread
391                 body            - The body of the message
392
393 =cut
394
395 sub send {
396         my $self = shift;
397         my %params = @_;
398
399         my $to = $params{'to'} || return undef;
400         my $body = $params{'body'} || return undef;
401         my $thread = $params{'thread'} || "";
402         my $router_command = $params{'router_command'} || "";
403         my $router_class = $params{'router_class'} || "";
404
405         my $msg = OpenSRF::Transport::SlimJabber::MessageWrapper->new;
406
407         $msg->setTo( $to );
408         $msg->setThread( $thread ) if $thread;
409         $msg->setBody( $body );
410         $msg->set_router_command( $router_command );
411         $msg->set_router_class( $router_class );
412
413
414         $logger->transport( 
415                         "JabberClient Sending message to $to with thread $thread and body: \n$body", INTERNAL );
416
417         my $soc = $self->{_socket};
418         unless( $soc and $soc->connected ) {
419                 throw OpenSRF::EX::Jabber ("No longer connected to jabber server");
420         }
421         print $soc $msg->toString;
422
423         $logger->transport( 
424                         "JabberClient Sent message to $to with thread $thread and body: \n$body", INTERNAL );
425 }
426
427
428 =head2 inintialize()
429
430 Connect to the server and log in.  
431
432 Throws an OpenSRF::EX::JabberException if we cannot connect
433 to the server or if the authentication fails.
434
435 =cut
436
437 # --- The logging lines have been commented out until we decide 
438 # on which log files we're using.
439
440 sub initialize {
441
442         my $self = shift;
443
444         my $jid         = $self->jid; 
445         my $host        = $self->host; 
446         my $port        = $self->port; 
447         my $username    = $self->username;
448         my $resource    = $self->resource;
449         my $password    = $self->password;
450
451         my $stream = <<"        XML";
452 <stream:stream to='$host' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>
453         XML
454
455         my $conf = OpenSRF::Utils::Config->current;
456         my $tail = "_$$";
457         if(!$conf->bootstrap->router_name && $username eq "router") {
458                 $tail = "";
459         }
460
461         my $auth = <<"  XML";
462 <iq id='123' type='set'>
463 <query xmlns='jabber:iq:auth'>
464 <username>$username</username>
465 <password>$password</password>
466 <resource>${resource}$tail</resource>
467 </query>
468 </iq>
469         XML
470
471         my $sock_type = 'IO::Socket::INET';
472         
473         # if port is a string, then we're connecting to a UNIX socket
474         unless( $port =~ /^\d+$/ ) {
475                 $sock_type = 'IO::Socket::UNIX';
476         }
477
478         # --- 5 tries to connect to the jabber server
479         my $socket;
480         for(1..5) {
481                 $socket = $sock_type->new( PeerHost => $host,
482                                            PeerPort => $port,
483                                            Peer => $port,
484                                            Proto    => 'tcp' );
485                 $logger->debug( "$jid: $_ connect attempt to $host:$port");
486                 last if ( $socket and $socket->connected );
487                 $logger->warn( "$jid: Failed to connect to server...$host:$port (Try # $_)");
488                 sleep 3;
489         }
490
491         unless ( $socket and $socket->connected ) {
492                 throw OpenSRF::EX::Jabber( " Could not connect to Jabber server: $!" );
493         }
494
495         $logger->transport( "Logging into jabber as $jid " .
496                         "from " . ref( $self ), DEBUG );
497
498         print $socket $stream;
499
500         my $buffer;
501         eval {
502                 eval {
503                         local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
504                         alarm 3;
505                         sysread($socket, $buffer, 4096);
506                         $logger->transport( "Login buffer 1: $buffer", INTERNAL );
507                         alarm(0);
508                 };
509                 alarm(0);
510         };
511
512         print $socket $auth;
513
514         if( $socket and $socket->connected() ) {
515                 $self->{_socket} = $socket;
516         } else {
517                 throw OpenSRF::EX::Jabber( " ** Unable to connect to Jabber server", ERROR );
518         }
519
520
521         $buffer = $self->timed_read(10);
522
523         if( $buffer ) {$logger->transport( "Login buffer 2: $buffer", INTERNAL );}
524
525         if( $buffer and $buffer =~ /type=["\']result["\']/ ) { 
526                 $logger->transport( " * $jid: Jabber authenticated and connected", DEBUG );
527         } else {
528                 if( !$buffer ) { $buffer = " "; }
529                 $socket->close;
530                 throw OpenSRF::EX::Jabber( " * $jid: Unable to authenticate: $buffer", ERROR );
531         }
532
533         return $self;
534 }
535
536 sub construct {
537         my( $class, $app ) = @_;
538         $logger->transport("Constructing new Jabber connection for $app, my class $class", INTERNAL );
539         $class->peer_handle( 
540                         $class->new( $app )->initialize() );
541 }
542
543 sub process {
544
545         my( $self, $timeout ) = @_;
546
547         $timeout ||= 0;
548         undef $timeout if ( $timeout == -1 );
549
550         unless( $self->{_socket}->connected ) {
551                 OpenSRF::EX::JabberDisconnected->throw( 
552                   "This JabberClient instance is no longer connected to the server " . 
553                   $self->username . " : " . $self->resource, ERROR );
554         }
555
556         my $val = $self->timed_read( $timeout );
557
558         $timeout = "FOREVER" unless ( defined $timeout );
559         
560         if ( ! defined( $val ) ) {
561                 OpenSRF::EX::Jabber->throw( 
562                   "Call to Client->timed_read( $timeout ) failed", ERROR );
563         } elsif ( ! $val ) {
564                 $logger->transport( 
565                         "Call to Client->timed_read( $timeout ) returned 0 bytes of data", INTERNAL );
566         } elsif ( $val ) {
567                 $logger->transport( 
568                         "Call to Client->timed_read( $timeout ) successfully returned data", INTERNAL );
569         }
570
571         my $t = $self->{last_tag};
572
573         if( $t and $val ) {
574                 my @msgs = $val =~ /(<$t[^>]*>.*?<\/$t>)/g;
575                 $val = shift(@msgs);
576         
577                 if (@msgs) {
578                         my $tmp = $self->{temp_buffer};
579         
580                         $self->{temp_buffer} = '';
581                         $self->{temp_buffer} .= $_ for (@msgs);
582                         $self->{temp_buffer} .= $tmp;
583                 }
584         }
585
586         return $val;
587 }
588
589
590 # --------------------------------------------------------------
591 # Sets the socket to O_NONBLOCK, reads all of the data off of
592 # the socket, the restores the sockets flags
593 # Returns 1 on success, 0 if the socket isn't connected
594 # --------------------------------------------------------------
595 sub flush_socket {
596
597         my $self = shift;
598         my $socket = $self->{_socket};
599
600         if( $socket ) {
601
602                 my $buf;
603                 my      $flags = fcntl($socket, F_GETFL, 0);
604
605                 fcntl($socket, F_SETFL, $flags | O_NONBLOCK);
606                 while( my $n = sysread( $socket, $buf, 8192 ) ) {
607                         $logger->debug("flush_socket dropped $n bytes of data");
608                         if(!$socket->connected()) {
609                                 $logger->error("flush_socket dropped data on disconnected socket: $buf");
610                         }
611                 }
612                 fcntl($socket, F_SETFL, $flags);
613
614                 return 0 unless $socket->connected();
615
616                 return 1;
617
618         } else {
619
620                 return 0;
621         }
622 }
623
624
625
626 1;