From 9e455c227be32bed4a16e6dab7045b6424e2ba15 Mon Sep 17 00:00:00 2001 From: Bill Erickson Date: Mon, 29 Oct 2012 17:27:44 -0400 Subject: [PATCH] LP#1268619: websocket gateway repairs and cleanup * use jsonObjectFree() on jsonObjets, not free(); * removed some debugging logs * accommodate API changes for Apache 2.4 * safer logging: Avoid using ap_log_rerror, in particular referencing server->request from the responder thread, since the request_rec will be invalid after on_disconnect is called. Signed-off-by: Bill Erickson Signed-off-by: Galen Charlton --- src/gateway/osrf_websocket_translator.c | 127 ++++++++++++------------ 1 file changed, 61 insertions(+), 66 deletions(-) diff --git a/src/gateway/osrf_websocket_translator.c b/src/gateway/osrf_websocket_translator.c index fd19f2d..b6205d8 100644 --- a/src/gateway/osrf_websocket_translator.c +++ b/src/gateway/osrf_websocket_translator.c @@ -54,12 +54,11 @@ */ #include "httpd.h" -#include "http_log.h" -#include "http_log.h" #include "apr_strings.h" #include "apr_thread_proc.h" #include "apr_hash.h" #include "websocket_plugin.h" +#include "opensrf/log.h" #include "opensrf/osrf_json.h" #include "opensrf/transport_client.h" #include "opensrf/transport_message.h" @@ -68,12 +67,10 @@ #define MAX_THREAD_SIZE 64 #define RECIP_BUF_SIZE 128 -static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer -static transport_client *osrf_handle = NULL; typedef struct _osrfWebsocketTranslator { const WebSocketServer *server; - apr_pool_t *main_pool; // standline per-process pool + apr_pool_t *main_pool; // standalone per-process pool apr_pool_t *session_pool; // child of trans->main_pool; per-session apr_hash_t *session_cache; apr_thread_t *responder_thread; @@ -83,6 +80,8 @@ typedef struct _osrfWebsocketTranslator { } osrfWebsocketTranslator; static osrfWebsocketTranslator *trans = NULL; +static transport_client *osrf_handle = NULL; +static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer /** @@ -92,38 +91,41 @@ static osrfWebsocketTranslator *trans = NULL; */ void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) { - request_rec *r = trans->server->request(trans->server); + transport_message *tmsg; jsonObject *msg_wrapper; char *msg_string; while (1) { - transport_message *msg = client_recv(osrf_handle, -1); - if (!msg) continue; // early exit on interrupt + tmsg = client_recv(osrf_handle, -1); + + if (!tmsg) continue; // early exit on interrupt // discard responses received after client disconnect if (!trans->client_connected) { - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, + osrfLogDebug(OSRF_LOG_MARK, "WS discarding response for thread=%s, xid=%s", - msg->thread, msg->osrf_xid); - message_free(msg); + tmsg->thread, tmsg->osrf_xid); + message_free(tmsg); continue; } - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, + + osrfLogDebug(OSRF_LOG_MARK, "WS received opensrf response for thread=%s, xid=%s", - msg->thread, msg->osrf_xid); + tmsg->thread, tmsg->osrf_xid); // build the wrapper object msg_wrapper = jsonNewObject(NULL); - jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(msg->thread)); - jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(msg->osrf_xid)); - jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(msg->body)); + jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread)); + jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid)); + jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body)); - if (msg->is_error) { - ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, + if (tmsg->is_error) { + fprintf(stderr, "WS received jabber error message in response to thread=%s and xid=%s", - msg->thread, msg->osrf_xid); + tmsg->thread, tmsg->osrf_xid); + fflush(stderr); jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1)); } @@ -135,21 +137,22 @@ void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *dat // capture the true message sender // TODO: this will grow to add one entry per client session. - // need a last-touched timeout mechanism to periodically remove old entries - if (!apr_hash_get(trans->session_cache, msg->thread, APR_HASH_KEY_STRING)) { + // need to ensure that connected-sessions don't last /too/ long or create + // a last-touched timeout mechanism to periodically remove old entries + if (!apr_hash_get(trans->session_cache, tmsg->thread, APR_HASH_KEY_STRING)) { - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, - "WS caching sender thread=%s, sender=%s", msg->thread, msg->sender); + osrfLogDebug(OSRF_LOG_MARK, + "WS caching sender thread=%s, sender=%s", tmsg->thread, tmsg->sender); apr_hash_set(trans->session_cache, - apr_pstrdup(trans->session_pool, msg->thread), + apr_pstrdup(trans->session_pool, tmsg->thread), APR_HASH_KEY_STRING, - apr_pstrdup(trans->session_pool, msg->sender)); + apr_pstrdup(trans->session_pool, tmsg->sender)); } free(msg_string); jsonObjectFree(msg_wrapper); - message_free(msg); + message_free(tmsg); } return NULL; @@ -165,7 +168,7 @@ int child_init(const WebSocketServer *server) { apr_threadattr_t *thread_attr = NULL; request_rec *r = server->request(server); - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, "WS child_init"); + osrfLogDebug(OSRF_LOG_MARK, "WS child_init"); // osrf_handle will already be connected if this is not the first request // served by this process. @@ -173,7 +176,7 @@ int child_init(const WebSocketServer *server) { char* config_file = "/openils/conf/opensrf_core.xml"; char* config_ctx = "gateway"; //TODO config if (!osrfSystemBootstrapClientResc(config_file, config_ctx, "websocket")) { - ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, + osrfLogError(OSRF_LOG_MARK, "WS unable to bootstrap OpenSRF client with config %s", config_file); return 1; } @@ -183,7 +186,7 @@ int child_init(const WebSocketServer *server) { // create a standalone pool for our translator data if (apr_pool_create(&pool, NULL) != APR_SUCCESS) { - ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "WS Unable to create apr_pool"); + osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool"); return 1; } @@ -193,7 +196,7 @@ int child_init(const WebSocketServer *server) { apr_palloc(pool, sizeof(osrfWebsocketTranslator)); if (trans == NULL) { - ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "WS Unable to create translator"); + osrfLogError(OSRF_LOG_MARK, "WS Unable to create translator"); return 1; } @@ -202,8 +205,8 @@ int child_init(const WebSocketServer *server) { trans->osrf_router = osrfConfigGetValue(NULL, "/router_name"); trans->osrf_domain = osrfConfigGetValue(NULL, "/domain"); - // Create the responder thread. Once created, it runs for the lifetime - // of this process. + // Create the responder thread. Once created, + // it runs for the lifetime of this process. if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) && (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) && (apr_thread_create(&thread, thread_attr, @@ -212,8 +215,7 @@ int child_init(const WebSocketServer *server) { trans->responder_thread = thread; } else { - ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, - "WS unable to create responder thread"); + osrfLogError(OSRF_LOG_MARK, "WS unable to create responder thread"); return 1; } @@ -227,8 +229,9 @@ void* CALLBACK on_connect_handler(const WebSocketServer *server) { request_rec *r = server->request(server); apr_pool_t *pool; - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, - "WS connect from %s", r->connection->remote_ip); + osrfLogDebug(OSRF_LOG_MARK, + "WS connect from %s", r->connection->remote_ip); + //"WS connect from %s", r->connection->client_ip); // apache 2.4 if (!trans) { if (child_init(server) != APR_SUCCESS) { @@ -239,26 +242,18 @@ void* CALLBACK on_connect_handler(const WebSocketServer *server) { // create a standalone pool for the session cache values, which will be // destroyed on client disconnect. if (apr_pool_create(&pool, trans->main_pool) != APR_SUCCESS) { - ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, - "WS Unable to create apr_pool"); + osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool"); return NULL; } trans->session_pool = pool; trans->session_cache = apr_hash_make(trans->session_pool); - ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, - "WS created new pool %x", trans->session_pool); - if (trans->session_cache == NULL) { - ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, - "WS unable to create session cache"); + osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache"); return NULL; } - ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, - "WS created new hash %x", trans->session_cache); - trans->client_connected = 1; return trans; } @@ -285,8 +280,7 @@ static size_t CALLBACK on_message_handler(void *data, if (buffer_size <= 0) return OK; - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, - "WS received message size=%d", buffer_size); + osrfLogDebug(OSRF_LOG_MARK, "WS received message size=%d", buffer_size); // buffer may not be \0-terminated, which jsonParse requires char buf[buffer_size + 1]; @@ -296,8 +290,7 @@ static size_t CALLBACK on_message_handler(void *data, msg_wrapper = jsonParseRaw(buf); if (msg_wrapper == NULL) { - ap_log_rerror(APLOG_MARK, - APLOG_NOTICE, 0, r, "WS Invalid JSON: %s", buf); + osrfLogWarning(OSRF_LOG_MARK, "WS Invalid JSON: %s", buf); return HTTP_BAD_REQUEST; } @@ -313,13 +306,17 @@ static size_t CALLBACK on_message_handler(void *data, log_xid = jsonObjectGetString(tmp_obj); if (log_xid) { + // use the caller-provide log trace id if (strlen(log_xid) > MAX_THREAD_SIZE) { - ap_log_rerror(APLOG_MARK, APLOG_NOTICE, - 0, r, "WS log_xid exceeds max length"); + osrfLogWarning(OSRF_LOG_MARK, "WS log_xid exceeds max length"); return HTTP_BAD_REQUEST; } - osrfLogSetXid(log_xid); // TODO: make with with non-client + + // TODO: make this work with non-client and make this call accept + // const char*'s. casting to (char*) for now to silence warnings. + osrfLogSetXid((char*) log_xid); + } else { // generate a new log trace id for this relay osrfLogMkXid(); @@ -328,8 +325,7 @@ static size_t CALLBACK on_message_handler(void *data, if (thread) { if (strlen(thread) > MAX_THREAD_SIZE) { - ap_log_rerror(APLOG_MARK, APLOG_NOTICE, - 0, r, "WS thread exceeds max length"); + osrfLogWarning(OSRF_LOG_MARK, "WS thread exceeds max length"); return HTTP_BAD_REQUEST; } @@ -339,8 +335,7 @@ static size_t CALLBACK on_message_handler(void *data, trans->session_cache, thread, APR_HASH_KEY_STRING); if (recipient) { - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, - "WS found cached recipient %s", recipient); + osrfLogDebug(OSRF_LOG_MARK, "WS found cached recipient %s", recipient); } } @@ -353,14 +348,13 @@ static size_t CALLBACK on_message_handler(void *data, recipient = recipient_buf; } else { - ap_log_rerror(APLOG_MARK, APLOG_NOTICE, - 0, r, "WS Unable to determine recipient"); + osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient"); return HTTP_BAD_REQUEST; } } // TODO: activity log entry? -- requires message analysis - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, + osrfLogDebug(OSRF_LOG_MARK, "WS relaying message thread=%s, xid=%s, recipient=%s", thread, osrfLogGetXid(), recipient); @@ -374,7 +368,7 @@ static size_t CALLBACK on_message_handler(void *data, osrfLogClearXid(); message_free(tmsg); - free(msg_wrapper); + jsonObjectFree(msg_wrapper); free(msg_body); return OK; @@ -396,14 +390,15 @@ void CALLBACK on_disconnect_handler( trans->session_cache = NULL; request_rec *r = server->request(server); - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, - "WS disconnect from %s", r->connection->remote_ip); + osrfLogDebug(OSRF_LOG_MARK, + "WS disconnect from %s", r->connection->remote_ip); + //"WS disconnect from %s", r->connection->client_ip); // apache 2.4 } +/** + * Be nice and clean up our mess + */ void CALLBACK on_destroy_handler(WebSocketPlugin *plugin) { - fprintf(stderr, "WS on_destroy_handler()\n"); - fflush(stderr); - if (trans) { apr_thread_exit(trans->responder_thread, APR_SUCCESS); apr_pool_destroy(trans->main_pool); -- 2.43.2