Continue the march towards a pedantic 1.0 python API.
[OpenSRF.git] / src / python / osrf / http_translator.py
1 import os, time, md5, random
2 from mod_python import apache, util
3
4 import osrf.cache
5 import osrf.system
6 import osrf.json
7 import osrf.conf
8 import osrf.set
9 import sys
10 from osrf.const import OSRF_MESSAGE_TYPE_DISCONNECT, OSRF_STATUS_CONTINUE, \
11     OSRF_STATUS_TIMEOUT, OSRF_MESSAGE_TYPE_STATUS,
12 import osrf.net
13 import osrf.log
14
15
16 ''' 
17 Proof of concept OpenSRF-HTTP multipart streaming gateway.
18
19 Example Apache mod_python config:
20
21 <Location /osrf-http-translator>
22    SetHandler mod_python
23    PythonPath "['/path/to/osrf-python'] + sys.path"
24    PythonHandler osrf.http_translator
25    PythonOption OSRF_CONFIG /path/to/opensrf_core.xml
26    PythonOption OSRF_CONFIG_CONTEXT gateway
27    # testing only
28    PythonAutoReload On
29 </Location>
30 '''
31
32
33 OSRF_HTTP_HEADER_TO = 'X-OpenSRF-to'
34 OSRF_HTTP_HEADER_XID = 'X-OpenSRF-thread'
35 OSRF_HTTP_HEADER_FROM = 'X-OpenSRF-from'
36 OSRF_HTTP_HEADER_THREAD = 'X-OpenSRF-thread'
37 OSRF_HTTP_HEADER_TIMEOUT = 'X-OpenSRF-timeout'
38 OSRF_HTTP_HEADER_SERVICE = 'X-OpenSRF-service'
39 OSRF_HTTP_HEADER_MULTIPART = 'X-OpenSRF-multipart'
40
41 MULTIPART_CONTENT_TYPE = 'multipart/x-mixed-replace;boundary="%s"'
42 JSON_CONTENT_TYPE = 'text/plain'
43 CACHE_TIME = 300
44
45 ROUTER_NAME = None
46 OSRF_DOMAIN = None
47
48 # If DEBUG_WRITE = True, all data sent to the client is also written
49 # to stderr (apache error log)
50 DEBUG_WRITE = False
51
52 def _dbg(msg):
53     ''' testing only '''
54     sys.stderr.write("%s\n\n" % str(msg))
55     sys.stderr.flush()
56
57
58 INIT_COMPLETE = False
59 def child_init(req):
60     ''' At time of writing, mod_python doesn't support a child_init handler,
61         so this function is called once per process to initialize 
62         the opensrf connection '''
63
64     global INIT_COMPLETE, ROUTER_NAME, OSRF_DOMAIN
65     if INIT_COMPLETE: 
66         return
67
68     ops = req.get_options()
69     conf = ops['OSRF_CONFIG']
70     ctxt = ops.get('OSRF_CONFIG_CONTEXT') or 'opensrf'
71     osrf.system.connect(conf, ctxt)
72
73     ROUTER_NAME = osrf.conf.get('router_name')
74     OSRF_DOMAIN = osrf.conf.get('domains.domain')
75     INIT_COMPLETE = True
76
77     servers = osrf.set.get('cache.global.servers.server')
78     if not isinstance(servers, list):
79         servers = [servers]
80     osrf.cache.CacheClient.connect(servers)
81
82
83 def handler(req):
84     ''' Create the translator and tell it to process the request. '''
85     child_init(req)
86     return HTTPTranslator(req).process()
87
88 class HTTPTranslator(object):
89     def __init__(self, apreq):
90
91         self.apreq = apreq
92         if apreq.header_only: 
93             return
94
95         try:
96             post = util.parse_qsl(apreq.read(int(apreq.headers_in['Content-length'])))
97             self.body = [d for d in post if d[0] == 'osrf-msg'][0][1]
98         except: 
99             self.body = None
100             return
101
102         self.messages = []
103         self.complete = False
104         self.handle = osrf.net.get_network_handle()
105         self.handle.set_receive_callback(None)
106
107         self.recipient = apreq.headers_in.get(OSRF_HTTP_HEADER_TO)
108         self.service = apreq.headers_in.get(OSRF_HTTP_HEADER_SERVICE)
109         self.thread = apreq.headers_in.get(OSRF_HTTP_HEADER_THREAD) or \
110             "%s%s" % (os.getpid(), time.time())
111         self.timeout = apreq.headers_in.get(OSRF_HTTP_HEADER_TIMEOUT) or 1200
112         self.multipart = str( \
113             apreq.headers_in.get(OSRF_HTTP_HEADER_MULTIPART)).lower() == 'true'
114         self.disconnect_only = False
115
116         # generate a random multipart delimiter
117         mpart = md5.new()
118         mpart.update("%f%d%d" % (time.time(), os.getpid(), \
119             random.randint(100, 10000000)))
120         self.delim = mpart.hexdigest()
121         self.remote_host = self.apreq.get_remote_host(apache.REMOTE_NOLOOKUP)
122         self.cache = osrf.cache.CacheClient()
123
124
125     def process(self):
126
127         if self.apreq.header_only: 
128             return apache.OK
129         if not self.body:
130             return apache.HTTP_BAD_REQUEST
131         if not self.set_to_addr():
132             return apache.HTTP_BAD_REQUEST
133         if not self.parse_request():
134             return apache.HTTP_BAD_REQUEST
135
136         while self.handle.recv(0):
137             pass # drop stale messages
138
139
140         net_msg = NetworkMessage(recipient=self.recipient, thread=self.thread, \
141             body=self.body, locale=self.locale)
142         self.handle.send(net_msg)
143
144         if self.disconnect_only:
145             osrf.log.log_debug("exiting early on DISCONNECT")
146             return apache.OK
147
148         first_write = True
149         while not self.complete:
150
151             net_msg = self.handle.recv(self.timeout)
152             if not net_msg: 
153                 return apache.GATEWAY_TIME_OUT
154
155             if not self.check_status(net_msg):
156                 continue 
157
158             if first_write:
159                 self.init_headers(net_msg)
160                 first_write = False
161
162             if self.multipart:
163                 self.respond_chunk(net_msg)
164             else:
165                 self.messages.append(net_msg.body)
166
167                 # condense the sets of arrays into a single array of messages
168                 if self.complete:
169                     json = self.messages.pop(0)
170                     while len(self.messages) > 0:
171                         msg = self.messages.pop(0)
172                         json = "%s,%s" % (json[0:len(json)-1], msg[1:])
173                         
174                     self.write("%s" % json)
175
176
177         return apache.OK
178
179     def parse_request(self):
180         '''
181         If this is solely a DISCONNECT message, we set self.disconnect_only
182         to true
183         @return True if the body parses correctly, False otherwise
184         '''
185         osrf_msgs = osrf.json.to_object(self.body)
186         if not osrf_msgs:
187             return False
188         
189         if len(osrf_msgs) == 1 and \
190             osrf_msgs[0].type() == OSRF_MESSAGE_TYPE_DISCONNECT:
191             self.disconnect_only = True
192
193         return True
194
195
196     def set_to_addr(self):
197         ''' Determines the TO address.  Returns false if 
198             the address is missing or ambiguous. 
199             Also returns false if an explicit TO is specified and the
200             thread/IP/TO combination is not found in the session cache
201             '''
202         if self.service:
203             if self.recipient:
204                 osrf.log.log_warn("specifying both SERVICE and TO is not allowed")
205                 return False
206             self.recipient = "%s@%s/%s" % \
207                 (ROUTER_NAME, OSRF_DOMAIN, self.service)
208             return True
209         else:
210             if self.recipient:
211                 # If the client specifies a specific TO address, verify it's
212                 # the same address that was cached with the previous request.  
213                 obj = self.cache.get(self.thread)
214                 if obj and obj['ip'] == self.remote_host and \
215                     obj['jid'] == self.recipient:
216                     return True
217         osrf.log.log_warn("client [%s] attempted to send directly "
218             "[%s] without a session" % (self.remote_host, self.recipient))
219         return False
220
221         
222     def init_headers(self, net_msg):
223         self.apreq.headers_out[OSRF_HTTP_HEADER_FROM] = net_msg.sender
224         if self.multipart:
225             self.apreq.content_type = MULTIPART_CONTENT_TYPE % self.delim
226             self.write("--%s\n" % self.delim)
227         else:
228             self.apreq.content_type = JSON_CONTENT_TYPE
229         self.cache.put(self.thread, \
230             {'ip':self.remote_host, 'jid': net_msg.sender}, CACHE_TIME)
231
232         osrf.log.log_debug("caching session [%s] for host [%s] and server "
233             " drone [%s]" % (self.thread, self.remote_host, net_msg.sender))
234
235
236
237     def check_status(self, net_msg): 
238         ''' Checks the status of the server response. 
239             If we received a timeout message, we drop it.
240             if it's any other non-continue status, we mark this session as
241             complete and carry on.
242             @return False if there is no data to return to the caller 
243             (dropped message, eg. timeout), True otherwise '''
244
245         osrf_msgs = osrf.json.to_object(net_msg.body)
246         last_msg = osrf_msgs.pop()
247
248         if last_msg.type() == OSRF_MESSAGE_TYPE_STATUS:
249             code = int(last_msg.payload().statusCode())
250
251             if code == OSRF_STATUS_TIMEOUT:
252                 osrf.log.log_debug("removing cached session [%s] and "
253                     "dropping TIMEOUT message" % net_msg.thread)
254                 self.cache.delete(net_msg.thread)
255                 return False 
256
257             if code != OSRF_STATUS_CONTINUE:
258                 self.complete = True
259
260         return True
261
262
263     def respond_chunk(self, resp):
264         ''' Writes a single multipart-delimited chunk of data '''
265
266         self.write("Content-type: %s\n\n" % JSON_CONTENT_TYPE)
267         self.write("%s\n\n" % resp.body)
268         if self.complete:
269             self.write("--%s--\n" % self.delim)
270         else:
271             self.write("--%s\n" % self.delim)
272         self.apreq.flush()
273
274     def write(self, msg):
275         ''' Writes data to the client stream. '''
276
277         if DEBUG_WRITE:
278             sys.stderr.write(msg)
279             sys.stderr.flush()
280         self.apreq.write(msg)
281             
282