LP#1268619: websockets; free temporary osrf msgs; minor comment change
[OpenSRF.git] / src / gateway / osrf_websocket_translator.c
1 /* -----------------------------------------------------------------------
2  * Copyright 2012 Equinox Software, Inc.
3  * Bill Erickson <berick@esilibrary.com>
4  *
5  * This program is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU General Public License
7  * as published by the Free Software Foundation; either version 2
8  * of the License, or (at your option) any later version.
9  * 
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  * -----------------------------------------------------------------------
15  */
16
17 /**
18  * websocket <-> opensrf gateway.  Wrapped opensrf messages are extracted
19  * and relayed to the opensrf network.  Responses are pulled from the opensrf
20  * network and passed back to the client.  Messages are analyzed to determine
21  * when a connect/disconnect occurs, so that the cache of recipients can be
22  * properly managed.  We also activity-log REQUEST messages.
23  *
24  * Messages to/from the websocket client take the following form:
25  * {
26  *   "service"  : "opensrf.foo", // required
27  *   "thread"   : "123454321",   // AKA thread. required for follow-up requests; max 64 chars.
28  *   "log_xid"  : "123..32",     // optional log trace ID, max 64 chars;
29  *   "osrf_msg" : [<osrf_msg>, <osrf_msg>, ...]   // required
30  * }
31  *
32  * Each translator operates with two threads.  One thread receives messages
33  * from the websocket client, translates, and relays them to the opensrf 
34  * network. The second thread collects responses from the opensrf network and 
35  * relays them back to the websocket client.
36  *
37  * After the initial setup, all thread actions occur within a thread mutex.
38  * The desired affect is a non-threaded application that uses threads for 
39  * the sole purpose of having one thread listening for incoming data, while
40  * a second thread listens for responses.  When either thread awakens, it's
41  * the only thread in town until it goes back to sleep (i.e. listening on 
42  * its socket for data).
43  *
44  * Note that with a "thread", which allows us to identify the opensrf session,
45  * the caller does not need to provide a recipient address.  The "service" is
46  * only required to start a new opensrf session.  After the sesession is 
47  * started, all future communication is based solely on the thread.  However,
48  * the "service" should be passed by the caller for all requests to ensure it
49  * is properly logged in the activity log.
50  */
51
52 /**
53  * TODO:
54  * short-timeout mode for brick detachment where inactivity timeout drops way 
55  * down for graceful disconnects.
56  */
57
58 #include "httpd.h"
59 #include "apr_strings.h"
60 #include "apr_thread_proc.h"
61 #include "apr_hash.h"
62 #include "websocket_plugin.h"
63 #include "opensrf/log.h"
64 #include "opensrf/osrf_json.h"
65 #include "opensrf/transport_client.h"
66 #include "opensrf/transport_message.h"
67 #include "opensrf/osrf_system.h"                                                
68 #include "opensrf/osrfConfig.h"
69
70 #define MAX_THREAD_SIZE 64
71 #define RECIP_BUF_SIZE 128
72 #define WEBSOCKET_TRANSLATOR_INGRESS "ws-translator-v1"
73
74 typedef struct _osrfWebsocketTranslator {
75
76     /** Our handle for communicating with the caller */
77     const WebSocketServer *server;
78     
79     /**
80      * Standalone, per-process APR pool.  Primarily
81      * there for managing thread data, which lasts 
82      * the duration of the process.
83      */
84     apr_pool_t *main_pool;
85
86     /**
87      * Map of thread => drone-xmpp-address.  Maintaining this
88      * map internally means the caller never need know about
89      * internal XMPP addresses and the server doesn't have to 
90      * verify caller-specified recipient addresses.  It's
91      * all managed internally.
92      */
93     apr_hash_t *session_cache; 
94
95     /**
96      * session_pool contains the key/value pairs stored in
97      * the session_cache.  The pool is regularly destroyed
98      * and re-created to avoid long-term memory consumption
99      */
100     apr_pool_t *session_pool;
101
102     /**
103      * Thread responsible for collecting responses on the opensrf
104      * network and relaying them back to the caller
105      */
106     apr_thread_t *responder_thread;
107
108     /**
109      * All message handling code is wrapped in a thread mutex such
110      * that all actions (after the initial setup) are serialized
111      * to minimize the possibility of multi-threading snafus.
112      */
113     apr_thread_mutex_t *mutex;
114
115     /**
116      * True if a websocket client is currently connected
117      */
118     int client_connected;
119
120     /** OpenSRF jouter name */
121     char* osrf_router;
122
123     /** OpenSRF domain */
124     char* osrf_domain;
125
126 } osrfWebsocketTranslator;
127
128 static osrfWebsocketTranslator *trans = NULL;
129 static transport_client *osrf_handle = NULL;
130 static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
131
132 static void clear_cached_recipient(const char* thread) {
133     apr_pool_t *pool = NULL;                                                
134
135     if (apr_hash_get(trans->session_cache, thread, APR_HASH_KEY_STRING)) {
136
137         osrfLogDebug(OSRF_LOG_MARK, "WS removing cached recipient on disconnect");
138
139         // remove it from the hash
140         apr_hash_set(trans->session_cache, thread, APR_HASH_KEY_STRING, NULL);
141
142         if (apr_hash_count(trans->session_cache) == 0) {
143             osrfLogDebug(OSRF_LOG_MARK, "WS re-setting session_pool");
144
145             // memory accumulates in the session_pool as sessions are cached then 
146             // un-cached.  Un-caching removes strings from the hash, but not the 
147             // pool itself.  That only happens when the pool is destroyed. Here
148             // we destroy the session pool to clear any lingering memory, then
149             // re-create it for future caching.
150             apr_pool_destroy(trans->session_pool);
151     
152             if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
153                 osrfLogError(OSRF_LOG_MARK, "WS Unable to create session_pool");
154                 trans->session_pool = NULL;
155                 return;
156             }
157
158             trans->session_pool = pool;
159         }
160     }
161 }
162
163
164 void* osrf_responder_thread_main_body(transport_message *tmsg) {
165
166     osrfList *msg_list = NULL;
167     osrfMessage *one_msg = NULL;
168     int i;
169
170     osrfLogDebug(OSRF_LOG_MARK, 
171         "WS received opensrf response for thread=%s, xid=%s", 
172             tmsg->thread, tmsg->osrf_xid);
173
174     // first we need to perform some maintenance
175     msg_list = osrfMessageDeserialize(tmsg->body, NULL);
176
177     for (i = 0; i < msg_list->size; i++) {
178         one_msg = OSRF_LIST_GET_INDEX(msg_list, i);
179
180         osrfLogDebug(OSRF_LOG_MARK, 
181             "WS returned response of type %d", one_msg->m_type);
182
183         /*  if our client just successfully connected to an opensrf service,
184             cache the sender so that future calls on this thread will use
185             the correct recipient. */
186         if (one_msg && one_msg->m_type == STATUS) {
187
188
189             // only cache recipients if the client is still connected
190             if (trans->client_connected && 
191                     one_msg->status_code == OSRF_STATUS_OK) {
192
193                 if (!apr_hash_get(trans->session_cache, 
194                         tmsg->thread, APR_HASH_KEY_STRING)) {
195
196                     osrfLogDebug(OSRF_LOG_MARK, 
197                         "WS caching sender thread=%s, sender=%s", 
198                         tmsg->thread, tmsg->sender);
199
200                     apr_hash_set(trans->session_cache, 
201                         apr_pstrdup(trans->session_pool, tmsg->thread),
202                         APR_HASH_KEY_STRING, 
203                         apr_pstrdup(trans->session_pool, tmsg->sender));
204                 }
205
206             } else {
207
208                 // connection timed out; clear the cached recipient
209                 // regardless of whether the client is still connected
210                 if (one_msg->status_code == OSRF_STATUS_TIMEOUT)
211                     clear_cached_recipient(tmsg->thread);
212             }
213         }
214     }
215
216     // maintenance is done
217     msg_list->freeItem = osrfMessageFree;
218     osrfListFree(msg_list);
219
220     if (!trans->client_connected) {
221
222         osrfLogDebug(OSRF_LOG_MARK, 
223             "WS discarding response for thread=%s, xid=%s", 
224             tmsg->thread, tmsg->osrf_xid);
225
226         return;
227     }
228
229     
230     // client is still connected. 
231     // relay the response messages to the client
232     jsonObject *msg_wrapper = NULL;
233     char *msg_string = NULL;
234
235     // build the wrapper object
236     msg_wrapper = jsonNewObject(NULL);
237     jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
238     jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
239     jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
240
241     if (tmsg->is_error) {
242         osrfLogError(OSRF_LOG_MARK, 
243             "WS received jabber error message in response to thread=%s and xid=%s", 
244             tmsg->thread, tmsg->osrf_xid);
245         jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
246     }
247
248     msg_string = jsonObjectToJSONRaw(msg_wrapper);
249
250     // drop the JSON on the outbound wire
251     trans->server->send(trans->server, MESSAGE_TYPE_TEXT, 
252         (unsigned char*) msg_string, strlen(msg_string));
253
254     free(msg_string);
255     jsonObjectFree(msg_wrapper);
256
257 }
258
259 /**
260  * Responder thread main body.
261  * Collects responses from the opensrf network and relays them to the 
262  * websocket caller.
263  */
264 void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
265
266     transport_message *tmsg;
267     while (1) {
268
269         if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
270             osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
271             return NULL;
272         }
273
274         // wait for a response
275         tmsg = client_recv(osrf_handle, -1);
276
277         if (!tmsg) continue; // early exit on interrupt
278
279         if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
280             osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
281             return NULL;
282         }
283
284         osrf_responder_thread_main_body(tmsg);
285         message_free(tmsg);                                                         
286     }
287
288     return NULL;
289 }
290
291
292
293 /**
294  * Connect to OpenSRF, create the main pool, responder thread
295  * session cache and session pool.
296  */
297 int child_init(const WebSocketServer *server) {
298
299     apr_pool_t *pool = NULL;                                                
300     apr_thread_t *thread = NULL;
301     apr_threadattr_t *thread_attr = NULL;
302     apr_thread_mutex_t *mutex = NULL;
303     request_rec *r = server->request(server);
304         
305     osrfLogDebug(OSRF_LOG_MARK, "WS child_init");
306
307     // osrf_handle will already be connected if this is not the first request
308     // served by this process.
309     if ( !(osrf_handle = osrfSystemGetTransportClient()) ) {
310         char* config_file = "/openils/conf/opensrf_core.xml";
311         char* config_ctx = "gateway"; //TODO config
312         if (!osrfSystemBootstrapClientResc(config_file, config_ctx, "websocket")) {   
313             osrfLogError(OSRF_LOG_MARK, 
314                 "WS unable to bootstrap OpenSRF client with config %s", config_file); 
315             return 1;
316         }
317
318         osrf_handle = osrfSystemGetTransportClient();
319     }
320
321     // create a standalone pool for our translator data
322     if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
323         osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
324         return 1;
325     }
326
327
328     // allocate our static translator instance
329     trans = (osrfWebsocketTranslator*) 
330         apr_palloc(pool, sizeof(osrfWebsocketTranslator));
331
332     if (trans == NULL) {
333         osrfLogError(OSRF_LOG_MARK, "WS Unable to create translator");
334         return 1;
335     }
336
337     trans->main_pool = pool;
338     trans->server = server;
339     trans->osrf_router = osrfConfigGetValue(NULL, "/router_name");                      
340     trans->osrf_domain = osrfConfigGetValue(NULL, "/domain");
341
342     trans->session_cache = apr_hash_make(pool);
343
344     if (trans->session_cache == NULL) {
345         osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache");
346         return 1;
347     }
348
349     // Create the responder thread.  Once created, 
350     // it runs for the lifetime of this process.
351     if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
352          (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
353          (apr_thread_create(&thread, thread_attr, 
354                 osrf_responder_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
355
356         trans->responder_thread = thread;
357         
358     } else {
359         osrfLogError(OSRF_LOG_MARK, "WS unable to create responder thread");
360         return 1;
361     }
362
363     if (apr_thread_mutex_create(
364             &mutex, APR_THREAD_MUTEX_UNNESTED, 
365             trans->main_pool) != APR_SUCCESS) {
366         osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex");
367         return 1;
368     }
369
370     trans->mutex = mutex;
371
372     return APR_SUCCESS;
373 }
374
375 /**
376  * Create the per-client translator
377  */
378 void* CALLBACK on_connect_handler(const WebSocketServer *server) {
379     request_rec *r = server->request(server);
380     apr_pool_t *pool;
381
382     osrfLogDebug(OSRF_LOG_MARK, 
383         "WS connect from %s", r->connection->remote_ip); 
384         //"WS connect from %s", r->connection->client_ip); // apache 2.4
385
386     if (!trans) {
387         // first connection
388         if (child_init(server) != APR_SUCCESS) {
389             return NULL;
390         }
391     }
392
393     // create a standalone pool for the session cache values
394     // this pool will be destroyed and re-created regularly
395     // to clear session memory
396     if (apr_pool_create(&pool, r->pool) != APR_SUCCESS) {
397         osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
398         return NULL;
399     }
400
401     trans->session_pool = pool;
402     trans->client_connected = 1;
403     return trans;
404 }
405
406
407 /** 
408  * for each inbound opensrf message:
409  * 1. Stamp the ingress
410  * 2. REQUEST: log it as activity
411  * 3. DISCONNECT: remove the cached recipient
412  * then re-string-ify for xmpp delivery
413  */
414
415 static char* extract_inbound_messages(
416         const request_rec *r, 
417         const char* service, 
418         const char* thread, 
419         const char* recipient, 
420         const jsonObject *osrf_msg) {
421
422     int i;
423     int num_msgs = osrf_msg->size;
424     osrfMessage* msg;
425     osrfMessage* msg_list[num_msgs];
426
427     // here we do an extra json round-trip to get the data
428     // in a form osrf_message_deserialize can understand
429     char *osrf_msg_json = jsonObjectToJSON(osrf_msg);
430     osrf_message_deserialize(osrf_msg_json, msg_list, num_msgs);
431     free(osrf_msg_json);
432
433     // should we require the caller to always pass the service?
434     if (service == NULL) service = "";
435
436     for(i = 0; i < num_msgs; i++) {
437         msg = msg_list[i];
438         osrfMessageSetIngress(msg, WEBSOCKET_TRANSLATOR_INGRESS);
439
440         switch(msg->m_type) {
441
442             case REQUEST: {
443                 const jsonObject* params = msg->_params;
444                 growing_buffer* act = buffer_init(128);
445                 char* method = msg->method_name;
446                 buffer_fadd(act, "[%s] [%s] %s %s", 
447                     r->connection->remote_ip, "", service, method);
448
449                 const jsonObject* obj = NULL;
450                 int i = 0;
451                 const char* str;
452                 int redactParams = 0;
453                 while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
454                     if(!strncmp(method, str, strlen(str))) {
455                         redactParams = 1;
456                         break;
457                     }
458                 }
459                 if(redactParams) {
460                     OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
461                 } else {
462                     i = 0;
463                     while((obj = jsonObjectGetIndex(params, i++))) {
464                         char* str = jsonObjectToJSON(obj);
465                         if( i == 1 )
466                             OSRF_BUFFER_ADD(act, " ");
467                         else
468                             OSRF_BUFFER_ADD(act, ", ");
469                         OSRF_BUFFER_ADD(act, str);
470                         free(str);
471                     }
472                 }
473                 osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
474                 buffer_free(act);
475                 break;
476             }
477
478             case DISCONNECT:
479                 clear_cached_recipient(thread);
480                 break;
481         }
482
483         osrfMessageFree(msg);
484     }
485
486     char* finalMsg = osrfMessageSerializeBatch(msg_list, num_msgs);
487
488     // clean up our messages
489     for(i = 0; i < num_msgs; i++) 
490         osrfMessageFree(msg_list[i]);
491
492     return finalMsg;
493 }
494
495 /**
496  * Parse opensrf request and relay the request to the opensrf network.
497  */
498 static size_t on_message_handler_body(void *data,
499                 const WebSocketServer *server, const int type, 
500                 unsigned char *buffer, const size_t buffer_size) {
501
502     request_rec *r = server->request(server);
503
504     jsonObject *msg_wrapper = NULL; // free me
505     const jsonObject *tmp_obj = NULL;
506     const jsonObject *osrf_msg = NULL;
507     const char *service = NULL;
508     const char *thread = NULL;
509     const char *log_xid = NULL;
510     char *msg_body = NULL;
511     char *recipient = NULL;
512     int i;
513
514     if (buffer_size <= 0) return OK;
515
516     osrfLogDebug(OSRF_LOG_MARK, "WS received message size=%d", buffer_size);
517
518     // buffer may not be \0-terminated, which jsonParse requires
519     char buf[buffer_size + 1];
520     memcpy(buf, buffer, buffer_size);
521     buf[buffer_size] = '\0';
522
523     msg_wrapper = jsonParse(buf);
524
525     if (msg_wrapper == NULL) {
526         osrfLogWarning(OSRF_LOG_MARK, "WS Invalid JSON: %s", buf);
527         return HTTP_BAD_REQUEST;
528     }
529
530     osrf_msg = jsonObjectGetKeyConst(msg_wrapper, "osrf_msg");
531
532     if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "service")) 
533         service = jsonObjectGetString(tmp_obj);
534
535     if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "thread")) 
536         thread = jsonObjectGetString(tmp_obj);
537
538     if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "log_xid")) 
539         log_xid = jsonObjectGetString(tmp_obj);
540
541     if (log_xid) {
542
543         // use the caller-provide log trace id
544         if (strlen(log_xid) > MAX_THREAD_SIZE) {
545             osrfLogWarning(OSRF_LOG_MARK, "WS log_xid exceeds max length");
546             return HTTP_BAD_REQUEST;
547         }
548
549         // TODO: make this work with non-client and make this call accept 
550         // const char*'s.  casting to (char*) for now to silence warnings.
551         osrfLogSetXid((char*) log_xid); 
552
553     } else {
554         // generate a new log trace id for this relay
555         osrfLogMkXid();
556     }
557
558     if (thread) {
559
560         if (strlen(thread) > MAX_THREAD_SIZE) {
561             osrfLogWarning(OSRF_LOG_MARK, "WS thread exceeds max length");
562             return HTTP_BAD_REQUEST;
563         }
564
565         // since clients can provide their own threads at session start time,
566         // the presence of a thread does not guarantee a cached recipient
567         recipient = (char*) apr_hash_get(
568             trans->session_cache, thread, APR_HASH_KEY_STRING);
569
570         if (recipient) {
571             osrfLogDebug(OSRF_LOG_MARK, "WS found cached recipient %s", recipient);
572         }
573     }
574
575     if (!recipient) {
576
577         if (service) {
578             int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1,
579                 "%s@%s/%s", trans->osrf_router, trans->osrf_domain, service);                                    
580             recipient_buf[size] = '\0';                                          
581             recipient = recipient_buf;
582
583         } else {
584             osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient");
585             return HTTP_BAD_REQUEST;
586         }
587     }
588
589     osrfLogDebug(OSRF_LOG_MARK, 
590         "WS relaying message thread=%s, xid=%s, recipient=%s", 
591             thread, osrfLogGetXid(), recipient);
592
593     msg_body = extract_inbound_messages(
594         r, service, thread, recipient, osrf_msg);
595
596     transport_message *tmsg = message_init(
597         msg_body, NULL, thread, recipient, NULL);
598
599     message_set_osrf_xid(tmsg, osrfLogGetXid());
600     client_send_message(osrf_handle, tmsg);
601
602
603     osrfLogClearXid();
604     message_free(tmsg);                                                         
605     jsonObjectFree(msg_wrapper);
606     free(msg_body);
607
608     return OK;
609 }
610
611 static size_t CALLBACK on_message_handler(void *data,
612                 const WebSocketServer *server, const int type, 
613                 unsigned char *buffer, const size_t buffer_size) {
614
615     if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
616         osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
617         return 1; // TODO: map to apr_status_t value?
618     }
619
620     apr_status_t stat = on_message_handler_body(data, server, type, buffer, buffer_size);
621
622     if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
623         osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
624         return 1;
625     }
626
627     return stat;
628 }
629
630
631 /**
632  * Clear the session cache, release the session pool
633  */
634 void CALLBACK on_disconnect_handler(
635     void *data, const WebSocketServer *server) {
636
637     osrfWebsocketTranslator *trans = (osrfWebsocketTranslator*) data;
638     trans->client_connected = 0;
639     
640     // ensure no errant session data is sticking around
641     apr_hash_clear(trans->session_cache);
642
643     // strictly speaking, this pool will get destroyed when
644     // r->pool is destroyed, but it doesn't hurt to explicitly
645     // destroy it ourselves.
646     apr_pool_destroy(trans->session_pool);
647     trans->session_pool = NULL;
648
649     request_rec *r = server->request(server);
650
651     osrfLogDebug(OSRF_LOG_MARK, 
652         "WS disconnect from %s", r->connection->remote_ip); 
653         //"WS disconnect from %s", r->connection->client_ip); // apache 2.4
654 }
655
656 /**
657  * Be nice and clean up our mess
658  */
659 void CALLBACK on_destroy_handler(WebSocketPlugin *plugin) {
660     if (trans) {
661         apr_thread_exit(trans->responder_thread, APR_SUCCESS);
662         apr_thread_mutex_destroy(trans->mutex);
663         apr_pool_destroy(trans->session_pool);
664         apr_pool_destroy(trans->main_pool);
665     }
666
667     trans = NULL;
668 }
669
670 static WebSocketPlugin osrf_websocket_plugin = {
671     sizeof(WebSocketPlugin),
672     WEBSOCKET_PLUGIN_VERSION_0,
673     on_destroy_handler,
674     on_connect_handler,
675     on_message_handler,
676     on_disconnect_handler
677 };
678
679 extern EXPORT WebSocketPlugin * CALLBACK osrf_websocket_init() {
680     return &osrf_websocket_plugin;
681 }
682