1 package OpenSRF::Transport::SlimJabber::Client;
2 use strict; use warnings;
4 use base qw( OpenSRF );
5 use OpenSRF::Utils::Logger qw(:level);
6 use OpenSRF::Utils::Config;
7 use Time::HiRes qw(ualarm);
8 use OpenSRF::Utils::Config;
10 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
16 OpenSRF::Transport::SlimJabber::Client
18 Home-brewed slimmed down jabber connection agent. Supports SSL connections
19 with a config file options:
21 transport->server->sslport # the ssl port
22 transport->server->ssl # is this ssl?
26 my $logger = "OpenSRF::Utils::Logger";
35 my $socket = $self->{_socket};
36 if( $socket and $socket->connected() ) {
37 print $socket "</stream:stream>";
45 Creates a new Client object.
47 debug and log_file are not required if you don't care to log the activity,
48 however all other parameters are.
62 my( $class, %params ) = @_;
64 $class = ref( $class ) || $class;
66 my $port = $params{'port'} || return undef;
67 my $username = $params{'username'} || return undef;
68 my $resource = $params{'resource'} || return undef;
69 my $password = $params{'password'} || return undef;
70 my $host = $params{'host'} || return undef;
72 my $jid = "$username\@$host\/$resource";
74 my $self = bless {} => $class;
79 $self->username( $username );
80 $self->resource( $resource );
81 $self->password( $password );
82 $self->{temp_buffer} = "";
84 $logger->transport( "Creating Client instance: $host:$port, $username, $resource",
90 # clears the tmp buffer as well as the TCP buffer
94 $self->{temp_buffer} = "";
96 my $fh = $self->{_socket};
99 while( sysread( $fh, $t_buf, 4096 ) ) {}
102 # -------------------------------------------------
106 Gathers all Jabber messages sitting in the collection queue
107 and hands them each to their respective callbacks. This call
108 does not block (calls Process(0))
112 sub gather { my $self = shift; $self->process( 0 ); }
114 # -------------------------------------------------
118 Blocks and gathers incoming messages as they arrive. Does not return
119 unless an error occurs.
121 Throws an OpenSRF::EX::JabberException if the call to Process ever fails.
127 my $sock = $self->unix_sock();
128 my $socket = IO::Socket::UNIX->new( Peer => $sock );
129 $logger->transport( "Unix Socket opened by Listener", INTERNAL );
131 throw OpenSRF::EX::Socket( "Unable to connect to UnixServer: socket-file: $sock \n :=> $! " )
132 unless ($socket->connected);
135 my $o = $self->process( -1 );
136 $logger->transport( "Call to process() in listener returned:\n $o", INTERNAL );
137 if( ! defined( $o ) ) {
138 throw OpenSRF::EX::Jabber( "Listen Loop failed at 'process()'" );
143 throw OpenSRF::EX::Socket( "How did we get here?!?!" );
148 my $flags = fcntl($fh, F_GETFL, 0)
149 or die "Can't get flags for the socket: $!\n";
151 $logger->transport( "Setting NONBLOCK: original flags: $flags", INTERNAL );
153 fcntl($fh, F_SETFL, $flags | O_NONBLOCK)
154 or die "Can't set flags for the socket: $!\n";
162 $logger->transport( "Restoring BLOCK: to flags $flags", INTERNAL );
163 fcntl($fh, F_SETFL, $flags) if defined $flags;
169 my $flags = fcntl($fh, F_GETFL, 0)
170 or die "Can't get flags for the socket: $!\n";
172 $flags &= ~O_NONBLOCK;
174 fcntl($fh, F_SETFL, $flags)
175 or die "Can't set flags for the socket: $!\n";
180 my ($self, $timeout) = @_;
182 $logger->transport( "Temp Buffer Contained: \n". $self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};
183 if( $self->can( "app" ) ) {
184 $logger->transport( "timed_read called for ".$self->app.", I am: ".$self->jid, INTERNAL );
187 # See if there is a complete message in the temp_buffer
189 if( $self->{temp_buffer} ) {
190 my $buffer = $self->{temp_buffer};
192 $self->{temp_buffer} = '';
194 my ($tag) = ($buffer =~ /<([^\s\?\>]+)/o);
195 $self->{last_tag} = $tag;
196 $logger->transport("Using tag: $tag ", INTERNAL);
198 if ( $buffer =~ /^(.*?<\/$tag>){1}(.*)/s) {
200 $self->{temp_buffer} = $2;
202 $logger->transport( "completed read with $buffer", INTERNAL );
203 } elsif ( $buffer =~ /^<$tag[^>]*?\/>(.*)/) {
204 $self->{temp_buffer} = $1;
206 $logger->transport( "completed read with $buffer", INTERNAL );
208 $self->{temp_buffer} = $buffer;
211 if( $buffer and $complete ) {
218 my $fh = $self->{_socket};
220 unless( $fh and $fh->connected ) {
221 throw OpenSRF::EX::Socket ("Attempted read on closed socket", ERROR );
224 $logger->transport( "Temp Buffer After first attempt: \n ".$self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};
227 if (defined($timeout) && !$timeout) {
228 $flags = set_nonblock( $fh );
232 $logger->transport( "Calling timed_read with timetout $timeout", INTERNAL );
242 local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
244 # alarm needs a number greater => 1.
245 my $alarm_timeout = $timeout;
246 if( $alarm_timeout > 0 and $alarm_timeout < 1 ) {
249 alarm $alarm_timeout;
252 my $buffer = $self->{temp_buffer};
253 $self->{temp_buffer} = '';
256 # This code is no longer in use
257 #my $ff = fcntl($fh, F_GETFL, 0);
258 #if ($ff == ($ff | O_NONBLOCK) and $timeout > 0 ) {
259 #throw OpenSRF::EX::ERROR ("File flags are set to NONBLOCK but timeout is $timeout", ERROR );
263 my $read_size = 1024; my $f = 0;
264 while( my $n = sysread( $fh, $t_buf, $read_size ) ) {
266 unless( $fh->connected ) {
267 OpenSRF::EX::JabberDisconnected->throw(
268 "Lost jabber client in timed_read()");
271 # XXX Change me to debug/internal at some point, this is for testing...
272 # XXX Found a race condition where reading >= $read_size bytes of data
273 # will fail if the log line below is removed.
274 $logger->info("timed_read() read $n bytes of data");
278 if( $n < $read_size ) {
279 #reset_fl( $fh, $f ) if $f;
283 # see if there is any more data to grab...
284 $f = set_nonblock( $fh );
287 #sysread($fh, $buffer, 2048, length($buffer) );
288 #sysread( $fh, $t_buf, 2048 );
292 $logger->transport(" Got [$buffer] from the socket", INTERNAL);
295 $logger->transport(" First read Buffer\n [$buffer]", INTERNAL);
296 ($tag) = ($buffer =~ /<([^\s\?\>\/]+){1}/o);
297 $self->{last_tag} = $tag;
299 $logger->transport("Using tag: $tag ", INTERNAL);
302 if (!$first_read && $buffer =~ /^(.*?<\/$tag>){1}(.*)/s) {
304 $self->{temp_buffer} = $2;
306 $logger->transport( "completed read with $buffer", INTERNAL );
307 } elsif (!$first_read && $buffer =~ /^<$tag[^>]*?\/>(.*)/) {
308 $self->{temp_buffer} = $1;
310 $logger->transport( "completed read with $buffer", INTERNAL );
315 } while (!$complete && $xml);
321 $logger->transport( "XML Read: $xml", INTERNAL );
322 #reset_fl( $fh, $flags) if defined $flags;
323 set_block( $fh ) if defined $flags;
335 # -------------------------------------------------
340 return 1 if ($self->{_socket} and $self->{_socket}->connected);
345 my( $self, $password ) = @_;
346 $self->{'oils:password'} = $password if $password;
347 return $self->{'oils:password'};
350 # -------------------------------------------------
353 my( $self, $username ) = @_;
354 $self->{'oils:username'} = $username if $username;
355 return $self->{'oils:username'};
358 # -------------------------------------------------
361 my( $self, $resource ) = @_;
362 $self->{'oils:resource'} = $resource if $resource;
363 return $self->{'oils:resource'};
366 # -------------------------------------------------
369 my( $self, $jid ) = @_;
370 $self->{'oils:jid'} = $jid if $jid;
371 return $self->{'oils:jid'};
375 my( $self, $port ) = @_;
376 $self->{'oils:port'} = $port if $port;
377 return $self->{'oils:port'};
381 my( $self, $host ) = @_;
382 $self->{'oils:host'} = $host if $host;
383 return $self->{'oils:host'};
386 # -------------------------------------------------
390 Sends a Jabber message.
393 to - The JID of the recipient
394 thread - The Jabber thread
395 body - The body of the message
403 my $to = $params{'to'} || return undef;
404 my $body = $params{'body'} || return undef;
405 my $thread = $params{'thread'} || "";
406 my $router_command = $params{'router_command'} || "";
407 my $router_class = $params{'router_class'} || "";
409 my $msg = OpenSRF::Transport::SlimJabber::MessageWrapper->new;
412 $msg->setThread( $thread ) if $thread;
413 $msg->setBody( $body );
414 $msg->set_router_command( $router_command );
415 $msg->set_router_class( $router_class );
416 $msg->set_osrf_xid($logger->get_osrf_xid);
419 "JabberClient Sending message to $to with thread $thread and body: \n$body", INTERNAL );
421 my $soc = $self->{_socket};
422 unless( $soc and $soc->connected ) {
423 throw OpenSRF::EX::Jabber ("No longer connected to jabber server");
425 print $soc $msg->toString;
428 "JabberClient Sent message to $to with thread $thread and body: \n$body", INTERNAL );
434 Connect to the server and log in.
436 Throws an OpenSRF::EX::JabberException if we cannot connect
437 to the server or if the authentication fails.
441 # --- The logging lines have been commented out until we decide
442 # on which log files we're using.
448 my $jid = $self->jid;
449 my $host = $self->host;
450 my $port = $self->port;
451 my $username = $self->username;
452 my $resource = $self->resource;
453 my $password = $self->password;
455 my $stream = <<" XML";
456 <stream:stream to='$host' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>
459 my $conf = OpenSRF::Utils::Config->current;
461 if(!$conf->bootstrap->router_name && $username eq "router") {
466 <iq id='123' type='set'>
467 <query xmlns='jabber:iq:auth'>
468 <username>$username</username>
469 <password>$password</password>
470 <resource>${resource}$tail</resource>
475 my $sock_type = 'IO::Socket::INET';
477 # if port is a string, then we're connecting to a UNIX socket
478 unless( $port =~ /^\d+$/ ) {
479 $sock_type = 'IO::Socket::UNIX';
482 # --- 5 tries to connect to the jabber server
485 $socket = $sock_type->new( PeerHost => $host,
489 $logger->debug( "$jid: $_ connect attempt to $host:$port");
490 last if ( $socket and $socket->connected );
491 $logger->warn( "$jid: Failed to connect to server...$host:$port (Try # $_)");
495 unless ( $socket and $socket->connected ) {
496 throw OpenSRF::EX::Jabber( " Could not connect to Jabber server: $!" );
499 $logger->transport( "Logging into jabber as $jid " .
500 "from " . ref( $self ), DEBUG );
502 print $socket $stream;
507 local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
509 sysread($socket, $buffer, 4096);
510 $logger->transport( "Login buffer 1: $buffer", INTERNAL );
518 if( $socket and $socket->connected() ) {
519 $self->{_socket} = $socket;
521 throw OpenSRF::EX::Jabber( " ** Unable to connect to Jabber server", ERROR );
525 $buffer = $self->timed_read(10);
527 if( $buffer ) {$logger->transport( "Login buffer 2: $buffer", INTERNAL );}
529 if( $buffer and $buffer =~ /type=["\']result["\']/ ) {
530 $logger->transport( " * $jid: Jabber authenticated and connected", DEBUG );
532 if( !$buffer ) { $buffer = " "; }
534 throw OpenSRF::EX::Jabber( " * $jid: Unable to authenticate: $buffer", ERROR );
541 my( $class, $app ) = @_;
542 $logger->transport("Constructing new Jabber connection for $app, my class $class", INTERNAL );
544 $class->new( $app )->initialize() );
549 my( $self, $timeout ) = @_;
552 undef $timeout if ( $timeout == -1 );
554 unless( $self->{_socket}->connected ) {
555 OpenSRF::EX::JabberDisconnected->throw(
556 "This JabberClient instance is no longer connected to the server " .
557 $self->username . " : " . $self->resource, ERROR );
560 my $val = $self->timed_read( $timeout );
562 $timeout = "FOREVER" unless ( defined $timeout );
564 if ( ! defined( $val ) ) {
565 OpenSRF::EX::Jabber->throw(
566 "Call to Client->timed_read( $timeout ) failed", ERROR );
569 "Call to Client->timed_read( $timeout ) returned 0 bytes of data", INTERNAL );
572 "Call to Client->timed_read( $timeout ) successfully returned data", INTERNAL );
575 my $t = $self->{last_tag};
578 my @msgs = $val =~ /(<$t[^>]*>.*?<\/$t>)/g;
582 my $tmp = $self->{temp_buffer};
584 $self->{temp_buffer} = '';
585 $self->{temp_buffer} .= $_ for (@msgs);
586 $self->{temp_buffer} .= $tmp;
594 # --------------------------------------------------------------
595 # Sets the socket to O_NONBLOCK, reads all of the data off of
596 # the socket, the restores the sockets flags
597 # Returns 1 on success, 0 if the socket isn't connected
598 # --------------------------------------------------------------
602 my $socket = $self->{_socket};
607 my $flags = fcntl($socket, F_GETFL, 0);
609 fcntl($socket, F_SETFL, $flags | O_NONBLOCK);
610 while( my $n = sysread( $socket, $buf, 8192 ) ) {
611 $logger->debug("flush_socket dropped $n bytes of data");
612 if(!$socket->connected()) {
613 $logger->error("flush_socket dropped data on disconnected socket: $buf");
616 fcntl($socket, F_SETFL, $flags);
618 return 0 unless $socket->connected();