LP#1729610: Allow queuing (for a while) during child backlog
[OpenSRF.git] / src / perl / lib / OpenSRF / Transport / SlimJabber / XMPPReader.pm
1 package OpenSRF::Transport::SlimJabber::XMPPReader;
2 use strict; use warnings;
3 use XML::Parser;
4 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
5 use Time::HiRes qw/time/;
6 use OpenSRF::Transport::SlimJabber::XMPPMessage;
7 use OpenSRF::Utils::Logger qw/$logger/;
8 use OpenSRF::EX;
9
10 # -----------------------------------------------------------
11 # Connect, disconnect, and authentication messsage templates
12 # -----------------------------------------------------------
13 use constant JABBER_CONNECT =>
14     "<stream:stream to='%s' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>";
15
16 use constant JABBER_BASIC_AUTH =>
17     "<iq id='123' type='set'><query xmlns='jabber:iq:auth'>" .
18     "<username>%s</username><password>%s</password><resource>%s</resource></query></iq>";
19
20 use constant JABBER_DISCONNECT => "</stream:stream>";
21
22
23 # -----------------------------------------------------------
24 # XMPP Stream states
25 # -----------------------------------------------------------
26 use constant DISCONNECTED   => 1;
27 use constant CONNECT_RECV   => 2;
28 use constant CONNECTED      => 3;
29
30
31 # -----------------------------------------------------------
32 # XMPP Message states
33 # -----------------------------------------------------------
34 use constant IN_NOTHING => 1;
35 use constant IN_BODY    => 2;
36 use constant IN_THREAD  => 3;
37 use constant IN_STATUS  => 4;
38
39
40 # -----------------------------------------------------------
41 # Constructor, getter/setters
42 # -----------------------------------------------------------
43 sub new {
44     my $class = shift;
45     my $socket = shift;
46
47     my $self = bless({}, $class);
48
49     $self->{queue} = [];
50     $self->{stream_state} = DISCONNECTED;
51     $self->{xml_state} = IN_NOTHING;
52     $self->socket($socket);
53
54     my $p = new XML::Parser(Handlers => {
55         Start => \&start_element,
56         End   => \&end_element,
57         Char  => \&characters,
58     });
59
60     $self->parser($p->parse_start); # create a push parser
61     $self->parser->{_parent_} = $self;
62     $self->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new;
63     return $self;
64 }
65
66 sub push_msg {
67     my($self, $msg) = @_; 
68     push(@{$self->{queue}}, $msg) if $msg;
69 }
70
71 sub next_msg {
72     my $self = shift;
73     return shift @{$self->{queue}};
74 }
75
76 sub peek_msg {
77     my $self = shift;
78     return (@{$self->{queue}} > 0);
79 }
80
81 sub parser {
82     my($self, $parser) = @_;
83     $self->{parser} = $parser if $parser;
84     return $self->{parser};
85 }
86
87 sub socket {
88     my($self, $socket) = @_;
89     $self->{socket} = $socket if $socket;
90     return $self->{socket};
91 }
92
93 sub stream_state {
94     my($self, $stream_state) = @_;
95     $self->{stream_state} = $stream_state if $stream_state;
96     return $self->{stream_state};
97 }
98
99 sub xml_state {
100     my($self, $xml_state) = @_;
101     $self->{xml_state} = $xml_state if $xml_state;
102     return $self->{xml_state};
103 }
104
105 sub message {
106     my($self, $message) = @_;
107     $self->{message} = $message if $message;
108     return $self->{message};
109 }
110
111
112 # -----------------------------------------------------------
113 # Stream and connection handling methods
114 # -----------------------------------------------------------
115
116 sub connect {
117     my($self, $domain, $username, $password, $resource) = @_;
118     
119     $self->send(sprintf(JABBER_CONNECT, $domain));
120     $self->wait(10);
121
122     unless($self->{stream_state} == CONNECT_RECV) {
123         $logger->error("No initial XMPP response from server");
124         return 0;
125     }
126
127     $self->send(sprintf(JABBER_BASIC_AUTH, $username, $password, $resource));
128     $self->wait(10);
129
130     unless($self->connected) {
131         $logger->error('XMPP connect failed');
132         return 0;
133     }
134
135     return 1;
136 }
137
138 sub disconnect {
139     my $self = shift;
140     return unless $self->socket;
141     if($self->tcp_connected) {
142         $self->send(JABBER_DISCONNECT); 
143         shutdown($self->socket, 2);
144     }
145     close($self->socket);
146 }
147
148 # -----------------------------------------------------------
149 # returns true if this stream is connected to the server
150 # -----------------------------------------------------------
151 sub connected {
152     my $self = shift;
153     return ($self->tcp_connected and $self->{stream_state} == CONNECTED);
154 }
155
156 # -----------------------------------------------------------
157 # returns true if the socket is connected
158 # -----------------------------------------------------------
159 sub tcp_connected {
160     my $self = shift;
161     return ($self->socket and $self->socket->connected);
162 }
163
164 # -----------------------------------------------------------
165 # sends pre-formated XML
166 # -----------------------------------------------------------
167 sub send {
168     my($self, $xml) = @_;
169         
170     local $SIG{'PIPE'} = sub {
171         $logger->error("Disconnected from Jabber server, exiting immediately");
172         exit(99);
173     };
174     $self->{socket}->print($xml);
175 }
176
177 # -----------------------------------------------------------
178 # Puts a file handle into blocking mode
179 # -----------------------------------------------------------
180 sub set_block {
181     my $fh = shift;
182     my  $flags = fcntl($fh, F_GETFL, 0);
183     $flags &= ~O_NONBLOCK;
184     fcntl($fh, F_SETFL, $flags);
185 }
186
187
188 # -----------------------------------------------------------
189 # Puts a file handle into non-blocking mode
190 # -----------------------------------------------------------
191 sub set_nonblock {
192     my $fh = shift;
193     my  $flags = fcntl($fh, F_GETFL, 0);
194     fcntl($fh, F_SETFL, $flags | O_NONBLOCK);
195 }
196
197
198 sub wait {
199     my($self, $timeout) = @_;
200      
201     return $self->next_msg if $self->peek_msg;
202
203     $timeout ||= 0;
204     $timeout = undef if $timeout < 0;
205     my $socket = $self->{socket};
206
207     set_block($socket);
208     
209     # build the select readset
210     my $infile = '';
211     vec($infile, $socket->fileno, 1) = 1;
212
213     my $nfound;
214     if (!OpenSRF->OSRF_APACHE_REQUEST_OBJ || $timeout <= 1.0) {
215         $nfound = select($infile, undef, undef, $timeout);
216     } else {
217         $timeout -= 1.0;
218         for (
219             my $sleep = 1.0;
220             $timeout >= 0.0;
221             do {
222                 $sleep = $timeout < 1.0 ? $timeout : 1.0;
223                 $timeout -= 1.0;
224             }
225         ) {
226             $nfound = select($infile, undef, undef, $sleep);
227             last if $nfound;
228             if (
229                 OpenSRF->OSRF_APACHE_REQUEST_OBJ &&
230                 OpenSRF->OSRF_APACHE_REQUEST_OBJ->connection->aborted
231             ) {
232                 # Should this be more severe? Die or throw error?
233                 $logger->warn("Upstream Apache client disconnected, aborting.");
234                 last;
235             };
236         }
237     }
238     return undef if !$nfound or $nfound == -1;
239
240     # now slurp the data off the socket
241     my $buf;
242     my $read_size = 1024;
243     my $nonblock = 0;
244     my $nbytes;
245     my $first_read = 1;
246
247     while($nbytes = sysread($socket, $buf, $read_size)) {
248         $self->{parser}->parse_more($buf) if $buf;
249         if($nbytes < $read_size or $self->peek_msg) {
250             set_block($socket) if $nonblock;
251             last;
252         }
253         set_nonblock($socket) unless $nonblock;
254         $nonblock = 1;
255         $first_read = 0;
256     }
257
258     if ($first_read and defined $nbytes and $nbytes == 0) {
259         # if the first read on an active socket is 0 bytes, 
260         # the socket has been disconnected from the remote end. 
261         $self->{stream_state} = DISCONNECTED;
262         $logger->error("Disconnected from Jabber server");
263         throw OpenSRF::EX::Jabber("Disconnected from Jabber server");
264     }
265
266     return $self->next_msg;
267 }
268
269 # -----------------------------------------------------------
270 # Waits up to timeout seconds for a fully-formed XMPP
271 # message to arrive.  If timeout is < 0, waits indefinitely
272 # -----------------------------------------------------------
273 sub wait_msg {
274     my($self, $timeout) = @_;
275     my $xml;
276
277     $timeout = 0 unless defined $timeout;
278
279     if($timeout < 0) {
280         while(1) {
281             return $xml if $xml = $self->wait($timeout); 
282         }
283
284     } else {
285         while($timeout >= 0) {
286             my $start = time;
287             return $xml if $xml = $self->wait($timeout); 
288             $timeout -= time - $start;
289         }
290     }
291
292     return undef;
293 }
294
295
296 # -----------------------------------------------------------
297 # SAX Handlers
298 # -----------------------------------------------------------
299
300
301 sub start_element {
302     my($parser, $name, %attrs) = @_;
303     my $self = $parser->{_parent_};
304
305     if($name eq 'message') {
306
307         my $msg = $self->{message};
308         $msg->{to} = $attrs{'to'};
309         $msg->{from} = $attrs{from};
310         $msg->{type} = $attrs{type};
311
312     } elsif($name eq 'opensrf') {
313
314         # These will be authoritative if they exist
315         my $msg = $self->{message};
316         $msg->{from} = $attrs{router_from} if $attrs{router_from};
317         $msg->{osrf_xid} = $attrs{'osrf_xid'};
318
319     } elsif($name eq 'body') {
320         $self->{xml_state} = IN_BODY;
321
322     } elsif($name eq 'thread') {
323         $self->{xml_state} = IN_THREAD;
324
325     } elsif($name eq 'stream:stream') {
326         $self->{stream_state} = CONNECT_RECV;
327
328     } elsif($name eq 'iq') {
329         if($attrs{type} and $attrs{type} eq 'result') {
330             $self->{stream_state} = CONNECTED;
331         }
332
333     } elsif($name eq 'status') {
334         $self->{xml_state } = IN_STATUS;
335
336     } elsif($name eq 'stream:error') {
337         $self->{stream_state} = DISCONNECTED;
338
339     } elsif($name eq 'error') {
340         $self->{message}->{err_type} = $attrs{'type'};
341         $self->{message}->{err_code} = $attrs{'code'};
342     }
343 }
344
345 sub characters {
346     my($parser, $chars) = @_;
347     my $self = $parser->{_parent_};
348     my $state = $self->{xml_state};
349
350     if($state == IN_BODY) {
351         $self->{message}->{body} .= $chars;
352
353     } elsif($state == IN_THREAD) {
354         $self->{message}->{thread} .= $chars;
355
356     } elsif($state == IN_STATUS) {
357         $self->{message}->{status} .= $chars;
358     }
359 }
360
361 sub end_element {
362     my($parser, $name) = @_;
363     my $self = $parser->{_parent_};
364     $self->{xml_state} = IN_NOTHING;
365
366     if($name eq 'message') {
367         $self->push_msg($self->{message});
368         $self->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new;
369
370     } elsif($name eq 'stream:stream') {
371         $self->{stream_state} = DISCONNECTED;
372     }
373 }
374
375
376 # read all the data on the jabber socket through the 
377 # parser and drop the resulting message
378 sub flush_socket {
379         my $self = shift;
380     return 0 unless $self->connected;
381
382     while (my $excess = $self->wait(0)) {
383         $logger->info("flushing data from socket... $excess");
384     }
385
386     return $self->connected;
387 }
388
389
390
391 1;
392