]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/perlmods/OpenSRF/Transport/SlimJabber/Client.pm
adding routerless config support; adding local UNIX domain socket support
[OpenSRF.git] / src / perlmods / OpenSRF / Transport / SlimJabber / Client.pm
1 package OpenSRF::Transport::SlimJabber::Client;
2 use strict; use warnings;
3 use OpenSRF::EX;
4 use base qw( OpenSRF );
5 use OpenSRF::Utils::Logger qw(:level);
6 use OpenSRF::Utils::Config;
7 use Time::HiRes qw(ualarm);
8
9 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
10 use IO::Socket::INET;
11 use IO::Socket::UNIX;
12
13 =head1 Description
14
15 OpenSRF::Transport::SlimJabber::Client
16
17 Home-brewed, slimmed-down Jabber connection agent. Supports SSL connections
18 via the following config file options:
19
20   transport->server->sslport # the ssl port
21   transport->server->ssl  # is this ssl?
22
23 =cut
24
25 my $logger = "OpenSRF::Utils::Logger";
26
# Destructor: make sure the Jabber stream is shut down when the object
# goes away.
sub DESTROY {
    my ($self) = @_;
    $self->disconnect;
}
31
# Politely ends the Jabber session: if a socket exists and is still
# connected, send the stream close tag and close the socket.  Does nothing
# when there is no live socket.
sub disconnect {
    my ($self) = @_;

    my $sock = $self->{_socket};
    if ( $sock && $sock->connected ) {
        print {$sock} "</stream:stream>";
        close $sock;
    }
}
40
41
42 =head2 new()
43
44 Creates a new Client object.
45
46 debug and log_file are not required if you don't care to log the activity;
47 however, all other parameters are.
48
49 %params:
50
51         username
52         resource        
53         password
54         debug    
55         log_file
56
57 =cut
58
sub new {
    my ( $class, %params ) = @_;

    # Works both as Class->new(...) and as $object->new(...).
    $class = ref($class) || $class;

    # port, username, resource, password and host are all mandatory; a
    # missing (or false) value aborts construction by returning undef.
    my %required;
    for my $name (qw(port username resource password host)) {
        $required{$name} = $params{$name} || return undef;
    }
    my ( $port, $username, $resource, $password, $host ) =
        @required{qw(port username resource password host)};

    my $self = bless {}, $class;

    # The full JID has the form user@host/resource.
    $self->jid( $username . '@' . $host . '/' . $resource );
    $self->host($host);
    $self->port($port);
    $self->username($username);
    $self->resource($resource);
    $self->password($password);

    # Holds data read from the socket that belongs to the next message.
    $self->{temp_buffer} = "";

    $logger->transport(
        "Creating Client instance: $host:$port, $username, $resource",
        $logger->INFO );

    return $self;
}
88
# Discards any buffered input: empties the object's temp buffer and then
# drains whatever is currently readable on the socket.  The socket is put
# into non-blocking mode for the drain and back into blocking mode after.
sub buffer_reset {
    my ($self) = @_;

    $self->{temp_buffer} = "";

    my $sock = $self->{_socket};
    set_nonblock($sock);

    # Read and throw away data until sysread reports nothing more.
    my $scratch = "";
    1 while sysread( $sock, $scratch, 4096 );

    set_block($sock);
}
101 # -------------------------------------------------
102
103 =head2 gather()
104
105 Gathers all Jabber messages sitting in the collection queue 
106 and hands them each to their respective callbacks.  This call
107 does not block (calls process(0)).
108
109 =cut
110
# Non-blocking poll: delegates to process() with a timeout of 0.
sub gather {
    my ($self) = @_;
    return $self->process(0);
}
112
113 # -------------------------------------------------
114
115 =head2 listen()
116
117 Blocks and gathers incoming messages as they arrive.  Does not return
118 unless an error occurs.
119
120 Throws an OpenSRF::EX::Jabber exception if the call to process() ever fails.
121
122 =cut
# Connects to the local UNIX-domain socket named by $self->unix_sock and
# then loops forever, forwarding every message returned by process(-1)
# (a blocking read) to that socket.
#
# Throws OpenSRF::EX::Socket if the UNIX socket cannot be opened, and
# OpenSRF::EX::Jabber if process() returns undef.  The loop has no normal
# exit; the sub only leaves via an exception.
sub listen {
    my $self = shift;

    my $sock   = $self->unix_sock();
    my $socket = IO::Socket::UNIX->new( Peer => $sock );

    # IO::Socket::UNIX->new returns undef on failure, so test the handle
    # itself before calling a method on it.  The check comes before any
    # logging so that $! still describes the connect failure.
    unless ( $socket and $socket->connected ) {
        OpenSRF::EX::Socket->throw(
            "Unable to connect to UnixServer: socket-file: $sock \n :=> $! " );
    }

    $logger->transport( "Unix Socket opened by Listener", INTERNAL );

    while (1) {
        my $o = $self->process(-1);

        # Check for failure before interpolating $o into the log message.
        if ( !defined($o) ) {
            OpenSRF::EX::Jabber->throw("Listen Loop failed at 'process()'");
        }

        $logger->transport(
            "Call to process() in listener returned:\n $o", INTERNAL );
        print {$socket} $o;
    }
}
144
# Switches a filehandle to non-blocking mode.
#
# Returns the flags the handle had before the change.  Dies if the flags
# cannot be read or written.
sub set_nonblock {
    my ($handle) = @_;

    my $orig_flags = fcntl( $handle, F_GETFL, 0 );
    die "Can't get flags for the socket: $!\n" unless $orig_flags;

    $logger->transport( "Setting NONBLOCK: original flags: $orig_flags", INTERNAL );

    my $ok = fcntl( $handle, F_SETFL, $orig_flags | O_NONBLOCK );
    die "Can't set flags for the socket: $!\n" unless $ok;

    return $orig_flags;
}
157
# Restores a previously saved set of file status flags on a filehandle.
# The fcntl call is skipped when no flags were supplied.
sub reset_fl {
    my ( $handle, $saved_flags ) = @_;

    $logger->transport( "Restoring BLOCK: to flags $saved_flags", INTERNAL );

    if ( defined $saved_flags ) {
        fcntl( $handle, F_SETFL, $saved_flags );
    }
}
164
# Switches a filehandle to blocking mode by clearing O_NONBLOCK from its
# file status flags.  Dies if the flags cannot be read or written;
# otherwise returns the (true) result of the final fcntl call.
sub set_block {
    my ($handle) = @_;

    my $current = fcntl( $handle, F_GETFL, 0 );
    die "Can't get flags for the socket: $!\n" unless $current;

    my $blocking = $current & ~O_NONBLOCK;

    my $ok = fcntl( $handle, F_SETFL, $blocking );
    die "Can't set flags for the socket: $!\n" unless $ok;

    return $ok;
}
176
177
# Private helper: tries to split the first complete XML element named $tag
# off the front of $buffer.
#
# Two shapes are recognised, tried in this order:
#   1. everything up to and including the first </tag>
#   2. a self-closing <tag ... /> at the very start of the buffer
#
# Returns ($element, $remainder) on success and an empty list when the
# buffer does not (yet) hold a complete element.  The tag name is quoted
# with \Q..\E so regex metacharacters in it are matched literally.  A
# remainder consisting only of whitespace is reported as '' so that a later
# non-blocking read does not keep looping on a buffer that can never
# complete.
sub _split_first_element {
    my ( $buffer, $tag ) = @_;

    $tag = '' unless defined $tag;

    my ( $element, $rest );
    if ( $buffer =~ /^(.*?<\/\Q$tag\E>)(.*)/s ) {
        ( $element, $rest ) = ( $1, $2 );
    }
    elsif ( $buffer =~ /^(<\Q$tag\E[^>]*?\/>)(.*)/s ) {
        ( $element, $rest ) = ( $1, $2 );
    }
    else {
        return;
    }

    $rest = '' if $rest !~ /\S/;
    return ( $element, $rest );
}

