LP1204123 SIGUSR1 causes router unregister (Perl/C)
[OpenSRF.git] / src / libopensrf / osrf_prefork.c
1 /**
2         @file osrf_prefork.c
3         @brief Spawn and manage a collection of child process to service requests.
4
5         Spawn a collection of child processes, replacing them as needed.  Forward requests to them
6         and let the children do the work.
7
8         Each child processes some maximum number of requests before it terminates itself.  When a
9         child dies, either deliberately or otherwise, we can spawn another one to replace it,
10         keeping the number of children within a predefined range.
11
12         Use a doubly-linked circular list to keep track of the children to whom we have forwarded
13         a request, and who are still working on them.  Use a separate linear linked list to keep
14         track of children that are currently idle.  Move them back and forth as needed.
15
16         For each child, set up two pipes:
17         - One for the parent to send requests to the child.
18         - One for the child to notify the parent that it is available for another request.
19
20         The message sent to the child represents an XML stanza as received from Jabber.
21
22         When the child finishes processing the request, it writes the string "available" back
23         to the parent.  Then the parent knows that it can send that child another request.
24 */
25
26 #include <errno.h>
27 #include <signal.h>
28 #include <sys/types.h>
29 #include <sys/time.h>
30 #include <unistd.h>
31 #include <stdlib.h>
32 #include <stdio.h>
33 #include <string.h>
34 #include <sys/select.h>
35 #include <sys/wait.h>
36
37 #include "opensrf/utils.h"
38 #include "opensrf/log.h"
39 #include "opensrf/transport_client.h"
40 #include "opensrf/osrf_stack.h"
41 #include "opensrf/osrf_settings.h"
42 #include "opensrf/osrf_application.h"
43
44 #define READ_BUFSIZE 1024
45 #define ABS_MAX_CHILDREN 256
46
47 typedef struct {
48         int max_requests;     /**< How many requests a child processes before terminating. */
49         int min_children;     /**< Minimum number of children to maintain. */
50         int max_children;     /**< Maximum number of children to maintain. */
51         int fd;               /**< Unused. */
52         int data_to_child;    /**< Unused. */
53         int data_to_parent;   /**< Unused. */
54         int current_num_children;   /**< How many children are currently on the list. */
55         int keepalive;        /**< Keepalive time for stateful sessions. */
56         char* appname;        /**< Name of the application. */
57         /** Points to a circular linked list of children. */
58         struct prefork_child_struct* first_child;
59         /** List of of child processes that aren't doing anything at the moment and are
60                 therefore available to service a new request. */
61         struct prefork_child_struct* idle_list;
62         /** List of allocated but unused prefork_children, available for reuse.  Each one is just
63                 raw memory, apart from the "next" pointer used to stitch them together.  In particular,
64                 there is no child process for them, and the file descriptors are not open. */
65         struct prefork_child_struct* free_list;
66         transport_client* connection;  /**< Connection to Jabber. */
67 } prefork_simple;
68
69 struct prefork_child_struct {
70         pid_t pid;            /**< Process ID of the child. */
71         int read_data_fd;     /**< Child uses to read request. */
72         int write_data_fd;    /**< Parent uses to write request. */
73         int read_status_fd;   /**< Parent reads to see if child is available. */
74         int write_status_fd;  /**< Child uses to notify parent when it's available again. */
75         int max_requests;     /**< How many requests a child can process before terminating. */
76         const char* appname;  /**< Name of the application. */
77         int keepalive;        /**< Keepalive time for stateful sessions. */
78         struct prefork_child_struct* next;  /**< Linkage pointer for linked list. */
79         struct prefork_child_struct* prev;  /**< Linkage pointer for linked list. */
80 };
81
82 typedef struct prefork_child_struct prefork_child;
83
84 /** Boolean.  Set to true by a signal handler when it traps SIGCHLD. */
85 static volatile sig_atomic_t child_dead;
86
87 static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
88         int max_requests, int min_children, int max_children );
89 static prefork_child* launch_child( prefork_simple* forker );
90 static void prefork_launch_children( prefork_simple* forker );
91 static void prefork_run( prefork_simple* forker );
92 static void add_prefork_child( prefork_simple* forker, prefork_child* child );
93
94 static void del_prefork_child( prefork_simple* forker, pid_t pid );
95 static int check_children( prefork_simple* forker, int forever );
96 static int  prefork_child_process_request( prefork_child*, char* data );
97 static int prefork_child_init_hook( prefork_child* );
98 static prefork_child* prefork_child_init( prefork_simple* forker,
99         int read_data_fd, int write_data_fd,
100         int read_status_fd, int write_status_fd );
101
102 /* listens on the 'data_to_child' fd and wait for incoming data */
103 static void prefork_child_wait( prefork_child* child );
104 static void prefork_clear( prefork_simple* );
105 static void prefork_child_free( prefork_simple* forker, prefork_child* );
106 static void osrf_prefork_register_routers( const char* appname, bool unregister );
107 static void osrf_prefork_child_exit( prefork_child* );
108
109 static void sigchld_handler( int sig );
110 static void sigusr1_handler( int sig );
111
112 /** Track the appname globally so we can refer back to it for 
113     router de-registration.  */
114 static const char* service_name = NULL;
115
116 /**
117         @brief Spawn and manage a collection of drone processes for servicing requests.
118         @param appname Name of the application.
119         @return 0 if successful, or -1 if error.
120 */
121 int osrf_prefork_run( const char* appname ) {
122
123         if( !appname ) {
124                 osrfLogError( OSRF_LOG_MARK, "osrf_prefork_run requires an appname to run!");
125                 return -1;
126         }
127
128         set_proc_title( "OpenSRF Listener [%s]", appname );
129     service_name = appname;
130
131         int maxr = 1000;
132         int maxc = 10;
133         int minc = 3;
134         int kalive = 5;
135
136         // Get configuration settings
137         osrfLogInfo( OSRF_LOG_MARK, "Loading config in osrf_forker for app %s", appname );
138
139         char* max_req      = osrf_settings_host_value( "/apps/%s/unix_config/max_requests", appname );
140         char* min_children = osrf_settings_host_value( "/apps/%s/unix_config/min_children", appname );
141         char* max_children = osrf_settings_host_value( "/apps/%s/unix_config/max_children", appname );
142         char* keepalive    = osrf_settings_host_value( "/apps/%s/keepalive", appname );
143
144         if( !keepalive )
145                 osrfLogWarning( OSRF_LOG_MARK, "Keepalive is not defined, assuming %d", kalive );
146         else
147                 kalive = atoi( keepalive );
148
149         if( !max_req )
150                 osrfLogWarning( OSRF_LOG_MARK, "Max requests not defined, assuming %d", maxr );
151         else
152                 maxr = atoi( max_req );
153
154         if( !min_children )
155                 osrfLogWarning( OSRF_LOG_MARK, "Min children not defined, assuming %d", minc );
156         else
157                 minc = atoi( min_children );
158
159         if( !max_children )
160                 osrfLogWarning( OSRF_LOG_MARK, "Max children not defined, assuming %d", maxc );
161         else
162                 maxc = atoi( max_children );
163
164         free( keepalive );
165         free( max_req );
166         free( min_children );
167         free( max_children );
168         /* --------------------------------------------------- */
169
170         char* resc = va_list_to_string( "%s_listener", appname );
171
172         // Make sure that we haven't already booted
173         if( !osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
174                 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()" );
175                 free( resc );
176                 return -1;
177         }
178
179         free( resc );
180
181         prefork_simple forker;
182
183         if( prefork_simple_init( &forker, osrfSystemGetTransportClient(), maxr, minc, maxc )) {
184                 osrfLogError( OSRF_LOG_MARK,
185                         "osrf_prefork_run() failed to create prefork_simple object" );
186                 return -1;
187         }
188
189         // Finish initializing the prefork_simple.
190         forker.appname   = strdup( appname );
191         forker.keepalive = kalive;
192
193         // Spawn the children; put them in the idle list.
194         prefork_launch_children( &forker );
195
196         // Tell the router that you're open for business.
197         osrf_prefork_register_routers( appname, false );
198
199     signal( SIGUSR1, sigusr1_handler );
200
201         // Sit back and let the requests roll in
202         osrfLogInfo( OSRF_LOG_MARK, "Launching osrf_forker for app %s", appname );
203         prefork_run( &forker );
204
205         osrfLogWarning( OSRF_LOG_MARK, "prefork_run() returned - how??" );
206         prefork_clear( &forker );
207         return 0;
208 }
209
210 /**
211         @brief Register the application with a specified router.
212         @param appname Name of the application.
213         @param routerName Name of the router.
214         @param routerDomain Domain of the router.
215
216         Tell the router that you're open for business so that it can route requests to you.
217
218         Called only by the parent process.
219 */
220 static void osrf_prefork_send_router_registration(
221                 const char* appname, const char* routerName, 
222             const char* routerDomain, bool unregister ) {
223
224         // Get a pointer to the global transport_client
225         transport_client* client = osrfSystemGetTransportClient();
226
227         // Construct the Jabber address of the router
228         char* jid = va_list_to_string( "%s@%s/router", routerName, routerDomain );
229
230         // Create the registration message, and send it
231         transport_message* msg;
232     if (unregister) {
233
234             osrfLogInfo( OSRF_LOG_MARK, "%s un-registering with router %s", appname, jid );
235             msg = message_init( "unregistering", NULL, NULL, jid, NULL );
236             message_set_router_info( msg, NULL, NULL, appname, "unregister", 0 );
237
238     } else {
239
240             osrfLogInfo( OSRF_LOG_MARK, "%s registering with router %s", appname, jid );
241             msg = message_init( "registering", NULL, NULL, jid, NULL );
242             message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
243     }
244
245         client_send_message( client, msg );
246
247         // Clean up
248         message_free( msg );
249         free( jid );
250 }
251
252 /**
253         @brief Register with a router, or not, according to some config settings.
254         @param appname Name of the application
255         @param RouterChunk A representation of part of the config file.
256
257         Parse a "complex" router configuration chunk.
258
259         Examine the services listed for a given router (normally in opensrf_core.xml).  If
260         there is an entry for this service, or if there are @em no services listed, then
261         register with this router.  Otherwise don't.
262
263         Called only by the parent process.
264 */
265 static void osrf_prefork_parse_router_chunk( 
266     const char* appname, const jsonObject* routerChunk, bool unregister ) {
267
268         const char* routerName = jsonObjectGetString( jsonObjectGetKeyConst( routerChunk, "name" ));
269         const char* domain = jsonObjectGetString( jsonObjectGetKeyConst( routerChunk, "domain" ));
270         const jsonObject* services = jsonObjectGetKeyConst( routerChunk, "services" );
271         osrfLogDebug( OSRF_LOG_MARK, "found router config with domain %s and name %s",
272                 routerName, domain );
273
274         if( services && services->type == JSON_HASH ) {
275                 osrfLogDebug( OSRF_LOG_MARK, "investigating router information..." );
276                 const jsonObject* service_obj = jsonObjectGetKeyConst( services, "service" );
277                 if( !service_obj )
278                         ;    // do nothing (shouldn't happen)
279                 else if( JSON_ARRAY == service_obj->type ) {
280                         // There are multiple services listed.  Register with this router
281                         // if and only if this service is on the list.
282                         int j;
283                         for( j = 0; j < service_obj->size; j++ ) {
284                                 const char* service = jsonObjectGetString( jsonObjectGetIndex( service_obj, j ));
285                                 if( service && !strcmp( appname, service ))
286                                         osrf_prefork_send_router_registration( appname, routerName, domain, unregister );
287                         }
288                 }
289                 else if( JSON_STRING == service_obj->type ) {
290                         // There's only one service listed.  Register with this router
291                         // if and only if this service is the one listed.
292                         if( !strcmp( appname, jsonObjectGetString( service_obj )) )
293                                 osrf_prefork_send_router_registration( appname, routerName, domain, unregister );
294                 }
295         } else {
296                 // This router is not restricted to any set of services,
297                 // so go ahead and register with it.
298                 osrf_prefork_send_router_registration( appname, routerName, domain, unregister );
299         }
300 }
301
302 /**
303         @brief Register the application with one or more routers, according to the configuration.
304         @param appname Name of the application.
305
306         Called only by the parent process.
307 */
308 static void osrf_prefork_register_routers( const char* appname, bool unregister ) {
309
310         jsonObject* routerInfo = osrfConfigGetValueObject( NULL, "/routers/router" );
311
312         int i;
313         for( i = 0; i < routerInfo->size; i++ ) {
314                 const jsonObject* routerChunk = jsonObjectGetIndex( routerInfo, i );
315
316                 if( routerChunk->type == JSON_STRING ) {
317                         /* this accomodates simple router configs */
318                         char* routerName = osrfConfigGetValue( NULL, "/router_name" );
319                         char* domain = osrfConfigGetValue( NULL, "/routers/router" );
320                         osrfLogDebug( OSRF_LOG_MARK, "found simple router settings with router name %s",
321                                 routerName );
322                         osrf_prefork_send_router_registration( appname, routerName, domain, unregister );
323
324                         free( routerName );
325                         free( domain );
326                 } else {
327                         osrf_prefork_parse_router_chunk( appname, routerChunk, unregister );
328                 }
329         }
330
331         jsonObjectFree( routerInfo );
332 }
333
334 /**
335         @brief Initialize a child process.
336         @param child Pointer to the prefork_child representing the new child process.
337         @return Zero if successful, or -1 if not.
338
339         Called only by child processes.  Actions:
340         - Connect to one or more cache servers
341         - Reconfigure logger, if necessary
342         - Discard parent's Jabber connection and open a new one
343         - Dynamically call an application-specific initialization routine
344         - Change the command line as reported by ps
345 */
346 static int prefork_child_init_hook( prefork_child* child ) {
347
348         if( !child ) return -1;
349         osrfLogDebug( OSRF_LOG_MARK, "Child init hook for child %d", child->pid );
350
351         // Connect to cache server(s).
352         osrfSystemInitCache();
353         char* resc = va_list_to_string( "%s_drone", child->appname );
354
355         // If we're a source-client, tell the logger now that we're a new process.
356         char* isclient = osrfConfigGetValue( NULL, "/client" );
357         if( isclient && !strcasecmp( isclient,"true" ))
358                 osrfLogSetIsClient( 1 );
359         free( isclient );
360
361         // Remove traces of our parent's socket connection so we can have our own.
362         osrfSystemIgnoreTransportClient();
363
364         // Connect to Jabber
365         if( !osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
366                 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()" );
367                 free( resc );
368                 return -1;
369         }
370
371         free( resc );
372
373         // Dynamically call the application-specific initialization function
374         // from a previously loaded shared library.
375         if( ! osrfAppRunChildInit( child->appname )) {
376                 osrfLogDebug( OSRF_LOG_MARK, "Prefork child_init succeeded\n" );
377         } else {
378                 osrfLogError( OSRF_LOG_MARK, "Prefork child_init failed\n" );
379                 return -1;
380         }
381
382         // Change the command line as reported by ps
383         set_proc_title( "OpenSRF Drone [%s]", child->appname );
384         return 0;
385 }
386
387 /**
388         @brief Respond to a client request forwarded by the parent.
389         @param child Pointer to the state of the child process.
390         @param data Pointer to the raw XMPP message received from the parent.
391         @return 0 on success; non-zero means that the child process should clean itself up
392                 and terminate immediately, presumably due to a fatal error condition.
393
394         Called only by a child process.
395 */
396 static int prefork_child_process_request( prefork_child* child, char* data ) {
397         if( !child ) return 0;
398
399         transport_client* client = osrfSystemGetTransportClient();
400
401         // Make sure that we're still connected to Jabber; reconnect if necessary.
402         if( !client_connected( client )) {
403                 osrfSystemIgnoreTransportClient();
404                 osrfLogWarning( OSRF_LOG_MARK, "Reconnecting child to opensrf after disconnect..." );
405                 if( !osrf_system_bootstrap_client( NULL, NULL )) {
406                         osrfLogError( OSRF_LOG_MARK,
407                                 "Unable to bootstrap client in prefork_child_process_request()" );
408                         sleep( 1 );
409                         osrf_prefork_child_exit( child );
410                 }
411         }
412
413         // Construct the message from the xml.
414         transport_message* msg = new_message_from_xml( data );
415
416         // Respond to the transport message.  This is where method calls are buried.
417         osrfAppSession* session = osrf_stack_transport_handler( msg, child->appname );
418         if( !session )
419                 return 0;
420
421         int rc = session->panic;
422
423         if( rc ) {
424                 osrfLogWarning( OSRF_LOG_MARK,
425                         "Drone for session %s terminating immediately", session->session_id );
426                 osrfAppSessionFree( session );
427                 return rc;
428         }
429
430         if( session->stateless && session->state != OSRF_SESSION_CONNECTED ) {
431                 // We're no longer connected to the client, which presumably means that
432                 // we're done with this request.  Bail out.
433                 osrfAppSessionFree( session );
434                 return rc;
435         }
436
437         // If we get this far, then the client has opened an application connection so that it
438         // can send multiple requests directly to the same server drone, bypassing the router
439         // and the listener.  For example, it may need to do a database transaction, requiring
440         // multiple method calls within the same database session.
441
442         // Hence we go into a loop, responding to successive requests from the same client, until
443         // either the client disconnects or an error occurs.
444
445         osrfLogDebug( OSRF_LOG_MARK, "Entering keepalive loop for session %s", session->session_id );
446         int keepalive = child->keepalive;
447         int retval;
448         int recvd;
449         time_t start;
450         time_t end;
451
452         while( 1 ) {
453
454                 // Respond to any input messages.  This is where the method calls are buried.
455                 osrfLogDebug( OSRF_LOG_MARK,
456                         "osrf_prefork calling queue_wait [%d] in keepalive loop", keepalive );
457                 start   = time( NULL );
458                 retval  = osrf_app_session_queue_wait( session, keepalive, &recvd );
459                 end     = time( NULL );
460
461                 osrfLogDebug( OSRF_LOG_MARK, "Data received == %d", recvd );
462
463                 // Now we check a number of possible reasons to exit the loop.
464
465                 // If the method call decided to terminate immediately,
466                 // note that for future reference.
467                 if( session->panic )
468                         rc = 1;
469
470                 // If an error occurred when we tried to service the request, exit the loop.
471                 if( retval ) {
472                         osrfLogError( OSRF_LOG_MARK, "queue-wait returned non-success %d", retval );
473                         break;
474                 }
475
476                 // If the client disconnected, exit the loop.
477                 if( session->state != OSRF_SESSION_CONNECTED )
478                         break;
479
480                 // If we timed out while waiting for a request, exit the loop.
481                 if( !recvd && (end - start) >= keepalive ) {
482                         osrfLogInfo( OSRF_LOG_MARK,
483                                 "No request was received in %d seconds, exiting stateful session", keepalive );
484                         osrfAppSessionStatus(
485                                 session,
486                                 OSRF_STATUS_TIMEOUT,
487                                 "osrfConnectStatus",
488                                 0, "Disconnected on timeout" );
489
490                         break;
491                 }
492
493                 // If the child process has decided to terminate immediately, exit the loop.
494                 if( rc )
495                         break;
496         }
497
498         osrfLogDebug( OSRF_LOG_MARK, "Exiting keepalive loop for session %s", session->session_id );
499         osrfAppSessionFree( session );
500         return rc;
501 }
502
503 /**
504         @brief Partially initialize a prefork_simple provided by the caller.
505         @param prefork Pointer to a a raw prefork_simple to be initialized.
506         @param client Pointer to a transport_client (connection to Jabber).
507         @param max_requests The maximum number of requests that a child process may service
508                         before terminating.
509         @param min_children Minimum number of child processes to maintain.
510         @param max_children Maximum number of child processes to maintain.
511         @return 0 if successful, or 1 if not (due to invalid parameters).
512 */
513 static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
514                 int max_requests, int min_children, int max_children ) {
515
516         if( min_children > max_children ) {
517                 osrfLogError( OSRF_LOG_MARK,  "min_children (%d) is greater "
518                         "than max_children (%d)", min_children, max_children );
519                 return 1;
520         }
521
522         if( max_children > ABS_MAX_CHILDREN ) {
523                 osrfLogError( OSRF_LOG_MARK,  "max_children (%d) is greater than ABS_MAX_CHILDREN (%d)",
524                         max_children, ABS_MAX_CHILDREN );
525                 return 1;
526         }
527
528         osrfLogInfo( OSRF_LOG_MARK, "Prefork launching child with max_request=%d,"
529                 "min_children=%d, max_children=%d", max_requests, min_children, max_children );
530
531         /* flesh out the struct */
532         prefork->max_requests = max_requests;
533         prefork->min_children = min_children;
534         prefork->max_children = max_children;
535         prefork->fd           = 0;
536         prefork->data_to_child = 0;
537         prefork->data_to_parent = 0;
538         prefork->current_num_children = 0;
539         prefork->keepalive    = 0;
540         prefork->appname      = NULL;
541         prefork->first_child  = NULL;
542         prefork->idle_list    = NULL;
543         prefork->free_list    = NULL;
544         prefork->connection   = client;
545
546         return 0;
547 }
548
549 /**
550         @brief Spawn a new child process and put it in the idle list.
551         @param forker Pointer to the prefork_simple that will own the process.
552         @return Pointer to the new prefork_child, or not at all.
553
554         Spawn a new child process.  Create a prefork_child for it and put it in the idle list.
555
556         After forking, the parent returns a pointer to the new prefork_child.  The child
557         services its quota of requests and then terminates without returning.
558 */
559 static prefork_child* launch_child( prefork_simple* forker ) {
560
561         pid_t pid;
562         int data_fd[2];
563         int status_fd[2];
564
565         // Set up the data and status pipes
566         if( pipe( data_fd ) < 0 ) { /* build the data pipe*/
567                 osrfLogError( OSRF_LOG_MARK,  "Pipe making error" );
568                 return NULL;
569         }
570
571         if( pipe( status_fd ) < 0 ) {/* build the status pipe */
572                 osrfLogError( OSRF_LOG_MARK,  "Pipe making error" );
573                 close( data_fd[1] );
574                 close( data_fd[0] );
575                 return NULL;
576         }
577
578         osrfLogInternal( OSRF_LOG_MARK, "Pipes: %d %d %d %d",
579                 data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
580
581         // Create and initialize a prefork_child for the new process
582         prefork_child* child = prefork_child_init( forker, data_fd[0],
583                 data_fd[1], status_fd[0], status_fd[1] );
584
585         if( (pid=fork()) < 0 ) {
586                 osrfLogError( OSRF_LOG_MARK, "Forking Error" );
587                 prefork_child_free( forker, child );
588                 return NULL;
589         }
590
591         // Add the new child to the head of the idle list
592         child->next = forker->idle_list;
593         forker->idle_list = child;
594
595         if( pid > 0 ) {  /* parent */
596
597                 signal( SIGCHLD, sigchld_handler );
598                 ( forker->current_num_children )++;
599                 child->pid = pid;
600
601                 osrfLogDebug( OSRF_LOG_MARK, "Parent launched %d", pid );
602                 /* *no* child pipe FD's can be closed or the parent will re-use fd's that
603                         the children are currently using */
604                 return child;
605         }
606
607         else { /* child */
608
609         // ignore the unregister-router signal.  Belt+suspenders protection 
610         // against sending USR1 to the wrong PID.
611                 signal( SIGUSR1, SIG_IGN );
612
613                 osrfLogInternal( OSRF_LOG_MARK,
614                         "I am new child with read_data_fd = %d and write_status_fd = %d",
615                         child->read_data_fd, child->write_status_fd );
616
617                 child->pid = getpid();
618                 close( child->write_data_fd );
619                 close( child->read_status_fd );
620
621                 /* do the initing */
622                 if( prefork_child_init_hook( child ) == -1 ) {
623                         osrfLogError( OSRF_LOG_MARK,
624                                 "Forker child going away because we could not connect to OpenSRF..." );
625                         osrf_prefork_child_exit( child );
626                 }
627
628                 prefork_child_wait( child );      // Should exit without returning
629                 osrf_prefork_child_exit( child ); // Just to be sure
630                 return NULL;  // Unreachable, but it keeps the compiler happy
631         }
632 }
633
634 /**
635         @brief Terminate a child process.
636         @param child Pointer to the prefork_child representing the child process (not used).
637
638         Called only by child processes.  Dynamically call an application-specific shutdown
639         function from a previously loaded shared library; then exit.
640 */
641 static void osrf_prefork_child_exit( prefork_child* child ) {
642         osrfAppRunExitCode();
643         exit( 0 );
644 }
645
646 /**
647         @brief Launch all the child processes, putting them in the idle list.
648         @param forker Pointer to the prefork_simple that will own the children.
649
650         Called only by the parent process (in order to become a parent).
651 */
652 static void prefork_launch_children( prefork_simple* forker ) {
653         if( !forker ) return;
654         int c = 0;
655         while( c++ < forker->min_children )
656                 launch_child( forker );
657 }
658
659 /**
660         @brief Signal handler for SIGCHLD: note that a child process has terminated.
661         @param sig The value of the trapped signal; always SIGCHLD.
662
663         Set a boolean to be checked later.
664 */
665 static void sigchld_handler( int sig ) {
666         signal( SIGCHLD, sigchld_handler );
667         child_dead = 1;
668 }
669
670 /**
671         @brief Signal handler for SIGUSR1
672         @param sig The value of the trapped signal; always SIGUSR1.
673
674     Send unregister command to all registered routers.
675 */
676 static void sigusr1_handler( int sig ) {
677     osrf_prefork_register_routers(service_name, true);
678         signal( SIGUSR1, sigusr1_handler );
679 }
680
681 /**
682         @brief Replenish the collection of child processes, after one has terminated.
683         @param forker Pointer to the prefork_simple that manages the child processes.
684
685         The parent calls this function when it notices (via a signal handler) that
686         a child process has died.
687
688         Wait on the dead children so that they won't be zombies.  Spawn new ones as needed
689         to maintain at least a minimum number.
690 */
691 void reap_children( prefork_simple* forker ) {
692
693         pid_t child_pid;
694
695         // Reset our boolean so that we can detect any further terminations.
696         child_dead = 0;
697
698         // Bury the children so that they won't be zombies.  WNOHANG means that waitpid() returns
699         // immediately if there are no waitable children, instead of waiting for more to die.
700         // Ignore the return code of the child.  We don't do an autopsy.
701         while( (child_pid = waitpid( -1, NULL, WNOHANG )) > 0 ) {
702                 --forker->current_num_children;
703                 del_prefork_child( forker, child_pid );
704         }
705
706         // Spawn more children as needed.
707         while( forker->current_num_children < forker->min_children )
708                 launch_child( forker );
709 }
710
711 /**
712         @brief Read transport_messages and dispatch them to child processes for servicing.
713         @param forker Pointer to the prefork_simple that manages the child processes.
714
715         This is the main loop of the parent process, and once entered, does not exit.
716
717         For each usable transport_message received: look for an idle child to service it.  If
718         no idle children are available, either spawn a new one or, if we've already spawned the
719         maximum number of children, wait for one to become available.  Once a child is available
720         by whatever means, write an XML version of the input message, to a pipe designated for
721         use by that child.
722 */
723 static void prefork_run( prefork_simple* forker ) {
724
725         if( NULL == forker->idle_list )
726                 return;   // No available children, and we haven't even started yet
727
728         transport_message* cur_msg = NULL;
729
730         while( 1 ) {
731
732                 if( forker->first_child == NULL && forker->idle_list == NULL ) {/* no more children */
733                         osrfLogWarning( OSRF_LOG_MARK, "No more children..." );
734                         return;
735                 }
736
737                 // Wait indefinitely for an input message
738                 osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data..." );
739                 cur_msg = client_recv( forker->connection, -1 );
740
741                 if( cur_msg == NULL )
742                         continue;           // Error?  Interrupted by a signal?  Try again...
743
744                 message_prepare_xml( cur_msg );
745                 const char* msg_data = cur_msg->msg_xml;
746                 if( ! msg_data || ! *msg_data ) {
747                         osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %",
748                                 (msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread );
749                         message_free( cur_msg );
750                         continue;       // Message not usable; go on to the next one.
751                 }
752
753                 int honored = 0;     /* will be set to true when we service the request */
754                 int no_recheck = 0;
755
756                 while( ! honored ) {
757
758             if( !no_recheck ) {
759                 if(check_children( forker, 0 ) < 0) {
760                     continue; // check failed, try again
761                 }
762             }
763             no_recheck = 0;
764
765                         osrfLogDebug( OSRF_LOG_MARK, "Server received inbound data" );
766
767                         prefork_child* cur_child = NULL;
768
769                         // Look for an available child in the idle list.  Since the idle list operates
770                         // as a stack, the child we get is the one that was most recently active, or
771                         // most recently spawned.  That means it's the one most likely still to be in
772                         // physical memory, and the one least likely to have to be swapped in.
773                         while( forker->idle_list ) {
774
775                                 osrfLogDebug( OSRF_LOG_MARK, "Looking for idle child" );
776                                 // Grab the prefork_child at the head of the idle list
777                                 cur_child = forker->idle_list;
778                                 forker->idle_list = cur_child->next;
779                                 cur_child->next = NULL;
780
781                                 osrfLogInternal( OSRF_LOG_MARK,
782                                         "Searching for available child. cur_child->pid = %d", cur_child->pid );
783                                 osrfLogInternal( OSRF_LOG_MARK, "Current num children %d",
784                                         forker->current_num_children );
785
786                                 osrfLogDebug( OSRF_LOG_MARK, "forker sending data to %d", cur_child->pid );
787                                 osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d",
788                                         cur_child->write_data_fd );
789
790                                 int written = write( cur_child->write_data_fd, msg_data, strlen( msg_data ) + 1 );
791                                 if( written < 0 ) {
792                                         // This child appears to be dead or unusable.  Discard it.
793                                         osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d: %s",
794                                                 errno, strerror( errno ));
795                                         kill( cur_child->pid, SIGKILL );
796                                         del_prefork_child( forker, cur_child->pid );
797                                         continue;
798                                 }
799
800                                 add_prefork_child( forker, cur_child );  // Add it to active list
801                                 honored = 1;
802                                 break;
803                         }
804
805                         /* if none available, add a new child if we can */
806                         if( ! honored ) {
807                                 osrfLogDebug( OSRF_LOG_MARK, "Not enough children, attempting to add..." );
808
809                                 if( forker->current_num_children < forker->max_children ) {
810                                         osrfLogDebug( OSRF_LOG_MARK,  "Launching new child with current_num = %d",
811                                                 forker->current_num_children );
812
813                                         launch_child( forker );  // Put a new child into the idle list
814                                         if( forker->idle_list ) {
815
816                                                 // Take the new child from the idle list
817                                                 prefork_child* new_child = forker->idle_list;
818                                                 forker->idle_list = new_child->next;
819                                                 new_child->next = NULL;
820
821                                                 osrfLogDebug( OSRF_LOG_MARK, "Writing to new child fd %d : pid %d",
822                                                         new_child->write_data_fd, new_child->pid );
823
824                                                 int written = write(
825                                                         new_child->write_data_fd, msg_data, strlen( msg_data ) + 1 );
826                                                 if( written < 0 ) {
827                                                         // This child appears to be dead or unusable.  Discard it.
828                                                         osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d: %s",
829                                                                 errno, strerror( errno ));
830                                                         kill( cur_child->pid, SIGKILL );
831                                                         del_prefork_child( forker, cur_child->pid );
832                                                 } else {
833                                                         add_prefork_child( forker, new_child );
834                                                         honored = 1;
835                                                 }
836                                         }
837                 } else {
838                     osrfLogWarning( OSRF_LOG_MARK, "Could not launch a new child as %d children "
839                         "were already running; consider increasing max_children for this "
840                         "application higher than %d in the OpenSRF configuration if this "
841                         "message occurs frequently",
842                         forker->current_num_children, forker->max_children );
843                 }
844             }
845
846                         if( !honored ) {
847                                 osrfLogWarning( OSRF_LOG_MARK, "No children available, waiting..." );
848                                 if( check_children( forker, 1 ) >= 0 ) {
849                                     // Tell the loop not to call check_children again, since we just successfully called it
850                                     no_recheck = 1;
851                 }
852                         }
853
854                         if( child_dead )
855                                 reap_children( forker );
856
857                 } // end while( ! honored )
858
859                 message_free( cur_msg );
860
861         } /* end top level listen loop */
862 }
863
864
865 /**
866         @brief See if any children have become available.
867         @param forker Pointer to the prefork_simple that owns the children.
868         @param forever Boolean: true if we should wait indefinitely.
869     @return 0 or greater if successful, -1 on select error/interrupt
870
871         Call select() for all the children in the active list.  Read each active file
872         descriptor and move the corresponding child to the idle list.
873
874         If @a forever is true, wait indefinitely for input.  Otherwise return immediately if
875         there are no active file descriptors.
876 */
877 static int check_children( prefork_simple* forker, int forever ) {
878
879         if( child_dead )
880                 reap_children( forker );
881
882         if( NULL == forker->first_child ) {
883                 // If forever is true, then we're here because we've run out of idle
884                 // processes, so there should be some active ones around.
885                 // If forever is false, then the children may all be idle, and that's okay.
886                 if( forever )
887                         osrfLogError( OSRF_LOG_MARK, "No active child processes to check" );
888                 return 0;
889         }
890
891         int select_ret;
892         fd_set read_set;
893         FD_ZERO( &read_set );
894         int max_fd = 0;
895         int n;
896
897         // Prepare to select() on pipes from all the active children
898         prefork_child* cur_child = forker->first_child;
899         do {
900                 if( cur_child->read_status_fd > max_fd )
901                         max_fd = cur_child->read_status_fd;
902                 FD_SET( cur_child->read_status_fd, &read_set );
903                 cur_child = cur_child->next;
904         } while( cur_child != forker->first_child );
905
906         FD_CLR( 0, &read_set ); /* just to be sure */
907
908         if( forever ) {
909                 osrfLogWarning( OSRF_LOG_MARK,
910                         "We have no children available - waiting for one to show up..." );
911
912                 if( (select_ret=select( max_fd + 1, &read_set, NULL, NULL, NULL )) == -1 ) {
913                         osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children: %s",
914                                 errno, strerror( errno ));
915                 }
916                 osrfLogInfo( OSRF_LOG_MARK,
917                         "select() completed after waiting on children to become available" );
918
919         } else {
920
921                 struct timeval tv;
922                 tv.tv_sec   = 0;
923                 tv.tv_usec  = 0;
924
925                 if( (select_ret=select( max_fd + 1, &read_set, NULL, NULL, &tv )) == -1 ) {
926                         osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children: %s",
927                                 errno, strerror( errno ));
928                 }
929         }
930
931     if( select_ret <= 0 ) // we're done here
932         return select_ret;
933
934         // Check each child in the active list.
935         // If it has responded, move it to the idle list.
936         cur_child = forker->first_child;
937         prefork_child* next_child = NULL;
938         int num_handled = 0;
939         do {
940                 next_child = cur_child->next;
941                 if( FD_ISSET( cur_child->read_status_fd, &read_set )) {
942                         osrfLogDebug( OSRF_LOG_MARK,
943                                 "Server received status from a child %d", cur_child->pid );
944
945                         num_handled++;
946
947                         /* now suck off the data */
948                         char buf[64];
949                         if( (n=read( cur_child->read_status_fd, buf, sizeof( buf ) - 1 )) < 0 ) {
950                                 osrfLogWarning( OSRF_LOG_MARK,
951                                         "Read error after select in child status read with errno %d: %s",
952                                         errno, strerror( errno ));
953                         }
954                         else {
955                                 buf[n] = '\0';
956                                 osrfLogDebug( OSRF_LOG_MARK,  "Read %d bytes from status buffer: %s", n, buf );
957                         }
958
959                         // Remove the child from the active list
960                         if( forker->first_child == cur_child ) {
961                                 if( cur_child->next == cur_child )
962                                         forker->first_child = NULL;   // only child in the active list
963                                 else
964                                         forker->first_child = cur_child->next;
965                         }
966                         cur_child->next->prev = cur_child->prev;
967                         cur_child->prev->next = cur_child->next;
968
969                         // Add it to the idle list
970                         cur_child->prev = NULL;
971                         cur_child->next = forker->idle_list;
972                         forker->idle_list = cur_child;
973                 }
974                 cur_child = next_child;
975         } while( forker->first_child && forker->first_child != next_child );
976
977     return select_ret;
978 }
979
980 /**
981         @brief Service up a set maximum number of requests; then shut down.
982         @param child Pointer to the prefork_child representing the child process.
983
984         Called only by child process.
985
986         Enter a loop, for up to max_requests iterations.  On each iteration:
987         - Wait indefinitely for a request from the parent.
988         - Service the request.
989         - Increment a counter.  If the limit hasn't been reached, notify the parent that you
990         are available for another request.
991
992         After exiting the loop, shut down and terminate the process.
993 */
994 static void prefork_child_wait( prefork_child* child ) {
995
996         int i,n;
997         growing_buffer* gbuf = buffer_init( READ_BUFSIZE );
998         char buf[READ_BUFSIZE];
999
1000         for( i = 0; i < child->max_requests; i++ ) {
1001
1002                 n = -1;
1003                 int gotdata = 0;    // boolean; set to true if we get data
1004                 clr_fl( child->read_data_fd, O_NONBLOCK );
1005
1006                 // Read a request from the parent, via a pipe, into a growing_buffer.
1007                 while( (n = read( child->read_data_fd, buf, READ_BUFSIZE-1 )) > 0 ) {
1008                         buf[n] = '\0';
1009                         osrfLogDebug( OSRF_LOG_MARK, "Prefork child read %d bytes of data", n );
1010                         if( !gotdata ) {
1011                                 set_fl( child->read_data_fd, O_NONBLOCK );
1012                                 gotdata = 1;
1013                         }
1014                         buffer_add_n( gbuf, buf, n );
1015                 }
1016
1017                 if( errno == EAGAIN )
1018                         n = 0;
1019
1020                 if( errno == EPIPE ) {
1021                         osrfLogDebug( OSRF_LOG_MARK, "C child attempted read on broken pipe, exiting..." );
1022                         break;
1023                 }
1024
1025                 int terminate_now = 0;     // Boolean
1026
1027                 if( n < 0 ) {
1028                         osrfLogWarning( OSRF_LOG_MARK,
1029                                 "Prefork child read returned error with errno %d", errno );
1030                         break;
1031
1032                 } else if( gotdata ) {
1033                         // Process the request
1034                         osrfLogDebug( OSRF_LOG_MARK, "Prefork child got a request.. processing.." );
1035                         terminate_now = prefork_child_process_request( child, gbuf->buf );
1036                         buffer_reset( gbuf );
1037                 }
1038
1039                 if( terminate_now ) {
1040                         // We're terminating prematurely -- presumably due to a fatal error condition.
1041                         osrfLogWarning( OSRF_LOG_MARK, "Prefork child terminating abruptly" );
1042                         break;
1043                 }
1044
1045                 if( i < child->max_requests - 1 ) {
1046                         // Report back to the parent for another request.
1047                         size_t msg_len = 9;
1048                         ssize_t len = write(
1049                                 child->write_status_fd, "available" /*less than 64 bytes*/, msg_len );
1050                         if( len != msg_len ) {
1051                                 osrfLogError( OSRF_LOG_MARK,
1052                                         "Drone terminating: unable to notify listener of availability: %s",
1053                                         strerror( errno ));
1054                                 buffer_free( gbuf );
1055                                 osrf_prefork_child_exit( child );
1056                         }
1057                 }
1058         }
1059
1060         buffer_free( gbuf );
1061
1062         osrfLogDebug( OSRF_LOG_MARK, "Child with max-requests=%d, num-served=%d exiting...[%ld]",
1063                 child->max_requests, i, (long) getpid());
1064
1065         osrf_prefork_child_exit( child );
1066 }
1067
1068 /**
1069         @brief Add a prefork_child to the end of the active list.
1070         @param forker Pointer to the prefork_simple that owns the list.
1071         @param child Pointer to the prefork_child to be added.
1072 */
1073 static void add_prefork_child( prefork_simple* forker, prefork_child* child ) {
1074
1075         if( forker->first_child == NULL ) {
1076                 // Simplest case: list is initially empty.
1077                 forker->first_child = child;
1078                 child->next = child;
1079                 child->prev = child;
1080         } else {
1081                 // Find the last node in the circular list.
1082                 prefork_child* last_child = forker->first_child->prev;
1083
1084                 // Insert the new child between the last and first children.
1085                 last_child->next = child;
1086                 child->prev      = last_child;
1087                 child->next      = forker->first_child;
1088                 forker->first_child->prev = child;
1089         }
1090 }
1091
1092 /**
1093         @brief Delete and destroy a dead child from our list.
1094         @param forker Pointer to the prefork_simple that owns the dead child.
1095         @param pid Process ID of the dead child.
1096
1097         Look for the dead child first in the list of active children.  If you don't find it
1098         there, look in the list of idle children.  If you find it, remove it from whichever
1099         list it's on, and destroy it.
1100 */
1101 static void del_prefork_child( prefork_simple* forker, pid_t pid ) {
1102
1103         osrfLogDebug( OSRF_LOG_MARK, "Deleting Child: %d", pid );
1104
1105         prefork_child* cur_child = NULL;
1106
1107         // Look first in the active list
1108         if( forker->first_child ) {
1109                 cur_child = forker->first_child; /* current pointer */
1110                 while( cur_child->pid != pid && cur_child->next != forker->first_child )
1111                         cur_child = cur_child->next;
1112
1113                 if( cur_child->pid == pid ) {
1114                         // We found the right node.  Remove it from the list.
1115                         if( cur_child->next == cur_child )
1116                                 forker->first_child = NULL;    // only child in the list
1117                         else {
1118                                 if( forker->first_child == cur_child )
1119                                         forker->first_child = cur_child->next;  // Reseat forker->first_child
1120
1121                                 // Stitch the adjacent nodes together
1122                                 cur_child->prev->next = cur_child->next;
1123                                 cur_child->next->prev = cur_child->prev;
1124                         }
1125                 } else
1126                         cur_child = NULL;  // Didn't find it in the active list
1127         }
1128
1129         if( ! cur_child ) {
1130                 // Maybe it's in the idle list.  This can happen if, for example,
1131                 // a child is killed by a signal while it's between requests.
1132
1133                 prefork_child* prev = NULL;
1134                 cur_child = forker->idle_list;
1135                 while( cur_child && cur_child->pid != pid ) {
1136                         prev = cur_child;
1137                         cur_child = cur_child->next;
1138                 }
1139
1140                 if( cur_child ) {
1141                         // Detach from the list
1142                         if( prev )
1143                                 prev->next = cur_child->next;
1144                         else
1145                                 forker->idle_list = cur_child->next;
1146                 } // else we can't find it
1147         }
1148
1149         // If we found the node, destroy it.
1150         if( cur_child )
1151                 prefork_child_free( forker, cur_child );
1152 }
1153
1154 /**
1155         @brief Create and initialize a prefork_child.
1156         @param forker Pointer to the prefork_simple that will own the prefork_child.
1157         @param read_data_fd Used by child to read request from parent.
1158         @param write_data_fd Used by parent to write request to child.
1159         @param read_status_fd Used by parent to read status from child.
1160         @param write_status_fd Used by child to write status to parent.
1161         @return Pointer to the newly created prefork_child.
1162
1163         The calling code is responsible for freeing the prefork_child by calling
1164         prefork_child_free().
1165 */
1166 static prefork_child* prefork_child_init( prefork_simple* forker,
1167         int read_data_fd, int write_data_fd,
1168         int read_status_fd, int write_status_fd ) {
1169
1170         // Allocate a prefork_child -- from the free list if possible, or from
1171         // the heap if necessary.  The free list is a non-circular, singly-linked list.
1172         prefork_child* child;
1173         if( forker->free_list ) {
1174                 child = forker->free_list;
1175                 forker->free_list = child->next;
1176         } else
1177                 child = safe_malloc( sizeof( prefork_child ));
1178
1179         child->pid              = 0;
1180         child->read_data_fd     = read_data_fd;
1181         child->write_data_fd    = write_data_fd;
1182         child->read_status_fd   = read_status_fd;
1183         child->write_status_fd  = write_status_fd;
1184         child->max_requests     = forker->max_requests;
1185         child->appname          = forker->appname;  // We don't make a separate copy
1186         child->keepalive        = forker->keepalive;
1187         child->next             = NULL;
1188         child->prev             = NULL;
1189
1190         return child;
1191 }
1192
1193 /**
1194         @brief Terminate all child processes and clear out a prefork_simple.
1195         @param prefork Pointer to the prefork_simple to be cleared out.
1196
1197         We do not deallocate the prefork_simple itself, just its contents.
1198 */
1199 static void prefork_clear( prefork_simple* prefork ) {
1200
1201         // Kill all the active children, and move their prefork_child nodes to the free list.
1202         while( prefork->first_child ) {
1203                 kill( prefork->first_child->pid, SIGKILL );
1204                 del_prefork_child( prefork, prefork->first_child->pid );
1205         }
1206
1207         // Kill all the idle prefork children, close their file
1208         // descriptors, and move them to the free list.
1209         prefork_child* child = prefork->idle_list;
1210         prefork->idle_list = NULL;
1211         while( child ) {
1212                 prefork_child* temp = child->next;
1213                 kill( child->pid, SIGKILL );
1214                 prefork_child_free( prefork, child );
1215                 child = temp;
1216         }
1217         //prefork->current_num_children = 0;
1218
1219         // Physically free the free list of prefork_children.
1220         child = prefork->free_list;
1221         prefork->free_list = NULL;
1222         while( child ) {
1223                 prefork_child* temp = child->next;
1224                 free( child );
1225                 child = temp;
1226         }
1227
1228         // Close the Jabber connection
1229         client_free( prefork->connection );
1230         prefork->connection = NULL;
1231
1232         // After giving the child processes a second to terminate, wait on them so that they
1233         // don't become zombies.  We don't wait indefinitely, so it's possible that some
1234         // children will survive a bit longer.
1235         sleep( 1 );
1236         while( (waitpid( -1, NULL, WNOHANG )) > 0 ) {
1237                 --prefork->current_num_children;
1238         }
1239
1240         free( prefork->appname );
1241         prefork->appname = NULL;
1242 }
1243
1244 /**
1245         @brief Destroy and deallocate a prefork_child.
1246         @param forker Pointer to the prefork_simple that owns the prefork_child.
1247         @param child Pointer to the prefork_child to be destroyed.
1248 */
1249 static void prefork_child_free( prefork_simple* forker, prefork_child* child ) {
1250         close( child->read_data_fd );
1251         close( child->write_data_fd );
1252         close( child->read_status_fd );
1253         close( child->write_status_fd );
1254
1255         // Stick the prefork_child in a free list for potential reuse.  This is a
1256         // non-circular, singly linked list.
1257         child->prev = NULL;
1258         child->next = forker->free_list;
1259         forker->free_list = child;
1260 }