LP#1803182 Websocketd graceful shutdown support
authorBill Erickson <berickxx@gmail.com>
Tue, 13 Nov 2018 22:21:18 +0000 (17:21 -0500)
committerGalen Charlton <gmc@equinoxinitiative.org>
Mon, 17 Dec 2018 21:13:19 +0000 (16:13 -0500)
Give websocketd clients a chance to complete any requests that are
in-process at time of shutdown.

Graceful shutdown is initiated by sending a SIGUSR1 signal to the
websocket-osrf child processes or to the parent process group.

This can be done directly via:

$ kill -s USR1 -$websocketd_parent_pid

Or via systemd for systemd users:

$ sudo systemctl kill -s USR1 websocketd-osrf

Note the websocketd parent process ignores SIGUSR1.

Once initiated, the websocketd backend goes into shutdown mode polling
for a graceful shutdown window, which occurs when all request up to now
have been completed.  Once that moment arrives, the client is kicked off
and must connect to a new websocketd instance to issue any new requests.

The polling period lasts for SHUTDOWN_MAX_GRACEFUL_SECONDS seconds (in
osrf-websocket-stdio.c), which currently defaults to 2 minutes.

Signed-off-by: Bill Erickson <berickxx@gmail.com>
Signed-off-by: Galen Charlton <gmc@equinoxinitiative.org>
src/websocket-stdio/osrf-websocket-stdio.c

index 96d62f1..0b83782 100644 (file)
 // ~100k
 #define RESET_MESSAGE_SIZE 102400
 
+// After receiving the initial shutdow call, wake the event loop every 
+// SHUTDOWN_POLL_INTERVAL_SECONDS to see if we can shut down.
+#define SHUTDOWN_POLL_INTERVAL_SECONDS 1
+
+// Attempt to gracefully disconnect the client until 
+// SHUTDOWN_MAX_GRACEFUL_SECONDS has passed without a shutdown
+// opportunity, at which point force-close the connection.
+#define SHUTDOWN_MAX_GRACEFUL_SECONDS 120
+
+// Incremented with every REQUEST, decremented with every COMPLETE.
+static int requests_in_flight = 0;
+
 // default values, replaced during setup (below) as needed.
 static char* config_file = "/openils/conf/opensrf_core.xml";
 static char* config_ctxt = "gateway";
@@ -91,18 +103,23 @@ static void read_from_osrf();
 static void read_one_osrf_message(transport_message*);
 static int shut_it_down(int);
 static void release_hash_string(char*, void*);
