1 import os, time, md5, random
2 from mod_python import apache, util
5 from osrf.system import osrfConnect
6 from osrf.json import osrfJSONToObject
7 from osrf.conf import osrfConfigValue
8 from osrf.set import osrfSettingsValue
9 from osrf.const import *
10 from osrf.net import *
11 from osrf.log import *
15 Proof of concept OpenSRF-HTTP multipart streaming gateway.
17 Example Apache mod_python config:
19 <Location /osrf-http-translator>
21 PythonPath "['/path/to/osrf-python'] + sys.path"
22 PythonHandler osrf.http_translator
23 PythonOption OSRF_CONFIG /path/to/opensrf_core.xml
24 PythonOption OSRF_CONFIG_CONTEXT gateway
# Names of the HTTP headers this translator reads from the incoming request
# (TO, THREAD, TIMEOUT, SERVICE, MULTIPART) or sets on the response (FROM).
OSRF_HTTP_HEADER_TO = 'X-OpenSRF-to'
# NOTE(review): XID carries the same header name as THREAD below -- confirm
# whether a distinct header name was intended before relying on it.
OSRF_HTTP_HEADER_XID = 'X-OpenSRF-thread'
OSRF_HTTP_HEADER_FROM = 'X-OpenSRF-from'
OSRF_HTTP_HEADER_THREAD = 'X-OpenSRF-thread'
OSRF_HTTP_HEADER_TIMEOUT = 'X-OpenSRF-timeout'
OSRF_HTTP_HEADER_SERVICE = 'X-OpenSRF-service'
OSRF_HTTP_HEADER_MULTIPART = 'X-OpenSRF-multipart'

