From a2522f25ed8d0c5b355838174e2aac20000c85e6 Mon Sep 17 00:00:00 2001 From: Bill Erickson Date: Tue, 25 Jun 2019 11:03:36 -0400 Subject: [PATCH 1/1] LP1834208 Removing apache2-websockets Remove code and build components for apache2-websockets. Update documentation to reflect the use of websocketd as the only supported OpenSRF websockets implementation. Add a note to the install documentation indicating websocketd does not offer a configurable inactivity timeout, but this can be accomplished by running it behind a proxy. Update NGINX and HAPROXY example configs to reflect the assumption that websocketd is runs locally without SSL by default. Signed-off-by: Bill Erickson Signed-off-by: Jason Stephenson Signed-off-by: Jason Boyer Signed-off-by: Ben Shum --- README | 100 +-- examples/apache_24/websockets/apache2.conf | 8 +- examples/haproxy/osrf-ws-http-proxy | 2 +- examples/nginx/osrf-ws-http-proxy | 28 +- src/gateway/Makefile.am | 7 +- src/gateway/osrf_websocket_translator.c | 993 --------------------- src/gateway/websocket_plugin.h | 130 --- 7 files changed, 24 insertions(+), 1244 deletions(-) delete mode 100644 src/gateway/osrf_websocket_translator.c delete mode 100644 src/gateway/websocket_plugin.h diff --git a/README b/README index f2ae00b..5dde40f 100644 --- a/README +++ b/README @@ -415,95 +415,7 @@ srfsh# request opensrf.math add 2,2 + You should receive the value `4`. -Websockets installation instructions: Option #1 Apache -------------------------------------------------------- -Websockets are new to OpenSRF 2.4+ and are required for operating the new web-based -staff client for Evergreen. Complete the following steps as the *root* Linux -account: - -1. Install git if not already present: -+ -[source, bash] ---------------------------------------------------------------------------- -apt-get install git-core ---------------------------------------------------------------------------- -+ -2. Install the apache-websocket module: -+ -[source, bash] ---------------------------------------------------------------------------- -# Use a temporary directory -cd /tmp -git clone https://github.com/disconnect/apache-websocket -cd apache-websocket -apxs2 -i -a -c mod_websocket.c ---------------------------------------------------------------------------- -+ -3. Create the websocket Apache instance (more information about this in - `/usr/share/doc/apache2/README.multiple-instances`) -+ -[source, bash] ---------------------------------------------------------------------------- -sh /usr/share/doc/apache2/examples/setup-instance websockets ---------------------------------------------------------------------------- -+ -4. Remove from the main apache instance -+ -[source, bash] ---------------------------------------------------------------------------- -a2dismod websocket ---------------------------------------------------------------------------- -+ -5. Change to the directory into which you unpacked OpenSRF, then copy into - place the config files -+ -[source, bash] ---------------------------------------------------------------------------- -cd /path/to/opensrf-OSRFVERSION -cp examples/apache_24/websockets/apache2.conf /etc/apache2-websockets/ ---------------------------------------------------------------------------- -+ -6. OPTIONAL: add these configuration variables to `/etc/apache2-websockets/envvars` - and adjust as needed. -+ -[source, bash] ---------------------------------------------------------------------------- -export OSRF_WEBSOCKET_IDLE_TIMEOUT=120 -export OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL=5 -export OSRF_WEBSOCKET_CONFIG_FILE=/openils/conf/opensrf_core.xml -export OSRF_WEBSOCKET_CONFIG_CTXT=gateway -export OSRF_WEBSOCKET_MAX_REQUEST_WAIT_TIME=600 ---------------------------------------------------------------------------- -+ - * `IDLE_TIMEOUT` specifies how long we will allow a client to stay connected - while idle. A longer timeout means less network traffic (from fewer - websocket CONNECT calls), but it also means more Apache processes are - tied up doing nothing. - * `IDLE_CHECK_INTERVAL` specifies how often we wake to check the idle status - of the connected client. - * `MAX_REQUEST_WAIT_TIME` is the maximum amount of time the gateway will - wait before declaring a client as idle when there is a long-running - outstanding request, yet no other activity is occurring. This is - primarily a fail-safe to allow idle timeouts when one or more requests - died on the server, and thus no response was ever delivered to the gateway. - * `CONFIG_FILE / CTXT` are the standard opensrf core config options. - -7. Before you can start websockets, you must install a valid SSL certificate - in `/etc/apache2/ssl/`. It is possible, but not recommended, to generate a - self-signed SSL certificate. For example, if you need to test with a self-signed - certicate on Chrome or Chromimum browsers, one workaround is to start the browser - with `--ignore-certificate-errors`. - -8. After OpenSRF is up and running (or after any re-install), - fire up the secondary Apache instance. Errors will appear in - `/var/log/apache2-websockets/error.log`. Start apache2-websockets with: -+ -[source, bash] ---------------------------------------------------------------------------- -/etc/init.d/apache2-websockets start ---------------------------------------------------------------------------- - -Websockets installation instructions: Option #2 Websocketd +Websockets installation instructions ---------------------------------------------------------- 1. Install websocketd (latest stable release from http://websocketd.com/) @@ -521,6 +433,16 @@ sudo cp websocketd /usr/local/bin/ + Choose option a or b, below. + +[NOTE] +=========================================================================== +websocketd does not offer a configurable inactivity timeout, meaning +websocket client connections will persist until each client disconnects +or the service is restarted. However, a timeout can be achieved with +the use of a proxy (option 'a' below). A proxy also allows websocketd +to be exposed to web clients on port 443 instead of its internal port, +which may simplify firewall configuration. +=========================================================================== ++ a. Run websocketd as 'opensrf' + [NOTE] diff --git a/examples/apache_24/websockets/apache2.conf b/examples/apache_24/websockets/apache2.conf index 8bf3657..0ad11a7 100644 --- a/examples/apache_24/websockets/apache2.conf +++ b/examples/apache_24/websockets/apache2.conf @@ -40,7 +40,6 @@ Include mods-available/mime.conf Include mods-available/ssl.load Include mods-available/ssl.conf Include mods-available/socache_shmcb.load -Include mods-available/websocket.load LogFormat "%v:%p %h %l %u %t \"%r\" %>s %O \"%{Referer}i\" \"%{User-Agent}i\"" vhost_combined LogFormat "%h %l %u %t \"%r\" %>s %O \"%{Referer}i\" \"%{User-Agent}i\"" combined @@ -48,7 +47,6 @@ LogFormat "%h %l %u %t \"%r\" %>s %O" common LogFormat "%{Referer}i -> %U" referer LogFormat "%{User-agent}i" agent -# WebSockets via SSL Listen 7682 DocumentRoot /var/www @@ -63,8 +61,4 @@ Listen 7682 SSLCertificateKeyFile /etc/apache2/ssl/server.key -# OpenSRF WebSockets gateway - - SetHandler websocket-handler - WebSocketHandler /usr/lib/apache2/modules/osrf_websocket_translator.so osrf_websocket_init - + diff --git a/examples/haproxy/osrf-ws-http-proxy b/examples/haproxy/osrf-ws-http-proxy index 834672c..ba84667 100644 --- a/examples/haproxy/osrf-ws-http-proxy +++ b/examples/haproxy/osrf-ws-http-proxy @@ -22,4 +22,4 @@ backend https_server backend ws timeout server 300s - server ws1 localhost:7682 ssl verify none + server ws1 localhost:7682 verify none diff --git a/examples/nginx/osrf-ws-http-proxy b/examples/nginx/osrf-ws-http-proxy index db2e6e2..5d5b45e 100644 --- a/examples/nginx/osrf-ws-http-proxy +++ b/examples/nginx/osrf-ws-http-proxy @@ -64,14 +64,9 @@ server { location /osrf-websocket-translator { - # apache2-websockets: - # Defaults to HTTPS with or without a proxy. - proxy_pass https://localhost:7682; - - # websocketd: # websocketd may be run with or without SSL. When used with # NGINX, the assumption is it runs w/o SSL. Change to taste. - #proxy_pass http://localhost:7682; + proxy_pass http://localhost:7682; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; @@ -81,20 +76,15 @@ server { proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; - proxy_connect_timeout 5m; - - # apache2-websockets: - # Raise the default nginx proxy timeout settings to - # an arbitrarily high value so that we can leverage - # osrf-websocket-translator's native timeout settings. - proxy_send_timeout 1h; - proxy_read_timeout 1h; + # Disconnect the client if it takes this long to connect to + # websocketd. + proxy_connect_timeout 1m; - # websocketd: - # websocketd connections persist indefinitely. Leverage nginx - # timeouts to periodically disconnect long-idle clients. - #proxy_send_timeout 5m; - #proxy_read_timeout 5m; + # websocketd connections persist indefinitely. Leverage + # nginx timeouts to disconnect idle clients. Change + # to taste. + proxy_send_timeout 3m; + proxy_read_timeout 3m; } } diff --git a/src/gateway/Makefile.am b/src/gateway/Makefile.am index d4f327d..666fb06 100644 --- a/src/gateway/Makefile.am +++ b/src/gateway/Makefile.am @@ -16,8 +16,7 @@ HAVE_APACHE_MIN_24 = -DAPACHE_MIN_24 endif EXTRA_DIST = @srcdir@/apachetools.c @srcdir@/apachetools.h \ - @srcdir@/osrf_json_gateway.c @srcdir@/osrf_http_translator.c \ - @srcdir@/osrf_websocket_translator.c + @srcdir@/osrf_json_gateway.c @srcdir@/osrf_http_translator.c AM_CFLAGS = -D_LARGEFILE64_SOURCE $(HAVE_APACHE_MIN_24) -Wall -I@abs_top_srcdir@/include/ -I$(LIBXML2_HEADERS) -I$(APACHE2_HEADERS) -I$(APR_HEADERS) AM_LDFLAGS = -L$(LIBDIR) -L@top_builddir@/src/libopensrf @@ -33,7 +32,6 @@ install-exec-local: fi $(APXS2) -c $(DEF_LDLIBS) $(AM_CFLAGS) $(AM_LDFLAGS) @srcdir@/osrf_json_gateway.c apachetools.c apachetools.h libopensrf.so $(APXS2) -c $(DEF_LDLIBS) $(AM_CFLAGS) $(AM_LDFLAGS) @srcdir@/osrf_http_translator.c apachetools.c apachetools.h libopensrf.so - $(APXS2) -c $(DEF_LDLIBS) $(AM_CFLAGS) $(AM_LDFLAGS) @srcdir@/osrf_websocket_translator.c apachetools.c apachetools.h libopensrf.so $(MKDIR_P) $(DESTDIR)$(AP_LIBEXECDIR) if [ "$(DESTDIR)" ]; then \ $(APXS2) -i -S LIBEXECDIR=$(DESTDIR)$(AP_LIBEXECDIR) @srcdir@/osrf_json_gateway.la; \ @@ -42,7 +40,6 @@ install-exec-local: $(APXS2) -i -S LIBEXECDIR=$(DESTDIR)$(AP_LIBEXECDIR) -a @srcdir@/osrf_json_gateway.la; \ $(APXS2) -i -S LIBEXECDIR=$(DESTDIR)$(AP_LIBEXECDIR) -a @srcdir@/osrf_http_translator.la; \ fi - $(APXS2) -n osrf_websocket_translator -i -S LIBEXECDIR=$(DESTDIR)$(AP_LIBEXECDIR) @srcdir@/osrf_websocket_translator.la clean-local: - rm -f @srcdir@/osrf_http_translator.la @srcdir@/osrf_http_translator.lo @srcdir@/osrf_http_translator.slo @srcdir@/osrf_json_gateway.la @srcdir@/osrf_json_gateway.lo @srcdir@/osrf_json_gateway.slo @srcdir@/osrf_websocket_translator.la @srcdir@/osrf_websocket_translator.lo @srcdir@/osrf_websocket_translator.slo + rm -f @srcdir@/osrf_http_translator.la @srcdir@/osrf_http_translator.lo @srcdir@/osrf_http_translator.slo @srcdir@/osrf_json_gateway.la @srcdir@/osrf_json_gateway.lo @srcdir@/osrf_json_gateway.slo diff --git a/src/gateway/osrf_websocket_translator.c b/src/gateway/osrf_websocket_translator.c deleted file mode 100644 index 75d6876..0000000 --- a/src/gateway/osrf_websocket_translator.c +++ /dev/null @@ -1,993 +0,0 @@ -/* ----------------------------------------------------------------------- - * Copyright 2012 Equinox Software, Inc. - * Bill Erickson - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * ----------------------------------------------------------------------- - */ - -/** - * websocket <-> opensrf gateway. Wrapped opensrf messages are extracted - * and relayed to the opensrf network. Responses are pulled from the opensrf - * network and passed back to the client. Messages are analyzed to determine - * when a connect/disconnect occurs, so that the cache of recipients can be - * properly managed. We also activity-log REQUEST messages. - * - * Messages to/from the websocket client take the following form: - * { - * "service" : "opensrf.foo", // required - * "thread" : "123454321", // AKA thread. required for follow-up requests; max 64 chars. - * "log_xid" : "123..32", // optional log trace ID, max 64 chars; - * "osrf_msg" : [, , ...] // required - * } - * - * Each translator operates with three threads. One thread receives messages - * from the websocket client, translates, and relays them to the opensrf - * network. The second thread collects responses from the opensrf network and - * relays them back to the websocket client. The third thread inspects - * the idle timeout interval t see if it's time to drop the idle client. - * - * After the initial setup, all thread actions occur within a thread - * mutex. The desired affect is a non-threaded application that uses - * threads for the sole purpose of having one thread listening for - * incoming data, while a second thread listens for responses, and a - * third checks the idle timeout. When any thread awakens, it's the - * only thread in town until it goes back to sleep (i.e. listening on - * its socket for data). - * - * Note that with the opensrf "thread", which allows us to identify the - * opensrf session, the caller does not need to provide a recipient - * address. The "service" is only required to start a new opensrf - * session. After the sesession is started, all future communication is - * based solely on the thread. However, the "service" should be passed - * by the caller for all requests to ensure it is properly logged in the - * activity log. - * - * Every inbound and outbound message updates the last_activity_time. - * A separate thread wakes periodically to see if the time since the - * last_activity_time exceeds the configured idle_timeout_interval. If - * so, a disconnect is sent to the client, completing the conversation. - * - * Configuration goes directly into the Apache envvars file. - * (e.g. /etc/apache2-websockets/envvars). As of today, it's not - * possible to leverage Apache configuration directives directly, - * since this is not an Apache module, but a shared library loaded - * by an apache module. This includes SetEnv / SetEnvIf. - * - * export OSRF_WEBSOCKET_IDLE_TIMEOUT=300 - * export OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL=5 - * export OSRF_WEBSOCKET_CONFIG_FILE=/openils/conf/opensrf_core.xml - * export OSRF_WEBSOCKET_CONFIG_CTXT=gateway - */ - -#include -#include -#include -#include "httpd.h" -#include "http_log.h" -#include "apr_strings.h" -#include "apr_thread_proc.h" -#include "apr_hash.h" -#include "websocket_plugin.h" -#include "opensrf/log.h" -#include "opensrf/osrf_json.h" -#include "opensrf/transport_client.h" -#include "opensrf/transport_message.h" -#include "opensrf/osrf_system.h" -#include "opensrf/osrfConfig.h" - -#define MAX_THREAD_SIZE 64 -#define RECIP_BUF_SIZE 256 -#define WEBSOCKET_TRANSLATOR_INGRESS "ws-translator-v1" - -// maximun number of active, CONNECTed opensrf sessions allowed. in -// practice, this number will be very small, rarely reaching double -// digits. This is just a security back-stop. A client trying to open -// this many connections is almost certainly attempting to DOS the -// gateway / server. We may want to lower this further. -#define MAX_ACTIVE_STATEFUL_SESSIONS 128 - -// default values, replaced during setup (below) as needed. -static char* config_file = "/openils/conf/opensrf_core.xml"; -static char* config_ctxt = "gateway"; - -static time_t idle_timeout_interval = 120; -static time_t idle_check_interval = 5; -static time_t last_activity_time = 0; - -// Generally, we do not disconnect the client (as idle) if there is a -// request in flight. However, we need to have an upper bound on the -// amount of time we will wait for in-flight requests to complete to -// avoid leaving an effectively idle connection open after a request -// died on the backend and no response was received. -// Note that if other activity occurs while a long-running request -// is active, the wait time will get reset with each new activity. -// This is OK, though, because the goal of max_request_wait_time -// is not to chop requests off at the knees, it's to allow the client -// to timeout as idle when only a single long-running request is active -// and preventing timeout. -static time_t max_request_wait_time = 600; - -// Incremented with every REQUEST, decremented with every COMPLETE. -// Gives us a rough picture of the number of reqests we've sent to -// the server vs. the number for which a completed response has been -// received. -static int requests_in_flight = 0; - -// true if we've received a signal to start graceful shutdown -static int shutdown_requested = 0; -static void sigusr1_handler(int sig); -static void sigusr1_handler(int sig) { - shutdown_requested = 1; - signal(SIGUSR1, sigusr1_handler); - osrfLogInfo(OSRF_LOG_MARK, "WS received SIGUSR1 - Graceful Shutdown"); -} - -static const char* get_client_ip(const request_rec* r) { -#ifdef APACHE_MIN_24 - return r->connection->client_ip; -#else - return r->connection->remote_ip; -#endif -} - -typedef struct _osrfWebsocketTranslator { - - /** Our handle for communicating with the caller */ - const WebSocketServer *server; - - /** - * Standalone, per-process APR pool. Primarily - * there for managing thread data, which lasts - * the duration of the process. - */ - apr_pool_t *main_pool; - - /** - * Map of thread => drone-xmpp-address. Maintaining this - * map internally means the caller never need know about - * internal XMPP addresses and the server doesn't have to - * verify caller-specified recipient addresses. It's - * all managed internally. This is only used for stateful - * (CONNECT'ed) session. Stateless sessions need not - * track the recipient, since they are one-off calls. - */ - apr_hash_t *stateful_session_cache; - - /** - * stateful_session_pool contains the key/value pairs stored in - * the stateful_session_cache. The pool is regularly destroyed - * and re-created to avoid long-term memory consumption - */ - apr_pool_t *stateful_session_pool; - - /** - * Thread responsible for collecting responses on the opensrf - * network and relaying them back to the caller - */ - apr_thread_t *responder_thread; - - /** - * Thread responsible for checking inactivity timeout. - * If no activitity occurs within the configured interval, - * a disconnect is sent to the client and the connection - * is terminated. - */ - apr_thread_t *idle_timeout_thread; - - /** - * All message handling code is wrapped in a thread mutex such - * that all actions (after the initial setup) are serialized - * to minimize the possibility of multi-threading snafus. - */ - apr_thread_mutex_t *mutex; - - /** - * True if a websocket client is currently connected - */ - int client_connected; - - /** OpenSRF jouter name */ - char* osrf_router; - - /** OpenSRF domain */ - char* osrf_domain; - -} osrfWebsocketTranslator; - -static osrfWebsocketTranslator *trans = NULL; -static transport_client *osrf_handle = NULL; -static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer - -static void clear_cached_recipient(const char* thread) { - apr_pool_t *pool = NULL; - request_rec *r = trans->server->request(trans->server); - - if (apr_hash_get(trans->stateful_session_cache, thread, APR_HASH_KEY_STRING)) { - - osrfLogDebug(OSRF_LOG_MARK, "WS removing cached recipient on disconnect"); - - // remove it from the hash - apr_hash_set(trans->stateful_session_cache, thread, APR_HASH_KEY_STRING, NULL); - - if (apr_hash_count(trans->stateful_session_cache) == 0) { - osrfLogDebug(OSRF_LOG_MARK, "WS re-setting stateful_session_pool"); - - // memory accumulates in the stateful_session_pool as - // sessions are cached then un-cached. Un-caching removes - // strings from the hash, but not from the pool. Clear the - // pool here. note: apr_pool_clear does not free memory, it - // reclaims it for use again within the pool. This is more - // effecient than freeing and allocating every time. - apr_pool_clear(trans->stateful_session_pool); - } - } -} - -void* osrf_responder_thread_main_body(transport_message *tmsg) { - - osrfList *msg_list = NULL; - osrfMessage *one_msg = NULL; - int i; - - osrfLogDebug(OSRF_LOG_MARK, - "WS received opensrf response for thread=%s", tmsg->thread); - - // first we need to perform some maintenance - msg_list = osrfMessageDeserialize(tmsg->body, NULL); - - for (i = 0; i < msg_list->size; i++) { - one_msg = OSRF_LIST_GET_INDEX(msg_list, i); - - osrfLogDebug(OSRF_LOG_MARK, - "WS returned response of type %d", one_msg->m_type); - - /* if our client just successfully connected to an opensrf service, - cache the sender so that future calls on this thread will use - the correct recipient. */ - if (one_msg && one_msg->m_type == STATUS) { - - if (one_msg->status_code == OSRF_STATUS_OK) { - - if (!apr_hash_get(trans->stateful_session_cache, - tmsg->thread, APR_HASH_KEY_STRING)) { - - apr_size_t ses_size = - apr_hash_count(trans->stateful_session_cache); - - if (ses_size < MAX_ACTIVE_STATEFUL_SESSIONS) { - - osrfLogDebug(OSRF_LOG_MARK, "WS caching sender " - "thread=%s, sender=%s; concurrent=%d", - tmsg->thread, tmsg->sender, ses_size); - - apr_hash_set(trans->stateful_session_cache, - apr_pstrdup(trans->stateful_session_pool, tmsg->thread), - APR_HASH_KEY_STRING, - apr_pstrdup(trans->stateful_session_pool, tmsg->sender)); - - } else { - osrfLogWarning(OSRF_LOG_MARK, - "WS max concurrent sessions (%d) reached. " - "Current session will not be tracked", - MAX_ACTIVE_STATEFUL_SESSIONS - ); - } - } - - } else { - - // connection timed out; clear the cached recipient - if (one_msg->status_code == OSRF_STATUS_TIMEOUT) { - clear_cached_recipient(tmsg->thread); - - } else { - if (one_msg->status_code == OSRF_STATUS_COMPLETE) - requests_in_flight--; - } - } - } - } - - // osrfMessageDeserialize applies the freeItem handler to the - // newly created osrfList. We only need to free the list and - // the individual osrfMessage's will be freed along with it - osrfListFree(msg_list); - - // relay the response messages to the client - jsonObject *msg_wrapper = NULL; - char *msg_string = NULL; - - // build the wrapper object - msg_wrapper = jsonNewObject(NULL); - jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread)); - jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid)); - jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body)); - - if (tmsg->is_error) { - osrfLogError(OSRF_LOG_MARK, - "WS received jabber error message in response to thread=%s", - tmsg->thread); - jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1)); - } - - msg_string = jsonObjectToJSONRaw(msg_wrapper); - - // drop the JSON on the outbound wire - trans->server->send(trans->server, MESSAGE_TYPE_TEXT, - (unsigned char*) msg_string, strlen(msg_string)); - - free(msg_string); - jsonObjectFree(msg_wrapper); -} - -/** - * Responder thread main body. - * Collects responses from the opensrf network and relays them to the - * websocket caller. - */ -void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) { - - transport_message *tmsg; - while (1) { - - if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) { - osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex"); - return NULL; - } - - // wait indefinitely for a response - tmsg = client_recv(osrf_handle, -1); - - if (!tmsg) { - // tmsg can only be NULL if the underlying select() call is - // interrupted or the jabber socket connection was severed. - - if (client_connected(osrf_handle) && - socket_connected(osrf_handle->session->sock_id)) { - continue; // interrupted. restart loop. - } - - // Socket connection was broken. Send disconnect to client, - // causing on_disconnect_handler to run and cleanup. - osrfLogWarning(OSRF_LOG_MARK, - "WS: Jabber socket disconnected. Sending close() to client"); - - trans->server->close(trans->server); - return NULL; // exit thread - } - - if (trans->client_connected) { - - if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) { - osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex"); - return NULL; - } - - osrfLogForceXid(tmsg->osrf_xid); - osrf_responder_thread_main_body(tmsg); - last_activity_time = time(NULL); - } - - message_free(tmsg); - } - - return NULL; -} - -static int active_connection_count() { - - if (requests_in_flight) { - - time_t now = time(NULL); - time_t difference = now - last_activity_time; - - if (difference >= max_request_wait_time) { - osrfLogWarning(OSRF_LOG_MARK, - "%d In-flight request(s) took longer than %d seconds " - "to complete. Treating request as dead and moving on.", - requests_in_flight, - max_request_wait_time - ); - requests_in_flight = 0; - } - } - - return requests_in_flight; -} - -/** - * Sleep and regularly wake to see if the process has been idle for too - * long. If so, send a disconnect to the client. - */ -void* APR_THREAD_FUNC osrf_idle_timeout_thread_main( - apr_thread_t *thread, void *data) { - - // sleep time defaults to the check interval, but may - // be shortened during shutdown. - int sleep_time = idle_check_interval; - int shutdown_loops = 0; - - while (1) { - - if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) { - osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex"); - return NULL; - } - - // note: receiving a signal (e.g. SIGUSR1) will not interrupt - // this sleep(), since it's running within its own thread. - // During graceful shtudown, we may wait up to - // idle_check_interval seconds before initiating shutdown. - sleep(sleep_time); - - if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) { - osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex"); - return NULL; - } - - // no client is connected. reset sleep time go back to sleep. - if (!trans->client_connected) { - sleep_time = idle_check_interval; - continue; - } - - // do we have any active stateful conversations with the client? - int active_count = active_connection_count(); - - if (active_count) { - - if (shutdown_requested) { - // active conversations means we can't shut down. - // shorten the check interval to re-check more often. - shutdown_loops++; - osrfLogDebug(OSRF_LOG_MARK, - "WS: %d active conversation(s) found in shutdown after " - "%d attempts. Sleeping...", shutdown_loops, active_count - ); - - if (shutdown_loops > 30) { - // this is clearly a long-running conversation, let's - // check less frequently to avoid excessive logging. - sleep_time = 3; - } else { - sleep_time = 1; - } - } - - // active conversations means keep going. There's no point in - // checking the idle time (below) if we're mid-conversation - continue; - } - - // no active conversations - - if (shutdown_requested) { - // there's no need to reset the shutdown vars (loops/requested) - // SIGUSR1 is Apaches reload signal, which means this process - // will be going away as soon as the client is disconnected. - - osrfLogInfo(OSRF_LOG_MARK, - "WS: no active conversations remain in shutdown; " - "closing client connection"); - - } else { - // see how long we've been idle. If too long, kick the client - - time_t now = time(NULL); - time_t difference = now - last_activity_time; - - osrfLogDebug(OSRF_LOG_MARK, - "WS connection idle for %d seconds", difference); - - if (difference < idle_timeout_interval) { - // Last activity occurred within the idle timeout interval. - continue; - } - - // idle timeout exceeded - osrfLogDebug(OSRF_LOG_MARK, - "WS: idle timeout exceeded. now=%d / last=%d; " - "closing client connection", now, last_activity_time); - } - - - // send a disconnect to the client, which will come back around - // to cause our on_disconnect_handler to run. - osrfLogDebug(OSRF_LOG_MARK, "WS: sending close() to client"); - trans->server->close(trans->server); - - // client will be going away, reset sleep time - sleep_time = idle_check_interval; - } - - // should never get here - return NULL; -} - -static int build_startup_data(const WebSocketServer *server) { - - apr_pool_t *main_pool = NULL; - apr_pool_t *stateful_session_pool = NULL; - apr_thread_t *thread = NULL; - apr_threadattr_t *thread_attr = NULL; - apr_thread_mutex_t *mutex = NULL; - request_rec *r = server->request(server); - - // create a pool for our translator data - // Do not use r->pool as the parent, since r->pool will be freed - // when the current client disconnects. - if (apr_pool_create(&main_pool, NULL) != APR_SUCCESS) { - osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool"); - return 1; - } - - trans = (osrfWebsocketTranslator*) - apr_palloc(main_pool, sizeof(osrfWebsocketTranslator)); - - if (trans == NULL) { - osrfLogError(OSRF_LOG_MARK, "WS Unable to create translator"); - return 1; - } - - trans->server = server; - trans->main_pool = main_pool; - trans->osrf_router = osrfConfigGetValue(NULL, "/router_name"); - trans->osrf_domain = osrfConfigGetValue(NULL, "/domain"); - - // opensrf session / recipient cache - trans->stateful_session_cache = apr_hash_make(trans->main_pool); - if (trans->stateful_session_cache == NULL) { - osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache"); - return 1; - } - - // opensrf session / recipient string pool; cleared regularly - // the only data entering this pools are the session strings. - if (apr_pool_create(&stateful_session_pool, trans->main_pool) != APR_SUCCESS) { - osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool"); - return 0; - } - trans->stateful_session_pool = stateful_session_pool; - - if (apr_thread_mutex_create( - &mutex, APR_THREAD_MUTEX_UNNESTED, - trans->main_pool) != APR_SUCCESS) { - osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex"); - return 1; - } - trans->mutex = mutex; - - // responder thread - if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) && - (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) && - (apr_thread_create(&thread, thread_attr, - osrf_responder_thread_main, trans, trans->main_pool) == APR_SUCCESS)) { - - trans->responder_thread = thread; - - } else { - osrfLogError(OSRF_LOG_MARK, "WS unable to create responder thread"); - return 1; - } - - // idle timeout thread - thread = NULL; // reset - thread_attr = NULL; // reset - if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) && - (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) && - (apr_thread_create(&thread, thread_attr, - osrf_idle_timeout_thread_main, trans, trans->main_pool) == APR_SUCCESS)) { - - osrfLogDebug(OSRF_LOG_MARK, "WS created idle timeout thread"); - trans->idle_timeout_thread = thread; - - } else { - osrfLogError(OSRF_LOG_MARK, "WS unable to create idle timeout thread"); - return 1; - } - - return APR_SUCCESS; -} - - -/** - * Connect to OpenSRF, create the main pool, responder thread - * session cache and session pool. - */ -int child_init(const WebSocketServer *server) { - request_rec *r = server->request(server); - - // osrf_handle will already be connected if this is not the first request - // served by this process. - if ( !(osrf_handle = osrfSystemGetTransportClient()) ) { - - // load config values from the env - char* timeout = getenv("OSRF_WEBSOCKET_IDLE_TIMEOUT"); - if (timeout) { - if (!atoi(timeout)) { - ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, - "WS: invalid OSRF_WEBSOCKET_IDLE_TIMEOUT: %s", timeout); - } else { - idle_timeout_interval = (time_t) atoi(timeout); - } - } - - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, - "WS: timeout set to %ld", idle_timeout_interval); - - timeout = getenv("OSRF_WEBSOCKET_MAX_REQUEST_WAIT_TIME"); - if (timeout) { - if (!atoi(timeout)) { - ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, - "WS: invalid OSRF_WEBSOCKET_MAX_REQUEST_WAIT_TIME: %s", - timeout - ); - } else { - max_request_wait_time = (time_t) atoi(timeout); - } - } - - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, - "WS: max request wait time set to %ld", max_request_wait_time); - - char* interval = getenv("OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL"); - if (interval) { - if (!atoi(interval)) { - ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, - "WS: invalid OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL: %s", - interval - ); - } else { - idle_check_interval = (time_t) atoi(interval); - } - } - - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, - "WS: idle check interval set to %ld", idle_check_interval); - - - char* cfile = getenv("OSRF_WEBSOCKET_CONFIG_FILE"); - if (cfile) { - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, - "WS: config file set to %s", cfile); - config_file = cfile; - } - - char* ctxt = getenv("OSRF_WEBSOCKET_CONFIG_CTXT"); - if (ctxt) { - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, - "WS: config context set to %s", ctxt); - config_ctxt = ctxt; - } - - // connect to opensrf - if (!osrfSystemBootstrapClientResc( - config_file, config_ctxt, "websocket")) { - - osrfLogError(OSRF_LOG_MARK, - "WS unable to bootstrap OpenSRF client with config %s " - "and context %s", config_file, config_ctxt - ); - return 1; - } - - osrfLogSetAppname("osrf_websocket_translator"); - osrf_handle = osrfSystemGetTransportClient(); - } - - signal(SIGUSR1, sigusr1_handler); - return APR_SUCCESS; -} - -/** - * Create the per-client translator - */ -void* CALLBACK on_connect_handler(const WebSocketServer *server) { - request_rec *r = server->request(server); - - if (!trans) { // first connection - - // connect to opensrf - if (child_init(server) != APR_SUCCESS) - return NULL; - - // build pools, thread data, and the translator - if (build_startup_data(server) != APR_SUCCESS) - return NULL; - } - - const char* client_ip = get_client_ip(r); - osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip); - - last_activity_time = time(NULL); - trans->client_connected = 1; - return trans; -} - - -/** - * for each inbound opensrf message: - * 1. Stamp the ingress - * 2. REQUEST: log it as activity - * 3. DISCONNECT: remove the cached recipient - * then re-string-ify for xmpp delivery - */ - -static char* extract_inbound_messages( - const request_rec *r, - const char* service, - const char* thread, - const char* recipient, - const jsonObject *osrf_msg) { - - int i; - int num_msgs = osrf_msg->size; - osrfMessage* msg; - osrfMessage* msg_list[num_msgs]; - - // here we do an extra json round-trip to get the data - // in a form osrf_message_deserialize can understand - // TODO: consider a version of osrf_message_init which can - // accept a jsonObject* instead of a JSON string. - char *osrf_msg_json = jsonObjectToJSON(osrf_msg); - osrf_message_deserialize(osrf_msg_json, msg_list, num_msgs); - free(osrf_msg_json); - - // should we require the caller to always pass the service? - if (service == NULL) service = ""; - - for(i = 0; i < num_msgs; i++) { - msg = msg_list[i]; - osrfMessageSetIngress(msg, WEBSOCKET_TRANSLATOR_INGRESS); - - switch(msg->m_type) { - - case REQUEST: { - const jsonObject* params = msg->_params; - growing_buffer* act = buffer_init(128); - char* method = msg->method_name; - buffer_fadd(act, "[%s] [%s] %s %s", - get_client_ip(r), "", service, method); - - const jsonObject* obj = NULL; - int i = 0; - const char* str; - int redactParams = 0; - while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) { - if(!strncmp(method, str, strlen(str))) { - redactParams = 1; - break; - } - } - if(redactParams) { - OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**"); - } else { - i = 0; - while((obj = jsonObjectGetIndex(params, i++))) { - char* str = jsonObjectToJSON(obj); - if( i == 1 ) - OSRF_BUFFER_ADD(act, " "); - else - OSRF_BUFFER_ADD(act, ", "); - OSRF_BUFFER_ADD(act, str); - free(str); - } - } - osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf); - buffer_free(act); - requests_in_flight++; - break; - } - - case DISCONNECT: - clear_cached_recipient(thread); - break; - } - } - - char* finalMsg = osrfMessageSerializeBatch(msg_list, num_msgs); - - // clean up our messages - for(i = 0; i < num_msgs; i++) - osrfMessageFree(msg_list[i]); - - return finalMsg; -} - -/** - * Parse opensrf request and relay the request to the opensrf network. - */ -static size_t on_message_handler_body(void *data, - const WebSocketServer *server, const int type, - unsigned char *buffer, const size_t buffer_size) { - - request_rec *r = server->request(server); - - jsonObject *msg_wrapper = NULL; // free me - const jsonObject *tmp_obj = NULL; - const jsonObject *osrf_msg = NULL; - const char *service = NULL; - const char *thread = NULL; - const char *log_xid = NULL; - char *msg_body = NULL; - char *recipient = NULL; - int i; - - if (buffer_size <= 0) return OK; - - // generate a new log trace for this request. it - // may be replaced by a client-provided trace below. - osrfLogMkXid(); - - osrfLogDebug(OSRF_LOG_MARK, "WS received message size=%d", buffer_size); - - // buffer may not be \0-terminated, which jsonParse requires - char buf[buffer_size + 1]; - memcpy(buf, buffer, buffer_size); - buf[buffer_size] = '\0'; - - osrfLogInternal(OSRF_LOG_MARK, "WS received inbound message: %s", buf); - - msg_wrapper = jsonParse(buf); - - if (msg_wrapper == NULL) { - osrfLogWarning(OSRF_LOG_MARK, "WS Invalid JSON: %s", buf); - return HTTP_BAD_REQUEST; - } - - osrf_msg = jsonObjectGetKeyConst(msg_wrapper, "osrf_msg"); - - if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "service")) - service = jsonObjectGetString(tmp_obj); - - if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "thread")) - thread = jsonObjectGetString(tmp_obj); - - if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "log_xid")) - log_xid = jsonObjectGetString(tmp_obj); - - if (log_xid) { - - // use the caller-provide log trace id - if (strlen(log_xid) > MAX_THREAD_SIZE) { - osrfLogWarning(OSRF_LOG_MARK, "WS log_xid exceeds max length"); - return HTTP_BAD_REQUEST; - } - - osrfLogForceXid(log_xid); - } - - if (thread) { - - if (strlen(thread) > MAX_THREAD_SIZE) { - osrfLogWarning(OSRF_LOG_MARK, "WS thread exceeds max length"); - return HTTP_BAD_REQUEST; - } - - // since clients can provide their own threads at session start time, - // the presence of a thread does not guarantee a cached recipient - recipient = (char*) apr_hash_get( - trans->stateful_session_cache, thread, APR_HASH_KEY_STRING); - - if (recipient) { - osrfLogDebug(OSRF_LOG_MARK, "WS found cached recipient %s", recipient); - } - } - - if (!recipient) { - - if (service) { - int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1, - "%s@%s/%s", trans->osrf_router, trans->osrf_domain, service); - recipient_buf[size] = '\0'; - recipient = recipient_buf; - - } else { - osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient"); - return HTTP_BAD_REQUEST; - } - } - - osrfLogDebug(OSRF_LOG_MARK, - "WS relaying message to opensrf thread=%s, recipient=%s", - thread, recipient); - - msg_body = extract_inbound_messages( - r, service, thread, recipient, osrf_msg); - - osrfLogInternal(OSRF_LOG_MARK, - "WS relaying inbound message: %s", msg_body); - - transport_message *tmsg = message_init( - msg_body, NULL, thread, recipient, NULL); - - message_set_osrf_xid(tmsg, osrfLogGetXid()); - - size_t stat = OK; - if (client_send_message(osrf_handle, tmsg) != 0) { - osrfLogError(OSRF_LOG_MARK, "WS failed sending data to OpenSRF"); - stat = HTTP_INTERNAL_SERVER_ERROR; - } - - osrfLogClearXid(); - message_free(tmsg); - jsonObjectFree(msg_wrapper); - free(msg_body); - - last_activity_time = time(NULL); - return stat; -} - -static size_t CALLBACK on_message_handler(void *data, - const WebSocketServer *server, const int type, - unsigned char *buffer, const size_t buffer_size) { - - if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) { - osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex"); - return 1; - } - - size_t stat = on_message_handler_body(data, server, type, buffer, buffer_size); - - if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) { - osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex"); - return 1; - } - - if (stat != OK) { - // Returning a non-OK status alone won't force a disconnect. - // Once disconnected, the on_disconnect_handler() handler - // will run, clean it all up, and kill the process. - osrfLogError(OSRF_LOG_MARK, - "Error relaying message, forcing client disconnect"); - trans->server->close(trans->server); - } - - return stat; -} - - -/** - * Clear the session cache, release the session pool - */ -void CALLBACK on_disconnect_handler( - void *data, const WebSocketServer *server) { - - // if the threads wake up during disconnect, this tells - // them to go back to sleep. - trans->client_connected = 0; - - request_rec *r = server->request(server); - osrfLogInfo(OSRF_LOG_MARK, "WS disconnect from %s", get_client_ip(r)); - - // Clear any lingering session data - // NOTE: we could apr_pool_destroy the stateful_session_pool to truly free - // the memory, but since there is a limit to the size of the pool - // (max_concurrent_sessions), the memory cannot grow unbounded, - // so there's no need. - apr_hash_clear(trans->stateful_session_cache); - apr_pool_clear(trans->stateful_session_pool); -} - -static WebSocketPlugin osrf_websocket_plugin = { - sizeof(WebSocketPlugin), - WEBSOCKET_PLUGIN_VERSION_0, - NULL, // on_destroy_handler - on_connect_handler, - on_message_handler, - on_disconnect_handler -}; - -extern EXPORT WebSocketPlugin * CALLBACK osrf_websocket_init() { - return &osrf_websocket_plugin; -} - diff --git a/src/gateway/websocket_plugin.h b/src/gateway/websocket_plugin.h deleted file mode 100644 index 419d47b..0000000 --- a/src/gateway/websocket_plugin.h +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Copyright 2010-2011 self.disconnect - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#if !defined(_MOD_WEBSOCKET_H_) -#define _MOD_WEBSOCKET_H_ - -#include - -#if defined(__cplusplus) -extern "C" -{ -#endif - -#if defined(_WIN32) -#define EXPORT __declspec(dllexport) -#define CALLBACK __stdcall -#else -#define EXPORT -#define CALLBACK -#endif - -#define MESSAGE_TYPE_INVALID -1 -#define MESSAGE_TYPE_TEXT 0 -#define MESSAGE_TYPE_BINARY 128 -#define MESSAGE_TYPE_CLOSE 255 -#define MESSAGE_TYPE_PING 256 -#define MESSAGE_TYPE_PONG 257 - - struct _WebSocketServer; - - typedef struct request_rec *(CALLBACK * WS_Request) - (const struct _WebSocketServer *server); - - typedef const char *(CALLBACK * WS_Header_Get) - (const struct _WebSocketServer *server, - const char *key); - - typedef void (CALLBACK * WS_Header_Set) - (const struct _WebSocketServer *server, - const char *key, - const char *value); - - typedef size_t (CALLBACK * WS_Protocol_Count) - (const struct _WebSocketServer *server); - - typedef const char *(CALLBACK * WS_Protocol_Index) - (const struct _WebSocketServer *server, - const size_t index); - - typedef void (CALLBACK * WS_Protocol_Set) - (const struct _WebSocketServer *server, - const char *protocol); - - typedef size_t (CALLBACK * WS_Send) - (const struct _WebSocketServer *server, - const int type, - const unsigned char *buffer, - const size_t buffer_size); - - typedef void (CALLBACK * WS_Close) - (const struct _WebSocketServer *server); - -#define WEBSOCKET_SERVER_VERSION_1 1 - - typedef struct _WebSocketServer - { - unsigned int size; - unsigned int version; - struct _WebSocketState *state; - WS_Request request; - WS_Header_Get header_get; - WS_Header_Set header_set; - WS_Protocol_Count protocol_count; - WS_Protocol_Index protocol_index; - WS_Protocol_Set protocol_set; - WS_Send send; - WS_Close close; - } WebSocketServer; - - struct _WebSocketPlugin; - - typedef struct _WebSocketPlugin *(CALLBACK * WS_Init) - (); - typedef void (CALLBACK * WS_Destroy) - (struct _WebSocketPlugin *plugin); - - typedef void *(CALLBACK * WS_OnConnect) - (const WebSocketServer *server); /* Returns plugin_private */ - - typedef size_t (CALLBACK * WS_OnMessage) - (void *plugin_private, - const WebSocketServer *server, - const int type, - unsigned char *buffer, - const size_t buffer_size); - - typedef void (CALLBACK * WS_OnDisconnect) - (void *plugin_private, - const WebSocketServer *server); - -#define WEBSOCKET_PLUGIN_VERSION_0 0 - - typedef struct _WebSocketPlugin - { - unsigned int size; - unsigned int version; - WS_Destroy destroy; - WS_OnConnect on_connect; - WS_OnMessage on_message; - WS_OnDisconnect on_disconnect; - } WebSocketPlugin; - -#if defined(__cplusplus) -} -#endif - -#endif /* _MOD_WEBSOCKET_H_ */ -- 2.43.2