Python libs for OpenSRF ingress tracking
[OpenSRF.git] / src / python / osrf / ses.py
1 # -----------------------------------------------------------------------
2 # Copyright (C) 2007  Georgia Public Library Service
3 # Bill Erickson <billserickson@gmail.com>
4
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 # -----------------------------------------------------------------------
15
16 import osrf.json, osrf.conf, osrf.log, osrf.net, osrf.net_obj, osrf.const
17 from osrf.const import OSRF_APP_SESSION_CONNECTED, \
18     OSRF_APP_SESSION_CONNECTING, OSRF_APP_SESSION_DISCONNECTED, \
19     OSRF_MESSAGE_TYPE_CONNECT, OSRF_MESSAGE_TYPE_DISCONNECT, \
20     OSRF_MESSAGE_TYPE_REQUEST, OSRF_MESSAGE_TYPE_RESULT, OSRF_MESSAGE_TYPE_STATUS
21 import osrf.ex
22 import random, os, time, threading
23
24
25 # -----------------------------------------------------------------------
26 # Go ahead and register the common network objects
27 # -----------------------------------------------------------------------
28 osrf.net_obj.register_hint('osrfMessage', ['threadTrace', 'locale', 'type', 'payload', 'ingress'], 'hash')
29 osrf.net_obj.register_hint('osrfMethod', ['method', 'params'], 'hash')
30 osrf.net_obj.register_hint('osrfResult', ['status', 'statusCode', 'content'], 'hash')
31 osrf.net_obj.register_hint('osrfConnectStatus', ['status', 'statusCode'], 'hash')
32 osrf.net_obj.register_hint('osrfMethodException', ['status', 'statusCode'], 'hash')
33
34
35 class Session(object):
36     """Abstract session superclass."""
37
38     ''' Global cache of in-service sessions '''
39     session_cache = {}
40     current_ingress = 'opensrf';
41
42     def __init__(self):
43         # by default, we're connected to no one
44         self.state = OSRF_APP_SESSION_DISCONNECTED
45         self.remote_id = None
46         self.locale = None
47         self.thread = None
48         self.service = None
49
50     @staticmethod
51     def find_or_create(thread):
52         if thread in Session.session_cache:
53             return Session.session_cache[thread]
54         return ServerSession(thread)
55
56     @staticmethod
57     def ingress(ingress):
58         if ingress:
59             Session.current_ingress = ingress
60         return Session.current_ingress
61
62     def set_remote_id(self, remoteid):
63         self.remote_id = remoteid
64         osrf.log.log_internal("Setting request remote ID to %s" % self.remote_id)
65
66     def wait(self, timeout=120):
67         """Wait up to <timeout> seconds for data to arrive on the network"""
68         osrf.log.log_internal("Session.wait(%d)" % timeout)
69         handle = osrf.net.get_network_handle()
70         return handle.recv(timeout)
71
72     def send(self, omessages):
73         """Sends an OpenSRF message"""
74         if not isinstance(omessages, list):
75             omessages = [omessages]
76
77         for msg in omessages:
78             msg.ingress(Session.current_ingress);
79             
80         net_msg = osrf.net.NetworkMessage(
81             recipient      = self.remote_id,
82             body    = osrf.json.to_json(omessages),
83             thread = self.thread,
84             locale = self.locale,
85         )
86
87         handle = osrf.net.get_network_handle()
88         handle.send(net_msg)
89
90     def cleanup(self):
91         """Removes the session from the global session cache."""
92         del Session.session_cache[self.thread]
93
94 class ClientSession(Session):
95     """Client session object.  Use this to make server requests."""
96
97     def __init__(self, service, locale='en-US'):
98         
99         # call superclass constructor
100         Session.__init__(self)
101
102         # the service we are sending requests to
103         self.service = service
104
105         # the locale we want requests to be returned in
106         self.locale = locale
107
108         # find the remote service handle <router>@<domain>/<service>
109         domain = osrf.conf.get('domain', 0)
110         router = osrf.conf.get('router_name')
111         self.remote_id = "%s@%s/%s" % (router, domain, service)
112         self.orig_remote_id = self.remote_id
113
114         # generate a random message thread
115         self.thread = "%s%s%s%s" % (os.getpid(), 
116             str(random.randint(100,100000)), str(time.time()),threading.currentThread().getName().lower())
117
118         # how many requests this session has taken part in
119         self.next_id = 0 
120
121         # cache of request objects 
122         self.requests = {}
123
124         # cache this session in the global session cache
125         Session.session_cache[self.thread] = self
126
127     def reset_request_timeout(self, rid):
128         req = self.find_request(rid)
129         if req:
130             req.reset_timeout = True
131             
132
133     def request2(self, method, arr):
134         """Creates a new request and sends the request to the server using a python array as the params."""
135         return self.__request(method, arr)
136
137     def request(self, method, *args):
138         """Creates a new request and sends the request to the server using a variable argument list as params"""
139         arr = list(args)
140         return self.__request(method, arr)
141
142     def __request(self, method, arr):
143         """Builds the request object and sends it."""
144         if self.state != OSRF_APP_SESSION_CONNECTED:
145             self.reset_remote_id()
146
147         osrf.log.make_xid()
148
149         osrf.log.log_debug("Sending request %s -> %s " % (self.service, method))
150         req = ClientRequest(self, self.next_id, method, arr, self.locale)
151         self.requests[str(self.next_id)] = req
152         self.next_id += 1
153         req.send()
154         return req
155
156
157     def connect(self, timeout=10):
158         """Connects to a remote service"""
159
160         if self.state == OSRF_APP_SESSION_CONNECTED:
161             return True
162         self.state = OSRF_APP_SESSION_CONNECTING
163
164         # construct and send a CONNECT message
165         self.send(
166             osrf.net_obj.NetworkObject.osrfMessage( 
167                 {   'threadTrace' : 0,
168                     'type' : OSRF_MESSAGE_TYPE_CONNECT
169                 } 
170             )
171         )
172
173         while timeout >= 0 and not self.state == OSRF_APP_SESSION_CONNECTED:
174             start = time.time()
175             self.wait(timeout)
176             timeout -= time.time() - start
177         
178         if self.state != OSRF_APP_SESSION_CONNECTED:
179             raise osrf.ex.OSRFServiceException("Unable to connect to " + self.service)
180         
181         return True
182
183     def disconnect(self):
184         """Disconnects from a remote service"""
185
186         if self.state == OSRF_APP_SESSION_DISCONNECTED:
187             return True
188
189         self.send(
190             osrf.net_obj.NetworkObject.osrfMessage( 
191                 {   'threadTrace' : 0,
192                     'type' : OSRF_MESSAGE_TYPE_DISCONNECT
193                 } 
194             )
195         )
196
197         self.state = OSRF_APP_SESSION_DISCONNECTED
198
199     
200
201     def reset_remote_id(self):
202         """Recovers the original remote id"""
203         self.remote_id = self.orig_remote_id
204         osrf.log.log_internal("Resetting remote ID to %s" % self.remote_id)
205
206     def push_response_queue(self, message):
207         """Pushes the message payload onto the response queue 
208             for the request associated with the message's ID."""
209         osrf.log.log_debug("pushing %s" % message.payload())
210         try:
211             self.find_request(message.threadTrace()).push_response(message.payload())
212         except Exception, e: 
213             osrf.log.log_warn("pushing respond to non-existent request %s : %s" % (message.threadTrace(), e))
214
215     def find_request(self, rid):
216         """Returns the original request matching this message's threadTrace."""
217         try:
218             return self.requests[str(rid)]
219         except KeyError:
220             osrf.log.log_debug('find_request(): non-existent request %s' % str(rid))
221             return None
222
223     @staticmethod
224     def atomic_request(service, method, *args):
225         ses = ClientSession(service)
226         req = ses.request2(method, list(args))
227         resp = req.recv()
228         data = None
229         if resp:
230             data = resp.content()
231         req.cleanup()
232         ses.cleanup()
233         return data
234
235
236
237
238 class Request(object):
239     def __init__(self, session, rid, method=None, params=[], locale='en-US'):
240         self.session = session # my session handle
241         self.rid     = rid # my unique request ID
242         self.method = method # method name
243         self.params = params # my method params
244         self.locale = locale
245         self.complete = False # is this request done?
246         self.complete_time =  0 # time at which the request was completed
247
248
249 class ClientRequest(Request):
250     """Represents a single OpenSRF request.
251         A request is made and any resulting respones are 
252         collected for the client."""
253
254     def __init__(self, session, rid, method=None, params=[], locale='en-US'):
255         Request.__init__(self, session, rid, method, params, locale)
256         self.queue  = [] # response queue
257         self.reset_timeout = False # resets the recv timeout?
258         self.send_time = 0 # local time the request was put on the wire
259         self.first_response_time = 0 # time it took for our first reponse to be received
260
261     def send(self):
262         """Sends a request message"""
263
264         # construct the method object message with params and method name
265         method = osrf.net_obj.NetworkObject.osrfMethod( {
266             'method' : self.method,
267             'params' : self.params
268         } )
269
270         # construct the osrf message with our method message embedded
271         message = osrf.net_obj.NetworkObject.osrfMessage( {
272             'threadTrace' : self.rid,
273             'type' : OSRF_MESSAGE_TYPE_REQUEST,
274             'payload' : method,
275             'locale' : self.locale
276         } )
277
278         self.send_time = time.time()
279         self.session.send(message)
280
281     def recv(self, timeout=120):
282         """ Waits up to <timeout> seconds for a response to this request.
283         
284             If a message is received in time, the response message is returned.
285             Returns None otherwise."""
286
287         self.session.wait(0)
288
289         orig_timeout = timeout
290         while not self.complete and (timeout >= 0 or orig_timeout < 0) and len(self.queue) == 0:
291
292             s = time.time()
293             self.session.wait(timeout)
294
295             if self.reset_timeout:
296                 self.reset_timeout = False
297                 timeout = orig_timeout
298
299             elif orig_timeout >= 0:
300                 timeout -= time.time() - s
301
302         now = time.time()
303
304         # -----------------------------------------------------------------
305         # log some statistics 
306         if len(self.queue) > 0:
307             if not self.first_response_time:
308                 self.first_response_time = now
309                 osrf.log.log_debug("time elapsed before first response: %f" \
310                     % (self.first_response_time - self.send_time))
311
312         if self.complete:
313             if not self.complete_time:
314                 self.complete_time = now
315                 osrf.log.log_debug("time elapsed before complete: %f" \
316                     % (self.complete_time - self.send_time))
317         # -----------------------------------------------------------------
318
319
320         if len(self.queue) > 0:
321             # we have a reponse, return it
322             return self.queue.pop(0)
323
324         return None
325
326     def push_response(self, content):
327         """Pushes a method response onto this requests response queue."""
328         self.queue.append(content)
329
330     def cleanup(self):
331         """Cleans up request data from the cache. 
332
333             Do this when you are done with a request to prevent "leaked" cache memory."""
334         del self.session.requests[str(self.rid)]
335
336     def set_complete(self):
337         """Sets me as complete.  This means the server has sent a 'request complete' message"""
338         self.complete = True
339
340
341 class ServerSession(Session):
342     """Implements a server-side session"""
343
344     def __init__(self, thread):
345         Session.__init__(self)
346         self.thread = thread
347         self.callbacks = {}
348         self.session_data = {}
349         Session.session_cache[self.thread] = self
350
351     def send_status(self, thread_trace, payload):
352         self.send(
353             osrf.net_obj.NetworkObject.osrfMessage( 
354                 {   'threadTrace' : thread_trace,
355                     'type' : osrf.const.OSRF_MESSAGE_TYPE_STATUS,
356                     'payload' : payload,
357                     'locale' : self.locale
358                 } 
359             )
360         )
361
362     def send_connect_ok(self, thread_trace):
363         status_msg = osrf.net_obj.NetworkObject.osrfConnectStatus({   
364             'status' : 'Connection Successful',
365             'statusCode': osrf.const.OSRF_STATUS_OK
366         })
367         self.send_status(thread_trace, status_msg)
368
369     def send_method_not_found(self, thread_trace, method_name):
370         status_msg = osrf.net_obj.NetworkObject.osrfConnectStatus({   
371             'status' : 'Method [%s] not found for %s' % (method_name, self.service),
372             'statusCode': osrf.const.OSRF_STATUS_NOTFOUND
373         })
374         self.send_status(thread_trace, status_msg)
375
376
377     def run_callback(self, type):
378         if type in self.callbacks:
379             self.callbacks[type](self)
380
381     def register_callback(self, type, func):
382         self.callbacks[type] = func
383
384     def cleanup(self):
385         Session.cleanup(self)
386         self.run_callback('death')
387
388
389 class ServerRequest(Request):
390
391     def __init__(self, session, rid, method, params=[]):
392         Request.__init__(self, session, rid, method, params, session.locale)
393         self.response_list = []
394
395     def _build_response_msg(self, data):
396         result = osrf.net_obj.NetworkObject.osrfResult({
397             'content' :  data,
398             'statusCode' : osrf.const.OSRF_STATUS_OK,
399             'status' : 'OK'
400         })
401
402         return osrf.net_obj.NetworkObject.osrfMessage({
403             'threadTrace' : self.rid,
404             'type' : OSRF_MESSAGE_TYPE_RESULT,
405             'payload' : result,
406             'locale' : self.locale
407         })
408
409     def _build_complete_msg(self):
410
411         status = osrf.net_obj.NetworkObject.osrfConnectStatus({   
412             'threadTrace' : self.rid,
413             'status' : 'Request Complete',
414             'statusCode': osrf.const.OSRF_STATUS_COMPLETE
415         })
416
417         return osrf.net_obj.NetworkObject.osrfMessage({
418             'threadTrace' : self.rid,
419             'type' : OSRF_MESSAGE_TYPE_STATUS,
420             'payload' : status,
421             'locale' : self.locale
422         })
423
424     def respond(self, data):
425         ''' For non-atomic calls, this sends a response directly back
426             to the client.  For atomic calls, this pushes the response
427             onto the response list '''
428         osrf.log.log_internal("responding with %s" % str(data))
429         if self.method.atomic:
430             self.response_list.append(data)
431         else:
432             self.session.send(self._build_response_msg(data))
433
434     def respond_complete(self, data):
435         ''' Sends a complete message accompanied by the final result if applicable '''
436
437         if self.complete: 
438             return
439         self.complete = True
440         self.complete_time = time.time()
441
442         if self.method.atomic:
443             if data is not None:
444                 self.response_list.append(data) 
445             self.session.send([
446                 self._build_response_msg(self.response_list),
447                 self._build_complete_msg(),
448             ])
449
450         elif data is not None:
451             self.session.send([
452                 self._build_response_msg(data),
453                 self._build_complete_msg(),
454             ])
455
456         else:
457             self.session.send(self._build_complete_msg())
458             
459
460 class MultiSession(object):
461     ''' Manages multiple requests.  With the current implementation, a 1 second 
462         lag time before the first response is practically guaranteed.  Use 
463         only for long running requests.
464
465         Another approach would be a threaded version, but that would require
466         build-up and breakdown of thread-specific xmpp connections somewhere.
467         conection pooling? 
468     '''
469     class Container(object):
470         def __init__(self, req):
471             self.req = req
472             self.id = None
473
474     def __init__(self):
475         self.complete = False
476         self.reqs = []
477
478     def request(self, service, method, *args):
479         ses = ClientSession(service)
480         cont = MultiSession.Container(ses.request(method, *args))
481         cont.id = len(self.reqs)
482         self.reqs.append(cont)
483
484     def recv(self, timeout=120):
485         ''' Returns a tuple of req_id, response '''
486         duration = 0
487         block_time = 1
488         while True:
489             for i in range(0, len(self.reqs)):
490                 cont = self.reqs[i]
491                 req = cont.req
492
493                 res = req.recv(0)
494                 if i == 0 and not res:
495                     res = req.recv(block_time)
496
497                 if res: break
498
499             if res: break
500
501             duration += block_time
502             if duration >= timeout:
503                 return None
504
505         if req.complete:
506             self.reqs.pop(self.reqs.index(cont))
507
508         if len(self.reqs) == 0:
509             self.complete = True
510
511         return cont.id, res.content()
512