From 0b95ed2c333044d0c6f0ece5749e24efae97e847 Mon Sep 17 00:00:00 2001 From: erickson Date: Wed, 9 May 2007 17:45:42 +0000 Subject: [PATCH] added java jabber layer git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@876 9efc2488-bf62-4759-914b-345cdb29e865 --- .../org/opensrf/net/xmpp/XMPPException.java | 18 ++ .../org/opensrf/net/xmpp/XMPPMessage.java | 159 ++++++++++ src/java/org/opensrf/net/xmpp/XMPPReader.java | 279 ++++++++++++++++++ .../org/opensrf/net/xmpp/XMPPSession.java | 178 +++++++++++ src/java/org/opensrf/test/TestXMPP.java | 57 ++++ 5 files changed, 691 insertions(+) create mode 100644 src/java/org/opensrf/net/xmpp/XMPPException.java create mode 100644 src/java/org/opensrf/net/xmpp/XMPPMessage.java create mode 100644 src/java/org/opensrf/net/xmpp/XMPPReader.java create mode 100644 src/java/org/opensrf/net/xmpp/XMPPSession.java create mode 100644 src/java/org/opensrf/test/TestXMPP.java diff --git a/src/java/org/opensrf/net/xmpp/XMPPException.java b/src/java/org/opensrf/net/xmpp/XMPPException.java new file mode 100644 index 0000000..29ee0e7 --- /dev/null +++ b/src/java/org/opensrf/net/xmpp/XMPPException.java @@ -0,0 +1,18 @@ +package org.opensrf.net.xmpp; + +/** + * Used for XMPP stream/authentication errors + */ +public class XMPPException extends Exception { + private String info; + + /** + * @param info Runtime exception information. + */ + public XMPPException(String info) { + this.info = info; + } + public String toString() { + return this.info; + } +} diff --git a/src/java/org/opensrf/net/xmpp/XMPPMessage.java b/src/java/org/opensrf/net/xmpp/XMPPMessage.java new file mode 100644 index 0000000..f1f0d55 --- /dev/null +++ b/src/java/org/opensrf/net/xmpp/XMPPMessage.java @@ -0,0 +1,159 @@ +package org.opensrf.net.xmpp; + +import java.io.*; + + +/* + * uncomment to use the DOM serialization code... + +import org.w3c.dom.*; +import org.apache.xerces.dom.DocumentImpl; +import org.apache.xerces.dom.DOMImplementationImpl; +import org.apache.xml.serialize.OutputFormat; +import org.apache.xml.serialize.Serializer; +import org.apache.xml.serialize.SerializerFactory; +import org.apache.xml.serialize.XMLSerializer; +*/ + + +/** + * Models a single XMPP message. + */ +public class XMPPMessage { + + /** Message body */ + private String body; + /** Message recipient */ + private String to; + /** Message sender */ + private String from; + /** Message thread */ + private String thread; + /** Message xid */ + private String xid; + + public XMPPMessage() { + } + + public String getBody() { + return body; + } + public String getTo() { + return to; + } + public String getFrom() { + return from; + } + public String getThread() { + return thread; + } + public String getXid() { + return xid; + } + public void setBody(String body) { + this.body = body; + } + public void setTo(String to) { + this.to = to; + } + public void setFrom(String from) { + this.from = from; + } + public void setThread(String thread) { + this.thread = thread; + } + public void setXid(String xid) { + this.xid = xid; + } + + + /** + * Generates the XML representation of this message. + */ + public String toXML() { + StringBuffer sb = new StringBuffer(""); + escapeXML(thread, sb); + sb.append(""); + escapeXML(body, sb); + sb.append(""); + return sb.toString(); + } + + + /** + * Escapes non-valid XML characters. + * @param s The string to escape. + * @param sb The StringBuffer to append new data to. + */ + private void escapeXML(String s, StringBuffer sb) { + if( s == null ) return; + char c; + int l = s.length(); + for( int i = 0; i < l; i++ ) { + c = s.charAt(i); + switch(c) { + case '<': + sb.append("<"); + break; + case '>': + sb.append(">"); + break; + case '&': + sb.append("&"); + break; + default: + sb.append(c); + } + } + } + + + + /** + * This is a DOM implementataion of message serialization. + * I'm inclined to think the stringbuffer version is faster, but + * I have no proof. + */ + /* + public String __toXML() { + + Document doc = new DocumentImpl(); + Element message = doc.createElement("message"); + Element body = doc.createElement("body"); + Element thread = doc.createElement("thread"); + + doc.appendChild(message); + message.setAttribute("to", getTo()); + message.setAttribute("from", getFrom()); + message.appendChild(body); + message.appendChild(thread); + + body.appendChild(doc.createTextNode(getBody())); + thread.appendChild(doc.createTextNode(getThread())); + + XMLSerializer serializer = new XMLSerializer(); + StringWriter strWriter = new StringWriter(); + OutputFormat outFormat = new OutputFormat(); + + outFormat.setEncoding("UTF-8"); + outFormat.setVersion("1.0"); + outFormat.setIndenting(false); + outFormat.setOmitXMLDeclaration(true); + + serializer.setOutputCharStream(strWriter); + serializer.setOutputFormat(outFormat); + + try { + serializer.serialize(doc); + } catch(IOException ioe) { + } + return strWriter.toString(); + } + */ +} + + diff --git a/src/java/org/opensrf/net/xmpp/XMPPReader.java b/src/java/org/opensrf/net/xmpp/XMPPReader.java new file mode 100644 index 0000000..39ea0c2 --- /dev/null +++ b/src/java/org/opensrf/net/xmpp/XMPPReader.java @@ -0,0 +1,279 @@ +package org.opensrf.net.xmpp; + +import javax.xml.stream.*; +import javax.xml.stream.events.* ; +import javax.xml.namespace.QName; +import java.util.Queue; +import java.io.InputStream; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.Date; + + +/** + * Slim XMPP Stream reader. This reader only understands enough XMPP + * to handle logins and recv messages. + * @author Bill Erickson, Georgia Public Library Systems + */ +public class XMPPReader implements Runnable { + + /** Queue of received messages. */ + private Queue msgQueue; + /** Incoming XMPP XML stream */ + private InputStream inStream; + /** Current message body */ + private StringBuffer msgBody; + /** Current message thread */ + private StringBuffer msgThread; + /** Current message status */ + private StringBuffer msgStatus; + /** Current message error type */ + private StringBuffer msgErrType; + /** Current message sender */ + private String msgFrom; + /** Current message recipient */ + private String msgTo; + /** Current message error code */ + private int msgErrCode; + + /** Where this reader currently is in the document */ + private XMLState xmlState; + + /** The current connect state to the XMPP server */ + private XMPPStreamState streamState; + + + /** Used to represent out connection state to the XMPP server */ + public static enum XMPPStreamState { + DISCONNECTED, /* not connected to the server */ + CONNECT_SENT, /* we've sent the initial connect message */ + CONNECT_RECV, /* we've received a response to our connect message */ + AUTH_SENT, /* we've sent an authentication request */ + CONNECTED /* authentication is complete */ + }; + + + /** Used to represents where we are in the XML document stream. */ + public static enum XMLState { + IN_NOTHING, + IN_BODY, + IN_THREAD, + IN_STATUS + }; + + + /** + * Creates a new reader. Initializes the message queue. + * Sets the stream state to disconnected, and the xml + * state to in_nothing. + * @param inStream the inbound XML stream + */ + public XMPPReader(InputStream inStream) { + msgQueue = new ConcurrentLinkedQueue(); + this.inStream = inStream; + resetBuffers(); + xmlState = XMLState.IN_NOTHING; + streamState = XMPPStreamState.DISCONNECTED; + } + + /** + * Change the connect state and notify that a core + * event has occurred. + */ + protected void setXMPPStreamState(XMPPStreamState state) { + streamState = state; + notifyCoreEvent(); + } + + /** + * @return The current stream state of the reader + */ + public XMPPStreamState getXMPPStreamState() { + return streamState; + } + + + /** + * @return The next message in the queue, or null + */ + public XMPPMessage popMessageQueue() { + return (XMPPMessage) msgQueue.poll(); + } + + + /** + * Initializes the message buffers + */ + private void resetBuffers() { + msgBody = new StringBuffer(); + msgThread = new StringBuffer(); + msgStatus = new StringBuffer(); + msgErrType = new StringBuffer(); + msgFrom = ""; + msgTo = ""; + } + + + /** + * Notifies the waiting thread that a core event has occurred. + * Each reader should have exactly one dependent session thread. + */ + private synchronized void notifyCoreEvent() { + notify(); + } + + + /** + * Waits up to timeout milliseconds for a core event to occur. + * Also, having a message already waiting in the queue + * constitutes a core event. + * @param timeout The number of milliseconds to wait. If + * timeout is negative, waits potentially forever. + * @return The number of milliseconds in wait + */ + public synchronized long waitCoreEvent(int timeout) { + + if(msgQueue.peek() != null || timeout == 0) return 0; + + long start = new Date().getTime(); + try{ + if(timeout < 0) wait(); + else wait(timeout); + } catch(InterruptedException ie) {} + + return new Date().getTime() - start; + } + + + /** Thread kickoff point */ + public void run() { + read(); + } + + + /** + * Parses XML data from the provided XMPP stream. + * @param inStream The stream to parse. + */ + public void read() { + + try { + XMLInputFactory factory = XMLInputFactory.newInstance(); + + /** disable as many features as possible to speed up the parsing */ + factory.setProperty(XMLInputFactory.IS_REPLACING_ENTITY_REFERENCES, Boolean.FALSE); + factory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, Boolean.FALSE); + factory.setProperty(XMLInputFactory.IS_NAMESPACE_AWARE, Boolean.FALSE); + factory.setProperty(XMLInputFactory.IS_COALESCING, Boolean.FALSE); + factory.setProperty(XMLInputFactory.SUPPORT_DTD, Boolean.FALSE); + + /** create the stream reader */ + XMLStreamReader reader = factory.createXMLStreamReader(inStream); + int eventType; + + while(reader.hasNext()) { + /** cycle through the XML events */ + + eventType = reader.next(); + + switch(eventType) { + + case XMLEvent.START_ELEMENT: + handleStartElement(reader); + break; + + case XMLEvent.CHARACTERS: + switch(xmlState) { + case IN_BODY: + msgBody.append(reader.getText()); + break; + case IN_THREAD: + msgThread.append(reader.getText()); + break; + case IN_STATUS: + msgStatus.append(reader.getText()); + break; + } + break; + + case XMLEvent.END_ELEMENT: + xmlState = XMLState.IN_NOTHING; + if("message".equals(reader.getName().toString())) { + + /** build a message and add it to the message queue */ + XMPPMessage msg = new XMPPMessage(); + msg.setFrom(msgFrom); + msg.setTo(msgTo); + msg.setBody(msgBody.toString()); + msg.setThread(msgThread.toString()); + + msgQueue.offer(msg); + resetBuffers(); + notifyCoreEvent(); + } + break; + } + } + + } catch(javax.xml.stream.XMLStreamException se) { + /* XXX log an error, set a state, and notify */ + } + } + + + /** + * Handles the start_element event. + */ + private void handleStartElement(XMLStreamReader reader) { + + String name = reader.getName().toString(); + + if("stream:stream".equals(name)) { + setXMPPStreamState(XMPPStreamState.CONNECT_RECV); + return; + } + + if("iq".equals(name)) { + if("result".equals(reader.getAttributeValue(null, "type"))) + setXMPPStreamState(XMPPStreamState.CONNECTED); + return; + } + + if("message".equals(name)) { + xmlState = XMLState.IN_BODY; + /** add a special case for the opensrf "router_from" attribute */ + String rf = reader.getAttributeValue(null, "router_from"); + if( rf != null ) + msgFrom = rf; + else + msgFrom = reader.getAttributeValue(null, "from"); + msgTo = reader.getAttributeValue(null, "to"); + return; + } + + if("thread".equals(name)) { + xmlState = XMLState.IN_THREAD; + return; + } + + if("status".equals(name)) { + xmlState = XMLState.IN_STATUS; + return; + } + + if("stream:error".equals(name)) { + setXMPPStreamState(XMPPStreamState.DISCONNECTED); + return; + } + + if("error".equals(name)) { + msgErrType.append(reader.getAttributeValue(null, "type")); + msgErrCode = Integer.parseInt(reader.getAttributeValue(null, "code")); + setXMPPStreamState(XMPPStreamState.DISCONNECTED); + return; + } + } +} + + + + diff --git a/src/java/org/opensrf/net/xmpp/XMPPSession.java b/src/java/org/opensrf/net/xmpp/XMPPSession.java new file mode 100644 index 0000000..98b555f --- /dev/null +++ b/src/java/org/opensrf/net/xmpp/XMPPSession.java @@ -0,0 +1,178 @@ +package org.opensrf.net.xmpp; + +import java.io.*; +import java.net.Socket; + + +/** + * Represents a single XMPP session. Sessions are responsible for writing to + * the stream and for managing a stream reader. + */ +public class XMPPSession { + + /** Initial jabber message */ + public static final String JABBER_CONNECT = + ""; + + /** Basic auth message */ + public static final String JABBER_BASIC_AUTH = + "" + + "%s%s%s"; + + /** jabber domain */ + private String host; + /** jabber port */ + private int port; + /** jabber username */ + private String username; + /** jabber password */ + private String password; + /** jabber resource */ + private String resource; + + /** XMPP stream reader */ + XMPPReader reader; + /** Fprint-capable socket writer */ + PrintWriter writer; + /** Raw socket output stream */ + OutputStream outStream; + + + + /** + * Creates a new session. + * @param host The jabber domain + * @param port The jabber port + */ + public XMPPSession( String host, int port ) { + this.host = host; + this.port = port; + } + + + /** true if this session is connected to the server */ + public boolean connected() { + return ( + reader != null && + reader.getXMPPStreamState() == XMPPReader.XMPPStreamState.CONNECTED); + } + + + /** + * Connects to the network. + * @param username The jabber username + * @param password The jabber password + * @param resource The Jabber resource + */ + public void connect(String username, String password, String resource) throws XMPPException { + + this.username = username; + this.password = password; + this.resource = resource; + + Socket socket; + + try { + /* open the socket and associated streams */ + socket = new Socket(host, port); + + /** the session maintains control over the output stream */ + outStream = socket.getOutputStream(); + writer = new PrintWriter(outStream, true); + + /** pass the input stream to the reader */ + reader = new XMPPReader(socket.getInputStream()); + + } catch(IOException ioe) { + throw new + XMPPException("unable to communicate with host " + host + " on port " + port); + } + + /* build the reader thread */ + Thread thread = new Thread(reader); + thread.setDaemon(true); + thread.start(); + + /* send the initial jabber message */ + sendConnect(); + reader.waitCoreEvent(10000); + if( reader.getXMPPStreamState() != XMPPReader.XMPPStreamState.CONNECT_RECV ) + throw new XMPPException("unable to connect to jabber server"); + + /* send the basic auth message */ + sendBasicAuth(); /* XXX add support for other auth mechanisms */ + reader.waitCoreEvent(10000); + if(!connected()) + throw new XMPPException("Authentication failed"); + } + + /** Sends the initial jabber message */ + private void sendConnect() { + writer.printf(JABBER_CONNECT, host); + reader.setXMPPStreamState(XMPPReader.XMPPStreamState.CONNECT_SENT); + } + + /** Send the basic auth message */ + private void sendBasicAuth() { + writer.printf(JABBER_BASIC_AUTH, username, password, resource); + reader.setXMPPStreamState(XMPPReader.XMPPStreamState.AUTH_SENT); + } + + + /** + * Sends an XMPPMessage. + * @param msg The message to send. + */ + public void send(XMPPMessage msg) throws XMPPException { + checkConnected(); + try { + outStream.write(msg.toXML().getBytes()); + } catch (Exception e) { + throw new XMPPException(e.toString()); + } + } + + + /** + * @throws XMPPException if we are no longer connected. + */ + private void checkConnected() throws XMPPException { + if(!connected()) + throw new XMPPException("Disconnected stream"); + } + + + /** + * Receives messages from the network. + * @param timeout Maximum number of milliseconds to wait for a message to arrive. + * If timeout is negative, this method will wait indefinitely. + * If timeout is 0, this method will not block at all, but will return a + * message if there is already a message available. + */ + public XMPPMessage recv(int timeout) throws XMPPException { + + XMPPMessage msg; + + if(timeout < 0) { + + while(true) { /* wait indefinitely for a message to arrive */ + reader.waitCoreEvent(timeout); + msg = reader.popMessageQueue(); + if( msg != null ) return msg; + checkConnected(); + } + + } else { + + while(timeout >= 0) { /* wait at most 'timeout' milleseconds for a message to arrive */ + timeout -= reader.waitCoreEvent(timeout); + msg = reader.popMessageQueue(); + if( msg != null ) return msg; + checkConnected(); + } + } + + return null; + } +} + diff --git a/src/java/org/opensrf/test/TestXMPP.java b/src/java/org/opensrf/test/TestXMPP.java new file mode 100644 index 0000000..8a34a2e --- /dev/null +++ b/src/java/org/opensrf/test/TestXMPP.java @@ -0,0 +1,57 @@ +package org.opensrf.test; + +import org.opensrf.net.xmpp.XMPPReader; +import org.opensrf.net.xmpp.XMPPMessage; +import org.opensrf.net.xmpp.XMPPSession; + +public class TestXMPP { + + public static void main(String args[]) throws Exception { + + String host; + int port; + String username; + String password; + String resource; + String recipient; + + try { + host = args[0]; + port = Integer.parseInt(args[1]); + username = args[2]; + password = args[3]; + resource = args[4]; + + } catch(ArrayIndexOutOfBoundsException e) { + System.err.println("usage: org.opensrf.test.TestXMPP "); + return; + } + + XMPPSession session = new XMPPSession(host, port); + session.connect(username, password, resource); + + XMPPMessage msg; + + if( args.length == 6 ) { + /** they specified a recipient */ + recipient = args[5]; + msg = new XMPPMessage(); + msg.setTo(recipient); + msg.setThread("test-thread"); + msg.setBody("Hello, from java-xmpp"); + System.out.println("Sending message to " + recipient); + session.send(msg); + } + + while(true) { + System.out.println("waiting for message..."); + msg = session.recv(-1); + System.out.println("got message: " + msg.toXML()); + } + } +} + + + + + -- 2.43.2