]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/python/osrf/ses.py
011b143fb8a290dd31a1b9b537c9be0769655cae
[OpenSRF.git] / src / python / osrf / ses.py
1 # -----------------------------------------------------------------------
2 # Copyright (C) 2007  Georgia Public Library Service
3 # Bill Erickson <billserickson@gmail.com>
4
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 # -----------------------------------------------------------------------
15
16 import osrf.json
17 import osrf.conf
18 import osrf.log
19 import osrf.net
20 import osrf.net_obj
21 from osrf.const import OSRF_APP_SESSION_CONNECTED, \
22     OSRF_APP_SESSION_CONNECTING, OSRF_APP_SESSION_DISCONNECTED, \
23     OSRF_MESSAGE_TYPE_CONNECT, OSRF_MESSAGE_TYPE_DISCONNECT, \
24     OSRF_MESSAGE_TYPE_REQUEST
25 import osrf.ex
26 import random, os, time, threading
27
28
29 # -----------------------------------------------------------------------
30 # Go ahead and register the common network objects
31 # -----------------------------------------------------------------------
32 osrf.net_obj.register_hint('osrfMessage', ['threadTrace', 'locale', 'type', 'payload'], 'hash')
33 osrf.net_obj.register_hint('osrfMethod', ['method', 'params'], 'hash')
34 osrf.net_obj.register_hint('osrfResult', ['status', 'statusCode', 'content'], 'hash')
35 osrf.net_obj.register_hint('osrfConnectStatus', ['status', 'statusCode'], 'hash')
36 osrf.net_obj.register_hint('osrfMethodException', ['status', 'statusCode'], 'hash')
37
38
39 class Session(object):
40     """Abstract session superclass."""
41
42     ''' Global cache of in-service sessions '''
43     session_cache = {}
44
45     def __init__(self):
46         # by default, we're connected to no one
47         self.state = OSRF_APP_SESSION_DISCONNECTED
48         self.remote_id = None
49         self.locale = None
50
51     def find_session(threadTrace):
52         return Session.session_cache.get(threadTrace)
53     find_session = staticmethod(find_session)
54
55     def wait(self, timeout=120):
56         """Wait up to <timeout> seconds for data to arrive on the network"""
57         osrf.log.log_internal("Session.wait(%d)" % timeout)
58         handle = osrf.net.get_network_handle()
59         handle.recv(timeout)
60
61     def send(self, omessage):
62         """Sends an OpenSRF message"""
63         net_msg = osrf.net.NetworkMessage(
64             recipient      = self.remote_id,
65             body    = osrf.json.to_json([omessage]),
66             thread = self.thread,
67             locale = self.locale,
68         )
69
70         handle = osrf.net.get_network_handle()
71         handle.send(net_msg)
72
73     def cleanup(self):
74         """Removes the session from the global session cache."""
75         del Session.session_cache[self.thread]
76
77 class ClientSession(Session):
78     """Client session object.  Use this to make server requests."""
79
80     def __init__(self, service, locale='en_US'):
81         
82         # call superclass constructor
83         Session.__init__(self)
84
85         # the remote service we want to make requests of
86         self.service = service
87
88         # the locale we want requests to be returned in
89         self.locale = locale
90
91         # find the remote service handle <router>@<domain>/<service>
92         domain = osrf.conf.get('domain', 0)
93         router = osrf.conf.get('router_name')
94         self.remote_id = "%s@%s/%s" % (router, domain, service)
95         self.orig_remote_id = self.remote_id
96
97         # generate a random message thread
98         self.thread = "%s%s%s%s" % (os.getpid(), 
99             str(random.randint(100,100000)), str(time.time()),threading.currentThread().getName().lower())
100
101         # how many requests this session has taken part in
102         self.next_id = 0 
103
104         # cache of request objects 
105         self.requests = {}
106
107         # cache this session in the global session cache
108         Session.session_cache[self.thread] = self
109
110     def reset_request_timeout(self, rid):
111         req = self.find_request(rid)
112         if req:
113             req.reset_timeout = True
114             
115
116     def request2(self, method, arr):
117         """Creates a new request and sends the request to the server using a python array as the params."""
118         return self.__request(method, arr)
119
120     def request(self, method, *args):
121         """Creates a new request and sends the request to the server using a variable argument list as params"""
122         arr = list(args)
123         return self.__request(method, arr)
124
125     def __request(self, method, arr):
126         """Builds the request object and sends it."""
127         if self.state != OSRF_APP_SESSION_CONNECTED:
128             self.reset_remote_id()
129
130         osrf.log.log_debug("Sending request %s -> %s " % (self.service, method))
131         req = Request(self, self.next_id, method, arr, self.locale)
132         self.requests[str(self.next_id)] = req
133         self.next_id += 1
134         req.send()
135         return req
136
137
138     def connect(self, timeout=10):
139         """Connects to a remote service"""
140
141         if self.state == OSRF_APP_SESSION_CONNECTED:
142             return True
143         self.state == OSRF_APP_SESSION_CONNECTING
144
145         # construct and send a CONNECT message
146         self.send(
147             osrf.net_obj.NetworkObject.osrfMessage( 
148                 {   'threadTrace' : 0,
149                     'type' : OSRF_MESSAGE_TYPE_CONNECT
150                 } 
151             )
152         )
153
154         while timeout >= 0 and not self.state == OSRF_APP_SESSION_CONNECTED:
155             start = time.time()
156             self.wait(timeout)
157             timeout -= time.time() - start
158         
159         if self.state != OSRF_APP_SESSION_CONNECTED:
160             raise osrf.ex.OSRFServiceException("Unable to connect to " + self.service)
161         
162         return True
163
164     def disconnect(self):
165         """Disconnects from a remote service"""
166
167         if self.state == OSRF_APP_SESSION_DISCONNECTED:
168             return True
169
170         self.send(
171             osrf.net_obj.NetworkObject.osrfMessage( 
172                 {   'threadTrace' : 0,
173                     'type' : OSRF_MESSAGE_TYPE_DISCONNECT
174                 } 
175             )
176         )
177
178         self.state = OSRF_APP_SESSION_DISCONNECTED
179
180     
181     def set_remote_id(self, remoteid):
182         self.remote_id = remoteid
183         osrf.log.log_internal("Setting request remote ID to %s" % self.remote_id)
184
185     def reset_remote_id(self):
186         """Recovers the original remote id"""
187         self.remote_id = self.orig_remote_id
188         osrf.log.log_internal("Resetting remote ID to %s" % self.remote_id)
189
190     def push_response_queue(self, message):
191         """Pushes the message payload onto the response queue 
192             for the request associated with the message's ID."""
193         osrf.log.log_debug("pushing %s" % message.payload())
194         try:
195             self.find_request(message.threadTrace()).push_response(message.payload())
196         except Exception, e: 
197             osrf.log.log_warn("pushing respond to non-existent request %s : %s" % (message.threadTrace(), e))
198
199     def find_request(self, rid):
200         """Returns the original request matching this message's threadTrace."""
201         try:
202             return self.requests[str(rid)]
203         except KeyError:
204             osrf.log.log_debug('find_request(): non-existent request %s' % str(rid))
205             return None
206
207     @staticmethod
208     def atomic_request(service, method, *args):
209         ses = ClientSession(service)
210         req = ses.request2(method, list(args))
211         resp = req.recv()
212         data = None
213         if resp:
214             data = resp.content()
215         req.cleanup()
216         ses.cleanup()
217         return data
218
219
220
221
222 class Request(object):
223     """Represents a single OpenSRF request.
224         A request is made and any resulting respones are 
225         collected for the client."""
226
227     def __init__(self, session, rid, method=None, params=[], locale='en-US'):
228
229         self.session = session # my session handle
230         self.rid     = rid # my unique request ID
231         self.method = method # method name
232         self.params = params # my method params
233         self.queue  = [] # response queue
234         self.reset_timeout = False # resets the recv timeout?
235         self.complete = False # has the server told us this request is done?
236         self.send_time = 0 # local time the request was put on the wire
237         self.complete_time =  0 # time the server told us the request was completed
238         self.first_response_time = 0 # time it took for our first reponse to be received
239         self.locale = locale
240
241     def send(self):
242         """Sends a request message"""
243
244         # construct the method object message with params and method name
245         method = osrf.net_obj.NetworkObject.osrfMethod( {
246             'method' : self.method,
247             'params' : self.params
248         } )
249
250         # construct the osrf message with our method message embedded
251         message = osrf.net_obj.NetworkObject.osrfMessage( {
252             'threadTrace' : self.rid,
253             'type' : OSRF_MESSAGE_TYPE_REQUEST,
254             'payload' : method,
255             'locale' : self.locale
256         } )
257
258         self.send_time = time.time()
259         self.session.send(message)
260
261     def recv(self, timeout=120):
262         """ Waits up to <timeout> seconds for a response to this request.
263         
264             If a message is received in time, the response message is returned.
265             Returns None otherwise."""
266
267         self.session.wait(0)
268
269         orig_timeout = timeout
270         while not self.complete and (timeout >= 0 or orig_timeout < 0) and len(self.queue) == 0:
271             s = time.time()
272             self.session.wait(timeout)
273             if orig_timeout >= 0:
274                 timeout -= time.time() - s
275             if self.reset_timeout:
276                 self.reset_timeout = False
277                 timeout = orig_timeout
278
279         now = time.time()
280
281         # -----------------------------------------------------------------
282         # log some statistics 
283         if len(self.queue) > 0:
284             if not self.first_response_time:
285                 self.first_response_time = now
286                 osrf.log.log_debug("time elapsed before first response: %f" \
287                     % (self.first_response_time - self.send_time))
288
289         if self.complete:
290             if not self.complete_time:
291                 self.complete_time = now
292                 osrf.log.log_debug("time elapsed before complete: %f" \
293                     % (self.complete_time - self.send_time))
294         # -----------------------------------------------------------------
295
296
297         if len(self.queue) > 0:
298             # we have a reponse, return it
299             return self.queue.pop(0)
300
301         return None
302
303     def push_response(self, content):
304         """Pushes a method response onto this requests response queue."""
305         self.queue.append(content)
306
307     def cleanup(self):
308         """Cleans up request data from the cache. 
309
310             Do this when you are done with a request to prevent "leaked" cache memory."""
311         del self.session.requests[str(self.rid)]
312
313     def set_complete(self):
314         """Sets me as complete.  This means the server has sent a 'request complete' message"""
315         self.complete = True
316
317
318 class ServerSession(Session):
319     """Implements a server-side session"""
320     pass
321
322
323
324
325
326 class MultiSession(object):
327     ''' Manages multiple requests.  With the current implementation, a 1 second 
328         lag time before the first response is practically guaranteed.  Use 
329         only for long running requests.
330
331         Another approach would be a threaded version, but that would require
332         build-up and breakdown of thread-specific xmpp connections somewhere.
333         conection pooling? 
334     '''
335     class Container(object):
336         def __init__(self, req):
337             self.req = req
338             self.id = None
339
340     def __init__(self):
341         self.complete = False
342         self.reqs = []
343
344     def request(self, service, method, *args):
345         ses = ClientSession(service)
346         cont = MultiSession.Container(ses.request(method, *args))
347         cont.id = len(self.reqs)
348         self.reqs.append(cont)
349
350     def recv(self, timeout=10):
351         ''' Returns a tuple of req_id, response '''
352         duration = 0
353         block_time = 1
354         while True:
355             for i in range(0, len(self.reqs)):
356                 cont = self.reqs[i]
357                 req = cont.req
358
359                 res = req.recv(0)
360                 if i == 0 and not res:
361                     res = req.recv(block_time)
362
363                 if res: break
364
365             if res: break
366
367             duration += block_time
368             if duration >= timeout:
369                 return None
370
371         if req.complete:
372             self.reqs.pop(self.reqs.index(cont))
373
374         if len(self.reqs) == 0:
375             self.complete = True
376
377         return cont.id, res.content()
378