]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/perlmods/OpenSRF/Transport/SlimJabber/Client.pm
router-less login mangling
[OpenSRF.git] / src / perlmods / OpenSRF / Transport / SlimJabber / Client.pm
1 package OpenSRF::Transport::SlimJabber::Client;
2 use strict; use warnings;
3 use OpenSRF::EX;
4 use base qw( OpenSRF );
5 use OpenSRF::Utils::Logger qw(:level);
6 use OpenSRF::Utils::Config;
7 use Time::HiRes qw(ualarm);
8 use OpenSRF::Utils::Config;
9
10 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
11 use IO::Socket::INET;
12 use IO::Socket::UNIX;
13
14 =head1 Description
15
16 OpenSRF::Transport::SlimJabber::Client
17
Home-brewed, slimmed-down Jabber connection agent. Supports SSL connections
via the following config file options:
20
21   transport->server->sslport # the ssl port
22   transport->server->ssl  # is this ssl?
23
24 =cut
25
26 my $logger = "OpenSRF::Utils::Logger";
27
# Destructor: closes the Jabber stream (if any) when the object is reclaimed.
#
# The global status variables are localized first, as recommended by perlobj,
# so that the socket calls made during disconnect() cannot clobber the error
# state ($@, $!, $?) of whatever code happened to trigger the destruction.
sub DESTROY {
        my $self = shift;
        local( $., $@, $!, $^E, $? );
        $self->disconnect;
}
32
# Ends the Jabber session: if a socket is stored on the object and is still
# connected, sends the closing stream tag and closes the socket.
# Does nothing when there is no socket or it is already disconnected.
sub disconnect {
        my( $self ) = @_;

        my $socket = $self->{_socket};
        if( $socket ) {
                if( $socket->connected() ) {
                        print {$socket} "</stream:stream>";
                        close( $socket );
                }
        }
}
41
42
43 =head2 new()
44
45 Creates a new Client object.
46
debug and log_file are not required if you don't care to log the activity;
all other parameters are required, and new() returns undef if any is missing.

%params:

        host
        port
        username
        resource
        password
        debug
        log_file
