1 package org.opensrf.net.xmpp;
3 import javax.xml.stream.*;
4 import javax.xml.stream.events.* ;
5 import javax.xml.namespace.QName;
6 import java.util.Queue;
7 import java.io.InputStream;
8 import java.util.concurrent.ConcurrentLinkedQueue;
11 import com.ctc.wstx.stax.WstxInputFactory;
14 * Slim XMPP Stream reader. This reader only understands enough XMPP
15 * to handle logins and recv messages. It's implemented as a StAX parser.
16 * @author Bill Erickson, Georgia Public Library Systems
18 public class XMPPReader implements Runnable {
20 /** Queue of received messages. */
21 private Queue<XMPPMessage> msgQueue;
22 /** Incoming XMPP XML stream */
23 private InputStream inStream;
24 /** Current message body */
25 private StringBuffer msgBody;
26 /** Current message thread */
27 private StringBuffer msgThread;
28 /** Current message status */
29 private StringBuffer msgStatus;
30 /** Current message error type */
31 private StringBuffer msgErrType;
32 /** Current message sender */
33 private String msgFrom;
34 /** Current message recipient */
36 /** Current message error code */
37 private int msgErrCode;
39 /** Where this reader currently is in the document */
40 private XMLState xmlState;
42 /** The current connect state to the XMPP server */
43 private XMPPStreamState streamState;
46 /** Used to represent out connection state to the XMPP server */
47 public static enum XMPPStreamState {
48 DISCONNECTED, /* not connected to the server */
49 CONNECT_SENT, /* we've sent the initial connect message */
50 CONNECT_RECV, /* we've received a response to our connect message */
51 AUTH_SENT, /* we've sent an authentication request */
52 CONNECTED /* authentication is complete */
56 /** Used to represents where we are in the XML document stream. */
57 public static enum XMLState {
66 * Creates a new reader. Initializes the message queue.
67 * Sets the stream state to disconnected, and the xml
68 * state to in_nothing.
69 * @param inStream the inbound XML stream
71 public XMPPReader(InputStream inStream) {
72 msgQueue = new ConcurrentLinkedQueue<XMPPMessage>();
73 this.inStream = inStream;
75 xmlState = XMLState.IN_NOTHING;
76 streamState = XMPPStreamState.DISCONNECTED;
80 * Change the connect state and notify that a core
83 protected void setXMPPStreamState(XMPPStreamState state) {
89 * @return The current stream state of the reader
91 public XMPPStreamState getXMPPStreamState() {
97 * @return The next message in the queue, or null
99 public XMPPMessage popMessageQueue() {
100 return (XMPPMessage) msgQueue.poll();
105 * Initializes the message buffers
107 private void resetBuffers() {
108 msgBody = new StringBuffer();
109 msgThread = new StringBuffer();
110 msgStatus = new StringBuffer();
111 msgErrType = new StringBuffer();
118 * Notifies the waiting thread that a core event has occurred.
119 * Each reader should have exactly one dependent session thread.
121 private synchronized void notifyCoreEvent() {
127 * Waits up to timeout milliseconds for a core event to occur.
128 * Also, having a message already waiting in the queue
129 * constitutes a core event.
130 * @param timeout The number of milliseconds to wait. If
131 * timeout is negative, waits potentially forever.
132 * @return The number of milliseconds in wait
134 public synchronized long waitCoreEvent(long timeout) {
136 if(msgQueue.peek() != null || timeout == 0) return 0;
137 long start = new Date().getTime();
140 if(timeout < 0) wait();
142 } catch(InterruptedException ie) {}
144 return new Date().getTime() - start;
149 /** Kickoff the thread */
156 * Parses XML data from the provided XMPP stream.
162 //XMLInputFactory factory = XMLInputFactory.newInstance();
163 XMLInputFactory factory = new com.ctc.wstx.stax.WstxInputFactory();
165 /** disable as many unused features as possible to speed up the parsing */
166 factory.setProperty(XMLInputFactory.IS_REPLACING_ENTITY_REFERENCES, Boolean.FALSE);
167 factory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, Boolean.FALSE);
168 factory.setProperty(XMLInputFactory.IS_NAMESPACE_AWARE, Boolean.FALSE);
169 factory.setProperty(XMLInputFactory.IS_COALESCING, Boolean.FALSE);
170 factory.setProperty(XMLInputFactory.SUPPORT_DTD, Boolean.FALSE);
172 /** create the stream reader */
173 XMLStreamReader reader = factory.createXMLStreamReader(inStream);
176 while(reader.hasNext()) {
177 /** cycle through the XML events */
179 eventType = reader.next();
183 case XMLEvent.START_ELEMENT:
184 handleStartElement(reader);
187 case XMLEvent.CHARACTERS:
190 msgBody.append(reader.getText());
193 msgThread.append(reader.getText());
196 msgStatus.append(reader.getText());
201 case XMLEvent.END_ELEMENT:
202 xmlState = XMLState.IN_NOTHING;
203 if("message".equals(reader.getName().toString())) {
205 /** build a message and add it to the message queue */
206 XMPPMessage msg = new XMPPMessage();
207 msg.setFrom(msgFrom);
209 msg.setBody(msgBody.toString());
210 msg.setThread(msgThread.toString());
220 } catch(javax.xml.stream.XMLStreamException se) {
221 /* XXX log an error */
222 xmlState = XMLState.IN_NOTHING;
223 streamState = XMPPStreamState.DISCONNECTED;
230 * Handles the start_element event.
232 private void handleStartElement(XMLStreamReader reader) {
234 String name = reader.getName().toString();
236 if("message".equals(name)) {
237 xmlState = XMLState.IN_BODY;
239 /** add a special case for the opensrf "router_from" attribute */
240 String rf = reader.getAttributeValue(null, "router_from");
244 msgFrom = reader.getAttributeValue(null, "from");
245 msgTo = reader.getAttributeValue(null, "to");
249 if("body".equals(name)) {
250 xmlState = XMLState.IN_BODY;
254 if("thread".equals(name)) {
255 xmlState = XMLState.IN_THREAD;
259 if("stream:stream".equals(name)) {
260 setXMPPStreamState(XMPPStreamState.CONNECT_RECV);
264 if("iq".equals(name)) {
265 if("result".equals(reader.getAttributeValue(null, "type")))
266 setXMPPStreamState(XMPPStreamState.CONNECTED);
270 if("status".equals(name)) {
271 xmlState = XMLState.IN_STATUS;
275 if("stream:error".equals(name)) {
276 setXMPPStreamState(XMPPStreamState.DISCONNECTED);
280 if("error".equals(name)) {
281 msgErrType.append(reader.getAttributeValue(null, "type"));
282 msgErrCode = Integer.parseInt(reader.getAttributeValue(null, "code"));
283 setXMPPStreamState(XMPPStreamState.DISCONNECTED);