1 #include <opensrf/osrf_prefork.h>
2 #include <opensrf/osrf_app_session.h>
3 #include <opensrf/osrf_application.h>
6 //#define READ_BUFSIZE 4096
/* Chunk size used by prefork_child_wait(): the size of its stack read
 * buffer and the initial size of its growing_buffer. Each read() takes at
 * most READ_BUFSIZE-1 bytes so the (pre-cleared) chunk stays NUL-terminated. */
7 #define READ_BUFSIZE 1024
8 //#define MAX_BUFSIZE 10485760 /* 10M enough? ;) */
/* Hard cap on the configured max_children; prefork_simple_init() logs an
 * error when max_children exceeds it. */
9 #define ABS_MAX_CHILDREN 256
/**
 * Listener (parent) process state: limits on the child pool, the number of
 * children currently tracked, the keepalive handed on to each child, the
 * circular singly-linked list of children, and the transport connection on
 * which the listener receives requests.
 * NOTE(review): not every member declaration is visible in this view
 * (e.g. max_requests, min_children, max_children, data_to_child,
 * data_to_parent and appname, which prefork_simple_init() assigns).
 */
11 struct prefork_simple_struct {
/* Incremented by launch_child() in the parent, decremented by
 * del_prefork_child(). */
18 int current_num_children;
19 int keepalive; /* keepalive time for stateful sessions */
/* Entry point into the circular list of children; prefork_run() moves it
 * past a child after dispatching a request to that child. */
21 struct prefork_child_struct* first_child;
/* Transport client the listener receives requests on (client_recv() in
 * prefork_run()). */
22 transport_client* connection;
24 typedef struct prefork_simple_struct prefork_simple;
/**
 * Per-child bookkeeping used by both the parent and the forked child: pipe
 * descriptors, availability flag, request limit, appname and keepalive
 * (see prefork_child_init() and launch_child()).
 * NOTE(review): most member declarations are not visible in this view.
 */
26 struct prefork_child_struct {
/* Next child in the circular list owned by prefork_simple; a one-element
 * list points back to itself (see del_prefork_child()). */
37 struct prefork_child_struct* next;
40 typedef struct prefork_child_struct prefork_child;
/* Forward declarations of the file-local helpers defined below. */
42 static prefork_simple* prefork_simple_init( transport_client* client,
43 int max_requests, int min_children, int max_children );
44 static prefork_child* launch_child( prefork_simple* forker );
45 static void prefork_launch_children( prefork_simple* forker );
46 static void prefork_run(prefork_simple* forker);
47 static void add_prefork_child( prefork_simple* forker, prefork_child* child );
/* Disabled together with the commented-out find_prefork_child() below. */
49 //static prefork_child* find_prefork_child( prefork_simple* forker, pid_t pid );
51 static void del_prefork_child( prefork_simple* forker, pid_t pid );
52 static void check_children( prefork_simple* forker, int forever );
53 static void prefork_child_process_request(prefork_child*, char* data);
54 static int prefork_child_init_hook(prefork_child*);
55 static prefork_child* prefork_child_init(
56 int max_requests, int read_data_fd, int write_data_fd,
57 int read_status_fd, int write_status_fd );
59 /* listens on the 'data_to_child' fd and waits for incoming data */
60 static void prefork_child_wait( prefork_child* child );
61 static int prefork_free( prefork_simple* );
62 static int prefork_child_free( prefork_child* );
63 static void osrf_prefork_register_routers( const char* appname );
64 static void osrf_prefork_child_exit( prefork_child* );
67 /* true if we just deleted a child. This will allow us to make sure we're
68 not trying to use freed memory */
/* NOTE(review): if child_dead is assigned from sigchld_handler (that code is
 * not visible here), it must be declared `volatile sig_atomic_t` so the
 * handler's write is reliably observed by the interrupted code. */
69 static sig_atomic_t child_dead;
71 static void sigchld_handler( int sig );
/**
 * Entry point of a prefork listener process serving @appname.
 *
 * Reads max_requests, min_children, max_children and keepalive from the
 * host settings, logging a warning and keeping the default for any value
 * that is not defined. It then bootstraps the "<appname>_listener" transport
 * client and builds the prefork_simple state. After that it launches the
 * initial children, registers with the configured routers and enters the
 * listener loop.
 *
 * @param appname  Application name; an error is logged when it is missing.
 *
 * NOTE(review): the declarations/defaults of kalive, maxr, minc and maxc,
 * the early returns, the prefork_run() call and the return value are not
 * visible in this view.
 */
73 int osrf_prefork_run(const char* appname) {
76 osrfLogError( OSRF_LOG_MARK, "osrf_prefork_run requires an appname to run!");
80 set_proc_title( "OpenSRF Listener [%s]", appname );
87 osrfLogInfo( OSRF_LOG_MARK, "Loading config in osrf_forker for app %s", appname);
// Each setting is returned as a string, or NULL when it is not defined.
89 char* max_req = osrf_settings_host_value("/apps/%s/unix_config/max_requests", appname);
90 char* min_children = osrf_settings_host_value("/apps/%s/unix_config/min_children", appname);
91 char* max_children = osrf_settings_host_value("/apps/%s/unix_config/max_children", appname);
92 char* keepalive = osrf_settings_host_value("/apps/%s/keepalive", appname);
// NOTE(review): atoi() cannot report malformed values (it silently yields
// 0); strtol() with end-pointer/errno checks would catch bad config. Also
// confirm these four strings are freed - no free() is visible here.
94 if(!keepalive) osrfLogWarning( OSRF_LOG_MARK, "Keepalive is not defined, assuming %d", kalive);
95 else kalive = atoi(keepalive);
97 if(!max_req) osrfLogWarning( OSRF_LOG_MARK, "Max requests not defined, assuming %d", maxr);
98 else maxr = atoi(max_req);
100 if(!min_children) osrfLogWarning( OSRF_LOG_MARK, "Min children not defined, assuming %d", minc);
101 else minc = atoi(min_children);
103 if(!max_children) osrfLogWarning( OSRF_LOG_MARK, "Max children not defined, assuming %d", maxc);
104 else maxc = atoi(max_children);
110 /* --------------------------------------------------- */
// The listener connects under the resource name "<appname>_listener".
// NOTE(review): confirm resc is freed; no free() is visible here.
112 char* resc = va_list_to_string("%s_listener", appname);
114 if(!osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
115 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()");
122 prefork_simple* forker = prefork_simple_init(
123 osrfSystemGetTransportClient(), maxr, minc, maxc);
126 osrfLogError( OSRF_LOG_MARK, "osrf_prefork_run() failed to create prefork_simple object");
130 forker->appname = strdup(appname);
131 forker->keepalive = kalive;
133 prefork_launch_children(forker);
135 osrf_prefork_register_routers(appname);
137 osrfLogInfo( OSRF_LOG_MARK, "Launching osrf_forker for app %s", appname);
// prefork_run() is not expected to return; reaching this point is logged
// as an anomaly and the forker is torn down.
// NOTE(review): "retuned" in the log text below is a typo for "returned".
140 osrfLogWarning( OSRF_LOG_MARK, "prefork_run() retuned - how??");
141 prefork_free(forker);
146 /* sends the "register" packet to the specified router */
/*
 * Builds the router JID "<routerName>@<routerDomain>/router" and creates a
 * "registering" message addressed to it. The router info carries appname
 * and the command string "register", and the message is sent over this
 * process's transport client.
 * NOTE(review): the return value of client_send_message() is ignored, and
 * freeing msg and jid is not visible in this view - confirm both are released.
 */
147 static void osrf_prefork_send_router_registration(const char* appname, const char* routerName, const char* routerDomain) {
148 transport_client* client = osrfSystemGetTransportClient();
149 char* jid = va_list_to_string( "%s@%s/router", routerName, routerDomain );
150 osrfLogInfo( OSRF_LOG_MARK, "%s registering with router %s", appname, jid );
151 transport_message* msg = message_init("registering", NULL, NULL, jid, NULL );
152 message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
153 client_send_message( client, msg );
158 /** parses a single "complex" router configuration chunk */
/*
 * Reads "name", "domain" and "services" from routerChunk. When "services"
 * is a hash, appname is registered with this router only if its "service"
 * entry names appname; that entry may be a string or an array of strings.
 * The unconditional registration at the end is reached on a path whose
 * condition is not visible here (presumably when no services hash is
 * configured - confirm).
 */
159 static void osrf_prefork_parse_router_chunk(const char* appname, jsonObject* routerChunk) {
161 char* routerName = jsonObjectGetString(jsonObjectGetKey(routerChunk, "name"));
162 char* domain = jsonObjectGetString(jsonObjectGetKey(routerChunk, "domain"));
163 jsonObject* services = jsonObjectGetKey(routerChunk, "services");
// NOTE(review): labels and arguments are swapped - routerName is printed as
// "domain" and domain as "name". The arguments should be (domain, routerName).
164 osrfLogDebug(OSRF_LOG_MARK, "found router config with domain %s and name %s", routerName, domain);
166 if( services && services->type == JSON_HASH ) {
167 osrfLogDebug(OSRF_LOG_MARK, "investigating router information...");
168 jsonObject* service_obj = jsonObjectGetKey(services, "service");
170 ; // do nothing (shouldn't happen)
171 else if( JSON_ARRAY == service_obj->type ) {
// Registers once per array entry equal to appname; entries for which
// jsonObjectGetString() yields NULL are skipped.
173 for(j = 0; j < service_obj->size; j++ ) {
174 const char* service = jsonObjectGetString(jsonObjectGetIndex(service_obj, j));
175 if( service && !strcmp( appname, service ))
176 osrf_prefork_send_router_registration(appname, routerName, domain);
179 else if( JSON_STRING == service_obj->type ) {
180 if( !strcmp(appname, jsonObjectGetString( service_obj )) )
181 osrf_prefork_send_router_registration(appname, routerName, domain);
184 osrf_prefork_send_router_registration(appname, routerName, domain);
/**
 * Registers appname with every router listed under "/routers/router" in
 * the local config. A plain string entry is treated as a "simple" router
 * config, with the router name read from "/router_name". Any other entry is
 * handed to osrf_prefork_parse_router_chunk().
 * NOTE(review): the NULL/type check on routerInfo and any cleanup of the
 * values returned by osrfConfigGet*() are not visible in this view.
 */
188 static void osrf_prefork_register_routers( const char* appname ) {
190 jsonObject* routerInfo = osrfConfigGetValueObject(NULL, "/routers/router");
193 for(i = 0; i < routerInfo->size; i++) {
194 jsonObject* routerChunk = jsonObjectGetIndex(routerInfo, i);
196 if(routerChunk->type == JSON_STRING) {
197 /* this accommodates simple router configs */
// NOTE(review): the domain is re-read from "/routers/router" rather than
// taken from routerChunk itself, so several simple entries may all see
// the same value - confirm intent. Confirm routerName/domain are freed.
198 char* routerName = osrfConfigGetValue( NULL, "/router_name" );
199 char* domain = osrfConfigGetValue(NULL, "/routers/router");
200 osrfLogDebug(OSRF_LOG_MARK, "found simple router settings with router name %s", routerName);
201 osrf_prefork_send_router_registration(appname, routerName, domain);
204 osrf_prefork_parse_router_chunk(appname, routerChunk);
/**
 * Per-child initialisation, run in the child right after fork() (see
 * launch_child()).
 *
 * Steps, in order:
 *  - initialises the cache;
 *  - puts the logger into client mode when the "/client" config value is
 *    "true" (case-insensitive);
 *  - drops the transport connection inherited from the parent and
 *    bootstraps a new one under the resource name "<appname>_drone";
 *  - runs the application's child-init hook (0 means success);
 *  - retitles the process.
 *
 * @return -1 when child is NULL; launch_child() treats -1 as fatal.
 * NOTE(review): the return values of the other paths are not visible here,
 * nor is any free() of resc/isclient. The trailing "\n" in two log messages
 * is probably redundant.
 */
209 static int prefork_child_init_hook(prefork_child* child) {
211 if(!child) return -1;
212 osrfLogDebug( OSRF_LOG_MARK, "Child init hook for child %d", child->pid);
214 osrfSystemInitCache();
215 char* resc = va_list_to_string("%s_drone",child->appname);
217 /* if we're a source-client, tell the logger now that we're a new process*/
218 char* isclient = osrfConfigGetValue(NULL, "/client");
219 if( isclient && !strcasecmp(isclient,"true") )
220 osrfLogSetIsClient(1);
224 /* we want to remove traces of our parents socket connection
225 * so we can have our own */
226 osrfSystemIgnoreTransportClient();
228 if(!osrfSystemBootstrapClientResc( NULL, NULL, resc)) {
229 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()");
236 if( ! osrfAppRunChildInit(child->appname) ) {
237 osrfLogDebug(OSRF_LOG_MARK, "Prefork child_init succeeded\n");
239 osrfLogError(OSRF_LOG_MARK, "Prefork child_init failed\n");
243 set_proc_title( "OpenSRF Drone [%s]", child->appname );
/**
 * Handles one request handed to this child by the listener.
 *
 * If the transport client is no longer connected, reconnects to OpenSRF
 * first and exits the child if that fails. Then parses data (the XML of a
 * transport message) and passes it to osrf_stack_transport_handler().
 * A stateless session that is not connected is freed. Otherwise the child
 * enters a keepalive loop, waiting up to child->keepalive seconds at a time
 * for further requests on the session.
 *
 * The keepalive loop ends when any of the following occurs (not all break
 * statements are visible):
 *  - queue_wait returns non-success;
 *  - the client disconnects;
 *  - no request arrives within the keepalive interval, in which case the
 *    status "Disconnected on timeout" is reported.
 *
 * NOTE(review): NULL checks on msg/session, the loop header, the start/end
 * timestamps and the freeing of msg are not visible in this view.
 */
247 static void prefork_child_process_request(prefork_child* child, char* data) {
250 transport_client* client = osrfSystemGetTransportClient();
252 if(!client_connected(client)) {
253 osrfSystemIgnoreTransportClient();
254 osrfLogWarning(OSRF_LOG_MARK, "Reconnecting child to opensrf after disconnect...");
255 if(!osrf_system_bootstrap_client(NULL, NULL)) {
256 osrfLogError( OSRF_LOG_MARK,
257 "Unable to bootstrap client in prefork_child_process_request()");
259 osrf_prefork_child_exit(child);
263 /* construct the message from the xml */
264 transport_message* msg = new_message_from_xml( data );
266 osrfAppSession* session = osrf_stack_transport_handler(msg, child->appname);
269 if( session->stateless && session->state != OSRF_SESSION_CONNECTED ) {
270 osrfAppSessionFree( session );
274 osrfLogDebug( OSRF_LOG_MARK, "Entering keepalive loop for session %s", session->session_id );
275 int keepalive = child->keepalive;
283 osrfLogDebug(OSRF_LOG_MARK,
284 "osrf_prefork calling queue_wait [%d] in keepalive loop", keepalive);
286 retval = osrf_app_session_queue_wait(session, keepalive, &recvd);
289 osrfLogDebug(OSRF_LOG_MARK, "Data received == %d", recvd);
292 osrfLogError(OSRF_LOG_MARK, "queue-wait returned non-success %d", retval);
296 /* see if the client disconnected from us */
297 if(session->state != OSRF_SESSION_CONNECTED) break;
299 /* if no data was reveived within the timeout interval */
300 if( !recvd && (end - start) >= keepalive ) {
301 osrfLogInfo(OSRF_LOG_MARK, "No request was received in %d seconds, exiting stateful session", keepalive);
302 osrfAppSessionStatus(
306 0, "Disconnected on timeout" );
312 osrfLogDebug( OSRF_LOG_MARK, "Exiting keepalive loop for session %s", session->session_id );
313 osrfAppSessionFree( session );
/**
 * Allocates and initialises the listener state.
 *
 * Logs an error when min_children > max_children or when max_children
 * exceeds ABS_MAX_CHILDREN; the corresponding early returns are not visible
 * here. Otherwise it allocates the struct with safe_malloc(), stores the
 * limits and the transport client, and clears the remaining fields.
 * osrf_prefork_run() fills in appname and keepalive afterwards.
 *
 * @param client        Transport client the listener receives requests on.
 * @param max_requests  Requests each child serves before it exits.
 * @param min_children  Number of children reap_children() keeps running.
 * @param max_children  Upper bound prefork_run() uses when adding children.
 * NOTE(review): the info message says "launching child" although this only
 * initialises the forker, and its two string pieces join without a space
 * after the comma. The final return is not visible in this view.
 */
318 static prefork_simple* prefork_simple_init( transport_client* client,
319 int max_requests, int min_children, int max_children ) {
321 if( min_children > max_children ) {
322 osrfLogError( OSRF_LOG_MARK, "min_children (%d) is greater "
323 "than max_children (%d)", min_children, max_children );
327 if( max_children > ABS_MAX_CHILDREN ) {
328 osrfLogError( OSRF_LOG_MARK, "max_children (%d) is greater than ABS_MAX_CHILDREN (%d)",
329 max_children, ABS_MAX_CHILDREN );
333 osrfLogInfo(OSRF_LOG_MARK, "Prefork launching child with max_request=%d,"
334 "min_children=%d, max_children=%d", max_requests, min_children, max_children );
336 /* flesh out the struct */
337 prefork_simple* prefork = (prefork_simple*) safe_malloc(sizeof(prefork_simple));
338 prefork->max_requests = max_requests;
339 prefork->min_children = min_children;
340 prefork->max_children = max_children;
342 prefork->data_to_child = 0;
343 prefork->data_to_parent = 0;
344 prefork->current_num_children = 0;
345 prefork->keepalive = 0;
346 prefork->appname = NULL;
347 prefork->first_child = NULL;
348 prefork->connection = client;
/**
 * Creates a new child process. It builds two pipes: a data pipe, on which
 * the parent writes requests and the child reads them, and a status pipe,
 * on which the child writes "available" and the parent reads it. It then
 * allocates the prefork_child, links it into forker's list and fork()s.
 *
 * In the parent: (re)installs the SIGCHLD handler, increments
 * current_num_children and keeps all four pipe descriptors open (see the
 * comment below).
 * In the child: records its pid, closes the write end of the data pipe and
 * the read end of the status pipe, and runs prefork_child_init_hook(),
 * exiting on failure. It then serves requests in prefork_child_wait() and
 * finally calls osrf_prefork_child_exit().
 *
 * NOTE(review): the return statements (including those on pipe/fork
 * failure) are not visible. The child struct is added to the list before
 * fork(); confirm the hidden fork-failure path removes it again.
 */
353 static prefork_child* launch_child( prefork_simple* forker ) {
359 /* Set up the data pipes and add the child struct to the parent */
360 if( pipe(data_fd) < 0 ) { /* build the data pipe*/
361 osrfLogError( OSRF_LOG_MARK, "Pipe making error" );
365 if( pipe(status_fd) < 0 ) {/* build the status pipe */
366 osrfLogError( OSRF_LOG_MARK, "Pipe making error" );
370 osrfLogInternal( OSRF_LOG_MARK, "Pipes: %d %d %d %d", data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
371 prefork_child* child = prefork_child_init( forker->max_requests, data_fd[0],
372 data_fd[1], status_fd[0], status_fd[1] );
374 child->appname = strdup(forker->appname);
375 child->keepalive = forker->keepalive;
378 add_prefork_child( forker, child );
380 if( (pid=fork()) < 0 ) {
381 osrfLogError( OSRF_LOG_MARK, "Forking Error" );
385 if( pid > 0 ) { /* parent */
387 signal(SIGCHLD, sigchld_handler);
388 (forker->current_num_children)++;
391 osrfLogDebug( OSRF_LOG_MARK, "Parent launched %d", pid );
392 /* *no* child pipe FD's can be closed or the parent will re-use fd's that
393 the children are currently using */
399 osrfLogInternal( OSRF_LOG_MARK, "I am new child with read_data_fd = %d and write_status_fd = %d",
400 child->read_data_fd, child->write_status_fd );
402 child->pid = getpid();
403 close( child->write_data_fd );
404 close( child->read_status_fd );
407 if( prefork_child_init_hook(child) == -1 ) {
408 osrfLogError(OSRF_LOG_MARK,
409 "Forker child going away because we could not connect to OpenSRF...");
410 osrf_prefork_child_exit(child);
413 prefork_child_wait( child );
414 osrf_prefork_child_exit(child); /* just to be sure */
/**
 * Common exit path for a child process. It runs the application's exit
 * hook via osrfAppRunExitCode().
 * NOTE(review): the rest of this function (presumably the actual process
 * exit) is not visible in this view.
 */
419 static void osrf_prefork_child_exit(prefork_child* child) {
420 osrfAppRunExitCode();
/**
 * Launches forker->min_children children, one launch_child() call per
 * iteration.
 * NOTE(review): the declaration of the counter c is not visible (presumably
 * initialised to 0). The launch_child() result is not checked, so failed
 * launches are not retried here.
 */
424 static void prefork_launch_children( prefork_simple* forker ) {
427 while( c++ < forker->min_children )
428 launch_child( forker );
/**
 * SIGCHLD handler. It re-installs itself first, because with signal() the
 * disposition may be reset to SIG_DFL when the handler is invoked.
 * NOTE(review): the rest of the handler is not visible in this view.
 * sigaction() gives persistent, well-defined semantics without needing
 * re-installation.
 */
432 static void sigchld_handler( int sig ) {
433 signal(SIGCHLD, sigchld_handler);
/**
 * Collects every exited child without blocking (waitpid with WNOHANG) and
 * removes it from forker's list. It then launches replacements until
 * current_num_children is back at min_children.
 * NOTE(review): unlike the other helpers, this function is neither static
 * nor forward-declared in this view. del_prefork_child() decrements
 * current_num_children even for a pid it cannot find in the list.
 */
438 void reap_children( prefork_simple* forker ) {
443 while( (child_pid=waitpid(-1,&status,WNOHANG)) > 0)
444 del_prefork_child( forker, child_pid );
447 while( forker->current_num_children < forker->min_children )
448 launch_child( forker );
/**
 * Listener main loop. Blocks on forker->connection for the next message and
 * writes its XML, including the terminating NUL, to a child's data pipe.
 *
 * The child is chosen in this order:
 *  1. the first child in the ring marked available, after polling the
 *     status pipes via check_children() unless the previous pass already
 *     waited on them (no_recheck);
 *  2. a freshly launched child, when fewer than max_children are running;
 *  3. otherwise, block in check_children(forker, 1) until a child reports
 *     back.
 *
 * After a successful hand-off, first_child moves past the chosen child so
 * the next search starts elsewhere in the ring. Exited children are reaped
 * and the message is freed after each request.
 *
 * NOTE(review): the loop header, the honored/no_recheck bookkeeping, the
 * retry after waiting and several break/continue statements are not
 * visible in this view.
 */
453 static void prefork_run(prefork_simple* forker) {
455 if( forker->first_child == NULL )
458 transport_message* cur_msg = NULL;
463 if( forker->first_child == NULL ) {/* no more children */
464 osrfLogWarning( OSRF_LOG_MARK, "No more children..." );
468 osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data...");
// A timeout of -1 blocks until a message arrives.
469 cur_msg = client_recv( forker->connection, -1 );
471 //fprintf(stderr, "Got Data %f\n", get_timestamp_millis() );
473 if( cur_msg == NULL ) continue;
475 int honored = 0; /* true if we've serviced the request */
480 if(!no_recheck) check_children( forker, 0 );
483 osrfLogDebug( OSRF_LOG_MARK, "Server received inbound data" );
485 prefork_child* cur_child = forker->first_child;
487 /* Look for an available child */
488 for( k = 0; k < forker->current_num_children; k++ ) {
490 osrfLogInternal( OSRF_LOG_MARK, "Searching for available child. cur_child->pid = %d", cur_child->pid );
491 osrfLogInternal( OSRF_LOG_MARK, "Current num children %d and loop %d", forker->current_num_children, k);
493 if( cur_child->available ) {
494 osrfLogDebug( OSRF_LOG_MARK, "forker sending data to %d", cur_child->pid );
496 message_prepare_xml( cur_msg );
497 char* data = cur_msg->msg_xml;
498 if( ! data || strlen(data) < 1 ) break;
// The child is marked busy before the write. If the write fails, it stays
// busy until it reports "available" again or is reaped.
500 cur_child->available = 0;
501 osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d", cur_child->write_data_fd );
504 //fprintf(stderr, "Writing Data %f\n", get_timestamp_millis() );
// NOTE(review): write() may transfer fewer bytes than requested for large
// messages. Only a negative result is treated as failure here, so a short
// write would hand the child a truncated request.
505 if( (written = write( cur_child->write_data_fd, data, strlen(data) + 1 )) < 0 ) {
506 osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d", errno);
507 cur_child = cur_child->next;
511 //fprintf(stderr, "Wrote %d bytes to child\n", written);
// Round-robin: the next search starts just after the child that got this request.
513 forker->first_child = cur_child->next;
517 cur_child = cur_child->next;
520 /* if none available, add a new child if we can */
522 osrfLogDebug( OSRF_LOG_MARK, "Not enough children, attempting to add...");
524 if( forker->current_num_children < forker->max_children ) {
525 osrfLogDebug( OSRF_LOG_MARK, "Launching new child with current_num = %d",
526 forker->current_num_children );
528 prefork_child* new_child = launch_child( forker );
531 message_prepare_xml( cur_msg );
532 char* data = cur_msg->msg_xml;
535 int len = strlen(data);
538 new_child->available = 0;
539 osrfLogDebug( OSRF_LOG_MARK, "Writing to new child fd %d : pid %d",
540 new_child->write_data_fd, new_child->pid );
// NOTE(review): same short-write concern as above.
542 if( write( new_child->write_data_fd, data, len + 1 ) >= 0 ) {
543 forker->first_child = new_child->next;
554 osrfLogWarning( OSRF_LOG_MARK, "No children available, waiting...");
556 check_children( forker, 1 ); /* non-poll version */
557 /* tell the loop no to call check_children again, since we're calling it now */
562 reap_children(forker);
565 //fprintf(stderr, "Parent done with request %f\n", get_timestamp_millis() );
569 message_free( cur_msg );
571 } /* top level listen loop */
/**
 * check_children: reaps exited children first. It then waits on the status
 * pipes of all children, polling when forever == 0 and blocking when
 * forever != 0, and marks every child that wrote a status message as
 * available again.
 *
 * NOTE(review): an fd_set only holds descriptors below FD_SETSIZE
 * (typically 1024). The parent keeps all four pipe descriptors of every
 * child open (see launch_child()). With max_children near ABS_MAX_CHILDREN
 * (256), read_status_fd values can therefore reach FD_SETSIZE, and
 * FD_SET() on such a descriptor is undefined behaviour. poll() has no such
 * limit.
 * NOTE(review): FD_ZERO, the polling timeout value and the early return
 * when select() times out are not visible in this view.
 */
576 /** XXX Add a flag which tells select() to wait forever on children
577 * in the best case, this will be faster than calling usleep(x), and
578 * in the worst case it won't be slower and will do less logging...
581 static void check_children( prefork_simple* forker, int forever ) {
593 reap_children(forker);
595 prefork_child* cur_child = forker->first_child;
598 for( i = 0; i!= forker->current_num_children; i++ ) {
600 if( cur_child->read_status_fd > max_fd )
601 max_fd = cur_child->read_status_fd;
602 FD_SET( cur_child->read_status_fd, &read_set );
603 cur_child = cur_child->next;
// Descriptor 0 is explicitly removed from the watched set.
606 FD_CLR(0,&read_set);/* just to be sure */
609 osrfLogWarning(OSRF_LOG_MARK, "We have no children available - waiting for one to show up...");
611 if( (select_ret=select( max_fd + 1 , &read_set, NULL, NULL, NULL)) == -1 ) {
612 osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children", errno );
614 osrfLogInfo(OSRF_LOG_MARK, "select() completed after waiting on children to become available");
622 if( (select_ret=select( max_fd + 1 , &read_set, NULL, NULL, &tv)) == -1 ) {
623 osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children", errno );
627 if( select_ret == 0 )
630 /* see if one of a child has told us it's done */
631 cur_child = forker->first_child;
634 for( j = 0; j!= forker->current_num_children && num_handled < select_ret ; j++ ) {
636 if( FD_ISSET( cur_child->read_status_fd, &read_set ) ) {
637 //printf( "Server received status from a child %d\n", cur_child->pid );
638 osrfLogDebug( OSRF_LOG_MARK, "Server received status from a child %d", cur_child->pid );
642 /* now suck off the data */
644 osrf_clearbuf( buf, sizeof(buf) );
645 if( (n=read(cur_child->read_status_fd, buf, sizeof(buf) - 1)) < 0 ) {
646 osrfLogWarning( OSRF_LOG_MARK, "Read error after select in child status read with errno %d", errno);
650 osrfLogDebug( OSRF_LOG_MARK, "Read %d bytes from status buffer: %s", n, buf );
652 cur_child->available = 1;
654 cur_child = cur_child->next;
/**
 * Child request loop. Serves up to child->max_requests requests read from
 * the data pipe, then calls osrf_prefork_child_exit().
 *
 * For each request:
 *  - the first read() blocks;
 *  - once data arrives, the descriptor is switched to non-blocking and
 *    drained until read() fails with EAGAIN, appending each chunk to a
 *    growing buffer;
 *  - the accumulated text is passed to prefork_child_process_request();
 *  - unless this was the last allowed request, the child writes
 *    "available" to its status pipe so the parent can hand it more work.
 *
 * NOTE(review): buffer_add() is called without a length and presumably uses
 * strlen(), so anything after an embedded NUL in a chunk is dropped; the
 * parent writes each message including its NUL terminator.
 * NOTE(review): the write() to the status pipe is unchecked.
 * NOTE(review): read() does not normally fail with EPIPE (that is a write()
 * error); a closed pipe yields EOF (n == 0) instead.
 */
660 static void prefork_child_wait( prefork_child* child ) {
663 growing_buffer* gbuf = buffer_init( READ_BUFSIZE );
664 char buf[READ_BUFSIZE];
665 osrf_clearbuf( buf, sizeof(buf) );
667 for( i = 0; i < child->max_requests; i++ ) {
671 clr_fl(child->read_data_fd, O_NONBLOCK );
673 while( (n=read(child->read_data_fd, buf, READ_BUFSIZE-1)) > 0 ) {
675 osrfLogDebug(OSRF_LOG_MARK, "Prefork child read %d bytes of data", n);
677 set_fl(child->read_data_fd, O_NONBLOCK );
678 buffer_add( gbuf, buf );
679 osrf_clearbuf( buf, sizeof(buf) );
683 if( errno == EAGAIN ) n = 0;
685 if( errno == EPIPE ) {
686 osrfLogDebug(OSRF_LOG_MARK, "C child attempted read on broken pipe, exiting...");
691 osrfLogWarning( OSRF_LOG_MARK, "Prefork child read returned error with errno %d", errno );
694 } else if( gotdata ) {
695 osrfLogDebug(OSRF_LOG_MARK, "Prefork child got a request.. processing..");
696 prefork_child_process_request(child, gbuf->buf);
697 buffer_reset( gbuf );
700 if( i < child->max_requests - 1 )
701 write( child->write_status_fd, "available" /*less than 64 bytes*/, 9 );
706 osrfLogDebug( OSRF_LOG_MARK, "Child with max-requests=%d, num-served=%d exiting...[%ld]",
707 child->max_requests, i, (long) getpid() );
709 osrf_prefork_child_exit(child); /* just to be sure */
/**
 * Appends child to forker's circular list of children.
 *
 * An empty list simply gets child as its first_child. Otherwise the function
 * advances first_child until it reaches the element whose next is the
 * original head, then splices child between that last element and the
 * original head.
 * NOTE(review): the loop header, the self-link for the one-element case and
 * the final value of first_child are not visible in this view. The walk
 * makes each insertion O(n) in the number of children.
 */
713 static void add_prefork_child( prefork_simple* forker, prefork_child* child ) {
715 if( forker->first_child == NULL ) {
716 forker->first_child = child;
721 /* we put the child in as the last because, regardless,
722 we have to do the DLL splice dance, and this is the
725 prefork_child* start_child = forker->first_child;
727 if( forker->first_child->next == start_child )
729 forker->first_child = forker->first_child->next;
732 /* here we know that forker->first_child is the last element
733 in the list and start_child is the first. Insert the
734 new child between them*/
736 forker->first_child->next = child;
737 child->next = start_child;
/* NOTE(review): disabled code. If it is ever revived, note that it advances
 * forker->first_child itself while searching, which moves the list head as
 * a side effect; iterate with a local cursor instead. */
741 //static prefork_child* find_prefork_child( prefork_simple* forker, pid_t pid ) {
743 // if( forker->first_child == NULL ) { return NULL; }
744 // prefork_child* start_child = forker->first_child;
746 // if( forker->first_child->pid == pid )
747 // return forker->first_child;
748 // } while( (forker->first_child = forker->first_child->next) != start_child );
/**
 * Removes the child with process id pid from forker's circular list,
 * closes its four pipe descriptors and releases it via prefork_child_free().
 *
 * Three cases are handled:
 *  - a one-element list;
 *  - removal of the current head, where its predecessor becomes the new
 *    first_child;
 *  - removal further along the ring.
 *
 * NOTE(review): current_num_children is decremented before the search, so
 * a pid that is not in the list still lowers the count.
 * NOTE(review): prefork_child_free() closes read_data_fd and
 * write_status_fd again after they have already been closed here. The
 * second close() can hit an unrelated descriptor that reused the same
 * number in the meantime. Close each descriptor in exactly one place and
 * reset closed fds to -1.
 */
754 static void del_prefork_child( prefork_simple* forker, pid_t pid ) {
756 if( forker->first_child == NULL ) { return; }
758 (forker->current_num_children)--;
759 osrfLogDebug( OSRF_LOG_MARK, "Deleting Child: %d", pid );
761 prefork_child* start_child = forker->first_child; /* starting point */
762 prefork_child* cur_child = start_child; /* current pointer */
763 prefork_child* prev_child = start_child; /* the trailing pointer */
765 /* special case where there is only one in the list */
766 if( start_child == start_child->next ) {
767 if( start_child->pid == pid ) {
768 forker->first_child = NULL;
770 close( start_child->read_data_fd );
771 close( start_child->write_data_fd );
772 close( start_child->read_status_fd );
773 close( start_child->write_status_fd );
775 prefork_child_free( start_child );
781 /* special case where the first item in the list needs to be removed */
782 if( start_child->pid == pid ) {
784 /* find the last one so we can remove the start_child */
786 prev_child = cur_child;
787 cur_child = cur_child->next;
788 }while( cur_child != start_child );
790 /* now cur_child == start_child */
791 prev_child->next = cur_child->next;
792 forker->first_child = prev_child;
794 close( cur_child->read_data_fd );
795 close( cur_child->write_data_fd );
796 close( cur_child->read_status_fd );
797 close( cur_child->write_status_fd );
799 prefork_child_free( cur_child );
804 prev_child = cur_child;
805 cur_child = cur_child->next;
807 if( cur_child->pid == pid ) {
808 prev_child->next = cur_child->next;
810 close( cur_child->read_data_fd );
811 close( cur_child->write_data_fd );
812 close( cur_child->read_status_fd );
813 close( cur_child->write_status_fd );
815 prefork_child_free( cur_child );
819 } while(cur_child != start_child);
/**
 * Allocates a prefork_child with safe_malloc() and records its request
 * limit and the four pipe descriptors. The child starts out marked
 * available, with no appname and a keepalive of 0; launch_child() fills in
 * the last two.
 * NOTE(review): initialisation of pid and next, and the return statement,
 * are not visible in this view.
 */
825 static prefork_child* prefork_child_init(
826 int max_requests, int read_data_fd, int write_data_fd,
827 int read_status_fd, int write_status_fd ) {
829 prefork_child* child = (prefork_child*) safe_malloc(sizeof(prefork_child));
831 child->max_requests = max_requests;
832 child->read_data_fd = read_data_fd;
833 child->write_data_fd = write_data_fd;
834 child->read_status_fd = read_status_fd;
835 child->write_status_fd = write_status_fd;
836 child->min_children = 0;
837 child->available = 1;
838 child->appname = NULL;
839 child->keepalive = 0;
/**
 * Tears down the listener state. While children remain in the list it
 * loops; its log message says it kills them and sleeps so they can be
 * reaped. Afterwards it frees the transport connection and appname.
 * NOTE(review): the loop body and any free() of prefork itself are not
 * visible in this view. If reaping cannot empty the list, this loop never
 * terminates.
 */
846 static int prefork_free( prefork_simple* prefork ) {
848 while( prefork->first_child != NULL ) {
849 osrfLogInfo( OSRF_LOG_MARK, "Killing children and sleeping 1 to reap..." );
854 client_free(prefork->connection);
855 free(prefork->appname);
/**
 * Releases a prefork_child. It frees the child's appname and closes the read
 * end of the data pipe and the write end of the status pipe.
 * NOTE(review): del_prefork_child() has already closed both of these
 * descriptors before calling this function, so on that path they are
 * closed twice. The rest of this function is not visible in this view.
 */
860 static int prefork_child_free( prefork_child* child ) {
861 free(child->appname);
862 close(child->read_data_fd);
863 close(child->write_status_fd);