]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/python/osrf/ses.py
f1c6c387c5ed8765a81ce09c92552f13fc88962f
[OpenSRF.git] / src / python / osrf / ses.py
1 # -----------------------------------------------------------------------
2 # Copyright (C) 2007  Georgia Public Library Service
3 # Bill Erickson <billserickson@gmail.com>
4
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 # -----------------------------------------------------------------------
15
16 from osrf.json import *
17 from osrf.net_obj import *
18 from osrf.conf import osrfConfigValue
19 from osrf.net import osrfNetworkMessage, osrfGetNetworkHandle
20 from osrf.log import *
21 from osrf.const import *
22 import random, sys, os, time
23
24
25 # -----------------------------------------------------------------------
26 # Go ahead and register the common network objects
27 # -----------------------------------------------------------------------
28 osrfNetworkRegisterHint('osrfMessage', ['threadTrace', 'type', 'payload'], 'hash')
29 osrfNetworkRegisterHint('osrfMethod', ['method', 'params'], 'hash')
30 osrfNetworkRegisterHint('osrfResult', ['status', 'statusCode', 'content'], 'hash')
31 osrfNetworkRegisterHint('osrfConnectStatus', ['status','statusCode'], 'hash')
32 osrfNetworkRegisterHint('osrfMethodException', ['status', 'statusCode'], 'hash')
33
34
35 class osrfSession(object):
36         """Abstract session superclass."""
37
38         def __init__(self):
39                 # by default, we're connected to no one
40                 self.state = OSRF_APP_SESSION_DISCONNECTED
41
42
43         def wait(self, timeout=120):
44                 """Wait up to <timeout> seconds for data to arrive on the network"""
45                 osrfLogInternal("osrfSession.wait(%d)" % timeout)
46                 handle = osrfGetNetworkHandle()
47                 handle.recv(timeout)
48
49         def send(self, omessage):
50                 """Sends an OpenSRF message"""
51                 netMessage = osrfNetworkMessage(
52                         to              = self.remoteId,
53                         body    = osrfObjectToJSON([omessage]),
54                         thread = self.thread )
55
56                 handle = osrfGetNetworkHandle()
57                 handle.send(netMessage)
58
59         def cleanup(self):
60                 """Removes the session from the global session cache."""
61                 del osrfClientSession.sessionCache[self.thread]
62
63 class osrfClientSession(osrfSession):
64         """Client session object.  Use this to make server requests."""
65
66         def __init__(self, service):
67                 
68                 # call superclass constructor
69                 osrfSession.__init__(self)
70
71                 # the remote service we want to make requests of
72                 self.service = service
73
74                 # find the remote service handle <router>@<domain>/<service>
75                 domain = osrfConfigValue('domains.domain', 0)
76                 router = osrfConfigValue('router_name')
77                 self.remoteId = "%s@%s/%s" % (router, domain, service)
78                 self.origRemoteId = self.remoteId
79
80                 # generate a random message thread
81                 self.thread = "%s%s%s" % (os.getpid(), str(random.randint(100,100000)), str(time.time()))
82
83                 # how many requests this session has taken part in
84                 self.nextId = 0 
85
86                 # cache of request objects 
87                 self.requests = {}
88
89                 # cache this session in the global session cache
90                 osrfClientSession.sessionCache[self.thread] = self
91
92         def resetRequestTimeout(self, rid):
93                 req = self.findRequest(rid)
94                 if req:
95                         req.resetTimeout = True
96                         
97
98         def request2(self, method, arr):
99                 """Creates a new request and sends the request to the server using a python array as the params."""
100                 return self.__request(method, arr)
101
102         def request(self, method, *args):
103                 """Creates a new request and sends the request to the server using a variable argument list as params"""
104                 arr = list(args)
105                 return self.__request(method, arr)
106
107         def __request(self, method, arr):
108                 """Builds the request object and sends it."""
109                 if self.state != OSRF_APP_SESSION_CONNECTED:
110                         self.resetRemoteId()
111
112                 osrfLogDebug("Sending request %s -> %s " % (self.service, method))
113                 req = osrfRequest(self, self.nextId, method, arr)
114                 self.requests[str(self.nextId)] = req
115                 self.nextId += 1
116                 req.send()
117                 return req
118
119
120         def connect(self, timeout=10):
121                 """Connects to a remote service"""
122
123                 if self.state == OSRF_APP_SESSION_CONNECTED:
124                         return True
125                 self.state == OSRF_APP_SESSION_CONNECTING
126
127                 # construct and send a CONNECT message
128                 self.send(
129                         osrfNetworkObject.osrfMessage( 
130                                 {       'threadTrace' : 0,
131                                         'type' : OSRF_MESSAGE_TYPE_CONNECT
132                                 } 
133                         )
134                 )
135
136                 while timeout >= 0 and not self.state == OSRF_APP_SESSION_CONNECTED:
137                         start = time.time()
138                         self.wait(timeout)
139                         timeout -= time.time() - start
140                 
141                 if self.state != OSRF_APP_SESSION_CONNECTED:
142                         raise osrfServiceException("Unable to connect to " + self.service)
143                 
144                 return True
145
146         def disconnect(self):
147                 """Disconnects from a remote service"""
148
149                 if self.state == OSRF_APP_SESSION_DISCONNECTED:
150                         return True
151
152                 self.send(
153                         osrfNetworkObject.osrfMessage( 
154                                 {       'threadTrace' : 0,
155                                         'type' : OSRF_MESSAGE_TYPE_DISCONNECT
156                                 } 
157                         )
158                 )
159
160                 self.state = OSRF_APP_SESSION_DISCONNECTED
161
162
163                 
164         
165         def setRemoteId(self, remoteid):
166                 self.remoteId = remoteid
167                 osrfLogInternal("Setting request remote ID to %s" % self.remoteId)
168
169         def resetRemoteId(self):
170                 """Recovers the original remote id"""
171                 self.remoteId = self.origRemoteId
172                 osrfLogInternal("Resetting remote ID to %s" % self.remoteId)
173
174         def pushResponseQueue(self, message):
175                 """Pushes the message payload onto the response queue 
176                         for the request associated with the message's ID."""
177                 osrfLogDebug("pushing %s" % message.payload())
178                 try:
179                         self.findRequest(message.threadTrace()).pushResponse(message.payload())
180                 except Exception, e: 
181                         osrfLogWarn("pushing respond to non-existent request %s : %s" % (message.threadTrace(), e))
182
183         def findRequest(self, rid):
184                 """Returns the original request matching this message's threadTrace."""
185                 try:
186                         return self.requests[str(rid)]
187                 except KeyError:
188                         osrfLogDebug('findRequest(): non-existent request %s' % str(rid))
189                         return None
190
191
192
193 osrfSession.sessionCache = {}
194 def osrfFindSession(thread):
195         """Finds a session in the global cache."""
196         try:
197                 return osrfClientSession.sessionCache[thread]
198         except: return None
199
200 class osrfRequest(object):
201         """Represents a single OpenSRF request.
202                 A request is made and any resulting respones are 
203                 collected for the client."""
204
205         def __init__(self, session, id, method=None, params=[]):
206
207                 self.session = session # my session handle
208                 self.id         = id # my unique request ID
209                 self.method = method # method name
210                 self.params = params # my method params
211                 self.queue      = [] # response queue
212                 self.resetTimeout = False # resets the recv timeout?
213                 self.complete = False # has the server told us this request is done?
214                 self.sendTime = 0 # local time the request was put on the wire
215                 self.completeTime =  0 # time the server told us the request was completed
216                 self.firstResponseTime = 0 # time it took for our first reponse to be received
217
218         def send(self):
219                 """Sends a request message"""
220
221                 # construct the method object message with params and method name
222                 method = osrfNetworkObject.osrfMethod( {
223                         'method' : self.method,
224                         'params' : self.params
225                 } )
226
227                 # construct the osrf message with our method message embedded
228                 message = osrfNetworkObject.osrfMessage( {
229                         'threadTrace' : self.id,
230                         'type' : OSRF_MESSAGE_TYPE_REQUEST,
231                         'payload' : method
232                 } )
233
234                 self.sendTime = time.time()
235                 self.session.send(message)
236
237         def recv(self, timeout=120):
238                 """Waits up to <timeout> seconds for a response to this request.
239                 
240                         If a message is received in time, the response message is returned.
241                         Returns None otherwise."""
242
243                 self.session.wait(0)
244
245                 origTimeout = timeout
246                 while not self.complete and timeout >= 0 and len(self.queue) == 0:
247                         s = time.time()
248                         self.session.wait(timeout)
249                         timeout -= time.time() - s
250                         if self.resetTimeout:
251                                 self.resetTimeout = False
252                                 timeout = origTimeout
253
254                 now = time.time()
255
256                 # -----------------------------------------------------------------
257                 # log some statistics 
258                 if len(self.queue) > 0:
259                         if not self.firstResponseTime:
260                                 self.firstResponseTime = now
261                                 osrfLogDebug("time elapsed before first response: %f" \
262                                         % (self.firstResponseTime - self.sendTime))
263
264                 if self.complete:
265                         if not self.completeTime:
266                                 self.completeTime = now
267                                 osrfLogDebug("time elapsed before complete: %f" \
268                                         % (self.completeTime - self.sendTime))
269                 # -----------------------------------------------------------------
270
271
272                 if len(self.queue) > 0:
273                         # we have a reponse, return it
274                         return self.queue.pop(0)
275
276                 return None
277
278         def pushResponse(self, content):
279                 """Pushes a method response onto this requests response queue."""
280                 self.queue.append(content)
281
282         def cleanup(self):
283                 """Cleans up request data from the cache. 
284
285                         Do this when you are done with a request to prevent "leaked" cache memory."""
286                 del self.session.requests[str(self.id)]
287
288         def setComplete(self):
289                 """Sets me as complete.  This means the server has sent a 'request complete' message"""
290                 self.complete = True
291
292
293 class osrfServerSession(osrfSession):
294         """Implements a server-side session"""
295         pass
296
297
298 def osrfAtomicRequest(service, method, *args):
299         ses = osrfClientSession(service)
300         req = ses.request2('open-ils.cstore.direct.actor.user.retrieve', list(args)) # grab user with ID 1
301         resp = req.recv()
302         data = resp.content()
303         req.cleanup()
304         ses.cleanup()
305         return data
306
307
308