]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/python/osrf/ses.py
2fbb5029c69cbd1ec1e9e1661b233f736ab4df4b
[OpenSRF.git] / src / python / osrf / ses.py
1 # -----------------------------------------------------------------------
2 # Copyright (C) 2007  Georgia Public Library Service
3 # Bill Erickson <billserickson@gmail.com>
4
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 # -----------------------------------------------------------------------
15
16 from osrf.json import *
17 from osrf.conf import osrfConfigValue
18 from osrf.net import osrfNetworkMessage, osrfGetNetworkHandle
19 from osrf.log import *
20 from osrf.const import *
21 import random, sys, os, time
22
23
24 # -----------------------------------------------------------------------
25 # Go ahead and register the common network objects
26 # -----------------------------------------------------------------------
27 osrfNetworkRegisterHint('osrfMessage', ['threadTrace', 'type', 'payload'], 'hash')
28 osrfNetworkRegisterHint('osrfMethod', ['method', 'params'], 'hash')
29 osrfNetworkRegisterHint('osrfResult', ['status', 'statusCode', 'content'], 'hash')
30 osrfNetworkRegisterHint('osrfConnectStatus', ['status','statusCode'], 'hash')
31 osrfNetworkRegisterHint('osrfMethodException', ['status', 'statusCode'], 'hash')
32
33
34 class osrfSession(object):
35         """Abstract session superclass."""
36
37         def __init__(self):
38                 # by default, we're connected to no one
39                 self.state = OSRF_APP_SESSION_DISCONNECTED
40
41
42         def wait(self, timeout=120):
43                 """Wait up to <timeout> seconds for data to arrive on the network"""
44                 osrfLogInternal("osrfSession.wait(%d)" % timeout)
45                 handle = osrfGetNetworkHandle()
46                 handle.recv(timeout)
47
48         def send(self, omessage):
49                 """Sends an OpenSRF message"""
50                 netMessage = osrfNetworkMessage(
51                         to              = self.remoteId,
52                         body    = osrfObjectToJSON([omessage]),
53                         thread = self.thread )
54
55                 handle = osrfGetNetworkHandle()
56                 handle.send(netMessage)
57
58         def cleanup(self):
59                 """Removes the session from the global session cache."""
60                 del osrfClientSession.sessionCache[self.thread]
61
62 class osrfClientSession(osrfSession):
63         """Client session object.  Use this to make server requests."""
64
65         def __init__(self, service):
66                 
67                 # call superclass constructor
68                 osrfSession.__init__(self)
69
70                 # the remote service we want to make requests of
71                 self.service = service
72
73                 # find the remote service handle <router>@<domain>/<service>
74                 domain = osrfConfigValue('domains.domain', 0)
75                 router = osrfConfigValue('router_name')
76                 self.remoteId = "%s@%s/%s" % (router, domain, service)
77                 self.origRemoteId = self.remoteId
78
79                 # generate a random message thread
80                 self.thread = "%s%s%s" % (os.getpid(), str(random.randint(100,100000)), str(time.time()))
81
82                 # how many requests this session has taken part in
83                 self.nextId = 0 
84
85                 # cache of request objects 
86                 self.requests = {}
87
88                 # cache this session in the global session cache
89                 osrfClientSession.sessionCache[self.thread] = self
90
91         def resetRequestTimeout(self, rid):
92                 req = self.findRequest(rid)
93                 if req:
94                         req.resetTimeout = True
95                         
96
97         def request2(self, method, arr):
98                 """Creates a new request and sends the request to the server using a python array as the params."""
99                 return self.__request(method, arr)
100
101         def request(self, method, *args):
102                 """Creates a new request and sends the request to the server using a variable argument list as params"""
103                 arr = list(args)
104                 return self.__request(method, arr)
105
106         def __request(self, method, arr):
107                 """Builds the request object and sends it."""
108                 if self.state != OSRF_APP_SESSION_CONNECTED:
109                         self.resetRemoteId()
110
111                 osrfLogDebug("Sending request %s -> %s " % (self.service, method))
112                 req = osrfRequest(self, self.nextId, method, arr)
113                 self.requests[str(self.nextId)] = req
114                 self.nextId += 1
115                 req.send()
116                 return req
117
118
119         def connect(self, timeout=10):
120                 """Connects to a remote service"""
121
122                 if self.state == OSRF_APP_SESSION_CONNECTED:
123                         return True
124                 self.state == OSRF_APP_SESSION_CONNECTING
125
126                 # construct and send a CONNECT message
127                 self.send(
128                         osrfNetworkObject.osrfMessage( 
129                                 {       'threadTrace' : 0,
130                                         'type' : OSRF_MESSAGE_TYPE_CONNECT
131                                 } 
132                         )
133                 )
134
135                 while timeout >= 0 and not self.state == OSRF_APP_SESSION_CONNECTED:
136                         start = time.time()
137                         self.wait(timeout)
138                         timeout -= time.time() - start
139                 
140                 if self.state != OSRF_APP_SESSION_CONNECTED:
141                         raise osrfServiceException("Unable to connect to " + self.service)
142                 
143                 return True
144
145         def disconnect(self):
146                 """Disconnects from a remote service"""
147
148                 if self.state == OSRF_APP_SESSION_DISCONNECTED:
149                         return True
150
151                 self.send(
152                         osrfNetworkObject.osrfMessage( 
153                                 {       'threadTrace' : 0,
154                                         'type' : OSRF_MESSAGE_TYPE_DISCONNECT
155                                 } 
156                         )
157                 )
158
159                 self.state = OSRF_APP_SESSION_DISCONNECTED
160
161
162                 
163         
164         def setRemoteId(self, remoteid):
165                 self.remoteId = remoteid
166                 osrfLogInternal("Setting request remote ID to %s" % self.remoteId)
167
168         def resetRemoteId(self):
169                 """Recovers the original remote id"""
170                 self.remoteId = self.origRemoteId
171                 osrfLogInternal("Resetting remote ID to %s" % self.remoteId)
172
173         def pushResponseQueue(self, message):
174                 """Pushes the message payload onto the response queue 
175                         for the request associated with the message's ID."""
176                 osrfLogDebug("pushing %s" % message.payload())
177                 try:
178                         self.findRequest(message.threadTrace()).pushResponse(message.payload())
179                 except Exception, e: 
180                         osrfLogWarn("pushing respond to non-existent request %s : %s" % (message.threadTrace(), e))
181
182         def findRequest(self, rid):
183                 """Returns the original request matching this message's threadTrace."""
184                 try:
185                         return self.requests[str(rid)]
186                 except KeyError:
187                         osrfLogDebug('findRequest(): non-existent request %s' % str(rid))
188                         return None
189
190
191
192 osrfSession.sessionCache = {}
193 def osrfFindSession(thread):
194         """Finds a session in the global cache."""
195         try:
196                 return osrfClientSession.sessionCache[thread]
197         except: return None
198
199 class osrfRequest(object):
200         """Represents a single OpenSRF request.
201                 A request is made and any resulting respones are 
202                 collected for the client."""
203
204         def __init__(self, session, id, method=None, params=[]):
205
206                 self.session = session # my session handle
207                 self.id         = id # my unique request ID
208                 self.method = method # method name
209                 self.params = params # my method params
210                 self.queue      = [] # response queue
211                 self.resetTimeout = False # resets the recv timeout?
212                 self.complete = False # has the server told us this request is done?
213                 self.sendTime = 0 # local time the request was put on the wire
214                 self.completeTime =  0 # time the server told us the request was completed
215                 self.firstResponseTime = 0 # time it took for our first reponse to be received
216
217         def send(self):
218                 """Sends a request message"""
219
220                 # construct the method object message with params and method name
221                 method = osrfNetworkObject.osrfMethod( {
222                         'method' : self.method,
223                         'params' : self.params
224                 } )
225
226                 # construct the osrf message with our method message embedded
227                 message = osrfNetworkObject.osrfMessage( {
228                         'threadTrace' : self.id,
229                         'type' : OSRF_MESSAGE_TYPE_REQUEST,
230                         'payload' : method
231                 } )
232
233                 self.sendTime = time.time()
234                 self.session.send(message)
235
236         def recv(self, timeout=120):
237                 """Waits up to <timeout> seconds for a response to this request.
238                 
239                         If a message is received in time, the response message is returned.
240                         Returns None otherwise."""
241
242                 self.session.wait(0)
243
244                 origTimeout = timeout
245                 while not self.complete and timeout >= 0 and len(self.queue) == 0:
246                         s = time.time()
247                         self.session.wait(timeout)
248                         timeout -= time.time() - s
249                         if self.resetTimeout:
250                                 self.resetTimeout = False
251                                 timeout = origTimeout
252
253                 now = time.time()
254
255                 # -----------------------------------------------------------------
256                 # log some statistics 
257                 if len(self.queue) > 0:
258                         if not self.firstResponseTime:
259                                 self.firstResponseTime = now
260                                 osrfLogDebug("time elapsed before first response: %f" \
261                                         % (self.firstResponseTime - self.sendTime))
262
263                 if self.complete:
264                         if not self.completeTime:
265                                 self.completeTime = now
266                                 osrfLogDebug("time elapsed before complete: %f" \
267                                         % (self.completeTime - self.sendTime))
268                 # -----------------------------------------------------------------
269
270
271                 if len(self.queue) > 0:
272                         # we have a reponse, return it
273                         return self.queue.pop(0)
274
275                 return None
276
277         def pushResponse(self, content):
278                 """Pushes a method response onto this requests response queue."""
279                 self.queue.append(content)
280
281         def cleanup(self):
282                 """Cleans up request data from the cache. 
283
284                         Do this when you are done with a request to prevent "leaked" cache memory."""
285                 del self.session.requests[str(self.id)]
286
287         def setComplete(self):
288                 """Sets me as complete.  This means the server has sent a 'request complete' message"""
289                 self.complete = True
290
291
292 class osrfServerSession(osrfSession):
293         """Implements a server-side session"""
294         pass
295
296
297 def osrfAtomicRequest(service, method, *args):
298         ses = osrfClientSession(service)
299         req = ses.request2('open-ils.cstore.direct.actor.user.retrieve', list(args)) # grab user with ID 1
300         resp = req.recv()
301         data = resp.content()
302         req.cleanup()
303         ses.cleanup()
304         return data
305
306
307