LP#1268619: websocket; docs, more memory mgmt
authorBill Erickson <berick@esilibrary.com>
Mon, 10 Dec 2012 19:33:14 +0000 (14:33 -0500)
committerGalen Charlton <gmc@esilibrary.com>
Tue, 19 Aug 2014 22:50:47 +0000 (15:50 -0700)
Signed-off-by: Bill Erickson <berick@esilibrary.com>
Signed-off-by: Galen Charlton <gmc@esilibrary.com>
src/gateway/osrf_websocket_translator.c

index 35f986d..6bf8066 100644 (file)
  */
 
 /**
  */
 
 /**
- * Dumb websocket <-> opensrf gateway.  Wrapped opensrf messages are extracted
+ * websocket <-> opensrf gateway.  Wrapped opensrf messages are extracted
  * and relayed to the opensrf network.  Responses are pulled from the opensrf
  * and relayed to the opensrf network.  Responses are pulled from the opensrf
- * network and passed back to the client.  No attempt is made to understand
- * the contents of the messages.
+ * network and passed back to the client.  Messages are analyzed to determine
+ * when a connect/disconnect occurs, so that the cache of recipients can be
+ * properly managed.  We also activity-log REQUEST messages.
  *
  * Messages to/from the websocket client take the following form:
  * {
  *
  * Messages to/from the websocket client take the following form:
  * {
- *   "service"  : "opensrf.foo", // required for new sessions (inbound only)
+ *   "service"  : "opensrf.foo", // required
  *   "thread"   : "123454321",   // AKA thread. required for follow-up requests; max 64 chars.
  *   "log_xid"  : "123..32",     // optional log trace ID, max 64 chars;
  *   "osrf_msg" : {<osrf_msg>}   // required
  *   "thread"   : "123454321",   // AKA thread. required for follow-up requests; max 64 chars.
  *   "log_xid"  : "123..32",     // optional log trace ID, max 64 chars;
  *   "osrf_msg" : {<osrf_msg>}   // required
  * network. The second thread collects responses from the opensrf network and 
  * relays them back to the websocket client.
  *
  * network. The second thread collects responses from the opensrf network and 
  * relays them back to the websocket client.
  *
- * The main thread reads from socket A (apache) and writes to socket B 
- * (openesrf), while the responder thread reads from B and writes to A.  The 
- * apr data structures used are threadsafe.  For now, no thread mutex's are 
- * used.
+ * After the initial setup, all thread actions occur within a thread mutex.
+ * The desired affect is a non-threaded application that uses threads for 
+ * the sole purpose of having one thread listening for incoming data, while
+ * a second thread listens for responses.  When either thread awakens, it's
+ * the only thread in town until it goes back to sleep (i.e. listening on 
+ * its socket for data).
  *
  * Note that with a "thread", which allows us to identify the opensrf session,
  * the caller does not need to provide a recipient address.  The "service" is
  * only required to start a new opensrf session.  After the sesession is 
  *
  * Note that with a "thread", which allows us to identify the opensrf session,
  * the caller does not need to provide a recipient address.  The "service" is
  * only required to start a new opensrf session.  After the sesession is 
- * started, all future communication is based solely on the thread.  
- *
- * We use jsonParseRaw and jsonObjectToJSONRaw since this service does not care 
- * about the contents of the messages.
+ * started, all future communication is based solely on the thread.  However,
+ * the "service" should be passed by the caller for all requests to ensure it
+ * is properly logged in the activity log.
  */
 
 /**
  */
 
 /**
 #define WEBSOCKET_TRANSLATOR_INGRESS "ws-translator-v1"
 
 typedef struct _osrfWebsocketTranslator {
 #define WEBSOCKET_TRANSLATOR_INGRESS "ws-translator-v1"
 
 typedef struct _osrfWebsocketTranslator {
+
+    /** Our handle for communicating with the caller */
     const WebSocketServer *server;
     const WebSocketServer *server;
-    apr_pool_t *main_pool; // standalone per-process pool
-    apr_pool_t *session_pool; // child of r->pool; per-session
+    
+    /**
+     * Standalone, per-process APR pool.  Primarily
+     * there for managing thread data, which lasts 
+     * the duration of the process.
+     */
+    apr_pool_t *main_pool;
+
+    /**
+     * Map of thread => drone-xmpp-address.  Maintaining this
+     * map internally means the caller never need know about
+     * internal XMPP addresses and the server doesn't have to 
+     * verify caller-specified recipient addresses.  It's
+     * all managed internally.
+     */
     apr_hash_t *session_cache; 
     apr_hash_t *session_cache; 
