1 #include "osrf_prefork.h"
3 #include "osrf_app_session.h"
5 /* true if we just deleted a child. This will allow us to make sure we're
6 not trying to use freed memory */
10 void sigchld_handler( int sig );
12 int osrf_prefork_run(char* appname) {
15 osrfLogError("osrf_prefork_run requires an appname to run!");
19 set_proc_title( "OpenSRF Listener [%s]", appname );
25 osrfLogInfo("Loading config in osrf_forker for app %s", appname);
27 jsonObject* max_req = osrf_settings_host_value_object("/apps/%s/unix_config/max_requests", appname);
28 jsonObject* min_children = osrf_settings_host_value_object("/apps/%s/unix_config/min_children", appname);
29 jsonObject* max_children = osrf_settings_host_value_object("/apps/%s/unix_config/max_children", appname);
32 if(!max_req) osrfLogWarning("Max requests not defined, assuming 1000");
33 else maxr = (int) jsonObjectGetNumber(max_req);
35 if(!min_children) osrfLogWarning("Min children not defined, assuming 3");
36 else minc = (int) jsonObjectGetNumber(min_children);
38 if(!max_children) osrfLogWarning("Max children not defined, assuming 10");
39 else maxc = (int) jsonObjectGetNumber(max_children);
41 jsonObjectFree(max_req);
42 jsonObjectFree(min_children);
43 jsonObjectFree(max_children);
44 /* --------------------------------------------------- */
46 char* resc = va_list_to_string("%s_listener", appname);
48 if(!osrf_system_bootstrap_client_resc( NULL, NULL, resc )) {
49 osrfLogError("Unable to bootstrap client for osrf_prefork_run()");
56 prefork_simple* forker = prefork_simple_init(
57 osrfSystemGetTransportClient(), maxr, minc, maxc);
59 forker->appname = strdup(appname);
62 osrfLogError("osrf_prefork_run() failed to create prefork_simple object");
66 prefork_launch_children(forker);
68 osrf_prefork_register_routers(appname);
70 osrfLogInfo("Launching osrf_forker for app %s", appname);
73 osrfLogWarning("prefork_run() retuned - how??");
79 void osrf_prefork_register_routers( char* appname ) {
81 osrfStringArray* arr = osrfNewStringArray(4);
83 int c = osrfConfigGetValueList( NULL, arr, "/routers/router" );
84 char* routerName = osrfConfigGetValue( NULL, "/router_name" );
85 transport_client* client = osrfSystemGetTransportClient();
87 osrfLogInfo("router name is %s and we have %d routers to connect to", routerName, c );
90 char* domain = osrfStringArrayGetString(arr, --c);
93 char* jid = va_list_to_string( "%s@%s/router", routerName, domain );
94 osrfLogInfo("Registering with router %s", jid );
96 transport_message* msg = message_init("registering", NULL, NULL, jid, NULL );
97 message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
99 client_send_message( client, msg );
106 osrfStringArrayFree(arr);
109 void prefork_child_init_hook(prefork_child* child) {
112 osrfLogInfo("Child init hook for child %d", child->pid);
113 char* resc = va_list_to_string("%s_drone",child->appname);
115 if(!osrf_system_bootstrap_client_resc( NULL, NULL, resc)) {
116 osrfLogError("Unable to bootstrap client for osrf_prefork_run()");
123 set_proc_title( "OpenSRF Drone [%s]", child->appname );
126 void prefork_child_process_request(prefork_child* child, char* data) {
129 /* construct the message from the xml */
130 transport_message* msg = new_message_from_xml( data );
132 osrfAppSession* session = osrf_stack_transport_handler(msg, child->appname);
135 if( session->stateless && session->state != OSRF_SESSION_CONNECTED ) {
136 osrfAppSessionFree( session );
140 /* keepalive loop for stateful sessions */
142 osrfLogDebug("Entering keepalive loop for session %s", session->session_id );
146 prefork_simple* prefork_simple_init( transport_client* client,
147 int max_requests, int min_children, int max_children ) {
149 if( min_children > max_children ) {
150 osrfLogError( "min_children (%d) is greater "
151 "than max_children (%d)", min_children, max_children );
155 if( max_children > ABS_MAX_CHILDREN ) {
156 osrfLogError( "max_children (%d) is greater than ABS_MAX_CHILDREN (%d)",
157 max_children, ABS_MAX_CHILDREN );
161 /* flesh out the struct */
162 prefork_simple* prefork = (prefork_simple*) safe_malloc(sizeof(prefork_simple));
163 prefork->max_requests = max_requests;
164 prefork->min_children = min_children;
165 prefork->max_children = max_children;
166 prefork->first_child = NULL;
167 prefork->connection = client;
172 prefork_child* launch_child( prefork_simple* forker ) {
178 /* Set up the data pipes and add the child struct to the parent */
179 if( pipe(data_fd) < 0 ) { /* build the data pipe*/
180 osrfLogError( "Pipe making error" );
184 if( pipe(status_fd) < 0 ) {/* build the status pipe */
185 osrfLogError( "Pipe making error" );
189 osrfLogDebug( "Pipes: %d %d %d %d", data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
190 prefork_child* child = prefork_child_init( forker->max_requests, data_fd[0],
191 data_fd[1], status_fd[0], status_fd[1] );
193 child->appname = strdup(forker->appname);
196 add_prefork_child( forker, child );
198 if( (pid=fork()) < 0 ) {
199 osrfLogError( "Forking Error" );
203 if( pid > 0 ) { /* parent */
205 signal(SIGCHLD, sigchld_handler);
206 (forker->current_num_children)++;
209 osrfLogInfo( "Parent launched %d", pid );
210 /* *no* child pipe FD's can be closed or the parent will re-use fd's that
211 the children are currently using */
217 osrfLogDebug("I am new child with read_data_fd = %d and write_status_fd = %d",
218 child->read_data_fd, child->write_status_fd );
220 child->pid = getpid();
221 close( child->write_data_fd );
222 close( child->read_status_fd );
225 prefork_child_init_hook(child);
227 prefork_child_wait( child );
228 exit(0); /* just to be sure */
234 void prefork_launch_children( prefork_simple* forker ) {
237 while( c++ < forker->min_children )
238 launch_child( forker );
242 void sigchld_handler( int sig ) {
243 signal(SIGCHLD, sigchld_handler);
248 void reap_children( prefork_simple* forker ) {
253 while( (child_pid=waitpid(-1,&status,WNOHANG)) > 0)
254 del_prefork_child( forker, child_pid );
257 while( forker->current_num_children < forker->min_children )
258 launch_child( forker );
263 void prefork_run(prefork_simple* forker) {
265 if( forker->first_child == NULL )
268 transport_message* cur_msg = NULL;
273 if( forker->first_child == NULL ) {/* no more children */
274 osrfLogWarning("No more children..." );
278 osrfLogDebug("Forker going into wait for data...");
279 cur_msg = client_recv( forker->connection, -1 );
281 //fprintf(stderr, "Got Data %f\n", get_timestamp_millis() );
283 if( cur_msg == NULL ) continue;
285 int honored = 0; /* true if we've serviced the request */
289 check_children( forker );
291 osrfLogDebug( "Server received inbound data" );
293 prefork_child* cur_child = forker->first_child;
295 /* Look for an available child */
296 for( k = 0; k < forker->current_num_children; k++ ) {
298 osrfLogDebug("Searching for available child. cur_child->pid = %d", cur_child->pid );
299 osrfLogDebug("Current num children %d and loop %d", forker->current_num_children, k);
301 if( cur_child->available ) {
302 osrfLogDebug( "sending data to %d", cur_child->pid );
304 message_prepare_xml( cur_msg );
305 char* data = cur_msg->msg_xml;
306 if( ! data || strlen(data) < 1 ) break;
308 cur_child->available = 0;
309 osrfLogDebug( "Writing to child fd %d", cur_child->write_data_fd );
312 //fprintf(stderr, "Writing Data %f\n", get_timestamp_millis() );
313 if( (written = write( cur_child->write_data_fd, data, strlen(data) + 1 )) < 0 ) {
314 osrfLogWarning("Write returned error %d", errno);
315 cur_child = cur_child->next;
319 //fprintf(stderr, "Wrote %d bytes to child\n", written);
321 forker->first_child = cur_child->next;
325 cur_child = cur_child->next;
328 /* if none available, add a new child if we can */
330 osrfLogDebug("Not enough children, attempting to add...");
331 if( forker->current_num_children < forker->max_children ) {
332 osrfLogDebug( "Launching new child with current_num = %d",
333 forker->current_num_children );
335 prefork_child* new_child = launch_child( forker );
336 message_prepare_xml( cur_msg );
337 char* data = cur_msg->msg_xml;
338 if( ! data || strlen(data) < 1 ) break;
339 new_child->available = 0;
340 osrfLogDebug( "sending data to %d", new_child->pid );
341 osrfLogDebug( "Writing to new child fd %d", new_child->write_data_fd );
342 write( new_child->write_data_fd, data, strlen(data) + 1 );
343 forker->first_child = new_child->next;
349 osrfLogWarning( "No children available, sleeping and looping..." );
350 usleep( 50000 ); /* 50 milliseconds */
354 reap_children(forker);
357 //fprintf(stderr, "Parent done with request %f\n", get_timestamp_millis() );
361 message_free( cur_msg );
363 } /* top level listen loop */
368 void check_children( prefork_simple* forker ) {
383 reap_children(forker);
385 prefork_child* cur_child = forker->first_child;
388 for( i = 0; i!= forker->current_num_children; i++ ) {
390 if( cur_child->read_status_fd > max_fd )
391 max_fd = cur_child->read_status_fd;
392 FD_SET( cur_child->read_status_fd, &read_set );
393 cur_child = cur_child->next;
396 FD_CLR(0,&read_set);/* just to be sure */
398 if( (select_ret=select( max_fd + 1 , &read_set, NULL, NULL, &tv)) == -1 ) {
399 osrfLogWarning( "Select returned error %d on check_children", errno );
402 if( select_ret == 0 )
405 /* see if one of a child has told us it's done */
406 cur_child = forker->first_child;
409 for( j = 0; j!= forker->current_num_children && num_handled < select_ret ; j++ ) {
411 if( FD_ISSET( cur_child->read_status_fd, &read_set ) ) {
412 //printf( "Server received status from a child %d\n", cur_child->pid );
413 osrfLogDebug( "Server received status from a child %d", cur_child->pid );
417 /* now suck off the data */
420 if( (n=read(cur_child->read_status_fd, buf, 63)) < 0 ) {
421 osrfLogWarning("Read error afer select in child status read with errno %d", errno);
424 osrfLogDebug( "Read %d bytes from status buffer: %s", n, buf );
425 cur_child->available = 1;
427 cur_child = cur_child->next;
433 void prefork_child_wait( prefork_child* child ) {
436 growing_buffer* gbuf = buffer_init( READ_BUFSIZE );
437 char buf[READ_BUFSIZE];
438 memset( buf, 0, READ_BUFSIZE );
440 for( i = 0; i!= child->max_requests; i++ ) {
443 clr_fl(child->read_data_fd, O_NONBLOCK );
444 while( (n=read(child->read_data_fd, buf, READ_BUFSIZE-1)) > 0 ) {
445 buffer_add( gbuf, buf );
446 memset( buf, 0, READ_BUFSIZE );
448 //fprintf(stderr, "Child read %d bytes\n", n);
450 if( n == READ_BUFSIZE ) {
451 //fprintf(stderr, "We read READ_BUFSIZE data....\n");
453 /* either we have exactly READ_BUFSIZE data,
454 or there's more waiting that we need to grab*/
455 /* must set to non-block for reading more */
457 //fprintf(stderr, "Read Data %f\n", get_timestamp_millis() );
458 prefork_child_process_request(child, gbuf->buf);
459 buffer_reset( gbuf );
465 osrfLogWarning( "Child read returned error with errno %d", errno );
469 if( i < child->max_requests - 1 )
470 write( child->write_status_fd, "available" /*less than 64 bytes*/, 9 );
475 osrfLogDebug("Child exiting...[%d]", getpid() );
481 void add_prefork_child( prefork_simple* forker, prefork_child* child ) {
483 if( forker->first_child == NULL ) {
484 forker->first_child = child;
489 /* we put the child in as the last because, regardless,
490 we have to do the DLL splice dance, and this is the
493 prefork_child* start_child = forker->first_child;
495 if( forker->first_child->next == start_child )
497 forker->first_child = forker->first_child->next;
500 /* here we know that forker->first_child is the last element
501 in the list and start_child is the first. Insert the
502 new child between them*/
504 forker->first_child->next = child;
505 child->next = start_child;
509 prefork_child* find_prefork_child( prefork_simple* forker, pid_t pid ) {
511 if( forker->first_child == NULL ) { return NULL; }
512 prefork_child* start_child = forker->first_child;
514 if( forker->first_child->pid == pid )
515 return forker->first_child;
516 } while( (forker->first_child = forker->first_child->next) != start_child );
522 void del_prefork_child( prefork_simple* forker, pid_t pid ) {
524 if( forker->first_child == NULL ) { return; }
526 (forker->current_num_children)--;
527 osrfLogDebug("Deleting Child: %d", pid );
529 prefork_child* start_child = forker->first_child; /* starting point */
530 prefork_child* cur_child = start_child; /* current pointer */
531 prefork_child* prev_child = start_child; /* the trailing pointer */
533 /* special case where there is only one in the list */
534 if( start_child == start_child->next ) {
535 if( start_child->pid == pid ) {
536 forker->first_child = NULL;
538 close( start_child->read_data_fd );
539 close( start_child->write_data_fd );
540 close( start_child->read_status_fd );
541 close( start_child->write_status_fd );
543 prefork_child_free( start_child );
549 /* special case where the first item in the list needs to be removed */
550 if( start_child->pid == pid ) {
552 /* find the last one so we can remove the start_child */
554 prev_child = cur_child;
555 cur_child = cur_child->next;
556 }while( cur_child != start_child );
558 /* now cur_child == start_child */
559 prev_child->next = cur_child->next;
560 forker->first_child = prev_child;
562 close( cur_child->read_data_fd );
563 close( cur_child->write_data_fd );
564 close( cur_child->read_status_fd );
565 close( cur_child->write_status_fd );
567 prefork_child_free( cur_child );
572 prev_child = cur_child;
573 cur_child = cur_child->next;
575 if( cur_child->pid == pid ) {
576 prev_child->next = cur_child->next;
578 close( cur_child->read_data_fd );
579 close( cur_child->write_data_fd );
580 close( cur_child->read_status_fd );
581 close( cur_child->write_status_fd );
583 prefork_child_free( cur_child );
587 } while(cur_child != start_child);
593 prefork_child* prefork_child_init(
594 int max_requests, int read_data_fd, int write_data_fd,
595 int read_status_fd, int write_status_fd ) {
597 prefork_child* child = (prefork_child*) safe_malloc(sizeof(prefork_child));
598 child->max_requests = max_requests;
599 child->read_data_fd = read_data_fd;
600 child->write_data_fd = write_data_fd;
601 child->read_status_fd = read_status_fd;
602 child->write_status_fd = write_status_fd;
603 child->available = 1;
609 int prefork_free( prefork_simple* prefork ) {
611 while( prefork->first_child != NULL ) {
612 osrfLogInfo( "Killing children and sleeping 1 to reap..." );
617 client_free(prefork->connection);
618 free(prefork->appname);
623 int prefork_child_free( prefork_child* child ) {
624 free(child->appname);
625 close(child->read_data_fd);
626 close(child->write_status_fd);