minor layout change. calling notifyAll to ensure all threads are notified
[OpenSRF.git] / src / java / org / opensrf / net / xmpp / XMPPReader.java
1 package org.opensrf.net.xmpp;
2
3 import javax.xml.stream.*;
4 import javax.xml.stream.events.* ;
5 import javax.xml.namespace.QName;
6 import java.util.Queue;
7 import java.io.InputStream;
8 import java.util.concurrent.ConcurrentLinkedQueue;
9 import java.util.Date;
10
11 import com.ctc.wstx.stax.WstxInputFactory;
12
13 /**
14  * Slim XMPP Stream reader.  This reader only understands enough XMPP
15  * to handle logins and recv messages.  It's implemented as a StAX parser.
16  * @author Bill Erickson, Georgia Public Library Systems
17  */
18 public class XMPPReader implements Runnable {
19
20     /** Queue of received messages. */
21     private Queue<XMPPMessage> msgQueue;
22     /** Incoming XMPP XML stream */
23     private InputStream inStream;
24     /** Current message body */
25     private StringBuffer msgBody;
26     /** Current message thread */
27     private StringBuffer msgThread;
28     /** Current message status */
29     private StringBuffer msgStatus;
30     /** Current message error type */
31     private StringBuffer msgErrType;
32     /** Current message sender */
33     private String msgFrom;
34     /** Current message recipient */
35     private String msgTo;
36     /** Current message error code */
37     private int msgErrCode;
38
39     /** Where this reader currently is in the document */
40     private XMLState xmlState;
41
42     /** The current connect state to the XMPP server */
43     private XMPPStreamState streamState;
44
45
46     /** Used to represent out connection state to the XMPP server */
47     public static enum XMPPStreamState {
48         DISCONNECTED,   /* not connected to the server */
49         CONNECT_SENT,   /* we've sent the initial connect message */
50         CONNECT_RECV,   /* we've received a response to our connect message */
51         AUTH_SENT,      /* we've sent an authentication request */
52         CONNECTED       /* authentication is complete */
53     };
54
55
56     /** Used to represents where we are in the XML document stream. */
57     public static enum XMLState {
58         IN_NOTHING,
59         IN_BODY,
60         IN_THREAD,
61         IN_STATUS
62     };
63
64
65     /**
66      * Creates a new reader. Initializes the message queue.
67      * Sets the stream state to disconnected, and the xml
68      * state to in_nothing.
69      * @param inStream the inbound XML stream
70      */
71     public XMPPReader(InputStream inStream) {
72         msgQueue = new ConcurrentLinkedQueue<XMPPMessage>();
73         this.inStream = inStream;
74         resetBuffers();
75         xmlState = XMLState.IN_NOTHING;
76         streamState = XMPPStreamState.DISCONNECTED;
77     }
78
79     /**
80      * Change the connect state and notify that a core 
81      * event has occurred.
82      */
83     protected void setXMPPStreamState(XMPPStreamState state) {
84         streamState = state;
85         notifyCoreEvent();
86     }
87
88     /**
89      * @return The current stream state of the reader 
90      */
91     public XMPPStreamState getXMPPStreamState() {
92         return streamState;
93     }
94
95
96     /**
97      * @return The next message in the queue, or null
98      */
99     public XMPPMessage popMessageQueue() {
100         return (XMPPMessage) msgQueue.poll();
101     }
102
103
104     /**
105      * Initializes the message buffers 
106      */
107     private void resetBuffers() {
108         msgBody = new StringBuffer();
109         msgThread = new StringBuffer();
110         msgStatus = new StringBuffer(); 
111         msgErrType = new StringBuffer();
112         msgFrom = "";
113         msgTo = "";
114     }
115
116
117     /**
118      * Notifies the waiting thread that a core event has occurred.
119      * Each reader should have exactly one dependent session thread. 
120      */
121     private synchronized void notifyCoreEvent() {
122         notifyAll();
123     }
124
125
126     /**
127      * Waits up to timeout milliseconds for a core event to occur. 
128      * Also, having a message already waiting in the queue 
129      * constitutes a core event.
130      * @param timeout The number of milliseconds to wait.  If 
131      * timeout is negative, waits potentially forever.
132      * @return The number of milliseconds in wait
133      */
134     public synchronized long waitCoreEvent(long timeout) {
135
136         if(msgQueue.peek() != null || timeout == 0) return 0;
137         long start = new Date().getTime();
138
139         try{
140             if(timeout < 0) 
141                 wait();
142             else 
143                 wait(timeout);
144         } catch(InterruptedException ie) {}
145
146         return new Date().getTime() - start;
147     }
148
149
150
151     /** Kickoff the thread */
152     public void run() {
153         read();
154     }
155
156
157     /**
158      * Parses XML data from the provided XMPP stream.
159      */
160     public void read() {
161
162         try {
163
164             //XMLInputFactory factory = XMLInputFactory.newInstance();
165             XMLInputFactory factory = new com.ctc.wstx.stax.WstxInputFactory();
166
167             /** disable as many unused features as possible to speed up the parsing */
168             factory.setProperty(XMLInputFactory.IS_REPLACING_ENTITY_REFERENCES, Boolean.FALSE);
169             factory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, Boolean.FALSE);
170             factory.setProperty(XMLInputFactory.IS_NAMESPACE_AWARE, Boolean.FALSE);
171             factory.setProperty(XMLInputFactory.IS_COALESCING, Boolean.FALSE);
172             factory.setProperty(XMLInputFactory.SUPPORT_DTD, Boolean.FALSE);
173
174             /** create the stream reader */
175             XMLStreamReader reader = factory.createXMLStreamReader(inStream);
176             int eventType;
177
178             while(reader.hasNext()) {
179                 /** cycle through the XML events */
180
181                 eventType = reader.next();
182
183                 switch(eventType) {
184
185                     case XMLEvent.START_ELEMENT:
186                         handleStartElement(reader);
187                         break;
188
189                     case XMLEvent.CHARACTERS:
190                         switch(xmlState) {
191                             case IN_BODY:
192                                 msgBody.append(reader.getText());
193                                 break;
194                             case IN_THREAD:
195                                 msgThread.append(reader.getText());
196                                 break;
197                             case IN_STATUS:
198                                 msgStatus.append(reader.getText());
199                                 break;
200                         }
201                         break;
202
203                     case XMLEvent.END_ELEMENT: 
204                         xmlState = XMLState.IN_NOTHING;
205                         if("message".equals(reader.getName().toString())) {
206
207                            /** build a message and add it to the message queue */
208                            XMPPMessage msg = new XMPPMessage();
209                            msg.setFrom(msgFrom);
210                            msg.setTo(msgTo);
211                            msg.setBody(msgBody.toString());
212                            msg.setThread(msgThread.toString());
213
214                            msgQueue.offer(msg);
215                            resetBuffers(); 
216                            notifyCoreEvent();
217                         }
218                         break;
219                 }
220             }
221
222         } catch(javax.xml.stream.XMLStreamException se) {
223             /* XXX log an error */
224             xmlState = XMLState.IN_NOTHING;
225             streamState = XMPPStreamState.DISCONNECTED;
226             notifyCoreEvent();
227         }
228     }
229
230
231     /**
232      * Handles the start_element event.
233      */
234     private void handleStartElement(XMLStreamReader reader) {
235
236         String name = reader.getName().toString();
237
238         if("message".equals(name)) {
239             xmlState = XMLState.IN_BODY;
240
241             /** add a special case for the opensrf "router_from" attribute */
242             String rf = reader.getAttributeValue(null, "router_from");
243             if( rf != null )
244                 msgFrom = rf;
245             else
246                 msgFrom = reader.getAttributeValue(null, "from");
247             msgTo = reader.getAttributeValue(null, "to");
248             return;
249         }
250
251         if("body".equals(name)) {
252             xmlState = XMLState.IN_BODY;
253             return;
254         }
255
256         if("thread".equals(name)) {
257             xmlState = XMLState.IN_THREAD;
258             return;
259         }
260
261         if("stream:stream".equals(name)) {
262             setXMPPStreamState(XMPPStreamState.CONNECT_RECV);
263             return;
264         }
265
266         if("iq".equals(name)) {
267             if("result".equals(reader.getAttributeValue(null, "type")))
268                 setXMPPStreamState(XMPPStreamState.CONNECTED);
269             return;
270         }
271
272         if("status".equals(name)) {
273             xmlState = XMLState.IN_STATUS;
274             return;
275         }
276
277         if("stream:error".equals(name)) {
278             setXMPPStreamState(XMPPStreamState.DISCONNECTED);
279             return;
280         }
281
282         if("error".equals(name)) {
283             msgErrType.append(reader.getAttributeValue(null, "type"));
284             msgErrCode = Integer.parseInt(reader.getAttributeValue(null, "code"));
285             setXMPPStreamState(XMPPStreamState.DISCONNECTED);
286             return;
287         }
288     }
289 }
290
291
292
293