LP#1268619: websocket gateway: improved memory mgt; logging
[OpenSRF.git] / src / gateway / osrf_websocket_translator.c
1 /* -----------------------------------------------------------------------
2  * Copyright 2012 Equinox Software, Inc.
3  * Bill Erickson <berick@esilibrary.com>
4  *
5  * This program is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU General Public License
7  * as published by the Free Software Foundation; either version 2
8  * of the License, or (at your option) any later version.
9  * 
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  * -----------------------------------------------------------------------
15  */
16
17 /**
18  * websocket <-> opensrf gateway.  Wrapped opensrf messages are extracted
19  * and relayed to the opensrf network.  Responses are pulled from the opensrf
20  * network and passed back to the client.  Messages are analyzed to determine
21  * when a connect/disconnect occurs, so that the cache of recipients can be
22  * properly managed.  We also activity-log REQUEST messages.
23  *
24  * Messages to/from the websocket client take the following form:
25  * {
26  *   "service"  : "opensrf.foo", // required
27  *   "thread"   : "123454321",   // AKA thread. required for follow-up requests; max 64 chars.
28  *   "log_xid"  : "123..32",     // optional log trace ID, max 64 chars;
29  *   "osrf_msg" : [<osrf_msg>, <osrf_msg>, ...]   // required
30  * }
31  *
32  * Each translator operates with two threads.  One thread receives messages
33  * from the websocket client, translates, and relays them to the opensrf 
34  * network. The second thread collects responses from the opensrf network and 
35  * relays them back to the websocket client.
36  *
37  * After the initial setup, all thread actions occur within a thread mutex.
38  * The desired affect is a non-threaded application that uses threads for 
39  * the sole purpose of having one thread listening for incoming data, while
40  * a second thread listens for responses.  When either thread awakens, it's
41  * the only thread in town until it goes back to sleep (i.e. listening on 
42  * its socket for data).
43  *
44  * Note that with a "thread", which allows us to identify the opensrf session,
45  * the caller does not need to provide a recipient address.  The "service" is
46  * only required to start a new opensrf session.  After the sesession is 
47  * started, all future communication is based solely on the thread.  However,
48  * the "service" should be passed by the caller for all requests to ensure it
49  * is properly logged in the activity log.
50  */
51
52 /**
53  * TODO:
54  * short-timeout mode for brick detachment where inactivity timeout drops way 
55  * down for graceful disconnects.
56  */
57
58 #include "httpd.h"
59 #include "apr_strings.h"
60 #include "apr_thread_proc.h"
61 #include "apr_hash.h"
62 #include "websocket_plugin.h"
63 #include "opensrf/log.h"
64 #include "opensrf/osrf_json.h"
65 #include "opensrf/transport_client.h"
66 #include "opensrf/transport_message.h"
67 #include "opensrf/osrf_system.h"                                                
68 #include "opensrf/osrfConfig.h"
69
70 #define MAX_THREAD_SIZE 64
71 #define RECIP_BUF_SIZE 128
72 #define WEBSOCKET_TRANSLATOR_INGRESS "ws-translator-v1"
73
74 typedef struct _osrfWebsocketTranslator {
75
76     /** Our handle for communicating with the caller */
77     const WebSocketServer *server;
78     
79     /**
80      * Standalone, per-process APR pool.  Primarily
81      * there for managing thread data, which lasts 
82      * the duration of the process.
83      */
84     apr_pool_t *main_pool;
85
86     /**
87      * Map of thread => drone-xmpp-address.  Maintaining this
88      * map internally means the caller never need know about
89      * internal XMPP addresses and the server doesn't have to 
90      * verify caller-specified recipient addresses.  It's
91      * all managed internally.
92      */
93     apr_hash_t *session_cache; 
94
95     /**
96      * session_pool contains the key/value pairs stored in
97      * the session_cache.  The pool is regularly destroyed
98      * and re-created to avoid long-term memory consumption
99      */
100     apr_pool_t *session_pool;
101
102     /**
103      * Thread responsible for collecting responses on the opensrf
104      * network and relaying them back to the caller
105      */
106     apr_thread_t *responder_thread;
107
108     /**
109      * All message handling code is wrapped in a thread mutex such
110      * that all actions (after the initial setup) are serialized
111      * to minimize the possibility of multi-threading snafus.
112      */
113     apr_thread_mutex_t *mutex;
114
115     /**
116      * True if a websocket client is currently connected
117      */
118     int client_connected;
119
120     /** OpenSRF jouter name */
121     char* osrf_router;
122
123     /** OpenSRF domain */
124     char* osrf_domain;
125
126 } osrfWebsocketTranslator;
127
128 static osrfWebsocketTranslator *trans = NULL;
129 static transport_client *osrf_handle = NULL;
130 static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
131
132 static void clear_cached_recipient(const char* thread) {
133     apr_pool_t *pool = NULL;                                                
134
135     if (apr_hash_get(trans->session_cache, thread, APR_HASH_KEY_STRING)) {
136
137         osrfLogDebug(OSRF_LOG_MARK, "WS removing cached recipient on disconnect");
138
139         // remove it from the hash
140         apr_hash_set(trans->session_cache, thread, APR_HASH_KEY_STRING, NULL);
141
142         if (apr_hash_count(trans->session_cache) == 0) {
143             osrfLogDebug(OSRF_LOG_MARK, "WS re-setting session_pool");
144
145             // memory accumulates in the session_pool as sessions are cached then 
146             // un-cached.  Un-caching removes strings from the hash, but not the 
147             // pool itself.  That only happens when the pool is destroyed. Here
148             // we destroy the session pool to clear any lingering memory, then
149             // re-create it for future caching.
150             apr_pool_destroy(trans->session_pool);
151     
152             if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
153                 osrfLogError(OSRF_LOG_MARK, "WS Unable to create session_pool");
154                 trans->session_pool = NULL;
155                 return;
156             }
157
158             trans->session_pool = pool;
159         }
160     }
161 }
162
163
164 void* osrf_responder_thread_main_body(transport_message *tmsg) {
165
166     osrfList *msg_list = NULL;
167     osrfMessage *one_msg = NULL;
168     int i;
169
170     osrfLogDebug(OSRF_LOG_MARK, 
171         "WS received opensrf response for thread=%s", tmsg->thread);
172
173     // first we need to perform some maintenance
174     msg_list = osrfMessageDeserialize(tmsg->body, NULL);
175
176     for (i = 0; i < msg_list->size; i++) {
177         one_msg = OSRF_LIST_GET_INDEX(msg_list, i);
178
179         osrfLogDebug(OSRF_LOG_MARK, 
180             "WS returned response of type %d", one_msg->m_type);
181
182         /*  if our client just successfully connected to an opensrf service,
183             cache the sender so that future calls on this thread will use
184             the correct recipient. */
185         if (one_msg && one_msg->m_type == STATUS) {
186
187
188             // only cache recipients if the client is still connected
189             if (trans->client_connected && 
190                     one_msg->status_code == OSRF_STATUS_OK) {
191
192                 if (!apr_hash_get(trans->session_cache, 
193                         tmsg->thread, APR_HASH_KEY_STRING)) {
194
195                     osrfLogDebug(OSRF_LOG_MARK, 
196                         "WS caching sender thread=%s, sender=%s", 
197                         tmsg->thread, tmsg->sender);
198
199                     apr_hash_set(trans->session_cache, 
200                         apr_pstrdup(trans->session_pool, tmsg->thread),
201                         APR_HASH_KEY_STRING, 
202                         apr_pstrdup(trans->session_pool, tmsg->sender));
203                 }
204
205             } else {
206
207                 // connection timed out; clear the cached recipient
208                 // regardless of whether the client is still connected
209                 if (one_msg->status_code == OSRF_STATUS_TIMEOUT)
210                     clear_cached_recipient(tmsg->thread);
211             }
212         }
213     }
214
215     // osrfMessageDeserialize applies the freeItem handler to the 
216     // newly created osrfList.  We only need to free the list and 
217     // the individual osrfMessage's will be freed along with it
218     osrfListFree(msg_list);
219
220     if (!trans->client_connected) {
221
222         osrfLogInfo(OSRF_LOG_MARK, 
223             "WS discarding response for thread=%s", tmsg->thread);
224
225         return;
226     }
227     
228     // client is still connected. 
229     // relay the response messages to the client
230     jsonObject *msg_wrapper = NULL;
231     char *msg_string = NULL;
232
233     // build the wrapper object
234     msg_wrapper = jsonNewObject(NULL);
235     jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
236     jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
237     jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
238
239     if (tmsg->is_error) {
240         osrfLogError(OSRF_LOG_MARK, 
241             "WS received jabber error message in response to thread=%s", 
242             tmsg->thread);
243         jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
244     }
245
246     msg_string = jsonObjectToJSONRaw(msg_wrapper);
247
248     // drop the JSON on the outbound wire
249     trans->server->send(trans->server, MESSAGE_TYPE_TEXT, 
250         (unsigned char*) msg_string, strlen(msg_string));
251
252     free(msg_string);
253     jsonObjectFree(msg_wrapper);
254
255 }
256
257 /**
258  * Responder thread main body.
259  * Collects responses from the opensrf network and relays them to the 
260  * websocket caller.
261  */
262 void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
263
264     transport_message *tmsg;
265     while (1) {
266
267         if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
268             osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
269             return NULL;
270         }
271
272         // wait for a response
273         tmsg = client_recv(osrf_handle, -1);
274
275         if (!tmsg) continue; // early exit on interrupt
276
277         if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
278             osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
279             return NULL;
280         }
281
282         osrfLogForceXid(tmsg->osrf_xid);
283         osrf_responder_thread_main_body(tmsg);
284         message_free(tmsg);                                                         
285     }
286
287     return NULL;
288 }
289
290
291
292 /**
293  * Connect to OpenSRF, create the main pool, responder thread
294  * session cache and session pool.
295  */
296 int child_init(const WebSocketServer *server) {
297
298     apr_pool_t *pool = NULL;                                                
299     apr_thread_t *thread = NULL;
300     apr_threadattr_t *thread_attr = NULL;
301     apr_thread_mutex_t *mutex = NULL;
302     request_rec *r = server->request(server);
303         
304     osrfLogDebug(OSRF_LOG_MARK, "WS child_init");
305
306     // osrf_handle will already be connected if this is not the first request
307     // served by this process.
308     if ( !(osrf_handle = osrfSystemGetTransportClient()) ) {
309         char* config_file = "/openils/conf/opensrf_core.xml";
310         char* config_ctx = "gateway"; //TODO config
311         if (!osrfSystemBootstrapClientResc(config_file, config_ctx, "websocket")) {   
312             osrfLogError(OSRF_LOG_MARK, 
313                 "WS unable to bootstrap OpenSRF client with config %s", config_file); 
314             return 1;
315         }
316
317         osrf_handle = osrfSystemGetTransportClient();
318     }
319
320     // create a standalone pool for our translator data
321     if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
322         osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
323         return 1;
324     }
325
326
327     // allocate our static translator instance
328     trans = (osrfWebsocketTranslator*) 
329         apr_palloc(pool, sizeof(osrfWebsocketTranslator));
330
331     if (trans == NULL) {
332         osrfLogError(OSRF_LOG_MARK, "WS Unable to create translator");
333         return 1;
334     }
335
336     trans->main_pool = pool;
337     trans->server = server;
338     trans->osrf_router = osrfConfigGetValue(NULL, "/router_name");                      
339     trans->osrf_domain = osrfConfigGetValue(NULL, "/domain");
340
341     trans->session_cache = apr_hash_make(pool);
342
343     if (trans->session_cache == NULL) {
344         osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache");
345         return 1;
346     }
347
348     // Create the responder thread.  Once created, 
349     // it runs for the lifetime of this process.
350     if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
351          (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
352          (apr_thread_create(&thread, thread_attr, 
353                 osrf_responder_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
354
355         trans->responder_thread = thread;
356         
357     } else {
358         osrfLogError(OSRF_LOG_MARK, "WS unable to create responder thread");
359         return 1;
360     }
361
362     if (apr_thread_mutex_create(
363             &mutex, APR_THREAD_MUTEX_UNNESTED, 
364             trans->main_pool) != APR_SUCCESS) {
365         osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex");
366         return 1;
367     }
368
369     trans->mutex = mutex;
370
371     return APR_SUCCESS;
372 }
373
374 /**
375  * Create the per-client translator
376  */
377 void* CALLBACK on_connect_handler(const WebSocketServer *server) {
378     request_rec *r = server->request(server);
379     apr_pool_t *pool;
380
381     osrfLogInfo(OSRF_LOG_MARK, 
382         "WS connect from %s", r->connection->remote_ip); 
383         //"WS connect from %s", r->connection->client_ip); // apache 2.4
384
385     if (!trans) {
386         // first connection
387         if (child_init(server) != APR_SUCCESS) {
388             return NULL;
389         }
390     }
391
392     // create a standalone pool for the session cache values
393     // this pool will be destroyed and re-created regularly
394     // to clear session memory
395     if (apr_pool_create(&pool, r->pool) != APR_SUCCESS) {
396         osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
397         return NULL;
398     }
399
400     trans->session_pool = pool;
401     trans->client_connected = 1;
402     return trans;
403 }
404
405
406 /** 
407  * for each inbound opensrf message:
408  * 1. Stamp the ingress
409  * 2. REQUEST: log it as activity
410  * 3. DISCONNECT: remove the cached recipient
411  * then re-string-ify for xmpp delivery
412  */
413
414 static char* extract_inbound_messages(
415         const request_rec *r, 
416         const char* service, 
417         const char* thread, 
418         const char* recipient, 
419         const jsonObject *osrf_msg) {
420
421     int i;
422     int num_msgs = osrf_msg->size;
423     osrfMessage* msg;
424     osrfMessage* msg_list[num_msgs];
425
426     // here we do an extra json round-trip to get the data
427     // in a form osrf_message_deserialize can understand
428     char *osrf_msg_json = jsonObjectToJSON(osrf_msg);
429     osrf_message_deserialize(osrf_msg_json, msg_list, num_msgs);
430     free(osrf_msg_json);
431
432     // should we require the caller to always pass the service?
433     if (service == NULL) service = "";
434
435     for(i = 0; i < num_msgs; i++) {
436         msg = msg_list[i];
437         osrfMessageSetIngress(msg, WEBSOCKET_TRANSLATOR_INGRESS);
438
439         switch(msg->m_type) {
440
441             case REQUEST: {
442                 const jsonObject* params = msg->_params;
443                 growing_buffer* act = buffer_init(128);
444                 char* method = msg->method_name;
445                 buffer_fadd(act, "[%s] [%s] %s %s", 
446                     r->connection->remote_ip, "", service, method);
447
448                 const jsonObject* obj = NULL;
449                 int i = 0;
450                 const char* str;
451                 int redactParams = 0;
452                 while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
453                     if(!strncmp(method, str, strlen(str))) {
454                         redactParams = 1;
455                         break;
456                     }
457                 }
458                 if(redactParams) {
459                     OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
460                 } else {
461                     i = 0;
462                     while((obj = jsonObjectGetIndex(params, i++))) {
463                         char* str = jsonObjectToJSON(obj);
464                         if( i == 1 )
465                             OSRF_BUFFER_ADD(act, " ");
466                         else
467                             OSRF_BUFFER_ADD(act, ", ");
468                         OSRF_BUFFER_ADD(act, str);
469                         free(str);
470                     }
471                 }
472                 osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
473                 buffer_free(act);
474                 break;
475             }
476
477             case DISCONNECT:
478                 clear_cached_recipient(thread);
479                 break;
480         }
481     }
482
483     char* finalMsg = osrfMessageSerializeBatch(msg_list, num_msgs);
484
485     // clean up our messages
486     for(i = 0; i < num_msgs; i++) 
487         osrfMessageFree(msg_list[i]);
488
489     return finalMsg;
490 }
491
492 /**
493  * Parse opensrf request and relay the request to the opensrf network.
494  */
495 static size_t on_message_handler_body(void *data,
496                 const WebSocketServer *server, const int type, 
497                 unsigned char *buffer, const size_t buffer_size) {
498
499     request_rec *r = server->request(server);
500
501     jsonObject *msg_wrapper = NULL; // free me
502     const jsonObject *tmp_obj = NULL;
503     const jsonObject *osrf_msg = NULL;
504     const char *service = NULL;
505     const char *thread = NULL;
506     const char *log_xid = NULL;
507     char *msg_body = NULL;
508     char *recipient = NULL;
509     int i;
510
511     if (buffer_size <= 0) return OK;
512
513     // generate a new log trace for this request. it 
514     // may be replaced by a client-provided trace below.
515     osrfLogMkXid();
516
517     osrfLogDebug(OSRF_LOG_MARK, "WS received message size=%d", buffer_size);
518
519     // buffer may not be \0-terminated, which jsonParse requires
520     char buf[buffer_size + 1];
521     memcpy(buf, buffer, buffer_size);
522     buf[buffer_size] = '\0';
523
524     msg_wrapper = jsonParse(buf);
525
526     if (msg_wrapper == NULL) {
527         osrfLogWarning(OSRF_LOG_MARK, "WS Invalid JSON: %s", buf);
528         return HTTP_BAD_REQUEST;
529     }
530
531     osrf_msg = jsonObjectGetKeyConst(msg_wrapper, "osrf_msg");
532
533     if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "service")) 
534         service = jsonObjectGetString(tmp_obj);
535
536     if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "thread")) 
537         thread = jsonObjectGetString(tmp_obj);
538
539     if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "log_xid")) 
540         log_xid = jsonObjectGetString(tmp_obj);
541
542     if (log_xid) {
543
544         // use the caller-provide log trace id
545         if (strlen(log_xid) > MAX_THREAD_SIZE) {
546             osrfLogWarning(OSRF_LOG_MARK, "WS log_xid exceeds max length");
547             return HTTP_BAD_REQUEST;
548         }
549
550         osrfLogForceXid(log_xid);
551     }
552
553     if (thread) {
554
555         if (strlen(thread) > MAX_THREAD_SIZE) {
556             osrfLogWarning(OSRF_LOG_MARK, "WS thread exceeds max length");
557             return HTTP_BAD_REQUEST;
558         }
559
560         // since clients can provide their own threads at session start time,
561         // the presence of a thread does not guarantee a cached recipient
562         recipient = (char*) apr_hash_get(
563             trans->session_cache, thread, APR_HASH_KEY_STRING);
564
565         if (recipient) {
566             osrfLogDebug(OSRF_LOG_MARK, "WS found cached recipient %s", recipient);
567         }
568     }
569
570     if (!recipient) {
571
572         if (service) {
573             int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1,
574                 "%s@%s/%s", trans->osrf_router, trans->osrf_domain, service);                                    
575             recipient_buf[size] = '\0';                                          
576             recipient = recipient_buf;
577
578         } else {
579             osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient");
580             return HTTP_BAD_REQUEST;
581         }
582     }
583
584     osrfLogDebug(OSRF_LOG_MARK, 
585         "WS relaying message to opensrf thread=%s, recipient=%s", 
586             thread, recipient);
587
588     msg_body = extract_inbound_messages(
589         r, service, thread, recipient, osrf_msg);
590
591     transport_message *tmsg = message_init(
592         msg_body, NULL, thread, recipient, NULL);
593
594     message_set_osrf_xid(tmsg, osrfLogGetXid());
595     client_send_message(osrf_handle, tmsg);
596
597
598     osrfLogClearXid();
599     message_free(tmsg);                                                         
600     jsonObjectFree(msg_wrapper);
601     free(msg_body);
602
603     return OK;
604 }
605
606 static size_t CALLBACK on_message_handler(void *data,
607                 const WebSocketServer *server, const int type, 
608                 unsigned char *buffer, const size_t buffer_size) {
609
610     if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
611         osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
612         return 1; // TODO: map to apr_status_t value?
613     }
614
615     apr_status_t stat = on_message_handler_body(data, server, type, buffer, buffer_size);
616
617     if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
618         osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
619         return 1;
620     }
621
622     return stat;
623 }
624
625
626 /**
627  * Clear the session cache, release the session pool
628  */
629 void CALLBACK on_disconnect_handler(
630     void *data, const WebSocketServer *server) {
631
632     osrfWebsocketTranslator *trans = (osrfWebsocketTranslator*) data;
633     trans->client_connected = 0;
634     
635     // ensure no errant session data is sticking around
636     apr_hash_clear(trans->session_cache);
637
638     // strictly speaking, this pool will get destroyed when
639     // r->pool is destroyed, but it doesn't hurt to explicitly
640     // destroy it ourselves.
641     apr_pool_destroy(trans->session_pool);
642     trans->session_pool = NULL;
643
644     request_rec *r = server->request(server);
645
646     osrfLogInfo(OSRF_LOG_MARK, 
647         "WS disconnect from %s", r->connection->remote_ip); 
648         //"WS disconnect from %s", r->connection->client_ip); // apache 2.4
649 }
650
651 /**
652  * Be nice and clean up our mess
653  */
654 void CALLBACK on_destroy_handler(WebSocketPlugin *plugin) {
655     if (trans) {
656         apr_thread_exit(trans->responder_thread, APR_SUCCESS);
657         apr_thread_mutex_destroy(trans->mutex);
658         apr_pool_destroy(trans->session_pool);
659         apr_pool_destroy(trans->main_pool);
660     }
661
662     trans = NULL;
663 }
664
665 static WebSocketPlugin osrf_websocket_plugin = {
666     sizeof(WebSocketPlugin),
667     WEBSOCKET_PLUGIN_VERSION_0,
668     on_destroy_handler,
669     on_connect_handler,
670     on_message_handler,
671     on_disconnect_handler
672 };
673
674 extern EXPORT WebSocketPlugin * CALLBACK osrf_websocket_init() {
675     return &osrf_websocket_plugin;
676 }
677