added connection oriented statefull session handling to the server stack
authorerickson <erickson@9efc2488-bf62-4759-914b-345cdb29e865>
Mon, 15 May 2006 14:37:43 +0000 (14:37 +0000)
committererickson <erickson@9efc2488-bf62-4759-914b-345cdb29e865>
Mon, 15 May 2006 14:37:43 +0000 (14:37 +0000)
git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@715 9efc2488-bf62-4759-914b-345cdb29e865

src/c-apps/osrf_math.c
src/libstack/osrf_app_session.c
src/libstack/osrf_app_session.h
src/libstack/osrf_prefork.c
src/libstack/osrf_prefork.h
src/libstack/osrf_stack.c
src/srfsh/srfsh.c

index f307804..deb11f1 100644 (file)
@@ -72,6 +72,12 @@ int osrfMathRun( osrfMethodContext* ctx ) {
                        /* connect to db math */
                        osrfAppSession* ses = osrfAppSessionClientInit("opensrf.dbmath");
 
                        /* connect to db math */
                        osrfAppSession* ses = osrfAppSessionClientInit("opensrf.dbmath");
 
+                       /* forcing an explicit connect allows us to talk to one worker backend
+                        * regardless of "stateful" config settings for the server 
+                        * This buys us nothing here since we're only sending one request...
+                        * */
+                       /*osrfAppSessionConnect(ses);*/
+
                        /* dbmath uses the same method names that math does */
                        int req_id = osrfAppSessionMakeRequest( ses, newParams, ctx->method->name, 1, NULL );
                        osrfMessage* omsg = osrfAppSessionRequestRecv( ses, req_id, 60 );
                        /* dbmath uses the same method names that math does */
                        int req_id = osrfAppSessionMakeRequest( ses, newParams, ctx->method->name, 1, NULL );
                        osrfMessage* omsg = osrfAppSessionRequestRecv( ses, req_id, 60 );
index f7ed56c..2813c43 100644 (file)
@@ -401,6 +401,11 @@ int osrf_app_session_push_queue(
        return 0;
 }
 
        return 0;
 }
 
