8e718be3cf91289b1ec5699de8878f6b34c9d093
[OpenSRF.git] / src / libstack / osrf_prefork.c
1 #include "osrf_prefork.h"
2 #include <signal.h>
3
/* Set to 1 by sigchld_handler() when a SIGCHLD is delivered, and tested by
	the parent before it walks the child list so that exited children are
	reaped first and we never try to use freed memory.  The flag is written
	from a signal handler, hence volatile sig_atomic_t rather than plain int. */
volatile sig_atomic_t child_dead;

int main();
void sigchld_handler( int sig );
10
11 int osrf_prefork_run(char* appname) {
12
13         if(!appname) fatal_handler("osrf_prefork_run requires an appname to run!");
14
15         int maxr = 1000; 
16         int maxc = 10;
17         int minc = 3;
18
19         info_handler("Loading config in osrf_forker for app %s", appname);
20
21         jsonObject* max_req = osrf_settings_host_value_object("/apps/%s/unix_config/max_requests", appname);
22         jsonObject* min_children = osrf_settings_host_value_object("/apps/%s/unix_config/min_children", appname);
23         jsonObject* max_children = osrf_settings_host_value_object("/apps/%s/unix_config/max_children", appname);
24
25         if(!max_req) warning_handler("Max requests not defined, assuming 1000");
26         else maxr = (int) jsonObjectGetNumber(max_req);
27
28         if(!min_children) warning_handler("Min children not defined, assuming 3");
29         else minc = (int) jsonObjectGetNumber(min_children);
30
31         if(!max_children) warning_handler("Max children not defined, assuming 10");
32         else maxc = (int) jsonObjectGetNumber(max_children);
33
34         jsonObjectFree(max_req);
35         jsonObjectFree(min_children);
36         jsonObjectFree(max_children);
37         /* --------------------------------------------------- */
38
39         char* resc = va_list_to_string("%s_listener", appname);
40
41         if(!osrf_system_bootstrap_client_resc( NULL, NULL, resc ))
42                 fatal_handler("Unable to bootstrap client for osrf_prefork_run()");
43         free(resc);
44
45         prefork_simple* forker = prefork_simple_init(
46                 osrf_system_get_transport_client(), maxr, minc, maxc);
47
48         forker->appname = strdup(appname);
49
50         if(forker == NULL)
51                 fatal_handler("osrf_prefork_run() failed to create prefork_simple object");
52
53         prefork_launch_children(forker);
54         
55         info_handler("Launching osrf_forker for app %s", appname);
56         prefork_run(forker);
57         
58         warning_handler("prefork_run() retuned - how??");
59         prefork_free(forker);
60         return 0;
61
62 }
63
/* Placeholder for router registration: intentionally does nothing yet. */
void osrf_prefork_register_routers() {
	/* not implemented */
}
67
68 void prefork_child_init_hook(prefork_child* child) {
69
70         if(!child) return;
71         info_handler("Child init hook for child %d", child->pid);
72         char* resc = va_list_to_string("%s_drone",child->appname);
73         if(!osrf_system_bootstrap_client_resc( NULL, NULL, resc)) 
74                 fatal_handler("Unable to bootstrap client for osrf_prefork_run()");
75         free(resc);
76 }
77
78 void prefork_child_process_request(prefork_child* child, char* data) {
79         if(!child && child->connection) return;
80
81         /* construct the message from the xml */
82         debug_handler("Child %d received data from parent:\n%s\n", child->pid, data);
83         transport_message* msg = new_message_from_xml( data );
84
85         osrf_stack_transport_handler(msg, child->appname);
86
87         /*
88         transport_message* ret_msg = message_init(
89                         msg->body, msg->subject, msg->thread, msg->sender, NULL );
90
91         client_send_message(child->connection, ret_msg);
92         message_free( ret_msg );
93
94         printf("Message body size %d\n", strlen(msg->body));
95
96         printf( "Message Info\n" );
97         printf( "%s\n", msg->sender );
98         printf( "%s\n", msg->recipient );
99         printf( "%s\n", msg->thread );
100         printf( "%s\n", msg->body );
101         printf( "%s\n", msg->subject );
102         printf( "%s\n", msg->router_from );
103         printf( "%d\n", msg->broadcast );
104
105         message_free( msg );
106         */
107 }
108
109
110 prefork_simple*  prefork_simple_init( transport_client* client, 
111                 int max_requests, int min_children, int max_children ) {
112
113         if( min_children > max_children )
114                 fatal_handler( "min_children (%d) is greater "
115                                 "than max_children (%d)", min_children, max_children );
116
117         if( max_children > ABS_MAX_CHILDREN )
118                 fatal_handler( "max_children (%d) is greater than ABS_MAX_CHILDREN (%d)",
119                                 max_children, ABS_MAX_CHILDREN );
120
121         /* flesh out the struct */
122         prefork_simple* prefork = (prefork_simple*) safe_malloc(sizeof(prefork_simple));        
123         prefork->max_requests = max_requests;
124         prefork->min_children = min_children;
125         prefork->max_children = max_children;
126         prefork->first_child = NULL;
127         prefork->connection = client;
128
129         return prefork;
130 }
131
132 prefork_child*  launch_child( prefork_simple* forker ) {
133
134         pid_t pid;
135         int data_fd[2];
136         int status_fd[2];
137
138         /* Set up the data pipes and add the child struct to the parent */
139         if( pipe(data_fd) < 0 ) /* build the data pipe*/
140                 fatal_handler( "Pipe making error" );
141
142         if( pipe(status_fd) < 0 ) /* build the status pipe */
143                 fatal_handler( "Pipe making error" );
144
145         debug_handler( "Pipes: %d %d %d %d", data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
146         prefork_child* child = prefork_child_init( forker->max_requests, data_fd[0], 
147                         data_fd[1], status_fd[0], status_fd[1] );
148
149         child->appname = strdup(forker->appname);
150
151
152         add_prefork_child( forker, child );
153
154         if( (pid=fork()) < 0 ) fatal_handler( "Forking Error" );
155
156         if( pid > 0 ) {  /* parent */
157
158                 signal(SIGCHLD, sigchld_handler);
159                 (forker->current_num_children)++;
160                 child->pid = pid;
161
162                 info_handler( "Parent launched %d", pid );
163                 /* *no* child pipe FD's can be closed or the parent will re-use fd's that
164                         the children are currently using */
165                 return child;
166         }
167
168         else { /* child */
169
170                 debug_handler("I am  new child with read_data_fd = %d and write_status_fd = %d",
171                         child->read_data_fd, child->write_status_fd );
172
173                 child->pid = getpid();
174                 close( child->write_data_fd );
175                 close( child->read_status_fd );
176
177                 /* do the initing */
178                 prefork_child_init_hook(child);
179
180                 prefork_child_wait( child );
181                 exit(0); /* just to be sure */
182          }
183         return NULL;
184 }
185
186
187 void prefork_launch_children( prefork_simple* forker ) {
188         if(!forker) return;
189         int c = 0;
190         while( c++ < forker->min_children )
191                 launch_child( forker );
192 }
193
194
195 void sigchld_handler( int sig ) {
196         signal(SIGCHLD, sigchld_handler);
197         child_dead = 1;
198 }
199
200
201 void reap_children( prefork_simple* forker ) {
202
203         pid_t child_pid;
204         int status;
205
206         while( (child_pid=waitpid(-1,&status,WNOHANG)) > 0) 
207                 del_prefork_child( forker, child_pid ); 
208
209         /* replenish */
210         while( forker->current_num_children < forker->min_children ) 
211                 launch_child( forker );
212
213         child_dead = 0;
214 }
215
216 void prefork_run(prefork_simple* forker) {
217
218         if( forker->first_child == NULL )
219                 return;
220
221         transport_message* cur_msg = NULL;
222
223         while(1) {
224
225                 if( forker->first_child == NULL ) {/* no more children */
226                         warning_handler("No more children..." );
227                         return;
228                 }
229
230                 debug_handler("Forker going into wait for data...");
231                 sleep(2);
232                 cur_msg = client_recv( forker->connection, -1 );
233
234                 fprintf(stderr, "Got Data %f\n", get_timestamp_millis() );
235
236                 if( cur_msg == NULL ) continue;
237                 
238                 int honored = 0;        /* true if we've serviced the request */
239
240                 while( ! honored ) {
241
242                         check_children( forker ); 
243
244                         debug_handler( "Server received inbound data" );
245                         int k;
246                         prefork_child* cur_child = forker->first_child;
247
248                         /* Look for an available child */
249                         for( k = 0; k < forker->current_num_children; k++ ) {
250
251                                 debug_handler("Searching for available child. cur_child->pid = %d", cur_child->pid );
252                                 debug_handler("Current num children %d and loop %d", forker->current_num_children, k);
253                         
254                                 if( cur_child->available ) {
255                                         debug_handler( "sending data to %d", cur_child->pid );
256                                         message_prepare_xml( cur_msg );
257                                         char* data = cur_msg->msg_xml;
258                                         if( ! data || strlen(data) < 1 ) break;
259                                         cur_child->available = 0;
260                                         debug_handler( "Writing to child fd %d", cur_child->write_data_fd );
261
262                                         int written = 0;
263                                         fprintf(stderr, "Writing Data %f\n", get_timestamp_millis() );
264                                         if( (written = write( cur_child->write_data_fd, data, strlen(data) + 1 )) < 0 ) {
265                                                 warning_handler("Write returned error %d", errno);
266                                                 cur_child = cur_child->next;
267                                                 continue;
268                                         }
269
270                                         fprintf(stderr, "Wrote %d bytes to child\n", written);
271
272                                         forker->first_child = cur_child->next;
273                                         honored = 1;
274                                         break;
275                                 } else 
276                                         cur_child = cur_child->next;
277                         } 
278
279                         /* if none available, add a new child if we can */
280                         if( ! honored ) {
281                                 debug_handler("Not enough children, attempting to add...");
282                                 if( forker->current_num_children < forker->max_children ) {
283                                         debug_handler( "Launching new child with current_num = %d",
284                                                         forker->current_num_children );
285
286                                         prefork_child* new_child = launch_child( forker );
287                                         message_prepare_xml( cur_msg );
288                                         char* data = cur_msg->msg_xml;
289                                         if( ! data || strlen(data) < 1 ) break;
290                                         new_child->available = 0;
291                                         debug_handler( "sending data to %d", new_child->pid );
292                                         debug_handler( "Writing to new child fd %d", new_child->write_data_fd );
293                                         write( new_child->write_data_fd, data, strlen(data) + 1 );
294                                         forker->first_child = new_child->next;
295                                         honored = 1;
296                                 }
297                         }
298
299                         if( !honored ) {
300                                 warning_handler( "No children available, sleeping and looping..." );
301                                 usleep( 50000 ); /* 50 milliseconds */
302                         }
303
304                         if( child_dead )
305                                 reap_children(forker);
306
307
308                         fprintf(stderr, "Parent done with request %f\n", get_timestamp_millis() );
309
310                 } // honored?
311
312         } /* top level listen loop */
313
314 }
315
316
317 void check_children( prefork_simple* forker ) {
318
319         //check_begin:
320
321         int select_ret;
322         fd_set read_set;
323         FD_ZERO(&read_set);
324         int max_fd = 0;
325         int n;
326
327         struct timeval tv;
328         tv.tv_sec       = 0;
329         tv.tv_usec      = 0;
330
331         if( child_dead )
332                 reap_children(forker);
333
334         prefork_child* cur_child = forker->first_child;
335
336         int i;
337         for( i = 0; i!= forker->current_num_children; i++ ) {
338
339                 if( cur_child->read_status_fd > max_fd )
340                         max_fd = cur_child->read_status_fd;
341                 FD_SET( cur_child->read_status_fd, &read_set );
342                 cur_child = cur_child->next;
343         }
344
345         FD_CLR(0,&read_set);/* just to be sure */
346
347         if( (select_ret=select( max_fd + 1 , &read_set, NULL, NULL, &tv)) == -1 ) {
348                 warning_handler( "Select returned error %d on check_children", errno );
349         }
350
351         if( select_ret == 0 )
352                 return;
353
354         /* see if one of a child has told us it's done */
355         cur_child = forker->first_child;
356         int j;
357         int num_handled = 0;
358         for( j = 0; j!= forker->current_num_children && num_handled < select_ret ; j++ ) {
359
360                 if( FD_ISSET( cur_child->read_status_fd, &read_set ) ) {
361                         printf( "Server received status from a child %d\n", cur_child->pid );
362                         debug_handler( "Server received status from a child %d", cur_child->pid );
363
364                         num_handled++;
365
366                         /* now suck off the data */
367                         char buf[64];
368                         memset( buf, 0, 64);
369                         if( (n=read(cur_child->read_status_fd, buf, 63))  < 0 ) {
370                                 warning_handler("Read error afer select in child status read with errno %d", errno);
371                         }
372
373                         debug_handler( "Read %d bytes from status buffer: %s", n, buf );
374                         cur_child->available = 1;
375                 }
376                 cur_child = cur_child->next;
377         } 
378
379 }
380
381
382 void prefork_child_wait( prefork_child* child ) {
383
384         int i,n;
385         growing_buffer* gbuf = buffer_init( READ_BUFSIZE );
386         char buf[READ_BUFSIZE];
387         memset( buf, 0, READ_BUFSIZE );
388
389         for( i = 0; i!= child->max_requests; i++ ) {
390
391                 n = -1;
392                 clr_fl(child->read_data_fd, O_NONBLOCK );
393                 while( (n=read(child->read_data_fd, buf, READ_BUFSIZE-1)) > 0 ) {
394                         buffer_add( gbuf, buf );
395                         memset( buf, 0, READ_BUFSIZE );
396
397                         fprintf(stderr, "Child read %d bytes\n", n);
398
399                         if( n == READ_BUFSIZE ) { 
400                                 fprintf(stderr, "We read READ_BUFSIZE data....\n");
401                                 /* XXX */
402                                 /* either we have exactly READ_BUFSIZE data, 
403                                         or there's more waiting that we need to grab*/
404                                 /* must set to non-block for reading more */
405                         } else {
406                                 fprintf(stderr, "Read Data %f\n", get_timestamp_millis() );
407                                 prefork_child_process_request(child, gbuf->buf);
408                                 buffer_reset( gbuf );
409                                 break;
410                         }
411                 }
412
413                 if( n < 0 ) {
414                         warning_handler( "Child read returned error with errno %d", errno );
415                         break;
416                 }
417
418                 if( i < child->max_requests - 1 ) 
419                         write( child->write_status_fd, "available" /*less than 64 bytes*/, 9 );
420         }
421
422         buffer_free(gbuf);
423
424         debug_handler("Child exiting...[%d]", getpid() );
425
426         exit(0);
427 }
428
429
430 void add_prefork_child( prefork_simple* forker, prefork_child* child ) {
431         
432         if( forker->first_child == NULL ) {
433                 forker->first_child = child;
434                 child->next = child;
435                 return;
436         }
437
438         /* we put the child in as the last because, regardless, 
439                 we have to do the DLL splice dance, and this is the
440            simplest way */
441
442         prefork_child* start_child = forker->first_child;
443         while(1) {
444                 if( forker->first_child->next == start_child ) 
445                         break;
446                 forker->first_child = forker->first_child->next;
447         }
448
449         /* here we know that forker->first_child is the last element 
450                 in the list and start_child is the first.  Insert the
451                 new child between them*/
452
453         forker->first_child->next = child;
454         child->next = start_child;
455         return;
456 }
457
458 prefork_child* find_prefork_child( prefork_simple* forker, pid_t pid ) {
459
460         if( forker->first_child == NULL ) { return NULL; }
461         prefork_child* start_child = forker->first_child;
462         do {
463                 if( forker->first_child->pid == pid ) 
464                         return forker->first_child;
465         } while( (forker->first_child = forker->first_child->next) != start_child );
466
467         return NULL;
468 }
469
470
471 void del_prefork_child( prefork_simple* forker, pid_t pid ) { 
472
473         if( forker->first_child == NULL ) { return; }
474
475         (forker->current_num_children)--;
476         debug_handler("Deleting Child: %d", pid );
477
478         prefork_child* start_child = forker->first_child; /* starting point */
479         prefork_child* cur_child        = start_child; /* current pointer */
480         prefork_child* prev_child       = start_child; /* the trailing pointer */
481
482         /* special case where there is only one in the list */
483         if( start_child == start_child->next ) {
484                 if( start_child->pid == pid ) {
485                         forker->first_child = NULL;
486
487                         close( start_child->read_data_fd );
488                         close( start_child->write_data_fd );
489                         close( start_child->read_status_fd );
490                         close( start_child->write_status_fd );
491
492                         prefork_child_free( start_child );
493                 }
494                 return;
495         }
496
497
498         /* special case where the first item in the list needs to be removed */
499         if( start_child->pid == pid ) { 
500
501                 /* find the last one so we can remove the start_child */
502                 do { 
503                         prev_child = cur_child;
504                         cur_child = cur_child->next;
505                 }while( cur_child != start_child );
506
507                 /* now cur_child == start_child */
508                 prev_child->next = cur_child->next;
509                 forker->first_child = prev_child;
510
511                 close( cur_child->read_data_fd );
512                 close( cur_child->write_data_fd );
513                 close( cur_child->read_status_fd );
514                 close( cur_child->write_status_fd );
515
516                 prefork_child_free( cur_child );
517                 return;
518         } 
519
520         do {
521                 prev_child = cur_child;
522                 cur_child = cur_child->next;
523
524                 if( cur_child->pid == pid ) {
525                         prev_child->next = cur_child->next;
526
527                         close( cur_child->read_data_fd );
528                         close( cur_child->write_data_fd );
529                         close( cur_child->read_status_fd );
530                         close( cur_child->write_status_fd );
531
532                         prefork_child_free( cur_child );
533                         return;
534                 }
535
536         } while(cur_child != start_child);
537 }
538
539
540
541
542 prefork_child* prefork_child_init( 
543         int max_requests, int read_data_fd, int write_data_fd, 
544         int read_status_fd, int write_status_fd ) {
545
546         prefork_child* child = (prefork_child*) safe_malloc(sizeof(prefork_child));
547         child->max_requests             = max_requests;
548         child->read_data_fd             = read_data_fd;
549         child->write_data_fd            = write_data_fd;
550         child->read_status_fd   = read_status_fd;
551         child->write_status_fd  = write_status_fd;
552         child->available                        = 1;
553
554         return child;
555 }
556
557
558 int prefork_free( prefork_simple* prefork ) {
559         
560         while( prefork->first_child != NULL ) {
561                 info_handler( "Killing children and sleeping 1 to reap..." );
562                 kill( 0,        SIGKILL );
563                 sleep(1);
564         }
565
566         client_free(prefork->connection);
567         free(prefork->appname);
568         free( prefork );
569         return 1;
570 }
571
572 int prefork_child_free( prefork_child* child ) { 
573         free(child->appname);
574         close(child->read_data_fd);
575         close(child->write_status_fd);
576         free( child ); 
577         return 1;
578 }
579