1 package OpenSRF::Transport::SlimJabber::XMPPReader;
2 use strict; use warnings;
4 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
5 use Time::HiRes qw/time/;
6 use OpenSRF::Transport::SlimJabber::XMPPMessage;
7 use OpenSRF::Utils::Logger qw/$logger/;
9 # -----------------------------------------------------------
10 # Connect, disconnect, and authentication message templates
11 # -----------------------------------------------------------
# sprintf template for the opening <stream:stream> tag sent to the server;
# the single %s fills the 'to' attribute.
# NOTE(review): the sprintf call sites visible in this file pass their
# arguments without XML-escaping them -- confirm the inputs are trusted.
12 use constant JABBER_CONNECT =>
13 "<stream:stream to='%s' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>";
# sprintf template for a jabber:iq:auth request. The three %s slots are
# filled with username, password and resource, in that order; the password
# is placed in the stanza exactly as given. The iq id is fixed at '123'.
15 use constant JABBER_BASIC_AUTH =>
16 "<iq id='123' type='set'><query xmlns='jabber:iq:auth'>" .
17 "<username>%s</username><password>%s</password><resource>%s</resource></query></iq>";
# Closing tag that ends the stream opened with JABBER_CONNECT.
19 use constant JABBER_DISCONNECT => "</stream:stream>";
22 # -----------------------------------------------------------
24 # -----------------------------------------------------------
# Values held in $self->{stream_state}. All are non-zero, so each one is
# true in boolean context.
# DISCONNECTED: the initial state; also set when a <stream:error> or <error>
#               element starts and when the closing </stream:stream> is seen.
# CONNECT_RECV: set when the opening <stream:stream> element is parsed.
# CONNECTED:    set when an <iq> element with type='result' is parsed.
25 use constant DISCONNECTED => 1;
26 use constant CONNECT_RECV => 2;
27 use constant CONNECTED => 3;
30 # -----------------------------------------------------------
32 # -----------------------------------------------------------
# Values held in $self->{xml_state}; they record which element the parser is
# currently inside so character data can be routed to the right message field.
# IN_NOTHING: the initial state, restored by the end-element handler on
#             every closing tag.
# IN_BODY / IN_THREAD / IN_STATUS: set when a <body>, <thread> or <status>
#             element starts; text is then appended to the message's
#             body, thread or status field respectively.
33 use constant IN_NOTHING => 1;
34 use constant IN_BODY => 2;
35 use constant IN_THREAD => 3;
36 use constant IN_STATUS => 4;
39 # -----------------------------------------------------------
40 # Constructor, getter/setters
41 # -----------------------------------------------------------
# Constructor body. NOTE(review): the enclosing sub header, the declarations
# of $class and $socket, and the end of the handler table are not visible
# in this view.
46 my $self = bless({}, $class);
# Start out disconnected and outside any tracked XML element.
49 $self->{stream_state} = DISCONNECTED;
50 $self->{xml_state} = IN_NOTHING;
51 $self->socket($socket);
# NOTE(review): indirect-object call syntax; XML::Parser->new(...) is the
# unambiguous spelling. Only the Start handler is visible in this view.
53 my $p = new XML::Parser(Handlers => {
54 Start => \&start_element,
59 $self->parser($p->parse_start); # create a push parser
# Back-reference so the parser callbacks can reach this reader object.
# NOTE(review): together with the line above this forms a reference cycle
# (reader -> parser -> reader); confirm it is broken or weakened on teardown.
60 $self->parser->{_parent_} = $self;
# Empty message object that the parser callbacks fill in as elements arrive.
61 $self->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new;
# Message queue helpers. NOTE(review): the enclosing sub headers are not
# visible in this view, and neither is the initialisation of
# $self->{queue}, which all three lines dereference as an array.
# Append a message to the tail of the queue; false values are ignored.
67 push(@{$self->{queue}}, $msg) if $msg;
# Remove and return the message at the head of the queue (undef when empty).
72 return shift @{$self->{queue}};
# True when at least one message is waiting in the queue.
77 return (@{$self->{queue}} > 0);
# Combined getter/setters. NOTE(review): the enclosing sub headers are not
# visible in this view. Each one stores its argument only when that argument
# is true and then returns the current value, so a false argument
# (undef, 0, '') leaves the stored value untouched -- these cannot be used
# to clear a field.
# XML push-parser handle.
81 my($self, $parser) = @_;
82 $self->{parser} = $parser if $parser;
83 return $self->{parser};
# Socket used to talk to the server.
87 my($self, $socket) = @_;
88 $self->{socket} = $socket if $socket;
89 return $self->{socket};
# Stream state (DISCONNECTED / CONNECT_RECV / CONNECTED).
93 my($self, $stream_state) = @_;
94 $self->{stream_state} = $stream_state if $stream_state;
95 return $self->{stream_state};
# XML parse state (IN_NOTHING / IN_BODY / IN_THREAD / IN_STATUS).
99 my($self, $xml_state) = @_;
100 $self->{xml_state} = $xml_state if $xml_state;
101 return $self->{xml_state};
# Message object currently being assembled by the parser callbacks.
105 my($self, $message) = @_;
106 $self->{message} = $message if $message;
107 return $self->{message};
111 # -----------------------------------------------------------
112 # Stream and connection handling methods
113 # -----------------------------------------------------------
# Opens the XMPP stream and authenticates. NOTE(review): the enclosing sub
# header, whatever reads the server's replies between the sends and the
# checks, and the statements that follow each error log are not visible
# in this view.
116 my($self, $domain, $username, $password, $resource) = @_;
# Step 1: open the stream towards $domain.
118 $self->send(sprintf(JABBER_CONNECT, $domain));
# The start-element handler moves stream_state to CONNECT_RECV once the
# server's <stream:stream> element has been parsed.
121 unless($self->{stream_state} == CONNECT_RECV) {
122 $logger->error("No initial XMPP response from server");
# Step 2: send the jabber:iq:auth request built from the credentials.
126 $self->send(sprintf(JABBER_BASIC_AUTH, $username, $password, $resource));
# connected() requires both a live socket and stream_state == CONNECTED.
129 unless($self->connected) {
130 $logger->error('XMPP connect failed');
# Tear-down sequence. NOTE(review): the enclosing sub header is not visible
# in this view. The return values of shutdown and close are not checked.
# Send the closing stream tag.
139 $self->send(JABBER_DISCONNECT);
# A 'how' argument of 2 stops both reading and writing on the socket.
140 shutdown($self->socket, 2);
141 close($self->socket);
144 # -----------------------------------------------------------
145 # returns true if this stream is connected to the server
146 # -----------------------------------------------------------
# Both conditions must hold: tcp_connected is true and the parser callbacks
# have moved stream_state to CONNECTED.
149 return ($self->tcp_connected and $self->{stream_state} == CONNECTED);
152 # -----------------------------------------------------------
153 # returns true if the socket is connected
154 # -----------------------------------------------------------
# False when no socket is stored; otherwise whatever the socket object's
# connected() method returns.
157 return ($self->socket and $self->socket->connected);
160 # -----------------------------------------------------------
161 # sends pre-formatted XML
162 # -----------------------------------------------------------
# $xml is written to the socket as-is.
# NOTE(review): the result of print() is not checked on this line; any
# return statement is not visible in this view.
164 my($self, $xml) = @_;
165 $self->{socket}->print($xml);
168 # -----------------------------------------------------------
169 # Puts a file handle into blocking mode
170 # -----------------------------------------------------------
# NOTE(review): the enclosing sub header and the declaration of $fh are not
# visible in this view; the name set_block is presumed -- confirm.
# Read the current file status flags, clear O_NONBLOCK, and write them back.
# NOTE(review): neither fcntl result is checked.
173 my $flags = fcntl($fh, F_GETFL, 0);
174 $flags &= ~O_NONBLOCK;
175 fcntl($fh, F_SETFL, $flags);
179 # -----------------------------------------------------------
180 # Puts a file handle into non-blocking mode
181 # -----------------------------------------------------------
# NOTE(review): the enclosing sub header and the declaration of $fh are not
# visible in this view.
# Read the current file status flags and write them back with O_NONBLOCK
# added. NOTE(review): neither fcntl result is checked.
184 my $flags = fcntl($fh, F_GETFL, 0);
185 fcntl($fh, F_SETFL, $flags | O_NONBLOCK);
# Waits for data on the socket, feeds it to the push parser, and returns
# the next queued message if one is available.
# NOTE(review): the enclosing sub header, the declarations of $infile and
# $buf, and parts of the read loop are not visible in this view.
190 my($self, $timeout) = @_;
# A message parsed on an earlier call is returned without touching the socket.
192 return $self->next_msg if $self->peek_msg;
# A negative timeout becomes undef, which makes select() below block until
# the socket is readable.
# NOTE(review): an undef $timeout would warn on this comparison unless it is
# defaulted on a line not visible in this view.
195 $timeout = undef if $timeout < 0;
196 my $socket = $self->{socket};
200 # build the select readset
202 vec($infile, $socket->fileno, 1) = 1;
# Nothing readable within the timeout: give up and return undef.
# NOTE(review): select() returns -1 on error, which is true, so an error
# falls through to the read loop -- confirm that is intended.
203 return undef unless select($infile, undef, undef, $timeout);
205 # now slurp the data off the socket
207 my $read_size = 1024;
# sysread returns 0 at end-of-file and undef on error; either ends the loop.
208 while(my $n = sysread($socket, $buf, $read_size)) {
# Feed the chunk to the push parser; the end-element callback queues each
# completed message.
# NOTE(review): 'if $buf' is a truth test, so a chunk equal to "0" is skipped.
209 $self->{parser}->parse_more($buf) if $buf;
# A short read or a completed message; the body of this branch is not
# visible in this view.
210 if($n < $read_size or $self->peek_msg) {
214 set_nonblock($socket);
# Hand back the first queued message, or undef when none was completed.
217 return $self->next_msg;
220 # -----------------------------------------------------------
221 # Waits up to timeout seconds for a fully-formed XMPP
222 # message to arrive. If timeout is < 0, waits indefinitely
223 # -----------------------------------------------------------
# NOTE(review): the enclosing sub header, the declarations of $xml and
# $start, and the statements between the visible lines are not shown in
# this view.
225 my($self, $timeout) = @_;
# A missing timeout is treated as zero.
228 $timeout = 0 unless defined $timeout;
# Try once with the timeout exactly as given; the assignment inside the
# condition is intentional.
232 return $xml if $xml = $self->wait($timeout);
# Keep retrying while the remaining timeout is not negative.
236 while($timeout >= 0) {
238 return $xml if $xml = $self->wait($timeout);
# Deduct the time elapsed since $start from the remaining timeout.
239 $timeout -= time - $start;
247 # -----------------------------------------------------------
249 # -----------------------------------------------------------
# XML::Parser Start handler: called with the parser, the element name and
# the element's attributes as a flat key/value list.
# NOTE(review): the enclosing sub header and the closing braces are not
# visible in this view.
253 my($parser, $name, %attrs) = @_;
# Recover the reader object stored on the parser by the constructor.
254 my $self = $parser->{_parent_};
256 if($name eq 'message') {
# Copy the envelope attributes onto the message being assembled.
258 my $msg = $self->{message};
259 $msg->{to} = $attrs{'to'};
# router_from takes precedence; 'from' is used only when no sender has been
# set yet.
260 $msg->{from} = $attrs{router_from} if $attrs{router_from};
261 $msg->{from} = $attrs{from} unless $msg->{from};
262 $msg->{osrf_xid} = $attrs{'osrf_xid'};
263 $msg->{type} = $attrs{type};
# For body/thread/status, note which element is open so the character
# handler can append its text to the matching message field.
265 } elsif($name eq 'body') {
266 $self->{xml_state} = IN_BODY;
268 } elsif($name eq 'thread') {
269 $self->{xml_state} = IN_THREAD;
# The server's opening stream tag.
271 } elsif($name eq 'stream:stream') {
272 $self->{stream_state} = CONNECT_RECV;
# An <iq type='result'> marks the stream as CONNECTED.
# NOTE(review): any iq result does this, not only the reply to the auth
# request -- confirm.
274 } elsif($name eq 'iq') {
275 if($attrs{type} and $attrs{type} eq 'result') {
276 $self->{stream_state} = CONNECTED;
279 } elsif($name eq 'status') {
280 $self->{xml_state } = IN_STATUS;
282 } elsif($name eq 'stream:error') {
283 $self->{stream_state} = DISCONNECTED;
# Record the error details on the current message.
# NOTE(review): every <error> element also marks the stream DISCONNECTED --
# confirm that is intended for all <error> elements.
285 } elsif($name eq 'error') {
286 $self->{message}->{err_type} = $attrs{'type'};
287 $self->{message}->{err_code} = $attrs{'code'};
288 $self->{stream_state} = DISCONNECTED;
# Character-data handler: appends text to the message field selected by the
# current xml_state. Text is appended rather than assigned so that text
# delivered in more than one call accumulates. Text seen in any other state
# matches no branch shown here.
# NOTE(review): the enclosing sub header and the closing braces are not
# visible in this view; the handler name is presumed -- confirm.
293 my($parser, $chars) = @_;
294 my $self = $parser->{_parent_};
295 my $state = $self->{xml_state};
297 if($state == IN_BODY) {
298 $self->{message}->{body} .= $chars;
300 } elsif($state == IN_THREAD) {
301 $self->{message}->{thread} .= $chars;
303 } elsif($state == IN_STATUS) {
304 $self->{message}->{status} .= $chars;
# End-element handler.
# NOTE(review): the enclosing sub header and the closing braces are not
# visible in this view; the handler name is presumed -- confirm.
309 my($parser, $name) = @_;
310 my $self = $parser->{_parent_};
# Any closing tag ends text collection for the element that was open.
311 $self->{xml_state} = IN_NOTHING;
# A closing </message> completes the message: queue it and start a fresh one.
313 if($name eq 'message') {
314 $self->push_msg($self->{message});
315 $self->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new;
# The stream's closing tag marks the stream as ended.
317 } elsif($name eq 'stream:stream') {
318 $self->{stream_state} = DISCONNECTED;
# Reads and discards whatever unread data is pending on the socket.
# NOTE(review): the enclosing sub header, the closing braces and the final
# return are not visible in this view; the name comes from the log messages.
324 my $socket = $self->socket;
# Nothing to flush without a connected socket.
325 return 0 unless $socket and $socket->connected;
# Remember the current flags and switch to non-blocking mode for the drain.
# NOTE(review): the fcntl results are not checked.
327 my $flags = fcntl($socket, F_GETFL, 0);
328 fcntl($socket, F_SETFL, $flags | O_NONBLOCK);
# Throw away pending data in chunks of up to 8192 bytes; the loop ends when
# sysread returns 0 or undef. The discarded bytes are not passed to the parser.
330 while( my $n = sysread( $socket, my $buf, 8192 ) ) {
331 $logger->debug("flush_socket dropped $n bytes of data");
332 $logger->error("flush_socket dropped data on disconnected socket: $buf")
333 unless($socket->connected);
# Put the socket back into the mode it had on entry.
336 fcntl($socket, F_SETFL, $flags);
337 return 0 unless $socket->connected;