added final bits for stateful sessions (drone keepalive)
[OpenSRF.git] / src / python / osrf / ses.py
1 # -----------------------------------------------------------------------
2 # Copyright (C) 2007  Georgia Public Library Service
3 # Bill Erickson <billserickson@gmail.com>
4
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 # -----------------------------------------------------------------------
15
16 import osrf.json, osrf.conf, osrf.log, osrf.net, osrf.net_obj, osrf.const
17 from osrf.const import OSRF_APP_SESSION_CONNECTED, \
18     OSRF_APP_SESSION_CONNECTING, OSRF_APP_SESSION_DISCONNECTED, \
19     OSRF_MESSAGE_TYPE_CONNECT, OSRF_MESSAGE_TYPE_DISCONNECT, \
20     OSRF_MESSAGE_TYPE_REQUEST, OSRF_MESSAGE_TYPE_RESULT, OSRF_MESSAGE_TYPE_STATUS
21 import osrf.ex
22 import random, os, time, threading
23
24
25 # -----------------------------------------------------------------------
26 # Go ahead and register the common network objects
27 # -----------------------------------------------------------------------
28 osrf.net_obj.register_hint('osrfMessage', ['threadTrace', 'locale', 'type', 'payload'], 'hash')
29 osrf.net_obj.register_hint('osrfMethod', ['method', 'params'], 'hash')
30 osrf.net_obj.register_hint('osrfResult', ['status', 'statusCode', 'content'], 'hash')
31 osrf.net_obj.register_hint('osrfConnectStatus', ['status', 'statusCode'], 'hash')
32 osrf.net_obj.register_hint('osrfMethodException', ['status', 'statusCode'], 'hash')
33
34
35 class Session(object):
36     """Abstract session superclass."""
37
38     ''' Global cache of in-service sessions '''
39     session_cache = {}
40
41     def __init__(self):
42         # by default, we're connected to no one
43         self.state = OSRF_APP_SESSION_DISCONNECTED
44         self.remote_id = None
45         self.locale = None
46         self.thread = None
47         self.service = None
48
49
50     @staticmethod
51     def find_or_create(thread):
52         if thread in Session.session_cache:
53             return Session.session_cache[thread]
54         return ServerSession(thread)
55
56     def set_remote_id(self, remoteid):
57         self.remote_id = remoteid
58         osrf.log.log_internal("Setting request remote ID to %s" % self.remote_id)
59
60     def wait(self, timeout=120):
61         """Wait up to <timeout> seconds for data to arrive on the network"""
62         osrf.log.log_internal("Session.wait(%d)" % timeout)
63         handle = osrf.net.get_network_handle()
64         handle.recv(timeout)
65
66     def send(self, omessages):
67         """Sends an OpenSRF message"""
68         if not isinstance(omessages, list):
69             omessages = [omessages]
70             
71         net_msg = osrf.net.NetworkMessage(
72             recipient      = self.remote_id,
73             body    = osrf.json.to_json(omessages),
74             thread = self.thread,
75             locale = self.locale,
76         )
77
78         handle = osrf.net.get_network_handle()
79         handle.send(net_msg)
80
81     def cleanup(self):
82         """Removes the session from the global session cache."""
83         del Session.session_cache[self.thread]
84
85 class ClientSession(Session):
86     """Client session object.  Use this to make server requests."""
87
88     def __init__(self, service, locale='en-US'):
89         
90         # call superclass constructor
91         Session.__init__(self)
92
93         # the service we are sending requests to
94         self.service = service
95
96         # the locale we want requests to be returned in
97         self.locale = locale
98
99         # find the remote service handle <router>@<domain>/<service>
100         domain = osrf.conf.get('domain', 0)
101         router = osrf.conf.get('router_name')
102         self.remote_id = "%s@%s/%s" % (router, domain, service)
103         self.orig_remote_id = self.remote_id
104
105         # generate a random message thread
106         self.thread = "%s%s%s%s" % (os.getpid(), 
107             str(random.randint(100,100000)), str(time.time()),threading.currentThread().getName().lower())
108
109         # how many requests this session has taken part in
110         self.next_id = 0 
111
112         # cache of request objects 
113         self.requests = {}
114
115         # cache this session in the global session cache
116         Session.session_cache[self.thread] = self
117
118
119     def reset_request_timeout(self, rid):
120         req = self.find_request(rid)
121         if req:
122             req.reset_timeout = True
123             
124
125     def request2(self, method, arr):
126         """Creates a new request and sends the request to the server using a python array as the params."""
127         return self.__request(method, arr)
128
129     def request(self, method, *args):
130         """Creates a new request and sends the request to the server using a variable argument list as params"""
131         arr = list(args)
132         return self.__request(method, arr)
133
134     def __request(self, method, arr):
135         """Builds the request object and sends it."""
136         if self.state != OSRF_APP_SESSION_CONNECTED:
137             self.reset_remote_id()
138
139         osrf.log.make_xid()
140
141         osrf.log.log_debug("Sending request %s -> %s " % (self.service, method))
142         req = ClientRequest(self, self.next_id, method, arr, self.locale)
143         self.requests[str(self.next_id)] = req
144         self.next_id += 1
145         req.send()
146         return req
147
148
149     def connect(self, timeout=10):
150         """Connects to a remote service"""
151
152         if self.state == OSRF_APP_SESSION_CONNECTED:
153             return True
154         self.state = OSRF_APP_SESSION_CONNECTING
155
156         # construct and send a CONNECT message
157         self.send(
158             osrf.net_obj.NetworkObject.osrfMessage( 
159                 {   'threadTrace' : 0,
160                     'type' : OSRF_MESSAGE_TYPE_CONNECT
161                 } 
162             )
163         )
164
165         while timeout >= 0 and not self.state == OSRF_APP_SESSION_CONNECTED:
166             start = time.time()
167             self.wait(timeout)
168             timeout -= time.time() - start
169         
170         if self.state != OSRF_APP_SESSION_CONNECTED:
171             raise osrf.ex.OSRFServiceException("Unable to connect to " + self.service)
172         
173         return True
174
175     def disconnect(self):
176         """Disconnects from a remote service"""
177
178         if self.state == OSRF_APP_SESSION_DISCONNECTED:
179             return True
180
181         self.send(
182             osrf.net_obj.NetworkObject.osrfMessage( 
183                 {   'threadTrace' : 0,
184                     'type' : OSRF_MESSAGE_TYPE_DISCONNECT
185                 } 
186             )
187         )
188
189         self.state = OSRF_APP_SESSION_DISCONNECTED
190
191     
192
193     def reset_remote_id(self):
194         """Recovers the original remote id"""
195         self.remote_id = self.orig_remote_id
196         osrf.log.log_internal("Resetting remote ID to %s" % self.remote_id)
197
198     def push_response_queue(self, message):
199         """Pushes the message payload onto the response queue 
200             for the request associated with the message's ID."""
201         osrf.log.log_debug("pushing %s" % message.payload())
202         try:
203             self.find_request(message.threadTrace()).push_response(message.payload())
204         except Exception, e: 
205             osrf.log.log_warn("pushing respond to non-existent request %s : %s" % (message.threadTrace(), e))
206
207     def find_request(self, rid):
208         """Returns the original request matching this message's threadTrace."""
209         try:
210             return self.requests[str(rid)]
211         except KeyError:
212             osrf.log.log_debug('find_request(): non-existent request %s' % str(rid))
213             return None
214
215     @staticmethod
216     def atomic_request(service, method, *args):
217         ses = ClientSession(service)
218         req = ses.request2(method, list(args))
219         resp = req.recv()
220         data = None
221         if resp:
222             data = resp.content()
223         req.cleanup()
224         ses.cleanup()
225         return data
226
227
228
229
230 class Request(object):
231     def __init__(self, session, rid, method=None, params=[], locale='en-US'):
232         self.session = session # my session handle
233         self.rid     = rid # my unique request ID
234         self.method = method # method name
235         self.params = params # my method params
236         self.locale = locale
237         self.complete = False # is this request done?
238         self.complete_time =  0 # time at which the request was completed
239
240
241 class ClientRequest(Request):
242     """Represents a single OpenSRF request.
243         A request is made and any resulting respones are 
244         collected for the client."""
245
246     def __init__(self, session, rid, method=None, params=[], locale='en-US'):
247         Request.__init__(self, session, rid, method, params, locale)
248         self.queue  = [] # response queue
249         self.reset_timeout = False # resets the recv timeout?
250         self.send_time = 0 # local time the request was put on the wire
251         self.first_response_time = 0 # time it took for our first reponse to be received
252
253     def send(self):
254         """Sends a request message"""
255
256         # construct the method object message with params and method name
257         method = osrf.net_obj.NetworkObject.osrfMethod( {
258             'method' : self.method,
259             'params' : self.params
260         } )
261
262         # construct the osrf message with our method message embedded
263         message = osrf.net_obj.NetworkObject.osrfMessage( {
264             'threadTrace' : self.rid,
265             'type' : OSRF_MESSAGE_TYPE_REQUEST,
266             'payload' : method,
267             'locale' : self.locale
268         } )
269
270         self.send_time = time.time()
271         self.session.send(message)
272
273     def recv(self, timeout=120):
274         """ Waits up to <timeout> seconds for a response to this request.
275         
276             If a message is received in time, the response message is returned.
277             Returns None otherwise."""
278
279         self.session.wait(0)
280
281         orig_timeout = timeout
282         while not self.complete and (timeout >= 0 or orig_timeout < 0) and len(self.queue) == 0:
283
284             s = time.time()
285             self.session.wait(timeout)
286
287             if self.reset_timeout:
288                 self.reset_timeout = False
289                 timeout = orig_timeout
290
291             elif orig_timeout >= 0:
292                 timeout -= time.time() - s
293
294         now = time.time()
295
296         # -----------------------------------------------------------------
297         # log some statistics 
298         if len(self.queue) > 0:
299             if not self.first_response_time:
300                 self.first_response_time = now
301                 osrf.log.log_debug("time elapsed before first response: %f" \
302                     % (self.first_response_time - self.send_time))
303
304         if self.complete:
305             if not self.complete_time:
306                 self.complete_time = now
307                 osrf.log.log_debug("time elapsed before complete: %f" \
308                     % (self.complete_time - self.send_time))
309         # -----------------------------------------------------------------
310
311
312         if len(self.queue) > 0:
313             # we have a reponse, return it
314             return self.queue.pop(0)
315
316         return None
317
318     def push_response(self, content):
319         """Pushes a method response onto this requests response queue."""
320         self.queue.append(content)
321
322     def cleanup(self):
323         """Cleans up request data from the cache. 
324
325             Do this when you are done with a request to prevent "leaked" cache memory."""
326         del self.session.requests[str(self.rid)]
327
328     def set_complete(self):
329         """Sets me as complete.  This means the server has sent a 'request complete' message"""
330         self.complete = True
331
332
333 class ServerSession(Session):
334     """Implements a server-side session"""
335
336     def __init__(self, thread):
337         Session.__init__(self)
338         self.thread = thread
339         Session.session_cache[thread] = self
340
341     def send_status(self, thread_trace, payload):
342         self.send(
343             osrf.net_obj.NetworkObject.osrfMessage( 
344                 {   'threadTrace' : thread_trace,
345                     'type' : osrf.const.OSRF_MESSAGE_TYPE_STATUS,
346                     'payload' : payload,
347                     'locale' : self.locale
348                 } 
349             )
350         )
351
352     def send_connect_ok(self, thread_trace):
353         status_msg = osrf.net_obj.NetworkObject.osrfConnectStatus({   
354             'status' : 'Connection Successful',
355             'statusCode': osrf.const.OSRF_STATUS_OK
356         })
357         self.send_status(thread_trace, status_msg)
358
359
360 class ServerRequest(Request):
361
362     def __init__(self, session, rid, method, params=[]):
363         Request.__init__(self, session, rid, method, params, session.locale)
364         self.response_list = []
365
366     def _build_response_msg(self, data):
367         result = osrf.net_obj.NetworkObject.osrfResult({
368             'content' :  data,
369             'statusCode' : osrf.const.OSRF_STATUS_OK,
370             'status' : 'OK'
371         })
372
373         return osrf.net_obj.NetworkObject.osrfMessage({
374             'threadTrace' : self.rid,
375             'type' : OSRF_MESSAGE_TYPE_RESULT,
376             'payload' : result,
377             'locale' : self.locale
378         })
379
380     def _build_complete_msg(self):
381
382         status = osrf.net_obj.NetworkObject.osrfConnectStatus({   
383             'threadTrace' : self.rid,
384             'status' : 'Request Complete',
385             'statusCode': osrf.const.OSRF_STATUS_COMPLETE
386         })
387
388         return osrf.net_obj.NetworkObject.osrfMessage({
389             'threadTrace' : self.rid,
390             'type' : OSRF_MESSAGE_TYPE_STATUS,
391             'payload' : status,
392             'locale' : self.locale
393         })
394
395     def respond(self, data):
396         ''' For non-atomic calls, this sends a response directly back
397             to the client.  For atomic calls, this pushes the response
398             onto the response list '''
399         osrf.log.log_internal("responding with %s" % str(data))
400         if self.method.atomic:
401             self.response_list.append(data)
402         else:
403             self.session.send(self._build_response_msg(data))
404
405     def respond_complete(self, data):
406         ''' Sends a complete message accompanied by the final result if applicable '''
407
408         if self.complete: 
409             return
410         self.complete = True
411         self.complete_time = time.time()
412
413         if self.method.atomic:
414             if data is not None:
415                 self.response_list.append(data) 
416             self.session.send([
417                 self._build_response_msg(self.response_list),
418                 self._build_complete_msg(),
419             ])
420
421         elif data is not None:
422             self.session.send([
423                 self._build_response_msg(data),
424                 self._build_complete_msg(),
425             ])
426
427         else:
428             self.session.send(self._build_complete_msg())
429             
430
431 class MultiSession(object):
432     ''' Manages multiple requests.  With the current implementation, a 1 second 
433         lag time before the first response is practically guaranteed.  Use 
434         only for long running requests.
435
436         Another approach would be a threaded version, but that would require
437         build-up and breakdown of thread-specific xmpp connections somewhere.
438         conection pooling? 
439     '''
440     class Container(object):
441         def __init__(self, req):
442             self.req = req
443             self.id = None
444
445     def __init__(self):
446         self.complete = False
447         self.reqs = []
448
449     def request(self, service, method, *args):
450         ses = ClientSession(service)
451         cont = MultiSession.Container(ses.request(method, *args))
452         cont.id = len(self.reqs)
453         self.reqs.append(cont)
454
455     def recv(self, timeout=120):
456         ''' Returns a tuple of req_id, response '''
457         duration = 0
458         block_time = 1
459         while True:
460             for i in range(0, len(self.reqs)):
461                 cont = self.reqs[i]
462                 req = cont.req
463
464                 res = req.recv(0)
465                 if i == 0 and not res:
466                     res = req.recv(block_time)
467
468                 if res: break
469
470             if res: break
471
472             duration += block_time
473             if duration >= timeout:
474                 return None
475
476         if req.complete:
477             self.reqs.pop(self.reqs.index(cont))
478
479         if len(self.reqs) == 0:
480             self.complete = True
481
482         return cont.id, res.content()
483