1 package org.opensrf.net.xmpp;
4 import java.net.Socket;
6 import java.util.Iterator;
7 import java.util.concurrent.ConcurrentHashMap;
11 * Represents a single XMPP session. Sessions are responsible for writing to
12 * the stream and for managing a stream reader.
14 public class XMPPSession {
16 /** Initial jabber message */
17 public static final String JABBER_CONNECT =
18 "<stream:stream to='%s' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>";
20 /** Basic auth message */
21 public static final String JABBER_BASIC_AUTH =
22 "<iq id='123' type='set'><query xmlns='jabber:iq:auth'>" +
23 "<username>%s</username><password>%s</password><resource>%s</resource></query></iq>";
25 public static final String JABBER_DISCONNECT = "</stream:stream>";
27 private static Map threadConnections = new ConcurrentHashMap();
33 /** jabber username */
34 private String username;
35 /** jabber password */
36 private String password;
37 /** jabber resource */
38 private String resource;
40 /** XMPP stream reader */
42 /** Fprint-capable socket writer */
44 /** Raw socket output stream */
45 OutputStream outStream;
49 /** The process-wide session. All communication occurs
50 * across this single connection */
51 private static XMPPSession globalSession;
55 * Creates a new session.
56 * @param host The jabber domain
57 * @param port The jabber port
59 public XMPPSession( String host, int port ) {
65 * Returns the global, process-wide session
68 public static XMPPSession getGlobalSession() {
73 public static XMPPSession getThreadSession() {
74 return (XMPPSession) threadConnections.get(new Long(Thread.currentThread().getId()));
78 * Sets the given session as the global session for the current thread
79 * @param ses The session
81 public static void setThreadSession(XMPPSession ses) {
82 /* every time we create a new connection, clean up any dead threads.
83 * this is cheaper than cleaning up the dead threads at every access. */
84 cleanupThreadSessions();
85 threadConnections.put(new Long(Thread.currentThread().getId()), ses);
89 * Analyzes the threadSession data to see if there are any sessions
90 * whose controlling thread has gone away.
92 private static void cleanupThreadSessions() {
93 Thread threads[] = new Thread[Thread.activeCount()];
94 Thread.enumerate(threads);
95 for(Iterator i = threadConnections.keySet().iterator(); i.hasNext(); ) {
96 boolean found = false;
97 Long id = (Long) i.next();
98 for(Thread t : threads) {
99 if(t.getId() == id.longValue()) {
105 threadConnections.remove(id);
110 * Sets the global, process-wide session
113 public static void setGlobalSession(XMPPSession ses) {
119 /** true if this session is connected to the server */
120 public boolean connected() {
123 reader.getXMPPStreamState() == XMPPReader.XMPPStreamState.CONNECTED &&
130 * Connects to the network.
131 * @param username The jabber username
132 * @param password The jabber password
133 * @param resource The Jabber resource
135 public void connect(String username, String password, String resource) throws XMPPException {
137 this.username = username;
138 this.password = password;
139 this.resource = resource;
142 /* open the socket and associated streams */
143 socket = new Socket(host, port);
145 /** the session maintains control over the output stream */
146 outStream = socket.getOutputStream();
147 writer = new PrintWriter(outStream, true);
149 /** pass the input stream to the reader */
150 reader = new XMPPReader(socket.getInputStream());
152 } catch(IOException ioe) {
154 XMPPException("unable to communicate with host " + host + " on port " + port);
157 /* build the reader thread */
158 Thread thread = new Thread(reader);
159 thread.setDaemon(true);
162 synchronized(reader) {
163 /* send the initial jabber message */
165 reader.waitCoreEvent(10000);
167 if( reader.getXMPPStreamState() != XMPPReader.XMPPStreamState.CONNECT_RECV )
168 throw new XMPPException("unable to connect to jabber server");
170 synchronized(reader) {
171 /* send the basic auth message */
173 reader.waitCoreEvent(10000);
176 throw new XMPPException("Authentication failed");
179 /** Sends the initial jabber message */
180 private void sendConnect() {
181 reader.setXMPPStreamState(XMPPReader.XMPPStreamState.CONNECT_SENT);
182 writer.printf(JABBER_CONNECT, host);
185 /** Send the basic auth message */
186 private void sendBasicAuth() {
187 reader.setXMPPStreamState(XMPPReader.XMPPStreamState.AUTH_SENT);
188 writer.printf(JABBER_BASIC_AUTH, username, password, resource);
193 * Sends an XMPPMessage.
194 * @param msg The message to send.
196 public synchronized void send(XMPPMessage msg) throws XMPPException {
199 String xml = msg.toXML();
200 outStream.write(xml.getBytes());
201 } catch (Exception e) {
202 throw new XMPPException(e.toString());
208 * @throws XMPPException if we are no longer connected.
210 private void checkConnected() throws XMPPException {
212 throw new XMPPException("Disconnected stream");
217 * Receives messages from the network.
218 * @param timeout Maximum number of milliseconds to wait for a message to arrive.
219 * If timeout is negative, this method will wait indefinitely.
220 * If timeout is 0, this method will not block at all, but will return a
221 * message if there is already a message available.
223 public XMPPMessage recv(long timeout) throws XMPPException {
229 while(true) { /* wait indefinitely for a message to arrive */
230 reader.waitCoreEvent(timeout);
231 msg = reader.popMessageQueue();
232 if( msg != null ) return msg;
238 while(timeout >= 0) { /* wait at most 'timeout' milleseconds for a message to arrive */
239 msg = reader.popMessageQueue();
240 if( msg != null ) return msg;
241 timeout -= reader.waitCoreEvent(timeout);
242 msg = reader.popMessageQueue();
243 if( msg != null ) return msg;
245 if(timeout == 0) break;
249 return reader.popMessageQueue();
254 * Disconnects from the jabber server and closes the socket
256 public void disconnect() {
258 outStream.write(JABBER_DISCONNECT.getBytes());
260 } catch(Exception e) {}