From 6ec1c42840233c261cb368a6932be5047a89bfd1 Mon Sep 17 00:00:00 2001 From: erickson Date: Mon, 7 Jul 2008 15:40:05 +0000 Subject: [PATCH] initial C version of the opensrf http translator. not complete. I have not plugged it into the build env. git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@1359 9efc2488-bf62-4759-914b-345cdb29e865 --- src/gateway/osrf_http_translator.c | 460 +++++++++++++++++++++++++++++ 1 file changed, 460 insertions(+) create mode 100644 src/gateway/osrf_http_translator.c diff --git a/src/gateway/osrf_http_translator.c b/src/gateway/osrf_http_translator.c new file mode 100644 index 0000000..d78101e --- /dev/null +++ b/src/gateway/osrf_http_translator.c @@ -0,0 +1,460 @@ +#include +#include +#include +#include +#include "apachetools.h" +#include +#include +#include +#include +#include + +#define MODULE_NAME "osrf_http_translator_module" +#define TRANSLATOR_CONFIG_FILE "OSRFTranslatorConfig" +#define TRANSLATOR_CONFIG_CTX "OSRFTranslatorConfigContext" +#define TRANSLATOR_CACHE_SERVER "OSRFTranslatorCacheServer" +#define DEFAULT_TRANSLATOR_CONFIG_CTX "gateway" +#define DEFAULT_TRANSLATOR_CONFIG_FILE "/openils/conf/opensrf_core.xml" +#define DEFAULT_TRANSLATOR_TIMEOUT 1200 + +#define MULTIPART_CONTENT_TYPE "multipart/x-mixed-replace;boundary=\"%s\"" +#define JSON_CONTENT_TYPE "text/plain" +#define CACHE_TIME 300 + +#define OSRF_HTTP_HEADER_TO "X-OpenSRF-to" +#define OSRF_HTTP_HEADER_XID "X-OpenSRF-xid" +#define OSRF_HTTP_HEADER_FROM "X-OpenSRF-from" +#define OSRF_HTTP_HEADER_THREAD "X-OpenSRF-thread" +#define OSRF_HTTP_HEADER_TIMEOUT "X-OpenSRF-timeout" +#define OSRF_HTTP_HEADER_SERVICE "X-OpenSRF-service" +#define OSRF_HTTP_HEADER_MULTIPART "X-OpenSRF-multipart" +#define MAX_MSGS_PER_PACKET 256 + +char* configFile = DEFAULT_TRANSLATOR_CONFIG_FILE; +char* configCtx = DEFAULT_TRANSLATOR_CONFIG_CTX; +char* routerName = NULL; +char* domainName = NULL; +int osrfConnected = 0; +char recipientBuf[128]; + +// for development only, writes to apache error log +static void _dbg(char* s, ...) { + VA_LIST_TO_STRING(s); + fprintf(stderr, "%s\n", VA_BUF); + fflush(stderr); +} + +// Translator struct +typedef struct { + request_rec* apreq; + transport_client* handle; + osrfList* messages; + char* body; + char* delim; + const char* recipient; + const char* service; + const char* thread; + const char* remoteHost; + int complete; + int timeout; + int multipart; + int connectOnly; + int disconnectOnly; + int localXid; +} osrfHttpTranslator; + +/* + * Constructs a new translator object based on the current apache + * request_rec. Reads the request body and headers. + */ +static osrfHttpTranslator* osrfNewHttpTranslator(request_rec* apreq) { + osrfHttpTranslator* trans; + OSRF_MALLOC(trans, sizeof(trans)); + trans->apreq = apreq; + trans->complete = 0; + trans->connectOnly = 0; + trans->disconnectOnly = 0; + trans->remoteHost = apreq->connection->remote_ip; + trans->messages = NULL; + + /* load the message body */ + osrfStringArray* params = apacheParseParms(apreq); + trans->body = apacheGetFirstParamValue(params, "osrf-msg"); + osrfStringArrayFree(params); + + /* load the request headers */ + if (apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_XID) ) { + trans->localXid = 0; + } else { + trans->localXid = 1; + } + + trans->handle = osrfSystemGetTransportClient(); + trans->recipient = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_TO); + trans->service = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_SERVICE); + trans->thread = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_THREAD); /* XXX create thread if necessary */ + + const char* timeout = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_TIMEOUT); + if(timeout) + trans->timeout = atoi(timeout); + else + trans->timeout = DEFAULT_TRANSLATOR_TIMEOUT; + + const char* multipart = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_MULTIPART); + if(multipart && !strcasecmp(multipart, "true")) + trans->multipart = 1; + else + trans->multipart = 0; + + char buf[32]; + snprintf(buf, sizeof(buf), "%d%ld", getpid(), time(NULL)); + trans->delim = md5sum(buf); + + return trans; +} + +static void osrfHttpTranslatorFree(osrfHttpTranslator* trans) { + if(!trans) return; + if(trans->body) + free(trans->body); + if(trans->delim) + free(trans->delim); + osrfListFree(trans->messages); + //free(trans); // why does freeing this cause Apache to croak?? +} + +static void osrfHttpTranslatorDebug(osrfHttpTranslator* trans) { + _dbg("-----------------------------------"); + _dbg("body = %s", trans->body); + _dbg("service = %s", trans->service); + _dbg("thread = %s", trans->thread); + _dbg("multipart = %d", trans->multipart); + _dbg("recipient = %s", trans->recipient); +} + +/** + * Determines the correct recipient address based on the requested + * service or recipient address. + */ +static int osrfHttpTranslatorSetTo(osrfHttpTranslator* trans) { + int stat = 0; + jsonObject* sessionCache = NULL; + + if(trans->service) { + if(trans->recipient) { + osrfLogError(OSRF_LOG_MARK, "Specifying both SERVICE and TO are not allowed"); + + } else { + // service is specified, build a recipient address + // from the router, domain, and service + int size = snprintf(recipientBuf, 128, "%s@%s/%s", routerName, domainName, trans->service); + recipientBuf[size] = '\0'; + osrfLogDebug(OSRF_LOG_MARK, "Set recipient to %s", recipientBuf); + trans->recipient = recipientBuf; + stat = 1; + } + + } else { + + if(trans->recipient) { + sessionCache = osrfCacheGetObject(trans->thread); + + if(sessionCache) { + char* ipAddr = jsonObjectGetString(jsonObjectGetKey(sessionCache, "ip")); + char* recipient = jsonObjectGetString(jsonObjectGetKey(sessionCache, "jid")); + + // choosing a specific recipient address requires that the recipient and + // thread be cached on the server (so drone processes cannot be hijacked) + if(!strcmp(ipAddr, trans->remoteHost) && !strcmp(recipient, trans->recipient)) { + osrfLogDebug(OSRF_LOG_MARK, "Found cached session from host %s and recipient %s", + trans->remoteHost, trans->recipient); + jsonObjectFree(sessionCache); + stat = 1; + + // XXX free me + trans->service = jsonObjectGetString(jsonObjectGetKey(sessionCache, "service")); + + } else { + osrfLogError(OSRF_LOG_MARK, + "Session cache for thread %s does not match request", trans->thread); + } + + free(ipAddr); + free(recipient); + + } else { + osrfLogError(OSRF_LOG_MARK, + "attempt to send directly to %s without a session", trans->recipient); + } + } else { + osrfLogError(OSRF_LOG_MARK, "No SERVICE or RECIPIENT defined"); + } + } + + jsonObjectFree(sessionCache); + return stat; +} + +/** + * Parses the request body and logs any REQUEST messages to the activity log + */ +static int osrfHttpTranslatorParseRequest(osrfHttpTranslator* trans) { + osrfMessage* msg; + osrfMessage* msgList[MAX_MSGS_PER_PACKET]; + int numMsgs = osrf_message_deserialize(trans->body, msgList, MAX_MSGS_PER_PACKET); + osrfLogDebug(OSRF_LOG_MARK, "parsed %d opensrf messages in this packet", numMsgs); + + if(numMsgs == 0) + return 0; + + if(numMsgs == 1) { + msg = msgList[0]; + if(msg->m_type == CONNECT) { + trans->connectOnly = 1; + return 1; + } + if(msg->m_type == DISCONNECT) { + trans->disconnectOnly = 1; + return 1; + } + } + + // log request messages to the activity log + int i; + for(i = 0; i < numMsgs; i++) { + msg = msgList[i]; + if(msg->m_type == REQUEST) { + + jsonObject* params = msg->_params; + growing_buffer* act = buffer_init(128); + buffer_fadd(act, "[%s] [%s] %s %s", trans->remoteHost, "", trans->service, msg->method_name); + + char* str; + int i = 0; + while((str = jsonObjectGetString(jsonObjectGetIndex(params, i++)))) { + if( i == 1 ) + OSRF_BUFFER_ADD(act, " "); + else + OSRF_BUFFER_ADD(act, ", "); + OSRF_BUFFER_ADD(act, str); + } + osrfLogActivity(OSRF_LOG_MARK, act->buf); + buffer_free(act); + } + } + + return 1; +} + +static int osrfHttpTranslatorCheckStatus(osrfHttpTranslator* trans, transport_message* msg) { + osrfMessage* omsgList[MAX_MSGS_PER_PACKET]; + int numMsgs = osrf_message_deserialize(msg->body, omsgList, MAX_MSGS_PER_PACKET); + osrfLogDebug(OSRF_LOG_MARK, "parsed %d response messages", numMsgs); + if(numMsgs == 0) return 0; + + osrfMessage* last = omsgList[numMsgs-1]; + if(last->m_type == STATUS) { + if(last->status_code == OSRF_STATUS_TIMEOUT) { + osrfLogDebug(OSRF_LOG_MARK, "removing cached session on request timeout"); + osrfCacheRemove(trans->thread); + return 0; + } + + if(last->status_code != OSRF_STATUS_CONTINUE) + trans->complete = 1; + } + + return 1; +} + +static void osrfHttpTranslatorInitHeaders(osrfHttpTranslator* trans, transport_message* msg) { + apr_table_set(trans->apreq->headers_out, OSRF_HTTP_HEADER_FROM, msg->sender); + apr_table_set(trans->apreq->headers_out, OSRF_HTTP_HEADER_THREAD, trans->thread); + if(trans->multipart) { + char buf[strlen(MULTIPART_CONTENT_TYPE) + strlen(trans->delim) + 1]; + sprintf(buf, MULTIPART_CONTENT_TYPE, trans->delim); + ap_set_content_type(trans->apreq, buf); + ap_rvputs(trans->apreq, "--%s\n", trans->delim); + } else { + ap_set_content_type(trans->apreq, JSON_CONTENT_TYPE); + } +} + +static void osrfHttpTranslatorCacheSession(osrfHttpTranslator* trans) { + jsonObject* cacheObj = jsonNewObject(NULL); + jsonObjectSetKey(cacheObj, "ip", jsonNewObject(trans->remoteHost)); + jsonObjectSetKey(cacheObj, "jid", jsonNewObject(trans->recipient)); + jsonObjectSetKey(cacheObj, "service", jsonNewObject(trans->service)); + osrfCachePutObject((char*) trans->thread, cacheObj, CACHE_TIME); +} + + +/** + * Writes a single chunk of multipart/x-mixed-replace content + */ +static void osrfHttpTranslatorWriteChunk(osrfHttpTranslator* trans, transport_message* msg) { + ap_rvputs(trans->apreq, + "Content-type: %s\n\n%s\n\n", JSON_CONTENT_TYPE, msg->body); + if(trans->complete) + ap_rvputs(trans->apreq, "--%s--\n", trans->delim); + else + ap_rvputs(trans->apreq, "--%s\n", trans->delim); + ap_rflush(trans->apreq); +} + +static int osrfHttpTranslatorProcess(osrfHttpTranslator* trans) { + if(trans->body == NULL) + return HTTP_BAD_REQUEST; + + if(!osrfHttpTranslatorSetTo(trans)) + return HTTP_BAD_REQUEST; + + if(!osrfHttpTranslatorParseRequest(trans)) + return HTTP_BAD_REQUEST; + + while(client_recv(trans->handle, 0)) + continue; // discard any old status messages in the recv queue + + // send the message to the recipient + transport_message* tmsg = message_init( + trans->body, NULL, trans->thread, trans->recipient, NULL); + client_send_message(trans->handle, tmsg); + message_free(tmsg); + + if(trans->disconnectOnly) { + osrfLogDebug(OSRF_LOG_MARK, "exiting early on disconnect"); + return OK; + } + + // process the response from the opensrf service + int firstWrite = 1; + while(!trans->complete) { + osrfLogDebug(OSRF_LOG_MARK, "1"); + transport_message* msg = client_recv(trans->handle, trans->timeout); + osrfLogDebug(OSRF_LOG_MARK, "2"); + + if(trans->handle->error) { + osrfLogError(OSRF_LOG_MARK, "Transport error"); + return HTTP_NOT_FOUND; + } + + if(msg == NULL) + return HTTP_GATEWAY_TIME_OUT; + + if(msg->is_error) { + osrfLogError(OSRF_LOG_MARK, "XMPP message resulted in error code %d", msg->error_code); + return HTTP_NOT_FOUND; + } + + if(!osrfHttpTranslatorCheckStatus(trans, msg)) + continue; + + if(firstWrite) { + osrfHttpTranslatorInitHeaders(trans, msg); + osrfHttpTranslatorCacheSession(trans); + firstWrite = 0; + } + + if(trans->multipart) { + osrfHttpTranslatorWriteChunk(trans, msg); + if(trans->connectOnly) + break; + } else { + if(!trans->messages) + trans->messages = osrfNewList(); + osrfListPush(trans->messages, msg->body); + + if(trans->complete || trans->connectOnly) { + growing_buffer* buf = buffer_init(128); + int i; + OSRF_BUFFER_ADD(buf, osrfListGetIndex(trans->messages, 0)); + for(i = 1; i < trans->messages->size; i++) { + // yay! string mangling + } + } + } + } + + return OK; +} + +static void testConnection(request_rec* r) { + if(!osrfConnected || !osrfSystemGetTransportClient()) { + ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "We're not connected to OpenSRF"); + usleep(100000); // .1 second to prevent process die/start overload + exit(1); + } +} + +// it's dead, Jim +static apr_status_t childExit(void* data) { + osrf_system_shutdown(); + return OK; +} + +static void childInit(apr_pool_t *p, server_rec *s) { + if(!osrfSystemBootstrapClientResc(configFile, configCtx, "translator")) { + ap_log_error( APLOG_MARK, APLOG_ERR, 0, s, + "Unable to Bootstrap OpenSRF Client with config %s..", configFile); + return; + } + + routerName = osrfConfigGetValue(NULL, "/router_name"); + domainName = osrfConfigGetValue(NULL, "/domain"); + // --------------------- + // XXX initialize the cache from the Apache settings + const char* servers[] = {"127.0.0.1:11211"}; + osrfCacheInit(servers, 1, 86400); + // --------------------- + osrfConnected = 1; + + // at pool destroy time (= child exit time), cleanup + apr_pool_cleanup_register(p, NULL, childExit, NULL); +} + +static int handler(request_rec *r) { + int stat = OK; + if(strcmp(r->handler, MODULE_NAME)) return DECLINED; + if(r->header_only) return stat; + + r->allowed |= (AP_METHOD_BIT << M_GET); + r->allowed |= (AP_METHOD_BIT << M_POST); + + osrfLogSetAppname("osrf_http_translator"); + testConnection(r); + + osrfHttpTranslator* trans = osrfNewHttpTranslator(r); + if(trans->body) { + stat = osrfHttpTranslatorProcess(trans); + osrfHttpTranslatorDebug(trans); + + osrfLogInfo(OSRF_LOG_MARK, "translator resulted in status %d", stat); + + ap_rputs(trans->body, r); + } else { + osrfLogWarning(OSRF_LOG_MARK, "no message body to process"); + } + osrfHttpTranslatorFree(trans); + return stat; +} + + +static void registerHooks (apr_pool_t *p) { + ap_hook_handler(handler, NULL, NULL, APR_HOOK_MIDDLE); + ap_hook_child_init(childInit, NULL, NULL, APR_HOOK_MIDDLE); +} + + +module AP_MODULE_DECLARE_DATA osrf_http_translator_module = { + STANDARD20_MODULE_STUFF, + NULL, + NULL, + NULL, + NULL, + NULL, + registerHooks, +}; + + + + -- 2.43.2