1 import sys, os, time, md5, random
2 from mod_python import apache, util
3 import osrf.system, osrf.cache, osrf.json, osrf.conf, osrf.net, osrf.log
4 from osrf.const import OSRF_MESSAGE_TYPE_DISCONNECT, OSRF_MESSAGE_TYPE_CONNECT, \
5 OSRF_STATUS_CONTINUE, OSRF_STATUS_TIMEOUT, OSRF_MESSAGE_TYPE_STATUS
9 Proof of concept OpenSRF-HTTP multipart streaming gateway.
11 Example Apache mod_python config:
13 <Location /osrf-http-translator>
15 PythonPath "['/path/to/osrf-python'] + sys.path"
16 PythonHandler osrf.http_translator
17 PythonOption OSRF_CONFIG /path/to/opensrf_core.xml
18 PythonOption OSRF_CONFIG_CONTEXT config.gateway
19 PythonOption OSRF_CACHE_SERVERS 127.0.0.1:11211
# HTTP header names used by the translator. TO, XID, THREAD, TIMEOUT,
# SERVICE and MULTIPART are read from the incoming request headers; FROM and
# THREAD are set on the outgoing response headers.
25 OSRF_HTTP_HEADER_TO = 'X-OpenSRF-to'
26 OSRF_HTTP_HEADER_XID = 'X-OpenSRF-xid'
27 OSRF_HTTP_HEADER_FROM = 'X-OpenSRF-from'
28 OSRF_HTTP_HEADER_THREAD = 'X-OpenSRF-thread'
29 OSRF_HTTP_HEADER_TIMEOUT = 'X-OpenSRF-timeout'
30 OSRF_HTTP_HEADER_SERVICE = 'X-OpenSRF-service'
31 OSRF_HTTP_HEADER_MULTIPART = 'X-OpenSRF-multipart'
# Response content types. The multipart template is filled in with the
# per-request boundary string. JSON_CONTENT_TYPE labels each multipart chunk
# and is also used as a whole-response content type.
32 MULTIPART_CONTENT_TYPE = 'multipart/x-mixed-replace;boundary="%s"'
33 JSON_CONTENT_TYPE = 'text/plain'
# NOTE(review): the embedded listing numbers skip from 33 to 39 here, and
# CACHE_TIME, ROUTER_NAME, OSRF_DOMAIN, INIT_COMPLETE and DEBUG_WRITE are used
# later without a visible assignment. Presumably they are defined on the
# dropped lines -- confirm against the complete file.
39 # If DEBUG_WRITE = True, all data sent to the client is also written
40 # to stderr (apache error log)
# NOTE(review): the DEBUG_WRITE assignment itself is not visible (listing
# numbers skip 40 -> 45).
# NOTE(review): the statement below uses `msg`, which has no visible
# module-level definition. It presumably is the body of a small debug helper
# whose `def` line was dropped -- confirm against the complete file.
45 sys.stderr.write("%s\n\n" % str(msg))
# NOTE(review): the `def` line of this routine is not visible (listing numbers
# skip 45 -> 51). The body reads `req`, so it presumably takes the mod_python
# request as its argument -- confirm against the complete file.
51 ''' At time of writing, mod_python doesn't support a child_init handler,
52 so this function is called once per process to initialize
53 the opensrf connection '''
# These module-level names are declared global so the routine can rebind them.
55 global INIT_COMPLETE, ROUTER_NAME, OSRF_DOMAIN
# NOTE(review): listing lines 56-58 are missing; presumably an early return
# when INIT_COMPLETE is already set -- confirm.
59 # Apache complains with: UnboundLocalError: local variable 'osrf' referenced before assignment
60 # if the following import line is removed, even though it's also at the top of the file...
# NOTE(review): the import the comment above refers to (listing line 61) is
# not visible.
# Handler options come from the PythonOption directives shown in the module
# docstring. OSRF_CONFIG is read by subscript with no fallback; the config
# context falls back to 'opensrf' when unset or empty.
63 ops = req.get_options()
64 conf = ops['OSRF_CONFIG']
65 ctxt = ops.get('OSRF_CONFIG_CONTEXT') or 'opensrf'
# Connect to the OpenSRF network using that config file and context.
66 osrf.system.System.net_connect(config_file=conf, config_context=ctxt)
# Remember the router name and domain from the loaded config; set_to_addr()
# combines them with the service name to build the router address.
68 ROUTER_NAME = osrf.conf.get('router_name')
69 OSRF_DOMAIN = osrf.conf.get('domain')
# Cache servers: taken from the OSRF_CACHE_SERVERS option (comma-separated)
# when configured, otherwise looked up via osrf.set.
72 servers = ops.get('OSRF_CACHE_SERVERS')
# NOTE(review): the `if`/`else` lines framing the two branches below (listing
# lines 73 and 75) are not visible.
74 servers = servers.split(',')
76 # no cache servers configured, see if we can talk to the settings server
# NOTE(review): osrf.set is not among the visible top-of-file imports; listing
# line 77 (missing) presumably imports it -- confirm.
78 servers = osrf.set.get('cache.global.servers.server')
# NOTE(review): the body of this check (listing line 80) is missing;
# presumably it wraps a single value into a list -- confirm.
79 if not isinstance(servers, list):
# Connect the shared cache client to the resolved server list.
82 osrf.cache.CacheClient.connect(servers)
# NOTE(review): nothing visible sets INIT_COMPLETE; presumably done on missing
# listing line 83 -- confirm.
# NOTE(review): the `def` line of this routine is not visible (listing numbers
# skip 82 -> 86). It uses `req` and computes a status, so it is presumably the
# mod_python request handler entry point -- confirm against the complete file.
86 ''' Create the translator and tell it to process the request. '''
# NOTE(review): listing lines 87-88 are missing; presumably the one-time
# per-process initialisation is triggered here -- confirm.
89 # capture the callback handle, clear it, then reset
90 # it after we've handled the request
91 handle = osrf.net.get_network_handle()
92 callback = handle.receive_callback
93 handle.set_receive_callback(None)
# Build a translator for this request and run it; `status` is the value
# returned by process().
96 translator = HTTPTranslator(req)
97 status = translator.process()
98 osrf.log.log_debug("translator call resulted in status %d" % int(status))
# local_xid is True when the translator generated the XID itself rather than
# reading it from the client. NOTE(review): the body of this `if` (listing
# lines 100-101) is missing; presumably it clears the XID -- confirm.
99 if translator.local_xid:
# Restore the callback captured above. NOTE(review): the callback is cleared
# through set_receive_callback() but restored by direct attribute assignment;
# confirm the two are equivalent for the network handle.
102 handle.receive_callback = callback
# NOTE(review): no `return status` is visible (listing lines 103-105 missing).
# Handles a single HTTP request: reads the OpenSRF message and routing headers
# from the request, sends the message over the OpenSRF network handle, and
# writes the response(s) back to the HTTP client.
106 class HTTPTranslator(object):
# Constructor: captures everything process() needs from the mod_python request
# `apreq` -- the log XID, the posted 'osrf-msg' body, the routing headers, and
# a random multipart boundary.
# NOTE(review): many listing lines inside this method are missing (108-110,
# 115-116, 121-122, 125-126, 132, 134-137, 140, 150, 152, 158-162). In
# particular the assignments of self.apreq and self.messages, which other
# methods read, are not visible -- confirm against the complete file.
107 def __init__(self, apreq):
# Reuse the client's XID when the request supplies one ...
111 if OSRF_HTTP_HEADER_XID in apreq.headers_in:
112 osrf.log.log_debug('read XID from client %s' % apreq.headers_in.get(OSRF_HTTP_HEADER_XID))
113 osrf.log.set_xid(apreq.headers_in.get(OSRF_HTTP_HEADER_XID))
114 self.local_xid = False
# ... otherwise a new XID is logged and local_xid is set to True.
# NOTE(review): the `else:` line and the call that creates the XID are not
# visible.
117 osrf.log.log_debug('created new XID %s' % osrf.log.get_xid())
118 self.local_xid = True
# NOTE(review): the body of this header_only check (listing line 121) is
# missing; presumably an early return -- confirm.
120 if apreq.header_only:
# Log every incoming request header at the internal log level.
123 for k,v in apreq.headers_in.iteritems():
124 osrf.log.log_internal('HEADER: %s = %s' % (k, v))
127 #post = util.parse_qsl(apreq.read(int(apreq.headers_in['Content-length'])))
# Parse the request body as a query string and keep the value of the first
# 'osrf-msg' pair as the message body. The [0] index raises IndexError when no
# such pair exists.
128 post = util.parse_qsl(apreq.read())
129 osrf.log.log_debug('post = ' + str(post))
130 self.body = [d for d in post if d[0] == 'osrf-msg'][0][1]
131 osrf.log.log_debug(self.body)
# NOTE(review): `e` is not bound in the visible text; the enclosing
# try/except lines (listing 126 and 132) appear to have been dropped.
133 osrf.log.log_warn("error parsing osrf message: %s" % unicode(e))
# Per-request state and routing information.
138 self.complete = False
139 self.handle = osrf.net.get_network_handle()
141 self.recipient = apreq.headers_in.get(OSRF_HTTP_HEADER_TO)
142 self.service = apreq.headers_in.get(OSRF_HTTP_HEADER_SERVICE)
# The thread header when supplied, else a string built from pid + current time.
143 self.thread = apreq.headers_in.get(OSRF_HTTP_HEADER_THREAD) or \
144 "%s%s" % (os.getpid(), time.time())
# NOTE(review): a header-supplied timeout is stored as-is (not converted to a
# number), while the fallback is the int 1200; the value is later passed to
# self.handle.recv(). Confirm recv() copes with a string timeout.
145 self.timeout = apreq.headers_in.get(OSRF_HTTP_HEADER_TIMEOUT) or 1200
# True only when the multipart header's value lower-cases to 'true'; any other
# value stringifies to something else and yields False.
146 self.multipart = str(
147 apreq.headers_in.get(OSRF_HTTP_HEADER_MULTIPART)).lower() == 'true'
148 self.connect_only = False
149 self.disconnect_only = False
151 # generate a random multipart delimiter
# NOTE(review): the creation of `mpart` (listing line 152) is not visible;
# given the top-of-file `md5` import it is presumably an md5 hash object.
153 mpart.update("%f%d%d" % (time.time(), os.getpid(), \
154 random.randint(100, 10000000)))
155 self.delim = mpart.hexdigest()
# Client address without a DNS lookup; used with the thread as the session
# identity in the cache.
156 self.remote_host = self.apreq.get_remote_host(apache.REMOTE_NOLOOKUP)
157 self.cache = osrf.cache.CacheClient()
# NOTE(review): the `def` line and any docstring of this method are not
# visible (listing numbers skip 157 -> 163). It is presumably the process()
# method called on the translator by the request handler -- confirm. Several
# interior lines are also missing, so conditions and returns marked below are
# incomplete.
# NOTE(review): the body of this header_only check and the condition guarding
# the first HTTP_BAD_REQUEST return (listing lines 164-165) are not visible.
163 if self.apreq.header_only:
166 return apache.HTTP_BAD_REQUEST
# Reject the request when the recipient cannot be resolved/validated or the
# message body cannot be parsed.
167 if not self.set_to_addr():
168 return apache.HTTP_BAD_REQUEST
169 if not self.parse_request():
170 return apache.HTTP_BAD_REQUEST
# Non-blocking receive loop: discard anything already queued on the handle.
172 while self.handle.recv(0):
173 pass # drop stale messages
# Forward the client's message to the resolved recipient on this thread.
176 net_msg = osrf.net.NetworkMessage(
177 recipient=self.recipient, thread=self.thread, body=self.body)
178 self.handle.send(net_msg)
# A lone DISCONNECT needs no response. NOTE(review): the return that follows
# the log line (listing line 182) is not visible.
180 if self.disconnect_only:
181 osrf.log.log_debug("exiting early on DISCONNECT")
# Main receive loop: runs until check_status() marks the session complete or
# one of the returns below fires.
185 while not self.complete:
# NOTE(review): the `try:` line (listing 186-188) is not visible.
189 net_msg = self.handle.recv(self.timeout)
190 except osrf.net.XMPPNoRecipient:
191 return apache.HTTP_NOT_FOUND
# NOTE(review): the condition guarding this timeout return (listing lines
# 192-193) is missing; presumably recv() returned nothing -- confirm.
194 return apache.GATEWAY_TIME_OUT
# check_status() returns False for messages that carry nothing to relay.
# NOTE(review): the body of this `if` (listing line 197) is missing;
# presumably `continue` -- confirm.
196 if not self.check_status(net_msg):
# NOTE(review): the condition guarding init_headers() (listing lines 198-199)
# is missing; presumably it runs only for the first response -- confirm.
200 self.init_headers(net_msg)
# NOTE(review): the branch lines around the next statements (listing 201-203,
# 206-207) are missing. Presumably multipart requests stream each chunk
# immediately while other requests accumulate bodies in self.messages.
204 self.respond_chunk(net_msg)
205 if self.connect_only:
208 self.messages.append(net_msg.body)
210 # condense the sets of arrays into a single array of messages
# Merge step: drop the last character of the accumulated string and the first
# character of the next body, then join them with a comma. Given the comment
# above, the bodies are presumably JSON array strings, so this splices the
# arrays together.
# NOTE(review): repeated list.pop(0) plus string re-formatting is quadratic in
# the number/size of messages; fine for small responses.
211 if self.complete or self.connect_only:
212 json = self.messages.pop(0)
213 while len(self.messages) > 0:
214 msg = self.messages.pop(0)
215 json = "%s,%s" % (json[0:len(json)-1], msg[1:])
# Send the merged array to the client.
217 self.write("%s" % json)
# NOTE(review): the final return of the method (listing lines 218-221) is not
# visible.
# Parses self.body into OpenSRF message objects and flags requests that
# consist of exactly one CONNECT or exactly one DISCONNECT message.
# NOTE(review): the docstring delimiters and several lines (listing 223,
# 225, 227, 229-231, 237-240), including the return statements and any error
# handling around the parse, are not visible -- confirm against the complete
# file.
222 def parse_request(self):
224 If this is solely a DISCONNECT message, we set self.disconnect_only
226 @return True if the body parses correctly, False otherwise
228 osrf_msgs = osrf.json.to_object(self.body)
# Only a request containing a single message can be connect-only or
# disconnect-only.
232 if len(osrf_msgs) == 1:
233 if osrf_msgs[0].type() == OSRF_MESSAGE_TYPE_CONNECT:
234 self.connect_only = True
235 elif osrf_msgs[0].type() == OSRF_MESSAGE_TYPE_DISCONNECT:
236 self.disconnect_only = True
# Resolves self.recipient from the SERVICE or TO header and validates a
# client-supplied TO against the cached session.
# NOTE(review): the closing docstring delimiter, the `if`/`else` lines that
# choose between the branches below, and all return statements (listing lines
# 246-248, 250, 254-256, 262, 265-267) are not visible -- confirm against the
# complete file.
241 def set_to_addr(self):
242 ''' Determines the TO address. Returns false if
243 the address is missing or ambiguous.
244 Also returns false if an explicit TO is specified and the
245 thread/IP/TO combination is not found in the session cache
249 osrf.log.log_warn("specifying both SERVICE and TO is not allowed")
# Service request: address the router as <router name>@<domain>/<service>.
251 self.recipient = "%s@%s/%s" % \
252 (ROUTER_NAME, OSRF_DOMAIN, self.service)
253 osrf.log.log_debug("set service to %s" % self.recipient)
257 # If the client specifies a specific TO address, verify it's
258 # the same address that was cached with the previous request.
# The session entry is keyed by thread and holds the client IP and the remote
# JID that init_headers() stored.
259 obj = self.cache.get(self.thread)
260 if obj and obj['ip'] == self.remote_host and \
261 obj['jid'] == self.recipient:
# No matching cached session for this thread/IP/TO combination.
263 osrf.log.log_warn("client [%s] attempted to send directly "
264 "[%s] without a session" % (self.remote_host, self.recipient))
# Sets the response headers and content type from the response message
# `net_msg`, and caches the session so a later request can address the same
# sender directly.
# NOTE(review): the `if`/`else` lines selecting between the multipart and
# plain content types (listing lines 272 and 275) are not visible; presumably
# keyed on self.multipart -- confirm.
268 def init_headers(self, net_msg):
269 osrf.log.log_debug("initializing request headers")
# Tell the client which backend answered and which thread to reuse.
270 self.apreq.headers_out[OSRF_HTTP_HEADER_FROM] = net_msg.sender
271 self.apreq.headers_out[OSRF_HTTP_HEADER_THREAD] = self.thread
# Multipart branch: advertise the boundary and emit the opening delimiter.
273 self.apreq.content_type = MULTIPART_CONTENT_TYPE % self.delim
274 self.write("--%s\n" % self.delim)
# Non-multipart branch: plain content type, no delimiter.
276 self.apreq.content_type = JSON_CONTENT_TYPE
# Remember the client IP and the responding JID under the thread key for
# CACHE_TIME; set_to_addr() checks these on later requests.
277 self.cache.put(self.thread, \
278 {'ip':self.remote_host, 'jid': net_msg.sender}, CACHE_TIME)
# NOTE(review): the two adjacent string literals below yield a double space
# ("server  drone") in the log message.
280 osrf.log.log_debug("caching session [%s] for host [%s] and server "
281 " drone [%s]" % (self.thread, self.remote_host, net_msg.sender))
# Inspects the last OpenSRF message in a response to decide whether the
# response should be relayed and whether the session is finished.
# NOTE(review): the bodies and returns of the branches below (listing lines
# 305-306, 308-312) are not visible. Per the docstring, TIMEOUT presumably
# returns False and any other non-CONTINUE status presumably sets
# self.complete -- confirm against the complete file.
285 def check_status(self, net_msg):
286 ''' Checks the status of the server response.
287 If we received a timeout message, we drop it.
288 if it's any other non-continue status, we mark this session as
289 complete and carry on.
290 @return False if there is no data to return to the caller
291 (dropped message, eg. timeout), True otherwise '''
293 osrf.log.log_debug('checking status...')
# Only the final message of the batch is examined.
# NOTE(review): pop() raises IndexError on an empty list; confirm a response
# body can never parse to an empty array.
294 osrf_msgs = osrf.json.to_object(net_msg.body)
295 last_msg = osrf_msgs.pop()
297 if last_msg.type() == OSRF_MESSAGE_TYPE_STATUS:
298 code = int(last_msg.payload().statusCode())
299 osrf.log.log_debug("got a status message with code %d" % code)
# On TIMEOUT, the cached session for this thread is removed.
301 if code == OSRF_STATUS_TIMEOUT:
302 osrf.log.log_debug("removing cached session [%s] and "
303 "dropping TIMEOUT message" % net_msg.thread)
304 self.cache.delete(net_msg.thread)
# Any status other than CONTINUE; the body of this `if` is not visible.
307 if code != OSRF_STATUS_CONTINUE:
# Writes one multipart part: a Content-type header, the response body, then a
# boundary line.
# NOTE(review): the lines choosing between the two boundary writes (listing
# lines 318 and 320) are not visible. The "--<delim>--" form is the closing
# boundary, so it is presumably written when self.complete is set and the
# plain "--<delim>" form otherwise -- confirm. Anything on listing lines
# 322-323 is also missing.
313 def respond_chunk(self, resp):
314 ''' Writes a single multipart-delimited chunk of data '''
# Part header, blank line, then the part body.
316 self.write("Content-type: %s\n\n" % JSON_CONTENT_TYPE)
317 self.write("%s\n\n" % resp.body)
# Closing boundary (no further parts follow).
319 self.write("--%s--\n" % self.delim)
# Separator boundary (another part follows).
321 self.write("--%s\n" % self.delim)
# Single choke point for output: every piece of response data goes through
# here on its way to the mod_python request object.
324 def write(self, msg):
325 ''' Writes data to the client stream. '''
# NOTE(review): the guard on this stderr copy (listing lines 326-327) is not
# visible; presumably `if DEBUG_WRITE:` -- confirm. Anything on listing line
# 329 is also missing.
328 sys.stderr.write(msg)
# Send the data to the HTTP client.
330 self.apreq.write(msg)