1 # -----------------------------------------------------------------------
2 # Copyright (C) 2007 Georgia Public Library Service
3 # Bill Erickson <billserickson@gmail.com>
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 # -----------------------------------------------------------------------
16 import osrf.json, osrf.conf, osrf.log, osrf.net, osrf.net_obj, osrf.const
17 from osrf.const import OSRF_APP_SESSION_CONNECTED, \
18 OSRF_APP_SESSION_CONNECTING, OSRF_APP_SESSION_DISCONNECTED, \
19 OSRF_MESSAGE_TYPE_CONNECT, OSRF_MESSAGE_TYPE_DISCONNECT, \
20 OSRF_MESSAGE_TYPE_REQUEST, OSRF_MESSAGE_TYPE_RESULT, OSRF_MESSAGE_TYPE_STATUS
22 import random, os, time, threading
25 # -----------------------------------------------------------------------
26 # Go ahead and register the common network objects
27 # -----------------------------------------------------------------------
28 osrf.net_obj.register_hint('osrfMessage', ['threadTrace', 'locale', 'type', 'payload', 'ingress'], 'hash')
29 osrf.net_obj.register_hint('osrfMethod', ['method', 'params'], 'hash')
30 osrf.net_obj.register_hint('osrfResult', ['status', 'statusCode', 'content'], 'hash')
31 osrf.net_obj.register_hint('osrfConnectStatus', ['status', 'statusCode'], 'hash')
32 osrf.net_obj.register_hint('osrfMethodException', ['status', 'statusCode'], 'hash')
35 class Session(object):
36 """Abstract session superclass."""
38 ''' Global cache of in-service sessions '''
40 current_ingress = 'opensrf';
43 # by default, we're connected to no one
44 self.state = OSRF_APP_SESSION_DISCONNECTED
def find_or_create(thread):
    """Returns the session cached for <thread>, building one when there is none.

    The lookup is done against the global Session.session_cache.  When the
    thread has no entry, a new ServerSession is constructed for it and
    returned instead."""
    if thread not in Session.session_cache:
        # nothing cached for this thread yet
        return ServerSession(thread)
    return Session.session_cache[thread]
59 Session.current_ingress = ingress
60 return Session.current_ingress
def set_remote_id(self, remoteid):
    """Stores <remoteid> as this session's remote ID and logs the new value."""
    self.remote_id = remoteid
    log_line = "Setting request remote ID to %s" % self.remote_id
    osrf.log.log_internal(log_line)
def wait(self, timeout=120):
    """Blocks for up to <timeout> seconds waiting for data from the network.

    Returns whatever the network handle's recv() call returns."""
    osrf.log.log_internal("Session.wait(%d)" % timeout)
    return osrf.net.get_network_handle().recv(timeout)
