/* --------------------------------------------------------------------
 * Copyright (C) 2018 King County Library Service
 * Bill Erickson <berickxx@gmail.com>
 *
 * Code borrows heavily from osrf_websocket_translator.c
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 --------------------------------------------------------------------- */
/**
 * OpenSRF Websockets Relay
 *
 * Reads Websockets requests on STDIN
 * Sends replies to requests on STDOUT
 *
 * Built to function with websocketd:
 * https://github.com/joewalnes/websocketd
 *
 * websocketd --port 7682 --max-forks 250 ./osrf-websocket-stdio /path/to/opensrf_core.xml &
 */
37 #include <opensrf/utils.h>
38 #include <opensrf/osrf_hash.h>
39 #include <opensrf/transport_client.h>
40 #include <opensrf/osrf_message.h>
41 #include <opensrf/osrf_app_session.h>
42 #include <opensrf/log.h>
// Maximum accepted length of a client-supplied thread or log_xid string.
#define MAX_THREAD_SIZE 64
// Size of the reusable recipient address buffer (router@domain/service).
#define RECIP_BUF_SIZE 256
#define WEBSOCKET_INGRESS "ws-translator-v2"

// Maximum number of active, CONNECTed opensrf sessions allowed.  In
// practice, this number will be very small, rarely reaching double
// digits.  This is just a security back-stop.  A client trying to open
// this many connections is almost certainly attempting to DOS the
// service.
#define MAX_ACTIVE_STATEFUL_SESSIONS 64

// Messages exceeding this size (10 MiB) are discarded.
// This value must be greater than RESET_MESSAGE_SIZE (below).
#define MAX_MESSAGE_SIZE 10485760

// After processing any message this size (100 KiB) or larger, free and
// recreate the stdin buffer to release the memory.
#define RESET_MESSAGE_SIZE 102400

// Enforce the documented relationship between the two limits at compile
// time instead of relying on the comment alone.
_Static_assert(MAX_MESSAGE_SIZE > RESET_MESSAGE_SIZE,
    "MAX_MESSAGE_SIZE must be greater than RESET_MESSAGE_SIZE");
