]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/perlmods/OpenSRF/Transport/SlimJabber/Client.pm
regex to determine unix-ocity for the socket
[OpenSRF.git] / src / perlmods / OpenSRF / Transport / SlimJabber / Client.pm
1 package OpenSRF::Transport::SlimJabber::Client;
2 use strict; use warnings;
3 use OpenSRF::EX;
4 use base qw( OpenSRF );
5 use OpenSRF::Utils::Logger qw(:level);
6 use OpenSRF::Utils::Config;
7 use Time::HiRes qw(ualarm);
8
9 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
10 use IO::Socket::INET;
11 use IO::Socket::UNIX;
12
13 =head1 Description
14
15 OpenSRF::Transport::SlimJabber::Client
16
17 Home-brewed, slimmed-down Jabber connection agent. Supports SSL connections
18 via the following config file options:
19
20   transport->server->sslport # the ssl port
21   transport->server->ssl  # is this ssl?
22
23 =cut
24
25 my $logger = "OpenSRF::Utils::Logger";
26
# Destructor hook: hands off to disconnect() so the stream is closed
# when the object is reclaimed.
sub DESTROY {
    my ($self) = @_;
    $self->disconnect();
}
31
# Ends the session on the handle stored in $self->{_socket}: when that
# handle exists and reports itself connected, the closing
# </stream:stream> tag is written and the handle is closed.  Otherwise
# nothing happens.
sub disconnect {
    my ($self) = @_;
    my $handle = $self->{_socket};
    if ( $handle && $handle->connected() ) {
        print {$handle} "</stream:stream>";
        close $handle;
    }
}
40
41
=head2 new()

Creates a new Client object.  The constructor only records the connection
settings on the object; it does not open a connection.

%params (all five are required -- if any one is missing or false the
constructor returns undef; any other keys are ignored here):

        port
        username
        resource
        password
        host

=cut

sub new {
    my $class  = shift;
    my %params = @_;

    # Allow both Class->new(...) and $object->new(...).
    $class = ref( $class ) || $class;

    # Required settings, checked in this order; the first missing one
    # aborts construction.
    my $port     = $params{'port'}     or return undef;
    my $username = $params{'username'} or return undef;
    my $resource = $params{'resource'} or return undef;
    my $password = $params{'password'} or return undef;
    my $host     = $params{'host'}     or return undef;

    my $self = bless {}, $class;

    # The full JID is user@host/resource.
    $self->jid( "$username\@$host\/$resource" );
    $self->host( $host );
    $self->port( $port );
    $self->username( $username );
    $self->resource( $resource );
    $self->password( $password );

    # Holds bytes read from the socket that belong to a later message.
    $self->{temp_buffer} = "";

    $logger->transport(
        "Creating Client instance: $host:$port, $username, $resource",
        $logger->INFO );

    return $self;
}
88
# Discards everything pending for this client: empties the in-memory
# temp_buffer, then drains the socket by reading in non-blocking mode
# until sysread returns a false value, and finally switches the socket
# back to blocking mode.
sub buffer_reset {
    my ($self) = @_;

    $self->{temp_buffer} = "";

    my $sock = $self->{_socket};
    set_nonblock( $sock );

    # Read and throw away 4096 bytes at a time until nothing is left.
    my $scratch = "";
    1 while sysread( $sock, $scratch, 4096 );

    set_block( $sock );
}
101 # -------------------------------------------------
102
=head2 gather()

Collects whatever Jabber data is currently waiting by calling
process() with a timeout of 0, and returns what process() returns.

=cut

sub gather {
    my ($self) = @_;
    return $self->process( 0 );
}
112
113 # -------------------------------------------------
114
=head2 listen()

Opens the Unix socket named by $self->unix_sock(), then loops forever:
each pass calls process(-1) and prints the value it returns to that
socket.  Does not return unless an error occurs.

Throws an OpenSRF::EX::Socket if the Unix socket cannot be opened, and an
OpenSRF::EX::Jabber if process() ever returns undef.

=cut

