1 package org.opensrf.net.xmpp;
3 import javax.xml.stream.*;
4 import javax.xml.stream.events.* ;
5 import javax.xml.namespace.QName;
6 import java.util.Queue;
7 import java.io.InputStream;
8 import java.util.concurrent.ConcurrentLinkedQueue;
13 * Slim XMPP Stream reader. This reader only understands enough XMPP
14 * to handle logins and receive messages. It's implemented as a StAX parser.
15 * @author Bill Erickson, Georgia Public Library Systems
17 public class XMPPReader implements Runnable {
19 /** Queue of received messages. */
20 private Queue<XMPPMessage> msgQueue;
21 /** Incoming XMPP XML stream */
22 private InputStream inStream;
23 /** Current message body */
24 private StringBuffer msgBody;
25 /** Current message thread */
26 private StringBuffer msgThread;
27 /** Current message status */
28 private StringBuffer msgStatus;
29 /** Current message error type */
30 private StringBuffer msgErrType;
31 /** Current message sender */
32 private String msgFrom;
33 /** Current message recipient */
35 /** Current message error code */
36 private int msgErrCode;
38 /** Where this reader currently is in the document */
39 private XMLState xmlState;
41 /** The current connect state to the XMPP server */
42 private XMPPStreamState streamState;
45 /** Used to represent our connection state to the XMPP server */
46 public static enum XMPPStreamState {
47 DISCONNECTED, /* not connected to the server */
48 CONNECT_SENT, /* we've sent the initial connect message */
49 CONNECT_RECV, /* we've received a response to our connect message */
50 AUTH_SENT, /* we've sent an authentication request */
51 CONNECTED /* authentication is complete */
55 /** Used to represent where we are in the XML document stream. */
56 public static enum XMLState {
65 * Creates a new reader. Initializes the message queue.
66 * Sets the stream state to disconnected, and the xml
67 * state to in_nothing.
68 * @param inStream the inbound XML stream
70 public XMPPReader(InputStream inStream) {
// ConcurrentLinkedQueue is a non-blocking, thread-safe queue from java.util.concurrent.
// NOTE(review): presumably chosen because the parsing thread enqueues while a separate
// session thread polls — confirm against the callers.
71 msgQueue = new ConcurrentLinkedQueue<XMPPMessage>();
72 this.inStream = inStream;
// Initial state: outside any tracked element, not yet connected to the server.
74 xmlState = XMLState.IN_NOTHING;
75 streamState = XMPPStreamState.DISCONNECTED;
79 * Change the connect state and notify that a core
82 protected void setXMPPStreamState(XMPPStreamState state) {
88 * @return The current stream state of the reader
90 public XMPPStreamState getXMPPStreamState() {
96 * @return The next message in the queue, or null
98 public XMPPMessage popMessageQueue() {
// Non-blocking: Queue.poll() removes and returns the head, or returns null when the queue is empty.
// NOTE(review): the (XMPPMessage) cast is redundant — msgQueue is declared as Queue<XMPPMessage>.
99 return (XMPPMessage) msgQueue.poll();
104 * Initializes the message buffers
106 private void resetBuffers() {
// Each buffer is replaced with a new, empty StringBuffer rather than cleared in place.
// NOTE(review): msgErrCode is not reset in the lines visible here — confirm whether a
// stale error code can carry over from one message to the next.
107 msgBody = new StringBuffer();
108 msgThread = new StringBuffer();
109 msgStatus = new StringBuffer();
110 msgErrType = new StringBuffer();
117 * Notifies the waiting thread that a core event has occurred.
118 * Each reader should have exactly one dependent session thread.
120 private synchronized void notifyCoreEvent() {
126 * Waits up to timeout milliseconds for a core event to occur.
127 * Also, having a message already waiting in the queue
128 * constitutes a core event.
129 * @param timeout The number of milliseconds to wait. If
130 * timeout is negative, waits potentially forever.
131 * @return The number of milliseconds spent waiting
133 public synchronized long waitCoreEvent(long timeout) {
// Fast path: an already-queued message counts as a core event, and a timeout of 0 means "do not wait".
135 if(msgQueue.peek() != null || timeout == 0) return 0;
// NOTE(review): Date is used here, but no java.util.Date import is visible in this excerpt — confirm it is imported.
// Date-based timing follows the wall clock, so the result can be skewed if the system clock is adjusted mid-wait.
137 long start = new Date().getTime();
// Negative timeout: the no-argument wait() blocks until this object is notified or the thread is interrupted.
139 if(timeout < 0) wait();
// NOTE(review): the interrupt is swallowed and the thread's interrupt status is not restored,
// so the caller cannot distinguish an interrupt from a normal wake-up.
141 } catch(InterruptedException ie) {}
// Elapsed milliseconds since 'start', whatever the reason the wait ended.
143 return new Date().getTime() - start;
148 /** Kickoff the thread */
155 * Parses XML data from the provided XMPP stream.
161 XMLInputFactory factory = XMLInputFactory.newInstance();
163 /** disable as many unused features as possible to speed up the parsing */
// Besides speed, turning off DTD support and external entities keeps the parser from
// resolving external entities in data that arrives over the network.
164 factory.setProperty(XMLInputFactory.IS_REPLACING_ENTITY_REFERENCES, Boolean.FALSE);
165 factory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, Boolean.FALSE);
// NOTE(review): handleStartElement() compares element names against prefixed strings such as
// "stream:stream"; presumably that relies on namespace processing being off — confirm before changing this.
166 factory.setProperty(XMLInputFactory.IS_NAMESPACE_AWARE, Boolean.FALSE);
// With coalescing off, the parser may deliver one run of text as several CHARACTERS events;
// the append() calls below accumulate those pieces.
167 factory.setProperty(XMLInputFactory.IS_COALESCING, Boolean.FALSE);
168 factory.setProperty(XMLInputFactory.SUPPORT_DTD, Boolean.FALSE);
170 /** create the stream reader */
171 XMLStreamReader reader = factory.createXMLStreamReader(inStream);
174 while(reader.hasNext()) {
175 /** cycle through the XML events */
177 eventType = reader.next();
181 case XMLEvent.START_ELEMENT:
182 handleStartElement(reader);
// Text is routed into one of the per-message buffers.
// NOTE(review): the selection logic (presumably keyed on xmlState) is not visible in this excerpt.
185 case XMLEvent.CHARACTERS:
188 msgBody.append(reader.getText());
191 msgThread.append(reader.getText());
194 msgStatus.append(reader.getText());
199 case XMLEvent.END_ELEMENT:
// The state is cleared on every closing tag, not only on the tag that set it.
200 xmlState = XMLState.IN_NOTHING;
201 if("message".equals(reader.getName().toString())) {
203 /** build a message and add it to the message queue */
204 XMPPMessage msg = new XMPPMessage();
205 msg.setFrom(msgFrom);
207 msg.setBody(msgBody.toString());
208 msg.setThread(msgThread.toString());
// A parse failure is treated as loss of the connection.
// NOTE(review): streamState is assigned directly here rather than through setXMPPStreamState();
// confirm that a thread blocked in waitCoreEvent() is still woken up on this path.
218 } catch(javax.xml.stream.XMLStreamException se) {
219 /* XXX log an error */
220 xmlState = XMLState.IN_NOTHING;
221 streamState = XMPPStreamState.DISCONNECTED;
228 * Handles the start_element event.
230 private void handleStartElement(XMLStreamReader reader) {
// Dispatches on the element name: updates xmlState for elements whose text is collected,
// and updates the stream state for connection-level elements.
232 String name = reader.getName().toString();
234 if("message".equals(name)) {
// NOTE(review): <message> sets the same IN_BODY state as <body> below — confirm this is
// intended rather than a dedicated "in message" state.
235 xmlState = XMLState.IN_BODY;
237 /** add a special case for the opensrf "router_from" attribute */
238 String rf = reader.getAttributeValue(null, "router_from");
// getAttributeValue() returns null when the attribute is absent, so msgFrom and msgTo may be null.
242 msgFrom = reader.getAttributeValue(null, "from");
243 msgTo = reader.getAttributeValue(null, "to");
247 if("body".equals(name)) {
248 xmlState = XMLState.IN_BODY;
252 if("thread".equals(name)) {
253 xmlState = XMLState.IN_THREAD;
// The server's opening <stream:stream> tag is taken as the reply to our connect message.
257 if("stream:stream".equals(name)) {
258 setXMPPStreamState(XMPPStreamState.CONNECT_RECV);
// Any <iq type="result"> is treated as successful authentication; no id or payload is checked here.
262 if("iq".equals(name)) {
263 if("result".equals(reader.getAttributeValue(null, "type")))
264 setXMPPStreamState(XMPPStreamState.CONNECTED);
268 if("status".equals(name)) {
269 xmlState = XMLState.IN_STATUS;
273 if("stream:error".equals(name)) {
274 setXMPPStreamState(XMPPStreamState.DISCONNECTED);
278 if("error".equals(name)) {
// NOTE(review): if the "type" attribute is absent, getAttributeValue() returns null and
// StringBuffer.append(String) appends the literal text "null".
279 msgErrType.append(reader.getAttributeValue(null, "type"));
// NOTE(review): Integer.parseInt() throws NumberFormatException (unchecked) when "code" is
// missing or non-numeric — confirm the caller handles that case.
280 msgErrCode = Integer.parseInt(reader.getAttributeValue(null, "code"));
// Any <error> element marks the stream as disconnected.
281 setXMPPStreamState(XMPPStreamState.DISCONNECTED);