]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/gateway/osrf_websocket_translator.c
ef8d4af30264b02610a7173d1a6656fad2b92eca
[OpenSRF.git] / src / gateway / osrf_websocket_translator.c
1 /* -----------------------------------------------------------------------
2  * Copyright 2012 Equinox Software, Inc.
3  * Bill Erickson <berick@esilibrary.com>
4  *
5  * This program is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU General Public License
7  * as published by the Free Software Foundation; either version 2
8  * of the License, or (at your option) any later version.
9  * 
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  * -----------------------------------------------------------------------
15  */
16
17 /**
18  * websocket <-> opensrf gateway.  Wrapped opensrf messages are extracted
19  * and relayed to the opensrf network.  Responses are pulled from the opensrf
20  * network and passed back to the client.  Messages are analyzed to determine
21  * when a connect/disconnect occurs, so that the cache of recipients can be
22  * properly managed.  We also activity-log REQUEST messages.
23  *
24  * Messages to/from the websocket client take the following form:
25  * {
26  *   "service"  : "opensrf.foo", // required
27  *   "thread"   : "123454321",   // AKA thread. required for follow-up requests; max 64 chars.
28  *   "log_xid"  : "123..32",     // optional log trace ID, max 64 chars;
29  *   "osrf_msg" : [<osrf_msg>, <osrf_msg>, ...]   // required
30  * }
31  *
32  * Each translator operates with three threads.  One thread receives messages
33  * from the websocket client, translates, and relays them to the opensrf 
34  * network. The second thread collects responses from the opensrf network and 
35  * relays them back to the websocket client.  The third thread inspects
36  * the idle timeout interval t see if it's time to drop the idle client.
37  *
38  * After the initial setup, all thread actions occur within a thread
39  * mutex.  The desired affect is a non-threaded application that uses
40  * threads for the sole purpose of having one thread listening for
41  * incoming data, while a second thread listens for responses, and a
42  * third checks the idle timeout.  When any thread awakens, it's the
43  * only thread in town until it goes back to sleep (i.e. listening on
44  * its socket for data).
45  *
46  * Note that with the opensrf "thread", which allows us to identify the
47  * opensrf session, the caller does not need to provide a recipient
48  * address.  The "service" is only required to start a new opensrf
49  * session.  After the sesession is started, all future communication is
50  * based solely on the thread.  However, the "service" should be passed
51  * by the caller for all requests to ensure it is properly logged in the
52  * activity log.
53  *
54  * Every inbound and outbound message updates the last_activity_time.
55  * A separate thread wakes periodically to see if the time since the
56  * last_activity_time exceeds the configured idle_timeout_interval.  If
57  * so, a disconnect is sent to the client, completing the conversation.
58  *
59  * Configuration goes directly into the Apache envvars file.  
60  * (e.g. /etc/apache2-websockets/envvars).  As of today, it's not 
61  * possible to leverage Apache configuration directives directly,
62  * since this is not an Apache module, but a shared library loaded
63  * by an apache module.  This includes SetEnv / SetEnvIf.
64  *
65  * export OSRF_WEBSOCKET_IDLE_TIMEOUT=300
66  * export OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL=5
67  * export OSRF_WEBSOCKET_CONFIG_FILE=/openils/conf/opensrf_core.xml
68  * export OSRF_WEBSOCKET_CONFIG_CTXT=gateway
69  */
70
71 #include <stdlib.h>
72 #include <signal.h>
73 #include <unistd.h>
74 #include "httpd.h"
75 #include "http_log.h"
76 #include "apr_strings.h"
77 #include "apr_thread_proc.h"
78 #include "apr_hash.h"
79 #include "websocket_plugin.h"
80 #include "opensrf/log.h"
81 #include "opensrf/osrf_json.h"
82 #include "opensrf/transport_client.h"
83 #include "opensrf/transport_message.h"
84 #include "opensrf/osrf_system.h"                                                
85 #include "opensrf/osrfConfig.h"
86
87 #define MAX_THREAD_SIZE 64
88 #define RECIP_BUF_SIZE 256
89 #define WEBSOCKET_TRANSLATOR_INGRESS "ws-translator-v1"
90
91 // maximun number of active, CONNECTed opensrf sessions allowed. in
92 // practice, this number will be very small, rarely reaching double
93 // digits.  This is just a security back-stop.  A client trying to open
94 // this many connections is almost certainly attempting to DOS the
95 // gateway / server.  We may want to lower this further.
96 #define MAX_ACTIVE_STATEFUL_SESSIONS 128
97
98 // default values, replaced during setup (below) as needed.
99 static char* config_file = "/openils/conf/opensrf_core.xml";
100 static char* config_ctxt = "gateway";
101
102 static time_t idle_timeout_interval = 120; 
103 static time_t idle_check_interval = 5;
104 static time_t last_activity_time = 0;
105
106 // Generally, we do not disconnect the client (as idle) if there is a
107 // request in flight.  However, we need to have an upper bound on the
108 // amount of time we will wait for in-flight requests to complete to
109 // avoid leaving an effectively idle connection open after a request
110 // died on the backend and no response was received.
111 // Note that if other activity occurs while a long-running request
112 // is active, the wait time will get reset with each new activity. 
113 // This is OK, though, because the goal of max_request_wait_time
114 // is not to chop requests off at the knees, it's to allow the client
115 // to timeout as idle when only a single long-running request is active
116 // and preventing timeout.
117 static time_t max_request_wait_time = 600;
118
119 // Incremented with every REQUEST, decremented with every COMPLETE.
120 // Gives us a rough picture of the number of reqests we've sent to 
121 // the server vs. the number for which a completed response has been 
122 // received.
123 static int requests_in_flight = 0;
124
125 // true if we've received a signal to start graceful shutdown
126 static int shutdown_requested = 0; 
127 static void sigusr1_handler(int sig);
128 static void sigusr1_handler(int sig) {                                       
129     shutdown_requested = 1; 
130     signal(SIGUSR1, sigusr1_handler);
131     osrfLogInfo(OSRF_LOG_MARK, "WS received SIGUSR1 - Graceful Shutdown");
132 }
133
134 static const char* get_client_ip(const request_rec* r) {
135 #ifdef APACHE_MIN_24
136     return r->connection->client_ip;
137 #else
138     return r->connection->remote_ip;
139 #endif
140 }
141
142 typedef struct _osrfWebsocketTranslator {
143
144     /** Our handle for communicating with the caller */
145     const WebSocketServer *server;
146     
147     /**
148      * Standalone, per-process APR pool.  Primarily
149      * there for managing thread data, which lasts 
150      * the duration of the process.
151      */
152     apr_pool_t *main_pool;
153
154     /**
155      * Map of thread => drone-xmpp-address.  Maintaining this
156      * map internally means the caller never need know about
157      * internal XMPP addresses and the server doesn't have to 
158      * verify caller-specified recipient addresses.  It's
159      * all managed internally.  This is only used for stateful
160      * (CONNECT'ed) session.  Stateless sessions need not 
161      * track the recipient, since they are one-off calls.
162      */
163     apr_hash_t *stateful_session_cache; 
164
165     /**
166      * stateful_session_pool contains the key/value pairs stored in
167      * the stateful_session_cache.  The pool is regularly destroyed
168      * and re-created to avoid long-term memory consumption
169      */
170     apr_pool_t *stateful_session_pool;
171
172     /**
173      * Thread responsible for collecting responses on the opensrf
174      * network and relaying them back to the caller
175      */
176     apr_thread_t *responder_thread;
177
178     /**
179      * Thread responsible for checking inactivity timeout.
180      * If no activitity occurs within the configured interval,
181      * a disconnect is sent to the client and the connection
182      * is terminated.
183      */
184     apr_thread_t *idle_timeout_thread;
185
186     /**
187      * All message handling code is wrapped in a thread mutex such
188      * that all actions (after the initial setup) are serialized
189      * to minimize the possibility of multi-threading snafus.
190      */
191     apr_thread_mutex_t *mutex;
192
193     /**
194      * True if a websocket client is currently connected
195      */
196     int client_connected;
197
198     /** OpenSRF jouter name */
199     char* osrf_router;
200
201     /** OpenSRF domain */
202     char* osrf_domain;
203
204 } osrfWebsocketTranslator;
205
206 static osrfWebsocketTranslator *trans = NULL;
207 static transport_client *osrf_handle = NULL;
208 static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
209
210 static void clear_cached_recipient(const char* thread) {
211     apr_pool_t *pool = NULL;                                                
212     request_rec *r = trans->server->request(trans->server);
213
214     if (apr_hash_get(trans->stateful_session_cache, thread, APR_HASH_KEY_STRING)) {
215
216         osrfLogDebug(OSRF_LOG_MARK, "WS removing cached recipient on disconnect");
217
218         // remove it from the hash
219         apr_hash_set(trans->stateful_session_cache, thread, APR_HASH_KEY_STRING, NULL);
220
221         if (apr_hash_count(trans->stateful_session_cache) == 0) {
222             osrfLogDebug(OSRF_LOG_MARK, "WS re-setting stateful_session_pool");
223
224             // memory accumulates in the stateful_session_pool as
225             // sessions are cached then un-cached.  Un-caching removes
226             // strings from the hash, but not from the pool.  Clear the
227             // pool here. note: apr_pool_clear does not free memory, it
228             // reclaims it for use again within the pool.  This is more
229             // effecient than freeing and allocating every time.
230             apr_pool_clear(trans->stateful_session_pool);
231         }
232     }
233 }
234
235 void* osrf_responder_thread_main_body(transport_message *tmsg) {
236
237     osrfList *msg_list = NULL;
238     osrfMessage *one_msg = NULL;
239     int i;
240
241     osrfLogDebug(OSRF_LOG_MARK, 
242         "WS received opensrf response for thread=%s", tmsg->thread);
243
244     // first we need to perform some maintenance
245     msg_list = osrfMessageDeserialize(tmsg->body, NULL);
246
247     for (i = 0; i < msg_list->size; i++) {
248         one_msg = OSRF_LIST_GET_INDEX(msg_list, i);
249
250         osrfLogDebug(OSRF_LOG_MARK, 
251             "WS returned response of type %d", one_msg->m_type);
252
253         /*  if our client just successfully connected to an opensrf service,
254             cache the sender so that future calls on this thread will use
255             the correct recipient. */
256         if (one_msg && one_msg->m_type == STATUS) {
257
258             if (one_msg->status_code == OSRF_STATUS_OK) {
259
260                 if (!apr_hash_get(trans->stateful_session_cache, 
261                         tmsg->thread, APR_HASH_KEY_STRING)) {
262
263                     apr_size_t ses_size = 
264                         apr_hash_count(trans->stateful_session_cache);
265
266                     if (ses_size < MAX_ACTIVE_STATEFUL_SESSIONS) {
267
268                         osrfLogDebug(OSRF_LOG_MARK, "WS caching sender "
269                             "thread=%s, sender=%s; concurrent=%d", 
270                             tmsg->thread, tmsg->sender, ses_size);
271
272                         apr_hash_set(trans->stateful_session_cache, 
273                             apr_pstrdup(trans->stateful_session_pool, tmsg->thread),
274                             APR_HASH_KEY_STRING, 
275                             apr_pstrdup(trans->stateful_session_pool, tmsg->sender));
276
277                     } else {
278                         osrfLogWarning(OSRF_LOG_MARK, 
279                             "WS max concurrent sessions (%d) reached.  "
280                             "Current session will not be tracked",
281                             MAX_ACTIVE_STATEFUL_SESSIONS
282                         );
283                     }
284                 }
285
286             } else {
287
288                 // connection timed out; clear the cached recipient
289                 if (one_msg->status_code == OSRF_STATUS_TIMEOUT) {
290                     clear_cached_recipient(tmsg->thread);
291
292                 } else {
293                     if (one_msg->status_code == OSRF_STATUS_COMPLETE)
294                         requests_in_flight--;
295                 }
296             }
297         }
298     }
299
300     // osrfMessageDeserialize applies the freeItem handler to the 
301     // newly created osrfList.  We only need to free the list and 
302     // the individual osrfMessage's will be freed along with it
303     osrfListFree(msg_list);
304     
305     // relay the response messages to the client
306     jsonObject *msg_wrapper = NULL;
307     char *msg_string = NULL;
308
309     // build the wrapper object
310     msg_wrapper = jsonNewObject(NULL);
311     jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
312     jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
313     jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
314
315     if (tmsg->is_error) {
316         osrfLogError(OSRF_LOG_MARK, 
317             "WS received jabber error message in response to thread=%s", 
318             tmsg->thread);
319         jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
320     }
321
322     msg_string = jsonObjectToJSONRaw(msg_wrapper);
323
324     // drop the JSON on the outbound wire
325     trans->server->send(trans->server, MESSAGE_TYPE_TEXT, 
326         (unsigned char*) msg_string, strlen(msg_string));
327
328     free(msg_string);
329     jsonObjectFree(msg_wrapper);
330 }
331
332 /**
333  * Responder thread main body.
334  * Collects responses from the opensrf network and relays them to the 
335  * websocket caller.
336  */
337 void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
338
339     transport_message *tmsg;
340     while (1) {
341
342         if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
343             osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
344             return NULL;
345         }
346
347         // wait for a response
348         tmsg = client_recv(osrf_handle, -1);
349
350         if (!tmsg) continue; // interrupt
351
352         if (trans->client_connected) {
353
354             if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
355                 osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
356                 return NULL;
357             }
358
359             osrfLogForceXid(tmsg->osrf_xid);
360             osrf_responder_thread_main_body(tmsg);
361             last_activity_time = time(NULL);
362         }
363
364         message_free(tmsg);                                                         
365     }
366
367     return NULL;
368 }
369
370 static int active_connection_count() {
371
372     if (requests_in_flight) {
373
374         time_t now = time(NULL);
375         time_t difference = now - last_activity_time;
376
377         if (difference >= max_request_wait_time) {
378             osrfLogWarning(OSRF_LOG_MARK, 
379                 "%d In-flight request(s) took longer than %d seconds "
380                 "to complete.  Treating request as dead and moving on.",
381                 requests_in_flight, 
382                 max_request_wait_time
383             );
384             requests_in_flight = 0;
385         }
386     }
387
388     return requests_in_flight;
389 }
390
391 /**
392  * Sleep and regularly wake to see if the process has been idle for too
393  * long.  If so, send a disconnect to the client.
394  */
395 void* APR_THREAD_FUNC osrf_idle_timeout_thread_main(
396         apr_thread_t *thread, void *data) {
397
398     // sleep time defaults to the check interval, but may 
399     // be shortened during shutdown.
400     int sleep_time = idle_check_interval;
401     int shutdown_loops = 0;
402
403     while (1) {
404
405         if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
406             osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
407             return NULL;
408         }
409
410         // note: receiving a signal (e.g. SIGUSR1) will not interrupt
411         // this sleep(), since it's running within its own thread.
412         // During graceful shtudown, we may wait up to 
413         // idle_check_interval seconds before initiating shutdown.
414         sleep(sleep_time);
415
416         if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
417             osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
418             return NULL;
419         }
420
421         // no client is connected.  reset sleep time go back to sleep.
422         if (!trans->client_connected) {
423             sleep_time = idle_check_interval;
424             continue;
425         }
426
427         // do we have any active stateful conversations with the client?
428         int active_count = active_connection_count();
429
430         if (active_count) {
431
432             if (shutdown_requested) {
433                 // active conversations means we can't shut down.  
434                 // shorten the check interval to re-check more often.
435                 shutdown_loops++;
436                 osrfLogDebug(OSRF_LOG_MARK, 
437                     "WS: %d active conversation(s) found in shutdown after "
438                     "%d attempts.  Sleeping...", shutdown_loops, active_count
439                 );
440
441                 if (shutdown_loops > 30) {
442                     // this is clearly a long-running conversation, let's
443                     // check less frequently to avoid excessive logging.
444                     sleep_time = 3;
445                 } else {
446                     sleep_time = 1;
447                 }
448             } 
449
450             // active conversations means keep going.  There's no point in
451             // checking the idle time (below) if we're mid-conversation
452             continue;
453         }
454
455         // no active conversations
456              
457         if (shutdown_requested) {
458             // there's no need to reset the shutdown vars (loops/requested)
459             // SIGUSR1 is Apaches reload signal, which means this process
460             // will be going away as soon as the client is disconnected.
461
462             osrfLogInfo(OSRF_LOG_MARK,
463                 "WS: no active conversations remain in shutdown; "
464                     "closing client connection");
465
466         } else { 
467             // see how long we've been idle.  If too long, kick the client
468
469             time_t now = time(NULL);
470             time_t difference = now - last_activity_time;
471
472             osrfLogDebug(OSRF_LOG_MARK, 
473                 "WS connection idle for %d seconds", difference);
474
475             if (difference < idle_timeout_interval) {
476                 // Last activity occurred within the idle timeout interval.
477                 continue;
478             }
479
480             // idle timeout exceeded
481             osrfLogDebug(OSRF_LOG_MARK, 
482                 "WS: idle timeout exceeded.  now=%d / last=%d; " 
483                 "closing client connection", now, last_activity_time);
484         }
485
486
487         // send a disconnect to the client, which will come back around
488         // to cause our on_disconnect_handler to run.
489         osrfLogDebug(OSRF_LOG_MARK, "WS: sending close() to client");
490         trans->server->close(trans->server);
491
492         // client will be going away, reset sleep time
493         sleep_time = idle_check_interval;
494     }
495
496     // should never get here
497     return NULL;
498 }
499
500 static int build_startup_data(const WebSocketServer *server) {
501
502     apr_pool_t *main_pool = NULL;                                                
503     apr_pool_t *stateful_session_pool = NULL;                                                
504     apr_thread_t *thread = NULL;
505     apr_threadattr_t *thread_attr = NULL;
506     apr_thread_mutex_t *mutex = NULL;
507     request_rec *r = server->request(server);
508
509     // create a pool for our translator data
510     // Do not use r->pool as the parent, since r->pool will be freed
511     // when the current client disconnects.
512     if (apr_pool_create(&main_pool, NULL) != APR_SUCCESS) {
513         osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
514         return 1;
515     }
516
517     trans = (osrfWebsocketTranslator*) 
518         apr_palloc(main_pool, sizeof(osrfWebsocketTranslator));
519
520     if (trans == NULL) {
521         osrfLogError(OSRF_LOG_MARK, "WS Unable to create translator");
522         return 1;
523     }
524
525     trans->server = server;
526     trans->main_pool = main_pool;
527     trans->osrf_router = osrfConfigGetValue(NULL, "/router_name");                      
528     trans->osrf_domain = osrfConfigGetValue(NULL, "/domain");
529
530     // opensrf session / recipient cache
531     trans->stateful_session_cache = apr_hash_make(trans->main_pool);
532     if (trans->stateful_session_cache == NULL) {
533         osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache");
534         return 1;
535     }
536
537     // opensrf session / recipient string pool; cleared regularly
538     // the only data entering this pools are the session strings.
539     if (apr_pool_create(&stateful_session_pool, trans->main_pool) != APR_SUCCESS) {
540         osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
541         return NULL;
542     }
543     trans->stateful_session_pool = stateful_session_pool;
544
545     if (apr_thread_mutex_create(
546             &mutex, APR_THREAD_MUTEX_UNNESTED, 
547             trans->main_pool) != APR_SUCCESS) {
548         osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex");
549         return 1;
550     }
551     trans->mutex = mutex;
552
553     // responder thread
554     if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
555          (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
556          (apr_thread_create(&thread, thread_attr, 
557                 osrf_responder_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
558
559         trans->responder_thread = thread;
560         
561     } else {
562         osrfLogError(OSRF_LOG_MARK, "WS unable to create responder thread");
563         return 1;
564     }
565
566     // idle timeout thread
567     thread = NULL; // reset
568     thread_attr = NULL; // reset
569     if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
570          (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
571          (apr_thread_create(&thread, thread_attr, 
572             osrf_idle_timeout_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
573
574         osrfLogDebug(OSRF_LOG_MARK, "WS created idle timeout thread");
575         trans->idle_timeout_thread = thread;
576         
577     } else {
578         osrfLogError(OSRF_LOG_MARK, "WS unable to create idle timeout thread");
579         return 1;
580     }
581
582     return APR_SUCCESS;
583 }
584
585
586 /**
587  * Connect to OpenSRF, create the main pool, responder thread
588  * session cache and session pool.
589  */
590 int child_init(const WebSocketServer *server) {
591     request_rec *r = server->request(server);
592
593     // osrf_handle will already be connected if this is not the first request
594     // served by this process.
595     if ( !(osrf_handle = osrfSystemGetTransportClient()) ) {
596         
597         // load config values from the env
598         char* timeout = getenv("OSRF_WEBSOCKET_IDLE_TIMEOUT");
599         if (timeout) {
600             if (!atoi(timeout)) {
601                 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
602                     "WS: invalid OSRF_WEBSOCKET_IDLE_TIMEOUT: %s", timeout);
603             } else {
604                 idle_timeout_interval = (time_t) atoi(timeout);
605             }
606         }
607
608         ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
609             "WS: timeout set to %d", idle_timeout_interval);
610
611         timeout = getenv("OSRF_WEBSOCKET_MAX_REQUEST_WAIT_TIME");
612         if (timeout) {
613             if (!atoi(timeout)) {
614                 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
615                     "WS: invalid OSRF_WEBSOCKET_MAX_REQUEST_WAIT_TIME: %s", 
616                     timeout
617                 );
618             } else {
619                 max_request_wait_time = (time_t) atoi(timeout);
620             }
621         }
622
623         ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
624             "WS: max request wait time set to %d", max_request_wait_time);
625
626         char* interval = getenv("OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL");
627         if (interval) {
628             if (!atoi(interval)) {
629                 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
630                     "WS: invalid OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL: %s", 
631                     interval
632                 );
633             } else {
634                 idle_check_interval = (time_t) atoi(interval);
635             }
636         } 
637
638         ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
639             "WS: idle check interval set to %d", idle_check_interval);
640
641       
642         char* cfile = getenv("OSRF_WEBSOCKET_CONFIG_FILE");
643         if (cfile) {
644             ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
645                 "WS: config file set to %s", cfile);
646             config_file = cfile;
647         }
648
649         char* ctxt = getenv("OSRF_WEBSOCKET_CONFIG_CTXT");
650         if (ctxt) {
651             ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
652                 "WS: config context set to %s", ctxt);
653             config_ctxt = ctxt;
654         }
655
656         // connect to opensrf
657         if (!osrfSystemBootstrapClientResc(
658                 config_file, config_ctxt, "websocket")) {   
659
660             osrfLogError(OSRF_LOG_MARK, 
661                 "WS unable to bootstrap OpenSRF client with config %s "
662                 "and context %s", config_file, config_ctxt
663             ); 
664             return 1;
665         }
666
667         osrfLogSetAppname("osrf_websocket_translator");
668         osrf_handle = osrfSystemGetTransportClient();
669     }
670
671     signal(SIGUSR1, sigusr1_handler);
672     return APR_SUCCESS;
673 }
674
675 /**
676  * Create the per-client translator
677  */
678 void* CALLBACK on_connect_handler(const WebSocketServer *server) {
679     request_rec *r = server->request(server);
680
681     if (!trans) { // first connection
682
683         // connect to opensrf
684         if (child_init(server) != APR_SUCCESS)
685             return NULL;
686
687         // build pools, thread data, and the translator
688         if (build_startup_data(server) != APR_SUCCESS)
689             return NULL;
690     }
691
692     const char* client_ip = get_client_ip(r);
693     osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip);
694
695     last_activity_time = time(NULL);
696     trans->client_connected = 1;
697     return trans;
698 }
699
700
701 /** 
702  * for each inbound opensrf message:
703  * 1. Stamp the ingress
704  * 2. REQUEST: log it as activity
705  * 3. DISCONNECT: remove the cached recipient
706  * then re-string-ify for xmpp delivery
707  */
708
709 static char* extract_inbound_messages(
710         const request_rec *r, 
711         const char* service, 
712         const char* thread, 
713         const char* recipient, 
714         const jsonObject *osrf_msg) {
715
716     int i;
717     int num_msgs = osrf_msg->size;
718     osrfMessage* msg;
719     osrfMessage* msg_list[num_msgs];
720
721     // here we do an extra json round-trip to get the data
722     // in a form osrf_message_deserialize can understand
723     // TODO: consider a version of osrf_message_init which can 
724     // accept a jsonObject* instead of a JSON string.
725     char *osrf_msg_json = jsonObjectToJSON(osrf_msg);
726     osrf_message_deserialize(osrf_msg_json, msg_list, num_msgs);
727     free(osrf_msg_json);
728
729     // should we require the caller to always pass the service?
730     if (service == NULL) service = "";
731
732     for(i = 0; i < num_msgs; i++) {
733         msg = msg_list[i];
734         osrfMessageSetIngress(msg, WEBSOCKET_TRANSLATOR_INGRESS);
735
736         switch(msg->m_type) {
737
738             case REQUEST: {
739                 const jsonObject* params = msg->_params;
740                 growing_buffer* act = buffer_init(128);
741                 char* method = msg->method_name;
742                 buffer_fadd(act, "[%s] [%s] %s %s", 
743                     get_client_ip(r), "", service, method);
744
745                 const jsonObject* obj = NULL;
746                 int i = 0;
747                 const char* str;
748                 int redactParams = 0;
749                 while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
750                     if(!strncmp(method, str, strlen(str))) {
751                         redactParams = 1;
752                         break;
753                     }
754                 }
755                 if(redactParams) {
756                     OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
757                 } else {
758                     i = 0;
759                     while((obj = jsonObjectGetIndex(params, i++))) {
760                         char* str = jsonObjectToJSON(obj);
761                         if( i == 1 )
762                             OSRF_BUFFER_ADD(act, " ");
763                         else
764                             OSRF_BUFFER_ADD(act, ", ");
765                         OSRF_BUFFER_ADD(act, str);
766                         free(str);
767                     }
768                 }
769                 osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
770                 buffer_free(act);
771                 requests_in_flight++;
772                 break;
773             }
774
775             case DISCONNECT:
776                 clear_cached_recipient(thread);
777                 break;
778         }
779     }
780
781     char* finalMsg = osrfMessageSerializeBatch(msg_list, num_msgs);
782
783     // clean up our messages
784     for(i = 0; i < num_msgs; i++) 
785         osrfMessageFree(msg_list[i]);
786
787     return finalMsg;
788 }
789
790 /**
791  * Parse opensrf request and relay the request to the opensrf network.
792  */
793 static size_t on_message_handler_body(void *data,
794                 const WebSocketServer *server, const int type, 
795                 unsigned char *buffer, const size_t buffer_size) {
796
797     request_rec *r = server->request(server);
798
799     jsonObject *msg_wrapper = NULL; // free me
800     const jsonObject *tmp_obj = NULL;
801     const jsonObject *osrf_msg = NULL;
802     const char *service = NULL;
803     const char *thread = NULL;
804     const char *log_xid = NULL;
805     char *msg_body = NULL;
806     char *recipient = NULL;
807     int i;
808
809     if (buffer_size <= 0) return OK;
810
811     // generate a new log trace for this request. it 
812     // may be replaced by a client-provided trace below.
813     osrfLogMkXid();
814
815     osrfLogDebug(OSRF_LOG_MARK, "WS received message size=%d", buffer_size);
816
817     // buffer may not be \0-terminated, which jsonParse requires
818     char buf[buffer_size + 1];
819     memcpy(buf, buffer, buffer_size);
820     buf[buffer_size] = '\0';
821
822     osrfLogInternal(OSRF_LOG_MARK, "WS received inbound message: %s", buf);
823
824     msg_wrapper = jsonParse(buf);
825
826     if (msg_wrapper == NULL) {
827         osrfLogWarning(OSRF_LOG_MARK, "WS Invalid JSON: %s", buf);
828         return HTTP_BAD_REQUEST;
829     }
830
831     osrf_msg = jsonObjectGetKeyConst(msg_wrapper, "osrf_msg");
832
833     if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "service")) 
834         service = jsonObjectGetString(tmp_obj);
835
836     if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "thread")) 
837         thread = jsonObjectGetString(tmp_obj);
838
839     if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "log_xid")) 
840         log_xid = jsonObjectGetString(tmp_obj);
841
842     if (log_xid) {
843
844         // use the caller-provide log trace id
845         if (strlen(log_xid) > MAX_THREAD_SIZE) {
846             osrfLogWarning(OSRF_LOG_MARK, "WS log_xid exceeds max length");
847             return HTTP_BAD_REQUEST;
848         }
849
850         osrfLogForceXid(log_xid);
851     }
852
853     if (thread) {
854
855         if (strlen(thread) > MAX_THREAD_SIZE) {
856             osrfLogWarning(OSRF_LOG_MARK, "WS thread exceeds max length");
857             return HTTP_BAD_REQUEST;
858         }
859
860         // since clients can provide their own threads at session start time,
861         // the presence of a thread does not guarantee a cached recipient
862         recipient = (char*) apr_hash_get(
863             trans->stateful_session_cache, thread, APR_HASH_KEY_STRING);
864
865         if (recipient) {
866             osrfLogDebug(OSRF_LOG_MARK, "WS found cached recipient %s", recipient);
867         }
868     }
869
870     if (!recipient) {
871
872         if (service) {
873             int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1,
874                 "%s@%s/%s", trans->osrf_router, trans->osrf_domain, service);                                    
875             recipient_buf[size] = '\0';                                          
876             recipient = recipient_buf;
877
878         } else {
879             osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient");
880             return HTTP_BAD_REQUEST;
881         }
882     }
883
884     osrfLogDebug(OSRF_LOG_MARK, 
885         "WS relaying message to opensrf thread=%s, recipient=%s", 
886             thread, recipient);
887
888     msg_body = extract_inbound_messages(
889         r, service, thread, recipient, osrf_msg);
890
891     osrfLogInternal(OSRF_LOG_MARK, 
892         "WS relaying inbound message: %s", msg_body);
893
894     transport_message *tmsg = message_init(
895         msg_body, NULL, thread, recipient, NULL);
896
897     message_set_osrf_xid(tmsg, osrfLogGetXid());
898     client_send_message(osrf_handle, tmsg);
899
900
901     osrfLogClearXid();
902     message_free(tmsg);                                                         
903     jsonObjectFree(msg_wrapper);
904     free(msg_body);
905
906     last_activity_time = time(NULL);
907     return OK;
908 }
909
910 static size_t CALLBACK on_message_handler(void *data,
911                 const WebSocketServer *server, const int type, 
912                 unsigned char *buffer, const size_t buffer_size) {
913
914     if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
915         osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
916         return 1; // TODO: map to apr_status_t value?
917     }
918
919     apr_status_t stat = on_message_handler_body(data, server, type, buffer, buffer_size);
920
921     if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
922         osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
923         return 1;
924     }
925
926     return stat;
927 }
928
929
930 /**
931  * Clear the session cache, release the session pool
932  */
933 void CALLBACK on_disconnect_handler(
934     void *data, const WebSocketServer *server) {
935
936     // if the threads wake up during disconnect, this tells 
937     // them to go back to sleep.
938     trans->client_connected = 0;
939
940     request_rec *r = server->request(server);
941     osrfLogInfo(OSRF_LOG_MARK, "WS disconnect from %s", get_client_ip(r)); 
942
943     // Clear any lingering session data
944     // NOTE: we could apr_pool_destroy the stateful_session_pool to truly free
945     // the memory, but since there is a limit to the size of the pool
946     // (max_concurrent_sessions), the memory cannot grow unbounded, 
947     // so there's no need.
948     apr_hash_clear(trans->stateful_session_cache);
949     apr_pool_clear(trans->stateful_session_pool);
950 }
951
952 static WebSocketPlugin osrf_websocket_plugin = {
953     sizeof(WebSocketPlugin),
954     WEBSOCKET_PLUGIN_VERSION_0,
955     NULL, // on_destroy_handler
956     on_connect_handler,
957     on_message_handler,
958     on_disconnect_handler
959 };
960
961 extern EXPORT WebSocketPlugin * CALLBACK osrf_websocket_init() {
962     return &osrf_websocket_plugin;
963 }
964