2 #include <sys/resource.h>
5 #include "apachetools.h"
6 #include <opensrf/osrf_app_session.h>
7 #include <opensrf/osrf_system.h>
8 #include <opensrf/osrfConfig.h>
9 #include <opensrf/osrf_json.h>
10 #include <opensrf/osrf_cache.h>
11 #include <opensrf/string_array.h>
/* Handler name a request must be mapped to before this module acts on it */
#define MODULE_NAME                      "osrf_http_translator_module"

/* Names of the Apache configuration directives accepted by this module */
#define OSRF_TRANSLATOR_CONFIG_FILE      "OSRFTranslatorConfig"
#define OSRF_TRANSLATOR_CONFIG_CTX       "OSRFTranslatorConfigContext"
#define OSRF_TRANSLATOR_CACHE_SERVER     "OSRFTranslatorCacheServer"

/* Values used when the matching directive or request header is absent */
#define DEFAULT_TRANSLATOR_CONFIG_CTX    "gateway"
#define DEFAULT_TRANSLATOR_CONFIG_FILE   "/openils/conf/opensrf_core.xml"
#define DEFAULT_TRANSLATOR_TIMEOUT       1200
#define DEFAULT_TRANSLATOR_CACHE_SERVERS "127.0.0.1:11211"

/* Response formatting, batch size limit, session cache lifetime, ingress tag */
#define MULTIPART_CONTENT_TYPE           "multipart/x-mixed-replace;boundary=\"%s\""
#define JSON_CONTENT_TYPE                "text/plain"
#define MAX_MSGS_PER_PACKET              256
#define CACHE_TIME                       300
#define TRANSLATOR_INGRESS               "translator-v1"