65 // default values, replaced during setup (below) as needed.
66 static char* config_file = "/openils/conf/opensrf_core.xml";
67 static char* config_ctxt = "gateway";
68 static char* osrf_router = NULL;
69 static char* osrf_domain = NULL;
71 // Cache of opensrf thread strings and back-end receipients.
72 // Tracking this here means the caller only needs to track the thread.
73 // It also means we don't have to expose internal XMPP IDs
74 static osrfHash* stateful_session_cache = NULL;
75 // Message on STDIN go into our reusable buffer
76 static growing_buffer* stdin_buf = NULL;
77 // OpenSRF XMPP connection handle
78 static transport_client* osrf_handle = NULL;
79 // Reusable string buf for recipient addresses
80 static char recipient_buf[RECIP_BUF_SIZE];
81 // Websocket client IP address (for logging)
82 static char* client_ip = NULL;
84 static void rebuild_stdin_buffer();
85 static void child_init(int argc, char* argv[]);
86 static void read_from_stdin();
87 static void relay_stdin_message(const char*);
88 static char* extract_inbound_messages();
89 static void log_request(const char*, osrfMessage*);
90 static void read_from_osrf();
91 static void read_one_osrf_message(transport_message*);
92 static int shut_it_down(int);
93 static void release_hash_string(char*, void*);
95 // Websocketd sends SIGINT for shutdown, followed by SIGTERM
96 // if SIGINT takes too long.
97 static void sigint_handler(int sig) {
98 osrfLogInfo(OSRF_LOG_MARK, "WS received SIGINT - graceful shutdown");
102 int main(int argc, char* argv[]) {
104 // Handle shutdown signal -- only needed once.
105 signal(SIGINT, sigint_handler);
107 // Connect to OpenSR -- exits on error
108 child_init(argc, argv);
110 // Disable output buffering.
111 setbuf(stdout, NULL);
112 rebuild_stdin_buffer();
114 // The main loop waits for data to be available on both STDIN
115 // (websocket client request) and the OpenSRF XMPP socket
116 // (replies returning to the websocket client).
118 int stdin_no = fileno(stdin);
119 int osrf_no = osrf_handle->session->sock_id;
120 int maxfd = osrf_no > stdin_no ? osrf_no : stdin_no;
126 FD_SET(osrf_no, &fds);
127 FD_SET(stdin_no, &fds);
129 // Wait indefinitely for activity to process
130 sel_resp = select(maxfd + 1, &fds, NULL, NULL, NULL);
132 if (sel_resp < 0) { // error
134 if (errno == EINTR) {
135 // Interrupted by a signal. Start the loop over.
139 osrfLogError(OSRF_LOG_MARK,
140 "WS select() failed with [%s]. Exiting", strerror(errno));
145 if (FD_ISSET(stdin_no, &fds)) {
149 if (FD_ISSET(osrf_no, &fds)) {
154 return shut_it_down(0);
157 static void rebuild_stdin_buffer() {
159 if (stdin_buf != NULL) {
160 buffer_free(stdin_buf);
163 stdin_buf = buffer_init(1024);
166 static int shut_it_down(int stat) {
167 osrfHashFree(stateful_session_cache);
168 buffer_free(stdin_buf);
169 osrf_system_shutdown(); // clean XMPP disconnect
175 // Connect to OpenSRF/XMPP
176 // Apply settings and command line args.
177 static void child_init(int argc, char* argv[]) {
180 config_file = argv[1];
183 if (!osrf_system_bootstrap_client(config_file, config_ctxt) ) {
184 fprintf(stderr, "Cannot boostrap OSRF\n");
188 osrf_handle = osrfSystemGetTransportClient();
189 osrfAppSessionSetIngress(WEBSOCKET_INGRESS);
191 osrf_router = osrfConfigGetValue(NULL, "/router_name");
192 osrf_domain = osrfConfigGetValue(NULL, "/domain");
194 stateful_session_cache = osrfNewHash();
195 osrfHashSetCallback(stateful_session_cache, release_hash_string);
197 client_ip = getenv("REMOTE_ADDR");
198 osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip);
// Free callback for the session cache: osrfHash invokes it when a value
// is removed.  Every cached value is a strdup()'ed string, so it is
// released with free().  The key argument is not used.
static void release_hash_string(char* key, void* str) {
    (void) key;
    if (str != NULL) {
        free(str);
    }
}
209 // Relay websocket client messages from STDIN to OpenSRF. Reads one
210 // message then returns, allowing responses to intermingle with long
211 // series of requests.
212 static void read_from_stdin() {
216 // Read one char at a time so we can stop at the first newline
217 // and leave any other data on the wire until read_from_stdin()
221 int stat = read(fileno(stdin), char_buf, 1);
225 if (errno == EAGAIN) {
226 // read interrupted. Return to main loop to resume.
227 // Returning here will leave any in-progress message in
228 // the stdin_buf. We return to the main select loop
229 // to confirm we really have more data to read and to
230 // perform additional error checking on the stream.
234 // All other errors reading STDIN are considered fatal.
235 osrfLogError(OSRF_LOG_MARK,
236 "WS STDIN read failed with [%s]. Exiting", strerror(errno));
241 if (stat == 0) { // EOF
242 osrfLogInfo(OSRF_LOG_MARK, "WS exiting on disconnect");
249 if (c == '\n') { // end of current message
251 if (stdin_buf->n_used >= MAX_MESSAGE_SIZE) {
252 osrfLogError(OSRF_LOG_MARK,
253 "WS message exceeded MAX_MESSAGE_SIZE, discarding");
254 rebuild_stdin_buffer();
258 if (stdin_buf->n_used > 0) {
259 relay_stdin_message(stdin_buf->buf);
261 if (stdin_buf->n_used >= RESET_MESSAGE_SIZE) {
262 // Current message is large. Rebuild the buffer
263 // to free the excess memory.
264 rebuild_stdin_buffer();
268 // Reset the buffer and carry on.
269 buffer_reset(stdin_buf);
277 if (stdin_buf->n_used >= MAX_MESSAGE_SIZE) {
278 // Message exceeds max message size. Continue reading
279 // and discarding data. NOTE: don't reset stdin_buf
280 // here becase we check n_used again once reading is done.
284 // Add the char to our current message buffer
285 buffer_add_char(stdin_buf, c);
290 // Relays a single websocket request to the OpenSRF/XMPP network.
291 static void relay_stdin_message(const char* msg_string) {
293 jsonObject *msg_wrapper = NULL; // free me
294 const jsonObject *tmp_obj = NULL;
295 const jsonObject *osrf_msg = NULL;
296 const char *service = NULL;
297 const char *thread = NULL;
298 const char *log_xid = NULL;
299 char *msg_body = NULL;
300 char *recipient = NULL;
302 // generate a new log trace for this request. it
303 // may be replaced by a client-provided trace below.
306 osrfLogInternal(OSRF_LOG_MARK, "WS received inbound message: %s", msg_string);
308 msg_wrapper = jsonParse(msg_string);
310 if (msg_wrapper == NULL) {
311 osrfLogWarning(OSRF_LOG_MARK, "WS Invalid JSON: %s", msg_string);
315 osrf_msg = jsonObjectGetKeyConst(msg_wrapper, "osrf_msg");
317 if ( (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "service")) )
318 service = jsonObjectGetString(tmp_obj);
320 if ( (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "thread")) )
321 thread = jsonObjectGetString(tmp_obj);
323 if ( (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "log_xid")) )
324 log_xid = jsonObjectGetString(tmp_obj);
328 // use the caller-provide log trace id
329 if (strlen(log_xid) > MAX_THREAD_SIZE) {
330 osrfLogWarning(OSRF_LOG_MARK, "WS log_xid exceeds max length");
334 osrfLogForceXid(log_xid);
339 if (strlen(thread) > MAX_THREAD_SIZE) {
340 osrfLogWarning(OSRF_LOG_MARK, "WS thread exceeds max length");
344 // since clients can provide their own threads at session start time,
345 // the presence of a thread does not guarantee a cached recipient
346 recipient = (char*) osrfHashGet(stateful_session_cache, thread);
349 osrfLogDebug(OSRF_LOG_MARK, "WS found cached recipient %s", recipient);
356 int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1,
357 "%s@%s/%s", osrf_router, osrf_domain, service);
358 recipient_buf[size] = '\0';
359 recipient = recipient_buf;
362 osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient");
367 osrfLogDebug(OSRF_LOG_MARK,
368 "WS relaying message to opensrf thread=%s, recipient=%s",
371 // 'recipient' will be freed in extract_inbound_messages
372 // during a DISCONNECT call. Retain a local copy.
373 recipient = strdup(recipient);
375 msg_body = extract_inbound_messages(service, thread, osrf_msg);
377 osrfLogInternal(OSRF_LOG_MARK,
378 "WS relaying inbound message: %s", msg_body);
380 transport_message *tmsg = message_init(
381 msg_body, NULL, thread, recipient, NULL);
385 message_set_osrf_xid(tmsg, osrfLogGetXid());
387 if (client_send_message(osrf_handle, tmsg) != 0) {
388 osrfLogError(OSRF_LOG_MARK, "WS failed sending data to OpenSRF, exiting");
394 jsonObjectFree(msg_wrapper);
398 // Turn the OpenSRF message JSON into a set of osrfMessage's for
399 // analysis, ingress application, and logging.
400 static char* extract_inbound_messages(
401 const char* service, const char* thread, const jsonObject *osrf_msg) {
404 int num_msgs = osrf_msg->size;
406 osrfMessage* msg_list[num_msgs];
408 // here we do an extra json round-trip to get the data
409 // in a form osrf_message_deserialize can understand
410 // TODO: consider a version of osrf_message_init which can
411 // accept a jsonObject* instead of a JSON string.
412 char *osrf_msg_json = jsonObjectToJSON(osrf_msg);
413 osrf_message_deserialize(osrf_msg_json, msg_list, num_msgs);
416 // should we require the caller to always pass the service?
417 if (service == NULL) service = "";
419 for (i = 0; i < num_msgs; i++) {
421 osrfMessageSetIngress(msg, WEBSOCKET_INGRESS);
423 switch (msg->m_type) {
429 log_request(service, msg);
433 osrfHashRemove(stateful_session_cache, thread);
437 osrfLogError(OSRF_LOG_MARK, "WS received unexpected message "
438 "type from WebSocket client: %d", msg->m_type);
443 char* finalMsg = osrfMessageSerializeBatch(msg_list, num_msgs);
445 // clean up our messages
446 for (i = 0; i < num_msgs; i++)
447 osrfMessageFree(msg_list[i]);
452 // All REQUESTs are logged as activity.
453 static void log_request(const char* service, osrfMessage* msg) {
455 const jsonObject* params = msg->_params;
456 growing_buffer* act = buffer_init(128);
457 char* method = msg->method_name;
458 const jsonObject* obj = NULL;
461 int redactParams = 0;
463 buffer_fadd(act, "[%s] [%s] %s %s", client_ip, "", service, method);
465 while ( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
466 if (!strncmp(method, str, strlen(str))) {
473 OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
476 while ((obj = jsonObjectGetIndex(params, i++))) {
477 char* str = jsonObjectToJSON(obj);
479 OSRF_BUFFER_ADD(act, " ");
481 OSRF_BUFFER_ADD(act, ", ");
482 OSRF_BUFFER_ADD(act, str);
487 osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
493 // Relay response messages from OpenSRF to STDIN
494 // Relays all available messages
495 static void read_from_osrf() {
496 transport_message* tmsg = NULL;
498 // Double check the socket connection before continuing.
499 if (!client_connected(osrf_handle) ||
500 !socket_connected(osrf_handle->session->sock_id)) {
501 osrfLogWarning(OSRF_LOG_MARK,
502 "WS: Jabber socket disconnected, exiting");
506 // Once client_recv is called all data waiting on the socket is
507 // read. This means we can't return to the main select() loop after
508 // each message, because any subsequent messages will get stuck in
509 // the opensrf receive queue. Process all available messages.
510 while ( (tmsg = client_recv(osrf_handle, 0)) ) {
511 read_one_osrf_message(tmsg);
516 // Process a single OpenSRF response message and print the reponse
517 // to STDOUT for delivery to the websocket client.
518 static void read_one_osrf_message(transport_message* tmsg) {
519 osrfList *msg_list = NULL;
520 osrfMessage *one_msg = NULL;
523 osrfLogDebug(OSRF_LOG_MARK,
524 "WS received opensrf response for thread=%s", tmsg->thread);
526 // first we need to perform some maintenance
527 msg_list = osrfMessageDeserialize(tmsg->body, NULL);
529 for (i = 0; i < msg_list->size; i++) {
530 one_msg = OSRF_LIST_GET_INDEX(msg_list, i);
532 osrfLogDebug(OSRF_LOG_MARK,
533 "WS returned response of type %d", one_msg->m_type);
535 /* if our client just successfully connected to an opensrf service,
536 cache the sender so that future calls on this thread will use
537 the correct recipient. */
538 if (one_msg && one_msg->m_type == STATUS) {
540 if (one_msg->status_code == OSRF_STATUS_OK) {
542 if (!osrfHashGet(stateful_session_cache, tmsg->thread)) {
544 unsigned long ses_size =
545 osrfHashGetCount(stateful_session_cache);
547 if (ses_size < MAX_ACTIVE_STATEFUL_SESSIONS) {
549 osrfLogDebug(OSRF_LOG_MARK, "WS caching sender "
550 "thread=%s, sender=%s; concurrent=%d",
551 tmsg->thread, tmsg->sender, ses_size);
553 char* sender = strdup(tmsg->sender); // free in *Remove
554 osrfHashSet(stateful_session_cache, sender, tmsg->thread);
558 osrfLogWarning(OSRF_LOG_MARK,
559 "WS max concurrent sessions (%d) reached. "
560 "Current session will not be tracked",
561 MAX_ACTIVE_STATEFUL_SESSIONS
568 // connection timed out; clear the cached recipient
569 if (one_msg->status_code == OSRF_STATUS_TIMEOUT) {
570 osrfHashRemove(stateful_session_cache, tmsg->thread);
576 // osrfMessageDeserialize applies the freeItem handler to the
577 // newly created osrfList. We only need to free the list and
578 // the individual osrfMessage's will be freed along with it
579 osrfListFree(msg_list);
581 // Pack the response into a websocket wrapper message.
582 jsonObject *msg_wrapper = NULL;
583 char *msg_string = NULL;
584 msg_wrapper = jsonNewObject(NULL);
586 jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
587 jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
588 jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
590 if (tmsg->is_error) {
591 // tmsg->sender is the original recipient. they get swapped
593 osrfLogError(OSRF_LOG_MARK,
594 "WS received XMPP error message in response to thread=%s and "
595 "recipient=%s. Likely the recipient is not accessible/available.",
596 tmsg->thread, tmsg->sender);
597 jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
600 msg_string = jsonObjectToJSONRaw(msg_wrapper);
602 // Send the JSON to STDOUT
603 printf("%s\n", msg_string);
606 jsonObjectFree(msg_wrapper);