LP#1729610: extend backlog queue to C apps
[OpenSRF.git] / src / libopensrf / osrf_prefork.c
index f02f635..a9f7c42 100644 (file)
@@ -48,6 +48,7 @@ typedef struct {
        int max_requests;     /**< How many requests a child processes before terminating. */
        int min_children;     /**< Minimum number of children to maintain. */
        int max_children;     /**< Maximum number of children to maintain. */
+       int max_backlog_queue; /**< Maximum size of backlog queue. */
        int fd;               /**< Unused. */
        int data_to_child;    /**< Unused. */
        int data_to_parent;   /**< Unused. */
@@ -86,7 +87,7 @@ typedef struct prefork_child_struct prefork_child;
 static volatile sig_atomic_t child_dead;
 
 static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
-       int max_requests, int min_children, int max_children );
+       int max_requests, int min_children, int max_children, int max_backlog_queue );
 static prefork_child* launch_child( prefork_simple* forker );
 static void prefork_launch_children( prefork_simple* forker );
 static void prefork_run( prefork_simple* forker );
@@ -137,6 +138,7 @@ int osrf_prefork_run( const char* appname ) {
 
        int maxr = 1000;
        int maxc = 10;
+       int maxbq = 1000;
        int minc = 3;
        int kalive = 5;
 
@@ -146,6 +148,7 @@ int osrf_prefork_run( const char* appname ) {
        char* max_req      = osrf_settings_host_value( "/apps/%s/unix_config/max_requests", appname );
        char* min_children = osrf_settings_host_value( "/apps/%s/unix_config/min_children", appname );
        char* max_children = osrf_settings_host_value( "/apps/%s/unix_config/max_children", appname );
+       char* max_backlog_queue = osrf_settings_host_value( "/apps/%s/unix_config/max_backlog_queue", appname );
        char* keepalive    = osrf_settings_host_value( "/apps/%s/keepalive", appname );
 
        if( !keepalive )
@@ -168,10 +171,16 @@ int osrf_prefork_run( const char* appname ) {
        else
                maxc = atoi( max_children );
 
+       if( !max_backlog_queue )
+               osrfLogWarning( OSRF_LOG_MARK, "Max backlog queue size not defined, assuming %d", maxbq );
+       else
+               maxbq = atoi( max_backlog_queue );
+
        free( keepalive );
        free( max_req );
        free( min_children );
        free( max_children );
+       free( max_backlog_queue );
        /* --------------------------------------------------- */
 
        char* resc = va_list_to_string( "%s_listener", appname );
@@ -187,7 +196,7 @@ int osrf_prefork_run( const char* appname ) {
 
        prefork_simple forker;
 
-       if( prefork_simple_init( &forker, osrfSystemGetTransportClient(), maxr, minc, maxc )) {
+       if( prefork_simple_init( &forker, osrfSystemGetTransportClient(), maxr, minc, maxc, maxbq )) {
                osrfLogError( OSRF_LOG_MARK,
                        "osrf_prefork_run() failed to create prefork_simple object" );
                return -1;
@@ -522,10 +531,11 @@ static int prefork_child_process_request( prefork_child* child, char* data ) {
                        before terminating.
        @param min_children Minimum number of child processes to maintain.
        @param max_children Maximum number of child processes to maintain.
+       @param max_backlog_queue Maximum size of backlog queue.
        @return 0 if successful, or 1 if not (due to invalid parameters).
 */
 static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
-               int max_requests, int min_children, int max_children ) {
+               int max_requests, int min_children, int max_children, int max_backlog_queue ) {
 
        if( min_children > max_children ) {
                osrfLogError( OSRF_LOG_MARK,  "min_children (%d) is greater "
@@ -546,6 +556,7 @@ static int prefork_simple_init( prefork_simple* prefork, transport_client* clien
        prefork->max_requests = max_requests;
        prefork->min_children = min_children;
        prefork->max_children = max_children;
+       prefork->max_backlog_queue = max_backlog_queue;
        prefork->fd           = 0;
        prefork->data_to_child = 0;
        prefork->data_to_parent = 0;
@@ -850,6 +861,16 @@ static void prefork_run( prefork_simple* forker ) {
 
        transport_message* cur_msg = NULL;
 
+       // The backlog queue accumulates messages received while there
+       // are not yet children available to process them. While the
+       // transport client maintains its own queue of messages, sweeping
+       // the transport client's queue in the backlog queue gives us the
+       // ability to set a limit on the size of the backlog queue (and
+       // then to drop messages once the backlog queue has filled up)
+       transport_message* backlog_queue_head = NULL;
+       transport_message* backlog_queue_tail = NULL;
+       int backlog_queue_size = 0;
+
        while( 1 ) {
 
                if( forker->first_child == NULL && forker->idle_list == NULL ) {/* no more children */
@@ -857,45 +878,98 @@ static void prefork_run( prefork_simple* forker ) {
                        return;
                }
 
-               // Wait indefinitely for an input message
-               osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data..." );
-               cur_msg = client_recv( forker->connection, -1 );
+               int received_from_network = 0;
+               if ( backlog_queue_size == 0 ) {
+                       // Wait indefinitely for an input message
+                       osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data..." );
+                       cur_msg = client_recv( forker->connection, -1 );
+                       received_from_network = 1;
+               } else {
+                       // See if any messages are immediately available
+                       cur_msg = client_recv( forker->connection, 0 );
+                       if ( cur_msg != NULL )
+                               received_from_network = 1;
+               }
 
-               if( cur_msg == NULL ) {
-                       // most likely a signal was received.  clean up any recently
-                       // deceased children and try again.
-                       if(child_dead)
-                               reap_children(forker);
-                       continue;
-        }
+               if (received_from_network) {
+                       if( cur_msg == NULL ) {
+                               // most likely a signal was received.  clean up any recently
+                               // deceased children and try again.
+                               if(child_dead)
+                                       reap_children(forker);
+                               continue;
+                       }
 
-        if (cur_msg->error_type) {
-            osrfLogInfo(OSRF_LOG_MARK, 
-                "Listener received an XMPP error message.  "
-                "Likely a bounced message. sender=%s", cur_msg->sender);
-            if(child_dead)
-                reap_children(forker);
-            continue;
-        }
+                       if (cur_msg->error_type) {
+                               osrfLogInfo(OSRF_LOG_MARK,
+                                       "Listener received an XMPP error message.  "
+                                       "Likely a bounced message. sender=%s", cur_msg->sender);
+                               if(child_dead)
+                                       reap_children(forker);
+                               continue;
+                       }
 
-               message_prepare_xml( cur_msg );
-               const char* msg_data = cur_msg->msg_xml;
-               if( ! msg_data || ! *msg_data ) {
-                       osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %",
-                               (msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread );
-                       message_free( cur_msg );
-                       continue;       // Message not usable; go on to the next one.
+                       message_prepare_xml( cur_msg );
+                       const char* msg_data = cur_msg->msg_xml;
+                       if( ! msg_data || ! *msg_data ) {
+                               osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %",
+                                       (msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread );
+                               message_free( cur_msg );
+                               continue;       // Message not usable; go on to the next one.
+                       }
+
+                       // stick message onto queue
+                       cur_msg->next = NULL;
+                       if (backlog_queue_size == 0) {
+                               backlog_queue_head = cur_msg;
+                               backlog_queue_tail = cur_msg;
+                       } else {
+                               if (backlog_queue_size >= forker->max_backlog_queue) {
+                                       osrfLogWarning ( OSRF_LOG_MARK, "Reached backlog queue limit of %d; dropping "
+                                               "latest message",
+                                               forker->max_backlog_queue );
+                                       osrfMessage* err = osrf_message_init( STATUS, 1, 1 );
+                                       osrf_message_set_status_info( err, "osrfMethodException",
+                                               "Service unavailable: no available children and backlog queue at limit",
+                                               OSRF_STATUS_SERVICEUNAVAILABLE );
+                                       char *data = osrf_message_serialize( err );
+                                       osrfMessageFree( err );
+                                       transport_message* tresponse = message_init( data, "", cur_msg->thread, cur_msg->router_from, cur_msg->recipient );
+                                       message_set_osrf_xid(tresponse, cur_msg->osrf_xid);
+                                       free( data );
+                                       transport_client* client = osrfSystemGetTransportClient();
+                                       client_send_message( client, tresponse );
+                                       message_free( tresponse );
+                                       message_free(cur_msg);
+                                       continue;
+                               }
+                               backlog_queue_tail->next = cur_msg;
+                               backlog_queue_tail = cur_msg;
+                               osrfLogWarning( OSRF_LOG_MARK, "Adding message to non-empty backlog queue." );
+                       }
+                       backlog_queue_size++;
                }
 
+               if (backlog_queue_size == 0) {
+                       // strictly speaking, this check may be redundant, but
+                       // from this point forward we can be sure that the
+                       // backlog queue has at least one message in it and
+                       // that if we can find a child to process it, we want to
+                       // process the head of that queue.
+                       continue;
+               }
+
+               cur_msg = backlog_queue_head;
+
                int honored = 0;     /* will be set to true when we service the request */
                int no_recheck = 0;
 
                while( ! honored ) {
 
-            if( !no_recheck ) {
-                if(check_children( forker, 0 ) < 0) {
+                       if( !no_recheck ) {
+                               if(check_children( forker, 0 ) < 0) {
                     continue; // check failed, try again
-                }
+                               }
             }
             no_recheck = 0;
 
@@ -924,6 +998,7 @@ static void prefork_run( prefork_simple* forker ) {
                                osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d",
                                        cur_child->write_data_fd );
 
+                               const char* msg_data = cur_msg->msg_xml;
                                int written = write( cur_child->write_data_fd, msg_data, strlen( msg_data ) + 1 );
                                if( written < 0 ) {
                                        // This child appears to be dead or unusable.  Discard it.
@@ -958,6 +1033,7 @@ static void prefork_run( prefork_simple* forker ) {
                                                osrfLogDebug( OSRF_LOG_MARK, "Writing to new child fd %d : pid %d",
                                                        new_child->write_data_fd, new_child->pid );
 
+                                               const char* msg_data = cur_msg->msg_xml;
                                                int written = write(
                                                        new_child->write_data_fd, msg_data, strlen( msg_data ) + 1 );
                                                if( written < 0 ) {
@@ -980,20 +1056,21 @@ static void prefork_run( prefork_simple* forker ) {
                 }
             }
 
-                       if( !honored ) {
-                               osrfLogWarning( OSRF_LOG_MARK, "No children available, waiting..." );
-                               if( check_children( forker, 1 ) >= 0 ) {
-                                   // Tell the loop not to call check_children again, since we just successfully called it
-                                   no_recheck = 1;
-                }
-                       }
-
                        if( child_dead )
                                reap_children( forker );
 
+                       if( !honored ) {
+                               break;
+                       }
+
                } // end while( ! honored )
 
-               message_free( cur_msg );
+               if ( honored ) {
+                       backlog_queue_head = cur_msg->next;
+                       backlog_queue_size--;
+                       cur_msg->next = NULL;
+                       message_free( cur_msg );
+               }
 
        } /* end top level listen loop */
 }