# Reads one complete XML element from the Jabber socket.
#
# $timeout semantics:
#   undef   - block until a complete element has been read
#   0       - poll: the socket is switched to non-blocking mode
#   n > 0   - give up after n seconds (fractions below 1 are rounded up to
#             1 because alarm() takes whole seconds)
#
# Data left over from a previous read ($self->{temp_buffer}) is consulted
# first; anything read past the end of the returned element is stored back
# there for the next call.
#
# Returns the element text on success, "" when no complete element was
# available, and undef when the read was aborted by an exception that
# escaped the inner eval.  Throws OpenSRF::EX::Socket when the socket is
# missing or no longer connected.  The socket is always returned to
# blocking mode before this sub returns.
#
# NOTE(review): two quirks of the read loop are kept as they were and look
# worth a follow-up: a short read switches the socket back to blocking mode
# even while polling, and a partially received element is discarded when
# the timeout fires.
sub timed_read {
    my ( $self, $timeout ) = @_;

    $logger->transport( "Temp Buffer Contained: \n". $self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};
    if ( $self->can("app") ) {
        $logger->transport( "timed_read called for ".$self->app.", I am: ".$self->jid, INTERNAL );
    }

    # See if there is already a complete message in the temp_buffer
    # that we can return without touching the socket.
    if ( $self->{temp_buffer} ) {
        my $pending = $self->{temp_buffer};

        my ($tag) = ( $pending =~ /<([^\s\?\>]+)/ );
        $tag = '' unless defined $tag;
        $logger->transport( "Using tag: $tag  ", INTERNAL );

        if ( my ( $element, $rest ) = _split_first_element( $pending, $tag ) ) {
            $self->{temp_buffer} = $rest;
            $logger->transport( "completed read with $element", INTERNAL );
            return $element;
        }

        # Incomplete: temp_buffer is left untouched and picked up by the
        # read loop below.
    }

    my $fh = $self->{_socket};

    unless ( $fh and $fh->connected ) {
        OpenSRF::EX::Socket->throw( "Attempted read on closed socket", ERROR );
    }

    $logger->transport( "Temp Buffer After first attempt: \n ".$self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};

    # A defined-but-zero timeout means "poll, don't wait".
    if ( defined($timeout) && !$timeout ) {
        set_nonblock($fh);
    }

    $timeout ||= 0;
    $logger->transport( "Calling timed_read with timetout $timeout", INTERNAL );

    my $complete   = 0;
    my $first_read = 1;
    my $xml        = '';

    # The nested evals guard against the alarm firing between the end of
    # the inner eval and the alarm(0) that cancels it.
    eval {
        my $tag = '';
        eval {
            no warnings;
            local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required

            # alarm() needs a whole number of seconds: round (0,1) up to 1
            # so that a small timeout does not turn into "no timeout".
            my $alarm_timeout = $timeout;
            if ( $alarm_timeout > 0 and $alarm_timeout < 1 ) {
                $alarm_timeout = 1;
            }
            alarm $alarm_timeout;

            do {
                my $buffer = $self->{temp_buffer};
                $self->{temp_buffer} = '';

                my $t_buf     = "";
                my $read_size = 1024;
                while ( my $n = sysread( $fh, $t_buf, $read_size ) ) {
                    $buffer .= $t_buf;
                    if ( $n < $read_size ) {
                        # short read: nothing more is waiting right now
                        set_block($fh);
                        last;
                    }
                    # full read: see if there is any more data to grab,
                    # without blocking if there is not
                    set_nonblock($fh);
                }

                $logger->transport(" Got [$buffer] from the socket", INTERNAL);

                # The element name is taken from the first chunk only.
                if ($first_read) {
                    $logger->transport(" First read Buffer\n [$buffer]", INTERNAL);
                    ($tag) = ( $buffer =~ /<([^\s\?\>]+)/ );
                    $first_read--;
                    $logger->transport("Using tag: $tag  ", INTERNAL);
                }

                if ( my ( $element, $rest ) = _split_first_element( $buffer, $tag ) ) {
                    $buffer = $element;
                    $self->{temp_buffer} = $rest;
                    $complete++;
                    $logger->transport( "completed read with $buffer", INTERNAL );
                }

                $xml .= $buffer;

            } while ( !$complete && $xml );
            alarm(0);
        };
        alarm(0);
    };

    # Capture the error right away: the calls below may run evals of their
    # own and reset $@.
    my $read_error = $@;

    $logger->transport( "XML Read: $xml", INTERNAL );

    # Whatever mode the read loop ended in, hand the socket back blocking.
    set_block($fh);

    return $xml  if $complete;
    return undef if $read_error;
    return "";
}
318
319
320 # -------------------------------------------------
321
# Returns 1 when the object holds a socket that reports itself connected,
# 0 otherwise.
sub tcp_connected {
    my ($self) = @_;

    my $sock = $self->{_socket};
    return ( $sock && $sock->connected ) ? 1 : 0;
}
328
# Combined getter/setter for the login password.  A true argument replaces
# the stored value; the (possibly updated) value is returned.
sub password {
    my $self = shift;

    if ( my $new_value = shift ) {
        $self->{'oils:password'} = $new_value;
    }
    return $self->{'oils:password'};
}
334
335 # -------------------------------------------------
336
# Combined getter/setter for the login username.  A true argument replaces
# the stored value; the (possibly updated) value is returned.
sub username {
    my $self = shift;

    if ( my $new_value = shift ) {
        $self->{'oils:username'} = $new_value;
    }
    return $self->{'oils:username'};
}
342         
343 # -------------------------------------------------
344
# Combined getter/setter for the Jabber resource name.  A true argument
# replaces the stored value; the (possibly updated) value is returned.
sub resource {
    my $self = shift;

    if ( my $new_value = shift ) {
        $self->{'oils:resource'} = $new_value;
    }
    return $self->{'oils:resource'};
}
350
351 # -------------------------------------------------
352
# Combined getter/setter for the full Jabber ID.  A true argument replaces
# the stored value; the (possibly updated) value is returned.
sub jid {
    my $self = shift;

    if ( my $new_value = shift ) {
        $self->{'oils:jid'} = $new_value;
    }
    return $self->{'oils:jid'};
}
358
# Combined getter/setter for the server port.  A true argument replaces
# the stored value; the (possibly updated) value is returned.
sub port {
    my $self = shift;

    if ( my $new_value = shift ) {
        $self->{'oils:port'} = $new_value;
    }
    return $self->{'oils:port'};
}
364
# Combined getter/setter for the server host.  A true argument replaces
# the stored value; the (possibly updated) value is returned.
sub host {
    my $self = shift;

    if ( my $new_value = shift ) {
        $self->{'oils:host'} = $new_value;
    }
    return $self->{'oils:host'};
}
370
371 # -------------------------------------------------
372
373 =head2 send()
374
375         Sends a Jabber message.
376         
377         %params:
378                 to                      - The JID of the recipient
379                 thread  - The Jabber thread
380                 body            - The body of the message
381
382 =cut
383
sub send {
    my ( $self, %params ) = @_;

    # Recipient and body are mandatory; a missing one makes the call a
    # no-op that returns undef.  The remaining fields default to "".
    my $to   = $params{'to'}   || return undef;
    my $body = $params{'body'} || return undef;
    my ( $thread, $router_command, $router_class ) =
        map { $params{$_} || "" } qw(thread router_command router_class);

    # Wrap the pieces in a Jabber message; the thread is only set when
    # one was supplied.
    my $msg = OpenSRF::Transport::SlimJabber::MessageWrapper->new;
    $msg->setTo($to);
    if ($thread) {
        $msg->setThread($thread);
    }
    $msg->setBody($body);
    $msg->set_router_command($router_command);
    $msg->set_router_class($router_class);

    $logger->transport(
        "JabberClient Sending message to $to with thread $thread and body: \n$body", INTERNAL );

    # Refuse to write to a missing or disconnected socket.
    my $sock = $self->{_socket};
    if ( !( $sock && $sock->connected ) ) {
        OpenSRF::EX::Jabber->throw("No longer connected to jabber server");
    }
    print {$sock} $msg->toString;

    $logger->transport(
        "JabberClient Sent message to $to with thread $thread and body: \n$body", INTERNAL );
}
415
416
417 =head2 initialize()
418
419 Connect to the server and log in.  
420
421 Throws an OpenSRF::EX::Jabber exception if we cannot connect
422 to the server or if the authentication fails.
423
424 =cut
425
426 # --- The logging lines have been commented out until we decide 
427 # on which log files we're using.
428
# Opens the connection to the Jabber server and logs in.
#
# The socket type is chosen from the configured port: a value greater than
# zero means a TCP connection to host:port; anything else is treated as the
# path of a UNIX-domain socket.  Up to five connection attempts are made,
# three seconds apart.  Once connected, the stream header is sent, the
# server's greeting is read (waiting at most 3 seconds), the auth request
# is sent, and the reply is read with a 10 second timeout.
#
# Returns $self on success.  Throws OpenSRF::EX::Jabber when the connection
# cannot be established or the server does not answer the auth request
# with type='result'.
sub initialize {

    my $self = shift;

    my $jid      = $self->jid;
    my $host     = $self->host;
    my $port     = $self->port;
    my $username = $self->username;
    my $resource = $self->resource;
    my $password = $self->password;

    my $stream = <<"XML";
<stream:stream to='$host' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>
XML

    # The process id is appended to the resource name.
    my $auth = <<"XML";
<iq id='123' type='set'>
<query xmlns='jabber:iq:auth'>
<username>$username</username>
<password>$password</password>
<resource>${resource}_$$</resource>
</query>
</iq>
XML

    # A socket path is not numeric, so silence the "isn't numeric" warning
    # for this one comparison.
    my $is_tcp_port = do { no warnings 'numeric'; $port > 0 };
    my $sock_type = $is_tcp_port ? 'IO::Socket::INET' : 'IO::Socket::UNIX';

    # --- 5 tries to connect to the jabber server
    my $max_tries = 5;
    my $socket;
    for my $try ( 1 .. $max_tries ) {
        $logger->transport( "$jid: Attempting to connect to server...$host:$port (Try # $try)", WARN );

        # PeerHost/PeerPort/Proto serve the INET case, Peer the UNIX case.
        $socket = $sock_type->new( PeerHost => $host,
                                   PeerPort => $port,
                                   Peer     => $port,
                                   Proto    => 'tcp' );
        $logger->transport( "$jid: $try connect attempt to $host:$port", WARN );
        last if ( $socket and $socket->connected );

        # No point in waiting once the last attempt has failed.
        sleep 3 if $try < $max_tries;
    }

    unless ( $socket and $socket->connected ) {
        OpenSRF::EX::Jabber->throw( " Could not connect to Jabber server: $!" );
    }

    $logger->transport( "Logging into jabber as $jid " .
            "from " . ref( $self ), DEBUG );

    print {$socket} $stream;

    # Read the server's stream greeting, waiting no more than 3 seconds.
    # The outer eval catches an alarm that fires after the inner eval has
    # finished but before the alarm is cancelled.
    my $buffer;
    eval {
        eval {
            local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
            alarm 3;
            sysread( $socket, $buffer, 4096 );
            $logger->transport( "Login buffer 1: $buffer", INTERNAL );
            alarm(0);
        };
        alarm(0);
    };

    print {$socket} $auth;

    if ( $socket and $socket->connected() ) {
        $self->{_socket} = $socket;
    }
    else {
        OpenSRF::EX::Jabber->throw( " ** Unable to connect to Jabber server", ERROR );
    }

    $buffer = $self->timed_read(10);

    if ($buffer) { $logger->transport( "Login buffer 2: $buffer", INTERNAL ); }

    if ( $buffer and $buffer =~ /type=["\']result["\']/ ) {
        $logger->transport( " * $jid: Jabber authenticated and connected", DEBUG );
    }
    else {
        if ( !$buffer ) { $buffer = " "; }
        $socket->close;
        OpenSRF::EX::Jabber->throw( " * $jid: Unable to authenticate: $buffer", ERROR );
    }

    return $self;
}
516
# Class method: builds a new client for $app, logs it in via initialize(),
# and hands the connected client to the class's peer_handle().
sub construct {
    my ( $class, $app ) = @_;

    $logger->transport("Constructing new Jabber connection for $app, my class $class", INTERNAL );

    my $client = $class->new($app)->initialize();
    $class->peer_handle($client);
}
523
# Reads one message from the server via timed_read() and returns it.
#
# $timeout is passed on to timed_read(): a false or missing value becomes
# 0 (poll), and -1 becomes undef, which timed_read() treats as "wait
# forever".
#
# Returns whatever timed_read() produced (the message text, or a false
# value when nothing was read).  Throws OpenSRF::EX::JabberDisconnected
# when the object has no connected socket, and OpenSRF::EX::Jabber when
# timed_read() returns undef.
sub process {

    my ( $self, $timeout ) = @_;

    $timeout ||= 0;
    undef $timeout if ( $timeout == -1 );

    # Guard against a missing socket as well as a disconnected one, so the
    # caller gets the documented exception rather than a method-on-undef
    # failure.
    my $socket = $self->{_socket};
    unless ( $socket and $socket->connected ) {
        OpenSRF::EX::JabberDisconnected->throw(
          "This JabberClient instance is no longer connected to the server " .
          $self->username . " : " . $self->resource, ERROR );
    }

    my $val = $self->timed_read($timeout);

    # Only used for the log/error messages below.
    $timeout = "FOREVER" unless ( defined $timeout );

    if ( !defined($val) ) {
        OpenSRF::EX::Jabber->throw(
          "Call to Client->timed_read( $timeout ) failed", ERROR );
    }
    elsif ( !$val ) {
        $logger->transport(
            "Call to Client->timed_read( $timeout ) returned 0 bytes of data", DEBUG );
    }
    else {
        $logger->transport(
            "Call to Client->timed_read( $timeout ) successfully returned data", INTERNAL );
    }

    return $val;

}
555
556
557 1;