1 # -----------------------------------------------------------------------
2 # Copyright (C) 2007 Georgia Public Library Service
3 # Bill Erickson <billserickson@gmail.com>
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 # -----------------------------------------------------------------------
16 from osrf.json import *
17 from osrf.net_obj import *
18 from osrf.conf import osrfConfigValue
19 from osrf.net import osrfNetworkMessage, osrfGetNetworkHandle
20 from osrf.log import *
21 from osrf.const import *
22 import random, sys, os, time
25 # -----------------------------------------------------------------------
26 # Go ahead and register the common network objects
27 # -----------------------------------------------------------------------
28 osrfNetworkRegisterHint('osrfMessage', ['threadTrace', 'type', 'payload'], 'hash')
29 osrfNetworkRegisterHint('osrfMethod', ['method', 'params'], 'hash')
30 osrfNetworkRegisterHint('osrfResult', ['status', 'statusCode', 'content'], 'hash')
31 osrfNetworkRegisterHint('osrfConnectStatus', ['status','statusCode'], 'hash')
32 osrfNetworkRegisterHint('osrfMethodException', ['status', 'statusCode'], 'hash')
35 class osrfSession(object):
36 """Abstract session superclass."""
39 # by default, we're connected to no one
40 self.state = OSRF_APP_SESSION_DISCONNECTED
43 def wait(self, timeout=120):
44 """Wait up to <timeout> seconds for data to arrive on the network"""
45 osrfLogInternal("osrfSession.wait(%d)" % timeout)
46 handle = osrfGetNetworkHandle()
49 def send(self, omessage):
50 """Sends an OpenSRF message"""
51 netMessage = osrfNetworkMessage(
53 body = osrfObjectToJSON([omessage]),
54 thread = self.thread )
56 handle = osrfGetNetworkHandle()
57 handle.send(netMessage)
60 """Removes the session from the global session cache."""
61 del osrfClientSession.sessionCache[self.thread]
63 class osrfClientSession(osrfSession):
64 """Client session object. Use this to make server requests."""
66 def __init__(self, service):
68 # call superclass constructor
69 osrfSession.__init__(self)
71 # the remote service we want to make requests of
72 self.service = service
74 # find the remote service handle <router>@<domain>/<service>
75 domain = osrfConfigValue('domains.domain', 0)
76 router = osrfConfigValue('router_name')
77 self.remoteId = "%s@%s/%s" % (router, domain, service)
78 self.origRemoteId = self.remoteId
80 # generate a random message thread
81 self.thread = "%s%s%s" % (os.getpid(), str(random.randint(100,100000)), str(time.time()))
83 # how many requests this session has taken part in
86 # cache of request objects
89 # cache this session in the global session cache
90 osrfClientSession.sessionCache[self.thread] = self
92 def resetRequestTimeout(self, rid):
93 req = self.findRequest(rid)
95 req.resetTimeout = True
98 def request2(self, method, arr):
99 """Creates a new request and sends the request to the server using a python array as the params."""
100 return self.__request(method, arr)
102 def request(self, method, *args):
103 """Creates a new request and sends the request to the server using a variable argument list as params"""
105 return self.__request(method, arr)
107 def __request(self, method, arr):
108 """Builds the request object and sends it."""
109 if self.state != OSRF_APP_SESSION_CONNECTED:
112 osrfLogDebug("Sending request %s -> %s " % (self.service, method))
113 req = osrfRequest(self, self.nextId, method, arr)
114 self.requests[str(self.nextId)] = req
120 def connect(self, timeout=10):
121 """Connects to a remote service"""
123 if self.state == OSRF_APP_SESSION_CONNECTED:
125 self.state == OSRF_APP_SESSION_CONNECTING
127 # construct and send a CONNECT message
129 osrfNetworkObject.osrfMessage(
131 'type' : OSRF_MESSAGE_TYPE_CONNECT
136 while timeout >= 0 and not self.state == OSRF_APP_SESSION_CONNECTED:
139 timeout -= time.time() - start
141 if self.state != OSRF_APP_SESSION_CONNECTED:
142 raise osrfServiceException("Unable to connect to " + self.service)
146 def disconnect(self):
147 """Disconnects from a remote service"""
149 if self.state == OSRF_APP_SESSION_DISCONNECTED:
153 osrfNetworkObject.osrfMessage(
155 'type' : OSRF_MESSAGE_TYPE_DISCONNECT
160 self.state = OSRF_APP_SESSION_DISCONNECTED
165 def setRemoteId(self, remoteid):
166 self.remoteId = remoteid
167 osrfLogInternal("Setting request remote ID to %s" % self.remoteId)
169 def resetRemoteId(self):
170 """Recovers the original remote id"""
171 self.remoteId = self.origRemoteId
172 osrfLogInternal("Resetting remote ID to %s" % self.remoteId)
174 def pushResponseQueue(self, message):
175 """Pushes the message payload onto the response queue
176 for the request associated with the message's ID."""
177 osrfLogDebug("pushing %s" % message.payload())
179 self.findRequest(message.threadTrace()).pushResponse(message.payload())
181 osrfLogWarn("pushing respond to non-existent request %s : %s" % (message.threadTrace(), e))
183 def findRequest(self, rid):
184 """Returns the original request matching this message's threadTrace."""
186 return self.requests[str(rid)]
188 osrfLogDebug('findRequest(): non-existent request %s' % str(rid))
193 osrfSession.sessionCache = {}
194 def osrfFindSession(thread):
195 """Finds a session in the global cache."""
197 return osrfClientSession.sessionCache[thread]
200 class osrfRequest(object):
201 """Represents a single OpenSRF request.
202 A request is made and any resulting respones are
203 collected for the client."""
205 def __init__(self, session, id, method=None, params=[]):
207 self.session = session # my session handle
208 self.id = id # my unique request ID
209 self.method = method # method name
210 self.params = params # my method params
211 self.queue = [] # response queue
212 self.resetTimeout = False # resets the recv timeout?
213 self.complete = False # has the server told us this request is done?
214 self.sendTime = 0 # local time the request was put on the wire
215 self.completeTime = 0 # time the server told us the request was completed
216 self.firstResponseTime = 0 # time it took for our first reponse to be received
219 """Sends a request message"""
221 # construct the method object message with params and method name
222 method = osrfNetworkObject.osrfMethod( {
223 'method' : self.method,
224 'params' : self.params
227 # construct the osrf message with our method message embedded
228 message = osrfNetworkObject.osrfMessage( {
229 'threadTrace' : self.id,
230 'type' : OSRF_MESSAGE_TYPE_REQUEST,
234 self.sendTime = time.time()
235 self.session.send(message)
237 def recv(self, timeout=120):
238 """Waits up to <timeout> seconds for a response to this request.
240 If a message is received in time, the response message is returned.
241 Returns None otherwise."""
245 origTimeout = timeout
246 while not self.complete and timeout >= 0 and len(self.queue) == 0:
248 self.session.wait(timeout)
249 timeout -= time.time() - s
250 if self.resetTimeout:
251 self.resetTimeout = False
252 timeout = origTimeout
256 # -----------------------------------------------------------------
257 # log some statistics
258 if len(self.queue) > 0:
259 if not self.firstResponseTime:
260 self.firstResponseTime = now
261 osrfLogDebug("time elapsed before first response: %f" \
262 % (self.firstResponseTime - self.sendTime))
265 if not self.completeTime:
266 self.completeTime = now
267 osrfLogDebug("time elapsed before complete: %f" \
268 % (self.completeTime - self.sendTime))
269 # -----------------------------------------------------------------
272 if len(self.queue) > 0:
273 # we have a reponse, return it
274 return self.queue.pop(0)
278 def pushResponse(self, content):
279 """Pushes a method response onto this requests response queue."""
280 self.queue.append(content)
283 """Cleans up request data from the cache.
285 Do this when you are done with a request to prevent "leaked" cache memory."""
286 del self.session.requests[str(self.id)]
288 def setComplete(self):
289 """Sets me as complete. This means the server has sent a 'request complete' message"""
293 class osrfServerSession(osrfSession):
294 """Implements a server-side session"""
298 def osrfAtomicRequest(service, method, *args):
299 ses = osrfClientSession(service)
300 req = ses.request2('open-ils.cstore.direct.actor.user.retrieve', list(args)) # grab user with ID 1
302 data = resp.content()