]> git.evergreen-ils.org Git - working/Evergreen.git/blob - OpenSRF/src/libstack/osrf_prefork.c
adding utility methods
[working/Evergreen.git] / OpenSRF / src / libstack / osrf_prefork.c
1 #include "osrf_prefork.h"
2 #include <signal.h>
3
/* Set to 1 by sigchld_handler() when a SIGCHLD arrives, i.e. at least one
        child has exited and has not been reaped yet.  reap_children() clears
        it.  The main loop checks it so that it does not keep using list
        entries that belong to dead children.

        Declared volatile sig_atomic_t because it is written from a signal
        handler and read from normal code. */
volatile sig_atomic_t child_dead;

int main();
void sigchld_handler( int sig );
10
11 int osrf_prefork_run(char* appname) {
12
13         if(!appname) fatal_handler("osrf_prefork_run requires an appname to run!");
14
15         int maxr = 1000; 
16         int maxc = 10;
17         int minc = 3;
18
19         info_handler("Loading config in osrf_forker for app %s", appname);
20
21         object* max_req = osrf_settings_host_value_object("/apps/%s/unix_config/max_requests", appname);
22         object* min_children = osrf_settings_host_value_object("/apps/%s/unix_config/min_children", appname);
23         object* max_children = osrf_settings_host_value_object("/apps/%s/unix_config/max_children", appname);
24
25         if(!max_req) warning_handler("Max requests not defined, assuming 1000");
26         else maxr = max_req->num_value;
27
28         if(!min_children) warning_handler("Min children not defined, assuming 3");
29         else minc = min_children->num_value;
30
31         if(!max_children) warning_handler("Max children not defined, assuming 10");
32         else maxc = max_children->num_value;
33
34         free_object(max_req);
35         free_object(min_children);
36         free_object(max_children);
37         /* --------------------------------------------------- */
38
39         char* resc = va_list_to_string("%s_listener", appname);
40
41         if(!osrf_system_bootstrap_client_resc(
42                 osrf_get_bootstrap_config(), osrf_get_config_context(), resc)) 
43                 fatal_handler("Unable to bootstrap client for osrf_prefork_run()");
44         free(resc);
45
46         prefork_simple* forker = prefork_simple_init(
47                 osrf_system_get_transport_client(), maxr, minc, maxc);
48
49         forker->appname = strdup(appname);
50
51         if(forker == NULL)
52                 fatal_handler("osrf_prefork_run() failed to create prefork_simple object");
53
54         prefork_launch_children(forker);
55         
56         info_handler("Launching osrf_forker for app %s", appname);
57         prefork_run(forker);
58         
59         warning_handler("prefork_run() retuned - how??");
60         prefork_free(forker);
61         return 0;
62
63 }
64
/**
 * Placeholder for registering this service with its routers.
 * Not implemented yet: the function currently does nothing.
 */
