3 @brief Spawn and manage a collection of child processes to service requests.
5 Spawn a collection of child processes, replacing them as needed. Forward requests to them
6 and let the children do the work.
8 Each child processes some maximum number of requests before it terminates itself. When a
9 child dies, either deliberately or otherwise, we can spawn another one to replace it,
10 keeping the number of children within a predefined range.
12 Use a doubly-linked circular list to keep track of the children to whom we have forwarded
13 a request, and who are still working on them. Use a separate linear linked list to keep
14 track of children that are currently idle. Move them back and forth as needed.
16 For each child, set up two pipes:
17 - One for the parent to send requests to the child.
18 - One for the child to notify the parent that it is available for another request.
20 The message sent to the child represents an XML stanza as received from Jabber.
22 When the child finishes processing the request, it writes the string "available" back
23 to the parent. Then the parent knows that it can send that child another request.
28 #include <sys/types.h>
34 #include <sys/select.h>
37 #include "opensrf/utils.h"
38 #include "opensrf/log.h"
39 #include "opensrf/transport_client.h"
40 #include "opensrf/osrf_stack.h"
41 #include "opensrf/osrf_settings.h"
42 #include "opensrf/osrf_application.h"
44 #define READ_BUFSIZE 1024
45 #define ABS_MAX_CHILDREN 256
/* Members of the prefork_simple object, which describes the listener (parent) process
   and the children it manages.
   NOTE(review): the line that opens this struct declaration is not visible in this listing. */
48 int max_requests; /**< How many requests a child processes before terminating. */
49 int min_children; /**< Minimum number of children to maintain. */
50 int max_children; /**< Maximum number of children to maintain. */
51 int fd; /**< Unused. */
52 int data_to_child; /**< Unused. */
53 int data_to_parent; /**< Unused. */
54 int current_num_children; /**< Number of children launched and not yet reaped. */
55 int keepalive; /**< Keepalive time, in seconds, for stateful sessions. */
56 char* appname; /**< Name of the application. */
57 /** Points to the circular list of active children, i.e. those working on a request. */
58 struct prefork_child_struct* first_child;
59 /** List of child processes that aren't doing anything at the moment and are
60 therefore available to service a new request. */
61 struct prefork_child_struct* idle_list;
62 /** List of allocated but unused prefork_children, available for reuse. Each one is just
63 raw memory, apart from the "next" pointer used to stitch them together. In particular,
64 there is no child process for them, and the file descriptors are not open. */
65 struct prefork_child_struct* free_list;
/** Non-circular, singly-linked list of clones (only the pid is recorded) of the children
    that were on the active list when SIGHUP arrived; built by sighup_handler(). */
66 struct prefork_child_struct* sighup_pending_list;
67 transport_client* connection; /**< Connection to Jabber. */
/** State kept for one child process: its pid, the descriptors of the two pipes shared
    with the parent, per-child settings, and list linkage. */
70 struct prefork_child_struct {
71 pid_t pid; /**< Process ID of the child. */
72 int read_data_fd; /**< Read end of the data pipe; the child reads requests from it. */
73 int write_data_fd; /**< Write end of the data pipe; the parent writes requests to it. */
74 int read_status_fd; /**< Read end of the status pipe; the parent reads it to see if the child is available. */
75 int write_status_fd; /**< Write end of the status pipe; the child uses it to notify the parent when it's available again. */
76 int max_requests; /**< How many requests a child can process before terminating. */
77 const char* appname; /**< Name of the application. */
78 int keepalive; /**< Keepalive time for stateful sessions. */
79 struct prefork_child_struct* next; /**< Next node in whichever list currently holds this node. */
80 struct prefork_child_struct* prev; /**< Previous node. NOTE(review): presumably meaningful only in the doubly-linked active list -- confirm. */
/** Shorthand used throughout this file. */
83 typedef struct prefork_child_struct prefork_child;
85 /** Boolean. Set to true by a signal handler when it traps SIGCHLD. */
86 static volatile sig_atomic_t child_dead;
/* Forward declarations of the file-local functions. */
/* -- set-up and main loop of the parent -- */
88 static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
89 int max_requests, int min_children, int max_children );
90 static prefork_child* launch_child( prefork_simple* forker );
91 static void prefork_launch_children( prefork_simple* forker );
92 static void prefork_run( prefork_simple* forker );
/* -- bookkeeping of the child lists -- */
93 static void add_prefork_child( prefork_simple* forker, prefork_child* child );
95 static void del_prefork_child( prefork_simple* forker, pid_t pid );
96 static int check_children( prefork_simple* forker, int forever );
/* -- code that runs in the child -- */
97 static int prefork_child_process_request( prefork_child*, char* data );
98 static int prefork_child_init_hook( prefork_child* );
99 static prefork_child* prefork_child_init( prefork_simple* forker,
100 int read_data_fd, int write_data_fd,
101 int read_status_fd, int write_status_fd );
103 /* Child side: wait for incoming data from the parent. NOTE(review): this comment used to name a 'data_to_child' fd, but that prefork_simple member is marked unused; presumably the child reads from its read_data_fd -- confirm against the definition. */
104 static void prefork_child_wait( prefork_child* child );
105 static void prefork_clear( prefork_simple*, bool graceful);
106 static void prefork_child_free( prefork_simple* forker, prefork_child* );
107 static void osrf_prefork_register_routers( const char* appname, bool unregister );
108 static void osrf_prefork_child_exit( prefork_child* );
/* -- signal handlers -- */
110 static void sigchld_handler( int sig );
111 static void sigusr1_handler( int sig );
112 static void sigusr2_handler( int sig );
113 static void sigterm_handler( int sig );
114 static void sigint_handler( int sig );
115 static void sighup_handler( int sig );
117 /** Maintain a global pointer to the prefork_simple object
118 * for the current process so we can refer to it later
119 * for signal handling. There will only ever be one
120 * forker per process.
* Assigned in osrf_prefork_run(); read by the SIGUSR1, SIGUSR2, SIGTERM, SIGINT/SIGQUIT
* and SIGHUP handlers, each of which returns at once while it is still NULL.
122 static prefork_simple *global_forker = NULL;
125 @brief Spawn and manage a collection of drone processes for servicing requests.
126 @param appname Name of the application.
127 @return 0 if successful, or -1 if error.
/* Entry point of the listener (parent) process. In order: load the per-application
   settings, connect to the transport, initialize the prefork_simple, launch the initial
   children, register with the routers, install the signal handlers, and enter the
   dispatch loop.
   NOTE(review): several short lines of this function (conditions, local declarations,
   returns) are not visible in this listing. */
