# GNU General Public License for more details.
# -----------------------------------------------------------------------
-import osrf.json
-import osrf.conf
-import osrf.log
-import osrf.net
-import osrf.net_obj
+import osrf.json, osrf.conf, osrf.log, osrf.net, osrf.net_obj, osrf.const
from osrf.const import OSRF_APP_SESSION_CONNECTED, \
OSRF_APP_SESSION_CONNECTING, OSRF_APP_SESSION_DISCONNECTED, \
OSRF_MESSAGE_TYPE_CONNECT, OSRF_MESSAGE_TYPE_DISCONNECT, \
- OSRF_MESSAGE_TYPE_REQUEST
+ OSRF_MESSAGE_TYPE_REQUEST, OSRF_MESSAGE_TYPE_RESULT, OSRF_MESSAGE_TYPE_STATUS
import osrf.ex
import random, os, time, threading
self.state = OSRF_APP_SESSION_DISCONNECTED
self.remote_id = None
self.locale = None
+ self.thread = None
+ self.service = None
- def find_session(threadTrace):
- return Session.session_cache.get(threadTrace)
- find_session = staticmethod(find_session)
+ @staticmethod
+ def find_or_create(thread):
+ if thread in Session.session_cache:
+ return Session.session_cache[thread]
+ return ServerSession(thread)
+
+ def set_remote_id(self, remoteid):
+ self.remote_id = remoteid
+ osrf.log.log_internal("Setting request remote ID to %s" % self.remote_id)
def wait(self, timeout=120):
"""Wait up to <timeout> seconds for data to arrive on the network"""
handle = osrf.net.get_network_handle()
handle.recv(timeout)
- def send(self, omessage):
+ def send(self, omessages):
"""Sends an OpenSRF message"""
+ if not isinstance(omessages, list):
+ omessages = [omessages]
+
net_msg = osrf.net.NetworkMessage(
recipient = self.remote_id,
- body = osrf.json.to_json([omessage]),
+ body = osrf.json.to_json(omessages),
thread = self.thread,
locale = self.locale,
)
# call superclass constructor
Session.__init__(self)
- # the remote service we want to make requests of
+ # the service we are sending requests to
self.service = service
# the locale we want requests to be returned in
self.reset_remote_id()
osrf.log.log_debug("Sending request %s -> %s " % (self.service, method))
- req = Request(self, self.next_id, method, arr, self.locale)
+ req = ClientRequest(self, self.next_id, method, arr, self.locale)
self.requests[str(self.next_id)] = req
self.next_id += 1
req.send()
self.state = OSRF_APP_SESSION_DISCONNECTED
- def set_remote_id(self, remoteid):
- self.remote_id = remoteid
- osrf.log.log_internal("Setting request remote ID to %s" % self.remote_id)
def reset_remote_id(self):
"""Recovers the original remote id"""
class Request(object):
- """Represents a single OpenSRF request.
- A request is made and any resulting respones are
- collected for the client."""
-
def __init__(self, session, rid, method=None, params=[], locale='en-US'):
-
self.session = session # my session handle
self.rid = rid # my unique request ID
self.method = method # method name
self.params = params # my method params
+ self.locale = locale
+ self.complete = False # is this request done?
+ self.complete_time = 0 # time at which the request was completed
+
+
+class ClientRequest(Request):
+ """Represents a single OpenSRF request.
+ A request is made and any resulting respones are
+ collected for the client."""
+
+ def __init__(self, session, rid, method=None, params=[], locale='en-US'):
+ Request.__init__(self, session, rid, method, params, locale)
self.queue = [] # response queue
self.reset_timeout = False # resets the recv timeout?
- self.complete = False # has the server told us this request is done?
self.send_time = 0 # local time the request was put on the wire
- self.complete_time = 0 # time the server told us the request was completed
self.first_response_time = 0 # time it took for our first reponse to be received
- self.locale = locale
def send(self):
"""Sends a request message"""
orig_timeout = timeout
while not self.complete and (timeout >= 0 or orig_timeout < 0) and len(self.queue) == 0:
+
s = time.time()
self.session.wait(timeout)
- if orig_timeout >= 0:
- timeout -= time.time() - s
+
if self.reset_timeout:
self.reset_timeout = False
timeout = orig_timeout
+ elif orig_timeout >= 0:
+ timeout -= time.time() - s
+
now = time.time()
# -----------------------------------------------------------------
class ServerSession(Session):
"""Implements a server-side session"""
- pass
+ def __init__(self, thread):
+ Session.__init__(self)
+ self.thread = thread
+
+ def send_status(self, thread_trace, payload):
+ self.send(
+ osrf.net_obj.NetworkObject.osrfMessage(
+ { 'threadTrace' : thread_trace,
+ 'type' : osrf.const.OSRF_MESSAGE_TYPE_STATUS,
+ 'payload' : payload,
+ 'locale' : self.locale
+ }
+ )
+ )
+
+ def send_connect_ok(self, thread_trace):
+ status_msg = osrf.net_obj.NetworkObject.osrfConnectStatus({
+ 'status' : 'Connection Successful',
+ 'statusCode': osrf.const.OSRF_STATUS_OK
+ })
+ self.send_status(thread_trace, status_msg)
+class ServerRequest(Request):
+ def __init__(self, session, rid, method, params=[]):
+ Request.__init__(self, session, rid, method, params, session.locale)
+ self.response_list = []
+
+ def _build_response_msg(self, data):
+ result = osrf.net_obj.NetworkObject.osrfResult({
+ 'content' : data,
+ 'statusCode' : osrf.const.OSRF_STATUS_OK,
+ 'status' : 'OK'
+ })
+
+ return osrf.net_obj.NetworkObject.osrfMessage({
+ 'threadTrace' : self.rid,
+ 'type' : OSRF_MESSAGE_TYPE_RESULT,
+ 'payload' : result,
+ 'locale' : self.locale
+ })
+
+ def _build_complete_msg(self):
+
+ status = osrf.net_obj.NetworkObject.osrfConnectStatus({
+ 'threadTrace' : self.rid,
+ 'status' : 'Request Complete',
+ 'statusCode': osrf.const.OSRF_STATUS_COMPLETE
+ })
+
+ return osrf.net_obj.NetworkObject.osrfMessage({
+ 'threadTrace' : self.rid,
+ 'type' : OSRF_MESSAGE_TYPE_STATUS,
+ 'payload' : status,
+ 'locale' : self.locale
+ })
+
+ def respond(self, data):
+ ''' For non-atomic calls, this sends a response directly back
+ to the client. For atomic calls, this pushes the response
+ onto the response list '''
+ osrf.log.log_internal("responding with %s" % str(data))
+ if self.method.atomic:
+ self.response_list.append(data)
+ else:
+ self.session.send(self._build_response_msg(data))
+
+ def respond_complete(self, data):
+ ''' Sends a complete message accompanied by the final result if applicable '''
+
+ if self.complete:
+ return
+ self.copmlete = True
+ self.complete_time = time.time()
+
+ if self.method.atomic:
+ if data is not None:
+ self.response_list.append(data)
+ self.session.send([
+ self._build_response_msg(self.response_list),
+ self._build_complete_msg(),
+ ])
+
+ elif data is not None:
+ self.session.send([
+ self._build_response_msg(data),
+ self._build_complete_msg(),
+ ])
+
+ else:
+ self.session.send(self._build_complete_msg())
+
class MultiSession(object):
''' Manages multiple requests. With the current implementation, a 1 second
cont.id = len(self.reqs)
self.reqs.append(cont)
- def recv(self, timeout=10):
+ def recv(self, timeout=120):
''' Returns a tuple of req_id, response '''
duration = 0
block_time = 1