2b1d7bf4c5705698ffb0d7cf33314e54278e0270
[OpenSRF.git] / src / gateway / osrf_http_translator.c
1 #include <sys/time.h>
2 #include <sys/resource.h>
3 #include <unistd.h>
4 #include <strings.h>
5 #include "apachetools.h"
6 #include <opensrf/osrf_app_session.h>
7 #include <opensrf/osrf_system.h>
8 #include <opensrf/osrfConfig.h>
9 #include <opensrf/osrf_json.h>
10 #include <opensrf/osrf_cache.h>
11
12 #define MODULE_NAME "osrf_http_translator_module"
13 #define OSRF_TRANSLATOR_CONFIG_FILE "OSRFTranslatorConfig"
14 #define OSRF_TRANSLATOR_CONFIG_CTX "OSRFTranslatorConfigContext"
15 #define OSRF_TRANSLATOR_CACHE_SERVER "OSRFTranslatorCacheServer"
16
17 #define DEFAULT_TRANSLATOR_CONFIG_CTX "gateway"
18 #define DEFAULT_TRANSLATOR_CONFIG_FILE "/openils/conf/opensrf_core.xml"
19 #define DEFAULT_TRANSLATOR_TIMEOUT 1200
20 #define DEFAULT_TRANSLATOR_CACHE_SERVERS "127.0.0.1:11211"
21
22 #define MULTIPART_CONTENT_TYPE "multipart/x-mixed-replace;boundary=\"%s\""
23 #define JSON_CONTENT_TYPE "text/plain"
24 #define MAX_MSGS_PER_PACKET 256
25 #define CACHE_TIME 300
26
27 #define OSRF_HTTP_HEADER_TO "X-OpenSRF-to"
28 #define OSRF_HTTP_HEADER_XID "X-OpenSRF-xid"
29 #define OSRF_HTTP_HEADER_FROM "X-OpenSRF-from"
30 #define OSRF_HTTP_HEADER_THREAD "X-OpenSRF-thread"
31 #define OSRF_HTTP_HEADER_TIMEOUT "X-OpenSRF-timeout"
32 #define OSRF_HTTP_HEADER_SERVICE "X-OpenSRF-service"
33 #define OSRF_HTTP_HEADER_MULTIPART "X-OpenSRF-multipart"
34
35 char* configFile = DEFAULT_TRANSLATOR_CONFIG_FILE;
36 char* configCtx = DEFAULT_TRANSLATOR_CONFIG_CTX;
37 char* cacheServers = DEFAULT_TRANSLATOR_CACHE_SERVERS;
38
39 char* routerName = NULL;
40 char* domainName = NULL;
41 int osrfConnected = 0;
42 char recipientBuf[128];
43 char contentTypeBuf[80];
44
45 // for development only, writes to apache error log
46 static void _dbg(char* s, ...) {
47     VA_LIST_TO_STRING(s);
48     fprintf(stderr, "%s\n", VA_BUF);
49     fflush(stderr);
50 }
51
52 // Translator struct
53 typedef struct {
54     request_rec* apreq;
55     transport_client* handle;
56     osrfList* messages;
57     char* body;
58     char* delim;
59     const char* recipient;
60     const char* service;
61     const char* thread;
62     const char* remoteHost;
63     int complete;
64     int timeout;
65     int multipart;
66     int connectOnly; // there is only 1 message, a CONNECT
67     int disconnectOnly; // there is only 1 message, a DISCONNECT
68     int connecting; // there is a connect message in this batch
69     int disconnecting; // there is a connect message in this batch
70     int localXid;
71 } osrfHttpTranslator;
72
73
74 static const char* osrfHttpTranslatorGetConfigFile(cmd_parms *parms, void *config, const char *arg) {
75     configFile = (char*) arg;
76         return NULL;
77 }
78 static const char* osrfHttpTranslatorGetConfigFileCtx(cmd_parms *parms, void *config, const char *arg) {
79     configCtx = (char*) arg;
80         return NULL;
81 }
82 static const char* osrfHttpTranslatorGetCacheServer(cmd_parms *parms, void *config, const char *arg) {
83     cacheServers = (char*) arg;
84         return NULL;
85 }
86
87 /** set up the configuratoin handlers */
88 static const command_rec osrf_json_gateway_cmds[] = {
89         AP_INIT_TAKE1( OSRF_TRANSLATOR_CONFIG_FILE, osrfHttpTranslatorGetConfigFile,
90                         NULL, RSRC_CONF, "osrf translator config file"),
91         AP_INIT_TAKE1( OSRF_TRANSLATOR_CONFIG_CTX, osrfHttpTranslatorGetConfigFileCtx,
92                         NULL, RSRC_CONF, "osrf translator config file context"),
93         AP_INIT_TAKE1( OSRF_TRANSLATOR_CACHE_SERVER, osrfHttpTranslatorGetCacheServer,
94                         NULL, RSRC_CONF, "osrf translator cache server"),
95     {NULL}
96 };
97
98
99 // there can only be one, so use a global static one
100 static osrfHttpTranslator globalTranslator;
101
102 /*
103  * Constructs a new translator object based on the current apache 
104  * request_rec.  Reads the request body and headers.
105  */
106 static osrfHttpTranslator* osrfNewHttpTranslator(request_rec* apreq) {
107     osrfHttpTranslator* trans = &globalTranslator;
108     trans->apreq = apreq;
109     trans->complete = 0;
110     trans->connectOnly = 0;
111     trans->disconnectOnly = 0;
112     trans->connecting = 0;
113     trans->disconnecting = 0;
114     trans->remoteHost = apreq->connection->remote_ip;
115     trans->messages = NULL;
116
117     /* load the message body */
118         osrfStringArray* params = apacheParseParms(apreq);
119     trans->body = apacheGetFirstParamValue(params, "osrf-msg");
120     osrfStringArrayFree(params);
121
122     /* load the request headers */
123     if (apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_XID)) // force our log xid to match the caller
124             osrfLogForceXid(strdup(apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_XID)));
125
126     trans->handle = osrfSystemGetTransportClient();
127     trans->recipient = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_TO);
128     trans->service = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_SERVICE);
129     trans->thread = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_THREAD); /* XXX create thread if necessary */
130
131     const char* timeout = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_TIMEOUT);
132     if(timeout) 
133         trans->timeout = atoi(timeout);
134     else 
135         trans->timeout = DEFAULT_TRANSLATOR_TIMEOUT;
136
137     const char* multipart = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_MULTIPART);
138     if(multipart && !strcasecmp(multipart, "true"))
139         trans->multipart = 1;
140     else
141         trans->multipart = 0;
142
143     char buf[32];
144     snprintf(buf, sizeof(buf), "%d%ld", getpid(), time(NULL));
145     trans->delim = md5sum(buf);
146
147     return trans;
148 }
149
150 static void osrfHttpTranslatorFree(osrfHttpTranslator* trans) {
151     if(!trans) return;
152     if(trans->body)
153         free(trans->body);
154     if(trans->delim)
155         free(trans->delim);
156     osrfListFree(trans->messages);
157 }
158
159 static void osrfHttpTranslatorDebug(osrfHttpTranslator* trans) {
160     _dbg("-----------------------------------");
161     _dbg("body = %s", trans->body);
162     _dbg("service = %s", trans->service);
163     _dbg("thread = %s", trans->thread);
164     _dbg("multipart = %d", trans->multipart);
165     _dbg("recipient = %s", trans->recipient);
166 }
167
168 /**
169  * Determines the correct recipient address based on the requested 
170  * service or recipient address.  
171  */
172 static int osrfHttpTranslatorSetTo(osrfHttpTranslator* trans) {
173     int stat = 0;
174     jsonObject* sessionCache = NULL;
175
176     if(trans->service) {
177         if(trans->recipient) {
178             osrfLogError(OSRF_LOG_MARK, "Specifying both SERVICE and TO are not allowed");
179
180         } else {
181             // service is specified, build a recipient address 
182             // from the router, domain, and service
183             int size = snprintf(recipientBuf, 128, "%s@%s/%s", routerName, domainName, trans->service);
184             recipientBuf[size] = '\0';
185             osrfLogDebug(OSRF_LOG_MARK, "Set recipient to %s", recipientBuf);
186             trans->recipient = recipientBuf;
187             stat = 1;
188         }
189
190     } else {
191
192         if(trans->recipient) {
193             sessionCache = osrfCacheGetObject(trans->thread);
194
195             if(sessionCache) {
196                 char* ipAddr = jsonObjectGetString(jsonObjectGetKey(sessionCache, "ip"));
197                 char* recipient = jsonObjectGetString(jsonObjectGetKey(sessionCache, "jid"));
198
199                 // choosing a specific recipient address requires that the recipient and 
200                 // thread be cached on the server (so drone processes cannot be hijacked)
201                 if(!strcmp(ipAddr, trans->remoteHost) && !strcmp(recipient, trans->recipient)) {
202                     osrfLogDebug(OSRF_LOG_MARK, "Found cached session from host %s and recipient %s", 
203                         trans->remoteHost, trans->recipient);
204                     stat = 1;
205                     trans->service = apr_pstrdup(
206                         trans->apreq->pool, jsonObjectGetString(jsonObjectGetKey(sessionCache, "service")));
207
208                 } else {
209                     osrfLogError(OSRF_LOG_MARK, 
210                         "Session cache for thread %s does not match request", trans->thread);
211                 }
212             }  else {
213                 osrfLogError(OSRF_LOG_MARK, 
214                     "attempt to send directly to %s without a session", trans->recipient);
215             }
216         } else {
217             osrfLogError(OSRF_LOG_MARK, "No SERVICE or RECIPIENT defined");
218         } 
219     }
220
221     jsonObjectFree(sessionCache);
222     return stat;
223 }
224
225 /**
226  * Parses the request body and logs any REQUEST messages to the activity log
227  */
228 static int osrfHttpTranslatorParseRequest(osrfHttpTranslator* trans) {
229     osrfMessage* msg;
230     osrfMessage* msgList[MAX_MSGS_PER_PACKET];
231     int numMsgs = osrf_message_deserialize(trans->body, msgList, MAX_MSGS_PER_PACKET);
232     osrfLogDebug(OSRF_LOG_MARK, "parsed %d opensrf messages in this packet", numMsgs);
233
234     if(numMsgs == 0)
235         return 0;
236
237     if(numMsgs == 1) {
238         msg = msgList[0];
239         if(msg->m_type == CONNECT) {
240             trans->connectOnly = 1;
241             trans->connecting = 1;
242             return 1;
243         }
244         if(msg->m_type == DISCONNECT) {
245             trans->disconnectOnly = 1;
246             trans->disconnecting = 1;
247             return 1;
248         }
249     }
250
251     // log request messages to the activity log
252     int i;
253     for(i = 0; i < numMsgs; i++) {
254         msg = msgList[i];
255
256         switch(msg->m_type) {
257
258             case REQUEST: {
259                 jsonObject* params = msg->_params;
260                 growing_buffer* act = buffer_init(128); 
261                 buffer_fadd(act, "[%s] [%s] %s %s", trans->remoteHost, "", trans->service, msg->method_name);
262
263                 jsonObject* obj = NULL;
264                 int i = 0;
265                 char* str; 
266                 while((obj = jsonObjectGetIndex(params, i++))) {
267                     str = jsonObjectToJSON(obj);
268                     if( i == 1 )
269                         OSRF_BUFFER_ADD(act, " ");
270                     else 
271                         OSRF_BUFFER_ADD(act, ", ");
272                     OSRF_BUFFER_ADD(act, str);
273                     free(str);
274                 }
275                 osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
276                 buffer_free(act);
277                 break;
278             }
279
280             case CONNECT:
281                 trans->connecting = 1;
282                 break;
283
284             case DISCONNECT:
285                 trans->disconnecting = 1;
286                 break;
287         }
288     }
289
290     return 1;
291 }
292
293 static int osrfHttpTranslatorCheckStatus(osrfHttpTranslator* trans, transport_message* msg) {
294     osrfMessage* omsgList[MAX_MSGS_PER_PACKET];
295     int numMsgs = osrf_message_deserialize(msg->body, omsgList, MAX_MSGS_PER_PACKET);
296     osrfLogDebug(OSRF_LOG_MARK, "parsed %d response messages", numMsgs);
297     if(numMsgs == 0) return 0;
298
299     osrfMessage* last = omsgList[numMsgs-1];
300     if(last->m_type == STATUS) {
301         if(last->status_code == OSRF_STATUS_TIMEOUT) {
302             osrfLogDebug(OSRF_LOG_MARK, "removing cached session on request timeout");
303             osrfCacheRemove(trans->thread);
304             return 0;
305         }
306         // XXX hm, check for explicit status=COMPLETE message instead??
307         if(last->status_code != OSRF_STATUS_CONTINUE)
308             trans->complete = 1;
309     }
310
311     return 1;
312 }
313
314 static void osrfHttpTranslatorInitHeaders(osrfHttpTranslator* trans, transport_message* msg) {
315     apr_table_set(trans->apreq->headers_out, OSRF_HTTP_HEADER_FROM, msg->sender);
316     apr_table_set(trans->apreq->headers_out, OSRF_HTTP_HEADER_THREAD, trans->thread);
317     if(trans->multipart) {
318         sprintf(contentTypeBuf, MULTIPART_CONTENT_TYPE, trans->delim);
319         contentTypeBuf[79] = '\0';
320         osrfLogDebug(OSRF_LOG_MARK, "content type %s : %s : %s", MULTIPART_CONTENT_TYPE, trans->delim, contentTypeBuf);
321             ap_set_content_type(trans->apreq, contentTypeBuf);
322         ap_rprintf(trans->apreq, "--%s\n", trans->delim);
323     } else {
324             ap_set_content_type(trans->apreq, JSON_CONTENT_TYPE);
325     }
326 }
327
328 /**
329  * Cache the transaction with the JID of the backend process we are talking to
330  */
331 static void osrfHttpTranslatorCacheSession(osrfHttpTranslator* trans, const char* jid) {
332     jsonObject* cacheObj = jsonNewObject(NULL);
333     jsonObjectSetKey(cacheObj, "ip", jsonNewObject(trans->remoteHost));
334     jsonObjectSetKey(cacheObj, "jid", jsonNewObject(jid));
335     jsonObjectSetKey(cacheObj, "service", jsonNewObject(trans->service));
336     osrfCachePutObject((char*) trans->thread, cacheObj, CACHE_TIME);
337 }
338
339            
340 /**
341  * Writes a single chunk of multipart/x-mixed-replace content
342  */
343 static void osrfHttpTranslatorWriteChunk(osrfHttpTranslator* trans, transport_message* msg) {
344     osrfLogInternal(OSRF_LOG_MARK, "sending multipart chunk %s", msg->body);
345     ap_rprintf(trans->apreq, 
346         "Content-type: %s\n\n%s\n\n", JSON_CONTENT_TYPE, msg->body);
347     //osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: Content-type: %s\n\n%s\n\n", JSON_CONTENT_TYPE, msg->body);
348     if(trans->complete) {
349         ap_rprintf(trans->apreq, "--%s--\n", trans->delim);
350         //osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: --%s--\n", trans->delim);
351     } else {
352         ap_rprintf(trans->apreq, "--%s\n", trans->delim);
353         //osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: --%s\n", trans->delim);
354     }
355     ap_rflush(trans->apreq);
356 }
357
358 static int osrfHttpTranslatorProcess(osrfHttpTranslator* trans) {
359     if(trans->body == NULL)
360         return HTTP_BAD_REQUEST;
361
362     if(!osrfHttpTranslatorSetTo(trans))
363         return HTTP_BAD_REQUEST;
364
365     if(!osrfHttpTranslatorParseRequest(trans))
366         return HTTP_BAD_REQUEST;
367
368     while(client_recv(trans->handle, 0))
369         continue; // discard any old status messages in the recv queue
370
371     // send the message to the recipient
372     transport_message* tmsg = message_init(
373         trans->body, NULL, trans->thread, trans->recipient, NULL);
374     message_set_osrf_xid(tmsg, osrfLogGetXid());
375     client_send_message(trans->handle, tmsg);
376     message_free(tmsg); 
377
378     if(trans->disconnectOnly) {
379         osrfLogDebug(OSRF_LOG_MARK, "exiting early on disconnect");
380         osrfCacheRemove(trans->thread);
381         return OK;
382     }
383
384     // process the response from the opensrf service
385     int firstWrite = 1;
386     while(!trans->complete) {
387         transport_message* msg = client_recv(trans->handle, trans->timeout);
388
389         if(trans->handle->error) {
390             osrfLogError(OSRF_LOG_MARK, "Transport error");
391             osrfCacheRemove(trans->thread);
392             return HTTP_INTERNAL_SERVER_ERROR;
393         }
394
395         if(msg == NULL)
396             return HTTP_GATEWAY_TIME_OUT;
397
398         if(msg->is_error) {
399             osrfLogError(OSRF_LOG_MARK, "XMPP message resulted in error code %d", msg->error_code);
400             osrfCacheRemove(trans->thread);
401             return HTTP_NOT_FOUND;
402         }
403
404         if(!osrfHttpTranslatorCheckStatus(trans, msg))
405             continue;
406
407         if(firstWrite) {
408             osrfHttpTranslatorInitHeaders(trans, msg);
409             if(trans->connecting)
410                 osrfHttpTranslatorCacheSession(trans, msg->sender);
411             firstWrite = 0;
412         }
413
414         if(trans->multipart) {
415             osrfHttpTranslatorWriteChunk(trans, msg);
416             if(trans->connectOnly)
417                 break;
418         } else {
419             if(!trans->messages)
420                 trans->messages = osrfNewList();
421             osrfListPush(trans->messages, msg->body);
422
423             if(trans->complete || trans->connectOnly) {
424                 growing_buffer* buf = buffer_init(128);
425                 int i;
426                 OSRF_BUFFER_ADD(buf, osrfListGetIndex(trans->messages, 0));
427                 for(i = 1; i < trans->messages->size; i++) {
428                     buffer_chomp(buf); // chomp off the closing array bracket
429                     char* body = osrfListGetIndex(trans->messages, i);
430                     char newbuf[strlen(body)];
431                     sprintf(newbuf, body+1); // chomp off the opening array bracket
432                     OSRF_BUFFER_ADD_CHAR(buf, ',');
433                     OSRF_BUFFER_ADD(buf, newbuf);
434                 }
435                 
436                 ap_rputs(buf->buf, trans->apreq);
437                 buffer_free(buf);
438             }
439         }
440     }
441
442     if(trans->disconnecting) // DISCONNECT within a multi-message batch
443         osrfCacheRemove(trans->thread);
444
445     return OK;
446 }
447
448 static void testConnection(request_rec* r) {
449         if(!osrfConnected || !osrfSystemGetTransportClient()) {
450         osrfLogError(OSRF_LOG_MARK, "We're not connected to OpenSRF");
451                 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "We're not connected to OpenSRF");
452                 usleep(100000); // .1 second to prevent process die/start overload
453                 exit(1);
454         }
455 }
456
457 // it's dead, Jim
458 static apr_status_t childExit(void* data) {
459     osrf_system_shutdown();
460     return OK;
461 }
462
463 static void childInit(apr_pool_t *p, server_rec *s) {
464         if(!osrfSystemBootstrapClientResc(configFile, configCtx, "translator")) {
465                 ap_log_error( APLOG_MARK, APLOG_ERR, 0, s, 
466                         "Unable to Bootstrap OpenSRF Client with config %s..", configFile);
467                 return;
468         }
469
470     routerName = osrfConfigGetValue(NULL, "/router_name");
471     domainName = osrfConfigGetValue(NULL, "/domain");
472     const char* servers[] = {cacheServers};
473     osrfCacheInit(servers, 1, 86400);
474         osrfConnected = 1;
475
476     // at pool destroy time (= child exit time), cleanup
477     // XXX causes us to disconnect even for clone()'d process cleanup (as in mod_cgi)
478     //apr_pool_cleanup_register(p, NULL, childExit, apr_pool_cleanup_null);
479 }
480
481 static int handler(request_rec *r) {
482     int stat = OK;
483         if(strcmp(r->handler, MODULE_NAME)) return DECLINED;
484     if(r->header_only) return stat;
485
486         r->allowed |= (AP_METHOD_BIT << M_GET);
487         r->allowed |= (AP_METHOD_BIT << M_POST);
488
489         osrfLogSetAppname("osrf_http_translator");
490     testConnection(r);
491
492         osrfLogMkXid();
493     osrfHttpTranslator* trans = osrfNewHttpTranslator(r);
494     if(trans->body) {
495         stat = osrfHttpTranslatorProcess(trans);
496         //osrfHttpTranslatorDebug(trans);
497         osrfLogInfo(OSRF_LOG_MARK, "translator resulted in status %d", stat);
498     } else {
499         osrfLogWarning(OSRF_LOG_MARK, "no message body to process");
500     }
501     osrfHttpTranslatorFree(trans);
502         return stat;
503 }
504
505
506 static void registerHooks (apr_pool_t *p) {
507         ap_hook_handler(handler, NULL, NULL, APR_HOOK_MIDDLE);
508         ap_hook_child_init(childInit, NULL, NULL, APR_HOOK_MIDDLE);
509 }
510
511
512 module AP_MODULE_DECLARE_DATA osrf_http_translator_module = {
513         STANDARD20_MODULE_STUFF,
514     NULL,
515         NULL,
516     NULL,
517         NULL,
518     NULL,
519         registerHooks,
520 };
521
522
523
524