sub listen {
    my $self = shift;

    my $sock   = $self->unix_sock();
    my $socket = IO::Socket::UNIX->new( Peer => $sock );

    # IO::Socket::UNIX->new returns undef on failure, so the handle itself
    # must be tested before a method is called on it; otherwise the caller
    # gets a generic "Can't call method" death instead of this exception.
    unless ( $socket and $socket->connected ) {
        OpenSRF::EX::Socket->throw(
            "Unable to connect to UnixServer: socket-file: $sock \n :=> $! " );
    }

    $logger->transport( "Unix Socket opened by Listener", INTERNAL );

    while (1) {
        my $o = $self->process( -1 );

        # Check for failure before interpolating $o into the log line.
        if ( !defined( $o ) ) {
            OpenSRF::EX::Jabber->throw( "Listen Loop failed at 'process()'" );
        }
        $logger->transport( "Call to process() in listener returned:\n $o", INTERNAL );

        print $socket $o;
    }

    # Sanity guard: the loop above has no normal exit.
    OpenSRF::EX::Socket->throw( "How did we get here?!?!" );
}
144
# Adds O_NONBLOCK to the file status flags of the given handle.
# Returns the flags as they were before the change.
# Dies if the flags cannot be read or written.
sub set_nonblock {
    my ($handle) = @_;

    my $original = fcntl( $handle, F_GETFL, 0 );
    die "Can't get flags for the socket: $!\n" unless $original;

    $logger->transport( "Setting NONBLOCK: original flags: $original", INTERNAL );

    die "Can't set flags for the socket: $!\n"
        unless fcntl( $handle, F_SETFL, $original | O_NONBLOCK );

    return $original;
}
157
# Restores previously saved file status flags on a handle.
#
#   $fh    - the handle to modify
#   $flags - flags to restore (e.g. the value returned by set_nonblock)
#
# When $flags is undefined there is nothing to restore, so the call is a
# no-op.  The check comes first so that an undefined value is never
# interpolated into the log message.
sub reset_fl {
    my ( $fh, $flags ) = @_;

    return unless defined $flags;

    $logger->transport( "Restoring BLOCK: to flags $flags", INTERNAL );
    fcntl( $fh, F_SETFL, $flags );
}
164
# Clears O_NONBLOCK from the file status flags of the given handle,
# leaving every other flag as it was.
# Dies if the flags cannot be read or written.
sub set_block {
    my ($handle) = @_;

    my $current = fcntl( $handle, F_GETFL, 0 );
    die "Can't get flags for the socket: $!\n" unless $current;

    # Same flags with only the non-blocking bit masked out.
    my $blocking = $current & ~O_NONBLOCK;

    fcntl( $handle, F_SETFL, $blocking )
        or die "Can't set flags for the socket: $!\n";
}
176
177
# Splits the first complete element named $tag off the front of $buffer.
#
# Two forms are recognised, tried in this order:
#   * everything up to and including the first closing tag </$tag>
#   * a self-closing <$tag .../> at the very start of the buffer
#
# Returns ( $element, $remainder ) on success and an empty list when
# $tag is undefined/empty or no complete element is present.  The tag is
# quoted with \Q..\E so that characters in the tag name are never treated
# as regex syntax.
sub _split_leading_element {
    my ( $buffer, $tag ) = @_;

    return unless defined $tag and length $tag;

    if ( $buffer =~ /^(.*?<\/\Q$tag\E>)(.*)/s ) {
        return ( $1, $2 );
    }
    if ( $buffer =~ /^(<\Q$tag\E[^>]*?\/>)(.*)/s ) {
        return ( $1, $2 );
    }
    return;
}

