1 #include <opensrf/osrf_prefork.h>
2 #include <opensrf/osrf_app_session.h>
3 #include <opensrf/osrf_application.h>
6 /* true if we just deleted a child. This will allow us to make sure we're
7 not trying to use freed memory */
11 void sigchld_handler( int sig );
13 int osrf_prefork_run(char* appname) {
16 osrfLogError( OSRF_LOG_MARK, "osrf_prefork_run requires an appname to run!");
20 set_proc_title( "OpenSRF Listener [%s]", appname );
26 osrfLogInfo( OSRF_LOG_MARK, "Loading config in osrf_forker for app %s", appname);
28 jsonObject* max_req = osrf_settings_host_value_object("/apps/%s/unix_config/max_requests", appname);
29 jsonObject* min_children = osrf_settings_host_value_object("/apps/%s/unix_config/min_children", appname);
30 jsonObject* max_children = osrf_settings_host_value_object("/apps/%s/unix_config/max_children", appname);
32 char* keepalive = osrf_settings_host_value("/apps/%s/keepalive", appname);
35 kalive = atoi(keepalive);
38 kalive = 5; /* give it a default */
41 osrfLogInfo(OSRF_LOG_MARK, "keepalive setting = %d seconds", kalive);
45 if(!max_req) osrfLogWarning( OSRF_LOG_MARK, "Max requests not defined, assuming 1000");
46 else maxr = (int) jsonObjectGetNumber(max_req);
48 if(!min_children) osrfLogWarning( OSRF_LOG_MARK, "Min children not defined, assuming 3");
49 else minc = (int) jsonObjectGetNumber(min_children);
51 if(!max_children) osrfLogWarning( OSRF_LOG_MARK, "Max children not defined, assuming 10");
52 else maxc = (int) jsonObjectGetNumber(max_children);
54 jsonObjectFree(max_req);
55 jsonObjectFree(min_children);
56 jsonObjectFree(max_children);
57 /* --------------------------------------------------- */
59 char* resc = va_list_to_string("%s_listener", appname);
61 if(!osrf_system_bootstrap_client_resc( NULL, NULL, resc )) {
62 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()");
69 prefork_simple* forker = prefork_simple_init(
70 osrfSystemGetTransportClient(), maxr, minc, maxc);
73 osrfLogError( OSRF_LOG_MARK, "osrf_prefork_run() failed to create prefork_simple object");
77 forker->appname = strdup(appname);
78 forker->keepalive = kalive;
80 prefork_launch_children(forker);
82 osrf_prefork_register_routers(appname);
84 osrfLogInfo( OSRF_LOG_MARK, "Launching osrf_forker for app %s", appname);
87 osrfLogWarning( OSRF_LOG_MARK, "prefork_run() retuned - how??");
93 void osrf_prefork_register_routers( char* appname ) {
95 osrfStringArray* arr = osrfNewStringArray(4);
97 int c = osrfConfigGetValueList( NULL, arr, "/routers/router" );
98 char* routerName = osrfConfigGetValue( NULL, "/router_name" );
99 transport_client* client = osrfSystemGetTransportClient();
101 osrfLogInfo( OSRF_LOG_MARK, "router name is %s and we have %d routers to connect to", routerName, c );
104 char* domain = osrfStringArrayGetString(arr, --c);
107 char* jid = va_list_to_string( "%s@%s/router", routerName, domain );
108 osrfLogInfo( OSRF_LOG_MARK, "Registering with router %s", jid );
110 transport_message* msg = message_init("registering", NULL, NULL, jid, NULL );
111 message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
113 client_send_message( client, msg );
120 osrfStringArrayFree(arr);
123 int prefork_child_init_hook(prefork_child* child) {
125 if(!child) return -1;
126 osrfLogDebug( OSRF_LOG_MARK, "Child init hook for child %d", child->pid);
127 char* resc = va_list_to_string("%s_drone",child->appname);
129 /* if we're a source-client, tell the logger now that we're a new process*/
130 char* isclient = osrfConfigGetValue(NULL, "/client");
131 if( isclient && !strcasecmp(isclient,"true") )
132 osrfLogSetIsClient(1);
136 /* we want to remove traces of our parents socket connection
137 * so we can have our own */
138 osrfSystemIgnoreTransportClient();
140 if(!osrf_system_bootstrap_client_resc( NULL, NULL, resc)) {
141 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()");
148 if( ! osrfAppRunChildInit(child->appname) ) {
149 osrfLogDebug(OSRF_LOG_MARK, "Prefork child_init succeeded\n");
151 osrfLogError(OSRF_LOG_MARK, "Prefork child_init failed\n");
155 set_proc_title( "OpenSRF Drone [%s]", child->appname );
159 void prefork_child_process_request(prefork_child* child, char* data) {
162 transport_client* client = osrfSystemGetTransportClient();
164 if(!client_connected(client)) {
165 osrfSystemIgnoreTransportClient();
166 osrfLogWarning(OSRF_LOG_MARK, "Reconnecting child to opensrf after disconnect...");
167 if(!osrf_system_bootstrap_client(NULL, NULL)) {
168 osrfLogError( OSRF_LOG_MARK,
169 "Unable to bootstrap client in prefork_child_process_request()");
171 osrf_prefork_child_exit(child);
175 /* construct the message from the xml */
176 transport_message* msg = new_message_from_xml( data );
178 osrfAppSession* session = osrf_stack_transport_handler(msg, child->appname);
181 if( session->stateless && session->state != OSRF_SESSION_CONNECTED ) {
182 osrfAppSessionFree( session );
186 osrfLogDebug( OSRF_LOG_MARK, "Entering keepalive loop for session %s", session->session_id );
187 int keepalive = child->keepalive;
195 osrfLogDebug(OSRF_LOG_MARK,
196 "osrf_prefork calling queue_wait [%d] in keepalive loop", keepalive);
198 retval = osrf_app_session_queue_wait(session, keepalive, &recvd);
201 osrfLogDebug(OSRF_LOG_MARK, "Data received == %d", recvd);
204 osrfLogError(OSRF_LOG_MARK, "queue-wait returned non-success %d", retval);
208 /* see if the client disconnected from us */
209 if(session->state != OSRF_SESSION_CONNECTED) break;
211 /* if no data was reveived within the timeout interval */
212 if( !recvd && (end - start) >= keepalive ) {
213 osrfLogInfo(OSRF_LOG_MARK, "No request was reveived in %d seconds, exiting stateful session", keepalive);
214 osrfAppSessionStatus(
218 0, "Disconnected on timeout" );
224 osrfLogDebug( OSRF_LOG_MARK, "Exiting keepalive loop for session %s", session->session_id );
225 osrfAppSessionFree( session );
230 prefork_simple* prefork_simple_init( transport_client* client,
231 int max_requests, int min_children, int max_children ) {
233 if( min_children > max_children ) {
234 osrfLogError( OSRF_LOG_MARK, "min_children (%d) is greater "
235 "than max_children (%d)", min_children, max_children );
239 if( max_children > ABS_MAX_CHILDREN ) {
240 osrfLogError( OSRF_LOG_MARK, "max_children (%d) is greater than ABS_MAX_CHILDREN (%d)",
241 max_children, ABS_MAX_CHILDREN );
245 osrfLogInfo(OSRF_LOG_MARK, "Prefork launching child with max_request=%d,"
246 "min_children=%d, max_children=%d", max_requests, min_children, max_children );
248 /* flesh out the struct */
249 prefork_simple* prefork = (prefork_simple*) safe_malloc(sizeof(prefork_simple));
250 prefork->max_requests = max_requests;
251 prefork->min_children = min_children;
252 prefork->max_children = max_children;
253 prefork->first_child = NULL;
254 prefork->connection = client;
259 prefork_child* launch_child( prefork_simple* forker ) {
265 /* Set up the data pipes and add the child struct to the parent */
266 if( pipe(data_fd) < 0 ) { /* build the data pipe*/
267 osrfLogError( OSRF_LOG_MARK, "Pipe making error" );
271 if( pipe(status_fd) < 0 ) {/* build the status pipe */
272 osrfLogError( OSRF_LOG_MARK, "Pipe making error" );
276 osrfLogInternal( OSRF_LOG_MARK, "Pipes: %d %d %d %d", data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
277 prefork_child* child = prefork_child_init( forker->max_requests, data_fd[0],
278 data_fd[1], status_fd[0], status_fd[1] );
280 child->appname = strdup(forker->appname);
281 child->keepalive = forker->keepalive;
284 add_prefork_child( forker, child );
286 if( (pid=fork()) < 0 ) {
287 osrfLogError( OSRF_LOG_MARK, "Forking Error" );
291 if( pid > 0 ) { /* parent */
293 signal(SIGCHLD, sigchld_handler);
294 (forker->current_num_children)++;
297 osrfLogDebug( OSRF_LOG_MARK, "Parent launched %d", pid );
298 /* *no* child pipe FD's can be closed or the parent will re-use fd's that
299 the children are currently using */
305 osrfLogInternal( OSRF_LOG_MARK, "I am new child with read_data_fd = %d and write_status_fd = %d",
306 child->read_data_fd, child->write_status_fd );
308 child->pid = getpid();
309 close( child->write_data_fd );
310 close( child->read_status_fd );
313 if( prefork_child_init_hook(child) == -1 ) {
314 osrfLogError(OSRF_LOG_MARK,
315 "Forker child going away because we could not connect to OpenSRF...");
316 osrf_prefork_child_exit(child);
319 prefork_child_wait( child );
320 osrf_prefork_child_exit(child); /* just to be sure */
325 void osrf_prefork_child_exit(prefork_child* child) {
326 osrfAppRunExitCode();
330 void prefork_launch_children( prefork_simple* forker ) {
333 while( c++ < forker->min_children )
334 launch_child( forker );
338 void sigchld_handler( int sig ) {
339 signal(SIGCHLD, sigchld_handler);
344 void reap_children( prefork_simple* forker ) {
349 while( (child_pid=waitpid(-1,&status,WNOHANG)) > 0)
350 del_prefork_child( forker, child_pid );
353 while( forker->current_num_children < forker->min_children )
354 launch_child( forker );
359 void prefork_run(prefork_simple* forker) {
361 if( forker->first_child == NULL )
364 transport_message* cur_msg = NULL;
369 if( forker->first_child == NULL ) {/* no more children */
370 osrfLogWarning( OSRF_LOG_MARK, "No more children..." );
374 osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data...");
375 cur_msg = client_recv( forker->connection, -1 );
377 //fprintf(stderr, "Got Data %f\n", get_timestamp_millis() );
379 if( cur_msg == NULL ) continue;
381 int honored = 0; /* true if we've serviced the request */
386 if(!no_recheck) check_children( forker, 0 );
389 osrfLogDebug( OSRF_LOG_MARK, "Server received inbound data" );
391 prefork_child* cur_child = forker->first_child;
393 /* Look for an available child */
394 for( k = 0; k < forker->current_num_children; k++ ) {
396 osrfLogInternal( OSRF_LOG_MARK, "Searching for available child. cur_child->pid = %d", cur_child->pid );
397 osrfLogInternal( OSRF_LOG_MARK, "Current num children %d and loop %d", forker->current_num_children, k);
399 if( cur_child->available ) {
400 osrfLogDebug( OSRF_LOG_MARK, "forker sending data to %d", cur_child->pid );
402 message_prepare_xml( cur_msg );
403 char* data = cur_msg->msg_xml;
404 if( ! data || strlen(data) < 1 ) break;
406 cur_child->available = 0;
407 osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d", cur_child->write_data_fd );
410 //fprintf(stderr, "Writing Data %f\n", get_timestamp_millis() );
411 if( (written = write( cur_child->write_data_fd, data, strlen(data) + 1 )) < 0 ) {
412 osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d", errno);
413 cur_child = cur_child->next;
417 //fprintf(stderr, "Wrote %d bytes to child\n", written);
419 forker->first_child = cur_child->next;
423 cur_child = cur_child->next;
426 /* if none available, add a new child if we can */
428 osrfLogDebug( OSRF_LOG_MARK, "Not enough children, attempting to add...");
430 if( forker->current_num_children < forker->max_children ) {
431 osrfLogDebug( OSRF_LOG_MARK, "Launching new child with current_num = %d",
432 forker->current_num_children );
434 prefork_child* new_child = launch_child( forker );
437 message_prepare_xml( cur_msg );
438 char* data = cur_msg->msg_xml;
441 int len = strlen(data);
444 new_child->available = 0;
445 osrfLogDebug( OSRF_LOG_MARK, "Writing to new child fd %d : pid %d",
446 new_child->write_data_fd, new_child->pid );
448 if( write( new_child->write_data_fd, data, len + 1 ) >= 0 ) {
449 forker->first_child = new_child->next;
460 osrfLogWarning( OSRF_LOG_MARK, "No children available, waiting...");
462 check_children( forker, 1 ); /* non-poll version */
463 /* tell the loop no to call check_children again, since we're calling it now */
468 reap_children(forker);
471 //fprintf(stderr, "Parent done with request %f\n", get_timestamp_millis() );
475 message_free( cur_msg );
477 } /* top level listen loop */
482 /** XXX Add a flag which tells select() to wait forever on children
483 * in the best case, this will be faster than calling usleep(x), and
484 * in the worst case it won't be slower and will do less logging...
487 void check_children( prefork_simple* forker, int forever ) {
499 reap_children(forker);
501 prefork_child* cur_child = forker->first_child;
504 for( i = 0; i!= forker->current_num_children; i++ ) {
506 if( cur_child->read_status_fd > max_fd )
507 max_fd = cur_child->read_status_fd;
508 FD_SET( cur_child->read_status_fd, &read_set );
509 cur_child = cur_child->next;
512 FD_CLR(0,&read_set);/* just to be sure */
515 osrfLogWarning(OSRF_LOG_MARK, "We have no children available - waiting for one to show up...");
517 if( (select_ret=select( max_fd + 1 , &read_set, NULL, NULL, NULL)) == -1 ) {
518 osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children", errno );
520 osrfLogInfo(OSRF_LOG_MARK, "select() completed after waiting on children to become available");
528 if( (select_ret=select( max_fd + 1 , &read_set, NULL, NULL, &tv)) == -1 ) {
529 osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children", errno );
533 if( select_ret == 0 )
536 /* see if one of a child has told us it's done */
537 cur_child = forker->first_child;
540 for( j = 0; j!= forker->current_num_children && num_handled < select_ret ; j++ ) {
542 if( FD_ISSET( cur_child->read_status_fd, &read_set ) ) {
543 //printf( "Server received status from a child %d\n", cur_child->pid );
544 osrfLogDebug( OSRF_LOG_MARK, "Server received status from a child %d", cur_child->pid );
548 /* now suck off the data */
551 if( (n=read(cur_child->read_status_fd, buf, 63)) < 0 ) {
552 osrfLogWarning( OSRF_LOG_MARK, "Read error afer select in child status read with errno %d", errno);
555 osrfLogDebug( OSRF_LOG_MARK, "Read %d bytes from status buffer: %s", n, buf );
556 cur_child->available = 1;
558 cur_child = cur_child->next;
564 void prefork_child_wait( prefork_child* child ) {
567 growing_buffer* gbuf = buffer_init( READ_BUFSIZE );
568 char buf[READ_BUFSIZE];
569 memset( buf, 0, READ_BUFSIZE );
571 for( i = 0; i < child->max_requests; i++ ) {
575 clr_fl(child->read_data_fd, O_NONBLOCK );
577 while( (n=read(child->read_data_fd, buf, READ_BUFSIZE-1)) > 0 ) {
578 osrfLogDebug(OSRF_LOG_MARK, "Prefork child read %d bytes of data", n);
580 set_fl(child->read_data_fd, O_NONBLOCK );
581 buffer_add( gbuf, buf );
582 memset( buf, 0, READ_BUFSIZE );
586 if( errno == EAGAIN ) n = 0;
588 if( errno == EPIPE ) {
589 osrfLogDebug(OSRF_LOG_MARK, "C child attempted read on broken pipe, exiting...");
594 osrfLogWarning( OSRF_LOG_MARK, "Prefork child read returned error with errno %d", errno );
597 } else if( gotdata ) {
598 osrfLogDebug(OSRF_LOG_MARK, "Prefork child got a request.. processing..");
599 prefork_child_process_request(child, gbuf->buf);
600 buffer_reset( gbuf );
603 if( i < child->max_requests - 1 )
604 write( child->write_status_fd, "available" /*less than 64 bytes*/, 9 );
609 osrfLogDebug( OSRF_LOG_MARK, "Child with max-requests=%d, num-served=%d exiting...[%ld]",
610 child->max_requests, i, (long) getpid() );
612 osrf_prefork_child_exit(child); /* just to be sure */
616 void add_prefork_child( prefork_simple* forker, prefork_child* child ) {
618 if( forker->first_child == NULL ) {
619 forker->first_child = child;
624 /* we put the child in as the last because, regardless,
625 we have to do the DLL splice dance, and this is the
628 prefork_child* start_child = forker->first_child;
630 if( forker->first_child->next == start_child )
632 forker->first_child = forker->first_child->next;
635 /* here we know that forker->first_child is the last element
636 in the list and start_child is the first. Insert the
637 new child between them*/
639 forker->first_child->next = child;
640 child->next = start_child;
644 prefork_child* find_prefork_child( prefork_simple* forker, pid_t pid ) {
646 if( forker->first_child == NULL ) { return NULL; }
647 prefork_child* start_child = forker->first_child;
649 if( forker->first_child->pid == pid )
650 return forker->first_child;
651 } while( (forker->first_child = forker->first_child->next) != start_child );
657 void del_prefork_child( prefork_simple* forker, pid_t pid ) {
659 if( forker->first_child == NULL ) { return; }
661 (forker->current_num_children)--;
662 osrfLogDebug( OSRF_LOG_MARK, "Deleting Child: %d", pid );
664 prefork_child* start_child = forker->first_child; /* starting point */
665 prefork_child* cur_child = start_child; /* current pointer */
666 prefork_child* prev_child = start_child; /* the trailing pointer */
668 /* special case where there is only one in the list */
669 if( start_child == start_child->next ) {
670 if( start_child->pid == pid ) {
671 forker->first_child = NULL;
673 close( start_child->read_data_fd );
674 close( start_child->write_data_fd );
675 close( start_child->read_status_fd );
676 close( start_child->write_status_fd );
678 prefork_child_free( start_child );
684 /* special case where the first item in the list needs to be removed */
685 if( start_child->pid == pid ) {
687 /* find the last one so we can remove the start_child */
689 prev_child = cur_child;
690 cur_child = cur_child->next;
691 }while( cur_child != start_child );
693 /* now cur_child == start_child */
694 prev_child->next = cur_child->next;
695 forker->first_child = prev_child;
697 close( cur_child->read_data_fd );
698 close( cur_child->write_data_fd );
699 close( cur_child->read_status_fd );
700 close( cur_child->write_status_fd );
702 prefork_child_free( cur_child );
707 prev_child = cur_child;
708 cur_child = cur_child->next;
710 if( cur_child->pid == pid ) {
711 prev_child->next = cur_child->next;
713 close( cur_child->read_data_fd );
714 close( cur_child->write_data_fd );
715 close( cur_child->read_status_fd );
716 close( cur_child->write_status_fd );
718 prefork_child_free( cur_child );
722 } while(cur_child != start_child);
728 prefork_child* prefork_child_init(
729 int max_requests, int read_data_fd, int write_data_fd,
730 int read_status_fd, int write_status_fd ) {
732 prefork_child* child = (prefork_child*) safe_malloc(sizeof(prefork_child));
733 child->max_requests = max_requests;
734 child->read_data_fd = read_data_fd;
735 child->write_data_fd = write_data_fd;
736 child->read_status_fd = read_status_fd;
737 child->write_status_fd = write_status_fd;
738 child->available = 1;
744 int prefork_free( prefork_simple* prefork ) {
746 while( prefork->first_child != NULL ) {
747 osrfLogInfo( OSRF_LOG_MARK, "Killing children and sleeping 1 to reap..." );
752 client_free(prefork->connection);
753 free(prefork->appname);
758 int prefork_child_free( prefork_child* child ) {
759 free(child->appname);
760 close(child->read_data_fd);
761 close(child->write_status_fd);