1 #include "osrf_prefork.h"
4 /* true if we just deleted a child. This will allow us to make sure we're
5 not trying to use freed memory */
9 void sigchld_handler( int sig );
/*
 * Sets up a preforking server for the application named by appname.
 *
 * Visible steps: read max_requests / min_children / max_children from the
 * host settings, bootstrap a client with the resource "<appname>_listener",
 * build a prefork_simple object with those limits and launch the initial
 * children via prefork_launch_children().
 *
 * NOTE(review): this listing has gaps (see the embedded original line
 * numbers). The declarations of maxr, minc and maxc, the condition that
 * guards the "failed to create prefork_simple object" fatal_handler, the
 * statement preceding the "prefork_run() retuned" warning and the return
 * statement are not visible here.
 */
11 int osrf_prefork_run(char* appname) {
13 if(!appname) fatal_handler("osrf_prefork_run requires an appname to run!");
19 info_handler("Loading config in osrf_forker for app %s", appname);
21 jsonObject* max_req = osrf_settings_host_value_object("/apps/%s/unix_config/max_requests", appname);
22 jsonObject* min_children = osrf_settings_host_value_object("/apps/%s/unix_config/min_children", appname);
23 jsonObject* max_children = osrf_settings_host_value_object("/apps/%s/unix_config/max_children", appname);
/* A missing setting only produces a warning. The fallbacks named in the
   messages (1000 / 3 / 10) are presumably the initial values of maxr, minc
   and maxc - their declarations are not visible here, TODO confirm. */
25 if(!max_req) warning_handler("Max requests not defined, assuming 1000");
26 else maxr = (int) jsonObjectGetNumber(max_req);
28 if(!min_children) warning_handler("Min children not defined, assuming 3");
29 else minc = (int) jsonObjectGetNumber(min_children);
31 if(!max_children) warning_handler("Max children not defined, assuming 10");
32 else maxc = (int) jsonObjectGetNumber(max_children);
/* The numbers have been copied out, so the settings objects are released.
   NOTE(review): any of these may be NULL at this point; presumably
   jsonObjectFree tolerates NULL - confirm. */
34 jsonObjectFree(max_req);
35 jsonObjectFree(min_children);
36 jsonObjectFree(max_children);
37 /* --------------------------------------------------- */
/* Resource name used by the parent (listener) process.
   NOTE(review): no release of resc is visible in this listing; if
   va_list_to_string allocates, confirm the string is freed. */
39 char* resc = va_list_to_string("%s_listener", appname);
41 if(!osrf_system_bootstrap_client_resc( NULL, NULL, resc ))
42 fatal_handler("Unable to bootstrap client for osrf_prefork_run()");
45 prefork_simple* forker = prefork_simple_init(
46 osrf_system_get_transport_client(), maxr, minc, maxc);
/* NOTE(review): forker is dereferenced here, before the "failed to create
   prefork_simple object" report that follows. If that report is guarded by
   a NULL check on forker, the check comes too late - confirm the order. */
48 forker->appname = strdup(appname);
51 fatal_handler("osrf_prefork_run() failed to create prefork_simple object");
53 prefork_launch_children(forker);
55 info_handler("Launching osrf_forker for app %s", appname);
58 warning_handler("prefork_run() retuned - how??");
/*
 * Placeholder for router registration. The only visible body line is an
 * unfinished, commented-out osrf_config_value() call, so the function
 * appears to do nothing yet.
 */
64 void osrf_prefork_register_routers() {
65 //char* router = osrf_config_value("//%s/
/*
 * Per-child initialisation; launch_child() calls this in the forked child
 * before it enters prefork_child_wait().
 *
 * Bootstraps a client for the child using the resource "<appname>_drone"
 * and reports a fatal error if that fails.
 *
 * NOTE(review): the fatal message names osrf_prefork_run() although it is
 * emitted from this hook. No release of resc is visible in this listing.
 */
68 void prefork_child_init_hook(prefork_child* child) {
71 info_handler("Child init hook for child %d", child->pid);
72 char* resc = va_list_to_string("%s_drone",child->appname);
73 if(!osrf_system_bootstrap_client_resc( NULL, NULL, resc))
74 fatal_handler("Unable to bootstrap client for osrf_prefork_run()");
/*
 * Handles one request inside a child process: data is the XML text received
 * from the parent. It is parsed into a transport_message and handed to
 * osrf_stack_transport_handler() together with the child's appname.
 *
 * NOTE(review): original lines 86-87 and everything after 103 are missing
 * from this listing, so it cannot be determined here whether the
 * message_init / client_send_message / printf section is live code or was
 * inside a disabled region. If it is live, note that strlen() yields a
 * size_t while the format uses %d, and that no release of msg is visible.
 */
78 void prefork_child_process_request(prefork_child* child, char* data) {
/* NOTE(review): as written this guard never returns for a non-NULL child
   (the left operand is false, so && short-circuits) and dereferences child
   when it is NULL. It probably was meant to be
   (!child || !child->connection) - confirm the intent. */
79 if(!child && child->connection) return;
81 /* construct the message from the xml */
82 debug_handler("Child %d received data from parent:\n%s\n", child->pid, data);
83 transport_message* msg = new_message_from_xml( data );
85 osrf_stack_transport_handler(msg, child->appname);
88 transport_message* ret_msg = message_init(
89 msg->body, msg->subject, msg->thread, msg->sender, NULL );
91 client_send_message(child->connection, ret_msg);
92 message_free( ret_msg );
94 printf("Message body size %d\n", strlen(msg->body));
96 printf( "Message Info\n" );
97 printf( "%s\n", msg->sender );
98 printf( "%s\n", msg->recipient );
99 printf( "%s\n", msg->thread );
100 printf( "%s\n", msg->body );
101 printf( "%s\n", msg->subject );
102 printf( "%s\n", msg->router_from );
103 printf( "%d\n", msg->broadcast );
/*
 * Allocates and fills a prefork_simple descriptor.
 *
 * client        transport client stored as the forker's connection
 * max_requests  stored as-is; launch_child() passes it on to each child
 * min_children  must not exceed max_children
 * max_children  must not exceed ABS_MAX_CHILDREN
 *
 * Both limit violations are reported through fatal_handler().
 *
 * NOTE(review): current_num_children and appname are not assigned in the
 * visible lines (presumably safe_malloc zero-fills - confirm), and the
 * return statement is not visible in this listing.
 */
110 prefork_simple* prefork_simple_init( transport_client* client,
111 int max_requests, int min_children, int max_children ) {
113 if( min_children > max_children )
114 fatal_handler( "min_children (%d) is greater "
115 "than max_children (%d)", min_children, max_children );
117 if( max_children > ABS_MAX_CHILDREN )
118 fatal_handler( "max_children (%d) is greater than ABS_MAX_CHILDREN (%d)",
119 max_children, ABS_MAX_CHILDREN );
121 /* flesh out the struct */
122 prefork_simple* prefork = (prefork_simple*) safe_malloc(sizeof(prefork_simple));
123 prefork->max_requests = max_requests;
124 prefork->min_children = min_children;
125 prefork->max_children = max_children;
/* The child list starts out empty. */
126 prefork->first_child = NULL;
127 prefork->connection = client;
/*
 * Creates one worker: two pipes (data and status), a prefork_child record
 * that is added to the forker's list, and then a fork().
 *
 * Parent side: installs sigchld_handler for SIGCHLD, increments
 * current_num_children and logs the new pid.
 * Child side: records its own pid, closes the write end of the data pipe
 * and the read end of the status pipe, runs prefork_child_init_hook(),
 * enters prefork_child_wait() and finally exits.
 *
 * NOTE(review): the declarations of data_fd, status_fd and pid, the
 * parent's return statement and the braces separating the two branches
 * are not visible in this listing.
 */
132 prefork_child* launch_child( prefork_simple* forker ) {
138 /* Set up the data pipes and add the child struct to the parent */
139 if( pipe(data_fd) < 0 ) /* build the data pipe*/
140 fatal_handler( "Pipe making error" );
142 if( pipe(status_fd) < 0 ) /* build the status pipe */
143 fatal_handler( "Pipe making error" );
145 debug_handler( "Pipes: %d %d %d %d", data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
/* Argument order matches prefork_child_init(): read end then write end of
   the data pipe, followed by read end then write end of the status pipe. */
146 prefork_child* child = prefork_child_init( forker->max_requests, data_fd[0],
147 data_fd[1], status_fd[0], status_fd[1] );
149 child->appname = strdup(forker->appname);
/* The record is linked into the list before fork(), so both processes hold
   a copy of it. */
152 add_prefork_child( forker, child );
154 if( (pid=fork()) < 0 ) fatal_handler( "Forking Error" );
156 if( pid > 0 ) { /* parent */
158 signal(SIGCHLD, sigchld_handler);
159 (forker->current_num_children)++;
162 info_handler( "Parent launched %d", pid );
163 /* *no* child pipe FD's can be closed or the parent will re-use fd's that
164 the children are currently using */
170 debug_handler("I am new child with read_data_fd = %d and write_status_fd = %d",
171 child->read_data_fd, child->write_status_fd );
173 child->pid = getpid();
/* The child only reads data and writes status, so the opposite ends are
   closed in its copy. */
174 close( child->write_data_fd );
175 close( child->read_status_fd );
178 prefork_child_init_hook(child);
180 prefork_child_wait( child );
181 exit(0); /* just to be sure */
/*
 * Launches children one at a time for as long as the counter c is below
 * forker->min_children.
 *
 * NOTE(review): the declaration and initial value of c are not visible in
 * this listing (presumably 0 - confirm).
 */
187 void prefork_launch_children( prefork_simple* forker ) {
190 while( c++ < forker->min_children )
191 launch_child( forker );
/*
 * SIGCHLD handler. The visible body re-installs itself as the handler for
 * SIGCHLD; sig is not used in the visible line.
 *
 * NOTE(review): original lines 197-200 are missing from this listing, so
 * any further work done by the handler cannot be seen here.
 */
195 void sigchld_handler( int sig ) {
196 signal(SIGCHLD, sigchld_handler);
/*
 * Collects exited children and tops the pool back up.
 *
 * First loop: waitpid() with WNOHANG is called repeatedly; every pid it
 * returns is removed from the forker's list with del_prefork_child().
 * Second loop: new children are launched until current_num_children is no
 * longer below min_children.
 *
 * NOTE(review): the declarations of child_pid and status are not visible
 * in this listing.
 */
201 void reap_children( prefork_simple* forker ) {
206 while( (child_pid=waitpid(-1,&status,WNOHANG)) > 0)
207 del_prefork_child( forker, child_pid );
210 while( forker->current_num_children < forker->min_children )
211 launch_child( forker );
/*
 * Parent-side listen loop: receives messages on forker->connection and
 * hands each one to a child through that child's data pipe.
 *
 * Per message, as far as the visible lines show:
 *   1. check_children() refreshes the "available" flags.
 *   2. The child list is scanned for an available child; the message XML
 *      (including its terminating NUL byte) is written to that child's
 *      write_data_fd and the child is marked busy.
 *   3. If the request was not honored and the pool is below max_children,
 *      a new child is launched and given the message.
 *   4. Otherwise the parent logs a warning and sleeps 50 ms.
 *   5. reap_children() runs.
 *
 * NOTE(review): this listing has many gaps (loop headers, braces, the
 * declarations of k and written, the places where honored is set, and the
 * statements following several conditions), so the exact nesting of the
 * statements below cannot be confirmed from here.
 */
216 void prefork_run(prefork_simple* forker) {
218 if( forker->first_child == NULL )
221 transport_message* cur_msg = NULL;
225 if( forker->first_child == NULL ) {/* no more children */
226 warning_handler("No more children..." );
230 debug_handler("Forker going into wait for data...");
/* NOTE(review): the -1 timeout presumably means "block until a message
   arrives" - confirm against client_recv. */
232 cur_msg = client_recv( forker->connection, -1 );
234 fprintf(stderr, "Got Data %f\n", get_timestamp_millis() );
236 if( cur_msg == NULL ) continue;
238 int honored = 0; /* true if we've serviced the request */
242 check_children( forker );
244 debug_handler( "Server received inbound data" );
246 prefork_child* cur_child = forker->first_child;
248 /* Look for an available child */
249 for( k = 0; k < forker->current_num_children; k++ ) {
251 debug_handler("Searching for available child. cur_child->pid = %d", cur_child->pid );
252 debug_handler("Current num children %d and loop %d", forker->current_num_children, k);
254 if( cur_child->available ) {
255 debug_handler( "sending data to %d", cur_child->pid );
256 message_prepare_xml( cur_msg );
257 char* data = cur_msg->msg_xml;
/* An empty or missing XML body abandons the search for this message. */
258 if( ! data || strlen(data) < 1 ) break;
259 cur_child->available = 0;
260 debug_handler( "Writing to child fd %d", cur_child->write_data_fd );
263 fprintf(stderr, "Writing Data %f\n", get_timestamp_millis() );
/* strlen(data) + 1 sends the terminating NUL along with the text. */
264 if( (written = write( cur_child->write_data_fd, data, strlen(data) + 1 )) < 0 ) {
265 warning_handler("Write returned error %d", errno);
266 cur_child = cur_child->next;
270 fprintf(stderr, "Wrote %d bytes to child\n", written);
/* The list head is moved past the child that was just used, so the next
   search starts with a different child. */
272 forker->first_child = cur_child->next;
276 cur_child = cur_child->next;
279 /* if none available, add a new child if we can */
281 debug_handler("Not enough children, attempting to add...");
282 if( forker->current_num_children < forker->max_children ) {
283 debug_handler( "Launching new child with current_num = %d",
284 forker->current_num_children );
286 prefork_child* new_child = launch_child( forker );
287 message_prepare_xml( cur_msg );
288 char* data = cur_msg->msg_xml;
289 if( ! data || strlen(data) < 1 ) break;
290 new_child->available = 0;
291 debug_handler( "sending data to %d", new_child->pid );
292 debug_handler( "Writing to new child fd %d", new_child->write_data_fd );
/* NOTE(review): unlike the write in the search loop above, the result of
   this write is not checked. */
293 write( new_child->write_data_fd, data, strlen(data) + 1 );
294 forker->first_child = new_child->next;
300 warning_handler( "No children available, sleeping and looping..." );
301 usleep( 50000 ); /* 50 milliseconds */
305 reap_children(forker);
308 fprintf(stderr, "Parent done with request %f\n", get_timestamp_millis() );
312 } /* top level listen loop */
/*
 * Polls the status pipes of all children and marks a child as available
 * again once it has written to its status pipe.
 *
 * Steps visible here: reap exited children, add every child's
 * read_status_fd to an fd_set while tracking the highest descriptor,
 * select() on that set, then walk the list again and, for each descriptor
 * that is set, read the pending status bytes and set available = 1.
 *
 * NOTE(review): the declarations and initialisation of read_set, max_fd,
 * tv, buf, i, j, n, select_ret and num_handled, as well as the statements
 * that follow the select() error and the select_ret == 0 test, are not
 * visible in this listing.
 */
317 void check_children( prefork_simple* forker ) {
332 reap_children(forker);
334 prefork_child* cur_child = forker->first_child;
337 for( i = 0; i!= forker->current_num_children; i++ ) {
339 if( cur_child->read_status_fd > max_fd )
340 max_fd = cur_child->read_status_fd;
341 FD_SET( cur_child->read_status_fd, &read_set );
342 cur_child = cur_child->next;
345 FD_CLR(0,&read_set);/* just to be sure */
347 if( (select_ret=select( max_fd + 1 , &read_set, NULL, NULL, &tv)) == -1 ) {
348 warning_handler( "Select returned error %d on check_children", errno );
351 if( select_ret == 0 )
354 /* see if one of the children has told us it's done */
355 cur_child = forker->first_child;
/* The walk stops early once num_handled reaches the number of ready
   descriptors reported by select(). */
358 for( j = 0; j!= forker->current_num_children && num_handled < select_ret ; j++ ) {
360 if( FD_ISSET( cur_child->read_status_fd, &read_set ) ) {
361 printf( "Server received status from a child %d\n", cur_child->pid );
362 debug_handler( "Server received status from a child %d", cur_child->pid );
366 /* now suck off the data */
/* NOTE(review): at most 63 bytes are read; buf is later printed with %s, so
   it presumably is a zero-filled 64-byte buffer - its declaration is not
   visible here, confirm. */
369 if( (n=read(cur_child->read_status_fd, buf, 63)) < 0 ) {
370 warning_handler("Read error afer select in child status read with errno %d", errno);
373 debug_handler( "Read %d bytes from status buffer: %s", n, buf );
374 cur_child->available = 1;
376 cur_child = cur_child->next;
/*
 * Child-side service loop: runs for up to child->max_requests iterations.
 *
 * Each iteration clears O_NONBLOCK on the data pipe, reads the request in
 * chunks of at most READ_BUFSIZE-1 bytes into a zeroed stack buffer,
 * appends the chunks to a growing_buffer, and passes the accumulated text
 * to prefork_child_process_request(). Afterwards, unless this was the last
 * allowed request, the 9 bytes "available" are written to the status pipe
 * so the parent can hand over more work.
 *
 * NOTE(review): the declarations of i and n, several braces, the statement
 * that re-enables non-blocking reads and the condition guarding the
 * "Child read returned error" warning are missing from this listing, so
 * the exact nesting below cannot be confirmed from here.
 */
382 void prefork_child_wait( prefork_child* child ) {
385 growing_buffer* gbuf = buffer_init( READ_BUFSIZE );
386 char buf[READ_BUFSIZE];
387 memset( buf, 0, READ_BUFSIZE );
389 for( i = 0; i!= child->max_requests; i++ ) {
392 clr_fl(child->read_data_fd, O_NONBLOCK );
/* Reading one byte less than the buffer size keeps the zero-filled buffer
   NUL-terminated, which buffer_add() presumably relies on. */
393 while( (n=read(child->read_data_fd, buf, READ_BUFSIZE-1)) > 0 ) {
394 buffer_add( gbuf, buf );
395 memset( buf, 0, READ_BUFSIZE );
397 fprintf(stderr, "Child read %d bytes\n", n);
/* NOTE(review): if n still holds the result of the read above, it can be at
   most READ_BUFSIZE-1, so this comparison could never be true; it may have
   been meant to compare against READ_BUFSIZE-1 - confirm. */
399 if( n == READ_BUFSIZE ) {
400 fprintf(stderr, "We read READ_BUFSIZE data....\n");
402 /* either we have exactly READ_BUFSIZE data,
403 or there's more waiting that we need to grab*/
404 /* must set to non-block for reading more */
406 fprintf(stderr, "Read Data %f\n", get_timestamp_millis() );
407 prefork_child_process_request(child, gbuf->buf);
408 buffer_reset( gbuf );
414 warning_handler( "Child read returned error with errno %d", errno );
/* No "available" notice is sent after the final request, since the child is
   about to leave the loop. */
418 if( i < child->max_requests - 1 )
419 write( child->write_status_fd, "available" /*less than 64 bytes*/, 9 );
424 debug_handler("Child exiting...[%d]", getpid() );
/*
 * Links child into the forker's circular list of children.
 *
 * Empty list: child becomes forker->first_child (the line that would make
 * it point at itself is presumably among the lines missing from this
 * listing - confirm).
 * Non-empty list: forker->first_child is advanced until it reaches the
 * element whose next pointer is the original start, and child is inserted
 * between that last element and the start. No reassignment of
 * forker->first_child is visible after the splice, so the head appears to
 * be left on the former last element.
 *
 * NOTE(review): in this listing the comment opened on original line 438
 * has lost its closing lines (440-441 are missing); the loop construct
 * around the walk and the early return of the empty-list case are not
 * visible either.
 */
430 void add_prefork_child( prefork_simple* forker, prefork_child* child ) {
432 if( forker->first_child == NULL ) {
433 forker->first_child = child;
438 /* we put the child in as the last because, regardless,
439 we have to do the DLL splice dance, and this is the
442 prefork_child* start_child = forker->first_child;
444 if( forker->first_child->next == start_child )
446 forker->first_child = forker->first_child->next;
449 /* here we know that forker->first_child is the last element
450 in the list and start_child is the first. Insert the
451 new child between them*/
453 forker->first_child->next = child;
454 child->next = start_child;
/*
 * Looks up the child record with the given pid in the circular list.
 * Returns NULL when the list is empty and the matching record when found.
 *
 * The walk advances forker->first_child itself, so a successful lookup
 * leaves the list head on the matching element; an unsuccessful one ends
 * when the head is back at its starting element.
 *
 * NOTE(review): the opening "do" and the return for the not-found case are
 * not visible in this listing (presumably NULL - confirm).
 */
458 prefork_child* find_prefork_child( prefork_simple* forker, pid_t pid ) {
460 if( forker->first_child == NULL ) { return NULL; }
461 prefork_child* start_child = forker->first_child;
463 if( forker->first_child->pid == pid )
464 return forker->first_child;
465 } while( (forker->first_child = forker->first_child->next) != start_child );
/*
 * Removes the child with the given pid from the forker's circular list,
 * closes its four pipe descriptors and frees the record.
 *
 * Three cases are handled: a single-element list, removal of the element
 * currently at the head of the list, and removal of any other element.
 *
 * NOTE(review): current_num_children is decremented before the list is
 * searched, so it also drops when no element matches pid.
 * NOTE(review): each case closes read_data_fd and write_status_fd and then
 * calls prefork_child_free(), whose visible lines close those same two
 * descriptors again - confirm whether the second close is intended.
 * NOTE(review): the returns that end each case, the "do" keywords and
 * several braces are missing from this listing.
 */
471 void del_prefork_child( prefork_simple* forker, pid_t pid ) {
473 if( forker->first_child == NULL ) { return; }
475 (forker->current_num_children)--;
476 debug_handler("Deleting Child: %d", pid );
478 prefork_child* start_child = forker->first_child; /* starting point */
479 prefork_child* cur_child = start_child; /* current pointer */
480 prefork_child* prev_child = start_child; /* the trailing pointer */
482 /* special case where there is only one in the list */
483 if( start_child == start_child->next ) {
484 if( start_child->pid == pid ) {
/* Removing the only element leaves the list empty. */
485 forker->first_child = NULL;
487 close( start_child->read_data_fd );
488 close( start_child->write_data_fd );
489 close( start_child->read_status_fd );
490 close( start_child->write_status_fd );
492 prefork_child_free( start_child );
498 /* special case where the first item in the list needs to be removed */
499 if( start_child->pid == pid ) {
501 /* find the last one so we can remove the start_child */
503 prev_child = cur_child;
504 cur_child = cur_child->next;
505 }while( cur_child != start_child );
507 /* now cur_child == start_child */
/* prev_child is the last element; unlink the head and make the last
   element the new head. */
508 prev_child->next = cur_child->next;
509 forker->first_child = prev_child;
511 close( cur_child->read_data_fd );
512 close( cur_child->write_data_fd );
513 close( cur_child->read_status_fd );
514 close( cur_child->write_status_fd );
516 prefork_child_free( cur_child );
/* General case: walk the list with a trailing pointer until the pid is
   found or the walk is back at the start. */
521 prev_child = cur_child;
522 cur_child = cur_child->next;
524 if( cur_child->pid == pid ) {
525 prev_child->next = cur_child->next;
527 close( cur_child->read_data_fd );
528 close( cur_child->write_data_fd );
529 close( cur_child->read_status_fd );
530 close( cur_child->write_status_fd );
532 prefork_child_free( cur_child );
536 } while(cur_child != start_child);
/*
 * Allocates a prefork_child record and stores the request limit and the
 * four pipe descriptors in it. A new child starts out marked available.
 *
 * NOTE(review): pid, next and appname are not assigned in the visible
 * lines (launch_child() sets appname and, in the child process, pid), and
 * the return statement is not visible in this listing.
 */
542 prefork_child* prefork_child_init(
543 int max_requests, int read_data_fd, int write_data_fd,
544 int read_status_fd, int write_status_fd ) {
546 prefork_child* child = (prefork_child*) safe_malloc(sizeof(prefork_child));
547 child->max_requests = max_requests;
548 child->read_data_fd = read_data_fd;
549 child->write_data_fd = write_data_fd;
550 child->read_status_fd = read_status_fd;
551 child->write_status_fd = write_status_fd;
552 child->available = 1;
/*
 * Tears down a prefork_simple object: loops for as long as children remain
 * in the list, then frees the transport connection and the appname string.
 *
 * NOTE(review): the statements inside the loop that actually signal, wait
 * for and reap the children (original lines 562-565), the release of the
 * prefork struct itself and the return value are not visible in this
 * listing.
 */
558 int prefork_free( prefork_simple* prefork ) {
560 while( prefork->first_child != NULL ) {
561 info_handler( "Killing children and sleeping 1 to reap..." );
566 client_free(prefork->connection);
567 free(prefork->appname);
/*
 * Releases the resources held by one child record. The visible lines free
 * the appname string and close the read end of the data pipe and the write
 * end of the status pipe.
 *
 * NOTE(review): the listing ends here, so the release of the record itself
 * and the return value are not visible. del_prefork_child() closes these
 * two descriptors before calling this function, so they are closed a
 * second time on that path - confirm whether that is intended.
 */
572 int prefork_child_free( prefork_child* child ) {
573 free(child->appname);
574 close(child->read_data_fd);
575 close(child->write_status_fd);