LP#1243841 - Quiet remaining Make install warnings.
[OpenSRF.git] / src / gateway / osrf_http_translator.c
1 #include <sys/time.h>
2 #include <sys/resource.h>
3 #include <unistd.h>
4 #include <strings.h>
5 #include "apachetools.h"
6 #include <opensrf/osrf_app_session.h>
7 #include <opensrf/osrf_system.h>
8 #include <opensrf/osrfConfig.h>
9 #include <opensrf/osrf_json.h>
10 #include <opensrf/osrf_cache.h>
11 #include <opensrf/string_array.h>
12
13 #define MODULE_NAME "osrf_http_translator_module"
14 #define OSRF_TRANSLATOR_CONFIG_FILE "OSRFTranslatorConfig"
15 #define OSRF_TRANSLATOR_CONFIG_CTX "OSRFTranslatorConfigContext"
16 #define OSRF_TRANSLATOR_CACHE_SERVER "OSRFTranslatorCacheServer"
17
18 #define DEFAULT_TRANSLATOR_CONFIG_CTX "gateway"
19 #define DEFAULT_TRANSLATOR_CONFIG_FILE "/openils/conf/opensrf_core.xml"
20 #define DEFAULT_TRANSLATOR_TIMEOUT 1200
21 #define DEFAULT_TRANSLATOR_CACHE_SERVERS "127.0.0.1:11211"
22
23 #define MULTIPART_CONTENT_TYPE "multipart/x-mixed-replace;boundary=\"%s\""
24 #define JSON_CONTENT_TYPE "text/plain"
25 #define MAX_MSGS_PER_PACKET 256
26 #define CACHE_TIME 300
27 #define TRANSLATOR_INGRESS "translator-v1"
28
29 #define OSRF_HTTP_HEADER_TO "X-OpenSRF-to"
30 #define OSRF_HTTP_HEADER_XID "X-OpenSRF-xid"
31 #define OSRF_HTTP_HEADER_FROM "X-OpenSRF-from"
32 #define OSRF_HTTP_HEADER_THREAD "X-OpenSRF-thread"
33 #define OSRF_HTTP_HEADER_TIMEOUT "X-OpenSRF-timeout"
34 #define OSRF_HTTP_HEADER_SERVICE "X-OpenSRF-service"
35 #define OSRF_HTTP_HEADER_MULTIPART "X-OpenSRF-multipart"
36
37
38 char* configFile = DEFAULT_TRANSLATOR_CONFIG_FILE;
39 char* configCtx = DEFAULT_TRANSLATOR_CONFIG_CTX;
40 char* cacheServers = DEFAULT_TRANSLATOR_CACHE_SERVERS;
41
42 char* routerName = NULL;
43 char* domainName = NULL;
44 int osrfConnected = 0;
45 char recipientBuf[128];
46 char contentTypeBuf[80];
47 osrfStringArray* allowedOrigins = NULL;
48
49 #if 0
50 // Commented out to avoid compiler warning
51 // for development only, writes to apache error log
52 static void _dbg(char* s, ...) {
53     VA_LIST_TO_STRING(s);
54     fprintf(stderr, "%s\n", VA_BUF);
55     fflush(stderr);
56 }
57 #endif
58
59 // Translator struct
60 typedef struct {
61     request_rec* apreq;
62     transport_client* handle;
63     osrfList* messages;
64     char* body;
65     char* delim;
66     const char* recipient;
67     const char* service;
68     const char* thread;
69     const char* remoteHost;
70     int complete;
71     int timeout;
72     int multipart;
73     int connectOnly; // there is only 1 message, a CONNECT
74     int disconnectOnly; // there is only 1 message, a DISCONNECT
75     int connecting; // there is a connect message in this batch
76     int disconnecting; // there is a connect message in this batch
77     int localXid;
78 } osrfHttpTranslator;
79
80
81 static const char* osrfHttpTranslatorGetConfigFile(cmd_parms *parms, void *config, const char *arg) {
82     configFile = (char*) arg;
83         return NULL;
84 }
85 static const char* osrfHttpTranslatorGetConfigFileCtx(cmd_parms *parms, void *config, const char *arg) {
86     configCtx = (char*) arg;
87         return NULL;
88 }
89 static const char* osrfHttpTranslatorGetCacheServer(cmd_parms *parms, void *config, const char *arg) {
90     cacheServers = (char*) arg;
91         return NULL;
92 }
93
94 /** set up the configuration handlers */
95 static const command_rec osrfHttpTranslatorCmds[] = {
96         AP_INIT_TAKE1( OSRF_TRANSLATOR_CONFIG_FILE, osrfHttpTranslatorGetConfigFile,
97                         NULL, RSRC_CONF, "osrf translator config file"),
98         AP_INIT_TAKE1( OSRF_TRANSLATOR_CONFIG_CTX, osrfHttpTranslatorGetConfigFileCtx,
99                         NULL, RSRC_CONF, "osrf translator config file context"),
100         AP_INIT_TAKE1( OSRF_TRANSLATOR_CACHE_SERVER, osrfHttpTranslatorGetCacheServer,
101                         NULL, RSRC_CONF, "osrf translator cache server"),
102     {NULL}
103 };
104
105
106 // there can only be one, so use a global static one
107 static osrfHttpTranslator globalTranslator;
108
109 /*
110  * Constructs a new translator object based on the current apache 
111  * request_rec.  Reads the request body and headers.
112  */
113 static osrfHttpTranslator* osrfNewHttpTranslator(request_rec* apreq) {
114     osrfHttpTranslator* trans = &globalTranslator;
115     trans->apreq = apreq;
116     trans->complete = 0;
117     trans->connectOnly = 0;
118     trans->disconnectOnly = 0;
119     trans->connecting = 0;
120     trans->disconnecting = 0;
121 #ifdef APACHE_MIN_24
122     trans->remoteHost = apreq->connection->client_ip;
123 #else
124     trans->remoteHost = apreq->connection->remote_ip;
125 #endif
126     trans->messages = NULL;
127
128     /* load the message body */
129     osrfStringArray* params     = apacheParseParms(apreq);
130     trans->body = apacheGetFirstParamValue(params, "osrf-msg");
131     osrfStringArrayFree(params);
132
133     /* load the request headers */
134     if (apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_XID))
135         // force our log xid to match the caller
136         osrfLogForceXid(strdup(apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_XID)));
137
138     trans->handle = osrfSystemGetTransportClient();
139     trans->recipient = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_TO);
140     trans->service = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_SERVICE);
141
142     const char* timeout = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_TIMEOUT);
143     if(timeout) 
144         trans->timeout = atoi(timeout);
145     else 
146         trans->timeout = DEFAULT_TRANSLATOR_TIMEOUT;
147
148     const char* multipart = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_MULTIPART);
149     if(multipart && !strcasecmp(multipart, "true"))
150         trans->multipart = 1;
151     else
152         trans->multipart = 0;
153
154     char buf[32];
155     snprintf(buf, sizeof(buf), "%d%ld", getpid(), time(NULL));
156     trans->delim = md5sum(buf);
157
158     /* Use thread if it has been passed in; otherwise, just use the delimiter */
159     trans->thread = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_THREAD)
160         ?  apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_THREAD)
161         : (const char*)trans->delim;
162
163     return trans;
164 }
165
166 static void osrfHttpTranslatorFree(osrfHttpTranslator* trans) {
167     if(!trans) return;
168     if(trans->body)
169         free(trans->body);
170     if(trans->delim)
171         free(trans->delim);
172     osrfListFree(trans->messages);
173 }
174
175 #if 0
176 // Commented out to avoid compiler warning
177 static void osrfHttpTranslatorDebug(osrfHttpTranslator* trans) {
178     _dbg("-----------------------------------");
179     _dbg("body = %s", trans->body);
180     _dbg("service = %s", trans->service);
181     _dbg("thread = %s", trans->thread);
182     _dbg("multipart = %d", trans->multipart);
183     _dbg("recipient = %s", trans->recipient);
184 }
185 #endif
186
187 /**
188  * Determines the correct recipient address based on the requested 
189  * service or recipient address.  
190  */
191 static int osrfHttpTranslatorSetTo(osrfHttpTranslator* trans) {
192     int stat = 0;
193     jsonObject* sessionCache = NULL;
194
195     if(trans->service) {
196         if(trans->recipient) {
197             osrfLogError(OSRF_LOG_MARK, "Specifying both SERVICE and TO are not allowed");
198
199         } else {
200             // service is specified, build a recipient address 
201             // from the router, domain, and service
202             int size = snprintf(recipientBuf, 128, "%s@%s/%s", routerName,
203                 domainName, trans->service);
204             recipientBuf[size] = '\0';
205             osrfLogDebug(OSRF_LOG_MARK, "Set recipient to %s", recipientBuf);
206             trans->recipient = recipientBuf;
207             stat = 1;
208         }
209
210     } else {
211
212         if(trans->recipient) {
213             sessionCache = osrfCacheGetObject(trans->thread);
214
215             if(sessionCache) {
216                 const char* ipAddr = jsonObjectGetString(
217                     jsonObjectGetKeyConst( sessionCache, "ip" ));
218                 const char* recipient = jsonObjectGetString(
219                     jsonObjectGetKeyConst( sessionCache, "jid" ));
220
221                 // choosing a specific recipient address requires that the recipient and 
222                 // thread be cached on the server (so drone processes cannot be hijacked)
223                 if(!strcmp(ipAddr, trans->remoteHost) && !strcmp(recipient, trans->recipient)) {
224                     osrfLogDebug( OSRF_LOG_MARK,
225                         "Found cached session from host %s and recipient %s",
226                         trans->remoteHost, trans->recipient);
227                     stat = 1;
228                     trans->service = apr_pstrdup(
229                         trans->apreq->pool, jsonObjectGetString(
230                             jsonObjectGetKeyConst( sessionCache, "service" )));
231
232                 } else {
233                     osrfLogError(OSRF_LOG_MARK, 
234                         "Session cache for thread %s does not match request", trans->thread);
235                 }
236             }  else {
237                 osrfLogError(OSRF_LOG_MARK, 
238                     "attempt to send directly to %s without a session", trans->recipient);
239             }
240         } else {
241             osrfLogError(OSRF_LOG_MARK, "No SERVICE or RECIPIENT defined");
242         } 
243     }
244
245     jsonObjectFree(sessionCache);
246     return stat;
247 }
248
249 /**
250  * Parses the request body, logs any REQUEST messages to the activity log, 
251  * stamps the translator ingress on each message, and returns the updated 
252  * messages as a JSON string.
253  */
254 static char* osrfHttpTranslatorParseRequest(osrfHttpTranslator* trans) {
255     osrfMessage* msg;
256     osrfMessage* msgList[MAX_MSGS_PER_PACKET];
257     int numMsgs = osrf_message_deserialize(trans->body, msgList, MAX_MSGS_PER_PACKET);
258     osrfLogDebug(OSRF_LOG_MARK, "parsed %d opensrf messages in this packet", numMsgs);
259
260     if(numMsgs == 0)
261         return NULL;
262
263     // log request messages to the activity log
264     int i;
265     for(i = 0; i < numMsgs; i++) {
266         msg = msgList[i];
267         osrfMessageSetIngress(msg, TRANSLATOR_INGRESS);
268
269         switch(msg->m_type) {
270
271             case REQUEST: {
272                 const jsonObject* params = msg->_params;
273                 growing_buffer* act = buffer_init(128); 
274                 char* method = msg->method_name;
275                 buffer_fadd(act, "[%s] [%s] %s %s", trans->remoteHost, "",
276                     trans->service, method);
277
278                 const jsonObject* obj = NULL;
279                 int i = 0;
280                 const char* str;
281                 int redactParams = 0;
282                 while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
283                     //osrfLogInternal(OSRF_LOG_MARK, "Checking for log protection [%s]", str);
284                     if(!strncmp(method, str, strlen(str))) {
285                         redactParams = 1;
286                         break;
287                     }
288                 }
289                 if(redactParams) {
290                     OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
291                 } else {
292                     i = 0;
293                     while((obj = jsonObjectGetIndex(params, i++))) {
294                         str = jsonObjectToJSON(obj);
295                         if( i == 1 )
296                             OSRF_BUFFER_ADD(act, " ");
297                         else
298                             OSRF_BUFFER_ADD(act, ", ");
299                         OSRF_BUFFER_ADD(act, str);
300                         free((void *)str);
301                     }
302                 }
303                 osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
304                 buffer_free(act);
305                 break;
306             }
307
308             case CONNECT:
309                 trans->connecting = 1;
310                 if (numMsgs == 1) 
311                     trans->connectOnly = 1;
312                 break;
313
314             case DISCONNECT:
315                 trans->disconnecting = 1;
316                 if (numMsgs == 1) 
317                     trans->disconnectOnly = 1;
318                 break;
319
320             case RESULT:
321                 osrfLogWarning( OSRF_LOG_MARK, "Unexpected RESULT message received" );
322                 break;
323
324             case STATUS:
325                 osrfLogWarning( OSRF_LOG_MARK, "Unexpected STATUS message received" );
326                 break;
327
328             default:
329                 osrfLogWarning( OSRF_LOG_MARK, "Invalid message type %d received",
330                     msg->m_type );
331                 break;
332         }
333     }
334
335     char* jsonString = osrfMessageSerializeBatch(msgList, numMsgs);
336     for(i = 0; i < numMsgs; i++) {
337         osrfMessageFree(msgList[i]);
338     }
339     return jsonString;
340 }
341
342 static int osrfHttpTranslatorCheckStatus(osrfHttpTranslator* trans, transport_message* msg) {
343     osrfMessage* omsgList[MAX_MSGS_PER_PACKET];
344     int numMsgs = osrf_message_deserialize(msg->body, omsgList, MAX_MSGS_PER_PACKET);
345     osrfLogDebug(OSRF_LOG_MARK, "parsed %d response messages", numMsgs);
346     if(numMsgs == 0) return 0;
347
348     osrfMessage* last = omsgList[numMsgs-1];
349     if(last->m_type == STATUS) {
350         if(last->status_code == OSRF_STATUS_TIMEOUT) {
351             osrfLogDebug(OSRF_LOG_MARK, "removing cached session on request timeout");
352             osrfCacheRemove(trans->thread);
353             return 0;
354         }
355         // XXX hm, check for explicit status=COMPLETE message instead??
356         if(last->status_code != OSRF_STATUS_CONTINUE)
357             trans->complete = 1;
358     }
359
360     return 1;
361 }
362
363 static void osrfHttpTranslatorInitHeaders(osrfHttpTranslator* trans, transport_message* msg) {
364     apr_table_set(trans->apreq->headers_out, OSRF_HTTP_HEADER_FROM, msg->sender);
365     apr_table_set(trans->apreq->headers_out, OSRF_HTTP_HEADER_THREAD, trans->thread);
366     if(trans->multipart) {
367         sprintf(contentTypeBuf, MULTIPART_CONTENT_TYPE, trans->delim);
368         contentTypeBuf[79] = '\0';
369         osrfLogDebug(OSRF_LOG_MARK, "content type %s : %s : %s", MULTIPART_CONTENT_TYPE,
370         trans->delim, contentTypeBuf);
371         ap_set_content_type(trans->apreq, contentTypeBuf);
372         ap_rprintf(trans->apreq, "--%s\n", trans->delim);
373     } else {
374         ap_set_content_type(trans->apreq, JSON_CONTENT_TYPE);
375     }
376 }
377
378 /**
379  * Cache the transaction with the JID of the backend process we are talking to
380  */
381 static void osrfHttpTranslatorCacheSession(osrfHttpTranslator* trans, const char* jid) {
382     jsonObject* cacheObj = jsonNewObject(NULL);
383     jsonObjectSetKey(cacheObj, "ip", jsonNewObject(trans->remoteHost));
384     jsonObjectSetKey(cacheObj, "jid", jsonNewObject(jid));
385     jsonObjectSetKey(cacheObj, "service", jsonNewObject(trans->service));
386     osrfCachePutObject(trans->thread, cacheObj, CACHE_TIME);
387 }
388
389
390 /**
391  * Writes a single chunk of multipart/x-mixed-replace content
392  */
393 static void osrfHttpTranslatorWriteChunk(osrfHttpTranslator* trans, transport_message* msg) {
394     osrfLogInternal(OSRF_LOG_MARK, "sending multipart chunk %s", msg->body);
395     ap_rprintf(trans->apreq, 
396         "Content-type: %s\n\n%s\n\n", JSON_CONTENT_TYPE, msg->body);
397     //osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: Content-type: %s\n\n%s\n\n",
398     //JSON_CONTENT_TYPE, msg->body);
399     if(trans->complete) {
400         ap_rprintf(trans->apreq, "--%s--\n", trans->delim);
401         //osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: --%s--\n", trans->delim);
402     } else {
403         ap_rprintf(trans->apreq, "--%s\n", trans->delim);
404         //osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: --%s\n", trans->delim);
405     }
406     ap_rflush(trans->apreq);
407 }
408
409 static int osrfHttpTranslatorProcess(osrfHttpTranslator* trans) {
410     if(trans->body == NULL)
411         return HTTP_BAD_REQUEST;
412
413     if(!osrfHttpTranslatorSetTo(trans))
414         return HTTP_BAD_REQUEST;
415
416     char* jsonBody = osrfHttpTranslatorParseRequest(trans);
417     if (NULL == jsonBody)
418         return HTTP_BAD_REQUEST;
419
420     while(client_recv(trans->handle, 0))
421         continue; // discard any old status messages in the recv queue
422
423     // send the message to the recipient
424     transport_message* tmsg = message_init(
425         jsonBody, NULL, trans->thread, trans->recipient, NULL);
426     message_set_osrf_xid(tmsg, osrfLogGetXid());
427     client_send_message(trans->handle, tmsg);
428     message_free(tmsg); 
429     free(jsonBody);
430
431     if(trans->disconnectOnly) {
432         osrfLogDebug(OSRF_LOG_MARK, "exiting early on disconnect");
433         osrfCacheRemove(trans->thread);
434         return OK;
435     }
436
437     // process the response from the opensrf service
438     int firstWrite = 1;
439     while(!trans->complete) {
440         transport_message* msg = client_recv(trans->handle, trans->timeout);
441
442         if(trans->handle->error) {
443             osrfLogError(OSRF_LOG_MARK, "Transport error");
444             osrfCacheRemove(trans->thread);
445             return HTTP_INTERNAL_SERVER_ERROR;
446         }
447
448         if(msg == NULL)
449             return HTTP_GATEWAY_TIME_OUT;
450
451         if(msg->is_error) {
452             osrfLogError(OSRF_LOG_MARK, "XMPP message resulted in error code %d", msg->error_code);
453             osrfCacheRemove(trans->thread);
454             return HTTP_NOT_FOUND;
455         }
456
457         if(!osrfHttpTranslatorCheckStatus(trans, msg))
458             continue;
459
460         if(firstWrite) {
461             osrfHttpTranslatorInitHeaders(trans, msg);
462             if(trans->connecting)
463                 osrfHttpTranslatorCacheSession(trans, msg->sender);
464             firstWrite = 0;
465         }
466
467         if(trans->multipart) {
468             osrfHttpTranslatorWriteChunk(trans, msg);
469             if(trans->connectOnly)
470                 break;
471         } else {
472             if(!trans->messages)
473                 trans->messages = osrfNewList();
474             osrfListPush(trans->messages, msg->body);
475
476             if(trans->complete || trans->connectOnly) {
477                 growing_buffer* buf = buffer_init(128);
478                 unsigned int i;
479                 OSRF_BUFFER_ADD(buf, osrfListGetIndex(trans->messages, 0));
480                 for(i = 1; i < trans->messages->size; i++) {
481                     buffer_chomp(buf); // chomp off the closing array bracket
482                     char* body = osrfListGetIndex(trans->messages, i);
483                     char newbuf[strlen(body)];
484                     sprintf(newbuf, "%s", body+1); // chomp off the opening array bracket
485                     OSRF_BUFFER_ADD_CHAR(buf, ',');
486                     OSRF_BUFFER_ADD(buf, newbuf);
487                 }
488
489                 ap_rputs(buf->buf, trans->apreq);
490                 buffer_free(buf);
491             }
492         }
493     }
494
495     if(trans->disconnecting) // DISCONNECT within a multi-message batch
496         osrfCacheRemove(trans->thread);
497
498     return OK;
499 }
500
501 static void testConnection(request_rec* r) {
502         if(!osrfConnected || !osrfSystemGetTransportClient()) {
503         osrfLogError(OSRF_LOG_MARK, "We're not connected to OpenSRF");
504                 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "We're not connected to OpenSRF");
505                 usleep(100000); // .1 second to prevent process die/start overload
506                 exit(1);
507         }
508 }
509
510 #if 0
511 // Commented out to avoid compiler warning
512 // it's dead, Jim
513 static apr_status_t childExit(void* data) {
514     osrf_system_shutdown();
515     return OK;
516 }
517 #endif
518
519 static void childInit(apr_pool_t *p, server_rec *s) {
520         if(!osrfSystemBootstrapClientResc(configFile, configCtx, "translator")) {
521                 ap_log_error( APLOG_MARK, APLOG_ERR, 0, s, 
522                         "Unable to Bootstrap OpenSRF Client with config %s..", configFile);
523                 return;
524         }
525
526     routerName = osrfConfigGetValue(NULL, "/router_name");
527     domainName = osrfConfigGetValue(NULL, "/domain");
528     const char* servers[] = {cacheServers};
529     osrfCacheInit(servers, 1, 86400);
530         osrfConnected = 1;
531
532     allowedOrigins = osrfNewStringArray(4);
533     osrfConfigGetValueList(NULL, allowedOrigins, "/cross_origin/origin");
534
535     // at pool destroy time (= child exit time), cleanup
536     // XXX causes us to disconnect even for clone()'d process cleanup (as in mod_cgi)
537     //apr_pool_cleanup_register(p, NULL, childExit, apr_pool_cleanup_null);
538 }
539
540 static int handler(request_rec *r) {
541     int stat = OK;
542         if(strcmp(r->handler, MODULE_NAME)) return DECLINED;
543     if(r->header_only) return stat;
544
545         r->allowed |= (AP_METHOD_BIT << M_GET);
546         r->allowed |= (AP_METHOD_BIT << M_POST);
547
548         osrfLogSetAppname("osrf_http_translator");
549         osrfAppSessionSetIngress(TRANSLATOR_INGRESS);
550     testConnection(r);
551     crossOriginHeaders(r, allowedOrigins);
552
553         osrfLogMkXid();
554     osrfHttpTranslator* trans = osrfNewHttpTranslator(r);
555     if(trans->body) {
556         stat = osrfHttpTranslatorProcess(trans);
557         //osrfHttpTranslatorDebug(trans);
558         osrfLogInfo(OSRF_LOG_MARK, "translator resulted in status %d", stat);
559     } else {
560         osrfLogWarning(OSRF_LOG_MARK, "no message body to process");
561     }
562     osrfHttpTranslatorFree(trans);
563         return stat;
564 }
565
566
567 static void registerHooks (apr_pool_t *p) {
568         ap_hook_handler(handler, NULL, NULL, APR_HOOK_MIDDLE);
569         ap_hook_child_init(childInit, NULL, NULL, APR_HOOK_MIDDLE);
570 }
571
572
573 module AP_MODULE_DECLARE_DATA osrf_http_translator_module = {
574         STANDARD20_MODULE_STUFF,
575     NULL,
576         NULL,
577     NULL,
578         NULL,
579     osrfHttpTranslatorCmds,
580         registerHooks,
581 };