]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/perlmods/OpenSRF/Transport/SlimJabber/Client.pm
706699a9c541e8dd57bd7d6888599d5099ad989d
[OpenSRF.git] / src / perlmods / OpenSRF / Transport / SlimJabber / Client.pm
1 package OpenSRF::Transport::SlimJabber::Client;
2 use strict; use warnings;
3 use OpenSRF::EX;
4 use base qw( OpenSRF );
5 use OpenSRF::Utils::Logger qw(:level);
6 use Time::HiRes qw(ualarm);
7
8 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
9 use IO::Socket::INET;
10
11 =head1 Description
12
13 OpenSRF::Transport::SlimJabber::Client
14
15 Home-brewed slimmed down jabber connection agent. Supports SSL connections
16 with a config file options:
17
18   transport->server->sslport # the ssl port
19   transport->server->ssl  # is this ssl?
20
21 =cut
22
23 my $logger = "OpenSRF::Utils::Logger";
24
25 sub DESTROY{
26         my $self = shift;
27         $self->disconnect;
28 }
29
30 sub disconnect{
31         my $self = shift;
32         my $socket = $self->{_socket};
33         if( $socket and $socket->connected() ) {
34                 print $socket "</stream:stream>";
35                 close( $socket );
36         }
37 }
38
39
40 =head2 new()
41
42 Creates a new Client object.
43
44 debug and log_file are not required if you don't care to log the activity, 
45 however all other parameters are.
46
47 %params:
48
49         username
50         resource        
51         password
52         debug    
53         log_file
54
55 =cut
56
57 sub new {
58
59         my( $class, %params ) = @_;
60
61         $class = ref( $class ) || $class;
62
63         my $port                        = $params{'port'}                       || return undef;
64         my $username    = $params{'username'}   || return undef;
65         my $resource    = $params{'resource'}   || return undef;
66         my $password    = $params{'password'}   || return undef;
67         my $host                        = $params{'host'}                       || return undef;
68
69         my $jid = "$username\@$host\/$resource";
70
71         my $self = bless {} => $class;
72
73         $self->jid( $jid );
74         $self->host( $host );
75         $self->port( $port );
76         $self->username( $username );
77         $self->resource( $resource );
78         $self->password( $password );
79         $self->{temp_buffer} = "";
80
81         $logger->transport( "Creating Client instance: $host:$port, $username, $resource",
82                         $logger->INFO );
83
84         return $self;
85 }
86
87 # clears the tmp buffer as well as the TCP buffer
88 sub buffer_reset { 
89
90         my $self = shift;
91         $self->{temp_buffer} = ""; 
92
93         my $fh = $self->{_socket};
94         set_nonblock( $fh );
95         my $t_buf = "";
96         while( sysread( $fh, $t_buf, 4096 ) ) {} 
97         set_block( $fh );
98 }
99 # -------------------------------------------------
100
101 =head2 gather()
102
103 Gathers all Jabber messages sitting in the collection queue 
104 and hands them each to their respective callbacks.  This call
105 does not block (calls Process(0))
106
107 =cut
108
109 sub gather { my $self = shift; $self->process( 0 ); }
110
111 # -------------------------------------------------
112
113 =head2 listen()
114
115 Blocks and gathers incoming messages as they arrive.  Does not return
116 unless an error occurs.
117
118 Throws an OpenSRF::EX::JabberException if the call to Process ever fails.
119
120 =cut
121 sub listen {
122         my $self = shift;
123
124         my $sock = $self->unix_sock();
125         my $socket = IO::Socket::UNIX->new( Peer => $sock  );
126         $logger->transport( "Unix Socket opened by Listener", INTERNAL );
127         
128         throw OpenSRF::EX::Socket( "Unable to connect to UnixServer: socket-file: $sock \n :=> $! " )
129                 unless ($socket->connected);
130                 
131         while(1) {
132                 my $o = $self->process( -1 );
133                 $logger->transport( "Call to process() in listener returned:\n $o", INTERNAL );
134                 if( ! defined( $o ) ) {
135                         throw OpenSRF::EX::Jabber( "Listen Loop failed at 'process()'" );
136                 }
137                 print $socket $o;
138
139         }
140         throw OpenSRF::EX::Socket( "How did we get here?!?!" );
141 }
142
143 sub set_nonblock {
144         my $fh = shift;
145         my      $flags = fcntl($fh, F_GETFL, 0)
146                 or die "Can't get flags for the socket: $!\n";
147
148         $logger->transport( "Setting NONBLOCK: original flags: $flags", INTERNAL );
149
150         fcntl($fh, F_SETFL, $flags | O_NONBLOCK)
151                 or die "Can't set flags for the socket: $!\n";
152
153         return $flags;
154 }
155
156 sub reset_fl {
157         my $fh = shift;
158         my $flags = shift;
159         $logger->transport( "Restoring BLOCK: to flags $flags", INTERNAL );
160         fcntl($fh, F_SETFL, $flags) if defined $flags;
161 }
162
163 sub set_block {
164         my $fh = shift;
165
166         my      $flags = fcntl($fh, F_GETFL, 0)
167                 or die "Can't get flags for the socket: $!\n";
168
169         $flags &= ~O_NONBLOCK;
170
171         fcntl($fh, F_SETFL, $flags)
172                 or die "Can't set flags for the socket: $!\n";
173 }
174
175
176 sub timed_read {
177         my ($self, $timeout) = @_;
178
179         $logger->transport( "Temp Buffer Contained: \n". $self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};
180         if( $self->can( "app" ) ) {
181                 $logger->transport( "timed_read called for ".$self->app.", I am: ".$self->jid, INTERNAL );
182         }
183
184         # See if there is a complete message in the temp_buffer
185         # that we can return
186         if( $self->{temp_buffer} ) {
187                 my $buffer = $self->{temp_buffer};
188                 my $complete = 0;
189                 $self->{temp_buffer} = '';
190
191                 my ($tag) = ($buffer =~ /<([^\s\?\>]+)/o);
192                 $logger->transport("Using tag: $tag  ", INTERNAL);
193
194                 if ( $buffer =~ /^(.*?<\/$tag>)(.*)/s) {
195                         $buffer = $1;
196                         $self->{temp_buffer} = $2;
197                         $complete++;
198                         $logger->transport( "completed read with $buffer", INTERNAL );
199                 } elsif ( $buffer =~ /^<$tag[^>]*?\/>(.*)/) {
200                         $self->{temp_buffer} = $1;
201                         $complete++;
202                         $logger->transport( "completed read with $buffer", INTERNAL );
203                 } else {
204                         $self->{temp_buffer} = $buffer;
205                 }
206                                 
207                 if( $buffer and $complete ) {
208                         return $buffer;
209                 }
210
211         }
212         ############
213
214         my $fh = $self->{_socket};
215
216         unless( $fh and $fh->connected ) {
217                 throw OpenSRF::EX::Socket ("Attempted read on closed socket", ERROR );
218         }
219
220         $logger->transport( "Temp Buffer After first attempt: \n ".$self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};
221
222         my $flags;
223         if (defined($timeout) && !$timeout) {
224                 $flags = set_nonblock( $fh );
225         }
226
227         $timeout ||= 0;
228         $logger->transport( "Calling timed_read with timetout $timeout", INTERNAL );
229
230
231         my $complete = 0;
232         my $first_read = 1;
233         my $xml = '';
234         eval {
235                 my $tag = '';
236                 eval {
237                         no warnings;
238                         local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
239
240                         # alarm needs a number greater => 1.
241                         my $alarm_timeout = $timeout;
242                         if( $alarm_timeout > 0 and $alarm_timeout < 1 ) {
243                                 $alarm_timeout = 1;
244                         }
245                         alarm $alarm_timeout;
246                         do {    
247
248                                 my $buffer = $self->{temp_buffer};
249                                 $self->{temp_buffer} = '';
250                                 #####
251
252                                 my $ff =  fcntl($fh, F_GETFL, 0);
253                                 if ($ff == ($ff | O_NONBLOCK) and $timeout > 0 ) {
254                                         #throw OpenSRF::EX::ERROR ("File flags are set to NONBLOCK but timeout is $timeout", ERROR );
255                                 }
256
257                                 my $t_buf = "";
258                                 my $read_size = 1024;
259                                 my $f = 0;
260                                 while( my $n = sysread( $fh, $t_buf, $read_size ) ) {
261                                         $buffer .= $t_buf;
262                                         if( $n < $read_size ) {
263                                                 #reset_fl( $fh, $f ) if $f;
264                                                 set_block( $fh );
265                                                 last;
266                                         }
267                                         # see if there is any more data to grab...
268                                         $f = set_nonblock( $fh );
269                                 }
270
271                                 #sysread($fh, $buffer, 2048, length($buffer) );
272                                 #sysread( $fh, $t_buf, 2048 );
273                                 #$buffer .= $t_buf;
274
275                                 #####
276                                 $logger->transport(" Got [$buffer] from the socket", INTERNAL);
277
278                                 if ($first_read) {
279                                         $logger->transport(" First read Buffer\n [$buffer]", INTERNAL);
280                                         ($tag) = ($buffer =~ /<([^\s\?\>]+){1}/o);
281                                         $first_read--;
282                                         $logger->transport("Using tag: $tag  ", INTERNAL);
283                                 }
284
285                                 if (!$first_read && $buffer =~ /^(.*?<\/$tag>){1}(.*)/s) {
286                                         $buffer = $1;
287                                         $self->{temp_buffer} = $2;
288                                         $complete++;
289                                         $logger->transport( "completed read with $buffer", INTERNAL );
290                                 } elsif (!$first_read && $buffer =~ /^<$tag[^>]*?\/>(.*)/) {
291                                         $self->{temp_buffer} = $1;
292                                         $complete++;
293                                         $logger->transport( "completed read with $buffer", INTERNAL );
294                                 }
295                                 
296                                 $xml .= $buffer;
297
298                         } while (!$complete && $xml);
299                         alarm(0);
300                 };
301                 alarm(0);
302         };
303
304         $logger->transport( "XML Read: $xml", INTERNAL );
305         #reset_fl( $fh, $flags) if defined $flags;
306         set_block( $fh ) if defined $flags;
307
308         if ($complete) {
309                 return $xml;
310         }
311         if( $@ ) {
312                 return undef;
313         }
314         return "";
315 }
316
317
318 # -------------------------------------------------
319
320 sub tcp_connected {
321
322         my $self = shift;
323         return 1 if ($self->{_socket} and $self->{_socket}->connected);
324         return 0;
325 }
326
327 sub password {
328         my( $self, $password ) = @_;
329         $self->{'oils:password'} = $password if $password;
330         return $self->{'oils:password'};
331 }
332
333 # -------------------------------------------------
334
335 sub username {
336         my( $self, $username ) = @_;
337         $self->{'oils:username'} = $username if $username;
338         return $self->{'oils:username'};
339 }
340         
341 # -------------------------------------------------
342
343 sub resource {
344         my( $self, $resource ) = @_;
345         $self->{'oils:resource'} = $resource if $resource;
346         return $self->{'oils:resource'};
347 }
348
349 # -------------------------------------------------
350
351 sub jid {
352         my( $self, $jid ) = @_;
353         $self->{'oils:jid'} = $jid if $jid;
354         return $self->{'oils:jid'};
355 }
356
357 sub port {
358         my( $self, $port ) = @_;
359         $self->{'oils:port'} = $port if $port;
360         return $self->{'oils:port'};
361 }
362
363 sub host {
364         my( $self, $host ) = @_;
365         $self->{'oils:host'} = $host if $host;
366         return $self->{'oils:host'};
367 }
368
369 # -------------------------------------------------
370
371 =head2 send()
372
373         Sends a Jabber message.
374         
375         %params:
376                 to                      - The JID of the recipient
377                 thread  - The Jabber thread
378                 body            - The body of the message
379
380 =cut
381
382 sub send {
383         my $self = shift;
384         my %params = @_;
385
386         my $to = $params{'to'} || return undef;
387         my $body = $params{'body'} || return undef;
388         my $thread = $params{'thread'} || "";
389         my $router_command = $params{'router_command'} || "";
390         my $router_class = $params{'router_class'} || "";
391
392         my $msg = OpenSRF::Transport::SlimJabber::MessageWrapper->new;
393
394         $msg->setTo( $to );
395         $msg->setThread( $thread ) if $thread;
396         $msg->setBody( $body );
397         $msg->set_router_command( $router_command );
398         $msg->set_router_class( $router_class );
399
400
401         $logger->transport( 
402                         "JabberClient Sending message to $to with thread $thread and body: \n$body", INTERNAL );
403
404         my $soc = $self->{_socket};
405         unless( $soc and $soc->connected ) {
406                 throw OpenSRF::EX::Jabber ("No longer connected to jabber server");
407         }
408         print $soc $msg->toString;
409
410         $logger->transport( 
411                         "JabberClient Sent message to $to with thread $thread and body: \n$body", INTERNAL );
412 }
413
414
415 =head2 inintialize()
416
417 Connect to the server and log in.  
418
419 Throws an OpenSRF::EX::JabberException if we cannot connect
420 to the server or if the authentication fails.
421
422 =cut
423
424 # --- The logging lines have been commented out until we decide 
425 # on which log files we're using.
426
427 sub initialize {
428
429         my $self = shift;
430
431         my $jid         = $self->jid; 
432         my $host        = $self->host; 
433         my $port        = $self->port; 
434         my $username    = $self->username;
435         my $resource    = $self->resource;
436         my $password    = $self->password;
437
438         my $stream = <<"        XML";
439 <stream:stream to='$host' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>
440         XML
441
442         my $auth = <<"  XML";
443 <iq id='123' type='set'>
444 <query xmlns='jabber:iq:auth'>
445 <username>$username</username>
446 <password>$password</password>
447 <resource>${resource}_$$</resource>
448 </query>
449 </iq>
450         XML
451
452
453         # --- 5 tries to connect to the jabber server
454         my $socket;
455         for(1..5) {
456                 $logger->transport( "$jid: Attempting to connect to server...$host:$port (Try # $_)", WARN );
457                 $socket = IO::Socket::INET->new( PeerHost => $host,
458                                                  PeerPort => $port,
459                                                  Proto    => 'tcp' );
460                 $logger->transport( "$jid: $_ connect attempt to $host:$port", WARN );
461                 last if ( $socket and $socket->connected );
462                 sleep 3;
463         }
464
465         unless ( $socket and $socket->connected ) {
466                 throw OpenSRF::EX::Jabber( " Could not connect to Jabber server: $!" );
467         }
468
469         $logger->transport( "Logging into jabber as $jid " .
470                         "from " . ref( $self ), DEBUG );
471
472         print $socket $stream;
473
474         my $buffer;
475         eval {
476                 eval {
477                         local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
478                         alarm 3;
479                         sysread($socket, $buffer, 4096);
480                         $logger->transport( "Login buffer 1: $buffer", INTERNAL );
481                         alarm(0);
482                 };
483                 alarm(0);
484         };
485
486         print $socket $auth;
487
488         if( $socket and $socket->connected() ) {
489                 $self->{_socket} = $socket;
490         } else {
491                 throw OpenSRF::EX::Jabber( " ** Unable to connect to Jabber server", ERROR );
492         }
493
494
495         $buffer = $self->timed_read(10);
496
497         if( $buffer ) {$logger->transport( "Login buffer 2: $buffer", INTERNAL );}
498
499         if( $buffer and $buffer =~ /type=["\']result["\']/ ) { 
500                 $logger->transport( " * $jid: Jabber authenticated and connected", DEBUG );
501         } else {
502                 if( !$buffer ) { $buffer = " "; }
503                 $socket->close;
504                 throw OpenSRF::EX::Jabber( " * $jid: Unable to authenticate: $buffer", ERROR );
505         }
506
507         return $self;
508 }
509
510 sub construct {
511         my( $class, $app ) = @_;
512         $logger->transport("Constructing new Jabber connection for $app, my class $class", INTERNAL );
513         $class->peer_handle( 
514                         $class->new( $app )->initialize() );
515 }
516
517 sub process {
518
519         my( $self, $timeout ) = @_;
520
521         $timeout ||= 0;
522         undef $timeout if ( $timeout == -1 );
523
524         unless( $self->{_socket}->connected ) {
525                 OpenSRF::EX::JabberDisconnected->throw( 
526                   "This JabberClient instance is no longer connected to the server " . 
527                   $self->username . " : " . $self->resource, ERROR );
528         }
529
530         my $val = $self->timed_read( $timeout );
531
532         $timeout = "FOREVER" unless ( defined $timeout );
533         
534         if ( ! defined( $val ) ) {
535                 OpenSRF::EX::Jabber->throw( 
536                   "Call to Client->timed_read( $timeout ) failed", ERROR );
537         } elsif ( ! $val ) {
538                 $logger->transport( 
539                         "Call to Client->timed_read( $timeout ) returned 0 bytes of data", DEBUG );
540         } elsif ( $val ) {
541                 $logger->transport( 
542                         "Call to Client->timed_read( $timeout ) successfully returned data", INTERNAL );
543         }
544
545         return $val;
546
547 }
548
549
550 1;