1 # -----------------------------------------------------------------------
2 # Copyright (C) 2007 Georgia Public Library Service
3 # Bill Erickson <billserickson@gmail.com>
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 # -----------------------------------------------------------------------
21 from osrf.const import OSRF_APP_SESSION_CONNECTED, \
22 OSRF_APP_SESSION_CONNECTING, OSRF_APP_SESSION_DISCONNECTED, \
23 OSRF_MESSAGE_TYPE_CONNECT, OSRF_MESSAGE_TYPE_DISCONNECT, \
24 OSRF_MESSAGE_TYPE_REQUEST
26 import random, os, time, threading
29 # -----------------------------------------------------------------------
30 # Go ahead and register the common network objects
31 # -----------------------------------------------------------------------
32 osrf.net_obj.register_hint('osrfMessage', ['threadTrace', 'locale', 'type', 'payload'], 'hash')
33 osrf.net_obj.register_hint('osrfMethod', ['method', 'params'], 'hash')
34 osrf.net_obj.register_hint('osrfResult', ['status', 'statusCode', 'content'], 'hash')
35 osrf.net_obj.register_hint('osrfConnectStatus', ['status', 'statusCode'], 'hash')
36 osrf.net_obj.register_hint('osrfMethodException', ['status', 'statusCode'], 'hash')
39 class Session(object):
40 """Abstract session superclass."""
42 ''' Global cache of in-service sessions '''
46 # by default, we're connected to no one
47 self.state = OSRF_APP_SESSION_DISCONNECTED
51 def find_session(threadTrace):
52 return Session.session_cache.get(threadTrace)
53 find_session = staticmethod(find_session)
55 def wait(self, timeout=120):
56 """Wait up to <timeout> seconds for data to arrive on the network"""
57 osrf.log.log_internal("Session.wait(%d)" % timeout)
58 handle = osrf.net.get_network_handle()
61 def send(self, omessage):
62 """Sends an OpenSRF message"""
63 net_msg = osrf.net.NetworkMessage(
64 recipient = self.remote_id,
65 body = osrf.json.to_json([omessage]),
70 handle = osrf.net.get_network_handle()
74 """Removes the session from the global session cache."""
75 del Session.session_cache[self.thread]
77 class ClientSession(Session):
78 """Client session object. Use this to make server requests."""
80 def __init__(self, service, locale='en_US'):
82 # call superclass constructor
83 Session.__init__(self)
85 # the remote service we want to make requests of
86 self.service = service
88 # the locale we want requests to be returned in
91 # find the remote service handle <router>@<domain>/<service>
92 domain = osrf.conf.get('domains.domain', 0)
93 router = osrf.conf.get('router_name')
94 self.remote_id = "%s@%s/%s" % (router, domain, service)
95 self.orig_remote_id = self.remote_id
97 # generate a random message thread
98 self.thread = "%s%s%s%s" % (os.getpid(),
99 str(random.randint(100,100000)), str(time.time()),threading.currentThread().getName().lower())
101 # how many requests this session has taken part in
104 # cache of request objects
107 # cache this session in the global session cache
108 Session.session_cache[self.thread] = self
110 def reset_request_timeout(self, rid):
111 req = self.find_request(rid)
113 req.reset_timeout = True
116 def request2(self, method, arr):
117 """Creates a new request and sends the request to the server using a python array as the params."""
118 return self.__request(method, arr)
120 def request(self, method, *args):
121 """Creates a new request and sends the request to the server using a variable argument list as params"""
123 return self.__request(method, arr)
125 def __request(self, method, arr):
126 """Builds the request object and sends it."""
127 if self.state != OSRF_APP_SESSION_CONNECTED:
128 self.reset_remote_id()
130 osrf.log.log_debug("Sending request %s -> %s " % (self.service, method))
131 req = Request(self, self.next_id, method, arr, self.locale)
132 self.requests[str(self.next_id)] = req
138 def connect(self, timeout=10):
139 """Connects to a remote service"""
141 if self.state == OSRF_APP_SESSION_CONNECTED:
143 self.state == OSRF_APP_SESSION_CONNECTING
145 # construct and send a CONNECT message
147 osrf.net_obj.NetworkObject.osrfMessage(
149 'type' : OSRF_MESSAGE_TYPE_CONNECT
154 while timeout >= 0 and not self.state == OSRF_APP_SESSION_CONNECTED:
157 timeout -= time.time() - start
159 if self.state != OSRF_APP_SESSION_CONNECTED:
160 raise osrf.ex.OSRFServiceException("Unable to connect to " + self.service)
164 def disconnect(self):
165 """Disconnects from a remote service"""
167 if self.state == OSRF_APP_SESSION_DISCONNECTED:
171 osrf.net_obj.NetworkObject.osrfMessage(
173 'type' : OSRF_MESSAGE_TYPE_DISCONNECT
178 self.state = OSRF_APP_SESSION_DISCONNECTED
181 def set_remote_id(self, remoteid):
182 self.remote_id = remoteid
183 osrf.log.log_internal("Setting request remote ID to %s" % self.remote_id)
185 def reset_remote_id(self):
186 """Recovers the original remote id"""
187 self.remote_id = self.orig_remote_id
188 osrf.log.log_internal("Resetting remote ID to %s" % self.remote_id)
190 def push_response_queue(self, message):
191 """Pushes the message payload onto the response queue
192 for the request associated with the message's ID."""
193 osrf.log.log_debug("pushing %s" % message.payload())
195 self.find_request(message.threadTrace()).pushResponse(message.payload())
197 osrf.log.log_warn("pushing respond to non-existent request %s : %s" % (message.threadTrace(), e))
199 def find_request(self, rid):
200 """Returns the original request matching this message's threadTrace."""
202 return self.requests[str(rid)]
204 osrf.log.log_debug('find_request(): non-existent request %s' % str(rid))
209 class Request(object):
210 """Represents a single OpenSRF request.
211 A request is made and any resulting respones are
212 collected for the client."""
214 def __init__(self, session, rid, method=None, params=[], locale='en-US'):
216 self.session = session # my session handle
217 self.rid = rid # my unique request ID
218 self.method = method # method name
219 self.params = params # my method params
220 self.queue = [] # response queue
221 self.reset_timeout = False # resets the recv timeout?
222 self.complete = False # has the server told us this request is done?
223 self.send_time = 0 # local time the request was put on the wire
224 self.complete_time = 0 # time the server told us the request was completed
225 self.first_response_time = 0 # time it took for our first reponse to be received
229 """Sends a request message"""
231 # construct the method object message with params and method name
232 method = osrf.net_obj.NetworkObject.osrfMethod( {
233 'method' : self.method,
234 'params' : self.params
237 # construct the osrf message with our method message embedded
238 message = osrf.net_obj.NetworkObject.osrfMessage( {
239 'threadTrace' : self.rid,
240 'type' : OSRF_MESSAGE_TYPE_REQUEST,
242 'locale' : self.locale
245 self.send_time = time.time()
246 self.session.send(message)
248 def recv(self, timeout=120):
249 """Waits up to <timeout> seconds for a response to this request.
251 If a message is received in time, the response message is returned.
252 Returns None otherwise."""
256 orig_timeout = timeout
257 while not self.complete and timeout >= 0 and len(self.queue) == 0:
259 self.session.wait(timeout)
260 timeout -= time.time() - s
261 if self.reset_timeout:
262 self.reset_timeout = False
263 timeout = orig_timeout
267 # -----------------------------------------------------------------
268 # log some statistics
269 if len(self.queue) > 0:
270 if not self.first_response_time:
271 self.first_response_time = now
272 osrf.log.log_debug("time elapsed before first response: %f" \
273 % (self.first_response_time - self.send_time))
276 if not self.complete_time:
277 self.complete_time = now
278 osrf.log.log_debug("time elapsed before complete: %f" \
279 % (self.complete_time - self.send_time))
280 # -----------------------------------------------------------------
283 if len(self.queue) > 0:
284 # we have a reponse, return it
285 return self.queue.pop(0)
289 def pushResponse(self, content):
290 """Pushes a method response onto this requests response queue."""
291 self.queue.append(content)
294 """Cleans up request data from the cache.
296 Do this when you are done with a request to prevent "leaked" cache memory."""
297 del self.session.requests[str(self.rid)]
299 def set_complete(self):
300 """Sets me as complete. This means the server has sent a 'request complete' message"""
304 class ServerSession(Session):
305 """Implements a server-side session"""
309 def AtomicRequest(service, method, *args):
310 ses = ClientSession(service)
311 req = ses.request2(method, list(args))
313 data = resp.content()