3 @brief Spawn and manage a collection of child processes to service requests.
5 Spawn a collection of child processes, replacing them as needed. Forward requests to them
6 and let the children do the work.
8 Each child processes some maximum number of requests before it terminates itself. When a
9 child dies, either deliberately or otherwise, we can spawn another one to replace it,
10 keeping the number of children within a predefined range.
12 Use a doubly-linked circular list to keep track of the children to whom we have forwarded
13 a request, and who are still working on it. Use a separate linear linked list to keep
14 track of children that are currently idle. Move them back and forth as needed.
16 For each child, set up two pipes:
17 - One for the parent to send requests to the child.
18 - One for the child to notify the parent that it is available for another request.
20 The message sent to the child represents an XML stanza as received from Jabber.
22 When the child finishes processing the request, it writes the string "available" back
23 to the parent. Then the parent knows that it can send that child another request.
28 #include <sys/types.h>
34 #include <sys/select.h>
37 #include "opensrf/utils.h"
38 #include "opensrf/log.h"
39 #include "opensrf/transport_client.h"
40 #include "opensrf/osrf_stack.h"
41 #include "opensrf/osrf_settings.h"
42 #include "opensrf/osrf_application.h"
/** Initial growing-buffer size, and chunk size, used by a drone reading its request pipe. */
#define READ_BUFSIZE     1024