57
58 =cut
59
# Constructor.  Requires true values for port, username, resource, password
# and host in %params; returns undef as soon as one of them is missing.
# On success the JID is built as "username@host/resource", every value is
# stored through its accessor, the temp buffer is initialised to the empty
# string, and the new object is returned.  No connection is opened here.
sub new {

        my $class  = shift;
        my %params = @_;

        $class = ref( $class ) || $class;

        for my $required ( qw( port username resource password host ) ) {
                return undef unless $params{$required};
        }

        my( $port, $username, $resource, $password, $host ) =
                @params{ qw( port username resource password host ) };

        my $self = bless {} => $class;

        $self->jid( "$username\@$host\/$resource" );
        $self->host( $host );
        $self->port( $port );
        $self->username( $username );
        $self->resource( $resource );
        $self->password( $password );
        $self->{temp_buffer} = "";

        $logger->transport(
                "Creating Client instance: $host:$port, $username, $resource",
                $logger->INFO );

        return $self;
}
89
# Throws away all pending input: empties the object's temp buffer and then
# drains whatever is waiting on the socket by reading in non-blocking mode
# until sysread() returns a false value.  The socket is put back into
# blocking mode afterwards.
sub buffer_reset {

        my( $self ) = @_;
        my $fh = $self->{_socket};

        $self->{temp_buffer} = "";

        set_nonblock( $fh );
        my $discarded = "";
        1 while sysread( $fh, $discarded, 4096 );
        set_block( $fh );
}
102 # -------------------------------------------------
103
104 =head2 gather()
105
106 Gathers all Jabber messages sitting in the collection queue 
107 and hands them each to their respective callbacks.  This call
does not block (calls process( 0 ))
109
110 =cut
111
# Non-blocking poll: forwards to process() with a timeout of 0 and returns
# whatever process() returns.
sub gather {
        my( $self ) = @_;
        return $self->process( 0 );
}
113
114 # -------------------------------------------------
115
116 =head2 listen()
117
118 Blocks and gathers incoming messages as they arrive.  Does not return
119 unless an error occurs.
120
Throws an OpenSRF::EX::Socket exception if the Unix socket cannot be opened, and an OpenSRF::EX::Jabber exception if the call to process() ever fails.
122
123 =cut
# Relay loop: connects to the Unix socket named by $self->unix_sock(), then
# repeatedly calls process( -1 ) (block until a message arrives) and writes
# each result to that Unix socket.  Never returns normally.
#
# Throws OpenSRF::EX::Socket when the Unix socket cannot be opened and
# OpenSRF::EX::Jabber when process() returns undef.
sub listen {
        my $self = shift;

        my $sock = $self->unix_sock();
        my $socket = IO::Socket::UNIX->new( Peer => $sock );

        # new() returns undef on failure, so test the object before calling a
        # method on it; otherwise the caller sees a "can't call method on undef"
        # death instead of the intended exception.
        unless( $socket and $socket->connected ) {
                OpenSRF::EX::Socket->throw(
                        "Unable to connect to UnixServer: socket-file: $sock \n :=> $! " );
        }

        $logger->transport( "Unix Socket opened by Listener", INTERNAL );

        while(1) {
                my $o = $self->process( -1 );

                # check before logging so undef is never interpolated
                if( ! defined( $o ) ) {
                        OpenSRF::EX::Jabber->throw( "Listen Loop failed at 'process()'" );
                }
                $logger->transport( "Call to process() in listener returned:\n $o", INTERNAL );

                print $socket $o;
        }

        # not reachable; kept as a tripwire should the loop ever be changed
        OpenSRF::EX::Socket->throw( "How did we get here?!?!" );
}
145
# Plain function (not a method): turns on O_NONBLOCK for the given handle.
# Returns the flag word as it was before the change, so a caller can restore
# it later.  Dies if the flags cannot be read or written.
sub set_nonblock {
        my( $fh ) = @_;

        my $flags = fcntl( $fh, F_GETFL, 0 );
        die "Can't get flags for the socket: $!\n" unless $flags;

        $logger->transport( "Setting NONBLOCK: original flags: $flags", INTERNAL );

        unless( fcntl( $fh, F_SETFL, $flags | O_NONBLOCK ) ) {
                die "Can't set flags for the socket: $!\n";
        }

        return $flags;
}
158
# Plain function (not a method): writes a previously saved flag word back to
# the handle.  The restore is skipped when no flag word is supplied; the log
# line is emitted either way.
sub reset_fl {
        my( $fh, $flags ) = @_;

        $logger->transport( "Restoring BLOCK: to flags $flags", INTERNAL );

        if( defined $flags ) {
                fcntl( $fh, F_SETFL, $flags );
        }
}
165
# Plain function (not a method): clears O_NONBLOCK on the given handle,
# leaving every other flag as it was.  Dies if the flags cannot be read or
# written.
sub set_block {
        my( $fh ) = @_;

        my $flags = fcntl( $fh, F_GETFL, 0 );
        die "Can't get flags for the socket: $!\n" unless $flags;

        $flags &= ~O_NONBLOCK;

        unless( fcntl( $fh, F_SETFL, $flags ) ) {
                die "Can't set flags for the socket: $!\n";
        }
}
177
178
# Private helper (plain function): splits the first complete XML element
# named $tag off the front of $buffer.
#
# Returns ( $element, $remainder ) when $buffer contains either the closing
# tag for $tag, or starts with a self-closing <$tag .../> element.  Returns
# an empty list when no complete element is present or $tag is unknown.
sub _split_first_element {
        my( $buffer, $tag ) = @_;

        return unless defined $buffer and defined $tag and length $tag;

        # \Q..\E: the tag name is data read off the wire, not a pattern.
        if( $buffer =~ /^(.*?<\/\Q$tag\E>)(.*)/s ) {
                return ( $1, $2 );
        }

        # /s so that a remainder containing newlines is kept in full
        if( $buffer =~ /^(<\Q$tag\E[^>]*?\/>)(.*)/s ) {
                return ( $1, $2 );
        }

        return;
}