72 def send(self, omessages):
73 """Sends an OpenSRF message"""
74 if not isinstance(omessages, list):
75 omessages = [omessages]
78 msg.ingress(Session.current_ingress);
80 net_msg = osrf.net.NetworkMessage(
81 recipient = self.remote_id,
82 body = osrf.json.to_json(omessages),
87 handle = osrf.net.get_network_handle()
91 """Removes the session from the global session cache."""
92 del Session.session_cache[self.thread]
94 class ClientSession(Session):
95 """Client session object. Use this to make server requests."""
97 def __init__(self, service, locale='en-US'):
99 # call superclass constructor
100 Session.__init__(self)
102 # the service we are sending requests to
103 self.service = service
105 # the locale we want requests to be returned in
108 # find the remote service handle <router>@<domain>/<service>
109 domain = osrf.conf.get('domain', 0)
110 router = osrf.conf.get('router_name')
111 self.remote_id = "%s@%s/%s" % (router, domain, service)
112 self.orig_remote_id = self.remote_id
114 # generate a random message thread
115 self.thread = "%s%s%s%s" % (os.getpid(),
116 str(random.randint(100,100000)), str(time.time()),threading.currentThread().getName().lower())
118 # how many requests this session has taken part in
121 # cache of request objects
124 # cache this session in the global session cache
125 Session.session_cache[self.thread] = self
127 def reset_request_timeout(self, rid):
128 req = self.find_request(rid)
130 req.reset_timeout = True
def request2(self, method, arr):
    """Creates and sends a new request for <method>, taking the params as a python array (<arr>).

    Returns whatever the private __request helper returns."""
    outcome = self.__request(method, arr)
    return outcome
137 def request(self, method, *args):
138 """Creates a new request and sends the request to the server using a variable argument list as params"""
140 return self.__request(method, arr)
142 def __request(self, method, arr):
143 """Builds the request object and sends it."""
144 if self.state != OSRF_APP_SESSION_CONNECTED:
145 self.reset_remote_id()
149 osrf.log.log_debug("Sending request %s -> %s " % (self.service, method))
150 req = ClientRequest(self, self.next_id, method, arr, self.locale)
151 self.requests[str(self.next_id)] = req
157 def connect(self, timeout=10):
158 """Connects to a remote service"""
160 if self.state == OSRF_APP_SESSION_CONNECTED:
162 self.state = OSRF_APP_SESSION_CONNECTING
164 # construct and send a CONNECT message
166 osrf.net_obj.NetworkObject.osrfMessage(
168 'type' : OSRF_MESSAGE_TYPE_CONNECT
173 while timeout >= 0 and not self.state == OSRF_APP_SESSION_CONNECTED:
176 timeout -= time.time() - start
178 if self.state != OSRF_APP_SESSION_CONNECTED:
179 raise osrf.ex.OSRFServiceException("Unable to connect to " + self.service)
183 def disconnect(self):
184 """Disconnects from a remote service"""
186 if self.state == OSRF_APP_SESSION_DISCONNECTED:
190 osrf.net_obj.NetworkObject.osrfMessage(
192 'type' : OSRF_MESSAGE_TYPE_DISCONNECT
197 self.state = OSRF_APP_SESSION_DISCONNECTED
def reset_remote_id(self):
    """Puts the session's remote id back to the saved original (orig_remote_id) and logs it."""
    original = self.orig_remote_id
    self.remote_id = original
    osrf.log.log_internal("Resetting remote ID to %s" % self.remote_id)
206 def push_response_queue(self, message):
207 """Pushes the message payload onto the response queue
208 for the request associated with the message's ID."""
209 osrf.log.log_debug("pushing %s" % message.payload())
211 self.find_request(message.threadTrace()).push_response(message.payload())
213 osrf.log.log_warn("pushing respond to non-existent request %s : %s" % (message.threadTrace(), e))
215 def find_request(self, rid):
216 """Returns the original request matching this message's threadTrace."""
218 return self.requests[str(rid)]
220 osrf.log.log_debug('find_request(): non-existent request %s' % str(rid))
224 def atomic_request(service, method, *args):
225 ses = ClientSession(service)
226 req = ses.request2(method, list(args))
230 data = resp.content()
238 class Request(object):
239 def __init__(self, session, rid, method=None, params=[], locale='en-US'):
240 self.session = session # my session handle
241 self.rid = rid # my unique request ID
242 self.method = method # method name
243 self.params = params # my method params
245 self.complete = False # is this request done?
246 self.complete_time = 0 # time at which the request was completed
249 class ClientRequest(Request):
250 """Represents a single OpenSRF request.
251 A request is made and any resulting responses are
252 collected for the client."""
def __init__(self, session, rid, method=None, params=None, locale='en-US'):
    """Sets up a client request and its response bookkeeping.

    session -- session handle this request belongs to
    rid     -- request ID
    method  -- method name
    params  -- list of method params; a new empty list is used when omitted
    locale  -- locale handed on to Request.__init__"""
    # A literal [] default is created once at definition time and would be
    # shared by every request built without explicit params, so create a
    # fresh list per request instead.
    if params is None:
        params = []
    Request.__init__(self, session, rid, method, params, locale)
    self.queue = [] # response queue
    self.reset_timeout = False # when True, recv() restores its timeout to the original value
    self.send_time = 0 # local time the request was put on the wire
    self.first_response_time = 0 # local time our first response was received (0 until then)
262 """Sends a request message"""
264 # construct the method object message with params and method name
265 method = osrf.net_obj.NetworkObject.osrfMethod( {
266 'method' : self.method,
267 'params' : self.params
270 # construct the osrf message with our method message embedded
271 message = osrf.net_obj.NetworkObject.osrfMessage( {
272 'threadTrace' : self.rid,
273 'type' : OSRF_MESSAGE_TYPE_REQUEST,
275 'locale' : self.locale
278 self.send_time = time.time()
279 self.session.send(message)
281 def recv(self, timeout=120):
282 """ Waits up to <timeout> seconds for a response to this request.
284 If a message is received in time, the response message is returned.
285 Returns None otherwise."""
289 orig_timeout = timeout
290 while not self.complete and (timeout >= 0 or orig_timeout < 0) and len(self.queue) == 0:
293 self.session.wait(timeout)
295 if self.reset_timeout:
296 self.reset_timeout = False
297 timeout = orig_timeout
299 elif orig_timeout >= 0:
300 timeout -= time.time() - s
304 # -----------------------------------------------------------------
305 # log some statistics
306 if len(self.queue) > 0:
307 if not self.first_response_time:
308 self.first_response_time = now
309 osrf.log.log_debug("time elapsed before first response: %f" \
310 % (self.first_response_time - self.send_time))
313 if not self.complete_time:
314 self.complete_time = now
315 osrf.log.log_debug("time elapsed before complete: %f" \
316 % (self.complete_time - self.send_time))
317 # -----------------------------------------------------------------
320 if len(self.queue) > 0:
321 # we have a response, return it
322 return self.queue.pop(0)
def push_response(self, content):
    """Adds <content> to the tail of this request's response queue."""
    pending = self.queue
    pending.append(content)
331 """Cleans up request data from the cache.
333 Do this when you are done with a request to prevent "leaked" cache memory."""
334 del self.session.requests[str(self.rid)]
336 def set_complete(self):
337 """Sets me as complete. This means the server has sent a 'request complete' message"""
341 class ServerSession(Session):
342 """Implements a server-side session"""
344 def __init__(self, thread):
345 Session.__init__(self)
348 self.session_data = {}
349 Session.session_cache[self.thread] = self
351 def send_status(self, thread_trace, payload):
353 osrf.net_obj.NetworkObject.osrfMessage(
354 { 'threadTrace' : thread_trace,
355 'type' : osrf.const.OSRF_MESSAGE_TYPE_STATUS,
357 'locale' : self.locale
362 def send_connect_ok(self, thread_trace):
363 status_msg = osrf.net_obj.NetworkObject.osrfConnectStatus({
364 'status' : 'Connection Successful',
365 'statusCode': osrf.const.OSRF_STATUS_OK
367 self.send_status(thread_trace, status_msg)
369 def send_method_not_found(self, thread_trace, method_name):
370 status_msg = osrf.net_obj.NetworkObject.osrfConnectStatus({
371 'status' : 'Method [%s] not found for %s' % (method_name, self.service),
372 'statusCode': osrf.const.OSRF_STATUS_NOTFOUND
374 self.send_status(thread_trace, status_msg)
def run_callback(self, type):
    """Calls the callback registered under <type>, handing it this session.

    Nothing happens when no callback is registered for <type>.  The
    callback's return value is discarded."""
    if type not in self.callbacks:
        return
    handler = self.callbacks[type]
    handler(self)
def register_callback(self, type, func):
    """Registers <func> as the callback for <type>, replacing any callback already stored for it."""
    registry = self.callbacks
    registry[type] = func
385 Session.cleanup(self)
386 self.run_callback('death')
389 class ServerRequest(Request):
def __init__(self, session, rid, method, params=None):
    """Sets up a server-side request.

    session -- session handle this request belongs to; its locale is
               passed on to Request.__init__
    rid     -- request ID
    method  -- method being invoked
    params  -- list of method params; a new empty list is used when omitted"""
    # A literal [] default is created once at definition time and would be
    # shared by every request built without explicit params, so create a
    # fresh list per request instead.
    if params is None:
        params = []
    Request.__init__(self, session, rid, method, params, session.locale)
    # responses collected by respond() for atomic methods
    self.response_list = []
395 def _build_response_msg(self, data):
396 result = osrf.net_obj.NetworkObject.osrfResult({
398 'statusCode' : osrf.const.OSRF_STATUS_OK,
402 return osrf.net_obj.NetworkObject.osrfMessage({
403 'threadTrace' : self.rid,
404 'type' : OSRF_MESSAGE_TYPE_RESULT,
406 'locale' : self.locale
409 def _build_complete_msg(self):
411 status = osrf.net_obj.NetworkObject.osrfConnectStatus({
412 'threadTrace' : self.rid,
413 'status' : 'Request Complete',
414 'statusCode': osrf.const.OSRF_STATUS_COMPLETE
417 return osrf.net_obj.NetworkObject.osrfMessage({
418 'threadTrace' : self.rid,
419 'type' : OSRF_MESSAGE_TYPE_STATUS,
421 'locale' : self.locale
424 def respond(self, data):
425 ''' For non-atomic calls, this sends a response directly back
426 to the client. For atomic calls, this pushes the response
427 onto the response list '''
428 osrf.log.log_internal("responding with %s" % str(data))
429 if self.method.atomic:
430 self.response_list.append(data)
432 self.session.send(self._build_response_msg(data))
434 def respond_complete(self, data):
435 ''' Sends a complete message accompanied by the final result if applicable '''
440 self.complete_time = time.time()
442 if self.method.atomic:
444 self.response_list.append(data)
446 self._build_response_msg(self.response_list),
447 self._build_complete_msg(),
450 elif data is not None:
452 self._build_response_msg(data),
453 self._build_complete_msg(),
457 self.session.send(self._build_complete_msg())
460 class MultiSession(object):
461 ''' Manages multiple requests. With the current implementation, a 1 second
462 lag time before the first response is practically guaranteed. Use
463 only for long running requests.
465 Another approach would be a threaded version, but that would require
466 build-up and breakdown of thread-specific xmpp connections somewhere.
469 class Container(object):
470 def __init__(self, req):
475 self.complete = False
def request(self, service, method, *args):
    """Opens a client session to <service>, sends <method> with <args> and tracks the request.

    The request is wrapped in a MultiSession.Container, given an id and
    appended to self.reqs."""
    session = ClientSession(service)
    pending = session.request(method, *args)
    holder = MultiSession.Container(pending)
    # NOTE(review): the id is the current length of self.reqs; recv() pops
    # entries from that list, so ids may repeat over time -- confirm
    # whether callers rely on them being unique.
    holder.id = len(self.reqs)
    self.reqs.append(holder)
484 def recv(self, timeout=120):
485 ''' Returns a tuple of req_id, response '''
489 for i in range(0, len(self.reqs)):
494 if i == 0 and not res:
495 res = req.recv(block_time)
501 duration += block_time
502 if duration >= timeout:
506 self.reqs.pop(self.reqs.index(cont))
508 if len(self.reqs) == 0:
511 return cont.id, res.content()