# timed_read( $timeout )
#
# Returns the next complete top-level XML element from the connection as
# a string.  The element name is taken from the first tag seen in the
# data; the element is complete once its closing tag (or a self-closing
# form) has been read.
#
#   $timeout undefined - no alarm is armed and the socket mode is left alone
#   $timeout false (0) - the socket is switched to non-blocking mode first
#   $timeout > 0       - the read is guarded by alarm(); a value between
#                        0 and 1 is raised to 1
#
# Bytes that follow the returned element are kept in $self->{temp_buffer}
# and are examined first on the next call, before the socket is touched.
#
# Returns the element text when one was completed and "" otherwise.
# Throws OpenSRF::EX::Socket when there is no connected socket.
#
# NOTE(review): the final `return undef` depends on $@ still being set
# after the nested evals; since the outer eval completes normally that
# path looks unreachable -- confirm before relying on an undef return.
# NOTE(review): data accumulated in $xml for an element that never
# completes is not written back to temp_buffer -- confirm that dropping
# it is intended.
sub timed_read {
    my ( $self, $timeout ) = @_;

    $logger->transport( "Temp Buffer Contained: \n". $self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};
    if( $self->can( "app" ) ) {
        $logger->transport( "timed_read called for ".$self->app.", I am: ".$self->jid, INTERNAL );
    }

    # See if there is a complete message in the temp_buffer
    # that we can return without reading from the socket.
    if( $self->{temp_buffer} ) {
        my $buffer = $self->{temp_buffer};

        my ($tag) = ( $buffer =~ /<([^\s\?\>]+)/ );
        $logger->transport( "Using tag: $tag  ", INTERNAL ) if defined $tag;

        if( my ( $element, $rest ) = _split_leading_element( $buffer, $tag ) ) {
            # Hand back exactly one element; everything after it stays
            # queued so it is delivered once, on a later call.
            $self->{temp_buffer} = $rest;
            $logger->transport( "completed read with $element", INTERNAL );
            return $element;
        }
        # Otherwise temp_buffer is left as it was and is prepended to
        # whatever the socket read below produces.
    }

    my $fh = $self->{_socket};

    unless( $fh and $fh->connected ) {
        OpenSRF::EX::Socket->throw( "Attempted read on closed socket", ERROR );
    }

    $logger->transport( "Temp Buffer After first attempt: \n ".$self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};

    # A defined-but-false timeout requests a non-blocking read.  $flags
    # doubles as the marker that blocking mode must be restored at the end.
    my $flags;
    if( defined($timeout) && !$timeout ) {
        $flags = set_nonblock( $fh );
    }

    $timeout ||= 0;
    $logger->transport( "Calling timed_read with timetout $timeout", INTERNAL );

    my $complete   = 0;
    my $first_read = 1;
    my $xml        = '';
    eval {
        my $tag;
        eval {
            no warnings;
            local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required

            # alarm() takes whole seconds; round a fractional timeout
            # below 1 up so it does not become "no alarm".
            my $alarm_timeout = $timeout;
            if( $alarm_timeout > 0 and $alarm_timeout < 1 ) {
                $alarm_timeout = 1;
            }
            alarm $alarm_timeout;

            do {
                # Start from any leftover bytes, then append new data.
                my $buffer = $self->{temp_buffer};
                $self->{temp_buffer} = '';

                my $t_buf     = "";
                my $read_size = 1024;
                while( my $n = sysread( $fh, $t_buf, $read_size ) ) {
                    $buffer .= $t_buf;
                    if( $n < $read_size ) {
                        # Short read: nothing more is waiting right now.
                        set_block( $fh );
                        last;
                    }
                    # Full read: see if there is any more data to grab,
                    # without blocking if there is not.
                    set_nonblock( $fh );
                }

                $logger->transport(" Got [$buffer] from the socket", INTERNAL);

                if( $first_read ) {
                    $logger->transport(" First read Buffer\n [$buffer]", INTERNAL);
                    $first_read = 0;
                }

                # Keep looking for the element name until a chunk actually
                # contains a tag (the first chunk may be whitespace only).
                unless( defined $tag ) {
                    ($tag) = ( $buffer =~ /<([^\s\?\>]+)/ );
                    $logger->transport( "Using tag: $tag  ", INTERNAL ) if defined $tag;
                }

                if( my ( $element, $rest ) = _split_leading_element( $buffer, $tag ) ) {
                    # Keep only the finished element in the result and
                    # queue the rest for the next call.
                    $buffer = $element;
                    $self->{temp_buffer} = $rest;
                    $complete++;
                    $logger->transport( "completed read with $buffer", INTERNAL );
                }

                $xml .= $buffer;

            } while( !$complete && $xml );
            alarm(0);
        };
        alarm(0);
    };

    $logger->transport( "XML Read: $xml", INTERNAL );
    set_block( $fh ) if defined $flags;

    if( $complete ) {
        return $xml;
    }
    if( $@ ) {
        return undef;
    }
    return "";
}
318
319
320 # -------------------------------------------------
321
# Reports whether this client currently holds a connected socket.
# Returns 1 when $self->{_socket} is set and its connected() is true,
# 0 otherwise.
sub tcp_connected {
    my ($self) = @_;
    my $sock = $self->{_socket};
    return ( $sock && $sock->connected ) ? 1 : 0;
}
328
# Combined getter/setter for the login password.
# A true argument is stored; a missing or false argument leaves the
# stored value alone.  Always returns the stored value.
sub password {
    my $self      = shift;
    my $new_value = shift;
    if ($new_value) {
        $self->{'oils:password'} = $new_value;
    }
    return $self->{'oils:password'};
}
334
335 # -------------------------------------------------
336
# Combined getter/setter for the login user name.
# A true argument is stored; a missing or false argument leaves the
# stored value alone.  Always returns the stored value.
sub username {
    my $self      = shift;
    my $new_value = shift;
    if ($new_value) {
        $self->{'oils:username'} = $new_value;
    }
    return $self->{'oils:username'};
}
342         
343 # -------------------------------------------------
344
# Combined getter/setter for the Jabber resource name.
# A true argument is stored; a missing or false argument leaves the
# stored value alone.  Always returns the stored value.
sub resource {
    my $self      = shift;
    my $new_value = shift;
    if ($new_value) {
        $self->{'oils:resource'} = $new_value;
    }
    return $self->{'oils:resource'};
}
350
351 # -------------------------------------------------
352
# Combined getter/setter for the full Jabber ID.
# A true argument is stored; a missing or false argument leaves the
# stored value alone.  Always returns the stored value.
sub jid {
    my $self      = shift;
    my $new_value = shift;
    if ($new_value) {
        $self->{'oils:jid'} = $new_value;
    }
    return $self->{'oils:jid'};
}
358
# Combined getter/setter for the server port (or socket path).
# A true argument is stored; a missing or false argument leaves the
# stored value alone.  Always returns the stored value.
sub port {
    my $self      = shift;
    my $new_value = shift;
    if ($new_value) {
        $self->{'oils:port'} = $new_value;
    }
    return $self->{'oils:port'};
}
364
# Combined getter/setter for the server host name.
# A true argument is stored; a missing or false argument leaves the
# stored value alone.  Always returns the stored value.
sub host {
    my $self      = shift;
    my $new_value = shift;
    if ($new_value) {
        $self->{'oils:host'} = $new_value;
    }
    return $self->{'oils:host'};
}
370
371 # -------------------------------------------------
372
=head2 send()

        Sends a Jabber message over the client's socket.

        %params:
                to              - The JID of the recipient (required)
                body            - The body of the message (required)
                thread          - The Jabber thread (optional)
                router_command  - passed to the message wrapper (optional)
                router_class    - passed to the message wrapper (optional)

        Returns undef without sending when 'to' or 'body' is missing or
        false.  Throws an OpenSRF::EX::Jabber when the socket is absent
        or no longer connected.

