2 #include <sys/resource.h>
5 #include "apachetools.h"
6 #include <opensrf/osrf_app_session.h>
7 #include <opensrf/osrf_system.h>
8 #include <opensrf/osrfConfig.h>
9 #include <opensrf/osrf_json.h>
10 #include <opensrf/osrf_cache.h>
12 #define MODULE_NAME "osrf_http_translator_module"
13 #define OSRF_TRANSLATOR_CONFIG_FILE "OSRFTranslatorConfig"
14 #define OSRF_TRANSLATOR_CONFIG_CTX "OSRFTranslatorConfigContext"
15 #define OSRF_TRANSLATOR_CACHE_SERVER "OSRFTranslatorCacheServer"
17 #define DEFAULT_TRANSLATOR_CONFIG_CTX "gateway"
18 #define DEFAULT_TRANSLATOR_CONFIG_FILE "/openils/conf/opensrf_core.xml"
19 #define DEFAULT_TRANSLATOR_TIMEOUT 1200
20 #define DEFAULT_TRANSLATOR_CACHE_SERVERS "127.0.0.1:11211"
22 #define MULTIPART_CONTENT_TYPE "multipart/x-mixed-replace;boundary=\"%s\""
23 #define JSON_CONTENT_TYPE "text/plain"
24 #define MAX_MSGS_PER_PACKET 256
25 #define CACHE_TIME 300
27 #define OSRF_HTTP_HEADER_TO "X-OpenSRF-to"
28 #define OSRF_HTTP_HEADER_XID "X-OpenSRF-xid"
29 #define OSRF_HTTP_HEADER_FROM "X-OpenSRF-from"
30 #define OSRF_HTTP_HEADER_THREAD "X-OpenSRF-thread"
31 #define OSRF_HTTP_HEADER_TIMEOUT "X-OpenSRF-timeout"
32 #define OSRF_HTTP_HEADER_SERVICE "X-OpenSRF-service"
33 #define OSRF_HTTP_HEADER_MULTIPART "X-OpenSRF-multipart"
35 char* configFile = DEFAULT_TRANSLATOR_CONFIG_FILE;
36 char* configCtx = DEFAULT_TRANSLATOR_CONFIG_CTX;
37 char* cacheServers = DEFAULT_TRANSLATOR_CACHE_SERVERS;
39 char* routerName = NULL;
40 char* domainName = NULL;
41 int osrfConnected = 0;
42 char recipientBuf[128];
43 char contentTypeBuf[80];
45 // for development only, writes to apache error log
46 static void _dbg(char* s, ...) {
48 fprintf(stderr, "%s\n", VA_BUF);
55 transport_client* handle;
59 const char* recipient;
62 const char* remoteHost;
66 int connectOnly; // there is only 1 message, a CONNECT
67 int disconnectOnly; // there is only 1 message, a DISCONNECT
68 int connecting; // there is a connect message in this batch
69 int disconnecting; // there is a connect message in this batch
74 static const char* osrfHttpTranslatorGetConfigFile(cmd_parms *parms, void *config, const char *arg) {
75 configFile = (char*) arg;
78 static const char* osrfHttpTranslatorGetConfigFileCtx(cmd_parms *parms, void *config, const char *arg) {
79 configCtx = (char*) arg;
82 static const char* osrfHttpTranslatorGetCacheServer(cmd_parms *parms, void *config, const char *arg) {
83 cacheServers = (char*) arg;
87 /** set up the configuratoin handlers */
88 static const command_rec osrf_json_gateway_cmds[] = {
89 AP_INIT_TAKE1( OSRF_TRANSLATOR_CONFIG_FILE, osrfHttpTranslatorGetConfigFile,
90 NULL, RSRC_CONF, "osrf translator config file"),
91 AP_INIT_TAKE1( OSRF_TRANSLATOR_CONFIG_CTX, osrfHttpTranslatorGetConfigFileCtx,
92 NULL, RSRC_CONF, "osrf translator config file context"),
93 AP_INIT_TAKE1( OSRF_TRANSLATOR_CACHE_SERVER, osrfHttpTranslatorGetCacheServer,
94 NULL, RSRC_CONF, "osrf translator cache server"),
99 // there can only be one, so use a global static one
100 static osrfHttpTranslator globalTranslator;
103 * Constructs a new translator object based on the current apache
104 * request_rec. Reads the request body and headers.
106 static osrfHttpTranslator* osrfNewHttpTranslator(request_rec* apreq) {
107 osrfHttpTranslator* trans = &globalTranslator;
108 trans->apreq = apreq;
110 trans->connectOnly = 0;
111 trans->disconnectOnly = 0;
112 trans->connecting = 0;
113 trans->disconnecting = 0;
114 trans->remoteHost = apreq->connection->remote_ip;
115 trans->messages = NULL;
117 /* load the message body */
118 osrfStringArray* params = apacheParseParms(apreq);
119 trans->body = apacheGetFirstParamValue(params, "osrf-msg");
120 osrfStringArrayFree(params);
122 /* load the request headers */
123 if (apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_XID)) // force our log xid to match the caller
124 osrfLogForceXid(strdup(apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_XID)));
126 trans->handle = osrfSystemGetTransportClient();
127 trans->recipient = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_TO);
128 trans->service = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_SERVICE);
129 trans->thread = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_THREAD); /* XXX create thread if necessary */
131 const char* timeout = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_TIMEOUT);
133 trans->timeout = atoi(timeout);
135 trans->timeout = DEFAULT_TRANSLATOR_TIMEOUT;
137 const char* multipart = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_MULTIPART);
138 if(multipart && !strcasecmp(multipart, "true"))
139 trans->multipart = 1;
141 trans->multipart = 0;
144 snprintf(buf, sizeof(buf), "%d%ld", getpid(), time(NULL));
145 trans->delim = md5sum(buf);
150 static void osrfHttpTranslatorFree(osrfHttpTranslator* trans) {
156 osrfListFree(trans->messages);
159 static void osrfHttpTranslatorDebug(osrfHttpTranslator* trans) {
160 _dbg("-----------------------------------");
161 _dbg("body = %s", trans->body);
162 _dbg("service = %s", trans->service);
163 _dbg("thread = %s", trans->thread);
164 _dbg("multipart = %d", trans->multipart);
165 _dbg("recipient = %s", trans->recipient);
169 * Determines the correct recipient address based on the requested
170 * service or recipient address.
172 static int osrfHttpTranslatorSetTo(osrfHttpTranslator* trans) {
174 jsonObject* sessionCache = NULL;
177 if(trans->recipient) {
178 osrfLogError(OSRF_LOG_MARK, "Specifying both SERVICE and TO are not allowed");
181 // service is specified, build a recipient address
182 // from the router, domain, and service
183 int size = snprintf(recipientBuf, 128, "%s@%s/%s", routerName, domainName, trans->service);
184 recipientBuf[size] = '\0';
185 osrfLogDebug(OSRF_LOG_MARK, "Set recipient to %s", recipientBuf);
186 trans->recipient = recipientBuf;
192 if(trans->recipient) {
193 sessionCache = osrfCacheGetObject(trans->thread);
196 char* ipAddr = jsonObjectGetString(jsonObjectGetKey(sessionCache, "ip"));
197 char* recipient = jsonObjectGetString(jsonObjectGetKey(sessionCache, "jid"));
199 // choosing a specific recipient address requires that the recipient and
200 // thread be cached on the server (so drone processes cannot be hijacked)
201 if(!strcmp(ipAddr, trans->remoteHost) && !strcmp(recipient, trans->recipient)) {
202 osrfLogDebug(OSRF_LOG_MARK, "Found cached session from host %s and recipient %s",
203 trans->remoteHost, trans->recipient);
205 trans->service = apr_pstrdup(
206 trans->apreq->pool, jsonObjectGetString(jsonObjectGetKey(sessionCache, "service")));
209 osrfLogError(OSRF_LOG_MARK,
210 "Session cache for thread %s does not match request", trans->thread);
213 osrfLogError(OSRF_LOG_MARK,
214 "attempt to send directly to %s without a session", trans->recipient);
217 osrfLogError(OSRF_LOG_MARK, "No SERVICE or RECIPIENT defined");
221 jsonObjectFree(sessionCache);
226 * Parses the request body and logs any REQUEST messages to the activity log
228 static int osrfHttpTranslatorParseRequest(osrfHttpTranslator* trans) {
230 osrfMessage* msgList[MAX_MSGS_PER_PACKET];
231 int numMsgs = osrf_message_deserialize(trans->body, msgList, MAX_MSGS_PER_PACKET);
232 osrfLogDebug(OSRF_LOG_MARK, "parsed %d opensrf messages in this packet", numMsgs);
239 if(msg->m_type == CONNECT) {
240 trans->connectOnly = 1;
241 trans->connecting = 1;
244 if(msg->m_type == DISCONNECT) {
245 trans->disconnectOnly = 1;
246 trans->disconnecting = 1;
251 // log request messages to the activity log
253 for(i = 0; i < numMsgs; i++) {
256 switch(msg->m_type) {
259 jsonObject* params = msg->_params;
260 growing_buffer* act = buffer_init(128);
261 buffer_fadd(act, "[%s] [%s] %s %s", trans->remoteHost, "", trans->service, msg->method_name);
265 while((str = jsonObjectGetString(jsonObjectGetIndex(params, i++)))) {
267 OSRF_BUFFER_ADD(act, " ");
269 OSRF_BUFFER_ADD(act, ", ");
270 OSRF_BUFFER_ADD(act, str);
272 osrfLogActivity(OSRF_LOG_MARK, act->buf);
278 trans->connecting = 1;
282 trans->disconnecting = 1;
290 static int osrfHttpTranslatorCheckStatus(osrfHttpTranslator* trans, transport_message* msg) {
291 osrfMessage* omsgList[MAX_MSGS_PER_PACKET];
292 int numMsgs = osrf_message_deserialize(msg->body, omsgList, MAX_MSGS_PER_PACKET);
293 osrfLogDebug(OSRF_LOG_MARK, "parsed %d response messages", numMsgs);
294 if(numMsgs == 0) return 0;
296 osrfMessage* last = omsgList[numMsgs-1];
297 if(last->m_type == STATUS) {
298 if(last->status_code == OSRF_STATUS_TIMEOUT) {
299 osrfLogDebug(OSRF_LOG_MARK, "removing cached session on request timeout");
300 osrfCacheRemove(trans->thread);
303 // XXX hm, check for explicit status=COMPLETE message instead??
304 if(last->status_code != OSRF_STATUS_CONTINUE)
311 static void osrfHttpTranslatorInitHeaders(osrfHttpTranslator* trans, transport_message* msg) {
312 apr_table_set(trans->apreq->headers_out, OSRF_HTTP_HEADER_FROM, msg->sender);
313 apr_table_set(trans->apreq->headers_out, OSRF_HTTP_HEADER_THREAD, trans->thread);
314 if(trans->multipart) {
315 sprintf(contentTypeBuf, MULTIPART_CONTENT_TYPE, trans->delim);
316 contentTypeBuf[79] = '\0';
317 osrfLogDebug(OSRF_LOG_MARK, "content type %s : %s : %s", MULTIPART_CONTENT_TYPE, trans->delim, contentTypeBuf);
318 ap_set_content_type(trans->apreq, contentTypeBuf);
319 ap_rprintf(trans->apreq, "--%s\n", trans->delim);
321 ap_set_content_type(trans->apreq, JSON_CONTENT_TYPE);
326 * Cache the transaction with the JID of the backend process we are talking to
328 static void osrfHttpTranslatorCacheSession(osrfHttpTranslator* trans, const char* jid) {
329 jsonObject* cacheObj = jsonNewObject(NULL);
330 jsonObjectSetKey(cacheObj, "ip", jsonNewObject(trans->remoteHost));
331 jsonObjectSetKey(cacheObj, "jid", jsonNewObject(jid));
332 jsonObjectSetKey(cacheObj, "service", jsonNewObject(trans->service));
333 osrfCachePutObject((char*) trans->thread, cacheObj, CACHE_TIME);
338 * Writes a single chunk of multipart/x-mixed-replace content
340 static void osrfHttpTranslatorWriteChunk(osrfHttpTranslator* trans, transport_message* msg) {
341 osrfLogInternal(OSRF_LOG_MARK, "sending multipart chunk %s", msg->body);
342 ap_rprintf(trans->apreq,
343 "Content-type: %s\n\n%s\n\n", JSON_CONTENT_TYPE, msg->body);
344 //osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: Content-type: %s\n\n%s\n\n", JSON_CONTENT_TYPE, msg->body);
345 if(trans->complete) {
346 ap_rprintf(trans->apreq, "--%s--\n", trans->delim);
347 //osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: --%s--\n", trans->delim);
349 ap_rprintf(trans->apreq, "--%s\n", trans->delim);
350 //osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: --%s\n", trans->delim);
352 ap_rflush(trans->apreq);
355 static int osrfHttpTranslatorProcess(osrfHttpTranslator* trans) {
356 if(trans->body == NULL)
357 return HTTP_BAD_REQUEST;
359 if(!osrfHttpTranslatorSetTo(trans))
360 return HTTP_BAD_REQUEST;
362 if(!osrfHttpTranslatorParseRequest(trans))
363 return HTTP_BAD_REQUEST;
365 while(client_recv(trans->handle, 0))
366 continue; // discard any old status messages in the recv queue
368 // send the message to the recipient
369 transport_message* tmsg = message_init(
370 trans->body, NULL, trans->thread, trans->recipient, NULL);
371 message_set_osrf_xid(tmsg, osrfLogGetXid());
372 client_send_message(trans->handle, tmsg);
375 if(trans->disconnectOnly) {
376 osrfLogDebug(OSRF_LOG_MARK, "exiting early on disconnect");
377 osrfCacheRemove(trans->thread);
381 // process the response from the opensrf service
383 while(!trans->complete) {
384 transport_message* msg = client_recv(trans->handle, trans->timeout);
386 if(trans->handle->error) {
387 osrfLogError(OSRF_LOG_MARK, "Transport error");
388 osrfCacheRemove(trans->thread);
389 return HTTP_INTERNAL_SERVER_ERROR;
393 return HTTP_GATEWAY_TIME_OUT;
396 osrfLogError(OSRF_LOG_MARK, "XMPP message resulted in error code %d", msg->error_code);
397 osrfCacheRemove(trans->thread);
398 return HTTP_NOT_FOUND;
401 if(!osrfHttpTranslatorCheckStatus(trans, msg))
405 osrfHttpTranslatorInitHeaders(trans, msg);
406 if(trans->connecting)
407 osrfHttpTranslatorCacheSession(trans, msg->sender);
411 if(trans->multipart) {
412 osrfHttpTranslatorWriteChunk(trans, msg);
413 if(trans->connectOnly)
417 trans->messages = osrfNewList();
418 osrfListPush(trans->messages, msg->body);
420 if(trans->complete || trans->connectOnly) {
421 growing_buffer* buf = buffer_init(128);
423 OSRF_BUFFER_ADD(buf, osrfListGetIndex(trans->messages, 0));
424 for(i = 1; i < trans->messages->size; i++) {
425 buffer_chomp(buf); // chomp off the closing array bracket
426 char* body = osrfListGetIndex(trans->messages, i);
427 char newbuf[strlen(body)];
428 sprintf(newbuf, body+1); // chomp off the opening array bracket
429 OSRF_BUFFER_ADD_CHAR(buf, ',');
430 OSRF_BUFFER_ADD(buf, newbuf);
433 ap_rputs(buf->buf, trans->apreq);
439 if(trans->disconnecting) // DISCONNECT within a multi-message batch
440 osrfCacheRemove(trans->thread);
445 static void testConnection(request_rec* r) {
446 if(!osrfConnected || !osrfSystemGetTransportClient()) {
447 osrfLogError(OSRF_LOG_MARK, "We're not connected to OpenSRF");
448 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "We're not connected to OpenSRF");
449 usleep(100000); // .1 second to prevent process die/start overload
455 static apr_status_t childExit(void* data) {
456 osrf_system_shutdown();
460 static void childInit(apr_pool_t *p, server_rec *s) {
461 if(!osrfSystemBootstrapClientResc(configFile, configCtx, "translator")) {
462 ap_log_error( APLOG_MARK, APLOG_ERR, 0, s,
463 "Unable to Bootstrap OpenSRF Client with config %s..", configFile);
467 routerName = osrfConfigGetValue(NULL, "/router_name");
468 domainName = osrfConfigGetValue(NULL, "/domain");
469 const char* servers[] = {cacheServers};
470 osrfCacheInit(servers, 1, 86400);
473 // at pool destroy time (= child exit time), cleanup
474 // XXX causes us to disconnect even for clone()'d process cleanup (as in mod_cgi)
475 //apr_pool_cleanup_register(p, NULL, childExit, apr_pool_cleanup_null);
478 static int handler(request_rec *r) {
480 if(strcmp(r->handler, MODULE_NAME)) return DECLINED;
481 if(r->header_only) return stat;
483 r->allowed |= (AP_METHOD_BIT << M_GET);
484 r->allowed |= (AP_METHOD_BIT << M_POST);
486 osrfLogSetAppname("osrf_http_translator");
490 osrfHttpTranslator* trans = osrfNewHttpTranslator(r);
492 stat = osrfHttpTranslatorProcess(trans);
493 //osrfHttpTranslatorDebug(trans);
494 osrfLogInfo(OSRF_LOG_MARK, "translator resulted in status %d", stat);
496 osrfLogWarning(OSRF_LOG_MARK, "no message body to process");
498 osrfHttpTranslatorFree(trans);
503 static void registerHooks (apr_pool_t *p) {
504 ap_hook_handler(handler, NULL, NULL, APR_HOOK_MIDDLE);
505 ap_hook_child_init(childInit, NULL, NULL, APR_HOOK_MIDDLE);
509 module AP_MODULE_DECLARE_DATA osrf_http_translator_module = {
510 STANDARD20_MODULE_STUFF,