3 @brief Spawn and manage a collection of child processes to service requests.
5 Spawn a collection of child processes, replacing them as needed. Forward requests to them
6 and let the children do the work.
8 Each child processes some maximum number of requests before it terminates itself. When a
9 child dies, either deliberately or otherwise, we can spawn another one to replace it,
10 keeping the number of children within a predefined range.
12 Use a doubly-linked circular list to keep track of the children to whom we have forwarded
13 a request and who are still working on it. Use a separate linear linked list to keep
14 track of children that are currently idle. Move them back and forth as needed.
16 For each child, set up two pipes:
17 - One for the parent to send requests to the child.
18 - One for the child to notify the parent that it is available for another request.
20 The message sent to the child represents an XML stanza as received from Jabber.
22 When the child finishes processing the request, it writes the string "available" back
23 to the parent. Then the parent knows that it can send that child another request.
28 #include <sys/types.h>
34 #include <sys/select.h>
37 #include "opensrf/utils.h"
38 #include "opensrf/log.h"
39 #include "opensrf/transport_client.h"
40 #include "opensrf/osrf_stack.h"
41 #include "opensrf/osrf_settings.h"
42 #include "opensrf/osrf_application.h"
44 #define READ_BUFSIZE 1024
45 #define ABS_MAX_CHILDREN 256
// Members of prefork_simple: the listener (parent) process's state for managing its pool
// of child processes. NOTE(review): the opening and closing lines of this struct
// definition are not visible in this listing.
48 int max_requests; /**< How many requests a child processes before terminating. */
49 int min_children; /**< Minimum number of children to maintain. */
50 int max_children; /**< Maximum number of children to maintain. */
51 int max_backlog_queue; /**< Maximum size of backlog queue. */
52 int fd; /**< Unused. */
53 int data_to_child; /**< Unused. */
54 int data_to_parent; /**< Unused. */
55 int current_num_children; /**< How many children have been launched and not yet reaped (incremented in launch_child(), decremented in reap_children()). */
56 int keepalive; /**< Keepalive time for stateful sessions. */
57 char* appname; /**< Name of the application. */
58 /** Points to a circular linked list of children. */
59 struct prefork_child_struct* first_child;
60 /** List of child processes that aren't doing anything at the moment and are
61 therefore available to service a new request. */
62 struct prefork_child_struct* idle_list;
63 /** List of allocated but unused prefork_children, available for reuse. Each one is just
64 raw memory, apart from the "next" pointer used to stitch them together. In particular,
65 there is no child process for them, and the file descriptors are not open. */
66 struct prefork_child_struct* free_list;
// Non-circular, singly linked list of pid-only copies of the children that were on the
// active list when SIGHUP arrived; filled in by sighup_handler().
67 struct prefork_child_struct* sighup_pending_list;
68 transport_client* connection; /**< Connection to Jabber. */
// Per-child bookkeeping: the child's pid, the two pipes it shares with the parent
// (request data in one direction, "available" status in the other), its request
// quota, and list linkage. Created by prefork_child_init() in launch_child().
71 struct prefork_child_struct {
72 pid_t pid; /**< Process ID of the child. */
73 int read_data_fd; /**< Child uses to read request. */
74 int write_data_fd; /**< Parent uses to write request. */
75 int read_status_fd; /**< Parent reads to see if child is available. */
76 int write_status_fd; /**< Child uses to notify parent when it's available again. */
77 int max_requests; /**< How many requests a child can process before terminating. */
78 const char* appname; /**< Name of the application. */
79 int keepalive; /**< Keepalive time for stateful sessions. */
// next/prev link the circular, doubly linked list of active children; the idle, free
// and sighup-pending lists are chained through "next" only.
80 struct prefork_child_struct* next; /**< Linkage pointer for linked list. */
81 struct prefork_child_struct* prev; /**< Linkage pointer for linked list. */
84 typedef struct prefork_child_struct prefork_child;
86 /** Boolean. Set to true by a signal handler when it traps SIGCHLD. */
87 static volatile sig_atomic_t child_dead;
// Forward declarations for the file-local helpers defined below.
89 static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
90 int max_requests, int min_children, int max_children, int max_backlog_queue );
91 static prefork_child* launch_child( prefork_simple* forker );
92 static void prefork_launch_children( prefork_simple* forker );
93 static void prefork_run( prefork_simple* forker );
94 static void add_prefork_child( prefork_simple* forker, prefork_child* child );
96 static void del_prefork_child( prefork_simple* forker, pid_t pid );
97 static int check_children( prefork_simple* forker, int forever );
98 static int prefork_child_process_request( prefork_child*, char* data );
99 static int prefork_child_init_hook( prefork_child* );
100 static prefork_child* prefork_child_init( prefork_simple* forker,
101 int read_data_fd, int write_data_fd,
102 int read_status_fd, int write_status_fd );
104 /* listens on the 'data_to_child' fd and waits for incoming data */
// NOTE(review): prefork_simple marks data_to_child as unused; the child presumably reads
// requests from child->read_data_fd instead. Confirm and update the comment above.
105 static void prefork_child_wait( prefork_child* child );
106 static void prefork_clear( prefork_simple*, bool graceful);
107 static void prefork_child_free( prefork_simple* forker, prefork_child* );
108 static void osrf_prefork_register_routers( const char* appname, bool unregister );
109 static void osrf_prefork_child_exit( prefork_child* );
// Signal handlers, installed by osrf_prefork_run() and reset to SIG_DFL in each child
// by launch_child().
111 static void sigchld_handler( int sig );
112 static void sigusr1_handler( int sig );
113 static void sigusr2_handler( int sig );
114 static void sigterm_handler( int sig );
115 static void sigint_handler( int sig );
116 static void sighup_handler( int sig );
118 /** Maintain a global pointer to the prefork_simple object
119 * for the current process so we can refer to it later
120 * for signal handling. There will only ever be one
121 * forker per process.
123 static prefork_simple *global_forker = NULL;
126 @brief Spawn and manage a collection of drone processes for servicing requests.
127 @param appname Name of the application.
128 @return 0 if successful, or -1 if error.
// In the normal case this does not return. prefork_run() is the listener's main loop,
// and shutdown is driven by the signal handlers installed below, which call prefork_clear().
130 int osrf_prefork_run( const char* appname ) {
133 osrfLogError( OSRF_LOG_MARK, "osrf_prefork_run requires an appname to run!");
137 set_proc_title( "OpenSRF Listener [%s]", appname );
145 // Get configuration settings
146 osrfLogInfo( OSRF_LOG_MARK, "Loading config in osrf_forker for app %s", appname );
// NOTE(review): the values below are converted with atoi(). A setting that is present but
// non-numeric therefore silently becomes 0 rather than the logged default. Consider
// strtol() with validation.
148 char* max_req = osrf_settings_host_value( "/apps/%s/unix_config/max_requests", appname );
149 char* min_children = osrf_settings_host_value( "/apps/%s/unix_config/min_children", appname );
150 char* max_children = osrf_settings_host_value( "/apps/%s/unix_config/max_children", appname );
151 char* max_backlog_queue = osrf_settings_host_value( "/apps/%s/unix_config/max_backlog_queue", appname );
152 char* keepalive = osrf_settings_host_value( "/apps/%s/keepalive", appname );
155 osrfLogWarning( OSRF_LOG_MARK, "Keepalive is not defined, assuming %d", kalive );
157 kalive = atoi( keepalive );
160 osrfLogWarning( OSRF_LOG_MARK, "Max requests not defined, assuming %d", maxr );
162 maxr = atoi( max_req );
165 osrfLogWarning( OSRF_LOG_MARK, "Min children not defined, assuming %d", minc );
167 minc = atoi( min_children );
170 osrfLogWarning( OSRF_LOG_MARK, "Max children not defined, assuming %d", maxc );
172 maxc = atoi( max_children );
174 if( !max_backlog_queue )
175 osrfLogWarning( OSRF_LOG_MARK, "Max backlog queue size not defined, assuming %d", maxbq );
177 maxbq = atoi( max_backlog_queue );
181 free( min_children );
182 free( max_children );
183 free( max_backlog_queue );
184 /* --------------------------------------------------- */
// The listener connects to Jabber under the resource name "<appname>_listener".
186 char* resc = va_list_to_string( "%s_listener", appname );
188 // Make sure that we haven't already booted
189 if( !osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
190 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()" );
// forker lives in this stack frame. global_forker (set below) exposes it to the signal
// handlers. That is only sound while prefork_run() keeps running: if this function
// returned, global_forker would dangle.
197 prefork_simple forker;
199 if( prefork_simple_init( &forker, osrfSystemGetTransportClient(), maxr, minc, maxc, maxbq )) {
200 osrfLogError( OSRF_LOG_MARK,
201 "osrf_prefork_run() failed to create prefork_simple object" );
205 // Finish initializing the prefork_simple.
206 forker.appname = strdup( appname );
207 forker.keepalive = kalive;
208 global_forker = &forker;
210 // Spawn the children; put them in the idle list.
211 prefork_launch_children( &forker );
213 // Tell the router that you're open for business.
214 osrf_prefork_register_routers( appname, false );
// Install the listener's signal handlers. launch_child() resets these to SIG_DFL in each
// child. SIGQUIT shares the non-graceful SIGINT handler.
216 signal( SIGUSR1, sigusr1_handler);
217 signal( SIGUSR2, sigusr2_handler);
218 signal( SIGTERM, sigterm_handler);
219 signal( SIGINT, sigint_handler );
220 signal( SIGQUIT, sigint_handler );
221 signal( SIGHUP, sighup_handler );
223 // Sit back and let the requests roll in
224 osrfLogInfo( OSRF_LOG_MARK, "Launching osrf_forker for app %s", appname );
225 prefork_run( &forker );
227 osrfLogWarning( OSRF_LOG_MARK, "prefork_run() returned - how??" );
228 prefork_clear( &forker, false );
233 @brief Register (or unregister) the application with a specified router.
234 @param appname Name of the application.
235 @param routerName Name of the router.
236 @param routerDomain Domain of the router.
@param unregister If true, send an "unregister" message instead of a "register" message.
238 Tell the router that you're open for business so that it can route requests to you.
240 Called only by the parent process.
// Also reached from sigusr1_handler()/sigusr2_handler() via osrf_prefork_register_routers().
242 static void osrf_prefork_send_router_registration(
243 const char* appname, const char* routerName,
244 const char* routerDomain, bool unregister ) {
246 // Get a pointer to the global transport_client
247 transport_client* client = osrfSystemGetTransportClient();
249 // Construct the Jabber address of the router
250 char* jid = va_list_to_string( "%s@%s/router", routerName, routerDomain );
252 // Create the registration message, and send it
253 transport_message* msg;
256 osrfLogInfo( OSRF_LOG_MARK, "%s un-registering with router %s", appname, jid );
257 msg = message_init( "unregistering", NULL, NULL, jid, NULL );
258 message_set_router_info( msg, NULL, NULL, appname, "unregister", 0 );
262 osrfLogInfo( OSRF_LOG_MARK, "%s registering with router %s", appname, jid );
263 msg = message_init( "registering", NULL, NULL, jid, NULL );
264 message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
// NOTE(review): the result of client_send_message() is not checked. If it can report
// failure, a failed (un)registration goes unnoticed.
267 client_send_message( client, msg );
275 @brief Register with a router, or not, according to some config settings.
276 @param appname Name of the application
277 @param routerChunk A representation of part of the config file.
@param unregister Passed through to osrf_prefork_send_router_registration(); if true,
send "unregister" instead of "register".
279 Parse a "complex" router configuration chunk.
281 Examine the services listed for a given router (normally in opensrf_core.xml). If
282 there is an entry for this service, or if there are @em no services listed, then
283 register with this router. Otherwise don't.
285 Called only by the parent process.
287 static void osrf_prefork_parse_router_chunk(
288 const char* appname, const jsonObject* routerChunk, bool unregister ) {
// NOTE(review): if the chunk lacks "name" or "domain", these strings are presumably NULL.
// They are later formatted with %s when the router JID is built. Confirm the config
// always supplies both.
290 const char* routerName = jsonObjectGetString( jsonObjectGetKeyConst( routerChunk, "name" ));
291 const char* domain = jsonObjectGetString( jsonObjectGetKeyConst( routerChunk, "domain" ));
292 const jsonObject* services = jsonObjectGetKeyConst( routerChunk, "services" );
// NOTE(review): the format string names the domain first, but the arguments are passed
// as (routerName, domain). This debug line therefore prints the two values swapped.
293 osrfLogDebug( OSRF_LOG_MARK, "found router config with domain %s and name %s",
294 routerName, domain );
296 if( services && services->type == JSON_HASH ) {
297 osrfLogDebug( OSRF_LOG_MARK, "investigating router information..." );
298 const jsonObject* service_obj = jsonObjectGetKeyConst( services, "service" );
300 ; // do nothing (shouldn't happen)
301 else if( JSON_ARRAY == service_obj->type ) {
302 // There are multiple services listed. Register with this router
303 // if and only if this service is on the list.
305 for( j = 0; j < service_obj->size; j++ ) {
306 const char* service = jsonObjectGetString( jsonObjectGetIndex( service_obj, j ));
307 if( service && !strcmp( appname, service ))
308 osrf_prefork_send_router_registration( appname, routerName, domain, unregister );
311 else if( JSON_STRING == service_obj->type ) {
312 // There's only one service listed. Register with this router
313 // if and only if this service is the one listed.
314 if( !strcmp( appname, jsonObjectGetString( service_obj )) )
315 osrf_prefork_send_router_registration( appname, routerName, domain, unregister );
318 // This router is not restricted to any set of services,
319 // so go ahead and register with it.
320 osrf_prefork_send_router_registration( appname, routerName, domain, unregister );
325 @brief Register the application with one or more routers, according to the configuration.
326 @param appname Name of the application.
@param unregister If true, unregister from the configured routers instead of registering.
328 Called only by the parent process.
330 static void osrf_prefork_register_routers( const char* appname, bool unregister ) {
332 jsonObject* routerInfo = osrfConfigGetValueObject( NULL, "/routers/router" );
// NOTE(review): routerInfo is dereferenced below with no visible NULL check. Confirm that
// osrfConfigGetValueObject() cannot return NULL when /routers/router is absent.
335 for( i = 0; i < routerInfo->size; i++ ) {
336 const jsonObject* routerChunk = jsonObjectGetIndex( routerInfo, i );
338 if( routerChunk->type == JSON_STRING ) {
339 /* this accommodates simple router configs */
340 char* routerName = osrfConfigGetValue( NULL, "/router_name" );
341 char* domain = osrfConfigGetValue( NULL, "/routers/router" );
342 osrfLogDebug( OSRF_LOG_MARK, "found simple router settings with router name %s",
344 osrf_prefork_send_router_registration( appname, routerName, domain, unregister );
// Non-string entries are "complex" router configs; let the chunk parser decide.
349 osrf_prefork_parse_router_chunk( appname, routerChunk, unregister );
353 jsonObjectFree( routerInfo );
357 @brief Initialize a child process.
358 @param child Pointer to the prefork_child representing the new child process.
359 @return Zero if successful, or -1 if not.
361 Called only by child processes. Actions:
362 - Connect to one or more cache servers
363 - Reconfigure logger, if necessary
364 - Discard parent's Jabber connection and open a new one
365 - Dynamically call an application-specific initialization routine
366 - Change the command line as reported by ps
368 static int prefork_child_init_hook( prefork_child* child ) {
370 if( !child ) return -1;
371 osrfLogDebug( OSRF_LOG_MARK, "Child init hook for child %d", child->pid );
373 // Connect to cache server(s).
374 osrfSystemInitCache();
// The drone connects to Jabber under its own resource name, "<appname>_drone".
375 char* resc = va_list_to_string( "%s_drone", child->appname );
377 // If we're a source-client, tell the logger now that we're a new process.
378 char* isclient = osrfConfigGetValue( NULL, "/client" );
379 if( isclient && !strcasecmp( isclient,"true" ))
380 osrfLogSetIsClient( 1 );
383 // Remove traces of our parent's socket connection so we can have our own.
384 // TODO: not necessary if parent disconnects first
385 osrfSystemIgnoreTransportClient();
// NOTE(review): the error message below names osrf_prefork_run(), although this failure
// happens in the child's init hook. That can mislead when reading logs.
388 if( !osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
389 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()" );
396 // Dynamically call the application-specific initialization function
397 // from a previously loaded shared library.
// osrfAppRunChildInit() returns 0 on success. NOTE(review): both log messages below end in
// "\n", unlike the other log calls in this file.
398 if( ! osrfAppRunChildInit( child->appname )) {
399 osrfLogDebug( OSRF_LOG_MARK, "Prefork child_init succeeded\n" );
401 osrfLogError( OSRF_LOG_MARK, "Prefork child_init failed\n" );
405 // Change the command line as reported by ps
406 set_proc_title( "OpenSRF Drone [%s]", child->appname );
411 @brief Respond to a client request forwarded by the parent.
412 @param child Pointer to the state of the child process.
413 @param data Pointer to the raw XMPP message received from the parent.
414 @return 0 on success; non-zero means that the child process should clean itself up
415 and terminate immediately, presumably due to a fatal error condition.
417 Called only by a child process.
419 static int prefork_child_process_request( prefork_child* child, char* data ) {
// A NULL child is reported as success (0), not as a fatal error.
420 if( !child ) return 0;
422 transport_client* client = osrfSystemGetTransportClient();
424 // Make sure that we're still connected to Jabber; reconnect if necessary.
425 if( !client_connected( client )) {
426 osrfSystemIgnoreTransportClient();
427 osrfLogWarning( OSRF_LOG_MARK, "Reconnecting child to opensrf after disconnect..." );
// NOTE(review): prefork_child_init_hook() bootstraps with the "<appname>_drone" resource,
// but this reconnect passes no resource name. Confirm that is intended.
428 if( !osrf_system_bootstrap_client( NULL, NULL )) {
429 osrfLogError( OSRF_LOG_MARK,
430 "Unable to bootstrap client in prefork_child_process_request()" );
432 osrf_prefork_child_exit( child );
436 // Construct the message from the xml.
437 transport_message* msg = new_message_from_xml( data );
439 // Respond to the transport message. This is where method calls are buried.
440 osrfAppSession* session = osrf_stack_transport_handler( msg, child->appname );
// A method can presumably set session->panic to ask this drone to terminate. rc carries
// that request back to the caller (see @return).
444 int rc = session->panic;
447 osrfLogWarning( OSRF_LOG_MARK,
448 "Drone for session %s terminating immediately", session->session_id );
449 osrfAppSessionFree( session );
453 if( session->stateless && session->state != OSRF_SESSION_CONNECTED ) {
454 // We're no longer connected to the client, which presumably means that
455 // we're done with this request. Bail out.
456 osrfAppSessionFree( session );
460 // If we get this far, then the client has opened an application connection so that it
461 // can send multiple requests directly to the same server drone, bypassing the router
462 // and the listener. For example, it may need to do a database transaction, requiring
463 // multiple method calls within the same database session.
465 // Hence we go into a loop, responding to successive requests from the same client, until
466 // either the client disconnects or an error occurs.
468 osrfLogDebug( OSRF_LOG_MARK, "Entering keepalive loop for session %s", session->session_id );
469 int keepalive = child->keepalive;
477 // Respond to any input messages. This is where the method calls are buried.
478 osrfLogDebug( OSRF_LOG_MARK,
479 "osrf_prefork calling queue_wait [%d] in keepalive loop", keepalive );
// start and end bracket the queue wait, so the timeout check below can tell how long the
// wait lasted. The assignment of end is on a line not shown in this listing.
480 start = time( NULL );
481 retval = osrf_app_session_queue_wait( session, keepalive, &recvd );
484 osrfLogDebug( OSRF_LOG_MARK, "Data received == %d", recvd );
486 // Now we check a number of possible reasons to exit the loop.
488 // If the method call decided to terminate immediately,
489 // note that for future reference.
493 // If an error occurred when we tried to service the request, exit the loop.
495 osrfLogError( OSRF_LOG_MARK, "queue-wait returned non-success %d", retval );
499 // If the client disconnected, exit the loop.
500 if( session->state != OSRF_SESSION_CONNECTED )
503 // If we timed out while waiting for a request, exit the loop.
504 if( !recvd && (end - start) >= keepalive ) {
505 osrfLogInfo( OSRF_LOG_MARK,
506 "No request was received in %d seconds, exiting stateful session", keepalive );
507 osrfAppSessionStatus(
511 0, "Disconnected on timeout" );
516 // If the child process has decided to terminate immediately, exit the loop.
521 osrfLogDebug( OSRF_LOG_MARK, "Exiting keepalive loop for session %s", session->session_id );
522 osrfAppSessionFree( session );
527 @brief Partially initialize a prefork_simple provided by the caller.
528 @param prefork Pointer to a raw prefork_simple to be initialized.
529 @param client Pointer to a transport_client (connection to Jabber).
530 @param max_requests The maximum number of requests that a child process may service
532 @param min_children Minimum number of child processes to maintain.
533 @param max_children Maximum number of child processes to maintain.
534 @param max_backlog_queue Maximum size of backlog queue.
535 @return 0 if successful, or 1 if not (due to invalid parameters).
// appname and keepalive are left empty here; the caller fills them in afterwards.
537 static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
538 int max_requests, int min_children, int max_children, int max_backlog_queue ) {
// NOTE(review): only min_children > max_children and max_children > ABS_MAX_CHILDREN are
// rejected in the visible code. Negative values and max_backlog_queue are not checked.
540 if( min_children > max_children ) {
541 osrfLogError( OSRF_LOG_MARK, "min_children (%d) is greater "
542 "than max_children (%d)", min_children, max_children );
546 if( max_children > ABS_MAX_CHILDREN ) {
547 osrfLogError( OSRF_LOG_MARK, "max_children (%d) is greater than ABS_MAX_CHILDREN (%d)",
548 max_children, ABS_MAX_CHILDREN );
// NOTE(review): the two literals below concatenate to "max_request=%d,min_children=%d",
// with no space after the comma. Nothing is launched here despite the wording.
552 osrfLogInfo( OSRF_LOG_MARK, "Prefork launching child with max_request=%d,"
553 "min_children=%d, max_children=%d", max_requests, min_children, max_children );
555 /* flesh out the struct */
556 prefork->max_requests = max_requests;
557 prefork->min_children = min_children;
558 prefork->max_children = max_children;
559 prefork->max_backlog_queue = max_backlog_queue;
561 prefork->data_to_child = 0;
562 prefork->data_to_parent = 0;
563 prefork->current_num_children = 0;
564 prefork->keepalive = 0;
565 prefork->appname = NULL;
566 prefork->first_child = NULL;
567 prefork->idle_list = NULL;
568 prefork->free_list = NULL;
569 prefork->connection = client;
570 prefork->sighup_pending_list = NULL;
576 @brief Spawn a new child process and put it in the idle list.
577 @param forker Pointer to the prefork_simple that will own the process.
578 @return Pointer to the new prefork_child, or not at all.
580 Spawn a new child process. Create a prefork_child for it and put it in the idle list.
582 After forking, the parent returns a pointer to the new prefork_child. The child
583 services its quota of requests and then terminates without returning.
// On pipe() or fork() failure an error is logged. The value returned on those paths is
// not visible in this listing (presumably NULL). current_num_children is only incremented
// on the parent side after a successful fork().
585 static prefork_child* launch_child( prefork_simple* forker ) {
591 // Set up the data and status pipes
592 if( pipe( data_fd ) < 0 ) { /* build the data pipe*/
593 osrfLogError( OSRF_LOG_MARK, "Pipe making error" );
// NOTE(review): confirm that the already-open data_fd pair is closed if this second
// pipe() fails. The cleanup lines are not visible here.
597 if( pipe( status_fd ) < 0 ) {/* build the status pipe */
598 osrfLogError( OSRF_LOG_MARK, "Pipe making error" );
604 osrfLogInternal( OSRF_LOG_MARK, "Pipes: %d %d %d %d",
605 data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
607 // Create and initialize a prefork_child for the new process
608 prefork_child* child = prefork_child_init( forker, data_fd[0],
609 data_fd[1], status_fd[0], status_fd[1] );
611 if( (pid=fork()) < 0 ) {
612 osrfLogError( OSRF_LOG_MARK, "Forking Error" );
613 prefork_child_free( forker, child );
// This runs in both the parent and the child, before the pid check below. In the child
// the idle list is never consulted again, because the child never returns from here.
617 // Add the new child to the head of the idle list
618 child->next = forker->idle_list;
619 forker->idle_list = child;
621 if( pid > 0 ) { /* parent */
// Parent side: (re)arm the SIGCHLD handler and count the new child.
// reap_children() decrements the count when the child is reaped.
623 signal( SIGCHLD, sigchld_handler );
624 ( forker->current_num_children )++;
627 osrfLogDebug( OSRF_LOG_MARK, "Parent launched %d", pid );
628 /* *no* child pipe FD's can be closed or the parent will re-use fd's that
629 the children are currently using */
635 // we don't want to adopt our parent's handlers.
636 signal( SIGUSR1, SIG_DFL );
637 signal( SIGUSR2, SIG_DFL );
638 signal( SIGTERM, SIG_DFL );
639 signal( SIGINT, SIG_DFL );
640 signal( SIGQUIT, SIG_DFL );
641 signal( SIGCHLD, SIG_DFL );
642 signal( SIGHUP, SIG_DFL );
644 osrfLogInternal( OSRF_LOG_MARK,
645 "I am new child with read_data_fd = %d and write_status_fd = %d",
646 child->read_data_fd, child->write_status_fd );
648 child->pid = getpid();
// The child keeps only its own ends of the pipes: it reads requests and writes status.
649 close( child->write_data_fd );
650 close( child->read_status_fd );
653 if( prefork_child_init_hook( child ) == -1 ) {
654 osrfLogError( OSRF_LOG_MARK,
655 "Forker child going away because we could not connect to OpenSRF..." );
656 osrf_prefork_child_exit( child );
659 prefork_child_wait( child ); // Should exit without returning
660 osrf_prefork_child_exit( child ); // Just to be sure
661 return NULL; // Unreachable, but it keeps the compiler happy
666 @brief Terminate a child process.
667 @param child Pointer to the prefork_child representing the child process (not used).
669 Called only by child processes. Dynamically call an application-specific shutdown
670 function from a previously loaded shared library; then exit.
// Callers (launch_child(), prefork_child_process_request()) treat this as non-returning.
672 static void osrf_prefork_child_exit( prefork_child* child ) {
// Run the application's shutdown hook before the process exits.
673 osrfAppRunExitCode();
678 @brief Launch all the child processes, putting them in the idle list.
679 @param forker Pointer to the prefork_simple that will own the children.
681 Called only by the parent process (in order to become a parent).
// Makes forker->min_children calls to launch_child(). The return values are ignored, so
// if a launch fails, fewer children may actually be running afterwards.
683 static void prefork_launch_children( prefork_simple* forker ) {
684 if( !forker ) return;
686 while( c++ < forker->min_children )
687 launch_child( forker );
691 @brief Signal handler for SIGCHLD: note that a child process has terminated.
692 @param sig The value of the trapped signal; always SIGCHLD.
694 Set a boolean to be checked later.
// The boolean is the file-scope volatile sig_atomic_t child_dead; reap_children() resets it.
696 static void sigchld_handler( int sig ) {
// Re-arm the handler: with signal(), some platforms reset the disposition to SIG_DFL
// once the signal has been delivered.
697 signal( SIGCHLD, sigchld_handler );
702 @brief Signal handler for SIGUSR1
703 @param sig The value of the trapped signal; always SIGUSR1.
705 Send unregister command to all registered routers.
// NOTE(review): this handler does config lookups, logging and network sends from signal
// context. Those are presumably not async-signal-safe. Consider setting a volatile
// sig_atomic_t flag here (as is done for SIGCHLD) and acting on it in the main loop.
// Also, the early return below skips re-arming the handler.
707 static void sigusr1_handler( int sig ) {
708 if (!global_forker) return;
709 osrf_prefork_register_routers(global_forker->appname, true);
710 signal( SIGUSR1, sigusr1_handler );
714 @brief Signal handler for SIGUSR2
715 @param sig The value of the trapped signal; always SIGUSR2.
717 Send register command to all known routers
// NOTE(review): like the SIGUSR1 handler, this does config lookups, logging and network
// sends from signal context, which are presumably not async-signal-safe. The early return
// below also skips re-arming the handler.
719 static void sigusr2_handler( int sig ) {
720 if (!global_forker) return;
721 osrf_prefork_register_routers(global_forker->appname, false);
722 signal( SIGUSR2, sigusr2_handler );
726 @brief Signal handler for SIGTERM
727 @param sig The value of the trapped signal; always SIGTERM
729 Perform a graceful prefork server shutdown.
// NOTE(review): logging and prefork_clear() run here in signal context and are presumably
// not async-signal-safe.
731 static void sigterm_handler(int sig) {
732 if (!global_forker) return;
733 osrfLogInfo(OSRF_LOG_MARK, "server: received SIGTERM, shutting down");
// graceful == true
734 prefork_clear(global_forker, true);
739 @brief Signal handler for SIGINT or SIGQUIT
740 @param sig The value of the trapped signal
742 Perform a non-graceful prefork server shutdown.
// NOTE(review): logging and prefork_clear() run here in signal context and are presumably
// not async-signal-safe.
744 static void sigint_handler(int sig) {
745 if (!global_forker) return;
746 osrfLogInfo(OSRF_LOG_MARK, "server: received SIGINT/QUIT, shutting down");
// graceful == false
747 prefork_clear(global_forker, false);
// Signal handler for SIGHUP. It does four things:
// - reloads the config file;
// - applies its /loglevel;
// - records the pids of the currently active children in global_forker->sighup_pending_list;
// - SIGKILLs every idle child.
// NOTE(review): config parsing, allocation and logging from signal context are presumably
// not async-signal-safe.
751 static void sighup_handler(int sig) {
752 if (!global_forker) return;
753 osrfLogInfo(OSRF_LOG_MARK, "server: received SIGHUP, reloading config");
755 osrfConfig* oldConfig = osrfConfigGetDefaultConfig();
756 osrfConfig* newConfig = osrfConfigInit(
757 oldConfig->configFileName, oldConfig->configContext);
760 osrfLogError(OSRF_LOG_MARK, "Config reload failed");
765 osrfConfigSetDefaultConfig(newConfig);
767 // apply the log level from the reloaded file
768 char* log_level = osrfConfigGetValue(NULL, "/loglevel");
770 int level = atoi(log_level);
771 osrfLogSetLevel(level);
774 // Copy the list of active children into the sighup_pending list.
775 // Cloning is necessary, since the nodes in the active list, particularly
776 // their next/prev pointers, will start changing once we exit this func.
777 // sighup_pending_list is a non-circular, singly linked list.
778 prefork_child* cur_child = global_forker->first_child;
779 prefork_child* clone;
781 // the first_pid lets us know when we've made a full circle of the active
784 while (cur_child && cur_child->pid != first_pid) {
786 if (!first_pid) first_pid = cur_child->pid;
788 // all we need to keep track of is the pid
789 clone = safe_malloc(sizeof(prefork_child));
790 clone->pid = cur_child->pid;
793 osrfLogDebug(OSRF_LOG_MARK,
794 "Adding child %d to sighup pending list", clone->pid);
// NOTE(review): when the pending list is empty, clone->next keeps whatever safe_malloc()
// returned. This is only a proper list terminator if safe_malloc() zero-fills (confirm).
796 // add the clone to the front of the list
797 if (global_forker->sighup_pending_list)
798 clone->next = global_forker->sighup_pending_list;
799 global_forker->sighup_pending_list = clone;
801 cur_child = cur_child->next;
804 // Kill all idle children.
805 // Let them get cleaned up through the normal response-handling cycle
806 cur_child = global_forker->idle_list;
808 osrfLogDebug(OSRF_LOG_MARK, "Killing child in SIGHUP %d", cur_child->pid);
809 kill(cur_child->pid, SIGKILL);
810 cur_child = cur_child->next;
816 @brief Replenish the collection of child processes, after one has terminated.
817 @param forker Pointer to the prefork_simple that manages the child processes.
819 The parent calls this function when it notices (via a signal handler) that
820 a child process has died.
822 Wait on the dead children so that they won't be zombies. Spawn new ones as needed
823 to maintain at least a minimum number.
// Also called from the main loop when client_recv() returns no message or an XMPP error
// message arrives.
825 static void reap_children( prefork_simple* forker ) {
829 // Reset our boolean so that we can detect any further terminations.
832 // Bury the children so that they won't be zombies. WNOHANG means that waitpid() returns
833 // immediately if there are no waitable children, instead of waiting for more to die.
834 // Ignore the return code of the child. We don't do an autopsy.
835 while( (child_pid = waitpid( -1, NULL, WNOHANG )) > 0 ) {
836 --forker->current_num_children;
837 del_prefork_child( forker, child_pid );
// NOTE(review): launch_child() only increments current_num_children in the parent after a
// successful fork(). If pipe() or fork() keeps failing, the loop below retries immediately
// and without bound. Consider capping attempts or backing off.
840 // Spawn more children as needed.
841 while( forker->current_num_children < forker->min_children )
842 launch_child( forker );
846 @brief Read transport_messages and dispatch them to child processes for servicing.
847 @param forker Pointer to the prefork_simple that manages the child processes.
849 This is the main loop of the parent process, and once entered, does not exit.
851 For each usable transport_message received: look for an idle child to service it. If
852 no idle children are available, either spawn a new one or, if we've already spawned the
853 maximum number of children, wait for one to become available. Once a child is available
854 by whatever means, write an XML version of the input message, to a pipe designated for
857 static void prefork_run( prefork_simple* forker ) {
859 if( NULL == forker->idle_list )
860 return; // No available children, and we haven't even started yet
862 transport_message* cur_msg = NULL;
864 // The backlog queue accumulates messages received while there
865 // are not yet children available to process them. While the
866 // transport client maintains its own queue of messages, sweeping
867 // the transport client's queue in the backlog queue gives us the
868 // ability to set a limit on the size of the backlog queue (and
869 // then to drop messages once the backlog queue has filled up)
870 transport_message* backlog_queue_head = NULL;
871 transport_message* backlog_queue_tail = NULL;
872 int backlog_queue_size = 0;
876 if( forker->first_child == NULL && forker->idle_list == NULL ) {/* no more children */
877 osrfLogWarning( OSRF_LOG_MARK, "No more children..." );
881 int received_from_network = 0;
882 if ( backlog_queue_size == 0 ) {
883 // Wait indefinitely for an input message
884 osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data..." );
885 cur_msg = client_recv( forker->connection, -1 );
886 received_from_network = 1;
888 // We have queued messages, which means all of our drones
889 // are occupied. See if any new messages are available on the
890 // network while waiting up to 1 second to allow time for a drone
891 // to become available to handle the next request in the queue.
892 cur_msg = client_recv( forker->connection, 1 );
893 if ( cur_msg != NULL )
894 received_from_network = 1;
897 if (received_from_network) {
898 if( cur_msg == NULL ) {
899 // most likely a signal was received. clean up any recently
900 // deceased children and try again.
902 reap_children(forker);
906 if (cur_msg->error_type) {
907 osrfLogInfo(OSRF_LOG_MARK,
908 "Listener received an XMPP error message. "
909 "Likely a bounced message. sender=%s", cur_msg->sender);
911 reap_children(forker);
915 message_prepare_xml( cur_msg );
916 const char* msg_data = cur_msg->msg_xml;
917 if( ! msg_data || ! *msg_data ) {
918 osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %",
919 (msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread );
920 message_free( cur_msg );
921 continue; // Message not usable; go on to the next one.
924 // stick message onto queue
925 cur_msg->next = NULL;
926 if (backlog_queue_size == 0) {
927 backlog_queue_head = cur_msg;
928 backlog_queue_tail = cur_msg;
930 if (backlog_queue_size >= forker->max_backlog_queue) {
931 osrfLogWarning ( OSRF_LOG_MARK, "Reached backlog queue limit of %d; dropping "
933 forker->max_backlog_queue );
934 osrfMessage* err = osrf_message_init( STATUS, 1, 1 );
935 osrf_message_set_status_info( err, "osrfMethodException",
936 "Service unavailable: no available children and backlog queue at limit",
937 OSRF_STATUS_SERVICEUNAVAILABLE );
938 char *data = osrf_message_serialize( err );
939 osrfMessageFree( err );
940 transport_message* tresponse = message_init( data, "", cur_msg->thread, cur_msg->router_from, cur_msg->recipient );
941 message_set_osrf_xid(tresponse, cur_msg->osrf_xid);
943 transport_client* client = osrfSystemGetTransportClient();
944 client_send_message( client, tresponse );
945 message_free( tresponse );
946 message_free(cur_msg);
949 backlog_queue_tail->next = cur_msg;
950 backlog_queue_tail = cur_msg;
951 osrfLogWarning( OSRF_LOG_MARK, "Adding message to non-empty backlog queue." );
953 backlog_queue_size++;
956 if (backlog_queue_size == 0) {
957 // strictly speaking, this check may be redundant, but
958 // from this point forward we can be sure that the
959 // backlog queue has at least one message in it and
960 // that if we can find a child to process it, we want to
961 // process the head of that queue.
965 cur_msg = backlog_queue_head;
967 int honored = 0; /* will be set to true when we service the request */
973 if(check_children( forker, 0 ) < 0) {
974 continue; // check failed, try again
979 osrfLogDebug( OSRF_LOG_MARK, "Server received inbound data" );
981 prefork_child* cur_child = NULL;
983 // Look for an available child in the idle list. Since the idle list operates
984 // as a stack, the child we get is the one that was most recently active, or
985 // most recently spawned. That means it's the one most likely still to be in
986 // physical memory, and the one least likely to have to be swapped in.
987 while( forker->idle_list ) {
989 osrfLogDebug( OSRF_LOG_MARK, "Looking for idle child" );
990 // Grab the prefork_child at the head of the idle list
991 cur_child = forker->idle_list;
992 forker->idle_list = cur_child->next;
993 cur_child->next = NULL;
995 osrfLogInternal( OSRF_LOG_MARK,
996 "Searching for available child. cur_child->pid = %d", cur_child->pid );
997 osrfLogInternal( OSRF_LOG_MARK, "Current num children %d",
998 forker->current_num_children );
1000 osrfLogDebug( OSRF_LOG_MARK, "forker sending data to %d", cur_child->pid );
1001 osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d",
1002 cur_child->write_data_fd );
1004 const char* msg_data = cur_msg->msg_xml;
1005 int written = write( cur_child->write_data_fd, msg_data, strlen( msg_data ) + 1 );
1007 // This child appears to be dead or unusable. Discard it.
1008 osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d: %s",
1009 errno, strerror( errno ));
1010 kill( cur_child->pid, SIGKILL );
1011 del_prefork_child( forker, cur_child->pid );
1015 add_prefork_child( forker, cur_child ); // Add it to active list
1020 /* if none available, add a new child if we can */
1022 osrfLogDebug( OSRF_LOG_MARK, "Not enough children, attempting to add..." );
1024 if( forker->current_num_children < forker->max_children ) {
1025 osrfLogDebug( OSRF_LOG_MARK, "Launching new child with current_num = %d",
1026 forker->current_num_children );
1028 launch_child( forker ); // Put a new child into the idle list
1029 if( forker->idle_list ) {
1031 // Take the new child from the idle list
1032 prefork_child* new_child = forker->idle_list;
1033 forker->idle_list = new_child->next;
1034 new_child->next = NULL;
1036 osrfLogDebug( OSRF_LOG_MARK, "Writing to new child fd %d : pid %d",
1037 new_child->write_data_fd, new_child->pid );
1039 const char* msg_data = cur_msg->msg_xml;
1040 int written = write(
1041 new_child->write_data_fd, msg_data, strlen( msg_data ) + 1 );
1043 // This child appears to be dead or unusable. Discard it.
1044 osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d: %s",
1045 errno, strerror( errno ));
1046 kill( cur_child->pid, SIGKILL );
1047 del_prefork_child( forker, cur_child->pid );
1049 add_prefork_child( forker, new_child );
1054 osrfLogWarning( OSRF_LOG_MARK, "Could not launch a new child as %d children "
1055 "were already running; consider increasing max_children for this "
1056 "application higher than %d in the OpenSRF configuration if this "
1057 "message occurs frequently",
1058 forker->current_num_children, forker->max_children );
1063 reap_children( forker );
1069 } // end while( ! honored )
1072 backlog_queue_head = cur_msg->next;
1073 backlog_queue_size--;
1074 cur_msg->next = NULL;
1075 message_free( cur_msg );
1078 } /* end top level listen loop */
1083 @brief See if any children have become available.
1084 @param forker Pointer to the prefork_simple that owns the children.
1085 @param forever Boolean: true if we should wait indefinitely.
1086 @return 0 or greater if successful, -1 on select error/interrupt
1088 Call select() on the status pipes (read_status_fd) of all the children in the active
1089 list. For each descriptor that is ready, read the child's status message and move
the child to the idle list -- unless it has a pending SIGHUP, in which case it is
killed and left on the active list for reap_children() to clean up.
1091 If @a forever is true, wait indefinitely for input. Otherwise return immediately if
1092 there are no active file descriptors.
1094 static int check_children( prefork_simple* forker, int forever ) {
// Clean up after any children that have already exited before looking at the rest.
1097 reap_children( forker );
1099 if( NULL == forker->first_child ) {
1100 // If forever is true, then we're here because we've run out of idle
1101 // processes, so there should be some active ones around, except during
1102 // graceful shutdown, as we wait for all active children to become idle.
1103 // If forever is false, then the children may all be idle, and that's okay.
1105 osrfLogDebug( OSRF_LOG_MARK, "No active child processes to check" );
// Build the select() set from the status pipe of every active child, tracking the
// highest descriptor so that select() can be given max_fd + 1.
1111 FD_ZERO( &read_set );
1115 // Prepare to select() on pipes from all the active children
1116 prefork_child* cur_child = forker->first_child;
1118 if( cur_child->read_status_fd > max_fd )
1119 max_fd = cur_child->read_status_fd;
1120 FD_SET( cur_child->read_status_fd, &read_set );
1121 cur_child = cur_child->next;
1122 } while( cur_child != forker->first_child );
1124 FD_CLR( 0, &read_set ); /* just to be sure */
// Blocking case: a NULL timeout makes select() wait until some child reports in
// (or a signal interrupts it, in which case the -1 is logged below).
1128 if( (select_ret=select( max_fd + 1, &read_set, NULL, NULL, NULL )) == -1 ) {
1129 osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children: %s",
1130 errno, strerror( errno ));
1132 osrfLogInfo( OSRF_LOG_MARK,
1133 "select() completed after waiting on children to become available" );
// Non-blocking case: select() is bounded by the timeval tv, which the header
// comment describes as returning immediately.
1141 if( (select_ret=select( max_fd + 1, &read_set, NULL, NULL, &tv )) == -1 ) {
1142 osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children: %s",
1143 errno, strerror( errno ));
1147 if( select_ret <= 0 ) // we're done here
1150 // Check each child in the active list.
1151 // If it has responded, move it to the idle list.
1152 cur_child = forker->first_child;
1153 prefork_child* next_child = NULL;
1154 int num_handled = 0;
// Capture the successor now: cur_child may be relinked onto the idle list below,
// which overwrites its next pointer.
1156 next_child = cur_child->next;
1157 if( FD_ISSET( cur_child->read_status_fd, &read_set )) {
1158 osrfLogDebug( OSRF_LOG_MARK,
1159 "Server received status from a child %d", cur_child->pid );
1163 /* now suck off the data */
// Consuming the status message (e.g. "available") keeps this pipe from being
// reported as readable again by the next select().
1165 if( (n=read( cur_child->read_status_fd, buf, sizeof( buf ) - 1 )) < 0 ) {
1166 osrfLogWarning( OSRF_LOG_MARK,
1167 "Read error after select in child status read with errno %d: %s",
1168 errno, strerror( errno ));
1172 osrfLogDebug( OSRF_LOG_MARK, "Read %d bytes from status buffer: %s", n, buf );
1176 // if this child is in the sighup_pending list, kill the child,
1177 // but leave it in the active list so that it won't be picked
1178 // for new work. When reap_children() next runs, it will be
1179 // properly cleaned up.
// Walk the singly linked sighup_pending_list looking for cur_child's pid; on a
// match, unlink and free that entry, then SIGKILL the process.
1180 prefork_child* hup_child = forker->sighup_pending_list;
1181 prefork_child* prev_hup_child = NULL;
1182 int hup_cleanup = 0;
1185 pid_t hup_pid = hup_child->pid;
1186 if (hup_pid == cur_child->pid) {
1188 osrfLogDebug(OSRF_LOG_MARK,
1189 "server: killing previously-active child after "
1190 "receiving SIGHUP: %d", hup_pid);
1192 if (forker->sighup_pending_list == hup_child) {
1193 // hup_child is the first (maybe only) in the list
1194 forker->sighup_pending_list = hup_child->next;
1196 // splice it from the list
1197 prev_hup_child->next = hup_child->next;
1200 free(hup_child); // clean up the thin clone
1201 kill(hup_pid, SIGKILL);
1206 prev_hup_child = hup_child;
1207 hup_child = hup_child->next;
// Per the comment above, a child killed for a pending SIGHUP is meant to stay on
// the active list; an ordinary child is unlinked from the circular active list
// here (reseating first_child if needed) and pushed onto the head of the idle list.
1212 // Remove the child from the active list
1213 if( forker->first_child == cur_child ) {
1214 if( cur_child->next == cur_child ) {
1215 // only child in the active list
1216 forker->first_child = NULL;
1218 forker->first_child = cur_child->next;
1221 cur_child->next->prev = cur_child->prev;
1222 cur_child->prev->next = cur_child->next;
1224 // Add it to the idle list
1225 cur_child->prev = NULL;
1226 cur_child->next = forker->idle_list;
1227 forker->idle_list = cur_child;
1231 cur_child = next_child;
// NOTE(review): if the child just moved to the idle list was first_child, then
// first_child now equals next_child and this condition ends the scan after one
// child. Other ready children keep their unread status data, so the next select()
// should still report them -- confirm this early exit is intended.
1232 } while( forker->first_child && forker->first_child != next_child );
1238 @brief Service up a set maximum number of requests; then shut down.
1239 @param child Pointer to the prefork_child representing the child process.
1241 Called only by child process.
Requests arrive on child->read_data_fd; availability is reported back to the parent by
writing the string "available" to child->write_status_fd.
1243 Enter a loop, for up to max_requests iterations. On each iteration:
1244 - Wait indefinitely for a request from the parent.
1245 - Service the request.
1246 - Increment a counter. If the limit hasn't been reached, notify the parent that you
1247 are available for another request.
1249 After exiting the loop, shut down and terminate the process.
1251 static void prefork_child_wait( prefork_child* child ) {
1254 growing_buffer* gbuf = buffer_init( READ_BUFSIZE );
1255 char buf[READ_BUFSIZE];
1257 for( i = 0; i < child->max_requests; i++ ) {
1260 int gotdata = 0; // boolean; set to true if we get data
// Make the first read() block, so the child waits for the parent's next request.
1261 clr_fl( child->read_data_fd, O_NONBLOCK );
1263 // Read a request from the parent, via a pipe, into a growing_buffer.
// NOTE(review): the end of a request is inferred from the pipe being momentarily
// empty (a failed non-blocking read), not from the trailing NUL the parent writes
// (it writes strlen + 1 bytes) or from a length prefix. If a request is larger
// than the pipe's capacity and the parent has not finished writing it, this loop
// might stop early and pass a truncated request on -- confirm.
1264 while( (n = read( child->read_data_fd, buf, READ_BUFSIZE-1 )) > 0 ) {
1266 osrfLogDebug( OSRF_LOG_MARK, "Prefork child read %d bytes of data", n );
// Once data has arrived, switch to non-blocking reads so the loop ends when the
// pipe is empty instead of waiting for more input.
1268 set_fl( child->read_data_fd, O_NONBLOCK );
1271 buffer_add_n( gbuf, buf, n );
// EAGAIN from the non-blocking read means the pipe was drained; presumably this is
// treated as the normal end of the request rather than as an error.
1274 if( errno == EAGAIN )
1277 if( errno == EPIPE ) {
1278 osrfLogDebug( OSRF_LOG_MARK, "C child attempted read on broken pipe, exiting..." );
1282 int terminate_now = 0; // Boolean
1285 osrfLogWarning( OSRF_LOG_MARK,
1286 "Prefork child read returned error with errno %d", errno );
1289 } else if( gotdata ) {
1290 // Process the request
1291 osrfLogDebug( OSRF_LOG_MARK, "Prefork child got a request.. processing.." );
// A nonzero return from the request handler asks this child to stop.
1292 terminate_now = prefork_child_process_request( child, gbuf->buf );
1293 buffer_reset( gbuf );
1296 if( terminate_now ) {
1297 // We're terminating prematurely -- presumably due to a fatal error condition.
1298 osrfLogWarning( OSRF_LOG_MARK, "Prefork child terminating abruptly" );
// Skip the report after the last permitted request: the child exits after the loop.
1302 if( i < child->max_requests - 1 ) {
1303 // Report back to the parent for another request.
1305 ssize_t len = write(
1306 child->write_status_fd, "available" /*less than 64 bytes*/, msg_len );
// A failed or short write means the parent cannot learn that this child is free,
// so the child gives up instead of waiting for work that will never come.
1307 if( len != msg_len ) {
1308 osrfLogError( OSRF_LOG_MARK,
1309 "Drone terminating: unable to notify listener of availability: %s",
1311 buffer_free( gbuf );
1312 osrf_prefork_child_exit( child );
// Done serving: release the request buffer and terminate the child process.
1317 buffer_free( gbuf );
1319 osrfLogDebug( OSRF_LOG_MARK, "Child with max-requests=%d, num-served=%d exiting...[%ld]",
1320 child->max_requests, i, (long) getpid());
1322 osrf_prefork_child_exit( child );
1326 @brief Add a prefork_child to the end of the active list.
1327 @param forker Pointer to the prefork_simple that owns the list.
1328 @param child Pointer to the prefork_child to be added.
1330 static void add_prefork_child( prefork_simple* forker, prefork_child* child ) {
// The active list is circular and doubly linked, so its tail is first_child->prev and
// appending takes constant time. child's own next/prev links are overwritten in both
// cases, so callers detach it from the idle list before calling this.
1332 if( forker->first_child == NULL ) {
1333 // Simplest case: list is initially empty.
// A single node is its own successor and predecessor.
1334 forker->first_child = child;
1335 child->next = child;
1336 child->prev = child;
1338 // Find the last node in the circular list.
1339 prefork_child* last_child = forker->first_child->prev;
1341 // Insert the new child between the last and first children.
// first_child is left unchanged, so the new child becomes the new tail.
1342 last_child->next = child;
1343 child->prev = last_child;
1344 child->next = forker->first_child;
1345 forker->first_child->prev = child;
1350 @brief Delete and destroy a dead child from our list.
1351 @param forker Pointer to the prefork_simple that owns the dead child.
1352 @param pid Process ID of the dead child.
1354 Look for the dead child first in the list of active children. If you don't find it
1355 there, look in the list of idle children. If you find it, remove it from whichever
1356 list it's on, and destroy it.
1358 static void del_prefork_child( prefork_simple* forker, pid_t pid ) {
// Callers discarding a child that may still be running (prefork_clear(), or the
// listen loop after a failed write()) send it SIGKILL before calling this.
1360 osrfLogDebug( OSRF_LOG_MARK, "Deleting Child: %d", pid );
1362 prefork_child* cur_child = NULL;
1364 // Look first in the active list
1365 if( forker->first_child ) {
1366 cur_child = forker->first_child; /* current pointer */
// Walk the circular list until pid matches or the next step would wrap back to
// first_child; afterwards cur_child is either the match or the last node.
1367 while( cur_child->pid != pid && cur_child->next != forker->first_child )
1368 cur_child = cur_child->next;
1370 if( cur_child->pid == pid ) {
1371 // We found the right node. Remove it from the list.
1372 if( cur_child->next == cur_child )
1373 forker->first_child = NULL; // only child in the list
1375 if( forker->first_child == cur_child )
1376 forker->first_child = cur_child->next; // Reseat forker->first_child
1378 // Stitch the adjacent nodes together
1379 cur_child->prev->next = cur_child->next;
1380 cur_child->next->prev = cur_child->prev;
1383 cur_child = NULL; // Didn't find it in the active list
1387 // Maybe it's in the idle list. This can happen if, for example,
1388 // a child is killed by a signal while it's between requests.
// The idle list is singly linked and NULL-terminated, so track the predecessor
// in order to unlink the match.
1390 prefork_child* prev = NULL;
1391 cur_child = forker->idle_list;
1392 while( cur_child && cur_child->pid != pid ) {
1394 cur_child = cur_child->next;
1398 // Detach from the list
1400 prev->next = cur_child->next;
1402 forker->idle_list = cur_child->next;
1403 } // else we can't find it
1406 // If we found the node, destroy it.
// prefork_child_free() closes the child's four pipe descriptors and pushes the node
// onto forker->free_list for reuse; a pid on neither list frees nothing.
1408 prefork_child_free( forker, cur_child );
1412 @brief Create and initialize a prefork_child.
1413 @param forker Pointer to the prefork_simple that will own the prefork_child.
1414 @param read_data_fd Used by child to read request from parent.
1415 @param write_data_fd Used by parent to write request to child.
1416 @param read_status_fd Used by parent to read status from child.
1417 @param write_status_fd Used by child to write status to parent.
1418 @return Pointer to the newly created prefork_child.
1420 The calling code is responsible for freeing the prefork_child by calling
1421 prefork_child_free(), which closes its descriptors and returns it to forker->free_list for reuse.
1423 static prefork_child* prefork_child_init( prefork_simple* forker,
1424 int read_data_fd, int write_data_fd,
1425 int read_status_fd, int write_status_fd ) {
1427 // Allocate a prefork_child -- from the free list if possible, or from
1428 // the heap if necessary. The free list is a non-circular, singly-linked list.
1429 prefork_child* child;
1430 if( forker->free_list ) {
// Pop the head of the free list.
1431 child = forker->free_list;
1432 forker->free_list = child->next;
1434 child = safe_malloc( sizeof( prefork_child ));
// Record all four pipe ends; per the @param descriptions, two are used by the
// parent and two by the child.
1437 child->read_data_fd = read_data_fd;
1438 child->write_data_fd = write_data_fd;
1439 child->read_status_fd = read_status_fd;
1440 child->write_status_fd = write_status_fd;
// Per-child copies of the forker's settings.
1441 child->max_requests = forker->max_requests;
// The forker owns this string (prefork_clear() frees forker->appname).
1442 child->appname = forker->appname; // We don't make a separate copy
1443 child->keepalive = forker->keepalive;
1451 @brief Terminate all child processes and clear out a prefork_simple.
1452 @param prefork Pointer to the prefork_simple to be cleared out.
@param graceful If true, wait for each active child to become idle (by calling
check_children() with @a forever set) instead of killing it with SIGKILL.
1454 We do not deallocate the prefork_simple itself, just its contents.
1456 static void prefork_clear( prefork_simple* prefork, bool graceful ) {
1458 // always de-register routers before killing child processes (or waiting
1459 // for them to complete) so that new requests are directed elsewhere.
// De-register under the appname of the prefork_simple being cleared -- the same
// string freed at the end of this function -- rather than global_forker's, so this
// does not depend on global_forker pointing at this instance.
1460 osrf_prefork_register_routers(prefork->appname, true);
1462 while( prefork->first_child ) {
1465 // wait for at least one active child to become idle, then repeat.
1466 // once complete, all children will be idle and cleaned up below.
1467 osrfLogInfo(OSRF_LOG_MARK, "graceful shutdown waiting...");
1468 check_children(prefork, 1);
1471 // Kill and delete all the active children
1472 kill( prefork->first_child->pid, SIGKILL );
1473 del_prefork_child( prefork, prefork->first_child->pid );
1478 osrfLogInfo(OSRF_LOG_MARK,
1479 "all active children are now idle in graceful shutdown");
1482 // Kill all the idle prefork children, close their file
1483 // descriptors, and move them to the free list.
1484 prefork_child* child = prefork->idle_list;
1485 prefork->idle_list = NULL;
1487 prefork_child* temp = child->next;
1488 kill( child->pid, SIGKILL );
1489 prefork_child_free( prefork, child );
1492 //prefork->current_num_children = 0;
1494 // Physically free the free list of prefork_children.
1495 child = prefork->free_list;
1496 prefork->free_list = NULL;
1498 prefork_child* temp = child->next;
1503 // Close the Jabber connection
1504 client_free( prefork->connection );
1505 prefork->connection = NULL;
1507 // After giving the child processes a second to terminate, wait on them so that they
1508 // don't become zombies. We don't wait indefinitely, so it's possible that some
1509 // children will survive a bit longer.
// WNOHANG collects only children that have already exited; each one collected is
// subtracted from current_num_children.
1511 while( (waitpid( -1, NULL, WNOHANG )) > 0 ) {
1512 --prefork->current_num_children;
// appname is owned by the prefork_simple; children only share the pointer.
1515 free( prefork->appname );
1516 prefork->appname = NULL;
1520 @brief Destroy and deallocate a prefork_child.
1521 @param forker Pointer to the prefork_simple that owns the prefork_child.
1522 @param child Pointer to the prefork_child to be destroyed.
1524 static void prefork_child_free( prefork_simple* forker, prefork_child* child ) {
1525 close( child->read_data_fd );
1526 close( child->write_data_fd );
1527 close( child->read_status_fd );
1528 close( child->write_status_fd );
1530 // Stick the prefork_child in a free list for potential reuse. This is a
1531 // non-circular, singly linked list.
1533 child->next = forker->free_list;
1534 forker->free_list = child;