+
+    /**
+     * session_pool contains the key/value pairs stored in
+     * the session_cache.  The pool is regularly destroyed
+     * and re-created to avoid long-term memory consumption
+     */
+    apr_pool_t *session_pool;
+
+    /**
+     * Thread responsible for collecting responses on the opensrf
+     * network and relaying them back to the caller
+     */
     apr_thread_t *responder_thread;
     apr_thread_t *responder_thread;
+
+    /**
+     * All message handling code is wrapped in a thread mutex such
+     * that all actions (after the initial setup) are serialized
+     * to minimize the possibility of multi-threading snafus.
+     */
     apr_thread_mutex_t *mutex;
     apr_thread_mutex_t *mutex;
+
+    /**
+     * True if a websocket client is currently connected
+     */
     int client_connected;
     int client_connected;
+
+    /** OpenSRF jouter name */
     char* osrf_router;
     char* osrf_router;
+
+    /** OpenSRF domain */
     char* osrf_domain;
     char* osrf_domain;
+
 } osrfWebsocketTranslator;
 
 static osrfWebsocketTranslator *trans = NULL;
 static transport_client *osrf_handle = NULL;
 static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
 
 } osrfWebsocketTranslator;
 
 static osrfWebsocketTranslator *trans = NULL;
 static transport_client *osrf_handle = NULL;
 static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
 
