]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/java/org/opensrf/net/xmpp/XMPPReader.java
added internal log level. added some crucial log lines. logging thread id
[OpenSRF.git] / src / java / org / opensrf / net / xmpp / XMPPReader.java
1 package org.opensrf.net.xmpp;
2
3 import javax.xml.stream.*;
4 import javax.xml.stream.events.* ;
5 import javax.xml.namespace.QName;
6 import java.util.Queue;
7 import java.io.InputStream;
8 import java.util.concurrent.ConcurrentLinkedQueue;
9 import java.util.Date;
10 import org.opensrf.util.Logger;
11
12 import com.ctc.wstx.stax.WstxInputFactory;
13
14 /**
15  * Slim XMPP Stream reader.  This reader only understands enough XMPP
16  * to handle logins and recv messages.  It's implemented as a StAX parser.
17  * @author Bill Erickson, Georgia Public Library Systems
18  */
19 public class XMPPReader implements Runnable {
20
21     /** Queue of received messages. */
22     private Queue<XMPPMessage> msgQueue;
23     /** Incoming XMPP XML stream */
24     private InputStream inStream;
25     /** Current message body */
26     private StringBuffer msgBody;
27     /** Current message thread */
28     private StringBuffer msgThread;
29     /** Current message status */
30     private StringBuffer msgStatus;
31     /** Current message error type */
32     private StringBuffer msgErrType;
33     /** Current message sender */
34     private String msgFrom;
35     /** Current message recipient */
36     private String msgTo;
37     /** Current message error code */
38     private int msgErrCode;
39
40     /** Where this reader currently is in the document */
41     private XMLState xmlState;
42
43     /** The current connect state to the XMPP server */
44     private XMPPStreamState streamState;
45
46
47     /** Used to represent out connection state to the XMPP server */
48     public static enum XMPPStreamState {
49         DISCONNECTED,   /* not connected to the server */
50         CONNECT_SENT,   /* we've sent the initial connect message */
51         CONNECT_RECV,   /* we've received a response to our connect message */
52         AUTH_SENT,      /* we've sent an authentication request */
53         CONNECTED       /* authentication is complete */
54     };
55
56
57     /** Used to represents where we are in the XML document stream. */
58     public static enum XMLState {
59         IN_NOTHING,
60         IN_BODY,
61         IN_THREAD,
62         IN_STATUS
63     };
64
65
66     /**
67      * Creates a new reader. Initializes the message queue.
68      * Sets the stream state to disconnected, and the xml
69      * state to in_nothing.
70      * @param inStream the inbound XML stream
71      */
72     public XMPPReader(InputStream inStream) {
73         msgQueue = new ConcurrentLinkedQueue<XMPPMessage>();
74         this.inStream = inStream;
75         resetBuffers();
76         xmlState = XMLState.IN_NOTHING;
77         streamState = XMPPStreamState.DISCONNECTED;
78     }
79
80     /**
81      * Change the connect state and notify that a core 
82      * event has occurred.
83      */
84     protected void setXMPPStreamState(XMPPStreamState state) {
85         streamState = state;
86         notifyCoreEvent();
87     }
88
89     /**
90      * @return The current stream state of the reader 
91      */
92     public XMPPStreamState getXMPPStreamState() {
93         return streamState;
94     }
95
96
97     /**
98      * @return The next message in the queue, or null
99      */
100     public XMPPMessage popMessageQueue() {
101         return (XMPPMessage) msgQueue.poll();
102     }
103
104
105     /**
106      * Initializes the message buffers 
107      */
108     private void resetBuffers() {
109         msgBody = new StringBuffer();
110         msgThread = new StringBuffer();
111         msgStatus = new StringBuffer(); 
112         msgErrType = new StringBuffer();
113         msgFrom = "";
114         msgTo = "";
115     }
116
117
118     /**
119      * Notifies the waiting thread that a core event has occurred.
120      * Each reader should have exactly one dependent session thread. 
121      */
122     private synchronized void notifyCoreEvent() {
123         notifyAll();
124     }
125
126
127     /**
128      * Waits up to timeout milliseconds for a core event to occur. 
129      * Also, having a message already waiting in the queue 
130      * constitutes a core event.
131      * @param timeout The number of milliseconds to wait.  If 
132      * timeout is negative, waits potentially forever.
133      * @return The number of milliseconds in wait
134      */
135     public synchronized long waitCoreEvent(long timeout) {
136
137         if(msgQueue.peek() != null || timeout == 0) return 0;
138         long start = new Date().getTime();
139
140         try{
141             if(timeout < 0) 
142                 wait();
143             else 
144                 wait(timeout);
145         } catch(InterruptedException ie) {}
146
147         return new Date().getTime() - start;
148     }
149
150
151
152     /** Kickoff the thread */
153     public void run() {
154         read();
155     }
156
157
158     /**
159      * Parses XML data from the provided XMPP stream.
160      */
161     public void read() {
162
163         try {
164
165             //XMLInputFactory factory = XMLInputFactory.newInstance();
166             XMLInputFactory factory = new com.ctc.wstx.stax.WstxInputFactory();
167
168             /** disable as many unused features as possible to speed up the parsing */
169             factory.setProperty(XMLInputFactory.IS_REPLACING_ENTITY_REFERENCES, Boolean.FALSE);
170             factory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, Boolean.FALSE);
171             factory.setProperty(XMLInputFactory.IS_NAMESPACE_AWARE, Boolean.FALSE);
172             factory.setProperty(XMLInputFactory.IS_COALESCING, Boolean.FALSE);
173             factory.setProperty(XMLInputFactory.SUPPORT_DTD, Boolean.FALSE);
174
175             /** create the stream reader */
176             XMLStreamReader reader = factory.createXMLStreamReader(inStream);
177             int eventType;
178
179             while(reader.hasNext()) {
180                 /** cycle through the XML events */
181
182                 eventType = reader.next();
183
184                 switch(eventType) {
185
186                     case XMLEvent.START_ELEMENT:
187                         handleStartElement(reader);
188                         break;
189
190                     case XMLEvent.CHARACTERS:
191                         switch(xmlState) {
192                             case IN_BODY:
193                                 msgBody.append(reader.getText());
194                                 break;
195                             case IN_THREAD:
196                                 msgThread.append(reader.getText());
197                                 break;
198                             case IN_STATUS:
199                                 msgStatus.append(reader.getText());
200                                 break;
201                         }
202                         break;
203
204                     case XMLEvent.END_ELEMENT: 
205                         xmlState = XMLState.IN_NOTHING;
206                         if("message".equals(reader.getName().toString())) {
207
208                            /** build a message and add it to the message queue */
209                            XMPPMessage msg = new XMPPMessage();
210                            msg.setFrom(msgFrom);
211                            msg.setTo(msgTo);
212                            msg.setBody(msgBody.toString());
213                            msg.setThread(msgThread.toString());
214
215                            Logger.internal("xmpp message from="+msgFrom+" " + msg.getBody());
216
217                            msgQueue.offer(msg);
218                            resetBuffers(); 
219                            notifyCoreEvent();
220                         }
221                         break;
222                 }
223             }
224
225         } catch(javax.xml.stream.XMLStreamException se) {
226             /* XXX log an error */
227             xmlState = XMLState.IN_NOTHING;
228             streamState = XMPPStreamState.DISCONNECTED;
229             notifyCoreEvent();
230         }
231     }
232
233
234     /**
235      * Handles the start_element event.
236      */
237     private void handleStartElement(XMLStreamReader reader) {
238
239         String name = reader.getName().toString();
240
241         if("message".equals(name)) {
242             xmlState = XMLState.IN_BODY;
243
244             /** add a special case for the opensrf "router_from" attribute */
245             String rf = reader.getAttributeValue(null, "router_from");
246             if( rf != null )
247                 msgFrom = rf;
248             else
249                 msgFrom = reader.getAttributeValue(null, "from");
250             msgTo = reader.getAttributeValue(null, "to");
251             return;
252         }
253
254         if("body".equals(name)) {
255             xmlState = XMLState.IN_BODY;
256             return;
257         }
258
259         if("thread".equals(name)) {
260             xmlState = XMLState.IN_THREAD;
261             return;
262         }
263
264         if("stream:stream".equals(name)) {
265             setXMPPStreamState(XMPPStreamState.CONNECT_RECV);
266             return;
267         }
268
269         if("iq".equals(name)) {
270             if("result".equals(reader.getAttributeValue(null, "type")))
271                 setXMPPStreamState(XMPPStreamState.CONNECTED);
272             return;
273         }
274
275         if("status".equals(name)) {
276             xmlState = XMLState.IN_STATUS;
277             return;
278         }
279
280         if("stream:error".equals(name)) {
281             setXMPPStreamState(XMPPStreamState.DISCONNECTED);
282             return;
283         }
284
285         if("error".equals(name)) {
286             msgErrType.append(reader.getAttributeValue(null, "type"));
287             msgErrCode = Integer.parseInt(reader.getAttributeValue(null, "code"));
288             setXMPPStreamState(XMPPStreamState.DISCONNECTED);
289             return;
290         }
291     }
292 }
293
294
295
296