8da5a36aaa6cc07d7d357f37a57333cd4ea5a01b
[OpenSRF.git] / src / gateway / osrf_websocket_translator.c
1 /* -----------------------------------------------------------------------
2  * Copyright 2012 Equinox Software, Inc.
3  * Bill Erickson <berick@esilibrary.com>
4  *
5  * This program is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU General Public License
7  * as published by the Free Software Foundation; either version 2
8  * of the License, or (at your option) any later version.
9  * 
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  * -----------------------------------------------------------------------
15  */
16
17 /**
18  * websocket <-> opensrf gateway.  Wrapped opensrf messages are extracted
19  * and relayed to the opensrf network.  Responses are pulled from the opensrf
20  * network and passed back to the client.  Messages are analyzed to determine
21  * when a connect/disconnect occurs, so that the cache of recipients can be
22  * properly managed.  We also activity-log REQUEST messages.
23  *
24  * Messages to/from the websocket client take the following form:
25  * {
26  *   "service"  : "opensrf.foo", // required
27  *   "thread"   : "123454321",   // AKA thread. required for follow-up requests; max 64 chars.
28  *   "log_xid"  : "123..32",     // optional log trace ID, max 64 chars;
29  *   "osrf_msg" : [<osrf_msg>, <osrf_msg>, ...]   // required
30  * }
31  *
32  * Each translator operates with three threads.  One thread receives messages
33  * from the websocket client, translates, and relays them to the opensrf 
34  * network. The second thread collects responses from the opensrf network and 
35  * relays them back to the websocket client.  The third thread inspects
36  * the idle timeout interval t see if it's time to drop the idle client.
37  *
38  * After the initial setup, all thread actions occur within a thread
39  * mutex.  The desired affect is a non-threaded application that uses
40  * threads for the sole purpose of having one thread listening for
41  * incoming data, while a second thread listens for responses, and a
42  * third checks the idle timeout.  When any thread awakens, it's the
43  * only thread in town until it goes back to sleep (i.e. listening on
44  * its socket for data).
45  *
46  * Note that with the opensrf "thread", which allows us to identify the
47  * opensrf session, the caller does not need to provide a recipient
48  * address.  The "service" is only required to start a new opensrf
49  * session.  After the sesession is started, all future communication is
50  * based solely on the thread.  However, the "service" should be passed
51  * by the caller for all requests to ensure it is properly logged in the
52  * activity log.
53  *
54  * Every inbound and outbound message updates the last_activity_time.
55  * A separate thread wakes periodically to see if the time since the
56  * last_activity_time exceeds the configured idle_timeout_interval.  If
57  * so, a disconnect is sent to the client, completing the conversation.
58  *
59  * Configuration goes directly into the Apache envvars file.  
60  * (e.g. /etc/apache2-websockets/envvars).  As of today, it's not 
61  * possible to leverage Apache configuration directives directly,
62  * since this is not an Apache module, but a shared library loaded
63  * by an apache module.  This includes SetEnv / SetEnvIf.
64  *
65  * export OSRF_WEBSOCKET_IDLE_TIMEOUT=300
66  * export OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL=5
67  * export OSRF_WEBSOCKET_CONFIG_FILE=/openils/conf/opensrf_core.xml
68  * export OSRF_WEBSOCKET_CONFIG_CTXT=gateway
69  */
70
71 #include <stdlib.h>
72 #include <signal.h>
73 #include <unistd.h>
74 #include "httpd.h"
75 #include "http_log.h"
76 #include "apr_strings.h"
77 #include "apr_thread_proc.h"
78 #include "apr_hash.h"
79 #include "websocket_plugin.h"
80 #include "opensrf/log.h"
81 #include "opensrf/osrf_json.h"
82 #include "opensrf/transport_client.h"
83 #include "opensrf/transport_message.h"
84 #include "opensrf/osrf_system.h"                                                
85 #include "opensrf/osrfConfig.h"
86
87 #define MAX_THREAD_SIZE 64
88 #define RECIP_BUF_SIZE 256
89 #define WEBSOCKET_TRANSLATOR_INGRESS "ws-translator-v1"
90
91 // maximun number of active, CONNECTed opensrf sessions allowed. in
92 // practice, this number will be very small, rarely reaching double
93 // digits.  This is just a security back-stop.  A client trying to open
94 // this many connections is almost certainly attempting to DOS the
95 // gateway / server.  We may want to lower this further.
96 #define MAX_ACTIVE_STATEFUL_SESSIONS 128
97
98 // default values, replaced during setup (below) as needed.
99 static char* config_file = "/openils/conf/opensrf_core.xml";
100 static char* config_ctxt = "gateway";
101
102 static time_t idle_timeout_interval = 120; 
103 static time_t idle_check_interval = 5;
104 static time_t last_activity_time = 0;
105
106 // Generally, we do not disconnect the client (as idle) if there is a
107 // request in flight.  However, we need to have an upper bound on the
108 // amount of time we will wait for in-flight requests to complete to
109 // avoid leaving an effectively idle connection open after a request
110 // died on the backend and no response was received.
111 // Note that if other activity occurs while a long-running request
112 // is active, the wait time will get reset with each new activity. 
113 // This is OK, though, because the goal of max_request_wait_time
114 // is not to chop requests off at the knees, it's to allow the client
115 // to timeout as idle when only a single long-running request is active
116 // and preventing timeout.
117 static time_t max_request_wait_time = 600;
118
119 // Incremented with every REQUEST, decremented with every COMPLETE.
120 // Gives us a rough picture of the number of reqests we've sent to 
121 // the server vs. the number for which a completed response has been 
122 // received.
123 static int requests_in_flight = 0;
124
125 // true if we've received a signal to start graceful shutdown
126 static int shutdown_requested = 0; 
127 static void sigusr1_handler(int sig);
128 static void sigusr1_handler(int sig) {                                       
129     shutdown_requested = 1; 
130     signal(SIGUSR1, sigusr1_handler);
131     osrfLogInfo(OSRF_LOG_MARK, "WS received SIGUSR1 - Graceful Shutdown");
132 }
133
134 static const char* get_client_ip(const request_rec* r) {
135 #ifdef APACHE_MIN_24
136     return r->connection->client_ip;
137 #else
138     return r->connection->remote_ip;
139 #endif
140 }
141
142 typedef struct _osrfWebsocketTranslator {
143
144     /** Our handle for communicating with the caller */
145     const WebSocketServer *server;
146     
147     /**
148      * Standalone, per-process APR pool.  Primarily
149      * there for managing thread data, which lasts 
150      * the duration of the process.
151      */
152     apr_pool_t *main_pool;
153
154     /**
155      * Map of thread => drone-xmpp-address.  Maintaining this
156      * map internally means the caller never need know about
157      * internal XMPP addresses and the server doesn't have to 
158      * verify caller-specified recipient addresses.  It's
159      * all managed internally.  This is only used for stateful
160      * (CONNECT'ed) session.  Stateless sessions need not 
161      * track the recipient, since they are one-off calls.
162      */
163     apr_hash_t *stateful_session_cache; 
164
165     /**
166      * stateful_session_pool contains the key/value pairs stored in
167      * the stateful_session_cache.  The pool is regularly destroyed
168      * and re-created to avoid long-term memory consumption
169      */
170     apr_pool_t *stateful_session_pool;
171
172     /**
173      * Thread responsible for collecting responses on the opensrf
174      * network and relaying them back to the caller
175      */
176     apr_thread_t *responder_thread;
177
178     /**
179      * Thread responsible for checking inactivity timeout.
180      * If no activitity occurs within the configured interval,
181      * a disconnect is sent to the client and the connection
182      * is terminated.
183      */
184     apr_thread_t *idle_timeout_thread;
185
186     /**
187      * All message handling code is wrapped in a thread mutex such
188      * that all actions (after the initial setup) are serialized
189      * to minimize the possibility of multi-threading snafus.
190      */
191     apr_thread_mutex_t *mutex;
192
193     /**
194      * True if a websocket client is currently connected
195      */
196     int client_connected;
197
198     /** OpenSRF jouter name */
199     char* osrf_router;
200
201     /** OpenSRF domain */
202     char* osrf_domain;
203
204 } osrfWebsocketTranslator;
205
206 static osrfWebsocketTranslator *trans = NULL;
207 static transport_client *osrf_handle = NULL;
208 static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
209
210 static void clear_cached_recipient(const char* thread) {
211     apr_pool_t *pool = NULL;                                                
212     request_rec *r = trans->server->request(trans->server);
213
214     if (apr_hash_get(trans->stateful_session_cache, thread, APR_HASH_KEY_STRING)) {
215
216         osrfLogDebug(OSRF_LOG_MARK, "WS removing cached recipient on disconnect");
217
218         // remove it from the hash
219         apr_hash_set(trans->stateful_session_cache, thread, APR_HASH_KEY_STRING, NULL);
220
221         if (apr_hash_count(trans->stateful_session_cache) == 0) {
222             osrfLogDebug(OSRF_LOG_MARK, "WS re-setting stateful_session_pool");
223
224             // memory accumulates in the stateful_session_pool as
225             // sessions are cached then un-cached.  Un-caching removes
226             // strings from the hash, but not from the pool.  Clear the
227             // pool here. note: apr_pool_clear does not free memory, it
228             // reclaims it for use again within the pool.  This is more
229             // effecient than freeing and allocating every time.
230             apr_pool_clear(trans->stateful_session_pool);
231         }
232     }
233 }
234
235 void* osrf_responder_thread_main_body(transport_message *tmsg) {
236
237     osrfList *msg_list = NULL;
238     osrfMessage *one_msg = NULL;
239     int i;
240
241     osrfLogDebug(OSRF_LOG_MARK, 
242         "WS received opensrf response for thread=%s", tmsg->thread);
243
244     // first we need to perform some maintenance
245     msg_list = osrfMessageDeserialize(tmsg->body, NULL);
246
247     for (i = 0; i < msg_list->size; i++) {
248         one_msg = OSRF_LIST_GET_INDEX(msg_list, i);
249
250         osrfLogDebug(OSRF_LOG_MARK, 
251             "WS returned response of type %d", one_msg->m_type);
252
253         /*  if our client just successfully connected to an opensrf service,
254             cache the sender so that future calls on this thread will use
255             the correct recipient. */
256         if (one_msg && one_msg->m_type == STATUS) {
257
258             if (one_msg->status_code == OSRF_STATUS_OK) {
259
260                 if (!apr_hash_get(trans->stateful_session_cache, 
261                         tmsg->thread, APR_HASH_KEY_STRING)) {
262
263                     apr_size_t ses_size = 
264                         apr_hash_count(trans->stateful_session_cache);
265
266                     if (ses_size < MAX_ACTIVE_STATEFUL_SESSIONS) {
267
268                         osrfLogDebug(OSRF_LOG_MARK, "WS caching sender "
269                             "thread=%s, sender=%s; concurrent=%d", 
270                             tmsg->thread, tmsg->sender, ses_size);
271
272                         apr_hash_set(trans->stateful_session_cache, 
273                             apr_pstrdup(trans->stateful_session_pool, tmsg->thread),
274                             APR_HASH_KEY_STRING, 
275                             apr_pstrdup(trans->stateful_session_pool, tmsg->sender));
276
277                     } else {
278                         osrfLogWarning(OSRF_LOG_MARK, 
279                             "WS max concurrent sessions (%d) reached.  "
280                             "Current session will not be tracked",
281                             MAX_ACTIVE_STATEFUL_SESSIONS
282                         );
283                     }
284                 }
285
286             } else {
287
288                 // connection timed out; clear the cached recipient
289                 if (one_msg->status_code == OSRF_STATUS_TIMEOUT) {
290                     clear_cached_recipient(tmsg->thread);
291
292                 } else {
293                     if (one_msg->status_code == OSRF_STATUS_COMPLETE)
294                         requests_in_flight--;
295                 }
296             }
297         }
298     }
299
300     // osrfMessageDeserialize applies the freeItem handler to the 
301     // newly created osrfList.  We only need to free the list and 
302     // the individual osrfMessage's will be freed along with it
303     osrfListFree(msg_list);
304     
305     // relay the response messages to the client
306     jsonObject *msg_wrapper = NULL;
307     char *msg_string = NULL;
308
309     // build the wrapper object
310     msg_wrapper = jsonNewObject(NULL);
311     jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
312     jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
313     jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
314
315     if (tmsg->is_error) {
316         osrfLogError(OSRF_LOG_MARK, 
317             "WS received jabber error message in response to thread=%s", 
318             tmsg->thread);
319         jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
320     }
321
322     msg_string = jsonObjectToJSONRaw(msg_wrapper);
323
324     // drop the JSON on the outbound wire
325     trans->server->send(trans->server, MESSAGE_TYPE_TEXT, 
326         (unsigned char*) msg_string, strlen(msg_string));
327
328     free(msg_string);
329     jsonObjectFree(msg_wrapper);
330 }
331
332 /**
333  * Responder thread main body.
334  * Collects responses from the opensrf network and relays them to the 
335  * websocket caller.
336  */
337 void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
338
339     transport_message *tmsg;
340     while (1) {
341
342         if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
343             osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
344             return NULL;
345         }
346
347         // wait indefinitely for a response
348         tmsg = client_recv(osrf_handle, -1);
349
350         if (!tmsg) {
351             // tmsg can only be NULL if the underlying select() call is
352             // interrupted or the jabber socket connection was severed.
353
354             if (client_connected(osrf_handle) &&
355                 socket_connected(osrf_handle->session->sock_id)) {
356                 continue; // interrupted.  restart loop.
357             }
358
359             // Socket connection was broken.  Send disconnect to client,
360             // causing on_disconnect_handler to run and cleanup.
361             osrfLogWarning(OSRF_LOG_MARK, 
362                 "WS: Jabber socket disconnected. Sending close() to client");
363
364             trans->server->close(trans->server);
365             return NULL; // exit thread
366         }
367
368         if (trans->client_connected) {
369
370             if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
371                 osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
372                 return NULL;
373             }
374
375             osrfLogForceXid(tmsg->osrf_xid);
376             osrf_responder_thread_main_body(tmsg);
377             last_activity_time = time(NULL);
378         }
379
380         message_free(tmsg);                                                         
381     }
382
383     return NULL;
384 }
385
386 static int active_connection_count() {
387
388     if (requests_in_flight) {
389
390         time_t now = time(NULL);
391         time_t difference = now - last_activity_time;
392
393         if (difference >= max_request_wait_time) {
394             osrfLogWarning(OSRF_LOG_MARK, 
395                 "%d In-flight request(s) took longer than %d seconds "
396                 "to complete.  Treating request as dead and moving on.",
397                 requests_in_flight, 
398                 max_request_wait_time
399             );
400             requests_in_flight = 0;
401         }
402     }
403
404     return requests_in_flight;
405 }
406
407 /**
408  * Sleep and regularly wake to see if the process has been idle for too
409  * long.  If so, send a disconnect to the client.
410  */
411 void* APR_THREAD_FUNC osrf_idle_timeout_thread_main(
412         apr_thread_t *thread, void *data) {
413
414     // sleep time defaults to the check interval, but may 
415     // be shortened during shutdown.
416     int sleep_time = idle_check_interval;
417     int shutdown_loops = 0;
418
419     while (1) {
420
421         if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
422             osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
423             return NULL;
424         }
425
426         // note: receiving a signal (e.g. SIGUSR1) will not interrupt
427         // this sleep(), since it's running within its own thread.
428         // During graceful shtudown, we may wait up to 
429         // idle_check_interval seconds before initiating shutdown.
430         sleep(sleep_time);
431
432         if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
433             osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
434             return NULL;
435         }
436
437         // no client is connected.  reset sleep time go back to sleep.
438         if (!trans->client_connected) {
439             sleep_time = idle_check_interval;
440             continue;
441         }
442
443         // do we have any active stateful conversations with the client?
444         int active_count = active_connection_count();
445
446         if (active_count) {
447
448             if (shutdown_requested) {
449                 // active conversations means we can't shut down.  
450                 // shorten the check interval to re-check more often.
451                 shutdown_loops++;
452                 osrfLogDebug(OSRF_LOG_MARK, 
453                     "WS: %d active conversation(s) found in shutdown after "
454                     "%d attempts.  Sleeping...", shutdown_loops, active_count
455                 );
456
457                 if (shutdown_loops > 30) {
458                     // this is clearly a long-running conversation, let's
459                     // check less frequently to avoid excessive logging.
460                     sleep_time = 3;
461                 } else {
462                     sleep_time = 1;
463                 }
464             } 
465
466             // active conversations means keep going.  There's no point in
467             // checking the idle time (below) if we're mid-conversation
468             continue;
469         }
470
471         // no active conversations
472              
473         if (shutdown_requested) {
474             // there's no need to reset the shutdown vars (loops/requested)
475             // SIGUSR1 is Apaches reload signal, which means this process
476             // will be going away as soon as the client is disconnected.
477
478             osrfLogInfo(OSRF_LOG_MARK,
479                 "WS: no active conversations remain in shutdown; "
480                     "closing client connection");
481
482         } else { 
483             // see how long we've been idle.  If too long, kick the client
484
485             time_t now = time(NULL);
486             time_t difference = now - last_activity_time;
487
488             osrfLogDebug(OSRF_LOG_MARK, 
489                 "WS connection idle for %d seconds", difference);
490
491             if (difference < idle_timeout_interval) {
492                 // Last activity occurred within the idle timeout interval.
493                 continue;
494             }
495
496             // idle timeout exceeded
497             osrfLogDebug(OSRF_LOG_MARK, 
498                 "WS: idle timeout exceeded.  now=%d / last=%d; " 
499                 "closing client connection", now, last_activity_time);
500         }
501
502
503         // send a disconnect to the client, which will come back around
504         // to cause our on_disconnect_handler to run.
505         osrfLogDebug(OSRF_LOG_MARK, "WS: sending close() to client");
506         trans->server->close(trans->server);
507
508         // client will be going away, reset sleep time
509         sleep_time = idle_check_interval;
510     }
511
512     // should never get here
513     return NULL;
514 }
515
516 static int build_startup_data(const WebSocketServer *server) {
517
518     apr_pool_t *main_pool = NULL;                                                
519     apr_pool_t *stateful_session_pool = NULL;                                                
520     apr_thread_t *thread = NULL;
521     apr_threadattr_t *thread_attr = NULL;
522     apr_thread_mutex_t *mutex = NULL;
523     request_rec *r = server->request(server);
524
525     // create a pool for our translator data
526     // Do not use r->pool as the parent, since r->pool will be freed
527     // when the current client disconnects.
528     if (apr_pool_create(&main_pool, NULL) != APR_SUCCESS) {
529         osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
530         return 1;
531     }
532
533     trans = (osrfWebsocketTranslator*) 
534         apr_palloc(main_pool, sizeof(osrfWebsocketTranslator));
535
536     if (trans == NULL) {
537         osrfLogError(OSRF_LOG_MARK, "WS Unable to create translator");
538         return 1;
539     }
540
541     trans->server = server;
542     trans->main_pool = main_pool;
543     trans->osrf_router = osrfConfigGetValue(NULL, "/router_name");                      
544     trans->osrf_domain = osrfConfigGetValue(NULL, "/domain");
545
546     // opensrf session / recipient cache
547     trans->stateful_session_cache = apr_hash_make(trans->main_pool);
548     if (trans->stateful_session_cache == NULL) {
549         osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache");
550         return 1;
551     }
552
553     // opensrf session / recipient string pool; cleared regularly
554     // the only data entering this pools are the session strings.
555     if (apr_pool_create(&stateful_session_pool, trans->main_pool) != APR_SUCCESS) {
556         osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
557         return NULL;
558     }
559     trans->stateful_session_pool = stateful_session_pool;
560
561     if (apr_thread_mutex_create(
562             &mutex, APR_THREAD_MUTEX_UNNESTED, 
563             trans->main_pool) != APR_SUCCESS) {
564         osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex");
565         return 1;
566     }
567     trans->mutex = mutex;
568
569     // responder thread
570     if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
571          (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
572          (apr_thread_create(&thread, thread_attr, 
573                 osrf_responder_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
574
575         trans->responder_thread = thread;
576         
577     } else {
578         osrfLogError(OSRF_LOG_MARK, "WS unable to create responder thread");
579         return 1;
580     }
581
582     // idle timeout thread
583     thread = NULL; // reset
584     thread_attr = NULL; // reset
585     if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
586          (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
587          (apr_thread_create(&thread, thread_attr, 
588             osrf_idle_timeout_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
589
590         osrfLogDebug(OSRF_LOG_MARK, "WS created idle timeout thread");
591         trans->idle_timeout_thread = thread;
592         
593     } else {
594         osrfLogError(OSRF_LOG_MARK, "WS unable to create idle timeout thread");
595         return 1;
596     }
597
598     return APR_SUCCESS;
599 }
600
601
602 /**
603  * Connect to OpenSRF, create the main pool, responder thread
604  * session cache and session pool.
605  */
606 int child_init(const WebSocketServer *server) {
607     request_rec *r = server->request(server);
608
609     // osrf_handle will already be connected if this is not the first request
610     // served by this process.
611     if ( !(osrf_handle = osrfSystemGetTransportClient()) ) {
612         
613         // load config values from the env
614         char* timeout = getenv("OSRF_WEBSOCKET_IDLE_TIMEOUT");
615         if (timeout) {
616             if (!atoi(timeout)) {
617                 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
618                     "WS: invalid OSRF_WEBSOCKET_IDLE_TIMEOUT: %s", timeout);
619             } else {
620                 idle_timeout_interval = (time_t) atoi(timeout);
621             }
622         }
623
624         ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
625             "WS: timeout set to %ld", idle_timeout_interval);
626
627         timeout = getenv("OSRF_WEBSOCKET_MAX_REQUEST_WAIT_TIME");
628         if (timeout) {
629             if (!atoi(timeout)) {
630                 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
631                     "WS: invalid OSRF_WEBSOCKET_MAX_REQUEST_WAIT_TIME: %s", 
632                     timeout
633                 );
634             } else {
635                 max_request_wait_time = (time_t) atoi(timeout);
636             }
637         }
638
639         ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
640             "WS: max request wait time set to %ld", max_request_wait_time);
641
642         char* interval = getenv("OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL");
643         if (interval) {
644             if (!atoi(interval)) {
645                 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
646                     "WS: invalid OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL: %s", 
647                     interval
648                 );
649             } else {
650                 idle_check_interval = (time_t) atoi(interval);
651             }
652         } 
653
654         ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
655             "WS: idle check interval set to %ld", idle_check_interval);
656
657       
658         char* cfile = getenv("OSRF_WEBSOCKET_CONFIG_FILE");
659         if (cfile) {
660             ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
661                 "WS: config file set to %s", cfile);
662             config_file = cfile;
663         }
664
665         char* ctxt = getenv("OSRF_WEBSOCKET_CONFIG_CTXT");
666         if (ctxt) {
667             ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
668                 "WS: config context set to %s", ctxt);
669             config_ctxt = ctxt;
670         }
671
672         // connect to opensrf
673         if (!osrfSystemBootstrapClientResc(
674                 config_file, config_ctxt, "websocket")) {   
675
676             osrfLogError(OSRF_LOG_MARK, 
677                 "WS unable to bootstrap OpenSRF client with config %s "
678                 "and context %s", config_file, config_ctxt
679             ); 
680             return 1;
681         }
682
683         osrfLogSetAppname("osrf_websocket_translator");
684         osrf_handle = osrfSystemGetTransportClient();
685     }
686
687     signal(SIGUSR1, sigusr1_handler);
688     return APR_SUCCESS;
689 }
690
691 /**
692  * Create the per-client translator
693  */
694 void* CALLBACK on_connect_handler(const WebSocketServer *server) {
695     request_rec *r = server->request(server);
696
697     if (!trans) { // first connection
698
699         // connect to opensrf
700         if (child_init(server) != APR_SUCCESS)
701             return NULL;
702
703         // build pools, thread data, and the translator
704         if (build_startup_data(server) != APR_SUCCESS)
705             return NULL;
706     }
707
708     const char* client_ip = get_client_ip(r);
709     osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip);
710
711     last_activity_time = time(NULL);
712     trans->client_connected = 1;
713     return trans;
714 }
715
716
717 /** 
718  * for each inbound opensrf message:
719  * 1. Stamp the ingress
720  * 2. REQUEST: log it as activity
721  * 3. DISCONNECT: remove the cached recipient
722  * then re-string-ify for xmpp delivery
723  */
724
725 static char* extract_inbound_messages(
726         const request_rec *r, 
727         const char* service, 
728         const char* thread, 
729         const char* recipient, 
730         const jsonObject *osrf_msg) {
731
732     int i;
733     int num_msgs = osrf_msg->size;
734     osrfMessage* msg;
735     osrfMessage* msg_list[num_msgs];
736
737     // here we do an extra json round-trip to get the data
738     // in a form osrf_message_deserialize can understand
739     // TODO: consider a version of osrf_message_init which can 
740     // accept a jsonObject* instead of a JSON string.
741     char *osrf_msg_json = jsonObjectToJSON(osrf_msg);
742     osrf_message_deserialize(osrf_msg_json, msg_list, num_msgs);
743     free(osrf_msg_json);
744
745     // should we require the caller to always pass the service?
746     if (service == NULL) service = "";
747
748     for(i = 0; i < num_msgs; i++) {
749         msg = msg_list[i];
750         osrfMessageSetIngress(msg, WEBSOCKET_TRANSLATOR_INGRESS);
751
752         switch(msg->m_type) {
753
754             case REQUEST: {
755                 const jsonObject* params = msg->_params;
756                 growing_buffer* act = buffer_init(128);
757                 char* method = msg->method_name;
758                 buffer_fadd(act, "[%s] [%s] %s %s", 
759                     get_client_ip(r), "", service, method);
760
761                 const jsonObject* obj = NULL;
762                 int i = 0;
763                 const char* str;
764                 int redactParams = 0;
765                 while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
766                     if(!strncmp(method, str, strlen(str))) {
767                         redactParams = 1;
768                         break;
769                     }
770                 }
771                 if(redactParams) {
772                     OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
773                 } else {
774                     i = 0;
775                     while((obj = jsonObjectGetIndex(params, i++))) {
776                         char* str = jsonObjectToJSON(obj);
777                         if( i == 1 )
778                             OSRF_BUFFER_ADD(act, " ");
779                         else
780                             OSRF_BUFFER_ADD(act, ", ");
781                         OSRF_BUFFER_ADD(act, str);
782                         free(str);
783                     }
784                 }
785                 osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
786                 buffer_free(act);
787                 requests_in_flight++;
788                 break;
789             }
790
791             case DISCONNECT:
792                 clear_cached_recipient(thread);
793                 break;
794         }
795     }
796
797     char* finalMsg = osrfMessageSerializeBatch(msg_list, num_msgs);
798
799     // clean up our messages
800     for(i = 0; i < num_msgs; i++) 
801         osrfMessageFree(msg_list[i]);
802
803     return finalMsg;
804 }
805
806 /**
807  * Parse opensrf request and relay the request to the opensrf network.
808  */
809 static size_t on_message_handler_body(void *data,
810                 const WebSocketServer *server, const int type, 
811                 unsigned char *buffer, const size_t buffer_size) {
812
813     request_rec *r = server->request(server);
814
815     jsonObject *msg_wrapper = NULL; // free me
816     const jsonObject *tmp_obj = NULL;
817     const jsonObject *osrf_msg = NULL;
818     const char *service = NULL;
819     const char *thread = NULL;
820     const char *log_xid = NULL;
821     char *msg_body = NULL;
822     char *recipient = NULL;
823     int i;
824
825     if (buffer_size <= 0) return OK;
826
827     // generate a new log trace for this request. it 
828     // may be replaced by a client-provided trace below.
829     osrfLogMkXid();
830
831     osrfLogDebug(OSRF_LOG_MARK, "WS received message size=%d", buffer_size);
832
833     // buffer may not be \0-terminated, which jsonParse requires
834     char buf[buffer_size + 1];
835     memcpy(buf, buffer, buffer_size);
836     buf[buffer_size] = '\0';
837
838     osrfLogInternal(OSRF_LOG_MARK, "WS received inbound message: %s", buf);
839
840     msg_wrapper = jsonParse(buf);
841
842     if (msg_wrapper == NULL) {
843         osrfLogWarning(OSRF_LOG_MARK, "WS Invalid JSON: %s", buf);
844         return HTTP_BAD_REQUEST;
845     }
846
847     osrf_msg = jsonObjectGetKeyConst(msg_wrapper, "osrf_msg");
848
849     if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "service")) 
850         service = jsonObjectGetString(tmp_obj);
851
852     if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "thread")) 
853         thread = jsonObjectGetString(tmp_obj);
854
855     if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "log_xid")) 
856         log_xid = jsonObjectGetString(tmp_obj);
857
858     if (log_xid) {
859
860         // use the caller-provide log trace id
861         if (strlen(log_xid) > MAX_THREAD_SIZE) {
862             osrfLogWarning(OSRF_LOG_MARK, "WS log_xid exceeds max length");
863             return HTTP_BAD_REQUEST;
864         }
865
866         osrfLogForceXid(log_xid);
867     }
868
869     if (thread) {
870
871         if (strlen(thread) > MAX_THREAD_SIZE) {
872             osrfLogWarning(OSRF_LOG_MARK, "WS thread exceeds max length");
873             return HTTP_BAD_REQUEST;
874         }
875
876         // since clients can provide their own threads at session start time,
877         // the presence of a thread does not guarantee a cached recipient
878         recipient = (char*) apr_hash_get(
879             trans->stateful_session_cache, thread, APR_HASH_KEY_STRING);
880
881         if (recipient) {
882             osrfLogDebug(OSRF_LOG_MARK, "WS found cached recipient %s", recipient);
883         }
884     }
885
886     if (!recipient) {
887
888         if (service) {
889             int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1,
890                 "%s@%s/%s", trans->osrf_router, trans->osrf_domain, service);                                    
891             recipient_buf[size] = '\0';                                          
892             recipient = recipient_buf;
893
894         } else {
895             osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient");
896             return HTTP_BAD_REQUEST;
897         }
898     }
899
900     osrfLogDebug(OSRF_LOG_MARK, 
901         "WS relaying message to opensrf thread=%s, recipient=%s", 
902             thread, recipient);
903
904     msg_body = extract_inbound_messages(
905         r, service, thread, recipient, osrf_msg);
906
907     osrfLogInternal(OSRF_LOG_MARK, 
908         "WS relaying inbound message: %s", msg_body);
909
910     transport_message *tmsg = message_init(
911         msg_body, NULL, thread, recipient, NULL);
912
913     message_set_osrf_xid(tmsg, osrfLogGetXid());
914
915     size_t stat = OK;
916     if (client_send_message(osrf_handle, tmsg) != 0) {
917         osrfLogError(OSRF_LOG_MARK, "WS failed sending data to OpenSRF");
918         stat = HTTP_INTERNAL_SERVER_ERROR;
919     }
920
921     osrfLogClearXid();
922     message_free(tmsg);                                                         
923     jsonObjectFree(msg_wrapper);
924     free(msg_body);
925
926     last_activity_time = time(NULL);
927     return stat;
928 }
929
930 static size_t CALLBACK on_message_handler(void *data,
931                 const WebSocketServer *server, const int type, 
932                 unsigned char *buffer, const size_t buffer_size) {
933
934     if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
935         osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
936         return 1;
937     }
938
939     size_t stat = on_message_handler_body(data, server, type, buffer, buffer_size);
940
941     if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
942         osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
943         return 1;
944     }
945
946     if (stat != OK) {
947         // Returning a non-OK status alone won't force a disconnect.
948         // Once disconnected, the on_disconnect_handler() handler
949         // will run, clean it all up, and kill the process.
950         osrfLogError(OSRF_LOG_MARK,
951             "Error relaying message, forcing client disconnect");
952         trans->server->close(trans->server);
953     }
954
955     return stat;
956 }
957
958
959 /**
960  * Clear the session cache, release the session pool
961  */
962 void CALLBACK on_disconnect_handler(
963     void *data, const WebSocketServer *server) {
964
965     // if the threads wake up during disconnect, this tells 
966     // them to go back to sleep.
967     trans->client_connected = 0;
968
969     request_rec *r = server->request(server);
970     osrfLogInfo(OSRF_LOG_MARK, "WS disconnect from %s", get_client_ip(r)); 
971
972     // Clear any lingering session data
973     // NOTE: we could apr_pool_destroy the stateful_session_pool to truly free
974     // the memory, but since there is a limit to the size of the pool
975     // (max_concurrent_sessions), the memory cannot grow unbounded, 
976     // so there's no need.
977     apr_hash_clear(trans->stateful_session_cache);
978     apr_pool_clear(trans->stateful_session_pool);
979 }
980
981 static WebSocketPlugin osrf_websocket_plugin = {
982     sizeof(WebSocketPlugin),
983     WEBSOCKET_PLUGIN_VERSION_0,
984     NULL, // on_destroy_handler
985     on_connect_handler,
986     on_message_handler,
987     on_disconnect_handler
988 };
989
990 extern EXPORT WebSocketPlugin * CALLBACK osrf_websocket_init() {
991     return &osrf_websocket_plugin;
992 }
993