]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/libopensrf/osrf_prefork.c
560142fcb2da8e6bef184aa189bcc46af478de28
[OpenSRF.git] / src / libopensrf / osrf_prefork.c
1 /**
2         @file osrf_prefork.c
3         @brief Spawn and manage a collection of child process to service requests.
4
5         Spawn a collection of child processes, replacing them as needed.  Forward requests to them
6         and let the children do the work.
7
8         Each child processes some maximum number of requests before it terminates itself.  When a
9         child dies, either deliberately or otherwise, we can spawn another one to replace it,
10         keeping the number of children within a predefined range.
11
12         Use a doubly-linked circular list to keep track of the children to whom we have forwarded
13         a request, and who are still working on them.  Use a separate linear linked list to keep
14         track of children that are currently idle.  Move them back and forth as needed.
15
16         For each child, set up two pipes:
17         - One for the parent to send requests to the child.
18         - One for the child to notify the parent that it is available for another request.
19
20         The message sent to the child represents an XML stanza as received from Jabber.
21
22         When the child finishes processing the request, it writes the string "available" back
23         to the parent.  Then the parent knows that it can send that child another request.
24 */
25
26 #include <errno.h>
27 #include <signal.h>
28 #include <sys/types.h>
29 #include <sys/time.h>
30 #include <unistd.h>
31 #include <stdlib.h>
32 #include <stdio.h>
33 #include <string.h>
34 #include <sys/select.h>
35 #include <sys/wait.h>
36
37 #include "opensrf/utils.h"
38 #include "opensrf/log.h"
39 #include "opensrf/transport_client.h"
40 #include "opensrf/osrf_stack.h"
41 #include "opensrf/osrf_settings.h"
42 #include "opensrf/osrf_application.h"
43
44 #define READ_BUFSIZE 1024
45 #define ABS_MAX_CHILDREN 256
46
47 typedef struct {
48         int max_requests;     /**< How many requests a child processes before terminating. */
49         int min_children;     /**< Minimum number of children to maintain. */
50         int max_children;     /**< Maximum number of children to maintain. */
51         int fd;               /**< Unused. */
52         int data_to_child;    /**< Unused. */
53         int data_to_parent;   /**< Unused. */
54         int current_num_children;   /**< How many children are currently on the list. */
55         int keepalive;        /**< Keepalive time for stateful sessions. */
56         char* appname;        /**< Name of the application. */
57         /** Points to a circular linked list of children. */
58         struct prefork_child_struct* first_child;
59         /** List of of child processes that aren't doing anything at the moment and are
60                 therefore available to service a new request. */
61         struct prefork_child_struct* idle_list;
62         /** List of allocated but unused prefork_children, available for reuse.  Each one is just
63                 raw memory, apart from the "next" pointer used to stitch them together.  In particular,
64                 there is no child process for them, and the file descriptors are not open. */
65         struct prefork_child_struct* free_list;
66         transport_client* connection;  /**< Connection to Jabber. */
67 } prefork_simple;
68
69 struct prefork_child_struct {
70         pid_t pid;            /**< Process ID of the child. */
71         int read_data_fd;     /**< Child uses to read request. */
72         int write_data_fd;    /**< Parent uses to write request. */
73         int read_status_fd;   /**< Parent reads to see if child is available. */
74         int write_status_fd;  /**< Child uses to notify parent when it's available again. */
75         int max_requests;     /**< How many requests a child can process before terminating. */
76         const char* appname;  /**< Name of the application. */
77         int keepalive;        /**< Keepalive time for stateful sessions. */
78         struct prefork_child_struct* next;  /**< Linkage pointer for linked list. */
79         struct prefork_child_struct* prev;  /**< Linkage pointer for linked list. */
80 };
81
82 typedef struct prefork_child_struct prefork_child;
83
84 /** Boolean.  Set to true by a signal handler when it traps SIGCHLD. */
85 static volatile sig_atomic_t child_dead;
86
87 static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
88         int max_requests, int min_children, int max_children );
89 static prefork_child* launch_child( prefork_simple* forker );
90 static void prefork_launch_children( prefork_simple* forker );
91 static void prefork_run(prefork_simple* forker);
92 static void add_prefork_child( prefork_simple* forker, prefork_child* child );
93
94 static void del_prefork_child( prefork_simple* forker, pid_t pid );
95 static void check_children( prefork_simple* forker, int forever );
96 static void prefork_child_process_request(prefork_child*, char* data);
97 static int prefork_child_init_hook(prefork_child*);
98 static prefork_child* prefork_child_init( prefork_simple* forker,
99                 int read_data_fd, int write_data_fd,
100                 int read_status_fd, int write_status_fd );
101
102 /* listens on the 'data_to_child' fd and wait for incoming data */
103 static void prefork_child_wait( prefork_child* child );
104 static void prefork_clear( prefork_simple* );
105 static void prefork_child_free( prefork_simple* forker, prefork_child* );
106 static void osrf_prefork_register_routers( const char* appname );
107 static void osrf_prefork_child_exit( prefork_child* );
108
109 static void sigchld_handler( int sig );
110
111 /**
112         @brief Spawn and manage a collection of drone processes for servicing requests.
113         @param appname Name of the application.
114         @return 0 if successful, or -1 if error.
115 */
116 int osrf_prefork_run( const char* appname ) {
117
118         if(!appname) {
119                 osrfLogError( OSRF_LOG_MARK, "osrf_prefork_run requires an appname to run!");
120                 return -1;
121         }
122
123         set_proc_title( "OpenSRF Listener [%s]", appname );
124
125         int maxr = 1000;
126         int maxc = 10;
127         int minc = 3;
128         int kalive = 5;
129
130         osrfLogInfo( OSRF_LOG_MARK, "Loading config in osrf_forker for app %s", appname);
131
132         char* max_req      = osrf_settings_host_value("/apps/%s/unix_config/max_requests", appname);
133         char* min_children = osrf_settings_host_value("/apps/%s/unix_config/min_children", appname);
134         char* max_children = osrf_settings_host_value("/apps/%s/unix_config/max_children", appname);
135         char* keepalive    = osrf_settings_host_value("/apps/%s/keepalive", appname);
136
137         if(!keepalive) osrfLogWarning( OSRF_LOG_MARK, "Keepalive is not defined, assuming %d", kalive);
138         else kalive = atoi(keepalive);
139
140         if(!max_req) osrfLogWarning( OSRF_LOG_MARK, "Max requests not defined, assuming %d", maxr);
141         else maxr = atoi(max_req);
142
143         if(!min_children) osrfLogWarning( OSRF_LOG_MARK,
144                         "Min children not defined, assuming %d", minc);
145         else minc = atoi(min_children);
146
147         if(!max_children) osrfLogWarning( OSRF_LOG_MARK,
148                         "Max children not defined, assuming %d", maxc);
149         else maxc = atoi(max_children);
150
151         free(keepalive);
152         free(max_req);
153         free(min_children);
154         free(max_children);
155         /* --------------------------------------------------- */
156
157         char* resc = va_list_to_string("%s_listener", appname);
158
159         // Make sure that we haven't already booted
160         if(!osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
161                 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()");
162                 free(resc);
163                 return -1;
164         }
165
166         free(resc);
167
168         prefork_simple forker;
169
170         if( prefork_simple_init( &forker, osrfSystemGetTransportClient(), maxr, minc, maxc ) ) {
171                 osrfLogError( OSRF_LOG_MARK,
172                                 "osrf_prefork_run() failed to create prefork_simple object" );
173                 return -1;
174         }
175
176         // Finish initializing the prefork_simple.
177         forker.appname   = strdup(appname);
178         forker.keepalive = kalive;
179
180         // Spawn the children; put them in the idle list.
181         prefork_launch_children( &forker );
182
183         // Tell the router that you're open for business.
184         osrf_prefork_register_routers(appname);
185
186         // Sit back and let the requests roll in
187         osrfLogInfo( OSRF_LOG_MARK, "Launching osrf_forker for app %s", appname);
188         prefork_run( &forker );
189
190         osrfLogWarning( OSRF_LOG_MARK, "prefork_run() returned - how??");
191         prefork_clear( &forker );
192         return 0;
193 }
194
195 /**
196         @brief Register the application with a specified router.
197         @param appname Name of the application.
198         @param routerName Name of the router.
199         @param routerDomain Domain of the router.
200
201         Tell the router that you're open for business so that it can route requests to you.
202
203         Called only by the parent process.
204 */
205 static void osrf_prefork_send_router_registration(
206                 const char* appname, const char* routerName, const char* routerDomain ) {
207         // Get a pointer to the global transport_client
208         transport_client* client = osrfSystemGetTransportClient();
209
210         // Construct the Jabber address of the router
211         char* jid = va_list_to_string( "%s@%s/router", routerName, routerDomain );
212         osrfLogInfo( OSRF_LOG_MARK, "%s registering with router %s", appname, jid );
213
214         // Create the registration message, and send it
215         transport_message* msg = message_init( "registering", NULL, NULL, jid, NULL );
216         message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
217         client_send_message( client, msg );
218
219         // Clean up
220         message_free( msg );
221         free(jid);
222 }
223
224 /* parses a single "complex" router configuration chunk */
225 // Called only by the parent process
226 static void osrf_prefork_parse_router_chunk(const char* appname, const jsonObject* routerChunk) {
227
228         const char* routerName = jsonObjectGetString(jsonObjectGetKeyConst(routerChunk, "name"));
229         const char* domain = jsonObjectGetString(jsonObjectGetKeyConst(routerChunk, "domain"));
230         const jsonObject* services = jsonObjectGetKeyConst(routerChunk, "services");
231         osrfLogDebug(OSRF_LOG_MARK, "found router config with domain %s and name %s",
232                         routerName, domain);
233
234         if( services && services->type == JSON_HASH ) {
235                 osrfLogDebug(OSRF_LOG_MARK, "investigating router information...");
236                 const jsonObject* service_obj = jsonObjectGetKeyConst(services, "service");
237                 if( !service_obj )
238                         ;    // do nothing (shouldn't happen)
239                 else if( JSON_ARRAY == service_obj->type ) {
240                         int j;
241                         for(j = 0; j < service_obj->size; j++ ) {
242                                 const char* service = jsonObjectGetString(jsonObjectGetIndex(service_obj, j));
243                                 if( service && !strcmp( appname, service ))
244                                         osrf_prefork_send_router_registration(appname, routerName, domain);
245                         }
246                 }
247                 else if( JSON_STRING == service_obj->type ) {
248                         if( !strcmp(appname, jsonObjectGetString( service_obj )) )
249                                 osrf_prefork_send_router_registration(appname, routerName, domain);
250                 }
251         } else {
252                 osrf_prefork_send_router_registration(appname, routerName, domain);
253         }
254 }
255
256 /**
257         @brief Register the application with one or more routers, according to the configuration.
258         @param appname Name of the application.
259
260         Called only by the parent process.
261 */
262 static void osrf_prefork_register_routers( const char* appname ) {
263
264         jsonObject* routerInfo = osrfConfigGetValueObject(NULL, "/routers/router");
265
266         int i;
267         for(i = 0; i < routerInfo->size; i++) {
268                 const jsonObject* routerChunk = jsonObjectGetIndex(routerInfo, i);
269
270                 if(routerChunk->type == JSON_STRING) {
271                         /* this accomodates simple router configs */
272                         char* routerName = osrfConfigGetValue( NULL, "/router_name" );
273                         char* domain = osrfConfigGetValue(NULL, "/routers/router");
274                         osrfLogDebug(OSRF_LOG_MARK, "found simple router settings with router name %s",
275                                         routerName);
276                         osrf_prefork_send_router_registration(appname, routerName, domain);
277
278                         free( routerName );
279                         free( domain );
280                 } else {
281                         osrf_prefork_parse_router_chunk(appname, routerChunk);
282                 }
283         }
284
285         jsonObjectFree( routerInfo );
286 }
287
288 /**
289         @brief Initialize a child process.
290         @param child Pointer to the prefork_child representing the new child process.
291         @return Zero if successful, or -1 if not.
292
293         Called only by child processes.  Actions:
294         - Connect to one or more cache servers
295         - Reconfigure logger, if necessary
296         - Discard parent's Jabber connection and open a new one
297         - Dynamically call an application-specific initialization routine
298         - Change the command line as reported by ps
299 */
300 static int prefork_child_init_hook(prefork_child* child) {
301
302         if(!child) return -1;
303         osrfLogDebug( OSRF_LOG_MARK, "Child init hook for child %d", child->pid);
304
305         // Connect to cache server(s).
306         osrfSystemInitCache();
307         char* resc = va_list_to_string("%s_drone", child->appname);
308
309         // If we're a source-client, tell the logger now that we're a new process.
310         char* isclient = osrfConfigGetValue(NULL, "/client");
311         if( isclient && !strcasecmp(isclient,"true") )
312                 osrfLogSetIsClient(1);
313         free(isclient);
314
315         // Remove traces of our parent's socket connection so we can have our own.
316         osrfSystemIgnoreTransportClient();
317
318         // Connect to Jabber
319         if(!osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
320                 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()");
321                 free(resc);
322                 return -1;
323         }
324
325         free(resc);
326
327         // Dynamically call the application-specific initialization function
328         // from a previously loaded shared library.
329         if( ! osrfAppRunChildInit(child->appname) ) {
330                 osrfLogDebug(OSRF_LOG_MARK, "Prefork child_init succeeded\n");
331         } else {
332                 osrfLogError(OSRF_LOG_MARK, "Prefork child_init failed\n");
333                 return -1;
334         }
335
336         // Change the command line as reported by ps
337         set_proc_title( "OpenSRF Drone [%s]", child->appname );
338         return 0;
339 }
340
341 // Called only by a child process
342 static void prefork_child_process_request(prefork_child* child, char* data) {
343         if( !child ) return;
344
345         transport_client* client = osrfSystemGetTransportClient();
346
347         if(!client_connected(client)) {
348                 osrfSystemIgnoreTransportClient();
349                 osrfLogWarning(OSRF_LOG_MARK, "Reconnecting child to opensrf after disconnect...");
350                 if(!osrf_system_bootstrap_client(NULL, NULL)) {
351                         osrfLogError( OSRF_LOG_MARK,
352                                 "Unable to bootstrap client in prefork_child_process_request()");
353                         sleep(1);
354                         osrf_prefork_child_exit(child);
355                 }
356         }
357
358         /* construct the message from the xml */
359         transport_message* msg = new_message_from_xml( data );
360
361         osrfAppSession* session = osrf_stack_transport_handler(msg, child->appname);
362         if(!session) return;
363
364         if( session->stateless && session->state != OSRF_SESSION_CONNECTED ) {
365                 osrfAppSessionFree( session );
366                 return;
367         }
368
369         osrfLogDebug( OSRF_LOG_MARK, "Entering keepalive loop for session %s", session->session_id );
370         int keepalive = child->keepalive;
371         int retval;
372         int recvd;
373         time_t start;
374         time_t end;
375
376         while(1) {
377
378                 osrfLogDebug(OSRF_LOG_MARK,
379                                 "osrf_prefork calling queue_wait [%d] in keepalive loop", keepalive);
380                 start   = time(NULL);
381                 retval  = osrf_app_session_queue_wait(session, keepalive, &recvd);
382                 end     = time(NULL);
383
384                 osrfLogDebug(OSRF_LOG_MARK, "Data received == %d", recvd);
385
386                 if(retval) {
387                         osrfLogError(OSRF_LOG_MARK, "queue-wait returned non-success %d", retval);
388                         break;
389                 }
390
391                 /* see if the client disconnected from us */
392                 if(session->state != OSRF_SESSION_CONNECTED)
393                         break;
394
395                 /* if no data was reveived within the timeout interval */
396                 if( !recvd && (end - start) >= keepalive ) {
397                         osrfLogInfo(OSRF_LOG_MARK,
398                                         "No request was received in %d seconds, exiting stateful session", keepalive);
399                         osrfAppSessionStatus(
400                                         session,
401                                         OSRF_STATUS_TIMEOUT,
402                                         "osrfConnectStatus",
403                                         0, "Disconnected on timeout" );
404
405                         break;
406                 }
407         }
408
409         osrfLogDebug( OSRF_LOG_MARK, "Exiting keepalive loop for session %s", session->session_id );
410         osrfAppSessionFree( session );
411         return;
412 }
413
414 /**
415         @brief Partially initialize a prefork_simple provided by the caller.
416         @param prefork Pointer to a a raw prefork_simple to be initialized.
417         @param client Pointer to a transport_client (connection to Jabber).
418         @param max_requests The maximum number of requests that a child process may service
419                         before terminating.
420         @param min_children Minimum number of child processes to maintain.
421         @param max_children Maximum number of child processes to maintain.
422         @return 0 if successful, or 1 if not (due to invalid parameters).
423 */
424 static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
425                 int max_requests, int min_children, int max_children ) {
426
427         if( min_children > max_children ) {
428                 osrfLogError( OSRF_LOG_MARK,  "min_children (%d) is greater "
429                                 "than max_children (%d)", min_children, max_children );
430                 return 1;
431         }
432
433         if( max_children > ABS_MAX_CHILDREN ) {
434                 osrfLogError( OSRF_LOG_MARK,  "max_children (%d) is greater than ABS_MAX_CHILDREN (%d)",
435                                 max_children, ABS_MAX_CHILDREN );
436                 return 1;
437         }
438
439         osrfLogInfo(OSRF_LOG_MARK, "Prefork launching child with max_request=%d,"
440                 "min_children=%d, max_children=%d", max_requests, min_children, max_children );
441
442         /* flesh out the struct */
443         prefork->max_requests = max_requests;
444         prefork->min_children = min_children;
445         prefork->max_children = max_children;
446         prefork->fd           = 0;
447         prefork->data_to_child = 0;
448         prefork->data_to_parent = 0;
449         prefork->current_num_children = 0;
450         prefork->keepalive    = 0;
451         prefork->appname      = NULL;
452         prefork->first_child  = NULL;
453         prefork->idle_list    = NULL;
454         prefork->free_list    = NULL;
455         prefork->connection   = client;
456
457         return 0;
458 }
459
460 /**
461         @brief Spawn a new child process and put it in the idle list.
462         @param forker Pointer to the prefork_simple that will own the process.
463         @return Pointer to the new prefork_child, or not at all.
464
465         Spawn a new child process.  Create a prefork_child for it and put it in the idle list.
466
467         After forking, the parent returns a pointer to the new prefork_child.  The child
468         services its quota of requests and then terminates without returning.
469 */
470 static prefork_child* launch_child( prefork_simple* forker ) {
471
472         pid_t pid;
473         int data_fd[2];
474         int status_fd[2];
475
476         // Set up the data and status pipes
477         if( pipe(data_fd) < 0 ) { /* build the data pipe*/
478                 osrfLogError( OSRF_LOG_MARK,  "Pipe making error" );
479                 return NULL;
480         }
481
482         if( pipe(status_fd) < 0 ) {/* build the status pipe */
483                 osrfLogError( OSRF_LOG_MARK,  "Pipe making error" );
484                 close( data_fd[1] );
485                 close( data_fd[0] );
486                 return NULL;
487         }
488
489         osrfLogInternal( OSRF_LOG_MARK, "Pipes: %d %d %d %d",
490                         data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
491
492         // Create and initialize a prefork_child for the new process
493         prefork_child* child = prefork_child_init( forker, data_fd[0],
494                         data_fd[1], status_fd[0], status_fd[1] );
495
496         if( (pid=fork()) < 0 ) {
497                 osrfLogError( OSRF_LOG_MARK, "Forking Error" );
498                 prefork_child_free( forker, child );
499                 return NULL;
500         }
501
502         // Add the new child to the head of the idle list
503         child->next = forker->idle_list;
504         forker->idle_list = child;
505
506         if( pid > 0 ) {  /* parent */
507
508                 signal(SIGCHLD, sigchld_handler);
509                 (forker->current_num_children)++;
510                 child->pid = pid;
511
512                 osrfLogDebug( OSRF_LOG_MARK, "Parent launched %d", pid );
513                 /* *no* child pipe FD's can be closed or the parent will re-use fd's that
514                         the children are currently using */
515                 return child;
516         }
517
518         else { /* child */
519
520                 osrfLogInternal( OSRF_LOG_MARK,
521                                 "I am new child with read_data_fd = %d and write_status_fd = %d",
522                                 child->read_data_fd, child->write_status_fd );
523
524                 child->pid = getpid();
525                 close( child->write_data_fd );
526                 close( child->read_status_fd );
527
528                 /* do the initing */
529                 if( prefork_child_init_hook(child) == -1 ) {
530                         osrfLogError(OSRF_LOG_MARK,
531                                 "Forker child going away because we could not connect to OpenSRF...");
532                         osrf_prefork_child_exit(child);
533                 }
534
535                 prefork_child_wait( child );      // Should exit without returning
536                 osrf_prefork_child_exit( child ); // Just to be sure
537                 return NULL;  // Unreachable, but it keeps the compiler happy
538         }
539 }
540
541 /**
542         @brief Terminate a child process.
543         @param child Pointer to the prefork_child representing the child process.
544
545         Called only by child processes.  Dynamically call an application-specific shutdown
546         function from a previously loaded shared library; then exit.
547 */
548 static void osrf_prefork_child_exit(prefork_child* child) {
549         osrfAppRunExitCode();
550         exit(0);
551 }
552
553 /**
554         @brief Launch all the child processes, putting them in the idle list.
555         @param forker Pointer to the prefork_simple that will own the children.
556
557         Called only by the parent process (in order to become a parent).
558 */
559 static void prefork_launch_children( prefork_simple* forker ) {
560         if(!forker) return;
561         int c = 0;
562         while( c++ < forker->min_children )
563                 launch_child( forker );
564 }
565
566 /**
567         @brief Signal handler for SIGCHLD: note that a child process has terminated.
568         @param sig The value of the trapped signal; always SIGCHLD.
569
570         Set a boolean to be checked later.
571 */
572 static void sigchld_handler( int sig ) {
573         signal(SIGCHLD, sigchld_handler);
574         child_dead = 1;
575 }
576
577 /**
578         @brief Replenish the collection of child processes, after one has terminated.
579         @param forker Pointer to the prefork_simple that manages the child processes.
580
581         The parent calls this function when it notices (via a signal handler) that
582         a child process has died.
583
584         Wait on the dead children so that they won't be zombies.  Spawn new ones as needed
585         to maintain at least a minimum number.
586 */
587 void reap_children( prefork_simple* forker ) {
588
589         pid_t child_pid;
590
591         // Reset our boolean so that we can detect any further terminations.
592         child_dead = 0;
593
594         // Bury the children so that they won't be zombies.  WNOHANG means that waitpid() returns
595         // immediately if there are no waitable children, instead of waiting for more to die.
596         // Ignore the return code of the child.  We don't do an autopsy.
597         while( (child_pid = waitpid(-1, NULL, WNOHANG)) > 0) {
598                 --forker->current_num_children;
599                 del_prefork_child( forker, child_pid );
600         }
601
602         // Spawn more children as needed.
603         while( forker->current_num_children < forker->min_children )
604                 launch_child( forker );
605 }
606
607 /**
608         @brief Read transport_messages and dispatch them to child processes for servicing.
609         @param forker Pointer to the prefork_simple that manages the child processes.
610
611         This is the main loop of the parent process, and once entered, does not exit.
612
613         For each usable transport_message received: look for an idle child to service it.  If
614         no idle children are available, either spawn a new one or, if we've already spawned the
615         maximum number of children, wait for one to become available.  Once a child is available
616         by whatever means, write an XML version of the input message, to a pipe designated for
617         use by that child.
618 */
619 static void prefork_run( prefork_simple* forker ) {
620
621         if( NULL == forker->idle_list )
622                 return;   // No available children, and we haven't even started yet
623
624         transport_message* cur_msg = NULL;
625
626         while(1) {
627
628                 if( forker->first_child == NULL && forker->idle_list == NULL ) {/* no more children */
629                         osrfLogWarning( OSRF_LOG_MARK, "No more children..." );
630                         return;
631                 }
632
633                 // Wait indefinitely for an input message
634                 osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data...");
635                 cur_msg = client_recv( forker->connection, -1 );
636
637                 if( cur_msg == NULL )
638                         continue;           // Error?  Interrupted by a signal?  Try again...
639
640                 message_prepare_xml( cur_msg );
641                 const char* msg_data = cur_msg->msg_xml;
642                 if( ! msg_data || ! *msg_data ) {
643                         osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %",
644                                         (msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread );
645                         message_free( cur_msg );
646                         continue;       // Message not usable; go on to the next one.
647                 }
648
649                 int honored = 0;     /* will be set to true when we service the request */
650                 int no_recheck = 0;
651
652                 while( ! honored ) {
653
654                         if(!no_recheck)
655                                 check_children( forker, 0 );
656                         no_recheck = 0;
657
658                         osrfLogDebug( OSRF_LOG_MARK, "Server received inbound data" );
659
660                         prefork_child* cur_child = NULL;
661
662                         // Look for an available child in the idle list.  Since the idle list operates
663                         // as a stack, the child we get is the one that was most recently active, or
664                         // most recently spawned.  That means it's the one most likely still to be in
665                         // physical memory, and the one least likely to have to be swapped in.
666                         while( forker->idle_list ) {
667
668                                 osrfLogInfo( OSRF_LOG_MARK, "Looking for idle child" );
669                                 // Grab the prefork_child at the head of the idle list
670                                 cur_child = forker->idle_list;
671                                 forker->idle_list = cur_child->next;
672                                 cur_child->next = NULL;
673
674                                 osrfLogInternal( OSRF_LOG_MARK,
675                                                 "Searching for available child. cur_child->pid = %d", cur_child->pid );
676                                 osrfLogInternal( OSRF_LOG_MARK, "Current num children %d",
677                                                 forker->current_num_children );
678
679                                 osrfLogDebug( OSRF_LOG_MARK, "forker sending data to %d", cur_child->pid );
680                                 osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d",
681                                                 cur_child->write_data_fd );
682
683                                 int written = write(cur_child->write_data_fd, msg_data, strlen(msg_data) + 1);
684                                 if( written < 0 ) {
685                                         // This child appears to be dead or unusable.  Discard it.
686                                         osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d: %s",
687                                                         errno, strerror( errno ) );
688                                         kill( cur_child->pid, SIGKILL );
689                                         del_prefork_child( forker, cur_child->pid );
690                                         continue;
691                                 }
692
693                                 add_prefork_child( forker, cur_child );  // Add it to active list
694                                 honored = 1;
695                                 break;
696                         }
697
698                         /* if none available, add a new child if we can */
699                         if( ! honored ) {
700                                 osrfLogDebug( OSRF_LOG_MARK, "Not enough children, attempting to add...");
701
702                                 if( forker->current_num_children < forker->max_children ) {
703                                         osrfLogDebug( OSRF_LOG_MARK,  "Launching new child with current_num = %d",
704                                                         forker->current_num_children );
705
706                                         launch_child( forker );  // Put a new child into the idle list
707                                         if( forker->idle_list ) {
708
709                                                 // Take the new child from the idle list
710                                                 prefork_child* new_child = forker->idle_list;
711                                                 forker->idle_list = new_child->next;
712                                                 new_child->next = NULL;
713
714                                                 osrfLogDebug( OSRF_LOG_MARK, "Writing to new child fd %d : pid %d",
715                                                                 new_child->write_data_fd, new_child->pid );
716
717                                                 int written = write(
718                                                                 new_child->write_data_fd, msg_data, strlen(msg_data) + 1);
719                                                 if( written < 0 ) {
720                                                         // This child appears to be dead or unusable.  Discard it.
721                                                         osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d: %s",
722                                                                                         errno, strerror( errno ) );
723                                                         kill( cur_child->pid, SIGKILL );
724                                                         del_prefork_child( forker, cur_child->pid );
725                                                 } else {
726                                                         add_prefork_child( forker, new_child );
727                                                         honored = 1;
728                                                 }
729                                         }
730
731                                 }
732                         }
733
734                         if( !honored ) {
735                                 osrfLogWarning( OSRF_LOG_MARK, "No children available, waiting...");
736                                 check_children( forker, 1 );
737                                 // Tell the loop not to call check_children again, since we're calling it now
738                                 no_recheck = 1;
739                         }
740
741                         if( child_dead )
742                                 reap_children(forker);
743
744                 } // end while( ! honored )
745
746                 message_free( cur_msg );
747
748         } /* end top level listen loop */
749 }
750
751
752 /* XXX Add a flag which tells select() to wait forever on children
753         in the best case, this will be faster than calling usleep(x), and
754         in the worst case it won't be slower and will do less logging...
755 */
756 /**
757         @brief See if any children have become available.
758         @param forker Pointer to the prefork_simple that owns the children.
759         @param forever Boolean: true if we should wait indefinitely.
760         @return 
761
762         Call select() for all the children in the active list.  Read each active file
763         descriptor and move the corresponding child to the idle list.
764
765         If @a forever is true, wait indefinitely for input.  Otherwise return immediately if
766         there are no active file descriptors.
767 */
768 static void check_children( prefork_simple* forker, int forever ) {
769
770         if( child_dead )
771                 reap_children( forker );
772
773         if( NULL == forker->first_child ) {
774                 // If forever is true, then we're here because we've run out of idle
775                 // processes, so there should be some active ones around.
776                 // If forever is false, then the children may all be idle, and that's okay.
777                 if( forever )
778                         osrfLogError( OSRF_LOG_MARK, "No active child processes to check" );
779                 return;
780         }
781
782         int select_ret;
783         fd_set read_set;
784         FD_ZERO(&read_set);
785         int max_fd = 0;
786         int n;
787
788         // Prepare to select() on pipes from all the active children
789         prefork_child* cur_child = forker->first_child;
790         do {
791                 if( cur_child->read_status_fd > max_fd )
792                         max_fd = cur_child->read_status_fd;
793                 FD_SET( cur_child->read_status_fd, &read_set );
794                 cur_child = cur_child->next;
795         } while( cur_child != forker->first_child );
796
797         FD_CLR(0,&read_set); /* just to be sure */
798
799         if( forever ) {
800                 osrfLogWarning(OSRF_LOG_MARK,
801                                 "We have no children available - waiting for one to show up...");
802
803                 if( (select_ret=select( max_fd + 1, &read_set, NULL, NULL, NULL)) == -1 ) {
804                         osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children: %s",
805                                         errno, strerror( errno ) );
806                 }
807                 osrfLogInfo(OSRF_LOG_MARK,
808                                 "select() completed after waiting on children to become available");
809
810         } else {
811
812                 struct timeval tv;
813                 tv.tv_sec   = 0;
814                 tv.tv_usec  = 0;
815
816                 if( (select_ret=select( max_fd + 1, &read_set, NULL, NULL, &tv)) == -1 ) {
817                         osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children: %s",
818                                         errno, strerror( errno ) );
819                 }
820         }
821
822         if( select_ret == 0 )
823                 return;
824
825         // Check each child in the active list.
826         // If it has responded, move it to the idle list.
827         cur_child = forker->first_child;
828         prefork_child* next_child = NULL;
829         int num_handled = 0;
830         do {
831                 next_child = cur_child->next;
832                 if( FD_ISSET( cur_child->read_status_fd, &read_set ) ) {
833                         osrfLogDebug( OSRF_LOG_MARK,
834                                         "Server received status from a child %d", cur_child->pid );
835
836                         num_handled++;
837
838                         /* now suck off the data */
839                         char buf[64];
840                         if( (n=read(cur_child->read_status_fd, buf, sizeof(buf) - 1)) < 0 ) {
841                                 osrfLogWarning( OSRF_LOG_MARK,
842                                                 "Read error after select in child status read with errno %d: %s",
843                                                 errno, strerror( errno ) );
844                         }
845                         else {
846                                 buf[n] = '\0';
847                                 osrfLogDebug( OSRF_LOG_MARK,  "Read %d bytes from status buffer: %s", n, buf );
848                         }
849
850                         // Remove the child from the active list
851                         if( forker->first_child == cur_child ) {
852                                 if( cur_child->next == cur_child )
853                                         forker->first_child = NULL;   // only child in the active list
854                                 else
855                                         forker->first_child = cur_child->next;
856                         }
857                         cur_child->next->prev = cur_child->prev;
858                         cur_child->prev->next = cur_child->next;
859
860                         // Add it to the idle list
861                         cur_child->prev = NULL;
862                         cur_child->next = forker->idle_list;
863                         forker->idle_list = cur_child;
864                 }
865                 cur_child = next_child;
866         } while( forker->first_child && forker->first_child != next_child );
867 }
868
869 /**
870         @brief Service up a set maximum number of requests; then shut down.
871         @param child Pointer to the prefork_child representing the child process.
872
873         Called only by child process.
874
875         Enter a loop, for up to max_requests iterations.  On each iteration:
876         - Wait indefinitely for a request from the parent.
877         - Service the request.
878         - Increment a counter.  If the limit hasn't been reached, notify the parent that you
879         are available for another request.
880
881         After exiting the loop, shut down and terminate the process.
882 */
883 static void prefork_child_wait( prefork_child* child ) {
884
885         int i,n;
886         growing_buffer* gbuf = buffer_init( READ_BUFSIZE );
887         char buf[READ_BUFSIZE];
888
889         for( i = 0; i < child->max_requests; i++ ) {
890
891                 n = -1;
892                 int gotdata = 0;    // boolean; set to true if we get data
893                 clr_fl(child->read_data_fd, O_NONBLOCK );
894
895                 while( (n=read(child->read_data_fd, buf, READ_BUFSIZE-1)) > 0 ) {
896                         buf[n] = '\0';
897                         osrfLogDebug(OSRF_LOG_MARK, "Prefork child read %d bytes of data", n);
898                         if(!gotdata) {
899                                 set_fl(child->read_data_fd, O_NONBLOCK );
900                                 gotdata = 1;
901                         }
902                         buffer_add( gbuf, buf );
903                 }
904
905                 if( errno == EAGAIN ) n = 0;
906
907                 if( errno == EPIPE ) {
908                         osrfLogDebug(OSRF_LOG_MARK, "C child attempted read on broken pipe, exiting...");
909                         break;
910                 }
911
912                 if( n < 0 ) {
913                         osrfLogWarning( OSRF_LOG_MARK,
914                                         "Prefork child read returned error with errno %d", errno );
915                         break;
916
917                 } else if( gotdata ) {
918                         osrfLogDebug(OSRF_LOG_MARK, "Prefork child got a request.. processing..");
919                         prefork_child_process_request(child, gbuf->buf);
920                         buffer_reset( gbuf );
921                 }
922
923                 if( i < child->max_requests - 1 ) {
924                         size_t msg_len = 9;
925                         ssize_t len = write(
926                                 child->write_status_fd, "available" /*less than 64 bytes*/, msg_len );
927                         if( len != msg_len ) {
928                                 osrfLogError( OSRF_LOG_MARK, 
929                                         "Drone terminating: unable to notify listener of availability: %s",
930                                         strerror( errno ));
931                                 buffer_free(gbuf);
932                                 osrf_prefork_child_exit(child);
933                         }
934                 }
935         }
936
937         buffer_free(gbuf);
938
939         osrfLogDebug( OSRF_LOG_MARK, "Child with max-requests=%d, num-served=%d exiting...[%ld]",
940                         child->max_requests, i, (long) getpid() );
941
942         osrf_prefork_child_exit(child);
943 }
944
945 /**
946         @brief Add a prefork_child to the end of the active list.
947         @param forker Pointer to the prefork_simple that owns the list.
948         @param child Pointer to the prefork_child to be added.
949 */
950 static void add_prefork_child( prefork_simple* forker, prefork_child* child ) {
951
952         if( forker->first_child == NULL ) {
953                 // Simplest case: list is initially empty.
954                 forker->first_child = child;
955                 child->next = child;
956                 child->prev = child;
957         } else {
958                 // Find the last node in the circular list.
959                 prefork_child* last_child = forker->first_child->prev;
960
961                 // Insert the new child between the last and first children.
962                 last_child->next = child;
963                 child->prev      = last_child;
964                 child->next      = forker->first_child;
965                 forker->first_child->prev = child;
966         }
967 }
968
969 static void del_prefork_child( prefork_simple* forker, pid_t pid ) {
970
971         osrfLogDebug( OSRF_LOG_MARK, "Deleting Child: %d", pid );
972
973         prefork_child* cur_child = NULL;
974         
975         // Look first in the active list
976         if( forker->first_child ) {
977                 cur_child = forker->first_child; /* current pointer */
978                 while( cur_child->pid != pid && cur_child->next != forker->first_child )
979                         cur_child = cur_child->next;
980
981                 if( cur_child->pid == pid ) {
982                         // We found the right node.  Remove it from the list.
983                         if( cur_child->next == cur_child )
984                                 forker->first_child = NULL;    // only child in the list
985                         else {
986                                 if( forker->first_child == cur_child )
987                                         forker->first_child = cur_child->next;  // Reseat forker->first_child
988
989                                 // Stitch the adjacent nodes together
990                                 cur_child->prev->next = cur_child->next;
991                                 cur_child->next->prev = cur_child->prev;
992                         }
993                 } else
994                         cur_child = NULL;  // Didn't find it in the active list
995         }
996
997         if( ! cur_child ) {
998                 // Maybe it's in the idle list.  This can happen if, for example,
999                 // a child is killed by a signal while it's between requests.
1000
1001                 prefork_child* prev = NULL;
1002                 cur_child = forker->idle_list;
1003                 while( cur_child && cur_child->pid != pid ) {
1004                         prev = cur_child;
1005                         cur_child = cur_child->next;
1006                 }
1007
1008                 if( cur_child ) {
1009                         // Detach from the list
1010                         if( prev )
1011                                 prev->next = cur_child->next;
1012                         else
1013                                 forker->idle_list = cur_child->next;
1014                 } // else we can't find it
1015         }
1016
1017         // If we found the node, destroy it.
1018         if( cur_child )
1019                 prefork_child_free( forker, cur_child );
1020 }
1021
1022 /**
1023         @brief Create and initialize a prefork_child.
1024         @param forker Pointer to the prefork_simple that will own the prefork_child.
1025         @param read_data_fd Used by child to read request from parent.
1026         @param write_data_fd Used by parent to write request to child.
1027         @param read_status_fd Used by parent to read status from child.
1028         @param write_status_fd Used by child to write status to parent.
1029         @return Pointer to the newly created prefork_child.
1030
1031         The calling code is responsible for freeing the prefork_child by calling
1032         prefork_child_free().
1033 */
1034 static prefork_child* prefork_child_init( prefork_simple* forker,
1035         int read_data_fd, int write_data_fd,
1036         int read_status_fd, int write_status_fd ) {
1037
1038         // Allocate a prefork_child -- from the free list if possible, or from
1039         // the heap if necessary.  The free list is a non-circular, singly-linked list.
1040         prefork_child* child;
1041         if( forker->free_list ) {
1042                 child = forker->free_list;
1043                 forker->free_list = child->next;
1044         } else
1045                 child = safe_malloc(sizeof(prefork_child));
1046
1047         child->pid              = 0;
1048         child->read_data_fd     = read_data_fd;
1049         child->write_data_fd    = write_data_fd;
1050         child->read_status_fd   = read_status_fd;
1051         child->write_status_fd  = write_status_fd;
1052         child->max_requests     = forker->max_requests;
1053         child->appname          = forker->appname;  // We don't make a separate copy
1054         child->keepalive        = forker->keepalive;
1055         child->next             = NULL;
1056         child->prev             = NULL;
1057
1058         return child;
1059 }
1060
1061 /**
1062         @brief Terminate all child processes and clear out a prefork_simple.
1063         @param prefork Pointer to the prefork_simple to be cleared out.
1064
1065         We do not deallocate the prefork_simple itself, just its contents.
1066 */
1067 static void prefork_clear( prefork_simple* prefork ) {
1068
1069         // Kill all the active children, and move their prefork_child nodes to the free list.
1070         while( prefork->first_child ) {
1071                 kill( prefork->first_child->pid, SIGKILL );
1072                 del_prefork_child( prefork, prefork->first_child->pid );
1073         }
1074
1075         // Kill all the idle prefork children, close their file
1076         // descriptors, and move them to the free list.
1077         prefork_child* child = prefork->idle_list;
1078         prefork->idle_list = NULL;
1079         while( child ) {
1080                 prefork_child* temp = child->next;
1081                 kill( child->pid, SIGKILL );
1082                 prefork_child_free( prefork, child );
1083                 child = temp;
1084         }
1085         //prefork->current_num_children = 0;
1086
1087         // Physically free the free list of prefork_children.
1088         child = prefork->free_list;
1089         prefork->free_list = NULL;
1090         while( child ) {
1091                 prefork_child* temp = child->next;
1092                 free( child );
1093                 child = temp;
1094         }
1095
1096         // Close the Jabber connection
1097         client_free( prefork->connection );
1098         prefork->connection = NULL;
1099
1100         // After giving the child processes a second to terminate, wait on them so that they
1101         // don't become zombies.  We don't wait indefinitely, so it's possible that some
1102         // children will survive a bit longer.
1103         sleep( 1 );
1104         while( (waitpid(-1, NULL, WNOHANG)) > 0) {
1105                 --prefork->current_num_children;
1106         }
1107
1108         free(prefork->appname);
1109         prefork->appname = NULL;
1110 }
1111
1112 /**
1113         @brief Destroy and deallocate a prefork_child.
1114         @param forker Pointer to the prefork_simple that owns the prefork_child.
1115         @param child Pointer to the prefork_child to be destroyed.
1116 */
1117 static void prefork_child_free( prefork_simple* forker, prefork_child* child ) {
1118         close( child->read_data_fd );
1119         close( child->write_data_fd );
1120         close( child->read_status_fd );
1121         close( child->write_status_fd );
1122
1123         // Stick the prefork_child in a free list for potential reuse.  This is a
1124         // non-circular, singly linked list.
1125         child->prev = NULL;
1126         child->next = forker->free_list;
1127         forker->free_list = child;
1128 }