fixed some logic bugs with the body parsing, added some comments
[OpenSRF.git] / src / java / org / opensrf / net / xmpp / XMPPReader.java
1 package org.opensrf.net.xmpp;
2
3 import javax.xml.stream.*;
4 import javax.xml.stream.events.* ;
5 import javax.xml.namespace.QName;
6 import java.util.Queue;
7 import java.io.InputStream;
8 import java.util.concurrent.ConcurrentLinkedQueue;
9 import java.util.Date;
10
11
12 /**
13  * Slim XMPP Stream reader.  This reader only understands enough XMPP
14  * to handle logins and recv messages.  It's implemented as a StAX parser.
15  * @author Bill Erickson, Georgia Public Library Systems
16  */
17 public class XMPPReader implements Runnable {
18
19     /** Queue of received messages. */
20     private Queue<XMPPMessage> msgQueue;
21     /** Incoming XMPP XML stream */
22     private InputStream inStream;
23     /** Current message body */
24     private StringBuffer msgBody;
25     /** Current message thread */
26     private StringBuffer msgThread;
27     /** Current message status */
28     private StringBuffer msgStatus;
29     /** Current message error type */
30     private StringBuffer msgErrType;
31     /** Current message sender */
32     private String msgFrom;
33     /** Current message recipient */
34     private String msgTo;
35     /** Current message error code */
36     private int msgErrCode;
37
38     /** Where this reader currently is in the document */
39     private XMLState xmlState;
40
41     /** The current connect state to the XMPP server */
42     private XMPPStreamState streamState;
43
44
45     /** Used to represent out connection state to the XMPP server */
46     public static enum XMPPStreamState {
47         DISCONNECTED,   /* not connected to the server */
48         CONNECT_SENT,   /* we've sent the initial connect message */
49         CONNECT_RECV,   /* we've received a response to our connect message */
50         AUTH_SENT,      /* we've sent an authentication request */
51         CONNECTED       /* authentication is complete */
52     };
53
54
55     /** Used to represents where we are in the XML document stream. */
56     public static enum XMLState {
57         IN_NOTHING,
58         IN_BODY,
59         IN_THREAD,
60         IN_STATUS
61     };
62
63
64     /**
65      * Creates a new reader. Initializes the message queue.
66      * Sets the stream state to disconnected, and the xml
67      * state to in_nothing.
68      * @param inStream the inbound XML stream
69      */
70     public XMPPReader(InputStream inStream) {
71         msgQueue = new ConcurrentLinkedQueue<XMPPMessage>();
72         this.inStream = inStream;
73         resetBuffers();
74         xmlState = XMLState.IN_NOTHING;
75         streamState = XMPPStreamState.DISCONNECTED;
76     }
77
78     /**
79      * Change the connect state and notify that a core 
80      * event has occurred.
81      */
82     protected void setXMPPStreamState(XMPPStreamState state) {
83         streamState = state;
84         notifyCoreEvent();
85     }
86
87     /**
88      * @return The current stream state of the reader 
89      */
90     public XMPPStreamState getXMPPStreamState() {
91         return streamState;
92     }
93
94
95     /**
96      * @return The next message in the queue, or null
97      */
98     public XMPPMessage popMessageQueue() {
99         return (XMPPMessage) msgQueue.poll();
100     }
101
102
103     /**
104      * Initializes the message buffers 
105      */
106     private void resetBuffers() {
107         msgBody = new StringBuffer();
108         msgThread = new StringBuffer();
109         msgStatus = new StringBuffer(); 
110         msgErrType = new StringBuffer();
111         msgFrom = "";
112         msgTo = "";
113     }
114
115
116     /**
117      * Notifies the waiting thread that a core event has occurred.
118      * Each reader should have exactly one dependent session thread. 
119      */
120     private synchronized void notifyCoreEvent() {
121         notify();
122     }
123
124
125     /**
126      * Waits up to timeout milliseconds for a core event to occur. 
127      * Also, having a message already waiting in the queue 
128      * constitutes a core event.
129      * @param timeout The number of milliseconds to wait.  If 
130      * timeout is negative, waits potentially forever.
131      * @return The number of milliseconds in wait
132      */
133     public synchronized long waitCoreEvent(int timeout) {
134
135         if(msgQueue.peek() != null || timeout == 0) return 0;
136
137         long start = new Date().getTime();
138         try{
139             if(timeout < 0) wait();
140             else wait(timeout);
141         } catch(InterruptedException ie) {}
142
143         return new Date().getTime() - start;
144     }
145
146
147
148     /** Kickoff the thread */
149     public void run() {
150         read();
151     }
152
153
154     /**
155      * Parses XML data from the provided XMPP stream.
156      * @param inStream The stream to parse.
157      */
158     public void read() {
159
160         try {
161
162             XMLInputFactory factory = XMLInputFactory.newInstance();
163
164             /** disable as many unused features as possible to speed up the parsing */
165             factory.setProperty(XMLInputFactory.IS_REPLACING_ENTITY_REFERENCES, Boolean.FALSE);
166             factory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, Boolean.FALSE);
167             factory.setProperty(XMLInputFactory.IS_NAMESPACE_AWARE, Boolean.FALSE);
168             factory.setProperty(XMLInputFactory.IS_COALESCING, Boolean.FALSE);
169             factory.setProperty(XMLInputFactory.SUPPORT_DTD, Boolean.FALSE);
170
171             /** create the stream reader */
172             XMLStreamReader reader = factory.createXMLStreamReader(inStream);
173             int eventType;
174
175             while(reader.hasNext()) {
176                 /** cycle through the XML events */
177
178                 eventType = reader.next();
179
180                 switch(eventType) {
181
182                     case XMLEvent.START_ELEMENT:
183                         handleStartElement(reader);
184                         break;
185
186                     case XMLEvent.CHARACTERS:
187                         switch(xmlState) {
188                             case IN_BODY:
189                                 msgBody.append(reader.getText());
190                                 break;
191                             case IN_THREAD:
192                                 msgThread.append(reader.getText());
193                                 break;
194                             case IN_STATUS:
195                                 msgStatus.append(reader.getText());
196                                 break;
197                         }
198                         break;
199
200                     case XMLEvent.END_ELEMENT: 
201                         xmlState = XMLState.IN_NOTHING;
202                         if("message".equals(reader.getName().toString())) {
203
204                            /** build a message and add it to the message queue */
205                            XMPPMessage msg = new XMPPMessage();
206                            msg.setFrom(msgFrom);
207                            msg.setTo(msgTo);
208                            msg.setBody(msgBody.toString());
209                            msg.setThread(msgThread.toString());
210
211                            msgQueue.offer(msg);
212                            resetBuffers(); 
213                            notifyCoreEvent();
214                         }
215                         break;
216                 }
217             }
218
219         } catch(javax.xml.stream.XMLStreamException se) {
220             /* XXX log an error, set a state, and notify */
221         }
222     }
223
224
225     /**
226      * Handles the start_element event.
227      */
228     private void handleStartElement(XMLStreamReader reader) {
229
230         String name = reader.getName().toString();
231
232         if("message".equals(name)) {
233             xmlState = XMLState.IN_BODY;
234
235             /** add a special case for the opensrf "router_from" attribute */
236             String rf = reader.getAttributeValue(null, "router_from");
237             if( rf != null )
238                 msgFrom = rf;
239             else
240                 msgFrom = reader.getAttributeValue(null, "from");
241             msgTo = reader.getAttributeValue(null, "to");
242             return;
243         }
244
245         if("body".equals(name)) {
246             xmlState = XMLState.IN_BODY;
247             return;
248         }
249
250         if("thread".equals(name)) {
251             xmlState = XMLState.IN_THREAD;
252             return;
253         }
254
255         if("stream:stream".equals(name)) {
256             setXMPPStreamState(XMPPStreamState.CONNECT_RECV);
257             return;
258         }
259
260         if("iq".equals(name)) {
261             if("result".equals(reader.getAttributeValue(null, "type")))
262                 setXMPPStreamState(XMPPStreamState.CONNECTED);
263             return;
264         }
265
266         if("status".equals(name)) {
267             xmlState = XMLState.IN_STATUS;
268             return;
269         }
270
271         if("stream:error".equals(name)) {
272             setXMPPStreamState(XMPPStreamState.DISCONNECTED);
273             return;
274         }
275
276         if("error".equals(name)) {
277             msgErrType.append(reader.getAttributeValue(null, "type"));
278             msgErrCode = Integer.parseInt(reader.getAttributeValue(null, "code"));
279             setXMPPStreamState(XMPPStreamState.DISCONNECTED);
280             return;
281         }
282     }
283 }
284
285
286
287