added some more rigorous checking of session connected-ness and some logging
[OpenSRF.git] / src / java / org / opensrf / net / xmpp / XMPPSession.java
1 package org.opensrf.net.xmpp;
2
3 import java.io.*;
4 import java.net.Socket;
5 import java.util.Map;
6 import java.util.Iterator;
7 import java.util.concurrent.ConcurrentHashMap;
8
9
10 /**
11  * Represents a single XMPP session.  Sessions are responsible for writing to
12  * the stream and for managing a stream reader.
13  */
14 public class XMPPSession {
15
16     /** Initial jabber message */
17     public static final String JABBER_CONNECT = 
18         "<stream:stream to='%s' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>";
19
20     /** Basic auth message */
21     public static final String JABBER_BASIC_AUTH =  
22         "<iq id='123' type='set'><query xmlns='jabber:iq:auth'>" +
23         "<username>%s</username><password>%s</password><resource>%s</resource></query></iq>";
24
25     public static final String JABBER_DISCONNECT = "</stream:stream>";
26
27     private static Map threadConnections = new ConcurrentHashMap();
28
29     /** jabber domain */
30     private String host;
31     /** jabber port */
32     private int port;
33     /** jabber username */
34     private String username;
35     /** jabber password */
36     private String password;
37     /** jabber resource */
38     private String resource;
39
40     /** XMPP stream reader */
41     XMPPReader reader;
42     /** Fprint-capable socket writer */
43     PrintWriter writer;
44     /** Raw socket output stream */
45     OutputStream outStream;
46     /** The raw socket */
47     Socket socket;
48
49     /** The process-wide session.  All communication occurs
50      * accross this single connection */
51     private static XMPPSession globalSession;
52
53
54     /**
55      * Creates a new session.
56      * @param host The jabber domain
57      * @param port The jabber port
58      */
59     public XMPPSession( String host, int port ) {
60         this.host = host;
61         this.port = port;
62     }
63
64     /**
65      * Returns the global, process-wide session
66      */
67     /*
68     public static XMPPSession getGlobalSession() {
69         return globalSession;
70     }
71     */
72
73     public static XMPPSession getThreadSession() {
74         return (XMPPSession) threadConnections.get(new Long(Thread.currentThread().getId()));
75     }
76
77     /**
78      * Sets the given session as the global session for the current thread
79      * @param ses The session
80      */
81     public static void setThreadSession(XMPPSession ses) {
82         /* every time we create a new connection, clean up any dead threads. 
83          * this is cheaper than cleaning up the dead threads at every access. */
84         cleanupThreadSessions();
85         threadConnections.put(new Long(Thread.currentThread().getId()), ses);
86     }
87
88     /**
89      * Analyzes the threadSession data to see if there are any sessions
90      * whose controlling thread has gone away.  
91      */
92     private static void cleanupThreadSessions() {
93         Thread threads[] = new Thread[Thread.activeCount()]; 
94         Thread.enumerate(threads);
95         for(Iterator i = threadConnections.keySet().iterator(); i.hasNext(); ) {
96             boolean found = false;
97             Long id = (Long) i.next();
98             for(Thread t : threads) {
99                 if(t.getId() == id.longValue()) {
100                     found = true;
101                     break;
102                 }
103             }
104             if(!found) 
105                 threadConnections.remove(id);
106         }
107     }
108
109     /**
110      * Sets the global, process-wide section
111      */
112     /*
113     public static void setGlobalSession(XMPPSession ses) {
114         globalSession = ses;
115     }
116     */
117
118
119     /** true if this session is connected to the server */
120     public boolean connected() {
121         return (
122                 reader != null && 
123                 reader.getXMPPStreamState() == XMPPReader.XMPPStreamState.CONNECTED &&
124                 !socket.isClosed()
125             );
126     }
127
128
129     /**
130      * Connects to the network.
131      * @param username The jabber username
132      * @param password The jabber password
133      * @param resource The Jabber resource
134      */
135     public void connect(String username, String password, String resource) throws XMPPException {
136
137         this.username = username;
138         this.password = password;
139         this.resource = resource;
140
141         try { 
142             /* open the socket and associated streams */
143             socket = new Socket(host, port);
144
145             /** the session maintains control over the output stream */
146             outStream = socket.getOutputStream();
147             writer = new PrintWriter(outStream, true);
148
149             /** pass the input stream to the reader */
150             reader = new XMPPReader(socket.getInputStream());
151
152         } catch(IOException ioe) {
153             throw new 
154                 XMPPException("unable to communicate with host " + host + " on port " + port);
155         }
156
157         /* build the reader thread */
158         Thread thread = new Thread(reader);
159         thread.setDaemon(true);
160         thread.start();
161
162         synchronized(reader) {
163             /* send the initial jabber message */
164             sendConnect();
165             reader.waitCoreEvent(10000);
166         }
167         if( reader.getXMPPStreamState() != XMPPReader.XMPPStreamState.CONNECT_RECV ) 
168             throw new XMPPException("unable to connect to jabber server");
169
170         synchronized(reader) {
171             /* send the basic auth message */
172             sendBasicAuth(); 
173             reader.waitCoreEvent(10000);
174         }
175         if(!connected()) 
176             throw new XMPPException("Authentication failed");
177     }
178
179     /** Sends the initial jabber message */
180     private void sendConnect() {
181         reader.setXMPPStreamState(XMPPReader.XMPPStreamState.CONNECT_SENT);
182         writer.printf(JABBER_CONNECT, host);
183     }
184
185     /** Send the basic auth message */
186     private void sendBasicAuth() {
187         reader.setXMPPStreamState(XMPPReader.XMPPStreamState.AUTH_SENT);
188         writer.printf(JABBER_BASIC_AUTH, username, password, resource);
189     }
190
191
192     /**
193      * Sends an XMPPMessage.
194      * @param msg The message to send.
195      */
196     public synchronized void send(XMPPMessage msg) throws XMPPException {
197         checkConnected();
198         try {
199             String xml = msg.toXML();
200             outStream.write(xml.getBytes()); 
201         } catch (Exception e) {
202             throw new XMPPException(e.toString());
203         }
204     }
205
206
207     /**
208      * @throws XMPPException if we are no longer connected.
209      */
210     private void checkConnected() throws XMPPException {
211         if(!connected())
212             throw new XMPPException("Disconnected stream");
213     }
214
215
216     /**
217      * Receives messages from the network.  
218      * @param timeout Maximum number of milliseconds to wait for a message to arrive.
219      * If timeout is negative, this method will wait indefinitely.
220      * If timeout is 0, this method will not block at all, but will return a 
221      * message if there is already a message available.
222      */
223     public XMPPMessage recv(long timeout) throws XMPPException {
224
225         XMPPMessage msg;
226
227         if(timeout < 0) {
228
229             while(true) { /* wait indefinitely for a message to arrive */
230                 reader.waitCoreEvent(timeout);
231                 msg = reader.popMessageQueue();
232                 if( msg != null ) return msg;
233                 checkConnected();
234             }
235
236         } else {
237
238             while(timeout >= 0) { /* wait at most 'timeout' milleseconds for a message to arrive */
239                 msg = reader.popMessageQueue();
240                 if( msg != null ) return msg;
241                 timeout -= reader.waitCoreEvent(timeout);
242                 msg = reader.popMessageQueue();
243                 if( msg != null ) return msg;
244                 checkConnected();
245             }
246         }
247
248         return reader.popMessageQueue();
249     }
250
251
252     /**
253      * Disconnects from the jabber server and closes the socket
254      */
255     public void disconnect() {
256         try {
257             outStream.write(JABBER_DISCONNECT.getBytes());
258             socket.close();
259         } catch(Exception e) {}
260     }
261 }
262