]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/perl/lib/OpenSRF/Transport/SlimJabber/XMPPReader.pm
don't try to close the socket if it's not there
[OpenSRF.git] / src / perl / lib / OpenSRF / Transport / SlimJabber / XMPPReader.pm
1 package OpenSRF::Transport::SlimJabber::XMPPReader;
2 use strict; use warnings;
3 use XML::Parser;
4 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
5 use Time::HiRes qw/time/;
6 use OpenSRF::Transport::SlimJabber::XMPPMessage;
7 use OpenSRF::Utils::Logger qw/$logger/;
8
9 # -----------------------------------------------------------
10 # Connect, disconnect, and authentication messsage templates
11 # -----------------------------------------------------------
12 use constant JABBER_CONNECT =>
13     "<stream:stream to='%s' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>";
14
15 use constant JABBER_BASIC_AUTH =>
16     "<iq id='123' type='set'><query xmlns='jabber:iq:auth'>" .
17     "<username>%s</username><password>%s</password><resource>%s</resource></query></iq>";
18
19 use constant JABBER_DISCONNECT => "</stream:stream>";
20
21
22 # -----------------------------------------------------------
23 # XMPP Stream states
24 # -----------------------------------------------------------
25 use constant DISCONNECTED   => 1;
26 use constant CONNECT_RECV   => 2;
27 use constant CONNECTED      => 3;
28
29
30 # -----------------------------------------------------------
31 # XMPP Message states
32 # -----------------------------------------------------------
33 use constant IN_NOTHING => 1;
34 use constant IN_BODY    => 2;
35 use constant IN_THREAD  => 3;
36 use constant IN_STATUS  => 4;
37
38
39 # -----------------------------------------------------------
40 # Constructor, getter/setters
41 # -----------------------------------------------------------
42 sub new {
43     my $class = shift;
44     my $socket = shift;
45
46     my $self = bless({}, $class);
47
48     $self->{queue} = [];
49     $self->{stream_state} = DISCONNECTED;
50     $self->{xml_state} = IN_NOTHING;
51     $self->socket($socket);
52
53     my $p = new XML::Parser(Handlers => {
54         Start => \&start_element,
55         End   => \&end_element,
56         Char  => \&characters,
57     });
58
59     $self->parser($p->parse_start); # create a push parser
60     $self->parser->{_parent_} = $self;
61     $self->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new;
62     return $self;
63 }
64
65 sub push_msg {
66     my($self, $msg) = @_; 
67     push(@{$self->{queue}}, $msg) if $msg;
68 }
69
70 sub next_msg {
71     my $self = shift;
72     return shift @{$self->{queue}};
73 }
74
75 sub peek_msg {
76     my $self = shift;
77     return (@{$self->{queue}} > 0);
78 }
79
80 sub parser {
81     my($self, $parser) = @_;
82     $self->{parser} = $parser if $parser;
83     return $self->{parser};
84 }
85
86 sub socket {
87     my($self, $socket) = @_;
88     $self->{socket} = $socket if $socket;
89     return $self->{socket};
90 }
91
92 sub stream_state {
93     my($self, $stream_state) = @_;
94     $self->{stream_state} = $stream_state if $stream_state;
95     return $self->{stream_state};
96 }
97
98 sub xml_state {
99     my($self, $xml_state) = @_;
100     $self->{xml_state} = $xml_state if $xml_state;
101     return $self->{xml_state};
102 }
103
104 sub message {
105     my($self, $message) = @_;
106     $self->{message} = $message if $message;
107     return $self->{message};
108 }
109
110
111 # -----------------------------------------------------------
112 # Stream and connection handling methods
113 # -----------------------------------------------------------
114
115 sub connect {
116     my($self, $domain, $username, $password, $resource) = @_;
117     
118     $self->send(sprintf(JABBER_CONNECT, $domain));
119     $self->wait(10);
120
121     unless($self->{stream_state} == CONNECT_RECV) {
122         $logger->error("No initial XMPP response from server");
123         return 0;
124     }
125
126     $self->send(sprintf(JABBER_BASIC_AUTH, $username, $password, $resource));
127     $self->wait(10);
128
129     unless($self->connected) {
130         $logger->error('XMPP connect failed');
131         return 0;
132     }
133
134     return 1;
135 }
136
137 sub disconnect {
138     my $self = shift;
139     return unless $self->socket;
140     if($self->tcp_connected) {
141         $self->send(JABBER_DISCONNECT); 
142         shutdown($self->socket, 2);
143     }
144     close($self->socket);
145 }
146
147 # -----------------------------------------------------------
148 # returns true if this stream is connected to the server
149 # -----------------------------------------------------------
150 sub connected {
151     my $self = shift;
152     return ($self->tcp_connected and $self->{stream_state} == CONNECTED);
153 }
154
155 # -----------------------------------------------------------
156 # returns true if the socket is connected
157 # -----------------------------------------------------------
158 sub tcp_connected {
159     my $self = shift;
160     return ($self->socket and $self->socket->connected);
161 }
162
163 # -----------------------------------------------------------
164 # sends pre-formated XML
165 # -----------------------------------------------------------
166 sub send {
167     my($self, $xml) = @_;
168     $self->{socket}->print($xml);
169 }
170
171 # -----------------------------------------------------------
172 # Puts a file handle into blocking mode
173 # -----------------------------------------------------------
174 sub set_block {
175     my $fh = shift;
176     my  $flags = fcntl($fh, F_GETFL, 0);
177     $flags &= ~O_NONBLOCK;
178     fcntl($fh, F_SETFL, $flags);
179 }
180
181
182 # -----------------------------------------------------------
183 # Puts a file handle into non-blocking mode
184 # -----------------------------------------------------------
185 sub set_nonblock {
186     my $fh = shift;
187     my  $flags = fcntl($fh, F_GETFL, 0);
188     fcntl($fh, F_SETFL, $flags | O_NONBLOCK);
189 }
190
191
192 sub wait {
193     my($self, $timeout) = @_;
194      
195     return $self->next_msg if $self->peek_msg;
196
197     $timeout ||= 0;
198     $timeout = undef if $timeout < 0;
199     my $socket = $self->{socket};
200
201     set_block($socket);
202     
203     # build the select readset
204     my $infile = '';
205     vec($infile, $socket->fileno, 1) = 1;
206     return undef unless select($infile, undef, undef, $timeout);
207
208     # now slurp the data off the socket
209     my $buf;
210     my $read_size = 1024;
211     while(my $n = sysread($socket, $buf, $read_size)) {
212         $self->{parser}->parse_more($buf) if $buf;
213         if($n < $read_size or $self->peek_msg) {
214             set_block($socket);
215             last;
216         }
217         set_nonblock($socket);
218     }
219
220     return $self->next_msg;
221 }
222
223 # -----------------------------------------------------------
224 # Waits up to timeout seconds for a fully-formed XMPP
225 # message to arrive.  If timeout is < 0, waits indefinitely
226 # -----------------------------------------------------------
227 sub wait_msg {
228     my($self, $timeout) = @_;
229     my $xml;
230
231     $timeout = 0 unless defined $timeout;
232
233     if($timeout < 0) {
234         while(1) {
235             return $xml if $xml = $self->wait($timeout); 
236         }
237
238     } else {
239         while($timeout >= 0) {
240             my $start = time;
241             return $xml if $xml = $self->wait($timeout); 
242             $timeout -= time - $start;
243         }
244     }
245
246     return undef;
247 }
248
249
250 # -----------------------------------------------------------
251 # SAX Handlers
252 # -----------------------------------------------------------
253
254
255 sub start_element {
256     my($parser, $name, %attrs) = @_;
257     my $self = $parser->{_parent_};
258
259     if($name eq 'message') {
260
261         my $msg = $self->{message};
262         $msg->{to} = $attrs{'to'};
263         $msg->{from} = $attrs{router_from} if $attrs{router_from};
264         $msg->{from} = $attrs{from} unless $msg->{from};
265         $msg->{osrf_xid} = $attrs{'osrf_xid'};
266         $msg->{type} = $attrs{type};
267
268     } elsif($name eq 'body') {
269         $self->{xml_state} = IN_BODY;
270
271     } elsif($name eq 'thread') {
272         $self->{xml_state} = IN_THREAD;
273
274     } elsif($name eq 'stream:stream') {
275         $self->{stream_state} = CONNECT_RECV;
276
277     } elsif($name eq 'iq') {
278         if($attrs{type} and $attrs{type} eq 'result') {
279             $self->{stream_state} = CONNECTED;
280         }
281
282     } elsif($name eq 'status') {
283         $self->{xml_state } = IN_STATUS;
284
285     } elsif($name eq 'stream:error') {
286         $self->{stream_state} = DISCONNECTED;
287
288     } elsif($name eq 'error') {
289         $self->{message}->{err_type} = $attrs{'type'};
290         $self->{message}->{err_code} = $attrs{'code'};
291         $self->{stream_state} = DISCONNECTED;
292     }
293 }
294
295 sub characters {
296     my($parser, $chars) = @_;
297     my $self = $parser->{_parent_};
298     my $state = $self->{xml_state};
299
300     if($state == IN_BODY) {
301         $self->{message}->{body} .= $chars;
302
303     } elsif($state == IN_THREAD) {
304         $self->{message}->{thread} .= $chars;
305
306     } elsif($state == IN_STATUS) {
307         $self->{message}->{status} .= $chars;
308     }
309 }
310
311 sub end_element {
312     my($parser, $name) = @_;
313     my $self = $parser->{_parent_};
314     $self->{xml_state} = IN_NOTHING;
315
316     if($name eq 'message') {
317         $self->push_msg($self->{message});
318         $self->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new;
319
320     } elsif($name eq 'stream:stream') {
321         $self->{stream_state} = DISCONNECTED;
322     }
323 }
324
325 sub flush_socket {
326         my $self = shift;
327         my $socket = $self->socket;
328     return 0 unless $socket and $socket->connected;
329
330     my $flags = fcntl($socket, F_GETFL, 0);
331     fcntl($socket, F_SETFL, $flags | O_NONBLOCK);
332
333     while( my $n = sysread( $socket, my $buf, 8192 ) ) {
334         $logger->debug("flush_socket dropped $n bytes of data");
335         $logger->error("flush_socket dropped data on disconnected socket: $buf")
336             unless($socket->connected);
337     }
338
339     fcntl($socket, F_SETFL, $flags);
340     return 0 unless $socket->connected;
341     return 1;
342 }
343
344
345
346
347
348 1;
349
350
351
352
353