35f986d84d23658e9b5a5cd1903883146fe3a800
[OpenSRF.git] / src / gateway / osrf_websocket_translator.c
1 /* -----------------------------------------------------------------------
2  * Copyright 2012 Equinox Software, Inc.
3  * Bill Erickson <berick@esilibrary.com>
4  *
5  * This program is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU General Public License
7  * as published by the Free Software Foundation; either version 2
8  * of the License, or (at your option) any later version.
9  * 
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  * -----------------------------------------------------------------------
15  */
16
17 /**
18  * Dumb websocket <-> opensrf gateway.  Wrapped opensrf messages are extracted
19  * and relayed to the opensrf network.  Responses are pulled from the opensrf
20  * network and passed back to the client.  No attempt is made to understand
21  * the contents of the messages.
22  *
23  * Messages to/from the websocket client take the following form:
24  * {
25  *   "service"  : "opensrf.foo", // required for new sessions (inbound only)
26  *   "thread"   : "123454321",   // AKA thread. required for follow-up requests; max 64 chars.
27  *   "log_xid"  : "123..32",     // optional log trace ID, max 64 chars;
28  *   "osrf_msg" : {<osrf_msg>}   // required
29  * }
30  *
31  * Each translator operates with two threads.  One thread receives messages
32  * from the websocket client, translates, and relays them to the opensrf 
33  * network. The second thread collects responses from the opensrf network and 
34  * relays them back to the websocket client.
35  *
36  * The main thread reads from socket A (apache) and writes to socket B 
37  * (openesrf), while the responder thread reads from B and writes to A.  The 
38  * apr data structures used are threadsafe.  For now, no thread mutex's are 
39  * used.
40  *
41  * Note that with a "thread", which allows us to identify the opensrf session,
42  * the caller does not need to provide a recipient address.  The "service" is
43  * only required to start a new opensrf session.  After the sesession is 
44  * started, all future communication is based solely on the thread.  
45  *
46  * We use jsonParseRaw and jsonObjectToJSONRaw since this service does not care 
47  * about the contents of the messages.
48  */
49
50 /**
51  * TODO:
52  * short-timeout mode for brick detachment where inactivity timeout drops way 
53  * down for graceful disconnects.
54  */
55
56 #include "httpd.h"
57 #include "apr_strings.h"
58 #include "apr_thread_proc.h"
59 #include "apr_hash.h"
60 #include "websocket_plugin.h"
61 #include "opensrf/log.h"
62 #include "opensrf/osrf_json.h"
63 #include "opensrf/transport_client.h"
64 #include "opensrf/transport_message.h"
65 #include "opensrf/osrf_system.h"                                                
66 #include "opensrf/osrfConfig.h"
67
68 #define MAX_THREAD_SIZE 64
69 #define RECIP_BUF_SIZE 128
70 #define WEBSOCKET_TRANSLATOR_INGRESS "ws-translator-v1"
71
72 typedef struct _osrfWebsocketTranslator {
73     const WebSocketServer *server;
74     apr_pool_t *main_pool; // standalone per-process pool
75     apr_pool_t *session_pool; // child of r->pool; per-session
76     apr_hash_t *session_cache; 
77     apr_thread_t *responder_thread;
78     apr_thread_mutex_t *mutex;
79     int client_connected;
80     char* osrf_router;
81     char* osrf_domain;
82 } osrfWebsocketTranslator;
83
84 static osrfWebsocketTranslator *trans = NULL;
85 static transport_client *osrf_handle = NULL;
86 static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
87
88
89 static void clear_cached_recipient(const char* thread) {
90     apr_pool_t *pool = NULL;                                                
91
92     if (apr_hash_get(trans->session_cache, thread, APR_HASH_KEY_STRING)) {
93
94         osrfLogDebug(OSRF_LOG_MARK, "WS removing cached recipient on disconnect");
95
96         // remove it from the hash
97         apr_hash_set(trans->session_cache, thread, APR_HASH_KEY_STRING, NULL);
98
99         if (apr_hash_count(trans->session_cache) == 0) {
100             osrfLogDebug(OSRF_LOG_MARK, "WS re-setting session_pool");
101
102             // memory accumulates in the session_pool as sessions are cached then 
103             // un-cached.  Un-caching removes strings from the hash, but not the 
104             // pool itself.  That only happens when the pool is destroyed. destroy 
105             // the session pool to clear any lingering memory
106             apr_pool_destroy(trans->session_pool);
107     
108             // create a standalone pool for our translator data
109             if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
110                 osrfLogError(OSRF_LOG_MARK, "WS Unable to create session_pool");
111                 trans->session_pool = NULL;
112                 return;
113             }
114
115             trans->session_pool = pool;
116         }
117     }
118 }
119
120
121
122 void* osrf_responder_thread_main_body(transport_message *tmsg) {
123
124     osrfList *msg_list = NULL;
125     osrfMessage *one_msg = NULL;
126     int i;
127
128     osrfLogDebug(OSRF_LOG_MARK, 
129         "WS received opensrf response for thread=%s, xid=%s", 
130             tmsg->thread, tmsg->osrf_xid);
131
132     // first we need to perform some maintenance
133     msg_list = osrfMessageDeserialize(tmsg->body, NULL);
134
135     for (i = 0; i < msg_list->size; i++) {
136         one_msg = OSRF_LIST_GET_INDEX(msg_list, i);
137
138         osrfLogDebug(OSRF_LOG_MARK, 
139             "WS returned response of type %d", one_msg->m_type);
140
141         /*  if our client just successfully connected to an opensrf service,
142             cache the sender so that future calls on this thread will use
143             the correct recipient. */
144         if (one_msg && one_msg->m_type == STATUS) {
145
146
147             // only cache recipients if the client is still connected
148             if (trans->client_connected && 
149                     one_msg->status_code == OSRF_STATUS_OK) {
150
151                 if (!apr_hash_get(trans->session_cache, 
152                         tmsg->thread, APR_HASH_KEY_STRING)) {
153
154                     osrfLogDebug(OSRF_LOG_MARK, 
155                         "WS caching sender thread=%s, sender=%s", 
156                         tmsg->thread, tmsg->sender);
157
158                     apr_hash_set(trans->session_cache, 
159                         apr_pstrdup(trans->session_pool, tmsg->thread),
160                         APR_HASH_KEY_STRING, 
161                         apr_pstrdup(trans->session_pool, tmsg->sender));
162                 }
163
164             } else {
165
166                 // connection timed out; clear the cached recipient
167                 // regardless of whether the client is still connected
168                 if (one_msg->status_code == OSRF_STATUS_TIMEOUT)
169                     clear_cached_recipient(tmsg->thread);
170             }
171         }
172     }
173
174     // maintenance is done
175     osrfListFree(msg_list);
176
177     if (!trans->client_connected) {
178         // responses received after client disconnect are discarded
179
180         osrfLogDebug(OSRF_LOG_MARK, 
181             "WS discarding response for thread=%s, xid=%s", 
182             tmsg->thread, tmsg->osrf_xid);
183
184         return;
185     }
186
187     
188     // client is still connected; relay the messages to the client
189     jsonObject *msg_wrapper = NULL;
190     char *msg_string = NULL;
191
192     // build the wrapper object
193     msg_wrapper = jsonNewObject(NULL);
194     jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
195     jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
196     jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
197
198     if (tmsg->is_error) {
199         fprintf(stderr,  
200             "WS received jabber error message in response to thread=%s and xid=%s", 
201             tmsg->thread, tmsg->osrf_xid);
202         fflush(stderr);
203         jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
204     }
205
206     msg_string = jsonObjectToJSONRaw(msg_wrapper);
207
208     // deliver the wrapped message json to the websocket client
209     trans->server->send(trans->server, MESSAGE_TYPE_TEXT, 
210         (unsigned char*) msg_string, strlen(msg_string));
211
212     free(msg_string);
213     jsonObjectFree(msg_wrapper);
214
215 }
216
217 /**
218  * Responder thread main body.
219  * Collects responses from the opensrf network and relays them to the 
220  * websocket caller.
221  */
222 void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
223
224     transport_message *tmsg;
225     while (1) {
226
227         if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
228             osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
229             return NULL;
230         }
231
232         // wait for a response
233         tmsg = client_recv(osrf_handle, -1);
234
235         if (!tmsg) continue; // early exit on interrupt
236
237         if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
238             osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
239             return NULL;
240         }
241
242         osrf_responder_thread_main_body(tmsg);
243         message_free(tmsg);                                                         
244     }
245
246     return NULL;
247 }
248
249
250
251 /**
252  * Allocate the session cache and create the responder thread
253  */
254 int child_init(const WebSocketServer *server) {
255
256     apr_pool_t *pool = NULL;                                                
257     apr_thread_t *thread = NULL;
258     apr_threadattr_t *thread_attr = NULL;
259     apr_thread_mutex_t *mutex = NULL;
260     request_rec *r = server->request(server);
261         
262     osrfLogDebug(OSRF_LOG_MARK, "WS child_init");
263
264     // osrf_handle will already be connected if this is not the first request
265     // served by this process.
266     if ( !(osrf_handle = osrfSystemGetTransportClient()) ) {
267         char* config_file = "/openils/conf/opensrf_core.xml";
268         char* config_ctx = "gateway"; //TODO config
269         if (!osrfSystemBootstrapClientResc(config_file, config_ctx, "websocket")) {   
270             osrfLogError(OSRF_LOG_MARK, 
271                 "WS unable to bootstrap OpenSRF client with config %s", config_file); 
272             return 1;
273         }
274
275         osrf_handle = osrfSystemGetTransportClient();
276     }
277
278     // create a standalone pool for our translator data
279     if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
280         osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
281         return 1;
282     }
283
284
285     // allocate our static translator instance
286     trans = (osrfWebsocketTranslator*) 
287         apr_palloc(pool, sizeof(osrfWebsocketTranslator));
288
289     if (trans == NULL) {
290         osrfLogError(OSRF_LOG_MARK, "WS Unable to create translator");
291         return 1;
292     }
293
294     trans->main_pool = pool;
295     trans->server = server;
296     trans->osrf_router = osrfConfigGetValue(NULL, "/router_name");                      
297     trans->osrf_domain = osrfConfigGetValue(NULL, "/domain");
298
299     trans->session_cache = apr_hash_make(pool);
300
301     if (trans->session_cache == NULL) {
302         osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache");
303         return 1;
304     }
305
306     // Create the responder thread.  Once created, 
307     // it runs for the lifetime of this process.
308     if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
309          (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
310          (apr_thread_create(&thread, thread_attr, 
311                 osrf_responder_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
312
313         trans->responder_thread = thread;
314         
315     } else {
316         osrfLogError(OSRF_LOG_MARK, "WS unable to create responder thread");
317         return 1;
318     }
319
320     if (apr_thread_mutex_create(
321             &mutex, APR_THREAD_MUTEX_UNNESTED, 
322             trans->main_pool) != APR_SUCCESS) {
323         osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex");
324         return 1;
325     }
326
327     trans->mutex = mutex;
328
329     return APR_SUCCESS;
330 }
331
332 /**
333  * Create the per-client translator
334  */
335 void* CALLBACK on_connect_handler(const WebSocketServer *server) {
336     request_rec *r = server->request(server);
337     apr_pool_t *pool;
338
339     osrfLogDebug(OSRF_LOG_MARK, 
340         "WS connect from %s", r->connection->remote_ip); 
341         //"WS connect from %s", r->connection->client_ip); // apache 2.4
342
343     if (!trans) {
344         if (child_init(server) != APR_SUCCESS) {
345             return NULL;
346         }
347     }
348
349     // create a standalone pool for the session cache values
350     // this pool will be destroyed and re-created regularly to 
351     // clear session memory
352     if (apr_pool_create(&pool, r->pool) != APR_SUCCESS) {
353         osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
354         return NULL;
355     }
356
357     trans->session_pool = pool;
358     trans->client_connected = 1;
359     return trans;
360 }
361
362
363 /** 
364  * for each inbound opensrf message:
365  * 1. Stamp the ingress
366  * 2. REQUEST: log it as activity
367  * 3. DISCONNECT: remove the cached recipient
368  * then re-string-ify for xmpp delivery
369  */
370
371 static char* extract_inbound_messages(
372         const request_rec *r, 
373         const char* service, 
374         const char* thread, 
375         const char* recipient, 
376         const jsonObject *osrf_msg) {
377
378     int i;
379     int num_msgs = osrf_msg->size;
380     osrfMessage* msg;
381     osrfMessage* msg_list[num_msgs];
382
383     // here we do an extra json round-trip to get the data
384     // in a form osrf_message_deserialize can understand
385     char *osrf_msg_json = jsonObjectToJSON(osrf_msg);
386     osrf_message_deserialize(osrf_msg_json, msg_list, num_msgs);
387     free(osrf_msg_json);
388
389     // should we require the caller to always pass the service?
390     if (service == NULL) service = "";
391
392     for(i = 0; i < num_msgs; i++) {
393         msg = msg_list[i];
394         osrfMessageSetIngress(msg, WEBSOCKET_TRANSLATOR_INGRESS);
395
396         switch(msg->m_type) {
397
398             case REQUEST: {
399                 const jsonObject* params = msg->_params;
400                 growing_buffer* act = buffer_init(128);
401                 char* method = msg->method_name;
402                 buffer_fadd(act, "[%s] [%s] %s %s", 
403                     r->connection->remote_ip, "", service, method);
404
405                 const jsonObject* obj = NULL;
406                 int i = 0;
407                 const char* str;
408                 int redactParams = 0;
409                 while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
410                     if(!strncmp(method, str, strlen(str))) {
411                         redactParams = 1;
412                         break;
413                     }
414                 }
415                 if(redactParams) {
416                     OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
417                 } else {
418                     i = 0;
419                     while((obj = jsonObjectGetIndex(params, i++))) {
420                         char* str = jsonObjectToJSON(obj);
421                         if( i == 1 )
422                             OSRF_BUFFER_ADD(act, " ");
423                         else
424                             OSRF_BUFFER_ADD(act, ", ");
425                         OSRF_BUFFER_ADD(act, str);
426                         free(str);
427                     }
428                 }
429                 osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
430                 buffer_free(act);
431                 break;
432             }
433
434             case DISCONNECT:
435                 clear_cached_recipient(thread);
436                 break;
437         }
438     }
439
440     return osrfMessageSerializeBatch(msg_list, num_msgs);
441 }
442
443 /**
444  * Parse opensrf request and relay the request to the opensrf network.
445  */
446 static size_t on_message_handler_body(void *data,
447                 const WebSocketServer *server, const int type, 
448                 unsigned char *buffer, const size_t buffer_size) {
449
450     request_rec *r = server->request(server);
451
452     jsonObject *msg_wrapper = NULL; // free me
453     const jsonObject *tmp_obj = NULL;
454     const jsonObject *osrf_msg = NULL;
455     const char *service = NULL;
456     const char *thread = NULL;
457     const char *log_xid = NULL;
458     char *msg_body = NULL;
459     char *recipient = NULL;
460     int i;
461
462     if (buffer_size <= 0) return OK;
463
464     osrfLogDebug(OSRF_LOG_MARK, "WS received message size=%d", buffer_size);
465
466     // buffer may not be \0-terminated, which jsonParse requires
467     char buf[buffer_size + 1];
468     memcpy(buf, buffer, buffer_size);
469     buf[buffer_size] = '\0';
470
471     msg_wrapper = jsonParse(buf);
472
473     if (msg_wrapper == NULL) {
474         osrfLogWarning(OSRF_LOG_MARK, "WS Invalid JSON: %s", buf);
475         return HTTP_BAD_REQUEST;
476     }
477
478     osrf_msg = jsonObjectGetKeyConst(msg_wrapper, "osrf_msg");
479
480     if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "service")) 
481         service = jsonObjectGetString(tmp_obj);
482
483     if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "thread")) 
484         thread = jsonObjectGetString(tmp_obj);
485
486     if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "log_xid")) 
487         log_xid = jsonObjectGetString(tmp_obj);
488
489     if (log_xid) {
490
491         // use the caller-provide log trace id
492         if (strlen(log_xid) > MAX_THREAD_SIZE) {
493             osrfLogWarning(OSRF_LOG_MARK, "WS log_xid exceeds max length");
494             return HTTP_BAD_REQUEST;
495         }
496
497         // TODO: make this work with non-client and make this call accept 
498         // const char*'s.  casting to (char*) for now to silence warnings.
499         osrfLogSetXid((char*) log_xid); 
500
501     } else {
502         // generate a new log trace id for this relay
503         osrfLogMkXid();
504     }
505
506     if (thread) {
507
508         if (strlen(thread) > MAX_THREAD_SIZE) {
509             osrfLogWarning(OSRF_LOG_MARK, "WS thread exceeds max length");
510             return HTTP_BAD_REQUEST;
511         }
512
513         // since clients can provide their own threads at session start time,
514         // the presence of a thread does not guarantee a cached recipient
515         recipient = (char*) apr_hash_get(
516             trans->session_cache, thread, APR_HASH_KEY_STRING);
517
518         if (recipient) {
519             osrfLogDebug(OSRF_LOG_MARK, "WS found cached recipient %s", recipient);
520         }
521     }
522
523     if (!recipient) {
524
525         if (service) {
526             int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1,
527                 "%s@%s/%s", trans->osrf_router, trans->osrf_domain, service);                                    
528             recipient_buf[size] = '\0';                                          
529             recipient = recipient_buf;
530
531         } else {
532             osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient");
533             return HTTP_BAD_REQUEST;
534         }
535     }
536
537     osrfLogDebug(OSRF_LOG_MARK, 
538         "WS relaying message thread=%s, xid=%s, recipient=%s", 
539             thread, osrfLogGetXid(), recipient);
540
541     msg_body = extract_inbound_messages(
542         r, service, thread, recipient, osrf_msg);
543
544     transport_message *tmsg = message_init(
545         msg_body, NULL, thread, recipient, NULL);
546
547     message_set_osrf_xid(tmsg, osrfLogGetXid());
548     client_send_message(osrf_handle, tmsg);
549
550
551     osrfLogClearXid();
552     message_free(tmsg);                                                         
553     jsonObjectFree(msg_wrapper);
554     free(msg_body);
555
556     return OK;
557 }
558
559 static size_t CALLBACK on_message_handler(void *data,
560                 const WebSocketServer *server, const int type, 
561                 unsigned char *buffer, const size_t buffer_size) {
562
563     if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
564         osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
565         return 1; // TODO: map to apr_status_t value?
566     }
567
568     apr_status_t stat = on_message_handler_body(data, server, type, buffer, buffer_size);
569
570     if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
571         osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
572         return 1;
573     }
574
575     return stat;
576 }
577
578
579 /**
580  * Release all memory allocated from the translator pool and kill the pool.
581  */
582 void CALLBACK on_disconnect_handler(
583     void *data, const WebSocketServer *server) {
584
585     osrfWebsocketTranslator *trans = (osrfWebsocketTranslator*) data;
586     trans->client_connected = 0;
587
588     /*
589     It's not necessary to destroy our session_pool, since
590     it's a child of the apache request_rec pool, which is 
591     destroyed after client disconnect.
592     apr_pool_destroy(trans->session_pool);
593     */
594     
595     trans->session_pool = NULL;
596
597     request_rec *r = server->request(server);
598     osrfLogDebug(OSRF_LOG_MARK, 
599         "WS disconnect from %s", r->connection->remote_ip); 
600         //"WS disconnect from %s", r->connection->client_ip); // apache 2.4
601 }
602
603 /**
604  * Be nice and clean up our mess
605  */
606 void CALLBACK on_destroy_handler(WebSocketPlugin *plugin) {
607     if (trans) {
608         apr_thread_exit(trans->responder_thread, APR_SUCCESS);
609         apr_thread_mutex_destroy(trans->mutex);
610         apr_pool_destroy(trans->main_pool);
611     }
612
613     trans = NULL;
614 }
615
616 static WebSocketPlugin osrf_websocket_plugin = {
617     sizeof(WebSocketPlugin),
618     WEBSOCKET_PLUGIN_VERSION_0,
619     on_destroy_handler,
620     on_connect_handler,
621     on_message_handler,
622     on_disconnect_handler
623 };
624
625 extern EXPORT WebSocketPlugin * CALLBACK osrf_websocket_init() {
626     return &osrf_websocket_plugin;
627 }
628