LP#1268619: Apache websocket translator module
[OpenSRF.git] / src / gateway / osrf_websocket_translator.c
1 /* -----------------------------------------------------------------------
2  * Copyright 2012 Equinox Software, Inc.
3  * Bill Erickson <berick@esilibrary.com>
4  *
5  * This program is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU General Public License
7  * as published by the Free Software Foundation; either version 2
8  * of the License, or (at your option) any later version.
9  * 
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  * -----------------------------------------------------------------------
15  */
16
17 /**
18  * Dumb websocket <-> opensrf gateway.  Wrapped opensrf messages are extracted
19  * and relayed to the opensrf network.  Responses are pulled from the opensrf
20  * network and passed back to the client.  No attempt is made to understand
21  * the contents of the messages.
22  *
23  * Messages to/from the websocket client take the following form:
24  * {
25  *   "service"  : "opensrf.foo", // required for new sessions (inbound only)
26  *   "thread"   : "123454321",   // AKA thread. required for follow-up requests; max 64 chars.
27  *   "log_xid"  : "123..32",     // optional log trace ID, max 64 chars;
28  *   "osrf_msg" : {<osrf_msg>}   // required
29  * }
30  *
31  * Each translator operates with two threads.  One thread receives messages
32  * from the websocket client, translates, and relays them to the opensrf 
33  * network. The second thread collects responses from the opensrf network and 
34  * relays them back to the websocket client.
35  *
36  * The main thread reads from socket A (apache) and writes to socket B 
37  * (openesrf), while the responder thread reads from B and writes to A.  The 
38  * apr data structures used are threadsafe.  For now, no thread mutex's are 
39  * used.
40  *
41  * Note that with a "thread", which allows us to identify the opensrf session,
42  * the caller does not need to provide a recipient address.  The "service" is
43  * only required to start a new opensrf session.  After the sesession is 
44  * started, all future communication is based solely on the thread.  
45  *
46  * We use jsonParseRaw and jsonObjectToJSONRaw since this service does not care 
47  * about the contents of the messages.
48  */
49
50 /**
51  * TODO:
52  * short-timeout mode for brick detachment where inactivity timeout drops way 
53  * down for graceful disconnects.
54  */
55
56 #include "httpd.h"
57 #include "http_log.h"
58 #include "http_log.h"
59 #include "apr_strings.h"
60 #include "apr_thread_proc.h"
61 #include "apr_hash.h"
62 #include "websocket_plugin.h"
63 #include "opensrf/osrf_json.h"
64 #include "opensrf/transport_client.h"
65 #include "opensrf/transport_message.h"
66 #include "opensrf/osrf_system.h"                                                
67 #include "opensrf/osrfConfig.h"
68
69 #define MAX_THREAD_SIZE 64
70 #define RECIP_BUF_SIZE 128
71 static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
72 static transport_client *osrf_handle = NULL;
73
74 typedef struct _osrfWebsocketTranslator {
75     const WebSocketServer *server;
76     apr_pool_t *main_pool; // standline per-process pool
77     apr_pool_t *session_pool; // child of trans->main_pool; per-session
78     apr_hash_t *session_cache; 
79     apr_thread_t *responder_thread;
80     int client_connected;
81     char* osrf_router;
82     char* osrf_domain;
83 } osrfWebsocketTranslator;
84
85 static osrfWebsocketTranslator *trans = NULL;
86
87
88 /**
89  * Responder thread main body.
90  * Collects responses from the opensrf network and relays them to the 
91  * websocket caller.
92  */
93 void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
94
95     request_rec *r = trans->server->request(trans->server);
96     jsonObject *msg_wrapper;
97     char *msg_string;
98
99     while (1) {
100
101         transport_message *msg = client_recv(osrf_handle, -1);
102         if (!msg) continue; // early exit on interrupt
103         
104         // discard responses received after client disconnect
105         if (!trans->client_connected) {
106             ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
107                 "WS discarding response for thread=%s, xid=%s", 
108                     msg->thread, msg->osrf_xid);
109             message_free(msg);                                                         
110             continue; 
111         }
112
113         ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
114             "WS received opensrf response for thread=%s, xid=%s", 
115                 msg->thread, msg->osrf_xid);
116
117         // build the wrapper object
118         msg_wrapper = jsonNewObject(NULL);
119         jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(msg->thread));
120         jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(msg->osrf_xid));
121         jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(msg->body));
122
123         if (msg->is_error) {
124             ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
125                 "WS received jabber error message in response to thread=%s and xid=%s", 
126                     msg->thread, msg->osrf_xid);
127             jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
128         }
129
130         msg_string = jsonObjectToJSONRaw(msg_wrapper);
131
132         // deliver the wrapped message json to the websocket client
133         trans->server->send(trans->server, MESSAGE_TYPE_TEXT, 
134             (unsigned char*) msg_string, strlen(msg_string));
135
136         // capture the true message sender
137         // TODO: this will grow to add one entry per client session.  
138         // need a last-touched timeout mechanism to periodically remove old entries
139         if (!apr_hash_get(trans->session_cache, msg->thread, APR_HASH_KEY_STRING)) {
140
141             ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
142                 "WS caching sender thread=%s, sender=%s", msg->thread, msg->sender);
143
144             apr_hash_set(trans->session_cache, 
145                 apr_pstrdup(trans->session_pool, msg->thread),
146                 APR_HASH_KEY_STRING, 
147                 apr_pstrdup(trans->session_pool, msg->sender));
148         }
149
150         free(msg_string);
151         jsonObjectFree(msg_wrapper);
152         message_free(msg);                                                         
153     }
154
155     return NULL;
156 }
157
158 /**
159  * Allocate the session cache and create the responder thread
160  */
161 int child_init(const WebSocketServer *server) {
162
163     apr_pool_t *pool = NULL;                                                
164     apr_thread_t *thread = NULL;
165     apr_threadattr_t *thread_attr = NULL;
166     request_rec *r = server->request(server);
167         
168     ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, "WS child_init");
169
170     // osrf_handle will already be connected if this is not the first request
171     // served by this process.
172     if ( !(osrf_handle = osrfSystemGetTransportClient()) ) {
173         char* config_file = "/openils/conf/opensrf_core.xml";
174         char* config_ctx = "gateway"; //TODO config
175         if (!osrfSystemBootstrapClientResc(config_file, config_ctx, "websocket")) {   
176             ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,                              
177                 "WS unable to bootstrap OpenSRF client with config %s", config_file); 
178             return 1;
179         }
180
181         osrf_handle = osrfSystemGetTransportClient();
182     }
183
184     // create a standalone pool for our translator data
185     if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
186         ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "WS Unable to create apr_pool");
187         return 1;
188     }
189
190
191     // allocate our static translator instance
192     trans = (osrfWebsocketTranslator*) 
193         apr_palloc(pool, sizeof(osrfWebsocketTranslator));
194
195     if (trans == NULL) {
196         ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "WS Unable to create translator");
197         return 1;
198     }
199
200     trans->main_pool = pool;
201     trans->server = server;
202     trans->osrf_router = osrfConfigGetValue(NULL, "/router_name");                      
203     trans->osrf_domain = osrfConfigGetValue(NULL, "/domain");
204
205     // Create the responder thread.  Once created, it runs for the lifetime
206     // of this process.
207     if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
208          (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
209          (apr_thread_create(&thread, thread_attr, 
210                 osrf_responder_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
211
212         trans->responder_thread = thread;
213         
214     } else {
215         ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
216             "WS unable to create responder thread");
217         return 1;
218     }
219
220     return APR_SUCCESS;
221 }
222
223 /**
224  * Create the per-client translator
225  */
226 void* CALLBACK on_connect_handler(const WebSocketServer *server) {
227     request_rec *r = server->request(server);
228     apr_pool_t *pool;
229
230     ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
231         "WS connect from %s", r->connection->remote_ip);
232
233     if (!trans) {
234         if (child_init(server) != APR_SUCCESS) {
235             return NULL;
236         }
237     }
238
239     // create a standalone pool for the session cache values, which will be
240     // destroyed on client disconnect.
241     if (apr_pool_create(&pool, trans->main_pool) != APR_SUCCESS) {
242         ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
243             "WS Unable to create apr_pool");
244         return NULL;
245     }
246
247     trans->session_pool = pool;
248     trans->session_cache = apr_hash_make(trans->session_pool);
249
250     ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
251         "WS created new pool %x", trans->session_pool);
252
253     if (trans->session_cache == NULL) {
254         ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
255             "WS unable to create session cache");
256         return NULL;
257     }
258
259     ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
260         "WS created new hash %x", trans->session_cache);
261
262     trans->client_connected = 1;
263     return trans;
264 }
265
266
267
268 /**
269  * Parse opensrf request and relay the request to the opensrf network.
270  */
271 static size_t CALLBACK on_message_handler(void *data,
272                 const WebSocketServer *server, const int type, 
273                 unsigned char *buffer, const size_t buffer_size) {
274
275     request_rec *r = server->request(server);
276
277     jsonObject *msg_wrapper = NULL; // free me
278     const jsonObject *tmp_obj = NULL;
279     const jsonObject *osrf_msg = NULL;
280     const char *service = NULL;
281     const char *thread = NULL;
282     const char *log_xid = NULL;
283     char *msg_body = NULL;
284     char *recipient = NULL;
285
286     if (buffer_size <= 0) return OK;
287
288     ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
289         "WS received message size=%d", buffer_size);
290
291     // buffer may not be \0-terminated, which jsonParse requires
292     char buf[buffer_size + 1];
293     memcpy(buf, buffer, buffer_size);
294     buf[buffer_size] = '\0';
295
296     msg_wrapper = jsonParseRaw(buf);
297
298     if (msg_wrapper == NULL) {
299         ap_log_rerror(APLOG_MARK, 
300             APLOG_NOTICE, 0, r, "WS Invalid JSON: %s", buf);
301         return HTTP_BAD_REQUEST;
302     }
303
304     osrf_msg = jsonObjectGetKeyConst(msg_wrapper, "osrf_msg");
305
306     if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "service")) 
307         service = jsonObjectGetString(tmp_obj);
308
309     if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "thread")) 
310         thread = jsonObjectGetString(tmp_obj);
311
312     if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "log_xid")) 
313         log_xid = jsonObjectGetString(tmp_obj);
314
315     if (log_xid) {
316         // use the caller-provide log trace id
317         if (strlen(log_xid) > MAX_THREAD_SIZE) {
318             ap_log_rerror(APLOG_MARK, APLOG_NOTICE, 
319                 0, r, "WS log_xid exceeds max length");
320             return HTTP_BAD_REQUEST;
321         }
322         osrfLogSetXid(log_xid); // TODO: make with with non-client
323     } else {
324         // generate a new log trace id for this relay
325         osrfLogMkXid();
326     }
327
328     if (thread) {
329
330         if (strlen(thread) > MAX_THREAD_SIZE) {
331             ap_log_rerror(APLOG_MARK, APLOG_NOTICE, 
332                 0, r, "WS thread exceeds max length");
333             return HTTP_BAD_REQUEST;
334         }
335
336         // since clients can provide their own threads at session start time,
337         // the presence of a thread does not guarantee a cached recipient
338         recipient = (char*) apr_hash_get(
339             trans->session_cache, thread, APR_HASH_KEY_STRING);
340
341         if (recipient) {
342             ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
343                 "WS found cached recipient %s", recipient);
344         }
345     }
346
347     if (!recipient) {
348
349         if (service) {
350             int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1,
351                 "%s@%s/%s", trans->osrf_router, trans->osrf_domain, service);                                    
352             recipient_buf[size] = '\0';                                          
353             recipient = recipient_buf;
354
355         } else {
356             ap_log_rerror(APLOG_MARK, APLOG_NOTICE, 
357                 0, r, "WS Unable to determine recipient");
358             return HTTP_BAD_REQUEST;
359         }
360     }
361
362     // TODO: activity log entry? -- requires message analysis
363     ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
364         "WS relaying message thread=%s, xid=%s, recipient=%s", 
365             thread, osrfLogGetXid(), recipient);
366
367     msg_body = jsonObjectToJSONRaw(osrf_msg);
368
369     transport_message *tmsg = message_init(
370         msg_body, NULL, thread, recipient, NULL);
371
372     message_set_osrf_xid(tmsg, osrfLogGetXid());                                
373     client_send_message(osrf_handle, tmsg);                                   
374     osrfLogClearXid();
375
376     message_free(tmsg);                                                         
377     free(msg_wrapper);
378     free(msg_body);
379
380     return OK;
381 }
382
383
384 /**
385  * Release all memory allocated from the translator pool and kill the pool.
386  */
387 void CALLBACK on_disconnect_handler(
388     void *data, const WebSocketServer *server) {
389
390     osrfWebsocketTranslator *trans = (osrfWebsocketTranslator*) data;
391     trans->client_connected = 0;
392
393     apr_hash_clear(trans->session_cache);
394     apr_pool_destroy(trans->session_pool);
395     trans->session_pool = NULL;
396     trans->session_cache = NULL;
397
398     request_rec *r = server->request(server);
399     ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
400         "WS disconnect from %s", r->connection->remote_ip);
401 }
402
403 void CALLBACK on_destroy_handler(WebSocketPlugin *plugin) {
404     fprintf(stderr, "WS on_destroy_handler()\n");
405     fflush(stderr);
406
407     if (trans) {
408         apr_thread_exit(trans->responder_thread, APR_SUCCESS);
409         apr_pool_destroy(trans->main_pool);
410     }
411
412     trans = NULL;
413 }
414
415 static WebSocketPlugin osrf_websocket_plugin = {
416     sizeof(WebSocketPlugin),
417     WEBSOCKET_PLUGIN_VERSION_0,
418     on_destroy_handler,
419     on_connect_handler,
420     on_message_handler,
421     on_disconnect_handler
422 };
423
424 extern EXPORT WebSocketPlugin * CALLBACK osrf_websocket_init() {
425     return &osrf_websocket_plugin;
426 }
427