1 # -----------------------------------------------------------------------
2 # Copyright (C) 2007 Georgia Public Library Service
3 # Bill Erickson <billserickson@gmail.com>
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 # -----------------------------------------------------------------------
16 import osrf.json, osrf.conf, osrf.log, osrf.net, osrf.net_obj, osrf.const
17 from osrf.const import OSRF_APP_SESSION_CONNECTED, \
18 OSRF_APP_SESSION_CONNECTING, OSRF_APP_SESSION_DISCONNECTED, \
19 OSRF_MESSAGE_TYPE_CONNECT, OSRF_MESSAGE_TYPE_DISCONNECT, \
20 OSRF_MESSAGE_TYPE_REQUEST, OSRF_MESSAGE_TYPE_RESULT, OSRF_MESSAGE_TYPE_STATUS
22 import random, os, time, threading
25 # -----------------------------------------------------------------------
26 # Go ahead and register the common network objects
27 # -----------------------------------------------------------------------
# Common network object hints as (hint name, field names) pairs; every one
# is registered with 'hash' as the third argument.
for _hint_name, _hint_fields in (
        ('osrfMessage', ['threadTrace', 'locale', 'type', 'payload']),
        ('osrfMethod', ['method', 'params']),
        ('osrfResult', ['status', 'statusCode', 'content']),
        ('osrfConnectStatus', ['status', 'statusCode']),
        ('osrfMethodException', ['status', 'statusCode']),
):
    osrf.net_obj.register_hint(_hint_name, _hint_fields, 'hash')
# Do not leave the loop variables behind in the module namespace.
del _hint_name, _hint_fields
# Base class for ClientSession and ServerSession (both subclass it in this file).
35 class Session(object):
36 """Abstract session superclass."""
# NOTE(review): the class attribute described by the next line is not visible
# in this listing. Other methods read and write Session.session_cache, keyed
# by a session's thread -- confirm it is declared here.
38 ''' Global cache of in-service sessions '''
# NOTE(review): the `def __init__` line that owns the statements below is not
# visible in this listing.
42 # by default, we're connected to no one
43 self.state = OSRF_APP_SESSION_DISCONNECTED
def find_or_create(thread):
    """Return the cached session for ``thread``, or a new ServerSession.

    A session already stored in ``Session.session_cache`` under ``thread``
    is returned as-is; otherwise a ``ServerSession`` is built for it.
    """
    known_sessions = Session.session_cache
    if thread not in known_sessions:
        return ServerSession(thread)
    return known_sessions[thread]
def set_remote_id(self, remoteid):
    """Point this session at ``remoteid`` and log the new value."""
    self.remote_id = remoteid
    message = "Setting request remote ID to %s" % self.remote_id
    osrf.log.log_internal(message)