# Response content types.  The multipart template is filled in with the
# per-request boundary string via the % operator.
MULTIPART_CONTENT_TYPE = 'multipart/x-mixed-replace;boundary="%s"'
JSON_CONTENT_TYPE = 'text/plain'
46 # If true, all data sent to the client is also written to stderr (apache error log)
51 sys.stderr.write("%s\n\n" % str(s))
57 ''' At the time of writing, mod_python doesn't support a childInit handler,
58 so this function is called once per process to initialize
59 the opensrf connection '''
61 global initComplete, ROUTER_NAME, OSRF_DOMAIN
65 ops = req.get_options()
66 conf = ops['OSRF_CONFIG']
67 ctxt = ops.get('OSRF_CONFIG_CONTEXT') or 'opensrf'
68 osrfConnect(conf, ctxt)
70 ROUTER_NAME = osrfConfigValue('router_name')
71 OSRF_DOMAIN = osrfConfigValue('domains.domain')
74 servers = osrfSettingsValue('cache.global.servers.server')
75 if not isinstance(servers, list):
77 osrf.cache.CacheClient.connect(servers)
81 ''' Create the translator and tell it to process the request. '''
83 return HTTPTranslator(req).process()
85 class HTTPTranslator(object):
86 def __init__(self, apreq):
93 post = util.parse_qsl(apreq.read(int(apreq.headers_in['Content-length'])))
94 self.body = [d for d in post if d[0] == 'osrf-msg'][0][1]
100 self.complete = False
101 self.handle = osrfGetNetworkHandle()
102 self.handle.setRecvCallback(None)
104 self.to = apreq.headers_in.get(OSRF_HTTP_HEADER_TO)
105 self.service = apreq.headers_in.get(OSRF_HTTP_HEADER_SERVICE)
106 self.thread = apreq.headers_in.get(OSRF_HTTP_HEADER_THREAD) or "%s%s" % (os.getpid(), time.time())
107 self.timeout = apreq.headers_in.get(OSRF_HTTP_HEADER_TIMEOUT) or 1200
108 self.multipart = str(apreq.headers_in.get(OSRF_HTTP_HEADER_MULTIPART)).lower() == 'true'
109 self.disconnectOnly = False
111 # generate a random multipart delimiter
113 m.update("%f%d%d" % (time.time(), os.getpid(), random.randint(100,10000000)))
114 self.delim = m.hexdigest()
115 self.remoteHost = self.apreq.get_remote_host(apache.REMOTE_NOLOOKUP)
116 self.cache = osrf.cache.CacheClient()
121 if self.apreq.header_only:
124 return apache.HTTP_BAD_REQUEST
125 if not self.setToAddr():
126 return apache.HTTP_BAD_REQUEST
127 if not self.parseRequest():
128 return apache.HTTP_BAD_REQUEST
130 while self.handle.recv(0):
131 pass # drop stale messages
134 netMsg = osrfNetworkMessage(to=self.to, thread=self.thread, body=self.body)
135 self.handle.send(netMsg)
137 if self.disconnectOnly:
138 osrfLogDebug("exiting early on DISCONNECT")
142 while not self.complete:
144 netMsg = self.handle.recv(self.timeout)
146 return apache.GATEWAY_TIME_OUT
148 if not self.checkStatus(netMsg):
152 self.initHeaders(netMsg)
156 self.respondChunk(netMsg)
158 self.messages.append(netMsg.body)
162 # condense the sets of arrays into a single array of messages
163 json = self.messages.pop(0)
164 while len(self.messages) > 0:
165 m = self.messages.pop(0)
166 json = "%s,%s" % (json[0:len(json)-1], m[1:])
168 self.write("%s" % json)
173 def parseRequest(self):
174 ''' If this is solely a DISCONNECT message, we set self.disconnectOnly to true
175 @return True if the body parses correctly, False otherwise
177 osrfMsgs = osrfJSONToObject(self.body)
181 if len(osrfMsgs) == 1 and osrfMsgs[0].type() == OSRF_MESSAGE_TYPE_DISCONNECT:
182 self.disconnectOnly = True
188 ''' Determines the TO address. Returns false if
189 the address is missing or ambiguous.
190 Also returns false if an explicit TO is specified and the
191 thread/IP/TO combination is not found in the session cache
195 osrfLogWarn("specifying both SERVICE and TO is not allowed")
197 self.to = "%s@%s/%s" % (ROUTER_NAME, OSRF_DOMAIN, self.service)
201 # If the client specifies an explicit TO address, verify it's the same
202 # address that was cached with the previous request.
203 obj = self.cache.get(self.thread)
204 if obj and obj['ip'] == self.remoteHost and obj['jid'] == self.to:
206 osrfLogWarn("client [%s] attempted to send directly [%s] without a session" % (self.remoteHost, self.to))
210 def initHeaders(self, netMsg):
211 self.apreq.headers_out[OSRF_HTTP_HEADER_FROM] = netMsg.sender
213 self.apreq.content_type = MULTIPART_CONTENT_TYPE % self.delim
214 self.write("--%s\n" % self.delim)
216 self.apreq.content_type = JSON_CONTENT_TYPE
217 self.cache.put(self.thread, {'ip':self.remoteHost, 'jid': netMsg.sender}, CACHE_TIME)
219 osrfLogDebug("caching session [%s] for host [%s] and server drone [%s]" % (
220 self.thread, self.remoteHost, netMsg.sender))
224 def checkStatus(self, netMsg):
225 ''' Checks the status of the server response.
226 If we received a timeout message, we drop it.
227 If it's any other non-continue status, we mark this session as
228 complete and carry on.
229 @return False if there is no data to return to the caller
230 (dropped message, e.g. timeout), True otherwise '''
232 osrfMsgs = osrfJSONToObject(netMsg.body)
233 lastMsg = osrfMsgs.pop()
235 if lastMsg.type() == OSRF_MESSAGE_TYPE_STATUS:
236 code = int(lastMsg.payload().statusCode())
238 if code == OSRF_STATUS_TIMEOUT:
239 osrfLogDebug("removing cached session [%s] and dropping TIMEOUT message" % netMsg.thread)
240 self.cache.delete(netMsg.thread)
243 if code != OSRF_STATUS_CONTINUE:
249 def respondChunk(self, resp):
250 ''' Writes a single multipart-delimited chunk of data '''
252 self.write("Content-type: %s\n\n" % JSON_CONTENT_TYPE)
253 self.write("%s\n\n" % resp.body)
255 self.write("--%s--\n" % self.delim)
257 self.write("--%s\n" % self.delim)
260 def write(self, msg):
261 ''' Writes data to the client stream. '''
264 sys.stderr.write(msg)
266 self.apreq.write(msg)