1 # -----------------------------------------------------------------------
2 # Copyright (C) 2007 Georgia Public Library Service
3 # Bill Erickson <billserickson@gmail.com>
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 # -----------------------------------------------------------------------
16 import osrf.json, osrf.conf, osrf.log, osrf.net, osrf.net_obj, osrf.const
17 from osrf.const import OSRF_APP_SESSION_CONNECTED, \
18 OSRF_APP_SESSION_CONNECTING, OSRF_APP_SESSION_DISCONNECTED, \
19 OSRF_MESSAGE_TYPE_CONNECT, OSRF_MESSAGE_TYPE_DISCONNECT, \
20 OSRF_MESSAGE_TYPE_REQUEST, OSRF_MESSAGE_TYPE_RESULT, OSRF_MESSAGE_TYPE_STATUS
22 import random, os, time, threading
25 # -----------------------------------------------------------------------
26 # Go ahead and register the common network objects
27 # -----------------------------------------------------------------------
# Register the common network-object hints. Each entry pairs a hint name with
# its ordered field list; every hint is registered with the 'hash' argument.
# The registration order is the same as the original explicit calls.
for _hint, _fields in (
    ('osrfMessage', ['threadTrace', 'locale', 'type', 'payload']),
    ('osrfMethod', ['method', 'params']),
    ('osrfResult', ['status', 'statusCode', 'content']),
    ('osrfConnectStatus', ['status', 'statusCode']),
    ('osrfMethodException', ['status', 'statusCode']),
):
    osrf.net_obj.register_hint(_hint, _fields, 'hash')
