using a static buffer to store the multipart content type string
[OpenSRF.git] / src / gateway / osrf_http_translator.c
1 #include <sys/time.h>
2 #include <sys/resource.h>
3 #include <unistd.h>
4 #include <strings.h>
5 #include "apachetools.h"
6 #include <opensrf/osrf_app_session.h>
7 #include <opensrf/osrf_system.h>
8 #include <opensrf/osrfConfig.h>
9 #include <opensrf/osrf_json.h>
10 #include <opensrf/osrf_cache.h>
11
12 #define MODULE_NAME "osrf_http_translator_module"
13 #define TRANSLATOR_CONFIG_FILE "OSRFTranslatorConfig"
14 #define TRANSLATOR_CONFIG_CTX "OSRFTranslatorConfigContext"
15 #define TRANSLATOR_CACHE_SERVER "OSRFTranslatorCacheServer"
16 #define DEFAULT_TRANSLATOR_CONFIG_CTX "gateway"
17 #define DEFAULT_TRANSLATOR_CONFIG_FILE "/openils/conf/opensrf_core.xml"
18 #define DEFAULT_TRANSLATOR_TIMEOUT 1200
19
20 #define MULTIPART_CONTENT_TYPE "multipart/x-mixed-replace;boundary=\"%s\""
21 #define JSON_CONTENT_TYPE "text/plain"
22 #define CACHE_TIME 300
23
24 #define OSRF_HTTP_HEADER_TO "X-OpenSRF-to"
25 #define OSRF_HTTP_HEADER_XID "X-OpenSRF-xid"
26 #define OSRF_HTTP_HEADER_FROM "X-OpenSRF-from"
27 #define OSRF_HTTP_HEADER_THREAD "X-OpenSRF-thread"
28 #define OSRF_HTTP_HEADER_TIMEOUT "X-OpenSRF-timeout"
29 #define OSRF_HTTP_HEADER_SERVICE "X-OpenSRF-service"
30 #define OSRF_HTTP_HEADER_MULTIPART "X-OpenSRF-multipart"
31 #define MAX_MSGS_PER_PACKET 256
32
33 char* configFile = DEFAULT_TRANSLATOR_CONFIG_FILE;
34 char* configCtx = DEFAULT_TRANSLATOR_CONFIG_CTX;
35 char* routerName = NULL;
36 char* domainName = NULL;
37 int osrfConnected = 0;
38 char recipientBuf[128];
39 char contentTypeBuf[80];
40
41 // for development only, writes to apache error log
42 static void _dbg(char* s, ...) {
43     VA_LIST_TO_STRING(s);
44     fprintf(stderr, "%s\n", VA_BUF);
45     fflush(stderr);
46 }
47
48 // Translator struct
49 typedef struct {
50     request_rec* apreq;
51     transport_client* handle;
52     osrfList* messages;
53     char* body;
54     char* delim;
55     const char* recipient;
56     const char* service;
57     const char* thread;
58     const char* remoteHost;
59     int complete;
60     int timeout;
61     int multipart;
62     int connectOnly;
63     int disconnectOnly;
64     int localXid;
65 } osrfHttpTranslator;
66
67 /*
68  * Constructs a new translator object based on the current apache 
69  * request_rec.  Reads the request body and headers.
70  */
71 static osrfHttpTranslator* osrfNewHttpTranslator(request_rec* apreq) {
72     osrfHttpTranslator* trans;
73     OSRF_MALLOC(trans, sizeof(osrfHttpTranslator));
74     trans->apreq = apreq;
75     trans->complete = 0;
76     trans->connectOnly = 0;
77     trans->disconnectOnly = 0;
78     trans->remoteHost = apreq->connection->remote_ip;
79     trans->messages = NULL;
80
81     /* load the message body */
82         osrfStringArray* params = apacheParseParms(apreq);
83     trans->body = apacheGetFirstParamValue(params, "osrf-msg");
84     osrfStringArrayFree(params);
85
86     /* load the request headers */
87     if (apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_XID) ) {
88         trans->localXid = 0;
89     } else {
90         trans->localXid = 1;
91     }
92
93     trans->handle = osrfSystemGetTransportClient();
94     trans->recipient = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_TO);
95     trans->service = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_SERVICE);
96     trans->thread = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_THREAD); /* XXX create thread if necessary */
97
98     const char* timeout = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_TIMEOUT);
99     if(timeout) 
100         trans->timeout = atoi(timeout);
101     else 
102         trans->timeout = DEFAULT_TRANSLATOR_TIMEOUT;
103
104     const char* multipart = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_MULTIPART);
105     if(multipart && !strcasecmp(multipart, "true"))
106         trans->multipart = 1;
107     else
108         trans->multipart = 0;
109
110     char buf[32];
111     snprintf(buf, sizeof(buf), "%d%ld", getpid(), time(NULL));
112     trans->delim = md5sum(buf);
113
114     return trans;
115 }
116
117 static void osrfHttpTranslatorFree(osrfHttpTranslator* trans) {
118     if(!trans) return;
119     if(trans->body)
120         free(trans->body);
121     if(trans->delim)
122         free(trans->delim);
123     osrfListFree(trans->messages);
124     free(trans);
125 }
126
127 static void osrfHttpTranslatorDebug(osrfHttpTranslator* trans) {
128     _dbg("-----------------------------------");
129     _dbg("body = %s", trans->body);
130     _dbg("service = %s", trans->service);
131     _dbg("thread = %s", trans->thread);
132     _dbg("multipart = %d", trans->multipart);
133     _dbg("recipient = %s", trans->recipient);
134 }
135
136 /**
137  * Determines the correct recipient address based on the requested 
138  * service or recipient address.  
139  */
140 static int osrfHttpTranslatorSetTo(osrfHttpTranslator* trans) {
141     int stat = 0;
142     jsonObject* sessionCache = NULL;
143
144     if(trans->service) {
145         if(trans->recipient) {
146             osrfLogError(OSRF_LOG_MARK, "Specifying both SERVICE and TO are not allowed");
147
148         } else {
149             // service is specified, build a recipient address 
150             // from the router, domain, and service
151             int size = snprintf(recipientBuf, 128, "%s@%s/%s", routerName, domainName, trans->service);
152             recipientBuf[size] = '\0';
153             osrfLogDebug(OSRF_LOG_MARK, "Set recipient to %s", recipientBuf);
154             trans->recipient = recipientBuf;
155             stat = 1;
156         }
157
158     } else {
159
160         if(trans->recipient) {
161             sessionCache = osrfCacheGetObject(trans->thread);
162
163             if(sessionCache) {
164                 char* ipAddr = jsonObjectGetString(jsonObjectGetKey(sessionCache, "ip"));
165                 char* recipient = jsonObjectGetString(jsonObjectGetKey(sessionCache, "jid"));
166
167                 // choosing a specific recipient address requires that the recipient and 
168                 // thread be cached on the server (so drone processes cannot be hijacked)
169                 if(!strcmp(ipAddr, trans->remoteHost) && !strcmp(recipient, trans->recipient)) {
170                     osrfLogDebug(OSRF_LOG_MARK, "Found cached session from host %s and recipient %s", 
171                         trans->remoteHost, trans->recipient);
172                     stat = 1;
173                     trans->service = jsonObjectGetString(jsonObjectGetKey(sessionCache, "service"));
174
175                 } else {
176                     osrfLogError(OSRF_LOG_MARK, 
177                         "Session cache for thread %s does not match request", trans->thread);
178                 }
179             }  else {
180                 osrfLogError(OSRF_LOG_MARK, 
181                     "attempt to send directly to %s without a session", trans->recipient);
182             }
183         } else {
184             osrfLogError(OSRF_LOG_MARK, "No SERVICE or RECIPIENT defined");
185         } 
186     }
187
188     jsonObjectFree(sessionCache);
189     return stat;
190 }
191
192 /**
193  * Parses the request body and logs any REQUEST messages to the activity log
194  */
195 static int osrfHttpTranslatorParseRequest(osrfHttpTranslator* trans) {
196     osrfMessage* msg;
197     osrfMessage* msgList[MAX_MSGS_PER_PACKET];
198     int numMsgs = osrf_message_deserialize(trans->body, msgList, MAX_MSGS_PER_PACKET);
199     osrfLogDebug(OSRF_LOG_MARK, "parsed %d opensrf messages in this packet", numMsgs);
200
201     if(numMsgs == 0)
202         return 0;
203
204     if(numMsgs == 1) {
205         msg = msgList[0];
206         if(msg->m_type == CONNECT) {
207             trans->connectOnly = 1;
208             return 1;
209         }
210         if(msg->m_type == DISCONNECT) {
211             trans->disconnectOnly = 1;
212             return 1;
213         }
214     }
215
216     // log request messages to the activity log
217     int i;
218     for(i = 0; i < numMsgs; i++) {
219         msg = msgList[i];
220         if(msg->m_type == REQUEST) {
221
222             jsonObject* params = msg->_params;
223             growing_buffer* act = buffer_init(128);     
224             buffer_fadd(act, "[%s] [%s] %s %s", trans->remoteHost, "", trans->service, msg->method_name);
225
226             char* str; 
227             int i = 0;
228             while((str = jsonObjectGetString(jsonObjectGetIndex(params, i++)))) {
229                 if( i == 1 )
230                     OSRF_BUFFER_ADD(act, " ");
231                 else 
232                     OSRF_BUFFER_ADD(act, ", ");
233                 OSRF_BUFFER_ADD(act, str);
234             }
235             osrfLogActivity(OSRF_LOG_MARK, act->buf);
236             buffer_free(act);
237         }
238     }
239
240     return 1;
241 }
242
243 static int osrfHttpTranslatorCheckStatus(osrfHttpTranslator* trans, transport_message* msg) {
244     osrfMessage* omsgList[MAX_MSGS_PER_PACKET];
245     int numMsgs = osrf_message_deserialize(msg->body, omsgList, MAX_MSGS_PER_PACKET);
246     osrfLogDebug(OSRF_LOG_MARK, "parsed %d response messages", numMsgs);
247     if(numMsgs == 0) return 0;
248
249     osrfMessage* last = omsgList[numMsgs-1];
250     if(last->m_type == STATUS) {
251         if(last->status_code == OSRF_STATUS_TIMEOUT) {
252             osrfLogDebug(OSRF_LOG_MARK, "removing cached session on request timeout");
253             osrfCacheRemove(trans->thread);
254             return 0;
255         }
256         // XXX hm, check for explicit status=COMPLETE message instead??
257         if(last->status_code != OSRF_STATUS_CONTINUE)
258             trans->complete = 1;
259     }
260
261     return 1;
262 }
263
264 static void osrfHttpTranslatorInitHeaders(osrfHttpTranslator* trans, transport_message* msg) {
265     apr_table_set(trans->apreq->headers_out, OSRF_HTTP_HEADER_FROM, msg->sender);
266     apr_table_set(trans->apreq->headers_out, OSRF_HTTP_HEADER_THREAD, trans->thread);
267     if(trans->multipart) {
268         sprintf(contentTypeBuf, MULTIPART_CONTENT_TYPE, trans->delim);
269         contentTypeBuf[79] = '\0';
270         osrfLogDebug(OSRF_LOG_MARK, "content type %s : %s : %s", MULTIPART_CONTENT_TYPE, trans->delim, contentTypeBuf);
271             ap_set_content_type(trans->apreq, contentTypeBuf);
272         ap_rprintf(trans->apreq, "--%s\n", trans->delim);
273     } else {
274             ap_set_content_type(trans->apreq, JSON_CONTENT_TYPE);
275     }
276 }
277
278 static void osrfHttpTranslatorCacheSession(osrfHttpTranslator* trans) {
279     jsonObject* cacheObj = jsonNewObject(NULL);
280     jsonObjectSetKey(cacheObj, "ip", jsonNewObject(trans->remoteHost));
281     jsonObjectSetKey(cacheObj, "jid", jsonNewObject(trans->recipient));
282     jsonObjectSetKey(cacheObj, "service", jsonNewObject(trans->service));
283     osrfCachePutObject((char*) trans->thread, cacheObj, CACHE_TIME);
284 }
285
286            
287 /**
288  * Writes a single chunk of multipart/x-mixed-replace content
289  */
290 static void osrfHttpTranslatorWriteChunk(osrfHttpTranslator* trans, transport_message* msg) {
291     ap_rprintf(trans->apreq, 
292         "Content-type: %s\n\n%s\n\n", JSON_CONTENT_TYPE, msg->body);
293     if(trans->complete)
294         ap_rprintf(trans->apreq, "--%s--\n", trans->delim);
295     else
296         ap_rprintf(trans->apreq, "--%s\n", trans->delim);
297     ap_rflush(trans->apreq);
298 }
299
300 static int osrfHttpTranslatorProcess(osrfHttpTranslator* trans) {
301     if(trans->body == NULL)
302         return HTTP_BAD_REQUEST;
303
304     if(!osrfHttpTranslatorSetTo(trans))
305         return HTTP_BAD_REQUEST;
306
307     if(!osrfHttpTranslatorParseRequest(trans))
308         return HTTP_BAD_REQUEST;
309
310     while(client_recv(trans->handle, 0))
311         continue; // discard any old status messages in the recv queue
312
313     // send the message to the recipient
314     transport_message* tmsg = message_init(
315         trans->body, NULL, trans->thread, trans->recipient, NULL);
316     client_send_message(trans->handle, tmsg);
317     message_free(tmsg); 
318
319     if(trans->disconnectOnly) {
320         osrfLogDebug(OSRF_LOG_MARK, "exiting early on disconnect");
321         return OK;
322     }
323
324     // process the response from the opensrf service
325     int firstWrite = 1;
326     while(!trans->complete) {
327         transport_message* msg = client_recv(trans->handle, trans->timeout);
328
329         if(trans->handle->error) {
330             osrfLogError(OSRF_LOG_MARK, "Transport error");
331             return HTTP_NOT_FOUND;
332         }
333
334         if(msg == NULL)
335             return HTTP_GATEWAY_TIME_OUT;
336
337         if(msg->is_error) {
338             osrfLogError(OSRF_LOG_MARK, "XMPP message resulted in error code %d", msg->error_code);
339             return HTTP_NOT_FOUND;
340         }
341
342         if(!osrfHttpTranslatorCheckStatus(trans, msg))
343             continue;
344
345         if(firstWrite) {
346             osrfHttpTranslatorInitHeaders(trans, msg);
347             osrfHttpTranslatorCacheSession(trans);
348             firstWrite = 0;
349         }
350
351         if(trans->multipart) {
352             osrfHttpTranslatorWriteChunk(trans, msg);
353             if(trans->connectOnly)
354                 break;
355         } else {
356             if(!trans->messages)
357                 trans->messages = osrfNewList();
358             osrfListPush(trans->messages, msg->body);
359
360             if(trans->complete || trans->connectOnly) {
361                 growing_buffer* buf = buffer_init(128);
362                 int i;
363                 OSRF_BUFFER_ADD(buf, osrfListGetIndex(trans->messages, 0));
364                 for(i = 1; i < trans->messages->size; i++) {
365                     // yay! string mangling
366                 }
367             }
368         }
369     }
370
371     return OK;
372 }
373
374 static void testConnection(request_rec* r) {
375         if(!osrfConnected || !osrfSystemGetTransportClient()) {
376         osrfLogError(OSRF_LOG_MARK, "We're not connected to OpenSRF");
377                 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "We're not connected to OpenSRF");
378                 usleep(100000); // .1 second to prevent process die/start overload
379                 exit(1);
380         }
381 }
382
383 // it's dead, Jim
384 static apr_status_t childExit(void* data) {
385     osrf_system_shutdown();
386     return OK;
387 }
388
389 static void childInit(apr_pool_t *p, server_rec *s) {
390         if(!osrfSystemBootstrapClientResc(configFile, configCtx, "translator")) {
391                 ap_log_error( APLOG_MARK, APLOG_ERR, 0, s, 
392                         "Unable to Bootstrap OpenSRF Client with config %s..", configFile);
393                 return;
394         }
395
396     routerName = osrfConfigGetValue(NULL, "/router_name");
397     domainName = osrfConfigGetValue(NULL, "/domain");
398     // ---------------------
399     // XXX initialize the cache from the Apache settings
400     const char* servers[] = {"127.0.0.1:11211"};
401     osrfCacheInit(servers, 1, 86400);
402     // ---------------------
403         osrfConnected = 1;
404
405     // at pool destroy time (= child exit time), cleanup
406     apr_pool_cleanup_register(p, NULL, childExit, NULL);
407 }
408
409 static int handler(request_rec *r) {
410     int stat = OK;
411         if(strcmp(r->handler, MODULE_NAME)) return DECLINED;
412     if(r->header_only) return stat;
413
414         r->allowed |= (AP_METHOD_BIT << M_GET);
415         r->allowed |= (AP_METHOD_BIT << M_POST);
416
417         osrfLogSetAppname("osrf_http_translator");
418     testConnection(r);
419
420     osrfHttpTranslator* trans = osrfNewHttpTranslator(r);
421     if(trans->body) {
422         stat = osrfHttpTranslatorProcess(trans);
423         osrfHttpTranslatorDebug(trans);
424         osrfLogInfo(OSRF_LOG_MARK, "translator resulted in status %d", stat);
425     } else {
426         osrfLogWarning(OSRF_LOG_MARK, "no message body to process");
427     }
428     osrfHttpTranslatorFree(trans);
429         return stat;
430 }
431
432
433 static void registerHooks (apr_pool_t *p) {
434         ap_hook_handler(handler, NULL, NULL, APR_HOOK_MIDDLE);
435         ap_hook_child_init(childInit, NULL, NULL, APR_HOOK_MIDDLE);
436 }
437
438
439 module AP_MODULE_DECLARE_DATA osrf_http_translator_module = {
440         STANDARD20_MODULE_STUFF,
441     NULL,
442         NULL,
443     NULL,
444         NULL,
445     NULL,
446         registerHooks,
447 };
448
449
450
451