e945689461387b86cffec888133b1a896b7589de
[OpenSRF.git] / src / gateway / osrf_http_translator.c
1 #include <sys/time.h>
2 #include <sys/resource.h>
3 #include <unistd.h>
4 #include <strings.h>
5 #include "apachetools.h"
6 #include <opensrf/osrf_app_session.h>
7 #include <opensrf/osrf_system.h>
8 #include <opensrf/osrfConfig.h>
9 #include <opensrf/osrf_json.h>
10 #include <opensrf/osrf_cache.h>
11
12 #define MODULE_NAME "osrf_http_translator_module"
13 #define OSRF_TRANSLATOR_CONFIG_FILE "OSRFTranslatorConfig"
14 #define OSRF_TRANSLATOR_CONFIG_CTX "OSRFTranslatorConfigContext"
15 #define OSRF_TRANSLATOR_CACHE_SERVER "OSRFTranslatorCacheServer"
16
17 #define DEFAULT_TRANSLATOR_CONFIG_CTX "gateway"
18 #define DEFAULT_TRANSLATOR_CONFIG_FILE "/openils/conf/opensrf_core.xml"
19 #define DEFAULT_TRANSLATOR_TIMEOUT 1200
20 #define DEFAULT_TRANSLATOR_CACHE_SERVERS "127.0.0.1:11211"
21
22 #define MULTIPART_CONTENT_TYPE "multipart/x-mixed-replace;boundary=\"%s\""
23 #define JSON_CONTENT_TYPE "text/plain"
24 #define MAX_MSGS_PER_PACKET 256
25 #define CACHE_TIME 300
26 #define TRANSLATOR_INGRESS "translator-v1"
27
28 #define OSRF_HTTP_HEADER_TO "X-OpenSRF-to"
29 #define OSRF_HTTP_HEADER_XID "X-OpenSRF-xid"
30 #define OSRF_HTTP_HEADER_FROM "X-OpenSRF-from"
31 #define OSRF_HTTP_HEADER_THREAD "X-OpenSRF-thread"
32 #define OSRF_HTTP_HEADER_TIMEOUT "X-OpenSRF-timeout"
33 #define OSRF_HTTP_HEADER_SERVICE "X-OpenSRF-service"
34 #define OSRF_HTTP_HEADER_MULTIPART "X-OpenSRF-multipart"
35
36
37 char* configFile = DEFAULT_TRANSLATOR_CONFIG_FILE;
38 char* configCtx = DEFAULT_TRANSLATOR_CONFIG_CTX;
39 char* cacheServers = DEFAULT_TRANSLATOR_CACHE_SERVERS;
40
41 char* routerName = NULL;
42 char* domainName = NULL;
43 int osrfConnected = 0;
44 char recipientBuf[128];
45 char contentTypeBuf[80];
46
47 #if 0
48 // Commented out to avoid compiler warning
49 // for development only, writes to apache error log
50 static void _dbg(char* s, ...) {
51     VA_LIST_TO_STRING(s);
52     fprintf(stderr, "%s\n", VA_BUF);
53     fflush(stderr);
54 }
55 #endif
56
57 // Translator struct
58 typedef struct {
59     request_rec* apreq;
60     transport_client* handle;
61     osrfList* messages;
62     char* body;
63     char* delim;
64     const char* recipient;
65     const char* service;
66     const char* thread;
67     const char* remoteHost;
68     int complete;
69     int timeout;
70     int multipart;
71     int connectOnly; // there is only 1 message, a CONNECT
72     int disconnectOnly; // there is only 1 message, a DISCONNECT
73     int connecting; // there is a connect message in this batch
74     int disconnecting; // there is a connect message in this batch
75     int localXid;
76 } osrfHttpTranslator;
77
78
79 static const char* osrfHttpTranslatorGetConfigFile(cmd_parms *parms, void *config, const char *arg) {
80     configFile = (char*) arg;
81         return NULL;
82 }
83 static const char* osrfHttpTranslatorGetConfigFileCtx(cmd_parms *parms, void *config, const char *arg) {
84     configCtx = (char*) arg;
85         return NULL;
86 }
87 static const char* osrfHttpTranslatorGetCacheServer(cmd_parms *parms, void *config, const char *arg) {
88     cacheServers = (char*) arg;
89         return NULL;
90 }
91
92 /** set up the configuration handlers */
93 static const command_rec osrfHttpTranslatorCmds[] = {
94         AP_INIT_TAKE1( OSRF_TRANSLATOR_CONFIG_FILE, osrfHttpTranslatorGetConfigFile,
95                         NULL, RSRC_CONF, "osrf translator config file"),
96         AP_INIT_TAKE1( OSRF_TRANSLATOR_CONFIG_CTX, osrfHttpTranslatorGetConfigFileCtx,
97                         NULL, RSRC_CONF, "osrf translator config file context"),
98         AP_INIT_TAKE1( OSRF_TRANSLATOR_CACHE_SERVER, osrfHttpTranslatorGetCacheServer,
99                         NULL, RSRC_CONF, "osrf translator cache server"),
100     {NULL}
101 };
102
103
104 // there can only be one, so use a global static one
105 static osrfHttpTranslator globalTranslator;
106
107 /*
108  * Constructs a new translator object based on the current apache 
109  * request_rec.  Reads the request body and headers.
110  */
111 static osrfHttpTranslator* osrfNewHttpTranslator(request_rec* apreq) {
112     osrfHttpTranslator* trans = &globalTranslator;
113     trans->apreq = apreq;
114     trans->complete = 0;
115     trans->connectOnly = 0;
116     trans->disconnectOnly = 0;
117     trans->connecting = 0;
118     trans->disconnecting = 0;
119     trans->remoteHost = apreq->connection->remote_ip;
120     trans->messages = NULL;
121
122     /* load the message body */
123     osrfStringArray* params     = apacheParseParms(apreq);
124     trans->body = apacheGetFirstParamValue(params, "osrf-msg");
125     osrfStringArrayFree(params);
126
127     /* load the request headers */
128     if (apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_XID))
129         // force our log xid to match the caller
130         osrfLogForceXid(strdup(apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_XID)));
131
132     trans->handle = osrfSystemGetTransportClient();
133     trans->recipient = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_TO);
134     trans->service = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_SERVICE);
135
136     const char* timeout = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_TIMEOUT);
137     if(timeout) 
138         trans->timeout = atoi(timeout);
139     else 
140         trans->timeout = DEFAULT_TRANSLATOR_TIMEOUT;
141
142     const char* multipart = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_MULTIPART);
143     if(multipart && !strcasecmp(multipart, "true"))
144         trans->multipart = 1;
145     else
146         trans->multipart = 0;
147
148     char buf[32];
149     snprintf(buf, sizeof(buf), "%d%ld", getpid(), time(NULL));
150     trans->delim = md5sum(buf);
151
152     /* Use thread if it has been passed in; otherwise, just use the delimiter */
153     trans->thread = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_THREAD)
154         ?  apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_THREAD)
155         : (const char*)trans->delim;
156
157     return trans;
158 }
159
160 static void osrfHttpTranslatorFree(osrfHttpTranslator* trans) {
161     if(!trans) return;
162     if(trans->body)
163         free(trans->body);
164     if(trans->delim)
165         free(trans->delim);
166     osrfListFree(trans->messages);
167 }
168
169 #if 0
170 // Commented out to avoid compiler warning
171 static void osrfHttpTranslatorDebug(osrfHttpTranslator* trans) {
172     _dbg("-----------------------------------");
173     _dbg("body = %s", trans->body);
174     _dbg("service = %s", trans->service);
175     _dbg("thread = %s", trans->thread);
176     _dbg("multipart = %d", trans->multipart);
177     _dbg("recipient = %s", trans->recipient);
178 }
179 #endif
180
181 /**
182  * Determines the correct recipient address based on the requested 
183  * service or recipient address.  
184  */
185 static int osrfHttpTranslatorSetTo(osrfHttpTranslator* trans) {
186     int stat = 0;
187     jsonObject* sessionCache = NULL;
188
189     if(trans->service) {
190         if(trans->recipient) {
191             osrfLogError(OSRF_LOG_MARK, "Specifying both SERVICE and TO are not allowed");
192
193         } else {
194             // service is specified, build a recipient address 
195             // from the router, domain, and service
196             int size = snprintf(recipientBuf, 128, "%s@%s/%s", routerName,
197                 domainName, trans->service);
198             recipientBuf[size] = '\0';
199             osrfLogDebug(OSRF_LOG_MARK, "Set recipient to %s", recipientBuf);
200             trans->recipient = recipientBuf;
201             stat = 1;
202         }
203
204     } else {
205
206         if(trans->recipient) {
207             sessionCache = osrfCacheGetObject(trans->thread);
208
209             if(sessionCache) {
210                 const char* ipAddr = jsonObjectGetString(
211                     jsonObjectGetKeyConst( sessionCache, "ip" ));
212                 const char* recipient = jsonObjectGetString(
213                     jsonObjectGetKeyConst( sessionCache, "jid" ));
214
215                 // choosing a specific recipient address requires that the recipient and 
216                 // thread be cached on the server (so drone processes cannot be hijacked)
217                 if(!strcmp(ipAddr, trans->remoteHost) && !strcmp(recipient, trans->recipient)) {
218                     osrfLogDebug( OSRF_LOG_MARK,
219                         "Found cached session from host %s and recipient %s",
220                         trans->remoteHost, trans->recipient);
221                     stat = 1;
222                     trans->service = apr_pstrdup(
223                         trans->apreq->pool, jsonObjectGetString(
224                             jsonObjectGetKeyConst( sessionCache, "service" )));
225
226                 } else {
227                     osrfLogError(OSRF_LOG_MARK, 
228                         "Session cache for thread %s does not match request", trans->thread);
229                 }
230             }  else {
231                 osrfLogError(OSRF_LOG_MARK, 
232                     "attempt to send directly to %s without a session", trans->recipient);
233             }
234         } else {
235             osrfLogError(OSRF_LOG_MARK, "No SERVICE or RECIPIENT defined");
236         } 
237     }
238
239     jsonObjectFree(sessionCache);
240     return stat;
241 }
242
243 /**
244  * Parses the request body, logs any REQUEST messages to the activity log, 
245  * stamps the translator ingress on each message, and returns the updated 
246  * messages as a JSON string.
247  */
248 static char* osrfHttpTranslatorParseRequest(osrfHttpTranslator* trans) {
249     osrfMessage* msg;
250     osrfMessage* msgList[MAX_MSGS_PER_PACKET];
251     int numMsgs = osrf_message_deserialize(trans->body, msgList, MAX_MSGS_PER_PACKET);
252     osrfLogDebug(OSRF_LOG_MARK, "parsed %d opensrf messages in this packet", numMsgs);
253
254     if(numMsgs == 0)
255         return NULL;
256
257     // log request messages to the activity log
258     int i;
259     for(i = 0; i < numMsgs; i++) {
260         msg = msgList[i];
261         osrfMessageSetIngress(msg, TRANSLATOR_INGRESS);
262
263         switch(msg->m_type) {
264
265             case REQUEST: {
266                 const jsonObject* params = msg->_params;
267                 growing_buffer* act = buffer_init(128); 
268                 buffer_fadd(act, "[%s] [%s] %s %s", trans->remoteHost, "",
269                     trans->service, msg->method_name);
270
271                 const jsonObject* obj = NULL;
272                 int i = 0;
273                 char* str; 
274                 while((obj = jsonObjectGetIndex(params, i++))) {
275                     str = jsonObjectToJSON(obj);
276                     if( i == 1 )
277                         OSRF_BUFFER_ADD(act, " ");
278                     else 
279                         OSRF_BUFFER_ADD(act, ", ");
280                     OSRF_BUFFER_ADD(act, str);
281                     free(str);
282                 }
283                 osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
284                 buffer_free(act);
285                 break;
286             }
287
288             case CONNECT:
289                 trans->connecting = 1;
290                 if (numMsgs == 1) 
291                     trans->connectOnly = 1;
292                 break;
293
294             case DISCONNECT:
295                 trans->disconnecting = 1;
296                 if (numMsgs == 1) 
297                     trans->disconnectOnly = 1;
298                 break;
299
300             case RESULT:
301                 osrfLogWarning( OSRF_LOG_MARK, "Unexpected RESULT message received" );
302                 break;
303
304             case STATUS:
305                 osrfLogWarning( OSRF_LOG_MARK, "Unexpected STATUS message received" );
306                 break;
307
308             default:
309                 osrfLogWarning( OSRF_LOG_MARK, "Invalid message type %d received",
310                     msg->m_type );
311                 break;
312         }
313     }
314
315     return osrfMessageSerializeBatch(msgList, numMsgs);
316 }
317
318 static int osrfHttpTranslatorCheckStatus(osrfHttpTranslator* trans, transport_message* msg) {
319     osrfMessage* omsgList[MAX_MSGS_PER_PACKET];
320     int numMsgs = osrf_message_deserialize(msg->body, omsgList, MAX_MSGS_PER_PACKET);
321     osrfLogDebug(OSRF_LOG_MARK, "parsed %d response messages", numMsgs);
322     if(numMsgs == 0) return 0;
323
324     osrfMessage* last = omsgList[numMsgs-1];
325     if(last->m_type == STATUS) {
326         if(last->status_code == OSRF_STATUS_TIMEOUT) {
327             osrfLogDebug(OSRF_LOG_MARK, "removing cached session on request timeout");
328             osrfCacheRemove(trans->thread);
329             return 0;
330         }
331         // XXX hm, check for explicit status=COMPLETE message instead??
332         if(last->status_code != OSRF_STATUS_CONTINUE)
333             trans->complete = 1;
334     }
335
336     return 1;
337 }
338
339 static void osrfHttpTranslatorInitHeaders(osrfHttpTranslator* trans, transport_message* msg) {
340     apr_table_set(trans->apreq->headers_out, OSRF_HTTP_HEADER_FROM, msg->sender);
341     apr_table_set(trans->apreq->headers_out, OSRF_HTTP_HEADER_THREAD, trans->thread);
342     if(trans->multipart) {
343         sprintf(contentTypeBuf, MULTIPART_CONTENT_TYPE, trans->delim);
344         contentTypeBuf[79] = '\0';
345         osrfLogDebug(OSRF_LOG_MARK, "content type %s : %s : %s", MULTIPART_CONTENT_TYPE,
346         trans->delim, contentTypeBuf);
347         ap_set_content_type(trans->apreq, contentTypeBuf);
348         ap_rprintf(trans->apreq, "--%s\n", trans->delim);
349     } else {
350         ap_set_content_type(trans->apreq, JSON_CONTENT_TYPE);
351     }
352 }
353
354 /**
355  * Cache the transaction with the JID of the backend process we are talking to
356  */
357 static void osrfHttpTranslatorCacheSession(osrfHttpTranslator* trans, const char* jid) {
358     jsonObject* cacheObj = jsonNewObject(NULL);
359     jsonObjectSetKey(cacheObj, "ip", jsonNewObject(trans->remoteHost));
360     jsonObjectSetKey(cacheObj, "jid", jsonNewObject(jid));
361     jsonObjectSetKey(cacheObj, "service", jsonNewObject(trans->service));
362     osrfCachePutObject((char*) trans->thread, cacheObj, CACHE_TIME);
363 }
364
365
366 /**
367  * Writes a single chunk of multipart/x-mixed-replace content
368  */
369 static void osrfHttpTranslatorWriteChunk(osrfHttpTranslator* trans, transport_message* msg) {
370     osrfLogInternal(OSRF_LOG_MARK, "sending multipart chunk %s", msg->body);
371     ap_rprintf(trans->apreq, 
372         "Content-type: %s\n\n%s\n\n", JSON_CONTENT_TYPE, msg->body);
373     //osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: Content-type: %s\n\n%s\n\n",
374     //JSON_CONTENT_TYPE, msg->body);
375     if(trans->complete) {
376         ap_rprintf(trans->apreq, "--%s--\n", trans->delim);
377         //osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: --%s--\n", trans->delim);
378     } else {
379         ap_rprintf(trans->apreq, "--%s\n", trans->delim);
380         //osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: --%s\n", trans->delim);
381     }
382     ap_rflush(trans->apreq);
383 }
384
385 static int osrfHttpTranslatorProcess(osrfHttpTranslator* trans) {
386     if(trans->body == NULL)
387         return HTTP_BAD_REQUEST;
388
389     if(!osrfHttpTranslatorSetTo(trans))
390         return HTTP_BAD_REQUEST;
391
392     char* jsonBody = osrfHttpTranslatorParseRequest(trans);
393     if (NULL == jsonBody)
394         return HTTP_BAD_REQUEST;
395
396     while(client_recv(trans->handle, 0))
397         continue; // discard any old status messages in the recv queue
398
399     // send the message to the recipient
400     transport_message* tmsg = message_init(
401         jsonBody, NULL, trans->thread, trans->recipient, NULL);
402     message_set_osrf_xid(tmsg, osrfLogGetXid());
403     client_send_message(trans->handle, tmsg);
404     message_free(tmsg); 
405     free(jsonBody);
406
407     if(trans->disconnectOnly) {
408         osrfLogDebug(OSRF_LOG_MARK, "exiting early on disconnect");
409         osrfCacheRemove(trans->thread);
410         return OK;
411     }
412
413     // process the response from the opensrf service
414     int firstWrite = 1;
415     while(!trans->complete) {
416         transport_message* msg = client_recv(trans->handle, trans->timeout);
417
418         if(trans->handle->error) {
419             osrfLogError(OSRF_LOG_MARK, "Transport error");
420             osrfCacheRemove(trans->thread);
421             return HTTP_INTERNAL_SERVER_ERROR;
422         }
423
424         if(msg == NULL)
425             return HTTP_GATEWAY_TIME_OUT;
426
427         if(msg->is_error) {
428             osrfLogError(OSRF_LOG_MARK, "XMPP message resulted in error code %d", msg->error_code);
429             osrfCacheRemove(trans->thread);
430             return HTTP_NOT_FOUND;
431         }
432
433         if(!osrfHttpTranslatorCheckStatus(trans, msg))
434             continue;
435
436         if(firstWrite) {
437             osrfHttpTranslatorInitHeaders(trans, msg);
438             if(trans->connecting)
439                 osrfHttpTranslatorCacheSession(trans, msg->sender);
440             firstWrite = 0;
441         }
442
443         if(trans->multipart) {
444             osrfHttpTranslatorWriteChunk(trans, msg);
445             if(trans->connectOnly)
446                 break;
447         } else {
448             if(!trans->messages)
449                 trans->messages = osrfNewList();
450             osrfListPush(trans->messages, msg->body);
451
452             if(trans->complete || trans->connectOnly) {
453                 growing_buffer* buf = buffer_init(128);
454                 int i;
455                 OSRF_BUFFER_ADD(buf, osrfListGetIndex(trans->messages, 0));
456                 for(i = 1; i < trans->messages->size; i++) {
457                     buffer_chomp(buf); // chomp off the closing array bracket
458                     char* body = osrfListGetIndex(trans->messages, i);
459                     char newbuf[strlen(body)];
460                     sprintf(newbuf, body+1); // chomp off the opening array bracket
461                     OSRF_BUFFER_ADD_CHAR(buf, ',');
462                     OSRF_BUFFER_ADD(buf, newbuf);
463                 }
464
465                 ap_rputs(buf->buf, trans->apreq);
466                 buffer_free(buf);
467             }
468         }
469     }
470
471     if(trans->disconnecting) // DISCONNECT within a multi-message batch
472         osrfCacheRemove(trans->thread);
473
474     return OK;
475 }
476
477 static void testConnection(request_rec* r) {
478         if(!osrfConnected || !osrfSystemGetTransportClient()) {
479         osrfLogError(OSRF_LOG_MARK, "We're not connected to OpenSRF");
480                 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "We're not connected to OpenSRF");
481                 usleep(100000); // .1 second to prevent process die/start overload
482                 exit(1);
483         }
484 }
485
486 #if 0
487 // Commented out to avoid compiler warning
488 // it's dead, Jim
489 static apr_status_t childExit(void* data) {
490     osrf_system_shutdown();
491     return OK;
492 }
493 #endif
494
495 static void childInit(apr_pool_t *p, server_rec *s) {
496         if(!osrfSystemBootstrapClientResc(configFile, configCtx, "translator")) {
497                 ap_log_error( APLOG_MARK, APLOG_ERR, 0, s, 
498                         "Unable to Bootstrap OpenSRF Client with config %s..", configFile);
499                 return;
500         }
501
502     routerName = osrfConfigGetValue(NULL, "/router_name");
503     domainName = osrfConfigGetValue(NULL, "/domain");
504     const char* servers[] = {cacheServers};
505     osrfCacheInit(servers, 1, 86400);
506         osrfConnected = 1;
507
508     // at pool destroy time (= child exit time), cleanup
509     // XXX causes us to disconnect even for clone()'d process cleanup (as in mod_cgi)
510     //apr_pool_cleanup_register(p, NULL, childExit, apr_pool_cleanup_null);
511 }
512
513 static int handler(request_rec *r) {
514     int stat = OK;
515         if(strcmp(r->handler, MODULE_NAME)) return DECLINED;
516     if(r->header_only) return stat;
517
518         r->allowed |= (AP_METHOD_BIT << M_GET);
519         r->allowed |= (AP_METHOD_BIT << M_POST);
520
521         osrfLogSetAppname("osrf_http_translator");
522         osrfAppSessionSetIngress(TRANSLATOR_INGRESS);
523     testConnection(r);
524
525         osrfLogMkXid();
526     osrfHttpTranslator* trans = osrfNewHttpTranslator(r);
527     if(trans->body) {
528         stat = osrfHttpTranslatorProcess(trans);
529         //osrfHttpTranslatorDebug(trans);
530         osrfLogInfo(OSRF_LOG_MARK, "translator resulted in status %d", stat);
531     } else {
532         osrfLogWarning(OSRF_LOG_MARK, "no message body to process");
533     }
534     osrfHttpTranslatorFree(trans);
535         return stat;
536 }
537
538
539 static void registerHooks (apr_pool_t *p) {
540         ap_hook_handler(handler, NULL, NULL, APR_HOOK_MIDDLE);
541         ap_hook_child_init(childInit, NULL, NULL, APR_HOOK_MIDDLE);
542 }
543
544
545 module AP_MODULE_DECLARE_DATA osrf_http_translator_module = {
546         STANDARD20_MODULE_STUFF,
547     NULL,
548         NULL,
549     NULL,
550         NULL,
551     osrfHttpTranslatorCmds,
552         registerHooks,
553 };