# Reads one XML element from the Jabber socket.
#
# $timeout:
#       undef   - block until data arrives (no alarm is armed)
#       0       - poll: the socket is switched to non-blocking for the read
#       > 0     - wait up to that many seconds (values below 1 are raised to 1
#                 because alarm() takes whole seconds)
#
# Returns:
#       the element text  - when a complete element was found, either in the
#                           leftover temp buffer or on the socket; anything
#                           read beyond it is kept in the temp buffer
#       ""                - when no complete element was obtained.  This
#                           includes an expired timeout: the alarm's die is
#                           caught by the inner eval and the outer eval then
#                           completes normally, which clears $@
#       undef             - only if the outer eval itself fails
#
# Throws OpenSRF::EX::Socket when the socket is missing or disconnected.
sub timed_read {
        my ($self, $timeout) = @_;

        $logger->transport( "Temp Buffer Contained: \n". $self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};
        if( $self->can( "app" ) ) {
                $logger->transport( "timed_read called for ".$self->app.", I am: ".$self->jid, INTERNAL );
        }

        # See if there is a complete message in the temp_buffer
        # that we can return
        if( $self->{temp_buffer} ) {
                my $leftover = $self->{temp_buffer};

                my ($tag) = ($leftover =~ /<([^\s\?\>]+)/);
                $logger->transport("Using tag: " . ( defined $tag ? $tag : '' ) . "  ", INTERNAL);

                my( $element, $rest ) = _split_first_element( $leftover, $tag );
                if( defined $element ) {
                        $self->{temp_buffer} = $rest;
                        $logger->transport( "completed read with $element", INTERNAL );
                        return $element;
                }
                # incomplete: the leftover stays in temp_buffer and is
                # prepended to whatever is read from the socket below
        }

        my $fh = $self->{_socket};

        unless( $fh and $fh->connected ) {
                OpenSRF::EX::Socket->throw( "Attempted read on closed socket", ERROR );
        }

        $logger->transport( "Temp Buffer After first attempt: \n ".$self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};

        # $flags is defined only when the caller asked for a poll
        my $flags;
        if (defined($timeout) && !$timeout) {
                $flags = set_nonblock( $fh );
        }

        $timeout ||= 0;
        $logger->transport( "Calling timed_read with timetout $timeout", INTERNAL );

        my $complete = 0;
        my $first_read = 1;
        my $xml = '';
        eval {
                my $tag;
                eval {
                        no warnings;
                        local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required

                        # alarm needs a number greater => 1.
                        my $alarm_timeout = $timeout;
                        if( $alarm_timeout > 0 and $alarm_timeout < 1 ) {
                                $alarm_timeout = 1;
                        }
                        alarm $alarm_timeout;
                        do {

                                my $buffer = $self->{temp_buffer};
                                $self->{temp_buffer} = '';

                                my $t_buf = "";
                                my $read_size = 1024;
                                my $draining = 0;
                                while( my $n = sysread( $fh, $t_buf, $read_size ) ) {
                                        $buffer .= $t_buf;
                                        if( $n < $read_size ) {
                                                set_block( $fh );
                                                $draining = 0;
                                                last;
                                        }
                                        # see if there is any more data to grab...
                                        set_nonblock( $fh );
                                        $draining = 1;
                                }

                                # A read of exactly N * $read_size bytes ends the
                                # loop above with the socket still non-blocking.
                                # Unless the caller asked for a poll, go back to
                                # blocking so later reads wait instead of spinning
                                # or returning empty at once.
                                set_block( $fh ) if $draining and !defined $flags;

                                $logger->transport(" Got [$buffer] from the socket", INTERNAL);

                                if ($first_read) {
                                        $logger->transport(" First read Buffer\n [$buffer]", INTERNAL);
                                        $first_read = 0;
                                }

                                # Keep looking for the element name until one shows
                                # up; the first chunk may hold no element at all.
                                unless( defined $tag ) {
                                        ($tag) = ($buffer =~ /<([^\s\?\>]+)/);
                                        $logger->transport("Using tag: $tag  ", INTERNAL) if defined $tag;
                                }

                                my( $element, $rest ) = _split_first_element( $buffer, $tag );
                                if( defined $element ) {
                                        $buffer = $element;
                                        $self->{temp_buffer} = $rest;
                                        $complete++;
                                        $logger->transport( "completed read with $buffer", INTERNAL );
                                }

                                $xml .= $buffer;

                        } while (!$complete && $xml);
                        alarm(0);
                };
                alarm(0);
        };

        $logger->transport( "XML Read: $xml", INTERNAL );
        set_block( $fh ) if defined $flags;

        if ($complete) {
                return $xml;
        }
        if( $@ ) {
                return undef;
        }
        return "";
}
319
320
321 # -------------------------------------------------
322
# Returns 1 when the object holds a socket that reports itself connected,
# 0 otherwise.
sub tcp_connected {

        my( $self ) = @_;
        my $socket = $self->{_socket};

        return ( $socket and $socket->connected ) ? 1 : 0;
}
329
# Accessor for the login password.  A true argument replaces the stored
# value; a false or missing argument leaves it alone.  Always returns the
# value currently stored.
sub password {
        my $self = shift;
        my $password = shift;

        if( $password ) {
                $self->{'oils:password'} = $password;
        }
        return $self->{'oils:password'};
}
335
336 # -------------------------------------------------
337
# Accessor for the login user name.  A true argument replaces the stored
# value; a false or missing argument leaves it alone.  Always returns the
# value currently stored.
sub username {
        my $self = shift;
        my $username = shift;

        if( $username ) {
                $self->{'oils:username'} = $username;
        }
        return $self->{'oils:username'};
}
343         
344 # -------------------------------------------------
345
# Accessor for the Jabber resource name.  A true argument replaces the stored
# value; a false or missing argument leaves it alone.  Always returns the
# value currently stored.
sub resource {
        my $self = shift;
        my $resource = shift;

        if( $resource ) {
                $self->{'oils:resource'} = $resource;
        }
        return $self->{'oils:resource'};
}
351
352 # -------------------------------------------------
353
# Accessor for the full Jabber ID.  A true argument replaces the stored
# value; a false or missing argument leaves it alone.  Always returns the
# value currently stored.
sub jid {
        my $self = shift;
        my $jid = shift;

        if( $jid ) {
                $self->{'oils:jid'} = $jid;
        }
        return $self->{'oils:jid'};
}
359
# Accessor for the server port (or Unix socket path).  A true argument
# replaces the stored value; a false or missing argument leaves it alone.
# Always returns the value currently stored.
sub port {
        my $self = shift;
        my $port = shift;

        if( $port ) {
                $self->{'oils:port'} = $port;
        }
        return $self->{'oils:port'};
}
365
# Accessor for the server host name.  A true argument replaces the stored
# value; a false or missing argument leaves it alone.  Always returns the
# value currently stored.
sub host {
        my $self = shift;
        my $host = shift;

        if( $host ) {
                $self->{'oils:host'} = $host;
        }
        return $self->{'oils:host'};
}
371
372 # -------------------------------------------------
373
374 =head2 send()
375
376         Sends a Jabber message.
377         
378         %params:
379                 to                      - The JID of the recipient
380                 thread  - The Jabber thread
381                 body            - The body of the message
382
383 =cut
384
# Builds a MessageWrapper from %params and writes it to the Jabber socket.
#
# Returns undef without sending when 'to' or 'body' is missing or false.
# 'thread', 'router_command' and 'router_class' default to the empty string;
# the thread is only set on the message when it is non-empty.
# Throws OpenSRF::EX::Jabber when the socket is missing or disconnected.
sub send {
        my( $self, %params ) = @_;

        my $to = $params{'to'};
        return undef unless $to;

        my $body = $params{'body'};
        return undef unless $body;

        my $thread         = $params{'thread'}         || "";
        my $router_command = $params{'router_command'} || "";
        my $router_class   = $params{'router_class'}   || "";

        my $msg = OpenSRF::Transport::SlimJabber::MessageWrapper->new;

        $msg->setTo( $to );
        if( $thread ) {
                $msg->setThread( $thread );
        }
        $msg->setBody( $body );
        $msg->set_router_command( $router_command );
        $msg->set_router_class( $router_class );

        $logger->transport(
                "JabberClient Sending message to $to with thread $thread and body: \n$body",
                INTERNAL );

        my $soc = $self->{_socket};
        if( !$soc or !$soc->connected ) {
                OpenSRF::EX::Jabber->throw( "No longer connected to jabber server" );
        }
        print {$soc} $msg->toString;

        $logger->transport(
                "JabberClient Sent message to $to with thread $thread and body: \n$body",
                INTERNAL );
}
416
417
=head2 initialize()
419
420 Connect to the server and log in.  
421
Throws an OpenSRF::EX::Jabber exception if we cannot connect
to the server or if the authentication fails.
424
425 =cut
426
427 # --- The logging lines have been commented out until we decide 
428 # on which log files we're using.
429
# Opens the connection to the Jabber server and logs in.
#
# Steps:
#   1. Connect (up to 5 attempts, 3 seconds apart).  A purely numeric port
#      selects a TCP connection to host:port; anything else is treated as the
#      path of a Unix socket.
#   2. Send the stream header and wait up to 3 seconds for the reply.
#   3. Send a jabber:iq:auth login.  "_<pid>" is appended to the resource,
#      except for the user "router" when no bootstrap router_name is
#      configured.
#   4. Wait up to 10 seconds for a reply containing type='result'.
#
# Returns $self on success.  Throws OpenSRF::EX::Jabber when the connection
# cannot be made or the login is not confirmed; in the latter case the socket
# is closed first.
sub initialize {

        my $self = shift;

        my $jid         = $self->jid;
        my $host        = $self->host;
        my $port        = $self->port;
        my $username    = $self->username;
        my $resource    = $self->resource;
        my $password    = $self->password;

        # Values are placed inside XML below; escape the predefined entities
        # so that e.g. a password containing '&' or '<' still yields a
        # well-formed stanza.
        my $xml_escape = sub {
                my $text = shift;
                $text = '' unless defined $text;
                $text =~ s/&/&amp;/g;
                $text =~ s/</&lt;/g;
                $text =~ s/>/&gt;/g;
                $text =~ s/"/&quot;/g;
                $text =~ s/'/&apos;/g;
                return $text;
        };

        my $conf = OpenSRF::Utils::Config->current;
        my $tail = "_$$";
        if(!$conf->bootstrap->router_name && $username eq "router") {
                $tail = "";
        }

        my( $x_host, $x_username, $x_password, $x_resource ) =
                map { $xml_escape->( $_ ) }
                ( $host, $username, $password, "${resource}$tail" );

        my $stream = <<"XML";
<stream:stream to='$x_host' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>
XML

        my $auth = <<"XML";
<iq id='123' type='set'>
<query xmlns='jabber:iq:auth'>
<username>$x_username</username>
<password>$x_password</password>
<resource>$x_resource</resource>
</query>
</iq>
XML

        my $sock_type = 'IO::Socket::INET';

        # if port is a string, then we're connecting to a UNIX socket
        unless( $port =~ /^\d+$/ ) {
                $sock_type = 'IO::Socket::UNIX';
        }

        # --- 5 tries to connect to the jabber server
        my $max_tries = 5;
        my $socket;
        for my $try ( 1 .. $max_tries ) {
                $logger->transport( "$jid: Attempting to connect to server...$host:$port (Try # $try)", WARN );
                # PeerHost/PeerPort/Proto are used by the INET class, Peer by
                # the UNIX class; the same argument list is passed to both.
                $socket = $sock_type->new( PeerHost => $host,
                                           PeerPort => $port,
                                           Peer => $port,
                                           Proto    => 'tcp' );
                $logger->transport( "$jid: $try connect attempt to $host:$port", WARN );
                last if ( $socket and $socket->connected );
                # no point in waiting once the final attempt has failed
                sleep 3 if $try < $max_tries;
        }

        unless ( $socket and $socket->connected ) {
                OpenSRF::EX::Jabber->throw( " Could not connect to Jabber server: $!" );
        }

        $logger->transport( "Logging into jabber as $jid " .
                        "from " . ref( $self ), DEBUG );

        print $socket $stream;

        # Read the server's stream header; give up quietly after 3 seconds.
        my $buffer;
        eval {
                eval {
                        local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
                        alarm 3;
                        sysread($socket, $buffer, 4096);
                        $logger->transport(
                                "Login buffer 1: " . ( defined $buffer ? $buffer : '' ), INTERNAL );
                        alarm(0);
                };
                alarm(0);
        };

        print $socket $auth;

        if( $socket and $socket->connected() ) {
                $self->{_socket} = $socket;
        } else {
                OpenSRF::EX::Jabber->throw( " ** Unable to connect to Jabber server", ERROR );
        }


        $buffer = $self->timed_read(10);

        if( $buffer ) {$logger->transport( "Login buffer 2: $buffer", INTERNAL );}

        if( $buffer and $buffer =~ /type=["\']result["\']/ ) {
                $logger->transport( " * $jid: Jabber authenticated and connected", DEBUG );
        } else {
                if( !$buffer ) { $buffer = " "; }
                $socket->close;
                OpenSRF::EX::Jabber->throw( " * $jid: Unable to authenticate: $buffer", ERROR );
        }

        return $self;
}
525
# Class method: creates a client via $class->new( $app ), connects it with
# initialize(), and hands the result to $class->peer_handle().
# NOTE(review): new() as defined in this file takes a key/value list, so this
# presumably relies on a subclass overriding new() - confirm.
sub construct {
        my $class = shift;
        my $app   = shift;

        $logger->transport("Constructing new Jabber connection for $app, my class $class", INTERNAL );

        my $client = $class->new( $app );
        $class->peer_handle( $client->initialize() );
}
532
# Reads one message from the server via timed_read().
#
# $timeout:
#       -1              - block until a message arrives
#       0 / undef / ""  - poll without blocking
#       > 0             - wait up to that many seconds
#
# Returns whatever timed_read() returned: the message text, or a false value
# when nothing was read.
# Throws OpenSRF::EX::JabberDisconnected when there is no connected socket and
# OpenSRF::EX::Jabber when timed_read() returns undef.
sub process {

        my( $self, $timeout ) = @_;

        $timeout ||= 0;
        # timed_read() treats an undefined timeout as "wait forever"
        undef $timeout if ( $timeout == -1 );

        # Test for the socket's existence first so that a client that was
        # never initialized gets the proper exception rather than a
        # method-call-on-undef death.
        my $socket = $self->{_socket};
        unless( $socket and $socket->connected ) {
                OpenSRF::EX::JabberDisconnected->throw(
                  "This JabberClient instance is no longer connected to the server " .
                  $self->username . " : " . $self->resource, ERROR );
        }

        my $val = $self->timed_read( $timeout );

        # for the log messages below only
        $timeout = "FOREVER" unless ( defined $timeout );

        if ( ! defined( $val ) ) {
                OpenSRF::EX::Jabber->throw(
                  "Call to Client->timed_read( $timeout ) failed", ERROR );
        } elsif ( ! $val ) {
                $logger->transport(
                        "Call to Client->timed_read( $timeout ) returned 0 bytes of data", DEBUG );
        } else {
                $logger->transport(
                        "Call to Client->timed_read( $timeout ) successfully returned data", INTERNAL );
        }

        return $val;

}
564
565
566 1;