]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/java/org/opensrf/net/xmpp/XMPPSession.java
added multi-threaded client support to the opensrf network/xmpp layer
[OpenSRF.git] / src / java / org / opensrf / net / xmpp / XMPPSession.java
1 package org.opensrf.net.xmpp;
2
3 import java.io.*;
4 import java.net.Socket;
5 import java.util.Map;
6 import java.util.Iterator;
7 import java.util.concurrent.ConcurrentHashMap;
8
9
10 /**
11  * Represents a single XMPP session.  Sessions are responsible for writing to
12  * the stream and for managing a stream reader.
13  */
14 public class XMPPSession {
15
16     /** Initial jabber message */
17     public static final String JABBER_CONNECT = 
18         "<stream:stream to='%s' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>";
19
20     /** Basic auth message */
21     public static final String JABBER_BASIC_AUTH =  
22         "<iq id='123' type='set'><query xmlns='jabber:iq:auth'>" +
23         "<username>%s</username><password>%s</password><resource>%s</resource></query></iq>";
24
25     public static final String JABBER_DISCONNECT = "</stream:stream>";
26
27     private static Map threadConnections = new ConcurrentHashMap();
28
29     /** jabber domain */
30     private String host;
31     /** jabber port */
32     private int port;
33     /** jabber username */
34     private String username;
35     /** jabber password */
36     private String password;
37     /** jabber resource */
38     private String resource;
39
40     /** XMPP stream reader */
41     XMPPReader reader;
42     /** Fprint-capable socket writer */
43     PrintWriter writer;
44     /** Raw socket output stream */
45     OutputStream outStream;
46     /** The raw socket */
47     Socket socket;
48
49     /** The process-wide session.  All communication occurs
50      * accross this single connection */
51     private static XMPPSession globalSession;
52
53
54     /**
55      * Creates a new session.
56      * @param host The jabber domain
57      * @param port The jabber port
58      */
59     public XMPPSession( String host, int port ) {
60         this.host = host;
61         this.port = port;
62     }
63
64     /**
65      * Returns the global, process-wide session
66      */
67     /*
68     public static XMPPSession getGlobalSession() {
69         return globalSession;
70     }
71     */
72
73     public static XMPPSession getThreadSession() {
74         return (XMPPSession) threadConnections.get(new Long(Thread.currentThread().getId()));
75     }
76
77     /**
78      * Sets the given session as the global session for the current thread
79      * @param ses The session
80      */
81     public static void setThreadSession(XMPPSession ses) {
82         /* every time we create a new connection, clean up any dead threads. 
83          * this is cheaper than cleaning up the dead threads at every access. */
84         cleanupThreadSessions();
85         threadConnections.put(new Long(Thread.currentThread().getId()), ses);
86     }
87
88     /**
89      * Analyzes the threadSession data to see if there are any sessions
90      * whose controlling thread has gone away.  
91      */
92     private static void cleanupThreadSessions() {
93         Thread threads[] = new Thread[Thread.activeCount()]; 
94         Thread.enumerate(threads);
95         for(Iterator i = threadConnections.keySet().iterator(); i.hasNext(); ) {
96             boolean found = false;
97             Long id = (Long) i.next();
98             for(Thread t : threads) {
99                 if(t.getId() == id.longValue()) {
100                     found = true;
101                     break;
102                 }
103             }
104             if(!found) 
105                 threadConnections.remove(id);
106         }
107     }
108
109     /**
110      * Sets the global, process-wide section
111      */
112     /*
113     public static void setGlobalSession(XMPPSession ses) {
114         globalSession = ses;
115     }
116     */
117
118
119     /** true if this session is connected to the server */
120     public boolean connected() {
121         return (
122             reader != null && 
123             reader.getXMPPStreamState() == 
124                 XMPPReader.XMPPStreamState.CONNECTED);
125     }
126
127
128     /**
129      * Connects to the network.
130      * @param username The jabber username
131      * @param password The jabber password
132      * @param resource The Jabber resource
133      */
134     public void connect(String username, String password, String resource) throws XMPPException {
135
136         this.username = username;
137         this.password = password;
138         this.resource = resource;
139
140         try { 
141             /* open the socket and associated streams */
142             socket = new Socket(host, port);
143
144             /** the session maintains control over the output stream */
145             outStream = socket.getOutputStream();
146             writer = new PrintWriter(outStream, true);
147
148             /** pass the input stream to the reader */
149             reader = new XMPPReader(socket.getInputStream());
150
151         } catch(IOException ioe) {
152             throw new 
153                 XMPPException("unable to communicate with host " + host + " on port " + port);
154         }
155
156         /* build the reader thread */
157         Thread thread = new Thread(reader);
158         thread.setDaemon(true);
159         thread.start();
160
161         synchronized(reader) {
162             /* send the initial jabber message */
163             sendConnect();
164             reader.waitCoreEvent(10000);
165         }
166         if( reader.getXMPPStreamState() != XMPPReader.XMPPStreamState.CONNECT_RECV ) 
167             throw new XMPPException("unable to connect to jabber server");
168
169         synchronized(reader) {
170             /* send the basic auth message */
171             sendBasicAuth(); 
172             reader.waitCoreEvent(10000);
173         }
174         if(!connected()) 
175             throw new XMPPException("Authentication failed");
176     }
177
178     /** Sends the initial jabber message */
179     private void sendConnect() {
180         reader.setXMPPStreamState(XMPPReader.XMPPStreamState.CONNECT_SENT);
181         writer.printf(JABBER_CONNECT, host);
182     }
183
184     /** Send the basic auth message */
185     private void sendBasicAuth() {
186         reader.setXMPPStreamState(XMPPReader.XMPPStreamState.AUTH_SENT);
187         writer.printf(JABBER_BASIC_AUTH, username, password, resource);
188     }
189
190
191     /**
192      * Sends an XMPPMessage.
193      * @param msg The message to send.
194      */
195     public synchronized void send(XMPPMessage msg) throws XMPPException {
196         checkConnected();
197         try {
198             String xml = msg.toXML();
199             outStream.write(xml.getBytes()); 
200         } catch (Exception e) {
201             throw new XMPPException(e.toString());
202         }
203     }
204
205
206     /**
207      * @throws XMPPException if we are no longer connected.
208      */
209     private void checkConnected() throws XMPPException {
210         if(!connected())
211             throw new XMPPException("Disconnected stream");
212     }
213
214
215     /**
216      * Receives messages from the network.  
217      * @param timeout Maximum number of milliseconds to wait for a message to arrive.
218      * If timeout is negative, this method will wait indefinitely.
219      * If timeout is 0, this method will not block at all, but will return a 
220      * message if there is already a message available.
221      */
222     public XMPPMessage recv(long timeout) throws XMPPException {
223
224         XMPPMessage msg;
225
226         if(timeout < 0) {
227
228             while(true) { /* wait indefinitely for a message to arrive */
229                 reader.waitCoreEvent(timeout);
230                 msg = reader.popMessageQueue();
231                 if( msg != null ) return msg;
232                 checkConnected();
233             }
234
235         } else {
236
237             while(timeout >= 0) { /* wait at most 'timeout' milleseconds for a message to arrive */
238                 msg = reader.popMessageQueue();
239                 if( msg != null ) return msg;
240                 timeout -= reader.waitCoreEvent(timeout);
241                 msg = reader.popMessageQueue();
242                 if( msg != null ) return msg;
243                 checkConnected();
244             }
245         }
246
247         return reader.popMessageQueue();
248     }
249
250
251     /**
252      * Disconnects from the jabber server and closes the socket
253      */
254     public void disconnect() {
255         try {
256             outStream.write(JABBER_DISCONNECT.getBytes());
257             socket.close();
258         } catch(Exception e) {}
259     }
260 }
261