1 /* -----------------------------------------------------------------------
2 * Copyright 2012 Equinox Software, Inc.
3 * Bill Erickson <berick@esilibrary.com>
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 * -----------------------------------------------------------------------
18 * websocket <-> opensrf gateway. Wrapped opensrf messages are extracted
19 * and relayed to the opensrf network. Responses are pulled from the opensrf
20 * network and passed back to the client. Messages are analyzed to determine
21 * when a connect/disconnect occurs, so that the cache of recipients can be
22 * properly managed. We also activity-log REQUEST messages.
24 * Messages to/from the websocket client take the following form:
26 * "service" : "opensrf.foo", // required
27 * "thread" : "123454321", // opensrf session thread. required for follow-up requests; max 64 chars.
28 * "log_xid" : "123..32", // optional log trace ID, max 64 chars;
29 * "osrf_msg" : [<osrf_msg>, <osrf_msg>, ...] // required
32 * Each translator operates with two threads. One thread receives messages
33 * from the websocket client, translates, and relays them to the opensrf
34 * network. The second thread collects responses from the opensrf network and
35 * relays them back to the websocket client.
37 * After the initial setup, all thread actions occur within a thread mutex.
38 * The desired effect is a non-threaded application that uses threads for
39 * the sole purpose of having one thread listening for incoming data, while
40 * a second thread listens for responses. When either thread awakens, it's
41 * the only thread in town until it goes back to sleep (i.e. listening on
42 * its socket for data).
44 * Note that with a "thread", which allows us to identify the opensrf session,
45 * the caller does not need to provide a recipient address. The "service" is
46 * only required to start a new opensrf session. After the session is
47 * started, all future communication is based solely on the thread. However,
48 * the "service" should be passed by the caller for all requests to ensure it
49 * is properly logged in the activity log.
54 * short-timeout mode for brick detachment where inactivity timeout drops way
55 * down for graceful disconnects.
59 #include "apr_strings.h"
60 #include "apr_thread_proc.h"
62 #include "websocket_plugin.h"
63 #include "opensrf/log.h"
64 #include "opensrf/osrf_json.h"
65 #include "opensrf/transport_client.h"
66 #include "opensrf/transport_message.h"
67 #include "opensrf/osrf_system.h"
68 #include "opensrf/osrfConfig.h"
// Upper bound, in characters, applied to the client-supplied thread and
// log_xid strings before a message is relayed.
70 #define MAX_THREAD_SIZE 64
// Capacity of the static recipient_buf used to build a router address.
71 #define RECIP_BUF_SIZE 128
// Ingress label stamped on every inbound message via osrfMessageSetIngress.
72 #define WEBSOCKET_TRANSLATOR_INGRESS "ws-translator-v1"
// State shared by the websocket callbacks and the responder thread.
// A single instance is allocated from main_pool in child_init and kept in
// the file-scope pointer named trans.
74 typedef struct _osrfWebsocketTranslator {
76 /** Our handle for communicating with the caller */
77 const WebSocketServer *server;
80 * Standalone, per-process APR pool. Primarily
81 * there for managing thread data, which lasts
82 * the duration of the process.
84 apr_pool_t *main_pool;
87 * Map of thread => drone-xmpp-address. Maintaining this
88 * map internally means the caller never need know about
89 * internal XMPP addresses and the server doesn't have to
90 * verify caller-specified recipient addresses. It's
91 * all managed internally.
93 apr_hash_t *session_cache;
96 * session_pool contains the key/value pairs stored in
97 * the session_cache. The pool is regularly destroyed
98 * and re-created to avoid long-term memory consumption
100 apr_pool_t *session_pool;
103 * Thread responsible for collecting responses on the opensrf
104 * network and relaying them back to the caller
106 apr_thread_t *responder_thread;
109 * All message handling code is wrapped in a thread mutex such
110 * that all actions (after the initial setup) are serialized
111 * to minimize the possibility of multi-threading snafus.
113 apr_thread_mutex_t *mutex;
116 * True if a websocket client is currently connected
118 int client_connected;
// NOTE(review): the member declarations that belong under the next two
// comments are not visible here. child_init assigns trans->osrf_router and
// trans->osrf_domain from osrfConfigGetValue, so both members must exist;
// confirm their declared types.
120 /** OpenSRF router name */
123 /** OpenSRF domain */
126 } osrfWebsocketTranslator;
// The one translator instance for this process; allocated in child_init.
128 static osrfWebsocketTranslator *trans = NULL;
// OpenSRF transport handle used for both client_recv and client_send_message.
129 static transport_client *osrf_handle = NULL;
// Scratch space for a router address built by on_message_handler_body.
// Being static, its content is overwritten by each message that needs it.
130 static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
// Forget the cached recipient address for one opensrf thread.
//
// thread: key into trans->session_cache.
//
// If the key is present it is removed. When that removal empties the cache,
// the session pool is destroyed and re-created so that strings duplicated
// into it by earlier sessions are finally released. Callers in this file
// invoke it for a status timeout and for an inbound DISCONNECT, although the
// debug message only mentions disconnect.
132 static void clear_cached_recipient(const char* thread) {
133 apr_pool_t *pool = NULL;
135 if (apr_hash_get(trans->session_cache, thread, APR_HASH_KEY_STRING)) {
137 osrfLogDebug(OSRF_LOG_MARK, "WS removing cached recipient on disconnect");
139 // remove it from the hash
// Setting a NULL value is how an entry is deleted from an apr_hash_t.
140 apr_hash_set(trans->session_cache, thread, APR_HASH_KEY_STRING, NULL);
142 if (apr_hash_count(trans->session_cache) == 0) {
143 osrfLogDebug(OSRF_LOG_MARK, "WS re-setting session_pool");
145 // memory accumulates in the session_pool as sessions are cached then
146 // un-cached. Un-caching removes strings from the hash, but not the
147 // pool itself. That only happens when the pool is destroyed. Here
148 // we destroy the session pool to clear any lingering memory, then
149 // re-create it for future caching.
150 apr_pool_destroy(trans->session_pool);
// NOTE(review): the replacement pool is created with a NULL parent, whereas
// on_connect_handler creates session_pool as a child of the request pool.
// Confirm the different lifetime is intended.
152 if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
153 osrfLogError(OSRF_LOG_MARK, "WS Unable to create session_pool");
// NOTE(review): after this assignment session_pool is NULL, yet the
// responder passes session_pool to apr_pstrdup and on_disconnect_handler
// passes it to apr_pool_destroy. Verify those paths tolerate NULL.
154 trans->session_pool = NULL;
158 trans->session_pool = pool;
// Handle one transport message received from the opensrf network.
//
// tmsg: the received message; its thread, sender, body, osrf_xid and
//       is_error members are read here.
//
// Step 1 (maintenance): the body is deserialized and each STATUS message is
// inspected. An OK status seen while a client is connected caches
// thread => sender; a TIMEOUT status drops the cached entry.
// Step 2 (relay): if a client is still connected, the raw body is wrapped in
// a JSON object with thread, log_xid and osrf_msg keys and sent as a text
// frame. Otherwise the response is logged as discarded.
164 void* osrf_responder_thread_main_body(transport_message *tmsg) {
166 osrfList *msg_list = NULL;
167 osrfMessage *one_msg = NULL;
170 osrfLogDebug(OSRF_LOG_MARK,
171 "WS received opensrf response for thread=%s, xid=%s",
172 tmsg->thread, tmsg->osrf_xid);
174 // first we need to perform some maintenance
// NOTE(review): msg_list is dereferenced by the loop condition without a
// NULL check; confirm osrfMessageDeserialize cannot return NULL for a
// malformed body.
175 msg_list = osrfMessageDeserialize(tmsg->body, NULL);
177 for (i = 0; i < msg_list->size; i++) {
178 one_msg = OSRF_LIST_GET_INDEX(msg_list, i);
// NOTE(review): one_msg->m_type is read by this log call before the
// NULL test on one_msg a few lines below, so that test cannot protect it.
180 osrfLogDebug(OSRF_LOG_MARK,
181 "WS returned response of type %d", one_msg->m_type);
183 /* if our client just successfully connected to an opensrf service,
184 cache the sender so that future calls on this thread will use
185 the correct recipient. */
186 if (one_msg && one_msg->m_type == STATUS) {
189 // only cache recipients if the client is still connected
190 if (trans->client_connected &&
191 one_msg->status_code == OSRF_STATUS_OK) {
// Only the first OK status for a thread creates a cache entry.
193 if (!apr_hash_get(trans->session_cache,
194 tmsg->thread, APR_HASH_KEY_STRING)) {
196 osrfLogDebug(OSRF_LOG_MARK,
197 "WS caching sender thread=%s, sender=%s",
198 tmsg->thread, tmsg->sender);
// Key and value are copied into session_pool so they remain valid after
// tmsg is released by the caller.
200 apr_hash_set(trans->session_cache,
201 apr_pstrdup(trans->session_pool, tmsg->thread),
203 apr_pstrdup(trans->session_pool, tmsg->sender));
208 // connection timed out; clear the cached recipient
209 // regardless of whether the client is still connected
210 if (one_msg->status_code == OSRF_STATUS_TIMEOUT)
211 clear_cached_recipient(tmsg->thread);
216 // maintenance is done
// Installing freeItem makes osrfListFree release each contained message.
217 msg_list->freeItem = osrfMessageFree;
218 osrfListFree(msg_list);
220 if (!trans->client_connected) {
222 osrfLogDebug(OSRF_LOG_MARK,
223 "WS discarding response for thread=%s, xid=%s",
224 tmsg->thread, tmsg->osrf_xid);
230 // client is still connected.
231 // relay the response messages to the client
232 jsonObject *msg_wrapper = NULL;
233 char *msg_string = NULL;
235 // build the wrapper object
236 msg_wrapper = jsonNewObject(NULL);
237 jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
238 jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
// The body is re-parsed so it is embedded as JSON structure rather than as
// a quoted string.
239 jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
241 if (tmsg->is_error) {
242 osrfLogError(OSRF_LOG_MARK,
243 "WS received jabber error message in response to thread=%s and xid=%s",
244 tmsg->thread, tmsg->osrf_xid);
245 jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
248 msg_string = jsonObjectToJSONRaw(msg_wrapper);
250 // drop the JSON on the outbound wire
251 trans->server->send(trans->server, MESSAGE_TYPE_TEXT,
252 (unsigned char*) msg_string, strlen(msg_string));
// NOTE(review): no release of msg_string is visible between the send above
// and the jsonObjectFree below; confirm the serialized string is freed.
255 jsonObjectFree(msg_wrapper);
260 * Responder thread main body.
261 * Collects responses from the opensrf network and relays them to the
// Entry point of the responder thread started by child_init.
//
// thread: APR handle for this thread. data: the translator pointer handed
// to apr_thread_create; the visible body uses the file-scope trans instead.
//
// The mutex is released before blocking in client_recv and taken again
// before the received message is processed, so message handling never
// overlaps with the inbound handler.
264 void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
266 transport_message *tmsg;
// NOTE(review): child_init creates this thread before it creates and stores
// trans->mutex, so the first unlock here may see a mutex that does not exist
// yet. Confirm the startup ordering.
269 if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
270 osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
274 // wait for a response
// A timeout of -1 is passed; presumably this blocks until data arrives.
275 tmsg = client_recv(osrf_handle, -1);
// NOTE(review): if this continue returns to the unlock above, the mutex is
// unlocked again without having been locked in between. Verify the
// enclosing loop, which is not visible here.
277 if (!tmsg) continue; // early exit on interrupt
279 if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
280 osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
284 osrf_responder_thread_main_body(tmsg);
294 * Connect to OpenSRF, create the main pool, responder thread
295 * session cache and session pool.
// One-time setup of the translator for this process.
//
// server: websocket server handle, stored in trans->server.
//
// Connects to OpenSRF when no transport client exists yet, creates the main
// pool, allocates the translator from it, reads the router name and domain
// from the OpenSRF config, creates the session cache, starts the responder
// thread and creates the mutex. on_connect_handler compares the return
// value against APR_SUCCESS; the return statements are not visible here.
297 int child_init(const WebSocketServer *server) {
299 apr_pool_t *pool = NULL;
300 apr_thread_t *thread = NULL;
301 apr_threadattr_t *thread_attr = NULL;
302 apr_thread_mutex_t *mutex = NULL;
// NOTE(review): r is not used by any line visible in this function.
303 request_rec *r = server->request(server);
305 osrfLogDebug(OSRF_LOG_MARK, "WS child_init");
307 // osrf_handle will already be connected if this is not the first request
308 // served by this process.
309 if ( !(osrf_handle = osrfSystemGetTransportClient()) ) {
// Config file path and context are hard-coded (see the TODO below).
310 char* config_file = "/openils/conf/opensrf_core.xml";
311 char* config_ctx = "gateway"; //TODO config
312 if (!osrfSystemBootstrapClientResc(config_file, config_ctx, "websocket")) {
313 osrfLogError(OSRF_LOG_MARK,
314 "WS unable to bootstrap OpenSRF client with config %s", config_file);
318 osrf_handle = osrfSystemGetTransportClient();
321 // create a standalone pool for our translator data
322 if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
323 osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
328 // allocate our static translator instance
// NOTE(review): apr_palloc does not zero the memory it returns, so members
// not assigned below (session_pool, client_connected, and mutex until the
// end of this function) hold indeterminate values. apr_pcalloc would zero.
329 trans = (osrfWebsocketTranslator*)
330 apr_palloc(pool, sizeof(osrfWebsocketTranslator));
333 osrfLogError(OSRF_LOG_MARK, "WS Unable to create translator");
337 trans->main_pool = pool;
338 trans->server = server;
339 trans->osrf_router = osrfConfigGetValue(NULL, "/router_name");
340 trans->osrf_domain = osrfConfigGetValue(NULL, "/domain");
342 trans->session_cache = apr_hash_make(pool);
344 if (trans->session_cache == NULL) {
345 osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache");
349 // Create the responder thread. Once created,
350 // it runs for the lifetime of this process.
// The three calls are chained with && so a failure in any one of them
// skips the remaining calls.
// NOTE(review): the thread starts running here, before the mutex is created
// and stored at the end of this function, and its first action is to unlock
// trans->mutex. Creating the mutex first would close that window.
351 if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
352 (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
353 (apr_thread_create(&thread, thread_attr,
354 osrf_responder_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
356 trans->responder_thread = thread;
359 osrfLogError(OSRF_LOG_MARK, "WS unable to create responder thread");
363 if (apr_thread_mutex_create(
364 &mutex, APR_THREAD_MUTEX_UNNESTED,
365 trans->main_pool) != APR_SUCCESS) {
366 osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex");
370 trans->mutex = mutex;
376 * Create the per-client translator
// Websocket connect callback.
//
// server: websocket server handle for the new connection.
//
// Logs the remote address, runs child_init, creates a fresh session pool as
// a child of the request pool and marks the client as connected. The
// returned pointer is not visible here; on_disconnect_handler casts its data
// argument to the translator type, so presumably trans is returned (verify).
378 void* CALLBACK on_connect_handler(const WebSocketServer *server) {
379 request_rec *r = server->request(server);
382 osrfLogDebug(OSRF_LOG_MARK,
383 "WS connect from %s", r->connection->remote_ip);
384 //"WS connect from %s", r->connection->client_ip); // apache 2.4
// NOTE(review): any guard that limits child_init to the first connection in
// this process is not visible here; confirm it is not re-run per connection.
388 if (child_init(server) != APR_SUCCESS) {
393 // create a standalone pool for the session cache values
394 // this pool will be destroyed and re-created regularly
395 // to clear session memory
// The declaration of pool is not visible in this function.
396 if (apr_pool_create(&pool, r->pool) != APR_SUCCESS) {
397 osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
401 trans->session_pool = pool;
402 trans->client_connected = 1;
408 * for each inbound opensrf message:
409 * 1. Stamp the ingress
410 * 2. REQUEST: log it as activity
411 * 3. DISCONNECT: remove the cached recipient
412 * then re-string-ify for xmpp delivery
// Prepare the inbound opensrf messages of one websocket frame for delivery.
//
// r:         request record; supplies the remote address for activity logs.
// recipient: resolved recipient address; not used by any line visible here.
// osrf_msg:  JSON array of opensrf messages taken from the wrapper.
// The service and thread parameters used below are declared on lines that
// are not visible here.
//
// Each message is stamped with the translator ingress. The visible code
// also builds an activity-log entry (with parameters redacted for protected
// methods) and clears the cached recipient; the case labels that select
// these actions are not visible. The batch is then re-serialized;
// presumably that string is what gets returned (the return is not visible).
415 static char* extract_inbound_messages(
416 const request_rec *r,
419 const char* recipient,
420 const jsonObject *osrf_msg) {
// NOTE(review): osrf_msg is dereferenced without a NULL check, and the
// caller obtains it from jsonObjectGetKeyConst with no check visible there.
423 int num_msgs = osrf_msg->size;
// NOTE(review): variable-length array sized by client input; a very large
// batch could exhaust the stack.
425 osrfMessage* msg_list[num_msgs];
427 // here we do an extra json round-trip to get the data
428 // in a form osrf_message_deserialize can understand
429 char *osrf_msg_json = jsonObjectToJSON(osrf_msg);
430 osrf_message_deserialize(osrf_msg_json, msg_list, num_msgs);
433 // should we require the caller to always pass the service?
434 if (service == NULL) service = "";
436 for(i = 0; i < num_msgs; i++) {
438 osrfMessageSetIngress(msg, WEBSOCKET_TRANSLATOR_INGRESS);
440 switch(msg->m_type) {
// Activity-log entry: remote address, an empty placeholder field, service
// and method name, followed by the parameters.
443 const jsonObject* params = msg->_params;
444 growing_buffer* act = buffer_init(128);
445 char* method = msg->method_name;
446 buffer_fadd(act, "[%s] [%s] %s %s",
447 r->connection->remote_ip, "", service, method);
449 const jsonObject* obj = NULL;
452 int redactParams = 0;
// A method is protected when a log_protect_arr entry is a prefix of its
// name (strncmp limited to the entry length). log_protect_arr is not
// declared in the visible part of this file.
// NOTE(review): confirm the i used in the two inner loops is a separate
// counter from the index of the enclosing message loop, and that method
// cannot be NULL when it reaches strncmp.
453 while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
454 if(!strncmp(method, str, strlen(str))) {
460 OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
// Append each parameter as JSON text, with a separator before each one.
463 while((obj = jsonObjectGetIndex(params, i++))) {
464 char* str = jsonObjectToJSON(obj);
466 OSRF_BUFFER_ADD(act, " ");
468 OSRF_BUFFER_ADD(act, ", ");
469 OSRF_BUFFER_ADD(act, str);
473 osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
479 clear_cached_recipient(thread);
// NOTE(review): msg is freed here, and every element of msg_list is freed
// again after serialization below. Confirm the branch holding this call
// leaves the function before that loop; otherwise this is a double free.
483 osrfMessageFree(msg);
486 char* finalMsg = osrfMessageSerializeBatch(msg_list, num_msgs);
488 // clean up our messages
489 for(i = 0; i < num_msgs; i++)
490 osrfMessageFree(msg_list[i]);
496 * Parse opensrf request and relay the request to the opensrf network.
// Parse one websocket frame and relay its opensrf messages.
//
// data:        per-connection value from the connect callback (unused in the
//              visible lines).
// server:      websocket server handle; used to reach the request record.
// type:        websocket message type (unused in the visible lines).
// buffer:      frame payload; not guaranteed to be NUL-terminated.
// buffer_size: payload length in bytes.
//
// Returns OK for an empty frame and HTTP_BAD_REQUEST when the JSON is
// invalid, when thread or log_xid exceed MAX_THREAD_SIZE, or when no
// recipient can be determined. The final return is not visible here.
498 static size_t on_message_handler_body(void *data,
499 const WebSocketServer *server, const int type,
500 unsigned char *buffer, const size_t buffer_size) {
502 request_rec *r = server->request(server);
504 jsonObject *msg_wrapper = NULL; // free me
505 const jsonObject *tmp_obj = NULL;
506 const jsonObject *osrf_msg = NULL;
507 const char *service = NULL;
508 const char *thread = NULL;
509 const char *log_xid = NULL;
510 char *msg_body = NULL;
511 char *recipient = NULL;
// buffer_size is a size_t, so this test is only ever true for zero.
514 if (buffer_size <= 0) return OK;
// NOTE(review): the %d conversion is given a size_t argument; %zu matches.
516 osrfLogDebug(OSRF_LOG_MARK, "WS received message size=%d", buffer_size);
518 // buffer may not be \0-terminated, which jsonParse requires
// NOTE(review): variable-length array sized by the client frame; a large
// frame could exhaust the stack. Heap or pool allocation would bound this.
519 char buf[buffer_size + 1];
520 memcpy(buf, buffer, buffer_size);
521 buf[buffer_size] = '\0';
523 msg_wrapper = jsonParse(buf);
525 if (msg_wrapper == NULL) {
526 osrfLogWarning(OSRF_LOG_MARK, "WS Invalid JSON: %s", buf);
527 return HTTP_BAD_REQUEST;
// NOTE(review): osrf_msg is not checked for NULL in the visible lines, yet
// extract_inbound_messages dereferences it immediately.
530 osrf_msg = jsonObjectGetKeyConst(msg_wrapper, "osrf_msg");
// The assignments inside the following conditions are intentional: each
// optional key is fetched and tested in one step.
532 if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "service"))
533 service = jsonObjectGetString(tmp_obj);
535 if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "thread"))
536 thread = jsonObjectGetString(tmp_obj);
538 if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "log_xid"))
539 log_xid = jsonObjectGetString(tmp_obj);
543 // use the caller-provided log trace id
// NOTE(review): this early return and the ones further down leave without a
// visible jsonObjectFree(msg_wrapper), which looks like a leak on every
// rejected message. Confirm against the full function.
544 if (strlen(log_xid) > MAX_THREAD_SIZE) {
545 osrfLogWarning(OSRF_LOG_MARK, "WS log_xid exceeds max length");
546 return HTTP_BAD_REQUEST;
549 // TODO: make this work with non-client and make this call accept
550 // const char*'s. casting to (char*) for now to silence warnings.
551 osrfLogSetXid((char*) log_xid);
554 // generate a new log trace id for this relay
560 if (strlen(thread) > MAX_THREAD_SIZE) {
561 osrfLogWarning(OSRF_LOG_MARK, "WS thread exceeds max length");
562 return HTTP_BAD_REQUEST;
565 // since clients can provide their own threads at session start time,
566 // the presence of a thread does not guarantee a cached recipient
567 recipient = (char*) apr_hash_get(
568 trans->session_cache, thread, APR_HASH_KEY_STRING);
571 osrfLogDebug(OSRF_LOG_MARK, "WS found cached recipient %s", recipient);
// No cached recipient: address the router as router@domain/service.
// NOTE(review): snprintf returns the length the complete string would have
// had, not the number of bytes stored. With a long client-supplied service
// that value exceeds the buffer, and using it as an index writes past the
// end of recipient_buf. snprintf already terminates within the size it is
// given, so the manual terminator is unnecessary; a truncated result should
// be rejected instead.
578 int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1,
579 "%s@%s/%s", trans->osrf_router, trans->osrf_domain, service);
580 recipient_buf[size] = '\0';
581 recipient = recipient_buf;
584 osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient");
585 return HTTP_BAD_REQUEST;
589 osrfLogDebug(OSRF_LOG_MARK,
590 "WS relaying message thread=%s, xid=%s, recipient=%s",
591 thread, osrfLogGetXid(), recipient);
593 msg_body = extract_inbound_messages(
594 r, service, thread, recipient, osrf_msg);
// NOTE(review): releases of msg_body and tmsg are not visible in this
// function; confirm both are freed after the send.
596 transport_message *tmsg = message_init(
597 msg_body, NULL, thread, recipient, NULL);
599 message_set_osrf_xid(tmsg, osrfLogGetXid());
600 client_send_message(osrf_handle, tmsg);
605 jsonObjectFree(msg_wrapper);
// Websocket message callback: runs on_message_handler_body while holding
// trans->mutex so that it never overlaps with the responder thread.
//
// Parameters are forwarded unchanged to on_message_handler_body. Returns 1
// when the mutex cannot be locked; the return after the unlock is not
// visible here.
611 static size_t CALLBACK on_message_handler(void *data,
612 const WebSocketServer *server, const int type,
613 unsigned char *buffer, const size_t buffer_size) {
615 if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
616 osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
617 return 1; // TODO: map to apr_status_t value?
// The body returns size_t; the result is held in an apr_status_t here.
620 apr_status_t stat = on_message_handler_body(data, server, type, buffer, buffer_size);
// NOTE(review): this branch handles a failed unlock, but the message it
// logs says locking, which is misleading when reading the logs.
622 if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
623 osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
632 * Clear the session cache, release the session pool
// Websocket disconnect callback.
//
// data:   per-connection value; cast to the translator type below.
// server: websocket server handle, used only to log the remote address.
//
// Marks the client as disconnected, empties the session cache and destroys
// the session pool. Nothing in the lines shown takes trans->mutex, so this
// can run while the responder thread is handling a response.
634 void CALLBACK on_disconnect_handler(
635 void *data, const WebSocketServer *server) {
// This local deliberately shadows the file-scope trans pointer.
637 osrfWebsocketTranslator *trans = (osrfWebsocketTranslator*) data;
638 trans->client_connected = 0;
640 // ensure no errant session data is sticking around
641 apr_hash_clear(trans->session_cache);
643 // strictly speaking, this pool will get destroyed when
644 // r->pool is destroyed, but it doesn't hurt to explicitly
645 // destroy it ourselves.
// NOTE(review): clear_cached_recipient can leave session_pool NULL, or
// replace it with a pool that has no parent. Confirm apr_pool_destroy is
// never reached with a NULL pool here.
646 apr_pool_destroy(trans->session_pool);
647 trans->session_pool = NULL;
649 request_rec *r = server->request(server);
651 osrfLogDebug(OSRF_LOG_MARK,
652 "WS disconnect from %s", r->connection->remote_ip);
653 //"WS disconnect from %s", r->connection->client_ip); // apache 2.4
657 * Be nice and clean up our mess
// Plugin teardown callback: releases the mutex and both pools.
//
// plugin: the plugin descriptor (unused in the visible lines).
659 void CALLBACK on_destroy_handler(WebSocketPlugin *plugin) {
// NOTE(review): apr_thread_exit ends the thread that calls it, so when this
// runs on the server thread the lines below may never execute and the
// responder thread is not stopped. Presumably the intent was to stop the
// responder; verify against the APR documentation.
661 apr_thread_exit(trans->responder_thread, APR_SUCCESS);
662 apr_thread_mutex_destroy(trans->mutex);
// NOTE(review): on_disconnect_handler sets session_pool to NULL, so this
// call can receive NULL. trans itself is allocated from main_pool and is
// invalid once that pool is destroyed.
663 apr_pool_destroy(trans->session_pool);
664 apr_pool_destroy(trans->main_pool);
// Plugin descriptor returned by osrf_websocket_init. The first two entries
// are the struct size and the plugin API version; on_disconnect_handler is
// the last entry. The entries in between are not visible here.
670 static WebSocketPlugin osrf_websocket_plugin = {
671 sizeof(WebSocketPlugin),
672 WEBSOCKET_PLUGIN_VERSION_0,
676 on_disconnect_handler
// Exported plugin entry point: returns the address of the static plugin
// descriptor defined in this file.
679 extern EXPORT WebSocketPlugin * CALLBACK osrf_websocket_init() {
680 return &osrf_websocket_plugin;