From 3a67336ea60eba9573dd94878faacd89a984b304 Mon Sep 17 00:00:00 2001 From: scottmk Date: Thu, 24 Jun 2010 16:56:45 +0000 Subject: [PATCH] Provide a mechanism whereby a server drone can terminate immediately, without waiting for max_requests or a DISCONNECT message. Motivation: a drone may determine that it is incapacitated and can neither complete the current request nor service subsequent ones. In particular, it may lose a database connection. Mechanism: set a switch in the current osrfAppSession. In osrf_prefork.c, inspect this switch after every method call, and bail out if it's set. M include/opensrf/osrf_app_session.h M src/libopensrf/osrf_app_session.c M src/libopensrf/osrf_prefork.c git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@1969 9efc2488-bf62-4759-914b-345cdb29e865 --- include/opensrf/osrf_app_session.h | 6 +++++ src/libopensrf/osrf_app_session.c | 38 ++++++++++++++++++--------- src/libopensrf/osrf_prefork.c | 41 ++++++++++++++++++++++++------ 3 files changed, 65 insertions(+), 20 deletions(-) diff --git a/include/opensrf/osrf_app_session.h b/include/opensrf/osrf_app_session.h index 963c84b..36fc396 100644 --- a/include/opensrf/osrf_app_session.h +++ b/include/opensrf/osrf_app_session.h @@ -84,6 +84,10 @@ struct osrf_app_session_struct { /** Hash table of pending requests. */ osrfAppRequest* request_hash[ OSRF_REQUEST_HASH_SIZE ]; + + /** Boolean: true if the app wants to terminate the process. Typically this means that */ + /** a drone has lost its database connection and can therefore no longer function. */ + int panic; }; typedef struct osrf_app_session_struct osrfAppSession; @@ -146,6 +150,8 @@ int osrfAppSessionStatus( osrfAppSession* ses, int type, void osrfAppSessionCleanup( void ); +void osrfAppSessionPanic( osrfAppSession* ses ); + #ifdef __cplusplus } #endif diff --git a/src/libopensrf/osrf_app_session.c b/src/libopensrf/osrf_app_session.c index 226ad94..ca5d14e 100644 --- a/src/libopensrf/osrf_app_session.c +++ b/src/libopensrf/osrf_app_session.c @@ -240,9 +240,13 @@ void osrf_app_session_request_reset_timeout( osrfAppSession* session, int req_id becomes available before the end of the timeout; otherwise NULL; If there is already a message available in the input queue for this request, dequeue and - return it immediately. Otherwise wait up to timeout seconds until you either get an + return it immediately. Otherwise wait up to timeout seconds until you either get an input message for the specified request, run out of time, or encounter an error. + If the only message we receive for this request is a STATUS message with a status code + OSRF_STATUS_COMPLETE, then return NULL. That means that the server has nothing further + to send in response to this request. + You may also receive other messages for other requests, and other sessions. These other messages will be wholly or partially processed behind the scenes while you wait for the one you want. @@ -271,7 +275,7 @@ static osrfMessage* _osrf_app_request_recv( osrfAppRequest* req, int timeout ) { osrfLogDebug( OSRF_LOG_MARK, "In app_request receive with remaining time [%d]", (int) remaining ); - + osrf_app_session_queue_wait( req->session, 0, NULL ); if(req->session->transport_error) { osrfLogError(OSRF_LOG_MARK, "Transport error in recv()"); @@ -474,6 +478,7 @@ osrfAppSession* osrfAppSessionClientInit( const char* remote_service ) { session->remote_service = strdup(remote_service); session->session_locale = NULL; session->transport_error = 0; + session->panic = 0; #ifdef ASSUME_STATELESS session->stateless = 1; @@ -813,6 +818,8 @@ int osrfAppSessionConnect( osrfAppSession* session ) { /* defaulting to protocol 1 for now */ osrfMessage* con_msg = osrf_message_init( CONNECT, session->thread_trace, 1 ); + + // Address this message to the router osrf_app_session_reset_remote( session ); session->state = OSRF_SESSION_CONNECTING; int ret = _osrf_app_session_send( session, con_msg ); @@ -999,12 +1006,12 @@ static int _osrf_app_session_send( osrfAppSession* session, osrfMessage* msg ){ message; process subsequent messages if they are available, but don't wait for them. The first parameter identifies an osrfApp session, but all we really use it for is to - get a pointer to the transport_session. Typically, if not always, a given process - opens only a single transport_session (to talk to the Jabber server), and all app - sessions in that process use the same transport_session. + get a pointer to the transport_session. Typically, a given process opens only a single + transport_session (to talk to the Jabber server), and all app sessions in that process + use the same transport_session. - Hence this function indiscriminately waits for input messages for all osrfAppSessions, - not just the one specified. + Hence this function indiscriminately waits for input messages for all osrfAppSessions + tied to the same Jabber session, not just the one specified. Dispatch each message to the appropriate processing routine, depending on its type and contents, and on whether we're acting as a client or as a server for that message. @@ -1059,7 +1066,7 @@ void osrfAppSessionFree( osrfAppSession* session ){ free(session->orig_remote_id); free(session->session_id); free(session->remote_service); - + // Free the request hash int i; for( i = 0; i < OSRF_REQUEST_HASH_SIZE; ++i ) { @@ -1192,7 +1199,7 @@ int osrfAppSessionStatus( osrfAppSession* ses, int type, /** @brief Free the global session cache. - + Note that the osrfHash that implements the global session cache does @em not have a callback function installed for freeing its cargo. As a result, any remaining osrfAppSessions are leaked, along with all the osrfAppRequests and osrfMessages they @@ -1203,7 +1210,14 @@ void osrfAppSessionCleanup( void ) { osrfAppSessionCache = NULL; } +/** + @brief Arrange for immediate termination of the process. + @param ses Pointer to the current osrfAppSession. - - - + Typical use case: a server drone loses its database connection, thereby becoming useless. + It terminates so that it will not receive further requests, being unable to service them. +*/ +void osrfAppSessionPanic( osrfAppSession* ses ) { + if( ses ) + ses->panic = 1; +} diff --git a/src/libopensrf/osrf_prefork.c b/src/libopensrf/osrf_prefork.c index 560142f..69ccc07 100644 --- a/src/libopensrf/osrf_prefork.c +++ b/src/libopensrf/osrf_prefork.c @@ -93,7 +93,7 @@ static void add_prefork_child( prefork_simple* forker, prefork_child* child ); static void del_prefork_child( prefork_simple* forker, pid_t pid ); static void check_children( prefork_simple* forker, int forever ); -static void prefork_child_process_request(prefork_child*, char* data); +static int prefork_child_process_request(prefork_child*, char* data); static int prefork_child_init_hook(prefork_child*); static prefork_child* prefork_child_init( prefork_simple* forker, int read_data_fd, int write_data_fd, @@ -339,8 +339,10 @@ static int prefork_child_init_hook(prefork_child* child) { } // Called only by a child process -static void prefork_child_process_request(prefork_child* child, char* data) { - if( !child ) return; +// Non-zero return code means that the child process has decided to terminate immediately, +// without waiting for a DISCONNECT or max_requests. +static int prefork_child_process_request(prefork_child* child, char* data) { + if( !child ) return 0; transport_client* client = osrfSystemGetTransportClient(); @@ -359,11 +361,20 @@ static void prefork_child_process_request(prefork_child* child, char* data) { transport_message* msg = new_message_from_xml( data ); osrfAppSession* session = osrf_stack_transport_handler(msg, child->appname); - if(!session) return; + if(!session) return 0; + + int rc = session->panic; + + if( rc ) { + osrfLogWarning( OSRF_LOG_MARK, + "Drone for session %s terminating immediately", session->session_id ); + osrfAppSessionFree( session ); + return rc; + } if( session->stateless && session->state != OSRF_SESSION_CONNECTED ) { osrfAppSessionFree( session ); - return; + return rc; } osrfLogDebug( OSRF_LOG_MARK, "Entering keepalive loop for session %s", session->session_id ); @@ -383,6 +394,9 @@ static void prefork_child_process_request(prefork_child* child, char* data) { osrfLogDebug(OSRF_LOG_MARK, "Data received == %d", recvd); + if( session->panic ) + rc = 1; + if(retval) { osrfLogError(OSRF_LOG_MARK, "queue-wait returned non-success %d", retval); break; @@ -404,11 +418,15 @@ static void prefork_child_process_request(prefork_child* child, char* data) { break; } + + // If the child process has decided to terminate immediately + if( rc ) + break; } osrfLogDebug( OSRF_LOG_MARK, "Exiting keepalive loop for session %s", session->session_id ); osrfAppSessionFree( session ); - return; + return rc; } /** @@ -909,17 +927,24 @@ static void prefork_child_wait( prefork_child* child ) { break; } + int terminate_now = 0; // Boolean + if( n < 0 ) { osrfLogWarning( OSRF_LOG_MARK, "Prefork child read returned error with errno %d", errno ); break; } else if( gotdata ) { - osrfLogDebug(OSRF_LOG_MARK, "Prefork child got a request.. processing.."); - prefork_child_process_request(child, gbuf->buf); + osrfLogDebug( OSRF_LOG_MARK, "Prefork child got a request.. processing.." ); + terminate_now = prefork_child_process_request( child, gbuf->buf ); buffer_reset( gbuf ); } + if( terminate_now ) { + osrfLogWarning( OSRF_LOG_MARK, "Prefork child terminating abruptly" ); + break; + } + if( i < child->max_requests - 1 ) { size_t msg_len = 9; ssize_t len = write( -- 2.43.2