]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/perlmods/OpenSRF/Transport/SlimJabber/Client.pm
making to work in new environment, added 'system_client' login for system
[OpenSRF.git] / src / perlmods / OpenSRF / Transport / SlimJabber / Client.pm
1 package OpenSRF::Transport::SlimJabber::Client;
2 use strict; use warnings;
3 use OpenSRF::EX;
4 #use Net::Jabber qw( Client );
5 use base qw( OpenSRF );
6 use OpenSRF::Utils::Logger qw(:level);
7 use Time::HiRes qw(ualarm);
8
9 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
10 use IO::Socket::INET;
11
12 =head1 Description
13
14 OpenSRF::Transport::SlimJabber::Client
15
16 Home-brewed slimmed down jabber connection agent. Supports SSL connections
17 with a config file options:
18
19   transport->server->sslport # the ssl port
20   transport->server->ssl  # is this ssl?
21
22 =cut
23
24 my $logger = "OpenSRF::Utils::Logger";
25
26 sub DESTROY{
27         my $self = shift;
28         my $socket = $self->{_socket};
29         if( $socket and $socket->connected() ) {
30                 print $socket "</stream:stream>";
31                 close( $socket );
32         }
33 };
34
35
36 =head2 new()
37
38 Creates a new Client object.
39
40 debug and log_file are not required if you don't care to log the activity, 
41 however all other parameters are.
42
43 %params:
44
45         username
46         resource        
47         password
48         debug    
49         log_file
50
51 =cut
52
53 sub new {
54
55         my( $class, %params ) = @_;
56
57         $class = ref( $class ) || $class;
58
59         my $conf = OpenSRF::Utils::Config->current;
60
61         my $host;
62
63         my $port                        = $conf->transport->server->port;
64         my $username    = $params{'username'}   || return undef;
65         my $resource    = $params{'resource'}   || return undef;
66         my $password    = $params{'password'}   || return undef;
67
68         if( $params{host} ) {
69                 $host           = $params{host};
70         } else { 
71                 $host   = $conf->transport->server->system_primary;
72         }
73
74         my $jid = "$username\@$host\/$resource";
75
76         my $self = bless {} => $class;
77
78         $self->jid( $jid );
79         $self->host( $host );
80         $self->port( $port );
81         $self->username( $username );
82         $self->resource( $resource );
83         $self->password( $password );
84         $self->{temp_buffer} = "";
85
86         $logger->transport( "Creating Client instance: $host:$port, $username, $resource",
87                         $logger->INFO );
88
89         return $self;
90 }
91
92 # clears the tmp buffer as well as the TCP buffer
93 sub buffer_reset { 
94
95         my $self = shift;
96         $self->{temp_buffer} = ""; 
97
98         my $fh = $self->{_socket};
99         set_nonblock( $fh );
100         my $t_buf = "";
101         while( sysread( $fh, $t_buf, 4096 ) ) {} 
102         set_block( $fh );
103 }
104 # -------------------------------------------------
105
106 =head2 gather()
107
108 Gathers all Jabber messages sitting in the collection queue 
109 and hands them each to their respective callbacks.  This call
110 does not block (calls Process(0))
111
112 =cut
113
114 sub gather { my $self = shift; $self->process( 0 ); }
115
116 # -------------------------------------------------
117
118 =head2 listen()
119
120 Blocks and gathers incoming messages as they arrive.  Does not return
121 unless an error occurs.
122
123 Throws an OpenSRF::EX::JabberException if the call to Process ever fails.
124
125 =cut
126 sub listen {
127         my $self = shift;
128
129         my $sock = $self->unix_sock();
130         my $socket = IO::Socket::UNIX->new( Peer => $sock  );
131         $logger->transport( "Unix Socket opened by Listener", INTERNAL );
132         
133         throw OpenSRF::EX::Socket( "Unable to connect to UnixServer: socket-file: $sock \n :=> $! " )
134                 unless ($socket->connected);
135                 
136         while(1) {
137                 my $o = $self->process( -1 );
138                 $logger->transport( "Call to process() in listener returned:\n $o", INTERNAL );
139                 if( ! defined( $o ) ) {
140                         throw OpenSRF::EX::Jabber( "Listen Loop failed at 'process()'" );
141                 }
142                 print $socket $o;
143
144         }
145         throw OpenSRF::EX::Socket( "How did we get here?!?!" );
146 }
147
148 sub set_nonblock {
149         my $fh = shift;
150         my      $flags = fcntl($fh, F_GETFL, 0)
151                 or die "Can't get flags for the socket: $!\n";
152
153         $logger->transport( "Setting NONBLOCK: original flags: $flags", INTERNAL );
154
155         fcntl($fh, F_SETFL, $flags | O_NONBLOCK)
156                 or die "Can't set flags for the socket: $!\n";
157
158         return $flags;
159 }
160
161 sub reset_fl {
162         my $fh = shift;
163         my $flags = shift;
164         $logger->transport( "Restoring BLOCK: to flags $flags", INTERNAL );
165         fcntl($fh, F_SETFL, $flags) if defined $flags;
166 }
167
168 sub set_block {
169         my $fh = shift;
170
171         my      $flags = fcntl($fh, F_GETFL, 0)
172                 or die "Can't get flags for the socket: $!\n";
173
174         $flags &= ~O_NONBLOCK;
175
176         fcntl($fh, F_SETFL, $flags)
177                 or die "Can't set flags for the socket: $!\n";
178 }
179
180
181 sub timed_read {
182         my ($self, $timeout) = @_;
183
184         $logger->transport( "Temp Buffer Contained: \n". $self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};
185         if( $self->can( "app" ) ) {
186                 $logger->transport( "timed_read called for ".$self->app.", I am: ".$self->jid, INTERNAL );
187         }
188
189         # See if there is a complete message in the temp_buffer
190         # that we can return
191         if( $self->{temp_buffer} ) {
192                 my $buffer = $self->{temp_buffer};
193                 my $complete = 0;
194                 $self->{temp_buffer} = '';
195
196                 my ($tag) = ($buffer =~ /<([^\s\?\>]+)/o);
197                 $logger->transport("Using tag: $tag  ", INTERNAL);
198
199                 if ( $buffer =~ /^(.*?<\/$tag>)(.*)/s) {
200                         $buffer = $1;
201                         $self->{temp_buffer} = $2;
202                         $complete++;
203                         $logger->transport( "completed read with $buffer", INTERNAL );
204                 } elsif ( $buffer =~ /^<$tag[^>]*?\/>(.*)/) {
205                         $self->{temp_buffer} = $1;
206                         $complete++;
207                         $logger->transport( "completed read with $buffer", INTERNAL );
208                 } else {
209                         $self->{temp_buffer} = $buffer;
210                 }
211                                 
212                 if( $buffer and $complete ) {
213                         return $buffer;
214                 }
215
216         }
217         ############
218
219         my $fh = $self->{_socket};
220
221         unless( $fh and $fh->connected ) {
222                 throw OpenSRF::EX::Socket ("Attempted read on closed socket", ERROR );
223         }
224
225         $logger->transport( "Temp Buffer After first attempt: \n ".$self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};
226
227         my $flags;
228         if (defined($timeout) && !$timeout) {
229                 $flags = set_nonblock( $fh );
230         }
231
232         $timeout ||= 0;
233         $logger->transport( "Calling timed_read with timetout $timeout", INTERNAL );
234
235
236         my $complete = 0;
237         my $first_read = 1;
238         my $xml = '';
239         eval {
240                 my $tag = '';
241                 eval {
242                         no warnings;
243                         local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
244
245                         # alarm needs a number greater => 1.
246                         my $alarm_timeout = $timeout;
247                         if( $alarm_timeout > 0 and $alarm_timeout < 1 ) {
248                                 $alarm_timeout = 1;
249                         }
250                         alarm $alarm_timeout;
251                         do {    
252
253                                 my $buffer = $self->{temp_buffer};
254                                 $self->{temp_buffer} = '';
255                                 #####
256
257                                 my $ff =  fcntl($fh, F_GETFL, 0);
258                                 if ($ff == ($ff | O_NONBLOCK) and $timeout > 0 ) {
259                                         #throw OpenSRF::EX::ERROR ("File flags are set to NONBLOCK but timeout is $timeout", ERROR );
260                                 }
261
262                                 my $t_buf = "";
263                                 my $read_size = 1024;
264                                 my $f = 0;
265                                 while( my $n = sysread( $fh, $t_buf, $read_size ) ) {
266                                         $buffer .= $t_buf;
267                                         if( $n < $read_size ) {
268                                                 #reset_fl( $fh, $f ) if $f;
269                                                 set_block( $fh );
270                                                 last;
271                                         }
272                                         # see if there is any more data to grab...
273                                         $f = set_nonblock( $fh );
274                                 }
275
276                                 #sysread($fh, $buffer, 2048, length($buffer) );
277                                 #sysread( $fh, $t_buf, 2048 );
278                                 #$buffer .= $t_buf;
279
280                                 #####
281                                 $logger->transport(" Got [$buffer] from the socket", INTERNAL);
282
283                                 if ($first_read) {
284                                         $logger->transport(" First read Buffer\n [$buffer]", INTERNAL);
285                                         ($tag) = ($buffer =~ /<([^\s\?\>]+){1}/o);
286                                         $first_read--;
287                                         $logger->transport("Using tag: $tag  ", INTERNAL);
288                                 }
289
290                                 if (!$first_read && $buffer =~ /^(.*?<\/$tag>){1}(.*)/s) {
291                                         $buffer = $1;
292                                         $self->{temp_buffer} = $2;
293                                         $complete++;
294                                         $logger->transport( "completed read with $buffer", INTERNAL );
295                                 } elsif (!$first_read && $buffer =~ /^<$tag[^>]*?\/>(.*)/) {
296                                         $self->{temp_buffer} = $1;
297                                         $complete++;
298                                         $logger->transport( "completed read with $buffer", INTERNAL );
299                                 }
300                                 
301                                 $xml .= $buffer;
302
303                         } while (!$complete && $xml);
304                         alarm(0);
305                 };
306                 alarm(0);
307         };
308
309         $logger->transport( "XML Read: $xml", INTERNAL );
310         #reset_fl( $fh, $flags) if defined $flags;
311         set_block( $fh ) if defined $flags;
312
313         if ($complete) {
314                 return $xml;
315         }
316         if( $@ ) {
317                 return undef;
318         }
319         return "";
320 }
321
322
323 # -------------------------------------------------
324
325 sub tcp_connected {
326
327         my $self = shift;
328         return 1 if ($self->{_socket} and $self->{_socket}->connected);
329         return 0;
330 }
331
332 sub password {
333         my( $self, $password ) = @_;
334         $self->{'oils:password'} = $password if $password;
335         return $self->{'oils:password'};
336 }
337
338 # -------------------------------------------------
339
340 sub username {
341         my( $self, $username ) = @_;
342         $self->{'oils:username'} = $username if $username;
343         return $self->{'oils:username'};
344 }
345         
346 # -------------------------------------------------
347
348 sub resource {
349         my( $self, $resource ) = @_;
350         $self->{'oils:resource'} = $resource if $resource;
351         return $self->{'oils:resource'};
352 }
353
354 # -------------------------------------------------
355
356 sub jid {
357         my( $self, $jid ) = @_;
358         $self->{'oils:jid'} = $jid if $jid;
359         return $self->{'oils:jid'};
360 }
361
362 sub port {
363         my( $self, $port ) = @_;
364         $self->{'oils:port'} = $port if $port;
365         return $self->{'oils:port'};
366 }
367
368 sub host {
369         my( $self, $host ) = @_;
370         $self->{'oils:host'} = $host if $host;
371         return $self->{'oils:host'};
372 }
373
374 # -------------------------------------------------
375
376 =head2 send()
377
378         Sends a Jabber message.
379         
380         %params:
381                 to                      - The JID of the recipient
382                 thread  - The Jabber thread
383                 body            - The body of the message
384
385 =cut
386
387 sub send {
388         my $self = shift;
389         my %params = @_;
390
391         my $to = $params{'to'} || return undef;
392         my $body = $params{'body'} || return undef;
393         my $thread = $params{'thread'} || "";
394         my $router_command = $params{'router_command'} || "";
395         my $router_class = $params{'router_class'} || "";
396
397         my $msg = OpenSRF::Transport::SlimJabber::MessageWrapper->new;
398
399         $msg->setTo( $to );
400         $msg->setThread( $thread ) if $thread;
401         $msg->setBody( $body );
402         $msg->set_router_command( $router_command );
403         $msg->set_router_class( $router_class );
404
405
406         $logger->transport( 
407                         "JabberClient Sending message to $to with thread $thread and body: \n$body", INTERNAL );
408
409         my $soc = $self->{_socket};
410         print $soc $msg->toString;
411 }
412
413
414 =head2 inintialize()
415
416 Connect to the server and log in.  
417
418 Throws an OpenSRF::EX::JabberException if we cannot connect
419 to the server or if the authentication fails.
420
421 =cut
422
423 # --- The logging lines have been commented out until we decide 
424 # on which log files we're using.
425
426 sub initialize {
427
428         my $self = shift;
429
430         my $jid         = $self->jid; 
431         my $host        = $self->host; 
432         my $port        = $self->port; 
433         my $username    = $self->username;
434         my $resource    = $self->resource;
435         my $password    = $self->password;
436
437         my $stream = <<"        XML";
438 <stream:stream to='$host' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>
439         XML
440
441         my $auth = <<"  XML";
442 <iq id='123' type='set'>
443 <query xmlns='jabber:iq:auth'>
444 <username>$username</username>
445 <password>$password</password>
446 <resource>$resource</resource>
447 </query>
448 </iq>
449         XML
450
451
452         # --- 5 tries to connect to the jabber server
453         my $socket;
454         for(1..5) {
455                 $logger->transport( "$jid: Attempting to connect to server... (Try # $_)", WARN );
456                 $socket = IO::Socket::INET->new( PeerHost => $host,
457                                                  PeerPort => $port,
458                                                  Proto    => 'tcp' );
459                 last if ( $socket and $socket->connected );
460                 sleep 3;
461         }
462
463         unless ( $socket and $socket->connected ) {
464                 throw OpenSRF::EX::Jabber( " Could not connect to Jabber server: $!" );
465         }
466
467         $logger->transport( "Logging into jabber as $jid " .
468                         "from " . ref( $self ), DEBUG );
469
470         print $socket $stream;
471
472         my $buffer;
473         eval {
474                 eval {
475                         local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
476                         alarm 3;
477                         sysread($socket, $buffer, 4096);
478                         $logger->transport( "Login buffer 1: $buffer", INTERNAL );
479                         alarm(0);
480                 };
481                 alarm(0);
482         };
483
484         print $socket $auth;
485
486         if( $socket and $socket->connected() ) {
487                 $self->{_socket} = $socket;
488         } else {
489                 throw OpenSRF::EX::Jabber( " ** Unable to connect to Jabber server", ERROR );
490         }
491
492
493         $buffer = $self->timed_read(10);
494
495         if( $buffer ) {$logger->transport( "Login buffer 2: $buffer", INTERNAL );}
496
497         if( $buffer and $buffer =~ /type=["\']result["\']/ ) { 
498                 $logger->transport( " * $jid: Jabber authenticated and connected", DEBUG );
499         } else {
500                 if( !$buffer ) { $buffer = " "; }
501                 $socket->close;
502                 throw OpenSRF::EX::Jabber( " * $jid: Unable to authenticate: $buffer", ERROR );
503         }
504
505         return $self;
506 }
507
508 sub construct {
509         my( $class, $app ) = @_;
510         $logger->transport("Constructing new Jabber connection for $app, my class $class", INTERNAL );
511         $class->peer_handle( 
512                         $class->new( $app )->initialize() );
513 }
514
515 sub process {
516
517         my( $self, $timeout ) = @_;
518
519         $timeout ||= 0;
520         undef $timeout if ( $timeout == -1 );
521
522         unless( $self->{_socket}->connected ) {
523                 OpenSRF::EX::JabberDisconnected->throw( 
524                   "This JabberClient instance is no longer connected to the server", ERROR );
525         }
526
527         my $val = $self->timed_read( $timeout );
528
529         $timeout = "FOREVER" unless ( defined $timeout );
530         
531         if ( ! defined( $val ) ) {
532                 OpenSRF::EX::Jabber->throw( 
533                   "Call to Client->timed_read( $timeout ) failed", ERROR );
534         } elsif ( ! $val ) {
535                 $logger->transport( 
536                         "Call to Client->timed_read( $timeout ) returned 0 bytes of data", DEBUG );
537         } elsif ( $val ) {
538                 $logger->transport( 
539                         "Call to Client->timed_read( $timeout ) successfully returned data", INTERNAL );
540         }
541
542         return $val;
543
544 }
545
546
547 1;