=cut

sub send {
    my ( $self, %params ) = @_;

    # Required fields first; give up quietly if either is absent.
    my $to   = $params{'to'}   or return undef;
    my $body = $params{'body'} or return undef;

    # Optional fields default to the empty string.
    my $thread         = $params{'thread'}         || "";
    my $router_command = $params{'router_command'} || "";
    my $router_class   = $params{'router_class'}   || "";

    my $msg = OpenSRF::Transport::SlimJabber::MessageWrapper->new;

    $msg->setTo( $to );
    if ($thread) {
        $msg->setThread( $thread );
    }
    $msg->setBody( $body );
    $msg->set_router_command( $router_command );
    $msg->set_router_class( $router_class );

    $logger->transport(
        "JabberClient Sending message to $to with thread $thread and body: \n$body",
        INTERNAL );

    my $wire = $self->{_socket};
    if ( !( $wire and $wire->connected ) ) {
        throw OpenSRF::EX::Jabber ("No longer connected to jabber server");
    }
    print {$wire} $msg->toString;

    $logger->transport(
        "JabberClient Sent message to $to with thread $thread and body: \n$body",
        INTERNAL );
}
415
416
=head2 initialize()

Connect to the server and log in.

Up to five connection attempts are made, three seconds apart.  A port
made up only of digits selects a TCP connection (IO::Socket::INET);
anything else is treated as the path of a Unix socket
(IO::Socket::UNIX).  After connecting, the stream header and a
jabber:iq:auth login are sent; the resource sent to the server is the
configured resource with "_<pid>" appended.

Returns the client object on success.

Throws an OpenSRF::EX::Jabber if we cannot connect to the server or if
the authentication fails.

=cut

