1 package OpenSRF::Transport::SlimJabber::Client;
2 use strict; use warnings;
4 #use Net::Jabber qw( Client );
5 use base qw( OpenSRF );
6 use OpenSRF::Utils::Logger qw(:level);
7 use Time::HiRes qw(ualarm);
9 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
14 OpenSRF::Transport::SlimJabber::Client
16 Home-brewed slimmed down jabber connection agent. Supports SSL connections
17 with a config file options:
19 transport->server->port # the ssl port
20 transport->server->ssl # is this ssl?
24 my $logger = "OpenSRF::Utils::Logger";
28 my $socket = $self->{_socket};
29 if( $socket and $socket->connected() ) {
30 print $socket "</stream:stream>";
38 Creates a new Client object.
40 debug and log_file are not required if you don't care to log the activity,
41 however all other parameters are.
55 my( $class, %params ) = @_;
57 $class = ref( $class ) || $class;
59 my $conf = OpenSRF::Utils::Config->current;
61 my $host = $conf->transport->server->primary;
62 my $port = $conf->transport->server->port;
63 my $username = $params{'username'} || return undef;
64 my $resource = $params{'resource'} || return undef;
65 my $password = $params{'password'} || return undef;
67 my $jid = "$username\@$host\/$resource";
70 my $self = bless {} => $class;
75 $self->username( $username );
76 $self->resource( $resource );
77 $self->password( $password );
78 $self->{temp_buffer} = "";
80 $logger->transport( "Creating Client instance: $host:$port, $username, $resource",
86 # clears the tmp buffer as well as the TCP buffer
90 $self->{temp_buffer} = "";
92 my $fh = $self->{_socket};
95 while( sysread( $fh, $t_buf, 4096 ) ) {}
98 # -------------------------------------------------
102 Gathers all Jabber messages sitting in the collection queue
103 and hands them each to their respective callbacks. This call
104 does not block (calls Process(0))
108 sub gather { my $self = shift; $self->process( 0 ); }
110 # -------------------------------------------------
114 Blocks and gathers incoming messages as they arrive. Does not return
115 unless an error occurs.
117 Throws an OpenSRF::EX::JabberException if the call to Process ever fails.
123 my $sock = $self->unix_sock();
124 my $socket = IO::Socket::UNIX->new( Peer => $sock );
125 $logger->transport( "Unix Socket opened by Listener", INTERNAL );
127 throw OpenSRF::EX::Socket( "Unable to connect to UnixServer: socket-file: $sock \n :=> $! " )
128 unless ($socket->connected);
131 my $o = $self->process( -1 );
132 $logger->transport( "Call to process() in listener returned:\n $o", INTERNAL );
133 if( ! defined( $o ) ) {
134 throw OpenSRF::EX::Jabber( "Listen Loop failed at 'process()'" );
139 throw OpenSRF::EX::Socket( "How did we get here?!?!" );
144 my $flags = fcntl($fh, F_GETFL, 0)
145 or die "Can't get flags for the socket: $!\n";
147 $logger->transport( "Setting NONBLOCK: original flags: $flags", INTERNAL );
149 fcntl($fh, F_SETFL, $flags | O_NONBLOCK)
150 or die "Can't set flags for the socket: $!\n";
158 $logger->transport( "Restoring BLOCK: to flags $flags", INTERNAL );
159 fcntl($fh, F_SETFL, $flags) if defined $flags;
165 my $flags = fcntl($fh, F_GETFL, 0)
166 or die "Can't get flags for the socket: $!\n";
168 $flags &= ~O_NONBLOCK;
170 fcntl($fh, F_SETFL, $flags)
171 or die "Can't set flags for the socket: $!\n";
176 my ($self, $timeout) = @_;
178 $logger->transport( "Temp Buffer Contained: \n". $self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};
179 if( $self->can( "app" ) ) {
180 $logger->transport( "timed_read called for ".$self->app.", I am: ".$self->jid, INTERNAL );
183 # See if there is a complete message in the temp_buffer
185 if( $self->{temp_buffer} ) {
186 my $buffer = $self->{temp_buffer};
188 $self->{temp_buffer} = '';
190 my ($tag) = ($buffer =~ /<([^\s\?\>]+)/o);
191 $logger->transport("Using tag: $tag ", INTERNAL);
193 if ( $buffer =~ /^(.*?<\/$tag>)(.*)/s) {
195 $self->{temp_buffer} = $2;
197 $logger->transport( "completed read with $buffer", INTERNAL );
198 } elsif ( $buffer =~ /^<$tag[^>]*?\/>(.*)/) {
199 $self->{temp_buffer} = $1;
201 $logger->transport( "completed read with $buffer", INTERNAL );
203 $self->{temp_buffer} = $buffer;
206 if( $buffer and $complete ) {
213 my $fh = $self->{_socket};
215 unless( $fh and $fh->connected ) {
216 throw OpenSRF::EX::Socket ("Attempted read on closed socket", ERROR );
219 $logger->transport( "Temp Buffer After first attempt: \n ".$self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};
222 if (defined($timeout) && !$timeout) {
223 $flags = set_nonblock( $fh );
227 $logger->transport( "Calling timed_read with timetout $timeout", INTERNAL );
237 local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
239 # alarm needs a number greater => 1.
240 my $alarm_timeout = $timeout;
241 if( $alarm_timeout > 0 and $alarm_timeout < 1 ) {
244 alarm $alarm_timeout;
247 my $buffer = $self->{temp_buffer};
248 $self->{temp_buffer} = '';
251 my $ff = fcntl($fh, F_GETFL, 0);
252 if ($ff == ($ff | O_NONBLOCK) and $timeout > 0 ) {
253 #throw OpenSRF::EX::ERROR ("File flags are set to NONBLOCK but timeout is $timeout", ERROR );
257 my $read_size = 1024;
259 while( my $n = sysread( $fh, $t_buf, $read_size ) ) {
261 if( $n < $read_size ) {
262 #reset_fl( $fh, $f ) if $f;
266 # see if there is any more data to grab...
267 $f = set_nonblock( $fh );
270 #sysread($fh, $buffer, 2048, length($buffer) );
271 #sysread( $fh, $t_buf, 2048 );
275 $logger->transport(" Got [$buffer] from the socket", INTERNAL);
278 $logger->transport(" First read Buffer\n [$buffer]", INTERNAL);
279 ($tag) = ($buffer =~ /<([^\s\?\>]+){1}/o);
281 $logger->transport("Using tag: $tag ", INTERNAL);
284 if (!$first_read && $buffer =~ /^(.*?<\/$tag>){1}(.*)/s) {
286 $self->{temp_buffer} = $2;
288 $logger->transport( "completed read with $buffer", INTERNAL );
289 } elsif (!$first_read && $buffer =~ /^<$tag[^>]*?\/>(.*)/) {
290 $self->{temp_buffer} = $1;
292 $logger->transport( "completed read with $buffer", INTERNAL );
297 } while (!$complete && $xml);
303 $logger->transport( "XML Read: $xml", INTERNAL );
304 #reset_fl( $fh, $flags) if defined $flags;
305 set_block( $fh ) if defined $flags;
317 # -------------------------------------------------
322 return 1 if ($self->{_socket} and $self->{_socket}->connected);
327 my( $self, $password ) = @_;
328 $self->{'oils:password'} = $password if $password;
329 return $self->{'oils:password'};
332 # -------------------------------------------------
335 my( $self, $username ) = @_;
336 $self->{'oils:username'} = $username if $username;
337 return $self->{'oils:username'};
340 # -------------------------------------------------
343 my( $self, $resource ) = @_;
344 $self->{'oils:resource'} = $resource if $resource;
345 return $self->{'oils:resource'};
348 # -------------------------------------------------
351 my( $self, $jid ) = @_;
352 $self->{'oils:jid'} = $jid if $jid;
353 return $self->{'oils:jid'};
357 my( $self, $port ) = @_;
358 $self->{'oils:port'} = $port if $port;
359 return $self->{'oils:port'};
363 my( $self, $host ) = @_;
364 $self->{'oils:host'} = $host if $host;
365 return $self->{'oils:host'};
368 # -------------------------------------------------
372 Sends a Jabber message.
375 to - The JID of the recipient
376 thread - The Jabber thread
377 body - The body of the message
385 my $to = $params{'to'} || return undef;
386 my $body = $params{'body'} || return undef;
387 my $thread = $params{'thread'} || "";
388 my $router_command = $params{'router_command'} || "";
389 my $router_class = $params{'router_class'} || "";
391 my $msg = OpenSRF::Transport::SlimJabber::MessageWrapper->new;
394 $msg->setThread( $thread ) if $thread;
395 $msg->setBody( $body );
396 $msg->set_router_command( $router_command );
397 $msg->set_router_class( $router_class );
401 "JabberClient Sending message to $to with thread $thread and body: \n$body", INTERNAL );
403 my $soc = $self->{_socket};
404 print $soc $msg->toString;
410 Connect to the server and log in.
412 Throws an OpenSRF::EX::JabberException if we cannot connect
413 to the server or if the authentication fails.
417 # --- The logging lines have been commented out until we decide
418 # on which log files we're using.
424 my $jid = $self->jid;
425 my $host = $self->host;
426 my $port = $self->port;
427 my $username = $self->username;
428 my $resource = $self->resource;
429 my $password = $self->password;
431 my $stream = <<" XML";
432 <stream:stream to='$host' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>
436 <iq id='123' type='set'>
437 <query xmlns='jabber:iq:auth'>
438 <username>$username</username>
439 <password>$password</password>
440 <resource>$resource</resource>
446 # --- 5 tries to connect to the jabber server
449 $logger->transport( "$jid: Attempting to connect to server... (Try # $_)", WARN );
450 $socket = IO::Socket::INET->new( PeerHost => $host,
453 last if ( $socket and $socket->connected );
457 unless ( $socket and $socket->connected ) {
458 throw OpenSRF::EX::Jabber( " Could not connect to Jabber server: $!" );
461 $logger->transport( "Logging into jabber as $jid " .
462 "from " . ref( $self ), DEBUG );
464 print $socket $stream;
469 local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
471 sysread($socket, $buffer, 4096);
472 $logger->transport( "Login buffer 1: $buffer", INTERNAL );
480 if( $socket and $socket->connected() ) {
481 $self->{_socket} = $socket;
483 throw OpenSRF::EX::Jabber( " ** Unable to connect to Jabber server", ERROR );
487 $buffer = $self->timed_read(10);
489 if( $buffer ) {$logger->transport( "Login buffer 2: $buffer", INTERNAL );}
491 if( $buffer and $buffer =~ /type=["\']result["\']/ ) {
492 $logger->transport( " * $jid: Jabber authenticated and connected", DEBUG );
494 if( !$buffer ) { $buffer = " "; }
496 throw OpenSRF::EX::Jabber( " * $jid: Unable to authenticate: $buffer", ERROR );
503 my( $class, $app ) = @_;
504 $logger->transport("Constructing new Jabber connection for $app", INTERNAL );
506 $class->new( $app )->initialize() );
511 my( $self, $timeout ) = @_;
514 undef $timeout if ( $timeout == -1 );
516 unless( $self->{_socket}->connected ) {
517 OpenSRF::EX::JabberDisconnected->throw(
518 "This JabberClient instance is no longer connected to the server", ERROR );
521 my $val = $self->timed_read( $timeout );
523 $timeout = "FOREVER" unless ( defined $timeout );
525 if ( ! defined( $val ) ) {
526 OpenSRF::EX::Jabber->throw(
527 "Call to Client->timed_read( $timeout ) failed", ERROR );
530 "Call to Client->timed_read( $timeout ) returned 0 bytes of data", DEBUG );
533 "Call to Client->timed_read( $timeout ) successfully returned data", INTERNAL );