1 /* -----------------------------------------------------------------------
2 * Copyright 2012 Equinox Software, Inc.
3 * Bill Erickson <berick@esilibrary.com>
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 * -----------------------------------------------------------------------
18 * websocket <-> opensrf gateway. Wrapped opensrf messages are extracted
19 * and relayed to the opensrf network. Responses are pulled from the opensrf
20 * network and passed back to the client. Messages are analyzed to determine
21 * when a connect/disconnect occurs, so that the cache of recipients can be
22 * properly managed. We also activity-log REQUEST messages.
24 * Messages to/from the websocket client take the following form:
26 * "service" : "opensrf.foo", // required
27 * "thread" : "123454321", // AKA thread. required for follow-up requests; max 64 chars.
28 * "log_xid" : "123..32", // optional log trace ID, max 64 chars;
29 * "osrf_msg" : [<osrf_msg>, <osrf_msg>, ...] // required
32 * Each translator operates with three threads. One thread receives messages
33 * from the websocket client, translates, and relays them to the opensrf
34 * network. The second thread collects responses from the opensrf network and
35 * relays them back to the websocket client. The third thread inspects
36 * the idle timeout interval to see if it's time to drop the idle client.
38 * After the initial setup, all thread actions occur within a thread
39 * mutex. The desired effect is a non-threaded application that uses
40 * threads for the sole purpose of having one thread listening for
41 * incoming data, while a second thread listens for responses, and a
42 * third checks the idle timeout. When any thread awakens, it's the
43 * only thread in town until it goes back to sleep (i.e. listening on
44 * its socket for data).
46 * Note that with the opensrf "thread", which allows us to identify the
47 * opensrf session, the caller does not need to provide a recipient
48 * address. The "service" is only required to start a new opensrf
49 * session. After the session is started, all future communication is
50 * based solely on the thread. However, the "service" should be passed
51 * by the caller for all requests to ensure it is properly logged in the
54 * Every inbound and outbound message updates the last_activity_time.
55 * A separate thread wakes periodically to see if the time since the
56 * last_activity_time exceeds the configured idle_timeout_interval. If
57 * so, a disconnect is sent to the client, completing the conversation.
59 * Configuration goes directly into the Apache envvars file.
60 * (e.g. /etc/apache2-websockets/envvars). As of today, it's not
61 * possible to leverage Apache configuration directives directly,
62 * since this is not an Apache module, but a shared library loaded
63 * by an apache module. This includes SetEnv / SetEnvIf.
65 * export OSRF_WEBSOCKET_IDLE_TIMEOUT=300
66 * export OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL=5
67 * export OSRF_WEBSOCKET_CONFIG_FILE=/openils/conf/opensrf_core.xml
68 * export OSRF_WEBSOCKET_CONFIG_CTXT=gateway
76 #include "apr_strings.h"
77 #include "apr_thread_proc.h"
79 #include "websocket_plugin.h"
80 #include "opensrf/log.h"
81 #include "opensrf/osrf_json.h"
82 #include "opensrf/transport_client.h"
83 #include "opensrf/transport_message.h"
84 #include "opensrf/osrf_system.h"
85 #include "opensrf/osrfConfig.h"
// Maximum accepted length of the client-supplied "thread" and "log_xid" values.
87 #define MAX_THREAD_SIZE 64
// Size of the shared recipient_buf used to build a router recipient address.
88 #define RECIP_BUF_SIZE 128
// Ingress label stamped on every inbound opensrf message.
89 #define WEBSOCKET_TRANSLATOR_INGRESS "ws-translator-v1"
92 // default values, replaced during setup (below) as needed.
93 static char* config_file = "/openils/conf/opensrf_core.xml";
94 static char* config_ctxt = "gateway";
// Idle period after which the idle-timeout thread closes the client
// connection; compared against differences of time() values.
95 static time_t idle_timeout_interval = 60;
// Initial sleep time of the idle-timeout thread between checks.
96 static time_t idle_check_interval = 5;
// time() of the most recent connect, inbound message, or relayed response.
97 static time_t last_activity_time = 0;
99 // true if we've received a signal to start graceful shutdown
100 static int shutdown_requested = 0;
101 static void sigusr1_handler(int sig);
// SIGUSR1 handler: sets shutdown_requested, re-installs itself as the
// SIGUSR1 handler, and logs the event. The flag is polled by
// osrf_idle_timeout_thread_main.
// NOTE(review): osrfLogInfo is probably not async-signal-safe, and the flag
// is a plain int rather than volatile sig_atomic_t -- confirm this is OK.
102 static void sigusr1_handler(int sig) {
103 shutdown_requested = 1;
104 signal(SIGUSR1, sigusr1_handler);
105 osrfLogInfo(OSRF_LOG_MARK, "WS received SIGUSR1 - Graceful Shutdown");
// Returns the client IP address string stored on the request's connection.
// Two alternative connection fields are referenced (client_ip, remote_ip);
// presumably a build-time conditional not visible here picks one -- TODO confirm.
108 static const char* get_client_ip(const request_rec* r) {
110 return r->connection->client_ip;
112 return r->connection->remote_ip;
// State shared by the websocket callbacks and the responder / idle-timeout
// threads. A single instance is allocated in child_init and reached through
// the file-scope `trans` pointer.
116 typedef struct _osrfWebsocketTranslator {
118 /** Our handle for communicating with the caller */
119 const WebSocketServer *server;
122 * Standalone, per-process APR pool. Primarily
123 * there for managing thread data, which lasts
124 * the duration of the process.
126 apr_pool_t *main_pool;
129 * Map of thread => drone-xmpp-address. Maintaining this
130 * map internally means the caller never need know about
131 * internal XMPP addresses and the server doesn't have to
132 * verify caller-specified recipient addresses. It's
133 * all managed internally.
135 apr_hash_t *session_cache;
138 * session_pool contains the key/value pairs stored in
139 * the session_cache. The pool is regularly destroyed
140 * and re-created to avoid long-term memory consumption
142 apr_pool_t *session_pool;
145 * Thread responsible for collecting responses on the opensrf
146 * network and relaying them back to the caller
148 apr_thread_t *responder_thread;
151 * Thread responsible for checking inactivity timeout.
152 * If no activitity occurs within the configured interval,
153 * a disconnect is sent to the client and the connection
156 apr_thread_t *idle_timeout_thread;
159 * All message handling code is wrapped in a thread mutex such
160 * that all actions (after the initial setup) are serialized
161 * to minimize the possibility of multi-threading snafus.
163 apr_thread_mutex_t *mutex;
166 * True if a websocket client is currently connected
168 int client_connected;
170 /** OpenSRF router name */
173 /** OpenSRF domain */
176 } osrfWebsocketTranslator;
// The translator instance; allocated from its own pool in child_init.
178 static osrfWebsocketTranslator *trans = NULL;
// OpenSRF transport handle, obtained via osrfSystemGetTransportClient().
179 static transport_client *osrf_handle = NULL;
180 static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
// Removes the cached recipient for `thread` from trans->session_cache, if
// one exists. When that leaves the cache empty, the session pool is
// destroyed and a new one created so memory from past entries is released.
// NOTE(review): the replacement pool is created with a NULL parent, whereas
// on_connect_handler creates the session pool as a child of r->pool --
// confirm the differing lifetimes are intended.
182 static void clear_cached_recipient(const char* thread) {
183 apr_pool_t *pool = NULL;
185 if (apr_hash_get(trans->session_cache, thread, APR_HASH_KEY_STRING)) {
187 osrfLogDebug(OSRF_LOG_MARK, "WS removing cached recipient on disconnect");
189 // remove it from the hash
190 apr_hash_set(trans->session_cache, thread, APR_HASH_KEY_STRING, NULL);
192 if (apr_hash_count(trans->session_cache) == 0) {
193 osrfLogDebug(OSRF_LOG_MARK, "WS re-setting session_pool");
195 // memory accumulates in the session_pool as sessions are cached then
196 // un-cached. Un-caching removes strings from the hash, but not the
197 // pool itself. That only happens when the pool is destroyed. Here
198 // we destroy the session pool to clear any lingering memory, then
199 // re-create it for future caching.
200 apr_pool_destroy(trans->session_pool);
202 if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
203 osrfLogError(OSRF_LOG_MARK, "WS Unable to create session_pool");
// leave no dangling pointer to the destroyed pool
204 trans->session_pool = NULL;
208 trans->session_pool = pool;
// Handles one response received from the opensrf network.
//
// First pass (cache maintenance): deserializes tmsg->body and, for each
// STATUS message, caches tmsg->sender under tmsg->thread on OSRF_STATUS_OK
// (only while a client is connected) and clears the cached recipient on
// OSRF_STATUS_TIMEOUT.
//
// Second pass (relay): if a client is still connected, wraps the raw body
// in a JSON object with "thread", "log_xid" and "osrf_msg" keys (plus
// "transport_error" when tmsg->is_error is set) and sends it to the
// websocket client as a text message.
//
// tmsg: the transport message received from opensrf.
213 void* osrf_responder_thread_main_body(transport_message *tmsg) {
215 osrfList *msg_list = NULL;
216 osrfMessage *one_msg = NULL;
219 osrfLogDebug(OSRF_LOG_MARK,
220 "WS received opensrf response for thread=%s", tmsg->thread);
222 // first we need to perform some maintenance
223 msg_list = osrfMessageDeserialize(tmsg->body, NULL);
225 for (i = 0; i < msg_list->size; i++) {
226 one_msg = OSRF_LIST_GET_INDEX(msg_list, i);
// An empty list slot yields NULL; skip it before the debug log below
// dereferences the message.
if (!one_msg) continue;
228 osrfLogDebug(OSRF_LOG_MARK,
229 "WS returned response of type %d", one_msg->m_type);
231 /* if our client just successfully connected to an opensrf service,
232 cache the sender so that future calls on this thread will use
233 the correct recipient. */
234 if (one_msg && one_msg->m_type == STATUS) {
237 // only cache recipients if the client is still connected
238 if (trans->client_connected &&
239 one_msg->status_code == OSRF_STATUS_OK) {
241 if (!apr_hash_get(trans->session_cache,
242 tmsg->thread, APR_HASH_KEY_STRING)) {
244 osrfLogDebug(OSRF_LOG_MARK,
245 "WS caching sender thread=%s, sender=%s",
246 tmsg->thread, tmsg->sender);
// key and value are copied into session_pool so they outlive tmsg
248 apr_hash_set(trans->session_cache,
249 apr_pstrdup(trans->session_pool, tmsg->thread),
251 apr_pstrdup(trans->session_pool, tmsg->sender));
256 // connection timed out; clear the cached recipient
257 // regardless of whether the client is still connected
258 if (one_msg->status_code == OSRF_STATUS_TIMEOUT)
259 clear_cached_recipient(tmsg->thread);
264 // osrfMessageDeserialize applies the freeItem handler to the
265 // newly created osrfList. We only need to free the list and
266 // the individual osrfMessage's will be freed along with it
267 osrfListFree(msg_list);
269 if (!trans->client_connected) {
271 osrfLogInfo(OSRF_LOG_MARK,
272 "WS discarding response for thread=%s", tmsg->thread);
277 // client is still connected.
278 // relay the response messages to the client
279 jsonObject *msg_wrapper = NULL;
280 char *msg_string = NULL;
282 // build the wrapper object
283 msg_wrapper = jsonNewObject(NULL);
284 jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
285 jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
286 jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
288 if (tmsg->is_error) {
289 osrfLogError(OSRF_LOG_MARK,
290 "WS received jabber error message in response to thread=%s",
292 jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
295 msg_string = jsonObjectToJSONRaw(msg_wrapper);
297 // drop the JSON on the outbound wire
298 trans->server->send(trans->server, MESSAGE_TYPE_TEXT,
299 (unsigned char*) msg_string, strlen(msg_string));
302 jsonObjectFree(msg_wrapper);
306 * Sleep and regularly wake to see if the process has been idle for too
307 * long. If so, send a disconnect to the client.
// Idle-timeout thread entry point.
//
// Repeatedly releases the translator mutex, sleeps, then re-acquires the
// mutex and decides whether the connected client should be dropped:
//   - no client connected: reset the sleep time and sleep again;
//   - cached sessions exist (active conversations): keep going, logging
//     progress when a graceful shutdown has been requested;
//   - no cached sessions and shutdown requested: close the client;
//   - otherwise close the client once the time since last_activity_time
//     reaches idle_timeout_interval.
//
// thread / data: APR thread handle and creation argument; neither is
// referenced in the visible body.
310 void* APR_THREAD_FUNC osrf_idle_timeout_thread_main(
311 apr_thread_t *thread, void *data) {
313 // sleep time defaults to the check interval, but may
314 // be shortened during shutdown.
315 int sleep_time = idle_check_interval;
316 int shutdown_loops = 0;
320 if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
321 osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
325 // note: receiving a signal (e.g. SIGUSR1) will not interrupt
326 // this sleep(), since it's running within its own thread.
327 // During graceful shutdown, we may wait up to
328 // idle_check_interval seconds before initiating shutdown.
331 if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
332 osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
336 // no client is connected. reset sleep time and go back to sleep.
337 if (!trans->client_connected) {
338 sleep_time = idle_check_interval;
342 // do we have any active conversations with the connected client?
343 int active_count = apr_hash_count(trans->session_cache);
347 if (shutdown_requested) {
348 // active conversations means we can't shut down.
349 // shorten the check interval to re-check more often.
// arguments follow the order of the format string:
// conversation count first, attempt count second
351 osrfLogDebug(OSRF_LOG_MARK,
352 "WS: %d active conversation(s) found in shutdown after "
353 "%d attempts. Sleeping...", active_count, shutdown_loops
356 if (shutdown_loops > 30) {
357 // this is clearly a long-running conversation, let's
358 // check less frequently to avoid excessive logging.
365 // active conversations means keep going. There's no point in
366 // checking the idle time (below) if we're mid-conversation
370 // no active conversations
372 if (shutdown_requested) {
373 // there's no need to reset the shutdown vars (loops/requested)
374 // SIGUSR1 is Apache's reload signal, which means this process
375 // will be going away as soon as the client is disconnected.
377 osrfLogInfo(OSRF_LOG_MARK,
378 "WS: no active conversations remain in shutdown; "
379 "closing client connection");
382 // see how long we've been idle. If too long, kick the client
384 time_t now = time(NULL);
385 time_t difference = now - last_activity_time;
// time_t is not guaranteed to be int; cast explicitly so the
// variadic argument matches its conversion specifier
387 osrfLogDebug(OSRF_LOG_MARK,
388 "WS has been idle for %ld seconds", (long) difference);
390 if (difference < idle_timeout_interval) {
391 // Last activity occurred within the idle timeout interval.
395 // idle timeout exceeded
396 osrfLogDebug(OSRF_LOG_MARK,
397 "WS: idle timeout exceeded. now=%ld / last=%ld; "
398 "closing client connection", (long) now, (long) last_activity_time);
402 // send a disconnect to the client, which will come back around
403 // to cause our on_disconnect_handler to run.
404 osrfLogDebug(OSRF_LOG_MARK, "WS: sending close() to client");
405 trans->server->close(trans->server);
407 // client will be going away, reset sleep time
408 sleep_time = idle_check_interval;
411 // should never get here
416 * Responder thread main body.
417 * Collects responses from the opensrf network and relays them to the
// Responder thread entry point: releases the translator mutex, blocks in
// client_recv() for the next opensrf response, then re-acquires the mutex,
// forces the log trace id to the message's osrf_xid, hands the message to
// osrf_responder_thread_main_body and records the activity time.
// NOTE(review): if the unlock below sits inside the loop, the `continue`
// taken on a NULL tmsg reaches it again without the mutex held -- confirm
// against the full loop structure.
420 void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
422 transport_message *tmsg;
425 if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
426 osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
430 // wait for a response
431 tmsg = client_recv(osrf_handle, -1);
433 if (!tmsg) continue; // early exit on interrupt
435 if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
436 osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
440 osrfLogForceXid(tmsg->osrf_xid);
441 osrf_responder_thread_main_body(tmsg);
// a relayed response counts as activity for the idle-timeout check
443 last_activity_time = time(NULL);
452 * Connect to OpenSRF, create the main pool, responder thread
453 * session cache and session pool.
// One-time setup for this process.
//
// If no OpenSRF transport client exists yet, reads the optional
// OSRF_WEBSOCKET_* environment overrides and bootstraps the OpenSRF client.
// Then creates the main pool, the translator instance, the session cache,
// the thread mutex, and the responder and idle-timeout threads, and finally
// installs the SIGUSR1 handler.
//
// server: websocket server handle; stored on the translator and used to
//         obtain the request_rec for Apache logging.
// Callers compare the result against APR_SUCCESS.
455 int child_init(const WebSocketServer *server) {
457 apr_pool_t *pool = NULL;
458 apr_thread_t *thread = NULL;
459 apr_threadattr_t *thread_attr = NULL;
460 apr_thread_mutex_t *mutex = NULL;
461 request_rec *r = server->request(server);
464 // osrf_handle will already be connected if this is not the first request
465 // served by this process.
466 if ( !(osrf_handle = osrfSystemGetTransportClient()) ) {
468 // load config values from the env
469 char* timeout = getenv("OSRF_WEBSOCKET_IDLE_TIMEOUT");
471 if (!atoi(timeout)) {
472 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
473 "WS: invalid OSRF_WEBSOCKET_IDLE_TIMEOUT: %s", timeout);
475 idle_timeout_interval = (time_t) atoi(timeout);
// the value originates from an int (atoi or the built-in default), so the
// cast is lossless and makes the argument match %d
479 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
480 "WS: timeout set to %d", (int) idle_timeout_interval);
482 char* interval = getenv("OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL");
484 if (!atoi(interval)) {
485 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
486 "WS: invalid OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL: %s",
490 idle_check_interval = (time_t) atoi(interval);
494 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
495 "WS: idle check interval set to %d", (int) idle_check_interval);
498 char* cfile = getenv("OSRF_WEBSOCKET_CONFIG_FILE");
500 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
501 "WS: config file set to %s", cfile);
505 char* ctxt = getenv("OSRF_WEBSOCKET_CONFIG_CTXT");
507 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
508 "WS: config context set to %s", ctxt);
512 // connect to opensrf
513 if (!osrfSystemBootstrapClientResc(
514 config_file, config_ctxt, "websocket")) {
516 osrfLogError(OSRF_LOG_MARK,
517 "WS unable to bootstrap OpenSRF client with config %s "
518 "and context %s", config_file, config_ctxt
523 osrfLogSetAppname("osrf_websocket_translator");
524 osrf_handle = osrfSystemGetTransportClient();
527 // create a standalone pool for our translator data
528 if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
529 osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
533 // allocate our static translator instance, zero-filled so that
// fields assigned later (client_connected, session_pool, thread
// handles) start out as 0/NULL instead of indeterminate values
534 trans = (osrfWebsocketTranslator*)
535 apr_pcalloc(pool, sizeof(osrfWebsocketTranslator));
538 osrfLogError(OSRF_LOG_MARK, "WS Unable to create translator");
542 trans->main_pool = pool;
543 trans->server = server;
544 trans->osrf_router = osrfConfigGetValue(NULL, "/router_name");
545 trans->osrf_domain = osrfConfigGetValue(NULL, "/domain");
547 trans->session_cache = apr_hash_make(pool);
549 if (trans->session_cache == NULL) {
550 osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache");
// Create the mutex before starting any thread: both thread entry points
// operate on trans->mutex as soon as they run.
586 if (apr_thread_mutex_create(
587 &mutex, APR_THREAD_MUTEX_UNNESTED,
588 trans->main_pool) != APR_SUCCESS) {
589 osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex");
593 trans->mutex = mutex;
554 // Create the responder thread. Once created,
555 // it runs for the lifetime of this process.
556 if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
557 (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
558 (apr_thread_create(&thread, thread_attr,
559 osrf_responder_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
561 trans->responder_thread = thread;
564 osrfLogError(OSRF_LOG_MARK, "WS unable to create responder thread");
568 // Create the idle timeout thread, which lives for the lifetime
570 thread = NULL; // reset
571 thread_attr = NULL; // reset
572 if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
573 (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
574 (apr_thread_create(&thread, thread_attr,
575 osrf_idle_timeout_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
577 osrfLogDebug(OSRF_LOG_MARK, "WS created idle timeout thread");
578 trans->idle_timeout_thread = thread;
581 osrfLogError(OSRF_LOG_MARK, "WS unable to create idle timeout thread");
595 signal(SIGUSR1, sigusr1_handler);
601 * Create the per-client translator
// Websocket connect callback. Logs the client IP, runs child_init
// (presumably only when the translator has not been set up yet -- the guard
// is not visible here), creates the session pool as a sub-pool of the
// request pool, marks the client as connected and resets the activity clock.
603 void* CALLBACK on_connect_handler(const WebSocketServer *server) {
604 request_rec *r = server->request(server);
606 apr_thread_t *thread = NULL;
607 apr_threadattr_t *thread_attr = NULL;
609 const char* client_ip = get_client_ip(r);
610 osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip);
614 if (child_init(server) != APR_SUCCESS) {
619 // create a standalone pool for the session cache values
620 // this pool will be destroyed and re-created regularly
621 // to clear session memory
622 if (apr_pool_create(&pool, r->pool) != APR_SUCCESS) {
623 osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
627 trans->session_pool = pool;
628 trans->client_connected = 1;
// a fresh connection counts as activity for the idle-timeout check
629 last_activity_time = time(NULL);
636 * for each inbound opensrf message:
637 * 1. Stamp the ingress
638 * 2. REQUEST: log it as activity
639 * 3. DISCONNECT: remove the cached recipient
640 * then re-string-ify for xmpp delivery
// Converts the client's "osrf_msg" JSON array into osrfMessage objects,
// stamps each with the websocket ingress, activity-logs requests (with
// parameters redacted for methods whose name starts with an entry of
// log_protect_arr), clears the cached recipient where the message type calls
// for it, and re-serializes the batch into a single string.
//
// r:         request record, used for the client IP in the activity log.
// service:   service name for the activity log; NULL is treated as "".
// thread:    opensrf thread whose cached recipient may be cleared.
// recipient: recipient address chosen by the caller.
// osrf_msg:  JSON array of opensrf messages from the client.
// Presumably returns the serialized batch (finalMsg) for the caller to
// free -- the return statement is not visible here; TODO confirm.
643 static char* extract_inbound_messages(
644 const request_rec *r,
647 const char* recipient,
648 const jsonObject *osrf_msg) {
651 int num_msgs = osrf_msg->size;
653 osrfMessage* msg_list[num_msgs];
655 // here we do an extra json round-trip to get the data
656 // in a form osrf_message_deserialize can understand
657 // TODO: consider a version of osrf_message_init which can
658 // accept a jsonObject* instead of a JSON string.
659 char *osrf_msg_json = jsonObjectToJSON(osrf_msg);
660 osrf_message_deserialize(osrf_msg_json, msg_list, num_msgs);
663 // should we require the caller to always pass the service?
664 if (service == NULL) service = "";
666 for(i = 0; i < num_msgs; i++) {
668 osrfMessageSetIngress(msg, WEBSOCKET_TRANSLATOR_INGRESS);
670 switch(msg->m_type) {
673 const jsonObject* params = msg->_params;
674 growing_buffer* act = buffer_init(128);
675 char* method = msg->method_name;
676 buffer_fadd(act, "[%s] [%s] %s %s",
677 get_client_ip(r), "", service, method);
679 const jsonObject* obj = NULL;
682 int redactParams = 0;
// prefix match: a protected entry covers every method name that starts with it
683 while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
684 if(!strncmp(method, str, strlen(str))) {
690 OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
693 while((obj = jsonObjectGetIndex(params, i++))) {
694 char* str = jsonObjectToJSON(obj);
696 OSRF_BUFFER_ADD(act, " ");
698 OSRF_BUFFER_ADD(act, ", ");
699 OSRF_BUFFER_ADD(act, str);
703 osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
709 clear_cached_recipient(thread);
714 char* finalMsg = osrfMessageSerializeBatch(msg_list, num_msgs);
716 // clean up our messages
717 for(i = 0; i < num_msgs; i++)
718 osrfMessageFree(msg_list[i]);
724 * Parse opensrf request and relay the request to the opensrf network.
// Parses one inbound websocket frame and relays it to the opensrf network.
//
// The frame is copied into a NUL-terminated buffer and parsed as JSON; the
// "service", "thread", "log_xid" and "osrf_msg" keys are extracted. The
// recipient is the address cached for the thread when one exists;
// otherwise it is built as router@domain/service. The messages are then
// re-serialized via extract_inbound_messages and sent over the opensrf
// transport, and the activity clock is updated.
//
// data:        per-connection plugin data (not referenced in the visible body).
// server:      websocket server handle, used to obtain the request_rec.
// type:        websocket message type (not referenced in the visible body).
// buffer:      raw frame payload; need not be NUL-terminated.
// buffer_size: payload length in bytes; an empty payload is accepted as OK.
// Returns OK, or HTTP_BAD_REQUEST for invalid JSON, an over-long
// thread/log_xid/recipient, or when no recipient can be determined.
726 static size_t on_message_handler_body(void *data,
727 const WebSocketServer *server, const int type,
728 unsigned char *buffer, const size_t buffer_size) {
730 request_rec *r = server->request(server);
732 jsonObject *msg_wrapper = NULL; // free me
733 const jsonObject *tmp_obj = NULL;
734 const jsonObject *osrf_msg = NULL;
735 const char *service = NULL;
736 const char *thread = NULL;
737 const char *log_xid = NULL;
738 char *msg_body = NULL;
739 char *recipient = NULL;
742 if (buffer_size <= 0) return OK;
744 // generate a new log trace for this request. it
745 // may be replaced by a client-provided trace below.
// cast so the size_t argument matches the %d conversion
748 osrfLogDebug(OSRF_LOG_MARK, "WS received message size=%d", (int) buffer_size);
750 // buffer may not be \0-terminated, which jsonParse requires
751 char buf[buffer_size + 1];
752 memcpy(buf, buffer, buffer_size);
753 buf[buffer_size] = '\0';
755 osrfLogInternal(OSRF_LOG_MARK, "WS received inbound message: %s", buf);
757 msg_wrapper = jsonParse(buf);
759 if (msg_wrapper == NULL) {
760 osrfLogWarning(OSRF_LOG_MARK, "WS Invalid JSON: %s", buf);
761 return HTTP_BAD_REQUEST;
764 osrf_msg = jsonObjectGetKeyConst(msg_wrapper, "osrf_msg");
766 if ((tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "service")))
767 service = jsonObjectGetString(tmp_obj);
769 if ((tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "thread")))
770 thread = jsonObjectGetString(tmp_obj);
772 if ((tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "log_xid")))
773 log_xid = jsonObjectGetString(tmp_obj);
777 // use the caller-provided log trace id
778 if (strlen(log_xid) > MAX_THREAD_SIZE) {
779 osrfLogWarning(OSRF_LOG_MARK, "WS log_xid exceeds max length");
// release the parsed wrapper on every early exit after a successful parse
jsonObjectFree(msg_wrapper);
780 return HTTP_BAD_REQUEST;
783 osrfLogForceXid(log_xid);
788 if (strlen(thread) > MAX_THREAD_SIZE) {
789 osrfLogWarning(OSRF_LOG_MARK, "WS thread exceeds max length");
jsonObjectFree(msg_wrapper);
790 return HTTP_BAD_REQUEST;
793 // since clients can provide their own threads at session start time,
794 // the presence of a thread does not guarantee a cached recipient
795 recipient = (char*) apr_hash_get(
796 trans->session_cache, thread, APR_HASH_KEY_STRING);
799 osrfLogDebug(OSRF_LOG_MARK, "WS found cached recipient %s", recipient);
// snprintf always NUL-terminates within the given size and returns the
// length the full string would have had; a result >= RECIP_BUF_SIZE means
// truncation, so that value must never be used as an index into the buffer.
806 int size = snprintf(recipient_buf, RECIP_BUF_SIZE,
807 "%s@%s/%s", trans->osrf_router, trans->osrf_domain, service);
if (size < 0 || size >= RECIP_BUF_SIZE) {
osrfLogWarning(OSRF_LOG_MARK, "WS recipient address exceeds max length");
jsonObjectFree(msg_wrapper);
return HTTP_BAD_REQUEST;
}
809 recipient = recipient_buf;
812 osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient");
jsonObjectFree(msg_wrapper);
813 return HTTP_BAD_REQUEST;
817 osrfLogDebug(OSRF_LOG_MARK,
818 "WS relaying message to opensrf thread=%s, recipient=%s",
821 msg_body = extract_inbound_messages(
822 r, service, thread, recipient, osrf_msg);
824 osrfLogInternal(OSRF_LOG_MARK,
825 "WS relaying inbound message: %s", msg_body);
827 transport_message *tmsg = message_init(
828 msg_body, NULL, thread, recipient, NULL);
830 message_set_osrf_xid(tmsg, osrfLogGetXid());
831 client_send_message(osrf_handle, tmsg);
836 jsonObjectFree(msg_wrapper);
// an inbound message counts as activity for the idle-timeout check
839 last_activity_time = time(NULL);
// Websocket message callback: runs on_message_handler_body while holding
// the translator mutex, so message handling is serialized with the
// responder and idle-timeout threads.
//
// Parameters are passed through unchanged to on_message_handler_body.
// Returns 1 if the mutex cannot be locked; otherwise the status produced by
// on_message_handler_body is captured in `stat`.
844 static size_t CALLBACK on_message_handler(void *data,
845 const WebSocketServer *server, const int type,
846 unsigned char *buffer, const size_t buffer_size) {
848 if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
849 osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
850 return 1; // TODO: map to apr_status_t value?
853 apr_status_t stat = on_message_handler_body(data, server, type, buffer, buffer_size);
855 if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
// report the operation that actually failed, matching the wording the
// worker threads use for unlock failures
856 osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
865 * Clear the session cache, release the session pool
// Websocket disconnect callback. Marks the client as disconnected, calls
// apr_thread_exit for the idle-timeout thread, empties the session cache,
// destroys the session pool and logs the disconnect.
// Note: the local `trans` (taken from `data`) shadows the file-scope `trans`.
// NOTE(review): apr_thread_exit is documented as stopping the *current*
// thread; calling it here on another thread's handle may end the calling
// thread instead -- confirm against the APR docs.
// NOTE(review): the comment below says the timeout thread is recreated per
// connection, but the only visible creation site is child_init -- confirm.
867 void CALLBACK on_disconnect_handler(
868 void *data, const WebSocketServer *server) {
870 osrfWebsocketTranslator *trans = (osrfWebsocketTranslator*) data;
871 trans->client_connected = 0;
873 // timeout thread is recreated w/ each new connection
874 apr_thread_exit(trans->idle_timeout_thread, APR_SUCCESS);
875 trans->idle_timeout_thread = NULL;
877 // ensure no errant session data is sticking around
878 apr_hash_clear(trans->session_cache);
880 // strictly speaking, this pool will get destroyed when
881 // r->pool is destroyed, but it doesn't hurt to explicitly
882 // destroy it ourselves.
883 apr_pool_destroy(trans->session_pool);
884 trans->session_pool = NULL;
886 request_rec *r = server->request(server);
888 osrfLogInfo(OSRF_LOG_MARK, "WS disconnect from %s", get_client_ip(r));
892 * Be nice and clean up our mess
// Plugin teardown callback: calls apr_thread_exit for the responder thread,
// destroys the mutex, then destroys the session pool (if set) and the main
// pool. The translator itself is allocated from the main pool, so `trans`
// must not be used after this runs.
// NOTE(review): `trans` is dereferenced unconditionally; presumably this is
// only reached after child_init succeeded -- confirm.
// NOTE(review): apr_thread_exit is documented as stopping the *current*
// thread; see the APR docs before relying on it to stop the responder.
894 void CALLBACK on_destroy_handler(WebSocketPlugin *plugin) {
896 apr_thread_exit(trans->responder_thread, APR_SUCCESS);
897 apr_thread_mutex_destroy(trans->mutex);
898 if (trans->session_pool)
899 apr_pool_destroy(trans->session_pool);
900 apr_pool_destroy(trans->main_pool);
// Plugin descriptor returned by osrf_websocket_init: structure size, plugin
// API version, then callback pointers. Only the final entry
// (on_disconnect_handler) is visible here; the other callbacks are
// presumably listed on lines not shown -- TODO confirm.
906 static WebSocketPlugin osrf_websocket_plugin = {
907 sizeof(WebSocketPlugin),
908 WEBSOCKET_PLUGIN_VERSION_0,
912 on_disconnect_handler
// Exported entry point: returns the address of this library's static
// plugin descriptor.
915 extern EXPORT WebSocketPlugin * CALLBACK osrf_websocket_init() {
916 return &osrf_websocket_plugin;