]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/python/osrf/http_translator.py
b532f3fa9efdc7b03a85639e4cc001bff2221bc4
[OpenSRF.git] / src / python / osrf / http_translator.py
1 import os, time, md5, random
2 from mod_python import apache, util
3
4 import osrf.cache
5 import osrf.system
6 import osrf.json
7 import osrf.conf
8 import osrf.set
9 import sys
10 from osrf.const import *
11 from osrf.net import get_network_handle
12 import osrf.log
13
14
15 ''' 
16 Proof of concept OpenSRF-HTTP multipart streaming gateway.
17
18 Example Apache mod_python config:
19
20 <Location /osrf-http-translator>
21    SetHandler mod_python
22    PythonPath "['/path/to/osrf-python'] + sys.path"
23    PythonHandler osrf.http_translator
24    PythonOption OSRF_CONFIG /path/to/opensrf_core.xml
25    PythonOption OSRF_CONFIG_CONTEXT gateway
26    # testing only
27    PythonAutoReload On
28 </Location>
29 '''
30
31
32 OSRF_HTTP_HEADER_TO = 'X-OpenSRF-to'
33 OSRF_HTTP_HEADER_XID = 'X-OpenSRF-thread'
34 OSRF_HTTP_HEADER_FROM = 'X-OpenSRF-from'
35 OSRF_HTTP_HEADER_THREAD = 'X-OpenSRF-thread'
36 OSRF_HTTP_HEADER_TIMEOUT = 'X-OpenSRF-timeout'
37 OSRF_HTTP_HEADER_SERVICE = 'X-OpenSRF-service'
38 OSRF_HTTP_HEADER_MULTIPART = 'X-OpenSRF-multipart'
39
40 MULTIPART_CONTENT_TYPE = 'multipart/x-mixed-replace;boundary="%s"'
41 JSON_CONTENT_TYPE = 'text/plain'
42 CACHE_TIME = 300
43
44 ROUTER_NAME = None
45 OSRF_DOMAIN = None
46
47 # If DEBUG_WRITE = True, all data sent to the client is also written
48 # to stderr (apache error log)
49 DEBUG_WRITE = False
50
51 def _dbg(msg):
52     ''' testing only '''
53     sys.stderr.write("%s\n\n" % str(msg))
54     sys.stderr.flush()
55
56
57 INIT_COMPLETE = False
58 def child_init(req):
59     ''' At time of writing, mod_python doesn't support a child_init handler,
60         so this function is called once per process to initialize 
61         the opensrf connection '''
62
63     global INIT_COMPLETE, ROUTER_NAME, OSRF_DOMAIN
64     if INIT_COMPLETE: 
65         return
66
67     ops = req.get_options()
68     conf = ops['OSRF_CONFIG']
69     ctxt = ops.get('OSRF_CONFIG_CONTEXT') or 'opensrf'
70     osrf.system.connect(conf, ctxt)
71
72     ROUTER_NAME = osrf.conf.get('router_name')
73     OSRF_DOMAIN = osrf.conf.get('domains.domain')
74     INIT_COMPLETE = True
75
76     servers = osrf.set.get('cache.global.servers.server')
77     if not isinstance(servers, list):
78         servers = [servers]
79     osrf.cache.CacheClient.connect(servers)
80
81
82 def handler(req):
83     ''' Create the translator and tell it to process the request. '''
84     child_init(req)
85     return HTTPTranslator(req).process()
86
87 class HTTPTranslator(object):
88     def __init__(self, apreq):
89
90         self.apreq = apreq
91         if apreq.header_only: 
92             return
93
94         try:
95             post = util.parse_qsl(apreq.read(int(apreq.headers_in['Content-length'])))
96             self.body = [d for d in post if d[0] == 'osrf-msg'][0][1]
97         except: 
98             self.body = None
99             return
100
101         self.messages = []
102         self.complete = False
103         self.handle = osrf.net.get_network_handle()
104         self.handle.set_receive_callback(None)
105
106         self.recipient = apreq.headers_in.get(OSRF_HTTP_HEADER_TO)
107         self.service = apreq.headers_in.get(OSRF_HTTP_HEADER_SERVICE)
108         self.thread = apreq.headers_in.get(OSRF_HTTP_HEADER_THREAD) or \
109             "%s%s" % (os.getpid(), time.time())
110         self.timeout = apreq.headers_in.get(OSRF_HTTP_HEADER_TIMEOUT) or 1200
111         self.multipart = str( \
112             apreq.headers_in.get(OSRF_HTTP_HEADER_MULTIPART)).lower() == 'true'
113         self.disconnect_only = False
114
115         # generate a random multipart delimiter
116         mpart = md5.new()
117         mpart.update("%f%d%d" % (time.time(), os.getpid(), \
118             random.randint(100, 10000000)))
119         self.delim = mpart.hexdigest()
120         self.remote_host = self.apreq.get_remote_host(apache.REMOTE_NOLOOKUP)
121         self.cache = osrf.cache.CacheClient()
122
123
124     def process(self):
125
126         if self.apreq.header_only: 
127             return apache.OK
128         if not self.body:
129             return apache.HTTP_BAD_REQUEST
130         if not self.set_to_addr():
131             return apache.HTTP_BAD_REQUEST
132         if not self.parse_request():
133             return apache.HTTP_BAD_REQUEST
134
135         while self.handle.recv(0):
136             pass # drop stale messages
137
138
139         net_msg = NetworkMessage(recipient=self.recipient, thread=self.thread, \
140             body=self.body)
141         self.handle.send(net_msg)
142
143         if self.disconnect_only:
144             osrf.log.logDebug("exiting early on DISCONNECT")
145             return apache.OK
146
147         first_write = True
148         while not self.complete:
149
150             net_msg = self.handle.recv(self.timeout)
151             if not net_msg: 
152                 return apache.GATEWAY_TIME_OUT
153
154             if not self.check_status(net_msg):
155                 continue 
156
157             if first_write:
158                 self.init_headers(net_msg)
159                 first_write = False
160
161             if self.multipart:
162                 self.respond_chunk(net_msg)
163             else:
164                 self.messages.append(net_msg.body)
165
166                 # condense the sets of arrays into a single array of messages
167                 if self.complete:
168                     json = self.messages.pop(0)
169                     while len(self.messages) > 0:
170                         msg = self.messages.pop(0)
171                         json = "%s,%s" % (json[0:len(json)-1], msg[1:])
172                         
173                     self.write("%s" % json)
174
175
176         return apache.OK
177
178     def parse_request(self):
179         '''
180         If this is solely a DISCONNECT message, we set self.disconnect_only
181         to true
182         @return True if the body parses correctly, False otherwise
183         '''
184         osrf_msgs = osrf.json.to_object(self.body)
185         if not osrf_msgs:
186             return False
187         
188         if len(osrf_msgs) == 1 and \
189             osrf_msgs[0].type() == OSRF_MESSAGE_TYPE_DISCONNECT:
190             self.disconnect_only = True
191
192         return True
193
194
195     def set_to_addr(self):
196         ''' Determines the TO address.  Returns false if 
197             the address is missing or ambiguous. 
198             Also returns false if an explicit TO is specified and the
199             thread/IP/TO combination is not found in the session cache
200             '''
201         if self.service:
202             if self.recipient:
203                 osrf.log.osrfLogWarn("specifying both SERVICE and TO is not allowed")
204                 return False
205             self.recipient = "%s@%s/%s" % \
206                 (ROUTER_NAME, OSRF_DOMAIN, self.service)
207             return True
208         else:
209             if self.recipient:
210                 # If the client specifies a specific TO address, verify it's
211                 # the same address that was cached with the previous request.  
212                 obj = self.cache.get(self.thread)
213                 if obj and obj['ip'] == self.remote_host and \
214                     obj['jid'] == self.recipient:
215                     return True
216         osrf.log.osrfLogWarn("client [%s] attempted to send directly "
217             "[%s] without a session" % (self.remote_host, self.recipient))
218         return False
219
220         
221     def init_headers(self, net_msg):
222         self.apreq.headers_out[OSRF_HTTP_HEADER_FROM] = net_msg.sender
223         if self.multipart:
224             self.apreq.content_type = MULTIPART_CONTENT_TYPE % self.delim
225             self.write("--%s\n" % self.delim)
226         else:
227             self.apreq.content_type = JSON_CONTENT_TYPE
228         self.cache.put(self.thread, \
229             {'ip':self.remote_host, 'jid': net_msg.sender}, CACHE_TIME)
230
231         osrf.log.logDebug("caching session [%s] for host [%s] and server "
232             " drone [%s]" % (self.thread, self.remote_host, net_msg.sender))
233
234
235
236     def check_status(self, net_msg): 
237         ''' Checks the status of the server response. 
238             If we received a timeout message, we drop it.
239             if it's any other non-continue status, we mark this session as
240             complete and carry on.
241             @return False if there is no data to return to the caller 
242             (dropped message, eg. timeout), True otherwise '''
243
244         osrf_msgs = osrf.json.to_object(net_msg.body)
245         last_msg = osrf_msgs.pop()
246
247         if last_msg.type() == OSRF_MESSAGE_TYPE_STATUS:
248             code = int(last_msg.payload().statusCode())
249
250             if code == OSRF_STATUS_TIMEOUT:
251                 osrf.log.logDebug("removing cached session [%s] and "
252                     "dropping TIMEOUT message" % net_msg.thread)
253                 self.cache.delete(net_msg.thread)
254                 return False 
255
256             if code != OSRF_STATUS_CONTINUE:
257                 self.complete = True
258
259         return True
260
261
262     def respond_chunk(self, resp):
263         ''' Writes a single multipart-delimited chunk of data '''
264
265         self.write("Content-type: %s\n\n" % JSON_CONTENT_TYPE)
266         self.write("%s\n\n" % resp.body)
267         if self.complete:
268             self.write("--%s--\n" % self.delim)
269         else:
270             self.write("--%s\n" % self.delim)
271         self.apreq.flush()
272
273     def write(self, msg):
274         ''' Writes data to the client stream. '''
275
276         if DEBUG_WRITE:
277             sys.stderr.write(msg)
278             sys.stderr.flush()
279         self.apreq.write(msg)
280             
281