From de238b740f3a54dacd07610d8e187f417bf5b677 Mon Sep 17 00:00:00 2001 From: Bill Erickson Date: Mon, 10 Dec 2012 14:33:14 -0500 Subject: [PATCH] LP#1268619: websocket; docs, more memory mgmt Signed-off-by: Bill Erickson Signed-off-by: Galen Charlton --- src/gateway/osrf_websocket_translator.c | 113 +++++++++++++++++------- 1 file changed, 79 insertions(+), 34 deletions(-) diff --git a/src/gateway/osrf_websocket_translator.c b/src/gateway/osrf_websocket_translator.c index 35f986d..6bf8066 100644 --- a/src/gateway/osrf_websocket_translator.c +++ b/src/gateway/osrf_websocket_translator.c @@ -15,14 +15,15 @@ */ /** - * Dumb websocket <-> opensrf gateway. Wrapped opensrf messages are extracted + * websocket <-> opensrf gateway. Wrapped opensrf messages are extracted * and relayed to the opensrf network. Responses are pulled from the opensrf - * network and passed back to the client. No attempt is made to understand - * the contents of the messages. + * network and passed back to the client. Messages are analyzed to determine + * when a connect/disconnect occurs, so that the cache of recipients can be + * properly managed. We also activity-log REQUEST messages. * * Messages to/from the websocket client take the following form: * { - * "service" : "opensrf.foo", // required for new sessions (inbound only) + * "service" : "opensrf.foo", // required * "thread" : "123454321", // AKA thread. required for follow-up requests; max 64 chars. * "log_xid" : "123..32", // optional log trace ID, max 64 chars; * "osrf_msg" : {} // required @@ -33,18 +34,19 @@ * network. The second thread collects responses from the opensrf network and * relays them back to the websocket client. * - * The main thread reads from socket A (apache) and writes to socket B - * (openesrf), while the responder thread reads from B and writes to A. The - * apr data structures used are threadsafe. For now, no thread mutex's are - * used. 
+ * After the initial setup, all thread actions occur within a thread mutex. + * The desired effect is a non-threaded application that uses threads for + * the sole purpose of having one thread listening for incoming data, while + * a second thread listens for responses. When either thread awakens, it's + * the only thread in town until it goes back to sleep (i.e. listening on + * its socket for data). * * Note that with a "thread", which allows us to identify the opensrf session, * the caller does not need to provide a recipient address. The "service" is * only required to start a new opensrf session. After the sesession is - * started, all future communication is based solely on the thread. - * - * We use jsonParseRaw and jsonObjectToJSONRaw since this service does not care - * about the contents of the messages. + * started, all future communication is based solely on the thread. However, + * the "service" should be passed by the caller for all requests to ensure it + * is properly logged in the activity log. */ /** @@ -70,22 +72,63 @@ #define WEBSOCKET_TRANSLATOR_INGRESS "ws-translator-v1" typedef struct _osrfWebsocketTranslator { + + /** Our handle for communicating with the caller */ const WebSocketServer *server; - apr_pool_t *main_pool; // standalone per-process pool - apr_pool_t *session_pool; // child of r->pool; per-session + + /** + * Standalone, per-process APR pool. Primarily + * there for managing thread data, which lasts + * the duration of the process. + */ + apr_pool_t *main_pool; + + /** + * Map of thread => drone-xmpp-address. Maintaining this + * map internally means the caller never need know about + * internal XMPP addresses and the server doesn't have to + * verify caller-specified recipient addresses. It's + * all managed internally. + */ apr_hash_t *session_cache; + + /** + * session_pool contains the key/value pairs stored in + * the session_cache. 
The pool is regularly destroyed + * and re-created to avoid long-term memory consumption + */ + apr_pool_t *session_pool; + + /** + * Thread responsible for collecting responses on the opensrf + * network and relaying them back to the caller + */ apr_thread_t *responder_thread; + + /** + * All message handling code is wrapped in a thread mutex such + * that all actions (after the initial setup) are serialized + * to minimize the possibility of multi-threading snafus. + */ apr_thread_mutex_t *mutex; + + /** + * True if a websocket client is currently connected + */ int client_connected; + + /** OpenSRF router name */ char* osrf_router; + + /** OpenSRF domain */ char* osrf_domain; + } osrfWebsocketTranslator; static osrfWebsocketTranslator *trans = NULL; static transport_client *osrf_handle = NULL; static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer - static void clear_cached_recipient(const char* thread) { apr_pool_t *pool = NULL; @@ -101,11 +144,11 @@ static void clear_cached_recipient(const char* thread) { // memory accumulates in the session_pool as sessions are cached then // un-cached. Un-caching removes strings from the hash, but not the - // pool itself. That only happens when the pool is destroyed. destroy - // the session pool to clear any lingering memory + // pool itself. That only happens when the pool is destroyed. Here + // we destroy the session pool to clear any lingering memory, then + // re-create it for future caching. 
apr_pool_destroy(trans->session_pool); - // create a standalone pool for our translator data if (apr_pool_create(&pool, NULL) != APR_SUCCESS) { osrfLogError(OSRF_LOG_MARK, "WS Unable to create session_pool"); trans->session_pool = NULL; @@ -118,7 +161,6 @@ static void clear_cached_recipient(const char* thread) { } - void* osrf_responder_thread_main_body(transport_message *tmsg) { osrfList *msg_list = NULL; @@ -175,7 +217,6 @@ void* osrf_responder_thread_main_body(transport_message *tmsg) { osrfListFree(msg_list); if (!trans->client_connected) { - // responses received after client disconnect are discarded osrfLogDebug(OSRF_LOG_MARK, "WS discarding response for thread=%s, xid=%s", @@ -185,7 +226,8 @@ void* osrf_responder_thread_main_body(transport_message *tmsg) { } - // client is still connected; relay the messages to the client + // client is still connected. + // relay the response messages to the client jsonObject *msg_wrapper = NULL; char *msg_string = NULL; @@ -196,16 +238,15 @@ void* osrf_responder_thread_main_body(transport_message *tmsg) { jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body)); if (tmsg->is_error) { - fprintf(stderr, + osrfLogError(OSRF_LOG_MARK, "WS received jabber error message in response to thread=%s and xid=%s", tmsg->thread, tmsg->osrf_xid); - fflush(stderr); jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1)); } msg_string = jsonObjectToJSONRaw(msg_wrapper); - // deliver the wrapped message json to the websocket client + // drop the JSON on the outbound wire trans->server->send(trans->server, MESSAGE_TYPE_TEXT, (unsigned char*) msg_string, strlen(msg_string)); @@ -249,7 +290,8 @@ void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *dat /** - * Allocate the session cache and create the responder thread + * Connect to OpenSRF, create the main pool, responder thread, + * session cache and session pool. 
*/ int child_init(const WebSocketServer *server) { @@ -341,14 +383,15 @@ void* CALLBACK on_connect_handler(const WebSocketServer *server) { //"WS connect from %s", r->connection->client_ip); // apache 2.4 if (!trans) { + // first connection if (child_init(server) != APR_SUCCESS) { return NULL; } } // create a standalone pool for the session cache values - // this pool will be destroyed and re-created regularly to - // clear session memory + // this pool will be destroyed and re-created regularly + // to clear session memory if (apr_pool_create(&pool, r->pool) != APR_SUCCESS) { osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool"); return NULL; @@ -577,24 +620,25 @@ static size_t CALLBACK on_message_handler(void *data, /** - * Release all memory allocated from the translator pool and kill the pool. + * Clear the session cache, release the session pool */ void CALLBACK on_disconnect_handler( void *data, const WebSocketServer *server) { osrfWebsocketTranslator *trans = (osrfWebsocketTranslator*) data; trans->client_connected = 0; + + // ensure no errant session data is sticking around + apr_hash_clear(trans->session_cache); - /* - It's not necessary to destroy our session_pool, since - it's a child of the apache request_rec pool, which is - destroyed after client disconnect. + // strictly speaking, this pool will get destroyed when + // r->pool is destroyed, but it doesn't hurt to explicitly + // destroy it ourselves. apr_pool_destroy(trans->session_pool); - */ - trans->session_pool = NULL; request_rec *r = server->request(server); + osrfLogDebug(OSRF_LOG_MARK, "WS disconnect from %s", r->connection->remote_ip); //"WS disconnect from %s", r->connection->client_ip); // apache 2.4 @@ -607,6 +651,7 @@ void CALLBACK on_destroy_handler(WebSocketPlugin *plugin) { if (trans) { apr_thread_exit(trans->responder_thread, APR_SUCCESS); apr_thread_mutex_destroy(trans->mutex); + apr_pool_destroy(trans->session_pool); apr_pool_destroy(trans->main_pool); } -- 2.43.2