1 package OpenSRF::Transport::SlimJabber::XMPPReader;
2 use strict; use warnings;
4 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
5 use Time::HiRes qw/time/;
6 use OpenSRF::Transport::SlimJabber::XMPPMessage;
7 use OpenSRF::Utils::Logger qw/$logger/;
9 # -----------------------------------------------------------
10 # Connect, disconnect, and authentication message templates
11 # -----------------------------------------------------------
# Stream-opening element.  Used as a sprintf template: the single %s
# is filled with the domain name.
12 use constant JABBER_CONNECT =>
13 "<stream:stream to='%s' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>";
# jabber:iq:auth login request.  Used as a sprintf template: the %s
# slots are, in order, username, password and resource.  The iq id is
# always the literal '123'.
15 use constant JABBER_BASIC_AUTH =>
16 "<iq id='123' type='set'><query xmlns='jabber:iq:auth'>" .
17 "<username>%s</username><password>%s</password><resource>%s</resource></query></iq>";
# Closing tag for the element opened by JABBER_CONNECT.
19 use constant JABBER_DISCONNECT => "</stream:stream>";
22 # -----------------------------------------------------------
# Values stored in $self->{stream_state}
24 # -----------------------------------------------------------
# Initial state; also set when a <stream:error> element starts or the
# <stream:stream> element ends.
25 use constant DISCONNECTED => 1;
# Set when the opening <stream:stream> element has been parsed.
26 use constant CONNECT_RECV => 2;
# Set when an <iq> element with type='result' has been parsed.
27 use constant CONNECTED => 3;
30 # -----------------------------------------------------------
# Values stored in $self->{xml_state}; they select the message field
# that receives character data.
32 # -----------------------------------------------------------
# Initial state; restored whenever any element ends.  Character data
# seen in this state is not stored.
33 use constant IN_NOTHING => 1;
# Inside <body>: character data is appended to the message body.
34 use constant IN_BODY => 2;
# Inside <thread>: character data is appended to the message thread.
35 use constant IN_THREAD => 3;
# Inside <status>: character data is appended to the message status.
36 use constant IN_STATUS => 4;
39 # -----------------------------------------------------------
40 # Constructor, getter/setters
41 # -----------------------------------------------------------
# Create the reader object as an empty hash blessed into $class.
46 my $self = bless({}, $class);
# A fresh reader is disconnected and outside any tracked XML element.
49 $self->{stream_state} = DISCONNECTED;
50 $self->{xml_state} = IN_NOTHING;
51 $self->socket($socket);
# Build the XML parser; start_element is registered as the callback
# for opening tags.
53 my $p = new XML::Parser(Handlers => {
54 Start => \&start_element,
# parse_start returns a non-blocking parser object that is later fed
# incrementally with parse_more.
59 $self->parser($p->parse_start); # create a push parser
# The callbacks receive the parser, not the reader, so the reader is
# stashed on the parser object where they can find it.
# NOTE(review): reader -> parser -> reader is a reference cycle; no
# weaken() is visible here -- confirm how it is broken on teardown.
60 $self->parser->{_parent_} = $self;
# Empty message object that the callbacks fill in as elements arrive.
61 $self->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new;
# Append $msg to the queue of completed messages; a false $msg is
# silently ignored.
67 push(@{$self->{queue}}, $msg) if $msg;
# Remove and return the oldest queued message; shift yields undef when
# the queue is empty.
72 return shift @{$self->{queue}};
# True when at least one message is waiting in the queue.
77 return (@{$self->{queue}} > 0);
# Combined getter/setter for the push parser: stores $parser when a
# true value is passed, and always returns the current value.
81 my($self, $parser) = @_;
82 $self->{parser} = $parser if $parser;
83 return $self->{parser};
# Combined getter/setter for the socket handle (same pattern).
87 my($self, $socket) = @_;
88 $self->{socket} = $socket if $socket;
89 return $self->{socket};
# Combined getter/setter for the stream state.  False arguments are
# ignored, which is safe because every stream-state constant is
# non-zero.
93 my($self, $stream_state) = @_;
94 $self->{stream_state} = $stream_state if $stream_state;
95 return $self->{stream_state};
# Combined getter/setter for the XML state; the XML-state constants
# are likewise all non-zero.
99 my($self, $xml_state) = @_;
100 $self->{xml_state} = $xml_state if $xml_state;
101 return $self->{xml_state};
# Combined getter/setter for the message object currently being built.
105 my($self, $message) = @_;
106 $self->{message} = $message if $message;
107 return $self->{message};
111 # -----------------------------------------------------------
112 # Stream and connection handling methods
113 # -----------------------------------------------------------
# Arguments: the XMPP domain plus the username, password and resource
# used for the login request.
116 my($self, $domain, $username, $password, $resource) = @_;
# Open the XML stream, addressed to $domain.
118 $self->send(sprintf(JABBER_CONNECT, $domain));
# The stream state only becomes CONNECT_RECV once the server's opening
# <stream:stream> element has been parsed.
121 unless($self->{stream_state} == CONNECT_RECV) {
122 $logger->error("No initial XMPP response from server");
# Send the login request.
# NOTE(review): the three values are inserted into the XML verbatim;
# no escaping is visible here -- presumably callers pass XML-safe
# strings, verify.
126 $self->send(sprintf(JABBER_BASIC_AUTH, $username, $password, $resource));
# connected() is true only after an <iq type='result'> has been parsed.
129 unless($self->connected) {
130 $logger->error('XMPP connect failed');
# Nothing to do when no socket is stored.
139 return unless $self->socket;
# While the TCP connection is still up, send the stream-closing tag
# and shut the socket down; how = 2 stops both reading and writing.
140 if($self->tcp_connected) {
141 $self->send(JABBER_DISCONNECT);
142 shutdown($self->socket, 2);
# Release the socket handle.
144 close($self->socket);
147 # -----------------------------------------------------------
148 # returns true if this stream is connected to the server
149 # -----------------------------------------------------------
# Both conditions must hold: the TCP socket reports connected, and the
# stream state is CONNECTED (set when an <iq type='result'> is parsed).
152 return ($self->tcp_connected and $self->{stream_state} == CONNECTED);
155 # -----------------------------------------------------------
156 # returns true if the socket is connected
157 # -----------------------------------------------------------
# False when no socket is stored; otherwise the result of the socket's
# own connected() method.
# NOTE(review): presumably an IO::Socket handle -- confirm.
160 return ($self->socket and $self->socket->connected);
163 # -----------------------------------------------------------
164 # sends pre-formatted XML
165 # -----------------------------------------------------------
167 my($self, $xml) = @_;
# The string is written to the socket exactly as given; no escaping or
# validation happens here.
168 $self->{socket}->print($xml);
171 # -----------------------------------------------------------
172 # Puts a file handle into blocking mode
173 # -----------------------------------------------------------
# Read the handle's current status flags, clear only the O_NONBLOCK
# bit, and write the flags back; all other flags are preserved.
# NOTE(review): the fcntl return values are used without an error
# check.
176 my $flags = fcntl($fh, F_GETFL, 0);
177 $flags &= ~O_NONBLOCK;
178 fcntl($fh, F_SETFL, $flags);
182 # -----------------------------------------------------------
183 # Puts a file handle into non-blocking mode
184 # -----------------------------------------------------------
# Read the handle's current status flags and write them back with the
# O_NONBLOCK bit added; all other flags are preserved.
# NOTE(review): the fcntl return values are used without an error
# check.
187 my $flags = fcntl($fh, F_GETFL, 0);
188 fcntl($fh, F_SETFL, $flags | O_NONBLOCK);
# $timeout is the number of seconds select() may wait for data.
193 my($self, $timeout) = @_;
# A message that is already queued is returned without touching the
# socket.
195 return $self->next_msg if $self->peek_msg;
# A negative timeout is turned into undef, which tells select() to
# block until data arrives.
198 $timeout = undef if $timeout < 0;
199 my $socket = $self->{socket};
203 # build the select readset
205 vec($infile, $socket->fileno, 1) = 1;
# select() returns 0 when the timeout expires with nothing to read.
# NOTE(review): on error select() returns -1, which is true, so an
# error falls through to the read loop below -- confirm intended.
206 return undef unless select($infile, undef, undef, $timeout);
208 # now slurp the data off the socket
210 my $read_size = 1024;
# Read at most $read_size bytes per pass; the loop ends when sysread
# returns 0 (end of file) or undef (error).
211 while(my $n = sysread($socket, $buf, $read_size)) {
# Feed the chunk to the push parser, whose callbacks queue completed
# messages.
# NOTE(review): "if $buf" is false for a chunk that is exactly the
# string "0"; such a chunk would be skipped.
212 $self->{parser}->parse_more($buf) if $buf;
# True after a short read (fewer than $read_size bytes) or once a
# complete message has been queued.
213 if($n < $read_size or $self->peek_msg) {
# Put the socket into non-blocking mode.
217 set_nonblock($socket);
# Hand back the oldest queued message, or undef when none was
# completed.
220 return $self->next_msg;
223 # -----------------------------------------------------------
224 # Waits up to timeout seconds for a fully-formed XMPP
225 # message to arrive. If timeout is < 0, waits indefinitely
226 # -----------------------------------------------------------
228 my($self, $timeout) = @_;
# A missing timeout is treated as 0.
231 $timeout = 0 unless defined $timeout;
# Return as soon as wait() produces a message.
235 return $xml if $xml = $self->wait($timeout);
# Bounded case: keep calling wait() until a message arrives or the
# remaining time drops below zero.
239 while($timeout >= 0) {
241 return $xml if $xml = $self->wait($timeout);
# Deduct the time that has passed since $start.  time is imported from
# Time::HiRes, so fractions of a second are counted.
242 $timeout -= time - $start;
250 # -----------------------------------------------------------
252 # -----------------------------------------------------------
# Start-tag callback: receives the parser object, the element name and
# the element's attributes as a flat key/value list.
256 my($parser, $name, %attrs) = @_;
# The reader object was stashed on the parser when it was created.
257 my $self = $parser->{_parent_};
259 if($name eq 'message') {
# Copy the addressing attributes onto the message being built.  A
# router_from attribute takes precedence over from.
261 my $msg = $self->{message};
262 $msg->{to} = $attrs{'to'};
263 $msg->{from} = $attrs{router_from} if $attrs{router_from};
264 $msg->{from} = $attrs{from} unless $msg->{from};
265 $msg->{osrf_xid} = $attrs{'osrf_xid'};
266 $msg->{type} = $attrs{type};
# For body, thread and status only the XML state is recorded here; it
# tells the character-data callback which field to append to.
268 } elsif($name eq 'body') {
269 $self->{xml_state} = IN_BODY;
271 } elsif($name eq 'thread') {
272 $self->{xml_state} = IN_THREAD;
# The server has opened its side of the stream.
274 } elsif($name eq 'stream:stream') {
275 $self->{stream_state} = CONNECT_RECV;
# Any <iq type='result'> marks the stream as connected; the iq id is
# not inspected.
277 } elsif($name eq 'iq') {
278 if($attrs{type} and $attrs{type} eq 'result') {
279 $self->{stream_state} = CONNECTED;
282 } elsif($name eq 'status') {
283 $self->{xml_state } = IN_STATUS;
# A stream-level error puts the stream back into the disconnected
# state.
285 } elsif($name eq 'stream:error') {
286 $self->{stream_state} = DISCONNECTED;
# Record the error element's type and code on the current message.
288 } elsif($name eq 'error') {
289 $self->{message}->{err_type} = $attrs{'type'};
290 $self->{message}->{err_code} = $attrs{'code'};
# Character-data callback: receives the parser object and a chunk of
# text.
295 my($parser, $chars) = @_;
296 my $self = $parser->{_parent_};
297 my $state = $self->{xml_state};
# Append (.=) rather than assign: the text of one element can be
# delivered across several calls.  Text seen in any other state is
# not stored.
299 if($state == IN_BODY) {
300 $self->{message}->{body} .= $chars;
302 } elsif($state == IN_THREAD) {
303 $self->{message}->{thread} .= $chars;
305 } elsif($state == IN_STATUS) {
306 $self->{message}->{status} .= $chars;
# End-tag callback: receives the parser object and the element name.
311 my($parser, $name) = @_;
312 my $self = $parser->{_parent_};
# Every closing tag resets the XML state, so later character data is
# not appended to a field that has already been closed.
313 $self->{xml_state} = IN_NOTHING;
# A closed <message> is complete: queue it and start a fresh, empty
# message object for the next one.
315 if($name eq 'message') {
316 $self->push_msg($self->{message});
317 $self->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new;
# The closing stream tag ends the XMPP session.
319 } elsif($name eq 'stream:stream') {
320 $self->{stream_state} = DISCONNECTED;
326 my $socket = $self->socket;
# Nothing can be flushed from a missing or disconnected socket.
327 return 0 unless $socket and $socket->connected;
# Remember the current status flags, then add O_NONBLOCK so that the
# drain loop below returns instead of blocking when no data is left.
329 my $flags = fcntl($socket, F_GETFL, 0);
330 fcntl($socket, F_SETFL, $flags | O_NONBLOCK);
# Read and discard pending data in chunks of up to 8192 bytes; the
# loop ends when sysread returns 0 or undef.  The discarded data is
# never passed to the XML parser.
332 while( my $n = sysread( $socket, my $buf, 8192 ) ) {
333 $logger->debug("flush_socket dropped $n bytes of data");
334 $logger->error("flush_socket dropped data on disconnected socket: $buf")
335 unless($socket->connected);
# Restore the status flags saved before the drain.
338 fcntl($socket, F_SETFL, $flags);
# Report failure if the connection went away while draining.
339 return 0 unless $socket->connected;