1 /* -----------------------------------------------------------------------
2 * Copyright 2012 Equinox Software, Inc.
3 * Bill Erickson <berick@esilibrary.com>
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 * -----------------------------------------------------------------------
18 * Dumb websocket <-> opensrf gateway. Wrapped opensrf messages are extracted
19 * and relayed to the opensrf network. Responses are pulled from the opensrf
20 * network and passed back to the client. No attempt is made to understand
21 * the contents of the messages.
23 * Messages to/from the websocket client take the following form:
25 * "service" : "opensrf.foo", // required for new sessions (inbound only)
26 * "thread" : "123454321", // AKA thread. required for follow-up requests; max 64 chars.
27 * "log_xid" : "123..32", // optional log trace ID, max 64 chars;
28 * "osrf_msg" : {<osrf_msg>} // required
31 * Each translator operates with two threads. One thread receives messages
32 * from the websocket client, translates, and relays them to the opensrf
33 * network. The second thread collects responses from the opensrf network and
34 * relays them back to the websocket client.
36 * The main thread reads from socket A (apache) and writes to socket B
37 * (openesrf), while the responder thread reads from B and writes to A. The
38 * apr data structures used are threadsafe. For now, no thread mutex's are
41 * Note that with a "thread", which allows us to identify the opensrf session,
42 * the caller does not need to provide a recipient address. The "service" is
43 * only required to start a new opensrf session. After the sesession is
44 * started, all future communication is based solely on the thread.
46 * We use jsonParseRaw and jsonObjectToJSONRaw since this service does not care
47 * about the contents of the messages.
52 * short-timeout mode for brick detachment where inactivity timeout drops way
53 * down for graceful disconnects.
59 #include "apr_strings.h"
60 #include "apr_thread_proc.h"
62 #include "websocket_plugin.h"
63 #include "opensrf/osrf_json.h"
64 #include "opensrf/transport_client.h"
65 #include "opensrf/transport_message.h"
66 #include "opensrf/osrf_system.h"
67 #include "opensrf/osrfConfig.h"
69 #define MAX_THREAD_SIZE 64
70 #define RECIP_BUF_SIZE 128
71 static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
72 static transport_client *osrf_handle = NULL;
74 typedef struct _osrfWebsocketTranslator {
75 const WebSocketServer *server;
76 apr_pool_t *main_pool; // standline per-process pool
77 apr_pool_t *session_pool; // child of trans->main_pool; per-session
78 apr_hash_t *session_cache;
79 apr_thread_t *responder_thread;
83 } osrfWebsocketTranslator;
85 static osrfWebsocketTranslator *trans = NULL;
89 * Responder thread main body.
90 * Collects responses from the opensrf network and relays them to the
93 void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
95 request_rec *r = trans->server->request(trans->server);
96 jsonObject *msg_wrapper;
101 transport_message *msg = client_recv(osrf_handle, -1);
102 if (!msg) continue; // early exit on interrupt
104 // discard responses received after client disconnect
105 if (!trans->client_connected) {
106 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
107 "WS discarding response for thread=%s, xid=%s",
108 msg->thread, msg->osrf_xid);
113 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
114 "WS received opensrf response for thread=%s, xid=%s",
115 msg->thread, msg->osrf_xid);
117 // build the wrapper object
118 msg_wrapper = jsonNewObject(NULL);
119 jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(msg->thread));
120 jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(msg->osrf_xid));
121 jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(msg->body));
124 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
125 "WS received jabber error message in response to thread=%s and xid=%s",
126 msg->thread, msg->osrf_xid);
127 jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
130 msg_string = jsonObjectToJSONRaw(msg_wrapper);
132 // deliver the wrapped message json to the websocket client
133 trans->server->send(trans->server, MESSAGE_TYPE_TEXT,
134 (unsigned char*) msg_string, strlen(msg_string));
136 // capture the true message sender
137 // TODO: this will grow to add one entry per client session.
138 // need a last-touched timeout mechanism to periodically remove old entries
139 if (!apr_hash_get(trans->session_cache, msg->thread, APR_HASH_KEY_STRING)) {
141 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
142 "WS caching sender thread=%s, sender=%s", msg->thread, msg->sender);
144 apr_hash_set(trans->session_cache,
145 apr_pstrdup(trans->session_pool, msg->thread),
147 apr_pstrdup(trans->session_pool, msg->sender));
151 jsonObjectFree(msg_wrapper);
159 * Allocate the session cache and create the responder thread
161 int child_init(const WebSocketServer *server) {
163 apr_pool_t *pool = NULL;
164 apr_thread_t *thread = NULL;
165 apr_threadattr_t *thread_attr = NULL;
166 request_rec *r = server->request(server);
168 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, "WS child_init");
170 // osrf_handle will already be connected if this is not the first request
171 // served by this process.
172 if ( !(osrf_handle = osrfSystemGetTransportClient()) ) {
173 char* config_file = "/openils/conf/opensrf_core.xml";
174 char* config_ctx = "gateway"; //TODO config
175 if (!osrfSystemBootstrapClientResc(config_file, config_ctx, "websocket")) {
176 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
177 "WS unable to bootstrap OpenSRF client with config %s", config_file);
181 osrf_handle = osrfSystemGetTransportClient();
184 // create a standalone pool for our translator data
185 if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
186 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "WS Unable to create apr_pool");
191 // allocate our static translator instance
192 trans = (osrfWebsocketTranslator*)
193 apr_palloc(pool, sizeof(osrfWebsocketTranslator));
196 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "WS Unable to create translator");
200 trans->main_pool = pool;
201 trans->server = server;
202 trans->osrf_router = osrfConfigGetValue(NULL, "/router_name");
203 trans->osrf_domain = osrfConfigGetValue(NULL, "/domain");
205 // Create the responder thread. Once created, it runs for the lifetime
207 if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
208 (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
209 (apr_thread_create(&thread, thread_attr,
210 osrf_responder_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
212 trans->responder_thread = thread;
215 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
216 "WS unable to create responder thread");
224 * Create the per-client translator
226 void* CALLBACK on_connect_handler(const WebSocketServer *server) {
227 request_rec *r = server->request(server);
230 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
231 "WS connect from %s", r->connection->remote_ip);
234 if (child_init(server) != APR_SUCCESS) {
239 // create a standalone pool for the session cache values, which will be
240 // destroyed on client disconnect.
241 if (apr_pool_create(&pool, trans->main_pool) != APR_SUCCESS) {
242 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
243 "WS Unable to create apr_pool");
247 trans->session_pool = pool;
248 trans->session_cache = apr_hash_make(trans->session_pool);
250 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
251 "WS created new pool %x", trans->session_pool);
253 if (trans->session_cache == NULL) {
254 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
255 "WS unable to create session cache");
259 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
260 "WS created new hash %x", trans->session_cache);
262 trans->client_connected = 1;
269 * Parse opensrf request and relay the request to the opensrf network.
271 static size_t CALLBACK on_message_handler(void *data,
272 const WebSocketServer *server, const int type,
273 unsigned char *buffer, const size_t buffer_size) {
275 request_rec *r = server->request(server);
277 jsonObject *msg_wrapper = NULL; // free me
278 const jsonObject *tmp_obj = NULL;
279 const jsonObject *osrf_msg = NULL;
280 const char *service = NULL;
281 const char *thread = NULL;
282 const char *log_xid = NULL;
283 char *msg_body = NULL;
284 char *recipient = NULL;
286 if (buffer_size <= 0) return OK;
288 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
289 "WS received message size=%d", buffer_size);
291 // buffer may not be \0-terminated, which jsonParse requires
292 char buf[buffer_size + 1];
293 memcpy(buf, buffer, buffer_size);
294 buf[buffer_size] = '\0';
296 msg_wrapper = jsonParseRaw(buf);
298 if (msg_wrapper == NULL) {
299 ap_log_rerror(APLOG_MARK,
300 APLOG_NOTICE, 0, r, "WS Invalid JSON: %s", buf);
301 return HTTP_BAD_REQUEST;
304 osrf_msg = jsonObjectGetKeyConst(msg_wrapper, "osrf_msg");
306 if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "service"))
307 service = jsonObjectGetString(tmp_obj);
309 if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "thread"))
310 thread = jsonObjectGetString(tmp_obj);
312 if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "log_xid"))
313 log_xid = jsonObjectGetString(tmp_obj);
316 // use the caller-provide log trace id
317 if (strlen(log_xid) > MAX_THREAD_SIZE) {
318 ap_log_rerror(APLOG_MARK, APLOG_NOTICE,
319 0, r, "WS log_xid exceeds max length");
320 return HTTP_BAD_REQUEST;
322 osrfLogSetXid(log_xid); // TODO: make with with non-client
324 // generate a new log trace id for this relay
330 if (strlen(thread) > MAX_THREAD_SIZE) {
331 ap_log_rerror(APLOG_MARK, APLOG_NOTICE,
332 0, r, "WS thread exceeds max length");
333 return HTTP_BAD_REQUEST;
336 // since clients can provide their own threads at session start time,
337 // the presence of a thread does not guarantee a cached recipient
338 recipient = (char*) apr_hash_get(
339 trans->session_cache, thread, APR_HASH_KEY_STRING);
342 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
343 "WS found cached recipient %s", recipient);
350 int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1,
351 "%s@%s/%s", trans->osrf_router, trans->osrf_domain, service);
352 recipient_buf[size] = '\0';
353 recipient = recipient_buf;
356 ap_log_rerror(APLOG_MARK, APLOG_NOTICE,
357 0, r, "WS Unable to determine recipient");
358 return HTTP_BAD_REQUEST;
362 // TODO: activity log entry? -- requires message analysis
363 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
364 "WS relaying message thread=%s, xid=%s, recipient=%s",
365 thread, osrfLogGetXid(), recipient);
367 msg_body = jsonObjectToJSONRaw(osrf_msg);
369 transport_message *tmsg = message_init(
370 msg_body, NULL, thread, recipient, NULL);
372 message_set_osrf_xid(tmsg, osrfLogGetXid());
373 client_send_message(osrf_handle, tmsg);
385 * Release all memory allocated from the translator pool and kill the pool.
387 void CALLBACK on_disconnect_handler(
388 void *data, const WebSocketServer *server) {
390 osrfWebsocketTranslator *trans = (osrfWebsocketTranslator*) data;
391 trans->client_connected = 0;
393 apr_hash_clear(trans->session_cache);
394 apr_pool_destroy(trans->session_pool);
395 trans->session_pool = NULL;
396 trans->session_cache = NULL;
398 request_rec *r = server->request(server);
399 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
400 "WS disconnect from %s", r->connection->remote_ip);
403 void CALLBACK on_destroy_handler(WebSocketPlugin *plugin) {
404 fprintf(stderr, "WS on_destroy_handler()\n");
408 apr_thread_exit(trans->responder_thread, APR_SUCCESS);
409 apr_pool_destroy(trans->main_pool);
415 static WebSocketPlugin osrf_websocket_plugin = {
416 sizeof(WebSocketPlugin),
417 WEBSOCKET_PLUGIN_VERSION_0,
421 on_disconnect_handler
424 extern EXPORT WebSocketPlugin * CALLBACK osrf_websocket_init() {
425 return &osrf_websocket_plugin;