]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/java/org/opensrf/net/xmpp/XMPPReader.java
9c1449775c5dfcda7a2d25712ba3b347a1dca291
[OpenSRF.git] / src / java / org / opensrf / net / xmpp / XMPPReader.java
1 package org.opensrf.net.xmpp;
2
3 import javax.xml.stream.*;
4 import javax.xml.stream.events.* ;
5 import javax.xml.namespace.QName;
6 import java.util.Queue;
7 import java.io.InputStream;
8 import java.util.concurrent.ConcurrentLinkedQueue;
9 import java.util.Date;
10
11 import com.ctc.wstx.stax.WstxInputFactory;
12
13 /**
14  * Slim XMPP Stream reader.  This reader only understands enough XMPP
15  * to handle logins and recv messages.  It's implemented as a StAX parser.
16  * @author Bill Erickson, Georgia Public Library Systems
17  */
18 public class XMPPReader implements Runnable {
19
20     /** Queue of received messages. */
21     private Queue<XMPPMessage> msgQueue;
22     /** Incoming XMPP XML stream */
23     private InputStream inStream;
24     /** Current message body */
25     private StringBuffer msgBody;
26     /** Current message thread */
27     private StringBuffer msgThread;
28     /** Current message status */
29     private StringBuffer msgStatus;
30     /** Current message error type */
31     private StringBuffer msgErrType;
32     /** Current message sender */
33     private String msgFrom;
34     /** Current message recipient */
35     private String msgTo;
36     /** Current message error code */
37     private int msgErrCode;
38
39     /** Where this reader currently is in the document */
40     private XMLState xmlState;
41
42     /** The current connect state to the XMPP server */
43     private XMPPStreamState streamState;
44
45
46     /** Used to represent out connection state to the XMPP server */
47     public static enum XMPPStreamState {
48         DISCONNECTED,   /* not connected to the server */
49         CONNECT_SENT,   /* we've sent the initial connect message */
50         CONNECT_RECV,   /* we've received a response to our connect message */
51         AUTH_SENT,      /* we've sent an authentication request */
52         CONNECTED       /* authentication is complete */
53     };
54
55
56     /** Used to represents where we are in the XML document stream. */
57     public static enum XMLState {
58         IN_NOTHING,
59         IN_BODY,
60         IN_THREAD,
61         IN_STATUS
62     };
63
64
65     /**
66      * Creates a new reader. Initializes the message queue.
67      * Sets the stream state to disconnected, and the xml
68      * state to in_nothing.
69      * @param inStream the inbound XML stream
70      */
71     public XMPPReader(InputStream inStream) {
72         msgQueue = new ConcurrentLinkedQueue<XMPPMessage>();
73         this.inStream = inStream;
74         resetBuffers();
75         xmlState = XMLState.IN_NOTHING;
76         streamState = XMPPStreamState.DISCONNECTED;
77     }
78
79     /**
80      * Change the connect state and notify that a core 
81      * event has occurred.
82      */
83     protected void setXMPPStreamState(XMPPStreamState state) {
84         streamState = state;
85         notifyCoreEvent();
86     }
87
88     /**
89      * @return The current stream state of the reader 
90      */
91     public XMPPStreamState getXMPPStreamState() {
92         return streamState;
93     }
94
95
96     /**
97      * @return The next message in the queue, or null
98      */
99     public XMPPMessage popMessageQueue() {
100         return (XMPPMessage) msgQueue.poll();
101     }
102
103
104     /**
105      * Initializes the message buffers 
106      */
107     private void resetBuffers() {
108         msgBody = new StringBuffer();
109         msgThread = new StringBuffer();
110         msgStatus = new StringBuffer(); 
111         msgErrType = new StringBuffer();
112         msgFrom = "";
113         msgTo = "";
114     }
115
116
117     /**
118      * Notifies the waiting thread that a core event has occurred.
119      * Each reader should have exactly one dependent session thread. 
120      */
121     private synchronized void notifyCoreEvent() {
122         notify();
123     }
124
125
126     /**
127      * Waits up to timeout milliseconds for a core event to occur. 
128      * Also, having a message already waiting in the queue 
129      * constitutes a core event.
130      * @param timeout The number of milliseconds to wait.  If 
131      * timeout is negative, waits potentially forever.
132      * @return The number of milliseconds in wait
133      */
134     public synchronized long waitCoreEvent(long timeout) {
135
136         if(msgQueue.peek() != null || timeout == 0) return 0;
137         long start = new Date().getTime();
138
139         try{
140             if(timeout < 0) wait();
141             else wait(timeout);
142         } catch(InterruptedException ie) {}
143
144         return new Date().getTime() - start;
145     }
146
147
148
149     /** Kickoff the thread */
150     public void run() {
151         read();
152     }
153
154
155     /**
156      * Parses XML data from the provided XMPP stream.
157      */
158     public void read() {
159
160         try {
161
162             //XMLInputFactory factory = XMLInputFactory.newInstance();
163             XMLInputFactory factory = new com.ctc.wstx.stax.WstxInputFactory();
164
165             /** disable as many unused features as possible to speed up the parsing */
166             factory.setProperty(XMLInputFactory.IS_REPLACING_ENTITY_REFERENCES, Boolean.FALSE);
167             factory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, Boolean.FALSE);
168             factory.setProperty(XMLInputFactory.IS_NAMESPACE_AWARE, Boolean.FALSE);
169             factory.setProperty(XMLInputFactory.IS_COALESCING, Boolean.FALSE);
170             factory.setProperty(XMLInputFactory.SUPPORT_DTD, Boolean.FALSE);
171
172             /** create the stream reader */
173             XMLStreamReader reader = factory.createXMLStreamReader(inStream);
174             int eventType;
175
176             while(reader.hasNext()) {
177                 /** cycle through the XML events */
178
179                 eventType = reader.next();
180
181                 switch(eventType) {
182
183                     case XMLEvent.START_ELEMENT:
184                         handleStartElement(reader);
185                         break;
186
187                     case XMLEvent.CHARACTERS:
188                         switch(xmlState) {
189                             case IN_BODY:
190                                 msgBody.append(reader.getText());
191                                 break;
192                             case IN_THREAD:
193                                 msgThread.append(reader.getText());
194                                 break;
195                             case IN_STATUS:
196                                 msgStatus.append(reader.getText());
197                                 break;
198                         }
199                         break;
200
201                     case XMLEvent.END_ELEMENT: 
202                         xmlState = XMLState.IN_NOTHING;
203                         if("message".equals(reader.getName().toString())) {
204
205                            /** build a message and add it to the message queue */
206                            XMPPMessage msg = new XMPPMessage();
207                            msg.setFrom(msgFrom);
208                            msg.setTo(msgTo);
209                            msg.setBody(msgBody.toString());
210                            msg.setThread(msgThread.toString());
211
212                            msgQueue.offer(msg);
213                            resetBuffers(); 
214                            notifyCoreEvent();
215                         }
216                         break;
217                 }
218             }
219
220         } catch(javax.xml.stream.XMLStreamException se) {
221             /* XXX log an error */
222             xmlState = XMLState.IN_NOTHING;
223             streamState = XMPPStreamState.DISCONNECTED;
224             notifyCoreEvent();
225         }
226     }
227
228
229     /**
230      * Handles the start_element event.
231      */
232     private void handleStartElement(XMLStreamReader reader) {
233
234         String name = reader.getName().toString();
235
236         if("message".equals(name)) {
237             xmlState = XMLState.IN_BODY;
238
239             /** add a special case for the opensrf "router_from" attribute */
240             String rf = reader.getAttributeValue(null, "router_from");
241             if( rf != null )
242                 msgFrom = rf;
243             else
244                 msgFrom = reader.getAttributeValue(null, "from");
245             msgTo = reader.getAttributeValue(null, "to");
246             return;
247         }
248
249         if("body".equals(name)) {
250             xmlState = XMLState.IN_BODY;
251             return;
252         }
253
254         if("thread".equals(name)) {
255             xmlState = XMLState.IN_THREAD;
256             return;
257         }
258
259         if("stream:stream".equals(name)) {
260             setXMPPStreamState(XMPPStreamState.CONNECT_RECV);
261             return;
262         }
263
264         if("iq".equals(name)) {
265             if("result".equals(reader.getAttributeValue(null, "type")))
266                 setXMPPStreamState(XMPPStreamState.CONNECTED);
267             return;
268         }
269
270         if("status".equals(name)) {
271             xmlState = XMLState.IN_STATUS;
272             return;
273         }
274
275         if("stream:error".equals(name)) {
276             setXMPPStreamState(XMPPStreamState.DISCONNECTED);
277             return;
278         }
279
280         if("error".equals(name)) {
281             msgErrType.append(reader.getAttributeValue(null, "type"));
282             msgErrCode = Integer.parseInt(reader.getAttributeValue(null, "code"));
283             setXMPPStreamState(XMPPStreamState.DISCONNECTED);
284             return;
285         }
286     }
287 }
288
289
290
291