3 @brief Spawn and manage a collection of child processes to service requests.
5 Spawn a collection of child processes, replacing them as needed. Forward requests to them
6 and let the children do the work.
8 Each child processes some maximum number of requests before it terminates itself. When a
9 child dies, either deliberately or otherwise, we can spawn another one to replace it,
10 keeping the number of children within a predefined range.
12 Use a doubly-linked circular list to keep track of the children to whom we have forwarded
13 a request, and who are still working on it. Use a separate linear linked list to keep
14 track of children that are currently idle. Move them back and forth as needed.
16 For each child, set up two pipes:
17 - One for the parent to send requests to the child.
18 - One for the child to notify the parent that it is available for another request.
20 The message sent to the child represents an XML stanza as received from Jabber.
22 When the child finishes processing the request, it writes the string "available" back
23 to the parent. Then the parent knows that it can send that child another request.
28 #include <sys/types.h>
34 #include <sys/select.h>
37 #include "opensrf/utils.h"
38 #include "opensrf/log.h"
39 #include "opensrf/transport_client.h"
40 #include "opensrf/osrf_stack.h"
41 #include "opensrf/osrf_settings.h"
42 #include "opensrf/osrf_application.h"
44 #define READ_BUFSIZE 1024
45 #define ABS_MAX_CHILDREN 256
48 int max_requests; /**< How many requests a child processes before terminating. */
49 int min_children; /**< Minimum number of children to maintain. */
50 int max_children; /**< Maximum number of children to maintain. */
51 int fd; /**< Unused. */
52 int data_to_child; /**< Unused (zeroed by prefork_simple_init()). */
53 int data_to_parent; /**< Unused (zeroed by prefork_simple_init()). */
54 int current_num_children; /**< Number of live children, busy or idle: incremented when one is forked, decremented when one is reaped. */
55 int keepalive; /**< Keepalive time for stateful sessions, in seconds. */
56 char* appname; /**< Name of the application (a strdup()'d copy owned by the forker). */
57 /** Points into the circular, doubly-linked list of busy (active) children; NULL when none is busy. */
58 struct prefork_child_struct* first_child;
59 /** List of child processes that aren't doing anything at the moment and are
60 therefore available to service a new request. Singly linked via "next" and used as a stack. */
61 struct prefork_child_struct* idle_list;
62 /** List of allocated but unused prefork_children, available for reuse. Each one is just
63 raw memory, apart from the "next" pointer used to stitch them together. In particular,
64 there is no child process for them, and the file descriptors are not open. */
65 struct prefork_child_struct* free_list;
66 transport_client* connection; /**< Connection to Jabber. */
/** State for one drone (child) process. In the parent, each instance normally sits on
    one of its prefork_simple's lists: the circular active list, the idle list, or the
    free list. */
69 struct prefork_child_struct {
70 pid_t pid; /**< Process ID of the child. */
71 int read_data_fd; /**< Child uses to read request. */
72 int write_data_fd; /**< Parent uses to write request. */
73 int read_status_fd; /**< Parent reads to see if child is available. */
74 int write_status_fd; /**< Child uses to notify parent (by writing "available") when it's available again. */
75 int max_requests; /**< How many requests a child can process before terminating. */
76 const char* appname; /**< Name of the application. */
77 int keepalive; /**< Keepalive time for stateful sessions. */
78 struct prefork_child_struct* next; /**< Next child in whichever list (active, idle, or free) holds this one. */
79 struct prefork_child_struct* prev; /**< Previous child; used only on the circular active list (set to NULL when moved to the idle list). */
82 typedef struct prefork_child_struct prefork_child;
84 /** Boolean. Set to true by sigchld_handler() when it traps SIGCHLD; reset by reap_children(). */
85 static volatile sig_atomic_t child_dead;
87 static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
88 int max_requests, int min_children, int max_children );
89 static prefork_child* launch_child( prefork_simple* forker );
90 static void prefork_launch_children( prefork_simple* forker );
91 static void prefork_run( prefork_simple* forker );
92 static void add_prefork_child( prefork_simple* forker, prefork_child* child );
94 static void del_prefork_child( prefork_simple* forker, pid_t pid );
95 static int check_children( prefork_simple* forker, int forever );
96 static int prefork_child_process_request( prefork_child*, char* data );
97 static int prefork_child_init_hook( prefork_child* );
98 static prefork_child* prefork_child_init( prefork_simple* forker,
99 int read_data_fd, int write_data_fd,
100 int read_status_fd, int write_status_fd );
102 /* Child side: wait for requests on the child's read_data_fd and service them */
103 static void prefork_child_wait( prefork_child* child );
104 static void prefork_clear( prefork_simple*, bool graceful);
105 static void prefork_child_free( prefork_simple* forker, prefork_child* );
106 static void osrf_prefork_register_routers( const char* appname, bool unregister );
107 static void osrf_prefork_child_exit( prefork_child* );
// Signal handlers, installed by osrf_prefork_run() (SIGUSR1/2, SIGTERM, SIGINT, SIGQUIT)
// and launch_child() (SIGCHLD).
109 static void sigchld_handler( int sig );
110 static void sigusr1_handler( int sig );
111 static void sigusr2_handler( int sig );
112 static void sigterm_handler( int sig );
113 static void sigint_handler( int sig );
115 /** Maintain a global pointer to the prefork_simple object
116 * for the current process so we can refer to it later
117 * for signal handling. There will only ever be one
118 * forker per process.
120 static prefork_simple *global_forker = NULL;
123 @brief Spawn and manage a collection of drone processes for servicing requests.
124 @param appname Name of the application.
125 @return 0 if successful, or -1 if error.
127 int osrf_prefork_run( const char* appname ) {
130 osrfLogError( OSRF_LOG_MARK, "osrf_prefork_run requires an appname to run!");
134 set_proc_title( "OpenSRF Listener [%s]", appname );
141 // Get configuration settings
142 osrfLogInfo( OSRF_LOG_MARK, "Loading config in osrf_forker for app %s", appname );
144 char* max_req = osrf_settings_host_value( "/apps/%s/unix_config/max_requests", appname );
145 char* min_children = osrf_settings_host_value( "/apps/%s/unix_config/min_children", appname );
146 char* max_children = osrf_settings_host_value( "/apps/%s/unix_config/max_children", appname );
147 char* keepalive = osrf_settings_host_value( "/apps/%s/keepalive", appname );
// Each setting that is not defined falls back to a default, with a warning.
// NOTE(review): atoi() maps malformed text to 0 without complaint; strtol() with an
// end-pointer check would let bad configuration values be rejected or reported.
150 osrfLogWarning( OSRF_LOG_MARK, "Keepalive is not defined, assuming %d", kalive );
152 kalive = atoi( keepalive );
155 osrfLogWarning( OSRF_LOG_MARK, "Max requests not defined, assuming %d", maxr );
157 maxr = atoi( max_req );
160 osrfLogWarning( OSRF_LOG_MARK, "Min children not defined, assuming %d", minc );
162 minc = atoi( min_children );
165 osrfLogWarning( OSRF_LOG_MARK, "Max children not defined, assuming %d", maxc );
167 maxc = atoi( max_children );
171 free( min_children );
172 free( max_children );
173 /* --------------------------------------------------- */
// Jabber resource name for the listener: "<appname>_listener".
175 char* resc = va_list_to_string( "%s_listener", appname );
177 // Make sure that we haven't already booted
178 if( !osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
179 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()" );
186 prefork_simple forker;
188 if( prefork_simple_init( &forker, osrfSystemGetTransportClient(), maxr, minc, maxc )) {
189 osrfLogError( OSRF_LOG_MARK,
190 "osrf_prefork_run() failed to create prefork_simple object" );
194 // Finish initializing the prefork_simple.
195 forker.appname = strdup( appname );
196 forker.keepalive = kalive;
// Publish the forker for the signal handlers. It lives in this stack frame, so
// global_forker must not be used once osrf_prefork_run() has returned.
197 global_forker = &forker;
199 // Spawn the children; put them in the idle list.
200 prefork_launch_children( &forker );
202 // Tell the router that you're open for business.
203 osrf_prefork_register_routers( appname, false );
// NOTE(review): these handlers log, send router messages, and tear down the forker
// directly in signal context, none of which is async-signal-safe. A safer design sets
// a volatile sig_atomic_t flag in the handler and acts on it from prefork_run().
205 signal( SIGUSR1, sigusr1_handler);
206 signal( SIGUSR2, sigusr2_handler);
207 signal( SIGTERM, sigterm_handler);
208 signal( SIGINT, sigint_handler );
209 signal( SIGQUIT, sigint_handler );
211 // Sit back and let the requests roll in
212 osrfLogInfo( OSRF_LOG_MARK, "Launching osrf_forker for app %s", appname );
213 prefork_run( &forker );
// prefork_run() is expected to loop indefinitely; getting here is logged as unexpected.
215 osrfLogWarning( OSRF_LOG_MARK, "prefork_run() returned - how??" );
// Non-graceful teardown, the same one the SIGINT/SIGQUIT handler performs.
216 prefork_clear( &forker, false );
221 @brief Register the application with a specified router.
222 @param appname Name of the application.
223 @param routerName Name of the router.
224 @param routerDomain Domain of the router.
226 Tell the router that you're open for business so that it can route requests to you.
228 Called only by the parent process.
230 static void osrf_prefork_send_router_registration(
231 const char* appname, const char* routerName,
232 const char* routerDomain, bool unregister ) {
// unregister: if true, send an "unregister" request instead of a "register" request.
// NOTE(review): routerName and routerDomain are formatted with %s below; callers
// should not pass NULL.
234 // Get a pointer to the global transport_client
235 transport_client* client = osrfSystemGetTransportClient();
237 // Construct the Jabber address of the router
238 char* jid = va_list_to_string( "%s@%s/router", routerName, routerDomain );
240 // Create the registration message, and send it
241 transport_message* msg;
244 osrfLogInfo( OSRF_LOG_MARK, "%s un-registering with router %s", appname, jid );
245 msg = message_init( "unregistering", NULL, NULL, jid, NULL );
246 message_set_router_info( msg, NULL, NULL, appname, "unregister", 0 );
250 osrfLogInfo( OSRF_LOG_MARK, "%s registering with router %s", appname, jid );
251 msg = message_init( "registering", NULL, NULL, jid, NULL );
252 message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
// The result of the send is not checked, so a failed (un)registration goes unreported.
255 client_send_message( client, msg );
263 @brief Register with a router, or not, according to some config settings.
264 @param appname Name of the application
265 @param RouterChunk A representation of part of the config file.
267 Parse a "complex" router configuration chunk.
269 Examine the services listed for a given router (normally in opensrf_core.xml). If
270 there is an entry for this service, or if there are @em no services listed, then
271 register with this router. Otherwise don't.
273 Called only by the parent process.
275 static void osrf_prefork_parse_router_chunk(
276 const char* appname, const jsonObject* routerChunk, bool unregister ) {
278 const char* routerName = jsonObjectGetString( jsonObjectGetKeyConst( routerChunk, "name" ));
279 const char* domain = jsonObjectGetString( jsonObjectGetKeyConst( routerChunk, "domain" ));
280 const jsonObject* services = jsonObjectGetKeyConst( routerChunk, "services" );
281 osrfLogDebug( OSRF_LOG_MARK, "found router config with domain %s and name %s",
282 routerName, domain );
284 if( services && services->type == JSON_HASH ) {
285 osrfLogDebug( OSRF_LOG_MARK, "investigating router information..." );
286 const jsonObject* service_obj = jsonObjectGetKeyConst( services, "service" );
288 ; // do nothing (shouldn't happen)
289 else if( JSON_ARRAY == service_obj->type ) {
290 // There are multiple services listed. Register with this router
291 // if and only if this service is on the list.
293 for( j = 0; j < service_obj->size; j++ ) {
294 const char* service = jsonObjectGetString( jsonObjectGetIndex( service_obj, j ));
295 if( service && !strcmp( appname, service ))
296 osrf_prefork_send_router_registration( appname, routerName, domain, unregister );
299 else if( JSON_STRING == service_obj->type ) {
300 // There's only one service listed. Register with this router
301 // if and only if this service is the one listed.
302 if( !strcmp( appname, jsonObjectGetString( service_obj )) )
303 osrf_prefork_send_router_registration( appname, routerName, domain, unregister );
306 // This router is not restricted to any set of services,
307 // so go ahead and register with it.
308 osrf_prefork_send_router_registration( appname, routerName, domain, unregister );
313 @brief Register the application with one or more routers, according to the configuration.
314 @param appname Name of the application.
316 Called only by the parent process.
318 static void osrf_prefork_register_routers( const char* appname, bool unregister ) {
320 jsonObject* routerInfo = osrfConfigGetValueObject( NULL, "/routers/router" );
323 for( i = 0; i < routerInfo->size; i++ ) {
324 const jsonObject* routerChunk = jsonObjectGetIndex( routerInfo, i );
326 if( routerChunk->type == JSON_STRING ) {
327 /* this accomodates simple router configs */
328 char* routerName = osrfConfigGetValue( NULL, "/router_name" );
329 char* domain = osrfConfigGetValue( NULL, "/routers/router" );
330 osrfLogDebug( OSRF_LOG_MARK, "found simple router settings with router name %s",
332 osrf_prefork_send_router_registration( appname, routerName, domain, unregister );
337 osrf_prefork_parse_router_chunk( appname, routerChunk, unregister );
341 jsonObjectFree( routerInfo );
345 @brief Initialize a child process.
346 @param child Pointer to the prefork_child representing the new child process.
347 @return Zero if successful, or -1 if not.
349 Called only by child processes. Actions:
350 - Connect to one or more cache servers
351 - Reconfigure logger, if necessary
352 - Discard parent's Jabber connection and open a new one
353 - Dynamically call an application-specific initialization routine
354 - Change the command line as reported by ps
356 static int prefork_child_init_hook( prefork_child* child ) {
358 if( !child ) return -1;
359 osrfLogDebug( OSRF_LOG_MARK, "Child init hook for child %d", child->pid );
361 // Connect to cache server(s).
362 osrfSystemInitCache();
// Jabber resource name for this drone: "<appname>_drone".
363 char* resc = va_list_to_string( "%s_drone", child->appname );
365 // If we're a source-client, tell the logger now that we're a new process.
366 char* isclient = osrfConfigGetValue( NULL, "/client" );
367 if( isclient && !strcasecmp( isclient,"true" ))
368 osrfLogSetIsClient( 1 );
371 // Remove traces of our parent's socket connection so we can have our own.
372 // TODO: not necessary if parent disconnects first
373 osrfSystemIgnoreTransportClient();
376 if( !osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
// NOTE(review): this message names osrf_prefork_run(), but the failure happens in
// the child's init hook; a more specific message would ease diagnosis.
377 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()" );
384 // Dynamically call the application-specific initialization function
385 // from a previously loaded shared library.
// osrfAppRunChildInit() returns 0 on success, hence the negation.
386 if( ! osrfAppRunChildInit( child->appname )) {
// NOTE(review): the trailing "\n" in these two log messages is probably redundant
// if the logger terminates each entry itself; confirm and drop.
387 osrfLogDebug( OSRF_LOG_MARK, "Prefork child_init succeeded\n" );
389 osrfLogError( OSRF_LOG_MARK, "Prefork child_init failed\n" );
393 // Change the command line as reported by ps
394 set_proc_title( "OpenSRF Drone [%s]", child->appname );
399 @brief Respond to a client request forwarded by the parent.
400 @param child Pointer to the state of the child process.
401 @param data Pointer to the raw XMPP message received from the parent.
402 @return 0 on success; non-zero means that the child process should clean itself up
403 and terminate immediately, presumably due to a fatal error condition.
405 Called only by a child process.
407 static int prefork_child_process_request( prefork_child* child, char* data ) {
// A NULL child is reported as success: nothing to do, keep running.
408 if( !child ) return 0;
410 transport_client* client = osrfSystemGetTransportClient();
412 // Make sure that we're still connected to Jabber; reconnect if necessary.
413 if( !client_connected( client )) {
414 osrfSystemIgnoreTransportClient();
415 osrfLogWarning( OSRF_LOG_MARK, "Reconnecting child to opensrf after disconnect..." );
// NOTE(review): unlike prefork_child_init_hook(), this reconnect does not pass the
// "<appname>_drone" resource; confirm that is intended.
416 if( !osrf_system_bootstrap_client( NULL, NULL )) {
417 osrfLogError( OSRF_LOG_MARK,
418 "Unable to bootstrap client in prefork_child_process_request()" );
// Without a connection the drone cannot do useful work, so it shuts itself down.
420 osrf_prefork_child_exit( child );
424 // Construct the message from the xml.
// NOTE(review): confirm how a malformed request is handled; the NULL handling for
// msg and session is not shown here.
425 transport_message* msg = new_message_from_xml( data );
427 // Respond to the transport message. This is where method calls are buried.
428 osrfAppSession* session = osrf_stack_transport_handler( msg, child->appname );
// session->panic is set when a method decided the drone must terminate immediately.
432 int rc = session->panic;
435 osrfLogWarning( OSRF_LOG_MARK,
436 "Drone for session %s terminating immediately", session->session_id );
437 osrfAppSessionFree( session );
441 if( session->stateless && session->state != OSRF_SESSION_CONNECTED ) {
442 // We're no longer connected to the client, which presumably means that
443 // we're done with this request. Bail out.
444 osrfAppSessionFree( session );
448 // If we get this far, then the client has opened an application connection so that it
449 // can send multiple requests directly to the same server drone, bypassing the router
450 // and the listener. For example, it may need to do a database transaction, requiring
451 // multiple method calls within the same database session.
453 // Hence we go into a loop, responding to successive requests from the same client, until
454 // either the client disconnects or an error occurs.
456 osrfLogDebug( OSRF_LOG_MARK, "Entering keepalive loop for session %s", session->session_id );
// Seconds to wait for each follow-up request before giving up on the stateful session.
457 int keepalive = child->keepalive;
465 // Respond to any input messages. This is where the method calls are buried.
466 osrfLogDebug( OSRF_LOG_MARK,
467 "osrf_prefork calling queue_wait [%d] in keepalive loop", keepalive );
468 start = time( NULL );
469 retval = osrf_app_session_queue_wait( session, keepalive, &recvd );
472 osrfLogDebug( OSRF_LOG_MARK, "Data received == %d", recvd );
474 // Now we check a number of possible reasons to exit the loop.
476 // If the method call decided to terminate immediately,
477 // note that for future reference.
481 // If an error occurred when we tried to service the request, exit the loop.
483 osrfLogError( OSRF_LOG_MARK, "queue-wait returned non-success %d", retval );
487 // If the client disconnected, exit the loop.
488 if( session->state != OSRF_SESSION_CONNECTED )
491 // If we timed out while waiting for a request, exit the loop.
492 if( !recvd && (end - start) >= keepalive ) {
493 osrfLogInfo( OSRF_LOG_MARK,
494 "No request was received in %d seconds, exiting stateful session", keepalive );
495 osrfAppSessionStatus(
499 0, "Disconnected on timeout" );
504 // If the child process has decided to terminate immediately, exit the loop.
509 osrfLogDebug( OSRF_LOG_MARK, "Exiting keepalive loop for session %s", session->session_id );
510 osrfAppSessionFree( session );
515 @brief Partially initialize a prefork_simple provided by the caller.
516 @param prefork Pointer to a raw prefork_simple to be initialized.
517 @param client Pointer to a transport_client (connection to Jabber).
518 @param max_requests The maximum number of requests that a child process may service
520 @param min_children Minimum number of child processes to maintain.
521 @param max_children Maximum number of child processes to maintain.
522 @return 0 if successful, or 1 if not (due to invalid parameters).
524 static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
525 int max_requests, int min_children, int max_children ) {
// NOTE(review): only min > max and max > ABS_MAX_CHILDREN are rejected; zero or
// negative values (e.g. atoi() of a malformed setting) pass through unchecked.
527 if( min_children > max_children ) {
528 osrfLogError( OSRF_LOG_MARK, "min_children (%d) is greater "
529 "than max_children (%d)", min_children, max_children );
533 if( max_children > ABS_MAX_CHILDREN ) {
534 osrfLogError( OSRF_LOG_MARK, "max_children (%d) is greater than ABS_MAX_CHILDREN (%d)",
535 max_children, ABS_MAX_CHILDREN );
// NOTE(review): the two literals below concatenate without a space, so the log
// reads "max_request=N,min_children=M".
539 osrfLogInfo( OSRF_LOG_MARK, "Prefork launching child with max_request=%d,"
540 "min_children=%d, max_children=%d", max_requests, min_children, max_children );
542 /* flesh out the struct */
543 prefork->max_requests = max_requests;
544 prefork->min_children = min_children;
545 prefork->max_children = max_children;
547 prefork->data_to_child = 0;
548 prefork->data_to_parent = 0;
549 prefork->current_num_children = 0;
// appname and keepalive are left for the caller to fill in (hence "partially").
550 prefork->keepalive = 0;
551 prefork->appname = NULL;
552 prefork->first_child = NULL;
553 prefork->idle_list = NULL;
554 prefork->free_list = NULL;
// The client pointer is stored, not copied.
555 prefork->connection = client;
561 @brief Spawn a new child process and put it in the idle list.
562 @param forker Pointer to the prefork_simple that will own the process.
563 @return In the parent, a pointer to the new prefork_child (presumably NULL if pipe() or fork() fails; confirm). In the child, this function does not return.
565 Spawn a new child process. Create a prefork_child for it and put it in the idle list.
567 After forking, the parent returns a pointer to the new prefork_child. The child
568 services its quota of requests and then terminates without returning.
570 static prefork_child* launch_child( prefork_simple* forker ) {
576 // Set up the data and status pipes
577 if( pipe( data_fd ) < 0 ) { /* build the data pipe*/
578 osrfLogError( OSRF_LOG_MARK, "Pipe making error" );
// NOTE(review): make sure data_fd is closed when this second pipe() fails.
582 if( pipe( status_fd ) < 0 ) {/* build the status pipe */
583 osrfLogError( OSRF_LOG_MARK, "Pipe making error" );
589 osrfLogInternal( OSRF_LOG_MARK, "Pipes: %d %d %d %d",
590 data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
592 // Create and initialize a prefork_child for the new process
// NOTE(review): the result is used below without a NULL check; confirm that
// prefork_child_init() cannot fail.
593 prefork_child* child = prefork_child_init( forker, data_fd[0],
594 data_fd[1], status_fd[0], status_fd[1] );
596 if( (pid=fork()) < 0 ) {
597 osrfLogError( OSRF_LOG_MARK, "Forking Error" );
598 prefork_child_free( forker, child );
// This runs in both processes; only the parent's copy of the list is ever used.
602 // Add the new child to the head of the idle list
603 child->next = forker->idle_list;
604 forker->idle_list = child;
606 if( pid > 0 ) { /* parent */
// (Re)install the handler that sets child_dead, so dead children get reaped.
608 signal( SIGCHLD, sigchld_handler );
609 ( forker->current_num_children )++;
612 osrfLogDebug( OSRF_LOG_MARK, "Parent launched %d", pid );
613 /* *no* child pipe FD's can be closed or the parent will re-use fd's that
614 the children are currently using */
620 // we don't want to adopt our parent's handlers.
621 signal( SIGUSR1, SIG_DFL );
622 signal( SIGUSR2, SIG_DFL );
623 signal( SIGTERM, SIG_DFL );
624 signal( SIGINT, SIG_DFL );
625 signal( SIGQUIT, SIG_DFL );
626 signal( SIGCHLD, SIG_DFL );
628 osrfLogInternal( OSRF_LOG_MARK,
629 "I am new child with read_data_fd = %d and write_status_fd = %d",
630 child->read_data_fd, child->write_status_fd );
632 child->pid = getpid();
// The child keeps only its own ends: it reads requests and writes status.
633 close( child->write_data_fd );
634 close( child->read_status_fd );
637 if( prefork_child_init_hook( child ) == -1 ) {
638 osrfLogError( OSRF_LOG_MARK,
639 "Forker child going away because we could not connect to OpenSRF..." );
640 osrf_prefork_child_exit( child );
643 prefork_child_wait( child ); // Should exit without returning
644 osrf_prefork_child_exit( child ); // Just to be sure
645 return NULL; // Unreachable, but it keeps the compiler happy
650 @brief Terminate a child process.
651 @param child Pointer to the prefork_child representing the child process (not used).
653 Called only by child processes. Dynamically call an application-specific shutdown
654 function from a previously loaded shared library; then exit.
656 static void osrf_prefork_child_exit( prefork_child* child ) {
// Let the application clean up before the process goes away.
657 osrfAppRunExitCode();
662 @brief Launch the initial min_children child processes, putting them in the idle list.
663 @param forker Pointer to the prefork_simple that will own the children.
665 Called only by the parent process (in order to become a parent).
667 static void prefork_launch_children( prefork_simple* forker ) {
668 if( !forker ) return;
// Make exactly min_children launch attempts. A failed launch is not retried here;
// reap_children() tops up the pool later.
670 while( c++ < forker->min_children )
671 launch_child( forker );
675 @brief Signal handler for SIGCHLD: note that a child process has terminated.
676 @param sig The value of the trapped signal; always SIGCHLD.
678 Set a boolean (child_dead) to be checked later; reap_children() does the actual work.
680 static void sigchld_handler( int sig ) {
// Re-arm the handler: with System V-style signal() semantics the disposition is
// reset to SIG_DFL when the signal is delivered.
681 signal( SIGCHLD, sigchld_handler );
686 @brief Signal handler for SIGUSR1
687 @param sig The value of the trapped signal; always SIGUSR1.
689 Send an unregister command to every router named in the configuration.
691 static void sigusr1_handler( int sig ) {
// Nothing to do until osrf_prefork_run() has published the forker.
// NOTE(review): this early return also skips re-arming the handler below.
692 if (!global_forker) return;
// NOTE(review): building and sending router messages is not async-signal-safe;
// consider setting a flag here and doing the work from the main loop.
693 osrf_prefork_register_routers(global_forker->appname, true);
694 signal( SIGUSR1, sigusr1_handler );
698 @brief Signal handler for SIGUSR2
699 @param sig The value of the trapped signal; always SIGUSR2.
701 Send a register command to every router named in the configuration.
703 static void sigusr2_handler( int sig ) {
// Nothing to do until osrf_prefork_run() has published the forker.
// NOTE(review): this early return also skips re-arming the handler below.
704 if (!global_forker) return;
// NOTE(review): building and sending router messages is not async-signal-safe;
// consider setting a flag here and doing the work from the main loop.
705 osrf_prefork_register_routers(global_forker->appname, false);
706 signal( SIGUSR2, sigusr2_handler );
710 @brief Signal handler for SIGTERM
711 @param sig The value of the trapped signal; always SIGTERM
713 Perform a graceful prefork server shutdown.
715 static void sigterm_handler(int sig) {
716 if (!global_forker) return;
// NOTE(review): logging and prefork_clear() are not async-signal-safe; if the signal
// arrives while prefork_run() is relinking children, the lists may be inconsistent.
717 osrfLogInfo(OSRF_LOG_MARK, "server: received SIGTERM, shutting down");
718 prefork_clear(global_forker, true);
723 @brief Signal handler for SIGINT or SIGQUIT
724 @param sig The value of the trapped signal
726 Perform a non-graceful prefork server shutdown.
728 static void sigint_handler(int sig) {
729 if (!global_forker) return;
// NOTE(review): logging and prefork_clear() are not async-signal-safe; if the signal
// arrives while prefork_run() is relinking children, the lists may be inconsistent.
730 osrfLogInfo(OSRF_LOG_MARK, "server: received SIGINT/QUIT, shutting down");
731 prefork_clear(global_forker, false);
736 @brief Replenish the collection of child processes, after one has terminated.
737 @param forker Pointer to the prefork_simple that manages the child processes.
739 The parent calls this function when it notices (via a signal handler) that
740 a child process has died.
742 Wait on the dead children so that they won't be zombies. Spawn new ones as needed
743 to maintain at least a minimum number.
745 void reap_children( prefork_simple* forker ) {
749 // Reset our boolean so that we can detect any further terminations.
752 // Bury the children so that they won't be zombies. WNOHANG means that waitpid() returns
753 // immediately if there are no waitable children, instead of waiting for more to die.
754 // Ignore the return code of the child. We don't do an autopsy.
755 while( (child_pid = waitpid( -1, NULL, WNOHANG )) > 0 ) {
756 --forker->current_num_children;
757 del_prefork_child( forker, child_pid );
760 // Spawn more children as needed.
761 while( forker->current_num_children < forker->min_children )
762 launch_child( forker );
766 @brief Read transport_messages and dispatch them to child processes for servicing.
767 @param forker Pointer to the prefork_simple that manages the child processes.
769 This is the main loop of the parent process, and once entered, does not exit.
771 For each usable transport_message received: look for an idle child to service it. If
772 no idle children are available, either spawn a new one or, if we've already spawned the
773 maximum number of children, wait for one to become available. Once a child is available
774 by whatever means, write an XML version of the input message, to a pipe designated for
777 static void prefork_run( prefork_simple* forker ) {
779 if( NULL == forker->idle_list )
780 return; // No available children, and we haven't even started yet
782 transport_message* cur_msg = NULL;
786 if( forker->first_child == NULL && forker->idle_list == NULL ) {/* no more children */
787 osrfLogWarning( OSRF_LOG_MARK, "No more children..." );
791 // Wait indefinitely for an input message
792 osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data..." );
793 cur_msg = client_recv( forker->connection, -1 );
795 if( cur_msg == NULL )
796 continue; // Error? Interrupted by a signal? Try again...
798 message_prepare_xml( cur_msg );
799 const char* msg_data = cur_msg->msg_xml;
800 if( ! msg_data || ! *msg_data ) {
801 osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %",
802 (msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread );
803 message_free( cur_msg );
804 continue; // Message not usable; go on to the next one.
807 int honored = 0; /* will be set to true when we service the request */
813 if(check_children( forker, 0 ) < 0) {
814 continue; // check failed, try again
819 osrfLogDebug( OSRF_LOG_MARK, "Server received inbound data" );
821 prefork_child* cur_child = NULL;
823 // Look for an available child in the idle list. Since the idle list operates
824 // as a stack, the child we get is the one that was most recently active, or
825 // most recently spawned. That means it's the one most likely still to be in
826 // physical memory, and the one least likely to have to be swapped in.
827 while( forker->idle_list ) {
829 osrfLogDebug( OSRF_LOG_MARK, "Looking for idle child" );
830 // Grab the prefork_child at the head of the idle list
831 cur_child = forker->idle_list;
832 forker->idle_list = cur_child->next;
833 cur_child->next = NULL;
835 osrfLogInternal( OSRF_LOG_MARK,
836 "Searching for available child. cur_child->pid = %d", cur_child->pid );
837 osrfLogInternal( OSRF_LOG_MARK, "Current num children %d",
838 forker->current_num_children );
840 osrfLogDebug( OSRF_LOG_MARK, "forker sending data to %d", cur_child->pid );
841 osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d",
842 cur_child->write_data_fd );
844 int written = write( cur_child->write_data_fd, msg_data, strlen( msg_data ) + 1 );
846 // This child appears to be dead or unusable. Discard it.
847 osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d: %s",
848 errno, strerror( errno ));
849 kill( cur_child->pid, SIGKILL );
850 del_prefork_child( forker, cur_child->pid );
854 add_prefork_child( forker, cur_child ); // Add it to active list
859 /* if none available, add a new child if we can */
861 osrfLogDebug( OSRF_LOG_MARK, "Not enough children, attempting to add..." );
863 if( forker->current_num_children < forker->max_children ) {
864 osrfLogDebug( OSRF_LOG_MARK, "Launching new child with current_num = %d",
865 forker->current_num_children );
867 launch_child( forker ); // Put a new child into the idle list
868 if( forker->idle_list ) {
870 // Take the new child from the idle list
871 prefork_child* new_child = forker->idle_list;
872 forker->idle_list = new_child->next;
873 new_child->next = NULL;
875 osrfLogDebug( OSRF_LOG_MARK, "Writing to new child fd %d : pid %d",
876 new_child->write_data_fd, new_child->pid );
879 new_child->write_data_fd, msg_data, strlen( msg_data ) + 1 );
881 // This child appears to be dead or unusable. Discard it.
882 osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d: %s",
883 errno, strerror( errno ));
884 kill( cur_child->pid, SIGKILL );
885 del_prefork_child( forker, cur_child->pid );
887 add_prefork_child( forker, new_child );
892 osrfLogWarning( OSRF_LOG_MARK, "Could not launch a new child as %d children "
893 "were already running; consider increasing max_children for this "
894 "application higher than %d in the OpenSRF configuration if this "
895 "message occurs frequently",
896 forker->current_num_children, forker->max_children );
901 osrfLogWarning( OSRF_LOG_MARK, "No children available, waiting..." );
902 if( check_children( forker, 1 ) >= 0 ) {
903 // Tell the loop not to call check_children again, since we just successfully called it
909 reap_children( forker );
911 } // end while( ! honored )
913 message_free( cur_msg );
915 } /* end top level listen loop */
920 @brief See if any children have become available.
921 @param forker Pointer to the prefork_simple that owns the children.
922 @param forever Boolean: true if we should wait indefinitely.
923 @return 0 or greater if successful, -1 on select error/interrupt
925 Call select() for all the children in the active list. Read each active file
926 descriptor and move the corresponding child to the idle list.
928 If @a forever is true, wait indefinitely for input. Otherwise return immediately if
929 there are no active file descriptors.
931 static int check_children( prefork_simple* forker, int forever ) {
// Bury (and replace) any dead children before examining the lists.
934 reap_children( forker );
936 if( NULL == forker->first_child ) {
937 // If forever is true, then we're here because we've run out of idle
938 // processes, so there should be some active ones around, except during
939 // graceful shutdown, as we wait for all active children to become idle.
940 // If forever is false, then the children may all be idle, and that's okay.
942 osrfLogDebug( OSRF_LOG_MARK, "No active child processes to check" );
948 FD_ZERO( &read_set );
952 // Prepare to select() on pipes from all the active children
// NOTE(review): the parent keeps all four pipe descriptors of every child open (see
// launch_child()), so with up to ABS_MAX_CHILDREN children the descriptor numbers can
// exceed FD_SETSIZE; FD_SET() on such a descriptor is undefined behavior. poll() has
// no such limit.
953 prefork_child* cur_child = forker->first_child;
955 if( cur_child->read_status_fd > max_fd )
956 max_fd = cur_child->read_status_fd;
957 FD_SET( cur_child->read_status_fd, &read_set );
958 cur_child = cur_child->next;
959 } while( cur_child != forker->first_child );
961 FD_CLR( 0, &read_set ); /* just to be sure */
965 if( (select_ret=select( max_fd + 1, &read_set, NULL, NULL, NULL )) == -1 ) {
966 osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children: %s",
967 errno, strerror( errno ));
969 osrfLogInfo( OSRF_LOG_MARK,
970 "select() completed after waiting on children to become available" );
978 if( (select_ret=select( max_fd + 1, &read_set, NULL, NULL, &tv )) == -1 ) {
979 osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children: %s",
980 errno, strerror( errno ));
984 if( select_ret <= 0 ) // we're done here
987 // Check each child in the active list.
988 // If it has responded, move it to the idle list.
989 cur_child = forker->first_child;
990 prefork_child* next_child = NULL;
993 next_child = cur_child->next;
994 if( FD_ISSET( cur_child->read_status_fd, &read_set )) {
995 osrfLogDebug( OSRF_LOG_MARK,
996 "Server received status from a child %d", cur_child->pid );
1000 /* now suck off the data */
1002 if( (n=read( cur_child->read_status_fd, buf, sizeof( buf ) - 1 )) < 0 ) {
1003 osrfLogWarning( OSRF_LOG_MARK,
1004 "Read error after select in child status read with errno %d: %s",
1005 errno, strerror( errno ));
1009 osrfLogDebug( OSRF_LOG_MARK, "Read %d bytes from status buffer: %s", n, buf );
1012 // Remove the child from the active list
1013 if( forker->first_child == cur_child ) {
1014 if( cur_child->next == cur_child )
1015 forker->first_child = NULL; // only child in the active list
1017 forker->first_child = cur_child->next;
1019 cur_child->next->prev = cur_child->prev;
1020 cur_child->prev->next = cur_child->next;
1022 // Add it to the idle list
1023 cur_child->prev = NULL;
1024 cur_child->next = forker->idle_list;
1025 forker->idle_list = cur_child;
1027 cur_child = next_child;
// When the head of the list is moved to the idle list, first_child becomes next_child
// and the loop stops early. Any other ready children still have unread status data,
// so the next select() reports them again.
1028 } while( forker->first_child && forker->first_child != next_child );
1034 @brief Service up to a set maximum number of requests, then shut down.
1035 @param child Pointer to the prefork_child representing the child process.
1037 Called only by child process.
1039 Enter a loop, for up to max_requests iterations. On each iteration:
1040 - Wait indefinitely for a request from the parent.
1041 - Service the request.
1042 - Increment a counter. If the limit hasn't been reached, notify the parent that you
1043 are available for another request.
1045 After exiting the loop, shut down and terminate the process.
1047 static void prefork_child_wait( prefork_child* child ) {
// One growing_buffer is reused for every request. It is reset after each processed
// request and released with buffer_free() before the child exits.
1050 growing_buffer* gbuf = buffer_init( READ_BUFSIZE );
1051 char buf[READ_BUFSIZE];
1053 for( i = 0; i < child->max_requests; i++ ) {
1056 int gotdata = 0; // boolean; set to true if we get data
// clr_fl()/set_fl() are defined elsewhere; presumably they clear/set the given
// file-status flag. Clearing O_NONBLOCK makes the first read wait for a request.
1057 clr_fl( child->read_data_fd, O_NONBLOCK );
1059 // Read a request from the parent, via a pipe, into a growing_buffer.
1060 while( (n = read( child->read_data_fd, buf, READ_BUFSIZE-1 )) > 0 ) {
1062 osrfLogDebug( OSRF_LOG_MARK, "Prefork child read %d bytes of data", n );
// Once data has arrived, switch to non-blocking reads so the rest of the request
// can be drained. EAGAIN (checked below) appears to mark "pipe is empty".
1064 set_fl( child->read_data_fd, O_NONBLOCK );
1067 buffer_add_n( gbuf, buf, n );
1070 if( errno == EAGAIN )
// A broken pipe to the parent is reported, and the child then exits.
1073 if( errno == EPIPE ) {
1074 osrfLogDebug( OSRF_LOG_MARK, "C child attempted read on broken pipe, exiting..." );
// Set from prefork_child_process_request(); nonzero means "stop serving now".
1078 int terminate_now = 0; // Boolean
1081 osrfLogWarning( OSRF_LOG_MARK,
1082 "Prefork child read returned error with errno %d", errno );
1085 } else if( gotdata ) {
1086 // Process the request
1087 osrfLogDebug( OSRF_LOG_MARK, "Prefork child got a request.. processing.." );
1088 terminate_now = prefork_child_process_request( child, gbuf->buf );
1089 buffer_reset( gbuf );
1092 if( terminate_now ) {
1093 // We're terminating prematurely -- presumably due to a fatal error condition.
1094 osrfLogWarning( OSRF_LOG_MARK, "Prefork child terminating abruptly" );
// After the final allowed request there is no notification: the loop is about
// to end and the child exits.
1098 if( i < child->max_requests - 1 ) {
1099 // Report back to the parent for another request.
// A short or failed write means the parent cannot learn that we are free,
// so the child gives up and exits.
1101 ssize_t len = write(
1102 child->write_status_fd, "available" /*less than 64 bytes*/, msg_len );
1103 if( len != msg_len ) {
1104 osrfLogError( OSRF_LOG_MARK,
1105 "Drone terminating: unable to notify listener of availability: %s",
1107 buffer_free( gbuf );
1108 osrf_prefork_child_exit( child );
// Out of the request loop: release the buffer and terminate the child process.
1113 buffer_free( gbuf );
1115 osrfLogDebug( OSRF_LOG_MARK, "Child with max-requests=%d, num-served=%d exiting...[%ld]",
1116 child->max_requests, i, (long) getpid());
1118 osrf_prefork_child_exit( child );
1122 @brief Add a prefork_child to the end of the active list.
1123 @param forker Pointer to the prefork_simple that owns the list.
1124 @param child Pointer to the prefork_child to be added.
1126 static void add_prefork_child( prefork_simple* forker, prefork_child* child ) {
1128 if( forker->first_child == NULL ) {
1129 // Simplest case: list is initially empty.
// A one-node circular list points to itself in both directions.
1130 forker->first_child = child;
1131 child->next = child;
1132 child->prev = child;
1134 // Find the last node in the circular list.
// In a circular doubly-linked list the head's prev is the tail, so no walk is needed.
1135 prefork_child* last_child = forker->first_child->prev;
1137 // Insert the new child between the last and first children.
// first_child is left unchanged, so the new child becomes the new tail.
1138 last_child->next = child;
1139 child->prev = last_child;
1140 child->next = forker->first_child;
1141 forker->first_child->prev = child;
1146 @brief Delete and destroy a dead child from our list.
1147 @param forker Pointer to the prefork_simple that owns the dead child.
1148 @param pid Process ID of the dead child.
1150 Look for the dead child first in the list of active children. If you don't find it
1151 there, look in the list of idle children. If you find it, remove it from whichever
1152 list it's on, and destroy it.
1154 static void del_prefork_child( prefork_simple* forker, pid_t pid ) {
1156 osrfLogDebug( OSRF_LOG_MARK, "Deleting Child: %d", pid );
1158 prefork_child* cur_child = NULL;
1160 // Look first in the active list
1161 if( forker->first_child ) {
1162 cur_child = forker->first_child; /* current pointer */
// Walk the circular list until pid matches or we reach the tail (the node whose
// next is the head). Afterwards cur_child is either the match or the tail.
1163 while( cur_child->pid != pid && cur_child->next != forker->first_child )
1164 cur_child = cur_child->next;
1166 if( cur_child->pid == pid ) {
1167 // We found the right node. Remove it from the list.
1168 if( cur_child->next == cur_child )
1169 forker->first_child = NULL; // only child in the list
1171 if( forker->first_child == cur_child )
1172 forker->first_child = cur_child->next; // Reseat forker->first_child
1174 // Stitch the adjacent nodes together
1175 cur_child->prev->next = cur_child->next;
1176 cur_child->next->prev = cur_child->prev;
1179 cur_child = NULL; // Didn't find it in the active list
1183 // Maybe it's in the idle list. This can happen if, for example,
1184 // a child is killed by a signal while it's between requests.
// The idle list is singly linked, so track the predecessor (prev) for unlinking.
1186 prefork_child* prev = NULL;
1187 cur_child = forker->idle_list;
1188 while( cur_child && cur_child->pid != pid ) {
1190 cur_child = cur_child->next;
1194 // Detach from the list
// Either splice past the node via its predecessor, or advance the list head.
1196 prev->next = cur_child->next;
1198 forker->idle_list = cur_child->next;
1199 } // else we can't find it
1202 // If we found the node, destroy it.
// prefork_child_free() closes the child's pipes and puts the node on the free list.
1204 prefork_child_free( forker, cur_child );
1208 @brief Create and initialize a prefork_child.
1209 @param forker Pointer to the prefork_simple that will own the prefork_child.
1210 @param read_data_fd Used by child to read request from parent.
1211 @param write_data_fd Used by parent to write request to child.
1212 @param read_status_fd Used by parent to read status from child.
1213 @param write_status_fd Used by child to write status to parent.
1214 @return Pointer to the newly created prefork_child.
1216 The calling code is responsible for releasing the prefork_child by calling
1217 prefork_child_free(), which closes its pipes and puts it on the free list for reuse.
1219 static prefork_child* prefork_child_init( prefork_simple* forker,
1220 int read_data_fd, int write_data_fd,
1221 int read_status_fd, int write_status_fd ) {
1223 // Allocate a prefork_child -- from the free list if possible, or from
1224 // the heap if necessary. The free list is a non-circular, singly-linked list.
1225 prefork_child* child;
1226 if( forker->free_list ) {
// Pop the head of the free list.
1227 child = forker->free_list;
1228 forker->free_list = child->next;
1230 child = safe_malloc( sizeof( prefork_child ));
// NOTE(review): a recycled node keeps its old values in any field not assigned
// here. The assignments below cover the four pipe fds, max_requests, appname and
// keepalive. Confirm that pid/next/prev are reset before the node is used.
1233 child->read_data_fd = read_data_fd;
1234 child->write_data_fd = write_data_fd;
1235 child->read_status_fd = read_status_fd;
1236 child->write_status_fd = write_status_fd;
1237 child->max_requests = forker->max_requests;
// appname is shared with the forker, which owns it (prefork_clear() frees it).
1238 child->appname = forker->appname; // We don't make a separate copy
1239 child->keepalive = forker->keepalive;
1247 @brief Terminate all child processes and clear out a prefork_simple.
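1248 @param prefork Pointer to the prefork_simple to be cleared out.
@param graceful If true, wait for all active children to become idle before terminating them; if false, kill the active children immediately. Idle children are killed in either case.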
1250 We do not deallocate the prefork_simple itself, just its contents.
1252 static void prefork_clear( prefork_simple* prefork, bool graceful ) {
// Shutdown sequence: de-register from the routers, retire the active children
// (gracefully or by SIGKILL), kill the idle children, free the recycled nodes,
// close the connection, reap exited children, and free appname.
1254 // always de-register routers before killing child processes (or waiting
1255 // for them to complete) so that new requests are directed elsewhere.
// Use the forker being cleared rather than global_forker. This de-registers the
// application this forker actually serves (its appname is freed at the end of
// this function), and it does not depend on the global being set.
1256 osrf_prefork_register_routers(prefork->appname, true);
1258 while( prefork->first_child ) {
1261 // wait for at least one active child to become idle, then repeat.
1262 // once complete, all children will be idle and cleaned up below.
1263 osrfLogInfo(OSRF_LOG_MARK, "graceful shutdown waiting...");
1264 check_children(prefork, 1);
1267 // Kill and delete all the active children
1268 kill( prefork->first_child->pid, SIGKILL );
1269 del_prefork_child( prefork, prefork->first_child->pid );
1274 osrfLogInfo(OSRF_LOG_MARK,
1275 "all active children are now idle in graceful shutdown");
1278 // Kill all the idle prefork children, close their file
1279 // descriptors, and move them to the free list.
1280 prefork_child* child = prefork->idle_list;
1281 prefork->idle_list = NULL;
1283 prefork_child* temp = child->next;
1284 kill( child->pid, SIGKILL );
1285 prefork_child_free( prefork, child );
1290 // Physically free the free list of prefork_children.
1291 child = prefork->free_list;
1292 prefork->free_list = NULL;
1294 prefork_child* temp = child->next;
1299 // Close the Jabber connection
1300 client_free( prefork->connection );
1301 prefork->connection = NULL;
1303 // After giving the child processes a second to terminate, wait on them so that they
1304 // don't become zombies. We don't wait indefinitely, so it's possible that some
1305 // children will survive a bit longer.
// WNOHANG: collect only the children that have already exited; never block here.
1307 while( (waitpid( -1, NULL, WNOHANG )) > 0 ) {
1308 --prefork->current_num_children;
// Children share this pointer without copying it, so it is freed only now,
// after every child has been retired.
1311 free( prefork->appname );
1312 prefork->appname = NULL;
1316 @brief Close a prefork_child's pipes and put it on the free list for reuse.
1317 @param forker Pointer to the prefork_simple that owns the prefork_child.
1318 @param child Pointer to the prefork_child to be destroyed.
1320 static void prefork_child_free( prefork_simple* forker, prefork_child* child ) {
1321 close( child->read_data_fd );
1322 close( child->write_data_fd );
1323 close( child->read_status_fd );
1324 close( child->write_status_fd );
1326 // Stick the prefork_child in a free list for potential reuse. This is a
1327 // non-circular, singly linked list.
1329 child->next = forker->free_list;
1330 forker->free_list = child;