From 503d4576971a338bbc2524e19cd63bc8233031ca Mon Sep 17 00:00:00 2001 From: erickson Date: Fri, 17 Aug 2007 21:57:16 +0000 Subject: [PATCH] added multi-threaded client support to the opensrf network/xmpp layer this is all managed below the covers so that clients can continue to safely use bootstrapClient and will all "just work" we now allow 1 xmpp connection per thread, as opposed to 1 per process. added a test module git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@1079 9efc2488-bf62-4759-914b-345cdb29e865 --- src/java/org/opensrf/ClientSession.java | 3 +- src/java/org/opensrf/Request.java | 11 ++- src/java/org/opensrf/Session.java | 5 +- src/java/org/opensrf/Sys.java | 26 ++++++- .../org/opensrf/net/xmpp/XMPPSession.java | 49 ++++++++++++- src/java/org/opensrf/test/TestThread.java | 68 +++++++++++++++++++ 6 files changed, 151 insertions(+), 11 deletions(-) create mode 100644 src/java/org/opensrf/test/TestThread.java diff --git a/src/java/org/opensrf/ClientSession.java b/src/java/org/opensrf/ClientSession.java index cc68505..e36ef07 100644 --- a/src/java/org/opensrf/ClientSession.java +++ b/src/java/org/opensrf/ClientSession.java @@ -51,7 +51,7 @@ public class ClientSession extends Session { /** create a random thread */ long time = new Date().getTime(); Random rand = new Random(time); - setThread(rand.nextInt()+""+rand.nextInt()+""+time); + setThread(rand.nextInt()+""+rand.nextInt()+""+time+Thread.currentThread().getId()); nextId = 0; requests = new HashMap(); @@ -115,6 +115,7 @@ public class ClientSession extends Session { Request req = findRequest(msg.getId()); if(req == null) { /** LOG that we've received a result to a non-existant request */ + System.err.println(msg.getId() +" has no corresponding request"); return; } OSRFObject payload = (OSRFObject) msg.get("payload"); diff --git a/src/java/org/opensrf/Request.java b/src/java/org/opensrf/Request.java index e5d0ef6..8fe537f 100644 --- a/src/java/org/opensrf/Request.java +++ b/src/java/org/opensrf/Request.java @@ -73,11 +73,16 @@ public class Request { Result result = null; + if((result = resultQueue.poll()) != null) + return result; + if(millis < 0 && !complete) { /** wait potentially forever for a result to arrive */ - session.waitForMessage(millis); - if((result = resultQueue.poll()) != null) - return result; + while(!complete) { + session.waitForMessage(millis); + if((result = resultQueue.poll()) != null) + return result; + } } else { diff --git a/src/java/org/opensrf/Session.java b/src/java/org/opensrf/Session.java index e2b41c3..f5045b3 100644 --- a/src/java/org/opensrf/Session.java +++ b/src/java/org/opensrf/Session.java @@ -45,10 +45,9 @@ public abstract class Session { xmsg.setTo(remoteNode); xmsg.setThread(thread); xmsg.setBody(new JSONWriter(Arrays.asList(new Message[] {omsg})).write()); - XMPPSession ses = XMPPSession.getGlobalSession(); try { - XMPPSession.getGlobalSession().send(xmsg); + XMPPSession.getThreadSession().send(xmsg); } catch(XMPPException e) { connectState = ConnectState.DISCONNECTED; throw new SessionException("Error sending message to " + remoteNode, e); @@ -63,7 +62,7 @@ public abstract class Session { public static void waitForMessage(long millis) throws SessionException, MethodException { try { Stack.processXMPPMessage( - XMPPSession.getGlobalSession().recv(millis)); + XMPPSession.getThreadSession().recv(millis)); } catch(XMPPException e) { throw new SessionException("Error waiting for message", e); } diff --git a/src/java/org/opensrf/Sys.java b/src/java/org/opensrf/Sys.java index 8520838..85fb118 100644 --- a/src/java/org/opensrf/Sys.java +++ b/src/java/org/opensrf/Sys.java @@ -2,6 +2,9 @@ package org.opensrf; import org.opensrf.util.*; import org.opensrf.net.xmpp.*; +import java.util.Random; +import java.util.Date; +import java.net.InetAddress; public class Sys { @@ -16,6 +19,10 @@ public class Sys { public static void bootstrapClient(String configFile, String configContext) throws ConfigException, SessionException { + /** see if the current thread already has a connection */ + if(XMPPSession.getThreadSession() != null) + return; + /** create the config parser */ Config config = new Config(configContext); config.parse(configFile); @@ -27,11 +34,24 @@ public class Sys { String host = (String) config.getFirst("/domains/domain"); int port = config.getInt("/port"); + + /** Create a random login resource string */ + String res = "java_"; try { + res += InetAddress.getLocalHost().getHostAddress(); + } catch(java.net.UnknownHostException e) {} + res += "_"+Math.abs(new Random(new Date().getTime()).nextInt()) + + "_t"+ Thread.currentThread().getId(); + + + try { + /** Connect to the Jabber network */ XMPPSession xses = new XMPPSession(host, port); - xses.connect(username, passwd, "test-java"); /* XXX */ - XMPPSession.setGlobalSession(xses); + System.out.println("resource = " + res); + xses.connect(username, passwd, res); + XMPPSession.setThreadSession(xses); + } catch(XMPPException e) { throw new SessionException("Unable to bootstrap client", e); } @@ -41,7 +61,7 @@ public class Sys { * Shuts down the connection to the opensrf network */ public static void shutdown() { - XMPPSession.getGlobalSession().disconnect(); + XMPPSession.getThreadSession().disconnect(); } } diff --git a/src/java/org/opensrf/net/xmpp/XMPPSession.java b/src/java/org/opensrf/net/xmpp/XMPPSession.java index 11e0c7d..cc2a9f6 100644 --- a/src/java/org/opensrf/net/xmpp/XMPPSession.java +++ b/src/java/org/opensrf/net/xmpp/XMPPSession.java @@ -2,6 +2,9 @@ package org.opensrf.net.xmpp; import java.io.*; import java.net.Socket; +import java.util.Map; +import java.util.Iterator; +import java.util.concurrent.ConcurrentHashMap; /** @@ -21,6 +24,8 @@ public class XMPPSession { public static final String JABBER_DISCONNECT = ""; + private static Map threadConnections = new ConcurrentHashMap(); + /** jabber domain */ private String host; /** jabber port */ @@ -59,16 +64,56 @@ public class XMPPSession { /** * Returns the global, process-wide session */ + /* public static XMPPSession getGlobalSession() { return globalSession; } + */ + + public static XMPPSession getThreadSession() { + return (XMPPSession) threadConnections.get(new Long(Thread.currentThread().getId())); + } + + /** + * Sets the given session as the global session for the current thread + * @param ses The session + */ + public static void setThreadSession(XMPPSession ses) { + /* every time we create a new connection, clean up any dead threads. + * this is cheaper than cleaning up the dead threads at every access. */ + cleanupThreadSessions(); + threadConnections.put(new Long(Thread.currentThread().getId()), ses); + } + + /** + * Analyzes the threadSession data to see if there are any sessions + * whose controlling thread has gone away. + */ + private static void cleanupThreadSessions() { + Thread threads[] = new Thread[Thread.activeCount()]; + Thread.enumerate(threads); + for(Iterator i = threadConnections.keySet().iterator(); i.hasNext(); ) { + boolean found = false; + Long id = (Long) i.next(); + for(Thread t : threads) { + if(t.getId() == id.longValue()) { + found = true; + break; + } + } + if(!found) + threadConnections.remove(id); + } + } /** * Sets the global, process-wide section */ + /* public static void setGlobalSession(XMPPSession ses) { globalSession = ses; } + */ /** true if this session is connected to the server */ @@ -190,6 +235,8 @@ public class XMPPSession { } else { while(timeout >= 0) { /* wait at most 'timeout' milleseconds for a message to arrive */ + msg = reader.popMessageQueue(); + if( msg != null ) return msg; timeout -= reader.waitCoreEvent(timeout); msg = reader.popMessageQueue(); if( msg != null ) return msg; @@ -197,7 +244,7 @@ public class XMPPSession { } } - return null; + return reader.popMessageQueue(); } diff --git a/src/java/org/opensrf/test/TestThread.java b/src/java/org/opensrf/test/TestThread.java new file mode 100644 index 0000000..bb4cf06 --- /dev/null +++ b/src/java/org/opensrf/test/TestThread.java @@ -0,0 +1,68 @@ +package org.opensrf.test; +import org.opensrf.*; +import org.opensrf.util.*; +import java.util.Map; +import java.util.Date; +import java.util.List; +import java.util.ArrayList; +import java.io.PrintStream; + +/** + * Connects to the opensrf network once per thread and runs + * and runs a series of request acccross all launched threads. + * The purpose is to verify that the java threaded client api + * is functioning as expected + */ +public class TestThread implements Runnable { + + String args[]; + + public TestThread(String args[]) { + this.args = args; + } + + public void run() { + + try { + + Sys.bootstrapClient(args[0], "/config/opensrf"); + ClientSession session = new ClientSession(args[3]); + + List params = new ArrayList(); + for(int i = 5; i < args.length; i++) + params.add(new JSONReader(args[3]).read()); + + for(int i = 0; i < Integer.parseInt(args[2]); i++) { + System.out.println("thread " + Thread.currentThread().getId()+" sending request " + i); + Request request = session.request(args[4], params); + Result result = request.recv(3000); + if(result != null) { + System.out.println("thread " + Thread.currentThread().getId()+ + " got result JSON: " + new JSONWriter(result.getContent()).write()); + } else { + System.out.println("* thread " + Thread.currentThread().getId()+ " got NO result"); + } + } + + Sys.shutdown(); + } catch(Exception e) { + System.err.println(e); + } + } + + public static void main(String args[]) throws Exception { + + if(args.length < 5) { + System.out.println( "usage: org.opensrf.test.TestClient "+ + " [, ]"); + return; + } + + int numThreads = Integer.parseInt(args[1]); + for(int i = 0; i < numThreads; i++) + new Thread(new TestThread(args)).start(); + } +} + + + -- 2.43.2