2 #include <sys/resource.h>
5 #include "apachetools.h"
6 #include <opensrf/osrf_app_session.h>
7 #include <opensrf/osrf_system.h>
8 #include <opensrf/osrfConfig.h>
9 #include <opensrf/osrf_json.h>
10 #include <opensrf/osrf_cache.h>
// Handler name: handler() declines any request whose r->handler differs from it.
12 #define MODULE_NAME "osrf_http_translator_module"
// NOTE(review): the next three names are not referenced in the visible code;
// presumably they are Apache configuration directive names -- confirm.
13 #define TRANSLATOR_CONFIG_FILE "OSRFTranslatorConfig"
14 #define TRANSLATOR_CONFIG_CTX "OSRFTranslatorConfigContext"
15 #define TRANSLATOR_CACHE_SERVER "OSRFTranslatorCacheServer"
// Initial values of the configCtx / configFile globals.
16 #define DEFAULT_TRANSLATOR_CONFIG_CTX "gateway"
17 #define DEFAULT_TRANSLATOR_CONFIG_FILE "/openils/conf/opensrf_core.xml"
// Fallback assigned to trans->timeout in osrfNewHttpTranslator(); it is later
// passed to client_recv() (units presumably seconds -- confirm).
18 #define DEFAULT_TRANSLATOR_TIMEOUT 1200
// printf-style format: the %s receives the per-request boundary (trans->delim).
20 #define MULTIPART_CONTENT_TYPE "multipart/x-mixed-replace;boundary=\"%s\""
// Content type of non-multipart responses and of each multipart chunk.
21 #define JSON_CONTENT_TYPE "text/plain"
// Expiry value handed to osrfCachePutObject() when a session is cached.
22 #define CACHE_TIME 300
// Names of the HTTP headers read from the request and/or set on the response.
24 #define OSRF_HTTP_HEADER_TO "X-OpenSRF-to"
25 #define OSRF_HTTP_HEADER_XID "X-OpenSRF-xid"
26 #define OSRF_HTTP_HEADER_FROM "X-OpenSRF-from"
27 #define OSRF_HTTP_HEADER_THREAD "X-OpenSRF-thread"
28 #define OSRF_HTTP_HEADER_TIMEOUT "X-OpenSRF-timeout"
29 #define OSRF_HTTP_HEADER_SERVICE "X-OpenSRF-service"
30 #define OSRF_HTTP_HEADER_MULTIPART "X-OpenSRF-multipart"
// Capacity of the osrfMessage* arrays passed to osrf_message_deserialize().
31 #define MAX_MSGS_PER_PACKET 256
// Module-wide state (none of it is declared static).
// configFile and configCtx are the arguments childInit() gives to
// osrfSystemBootstrapClientResc().
33 char* configFile = DEFAULT_TRANSLATOR_CONFIG_FILE;
34 char* configCtx = DEFAULT_TRANSLATOR_CONFIG_CTX;
// Set by childInit() from the "/router_name" and "/domain" config values and
// used by osrfHttpTranslatorSetTo() to format a "router@domain/service" address.
35 char* routerName = NULL;
36 char* domainName = NULL;
// Checked by testConnection().
// NOTE(review): no assignment to this flag appears in the visible lines.
37 int osrfConnected = 0;
// Scratch buffer osrfHttpTranslatorSetTo() formats the recipient address into;
// trans->recipient is then pointed at it. Being global, it is reused by every
// call that builds an address from a service name.
38 char recipientBuf[128];
40 // for development only, writes to apache error log
// Prints VA_BUF followed by a newline to stderr.
// NOTE(review): VA_BUF is not declared in the visible lines; presumably a
// macro builds it from `s` and the variadic arguments -- confirm.
41 static void _dbg(char* s, ...) {
43 fprintf(stderr, "%s\n", VA_BUF);
// Transport client used to send the request and receive the responses;
// assigned from osrfSystemGetTransportClient() in osrfNewHttpTranslator().
50 transport_client* handle;
// Destination address: the X-OpenSRF-to header value, or recipientBuf once
// osrfHttpTranslatorSetTo() has built an address from the service name.
54 const char* recipient;
// Client address, taken from apreq->connection->remote_ip.
57 const char* remoteHost;
67 * Constructs a new translator object based on the current apache
68 * request_rec. Reads the request body and headers.
// Allocates a translator for one request and fills it from `apreq`: the
// "osrf-msg" parameter becomes the body, and the X-OpenSRF-* request headers
// supply recipient, service, thread, timeout and the multipart flag.
70 static osrfHttpTranslator* osrfNewHttpTranslator(request_rec* apreq) {
71 osrfHttpTranslator* trans;
72 OSRF_MALLOC(trans, sizeof(osrfHttpTranslator));
75 trans->connectOnly = 0;
76 trans->disconnectOnly = 0;
77 trans->remoteHost = apreq->connection->remote_ip;
78 trans->messages = NULL;
80 /* load the message body */
81 osrfStringArray* params = apacheParseParms(apreq);
// NOTE(review): params is freed right after body is taken from it; this is
// only safe if apacheGetFirstParamValue() returns a copy -- confirm.
82 trans->body = apacheGetFirstParamValue(params, "osrf-msg");
83 osrfStringArrayFree(params);
85 /* load the request headers */
86 if (apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_XID) ) {
92 trans->handle = osrfSystemGetTransportClient();
// The header values below are stored as returned by apr_table_get(), i.e.
// they are not duplicated here; each is NULL when the header is absent.
93 trans->recipient = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_TO);
94 trans->service = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_SERVICE);
95 trans->thread = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_THREAD); /* XXX create thread if necessary */
97 const char* timeout = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_TIMEOUT);
// NOTE(review): atoi() reports no errors; a non-numeric header value yields 0
// and a negative one is accepted. strtol() with range checking would be safer.
99 trans->timeout = atoi(timeout);
101 trans->timeout = DEFAULT_TRANSLATOR_TIMEOUT;
103 const char* multipart = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_MULTIPART);
// Multipart mode is enabled only by the case-insensitive value "true".
104 if(multipart && !strcasecmp(multipart, "true"))
105 trans->multipart = 1;
107 trans->multipart = 0;
// The multipart boundary is the md5sum() of the process id followed by the
// current time.
// NOTE(review): "%ld" assumes time_t is long; a (long) cast would make the
// format portable. The declaration of buf is not in the visible lines.
110 snprintf(buf, sizeof(buf), "%d%ld", getpid(), time(NULL));
111 trans->delim = md5sum(buf);
// Releases a translator. The only cleanup step visible here is freeing the
// trans->messages list.
116 static void osrfHttpTranslatorFree(osrfHttpTranslator* trans) {
122 osrfListFree(trans->messages);
// Dumps the translator's body, service, thread, multipart flag and recipient
// through _dbg().
// NOTE(review): body, service, thread and recipient can each be NULL (a missing
// parameter or header); if _dbg() formats with a printf-family function,
// passing NULL for %s is undefined behaviour -- confirm or guard.
126 static void osrfHttpTranslatorDebug(osrfHttpTranslator* trans) {
127 _dbg("-----------------------------------");
128 _dbg("body = %s", trans->body);
129 _dbg("service = %s", trans->service);
130 _dbg("thread = %s", trans->thread);
131 _dbg("multipart = %d", trans->multipart);
132 _dbg("recipient = %s", trans->recipient);
136 * Determines the correct recipient address based on the requested
137 * service or recipient address.
// Resolves trans->recipient. With a service name, the address is formatted as
// "router@domain/service" into recipientBuf. With an explicit recipient, the
// session cached under trans->thread must match the caller's address and that
// recipient. Specifying both, or neither, is logged as an error.
// NOTE(review): the branch structure and the return statements are not in the
// visible lines; callers treat a zero return as failure.
139 static int osrfHttpTranslatorSetTo(osrfHttpTranslator* trans) {
141 jsonObject* sessionCache = NULL;
144 if(trans->recipient) {
145 osrfLogError(OSRF_LOG_MARK, "Specifying both SERVICE and TO are not allowed");
148 // service is specified, build a recipient address
149 // from the router, domain, and service
// NOTE(review): snprintf() returns the length the complete string would have
// had. trans->service comes from a request header, so `size` can be >= 128 and
// the store on the following line would then write past recipientBuf.
// snprintf() already NUL-terminates within the given size, so that store is
// redundant; the return value should be checked for truncation instead.
150 int size = snprintf(recipientBuf, 128, "%s@%s/%s", routerName, domainName, trans->service);
151 recipientBuf[size] = '\0';
152 osrfLogDebug(OSRF_LOG_MARK, "Set recipient to %s", recipientBuf);
153 trans->recipient = recipientBuf;
159 if(trans->recipient) {
160 sessionCache = osrfCacheGetObject(trans->thread);
// NOTE(review): if the cached object lacks "ip" or "jid", these are presumably
// NULL and the strcmp() calls below would dereference NULL -- confirm.
163 char* ipAddr = jsonObjectGetString(jsonObjectGetKey(sessionCache, "ip"));
164 char* recipient = jsonObjectGetString(jsonObjectGetKey(sessionCache, "jid"));
166 // choosing a specific recipient address requires that the recipient and
167 // thread be cached on the server (so drone processes cannot be hijacked)
168 if(!strcmp(ipAddr, trans->remoteHost) && !strcmp(recipient, trans->recipient)) {
169 osrfLogDebug(OSRF_LOG_MARK, "Found cached session from host %s and recipient %s",
170 trans->remoteHost, trans->recipient);
// NOTE(review): if jsonObjectGetString() returns a pointer owned by
// sessionCache, trans->service dangles after the jsonObjectFree() at the end
// of this function -- confirm ownership.
172 trans->service = jsonObjectGetString(jsonObjectGetKey(sessionCache, "service"));
175 osrfLogError(OSRF_LOG_MARK,
176 "Session cache for thread %s does not match request", trans->thread);
179 osrfLogError(OSRF_LOG_MARK,
180 "attempt to send directly to %s without a session", trans->recipient);
183 osrfLogError(OSRF_LOG_MARK, "No SERVICE or RECIPIENT defined");
187 jsonObjectFree(sessionCache);
192 * Parses the request body and logs any REQUEST messages to the activity log
// Deserializes trans->body into at most MAX_MSGS_PER_PACKET messages, sets
// connectOnly / disconnectOnly when a CONNECT / DISCONNECT message is seen,
// and writes one activity-log line per REQUEST message.
// NOTE(review): the conditions surrounding the CONNECT/DISCONNECT checks, the
// cleanup of msgList and `act`, and the return value are not in the visible
// lines; callers treat a zero return as failure.
194 static int osrfHttpTranslatorParseRequest(osrfHttpTranslator* trans) {
196 osrfMessage* msgList[MAX_MSGS_PER_PACKET];
197 int numMsgs = osrf_message_deserialize(trans->body, msgList, MAX_MSGS_PER_PACKET);
198 osrfLogDebug(OSRF_LOG_MARK, "parsed %d opensrf messages in this packet", numMsgs);
205 if(msg->m_type == CONNECT) {
206 trans->connectOnly = 1;
209 if(msg->m_type == DISCONNECT) {
210 trans->disconnectOnly = 1;
215 // log request messages to the activity log
217 for(i = 0; i < numMsgs; i++) {
219 if(msg->m_type == REQUEST) {
221 jsonObject* params = msg->_params;
222 growing_buffer* act = buffer_init(128);
// Log line layout: [remote host] [] service method, followed by the parameters.
223 buffer_fadd(act, "[%s] [%s] %s %s", trans->remoteHost, "", trans->service, msg->method_name);
// NOTE(review): this loop increments `i`. Unless a separate inner `i` is
// declared in the lines not shown, it advances the outer message counter and
// starts at the message index rather than at parameter 0 -- confirm.
227 while((str = jsonObjectGetString(jsonObjectGetIndex(params, i++)))) {
229 OSRF_BUFFER_ADD(act, " ");
231 OSRF_BUFFER_ADD(act, ", ");
232 OSRF_BUFFER_ADD(act, str);
// NOTE(review): act->buf holds client-supplied text (method name, parameters).
// If osrfLogActivity() treats this argument as a printf-style format, any '%'
// in the request is interpreted as a conversion; passing "%s", act->buf would
// avoid that -- confirm the logger's signature.
234 osrfLogActivity(OSRF_LOG_MARK, act->buf);
// Deserializes one response packet and inspects its last message. Returns 0
// when the packet contains no messages. If the last message is a STATUS with
// OSRF_STATUS_TIMEOUT, the session cached under trans->thread is removed.
// NOTE(review): what happens for a STATUS other than OSRF_STATUS_CONTINUE
// (presumably marking the exchange complete), the freeing of omsgList and the
// remaining return values are not in the visible lines.
242 static int osrfHttpTranslatorCheckStatus(osrfHttpTranslator* trans, transport_message* msg) {
243 osrfMessage* omsgList[MAX_MSGS_PER_PACKET];
244 int numMsgs = osrf_message_deserialize(msg->body, omsgList, MAX_MSGS_PER_PACKET);
245 osrfLogDebug(OSRF_LOG_MARK, "parsed %d response messages", numMsgs);
246 if(numMsgs == 0) return 0;
// Only the final message of the packet is examined.
248 osrfMessage* last = omsgList[numMsgs-1];
249 if(last->m_type == STATUS) {
250 if(last->status_code == OSRF_STATUS_TIMEOUT) {
251 osrfLogDebug(OSRF_LOG_MARK, "removing cached session on request timeout");
252 osrfCacheRemove(trans->thread);
255 // XXX hm, check for explicit status=COMPLETE message instead??
256 if(last->status_code != OSRF_STATUS_CONTINUE)
// Sets the response headers: X-OpenSRF-from (the sender of `msg`) and
// X-OpenSRF-thread. In multipart mode the content type carries the boundary
// and the first "--boundary" line is written; the JSON_CONTENT_TYPE call
// belongs to the alternative branch, whose `else` is not in the visible lines.
263 static void osrfHttpTranslatorInitHeaders(osrfHttpTranslator* trans, transport_message* msg) {
264 apr_table_set(trans->apreq->headers_out, OSRF_HTTP_HEADER_FROM, msg->sender);
265 apr_table_set(trans->apreq->headers_out, OSRF_HTTP_HEADER_THREAD, trans->thread);
266 if(trans->multipart) {
// The size covers the format text plus the boundary; since the two-character
// "%s" is replaced by the boundary, this is two bytes more than required.
267 char buf[strlen(MULTIPART_CONTENT_TYPE) + strlen(trans->delim) + 1];
268 sprintf(buf, MULTIPART_CONTENT_TYPE, trans->delim);
// NOTE(review): buf is a stack array. If ap_set_content_type() stores the
// pointer instead of copying the string (check the httpd API documentation),
// the request's content type dangles once this function returns; building the
// string with apr_psprintf() from trans->apreq->pool would avoid that.
269 ap_set_content_type(trans->apreq, buf);
270 ap_rprintf(trans->apreq, "--%s\n", trans->delim);
272 ap_set_content_type(trans->apreq, JSON_CONTENT_TYPE);
// Caches the session under the thread id, with expiry CACHE_TIME, as a JSON
// object with the keys "ip" (remote host), "jid" (recipient) and "service".
// osrfHttpTranslatorSetTo() reads the same three keys back.
// NOTE(review): the release of cacheObj after the put is not in the visible
// lines -- confirm it is freed.
276 static void osrfHttpTranslatorCacheSession(osrfHttpTranslator* trans) {
277 jsonObject* cacheObj = jsonNewObject(NULL);
278 jsonObjectSetKey(cacheObj, "ip", jsonNewObject(trans->remoteHost));
279 jsonObjectSetKey(cacheObj, "jid", jsonNewObject(trans->recipient));
280 jsonObjectSetKey(cacheObj, "service", jsonNewObject(trans->service));
281 osrfCachePutObject((char*) trans->thread, cacheObj, CACHE_TIME);
286 * Writes a single chunk of multipart/x-mixed-replace content
// Emits one multipart part: a Content-type line, a blank line and the message
// body, followed by a boundary line, then flushes the response.
// NOTE(review): the condition choosing between the closing boundary
// ("--delim--") and the ordinary one ("--delim") is not in the visible lines;
// presumably it tests trans->complete -- confirm.
288 static void osrfHttpTranslatorWriteChunk(osrfHttpTranslator* trans, transport_message* msg) {
289 ap_rprintf(trans->apreq,
290 "Content-type: %s\n\n%s\n\n", JSON_CONTENT_TYPE, msg->body);
292 ap_rprintf(trans->apreq, "--%s--\n", trans->delim);
294 ap_rprintf(trans->apreq, "--%s\n", trans->delim);
// Flush so the client receives the part immediately.
295 ap_rflush(trans->apreq);
// Runs one request through the translator: validates the body, resolves the
// recipient, parses the request, sends it over the transport handle, then
// receives responses until trans->complete is set. Responses are streamed as
// multipart chunks or collected in trans->messages and joined at the end.
// Visible return codes: HTTP_BAD_REQUEST (no body, recipient resolution or
// parsing failed), HTTP_NOT_FOUND (transport error or message error code) and
// HTTP_GATEWAY_TIME_OUT. The success return and several guards are not in the
// visible lines.
298 static int osrfHttpTranslatorProcess(osrfHttpTranslator* trans) {
299 if(trans->body == NULL)
300 return HTTP_BAD_REQUEST;
302 if(!osrfHttpTranslatorSetTo(trans))
303 return HTTP_BAD_REQUEST;
305 if(!osrfHttpTranslatorParseRequest(trans))
306 return HTTP_BAD_REQUEST;
// NOTE(review): the messages returned while draining are dropped without being
// freed; if client_recv() hands ownership to the caller, each one leaks.
308 while(client_recv(trans->handle, 0))
309 continue; // discard any old status messages in the recv queue
311 // send the message to the recipient
312 transport_message* tmsg = message_init(
313 trans->body, NULL, trans->thread, trans->recipient, NULL);
314 client_send_message(trans->handle, tmsg);
// A DISCONNECT-only request does not wait for any response.
317 if(trans->disconnectOnly) {
318 osrfLogDebug(OSRF_LOG_MARK, "exiting early on disconnect");
322 // process the response from the opensrf service
324 while(!trans->complete) {
325 transport_message* msg = client_recv(trans->handle, trans->timeout);
327 if(trans->handle->error) {
328 osrfLogError(OSRF_LOG_MARK, "Transport error");
329 return HTTP_NOT_FOUND;
333 return HTTP_GATEWAY_TIME_OUT;
336 osrfLogError(OSRF_LOG_MARK, "XMPP message resulted in error code %d", msg->error_code);
337 return HTTP_NOT_FOUND;
340 if(!osrfHttpTranslatorCheckStatus(trans, msg))
// NOTE(review): the guard around the next two calls is not visible; presumably
// they run only for the first response -- confirm.
344 osrfHttpTranslatorInitHeaders(trans, msg);
345 osrfHttpTranslatorCacheSession(trans);
349 if(trans->multipart) {
350 osrfHttpTranslatorWriteChunk(trans, msg);
351 if(trans->connectOnly)
// NOTE(review): confirm that the list is created only once; if this line runs
// on every iteration, previously collected bodies are lost and the old list
// leaks. The list also stores the msg->body pointer, not a copy, so `msg` must
// outlive the join below.
355 trans->messages = osrfNewList();
356 osrfListPush(trans->messages, msg->body);
// Once finished (or for a connect-only request), the collected bodies are
// combined in a growing buffer, starting with the first one.
358 if(trans->complete || trans->connectOnly) {
359 growing_buffer* buf = buffer_init(128);
361 OSRF_BUFFER_ADD(buf, osrfListGetIndex(trans->messages, 0));
362 for(i = 1; i < trans->messages->size; i++) {
363 // yay! string mangling
// Checks that the module is connected to OpenSRF. If osrfConnected is zero or
// no transport client is available, the failure is written to both the OpenSRF
// log and the Apache error log, followed by a 0.1 second pause.
// NOTE(review): whatever follows the pause (presumably ending the child
// process, given the comment on the sleep) is not in the visible lines.
372 static void testConnection(request_rec* r) {
373 if(!osrfConnected || !osrfSystemGetTransportClient()) {
374 osrfLogError(OSRF_LOG_MARK, "We're not connected to OpenSRF");
375 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "We're not connected to OpenSRF");
376 usleep(100000); // .1 second to prevent process die/start overload
// Pool cleanup callback registered by childInit(): shuts down the OpenSRF
// system layer. `data` is not used in the visible lines, and the returned
// status is not visible either.
382 static apr_status_t childExit(void* data) {
383 osrf_system_shutdown();
// Per-child initialisation hook: bootstraps the OpenSRF client from
// configFile/configCtx, reads the router name and domain from the config,
// initialises the cache client and registers childExit() as a cleanup on `p`.
387 static void childInit(apr_pool_t *p, server_rec *s) {
388 if(!osrfSystemBootstrapClientResc(configFile, configCtx, "translator")) {
389 ap_log_error( APLOG_MARK, APLOG_ERR, 0, s,
390 "Unable to Bootstrap OpenSRF Client with config %s..", configFile);
394 routerName = osrfConfigGetValue(NULL, "/router_name");
395 domainName = osrfConfigGetValue(NULL, "/domain");
396 // ---------------------
397 // XXX initialize the cache from the Apache settings
// NOTE(review): the cache server is hard-coded to 127.0.0.1:11211. The
// meaning of the third osrfCacheInit() argument (86400) is not shown here;
// presumably a maximum cache lifetime in seconds -- confirm.
398 const char* servers[] = {"127.0.0.1:11211"};
399 osrfCacheInit(servers, 1, 86400);
400 // ---------------------
403 // at pool destroy time (= child exit time), cleanup
404 apr_pool_cleanup_register(p, NULL, childExit, NULL);
// Apache content handler. Declines requests for other handlers, returns early
// for header-only requests, marks GET and POST as allowed, then builds a
// translator for the request, processes it, logs the resulting status and
// frees the translator.
// NOTE(review): the declaration and initial value of `stat`, the condition
// guarding the "no message body" warning and the final return are not in the
// visible lines.
407 static int handler(request_rec *r) {
// strcmp() is non-zero when the configured handler is not this module.
409 if(strcmp(r->handler, MODULE_NAME)) return DECLINED;
410 if(r->header_only) return stat;
412 r->allowed |= (AP_METHOD_BIT << M_GET);
413 r->allowed |= (AP_METHOD_BIT << M_POST);
415 osrfLogSetAppname("osrf_http_translator");
418 osrfHttpTranslator* trans = osrfNewHttpTranslator(r);
420 stat = osrfHttpTranslatorProcess(trans);
421 osrfHttpTranslatorDebug(trans);
422 osrfLogInfo(OSRF_LOG_MARK, "translator resulted in status %d", stat);
424 osrfLogWarning(OSRF_LOG_MARK, "no message body to process");
426 osrfHttpTranslatorFree(trans);
// Registers the module's hooks: handler() as the request handler and
// childInit() as the child-initialisation hook, both at APR_HOOK_MIDDLE.
// The pool argument `p` is not used in the visible lines.
431 static void registerHooks (apr_pool_t *p) {
432 ap_hook_handler(handler, NULL, NULL, APR_HOOK_MIDDLE);
433 ap_hook_child_init(childInit, NULL, NULL, APR_HOOK_MIDDLE);
437 module AP_MODULE_DECLARE_DATA osrf_http_translator_module = {
438 STANDARD20_MODULE_STUFF,