]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/python/osrf/ses.py
452813cef98f3f3075e9e8f6982fb9be023d6f4e
[OpenSRF.git] / src / python / osrf / ses.py
1 # -----------------------------------------------------------------------
2 # Copyright (C) 2007  Georgia Public Library Service
3 # Bill Erickson <billserickson@gmail.com>
4
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 # -----------------------------------------------------------------------
15
16 import osrf.json
17 import osrf.conf
18 import osrf.log
19 import osrf.net
20 import osrf.net_obj
21 from osrf.const import OSRF_APP_SESSION_CONNECTED, \
22     OSRF_APP_SESSION_CONNECTING, OSRF_APP_SESSION_DISCONNECTED, \
23     OSRF_MESSAGE_TYPE_CONNECT, OSRF_MESSAGE_TYPE_DISCONNECT, \
24     OSRF_MESSAGE_TYPE_REQUEST
25 import osrf.ex
26 import random, os, time, threading
27
28
29 # -----------------------------------------------------------------------
30 # Go ahead and register the common network objects
31 # -----------------------------------------------------------------------
32 osrf.net_obj.NetworkRegisterHint('osrfMessage', ['threadTrace', 'type', 'payload'], 'hash')
33 osrf.net_obj.NetworkRegisterHint('osrfMethod', ['method', 'params'], 'hash')
34 osrf.net_obj.NetworkRegisterHint('osrfResult', ['status', 'statusCode', 'content'], 'hash')
35 osrf.net_obj.NetworkRegisterHint('osrfConnectStatus', ['status', 'statusCode'], 'hash')
36 osrf.net_obj.NetworkRegisterHint('osrfMethodException', ['status', 'statusCode'], 'hash')
37
38
39 class Session(object):
40     """Abstract session superclass."""
41
42     ''' Global cache of in-service sessions '''
43     session_cache = {}
44
45     def __init__(self):
46         # by default, we're connected to no one
47         self.state = OSRF_APP_SESSION_DISCONNECTED
48         self.remote_id = None
49
50     def find_session(threadTrace):
51         return Session.session_cache.get(threadTrace)
52     find_session = staticmethod(find_session)
53
54     def wait(self, timeout=120):
55         """Wait up to <timeout> seconds for data to arrive on the network"""
56         osrf.log.osrfLogInternal("Session.wait(%d)" % timeout)
57         handle = osrf.net.get_network_handle()
58         handle.recv(timeout)
59
60     def send(self, omessage):
61         """Sends an OpenSRF message"""
62         net_msg = osrf.net.NetworkMessage(
63             recipient      = self.remote_id,
64             body    = osrf.json.to_json([omessage]),
65             thread = self.thread )
66
67         handle = osrf.net.get_network_handle()
68         handle.send(net_msg)
69
70     def cleanup(self):
71         """Removes the session from the global session cache."""
72         del Session.session_cache[self.thread]
73
74 class ClientSession(Session):
75     """Client session object.  Use this to make server requests."""
76
77     def __init__(self, service):
78         
79         # call superclass constructor
80         Session.__init__(self)
81
82         # the remote service we want to make requests of
83         self.service = service
84
85         # find the remote service handle <router>@<domain>/<service>
86         domain = osrf.conf.get('domains.domain', 0)
87         router = osrf.conf.get('router_name')
88         self.remote_id = "%s@%s/%s" % (router, domain, service)
89         self.orig_remote_id = self.remote_id
90
91         # generate a random message thread
92         self.thread = "%s%s%s%s" % (os.getpid(), 
93             str(random.randint(100,100000)), str(time.time()),threading.currentThread().getName().lower())
94
95         # how many requests this session has taken part in
96         self.next_id = 0 
97
98         # cache of request objects 
99         self.requests = {}
100
101         # cache this session in the global session cache
102         Session.session_cache[self.thread] = self
103
104     def reset_request_timeout(self, rid):
105         req = self.find_request(rid)
106         if req:
107             req.reset_timeout = True
108             
109
110     def request2(self, method, arr):
111         """Creates a new request and sends the request to the server using a python array as the params."""
112         return self.__request(method, arr)
113
114     def request(self, method, *args):
115         """Creates a new request and sends the request to the server using a variable argument list as params"""
116         arr = list(args)
117         return self.__request(method, arr)
118
119     def __request(self, method, arr):
120         """Builds the request object and sends it."""
121         if self.state != OSRF_APP_SESSION_CONNECTED:
122             self.reset_remote_id()
123
124         osrf.log.logDebug("Sending request %s -> %s " % (self.service, method))
125         req = Request(self, self.next_id, method, arr)
126         self.requests[str(self.next_id)] = req
127         self.next_id += 1
128         req.send()
129         return req
130
131
132     def connect(self, timeout=10):
133         """Connects to a remote service"""
134
135         if self.state == OSRF_APP_SESSION_CONNECTED:
136             return True
137         self.state == OSRF_APP_SESSION_CONNECTING
138
139         # construct and send a CONNECT message
140         self.send(
141             osrf.net_obj.NetworkObject.osrfMessage( 
142                 {   'threadTrace' : 0,
143                     'type' : OSRF_MESSAGE_TYPE_CONNECT
144                 } 
145             )
146         )
147
148         while timeout >= 0 and not self.state == OSRF_APP_SESSION_CONNECTED:
149             start = time.time()
150             self.wait(timeout)
151             timeout -= time.time() - start
152         
153         if self.state != OSRF_APP_SESSION_CONNECTED:
154             raise osrf.ex.OSRFServiceException("Unable to connect to " + self.service)
155         
156         return True
157
158     def disconnect(self):
159         """Disconnects from a remote service"""
160
161         if self.state == OSRF_APP_SESSION_DISCONNECTED:
162             return True
163
164         self.send(
165             osrf.net_obj.NetworkObject.osrfMessage( 
166                 {   'threadTrace' : 0,
167                     'type' : OSRF_MESSAGE_TYPE_DISCONNECT
168                 } 
169             )
170         )
171
172         self.state = OSRF_APP_SESSION_DISCONNECTED
173
174     
175     def set_remote_id(self, remoteid):
176         self.remote_id = remoteid
177         osrf.log.osrfLogInternal("Setting request remote ID to %s" % self.remote_id)
178
179     def reset_remote_id(self):
180         """Recovers the original remote id"""
181         self.remote_id = self.orig_remote_id
182         osrf.log.osrfLogInternal("Resetting remote ID to %s" % self.remote_id)
183
184     def push_response_queue(self, message):
185         """Pushes the message payload onto the response queue 
186             for the request associated with the message's ID."""
187         osrf.log.logDebug("pushing %s" % message.payload())
188         try:
189             self.find_request(message.threadTrace()).pushResponse(message.payload())
190         except Exception, e: 
191             osrf.log.osrfLogWarn("pushing respond to non-existent request %s : %s" % (message.threadTrace(), e))
192
193     def find_request(self, rid):
194         """Returns the original request matching this message's threadTrace."""
195         try:
196             return self.requests[str(rid)]
197         except KeyError:
198             osrf.log.logDebug('find_request(): non-existent request %s' % str(rid))
199             return None
200
201
202
203 class Request(object):
204     """Represents a single OpenSRF request.
205         A request is made and any resulting respones are 
206         collected for the client."""
207
208     def __init__(self, session, rid, method=None, params=[]):
209
210         self.session = session # my session handle
211         self.rid     = rid # my unique request ID
212         self.method = method # method name
213         self.params = params # my method params
214         self.queue  = [] # response queue
215         self.reset_timeout = False # resets the recv timeout?
216         self.complete = False # has the server told us this request is done?
217         self.send_time = 0 # local time the request was put on the wire
218         self.complete_time =  0 # time the server told us the request was completed
219         self.first_response_time = 0 # time it took for our first reponse to be received
220
221     def send(self):
222         """Sends a request message"""
223
224         # construct the method object message with params and method name
225         method = osrf.net_obj.NetworkObject.osrfMethod( {
226             'method' : self.method,
227             'params' : self.params
228         } )
229
230         # construct the osrf message with our method message embedded
231         message = osrf.net_obj.NetworkObject.osrfMessage( {
232             'threadTrace' : self.rid,
233             'type' : OSRF_MESSAGE_TYPE_REQUEST,
234             'payload' : method
235         } )
236
237         self.send_time = time.time()
238         self.session.send(message)
239
240     def recv(self, timeout=120):
241         """Waits up to <timeout> seconds for a response to this request.
242         
243             If a message is received in time, the response message is returned.
244             Returns None otherwise."""
245
246         self.session.wait(0)
247
248         orig_timeout = timeout
249         while not self.complete and timeout >= 0 and len(self.queue) == 0:
250             s = time.time()
251             self.session.wait(timeout)
252             timeout -= time.time() - s
253             if self.reset_timeout:
254                 self.reset_timeout = False
255                 timeout = orig_timeout
256
257         now = time.time()
258
259         # -----------------------------------------------------------------
260         # log some statistics 
261         if len(self.queue) > 0:
262             if not self.first_response_time:
263                 self.first_response_time = now
264                 osrf.log.logDebug("time elapsed before first response: %f" \
265                     % (self.first_response_time - self.send_time))
266
267         if self.complete:
268             if not self.complete_time:
269                 self.complete_time = now
270                 osrf.log.logDebug("time elapsed before complete: %f" \
271                     % (self.complete_time - self.send_time))
272         # -----------------------------------------------------------------
273
274
275         if len(self.queue) > 0:
276             # we have a reponse, return it
277             return self.queue.pop(0)
278
279         return None
280
281     def pushResponse(self, content):
282         """Pushes a method response onto this requests response queue."""
283         self.queue.append(content)
284
285     def cleanup(self):
286         """Cleans up request data from the cache. 
287
288             Do this when you are done with a request to prevent "leaked" cache memory."""
289         del self.session.requests[str(self.rid)]
290
291     def set_complete(self):
292         """Sets me as complete.  This means the server has sent a 'request complete' message"""
293         self.complete = True
294
295
296 class ServerSession(Session):
297     """Implements a server-side session"""
298     pass
299
300
301 def AtomicRequest(service, method, *args):
302     ses = ClientSession(service)
303     req = ses.request2(method, list(args))
304     resp = req.recv()
305     data = resp.content()
306     req.cleanup()
307     ses.cleanup()
308     return data
309
310
311