+int osrfAppSessionConnect( osrf_app_session* session ) { 
+       return osrf_app_session_connect(session);
+}
+
+
 /** Attempts to connect to the remote service */
 int osrf_app_session_connect(osrf_app_session* session){
        
 /** Attempts to connect to the remote service */
 int osrf_app_session_connect(osrf_app_session* session){
        
@@ -486,17 +491,22 @@ int osrfAppSessionSendBatch( osrfAppSession* session, osrf_message* msgs[], int
 
                osrf_app_session_queue_wait( session, 0 );
 
 
                osrf_app_session_queue_wait( session, 0 );
 
-               /* if we're not stateless and not connected and the first 
-                       message is not a connect message, then we do the connect first */
-               if(session->stateless) {
+               if(session->state != OSRF_SESSION_CONNECTED)  {
+
+                       if(session->stateless) { /* stateless session always send to the root listener */
                                osrf_app_session_reset_remote(session);
 
                                osrf_app_session_reset_remote(session);
 
-               } else {
+                       } else { 
+
+                               /* do an auto-connect if necessary */
+                               if( ! session->stateless &&
+                                       (msg->m_type != CONNECT) && 
+                                       (msg->m_type != DISCONNECT) &&
+                                       (session->state != OSRF_SESSION_CONNECTED) ) {
 
 
-                       if( (msg->m_type != CONNECT) && (msg->m_type != DISCONNECT) &&
-                               (session->state != OSRF_SESSION_CONNECTED) ) {
-                               if(!osrf_app_session_connect( session )) 
-                                       return 0;
+                                       if(!osrf_app_session_connect( session )) 
+                                               return 0;
+                               }
                        }
                }
        }
                        }
                }
        }
index 9e357b6..f503821 100644 (file)
@@ -144,6 +144,7 @@ int osrf_app_session_push_queue( osrf_app_session*, osrf_message* msg );
   * connection, 0 otherwise.
   */
 int osrf_app_session_connect( osrf_app_session* );
   * connection, 0 otherwise.
   */
 int osrf_app_session_connect( osrf_app_session* );
+int osrfAppSessionConnect( osrf_app_session* );
 
 /** Sends a disconnect message to the remote service.  No response is expected */
 int osrf_app_session_disconnect( osrf_app_session* );
 
 /** Sends a disconnect message to the remote service.  No response is expected */
 int osrf_app_session_disconnect( osrf_app_session* );
index f9761d2..1246236 100644 (file)
@@ -29,6 +29,18 @@ int osrf_prefork_run(char* appname) {
        jsonObject* min_children = osrf_settings_host_value_object("/apps/%s/unix_config/min_children", appname);
        jsonObject* max_children = osrf_settings_host_value_object("/apps/%s/unix_config/max_children", appname);
 
        jsonObject* min_children = osrf_settings_host_value_object("/apps/%s/unix_config/min_children", appname);
        jsonObject* max_children = osrf_settings_host_value_object("/apps/%s/unix_config/max_children", appname);
 
+       char* keepalive = osrf_settings_host_value("/apps/%s/keepalive", appname);
+       time_t kalive;
+       if( keepalive ) {
+               kalive = atoi(keepalive);
+               free(keepalive);
+       } else {
+               kalive = 5; /* give it a default */
+       }
+
+       osrfLogInfo(OSRF_LOG_MARK, "keepalive setting = %d seconds", kalive);
+
+
        
        if(!max_req) osrfLogWarning( OSRF_LOG_MARK, "Max requests not defined, assuming 1000");
        else maxr = (int) jsonObjectGetNumber(max_req);
        
        if(!max_req) osrfLogWarning( OSRF_LOG_MARK, "Max requests not defined, assuming 1000");
        else maxr = (int) jsonObjectGetNumber(max_req);
@@ -58,6 +70,7 @@ int osrf_prefork_run(char* appname) {
                osrfSystemGetTransportClient(), maxr, minc, maxc);
 
        forker->appname = strdup(appname);
                osrfSystemGetTransportClient(), maxr, minc, maxc);
 
        forker->appname = strdup(appname);
+       forker->keepalive       = kalive;
 
        if(forker == NULL) {
                osrfLogError( OSRF_LOG_MARK, "osrf_prefork_run() failed to create prefork_simple object");
 
        if(forker == NULL) {
                osrfLogError( OSRF_LOG_MARK, "osrf_prefork_run() failed to create prefork_simple object");
@@ -148,9 +161,46 @@ void prefork_child_process_request(prefork_child* child, char* data) {
                return;
        }
 
                return;
        }
 
-       /* keepalive loop for stateful sessions */
+       osrfLogDebug( OSRF_LOG_MARK, "Entering keepalive loop for session %s", session->session_id );
+       int keepalive = child->keepalive;
+       int retval;
+       time_t start;
+       time_t end;
+
+       while(1) {
+
+               osrfLogDebug(OSRF_LOG_MARK, 
+                               "osrf_prefork calling queue_wait [%d] in keepalive loop", keepalive);
+               start           = time(NULL);
+               retval  = osrf_app_session_queue_wait(session, keepalive);
+               end             = time(NULL);
+
+               if(retval) {
+                       osrfLogError(OSRF_LOG_MARK, "queue-wait returned non-success %d", retval);
+                       break;
+               }
+
+               /* see if the client disconnected from us */
+               if(session->state != OSRF_SESSION_CONNECTED) break;
+
+               /* see if the used up the timeout */
+               if( (end - start) >= keepalive ) {
+
+                       osrfLogDebug(OSRF_LOG_MARK, "Keepalive timed out, exiting connected session");
+
+                       osrfAppSessionStatus( 
+                                       session, 
+                                       OSRF_STATUS_TIMEOUT, 
+                                       "osrfConnectStatus", 
+                                       0, "Disconnected on timeout" );
 
 
-               osrfLogDebug( OSRF_LOG_MARK, "Entering keepalive loop for session %s", session->session_id );
+                       break;
+               }
+       }
+
+       osrfLogDebug( OSRF_LOG_MARK, "Exiting keepalive loop for session %s", session->session_id );
+       osrfAppSessionFree( session );
+       return;
 }
 
 
 }
 
 
