1 package OpenSRF::Transport::SlimJabber::XMPPReader;
2 use strict; use warnings;
4 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
5 use Time::HiRes qw/time/;
6 use OpenSRF::Transport::SlimJabber::XMPPMessage;
7 use OpenSRF::Utils::Logger qw/$logger/;
10 # -----------------------------------------------------------
11 # Connect, disconnect, and authentication message templates
12 # -----------------------------------------------------------
# sprintf template for the opening stream tag; %s is filled with the server
# domain.
13 use constant JABBER_CONNECT =>
14 "<stream:stream to='%s' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>";
# sprintf template for the jabber:iq:auth login request; the placeholders are
# filled, in order, with username, password and resource.
16 use constant JABBER_BASIC_AUTH =>
17 "<iq id='123' type='set'><query xmlns='jabber:iq:auth'>" .
18 "<username>%s</username><password>%s</password><resource>%s</resource></query></iq>";
# Closing stream tag, sent as-is when disconnecting.
20 use constant JABBER_DISCONNECT => "</stream:stream>";
23 # -----------------------------------------------------------
# Stream states: the values held in $self->{stream_state}
25 # -----------------------------------------------------------
# Initial state; also set after a stream error, when the stream element
# closes, or when a read finds the socket closed by the peer.
26 use constant DISCONNECTED => 1;
# The server's opening <stream:stream> element has been seen.
27 use constant CONNECT_RECV => 2;
# An <iq type='result'> element has been seen.
28 use constant CONNECTED => 3;
31 # -----------------------------------------------------------
# XML parse states: the values held in $self->{xml_state}. The state selects
# which field of the in-progress message receives character data.
33 # -----------------------------------------------------------
# Not inside <body>, <thread> or <status>; no field is selected.
34 use constant IN_NOTHING => 1;
# Inside <body>: character data is appended to the message body.
35 use constant IN_BODY => 2;
# Inside <thread>: character data is appended to the message thread.
36 use constant IN_THREAD => 3;
# Inside <status>: character data is appended to the message status.
37 use constant IN_STATUS => 4;
40 # -----------------------------------------------------------
41 # Constructor, getter/setters
42 # -----------------------------------------------------------
# Build the reader object. It starts out disconnected and outside any
# tracked XML element.
47 my $self = bless({}, $class);
50 $self->{stream_state} = DISCONNECTED;
51 $self->{xml_state} = IN_NOTHING;
52 $self->socket($socket);
# Event-driven XML parser; start_element is registered as its Start handler.
# NOTE(review): this uses indirect-object syntax; XML::Parser->new(...) is the
# unambiguous spelling.
54 my $p = new XML::Parser(Handlers => {
55 Start => \&start_element,
# The object returned by parse_start is stored as the parser and is later fed
# socket data incrementally through parse_more.
60 $self->parser($p->parse_start); # create a push parser
# Back-reference that lets the parser callbacks recover this reader object.
61 $self->parser->{_parent_} = $self;
# Scratch message that the parser callbacks fill in until it is complete.
62 $self->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new;
# Append a message to the end of the pending queue; false values are ignored.
68 push(@{$self->{queue}}, $msg) if $msg;
# Remove and return the oldest queued message (undef when the queue is empty).
73 return shift @{$self->{queue}};
# True when at least one message is waiting in the queue.
78 return (@{$self->{queue}} > 0);
# Combined getter/setter for the push parser: stores the argument when it is
# true and always returns the current value.
82 my($self, $parser) = @_;
83 $self->{parser} = $parser if $parser;
84 return $self->{parser};
# Combined getter/setter for the socket handle; same store-if-true pattern.
88 my($self, $socket) = @_;
89 $self->{socket} = $socket if $socket;
90 return $self->{socket};
# Combined getter/setter for the stream state. A false argument is treated as
# "no argument"; the stream-state constants in this file are all non-zero.
94 my($self, $stream_state) = @_;
95 $self->{stream_state} = $stream_state if $stream_state;
96 return $self->{stream_state};
# Combined getter/setter for the XML parse state. A false argument is treated
# as "no argument"; the parse-state constants in this file are all non-zero.
100 my($self, $xml_state) = @_;
101 $self->{xml_state} = $xml_state if $xml_state;
102 return $self->{xml_state};
# Combined getter/setter for the message currently being assembled.
106 my($self, $message) = @_;
107 $self->{message} = $message if $message;
108 return $self->{message};
112 # -----------------------------------------------------------
113 # Stream and connection handling methods
114 # -----------------------------------------------------------
# Open the XMPP stream to $domain and log in with $username, $password and
# $resource. Failures are reported through $logger->error.
117 my($self, $domain, $username, $password, $resource) = @_;
# Send the opening stream tag addressed to $domain.
119 $self->send(sprintf(JABBER_CONNECT, $domain));
# stream_state becomes CONNECT_RECV once the server's own <stream:stream>
# element has been parsed. The code that reads that reply is not visible here.
122 unless($self->{stream_state} == CONNECT_RECV) {
123 $logger->error("No initial XMPP response from server");
# Send the login request.
# NOTE(review): the values are interpolated into the XML template without
# escaping; confirm callers never pass XML metacharacters.
127 $self->send(sprintf(JABBER_BASIC_AUTH, $username, $password, $resource));
# connected() needs a live socket and stream_state == CONNECTED, which is set
# when an <iq type='result'> element is parsed.
130 unless($self->connected) {
131 $logger->error('XMPP connect failed');
# Tear down the connection. Without a socket there is nothing to do.
140 return unless $self->socket;
# Only a live TCP connection gets the closing stream tag and a shutdown.
141 if($self->tcp_connected) {
142 $self->send(JABBER_DISCONNECT);
# A "how" value of 2 shuts down both reading and writing.
143 shutdown($self->socket, 2);
145 close($self->socket);
148 # -----------------------------------------------------------
149 # returns true if this stream is connected to the server
150 # -----------------------------------------------------------
# Requires both a connected TCP socket and a stream_state of CONNECTED.
153 return ($self->tcp_connected and $self->{stream_state} == CONNECTED);
156 # -----------------------------------------------------------
157 # returns true if the socket is connected
158 # -----------------------------------------------------------
# Yields a false value when no socket is set; otherwise returns the result of
# the socket object's own connected() method.
161 return ($self->socket and $self->socket->connected);
164 # -----------------------------------------------------------
165 # sends pre-formatted XML
166 # -----------------------------------------------------------
# $xml is written to the socket exactly as given; nothing is escaped or
# validated here.
168 my($self, $xml) = @_;
# Install a SIGPIPE handler for the duration of the enclosing scope; "local"
# restores the previous handler on exit. The handler logs the disconnect;
# the rest of its body is not visible here.
170 local $SIG{'PIPE'} = sub {
171 $logger->error("Disconnected from Jabber server, exiting immediately");
174 $self->{socket}->print($xml);
177 # -----------------------------------------------------------
178 # Puts a file handle into blocking mode
179 # -----------------------------------------------------------
# Read the handle's current status flags, clear the O_NONBLOCK bit and write
# the flags back, leaving every other flag untouched.
182 my $flags = fcntl($fh, F_GETFL, 0);
183 $flags &= ~O_NONBLOCK;
184 fcntl($fh, F_SETFL, $flags);
188 # -----------------------------------------------------------
189 # Puts a file handle into non-blocking mode
190 # -----------------------------------------------------------
# Read the handle's current status flags and write them back with the
# O_NONBLOCK bit added.
193 my $flags = fcntl($fh, F_GETFL, 0);
194 fcntl($fh, F_SETFL, $flags | O_NONBLOCK);
# Wait for data on the socket, push it through the XML parser and return the
# next complete message.
#   $timeout - seconds to wait; a negative value means wait indefinitely.
# Returns a queued message, or undef when nothing became readable.
# Throws OpenSRF::EX::Jabber when the remote end has closed the socket.
199 my($self, $timeout) = @_;
# A message that is already parsed is returned without touching the socket.
201 return $self->next_msg if $self->peek_msg;
# An undef timeout makes select() block until the socket is readable.
# NOTE(review): if a caller passes undef, this numeric comparison warns under
# "use warnings"; confirm callers always pass a number.
204 $timeout = undef if $timeout < 0;
205 my $socket = $self->{socket};
209 # build the select readset
211 vec($infile, $socket->fileno, 1) = 1;
# With no Apache request object, or for waits of at most one second, a single
# select() call covers the whole timeout.
# NOTE(review): when $timeout was reset to undef above and the Apache request
# object is set, undef is compared as 0 here, so this branch is taken and
# select() blocks with no abort check; confirm that is intended.
214 if (!OpenSRF->OSRF_APACHE_REQUEST_OBJ || $timeout <= 1.0) {
215 $nfound = select($infile, undef, undef, $timeout);
# Otherwise wait in slices of at most one second and check between slices
# whether the upstream Apache client has gone away. The surrounding loop is
# not visible here; presumably it repeats until data arrives or time runs out.
222 $sleep = $timeout < 1.0 ? $timeout : 1.0;
226 $nfound = select($infile, undef, undef, $sleep);
229 OpenSRF->OSRF_APACHE_REQUEST_OBJ &&
230 OpenSRF->OSRF_APACHE_REQUEST_OBJ->connection->aborted
232 # Should this be more severe? Die or throw error?
233 $logger->warn("Upstream Apache client disconnected, aborting.");
# Nothing readable before the timeout, or select() reported an error (-1).
238 return undef if !$nfound or $nfound == -1;
240 # now slurp the data off the socket
242 my $read_size = 1024;
# sysread returns the number of bytes read, 0 at end-of-file and undef on
# error; either of the last two ends the loop.
247 while($nbytes = sysread($socket, $buf, $read_size)) {
# Feed the chunk to the incremental parser. Complete messages are queued when
# their closing </message> tag is parsed.
# NOTE(review): "if $buf" is a truthiness test, so a chunk consisting of the
# single character "0" would be skipped.
248 $self->{parser}->parse_more($buf) if $buf;
# A short read, or a complete message now on the queue: restore blocking mode
# if $nonblock is set. $nonblock is maintained on lines not visible here.
249 if($nbytes < $read_size or $self->peek_msg) {
250 set_block($socket) if $nonblock;
# More data may follow; switch to non-blocking mode (unless already done) so
# the next sysread cannot stall.
253 set_nonblock($socket) unless $nonblock;
258 if ($first_read and defined $nbytes and $nbytes == 0) {
259 # if the first read on an active socket is 0 bytes,
260 # the socket has been disconnected from the remote end.
261 $self->{stream_state} = DISCONNECTED;
262 $logger->error("Disconnected from Jabber server");
263 throw OpenSRF::EX::Jabber("Disconnected from Jabber server");
# Oldest complete message, or undef when the data read so far did not finish
# one.
266 return $self->next_msg;
269 # -----------------------------------------------------------
270 # Waits up to timeout seconds for a fully-formed XMPP
271 # message to arrive. If timeout is < 0, waits indefinitely
272 # -----------------------------------------------------------
274 my($self, $timeout) = @_;
# A missing timeout is treated as zero.
277 $timeout = 0 unless defined $timeout;
# First attempt. The condition guarding this call is not visible here.
281 return $xml if $xml = $self->wait($timeout);
# Keep calling wait() with the remaining time budget until a message arrives
# or the budget goes negative. "time" is the Time::HiRes version, so the
# elapsed time is fractional. $start is presumably captured just before the
# wait() call, on a line not visible here.
285 while($timeout >= 0) {
287 return $xml if $xml = $self->wait($timeout);
288 $timeout -= time - $start;
296 # -----------------------------------------------------------
# XML parser callbacks. Each one recovers the reader object from the
# parser's _parent_ slot.
298 # -----------------------------------------------------------
# Element-start callback: receives the parser, the element name and the
# element's attributes as a flat name/value list.
302 my($parser, $name, %attrs) = @_;
303 my $self = $parser->{_parent_};
# <message>: record the addressing attributes on the in-progress message.
305 if($name eq 'message') {
307 my $msg = $self->{message};
308 $msg->{to} = $attrs{'to'};
309 $msg->{from} = $attrs{from};
311 } elsif($name eq 'opensrf') {
313 # These will be authoritative if they exist
# NOTE(review): the assignments below are unconditional, so an <opensrf>
# element without a router_from attribute overwrites the "from" taken from
# <message> with undef; confirm whether an "if exists" guard is intended.
314 my $msg = $self->{message};
315 $msg->{from} = $attrs{router_from};
316 $msg->{osrf_xid} = $attrs{'osrf_xid'};
317 $msg->{type} = $attrs{type};
# <body>, <thread> and <status> switch the parse state so that the character
# callback knows which message field to append to.
319 } elsif($name eq 'body') {
320 $self->{xml_state} = IN_BODY;
322 } elsif($name eq 'thread') {
323 $self->{xml_state} = IN_THREAD;
# The server has opened its side of the stream.
325 } elsif($name eq 'stream:stream') {
326 $self->{stream_state} = CONNECT_RECV;
# Any <iq type='result'> marks the stream as connected; the iq id is not
# checked.
328 } elsif($name eq 'iq') {
329 if($attrs{type} and $attrs{type} eq 'result') {
330 $self->{stream_state} = CONNECTED;
333 } elsif($name eq 'status') {
334 $self->{xml_state } = IN_STATUS;
# A stream-level error marks the stream as disconnected.
336 } elsif($name eq 'stream:error') {
337 $self->{stream_state} = DISCONNECTED;
# <error>: copy its type and code attributes onto the in-progress message.
339 } elsif($name eq 'error') {
340 $self->{message}->{err_type} = $attrs{'type'};
341 $self->{message}->{err_code} = $attrs{'code'};
# Character-data callback: appends the text to whichever message field the
# current parse state selects. Text is appended rather than assigned, so an
# element's content may arrive across several calls.
346 my($parser, $chars) = @_;
347 my $self = $parser->{_parent_};
348 my $state = $self->{xml_state};
350 if($state == IN_BODY) {
351 $self->{message}->{body} .= $chars;
353 } elsif($state == IN_THREAD) {
354 $self->{message}->{thread} .= $chars;
356 } elsif($state == IN_STATUS) {
357 $self->{message}->{status} .= $chars;
# Element-end callback.
362 my($parser, $name) = @_;
363 my $self = $parser->{_parent_};
# Any closing tag resets the parse state, so character data that follows is
# not appended to a message field.
364 $self->{xml_state} = IN_NOTHING;
# </message>: the message is complete. Queue it and start a fresh one.
366 if($name eq 'message') {
367 $self->push_msg($self->{message});
368 $self->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new;
# </stream:stream>: the server closed the stream.
370 } elsif($name eq 'stream:stream') {
371 $self->{stream_state} = DISCONNECTED;
376 # read all the data on the jabber socket through the
377 # parser and drop the resulting message
# Returns 0 immediately when the stream is not connected.
380 return 0 unless $self->connected;
# wait(0) polls the socket with a zero timeout and returns each parsed
# message; the loop discards them until no message is returned.
382 while ($self->wait(0)) {
383 # TODO remove this log line
384 $logger->info("flushing data from socket...");
# Report whether the stream is still connected after draining.
387 return $self->connected;