adding utility methods
authorerickson <erickson@dcc99617-32d9-48b4-a31d-7c20da2025e4>
Fri, 26 Aug 2005 20:03:31 +0000 (20:03 +0000)
committererickson <erickson@dcc99617-32d9-48b4-a31d-7c20da2025e4>
Fri, 26 Aug 2005 20:03:31 +0000 (20:03 +0000)
adding the preforking for server apps fork code

git-svn-id: svn://svn.open-ils.org/ILS/trunk@1753 dcc99617-32d9-48b4-a31d-7c20da2025e4

13 files changed:
OpenSRF/src/libstack/Makefile
OpenSRF/src/libstack/osrf_app_session.c
OpenSRF/src/libstack/osrf_app_session.h
OpenSRF/src/libstack/osrf_prefork.c [new file with mode: 0644]
OpenSRF/src/libstack/osrf_prefork.h [new file with mode: 0644]
OpenSRF/src/libstack/osrf_stack.c
OpenSRF/src/libstack/osrf_stack.h
OpenSRF/src/libstack/osrf_system.c
OpenSRF/src/libstack/osrf_system.h
OpenSRF/src/utils/logging.c
OpenSRF/src/utils/socket_bundle.c
OpenSRF/src/utils/utils.c
OpenSRF/src/utils/utils.h

index 25b4fee..d86a348 100644 (file)
@@ -1,11 +1,11 @@
 CC_OPTS                +=  -DASSUME_STATELESS 
 LD_OPTS += -lxml2 -lopensrf_transport  -lopensrf_stack -lobjson -lc_utils
 
-SOURCES = osrf_message.c osrf_app_session.c osrf_stack.c osrf_system.c osrf_config.c osrf_settings.c
-TARGETS = osrf_message.o osrf_app_session.o osrf_stack.o osrf_system.o osrf_config.o osrf_settings.o
-HEADERS = osrf_message.h osrf_app_session.h osrf_stack.h osrf_system.h osrf_config.h osrf_settings.h
+SOURCES = osrf_message.c osrf_app_session.c osrf_stack.c osrf_system.c osrf_config.c osrf_settings.c osrf_prefork.c
+TARGETS = osrf_message.o osrf_app_session.o osrf_stack.o osrf_system.o osrf_config.o osrf_settings.o osrf_prefork.o
+HEADERS = osrf_message.h osrf_app_session.h osrf_stack.h osrf_system.h osrf_config.h osrf_settings.h osrf_prefork.h
 
-all: msg libopensrf_stack.so  
+all: msg libopensrf_stack.so  test
 
 msg: 
        echo "-> $$(pwd)"       
@@ -51,6 +51,10 @@ osrf_config.o:       osrf_config.c osrf_config.h
 osrf_settings.o:       osrf_settings.c osrf_settings.h
        echo $@; $(CC) -c $(CC_OPTS) osrf_settings.c -o $@
 
+osrf_prefork.o:        osrf_prefork.c osrf_prefork.h
+       echo $@; $(CC) -c $(CC_OPTS) osrf_prefork.c -o $@
+
+
 install:
        echo installing libopensrf_stack.so
        cp $(HEADERS) $(INCLUDEDIR)/$(OPENSRF)
