1 import os, time, md5, random
2 from mod_python import apache, util
10 from osrf.const import OSRF_MESSAGE_TYPE_DISCONNECT, OSRF_STATUS_CONTINUE, \
11 OSRF_STATUS_TIMEOUT, OSRF_MESSAGE_TYPE_STATUS
17 Proof of concept OpenSRF-HTTP multipart streaming gateway.
19 Example Apache mod_python config:
21 <Location /osrf-http-translator>
23 PythonPath "['/path/to/osrf-python'] + sys.path"
24 PythonHandler osrf.http_translator
25 PythonOption OSRF_CONFIG /path/to/opensrf_core.xml
26 PythonOption OSRF_CONFIG_CONTEXT gateway
# Names of the HTTP headers read from / written to the Apache request.
OSRF_HTTP_HEADER_TO = 'X-OpenSRF-to'
# The XID header must not share a name with the thread header, otherwise
# the two values cannot be told apart on the wire.
# NOTE(review): 'X-OpenSRF-xid' follows the OpenSRF translator header
# naming -- confirm against the C translator before relying on it.
OSRF_HTTP_HEADER_XID = 'X-OpenSRF-xid'
OSRF_HTTP_HEADER_FROM = 'X-OpenSRF-from'
OSRF_HTTP_HEADER_THREAD = 'X-OpenSRF-thread'
OSRF_HTTP_HEADER_TIMEOUT = 'X-OpenSRF-timeout'
OSRF_HTTP_HEADER_SERVICE = 'X-OpenSRF-service'
OSRF_HTTP_HEADER_MULTIPART = 'X-OpenSRF-multipart'

# Response content types; the multipart template takes the per-request
# boundary string as its single format argument.
MULTIPART_CONTENT_TYPE = 'multipart/x-mixed-replace;boundary="%s"'
JSON_CONTENT_TYPE = 'text/plain'
48 # If DEBUG_WRITE = True, all data sent to the client is also written
49 # to stderr (apache error log)
54 sys.stderr.write("%s\n\n" % str(msg))
60 ''' At time of writing, mod_python doesn't support a child_init handler,
61 so this function is called once per process to initialize
62 the opensrf connection '''
64 global INIT_COMPLETE, ROUTER_NAME, OSRF_DOMAIN
68 ops = req.get_options()
69 conf = ops['OSRF_CONFIG']
70 ctxt = ops.get('OSRF_CONFIG_CONTEXT') or 'opensrf'
71 osrf.system.connect(conf, ctxt)
73 ROUTER_NAME = osrf.conf.get('router_name')
74 OSRF_DOMAIN = osrf.conf.get('domains.domain')
77 servers = osrf.set.get('cache.global.servers.server')
78 if not isinstance(servers, list):
80 osrf.cache.CacheClient.connect(servers)
84 ''' Create the translator and tell it to process the request. '''
86 return HTTPTranslator(req).process()
88 class HTTPTranslator(object):
89 def __init__(self, apreq):
96 post = util.parse_qsl(apreq.read(int(apreq.headers_in['Content-length'])))
97 self.body = [d for d in post if d[0] == 'osrf-msg'][0][1]
103 self.complete = False
104 self.handle = osrf.net.get_network_handle()
105 self.handle.set_receive_callback(None)
107 self.recipient = apreq.headers_in.get(OSRF_HTTP_HEADER_TO)
108 self.service = apreq.headers_in.get(OSRF_HTTP_HEADER_SERVICE)
109 self.thread = apreq.headers_in.get(OSRF_HTTP_HEADER_THREAD) or \
110 "%s%s" % (os.getpid(), time.time())
111 self.timeout = apreq.headers_in.get(OSRF_HTTP_HEADER_TIMEOUT) or 1200
112 self.multipart = str( \
113 apreq.headers_in.get(OSRF_HTTP_HEADER_MULTIPART)).lower() == 'true'
114 self.disconnect_only = False
116 # generate a random multipart delimiter
118 mpart.update("%f%d%d" % (time.time(), os.getpid(), \
119 random.randint(100, 10000000)))
120 self.delim = mpart.hexdigest()
121 self.remote_host = self.apreq.get_remote_host(apache.REMOTE_NOLOOKUP)
122 self.cache = osrf.cache.CacheClient()
127 if self.apreq.header_only:
130 return apache.HTTP_BAD_REQUEST
131 if not self.set_to_addr():
132 return apache.HTTP_BAD_REQUEST
133 if not self.parse_request():
134 return apache.HTTP_BAD_REQUEST
136 while self.handle.recv(0):
137 pass # drop stale messages
140 net_msg = NetworkMessage(recipient=self.recipient, thread=self.thread, \
141 body=self.body, locale=self.locale)
142 self.handle.send(net_msg)
144 if self.disconnect_only:
145 osrf.log.log_debug("exiting early on DISCONNECT")
149 while not self.complete:
151 net_msg = self.handle.recv(self.timeout)
153 return apache.GATEWAY_TIME_OUT
155 if not self.check_status(net_msg):
159 self.init_headers(net_msg)
163 self.respond_chunk(net_msg)
165 self.messages.append(net_msg.body)
167 # condense the sets of arrays into a single array of messages
169 json = self.messages.pop(0)
170 while len(self.messages) > 0:
171 msg = self.messages.pop(0)
172 json = "%s,%s" % (json[0:len(json)-1], msg[1:])
174 self.write("%s" % json)
def parse_request(self):
    ''' Parses the request body (self.body) into a list of OpenSRF messages.
        If this is solely a DISCONNECT message, we set self.disconnect_only

        @return True if the body parses correctly, False otherwise '''

    osrf_msgs = osrf.json.to_object(self.body)

    # NOTE(review): an empty/None parse result is treated as "did not
    # parse"; this also protects the osrf_msgs[0] access below.  Confirm
    # this matches the intended failure condition.
    if not osrf_msgs:
        return False

    if len(osrf_msgs) == 1 and \
            osrf_msgs[0].type() == OSRF_MESSAGE_TYPE_DISCONNECT:
        self.disconnect_only = True

    return True
