Young C server code added
[OpenSRF.git] / src / libstack / osrf_prefork.c
1 #include "osrf_prefork.h"
2 #include <signal.h>
3
/* Set to 1 by sigchld_handler() when SIGCHLD is delivered and cleared by
	reap_children().  The parent polls it to know when dead children must be
	removed from the child list, so that it never keeps using a freed child.
	It is written from a signal handler, hence volatile sig_atomic_t. */
volatile sig_atomic_t child_dead;

int main();
void sigchld_handler( int sig );
10
11 int osrf_prefork_run(char* appname) {
12
13         if(!appname) fatal_handler("osrf_prefork_run requires an appname to run!");
14
15         int maxr = 1000; 
16         int maxc = 10;
17         int minc = 3;
18
19         info_handler("Loading config in osrf_forker for app %s", appname);
20
21         jsonObject* max_req = osrf_settings_host_value_object("/apps/%s/unix_config/max_requests", appname);
22         jsonObject* min_children = osrf_settings_host_value_object("/apps/%s/unix_config/min_children", appname);
23         jsonObject* max_children = osrf_settings_host_value_object("/apps/%s/unix_config/max_children", appname);
24
25         if(!max_req) warning_handler("Max requests not defined, assuming 1000");
26         else maxr = (int) jsonObjectGetNumber(max_req);
27
28         if(!min_children) warning_handler("Min children not defined, assuming 3");
29         else minc = (int) jsonObjectGetNumber(min_children);
30
31         if(!max_children) warning_handler("Max children not defined, assuming 10");
32         else maxc = (int) jsonObjectGetNumber(max_children);
33
34         jsonObjectFree(max_req);
35         jsonObjectFree(min_children);
36         jsonObjectFree(max_children);
37         /* --------------------------------------------------- */
38
39         char* resc = va_list_to_string("%s_listener", appname);
40
41         if(!osrf_system_bootstrap_client_resc( NULL, NULL, resc ))
42                 fatal_handler("Unable to bootstrap client for osrf_prefork_run()");
43         free(resc);
44
45         prefork_simple* forker = prefork_simple_init(
46                 osrfSystemGetTransportClient(), maxr, minc, maxc);
47
48         forker->appname = strdup(appname);
49
50         if(forker == NULL)
51                 fatal_handler("osrf_prefork_run() failed to create prefork_simple object");
52
53         prefork_launch_children(forker);
54
55         osrf_prefork_register_routers(appname);
56         
57         info_handler("Launching osrf_forker for app %s", appname);
58         prefork_run(forker);
59         
60         warning_handler("prefork_run() retuned - how??");
61         prefork_free(forker);
62         return 0;
63
64 }
65
66 void osrf_prefork_register_routers( char* appname ) {
67
68         osrfStringArray* arr = osrfNewStringArray(4);
69
70         int c = osrfConfigGetValueList( NULL, arr, "/routers/router" );
71         char* routerName = osrfConfigGetValue( NULL, "/router_name" );
72         transport_client* client = osrfSystemGetTransportClient();
73
74         info_handler("router name is %s and we have %d routers to connect to", routerName, c );
75
76         while( c ) {
77                 char* domain = osrfStringArrayGetString(arr, --c);
78                 if(domain) {
79
80                         char* jid = va_list_to_string( "%s@%s/router", routerName, domain );
81                         info_handler("Registering with router %s", jid );
82
83                         transport_message* msg = message_init("registering", NULL, NULL, jid, NULL );
84                         message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
85
86                         client_send_message( client, msg );
87                         message_free( msg );
88                         free(jid);
89                 }
90         }
91
92         free(routerName);
93         osrfStringArrayFree(arr);
94 }
95
96 void prefork_child_init_hook(prefork_child* child) {
97
98         if(!child) return;
99         info_handler("Child init hook for child %d", child->pid);
100         char* resc = va_list_to_string("%s_drone",child->appname);
101         if(!osrf_system_bootstrap_client_resc( NULL, NULL, resc)) 
102                 fatal_handler("Unable to bootstrap client for osrf_prefork_run()");
103         free(resc);
104 }
105
106 void prefork_child_process_request(prefork_child* child, char* data) {
107         if(!child && child->connection) return;
108
109         /* construct the message from the xml */
110         debug_handler("Child %d received data from parent:\n%s\n", child->pid, data);
111         transport_message* msg = new_message_from_xml( data );
112
113         osrf_stack_transport_handler(msg, child->appname);
114
115         /*
116         transport_message* ret_msg = message_init(
117                         msg->body, msg->subject, msg->thread, msg->sender, NULL );
118
119         client_send_message(child->connection, ret_msg);
120         message_free( ret_msg );
121
122         printf("Message body size %d\n", strlen(msg->body));
123
124         printf( "Message Info\n" );
125         printf( "%s\n", msg->sender );
126         printf( "%s\n", msg->recipient );
127         printf( "%s\n", msg->thread );
128         printf( "%s\n", msg->body );
129         printf( "%s\n", msg->subject );
130         printf( "%s\n", msg->router_from );
131         printf( "%d\n", msg->broadcast );
132
133         message_free( msg );
134         */
135 }
136
137
138 prefork_simple*  prefork_simple_init( transport_client* client, 
139                 int max_requests, int min_children, int max_children ) {
140
141         if( min_children > max_children )
142                 fatal_handler( "min_children (%d) is greater "
143                                 "than max_children (%d)", min_children, max_children );
144
145         if( max_children > ABS_MAX_CHILDREN )
146                 fatal_handler( "max_children (%d) is greater than ABS_MAX_CHILDREN (%d)",
147                                 max_children, ABS_MAX_CHILDREN );
148
149         /* flesh out the struct */
150         prefork_simple* prefork = (prefork_simple*) safe_malloc(sizeof(prefork_simple));        
151         prefork->max_requests = max_requests;
152         prefork->min_children = min_children;
153         prefork->max_children = max_children;
154         prefork->first_child = NULL;
155         prefork->connection = client;
156
157         return prefork;
158 }
159
160 prefork_child*  launch_child( prefork_simple* forker ) {
161
162         pid_t pid;
163         int data_fd[2];
164         int status_fd[2];
165
166         /* Set up the data pipes and add the child struct to the parent */
167         if( pipe(data_fd) < 0 ) /* build the data pipe*/
168                 fatal_handler( "Pipe making error" );
169
170         if( pipe(status_fd) < 0 ) /* build the status pipe */
171                 fatal_handler( "Pipe making error" );
172
173         debug_handler( "Pipes: %d %d %d %d", data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
174         prefork_child* child = prefork_child_init( forker->max_requests, data_fd[0], 
175                         data_fd[1], status_fd[0], status_fd[1] );
176
177         child->appname = strdup(forker->appname);
178
179
180         add_prefork_child( forker, child );
181
182         if( (pid=fork()) < 0 ) fatal_handler( "Forking Error" );
183
184         if( pid > 0 ) {  /* parent */
185
186                 signal(SIGCHLD, sigchld_handler);
187                 (forker->current_num_children)++;
188                 child->pid = pid;
189
190                 info_handler( "Parent launched %d", pid );
191                 /* *no* child pipe FD's can be closed or the parent will re-use fd's that
192                         the children are currently using */
193                 return child;
194         }
195
196         else { /* child */
197
198                 debug_handler("I am  new child with read_data_fd = %d and write_status_fd = %d",
199                         child->read_data_fd, child->write_status_fd );
200
201                 child->pid = getpid();
202                 close( child->write_data_fd );
203                 close( child->read_status_fd );
204
205                 /* do the initing */
206                 prefork_child_init_hook(child);
207
208                 prefork_child_wait( child );
209                 exit(0); /* just to be sure */
210          }
211         return NULL;
212 }
213
214
215 void prefork_launch_children( prefork_simple* forker ) {
216         if(!forker) return;
217         int c = 0;
218         while( c++ < forker->min_children )
219                 launch_child( forker );
220 }
221
222
223 void sigchld_handler( int sig ) {
224         signal(SIGCHLD, sigchld_handler);
225         child_dead = 1;
226 }
227
228
229 void reap_children( prefork_simple* forker ) {
230
231         pid_t child_pid;
232         int status;
233
234         while( (child_pid=waitpid(-1,&status,WNOHANG)) > 0) 
235                 del_prefork_child( forker, child_pid ); 
236
237         /* replenish */
238         while( forker->current_num_children < forker->min_children ) 
239                 launch_child( forker );
240
241         child_dead = 0;
242 }
243
244 void prefork_run(prefork_simple* forker) {
245
246         if( forker->first_child == NULL )
247                 return;
248
249         transport_message* cur_msg = NULL;
250
251         while(1) {
252
253                 if( forker->first_child == NULL ) {/* no more children */
254                         warning_handler("No more children..." );
255                         return;
256                 }
257
258                 debug_handler("Forker going into wait for data...");
259                 cur_msg = client_recv( forker->connection, -1 );
260
261                 //fprintf(stderr, "Got Data %f\n", get_timestamp_millis() );
262
263                 if( cur_msg == NULL ) continue;
264                 
265                 int honored = 0;        /* true if we've serviced the request */
266
267                 while( ! honored ) {
268
269                         check_children( forker ); 
270
271                         debug_handler( "Server received inbound data" );
272                         int k;
273                         prefork_child* cur_child = forker->first_child;
274
275                         /* Look for an available child */
276                         for( k = 0; k < forker->current_num_children; k++ ) {
277
278                                 debug_handler("Searching for available child. cur_child->pid = %d", cur_child->pid );
279                                 debug_handler("Current num children %d and loop %d", forker->current_num_children, k);
280                         
281                                 if( cur_child->available ) {
282                                         debug_handler( "sending data to %d", cur_child->pid );
283                                         message_prepare_xml( cur_msg );
284                                         char* data = cur_msg->msg_xml;
285                                         if( ! data || strlen(data) < 1 ) break;
286                                         cur_child->available = 0;
287                                         debug_handler( "Writing to child fd %d", cur_child->write_data_fd );
288
289                                         int written = 0;
290                                         //fprintf(stderr, "Writing Data %f\n", get_timestamp_millis() );
291                                         if( (written = write( cur_child->write_data_fd, data, strlen(data) + 1 )) < 0 ) {
292                                                 warning_handler("Write returned error %d", errno);
293                                                 cur_child = cur_child->next;
294                                                 continue;
295                                         }
296
297                                         //fprintf(stderr, "Wrote %d bytes to child\n", written);
298
299                                         forker->first_child = cur_child->next;
300                                         honored = 1;
301                                         break;
302                                 } else 
303                                         cur_child = cur_child->next;
304                         } 
305
306                         /* if none available, add a new child if we can */
307                         if( ! honored ) {
308                                 debug_handler("Not enough children, attempting to add...");
309                                 if( forker->current_num_children < forker->max_children ) {
310                                         debug_handler( "Launching new child with current_num = %d",
311                                                         forker->current_num_children );
312
313                                         prefork_child* new_child = launch_child( forker );
314                                         message_prepare_xml( cur_msg );
315                                         char* data = cur_msg->msg_xml;
316                                         if( ! data || strlen(data) < 1 ) break;
317                                         new_child->available = 0;
318                                         debug_handler( "sending data to %d", new_child->pid );
319                                         debug_handler( "Writing to new child fd %d", new_child->write_data_fd );
320                                         write( new_child->write_data_fd, data, strlen(data) + 1 );
321                                         forker->first_child = new_child->next;
322                                         honored = 1;
323                                 }
324                         }
325
326                         if( !honored ) {
327                                 warning_handler( "No children available, sleeping and looping..." );
328                                 usleep( 50000 ); /* 50 milliseconds */
329                         }
330
331                         if( child_dead )
332                                 reap_children(forker);
333
334
335                         //fprintf(stderr, "Parent done with request %f\n", get_timestamp_millis() );
336
337                 } // honored?
338
339         } /* top level listen loop */
340
341 }
342
343
344 void check_children( prefork_simple* forker ) {
345
346         //check_begin:
347
348         int select_ret;
349         fd_set read_set;
350         FD_ZERO(&read_set);
351         int max_fd = 0;
352         int n;
353
354         struct timeval tv;
355         tv.tv_sec       = 0;
356         tv.tv_usec      = 0;
357
358         if( child_dead )
359                 reap_children(forker);
360
361         prefork_child* cur_child = forker->first_child;
362
363         int i;
364         for( i = 0; i!= forker->current_num_children; i++ ) {
365
366                 if( cur_child->read_status_fd > max_fd )
367                         max_fd = cur_child->read_status_fd;
368                 FD_SET( cur_child->read_status_fd, &read_set );
369                 cur_child = cur_child->next;
370         }
371
372         FD_CLR(0,&read_set);/* just to be sure */
373
374         if( (select_ret=select( max_fd + 1 , &read_set, NULL, NULL, &tv)) == -1 ) {
375                 warning_handler( "Select returned error %d on check_children", errno );
376         }
377
378         if( select_ret == 0 )
379                 return;
380
381         /* see if one of a child has told us it's done */
382         cur_child = forker->first_child;
383         int j;
384         int num_handled = 0;
385         for( j = 0; j!= forker->current_num_children && num_handled < select_ret ; j++ ) {
386
387                 if( FD_ISSET( cur_child->read_status_fd, &read_set ) ) {
388                         //printf( "Server received status from a child %d\n", cur_child->pid );
389                         debug_handler( "Server received status from a child %d", cur_child->pid );
390
391                         num_handled++;
392
393                         /* now suck off the data */
394                         char buf[64];
395                         memset( buf, 0, 64);
396                         if( (n=read(cur_child->read_status_fd, buf, 63))  < 0 ) {
397                                 warning_handler("Read error afer select in child status read with errno %d", errno);
398                         }
399
400                         debug_handler( "Read %d bytes from status buffer: %s", n, buf );
401                         cur_child->available = 1;
402                 }
403                 cur_child = cur_child->next;
404         } 
405
406 }
407
408
409 void prefork_child_wait( prefork_child* child ) {
410
411         int i,n;
412         growing_buffer* gbuf = buffer_init( READ_BUFSIZE );
413         char buf[READ_BUFSIZE];
414         memset( buf, 0, READ_BUFSIZE );
415
416         for( i = 0; i!= child->max_requests; i++ ) {
417
418                 n = -1;
419                 clr_fl(child->read_data_fd, O_NONBLOCK );
420                 while( (n=read(child->read_data_fd, buf, READ_BUFSIZE-1)) > 0 ) {
421                         buffer_add( gbuf, buf );
422                         memset( buf, 0, READ_BUFSIZE );
423
424                         //fprintf(stderr, "Child read %d bytes\n", n);
425
426                         if( n == READ_BUFSIZE ) { 
427                                 //fprintf(stderr, "We read READ_BUFSIZE data....\n");
428                                 /* XXX */
429                                 /* either we have exactly READ_BUFSIZE data, 
430                                         or there's more waiting that we need to grab*/
431                                 /* must set to non-block for reading more */
432                         } else {
433                                 //fprintf(stderr, "Read Data %f\n", get_timestamp_millis() );
434                                 prefork_child_process_request(child, gbuf->buf);
435                                 buffer_reset( gbuf );
436                                 break;
437                         }
438                 }
439
440                 if( n < 0 ) {
441                         warning_handler( "Child read returned error with errno %d", errno );
442                         break;
443                 }
444
445                 if( i < child->max_requests - 1 ) 
446                         write( child->write_status_fd, "available" /*less than 64 bytes*/, 9 );
447         }
448
449         buffer_free(gbuf);
450
451         debug_handler("Child exiting...[%d]", getpid() );
452
453         exit(0);
454 }
455
456
457 void add_prefork_child( prefork_simple* forker, prefork_child* child ) {
458         
459         if( forker->first_child == NULL ) {
460                 forker->first_child = child;
461                 child->next = child;
462                 return;
463         }
464
465         /* we put the child in as the last because, regardless, 
466                 we have to do the DLL splice dance, and this is the
467            simplest way */
468
469         prefork_child* start_child = forker->first_child;
470         while(1) {
471                 if( forker->first_child->next == start_child ) 
472                         break;
473                 forker->first_child = forker->first_child->next;
474         }
475
476         /* here we know that forker->first_child is the last element 
477                 in the list and start_child is the first.  Insert the
478                 new child between them*/
479
480         forker->first_child->next = child;
481         child->next = start_child;
482         return;
483 }
484
485 prefork_child* find_prefork_child( prefork_simple* forker, pid_t pid ) {
486
487         if( forker->first_child == NULL ) { return NULL; }
488         prefork_child* start_child = forker->first_child;
489         do {
490                 if( forker->first_child->pid == pid ) 
491                         return forker->first_child;
492         } while( (forker->first_child = forker->first_child->next) != start_child );
493
494         return NULL;
495 }
496
497
498 void del_prefork_child( prefork_simple* forker, pid_t pid ) { 
499
500         if( forker->first_child == NULL ) { return; }
501
502         (forker->current_num_children)--;
503         debug_handler("Deleting Child: %d", pid );
504
505         prefork_child* start_child = forker->first_child; /* starting point */
506         prefork_child* cur_child        = start_child; /* current pointer */
507         prefork_child* prev_child       = start_child; /* the trailing pointer */
508
509         /* special case where there is only one in the list */
510         if( start_child == start_child->next ) {
511                 if( start_child->pid == pid ) {
512                         forker->first_child = NULL;
513
514                         close( start_child->read_data_fd );
515                         close( start_child->write_data_fd );
516                         close( start_child->read_status_fd );
517                         close( start_child->write_status_fd );
518
519                         prefork_child_free( start_child );
520                 }
521                 return;
522         }
523
524
525         /* special case where the first item in the list needs to be removed */
526         if( start_child->pid == pid ) { 
527
528                 /* find the last one so we can remove the start_child */
529                 do { 
530                         prev_child = cur_child;
531                         cur_child = cur_child->next;
532                 }while( cur_child != start_child );
533
534                 /* now cur_child == start_child */
535                 prev_child->next = cur_child->next;
536                 forker->first_child = prev_child;
537
538                 close( cur_child->read_data_fd );
539                 close( cur_child->write_data_fd );
540                 close( cur_child->read_status_fd );
541                 close( cur_child->write_status_fd );
542
543                 prefork_child_free( cur_child );
544                 return;
545         } 
546
547         do {
548                 prev_child = cur_child;
549                 cur_child = cur_child->next;
550
551                 if( cur_child->pid == pid ) {
552                         prev_child->next = cur_child->next;
553
554                         close( cur_child->read_data_fd );
555                         close( cur_child->write_data_fd );
556                         close( cur_child->read_status_fd );
557                         close( cur_child->write_status_fd );
558
559                         prefork_child_free( cur_child );
560                         return;
561                 }
562
563         } while(cur_child != start_child);
564 }
565
566
567
568
569 prefork_child* prefork_child_init( 
570         int max_requests, int read_data_fd, int write_data_fd, 
571         int read_status_fd, int write_status_fd ) {
572
573         prefork_child* child = (prefork_child*) safe_malloc(sizeof(prefork_child));
574         child->max_requests             = max_requests;
575         child->read_data_fd             = read_data_fd;
576         child->write_data_fd            = write_data_fd;
577         child->read_status_fd   = read_status_fd;
578         child->write_status_fd  = write_status_fd;
579         child->available                        = 1;
580
581         return child;
582 }
583
584
585 int prefork_free( prefork_simple* prefork ) {
586         
587         while( prefork->first_child != NULL ) {
588                 info_handler( "Killing children and sleeping 1 to reap..." );
589                 kill( 0,        SIGKILL );
590                 sleep(1);
591         }
592
593         client_free(prefork->connection);
594         free(prefork->appname);
595         free( prefork );
596         return 1;
597 }
598
599 int prefork_child_free( prefork_child* child ) { 
600         free(child->appname);
601         close(child->read_data_fd);
602         close(child->write_status_fd);
603         free( child ); 
604         return 1;
605 }
606