1 import os, time, md5, random
2 from mod_python import apache, util
5 from osrf.system import osrfConnect
6 from osrf.json import osrfJSONToObject
7 from osrf.conf import osrfConfigValue
8 from osrf.set import osrfSettingsValue
9 from osrf.const import *
10 from osrf.net import *
14 Proof of concept OpenSRF-HTTP multipart streaming gateway.
16 Example Apache mod_python config:
18 <Location /osrf-http-translator>
20 PythonPath "['/path/to/translator-dir'] + sys.path"
21 PythonHandler osrf.http_translator
22 PythonOption OSRF_CONFIG /path/to/opensrf_core.xml
23 PythonOption OSRF_CONFIG_CONTEXT gateway
# Header names exchanged with the HTTP client. TO, SERVICE, THREAD, TIMEOUT
# and MULTIPART are read from the request (apreq.headers_in); FROM is set on
# the response (apreq.headers_out).
30 OSRF_HTTP_HEADER_TO = 'X-OpenSRF-to'
# NOTE(review): XID has the same value as OSRF_HTTP_HEADER_THREAD and is not
# referenced in the visible code -- confirm whether the duplicate is intended.
31 OSRF_HTTP_HEADER_XID = 'X-OpenSRF-thread'
32 OSRF_HTTP_HEADER_FROM = 'X-OpenSRF-from'
33 OSRF_HTTP_HEADER_THREAD = 'X-OpenSRF-thread'
34 OSRF_HTTP_HEADER_TIMEOUT = 'X-OpenSRF-timeout'
35 OSRF_HTTP_HEADER_SERVICE = 'X-OpenSRF-service'
36 OSRF_HTTP_HEADER_MULTIPART = 'X-OpenSRF-multipart'
# Content type of a streamed (multipart) response; %s is replaced with the
# per-request boundary string (HTTPTranslator's self.delim).
38 MULTIPART_CONTENT_TYPE = 'multipart/x-mixed-replace;boundary="%s"'
# Content type of a single combined response and of each multipart chunk.
# (The trailing ';' is a harmless no-op in Python.)
39 JSON_CONTENT_TYPE = 'text/plain';
45 # If true, all data sent to the client is also written to stderr (apache error log)
# Debug helper body: writes str(s) followed by a blank line to stderr.
# NOTE(review): the enclosing def line and any `import sys` are not visible
# in this listing -- confirm sys is imported at module level.
50 sys.stderr.write("%s\n\n" % str(s))
56 ''' At time of writing, mod_python doesn't support a childInit handler,
57 so this function is called once per process to initialize
58 the opensrf connection '''
# Module-level state assigned by this initializer. initComplete is declared
# global here; the lines that test and set it are not visible in this listing.
60 global initComplete, ROUTER_NAME, OSRF_DOMAIN
# Apache PythonOption values: OSRF_CONFIG is indexed directly (no default),
# OSRF_CONFIG_CONTEXT falls back to 'opensrf' when missing or empty.
64 ops = req.get_options()
65 conf = ops['OSRF_CONFIG']
66 ctxt = ops.get('OSRF_CONFIG_CONTEXT') or 'opensrf'
67 osrfConnect(conf, ctxt)
# Router name and domain; combined later into service addresses of the form
# router@domain/service.
69 ROUTER_NAME = osrfConfigValue('router_name')
70 OSRF_DOMAIN = osrfConfigValue('domains.domain')
# Connect the shared cache client. The setting may be a single value; the
# line that presumably wraps it in a list is not visible in this listing.
# NOTE(review): osrf.cache is used here but is not among the visible import
# lines -- confirm it is imported.
73 servers = osrfSettingsValue('cache.global.servers.server')
74 if not isinstance(servers, list):
76 osrf.cache.CacheClient.connect(servers)
80 ''' Create the translator and tell it to process the request. '''
# The Apache status code returned by HTTPTranslator.process() is this
# handler's result.
82 return HTTPTranslator(req).process()
# Relays one HTTP request to OpenSRF. It sends the POSTed 'osrf-msg' value to
# the resolved TO address, then writes the responses back to the client. The
# reply is either one combined JSON body or, in multipart mode, a
# multipart/x-mixed-replace stream.
84 class HTTPTranslator(object):
85 def __init__(self, apreq):
# Read Content-length bytes of the urlencoded body and keep the value of the
# first 'osrf-msg' field. A missing field raises IndexError on this line.
# NOTE(review): any surrounding error handling is not visible in this listing.
92 post = util.parse_qsl(apreq.read(int(apreq.headers_in['Content-length'])))
93 self.body = [d for d in post if d[0] == 'osrf-msg'][0][1]
# Network handle with no receive callback installed; responses are pulled
# with recv() in the processing loop.
100 self.handle = osrfGetNetworkHandle()
101 self.handle.setRecvCallback(None)
# Routing options supplied by the client as request headers.
103 self.to = apreq.headers_in.get(OSRF_HTTP_HEADER_TO)
104 self.service = apreq.headers_in.get(OSRF_HTTP_HEADER_SERVICE)
# Thread id: client-supplied, or generated from the pid and current time.
105 self.thread = apreq.headers_in.get(OSRF_HTTP_HEADER_THREAD) or "%s%s" % (os.getpid(), time.time())
# NOTE(review): header values are strings, so a client-supplied timeout is
# passed to handle.recv() as a str, while the default is the int 1200.
# Consider converting it with int().
106 self.timeout = apreq.headers_in.get(OSRF_HTTP_HEADER_TIMEOUT) or 1200
# Multipart streaming only when the header equals 'true' (case-insensitive).
107 self.multipart = str(apreq.headers_in.get(OSRF_HTTP_HEADER_MULTIPART)).lower() == 'true'
109 # generate a random multipart delimiter
# NOTE(review): `m` is created on a line not visible in this listing
# (presumably an md5 object). The md5 module exists only in Python 2;
# hashlib.md5 is its replacement.
111 m.update("%f%d%d" % (time.time(), os.getpid(), random.randint(100,10000000)))
112 self.delim = m.hexdigest()
# Client IP, taken without a reverse DNS lookup. It is compared with the
# cached 'ip' when an explicit TO address is validated.
113 self.remoteHost = self.apreq.get_remote_host(apache.REMOTE_NOLOOKUP)
114 self.cache = osrf.cache.CacheClient()
# --- Request processing. The enclosing method's def line is not visible in
# this listing; the method returns Apache status codes. ---
# Early exits (some guard lines are not visible). A request whose TO address
# cannot be resolved gets 400 Bad Request.
119 if self.apreq.header_only:
122 return apache.HTTP_BAD_REQUEST
123 if not self.setToAddr():
124 return apache.HTTP_BAD_REQUEST
# recv(0) presumably polls without blocking. Anything already queued on the
# handle is discarded before sending.
126 while self.handle.recv(0):
127 pass # drop stale messages
# Send the client's message to the resolved address on this thread.
129 netMsg = osrfNetworkMessage(to=self.to, thread=self.thread, body=self.body)
130 self.handle.send(netMsg)
# Receive until checkStatus() marks the session complete. The
# GATEWAY_TIME_OUT (HTTP 504) return is presumably taken when recv() yields
# nothing within self.timeout; its guard line is not visible in this listing.
133 while not self.complete:
135 netMsg = self.handle.recv(self.timeout)
137 return apache.GATEWAY_TIME_OUT
# checkStatus() returns False for messages that should be dropped.
139 if not self.checkStatus(netMsg):
# Response headers are set from a received message (presumably only the
# first; that guard is not visible). In multipart mode each message is then
# streamed as its own chunk; otherwise its body is buffered. The branch lines
# are not visible.
143 self.initHeaders(netMsg)
147 self.respondChunk(netMsg)
149 self.messages.append(netMsg.body)
153 # condense the sets of arrays into a single array of messages
# Each buffered body is the text of a JSON array. The loop drops the last
# character of the accumulated text (the closing ']') and the first character
# of the next body (its '['), then joins them with ','.
# NOTE(review): the string is rebuilt on every iteration, which is quadratic
# in the number of messages. pop(0) raises IndexError if nothing was buffered.
154 json = self.messages.pop(0)
155 while len(self.messages) > 0:
156 m = self.messages.pop(0)
157 json = "%s,%s" % (json[0:len(json)-1], m[1:])
159 self.write("%s" % json)
# --- TO-address resolution. This is presumably the body of setToAddr(),
# which is called above; its def line is not visible in this listing. ---
165 ''' Determines the TO address. Returns false if
166 the address is missing or ambiguous. '''
169 # specifying both a SERVICE and a TO is not allowed
# A SERVICE header resolves to the router address router@domain/service.
171 self.to = "%s@%s/%s" % (ROUTER_NAME, OSRF_DOMAIN, self.service)
175 # If the client specifies a specific TO address, verify it's the same
176 # address that was cached with the previous request.
# The per-thread cache entry is written by initHeaders(). An explicit TO is
# accepted only when both the client IP and the JID match that entry.
177 obj = self.cache.get(self.thread)
178 if obj and obj['ip'] == self.remoteHost and obj['jid'] == self.to:
183 def initHeaders(self, netMsg):
# Expose the responder's JID to the client in the X-OpenSRF-from header.
184 self.apreq.headers_out[OSRF_HTTP_HEADER_FROM] = netMsg.sender
# Multipart: boundary-qualified content type plus the opening boundary line.
# (The multipart/else branch lines are not visible in this listing.)
186 self.apreq.content_type = MULTIPART_CONTENT_TYPE % self.delim
187 self.write("--%s\n" % self.delim)
# Otherwise the response is plain JSON text.
189 self.apreq.content_type = JSON_CONTENT_TYPE
# Cache (client IP, responder JID) for this thread for CACHE_TIME. A follow-up
# request from the same client can then target that JID via the TO header.
# NOTE(review): CACHE_TIME is not defined in the visible lines.
190 self.cache.put(self.thread, {'ip':self.remoteHost, 'jid': netMsg.sender}, CACHE_TIME)
194 def checkStatus(self, netMsg):
195 ''' Checks the status of the server response.
196 If we received a timeout message, we drop it.
197 if it's any other non-continue status, we mark this session as
198 complete and carry on.
199 @return False if there is no data to return to the caller
200 (dropped message, eg. timeout), True otherwise '''
# Only the last OpenSRF message in the decoded batch is examined.
# NOTE(review): pop() raises IndexError if the decoded list is empty.
202 osrfMsgs = osrfJSONToObject(netMsg.body)
203 lastMsg = osrfMsgs.pop()
205 if lastMsg.type() == OSRF_MESSAGE_TYPE_STATUS:
206 code = int(lastMsg.payload().statusCode())
208 if code == OSRF_STATUS_TIMEOUT:
209 # remove any existing thread cache for this session and drop the message
210 self.cache.delete(netMsg.thread)
# Any other non-CONTINUE status marks the session complete. The assignment
# and return lines are not visible in this listing.
213 if code != OSRF_STATUS_CONTINUE:
219 def respondChunk(self, resp):
220 ''' Writes a single multipart-delimited chunk of data '''
# A chunk is its own Content-type line, the message body, then a boundary.
# The boundary is either the closing '--delim--' or the separator '--delim',
# presumably chosen by self.complete (the branch lines are not visible).
222 self.write("Content-type: %s\n\n" % JSON_CONTENT_TYPE)
223 self.write("%s\n\n" % resp.body)
225 self.write("--%s--\n" % self.delim)
227 self.write("--%s\n" % self.delim)
230 def write(self, msg):
231 ''' Writes data to the client stream. '''
# Debug copy to stderr. It is presumably guarded by the module's debug flag;
# the guard line is not visible in this listing.
234 sys.stderr.write(msg)
236 self.apreq.write(msg)