5b9c42240f73779c7cf15d892f81a747975a5598
[OpenSRF.git] / src / python / osrf / ses.py
1 # -----------------------------------------------------------------------
2 # Copyright (C) 2007  Georgia Public Library Service
3 # Bill Erickson <billserickson@gmail.com>
4
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 # -----------------------------------------------------------------------
15
16 from osrf.json import *
17 from osrf.net_obj import *
18 from osrf.conf import osrfConfigValue
19 from osrf.net import osrfNetworkMessage, osrfGetNetworkHandle
20 from osrf.log import *
21 from osrf.const import *
22 import random, sys, os, time, threading
23
24
25 # -----------------------------------------------------------------------
26 # Go ahead and register the common network objects
27 # -----------------------------------------------------------------------
28 osrfNetworkRegisterHint('osrfMessage', ['threadTrace', 'type', 'payload'], 'hash')
29 osrfNetworkRegisterHint('osrfMethod', ['method', 'params'], 'hash')
30 osrfNetworkRegisterHint('osrfResult', ['status', 'statusCode', 'content'], 'hash')
31 osrfNetworkRegisterHint('osrfConnectStatus', ['status','statusCode'], 'hash')
32 osrfNetworkRegisterHint('osrfMethodException', ['status', 'statusCode'], 'hash')
33
34
35 class osrfSession(object):
36     """Abstract session superclass."""
37
38     ''' Global cache of in-service sessions '''
39     sessionCache = {}
40
41     def __init__(self):
42         # by default, we're connected to no one
43         self.state = OSRF_APP_SESSION_DISCONNECTED
44
45     def findSession(threadTrace):
46         return osrfSession.sessionCache.get(threadTrace)
47     findSession = staticmethod(findSession)
48
49     def wait(self, timeout=120):
50         """Wait up to <timeout> seconds for data to arrive on the network"""
51         osrfLogInternal("osrfSession.wait(%d)" % timeout)
52         handle = osrfGetNetworkHandle()
53         handle.recv(timeout)
54
55     def send(self, omessage):
56         """Sends an OpenSRF message"""
57         netMessage = osrfNetworkMessage(
58             to      = self.remoteId,
59             body    = osrfObjectToJSON([omessage]),
60             thread = self.thread )
61
62         handle = osrfGetNetworkHandle()
63         handle.send(netMessage)
64
65     def cleanup(self):
66         """Removes the session from the global session cache."""
67         del osrfSession.sessionCache[self.thread]
68
69 class osrfClientSession(osrfSession):
70     """Client session object.  Use this to make server requests."""
71
72     def __init__(self, service):
73         
74         # call superclass constructor
75         osrfSession.__init__(self)
76
77         # the remote service we want to make requests of
78         self.service = service
79
80         # find the remote service handle <router>@<domain>/<service>
81         domain = osrfConfigValue('domains.domain', 0)
82         router = osrfConfigValue('router_name')
83         self.remoteId = "%s@%s/%s" % (router, domain, service)
84         self.origRemoteId = self.remoteId
85
86         # generate a random message thread
87         self.thread = "%s%s%s%s" % (os.getpid(), 
88             str(random.randint(100,100000)), str(time.time()),threading.currentThread().getName().lower())
89
90         # how many requests this session has taken part in
91         self.nextId = 0 
92
93         # cache of request objects 
94         self.requests = {}
95
96         # cache this session in the global session cache
97         osrfSession.sessionCache[self.thread] = self
98
99     def resetRequestTimeout(self, rid):
100         req = self.findRequest(rid)
101         if req:
102             req.resetTimeout = True
103             
104
105     def request2(self, method, arr):
106         """Creates a new request and sends the request to the server using a python array as the params."""
107         return self.__request(method, arr)
108
109     def request(self, method, *args):
110         """Creates a new request and sends the request to the server using a variable argument list as params"""
111         arr = list(args)
112         return self.__request(method, arr)
113
114     def __request(self, method, arr):
115         """Builds the request object and sends it."""
116         if self.state != OSRF_APP_SESSION_CONNECTED:
117             self.resetRemoteId()
118
119         osrfLogDebug("Sending request %s -> %s " % (self.service, method))
120         req = osrfRequest(self, self.nextId, method, arr)
121         self.requests[str(self.nextId)] = req
122         self.nextId += 1
123         req.send()
124         return req
125
126
127     def connect(self, timeout=10):
128         """Connects to a remote service"""
129
130         if self.state == OSRF_APP_SESSION_CONNECTED:
131             return True
132         self.state == OSRF_APP_SESSION_CONNECTING
133
134         # construct and send a CONNECT message
135         self.send(
136             osrfNetworkObject.osrfMessage( 
137                 {   'threadTrace' : 0,
138                     'type' : OSRF_MESSAGE_TYPE_CONNECT
139                 } 
140             )
141         )
142
143         while timeout >= 0 and not self.state == OSRF_APP_SESSION_CONNECTED:
144             start = time.time()
145             self.wait(timeout)
146             timeout -= time.time() - start
147         
148         if self.state != OSRF_APP_SESSION_CONNECTED:
149             raise osrfServiceException("Unable to connect to " + self.service)
150         
151         return True
152
153     def disconnect(self):
154         """Disconnects from a remote service"""
155
156         if self.state == OSRF_APP_SESSION_DISCONNECTED:
157             return True
158
159         self.send(
160             osrfNetworkObject.osrfMessage( 
161                 {   'threadTrace' : 0,
162                     'type' : OSRF_MESSAGE_TYPE_DISCONNECT
163                 } 
164             )
165         )
166
167         self.state = OSRF_APP_SESSION_DISCONNECTED
168
169
170         
171     
172     def setRemoteId(self, remoteid):
173         self.remoteId = remoteid
174         osrfLogInternal("Setting request remote ID to %s" % self.remoteId)
175
176     def resetRemoteId(self):
177         """Recovers the original remote id"""
178         self.remoteId = self.origRemoteId
179         osrfLogInternal("Resetting remote ID to %s" % self.remoteId)
180
181     def pushResponseQueue(self, message):
182         """Pushes the message payload onto the response queue 
183             for the request associated with the message's ID."""
184         osrfLogDebug("pushing %s" % message.payload())
185         try:
186             self.findRequest(message.threadTrace()).pushResponse(message.payload())
187         except Exception, e: 
188             osrfLogWarn("pushing respond to non-existent request %s : %s" % (message.threadTrace(), e))
189
190     def findRequest(self, rid):
191         """Returns the original request matching this message's threadTrace."""
192         try:
193             return self.requests[str(rid)]
194         except KeyError:
195             osrfLogDebug('findRequest(): non-existent request %s' % str(rid))
196             return None
197
198
199
200 class osrfRequest(object):
201     """Represents a single OpenSRF request.
202         A request is made and any resulting respones are 
203         collected for the client."""
204
205     def __init__(self, session, id, method=None, params=[]):
206
207         self.session = session # my session handle
208         self.id     = id # my unique request ID
209         self.method = method # method name
210         self.params = params # my method params
211         self.queue  = [] # response queue
212         self.resetTimeout = False # resets the recv timeout?
213         self.complete = False # has the server told us this request is done?
214         self.sendTime = 0 # local time the request was put on the wire
215         self.completeTime =  0 # time the server told us the request was completed
216         self.firstResponseTime = 0 # time it took for our first reponse to be received
217
218     def send(self):
219         """Sends a request message"""
220
221         # construct the method object message with params and method name
222         method = osrfNetworkObject.osrfMethod( {
223             'method' : self.method,
224             'params' : self.params
225         } )
226
227         # construct the osrf message with our method message embedded
228         message = osrfNetworkObject.osrfMessage( {
229             'threadTrace' : self.id,
230             'type' : OSRF_MESSAGE_TYPE_REQUEST,
231             'payload' : method
232         } )
233
234         self.sendTime = time.time()
235         self.session.send(message)
236
237     def recv(self, timeout=120):
238         """Waits up to <timeout> seconds for a response to this request.
239         
240             If a message is received in time, the response message is returned.
241             Returns None otherwise."""
242
243         self.session.wait(0)
244
245         origTimeout = timeout
246         while not self.complete and timeout >= 0 and len(self.queue) == 0:
247             s = time.time()
248             self.session.wait(timeout)
249             timeout -= time.time() - s
250             if self.resetTimeout:
251                 self.resetTimeout = False
252                 timeout = origTimeout
253
254         now = time.time()
255
256         # -----------------------------------------------------------------
257         # log some statistics 
258         if len(self.queue) > 0:
259             if not self.firstResponseTime:
260                 self.firstResponseTime = now
261                 osrfLogDebug("time elapsed before first response: %f" \
262                     % (self.firstResponseTime - self.sendTime))
263
264         if self.complete:
265             if not self.completeTime:
266                 self.completeTime = now
267                 osrfLogDebug("time elapsed before complete: %f" \
268                     % (self.completeTime - self.sendTime))
269         # -----------------------------------------------------------------
270
271
272         if len(self.queue) > 0:
273             # we have a reponse, return it
274             return self.queue.pop(0)
275
276         return None
277
278     def pushResponse(self, content):
279         """Pushes a method response onto this requests response queue."""
280         self.queue.append(content)
281
282     def cleanup(self):
283         """Cleans up request data from the cache. 
284
285             Do this when you are done with a request to prevent "leaked" cache memory."""
286         del self.session.requests[str(self.id)]
287
288     def setComplete(self):
289         """Sets me as complete.  This means the server has sent a 'request complete' message"""
290         self.complete = True
291
292
293 class osrfServerSession(osrfSession):
294     """Implements a server-side session"""
295     pass
296
297
298 def osrfAtomicRequest(service, method, *args):
299     ses = osrfClientSession(service)
300     req = ses.request2(method, list(args))
301     resp = req.recv()
302     data = resp.content()
303     req.cleanup()
304     ses.cleanup()
305     return data
306
307
308