]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/java/org/opensrf/net/xmpp/XMPPReader.java
added java jabber layer
[OpenSRF.git] / src / java / org / opensrf / net / xmpp / XMPPReader.java
1 package org.opensrf.net.xmpp;
2
3 import javax.xml.stream.*;
4 import javax.xml.stream.events.* ;
5 import javax.xml.namespace.QName;
6 import java.util.Queue;
7 import java.io.InputStream;
8 import java.util.concurrent.ConcurrentLinkedQueue;
9 import java.util.Date;
10
11
12 /**
13  * Slim XMPP Stream reader.  This reader only understands enough XMPP
14  * to handle logins and recv messages.
15  * @author Bill Erickson, Georgia Public Library Systems
16  */
17 public class XMPPReader implements Runnable {
18
19     /** Queue of received messages. */
20     private Queue<XMPPMessage> msgQueue;
21     /** Incoming XMPP XML stream */
22     private InputStream inStream;
23     /** Current message body */
24     private StringBuffer msgBody;
25     /** Current message thread */
26     private StringBuffer msgThread;
27     /** Current message status */
28     private StringBuffer msgStatus;
29     /** Current message error type */
30     private StringBuffer msgErrType;
31     /** Current message sender */
32     private String msgFrom;
33     /** Current message recipient */
34     private String msgTo;
35     /** Current message error code */
36     private int msgErrCode;
37
38     /** Where this reader currently is in the document */
39     private XMLState xmlState;
40
41     /** The current connect state to the XMPP server */
42     private XMPPStreamState streamState;
43
44
45     /** Used to represent out connection state to the XMPP server */
46     public static enum XMPPStreamState {
47         DISCONNECTED,   /* not connected to the server */
48         CONNECT_SENT,   /* we've sent the initial connect message */
49         CONNECT_RECV,   /* we've received a response to our connect message */
50         AUTH_SENT,      /* we've sent an authentication request */
51         CONNECTED       /* authentication is complete */
52     };
53
54
55     /** Used to represents where we are in the XML document stream. */
56     public static enum XMLState {
57         IN_NOTHING,
58         IN_BODY,
59         IN_THREAD,
60         IN_STATUS
61     };
62
63
64     /**
65      * Creates a new reader. Initializes the message queue.
66      * Sets the stream state to disconnected, and the xml
67      * state to in_nothing.
68      * @param inStream the inbound XML stream
69      */
70     public XMPPReader(InputStream inStream) {
71         msgQueue = new ConcurrentLinkedQueue();
72         this.inStream = inStream;
73         resetBuffers();
74         xmlState = XMLState.IN_NOTHING;
75         streamState = XMPPStreamState.DISCONNECTED;
76     }
77
78     /**
79      * Change the connect state and notify that a core 
80      * event has occurred.
81      */
82     protected void setXMPPStreamState(XMPPStreamState state) {
83         streamState = state;
84         notifyCoreEvent();
85     }
86
87     /**
88      * @return The current stream state of the reader 
89      */
90     public XMPPStreamState getXMPPStreamState() {
91         return streamState;
92     }
93
94
95     /**
96      * @return The next message in the queue, or null
97      */
98     public XMPPMessage popMessageQueue() {
99         return (XMPPMessage) msgQueue.poll();
100     }
101
102
103     /**
104      * Initializes the message buffers 
105      */
106     private void resetBuffers() {
107         msgBody = new StringBuffer();
108         msgThread = new StringBuffer();
109         msgStatus = new StringBuffer(); 
110         msgErrType = new StringBuffer();
111         msgFrom = "";
112         msgTo = "";
113     }
114
115
116     /**
117      * Notifies the waiting thread that a core event has occurred.
118      * Each reader should have exactly one dependent session thread. 
119      */
120     private synchronized void notifyCoreEvent() {
121         notify();
122     }
123
124
125     /**
126      * Waits up to timeout milliseconds for a core event to occur. 
127      * Also, having a message already waiting in the queue 
128      * constitutes a core event.
129      * @param timeout The number of milliseconds to wait.  If 
130      * timeout is negative, waits potentially forever.
131      * @return The number of milliseconds in wait
132      */
133     public synchronized long waitCoreEvent(int timeout) {
134
135         if(msgQueue.peek() != null || timeout == 0) return 0;
136
137         long start = new Date().getTime();
138         try{
139             if(timeout < 0) wait();
140             else wait(timeout);
141         } catch(InterruptedException ie) {}
142
143         return new Date().getTime() - start;
144     }
145
146
147     /** Thread kickoff point */
148     public void run() {
149         read();
150     }
151
152
153     /**
154      * Parses XML data from the provided XMPP stream.
155      * @param inStream The stream to parse.
156      */
157     public void read() {
158
159         try {
160             XMLInputFactory factory = XMLInputFactory.newInstance();
161
162             /** disable as many features as possible to speed up the parsing */
163             factory.setProperty(XMLInputFactory.IS_REPLACING_ENTITY_REFERENCES, Boolean.FALSE);
164             factory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, Boolean.FALSE);
165             factory.setProperty(XMLInputFactory.IS_NAMESPACE_AWARE, Boolean.FALSE);
166             factory.setProperty(XMLInputFactory.IS_COALESCING, Boolean.FALSE);
167             factory.setProperty(XMLInputFactory.SUPPORT_DTD, Boolean.FALSE);
168
169             /** create the stream reader */
170             XMLStreamReader reader = factory.createXMLStreamReader(inStream);
171             int eventType;
172
173             while(reader.hasNext()) {
174                 /** cycle through the XML events */
175
176                 eventType = reader.next();
177
178                 switch(eventType) {
179
180                     case XMLEvent.START_ELEMENT:
181                         handleStartElement(reader);
182                         break;
183
184                     case XMLEvent.CHARACTERS:
185                         switch(xmlState) {
186                             case IN_BODY:
187                                 msgBody.append(reader.getText());
188                                 break;
189                             case IN_THREAD:
190                                 msgThread.append(reader.getText());
191                                 break;
192                             case IN_STATUS:
193                                 msgStatus.append(reader.getText());
194                                 break;
195                         }
196                         break;
197
198                     case XMLEvent.END_ELEMENT: 
199                         xmlState = XMLState.IN_NOTHING;
200                         if("message".equals(reader.getName().toString())) {
201
202                            /** build a message and add it to the message queue */
203                            XMPPMessage msg = new XMPPMessage();
204                            msg.setFrom(msgFrom);
205                            msg.setTo(msgTo);
206                            msg.setBody(msgBody.toString());
207                            msg.setThread(msgThread.toString());
208
209                            msgQueue.offer(msg);
210                            resetBuffers(); 
211                            notifyCoreEvent();
212                         }
213                         break;
214                 }
215             }
216
217         } catch(javax.xml.stream.XMLStreamException se) {
218             /* XXX log an error, set a state, and notify */
219         }
220     }
221
222
223     /**
224      * Handles the start_element event.
225      */
226     private void handleStartElement(XMLStreamReader reader) {
227
228         String name = reader.getName().toString();
229
230         if("stream:stream".equals(name)) {
231             setXMPPStreamState(XMPPStreamState.CONNECT_RECV);
232             return;
233         }
234
235         if("iq".equals(name)) {
236             if("result".equals(reader.getAttributeValue(null, "type")))
237                 setXMPPStreamState(XMPPStreamState.CONNECTED);
238             return;
239         }
240
241         if("message".equals(name)) {
242             xmlState = XMLState.IN_BODY;
243             /** add a special case for the opensrf "router_from" attribute */
244             String rf = reader.getAttributeValue(null, "router_from");
245             if( rf != null )
246                 msgFrom = rf;
247             else
248                 msgFrom = reader.getAttributeValue(null, "from");
249             msgTo = reader.getAttributeValue(null, "to");
250             return;
251         }
252
253         if("thread".equals(name)) {
254             xmlState = XMLState.IN_THREAD;
255             return;
256         }
257
258         if("status".equals(name)) {
259             xmlState = XMLState.IN_STATUS;
260             return;
261         }
262
263         if("stream:error".equals(name)) {
264             setXMPPStreamState(XMPPStreamState.DISCONNECTED);
265             return;
266         }
267
268         if("error".equals(name)) {
269             msgErrType.append(reader.getAttributeValue(null, "type"));
270             msgErrCode = Integer.parseInt(reader.getAttributeValue(null, "code"));
271             setXMPPStreamState(XMPPStreamState.DISCONNECTED);
272             return;
273         }
274     }
275 }
276
277
278
279