]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/python/osrf/ses.py
799b1cf9975d580a382bb5b54170d6da34124248
[OpenSRF.git] / src / python / osrf / ses.py
1 # -----------------------------------------------------------------------
2 # Copyright (C) 2007  Georgia Public Library Service
3 # Bill Erickson <billserickson@gmail.com>
4
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 # -----------------------------------------------------------------------
15
16 import osrf.json, osrf.conf, osrf.log, osrf.net, osrf.net_obj, osrf.const
17 from osrf.const import OSRF_APP_SESSION_CONNECTED, \
18     OSRF_APP_SESSION_CONNECTING, OSRF_APP_SESSION_DISCONNECTED, \
19     OSRF_MESSAGE_TYPE_CONNECT, OSRF_MESSAGE_TYPE_DISCONNECT, \
20     OSRF_MESSAGE_TYPE_REQUEST, OSRF_MESSAGE_TYPE_RESULT, OSRF_MESSAGE_TYPE_STATUS
21 import osrf.ex
22 import random, os, time, threading
23
24
25 # -----------------------------------------------------------------------
26 # Go ahead and register the common network objects
27 # -----------------------------------------------------------------------
28 osrf.net_obj.register_hint('osrfMessage', ['threadTrace', 'locale', 'type', 'payload'], 'hash')
29 osrf.net_obj.register_hint('osrfMethod', ['method', 'params'], 'hash')
30 osrf.net_obj.register_hint('osrfResult', ['status', 'statusCode', 'content'], 'hash')
31 osrf.net_obj.register_hint('osrfConnectStatus', ['status', 'statusCode'], 'hash')
32 osrf.net_obj.register_hint('osrfMethodException', ['status', 'statusCode'], 'hash')
33
34
35 class Session(object):
36     """Abstract session superclass."""
37
38     ''' Global cache of in-service sessions '''
39     session_cache = {}
40
41     def __init__(self):
42         # by default, we're connected to no one
43         self.state = OSRF_APP_SESSION_DISCONNECTED
44         self.remote_id = None
45         self.locale = None
46         self.thread = None
47         self.service = None
48
49     @staticmethod
50     def find_or_create(thread):
51         if thread in Session.session_cache:
52             return Session.session_cache[thread]
53         return ServerSession(thread)
54
55     def set_remote_id(self, remoteid):
56         self.remote_id = remoteid
57         osrf.log.log_internal("Setting request remote ID to %s" % self.remote_id)
58
59     def wait(self, timeout=120):
60         """Wait up to <timeout> seconds for data to arrive on the network"""
61         osrf.log.log_internal("Session.wait(%d)" % timeout)
62         handle = osrf.net.get_network_handle()
63         handle.recv(timeout)
64
65     def send(self, omessages):
66         """Sends an OpenSRF message"""
67         if not isinstance(omessages, list):
68             omessages = [omessages]
69             
70         net_msg = osrf.net.NetworkMessage(
71             recipient      = self.remote_id,
72             body    = osrf.json.to_json(omessages),
73             thread = self.thread,
74             locale = self.locale,
75         )
76
77         handle = osrf.net.get_network_handle()
78         handle.send(net_msg)
79
80     def cleanup(self):
81         """Removes the session from the global session cache."""
82         del Session.session_cache[self.thread]
83
84 class ClientSession(Session):
85     """Client session object.  Use this to make server requests."""
86
87     def __init__(self, service, locale='en_US'):
88         
89         # call superclass constructor
90         Session.__init__(self)
91
92         # the service we are sending requests to
93         self.service = service
94
95         # the locale we want requests to be returned in
96         self.locale = locale
97
98         # find the remote service handle <router>@<domain>/<service>
99         domain = osrf.conf.get('domain', 0)
100         router = osrf.conf.get('router_name')
101         self.remote_id = "%s@%s/%s" % (router, domain, service)
102         self.orig_remote_id = self.remote_id
103
104         # generate a random message thread
105         self.thread = "%s%s%s%s" % (os.getpid(), 
106             str(random.randint(100,100000)), str(time.time()),threading.currentThread().getName().lower())
107
108         # how many requests this session has taken part in
109         self.next_id = 0 
110
111         # cache of request objects 
112         self.requests = {}
113
114         # cache this session in the global session cache
115         Session.session_cache[self.thread] = self
116
117     def reset_request_timeout(self, rid):
118         req = self.find_request(rid)
119         if req:
120             req.reset_timeout = True
121             
122
123     def request2(self, method, arr):
124         """Creates a new request and sends the request to the server using a python array as the params."""
125         return self.__request(method, arr)
126
127     def request(self, method, *args):
128         """Creates a new request and sends the request to the server using a variable argument list as params"""
129         arr = list(args)
130         return self.__request(method, arr)
131
132     def __request(self, method, arr):
133         """Builds the request object and sends it."""
134         if self.state != OSRF_APP_SESSION_CONNECTED:
135             self.reset_remote_id()
136
137         osrf.log.log_debug("Sending request %s -> %s " % (self.service, method))
138         req = ClientRequest(self, self.next_id, method, arr, self.locale)
139         self.requests[str(self.next_id)] = req
140         self.next_id += 1
141         req.send()
142         return req
143
144
145     def connect(self, timeout=10):
146         """Connects to a remote service"""
147
148         if self.state == OSRF_APP_SESSION_CONNECTED:
149             return True
150         self.state = OSRF_APP_SESSION_CONNECTING
151
152         # construct and send a CONNECT message
153         self.send(
154             osrf.net_obj.NetworkObject.osrfMessage( 
155                 {   'threadTrace' : 0,
156                     'type' : OSRF_MESSAGE_TYPE_CONNECT
157                 } 
158             )
159         )
160
161         while timeout >= 0 and not self.state == OSRF_APP_SESSION_CONNECTED:
162             start = time.time()
163             self.wait(timeout)
164             timeout -= time.time() - start
165         
166         if self.state != OSRF_APP_SESSION_CONNECTED:
167             raise osrf.ex.OSRFServiceException("Unable to connect to " + self.service)
168         
169         return True
170
171     def disconnect(self):
172         """Disconnects from a remote service"""
173
174         if self.state == OSRF_APP_SESSION_DISCONNECTED:
175             return True
176
177         self.send(
178             osrf.net_obj.NetworkObject.osrfMessage( 
179                 {   'threadTrace' : 0,
180                     'type' : OSRF_MESSAGE_TYPE_DISCONNECT
181                 } 
182             )
183         )
184
185         self.state = OSRF_APP_SESSION_DISCONNECTED
186
187     
188
189     def reset_remote_id(self):
190         """Recovers the original remote id"""
191         self.remote_id = self.orig_remote_id
192         osrf.log.log_internal("Resetting remote ID to %s" % self.remote_id)
193
194     def push_response_queue(self, message):
195         """Pushes the message payload onto the response queue 
196             for the request associated with the message's ID."""
197         osrf.log.log_debug("pushing %s" % message.payload())
198         try:
199             self.find_request(message.threadTrace()).push_response(message.payload())
200         except Exception, e: 
201             osrf.log.log_warn("pushing respond to non-existent request %s : %s" % (message.threadTrace(), e))
202
203     def find_request(self, rid):
204         """Returns the original request matching this message's threadTrace."""
205         try:
206             return self.requests[str(rid)]
207         except KeyError:
208             osrf.log.log_debug('find_request(): non-existent request %s' % str(rid))
209             return None
210
211     @staticmethod
212     def atomic_request(service, method, *args):
213         ses = ClientSession(service)
214         req = ses.request2(method, list(args))
215         resp = req.recv()
216         data = None
217         if resp:
218             data = resp.content()
219         req.cleanup()
220         ses.cleanup()
221         return data
222
223
224
225
226 class Request(object):
227     def __init__(self, session, rid, method=None, params=[], locale='en-US'):
228         self.session = session # my session handle
229         self.rid     = rid # my unique request ID
230         self.method = method # method name
231         self.params = params # my method params
232         self.locale = locale
233         self.complete = False # is this request done?
234         self.complete_time =  0 # time at which the request was completed
235
236
237 class ClientRequest(Request):
238     """Represents a single OpenSRF request.
239         A request is made and any resulting respones are 
240         collected for the client."""
241
242     def __init__(self, session, rid, method=None, params=[], locale='en-US'):
243         Request.__init__(self, session, rid, method, params, locale)
244         self.queue  = [] # response queue
245         self.reset_timeout = False # resets the recv timeout?
246         self.send_time = 0 # local time the request was put on the wire
247         self.first_response_time = 0 # time it took for our first reponse to be received
248
249     def send(self):
250         """Sends a request message"""
251
252         # construct the method object message with params and method name
253         method = osrf.net_obj.NetworkObject.osrfMethod( {
254             'method' : self.method,
255             'params' : self.params
256         } )
257
258         # construct the osrf message with our method message embedded
259         message = osrf.net_obj.NetworkObject.osrfMessage( {
260             'threadTrace' : self.rid,
261             'type' : OSRF_MESSAGE_TYPE_REQUEST,
262             'payload' : method,
263             'locale' : self.locale
264         } )
265
266         self.send_time = time.time()
267         self.session.send(message)
268
269     def recv(self, timeout=120):
270         """ Waits up to <timeout> seconds for a response to this request.
271         
272             If a message is received in time, the response message is returned.
273             Returns None otherwise."""
274
275         self.session.wait(0)
276
277         orig_timeout = timeout
278         while not self.complete and (timeout >= 0 or orig_timeout < 0) and len(self.queue) == 0:
279
280             s = time.time()
281             self.session.wait(timeout)
282
283             if self.reset_timeout:
284                 self.reset_timeout = False
285                 timeout = orig_timeout
286
287             elif orig_timeout >= 0:
288                 timeout -= time.time() - s
289
290         now = time.time()
291
292         # -----------------------------------------------------------------
293         # log some statistics 
294         if len(self.queue) > 0:
295             if not self.first_response_time:
296                 self.first_response_time = now
297                 osrf.log.log_debug("time elapsed before first response: %f" \
298                     % (self.first_response_time - self.send_time))
299
300         if self.complete:
301             if not self.complete_time:
302                 self.complete_time = now
303                 osrf.log.log_debug("time elapsed before complete: %f" \
304                     % (self.complete_time - self.send_time))
305         # -----------------------------------------------------------------
306
307
308         if len(self.queue) > 0:
309             # we have a reponse, return it
310             return self.queue.pop(0)
311
312         return None
313
314     def push_response(self, content):
315         """Pushes a method response onto this requests response queue."""
316         self.queue.append(content)
317
318     def cleanup(self):
319         """Cleans up request data from the cache. 
320
321             Do this when you are done with a request to prevent "leaked" cache memory."""
322         del self.session.requests[str(self.rid)]
323
324     def set_complete(self):
325         """Sets me as complete.  This means the server has sent a 'request complete' message"""
326         self.complete = True
327
328
329 class ServerSession(Session):
330     """Implements a server-side session"""
331
332     def __init__(self, thread):
333         Session.__init__(self)
334         self.thread = thread
335
336     def send_status(self, thread_trace, payload):
337         self.send(
338             osrf.net_obj.NetworkObject.osrfMessage( 
339                 {   'threadTrace' : thread_trace,
340                     'type' : osrf.const.OSRF_MESSAGE_TYPE_STATUS,
341                     'payload' : payload,
342                     'locale' : self.locale
343                 } 
344             )
345         )
346
347     def send_connect_ok(self, thread_trace):
348         status_msg = osrf.net_obj.NetworkObject.osrfConnectStatus({   
349             'status' : 'Connection Successful',
350             'statusCode': osrf.const.OSRF_STATUS_OK
351         })
352         self.send_status(thread_trace, status_msg)
353
354
355 class ServerRequest(Request):
356
357     def __init__(self, session, rid, method, params=[]):
358         Request.__init__(self, session, rid, method, params, session.locale)
359         self.response_list = []
360
361     def _build_response_msg(self, data):
362         result = osrf.net_obj.NetworkObject.osrfResult({
363             'content' :  data,
364             'statusCode' : osrf.const.OSRF_STATUS_OK,
365             'status' : 'OK'
366         })
367
368         return osrf.net_obj.NetworkObject.osrfMessage({
369             'threadTrace' : self.rid,
370             'type' : OSRF_MESSAGE_TYPE_RESULT,
371             'payload' : result,
372             'locale' : self.locale
373         })
374
375     def _build_complete_msg(self):
376
377         status = osrf.net_obj.NetworkObject.osrfConnectStatus({   
378             'threadTrace' : self.rid,
379             'status' : 'Request Complete',
380             'statusCode': osrf.const.OSRF_STATUS_COMPLETE
381         })
382
383         return osrf.net_obj.NetworkObject.osrfMessage({
384             'threadTrace' : self.rid,
385             'type' : OSRF_MESSAGE_TYPE_STATUS,
386             'payload' : status,
387             'locale' : self.locale
388         })
389
390     def respond(self, data):
391         ''' For non-atomic calls, this sends a response directly back
392             to the client.  For atomic calls, this pushes the response
393             onto the response list '''
394         osrf.log.log_internal("responding with %s" % str(data))
395         if self.method.atomic:
396             self.response_list.append(data)
397         else:
398             self.session.send(self._build_response_msg(data))
399
400     def respond_complete(self, data):
401         ''' Sends a complete message accompanied by the final result if applicable '''
402
403         if self.complete: 
404             return
405         self.complete = True
406         self.complete_time = time.time()
407
408         if self.method.atomic:
409             if data is not None:
410                 self.response_list.append(data) 
411             self.session.send([
412                 self._build_response_msg(self.response_list),
413                 self._build_complete_msg(),
414             ])
415
416         elif data is not None:
417             self.session.send([
418                 self._build_response_msg(data),
419                 self._build_complete_msg(),
420             ])
421
422         else:
423             self.session.send(self._build_complete_msg())
424             
425
426 class MultiSession(object):
427     ''' Manages multiple requests.  With the current implementation, a 1 second 
428         lag time before the first response is practically guaranteed.  Use 
429         only for long running requests.
430
431         Another approach would be a threaded version, but that would require
432         build-up and breakdown of thread-specific xmpp connections somewhere.
433         conection pooling? 
434     '''
435     class Container(object):
436         def __init__(self, req):
437             self.req = req
438             self.id = None
439
440     def __init__(self):
441         self.complete = False
442         self.reqs = []
443
444     def request(self, service, method, *args):
445         ses = ClientSession(service)
446         cont = MultiSession.Container(ses.request(method, *args))
447         cont.id = len(self.reqs)
448         self.reqs.append(cont)
449
450     def recv(self, timeout=120):
451         ''' Returns a tuple of req_id, response '''
452         duration = 0
453         block_time = 1
454         while True:
455             for i in range(0, len(self.reqs)):
456                 cont = self.reqs[i]
457                 req = cont.req
458
459                 res = req.recv(0)
460                 if i == 0 and not res:
461                     res = req.recv(block_time)
462
463                 if res: break
464
465             if res: break
466
467             duration += block_time
468             if duration >= timeout:
469                 return None
470
471         if req.complete:
472             self.reqs.pop(self.reqs.index(cont))
473
474         if len(self.reqs) == 0:
475             self.complete = True
476
477         return cont.id, res.content()
478