From 21c9c76332b8a4b591e443d098a0fc78b6db0e9d Mon Sep 17 00:00:00 2001 From: Bill Erickson Date: Sat, 9 Jun 2018 19:05:25 -0400 Subject: [PATCH] LP#1777180 Websocketd gateway and test scripts Adds a new OpenSRF binary/program for relaying websockets messages to and from a websocketd instance. The new binary (osrf-websocket-stdio) performs the same tasks as the osrf_websocket_translator.c Apache module, minus the Apache module glue and the extra threading required to run within the Apache module. Commit includes 2 test scripts (tester.pl and test-stateful.pl) for generating large series of test messages to send to a websockets instance. tester.pl sends echo requests only, test-stateful.pl sends connect->echo-request->disconnect batches across a configurable number of forked processes. INSTALL document updated to include websocketd setup as an alternative to Apache websockets. Signed-off-by: Bill Erickson Signed-off-by: Jeff Davis Signed-off-by: Jason Stephenson Signed-off-by: Ben Shum --- .gitignore | 4 + README | 34 +- configure.ac | 1 + src/Makefile.am | 2 +- src/websocket-stdio/Makefile.am | 22 + src/websocket-stdio/osrf-websocket-stdio.c | 609 +++++++++++++++++++++ src/websocket-stdio/test-stateful.pl | 148 +++++ src/websocket-stdio/tester.pl | 87 +++ 8 files changed, 904 insertions(+), 3 deletions(-) create mode 100644 src/websocket-stdio/Makefile.am create mode 100644 src/websocket-stdio/osrf-websocket-stdio.c create mode 100755 src/websocket-stdio/test-stateful.pl create mode 100755 src/websocket-stdio/tester.pl diff --git a/.gitignore b/.gitignore index 0170ebe..ac2a33e 100644 --- a/.gitignore +++ b/.gitignore @@ -96,3 +96,7 @@ tests/check_transport_client tests/check_transport_message tests/Makefile tests/Makefile.in +src/websocket-stdio/.deps/ +src/websocket-stdio/.libs/ +src/websocket-stdio/osrf-websocket-stdio +src/websocket-stdio/*.o diff --git a/README b/README index 46c311e..2959e6e 100644 --- a/README +++ b/README @@ -417,8 +417,8 @@ srfsh# request opensrf.math add 2,2 + You should receive the value `4`. -Optional: Websockets installation instructions ----------------------------------------------- +Websockets installation instructions: Option #1 Apache +------------------------------------------------------- Websockets are new to OpenSRF 2.4+ and are required for operating the new web-based staff client for Evergreen. Complete the following steps as the *root* Linux account: @@ -520,6 +520,36 @@ export OSRF_WEBSOCKET_MAX_REQUEST_WAIT_TIME=600 /etc/init.d/apache2-websockets start --------------------------------------------------------------------------- +Websockets installation instructions: Option #2 Websocketd +---------------------------------------------------------- + +1. Install websocketd (latest stable release from http://websocketd.com/) ++ +.(Debian, Ubuntu) +[source,bash] +--------------------------------------------------------------------------- +cd /tmp +wget 'https://github.com/joewalnes/websocketd/releases/download/v0.3.0/websocketd-0.3.0-linux_amd64.zip' +unzip websocketd-0.3.0-linux_amd64.zip +sudo cp websocketd /usr/local/bin/ +--------------------------------------------------------------------------- ++ +2. Run websocketd as 'opensrf' ++ +.(Debian, Ubuntu) +[source,bash] +--------------------------------------------------------------------------- +/usr/local/bin/websocketd --port 7682 /openils/bin/osrf-websocket-stdio & + +# Other useful command line parameters include: +# --loglevel debug|trace|access|info|error|fatal +# --maxforks +# --sameorigin=true +# --origin=host[:port][,host[:port]...] + +# See https://github.com/joewalnes/websocketd/blob/master/help.go +--------------------------------------------------------------------------- + Optional: Using NGINX as a proxy -------------------------------- NGINX can be used to proxy HTTP, HTTPS, and WebSockets traffic. Among other diff --git a/configure.ac b/configure.ac index 073d6f8..48ffab8 100644 --- a/configure.ac +++ b/configure.ac @@ -387,6 +387,7 @@ if test "x$OSRF_INSTALL_CORE" = "xtrue"; then src/python/opensrf.py src/router/Makefile src/srfsh/Makefile + src/websocket-stdio/Makefile tests/Makefile bin/opensrf-perl.pl bin/osrf_config]) diff --git a/src/Makefile.am b/src/Makefile.am index a86281f..c8b6107 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -40,7 +40,7 @@ js_SCRIPTS = javascript/DojoSRF.js javascript/JSON_v1.js javascript/md5.js javas endif if BUILDCORE -MAYBE_CORE = libopensrf c-apps router srfsh gateway perl +MAYBE_CORE = libopensrf c-apps router srfsh gateway perl websocket-stdio if BUILDPYTHON dist_bin_SCRIPTS = @top_srcdir@/bin/opensrf-perl.pl @top_srcdir@/src/python/opensrf.py @top_srcdir@/src/python/srfsh.py else diff --git a/src/websocket-stdio/Makefile.am b/src/websocket-stdio/Makefile.am new file mode 100644 index 0000000..b9e6ba9 --- /dev/null +++ b/src/websocket-stdio/Makefile.am @@ -0,0 +1,22 @@ +# Copyright (C) 2018 King County Library Service +# Bill Erickson +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + + +LDADD = $(DEF_LDLIBS) +AM_CFLAGS = $(DEF_CFLAGS) -DEXEC_DEFAULT -L@top_builddir@/src/libopensrf +AM_LDFLAGS = $(DEF_LDFLAGS) + +DISTCLEANFILES = Makefile.in Makefile + +bin_PROGRAMS = osrf-websocket-stdio +osrf_websocket_stdio_SOURCES = osrf-websocket-stdio.c diff --git a/src/websocket-stdio/osrf-websocket-stdio.c b/src/websocket-stdio/osrf-websocket-stdio.c new file mode 100644 index 0000000..96d62f1 --- /dev/null +++ b/src/websocket-stdio/osrf-websocket-stdio.c @@ -0,0 +1,609 @@ +/* -------------------------------------------------------------------- + * Copyright (C) 2018 King County Library Service + * Bill Erickson + * + * Code borrows heavily from osrf_websocket_translator.c + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. +--------------------------------------------------------------------- */ + +/** + * OpenSRF Websockets Relay + * + * Reads Websockets requests on STDIN + * Sends replies to requests on STDOUT + * + * Built to function with websocketd: + * https://github.com/joewalnes/websocketd + * + * Synopsis: + * + * websocketd --port 7682 --max-forks 250 ./osrf-websocket-stdio /path/to/opensrf_core.xml & + * + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define MAX_THREAD_SIZE 64 +#define RECIP_BUF_SIZE 256 +#define WEBSOCKET_INGRESS "ws-translator-v2" + +// maximun number of active, CONNECTed opensrf sessions allowed. in +// practice, this number will be very small, rarely reaching double +// digits. This is just a security back-stop. A client trying to open +// this many connections is almost certainly attempting to DOS the +// gateway / server. +#define MAX_ACTIVE_STATEFUL_SESSIONS 64 + +// Message exceeding this size are discarded. +// This value must be greater than RESET_MESSAGE_SIZE (below) +// ~10M +#define MAX_MESSAGE_SIZE 10485760 + +// After processing any message this size or larger, free and +// recreate the stdin buffer to release the memory. +// ~100k +#define RESET_MESSAGE_SIZE 102400 + +// default values, replaced during setup (below) as needed. +static char* config_file = "/openils/conf/opensrf_core.xml"; +static char* config_ctxt = "gateway"; +static char* osrf_router = NULL; +static char* osrf_domain = NULL; + +// Cache of opensrf thread strings and back-end receipients. +// Tracking this here means the caller only needs to track the thread. +// It also means we don't have to expose internal XMPP IDs +static osrfHash* stateful_session_cache = NULL; +// Message on STDIN go into our reusable buffer +static growing_buffer* stdin_buf = NULL; +// OpenSRF XMPP connection handle +static transport_client* osrf_handle = NULL; +// Reusable string buf for recipient addresses +static char recipient_buf[RECIP_BUF_SIZE]; +// Websocket client IP address (for logging) +static char* client_ip = NULL; + +static void rebuild_stdin_buffer(); +static void child_init(int argc, char* argv[]); +static void read_from_stdin(); +static void relay_stdin_message(const char*); +static char* extract_inbound_messages(); +static void log_request(const char*, osrfMessage*); +static void read_from_osrf(); +static void read_one_osrf_message(transport_message*); +static int shut_it_down(int); +static void release_hash_string(char*, void*); + +// Websocketd sends SIGINT for shutdown, followed by SIGTERM +// if SIGINT takes too long. +static void sigint_handler(int sig) { + osrfLogInfo(OSRF_LOG_MARK, "WS received SIGINT - graceful shutdown"); + shut_it_down(0); +} + +int main(int argc, char* argv[]) { + + // Handle shutdown signal -- only needed once. + signal(SIGINT, sigint_handler); + + // Connect to OpenSR -- exits on error + child_init(argc, argv); + + // Disable output buffering. + setbuf(stdout, NULL); + rebuild_stdin_buffer(); + + // The main loop waits for data to be available on both STDIN + // (websocket client request) and the OpenSRF XMPP socket + // (replies returning to the websocket client). + fd_set fds; + int stdin_no = fileno(stdin); + int osrf_no = osrf_handle->session->sock_id; + int maxfd = osrf_no > stdin_no ? osrf_no : stdin_no; + int sel_resp; + + while (1) { + + FD_ZERO(&fds); + FD_SET(osrf_no, &fds); + FD_SET(stdin_no, &fds); + + // Wait indefinitely for activity to process + sel_resp = select(maxfd + 1, &fds, NULL, NULL, NULL); + + if (sel_resp < 0) { // error + + if (errno == EINTR) { + // Interrupted by a signal. Start the loop over. + continue; + } + + osrfLogError(OSRF_LOG_MARK, + "WS select() failed with [%s]. Exiting", strerror(errno)); + + shut_it_down(1); + } + + if (FD_ISSET(stdin_no, &fds)) { + read_from_stdin(); + } + + if (FD_ISSET(osrf_no, &fds)) { + read_from_osrf(); + } + } + + return shut_it_down(0); +} + +static void rebuild_stdin_buffer() { + + if (stdin_buf != NULL) { + buffer_free(stdin_buf); + } + + stdin_buf = buffer_init(1024); +} + +static int shut_it_down(int stat) { + osrfHashFree(stateful_session_cache); + buffer_free(stdin_buf); + osrf_system_shutdown(); // clean XMPP disconnect + exit(stat); + return stat; +} + + +// Connect to OpenSRF/XMPP +// Apply settings and command line args. +static void child_init(int argc, char* argv[]) { + + if (argc > 1) { + config_file = argv[1]; + } + + if (!osrf_system_bootstrap_client(config_file, config_ctxt) ) { + fprintf(stderr, "Cannot boostrap OSRF\n"); + shut_it_down(1); + } + + osrf_handle = osrfSystemGetTransportClient(); + osrfAppSessionSetIngress(WEBSOCKET_INGRESS); + + osrf_router = osrfConfigGetValue(NULL, "/router_name"); + osrf_domain = osrfConfigGetValue(NULL, "/domain"); + + stateful_session_cache = osrfNewHash(); + osrfHashSetCallback(stateful_session_cache, release_hash_string); + + client_ip = getenv("REMOTE_ADDR"); + osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip); +} + +// Called by osrfHash when a string is removed. We strdup each +// string before it goes into the hash. +static void release_hash_string(char* key, void* str) { + if (str == NULL) return; + free((char*) str); +} + + +// Relay websocket client messages from STDIN to OpenSRF. Reads one +// message then returns, allowing responses to intermingle with long +// series of requests. +static void read_from_stdin() { + char char_buf[1]; + char c; + + // Read one char at a time so we can stop at the first newline + // and leave any other data on the wire until read_from_stdin() + // is called again. + + while (1) { + int stat = read(fileno(stdin), char_buf, 1); + + if (stat < 0) { + + if (errno == EAGAIN) { + // read interrupted. Return to main loop to resume. + // Returning here will leave any in-progress message in + // the stdin_buf. We return to the main select loop + // to confirm we really have more data to read and to + // perform additional error checking on the stream. + return; + } + + // All other errors reading STDIN are considered fatal. + osrfLogError(OSRF_LOG_MARK, + "WS STDIN read failed with [%s]. Exiting", strerror(errno)); + shut_it_down(1); + return; + } + + if (stat == 0) { // EOF + osrfLogInfo(OSRF_LOG_MARK, "WS exiting on disconnect"); + shut_it_down(0); + return; + } + + c = char_buf[0]; + + if (c == '\n') { // end of current message + + if (stdin_buf->n_used >= MAX_MESSAGE_SIZE) { + osrfLogError(OSRF_LOG_MARK, + "WS message exceeded MAX_MESSAGE_SIZE, discarding"); + rebuild_stdin_buffer(); + return; + } + + if (stdin_buf->n_used > 0) { + relay_stdin_message(stdin_buf->buf); + + if (stdin_buf->n_used >= RESET_MESSAGE_SIZE) { + // Current message is large. Rebuild the buffer + // to free the excess memory. + rebuild_stdin_buffer(); + + } else { + + // Reset the buffer and carry on. + buffer_reset(stdin_buf); + } + } + + return; + + } else { + + if (stdin_buf->n_used >= MAX_MESSAGE_SIZE) { + // Message exceeds max message size. Continue reading + // and discarding data. NOTE: don't reset stdin_buf + // here becase we check n_used again once reading is done. + continue; + } + + // Add the char to our current message buffer + buffer_add_char(stdin_buf, c); + } + } +} + +// Relays a single websocket request to the OpenSRF/XMPP network. +static void relay_stdin_message(const char* msg_string) { + + jsonObject *msg_wrapper = NULL; // free me + const jsonObject *tmp_obj = NULL; + const jsonObject *osrf_msg = NULL; + const char *service = NULL; + const char *thread = NULL; + const char *log_xid = NULL; + char *msg_body = NULL; + char *recipient = NULL; + + // generate a new log trace for this request. it + // may be replaced by a client-provided trace below. + osrfLogMkXid(); + + osrfLogInternal(OSRF_LOG_MARK, "WS received inbound message: %s", msg_string); + + msg_wrapper = jsonParse(msg_string); + + if (msg_wrapper == NULL) { + osrfLogWarning(OSRF_LOG_MARK, "WS Invalid JSON: %s", msg_string); + return; + } + + osrf_msg = jsonObjectGetKeyConst(msg_wrapper, "osrf_msg"); + + if ( (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "service")) ) + service = jsonObjectGetString(tmp_obj); + + if ( (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "thread")) ) + thread = jsonObjectGetString(tmp_obj); + + if ( (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "log_xid")) ) + log_xid = jsonObjectGetString(tmp_obj); + + if (log_xid) { + + // use the caller-provide log trace id + if (strlen(log_xid) > MAX_THREAD_SIZE) { + osrfLogWarning(OSRF_LOG_MARK, "WS log_xid exceeds max length"); + return; + } + + osrfLogForceXid(log_xid); + } + + if (thread) { + + if (strlen(thread) > MAX_THREAD_SIZE) { + osrfLogWarning(OSRF_LOG_MARK, "WS thread exceeds max length"); + return; + } + + // since clients can provide their own threads at session start time, + // the presence of a thread does not guarantee a cached recipient + recipient = (char*) osrfHashGet(stateful_session_cache, thread); + + if (recipient) { + osrfLogDebug(OSRF_LOG_MARK, "WS found cached recipient %s", recipient); + } + } + + if (!recipient) { + + if (service) { + int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1, + "%s@%s/%s", osrf_router, osrf_domain, service); + recipient_buf[size] = '\0'; + recipient = recipient_buf; + + } else { + osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient"); + return; + } + } + + osrfLogDebug(OSRF_LOG_MARK, + "WS relaying message to opensrf thread=%s, recipient=%s", + thread, recipient); + + // 'recipient' will be freed in extract_inbound_messages + // during a DISCONNECT call. Retain a local copy. + recipient = strdup(recipient); + + msg_body = extract_inbound_messages(service, thread, osrf_msg); + + osrfLogInternal(OSRF_LOG_MARK, + "WS relaying inbound message: %s", msg_body); + + transport_message *tmsg = message_init( + msg_body, NULL, thread, recipient, NULL); + + free(recipient); + + message_set_osrf_xid(tmsg, osrfLogGetXid()); + + if (client_send_message(osrf_handle, tmsg) != 0) { + osrfLogError(OSRF_LOG_MARK, "WS failed sending data to OpenSRF, exiting"); + shut_it_down(1); + } + + osrfLogClearXid(); + message_free(tmsg); + jsonObjectFree(msg_wrapper); + free(msg_body); +} + +// Turn the OpenSRF message JSON into a set of osrfMessage's for +// analysis, ingress application, and logging. +static char* extract_inbound_messages( + const char* service, const char* thread, const jsonObject *osrf_msg) { + + int i; + int num_msgs = osrf_msg->size; + osrfMessage* msg; + osrfMessage* msg_list[num_msgs]; + + // here we do an extra json round-trip to get the data + // in a form osrf_message_deserialize can understand + // TODO: consider a version of osrf_message_init which can + // accept a jsonObject* instead of a JSON string. + char *osrf_msg_json = jsonObjectToJSON(osrf_msg); + osrf_message_deserialize(osrf_msg_json, msg_list, num_msgs); + free(osrf_msg_json); + + // should we require the caller to always pass the service? + if (service == NULL) service = ""; + + for (i = 0; i < num_msgs; i++) { + msg = msg_list[i]; + osrfMessageSetIngress(msg, WEBSOCKET_INGRESS); + + switch (msg->m_type) { + + case CONNECT: + break; + + case REQUEST: + log_request(service, msg); + break; + + case DISCONNECT: + osrfHashRemove(stateful_session_cache, thread); + break; + + default: + osrfLogError(OSRF_LOG_MARK, "WS received unexpected message " + "type from WebSocket client: %d", msg->m_type); + break; + } + } + + char* finalMsg = osrfMessageSerializeBatch(msg_list, num_msgs); + + // clean up our messages + for (i = 0; i < num_msgs; i++) + osrfMessageFree(msg_list[i]); + + return finalMsg; +} + +// All REQUESTs are logged as activity. +static void log_request(const char* service, osrfMessage* msg) { + + const jsonObject* params = msg->_params; + growing_buffer* act = buffer_init(128); + char* method = msg->method_name; + const jsonObject* obj = NULL; + int i = 0; + const char* str; + int redactParams = 0; + + buffer_fadd(act, "[%s] [%s] %s %s", client_ip, "", service, method); + + while ( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) { + if (!strncmp(method, str, strlen(str))) { + redactParams = 1; + break; + } + } + + if (redactParams) { + OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**"); + } else { + i = 0; + while ((obj = jsonObjectGetIndex(params, i++))) { + char* str = jsonObjectToJSON(obj); + if (i == 1) + OSRF_BUFFER_ADD(act, " "); + else + OSRF_BUFFER_ADD(act, ", "); + OSRF_BUFFER_ADD(act, str); + free(str); + } + } + + osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf); + buffer_free(act); +} + + + +// Relay response messages from OpenSRF to STDIN +// Relays all available messages +static void read_from_osrf() { + transport_message* tmsg = NULL; + + // Double check the socket connection before continuing. + if (!client_connected(osrf_handle) || + !socket_connected(osrf_handle->session->sock_id)) { + osrfLogWarning(OSRF_LOG_MARK, + "WS: Jabber socket disconnected, exiting"); + shut_it_down(1); + } + + // Once client_recv is called all data waiting on the socket is + // read. This means we can't return to the main select() loop after + // each message, because any subsequent messages will get stuck in + // the opensrf receive queue. Process all available messages. + while ( (tmsg = client_recv(osrf_handle, 0)) ) { + read_one_osrf_message(tmsg); + message_free(tmsg); + } +} + +// Process a single OpenSRF response message and print the reponse +// to STDOUT for delivery to the websocket client. +static void read_one_osrf_message(transport_message* tmsg) { + osrfList *msg_list = NULL; + osrfMessage *one_msg = NULL; + int i; + + osrfLogDebug(OSRF_LOG_MARK, + "WS received opensrf response for thread=%s", tmsg->thread); + + // first we need to perform some maintenance + msg_list = osrfMessageDeserialize(tmsg->body, NULL); + + for (i = 0; i < msg_list->size; i++) { + one_msg = OSRF_LIST_GET_INDEX(msg_list, i); + + osrfLogDebug(OSRF_LOG_MARK, + "WS returned response of type %d", one_msg->m_type); + + /* if our client just successfully connected to an opensrf service, + cache the sender so that future calls on this thread will use + the correct recipient. */ + if (one_msg && one_msg->m_type == STATUS) { + + if (one_msg->status_code == OSRF_STATUS_OK) { + + if (!osrfHashGet(stateful_session_cache, tmsg->thread)) { + + unsigned long ses_size = + osrfHashGetCount(stateful_session_cache); + + if (ses_size < MAX_ACTIVE_STATEFUL_SESSIONS) { + + osrfLogDebug(OSRF_LOG_MARK, "WS caching sender " + "thread=%s, sender=%s; concurrent=%d", + tmsg->thread, tmsg->sender, ses_size); + + char* sender = strdup(tmsg->sender); // free in *Remove + osrfHashSet(stateful_session_cache, sender, tmsg->thread); + + } else { + + osrfLogWarning(OSRF_LOG_MARK, + "WS max concurrent sessions (%d) reached. " + "Current session will not be tracked", + MAX_ACTIVE_STATEFUL_SESSIONS + ); + } + } + + } else { + + // connection timed out; clear the cached recipient + if (one_msg->status_code == OSRF_STATUS_TIMEOUT) { + osrfHashRemove(stateful_session_cache, tmsg->thread); + } + } + } + } + + // osrfMessageDeserialize applies the freeItem handler to the + // newly created osrfList. We only need to free the list and + // the individual osrfMessage's will be freed along with it + osrfListFree(msg_list); + + // Pack the response into a websocket wrapper message. + jsonObject *msg_wrapper = NULL; + char *msg_string = NULL; + msg_wrapper = jsonNewObject(NULL); + + jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread)); + jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid)); + jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body)); + + if (tmsg->is_error) { + // tmsg->sender is the original recipient. they get swapped + // in error replies. + osrfLogError(OSRF_LOG_MARK, + "WS received XMPP error message in response to thread=%s and " + "recipient=%s. Likely the recipient is not accessible/available.", + tmsg->thread, tmsg->sender); + jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1)); + } + + msg_string = jsonObjectToJSONRaw(msg_wrapper); + + // Send the JSON to STDOUT + printf("%s\n", msg_string); + + free(msg_string); + jsonObjectFree(msg_wrapper); +} + + diff --git a/src/websocket-stdio/test-stateful.pl b/src/websocket-stdio/test-stateful.pl new file mode 100755 index 0000000..0487d5e --- /dev/null +++ b/src/websocket-stdio/test-stateful.pl @@ -0,0 +1,148 @@ +#!/usr/bin/perl +# -------------------------------------------------------------------------- +# Copyright (C) 2018 King County Library Service +# Bill Erickson +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# -------------------------------------------------------------------------- +# +# Synopsis: +# +# $ sudo cpan Net::Async::WebSocket::Client; +# $ sudo cpan IO::Async::SSL; +# $ time perl test-stateful.pl wss://localhost:443/osrf-websocket-translator +# +# -------------------------------------------------------------------------- +use strict; +use warnings; +use IO::Async::Loop; +use Net::Async::WebSocket::Client; +use OpenSRF::Utils::JSON; + +my $fork_count = 5; +my $batch_size = 1000; + +# allow the script to run easily on test VMs. +use IO::Socket::SSL; +IO::Socket::SSL::set_ctx_defaults(SSL_verify_mode => 0); + +package StatefulBatch; + +sub new { + my $class = shift; + my $self = { + client => undef, + loop => undef, + thread => undef, + sent_count => 0, + in_connect => 0 + }; + + return bless($self, $class); +} + +sub send_connect { + my $self = shift; + $self->{in_connect} = 1; + + my $thread = $self->{thread} = rand(); # reset on connect + my $msg = <{client}->send_text_frame($msg); +} + +sub send_request { + my $self = shift; + $self->{in_connect} = 0; + + my $thread = $self->{thread}; + my $msg = <{client}->send_text_frame($msg); +} + +sub send_disconnect { + my $self = shift; + + my $thread = $self->{thread}; + my $msg = <{client}->send_text_frame($msg); +} + + +sub on_message { + my ($self, $frame) = @_; + + my $msg = OpenSRF::Utils::JSON->JSON2perl($frame); + my $type = $msg->{osrf_msg}->[0]->{type}; + + if ($self->{in_connect}) { + my $msg = OpenSRF::Utils::JSON->JSON2perl($frame); + if ($type ne 'STATUS') { + die "Received unexpected message type: $type : $frame\n"; + } + $self->send_request; + + } else { + + if ($type ne 'RESULT') { + die "Received unexpected message type: $type : $frame\n"; + } + + # disconnect messages do not return replies + $self->send_disconnect; + + print "[$$] completed ".$self->{sent_count} . " of $batch_size\n"; + + if ($self->{sent_count}++ >= $batch_size) { + $self->{loop}->stop; + return; + } + + $self->send_connect; + } +} + +package main; + +my $url = $ARGV[0] or die "WS URL REQUIRED\n"; + + +for (1..$fork_count) { + + if (fork() == 0) { + my $tester = StatefulBatch->new; + + $tester->{client} = Net::Async::WebSocket::Client->new( + on_text_frame => sub { + my ($client, $frame) = @_; + $tester->on_message($frame); + } + ); + + $tester->{loop} = IO::Async::Loop->new; + $tester->{loop}->add($tester->{client}); + $tester->{client}->connect( + url => $url, on_connected => sub {$tester->send_connect}); + $tester->{loop}->run; + exit(0); + } +} + +# exit after all children have exited +while (wait() > -1) {} + diff --git a/src/websocket-stdio/tester.pl b/src/websocket-stdio/tester.pl new file mode 100755 index 0000000..75db284 --- /dev/null +++ b/src/websocket-stdio/tester.pl @@ -0,0 +1,87 @@ +#!/usr/bin/perl +# -------------------------------------------------------------------------- +# Copyright (C) 2018 King County Library Service +# Bill Erickson +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# -------------------------------------------------------------------------- +# +# Synopsis: +# +# $ sudo cpan Net::Async::WebSocket::Client; +# $ sudo cpan IO::Async::SSL; +# $ time perl tester.pl wss://localhost:443/osrf-websocket-translator +# +# -------------------------------------------------------------------------- +use strict; +use warnings; +use IO::Async::Loop; +use Net::Async::WebSocket::Client; + +# allow the script to run easily on test VMs. +use IO::Socket::SSL; +IO::Socket::SSL::set_ctx_defaults(SSL_verify_mode => 0); + +my $client; +my $loop; +my $send_batches = 1000; +my $batches_sent = 0; +my $send_wanted = 5; # per batch +my $send_count = 0; +my $recv_count = 0; + +sub send_one_msg { + my $thread = rand(); + $send_count++; + + my $osrf_msg = <send_text_frame($osrf_msg); + print "batch=$batches_sent sent=$send_count received=$recv_count\n"; +} + +my $on_message = sub { + my ($self, $frame) = @_; + $recv_count++; + print "batch=$batches_sent sent=$send_count received=$recv_count\n"; + + if ($send_count == $send_wanted && $send_count == $recv_count) { + # once every request in the current batch has received + # a reply, kick off a new batch. + send_next_batch(); + } +}; + +sub send_next_batch { + + if ($batches_sent == $send_batches) { + $loop->stop; + return; + } + + $batches_sent++; + $send_count = 0; + $recv_count = 0; + for (1..$send_wanted) { + send_one_msg(); + } +} + +my $url = $ARGV[0] or die "WS URL REQUIRED\n"; + +$client = Net::Async::WebSocket::Client->new(on_text_frame => $on_message); +$loop = IO::Async::Loop->new; +$loop->add($client); +$client->connect(url => $url, on_connected => sub {send_next_batch()}); +$loop->run; + -- 2.43.2