]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/gateway/osrf_websocket_translator.c
LP#1268619: websocket : apache gateway minor fixes
[OpenSRF.git] / src / gateway / osrf_websocket_translator.c
1 /* -----------------------------------------------------------------------
2  * Copyright 2012 Equinox Software, Inc.
3  * Bill Erickson <berick@esilibrary.com>
4  *
5  * This program is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU General Public License
7  * as published by the Free Software Foundation; either version 2
8  * of the License, or (at your option) any later version.
9  * 
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  * -----------------------------------------------------------------------
15  */
16
17 /**
18  * websocket <-> opensrf gateway.  Wrapped opensrf messages are extracted
19  * and relayed to the opensrf network.  Responses are pulled from the opensrf
20  * network and passed back to the client.  Messages are analyzed to determine
21  * when a connect/disconnect occurs, so that the cache of recipients can be
22  * properly managed.  We also activity-log REQUEST messages.
23  *
24  * Messages to/from the websocket client take the following form:
25  * {
26  *   "service"  : "opensrf.foo", // required
27  *   "thread"   : "123454321",   // AKA thread. required for follow-up requests; max 64 chars.
28  *   "log_xid"  : "123..32",     // optional log trace ID, max 64 chars;
29  *   "osrf_msg" : [<osrf_msg>, <osrf_msg>, ...]   // required
30  * }
31  *
32  * Each translator operates with three threads.  One thread receives messages
33  * from the websocket client, translates, and relays them to the opensrf 
34  * network. The second thread collects responses from the opensrf network and 
35  * relays them back to the websocket client.  The third thread inspects
36  * the idle timeout interval t see if it's time to drop the idle client.
37  *
38  * After the initial setup, all thread actions occur within a thread
39  * mutex.  The desired affect is a non-threaded application that uses
40  * threads for the sole purpose of having one thread listening for
41  * incoming data, while a second thread listens for responses, and a
42  * third checks the idle timeout.  When any thread awakens, it's the
43  * only thread in town until it goes back to sleep (i.e. listening on
44  * its socket for data).
45  *
46  * Note that with the opensrf "thread", which allows us to identify the
47  * opensrf session, the caller does not need to provide a recipient
48  * address.  The "service" is only required to start a new opensrf
49  * session.  After the sesession is started, all future communication is
50  * based solely on the thread.  However, the "service" should be passed
51  * by the caller for all requests to ensure it is properly logged in the
52  * activity log.
53  *
54  * Every inbound and outbound message updates the last_activity_time.
55  * A separate thread wakes periodically to see if the time since the
56  * last_activity_time exceeds the configured idle_timeout_interval.  If
57  * so, a disconnect is sent to the client, completing the conversation.
58  *
59  * Configuration goes directly into the Apache envvars file.  
60  * (e.g. /etc/apache2-websockets/envvars).  As of today, it's not 
61  * possible to leverage Apache configuration directives directly,
62  * since this is not an Apache module, but a shared library loaded
63  * by an apache module.  This includes SetEnv / SetEnvIf.
64  *
65  * export OSRF_WEBSOCKET_IDLE_TIMEOUT=300
66  * export OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL=5
67  * export OSRF_WEBSOCKET_CONFIG_FILE=/openils/conf/opensrf_core.xml
68  * export OSRF_WEBSOCKET_CONFIG_CTXT=gateway
69  */
70
71 #include <stdlib.h>
72 #include <signal.h>
73 #include <unistd.h>
74 #include "httpd.h"
75 #include "http_log.h"
76 #include "apr_strings.h"
77 #include "apr_thread_proc.h"
78 #include "apr_hash.h"
79 #include "websocket_plugin.h"
80 #include "opensrf/log.h"
81 #include "opensrf/osrf_json.h"
82 #include "opensrf/transport_client.h"
83 #include "opensrf/transport_message.h"
84 #include "opensrf/osrf_system.h"                                                
85 #include "opensrf/osrfConfig.h"
86
87 #define MAX_THREAD_SIZE 64
88 #define RECIP_BUF_SIZE 128
89 #define WEBSOCKET_TRANSLATOR_INGRESS "ws-translator-v1"
90
91
92 // default values, replaced during setup (below) as needed.
93 static char* config_file = "/openils/conf/opensrf_core.xml";
94 static char* config_ctxt = "gateway";
95 static time_t idle_timeout_interval = 60; 
96 static time_t idle_check_interval = 5;
97 static time_t last_activity_time = 0;
98
99 // true if we've received a signal to start graceful shutdown
100 static int shutdown_requested = 0; 
101 static void sigusr1_handler(int sig);
102 static void sigusr1_handler(int sig) {                                       
103     shutdown_requested = 1; 
104     signal(SIGUSR1, sigusr1_handler);
105     osrfLogInfo(OSRF_LOG_MARK, "WS received SIGUSR1 - Graceful Shutdown");
106 }
107
108 static const char* get_client_ip(const request_rec* r) {
109 #ifdef APACHE_MIN_24
110     return r->connection->client_ip;
111 #else
112     return r->connection->remote_ip;
113 #endif
114 }
115
116 typedef struct _osrfWebsocketTranslator {
117
118     /** Our handle for communicating with the caller */
119     const WebSocketServer *server;
120     
121     /**
122      * Standalone, per-process APR pool.  Primarily
123      * there for managing thread data, which lasts 
124      * the duration of the process.
125      */
126     apr_pool_t *main_pool;
127
128     /**
129      * Map of thread => drone-xmpp-address.  Maintaining this
130      * map internally means the caller never need know about
131      * internal XMPP addresses and the server doesn't have to 
132      * verify caller-specified recipient addresses.  It's
133      * all managed internally.
134      */
135     apr_hash_t *session_cache; 
136
137     /**
138      * session_pool contains the key/value pairs stored in
139      * the session_cache.  The pool is regularly destroyed
140      * and re-created to avoid long-term memory consumption
141      */
142     apr_pool_t *session_pool;
143
144     /**
145      * Thread responsible for collecting responses on the opensrf
146      * network and relaying them back to the caller
147      */
148     apr_thread_t *responder_thread;
149
150     /**
151      * Thread responsible for checking inactivity timeout.
152      * If no activitity occurs within the configured interval,
153      * a disconnect is sent to the client and the connection
154      * is terminated.
155      */
156     apr_thread_t *idle_timeout_thread;
157
158     /**
159      * All message handling code is wrapped in a thread mutex such
160      * that all actions (after the initial setup) are serialized
161      * to minimize the possibility of multi-threading snafus.
162      */
163     apr_thread_mutex_t *mutex;
164
165     /**
166      * True if a websocket client is currently connected
167      */
168     int client_connected;
169
170     /** OpenSRF jouter name */
171     char* osrf_router;
172
173     /** OpenSRF domain */
174     char* osrf_domain;
175
176 } osrfWebsocketTranslator;
177
178 static osrfWebsocketTranslator *trans = NULL;
179 static transport_client *osrf_handle = NULL;
180 static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
181
182 static void clear_cached_recipient(const char* thread) {
183     apr_pool_t *pool = NULL;                                                
184
185     if (apr_hash_get(trans->session_cache, thread, APR_HASH_KEY_STRING)) {
186
187         osrfLogDebug(OSRF_LOG_MARK, "WS removing cached recipient on disconnect");
188
189         // remove it from the hash
190         apr_hash_set(trans->session_cache, thread, APR_HASH_KEY_STRING, NULL);
191
192         if (apr_hash_count(trans->session_cache) == 0) {
193             osrfLogDebug(OSRF_LOG_MARK, "WS re-setting session_pool");
194
195             // memory accumulates in the session_pool as sessions are cached then 
196             // un-cached.  Un-caching removes strings from the hash, but not the 
197             // pool itself.  That only happens when the pool is destroyed. Here
198             // we destroy the session pool to clear any lingering memory, then
199             // re-create it for future caching.
200             apr_pool_destroy(trans->session_pool);
201     
202             if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
203                 osrfLogError(OSRF_LOG_MARK, "WS Unable to create session_pool");
204                 trans->session_pool = NULL;
205                 return;
206             }
207
208             trans->session_pool = pool;
209         }
210     }
211 }
212
213 void* osrf_responder_thread_main_body(transport_message *tmsg) {
214
215     osrfList *msg_list = NULL;
216     osrfMessage *one_msg = NULL;
217     int i;
218
219     osrfLogDebug(OSRF_LOG_MARK, 
220         "WS received opensrf response for thread=%s", tmsg->thread);
221
222     // first we need to perform some maintenance
223     msg_list = osrfMessageDeserialize(tmsg->body, NULL);
224
225     for (i = 0; i < msg_list->size; i++) {
226         one_msg = OSRF_LIST_GET_INDEX(msg_list, i);
227
228         osrfLogDebug(OSRF_LOG_MARK, 
229             "WS returned response of type %d", one_msg->m_type);
230
231         /*  if our client just successfully connected to an opensrf service,
232             cache the sender so that future calls on this thread will use
233             the correct recipient. */
234         if (one_msg && one_msg->m_type == STATUS) {
235
236
237             // only cache recipients if the client is still connected
238             if (trans->client_connected && 
239                     one_msg->status_code == OSRF_STATUS_OK) {
240
241                 if (!apr_hash_get(trans->session_cache, 
242                         tmsg->thread, APR_HASH_KEY_STRING)) {
243
244                     osrfLogDebug(OSRF_LOG_MARK, 
245                         "WS caching sender thread=%s, sender=%s", 
246                         tmsg->thread, tmsg->sender);
247
248                     apr_hash_set(trans->session_cache, 
249                         apr_pstrdup(trans->session_pool, tmsg->thread),
250                         APR_HASH_KEY_STRING, 
251                         apr_pstrdup(trans->session_pool, tmsg->sender));
252                 }
253
254             } else {
255
256                 // connection timed out; clear the cached recipient
257                 // regardless of whether the client is still connected
258                 if (one_msg->status_code == OSRF_STATUS_TIMEOUT)
259                     clear_cached_recipient(tmsg->thread);
260             }
261         }
262     }
263
264     // osrfMessageDeserialize applies the freeItem handler to the 
265     // newly created osrfList.  We only need to free the list and 
266     // the individual osrfMessage's will be freed along with it
267     osrfListFree(msg_list);
268
269     if (!trans->client_connected) {
270
271         osrfLogInfo(OSRF_LOG_MARK, 
272             "WS discarding response for thread=%s", tmsg->thread);
273
274         return;
275     }
276     
277     // client is still connected. 
278     // relay the response messages to the client
279     jsonObject *msg_wrapper = NULL;
280     char *msg_string = NULL;
281
282     // build the wrapper object
283     msg_wrapper = jsonNewObject(NULL);
284     jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
285     jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
286     jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
287
288     if (tmsg->is_error) {
289         osrfLogError(OSRF_LOG_MARK, 
290             "WS received jabber error message in response to thread=%s", 
291             tmsg->thread);
292         jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
293     }
294
295     msg_string = jsonObjectToJSONRaw(msg_wrapper);
296
297     // drop the JSON on the outbound wire
298     trans->server->send(trans->server, MESSAGE_TYPE_TEXT, 
299         (unsigned char*) msg_string, strlen(msg_string));
300
301     free(msg_string);
302     jsonObjectFree(msg_wrapper);
303 }
304
305 /**
306  * Sleep and regularly wake to see if the process has been idle for too
307  * long.  If so, send a disconnect to the client.
308  *
309  */
310 void* APR_THREAD_FUNC osrf_idle_timeout_thread_main(
311         apr_thread_t *thread, void *data) {
312
313     // sleep time defaults to the check interval, but may 
314     // be shortened during shutdown.
315     int sleep_time = idle_check_interval;
316     int shutdown_loops = 0;
317
318     while (1) {
319
320         if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
321             osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
322             return NULL;
323         }
324
325         // note: receiving a signal (e.g. SIGUSR1) will not interrupt
326         // this sleep(), since it's running within its own thread.
327         // During graceful shtudown, we may wait up to 
328         // idle_check_interval seconds before initiating shutdown.
329         sleep(sleep_time);
330         
331         if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
332             osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
333             return NULL;
334         }
335
336         // no client is connected.  reset sleep time go back to sleep.
337         if (!trans->client_connected) {
338             sleep_time = idle_check_interval;
339             continue;
340         }
341
342         // do we have any active conversations with the connected client?
343         int active_count = apr_hash_count(trans->session_cache);
344
345         if (active_count) {
346
347             if (shutdown_requested) {
348                 // active conversations means we can't shut down.  
349                 // shorten the check interval to re-check more often.
350                 shutdown_loops++;
351                 osrfLogDebug(OSRF_LOG_MARK, 
352                     "WS: %d active conversation(s) found in shutdown after "
353                     "%d attempts.  Sleeping...", shutdown_loops, active_count
354                 );
355
356                 if (shutdown_loops > 30) {
357                     // this is clearly a long-running conversation, let's
358                     // check less frequently to avoid excessive logging.
359                     sleep_time = 3;
360                 } else {
361                     sleep_time = 1;
362                 }
363             } 
364
365             // active conversations means keep going.  There's no point in
366             // checking the idle time (below) if we're mid-conversation
367             continue;
368         }
369
370         // no active conversations
371              
372         if (shutdown_requested) {
373             // there's no need to reset the shutdown vars (loops/requested)
374             // SIGUSR1 is Apaches reload signal, which means this process
375             // will be going away as soon as the client is disconnected.
376
377             osrfLogInfo(OSRF_LOG_MARK,
378                 "WS: no active conversations remain in shutdown; "
379                     "closing client connection");
380
381         } else { 
382             // see how long we've been idle.  If too long, kick the client
383
384             time_t now = time(NULL);
385             time_t difference = now - last_activity_time;
386
387             osrfLogDebug(OSRF_LOG_MARK, 
388                 "WS has been idle for %d seconds", difference);
389
390             if (difference < idle_timeout_interval) {
391                 // Last activity occurred within the idle timeout interval.
392                 continue;
393             }
394
395             // idle timeout exceeded
396             osrfLogDebug(OSRF_LOG_MARK, 
397                 "WS: idle timeout exceeded.  now=%d / last=%d; " 
398                 "closing client connection", now, last_activity_time);
399         }
400
401
402         // send a disconnect to the client, which will come back around
403         // to cause our on_disconnect_handler to run.
404         osrfLogDebug(OSRF_LOG_MARK, "WS: sending close() to client");
405         trans->server->close(trans->server);
406
407         // client will be going away, reset sleep time
408         sleep_time = idle_check_interval;
409     }
410
411     // should never get here
412     return NULL;
413 }
414
415 /**
416  * Responder thread main body.
417  * Collects responses from the opensrf network and relays them to the 
418  * websocket caller.
419  */
420 void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
421
422     transport_message *tmsg;
423     while (1) {
424
425         if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
426             osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
427             return NULL;
428         }
429
430         // wait for a response
431         tmsg = client_recv(osrf_handle, -1);
432
433         if (!tmsg) continue; // early exit on interrupt
434
435         if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
436             osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
437             return NULL;
438         }
439
440         osrfLogForceXid(tmsg->osrf_xid);
441         osrf_responder_thread_main_body(tmsg);
442         message_free(tmsg);                                                         
443         last_activity_time = time(NULL);
444     }
445
446     return NULL;
447 }
448
449
450
451 /**
452  * Connect to OpenSRF, create the main pool, responder thread
453  * session cache and session pool.
454  */
455 int child_init(const WebSocketServer *server) {
456
457     apr_pool_t *pool = NULL;                                                
458     apr_thread_t *thread = NULL;
459     apr_threadattr_t *thread_attr = NULL;
460     apr_thread_mutex_t *mutex = NULL;
461     request_rec *r = server->request(server);
462
463
464     // osrf_handle will already be connected if this is not the first request
465     // served by this process.
466     if ( !(osrf_handle = osrfSystemGetTransportClient()) ) {
467         
468         // load config values from the env
469         char* timeout = getenv("OSRF_WEBSOCKET_IDLE_TIMEOUT");
470         if (timeout) {
471             if (!atoi(timeout)) {
472                 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
473                     "WS: invalid OSRF_WEBSOCKET_IDLE_TIMEOUT: %s", timeout);
474             } else {
475                 idle_timeout_interval = (time_t) atoi(timeout);
476             }
477         }
478
479         ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
480             "WS: timeout set to %d", idle_timeout_interval);
481
482         char* interval = getenv("OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL");
483         if (interval) {
484             if (!atoi(interval)) {
485                 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
486                     "WS: invalid OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL: %s", 
487                     interval
488                 );
489             } else {
490                 idle_check_interval = (time_t) atoi(interval);
491             }
492         } 
493
494         ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
495             "WS: idle check interval set to %d", idle_check_interval);
496
497       
498         char* cfile = getenv("OSRF_WEBSOCKET_CONFIG_FILE");
499         if (cfile) {
500             ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
501                 "WS: config file set to %s", cfile);
502             config_file = cfile;
503         }
504
505         char* ctxt = getenv("OSRF_WEBSOCKET_CONFIG_CTXT");
506         if (ctxt) {
507             ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
508                 "WS: config context set to %s", ctxt);
509             config_ctxt = ctxt;
510         }
511
512         // connect to opensrf
513         if (!osrfSystemBootstrapClientResc(
514                 config_file, config_ctxt, "websocket")) {   
515
516             osrfLogError(OSRF_LOG_MARK, 
517                 "WS unable to bootstrap OpenSRF client with config %s "
518                 "and context %s", config_file, config_ctxt
519             ); 
520             return 1;
521         }
522
523         osrf_handle = osrfSystemGetTransportClient();
524     }
525
526     // create a standalone pool for our translator data
527     if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
528         osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
529         return 1;
530     }
531
532     // allocate our static translator instance
533     trans = (osrfWebsocketTranslator*) 
534         apr_palloc(pool, sizeof(osrfWebsocketTranslator));
535
536     if (trans == NULL) {
537         osrfLogError(OSRF_LOG_MARK, "WS Unable to create translator");
538         return 1;
539     }
540
541     trans->main_pool = pool;
542     trans->server = server;
543     trans->osrf_router = osrfConfigGetValue(NULL, "/router_name");                      
544     trans->osrf_domain = osrfConfigGetValue(NULL, "/domain");
545
546     trans->session_cache = apr_hash_make(pool);
547
548     if (trans->session_cache == NULL) {
549         osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache");
550         return 1;
551     }
552
553     // Create the responder thread.  Once created, 
554     // it runs for the lifetime of this process.
555     if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
556          (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
557          (apr_thread_create(&thread, thread_attr, 
558                 osrf_responder_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
559
560         trans->responder_thread = thread;
561         
562     } else {
563         osrfLogError(OSRF_LOG_MARK, "WS unable to create responder thread");
564         return 1;
565     }
566
567     // Create the idle timeout thread, which lives for the lifetime
568     // of the process.
569     thread = NULL; // reset
570     thread_attr = NULL; // reset
571     if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
572          (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
573          (apr_thread_create(&thread, thread_attr, 
574             osrf_idle_timeout_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
575
576         osrfLogDebug(OSRF_LOG_MARK, "WS created idle timeout thread");
577         trans->idle_timeout_thread = thread;
578         
579     } else {
580         osrfLogError(OSRF_LOG_MARK, "WS unable to create idle timeout thread");
581         return 1;
582     }
583
584
585     if (apr_thread_mutex_create(
586             &mutex, APR_THREAD_MUTEX_UNNESTED, 
587             trans->main_pool) != APR_SUCCESS) {
588         osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex");
589         return 1;
590     }
591
592     trans->mutex = mutex;
593
594     signal(SIGUSR1, sigusr1_handler);
595
596     return APR_SUCCESS;
597 }
598
599 /**
600  * Create the per-client translator
601  */
602 void* CALLBACK on_connect_handler(const WebSocketServer *server) {
603     request_rec *r = server->request(server);
604     apr_pool_t *pool;
605     apr_thread_t *thread = NULL;
606     apr_threadattr_t *thread_attr = NULL;
607
608     const char* client_ip = get_client_ip(r);
609     osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip);
610
611     if (!trans) {
612         // first connection
613         if (child_init(server) != APR_SUCCESS) {
614             return NULL;
615         }
616     }
617
618     // create a standalone pool for the session cache values
619     // this pool will be destroyed and re-created regularly
620     // to clear session memory
621     if (apr_pool_create(&pool, r->pool) != APR_SUCCESS) {
622         osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
623         return NULL;
624     }
625
626     trans->session_pool = pool;
627     trans->client_connected = 1;
628     last_activity_time = time(NULL);
629
630     return trans;
631 }
632
633
634 /** 
635  * for each inbound opensrf message:
636  * 1. Stamp the ingress
637  * 2. REQUEST: log it as activity
638  * 3. DISCONNECT: remove the cached recipient
639  * then re-string-ify for xmpp delivery
640  */
641
642 static char* extract_inbound_messages(
643         const request_rec *r, 
644         const char* service, 
645         const char* thread, 
646         const char* recipient, 
647         const jsonObject *osrf_msg) {
648
649     int i;
650     int num_msgs = osrf_msg->size;
651     osrfMessage* msg;
652     osrfMessage* msg_list[num_msgs];
653
654     // here we do an extra json round-trip to get the data
655     // in a form osrf_message_deserialize can understand
656     char *osrf_msg_json = jsonObjectToJSON(osrf_msg);
657     osrf_message_deserialize(osrf_msg_json, msg_list, num_msgs);
658     free(osrf_msg_json);
659
660     // should we require the caller to always pass the service?
661     if (service == NULL) service = "";
662
663     for(i = 0; i < num_msgs; i++) {
664         msg = msg_list[i];
665         osrfMessageSetIngress(msg, WEBSOCKET_TRANSLATOR_INGRESS);
666
667         switch(msg->m_type) {
668
669             case REQUEST: {
670                 const jsonObject* params = msg->_params;
671                 growing_buffer* act = buffer_init(128);
672                 char* method = msg->method_name;
673                 buffer_fadd(act, "[%s] [%s] %s %s", 
674                     get_client_ip(r), "", service, method);
675
676                 const jsonObject* obj = NULL;
677                 int i = 0;
678                 const char* str;
679                 int redactParams = 0;
680                 while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
681                     if(!strncmp(method, str, strlen(str))) {
682                         redactParams = 1;
683                         break;
684                     }
685                 }
686                 if(redactParams) {
687                     OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
688                 } else {
689                     i = 0;
690                     while((obj = jsonObjectGetIndex(params, i++))) {
691                         char* str = jsonObjectToJSON(obj);
692                         if( i == 1 )
693                             OSRF_BUFFER_ADD(act, " ");
694                         else
695                             OSRF_BUFFER_ADD(act, ", ");
696                         OSRF_BUFFER_ADD(act, str);
697                         free(str);
698                     }
699                 }
700                 osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
701                 buffer_free(act);
702                 break;
703             }
704
705             case DISCONNECT:
706                 clear_cached_recipient(thread);
707                 break;
708         }
709     }
710
711     char* finalMsg = osrfMessageSerializeBatch(msg_list, num_msgs);
712
713     // clean up our messages
714     for(i = 0; i < num_msgs; i++) 
715         osrfMessageFree(msg_list[i]);
716
717     return finalMsg;
718 }
719
720 /**
721  * Parse opensrf request and relay the request to the opensrf network.
722  */
723 static size_t on_message_handler_body(void *data,
724                 const WebSocketServer *server, const int type, 
725                 unsigned char *buffer, const size_t buffer_size) {
726
727     request_rec *r = server->request(server);
728
729     jsonObject *msg_wrapper = NULL; // free me
730     const jsonObject *tmp_obj = NULL;
731     const jsonObject *osrf_msg = NULL;
732     const char *service = NULL;
733     const char *thread = NULL;
734     const char *log_xid = NULL;
735     char *msg_body = NULL;
736     char *recipient = NULL;
737     int i;
738
739     if (buffer_size <= 0) return OK;
740
741     // generate a new log trace for this request. it 
742     // may be replaced by a client-provided trace below.
743     osrfLogMkXid();
744
745     osrfLogDebug(OSRF_LOG_MARK, "WS received message size=%d", buffer_size);
746
747     // buffer may not be \0-terminated, which jsonParse requires
748     char buf[buffer_size + 1];
749     memcpy(buf, buffer, buffer_size);
750     buf[buffer_size] = '\0';
751
752     msg_wrapper = jsonParse(buf);
753
754     if (msg_wrapper == NULL) {
755         osrfLogWarning(OSRF_LOG_MARK, "WS Invalid JSON: %s", buf);
756         return HTTP_BAD_REQUEST;
757     }
758
759     osrf_msg = jsonObjectGetKeyConst(msg_wrapper, "osrf_msg");
760
761     if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "service")) 
762         service = jsonObjectGetString(tmp_obj);
763
764     if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "thread")) 
765         thread = jsonObjectGetString(tmp_obj);
766
767     if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "log_xid")) 
768         log_xid = jsonObjectGetString(tmp_obj);
769
770     if (log_xid) {
771
772         // use the caller-provide log trace id
773         if (strlen(log_xid) > MAX_THREAD_SIZE) {
774             osrfLogWarning(OSRF_LOG_MARK, "WS log_xid exceeds max length");
775             return HTTP_BAD_REQUEST;
776         }
777
778         osrfLogForceXid(log_xid);
779     }
780
781     if (thread) {
782
783         if (strlen(thread) > MAX_THREAD_SIZE) {
784             osrfLogWarning(OSRF_LOG_MARK, "WS thread exceeds max length");
785             return HTTP_BAD_REQUEST;
786         }
787
788         // since clients can provide their own threads at session start time,
789         // the presence of a thread does not guarantee a cached recipient
790         recipient = (char*) apr_hash_get(
791             trans->session_cache, thread, APR_HASH_KEY_STRING);
792
793         if (recipient) {
794             osrfLogDebug(OSRF_LOG_MARK, "WS found cached recipient %s", recipient);
795         }
796     }
797
798     if (!recipient) {
799
800         if (service) {
801             int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1,
802                 "%s@%s/%s", trans->osrf_router, trans->osrf_domain, service);                                    
803             recipient_buf[size] = '\0';                                          
804             recipient = recipient_buf;
805
806         } else {
807             osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient");
808             return HTTP_BAD_REQUEST;
809         }
810     }
811
812     osrfLogDebug(OSRF_LOG_MARK, 
813         "WS relaying message to opensrf thread=%s, recipient=%s", 
814             thread, recipient);
815
816     msg_body = extract_inbound_messages(
817         r, service, thread, recipient, osrf_msg);
818
819     transport_message *tmsg = message_init(
820         msg_body, NULL, thread, recipient, NULL);
821
822     message_set_osrf_xid(tmsg, osrfLogGetXid());
823     client_send_message(osrf_handle, tmsg);
824
825
826     osrfLogClearXid();
827     message_free(tmsg);                                                         
828     jsonObjectFree(msg_wrapper);
829     free(msg_body);
830
831     last_activity_time = time(NULL);
832
833     return OK;
834 }
835
836 static size_t CALLBACK on_message_handler(void *data,
837                 const WebSocketServer *server, const int type, 
838                 unsigned char *buffer, const size_t buffer_size) {
839
840     if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
841         osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
842         return 1; // TODO: map to apr_status_t value?
843     }
844
845     apr_status_t stat = on_message_handler_body(data, server, type, buffer, buffer_size);
846
847     if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
848         osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
849         return 1;
850     }
851
852     return stat;
853 }
854
855
856 /**
857  * Clear the session cache, release the session pool
858  */
859 void CALLBACK on_disconnect_handler(
860     void *data, const WebSocketServer *server) {
861
862     osrfWebsocketTranslator *trans = (osrfWebsocketTranslator*) data;
863     trans->client_connected = 0;
864
865     // timeout thread is recreated w/ each new connection
866     apr_thread_exit(trans->idle_timeout_thread, APR_SUCCESS);
867     trans->idle_timeout_thread = NULL;
868     
869     // ensure no errant session data is sticking around
870     apr_hash_clear(trans->session_cache);
871
872     // strictly speaking, this pool will get destroyed when
873     // r->pool is destroyed, but it doesn't hurt to explicitly
874     // destroy it ourselves.
875     apr_pool_destroy(trans->session_pool);
876     trans->session_pool = NULL;
877
878     request_rec *r = server->request(server);
879
880     osrfLogInfo(OSRF_LOG_MARK, "WS disconnect from %s", get_client_ip(r)); 
881 }
882
883 /**
884  * Be nice and clean up our mess
885  */
886 void CALLBACK on_destroy_handler(WebSocketPlugin *plugin) {
887     if (trans) {
888         apr_thread_exit(trans->responder_thread, APR_SUCCESS);
889         apr_thread_mutex_destroy(trans->mutex);
890         if (trans->session_pool)
891             apr_pool_destroy(trans->session_pool);
892         apr_pool_destroy(trans->main_pool);
893     }
894
895     trans = NULL;
896 }
897
898 static WebSocketPlugin osrf_websocket_plugin = {
899     sizeof(WebSocketPlugin),
900     WEBSOCKET_PLUGIN_VERSION_0,
901     on_destroy_handler,
902     on_connect_handler,
903     on_message_handler,
904     on_disconnect_handler
905 };
906
907 extern EXPORT WebSocketPlugin * CALLBACK osrf_websocket_init() {
908     return &osrf_websocket_plugin;
909 }
910