]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/libopensrf/osrf_prefork.c
e6f28b284b11c6ed99bbf9c6d244c5f0ea05fcdf
[OpenSRF.git] / src / libopensrf / osrf_prefork.c
1 /**
2         @file osrf_prefork.c
3         @brief Implementation of a prefork server: a parent process that farms requests out to child processes.
4
5         Spawn a collection of child processes, replacing them as needed.  Forward requests to them
6         and let the children do the work.
7
8         Each child processes some maximum number of requests before it terminates itself.  When a
9         child dies, either deliberately or otherwise, we can spawn another one to replace it,
10         keeping the number of children within a predefined range.
11
12         Use a singly-linked circular list to keep track of the children, forwarding requests to them
13         in an approximately round-robin fashion.
14
15         For each child, set up two pipes:
16         - One for the parent to send requests to the child.
17         - One for the child to notify the parent that it is available for another request.
18
19         The message sent to the child is an XML stanza as received from Jabber.
20
21         When the child finishes processing the request, it writes the string "available" back
22         to the parent.  Then the parent knows that it can send that child another request.
23 */
24
25 #include <signal.h>
26 #include <sys/types.h>
27 #include <sys/time.h>
28 #include <unistd.h>
29 #include <stdlib.h>
30 #include <stdio.h>
31 #include <string.h>
32 #include <sys/select.h>
33 #include <sys/wait.h>
34
35 #include "opensrf/utils.h"
36 #include "opensrf/log.h"
37 #include "opensrf/transport_client.h"
38 #include "opensrf/osrf_stack.h"
39 #include "opensrf/osrf_settings.h"
40 #include "opensrf/osrf_application.h"
41
42 #define READ_BUFSIZE 1024
43 #define ABS_MAX_CHILDREN 256
44
45 typedef struct {
46         int max_requests;     /**< How many requests a child processes before terminating. */
47         int min_children;     /**< Minimum number of children to maintain. */
48         int max_children;     /**< Maximum number of children to maintain. */
49         int fd;               /**< Unused. */
50         int data_to_child;    /**< Unused. */
51         int data_to_parent;   /**< Unused. */
52         int current_num_children;   /**< How many children are currently on the list. */
53         int keepalive;        /**< Keepalive time for stateful sessions. */
54         char* appname;        /**< Name of the application.
55         /** Points to a circular linked list of children */
56         struct prefork_child_struct* first_child;
57         transport_client* connection;  /**< Connection to Jabber */
58 } prefork_simple;
59
/**
	@brief One node in the list of children: a child process, its pipes, and its settings.

	Two pipes connect the parent and the child.  Requests travel from write_data_fd to
	read_data_fd; availability notices travel from write_status_fd to read_status_fd.
*/
typedef struct prefork_child_struct {
	pid_t pid;            /**< Process ID of the child */
	int read_data_fd;     /**< The child reads requests from here */
	int write_data_fd;    /**< The parent writes requests here */
	int read_status_fd;   /**< The parent reads here to learn that the child is available */
	int write_status_fd;  /**< The child writes here when it is available again */
	int available;        /**< Boolean; true when the child is between requests */
	int max_requests;     /**< How many requests the child may process before terminating */
	char* appname;        /**< Name of the application */
	int keepalive;        /**< Keepalive time for stateful sessions. */
	struct prefork_child_struct* next;  /**< Next child on the linked list */
} prefork_child;
74
75 static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
76         int max_requests, int min_children, int max_children );
77 static prefork_child* launch_child( prefork_simple* forker );
78 static void prefork_launch_children( prefork_simple* forker );
79 static void prefork_run(prefork_simple* forker);
80 static void add_prefork_child( prefork_simple* forker, prefork_child* child );
81
82 static void del_prefork_child( prefork_simple* forker, pid_t pid );
83 static void check_children( prefork_simple* forker, int forever );
84 static void prefork_child_process_request(prefork_child*, char* data);
85 static int prefork_child_init_hook(prefork_child*);
86 static prefork_child* prefork_child_init(
87                 int max_requests, int read_data_fd, int write_data_fd,
88                 int read_status_fd, int write_status_fd );
89
90 /* listens on the 'data_to_child' fd and wait for incoming data */
91 static void prefork_child_wait( prefork_child* child );
92 static void prefork_clear( prefork_simple* );
93 static void prefork_child_free( prefork_child* );
94 static void osrf_prefork_register_routers( const char* appname );
95 static void osrf_prefork_child_exit( prefork_child* );
96
97 /** Boolean.  Set to true by a signal handler when it traps SIGCHLD. */
98 static volatile sig_atomic_t child_dead;
99
100 static void sigchld_handler( int sig );
101
102 int osrf_prefork_run(const char* appname) {
103
104         if(!appname) {
105                 osrfLogError( OSRF_LOG_MARK, "osrf_prefork_run requires an appname to run!");
106                 return -1;
107         }
108
109         set_proc_title( "OpenSRF Listener [%s]", appname );
110
111         int maxr = 1000;
112         int maxc = 10;
113         int minc = 3;
114         int kalive = 5;
115
116         osrfLogInfo( OSRF_LOG_MARK, "Loading config in osrf_forker for app %s", appname);
117
118         char* max_req      = osrf_settings_host_value("/apps/%s/unix_config/max_requests", appname);
119         char* min_children = osrf_settings_host_value("/apps/%s/unix_config/min_children", appname);
120         char* max_children = osrf_settings_host_value("/apps/%s/unix_config/max_children", appname);
121         char* keepalive    = osrf_settings_host_value("/apps/%s/keepalive", appname);
122
123         if(!keepalive) osrfLogWarning( OSRF_LOG_MARK, "Keepalive is not defined, assuming %d", kalive);
124         else kalive = atoi(keepalive);
125
126         if(!max_req) osrfLogWarning( OSRF_LOG_MARK, "Max requests not defined, assuming %d", maxr);
127         else maxr = atoi(max_req);
128
129         if(!min_children) osrfLogWarning( OSRF_LOG_MARK,
130                         "Min children not defined, assuming %d", minc);
131         else minc = atoi(min_children);
132
133         if(!max_children) osrfLogWarning( OSRF_LOG_MARK,
134                         "Max children not defined, assuming %d", maxc);
135         else maxc = atoi(max_children);
136
137         free(keepalive);
138         free(max_req);
139         free(min_children);
140         free(max_children);
141         /* --------------------------------------------------- */
142
143         char* resc = va_list_to_string("%s_listener", appname);
144
145         if(!osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
146                 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()");
147                 free(resc);
148                 return -1;
149         }
150
151         free(resc);
152
153         prefork_simple forker;
154
155         if( prefork_simple_init( &forker, osrfSystemGetTransportClient(), maxr, minc, maxc ) ) {
156                 osrfLogError( OSRF_LOG_MARK,
157                                 "osrf_prefork_run() failed to create prefork_simple object" );
158                 return -1;
159         }
160
161         // Finish initializing the prefork_simple
162         forker.appname   = strdup(appname);
163         forker.keepalive = kalive;
164
165         // Spawn the children
166         prefork_launch_children( &forker );
167
168         // Tell the router that you're open for business
169         osrf_prefork_register_routers(appname);
170
171         // Sit back and let the requests roll in
172         osrfLogInfo( OSRF_LOG_MARK, "Launching osrf_forker for app %s", appname);
173         prefork_run( &forker );
174
175         osrfLogWarning( OSRF_LOG_MARK, "prefork_run() returned - how??");
176         prefork_clear( &forker );
177         return 0;
178 }
179
180 /**
181         @brief Register the application with a specified router.
182         @param appname Name of the application.
183         @param routerName Name of the router.
184         @param routerDomain Domain of the router.
185
186         Tell the router that you're open for business so that it can route requests to you.
187 */
188 static void osrf_prefork_send_router_registration(
189                 const char* appname, const char* routerName, const char* routerDomain ) {
190         // Get a pointer to the global transport_client
191         transport_client* client = osrfSystemGetTransportClient();
192
193         // Construct the Jabber address of the router
194         char* jid = va_list_to_string( "%s@%s/router", routerName, routerDomain );
195         osrfLogInfo( OSRF_LOG_MARK, "%s registering with router %s", appname, jid );
196
197         // Create the registration message, and send it
198         transport_message* msg = message_init( "registering", NULL, NULL, jid, NULL );
199         message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
200         client_send_message( client, msg );
201
202         // Clean up
203         message_free( msg );
204         free(jid);
205 }
206
207 /** parses a single "complex" router configuration chunk */
208 static void osrf_prefork_parse_router_chunk(const char* appname, jsonObject* routerChunk) {
209
210         const char* routerName = jsonObjectGetString(jsonObjectGetKeyConst(routerChunk, "name"));
211         const char* domain = jsonObjectGetString(jsonObjectGetKeyConst(routerChunk, "domain"));
212         const jsonObject* services = jsonObjectGetKeyConst(routerChunk, "services");
213         osrfLogDebug(OSRF_LOG_MARK, "found router config with domain %s and name %s",
214                         routerName, domain);
215
216         if( services && services->type == JSON_HASH ) {
217                 osrfLogDebug(OSRF_LOG_MARK, "investigating router information...");
218                 const jsonObject* service_obj = jsonObjectGetKeyConst(services, "service");
219                 if( !service_obj )
220                         ;    // do nothing (shouldn't happen)
221                 else if( JSON_ARRAY == service_obj->type ) {
222                         int j;
223                         for(j = 0; j < service_obj->size; j++ ) {
224                                 const char* service = jsonObjectGetString(jsonObjectGetIndex(service_obj, j));
225                                 if( service && !strcmp( appname, service ))
226                                         osrf_prefork_send_router_registration(appname, routerName, domain);
227                         }
228                 }
229                 else if( JSON_STRING == service_obj->type ) {
230                         if( !strcmp(appname, jsonObjectGetString( service_obj )) )
231                                 osrf_prefork_send_router_registration(appname, routerName, domain);
232                 }
233         } else {
234                 osrf_prefork_send_router_registration(appname, routerName, domain);
235         }
236 }
237
238 static void osrf_prefork_register_routers( const char* appname ) {
239
240         jsonObject* routerInfo = osrfConfigGetValueObject(NULL, "/routers/router");
241
242         int i;
243         for(i = 0; i < routerInfo->size; i++) {
244                 jsonObject* routerChunk = jsonObjectGetIndex(routerInfo, i);
245
246                 if(routerChunk->type == JSON_STRING) {
247                         /* this accomodates simple router configs */
248                         char* routerName = osrfConfigGetValue( NULL, "/router_name" );
249                         char* domain = osrfConfigGetValue(NULL, "/routers/router");
250                         osrfLogDebug(OSRF_LOG_MARK, "found simple router settings with router name %s",
251                                         routerName);
252                         osrf_prefork_send_router_registration(appname, routerName, domain);
253
254                 } else {
255                         osrf_prefork_parse_router_chunk(appname, routerChunk);
256                 }
257         }
258 }
259
260 static int prefork_child_init_hook(prefork_child* child) {
261
262         if(!child) return -1;
263         osrfLogDebug( OSRF_LOG_MARK, "Child init hook for child %d", child->pid);
264
265         osrfSystemInitCache();
266         char* resc = va_list_to_string("%s_drone", child->appname);
267
268         /* if we're a source-client, tell the logger now that we're a new process*/
269         char* isclient = osrfConfigGetValue(NULL, "/client");
270         if( isclient && !strcasecmp(isclient,"true") )
271                 osrfLogSetIsClient(1);
272         free(isclient);
273
274         /* we want to remove traces of our parents socket connection
275         * so we can have our own */
276         osrfSystemIgnoreTransportClient();
277
278         if(!osrfSystemBootstrapClientResc( NULL, NULL, resc)) {
279                 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()");
280                 free(resc);
281                 return -1;
282         }
283
284         free(resc);
285
286         if( ! osrfAppRunChildInit(child->appname) ) {
287                 osrfLogDebug(OSRF_LOG_MARK, "Prefork child_init succeeded\n");
288         } else {
289                 osrfLogError(OSRF_LOG_MARK, "Prefork child_init failed\n");
290                 return -1;
291         }
292
293         set_proc_title( "OpenSRF Drone [%s]", child->appname );
294         return 0;
295 }
296
297 static void prefork_child_process_request(prefork_child* child, char* data) {
298         if( !child ) return;
299
300         transport_client* client = osrfSystemGetTransportClient();
301
302         if(!client_connected(client)) {
303                 osrfSystemIgnoreTransportClient();
304                 osrfLogWarning(OSRF_LOG_MARK, "Reconnecting child to opensrf after disconnect...");
305                 if(!osrf_system_bootstrap_client(NULL, NULL)) {
306                         osrfLogError( OSRF_LOG_MARK,
307                                 "Unable to bootstrap client in prefork_child_process_request()");
308                         sleep(1);
309                         osrf_prefork_child_exit(child);
310                 }
311         }
312
313         /* construct the message from the xml */
314         transport_message* msg = new_message_from_xml( data );
315
316         osrfAppSession* session = osrf_stack_transport_handler(msg, child->appname);
317         if(!session) return;
318
319         if( session->stateless && session->state != OSRF_SESSION_CONNECTED ) {
320                 osrfAppSessionFree( session );
321                 return;
322         }
323
324         osrfLogDebug( OSRF_LOG_MARK, "Entering keepalive loop for session %s", session->session_id );
325         int keepalive = child->keepalive;
326         int retval;
327         int recvd;
328         time_t start;
329         time_t end;
330
331         while(1) {
332
333                 osrfLogDebug(OSRF_LOG_MARK,
334                                 "osrf_prefork calling queue_wait [%d] in keepalive loop", keepalive);
335                 start   = time(NULL);
336                 retval  = osrf_app_session_queue_wait(session, keepalive, &recvd);
337                 end     = time(NULL);
338
339                 osrfLogDebug(OSRF_LOG_MARK, "Data received == %d", recvd);
340
341                 if(retval) {
342                         osrfLogError(OSRF_LOG_MARK, "queue-wait returned non-success %d", retval);
343                         break;
344                 }
345
346                 /* see if the client disconnected from us */
347                 if(session->state != OSRF_SESSION_CONNECTED)
348                         break;
349
350                 /* if no data was reveived within the timeout interval */
351                 if( !recvd && (end - start) >= keepalive ) {
352                         osrfLogInfo(OSRF_LOG_MARK,
353                                         "No request was received in %d seconds, exiting stateful session", keepalive);
354                         osrfAppSessionStatus(
355                                         session,
356                                         OSRF_STATUS_TIMEOUT,
357                                         "osrfConnectStatus",
358                                         0, "Disconnected on timeout" );
359
360                         break;
361                 }
362         }
363
364         osrfLogDebug( OSRF_LOG_MARK, "Exiting keepalive loop for session %s", session->session_id );
365         osrfAppSessionFree( session );
366         return;
367 }
368
369
370 /**
371         @brief Partially initialize a prefork_simple provided by the caller.
372         @param prefork Pointer to a a raw prefork_simple to be initialized.
373         @param client Pointer to a transport_client (connection to Jabber).
374         @param max_requests
375         @param min_children Minimum number of child processes to maintain.
376         @param max_children Maximum number of child processes to maintain.
377         @return 0 if successful, or 1 if not (due to invalid parameters).
378 */
379 static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
380                  int max_requests, int min_children, int max_children ) {
381
382         if( min_children > max_children ) {
383                 osrfLogError( OSRF_LOG_MARK,  "min_children (%d) is greater "
384                                 "than max_children (%d)", min_children, max_children );
385                 return 1;
386         }
387
388         if( max_children > ABS_MAX_CHILDREN ) {
389                 osrfLogError( OSRF_LOG_MARK,  "max_children (%d) is greater than ABS_MAX_CHILDREN (%d)",
390                                 max_children, ABS_MAX_CHILDREN );
391                 return 1;
392         }
393
394         osrfLogInfo(OSRF_LOG_MARK, "Prefork launching child with max_request=%d,"
395                 "min_children=%d, max_children=%d", max_requests, min_children, max_children );
396
397         /* flesh out the struct */
398         //prefork_simple* prefork = safe_malloc(sizeof(prefork_simple));
399         prefork->max_requests = max_requests;
400         prefork->min_children = min_children;
401         prefork->max_children = max_children;
402         prefork->fd           = 0;
403         prefork->data_to_child = 0;
404         prefork->data_to_parent = 0;
405         prefork->current_num_children = 0;
406         prefork->keepalive    = 0;
407         prefork->appname      = NULL;
408         prefork->first_child  = NULL;
409         prefork->connection   = client;
410
411         return 0;
412 }
413
414 static prefork_child* launch_child( prefork_simple* forker ) {
415
416         pid_t pid;
417         int data_fd[2];
418         int status_fd[2];
419
420         /* Set up the data pipes and add the child struct to the parent */
421         if( pipe(data_fd) < 0 ) { /* build the data pipe*/
422                 osrfLogError( OSRF_LOG_MARK,  "Pipe making error" );
423                 return NULL;
424         }
425
426         if( pipe(status_fd) < 0 ) {/* build the status pipe */
427                 osrfLogError( OSRF_LOG_MARK,  "Pipe making error" );
428                 return NULL;
429         }
430
431         osrfLogInternal( OSRF_LOG_MARK,  "Pipes: %d %d %d %d",
432                         data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
433         prefork_child* child = prefork_child_init( forker->max_requests, data_fd[0],
434                         data_fd[1], status_fd[0], status_fd[1] );
435
436         child->appname = strdup(forker->appname);
437         child->keepalive = forker->keepalive;
438
439
440         add_prefork_child( forker, child );
441
442         if( (pid=fork()) < 0 ) {
443                 osrfLogError( OSRF_LOG_MARK,  "Forking Error" );
444                 return NULL;
445         }
446
447         if( pid > 0 ) {  /* parent */
448
449                 signal(SIGCHLD, sigchld_handler);
450                 (forker->current_num_children)++;
451                 child->pid = pid;
452
453                 osrfLogDebug( OSRF_LOG_MARK,  "Parent launched %d", pid );
454                 /* *no* child pipe FD's can be closed or the parent will re-use fd's that
455                         the children are currently using */
456                 return child;
457         }
458
459         else { /* child */
460
461                 osrfLogInternal( OSRF_LOG_MARK,
462                                 "I am  new child with read_data_fd = %d and write_status_fd = %d",
463                                 child->read_data_fd, child->write_status_fd );
464
465                 child->pid = getpid();
466                 close( child->write_data_fd );
467                 close( child->read_status_fd );
468
469                 /* do the initing */
470                 if( prefork_child_init_hook(child) == -1 ) {
471                         osrfLogError(OSRF_LOG_MARK,
472                                 "Forker child going away because we could not connect to OpenSRF...");
473                         osrf_prefork_child_exit(child);
474                 }
475
476                 prefork_child_wait( child );
477                 osrf_prefork_child_exit(child); /* just to be sure */
478         }
479         return NULL;
480 }
481
482 static void osrf_prefork_child_exit(prefork_child* child) {
483         osrfAppRunExitCode();
484         exit(0);
485 }
486
487 static void prefork_launch_children( prefork_simple* forker ) {
488         if(!forker) return;
489         int c = 0;
490         while( c++ < forker->min_children )
491                 launch_child( forker );
492 }
493
494
495 /**
496         @brief Signal handler for SIGCHLD: note that a child process has terminated.
497         @param sig The value of the trapped signal; always SIGCHLD.
498
499         Set a boolean to be checked later.
500 */
501 static void sigchld_handler( int sig ) {
502         signal(SIGCHLD, sigchld_handler);
503         child_dead = 1;
504 }
505
506
507 /**
508         @brief Replenish the collection of child processes, after one has terminated.
509         @param forker Pointer to the prefork_simple that manages the child processes.
510
511         This function is called when we notice (via a signal handler) that a child
512         process has died.
513
514         Spawn a new child process to replace each of the terminated ones.
515 */
516 void reap_children( prefork_simple* forker ) {
517
518         pid_t child_pid;
519
520         // Reset our boolean so that we can detect any further terminations.
521         child_dead = 0;
522
523         // Bury the children so that they won't be zombies.  WNOHANG means that waitpid() returns
524         // immediately if there are no waitable children, instead of waiting for more to die.
525         // Ignore the return code of the child.  We don't do an autopsy.
526         while( (child_pid = waitpid(-1, NULL, WNOHANG)) > 0)
527                 del_prefork_child( forker, child_pid );
528
529         /* Spawn more children as needed to maintain a minimum brood. */
530         while( forker->current_num_children < forker->min_children )
531                 launch_child( forker );
532 }
533
534 static void prefork_run(prefork_simple* forker) {
535
536         if( forker->first_child == NULL )
537                 return;
538
539         transport_message* cur_msg = NULL;
540
541
542         while(1) {
543
544                 if( forker->first_child == NULL ) {/* no more children */
545                         osrfLogWarning( OSRF_LOG_MARK, "No more children..." );
546                         return;
547                 }
548
549                 // Wait indefinitely for an input message
550                 osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data...");
551                 cur_msg = client_recv( forker->connection, -1 );
552
553                 if( cur_msg == NULL )
554                         continue;           // Error?  Interrupted by a signal?
555
556                 int honored = 0;     /* will be set to true when we service the request */
557                 int no_recheck = 0;
558
559                 while( ! honored ) {
560
561                         if(!no_recheck)
562                                 check_children( forker, 0 );
563                         no_recheck = 0;
564
565                         osrfLogDebug( OSRF_LOG_MARK, "Server received inbound data" );
566                         int k;
567                         prefork_child* cur_child = forker->first_child;
568
569                         /* Look for an available child */
570                         for( k = 0; k < forker->current_num_children; k++ ) {
571
572                                 osrfLogInternal( OSRF_LOG_MARK,
573                                                 "Searching for available child. cur_child->pid = %d", cur_child->pid );
574                                 osrfLogInternal( OSRF_LOG_MARK, "Current num children %d and loop %d",
575                                                 forker->current_num_children, k);
576
577                                 if( cur_child->available ) {
578                                         osrfLogDebug( OSRF_LOG_MARK, "forker sending data to %d", cur_child->pid );
579
580                                         message_prepare_xml( cur_msg );
581                                         char* data = cur_msg->msg_xml;
582                                         if( ! data || strlen(data) < 1 )
583                                                 break;
584
585                                         cur_child->available = 0;
586                                         osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d",
587                                                         cur_child->write_data_fd );
588
589                                         int written = 0;
590                                         if( (written = write( cur_child->write_data_fd, data, strlen(data) + 1 )) < 0 ) {
591                                                 osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d", errno);
592                                                 cur_child = cur_child->next;
593                                                 continue;
594                                         }
595
596                                         forker->first_child = cur_child->next;
597                                         honored = 1;
598                                         break;
599                                 } else
600                                         cur_child = cur_child->next;
601                         }
602
603                         /* if none available, add a new child if we can */
604                         if( ! honored ) {
605                                 osrfLogDebug( OSRF_LOG_MARK, "Not enough children, attempting to add...");
606
607                                 if( forker->current_num_children < forker->max_children ) {
608                                         osrfLogDebug( OSRF_LOG_MARK,  "Launching new child with current_num = %d",
609                                                         forker->current_num_children );
610
611                                         prefork_child* new_child = launch_child( forker );
612                                         if( new_child ) {
613
614                                                 message_prepare_xml( cur_msg );
615                                                 char* data = cur_msg->msg_xml;
616
617                                                 if( data ) {
618                                                         int len = strlen(data);
619
620                                                         if( len > 0 ) {
621                                                                 new_child->available = 0;
622                                                                 osrfLogDebug( OSRF_LOG_MARK, "Writing to new child fd %d : pid %d",
623                                                                                 new_child->write_data_fd, new_child->pid );
624
625                                                                 if( write( new_child->write_data_fd, data, len + 1 ) >= 0 ) {
626                                                                         forker->first_child = new_child->next;
627                                                                         honored = 1;
628                                                                 }
629                                                         }
630                                                 }
631                                         }
632
633                                 }
634                         }
635
636                         if( !honored ) {
637                                 osrfLogWarning( OSRF_LOG_MARK,  "No children available, waiting...");
638
639                                 check_children( forker, 1 );  /* non-poll version */
640                                 /* tell the loop not to call check_children again, since we're calling it now */
641                                 no_recheck = 1;
642                         }
643
644                         if( child_dead )
645                                 reap_children(forker);
646
647                 } // honored?
648
649                 message_free( cur_msg );
650
651         } /* end top level listen loop */
652
653 }
654
655
656 /** XXX Add a flag which tells select() to wait forever on children
657         in the best case, this will be faster than calling usleep(x), and
658         in the worst case it won't be slower and will do less logging...
659 */
660 static void check_children( prefork_simple* forker, int forever ) {
661
662         //check_begin:
663
664         int select_ret;
665         fd_set read_set;
666         FD_ZERO(&read_set);
667         int max_fd = 0;
668         int n;
669
670         if( child_dead )
671                 reap_children(forker);
672
673         prefork_child* cur_child = forker->first_child;
674
675         int i;
676         for( i = 0; i!= forker->current_num_children; i++ ) {
677
678                 if( cur_child->read_status_fd > max_fd )
679                         max_fd = cur_child->read_status_fd;
680                 FD_SET( cur_child->read_status_fd, &read_set );
681                 cur_child = cur_child->next;
682         }
683
684         FD_CLR(0,&read_set); /* just to be sure */
685
686         if( forever ) {
687                 osrfLogWarning(OSRF_LOG_MARK,
688                                 "We have no children available - waiting for one to show up...");
689
690                 if( (select_ret=select( max_fd + 1 , &read_set, NULL, NULL, NULL)) == -1 ) {
691                         osrfLogWarning( OSRF_LOG_MARK,  "Select returned error %d on check_children", errno );
692                 }
693                 osrfLogInfo(OSRF_LOG_MARK,
694                                 "select() completed after waiting on children to become available");
695
696         } else {
697
698                 struct timeval tv;
699                 tv.tv_sec   = 0;
700                 tv.tv_usec  = 0;
701
702                 if( (select_ret=select( max_fd + 1 , &read_set, NULL, NULL, &tv)) == -1 ) {
703                         osrfLogWarning( OSRF_LOG_MARK,  "Select returned error %d on check_children", errno );
704                 }
705         }
706
707         if( select_ret == 0 )
708                 return;
709
710         /* see if one of a child has told us it's done */
711         cur_child = forker->first_child;
712         int j;
713         int num_handled = 0;
714         for( j = 0; j!= forker->current_num_children && num_handled < select_ret ; j++ ) {
715
716                 if( FD_ISSET( cur_child->read_status_fd, &read_set ) ) {
717                         //printf( "Server received status from a child %d\n", cur_child->pid );
718                         osrfLogDebug( OSRF_LOG_MARK,
719                                         "Server received status from a child %d", cur_child->pid );
720
721                         num_handled++;
722
723                         /* now suck off the data */
724                         char buf[64];
725                         if( (n=read(cur_child->read_status_fd, buf, sizeof(buf) - 1)) < 0 ) {
726                                 osrfLogWarning( OSRF_LOG_MARK,
727                                                 "Read error after select in child status read with errno %d", errno);
728                         }
729                         else {
730                                 buf[n] = '\0';
731                                 osrfLogDebug( OSRF_LOG_MARK,  "Read %d bytes from status buffer: %s", n, buf );
732                         }
733                         cur_child->available = 1;
734                 }
735                 cur_child = cur_child->next;
736         }
737
738 }
739
740
741 static void prefork_child_wait( prefork_child* child ) {
742
743         int i,n;
744         growing_buffer* gbuf = buffer_init( READ_BUFSIZE );
745         char buf[READ_BUFSIZE];
746
747         for( i = 0; i < child->max_requests; i++ ) {
748
749                 n = -1;
750                 int gotdata = 0;    // boolean; set to true if we get data
751                 clr_fl(child->read_data_fd, O_NONBLOCK );
752
753                 while( (n=read(child->read_data_fd, buf, READ_BUFSIZE-1)) > 0 ) {
754                         buf[n] = '\0';
755                         osrfLogDebug(OSRF_LOG_MARK, "Prefork child read %d bytes of data", n);
756                         if(!gotdata) {
757                                 set_fl(child->read_data_fd, O_NONBLOCK );
758                                 gotdata = 1;
759                         }
760                         buffer_add( gbuf, buf );
761                 }
762
763                 if( errno == EAGAIN ) n = 0;
764
765                 if( errno == EPIPE ) {
766                         osrfLogDebug(OSRF_LOG_MARK, "C child attempted read on broken pipe, exiting...");
767                         break;
768                 }
769
770                 if( n < 0 ) {
771                         osrfLogWarning( OSRF_LOG_MARK,
772                                         "Prefork child read returned error with errno %d", errno );
773                         break;
774
775                 } else if( gotdata ) {
776                         osrfLogDebug(OSRF_LOG_MARK, "Prefork child got a request.. processing..");
777                         prefork_child_process_request(child, gbuf->buf);
778                         buffer_reset( gbuf );
779                 }
780
781                 if( i < child->max_requests - 1 )
782                         write( child->write_status_fd, "available" /*less than 64 bytes*/, 9 );
783         }
784
785         buffer_free(gbuf);
786
787         osrfLogDebug( OSRF_LOG_MARK, "Child with max-requests=%d, num-served=%d exiting...[%ld]",
788                         child->max_requests, i, (long) getpid() );
789
790         osrf_prefork_child_exit(child); /* just to be sure */
791 }
792
793 /**
794         @brief Add a prefork_child to the list.
795         @param forker Pointer to the prefork_simple that owns the list.
796         @param child Pointer to the prefork_child to be added.
797 */
798 static void add_prefork_child( prefork_simple* forker, prefork_child* child ) {
799
800         if( forker->first_child == NULL ) {
801                 // Simplest case: list is initially empty.
802                 forker->first_child = child;
803                 child->next = child;
804                 return;
805         }
806
807         // Reposition forker->first_child to the last node by walking around the circle.
808         prefork_child* start_child = forker->first_child;
809         while(1) {
810                 if( forker->first_child->next == start_child )
811                         break;
812                 forker->first_child = forker->first_child->next;
813         }
814
815         // Insert the new child after forker->first_child.
816         forker->first_child->next = child;
817         child->next = start_child;
818         return;
819
820         // What had been the last node in the list is now the first node,
821         // and the new node is second.
822 }
823
824 /**
825         @brief Remove a prefork_child from the child list.
826         @param forker Pointer to the prefork_simple that owns the child.
827         @param pid Process ID of the child to be removed.
828
829         Besides removing the node from the list, we also close its file descriptors.
830 */
831 static void del_prefork_child( prefork_simple* forker, pid_t pid ) {
832
833         if( forker->first_child == NULL ) { return; }
834
835         (forker->current_num_children)--;
836         osrfLogDebug( OSRF_LOG_MARK, "Deleting Child: %d", pid );
837
838         prefork_child* start_child = forker->first_child; /* starting point */
839         prefork_child* cur_child   = start_child; /* current pointer */
840         prefork_child* prev_child  = start_child; /* the trailing pointer */
841
842         /* special case where there is only one in the list */
843         if( start_child == start_child->next ) {
844                 if( start_child->pid == pid ) {
845                         forker->first_child = NULL;
846                         prefork_child_free( start_child );
847                 }
848                 return;
849         }
850
851
852         /* special case where the first item in the list needs to be removed */
853         if( start_child->pid == pid ) {
854
855                 /* find the last one so we can remove the start_child */
856                 do {
857                         prev_child = cur_child;
858                         cur_child = cur_child->next;
859                 } while( cur_child != start_child );
860
861                 /* now cur_child == start_child */
862                 prev_child->next = cur_child->next;
863                 forker->first_child = prev_child;
864                 prefork_child_free( cur_child );
865                 return;
866         }
867
868         do {
869                 prev_child = cur_child;
870                 cur_child = cur_child->next;
871
872                 if( cur_child->pid == pid ) {
873                         prev_child->next = cur_child->next;
874                         prefork_child_free( cur_child );
875                         return;
876                 }
877
878         } while(cur_child != start_child);
879 }
880
881 /**
882         @brief Create and initialize a prefork_child.
883         @param max_requests How many requests to service before terminating.
884         @param read_data_fd Used by child to read request from parent.
885         @param write_data_fd Used by parent to write request to child.
886         @param read_status_fd Used by parent to read status from child.
887         @param write_status_fd Used by child to write status to parent.
888         @return Pointer to the newly created prefork_child.
889
890         The calling code is responsible for freeing the prefork_child by calling
891         prefork_child_free().
892 */
893 static prefork_child* prefork_child_init(
894         int max_requests, int read_data_fd, int write_data_fd,
895         int read_status_fd, int write_status_fd ) {
896
897         prefork_child* child = (prefork_child*) safe_malloc(sizeof(prefork_child));
898         child->pid              = 0;
899         child->max_requests     = max_requests;
900         child->read_data_fd     = read_data_fd;
901         child->write_data_fd    = write_data_fd;
902         child->read_status_fd   = read_status_fd;
903         child->write_status_fd  = write_status_fd;
904         child->available        = 1;
905         child->appname          = NULL;
906         child->keepalive        = 0;
907         child->next             = NULL;
908
909         return child;
910 }
911
912
913 /**
914         @brief Terminate all child processes and clear out a prefork_simple.
915         @param prefork Pointer to the prefork_simple to be cleared out.
916
917         We do not deallocate the prefork_simple itself, just its contents.
918 */
919 static void prefork_clear( prefork_simple* prefork ) {
920
921         // Kill all the child processes with a single call, not by killing each one separately.
922         // Implication: we can't have more than one prefork_simple outstanding, because
923         // destroying one would kill the children of both.
924         while( prefork->first_child != NULL ) {
925                 osrfLogInfo( OSRF_LOG_MARK, "Killing child processes ..." );
926                 kill( 0, SIGKILL );
927         }
928
929         // Bury the children so that they won't be zombies.  WNOHANG means that waitpid() returns
930         // immediately if there are no waitable children, instead of waiting for more to die.
931         // Ignore the return code of the child.  We don't do an autopsy.
932         pid_t child_pid;
933         while( (child_pid = waitpid(-1, NULL, WNOHANG)) > 0)
934                 del_prefork_child( prefork, child_pid );
935
936         client_free(prefork->connection);    // Close the Jabber connection
937         free(prefork->appname);
938 }
939
940 /**
941         @brief Destroy and deallocate a prefork_child.
942         @param child Pointer to the prefork_child to be destroyed.
943 */
944 static void prefork_child_free( prefork_child* child ) {
945         free(child->appname);
946         close( child->read_data_fd );
947         close( child->write_data_fd );
948         close( child->read_status_fd );
949         close( child->write_status_fd );
950         free( child );
951 }