No longer globally setting the network recv_callback handler to None
[OpenSRF.git] / src / python / osrf / http_translator.py
1 import sys, os, time, md5, random
2 from mod_python import apache, util
3 import osrf.system, osrf.cache, osrf.json, osrf.conf, osrf.net, osrf.log
4 from osrf.const import OSRF_MESSAGE_TYPE_DISCONNECT, OSRF_MESSAGE_TYPE_CONNECT, \
5     OSRF_STATUS_CONTINUE, OSRF_STATUS_TIMEOUT, OSRF_MESSAGE_TYPE_STATUS
6
7
8 ''' 
9 Proof of concept OpenSRF-HTTP multipart streaming gateway.
10
11 Example Apache mod_python config:
12
13 <Location /osrf-http-translator>
14    SetHandler mod_python
15    PythonPath "['/path/to/osrf-python'] + sys.path"
16    PythonHandler osrf.http_translator
17    PythonOption OSRF_CONFIG /path/to/opensrf_core.xml
18    PythonOption OSRF_CONFIG_CONTEXT config.gateway
19    PythonOption OSRF_CACHE_SERVERS 127.0.0.1:11211
20    # testing only
21    PythonAutoReload On
22 </Location>
23 '''
24
25 OSRF_HTTP_HEADER_TO = 'X-OpenSRF-to'
26 OSRF_HTTP_HEADER_XID = 'X-OpenSRF-xid'
27 OSRF_HTTP_HEADER_FROM = 'X-OpenSRF-from'
28 OSRF_HTTP_HEADER_THREAD = 'X-OpenSRF-thread'
29 OSRF_HTTP_HEADER_TIMEOUT = 'X-OpenSRF-timeout'
30 OSRF_HTTP_HEADER_SERVICE = 'X-OpenSRF-service'
31 OSRF_HTTP_HEADER_MULTIPART = 'X-OpenSRF-multipart'
32 MULTIPART_CONTENT_TYPE = 'multipart/x-mixed-replace;boundary="%s"'
33 JSON_CONTENT_TYPE = 'text/plain'
34 CACHE_TIME = 300
35
36 ROUTER_NAME = None
37 OSRF_DOMAIN = None
38
39 # If DEBUG_WRITE = True, all data sent to the client is also written
40 # to stderr (apache error log)
41 DEBUG_WRITE = False
42
43 def _dbg(msg):
44     ''' testing only '''
45     sys.stderr.write("%s\n\n" % str(msg))
46     sys.stderr.flush()
47
48
49 INIT_COMPLETE = False
50 def child_init(req):
51     ''' At time of writing, mod_python doesn't support a child_init handler,
52         so this function is called once per process to initialize 
53         the opensrf connection '''
54
55     global INIT_COMPLETE, ROUTER_NAME, OSRF_DOMAIN
56     if INIT_COMPLETE: 
57         return
58
59     # Apache complains with: UnboundLocalError: local variable 'osrf' referenced before assignment
60     # if the following import line is removed, even though its also at the top of the file...
61     import osrf.system 
62
63     ops = req.get_options()
64     conf = ops['OSRF_CONFIG']
65     ctxt = ops.get('OSRF_CONFIG_CONTEXT') or 'opensrf'
66     osrf.system.System.net_connect(config_file=conf, config_context=ctxt)
67
68     ROUTER_NAME = osrf.conf.get('router_name')
69     OSRF_DOMAIN = osrf.conf.get('domain')
70     INIT_COMPLETE = True
71
72     servers = ops.get('OSRF_CACHE_SERVERS')
73     if servers:
74         servers = servers.split(',')
75     else:
76         # no cache servers configured, see if we can talk to the settings server
77         import osrf.set
78         servers = osrf.set.get('cache.global.servers.server')
79         if not isinstance(servers, list):
80             servers = [servers]
81
82     osrf.cache.CacheClient.connect(servers)
83
84
85 def handler(req):
86     ''' Create the translator and tell it to process the request. '''
87     child_init(req)
88
89     # capture the callback handle, clear it, then reset 
90     # it after we've handled the request
91     handle = osrf.net.get_network_handle()
92     callback = handle.receive_callback
93     handle.set_receive_callback(None)
94
95     try:
96         translator = HTTPTranslator(req)
97         status = translator.process()
98         osrf.log.log_debug("translator call resulted in status %d" % int(status))
99         if translator.local_xid:
100             osrf.log.clear_xid()
101     finally:
102         handle.receive_callback = callback
103         
104     return status
105
106 class HTTPTranslator(object):
107     def __init__(self, apreq):
108
109         self.apreq = apreq
110
111         if OSRF_HTTP_HEADER_XID in apreq.headers_in:
112             osrf.log.log_debug('read XID from client %s' % apreq.headers_in.get(OSRF_HTTP_HEADER_XID))
113             osrf.log.set_xid(apreq.headers_in.get(OSRF_HTTP_HEADER_XID))
114             self.local_xid = False
115         else:
116             osrf.log.make_xid()
117             osrf.log.log_debug('created new XID %s' % osrf.log.get_xid())
118             self.local_xid = True
119
120         if apreq.header_only: 
121             return
122
123         for k,v in apreq.headers_in.iteritems():
124             osrf.log.log_internal('HEADER: %s = %s' % (k, v))
125
126         try:
127             #post = util.parse_qsl(apreq.read(int(apreq.headers_in['Content-length'])))
128             post = util.parse_qsl(apreq.read())
129             osrf.log.log_debug('post = ' + str(post))
130             self.body = [d for d in post if d[0] == 'osrf-msg'][0][1]
131             osrf.log.log_debug(self.body)
132         except Exception, e: 
133             osrf.log.log_warn("error parsing osrf message: %s" % unicode(e))
134             self.body = None
135             return
136
137         self.messages = []
138         self.complete = False
139         self.handle = osrf.net.get_network_handle()
140
141         self.recipient = apreq.headers_in.get(OSRF_HTTP_HEADER_TO)
142         self.service = apreq.headers_in.get(OSRF_HTTP_HEADER_SERVICE)
143         self.thread = apreq.headers_in.get(OSRF_HTTP_HEADER_THREAD) or \
144             "%s%s" % (os.getpid(), time.time())
145         self.timeout = apreq.headers_in.get(OSRF_HTTP_HEADER_TIMEOUT) or 1200
146         self.multipart = str(
147             apreq.headers_in.get(OSRF_HTTP_HEADER_MULTIPART)).lower() == 'true'
148         self.connect_only = False
149         self.disconnect_only = False
150
151         # generate a random multipart delimiter
152         mpart = md5.new()
153         mpart.update("%f%d%d" % (time.time(), os.getpid(), \
154             random.randint(100, 10000000)))
155         self.delim = mpart.hexdigest()
156         self.remote_host = self.apreq.get_remote_host(apache.REMOTE_NOLOOKUP)
157         self.cache = osrf.cache.CacheClient()
158
159
160
161     def process(self):
162
163         if self.apreq.header_only: 
164             return apache.OK
165         if not self.body:
166             return apache.HTTP_BAD_REQUEST
167         if not self.set_to_addr():
168             return apache.HTTP_BAD_REQUEST
169         if not self.parse_request():
170             return apache.HTTP_BAD_REQUEST
171
172         while self.handle.recv(0):
173             pass # drop stale messages
174
175
176         net_msg = osrf.net.NetworkMessage(
177             recipient=self.recipient, thread=self.thread, body=self.body)
178         self.handle.send(net_msg)
179
180         if self.disconnect_only:
181             osrf.log.log_debug("exiting early on DISCONNECT")
182             return apache.OK
183
184         first_write = True
185         while not self.complete:
186
187             net_msg = None
188             try:
189                 net_msg = self.handle.recv(self.timeout)
190             except osrf.net.XMPPNoRecipient:
191                 return apache.HTTP_NOT_FOUND 
192
193             if not net_msg: 
194                 return apache.GATEWAY_TIME_OUT
195
196             if not self.check_status(net_msg):
197                 continue 
198
199             if first_write:
200                 self.init_headers(net_msg)
201                 first_write = False
202
203             if self.multipart:
204                 self.respond_chunk(net_msg)
205                 if self.connect_only:
206                     break
207             else:
208                 self.messages.append(net_msg.body)
209
210                 # condense the sets of arrays into a single array of messages
211                 if self.complete or self.connect_only:
212                     json = self.messages.pop(0)
213                     while len(self.messages) > 0:
214                         msg = self.messages.pop(0)
215                         json = "%s,%s" % (json[0:len(json)-1], msg[1:])
216                         
217                     self.write("%s" % json)
218
219
220         return apache.OK
221
222     def parse_request(self):
223         '''
224         If this is solely a DISCONNECT message, we set self.disconnect_only
225         to true
226         @return True if the body parses correctly, False otherwise
227         '''
228         osrf_msgs = osrf.json.to_object(self.body)
229         if not osrf_msgs:
230             return False
231         
232         if len(osrf_msgs) == 1:
233             if osrf_msgs[0].type() == OSRF_MESSAGE_TYPE_CONNECT:
234                 self.connect_only = True
235             elif osrf_msgs[0].type() == OSRF_MESSAGE_TYPE_DISCONNECT:
236                 self.disconnect_only = True
237
238         return True
239
240
241     def set_to_addr(self):
242         ''' Determines the TO address.  Returns false if 
243             the address is missing or ambiguous. 
244             Also returns false if an explicit TO is specified and the
245             thread/IP/TO combination is not found in the session cache
246             '''
247         if self.service:
248             if self.recipient:
249                 osrf.log.log_warn("specifying both SERVICE and TO is not allowed")
250                 return False
251             self.recipient = "%s@%s/%s" % \
252                 (ROUTER_NAME, OSRF_DOMAIN, self.service)
253             osrf.log.log_debug("set service to %s" % self.recipient)
254             return True
255         else:
256             if self.recipient:
257                 # If the client specifies a specific TO address, verify it's
258                 # the same address that was cached with the previous request.  
259                 obj = self.cache.get(self.thread)
260                 if obj and obj['ip'] == self.remote_host and \
261                     obj['jid'] == self.recipient:
262                     return True
263         osrf.log.log_warn("client [%s] attempted to send directly "
264             "[%s] without a session" % (self.remote_host, self.recipient))
265         return False
266
267         
268     def init_headers(self, net_msg):
269         osrf.log.log_debug("initializing request headers")
270         self.apreq.headers_out[OSRF_HTTP_HEADER_FROM] = net_msg.sender
271         self.apreq.headers_out[OSRF_HTTP_HEADER_THREAD] = self.thread
272         if self.multipart:
273             self.apreq.content_type = MULTIPART_CONTENT_TYPE % self.delim
274             self.write("--%s\n" % self.delim)
275         else:
276             self.apreq.content_type = JSON_CONTENT_TYPE
277         self.cache.put(self.thread, \
278             {'ip':self.remote_host, 'jid': net_msg.sender}, CACHE_TIME)
279
280         osrf.log.log_debug("caching session [%s] for host [%s] and server "
281             " drone [%s]" % (self.thread, self.remote_host, net_msg.sender))
282
283
284
285     def check_status(self, net_msg): 
286         ''' Checks the status of the server response. 
287             If we received a timeout message, we drop it.
288             if it's any other non-continue status, we mark this session as
289             complete and carry on.
290             @return False if there is no data to return to the caller 
291             (dropped message, eg. timeout), True otherwise '''
292
293         osrf.log.log_debug('checking status...')
294         osrf_msgs = osrf.json.to_object(net_msg.body)
295         last_msg = osrf_msgs.pop()
296
297         if last_msg.type() == OSRF_MESSAGE_TYPE_STATUS:
298             code = int(last_msg.payload().statusCode())
299             osrf.log.log_debug("got a status message with code %d" % code)
300
301             if code == OSRF_STATUS_TIMEOUT:
302                 osrf.log.log_debug("removing cached session [%s] and "
303                     "dropping TIMEOUT message" % net_msg.thread)
304                 self.cache.delete(net_msg.thread)
305                 return False 
306
307             if code != OSRF_STATUS_CONTINUE:
308                 self.complete = True
309
310         return True
311
312
313     def respond_chunk(self, resp):
314         ''' Writes a single multipart-delimited chunk of data '''
315
316         self.write("Content-type: %s\n\n" % JSON_CONTENT_TYPE)
317         self.write("%s\n\n" % resp.body)
318         if self.complete:
319             self.write("--%s--\n" % self.delim)
320         else:
321             self.write("--%s\n" % self.delim)
322         self.apreq.flush()
323
324     def write(self, msg):
325         ''' Writes data to the client stream. '''
326
327         if DEBUG_WRITE:
328             sys.stderr.write(msg)
329             sys.stderr.flush()
330         self.apreq.write(msg)
331             
332