75f920708d04d478ee4a95d8a34e1cf66664b918
[OpenSRF.git] / src / libopensrf / osrf_prefork.c
1 /**
2         @file osrf_prefork.c
3         @brief Spawn and manage a collection of child process to service requests.
4
5         Spawn a collection of child processes, replacing them as needed.  Forward requests to them
6         and let the children do the work.
7
8         Each child processes some maximum number of requests before it terminates itself.  When a
9         child dies, either deliberately or otherwise, we can spawn another one to replace it,
10         keeping the number of children within a predefined range.
11
12         Use a doubly-linked circular list to keep track of the children to whom we have forwarded
13         a request, and who are still working on them.  Use a separate linear linked list to keep
14         track of children that are currently idle.  Move them back and forth as needed.
15
16         For each child, set up two pipes:
17         - One for the parent to send requests to the child.
18         - One for the child to notify the parent that it is available for another request.
19
20         The message sent to the child represents an XML stanza as received from Jabber.
21
22         When the child finishes processing the request, it writes the string "available" back
23         to the parent.  Then the parent knows that it can send that child another request.
24 */
25
26 #include <signal.h>
27 #include <sys/types.h>
28 #include <sys/time.h>
29 #include <unistd.h>
30 #include <stdlib.h>
31 #include <stdio.h>
32 #include <string.h>
33 #include <sys/select.h>
34 #include <signal.h>
35 #include <sys/wait.h>
36
37 #include "opensrf/utils.h"
38 #include "opensrf/log.h"
39 #include "opensrf/transport_client.h"
40 #include "opensrf/osrf_stack.h"
41 #include "opensrf/osrf_settings.h"
42 #include "opensrf/osrf_application.h"
43
44 #define READ_BUFSIZE 1024
45 #define ABS_MAX_CHILDREN 256
46
47 typedef struct {
48         int max_requests;     /**< How many requests a child processes before terminating. */
49         int min_children;     /**< Minimum number of children to maintain. */
50         int max_children;     /**< Maximum number of children to maintain. */
51         int fd;               /**< Unused. */
52         int data_to_child;    /**< Unused. */
53         int data_to_parent;   /**< Unused. */
54         int current_num_children;   /**< How many children are currently on the list. */
55         int keepalive;        /**< Keepalive time for stateful sessions. */
56         char* appname;        /**< Name of the application. */
57         /** Points to a circular linked list of children. */
58         struct prefork_child_struct* first_child;
59         /** List of of child processes that aren't doing anything at the moment and are
60                 therefore available to service a new request. */
61         struct prefork_child_struct* idle_list;
62         /** List of allocated but unused prefork_children, available for reuse.  Each one is just
63                 raw memory, apart from the "next" pointer used to stitch them together.  In particular,
64                 there is no child process for them, and the file descriptors are not open. */
65         struct prefork_child_struct* free_list;
66         transport_client* connection;  /**< Connection to Jabber. */
67 } prefork_simple;
68
69 struct prefork_child_struct {
70         pid_t pid;            /**< Process ID of the child. */
71         int read_data_fd;     /**< Child uses to read request. */
72         int write_data_fd;    /**< Parent uses to write request. */
73         int read_status_fd;   /**< Parent reads to see if child is available. */
74         int write_status_fd;  /**< Child uses to notify parent when it's available again. */
75         int max_requests;     /**< How many requests a child can process before terminating. */
76         const char* appname;  /**< Name of the application. */
77         int keepalive;        /**< Keepalive time for stateful sessions. */
78         struct prefork_child_struct* next;  /**< Linkage pointer for linked list. */
79         struct prefork_child_struct* prev;  /**< Linkage pointer for linked list. */
80 };
81
82 typedef struct prefork_child_struct prefork_child;
83
84 /** Boolean.  Set to true by a signal handler when it traps SIGCHLD. */
85 static volatile sig_atomic_t child_dead;
86
87 static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
88         int max_requests, int min_children, int max_children );
89 static prefork_child* launch_child( prefork_simple* forker );
90 static void prefork_launch_children( prefork_simple* forker );
91 static void prefork_run(prefork_simple* forker);
92 static void add_prefork_child( prefork_simple* forker, prefork_child* child );
93
94 static void del_prefork_child( prefork_simple* forker, pid_t pid );
95 static void check_children( prefork_simple* forker, int forever );
96 static void prefork_child_process_request(prefork_child*, char* data);
97 static int prefork_child_init_hook(prefork_child*);
98 static prefork_child* prefork_child_init( prefork_simple* forker,
99                 int read_data_fd, int write_data_fd,
100                 int read_status_fd, int write_status_fd );
101
102 /* listens on the 'data_to_child' fd and wait for incoming data */
103 static void prefork_child_wait( prefork_child* child );
104 static void prefork_clear( prefork_simple* );
105 static void prefork_child_free( prefork_simple* forker, prefork_child* );
106 static void osrf_prefork_register_routers( const char* appname );
107 static void osrf_prefork_child_exit( prefork_child* );
108
109 static void sigchld_handler( int sig );
110
111 /**
112         @brief Spawn and manage a collection of drone processes for servicing requests.
113         @param appname Name of the application.
114         @return 0 if successful, or -1 if error.
115 */
116 int osrf_prefork_run( const char* appname ) {
117
118         if(!appname) {
119                 osrfLogError( OSRF_LOG_MARK, "osrf_prefork_run requires an appname to run!");
120                 return -1;
121         }
122
123         set_proc_title( "OpenSRF Listener [%s]", appname );
124
125         int maxr = 1000;
126         int maxc = 10;
127         int minc = 3;
128         int kalive = 5;
129
130         osrfLogInfo( OSRF_LOG_MARK, "Loading config in osrf_forker for app %s", appname);
131
132         char* max_req      = osrf_settings_host_value("/apps/%s/unix_config/max_requests", appname);
133         char* min_children = osrf_settings_host_value("/apps/%s/unix_config/min_children", appname);
134         char* max_children = osrf_settings_host_value("/apps/%s/unix_config/max_children", appname);
135         char* keepalive    = osrf_settings_host_value("/apps/%s/keepalive", appname);
136
137         if(!keepalive) osrfLogWarning( OSRF_LOG_MARK, "Keepalive is not defined, assuming %d", kalive);
138         else kalive = atoi(keepalive);
139
140         if(!max_req) osrfLogWarning( OSRF_LOG_MARK, "Max requests not defined, assuming %d", maxr);
141         else maxr = atoi(max_req);
142
143         if(!min_children) osrfLogWarning( OSRF_LOG_MARK,
144                         "Min children not defined, assuming %d", minc);
145         else minc = atoi(min_children);
146
147         if(!max_children) osrfLogWarning( OSRF_LOG_MARK,
148                         "Max children not defined, assuming %d", maxc);
149         else maxc = atoi(max_children);
150
151         free(keepalive);
152         free(max_req);
153         free(min_children);
154         free(max_children);
155         /* --------------------------------------------------- */
156
157         char* resc = va_list_to_string("%s_listener", appname);
158
159         // Make sure that we haven't already booted
160         if(!osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
161                 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()");
162                 free(resc);
163                 return -1;
164         }
165
166         free(resc);
167
168         prefork_simple forker;
169
170         if( prefork_simple_init( &forker, osrfSystemGetTransportClient(), maxr, minc, maxc ) ) {
171                 osrfLogError( OSRF_LOG_MARK,
172                                 "osrf_prefork_run() failed to create prefork_simple object" );
173                 return -1;
174         }
175
176         // Finish initializing the prefork_simple.
177         forker.appname   = strdup(appname);
178         forker.keepalive = kalive;
179
180         // Spawn the children; put them in the idle list.
181         prefork_launch_children( &forker );
182
183         // Tell the router that you're open for business.
184         osrf_prefork_register_routers(appname);
185
186         // Sit back and let the requests roll in
187         osrfLogInfo( OSRF_LOG_MARK, "Launching osrf_forker for app %s", appname);
188         prefork_run( &forker );
189
190         osrfLogWarning( OSRF_LOG_MARK, "prefork_run() returned - how??");
191         prefork_clear( &forker );
192         return 0;
193 }
194
195 /**
196         @brief Register the application with a specified router.
197         @param appname Name of the application.
198         @param routerName Name of the router.
199         @param routerDomain Domain of the router.
200
201         Tell the router that you're open for business so that it can route requests to you.
202 */
203 static void osrf_prefork_send_router_registration(
204                 const char* appname, const char* routerName, const char* routerDomain ) {
205         // Get a pointer to the global transport_client
206         transport_client* client = osrfSystemGetTransportClient();
207
208         // Construct the Jabber address of the router
209         char* jid = va_list_to_string( "%s@%s/router", routerName, routerDomain );
210         osrfLogInfo( OSRF_LOG_MARK, "%s registering with router %s", appname, jid );
211
212         // Create the registration message, and send it
213         transport_message* msg = message_init( "registering", NULL, NULL, jid, NULL );
214         message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
215         client_send_message( client, msg );
216
217         // Clean up
218         message_free( msg );
219         free(jid);
220 }
221
222 /** parses a single "complex" router configuration chunk */
223 static void osrf_prefork_parse_router_chunk(const char* appname, jsonObject* routerChunk) {
224
225         const char* routerName = jsonObjectGetString(jsonObjectGetKeyConst(routerChunk, "name"));
226         const char* domain = jsonObjectGetString(jsonObjectGetKeyConst(routerChunk, "domain"));
227         const jsonObject* services = jsonObjectGetKeyConst(routerChunk, "services");
228         osrfLogDebug(OSRF_LOG_MARK, "found router config with domain %s and name %s",
229                         routerName, domain);
230
231         if( services && services->type == JSON_HASH ) {
232                 osrfLogDebug(OSRF_LOG_MARK, "investigating router information...");
233                 const jsonObject* service_obj = jsonObjectGetKeyConst(services, "service");
234                 if( !service_obj )
235                         ;    // do nothing (shouldn't happen)
236                 else if( JSON_ARRAY == service_obj->type ) {
237                         int j;
238                         for(j = 0; j < service_obj->size; j++ ) {
239                                 const char* service = jsonObjectGetString(jsonObjectGetIndex(service_obj, j));
240                                 if( service && !strcmp( appname, service ))
241                                         osrf_prefork_send_router_registration(appname, routerName, domain);
242                         }
243                 }
244                 else if( JSON_STRING == service_obj->type ) {
245                         if( !strcmp(appname, jsonObjectGetString( service_obj )) )
246                                 osrf_prefork_send_router_registration(appname, routerName, domain);
247                 }
248         } else {
249                 osrf_prefork_send_router_registration(appname, routerName, domain);
250         }
251 }
252
253 static void osrf_prefork_register_routers( const char* appname ) {
254
255         jsonObject* routerInfo = osrfConfigGetValueObject(NULL, "/routers/router");
256
257         int i;
258         for(i = 0; i < routerInfo->size; i++) {
259                 jsonObject* routerChunk = jsonObjectGetIndex(routerInfo, i);
260
261                 if(routerChunk->type == JSON_STRING) {
262                         /* this accomodates simple router configs */
263                         char* routerName = osrfConfigGetValue( NULL, "/router_name" );
264                         char* domain = osrfConfigGetValue(NULL, "/routers/router");
265                         osrfLogDebug(OSRF_LOG_MARK, "found simple router settings with router name %s",
266                                         routerName);
267                         osrf_prefork_send_router_registration(appname, routerName, domain);
268
269                 } else {
270                         osrf_prefork_parse_router_chunk(appname, routerChunk);
271                 }
272         }
273 }
274
275 static int prefork_child_init_hook(prefork_child* child) {
276
277         if(!child) return -1;
278         osrfLogDebug( OSRF_LOG_MARK, "Child init hook for child %d", child->pid);
279
280         osrfSystemInitCache();
281         char* resc = va_list_to_string("%s_drone", child->appname);
282
283         /* if we're a source-client, tell the logger now that we're a new process*/
284         char* isclient = osrfConfigGetValue(NULL, "/client");
285         if( isclient && !strcasecmp(isclient,"true") )
286                 osrfLogSetIsClient(1);
287         free(isclient);
288
289         /* we want to remove traces of our parent's socket connection
290         * so we can have our own */
291         osrfSystemIgnoreTransportClient();
292
293         if(!osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
294                 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()");
295                 free(resc);
296                 return -1;
297         }
298
299         free(resc);
300
301         if( ! osrfAppRunChildInit(child->appname) ) {
302                 osrfLogDebug(OSRF_LOG_MARK, "Prefork child_init succeeded\n");
303         } else {
304                 osrfLogError(OSRF_LOG_MARK, "Prefork child_init failed\n");
305                 return -1;
306         }
307
308         set_proc_title( "OpenSRF Drone [%s]", child->appname );
309         return 0;
310 }
311
312 static void prefork_child_process_request(prefork_child* child, char* data) {
313         if( !child ) return;
314
315         transport_client* client = osrfSystemGetTransportClient();
316
317         if(!client_connected(client)) {
318                 osrfSystemIgnoreTransportClient();
319                 osrfLogWarning(OSRF_LOG_MARK, "Reconnecting child to opensrf after disconnect...");
320                 if(!osrf_system_bootstrap_client(NULL, NULL)) {
321                         osrfLogError( OSRF_LOG_MARK,
322                                 "Unable to bootstrap client in prefork_child_process_request()");
323                         sleep(1);
324                         osrf_prefork_child_exit(child);
325                 }
326         }
327
328         /* construct the message from the xml */
329         transport_message* msg = new_message_from_xml( data );
330
331         osrfAppSession* session = osrf_stack_transport_handler(msg, child->appname);
332         if(!session) return;
333
334         if( session->stateless && session->state != OSRF_SESSION_CONNECTED ) {
335                 osrfAppSessionFree( session );
336                 return;
337         }
338
339         osrfLogDebug( OSRF_LOG_MARK, "Entering keepalive loop for session %s", session->session_id );
340         int keepalive = child->keepalive;
341         int retval;
342         int recvd;
343         time_t start;
344         time_t end;
345
346         while(1) {
347
348                 osrfLogDebug(OSRF_LOG_MARK,
349                                 "osrf_prefork calling queue_wait [%d] in keepalive loop", keepalive);
350                 start   = time(NULL);
351                 retval  = osrf_app_session_queue_wait(session, keepalive, &recvd);
352                 end     = time(NULL);
353
354                 osrfLogDebug(OSRF_LOG_MARK, "Data received == %d", recvd);
355
356                 if(retval) {
357                         osrfLogError(OSRF_LOG_MARK, "queue-wait returned non-success %d", retval);
358                         break;
359                 }
360
361                 /* see if the client disconnected from us */
362                 if(session->state != OSRF_SESSION_CONNECTED)
363                         break;
364
365                 /* if no data was reveived within the timeout interval */
366                 if( !recvd && (end - start) >= keepalive ) {
367                         osrfLogInfo(OSRF_LOG_MARK,
368                                         "No request was received in %d seconds, exiting stateful session", keepalive);
369                         osrfAppSessionStatus(
370                                         session,
371                                         OSRF_STATUS_TIMEOUT,
372                                         "osrfConnectStatus",
373                                         0, "Disconnected on timeout" );
374
375                         break;
376                 }
377         }
378
379         osrfLogDebug( OSRF_LOG_MARK, "Exiting keepalive loop for session %s", session->session_id );
380         osrfAppSessionFree( session );
381         return;
382 }
383
384 /**
385         @brief Partially initialize a prefork_simple provided by the caller.
386         @param prefork Pointer to a a raw prefork_simple to be initialized.
387         @param client Pointer to a transport_client (connection to Jabber).
388         @param max_requests The maximum number of requests that a child process may service
389                         before terminating.
390         @param min_children Minimum number of child processes to maintain.
391         @param max_children Maximum number of child processes to maintain.
392         @return 0 if successful, or 1 if not (due to invalid parameters).
393 */
394 static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
395                 int max_requests, int min_children, int max_children ) {
396
397         if( min_children > max_children ) {
398                 osrfLogError( OSRF_LOG_MARK,  "min_children (%d) is greater "
399                                 "than max_children (%d)", min_children, max_children );
400                 return 1;
401         }
402
403         if( max_children > ABS_MAX_CHILDREN ) {
404                 osrfLogError( OSRF_LOG_MARK,  "max_children (%d) is greater than ABS_MAX_CHILDREN (%d)",
405                                 max_children, ABS_MAX_CHILDREN );
406                 return 1;
407         }
408
409         osrfLogInfo(OSRF_LOG_MARK, "Prefork launching child with max_request=%d,"
410                 "min_children=%d, max_children=%d", max_requests, min_children, max_children );
411
412         /* flesh out the struct */
413         prefork->max_requests = max_requests;
414         prefork->min_children = min_children;
415         prefork->max_children = max_children;
416         prefork->fd           = 0;
417         prefork->data_to_child = 0;
418         prefork->data_to_parent = 0;
419         prefork->current_num_children = 0;
420         prefork->keepalive    = 0;
421         prefork->appname      = NULL;
422         prefork->first_child  = NULL;
423         prefork->idle_list    = NULL;
424         prefork->free_list    = NULL;
425         prefork->connection   = client;
426
427         return 0;
428 }
429
430 /**
431         @brief Spawn a new child process.
432         @param forker Pointer to the prefork_simple that will own the process.
433         @return Pointer to the new prefork_child, or not at all.
434
435         Spawn a new child process.  Create a prefork_child for it and put it in the idle list.
436
437         After forking, the parent returns a pointer to the new prefork_child.  The child
438         services its quota of requests and then terminates without returning.
439 */
440 static prefork_child* launch_child( prefork_simple* forker ) {
441
442         pid_t pid;
443         int data_fd[2];
444         int status_fd[2];
445
446         // Set up the data and status pipes
447         if( pipe(data_fd) < 0 ) { /* build the data pipe*/
448                 osrfLogError( OSRF_LOG_MARK,  "Pipe making error" );
449                 return NULL;
450         }
451
452         if( pipe(status_fd) < 0 ) {/* build the status pipe */
453                 osrfLogError( OSRF_LOG_MARK,  "Pipe making error" );
454                 close( data_fd[1] );
455                 close( data_fd[0] );
456                 return NULL;
457         }
458
459         osrfLogInternal( OSRF_LOG_MARK, "Pipes: %d %d %d %d",
460                         data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
461
462         // Create and initialize a prefork_child for the new process
463         prefork_child* child = prefork_child_init( forker, data_fd[0],
464                         data_fd[1], status_fd[0], status_fd[1] );
465
466         if( (pid=fork()) < 0 ) {
467                 osrfLogError( OSRF_LOG_MARK, "Forking Error" );
468                 prefork_child_free( forker, child );
469                 return NULL;
470         }
471
472         // Add the new child to the head of the idle list
473         child->next = forker->idle_list;
474         forker->idle_list = child;
475
476         if( pid > 0 ) {  /* parent */
477
478                 signal(SIGCHLD, sigchld_handler);
479                 (forker->current_num_children)++;
480                 child->pid = pid;
481
482                 osrfLogDebug( OSRF_LOG_MARK, "Parent launched %d", pid );
483                 /* *no* child pipe FD's can be closed or the parent will re-use fd's that
484                         the children are currently using */
485                 return child;
486         }
487
488         else { /* child */
489
490                 osrfLogInternal( OSRF_LOG_MARK,
491                                 "I am new child with read_data_fd = %d and write_status_fd = %d",
492                                 child->read_data_fd, child->write_status_fd );
493
494                 child->pid = getpid();
495                 close( child->write_data_fd );
496                 close( child->read_status_fd );
497
498                 /* do the initing */
499                 if( prefork_child_init_hook(child) == -1 ) {
500                         osrfLogError(OSRF_LOG_MARK,
501                                 "Forker child going away because we could not connect to OpenSRF...");
502                         osrf_prefork_child_exit(child);
503                 }
504
505                 prefork_child_wait( child );      // Should exit without returning
506                 osrf_prefork_child_exit( child ); // Just to be sure
507                 return NULL;  // Unreachable, but it keeps the compiler happy
508         }
509 }
510
511 static void osrf_prefork_child_exit(prefork_child* child) {
512         osrfAppRunExitCode();
513         exit(0);
514 }
515
516 static void prefork_launch_children( prefork_simple* forker ) {
517         if(!forker) return;
518         int c = 0;
519         while( c++ < forker->min_children )
520                 launch_child( forker );
521 }
522
523
524 /**
525         @brief Signal handler for SIGCHLD: note that a child process has terminated.
526         @param sig The value of the trapped signal; always SIGCHLD.
527
528         Set a boolean to be checked later.
529 */
530 static void sigchld_handler( int sig ) {
531         signal(SIGCHLD, sigchld_handler);
532         child_dead = 1;
533 }
534
535
536 /**
537         @brief Replenish the collection of child processes, after one has terminated.
538         @param forker Pointer to the prefork_simple that manages the child processes.
539
540         This function is called when we notice (via a signal handler) that a child
541         process has died.
542
543         Spawn a new child process to replace each of the terminated ones.
544 */
545 void reap_children( prefork_simple* forker ) {
546
547         pid_t child_pid;
548
549         // Reset our boolean so that we can detect any further terminations.
550         child_dead = 0;
551
552         // Bury the children so that they won't be zombies.  WNOHANG means that waitpid() returns
553         // immediately if there are no waitable children, instead of waiting for more to die.
554         // Ignore the return code of the child.  We don't do an autopsy.
555         while( (child_pid = waitpid(-1, NULL, WNOHANG)) > 0)
556                 del_prefork_child( forker, child_pid );
557
558         /* Spawn more children as needed to maintain a minimum brood. */
559         while( forker->current_num_children < forker->min_children )
560                 launch_child( forker );
561 }
562
563 /**
564         @brief Read transport_messages and dispatch them to child processes for servicing.
565         @param forker Pointer to the prefork_simple that manages the child processes.
566
567         This is the main loop of the parent process, and once entered, does not exit.
568
569         For each usable transport_message received: look for an idle child to service it.  If
570         no idle children are available, either spawn a new one or, if we've already spawned the
571         maximum number of children, wait for one to become available.  Once a child is available
572         by whatever means, write an XML version of the input message, to a pipe designated for
573         use by that child.
574 */
575 static void prefork_run( prefork_simple* forker ) {
576
577         if( NULL == forker->idle_list )
578                 return;   // No available children, and we haven't even started yet
579
580         transport_message* cur_msg = NULL;
581
582         while(1) {
583
584                 if( forker->first_child == NULL && forker->idle_list == NULL ) {/* no more children */
585                         osrfLogWarning( OSRF_LOG_MARK, "No more children..." );
586                         return;
587                 }
588
589                 // Wait indefinitely for an input message
590                 osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data...");
591                 cur_msg = client_recv( forker->connection, -1 );
592
593                 if( cur_msg == NULL )
594                         continue;           // Error?  Interrupted by a signal?  Try again...
595
596                 message_prepare_xml( cur_msg );
597                 const char* msg_data = cur_msg->msg_xml;
598                 if( ! msg_data || ! *msg_data ) {
599                         osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %",
600                                         (msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread );
601                         message_free( cur_msg );
602                         continue;       // Message not usable; go on to the next one.
603                 }
604
605                 int honored = 0;     /* will be set to true when we service the request */
606                 int no_recheck = 0;
607
608                 while( ! honored ) {
609
610                         if(!no_recheck)
611                                 check_children( forker, 0 );
612                         no_recheck = 0;
613
614                         osrfLogDebug( OSRF_LOG_MARK, "Server received inbound data" );
615
616                         prefork_child* cur_child = NULL;
617
618                         // Look for an available child in the idle list.  Since the idle list operates
619                         // as a stack, the child we get is the one that was most recently active, or
620                         // most recently spawned.  That means it's the one most likely still to be in
621                         // physical memory, and the one least likely to have to be swapped in.
622                         while( forker->idle_list ) {
623
624                                 // Grab the prefork_child at the head of the idle list
625                                 cur_child = forker->idle_list;
626                                 forker->idle_list = cur_child->next;
627
628                                 osrfLogInternal( OSRF_LOG_MARK,
629                                                 "Searching for available child. cur_child->pid = %d", cur_child->pid );
630                                 osrfLogInternal( OSRF_LOG_MARK, "Current num children %d",
631                                                 forker->current_num_children );
632
633                                 osrfLogDebug( OSRF_LOG_MARK, "forker sending data to %d", cur_child->pid );
634                                 osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d",
635                                                 cur_child->write_data_fd );
636
637                                 int written = write(cur_child->write_data_fd, msg_data, strlen(msg_data) + 1);
638                                 if( written < 0 ) {
639                                         // This child appears to be dead or unusable.  Discard it.
640                                         osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d: %s",
641                                                         errno, strerror( errno ) );
642                                         kill( cur_child->pid, SIGKILL );
643                                         del_prefork_child( forker, cur_child->pid );
644                                         continue;
645                                 }
646
647                                 add_prefork_child( forker, cur_child );  // Add it to active list
648                                 honored = 1;
649                                 break;
650                         }
651
652                         /* if none available, add a new child if we can */
653                         if( ! honored ) {
654                                 osrfLogDebug( OSRF_LOG_MARK, "Not enough children, attempting to add...");
655
656                                 if( forker->current_num_children < forker->max_children ) {
657                                         osrfLogDebug( OSRF_LOG_MARK,  "Launching new child with current_num = %d",
658                                                         forker->current_num_children );
659
660                                         prefork_child* new_child = launch_child( forker );
661                                         if( new_child ) {
662
663                                                 osrfLogDebug( OSRF_LOG_MARK, "Writing to new child fd %d : pid %d",
664                                                                 new_child->write_data_fd, new_child->pid );
665
666                                                 if(write(new_child->write_data_fd, msg_data, strlen(msg_data) + 1) >= 0 ) {
667                                                         forker->first_child = new_child->next;
668                                                         add_prefork_child( forker, new_child );
669                                                         honored = 1;
670                                                 } else {
671                                                         // This child appears to be dead or unusable.  Discard it.
672                                                         osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d: %s",
673                                                                                         errno, strerror( errno ) );
674                                                         kill( cur_child->pid, SIGKILL );
675                                                         osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d: %s",
676                                                                                         errno, strerror( errno ) );
677                                                         del_prefork_child( forker, cur_child->pid );
678                                                 }
679                                         }
680
681                                 }
682                         }
683
684                         if( !honored ) {
685                                 osrfLogWarning( OSRF_LOG_MARK, "No children available, waiting...");
686                                 check_children( forker, 1 );
687                                 // Tell the loop not to call check_children again, since we're calling it now
688                                 no_recheck = 1;
689                         }
690
691                         if( child_dead )
692                                 reap_children(forker);
693
694                 } // end while( ! honored )
695
696                 message_free( cur_msg );
697
698         } /* end top level listen loop */
699
700 }
701
702
703 /* XXX Add a flag which tells select() to wait forever on children
704         in the best case, this will be faster than calling usleep(x), and
705         in the worst case it won't be slower and will do less logging...
706 */
707 static void check_children( prefork_simple* forker, int forever ) {
708
709         if( child_dead )
710                 reap_children(forker);
711
712         if( NULL == forker->first_child ) {
713                 // If forever is true, then we're here because we've run out of idle
714                 // processes, so there should be some active ones around.  Otherwise
715                 // the children may all be idle, and that's okay.
716                 if( forever )
717                         osrfLogError( OSRF_LOG_MARK, "No active child processes to check" );
718                 return;
719         }
720
721         int select_ret;
722         fd_set read_set;
723         FD_ZERO(&read_set);
724         int max_fd = 0;
725         int n;
726
727         if( child_dead )
728                 reap_children( forker );
729
730         // Prepare to select() on pipes from all the active children
731         prefork_child* cur_child = forker->first_child;
732         do {
733                 if( cur_child->read_status_fd > max_fd )
734                         max_fd = cur_child->read_status_fd;
735                 FD_SET( cur_child->read_status_fd, &read_set );
736                 cur_child = cur_child->next;
737         } while( cur_child != forker->first_child );
738
739         FD_CLR(0,&read_set); /* just to be sure */
740
741         if( forever ) {
742                 osrfLogWarning(OSRF_LOG_MARK,
743                                 "We have no children available - waiting for one to show up...");
744
745                 if( (select_ret=select( max_fd + 1 , &read_set, NULL, NULL, NULL)) == -1 ) {
746                         osrfLogWarning( OSRF_LOG_MARK,  "Select returned error %d on check_children: %s",
747                                         errno, strerror( errno ) );
748                 }
749                 osrfLogInfo(OSRF_LOG_MARK,
750                                 "select() completed after waiting on children to become available");
751
752         } else {
753
754                 struct timeval tv;
755                 tv.tv_sec   = 0;
756                 tv.tv_usec  = 0;
757
758                 if( (select_ret=select( max_fd + 1 , &read_set, NULL, NULL, &tv)) == -1 ) {
759                         osrfLogWarning( OSRF_LOG_MARK,  "Select returned error %d on check_children: %s",
760                                         errno, strerror( errno ) );
761                 }
762         }
763
764         if( select_ret == 0 )
765                 return;
766
767         /* see if any children have told us they're done */
768         cur_child = forker->first_child;
769         int j;
770         int num_handled = 0;
771         for( j = 0; j!= forker->current_num_children && num_handled < select_ret ; j++ ) {
772
773                 if( FD_ISSET( cur_child->read_status_fd, &read_set ) ) {
774                         osrfLogDebug( OSRF_LOG_MARK,
775                                         "Server received status from a child %d", cur_child->pid );
776
777                         num_handled++;
778
779                         /* now suck off the data */
780                         char buf[64];
781                         if( (n=read(cur_child->read_status_fd, buf, sizeof(buf) - 1)) < 0 ) {
782                                 osrfLogWarning( OSRF_LOG_MARK,
783                                                 "Read error after select in child status read with errno %d: %s",
784                                                 errno, strerror( errno ) );
785                         }
786                         else {
787                                 buf[n] = '\0';
788                                 osrfLogDebug( OSRF_LOG_MARK,  "Read %d bytes from status buffer: %s", n, buf );
789                         }
790                         // Remove the child from the active list
791                         if( forker->first_child == cur_child ) {
792                                 if( cur_child->next == cur_child )
793                                         forker->first_child = NULL;   // only child in the active list
794                                 else {
795                                         forker->first_child = cur_child->next;
796                                 }
797                                 cur_child->next->prev = cur_child->prev;
798                                 cur_child->prev->next = cur_child->next;
799                         }
800
801                         // Add it to the idle list
802                         cur_child->prev = NULL;
803                         cur_child->next = forker->idle_list;
804                         forker->idle_list = cur_child;
805                 }
806                 cur_child = cur_child->next;
807         }
808 }
809
810
811 static void prefork_child_wait( prefork_child* child ) {
812
813         int i,n;
814         growing_buffer* gbuf = buffer_init( READ_BUFSIZE );
815         char buf[READ_BUFSIZE];
816
817         for( i = 0; i < child->max_requests; i++ ) {
818
819                 n = -1;
820                 int gotdata = 0;    // boolean; set to true if we get data
821                 clr_fl(child->read_data_fd, O_NONBLOCK );
822
823                 while( (n=read(child->read_data_fd, buf, READ_BUFSIZE-1)) > 0 ) {
824                         buf[n] = '\0';
825                         osrfLogDebug(OSRF_LOG_MARK, "Prefork child read %d bytes of data", n);
826                         if(!gotdata) {
827                                 set_fl(child->read_data_fd, O_NONBLOCK );
828                                 gotdata = 1;
829                         }
830                         buffer_add( gbuf, buf );
831                 }
832
833                 if( errno == EAGAIN ) n = 0;
834
835                 if( errno == EPIPE ) {
836                         osrfLogDebug(OSRF_LOG_MARK, "C child attempted read on broken pipe, exiting...");
837                         break;
838                 }
839
840                 if( n < 0 ) {
841                         osrfLogWarning( OSRF_LOG_MARK,
842                                         "Prefork child read returned error with errno %d", errno );
843                         break;
844
845                 } else if( gotdata ) {
846                         osrfLogDebug(OSRF_LOG_MARK, "Prefork child got a request.. processing..");
847                         prefork_child_process_request(child, gbuf->buf);
848                         buffer_reset( gbuf );
849                 }
850
851                 if( i < child->max_requests - 1 )
852                         write( child->write_status_fd, "available" /*less than 64 bytes*/, 9 );
853         }
854
855         buffer_free(gbuf);
856
857         osrfLogDebug( OSRF_LOG_MARK, "Child with max-requests=%d, num-served=%d exiting...[%ld]",
858                         child->max_requests, i, (long) getpid() );
859
860         osrf_prefork_child_exit(child);
861 }
862
863 /**
864         @brief Add a prefork_child to the end of the active list.
865         @param forker Pointer to the prefork_simple that owns the list.
866         @param child Pointer to the prefork_child to be added.
867 */
868 static void add_prefork_child( prefork_simple* forker, prefork_child* child ) {
869
870         if( forker->first_child == NULL ) {
871                 // Simplest case: list is initially empty.
872                 forker->first_child = child;
873                 child->next = child;
874                 child->prev = child;
875         } else {
876                 // Find the last node in the circular list.
877                 prefork_child* last_child = forker->first_child->prev;
878
879                 // Insert the new child between the last and first children.
880                 last_child->next = child;
881                 child->prev      = last_child;
882                 child->next      = forker->first_child;
883                 forker->first_child->prev = child;
884         }
885 }
886
887 /**
888         @brief Remove a prefork_child, representing a terminated child, from the active list.
889         @param forker Pointer to the prefork_simple that owns the child.
890         @param pid Process ID of the child to be removed.
891
892         Remove the node from the active list, close its file descriptors, and put it in the
893         free list for potential reuse.
894 */
895 static void del_prefork_child( prefork_simple* forker, pid_t pid ) {
896
897         if( forker->first_child == NULL )
898                 return;  // Empty list; bail out.
899
900         osrfLogDebug( OSRF_LOG_MARK, "Deleting Child: %d", pid );
901
902         // Find the node in question
903         prefork_child* cur_child = forker->first_child; /* current pointer */
904         while( cur_child->pid != pid && cur_child->next != forker->first_child )
905                 cur_child = cur_child->next;
906
907         if( cur_child->pid == pid ) {
908                 // We found the right node.  Remove it from the list.
909                 if( cur_child->next == cur_child )
910                         forker->first_child = NULL;    // only child in the list
911                 else {
912                         if( forker->first_child == cur_child )
913                                 forker->first_child = cur_child->next;  // Reseat forker->first_child
914
915                         // Stitch the nodes on either side together
916                         cur_child->prev->next = cur_child->next;
917                         cur_child->next->prev = cur_child->prev;
918                 }
919                 --forker->current_num_children;
920
921                 //Destroy the node
922                 prefork_child_free( forker, cur_child );
923
924         } else {
925                 // Maybe it's in the idle list.  This can happen if, for example,
926                 // a child is killed by a signal while it's between requests.
927
928                 prefork_child* prev = NULL;
929                 cur_child = forker->idle_list;
930                 while( cur_child && cur_child->pid != pid ) {
931                         prev = cur_child;
932                         cur_child = cur_child->next;
933                 }
934
935                 if( cur_child ) {
936                         // Detach from the list
937                         if( prev )
938                                 prev->next = cur_child->next;
939                         else
940                                 forker->idle_list = cur_child->next;
941
942                         --forker->current_num_children;
943
944                         //Destroy the node
945                         prefork_child_free( forker, cur_child );
946                 } // else we can't find it, so do nothing.
947         }
948 }
949
950 /**
951         @brief Create and initialize a prefork_child.
952         @param forker Pointer to the prefork_simple that will own the prefork_child.
953         @param read_data_fd Used by child to read request from parent.
954         @param write_data_fd Used by parent to write request to child.
955         @param read_status_fd Used by parent to read status from child.
956         @param write_status_fd Used by child to write status to parent.
957         @return Pointer to the newly created prefork_child.
958
959         The calling code is responsible for freeing the prefork_child by calling
960         prefork_child_free().
961 */
962 static prefork_child* prefork_child_init( prefork_simple* forker,
963         int read_data_fd, int write_data_fd,
964         int read_status_fd, int write_status_fd ) {
965
966         // Allocate a prefork_child -- from the free list if possible, or from
967         // the heap if necessary.  The free list is a non-circular, singly-linked list.
968         prefork_child* child;
969         if( forker->free_list ) {
970                 child = forker->free_list;
971                 forker->free_list = child->next;
972         } else
973                 child = safe_malloc(sizeof(prefork_child));
974
975         child->pid              = 0;
976         child->read_data_fd     = read_data_fd;
977         child->write_data_fd    = write_data_fd;
978         child->read_status_fd   = read_status_fd;
979         child->write_status_fd  = write_status_fd;
980         child->max_requests     = forker->max_requests;
981         child->appname          = forker->appname;  // We don't make a separate copy
982         child->keepalive        = forker->keepalive;
983         child->next             = NULL;
984         child->prev             = NULL;
985
986         return child;
987 }
988
989 /**
990         @brief Terminate all child processes and clear out a prefork_simple.
991         @param prefork Pointer to the prefork_simple to be cleared out.
992
993         We do not deallocate the prefork_simple itself, just its contents.
994 */
995 static void prefork_clear( prefork_simple* prefork ) {
996
997         // Kill all the active children, and move their prefork_child nodes to the free list.
998         while( prefork->first_child ) {
999                 kill( prefork->first_child->pid, SIGKILL );
1000                 del_prefork_child( prefork, prefork->first_child->pid );
1001         }
1002
1003         // Kill all the idle prefork children, close their file
1004         // descriptors, and move them to the free list.
1005         prefork_child* child = prefork->idle_list;
1006         prefork->idle_list = NULL;
1007         while( child ) {
1008                 prefork_child* temp = child->next;
1009                 kill( child->pid, SIGKILL );
1010                 prefork_child_free( prefork, child );
1011                 child = temp;
1012         }
1013         prefork->current_num_children = 0;
1014
1015         // Physically free the free list of prefork_children.
1016         child = prefork->free_list;
1017         prefork->free_list = NULL;
1018         while( child ) {
1019                 prefork_child* temp = child->next;
1020                 free( child );
1021                 child = temp;
1022         }
1023
1024         // Close the Jabber connection
1025         client_free( prefork->connection );
1026         prefork->connection = NULL;
1027
1028         // After giving the child processes a second to terminate, wait on them so that they
1029         // don't become zombies.  We don't wait indefinitely, so it's possible that some
1030         // children will survive a bit longer.
1031         sleep( 1 );
1032         while( (waitpid(-1, NULL, WNOHANG)) > 0) {
1033                 ;   // Another one died...go around again
1034         }
1035
1036         free(prefork->appname);
1037         prefork->appname = NULL;
1038 }
1039
1040 /**
1041         @brief Destroy and deallocate a prefork_child.
1042         @param forker Pointer to the prefork_simple that owns the prefork_child.
1043         @param child Pointer to the prefork_child to be destroyed.
1044 */
1045 static void prefork_child_free( prefork_simple* forker, prefork_child* child ) {
1046         close( child->read_data_fd );
1047         close( child->write_data_fd );
1048         close( child->read_status_fd );
1049         close( child->write_status_fd );
1050
1051         // Stick the prefork_child in a free list for potential reuse.  This is a
1052         // non-circular, singly linked list.
1053         child->prev = NULL;
1054         child->next = forker->free_list;
1055         forker->free_list = child;
1056 }