void osrf_prefork_register_routers() {
        /* TODO: look up the router(s) in the config and register with them */
}
68
69 void prefork_child_init_hook(prefork_child* child) {
70
71         if(!child) return;
72         info_handler("Child init hook for child %d", child->pid);
73         char* resc = va_list_to_string("%s_drone",child->appname);
74         if(!osrf_system_bootstrap_client_resc(
75                 osrf_get_bootstrap_config(), osrf_get_config_context(), resc)) 
76                 fatal_handler("Unable to bootstrap client for osrf_prefork_run()");
77         free(resc);
78 }
79
80 void prefork_child_process_request(prefork_child* child, char* data) {
81         if(!child && child->connection) return;
82
83         /* construct the message from the xml */
84         debug_handler("Child %d received data from parent:\n%s\n", child->pid, data);
85         transport_message* msg = new_message_from_xml( data );
86
87         osrf_stack_transport_handler(msg, child->appname);
88
89         /*
90         transport_message* ret_msg = message_init(
91                         msg->body, msg->subject, msg->thread, msg->sender, NULL );
92
93         client_send_message(child->connection, ret_msg);
94         message_free( ret_msg );
95
96         printf("Message body size %d\n", strlen(msg->body));
97
98         printf( "Message Info\n" );
99         printf( "%s\n", msg->sender );
100         printf( "%s\n", msg->recipient );
101         printf( "%s\n", msg->thread );
102         printf( "%s\n", msg->body );
103         printf( "%s\n", msg->subject );
104         printf( "%s\n", msg->router_from );
105         printf( "%d\n", msg->broadcast );
106
107         message_free( msg );
108         */
109 }
110
111
112 prefork_simple*  prefork_simple_init( transport_client* client, 
113                 int max_requests, int min_children, int max_children ) {
114
115         if( min_children > max_children )
116                 fatal_handler( "min_children (%d) is greater "
117                                 "than max_children (%d)", min_children, max_children );
118
119         if( max_children > ABS_MAX_CHILDREN )
120                 fatal_handler( "max_children (%d) is greater than ABS_MAX_CHILDREN (%d)",
121                                 max_children, ABS_MAX_CHILDREN );
122
123         /* flesh out the struct */
124         prefork_simple* prefork = (prefork_simple*) safe_malloc(sizeof(prefork_simple));        
125         prefork->max_requests = max_requests;
126         prefork->min_children = min_children;
127         prefork->max_children = max_children;
128         prefork->first_child = NULL;
129         prefork->connection = client;
130
131         return prefork;
132 }
133
134 prefork_child*  launch_child( prefork_simple* forker ) {
135
136         pid_t pid;
137         int data_fd[2];
138         int status_fd[2];
139
140         /* Set up the data pipes and add the child struct to the parent */
141         if( pipe(data_fd) < 0 ) /* build the data pipe*/
142                 fatal_handler( "Pipe making error" );
143
144         if( pipe(status_fd) < 0 ) /* build the status pipe */
145                 fatal_handler( "Pipe making error" );
146
147         debug_handler( "Pipes: %d %d %d %d", data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
148         prefork_child* child = prefork_child_init( forker->max_requests, data_fd[0], 
149                         data_fd[1], status_fd[0], status_fd[1] );
150
151         child->appname = strdup(forker->appname);
152
153
154         add_prefork_child( forker, child );
155
156         if( (pid=fork()) < 0 ) fatal_handler( "Forking Error" );
157
158         if( pid > 0 ) {  /* parent */
159
160                 signal(SIGCHLD, sigchld_handler);
161                 (forker->current_num_children)++;
162                 child->pid = pid;
163
164                 info_handler( "Parent launched %d", pid );
165                 /* *no* child pipe FD's can be closed or the parent will re-use fd's that
166                         the children are currently using */
167                 return child;
168         }
169
170         else { /* child */
171
172                 debug_handler("I am  new child with read_data_fd = %d and write_status_fd = %d",
173                         child->read_data_fd, child->write_status_fd );
174
175                 child->pid = getpid();
176                 close( child->write_data_fd );
177                 close( child->read_status_fd );
178
179                 /* do the initing */
180                 prefork_child_init_hook(child);
181
182                 prefork_child_wait( child );
183                 exit(0); /* just to be sure */
184          }
185         return NULL;
186 }
187
188
189 void prefork_launch_children( prefork_simple* forker ) {
190         if(!forker) return;
191         int c = 0;
192         while( c++ < forker->min_children )
193                 launch_child( forker );
194 }
195
196
197 void sigchld_handler( int sig ) {
198         signal(SIGCHLD, sigchld_handler);
199         child_dead = 1;
200 }
201
202
203 void reap_children( prefork_simple* forker ) {
204
205         pid_t child_pid;
206         int status;
207
208         while( (child_pid=waitpid(-1,&status,WNOHANG)) > 0) 
209                 del_prefork_child( forker, child_pid ); 
210
211         /* replenish */
212         while( forker->current_num_children < forker->min_children ) 
213                 launch_child( forker );
214
215         child_dead = 0;
216 }
217
218 void prefork_run(prefork_simple* forker) {
219
220         if( forker->first_child == NULL )
221                 return;
222
223         transport_message* cur_msg = NULL;
224
225         while(1) {
226
227                 if( forker->first_child == NULL ) {/* no more children */
228                         warning_handler("No more children..." );
229                         return;
230                 }
231
232                 debug_handler("Forker going into wait for data...");
233                 sleep(2);
234                 cur_msg = client_recv( forker->connection, -1 );
235
236                 fprintf(stderr, "Got Data %f\n", get_timestamp_millis() );
237
238                 if( cur_msg == NULL ) continue;
239                 
240                 int honored = 0;        /* true if we've serviced the request */
241
242                 while( ! honored ) {
243
244                         check_children( forker ); 
245
246                         debug_handler( "Server received inbound data" );
247                         int k;
248                         prefork_child* cur_child = forker->first_child;
249
250                         /* Look for an available child */
251                         for( k = 0; k < forker->current_num_children; k++ ) {
252
253                                 debug_handler("Searching for available child. cur_child->pid = %d", cur_child->pid );
254                                 debug_handler("Current num children %d and loop %d", forker->current_num_children, k);
255                         
256                                 if( cur_child->available ) {
257                                         debug_handler( "sending data to %d", cur_child->pid );
258                                         message_prepare_xml( cur_msg );
259                                         char* data = cur_msg->msg_xml;
260                                         if( ! data || strlen(data) < 1 ) break;
261                                         cur_child->available = 0;
262                                         debug_handler( "Writing to child fd %d", cur_child->write_data_fd );
263
264                                         int written = 0;
265                                         fprintf(stderr, "Writing Data %f\n", get_timestamp_millis() );
266                                         if( (written = write( cur_child->write_data_fd, data, strlen(data) + 1 )) < 0 ) {
267                                                 warning_handler("Write returned error %d", errno);
268                                                 cur_child = cur_child->next;
269                                                 continue;
270                                         }
271
272                                         fprintf(stderr, "Wrote %d bytes to child\n", written);
273
274                                         forker->first_child = cur_child->next;
275                                         honored = 1;
276                                         break;
277                                 } else 
278                                         cur_child = cur_child->next;
279                         } 
280
281                         /* if none available, add a new child if we can */
282                         if( ! honored ) {
283                                 debug_handler("Not enough children, attempting to add...");
284                                 if( forker->current_num_children < forker->max_children ) {
285                                         debug_handler( "Launching new child with current_num = %d",
286                                                         forker->current_num_children );
287
288                                         prefork_child* new_child = launch_child( forker );
289                                         message_prepare_xml( cur_msg );
290                                         char* data = cur_msg->msg_xml;
291                                         if( ! data || strlen(data) < 1 ) break;
292                                         new_child->available = 0;
293                                         debug_handler( "sending data to %d", new_child->pid );
294                                         debug_handler( "Writing to new child fd %d", new_child->write_data_fd );
295                                         write( new_child->write_data_fd, data, strlen(data) + 1 );
296                                         forker->first_child = new_child->next;
297                                         honored = 1;
298                                 }
299                         }
300
301                         if( !honored ) {
302                                 warning_handler( "No children available, sleeping and looping..." );
303                                 usleep( 50000 ); /* 50 milliseconds */
304                         }
305
306                         if( child_dead )
307                                 reap_children(forker);
308
309
310                         fprintf(stderr, "Parent done with request %f\n", get_timestamp_millis() );
311
312                 } // honored?
313
314         } /* top level listen loop */
315
316 }
317
318
319 void check_children( prefork_simple* forker ) {
320
321         //check_begin:
322
323         int select_ret;
324         fd_set read_set;
325         FD_ZERO(&read_set);
326         int max_fd = 0;
327         int n;
328
329         struct timeval tv;
330         tv.tv_sec       = 0;
331         tv.tv_usec      = 0;
332
333         if( child_dead )
334                 reap_children(forker);
335
336         prefork_child* cur_child = forker->first_child;
337
338         int i;
339         for( i = 0; i!= forker->current_num_children; i++ ) {
340
341                 if( cur_child->read_status_fd > max_fd )
342                         max_fd = cur_child->read_status_fd;
343                 FD_SET( cur_child->read_status_fd, &read_set );
344                 cur_child = cur_child->next;
345         }
346
347         FD_CLR(0,&read_set);/* just to be sure */
348
349         if( (select_ret=select( max_fd + 1 , &read_set, NULL, NULL, &tv)) == -1 ) {
350                 warning_handler( "Select returned error %d on check_children", errno );
351         }
352
353         if( select_ret == 0 )
354                 return;
355
356         /* see if one of a child has told us it's done */
357         cur_child = forker->first_child;
358         int j;
359         int num_handled = 0;
360         for( j = 0; j!= forker->current_num_children && num_handled < select_ret ; j++ ) {
361
362                 if( FD_ISSET( cur_child->read_status_fd, &read_set ) ) {
363                         printf( "Server received status from a child %d\n", cur_child->pid );
364                         debug_handler( "Server received status from a child %d", cur_child->pid );
365
366                         num_handled++;
367
368                         /* now suck off the data */
369                         char buf[64];
370                         memset( buf, 0, 64);
371                         if( (n=read(cur_child->read_status_fd, buf, 63))  < 0 ) {
372                                 warning_handler("Read error afer select in child status read with errno %d", errno);
373                         }
374
375                         debug_handler( "Read %d bytes from status buffer: %s", n, buf );
376                         cur_child->available = 1;
377                 }
378                 cur_child = cur_child->next;
379         } 
380
381 }
382
383
384 void prefork_child_wait( prefork_child* child ) {
385
386         int i,n;
387         growing_buffer* gbuf = buffer_init( READ_BUFSIZE );
388         char buf[READ_BUFSIZE];
389         memset( buf, 0, READ_BUFSIZE );
390
391         for( i = 0; i!= child->max_requests; i++ ) {
392
393                 n = -1;
394                 clr_fl(child->read_data_fd, O_NONBLOCK );
395                 while( (n=read(child->read_data_fd, buf, READ_BUFSIZE-1)) > 0 ) {
396                         buffer_add( gbuf, buf );
397                         memset( buf, 0, READ_BUFSIZE );
398
399                         fprintf(stderr, "Child read %d bytes\n", n);
400
401                         if( n == READ_BUFSIZE ) { 
402                                 fprintf(stderr, "We read READ_BUFSIZE data....\n");
403                                 /* XXX */
404                                 /* either we have exactly READ_BUFSIZE data, 
405                                         or there's more waiting that we need to grab*/
406                                 /* must set to non-block for reading more */
407                         } else {
408                                 fprintf(stderr, "Read Data %f\n", get_timestamp_millis() );
409                                 prefork_child_process_request(child, gbuf->buf);
410                                 buffer_reset( gbuf );
411                                 break;
412                         }
413                 }
414
415                 if( n < 0 ) {
416                         warning_handler( "Child read returned error with errno %d", errno );
417                         break;
418                 }
419
420                 if( i < child->max_requests - 1 ) 
421                         write( child->write_status_fd, "available" /*less than 64 bytes*/, 9 );
422         }
423
424         buffer_free(gbuf);
425
426         debug_handler("Child exiting...[%d]", getpid() );
427
428         exit(0);
429 }
430
431
432 void add_prefork_child( prefork_simple* forker, prefork_child* child ) {
433         
434         if( forker->first_child == NULL ) {
435                 forker->first_child = child;
436                 child->next = child;
437                 return;
438         }
439
440         /* we put the child in as the last because, regardless, 
441                 we have to do the DLL splice dance, and this is the
442            simplest way */
443
444         prefork_child* start_child = forker->first_child;
445         while(1) {
446                 if( forker->first_child->next == start_child ) 
447                         break;
448                 forker->first_child = forker->first_child->next;
449         }
450
451         /* here we know that forker->first_child is the last element 
452                 in the list and start_child is the first.  Insert the
453                 new child between them*/
454
455         forker->first_child->next = child;
456         child->next = start_child;
457         return;
458 }
459
460 prefork_child* find_prefork_child( prefork_simple* forker, pid_t pid ) {
461
462         if( forker->first_child == NULL ) { return NULL; }
463         prefork_child* start_child = forker->first_child;
464         do {
465                 if( forker->first_child->pid == pid ) 
466                         return forker->first_child;
467         } while( (forker->first_child = forker->first_child->next) != start_child );
468
469         return NULL;
470 }
471
472
473 void del_prefork_child( prefork_simple* forker, pid_t pid ) { 
474
475         if( forker->first_child == NULL ) { return; }
476
477         (forker->current_num_children)--;
478         debug_handler("Deleting Child: %d", pid );
479
480         prefork_child* start_child = forker->first_child; /* starting point */
481         prefork_child* cur_child        = start_child; /* current pointer */
482         prefork_child* prev_child       = start_child; /* the trailing pointer */
483
484         /* special case where there is only one in the list */
485         if( start_child == start_child->next ) {
486                 if( start_child->pid == pid ) {
487                         forker->first_child = NULL;
488
489                         close( start_child->read_data_fd );
490                         close( start_child->write_data_fd );
491                         close( start_child->read_status_fd );
492                         close( start_child->write_status_fd );
493
494                         prefork_child_free( start_child );
495                 }
496                 return;
497         }
498
499
500         /* special case where the first item in the list needs to be removed */
501         if( start_child->pid == pid ) { 
502
503                 /* find the last one so we can remove the start_child */
504                 do { 
505                         prev_child = cur_child;
506                         cur_child = cur_child->next;
507                 }while( cur_child != start_child );
508
509                 /* now cur_child == start_child */
510                 prev_child->next = cur_child->next;
511                 forker->first_child = prev_child;
512
513                 close( cur_child->read_data_fd );
514                 close( cur_child->write_data_fd );
515                 close( cur_child->read_status_fd );
516                 close( cur_child->write_status_fd );
517
518                 prefork_child_free( cur_child );
519                 return;
520         } 
521
522         do {
523                 prev_child = cur_child;
524                 cur_child = cur_child->next;
525
526                 if( cur_child->pid == pid ) {
527                         prev_child->next = cur_child->next;
528
529                         close( cur_child->read_data_fd );
530                         close( cur_child->write_data_fd );
531                         close( cur_child->read_status_fd );
532                         close( cur_child->write_status_fd );
533
534                         prefork_child_free( cur_child );
535                         return;
536                 }
537
538         } while(cur_child != start_child);
539 }
540
541
542
543
544 prefork_child* prefork_child_init( 
545         int max_requests, int read_data_fd, int write_data_fd, 
546         int read_status_fd, int write_status_fd ) {
547
548         prefork_child* child = (prefork_child*) safe_malloc(sizeof(prefork_child));
549         child->max_requests             = max_requests;
550         child->read_data_fd             = read_data_fd;
551         child->write_data_fd            = write_data_fd;
552         child->read_status_fd   = read_status_fd;
553         child->write_status_fd  = write_status_fd;
554         child->available                        = 1;
555
556         return child;
557 }
558
559
560 int prefork_free( prefork_simple* prefork ) {
561         
562         while( prefork->first_child != NULL ) {
563                 info_handler( "Killing children and sleeping 1 to reap..." );
564                 kill( 0,        SIGKILL );
565                 sleep(1);
566         }
567
568         client_free(prefork->connection);
569         free(prefork->appname);
570         free( prefork );
571         return 1;
572 }
573
574 int prefork_child_free( prefork_child* child ) { 
575         free(child->appname);
576         close(child->read_data_fd);
577         close(child->write_status_fd);
578         free( child ); 
579         return 1;
580 }
581