1 #include "osrf_prefork.h"
3 #include "osrf_app_session.h"
4 #include "osrf_application.h"
6 /* true if we just deleted a child. This will allow us to make sure we're
7 not trying to use freed memory */
11 void sigchld_handler( int sig );
13 int osrf_prefork_run(char* appname) {
16 osrfLogError( OSRF_LOG_MARK, "osrf_prefork_run requires an appname to run!");
20 set_proc_title( "OpenSRF Listener [%s]", appname );
26 osrfLogInfo( OSRF_LOG_MARK, "Loading config in osrf_forker for app %s", appname);
28 jsonObject* max_req = osrf_settings_host_value_object("/apps/%s/unix_config/max_requests", appname);
29 jsonObject* min_children = osrf_settings_host_value_object("/apps/%s/unix_config/min_children", appname);
30 jsonObject* max_children = osrf_settings_host_value_object("/apps/%s/unix_config/max_children", appname);
33 if(!max_req) osrfLogWarning( OSRF_LOG_MARK, "Max requests not defined, assuming 1000");
34 else maxr = (int) jsonObjectGetNumber(max_req);
36 if(!min_children) osrfLogWarning( OSRF_LOG_MARK, "Min children not defined, assuming 3");
37 else minc = (int) jsonObjectGetNumber(min_children);
39 if(!max_children) osrfLogWarning( OSRF_LOG_MARK, "Max children not defined, assuming 10");
40 else maxc = (int) jsonObjectGetNumber(max_children);
42 jsonObjectFree(max_req);
43 jsonObjectFree(min_children);
44 jsonObjectFree(max_children);
45 /* --------------------------------------------------- */
47 char* resc = va_list_to_string("%s_listener", appname);
49 if(!osrf_system_bootstrap_client_resc( NULL, NULL, resc )) {
50 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()");
57 prefork_simple* forker = prefork_simple_init(
58 osrfSystemGetTransportClient(), maxr, minc, maxc);
60 forker->appname = strdup(appname);
63 osrfLogError( OSRF_LOG_MARK, "osrf_prefork_run() failed to create prefork_simple object");
67 prefork_launch_children(forker);
69 osrf_prefork_register_routers(appname);
71 osrfLogInfo( OSRF_LOG_MARK, "Launching osrf_forker for app %s", appname);
74 osrfLogWarning( OSRF_LOG_MARK, "prefork_run() retuned - how??");
80 void osrf_prefork_register_routers( char* appname ) {
82 osrfStringArray* arr = osrfNewStringArray(4);
84 int c = osrfConfigGetValueList( NULL, arr, "/routers/router" );
85 char* routerName = osrfConfigGetValue( NULL, "/router_name" );
86 transport_client* client = osrfSystemGetTransportClient();
88 osrfLogInfo( OSRF_LOG_MARK, "router name is %s and we have %d routers to connect to", routerName, c );
91 char* domain = osrfStringArrayGetString(arr, --c);
94 char* jid = va_list_to_string( "%s@%s/router", routerName, domain );
95 osrfLogInfo( OSRF_LOG_MARK, "Registering with router %s", jid );
97 transport_message* msg = message_init("registering", NULL, NULL, jid, NULL );
98 message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
100 client_send_message( client, msg );
107 osrfStringArrayFree(arr);
110 void prefork_child_init_hook(prefork_child* child) {
113 osrfLogDebug( OSRF_LOG_MARK, "Child init hook for child %d", child->pid);
114 char* resc = va_list_to_string("%s_drone",child->appname);
116 /* we want to remove traces of our parents socket connection
117 * so we can have our own */
118 osrfSystemIgnoreTransportClient();
120 if(!osrf_system_bootstrap_client_resc( NULL, NULL, resc)) {
121 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()");
128 if( ! osrfAppRunChildInit(child->appname) ) {
129 osrfLogDebug(OSRF_LOG_MARK, "Prefork child_init succeeded\n");
131 osrfLogError(OSRF_LOG_MARK, "Prefork child_init failed\n");
134 set_proc_title( "OpenSRF Drone [%s]", child->appname );
137 void prefork_child_process_request(prefork_child* child, char* data) {
140 /* construct the message from the xml */
141 transport_message* msg = new_message_from_xml( data );
143 osrfAppSession* session = osrf_stack_transport_handler(msg, child->appname);
146 if( session->stateless && session->state != OSRF_SESSION_CONNECTED ) {
147 osrfAppSessionFree( session );
151 /* keepalive loop for stateful sessions */
153 osrfLogDebug( OSRF_LOG_MARK, "Entering keepalive loop for session %s", session->session_id );
157 prefork_simple* prefork_simple_init( transport_client* client,
158 int max_requests, int min_children, int max_children ) {
160 if( min_children > max_children ) {
161 osrfLogError( OSRF_LOG_MARK, "min_children (%d) is greater "
162 "than max_children (%d)", min_children, max_children );
166 if( max_children > ABS_MAX_CHILDREN ) {
167 osrfLogError( OSRF_LOG_MARK, "max_children (%d) is greater than ABS_MAX_CHILDREN (%d)",
168 max_children, ABS_MAX_CHILDREN );
172 osrfLogInfo(OSRF_LOG_MARK, "Prefork launching child with max_request=%d,"
173 "min_children=%d, max_children=%d", max_requests, min_children, max_children );
175 /* flesh out the struct */
176 prefork_simple* prefork = (prefork_simple*) safe_malloc(sizeof(prefork_simple));
177 prefork->max_requests = max_requests;
178 prefork->min_children = min_children;
179 prefork->max_children = max_children;
180 prefork->first_child = NULL;
181 prefork->connection = client;
186 prefork_child* launch_child( prefork_simple* forker ) {
192 /* Set up the data pipes and add the child struct to the parent */
193 if( pipe(data_fd) < 0 ) { /* build the data pipe*/
194 osrfLogError( OSRF_LOG_MARK, "Pipe making error" );
198 if( pipe(status_fd) < 0 ) {/* build the status pipe */
199 osrfLogError( OSRF_LOG_MARK, "Pipe making error" );
203 osrfLogInternal( OSRF_LOG_MARK, "Pipes: %d %d %d %d", data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
204 prefork_child* child = prefork_child_init( forker->max_requests, data_fd[0],
205 data_fd[1], status_fd[0], status_fd[1] );
207 child->appname = strdup(forker->appname);
210 add_prefork_child( forker, child );
212 if( (pid=fork()) < 0 ) {
213 osrfLogError( OSRF_LOG_MARK, "Forking Error" );
217 if( pid > 0 ) { /* parent */
219 signal(SIGCHLD, sigchld_handler);
220 (forker->current_num_children)++;
223 osrfLogDebug( OSRF_LOG_MARK, "Parent launched %d", pid );
224 /* *no* child pipe FD's can be closed or the parent will re-use fd's that
225 the children are currently using */
231 osrfLogInternal( OSRF_LOG_MARK, "I am new child with read_data_fd = %d and write_status_fd = %d",
232 child->read_data_fd, child->write_status_fd );
234 child->pid = getpid();
235 close( child->write_data_fd );
236 close( child->read_status_fd );
239 prefork_child_init_hook(child);
241 prefork_child_wait( child );
242 exit(0); /* just to be sure */
248 void prefork_launch_children( prefork_simple* forker ) {
251 while( c++ < forker->min_children )
252 launch_child( forker );
256 void sigchld_handler( int sig ) {
257 signal(SIGCHLD, sigchld_handler);
262 void reap_children( prefork_simple* forker ) {
267 while( (child_pid=waitpid(-1,&status,WNOHANG)) > 0)
268 del_prefork_child( forker, child_pid );
271 while( forker->current_num_children < forker->min_children )
272 launch_child( forker );
277 void prefork_run(prefork_simple* forker) {
279 if( forker->first_child == NULL )
282 transport_message* cur_msg = NULL;
287 if( forker->first_child == NULL ) {/* no more children */
288 osrfLogWarning( OSRF_LOG_MARK, "No more children..." );
292 osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data...");
293 cur_msg = client_recv( forker->connection, -1 );
295 //fprintf(stderr, "Got Data %f\n", get_timestamp_millis() );
297 if( cur_msg == NULL ) continue;
299 int honored = 0; /* true if we've serviced the request */
303 check_children( forker );
305 osrfLogDebug( OSRF_LOG_MARK, "Server received inbound data" );
307 prefork_child* cur_child = forker->first_child;
309 /* Look for an available child */
310 for( k = 0; k < forker->current_num_children; k++ ) {
312 osrfLogInternal( OSRF_LOG_MARK, "Searching for available child. cur_child->pid = %d", cur_child->pid );
313 osrfLogInternal( OSRF_LOG_MARK, "Current num children %d and loop %d", forker->current_num_children, k);
315 if( cur_child->available ) {
316 osrfLogDebug( OSRF_LOG_MARK, "forker sending data to %d", cur_child->pid );
318 message_prepare_xml( cur_msg );
319 char* data = cur_msg->msg_xml;
320 if( ! data || strlen(data) < 1 ) break;
322 cur_child->available = 0;
323 osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d", cur_child->write_data_fd );
326 //fprintf(stderr, "Writing Data %f\n", get_timestamp_millis() );
327 if( (written = write( cur_child->write_data_fd, data, strlen(data) + 1 )) < 0 ) {
328 osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d", errno);
329 cur_child = cur_child->next;
333 //fprintf(stderr, "Wrote %d bytes to child\n", written);
335 forker->first_child = cur_child->next;
339 cur_child = cur_child->next;
342 /* if none available, add a new child if we can */
344 osrfLogDebug( OSRF_LOG_MARK, "Not enough children, attempting to add...");
345 if( forker->current_num_children < forker->max_children ) {
346 osrfLogDebug( OSRF_LOG_MARK, "Launching new child with current_num = %d",
347 forker->current_num_children );
349 prefork_child* new_child = launch_child( forker );
350 message_prepare_xml( cur_msg );
351 char* data = cur_msg->msg_xml;
352 if( ! data || strlen(data) < 1 ) break;
353 new_child->available = 0;
354 osrfLogDebug( OSRF_LOG_MARK, "Writing to new child fd %d : pid %d",
355 new_child->write_data_fd, new_child->pid );
356 write( new_child->write_data_fd, data, strlen(data) + 1 );
357 forker->first_child = new_child->next;
363 osrfLogWarning( OSRF_LOG_MARK, "No children available, sleeping and looping..." );
364 usleep( 50000 ); /* 50 milliseconds */
368 reap_children(forker);
371 //fprintf(stderr, "Parent done with request %f\n", get_timestamp_millis() );
375 message_free( cur_msg );
377 } /* top level listen loop */
382 void check_children( prefork_simple* forker ) {
397 reap_children(forker);
399 prefork_child* cur_child = forker->first_child;
402 for( i = 0; i!= forker->current_num_children; i++ ) {
404 if( cur_child->read_status_fd > max_fd )
405 max_fd = cur_child->read_status_fd;
406 FD_SET( cur_child->read_status_fd, &read_set );
407 cur_child = cur_child->next;
410 FD_CLR(0,&read_set);/* just to be sure */
412 if( (select_ret=select( max_fd + 1 , &read_set, NULL, NULL, &tv)) == -1 ) {
413 osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children", errno );
416 if( select_ret == 0 )
419 /* see if one of a child has told us it's done */
420 cur_child = forker->first_child;
423 for( j = 0; j!= forker->current_num_children && num_handled < select_ret ; j++ ) {
425 if( FD_ISSET( cur_child->read_status_fd, &read_set ) ) {
426 //printf( "Server received status from a child %d\n", cur_child->pid );
427 osrfLogDebug( OSRF_LOG_MARK, "Server received status from a child %d", cur_child->pid );
431 /* now suck off the data */
434 if( (n=read(cur_child->read_status_fd, buf, 63)) < 0 ) {
435 osrfLogWarning( OSRF_LOG_MARK, "Read error afer select in child status read with errno %d", errno);
438 osrfLogDebug( OSRF_LOG_MARK, "Read %d bytes from status buffer: %s", n, buf );
439 cur_child->available = 1;
441 cur_child = cur_child->next;
447 void prefork_child_wait( prefork_child* child ) {
450 growing_buffer* gbuf = buffer_init( READ_BUFSIZE );
451 char buf[READ_BUFSIZE];
452 memset( buf, 0, READ_BUFSIZE );
454 for( i = 0; i < child->max_requests; i++ ) {
457 clr_fl(child->read_data_fd, O_NONBLOCK );
458 while( (n=read(child->read_data_fd, buf, READ_BUFSIZE-1)) > 0 ) {
459 buffer_add( gbuf, buf );
460 memset( buf, 0, READ_BUFSIZE );
462 //fprintf(stderr, "Child read %d bytes\n", n);
464 if( n == READ_BUFSIZE ) {
465 //fprintf(stderr, "We read READ_BUFSIZE data....\n");
467 /* either we have exactly READ_BUFSIZE data,
468 or there's more waiting that we need to grab*/
469 /* must set to non-block for reading more */
471 //fprintf(stderr, "Read Data %f\n", get_timestamp_millis() );
472 prefork_child_process_request(child, gbuf->buf);
473 buffer_reset( gbuf );
479 osrfLogWarning( OSRF_LOG_MARK, "Prefork child read returned error with errno %d", errno );
483 if( i < child->max_requests - 1 )
484 write( child->write_status_fd, "available" /*less than 64 bytes*/, 9 );
489 osrfLogDebug( OSRF_LOG_MARK, "Child with max-requests=%d, num-served=%d exiting...[%d]",
490 child->max_requests, i, getpid() );
496 void add_prefork_child( prefork_simple* forker, prefork_child* child ) {
498 if( forker->first_child == NULL ) {
499 forker->first_child = child;
504 /* we put the child in as the last because, regardless,
505 we have to do the DLL splice dance, and this is the
508 prefork_child* start_child = forker->first_child;
510 if( forker->first_child->next == start_child )
512 forker->first_child = forker->first_child->next;
515 /* here we know that forker->first_child is the last element
516 in the list and start_child is the first. Insert the
517 new child between them*/
519 forker->first_child->next = child;
520 child->next = start_child;
524 prefork_child* find_prefork_child( prefork_simple* forker, pid_t pid ) {
526 if( forker->first_child == NULL ) { return NULL; }
527 prefork_child* start_child = forker->first_child;
529 if( forker->first_child->pid == pid )
530 return forker->first_child;
531 } while( (forker->first_child = forker->first_child->next) != start_child );
537 void del_prefork_child( prefork_simple* forker, pid_t pid ) {
539 if( forker->first_child == NULL ) { return; }
541 (forker->current_num_children)--;
542 osrfLogDebug( OSRF_LOG_MARK, "Deleting Child: %d", pid );
544 prefork_child* start_child = forker->first_child; /* starting point */
545 prefork_child* cur_child = start_child; /* current pointer */
546 prefork_child* prev_child = start_child; /* the trailing pointer */
548 /* special case where there is only one in the list */
549 if( start_child == start_child->next ) {
550 if( start_child->pid == pid ) {
551 forker->first_child = NULL;
553 close( start_child->read_data_fd );
554 close( start_child->write_data_fd );
555 close( start_child->read_status_fd );
556 close( start_child->write_status_fd );
558 prefork_child_free( start_child );
564 /* special case where the first item in the list needs to be removed */
565 if( start_child->pid == pid ) {
567 /* find the last one so we can remove the start_child */
569 prev_child = cur_child;
570 cur_child = cur_child->next;
571 }while( cur_child != start_child );
573 /* now cur_child == start_child */
574 prev_child->next = cur_child->next;
575 forker->first_child = prev_child;
577 close( cur_child->read_data_fd );
578 close( cur_child->write_data_fd );
579 close( cur_child->read_status_fd );
580 close( cur_child->write_status_fd );
582 prefork_child_free( cur_child );
587 prev_child = cur_child;
588 cur_child = cur_child->next;
590 if( cur_child->pid == pid ) {
591 prev_child->next = cur_child->next;
593 close( cur_child->read_data_fd );
594 close( cur_child->write_data_fd );
595 close( cur_child->read_status_fd );
596 close( cur_child->write_status_fd );
598 prefork_child_free( cur_child );
602 } while(cur_child != start_child);
608 prefork_child* prefork_child_init(
609 int max_requests, int read_data_fd, int write_data_fd,
610 int read_status_fd, int write_status_fd ) {
612 prefork_child* child = (prefork_child*) safe_malloc(sizeof(prefork_child));
613 child->max_requests = max_requests;
614 child->read_data_fd = read_data_fd;
615 child->write_data_fd = write_data_fd;
616 child->read_status_fd = read_status_fd;
617 child->write_status_fd = write_status_fd;
618 child->available = 1;
624 int prefork_free( prefork_simple* prefork ) {
626 while( prefork->first_child != NULL ) {
627 osrfLogInfo( OSRF_LOG_MARK, "Killing children and sleeping 1 to reap..." );
632 client_free(prefork->connection);
633 free(prefork->appname);
638 int prefork_child_free( prefork_child* child ) {
639 free(child->appname);
640 close(child->read_data_fd);
641 close(child->write_status_fd);