parsing requests so the translator can return OK and stop waiting on DISCONNECT messa...
[OpenSRF.git] / src / python / osrf / http_translator.py
1 import os, time, md5, random
2 from mod_python import apache, util
3
4 import osrf.cache
5 from osrf.system import osrfConnect
6 from osrf.json import osrfJSONToObject
7 from osrf.conf import osrfConfigValue
8 from osrf.set import osrfSettingsValue
9 from osrf.const import *
10 from osrf.net import *
11 from osrf.log import *
12
13
14 ''' 
15 Proof of concept OpenSRF-HTTP multipart streaming gateway.
16
17 Example Apache mod_python config:
18
19 <Location /osrf-http-translator>
20    SetHandler mod_python
21    PythonPath "['/path/to/osrf-python'] + sys.path"
22    PythonHandler osrf.http_translator
23    PythonOption OSRF_CONFIG /path/to/opensrf_core.xml
24    PythonOption OSRF_CONFIG_CONTEXT gateway
25    # testing only
26    PythonAutoReload On
27 </Location>
28 '''
29
30
31 OSRF_HTTP_HEADER_TO = 'X-OpenSRF-to'
32 OSRF_HTTP_HEADER_XID = 'X-OpenSRF-thread'
33 OSRF_HTTP_HEADER_FROM = 'X-OpenSRF-from'
34 OSRF_HTTP_HEADER_THREAD = 'X-OpenSRF-thread'
35 OSRF_HTTP_HEADER_TIMEOUT = 'X-OpenSRF-timeout'
36 OSRF_HTTP_HEADER_SERVICE = 'X-OpenSRF-service'
37 OSRF_HTTP_HEADER_MULTIPART = 'X-OpenSRF-multipart'
38
39 MULTIPART_CONTENT_TYPE = 'multipart/x-mixed-replace;boundary="%s"'
40 JSON_CONTENT_TYPE = 'text/plain';
41 CACHE_TIME = 300
42
43 ROUTER_NAME = None
44 OSRF_DOMAIN = None
45
46 # If true, all data sent to the client is also written to stderr (apache error log)
47 DEBUG_WRITE = False
48
49 def _dbg(s):
50     ''' testing only '''
51     sys.stderr.write("%s\n\n" % str(s))
52     sys.stderr.flush()
53
54
55 initComplete = False
56 def childInit(req):
57     ''' At time of writing, mod_python doesn't support a childInit handler,
58         so this function is called once per process to initialize 
59         the opensrf connection '''
60
61     global initComplete, ROUTER_NAME, OSRF_DOMAIN
62     if initComplete: 
63         return
64
65     ops = req.get_options()
66     conf = ops['OSRF_CONFIG']
67     ctxt = ops.get('OSRF_CONFIG_CONTEXT') or 'opensrf'
68     osrfConnect(conf, ctxt)
69
70     ROUTER_NAME = osrfConfigValue('router_name')
71     OSRF_DOMAIN = osrfConfigValue('domains.domain')
72     initComplete = True
73
74     servers = osrfSettingsValue('cache.global.servers.server')
75     if not isinstance(servers, list):
76         servers = [servers]
77     osrf.cache.CacheClient.connect(servers)
78
79
80 def handler(req):
81     ''' Create the translator and tell it to process the request. '''
82     childInit(req)
83     return HTTPTranslator(req).process()
84
85 class HTTPTranslator(object):
86     def __init__(self, apreq):
87
88         self.apreq = apreq
89         if apreq.header_only: 
90             return
91
92         try:
93             post = util.parse_qsl(apreq.read(int(apreq.headers_in['Content-length'])))
94             self.body = [d for d in post if d[0] == 'osrf-msg'][0][1]
95         except: 
96             self.body = None
97             return
98
99         self.messages = []
100         self.complete = False
101         self.handle = osrfGetNetworkHandle()
102         self.handle.setRecvCallback(None)
103
104         self.to = apreq.headers_in.get(OSRF_HTTP_HEADER_TO)
105         self.service = apreq.headers_in.get(OSRF_HTTP_HEADER_SERVICE)
106         self.thread = apreq.headers_in.get(OSRF_HTTP_HEADER_THREAD) or "%s%s" % (os.getpid(), time.time())
107         self.timeout = apreq.headers_in.get(OSRF_HTTP_HEADER_TIMEOUT) or 1200
108         self.multipart = str(apreq.headers_in.get(OSRF_HTTP_HEADER_MULTIPART)).lower() == 'true'
109         self.disconnectOnly = False
110
111         # generate a random multipart delimiter
112         m = md5.new()
113         m.update("%f%d%d" % (time.time(), os.getpid(), random.randint(100,10000000)))
114         self.delim = m.hexdigest()
115         self.remoteHost = self.apreq.get_remote_host(apache.REMOTE_NOLOOKUP)
116         self.cache = osrf.cache.CacheClient()
117
118
119     def process(self):
120
121         if self.apreq.header_only: 
122             return apache.OK
123         if not self.body:
124             return apache.HTTP_BAD_REQUEST
125         if not self.setToAddr():
126             return apache.HTTP_BAD_REQUEST
127         if not self.parseRequest():
128             return apache.HTTP_BAD_REQUEST
129
130         while self.handle.recv(0):
131             pass # drop stale messages
132
133
134         netMsg = osrfNetworkMessage(to=self.to, thread=self.thread, body=self.body)
135         self.handle.send(netMsg)
136
137         if self.disconnectOnly:
138             osrfLogDebug("exiting early on DISCONNECT")
139             return apache.OK
140
141         firstWrite = True
142         while not self.complete:
143
144             netMsg = self.handle.recv(self.timeout)
145             if not netMsg: 
146                 return apache.GATEWAY_TIME_OUT
147
148             if not self.checkStatus(netMsg):
149                 continue 
150
151             if firstWrite:
152                 self.initHeaders(netMsg)
153                 firstWrite = False
154
155             if self.multipart:
156                 self.respondChunk(netMsg)
157             else:
158                 self.messages.append(netMsg.body)
159
160                 if self.complete:
161
162                     # condense the sets of arrays into a single array of messages
163                     json = self.messages.pop(0)
164                     while len(self.messages) > 0:
165                         m = self.messages.pop(0)
166                         json = "%s,%s" % (json[0:len(json)-1], m[1:])
167                         
168                     self.write("%s" % json)
169
170
171         return apache.OK
172
173     def parseRequest(self):
174         ''' If this is solely a DISCONNECT message, we set self.disconnectOnly to true
175             @return True if the body parses correctly, False otherwise
176         '''
177         osrfMsgs = osrfJSONToObject(self.body)
178         if not osrfMsgs:
179             return False
180         
181         if len(osrfMsgs) == 1 and osrfMsgs[0].type() == OSRF_MESSAGE_TYPE_DISCONNECT:
182             self.disconnectOnly = True
183
184         return True
185
186
187     def setToAddr(self):
188         ''' Determines the TO address.  Returns false if 
189             the address is missing or ambiguous. 
190             Also returns false if an explicit TO is specified and the
191             thread/IP/TO combination is not found in the session cache
192             '''
193         if self.service:
194             if self.to:
195                 osrfLogWarn("specifying both SERVICE and TO is not allowed")
196                 return False
197             self.to = "%s@%s/%s" % (ROUTER_NAME, OSRF_DOMAIN, self.service)
198             return True
199         else:
200             if self.to:
201                 # If the client specifies a specific TO address, verify it's the same
202                 # address that was cached with the previous request.  
203                 obj = self.cache.get(self.thread)
204                 if obj and obj['ip'] == self.remoteHost and obj['jid'] == self.to:
205                     return True
206         osrfLogWarn("client [%s] attempted to send directly [%s] without a session" % (self.remoteHost, self.to))
207         return False
208
209         
210     def initHeaders(self, netMsg):
211         self.apreq.headers_out[OSRF_HTTP_HEADER_FROM] = netMsg.sender
212         if self.multipart:
213             self.apreq.content_type = MULTIPART_CONTENT_TYPE % self.delim
214             self.write("--%s\n" % self.delim)
215         else:
216             self.apreq.content_type = JSON_CONTENT_TYPE
217         self.cache.put(self.thread, {'ip':self.remoteHost, 'jid': netMsg.sender}, CACHE_TIME)
218
219         osrfLogDebug("caching session [%s] for host [%s] and server drone [%s]" % (
220             self.thread, self.remoteHost, netMsg.sender))
221
222
223
224     def checkStatus(self, netMsg): 
225         ''' Checks the status of the server response. 
226             If we received a timeout message, we drop it.
227             if it's any other non-continue status, we mark this session as
228             complete and carry on.
229             @return False if there is no data to return to the caller 
230             (dropped message, eg. timeout), True otherwise '''
231
232         osrfMsgs = osrfJSONToObject(netMsg.body)
233         lastMsg = osrfMsgs.pop()
234
235         if lastMsg.type() == OSRF_MESSAGE_TYPE_STATUS:
236             code = int(lastMsg.payload().statusCode())
237
238             if code == OSRF_STATUS_TIMEOUT:
239                 osrfLogDebug("removing cached session [%s] and dropping TIMEOUT message" % netMsg.thread)
240                 self.cache.delete(netMsg.thread)
241                 return False 
242
243             if code != OSRF_STATUS_CONTINUE:
244                 self.complete = True
245
246         return True
247
248
249     def respondChunk(self, resp):
250         ''' Writes a single multipart-delimited chunk of data '''
251
252         self.write("Content-type: %s\n\n" % JSON_CONTENT_TYPE)
253         self.write("%s\n\n" % resp.body)
254         if self.complete:
255             self.write("--%s--\n" % self.delim)
256         else:
257             self.write("--%s\n" % self.delim)
258         self.apreq.flush()
259
260     def write(self, msg):
261         ''' Writes data to the client stream. '''
262
263         if DEBUG_WRITE:
264             sys.stderr.write(msg)
265             sys.stderr.flush()
266         self.apreq.write(msg)
267             
268