Initial revision
[OpenSRF.git] / src / perlmods / OpenSRF / Transport / SlimJabber / Client.pm
1 package OpenSRF::Transport::SlimJabber::Client;
2 use strict; use warnings;
3 use OpenSRF::EX;
4 #use Net::Jabber qw( Client );
5 use base qw( OpenSRF );
6 use OpenSRF::Utils::Logger qw(:level);
7 use Time::HiRes qw(ualarm);
8
9 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
10 use IO::Socket::INET;
11
12 =head1 Description
13
14 OpenSRF::Transport::SlimJabber::Client
15
Home-brewed, slimmed-down Jabber connection agent. Supports SSL connections
via the following config file options:
18
19   transport->server->port # the ssl port
20   transport->server->ssl  # is this ssl?
21
22 =cut
23
24 my $logger = "OpenSRF::Utils::Logger";
25
# Destructor: ends the Jabber stream and closes the socket.
#
# If the object still holds a connected socket in $self->{_socket}, the
# closing </stream:stream> tag is written to it and the handle is closed.
# Everything here is best effort; failures are deliberately ignored.
sub DESTROY {
    my $self = shift;

    # A destructor can run while another exception is propagating, so keep
    # the cleanup from clobbering the caller's error/status variables.
    local ( $., $@, $!, $^E, $? );

    # Writing to a peer that has already gone away raises SIGPIPE, which
    # would otherwise terminate the whole process during cleanup.
    local $SIG{PIPE} = 'IGNORE';

    my $socket = $self->{_socket};
    return unless $socket;

    eval {
        if ( $socket->connected() ) {
            print {$socket} "</stream:stream>";
            close($socket);
        }
        1;
    };

    return;
}
34
35
36 =head2 new()
37
38 Creates a new Client object.
39
40 debug and log_file are not required if you don't care to log the activity, 
41 however all other parameters are.
42
43 %params:
44
45         username
46         resource        
47         password
48         debug    
49         log_file
50
51 =cut
52
# Constructor.
#
# Named parameters (all three are required):
#   username, resource, password
#
# The server host and port are read from the current OpenSRF::Utils::Config
# (transport->server->primary and transport->server->port).  Returns the new
# object, or undef as soon as one of the required parameters is missing or
# false.  No network connection is made here.
sub new {
    my ( $class, %params ) = @_;
    $class = ref($class) || $class;

    my $config = OpenSRF::Utils::Config->current;
    my $host   = $config->transport->server->primary;
    my $port   = $config->transport->server->port;

    # Checked in the same order as they are documented.
    for my $required (qw( username resource password )) {
        return undef unless $params{$required};
    }
    my ( $username, $resource, $password )
        = @params{qw( username resource password )};

    my $self = bless {}, $class;

    # Full JID has the form user@host/resource.
    $self->jid( $username . '@' . $host . '/' . $resource );
    $self->host($host);
    $self->port($port);
    $self->username($username);
    $self->resource($resource);
    $self->password($password);
    $self->{temp_buffer} = "";

    $logger->transport(
        "Creating Client instance: $host:$port, $username, $resource",
        $logger->INFO
    );

    return $self;
}
85
# Throws away all pending input: empties the in-memory temp buffer and then
# drains whatever is currently readable on the socket.
sub buffer_reset {
    my ($self) = @_;

    $self->{temp_buffer} = "";

    my $sock = $self->{_socket};

    # Go non-blocking so the drain loop ends as soon as sysread has nothing
    # more to hand back, then put the socket back into blocking mode.
    set_nonblock($sock);
    my $discard = "";
    1 while sysread( $sock, $discard, 4096 );
    set_block($sock);
}
98 # -------------------------------------------------
99
100 =head2 gather()
101
Gathers all Jabber messages sitting in the collection queue
and hands them each to their respective callbacks.  This call
does not block (it calls process(0)).
105
106 =cut
107
# Non-blocking poll: delegates to process() with a timeout of 0 and returns
# whatever process() returns.
sub gather {
    my ($self) = @_;
    return $self->process(0);
}
109
110 # -------------------------------------------------
111
112 =head2 listen()
113
114 Blocks and gathers incoming messages as they arrive.  Does not return
115 unless an error occurs.
116
Throws an OpenSRF::EX::Jabber exception if the call to process() ever fails.
118
119 =cut
# Blocking listen loop.
#
# Connects to the Unix socket whose path is returned by $self->unix_sock(),
# then repeatedly calls process(-1) and writes each result to that socket.
# Does not return normally.
#
# Throws OpenSRF::EX::Socket if the Unix socket cannot be opened, and
# OpenSRF::EX::Jabber if process() ever returns undef.
sub listen {
    my $self = shift;

    my $sock   = $self->unix_sock();
    my $socket = IO::Socket::UNIX->new( Peer => $sock );

    # new() returns undef on failure, so test the handle itself before
    # calling a method on it.
    unless ( $socket and $socket->connected ) {
        OpenSRF::EX::Socket->throw(
            "Unable to connect to UnixServer: socket-file: $sock \n :=> $! " );
    }
    $logger->transport( "Unix Socket opened by Listener", INTERNAL );

    while (1) {
        my $o = $self->process(-1);

        # Check before logging so an undef result is never interpolated.
        if ( !defined($o) ) {
            OpenSRF::EX::Jabber->throw( "Listen Loop failed at 'process()'" );
        }
        $logger->transport(
            "Call to process() in listener returned:\n $o", INTERNAL );

        print {$socket} $o;
    }
}
141
# Puts the given filehandle into non-blocking mode.
#
# Returns the flag word as it was before O_NONBLOCK was added.  Dies if the
# flags cannot be read or written.
sub set_nonblock {
    my ($handle) = @_;

    my $original = fcntl( $handle, F_GETFL, 0 );
    die "Can't get flags for the socket: $!\n" unless $original;

    $logger->transport( "Setting NONBLOCK: original flags: $original", INTERNAL );

    unless ( fcntl( $handle, F_SETFL, $original | O_NONBLOCK ) ) {
        die "Can't set flags for the socket: $!\n";
    }

    return $original;
}
154
# Restores a previously saved flag word on a filehandle.
#
# Arguments: the filehandle and the flags to restore (for example the value
# returned by set_nonblock).  The restore is skipped when no flags are given;
# the log line is written either way.
sub reset_fl {
    my ( $handle, $saved ) = @_;

    $logger->transport( "Restoring BLOCK: to flags $saved", INTERNAL );

    if ( defined $saved ) {
        fcntl( $handle, F_SETFL, $saved );
    }
}
161
# Puts the given filehandle back into blocking mode by clearing O_NONBLOCK
# from its current flags.  Dies if the flags cannot be read or written.
sub set_block {
    my ($handle) = @_;

    my $current = fcntl( $handle, F_GETFL, 0 );
    die "Can't get flags for the socket: $!\n" unless $current;

    # Every other flag is left exactly as it was.
    my $blocking = $current & ~O_NONBLOCK;

    fcntl( $handle, F_SETFL, $blocking )
        or die "Can't set flags for the socket: $!\n";
}
173
174
# Splits the first complete element named $tag off the front of $text.
#
# Returns ( $element, $remainder ) when $text contains either the closing
# tag </$tag> or starts with a self-closing <$tag .../>; returns the empty
# list otherwise (including when no tag name is known).
sub _split_first_element {
    my ( $text, $tag ) = @_;

    return unless defined $tag and length $tag;

    # The tag name comes straight off the wire; never let it act as regex
    # syntax.
    my $name = quotemeta $tag;

    if ( $text =~ /^(.*?<\/$name>)(.*)/s ) {
        return ( $1, $2 );
    }
    if ( $text =~ /^(<$name[^>]*?\/>)(.*)/s ) {
        return ( $1, $2 );
    }
    return;
}