-
 static void clear_cached_recipient(const char* thread) {
     apr_pool_t *pool = NULL;                                                
 
 static void clear_cached_recipient(const char* thread) {
     apr_pool_t *pool = NULL;                                                
 
@@ -101,11 +144,11 @@ static void clear_cached_recipient(const char* thread) {
 
             // memory accumulates in the session_pool as sessions are cached then 
             // un-cached.  Un-caching removes strings from the hash, but not the 
 
             // memory accumulates in the session_pool as sessions are cached then 
             // un-cached.  Un-caching removes strings from the hash, but not the 
-            // pool itself.  That only happens when the pool is destroyed. destroy 
-            // the session pool to clear any lingering memory
+            // pool itself.  That only happens when the pool is destroyed. Here
+            // we destroy the session pool to clear any lingering memory, then
+            // re-create it for future caching.
             apr_pool_destroy(trans->session_pool);
     
             apr_pool_destroy(trans->session_pool);
     
-            // create a standalone pool for our translator data
             if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
                 osrfLogError(OSRF_LOG_MARK, "WS Unable to create session_pool");
                 trans->session_pool = NULL;
             if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
                 osrfLogError(OSRF_LOG_MARK, "WS Unable to create session_pool");
                 trans->session_pool = NULL;
@@ -118,7 +161,6 @@ static void clear_cached_recipient(const char* thread) {
 }
 
 
 }
 
 
-
 void* osrf_responder_thread_main_body(transport_message *tmsg) {
 
     osrfList *msg_list = NULL;
 void* osrf_responder_thread_main_body(transport_message *tmsg) {
 
     osrfList *msg_list = NULL;
@@ -175,7 +217,6 @@ void* osrf_responder_thread_main_body(transport_message *tmsg) {
     osrfListFree(msg_list);
 
     if (!trans->client_connected) {
     osrfListFree(msg_list);
 
     if (!trans->client_connected) {
-        // responses received after client disconnect are discarded
 
         osrfLogDebug(OSRF_LOG_MARK, 
             "WS discarding response for thread=%s, xid=%s", 
 
         osrfLogDebug(OSRF_LOG_MARK, 
             "WS discarding response for thread=%s, xid=%s", 
@@ -185,7 +226,8 @@ void* osrf_responder_thread_main_body(transport_message *tmsg) {
     }
 
     
     }
 
     
-    // client is still connected; relay the messages to the client
+    // client is still connected. 
+    // relay the response messages to the client
     jsonObject *msg_wrapper = NULL;
     char *msg_string = NULL;
 
     jsonObject *msg_wrapper = NULL;
     char *msg_string = NULL;
 
@@ -196,16 +238,15 @@ void* osrf_responder_thread_main_body(transport_message *tmsg) {
     jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
 
     if (tmsg->is_error) {
     jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
 
     if (tmsg->is_error) {
-        fprintf(stderr,  
+        osrfLogError(OSRF_LOG_MARK, 
             "WS received jabber error message in response to thread=%s and xid=%s", 
             tmsg->thread, tmsg->osrf_xid);
             "WS received jabber error message in response to thread=%s and xid=%s", 
             tmsg->thread, tmsg->osrf_xid);
-        fflush(stderr);
         jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
     }
 
     msg_string = jsonObjectToJSONRaw(msg_wrapper);
 
         jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
     }
 
     msg_string = jsonObjectToJSONRaw(msg_wrapper);
 
-    // deliver the wrapped message json to the websocket client
+    // drop the JSON on the outbound wire
     trans->server->send(trans->server, MESSAGE_TYPE_TEXT, 
         (unsigned char*) msg_string, strlen(msg_string));
 
     trans->server->send(trans->server, MESSAGE_TYPE_TEXT, 
         (unsigned char*) msg_string, strlen(msg_string));
 
@@ -249,7 +290,8 @@ void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *dat
 
 
 /**
 
 
 /**
- * Allocate the session cache and create the responder thread
+ * Connect to OpenSRF, create the main pool, responder thread
+ * session cache and session pool.
  */
 int child_init(const WebSocketServer *server) {
 
  */
 int child_init(const WebSocketServer *server) {
 
@@ -341,14 +383,15 @@ void* CALLBACK on_connect_handler(const WebSocketServer *server) {
         //"WS connect from %s", r->connection->client_ip); // apache 2.4
 
     if (!trans) {
         //"WS connect from %s", r->connection->client_ip); // apache 2.4
 
     if (!trans) {
+        // first connection
         if (child_init(server) != APR_SUCCESS) {
             return NULL;
         }
     }
 
     // create a standalone pool for the session cache values
         if (child_init(server) != APR_SUCCESS) {
             return NULL;
         }
     }
 
     // create a standalone pool for the session cache values
-    // this pool will be destroyed and re-created regularly to 
-    // clear session memory
+    // this pool will be destroyed and re-created regularly
+    // to clear session memory
     if (apr_pool_create(&pool, r->pool) != APR_SUCCESS) {
         osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
         return NULL;
     if (apr_pool_create(&pool, r->pool) != APR_SUCCESS) {
         osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
         return NULL;
@@ -577,24 +620,25 @@ static size_t CALLBACK on_message_handler(void *data,
 
 
 /**
 
 
 /**
- * Release all memory allocated from the translator pool and kill the pool.
+ * Clear the session cache, release the session pool
  */
 void CALLBACK on_disconnect_handler(
     void *data, const WebSocketServer *server) {
 
     osrfWebsocketTranslator *trans = (osrfWebsocketTranslator*) data;
     trans->client_connected = 0;
  */
 void CALLBACK on_disconnect_handler(
     void *data, const WebSocketServer *server) {
 
     osrfWebsocketTranslator *trans = (osrfWebsocketTranslator*) data;
     trans->client_connected = 0;
+    
+    // ensure no errant session data is sticking around
+    apr_hash_clear(trans->session_cache);
 
 
-    /*
-    It's not necessary to destroy our session_pool, since
-    it's a child of the apache request_rec pool, which is 
-    destroyed after client disconnect.
+    // strictly speaking, this pool will get destroyed when
+    // r->pool is destroyed, but it doesn't hurt to explicitly
+    // destroy it ourselves.
     apr_pool_destroy(trans->session_pool);
     apr_pool_destroy(trans->session_pool);
-    */
-    
     trans->session_pool = NULL;
 
     request_rec *r = server->request(server);
     trans->session_pool = NULL;
 
     request_rec *r = server->request(server);
+
     osrfLogDebug(OSRF_LOG_MARK, 
         "WS disconnect from %s", r->connection->remote_ip); 
         //"WS disconnect from %s", r->connection->client_ip); // apache 2.4
     osrfLogDebug(OSRF_LOG_MARK, 
         "WS disconnect from %s", r->connection->remote_ip); 
         //"WS disconnect from %s", r->connection->client_ip); // apache 2.4
@@ -607,6 +651,7 @@ void CALLBACK on_destroy_handler(WebSocketPlugin *plugin) {
     if (trans) {
         apr_thread_exit(trans->responder_thread, APR_SUCCESS);
         apr_thread_mutex_destroy(trans->mutex);
     if (trans) {
         apr_thread_exit(trans->responder_thread, APR_SUCCESS);
         apr_thread_mutex_destroy(trans->mutex);
+        apr_pool_destroy(trans->session_pool);
         apr_pool_destroy(trans->main_pool);
     }
 
         apr_pool_destroy(trans->main_pool);
     }