59 def wait(self, timeout=120):
60 """Wait up to <timeout> seconds for data to arrive on the network"""
# Formatted with %d, so a non-numeric timeout (e.g. None) raises TypeError here.
61 osrf.log.log_internal("Session.wait(%d)" % timeout)
62 handle = osrf.net.get_network_handle()
# NOTE(review): the statement that uses `handle` is not visible in this
# listing (presumably a receive call that takes `timeout`) -- confirm.
65 def send(self, omessages):
66 """Sends an OpenSRF message"""
# Accepts one message or a list of messages; a lone message is wrapped in a list.
67 if not isinstance(omessages, list):
68 omessages = [omessages]
# The whole list is serialized to JSON and becomes the body of a single
# network message addressed to self.remote_id.
70 net_msg = osrf.net.NetworkMessage(
71 recipient = self.remote_id,
72 body = osrf.json.to_json(omessages),
# NOTE(review): the remaining NetworkMessage arguments and the closing
# parenthesis are not visible in this listing.
77 handle = osrf.net.get_network_handle()
# NOTE(review): the call that hands net_msg to `handle` is not visible here.
# NOTE(review): the `def` line for this method is not visible in this listing.
81 """Removes the session from the global session cache."""
# Plain `del` by key: if session_cache is a dict (presumably), this raises
# KeyError when self.thread is not cached, e.g. on a second call.
82 del Session.session_cache[self.thread]
84 class ClientSession(Session):
85 """Client session object. Use this to make server requests."""
# NOTE(review): the default locale here is 'en_US' (underscore) while Request
# and ClientRequest default to 'en-US' (hyphen) -- confirm which form is intended.
87 def __init__(self, service, locale='en_US'):
89 # call superclass constructor
90 Session.__init__(self)
92 # the service we are sending requests to
93 self.service = service
# NOTE(review): the assignment that follows the next comment is not visible in
# this listing; other methods of this class read self.locale.
95 # the locale we want requests to be returned in
98 # find the remote service handle <router>@<domain>/<service>
99 domain = osrf.conf.get('domain', 0)
100 router = osrf.conf.get('router_name')
101 self.remote_id = "%s@%s/%s" % (router, domain, service)
# Kept so reset_remote_id() can restore the address after set_remote_id() changes it.
102 self.orig_remote_id = self.remote_id
104 # generate a random message thread
# The thread string concatenates pid, a random integer, the current time and
# the lower-cased name of the current Python thread.
# NOTE(review): threading.currentThread()/getName() are deprecated aliases in
# current Python 3 (current_thread()/.name) -- confirm the target interpreter.
105 self.thread = "%s%s%s%s" % (os.getpid(),
106 str(random.randint(100,100000)), str(time.time()),threading.currentThread().getName().lower())
# NOTE(review): the assignments that follow the next two comments are not
# visible in this listing; __request() reads self.next_id and self.requests.
108 # how many requests this session has taken part in
111 # cache of request objects
114 # cache this session in the global session cache
115 Session.session_cache[self.thread] = self
# Flags the request with id `rid` so that ClientRequest.recv() restores its
# timeout to the originally requested value on its next loop pass.
117 def reset_request_timeout(self, rid):
118 req = self.find_request(rid)
# NOTE(review): one line is not visible here -- presumably a guard for the
# case where find_request() found nothing; confirm.
120 req.reset_timeout = True
def request2(self, method, arr):
    """Create and send a request whose params are given as one sequence.

    ``arr`` is handed to the private request builder unchanged, and that
    builder's result is returned.
    """
    build_and_send = self.__request
    return build_and_send(method, arr)
127 def request(self, method, *args):
128 """Creates a new request and sends the request to the server using a variable argument list as params"""
# NOTE(review): the line that binds `arr` is not visible in this listing --
# presumably it is built from *args; confirm.
130 return self.__request(method, arr)
132 def __request(self, method, arr):
133 """Builds the request object and sends it."""
# When the session is not in the CONNECTED state, go back to the original
# remote id before addressing the request.
134 if self.state != OSRF_APP_SESSION_CONNECTED:
135 self.reset_remote_id()
137 osrf.log.log_debug("Sending request %s -> %s " % (self.service, method))
# The request is cached under str(self.next_id); find_request() looks it up
# with the same str() conversion.
138 req = ClientRequest(self, self.next_id, method, arr, self.locale)
139 self.requests[str(self.next_id)] = req
# NOTE(review): the rest of this method is not visible in this listing
# (presumably advancing next_id, sending, and returning req) -- confirm.
145 def connect(self, timeout=10):
146 """Connects to a remote service"""
# NOTE(review): the statement under this `if` is not visible in this listing
# (presumably an early return when already connected).
148 if self.state == OSRF_APP_SESSION_CONNECTED:
150 self.state = OSRF_APP_SESSION_CONNECTING
152 # construct and send a CONNECT message
# NOTE(review): the call that sends this message, and the other message
# fields, are not visible in this listing.
154 osrf.net_obj.NetworkObject.osrfMessage(
156 'type' : OSRF_MESSAGE_TYPE_CONNECT
# Loop until the state becomes CONNECTED or the remaining timeout goes negative.
161 while timeout >= 0 and not self.state == OSRF_APP_SESSION_CONNECTED:
# NOTE(review): `start` is assigned on a line not visible here. If it is set
# once before the loop rather than on every pass, this subtraction counts
# earlier waiting time repeatedly -- confirm.
164 timeout -= time.time() - start
166 if self.state != OSRF_APP_SESSION_CONNECTED:
# NOTE(review): osrf.ex is not among the osrf modules imported at the top of
# this file as shown -- confirm it is importable at this point.
167 raise osrf.ex.OSRFServiceException("Unable to connect to " + self.service)
171 def disconnect(self):
172 """Disconnects from a remote service"""
# NOTE(review): the statement under this `if` is not visible in this listing
# (presumably an early return when already disconnected).
174 if self.state == OSRF_APP_SESSION_DISCONNECTED:
# A DISCONNECT-type message is built here.
# NOTE(review): the call that sends it and its other fields are not visible.
178 osrf.net_obj.NetworkObject.osrfMessage(
180 'type' : OSRF_MESSAGE_TYPE_DISCONNECT
# The local state is set to DISCONNECTED; no wait for a reply is visible here.
185 self.state = OSRF_APP_SESSION_DISCONNECTED
def reset_remote_id(self):
    """Restore ``remote_id`` from ``orig_remote_id`` and log the result."""
    original = self.orig_remote_id
    self.remote_id = original
    osrf.log.log_internal("Resetting remote ID to %s" % self.remote_id)
194 def push_response_queue(self, message):
195 """Pushes the message payload onto the response queue
196 for the request associated with the message's ID."""
197 osrf.log.log_debug("pushing %s" % message.payload())
# The message's threadTrace is used as the request id for the lookup.
# NOTE(review): the try/except lines around the next two statements are not
# visible in this listing; `e` below is presumably the caught exception.
199 self.find_request(message.threadTrace()).push_response(message.payload())
201 osrf.log.log_warn("pushing respond to non-existent request %s : %s" % (message.threadTrace(), e))
203 def find_request(self, rid):
204 """Returns the original request matching this message's threadTrace."""
# Requests are keyed by the string form of their id, so rid may be passed as
# an int or a str.
206 return self.requests[str(rid)]
# NOTE(review): the try/except lines are not visible in this listing. The log
# call below presumably runs when the lookup fails; what is returned in that
# case cannot be seen here.
208 osrf.log.log_debug('find_request(): non-existent request %s' % str(rid))
# Convenience wrapper: opens a ClientSession for `service` and issues a single
# request for `method` with *args as the parameter list.
212 def atomic_request(service, method, *args):
213 ses = ClientSession(service)
214 req = ses.request2(method, list(args))
# NOTE(review): `resp` is bound on a line not visible in this listing
# (presumably from req.recv()); the remainder of the function, including any
# cleanup and the return value, is also not visible.
218 data = resp.content()
# Common state for ClientRequest and ServerRequest (both subclass it in this file).
226 class Request(object):
# NOTE(review): `params=[]` is a mutable default; the same list object is
# stored on every instance created without params (see self.params below).
227 def __init__(self, session, rid, method=None, params=[], locale='en-US'):
228 self.session = session # my session handle
229 self.rid = rid # my unique request ID
230 self.method = method # method name
231 self.params = params # my method params
# NOTE(review): one line is not visible here; subclasses read self.locale, so
# presumably `locale` is stored at this point -- confirm.
233 self.complete = False # is this request done?
234 self.complete_time = 0 # time at which the request was completed
237 class ClientRequest(Request):
238 """Represents a single OpenSRF request.
239 A request is made and any resulting responses are
240 collected for the client."""
def __init__(self, session, rid, method=None, params=None, locale='en-US'):
    """Set up a client-side request and its response bookkeeping.

    session -- the session this request is sent through
    rid     -- request id, unique within the session
    method  -- name of the remote method
    params  -- list of method parameters; a new empty list when omitted
    locale  -- locale passed along with the request
    """
    # A literal [] default is created once at definition time and would be
    # shared by every request built without params, so build it per call.
    if params is None:
        params = []
    Request.__init__(self, session, rid, method, params, locale)
    self.queue = []  # response queue
    self.reset_timeout = False  # resets the recv timeout?
    self.send_time = 0  # local time the request was put on the wire
    # local time of the first response; stays 0 until recv() sees one
    self.first_response_time = 0
# NOTE(review): the `def` line for this method is not visible in this listing;
# several closing-bracket lines and one dict entry are also not visible.
250 """Sends a request message"""
252 # construct the method object message with params and method name
253 method = osrf.net_obj.NetworkObject.osrfMethod( {
254 'method' : self.method,
255 'params' : self.params
258 # construct the osrf message with our method message embedded
# The request id travels as the message's threadTrace; responses are matched
# back to this request through that value.
259 message = osrf.net_obj.NetworkObject.osrfMessage( {
260 'threadTrace' : self.rid,
261 'type' : OSRF_MESSAGE_TYPE_REQUEST,
# NOTE(review): the entry that embeds `method` (presumably the payload) is not
# visible in this listing.
263 'locale' : self.locale
# send_time is stamped just before sending; recv() subtracts it when logging
# elapsed times.
266 self.send_time = time.time()
267 self.session.send(message)
269 def recv(self, timeout=120):
270 """ Waits up to <timeout> seconds for a response to this request.
272 If a message is received in time, the response message is returned.
273 Returns None otherwise."""
# Keep the caller's value: it is what a timeout reset restores, and a negative
# value keeps the loop below running regardless of the countdown.
277 orig_timeout = timeout
# Keep waiting while the request is not complete, nothing is queued, and
# either time remains or the caller passed a negative timeout.
278 while not self.complete and (timeout >= 0 or orig_timeout < 0) and len(self.queue) == 0:
281 self.session.wait(timeout)
# reset_timeout is set by ClientSession.reset_request_timeout(); consume the
# flag and restart the countdown from the original value.
283 if self.reset_timeout:
284 self.reset_timeout = False
285 timeout = orig_timeout
# NOTE(review): `s` is bound on a line not visible in this listing --
# presumably the time taken just before wait(); confirm.
287 elif orig_timeout >= 0:
288 timeout -= time.time() - s
292 # -----------------------------------------------------------------
293 # log some statistics
# NOTE(review): `now` is bound on a line not visible in this listing, and the
# condition guarding the "complete" statistics is also not visible.
294 if len(self.queue) > 0:
295 if not self.first_response_time:
296 self.first_response_time = now
297 osrf.log.log_debug("time elapsed before first response: %f" \
298 % (self.first_response_time - self.send_time))
301 if not self.complete_time:
302 self.complete_time = now
303 osrf.log.log_debug("time elapsed before complete: %f" \
304 % (self.complete_time - self.send_time))
305 # -----------------------------------------------------------------
# Responses are handed out oldest first.
308 if len(self.queue) > 0:
309 # we have a response, return it
310 return self.queue.pop(0)
# NOTE(review): the fall-through return is not visible in this listing; the
# docstring says None is returned when nothing arrived.
def push_response(self, content):
    """Append ``content`` to the end of this request's response queue."""
    pending = self.queue
    pending.append(content)
# NOTE(review): the `def` line for this method is not visible in this listing.
319 """Cleans up request data from the cache.
321 Do this when you are done with a request to prevent "leaked" cache memory."""
# Removes this request from the owning session's request cache, using the same
# str(rid) key it was stored under. Plain `del` by key: if `requests` is a
# dict (presumably), a second call raises KeyError.
322 del self.session.requests[str(self.rid)]
324 def set_complete(self):
325 """Sets me as complete. This means the server has sent a 'request complete' message"""
# NOTE(review): the body is not visible in this listing -- presumably it sets
# self.complete, which recv() tests in its loop condition; confirm.
# Session.find_or_create() constructs one of these when no session is cached
# for a thread.
329 class ServerSession(Session):
330 """Implements a server-side session"""
332 def __init__(self, thread):
333 Session.__init__(self)
# NOTE(review): any further statements are not visible in this listing.
# `thread` is presumably stored, and send_status() reads self.locale, which is
# not set anywhere visible here -- confirm.
# Builds a STATUS-type osrfMessage for request `thread_trace`.
# NOTE(review): the enclosing send call, the entry that carries `payload`, and
# the closing brackets are not visible in this listing.
336 def send_status(self, thread_trace, payload):
338 osrf.net_obj.NetworkObject.osrfMessage(
339 { 'threadTrace' : thread_trace,
340 'type' : osrf.const.OSRF_MESSAGE_TYPE_STATUS,
342 'locale' : self.locale
# Sends an osrfConnectStatus with status code OSRF_STATUS_OK for request
# `thread_trace`, via send_status().
347 def send_connect_ok(self, thread_trace):
348 status_msg = osrf.net_obj.NetworkObject.osrfConnectStatus({
349 'status' : 'Connection Successful',
350 'statusCode': osrf.const.OSRF_STATUS_OK
# NOTE(review): the closing bracket line of the call above is not visible in
# this listing.
352 self.send_status(thread_trace, status_msg)
# Server-side counterpart of ClientRequest: builds result and status messages
# and sends them through its session.
355 class ServerRequest(Request):
def __init__(self, session, rid, method, params=None):
    """Set up a server-side request.

    session -- the session the request arrived on; its locale is reused
    rid     -- request id, echoed back as the threadTrace of replies
    method  -- the method being served; respond() reads its ``atomic`` flag
    params  -- list of method parameters; a new empty list when omitted
    """
    # A literal [] default is created once at definition time and would be
    # shared by every request built without params, so build it per call.
    if params is None:
        params = []
    Request.__init__(self, session, rid, method, params, session.locale)
    # responses collected for atomic calls until respond_complete()
    self.response_list = []
# Wraps `data` in an osrfResult with status code OSRF_STATUS_OK and returns a
# RESULT-type osrfMessage for this request.
# NOTE(review): the entries that carry `data` and embed `result` (presumably
# 'content' and 'payload'), plus closing brackets, are not visible here.
361 def _build_response_msg(self, data):
362 result = osrf.net_obj.NetworkObject.osrfResult({
364 'statusCode' : osrf.const.OSRF_STATUS_OK,
368 return osrf.net_obj.NetworkObject.osrfMessage({
369 'threadTrace' : self.rid,
370 'type' : OSRF_MESSAGE_TYPE_RESULT,
372 'locale' : self.locale
# Returns a STATUS-type osrfMessage announcing 'Request Complete' with status
# code OSRF_STATUS_COMPLETE for this request.
375 def _build_complete_msg(self):
# NOTE(review): 'threadTrace' is passed here, but the osrfConnectStatus hint
# registered at the top of this file lists only 'status' and 'statusCode' --
# confirm whether the extra key is intended.
377 status = osrf.net_obj.NetworkObject.osrfConnectStatus({
378 'threadTrace' : self.rid,
379 'status' : 'Request Complete',
380 'statusCode': osrf.const.OSRF_STATUS_COMPLETE
# NOTE(review): the entry that embeds `status` in the message (presumably the
# payload) and the closing brackets are not visible in this listing.
383 return osrf.net_obj.NetworkObject.osrfMessage({
384 'threadTrace' : self.rid,
385 'type' : OSRF_MESSAGE_TYPE_STATUS,
387 'locale' : self.locale
390 def respond(self, data):
391 ''' For non-atomic calls, this sends a response directly back
392 to the client. For atomic calls, this pushes the response
393 onto the response list '''
394 osrf.log.log_internal("responding with %s" % str(data))
# self.method is used as an object with an `atomic` attribute here, not as a
# plain method name.
395 if self.method.atomic:
396 self.response_list.append(data)
# NOTE(review): one line is not visible here -- per the docstring the send
# below is the non-atomic branch, so presumably an `else:`; confirm.
398 self.session.send(self._build_response_msg(data))
# Three visible outcomes: atomic calls reply with the collected response list
# plus a complete message; non-atomic calls with data reply with that data plus
# a complete message; otherwise only the complete message is sent.
# NOTE(review): several lines (guards, the send calls wrapping the two message
# lists, and the final `else:`) are not visible in this listing.
400 def respond_complete(self, data):
401 ''' Sends a complete message accompanied by the final result if applicable '''
406 self.complete_time = time.time()
408 if self.method.atomic:
# NOTE(review): the condition guarding this append is not visible here.
410 self.response_list.append(data)
412 self._build_response_msg(self.response_list),
413 self._build_complete_msg(),
416 elif data is not None:
418 self._build_response_msg(data),
419 self._build_complete_msg(),
423 self.session.send(self._build_complete_msg())
# Tracks several in-flight requests, each wrapped in a Container, in
# self.reqs; request() adds one and recv() returns (id, content) tuples.
# NOTE(review): the end of the class docstring, most of Container.__init__ and
# MultiSession.__init__ are not visible in this listing; request() relies on
# self.reqs already being a list.
426 class MultiSession(object):
427 ''' Manages multiple requests. With the current implementation, a 1 second
428 lag time before the first response is practically guaranteed. Use
429 only for long running requests.
431 Another approach would be a threaded version, but that would require
432 build-up and breakdown of thread-specific xmpp connections somewhere.
435 class Container(object):
436 def __init__(self, req):
441 self.complete = False
def request(self, service, method, *args):
    """Start a request on a new ClientSession and track it.

    service -- name of the service to send the request to
    method  -- name of the remote method
    args    -- positional parameters for the method

    Returns the id assigned to the request. recv() reports each response
    as an (id, content) tuple using this same id.
    """
    ses = ClientSession(service)
    cont = MultiSession.Container(ses.request(method, *args))
    # Ids come from a counter that only grows. recv() removes completed
    # requests from self.reqs, so an id derived from len(self.reqs) could be
    # handed out again while an earlier request with that id is still
    # pending. The counter starts at the current length, so ids are the
    # same as before until something has been removed.
    cont.id = getattr(self, '_next_req_id', len(self.reqs))
    self._next_req_id = cont.id + 1
    self.reqs.append(cont)
    return cont.id
450 def recv(self, timeout=120):
451 ''' Returns a tuple of req_id, response '''
# NOTE(review): the outer loop, the initial values of `duration` and
# `block_time`, and the lines that bind `cont`, `req` and `res` on each pass
# are not visible in this listing.
455 for i in range(0, len(self.reqs)):
# Only the first request in the list gets a blocking recv of block_time.
460 if i == 0 and not res:
461 res = req.recv(block_time)
# The elapsed time is advanced by block_time once per sweep.
# NOTE(review): the statement executed once the timeout is reached is not
# visible in this listing.
467 duration += block_time
468 if duration >= timeout:
# NOTE(review): the condition guarding this removal is not visible here
# (presumably the request having completed).
472 self.reqs.pop(self.reqs.index(cont))
474 if len(self.reqs) == 0:
# Returns the container's id with the content of the response object.
477 return cont.id, res.content()