]> git.evergreen-ils.org Git - working/OpenSRF.git/blob - src/gateway/osrf_websocket_translator.c
LP#1268619: websocket : mutex cleanup
[working/OpenSRF.git] / src / gateway / osrf_websocket_translator.c
1 /* -----------------------------------------------------------------------
2  * Copyright 2012 Equinox Software, Inc.
3  * Bill Erickson <berick@esilibrary.com>
4  *
5  * This program is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU General Public License
7  * as published by the Free Software Foundation; either version 2
8  * of the License, or (at your option) any later version.
9  * 
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  * -----------------------------------------------------------------------
15  */
16
17 /**
18  * Dumb websocket <-> opensrf gateway.  Wrapped opensrf messages are extracted
19  * and relayed to the opensrf network.  Responses are pulled from the opensrf
20  * network and passed back to the client.  No attempt is made to understand
21  * the contents of the messages.
22  *
23  * Messages to/from the websocket client take the following form:
24  * {
25  *   "service"  : "opensrf.foo", // required for new sessions (inbound only)
26  *   "thread"   : "123454321",   // AKA thread. required for follow-up requests; max 64 chars.
27  *   "log_xid"  : "123..32",     // optional log trace ID, max 64 chars;
28  *   "osrf_msg" : {<osrf_msg>}   // required
29  * }
30  *
31  * Each translator operates with two threads.  One thread receives messages
32  * from the websocket client, translates, and relays them to the opensrf 
33  * network. The second thread collects responses from the opensrf network and 
34  * relays them back to the websocket client.
35  *
36  * The main thread reads from socket A (apache) and writes to socket B 
37  * (openesrf), while the responder thread reads from B and writes to A.  The 
38  * apr data structures used are threadsafe.  For now, no thread mutex's are 
39  * used.
40  *
41  * Note that with a "thread", which allows us to identify the opensrf session,
42  * the caller does not need to provide a recipient address.  The "service" is
43  * only required to start a new opensrf session.  After the sesession is 
44  * started, all future communication is based solely on the thread.  
45  *
46  * We use jsonParseRaw and jsonObjectToJSONRaw since this service does not care 
47  * about the contents of the messages.
48  */
49
50 /**
51  * TODO:
52  * short-timeout mode for brick detachment where inactivity timeout drops way 
53  * down for graceful disconnects.
54  */
55
56 #include "httpd.h"
57 #include "apr_strings.h"
58 #include "apr_thread_proc.h"
59 #include "apr_hash.h"
60 #include "websocket_plugin.h"
61 #include "opensrf/log.h"
62 #include "opensrf/osrf_json.h"
63 #include "opensrf/transport_client.h"
64 #include "opensrf/transport_message.h"
65 #include "opensrf/osrf_system.h"                                                
66 #include "opensrf/osrfConfig.h"
67
68 #define MAX_THREAD_SIZE 64
69 #define RECIP_BUF_SIZE 128
70 #define WEBSOCKET_TRANSLATOR_INGRESS "ws-translator-v1"
71
72 typedef struct _osrfWebsocketTranslator {
73     const WebSocketServer *server;
74     apr_pool_t *main_pool; // standalone per-process pool
75     apr_pool_t *session_pool; // child of r->pool; per-session
76     apr_hash_t *session_cache; 
77     apr_thread_t *responder_thread;
78     apr_thread_mutex_t *mutex;
79     int client_connected;
80     char* osrf_router;
81     char* osrf_domain;
82 } osrfWebsocketTranslator;
83
84 static osrfWebsocketTranslator *trans = NULL;
85 static transport_client *osrf_handle = NULL;
86 static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
87
88
89 void* osrf_responder_thread_main_body(transport_message *tmsg) {
90
91     jsonObject *msg_wrapper;
92     osrfList *msg_list;
93     osrfMessage *one_msg;
94     char *msg_string;
95     int i;
96
97     // discard responses received after client disconnect
98     if (!trans->client_connected) {
99         osrfLogDebug(OSRF_LOG_MARK, 
100             "WS discarding response for thread=%s, xid=%s", 
101             tmsg->thread, tmsg->osrf_xid);
102         return; 
103     }
104
105     osrfLogDebug(OSRF_LOG_MARK, 
106         "WS received opensrf response for thread=%s, xid=%s", 
107             tmsg->thread, tmsg->osrf_xid);
108
109     // build the wrapper object
110     msg_wrapper = jsonNewObject(NULL);
111     jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
112     jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
113     jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
114
115     if (tmsg->is_error) {
116         fprintf(stderr,  
117             "WS received jabber error message in response to thread=%s and xid=%s", 
118             tmsg->thread, tmsg->osrf_xid);
119         fflush(stderr);
120         jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
121     }
122
123     msg_string = jsonObjectToJSONRaw(msg_wrapper);
124
125     // deliver the wrapped message json to the websocket client
126     trans->server->send(trans->server, MESSAGE_TYPE_TEXT, 
127         (unsigned char*) msg_string, strlen(msg_string));
128
129     msg_list = osrfMessageDeserialize(tmsg->body, NULL);
130     for (i = 0; i < msg_list->size; i++) {
131         one_msg = OSRF_LIST_GET_INDEX(msg_list, i);
132
133         /*  if our client just successfully connected to an opensrf service,
134             cache the sender so that future calls on this thread will use
135             the correct recipient.  
136             TODO: this cache will grow to add one entry per connected client 
137             session.  Even when entries are removed, they are not cleaned up 
138             until the session_pool is destroyed.  We need to ensure that client 
139             sessions don't last too long and/or create a last-touched timeout 
140             mechanism to periodically remove old entries. */
141         if (one_msg && one_msg->m_type == STATUS && 
142                 one_msg->status_code == OSRF_STATUS_OK) {
143
144             if (!apr_hash_get(trans->session_cache, 
145                     tmsg->thread, APR_HASH_KEY_STRING)) {
146
147                 osrfLogDebug(OSRF_LOG_MARK, 
148                     "WS caching sender thread=%s, sender=%s", 
149                     tmsg->thread, tmsg->sender);
150
151                 apr_hash_set(trans->session_cache, 
152                     apr_pstrdup(trans->session_pool, tmsg->thread),
153                     APR_HASH_KEY_STRING, 
154                     apr_pstrdup(trans->session_pool, tmsg->sender));
155             }
156         }
157     }
158
159     free(msg_string);
160     osrfListFree(msg_list);
161     jsonObjectFree(msg_wrapper);
162 }
163
164 /**
165  * Responder thread main body.
166  * Collects responses from the opensrf network and relays them to the 
167  * websocket caller.
168  */
169 void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
170
171     transport_message *tmsg;
172     while (1) {
173
174         if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
175             osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
176             return NULL;
177         }
178
179         // wait for a response
180         tmsg = client_recv(osrf_handle, -1);
181
182         if (!tmsg) continue; // early exit on interrupt
183
184         if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
185             osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
186             return NULL;
187         }
188
189         osrf_responder_thread_main_body(tmsg);
190         message_free(tmsg);                                                         
191     }
192
193     return NULL;
194 }
195
196
197
198 /**
199  * Allocate the session cache and create the responder thread
200  */
201 int child_init(const WebSocketServer *server) {
202
203     apr_pool_t *pool = NULL;                                                
204     apr_thread_t *thread = NULL;
205     apr_threadattr_t *thread_attr = NULL;
206     apr_thread_mutex_t *mutex = NULL;
207     request_rec *r = server->request(server);
208         
209     osrfLogDebug(OSRF_LOG_MARK, "WS child_init");
210
211     // osrf_handle will already be connected if this is not the first request
212     // served by this process.
213     if ( !(osrf_handle = osrfSystemGetTransportClient()) ) {
214         char* config_file = "/openils/conf/opensrf_core.xml";
215         char* config_ctx = "gateway"; //TODO config
216         if (!osrfSystemBootstrapClientResc(config_file, config_ctx, "websocket")) {   
217             osrfLogError(OSRF_LOG_MARK, 
218                 "WS unable to bootstrap OpenSRF client with config %s", config_file); 
219             return 1;
220         }
221
222         osrf_handle = osrfSystemGetTransportClient();
223     }
224
225     // create a standalone pool for our translator data
226     if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
227         osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
228         return 1;
229     }
230
231
232     // allocate our static translator instance
233     trans = (osrfWebsocketTranslator*) 
234         apr_palloc(pool, sizeof(osrfWebsocketTranslator));
235
236     if (trans == NULL) {
237         osrfLogError(OSRF_LOG_MARK, "WS Unable to create translator");
238         return 1;
239     }
240
241     trans->main_pool = pool;
242     trans->server = server;
243     trans->osrf_router = osrfConfigGetValue(NULL, "/router_name");                      
244     trans->osrf_domain = osrfConfigGetValue(NULL, "/domain");
245
246     // Create the responder thread.  Once created, 
247     // it runs for the lifetime of this process.
248     if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
249          (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
250          (apr_thread_create(&thread, thread_attr, 
251                 osrf_responder_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
252
253         trans->responder_thread = thread;
254         
255     } else {
256         osrfLogError(OSRF_LOG_MARK, "WS unable to create responder thread");
257         return 1;
258     }
259
260     if (apr_thread_mutex_create(
261             &mutex, APR_THREAD_MUTEX_UNNESTED, 
262             trans->main_pool) != APR_SUCCESS) {
263         osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex");
264         return 1;
265     }
266
267     trans->mutex = mutex;
268
269     return APR_SUCCESS;
270 }
271
272 /**
273  * Create the per-client translator
274  */
275 void* CALLBACK on_connect_handler(const WebSocketServer *server) {
276     request_rec *r = server->request(server);
277     apr_pool_t *pool;
278
279     osrfLogDebug(OSRF_LOG_MARK, 
280         "WS connect from %s", r->connection->remote_ip); 
281         //"WS connect from %s", r->connection->client_ip); // apache 2.4
282
283     if (!trans) {
284         if (child_init(server) != APR_SUCCESS) {
285             return NULL;
286         }
287     }
288
289     // create a standalone pool for the session cache values, which will be
290     // destroyed on client disconnect.
291     //if (apr_pool_create(&pool, trans->main_pool) != APR_SUCCESS) {
292     if (apr_pool_create(&pool, r->pool) != APR_SUCCESS) {
293         osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
294         return NULL;
295     }
296
297     trans->session_pool = pool;
298     trans->session_cache = apr_hash_make(trans->session_pool);
299
300     if (trans->session_cache == NULL) {
301         osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache");
302         return NULL;
303     }
304
305     trans->client_connected = 1;
306     return trans;
307 }
308
309
310 /** 
311  * for each inbound opensrf message:
312  * 1. Stamp the ingress
313  * 2. REQUEST: log it as activity
314  * 3. DISCONNECT: remove the cached recipient
315  * then re-string-ify for xmpp delivery
316  */
317
318 static char* extract_inbound_messages(
319         const request_rec *r, 
320         const char* service, 
321         const char* thread, 
322         const char* recipient, 
323         const jsonObject *osrf_msg) {
324
325     int i;
326     int num_msgs = osrf_msg->size;
327     osrfMessage* msg;
328     osrfMessage* msg_list[num_msgs];
329
330     // here we do an extra json round-trip to get the data
331     // in a form osrf_message_deserialize can understand
332     char *osrf_msg_json = jsonObjectToJSON(osrf_msg);
333     osrf_message_deserialize(osrf_msg_json, msg_list, num_msgs);
334     free(osrf_msg_json);
335
336     // should we require the caller to always pass the service?
337     if (service == NULL) service = "";
338
339     for(i = 0; i < num_msgs; i++) {
340         msg = msg_list[i];
341         osrfMessageSetIngress(msg, WEBSOCKET_TRANSLATOR_INGRESS);
342
343         switch(msg->m_type) {
344
345             case REQUEST: {
346                 const jsonObject* params = msg->_params;
347                 growing_buffer* act = buffer_init(128);
348                 char* method = msg->method_name;
349                 buffer_fadd(act, "[%s] [%s] %s %s", 
350                     r->connection->remote_ip, "", service, method);
351
352                 const jsonObject* obj = NULL;
353                 int i = 0;
354                 const char* str;
355                 int redactParams = 0;
356                 while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
357                     if(!strncmp(method, str, strlen(str))) {
358                         redactParams = 1;
359                         break;
360                     }
361                 }
362                 if(redactParams) {
363                     OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
364                 } else {
365                     i = 0;
366                     while((obj = jsonObjectGetIndex(params, i++))) {
367                         char* str = jsonObjectToJSON(obj);
368                         if( i == 1 )
369                             OSRF_BUFFER_ADD(act, " ");
370                         else
371                             OSRF_BUFFER_ADD(act, ", ");
372                         OSRF_BUFFER_ADD(act, str);
373                         free(str);
374                     }
375                 }
376                 osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
377                 buffer_free(act);
378                 break;
379             }
380
381             case DISCONNECT:
382
383                 if (apr_hash_get(trans->session_cache, thread, APR_HASH_KEY_STRING)) {
384
385                     // this clears the hash value, but the key/value memory 
386                     // will not be cleared until their pool is cleared.
387                     apr_hash_set(trans->session_cache, 
388                         apr_pstrdup(trans->session_pool, thread),
389                         APR_HASH_KEY_STRING, 
390                         apr_pstrdup(trans->session_pool, recipient));
391                 }
392
393         }
394     }
395
396     return osrfMessageSerializeBatch(msg_list, num_msgs);
397 }
398
399 /**
400  * Parse opensrf request and relay the request to the opensrf network.
401  */
402 static size_t on_message_handler_body(void *data,
403                 const WebSocketServer *server, const int type, 
404                 unsigned char *buffer, const size_t buffer_size) {
405
406     request_rec *r = server->request(server);
407
408     jsonObject *msg_wrapper = NULL; // free me
409     const jsonObject *tmp_obj = NULL;
410     const jsonObject *osrf_msg = NULL;
411     const char *service = NULL;
412     const char *thread = NULL;
413     const char *log_xid = NULL;
414     char *msg_body = NULL;
415     char *recipient = NULL;
416     int i;
417
418     if (buffer_size <= 0) return OK;
419
420     osrfLogDebug(OSRF_LOG_MARK, "WS received message size=%d", buffer_size);
421
422     // buffer may not be \0-terminated, which jsonParse requires
423     char buf[buffer_size + 1];
424     memcpy(buf, buffer, buffer_size);
425     buf[buffer_size] = '\0';
426
427     msg_wrapper = jsonParse(buf);
428
429     if (msg_wrapper == NULL) {
430         osrfLogWarning(OSRF_LOG_MARK, "WS Invalid JSON: %s", buf);
431         return HTTP_BAD_REQUEST;
432     }
433
434     osrf_msg = jsonObjectGetKeyConst(msg_wrapper, "osrf_msg");
435
436     if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "service")) 
437         service = jsonObjectGetString(tmp_obj);
438
439     if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "thread")) 
440         thread = jsonObjectGetString(tmp_obj);
441
442     if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "log_xid")) 
443         log_xid = jsonObjectGetString(tmp_obj);
444
445     if (log_xid) {
446
447         // use the caller-provide log trace id
448         if (strlen(log_xid) > MAX_THREAD_SIZE) {
449             osrfLogWarning(OSRF_LOG_MARK, "WS log_xid exceeds max length");
450             return HTTP_BAD_REQUEST;
451         }
452
453         // TODO: make this work with non-client and make this call accept 
454         // const char*'s.  casting to (char*) for now to silence warnings.
455         osrfLogSetXid((char*) log_xid); 
456
457     } else {
458         // generate a new log trace id for this relay
459         osrfLogMkXid();
460     }
461
462     if (thread) {
463
464         if (strlen(thread) > MAX_THREAD_SIZE) {
465             osrfLogWarning(OSRF_LOG_MARK, "WS thread exceeds max length");
466             return HTTP_BAD_REQUEST;
467         }
468
469         // since clients can provide their own threads at session start time,
470         // the presence of a thread does not guarantee a cached recipient
471         recipient = (char*) apr_hash_get(
472             trans->session_cache, thread, APR_HASH_KEY_STRING);
473
474         if (recipient) {
475             osrfLogDebug(OSRF_LOG_MARK, "WS found cached recipient %s", recipient);
476         }
477     }
478
479     if (!recipient) {
480
481         if (service) {
482             int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1,
483                 "%s@%s/%s", trans->osrf_router, trans->osrf_domain, service);                                    
484             recipient_buf[size] = '\0';                                          
485             recipient = recipient_buf;
486
487         } else {
488             osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient");
489             return HTTP_BAD_REQUEST;
490         }
491     }
492
493     osrfLogDebug(OSRF_LOG_MARK, 
494         "WS relaying message thread=%s, xid=%s, recipient=%s", 
495             thread, osrfLogGetXid(), recipient);
496
497     msg_body = extract_inbound_messages(
498         r, service, thread, recipient, osrf_msg);
499
500     transport_message *tmsg = message_init(
501         msg_body, NULL, thread, recipient, NULL);
502
503     message_set_osrf_xid(tmsg, osrfLogGetXid());
504     client_send_message(osrf_handle, tmsg);
505
506
507     osrfLogClearXid();
508     message_free(tmsg);                                                         
509     jsonObjectFree(msg_wrapper);
510     free(msg_body);
511
512     return OK;
513 }
514
515 static size_t CALLBACK on_message_handler(void *data,
516                 const WebSocketServer *server, const int type, 
517                 unsigned char *buffer, const size_t buffer_size) {
518
519     if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
520         osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
521         return 1; // TODO: map to apr_status_t value?
522     }
523
524     apr_status_t stat = on_message_handler_body(data, server, type, buffer, buffer_size);
525
526     if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
527         osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
528         return 1;
529     }
530
531     return stat;
532 }
533
534
535 /**
536  * Release all memory allocated from the translator pool and kill the pool.
537  */
538 void CALLBACK on_disconnect_handler(
539     void *data, const WebSocketServer *server) {
540
541     osrfWebsocketTranslator *trans = (osrfWebsocketTranslator*) data;
542     trans->client_connected = 0;
543
544     apr_hash_clear(trans->session_cache);
545
546     /*
547     It's not necessary to destroy our session_pool, since
548     it's a child of the apache request_rec pool, which is 
549     destroyed after client disconnect.
550     apr_pool_destroy(trans->session_pool);
551     */
552     
553     trans->session_pool = NULL;
554     trans->session_cache = NULL;
555
556     request_rec *r = server->request(server);
557     osrfLogDebug(OSRF_LOG_MARK, 
558         "WS disconnect from %s", r->connection->remote_ip); 
559         //"WS disconnect from %s", r->connection->client_ip); // apache 2.4
560 }
561
562 /**
563  * Be nice and clean up our mess
564  */
565 void CALLBACK on_destroy_handler(WebSocketPlugin *plugin) {
566     if (trans) {
567         apr_thread_exit(trans->responder_thread, APR_SUCCESS);
568         apr_thread_mutex_destroy(trans->mutex);
569         apr_pool_destroy(trans->main_pool);
570     }
571
572     trans = NULL;
573 }
574
575 static WebSocketPlugin osrf_websocket_plugin = {
576     sizeof(WebSocketPlugin),
577     WEBSOCKET_PLUGIN_VERSION_0,
578     on_destroy_handler,
579     on_connect_handler,
580     on_message_handler,
581     on_disconnect_handler
582 };
583
584 extern EXPORT WebSocketPlugin * CALLBACK osrf_websocket_init() {
585     return &osrf_websocket_plugin;
586 }
587