]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/libopensrf/osrf_prefork.c
Rearrange the way that a listener juggles its drones.
[OpenSRF.git] / src / libopensrf / osrf_prefork.c
1 /**
2         @file osrf_prefork.c
3         @brief Spawn and manage a collection of child process to service requests.
4
5         Spawn a collection of child processes, replacing them as needed.  Forward requests to them
6         and let the children do the work.
7
8         Each child processes some maximum number of requests before it terminates itself.  When a
9         child dies, either deliberately or otherwise, we can spawn another one to replace it,
10         keeping the number of children within a predefined range.
11
12         Use a doubly-linked circular list to keep track of the children to whom we have forwarded
13         a request, and who are still working on them.  Use a separate linear linked list to keep
14         track of children that are currently idle.  Move them back and forth as needed.
15
16         For each child, set up two pipes:
17         - One for the parent to send requests to the child.
18         - One for the child to notify the parent that it is available for another request.
19
20         The message sent to the child represents an XML stanza as received from Jabber.
21
22         When the child finishes processing the request, it writes the string "available" back
23         to the parent.  Then the parent knows that it can send that child another request.
24 */
25
26 #include <signal.h>
27 #include <sys/types.h>
28 #include <sys/time.h>
29 #include <unistd.h>
30 #include <stdlib.h>
31 #include <stdio.h>
32 #include <string.h>
33 #include <sys/select.h>
34 #include <sys/wait.h>
35
36 #include "opensrf/utils.h"
37 #include "opensrf/log.h"
38 #include "opensrf/transport_client.h"
39 #include "opensrf/osrf_stack.h"
40 #include "opensrf/osrf_settings.h"
41 #include "opensrf/osrf_application.h"
42
43 #define READ_BUFSIZE 1024
44 #define ABS_MAX_CHILDREN 256
45
46 typedef struct {
47         int max_requests;     /**< How many requests a child processes before terminating. */
48         int min_children;     /**< Minimum number of children to maintain. */
49         int max_children;     /**< Maximum number of children to maintain. */
50         int fd;               /**< Unused. */
51         int data_to_child;    /**< Unused. */
52         int data_to_parent;   /**< Unused. */
53         int current_num_children;   /**< How many children are currently on the list. */
54         int keepalive;        /**< Keepalive time for stateful sessions. */
55         char* appname;        /**< Name of the application. */
56         /** Points to a circular linked list of children. */
57         struct prefork_child_struct* first_child;
58         /** List of of child processes that aren't doing anything at the moment and are
59                 therefore available to service a new request. */
60         struct prefork_child_struct* idle_list;
61         /** List of allocated but unused prefork_children, available for reuse.  Each one is just
62                 raw memory, apart from the "next" pointer used to stitch them together.  In particular,
63                 there is no child process for them, and the file descriptors are not open. */
64         struct prefork_child_struct* free_list;
65         transport_client* connection;  /**< Connection to Jabber. */
66 } prefork_simple;
67
68 struct prefork_child_struct {
69         pid_t pid;            /**< Process ID of the child. */
70         int read_data_fd;     /**< Child uses to read request. */
71         int write_data_fd;    /**< Parent uses to write request. */
72         int read_status_fd;   /**< Parent reads to see if child is available. */
73         int write_status_fd;  /**< Child uses to notify parent when it's available again. */
74         int max_requests;     /**< How many requests a child can process before terminating. */
75         const char* appname;  /**< Name of the application. */
76         int keepalive;        /**< Keepalive time for stateful sessions. */
77         struct prefork_child_struct* next;  /**< Linkage pointer for linked list. */
78         struct prefork_child_struct* prev;  /**< Linkage pointer for linked list. */
79 };
80
81 typedef struct prefork_child_struct prefork_child;
82
83 /** Boolean.  Set to true by a signal handler when it traps SIGCHLD. */
84 static volatile sig_atomic_t child_dead;
85
86 static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
87         int max_requests, int min_children, int max_children );
88 static prefork_child* launch_child( prefork_simple* forker );
89 static void prefork_launch_children( prefork_simple* forker );
90 static void prefork_run(prefork_simple* forker);
91 static void add_prefork_child( prefork_simple* forker, prefork_child* child );
92
93 static void del_prefork_child( prefork_simple* forker, pid_t pid );
94 static void check_children( prefork_simple* forker, int forever );
95 static void prefork_child_process_request(prefork_child*, char* data);
96 static int prefork_child_init_hook(prefork_child*);
97 static prefork_child* prefork_child_init( prefork_simple* forker,
98                 int read_data_fd, int write_data_fd,
99                 int read_status_fd, int write_status_fd );
100
101 /* listens on the 'data_to_child' fd and wait for incoming data */
102 static void prefork_child_wait( prefork_child* child );
103 static void prefork_clear( prefork_simple* );
104 static void prefork_child_free( prefork_simple* forker, prefork_child* );
105 static void osrf_prefork_register_routers( const char* appname );
106 static void osrf_prefork_child_exit( prefork_child* );
107
108 static void sigchld_handler( int sig );
109
110 /**
111         @brief Spawn and manage a collection of drone processes for servicing requests.
112         @param appname Name of the application.
113         @return 0 if successful, or -1 if error.
114 */
115 int osrf_prefork_run( const char* appname ) {
116
117         if(!appname) {
118                 osrfLogError( OSRF_LOG_MARK, "osrf_prefork_run requires an appname to run!");
119                 return -1;
120         }
121
122         set_proc_title( "OpenSRF Listener [%s]", appname );
123
124         int maxr = 1000;
125         int maxc = 10;
126         int minc = 3;
127         int kalive = 5;
128
129         osrfLogInfo( OSRF_LOG_MARK, "Loading config in osrf_forker for app %s", appname);
130
131         char* max_req      = osrf_settings_host_value("/apps/%s/unix_config/max_requests", appname);
132         char* min_children = osrf_settings_host_value("/apps/%s/unix_config/min_children", appname);
133         char* max_children = osrf_settings_host_value("/apps/%s/unix_config/max_children", appname);
134         char* keepalive    = osrf_settings_host_value("/apps/%s/keepalive", appname);
135
136         if(!keepalive) osrfLogWarning( OSRF_LOG_MARK, "Keepalive is not defined, assuming %d", kalive);
137         else kalive = atoi(keepalive);
138
139         if(!max_req) osrfLogWarning( OSRF_LOG_MARK, "Max requests not defined, assuming %d", maxr);
140         else maxr = atoi(max_req);
141
142         if(!min_children) osrfLogWarning( OSRF_LOG_MARK,
143                         "Min children not defined, assuming %d", minc);
144         else minc = atoi(min_children);
145
146         if(!max_children) osrfLogWarning( OSRF_LOG_MARK,
147                         "Max children not defined, assuming %d", maxc);
148         else maxc = atoi(max_children);
149
150         free(keepalive);
151         free(max_req);
152         free(min_children);
153         free(max_children);
154         /* --------------------------------------------------- */
155
156         char* resc = va_list_to_string("%s_listener", appname);
157
158         // Make sure that we haven't already booted
159         if(!osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
160                 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()");
161                 free(resc);
162                 return -1;
163         }
164
165         free(resc);
166
167         prefork_simple forker;
168
169         if( prefork_simple_init( &forker, osrfSystemGetTransportClient(), maxr, minc, maxc ) ) {
170                 osrfLogError( OSRF_LOG_MARK,
171                                 "osrf_prefork_run() failed to create prefork_simple object" );
172                 return -1;
173         }
174
175         // Finish initializing the prefork_simple.
176         forker.appname   = strdup(appname);
177         forker.keepalive = kalive;
178
179         // Spawn the children; put them in the idle list.
180         prefork_launch_children( &forker );
181
182         // Tell the router that you're open for business.
183         osrf_prefork_register_routers(appname);
184
185         // Sit back and let the requests roll in
186         osrfLogInfo( OSRF_LOG_MARK, "Launching osrf_forker for app %s", appname);
187         prefork_run( &forker );
188
189         osrfLogWarning( OSRF_LOG_MARK, "prefork_run() returned - how??");
190         prefork_clear( &forker );
191         return 0;
192 }
193
194 /**
195         @brief Register the application with a specified router.
196         @param appname Name of the application.
197         @param routerName Name of the router.
198         @param routerDomain Domain of the router.
199
200         Tell the router that you're open for business so that it can route requests to you.
201
202         Called only by the parent process.
203 */
204 static void osrf_prefork_send_router_registration(
205                 const char* appname, const char* routerName, const char* routerDomain ) {
206         // Get a pointer to the global transport_client
207         transport_client* client = osrfSystemGetTransportClient();
208
209         // Construct the Jabber address of the router
210         char* jid = va_list_to_string( "%s@%s/router", routerName, routerDomain );
211         osrfLogInfo( OSRF_LOG_MARK, "%s registering with router %s", appname, jid );
212
213         // Create the registration message, and send it
214         transport_message* msg = message_init( "registering", NULL, NULL, jid, NULL );
215         message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
216         client_send_message( client, msg );
217
218         // Clean up
219         message_free( msg );
220         free(jid);
221 }
222
223 /* parses a single "complex" router configuration chunk */
224 // Called only by the parent process
225 static void osrf_prefork_parse_router_chunk(const char* appname, const jsonObject* routerChunk) {
226
227         const char* routerName = jsonObjectGetString(jsonObjectGetKeyConst(routerChunk, "name"));
228         const char* domain = jsonObjectGetString(jsonObjectGetKeyConst(routerChunk, "domain"));
229         const jsonObject* services = jsonObjectGetKeyConst(routerChunk, "services");
230         osrfLogDebug(OSRF_LOG_MARK, "found router config with domain %s and name %s",
231                         routerName, domain);
232
233         if( services && services->type == JSON_HASH ) {
234                 osrfLogDebug(OSRF_LOG_MARK, "investigating router information...");
235                 const jsonObject* service_obj = jsonObjectGetKeyConst(services, "service");
236                 if( !service_obj )
237                         ;    // do nothing (shouldn't happen)
238                 else if( JSON_ARRAY == service_obj->type ) {
239                         int j;
240                         for(j = 0; j < service_obj->size; j++ ) {
241                                 const char* service = jsonObjectGetString(jsonObjectGetIndex(service_obj, j));
242                                 if( service && !strcmp( appname, service ))
243                                         osrf_prefork_send_router_registration(appname, routerName, domain);
244                         }
245                 }
246                 else if( JSON_STRING == service_obj->type ) {
247                         if( !strcmp(appname, jsonObjectGetString( service_obj )) )
248                                 osrf_prefork_send_router_registration(appname, routerName, domain);
249                 }
250         } else {
251                 osrf_prefork_send_router_registration(appname, routerName, domain);
252         }
253 }
254
255 /**
256         @brief Register the application with one or more routers, according to the configuration.
257         @param appname Name of the application.
258
259         Called only by the parent process.
260 */
261 static void osrf_prefork_register_routers( const char* appname ) {
262
263         jsonObject* routerInfo = osrfConfigGetValueObject(NULL, "/routers/router");
264
265         int i;
266         for(i = 0; i < routerInfo->size; i++) {
267                 const jsonObject* routerChunk = jsonObjectGetIndex(routerInfo, i);
268
269                 if(routerChunk->type == JSON_STRING) {
270                         /* this accomodates simple router configs */
271                         char* routerName = osrfConfigGetValue( NULL, "/router_name" );
272                         char* domain = osrfConfigGetValue(NULL, "/routers/router");
273                         osrfLogDebug(OSRF_LOG_MARK, "found simple router settings with router name %s",
274                                         routerName);
275                         osrf_prefork_send_router_registration(appname, routerName, domain);
276
277                         free( routerName );
278                         free( domain );
279                 } else {
280                         osrf_prefork_parse_router_chunk(appname, routerChunk);
281                 }
282         }
283
284         jsonObjectFree( routerInfo );
285 }
286
287 /**
288         @brief Initialize a child process.
289         @param child Pointer to the prefork_child representing the new child process.
290         @return Zero if successful, or -1 if not.
291
292         Called only by child processes.  Actions:
293         - Connect to one or more cache servers
294         - Reconfigure logger, if necessary
295         - Discard parent's Jabber connection and open a new one
296         - Dynamically call an application-specific initialization routine
297         - Change the command line as reported by ps
298 */
299 static int prefork_child_init_hook(prefork_child* child) {
300
301         if(!child) return -1;
302         osrfLogDebug( OSRF_LOG_MARK, "Child init hook for child %d", child->pid);
303
304         // Connect to cache server(s).
305         osrfSystemInitCache();
306         char* resc = va_list_to_string("%s_drone", child->appname);
307
308         // If we're a source-client, tell the logger now that we're a new process.
309         char* isclient = osrfConfigGetValue(NULL, "/client");
310         if( isclient && !strcasecmp(isclient,"true") )
311                 osrfLogSetIsClient(1);
312         free(isclient);
313
314         // Remove traces of our parent's socket connection so we can have our own.
315         osrfSystemIgnoreTransportClient();
316
317         // Connect to Jabber
318         if(!osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
319                 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()");
320                 free(resc);
321                 return -1;
322         }
323
324         free(resc);
325
326         // Dynamically call the application-specific initialization function
327         // from a previously loaded shared library.
328         if( ! osrfAppRunChildInit(child->appname) ) {
329                 osrfLogDebug(OSRF_LOG_MARK, "Prefork child_init succeeded\n");
330         } else {
331                 osrfLogError(OSRF_LOG_MARK, "Prefork child_init failed\n");
332                 return -1;
333         }
334
335         // Change the command line as reported by ps
336         set_proc_title( "OpenSRF Drone [%s]", child->appname );
337         return 0;
338 }
339
340 // Called only by a child process
341 static void prefork_child_process_request(prefork_child* child, char* data) {
342         if( !child ) return;
343
344         transport_client* client = osrfSystemGetTransportClient();
345
346         if(!client_connected(client)) {
347                 osrfSystemIgnoreTransportClient();
348                 osrfLogWarning(OSRF_LOG_MARK, "Reconnecting child to opensrf after disconnect...");
349                 if(!osrf_system_bootstrap_client(NULL, NULL)) {
350                         osrfLogError( OSRF_LOG_MARK,
351                                 "Unable to bootstrap client in prefork_child_process_request()");
352                         sleep(1);
353                         osrf_prefork_child_exit(child);
354                 }
355         }
356
357         /* construct the message from the xml */
358         transport_message* msg = new_message_from_xml( data );
359
360         osrfAppSession* session = osrf_stack_transport_handler(msg, child->appname);
361         if(!session) return;
362
363         if( session->stateless && session->state != OSRF_SESSION_CONNECTED ) {
364                 osrfAppSessionFree( session );
365                 return;
366         }
367
368         osrfLogDebug( OSRF_LOG_MARK, "Entering keepalive loop for session %s", session->session_id );
369         int keepalive = child->keepalive;
370         int retval;
371         int recvd;
372         time_t start;
373         time_t end;
374
375         while(1) {
376
377                 osrfLogDebug(OSRF_LOG_MARK,
378                                 "osrf_prefork calling queue_wait [%d] in keepalive loop", keepalive);
379                 start   = time(NULL);
380                 retval  = osrf_app_session_queue_wait(session, keepalive, &recvd);
381                 end     = time(NULL);
382
383                 osrfLogDebug(OSRF_LOG_MARK, "Data received == %d", recvd);
384
385                 if(retval) {
386                         osrfLogError(OSRF_LOG_MARK, "queue-wait returned non-success %d", retval);
387                         break;
388                 }
389
390                 /* see if the client disconnected from us */
391                 if(session->state != OSRF_SESSION_CONNECTED)
392                         break;
393
394                 /* if no data was reveived within the timeout interval */
395                 if( !recvd && (end - start) >= keepalive ) {
396                         osrfLogInfo(OSRF_LOG_MARK,
397                                         "No request was received in %d seconds, exiting stateful session", keepalive);
398                         osrfAppSessionStatus(
399                                         session,
400                                         OSRF_STATUS_TIMEOUT,
401                                         "osrfConnectStatus",
402                                         0, "Disconnected on timeout" );
403
404                         break;
405                 }
406         }
407
408         osrfLogDebug( OSRF_LOG_MARK, "Exiting keepalive loop for session %s", session->session_id );
409         osrfAppSessionFree( session );
410         return;
411 }
412
413 /**
414         @brief Partially initialize a prefork_simple provided by the caller.
415         @param prefork Pointer to a a raw prefork_simple to be initialized.
416         @param client Pointer to a transport_client (connection to Jabber).
417         @param max_requests The maximum number of requests that a child process may service
418                         before terminating.
419         @param min_children Minimum number of child processes to maintain.
420         @param max_children Maximum number of child processes to maintain.
421         @return 0 if successful, or 1 if not (due to invalid parameters).
422 */
423 static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
424                 int max_requests, int min_children, int max_children ) {
425
426         if( min_children > max_children ) {
427                 osrfLogError( OSRF_LOG_MARK,  "min_children (%d) is greater "
428                                 "than max_children (%d)", min_children, max_children );
429                 return 1;
430         }
431
432         if( max_children > ABS_MAX_CHILDREN ) {
433                 osrfLogError( OSRF_LOG_MARK,  "max_children (%d) is greater than ABS_MAX_CHILDREN (%d)",
434                                 max_children, ABS_MAX_CHILDREN );
435                 return 1;
436         }
437
438         osrfLogInfo(OSRF_LOG_MARK, "Prefork launching child with max_request=%d,"
439                 "min_children=%d, max_children=%d", max_requests, min_children, max_children );
440
441         /* flesh out the struct */
442         prefork->max_requests = max_requests;
443         prefork->min_children = min_children;
444         prefork->max_children = max_children;
445         prefork->fd           = 0;
446         prefork->data_to_child = 0;
447         prefork->data_to_parent = 0;
448         prefork->current_num_children = 0;
449         prefork->keepalive    = 0;
450         prefork->appname      = NULL;
451         prefork->first_child  = NULL;
452         prefork->idle_list    = NULL;
453         prefork->free_list    = NULL;
454         prefork->connection   = client;
455
456         return 0;
457 }
458
459 /**
460         @brief Spawn a new child process and put it in the idle list.
461         @param forker Pointer to the prefork_simple that will own the process.
462         @return Pointer to the new prefork_child, or not at all.
463
464         Spawn a new child process.  Create a prefork_child for it and put it in the idle list.
465
466         After forking, the parent returns a pointer to the new prefork_child.  The child
467         services its quota of requests and then terminates without returning.
468 */
469 static prefork_child* launch_child( prefork_simple* forker ) {
470
471         pid_t pid;
472         int data_fd[2];
473         int status_fd[2];
474
475         // Set up the data and status pipes
476         if( pipe(data_fd) < 0 ) { /* build the data pipe*/
477                 osrfLogError( OSRF_LOG_MARK,  "Pipe making error" );
478                 return NULL;
479         }
480
481         if( pipe(status_fd) < 0 ) {/* build the status pipe */
482                 osrfLogError( OSRF_LOG_MARK,  "Pipe making error" );
483                 close( data_fd[1] );
484                 close( data_fd[0] );
485                 return NULL;
486         }
487
488         osrfLogInternal( OSRF_LOG_MARK, "Pipes: %d %d %d %d",
489                         data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
490
491         // Create and initialize a prefork_child for the new process
492         prefork_child* child = prefork_child_init( forker, data_fd[0],
493                         data_fd[1], status_fd[0], status_fd[1] );
494
495         if( (pid=fork()) < 0 ) {
496                 osrfLogError( OSRF_LOG_MARK, "Forking Error" );
497                 prefork_child_free( forker, child );
498                 return NULL;
499         }
500
501         // Add the new child to the head of the idle list
502         child->next = forker->idle_list;
503         forker->idle_list = child;
504
505         if( pid > 0 ) {  /* parent */
506
507                 signal(SIGCHLD, sigchld_handler);
508                 (forker->current_num_children)++;
509                 child->pid = pid;
510
511                 osrfLogDebug( OSRF_LOG_MARK, "Parent launched %d", pid );
512                 /* *no* child pipe FD's can be closed or the parent will re-use fd's that
513                         the children are currently using */
514                 return child;
515         }
516
517         else { /* child */
518
519                 osrfLogInternal( OSRF_LOG_MARK,
520                                 "I am new child with read_data_fd = %d and write_status_fd = %d",
521                                 child->read_data_fd, child->write_status_fd );
522
523                 child->pid = getpid();
524                 close( child->write_data_fd );
525                 close( child->read_status_fd );
526
527                 /* do the initing */
528                 if( prefork_child_init_hook(child) == -1 ) {
529                         osrfLogError(OSRF_LOG_MARK,
530                                 "Forker child going away because we could not connect to OpenSRF...");
531                         osrf_prefork_child_exit(child);
532                 }
533
534                 prefork_child_wait( child );      // Should exit without returning
535                 osrf_prefork_child_exit( child ); // Just to be sure
536                 return NULL;  // Unreachable, but it keeps the compiler happy
537         }
538 }
539
540 /**
541         @brief Terminate a child process.
542         @param child Pointer to the prefork_child representing the child process.
543
544         Called only by child processes.  Dynamically call an application-specific shutdown
545         function from a previously loaded shared library; then exit.
546 */
547 static void osrf_prefork_child_exit(prefork_child* child) {
548         osrfAppRunExitCode();
549         exit(0);
550 }
551
552 /**
553         @brief Launch all the child processes, putting them in the idle list.
554         @param forker Pointer to the prefork_simple that will own the children.
555
556         Called only by the parent process (in order to become a parent).
557 */
558 static void prefork_launch_children( prefork_simple* forker ) {
559         if(!forker) return;
560         int c = 0;
561         while( c++ < forker->min_children )
562                 launch_child( forker );
563 }
564
565 /**
566         @brief Signal handler for SIGCHLD: note that a child process has terminated.
567         @param sig The value of the trapped signal; always SIGCHLD.
568
569         Set a boolean to be checked later.
570 */
571 static void sigchld_handler( int sig ) {
572         signal(SIGCHLD, sigchld_handler);
573         child_dead = 1;
574 }
575
576 /**
577         @brief Replenish the collection of child processes, after one has terminated.
578         @param forker Pointer to the prefork_simple that manages the child processes.
579
580         The parent calls this function when it notices (via a signal handler) that
581         a child process has died.
582
583         Wait on the dead children so that they won't be zombies.  Spawn new ones as needed
584         to maintain at least a minimum number.
585 */
586 void reap_children( prefork_simple* forker ) {
587
588         pid_t child_pid;
589
590         // Reset our boolean so that we can detect any further terminations.
591         child_dead = 0;
592
593         // Bury the children so that they won't be zombies.  WNOHANG means that waitpid() returns
594         // immediately if there are no waitable children, instead of waiting for more to die.
595         // Ignore the return code of the child.  We don't do an autopsy.
596         while( (child_pid = waitpid(-1, NULL, WNOHANG)) > 0) {
597                 --forker->current_num_children;
598                 del_prefork_child( forker, child_pid );
599         }
600
601         // Spawn more children as needed.
602         while( forker->current_num_children < forker->min_children )
603                 launch_child( forker );
604 }
605
606 /**
607         @brief Read transport_messages and dispatch them to child processes for servicing.
608         @param forker Pointer to the prefork_simple that manages the child processes.
609
610         This is the main loop of the parent process, and once entered, does not exit.
611
612         For each usable transport_message received: look for an idle child to service it.  If
613         no idle children are available, either spawn a new one or, if we've already spawned the
614         maximum number of children, wait for one to become available.  Once a child is available
615         by whatever means, write an XML version of the input message, to a pipe designated for
616         use by that child.
617 */
618 static void prefork_run( prefork_simple* forker ) {
619
620         if( NULL == forker->idle_list )
621                 return;   // No available children, and we haven't even started yet
622
623         transport_message* cur_msg = NULL;
624
625         while(1) {
626
627                 if( forker->first_child == NULL && forker->idle_list == NULL ) {/* no more children */
628                         osrfLogWarning( OSRF_LOG_MARK, "No more children..." );
629                         return;
630                 }
631
632                 // Wait indefinitely for an input message
633                 osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data...");
634                 cur_msg = client_recv( forker->connection, -1 );
635
636                 if( cur_msg == NULL )
637                         continue;           // Error?  Interrupted by a signal?  Try again...
638
639                 message_prepare_xml( cur_msg );
640                 const char* msg_data = cur_msg->msg_xml;
641                 if( ! msg_data || ! *msg_data ) {
642                         osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %",
643                                         (msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread );
644                         message_free( cur_msg );
645                         continue;       // Message not usable; go on to the next one.
646                 }
647
648                 int honored = 0;     /* will be set to true when we service the request */
649                 int no_recheck = 0;
650
651                 while( ! honored ) {
652
653                         if(!no_recheck)
654                                 check_children( forker, 0 );
655                         no_recheck = 0;
656
657                         osrfLogDebug( OSRF_LOG_MARK, "Server received inbound data" );
658
659                         prefork_child* cur_child = NULL;
660
661                         // Look for an available child in the idle list.  Since the idle list operates
662                         // as a stack, the child we get is the one that was most recently active, or
663                         // most recently spawned.  That means it's the one most likely still to be in
664                         // physical memory, and the one least likely to have to be swapped in.
665                         while( forker->idle_list ) {
666
667                                 osrfLogInfo( OSRF_LOG_MARK, "Looking for idle child" );
668                                 // Grab the prefork_child at the head of the idle list
669                                 cur_child = forker->idle_list;
670                                 forker->idle_list = cur_child->next;
671                                 cur_child->next = NULL;
672
673                                 osrfLogInternal( OSRF_LOG_MARK,
674                                                 "Searching for available child. cur_child->pid = %d", cur_child->pid );
675                                 osrfLogInternal( OSRF_LOG_MARK, "Current num children %d",
676                                                 forker->current_num_children );
677
678                                 osrfLogDebug( OSRF_LOG_MARK, "forker sending data to %d", cur_child->pid );
679                                 osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d",
680                                                 cur_child->write_data_fd );
681
682                                 int written = write(cur_child->write_data_fd, msg_data, strlen(msg_data) + 1);
683                                 if( written < 0 ) {
684                                         // This child appears to be dead or unusable.  Discard it.
685                                         osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d: %s",
686                                                         errno, strerror( errno ) );
687                                         kill( cur_child->pid, SIGKILL );
688                                         del_prefork_child( forker, cur_child->pid );
689                                         continue;
690                                 }
691
692                                 add_prefork_child( forker, cur_child );  // Add it to active list
693                                 honored = 1;
694                                 break;
695                         }
696
697                         /* if none available, add a new child if we can */
698                         if( ! honored ) {
699                                 osrfLogDebug( OSRF_LOG_MARK, "Not enough children, attempting to add...");
700
701                                 if( forker->current_num_children < forker->max_children ) {
702                                         osrfLogDebug( OSRF_LOG_MARK,  "Launching new child with current_num = %d",
703                                                         forker->current_num_children );
704
705                                         launch_child( forker );  // Put a new child into the idle list
706                                         if( forker->idle_list ) {
707
708                                                 // Take the new child from the idle list
709                                                 prefork_child* new_child = forker->idle_list;
710                                                 forker->idle_list = new_child->next;
711                                                 new_child->next = NULL;
712
713                                                 osrfLogDebug( OSRF_LOG_MARK, "Writing to new child fd %d : pid %d",
714                                                                 new_child->write_data_fd, new_child->pid );
715
716                                                 int written = write(
717                                                                 new_child->write_data_fd, msg_data, strlen(msg_data) + 1);
718                                                 if( written < 0 ) {
719                                                         // This child appears to be dead or unusable.  Discard it.
720                                                         osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d: %s",
721                                                                                         errno, strerror( errno ) );
722                                                         kill( cur_child->pid, SIGKILL );
723                                                         del_prefork_child( forker, cur_child->pid );
724                                                 } else {
725                                                         add_prefork_child( forker, new_child );
726                                                         honored = 1;
727                                                 }
728                                         }
729
730                                 }
731                         }
732
733                         if( !honored ) {
734                                 osrfLogWarning( OSRF_LOG_MARK, "No children available, waiting...");
735                                 check_children( forker, 1 );
736                                 // Tell the loop not to call check_children again, since we're calling it now
737                                 no_recheck = 1;
738                         }
739
740                         if( child_dead )
741                                 reap_children(forker);
742
743                 } // end while( ! honored )
744
745                 message_free( cur_msg );
746
747         } /* end top level listen loop */
748 }
749
750
751 /* XXX Add a flag which tells select() to wait forever on children
752         in the best case, this will be faster than calling usleep(x), and
753         in the worst case it won't be slower and will do less logging...
754 */
755 /**
756         @brief See if any children have become available.
757         @param forker Pointer to the prefork_simple that owns the children.
758         @param forever Boolean: true if we should wait indefinitely.
759         @return 
760
761         Call select() for all the children in the active list.  Read each active file
762         descriptor and move the corresponding child to the idle list.
763
764         If @a forever is true, wait indefinitely for input.  Otherwise return immediately if
765         there are no active file descriptors.
766 */
767 static void check_children( prefork_simple* forker, int forever ) {
768
769         if( child_dead )
770                 reap_children( forker );
771
772         if( NULL == forker->first_child ) {
773                 // If forever is true, then we're here because we've run out of idle
774                 // processes, so there should be some active ones around.
775                 // If forever is false, then the children may all be idle, and that's okay.
776                 if( forever )
777                         osrfLogError( OSRF_LOG_MARK, "No active child processes to check" );
778                 return;
779         }
780
781         int select_ret;
782         fd_set read_set;
783         FD_ZERO(&read_set);
784         int max_fd = 0;
785         int n;
786
787         // Prepare to select() on pipes from all the active children
788         prefork_child* cur_child = forker->first_child;
789         do {
790                 if( cur_child->read_status_fd > max_fd )
791                         max_fd = cur_child->read_status_fd;
792                 FD_SET( cur_child->read_status_fd, &read_set );
793                 cur_child = cur_child->next;
794         } while( cur_child != forker->first_child );
795
796         FD_CLR(0,&read_set); /* just to be sure */
797
798         if( forever ) {
799                 osrfLogWarning(OSRF_LOG_MARK,
800                                 "We have no children available - waiting for one to show up...");
801
802                 if( (select_ret=select( max_fd + 1, &read_set, NULL, NULL, NULL)) == -1 ) {
803                         osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children: %s",
804                                         errno, strerror( errno ) );
805                 }
806                 osrfLogInfo(OSRF_LOG_MARK,
807                                 "select() completed after waiting on children to become available");
808
809         } else {
810
811                 struct timeval tv;
812                 tv.tv_sec   = 0;
813                 tv.tv_usec  = 0;
814
815                 if( (select_ret=select( max_fd + 1, &read_set, NULL, NULL, &tv)) == -1 ) {
816                         osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children: %s",
817                                         errno, strerror( errno ) );
818                 }
819         }
820
821         if( select_ret == 0 )
822                 return;
823
824         // Check each child in the active list.
825         // If it has responded, move it to the idle list.
826         cur_child = forker->first_child;
827         prefork_child* next_child = NULL;
828         int num_handled = 0;
829         do {
830                 next_child = cur_child->next;
831                 if( FD_ISSET( cur_child->read_status_fd, &read_set ) ) {
832                         osrfLogDebug( OSRF_LOG_MARK,
833                                         "Server received status from a child %d", cur_child->pid );
834
835                         num_handled++;
836
837                         /* now suck off the data */
838                         char buf[64];
839                         if( (n=read(cur_child->read_status_fd, buf, sizeof(buf) - 1)) < 0 ) {
840                                 osrfLogWarning( OSRF_LOG_MARK,
841                                                 "Read error after select in child status read with errno %d: %s",
842                                                 errno, strerror( errno ) );
843                         }
844                         else {
845                                 buf[n] = '\0';
846                                 osrfLogDebug( OSRF_LOG_MARK,  "Read %d bytes from status buffer: %s", n, buf );
847                         }
848
849                         // Remove the child from the active list
850                         if( forker->first_child == cur_child ) {
851                                 if( cur_child->next == cur_child )
852                                         forker->first_child = NULL;   // only child in the active list
853                                 else
854                                         forker->first_child = cur_child->next;
855                         }
856                         cur_child->next->prev = cur_child->prev;
857                         cur_child->prev->next = cur_child->next;
858
859                         // Add it to the idle list
860                         cur_child->prev = NULL;
861                         cur_child->next = forker->idle_list;
862                         forker->idle_list = cur_child;
863                 }
864                 cur_child = next_child;
865         } while( forker->first_child && forker->first_child != next_child );
866 }
867
868 /**
869         @brief Service up a set maximum number of requests; then shut down.
870         @param child Pointer to the prefork_child representing the child process.
871
872         Called only by child process.
873
874         Enter a loop, for up to max_requests iterations.  On each iteration:
875         - Wait indefinitely for a request from the parent.
876         - Service the request.
877         - Increment a counter.  If the limit hasn't been reached, notify the parent that you
878         are available for another request.
879
880         After exiting the loop, shut down and terminate the process.
881 */
882 static void prefork_child_wait( prefork_child* child ) {
883
884         int i,n;
885         growing_buffer* gbuf = buffer_init( READ_BUFSIZE );
886         char buf[READ_BUFSIZE];
887
888         for( i = 0; i < child->max_requests; i++ ) {
889
890                 n = -1;
891                 int gotdata = 0;    // boolean; set to true if we get data
892                 clr_fl(child->read_data_fd, O_NONBLOCK );
893
894                 while( (n=read(child->read_data_fd, buf, READ_BUFSIZE-1)) > 0 ) {
895                         buf[n] = '\0';
896                         osrfLogDebug(OSRF_LOG_MARK, "Prefork child read %d bytes of data", n);
897                         if(!gotdata) {
898                                 set_fl(child->read_data_fd, O_NONBLOCK );
899                                 gotdata = 1;
900                         }
901                         buffer_add( gbuf, buf );
902                 }
903
904                 if( errno == EAGAIN ) n = 0;
905
906                 if( errno == EPIPE ) {
907                         osrfLogDebug(OSRF_LOG_MARK, "C child attempted read on broken pipe, exiting...");
908                         break;
909                 }
910
911                 if( n < 0 ) {
912                         osrfLogWarning( OSRF_LOG_MARK,
913                                         "Prefork child read returned error with errno %d", errno );
914                         break;
915
916                 } else if( gotdata ) {
917                         osrfLogDebug(OSRF_LOG_MARK, "Prefork child got a request.. processing..");
918                         prefork_child_process_request(child, gbuf->buf);
919                         buffer_reset( gbuf );
920                 }
921
922                 if( i < child->max_requests - 1 )
923                         write( child->write_status_fd, "available" /*less than 64 bytes*/, 9 );
924         }
925
926         buffer_free(gbuf);
927
928         osrfLogDebug( OSRF_LOG_MARK, "Child with max-requests=%d, num-served=%d exiting...[%ld]",
929                         child->max_requests, i, (long) getpid() );
930
931         osrf_prefork_child_exit(child);
932 }
933
934 /**
935         @brief Add a prefork_child to the end of the active list.
936         @param forker Pointer to the prefork_simple that owns the list.
937         @param child Pointer to the prefork_child to be added.
938 */
939 static void add_prefork_child( prefork_simple* forker, prefork_child* child ) {
940
941         if( forker->first_child == NULL ) {
942                 // Simplest case: list is initially empty.
943                 forker->first_child = child;
944                 child->next = child;
945                 child->prev = child;
946         } else {
947                 // Find the last node in the circular list.
948                 prefork_child* last_child = forker->first_child->prev;
949
950                 // Insert the new child between the last and first children.
951                 last_child->next = child;
952                 child->prev      = last_child;
953                 child->next      = forker->first_child;
954                 forker->first_child->prev = child;
955         }
956 }
957
958 /**
959         @brief Remove a prefork_child, representing a terminated child, from the list it's on.
960         @param forker Pointer to the prefork_simple that owns the child.
961         @param pid Process ID of the child to be removed.
962
963         Look for the node in the active list, and, failing that, in the idle list.  If you
964         find it, close its file descriptors and put it in the free list for potential reuse.
965 */
966 static void old_del_prefork_child( prefork_simple* forker, pid_t pid ) {
967
968         if( forker->first_child == NULL )
969                 return;  // Empty list; bail out.
970
971         osrfLogDebug( OSRF_LOG_MARK, "Deleting Child: %d", pid );
972
973         // Find the node in question
974         prefork_child* cur_child = forker->first_child; /* current pointer */
975         while( cur_child->pid != pid && cur_child->next != forker->first_child )
976                 cur_child = cur_child->next;
977
978         if( cur_child->pid == pid ) {
979                 // We found the right node.  Remove it from the list.
980                 if( cur_child->next == cur_child )
981                         forker->first_child = NULL;    // only child in the list
982                 else {
983                         if( forker->first_child == cur_child )
984                                 forker->first_child = cur_child->next;  // Reseat forker->first_child
985
986                         // Stitch the nodes on either side together
987                         cur_child->prev->next = cur_child->next;
988                         cur_child->next->prev = cur_child->prev;
989                 }
990
991                 //Destroy the node
992                 prefork_child_free( forker, cur_child );
993
994         } else {
995                 // Maybe it's in the idle list.  This can happen if, for example,
996                 // a child is killed by a signal while it's between requests.
997
998                 prefork_child* prev = NULL;
999                 cur_child = forker->idle_list;
1000                 while( cur_child && cur_child->pid != pid ) {
1001                         prev = cur_child;
1002                         cur_child = cur_child->next;
1003                 }
1004
1005                 if( cur_child ) {
1006                         // Detach from the list
1007                         if( prev )
1008                                 prev->next = cur_child->next;
1009                         else
1010                                 forker->idle_list = cur_child->next;
1011
1012                         //Destroy the node
1013                         prefork_child_free( forker, cur_child );
1014                 } // else we can't find it, so do nothing.
1015         }
1016 }
1017
1018 static void del_prefork_child( prefork_simple* forker, pid_t pid ) {
1019
1020         osrfLogDebug( OSRF_LOG_MARK, "Deleting Child: %d", pid );
1021
1022         prefork_child* cur_child = NULL;
1023         
1024         // Look first in the active list
1025         if( forker->first_child ) {
1026                 cur_child = forker->first_child; /* current pointer */
1027                 while( cur_child->pid != pid && cur_child->next != forker->first_child )
1028                         cur_child = cur_child->next;
1029
1030                 if( cur_child->pid == pid ) {
1031                         // We found the right node.  Remove it from the list.
1032                         if( cur_child->next == cur_child )
1033                                 forker->first_child = NULL;    // only child in the list
1034                         else {
1035                                 if( forker->first_child == cur_child )
1036                                         forker->first_child = cur_child->next;  // Reseat forker->first_child
1037
1038                                 // Stitch the adjacent nodes together
1039                                 cur_child->prev->next = cur_child->next;
1040                                 cur_child->next->prev = cur_child->prev;
1041                         }
1042                 } else
1043                         cur_child = NULL;  // Didn't find it in the active list
1044         }
1045
1046         if( ! cur_child ) {
1047                 // Maybe it's in the idle list.  This can happen if, for example,
1048                 // a child is killed by a signal while it's between requests.
1049
1050                 prefork_child* prev = NULL;
1051                 cur_child = forker->idle_list;
1052                 while( cur_child && cur_child->pid != pid ) {
1053                         prev = cur_child;
1054                         cur_child = cur_child->next;
1055                 }
1056
1057                 if( cur_child ) {
1058                         // Detach from the list
1059                         if( prev )
1060                                 prev->next = cur_child->next;
1061                         else
1062                                 forker->idle_list = cur_child->next;
1063                 } // else we can't find it
1064         }
1065
1066         // If we found the node, destroy it.
1067         if( cur_child )
1068                 prefork_child_free( forker, cur_child );
1069 }
1070
1071 /**
1072         @brief Create and initialize a prefork_child.
1073         @param forker Pointer to the prefork_simple that will own the prefork_child.
1074         @param read_data_fd Used by child to read request from parent.
1075         @param write_data_fd Used by parent to write request to child.
1076         @param read_status_fd Used by parent to read status from child.
1077         @param write_status_fd Used by child to write status to parent.
1078         @return Pointer to the newly created prefork_child.
1079
1080         The calling code is responsible for freeing the prefork_child by calling
1081         prefork_child_free().
1082 */
1083 static prefork_child* prefork_child_init( prefork_simple* forker,
1084         int read_data_fd, int write_data_fd,
1085         int read_status_fd, int write_status_fd ) {
1086
1087         // Allocate a prefork_child -- from the free list if possible, or from
1088         // the heap if necessary.  The free list is a non-circular, singly-linked list.
1089         prefork_child* child;
1090         if( forker->free_list ) {
1091                 child = forker->free_list;
1092                 forker->free_list = child->next;
1093         } else
1094                 child = safe_malloc(sizeof(prefork_child));
1095
1096         child->pid              = 0;
1097         child->read_data_fd     = read_data_fd;
1098         child->write_data_fd    = write_data_fd;
1099         child->read_status_fd   = read_status_fd;
1100         child->write_status_fd  = write_status_fd;
1101         child->max_requests     = forker->max_requests;
1102         child->appname          = forker->appname;  // We don't make a separate copy
1103         child->keepalive        = forker->keepalive;
1104         child->next             = NULL;
1105         child->prev             = NULL;
1106
1107         return child;
1108 }
1109
1110 /**
1111         @brief Terminate all child processes and clear out a prefork_simple.
1112         @param prefork Pointer to the prefork_simple to be cleared out.
1113
1114         We do not deallocate the prefork_simple itself, just its contents.
1115 */
1116 static void prefork_clear( prefork_simple* prefork ) {
1117
1118         // Kill all the active children, and move their prefork_child nodes to the free list.
1119         while( prefork->first_child ) {
1120                 kill( prefork->first_child->pid, SIGKILL );
1121                 del_prefork_child( prefork, prefork->first_child->pid );
1122         }
1123
1124         // Kill all the idle prefork children, close their file
1125         // descriptors, and move them to the free list.
1126         prefork_child* child = prefork->idle_list;
1127         prefork->idle_list = NULL;
1128         while( child ) {
1129                 prefork_child* temp = child->next;
1130                 kill( child->pid, SIGKILL );
1131                 prefork_child_free( prefork, child );
1132                 child = temp;
1133         }
1134         //prefork->current_num_children = 0;
1135
1136         // Physically free the free list of prefork_children.
1137         child = prefork->free_list;
1138         prefork->free_list = NULL;
1139         while( child ) {
1140                 prefork_child* temp = child->next;
1141                 free( child );
1142                 child = temp;
1143         }
1144
1145         // Close the Jabber connection
1146         client_free( prefork->connection );
1147         prefork->connection = NULL;
1148
1149         // After giving the child processes a second to terminate, wait on them so that they
1150         // don't become zombies.  We don't wait indefinitely, so it's possible that some
1151         // children will survive a bit longer.
1152         sleep( 1 );
1153         while( (waitpid(-1, NULL, WNOHANG)) > 0) {
1154                 --prefork->current_num_children;
1155         }
1156
1157         free(prefork->appname);
1158         prefork->appname = NULL;
1159 }
1160
1161 /**
1162         @brief Destroy and deallocate a prefork_child.
1163         @param forker Pointer to the prefork_simple that owns the prefork_child.
1164         @param child Pointer to the prefork_child to be destroyed.
1165 */
1166 static void prefork_child_free( prefork_simple* forker, prefork_child* child ) {
1167         close( child->read_data_fd );
1168         close( child->write_data_fd );
1169         close( child->read_status_fd );
1170         close( child->write_status_fd );
1171
1172         // Stick the prefork_child in a free list for potential reuse.  This is a
1173         // non-circular, singly linked list.
1174         child->prev = NULL;
1175         child->next = forker->free_list;
1176         forker->free_list = child;
1177 }