]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/libopensrf/osrf_prefork.c
69ccc07806829313387e51bff1e27c4dc56a0df2
[OpenSRF.git] / src / libopensrf / osrf_prefork.c
1 /**
2         @file osrf_prefork.c
3         @brief Spawn and manage a collection of child process to service requests.
4
5         Spawn a collection of child processes, replacing them as needed.  Forward requests to them
6         and let the children do the work.
7
8         Each child processes some maximum number of requests before it terminates itself.  When a
9         child dies, either deliberately or otherwise, we can spawn another one to replace it,
10         keeping the number of children within a predefined range.
11
12         Use a doubly-linked circular list to keep track of the children to whom we have forwarded
13         a request, and who are still working on them.  Use a separate linear linked list to keep
14         track of children that are currently idle.  Move them back and forth as needed.
15
16         For each child, set up two pipes:
17         - One for the parent to send requests to the child.
18         - One for the child to notify the parent that it is available for another request.
19
20         The message sent to the child represents an XML stanza as received from Jabber.
21
22         When the child finishes processing the request, it writes the string "available" back
23         to the parent.  Then the parent knows that it can send that child another request.
24 */
25
26 #include <errno.h>
27 #include <signal.h>
28 #include <sys/types.h>
29 #include <sys/time.h>
30 #include <unistd.h>
31 #include <stdlib.h>
32 #include <stdio.h>
33 #include <string.h>
34 #include <sys/select.h>
35 #include <sys/wait.h>
36
37 #include "opensrf/utils.h"
38 #include "opensrf/log.h"
39 #include "opensrf/transport_client.h"
40 #include "opensrf/osrf_stack.h"
41 #include "opensrf/osrf_settings.h"
42 #include "opensrf/osrf_application.h"
43
44 #define READ_BUFSIZE 1024
45 #define ABS_MAX_CHILDREN 256
46
47 typedef struct {
48         int max_requests;     /**< How many requests a child processes before terminating. */
49         int min_children;     /**< Minimum number of children to maintain. */
50         int max_children;     /**< Maximum number of children to maintain. */
51         int fd;               /**< Unused. */
52         int data_to_child;    /**< Unused. */
53         int data_to_parent;   /**< Unused. */
54         int current_num_children;   /**< How many children are currently on the list. */
55         int keepalive;        /**< Keepalive time for stateful sessions. */
56         char* appname;        /**< Name of the application. */
57         /** Points to a circular linked list of children. */
58         struct prefork_child_struct* first_child;
59         /** List of of child processes that aren't doing anything at the moment and are
60                 therefore available to service a new request. */
61         struct prefork_child_struct* idle_list;
62         /** List of allocated but unused prefork_children, available for reuse.  Each one is just
63                 raw memory, apart from the "next" pointer used to stitch them together.  In particular,
64                 there is no child process for them, and the file descriptors are not open. */
65         struct prefork_child_struct* free_list;
66         transport_client* connection;  /**< Connection to Jabber. */
67 } prefork_simple;
68
69 struct prefork_child_struct {
70         pid_t pid;            /**< Process ID of the child. */
71         int read_data_fd;     /**< Child uses to read request. */
72         int write_data_fd;    /**< Parent uses to write request. */
73         int read_status_fd;   /**< Parent reads to see if child is available. */
74         int write_status_fd;  /**< Child uses to notify parent when it's available again. */
75         int max_requests;     /**< How many requests a child can process before terminating. */
76         const char* appname;  /**< Name of the application. */
77         int keepalive;        /**< Keepalive time for stateful sessions. */
78         struct prefork_child_struct* next;  /**< Linkage pointer for linked list. */
79         struct prefork_child_struct* prev;  /**< Linkage pointer for linked list. */
80 };
81
82 typedef struct prefork_child_struct prefork_child;
83
84 /** Boolean.  Set to true by a signal handler when it traps SIGCHLD. */
85 static volatile sig_atomic_t child_dead;
86
87 static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
88         int max_requests, int min_children, int max_children );
89 static prefork_child* launch_child( prefork_simple* forker );
90 static void prefork_launch_children( prefork_simple* forker );
91 static void prefork_run(prefork_simple* forker);
92 static void add_prefork_child( prefork_simple* forker, prefork_child* child );
93
94 static void del_prefork_child( prefork_simple* forker, pid_t pid );
95 static void check_children( prefork_simple* forker, int forever );
96 static int  prefork_child_process_request(prefork_child*, char* data);
97 static int prefork_child_init_hook(prefork_child*);
98 static prefork_child* prefork_child_init( prefork_simple* forker,
99                 int read_data_fd, int write_data_fd,
100                 int read_status_fd, int write_status_fd );
101
102 /* listens on the 'data_to_child' fd and wait for incoming data */
103 static void prefork_child_wait( prefork_child* child );
104 static void prefork_clear( prefork_simple* );
105 static void prefork_child_free( prefork_simple* forker, prefork_child* );
106 static void osrf_prefork_register_routers( const char* appname );
107 static void osrf_prefork_child_exit( prefork_child* );
108
109 static void sigchld_handler( int sig );
110
111 /**
112         @brief Spawn and manage a collection of drone processes for servicing requests.
113         @param appname Name of the application.
114         @return 0 if successful, or -1 if error.
115 */
116 int osrf_prefork_run( const char* appname ) {
117
118         if(!appname) {
119                 osrfLogError( OSRF_LOG_MARK, "osrf_prefork_run requires an appname to run!");
120                 return -1;
121         }
122
123         set_proc_title( "OpenSRF Listener [%s]", appname );
124
125         int maxr = 1000;
126         int maxc = 10;
127         int minc = 3;
128         int kalive = 5;
129
130         osrfLogInfo( OSRF_LOG_MARK, "Loading config in osrf_forker for app %s", appname);
131
132         char* max_req      = osrf_settings_host_value("/apps/%s/unix_config/max_requests", appname);
133         char* min_children = osrf_settings_host_value("/apps/%s/unix_config/min_children", appname);
134         char* max_children = osrf_settings_host_value("/apps/%s/unix_config/max_children", appname);
135         char* keepalive    = osrf_settings_host_value("/apps/%s/keepalive", appname);
136
137         if(!keepalive) osrfLogWarning( OSRF_LOG_MARK, "Keepalive is not defined, assuming %d", kalive);
138         else kalive = atoi(keepalive);
139
140         if(!max_req) osrfLogWarning( OSRF_LOG_MARK, "Max requests not defined, assuming %d", maxr);
141         else maxr = atoi(max_req);
142
143         if(!min_children) osrfLogWarning( OSRF_LOG_MARK,
144                         "Min children not defined, assuming %d", minc);
145         else minc = atoi(min_children);
146
147         if(!max_children) osrfLogWarning( OSRF_LOG_MARK,
148                         "Max children not defined, assuming %d", maxc);
149         else maxc = atoi(max_children);
150
151         free(keepalive);
152         free(max_req);
153         free(min_children);
154         free(max_children);
155         /* --------------------------------------------------- */
156
157         char* resc = va_list_to_string("%s_listener", appname);
158
159         // Make sure that we haven't already booted
160         if(!osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
161                 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()");
162                 free(resc);
163                 return -1;
164         }
165
166         free(resc);
167
168         prefork_simple forker;
169
170         if( prefork_simple_init( &forker, osrfSystemGetTransportClient(), maxr, minc, maxc ) ) {
171                 osrfLogError( OSRF_LOG_MARK,
172                                 "osrf_prefork_run() failed to create prefork_simple object" );
173                 return -1;
174         }
175
176         // Finish initializing the prefork_simple.
177         forker.appname   = strdup(appname);
178         forker.keepalive = kalive;
179
180         // Spawn the children; put them in the idle list.
181         prefork_launch_children( &forker );
182
183         // Tell the router that you're open for business.
184         osrf_prefork_register_routers(appname);
185
186         // Sit back and let the requests roll in
187         osrfLogInfo( OSRF_LOG_MARK, "Launching osrf_forker for app %s", appname);
188         prefork_run( &forker );
189
190         osrfLogWarning( OSRF_LOG_MARK, "prefork_run() returned - how??");
191         prefork_clear( &forker );
192         return 0;
193 }
194
195 /**
196         @brief Register the application with a specified router.
197         @param appname Name of the application.
198         @param routerName Name of the router.
199         @param routerDomain Domain of the router.
200
201         Tell the router that you're open for business so that it can route requests to you.
202
203         Called only by the parent process.
204 */
205 static void osrf_prefork_send_router_registration(
206                 const char* appname, const char* routerName, const char* routerDomain ) {
207         // Get a pointer to the global transport_client
208         transport_client* client = osrfSystemGetTransportClient();
209
210         // Construct the Jabber address of the router
211         char* jid = va_list_to_string( "%s@%s/router", routerName, routerDomain );
212         osrfLogInfo( OSRF_LOG_MARK, "%s registering with router %s", appname, jid );
213
214         // Create the registration message, and send it
215         transport_message* msg = message_init( "registering", NULL, NULL, jid, NULL );
216         message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
217         client_send_message( client, msg );
218
219         // Clean up
220         message_free( msg );
221         free(jid);
222 }
223
224 /* parses a single "complex" router configuration chunk */
225 // Called only by the parent process
226 static void osrf_prefork_parse_router_chunk(const char* appname, const jsonObject* routerChunk) {
227
228         const char* routerName = jsonObjectGetString(jsonObjectGetKeyConst(routerChunk, "name"));
229         const char* domain = jsonObjectGetString(jsonObjectGetKeyConst(routerChunk, "domain"));
230         const jsonObject* services = jsonObjectGetKeyConst(routerChunk, "services");
231         osrfLogDebug(OSRF_LOG_MARK, "found router config with domain %s and name %s",
232                         routerName, domain);
233
234         if( services && services->type == JSON_HASH ) {
235                 osrfLogDebug(OSRF_LOG_MARK, "investigating router information...");
236                 const jsonObject* service_obj = jsonObjectGetKeyConst(services, "service");
237                 if( !service_obj )
238                         ;    // do nothing (shouldn't happen)
239                 else if( JSON_ARRAY == service_obj->type ) {
240                         int j;
241                         for(j = 0; j < service_obj->size; j++ ) {
242                                 const char* service = jsonObjectGetString(jsonObjectGetIndex(service_obj, j));
243                                 if( service && !strcmp( appname, service ))
244                                         osrf_prefork_send_router_registration(appname, routerName, domain);
245                         }
246                 }
247                 else if( JSON_STRING == service_obj->type ) {
248                         if( !strcmp(appname, jsonObjectGetString( service_obj )) )
249                                 osrf_prefork_send_router_registration(appname, routerName, domain);
250                 }
251         } else {
252                 osrf_prefork_send_router_registration(appname, routerName, domain);
253         }
254 }
255
256 /**
257         @brief Register the application with one or more routers, according to the configuration.
258         @param appname Name of the application.
259
260         Called only by the parent process.
261 */
262 static void osrf_prefork_register_routers( const char* appname ) {
263
264         jsonObject* routerInfo = osrfConfigGetValueObject(NULL, "/routers/router");
265
266         int i;
267         for(i = 0; i < routerInfo->size; i++) {
268                 const jsonObject* routerChunk = jsonObjectGetIndex(routerInfo, i);
269
270                 if(routerChunk->type == JSON_STRING) {
271                         /* this accomodates simple router configs */
272                         char* routerName = osrfConfigGetValue( NULL, "/router_name" );
273                         char* domain = osrfConfigGetValue(NULL, "/routers/router");
274                         osrfLogDebug(OSRF_LOG_MARK, "found simple router settings with router name %s",
275                                         routerName);
276                         osrf_prefork_send_router_registration(appname, routerName, domain);
277
278                         free( routerName );
279                         free( domain );
280                 } else {
281                         osrf_prefork_parse_router_chunk(appname, routerChunk);
282                 }
283         }
284
285         jsonObjectFree( routerInfo );
286 }
287
288 /**
289         @brief Initialize a child process.
290         @param child Pointer to the prefork_child representing the new child process.
291         @return Zero if successful, or -1 if not.
292
293         Called only by child processes.  Actions:
294         - Connect to one or more cache servers
295         - Reconfigure logger, if necessary
296         - Discard parent's Jabber connection and open a new one
297         - Dynamically call an application-specific initialization routine
298         - Change the command line as reported by ps
299 */
300 static int prefork_child_init_hook(prefork_child* child) {
301
302         if(!child) return -1;
303         osrfLogDebug( OSRF_LOG_MARK, "Child init hook for child %d", child->pid);
304
305         // Connect to cache server(s).
306         osrfSystemInitCache();
307         char* resc = va_list_to_string("%s_drone", child->appname);
308
309         // If we're a source-client, tell the logger now that we're a new process.
310         char* isclient = osrfConfigGetValue(NULL, "/client");
311         if( isclient && !strcasecmp(isclient,"true") )
312                 osrfLogSetIsClient(1);
313         free(isclient);
314
315         // Remove traces of our parent's socket connection so we can have our own.
316         osrfSystemIgnoreTransportClient();
317
318         // Connect to Jabber
319         if(!osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
320                 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()");
321                 free(resc);
322                 return -1;
323         }
324
325         free(resc);
326
327         // Dynamically call the application-specific initialization function
328         // from a previously loaded shared library.
329         if( ! osrfAppRunChildInit(child->appname) ) {
330                 osrfLogDebug(OSRF_LOG_MARK, "Prefork child_init succeeded\n");
331         } else {
332                 osrfLogError(OSRF_LOG_MARK, "Prefork child_init failed\n");
333                 return -1;
334         }
335
336         // Change the command line as reported by ps
337         set_proc_title( "OpenSRF Drone [%s]", child->appname );
338         return 0;
339 }
340
341 // Called only by a child process
342 // Non-zero return code means that the child process has decided to terminate immediately,
343 // without waiting for a DISCONNECT or max_requests.
344 static int prefork_child_process_request(prefork_child* child, char* data) {
345         if( !child ) return 0;
346
347         transport_client* client = osrfSystemGetTransportClient();
348
349         if(!client_connected(client)) {
350                 osrfSystemIgnoreTransportClient();
351                 osrfLogWarning(OSRF_LOG_MARK, "Reconnecting child to opensrf after disconnect...");
352                 if(!osrf_system_bootstrap_client(NULL, NULL)) {
353                         osrfLogError( OSRF_LOG_MARK,
354                                 "Unable to bootstrap client in prefork_child_process_request()");
355                         sleep(1);
356                         osrf_prefork_child_exit(child);
357                 }
358         }
359
360         /* construct the message from the xml */
361         transport_message* msg = new_message_from_xml( data );
362
363         osrfAppSession* session = osrf_stack_transport_handler(msg, child->appname);
364         if(!session) return 0;
365
366         int rc = session->panic;
367
368         if( rc ) {
369                 osrfLogWarning( OSRF_LOG_MARK,
370                         "Drone for session %s terminating immediately", session->session_id );
371                 osrfAppSessionFree( session );
372                 return rc;
373         }
374
375         if( session->stateless && session->state != OSRF_SESSION_CONNECTED ) {
376                 osrfAppSessionFree( session );
377                 return rc;
378         }
379
380         osrfLogDebug( OSRF_LOG_MARK, "Entering keepalive loop for session %s", session->session_id );
381         int keepalive = child->keepalive;
382         int retval;
383         int recvd;
384         time_t start;
385         time_t end;
386
387         while(1) {
388
389                 osrfLogDebug(OSRF_LOG_MARK,
390                                 "osrf_prefork calling queue_wait [%d] in keepalive loop", keepalive);
391                 start   = time(NULL);
392                 retval  = osrf_app_session_queue_wait(session, keepalive, &recvd);
393                 end     = time(NULL);
394
395                 osrfLogDebug(OSRF_LOG_MARK, "Data received == %d", recvd);
396
397                 if( session->panic )
398                         rc = 1;
399
400                 if(retval) {
401                         osrfLogError(OSRF_LOG_MARK, "queue-wait returned non-success %d", retval);
402                         break;
403                 }
404
405                 /* see if the client disconnected from us */
406                 if(session->state != OSRF_SESSION_CONNECTED)
407                         break;
408
409                 /* if no data was reveived within the timeout interval */
410                 if( !recvd && (end - start) >= keepalive ) {
411                         osrfLogInfo(OSRF_LOG_MARK,
412                                         "No request was received in %d seconds, exiting stateful session", keepalive);
413                         osrfAppSessionStatus(
414                                         session,
415                                         OSRF_STATUS_TIMEOUT,
416                                         "osrfConnectStatus",
417                                         0, "Disconnected on timeout" );
418
419                         break;
420                 }
421
422                 // If the child process has decided to terminate immediately
423                 if( rc )
424                         break;
425         }
426
427         osrfLogDebug( OSRF_LOG_MARK, "Exiting keepalive loop for session %s", session->session_id );
428         osrfAppSessionFree( session );
429         return rc;
430 }
431
432 /**
433         @brief Partially initialize a prefork_simple provided by the caller.
434         @param prefork Pointer to a a raw prefork_simple to be initialized.
435         @param client Pointer to a transport_client (connection to Jabber).
436         @param max_requests The maximum number of requests that a child process may service
437                         before terminating.
438         @param min_children Minimum number of child processes to maintain.
439         @param max_children Maximum number of child processes to maintain.
440         @return 0 if successful, or 1 if not (due to invalid parameters).
441 */
442 static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
443                 int max_requests, int min_children, int max_children ) {
444
445         if( min_children > max_children ) {
446                 osrfLogError( OSRF_LOG_MARK,  "min_children (%d) is greater "
447                                 "than max_children (%d)", min_children, max_children );
448                 return 1;
449         }
450
451         if( max_children > ABS_MAX_CHILDREN ) {
452                 osrfLogError( OSRF_LOG_MARK,  "max_children (%d) is greater than ABS_MAX_CHILDREN (%d)",
453                                 max_children, ABS_MAX_CHILDREN );
454                 return 1;
455         }
456
457         osrfLogInfo(OSRF_LOG_MARK, "Prefork launching child with max_request=%d,"
458                 "min_children=%d, max_children=%d", max_requests, min_children, max_children );
459
460         /* flesh out the struct */
461         prefork->max_requests = max_requests;
462         prefork->min_children = min_children;
463         prefork->max_children = max_children;
464         prefork->fd           = 0;
465         prefork->data_to_child = 0;
466         prefork->data_to_parent = 0;
467         prefork->current_num_children = 0;
468         prefork->keepalive    = 0;
469         prefork->appname      = NULL;
470         prefork->first_child  = NULL;
471         prefork->idle_list    = NULL;
472         prefork->free_list    = NULL;
473         prefork->connection   = client;
474
475         return 0;
476 }
477
478 /**
479         @brief Spawn a new child process and put it in the idle list.
480         @param forker Pointer to the prefork_simple that will own the process.
481         @return Pointer to the new prefork_child, or not at all.
482
483         Spawn a new child process.  Create a prefork_child for it and put it in the idle list.
484
485         After forking, the parent returns a pointer to the new prefork_child.  The child
486         services its quota of requests and then terminates without returning.
487 */
488 static prefork_child* launch_child( prefork_simple* forker ) {
489
490         pid_t pid;
491         int data_fd[2];
492         int status_fd[2];
493
494         // Set up the data and status pipes
495         if( pipe(data_fd) < 0 ) { /* build the data pipe*/
496                 osrfLogError( OSRF_LOG_MARK,  "Pipe making error" );
497                 return NULL;
498         }
499
500         if( pipe(status_fd) < 0 ) {/* build the status pipe */
501                 osrfLogError( OSRF_LOG_MARK,  "Pipe making error" );
502                 close( data_fd[1] );
503                 close( data_fd[0] );
504                 return NULL;
505         }
506
507         osrfLogInternal( OSRF_LOG_MARK, "Pipes: %d %d %d %d",
508                         data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
509
510         // Create and initialize a prefork_child for the new process
511         prefork_child* child = prefork_child_init( forker, data_fd[0],
512                         data_fd[1], status_fd[0], status_fd[1] );
513
514         if( (pid=fork()) < 0 ) {
515                 osrfLogError( OSRF_LOG_MARK, "Forking Error" );
516                 prefork_child_free( forker, child );
517                 return NULL;
518         }
519
520         // Add the new child to the head of the idle list
521         child->next = forker->idle_list;
522         forker->idle_list = child;
523
524         if( pid > 0 ) {  /* parent */
525
526                 signal(SIGCHLD, sigchld_handler);
527                 (forker->current_num_children)++;
528                 child->pid = pid;
529
530                 osrfLogDebug( OSRF_LOG_MARK, "Parent launched %d", pid );
531                 /* *no* child pipe FD's can be closed or the parent will re-use fd's that
532                         the children are currently using */
533                 return child;
534         }
535
536         else { /* child */
537
538                 osrfLogInternal( OSRF_LOG_MARK,
539                                 "I am new child with read_data_fd = %d and write_status_fd = %d",
540                                 child->read_data_fd, child->write_status_fd );
541
542                 child->pid = getpid();
543                 close( child->write_data_fd );
544                 close( child->read_status_fd );
545
546                 /* do the initing */
547                 if( prefork_child_init_hook(child) == -1 ) {
548                         osrfLogError(OSRF_LOG_MARK,
549                                 "Forker child going away because we could not connect to OpenSRF...");
550                         osrf_prefork_child_exit(child);
551                 }
552
553                 prefork_child_wait( child );      // Should exit without returning
554                 osrf_prefork_child_exit( child ); // Just to be sure
555                 return NULL;  // Unreachable, but it keeps the compiler happy
556         }
557 }
558
559 /**
560         @brief Terminate a child process.
561         @param child Pointer to the prefork_child representing the child process.
562
563         Called only by child processes.  Dynamically call an application-specific shutdown
564         function from a previously loaded shared library; then exit.
565 */
566 static void osrf_prefork_child_exit(prefork_child* child) {
567         osrfAppRunExitCode();
568         exit(0);
569 }
570
571 /**
572         @brief Launch all the child processes, putting them in the idle list.
573         @param forker Pointer to the prefork_simple that will own the children.
574
575         Called only by the parent process (in order to become a parent).
576 */
577 static void prefork_launch_children( prefork_simple* forker ) {
578         if(!forker) return;
579         int c = 0;
580         while( c++ < forker->min_children )
581                 launch_child( forker );
582 }
583
584 /**
585         @brief Signal handler for SIGCHLD: note that a child process has terminated.
586         @param sig The value of the trapped signal; always SIGCHLD.
587
588         Set a boolean to be checked later.
589 */
590 static void sigchld_handler( int sig ) {
591         signal(SIGCHLD, sigchld_handler);
592         child_dead = 1;
593 }
594
595 /**
596         @brief Replenish the collection of child processes, after one has terminated.
597         @param forker Pointer to the prefork_simple that manages the child processes.
598
599         The parent calls this function when it notices (via a signal handler) that
600         a child process has died.
601
602         Wait on the dead children so that they won't be zombies.  Spawn new ones as needed
603         to maintain at least a minimum number.
604 */
605 void reap_children( prefork_simple* forker ) {
606
607         pid_t child_pid;
608
609         // Reset our boolean so that we can detect any further terminations.
610         child_dead = 0;
611
612         // Bury the children so that they won't be zombies.  WNOHANG means that waitpid() returns
613         // immediately if there are no waitable children, instead of waiting for more to die.
614         // Ignore the return code of the child.  We don't do an autopsy.
615         while( (child_pid = waitpid(-1, NULL, WNOHANG)) > 0) {
616                 --forker->current_num_children;
617                 del_prefork_child( forker, child_pid );
618         }
619
620         // Spawn more children as needed.
621         while( forker->current_num_children < forker->min_children )
622                 launch_child( forker );
623 }
624
625 /**
626         @brief Read transport_messages and dispatch them to child processes for servicing.
627         @param forker Pointer to the prefork_simple that manages the child processes.
628
629         This is the main loop of the parent process, and once entered, does not exit.
630
631         For each usable transport_message received: look for an idle child to service it.  If
632         no idle children are available, either spawn a new one or, if we've already spawned the
633         maximum number of children, wait for one to become available.  Once a child is available
634         by whatever means, write an XML version of the input message, to a pipe designated for
635         use by that child.
636 */
637 static void prefork_run( prefork_simple* forker ) {
638
639         if( NULL == forker->idle_list )
640                 return;   // No available children, and we haven't even started yet
641
642         transport_message* cur_msg = NULL;
643
644         while(1) {
645
646                 if( forker->first_child == NULL && forker->idle_list == NULL ) {/* no more children */
647                         osrfLogWarning( OSRF_LOG_MARK, "No more children..." );
648                         return;
649                 }
650
651                 // Wait indefinitely for an input message
652                 osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data...");
653                 cur_msg = client_recv( forker->connection, -1 );
654
655                 if( cur_msg == NULL )
656                         continue;           // Error?  Interrupted by a signal?  Try again...
657
658                 message_prepare_xml( cur_msg );
659                 const char* msg_data = cur_msg->msg_xml;
660                 if( ! msg_data || ! *msg_data ) {
661                         osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %",
662                                         (msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread );
663                         message_free( cur_msg );
664                         continue;       // Message not usable; go on to the next one.
665                 }
666
667                 int honored = 0;     /* will be set to true when we service the request */
668                 int no_recheck = 0;
669
670                 while( ! honored ) {
671
672                         if(!no_recheck)
673                                 check_children( forker, 0 );
674                         no_recheck = 0;
675
676                         osrfLogDebug( OSRF_LOG_MARK, "Server received inbound data" );
677
678                         prefork_child* cur_child = NULL;
679
680                         // Look for an available child in the idle list.  Since the idle list operates
681                         // as a stack, the child we get is the one that was most recently active, or
682                         // most recently spawned.  That means it's the one most likely still to be in
683                         // physical memory, and the one least likely to have to be swapped in.
684                         while( forker->idle_list ) {
685
686                                 osrfLogInfo( OSRF_LOG_MARK, "Looking for idle child" );
687                                 // Grab the prefork_child at the head of the idle list
688                                 cur_child = forker->idle_list;
689                                 forker->idle_list = cur_child->next;
690                                 cur_child->next = NULL;
691
692                                 osrfLogInternal( OSRF_LOG_MARK,
693                                                 "Searching for available child. cur_child->pid = %d", cur_child->pid );
694                                 osrfLogInternal( OSRF_LOG_MARK, "Current num children %d",
695                                                 forker->current_num_children );
696
697                                 osrfLogDebug( OSRF_LOG_MARK, "forker sending data to %d", cur_child->pid );
698                                 osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d",
699                                                 cur_child->write_data_fd );
700
701                                 int written = write(cur_child->write_data_fd, msg_data, strlen(msg_data) + 1);
702                                 if( written < 0 ) {
703                                         // This child appears to be dead or unusable.  Discard it.
704                                         osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d: %s",
705                                                         errno, strerror( errno ) );
706                                         kill( cur_child->pid, SIGKILL );
707                                         del_prefork_child( forker, cur_child->pid );
708                                         continue;
709                                 }
710
711                                 add_prefork_child( forker, cur_child );  // Add it to active list
712                                 honored = 1;
713                                 break;
714                         }
715
716                         /* if none available, add a new child if we can */
717                         if( ! honored ) {
718                                 osrfLogDebug( OSRF_LOG_MARK, "Not enough children, attempting to add...");
719
720                                 if( forker->current_num_children < forker->max_children ) {
721                                         osrfLogDebug( OSRF_LOG_MARK,  "Launching new child with current_num = %d",
722                                                         forker->current_num_children );
723
724                                         launch_child( forker );  // Put a new child into the idle list
725                                         if( forker->idle_list ) {
726
727                                                 // Take the new child from the idle list
728                                                 prefork_child* new_child = forker->idle_list;
729                                                 forker->idle_list = new_child->next;
730                                                 new_child->next = NULL;
731
732                                                 osrfLogDebug( OSRF_LOG_MARK, "Writing to new child fd %d : pid %d",
733                                                                 new_child->write_data_fd, new_child->pid );
734
735                                                 int written = write(
736                                                                 new_child->write_data_fd, msg_data, strlen(msg_data) + 1);
737                                                 if( written < 0 ) {
738                                                         // This child appears to be dead or unusable.  Discard it.
739                                                         osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d: %s",
740                                                                                         errno, strerror( errno ) );
741                                                         kill( cur_child->pid, SIGKILL );
742                                                         del_prefork_child( forker, cur_child->pid );
743                                                 } else {
744                                                         add_prefork_child( forker, new_child );
745                                                         honored = 1;
746                                                 }
747                                         }
748
749                                 }
750                         }
751
752                         if( !honored ) {
753                                 osrfLogWarning( OSRF_LOG_MARK, "No children available, waiting...");
754                                 check_children( forker, 1 );
755                                 // Tell the loop not to call check_children again, since we're calling it now
756                                 no_recheck = 1;
757                         }
758
759                         if( child_dead )
760                                 reap_children(forker);
761
762                 } // end while( ! honored )
763
764                 message_free( cur_msg );
765
766         } /* end top level listen loop */
767 }
768
769
770 /* XXX Add a flag which tells select() to wait forever on children
771         in the best case, this will be faster than calling usleep(x), and
772         in the worst case it won't be slower and will do less logging...
773 */
774 /**
775         @brief See if any children have become available.
776         @param forker Pointer to the prefork_simple that owns the children.
777         @param forever Boolean: true if we should wait indefinitely.
778         @return 
779
780         Call select() for all the children in the active list.  Read each active file
781         descriptor and move the corresponding child to the idle list.
782
783         If @a forever is true, wait indefinitely for input.  Otherwise return immediately if
784         there are no active file descriptors.
785 */
786 static void check_children( prefork_simple* forker, int forever ) {
787
788         if( child_dead )
789                 reap_children( forker );
790
791         if( NULL == forker->first_child ) {
792                 // If forever is true, then we're here because we've run out of idle
793                 // processes, so there should be some active ones around.
794                 // If forever is false, then the children may all be idle, and that's okay.
795                 if( forever )
796                         osrfLogError( OSRF_LOG_MARK, "No active child processes to check" );
797                 return;
798         }
799
800         int select_ret;
801         fd_set read_set;
802         FD_ZERO(&read_set);
803         int max_fd = 0;
804         int n;
805
806         // Prepare to select() on pipes from all the active children
807         prefork_child* cur_child = forker->first_child;
808         do {
809                 if( cur_child->read_status_fd > max_fd )
810                         max_fd = cur_child->read_status_fd;
811                 FD_SET( cur_child->read_status_fd, &read_set );
812                 cur_child = cur_child->next;
813         } while( cur_child != forker->first_child );
814
815         FD_CLR(0,&read_set); /* just to be sure */
816
817         if( forever ) {
818                 osrfLogWarning(OSRF_LOG_MARK,
819                                 "We have no children available - waiting for one to show up...");
820
821                 if( (select_ret=select( max_fd + 1, &read_set, NULL, NULL, NULL)) == -1 ) {
822                         osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children: %s",
823                                         errno, strerror( errno ) );
824                 }
825                 osrfLogInfo(OSRF_LOG_MARK,
826                                 "select() completed after waiting on children to become available");
827
828         } else {
829
830                 struct timeval tv;
831                 tv.tv_sec   = 0;
832                 tv.tv_usec  = 0;
833
834                 if( (select_ret=select( max_fd + 1, &read_set, NULL, NULL, &tv)) == -1 ) {
835                         osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children: %s",
836                                         errno, strerror( errno ) );
837                 }
838         }
839
840         if( select_ret == 0 )
841                 return;
842
843         // Check each child in the active list.
844         // If it has responded, move it to the idle list.
845         cur_child = forker->first_child;
846         prefork_child* next_child = NULL;
847         int num_handled = 0;
848         do {
849                 next_child = cur_child->next;
850                 if( FD_ISSET( cur_child->read_status_fd, &read_set ) ) {
851                         osrfLogDebug( OSRF_LOG_MARK,
852                                         "Server received status from a child %d", cur_child->pid );
853
854                         num_handled++;
855
856                         /* now suck off the data */
857                         char buf[64];
858                         if( (n=read(cur_child->read_status_fd, buf, sizeof(buf) - 1)) < 0 ) {
859                                 osrfLogWarning( OSRF_LOG_MARK,
860                                                 "Read error after select in child status read with errno %d: %s",
861                                                 errno, strerror( errno ) );
862                         }
863                         else {
864                                 buf[n] = '\0';
865                                 osrfLogDebug( OSRF_LOG_MARK,  "Read %d bytes from status buffer: %s", n, buf );
866                         }
867
868                         // Remove the child from the active list
869                         if( forker->first_child == cur_child ) {
870                                 if( cur_child->next == cur_child )
871                                         forker->first_child = NULL;   // only child in the active list
872                                 else
873                                         forker->first_child = cur_child->next;
874                         }
875                         cur_child->next->prev = cur_child->prev;
876                         cur_child->prev->next = cur_child->next;
877
878                         // Add it to the idle list
879                         cur_child->prev = NULL;
880                         cur_child->next = forker->idle_list;
881                         forker->idle_list = cur_child;
882                 }
883                 cur_child = next_child;
884         } while( forker->first_child && forker->first_child != next_child );
885 }
886
887 /**
888         @brief Service up a set maximum number of requests; then shut down.
889         @param child Pointer to the prefork_child representing the child process.
890
891         Called only by child process.
892
893         Enter a loop, for up to max_requests iterations.  On each iteration:
894         - Wait indefinitely for a request from the parent.
895         - Service the request.
896         - Increment a counter.  If the limit hasn't been reached, notify the parent that you
897         are available for another request.
898
899         After exiting the loop, shut down and terminate the process.
900 */
901 static void prefork_child_wait( prefork_child* child ) {
902
903         int i,n;
904         growing_buffer* gbuf = buffer_init( READ_BUFSIZE );
905         char buf[READ_BUFSIZE];
906
907         for( i = 0; i < child->max_requests; i++ ) {
908
909                 n = -1;
910                 int gotdata = 0;    // boolean; set to true if we get data
911                 clr_fl(child->read_data_fd, O_NONBLOCK );
912
913                 while( (n=read(child->read_data_fd, buf, READ_BUFSIZE-1)) > 0 ) {
914                         buf[n] = '\0';
915                         osrfLogDebug(OSRF_LOG_MARK, "Prefork child read %d bytes of data", n);
916                         if(!gotdata) {
917                                 set_fl(child->read_data_fd, O_NONBLOCK );
918                                 gotdata = 1;
919                         }
920                         buffer_add( gbuf, buf );
921                 }
922
923                 if( errno == EAGAIN ) n = 0;
924
925                 if( errno == EPIPE ) {
926                         osrfLogDebug(OSRF_LOG_MARK, "C child attempted read on broken pipe, exiting...");
927                         break;
928                 }
929
930                 int terminate_now = 0;     // Boolean
931
932                 if( n < 0 ) {
933                         osrfLogWarning( OSRF_LOG_MARK,
934                                         "Prefork child read returned error with errno %d", errno );
935                         break;
936
937                 } else if( gotdata ) {
938                         osrfLogDebug( OSRF_LOG_MARK, "Prefork child got a request.. processing.." );
939                         terminate_now = prefork_child_process_request( child, gbuf->buf );
940                         buffer_reset( gbuf );
941                 }
942
943                 if( terminate_now ) {
944                         osrfLogWarning( OSRF_LOG_MARK, "Prefork child terminating abruptly" );
945                         break;
946                 }
947
948                 if( i < child->max_requests - 1 ) {
949                         size_t msg_len = 9;
950                         ssize_t len = write(
951                                 child->write_status_fd, "available" /*less than 64 bytes*/, msg_len );
952                         if( len != msg_len ) {
953                                 osrfLogError( OSRF_LOG_MARK, 
954                                         "Drone terminating: unable to notify listener of availability: %s",
955                                         strerror( errno ));
956                                 buffer_free(gbuf);
957                                 osrf_prefork_child_exit(child);
958                         }
959                 }
960         }
961
962         buffer_free(gbuf);
963
964         osrfLogDebug( OSRF_LOG_MARK, "Child with max-requests=%d, num-served=%d exiting...[%ld]",
965                         child->max_requests, i, (long) getpid() );
966
967         osrf_prefork_child_exit(child);
968 }
969
970 /**
971         @brief Add a prefork_child to the end of the active list.
972         @param forker Pointer to the prefork_simple that owns the list.
973         @param child Pointer to the prefork_child to be added.
974 */
975 static void add_prefork_child( prefork_simple* forker, prefork_child* child ) {
976
977         if( forker->first_child == NULL ) {
978                 // Simplest case: list is initially empty.
979                 forker->first_child = child;
980                 child->next = child;
981                 child->prev = child;
982         } else {
983                 // Find the last node in the circular list.
984                 prefork_child* last_child = forker->first_child->prev;
985
986                 // Insert the new child between the last and first children.
987                 last_child->next = child;
988                 child->prev      = last_child;
989                 child->next      = forker->first_child;
990                 forker->first_child->prev = child;
991         }
992 }
993
994 static void del_prefork_child( prefork_simple* forker, pid_t pid ) {
995
996         osrfLogDebug( OSRF_LOG_MARK, "Deleting Child: %d", pid );
997
998         prefork_child* cur_child = NULL;
999         
1000         // Look first in the active list
1001         if( forker->first_child ) {
1002                 cur_child = forker->first_child; /* current pointer */
1003                 while( cur_child->pid != pid && cur_child->next != forker->first_child )
1004                         cur_child = cur_child->next;
1005
1006                 if( cur_child->pid == pid ) {
1007                         // We found the right node.  Remove it from the list.
1008                         if( cur_child->next == cur_child )
1009                                 forker->first_child = NULL;    // only child in the list
1010                         else {
1011                                 if( forker->first_child == cur_child )
1012                                         forker->first_child = cur_child->next;  // Reseat forker->first_child
1013
1014                                 // Stitch the adjacent nodes together
1015                                 cur_child->prev->next = cur_child->next;
1016                                 cur_child->next->prev = cur_child->prev;
1017                         }
1018                 } else
1019                         cur_child = NULL;  // Didn't find it in the active list
1020         }
1021
1022         if( ! cur_child ) {
1023                 // Maybe it's in the idle list.  This can happen if, for example,
1024                 // a child is killed by a signal while it's between requests.
1025
1026                 prefork_child* prev = NULL;
1027                 cur_child = forker->idle_list;
1028                 while( cur_child && cur_child->pid != pid ) {
1029                         prev = cur_child;
1030                         cur_child = cur_child->next;
1031                 }
1032
1033                 if( cur_child ) {
1034                         // Detach from the list
1035                         if( prev )
1036                                 prev->next = cur_child->next;
1037                         else
1038                                 forker->idle_list = cur_child->next;
1039                 } // else we can't find it
1040         }
1041
1042         // If we found the node, destroy it.
1043         if( cur_child )
1044                 prefork_child_free( forker, cur_child );
1045 }
1046
1047 /**
1048         @brief Create and initialize a prefork_child.
1049         @param forker Pointer to the prefork_simple that will own the prefork_child.
1050         @param read_data_fd Used by child to read request from parent.
1051         @param write_data_fd Used by parent to write request to child.
1052         @param read_status_fd Used by parent to read status from child.
1053         @param write_status_fd Used by child to write status to parent.
1054         @return Pointer to the newly created prefork_child.
1055
1056         The calling code is responsible for freeing the prefork_child by calling
1057         prefork_child_free().
1058 */
1059 static prefork_child* prefork_child_init( prefork_simple* forker,
1060         int read_data_fd, int write_data_fd,
1061         int read_status_fd, int write_status_fd ) {
1062
1063         // Allocate a prefork_child -- from the free list if possible, or from
1064         // the heap if necessary.  The free list is a non-circular, singly-linked list.
1065         prefork_child* child;
1066         if( forker->free_list ) {
1067                 child = forker->free_list;
1068                 forker->free_list = child->next;
1069         } else
1070                 child = safe_malloc(sizeof(prefork_child));
1071
1072         child->pid              = 0;
1073         child->read_data_fd     = read_data_fd;
1074         child->write_data_fd    = write_data_fd;
1075         child->read_status_fd   = read_status_fd;
1076         child->write_status_fd  = write_status_fd;
1077         child->max_requests     = forker->max_requests;
1078         child->appname          = forker->appname;  // We don't make a separate copy
1079         child->keepalive        = forker->keepalive;
1080         child->next             = NULL;
1081         child->prev             = NULL;
1082
1083         return child;
1084 }
1085
1086 /**
1087         @brief Terminate all child processes and clear out a prefork_simple.
1088         @param prefork Pointer to the prefork_simple to be cleared out.
1089
1090         We do not deallocate the prefork_simple itself, just its contents.
1091 */
1092 static void prefork_clear( prefork_simple* prefork ) {
1093
1094         // Kill all the active children, and move their prefork_child nodes to the free list.
1095         while( prefork->first_child ) {
1096                 kill( prefork->first_child->pid, SIGKILL );
1097                 del_prefork_child( prefork, prefork->first_child->pid );
1098         }
1099
1100         // Kill all the idle prefork children, close their file
1101         // descriptors, and move them to the free list.
1102         prefork_child* child = prefork->idle_list;
1103         prefork->idle_list = NULL;
1104         while( child ) {
1105                 prefork_child* temp = child->next;
1106                 kill( child->pid, SIGKILL );
1107                 prefork_child_free( prefork, child );
1108                 child = temp;
1109         }
1110         //prefork->current_num_children = 0;
1111
1112         // Physically free the free list of prefork_children.
1113         child = prefork->free_list;
1114         prefork->free_list = NULL;
1115         while( child ) {
1116                 prefork_child* temp = child->next;
1117                 free( child );
1118                 child = temp;
1119         }
1120
1121         // Close the Jabber connection
1122         client_free( prefork->connection );
1123         prefork->connection = NULL;
1124
1125         // After giving the child processes a second to terminate, wait on them so that they
1126         // don't become zombies.  We don't wait indefinitely, so it's possible that some
1127         // children will survive a bit longer.
1128         sleep( 1 );
1129         while( (waitpid(-1, NULL, WNOHANG)) > 0) {
1130                 --prefork->current_num_children;
1131         }
1132
1133         free(prefork->appname);
1134         prefork->appname = NULL;
1135 }
1136
1137 /**
1138         @brief Destroy and deallocate a prefork_child.
1139         @param forker Pointer to the prefork_simple that owns the prefork_child.
1140         @param child Pointer to the prefork_child to be destroyed.
1141 */
1142 static void prefork_child_free( prefork_simple* forker, prefork_child* child ) {
1143         close( child->read_data_fd );
1144         close( child->write_data_fd );
1145         close( child->read_status_fd );
1146         close( child->write_status_fd );
1147
1148         // Stick the prefork_child in a free list for potential reuse.  This is a
1149         // non-circular, singly linked list.
1150         child->prev = NULL;
1151         child->next = forker->free_list;
1152         forker->free_list = child;
1153 }