# Reads one XML message from the connection.
#
# $timeout:
#   defined but false (0) - the socket is switched to non-blocking for the read
#   undef                 - no alarm is armed (treated as 0 for alarm())
#   positive              - an alarm bounds the read; values between 0 and 1
#                           are rounded up to 1 second
#
# Data left over from an earlier read is kept in $self->{temp_buffer}; a
# complete message found there is returned without touching the socket.
#
# Returns the message text when a complete element was read, "" when no
# complete element was obtained, and undef if $@ is set after the read.
# Throws OpenSRF::EX::Socket if the socket is missing or not connected.
#
# NOTE(review): the outer eval completes normally even when the alarm fires,
# so $@ looks to be empty at the final check and the undef return appears
# unreachable in practice - confirm before relying on it.
# NOTE(review): completion is tested against each freshly read chunk, not the
# accumulated text, so a closing tag split across two reads appears to go
# undetected - confirm.
sub timed_read {
    my ( $self, $timeout ) = @_;

    $logger->transport( "Temp Buffer Contained: \n" . $self->{temp_buffer}, INTERNAL )
        if $self->{temp_buffer};
    if ( $self->can("app") ) {
        $logger->transport(
            "timed_read called for " . $self->app . ", I am: " . $self->jid,
            INTERNAL );
    }

    # See if there is a complete message in the temp_buffer that we can
    # return without reading from the socket.
    if ( $self->{temp_buffer} ) {
        my $buffer   = $self->{temp_buffer};
        my $complete = 0;
        $self->{temp_buffer} = '';

        my ($tag) = ( $buffer =~ /<([^\s\?\>]+)/ );
        $logger->transport(
            "Using tag: " . ( defined $tag ? $tag : '' ) . "  ", INTERNAL );

        if ( my ( $element, $rest ) = _split_first_element( $buffer, $tag ) ) {
            # Hand back exactly one element and keep the remainder for the
            # next call, so nothing is delivered twice.
            $buffer = $element;
            $self->{temp_buffer} = $rest;
            $complete++;
            $logger->transport( "completed read with $buffer", INTERNAL );
        }
        else {
            $self->{temp_buffer} = $buffer;
        }

        if ( $buffer and $complete ) {
            return $buffer;
        }
    }

    my $fh = $self->{_socket};

    unless ( $fh and $fh->connected ) {
        OpenSRF::EX::Socket->throw( "Attempted read on closed socket", ERROR );
    }

    $logger->transport(
        "Temp Buffer After first attempt: \n " . $self->{temp_buffer}, INTERNAL )
        if $self->{temp_buffer};

    # A timeout of exactly 0 means "do not wait at all".
    my $flags;
    if ( defined($timeout) && !$timeout ) {
        $flags = set_nonblock($fh);
    }

    $timeout ||= 0;
    $logger->transport( "Calling timed_read with timetout $timeout", INTERNAL );

    my $complete   = 0;
    my $first_read = 1;
    my $xml        = '';

    # The inner eval catches the alarm; the outer one makes sure a pending
    # alarm is always cancelled even if the inner block died for another
    # reason.
    eval {
        my $tag = '';
        eval {
            no warnings;
            local $SIG{ALRM} = sub { die "alarm\n" };    # NB: \n required

            # alarm needs a whole number of seconds, so round tiny positive
            # timeouts up to 1.
            my $alarm_timeout = $timeout;
            if ( $alarm_timeout > 0 and $alarm_timeout < 1 ) {
                $alarm_timeout = 1;
            }
            alarm $alarm_timeout;

            do {
                my $buffer = $self->{temp_buffer};
                $self->{temp_buffer} = '';

                my $t_buf     = "";
                my $read_size = 1024;
                while ( my $n = sysread( $fh, $t_buf, $read_size ) ) {
                    $buffer .= $t_buf;
                    if ( $n < $read_size ) {
                        # Short read: nothing more is waiting right now.
                        set_block($fh);
                        last;
                    }

                    # A full chunk may mean more data is queued; probe for it
                    # without blocking.
                    set_nonblock($fh);
                }

                $logger->transport( " Got [$buffer] from the socket", INTERNAL );

                if ($first_read) {
                    $logger->transport( " First read Buffer\n [$buffer]", INTERNAL );
                    ($tag) = ( $buffer =~ /<([^\s\?\>]+){1}/ );
                    $first_read--;
                    $logger->transport( "Using tag: $tag  ", INTERNAL );
                }

                if ( my ( $element, $rest ) = _split_first_element( $buffer, $tag ) ) {
                    $buffer = $element;
                    $self->{temp_buffer} = $rest;
                    $complete++;
                    $logger->transport( "completed read with $buffer", INTERNAL );
                }

                $xml .= $buffer;

            } while ( !$complete && $xml );
            alarm(0);
        };
        alarm(0);
    };

    $logger->transport( "XML Read: $xml", INTERNAL );

    # Undo the non-blocking switch made for a zero timeout.
    set_block($fh) if defined $flags;

    if ($complete) {
        return $xml;
    }
    if ($@) {
        return undef;
    }
    return "";
}
315
316
317 # -------------------------------------------------
318
# Reports whether this client currently holds a connected socket.
# Returns 1 when $self->{_socket} exists and says it is connected, else 0.
sub tcp_connected {
    my ($self) = @_;

    my $sock = $self->{_socket};
    return ( $sock and $sock->connected ) ? 1 : 0;
}
325
# Accessor for the login password.  A true argument replaces the stored
# value; a missing or false argument leaves it alone.  Returns the stored
# value.
sub password {
    my ( $self, $new_password ) = @_;

    if ($new_password) {
        $self->{'oils:password'} = $new_password;
    }

    return $self->{'oils:password'};
}
331
332 # -------------------------------------------------
333
# Accessor for the login username.  A true argument replaces the stored
# value; otherwise the stored value is left untouched.  Returns the stored
# value.
sub username {
    my ( $self, $new_username ) = @_;

    $new_username and $self->{'oils:username'} = $new_username;

    return $self->{'oils:username'};
}
339         
340 # -------------------------------------------------
341
# Accessor for the Jabber resource name.  Stores the argument when it is
# true; always returns the stored value.
sub resource {
    my $self         = shift;
    my $new_resource = shift;

    unless ( !$new_resource ) {
        $self->{'oils:resource'} = $new_resource;
    }

    return $self->{'oils:resource'};
}
347
348 # -------------------------------------------------
349
# Accessor for the full JID.  With a true argument the JID is replaced and
# the new value returned; without one the stored value is returned.
sub jid {
    my ( $self, $new_jid ) = @_;

    return $self->{'oils:jid'} unless $new_jid;

    $self->{'oils:jid'} = $new_jid;
    return $self->{'oils:jid'};
}
355
# Accessor for the server port.  A true argument is stored; a false or
# missing one is ignored.  Returns the stored port.
sub port {
    my ( $self, $new_port ) = @_;

    return $new_port
        ? ( $self->{'oils:port'} = $new_port )
        : $self->{'oils:port'};
}
361
# Accessor for the server host name.  Stores the argument when it is true
# and returns whatever is stored afterwards.
sub host {
    my $self     = shift;
    my $new_host = shift;

    $self->{'oils:host'} = $new_host if $new_host;

    my $stored = $self->{'oils:host'};
    return $stored;
}
367
368 # -------------------------------------------------
369
370 =head2 send()
371
372         Sends a Jabber message.
373         
374         %params:
375                 to                      - The JID of the recipient
376                 thread  - The Jabber thread
377                 body            - The body of the message
378
379 =cut
380
# Sends one Jabber message over the client's socket.
#
# Named parameters:
#   to             - recipient JID (required)
#   body           - message body (required)
#   thread         - Jabber thread; only set on the message when true
#   router_command - passed to the message wrapper, defaults to ""
#   router_class   - passed to the message wrapper, defaults to ""
#
# Returns undef when 'to' or 'body' is missing or false; otherwise returns
# the result of the print to the socket.
sub send {
    my ( $self, %params ) = @_;

    return undef unless $params{'to'};
    return undef unless $params{'body'};

    my ( $to, $body ) = @params{qw( to body )};
    my $thread         = $params{'thread'}         || "";
    my $router_command = $params{'router_command'} || "";
    my $router_class   = $params{'router_class'}   || "";

    my $msg = OpenSRF::Transport::SlimJabber::MessageWrapper->new;

    $msg->setTo($to);
    if ($thread) {
        $msg->setThread($thread);
    }
    $msg->setBody($body);
    $msg->set_router_command($router_command);
    $msg->set_router_class($router_class);

    $logger->transport(
        "JabberClient Sending message to $to with thread $thread and body: \n$body",
        INTERNAL
    );

    my $handle = $self->{_socket};
    print {$handle} $msg->toString;
}
406
407
=head2 initialize()
409
410 Connect to the server and log in.  
411
Throws an OpenSRF::EX::Jabber exception if we cannot connect
to the server or if the authentication fails.
414
415 =cut
416
# Escapes the five XML special characters so a value can be placed safely
# inside element text or a quoted attribute.  undef becomes the empty string.
sub _xml_escape {
    my ($text) = @_;

    return '' unless defined $text;

    $text =~ s/&/&amp;/g;    # must run first so later entities survive
    $text =~ s/</&lt;/g;
    $text =~ s/>/&gt;/g;
    $text =~ s/'/&apos;/g;
    $text =~ s/"/&quot;/g;

    return $text;
}

