3 @brief Spawn and manage a collection of child processes to service requests.
5 Spawn a collection of child processes, replacing them as needed. Forward requests to them
6 and let the children do the work.
8 Each child processes some maximum number of requests before it terminates itself. When a
9 child dies, either deliberately or otherwise, we can spawn another one to replace it,
10 keeping the number of children within a predefined range.
12 Use a doubly-linked circular list to keep track of the children to whom we have forwarded
13 a request, and who are still working on it. Use a separate linear linked list to keep
14 track of children that are currently idle. Move them back and forth as needed.
16 For each child, set up two pipes:
17 - One for the parent to send requests to the child.
18 - One for the child to notify the parent that it is available for another request.
20 The message sent to the child represents an XML stanza as received from Jabber.
22 When the child finishes processing the request, it writes the string "available" back
23 to the parent. Then the parent knows that it can send that child another request.
28 #include <sys/types.h>
34 #include <sys/select.h>
37 #include "opensrf/utils.h"
38 #include "opensrf/log.h"
39 #include "opensrf/transport_client.h"
40 #include "opensrf/osrf_stack.h"
41 #include "opensrf/osrf_settings.h"
42 #include "opensrf/osrf_application.h"
// Size of the chunks in which a child reads requests from its data pipe.
44 #define READ_BUFSIZE 1024
// Hard upper bound on max_children; prefork_simple_init() rejects larger values.
45 #define ABS_MAX_CHILDREN 256
// Members of the prefork_simple object: the listener's configuration plus its
// bookkeeping for the pool of child (drone) processes.
48 int max_requests; /**< How many requests a child processes before terminating. */
49 int min_children; /**< Minimum number of children to maintain. */
50 int max_children; /**< Maximum number of children to maintain. */
51 int fd; /**< Unused. */
52 int data_to_child; /**< Unused. */
53 int data_to_parent; /**< Unused. */
54 int current_num_children; /**< Number of live children: incremented on launch, decremented when reaped. */
55 int keepalive; /**< Keepalive time, in seconds, for stateful sessions. */
56 char* appname; /**< Name of the application. */
57 /** Points to a circular linked list of active children (those working on a request). */
58 struct prefork_child_struct* first_child;
59 /** List of child processes that aren't doing anything at the moment and are
60 therefore available to service a new request. */
61 struct prefork_child_struct* idle_list;
62 /** List of allocated but unused prefork_children, available for reuse. Each one is just
63 raw memory, apart from the "next" pointer used to stitch them together. In particular,
64 there is no child process for them, and the file descriptors are not open. */
65 struct prefork_child_struct* free_list;
66 transport_client* connection; /**< Connection to Jabber. */
// Per-child bookkeeping kept by the listener: the drone's pid, the two pipes
// that connect it to the listener, and the links for whichever list holds it.
69 struct prefork_child_struct {
70 pid_t pid; /**< Process ID of the child. */
71 int read_data_fd; /**< Child uses to read request. */
72 int write_data_fd; /**< Parent uses to write request. */
73 int read_status_fd; /**< Parent reads to see if child is available. */
74 int write_status_fd; /**< Child uses to notify parent when it's available again. */
75 int max_requests; /**< How many requests a child can process before terminating. */
76 const char* appname; /**< Name of the application. */
77 int keepalive; /**< Keepalive time, in seconds, for stateful sessions. */
78 struct prefork_child_struct* next; /**< Forward link, used by the active, idle, and free lists. */
79 struct prefork_child_struct* prev; /**< Backward link, used only by the circular active list. */
82 typedef struct prefork_child_struct prefork_child;
84 /** Boolean. Set to true by a signal handler when it traps SIGCHLD. */
85 static volatile sig_atomic_t child_dead;
// Forward declarations of the file-local helpers.
87 static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
88 int max_requests, int min_children, int max_children );
89 static prefork_child* launch_child( prefork_simple* forker );
90 static void prefork_launch_children( prefork_simple* forker );
91 static void prefork_run( prefork_simple* forker );
92 static void add_prefork_child( prefork_simple* forker, prefork_child* child );
94 static void del_prefork_child( prefork_simple* forker, pid_t pid );
95 static int check_children( prefork_simple* forker, int forever );
96 static int prefork_child_process_request( prefork_child*, char* data );
97 static int prefork_child_init_hook( prefork_child* );
98 static prefork_child* prefork_child_init( prefork_simple* forker,
99 int read_data_fd, int write_data_fd,
100 int read_status_fd, int write_status_fd );
102 /* Child side: wait on the child's data pipe (read_data_fd) and service incoming requests. */
103 static void prefork_child_wait( prefork_child* child );
104 static void prefork_clear( prefork_simple*, int graceful);
105 static void prefork_child_free( prefork_simple* forker, prefork_child* );
106 static void osrf_prefork_register_routers( const char* appname, bool unregister );
107 static void osrf_prefork_child_exit( prefork_child* );
// Signal handlers installed by the listener process.
109 static void sigchld_handler( int sig );
110 static void sigusr1_handler( int sig );
111 static void sigterm_handler( int sig );
112 static void sigint_handler( int sig );
// NOTE(review): osrf_prefork_run() points global_forker at one of its local variables,
// so the pointer is only meaningful while that function is running.
114 /** Maintain a global pointer to the prefork_simple object
115 * for the current process so we can refer to it later
116 * for signal handling. There will only ever be one
117 * forker per process.
119 static prefork_simple *global_forker = NULL;
122 @brief Spawn and manage a collection of drone processes for servicing requests.
123 @param appname Name of the application.
124 @return 0 if successful, or -1 if error.
126 int osrf_prefork_run( const char* appname ) {
129 osrfLogError( OSRF_LOG_MARK, "osrf_prefork_run requires an appname to run!");
// Relabel this (listener) process as shown by ps.
133 set_proc_title( "OpenSRF Listener [%s]", appname );
140 // Get configuration settings
141 osrfLogInfo( OSRF_LOG_MARK, "Loading config in osrf_forker for app %s", appname );
// Each setting is looked up as a string under the application's config path. A missing
// setting keeps the corresponding default value and logs a warning.
143 char* max_req = osrf_settings_host_value( "/apps/%s/unix_config/max_requests", appname );
144 char* min_children = osrf_settings_host_value( "/apps/%s/unix_config/min_children", appname );
145 char* max_children = osrf_settings_host_value( "/apps/%s/unix_config/max_children", appname );
146 char* keepalive = osrf_settings_host_value( "/apps/%s/keepalive", appname );
// NOTE(review): atoi() yields 0 for non-numeric text and reports no errors;
// consider strtol() with validation.
149 osrfLogWarning( OSRF_LOG_MARK, "Keepalive is not defined, assuming %d", kalive );
151 kalive = atoi( keepalive );
154 osrfLogWarning( OSRF_LOG_MARK, "Max requests not defined, assuming %d", maxr );
156 maxr = atoi( max_req );
159 osrfLogWarning( OSRF_LOG_MARK, "Min children not defined, assuming %d", minc );
161 minc = atoi( min_children );
164 osrfLogWarning( OSRF_LOG_MARK, "Max children not defined, assuming %d", maxc );
166 maxc = atoi( max_children );
170 free( min_children );
171 free( max_children );
172 /* --------------------------------------------------- */
// The listener's Jabber resource is "<appname>_listener".
174 char* resc = va_list_to_string( "%s_listener", appname );
176 // Make sure that we haven't already booted
177 if( !osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
178 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()" );
185 prefork_simple forker;
187 if( prefork_simple_init( &forker, osrfSystemGetTransportClient(), maxr, minc, maxc )) {
188 osrfLogError( OSRF_LOG_MARK,
189 "osrf_prefork_run() failed to create prefork_simple object" );
193 // Finish initializing the prefork_simple.
194 forker.appname = strdup( appname );
195 forker.keepalive = kalive;
// The signal handlers reach the forker through this pointer. It refers to a local
// variable, so it is only valid while this function is running.
196 global_forker = &forker;
198 // Spawn the children; put them in the idle list.
199 prefork_launch_children( &forker );
201 // Tell the router that you're open for business.
202 osrf_prefork_register_routers( appname, false );
// SIGUSR1 unregisters from the routers, SIGTERM shuts down gracefully, and
// SIGINT/SIGQUIT shut down non-gracefully.
204 signal( SIGUSR1, sigusr1_handler);
205 signal( SIGTERM, sigterm_handler);
206 signal( SIGINT, sigint_handler );
207 signal( SIGQUIT, sigint_handler );
209 // Sit back and let the requests roll in
210 osrfLogInfo( OSRF_LOG_MARK, "Launching osrf_forker for app %s", appname );
211 prefork_run( &forker );
213 osrfLogWarning( OSRF_LOG_MARK, "prefork_run() returned - how??" );
214 prefork_clear( &forker, 0 );
219 @brief Register the application with a specified router.
220 @param appname Name of the application.
221 @param routerName Name of the router.
222 @param routerDomain Domain of the router.
// @param unregister If true, send an "unregister" command instead of "register".
224 Tell the router that you're open for business so that it can route requests to you.
226 Called only by the parent process.
228 static void osrf_prefork_send_router_registration(
229 const char* appname, const char* routerName,
230 const char* routerDomain, bool unregister ) {
232 // Get a pointer to the global transport_client
233 transport_client* client = osrfSystemGetTransportClient();
235 // Construct the Jabber address of the router
// The address has the form <routerName>@<routerDomain>/router.
236 char* jid = va_list_to_string( "%s@%s/router", routerName, routerDomain );
238 // Create the registration message, and send it
239 transport_message* msg;
// Unregistering: the router info carries the "unregister" command.
242 osrfLogInfo( OSRF_LOG_MARK, "%s un-registering with router %s", appname, jid );
243 msg = message_init( "unregistering", NULL, NULL, jid, NULL );
244 message_set_router_info( msg, NULL, NULL, appname, "unregister", 0 );
// Registering: the router info carries the "register" command.
248 osrfLogInfo( OSRF_LOG_MARK, "%s registering with router %s", appname, jid );
249 msg = message_init( "registering", NULL, NULL, jid, NULL );
250 message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
// NOTE(review): the return value of client_send_message() is not checked.
253 client_send_message( client, msg );
261 @brief Register with a router, or not, according to some config settings.
262 @param appname Name of the application
263 @param routerChunk A representation of part of the config file.
// @param unregister If true, unregister from the router instead of registering.
265 Parse a "complex" router configuration chunk.
267 Examine the services listed for a given router (normally in opensrf_core.xml). If
268 there is an entry for this service, or if there are @em no services listed, then
269 register with this router. Otherwise don't.
// A chunk lacking either a "name" or a "domain" is skipped with a warning.
271 Called only by the parent process.
273 static void osrf_prefork_parse_router_chunk(
274 const char* appname, const jsonObject* routerChunk, bool unregister ) {
276 const char* routerName = jsonObjectGetString( jsonObjectGetKeyConst( routerChunk, "name" ));
277 const char* domain = jsonObjectGetString( jsonObjectGetKeyConst( routerChunk, "domain" ));
278 const jsonObject* services = jsonObjectGetKeyConst( routerChunk, "services" );
// Both the name and the domain are needed to build the router's Jabber ID.
// jsonObjectGetString() can return NULL (e.g. for a missing key), and NULL must
// not be passed to the "%s" conversions below, so skip such chunks.
if( !routerName || !domain ) {
osrfLogWarning( OSRF_LOG_MARK, "Router config is missing a name or a domain; skipping it" );
return;
}
// Arguments are given in the order the format names them: domain, then name.
279 osrfLogDebug( OSRF_LOG_MARK, "found router config with domain %s and name %s",
280 domain, routerName );
282 if( services && services->type == JSON_HASH ) {
283 osrfLogDebug( OSRF_LOG_MARK, "investigating router information..." );
284 const jsonObject* service_obj = jsonObjectGetKeyConst( services, "service" );
286 ; // do nothing (shouldn't happen)
287 else if( JSON_ARRAY == service_obj->type ) {
288 // There are multiple services listed. Register with this router
289 // if and only if this service is on the list.
291 for( j = 0; j < service_obj->size; j++ ) {
292 const char* service = jsonObjectGetString( jsonObjectGetIndex( service_obj, j ));
293 if( service && !strcmp( appname, service ))
294 osrf_prefork_send_router_registration( appname, routerName, domain, unregister );
297 else if( JSON_STRING == service_obj->type ) {
298 // There's only one service listed. Register with this router
299 // if and only if this service is the one listed.
300 if( !strcmp( appname, jsonObjectGetString( service_obj )) )
301 osrf_prefork_send_router_registration( appname, routerName, domain, unregister );
304 // This router is not restricted to any set of services,
305 // so go ahead and register with it.
306 osrf_prefork_send_router_registration( appname, routerName, domain, unregister );
311 @brief Register the application with one or more routers, according to the configuration.
312 @param appname Name of the application.
// @param unregister If true, unregister from the routers instead of registering
// (sigusr1_handler() calls this with unregister set).
314 Called only by the parent process.
316 static void osrf_prefork_register_routers( const char* appname, bool unregister ) {
// Each entry under /routers/router is either a plain string (a "simple" router config)
// or an object that osrf_prefork_parse_router_chunk() interprets.
318 jsonObject* routerInfo = osrfConfigGetValueObject( NULL, "/routers/router" );
321 for( i = 0; i < routerInfo->size; i++ ) {
322 const jsonObject* routerChunk = jsonObjectGetIndex( routerInfo, i );
324 if( routerChunk->type == JSON_STRING ) {
325 /* this accommodates simple router configs */
// Simple config: the router name is read from /router_name and the domain
// from /routers/router.
326 char* routerName = osrfConfigGetValue( NULL, "/router_name" );
327 char* domain = osrfConfigGetValue( NULL, "/routers/router" );
328 osrfLogDebug( OSRF_LOG_MARK, "found simple router settings with router name %s",
330 osrf_prefork_send_router_registration( appname, routerName, domain, unregister );
335 osrf_prefork_parse_router_chunk( appname, routerChunk, unregister );
339 jsonObjectFree( routerInfo );
343 @brief Initialize a child process.
344 @param child Pointer to the prefork_child representing the new child process.
345 @return Zero if successful, or -1 if not.
347 Called only by child processes. Actions:
348 - Connect to one or more cache servers
349 - Reconfigure logger, if necessary
350 - Discard parent's Jabber connection and open a new one
351 - Dynamically call an application-specific initialization routine
352 - Change the command line as reported by ps
354 static int prefork_child_init_hook( prefork_child* child ) {
356 if( !child ) return -1;
357 osrfLogDebug( OSRF_LOG_MARK, "Child init hook for child %d", child->pid );
359 // Connect to cache server(s).
360 osrfSystemInitCache();
// The drone's Jabber resource is "<appname>_drone".
361 char* resc = va_list_to_string( "%s_drone", child->appname );
363 // If we're a source-client, tell the logger now that we're a new process.
364 char* isclient = osrfConfigGetValue( NULL, "/client" );
365 if( isclient && !strcasecmp( isclient,"true" ))
366 osrfLogSetIsClient( 1 );
369 // Remove traces of our parent's socket connection so we can have our own.
370 osrfSystemIgnoreTransportClient();
373 if( !osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
// NOTE(review): this message names osrf_prefork_run(), although the failure
// happens in prefork_child_init_hook().
374 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()" );
381 // Dynamically call the application-specific initialization function
382 // from a previously loaded shared library.
// osrfAppRunChildInit() returns zero on success.
383 if( ! osrfAppRunChildInit( child->appname )) {
384 osrfLogDebug( OSRF_LOG_MARK, "Prefork child_init succeeded\n" );
386 osrfLogError( OSRF_LOG_MARK, "Prefork child_init failed\n" );
390 // Change the command line as reported by ps
391 set_proc_title( "OpenSRF Drone [%s]", child->appname );
396 @brief Respond to a client request forwarded by the parent.
397 @param child Pointer to the state of the child process.
398 @param data Pointer to the raw XMPP message received from the parent.
399 @return 0 on success; non-zero means that the child process should clean itself up
400 and terminate immediately, presumably due to a fatal error condition.
402 Called only by a child process.
404 static int prefork_child_process_request( prefork_child* child, char* data ) {
405 if( !child ) return 0;
407 transport_client* client = osrfSystemGetTransportClient();
409 // Make sure that we're still connected to Jabber; reconnect if necessary.
410 if( !client_connected( client )) {
411 osrfSystemIgnoreTransportClient();
412 osrfLogWarning( OSRF_LOG_MARK, "Reconnecting child to opensrf after disconnect..." );
413 if( !osrf_system_bootstrap_client( NULL, NULL )) {
414 osrfLogError( OSRF_LOG_MARK,
415 "Unable to bootstrap client in prefork_child_process_request()" );
// Without a Jabber connection this drone cannot do any work, so shut it down.
417 osrf_prefork_child_exit( child );
421 // Construct the message from the xml.
422 transport_message* msg = new_message_from_xml( data );
424 // Respond to the transport message. This is where method calls are buried.
425 osrfAppSession* session = osrf_stack_transport_handler( msg, child->appname );
// NOTE(review): session is dereferenced below; confirm it is checked for NULL first.
// A set panic flag means the drone should terminate immediately (see @return).
429 int rc = session->panic;
432 osrfLogWarning( OSRF_LOG_MARK,
433 "Drone for session %s terminating immediately", session->session_id );
434 osrfAppSessionFree( session );
438 if( session->stateless && session->state != OSRF_SESSION_CONNECTED ) {
439 // We're no longer connected to the client, which presumably means that
440 // we're done with this request. Bail out.
441 osrfAppSessionFree( session );
445 // If we get this far, then the client has opened an application connection so that it
446 // can send multiple requests directly to the same server drone, bypassing the router
447 // and the listener. For example, it may need to do a database transaction, requiring
448 // multiple method calls within the same database session.
450 // Hence we go into a loop, responding to successive requests from the same client, until
451 // either the client disconnects or an error occurs.
453 osrfLogDebug( OSRF_LOG_MARK, "Entering keepalive loop for session %s", session->session_id );
// keepalive is in seconds: it is the queue_wait timeout and is compared with time() deltas.
454 int keepalive = child->keepalive;
462 // Respond to any input messages. This is where the method calls are buried.
463 osrfLogDebug( OSRF_LOG_MARK,
464 "osrf_prefork calling queue_wait [%d] in keepalive loop", keepalive );
// Time the wait so that an idle timeout can be told apart from other outcomes.
465 start = time( NULL );
466 retval = osrf_app_session_queue_wait( session, keepalive, &recvd );
469 osrfLogDebug( OSRF_LOG_MARK, "Data received == %d", recvd );
471 // Now we check a number of possible reasons to exit the loop.
473 // If the method call decided to terminate immediately,
474 // note that for future reference.
478 // If an error occurred when we tried to service the request, exit the loop.
480 osrfLogError( OSRF_LOG_MARK, "queue-wait returned non-success %d", retval );
484 // If the client disconnected, exit the loop.
485 if( session->state != OSRF_SESSION_CONNECTED )
488 // If we timed out while waiting for a request, exit the loop.
489 if( !recvd && (end - start) >= keepalive ) {
490 osrfLogInfo( OSRF_LOG_MARK,
491 "No request was received in %d seconds, exiting stateful session", keepalive );
492 osrfAppSessionStatus(
496 0, "Disconnected on timeout" );
501 // If the child process has decided to terminate immediately, exit the loop.
506 osrfLogDebug( OSRF_LOG_MARK, "Exiting keepalive loop for session %s", session->session_id );
507 osrfAppSessionFree( session );
512 @brief Partially initialize a prefork_simple provided by the caller.
513 @param prefork Pointer to a raw prefork_simple to be initialized.
514 @param client Pointer to a transport_client (connection to Jabber).
515 @param max_requests The maximum number of requests that a child process may service
// (max_requests: once a child has serviced that many requests, it terminates.)
517 @param min_children Minimum number of child processes to maintain.
518 @param max_children Maximum number of child processes to maintain.
519 @return 0 if successful, or 1 if not (due to invalid parameters).
521 static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
522 int max_requests, int min_children, int max_children ) {
// Reject inconsistent or oversized child counts.
524 if( min_children > max_children ) {
525 osrfLogError( OSRF_LOG_MARK, "min_children (%d) is greater "
526 "than max_children (%d)", min_children, max_children );
530 if( max_children > ABS_MAX_CHILDREN ) {
531 osrfLogError( OSRF_LOG_MARK, "max_children (%d) is greater than ABS_MAX_CHILDREN (%d)",
532 max_children, ABS_MAX_CHILDREN );
// NOTE(review): confirm whether zero or negative max_requests/min_children
// should be rejected here as well.
536 osrfLogInfo( OSRF_LOG_MARK, "Prefork launching child with max_request=%d,"
537 "min_children=%d, max_children=%d", max_requests, min_children, max_children );
539 /* flesh out the struct */
540 prefork->max_requests = max_requests;
541 prefork->min_children = min_children;
542 prefork->max_children = max_children;
544 prefork->data_to_child = 0;
545 prefork->data_to_parent = 0;
546 prefork->current_num_children = 0;
// keepalive and appname are left empty here; osrf_prefork_run() fills them in afterwards.
547 prefork->keepalive = 0;
548 prefork->appname = NULL;
// No children exist yet: the active, idle, and free lists all start empty.
549 prefork->first_child = NULL;
550 prefork->idle_list = NULL;
551 prefork->free_list = NULL;
552 prefork->connection = client;
558 @brief Spawn a new child process and put it in the idle list.
559 @param forker Pointer to the prefork_simple that will own the process.
560 @return Pointer to the new prefork_child, or not at all.
562 Spawn a new child process. Create a prefork_child for it and put it in the idle list.
564 After forking, the parent returns a pointer to the new prefork_child. The child
565 services its quota of requests and then terminates without returning.
// NOTE(review): the callers in this file ignore the return value; they inspect
// forker->idle_list or forker->current_num_children instead.
567 static prefork_child* launch_child( prefork_simple* forker ) {
573 // Set up the data and status pipes
// The data pipe carries requests from parent to child. The status pipe carries
// the child's "available" notices back to the parent.
574 if( pipe( data_fd ) < 0 ) { /* build the data pipe*/
575 osrfLogError( OSRF_LOG_MARK, "Pipe making error" );
579 if( pipe( status_fd ) < 0 ) {/* build the status pipe */
580 osrfLogError( OSRF_LOG_MARK, "Pipe making error" );
586 osrfLogInternal( OSRF_LOG_MARK, "Pipes: %d %d %d %d",
587 data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
589 // Create and initialize a prefork_child for the new process
590 prefork_child* child = prefork_child_init( forker, data_fd[0],
591 data_fd[1], status_fd[0], status_fd[1] );
593 if( (pid=fork()) < 0 ) {
594 osrfLogError( OSRF_LOG_MARK, "Forking Error" );
595 prefork_child_free( forker, child );
599 // Add the new child to the head of the idle list
// (This precedes the pid test, so it happens in both the parent and the child.)
600 child->next = forker->idle_list;
601 forker->idle_list = child;
603 if( pid > 0 ) { /* parent */
// (Re)install the SIGCHLD handler so the parent notices when a child dies.
605 signal( SIGCHLD, sigchld_handler );
// This is the only place the count goes up: only in the parent, after a successful fork().
606 ( forker->current_num_children )++;
609 osrfLogDebug( OSRF_LOG_MARK, "Parent launched %d", pid );
610 /* *no* child pipe FD's can be closed or the parent will re-use fd's that
611 the children are currently using */
617 // we don't want to adopt our parent's handlers.
618 signal( SIGUSR1, SIG_DFL );
619 signal( SIGTERM, SIG_DFL );
620 signal( SIGINT, SIG_DFL );
621 signal( SIGQUIT, SIG_DFL );
622 signal( SIGCHLD, SIG_DFL );
624 osrfLogInternal( OSRF_LOG_MARK,
625 "I am new child with read_data_fd = %d and write_status_fd = %d",
626 child->read_data_fd, child->write_status_fd );
// Child side: record our own pid and close the pipe ends that belong to the parent.
628 child->pid = getpid();
629 close( child->write_data_fd );
630 close( child->read_status_fd );
633 if( prefork_child_init_hook( child ) == -1 ) {
634 osrfLogError( OSRF_LOG_MARK,
635 "Forker child going away because we could not connect to OpenSRF..." );
636 osrf_prefork_child_exit( child );
639 prefork_child_wait( child ); // Should exit without returning
640 osrf_prefork_child_exit( child ); // Just to be sure
641 return NULL; // Unreachable, but it keeps the compiler happy
646 @brief Terminate a child process.
647 @param child Pointer to the prefork_child representing the child process (not used).
649 Called only by child processes. Dynamically call an application-specific shutdown
650 function from a previously loaded shared library; then exit.
652 static void osrf_prefork_child_exit( prefork_child* child ) {
// Run the application-specific shutdown code before the process ends.
653 osrfAppRunExitCode();
658 @brief Launch all the child processes, putting them in the idle list.
659 @param forker Pointer to the prefork_simple that will own the children.
661 Called only by the parent process (in order to become a parent).
663 static void prefork_launch_children( prefork_simple* forker ) {
664 if( !forker ) return;
// Attempt min_children launches. The results are not checked. prefork_run() spawns
// further children on demand, up to max_children.
666 while( c++ < forker->min_children )
667 launch_child( forker );
671 @brief Signal handler for SIGCHLD: note that a child process has terminated.
672 @param sig The value of the trapped signal; always SIGCHLD.
674 Set a boolean to be checked later.
676 static void sigchld_handler( int sig ) {
// Re-arm the handler, in case this platform's signal() resets the disposition
// to SIG_DFL on delivery (System V semantics).
677 signal( SIGCHLD, sigchld_handler );
682 @brief Signal handler for SIGUSR1
683 @param sig The value of the trapped signal; always SIGUSR1.
685 Send unregister command to all registered routers.
687 static void sigusr1_handler( int sig ) {
// Nothing to do until osrf_prefork_run() has set up the forker.
688 if (!global_forker) return;
// NOTE(review): this builds and sends Jabber messages (string allocation, I/O, logging)
// from inside a signal handler, and none of that is async-signal-safe. Consider
// setting a sig_atomic_t flag here and doing the work from prefork_run()'s loop.
689 osrf_prefork_register_routers(global_forker->appname, true);
// Re-arm, in case signal() resets the disposition on delivery.
690 signal( SIGUSR1, sigusr1_handler );
694 @brief Signal handler for SIGTERM
695 @param sig The value of the trapped signal; always SIGTERM
697 Perform a graceful prefork server shutdown.
699 static void sigterm_handler(int sig) {
// Nothing to do until osrf_prefork_run() has set up the forker.
700 if (!global_forker) return;
// NOTE(review): osrfLogInfo() and prefork_clear() are presumably not
// async-signal-safe; confirm before relying on this in production.
701 osrfLogInfo(OSRF_LOG_MARK, "server: received SIGTERM, shutting down");
// Second argument 1 requests a graceful shutdown.
702 prefork_clear(global_forker, 1);
707 @brief Signal handler for SIGINT or SIGQUIT
708 @param sig The value of the trapped signal
710 Perform a non-graceful prefork server shutdown.
712 static void sigint_handler(int sig) {
// Nothing to do until osrf_prefork_run() has set up the forker.
713 if (!global_forker) return;
// NOTE(review): osrfLogInfo() and prefork_clear() are presumably not
// async-signal-safe; confirm before relying on this in production.
714 osrfLogInfo(OSRF_LOG_MARK, "server: received SIGINT/QUIT, shutting down");
// Second argument 0 requests a non-graceful shutdown.
715 prefork_clear(global_forker, 0);
720 @brief Replenish the collection of child processes, after one has terminated.
721 @param forker Pointer to the prefork_simple that manages the child processes.
723 The parent calls this function when it notices (via a signal handler) that
724 a child process has died.
726 Wait on the dead children so that they won't be zombies. Spawn new ones as needed
727 to maintain at least a minimum number.
// If a replacement cannot be launched, give up for now; the next call will retry.
729 void reap_children( prefork_simple* forker ) {
733 // Reset our boolean so that we can detect any further terminations.
736 // Bury the children so that they won't be zombies. WNOHANG means that waitpid() returns
737 // immediately if there are no waitable children, instead of waiting for more to die.
738 // Ignore the return code of the child. We don't do an autopsy.
739 while( (child_pid = waitpid( -1, NULL, WNOHANG )) > 0 ) {
740 --forker->current_num_children;
741 del_prefork_child( forker, child_pid );
744 // Spawn more children as needed.
// launch_child() increments current_num_children only after a successful fork(), so
// a failed launch (e.g. fork() failing with EAGAIN) leaves the count unchanged. The
// original loop would then spin indefinitely. Stop at the first failure instead.
745 while( forker->current_num_children < forker->min_children ) {
int count_before_launch = forker->current_num_children;
746 launch_child( forker );
if( forker->current_num_children == count_before_launch ) {
osrfLogWarning( OSRF_LOG_MARK, "Unable to launch a replacement child; will retry later" );
break;
}
}
750 @brief Read transport_messages and dispatch them to child processes for servicing.
751 @param forker Pointer to the prefork_simple that manages the child processes.
753 This is the main loop of the parent process, and once entered, does not exit.
755 For each usable transport_message received: look for an idle child to service it. If
756 no idle children are available, either spawn a new one or, if we've already spawned the
757 maximum number of children, wait for one to become available. Once a child is available
758 by whatever means, write an XML version of the input message to that child's data pipe.
761 static void prefork_run( prefork_simple* forker ) {
763 if( NULL == forker->idle_list )
764 return; // No available children, and we haven't even started yet
766 transport_message* cur_msg = NULL;
770 if( forker->first_child == NULL && forker->idle_list == NULL ) {/* no more children */
771 osrfLogWarning( OSRF_LOG_MARK, "No more children..." );
775 // Wait indefinitely for an input message
776 osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data..." );
777 cur_msg = client_recv( forker->connection, -1 );
779 if( cur_msg == NULL )
780 continue; // Error? Interrupted by a signal? Try again...
782 message_prepare_xml( cur_msg );
783 const char* msg_data = cur_msg->msg_xml;
784 if( ! msg_data || ! *msg_data ) {
// The log call is printf-style, so each argument needs its own %s. A stray "% m"
// or a trailing "%" would be undefined behaviour and would misalign the arguments.
785 osrfLogWarning( OSRF_LOG_MARK, "Received %s message from %s, thread %s",
786 (msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread );
787 message_free( cur_msg );
788 continue; // Message not usable; go on to the next one.
791 int honored = 0; /* will be set to true when we service the request */
797 if(check_children( forker, 0 ) < 0) {
798 continue; // check failed, try again
803 osrfLogDebug( OSRF_LOG_MARK, "Server received inbound data" );
805 prefork_child* cur_child = NULL;
807 // Look for an available child in the idle list. Since the idle list operates
808 // as a stack, the child we get is the one that was most recently active, or
809 // most recently spawned. That means it's the one most likely still to be in
810 // physical memory, and the one least likely to have to be swapped in.
811 while( forker->idle_list ) {
813 osrfLogDebug( OSRF_LOG_MARK, "Looking for idle child" );
814 // Grab the prefork_child at the head of the idle list
815 cur_child = forker->idle_list;
816 forker->idle_list = cur_child->next;
817 cur_child->next = NULL;
819 osrfLogInternal( OSRF_LOG_MARK,
820 "Searching for available child. cur_child->pid = %d", cur_child->pid );
821 osrfLogInternal( OSRF_LOG_MARK, "Current num children %d",
822 forker->current_num_children );
824 osrfLogDebug( OSRF_LOG_MARK, "forker sending data to %d", cur_child->pid );
825 osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d",
826 cur_child->write_data_fd );
// Send the request together with its terminating NUL.
828 int written = write( cur_child->write_data_fd, msg_data, strlen( msg_data ) + 1 );
830 // This child appears to be dead or unusable. Discard it.
831 osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d: %s",
832 errno, strerror( errno ));
833 kill( cur_child->pid, SIGKILL );
834 del_prefork_child( forker, cur_child->pid );
838 add_prefork_child( forker, cur_child ); // Add it to active list
843 /* if none available, add a new child if we can */
845 osrfLogDebug( OSRF_LOG_MARK, "Not enough children, attempting to add..." );
847 if( forker->current_num_children < forker->max_children ) {
848 osrfLogDebug( OSRF_LOG_MARK, "Launching new child with current_num = %d",
849 forker->current_num_children );
851 launch_child( forker ); // Put a new child into the idle list
852 if( forker->idle_list ) {
854 // Take the new child from the idle list
855 prefork_child* new_child = forker->idle_list;
856 forker->idle_list = new_child->next;
857 new_child->next = NULL;
859 osrfLogDebug( OSRF_LOG_MARK, "Writing to new child fd %d : pid %d",
860 new_child->write_data_fd, new_child->pid );
863 new_child->write_data_fd, msg_data, strlen( msg_data ) + 1 );
865 // This child appears to be dead or unusable. Discard it.
866 osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d: %s",
867 errno, strerror( errno ));
// The child that failed is new_child. cur_child here is either NULL or a child
// that was already discarded above, so it must not be touched.
868 kill( new_child->pid, SIGKILL );
869 del_prefork_child( forker, new_child->pid );
871 add_prefork_child( forker, new_child );
876 osrfLogWarning( OSRF_LOG_MARK, "Could not launch a new child as %d children "
877 "were already running; consider increasing max_children for this "
878 "application higher than %d in the OpenSRF configuration if this "
879 "message occurs frequently",
880 forker->current_num_children, forker->max_children );
885 osrfLogWarning( OSRF_LOG_MARK, "No children available, waiting..." );
886 if( check_children( forker, 1 ) >= 0 ) {
887 // Tell the loop not to call check_children again, since we just successfully called it
893 reap_children( forker );
895 } // end while( ! honored )
897 message_free( cur_msg );
899 } /* end top level listen loop */
904 @brief See if any children have become available.
905 @param forker Pointer to the prefork_simple that owns the children.
906 @param forever Boolean: true if we should wait indefinitely.
907 @return 0 or greater if successful, -1 on select error/interrupt
909 Call select() for all the children in the active list. Read each active file
910 descriptor and move the corresponding child to the idle list.
912 If @a forever is true, wait indefinitely for input. Otherwise return immediately if
913 there are no active file descriptors.
915 static int check_children( prefork_simple* forker, int forever ) {
// Bury any dead children (and launch replacements) before examining the survivors.
918 reap_children( forker );
920 if( NULL == forker->first_child ) {
921 // If forever is true, then we're here because we've run out of idle
922 // processes, so there should be some active ones around, except during
923 // graceful shutdown, as we wait for all active children to become idle.
924 // If forever is false, then the children may all be idle, and that's okay.
926 osrfLogDebug( OSRF_LOG_MARK, "No active child processes to check" );
932 FD_ZERO( &read_set );
936 // Prepare to select() on pipes from all the active children
// NOTE(review): FD_SET() is undefined for descriptors >= FD_SETSIZE. The parent keeps
// the pipe descriptors of up to ABS_MAX_CHILDREN children open, so confirm the
// descriptor numbers stay below that limit, or switch to poll().
937 prefork_child* cur_child = forker->first_child;
939 if( cur_child->read_status_fd > max_fd )
940 max_fd = cur_child->read_status_fd;
941 FD_SET( cur_child->read_status_fd, &read_set );
942 cur_child = cur_child->next;
943 } while( cur_child != forker->first_child );
945 FD_CLR( 0, &read_set ); /* just to be sure */
// forever: block in select() with no timeout until some child reports in.
949 if( (select_ret=select( max_fd + 1, &read_set, NULL, NULL, NULL )) == -1 ) {
950 osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children: %s",
951 errno, strerror( errno ));
953 osrfLogInfo( OSRF_LOG_MARK,
954 "select() completed after waiting on children to become available" );
// Otherwise: select() bounded by the timeout in tv.
962 if( (select_ret=select( max_fd + 1, &read_set, NULL, NULL, &tv )) == -1 ) {
963 osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children: %s",
964 errno, strerror( errno ));
968 if( select_ret <= 0 ) // we're done here
971 // Check each child in the active list.
972 // If it has responded, move it to the idle list.
973 cur_child = forker->first_child;
974 prefork_child* next_child = NULL;
977 next_child = cur_child->next;
978 if( FD_ISSET( cur_child->read_status_fd, &read_set )) {
979 osrfLogDebug( OSRF_LOG_MARK,
980 "Server received status from a child %d", cur_child->pid );
984 /* now suck off the data */
// Drain the child's status notice so the pipe is no longer reported as readable.
986 if( (n=read( cur_child->read_status_fd, buf, sizeof( buf ) - 1 )) < 0 ) {
987 osrfLogWarning( OSRF_LOG_MARK,
988 "Read error after select in child status read with errno %d: %s",
989 errno, strerror( errno ));
993 osrfLogDebug( OSRF_LOG_MARK, "Read %d bytes from status buffer: %s", n, buf );
996 // Remove the child from the active list
997 if( forker->first_child == cur_child ) {
998 if( cur_child->next == cur_child )
999 forker->first_child = NULL; // only child in the active list
1001 forker->first_child = cur_child->next;
1003 cur_child->next->prev = cur_child->prev;
1004 cur_child->prev->next = cur_child->next;
1006 // Add it to the idle list
1007 cur_child->prev = NULL;
1008 cur_child->next = forker->idle_list;
1009 forker->idle_list = cur_child;
1011 cur_child = next_child;
// NOTE(review): when the head of the active list is moved to the idle list, first_child
// becomes next_child and the condition below ends the scan early. Any other ready
// children are picked up on a later call, because select() keeps reporting their
// unread pipes as readable.
1012 } while( forker->first_child && forker->first_child != next_child );
1018 @brief Service up a set maximum number of requests; then shut down.
1019 @param child Pointer to the prefork_child representing the child process.
1021 Called only by child process.
1023 Enter a loop, for up to max_requests iterations. On each iteration:
1024 - Wait indefinitely for a request from the parent.
1025 - Service the request.
1026 - Increment a counter. If the limit hasn't been reached, notify the parent that you
1027 are available for another request.
1029 After exiting the loop, shut down and terminate the process.
1031 static void prefork_child_wait( prefork_child* child ) {
1034 growing_buffer* gbuf = buffer_init( READ_BUFSIZE );
1035 char buf[READ_BUFSIZE];
1037 for( i = 0; i < child->max_requests; i++ ) {
1040 int gotdata = 0; // boolean; set to true if we get data
1041 clr_fl( child->read_data_fd, O_NONBLOCK );
1043 // Read a request from the parent, via a pipe, into a growing_buffer.
1044 while( (n = read( child->read_data_fd, buf, READ_BUFSIZE-1 )) > 0 ) {
1046 osrfLogDebug( OSRF_LOG_MARK, "Prefork child read %d bytes of data", n );
1048 set_fl( child->read_data_fd, O_NONBLOCK );
1051 buffer_add_n( gbuf, buf, n );
1054 if( errno == EAGAIN )
1057 if( errno == EPIPE ) {
1058 osrfLogDebug( OSRF_LOG_MARK, "C child attempted read on broken pipe, exiting..." );
1062 int terminate_now = 0; // Boolean
1065 osrfLogWarning( OSRF_LOG_MARK,
1066 "Prefork child read returned error with errno %d", errno );
1069 } else if( gotdata ) {
1070 // Process the request
1071 osrfLogDebug( OSRF_LOG_MARK, "Prefork child got a request.. processing.." );
1072 terminate_now = prefork_child_process_request( child, gbuf->buf );
1073 buffer_reset( gbuf );
1076 if( terminate_now ) {
1077 // We're terminating prematurely -- presumably due to a fatal error condition.
1078 osrfLogWarning( OSRF_LOG_MARK, "Prefork child terminating abruptly" );
1082 if( i < child->max_requests - 1 ) {
1083 // Report back to the parent for another request.
1085 ssize_t len = write(
1086 child->write_status_fd, "available" /*less than 64 bytes*/, msg_len );
1087 if( len != msg_len ) {
1088 osrfLogError( OSRF_LOG_MARK,
1089 "Drone terminating: unable to notify listener of availability: %s",
1091 buffer_free( gbuf );
1092 osrf_prefork_child_exit( child );
1097 buffer_free( gbuf );
1099 osrfLogDebug( OSRF_LOG_MARK, "Child with max-requests=%d, num-served=%d exiting...[%ld]",
1100 child->max_requests, i, (long) getpid());
1102 osrf_prefork_child_exit( child );
1106 @brief Add a prefork_child to the end of the active list.
1107 @param forker Pointer to the prefork_simple that owns the list.
1108 @param child Pointer to the prefork_child to be added.
1110 static void add_prefork_child( prefork_simple* forker, prefork_child* child ) {
1112 if( forker->first_child == NULL ) {
1113 // Simplest case: list is initially empty.
1114 forker->first_child = child;
1115 child->next = child;
1116 child->prev = child;
1118 // Find the last node in the circular list.
1119 prefork_child* last_child = forker->first_child->prev;
1121 // Insert the new child between the last and first children.
1122 last_child->next = child;
1123 child->prev = last_child;
1124 child->next = forker->first_child;
1125 forker->first_child->prev = child;
1130 @brief Delete and destroy a dead child from our list.
1131 @param forker Pointer to the prefork_simple that owns the dead child.
1132 @param pid Process ID of the dead child.
1134 Look for the dead child first in the list of active children. If you don't find it
1135 there, look in the list of idle children. If you find it, remove it from whichever
1136 list it's on, and destroy it.
1138 static void del_prefork_child( prefork_simple* forker, pid_t pid ) {
1140 osrfLogDebug( OSRF_LOG_MARK, "Deleting Child: %d", pid );
1142 prefork_child* cur_child = NULL;
1144 // Look first in the active list
1145 if( forker->first_child ) {
1146 cur_child = forker->first_child; /* current pointer */
1147 while( cur_child->pid != pid && cur_child->next != forker->first_child )
1148 cur_child = cur_child->next;
1150 if( cur_child->pid == pid ) {
1151 // We found the right node. Remove it from the list.
1152 if( cur_child->next == cur_child )
1153 forker->first_child = NULL; // only child in the list
1155 if( forker->first_child == cur_child )
1156 forker->first_child = cur_child->next; // Reseat forker->first_child
1158 // Stitch the adjacent nodes together
1159 cur_child->prev->next = cur_child->next;
1160 cur_child->next->prev = cur_child->prev;
1163 cur_child = NULL; // Didn't find it in the active list
1167 // Maybe it's in the idle list. This can happen if, for example,
1168 // a child is killed by a signal while it's between requests.
1170 prefork_child* prev = NULL;
1171 cur_child = forker->idle_list;
1172 while( cur_child && cur_child->pid != pid ) {
1174 cur_child = cur_child->next;
1178 // Detach from the list
1180 prev->next = cur_child->next;
1182 forker->idle_list = cur_child->next;
1183 } // else we can't find it
1186 // If we found the node, destroy it.
1188 prefork_child_free( forker, cur_child );
1192 @brief Create and initialize a prefork_child.
1193 @param forker Pointer to the prefork_simple that will own the prefork_child.
1194 @param read_data_fd Used by child to read request from parent.
1195 @param write_data_fd Used by parent to write request to child.
1196 @param read_status_fd Used by parent to read status from child.
1197 @param write_status_fd Used by child to write status to parent.
1198 @return Pointer to the newly created prefork_child.
1200 The calling code is responsible for freeing the prefork_child by calling
1201 prefork_child_free().
1203 static prefork_child* prefork_child_init( prefork_simple* forker,
1204 int read_data_fd, int write_data_fd,
1205 int read_status_fd, int write_status_fd ) {
1207 // Allocate a prefork_child -- from the free list if possible, or from
1208 // the heap if necessary. The free list is a non-circular, singly-linked list.
1209 prefork_child* child;
1210 if( forker->free_list ) {
1211 child = forker->free_list;
1212 forker->free_list = child->next;
1214 child = safe_malloc( sizeof( prefork_child ));
1217 child->read_data_fd = read_data_fd;
1218 child->write_data_fd = write_data_fd;
1219 child->read_status_fd = read_status_fd;
1220 child->write_status_fd = write_status_fd;
1221 child->max_requests = forker->max_requests;
1222 child->appname = forker->appname; // We don't make a separate copy
1223 child->keepalive = forker->keepalive;
1231 @brief Terminate all child processes and clear out a prefork_simple.
1232 @param prefork Pointer to the prefork_simple to be cleared out.
1234 We do not deallocate the prefork_simple itself, just its contents.
1236 static void prefork_clear( prefork_simple* prefork, int graceful ) {
1238 // always de-register routers before killing child processes (or waiting
1239 // for them to complete) so that new requests are directed elsewhere.
1240 osrf_prefork_register_routers(global_forker->appname, true);
1242 while( prefork->first_child ) {
1245 // wait for at least one active child to become idle, then repeat.
1246 // once complete, all children will be idle and cleaned up below.
1247 osrfLogInfo(OSRF_LOG_MARK, "graceful shutdown waiting...");
1248 check_children(prefork, 1);
1251 // Kill and delete all the active children
1252 kill( prefork->first_child->pid, SIGKILL );
1253 del_prefork_child( prefork, prefork->first_child->pid );
1258 osrfLogInfo(OSRF_LOG_MARK,
1259 "all active children are now idle in graceful shutdown");
1262 // Kill all the idle prefork children, close their file
1263 // descriptors, and move them to the free list.
1264 prefork_child* child = prefork->idle_list;
1265 prefork->idle_list = NULL;
1267 prefork_child* temp = child->next;
1268 kill( child->pid, SIGKILL );
1269 prefork_child_free( prefork, child );
1272 //prefork->current_num_children = 0;
1274 // Physically free the free list of prefork_children.
1275 child = prefork->free_list;
1276 prefork->free_list = NULL;
1278 prefork_child* temp = child->next;
1283 // Close the Jabber connection
1284 client_free( prefork->connection );
1285 prefork->connection = NULL;
1287 // After giving the child processes a second to terminate, wait on them so that they
1288 // don't become zombies. We don't wait indefinitely, so it's possible that some
1289 // children will survive a bit longer.
1291 while( (waitpid( -1, NULL, WNOHANG )) > 0 ) {
1292 --prefork->current_num_children;
1295 free( prefork->appname );
1296 prefork->appname = NULL;
1300 @brief Destroy and deallocate a prefork_child.
1301 @param forker Pointer to the prefork_simple that owns the prefork_child.
1302 @param child Pointer to the prefork_child to be destroyed.
1304 static void prefork_child_free( prefork_simple* forker, prefork_child* child ) {
1305 close( child->read_data_fd );
1306 close( child->write_data_fd );
1307 close( child->read_status_fd );
1308 close( child->write_status_fd );
1310 // Stick the prefork_child in a free list for potential reuse. This is a
1311 // non-circular, singly linked list.
1313 child->next = forker->free_list;
1314 forker->free_list = child;