557c422d4199c38c744e93d8a61f950aa3af00e2
[OpenSRF.git] / src / python / osrf / http_translator.py
1 import os, time, md5, random
2 from mod_python import apache, util
3
4 import osrf.cache
5 from osrf.system import osrfConnect
6 from osrf.json import osrfJSONToObject
7 from osrf.conf import osrfConfigValue
8 from osrf.set import osrfSettingsValue
9 from osrf.const import *
10 from osrf.net import *
11
12
13 ''' 
14 Proof of concept OpenSRF-HTTP multipart streaming gateway.
15
16 Example Apache mod_python config:
17
18 <Location /osrf-http-translator>
19    SetHandler mod_python
20    PythonPath "['/path/to/translator-dir'] + sys.path"
21    PythonHandler osrf.http_translator
22    PythonOption OSRF_CONFIG /path/to/opensrf_core.xml
23    PythonOption OSRF_CONFIG_CONTEXT gateway
24    # testing only
25    PythonAutoReload On
26 </Location>
27 '''
28
29
30 OSRF_HTTP_HEADER_TO = 'X-OpenSRF-to'
31 OSRF_HTTP_HEADER_XID = 'X-OpenSRF-thread'
32 OSRF_HTTP_HEADER_FROM = 'X-OpenSRF-from'
33 OSRF_HTTP_HEADER_THREAD = 'X-OpenSRF-thread'
34 OSRF_HTTP_HEADER_TIMEOUT = 'X-OpenSRF-timeout'
35 OSRF_HTTP_HEADER_SERVICE = 'X-OpenSRF-service'
36 OSRF_HTTP_HEADER_MULTIPART = 'X-OpenSRF-multipart'
37
38 MULTIPART_CONTENT_TYPE = 'multipart/x-mixed-replace;boundary="%s"'
39 JSON_CONTENT_TYPE = 'text/plain';
40 CACHE_TIME = 300
41
42 ROUTER_NAME = None
43 OSRF_DOMAIN = None
44
45 # If true, all data sent to the client is also written to stderr (apache error log)
46 DEBUG_WRITE = False
47
48 def _dbg(s):
49     ''' testing only '''
50     sys.stderr.write("%s\n\n" % str(s))
51     sys.stderr.flush()
52
53
54 initComplete = False
55 def childInit(req):
56     ''' At time of writing, mod_python doesn't support a childInit handler,
57         so this function is called once per process to initialize 
58         the opensrf connection '''
59
60     global initComplete, ROUTER_NAME, OSRF_DOMAIN
61     if initComplete: 
62         return
63
64     ops = req.get_options()
65     conf = ops['OSRF_CONFIG']
66     ctxt = ops.get('OSRF_CONFIG_CONTEXT') or 'opensrf'
67     osrfConnect(conf, ctxt)
68
69     ROUTER_NAME = osrfConfigValue('router_name')
70     OSRF_DOMAIN = osrfConfigValue('domains.domain')
71     initComplete = True
72
73     servers = osrfSettingsValue('cache.global.servers.server')
74     if not isinstance(servers, list):
75         servers = [servers]
76     osrf.cache.CacheClient.connect(servers)
77
78
79 def handler(req):
80     ''' Create the translator and tell it to process the request. '''
81     childInit(req)
82     return HTTPTranslator(req).process()
83
84 class HTTPTranslator(object):
85     def __init__(self, apreq):
86
87         self.apreq = apreq
88         if apreq.header_only: 
89             return
90
91         try:
92             post = util.parse_qsl(apreq.read(int(apreq.headers_in['Content-length'])))
93             self.body = [d for d in post if d[0] == 'osrf-msg'][0][1]
94         except: 
95             self.body = None
96             return
97
98         self.messages = []
99         self.complete = False
100         self.handle = osrfGetNetworkHandle()
101         self.handle.setRecvCallback(None)
102
103         self.to = apreq.headers_in.get(OSRF_HTTP_HEADER_TO)
104         self.service = apreq.headers_in.get(OSRF_HTTP_HEADER_SERVICE)
105         self.thread = apreq.headers_in.get(OSRF_HTTP_HEADER_THREAD) or "%s%s" % (os.getpid(), time.time())
106         self.timeout = apreq.headers_in.get(OSRF_HTTP_HEADER_TIMEOUT) or 1200
107         self.multipart = str(apreq.headers_in.get(OSRF_HTTP_HEADER_MULTIPART)).lower() == 'true'
108
109         # generate a random multipart delimiter
110         m = md5.new()
111         m.update("%f%d%d" % (time.time(), os.getpid(), random.randint(100,10000000)))
112         self.delim = m.hexdigest()
113         self.remoteHost = self.apreq.get_remote_host(apache.REMOTE_NOLOOKUP)
114         self.cache = osrf.cache.CacheClient()
115
116
117     def process(self):
118
119         if self.apreq.header_only: 
120             return apache.OK
121         if not self.body:
122             return apache.HTTP_BAD_REQUEST
123         if not self.setToAddr():
124             return apache.HTTP_BAD_REQUEST
125
126         while self.handle.recv(0):
127             pass # drop stale messages
128
129         netMsg = osrfNetworkMessage(to=self.to, thread=self.thread, body=self.body)
130         self.handle.send(netMsg)
131
132         firstWrite = True
133         while not self.complete:
134
135             netMsg = self.handle.recv(self.timeout)
136             if not netMsg: 
137                 return apache.GATEWAY_TIME_OUT
138
139             if not self.checkStatus(netMsg):
140                 continue 
141
142             if firstWrite:
143                 self.initHeaders(netMsg)
144                 firstWrite = False
145
146             if self.multipart:
147                 self.respondChunk(netMsg)
148             else:
149                 self.messages.append(netMsg.body)
150
151                 if self.complete:
152
153                     # condense the sets of arrays into a single array of messages
154                     json = self.messages.pop(0)
155                     while len(self.messages) > 0:
156                         m = self.messages.pop(0)
157                         json = "%s,%s" % (json[0:len(json)-1], m[1:])
158                         
159                     self.write("%s" % json)
160
161
162         return apache.OK
163
164     def setToAddr(self):
165         ''' Determines the TO address.  Returns false if 
166             the address is missing or ambiguous. '''
167         if self.service:
168             if self.to:
169                 # specifying both a SERVICE and a TO is not allowed
170                 return False
171             self.to = "%s@%s/%s" % (ROUTER_NAME, OSRF_DOMAIN, self.service)
172             return True
173         else:
174             if self.to:
175                 # If the client specifies a specific TO address, verify it's the same
176                 # address that was cached with the previous request.  
177                 obj = self.cache.get(self.thread)
178                 if obj and obj['ip'] == self.remoteHost and obj['jid'] == self.to:
179                     return True
180         return False
181
182         
183     def initHeaders(self, netMsg):
184         self.apreq.headers_out[OSRF_HTTP_HEADER_FROM] = netMsg.sender
185         if self.multipart:
186             self.apreq.content_type = MULTIPART_CONTENT_TYPE % self.delim
187             self.write("--%s\n" % self.delim)
188         else:
189             self.apreq.content_type = JSON_CONTENT_TYPE
190         self.cache.put(self.thread, {'ip':self.remoteHost, 'jid': netMsg.sender}, CACHE_TIME)
191
192
193
194     def checkStatus(self, netMsg): 
195         ''' Checks the status of the server response. 
196             If we received a timeout message, we drop it.
197             if it's any other non-continue status, we mark this session as
198             complete and carry on.
199             @return False if there is no data to return to the caller 
200             (dropped message, eg. timeout), True otherwise '''
201
202         osrfMsgs = osrfJSONToObject(netMsg.body)
203         lastMsg = osrfMsgs.pop()
204
205         if lastMsg.type() == OSRF_MESSAGE_TYPE_STATUS:
206             code = int(lastMsg.payload().statusCode())
207
208             if code == OSRF_STATUS_TIMEOUT:
209                 # remove any existing thread cache for this session and drop the message
210                 self.cache.delete(netMsg.thread)
211                 return False 
212
213             if code != OSRF_STATUS_CONTINUE:
214                 self.complete = True
215
216         return True
217
218
219     def respondChunk(self, resp):
220         ''' Writes a single multipart-delimited chunk of data '''
221
222         self.write("Content-type: %s\n\n" % JSON_CONTENT_TYPE)
223         self.write("%s\n\n" % resp.body)
224         if self.complete:
225             self.write("--%s--\n" % self.delim)
226         else:
227             self.write("--%s\n" % self.delim)
228         self.apreq.flush()
229
230     def write(self, msg):
231         ''' Writes data to the client stream. '''
232
233         if DEBUG_WRITE:
234             sys.stderr.write(msg)
235             sys.stderr.flush()
236         self.apreq.write(msg)
237             
238