def set_to_addr(self):
    ''' Determines the TO address.  Returns false if
        the address is missing or ambiguous.
        Also returns false if an explicit TO is specified and the
        thread/IP/TO combination is not found in the session cache

        @return True if self.recipient holds an address we may send to,
        False otherwise '''

    if self.service:
        # A service name and an explicit recipient together are ambiguous.
        if self.recipient:
            osrf.log.log_warn("specifying both SERVICE and TO is not allowed")
            return False

        # Service requests are addressed to the router for that service.
        self.recipient = "%s@%s/%s" % \
            (ROUTER_NAME, OSRF_DOMAIN, self.service)
        return True

    if self.recipient:
        # If the client specifies a specific TO address, verify it's
        # the same address that was cached with the previous request.
        obj = self.cache.get(self.thread)
        if obj and obj['ip'] == self.remote_host and \
                obj['jid'] == self.recipient:
            return True

    # Either no address was supplied at all, or the explicit address does
    # not match a cached session for this thread and client host.
    osrf.log.log_warn("client [%s] attempted to send directly "
        "[%s] without a session" % (self.remote_host, self.recipient))
    return False
def init_headers(self, net_msg):
    ''' Sets the response headers and caches the session.

        Records the sender of net_msg in the FROM response header, picks
        the content type (multipart or plain), and stores the
        thread -> client host / sender mapping in the session cache.

        @param net_msg The network message received from the server '''

    self.apreq.headers_out[OSRF_HTTP_HEADER_FROM] = net_msg.sender

    if self.multipart:
        # A multipart response opens with the first boundary delimiter.
        self.apreq.content_type = MULTIPART_CONTENT_TYPE % self.delim
        self.write("--%s\n" % self.delim)
    else:
        self.apreq.content_type = JSON_CONTENT_TYPE

    # Remember which server address this thread/host pair is talking to.
    self.cache.put(self.thread, \
        {'ip':self.remote_host, 'jid': net_msg.sender}, CACHE_TIME)

    osrf.log.log_debug("caching session [%s] for host [%s] and server "
        " drone [%s]" % (self.thread, self.remote_host, net_msg.sender))
def check_status(self, net_msg):
    ''' Checks the status of the server response.
        If we received a timeout message, we drop it.
        if it's any other non-continue status, we mark this session as
        complete and carry on.

        @param net_msg The network message received from the server
        @return False if there is no data to return to the caller
        (dropped message, eg. timeout), True otherwise '''

    osrf_msgs = osrf.json.to_object(net_msg.body)

    # Only the final message in the batch is inspected for a status.
    last_msg = osrf_msgs.pop()

    if last_msg.type() == OSRF_MESSAGE_TYPE_STATUS:
        code = int(last_msg.payload().statusCode())

        if code == OSRF_STATUS_TIMEOUT:
            osrf.log.log_debug("removing cached session [%s] and "
                "dropping TIMEOUT message" % net_msg.thread)
            self.cache.delete(net_msg.thread)
            return False

        # Any status other than CONTINUE ends the session.
        if code != OSRF_STATUS_CONTINUE:
            self.complete = True

    return True
def respond_chunk(self, resp):
    ''' Writes a single multipart-delimited chunk of data.

        The chunk is terminated with the closing delimiter once the
        session is complete, or with a plain delimiter otherwise.

        @param resp The network message whose body is sent to the client '''

    self.write("Content-type: %s\n\n" % JSON_CONTENT_TYPE)
    self.write("%s\n\n" % resp.body)

    # Exactly one delimiter follows each chunk: "--delim--" closes the
    # multipart stream, "--delim" introduces the next part.
    if self.complete:
        self.write("--%s--\n" % self.delim)
    else:
        self.write("--%s\n" % self.delim)
def write(self, msg):
    ''' Writes data to the client stream.

        If the module-level DEBUG_WRITE flag is true, the data is also
        written to stderr (apache error log).

        @param msg The string to send to the client '''

    # Mirror to stderr only when debugging is switched on; an undefined
    # flag is treated as "off" rather than raising NameError.
    if globals().get('DEBUG_WRITE'):
        import sys  # local import keeps this method self-contained
        sys.stderr.write(msg)

    self.apreq.write(msg)