# Drop the loop temporaries so the module namespace is left unchanged.
del _hint, _fields
# Base class for sessions; ClientSession and ServerSession in this file derive
# from it.
# NOTE(review): this listing embeds its own line numbers and they jump inside
# this class (38->42, 43->50, 62->65, 72->77, 77->81), so some statements are
# not visible here. The comments below describe only the visible lines.
35 class Session(object):
36 """Abstract session superclass."""
38 ''' Global cache of in-service sessions '''
# NOTE(review): the cache attribute itself (used elsewhere in this file as
# Session.session_cache) and the def line of the method owning the next
# statement are on lines missing from this view -- presumably __init__; confirm.
42 # by default, we're connected to no one
43 self.state = OSRF_APP_SESSION_DISCONNECTED
# Return the session cached under <thread> in Session.session_cache; when there
# is none, construct and return a new ServerSession for that thread.
# NOTE(review): the function takes no self; any decorator would sit on a line
# missing from this view -- confirm how it is bound to the class.
50 def find_or_create(thread):
51 if thread in Session.session_cache:
52 return Session.session_cache[thread]
53 return ServerSession(thread)
# Store <remoteid> as self.remote_id and log the new value at internal level.
55 def set_remote_id(self, remoteid):
56 self.remote_id = remoteid
57 osrf.log.log_internal("Setting request remote ID to %s" % self.remote_id)
# NOTE(review): the network handle is fetched on line 62, but the statement that
# uses it (line 63 or later) is missing from this view.
59 def wait(self, timeout=120):
60 """Wait up to <timeout> seconds for data to arrive on the network"""
61 osrf.log.log_internal("Session.wait(%d)" % timeout)
62 handle = osrf.net.get_network_handle()
65 def send(self, omessages):
66 """Sends an OpenSRF message"""
# A single message is wrapped in a list, so the JSON body is always built from
# a list of messages.
67 if not isinstance(omessages, list):
68 omessages = [omessages]
# The message is addressed to self.remote_id.
# NOTE(review): the remaining NetworkMessage arguments (lines 73-76) and the
# call that hands net_msg to `handle` (after line 77) are missing from this view.
70 net_msg = osrf.net.NetworkMessage(
71 recipient = self.remote_id,
72 body = osrf.json.to_json(omessages),
77 handle = osrf.net.get_network_handle()
# NOTE(review): the def line of the method owning the next two lines is missing
# from this view. If session_cache is a dict, the del raises KeyError when
# self.thread is not cached -- confirm callers only clean up cached sessions.
81 """Removes the session from the global session cache."""
82 del Session.session_cache[self.thread]
# Client-side session: builds the remote id for a service, creates requests and
# tracks them by request id.
# NOTE(review): the listing's embedded line numbers skip in many places inside
# this class, so some statements are not visible; the comments below describe
# only the visible lines.
84 class ClientSession(Session):
85 """Client session object. Use this to make server requests."""
# NOTE(review): the default locale here is 'en_US', while Request and
# ClientRequest in this file default to 'en-US' -- confirm which spelling is
# intended.
87 def __init__(self, service, locale='en_US'):
89 # call superclass constructor
90 Session.__init__(self)
92 # the service we are sending requests to
93 self.service = service
95 # the locale we want requests to be returned in
98 # find the remote service handle <router>@<domain>/<service>
99 domain = osrf.conf.get('domain', 0)
100 router = osrf.conf.get('router_name')
101 self.remote_id = "%s@%s/%s" % (router, domain, service)
# kept so reset_remote_id() can restore the original remote id later
102 self.orig_remote_id = self.remote_id
104 # generate a random message thread
# The id concatenates the pid, a random integer in [100, 100000], the current
# time and the lower-cased name of the current thread.
# NOTE(review): threading.currentThread()/getName() are the legacy camelCase
# spellings; newer Python versions prefer threading.current_thread().name.
105 self.thread = "%s%s%s%s" % (os.getpid(),
106 str(random.randint(100,100000)), str(time.time()),threading.currentThread().getName().lower())
# NOTE(review): the assignments announced by the next two comments (lines
# 109-110 and 112-113) are missing from this view; other methods read
# self.next_id and self.requests.
108 # how many requests this session has taken part in
111 # cache of request objects
114 # cache this session in the global session cache
115 Session.session_cache[self.thread] = self
# Flag the request identified by <rid> so that ClientRequest.recv() restores its
# original timeout on the next pass.
# NOTE(review): line 119 is missing between the lookup and the assignment;
# presumably it guards against a failed lookup -- confirm.
117 def reset_request_timeout(self, rid):
118 req = self.find_request(rid)
120 req.reset_timeout = True
123 def request2(self, method, arr):
124 """Creates a new request and sends the request to the server using a python array as the params."""
125 return self.__request(method, arr)
# NOTE(review): `arr` is bound on line 129, which is missing from this view.
127 def request(self, method, *args):
128 """Creates a new request and sends the request to the server using a variable argument list as params"""
130 return self.__request(method, arr)
132 def __request(self, method, arr):
133 """Builds the request object and sends it."""
# When the session is not CONNECTED, the remote id is reset to the original one
# before the request is built.
134 if self.state != OSRF_APP_SESSION_CONNECTED:
135 self.reset_remote_id()
# The new ClientRequest is stored under str(self.next_id), the same key form
# that find_request() looks up.
# NOTE(review): lines 136-138 and 142-145 are missing; where next_id is
# advanced, the request is sent and the result is returned is not visible.
139 osrf.log.log_debug("Sending request %s -> %s " % (self.service, method))
140 req = ClientRequest(self, self.next_id, method, arr, self.locale)
141 self.requests[str(self.next_id)] = req
147 def connect(self, timeout=10):
148 """Connects to a remote service"""
# NOTE(review): the body of this early check (line 151) is missing from this view.
150 if self.state == OSRF_APP_SESSION_CONNECTED:
152 self.state = OSRF_APP_SESSION_CONNECTING
154 # construct and send a CONNECT message
156 osrf.net_obj.NetworkObject.osrfMessage(
158 'type' : OSRF_MESSAGE_TYPE_CONNECT
# Loop until the state becomes CONNECTED or <timeout> drops below zero.
# NOTE(review): the assignment of `start` and the wait performed on each pass
# (lines 164-165) are missing from this view.
163 while timeout >= 0 and not self.state == OSRF_APP_SESSION_CONNECTED:
166 timeout -= time.time() - start
# NOTE(review): osrf.ex is not among the osrf submodules imported at the top of
# this file; unless something else imports it, this raise would fail with
# AttributeError instead of raising OSRFServiceException -- confirm.
168 if self.state != OSRF_APP_SESSION_CONNECTED:
169 raise osrf.ex.OSRFServiceException("Unable to connect to " + self.service)
173 def disconnect(self):
174 """Disconnects from a remote service"""
# NOTE(review): the body of this check (line 177) and the call that sends the
# DISCONNECT message are missing from this view.
176 if self.state == OSRF_APP_SESSION_DISCONNECTED:
180 osrf.net_obj.NetworkObject.osrfMessage(
182 'type' : OSRF_MESSAGE_TYPE_DISCONNECT
187 self.state = OSRF_APP_SESSION_DISCONNECTED
191 def reset_remote_id(self):
192 """Recovers the original remote id"""
193 self.remote_id = self.orig_remote_id
194 osrf.log.log_internal("Resetting remote ID to %s" % self.remote_id)
196 def push_response_queue(self, message):
197 """Pushes the message payload onto the response queue
198 for the request associated with the message's ID."""
199 osrf.log.log_debug("pushing %s" % message.payload())
# The request is looked up by the message's threadTrace.
# NOTE(review): lines 200 and 202 are missing; the warning below formats `e`,
# so the push is presumably wrapped in try/except -- confirm which exceptions
# are caught.
201 self.find_request(message.threadTrace()).push_response(message.payload())
203 osrf.log.log_warn("pushing respond to non-existent request %s : %s" % (message.threadTrace(), e))
205 def find_request(self, rid):
206 """Returns the original request matching this message's threadTrace."""
# Requests are keyed by str(rid).
# NOTE(review): lines 207, 209 and 211 are missing; the debug message suggests
# a failed lookup is caught and logged -- confirm what is returned in that case.
208 return self.requests[str(rid)]
210 osrf.log.log_debug('find_request(): non-existent request %s' % str(rid))
# Convenience wrapper: opens a ClientSession for <service> and issues <method>
# through request2(), with the positional arguments packed into a list.
# NOTE(review): lines 217-219 and everything after line 220 are missing from
# this view; where `resp` comes from, how the session is cleaned up and what is
# returned are not visible.
214 def atomic_request(service, method, *args):
215 ses = ClientSession(service)
216 req = ses.request2(method, list(args))
220 data = resp.content()
# Common state shared by ClientRequest and ServerRequest.
# NOTE(review): `params=[]` is a mutable default -- every call that omits params
# stores the same list object in self.params. A None default replaced by a
# fresh list would avoid the sharing; the same default appears in
# ClientRequest.__init__ and ServerRequest.__init__.
# NOTE(review): embedded line 234 is missing from this view; no visible line
# stores `locale`, although the subclasses read self.locale -- confirm.
228 class Request(object):
229 def __init__(self, session, rid, method=None, params=[], locale='en-US'):
230 self.session = session # my session handle
231 self.rid = rid # my unique request ID
232 self.method = method # method name
233 self.params = params # my method params
235 self.complete = False # is this request done?
236 self.complete_time = 0 # time at which the request was completed
# Client-side request: sends one method call and queues the responses pushed
# back for it.
# NOTE(review): the listing's embedded line numbers skip inside this class (for
# example the def lines of the methods starting at 252 and 321 are missing), so
# the comments below describe only the visible lines.
239 class ClientRequest(Request):
240 """Represents a single OpenSRF request.
241 A request is made and any resulting respones are
242 collected for the client."""
# NOTE(review): `params=[]` is a mutable default shared between calls that omit
# params; consider a None default replaced by a fresh list.
244 def __init__(self, session, rid, method=None, params=[], locale='en-US'):
245 Request.__init__(self, session, rid, method, params, locale)
246 self.queue = [] # response queue
247 self.reset_timeout = False # resets the recv timeout?
248 self.send_time = 0 # local time the request was put on the wire
# recv() assigns `now` to this field and logs it minus send_time, so the stored
# value is compared against a timestamp rather than being a duration.
249 self.first_response_time = 0 # time it took for our first reponse to be received
# NOTE(review): the def line of the method owning the following lines (line
# 251) is missing from this view.
252 """Sends a request message"""
254 # construct the method object message with params and method name
255 method = osrf.net_obj.NetworkObject.osrfMethod( {
256 'method' : self.method,
257 'params' : self.params
260 # construct the osrf message with our method message embedded
# NOTE(review): the line that embeds `method` in the message (line 264) is
# missing from this view.
261 message = osrf.net_obj.NetworkObject.osrfMessage( {
262 'threadTrace' : self.rid,
263 'type' : OSRF_MESSAGE_TYPE_REQUEST,
265 'locale' : self.locale
# Record the send time just before handing the message to the session.
268 self.send_time = time.time()
269 self.session.send(message)
271 def recv(self, timeout=120):
272 """ Waits up to <timeout> seconds for a response to this request.
274 If a message is received in time, the response message is returned.
275 Returns None otherwise."""
279 orig_timeout = timeout
# Keep waiting while the request is not complete, nothing is queued, and time
# remains. A negative original timeout disables the time limit, because the
# `orig_timeout < 0` term keeps the condition true.
280 while not self.complete and (timeout >= 0 or orig_timeout < 0) and len(self.queue) == 0:
283 self.session.wait(timeout)
# A pending reset (set by ClientSession.reset_request_timeout) restores the full
# original timeout; otherwise a non-negative timeout is reduced by the elapsed time.
# NOTE(review): `s` is assigned on a line missing from this view.
285 if self.reset_timeout:
286 self.reset_timeout = False
287 timeout = orig_timeout
289 elif orig_timeout >= 0:
290 timeout -= time.time() - s
# NOTE(review): `now` and the condition guarding the completion statistics
# (around lines 291-293 and 301-302) are on lines missing from this view.
294 # -----------------------------------------------------------------
295 # log some statistics
296 if len(self.queue) > 0:
297 if not self.first_response_time:
298 self.first_response_time = now
299 osrf.log.log_debug("time elapsed before first response: %f" \
300 % (self.first_response_time - self.send_time))
303 if not self.complete_time:
304 self.complete_time = now
305 osrf.log.log_debug("time elapsed before complete: %f" \
306 % (self.complete_time - self.send_time))
307 # -----------------------------------------------------------------
# Responses are handed out oldest first.
310 if len(self.queue) > 0:
311 # we have a reponse, return it
312 return self.queue.pop(0)
316 def push_response(self, content):
317 """Pushes a method response onto this requests response queue."""
318 self.queue.append(content)
# NOTE(review): the def line of the method owning the following lines (line
# 320) is missing from this view. The del uses the same str(rid) key under
# which ClientSession stores the request.
321 """Cleans up request data from the cache.
323 Do this when you are done with a request to prevent "leaked" cache memory."""
324 del self.session.requests[str(self.rid)]
# NOTE(review): the body of set_complete (line 328) is missing from this view.
326 def set_complete(self):
327 """Sets me as complete. This means the server has sent a 'request complete' message"""
# Server-side session: builds STATUS messages for a given thread trace.
# NOTE(review): the listing's embedded line numbers skip inside this class
# (335->338, 338->340, 342->344, 344->349, 352->354), so the comments below
# describe only the visible lines.
331 class ServerSession(Session):
332 """Implements a server-side session"""
# NOTE(review): <thread> is not used on any visible line; line 336 is missing
# from this view.
334 def __init__(self, thread):
335 Session.__init__(self)
# Build an osrfMessage of type STATUS for <thread_trace> in this session's locale.
# NOTE(review): the line that would carry <payload> (343) and the call that
# consumes the message (339) are missing from this view.
# This method spells the constant osrf.const.OSRF_MESSAGE_TYPE_STATUS, while
# the file also imports OSRF_MESSAGE_TYPE_STATUS directly from osrf.const.
338 def send_status(self, thread_trace, payload):
340 osrf.net_obj.NetworkObject.osrfMessage(
341 { 'threadTrace' : thread_trace,
342 'type' : osrf.const.OSRF_MESSAGE_TYPE_STATUS,
344 'locale' : self.locale
# Build an osrfConnectStatus ('Connection Successful', OSRF_STATUS_OK) and pass
# it to send_status() for <thread_trace>.
349 def send_connect_ok(self, thread_trace):
350 status_msg = osrf.net_obj.NetworkObject.osrfConnectStatus({
351 'status' : 'Connection Successful',
352 'statusCode': osrf.const.OSRF_STATUS_OK
354 self.send_status(thread_trace, status_msg)
# Server-side request: builds RESULT and STATUS messages for one request id and
# sends them through the owning session.
# NOTE(review): the listing's embedded line numbers skip in many places inside
# this class, so the comments below describe only the visible lines.
357 class ServerRequest(Request):
# The locale is taken from the session; response_list collects the responses of
# atomic calls (see respond()).
# NOTE(review): `params=[]` is a mutable default shared between calls that omit
# params; consider a None default replaced by a fresh list.
359 def __init__(self, session, rid, method, params=[]):
360 Request.__init__(self, session, rid, method, params, session.locale)
361 self.response_list = []
# Build an osrfResult with status code OK and return a RESULT message for this
# request id in this request's locale.
# NOTE(review): the lines that would carry <data> and embed `result` in the
# message (365, 367-369, 373) are missing from this view.
363 def _build_response_msg(self, data):
364 result = osrf.net_obj.NetworkObject.osrfResult({
366 'statusCode' : osrf.const.OSRF_STATUS_OK,
370 return osrf.net_obj.NetworkObject.osrfMessage({
371 'threadTrace' : self.rid,
372 'type' : OSRF_MESSAGE_TYPE_RESULT,
374 'locale' : self.locale
# Build a 'Request Complete' status and return a STATUS message for this
# request id.
# NOTE(review): 'threadTrace' is passed to osrfConnectStatus, but the hint
# registered at the top of this file for osrfConnectStatus lists only 'status'
# and 'statusCode' -- confirm the extra key is intended.
377 def _build_complete_msg(self):
379 status = osrf.net_obj.NetworkObject.osrfConnectStatus({
380 'threadTrace' : self.rid,
381 'status' : 'Request Complete',
382 'statusCode': osrf.const.OSRF_STATUS_COMPLETE
385 return osrf.net_obj.NetworkObject.osrfMessage({
386 'threadTrace' : self.rid,
387 'type' : OSRF_MESSAGE_TYPE_STATUS,
389 'locale' : self.locale
392 def respond(self, data):
393 ''' For non-atomic calls, this sends a response directly back
394 to the client. For atomic calls, this pushes the response
395 onto the response list '''
396 osrf.log.log_internal("responding with %s" % str(data))
# self.method is expected to expose an `atomic` attribute.
# NOTE(review): line 399 is missing; the send on line 400 presumably belongs to
# an else branch, as the docstring describes -- confirm.
397 if self.method.atomic:
398 self.response_list.append(data)
400 self.session.send(self._build_response_msg(data))
402 def respond_complete(self, data):
403 ''' Sends a complete message accompanied by the final result if applicable '''
# The completion time is recorded before any message is built.
408 self.complete_time = time.time()
# Three visible cases: atomic calls send the accumulated response_list plus the
# complete message; non-atomic calls with data send that result plus the
# complete message; otherwise only the complete message is sent.
# NOTE(review): the guard before line 412 and the send calls wrapping the two
# message lists (lines 411, 413, 416-417, 419, 422-424) are missing from this view.
410 if self.method.atomic:
412 self.response_list.append(data)
414 self._build_response_msg(self.response_list),
415 self._build_complete_msg(),
418 elif data is not None:
420 self._build_response_msg(data),
421 self._build_complete_msg(),
425 self.session.send(self._build_complete_msg())
# Fans several client requests out over separate ClientSession objects and
# hands back responses together with the id of the request they belong to.
# request() opens a new ClientSession per call, wraps the resulting request in
# a MultiSession.Container, gives it an id equal to the current length of
# self.reqs and appends it to that list.
# NOTE(review): ids come from the current list length and recv() removes
# containers from the list, so an id can repeat when request() is called after
# a container has been removed -- confirm whether callers rely on unique ids.
# NOTE(review): the listing's embedded line numbers skip in many places inside
# this class (the closing line of the class docstring, the constructor and most
# of recv() are not visible), so only visible lines are described.
428 class MultiSession(object):
429 ''' Manages multiple requests. With the current implementation, a 1 second
430 lag time before the first response is practically guaranteed. Use
431 only for long running requests.
433 Another approach would be a threaded version, but that would require
434 build-up and breakdown of thread-specific xmpp connections somewhere.
437 class Container(object):
438 def __init__(self, req):
443 self.complete = False
446 def request(self, service, method, *args):
447 ses = ClientSession(service)
448 cont = MultiSession.Container(ses.request(method, *args))
449 cont.id = len(self.reqs)
450 self.reqs.append(cont)
452 def recv(self, timeout=120):
453 ''' Returns a tuple of req_id, response '''
# NOTE(review): `req`, `cont`, `res`, `block_time` and `duration` are bound on
# lines missing from this view. Visible behaviour: only the first request in
# the list is polled with a blocking recv(block_time) when no result is held
# yet; `duration` grows by block_time and is compared against <timeout>.
457 for i in range(0, len(self.reqs)):
462 if i == 0 and not res:
463 res = req.recv(block_time)
469 duration += block_time
470 if duration >= timeout:
# The container is removed from self.reqs by position.
474 self.reqs.pop(self.reqs.index(cont))
476 if len(self.reqs) == 0:
# The result pairs the container's id with the response content.
479 return cont.id, res.content()