# Opens the TCP connection to the Jabber server and logs in.
#
# Steps: up to five connection attempts (3 seconds apart), send the stream
# header, wait up to 3 seconds for the server's greeting, send a
# jabber:iq:auth request, then wait up to 10 seconds for a reply containing
# type='result'.
#
# Returns $self on success.  Throws OpenSRF::EX::Jabber when the connection
# cannot be established or the login is not confirmed; in the latter case
# the socket is closed first.
sub initialize {
    my $self = shift;

    my $jid      = $self->jid;
    my $host     = $self->host;
    my $port     = $self->port;
    my $username = $self->username;
    my $resource = $self->resource;
    my $password = $self->password;

    # Credentials are arbitrary text; escape them so characters such as
    # '&' or '<' cannot produce a malformed stanza.
    my ( $x_host, $x_user, $x_pass, $x_res )
        = map { _xml_escape($_) } ( $host, $username, $password, $resource );

    my $stream = "<stream:stream to='$x_host' xmlns='jabber:client' "
        . "xmlns:stream='http://etherx.jabber.org/streams'>\n";

    my $auth = join( "\n",
        "<iq id='123' type='set'>",
        "<query xmlns='jabber:iq:auth'>",
        "<username>$x_user</username>",
        "<password>$x_pass</password>",
        "<resource>$x_res</resource>",
        "</query>",
        "</iq>" )
        . "\n";

    # --- 5 tries to connect to the jabber server
    my $max_tries = 5;
    my $socket;
    for my $try ( 1 .. $max_tries ) {
        $logger->transport(
            "$jid: Attempting to connect to server... (Try # $try)", WARN );
        $socket = IO::Socket::INET->new(
            PeerHost => $host,
            PeerPort => $port,
            Proto    => 'tcp'
        );
        last if ( $socket and $socket->connected );

        # Only pause when another attempt will actually follow.
        sleep 3 if $try < $max_tries;
    }

    unless ( $socket and $socket->connected ) {
        OpenSRF::EX::Jabber->throw( " Could not connect to Jabber server: $!" );
    }

    $logger->transport(
        "Logging into jabber as $jid " . "from " . ref($self), DEBUG );

    print {$socket} $stream;

    # Give the server up to 3 seconds to answer the stream header.  The
    # outer eval guarantees the alarm is cancelled whatever happens inside.
    my $buffer;
    eval {
        eval {
            local $SIG{ALRM} = sub { die "alarm\n" };    # NB: \n required
            alarm 3;
            sysread( $socket, $buffer, 4096 );
            $logger->transport(
                "Login buffer 1: " . ( defined $buffer ? $buffer : '' ),
                INTERNAL );
            alarm(0);
        };
        alarm(0);
    };

    print {$socket} $auth;

    if ( $socket and $socket->connected() ) {
        $self->{_socket} = $socket;
    }
    else {
        OpenSRF::EX::Jabber->throw(
            " ** Unable to connect to Jabber server", ERROR );
    }

    $buffer = $self->timed_read(10);

    if ($buffer) {
        $logger->transport( "Login buffer 2: $buffer", INTERNAL );
    }

    if ( $buffer and $buffer =~ /type=["\']result["\']/ ) {
        $logger->transport(
            " * $jid: Jabber authenticated and connected", DEBUG );
    }
    else {
        if ( !$buffer ) { $buffer = " "; }
        $socket->close;
        OpenSRF::EX::Jabber->throw(
            " * $jid: Unable to authenticate: $buffer", ERROR );
    }

    return $self;
}
501
# Class method: builds a client for $app, initializes it and hands the
# result to $class->peer_handle().  Returns whatever peer_handle() returns.
#
# NOTE(review): $app is passed to new() as its only argument, while new() in
# this file expects named username/resource/password parameters - presumably
# subclasses override new(); confirm.
sub construct {
    my ( $class, $app ) = @_;

    $logger->transport( "Constructing new Jabber connection for $app", INTERNAL );

    my @peer = $class->new($app)->initialize();
    return $class->peer_handle(@peer);
}
508
# Reads one message from the server via timed_read().
#
# $timeout: seconds to wait; a false or missing value means 0 (do not
# block) and -1 means wait indefinitely (timed_read is then called with
# undef).
#
# Returns the value from timed_read(): the message text, or a false value
# when nothing was read.  Throws OpenSRF::EX::JabberDisconnected when there
# is no connected socket, and OpenSRF::EX::Jabber when timed_read() returns
# undef.
sub process {
    my ( $self, $timeout ) = @_;

    $timeout ||= 0;
    undef $timeout if ( $timeout == -1 );

    # Check that a socket exists before asking it anything, so a client
    # that was never initialized reports "disconnected" rather than dying
    # with a method call on undef.
    my $socket = $self->{_socket};
    unless ( $socket and $socket->connected ) {
        OpenSRF::EX::JabberDisconnected->throw(
            "This JabberClient instance is no longer connected to the server",
            ERROR );
    }

    my $val = $self->timed_read($timeout);

    # Only used for the log/exception text below.
    $timeout = "FOREVER" unless ( defined $timeout );

    if ( !defined($val) ) {
        OpenSRF::EX::Jabber->throw(
            "Call to Client->timed_read( $timeout ) failed", ERROR );
    }
    elsif ( !$val ) {
        $logger->transport(
            "Call to Client->timed_read( $timeout ) returned 0 bytes of data",
            DEBUG );
    }
    else {
        $logger->transport(
            "Call to Client->timed_read( $timeout ) successfully returned data",
            INTERNAL );
    }

    return $val;
}
539
540
541 1;