]> git.evergreen-ils.org Git - Evergreen.git/blob - OpenSRF/src/perlmods/OpenSRF/Transport/SlimJabber/Client.pm
using new config settings
[Evergreen.git] / OpenSRF / src / perlmods / OpenSRF / Transport / SlimJabber / Client.pm
1 package OpenSRF::Transport::SlimJabber::Client;
2 use strict; use warnings;
3 use OpenSRF::EX;
4 use base qw( OpenSRF );
5 use OpenSRF::Utils::Logger qw(:level);
6 use Time::HiRes qw(ualarm);
7
8 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
9 use IO::Socket::INET;
10
11 =head1 Description
12
13 OpenSRF::Transport::SlimJabber::Client
14
15 Home-brewed slimmed down jabber connection agent. Supports SSL connections
16 with a config file options:
17
18   transport->server->sslport # the ssl port
19   transport->server->ssl  # is this ssl?
20
21 =cut
22
23 my $logger = "OpenSRF::Utils::Logger";
24
25 sub DESTROY{
26         my $self = shift;
27         my $socket = $self->{_socket};
28         if( $socket and $socket->connected() ) {
29                 print $socket "</stream:stream>";
30                 close( $socket );
31         }
32 };
33
34
35 =head2 new()
36
37 Creates a new Client object.
38
39 debug and log_file are not required if you don't care to log the activity, 
40 however all other parameters are.
41
42 %params:
43
44         username
45         resource        
46         password
47         debug    
48         log_file
49
50 =cut
51
52 sub new {
53
54         my( $class, %params ) = @_;
55
56         $class = ref( $class ) || $class;
57
58         my $port                        = $params{'port'}                       || return undef;
59         my $username    = $params{'username'}   || return undef;
60         my $resource    = $params{'resource'}   || return undef;
61         my $password    = $params{'password'}   || return undef;
62         my $host                        = $params{'host'}                       || return undef;
63
64         my $jid = "$username\@$host\/$resource";
65
66         my $self = bless {} => $class;
67
68         $self->jid( $jid );
69         $self->host( $host );
70         $self->port( $port );
71         $self->username( $username );
72         $self->resource( $resource );
73         $self->password( $password );
74         $self->{temp_buffer} = "";
75
76         $logger->transport( "Creating Client instance: $host:$port, $username, $resource",
77                         $logger->INFO );
78
79         return $self;
80 }
81
82 # clears the tmp buffer as well as the TCP buffer
83 sub buffer_reset { 
84
85         my $self = shift;
86         $self->{temp_buffer} = ""; 
87
88         my $fh = $self->{_socket};
89         set_nonblock( $fh );
90         my $t_buf = "";
91         while( sysread( $fh, $t_buf, 4096 ) ) {} 
92         set_block( $fh );
93 }
94 # -------------------------------------------------
95
96 =head2 gather()
97
98 Gathers all Jabber messages sitting in the collection queue 
99 and hands them each to their respective callbacks.  This call
100 does not block (calls Process(0))
101
102 =cut
103
104 sub gather { my $self = shift; $self->process( 0 ); }
105
106 # -------------------------------------------------
107
108 =head2 listen()
109
110 Blocks and gathers incoming messages as they arrive.  Does not return
111 unless an error occurs.
112
113 Throws an OpenSRF::EX::JabberException if the call to Process ever fails.
114
115 =cut
116 sub listen {
117         my $self = shift;
118
119         my $sock = $self->unix_sock();
120         my $socket = IO::Socket::UNIX->new( Peer => $sock  );
121         $logger->transport( "Unix Socket opened by Listener", INTERNAL );
122         
123         throw OpenSRF::EX::Socket( "Unable to connect to UnixServer: socket-file: $sock \n :=> $! " )
124                 unless ($socket->connected);
125                 
126         while(1) {
127                 my $o = $self->process( -1 );
128                 $logger->transport( "Call to process() in listener returned:\n $o", INTERNAL );
129                 if( ! defined( $o ) ) {
130                         throw OpenSRF::EX::Jabber( "Listen Loop failed at 'process()'" );
131                 }
132                 print $socket $o;
133
134         }
135         throw OpenSRF::EX::Socket( "How did we get here?!?!" );
136 }
137
138 sub set_nonblock {
139         my $fh = shift;
140         my      $flags = fcntl($fh, F_GETFL, 0)
141                 or die "Can't get flags for the socket: $!\n";
142
143         $logger->transport( "Setting NONBLOCK: original flags: $flags", INTERNAL );
144
145         fcntl($fh, F_SETFL, $flags | O_NONBLOCK)
146                 or die "Can't set flags for the socket: $!\n";
147
148         return $flags;
149 }
150
151 sub reset_fl {
152         my $fh = shift;
153         my $flags = shift;
154         $logger->transport( "Restoring BLOCK: to flags $flags", INTERNAL );
155         fcntl($fh, F_SETFL, $flags) if defined $flags;
156 }
157
158 sub set_block {
159         my $fh = shift;
160
161         my      $flags = fcntl($fh, F_GETFL, 0)
162                 or die "Can't get flags for the socket: $!\n";
163
164         $flags &= ~O_NONBLOCK;
165
166         fcntl($fh, F_SETFL, $flags)
167                 or die "Can't set flags for the socket: $!\n";
168 }
169
170
171 sub timed_read {
172         my ($self, $timeout) = @_;
173
174         $logger->transport( "Temp Buffer Contained: \n". $self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};
175         if( $self->can( "app" ) ) {
176                 $logger->transport( "timed_read called for ".$self->app.", I am: ".$self->jid, INTERNAL );
177         }
178
179         # See if there is a complete message in the temp_buffer
180         # that we can return
181         if( $self->{temp_buffer} ) {
182                 my $buffer = $self->{temp_buffer};
183                 my $complete = 0;
184                 $self->{temp_buffer} = '';
185
186                 my ($tag) = ($buffer =~ /<([^\s\?\>]+)/o);
187                 $logger->transport("Using tag: $tag  ", INTERNAL);
188
189                 if ( $buffer =~ /^(.*?<\/$tag>)(.*)/s) {
190                         $buffer = $1;
191                         $self->{temp_buffer} = $2;
192                         $complete++;
193                         $logger->transport( "completed read with $buffer", INTERNAL );
194                 } elsif ( $buffer =~ /^<$tag[^>]*?\/>(.*)/) {
195                         $self->{temp_buffer} = $1;
196                         $complete++;
197                         $logger->transport( "completed read with $buffer", INTERNAL );
198                 } else {
199                         $self->{temp_buffer} = $buffer;
200                 }
201                                 
202                 if( $buffer and $complete ) {
203                         return $buffer;
204                 }
205
206         }
207         ############
208
209         my $fh = $self->{_socket};
210
211         unless( $fh and $fh->connected ) {
212                 throw OpenSRF::EX::Socket ("Attempted read on closed socket", ERROR );
213         }
214
215         $logger->transport( "Temp Buffer After first attempt: \n ".$self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};
216
217         my $flags;
218         if (defined($timeout) && !$timeout) {
219                 $flags = set_nonblock( $fh );
220         }
221
222         $timeout ||= 0;
223         $logger->transport( "Calling timed_read with timetout $timeout", INTERNAL );
224
225
226         my $complete = 0;
227         my $first_read = 1;
228         my $xml = '';
229         eval {
230                 my $tag = '';
231                 eval {
232                         no warnings;
233                         local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
234
235                         # alarm needs a number greater => 1.
236                         my $alarm_timeout = $timeout;
237                         if( $alarm_timeout > 0 and $alarm_timeout < 1 ) {
238                                 $alarm_timeout = 1;
239                         }
240                         alarm $alarm_timeout;
241                         do {    
242
243                                 my $buffer = $self->{temp_buffer};
244                                 $self->{temp_buffer} = '';
245                                 #####
246
247                                 my $ff =  fcntl($fh, F_GETFL, 0);
248                                 if ($ff == ($ff | O_NONBLOCK) and $timeout > 0 ) {
249                                         #throw OpenSRF::EX::ERROR ("File flags are set to NONBLOCK but timeout is $timeout", ERROR );
250                                 }
251
252                                 my $t_buf = "";
253                                 my $read_size = 1024;
254                                 my $f = 0;
255                                 while( my $n = sysread( $fh, $t_buf, $read_size ) ) {
256                                         $buffer .= $t_buf;
257                                         if( $n < $read_size ) {
258                                                 #reset_fl( $fh, $f ) if $f;
259                                                 set_block( $fh );
260                                                 last;
261                                         }
262                                         # see if there is any more data to grab...
263                                         $f = set_nonblock( $fh );
264                                 }
265
266                                 #sysread($fh, $buffer, 2048, length($buffer) );
267                                 #sysread( $fh, $t_buf, 2048 );
268                                 #$buffer .= $t_buf;
269
270                                 #####
271                                 $logger->transport(" Got [$buffer] from the socket", INTERNAL);
272
273                                 if ($first_read) {
274                                         $logger->transport(" First read Buffer\n [$buffer]", INTERNAL);
275                                         ($tag) = ($buffer =~ /<([^\s\?\>]+){1}/o);
276                                         $first_read--;
277                                         $logger->transport("Using tag: $tag  ", INTERNAL);
278                                 }
279
280                                 if (!$first_read && $buffer =~ /^(.*?<\/$tag>){1}(.*)/s) {
281                                         $buffer = $1;
282                                         $self->{temp_buffer} = $2;
283                                         $complete++;
284                                         $logger->transport( "completed read with $buffer", INTERNAL );
285                                 } elsif (!$first_read && $buffer =~ /^<$tag[^>]*?\/>(.*)/) {
286                                         $self->{temp_buffer} = $1;
287                                         $complete++;
288                                         $logger->transport( "completed read with $buffer", INTERNAL );
289                                 }
290                                 
291                                 $xml .= $buffer;
292
293                         } while (!$complete && $xml);
294                         alarm(0);
295                 };
296                 alarm(0);
297         };
298
299         $logger->transport( "XML Read: $xml", INTERNAL );
300         #reset_fl( $fh, $flags) if defined $flags;
301         set_block( $fh ) if defined $flags;
302
303         if ($complete) {
304                 return $xml;
305         }
306         if( $@ ) {
307                 return undef;
308         }
309         return "";
310 }
311
312
313 # -------------------------------------------------
314
315 sub tcp_connected {
316
317         my $self = shift;
318         return 1 if ($self->{_socket} and $self->{_socket}->connected);
319         return 0;
320 }
321
322 sub password {
323         my( $self, $password ) = @_;
324         $self->{'oils:password'} = $password if $password;
325         return $self->{'oils:password'};
326 }
327
328 # -------------------------------------------------
329
330 sub username {
331         my( $self, $username ) = @_;
332         $self->{'oils:username'} = $username if $username;
333         return $self->{'oils:username'};
334 }
335         
336 # -------------------------------------------------
337
338 sub resource {
339         my( $self, $resource ) = @_;
340         $self->{'oils:resource'} = $resource if $resource;
341         return $self->{'oils:resource'};
342 }
343
344 # -------------------------------------------------
345
346 sub jid {
347         my( $self, $jid ) = @_;
348         $self->{'oils:jid'} = $jid if $jid;
349         return $self->{'oils:jid'};
350 }
351
352 sub port {
353         my( $self, $port ) = @_;
354         $self->{'oils:port'} = $port if $port;
355         return $self->{'oils:port'};
356 }
357
358 sub host {
359         my( $self, $host ) = @_;
360         $self->{'oils:host'} = $host if $host;
361         return $self->{'oils:host'};
362 }
363
364 # -------------------------------------------------
365
366 =head2 send()
367
368         Sends a Jabber message.
369         
370         %params:
371                 to                      - The JID of the recipient
372                 thread  - The Jabber thread
373                 body            - The body of the message
374
375 =cut
376
377 sub send {
378         my $self = shift;
379         my %params = @_;
380
381         my $to = $params{'to'} || return undef;
382         my $body = $params{'body'} || return undef;
383         my $thread = $params{'thread'} || "";
384         my $router_command = $params{'router_command'} || "";
385         my $router_class = $params{'router_class'} || "";
386
387         my $msg = OpenSRF::Transport::SlimJabber::MessageWrapper->new;
388
389         $msg->setTo( $to );
390         $msg->setThread( $thread ) if $thread;
391         $msg->setBody( $body );
392         $msg->set_router_command( $router_command );
393         $msg->set_router_class( $router_class );
394
395
396         $logger->transport( 
397                         "JabberClient Sending message to $to with thread $thread and body: \n$body", INTERNAL );
398
399         my $soc = $self->{_socket};
400         unless( $soc and $soc->connected ) {
401                 throw OpenSRF::EX::Jabber ("No longer connected to jabber server");
402         }
403         print $soc $msg->toString;
404
405         $logger->transport( 
406                         "JabberClient Sent message to $to with thread $thread and body: \n$body", INTERNAL );
407 }
408
409
410 =head2 inintialize()
411
412 Connect to the server and log in.  
413
414 Throws an OpenSRF::EX::JabberException if we cannot connect
415 to the server or if the authentication fails.
416
417 =cut
418
419 # --- The logging lines have been commented out until we decide 
420 # on which log files we're using.
421
422 sub initialize {
423
424         my $self = shift;
425
426         my $jid         = $self->jid; 
427         my $host        = $self->host; 
428         my $port        = $self->port; 
429         my $username    = $self->username;
430         my $resource    = $self->resource;
431         my $password    = $self->password;
432
433         my $stream = <<"        XML";
434 <stream:stream to='$host' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>
435         XML
436
437         my $auth = <<"  XML";
438 <iq id='123' type='set'>
439 <query xmlns='jabber:iq:auth'>
440 <username>$username</username>
441 <password>$password</password>
442 <resource>$resource</resource>
443 </query>
444 </iq>
445         XML
446
447
448         # --- 5 tries to connect to the jabber server
449         my $socket;
450         for(1..5) {
451                 $logger->transport( "$jid: Attempting to connect to server...$host:$port (Try # $_)", WARN );
452                 $socket = IO::Socket::INET->new( PeerHost => $host,
453                                                  PeerPort => $port,
454                                                  Proto    => 'tcp' );
455                 $logger->transport( "$jid: $_ connect attempt to $host:$port", WARN );
456                 last if ( $socket and $socket->connected );
457                 sleep 3;
458         }
459
460         unless ( $socket and $socket->connected ) {
461                 throw OpenSRF::EX::Jabber( " Could not connect to Jabber server: $!" );
462         }
463
464         $logger->transport( "Logging into jabber as $jid " .
465                         "from " . ref( $self ), DEBUG );
466
467         print $socket $stream;
468
469         my $buffer;
470         eval {
471                 eval {
472                         local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
473                         alarm 3;
474                         sysread($socket, $buffer, 4096);
475                         $logger->transport( "Login buffer 1: $buffer", INTERNAL );
476                         alarm(0);
477                 };
478                 alarm(0);
479         };
480
481         print $socket $auth;
482
483         if( $socket and $socket->connected() ) {
484                 $self->{_socket} = $socket;
485         } else {
486                 throw OpenSRF::EX::Jabber( " ** Unable to connect to Jabber server", ERROR );
487         }
488
489
490         $buffer = $self->timed_read(10);
491
492         if( $buffer ) {$logger->transport( "Login buffer 2: $buffer", INTERNAL );}
493
494         if( $buffer and $buffer =~ /type=["\']result["\']/ ) { 
495                 $logger->transport( " * $jid: Jabber authenticated and connected", DEBUG );
496         } else {
497                 if( !$buffer ) { $buffer = " "; }
498                 $socket->close;
499                 throw OpenSRF::EX::Jabber( " * $jid: Unable to authenticate: $buffer", ERROR );
500         }
501
502         return $self;
503 }
504
505 sub construct {
506         my( $class, $app ) = @_;
507         $logger->transport("Constructing new Jabber connection for $app, my class $class", INTERNAL );
508         $class->peer_handle( 
509                         $class->new( $app )->initialize() );
510 }
511
512 sub process {
513
514         my( $self, $timeout ) = @_;
515
516         $timeout ||= 0;
517         undef $timeout if ( $timeout == -1 );
518
519         unless( $self->{_socket}->connected ) {
520                 OpenSRF::EX::JabberDisconnected->throw( 
521                   "This JabberClient instance is no longer connected to the server", ERROR );
522         }
523
524         my $val = $self->timed_read( $timeout );
525
526         $timeout = "FOREVER" unless ( defined $timeout );
527         
528         if ( ! defined( $val ) ) {
529                 OpenSRF::EX::Jabber->throw( 
530                   "Call to Client->timed_read( $timeout ) failed", ERROR );
531         } elsif ( ! $val ) {
532                 $logger->transport( 
533                         "Call to Client->timed_read( $timeout ) returned 0 bytes of data", DEBUG );
534         } elsif ( $val ) {
535                 $logger->transport( 
536                         "Call to Client->timed_read( $timeout ) successfully returned data", INTERNAL );
537         }
538
539         return $val;
540
541 }
542
543
544 1;