f829fa0915b811fdaa8c94a4009388586c7944f4
[OpenSRF.git] / src / gateway / osrf_http_translator.c
1 #include <sys/time.h>
2 #include <sys/resource.h>
3 #include <unistd.h>
4 #include <strings.h>
5 #include "apachetools.h"
6 #include <opensrf/osrf_app_session.h>
7 #include <opensrf/osrf_system.h>
8 #include <opensrf/osrfConfig.h>
9 #include <opensrf/osrf_json.h>
10 #include <opensrf/osrf_cache.h>
11 #include <opensrf/string_array.h>
12
13 #define MODULE_NAME "osrf_http_translator_module"
14 #define OSRF_TRANSLATOR_CONFIG_FILE "OSRFTranslatorConfig"
15 #define OSRF_TRANSLATOR_CONFIG_CTX "OSRFTranslatorConfigContext"
16 #define OSRF_TRANSLATOR_CACHE_SERVER "OSRFTranslatorCacheServer"
17
18 #define DEFAULT_TRANSLATOR_CONFIG_CTX "gateway"
19 #define DEFAULT_TRANSLATOR_CONFIG_FILE "/openils/conf/opensrf_core.xml"
20 #define DEFAULT_TRANSLATOR_TIMEOUT 1200
21 #define DEFAULT_TRANSLATOR_CACHE_SERVERS "127.0.0.1:11211"
22
23 #define MULTIPART_CONTENT_TYPE "multipart/x-mixed-replace;boundary=\"%s\""
24 #define JSON_CONTENT_TYPE "text/plain"
25 #define MAX_MSGS_PER_PACKET 256
26 #define CACHE_TIME 300
27 #define TRANSLATOR_INGRESS "translator-v1"
28
29 #define OSRF_HTTP_HEADER_TO "X-OpenSRF-to"
30 #define OSRF_HTTP_HEADER_XID "X-OpenSRF-xid"
31 #define OSRF_HTTP_HEADER_FROM "X-OpenSRF-from"
32 #define OSRF_HTTP_HEADER_THREAD "X-OpenSRF-thread"
33 #define OSRF_HTTP_HEADER_TIMEOUT "X-OpenSRF-timeout"
34 #define OSRF_HTTP_HEADER_SERVICE "X-OpenSRF-service"
35 #define OSRF_HTTP_HEADER_MULTIPART "X-OpenSRF-multipart"
36
37
38 char* configFile = DEFAULT_TRANSLATOR_CONFIG_FILE;
39 char* configCtx = DEFAULT_TRANSLATOR_CONFIG_CTX;
40 char* cacheServers = DEFAULT_TRANSLATOR_CACHE_SERVERS;
41
42 char* routerName = NULL;
43 char* domainName = NULL;
44 int osrfConnected = 0;
45 char recipientBuf[128];
46 char contentTypeBuf[80];
47
48 #if 0
49 // Commented out to avoid compiler warning
50 // for development only, writes to apache error log
51 static void _dbg(char* s, ...) {
52     VA_LIST_TO_STRING(s);
53     fprintf(stderr, "%s\n", VA_BUF);
54     fflush(stderr);
55 }
56 #endif
57
58 // Translator struct
59 typedef struct {
60     request_rec* apreq;
61     transport_client* handle;
62     osrfList* messages;
63     char* body;
64     char* delim;
65     const char* recipient;
66     const char* service;
67     const char* thread;
68     const char* remoteHost;
69     int complete;
70     int timeout;
71     int multipart;
72     int connectOnly; // there is only 1 message, a CONNECT
73     int disconnectOnly; // there is only 1 message, a DISCONNECT
74     int connecting; // there is a connect message in this batch
75     int disconnecting; // there is a connect message in this batch
76     int localXid;
77 } osrfHttpTranslator;
78
79
80 static const char* osrfHttpTranslatorGetConfigFile(cmd_parms *parms, void *config, const char *arg) {
81     configFile = (char*) arg;
82         return NULL;
83 }
84 static const char* osrfHttpTranslatorGetConfigFileCtx(cmd_parms *parms, void *config, const char *arg) {
85     configCtx = (char*) arg;
86         return NULL;
87 }
88 static const char* osrfHttpTranslatorGetCacheServer(cmd_parms *parms, void *config, const char *arg) {
89     cacheServers = (char*) arg;
90         return NULL;
91 }
92
93 /** set up the configuration handlers */
94 static const command_rec osrfHttpTranslatorCmds[] = {
95         AP_INIT_TAKE1( OSRF_TRANSLATOR_CONFIG_FILE, osrfHttpTranslatorGetConfigFile,
96                         NULL, RSRC_CONF, "osrf translator config file"),
97         AP_INIT_TAKE1( OSRF_TRANSLATOR_CONFIG_CTX, osrfHttpTranslatorGetConfigFileCtx,
98                         NULL, RSRC_CONF, "osrf translator config file context"),
99         AP_INIT_TAKE1( OSRF_TRANSLATOR_CACHE_SERVER, osrfHttpTranslatorGetCacheServer,
100                         NULL, RSRC_CONF, "osrf translator cache server"),
101     {NULL}
102 };
103
104
105 // there can only be one, so use a global static one
106 static osrfHttpTranslator globalTranslator;
107
108 /*
109  * Constructs a new translator object based on the current apache 
110  * request_rec.  Reads the request body and headers.
111  */
112 static osrfHttpTranslator* osrfNewHttpTranslator(request_rec* apreq) {
113     osrfHttpTranslator* trans = &globalTranslator;
114     trans->apreq = apreq;
115     trans->complete = 0;
116     trans->connectOnly = 0;
117     trans->disconnectOnly = 0;
118     trans->connecting = 0;
119     trans->disconnecting = 0;
120 #ifdef APACHE_MIN_24
121     trans->remoteHost = apreq->connection->client_ip;
122 #else
123     trans->remoteHost = apreq->connection->remote_ip;
124 #endif
125     trans->messages = NULL;
126
127     /* load the message body */
128     osrfStringArray* params     = apacheParseParms(apreq);
129     trans->body = apacheGetFirstParamValue(params, "osrf-msg");
130     osrfStringArrayFree(params);
131
132     /* load the request headers */
133     if (apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_XID))
134         // force our log xid to match the caller
135         osrfLogForceXid(strdup(apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_XID)));
136
137     trans->handle = osrfSystemGetTransportClient();
138     trans->recipient = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_TO);
139     trans->service = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_SERVICE);
140
141     const char* timeout = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_TIMEOUT);
142     if(timeout) 
143         trans->timeout = atoi(timeout);
144     else 
145         trans->timeout = DEFAULT_TRANSLATOR_TIMEOUT;
146
147     const char* multipart = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_MULTIPART);
148     if(multipart && !strcasecmp(multipart, "true"))
149         trans->multipart = 1;
150     else
151         trans->multipart = 0;
152
153     char buf[32];
154     snprintf(buf, sizeof(buf), "%d%ld", getpid(), time(NULL));
155     trans->delim = md5sum(buf);
156
157     /* Use thread if it has been passed in; otherwise, just use the delimiter */
158     trans->thread = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_THREAD)
159         ?  apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_THREAD)
160         : (const char*)trans->delim;
161
162     return trans;
163 }
164
165 static void osrfHttpTranslatorFree(osrfHttpTranslator* trans) {
166     if(!trans) return;
167     if(trans->body)
168         free(trans->body);
169     if(trans->delim)
170         free(trans->delim);
171     osrfListFree(trans->messages);
172 }
173
174 #if 0
175 // Commented out to avoid compiler warning
176 static void osrfHttpTranslatorDebug(osrfHttpTranslator* trans) {
177     _dbg("-----------------------------------");
178     _dbg("body = %s", trans->body);
179     _dbg("service = %s", trans->service);
180     _dbg("thread = %s", trans->thread);
181     _dbg("multipart = %d", trans->multipart);
182     _dbg("recipient = %s", trans->recipient);
183 }
184 #endif
185
186 /**
187  * Determines the correct recipient address based on the requested 
188  * service or recipient address.  
189  */
190 static int osrfHttpTranslatorSetTo(osrfHttpTranslator* trans) {
191     int stat = 0;
192     jsonObject* sessionCache = NULL;
193
194     if(trans->service) {
195         if(trans->recipient) {
196             osrfLogError(OSRF_LOG_MARK, "Specifying both SERVICE and TO are not allowed");
197
198         } else {
199             // service is specified, build a recipient address 
200             // from the router, domain, and service
201             int size = snprintf(recipientBuf, 128, "%s@%s/%s", routerName,
202                 domainName, trans->service);
203             recipientBuf[size] = '\0';
204             osrfLogDebug(OSRF_LOG_MARK, "Set recipient to %s", recipientBuf);
205             trans->recipient = recipientBuf;
206             stat = 1;
207         }
208
209     } else {
210
211         if(trans->recipient) {
212             sessionCache = osrfCacheGetObject(trans->thread);
213
214             if(sessionCache) {
215                 const char* ipAddr = jsonObjectGetString(
216                     jsonObjectGetKeyConst( sessionCache, "ip" ));
217                 const char* recipient = jsonObjectGetString(
218                     jsonObjectGetKeyConst( sessionCache, "jid" ));
219
220                 // choosing a specific recipient address requires that the recipient and 
221                 // thread be cached on the server (so drone processes cannot be hijacked)
222                 if(!strcmp(ipAddr, trans->remoteHost) && !strcmp(recipient, trans->recipient)) {
223                     osrfLogDebug( OSRF_LOG_MARK,
224                         "Found cached session from host %s and recipient %s",
225                         trans->remoteHost, trans->recipient);
226                     stat = 1;
227                     trans->service = apr_pstrdup(
228                         trans->apreq->pool, jsonObjectGetString(
229                             jsonObjectGetKeyConst( sessionCache, "service" )));
230
231                 } else {
232                     osrfLogError(OSRF_LOG_MARK, 
233                         "Session cache for thread %s does not match request", trans->thread);
234                 }
235             }  else {
236                 osrfLogError(OSRF_LOG_MARK, 
237                     "attempt to send directly to %s without a session", trans->recipient);
238             }
239         } else {
240             osrfLogError(OSRF_LOG_MARK, "No SERVICE or RECIPIENT defined");
241         } 
242     }
243
244     jsonObjectFree(sessionCache);
245     return stat;
246 }
247
248 /**
249  * Parses the request body, logs any REQUEST messages to the activity log, 
250  * stamps the translator ingress on each message, and returns the updated 
251  * messages as a JSON string.
252  */
253 static char* osrfHttpTranslatorParseRequest(osrfHttpTranslator* trans) {
254     osrfMessage* msg;
255     osrfMessage* msgList[MAX_MSGS_PER_PACKET];
256     int numMsgs = osrf_message_deserialize(trans->body, msgList, MAX_MSGS_PER_PACKET);
257     osrfLogDebug(OSRF_LOG_MARK, "parsed %d opensrf messages in this packet", numMsgs);
258
259     if(numMsgs == 0)
260         return NULL;
261
262     // log request messages to the activity log
263     int i;
264     for(i = 0; i < numMsgs; i++) {
265         msg = msgList[i];
266         osrfMessageSetIngress(msg, TRANSLATOR_INGRESS);
267
268         switch(msg->m_type) {
269
270             case REQUEST: {
271                 const jsonObject* params = msg->_params;
272                 growing_buffer* act = buffer_init(128); 
273                 char* method = msg->method_name;
274                 buffer_fadd(act, "[%s] [%s] %s %s", trans->remoteHost, "",
275                     trans->service, method);
276
277                 const jsonObject* obj = NULL;
278                 int i = 0;
279                 const char* str;
280                 int redactParams = 0;
281                 while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
282                     //osrfLogInternal(OSRF_LOG_MARK, "Checking for log protection [%s]", str);
283                     if(!strncmp(method, str, strlen(str))) {
284                         redactParams = 1;
285                         break;
286                     }
287                 }
288                 if(redactParams) {
289                     OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
290                 } else {
291                     i = 0;
292                     while((obj = jsonObjectGetIndex(params, i++))) {
293                         str = jsonObjectToJSON(obj);
294                         if( i == 1 )
295                             OSRF_BUFFER_ADD(act, " ");
296                         else
297                             OSRF_BUFFER_ADD(act, ", ");
298                         OSRF_BUFFER_ADD(act, str);
299                         free(str);
300                     }
301                 }
302                 osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
303                 buffer_free(act);
304                 break;
305             }
306
307             case CONNECT:
308                 trans->connecting = 1;
309                 if (numMsgs == 1) 
310                     trans->connectOnly = 1;
311                 break;
312
313             case DISCONNECT:
314                 trans->disconnecting = 1;
315                 if (numMsgs == 1) 
316                     trans->disconnectOnly = 1;
317                 break;
318
319             case RESULT:
320                 osrfLogWarning( OSRF_LOG_MARK, "Unexpected RESULT message received" );
321                 break;
322
323             case STATUS:
324                 osrfLogWarning( OSRF_LOG_MARK, "Unexpected STATUS message received" );
325                 break;
326
327             default:
328                 osrfLogWarning( OSRF_LOG_MARK, "Invalid message type %d received",
329                     msg->m_type );
330                 break;
331         }
332     }
333
334     char* jsonString = osrfMessageSerializeBatch(msgList, numMsgs);
335     for(i = 0; i < numMsgs; i++) {
336         osrfMessageFree(msgList[i]);
337     }
338     return jsonString;
339 }
340
341 static int osrfHttpTranslatorCheckStatus(osrfHttpTranslator* trans, transport_message* msg) {
342     osrfMessage* omsgList[MAX_MSGS_PER_PACKET];
343     int numMsgs = osrf_message_deserialize(msg->body, omsgList, MAX_MSGS_PER_PACKET);
344     osrfLogDebug(OSRF_LOG_MARK, "parsed %d response messages", numMsgs);
345     if(numMsgs == 0) return 0;
346
347     osrfMessage* last = omsgList[numMsgs-1];
348     if(last->m_type == STATUS) {
349         if(last->status_code == OSRF_STATUS_TIMEOUT) {
350             osrfLogDebug(OSRF_LOG_MARK, "removing cached session on request timeout");
351             osrfCacheRemove(trans->thread);
352             return 0;
353         }
354         // XXX hm, check for explicit status=COMPLETE message instead??
355         if(last->status_code != OSRF_STATUS_CONTINUE)
356             trans->complete = 1;
357     }
358
359     return 1;
360 }
361
362 static void osrfHttpTranslatorInitHeaders(osrfHttpTranslator* trans, transport_message* msg) {
363     apr_table_set(trans->apreq->headers_out, OSRF_HTTP_HEADER_FROM, msg->sender);
364     apr_table_set(trans->apreq->headers_out, OSRF_HTTP_HEADER_THREAD, trans->thread);
365     if(trans->multipart) {
366         sprintf(contentTypeBuf, MULTIPART_CONTENT_TYPE, trans->delim);
367         contentTypeBuf[79] = '\0';
368         osrfLogDebug(OSRF_LOG_MARK, "content type %s : %s : %s", MULTIPART_CONTENT_TYPE,
369         trans->delim, contentTypeBuf);
370         ap_set_content_type(trans->apreq, contentTypeBuf);
371         ap_rprintf(trans->apreq, "--%s\n", trans->delim);
372     } else {
373         ap_set_content_type(trans->apreq, JSON_CONTENT_TYPE);
374     }
375 }
376
377 /**
378  * Cache the transaction with the JID of the backend process we are talking to
379  */
380 static void osrfHttpTranslatorCacheSession(osrfHttpTranslator* trans, const char* jid) {
381     jsonObject* cacheObj = jsonNewObject(NULL);
382     jsonObjectSetKey(cacheObj, "ip", jsonNewObject(trans->remoteHost));
383     jsonObjectSetKey(cacheObj, "jid", jsonNewObject(jid));
384     jsonObjectSetKey(cacheObj, "service", jsonNewObject(trans->service));
385     osrfCachePutObject((char*) trans->thread, cacheObj, CACHE_TIME);
386 }
387
388
389 /**
390  * Writes a single chunk of multipart/x-mixed-replace content
391  */
392 static void osrfHttpTranslatorWriteChunk(osrfHttpTranslator* trans, transport_message* msg) {
393     osrfLogInternal(OSRF_LOG_MARK, "sending multipart chunk %s", msg->body);
394     ap_rprintf(trans->apreq, 
395         "Content-type: %s\n\n%s\n\n", JSON_CONTENT_TYPE, msg->body);
396     //osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: Content-type: %s\n\n%s\n\n",
397     //JSON_CONTENT_TYPE, msg->body);
398     if(trans->complete) {
399         ap_rprintf(trans->apreq, "--%s--\n", trans->delim);
400         //osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: --%s--\n", trans->delim);
401     } else {
402         ap_rprintf(trans->apreq, "--%s\n", trans->delim);
403         //osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: --%s\n", trans->delim);
404     }
405     ap_rflush(trans->apreq);
406 }
407
408 static int osrfHttpTranslatorProcess(osrfHttpTranslator* trans) {
409     if(trans->body == NULL)
410         return HTTP_BAD_REQUEST;
411
412     if(!osrfHttpTranslatorSetTo(trans))
413         return HTTP_BAD_REQUEST;
414
415     char* jsonBody = osrfHttpTranslatorParseRequest(trans);
416     if (NULL == jsonBody)
417         return HTTP_BAD_REQUEST;
418
419     while(client_recv(trans->handle, 0))
420         continue; // discard any old status messages in the recv queue
421
422     // send the message to the recipient
423     transport_message* tmsg = message_init(
424         jsonBody, NULL, trans->thread, trans->recipient, NULL);
425     message_set_osrf_xid(tmsg, osrfLogGetXid());
426     client_send_message(trans->handle, tmsg);
427     message_free(tmsg); 
428     free(jsonBody);
429
430     if(trans->disconnectOnly) {
431         osrfLogDebug(OSRF_LOG_MARK, "exiting early on disconnect");
432         osrfCacheRemove(trans->thread);
433         return OK;
434     }
435
436     // process the response from the opensrf service
437     int firstWrite = 1;
438     while(!trans->complete) {
439         transport_message* msg = client_recv(trans->handle, trans->timeout);
440
441         if(trans->handle->error) {
442             osrfLogError(OSRF_LOG_MARK, "Transport error");
443             osrfCacheRemove(trans->thread);
444             return HTTP_INTERNAL_SERVER_ERROR;
445         }
446
447         if(msg == NULL)
448             return HTTP_GATEWAY_TIME_OUT;
449
450         if(msg->is_error) {
451             osrfLogError(OSRF_LOG_MARK, "XMPP message resulted in error code %d", msg->error_code);
452             osrfCacheRemove(trans->thread);
453             return HTTP_NOT_FOUND;
454         }
455
456         if(!osrfHttpTranslatorCheckStatus(trans, msg))
457             continue;
458
459         if(firstWrite) {
460             osrfHttpTranslatorInitHeaders(trans, msg);
461             if(trans->connecting)
462                 osrfHttpTranslatorCacheSession(trans, msg->sender);
463             firstWrite = 0;
464         }
465
466         if(trans->multipart) {
467             osrfHttpTranslatorWriteChunk(trans, msg);
468             if(trans->connectOnly)
469                 break;
470         } else {
471             if(!trans->messages)
472                 trans->messages = osrfNewList();
473             osrfListPush(trans->messages, msg->body);
474
475             if(trans->complete || trans->connectOnly) {
476                 growing_buffer* buf = buffer_init(128);
477                 unsigned int i;
478                 OSRF_BUFFER_ADD(buf, osrfListGetIndex(trans->messages, 0));
479                 for(i = 1; i < trans->messages->size; i++) {
480                     buffer_chomp(buf); // chomp off the closing array bracket
481                     char* body = osrfListGetIndex(trans->messages, i);
482                     char newbuf[strlen(body)];
483                     sprintf(newbuf, body+1); // chomp off the opening array bracket
484                     OSRF_BUFFER_ADD_CHAR(buf, ',');
485                     OSRF_BUFFER_ADD(buf, newbuf);
486                 }
487
488                 ap_rputs(buf->buf, trans->apreq);
489                 buffer_free(buf);
490             }
491         }
492     }
493
494     if(trans->disconnecting) // DISCONNECT within a multi-message batch
495         osrfCacheRemove(trans->thread);
496
497     return OK;
498 }
499
500 static void testConnection(request_rec* r) {
501         if(!osrfConnected || !osrfSystemGetTransportClient()) {
502         osrfLogError(OSRF_LOG_MARK, "We're not connected to OpenSRF");
503                 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "We're not connected to OpenSRF");
504                 usleep(100000); // .1 second to prevent process die/start overload
505                 exit(1);
506         }
507 }
508
509 #if 0
510 // Commented out to avoid compiler warning
511 // it's dead, Jim
512 static apr_status_t childExit(void* data) {
513     osrf_system_shutdown();
514     return OK;
515 }
516 #endif
517
518 static void childInit(apr_pool_t *p, server_rec *s) {
519         if(!osrfSystemBootstrapClientResc(configFile, configCtx, "translator")) {
520                 ap_log_error( APLOG_MARK, APLOG_ERR, 0, s, 
521                         "Unable to Bootstrap OpenSRF Client with config %s..", configFile);
522                 return;
523         }
524
525     routerName = osrfConfigGetValue(NULL, "/router_name");
526     domainName = osrfConfigGetValue(NULL, "/domain");
527     const char* servers[] = {cacheServers};
528     osrfCacheInit(servers, 1, 86400);
529         osrfConnected = 1;
530
531     // at pool destroy time (= child exit time), cleanup
532     // XXX causes us to disconnect even for clone()'d process cleanup (as in mod_cgi)
533     //apr_pool_cleanup_register(p, NULL, childExit, apr_pool_cleanup_null);
534 }
535
536 static int handler(request_rec *r) {
537     int stat = OK;
538         if(strcmp(r->handler, MODULE_NAME)) return DECLINED;
539     if(r->header_only) return stat;
540
541         r->allowed |= (AP_METHOD_BIT << M_GET);
542         r->allowed |= (AP_METHOD_BIT << M_POST);
543
544         osrfLogSetAppname("osrf_http_translator");
545         osrfAppSessionSetIngress(TRANSLATOR_INGRESS);
546     testConnection(r);
547
548         osrfLogMkXid();
549     osrfHttpTranslator* trans = osrfNewHttpTranslator(r);
550     if(trans->body) {
551         stat = osrfHttpTranslatorProcess(trans);
552         //osrfHttpTranslatorDebug(trans);
553         osrfLogInfo(OSRF_LOG_MARK, "translator resulted in status %d", stat);
554     } else {
555         osrfLogWarning(OSRF_LOG_MARK, "no message body to process");
556     }
557     osrfHttpTranslatorFree(trans);
558         return stat;
559 }
560
561
562 static void registerHooks (apr_pool_t *p) {
563         ap_hook_handler(handler, NULL, NULL, APR_HOOK_MIDDLE);
564         ap_hook_child_init(childInit, NULL, NULL, APR_HOOK_MIDDLE);
565 }
566
567
568 module AP_MODULE_DECLARE_DATA osrf_http_translator_module = {
569         STANDARD20_MODULE_STUFF,
570     NULL,
571         NULL,
572     NULL,
573         NULL,
574     osrfHttpTranslatorCmds,
575         registerHooks,
576 };