/* HTTP headers exchanged with the client */
#define OSRF_HTTP_HEADER_TO              "X-OpenSRF-to"
#define OSRF_HTTP_HEADER_XID             "X-OpenSRF-xid"
#define OSRF_HTTP_HEADER_FROM            "X-OpenSRF-from"
#define OSRF_HTTP_HEADER_THREAD          "X-OpenSRF-thread"
#define OSRF_HTTP_HEADER_TIMEOUT         "X-OpenSRF-timeout"
#define OSRF_HTTP_HEADER_SERVICE         "X-OpenSRF-service"
#define OSRF_HTTP_HEADER_MULTIPART       "X-OpenSRF-multipart"
38 char* configFile = DEFAULT_TRANSLATOR_CONFIG_FILE;
39 char* configCtx = DEFAULT_TRANSLATOR_CONFIG_CTX;
40 char* cacheServers = DEFAULT_TRANSLATOR_CACHE_SERVERS;
42 char* routerName = NULL;
43 char* domainName = NULL;
44 int osrfConnected = 0;
45 char recipientBuf[128];
46 char contentTypeBuf[80];
47 osrfStringArray* allowedOrigins = NULL;
#if 0
// Compiled out to avoid an unused-function compiler warning.
// Development aid only: formats the arguments and writes the result,
// followed by a newline, to stderr (the apache error log).
static void _dbg(char* s, ...) {
    VA_LIST_TO_STRING(s);
    fputs(VA_BUF, stderr);
    fputc('\n', stderr);
    fflush(stderr);
}
#endif
62 transport_client* handle;
66 const char* recipient;
69 const char* remoteHost;
73 int connectOnly; // there is only 1 message, a CONNECT
74 int disconnectOnly; // there is only 1 message, a DISCONNECT
75 int connecting; // there is a connect message in this batch
76 int disconnecting; // there is a connect message in this batch
81 static const char* osrfHttpTranslatorGetConfigFile(cmd_parms *parms, void *config, const char *arg) {
82 configFile = (char*) arg;
85 static const char* osrfHttpTranslatorGetConfigFileCtx(cmd_parms *parms, void *config, const char *arg) {
86 configCtx = (char*) arg;
89 static const char* osrfHttpTranslatorGetCacheServer(cmd_parms *parms, void *config, const char *arg) {
90 cacheServers = (char*) arg;
94 /** set up the configuration handlers */
95 static const command_rec osrfHttpTranslatorCmds[] = {
96 AP_INIT_TAKE1( OSRF_TRANSLATOR_CONFIG_FILE, osrfHttpTranslatorGetConfigFile,
97 NULL, RSRC_CONF, "osrf translator config file"),
98 AP_INIT_TAKE1( OSRF_TRANSLATOR_CONFIG_CTX, osrfHttpTranslatorGetConfigFileCtx,
99 NULL, RSRC_CONF, "osrf translator config file context"),
100 AP_INIT_TAKE1( OSRF_TRANSLATOR_CACHE_SERVER, osrfHttpTranslatorGetCacheServer,
101 NULL, RSRC_CONF, "osrf translator cache server"),
106 // there can only be one, so use a global static one
107 static osrfHttpTranslator globalTranslator;
110 * Constructs a new translator object based on the current apache
111 * request_rec. Reads the request body and headers.
113 static osrfHttpTranslator* osrfNewHttpTranslator(request_rec* apreq) {
114 osrfHttpTranslator* trans = &globalTranslator;
115 trans->apreq = apreq;
117 trans->connectOnly = 0;
118 trans->disconnectOnly = 0;
119 trans->connecting = 0;
120 trans->disconnecting = 0;
122 trans->remoteHost = apreq->useragent_ip;
124 trans->remoteHost = apreq->connection->remote_ip;
126 trans->messages = NULL;
128 /* load the message body */
129 osrfStringArray* params = apacheParseParms(apreq);
130 trans->body = apacheGetFirstParamValue(params, "osrf-msg");
131 osrfStringArrayFree(params);
133 /* load the request headers */
134 if (apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_XID))
135 // force our log xid to match the caller
136 osrfLogForceXid(strdup(apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_XID)));
138 trans->handle = osrfSystemGetTransportClient();
139 trans->recipient = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_TO);
140 trans->service = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_SERVICE);
142 const char* timeout = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_TIMEOUT);
144 trans->timeout = atoi(timeout);
146 trans->timeout = DEFAULT_TRANSLATOR_TIMEOUT;
148 const char* multipart = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_MULTIPART);
149 if(multipart && !strcasecmp(multipart, "true"))
150 trans->multipart = 1;
152 trans->multipart = 0;
155 snprintf(buf, sizeof(buf), "%d%ld", getpid(), time(NULL));
156 trans->delim = md5sum(buf);
158 /* Use thread if it has been passed in; otherwise, just use the delimiter */
159 trans->thread = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_THREAD)
160 ? apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_THREAD)
161 : (const char*)trans->delim;
166 static void osrfHttpTranslatorFree(osrfHttpTranslator* trans) {
172 osrfListFree(trans->messages);
#if 0
// Compiled out to avoid an unused-function compiler warning.
// Dumps the main fields of the translator through _dbg().
static void osrfHttpTranslatorDebug(osrfHttpTranslator* trans) {
    _dbg("-----------------------------------");
    _dbg("body = %s", trans->body);
    _dbg("service = %s", trans->service);
    _dbg("thread = %s", trans->thread);
    _dbg("multipart = %d", trans->multipart);
    _dbg("recipient = %s", trans->recipient);
}
#endif
188 * Determines the correct recipient address based on the requested
189 * service or recipient address.
191 static int osrfHttpTranslatorSetTo(osrfHttpTranslator* trans) {
193 jsonObject* sessionCache = NULL;
196 if(trans->recipient) {
197 osrfLogError(OSRF_LOG_MARK, "Specifying both SERVICE and TO are not allowed");
200 // service is specified, build a recipient address
201 // from the router, domain, and service
202 int size = snprintf(recipientBuf, 128, "%s@%s/%s", routerName,
203 domainName, trans->service);
204 recipientBuf[size] = '\0';
205 osrfLogDebug(OSRF_LOG_MARK, "Set recipient to %s", recipientBuf);
206 trans->recipient = recipientBuf;
212 if(trans->recipient) {
213 sessionCache = osrfCacheGetObject(trans->thread);
216 const char* ipAddr = jsonObjectGetString(
217 jsonObjectGetKeyConst( sessionCache, "ip" ));
218 const char* recipient = jsonObjectGetString(
219 jsonObjectGetKeyConst( sessionCache, "jid" ));
221 // choosing a specific recipient address requires that the recipient and
222 // thread be cached on the server (so drone processes cannot be hijacked)
223 if(!strcmp(ipAddr, trans->remoteHost) && !strcmp(recipient, trans->recipient)) {
224 osrfLogDebug( OSRF_LOG_MARK,
225 "Found cached session from host %s and recipient %s",
226 trans->remoteHost, trans->recipient);
228 trans->service = apr_pstrdup(
229 trans->apreq->pool, jsonObjectGetString(
230 jsonObjectGetKeyConst( sessionCache, "service" )));
233 osrfLogError(OSRF_LOG_MARK,
234 "Session cache for thread %s does not match request", trans->thread);
237 osrfLogError(OSRF_LOG_MARK,
238 "attempt to send directly to %s without a session", trans->recipient);
241 osrfLogError(OSRF_LOG_MARK, "No SERVICE or RECIPIENT defined");
245 jsonObjectFree(sessionCache);
250 * Parses the request body, logs any REQUEST messages to the activity log,
251 * stamps the translator ingress on each message, and returns the updated
252 * messages as a JSON string.
254 static char* osrfHttpTranslatorParseRequest(osrfHttpTranslator* trans) {
256 osrfMessage* msgList[MAX_MSGS_PER_PACKET];
257 int numMsgs = osrf_message_deserialize(trans->body, msgList, MAX_MSGS_PER_PACKET);
258 osrfLogDebug(OSRF_LOG_MARK, "parsed %d opensrf messages in this packet", numMsgs);
263 // log request messages to the activity log
265 for(i = 0; i < numMsgs; i++) {
267 osrfMessageSetIngress(msg, TRANSLATOR_INGRESS);
269 switch(msg->m_type) {
272 const jsonObject* params = msg->_params;
273 growing_buffer* act = buffer_init(128);
274 char* method = msg->method_name;
275 buffer_fadd(act, "[%s] [%s] %s %s", trans->remoteHost, "",
276 trans->service, method);
278 const jsonObject* obj = NULL;
281 int redactParams = 0;
282 while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
283 //osrfLogInternal(OSRF_LOG_MARK, "Checking for log protection [%s]", str);
284 if(!strncmp(method, str, strlen(str))) {
290 OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
293 while((obj = jsonObjectGetIndex(params, i++))) {
294 str = jsonObjectToJSON(obj);
296 OSRF_BUFFER_ADD(act, " ");
298 OSRF_BUFFER_ADD(act, ", ");
299 OSRF_BUFFER_ADD(act, str);
303 osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
309 trans->connecting = 1;
311 trans->connectOnly = 1;
315 trans->disconnecting = 1;
317 trans->disconnectOnly = 1;
321 osrfLogWarning( OSRF_LOG_MARK, "Unexpected RESULT message received" );
325 osrfLogWarning( OSRF_LOG_MARK, "Unexpected STATUS message received" );
329 osrfLogWarning( OSRF_LOG_MARK, "Invalid message type %d received",
335 char* jsonString = osrfMessageSerializeBatch(msgList, numMsgs);
336 for(i = 0; i < numMsgs; i++) {
337 osrfMessageFree(msgList[i]);
342 static int osrfHttpTranslatorCheckStatus(osrfHttpTranslator* trans, transport_message* msg) {
343 osrfMessage* omsgList[MAX_MSGS_PER_PACKET];
344 int numMsgs = osrf_message_deserialize(msg->body, omsgList, MAX_MSGS_PER_PACKET);
345 osrfLogDebug(OSRF_LOG_MARK, "parsed %d response messages", numMsgs);
346 if(numMsgs == 0) return 0;
348 osrfMessage* last = omsgList[numMsgs-1];
349 if(last->m_type == STATUS) {
350 if(last->status_code == OSRF_STATUS_TIMEOUT) {
351 osrfLogDebug(OSRF_LOG_MARK, "removing cached session on request timeout");
352 osrfCacheRemove(trans->thread);
355 // XXX hm, check for explicit status=COMPLETE message instead??
356 if(last->status_code != OSRF_STATUS_CONTINUE)
363 static void osrfHttpTranslatorInitHeaders(osrfHttpTranslator* trans, transport_message* msg) {
364 apr_table_set(trans->apreq->headers_out, OSRF_HTTP_HEADER_FROM, msg->sender);
365 apr_table_set(trans->apreq->headers_out, OSRF_HTTP_HEADER_THREAD, trans->thread);
366 if(trans->multipart) {
367 sprintf(contentTypeBuf, MULTIPART_CONTENT_TYPE, trans->delim);
368 contentTypeBuf[79] = '\0';
369 osrfLogDebug(OSRF_LOG_MARK, "content type %s : %s : %s", MULTIPART_CONTENT_TYPE,
370 trans->delim, contentTypeBuf);
371 ap_set_content_type(trans->apreq, contentTypeBuf);
372 ap_rprintf(trans->apreq, "--%s\n", trans->delim);
374 ap_set_content_type(trans->apreq, JSON_CONTENT_TYPE);
379 * Cache the transaction with the JID of the backend process we are talking to
381 static void osrfHttpTranslatorCacheSession(osrfHttpTranslator* trans, const char* jid) {
382 jsonObject* cacheObj = jsonNewObject(NULL);
383 jsonObjectSetKey(cacheObj, "ip", jsonNewObject(trans->remoteHost));
384 jsonObjectSetKey(cacheObj, "jid", jsonNewObject(jid));
385 jsonObjectSetKey(cacheObj, "service", jsonNewObject(trans->service));
386 osrfCachePutObject(trans->thread, cacheObj, CACHE_TIME);
391 * Writes a single chunk of multipart/x-mixed-replace content
393 static void osrfHttpTranslatorWriteChunk(osrfHttpTranslator* trans, transport_message* msg) {
394 osrfLogInternal(OSRF_LOG_MARK, "sending multipart chunk %s", msg->body);
395 ap_rprintf(trans->apreq,
396 "Content-type: %s\n\n%s\n\n", JSON_CONTENT_TYPE, msg->body);
397 //osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: Content-type: %s\n\n%s\n\n",
398 //JSON_CONTENT_TYPE, msg->body);
399 if(trans->complete) {
400 ap_rprintf(trans->apreq, "--%s--\n", trans->delim);
401 //osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: --%s--\n", trans->delim);
403 ap_rprintf(trans->apreq, "--%s\n", trans->delim);
404 //osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: --%s\n", trans->delim);
406 ap_rflush(trans->apreq);
409 static int osrfHttpTranslatorProcess(osrfHttpTranslator* trans) {
410 if(trans->body == NULL)
411 return HTTP_BAD_REQUEST;
413 if(!osrfHttpTranslatorSetTo(trans))
414 return HTTP_BAD_REQUEST;
416 char* jsonBody = osrfHttpTranslatorParseRequest(trans);
417 if (NULL == jsonBody)
418 return HTTP_BAD_REQUEST;
420 while(client_recv(trans->handle, 0))
421 continue; // discard any old status messages in the recv queue
423 // send the message to the recipient
424 transport_message* tmsg = message_init(
425 jsonBody, NULL, trans->thread, trans->recipient, NULL);
426 message_set_osrf_xid(tmsg, osrfLogGetXid());
427 client_send_message(trans->handle, tmsg);
431 if(trans->disconnectOnly) {
432 osrfLogDebug(OSRF_LOG_MARK, "exiting early on disconnect");
433 osrfCacheRemove(trans->thread);
437 // process the response from the opensrf service
439 while(!trans->complete) {
440 transport_message* msg = client_recv(trans->handle, trans->timeout);
442 if(trans->handle->error) {
443 osrfLogError(OSRF_LOG_MARK, "Transport error");
444 osrfCacheRemove(trans->thread);
445 return HTTP_INTERNAL_SERVER_ERROR;
449 return HTTP_GATEWAY_TIME_OUT;
452 osrfLogError(OSRF_LOG_MARK, "XMPP message resulted in error code %d", msg->error_code);
453 osrfCacheRemove(trans->thread);
454 return HTTP_NOT_FOUND;
457 if(!osrfHttpTranslatorCheckStatus(trans, msg))
461 osrfHttpTranslatorInitHeaders(trans, msg);
462 if(trans->connecting)
463 osrfHttpTranslatorCacheSession(trans, msg->sender);
467 if(trans->multipart) {
468 osrfHttpTranslatorWriteChunk(trans, msg);
469 if(trans->connectOnly)
473 trans->messages = osrfNewList();
474 osrfListPush(trans->messages, msg->body);
476 if(trans->complete || trans->connectOnly) {
477 growing_buffer* buf = buffer_init(128);
479 OSRF_BUFFER_ADD(buf, osrfListGetIndex(trans->messages, 0));
480 for(i = 1; i < trans->messages->size; i++) {
481 buffer_chomp(buf); // chomp off the closing array bracket
482 char* body = osrfListGetIndex(trans->messages, i);
483 char newbuf[strlen(body)];
484 sprintf(newbuf, "%s", body+1); // chomp off the opening array bracket
485 OSRF_BUFFER_ADD_CHAR(buf, ',');
486 OSRF_BUFFER_ADD(buf, newbuf);
489 ap_rputs(buf->buf, trans->apreq);
495 if(trans->disconnecting) // DISCONNECT within a multi-message batch
496 osrfCacheRemove(trans->thread);
501 static void testConnection(request_rec* r) {
502 if(!osrfConnected || !osrfSystemGetTransportClient()) {
503 osrfLogError(OSRF_LOG_MARK, "We're not connected to OpenSRF");
504 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "We're not connected to OpenSRF");
505 usleep(100000); // .1 second to prevent process die/start overload
#if 0
// Compiled out to avoid an unused-function compiler warning.
// Pool cleanup callback that shuts the OpenSRF system down.
static apr_status_t childExit(void* data) {
    (void) data;
    osrf_system_shutdown();
    return OK;
}
#endif
519 static void childInit(apr_pool_t *p, server_rec *s) {
520 if(!osrfSystemBootstrapClientResc(configFile, configCtx, "translator")) {
521 ap_log_error( APLOG_MARK, APLOG_ERR, 0, s,
522 "Unable to Bootstrap OpenSRF Client with config %s..", configFile);
526 routerName = osrfConfigGetValue(NULL, "/router_name");
527 domainName = osrfConfigGetValue(NULL, "/domain");
528 const char* servers[] = {cacheServers};
529 osrfCacheInit(servers, 1, 86400);
532 allowedOrigins = osrfNewStringArray(4);
533 osrfConfigGetValueList(NULL, allowedOrigins, "/cross_origin/origin");
535 // at pool destroy time (= child exit time), cleanup
536 // XXX causes us to disconnect even for clone()'d process cleanup (as in mod_cgi)
537 //apr_pool_cleanup_register(p, NULL, childExit, apr_pool_cleanup_null);
540 static int handler(request_rec *r) {
542 if(strcmp(r->handler, MODULE_NAME)) return DECLINED;
543 if(r->header_only) return stat;
545 r->allowed |= (AP_METHOD_BIT << M_GET);
546 r->allowed |= (AP_METHOD_BIT << M_POST);
548 osrfLogSetAppname("osrf_http_translator");
549 osrfAppSessionSetIngress(TRANSLATOR_INGRESS);
551 crossOriginHeaders(r, allowedOrigins);
554 osrfHttpTranslator* trans = osrfNewHttpTranslator(r);
556 stat = osrfHttpTranslatorProcess(trans);
557 //osrfHttpTranslatorDebug(trans);
558 osrfLogInfo(OSRF_LOG_MARK, "translator resulted in status %d", stat);
560 osrfLogWarning(OSRF_LOG_MARK, "no message body to process");
562 osrfHttpTranslatorFree(trans);
567 static void registerHooks (apr_pool_t *p) {
568 ap_hook_handler(handler, NULL, NULL, APR_HOOK_MIDDLE);
569 ap_hook_child_init(childInit, NULL, NULL, APR_HOOK_MIDDLE);
573 module AP_MODULE_DECLARE_DATA osrf_http_translator_module = {
574 STANDARD20_MODULE_STUFF,
579 osrfHttpTranslatorCmds,