406298a7306b50eeb96d1de5c0f2f2ee748d6aa1
[OpenSRF.git] / src / java / org / opensrf / net / xmpp / XMPPReader.java
1 package org.opensrf.net.xmpp;
2
3 import javax.xml.stream.*;
4 import javax.xml.stream.events.* ;
5 import javax.xml.namespace.QName;
6 import java.util.Queue;
7 import java.io.InputStream;
8 import java.util.concurrent.ConcurrentLinkedQueue;
9 import java.util.Date;
10 import org.opensrf.util.Logger;
11
12 /**
13  * Slim XMPP Stream reader.  This reader only understands enough XMPP
14  * to handle logins and recv messages.  It's implemented as a StAX parser.
15  * @author Bill Erickson, Georgia Public Library Systems
16  */
17 public class XMPPReader implements Runnable {
18
19     /** Queue of received messages. */
20     private Queue<XMPPMessage> msgQueue;
21     /** Incoming XMPP XML stream */
22     private InputStream inStream;
23     /** Current message body */
24     private StringBuffer msgBody;
25     /** Current message thread */
26     private StringBuffer msgThread;
27     /** Current message status */
28     private StringBuffer msgStatus;
29     /** Current message error type */
30     private StringBuffer msgErrType;
31     /** Current message sender */
32     private String msgFrom;
33     /** Current message recipient */
34     private String msgTo;
35     /** Current message error code */
36     private int msgErrCode;
37
38     /** Where this reader currently is in the document */
39     private XMLState xmlState;
40
41     /** The current connect state to the XMPP server */
42     private XMPPStreamState streamState;
43
44
45     /** Used to represent out connection state to the XMPP server */
46     public static enum XMPPStreamState {
47         DISCONNECTED,   /* not connected to the server */
48         CONNECT_SENT,   /* we've sent the initial connect message */
49         CONNECT_RECV,   /* we've received a response to our connect message */
50         AUTH_SENT,      /* we've sent an authentication request */
51         CONNECTED       /* authentication is complete */
52     };
53
54
55     /** Used to represents where we are in the XML document stream. */
56     public static enum XMLState {
57         IN_NOTHING,
58         IN_BODY,
59         IN_THREAD,
60         IN_STATUS
61     };
62
63
64     /**
65      * Creates a new reader. Initializes the message queue.
66      * Sets the stream state to disconnected, and the xml
67      * state to in_nothing.
68      * @param inStream the inbound XML stream
69      */
70     public XMPPReader(InputStream inStream) {
71         msgQueue = new ConcurrentLinkedQueue<XMPPMessage>();
72         this.inStream = inStream;
73         resetBuffers();
74         xmlState = XMLState.IN_NOTHING;
75         streamState = XMPPStreamState.DISCONNECTED;
76     }
77
78     /**
79      * Change the connect state and notify that a core 
80      * event has occurred.
81      */
82     protected void setXMPPStreamState(XMPPStreamState state) {
83         streamState = state;
84         notifyCoreEvent();
85     }
86
87     /**
88      * @return The current stream state of the reader 
89      */
90     public XMPPStreamState getXMPPStreamState() {
91         return streamState;
92     }
93
94
95     /**
96      * @return The next message in the queue, or null
97      */
98     public XMPPMessage popMessageQueue() {
99         return (XMPPMessage) msgQueue.poll();
100     }
101
102
103     /**
104      * Initializes the message buffers 
105      */
106     private void resetBuffers() {
107         msgBody = new StringBuffer();
108         msgThread = new StringBuffer();
109         msgStatus = new StringBuffer(); 
110         msgErrType = new StringBuffer();
111         msgFrom = "";
112         msgTo = "";
113     }
114
115
116     /**
117      * Notifies the waiting thread that a core event has occurred.
118      * Each reader should have exactly one dependent session thread. 
119      */
120     private synchronized void notifyCoreEvent() {
121         notifyAll();
122     }
123
124
125     /**
126      * Waits up to timeout milliseconds for a core event to occur. 
127      * Also, having a message already waiting in the queue 
128      * constitutes a core event.
129      * @param timeout The number of milliseconds to wait.  If 
130      * timeout is negative, waits potentially forever.
131      * @return The number of milliseconds in wait
132      */
133     public synchronized long waitCoreEvent(long timeout) {
134
135         if(msgQueue.peek() != null || timeout == 0) return 0;
136         long start = new Date().getTime();
137
138         try{
139             if(timeout < 0) 
140                 wait();
141             else 
142                 wait(timeout);
143         } catch(InterruptedException ie) {}
144
145         return new Date().getTime() - start;
146     }
147
148
149
150     /** Kickoff the thread */
151     public void run() {
152         read();
153     }
154
155
156     /**
157      * Parses XML data from the provided XMPP stream.
158      */
159     public void read() {
160
161         try {
162
163             XMLInputFactory factory = XMLInputFactory.newInstance();
164
165             /** disable as many unused features as possible to speed up the parsing */
166             factory.setProperty(XMLInputFactory.IS_REPLACING_ENTITY_REFERENCES, Boolean.FALSE);
167             factory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, Boolean.FALSE);
168             factory.setProperty(XMLInputFactory.IS_NAMESPACE_AWARE, Boolean.FALSE);
169             factory.setProperty(XMLInputFactory.IS_COALESCING, Boolean.FALSE);
170             factory.setProperty(XMLInputFactory.SUPPORT_DTD, Boolean.FALSE);
171
172             /** create the stream reader */
173             XMLStreamReader reader = factory.createXMLStreamReader(inStream);
174             int eventType;
175
176             while(reader.hasNext()) {
177                 /** cycle through the XML events */
178
179                 eventType = reader.next();
180
181                 switch(eventType) {
182
183                     case XMLEvent.START_ELEMENT:
184                         handleStartElement(reader);
185                         break;
186
187                     case XMLEvent.CHARACTERS:
188                         switch(xmlState) {
189                             case IN_BODY:
190                                 msgBody.append(reader.getText());
191                                 break;
192                             case IN_THREAD:
193                                 msgThread.append(reader.getText());
194                                 break;
195                             case IN_STATUS:
196                                 msgStatus.append(reader.getText());
197                                 break;
198                         }
199                         break;
200
201                     case XMLEvent.END_ELEMENT: 
202                         xmlState = XMLState.IN_NOTHING;
203                         if("message".equals(reader.getName().toString())) {
204
205                            /** build a message and add it to the message queue */
206                            XMPPMessage msg = new XMPPMessage();
207                            msg.setFrom(msgFrom);
208                            msg.setTo(msgTo);
209                            msg.setBody(msgBody.toString());
210                            msg.setThread(msgThread.toString());
211
212                            Logger.internal("xmpp message from="+msgFrom+" " + msg.getBody());
213
214                            msgQueue.offer(msg);
215                            resetBuffers(); 
216                            notifyCoreEvent();
217                         }
218                         break;
219                 }
220             }
221
222         } catch(javax.xml.stream.XMLStreamException se) {
223             /* XXX log an error */
224             xmlState = XMLState.IN_NOTHING;
225             streamState = XMPPStreamState.DISCONNECTED;
226             notifyCoreEvent();
227         }
228     }
229
230
231     /**
232      * Handles the start_element event.
233      */
234     private void handleStartElement(XMLStreamReader reader) {
235
236         String name = reader.getName().toString();
237
238         if("message".equals(name)) {
239             xmlState = XMLState.IN_BODY;
240
241             /** add a special case for the opensrf "router_from" attribute */
242             String rf = reader.getAttributeValue(null, "router_from");
243             if( rf != null )
244                 msgFrom = rf;
245             else
246                 msgFrom = reader.getAttributeValue(null, "from");
247             msgTo = reader.getAttributeValue(null, "to");
248             return;
249         }
250
251         if("body".equals(name)) {
252             xmlState = XMLState.IN_BODY;
253             return;
254         }
255
256         if("thread".equals(name)) {
257             xmlState = XMLState.IN_THREAD;
258             return;
259         }
260
261         if("stream:stream".equals(name)) {
262             setXMPPStreamState(XMPPStreamState.CONNECT_RECV);
263             return;
264         }
265
266         if("iq".equals(name)) {
267             if("result".equals(reader.getAttributeValue(null, "type")))
268                 setXMPPStreamState(XMPPStreamState.CONNECTED);
269             return;
270         }
271
272         if("status".equals(name)) {
273             xmlState = XMLState.IN_STATUS;
274             return;
275         }
276
277         if("stream:error".equals(name)) {
278             setXMPPStreamState(XMPPStreamState.DISCONNECTED);
279             return;
280         }
281
282         if("error".equals(name)) {
283             msgErrType.append(reader.getAttributeValue(null, "type"));
284             msgErrCode = Integer.parseInt(reader.getAttributeValue(null, "code"));
285             setXMPPStreamState(XMPPStreamState.DISCONNECTED);
286             return;
287         }
288     }
289 }
290
291
292
293