implemented the majority of server-side python. still need to add settings server...
[OpenSRF.git] / src / python / osrf / ses.py
index 011b143..60c8b27 100644 (file)
 # GNU General Public License for more details.
 # -----------------------------------------------------------------------
 
-import osrf.json
-import osrf.conf
-import osrf.log
-import osrf.net
-import osrf.net_obj
+import osrf.json, osrf.conf, osrf.log, osrf.net, osrf.net_obj, osrf.const
 from osrf.const import OSRF_APP_SESSION_CONNECTED, \
     OSRF_APP_SESSION_CONNECTING, OSRF_APP_SESSION_DISCONNECTED, \
     OSRF_MESSAGE_TYPE_CONNECT, OSRF_MESSAGE_TYPE_DISCONNECT, \
-    OSRF_MESSAGE_TYPE_REQUEST
+    OSRF_MESSAGE_TYPE_REQUEST, OSRF_MESSAGE_TYPE_RESULT, OSRF_MESSAGE_TYPE_STATUS
 import osrf.ex
 import random, os, time, threading
 
@@ -47,10 +43,18 @@ class Session(object):
         self.state = OSRF_APP_SESSION_DISCONNECTED
         self.remote_id = None
         self.locale = None
+        self.thread = None
+        self.service = None
 
-    def find_session(threadTrace):
-        return Session.session_cache.get(threadTrace)
-    find_session = staticmethod(find_session)
+    @staticmethod
+    def find_or_create(thread):
+        if thread in Session.session_cache:
+            return Session.session_cache[thread]
+        return ServerSession(thread)
+
+    def set_remote_id(self, remoteid):
+        self.remote_id = remoteid
+        osrf.log.log_internal("Setting request remote ID to %s" % self.remote_id)
 
     def wait(self, timeout=120):
         """Wait up to <timeout> seconds for data to arrive on the network"""
@@ -58,11 +62,14 @@ class Session(object):
         handle = osrf.net.get_network_handle()
         handle.recv(timeout)
 
-    def send(self, omessage):
+    def send(self, omessages):
         """Sends an OpenSRF message"""
+        if not isinstance(omessages, list):
+            omessages = [omessages]
+            
         net_msg = osrf.net.NetworkMessage(
             recipient      = self.remote_id,
-            body    = osrf.json.to_json([omessage]),
+            body    = osrf.json.to_json(omessages),
             thread = self.thread,
             locale = self.locale,
         )
@@ -82,7 +89,7 @@ class ClientSession(Session):
         # call superclass constructor
         Session.__init__(self)
 
-        # the remote service we want to make requests of
+        # the service we are sending requests to
         self.service = service
 
         # the locale we want requests to be returned in
@@ -128,7 +135,7 @@ class ClientSession(Session):
             self.reset_remote_id()
 
         osrf.log.log_debug("Sending request %s -> %s " % (self.service, method))
-        req = Request(self, self.next_id, method, arr, self.locale)
+        req = ClientRequest(self, self.next_id, method, arr, self.locale)
         self.requests[str(self.next_id)] = req
         self.next_id += 1
         req.send()
@@ -178,9 +185,6 @@ class ClientSession(Session):
         self.state = OSRF_APP_SESSION_DISCONNECTED
 
     
-    def set_remote_id(self, remoteid):
-        self.remote_id = remoteid
-        osrf.log.log_internal("Setting request remote ID to %s" % self.remote_id)
 
     def reset_remote_id(self):
         """Recovers the original remote id"""
@@ -220,23 +224,27 @@ class ClientSession(Session):
 
 
 class Request(object):
-    """Represents a single OpenSRF request.
-        A request is made and any resulting respones are 
-        collected for the client."""
-
     def __init__(self, session, rid, method=None, params=[], locale='en-US'):
-
         self.session = session # my session handle
         self.rid     = rid # my unique request ID
         self.method = method # method name
         self.params = params # my method params
+        self.locale = locale
+        self.complete = False # is this request done?
+        self.complete_time =  0 # time at which the request was completed
+
+
+class ClientRequest(Request):
+    """Represents a single OpenSRF request.
+        A request is made and any resulting respones are 
+        collected for the client."""
+
+    def __init__(self, session, rid, method=None, params=[], locale='en-US'):
+        Request.__init__(self, session, rid, method, params, locale)
         self.queue  = [] # response queue
         self.reset_timeout = False # resets the recv timeout?
