1 # -----------------------------------------------------------------------
2 # Copyright (C) 2007 Georgia Public Library Service
3 # Bill Erickson <billserickson@gmail.com>
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 # -----------------------------------------------------------------------
16 import osrf.json, osrf.conf, osrf.log, osrf.net, osrf.net_obj, osrf.const
17 from osrf.const import OSRF_APP_SESSION_CONNECTED, \
18 OSRF_APP_SESSION_CONNECTING, OSRF_APP_SESSION_DISCONNECTED, \
19 OSRF_MESSAGE_TYPE_CONNECT, OSRF_MESSAGE_TYPE_DISCONNECT, \
20 OSRF_MESSAGE_TYPE_REQUEST, OSRF_MESSAGE_TYPE_RESULT, OSRF_MESSAGE_TYPE_STATUS
22 import random, os, time, threading
25 # -----------------------------------------------------------------------
26 # Go ahead and register the common network objects
27 # -----------------------------------------------------------------------
28 osrf.net_obj.register_hint('osrfMessage', ['threadTrace', 'locale', 'type', 'payload'], 'hash')
29 osrf.net_obj.register_hint('osrfMethod', ['method', 'params'], 'hash')
30 osrf.net_obj.register_hint('osrfResult', ['status', 'statusCode', 'content'], 'hash')
31 osrf.net_obj.register_hint('osrfConnectStatus', ['status', 'statusCode'], 'hash')
32 osrf.net_obj.register_hint('osrfMethodException', ['status', 'statusCode'], 'hash')
35 class Session(object):
36 """Abstract session superclass."""
38 ''' Global cache of in-service sessions '''
42 # by default, we're connected to no one
43 self.state = OSRF_APP_SESSION_DISCONNECTED
51 def find_or_create(thread):
52 if thread in Session.session_cache:
53 return Session.session_cache[thread]
54 return ServerSession(thread)
56 def set_remote_id(self, remoteid):
57 self.remote_id = remoteid
58 osrf.log.log_internal("Setting request remote ID to %s" % self.remote_id)
60 def wait(self, timeout=120):
61 """Wait up to <timeout> seconds for data to arrive on the network"""
62 osrf.log.log_internal("Session.wait(%d)" % timeout)
63 handle = osrf.net.get_network_handle()
66 def send(self, omessages):
67 """Sends an OpenSRF message"""
68 if not isinstance(omessages, list):
69 omessages = [omessages]
71 net_msg = osrf.net.NetworkMessage(
72 recipient = self.remote_id,
73 body = osrf.json.to_json(omessages),
78 handle = osrf.net.get_network_handle()
82 """Removes the session from the global session cache."""
83 del Session.session_cache[self.thread]
85 class ClientSession(Session):
86 """Client session object. Use this to make server requests."""
88 def __init__(self, service, locale='en-US'):
90 # call superclass constructor
91 Session.__init__(self)
93 # the service we are sending requests to
94 self.service = service
96 # the locale we want requests to be returned in
99 # find the remote service handle <router>@<domain>/<service>
100 domain = osrf.conf.get('domain', 0)
101 router = osrf.conf.get('router_name')
102 self.remote_id = "%s@%s/%s" % (router, domain, service)
103 self.orig_remote_id = self.remote_id
105 # generate a random message thread
106 self.thread = "%s%s%s%s" % (os.getpid(),
107 str(random.randint(100,100000)), str(time.time()),threading.currentThread().getName().lower())
109 # how many requests this session has taken part in
112 # cache of request objects
115 # cache this session in the global session cache
116 Session.session_cache[self.thread] = self
119 def reset_request_timeout(self, rid):
120 req = self.find_request(rid)
122 req.reset_timeout = True
125 def request2(self, method, arr):
126 """Creates a new request and sends the request to the server using a python array as the params."""
127 return self.__request(method, arr)
129 def request(self, method, *args):
130 """Creates a new request and sends the request to the server using a variable argument list as params"""
132 return self.__request(method, arr)
134 def __request(self, method, arr):
135 """Builds the request object and sends it."""
136 if self.state != OSRF_APP_SESSION_CONNECTED:
137 self.reset_remote_id()
141 osrf.log.log_debug("Sending request %s -> %s " % (self.service, method))
142 req = ClientRequest(self, self.next_id, method, arr, self.locale)
143 self.requests[str(self.next_id)] = req
149 def connect(self, timeout=10):
150 """Connects to a remote service"""
152 if self.state == OSRF_APP_SESSION_CONNECTED:
154 self.state = OSRF_APP_SESSION_CONNECTING
156 # construct and send a CONNECT message
158 osrf.net_obj.NetworkObject.osrfMessage(
160 'type' : OSRF_MESSAGE_TYPE_CONNECT
165 while timeout >= 0 and not self.state == OSRF_APP_SESSION_CONNECTED:
168 timeout -= time.time() - start
170 if self.state != OSRF_APP_SESSION_CONNECTED:
171 raise osrf.ex.OSRFServiceException("Unable to connect to " + self.service)
175 def disconnect(self):
176 """Disconnects from a remote service"""
178 if self.state == OSRF_APP_SESSION_DISCONNECTED:
182 osrf.net_obj.NetworkObject.osrfMessage(
184 'type' : OSRF_MESSAGE_TYPE_DISCONNECT
189 self.state = OSRF_APP_SESSION_DISCONNECTED
193 def reset_remote_id(self):
194 """Recovers the original remote id"""
195 self.remote_id = self.orig_remote_id
196 osrf.log.log_internal("Resetting remote ID to %s" % self.remote_id)
198 def push_response_queue(self, message):
199 """Pushes the message payload onto the response queue
200 for the request associated with the message's ID."""
201 osrf.log.log_debug("pushing %s" % message.payload())
203 self.find_request(message.threadTrace()).push_response(message.payload())
205 osrf.log.log_warn("pushing respond to non-existent request %s : %s" % (message.threadTrace(), e))
207 def find_request(self, rid):
208 """Returns the original request matching this message's threadTrace."""
210 return self.requests[str(rid)]
212 osrf.log.log_debug('find_request(): non-existent request %s' % str(rid))
216 def atomic_request(service, method, *args):
217 ses = ClientSession(service)
218 req = ses.request2(method, list(args))
222 data = resp.content()
230 class Request(object):
231 def __init__(self, session, rid, method=None, params=[], locale='en-US'):
232 self.session = session # my session handle
233 self.rid = rid # my unique request ID
234 self.method = method # method name
235 self.params = params # my method params
237 self.complete = False # is this request done?
238 self.complete_time = 0 # time at which the request was completed
241 class ClientRequest(Request):
242 """Represents a single OpenSRF request.
243 A request is made and any resulting respones are
244 collected for the client."""
246 def __init__(self, session, rid, method=None, params=[], locale='en-US'):
247 Request.__init__(self, session, rid, method, params, locale)
248 self.queue = [] # response queue
249 self.reset_timeout = False # resets the recv timeout?
250 self.send_time = 0 # local time the request was put on the wire
251 self.first_response_time = 0 # time it took for our first reponse to be received
254 """Sends a request message"""
256 # construct the method object message with params and method name
257 method = osrf.net_obj.NetworkObject.osrfMethod( {
258 'method' : self.method,
259 'params' : self.params
262 # construct the osrf message with our method message embedded
263 message = osrf.net_obj.NetworkObject.osrfMessage( {
264 'threadTrace' : self.rid,
265 'type' : OSRF_MESSAGE_TYPE_REQUEST,
267 'locale' : self.locale
270 self.send_time = time.time()
271 self.session.send(message)
273 def recv(self, timeout=120):
274 """ Waits up to <timeout> seconds for a response to this request.
276 If a message is received in time, the response message is returned.
277 Returns None otherwise."""
281 orig_timeout = timeout
282 while not self.complete and (timeout >= 0 or orig_timeout < 0) and len(self.queue) == 0:
285 self.session.wait(timeout)
287 if self.reset_timeout:
288 self.reset_timeout = False
289 timeout = orig_timeout
291 elif orig_timeout >= 0:
292 timeout -= time.time() - s
296 # -----------------------------------------------------------------
297 # log some statistics
298 if len(self.queue) > 0:
299 if not self.first_response_time:
300 self.first_response_time = now
301 osrf.log.log_debug("time elapsed before first response: %f" \
302 % (self.first_response_time - self.send_time))
305 if not self.complete_time:
306 self.complete_time = now
307 osrf.log.log_debug("time elapsed before complete: %f" \
308 % (self.complete_time - self.send_time))
309 # -----------------------------------------------------------------
312 if len(self.queue) > 0:
313 # we have a reponse, return it
314 return self.queue.pop(0)
318 def push_response(self, content):
319 """Pushes a method response onto this requests response queue."""
320 self.queue.append(content)
323 """Cleans up request data from the cache.
325 Do this when you are done with a request to prevent "leaked" cache memory."""
326 del self.session.requests[str(self.rid)]
328 def set_complete(self):
329 """Sets me as complete. This means the server has sent a 'request complete' message"""
333 class ServerSession(Session):
334 """Implements a server-side session"""
336 def __init__(self, thread):
337 Session.__init__(self)
339 Session.session_cache[thread] = self
341 def send_status(self, thread_trace, payload):
343 osrf.net_obj.NetworkObject.osrfMessage(
344 { 'threadTrace' : thread_trace,
345 'type' : osrf.const.OSRF_MESSAGE_TYPE_STATUS,
347 'locale' : self.locale
352 def send_connect_ok(self, thread_trace):
353 status_msg = osrf.net_obj.NetworkObject.osrfConnectStatus({
354 'status' : 'Connection Successful',
355 'statusCode': osrf.const.OSRF_STATUS_OK
357 self.send_status(thread_trace, status_msg)
360 class ServerRequest(Request):
362 def __init__(self, session, rid, method, params=[]):
363 Request.__init__(self, session, rid, method, params, session.locale)
364 self.response_list = []
366 def _build_response_msg(self, data):
367 result = osrf.net_obj.NetworkObject.osrfResult({
369 'statusCode' : osrf.const.OSRF_STATUS_OK,
373 return osrf.net_obj.NetworkObject.osrfMessage({
374 'threadTrace' : self.rid,
375 'type' : OSRF_MESSAGE_TYPE_RESULT,
377 'locale' : self.locale
380 def _build_complete_msg(self):
382 status = osrf.net_obj.NetworkObject.osrfConnectStatus({
383 'threadTrace' : self.rid,
384 'status' : 'Request Complete',
385 'statusCode': osrf.const.OSRF_STATUS_COMPLETE
388 return osrf.net_obj.NetworkObject.osrfMessage({
389 'threadTrace' : self.rid,
390 'type' : OSRF_MESSAGE_TYPE_STATUS,
392 'locale' : self.locale
395 def respond(self, data):
396 ''' For non-atomic calls, this sends a response directly back
397 to the client. For atomic calls, this pushes the response
398 onto the response list '''
399 osrf.log.log_internal("responding with %s" % str(data))
400 if self.method.atomic:
401 self.response_list.append(data)
403 self.session.send(self._build_response_msg(data))
405 def respond_complete(self, data):
406 ''' Sends a complete message accompanied by the final result if applicable '''
411 self.complete_time = time.time()
413 if self.method.atomic:
415 self.response_list.append(data)
417 self._build_response_msg(self.response_list),
418 self._build_complete_msg(),
421 elif data is not None:
423 self._build_response_msg(data),
424 self._build_complete_msg(),
428 self.session.send(self._build_complete_msg())
431 class MultiSession(object):
432 ''' Manages multiple requests. With the current implementation, a 1 second
433 lag time before the first response is practically guaranteed. Use
434 only for long running requests.
436 Another approach would be a threaded version, but that would require
437 build-up and breakdown of thread-specific xmpp connections somewhere.
440 class Container(object):
441 def __init__(self, req):
446 self.complete = False
449 def request(self, service, method, *args):
450 ses = ClientSession(service)
451 cont = MultiSession.Container(ses.request(method, *args))
452 cont.id = len(self.reqs)
453 self.reqs.append(cont)
455 def recv(self, timeout=120):
456 ''' Returns a tuple of req_id, response '''
460 for i in range(0, len(self.reqs)):
465 if i == 0 and not res:
466 res = req.recv(block_time)
472 duration += block_time
473 if duration >= timeout:
477 self.reqs.pop(self.reqs.index(cont))
479 if len(self.reqs) == 0:
482 return cont.id, res.content()