From 8120314b86d2b6cafe2f5fd968b4475cc187acfb Mon Sep 17 00:00:00 2001 From: Bill Erickson Date: Tue, 11 Mar 2014 17:25:19 -0400 Subject: [PATCH] LP#1268619: websockets: gateway code repairs & confing options * avoid unneccessary and wrong incantation of apr_thread_exit. The two sub-threads now both live for the duration of the process. * to be safe, create thread mutex before threads Signed-off-by: Bill Erickson Signed-off-by: Galen Charlton --- README.websockets | 9 +- src/gateway/osrf_websocket_translator.c | 458 +++++++++++++----------- 2 files changed, 260 insertions(+), 207 deletions(-) diff --git a/README.websockets b/README.websockets index 69a56c0..15f38b1 100644 --- a/README.websockets +++ b/README.websockets @@ -23,10 +23,11 @@ Websockets installation instructions for Debian # OPTIONAL: add these configuration variables to # /etc/apache2-websockets/envvars and adjust as needed. -# export OSRF_WEBSOCKET_IDLE_TIMEOUT=60 +# export OSRF_WEBSOCKET_IDLE_TIMEOUT=120 # export OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL=5 # export OSRF_WEBSOCKET_CONFIG_FILE=/openils/conf/opensrf_core.xml # export OSRF_WEBSOCKET_CONFIG_CTXT=gateway +# export OSRF_WEBSOCKET_MAX_REQUEST_WAIT_TIME=600 # # IDLE_TIMEOUT specifies how long we will allow a client to stay connected # while idle. A longer timeout means less network traffic (from fewer @@ -36,6 +37,12 @@ Websockets installation instructions for Debian # IDLE_CHECK_INTERVAL specifies how often we wake to check the idle status # of the connected client. # +# MAX_REQUEST_WAIT_TIME is the maximum amount of time the gateway will +# wait before declaring a client as idle when there is a long-running +# outstanding request, yet no other activity is occurring. This is +# primarily a fail-safe to allow idle timeouts when one or more requests +# died on the server, and thus no response was ever delivered to the gateway. +# # Both specified in seconds # # CONFIG_FILE / CTXT are the standard opensrf core config options. diff --git a/src/gateway/osrf_websocket_translator.c b/src/gateway/osrf_websocket_translator.c index 5b9d607..ef8d4af 100644 --- a/src/gateway/osrf_websocket_translator.c +++ b/src/gateway/osrf_websocket_translator.c @@ -85,17 +85,43 @@ #include "opensrf/osrfConfig.h" #define MAX_THREAD_SIZE 64 -#define RECIP_BUF_SIZE 128 +#define RECIP_BUF_SIZE 256 #define WEBSOCKET_TRANSLATOR_INGRESS "ws-translator-v1" +// maximun number of active, CONNECTed opensrf sessions allowed. in +// practice, this number will be very small, rarely reaching double +// digits. This is just a security back-stop. A client trying to open +// this many connections is almost certainly attempting to DOS the +// gateway / server. We may want to lower this further. +#define MAX_ACTIVE_STATEFUL_SESSIONS 128 // default values, replaced during setup (below) as needed. static char* config_file = "/openils/conf/opensrf_core.xml"; static char* config_ctxt = "gateway"; -static time_t idle_timeout_interval = 60; + +static time_t idle_timeout_interval = 120; static time_t idle_check_interval = 5; static time_t last_activity_time = 0; +// Generally, we do not disconnect the client (as idle) if there is a +// request in flight. However, we need to have an upper bound on the +// amount of time we will wait for in-flight requests to complete to +// avoid leaving an effectively idle connection open after a request +// died on the backend and no response was received. +// Note that if other activity occurs while a long-running request +// is active, the wait time will get reset with each new activity. +// This is OK, though, because the goal of max_request_wait_time +// is not to chop requests off at the knees, it's to allow the client +// to timeout as idle when only a single long-running request is active +// and preventing timeout. +static time_t max_request_wait_time = 600; + +// Incremented with every REQUEST, decremented with every COMPLETE. +// Gives us a rough picture of the number of reqests we've sent to +// the server vs. the number for which a completed response has been +// received. +static int requests_in_flight = 0; + // true if we've received a signal to start graceful shutdown static int shutdown_requested = 0; static void sigusr1_handler(int sig); @@ -130,16 +156,18 @@ typedef struct _osrfWebsocketTranslator { * map internally means the caller never need know about * internal XMPP addresses and the server doesn't have to * verify caller-specified recipient addresses. It's - * all managed internally. + * all managed internally. This is only used for stateful + * (CONNECT'ed) session. Stateless sessions need not + * track the recipient, since they are one-off calls. */ - apr_hash_t *session_cache; + apr_hash_t *stateful_session_cache; /** - * session_pool contains the key/value pairs stored in - * the session_cache. The pool is regularly destroyed + * stateful_session_pool contains the key/value pairs stored in + * the stateful_session_cache. The pool is regularly destroyed * and re-created to avoid long-term memory consumption */ - apr_pool_t *session_pool; + apr_pool_t *stateful_session_pool; /** * Thread responsible for collecting responses on the opensrf @@ -181,31 +209,25 @@ static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer static void clear_cached_recipient(const char* thread) { apr_pool_t *pool = NULL; + request_rec *r = trans->server->request(trans->server); - if (apr_hash_get(trans->session_cache, thread, APR_HASH_KEY_STRING)) { + if (apr_hash_get(trans->stateful_session_cache, thread, APR_HASH_KEY_STRING)) { osrfLogDebug(OSRF_LOG_MARK, "WS removing cached recipient on disconnect"); // remove it from the hash - apr_hash_set(trans->session_cache, thread, APR_HASH_KEY_STRING, NULL); - - if (apr_hash_count(trans->session_cache) == 0) { - osrfLogDebug(OSRF_LOG_MARK, "WS re-setting session_pool"); - - // memory accumulates in the session_pool as sessions are cached then - // un-cached. Un-caching removes strings from the hash, but not the - // pool itself. That only happens when the pool is destroyed. Here - // we destroy the session pool to clear any lingering memory, then - // re-create it for future caching. - apr_pool_destroy(trans->session_pool); - - if (apr_pool_create(&pool, NULL) != APR_SUCCESS) { - osrfLogError(OSRF_LOG_MARK, "WS Unable to create session_pool"); - trans->session_pool = NULL; - return; - } - - trans->session_pool = pool; + apr_hash_set(trans->stateful_session_cache, thread, APR_HASH_KEY_STRING, NULL); + + if (apr_hash_count(trans->stateful_session_cache) == 0) { + osrfLogDebug(OSRF_LOG_MARK, "WS re-setting stateful_session_pool"); + + // memory accumulates in the stateful_session_pool as + // sessions are cached then un-cached. Un-caching removes + // strings from the hash, but not from the pool. Clear the + // pool here. note: apr_pool_clear does not free memory, it + // reclaims it for use again within the pool. This is more + // effecient than freeing and allocating every time. + apr_pool_clear(trans->stateful_session_pool); } } } @@ -233,30 +255,44 @@ void* osrf_responder_thread_main_body(transport_message *tmsg) { the correct recipient. */ if (one_msg && one_msg->m_type == STATUS) { + if (one_msg->status_code == OSRF_STATUS_OK) { - // only cache recipients if the client is still connected - if (trans->client_connected && - one_msg->status_code == OSRF_STATUS_OK) { - - if (!apr_hash_get(trans->session_cache, + if (!apr_hash_get(trans->stateful_session_cache, tmsg->thread, APR_HASH_KEY_STRING)) { - osrfLogDebug(OSRF_LOG_MARK, - "WS caching sender thread=%s, sender=%s", - tmsg->thread, tmsg->sender); + apr_size_t ses_size = + apr_hash_count(trans->stateful_session_cache); - apr_hash_set(trans->session_cache, - apr_pstrdup(trans->session_pool, tmsg->thread), - APR_HASH_KEY_STRING, - apr_pstrdup(trans->session_pool, tmsg->sender)); + if (ses_size < MAX_ACTIVE_STATEFUL_SESSIONS) { + + osrfLogDebug(OSRF_LOG_MARK, "WS caching sender " + "thread=%s, sender=%s; concurrent=%d", + tmsg->thread, tmsg->sender, ses_size); + + apr_hash_set(trans->stateful_session_cache, + apr_pstrdup(trans->stateful_session_pool, tmsg->thread), + APR_HASH_KEY_STRING, + apr_pstrdup(trans->stateful_session_pool, tmsg->sender)); + + } else { + osrfLogWarning(OSRF_LOG_MARK, + "WS max concurrent sessions (%d) reached. " + "Current session will not be tracked", + MAX_ACTIVE_STATEFUL_SESSIONS + ); + } } } else { // connection timed out; clear the cached recipient - // regardless of whether the client is still connected - if (one_msg->status_code == OSRF_STATUS_TIMEOUT) + if (one_msg->status_code == OSRF_STATUS_TIMEOUT) { clear_cached_recipient(tmsg->thread); + + } else { + if (one_msg->status_code == OSRF_STATUS_COMPLETE) + requests_in_flight--; + } } } } @@ -265,16 +301,7 @@ void* osrf_responder_thread_main_body(transport_message *tmsg) { // newly created osrfList. We only need to free the list and // the individual osrfMessage's will be freed along with it osrfListFree(msg_list); - - if (!trans->client_connected) { - - osrfLogInfo(OSRF_LOG_MARK, - "WS discarding response for thread=%s", tmsg->thread); - - return; - } - // client is still connected. // relay the response messages to the client jsonObject *msg_wrapper = NULL; char *msg_string = NULL; @@ -302,10 +329,68 @@ void* osrf_responder_thread_main_body(transport_message *tmsg) { jsonObjectFree(msg_wrapper); } +/** + * Responder thread main body. + * Collects responses from the opensrf network and relays them to the + * websocket caller. + */ +void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) { + + transport_message *tmsg; + while (1) { + + if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) { + osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex"); + return NULL; + } + + // wait for a response + tmsg = client_recv(osrf_handle, -1); + + if (!tmsg) continue; // interrupt + + if (trans->client_connected) { + + if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) { + osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex"); + return NULL; + } + + osrfLogForceXid(tmsg->osrf_xid); + osrf_responder_thread_main_body(tmsg); + last_activity_time = time(NULL); + } + + message_free(tmsg); + } + + return NULL; +} + +static int active_connection_count() { + + if (requests_in_flight) { + + time_t now = time(NULL); + time_t difference = now - last_activity_time; + + if (difference >= max_request_wait_time) { + osrfLogWarning(OSRF_LOG_MARK, + "%d In-flight request(s) took longer than %d seconds " + "to complete. Treating request as dead and moving on.", + requests_in_flight, + max_request_wait_time + ); + requests_in_flight = 0; + } + } + + return requests_in_flight; +} + /** * Sleep and regularly wake to see if the process has been idle for too * long. If so, send a disconnect to the client. - * */ void* APR_THREAD_FUNC osrf_idle_timeout_thread_main( apr_thread_t *thread, void *data) { @@ -327,7 +412,7 @@ void* APR_THREAD_FUNC osrf_idle_timeout_thread_main( // During graceful shtudown, we may wait up to // idle_check_interval seconds before initiating shutdown. sleep(sleep_time); - + if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) { osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex"); return NULL; @@ -339,8 +424,8 @@ void* APR_THREAD_FUNC osrf_idle_timeout_thread_main( continue; } - // do we have any active conversations with the connected client? - int active_count = apr_hash_count(trans->session_cache); + // do we have any active stateful conversations with the client? + int active_count = active_connection_count(); if (active_count) { @@ -385,7 +470,7 @@ void* APR_THREAD_FUNC osrf_idle_timeout_thread_main( time_t difference = now - last_activity_time; osrfLogDebug(OSRF_LOG_MARK, - "WS has been idle for %d seconds", difference); + "WS connection idle for %d seconds", difference); if (difference < idle_timeout_interval) { // Last activity occurred within the idle timeout interval. @@ -412,40 +497,90 @@ void* APR_THREAD_FUNC osrf_idle_timeout_thread_main( return NULL; } -/** - * Responder thread main body. - * Collects responses from the opensrf network and relays them to the - * websocket caller. - */ -void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) { +static int build_startup_data(const WebSocketServer *server) { - transport_message *tmsg; - while (1) { + apr_pool_t *main_pool = NULL; + apr_pool_t *stateful_session_pool = NULL; + apr_thread_t *thread = NULL; + apr_threadattr_t *thread_attr = NULL; + apr_thread_mutex_t *mutex = NULL; + request_rec *r = server->request(server); - if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) { - osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex"); - return NULL; - } + // create a pool for our translator data + // Do not use r->pool as the parent, since r->pool will be freed + // when the current client disconnects. + if (apr_pool_create(&main_pool, NULL) != APR_SUCCESS) { + osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool"); + return 1; + } - // wait for a response - tmsg = client_recv(osrf_handle, -1); + trans = (osrfWebsocketTranslator*) + apr_palloc(main_pool, sizeof(osrfWebsocketTranslator)); - if (!tmsg) continue; // early exit on interrupt + if (trans == NULL) { + osrfLogError(OSRF_LOG_MARK, "WS Unable to create translator"); + return 1; + } - if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) { - osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex"); - return NULL; - } + trans->server = server; + trans->main_pool = main_pool; + trans->osrf_router = osrfConfigGetValue(NULL, "/router_name"); + trans->osrf_domain = osrfConfigGetValue(NULL, "/domain"); - osrfLogForceXid(tmsg->osrf_xid); - osrf_responder_thread_main_body(tmsg); - message_free(tmsg); - last_activity_time = time(NULL); + // opensrf session / recipient cache + trans->stateful_session_cache = apr_hash_make(trans->main_pool); + if (trans->stateful_session_cache == NULL) { + osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache"); + return 1; } - return NULL; -} + // opensrf session / recipient string pool; cleared regularly + // the only data entering this pools are the session strings. + if (apr_pool_create(&stateful_session_pool, trans->main_pool) != APR_SUCCESS) { + osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool"); + return NULL; + } + trans->stateful_session_pool = stateful_session_pool; + + if (apr_thread_mutex_create( + &mutex, APR_THREAD_MUTEX_UNNESTED, + trans->main_pool) != APR_SUCCESS) { + osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex"); + return 1; + } + trans->mutex = mutex; + + // responder thread + if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) && + (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) && + (apr_thread_create(&thread, thread_attr, + osrf_responder_thread_main, trans, trans->main_pool) == APR_SUCCESS)) { + + trans->responder_thread = thread; + + } else { + osrfLogError(OSRF_LOG_MARK, "WS unable to create responder thread"); + return 1; + } + + // idle timeout thread + thread = NULL; // reset + thread_attr = NULL; // reset + if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) && + (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) && + (apr_thread_create(&thread, thread_attr, + osrf_idle_timeout_thread_main, trans, trans->main_pool) == APR_SUCCESS)) { + + osrfLogDebug(OSRF_LOG_MARK, "WS created idle timeout thread"); + trans->idle_timeout_thread = thread; + + } else { + osrfLogError(OSRF_LOG_MARK, "WS unable to create idle timeout thread"); + return 1; + } + return APR_SUCCESS; +} /** @@ -453,14 +588,8 @@ void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *dat * session cache and session pool. */ int child_init(const WebSocketServer *server) { - - apr_pool_t *pool = NULL; - apr_thread_t *thread = NULL; - apr_threadattr_t *thread_attr = NULL; - apr_thread_mutex_t *mutex = NULL; request_rec *r = server->request(server); - // osrf_handle will already be connected if this is not the first request // served by this process. if ( !(osrf_handle = osrfSystemGetTransportClient()) ) { @@ -479,6 +608,21 @@ int child_init(const WebSocketServer *server) { ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, "WS: timeout set to %d", idle_timeout_interval); + timeout = getenv("OSRF_WEBSOCKET_MAX_REQUEST_WAIT_TIME"); + if (timeout) { + if (!atoi(timeout)) { + ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, + "WS: invalid OSRF_WEBSOCKET_MAX_REQUEST_WAIT_TIME: %s", + timeout + ); + } else { + max_request_wait_time = (time_t) atoi(timeout); + } + } + + ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, + "WS: max request wait time set to %d", max_request_wait_time); + char* interval = getenv("OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL"); if (interval) { if (!atoi(interval)) { @@ -524,76 +668,7 @@ int child_init(const WebSocketServer *server) { osrf_handle = osrfSystemGetTransportClient(); } - // create a standalone pool for our translator data - if (apr_pool_create(&pool, NULL) != APR_SUCCESS) { - osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool"); - return 1; - } - - // allocate our static translator instance - trans = (osrfWebsocketTranslator*) - apr_palloc(pool, sizeof(osrfWebsocketTranslator)); - - if (trans == NULL) { - osrfLogError(OSRF_LOG_MARK, "WS Unable to create translator"); - return 1; - } - - trans->main_pool = pool; - trans->server = server; - trans->osrf_router = osrfConfigGetValue(NULL, "/router_name"); - trans->osrf_domain = osrfConfigGetValue(NULL, "/domain"); - - trans->session_cache = apr_hash_make(pool); - - if (trans->session_cache == NULL) { - osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache"); - return 1; - } - - // Create the responder thread. Once created, - // it runs for the lifetime of this process. - if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) && - (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) && - (apr_thread_create(&thread, thread_attr, - osrf_responder_thread_main, trans, trans->main_pool) == APR_SUCCESS)) { - - trans->responder_thread = thread; - - } else { - osrfLogError(OSRF_LOG_MARK, "WS unable to create responder thread"); - return 1; - } - - // Create the idle timeout thread, which lives for the lifetime - // of the process. - thread = NULL; // reset - thread_attr = NULL; // reset - if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) && - (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) && - (apr_thread_create(&thread, thread_attr, - osrf_idle_timeout_thread_main, trans, trans->main_pool) == APR_SUCCESS)) { - - osrfLogDebug(OSRF_LOG_MARK, "WS created idle timeout thread"); - trans->idle_timeout_thread = thread; - - } else { - osrfLogError(OSRF_LOG_MARK, "WS unable to create idle timeout thread"); - return 1; - } - - - if (apr_thread_mutex_create( - &mutex, APR_THREAD_MUTEX_UNNESTED, - trans->main_pool) != APR_SUCCESS) { - osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex"); - return 1; - } - - trans->mutex = mutex; - signal(SIGUSR1, sigusr1_handler); - return APR_SUCCESS; } @@ -602,32 +677,23 @@ int child_init(const WebSocketServer *server) { */ void* CALLBACK on_connect_handler(const WebSocketServer *server) { request_rec *r = server->request(server); - apr_pool_t *pool; - apr_thread_t *thread = NULL; - apr_threadattr_t *thread_attr = NULL; - const char* client_ip = get_client_ip(r); - osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip); + if (!trans) { // first connection - if (!trans) { - // first connection - if (child_init(server) != APR_SUCCESS) { + // connect to opensrf + if (child_init(server) != APR_SUCCESS) return NULL; - } - } - // create a standalone pool for the session cache values - // this pool will be destroyed and re-created regularly - // to clear session memory - if (apr_pool_create(&pool, r->pool) != APR_SUCCESS) { - osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool"); - return NULL; + // build pools, thread data, and the translator + if (build_startup_data(server) != APR_SUCCESS) + return NULL; } - trans->session_pool = pool; - trans->client_connected = 1; - last_activity_time = time(NULL); + const char* client_ip = get_client_ip(r); + osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip); + last_activity_time = time(NULL); + trans->client_connected = 1; return trans; } @@ -702,6 +768,7 @@ static char* extract_inbound_messages( } osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf); buffer_free(act); + requests_in_flight++; break; } @@ -793,7 +860,7 @@ static size_t on_message_handler_body(void *data, // since clients can provide their own threads at session start time, // the presence of a thread does not guarantee a cached recipient recipient = (char*) apr_hash_get( - trans->session_cache, thread, APR_HASH_KEY_STRING); + trans->stateful_session_cache, thread, APR_HASH_KEY_STRING); if (recipient) { osrfLogDebug(OSRF_LOG_MARK, "WS found cached recipient %s", recipient); @@ -837,7 +904,6 @@ static size_t on_message_handler_body(void *data, free(msg_body); last_activity_time = time(NULL); - return OK; } @@ -867,46 +933,26 @@ static size_t CALLBACK on_message_handler(void *data, void CALLBACK on_disconnect_handler( void *data, const WebSocketServer *server) { - osrfWebsocketTranslator *trans = (osrfWebsocketTranslator*) data; + // if the threads wake up during disconnect, this tells + // them to go back to sleep. trans->client_connected = 0; - // timeout thread is recreated w/ each new connection - apr_thread_exit(trans->idle_timeout_thread, APR_SUCCESS); - trans->idle_timeout_thread = NULL; - - // ensure no errant session data is sticking around - apr_hash_clear(trans->session_cache); - - // strictly speaking, this pool will get destroyed when - // r->pool is destroyed, but it doesn't hurt to explicitly - // destroy it ourselves. - apr_pool_destroy(trans->session_pool); - trans->session_pool = NULL; - request_rec *r = server->request(server); - osrfLogInfo(OSRF_LOG_MARK, "WS disconnect from %s", get_client_ip(r)); -} - -/** - * Be nice and clean up our mess - */ -void CALLBACK on_destroy_handler(WebSocketPlugin *plugin) { - if (trans) { - apr_thread_exit(trans->responder_thread, APR_SUCCESS); - apr_thread_mutex_destroy(trans->mutex); - if (trans->session_pool) - apr_pool_destroy(trans->session_pool); - apr_pool_destroy(trans->main_pool); - } - trans = NULL; + // Clear any lingering session data + // NOTE: we could apr_pool_destroy the stateful_session_pool to truly free + // the memory, but since there is a limit to the size of the pool + // (max_concurrent_sessions), the memory cannot grow unbounded, + // so there's no need. + apr_hash_clear(trans->stateful_session_cache); + apr_pool_clear(trans->stateful_session_pool); } static WebSocketPlugin osrf_websocket_plugin = { sizeof(WebSocketPlugin), WEBSOCKET_PLUGIN_VERSION_0, - on_destroy_handler, + NULL, // on_destroy_handler on_connect_handler, on_message_handler, on_disconnect_handler -- 2.43.2