]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/websocket-stdio/osrf-websocket-stdio.c
96d62f10224d29a122e580111dfed1e529d6af87
[OpenSRF.git] / src / websocket-stdio / osrf-websocket-stdio.c
1 /* --------------------------------------------------------------------
2  * Copyright (C) 2018 King County Library Service
3  * Bill Erickson <berickxx@gmail.com>
4  *
5  * Code borrows heavily from osrf_websocket_translator.c
6  *
7  * This program is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU General Public License
9  * as published by the Free Software Foundation; either version 2
10  * of the License, or (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16 --------------------------------------------------------------------- */
17
18 /**
19  * OpenSRF Websockets Relay
20  *
21  * Reads Websockets requests on STDIN
22  * Sends replies to requests on STDOUT
23  *
24  * Built to function with websocketd:
25  * https://github.com/joewalnes/websocketd
26  *
27  * Synopsis:
28  *
29  * websocketd --port 7682 --max-forks 250 ./osrf-websocket-stdio /path/to/opensrf_core.xml &
30  *
31  */
32
33 #include <stdio.h>
34 #include <unistd.h>
35 #include <string.h>
36 #include <signal.h>
37 #include <opensrf/utils.h>
38 #include <opensrf/osrf_hash.h>
39 #include <opensrf/transport_client.h>
40 #include <opensrf/osrf_message.h>
41 #include <opensrf/osrf_app_session.h>
42 #include <opensrf/log.h>
43
44 #define MAX_THREAD_SIZE 64
45 #define RECIP_BUF_SIZE 256
46 #define WEBSOCKET_INGRESS "ws-translator-v2"
47
48 // maximun number of active, CONNECTed opensrf sessions allowed. in
49 // practice, this number will be very small, rarely reaching double
50 // digits.  This is just a security back-stop.  A client trying to open
51 // this many connections is almost certainly attempting to DOS the
52 // gateway / server.
53 #define MAX_ACTIVE_STATEFUL_SESSIONS 64
54
55 // Message exceeding this size are discarded.
56 // This value must be greater than RESET_MESSAGE_SIZE (below)
57 // ~10M
58 #define MAX_MESSAGE_SIZE 10485760
59
60 // After processing any message this size or larger, free and
61 // recreate the stdin buffer to release the memory.
62 // ~100k
63 #define RESET_MESSAGE_SIZE 102400
64
65 // default values, replaced during setup (below) as needed.
66 static char* config_file = "/openils/conf/opensrf_core.xml";
67 static char* config_ctxt = "gateway";
68 static char* osrf_router = NULL;
69 static char* osrf_domain = NULL;
70
71 // Cache of opensrf thread strings and back-end receipients.
72 // Tracking this here means the caller only needs to track the thread.
73 // It also means we don't have to expose internal XMPP IDs
74 static osrfHash* stateful_session_cache = NULL;
75 // Message on STDIN go into our reusable buffer
76 static growing_buffer* stdin_buf = NULL;
77 // OpenSRF XMPP connection handle
78 static transport_client* osrf_handle = NULL;
79 // Reusable string buf for recipient addresses
80 static char recipient_buf[RECIP_BUF_SIZE];
81 // Websocket client IP address (for logging)
82 static char* client_ip = NULL;
83
84 static void rebuild_stdin_buffer();
85 static void child_init(int argc, char* argv[]);
86 static void read_from_stdin();
87 static void relay_stdin_message(const char*);
88 static char* extract_inbound_messages();
89 static void log_request(const char*, osrfMessage*);
90 static void read_from_osrf();
91 static void read_one_osrf_message(transport_message*);
92 static int shut_it_down(int);
93 static void release_hash_string(char*, void*);
94
95 // Websocketd sends SIGINT for shutdown, followed by SIGTERM
96 // if SIGINT takes too long.
97 static void sigint_handler(int sig) {
98     osrfLogInfo(OSRF_LOG_MARK, "WS received SIGINT - graceful shutdown");
99     shut_it_down(0);
100 }
101
102 int main(int argc, char* argv[]) {
103
104     // Handle shutdown signal -- only needed once.
105     signal(SIGINT, sigint_handler);
106
107     // Connect to OpenSR -- exits on error
108     child_init(argc, argv);
109
110     // Disable output buffering.
111     setbuf(stdout, NULL);
112     rebuild_stdin_buffer();
113
114     // The main loop waits for data to be available on both STDIN
115     // (websocket client request) and the OpenSRF XMPP socket 
116     // (replies returning to the websocket client).
117     fd_set fds;
118     int stdin_no = fileno(stdin);
119     int osrf_no = osrf_handle->session->sock_id;
120     int maxfd = osrf_no > stdin_no ? osrf_no : stdin_no;
121     int sel_resp;
122
123     while (1) {
124
125         FD_ZERO(&fds);
126         FD_SET(osrf_no, &fds);
127         FD_SET(stdin_no, &fds);
128
129         // Wait indefinitely for activity to process
130         sel_resp = select(maxfd + 1, &fds, NULL, NULL, NULL);
131
132         if (sel_resp < 0) { // error
133
134             if (errno == EINTR) {
135                 // Interrupted by a signal.  Start the loop over.
136                 continue;
137             }
138
139             osrfLogError(OSRF_LOG_MARK,
140                 "WS select() failed with [%s]. Exiting", strerror(errno));
141
142             shut_it_down(1);
143         }
144
145         if (FD_ISSET(stdin_no, &fds)) {
146             read_from_stdin();
147         }
148
149         if (FD_ISSET(osrf_no, &fds)) {
150             read_from_osrf();
151         }
152     }
153
154     return shut_it_down(0);
155 }
156
157 static void rebuild_stdin_buffer() {
158
159     if (stdin_buf != NULL) {
160         buffer_free(stdin_buf);
161     }
162
163     stdin_buf = buffer_init(1024);
164 }
165
166 static int shut_it_down(int stat) {
167     osrfHashFree(stateful_session_cache);
168     buffer_free(stdin_buf);
169     osrf_system_shutdown(); // clean XMPP disconnect
170     exit(stat);
171     return stat;
172 }
173
174
175 // Connect to OpenSRF/XMPP
176 // Apply settings and command line args.
177 static void child_init(int argc, char* argv[]) {
178
179     if (argc > 1) {
180         config_file = argv[1];
181     }
182
183     if (!osrf_system_bootstrap_client(config_file, config_ctxt) ) {
184         fprintf(stderr, "Cannot boostrap OSRF\n");
185         shut_it_down(1);
186     }
187
188         osrf_handle = osrfSystemGetTransportClient();
189         osrfAppSessionSetIngress(WEBSOCKET_INGRESS);
190
191     osrf_router = osrfConfigGetValue(NULL, "/router_name");
192     osrf_domain = osrfConfigGetValue(NULL, "/domain");
193
194     stateful_session_cache = osrfNewHash();
195     osrfHashSetCallback(stateful_session_cache, release_hash_string);
196
197     client_ip = getenv("REMOTE_ADDR");
198     osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip);
199 }
200
201 // Called by osrfHash when a string is removed.  We strdup each
202 // string before it goes into the hash.
203 static void release_hash_string(char* key, void* str) {
204     if (str == NULL) return;
205     free((char*) str);
206 }
207
208
209 // Relay websocket client messages from STDIN to OpenSRF.  Reads one
210 // message then returns, allowing responses to intermingle with long
211 // series of requests.
212 static void read_from_stdin() {
213     char char_buf[1];
214     char c;
215
216     // Read one char at a time so we can stop at the first newline
217     // and leave any other data on the wire until read_from_stdin()
218     // is called again.
219
220     while (1) {
221         int stat = read(fileno(stdin), char_buf, 1);
222
223         if (stat < 0) {
224
225             if (errno == EAGAIN) {
226                 // read interrupted.  Return to main loop to resume.
227                 // Returning here will leave any in-progress message in
228                 // the stdin_buf.  We return to the main select loop
229                 // to confirm we really have more data to read and to
230                 // perform additional error checking on the stream.
231                 return;
232             }
233
234             // All other errors reading STDIN are considered fatal.
235             osrfLogError(OSRF_LOG_MARK,
236                 "WS STDIN read failed with [%s]. Exiting", strerror(errno));
237             shut_it_down(1);
238             return;
239         }
240
241         if (stat == 0) { // EOF
242             osrfLogInfo(OSRF_LOG_MARK, "WS exiting on disconnect");
243             shut_it_down(0);
244             return;
245         }
246
247         c = char_buf[0];
248
249         if (c == '\n') { // end of current message
250
251             if (stdin_buf->n_used >= MAX_MESSAGE_SIZE) {
252                 osrfLogError(OSRF_LOG_MARK,
253                     "WS message exceeded MAX_MESSAGE_SIZE, discarding");
254                 rebuild_stdin_buffer();
255                 return;
256             }
257
258             if (stdin_buf->n_used > 0) {
259                 relay_stdin_message(stdin_buf->buf);
260
261                 if (stdin_buf->n_used >= RESET_MESSAGE_SIZE) {
262                     // Current message is large.  Rebuild the buffer
263                     // to free the excess memory.
264                     rebuild_stdin_buffer();
265
266                 } else {
267
268                     // Reset the buffer and carry on.
269                     buffer_reset(stdin_buf);
270                 }
271             }
272
273             return;
274
275         } else {
276
277             if (stdin_buf->n_used >= MAX_MESSAGE_SIZE) {
278                 // Message exceeds max message size.  Continue reading
279                 // and discarding data.  NOTE: don't reset stdin_buf
280                 // here becase we check n_used again once reading is done.
281                 continue;
282             }
283
284             // Add the char to our current message buffer
285             buffer_add_char(stdin_buf, c);
286         }
287     }
288 }
289
290 // Relays a single websocket request to the OpenSRF/XMPP network.
291 static void relay_stdin_message(const char* msg_string) {
292
293     jsonObject *msg_wrapper = NULL; // free me
294     const jsonObject *tmp_obj = NULL;
295     const jsonObject *osrf_msg = NULL;
296     const char *service = NULL;
297     const char *thread = NULL;
298     const char *log_xid = NULL;
299     char *msg_body = NULL;
300     char *recipient = NULL;
301
302     // generate a new log trace for this request. it
303     // may be replaced by a client-provided trace below.
304     osrfLogMkXid();
305
306     osrfLogInternal(OSRF_LOG_MARK, "WS received inbound message: %s", msg_string);
307
308     msg_wrapper = jsonParse(msg_string);
309
310     if (msg_wrapper == NULL) {
311         osrfLogWarning(OSRF_LOG_MARK, "WS Invalid JSON: %s", msg_string);
312         return;
313     }
314
315     osrf_msg = jsonObjectGetKeyConst(msg_wrapper, "osrf_msg");
316
317     if ( (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "service")) )
318         service = jsonObjectGetString(tmp_obj);
319
320     if ( (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "thread")) )
321         thread = jsonObjectGetString(tmp_obj);
322
323     if ( (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "log_xid")) )
324         log_xid = jsonObjectGetString(tmp_obj);
325
326     if (log_xid) {
327
328         // use the caller-provide log trace id
329         if (strlen(log_xid) > MAX_THREAD_SIZE) {
330             osrfLogWarning(OSRF_LOG_MARK, "WS log_xid exceeds max length");
331             return;
332         }
333
334         osrfLogForceXid(log_xid);
335     }
336
337     if (thread) {
338
339         if (strlen(thread) > MAX_THREAD_SIZE) {
340             osrfLogWarning(OSRF_LOG_MARK, "WS thread exceeds max length");
341             return;
342         }
343
344         // since clients can provide their own threads at session start time,
345         // the presence of a thread does not guarantee a cached recipient
346         recipient = (char*) osrfHashGet(stateful_session_cache, thread);
347
348         if (recipient) {
349             osrfLogDebug(OSRF_LOG_MARK, "WS found cached recipient %s", recipient);
350         }
351     }
352
353     if (!recipient) {
354
355         if (service) {
356             int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1,
357                 "%s@%s/%s", osrf_router, osrf_domain, service);
358             recipient_buf[size] = '\0';
359             recipient = recipient_buf;
360
361         } else {
362             osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient");
363             return;
364         }
365     }
366
367     osrfLogDebug(OSRF_LOG_MARK,
368         "WS relaying message to opensrf thread=%s, recipient=%s",
369             thread, recipient);
370
371     // 'recipient' will be freed in extract_inbound_messages
372     // during a DISCONNECT call.  Retain a local copy.
373     recipient = strdup(recipient);
374
375     msg_body = extract_inbound_messages(service, thread, osrf_msg);
376
377     osrfLogInternal(OSRF_LOG_MARK,
378         "WS relaying inbound message: %s", msg_body);
379
380     transport_message *tmsg = message_init(
381         msg_body, NULL, thread, recipient, NULL);
382
383     free(recipient);
384
385     message_set_osrf_xid(tmsg, osrfLogGetXid());
386
387     if (client_send_message(osrf_handle, tmsg) != 0) {
388         osrfLogError(OSRF_LOG_MARK, "WS failed sending data to OpenSRF, exiting");
389         shut_it_down(1);
390     }
391
392     osrfLogClearXid();
393     message_free(tmsg);
394     jsonObjectFree(msg_wrapper);
395     free(msg_body);
396 }
397
398 // Turn the OpenSRF message JSON into a set of osrfMessage's for 
399 // analysis, ingress application, and logging.
400 static char* extract_inbound_messages(
401         const char* service, const char* thread, const jsonObject *osrf_msg) {
402
403     int i;
404     int num_msgs = osrf_msg->size;
405     osrfMessage* msg;
406     osrfMessage* msg_list[num_msgs];
407
408     // here we do an extra json round-trip to get the data
409     // in a form osrf_message_deserialize can understand
410     // TODO: consider a version of osrf_message_init which can
411     // accept a jsonObject* instead of a JSON string.
412     char *osrf_msg_json = jsonObjectToJSON(osrf_msg);
413     osrf_message_deserialize(osrf_msg_json, msg_list, num_msgs);
414     free(osrf_msg_json);
415
416     // should we require the caller to always pass the service?
417     if (service == NULL) service = "";
418
419     for (i = 0; i < num_msgs; i++) {
420         msg = msg_list[i];
421         osrfMessageSetIngress(msg, WEBSOCKET_INGRESS);
422
423         switch (msg->m_type) {
424
425             case CONNECT:
426                 break;
427
428             case REQUEST:
429                 log_request(service, msg);
430                 break;
431
432             case DISCONNECT:
433                 osrfHashRemove(stateful_session_cache, thread);
434                 break;
435
436             default:
437                 osrfLogError(OSRF_LOG_MARK, "WS received unexpected message "
438                     "type from WebSocket client: %d", msg->m_type);
439                 break;
440         }
441     }
442
443     char* finalMsg = osrfMessageSerializeBatch(msg_list, num_msgs);
444
445     // clean up our messages
446     for (i = 0; i < num_msgs; i++)
447         osrfMessageFree(msg_list[i]);
448
449     return finalMsg;
450 }
451
452 // All REQUESTs are logged as activity.
453 static void log_request(const char* service, osrfMessage* msg) {
454
455     const jsonObject* params = msg->_params;
456     growing_buffer* act = buffer_init(128);
457     char* method = msg->method_name;
458     const jsonObject* obj = NULL;
459     int i = 0;
460     const char* str;
461     int redactParams = 0;
462
463     buffer_fadd(act, "[%s] [%s] %s %s", client_ip, "", service, method);
464
465     while ( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
466         if (!strncmp(method, str, strlen(str))) {
467             redactParams = 1;
468             break;
469         }
470     }
471
472     if (redactParams) {
473         OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
474     } else {
475         i = 0;
476         while ((obj = jsonObjectGetIndex(params, i++))) {
477             char* str = jsonObjectToJSON(obj);
478             if (i == 1)
479                 OSRF_BUFFER_ADD(act, " ");
480             else
481                 OSRF_BUFFER_ADD(act, ", ");
482             OSRF_BUFFER_ADD(act, str);
483             free(str);
484         }
485     }
486
487     osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
488     buffer_free(act);
489 }
490
491
492
493 // Relay response messages from OpenSRF to STDIN
494 // Relays all available messages
495 static void read_from_osrf() {
496     transport_message* tmsg = NULL;
497
498     // Double check the socket connection before continuing.
499     if (!client_connected(osrf_handle) ||
500         !socket_connected(osrf_handle->session->sock_id)) {
501         osrfLogWarning(OSRF_LOG_MARK,
502             "WS: Jabber socket disconnected, exiting");
503         shut_it_down(1);
504     }
505
506     // Once client_recv is called all data waiting on the socket is
507     // read.  This means we can't return to the main select() loop after
508     // each message, because any subsequent messages will get stuck in
509     // the opensrf receive queue. Process all available messages.
510     while ( (tmsg = client_recv(osrf_handle, 0)) ) {
511         read_one_osrf_message(tmsg);
512         message_free(tmsg);
513     }
514 }
515
516 // Process a single OpenSRF response message and print the reponse
517 // to STDOUT for delivery to the websocket client.
518 static void read_one_osrf_message(transport_message* tmsg) {
519     osrfList *msg_list = NULL;
520     osrfMessage *one_msg = NULL;
521     int i;
522
523     osrfLogDebug(OSRF_LOG_MARK,
524         "WS received opensrf response for thread=%s", tmsg->thread);
525
526     // first we need to perform some maintenance
527     msg_list = osrfMessageDeserialize(tmsg->body, NULL);
528
529     for (i = 0; i < msg_list->size; i++) {
530         one_msg = OSRF_LIST_GET_INDEX(msg_list, i);
531
532         osrfLogDebug(OSRF_LOG_MARK,
533             "WS returned response of type %d", one_msg->m_type);
534
535         /*  if our client just successfully connected to an opensrf service,
536             cache the sender so that future calls on this thread will use
537             the correct recipient. */
538         if (one_msg && one_msg->m_type == STATUS) {
539
540             if (one_msg->status_code == OSRF_STATUS_OK) {
541
542                 if (!osrfHashGet(stateful_session_cache, tmsg->thread)) {
543
544                     unsigned long ses_size =
545                         osrfHashGetCount(stateful_session_cache);
546
547                     if (ses_size < MAX_ACTIVE_STATEFUL_SESSIONS) {
548
549                         osrfLogDebug(OSRF_LOG_MARK, "WS caching sender "
550                             "thread=%s, sender=%s; concurrent=%d",
551                             tmsg->thread, tmsg->sender, ses_size);
552
553                         char* sender = strdup(tmsg->sender); // free in *Remove
554                         osrfHashSet(stateful_session_cache, sender, tmsg->thread);
555
556                     } else {
557
558                         osrfLogWarning(OSRF_LOG_MARK,
559                             "WS max concurrent sessions (%d) reached.  "
560                             "Current session will not be tracked",
561                             MAX_ACTIVE_STATEFUL_SESSIONS
562                         );
563                     }
564                 }
565
566             } else {
567
568                 // connection timed out; clear the cached recipient
569                 if (one_msg->status_code == OSRF_STATUS_TIMEOUT) {
570                     osrfHashRemove(stateful_session_cache, tmsg->thread);
571                 }
572             }
573         }
574     }
575
576     // osrfMessageDeserialize applies the freeItem handler to the
577     // newly created osrfList.  We only need to free the list and
578     // the individual osrfMessage's will be freed along with it
579     osrfListFree(msg_list);
580
581     // Pack the response into a websocket wrapper message.
582     jsonObject *msg_wrapper = NULL;
583     char *msg_string = NULL;
584     msg_wrapper = jsonNewObject(NULL);
585
586     jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
587     jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
588     jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
589
590     if (tmsg->is_error) {
591         // tmsg->sender is the original recipient. they get swapped
592         // in error replies.
593         osrfLogError(OSRF_LOG_MARK,
594             "WS received XMPP error message in response to thread=%s and "
595             "recipient=%s.  Likely the recipient is not accessible/available.",
596             tmsg->thread, tmsg->sender);
597         jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
598     }
599
600     msg_string = jsonObjectToJSONRaw(msg_wrapper);
601
602     // Send the JSON to STDOUT
603     printf("%s\n", msg_string);
604
605     free(msg_string);
606     jsonObjectFree(msg_wrapper);
607 }
608
609