2 #include "opensrf/osrf_prefork.h"
3 #include "opensrf/osrf_app_session.h"
4 #include "opensrf/osrf_application.h"
6 #define READ_BUFSIZE 1024
7 #define ABS_MAX_CHILDREN 256
13 int fd; /**< Unused. */
14 int data_to_child; /**< Unused. */
15 int data_to_parent; /**< Unused. */
16 int current_num_children;
17 int keepalive; /**< keepalive time for stateful sessions. */
19 struct prefork_child_struct* first_child;
20 transport_client* connection;
23 struct prefork_child_struct {
34 struct prefork_child_struct* next;
37 typedef struct prefork_child_struct prefork_child;
39 static prefork_simple* prefork_simple_init( transport_client* client,
40 int max_requests, int min_children, int max_children );
41 static prefork_child* launch_child( prefork_simple* forker );
42 static void prefork_launch_children( prefork_simple* forker );
43 static void prefork_run(prefork_simple* forker);
44 static void add_prefork_child( prefork_simple* forker, prefork_child* child );
46 //static prefork_child* find_prefork_child( prefork_simple* forker, pid_t pid );
48 static void del_prefork_child( prefork_simple* forker, pid_t pid );
49 static void check_children( prefork_simple* forker, int forever );
50 static void prefork_child_process_request(prefork_child*, char* data);
51 static int prefork_child_init_hook(prefork_child*);
52 static prefork_child* prefork_child_init(
53 int max_requests, int read_data_fd, int write_data_fd,
54 int read_status_fd, int write_status_fd );
56 /* listens on the 'data_to_child' fd and wait for incoming data */
57 static void prefork_child_wait( prefork_child* child );
58 static int prefork_free( prefork_simple* );
59 static int prefork_child_free( prefork_child* );
60 static void osrf_prefork_register_routers( const char* appname );
61 static void osrf_prefork_child_exit( prefork_child* );
64 /* true if we just deleted a child. This will allow us to make sure we're
65 not trying to use freed memory */
66 static sig_atomic_t child_dead;
68 static void sigchld_handler( int sig );
70 int osrf_prefork_run(const char* appname) {
73 osrfLogError( OSRF_LOG_MARK, "osrf_prefork_run requires an appname to run!");
77 set_proc_title( "OpenSRF Listener [%s]", appname );
84 osrfLogInfo( OSRF_LOG_MARK, "Loading config in osrf_forker for app %s", appname);
86 char* max_req = osrf_settings_host_value("/apps/%s/unix_config/max_requests", appname);
87 char* min_children = osrf_settings_host_value("/apps/%s/unix_config/min_children", appname);
88 char* max_children = osrf_settings_host_value("/apps/%s/unix_config/max_children", appname);
89 char* keepalive = osrf_settings_host_value("/apps/%s/keepalive", appname);
91 if(!keepalive) osrfLogWarning( OSRF_LOG_MARK, "Keepalive is not defined, assuming %d", kalive);
92 else kalive = atoi(keepalive);
94 if(!max_req) osrfLogWarning( OSRF_LOG_MARK, "Max requests not defined, assuming %d", maxr);
95 else maxr = atoi(max_req);
97 if(!min_children) osrfLogWarning( OSRF_LOG_MARK,
98 "Min children not defined, assuming %d", minc);
99 else minc = atoi(min_children);
101 if(!max_children) osrfLogWarning( OSRF_LOG_MARK,
102 "Max children not defined, assuming %d", maxc);
103 else maxc = atoi(max_children);
109 /* --------------------------------------------------- */
111 char* resc = va_list_to_string("%s_listener", appname);
113 if(!osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
114 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()");
121 prefork_simple* forker = prefork_simple_init(
122 osrfSystemGetTransportClient(), maxr, minc, maxc);
125 osrfLogError( OSRF_LOG_MARK, "osrf_prefork_run() failed to create prefork_simple object");
129 forker->appname = strdup(appname);
130 forker->keepalive = kalive;
132 prefork_launch_children(forker);
134 osrf_prefork_register_routers(appname);
136 osrfLogInfo( OSRF_LOG_MARK, "Launching osrf_forker for app %s", appname);
139 osrfLogWarning( OSRF_LOG_MARK, "prefork_run() retuned - how??");
140 prefork_free(forker);
145 /* sends the "register" packet to the specified router */
146 static void osrf_prefork_send_router_registration(const char* appname, const char* routerName, const char* routerDomain) {
147 transport_client* client = osrfSystemGetTransportClient();
148 char* jid = va_list_to_string( "%s@%s/router", routerName, routerDomain );
149 osrfLogInfo( OSRF_LOG_MARK, "%s registering with router %s", appname, jid );
150 transport_message* msg = message_init("registering", NULL, NULL, jid, NULL );
151 message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
152 client_send_message( client, msg );
157 /** parses a single "complex" router configuration chunk */
158 static void osrf_prefork_parse_router_chunk(const char* appname, jsonObject* routerChunk) {
160 const char* routerName = jsonObjectGetString(jsonObjectGetKeyConst(routerChunk, "name"));
161 const char* domain = jsonObjectGetString(jsonObjectGetKeyConst(routerChunk, "domain"));
162 const jsonObject* services = jsonObjectGetKeyConst(routerChunk, "services");
163 osrfLogDebug(OSRF_LOG_MARK, "found router config with domain %s and name %s",
166 if( services && services->type == JSON_HASH ) {
167 osrfLogDebug(OSRF_LOG_MARK, "investigating router information...");
168 const jsonObject* service_obj = jsonObjectGetKeyConst(services, "service");
170 ; // do nothing (shouldn't happen)
171 else if( JSON_ARRAY == service_obj->type ) {
173 for(j = 0; j < service_obj->size; j++ ) {
174 const char* service = jsonObjectGetString(jsonObjectGetIndex(service_obj, j));
175 if( service && !strcmp( appname, service ))
176 osrf_prefork_send_router_registration(appname, routerName, domain);
179 else if( JSON_STRING == service_obj->type ) {
180 if( !strcmp(appname, jsonObjectGetString( service_obj )) )
181 osrf_prefork_send_router_registration(appname, routerName, domain);
184 osrf_prefork_send_router_registration(appname, routerName, domain);
188 static void osrf_prefork_register_routers( const char* appname ) {
190 jsonObject* routerInfo = osrfConfigGetValueObject(NULL, "/routers/router");
193 for(i = 0; i < routerInfo->size; i++) {
194 jsonObject* routerChunk = jsonObjectGetIndex(routerInfo, i);
196 if(routerChunk->type == JSON_STRING) {
197 /* this accomodates simple router configs */
198 char* routerName = osrfConfigGetValue( NULL, "/router_name" );
199 char* domain = osrfConfigGetValue(NULL, "/routers/router");
200 osrfLogDebug(OSRF_LOG_MARK, "found simple router settings with router name %s",
202 osrf_prefork_send_router_registration(appname, routerName, domain);
205 osrf_prefork_parse_router_chunk(appname, routerChunk);
210 static int prefork_child_init_hook(prefork_child* child) {
212 if(!child) return -1;
213 osrfLogDebug( OSRF_LOG_MARK, "Child init hook for child %d", child->pid);
215 osrfSystemInitCache();
216 char* resc = va_list_to_string("%s_drone",child->appname);
218 /* if we're a source-client, tell the logger now that we're a new process*/
219 char* isclient = osrfConfigGetValue(NULL, "/client");
220 if( isclient && !strcasecmp(isclient,"true") )
221 osrfLogSetIsClient(1);
224 /* we want to remove traces of our parents socket connection
225 * so we can have our own */
226 osrfSystemIgnoreTransportClient();
228 if(!osrfSystemBootstrapClientResc( NULL, NULL, resc)) {
229 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()");
236 if( ! osrfAppRunChildInit(child->appname) ) {
237 osrfLogDebug(OSRF_LOG_MARK, "Prefork child_init succeeded\n");
239 osrfLogError(OSRF_LOG_MARK, "Prefork child_init failed\n");
243 set_proc_title( "OpenSRF Drone [%s]", child->appname );
247 static void prefork_child_process_request(prefork_child* child, char* data) {
250 transport_client* client = osrfSystemGetTransportClient();
252 if(!client_connected(client)) {
253 osrfSystemIgnoreTransportClient();
254 osrfLogWarning(OSRF_LOG_MARK, "Reconnecting child to opensrf after disconnect...");
255 if(!osrf_system_bootstrap_client(NULL, NULL)) {
256 osrfLogError( OSRF_LOG_MARK,
257 "Unable to bootstrap client in prefork_child_process_request()");
259 osrf_prefork_child_exit(child);
263 /* construct the message from the xml */
264 transport_message* msg = new_message_from_xml( data );
266 osrfAppSession* session = osrf_stack_transport_handler(msg, child->appname);
269 if( session->stateless && session->state != OSRF_SESSION_CONNECTED ) {
270 osrfAppSessionFree( session );
274 osrfLogDebug( OSRF_LOG_MARK, "Entering keepalive loop for session %s", session->session_id );
275 int keepalive = child->keepalive;
283 osrfLogDebug(OSRF_LOG_MARK,
284 "osrf_prefork calling queue_wait [%d] in keepalive loop", keepalive);
286 retval = osrf_app_session_queue_wait(session, keepalive, &recvd);
289 osrfLogDebug(OSRF_LOG_MARK, "Data received == %d", recvd);
292 osrfLogError(OSRF_LOG_MARK, "queue-wait returned non-success %d", retval);
296 /* see if the client disconnected from us */
297 if(session->state != OSRF_SESSION_CONNECTED) break;
299 /* if no data was reveived within the timeout interval */
300 if( !recvd && (end - start) >= keepalive ) {
301 osrfLogInfo(OSRF_LOG_MARK,
302 "No request was received in %d seconds, exiting stateful session", keepalive);
303 osrfAppSessionStatus(
307 0, "Disconnected on timeout" );
313 osrfLogDebug( OSRF_LOG_MARK, "Exiting keepalive loop for session %s", session->session_id );
314 osrfAppSessionFree( session );
319 static prefork_simple* prefork_simple_init( transport_client* client,
320 int max_requests, int min_children, int max_children ) {
322 if( min_children > max_children ) {
323 osrfLogError( OSRF_LOG_MARK, "min_children (%d) is greater "
324 "than max_children (%d)", min_children, max_children );
328 if( max_children > ABS_MAX_CHILDREN ) {
329 osrfLogError( OSRF_LOG_MARK, "max_children (%d) is greater than ABS_MAX_CHILDREN (%d)",
330 max_children, ABS_MAX_CHILDREN );
334 osrfLogInfo(OSRF_LOG_MARK, "Prefork launching child with max_request=%d,"
335 "min_children=%d, max_children=%d", max_requests, min_children, max_children );
337 /* flesh out the struct */
338 prefork_simple* prefork = safe_malloc(sizeof(prefork_simple));
339 prefork->max_requests = max_requests;
340 prefork->min_children = min_children;
341 prefork->max_children = max_children;
343 prefork->data_to_child = 0;
344 prefork->data_to_parent = 0;
345 prefork->current_num_children = 0;
346 prefork->keepalive = 0;
347 prefork->appname = NULL;
348 prefork->first_child = NULL;
349 prefork->connection = client;
354 static prefork_child* launch_child( prefork_simple* forker ) {
360 /* Set up the data pipes and add the child struct to the parent */
361 if( pipe(data_fd) < 0 ) { /* build the data pipe*/
362 osrfLogError( OSRF_LOG_MARK, "Pipe making error" );
366 if( pipe(status_fd) < 0 ) {/* build the status pipe */
367 osrfLogError( OSRF_LOG_MARK, "Pipe making error" );
371 osrfLogInternal( OSRF_LOG_MARK, "Pipes: %d %d %d %d",
372 data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
373 prefork_child* child = prefork_child_init( forker->max_requests, data_fd[0],
374 data_fd[1], status_fd[0], status_fd[1] );
376 child->appname = strdup(forker->appname);
377 child->keepalive = forker->keepalive;
380 add_prefork_child( forker, child );
382 if( (pid=fork()) < 0 ) {
383 osrfLogError( OSRF_LOG_MARK, "Forking Error" );
387 if( pid > 0 ) { /* parent */
389 signal(SIGCHLD, sigchld_handler);
390 (forker->current_num_children)++;
393 osrfLogDebug( OSRF_LOG_MARK, "Parent launched %d", pid );
394 /* *no* child pipe FD's can be closed or the parent will re-use fd's that
395 the children are currently using */
401 osrfLogInternal( OSRF_LOG_MARK,
402 "I am new child with read_data_fd = %d and write_status_fd = %d",
403 child->read_data_fd, child->write_status_fd );
405 child->pid = getpid();
406 close( child->write_data_fd );
407 close( child->read_status_fd );
410 if( prefork_child_init_hook(child) == -1 ) {
411 osrfLogError(OSRF_LOG_MARK,
412 "Forker child going away because we could not connect to OpenSRF...");
413 osrf_prefork_child_exit(child);
416 prefork_child_wait( child );
417 osrf_prefork_child_exit(child); /* just to be sure */
422 static void osrf_prefork_child_exit(prefork_child* child) {
423 osrfAppRunExitCode();
427 static void prefork_launch_children( prefork_simple* forker ) {
430 while( c++ < forker->min_children )
431 launch_child( forker );
435 static void sigchld_handler( int sig ) {
436 signal(SIGCHLD, sigchld_handler);
441 void reap_children( prefork_simple* forker ) {
446 while( (child_pid=waitpid(-1,&status,WNOHANG)) > 0)
447 del_prefork_child( forker, child_pid );
450 while( forker->current_num_children < forker->min_children )
451 launch_child( forker );
456 static void prefork_run(prefork_simple* forker) {
458 if( forker->first_child == NULL )
461 transport_message* cur_msg = NULL;
466 if( forker->first_child == NULL ) {/* no more children */
467 osrfLogWarning( OSRF_LOG_MARK, "No more children..." );
471 osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data...");
472 cur_msg = client_recv( forker->connection, -1 );
474 //fprintf(stderr, "Got Data %f\n", get_timestamp_millis() );
476 if( cur_msg == NULL ) continue;
478 int honored = 0; /* true if we've serviced the request */
483 if(!no_recheck) check_children( forker, 0 );
486 osrfLogDebug( OSRF_LOG_MARK, "Server received inbound data" );
488 prefork_child* cur_child = forker->first_child;
490 /* Look for an available child */
491 for( k = 0; k < forker->current_num_children; k++ ) {
493 osrfLogInternal( OSRF_LOG_MARK,
494 "Searching for available child. cur_child->pid = %d", cur_child->pid );
495 osrfLogInternal( OSRF_LOG_MARK, "Current num children %d and loop %d",
496 forker->current_num_children, k);
498 if( cur_child->available ) {
499 osrfLogDebug( OSRF_LOG_MARK, "forker sending data to %d", cur_child->pid );
501 message_prepare_xml( cur_msg );
502 char* data = cur_msg->msg_xml;
503 if( ! data || strlen(data) < 1 ) break;
505 cur_child->available = 0;
506 osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d",
507 cur_child->write_data_fd );
510 if( (written = write( cur_child->write_data_fd, data, strlen(data) + 1 )) < 0 ) {
511 osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d", errno);
512 cur_child = cur_child->next;
516 forker->first_child = cur_child->next;
520 cur_child = cur_child->next;
523 /* if none available, add a new child if we can */
525 osrfLogDebug( OSRF_LOG_MARK, "Not enough children, attempting to add...");
527 if( forker->current_num_children < forker->max_children ) {
528 osrfLogDebug( OSRF_LOG_MARK, "Launching new child with current_num = %d",
529 forker->current_num_children );
531 prefork_child* new_child = launch_child( forker );
534 message_prepare_xml( cur_msg );
535 char* data = cur_msg->msg_xml;
538 int len = strlen(data);
541 new_child->available = 0;
542 osrfLogDebug( OSRF_LOG_MARK, "Writing to new child fd %d : pid %d",
543 new_child->write_data_fd, new_child->pid );
545 if( write( new_child->write_data_fd, data, len + 1 ) >= 0 ) {
546 forker->first_child = new_child->next;
557 osrfLogWarning( OSRF_LOG_MARK, "No children available, waiting...");
559 check_children( forker, 1 ); /* non-poll version */
560 /* tell the loop no to call check_children again, since we're calling it now */
565 reap_children(forker);
568 //fprintf(stderr, "Parent done with request %f\n", get_timestamp_millis() );
572 message_free( cur_msg );
574 } /* top level listen loop */
579 /** XXX Add a flag which tells select() to wait forever on children
580 in the best case, this will be faster than calling usleep(x), and
581 in the worst case it won't be slower and will do less logging...
583 static void check_children( prefork_simple* forker, int forever ) {
595 reap_children(forker);
597 prefork_child* cur_child = forker->first_child;
600 for( i = 0; i!= forker->current_num_children; i++ ) {
602 if( cur_child->read_status_fd > max_fd )
603 max_fd = cur_child->read_status_fd;
604 FD_SET( cur_child->read_status_fd, &read_set );
605 cur_child = cur_child->next;
608 FD_CLR(0,&read_set); /* just to be sure */
611 osrfLogWarning(OSRF_LOG_MARK,
612 "We have no children available - waiting for one to show up...");
614 if( (select_ret=select( max_fd + 1 , &read_set, NULL, NULL, NULL)) == -1 ) {
615 osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children", errno );
617 osrfLogInfo(OSRF_LOG_MARK,
618 "select() completed after waiting on children to become available");
626 if( (select_ret=select( max_fd + 1 , &read_set, NULL, NULL, &tv)) == -1 ) {
627 osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children", errno );
631 if( select_ret == 0 )
634 /* see if one of a child has told us it's done */
635 cur_child = forker->first_child;
638 for( j = 0; j!= forker->current_num_children && num_handled < select_ret ; j++ ) {
640 if( FD_ISSET( cur_child->read_status_fd, &read_set ) ) {
641 //printf( "Server received status from a child %d\n", cur_child->pid );
642 osrfLogDebug( OSRF_LOG_MARK,
643 "Server received status from a child %d", cur_child->pid );
647 /* now suck off the data */
649 osrf_clearbuf( buf, sizeof(buf) );
650 if( (n=read(cur_child->read_status_fd, buf, sizeof(buf) - 1)) < 0 ) {
651 osrfLogWarning( OSRF_LOG_MARK,
652 "Read error after select in child status read with errno %d", errno);
656 osrfLogDebug( OSRF_LOG_MARK, "Read %d bytes from status buffer: %s", n, buf );
658 cur_child->available = 1;
660 cur_child = cur_child->next;
666 static void prefork_child_wait( prefork_child* child ) {
669 growing_buffer* gbuf = buffer_init( READ_BUFSIZE );
670 char buf[READ_BUFSIZE];
671 osrf_clearbuf( buf, sizeof(buf) );
673 for( i = 0; i < child->max_requests; i++ ) {
677 clr_fl(child->read_data_fd, O_NONBLOCK );
679 while( (n=read(child->read_data_fd, buf, READ_BUFSIZE-1)) > 0 ) {
681 osrfLogDebug(OSRF_LOG_MARK, "Prefork child read %d bytes of data", n);
683 set_fl(child->read_data_fd, O_NONBLOCK );
684 buffer_add( gbuf, buf );
685 osrf_clearbuf( buf, sizeof(buf) );
689 if( errno == EAGAIN ) n = 0;
691 if( errno == EPIPE ) {
692 osrfLogDebug(OSRF_LOG_MARK, "C child attempted read on broken pipe, exiting...");
697 osrfLogWarning( OSRF_LOG_MARK,
698 "Prefork child read returned error with errno %d", errno );
701 } else if( gotdata ) {
702 osrfLogDebug(OSRF_LOG_MARK, "Prefork child got a request.. processing..");
703 prefork_child_process_request(child, gbuf->buf);
704 buffer_reset( gbuf );
707 if( i < child->max_requests - 1 )
708 write( child->write_status_fd, "available" /*less than 64 bytes*/, 9 );
713 osrfLogDebug( OSRF_LOG_MARK, "Child with max-requests=%d, num-served=%d exiting...[%ld]",
714 child->max_requests, i, (long) getpid() );
716 osrf_prefork_child_exit(child); /* just to be sure */
720 static void add_prefork_child( prefork_simple* forker, prefork_child* child ) {
722 if( forker->first_child == NULL ) {
723 forker->first_child = child;
728 /* we put the child in as the last because, regardless,
729 we have to do the DLL splice dance, and this is the
732 prefork_child* start_child = forker->first_child;
734 if( forker->first_child->next == start_child )
736 forker->first_child = forker->first_child->next;
739 /* here we know that forker->first_child is the last element
740 in the list and start_child is the first. Insert the
741 new child between them*/
743 forker->first_child->next = child;
744 child->next = start_child;
748 //static prefork_child* find_prefork_child( prefork_simple* forker, pid_t pid ) {
750 // if( forker->first_child == NULL ) { return NULL; }
751 // prefork_child* start_child = forker->first_child;
753 // if( forker->first_child->pid == pid )
754 // return forker->first_child;
755 // } while( (forker->first_child = forker->first_child->next) != start_child );
761 static void del_prefork_child( prefork_simple* forker, pid_t pid ) {
763 if( forker->first_child == NULL ) { return; }
765 (forker->current_num_children)--;
766 osrfLogDebug( OSRF_LOG_MARK, "Deleting Child: %d", pid );
768 prefork_child* start_child = forker->first_child; /* starting point */
769 prefork_child* cur_child = start_child; /* current pointer */
770 prefork_child* prev_child = start_child; /* the trailing pointer */
772 /* special case where there is only one in the list */
773 if( start_child == start_child->next ) {
774 if( start_child->pid == pid ) {
775 forker->first_child = NULL;
777 close( start_child->read_data_fd );
778 close( start_child->write_data_fd );
779 close( start_child->read_status_fd );
780 close( start_child->write_status_fd );
782 prefork_child_free( start_child );
788 /* special case where the first item in the list needs to be removed */
789 if( start_child->pid == pid ) {
791 /* find the last one so we can remove the start_child */
793 prev_child = cur_child;
794 cur_child = cur_child->next;
795 } while( cur_child != start_child );
797 /* now cur_child == start_child */
798 prev_child->next = cur_child->next;
799 forker->first_child = prev_child;
801 close( cur_child->read_data_fd );
802 close( cur_child->write_data_fd );
803 close( cur_child->read_status_fd );
804 close( cur_child->write_status_fd );
806 prefork_child_free( cur_child );
811 prev_child = cur_child;
812 cur_child = cur_child->next;
814 if( cur_child->pid == pid ) {
815 prev_child->next = cur_child->next;
817 close( cur_child->read_data_fd );
818 close( cur_child->write_data_fd );
819 close( cur_child->read_status_fd );
820 close( cur_child->write_status_fd );
822 prefork_child_free( cur_child );
826 } while(cur_child != start_child);
832 static prefork_child* prefork_child_init(
833 int max_requests, int read_data_fd, int write_data_fd,
834 int read_status_fd, int write_status_fd ) {
836 prefork_child* child = (prefork_child*) safe_malloc(sizeof(prefork_child));
838 child->max_requests = max_requests;
839 child->read_data_fd = read_data_fd;
840 child->write_data_fd = write_data_fd;
841 child->read_status_fd = read_status_fd;
842 child->write_status_fd = write_status_fd;
843 child->min_children = 0;
844 child->available = 1;
845 child->appname = NULL;
846 child->keepalive = 0;
853 static int prefork_free( prefork_simple* prefork ) {
855 while( prefork->first_child != NULL ) {
856 osrfLogInfo( OSRF_LOG_MARK, "Killing children and sleeping 1 to reap..." );
861 client_free(prefork->connection);
862 free(prefork->appname);
867 static int prefork_child_free( prefork_child* child ) {
868 free(child->appname);
869 close(child->read_data_fd);
870 close(child->write_status_fd);