1 package org.opensrf.net.xmpp;
3 import javax.xml.stream.*;
4 import javax.xml.stream.events.* ;
5 import javax.xml.namespace.QName;
6 import java.util.Queue;
7 import java.io.InputStream;
8 import java.util.concurrent.ConcurrentLinkedQueue;
10 import org.opensrf.util.Logger;
12 import com.ctc.wstx.stax.WstxInputFactory;
15 * Slim XMPP Stream reader. This reader only understands enough XMPP
16 * to handle logins and recv messages. It's implemented as a StAX parser.
17 * @author Bill Erickson, Georgia Public Library Systems
19 public class XMPPReader implements Runnable {
21 /** Queue of received messages. */
22 private Queue<XMPPMessage> msgQueue;
23 /** Incoming XMPP XML stream */
24 private InputStream inStream;
25 /** Current message body */
26 private StringBuffer msgBody;
27 /** Current message thread */
28 private StringBuffer msgThread;
29 /** Current message status */
30 private StringBuffer msgStatus;
31 /** Current message error type */
32 private StringBuffer msgErrType;
33 /** Current message sender */
34 private String msgFrom;
35 /** Current message recipient */
37 /** Current message error code */
38 private int msgErrCode;
40 /** Where this reader currently is in the document */
41 private XMLState xmlState;
43 /** The current connect state to the XMPP server */
44 private XMPPStreamState streamState;
47 /** Used to represent out connection state to the XMPP server */
48 public static enum XMPPStreamState {
49 DISCONNECTED, /* not connected to the server */
50 CONNECT_SENT, /* we've sent the initial connect message */
51 CONNECT_RECV, /* we've received a response to our connect message */
52 AUTH_SENT, /* we've sent an authentication request */
53 CONNECTED /* authentication is complete */
57 /** Used to represents where we are in the XML document stream. */
58 public static enum XMLState {
67 * Creates a new reader. Initializes the message queue.
68 * Sets the stream state to disconnected, and the xml
69 * state to in_nothing.
70 * @param inStream the inbound XML stream
72 public XMPPReader(InputStream inStream) {
73 msgQueue = new ConcurrentLinkedQueue<XMPPMessage>();
74 this.inStream = inStream;
76 xmlState = XMLState.IN_NOTHING;
77 streamState = XMPPStreamState.DISCONNECTED;
81 * Change the connect state and notify that a core
84 protected void setXMPPStreamState(XMPPStreamState state) {
90 * @return The current stream state of the reader
92 public XMPPStreamState getXMPPStreamState() {
98 * @return The next message in the queue, or null
100 public XMPPMessage popMessageQueue() {
101 return (XMPPMessage) msgQueue.poll();
106 * Initializes the message buffers
108 private void resetBuffers() {
109 msgBody = new StringBuffer();
110 msgThread = new StringBuffer();
111 msgStatus = new StringBuffer();
112 msgErrType = new StringBuffer();
119 * Notifies the waiting thread that a core event has occurred.
120 * Each reader should have exactly one dependent session thread.
122 private synchronized void notifyCoreEvent() {
128 * Waits up to timeout milliseconds for a core event to occur.
129 * Also, having a message already waiting in the queue
130 * constitutes a core event.
131 * @param timeout The number of milliseconds to wait. If
132 * timeout is negative, waits potentially forever.
133 * @return The number of milliseconds in wait
135 public synchronized long waitCoreEvent(long timeout) {
137 if(msgQueue.peek() != null || timeout == 0) return 0;
138 long start = new Date().getTime();
145 } catch(InterruptedException ie) {}
147 return new Date().getTime() - start;
152 /** Kickoff the thread */
159 * Parses XML data from the provided XMPP stream.
165 //XMLInputFactory factory = XMLInputFactory.newInstance();
166 XMLInputFactory factory = new com.ctc.wstx.stax.WstxInputFactory();
168 /** disable as many unused features as possible to speed up the parsing */
169 factory.setProperty(XMLInputFactory.IS_REPLACING_ENTITY_REFERENCES, Boolean.FALSE);
170 factory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, Boolean.FALSE);
171 factory.setProperty(XMLInputFactory.IS_NAMESPACE_AWARE, Boolean.FALSE);
172 factory.setProperty(XMLInputFactory.IS_COALESCING, Boolean.FALSE);
173 factory.setProperty(XMLInputFactory.SUPPORT_DTD, Boolean.FALSE);
175 /** create the stream reader */
176 XMLStreamReader reader = factory.createXMLStreamReader(inStream);
179 while(reader.hasNext()) {
180 /** cycle through the XML events */
182 eventType = reader.next();
186 case XMLEvent.START_ELEMENT:
187 handleStartElement(reader);
190 case XMLEvent.CHARACTERS:
193 msgBody.append(reader.getText());
196 msgThread.append(reader.getText());
199 msgStatus.append(reader.getText());
204 case XMLEvent.END_ELEMENT:
205 xmlState = XMLState.IN_NOTHING;
206 if("message".equals(reader.getName().toString())) {
208 /** build a message and add it to the message queue */
209 XMPPMessage msg = new XMPPMessage();
210 msg.setFrom(msgFrom);
212 msg.setBody(msgBody.toString());
213 msg.setThread(msgThread.toString());
215 Logger.internal("xmpp message from="+msgFrom+" " + msg.getBody());
225 } catch(javax.xml.stream.XMLStreamException se) {
226 /* XXX log an error */
227 xmlState = XMLState.IN_NOTHING;
228 streamState = XMPPStreamState.DISCONNECTED;
235 * Handles the start_element event.
237 private void handleStartElement(XMLStreamReader reader) {
239 String name = reader.getName().toString();
241 if("message".equals(name)) {
242 xmlState = XMLState.IN_BODY;
244 /** add a special case for the opensrf "router_from" attribute */
245 String rf = reader.getAttributeValue(null, "router_from");
249 msgFrom = reader.getAttributeValue(null, "from");
250 msgTo = reader.getAttributeValue(null, "to");
254 if("body".equals(name)) {
255 xmlState = XMLState.IN_BODY;
259 if("thread".equals(name)) {
260 xmlState = XMLState.IN_THREAD;
264 if("stream:stream".equals(name)) {
265 setXMPPStreamState(XMPPStreamState.CONNECT_RECV);
269 if("iq".equals(name)) {
270 if("result".equals(reader.getAttributeValue(null, "type")))
271 setXMPPStreamState(XMPPStreamState.CONNECTED);
275 if("status".equals(name)) {
276 xmlState = XMLState.IN_STATUS;
280 if("stream:error".equals(name)) {
281 setXMPPStreamState(XMPPStreamState.DISCONNECTED);
285 if("error".equals(name)) {
286 msgErrType.append(reader.getAttributeValue(null, "type"));
287 msgErrCode = Integer.parseInt(reader.getAttributeValue(null, "code"));
288 setXMPPStreamState(XMPPStreamState.DISCONNECTED);