-        self.complete = False # has the server told us this request is done?
         self.send_time = 0 # local time the request was put on the wire
-        self.complete_time =  0 # time the server told us the request was completed
         self.first_response_time = 0 # time it took for our first reponse to be received
-        self.locale = locale
 
     def send(self):
         """Sends a request message"""
@@ -268,14 +276,17 @@ class Request(object):
 
         orig_timeout = timeout
         while not self.complete and (timeout >= 0 or orig_timeout < 0) and len(self.queue) == 0:
+
             s = time.time()
             self.session.wait(timeout)
-            if orig_timeout >= 0:
-                timeout -= time.time() - s
+
             if self.reset_timeout:
                 self.reset_timeout = False
                 timeout = orig_timeout
 
+            elif orig_timeout >= 0:
+                timeout -= time.time() - s
+
         now = time.time()
 
         # -----------------------------------------------------------------
@@ -317,11 +328,100 @@ class Request(object):
 
 class ServerSession(Session):
     """Implements a server-side session"""
-    pass
 
+    def __init__(self, thread):
+        Session.__init__(self)
+        self.thread = thread
+
+    def send_status(self, thread_trace, payload):
+        self.send(
+            osrf.net_obj.NetworkObject.osrfMessage( 
+                {   'threadTrace' : thread_trace,
+                    'type' : osrf.const.OSRF_MESSAGE_TYPE_STATUS,
+                    'payload' : payload,
+                    'locale' : self.locale
+                } 
+            )
+        )
+
+    def send_connect_ok(self, thread_trace):
+        status_msg = osrf.net_obj.NetworkObject.osrfConnectStatus({   
+            'status' : 'Connection Successful',
+            'statusCode': osrf.const.OSRF_STATUS_OK
+        })
+        self.send_status(thread_trace, status_msg)
 
 
+class ServerRequest(Request):
 
+    def __init__(self, session, rid, method, params=[]):
+        Request.__init__(self, session, rid, method, params, session.locale)
+        self.response_list = []
+
+    def _build_response_msg(self, data):
+        result = osrf.net_obj.NetworkObject.osrfResult({
+            'content' :  data,
+            'statusCode' : osrf.const.OSRF_STATUS_OK,
+            'status' : 'OK'
+        })
+
+        return osrf.net_obj.NetworkObject.osrfMessage({
+            'threadTrace' : self.rid,
+            'type' : OSRF_MESSAGE_TYPE_RESULT,
+            'payload' : result,
+            'locale' : self.locale
+        })
+
+    def _build_complete_msg(self):
+
+        status = osrf.net_obj.NetworkObject.osrfConnectStatus({   
+            'threadTrace' : self.rid,
+            'status' : 'Request Complete',
+            'statusCode': osrf.const.OSRF_STATUS_COMPLETE
+        })
+
+        return osrf.net_obj.NetworkObject.osrfMessage({
+            'threadTrace' : self.rid,
+            'type' : OSRF_MESSAGE_TYPE_STATUS,
+            'payload' : status,
+            'locale' : self.locale
+        })
+
+    def respond(self, data):
+        ''' For non-atomic calls, this sends a response directly back
+            to the client.  For atomic calls, this pushes the response
+            onto the response list '''
+        osrf.log.log_internal("responding with %s" % str(data))
+        if self.method.atomic:
+            self.response_list.append(data)
+        else:
+            self.session.send(self._build_response_msg(data))
+
+    def respond_complete(self, data):
+        ''' Sends a complete message accompanied by the final result if applicable '''
+
+        if self.complete: 
+            return
+        self.copmlete = True
+        self.complete_time = time.time()
+
+        if self.method.atomic:
+            if data is not None:
+                self.response_list.append(data) 
+            self.session.send([
+                self._build_response_msg(self.response_list),
+                self._build_complete_msg(),
+            ])
+
+        elif data is not None:
+            self.session.send([
+                self._build_response_msg(data),
+                self._build_complete_msg(),
+            ])
+
+        else:
+            self.session.send(self._build_complete_msg())
+            
 
 class MultiSession(object):
     ''' Manages multiple requests.  With the current implementation, a 1 second 
@@ -347,7 +447,7 @@ class MultiSession(object):
         cont.id = len(self.reqs)
         self.reqs.append(cont)
 
-    def recv(self, timeout=10):
+    def recv(self, timeout=120):
         ''' Returns a tuple of req_id, response '''
         duration = 0
         block_time = 1