sub initialize {

    my $self = shift;

    my $jid      = $self->jid;
    my $host     = $self->host;
    my $port     = $self->port;
    my $username = $self->username;
    my $resource = $self->resource;
    my $password = $self->password;

    # NOTE(review): the values below are placed into the XML as-is, with
    # no entity escaping -- confirm that credentials never contain
    # '&' or '<'.
    my $stream =
        "<stream:stream to='$host' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>\n";

    my $auth = join( "\n",
        "<iq id='123' type='set'>",
        "<query xmlns='jabber:iq:auth'>",
        "<username>$username</username>",
        "<password>$password</password>",
        "<resource>" . $resource . "_" . $$ . "</resource>",
        "</query>",
        "</iq>",
        "" );    # trailing empty item yields the final newline

    my $sock_type = 'IO::Socket::INET';

    # if port is a string, then we're connecting to a UNIX socket
    unless( $port =~ /^\d+$/ ) {
        $sock_type = 'IO::Socket::UNIX';
    }

    # --- 5 tries to connect to the jabber server
    my $max_attempts  = 5;
    my $connect_error = '';
    my $socket;
    for my $attempt ( 1 .. $max_attempts ) {
        $logger->transport( "$jid: Attempting to connect to server...$host:$port (Try # $attempt)", WARN );

        # One argument list serves both socket classes: PeerHost/PeerPort/
        # Proto are for the INET case, Peer is for the UNIX case.
        $socket = $sock_type->new( PeerHost => $host,
                                   PeerPort => $port,
                                   Peer     => $port,
                                   Proto    => 'tcp' );
        my $connected = ( $socket and $socket->connected );

        # Remember the OS error now, before later calls can overwrite $!.
        $connect_error = "$!" unless $connected;

        $logger->transport( "$jid: $attempt connect attempt to $host:$port", WARN );
        last if $connected;

        # No point waiting after the final attempt.
        sleep 3 if $attempt < $max_attempts;
    }

    unless ( $socket and $socket->connected ) {
        OpenSRF::EX::Jabber->throw( " Could not connect to Jabber server: $connect_error" );
    }

    $logger->transport( "Logging into jabber as $jid " .
            "from " . ref( $self ), DEBUG );

    print $socket $stream;

    # Read the server's reply to the stream header, giving up after 3
    # seconds.  The reply is only logged.
    my $buffer;
    eval {
        eval {
            local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
            alarm 3;
            sysread( $socket, $buffer, 4096 );
            $logger->transport( "Login buffer 1: " . ( defined $buffer ? $buffer : '' ), INTERNAL );
            alarm(0);
        };
        alarm(0);
    };

    print $socket $auth;

    if( $socket and $socket->connected() ) {
        $self->{_socket} = $socket;
    } else {
        OpenSRF::EX::Jabber->throw( " ** Unable to connect to Jabber server", ERROR );
    }

    # The answer to the login request; success is recognised by a
    # type='result' attribute in it.
    $buffer = $self->timed_read(10);

    if( $buffer ) { $logger->transport( "Login buffer 2: $buffer", INTERNAL ); }

    if( $buffer and $buffer =~ /type=["\']result["\']/ ) {
        $logger->transport( " * $jid: Jabber authenticated and connected", DEBUG );
    } else {
        if( !$buffer ) { $buffer = " "; }
        $socket->close;
        OpenSRF::EX::Jabber->throw( " * $jid: Unable to authenticate: $buffer", ERROR );
    }

    return $self;
}
518
# Class method: builds a client for $app with new(), runs initialize()
# on it, and passes the result to the class's peer_handle().
sub construct {
    my $class = shift;
    my $app   = shift;

    $logger->transport("Constructing new Jabber connection for $app, my class $class", INTERNAL );

    my $client = $class->new( $app );
    $class->peer_handle( $client->initialize() );
}
525
# process( $timeout )
#
# Reads one message via timed_read() and returns it.
#
#   $timeout - passed on to timed_read(); a missing or false value becomes
#              0, and -1 is turned into undef before the call.
#
# Returns whatever timed_read() returned.
#
# Throws OpenSRF::EX::JabberDisconnected when the client has no connected
# socket, and OpenSRF::EX::Jabber when timed_read() returns undef.
sub process {

    my( $self, $timeout ) = @_;

    $timeout ||= 0;
    undef $timeout if ( $timeout == -1 );

    # Test the handle itself first: a client that never connected has no
    # _socket at all, and should get the same exception as a dropped one.
    my $socket = $self->{_socket};
    unless( $socket and $socket->connected ) {
        OpenSRF::EX::JabberDisconnected->throw(
          "This JabberClient instance is no longer connected to the server " .
          $self->username . " : " . $self->resource, ERROR );
    }

    my $val = $self->timed_read( $timeout );

    # Only for the log/exception text below.
    $timeout = "FOREVER" unless ( defined $timeout );

    if ( ! defined( $val ) ) {
        OpenSRF::EX::Jabber->throw(
          "Call to Client->timed_read( $timeout ) failed", ERROR );
    } elsif ( ! $val ) {
        $logger->transport(
            "Call to Client->timed_read( $timeout ) returned 0 bytes of data", DEBUG );
    } else {
        $logger->transport(
            "Call to Client->timed_read( $timeout ) successfully returned data", INTERNAL );
    }

    return $val;

}
557
558
559 1;