From f990a29db95d9b1c06efa22c1b3f4fbc43206571 Mon Sep 17 00:00:00 2001 From: Bill Erickson Date: Mon, 12 Nov 2012 16:46:19 -0500 Subject: [PATCH] LP#1268619: websocket translator * starting packet inspection * activity log; recipient removal * only cache connected recipients; use request_rec pool for session_pool parent * wrap all thread work in mutex * session memory goodness Signed-off-by: Bill Erickson Signed-off-by: Galen Charlton --- src/gateway/osrf_websocket_translator.c | 350 +++++++++++++++++++----- 1 file changed, 278 insertions(+), 72 deletions(-) diff --git a/src/gateway/osrf_websocket_translator.c b/src/gateway/osrf_websocket_translator.c index b6205d8..35f986d 100644 --- a/src/gateway/osrf_websocket_translator.c +++ b/src/gateway/osrf_websocket_translator.c @@ -67,13 +67,15 @@ #define MAX_THREAD_SIZE 64 #define RECIP_BUF_SIZE 128 +#define WEBSOCKET_TRANSLATOR_INGRESS "ws-translator-v1" typedef struct _osrfWebsocketTranslator { const WebSocketServer *server; apr_pool_t *main_pool; // standalone per-process pool - apr_pool_t *session_pool; // child of trans->main_pool; per-session + apr_pool_t *session_pool; // child of r->pool; per-session apr_hash_t *session_cache; apr_thread_t *responder_thread; + apr_thread_mutex_t *mutex; int client_connected; char* osrf_router; char* osrf_domain; @@ -84,80 +86,168 @@ static transport_client *osrf_handle = NULL; static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer -/** - * Responder thread main body. - * Collects responses from the opensrf network and relays them to the - * websocket caller. - */ -void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) { +static void clear_cached_recipient(const char* thread) { + apr_pool_t *pool = NULL; - transport_message *tmsg; - jsonObject *msg_wrapper; - char *msg_string; + if (apr_hash_get(trans->session_cache, thread, APR_HASH_KEY_STRING)) { - while (1) { + osrfLogDebug(OSRF_LOG_MARK, "WS removing cached recipient on disconnect"); - tmsg = client_recv(osrf_handle, -1); + // remove it from the hash + apr_hash_set(trans->session_cache, thread, APR_HASH_KEY_STRING, NULL); - if (!tmsg) continue; // early exit on interrupt - - // discard responses received after client disconnect - if (!trans->client_connected) { - osrfLogDebug(OSRF_LOG_MARK, - "WS discarding response for thread=%s, xid=%s", - tmsg->thread, tmsg->osrf_xid); - message_free(tmsg); - continue; + if (apr_hash_count(trans->session_cache) == 0) { + osrfLogDebug(OSRF_LOG_MARK, "WS re-setting session_pool"); + + // memory accumulates in the session_pool as sessions are cached then + // un-cached. Un-caching removes strings from the hash, but not the + // pool itself. That only happens when the pool is destroyed. destroy + // the session pool to clear any lingering memory + apr_pool_destroy(trans->session_pool); + + // create a standalone pool for our translator data + if (apr_pool_create(&pool, NULL) != APR_SUCCESS) { + osrfLogError(OSRF_LOG_MARK, "WS Unable to create session_pool"); + trans->session_pool = NULL; + return; + } + + trans->session_pool = pool; } + } +} + + + +void* osrf_responder_thread_main_body(transport_message *tmsg) { + osrfList *msg_list = NULL; + osrfMessage *one_msg = NULL; + int i; + + osrfLogDebug(OSRF_LOG_MARK, + "WS received opensrf response for thread=%s, xid=%s", + tmsg->thread, tmsg->osrf_xid); + + // first we need to perform some maintenance + msg_list = osrfMessageDeserialize(tmsg->body, NULL); + + for (i = 0; i < msg_list->size; i++) { + one_msg = OSRF_LIST_GET_INDEX(msg_list, i); osrfLogDebug(OSRF_LOG_MARK, - "WS received opensrf response for thread=%s, xid=%s", - tmsg->thread, tmsg->osrf_xid); - - // build the wrapper object - msg_wrapper = jsonNewObject(NULL); - jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread)); - jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid)); - jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body)); - - if (tmsg->is_error) { - fprintf(stderr, - "WS received jabber error message in response to thread=%s and xid=%s", - tmsg->thread, tmsg->osrf_xid); - fflush(stderr); - jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1)); + "WS returned response of type %d", one_msg->m_type); + + /* if our client just successfully connected to an opensrf service, + cache the sender so that future calls on this thread will use + the correct recipient. */ + if (one_msg && one_msg->m_type == STATUS) { + + + // only cache recipients if the client is still connected + if (trans->client_connected && + one_msg->status_code == OSRF_STATUS_OK) { + + if (!apr_hash_get(trans->session_cache, + tmsg->thread, APR_HASH_KEY_STRING)) { + + osrfLogDebug(OSRF_LOG_MARK, + "WS caching sender thread=%s, sender=%s", + tmsg->thread, tmsg->sender); + + apr_hash_set(trans->session_cache, + apr_pstrdup(trans->session_pool, tmsg->thread), + APR_HASH_KEY_STRING, + apr_pstrdup(trans->session_pool, tmsg->sender)); + } + + } else { + + // connection timed out; clear the cached recipient + // regardless of whether the client is still connected + if (one_msg->status_code == OSRF_STATUS_TIMEOUT) + clear_cached_recipient(tmsg->thread); + } } + } + + // maintenance is done + osrfListFree(msg_list); + + if (!trans->client_connected) { + // responses received after client disconnect are discarded + + osrfLogDebug(OSRF_LOG_MARK, + "WS discarding response for thread=%s, xid=%s", + tmsg->thread, tmsg->osrf_xid); + + return; + } + + + // client is still connected; relay the messages to the client + jsonObject *msg_wrapper = NULL; + char *msg_string = NULL; + + // build the wrapper object + msg_wrapper = jsonNewObject(NULL); + jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread)); + jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid)); + jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body)); + + if (tmsg->is_error) { + fprintf(stderr, + "WS received jabber error message in response to thread=%s and xid=%s", + tmsg->thread, tmsg->osrf_xid); + fflush(stderr); + jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1)); + } + + msg_string = jsonObjectToJSONRaw(msg_wrapper); + + // deliver the wrapped message json to the websocket client + trans->server->send(trans->server, MESSAGE_TYPE_TEXT, + (unsigned char*) msg_string, strlen(msg_string)); + + free(msg_string); + jsonObjectFree(msg_wrapper); + +} + +/** + * Responder thread main body. + * Collects responses from the opensrf network and relays them to the + * websocket caller. + */ +void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) { - msg_string = jsonObjectToJSONRaw(msg_wrapper); + transport_message *tmsg; + while (1) { - // deliver the wrapped message json to the websocket client - trans->server->send(trans->server, MESSAGE_TYPE_TEXT, - (unsigned char*) msg_string, strlen(msg_string)); + if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) { + osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex"); + return NULL; + } - // capture the true message sender - // TODO: this will grow to add one entry per client session. - // need to ensure that connected-sessions don't last /too/ long or create - // a last-touched timeout mechanism to periodically remove old entries - if (!apr_hash_get(trans->session_cache, tmsg->thread, APR_HASH_KEY_STRING)) { + // wait for a response + tmsg = client_recv(osrf_handle, -1); - osrfLogDebug(OSRF_LOG_MARK, - "WS caching sender thread=%s, sender=%s", tmsg->thread, tmsg->sender); + if (!tmsg) continue; // early exit on interrupt - apr_hash_set(trans->session_cache, - apr_pstrdup(trans->session_pool, tmsg->thread), - APR_HASH_KEY_STRING, - apr_pstrdup(trans->session_pool, tmsg->sender)); + if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) { + osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex"); + return NULL; } - free(msg_string); - jsonObjectFree(msg_wrapper); + osrf_responder_thread_main_body(tmsg); message_free(tmsg); } return NULL; } + + /** * Allocate the session cache and create the responder thread */ @@ -166,6 +256,7 @@ int child_init(const WebSocketServer *server) { apr_pool_t *pool = NULL; apr_thread_t *thread = NULL; apr_threadattr_t *thread_attr = NULL; + apr_thread_mutex_t *mutex = NULL; request_rec *r = server->request(server); osrfLogDebug(OSRF_LOG_MARK, "WS child_init"); @@ -205,6 +296,13 @@ int child_init(const WebSocketServer *server) { trans->osrf_router = osrfConfigGetValue(NULL, "/router_name"); trans->osrf_domain = osrfConfigGetValue(NULL, "/domain"); + trans->session_cache = apr_hash_make(pool); + + if (trans->session_cache == NULL) { + osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache"); + return 1; + } + // Create the responder thread. Once created, // it runs for the lifetime of this process. if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) && @@ -219,6 +317,15 @@ int child_init(const WebSocketServer *server) { return 1; } + if (apr_thread_mutex_create( + &mutex, APR_THREAD_MUTEX_UNNESTED, + trans->main_pool) != APR_SUCCESS) { + osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex"); + return 1; + } + + trans->mutex = mutex; + return APR_SUCCESS; } @@ -239,31 +346,104 @@ void* CALLBACK on_connect_handler(const WebSocketServer *server) { } } - // create a standalone pool for the session cache values, which will be - // destroyed on client disconnect. - if (apr_pool_create(&pool, trans->main_pool) != APR_SUCCESS) { + // create a standalone pool for the session cache values + // this pool will be destroyed and re-created regularly to + // clear session memory + if (apr_pool_create(&pool, r->pool) != APR_SUCCESS) { osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool"); return NULL; } trans->session_pool = pool; - trans->session_cache = apr_hash_make(trans->session_pool); - - if (trans->session_cache == NULL) { - osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache"); - return NULL; - } - trans->client_connected = 1; return trans; } +/** + * for each inbound opensrf message: + * 1. Stamp the ingress + * 2. REQUEST: log it as activity + * 3. DISCONNECT: remove the cached recipient + * then re-string-ify for xmpp delivery + */ + +static char* extract_inbound_messages( + const request_rec *r, + const char* service, + const char* thread, + const char* recipient, + const jsonObject *osrf_msg) { + + int i; + int num_msgs = osrf_msg->size; + osrfMessage* msg; + osrfMessage* msg_list[num_msgs]; + + // here we do an extra json round-trip to get the data + // in a form osrf_message_deserialize can understand + char *osrf_msg_json = jsonObjectToJSON(osrf_msg); + osrf_message_deserialize(osrf_msg_json, msg_list, num_msgs); + free(osrf_msg_json); + + // should we require the caller to always pass the service? + if (service == NULL) service = ""; + + for(i = 0; i < num_msgs; i++) { + msg = msg_list[i]; + osrfMessageSetIngress(msg, WEBSOCKET_TRANSLATOR_INGRESS); + + switch(msg->m_type) { + + case REQUEST: { + const jsonObject* params = msg->_params; + growing_buffer* act = buffer_init(128); + char* method = msg->method_name; + buffer_fadd(act, "[%s] [%s] %s %s", + r->connection->remote_ip, "", service, method); + + const jsonObject* obj = NULL; + int i = 0; + const char* str; + int redactParams = 0; + while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) { + if(!strncmp(method, str, strlen(str))) { + redactParams = 1; + break; + } + } + if(redactParams) { + OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**"); + } else { + i = 0; + while((obj = jsonObjectGetIndex(params, i++))) { + char* str = jsonObjectToJSON(obj); + if( i == 1 ) + OSRF_BUFFER_ADD(act, " "); + else + OSRF_BUFFER_ADD(act, ", "); + OSRF_BUFFER_ADD(act, str); + free(str); + } + } + osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf); + buffer_free(act); + break; + } + + case DISCONNECT: + clear_cached_recipient(thread); + break; + } + } + + return osrfMessageSerializeBatch(msg_list, num_msgs); +} /** * Parse opensrf request and relay the request to the opensrf network. */ -static size_t CALLBACK on_message_handler(void *data, +static size_t on_message_handler_body(void *data, const WebSocketServer *server, const int type, unsigned char *buffer, const size_t buffer_size) { @@ -277,6 +457,7 @@ static size_t CALLBACK on_message_handler(void *data, const char *log_xid = NULL; char *msg_body = NULL; char *recipient = NULL; + int i; if (buffer_size <= 0) return OK; @@ -287,7 +468,7 @@ static size_t CALLBACK on_message_handler(void *data, memcpy(buf, buffer, buffer_size); buf[buffer_size] = '\0'; - msg_wrapper = jsonParseRaw(buf); + msg_wrapper = jsonParse(buf); if (msg_wrapper == NULL) { osrfLogWarning(OSRF_LOG_MARK, "WS Invalid JSON: %s", buf); @@ -353,20 +534,21 @@ static size_t CALLBACK on_message_handler(void *data, } } - // TODO: activity log entry? -- requires message analysis osrfLogDebug(OSRF_LOG_MARK, "WS relaying message thread=%s, xid=%s, recipient=%s", thread, osrfLogGetXid(), recipient); - msg_body = jsonObjectToJSONRaw(osrf_msg); + msg_body = extract_inbound_messages( + r, service, thread, recipient, osrf_msg); transport_message *tmsg = message_init( msg_body, NULL, thread, recipient, NULL); - message_set_osrf_xid(tmsg, osrfLogGetXid()); - client_send_message(osrf_handle, tmsg); - osrfLogClearXid(); + message_set_osrf_xid(tmsg, osrfLogGetXid()); + client_send_message(osrf_handle, tmsg); + + osrfLogClearXid(); message_free(tmsg); jsonObjectFree(msg_wrapper); free(msg_body); @@ -374,6 +556,25 @@ static size_t CALLBACK on_message_handler(void *data, return OK; } +static size_t CALLBACK on_message_handler(void *data, + const WebSocketServer *server, const int type, + unsigned char *buffer, const size_t buffer_size) { + + if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) { + osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex"); + return 1; // TODO: map to apr_status_t value? + } + + apr_status_t stat = on_message_handler_body(data, server, type, buffer, buffer_size); + + if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) { + osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex"); + return 1; + } + + return stat; +} + /** * Release all memory allocated from the translator pool and kill the pool. @@ -384,10 +585,14 @@ void CALLBACK on_disconnect_handler( osrfWebsocketTranslator *trans = (osrfWebsocketTranslator*) data; trans->client_connected = 0; - apr_hash_clear(trans->session_cache); + /* + It's not necessary to destroy our session_pool, since + it's a child of the apache request_rec pool, which is + destroyed after client disconnect. apr_pool_destroy(trans->session_pool); + */ + trans->session_pool = NULL; - trans->session_cache = NULL; request_rec *r = server->request(server); osrfLogDebug(OSRF_LOG_MARK, @@ -401,6 +606,7 @@ void CALLBACK on_disconnect_handler( void CALLBACK on_destroy_handler(WebSocketPlugin *plugin) { if (trans) { apr_thread_exit(trans->responder_thread, APR_SUCCESS); + apr_thread_mutex_destroy(trans->mutex); apr_pool_destroy(trans->main_pool); } -- 2.43.2