129 int osrf_prefork_run( const char* appname ) {
132 osrfLogError( OSRF_LOG_MARK, "osrf_prefork_run requires an appname to run!");
136 set_proc_title( "OpenSRF Listener [%s]", appname );
143 // Get configuration settings
144 osrfLogInfo( OSRF_LOG_MARK, "Loading config in osrf_forker for app %s", appname );
146 char* max_req = osrf_settings_host_value( "/apps/%s/unix_config/max_requests", appname );
147 char* min_children = osrf_settings_host_value( "/apps/%s/unix_config/min_children", appname );
148 char* max_children = osrf_settings_host_value( "/apps/%s/unix_config/max_children", appname );
149 char* keepalive = osrf_settings_host_value( "/apps/%s/keepalive", appname );
/* For each setting: either warn that it is not defined and name the value assumed, or
   convert the configured text with atoi().
   NOTE(review): atoi() cannot report non-numeric or out-of-range text. */
152 osrfLogWarning( OSRF_LOG_MARK, "Keepalive is not defined, assuming %d", kalive );
154 kalive = atoi( keepalive );
157 osrfLogWarning( OSRF_LOG_MARK, "Max requests not defined, assuming %d", maxr );
159 maxr = atoi( max_req );
162 osrfLogWarning( OSRF_LOG_MARK, "Min children not defined, assuming %d", minc );
164 minc = atoi( min_children );
167 osrfLogWarning( OSRF_LOG_MARK, "Max children not defined, assuming %d", maxc );
169 maxc = atoi( max_children );
/* NOTE(review): release of the keepalive and max_req strings is not visible here -- confirm. */
173 free( min_children );
174 free( max_children );
175 /* --------------------------------------------------- */
/* The listener connects under the resource name "<appname>_listener". */
177 char* resc = va_list_to_string( "%s_listener", appname );
179 // Make sure that we haven't already booted
180 if( !osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
181 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()" );
/* The prefork_simple lives on this function's stack; global_forker (set below) points
   at it, so that pointer is only valid while this function has not returned. */
188 prefork_simple forker;
/* prefork_simple_init() returns non-zero when it rejects the child limits. */
190 if( prefork_simple_init( &forker, osrfSystemGetTransportClient(), maxr, minc, maxc )) {
191 osrfLogError( OSRF_LOG_MARK,
192 "osrf_prefork_run() failed to create prefork_simple object" );
196 // Finish initializing the prefork_simple.
197 forker.appname = strdup( appname );
198 forker.keepalive = kalive;
199 global_forker = &forker;
201 // Spawn the children; put them in the idle list.
202 prefork_launch_children( &forker );
204 // Tell the router that you're open for business.
205 osrf_prefork_register_routers( appname, false );
/* The parent's handlers are installed only after the initial children have been forked. */
207 signal( SIGUSR1, sigusr1_handler);
208 signal( SIGUSR2, sigusr2_handler);
209 signal( SIGTERM, sigterm_handler);
210 signal( SIGINT, sigint_handler );
211 signal( SIGQUIT, sigint_handler );
212 signal( SIGHUP, sighup_handler );
214 // Sit back and let the requests roll in
215 osrfLogInfo( OSRF_LOG_MARK, "Launching osrf_forker for app %s", appname );
216 prefork_run( &forker );
/* Reaching this point is unexpected; clean up with a non-graceful clear. */
218 osrfLogWarning( OSRF_LOG_MARK, "prefork_run() returned - how??" );
219 prefork_clear( &forker, false );
224 @brief Register the application with, or un-register it from, a specified router.
225 @param appname Name of the application.
226 @param routerName Name of the router.
227 @param routerDomain Domain of the router.
229 Tell the router that you're open for business so that it can route requests to you.
231 Called only by the parent process.
/* Build a "register" or "unregister" message addressed to <routerName>@<routerDomain>/router
   and send it over the process-wide transport client. */
233 static void osrf_prefork_send_router_registration(
234 const char* appname, const char* routerName,
235 const char* routerDomain, bool unregister ) {
237 // Get a pointer to the global transport_client
238 transport_client* client = osrfSystemGetTransportClient();
240 // Construct the Jabber address of the router
241 char* jid = va_list_to_string( "%s@%s/router", routerName, routerDomain );
243 // Create the registration message, and send it
244 transport_message* msg;
/* Un-register variant. NOTE(review): the condition that selects between the two variants
   is not visible in this listing; presumably it tests `unregister` -- confirm. */
247 osrfLogInfo( OSRF_LOG_MARK, "%s un-registering with router %s", appname, jid );
248 msg = message_init( "unregistering", NULL, NULL, jid, NULL );
249 message_set_router_info( msg, NULL, NULL, appname, "unregister", 0 );
/* Register variant. */
253 osrfLogInfo( OSRF_LOG_MARK, "%s registering with router %s", appname, jid );
254 msg = message_init( "registering", NULL, NULL, jid, NULL );
255 message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
258 client_send_message( client, msg );
/* NOTE(review): release of msg and jid is not visible in this listing -- confirm both are freed. */
266 @brief Register with a router, or not, according to some config settings.
267 @param appname Name of the application
268 @param routerChunk A representation of part of the config file.
270 Parse a "complex" router configuration chunk.
272 Examine the services listed for a given router (normally in opensrf_core.xml). If
273 there is an entry for this service, or if there are @em no services listed, then
274 register with this router. Otherwise don't.
276 Called only by the parent process.
/* Decide from one router's configuration chunk whether this application should send a
   registration (or un-registration) to that router, and send it if so. The chunk's
   "name" and "domain" identify the router; its optional "services" hash restricts which
   applications the router serves.
   NOTE(review): several short lines (conditions, `else`, braces) are not visible here. */
278 static void osrf_prefork_parse_router_chunk(
279 const char* appname, const jsonObject* routerChunk, bool unregister ) {
281 const char* routerName = jsonObjectGetString( jsonObjectGetKeyConst( routerChunk, "name" ));
282 const char* domain = jsonObjectGetString( jsonObjectGetKeyConst( routerChunk, "domain" ));
283 const jsonObject* services = jsonObjectGetKeyConst( routerChunk, "services" );
/* The arguments follow the order of the labels in the message: domain first, then name. */
284 osrfLogDebug( OSRF_LOG_MARK, "found router config with domain %s and name %s",
285 domain, routerName );
287 if( services && services->type == JSON_HASH ) {
288 osrfLogDebug( OSRF_LOG_MARK, "investigating router information..." );
289 const jsonObject* service_obj = jsonObjectGetKeyConst( services, "service" );
291 ; // do nothing (shouldn't happen)
292 else if( JSON_ARRAY == service_obj->type ) {
293 // There are multiple services listed. Register with this router
294 // if and only if this service is on the list.
296 for( j = 0; j < service_obj->size; j++ ) {
297 const char* service = jsonObjectGetString( jsonObjectGetIndex( service_obj, j ));
298 if( service && !strcmp( appname, service ))
299 osrf_prefork_send_router_registration( appname, routerName, domain, unregister );
302 else if( JSON_STRING == service_obj->type ) {
303 // There's only one service listed. Register with this router
304 // if and only if this service is the one listed.
305 if( !strcmp( appname, jsonObjectGetString( service_obj )) )
306 osrf_prefork_send_router_registration( appname, routerName, domain, unregister );
309 // This router is not restricted to any set of services,
310 // so go ahead and register with it.
311 osrf_prefork_send_router_registration( appname, routerName, domain, unregister );
316 @brief Register the application with one or more routers, according to the configuration.
317 @param appname Name of the application.
319 Called only by the parent process.
/* Walk the "/routers/router" entries of the configuration and send a registration (or,
   when `unregister` is true, an un-registration) to each applicable router. */
321 static void osrf_prefork_register_routers( const char* appname, bool unregister ) {
323 jsonObject* routerInfo = osrfConfigGetValueObject( NULL, "/routers/router" );
/* NOTE(review): routerInfo is dereferenced without a visible NULL check -- confirm that
   osrfConfigGetValueObject() cannot return NULL here. */
326 for( i = 0; i < routerInfo->size; i++ ) {
327 const jsonObject* routerChunk = jsonObjectGetIndex( routerInfo, i );
329 if( routerChunk->type == JSON_STRING ) {
330 /* this accommodates simple router configs */
331 char* routerName = osrfConfigGetValue( NULL, "/router_name" );
332 char* domain = osrfConfigGetValue( NULL, "/routers/router" );
333 osrfLogDebug( OSRF_LOG_MARK, "found simple router settings with router name %s",
335 osrf_prefork_send_router_registration( appname, routerName, domain, unregister );
/* Any other chunk is handed to the parser for "complex" router configurations. */
340 osrf_prefork_parse_router_chunk( appname, routerChunk, unregister );
344 jsonObjectFree( routerInfo );
348 @brief Initialize a child process.
349 @param child Pointer to the prefork_child representing the new child process.
350 @return Zero if successful, or -1 if not.
352 Called only by child processes. Actions:
353 - Connect to one or more cache servers
354 - Reconfigure logger, if necessary
355 - Discard parent's Jabber connection and open a new one
356 - Dynamically call an application-specific initialization routine
357 - Change the command line as reported by ps
/* One-time set-up performed inside a freshly forked child: connect to the cache, tell the
   logger about client mode, drop the transport connection inherited from the parent and
   open a new one under the resource name "<appname>_drone", run the application's own
   child-init routine, and set the process title.
   Returns -1 when `child` is NULL.
   NOTE(review): the other return statements are not visible in this listing. */
359 static int prefork_child_init_hook( prefork_child* child ) {
361 if( !child ) return -1;
362 osrfLogDebug( OSRF_LOG_MARK, "Child init hook for child %d", child->pid );
364 // Connect to cache server(s).
365 osrfSystemInitCache();
366 char* resc = va_list_to_string( "%s_drone", child->appname );
368 // If we're a source-client, tell the logger now that we're a new process.
369 char* isclient = osrfConfigGetValue( NULL, "/client" );
370 if( isclient && !strcasecmp( isclient,"true" ))
371 osrfLogSetIsClient( 1 );
374 // Remove traces of our parent's socket connection so we can have our own.
375 // TODO: not necessary if parent disconnects first
376 osrfSystemIgnoreTransportClient();
/* The message names this function, so that a failure in a child can be told apart in
   the log from the parent's bootstrap failure in osrf_prefork_run(). */
379 if( !osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
380 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for prefork_child_init_hook()" );
387 // Dynamically call the application-specific initialization function
388 // from a previously loaded shared library.
/* osrfAppRunChildInit() is treated as successful when it returns zero. */
389 if( ! osrfAppRunChildInit( child->appname )) {
390 osrfLogDebug( OSRF_LOG_MARK, "Prefork child_init succeeded\n" );
392 osrfLogError( OSRF_LOG_MARK, "Prefork child_init failed\n" );
396 // Change the command line as reported by ps
397 set_proc_title( "OpenSRF Drone [%s]", child->appname );
402 @brief Respond to a client request forwarded by the parent.
403 @param child Pointer to the state of the child process.
404 @param data Pointer to the raw XMPP message received from the parent.
405 @return 0 on success; non-zero means that the child process should clean itself up
406 and terminate immediately, presumably due to a fatal error condition.
408 Called only by a child process.
/* Service one request in the child: make sure the transport connection is up, turn the
   XML text into a transport_message, hand it to the OpenSRF stack, and, if the client
   has opened a stateful connection, keep answering its follow-up requests until it
   disconnects, an error occurs, or the keepalive time passes without input.
   NOTE(review): many short lines (loop header, conditions, returns) are not visible here. */
410 static int prefork_child_process_request( prefork_child* child, char* data ) {
/* A NULL child is reported as success (0). */
411 if( !child ) return 0;
413 transport_client* client = osrfSystemGetTransportClient();
415 // Make sure that we're still connected to Jabber; reconnect if necessary.
416 if( !client_connected( client )) {
417 osrfSystemIgnoreTransportClient();
418 osrfLogWarning( OSRF_LOG_MARK, "Reconnecting child to opensrf after disconnect..." );
/* NOTE(review): this reconnect does not pass the "<appname>_drone" resource name that
   prefork_child_init_hook() used for the first connection -- confirm that is intended. */
419 if( !osrf_system_bootstrap_client( NULL, NULL )) {
420 osrfLogError( OSRF_LOG_MARK,
421 "Unable to bootstrap client in prefork_child_process_request()" );
423 osrf_prefork_child_exit( child );
427 // Construct the message from the xml.
428 transport_message* msg = new_message_from_xml( data );
430 // Respond to the transport message. This is where method calls are buried.
431 osrfAppSession* session = osrf_stack_transport_handler( msg, child->appname );
/* NOTE(review): session is dereferenced below; a NULL check, if there is one, is not
   visible in this listing. */
435 int rc = session->panic;
438 osrfLogWarning( OSRF_LOG_MARK,
439 "Drone for session %s terminating immediately", session->session_id );
440 osrfAppSessionFree( session );
444 if( session->stateless && session->state != OSRF_SESSION_CONNECTED ) {
445 // We're no longer connected to the client, which presumably means that
446 // we're done with this request. Bail out.
447 osrfAppSessionFree( session );
451 // If we get this far, then the client has opened an application connection so that it
452 // can send multiple requests directly to the same server drone, bypassing the router
453 // and the listener. For example, it may need to do a database transaction, requiring
454 // multiple method calls within the same database session.
456 // Hence we go into a loop, responding to successive requests from the same client, until
457 // either the client disconnects or an error occurs.
459 osrfLogDebug( OSRF_LOG_MARK, "Entering keepalive loop for session %s", session->session_id );
460 int keepalive = child->keepalive;
468 // Respond to any input messages. This is where the method calls are buried.
469 osrfLogDebug( OSRF_LOG_MARK,
470 "osrf_prefork calling queue_wait [%d] in keepalive loop", keepalive );
/* The time before the wait is recorded so that the wait's duration can be compared
   with the keepalive time below. */
471 start = time( NULL );
472 retval = osrf_app_session_queue_wait( session, keepalive, &recvd );
475 osrfLogDebug( OSRF_LOG_MARK, "Data received == %d", recvd );
477 // Now we check a number of possible reasons to exit the loop.
479 // If the method call decided to terminate immediately,
480 // note that for future reference.
484 // If an error occurred when we tried to service the request, exit the loop.
486 osrfLogError( OSRF_LOG_MARK, "queue-wait returned non-success %d", retval );
490 // If the client disconnected, exit the loop.
491 if( session->state != OSRF_SESSION_CONNECTED )
494 // If we timed out while waiting for a request, exit the loop.
495 if( !recvd && (end - start) >= keepalive ) {
496 osrfLogInfo( OSRF_LOG_MARK,
497 "No request was received in %d seconds, exiting stateful session", keepalive );
/* Tell the client that the session was dropped because of the timeout. */
498 osrfAppSessionStatus(
502 0, "Disconnected on timeout" );
507 // If the child process has decided to terminate immediately, exit the loop.
512 osrfLogDebug( OSRF_LOG_MARK, "Exiting keepalive loop for session %s", session->session_id );
513 osrfAppSessionFree( session );
518 @brief Partially initialize a prefork_simple provided by the caller.
519 @param prefork Pointer to a raw prefork_simple to be initialized.
520 @param client Pointer to a transport_client (connection to Jabber).
521 @param max_requests The maximum number of requests that a child process may service before terminating.
523 @param min_children Minimum number of child processes to maintain.
524 @param max_children Maximum number of child processes to maintain.
525 @return 0 if successful, or 1 if not (due to invalid parameters).
/* Validate the child limits and fill in a caller-supplied prefork_simple. The keepalive
   and appname members are only cleared here; osrf_prefork_run() assigns them afterwards. */
527 static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
528 int max_requests, int min_children, int max_children ) {
/* Reject a minimum that exceeds the maximum. */
530 if( min_children > max_children ) {
531 osrfLogError( OSRF_LOG_MARK, "min_children (%d) is greater "
532 "than max_children (%d)", min_children, max_children );
/* Reject a maximum above the compile-time ceiling ABS_MAX_CHILDREN. */
536 if( max_children > ABS_MAX_CHILDREN ) {
537 osrfLogError( OSRF_LOG_MARK, "max_children (%d) is greater than ABS_MAX_CHILDREN (%d)",
538 max_children, ABS_MAX_CHILDREN );
542 osrfLogInfo( OSRF_LOG_MARK, "Prefork launching child with max_request=%d,"
543 "min_children=%d, max_children=%d", max_requests, min_children, max_children );
545 /* flesh out the struct */
546 prefork->max_requests = max_requests;
547 prefork->min_children = min_children;
548 prefork->max_children = max_children;
/* NOTE(review): an assignment to the unused `fd` member is not visible in this listing. */
550 prefork->data_to_child = 0;
551 prefork->data_to_parent = 0;
552 prefork->current_num_children = 0;
553 prefork->keepalive = 0;
554 prefork->appname = NULL;
/* All child lists start out empty. */
555 prefork->first_child = NULL;
556 prefork->idle_list = NULL;
557 prefork->free_list = NULL;
558 prefork->connection = client;
559 prefork->sighup_pending_list = NULL;
565 @brief Spawn a new child process and put it in the idle list.
566 @param forker Pointer to the prefork_simple that will own the process.
567 @return Pointer to the new prefork_child, or not at all.
569 Spawn a new child process. Create a prefork_child for it and put it in the idle list.
571 After forking, the parent returns a pointer to the new prefork_child. The child
572 services its quota of requests and then terminates without returning.
/* Create the data and status pipes, build a prefork_child for them, and fork. The parent
   counts the new child and (re)installs its SIGCHLD handler. The child restores default
   signal dispositions, closes the parent's pipe ends, initializes itself and goes on to
   wait for requests.
   NOTE(review): the returns on the error paths and in the parent branch are not visible
   in this listing. */
574 static prefork_child* launch_child( prefork_simple* forker ) {
580 // Set up the data and status pipes
581 if( pipe( data_fd ) < 0 ) { /* build the data pipe*/
582 osrfLogError( OSRF_LOG_MARK, "Pipe making error" );
586 if( pipe( status_fd ) < 0 ) {/* build the status pipe */
587 osrfLogError( OSRF_LOG_MARK, "Pipe making error" );
593 osrfLogInternal( OSRF_LOG_MARK, "Pipes: %d %d %d %d",
594 data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
596 // Create and initialize a prefork_child for the new process
/* Element [0] of each pipe is its read end and [1] its write end, matching the
   read_*_fd / write_*_fd parameter order of prefork_child_init(). */
597 prefork_child* child = prefork_child_init( forker, data_fd[0],
598 data_fd[1], status_fd[0], status_fd[1] );
600 if( (pid=fork()) < 0 ) {
601 osrfLogError( OSRF_LOG_MARK, "Forking Error" );
602 prefork_child_free( forker, child );
/* This runs before the parent/child split, so it executes in both processes. */
606 // Add the new child to the head of the idle list
607 child->next = forker->idle_list;
608 forker->idle_list = child;
610 if( pid > 0 ) { /* parent */
612 signal( SIGCHLD, sigchld_handler );
613 ( forker->current_num_children )++;
616 osrfLogDebug( OSRF_LOG_MARK, "Parent launched %d", pid );
617 /* *no* child pipe FD's can be closed or the parent will re-use fd's that
618 the children are currently using */
/* From here on the code runs in the child. */
624 // we don't want to adopt our parent's handlers.
625 signal( SIGUSR1, SIG_DFL );
626 signal( SIGUSR2, SIG_DFL );
627 signal( SIGTERM, SIG_DFL );
628 signal( SIGINT, SIG_DFL );
629 signal( SIGQUIT, SIG_DFL );
630 signal( SIGCHLD, SIG_DFL );
631 signal( SIGHUP, SIG_DFL );
633 osrfLogInternal( OSRF_LOG_MARK,
634 "I am new child with read_data_fd = %d and write_status_fd = %d",
635 child->read_data_fd, child->write_status_fd );
/* The child records its own pid and closes the two pipe ends that belong to the parent. */
637 child->pid = getpid();
638 close( child->write_data_fd );
639 close( child->read_status_fd );
642 if( prefork_child_init_hook( child ) == -1 ) {
643 osrfLogError( OSRF_LOG_MARK,
644 "Forker child going away because we could not connect to OpenSRF..." );
645 osrf_prefork_child_exit( child );
648 prefork_child_wait( child ); // Should exit without returning
649 osrf_prefork_child_exit( child ); // Just to be sure
650 return NULL; // Unreachable, but it keeps the compiler happy
655 @brief Terminate a child process.
656 @param child Pointer to the prefork_child representing the child process (not used).
658 Called only by child processes. Dynamically call an application-specific shutdown
659 function from a previously loaded shared library; then exit.
/* Run the application's exit hook via osrfAppRunExitCode().
   NOTE(review): the statement that actually ends the process is not visible in this listing. */
661 static void osrf_prefork_child_exit( prefork_child* child ) {
662 osrfAppRunExitCode();
667 @brief Launch all the child processes, putting them in the idle list.
668 @param forker Pointer to the prefork_simple that will own the children.
670 Called only by the parent process (in order to become a parent).
/* Launch forker->min_children children, one launch_child() call per iteration.
   Does nothing when `forker` is NULL. */
672 static void prefork_launch_children( prefork_simple* forker ) {
673 if( !forker ) return;
/* NOTE(review): the declaration and starting value of the counter `c` are not visible here. */
675 while( c++ < forker->min_children )
676 launch_child( forker );
680 @brief Signal handler for SIGCHLD: note that a child process has terminated.
681 @param sig The value of the trapped signal; always SIGCHLD.
683 Set a boolean to be checked later.
/* SIGCHLD handler. Re-installs itself for the next SIGCHLD.
   NOTE(review): the statement that sets the child_dead flag is not visible in this listing. */
685 static void sigchld_handler( int sig ) {
686 signal( SIGCHLD, sigchld_handler );
691 @brief Signal handler for SIGUSR1
692 @param sig The value of the trapped signal; always SIGUSR1.
694 Send unregister command to all registered routers.
/* SIGUSR1 handler: un-register this application from the routers, then re-install itself.
   Does nothing until osrf_prefork_run() has set global_forker.
   NOTE(review): this sends messages from signal context -- confirm that the functions
   reached are safe to call from a signal handler. */
696 static void sigusr1_handler( int sig ) {
697 if (!global_forker) return;
698 osrf_prefork_register_routers(global_forker->appname, true);
699 signal( SIGUSR1, sigusr1_handler );
703 @brief Signal handler for SIGUSR2
704 @param sig The value of the trapped signal; always SIGUSR2.
706 Send register command to all known routers
/* SIGUSR2 handler: register this application with the routers again, then re-install itself.
   Does nothing until osrf_prefork_run() has set global_forker.
   NOTE(review): this sends messages from signal context -- confirm that the functions
   reached are safe to call from a signal handler. */
708 static void sigusr2_handler( int sig ) {
709 if (!global_forker) return;
710 osrf_prefork_register_routers(global_forker->appname, false);
711 signal( SIGUSR2, sigusr2_handler );
715 @brief Signal handler for SIGTERM
716 @param sig The value of the trapped signal; always SIGTERM
718 Perform a graceful prefork server shutdown.
/* SIGTERM handler: log the event and call prefork_clear() with graceful = true.
   Does nothing until osrf_prefork_run() has set global_forker.
   NOTE(review): what follows prefork_clear() (e.g. process exit) is not visible here. */
720 static void sigterm_handler(int sig) {
721 if (!global_forker) return;
722 osrfLogInfo(OSRF_LOG_MARK, "server: received SIGTERM, shutting down");
723 prefork_clear(global_forker, true);
728 @brief Signal handler for SIGINT or SIGQUIT
729 @param sig The value of the trapped signal
731 Perform a non-graceful prefork server shutdown.
/* Handler installed for both SIGINT and SIGQUIT: log the event and call prefork_clear()
   with graceful = false. Does nothing until osrf_prefork_run() has set global_forker.
   NOTE(review): what follows prefork_clear() (e.g. process exit) is not visible here. */
733 static void sigint_handler(int sig) {
734 if (!global_forker) return;
735 osrfLogInfo(OSRF_LOG_MARK, "server: received SIGINT/QUIT, shutting down");
736 prefork_clear(global_forker, false);
/* SIGHUP handler: reload the configuration file, apply its log level, remember which
   children are currently active (by pid) on sighup_pending_list, and kill every idle child.
   Does nothing until osrf_prefork_run() has set global_forker.
   NOTE(review): this handler allocates memory, logs and re-reads configuration from
   signal context -- confirm that this is safe in the way the listener is interrupted. */
740 static void sighup_handler(int sig) {
741 if (!global_forker) return;
742 osrfLogInfo(OSRF_LOG_MARK, "server: received SIGHUP, reloading config");
/* The new configuration is read from the same file and context as the current one. */
744 osrfConfig* oldConfig = osrfConfigGetDefaultConfig();
745 osrfConfig* newConfig = osrfConfigInit(
746 oldConfig->configFileName, oldConfig->configContext);
749 osrfLogError(OSRF_LOG_MARK, "Config reload failed");
754 osrfConfigSetDefaultConfig(newConfig);
756 // apply the log level from the reloaded file
757 char* log_level = osrfConfigGetValue(NULL, "/loglevel");
759 int level = atoi(log_level);
760 osrfLogSetLevel(level);
763 // Copy the list of active children into the sighup_pending list.
764 // Cloning is necessary, since the nodes in the active list, particularly
765 // their next/prev pointers, will start changing once we exit this func.
766 // sighup_pending_list is a non-circular, singly linked list.
767 prefork_child* cur_child = global_forker->first_child;
768 prefork_child* clone;
770 // the first_pid lets us know when we've made a full circle of the active list
/* NOTE(review): the declaration of first_pid is not visible; the `!first_pid` test below
   implies that it starts out as zero. */
773 while (cur_child && cur_child->pid != first_pid) {
775 if (!first_pid) first_pid = cur_child->pid;
777 // all we need to keep track of is the pid
778 clone = safe_malloc(sizeof(prefork_child));
779 clone->pid = cur_child->pid;
782 osrfLogDebug(OSRF_LOG_MARK,
783 "Adding child %d to sighup pending list", clone->pid);
785 // add the clone to the front of the list
/* NOTE(review): when the pending list is empty, clone->next is not assigned by the
   visible lines; confirm that it is NULL at that point (e.g. zeroed by safe_malloc()). */
786 if (global_forker->sighup_pending_list)
787 clone->next = global_forker->sighup_pending_list;
788 global_forker->sighup_pending_list = clone;
790 cur_child = cur_child->next;
793 // Kill all idle children.
794 // Let them get cleaned up through the normal response-handling cycle
795 cur_child = global_forker->idle_list;
797 osrfLogDebug(OSRF_LOG_MARK, "Killing child in SIGHUP %d", cur_child->pid);
798 kill(cur_child->pid, SIGKILL);
799 cur_child = cur_child->next;
805 @brief Replenish the collection of child processes, after one has terminated.
806 @param forker Pointer to the prefork_simple that manages the child processes.
808 The parent calls this function when it notices (via a signal handler) that
809 a child process has died.
811 Wait on the dead children so that they won't be zombies. Spawn new ones as needed
812 to maintain at least a minimum number.
/* Collect every child that has already terminated, remove each from the forker's
   bookkeeping, and launch replacements until at least min_children exist again. */
814 static void reap_children( prefork_simple* forker ) {
818 // Reset our boolean so that we can detect any further terminations.
821 // Bury the children so that they won't be zombies. WNOHANG means that waitpid() returns
822 // immediately if there are no waitable children, instead of waiting for more to die.
823 // Ignore the return code of the child. We don't do an autopsy.
/* The loop ends as soon as waitpid() returns zero or a negative value. */
824 while( (child_pid = waitpid( -1, NULL, WNOHANG )) > 0 ) {
825 --forker->current_num_children;
826 del_prefork_child( forker, child_pid );
829 // Spawn more children as needed.
830 while( forker->current_num_children < forker->min_children )
831 launch_child( forker );
835 @brief Read transport_messages and dispatch them to child processes for servicing.
836 @param forker Pointer to the prefork_simple that manages the child processes.
838 This is the main loop of the parent process, and once entered, does not exit.
840 For each usable transport_message received: look for an idle child to service it. If
841 no idle children are available, either spawn a new one or, if we've already spawned the
842 maximum number of children, wait for one to become available. Once a child is available
843 by whatever means, write an XML version of the input message to the pipe from which that child reads its requests.
/* Main loop of the listener (parent). For every usable message received from the
   transport, find a child to service it: first try the idle list, then launch a new
   child if the limit allows, otherwise wait for an active child to become available.
   The message's XML text, including its terminating NUL, is written to the chosen
   child's data pipe, and that child is moved to the active list.
   Returns at once if there are no idle children at entry.
   NOTE(review): many short lines (loop headers, conditions, `else`, braces) are not
   visible in this listing. */
846 static void prefork_run( prefork_simple* forker ) {
848 if( NULL == forker->idle_list )
849 return; // No available children, and we haven't even started yet
851 transport_message* cur_msg = NULL;
855 if( forker->first_child == NULL && forker->idle_list == NULL ) {/* no more children */
856 osrfLogWarning( OSRF_LOG_MARK, "No more children..." );
860 // Wait indefinitely for an input message
861 osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data..." );
862 cur_msg = client_recv( forker->connection, -1 );
864 if( cur_msg == NULL ) {
865 // most likely a signal was received. clean up any recently
866 // deceased children and try again.
868 reap_children(forker);
872 if (cur_msg->error_type) {
873 osrfLogInfo(OSRF_LOG_MARK,
874 "Listener received an XMPP error message. "
875 "Likely a bounced message. sender=%s", cur_msg->sender);
/* NOTE(review): no message_free( cur_msg ) is visible on this path -- confirm that the
   bounced message is released before the loop receives the next one. */
877 reap_children(forker);
881 message_prepare_xml( cur_msg );
882 const char* msg_data = cur_msg->msg_xml;
883 if( ! msg_data || ! *msg_data ) {
/* One %s per argument. This assumes cur_msg->thread is a C string, like
   cur_msg->sender -- TODO confirm against the transport_message declaration. */
884 osrfLogWarning( OSRF_LOG_MARK, "Received %s message from %s, thread %s",
885 (msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread );
886 message_free( cur_msg );
887 continue; // Message not usable; go on to the next one.
890 int honored = 0; /* will be set to true when we service the request */
896 if(check_children( forker, 0 ) < 0) {
897 continue; // check failed, try again
902 osrfLogDebug( OSRF_LOG_MARK, "Server received inbound data" );
904 prefork_child* cur_child = NULL;
906 // Look for an available child in the idle list. Since the idle list operates
907 // as a stack, the child we get is the one that was most recently active, or
908 // most recently spawned. That means it's the one most likely still to be in
909 // physical memory, and the one least likely to have to be swapped in.
910 while( forker->idle_list ) {
912 osrfLogDebug( OSRF_LOG_MARK, "Looking for idle child" );
913 // Grab the prefork_child at the head of the idle list
914 cur_child = forker->idle_list;
915 forker->idle_list = cur_child->next;
916 cur_child->next = NULL;
918 osrfLogInternal( OSRF_LOG_MARK,
919 "Searching for available child. cur_child->pid = %d", cur_child->pid );
920 osrfLogInternal( OSRF_LOG_MARK, "Current num children %d",
921 forker->current_num_children );
923 osrfLogDebug( OSRF_LOG_MARK, "forker sending data to %d", cur_child->pid );
924 osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d",
925 cur_child->write_data_fd );
/* The +1 sends the terminating NUL along with the XML text. */
927 int written = write( cur_child->write_data_fd, msg_data, strlen( msg_data ) + 1 );
929 // This child appears to be dead or unusable. Discard it.
930 osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d: %s",
931 errno, strerror( errno ));
932 kill( cur_child->pid, SIGKILL );
933 del_prefork_child( forker, cur_child->pid );
937 add_prefork_child( forker, cur_child ); // Add it to active list
942 /* if none available, add a new child if we can */
944 osrfLogDebug( OSRF_LOG_MARK, "Not enough children, attempting to add..." );
946 if( forker->current_num_children < forker->max_children ) {
947 osrfLogDebug( OSRF_LOG_MARK, "Launching new child with current_num = %d",
948 forker->current_num_children );
950 launch_child( forker ); // Put a new child into the idle list
951 if( forker->idle_list ) {
953 // Take the new child from the idle list
954 prefork_child* new_child = forker->idle_list;
955 forker->idle_list = new_child->next;
956 new_child->next = NULL;
958 osrfLogDebug( OSRF_LOG_MARK, "Writing to new child fd %d : pid %d",
959 new_child->write_data_fd, new_child->pid );
962 new_child->write_data_fd, msg_data, strlen( msg_data ) + 1 );
964 // This child appears to be dead or unusable. Discard it.
965 osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d: %s",
966 errno, strerror( errno ));
/* Discard the child just written to. cur_child must not be used here: it is NULL when
   the idle list was empty, and otherwise refers to a child that was already discarded. */
967 kill( new_child->pid, SIGKILL );
968 del_prefork_child( forker, new_child->pid );
970 add_prefork_child( forker, new_child );
975 osrfLogWarning( OSRF_LOG_MARK, "Could not launch a new child as %d children "
976 "were already running; consider increasing max_children for this "
977 "application higher than %d in the OpenSRF configuration if this "
978 "message occurs frequently",
979 forker->current_num_children, forker->max_children );
984 osrfLogWarning( OSRF_LOG_MARK, "No children available, waiting..." );
985 if( check_children( forker, 1 ) >= 0 ) {
986 // Tell the loop not to call check_children again, since we just successfully called it
992 reap_children( forker );
994 } // end while( ! honored )
996 message_free( cur_msg );
998 } /* end top level listen loop */
1003 @brief See if any children have become available.
1004 @param forker Pointer to the prefork_simple that owns the children.
1005 @param forever Boolean: true if we should wait indefinitely.
1006 @return 0 or greater if successful, -1 on select error/interrupt
1008 Call select() for all the children in the active list. Read each active file
1009 descriptor and move the corresponding child to the idle list.
1011 If @a forever is true, wait indefinitely for input. Otherwise return immediately if
1012 there are no active file descriptors.
1014 static int check_children( prefork_simple* forker, int forever ) {
1017 reap_children( forker );
1019 if( NULL == forker->first_child ) {
1020 // If forever is true, then we're here because we've run out of idle
1021 // processes, so there should be some active ones around, except during
1022 // graceful shutdown, as we wait for all active children to become idle.
1023 // If forever is false, then the children may all be idle, and that's okay.
1025 osrfLogDebug( OSRF_LOG_MARK, "No active child processes to check" );
1031 FD_ZERO( &read_set );
1035 // Prepare to select() on pipes from all the active children
1036 prefork_child* cur_child = forker->first_child;
1038 if( cur_child->read_status_fd > max_fd )
1039 max_fd = cur_child->read_status_fd;
1040 FD_SET( cur_child->read_status_fd, &read_set );
1041 cur_child = cur_child->next;
1042 } while( cur_child != forker->first_child );
1044 FD_CLR( 0, &read_set ); /* just to be sure */
1048 if( (select_ret=select( max_fd + 1, &read_set, NULL, NULL, NULL )) == -1 ) {
1049 osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children: %s",
1050 errno, strerror( errno ));
1052 osrfLogInfo( OSRF_LOG_MARK,
1053 "select() completed after waiting on children to become available" );
1061 if( (select_ret=select( max_fd + 1, &read_set, NULL, NULL, &tv )) == -1 ) {
1062 osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children: %s",
1063 errno, strerror( errno ));
1067 if( select_ret <= 0 ) // we're done here
1070 // Check each child in the active list.
1071 // If it has responded, move it to the idle list.
1072 cur_child = forker->first_child;
1073 prefork_child* next_child = NULL;
1074 int num_handled = 0;
1076 next_child = cur_child->next;
1077 if( FD_ISSET( cur_child->read_status_fd, &read_set )) {
1078 osrfLogDebug( OSRF_LOG_MARK,
1079 "Server received status from a child %d", cur_child->pid );
1083 /* now suck off the data */
1085 if( (n=read( cur_child->read_status_fd, buf, sizeof( buf ) - 1 )) < 0 ) {
1086 osrfLogWarning( OSRF_LOG_MARK,
1087 "Read error after select in child status read with errno %d: %s",
1088 errno, strerror( errno ));
1092 osrfLogDebug( OSRF_LOG_MARK, "Read %d bytes from status buffer: %s", n, buf );
1096 // if this child is in the sighup_pending list, kill the child,
1097 // but leave it in the active list so that it won't be picked
1098 // for new work. When reap_children() next runs, it will be
1099 // properly cleaned up.
1100 prefork_child* hup_child = forker->sighup_pending_list;
1101 prefork_child* prev_hup_child = NULL;
1102 int hup_cleanup = 0;
1105 pid_t hup_pid = hup_child->pid;
1106 if (hup_pid == cur_child->pid) {
1108 osrfLogDebug(OSRF_LOG_MARK,
1109 "server: killing previously-active child after "
1110 "receiving SIGHUP: %d", hup_pid);
1112 if (forker->sighup_pending_list == hup_child) {
1113 // hup_child is the first (maybe only) in the list
1114 forker->sighup_pending_list = hup_child->next;
1116 // splice it from the list
1117 prev_hup_child->next = hup_child->next;
1120 free(hup_child); // clean up the thin clone
1121 kill(hup_pid, SIGKILL);
1126 prev_hup_child = hup_child;
1127 hup_child = hup_child->next;
1132 // Remove the child from the active list
1133 if( forker->first_child == cur_child ) {
1134 if( cur_child->next == cur_child ) {
1135 // only child in the active list
1136 forker->first_child = NULL;
1138 forker->first_child = cur_child->next;
1141 cur_child->next->prev = cur_child->prev;
1142 cur_child->prev->next = cur_child->next;
1144 // Add it to the idle list
1145 cur_child->prev = NULL;
1146 cur_child->next = forker->idle_list;
1147 forker->idle_list = cur_child;
1151 cur_child = next_child;
1152 } while( forker->first_child && forker->first_child != next_child );
1158 @brief Service up a set maximum number of requests; then shut down.
1159 @param child Pointer to the prefork_child representing the child process.
1161 Called only by child process.
1163 Enter a loop, for up to max_requests iterations. On each iteration:
1164 - Wait indefinitely for a request from the parent.
1165 - Service the request.
1166 - Increment a counter. If the limit hasn't been reached, notify the parent that you
1167 are available for another request.
1169 After exiting the loop, shut down and terminate the process.
1171 static void prefork_child_wait( prefork_child* child ) {
1174 growing_buffer* gbuf = buffer_init( READ_BUFSIZE );
1175 char buf[READ_BUFSIZE];
1177 for( i = 0; i < child->max_requests; i++ ) {
1180 int gotdata = 0; // boolean; set to true if we get data
1181 clr_fl( child->read_data_fd, O_NONBLOCK );
1183 // Read a request from the parent, via a pipe, into a growing_buffer.
1184 while( (n = read( child->read_data_fd, buf, READ_BUFSIZE-1 )) > 0 ) {
1186 osrfLogDebug( OSRF_LOG_MARK, "Prefork child read %d bytes of data", n );
1188 set_fl( child->read_data_fd, O_NONBLOCK );
1191 buffer_add_n( gbuf, buf, n );
1194 if( errno == EAGAIN )
1197 if( errno == EPIPE ) {
1198 osrfLogDebug( OSRF_LOG_MARK, "C child attempted read on broken pipe, exiting..." );
1202 int terminate_now = 0; // Boolean
1205 osrfLogWarning( OSRF_LOG_MARK,
1206 "Prefork child read returned error with errno %d", errno );
1209 } else if( gotdata ) {
1210 // Process the request
1211 osrfLogDebug( OSRF_LOG_MARK, "Prefork child got a request.. processing.." );
1212 terminate_now = prefork_child_process_request( child, gbuf->buf );
1213 buffer_reset( gbuf );
1216 if( terminate_now ) {
1217 // We're terminating prematurely -- presumably due to a fatal error condition.
1218 osrfLogWarning( OSRF_LOG_MARK, "Prefork child terminating abruptly" );
1222 if( i < child->max_requests - 1 ) {
1223 // Report back to the parent for another request.
1225 ssize_t len = write(
1226 child->write_status_fd, "available" /*less than 64 bytes*/, msg_len );
1227 if( len != msg_len ) {
1228 osrfLogError( OSRF_LOG_MARK,
1229 "Drone terminating: unable to notify listener of availability: %s",
1231 buffer_free( gbuf );
1232 osrf_prefork_child_exit( child );
1237 buffer_free( gbuf );
1239 osrfLogDebug( OSRF_LOG_MARK, "Child with max-requests=%d, num-served=%d exiting...[%ld]",
1240 child->max_requests, i, (long) getpid());
1242 osrf_prefork_child_exit( child );
1246 @brief Add a prefork_child to the end of the active list.
1247 @param forker Pointer to the prefork_simple that owns the list.
1248 @param child Pointer to the prefork_child to be added.
1250 static void add_prefork_child( prefork_simple* forker, prefork_child* child ) {
1252 if( forker->first_child == NULL ) {
1253 // Simplest case: list is initially empty.
1254 forker->first_child = child;
1255 child->next = child;
1256 child->prev = child;
1258 // Find the last node in the circular list.
1259 prefork_child* last_child = forker->first_child->prev;
1261 // Insert the new child between the last and first children.
1262 last_child->next = child;
1263 child->prev = last_child;
1264 child->next = forker->first_child;
1265 forker->first_child->prev = child;
1270 @brief Delete and destroy a dead child from our list.
1271 @param forker Pointer to the prefork_simple that owns the dead child.
1272 @param pid Process ID of the dead child.
1274 Look for the dead child first in the list of active children. If you don't find it
1275 there, look in the list of idle children. If you find it, remove it from whichever
1276 list it's on, and destroy it.
1278 static void del_prefork_child( prefork_simple* forker, pid_t pid ) {
1280 osrfLogDebug( OSRF_LOG_MARK, "Deleting Child: %d", pid );
1282 prefork_child* cur_child = NULL;
1284 // Look first in the active list
1285 if( forker->first_child ) {
1286 cur_child = forker->first_child; /* current pointer */
1287 while( cur_child->pid != pid && cur_child->next != forker->first_child )
1288 cur_child = cur_child->next;
1290 if( cur_child->pid == pid ) {
1291 // We found the right node. Remove it from the list.
1292 if( cur_child->next == cur_child )
1293 forker->first_child = NULL; // only child in the list
1295 if( forker->first_child == cur_child )
1296 forker->first_child = cur_child->next; // Reseat forker->first_child
1298 // Stitch the adjacent nodes together
1299 cur_child->prev->next = cur_child->next;
1300 cur_child->next->prev = cur_child->prev;
1303 cur_child = NULL; // Didn't find it in the active list
1307 // Maybe it's in the idle list. This can happen if, for example,
1308 // a child is killed by a signal while it's between requests.
1310 prefork_child* prev = NULL;
1311 cur_child = forker->idle_list;
1312 while( cur_child && cur_child->pid != pid ) {
1314 cur_child = cur_child->next;
1318 // Detach from the list
1320 prev->next = cur_child->next;
1322 forker->idle_list = cur_child->next;
1323 } // else we can't find it
1326 // If we found the node, destroy it.
1328 prefork_child_free( forker, cur_child );
1332 @brief Create and initialize a prefork_child.
1333 @param forker Pointer to the prefork_simple that will own the prefork_child.
1334 @param read_data_fd Used by child to read request from parent.
1335 @param write_data_fd Used by parent to write request to child.
1336 @param read_status_fd Used by parent to read status from child.
1337 @param write_status_fd Used by child to write status to parent.
1338 @return Pointer to the newly created prefork_child.
1340 The calling code is responsible for freeing the prefork_child by calling
1341 prefork_child_free().
1343 static prefork_child* prefork_child_init( prefork_simple* forker,
1344 int read_data_fd, int write_data_fd,
1345 int read_status_fd, int write_status_fd ) {
1347 // Allocate a prefork_child -- from the free list if possible, or from
1348 // the heap if necessary. The free list is a non-circular, singly-linked list.
1349 prefork_child* child;
1350 if( forker->free_list ) {
1351 child = forker->free_list;
1352 forker->free_list = child->next;
1354 child = safe_malloc( sizeof( prefork_child ));
1357 child->read_data_fd = read_data_fd;
1358 child->write_data_fd = write_data_fd;
1359 child->read_status_fd = read_status_fd;
1360 child->write_status_fd = write_status_fd;
1361 child->max_requests = forker->max_requests;
1362 child->appname = forker->appname; // We don't make a separate copy
1363 child->keepalive = forker->keepalive;
1371 @brief Terminate all child processes and clear out a prefork_simple.
1372 @param prefork Pointer to the prefork_simple to be cleared out.
1374 We do not deallocate the prefork_simple itself, just its contents.
1376 static void prefork_clear( prefork_simple* prefork, bool graceful ) {
1378 // always de-register routers before killing child processes (or waiting
1379 // for them to complete) so that new requests are directed elsewhere.
1380 osrf_prefork_register_routers(global_forker->appname, true);
1382 while( prefork->first_child ) {
1385 // wait for at least one active child to become idle, then repeat.
1386 // once complete, all children will be idle and cleaned up below.
1387 osrfLogInfo(OSRF_LOG_MARK, "graceful shutdown waiting...");
1388 check_children(prefork, 1);
1391 // Kill and delete all the active children
1392 kill( prefork->first_child->pid, SIGKILL );
1393 del_prefork_child( prefork, prefork->first_child->pid );
1398 osrfLogInfo(OSRF_LOG_MARK,
1399 "all active children are now idle in graceful shutdown");
1402 // Kill all the idle prefork children, close their file
1403 // descriptors, and move them to the free list.
1404 prefork_child* child = prefork->idle_list;
1405 prefork->idle_list = NULL;
1407 prefork_child* temp = child->next;
1408 kill( child->pid, SIGKILL );
1409 prefork_child_free( prefork, child );
1412 //prefork->current_num_children = 0;
1414 // Physically free the free list of prefork_children.
1415 child = prefork->free_list;
1416 prefork->free_list = NULL;
1418 prefork_child* temp = child->next;
1423 // Close the Jabber connection
1424 client_free( prefork->connection );
1425 prefork->connection = NULL;
1427 // After giving the child processes a second to terminate, wait on them so that they
1428 // don't become zombies. We don't wait indefinitely, so it's possible that some
1429 // children will survive a bit longer.
1431 while( (waitpid( -1, NULL, WNOHANG )) > 0 ) {
1432 --prefork->current_num_children;
1435 free( prefork->appname );
1436 prefork->appname = NULL;
1440 @brief Destroy and deallocate a prefork_child.
1441 @param forker Pointer to the prefork_simple that owns the prefork_child.
1442 @param child Pointer to the prefork_child to be destroyed.
1444 static void prefork_child_free( prefork_simple* forker, prefork_child* child ) {
1445 close( child->read_data_fd );
1446 close( child->write_data_fd );
1447 close( child->read_status_fd );
1448 close( child->write_status_fd );
1450 // Stick the prefork_child in a free list for potential reuse. This is a
1451 // non-circular, singly linked list.
1453 child->next = forker->free_list;
1454 forker->free_list = child;