/** Absolute ceiling on max_children, whatever the configuration asks for. */
#define ABS_MAX_CHILDREN 256
48 int max_requests; /**< How many requests a child processes before terminating. */
49 int min_children; /**< Minimum number of children to maintain. */
50 int max_children; /**< Maximum number of children to maintain. */
51 int fd; /**< Unused. */
52 int data_to_child; /**< Unused. */
53 int data_to_parent; /**< Unused. */
54 int current_num_children; /**< How many children are currently on the list. */
55 int keepalive; /**< Keepalive time for stateful sessions. */
56 char* appname; /**< Name of the application. */
57 /** Points to a circular linked list of children. */
58 struct prefork_child_struct* first_child;
59 /** List of of child processes that aren't doing anything at the moment and are
60 therefore available to service a new request. */
61 struct prefork_child_struct* idle_list;
62 /** List of allocated but unused prefork_children, available for reuse. Each one is just
63 raw memory, apart from the "next" pointer used to stitch them together. In particular,
64 there is no child process for them, and the file descriptors are not open. */
65 struct prefork_child_struct* free_list;
66 transport_client* connection; /**< Connection to Jabber. */
/** @brief Bookkeeping for one drone process, used on both sides of the fork. */
struct prefork_child_struct {
	pid_t pid;               /**< Process ID of the drone. */
	int read_data_fd;        /**< Request pipe, read end: the drone reads requests here. */
	int write_data_fd;       /**< Request pipe, write end: the parent writes requests here. */
	int read_status_fd;      /**< Status pipe, read end: the parent learns of availability. */
	int write_status_fd;     /**< Status pipe, write end: the drone announces availability. */
	int max_requests;        /**< Requests the drone may service before terminating. */
	const char* appname;     /**< Name of the application. */
	int keepalive;           /**< Keepalive timeout for stateful sessions. */
	struct prefork_child_struct* next;  /**< Next node in whichever list holds this child. */
	struct prefork_child_struct* prev;  /**< Previous node; meaningful in the active list only. */
};
82 typedef struct prefork_child_struct prefork_child;
84 /** Boolean. Set to true by a signal handler when it traps SIGCHLD. */
85 static volatile sig_atomic_t child_dead;
87 static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
88 int max_requests, int min_children, int max_children );
89 static prefork_child* launch_child( prefork_simple* forker );
90 static void prefork_launch_children( prefork_simple* forker );
91 static void prefork_run( prefork_simple* forker );
92 static void add_prefork_child( prefork_simple* forker, prefork_child* child );
94 static void del_prefork_child( prefork_simple* forker, pid_t pid );
95 static int check_children( prefork_simple* forker, int forever );
96 static int prefork_child_process_request( prefork_child*, char* data );
97 static int prefork_child_init_hook( prefork_child* );
98 static prefork_child* prefork_child_init( prefork_simple* forker,
99 int read_data_fd, int write_data_fd,
100 int read_status_fd, int write_status_fd );
102 /* listens on the 'data_to_child' fd and wait for incoming data */
103 static void prefork_child_wait( prefork_child* child );
104 static void prefork_clear( prefork_simple* );
105 static void prefork_child_free( prefork_simple* forker, prefork_child* );
106 static void osrf_prefork_register_routers( const char* appname );
107 static void osrf_prefork_child_exit( prefork_child* );
109 static void sigchld_handler( int sig );
112 @brief Spawn and manage a collection of drone processes for servicing requests.
113 @param appname Name of the application.
114 @return 0 if successful, or -1 if error.
116 int osrf_prefork_run( const char* appname ) {
119 osrfLogError( OSRF_LOG_MARK, "osrf_prefork_run requires an appname to run!");
123 set_proc_title( "OpenSRF Listener [%s]", appname );
130 // Get configuration settings
131 osrfLogInfo( OSRF_LOG_MARK, "Loading config in osrf_forker for app %s", appname );
133 char* max_req = osrf_settings_host_value( "/apps/%s/unix_config/max_requests", appname );
134 char* min_children = osrf_settings_host_value( "/apps/%s/unix_config/min_children", appname );
135 char* max_children = osrf_settings_host_value( "/apps/%s/unix_config/max_children", appname );
136 char* keepalive = osrf_settings_host_value( "/apps/%s/keepalive", appname );
139 osrfLogWarning( OSRF_LOG_MARK, "Keepalive is not defined, assuming %d", kalive );
141 kalive = atoi( keepalive );
144 osrfLogWarning( OSRF_LOG_MARK, "Max requests not defined, assuming %d", maxr );
146 maxr = atoi( max_req );
149 osrfLogWarning( OSRF_LOG_MARK, "Min children not defined, assuming %d", minc );
151 minc = atoi( min_children );
154 osrfLogWarning( OSRF_LOG_MARK, "Max children not defined, assuming %d", maxc );
156 maxc = atoi( max_children );
160 free( min_children );
161 free( max_children );
162 /* --------------------------------------------------- */
164 char* resc = va_list_to_string( "%s_listener", appname );
166 // Make sure that we haven't already booted
167 if( !osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
168 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()" );
175 prefork_simple forker;
177 if( prefork_simple_init( &forker, osrfSystemGetTransportClient(), maxr, minc, maxc )) {
178 osrfLogError( OSRF_LOG_MARK,
179 "osrf_prefork_run() failed to create prefork_simple object" );
183 // Finish initializing the prefork_simple.
184 forker.appname = strdup( appname );
185 forker.keepalive = kalive;
187 // Spawn the children; put them in the idle list.
188 prefork_launch_children( &forker );
190 // Tell the router that you're open for business.
191 osrf_prefork_register_routers( appname );
193 // Sit back and let the requests roll in
194 osrfLogInfo( OSRF_LOG_MARK, "Launching osrf_forker for app %s", appname );
195 prefork_run( &forker );
197 osrfLogWarning( OSRF_LOG_MARK, "prefork_run() returned - how??" );
198 prefork_clear( &forker );
203 @brief Register the application with a specified router.
204 @param appname Name of the application.
205 @param routerName Name of the router.
206 @param routerDomain Domain of the router.
208 Tell the router that you're open for business so that it can route requests to you.
210 Called only by the parent process.
212 static void osrf_prefork_send_router_registration(
213 const char* appname, const char* routerName, const char* routerDomain ) {
214 // Get a pointer to the global transport_client
215 transport_client* client = osrfSystemGetTransportClient();
217 // Construct the Jabber address of the router
218 char* jid = va_list_to_string( "%s@%s/router", routerName, routerDomain );
219 osrfLogInfo( OSRF_LOG_MARK, "%s registering with router %s", appname, jid );
221 // Create the registration message, and send it
222 transport_message* msg = message_init( "registering", NULL, NULL, jid, NULL );
223 message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
224 client_send_message( client, msg );
232 @brief Register with a router, or not, according to some config settings.
233 @param appname Name of the application
234 @param RouterChunk A representation of part of the config file.
236 Parse a "complex" router configuration chunk.
238 Examine the services listed for a given router (normally in opensrf_core.xml). If
239 there is an entry for this service, or if there are @em no services listed, then
240 register with this router. Otherwise don't.
242 Called only by the parent process.
244 static void osrf_prefork_parse_router_chunk( const char* appname, const jsonObject* routerChunk ) {
246 const char* routerName = jsonObjectGetString( jsonObjectGetKeyConst( routerChunk, "name" ));
247 const char* domain = jsonObjectGetString( jsonObjectGetKeyConst( routerChunk, "domain" ));
248 const jsonObject* services = jsonObjectGetKeyConst( routerChunk, "services" );
249 osrfLogDebug( OSRF_LOG_MARK, "found router config with domain %s and name %s",
250 routerName, domain );
252 if( services && services->type == JSON_HASH ) {
253 osrfLogDebug( OSRF_LOG_MARK, "investigating router information..." );
254 const jsonObject* service_obj = jsonObjectGetKeyConst( services, "service" );
256 ; // do nothing (shouldn't happen)
257 else if( JSON_ARRAY == service_obj->type ) {
258 // There are multiple services listed. Register with this router
259 // if and only if this service is on the list.
261 for( j = 0; j < service_obj->size; j++ ) {
262 const char* service = jsonObjectGetString( jsonObjectGetIndex( service_obj, j ));
263 if( service && !strcmp( appname, service ))
264 osrf_prefork_send_router_registration( appname, routerName, domain );
267 else if( JSON_STRING == service_obj->type ) {
268 // There's only one service listed. Register with this router
269 // if and only if this service is the one listed.
270 if( !strcmp( appname, jsonObjectGetString( service_obj )) )
271 osrf_prefork_send_router_registration( appname, routerName, domain );
274 // This router is not restricted to any set of services,
275 // so go ahead and register with it.
276 osrf_prefork_send_router_registration( appname, routerName, domain );
281 @brief Register the application with one or more routers, according to the configuration.
282 @param appname Name of the application.
284 Called only by the parent process.
286 static void osrf_prefork_register_routers( const char* appname ) {
288 jsonObject* routerInfo = osrfConfigGetValueObject( NULL, "/routers/router" );
291 for( i = 0; i < routerInfo->size; i++ ) {
292 const jsonObject* routerChunk = jsonObjectGetIndex( routerInfo, i );
294 if( routerChunk->type == JSON_STRING ) {
295 /* this accomodates simple router configs */
296 char* routerName = osrfConfigGetValue( NULL, "/router_name" );
297 char* domain = osrfConfigGetValue( NULL, "/routers/router" );
298 osrfLogDebug( OSRF_LOG_MARK, "found simple router settings with router name %s",
300 osrf_prefork_send_router_registration( appname, routerName, domain );
305 osrf_prefork_parse_router_chunk( appname, routerChunk );
309 jsonObjectFree( routerInfo );
313 @brief Initialize a child process.
314 @param child Pointer to the prefork_child representing the new child process.
315 @return Zero if successful, or -1 if not.
317 Called only by child processes. Actions:
318 - Connect to one or more cache servers
319 - Reconfigure logger, if necessary
320 - Discard parent's Jabber connection and open a new one
321 - Dynamically call an application-specific initialization routine
322 - Change the command line as reported by ps
324 static int prefork_child_init_hook( prefork_child* child ) {
326 if( !child ) return -1;
327 osrfLogDebug( OSRF_LOG_MARK, "Child init hook for child %d", child->pid );
329 // Connect to cache server(s).
330 osrfSystemInitCache();
331 char* resc = va_list_to_string( "%s_drone", child->appname );
333 // If we're a source-client, tell the logger now that we're a new process.
334 char* isclient = osrfConfigGetValue( NULL, "/client" );
335 if( isclient && !strcasecmp( isclient,"true" ))
336 osrfLogSetIsClient( 1 );
339 // Remove traces of our parent's socket connection so we can have our own.
340 osrfSystemIgnoreTransportClient();
343 if( !osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
344 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()" );
351 // Dynamically call the application-specific initialization function
352 // from a previously loaded shared library.
353 if( ! osrfAppRunChildInit( child->appname )) {
354 osrfLogDebug( OSRF_LOG_MARK, "Prefork child_init succeeded\n" );
356 osrfLogError( OSRF_LOG_MARK, "Prefork child_init failed\n" );
360 // Change the command line as reported by ps
361 set_proc_title( "OpenSRF Drone [%s]", child->appname );
366 @brief Respond to a client request forwarded by the parent.
367 @param child Pointer to the state of the child process.
368 @param data Pointer to the raw XMPP message received from the parent.
369 @return 0 on success; non-zero means that the child process should clean itself up
370 and terminate immediately, presumably due to a fatal error condition.
372 Called only by a child process.
374 static int prefork_child_process_request( prefork_child* child, char* data ) {
375 if( !child ) return 0;
377 transport_client* client = osrfSystemGetTransportClient();
379 // Make sure that we're still connected to Jabber; reconnect if necessary.
380 if( !client_connected( client )) {
381 osrfSystemIgnoreTransportClient();
382 osrfLogWarning( OSRF_LOG_MARK, "Reconnecting child to opensrf after disconnect..." );
383 if( !osrf_system_bootstrap_client( NULL, NULL )) {
384 osrfLogError( OSRF_LOG_MARK,
385 "Unable to bootstrap client in prefork_child_process_request()" );
387 osrf_prefork_child_exit( child );
391 // Construct the message from the xml.
392 transport_message* msg = new_message_from_xml( data );
394 // Respond to the transport message. This is where method calls are buried.
395 osrfAppSession* session = osrf_stack_transport_handler( msg, child->appname );
399 int rc = session->panic;
402 osrfLogWarning( OSRF_LOG_MARK,
403 "Drone for session %s terminating immediately", session->session_id );
404 osrfAppSessionFree( session );
408 if( session->stateless && session->state != OSRF_SESSION_CONNECTED ) {
409 // We're no longer connected to the client, which presumably means that
410 // we're done with this request. Bail out.
411 osrfAppSessionFree( session );
415 // If we get this far, then the client has opened an application connection so that it
416 // can send multiple requests directly to the same server drone, bypassing the router
417 // and the listener. For example, it may need to do a database transaction, requiring
418 // multiple method calls within the same database session.
420 // Hence we go into a loop, responding to successive requests from the same client, until
421 // either the client disconnects or an error occurs.
423 osrfLogDebug( OSRF_LOG_MARK, "Entering keepalive loop for session %s", session->session_id );
424 int keepalive = child->keepalive;
432 // Respond to any input messages. This is where the method calls are buried.
433 osrfLogDebug( OSRF_LOG_MARK,
434 "osrf_prefork calling queue_wait [%d] in keepalive loop", keepalive );
435 start = time( NULL );
436 retval = osrf_app_session_queue_wait( session, keepalive, &recvd );
439 osrfLogDebug( OSRF_LOG_MARK, "Data received == %d", recvd );
441 // Now we check a number of possible reasons to exit the loop.
443 // If the method call decided to terminate immediately,
444 // note that for future reference.
448 // If an error occurred when we tried to service the request, exit the loop.
450 osrfLogError( OSRF_LOG_MARK, "queue-wait returned non-success %d", retval );
454 // If the client disconnected, exit the loop.
455 if( session->state != OSRF_SESSION_CONNECTED )
458 // If we timed out while waiting for a request, exit the loop.
459 if( !recvd && (end - start) >= keepalive ) {
460 osrfLogInfo( OSRF_LOG_MARK,
461 "No request was received in %d seconds, exiting stateful session", keepalive );
462 osrfAppSessionStatus(
466 0, "Disconnected on timeout" );
471 // If the child process has decided to terminate immediately, exit the loop.
476 osrfLogDebug( OSRF_LOG_MARK, "Exiting keepalive loop for session %s", session->session_id );
477 osrfAppSessionFree( session );
482 @brief Partially initialize a prefork_simple provided by the caller.
483 @param prefork Pointer to a a raw prefork_simple to be initialized.
484 @param client Pointer to a transport_client (connection to Jabber).
485 @param max_requests The maximum number of requests that a child process may service
487 @param min_children Minimum number of child processes to maintain.
488 @param max_children Maximum number of child processes to maintain.
489 @return 0 if successful, or 1 if not (due to invalid parameters).
491 static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
492 int max_requests, int min_children, int max_children ) {
494 if( min_children > max_children ) {
495 osrfLogError( OSRF_LOG_MARK, "min_children (%d) is greater "
496 "than max_children (%d)", min_children, max_children );
500 if( max_children > ABS_MAX_CHILDREN ) {
501 osrfLogError( OSRF_LOG_MARK, "max_children (%d) is greater than ABS_MAX_CHILDREN (%d)",
502 max_children, ABS_MAX_CHILDREN );
506 osrfLogInfo( OSRF_LOG_MARK, "Prefork launching child with max_request=%d,"
507 "min_children=%d, max_children=%d", max_requests, min_children, max_children );
509 /* flesh out the struct */
510 prefork->max_requests = max_requests;
511 prefork->min_children = min_children;
512 prefork->max_children = max_children;
514 prefork->data_to_child = 0;
515 prefork->data_to_parent = 0;
516 prefork->current_num_children = 0;
517 prefork->keepalive = 0;
518 prefork->appname = NULL;
519 prefork->first_child = NULL;
520 prefork->idle_list = NULL;
521 prefork->free_list = NULL;
522 prefork->connection = client;
528 @brief Spawn a new child process and put it in the idle list.
529 @param forker Pointer to the prefork_simple that will own the process.
530 @return Pointer to the new prefork_child, or not at all.
532 Spawn a new child process. Create a prefork_child for it and put it in the idle list.
534 After forking, the parent returns a pointer to the new prefork_child. The child
535 services its quota of requests and then terminates without returning.
537 static prefork_child* launch_child( prefork_simple* forker ) {
543 // Set up the data and status pipes
544 if( pipe( data_fd ) < 0 ) { /* build the data pipe*/
545 osrfLogError( OSRF_LOG_MARK, "Pipe making error" );
549 if( pipe( status_fd ) < 0 ) {/* build the status pipe */
550 osrfLogError( OSRF_LOG_MARK, "Pipe making error" );
556 osrfLogInternal( OSRF_LOG_MARK, "Pipes: %d %d %d %d",
557 data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
559 // Create and initialize a prefork_child for the new process
560 prefork_child* child = prefork_child_init( forker, data_fd[0],
561 data_fd[1], status_fd[0], status_fd[1] );
563 if( (pid=fork()) < 0 ) {
564 osrfLogError( OSRF_LOG_MARK, "Forking Error" );
565 prefork_child_free( forker, child );
569 // Add the new child to the head of the idle list
570 child->next = forker->idle_list;
571 forker->idle_list = child;
573 if( pid > 0 ) { /* parent */
575 signal( SIGCHLD, sigchld_handler );
576 ( forker->current_num_children )++;
579 osrfLogDebug( OSRF_LOG_MARK, "Parent launched %d", pid );
580 /* *no* child pipe FD's can be closed or the parent will re-use fd's that
581 the children are currently using */
587 osrfLogInternal( OSRF_LOG_MARK,
588 "I am new child with read_data_fd = %d and write_status_fd = %d",
589 child->read_data_fd, child->write_status_fd );
591 child->pid = getpid();
592 close( child->write_data_fd );
593 close( child->read_status_fd );
596 if( prefork_child_init_hook( child ) == -1 ) {
597 osrfLogError( OSRF_LOG_MARK,
598 "Forker child going away because we could not connect to OpenSRF..." );
599 osrf_prefork_child_exit( child );
602 prefork_child_wait( child ); // Should exit without returning
603 osrf_prefork_child_exit( child ); // Just to be sure
604 return NULL; // Unreachable, but it keeps the compiler happy
609 @brief Terminate a child process.
610 @param child Pointer to the prefork_child representing the child process (not used).
612 Called only by child processes. Dynamically call an application-specific shutdown
613 function from a previously loaded shared library; then exit.
615 static void osrf_prefork_child_exit( prefork_child* child ) {
616 osrfAppRunExitCode();
621 @brief Launch all the child processes, putting them in the idle list.
622 @param forker Pointer to the prefork_simple that will own the children.
624 Called only by the parent process (in order to become a parent).
626 static void prefork_launch_children( prefork_simple* forker ) {
627 if( !forker ) return;
629 while( c++ < forker->min_children )
630 launch_child( forker );
634 @brief Signal handler for SIGCHLD: note that a child process has terminated.
635 @param sig The value of the trapped signal; always SIGCHLD.
637 Set a boolean to be checked later.
639 static void sigchld_handler( int sig ) {
640 signal( SIGCHLD, sigchld_handler );
645 @brief Replenish the collection of child processes, after one has terminated.
646 @param forker Pointer to the prefork_simple that manages the child processes.
648 The parent calls this function when it notices (via a signal handler) that
649 a child process has died.
651 Wait on the dead children so that they won't be zombies. Spawn new ones as needed
652 to maintain at least a minimum number.
654 void reap_children( prefork_simple* forker ) {
658 // Reset our boolean so that we can detect any further terminations.
661 // Bury the children so that they won't be zombies. WNOHANG means that waitpid() returns
662 // immediately if there are no waitable children, instead of waiting for more to die.
663 // Ignore the return code of the child. We don't do an autopsy.
664 while( (child_pid = waitpid( -1, NULL, WNOHANG )) > 0 ) {
665 --forker->current_num_children;
666 del_prefork_child( forker, child_pid );
669 // Spawn more children as needed.
670 while( forker->current_num_children < forker->min_children )
671 launch_child( forker );
675 @brief Read transport_messages and dispatch them to child processes for servicing.
676 @param forker Pointer to the prefork_simple that manages the child processes.
678 This is the main loop of the parent process, and once entered, does not exit.
680 For each usable transport_message received: look for an idle child to service it. If
681 no idle children are available, either spawn a new one or, if we've already spawned the
682 maximum number of children, wait for one to become available. Once a child is available
683 by whatever means, write an XML version of the input message, to a pipe designated for
686 static void prefork_run( prefork_simple* forker ) {
688 if( NULL == forker->idle_list )
689 return; // No available children, and we haven't even started yet
691 transport_message* cur_msg = NULL;
695 if( forker->first_child == NULL && forker->idle_list == NULL ) {/* no more children */
696 osrfLogWarning( OSRF_LOG_MARK, "No more children..." );
700 // Wait indefinitely for an input message
701 osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data..." );
702 cur_msg = client_recv( forker->connection, -1 );
704 if( cur_msg == NULL )
705 continue; // Error? Interrupted by a signal? Try again...
707 message_prepare_xml( cur_msg );
708 const char* msg_data = cur_msg->msg_xml;
709 if( ! msg_data || ! *msg_data ) {
710 osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %",
711 (msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread );
712 message_free( cur_msg );
713 continue; // Message not usable; go on to the next one.
716 int honored = 0; /* will be set to true when we service the request */
722 if(check_children( forker, 0 ) < 0) {
723 continue; // check failed, try again
728 osrfLogDebug( OSRF_LOG_MARK, "Server received inbound data" );
730 prefork_child* cur_child = NULL;
732 // Look for an available child in the idle list. Since the idle list operates
733 // as a stack, the child we get is the one that was most recently active, or
734 // most recently spawned. That means it's the one most likely still to be in
735 // physical memory, and the one least likely to have to be swapped in.
736 while( forker->idle_list ) {
738 osrfLogDebug( OSRF_LOG_MARK, "Looking for idle child" );
739 // Grab the prefork_child at the head of the idle list
740 cur_child = forker->idle_list;
741 forker->idle_list = cur_child->next;
742 cur_child->next = NULL;
744 osrfLogInternal( OSRF_LOG_MARK,
745 "Searching for available child. cur_child->pid = %d", cur_child->pid );
746 osrfLogInternal( OSRF_LOG_MARK, "Current num children %d",
747 forker->current_num_children );
749 osrfLogDebug( OSRF_LOG_MARK, "forker sending data to %d", cur_child->pid );
750 osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d",
751 cur_child->write_data_fd );
753 int written = write( cur_child->write_data_fd, msg_data, strlen( msg_data ) + 1 );
755 // This child appears to be dead or unusable. Discard it.
756 osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d: %s",
757 errno, strerror( errno ));
758 kill( cur_child->pid, SIGKILL );
759 del_prefork_child( forker, cur_child->pid );
763 add_prefork_child( forker, cur_child ); // Add it to active list
768 /* if none available, add a new child if we can */
770 osrfLogDebug( OSRF_LOG_MARK, "Not enough children, attempting to add..." );
772 if( forker->current_num_children < forker->max_children ) {
773 osrfLogDebug( OSRF_LOG_MARK, "Launching new child with current_num = %d",
774 forker->current_num_children );
776 launch_child( forker ); // Put a new child into the idle list
777 if( forker->idle_list ) {
779 // Take the new child from the idle list
780 prefork_child* new_child = forker->idle_list;
781 forker->idle_list = new_child->next;
782 new_child->next = NULL;
784 osrfLogDebug( OSRF_LOG_MARK, "Writing to new child fd %d : pid %d",
785 new_child->write_data_fd, new_child->pid );
788 new_child->write_data_fd, msg_data, strlen( msg_data ) + 1 );
790 // This child appears to be dead or unusable. Discard it.
791 osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d: %s",
792 errno, strerror( errno ));
793 kill( cur_child->pid, SIGKILL );
794 del_prefork_child( forker, cur_child->pid );
796 add_prefork_child( forker, new_child );
801 osrfLogWarning( OSRF_LOG_MARK, "Could not launch a new child as %d children "
802 "were already running; consider increasing max_children for this "
803 "application higher than %d in the OpenSRF configuration if this "
804 "message occurs frequently",
805 forker->current_num_children, forker->max_children );
810 osrfLogWarning( OSRF_LOG_MARK, "No children available, waiting..." );
811 if( check_children( forker, 1 ) >= 0 ) {
812 // Tell the loop not to call check_children again, since we just successfully called it
818 reap_children( forker );
820 } // end while( ! honored )
822 message_free( cur_msg );
824 } /* end top level listen loop */
829 @brief See if any children have become available.
830 @param forker Pointer to the prefork_simple that owns the children.
831 @param forever Boolean: true if we should wait indefinitely.
832 @return 0 or greater if successful, -1 on select error/interrupt
834 Call select() for all the children in the active list. Read each active file
835 descriptor and move the corresponding child to the idle list.
837 If @a forever is true, wait indefinitely for input. Otherwise return immediately if
838 there are no active file descriptors.
840 static int check_children( prefork_simple* forker, int forever ) {
843 reap_children( forker );
845 if( NULL == forker->first_child ) {
846 // If forever is true, then we're here because we've run out of idle
847 // processes, so there should be some active ones around.
848 // If forever is false, then the children may all be idle, and that's okay.
850 osrfLogError( OSRF_LOG_MARK, "No active child processes to check" );
856 FD_ZERO( &read_set );
860 // Prepare to select() on pipes from all the active children
861 prefork_child* cur_child = forker->first_child;
863 if( cur_child->read_status_fd > max_fd )
864 max_fd = cur_child->read_status_fd;
865 FD_SET( cur_child->read_status_fd, &read_set );
866 cur_child = cur_child->next;
867 } while( cur_child != forker->first_child );
869 FD_CLR( 0, &read_set ); /* just to be sure */
872 osrfLogWarning( OSRF_LOG_MARK,
873 "We have no children available - waiting for one to show up..." );
875 if( (select_ret=select( max_fd + 1, &read_set, NULL, NULL, NULL )) == -1 ) {
876 osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children: %s",
877 errno, strerror( errno ));
879 osrfLogInfo( OSRF_LOG_MARK,
880 "select() completed after waiting on children to become available" );
888 if( (select_ret=select( max_fd + 1, &read_set, NULL, NULL, &tv )) == -1 ) {
889 osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children: %s",
890 errno, strerror( errno ));
894 if( select_ret <= 0 ) // we're done here
897 // Check each child in the active list.
898 // If it has responded, move it to the idle list.
899 cur_child = forker->first_child;
900 prefork_child* next_child = NULL;
903 next_child = cur_child->next;
904 if( FD_ISSET( cur_child->read_status_fd, &read_set )) {
905 osrfLogDebug( OSRF_LOG_MARK,
906 "Server received status from a child %d", cur_child->pid );
910 /* now suck off the data */
912 if( (n=read( cur_child->read_status_fd, buf, sizeof( buf ) - 1 )) < 0 ) {
913 osrfLogWarning( OSRF_LOG_MARK,
914 "Read error after select in child status read with errno %d: %s",
915 errno, strerror( errno ));
919 osrfLogDebug( OSRF_LOG_MARK, "Read %d bytes from status buffer: %s", n, buf );
922 // Remove the child from the active list
923 if( forker->first_child == cur_child ) {
924 if( cur_child->next == cur_child )
925 forker->first_child = NULL; // only child in the active list
927 forker->first_child = cur_child->next;
929 cur_child->next->prev = cur_child->prev;
930 cur_child->prev->next = cur_child->next;
932 // Add it to the idle list
933 cur_child->prev = NULL;
934 cur_child->next = forker->idle_list;
935 forker->idle_list = cur_child;
937 cur_child = next_child;
938 } while( forker->first_child && forker->first_child != next_child );
944 @brief Service up a set maximum number of requests; then shut down.
945 @param child Pointer to the prefork_child representing the child process.
947 Called only by child process.
949 Enter a loop, for up to max_requests iterations. On each iteration:
950 - Wait indefinitely for a request from the parent.
951 - Service the request.
952 - Increment a counter. If the limit hasn't been reached, notify the parent that you
953 are available for another request.
955 After exiting the loop, shut down and terminate the process.
957 static void prefork_child_wait( prefork_child* child ) {
960 growing_buffer* gbuf = buffer_init( READ_BUFSIZE );
961 char buf[READ_BUFSIZE];
963 for( i = 0; i < child->max_requests; i++ ) {
966 int gotdata = 0; // boolean; set to true if we get data
967 clr_fl( child->read_data_fd, O_NONBLOCK );
969 // Read a request from the parent, via a pipe, into a growing_buffer.
970 while( (n = read( child->read_data_fd, buf, READ_BUFSIZE-1 )) > 0 ) {
972 osrfLogDebug( OSRF_LOG_MARK, "Prefork child read %d bytes of data", n );
974 set_fl( child->read_data_fd, O_NONBLOCK );
977 buffer_add_n( gbuf, buf, n );
980 if( errno == EAGAIN )
983 if( errno == EPIPE ) {
984 osrfLogDebug( OSRF_LOG_MARK, "C child attempted read on broken pipe, exiting..." );
988 int terminate_now = 0; // Boolean
991 osrfLogWarning( OSRF_LOG_MARK,
992 "Prefork child read returned error with errno %d", errno );
995 } else if( gotdata ) {
996 // Process the request
997 osrfLogDebug( OSRF_LOG_MARK, "Prefork child got a request.. processing.." );
998 terminate_now = prefork_child_process_request( child, gbuf->buf );
999 buffer_reset( gbuf );
1002 if( terminate_now ) {
1003 // We're terminating prematurely -- presumably due to a fatal error condition.
1004 osrfLogWarning( OSRF_LOG_MARK, "Prefork child terminating abruptly" );
1008 if( i < child->max_requests - 1 ) {
1009 // Report back to the parent for another request.
1011 ssize_t len = write(
1012 child->write_status_fd, "available" /*less than 64 bytes*/, msg_len );
1013 if( len != msg_len ) {
1014 osrfLogError( OSRF_LOG_MARK,
1015 "Drone terminating: unable to notify listener of availability: %s",
1017 buffer_free( gbuf );
1018 osrf_prefork_child_exit( child );
1023 buffer_free( gbuf );
1025 osrfLogDebug( OSRF_LOG_MARK, "Child with max-requests=%d, num-served=%d exiting...[%ld]",
1026 child->max_requests, i, (long) getpid());
1028 osrf_prefork_child_exit( child );
1032 @brief Add a prefork_child to the end of the active list.
1033 @param forker Pointer to the prefork_simple that owns the list.
1034 @param child Pointer to the prefork_child to be added.
1036 static void add_prefork_child( prefork_simple* forker, prefork_child* child ) {
1038 if( forker->first_child == NULL ) {
1039 // Simplest case: list is initially empty.
1040 forker->first_child = child;
1041 child->next = child;
1042 child->prev = child;
1044 // Find the last node in the circular list.
1045 prefork_child* last_child = forker->first_child->prev;
1047 // Insert the new child between the last and first children.
1048 last_child->next = child;
1049 child->prev = last_child;
1050 child->next = forker->first_child;
1051 forker->first_child->prev = child;
1056 @brief Delete and destroy a dead child from our list.
1057 @param forker Pointer to the prefork_simple that owns the dead child.
1058 @param pid Process ID of the dead child.
1060 Look for the dead child first in the list of active children. If you don't find it
1061 there, look in the list of idle children. If you find it, remove it from whichever
1062 list it's on, and destroy it.
1064 static void del_prefork_child( prefork_simple* forker, pid_t pid ) {
1066 osrfLogDebug( OSRF_LOG_MARK, "Deleting Child: %d", pid );
1068 prefork_child* cur_child = NULL;
1070 // Look first in the active list
1071 if( forker->first_child ) {
1072 cur_child = forker->first_child; /* current pointer */
1073 while( cur_child->pid != pid && cur_child->next != forker->first_child )
1074 cur_child = cur_child->next;
1076 if( cur_child->pid == pid ) {
1077 // We found the right node. Remove it from the list.
1078 if( cur_child->next == cur_child )
1079 forker->first_child = NULL; // only child in the list
1081 if( forker->first_child == cur_child )
1082 forker->first_child = cur_child->next; // Reseat forker->first_child
1084 // Stitch the adjacent nodes together
1085 cur_child->prev->next = cur_child->next;
1086 cur_child->next->prev = cur_child->prev;
1089 cur_child = NULL; // Didn't find it in the active list
1093 // Maybe it's in the idle list. This can happen if, for example,
1094 // a child is killed by a signal while it's between requests.
1096 prefork_child* prev = NULL;
1097 cur_child = forker->idle_list;
1098 while( cur_child && cur_child->pid != pid ) {
1100 cur_child = cur_child->next;
1104 // Detach from the list
1106 prev->next = cur_child->next;
1108 forker->idle_list = cur_child->next;
1109 } // else we can't find it
1112 // If we found the node, destroy it.
1114 prefork_child_free( forker, cur_child );
1118 @brief Create and initialize a prefork_child.
1119 @param forker Pointer to the prefork_simple that will own the prefork_child.
1120 @param read_data_fd Used by child to read request from parent.
1121 @param write_data_fd Used by parent to write request to child.
1122 @param read_status_fd Used by parent to read status from child.
1123 @param write_status_fd Used by child to write status to parent.
1124 @return Pointer to the newly created prefork_child.
1126 The calling code is responsible for freeing the prefork_child by calling
1127 prefork_child_free().
1129 static prefork_child* prefork_child_init( prefork_simple* forker,
1130 int read_data_fd, int write_data_fd,
1131 int read_status_fd, int write_status_fd ) {
1133 // Allocate a prefork_child -- from the free list if possible, or from
1134 // the heap if necessary. The free list is a non-circular, singly-linked list.
1135 prefork_child* child;
1136 if( forker->free_list ) {
1137 child = forker->free_list;
1138 forker->free_list = child->next;
1140 child = safe_malloc( sizeof( prefork_child ));
1143 child->read_data_fd = read_data_fd;
1144 child->write_data_fd = write_data_fd;
1145 child->read_status_fd = read_status_fd;
1146 child->write_status_fd = write_status_fd;
1147 child->max_requests = forker->max_requests;
1148 child->appname = forker->appname; // We don't make a separate copy
1149 child->keepalive = forker->keepalive;
1157 @brief Terminate all child processes and clear out a prefork_simple.
1158 @param prefork Pointer to the prefork_simple to be cleared out.
1160 We do not deallocate the prefork_simple itself, just its contents.
1162 static void prefork_clear( prefork_simple* prefork ) {
1164 // Kill all the active children, and move their prefork_child nodes to the free list.
1165 while( prefork->first_child ) {
1166 kill( prefork->first_child->pid, SIGKILL );
1167 del_prefork_child( prefork, prefork->first_child->pid );
1170 // Kill all the idle prefork children, close their file
1171 // descriptors, and move them to the free list.
1172 prefork_child* child = prefork->idle_list;
1173 prefork->idle_list = NULL;
1175 prefork_child* temp = child->next;
1176 kill( child->pid, SIGKILL );
1177 prefork_child_free( prefork, child );
1180 //prefork->current_num_children = 0;
1182 // Physically free the free list of prefork_children.
1183 child = prefork->free_list;
1184 prefork->free_list = NULL;
1186 prefork_child* temp = child->next;
1191 // Close the Jabber connection
1192 client_free( prefork->connection );
1193 prefork->connection = NULL;
1195 // After giving the child processes a second to terminate, wait on them so that they
1196 // don't become zombies. We don't wait indefinitely, so it's possible that some
1197 // children will survive a bit longer.
1199 while( (waitpid( -1, NULL, WNOHANG )) > 0 ) {
1200 --prefork->current_num_children;
1203 free( prefork->appname );
1204 prefork->appname = NULL;
1208 @brief Destroy and deallocate a prefork_child.
1209 @param forker Pointer to the prefork_simple that owns the prefork_child.
1210 @param child Pointer to the prefork_child to be destroyed.
1212 static void prefork_child_free( prefork_simple* forker, prefork_child* child ) {
1213 close( child->read_data_fd );
1214 close( child->write_data_fd );
1215 close( child->read_status_fd );
1216 close( child->write_status_fd );
1218 // Stick the prefork_child in a free list for potential reuse. This is a
1219 // non-circular, singly linked list.
1221 child->next = forker->free_list;
1222 forker->free_list = child;