From d7e9df6838f1c9a72db3fd41556d178cfe7f6700 Mon Sep 17 00:00:00 2001 From: Galen Charlton Date: Wed, 12 Dec 2018 14:35:56 -0500 Subject: [PATCH 1/1] LP#1729610: extend backlog queue to C apps This patch extends the notion of a backlog queue to C apps and offers the same functionality as the Perl side of the patch series: - max_backlog_queue configuration setting - ability to queue messages up to the configured limit - ability to drop requests that would overflow the backlog queue and send status 503 exceptions back to the client. This patch also adds a new service, opensrf.cslow, that implements a opensrf.cslow.wait method similar to the Perl opensrf.slooooooow service. To test ------- [1] Set a low max_backlog_queue for opensrf.cslow and a low max_children. [2] Arrange for srfsh to fire off a bunch of opensrf.cslow.wait requests. [3] Verify that requests that come in after the backlog queue fills up immediately get 503 exceptions. Signed-off-by: Galen Charlton Signed-off-by: Bill Erickson Signed-off-by: Mike Rylander --- examples/opensrf.xml.example | 19 ++++ src/c-apps/Makefile.am | 6 +- src/c-apps/osrf_cslow.c | 58 +++++++++++++ src/libopensrf/osrf_prefork.c | 157 +++++++++++++++++++++++++--------- 4 files changed, 199 insertions(+), 41 deletions(-) create mode 100644 src/c-apps/osrf_cslow.c diff --git a/examples/opensrf.xml.example b/examples/opensrf.xml.example index 7157787..c4be049 100644 --- a/examples/opensrf.xml.example +++ b/examples/opensrf.xml.example @@ -165,6 +165,24 @@ vim:et:ts=2:sw=2: + + 3 + 1 + c + libosrf_cslow.so + + 1000 + opensrf.cslow_unix.log + opensrf.cslow_unix.sock + opensrf.cslow_unix.pid + 5 + 15 + 2 + 5 + 10 + + + 1 1 @@ -262,6 +280,7 @@ vim:et:ts=2:sw=2: opensrf.dbmath opensrf.validator opensrf.slooooooow + opensrf.cslow diff --git a/src/c-apps/Makefile.am b/src/c-apps/Makefile.am index 54c3cac..9138ff6 100644 --- a/src/c-apps/Makefile.am +++ b/src/c-apps/Makefile.am @@ -18,11 +18,15 @@ AM_LDFLAGS = $(DEF_LDFLAGS) -L@top_builddir@/src/libopensrf DISTCLEANFILES = Makefile.in Makefile noinst_PROGRAMS = timejson -lib_LTLIBRARIES = libosrf_dbmath.la libosrf_math.la libosrf_version.la +lib_LTLIBRARIES = libosrf_cslow.la libosrf_dbmath.la libosrf_math.la libosrf_version.la timejson_SOURCES = timejson.c timejson_LDADD = @top_builddir@/src/libopensrf/libopensrf.la +libosrf_cslow_la_SOURCES = osrf_cslow.c +libosrf_cslow_la_LDFLAGS = $(AM_LDFLAGS) -module -version-info 2:0:2 +libosrf_cslow_la_LIBADD = @top_builddir@/src/libopensrf/libopensrf.la + libosrf_dbmath_la_SOURCES = osrf_dbmath.c libosrf_dbmath_la_LDFLAGS = $(AM_LDFLAGS) -module -version-info 2:0:2 libosrf_dbmath_la_LIBADD = @top_builddir@/src/libopensrf/libopensrf.la diff --git a/src/c-apps/osrf_cslow.c b/src/c-apps/osrf_cslow.c new file mode 100644 index 0000000..108977a --- /dev/null +++ b/src/c-apps/osrf_cslow.c @@ -0,0 +1,58 @@ +#include +#include +#include +#include + +#define MODULENAME "opensrf.cslow" + +int osrfAppInitialize(); +int osrfAppChildInit(); +int osrfCSlowWait( osrfMethodContext* ); + + +int osrfAppInitialize() { + + osrfAppRegisterMethod( + MODULENAME, + "opensrf.cslow.wait", + "osrfCSlowWait", + "Wait specified number of seconds, then return that number", 1, 0 ); + + return 0; +} + +int osrfAppChildInit() { + return 0; +} + +int osrfCSlowWait( osrfMethodContext* ctx ) { + if( osrfMethodVerifyContext( ctx ) ) { + osrfLogError( OSRF_LOG_MARK, "Invalid method context" ); + return -1; + } + + const jsonObject* x = jsonObjectGetIndex(ctx->params, 0); + + if( x ) { + + char* a = jsonObjectToSimpleString(x); + + if( a ) { + + unsigned int pause = atoi(a); + sleep(pause); + + jsonObject* resp = jsonNewNumberObject(pause); + osrfAppRespondComplete( ctx, resp ); + jsonObjectFree(resp); + + free(a); + return 0; + } + } + + return -1; +} + + + diff --git a/src/libopensrf/osrf_prefork.c b/src/libopensrf/osrf_prefork.c index f02f635..a9f7c42 100644 --- a/src/libopensrf/osrf_prefork.c +++ b/src/libopensrf/osrf_prefork.c @@ -48,6 +48,7 @@ typedef struct { int max_requests; /**< How many requests a child processes before terminating. */ int min_children; /**< Minimum number of children to maintain. */ int max_children; /**< Maximum number of children to maintain. */ + int max_backlog_queue; /**< Maximum size of backlog queue. */ int fd; /**< Unused. */ int data_to_child; /**< Unused. */ int data_to_parent; /**< Unused. */ @@ -86,7 +87,7 @@ typedef struct prefork_child_struct prefork_child; static volatile sig_atomic_t child_dead; static int prefork_simple_init( prefork_simple* prefork, transport_client* client, - int max_requests, int min_children, int max_children ); + int max_requests, int min_children, int max_children, int max_backlog_queue ); static prefork_child* launch_child( prefork_simple* forker ); static void prefork_launch_children( prefork_simple* forker ); static void prefork_run( prefork_simple* forker ); @@ -137,6 +138,7 @@ int osrf_prefork_run( const char* appname ) { int maxr = 1000; int maxc = 10; + int maxbq = 1000; int minc = 3; int kalive = 5; @@ -146,6 +148,7 @@ int osrf_prefork_run( const char* appname ) { char* max_req = osrf_settings_host_value( "/apps/%s/unix_config/max_requests", appname ); char* min_children = osrf_settings_host_value( "/apps/%s/unix_config/min_children", appname ); char* max_children = osrf_settings_host_value( "/apps/%s/unix_config/max_children", appname ); + char* max_backlog_queue = osrf_settings_host_value( "/apps/%s/unix_config/max_backlog_queue", appname ); char* keepalive = osrf_settings_host_value( "/apps/%s/keepalive", appname ); if( !keepalive ) @@ -168,10 +171,16 @@ int osrf_prefork_run( const char* appname ) { else maxc = atoi( max_children ); + if( !max_backlog_queue ) + osrfLogWarning( OSRF_LOG_MARK, "Max backlog queue size not defined, assuming %d", maxbq ); + else + maxbq = atoi( max_backlog_queue ); + free( keepalive ); free( max_req ); free( min_children ); free( max_children ); + free( max_backlog_queue ); /* --------------------------------------------------- */ char* resc = va_list_to_string( "%s_listener", appname ); @@ -187,7 +196,7 @@ int osrf_prefork_run( const char* appname ) { prefork_simple forker; - if( prefork_simple_init( &forker, osrfSystemGetTransportClient(), maxr, minc, maxc )) { + if( prefork_simple_init( &forker, osrfSystemGetTransportClient(), maxr, minc, maxc, maxbq )) { osrfLogError( OSRF_LOG_MARK, "osrf_prefork_run() failed to create prefork_simple object" ); return -1; @@ -522,10 +531,11 @@ static int prefork_child_process_request( prefork_child* child, char* data ) { before terminating. @param min_children Minimum number of child processes to maintain. @param max_children Maximum number of child processes to maintain. + @param max_backlog_queue Maximum size of backlog queue. @return 0 if successful, or 1 if not (due to invalid parameters). */ static int prefork_simple_init( prefork_simple* prefork, transport_client* client, - int max_requests, int min_children, int max_children ) { + int max_requests, int min_children, int max_children, int max_backlog_queue ) { if( min_children > max_children ) { osrfLogError( OSRF_LOG_MARK, "min_children (%d) is greater " @@ -546,6 +556,7 @@ static int prefork_simple_init( prefork_simple* prefork, transport_client* clien prefork->max_requests = max_requests; prefork->min_children = min_children; prefork->max_children = max_children; + prefork->max_backlog_queue = max_backlog_queue; prefork->fd = 0; prefork->data_to_child = 0; prefork->data_to_parent = 0; @@ -850,6 +861,16 @@ static void prefork_run( prefork_simple* forker ) { transport_message* cur_msg = NULL; + // The backlog queue accumulates messages received while there + // are not yet children available to process them. While the + // transport client maintains its own queue of messages, sweeping + // the transport client's queue in the backlog queue gives us the + // ability to set a limit on the size of the backlog queue (and + // then to drop messages once the backlog queue has filled up) + transport_message* backlog_queue_head = NULL; + transport_message* backlog_queue_tail = NULL; + int backlog_queue_size = 0; + while( 1 ) { if( forker->first_child == NULL && forker->idle_list == NULL ) {/* no more children */ @@ -857,45 +878,98 @@ static void prefork_run( prefork_simple* forker ) { return; } - // Wait indefinitely for an input message - osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data..." ); - cur_msg = client_recv( forker->connection, -1 ); + int received_from_network = 0; + if ( backlog_queue_size == 0 ) { + // Wait indefinitely for an input message + osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data..." ); + cur_msg = client_recv( forker->connection, -1 ); + received_from_network = 1; + } else { + // See if any messages are immediately available + cur_msg = client_recv( forker->connection, 0 ); + if ( cur_msg != NULL ) + received_from_network = 1; + } - if( cur_msg == NULL ) { - // most likely a signal was received. clean up any recently - // deceased children and try again. - if(child_dead) - reap_children(forker); - continue; - } + if (received_from_network) { + if( cur_msg == NULL ) { + // most likely a signal was received. clean up any recently + // deceased children and try again. + if(child_dead) + reap_children(forker); + continue; + } - if (cur_msg->error_type) { - osrfLogInfo(OSRF_LOG_MARK, - "Listener received an XMPP error message. " - "Likely a bounced message. sender=%s", cur_msg->sender); - if(child_dead) - reap_children(forker); - continue; - } + if (cur_msg->error_type) { + osrfLogInfo(OSRF_LOG_MARK, + "Listener received an XMPP error message. " + "Likely a bounced message. sender=%s", cur_msg->sender); + if(child_dead) + reap_children(forker); + continue; + } - message_prepare_xml( cur_msg ); - const char* msg_data = cur_msg->msg_xml; - if( ! msg_data || ! *msg_data ) { - osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %", - (msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread ); - message_free( cur_msg ); - continue; // Message not usable; go on to the next one. + message_prepare_xml( cur_msg ); + const char* msg_data = cur_msg->msg_xml; + if( ! msg_data || ! *msg_data ) { + osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %", + (msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread ); + message_free( cur_msg ); + continue; // Message not usable; go on to the next one. + } + + // stick message onto queue + cur_msg->next = NULL; + if (backlog_queue_size == 0) { + backlog_queue_head = cur_msg; + backlog_queue_tail = cur_msg; + } else { + if (backlog_queue_size >= forker->max_backlog_queue) { + osrfLogWarning ( OSRF_LOG_MARK, "Reached backlog queue limit of %d; dropping " + "latest message", + forker->max_backlog_queue ); + osrfMessage* err = osrf_message_init( STATUS, 1, 1 ); + osrf_message_set_status_info( err, "osrfMethodException", + "Service unavailable: no available children and backlog queue at limit", + OSRF_STATUS_SERVICEUNAVAILABLE ); + char *data = osrf_message_serialize( err ); + osrfMessageFree( err ); + transport_message* tresponse = message_init( data, "", cur_msg->thread, cur_msg->router_from, cur_msg->recipient ); + message_set_osrf_xid(tresponse, cur_msg->osrf_xid); + free( data ); + transport_client* client = osrfSystemGetTransportClient(); + client_send_message( client, tresponse ); + message_free( tresponse ); + message_free(cur_msg); + continue; + } + backlog_queue_tail->next = cur_msg; + backlog_queue_tail = cur_msg; + osrfLogWarning( OSRF_LOG_MARK, "Adding message to non-empty backlog queue." ); + } + backlog_queue_size++; } + if (backlog_queue_size == 0) { + // strictly speaking, this check may be redundant, but + // from this point forward we can be sure that the + // backlog queue has at least one message in it and + // that if we can find a child to process it, we want to + // process the head of that queue. + continue; + } + + cur_msg = backlog_queue_head; + int honored = 0; /* will be set to true when we service the request */ int no_recheck = 0; while( ! honored ) { - if( !no_recheck ) { - if(check_children( forker, 0 ) < 0) { + if( !no_recheck ) { + if(check_children( forker, 0 ) < 0) { continue; // check failed, try again - } + } } no_recheck = 0; @@ -924,6 +998,7 @@ static void prefork_run( prefork_simple* forker ) { osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d", cur_child->write_data_fd ); + const char* msg_data = cur_msg->msg_xml; int written = write( cur_child->write_data_fd, msg_data, strlen( msg_data ) + 1 ); if( written < 0 ) { // This child appears to be dead or unusable. Discard it. @@ -958,6 +1033,7 @@ static void prefork_run( prefork_simple* forker ) { osrfLogDebug( OSRF_LOG_MARK, "Writing to new child fd %d : pid %d", new_child->write_data_fd, new_child->pid ); + const char* msg_data = cur_msg->msg_xml; int written = write( new_child->write_data_fd, msg_data, strlen( msg_data ) + 1 ); if( written < 0 ) { @@ -980,20 +1056,21 @@ static void prefork_run( prefork_simple* forker ) { } } - if( !honored ) { - osrfLogWarning( OSRF_LOG_MARK, "No children available, waiting..." ); - if( check_children( forker, 1 ) >= 0 ) { - // Tell the loop not to call check_children again, since we just successfully called it - no_recheck = 1; - } - } - if( child_dead ) reap_children( forker ); + if( !honored ) { + break; + } + } // end while( ! honored ) - message_free( cur_msg ); + if ( honored ) { + backlog_queue_head = cur_msg->next; + backlog_queue_size--; + cur_msg->next = NULL; + message_free( cur_msg ); + } } /* end top level listen loop */ } -- 2.43.2