1 package OpenSRF::Transport::SlimJabber::XMPPReader;
2 use strict; use warnings;
4 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
5 use Time::HiRes qw/time/;
6 use OpenSRF::Transport::SlimJabber::XMPPMessage;
7 use OpenSRF::Utils::Logger qw/$logger/;
9 # -----------------------------------------------------------
10 # Connect, disconnect, and authentication message templates
11 # -----------------------------------------------------------
# Stream-opening tag. The single %s is filled in with the target domain
# via sprintf before the string is sent.
12 use constant JABBER_CONNECT =>
13 "<stream:stream to='%s' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>";
# jabber:iq:auth login request. The three %s slots are, in order:
# username, password, resource.
# NOTE(review): the values appear to be passed to sprintf as-is; confirm
# whether callers XML-escape them first.
15 use constant JABBER_BASIC_AUTH =>
16 "<iq id='123' type='set'><query xmlns='jabber:iq:auth'>" .
17 "<username>%s</username><password>%s</password><resource>%s</resource></query></iq>";
# Stream-closing tag; sent verbatim, no formatting arguments.
19 use constant JABBER_DISCONNECT => "</stream:stream>";
22 # -----------------------------------------------------------
23 # Stream state constants
24 # -----------------------------------------------------------
# Values stored in $self->{stream_state}.
25 use constant DISCONNECTED => 1;  # initial value; also set on <stream:error> and when <stream:stream> closes
26 use constant CONNECT_RECV => 2;  # set when the opening <stream:stream> element is parsed
27 use constant CONNECTED => 3;  # set when an <iq> element with type 'result' is parsed
30 # -----------------------------------------------------------
31 # XML parse state constants
32 # -----------------------------------------------------------
# Values stored in $self->{xml_state}; they select which message field
# incoming character data is appended to.
33 use constant IN_NOTHING => 1;  # initial value: not inside a tracked element
34 use constant IN_BODY => 2;  # inside <body>: text is appended to the message's body
35 use constant IN_THREAD => 3;  # inside <thread>: text is appended to the message's thread
36 use constant IN_STATUS => 4;  # inside <status>: text is appended to the message's status
39 # -----------------------------------------------------------
40 # Constructor, getter/setters
41 # -----------------------------------------------------------
46 my $self = bless({}, $class);
49 $self->{stream_state} = DISCONNECTED;
50 $self->{xml_state} = IN_NOTHING;
51 $self->socket($socket);
53 my $p = new XML::Parser(Handlers => {
54 Start => \&start_element,
59 $self->parser($p->parse_start); # create a push parser
60 $self->parser->{_parent_} = $self;
61 $self->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new;
67 push(@{$self->{queue}}, $msg) if $msg;
72 return shift @{$self->{queue}};
77 return (@{$self->{queue}} > 0);
81 my($self, $parser) = @_;
82 $self->{parser} = $parser if $parser;
83 return $self->{parser};
87 my($self, $socket) = @_;
88 $self->{socket} = $socket if $socket;
89 return $self->{socket};
93 my($self, $stream_state) = @_;
94 $self->{stream_state} = $stream_state if $stream_state;
95 return $self->{stream_state};
99 my($self, $xml_state) = @_;
100 $self->{xml_state} = $xml_state if $xml_state;
101 return $self->{xml_state};
105 my($self, $message) = @_;
106 $self->{message} = $message if $message;
107 return $self->{message};
111 # -----------------------------------------------------------
112 # Stream and connection handling methods
113 # -----------------------------------------------------------
116 my($self, $domain, $username, $password, $resource) = @_;
118 $self->send(sprintf(JABBER_CONNECT, $domain));
121 unless($self->{stream_state} == CONNECT_RECV) {
122 $logger->error("No initial XMPP response from server");
126 $self->send(sprintf(JABBER_BASIC_AUTH, $username, $password, $resource));
129 unless($self->connected) {
130 $logger->error('XMPP connect failed');
139 return unless $self->socket;
140 if($self->tcp_connected) {
141 $self->send(JABBER_DISCONNECT);
142 shutdown($self->socket, 2);
144 close($self->socket);
147 # -----------------------------------------------------------
148 # returns true if this stream is connected to the server
149 # -----------------------------------------------------------
152 return ($self->tcp_connected and $self->{stream_state} == CONNECTED);
155 # -----------------------------------------------------------
156 # returns true if the socket is connected
157 # -----------------------------------------------------------
160 return ($self->socket and $self->socket->connected);
163 # -----------------------------------------------------------
164 # sends pre-formatted XML
165 # -----------------------------------------------------------
167 my($self, $xml) = @_;
168 $self->{socket}->print($xml);
171 # -----------------------------------------------------------
172 # Puts a file handle into blocking mode
173 # -----------------------------------------------------------
176 my $flags = fcntl($fh, F_GETFL, 0);
177 $flags &= ~O_NONBLOCK;
178 fcntl($fh, F_SETFL, $flags);
182 # -----------------------------------------------------------
183 # Puts a file handle into non-blocking mode
184 # -----------------------------------------------------------
187 my $flags = fcntl($fh, F_GETFL, 0);
188 fcntl($fh, F_SETFL, $flags | O_NONBLOCK);
193 my($self, $timeout) = @_;
195 return $self->next_msg if $self->peek_msg;
198 $timeout = undef if $timeout < 0;
199 my $socket = $self->{socket};
203 # build the select readset
205 vec($infile, $socket->fileno, 1) = 1;
207 my $nfound = select($infile, undef, undef, $timeout);
208 return undef if !$nfound or $nfound == -1;
210 # now slurp the data off the socket
212 my $read_size = 1024;
215 while(my $n = sysread($socket, $buf, $read_size)) {
216 $self->{parser}->parse_more($buf) if $buf;
217 if($n < $read_size or $self->peek_msg) {
218 set_block($socket) if $nonblock;
221 set_nonblock($socket) unless $nonblock;
225 return $self->next_msg;
228 # -----------------------------------------------------------
229 # Waits up to timeout seconds for a fully-formed XMPP
230 # message to arrive. If timeout is < 0, waits indefinitely
231 # -----------------------------------------------------------
233 my($self, $timeout) = @_;
236 $timeout = 0 unless defined $timeout;
240 return $xml if $xml = $self->wait($timeout);
244 while($timeout >= 0) {
246 return $xml if $xml = $self->wait($timeout);
247 $timeout -= time - $start;
255 # -----------------------------------------------------------
256 # XML parser event handlers
257 # -----------------------------------------------------------
261 my($parser, $name, %attrs) = @_;
262 my $self = $parser->{_parent_};
264 if($name eq 'message') {
266 my $msg = $self->{message};
267 $msg->{to} = $attrs{'to'};
268 $msg->{from} = $attrs{router_from} if $attrs{router_from};
269 $msg->{from} = $attrs{from} unless $msg->{from};
270 $msg->{osrf_xid} = $attrs{'osrf_xid'};
271 $msg->{type} = $attrs{type};
273 } elsif($name eq 'body') {
274 $self->{xml_state} = IN_BODY;
276 } elsif($name eq 'thread') {
277 $self->{xml_state} = IN_THREAD;
279 } elsif($name eq 'stream:stream') {
280 $self->{stream_state} = CONNECT_RECV;
282 } elsif($name eq 'iq') {
283 if($attrs{type} and $attrs{type} eq 'result') {
284 $self->{stream_state} = CONNECTED;
287 } elsif($name eq 'status') {
288 $self->{xml_state } = IN_STATUS;
290 } elsif($name eq 'stream:error') {
291 $self->{stream_state} = DISCONNECTED;
293 } elsif($name eq 'error') {
294 $self->{message}->{err_type} = $attrs{'type'};
295 $self->{message}->{err_code} = $attrs{'code'};
300 my($parser, $chars) = @_;
301 my $self = $parser->{_parent_};
302 my $state = $self->{xml_state};
304 if($state == IN_BODY) {
305 $self->{message}->{body} .= $chars;
307 } elsif($state == IN_THREAD) {
308 $self->{message}->{thread} .= $chars;
310 } elsif($state == IN_STATUS) {
311 $self->{message}->{status} .= $chars;
316 my($parser, $name) = @_;
317 my $self = $parser->{_parent_};
318 $self->{xml_state} = IN_NOTHING;
320 if($name eq 'message') {
321 $self->push_msg($self->{message});
322 $self->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new;
324 } elsif($name eq 'stream:stream') {
325 $self->{stream_state} = DISCONNECTED;
330 # read all the data on the jabber socket through the
331 # parser and drop the resulting message
334 return 0 unless $self->connected;
336 while ($self->wait(0)) {
337 # TODO remove this log line
338 $logger->info("flushing data from socket...");
341 return $self->connected;