-
-// Websocketd sends SIGINT for shutdown, followed by SIGTERM
-// if SIGINT takes too long.
-static void sigint_handler(int sig) {
-    osrfLogInfo(OSRF_LOG_MARK, "WS received SIGINT - graceful shutdown");
-    shut_it_down(0);
+static int can_shutdown_gracefully();
+
+// Websocketd closes STDIN on shutdown, followed by SIGTERM.
+// Signal the back-ends it's time for graceful shutdown by 
+// sending a SIGUSER1 to the backend processes (or parent 
+// process group).  Websocket ignores SIGUSR1.
+static time_t shutdown_requested = 0; 
+static void sigusr1_handler(int sig) {
+    signal(SIGUSR1, sigusr1_handler);
+    osrfLogInfo(OSRF_LOG_MARK, "WS received SIGUSR1 -- graceful shutdown");
+    shutdown_requested = time(NULL);
 }
 
 int main(int argc, char* argv[]) {
 
     // Handle shutdown signal -- only needed once.
-    signal(SIGINT, sigint_handler);
+    signal(SIGUSR1, sigusr1_handler);
 
     // Connect to OpenSR -- exits on error
     child_init(argc, argv);
@@ -119,6 +136,7 @@ int main(int argc, char* argv[]) {
     int osrf_no = osrf_handle->session->sock_id;
     int maxfd = osrf_no > stdin_no ? osrf_no : stdin_no;
     int sel_resp;
+    int shutdown_stat;
 
     while (1) {
 
@@ -126,13 +144,27 @@ int main(int argc, char* argv[]) {
         FD_SET(osrf_no, &fds);
         FD_SET(stdin_no, &fds);
 
-        // Wait indefinitely for activity to process
-        sel_resp = select(maxfd + 1, &fds, NULL, NULL, NULL);
+        if (shutdown_requested) {
+
+            struct timeval tv;
+            tv.tv_usec = 0;
+            tv.tv_sec = SHUTDOWN_POLL_INTERVAL_SECONDS;
+    
+            // Wait indefinitely for activity to process
+            sel_resp = select(maxfd + 1, &fds, NULL, NULL, &tv);
+
+        } else {
+
+            // Wait indefinitely for activity to process.
+            // This will be interrupted during a shutdown request signal.
+            sel_resp = select(maxfd + 1, &fds, NULL, NULL, NULL);
+        }
 
         if (sel_resp < 0) { // error
 
             if (errno == EINTR) {
                 // Interrupted by a signal.  Start the loop over.
+                // Could be a SIGNUSR1 shutdown request.
                 continue;
             }
 
@@ -142,18 +174,57 @@ int main(int argc, char* argv[]) {
             shut_it_down(1);
         }
 
-        if (FD_ISSET(stdin_no, &fds)) {
-            read_from_stdin();
+        if (sel_resp > 0) {
+
+            if (FD_ISSET(stdin_no, &fds)) {
+                read_from_stdin();
+            }
+
+            if (FD_ISSET(osrf_no, &fds)) {
+                read_from_osrf();
+            }
         }
 
-        if (FD_ISSET(osrf_no, &fds)) {
-            read_from_osrf();
+        if (shutdown_requested) {
+            shutdown_stat = can_shutdown_gracefully();
+
+            if (shutdown_stat == 0) {
+                // continue graceful shutdown cycle
+                continue; 
+            }
+            
+            // graceful shutdown cycle has completed either successfully
+            // or via timeout.
+            return shut_it_down(shutdown_stat > 0 ? 0 : 1);
         }
     }
 
     return shut_it_down(0);
 }
 
+// Returns 1 if graceful shutdown is OK.
+// Returns 0 if graceful shutdown cycle should continue.
+// Returns -1 if the graceful shutdown cycle timed out.
+static int can_shutdown_gracefully() {
+
+    time_t cycle_time = time(NULL) - shutdown_requested;
+    if (cycle_time > SHUTDOWN_MAX_GRACEFUL_SECONDS) {
+        osrfLogWarning(OSRF_LOG_MARK, "Timeout during graceful shutdown");
+        return -1;
+    }
+
+    unsigned long active_sessions = osrfHashGetCount(stateful_session_cache);
+    if (active_sessions == 0 && requests_in_flight == 0) {
+        osrfLogInfo(OSRF_LOG_MARK, "Graceful shutdown cycle complete");
+        return 1;
+    }
+
+    osrfLogInfo(OSRF_LOG_MARK, "Graceful shutdown cycle continuing with " 
+        "sessions=%d requests=%d", active_sessions, requests_in_flight);
+
+    return 0;
+}
+
 static void rebuild_stdin_buffer() {
 
     if (stdin_buf != NULL) {
@@ -427,6 +498,7 @@ static char* extract_inbound_messages(
 
             case REQUEST:
                 log_request(service, msg);
+                requests_in_flight++;
                 break;
 
             case DISCONNECT:
@@ -568,6 +640,12 @@ static void read_one_osrf_message(transport_message* tmsg) {
                 // connection timed out; clear the cached recipient
                 if (one_msg->status_code == OSRF_STATUS_TIMEOUT) {
                     osrfHashRemove(stateful_session_cache, tmsg->thread);
+
+                } else {
+
+                    if (one_msg->status_code == OSRF_STATUS_COMPLETE) {
+                        requests_in_flight--;
+                    }
                 }
             }
         }