@@ -205,6 +255,7 @@ prefork_child*  launch_child( prefork_simple* forker ) {
                        data_fd[1], status_fd[0], status_fd[1] );
 
        child->appname = strdup(forker->appname);
                        data_fd[1], status_fd[0], status_fd[1] );
 
        child->appname = strdup(forker->appname);
+       child->keepalive = forker->keepalive;
 
 
        add_prefork_child( forker, child );
 
 
        add_prefork_child( forker, child );
index 474d2b0..0e14259 100644 (file)
@@ -37,6 +37,7 @@ struct prefork_simple_struct {
        int data_to_child;
        int data_to_parent;
        int current_num_children;
        int data_to_child;
        int data_to_parent;
        int current_num_children;
+       int keepalive; /* keepalive time for stateful sessions */
        char* appname;
        struct prefork_child_struct* first_child;
        transport_client* connection;
        char* appname;
        struct prefork_child_struct* first_child;
        transport_client* connection;
@@ -53,6 +54,7 @@ struct prefork_child_struct {
        int available;
        int max_requests;
        char* appname;
        int available;
        int max_requests;
        char* appname;
+       int keepalive;
        struct prefork_child_struct* next;
 };
 
        struct prefork_child_struct* next;
 };
 
index 1c2e4c8..83de64a 100644 (file)
@@ -200,11 +200,14 @@ osrf_message* _do_server( osrf_app_session* session, osrf_message* msg ) {
 
                case DISCONNECT:
                                /* session will be freed by the forker */
 
                case DISCONNECT:
                                /* session will be freed by the forker */
+                               osrfLogDebug(OSRF_LOG_MARK, "Client sent explicit disconnect");
+                               session->state = OSRF_SESSION_DISCONNECTED;
                                return NULL;
 
                case CONNECT:
                                osrfAppSessionStatus( session, OSRF_STATUS_OK, 
                                                "osrfConnectStatus", msg->thread_trace, "Connection Successful" );
                                return NULL;
 
                case CONNECT:
                                osrfAppSessionStatus( session, OSRF_STATUS_OK, 
                                                "osrfConnectStatus", msg->thread_trace, "Connection Successful" );
+                               session->state = OSRF_SESSION_CONNECTED;
                                return NULL;
 
                case REQUEST:
                                return NULL;
 
                case REQUEST:
@@ -215,6 +218,7 @@ osrf_message* _do_server( osrf_app_session* session, osrf_message* msg ) {
 
                default:
                        osrfLogWarning( OSRF_LOG_MARK, "Server cannot handle message of type %d", msg->m_type );
 
                default:
                        osrfLogWarning( OSRF_LOG_MARK, "Server cannot handle message of type %d", msg->m_type );
+                       session->state = OSRF_SESSION_DISCONNECTED;
                        return NULL;
 
        }
                        return NULL;
 
        }
index b26f9b3..52b6918 100644 (file)
@@ -800,6 +800,7 @@ int handle_math( char* words[] ) {
 int do_math( int count, int style ) {
 
        osrf_app_session* session = osrf_app_client_session_init(  "opensrf.math" );
 int do_math( int count, int style ) {
 
        osrf_app_session* session = osrf_app_client_session_init(  "opensrf.math" );
+       osrf_app_session_connect(session);
 
        jsonObject* params = json_parse_string("[]");
        jsonObjectPush(params,jsonNewObject("1"));
 
        jsonObject* params = json_parse_string("[]");
        jsonObjectPush(params,jsonNewObject("1"));