1 /* -----------------------------------------------------------------------
2 * Copyright 2012 Equinox Software, Inc.
3 * Bill Erickson <berick@esilibrary.com>
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 * -----------------------------------------------------------------------
18 * Dumb websocket <-> opensrf gateway. Wrapped opensrf messages are extracted
19 * and relayed to the opensrf network. Responses are pulled from the opensrf
20 * network and passed back to the client. No attempt is made to understand
21 * the contents of the messages.
23 * Messages to/from the websocket client take the following form:
25 * "service" : "opensrf.foo", // required for new sessions (inbound only)
26 * "thread" : "123454321", // AKA thread. required for follow-up requests; max 64 chars.
27 * "log_xid" : "123..32", // optional log trace ID, max 64 chars;
28 * "osrf_msg" : {<osrf_msg>} // required
31 * Each translator operates with two threads. One thread receives messages
32 * from the websocket client, translates, and relays them to the opensrf
33 * network. The second thread collects responses from the opensrf network and
34 * relays them back to the websocket client.
36 * The main thread reads from socket A (apache) and writes to socket B
37 * (openesrf), while the responder thread reads from B and writes to A. The
38 * apr data structures used are threadsafe. For now, no thread mutex's are
41 * Note that with a "thread", which allows us to identify the opensrf session,
42 * the caller does not need to provide a recipient address. The "service" is
43 * only required to start a new opensrf session. After the sesession is
44 * started, all future communication is based solely on the thread.
46 * We use jsonParseRaw and jsonObjectToJSONRaw since this service does not care
47 * about the contents of the messages.
52 * short-timeout mode for brick detachment where inactivity timeout drops way
53 * down for graceful disconnects.
57 #include "apr_strings.h"
58 #include "apr_thread_proc.h"
60 #include "websocket_plugin.h"
61 #include "opensrf/log.h"
62 #include "opensrf/osrf_json.h"
63 #include "opensrf/transport_client.h"
64 #include "opensrf/transport_message.h"
65 #include "opensrf/osrf_system.h"
66 #include "opensrf/osrfConfig.h"
68 #define MAX_THREAD_SIZE 64
69 #define RECIP_BUF_SIZE 128
71 typedef struct _osrfWebsocketTranslator {
72 const WebSocketServer *server;
73 apr_pool_t *main_pool; // standalone per-process pool
74 apr_pool_t *session_pool; // child of trans->main_pool; per-session
75 apr_hash_t *session_cache;
76 apr_thread_t *responder_thread;
80 } osrfWebsocketTranslator;
82 static osrfWebsocketTranslator *trans = NULL;
83 static transport_client *osrf_handle = NULL;
84 static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
88 * Responder thread main body.
89 * Collects responses from the opensrf network and relays them to the
92 void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
94 transport_message *tmsg;
95 jsonObject *msg_wrapper;
100 tmsg = client_recv(osrf_handle, -1);
102 if (!tmsg) continue; // early exit on interrupt
104 // discard responses received after client disconnect
105 if (!trans->client_connected) {
106 osrfLogDebug(OSRF_LOG_MARK,
107 "WS discarding response for thread=%s, xid=%s",
108 tmsg->thread, tmsg->osrf_xid);
114 osrfLogDebug(OSRF_LOG_MARK,
115 "WS received opensrf response for thread=%s, xid=%s",
116 tmsg->thread, tmsg->osrf_xid);
118 // build the wrapper object
119 msg_wrapper = jsonNewObject(NULL);
120 jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
121 jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
122 jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
124 if (tmsg->is_error) {
126 "WS received jabber error message in response to thread=%s and xid=%s",
127 tmsg->thread, tmsg->osrf_xid);
129 jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
132 msg_string = jsonObjectToJSONRaw(msg_wrapper);
134 // deliver the wrapped message json to the websocket client
135 trans->server->send(trans->server, MESSAGE_TYPE_TEXT,
136 (unsigned char*) msg_string, strlen(msg_string));
138 // capture the true message sender
139 // TODO: this will grow to add one entry per client session.
140 // need to ensure that connected-sessions don't last /too/ long or create
141 // a last-touched timeout mechanism to periodically remove old entries
142 if (!apr_hash_get(trans->session_cache, tmsg->thread, APR_HASH_KEY_STRING)) {
144 osrfLogDebug(OSRF_LOG_MARK,
145 "WS caching sender thread=%s, sender=%s", tmsg->thread, tmsg->sender);
147 apr_hash_set(trans->session_cache,
148 apr_pstrdup(trans->session_pool, tmsg->thread),
150 apr_pstrdup(trans->session_pool, tmsg->sender));
154 jsonObjectFree(msg_wrapper);
162 * Allocate the session cache and create the responder thread
164 int child_init(const WebSocketServer *server) {
166 apr_pool_t *pool = NULL;
167 apr_thread_t *thread = NULL;
168 apr_threadattr_t *thread_attr = NULL;
169 request_rec *r = server->request(server);
171 osrfLogDebug(OSRF_LOG_MARK, "WS child_init");
173 // osrf_handle will already be connected if this is not the first request
174 // served by this process.
175 if ( !(osrf_handle = osrfSystemGetTransportClient()) ) {
176 char* config_file = "/openils/conf/opensrf_core.xml";
177 char* config_ctx = "gateway"; //TODO config
178 if (!osrfSystemBootstrapClientResc(config_file, config_ctx, "websocket")) {
179 osrfLogError(OSRF_LOG_MARK,
180 "WS unable to bootstrap OpenSRF client with config %s", config_file);
184 osrf_handle = osrfSystemGetTransportClient();
187 // create a standalone pool for our translator data
188 if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
189 osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
194 // allocate our static translator instance
195 trans = (osrfWebsocketTranslator*)
196 apr_palloc(pool, sizeof(osrfWebsocketTranslator));
199 osrfLogError(OSRF_LOG_MARK, "WS Unable to create translator");
203 trans->main_pool = pool;
204 trans->server = server;
205 trans->osrf_router = osrfConfigGetValue(NULL, "/router_name");
206 trans->osrf_domain = osrfConfigGetValue(NULL, "/domain");
208 // Create the responder thread. Once created,
209 // it runs for the lifetime of this process.
210 if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
211 (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
212 (apr_thread_create(&thread, thread_attr,
213 osrf_responder_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
215 trans->responder_thread = thread;
218 osrfLogError(OSRF_LOG_MARK, "WS unable to create responder thread");
226 * Create the per-client translator
228 void* CALLBACK on_connect_handler(const WebSocketServer *server) {
229 request_rec *r = server->request(server);
232 osrfLogDebug(OSRF_LOG_MARK,
233 "WS connect from %s", r->connection->remote_ip);
234 //"WS connect from %s", r->connection->client_ip); // apache 2.4
237 if (child_init(server) != APR_SUCCESS) {
242 // create a standalone pool for the session cache values, which will be
243 // destroyed on client disconnect.
244 if (apr_pool_create(&pool, trans->main_pool) != APR_SUCCESS) {
245 osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
249 trans->session_pool = pool;
250 trans->session_cache = apr_hash_make(trans->session_pool);
252 if (trans->session_cache == NULL) {
253 osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache");
257 trans->client_connected = 1;
264 * Parse opensrf request and relay the request to the opensrf network.
266 static size_t CALLBACK on_message_handler(void *data,
267 const WebSocketServer *server, const int type,
268 unsigned char *buffer, const size_t buffer_size) {
270 request_rec *r = server->request(server);
272 jsonObject *msg_wrapper = NULL; // free me
273 const jsonObject *tmp_obj = NULL;
274 const jsonObject *osrf_msg = NULL;
275 const char *service = NULL;
276 const char *thread = NULL;
277 const char *log_xid = NULL;
278 char *msg_body = NULL;
279 char *recipient = NULL;
281 if (buffer_size <= 0) return OK;
283 osrfLogDebug(OSRF_LOG_MARK, "WS received message size=%d", buffer_size);
285 // buffer may not be \0-terminated, which jsonParse requires
286 char buf[buffer_size + 1];
287 memcpy(buf, buffer, buffer_size);
288 buf[buffer_size] = '\0';
290 msg_wrapper = jsonParseRaw(buf);
292 if (msg_wrapper == NULL) {
293 osrfLogWarning(OSRF_LOG_MARK, "WS Invalid JSON: %s", buf);
294 return HTTP_BAD_REQUEST;
297 osrf_msg = jsonObjectGetKeyConst(msg_wrapper, "osrf_msg");
299 if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "service"))
300 service = jsonObjectGetString(tmp_obj);
302 if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "thread"))
303 thread = jsonObjectGetString(tmp_obj);
305 if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "log_xid"))
306 log_xid = jsonObjectGetString(tmp_obj);
310 // use the caller-provide log trace id
311 if (strlen(log_xid) > MAX_THREAD_SIZE) {
312 osrfLogWarning(OSRF_LOG_MARK, "WS log_xid exceeds max length");
313 return HTTP_BAD_REQUEST;
316 // TODO: make this work with non-client and make this call accept
317 // const char*'s. casting to (char*) for now to silence warnings.
318 osrfLogSetXid((char*) log_xid);
321 // generate a new log trace id for this relay
327 if (strlen(thread) > MAX_THREAD_SIZE) {
328 osrfLogWarning(OSRF_LOG_MARK, "WS thread exceeds max length");
329 return HTTP_BAD_REQUEST;
332 // since clients can provide their own threads at session start time,
333 // the presence of a thread does not guarantee a cached recipient
334 recipient = (char*) apr_hash_get(
335 trans->session_cache, thread, APR_HASH_KEY_STRING);
338 osrfLogDebug(OSRF_LOG_MARK, "WS found cached recipient %s", recipient);
345 int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1,
346 "%s@%s/%s", trans->osrf_router, trans->osrf_domain, service);
347 recipient_buf[size] = '\0';
348 recipient = recipient_buf;
351 osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient");
352 return HTTP_BAD_REQUEST;
356 // TODO: activity log entry? -- requires message analysis
357 osrfLogDebug(OSRF_LOG_MARK,
358 "WS relaying message thread=%s, xid=%s, recipient=%s",
359 thread, osrfLogGetXid(), recipient);
361 msg_body = jsonObjectToJSONRaw(osrf_msg);
363 transport_message *tmsg = message_init(
364 msg_body, NULL, thread, recipient, NULL);
366 message_set_osrf_xid(tmsg, osrfLogGetXid());
367 client_send_message(osrf_handle, tmsg);
371 jsonObjectFree(msg_wrapper);
379 * Release all memory allocated from the translator pool and kill the pool.
381 void CALLBACK on_disconnect_handler(
382 void *data, const WebSocketServer *server) {
384 osrfWebsocketTranslator *trans = (osrfWebsocketTranslator*) data;
385 trans->client_connected = 0;
387 apr_hash_clear(trans->session_cache);
388 apr_pool_destroy(trans->session_pool);
389 trans->session_pool = NULL;
390 trans->session_cache = NULL;
392 request_rec *r = server->request(server);
393 osrfLogDebug(OSRF_LOG_MARK,
394 "WS disconnect from %s", r->connection->remote_ip);
395 //"WS disconnect from %s", r->connection->client_ip); // apache 2.4
399 * Be nice and clean up our mess
401 void CALLBACK on_destroy_handler(WebSocketPlugin *plugin) {
403 apr_thread_exit(trans->responder_thread, APR_SUCCESS);
404 apr_pool_destroy(trans->main_pool);
410 static WebSocketPlugin osrf_websocket_plugin = {
411 sizeof(WebSocketPlugin),
412 WEBSOCKET_PLUGIN_VERSION_0,
416 on_disconnect_handler
419 extern EXPORT WebSocketPlugin * CALLBACK osrf_websocket_init() {
420 return &osrf_websocket_plugin;