index 3648e40..0421878 100644 (file)
@@ -247,8 +247,8 @@ osrf_app_session* osrf_app_client_session_init( char* remote_service ) {
 
        char target_buf[512];
        memset(target_buf,0,512);
-       char* domain    = config_value( "opensrf.bootstrap", "//%s/domains/domain1", osrf_config_context ); /* just the first for now */
-       char* router_name = config_value( "opensrf.bootstrap", "//%s/router_name", osrf_config_context );
+       char* domain    = config_value( "opensrf.bootstrap", "//%s/domains/domain1", osrf_get_config_context() ); /* just the first for now */
+       char* router_name = config_value( "opensrf.bootstrap", "//%s/router_name", osrf_get_config_context() );
        sprintf( target_buf, "%s@%s/%s",  router_name, domain, remote_service );
        free(domain);
        free(router_name);
@@ -284,7 +284,7 @@ osrf_app_session* osrf_app_client_session_init( char* remote_service ) {
 }
 
 osrf_app_session* osrf_app_server_session_init( 
-               char* session_id, char* our_app, char* remote_service, char* remote_id ) {
+               char* session_id, char* our_app, char* remote_id ) {
 
        osrf_app_session* session = osrf_app_session_find_session( session_id );
        if(session)
@@ -302,7 +302,7 @@ osrf_app_session* osrf_app_server_session_init(
        session->remote_id = strdup(remote_id);
        session->orig_remote_id = strdup(remote_id);
        session->session_id = strdup(session_id);
-       session->remote_service = strdup(remote_service);
+       session->remote_service = strdup(our_app);
 
        #ifdef ASSUME_STATELESS
        session->stateless = 1;
index 01b6dc5..bdb1644 100644 (file)
@@ -54,7 +54,8 @@ struct osrf_app_session_struct {
        /** The current remote id of the remote service we're talking to */
        char* remote_id;
 
-       /** Who we're talking to */
+       /** Who we're talking to if we're a client.  
+               what app we're serving if we're a server */
        char* remote_service;
 
        /** The current request thread_trace */
@@ -90,7 +91,7 @@ osrf_app_session* osrf_app_client_session_init( char* remote_service );
   * is checked to see if this session already exists, if so, it's returned 
   */
 osrf_app_session* osrf_app_server_session_init( 
-               char* session_id, char* our_app, char* remote_service, char* remote_id );
+               char* session_id, char* our_app, char* remote_id );
 
 /** returns a session from the global session hash */
 osrf_app_session* osrf_app_session_find_session( char* session_id );
diff --git a/OpenSRF/src/libstack/osrf_prefork.c b/OpenSRF/src/libstack/osrf_prefork.c
new file mode 100644 (file)
index 0000000..1d829ad
--- /dev/null
@@ -0,0 +1,581 @@
+#include "osrf_prefork.h"
+#include <signal.h>
+
+/* true if we just deleted a child.  This will allow us to make sure we're
+       not trying to use freed memory */
+int child_dead;
+
+int main();
+void sigchld_handler( int sig );
+
+int osrf_prefork_run(char* appname) {
+
+       if(!appname) fatal_handler("osrf_prefork_run requires an appname to run!");
+
+       int maxr = 1000; 
+       int maxc = 10;
+       int minc = 3;
+
+       info_handler("Loading config in osrf_forker for app %s", appname);
+
+       object* max_req = osrf_settings_host_value_object("/apps/%s/unix_config/max_requests", appname);
+       object* min_children = osrf_settings_host_value_object("/apps/%s/unix_config/min_children", appname);
+       object* max_children = osrf_settings_host_value_object("/apps/%s/unix_config/max_children", appname);
+
+       if(!max_req) warning_handler("Max requests not defined, assuming 1000");
+       else maxr = max_req->num_value;
+
+       if(!min_children) warning_handler("Min children not defined, assuming 3");
+       else minc = min_children->num_value;
+
+       if(!max_children) warning_handler("Max children not defined, assuming 10");
+       else maxc = max_children->num_value;
+
+       free_object(max_req);
+       free_object(min_children);
+       free_object(max_children);
+       /* --------------------------------------------------- */
+
+       char* resc = va_list_to_string("%s_listener", appname);
+
+       if(!osrf_system_bootstrap_client_resc(
+               osrf_get_bootstrap_config(), osrf_get_config_context(), resc)) 
+               fatal_handler("Unable to bootstrap client for osrf_prefork_run()");
+       free(resc);
+
+       prefork_simple* forker = prefork_simple_init(
+               osrf_system_get_transport_client(), maxr, minc, maxc);
+
+       forker->appname = strdup(appname);
+
+       if(forker == NULL)
+               fatal_handler("osrf_prefork_run() failed to create prefork_simple object");
+
+       prefork_launch_children(forker);
+       
+       info_handler("Launching osrf_forker for app %s", appname);
+       prefork_run(forker);
+       
+       warning_handler("prefork_run() retuned - how??");
+       prefork_free(forker);
+       return 0;
+
+}
+
+void osrf_prefork_register_routers() {
+       //char* router = osrf_config_value("//%s/
+}
+
+void prefork_child_init_hook(prefork_child* child) {
+
+       if(!child) return;
+       info_handler("Child init hook for child %d", child->pid);
+       char* resc = va_list_to_string("%s_drone",child->appname);
+       if(!osrf_system_bootstrap_client_resc(
+               osrf_get_bootstrap_config(), osrf_get_config_context(), resc)) 
+               fatal_handler("Unable to bootstrap client for osrf_prefork_run()");
+       free(resc);
+}
+
+void prefork_child_process_request(prefork_child* child, char* data) {
+       if(!child && child->connection) return;
+
+       /* construct the message from the xml */
+       debug_handler("Child %d received data from parent:\n%s\n", child->pid, data);
+       transport_message* msg = new_message_from_xml( data );
+
+       osrf_stack_transport_handler(msg, child->appname);
+
+       /*
+       transport_message* ret_msg = message_init(
+                       msg->body, msg->subject, msg->thread, msg->sender, NULL );
+
+       client_send_message(child->connection, ret_msg);
+       message_free( ret_msg );
+
+       printf("Message body size %d\n", strlen(msg->body));
+
+       printf( "Message Info\n" );
+       printf( "%s\n", msg->sender );
+       printf( "%s\n", msg->recipient );
+       printf( "%s\n", msg->thread );
+       printf( "%s\n", msg->body );
+       printf( "%s\n", msg->subject );
+       printf( "%s\n", msg->router_from );
+       printf( "%d\n", msg->broadcast );
+
+       message_free( msg );
+       */
+}
+
+
+prefork_simple*  prefork_simple_init( transport_client* client, 
+               int max_requests, int min_children, int max_children ) {
+
+       if( min_children > max_children )
+               fatal_handler( "min_children (%d) is greater "
+                               "than max_children (%d)", min_children, max_children );
+
+       if( max_children > ABS_MAX_CHILDREN )
+               fatal_handler( "max_children (%d) is greater than ABS_MAX_CHILDREN (%d)",
+                               max_children, ABS_MAX_CHILDREN );
+
+       /* flesh out the struct */
+       prefork_simple* prefork = (prefork_simple*) safe_malloc(sizeof(prefork_simple));        
+       prefork->max_requests = max_requests;
+       prefork->min_children = min_children;
+       prefork->max_children = max_children;
+       prefork->first_child = NULL;
+       prefork->connection = client;
+
+       return prefork;
+}
+
+prefork_child*  launch_child( prefork_simple* forker ) {
+
+       pid_t pid;
+       int data_fd[2];
+       int status_fd[2];
+
+       /* Set up the data pipes and add the child struct to the parent */
+       if( pipe(data_fd) < 0 ) /* build the data pipe*/
+               fatal_handler( "Pipe making error" );
+
+       if( pipe(status_fd) < 0 ) /* build the status pipe */
+               fatal_handler( "Pipe making error" );
+
+       debug_handler( "Pipes: %d %d %d %d", data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
+       prefork_child* child = prefork_child_init( forker->max_requests, data_fd[0], 
+                       data_fd[1], status_fd[0], status_fd[1] );
+
+       child->appname = strdup(forker->appname);
+
+
+       add_prefork_child( forker, child );
+
+       if( (pid=fork()) < 0 ) fatal_handler( "Forking Error" );
+
+       if( pid > 0 ) {  /* parent */
+
+               signal(SIGCHLD, sigchld_handler);
+               (forker->current_num_children)++;
+               child->pid = pid;
+
+               info_handler( "Parent launched %d", pid );
+               /* *no* child pipe FD's can be closed or the parent will re-use fd's that
+                       the children are currently using */
+               return child;
+       }
+
+       else { /* child */
+
+               debug_handler("I am  new child with read_data_fd = %d and write_status_fd = %d",
+                       child->read_data_fd, child->write_status_fd );
+
+               child->pid = getpid();
+               close( child->write_data_fd );
+               close( child->read_status_fd );
+
+               /* do the initing */
+               prefork_child_init_hook(child);
+
+               prefork_child_wait( child );
+               exit(0); /* just to be sure */
+        }
+       return NULL;
+}
+
+
+void prefork_launch_children( prefork_simple* forker ) {
+       if(!forker) return;
+       int c = 0;
+       while( c++ < forker->min_children )
+               launch_child( forker );
+}
+
+
+void sigchld_handler( int sig ) {
+       signal(SIGCHLD, sigchld_handler);
+       child_dead = 1;
+}
+
+
+void reap_children( prefork_simple* forker ) {
+
+       pid_t child_pid;
+       int status;
+
+       while( (child_pid=waitpid(-1,&status,WNOHANG)) > 0) 
+               del_prefork_child( forker, child_pid ); 
+
+       /* replenish */
+       while( forker->current_num_children < forker->min_children ) 
+               launch_child( forker );
+
+       child_dead = 0;
+}
+
+void prefork_run(prefork_simple* forker) {
+
+       if( forker->first_child == NULL )
+               return;
+
+       transport_message* cur_msg = NULL;
+
+       while(1) {
+
+               if( forker->first_child == NULL ) {/* no more children */
+                       warning_handler("No more children..." );
+                       return;
+               }
+
+               debug_handler("Forker going into wait for data...");
+               sleep(2);
+               cur_msg = client_recv( forker->connection, -1 );
+
+               fprintf(stderr, "Got Data %f\n", get_timestamp_millis() );
+
+               if( cur_msg == NULL ) continue;
+               
+               int honored = 0;        /* true if we've serviced the request */
+
+               while( ! honored ) {
+
+                       check_children( forker ); 
+
+                       debug_handler( "Server received inbound data" );
+                       int k;
+                       prefork_child* cur_child = forker->first_child;
+
+                       /* Look for an available child */
+                       for( k = 0; k < forker->current_num_children; k++ ) {
+
+                               debug_handler("Searching for available child. cur_child->pid = %d", cur_child->pid );
+                               debug_handler("Current num children %d and loop %d", forker->current_num_children, k);
+                       
+                               if( cur_child->available ) {
+                                       debug_handler( "sending data to %d", cur_child->pid );
+                                       message_prepare_xml( cur_msg );
+                                       char* data = cur_msg->msg_xml;
+                                       if( ! data || strlen(data) < 1 ) break;
+                                       cur_child->available = 0;
+                                       debug_handler( "Writing to child fd %d", cur_child->write_data_fd );
+
+                                       int written = 0;
+                                       fprintf(stderr, "Writing Data %f\n", get_timestamp_millis() );
+                                       if( (written = write( cur_child->write_data_fd, data, strlen(data) + 1 )) < 0 ) {
+                                               warning_handler("Write returned error %d", errno);
+                                               cur_child = cur_child->next;
+                                               continue;
+                                       }
+
+                                       fprintf(stderr, "Wrote %d bytes to child\n", written);
+
+                                       forker->first_child = cur_child->next;
+                                       honored = 1;
+                                       break;
+                               } else 
+                                       cur_child = cur_child->next;
+                       } 
+
+                       /* if none available, add a new child if we can */
+                       if( ! honored ) {
+                               debug_handler("Not enough children, attempting to add...");
+                               if( forker->current_num_children < forker->max_children ) {
+                                       debug_handler( "Launching new child with current_num = %d",
+                                                       forker->current_num_children );
+
+                                       prefork_child* new_child = launch_child( forker );
+                                       message_prepare_xml( cur_msg );
+                                       char* data = cur_msg->msg_xml;
+                                       if( ! data || strlen(data) < 1 ) break;
+                                       new_child->available = 0;
+                                       debug_handler( "sending data to %d", new_child->pid );
+                                       debug_handler( "Writing to new child fd %d", new_child->write_data_fd );
+                                       write( new_child->write_data_fd, data, strlen(data) + 1 );
+                                       forker->first_child = new_child->next;
+                                       honored = 1;
+                               }
+                       }
+
+                       if( !honored ) {
+                               warning_handler( "No children available, sleeping and looping..." );
+                               usleep( 50000 ); /* 50 milliseconds */
+                       }
+
+                       if( child_dead )
+                               reap_children(forker);
+
+
+                       fprintf(stderr, "Parent done with request %f\n", get_timestamp_millis() );
+
+               } // honored?
+
+       } /* top level listen loop */
+
+}
+
+
+void check_children( prefork_simple* forker ) {
+
+       //check_begin:
+
+       int select_ret;
+       fd_set read_set;
+       FD_ZERO(&read_set);
+       int max_fd = 0;
+       int n;
+
+       struct timeval tv;
+       tv.tv_sec       = 0;
+       tv.tv_usec      = 0;
+
+       if( child_dead )
+               reap_children(forker);
+
+       prefork_child* cur_child = forker->first_child;
+
+       int i;
+       for( i = 0; i!= forker->current_num_children; i++ ) {
+
+               if( cur_child->read_status_fd > max_fd )
+                       max_fd = cur_child->read_status_fd;
+               FD_SET( cur_child->read_status_fd, &read_set );
+               cur_child = cur_child->next;
+       }
+
+       FD_CLR(0,&read_set);/* just to be sure */
+
+       if( (select_ret=select( max_fd + 1 , &read_set, NULL, NULL, &tv)) == -1 ) {
+               warning_handler( "Select returned error %d on check_children", errno );
+       }
+
+       if( select_ret == 0 )
+               return;
+
+       /* see if one of a child has told us it's done */
+       cur_child = forker->first_child;
+       int j;
+       int num_handled = 0;
+       for( j = 0; j!= forker->current_num_children && num_handled < select_ret ; j++ ) {
+
+               if( FD_ISSET( cur_child->read_status_fd, &read_set ) ) {
+                       printf( "Server received status from a child %d\n", cur_child->pid );
+                       debug_handler( "Server received status from a child %d", cur_child->pid );
+
+                       num_handled++;
+
+                       /* now suck off the data */
+                       char buf[64];
+                       memset( buf, 0, 64);
+                       if( (n=read(cur_child->read_status_fd, buf, 63))  < 0 ) {
+                               warning_handler("Read error afer select in child status read with errno %d", errno);
+                       }
+
+                       debug_handler( "Read %d bytes from status buffer: %s", n, buf );
+                       cur_child->available = 1;
+               }
+               cur_child = cur_child->next;
+       } 
+
+}
+
+
+void prefork_child_wait( prefork_child* child ) {
+
+       int i,n;
+       growing_buffer* gbuf = buffer_init( READ_BUFSIZE );
+       char buf[READ_BUFSIZE];
+       memset( buf, 0, READ_BUFSIZE );
+
+       for( i = 0; i!= child->max_requests; i++ ) {
+
+               n = -1;
+               clr_fl(child->read_data_fd, O_NONBLOCK );
+               while( (n=read(child->read_data_fd, buf, READ_BUFSIZE-1)) > 0 ) {
+                       buffer_add( gbuf, buf );
+                       memset( buf, 0, READ_BUFSIZE );
+
+                       fprintf(stderr, "Child read %d bytes\n", n);
+
+                       if( n == READ_BUFSIZE ) { 
+                               fprintf(stderr, "We read READ_BUFSIZE data....\n");
+                               /* XXX */
+                               /* either we have exactly READ_BUFSIZE data, 
+                                       or there's more waiting that we need to grab*/
+                               /* must set to non-block for reading more */
+                       } else {
+                               fprintf(stderr, "Read Data %f\n", get_timestamp_millis() );
+                               prefork_child_process_request(child, gbuf->buf);
+                               buffer_reset( gbuf );
+                               break;
+                       }
+               }
+
+               if( n < 0 ) {
+                       warning_handler( "Child read returned error with errno %d", errno );
+                       break;
+               }
+
+               if( i < child->max_requests - 1 ) 
+                       write( child->write_status_fd, "available" /*less than 64 bytes*/, 9 );
+       }
+
+       buffer_free(gbuf);
+
+       debug_handler("Child exiting...[%d]", getpid() );
+
+       exit(0);
+}
+
+
+void add_prefork_child( prefork_simple* forker, prefork_child* child ) {
+       
+       if( forker->first_child == NULL ) {
+               forker->first_child = child;
+               child->next = child;
+               return;
+       }
+
+       /* we put the child in as the last because, regardless, 
+               we have to do the DLL splice dance, and this is the
+          simplest way */
+
+       prefork_child* start_child = forker->first_child;
+       while(1) {
+               if( forker->first_child->next == start_child ) 
+                       break;
+               forker->first_child = forker->first_child->next;
+       }
+
+       /* here we know that forker->first_child is the last element 
+               in the list and start_child is the first.  Insert the
+               new child between them*/
+
+       forker->first_child->next = child;
+       child->next = start_child;
+       return;
+}
+
+prefork_child* find_prefork_child( prefork_simple* forker, pid_t pid ) {
+
+       if( forker->first_child == NULL ) { return NULL; }
+       prefork_child* start_child = forker->first_child;
+       do {
+               if( forker->first_child->pid == pid ) 
+                       return forker->first_child;
+       } while( (forker->first_child = forker->first_child->next) != start_child );
+
+       return NULL;
+}
+
+
+void del_prefork_child( prefork_simple* forker, pid_t pid ) { 
+
+       if( forker->first_child == NULL ) { return; }
+
+       (forker->current_num_children)--;
+       debug_handler("Deleting Child: %d", pid );
+
+       prefork_child* start_child = forker->first_child; /* starting point */
+       prefork_child* cur_child        = start_child; /* current pointer */
+       prefork_child* prev_child       = start_child; /* the trailing pointer */
+
+       /* special case where there is only one in the list */
+       if( start_child == start_child->next ) {
+               if( start_child->pid == pid ) {
+                       forker->first_child = NULL;
+
+                       close( start_child->read_data_fd );
+                       close( start_child->write_data_fd );
+                       close( start_child->read_status_fd );
+                       close( start_child->write_status_fd );
+
+                       prefork_child_free( start_child );
+               }
+               return;
+       }
+
+
+       /* special case where the first item in the list needs to be removed */
+       if( start_child->pid == pid ) { 
+
+               /* find the last one so we can remove the start_child */
+               do { 
+                       prev_child = cur_child;
+                       cur_child = cur_child->next;
+               }while( cur_child != start_child );
+
+               /* now cur_child == start_child */
+               prev_child->next = cur_child->next;
+               forker->first_child = prev_child;
+
+               close( cur_child->read_data_fd );
+               close( cur_child->write_data_fd );
+               close( cur_child->read_status_fd );
+               close( cur_child->write_status_fd );
+
+               prefork_child_free( cur_child );
+               return;
+       } 
+
+       do {
+               prev_child = cur_child;
+               cur_child = cur_child->next;
+
+               if( cur_child->pid == pid ) {
+                       prev_child->next = cur_child->next;
+
+                       close( cur_child->read_data_fd );
+                       close( cur_child->write_data_fd );
+                       close( cur_child->read_status_fd );
+                       close( cur_child->write_status_fd );
+
+                       prefork_child_free( cur_child );
+                       return;
+               }
+
+       } while(cur_child != start_child);
+}
+
+
+
+
+prefork_child* prefork_child_init( 
+       int max_requests, int read_data_fd, int write_data_fd, 
+       int read_status_fd, int write_status_fd ) {
+
+       prefork_child* child = (prefork_child*) safe_malloc(sizeof(prefork_child));
+       child->max_requests             = max_requests;
+       child->read_data_fd             = read_data_fd;
+       child->write_data_fd            = write_data_fd;
+       child->read_status_fd   = read_status_fd;
+       child->write_status_fd  = write_status_fd;
+       child->available                        = 1;
+
+       return child;
+}
+
+
+int prefork_free( prefork_simple* prefork ) {
+       
+       while( prefork->first_child != NULL ) {
+               info_handler( "Killing children and sleeping 1 to reap..." );
+               kill( 0,        SIGKILL );
+               sleep(1);
+       }
+
+       client_free(prefork->connection);
+       free(prefork->appname);
+       free( prefork );
+       return 1;
+}
+
+int prefork_child_free( prefork_child* child ) { 
+       free(child->appname);
+       close(child->read_data_fd);
+       close(child->write_status_fd);
+       free( child ); 
+       return 1;
+}
+
diff --git a/OpenSRF/src/libstack/osrf_prefork.h b/OpenSRF/src/libstack/osrf_prefork.h
new file mode 100644 (file)
index 0000000..8c1725f
--- /dev/null
@@ -0,0 +1,89 @@
+#include <sys/types.h>
+#include <sys/time.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/select.h>
+#include <sys/wait.h>
+
+#include "utils.h"
+#include "opensrf/transport_message.h"
+#include "osrf_stack.h"
+#include "osrf_settings.h"
+
+#define READ_BUFSIZE 4096
+#define MAX_BUFSIZE 10485760 /* 10M enough? ;) */
+#define ABS_MAX_CHILDREN 256 
+
+/* we receive data.  we find the next child in
+       line that is available.  pass the data down that childs pipe and go
+       back to listening for more data.
+       when we receive SIGCHLD, we check for any dead children and clean up
+       their respective prefork_child objects, close pipes, etc.
+
+       we build a select fd_set with all the child pipes (going to the parent) 
+       when a child is done processing a request, it writes a small chunk of 
+       data to the parent to alert the parent that the child is again available 
+       */
+
+struct prefork_simple_struct {
+       int max_requests;
+       int min_children;
+       int max_children;
+       int fd;
+       int data_to_child;
+       int data_to_parent;
+       int current_num_children;
+       char* appname;
+       struct prefork_child_struct* first_child;
+       transport_client* connection;
+};
+typedef struct prefork_simple_struct prefork_simple;
+
+struct prefork_child_struct {
+       pid_t pid;
+       int read_data_fd;
+       int write_data_fd;
+       int read_status_fd;
+       int write_status_fd;
+       int min_children;
+       int available;
+       int max_requests;
+       char* appname;
+       struct prefork_child_struct* next;
+       transport_client* connection;
+};
+
+typedef struct prefork_child_struct prefork_child;
+
+int osrf_prefork_run(char* appname);
+
+prefork_simple*  prefork_simple_init( transport_client* client, 
+       int max_requests, int min_children, int max_children );
+
+prefork_child*  launch_child( prefork_simple* forker );
+void prefork_launch_children( prefork_simple* forker );
+
+void prefork_run(prefork_simple* forker);
+
+void add_prefork_child( prefork_simple* forker, prefork_child* child );
+prefork_child* find_prefork_child( prefork_simple* forker, pid_t pid );
+void del_prefork_child( prefork_simple* forker, pid_t pid );
+
+void check_children( prefork_simple* forker );
+
+void prefork_child_process_request(prefork_child*, char* data);
+void prefork_child_init_hook(prefork_child*);
+
+prefork_child* prefork_child_init( 
+               int max_requests, int read_data_fd, int write_data_fd, 
+               int read_status_fd, int write_status_fd );
+
+/* listens on the 'data_to_child' fd and wait for incoming data */
+void prefork_child_wait( prefork_child* child );
+
+int prefork_free( prefork_simple* );
+int prefork_child_free( prefork_child* );
+
+
index c11cfc6..23cbf4c 100644 (file)
@@ -10,13 +10,13 @@ int osrf_stack_process( transport_client* client, int timeout ) {
        transport_message* msg = client_recv( client, timeout );
        if(msg == NULL) return 0;
        debug_handler( "Received message from transport code from %s", msg->sender );
-       int status = osrf_stack_transport_handler( msg );
+       int status = osrf_stack_transport_handler( msg, NULL );
 
        while(1) {
                transport_message* m = client_recv( client, 0 );
                if(m) {
                        debug_handler( "Received additional message from transport code");
-                       status = osrf_stack_transport_handler( m );
+                       status = osrf_stack_transport_handler( m, NULL );
                } else  {
                        debug_handler( "osrf_stack_process returning with only 1 received message" );
                        break;
@@ -31,7 +31,7 @@ int osrf_stack_process( transport_client* client, int timeout ) {
 // -----------------------------------------------------------------------------
 // Entry point into the stack
 // -----------------------------------------------------------------------------
-int osrf_stack_transport_handler( transport_message* msg ) { 
+int osrf_stack_transport_handler( transport_message* msg, char* my_service ) { 
 
        debug_handler( "Transport handler received new message \nfrom %s "
                        "to %s with body \n\n%s\n", msg->sender, msg->recipient, msg->body );
@@ -40,12 +40,12 @@ int osrf_stack_transport_handler( transport_message* msg ) {
 
        if( session == NULL ) {  /* we must be a server, build a new session */
                info_handler( "Received message for nonexistant session. Dropping..." );
-               //osrf_app_server_session_init( msg->thread, 
-               message_free( msg );
-               return 1;
+               osrf_app_server_session_init( msg->thread, my_service, msg->sender);
+               //message_free( msg );
+               //return 1;
        }
 
-       debug_handler("Session [%s] found, building message", msg->thread );
+       //debug_handler("Session [%s] found, building message", msg->thread );
 
        osrf_app_session_set_remote( session, msg->sender );
        osrf_message* arr[OSRF_MAX_MSGS_PER_PACKET];
@@ -181,7 +181,6 @@ osrf_message* _do_server( osrf_app_session* session, osrf_message* msg ) {
        if(session == NULL || msg == NULL)
                return NULL;
 
-
        if( msg->m_type == STATUS ) { return NULL; }
 
        warning_handler( "We dont' do servers yet !!" );
index aaac111..fc35abb 100644 (file)
@@ -11,7 +11,7 @@
 // -----------------------------------------------------------------------------
 
 int osrf_stack_process( transport_client* client, int timeout );
-int osrf_stack_transport_handler( transport_message* msg );
+int osrf_stack_transport_handler( transport_message* msg, char* my_service );
 int osrf_stack_message_handler( osrf_app_session* session, osrf_message* msg );
 int osrf_stack_application_handler( osrf_app_session* session, osrf_message* msg );
 
index 62bf673..39cf7cf 100644 (file)
@@ -2,20 +2,35 @@
 
 transport_client* global_client;
 char* system_config = NULL;
+char* config_context = NULL;
+char* bootstrap_config = NULL;
 
 transport_client* osrf_system_get_transport_client() {
        return global_client;
 }
 
+
+char* osrf_get_config_context() {
+       return config_context;
+}
+
+char* osrf_get_bootstrap_config() {
+       return bootstrap_config;
+}
+
 int osrf_system_bootstrap_client( char* config_file, char* contextnode ) {
+       return osrf_system_bootstrap_client_resc(config_file, contextnode, NULL);
+}
+
+int osrf_system_bootstrap_client_resc( char* config_file, char* contextnode, char* resource ) {
 
        if( !config_file || !contextnode )
                fatal_handler("No Config File Specified\n" );
 
-       free(system_config);
-       free(osrf_config_context);
-       system_config = strdup(config_file);
-       osrf_config_context = strdup(contextnode);
+       config_context = strdup(contextnode);
+       bootstrap_config = strdup(config_file);
+
+       debug_handler("Bootstrapping client with config %s and context node %s", config_file, contextnode);
 
        config_reader_init( "opensrf.bootstrap", config_file ); 
 
@@ -37,13 +52,18 @@ int osrf_system_bootstrap_client( char* config_file, char* contextnode ) {
        info_handler("Bootstrapping system with domain %s, port %d, and unixpath %s", domain, iport, unixpath );
 
        transport_client* client = client_init( domain, iport, unixpath, 0 );
-       
-       char buf[256];
-       memset(buf,0,256);
+
        char* host = getenv("HOSTNAME");
-       sprintf(buf, "client_%s_%d", host, getpid() );
 
+       if(!resource) resource = "";
+       int len = strlen(resource) + 256;
+       char buf[len];
+       memset(buf,0,len);
+       snprintf(buf, len - 1, "opensrf_%s_%s_%d", resource, host, getpid() );
+       
        if(client_connect( client, username, password, buf, 10, AUTH_DIGEST )) {
+               /* child nodes will leak the parents client... but we can't free
+                       it without disconnecting the parents client :( */
                global_client = client;
        }
 
@@ -71,8 +91,8 @@ int osrf_system_disconnect_client() {
 int osrf_system_shutdown() {
        config_reader_free();
        osrf_system_disconnect_client();
-       free(system_config);
-       free(osrf_config_context);
+       //free(system_config);
+       //free(config_context);
        osrf_settings_free_host_config(NULL);
        log_free();
        return 1;
index cf50989..66e155d 100644 (file)
        contextnode is the location in the config file where we collect config info
 */
 
-char* osrf_config_context;
 
 int osrf_system_bootstrap_client( char* config_file, char* contextnode );
+
+/* bootstraps a client adding the given resource string to the host/pid, etc. resource string */
+int osrf_system_bootstrap_client_resc( char* config_file, char* contextnode, char* resource );
+
 transport_client* osrf_system_get_transport_client();
 
 /* disconnects and destroys the current client connection */
 int osrf_system_disconnect_client();
 int osrf_system_shutdown(); 
 
+char* osrf_get_config_context();
+
+char* osrf_get_bootstrap_config();
 
 #endif
index 53105b0..6f60798 100644 (file)
@@ -1,7 +1,6 @@
 #include <stdio.h>
 #include "logging.h"
 
-
 void get_timestamp( char buf_36chars[]) {
 
        struct timeb tb;
index 1062ace..dc4a8e2 100644 (file)
@@ -361,12 +361,14 @@ int socket_wait(socket_manager* mgr, int timeout, int sock_fd) {
 
                // If timeout is -1, we block indefinitely
                if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, NULL)) == -1 ) {
+                       warning_handler("Sys Error: %s", strerror(errno));
                        return warning_handler("Call to select interrupted");
                }
 
        } else if( timeout > 0 ) { /* timeout of 0 means don't block */
 
                if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, &tv)) == -1 ) {
+                       warning_handler("Sys Error: %s", strerror(errno));
                        return warning_handler( "Call to select interrupted" );
                }
        }
@@ -402,12 +404,14 @@ int socket_wait_all(socket_manager* mgr, int timeout) {
 
                // If timeout is -1, there is no timeout passed to the call to select
                if( (retval = select( max_fd, &read_set, NULL, NULL, NULL)) == -1 ) {
+                       warning_handler("Sys Error: %s", strerror(errno));
                        return warning_handler("Call to select interrupted");
                }
 
        } else if( timeout != 0 ) { /* timeout of 0 means don't block */
 
                if( (retval = select( max_fd, &read_set, NULL, NULL, &tv)) == -1 ) {
+                       warning_handler("Sys Error: %s", strerror(errno));
                        return warning_handler( "Call to select interrupted" );
                }
        }
index 9c0a3c2..c1019da 100644 (file)
@@ -14,14 +14,6 @@ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 GNU General Public License for more details.
 */
 
-#include <stdio.h>
-
-#include <sys/types.h>
-#include <unistd.h>
-#include <stdlib.h>
-#include <string.h>
-
-//#include <sys/timeb.h>
 #include "utils.h"
 
 inline void* safe_malloc( int size ) {
@@ -34,6 +26,8 @@ inline void* safe_malloc( int size ) {
        return ptr;
 }
 
+
+
 /* utility method for profiling */
 double get_timestamp_millis() {
        //struct timeb t;
@@ -92,6 +86,26 @@ long va_list_size(const char* format, va_list args) {
 }
 
 
+char* va_list_to_string(const char* format, ...) {
+
+       long len = 0;
+       va_list args;
+       va_list a_copy;
+
+       va_copy(a_copy, args);
+
+       va_start(args, format);
+       len = va_list_size(format, args);
+
+       char buf[len];
+       memset(buf, 0, len);
+
+       va_start(a_copy, format);
+       vsnprintf(buf, len - 1, format, a_copy);
+       va_end(a_copy);
+       return strdup(buf);
+}
+
 // ---------------------------------------------------------------------------------
 // Flesh out a ubiqitous growing string buffer
 // ---------------------------------------------------------------------------------
index af09795..1dc004d 100644 (file)
@@ -17,12 +17,18 @@ GNU General Public License for more details.
 #ifndef UTILS_H
 #define UTILS_H
 
+#include <stdio.h>
 #include <stdarg.h>
 #include <fcntl.h>
 #include <unistd.h>
 #include <sys/time.h>
 #include <sys/stat.h>
 #include <fcntl.h>
+#include <sys/types.h>
+#include <stdlib.h>
+#include <string.h>
+//#include <sys/timeb.h>
+
 
 #define BUFFER_MAX_SIZE 10485760 
 
@@ -57,6 +63,10 @@ int buffer_add_char(growing_buffer* gb, char c);
        */
 long va_list_size(const char* format, va_list);
 
+/* turns a va list into a string, caller must free the 
+       allocated char */
+char* va_list_to_string(const char* format, ...);
+
 
 /* string escape utility method.  escapes unicode embeded characters.
        escapes the usual \n, \t, etc. 
@@ -92,4 +102,5 @@ char* file_to_string(char* filename);
 
 
 
+
 #endif