1 /* -----------------------------------------------------------------------
2 * Copyright 2012 Equinox Software, Inc.
3 * Bill Erickson <berick@esilibrary.com>
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 * -----------------------------------------------------------------------
18 * Dumb websocket <-> opensrf gateway. Wrapped opensrf messages are extracted
19 * and relayed to the opensrf network. Responses are pulled from the opensrf
20 * network and passed back to the client. No attempt is made to understand
21 * the contents of the messages.
23 * Messages to/from the websocket client take the following form:
25 * "service" : "opensrf.foo", // required for new sessions (inbound only)
26 * "thread" : "123454321", // AKA thread. required for follow-up requests; max 64 chars.
27 * "log_xid" : "123..32", // optional log trace ID, max 64 chars;
28 * "osrf_msg" : {<osrf_msg>} // required
31 * Each translator operates with two threads. One thread receives messages
32 * from the websocket client, translates, and relays them to the opensrf
33 * network. The second thread collects responses from the opensrf network and
34 * relays them back to the websocket client.
36 * The main thread reads from socket A (apache) and writes to socket B
37 * (openesrf), while the responder thread reads from B and writes to A. The
38 * apr data structures used are threadsafe. For now, no thread mutex's are
41 * Note that with a "thread", which allows us to identify the opensrf session,
42 * the caller does not need to provide a recipient address. The "service" is
43 * only required to start a new opensrf session. After the sesession is
44 * started, all future communication is based solely on the thread.
46 * We use jsonParseRaw and jsonObjectToJSONRaw since this service does not care
47 * about the contents of the messages.
52 * short-timeout mode for brick detachment where inactivity timeout drops way
53 * down for graceful disconnects.
57 #include "apr_strings.h"
58 #include "apr_thread_proc.h"
60 #include "websocket_plugin.h"
61 #include "opensrf/log.h"
62 #include "opensrf/osrf_json.h"
63 #include "opensrf/transport_client.h"
64 #include "opensrf/transport_message.h"
65 #include "opensrf/osrf_system.h"
66 #include "opensrf/osrfConfig.h"
68 #define MAX_THREAD_SIZE 64
69 #define RECIP_BUF_SIZE 128
70 #define WEBSOCKET_TRANSLATOR_INGRESS "ws-translator-v1"
72 typedef struct _osrfWebsocketTranslator {
73 const WebSocketServer *server;
74 apr_pool_t *main_pool; // standalone per-process pool
75 apr_pool_t *session_pool; // child of r->pool; per-session
76 apr_hash_t *session_cache;
77 apr_thread_t *responder_thread;
78 apr_thread_mutex_t *mutex;
82 } osrfWebsocketTranslator;
84 static osrfWebsocketTranslator *trans = NULL;
85 static transport_client *osrf_handle = NULL;
86 static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
89 static void clear_cached_recipient(const char* thread) {
90 apr_pool_t *pool = NULL;
92 if (apr_hash_get(trans->session_cache, thread, APR_HASH_KEY_STRING)) {
94 osrfLogDebug(OSRF_LOG_MARK, "WS removing cached recipient on disconnect");
96 // remove it from the hash
97 apr_hash_set(trans->session_cache, thread, APR_HASH_KEY_STRING, NULL);
99 if (apr_hash_count(trans->session_cache) == 0) {
100 osrfLogDebug(OSRF_LOG_MARK, "WS re-setting session_pool");
102 // memory accumulates in the session_pool as sessions are cached then
103 // un-cached. Un-caching removes strings from the hash, but not the
104 // pool itself. That only happens when the pool is destroyed. destroy
105 // the session pool to clear any lingering memory
106 apr_pool_destroy(trans->session_pool);
108 // create a standalone pool for our translator data
109 if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
110 osrfLogError(OSRF_LOG_MARK, "WS Unable to create session_pool");
111 trans->session_pool = NULL;
115 trans->session_pool = pool;
122 void* osrf_responder_thread_main_body(transport_message *tmsg) {
124 osrfList *msg_list = NULL;
125 osrfMessage *one_msg = NULL;
128 osrfLogDebug(OSRF_LOG_MARK,
129 "WS received opensrf response for thread=%s, xid=%s",
130 tmsg->thread, tmsg->osrf_xid);
132 // first we need to perform some maintenance
133 msg_list = osrfMessageDeserialize(tmsg->body, NULL);
135 for (i = 0; i < msg_list->size; i++) {
136 one_msg = OSRF_LIST_GET_INDEX(msg_list, i);
138 osrfLogDebug(OSRF_LOG_MARK,
139 "WS returned response of type %d", one_msg->m_type);
141 /* if our client just successfully connected to an opensrf service,
142 cache the sender so that future calls on this thread will use
143 the correct recipient. */
144 if (one_msg && one_msg->m_type == STATUS) {
147 // only cache recipients if the client is still connected
148 if (trans->client_connected &&
149 one_msg->status_code == OSRF_STATUS_OK) {
151 if (!apr_hash_get(trans->session_cache,
152 tmsg->thread, APR_HASH_KEY_STRING)) {
154 osrfLogDebug(OSRF_LOG_MARK,
155 "WS caching sender thread=%s, sender=%s",
156 tmsg->thread, tmsg->sender);
158 apr_hash_set(trans->session_cache,
159 apr_pstrdup(trans->session_pool, tmsg->thread),
161 apr_pstrdup(trans->session_pool, tmsg->sender));
166 // connection timed out; clear the cached recipient
167 // regardless of whether the client is still connected
168 if (one_msg->status_code == OSRF_STATUS_TIMEOUT)
169 clear_cached_recipient(tmsg->thread);
174 // maintenance is done
175 osrfListFree(msg_list);
177 if (!trans->client_connected) {
178 // responses received after client disconnect are discarded
180 osrfLogDebug(OSRF_LOG_MARK,
181 "WS discarding response for thread=%s, xid=%s",
182 tmsg->thread, tmsg->osrf_xid);
188 // client is still connected; relay the messages to the client
189 jsonObject *msg_wrapper = NULL;
190 char *msg_string = NULL;
192 // build the wrapper object
193 msg_wrapper = jsonNewObject(NULL);
194 jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
195 jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
196 jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
198 if (tmsg->is_error) {
200 "WS received jabber error message in response to thread=%s and xid=%s",
201 tmsg->thread, tmsg->osrf_xid);
203 jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
206 msg_string = jsonObjectToJSONRaw(msg_wrapper);
208 // deliver the wrapped message json to the websocket client
209 trans->server->send(trans->server, MESSAGE_TYPE_TEXT,
210 (unsigned char*) msg_string, strlen(msg_string));
213 jsonObjectFree(msg_wrapper);
218 * Responder thread main body.
219 * Collects responses from the opensrf network and relays them to the
222 void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
224 transport_message *tmsg;
227 if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
228 osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
232 // wait for a response
233 tmsg = client_recv(osrf_handle, -1);
235 if (!tmsg) continue; // early exit on interrupt
237 if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
238 osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
242 osrf_responder_thread_main_body(tmsg);
252 * Allocate the session cache and create the responder thread
254 int child_init(const WebSocketServer *server) {
256 apr_pool_t *pool = NULL;
257 apr_thread_t *thread = NULL;
258 apr_threadattr_t *thread_attr = NULL;
259 apr_thread_mutex_t *mutex = NULL;
260 request_rec *r = server->request(server);
262 osrfLogDebug(OSRF_LOG_MARK, "WS child_init");
264 // osrf_handle will already be connected if this is not the first request
265 // served by this process.
266 if ( !(osrf_handle = osrfSystemGetTransportClient()) ) {
267 char* config_file = "/openils/conf/opensrf_core.xml";
268 char* config_ctx = "gateway"; //TODO config
269 if (!osrfSystemBootstrapClientResc(config_file, config_ctx, "websocket")) {
270 osrfLogError(OSRF_LOG_MARK,
271 "WS unable to bootstrap OpenSRF client with config %s", config_file);
275 osrf_handle = osrfSystemGetTransportClient();
278 // create a standalone pool for our translator data
279 if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
280 osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
285 // allocate our static translator instance
286 trans = (osrfWebsocketTranslator*)
287 apr_palloc(pool, sizeof(osrfWebsocketTranslator));
290 osrfLogError(OSRF_LOG_MARK, "WS Unable to create translator");
294 trans->main_pool = pool;
295 trans->server = server;
296 trans->osrf_router = osrfConfigGetValue(NULL, "/router_name");
297 trans->osrf_domain = osrfConfigGetValue(NULL, "/domain");
299 trans->session_cache = apr_hash_make(pool);
301 if (trans->session_cache == NULL) {
302 osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache");
306 // Create the responder thread. Once created,
307 // it runs for the lifetime of this process.
308 if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
309 (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
310 (apr_thread_create(&thread, thread_attr,
311 osrf_responder_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
313 trans->responder_thread = thread;
316 osrfLogError(OSRF_LOG_MARK, "WS unable to create responder thread");
320 if (apr_thread_mutex_create(
321 &mutex, APR_THREAD_MUTEX_UNNESTED,
322 trans->main_pool) != APR_SUCCESS) {
323 osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex");
327 trans->mutex = mutex;
333 * Create the per-client translator
335 void* CALLBACK on_connect_handler(const WebSocketServer *server) {
336 request_rec *r = server->request(server);
339 osrfLogDebug(OSRF_LOG_MARK,
340 "WS connect from %s", r->connection->remote_ip);
341 //"WS connect from %s", r->connection->client_ip); // apache 2.4
344 if (child_init(server) != APR_SUCCESS) {
349 // create a standalone pool for the session cache values
350 // this pool will be destroyed and re-created regularly to
351 // clear session memory
352 if (apr_pool_create(&pool, r->pool) != APR_SUCCESS) {
353 osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
357 trans->session_pool = pool;
358 trans->client_connected = 1;
364 * for each inbound opensrf message:
365 * 1. Stamp the ingress
366 * 2. REQUEST: log it as activity
367 * 3. DISCONNECT: remove the cached recipient
368 * then re-string-ify for xmpp delivery
371 static char* extract_inbound_messages(
372 const request_rec *r,
375 const char* recipient,
376 const jsonObject *osrf_msg) {
379 int num_msgs = osrf_msg->size;
381 osrfMessage* msg_list[num_msgs];
383 // here we do an extra json round-trip to get the data
384 // in a form osrf_message_deserialize can understand
385 char *osrf_msg_json = jsonObjectToJSON(osrf_msg);
386 osrf_message_deserialize(osrf_msg_json, msg_list, num_msgs);
389 // should we require the caller to always pass the service?
390 if (service == NULL) service = "";
392 for(i = 0; i < num_msgs; i++) {
394 osrfMessageSetIngress(msg, WEBSOCKET_TRANSLATOR_INGRESS);
396 switch(msg->m_type) {
399 const jsonObject* params = msg->_params;
400 growing_buffer* act = buffer_init(128);
401 char* method = msg->method_name;
402 buffer_fadd(act, "[%s] [%s] %s %s",
403 r->connection->remote_ip, "", service, method);
405 const jsonObject* obj = NULL;
408 int redactParams = 0;
409 while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
410 if(!strncmp(method, str, strlen(str))) {
416 OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
419 while((obj = jsonObjectGetIndex(params, i++))) {
420 char* str = jsonObjectToJSON(obj);
422 OSRF_BUFFER_ADD(act, " ");
424 OSRF_BUFFER_ADD(act, ", ");
425 OSRF_BUFFER_ADD(act, str);
429 osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
435 clear_cached_recipient(thread);
440 return osrfMessageSerializeBatch(msg_list, num_msgs);
444 * Parse opensrf request and relay the request to the opensrf network.
446 static size_t on_message_handler_body(void *data,
447 const WebSocketServer *server, const int type,
448 unsigned char *buffer, const size_t buffer_size) {
450 request_rec *r = server->request(server);
452 jsonObject *msg_wrapper = NULL; // free me
453 const jsonObject *tmp_obj = NULL;
454 const jsonObject *osrf_msg = NULL;
455 const char *service = NULL;
456 const char *thread = NULL;
457 const char *log_xid = NULL;
458 char *msg_body = NULL;
459 char *recipient = NULL;
462 if (buffer_size <= 0) return OK;
464 osrfLogDebug(OSRF_LOG_MARK, "WS received message size=%d", buffer_size);
466 // buffer may not be \0-terminated, which jsonParse requires
467 char buf[buffer_size + 1];
468 memcpy(buf, buffer, buffer_size);
469 buf[buffer_size] = '\0';
471 msg_wrapper = jsonParse(buf);
473 if (msg_wrapper == NULL) {
474 osrfLogWarning(OSRF_LOG_MARK, "WS Invalid JSON: %s", buf);
475 return HTTP_BAD_REQUEST;
478 osrf_msg = jsonObjectGetKeyConst(msg_wrapper, "osrf_msg");
480 if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "service"))
481 service = jsonObjectGetString(tmp_obj);
483 if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "thread"))
484 thread = jsonObjectGetString(tmp_obj);
486 if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "log_xid"))
487 log_xid = jsonObjectGetString(tmp_obj);
491 // use the caller-provide log trace id
492 if (strlen(log_xid) > MAX_THREAD_SIZE) {
493 osrfLogWarning(OSRF_LOG_MARK, "WS log_xid exceeds max length");
494 return HTTP_BAD_REQUEST;
497 // TODO: make this work with non-client and make this call accept
498 // const char*'s. casting to (char*) for now to silence warnings.
499 osrfLogSetXid((char*) log_xid);
502 // generate a new log trace id for this relay
508 if (strlen(thread) > MAX_THREAD_SIZE) {
509 osrfLogWarning(OSRF_LOG_MARK, "WS thread exceeds max length");
510 return HTTP_BAD_REQUEST;
513 // since clients can provide their own threads at session start time,
514 // the presence of a thread does not guarantee a cached recipient
515 recipient = (char*) apr_hash_get(
516 trans->session_cache, thread, APR_HASH_KEY_STRING);
519 osrfLogDebug(OSRF_LOG_MARK, "WS found cached recipient %s", recipient);
526 int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1,
527 "%s@%s/%s", trans->osrf_router, trans->osrf_domain, service);
528 recipient_buf[size] = '\0';
529 recipient = recipient_buf;
532 osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient");
533 return HTTP_BAD_REQUEST;
537 osrfLogDebug(OSRF_LOG_MARK,
538 "WS relaying message thread=%s, xid=%s, recipient=%s",
539 thread, osrfLogGetXid(), recipient);
541 msg_body = extract_inbound_messages(
542 r, service, thread, recipient, osrf_msg);
544 transport_message *tmsg = message_init(
545 msg_body, NULL, thread, recipient, NULL);
547 message_set_osrf_xid(tmsg, osrfLogGetXid());
548 client_send_message(osrf_handle, tmsg);
553 jsonObjectFree(msg_wrapper);
559 static size_t CALLBACK on_message_handler(void *data,
560 const WebSocketServer *server, const int type,
561 unsigned char *buffer, const size_t buffer_size) {
563 if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
564 osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
565 return 1; // TODO: map to apr_status_t value?
568 apr_status_t stat = on_message_handler_body(data, server, type, buffer, buffer_size);
570 if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
571 osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
580 * Release all memory allocated from the translator pool and kill the pool.
582 void CALLBACK on_disconnect_handler(
583 void *data, const WebSocketServer *server) {
585 osrfWebsocketTranslator *trans = (osrfWebsocketTranslator*) data;
586 trans->client_connected = 0;
589 It's not necessary to destroy our session_pool, since
590 it's a child of the apache request_rec pool, which is
591 destroyed after client disconnect.
592 apr_pool_destroy(trans->session_pool);
595 trans->session_pool = NULL;
597 request_rec *r = server->request(server);
598 osrfLogDebug(OSRF_LOG_MARK,
599 "WS disconnect from %s", r->connection->remote_ip);
600 //"WS disconnect from %s", r->connection->client_ip); // apache 2.4
604 * Be nice and clean up our mess
606 void CALLBACK on_destroy_handler(WebSocketPlugin *plugin) {
608 apr_thread_exit(trans->responder_thread, APR_SUCCESS);
609 apr_thread_mutex_destroy(trans->mutex);
610 apr_pool_destroy(trans->main_pool);
616 static WebSocketPlugin osrf_websocket_plugin = {
617 sizeof(WebSocketPlugin),
618 WEBSOCKET_PLUGIN_VERSION_0,
622 on_disconnect_handler
625 extern EXPORT WebSocketPlugin * CALLBACK osrf_websocket_init() {
626 return &osrf_websocket_plugin;