]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/gateway/osrf_websocket_translator.c
LP#1268619: websockets : added config docs to install readme
[OpenSRF.git] / src / gateway / osrf_websocket_translator.c
1 /* -----------------------------------------------------------------------
2  * Copyright 2012 Equinox Software, Inc.
3  * Bill Erickson <berick@esilibrary.com>
4  *
5  * This program is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU General Public License
7  * as published by the Free Software Foundation; either version 2
8  * of the License, or (at your option) any later version.
9  * 
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  * -----------------------------------------------------------------------
15  */
16
17 /**
18  * websocket <-> opensrf gateway.  Wrapped opensrf messages are extracted
19  * and relayed to the opensrf network.  Responses are pulled from the opensrf
20  * network and passed back to the client.  Messages are analyzed to determine
21  * when a connect/disconnect occurs, so that the cache of recipients can be
22  * properly managed.  We also activity-log REQUEST messages.
23  *
24  * Messages to/from the websocket client take the following form:
25  * {
26  *   "service"  : "opensrf.foo", // required
27  *   "thread"   : "123454321",   // AKA thread. required for follow-up requests; max 64 chars.
28  *   "log_xid"  : "123..32",     // optional log trace ID, max 64 chars;
29  *   "osrf_msg" : [<osrf_msg>, <osrf_msg>, ...]   // required
30  * }
31  *
32  * Each translator operates with three threads.  One thread receives messages
33  * from the websocket client, translates, and relays them to the opensrf 
34  * network. The second thread collects responses from the opensrf network and 
35  * relays them back to the websocket client.  The third thread inspects
36  * the idle timeout interval t see if it's time to drop the idle client.
37  *
38  * After the initial setup, all thread actions occur within a thread
39  * mutex.  The desired affect is a non-threaded application that uses
40  * threads for the sole purpose of having one thread listening for
41  * incoming data, while a second thread listens for responses, and a
42  * third checks the idle timeout.  When any thread awakens, it's the
43  * only thread in town until it goes back to sleep (i.e. listening on
44  * its socket for data).
45  *
46  * Note that with the opensrf "thread", which allows us to identify the
47  * opensrf session, the caller does not need to provide a recipient
48  * address.  The "service" is only required to start a new opensrf
49  * session.  After the sesession is started, all future communication is
50  * based solely on the thread.  However, the "service" should be passed
51  * by the caller for all requests to ensure it is properly logged in the
52  * activity log.
53  *
54  * Every inbound and outbound message updates the last_activity_time.
55  * A separate thread wakes periodically to see if the time since the
56  * last_activity_time exceeds the configured idle_timeout_interval.  If
57  * so, a disconnect is sent to the client, completing the conversation.
58  *
59  * Configuration goes directly into the Apache envvars file.  
60  * (e.g. /etc/apache2-websockets/envvars).  As of today, it's not 
61  * possible to leverage Apache configuration directives directly,
62  * since this is not an Apache module, but a shared library loaded
63  * by an apache module.  This includes SetEnv / SetEnvIf.
64  *
65  * export OSRF_WEBSOCKET_IDLE_TIMEOUT=300
66  * export OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL=5
67  * export OSRF_WEBSOCKET_CONFIG_FILE=/openils/conf/opensrf_core.xml
68  * export OSRF_WEBSOCKET_CONFIG_CTXT=gateway
69  */
70
71 /**
72  * TODO:
73  * short-timeout mode for brick detachment where inactivity timeout drops way 
74  * down for graceful disconnects.
75  */
76
77 #include <stdlib.h>
78 #include <signal.h>
79 #include <unistd.h>
80 #include "httpd.h"
81 #include "http_log.h"
82 #include "apr_strings.h"
83 #include "apr_thread_proc.h"
84 #include "apr_hash.h"
85 #include "websocket_plugin.h"
86 #include "opensrf/log.h"
87 #include "opensrf/osrf_json.h"
88 #include "opensrf/transport_client.h"
89 #include "opensrf/transport_message.h"
90 #include "opensrf/osrf_system.h"                                                
91 #include "opensrf/osrfConfig.h"
92
93 #define MAX_THREAD_SIZE 64
94 #define RECIP_BUF_SIZE 128
95 #define WEBSOCKET_TRANSLATOR_INGRESS "ws-translator-v1"
96
97
98 // default values, replaced during setup (below) as needed.
99 static char* config_file = "/openils/conf/opensrf_core.xml";
100 static char* config_ctxt = "gateway";
101 static time_t idle_timeout_interval = 60; 
102 static time_t idle_check_interval = 5;
103 static time_t last_activity_time = 0;
104
105 // true if we've received a signal to start graceful shutdown
106 static int shutdown_requested = 0; 
107 static void sigusr1_handler(int sig);
108 static void sigusr1_handler(int sig) {                                       
109     shutdown_requested = 1; 
110     signal(SIGUSR1, sigusr1_handler);
111     osrfLogInfo(OSRF_LOG_MARK, "WS received SIGUSR1 - Graceful Shutdown");
112 }
113
114 typedef struct _osrfWebsocketTranslator {
115
116     /** Our handle for communicating with the caller */
117     const WebSocketServer *server;
118     
119     /**
120      * Standalone, per-process APR pool.  Primarily
121      * there for managing thread data, which lasts 
122      * the duration of the process.
123      */
124     apr_pool_t *main_pool;
125
126     /**
127      * Map of thread => drone-xmpp-address.  Maintaining this
128      * map internally means the caller never need know about
129      * internal XMPP addresses and the server doesn't have to 
130      * verify caller-specified recipient addresses.  It's
131      * all managed internally.
132      */
133     apr_hash_t *session_cache; 
134
135     /**
136      * session_pool contains the key/value pairs stored in
137      * the session_cache.  The pool is regularly destroyed
138      * and re-created to avoid long-term memory consumption
139      */
140     apr_pool_t *session_pool;
141
142     /**
143      * Thread responsible for collecting responses on the opensrf
144      * network and relaying them back to the caller
145      */
146     apr_thread_t *responder_thread;
147
148     /**
149      * Thread responsible for checking inactivity timeout.
150      * If no activitity occurs within the configured interval,
151      * a disconnect is sent to the client and the connection
152      * is terminated.
153      */
154     apr_thread_t *idle_timeout_thread;
155
156     /**
157      * All message handling code is wrapped in a thread mutex such
158      * that all actions (after the initial setup) are serialized
159      * to minimize the possibility of multi-threading snafus.
160      */
161     apr_thread_mutex_t *mutex;
162
163     /**
164      * True if a websocket client is currently connected
165      */
166     int client_connected;
167
168     /** OpenSRF jouter name */
169     char* osrf_router;
170
171     /** OpenSRF domain */
172     char* osrf_domain;
173
174 } osrfWebsocketTranslator;
175
176 static osrfWebsocketTranslator *trans = NULL;
177 static transport_client *osrf_handle = NULL;
178 static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
179
180 static void clear_cached_recipient(const char* thread) {
181     apr_pool_t *pool = NULL;                                                
182
183     if (apr_hash_get(trans->session_cache, thread, APR_HASH_KEY_STRING)) {
184
185         osrfLogDebug(OSRF_LOG_MARK, "WS removing cached recipient on disconnect");
186
187         // remove it from the hash
188         apr_hash_set(trans->session_cache, thread, APR_HASH_KEY_STRING, NULL);
189
190         if (apr_hash_count(trans->session_cache) == 0) {
191             osrfLogDebug(OSRF_LOG_MARK, "WS re-setting session_pool");
192
193             // memory accumulates in the session_pool as sessions are cached then 
194             // un-cached.  Un-caching removes strings from the hash, but not the 
195             // pool itself.  That only happens when the pool is destroyed. Here
196             // we destroy the session pool to clear any lingering memory, then
197             // re-create it for future caching.
198             apr_pool_destroy(trans->session_pool);
199     
200             if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
201                 osrfLogError(OSRF_LOG_MARK, "WS Unable to create session_pool");
202                 trans->session_pool = NULL;
203                 return;
204             }
205
206             trans->session_pool = pool;
207         }
208     }
209 }
210
211 void* osrf_responder_thread_main_body(transport_message *tmsg) {
212
213     osrfList *msg_list = NULL;
214     osrfMessage *one_msg = NULL;
215     int i;
216
217     osrfLogDebug(OSRF_LOG_MARK, 
218         "WS received opensrf response for thread=%s", tmsg->thread);
219
220     // first we need to perform some maintenance
221     msg_list = osrfMessageDeserialize(tmsg->body, NULL);
222
223     for (i = 0; i < msg_list->size; i++) {
224         one_msg = OSRF_LIST_GET_INDEX(msg_list, i);
225
226         osrfLogDebug(OSRF_LOG_MARK, 
227             "WS returned response of type %d", one_msg->m_type);
228
229         /*  if our client just successfully connected to an opensrf service,
230             cache the sender so that future calls on this thread will use
231             the correct recipient. */
232         if (one_msg && one_msg->m_type == STATUS) {
233
234
235             // only cache recipients if the client is still connected
236             if (trans->client_connected && 
237                     one_msg->status_code == OSRF_STATUS_OK) {
238
239                 if (!apr_hash_get(trans->session_cache, 
240                         tmsg->thread, APR_HASH_KEY_STRING)) {
241
242                     osrfLogDebug(OSRF_LOG_MARK, 
243                         "WS caching sender thread=%s, sender=%s", 
244                         tmsg->thread, tmsg->sender);
245
246                     apr_hash_set(trans->session_cache, 
247                         apr_pstrdup(trans->session_pool, tmsg->thread),
248                         APR_HASH_KEY_STRING, 
249                         apr_pstrdup(trans->session_pool, tmsg->sender));
250                 }
251
252             } else {
253
254                 // connection timed out; clear the cached recipient
255                 // regardless of whether the client is still connected
256                 if (one_msg->status_code == OSRF_STATUS_TIMEOUT)
257                     clear_cached_recipient(tmsg->thread);
258             }
259         }
260     }
261
262     // osrfMessageDeserialize applies the freeItem handler to the 
263     // newly created osrfList.  We only need to free the list and 
264     // the individual osrfMessage's will be freed along with it
265     osrfListFree(msg_list);
266
267     if (!trans->client_connected) {
268
269         osrfLogInfo(OSRF_LOG_MARK, 
270             "WS discarding response for thread=%s", tmsg->thread);
271
272         return;
273     }
274     
275     // client is still connected. 
276     // relay the response messages to the client
277     jsonObject *msg_wrapper = NULL;
278     char *msg_string = NULL;
279
280     // build the wrapper object
281     msg_wrapper = jsonNewObject(NULL);
282     jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
283     jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
284     jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
285
286     if (tmsg->is_error) {
287         osrfLogError(OSRF_LOG_MARK, 
288             "WS received jabber error message in response to thread=%s", 
289             tmsg->thread);
290         jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
291     }
292
293     msg_string = jsonObjectToJSONRaw(msg_wrapper);
294
295     // drop the JSON on the outbound wire
296     trans->server->send(trans->server, MESSAGE_TYPE_TEXT, 
297         (unsigned char*) msg_string, strlen(msg_string));
298
299     free(msg_string);
300     jsonObjectFree(msg_wrapper);
301 }
302
303 /**
304  * Sleep and regularly wake to see if the process has been idle for too
305  * long.  If so, send a disconnect to the client.
306  *
307  */
308 void* APR_THREAD_FUNC osrf_idle_timeout_thread_main(
309         apr_thread_t *thread, void *data) {
310
311     // sleep time defaults to the check interval, but may 
312     // be shortened during shutdown.
313     int sleep_time = idle_check_interval;
314     int shutdown_loops = 0;
315
316     while (1) {
317
318         if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
319             osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
320             return NULL;
321         }
322
323         // note: receiving a signal (e.g. SIGUSR1) will not interrupt
324         // this sleep(), since it's running within its own thread.
325         // During graceful shtudown, we may wait up to 
326         // idle_check_interval seconds before initiating shutdown.
327         sleep(sleep_time);
328         
329         if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
330             osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
331             return NULL;
332         }
333
334         // no client is connected.  reset sleep time go back to sleep.
335         if (!trans->client_connected) {
336             sleep_time = idle_check_interval;
337             continue;
338         }
339
340         // do we have any active conversations with the connected client?
341         int active_count = apr_hash_count(trans->session_cache);
342
343         if (active_count) {
344
345             if (shutdown_requested) {
346                 // active conversations means we can't shut down.  
347                 // shorten the check interval to re-check more often.
348                 shutdown_loops++;
349                 osrfLogDebug(OSRF_LOG_MARK, 
350                     "WS: %d active conversation(s) found in shutdown after "
351                     "%d attempts.  Sleeping...", shutdown_loops, active_count
352                 );
353
354                 if (shutdown_loops > 30) {
355                     // this is clearly a long-running conversation, let's
356                     // check less frequently to avoid excessive logging.
357                     sleep_time = 3;
358                 } else {
359                     sleep_time = 1;
360                 }
361             } 
362
363             // active conversations means keep going.  There's no point in
364             // checking the idle time (below) if we're mid-conversation
365             continue;
366         }
367
368         // no active conversations
369              
370         if (shutdown_requested) {
371             // there's no need to reset the shutdown vars (loops/requested)
372             // SIGUSR1 is Apaches reload signal, which means this process
373             // will be going away as soon as the client is disconnected.
374
375             osrfLogInfo(OSRF_LOG_MARK,
376                 "WS: no active conversations remain in shutdown; "
377                     "closing client connection");
378
379         } else { 
380             // see how long we've been idle.  If too long, kick the client
381
382             time_t now = time(NULL);
383             time_t difference = now - last_activity_time;
384
385             osrfLogDebug(OSRF_LOG_MARK, 
386                 "WS has been idle for %d seconds", difference);
387
388             if (difference < idle_timeout_interval) {
389                 // Last activity occurred within the idle timeout interval.
390                 continue;
391             }
392
393             // idle timeout exceeded
394             osrfLogDebug(OSRF_LOG_MARK, 
395                 "WS: idle timeout exceeded.  now=%d / last=%d; " 
396                 "closing client connection", now, last_activity_time);
397         }
398
399
400         // send a disconnect to the client, which will come back around
401         // to cause our on_disconnect_handler to run.
402         osrfLogDebug(OSRF_LOG_MARK, "WS: sending close() to client");
403         trans->server->close(trans->server);
404
405         // client will be going away, reset sleep time
406         sleep_time = idle_check_interval;
407     }
408
409     // should never get here
410     return NULL;
411 }
412
413 /**
414  * Responder thread main body.
415  * Collects responses from the opensrf network and relays them to the 
416  * websocket caller.
417  */
418 void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
419
420     transport_message *tmsg;
421     while (1) {
422
423         if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
424             osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
425             return NULL;
426         }
427
428         // wait for a response
429         tmsg = client_recv(osrf_handle, -1);
430
431         if (!tmsg) continue; // early exit on interrupt
432
433         if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
434             osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
435             return NULL;
436         }
437
438         osrfLogForceXid(tmsg->osrf_xid);
439         osrf_responder_thread_main_body(tmsg);
440         message_free(tmsg);                                                         
441         last_activity_time = time(NULL);
442     }
443
444     return NULL;
445 }
446
447
448
449 /**
450  * Connect to OpenSRF, create the main pool, responder thread
451  * session cache and session pool.
452  */
453 int child_init(const WebSocketServer *server) {
454
455     apr_pool_t *pool = NULL;                                                
456     apr_thread_t *thread = NULL;
457     apr_threadattr_t *thread_attr = NULL;
458     apr_thread_mutex_t *mutex = NULL;
459     request_rec *r = server->request(server);
460
461
462     // osrf_handle will already be connected if this is not the first request
463     // served by this process.
464     if ( !(osrf_handle = osrfSystemGetTransportClient()) ) {
465         
466         // load config values from the env
467         char* timeout = getenv("OSRF_WEBSOCKET_IDLE_TIMEOUT");
468         if (timeout) {
469             if (!atoi(timeout)) {
470                 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
471                     "WS: invalid OSRF_WEBSOCKET_IDLE_TIMEOUT: %s", timeout);
472             } else {
473                 idle_timeout_interval = (time_t) atoi(timeout);
474             }
475         }
476
477         ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
478             "WS: timeout set to %d", idle_timeout_interval);
479
480         char* interval = getenv("OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL");
481         if (interval) {
482             if (!atoi(interval)) {
483                 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
484                     "WS: invalid OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL: %s", 
485                     interval
486                 );
487             } else {
488                 idle_check_interval = (time_t) atoi(interval);
489             }
490         } 
491
492         ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
493             "WS: idle check interval set to %d", idle_check_interval);
494
495       
496         char* cfile = getenv("OSRF_WEBSOCKET_CONFIG_FILE");
497         if (cfile) {
498             ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
499                 "WS: config file set to %s", cfile);
500             config_file = cfile;
501         }
502
503         char* ctxt = getenv("OSRF_WEBSOCKET_CONFIG_CTXT");
504         if (ctxt) {
505             ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
506                 "WS: config context set to %s", ctxt);
507             config_ctxt = ctxt;
508         }
509
510         // connect to opensrf
511         if (!osrfSystemBootstrapClientResc(
512                 config_file, config_ctxt, "websocket")) {   
513
514             osrfLogError(OSRF_LOG_MARK, 
515                 "WS unable to bootstrap OpenSRF client with config %s "
516                 "and context %s", config_file, config_ctxt
517             ); 
518             return 1;
519         }
520
521         osrf_handle = osrfSystemGetTransportClient();
522     }
523
524     // create a standalone pool for our translator data
525     if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
526         osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
527         return 1;
528     }
529
530     // allocate our static translator instance
531     trans = (osrfWebsocketTranslator*) 
532         apr_palloc(pool, sizeof(osrfWebsocketTranslator));
533
534     if (trans == NULL) {
535         osrfLogError(OSRF_LOG_MARK, "WS Unable to create translator");
536         return 1;
537     }
538
539     trans->main_pool = pool;
540     trans->server = server;
541     trans->osrf_router = osrfConfigGetValue(NULL, "/router_name");                      
542     trans->osrf_domain = osrfConfigGetValue(NULL, "/domain");
543
544     trans->session_cache = apr_hash_make(pool);
545
546     if (trans->session_cache == NULL) {
547         osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache");
548         return 1;
549     }
550
551     // Create the responder thread.  Once created, 
552     // it runs for the lifetime of this process.
553     if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
554          (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
555          (apr_thread_create(&thread, thread_attr, 
556                 osrf_responder_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
557
558         trans->responder_thread = thread;
559         
560     } else {
561         osrfLogError(OSRF_LOG_MARK, "WS unable to create responder thread");
562         return 1;
563     }
564
565     // Create the idle timeout thread, which lives for the lifetime
566     // of the process.
567     thread = NULL; // reset
568     thread_attr = NULL; // reset
569     if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
570          (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
571          (apr_thread_create(&thread, thread_attr, 
572             osrf_idle_timeout_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
573
574         osrfLogDebug(OSRF_LOG_MARK, "WS created idle timeout thread");
575         trans->idle_timeout_thread = thread;
576         
577     } else {
578         osrfLogError(OSRF_LOG_MARK, "WS unable to create idle timeout thread");
579         return 1;
580     }
581
582
583     if (apr_thread_mutex_create(
584             &mutex, APR_THREAD_MUTEX_UNNESTED, 
585             trans->main_pool) != APR_SUCCESS) {
586         osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex");
587         return 1;
588     }
589
590     trans->mutex = mutex;
591
592     signal(SIGUSR1, sigusr1_handler);
593
594     return APR_SUCCESS;
595 }
596
597 /**
598  * Create the per-client translator
599  */
600 void* CALLBACK on_connect_handler(const WebSocketServer *server) {
601     request_rec *r = server->request(server);
602     apr_pool_t *pool;
603     apr_thread_t *thread = NULL;
604     apr_threadattr_t *thread_attr = NULL;
605
606 #ifdef APACHE_MIN_24
607     char* client_ip = r->connection->client_ip;
608 #else
609     char* client_ip = r->connection->remote_ip;
610 #endif
611
612     osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip);
613
614     if (!trans) {
615         // first connection
616         if (child_init(server) != APR_SUCCESS) {
617             return NULL;
618         }
619     }
620
621     // create a standalone pool for the session cache values
622     // this pool will be destroyed and re-created regularly
623     // to clear session memory
624     if (apr_pool_create(&pool, r->pool) != APR_SUCCESS) {
625         osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
626         return NULL;
627     }
628
629     trans->session_pool = pool;
630     trans->client_connected = 1;
631     last_activity_time = time(NULL);
632
633     return trans;
634 }
635
636
637 /** 
638  * for each inbound opensrf message:
639  * 1. Stamp the ingress
640  * 2. REQUEST: log it as activity
641  * 3. DISCONNECT: remove the cached recipient
642  * then re-string-ify for xmpp delivery
643  */
644
645 static char* extract_inbound_messages(
646         const request_rec *r, 
647         const char* service, 
648         const char* thread, 
649         const char* recipient, 
650         const jsonObject *osrf_msg) {
651
652     int i;
653     int num_msgs = osrf_msg->size;
654     osrfMessage* msg;
655     osrfMessage* msg_list[num_msgs];
656
657     // here we do an extra json round-trip to get the data
658     // in a form osrf_message_deserialize can understand
659     char *osrf_msg_json = jsonObjectToJSON(osrf_msg);
660     osrf_message_deserialize(osrf_msg_json, msg_list, num_msgs);
661     free(osrf_msg_json);
662
663     // should we require the caller to always pass the service?
664     if (service == NULL) service = "";
665
666     for(i = 0; i < num_msgs; i++) {
667         msg = msg_list[i];
668         osrfMessageSetIngress(msg, WEBSOCKET_TRANSLATOR_INGRESS);
669
670         switch(msg->m_type) {
671
672             case REQUEST: {
673                 const jsonObject* params = msg->_params;
674                 growing_buffer* act = buffer_init(128);
675                 char* method = msg->method_name;
676                 buffer_fadd(act, "[%s] [%s] %s %s", 
677                     r->connection->remote_ip, "", service, method);
678
679                 const jsonObject* obj = NULL;
680                 int i = 0;
681                 const char* str;
682                 int redactParams = 0;
683                 while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
684                     if(!strncmp(method, str, strlen(str))) {
685                         redactParams = 1;
686                         break;
687                     }
688                 }
689                 if(redactParams) {
690                     OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
691                 } else {
692                     i = 0;
693                     while((obj = jsonObjectGetIndex(params, i++))) {
694                         char* str = jsonObjectToJSON(obj);
695                         if( i == 1 )
696                             OSRF_BUFFER_ADD(act, " ");
697                         else
698                             OSRF_BUFFER_ADD(act, ", ");
699                         OSRF_BUFFER_ADD(act, str);
700                         free(str);
701                     }
702                 }
703                 osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
704                 buffer_free(act);
705                 break;
706             }
707
708             case DISCONNECT:
709                 clear_cached_recipient(thread);
710                 break;
711         }
712     }
713
714     char* finalMsg = osrfMessageSerializeBatch(msg_list, num_msgs);
715
716     // clean up our messages
717     for(i = 0; i < num_msgs; i++) 
718         osrfMessageFree(msg_list[i]);
719
720     return finalMsg;
721 }
722
723 /**
724  * Parse opensrf request and relay the request to the opensrf network.
725  */
726 static size_t on_message_handler_body(void *data,
727                 const WebSocketServer *server, const int type, 
728                 unsigned char *buffer, const size_t buffer_size) {
729
730     request_rec *r = server->request(server);
731
732     jsonObject *msg_wrapper = NULL; // free me
733     const jsonObject *tmp_obj = NULL;
734     const jsonObject *osrf_msg = NULL;
735     const char *service = NULL;
736     const char *thread = NULL;
737     const char *log_xid = NULL;
738     char *msg_body = NULL;
739     char *recipient = NULL;
740     int i;
741
742     if (buffer_size <= 0) return OK;
743
744     // generate a new log trace for this request. it 
745     // may be replaced by a client-provided trace below.
746     osrfLogMkXid();
747
748     osrfLogDebug(OSRF_LOG_MARK, "WS received message size=%d", buffer_size);
749
750     // buffer may not be \0-terminated, which jsonParse requires
751     char buf[buffer_size + 1];
752     memcpy(buf, buffer, buffer_size);
753     buf[buffer_size] = '\0';
754
755     msg_wrapper = jsonParse(buf);
756
757     if (msg_wrapper == NULL) {
758         osrfLogWarning(OSRF_LOG_MARK, "WS Invalid JSON: %s", buf);
759         return HTTP_BAD_REQUEST;
760     }
761
762     osrf_msg = jsonObjectGetKeyConst(msg_wrapper, "osrf_msg");
763
764     if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "service")) 
765         service = jsonObjectGetString(tmp_obj);
766
767     if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "thread")) 
768         thread = jsonObjectGetString(tmp_obj);
769
770     if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "log_xid")) 
771         log_xid = jsonObjectGetString(tmp_obj);
772
773     if (log_xid) {
774
775         // use the caller-provide log trace id
776         if (strlen(log_xid) > MAX_THREAD_SIZE) {
777             osrfLogWarning(OSRF_LOG_MARK, "WS log_xid exceeds max length");
778             return HTTP_BAD_REQUEST;
779         }
780
781         osrfLogForceXid(log_xid);
782     }
783
784     if (thread) {
785
786         if (strlen(thread) > MAX_THREAD_SIZE) {
787             osrfLogWarning(OSRF_LOG_MARK, "WS thread exceeds max length");
788             return HTTP_BAD_REQUEST;
789         }
790
791         // since clients can provide their own threads at session start time,
792         // the presence of a thread does not guarantee a cached recipient
793         recipient = (char*) apr_hash_get(
794             trans->session_cache, thread, APR_HASH_KEY_STRING);
795
796         if (recipient) {
797             osrfLogDebug(OSRF_LOG_MARK, "WS found cached recipient %s", recipient);
798         }
799     }
800
801     if (!recipient) {
802
803         if (service) {
804             int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1,
805                 "%s@%s/%s", trans->osrf_router, trans->osrf_domain, service);                                    
806             recipient_buf[size] = '\0';                                          
807             recipient = recipient_buf;
808
809         } else {
810             osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient");
811             return HTTP_BAD_REQUEST;
812         }
813     }
814
815     osrfLogDebug(OSRF_LOG_MARK, 
816         "WS relaying message to opensrf thread=%s, recipient=%s", 
817             thread, recipient);
818
819     msg_body = extract_inbound_messages(
820         r, service, thread, recipient, osrf_msg);
821
822     transport_message *tmsg = message_init(
823         msg_body, NULL, thread, recipient, NULL);
824
825     message_set_osrf_xid(tmsg, osrfLogGetXid());
826     client_send_message(osrf_handle, tmsg);
827
828
829     osrfLogClearXid();
830     message_free(tmsg);                                                         
831     jsonObjectFree(msg_wrapper);
832     free(msg_body);
833
834     last_activity_time = time(NULL);
835
836     return OK;
837 }
838
839 static size_t CALLBACK on_message_handler(void *data,
840                 const WebSocketServer *server, const int type, 
841                 unsigned char *buffer, const size_t buffer_size) {
842
843     if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
844         osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
845         return 1; // TODO: map to apr_status_t value?
846     }
847
848     apr_status_t stat = on_message_handler_body(data, server, type, buffer, buffer_size);
849
850     if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
851         osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
852         return 1;
853     }
854
855     return stat;
856 }
857
858
859 /**
860  * Clear the session cache, release the session pool
861  */
862 void CALLBACK on_disconnect_handler(
863     void *data, const WebSocketServer *server) {
864
865     osrfWebsocketTranslator *trans = (osrfWebsocketTranslator*) data;
866     trans->client_connected = 0;
867
868     // timeout thread is recreated w/ each new connection
869     apr_thread_exit(trans->idle_timeout_thread, APR_SUCCESS);
870     trans->idle_timeout_thread = NULL;
871     
872     // ensure no errant session data is sticking around
873     apr_hash_clear(trans->session_cache);
874
875     // strictly speaking, this pool will get destroyed when
876     // r->pool is destroyed, but it doesn't hurt to explicitly
877     // destroy it ourselves.
878     apr_pool_destroy(trans->session_pool);
879     trans->session_pool = NULL;
880
881     request_rec *r = server->request(server);
882
883     osrfLogInfo(OSRF_LOG_MARK, 
884         "WS disconnect from %s", r->connection->remote_ip); 
885         //"WS disconnect from %s", r->connection->client_ip); // apache 2.4
886 }
887
888 /**
889  * Be nice and clean up our mess
890  */
891 void CALLBACK on_destroy_handler(WebSocketPlugin *plugin) {
892     if (trans) {
893         apr_thread_exit(trans->responder_thread, APR_SUCCESS);
894         apr_thread_mutex_destroy(trans->mutex);
895         if (trans->session_pool)
896             apr_pool_destroy(trans->session_pool);
897         apr_pool_destroy(trans->main_pool);
898     }
899
900     trans = NULL;
901 }
902
903 static WebSocketPlugin osrf_websocket_plugin = {
904     sizeof(WebSocketPlugin),
905     WEBSOCKET_PLUGIN_VERSION_0,
906     on_destroy_handler,
907     on_connect_handler,
908     on_message_handler,
909     on_disconnect_handler
910 };
911
912 extern EXPORT WebSocketPlugin * CALLBACK osrf_websocket_init() {
913     return &osrf_websocket_plugin;
914 }
915