]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/java/org/opensrf/net/xmpp/XMPPReader.java
fixed some doc strings. added sample makefile
[OpenSRF.git] / src / java / org / opensrf / net / xmpp / XMPPReader.java
1 package org.opensrf.net.xmpp;
2
3 import javax.xml.stream.*;
4 import javax.xml.stream.events.* ;
5 import javax.xml.namespace.QName;
6 import java.util.Queue;
7 import java.io.InputStream;
8 import java.util.concurrent.ConcurrentLinkedQueue;
9 import java.util.Date;
10
11
12 /**
13  * Slim XMPP Stream reader.  This reader only understands enough XMPP
14  * to handle logins and recv messages.  It's implemented as a StAX parser.
15  * @author Bill Erickson, Georgia Public Library Systems
16  */
17 public class XMPPReader implements Runnable {
18
19     /** Queue of received messages. */
20     private Queue<XMPPMessage> msgQueue;
21     /** Incoming XMPP XML stream */
22     private InputStream inStream;
23     /** Current message body */
24     private StringBuffer msgBody;
25     /** Current message thread */
26     private StringBuffer msgThread;
27     /** Current message status */
28     private StringBuffer msgStatus;
29     /** Current message error type */
30     private StringBuffer msgErrType;
31     /** Current message sender */
32     private String msgFrom;
33     /** Current message recipient */
34     private String msgTo;
35     /** Current message error code */
36     private int msgErrCode;
37
38     /** Where this reader currently is in the document */
39     private XMLState xmlState;
40
41     /** The current connect state to the XMPP server */
42     private XMPPStreamState streamState;
43
44
45     /** Used to represent out connection state to the XMPP server */
46     public static enum XMPPStreamState {
47         DISCONNECTED,   /* not connected to the server */
48         CONNECT_SENT,   /* we've sent the initial connect message */
49         CONNECT_RECV,   /* we've received a response to our connect message */
50         AUTH_SENT,      /* we've sent an authentication request */
51         CONNECTED       /* authentication is complete */
52     };
53
54
55     /** Used to represents where we are in the XML document stream. */
56     public static enum XMLState {
57         IN_NOTHING,
58         IN_BODY,
59         IN_THREAD,
60         IN_STATUS
61     };
62
63
64     /**
65      * Creates a new reader. Initializes the message queue.
66      * Sets the stream state to disconnected, and the xml
67      * state to in_nothing.
68      * @param inStream the inbound XML stream
69      */
70     public XMPPReader(InputStream inStream) {
71         msgQueue = new ConcurrentLinkedQueue<XMPPMessage>();
72         this.inStream = inStream;
73         resetBuffers();
74         xmlState = XMLState.IN_NOTHING;
75         streamState = XMPPStreamState.DISCONNECTED;
76     }
77
78     /**
79      * Change the connect state and notify that a core 
80      * event has occurred.
81      */
82     protected void setXMPPStreamState(XMPPStreamState state) {
83         streamState = state;
84         notifyCoreEvent();
85     }
86
87     /**
88      * @return The current stream state of the reader 
89      */
90     public XMPPStreamState getXMPPStreamState() {
91         return streamState;
92     }
93
94
95     /**
96      * @return The next message in the queue, or null
97      */
98     public XMPPMessage popMessageQueue() {
99         return (XMPPMessage) msgQueue.poll();
100     }
101
102
103     /**
104      * Initializes the message buffers 
105      */
106     private void resetBuffers() {
107         msgBody = new StringBuffer();
108         msgThread = new StringBuffer();
109         msgStatus = new StringBuffer(); 
110         msgErrType = new StringBuffer();
111         msgFrom = "";
112         msgTo = "";
113     }
114
115
116     /**
117      * Notifies the waiting thread that a core event has occurred.
118      * Each reader should have exactly one dependent session thread. 
119      */
120     private synchronized void notifyCoreEvent() {
121         notify();
122     }
123
124
125     /**
126      * Waits up to timeout milliseconds for a core event to occur. 
127      * Also, having a message already waiting in the queue 
128      * constitutes a core event.
129      * @param timeout The number of milliseconds to wait.  If 
130      * timeout is negative, waits potentially forever.
131      * @return The number of milliseconds in wait
132      */
133     public synchronized long waitCoreEvent(long timeout) {
134
135         if(msgQueue.peek() != null || timeout == 0) return 0;
136
137         long start = new Date().getTime();
138         try{
139             if(timeout < 0) wait();
140             else wait(timeout);
141         } catch(InterruptedException ie) {}
142
143         return new Date().getTime() - start;
144     }
145
146
147
148     /** Kickoff the thread */
149     public void run() {
150         read();
151     }
152
153
154     /**
155      * Parses XML data from the provided XMPP stream.
156      */
157     public void read() {
158
159         try {
160
161             XMLInputFactory factory = XMLInputFactory.newInstance();
162
163             /** disable as many unused features as possible to speed up the parsing */
164             factory.setProperty(XMLInputFactory.IS_REPLACING_ENTITY_REFERENCES, Boolean.FALSE);
165             factory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, Boolean.FALSE);
166             factory.setProperty(XMLInputFactory.IS_NAMESPACE_AWARE, Boolean.FALSE);
167             factory.setProperty(XMLInputFactory.IS_COALESCING, Boolean.FALSE);
168             factory.setProperty(XMLInputFactory.SUPPORT_DTD, Boolean.FALSE);
169
170             /** create the stream reader */
171             XMLStreamReader reader = factory.createXMLStreamReader(inStream);
172             int eventType;
173
174             while(reader.hasNext()) {
175                 /** cycle through the XML events */
176
177                 eventType = reader.next();
178
179                 switch(eventType) {
180
181                     case XMLEvent.START_ELEMENT:
182                         handleStartElement(reader);
183                         break;
184
185                     case XMLEvent.CHARACTERS:
186                         switch(xmlState) {
187                             case IN_BODY:
188                                 msgBody.append(reader.getText());
189                                 break;
190                             case IN_THREAD:
191                                 msgThread.append(reader.getText());
192                                 break;
193                             case IN_STATUS:
194                                 msgStatus.append(reader.getText());
195                                 break;
196                         }
197                         break;
198
199                     case XMLEvent.END_ELEMENT: 
200                         xmlState = XMLState.IN_NOTHING;
201                         if("message".equals(reader.getName().toString())) {
202
203                            /** build a message and add it to the message queue */
204                            XMPPMessage msg = new XMPPMessage();
205                            msg.setFrom(msgFrom);
206                            msg.setTo(msgTo);
207                            msg.setBody(msgBody.toString());
208                            msg.setThread(msgThread.toString());
209
210                            msgQueue.offer(msg);
211                            resetBuffers(); 
212                            notifyCoreEvent();
213                         }
214                         break;
215                 }
216             }
217
218         } catch(javax.xml.stream.XMLStreamException se) {
219             /* XXX log an error */
220             xmlState = XMLState.IN_NOTHING;
221             streamState = XMPPStreamState.DISCONNECTED;
222             notifyCoreEvent();
223         }
224     }
225
226
227     /**
228      * Handles the start_element event.
229      */
230     private void handleStartElement(XMLStreamReader reader) {
231
232         String name = reader.getName().toString();
233
234         if("message".equals(name)) {
235             xmlState = XMLState.IN_BODY;
236
237             /** add a special case for the opensrf "router_from" attribute */
238             String rf = reader.getAttributeValue(null, "router_from");
239             if( rf != null )
240                 msgFrom = rf;
241             else
242                 msgFrom = reader.getAttributeValue(null, "from");
243             msgTo = reader.getAttributeValue(null, "to");
244             return;
245         }
246
247         if("body".equals(name)) {
248             xmlState = XMLState.IN_BODY;
249             return;
250         }
251
252         if("thread".equals(name)) {
253             xmlState = XMLState.IN_THREAD;
254             return;
255         }
256
257         if("stream:stream".equals(name)) {
258             setXMPPStreamState(XMPPStreamState.CONNECT_RECV);
259             return;
260         }
261
262         if("iq".equals(name)) {
263             if("result".equals(reader.getAttributeValue(null, "type")))
264                 setXMPPStreamState(XMPPStreamState.CONNECTED);
265             return;
266         }
267
268         if("status".equals(name)) {
269             xmlState = XMLState.IN_STATUS;
270             return;
271         }
272
273         if("stream:error".equals(name)) {
274             setXMPPStreamState(XMPPStreamState.DISCONNECTED);
275             return;
276         }
277
278         if("error".equals(name)) {
279             msgErrType.append(reader.getAttributeValue(null, "type"));
280             msgErrCode = Integer.parseInt(reader.getAttributeValue(null, "code"));
281             setXMPPStreamState(XMPPStreamState.DISCONNECTED);
282             return;
283         }
284     }
285 }
286
287
288
289