15e1f10ad32781d89582858709d215d63900c205
[OpenSRF.git] / src / python / osrf / ses.py
1 # -----------------------------------------------------------------------
2 # Copyright (C) 2007  Georgia Public Library Service
3 # Bill Erickson <billserickson@gmail.com>
4
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 # -----------------------------------------------------------------------
15
16 import osrf.json, osrf.conf, osrf.log, osrf.net, osrf.net_obj, osrf.const
17 from osrf.const import OSRF_APP_SESSION_CONNECTED, \
18     OSRF_APP_SESSION_CONNECTING, OSRF_APP_SESSION_DISCONNECTED, \
19     OSRF_MESSAGE_TYPE_CONNECT, OSRF_MESSAGE_TYPE_DISCONNECT, \
20     OSRF_MESSAGE_TYPE_REQUEST, OSRF_MESSAGE_TYPE_RESULT, OSRF_MESSAGE_TYPE_STATUS
21 import osrf.ex
22 import random, os, time, threading
23
24
25 # -----------------------------------------------------------------------
26 # Go ahead and register the common network objects
27 # -----------------------------------------------------------------------
28 osrf.net_obj.register_hint('osrfMessage', ['threadTrace', 'locale', 'type', 'payload'], 'hash')
29 osrf.net_obj.register_hint('osrfMethod', ['method', 'params'], 'hash')
30 osrf.net_obj.register_hint('osrfResult', ['status', 'statusCode', 'content'], 'hash')
31 osrf.net_obj.register_hint('osrfConnectStatus', ['status', 'statusCode'], 'hash')
32 osrf.net_obj.register_hint('osrfMethodException', ['status', 'statusCode'], 'hash')
33
34
35 class Session(object):
36     """Abstract session superclass."""
37
38     ''' Global cache of in-service sessions '''
39     session_cache = {}
40
41     def __init__(self):
42         # by default, we're connected to no one
43         self.state = OSRF_APP_SESSION_DISCONNECTED
44         self.remote_id = None
45         self.locale = None
46         self.thread = None
47         self.service = None
48
49     @staticmethod
50     def find_or_create(thread):
51         if thread in Session.session_cache:
52             return Session.session_cache[thread]
53         return ServerSession(thread)
54
55     def set_remote_id(self, remoteid):
56         self.remote_id = remoteid
57         osrf.log.log_internal("Setting request remote ID to %s" % self.remote_id)
58
59     def wait(self, timeout=120):
60         """Wait up to <timeout> seconds for data to arrive on the network"""
61         osrf.log.log_internal("Session.wait(%d)" % timeout)
62         handle = osrf.net.get_network_handle()
63         return handle.recv(timeout)
64
65     def send(self, omessages):
66         """Sends an OpenSRF message"""
67         if not isinstance(omessages, list):
68             omessages = [omessages]
69             
70         net_msg = osrf.net.NetworkMessage(
71             recipient      = self.remote_id,
72             body    = osrf.json.to_json(omessages),
73             thread = self.thread,
74             locale = self.locale,
75         )
76
77         handle = osrf.net.get_network_handle()
78         handle.send(net_msg)
79
80     def cleanup(self):
81         """Removes the session from the global session cache."""
82         del Session.session_cache[self.thread]
83
84 class ClientSession(Session):
85     """Client session object.  Use this to make server requests."""
86
87     def __init__(self, service, locale='en-US'):
88         
89         # call superclass constructor
90         Session.__init__(self)
91
92         # the service we are sending requests to
93         self.service = service
94
95         # the locale we want requests to be returned in
96         self.locale = locale
97
98         # find the remote service handle <router>@<domain>/<service>
99         domain = osrf.conf.get('domain', 0)
100         router = osrf.conf.get('router_name')
101         self.remote_id = "%s@%s/%s" % (router, domain, service)
102         self.orig_remote_id = self.remote_id
103
104         # generate a random message thread
105         self.thread = "%s%s%s%s" % (os.getpid(), 
106             str(random.randint(100,100000)), str(time.time()),threading.currentThread().getName().lower())
107
108         # how many requests this session has taken part in
109         self.next_id = 0 
110
111         # cache of request objects 
112         self.requests = {}
113
114         # cache this session in the global session cache
115         Session.session_cache[self.thread] = self
116
117     def reset_request_timeout(self, rid):
118         req = self.find_request(rid)
119         if req:
120             req.reset_timeout = True
121             
122
123     def request2(self, method, arr):
124         """Creates a new request and sends the request to the server using a python array as the params."""
125         return self.__request(method, arr)
126
127     def request(self, method, *args):
128         """Creates a new request and sends the request to the server using a variable argument list as params"""
129         arr = list(args)
130         return self.__request(method, arr)
131
132     def __request(self, method, arr):
133         """Builds the request object and sends it."""
134         if self.state != OSRF_APP_SESSION_CONNECTED:
135             self.reset_remote_id()
136
137         osrf.log.make_xid()
138
139         osrf.log.log_debug("Sending request %s -> %s " % (self.service, method))
140         req = ClientRequest(self, self.next_id, method, arr, self.locale)
141         self.requests[str(self.next_id)] = req
142         self.next_id += 1
143         req.send()
144         return req
145
146
147     def connect(self, timeout=10):
148         """Connects to a remote service"""
149
150         if self.state == OSRF_APP_SESSION_CONNECTED:
151             return True
152         self.state = OSRF_APP_SESSION_CONNECTING
153
154         # construct and send a CONNECT message
155         self.send(
156             osrf.net_obj.NetworkObject.osrfMessage( 
157                 {   'threadTrace' : 0,
158                     'type' : OSRF_MESSAGE_TYPE_CONNECT
159                 } 
160             )
161         )
162
163         while timeout >= 0 and not self.state == OSRF_APP_SESSION_CONNECTED:
164             start = time.time()
165             self.wait(timeout)
166             timeout -= time.time() - start
167         
168         if self.state != OSRF_APP_SESSION_CONNECTED:
169             raise osrf.ex.OSRFServiceException("Unable to connect to " + self.service)
170         
171         return True
172
173     def disconnect(self):
174         """Disconnects from a remote service"""
175
176         if self.state == OSRF_APP_SESSION_DISCONNECTED:
177             return True
178
179         self.send(
180             osrf.net_obj.NetworkObject.osrfMessage( 
181                 {   'threadTrace' : 0,
182                     'type' : OSRF_MESSAGE_TYPE_DISCONNECT
183                 } 
184             )
185         )
186
187         self.state = OSRF_APP_SESSION_DISCONNECTED
188
189     
190
191     def reset_remote_id(self):
192         """Recovers the original remote id"""
193         self.remote_id = self.orig_remote_id
194         osrf.log.log_internal("Resetting remote ID to %s" % self.remote_id)
195
196     def push_response_queue(self, message):
197         """Pushes the message payload onto the response queue 
198             for the request associated with the message's ID."""
199         osrf.log.log_debug("pushing %s" % message.payload())
200         try:
201             self.find_request(message.threadTrace()).push_response(message.payload())
202         except Exception, e: 
203             osrf.log.log_warn("pushing respond to non-existent request %s : %s" % (message.threadTrace(), e))
204
205     def find_request(self, rid):
206         """Returns the original request matching this message's threadTrace."""
207         try:
208             return self.requests[str(rid)]
209         except KeyError:
210             osrf.log.log_debug('find_request(): non-existent request %s' % str(rid))
211             return None
212
213     @staticmethod
214     def atomic_request(service, method, *args):
215         ses = ClientSession(service)
216         req = ses.request2(method, list(args))
217         resp = req.recv()
218         data = None
219         if resp:
220             data = resp.content()
221         req.cleanup()
222         ses.cleanup()
223         return data
224
225
226
227
228 class Request(object):
229     def __init__(self, session, rid, method=None, params=[], locale='en-US'):
230         self.session = session # my session handle
231         self.rid     = rid # my unique request ID
232         self.method = method # method name
233         self.params = params # my method params
234         self.locale = locale
235         self.complete = False # is this request done?
236         self.complete_time =  0 # time at which the request was completed
237
238
239 class ClientRequest(Request):
240     """Represents a single OpenSRF request.
241         A request is made and any resulting respones are 
242         collected for the client."""
243
244     def __init__(self, session, rid, method=None, params=[], locale='en-US'):
245         Request.__init__(self, session, rid, method, params, locale)
246         self.queue  = [] # response queue
247         self.reset_timeout = False # resets the recv timeout?
248         self.send_time = 0 # local time the request was put on the wire
249         self.first_response_time = 0 # time it took for our first reponse to be received
250
251     def send(self):
252         """Sends a request message"""
253
254         # construct the method object message with params and method name
255         method = osrf.net_obj.NetworkObject.osrfMethod( {
256             'method' : self.method,
257             'params' : self.params
258         } )
259
260         # construct the osrf message with our method message embedded
261         message = osrf.net_obj.NetworkObject.osrfMessage( {
262             'threadTrace' : self.rid,
263             'type' : OSRF_MESSAGE_TYPE_REQUEST,
264             'payload' : method,
265             'locale' : self.locale
266         } )
267
268         self.send_time = time.time()
269         self.session.send(message)
270
271     def recv(self, timeout=120):
272         """ Waits up to <timeout> seconds for a response to this request.
273         
274             If a message is received in time, the response message is returned.
275             Returns None otherwise."""
276
277         self.session.wait(0)
278
279         orig_timeout = timeout
280         while not self.complete and (timeout >= 0 or orig_timeout < 0) and len(self.queue) == 0:
281
282             s = time.time()
283             self.session.wait(timeout)
284
285             if self.reset_timeout:
286                 self.reset_timeout = False
287                 timeout = orig_timeout
288
289             elif orig_timeout >= 0:
290                 timeout -= time.time() - s
291
292         now = time.time()
293
294         # -----------------------------------------------------------------
295         # log some statistics 
296         if len(self.queue) > 0:
297             if not self.first_response_time:
298                 self.first_response_time = now
299                 osrf.log.log_debug("time elapsed before first response: %f" \
300                     % (self.first_response_time - self.send_time))
301
302         if self.complete:
303             if not self.complete_time:
304                 self.complete_time = now
305                 osrf.log.log_debug("time elapsed before complete: %f" \
306                     % (self.complete_time - self.send_time))
307         # -----------------------------------------------------------------
308
309
310         if len(self.queue) > 0:
311             # we have a reponse, return it
312             return self.queue.pop(0)
313
314         return None
315
316     def push_response(self, content):
317         """Pushes a method response onto this requests response queue."""
318         self.queue.append(content)
319
320     def cleanup(self):
321         """Cleans up request data from the cache. 
322
323             Do this when you are done with a request to prevent "leaked" cache memory."""
324         del self.session.requests[str(self.rid)]
325
326     def set_complete(self):
327         """Sets me as complete.  This means the server has sent a 'request complete' message"""
328         self.complete = True
329
330
331 class ServerSession(Session):
332     """Implements a server-side session"""
333
334     def __init__(self, thread):
335         Session.__init__(self)
336         self.thread = thread
337         self.callbacks = {}
338         self.session_data = {}
339         Session.session_cache[self.thread] = self
340
341     def send_status(self, thread_trace, payload):
342         self.send(
343             osrf.net_obj.NetworkObject.osrfMessage( 
344                 {   'threadTrace' : thread_trace,
345                     'type' : osrf.const.OSRF_MESSAGE_TYPE_STATUS,
346                     'payload' : payload,
347                     'locale' : self.locale
348                 } 
349             )
350         )
351
352     def send_connect_ok(self, thread_trace):
353         status_msg = osrf.net_obj.NetworkObject.osrfConnectStatus({   
354             'status' : 'Connection Successful',
355             'statusCode': osrf.const.OSRF_STATUS_OK
356         })
357         self.send_status(thread_trace, status_msg)
358
359     def send_method_not_found(self, thread_trace, method_name):
360         status_msg = osrf.net_obj.NetworkObject.osrfConnectStatus({   
361             'status' : 'Method [%s] not found for %s' % (method_name, self.service),
362             'statusCode': osrf.const.OSRF_STATUS_NOTFOUND
363         })
364         self.send_status(thread_trace, status_msg)
365
366
367     def run_callback(self, type):
368         if type in self.callbacks:
369             self.callbacks[type](self)
370
371     def register_callback(self, type, func):
372         self.callbacks[type] = func
373
374     def cleanup(self):
375         Session.cleanup(self)
376         self.run_callback('death')
377
378
379 class ServerRequest(Request):
380
381     def __init__(self, session, rid, method, params=[]):
382         Request.__init__(self, session, rid, method, params, session.locale)
383         self.response_list = []
384
385     def _build_response_msg(self, data):
386         result = osrf.net_obj.NetworkObject.osrfResult({
387             'content' :  data,
388             'statusCode' : osrf.const.OSRF_STATUS_OK,
389             'status' : 'OK'
390         })
391
392         return osrf.net_obj.NetworkObject.osrfMessage({
393             'threadTrace' : self.rid,
394             'type' : OSRF_MESSAGE_TYPE_RESULT,
395             'payload' : result,
396             'locale' : self.locale
397         })
398
399     def _build_complete_msg(self):
400
401         status = osrf.net_obj.NetworkObject.osrfConnectStatus({   
402             'threadTrace' : self.rid,
403             'status' : 'Request Complete',
404             'statusCode': osrf.const.OSRF_STATUS_COMPLETE
405         })
406
407         return osrf.net_obj.NetworkObject.osrfMessage({
408             'threadTrace' : self.rid,
409             'type' : OSRF_MESSAGE_TYPE_STATUS,
410             'payload' : status,
411             'locale' : self.locale
412         })
413
414     def respond(self, data):
415         ''' For non-atomic calls, this sends a response directly back
416             to the client.  For atomic calls, this pushes the response
417             onto the response list '''
418         osrf.log.log_internal("responding with %s" % str(data))
419         if self.method.atomic:
420             self.response_list.append(data)
421         else:
422             self.session.send(self._build_response_msg(data))
423
424     def respond_complete(self, data):
425         ''' Sends a complete message accompanied by the final result if applicable '''
426
427         if self.complete: 
428             return
429         self.complete = True
430         self.complete_time = time.time()
431
432         if self.method.atomic:
433             if data is not None:
434                 self.response_list.append(data) 
435             self.session.send([
436                 self._build_response_msg(self.response_list),
437                 self._build_complete_msg(),
438             ])
439
440         elif data is not None:
441             self.session.send([
442                 self._build_response_msg(data),
443                 self._build_complete_msg(),
444             ])
445
446         else:
447             self.session.send(self._build_complete_msg())
448             
449
450 class MultiSession(object):
451     ''' Manages multiple requests.  With the current implementation, a 1 second 
452         lag time before the first response is practically guaranteed.  Use 
453         only for long running requests.
454
455         Another approach would be a threaded version, but that would require
456         build-up and breakdown of thread-specific xmpp connections somewhere.
457         conection pooling? 
458     '''
459     class Container(object):
460         def __init__(self, req):
461             self.req = req
462             self.id = None
463
464     def __init__(self):
465         self.complete = False
466         self.reqs = []
467
468     def request(self, service, method, *args):
469         ses = ClientSession(service)
470         cont = MultiSession.Container(ses.request(method, *args))
471         cont.id = len(self.reqs)
472         self.reqs.append(cont)
473
474     def recv(self, timeout=120):
475         ''' Returns a tuple of req_id, response '''
476         duration = 0
477         block_time = 1
478         while True:
479             for i in range(0, len(self.reqs)):
480                 cont = self.reqs[i]
481                 req = cont.req
482
483                 res = req.recv(0)
484                 if i == 0 and not res:
485                     res = req.recv(block_time)
486
487                 if res: break
488
489             if res: break
490
491             duration += block_time
492             if duration >= timeout:
493                 return None
494
495         if req.complete:
496             self.reqs.pop(self.reqs.index(cont))
497
498         if len(self.reqs) == 0:
499             self.complete = True
500
501         return cont.id, res.content()
502