462ae08a38ad2459e8dadf9a8bcb0a49fc384b21
[OpenSRF.git] / src / python / osrf / ses.py
1 # -----------------------------------------------------------------------
2 # Copyright (C) 2007  Georgia Public Library Service
3 # Bill Erickson <billserickson@gmail.com>
4
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 # -----------------------------------------------------------------------
15
16 from osrf.json import *
17 from osrf.net_obj import *
18 from osrf.conf import osrfConfigValue
19 from osrf.net import osrfNetworkMessage, osrfGetNetworkHandle
20 from osrf.log import *
21 from osrf.const import *
22 import random, sys, os, time
23
24
25 # -----------------------------------------------------------------------
26 # Go ahead and register the common network objects
27 # -----------------------------------------------------------------------
28 osrfNetworkRegisterHint('osrfMessage', ['threadTrace', 'type', 'payload'], 'hash')
29 osrfNetworkRegisterHint('osrfMethod', ['method', 'params'], 'hash')
30 osrfNetworkRegisterHint('osrfResult', ['status', 'statusCode', 'content'], 'hash')
31 osrfNetworkRegisterHint('osrfConnectStatus', ['status','statusCode'], 'hash')
32 osrfNetworkRegisterHint('osrfMethodException', ['status', 'statusCode'], 'hash')
33
34
35 class osrfSession(object):
36     """Abstract session superclass."""
37
38     def __init__(self):
39         # by default, we're connected to no one
40         self.state = OSRF_APP_SESSION_DISCONNECTED
41
42
43     def wait(self, timeout=120):
44         """Wait up to <timeout> seconds for data to arrive on the network"""
45         osrfLogInternal("osrfSession.wait(%d)" % timeout)
46         handle = osrfGetNetworkHandle()
47         handle.recv(timeout)
48
49     def send(self, omessage):
50         """Sends an OpenSRF message"""
51         netMessage = osrfNetworkMessage(
52             to      = self.remoteId,
53             body    = osrfObjectToJSON([omessage]),
54             thread = self.thread )
55
56         handle = osrfGetNetworkHandle()
57         handle.send(netMessage)
58
59     def cleanup(self):
60         """Removes the session from the global session cache."""
61         del osrfClientSession.sessionCache[self.thread]
62
63 class osrfClientSession(osrfSession):
64     """Client session object.  Use this to make server requests."""
65
66     def __init__(self, service):
67         
68         # call superclass constructor
69         osrfSession.__init__(self)
70
71         # the remote service we want to make requests of
72         self.service = service
73
74         # find the remote service handle <router>@<domain>/<service>
75         domain = osrfConfigValue('domains.domain', 0)
76         router = osrfConfigValue('router_name')
77         self.remoteId = "%s@%s/%s" % (router, domain, service)
78         self.origRemoteId = self.remoteId
79
80         # generate a random message thread
81         self.thread = "%s%s%s" % (os.getpid(), str(random.randint(100,100000)), str(time.time()))
82
83         # how many requests this session has taken part in
84         self.nextId = 0 
85
86         # cache of request objects 
87         self.requests = {}
88
89         # cache this session in the global session cache
90         osrfClientSession.sessionCache[self.thread] = self
91
92     def resetRequestTimeout(self, rid):
93         req = self.findRequest(rid)
94         if req:
95             req.resetTimeout = True
96             
97
98     def request2(self, method, arr):
99         """Creates a new request and sends the request to the server using a python array as the params."""
100         return self.__request(method, arr)
101
102     def request(self, method, *args):
103         """Creates a new request and sends the request to the server using a variable argument list as params"""
104         arr = list(args)
105         return self.__request(method, arr)
106
107     def __request(self, method, arr):
108         """Builds the request object and sends it."""
109         if self.state != OSRF_APP_SESSION_CONNECTED:
110             self.resetRemoteId()
111
112         osrfLogDebug("Sending request %s -> %s " % (self.service, method))
113         req = osrfRequest(self, self.nextId, method, arr)
114         self.requests[str(self.nextId)] = req
115         self.nextId += 1
116         req.send()
117         return req
118
119
120     def connect(self, timeout=10):
121         """Connects to a remote service"""
122
123         if self.state == OSRF_APP_SESSION_CONNECTED:
124             return True
125         self.state == OSRF_APP_SESSION_CONNECTING
126
127         # construct and send a CONNECT message
128         self.send(
129             osrfNetworkObject.osrfMessage( 
130                 {   'threadTrace' : 0,
131                     'type' : OSRF_MESSAGE_TYPE_CONNECT
132                 } 
133             )
134         )
135
136         while timeout >= 0 and not self.state == OSRF_APP_SESSION_CONNECTED:
137             start = time.time()
138             self.wait(timeout)
139             timeout -= time.time() - start
140         
141         if self.state != OSRF_APP_SESSION_CONNECTED:
142             raise osrfServiceException("Unable to connect to " + self.service)
143         
144         return True
145
146     def disconnect(self):
147         """Disconnects from a remote service"""
148
149         if self.state == OSRF_APP_SESSION_DISCONNECTED:
150             return True
151
152         self.send(
153             osrfNetworkObject.osrfMessage( 
154                 {   'threadTrace' : 0,
155                     'type' : OSRF_MESSAGE_TYPE_DISCONNECT
156                 } 
157             )
158         )
159
160         self.state = OSRF_APP_SESSION_DISCONNECTED
161
162
163         
164     
165     def setRemoteId(self, remoteid):
166         self.remoteId = remoteid
167         osrfLogInternal("Setting request remote ID to %s" % self.remoteId)
168
169     def resetRemoteId(self):
170         """Recovers the original remote id"""
171         self.remoteId = self.origRemoteId
172         osrfLogInternal("Resetting remote ID to %s" % self.remoteId)
173
174     def pushResponseQueue(self, message):
175         """Pushes the message payload onto the response queue 
176             for the request associated with the message's ID."""
177         osrfLogDebug("pushing %s" % message.payload())
178         try:
179             self.findRequest(message.threadTrace()).pushResponse(message.payload())
180         except Exception, e: 
181             osrfLogWarn("pushing respond to non-existent request %s : %s" % (message.threadTrace(), e))
182
183     def findRequest(self, rid):
184         """Returns the original request matching this message's threadTrace."""
185         try:
186             return self.requests[str(rid)]
187         except KeyError:
188             osrfLogDebug('findRequest(): non-existent request %s' % str(rid))
189             return None
190
191
192
193 osrfSession.sessionCache = {}
194 def osrfFindSession(thread):
195     """Finds a session in the global cache."""
196     try:
197         return osrfClientSession.sessionCache[thread]
198     except: return None
199
200 class osrfRequest(object):
201     """Represents a single OpenSRF request.
202         A request is made and any resulting respones are 
203         collected for the client."""
204
205     def __init__(self, session, id, method=None, params=[]):
206
207         self.session = session # my session handle
208         self.id     = id # my unique request ID
209         self.method = method # method name
210         self.params = params # my method params
211         self.queue  = [] # response queue
212         self.resetTimeout = False # resets the recv timeout?
213         self.complete = False # has the server told us this request is done?
214         self.sendTime = 0 # local time the request was put on the wire
215         self.completeTime =  0 # time the server told us the request was completed
216         self.firstResponseTime = 0 # time it took for our first reponse to be received
217
218     def send(self):
219         """Sends a request message"""
220
221         # construct the method object message with params and method name
222         method = osrfNetworkObject.osrfMethod( {
223             'method' : self.method,
224             'params' : self.params
225         } )
226
227         # construct the osrf message with our method message embedded
228         message = osrfNetworkObject.osrfMessage( {
229             'threadTrace' : self.id,
230             'type' : OSRF_MESSAGE_TYPE_REQUEST,
231             'payload' : method
232         } )
233
234         self.sendTime = time.time()
235         self.session.send(message)
236
237     def recv(self, timeout=120):
238         """Waits up to <timeout> seconds for a response to this request.
239         
240             If a message is received in time, the response message is returned.
241             Returns None otherwise."""
242
243         self.session.wait(0)
244
245         origTimeout = timeout
246         while not self.complete and timeout >= 0 and len(self.queue) == 0:
247             s = time.time()
248             self.session.wait(timeout)
249             timeout -= time.time() - s
250             if self.resetTimeout:
251                 self.resetTimeout = False
252                 timeout = origTimeout
253
254         now = time.time()
255
256         # -----------------------------------------------------------------
257         # log some statistics 
258         if len(self.queue) > 0:
259             if not self.firstResponseTime:
260                 self.firstResponseTime = now
261                 osrfLogDebug("time elapsed before first response: %f" \
262                     % (self.firstResponseTime - self.sendTime))
263
264         if self.complete:
265             if not self.completeTime:
266                 self.completeTime = now
267                 osrfLogDebug("time elapsed before complete: %f" \
268                     % (self.completeTime - self.sendTime))
269         # -----------------------------------------------------------------
270
271
272         if len(self.queue) > 0:
273             # we have a reponse, return it
274             return self.queue.pop(0)
275
276         return None
277
278     def pushResponse(self, content):
279         """Pushes a method response onto this requests response queue."""
280         self.queue.append(content)
281
282     def cleanup(self):
283         """Cleans up request data from the cache. 
284
285             Do this when you are done with a request to prevent "leaked" cache memory."""
286         del self.session.requests[str(self.id)]
287
288     def setComplete(self):
289         """Sets me as complete.  This means the server has sent a 'request complete' message"""
290         self.complete = True
291
292
293 class osrfServerSession(osrfSession):
294     """Implements a server-side session"""
295     pass
296
297
298 def osrfAtomicRequest(service, method, *args):
299     ses = osrfClientSession(service)
300     req = ses.request2(method, list(args))
301     resp = req.recv()
302     data = resp.content()
303     req.cleanup()
304     ses.cleanup()
305     return data
306
307
308