c467ae97da7d3d1d50870bc386fecb5ef77e1770
[Evergreen.git] / OpenSRF / src / libstack / osrf_prefork.c
1 #include "osrf_prefork.h"
2 #include <signal.h>
3 #include "osrf_app_session.h"
4
/* Set to 1 by sigchld_handler() whenever a SIGCHLD arrives and reset to 0 by
   reap_children().  check_children() and prefork_run() poll it so that dead
   children are collected (and their list entries freed) outside the signal
   handler, in normal program flow.
   NOTE(review): an object written from a signal handler should be declared
   volatile sig_atomic_t; kept as int here in case a header also declares it. */
int child_dead;

/* NOTE(review): main() is neither defined nor called in this file; this
   old-style (non-prototype) declaration looks unnecessary. */
int main();

/* SIGCHLD handler; declared up front because launch_child() installs it
   before its definition appears below. */
void sigchld_handler( int sig );
11
12 int osrf_prefork_run(char* appname) {
13
14         if(!appname) {
15                 osrfLogError("osrf_prefork_run requires an appname to run!");
16                 return -1;
17         }
18
19         set_proc_title( "OpenSRF Listener [%s]", appname );
20
21         int maxr = 1000; 
22         int maxc = 10;
23         int minc = 3;
24
25         osrfLogInfo("Loading config in osrf_forker for app %s", appname);
26
27         jsonObject* max_req = osrf_settings_host_value_object("/apps/%s/unix_config/max_requests", appname);
28         jsonObject* min_children = osrf_settings_host_value_object("/apps/%s/unix_config/min_children", appname);
29         jsonObject* max_children = osrf_settings_host_value_object("/apps/%s/unix_config/max_children", appname);
30
31         
32         if(!max_req) osrfLogWarning("Max requests not defined, assuming 1000");
33         else maxr = (int) jsonObjectGetNumber(max_req);
34
35         if(!min_children) osrfLogWarning("Min children not defined, assuming 3");
36         else minc = (int) jsonObjectGetNumber(min_children);
37
38         if(!max_children) osrfLogWarning("Max children not defined, assuming 10");
39         else maxc = (int) jsonObjectGetNumber(max_children);
40
41         jsonObjectFree(max_req);
42         jsonObjectFree(min_children);
43         jsonObjectFree(max_children);
44         /* --------------------------------------------------- */
45
46         char* resc = va_list_to_string("%s_listener", appname);
47
48         if(!osrf_system_bootstrap_client_resc( NULL, NULL, resc )) {
49                 osrfLogError("Unable to bootstrap client for osrf_prefork_run()");
50                 free(resc);
51                 return -1;
52         }
53
54         free(resc);
55
56         prefork_simple* forker = prefork_simple_init(
57                 osrfSystemGetTransportClient(), maxr, minc, maxc);
58
59         forker->appname = strdup(appname);
60
61         if(forker == NULL) {
62                 osrfLogError("osrf_prefork_run() failed to create prefork_simple object");
63                 return -1;
64         }
65
66         prefork_launch_children(forker);
67
68         osrf_prefork_register_routers(appname);
69         
70         osrfLogInfo("Launching osrf_forker for app %s", appname);
71         prefork_run(forker);
72         
73         osrfLogWarning("prefork_run() retuned - how??");
74         prefork_free(forker);
75         return 0;
76
77 }
78
79 void osrf_prefork_register_routers( char* appname ) {
80
81         osrfStringArray* arr = osrfNewStringArray(4);
82
83         int c = osrfConfigGetValueList( NULL, arr, "/routers/router" );
84         char* routerName = osrfConfigGetValue( NULL, "/router_name" );
85         transport_client* client = osrfSystemGetTransportClient();
86
87         osrfLogInfo("router name is %s and we have %d routers to connect to", routerName, c );
88
89         while( c ) {
90                 char* domain = osrfStringArrayGetString(arr, --c);
91                 if(domain) {
92
93                         char* jid = va_list_to_string( "%s@%s/router", routerName, domain );
94                         osrfLogInfo("Registering with router %s", jid );
95
96                         transport_message* msg = message_init("registering", NULL, NULL, jid, NULL );
97                         message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
98
99                         client_send_message( client, msg );
100                         message_free( msg );
101                         free(jid);
102                 }
103         }
104
105         free(routerName);
106         osrfStringArrayFree(arr);
107 }
108
109 void prefork_child_init_hook(prefork_child* child) {
110
111         if(!child) return;
112         osrfLogDebug("Child init hook for child %d", child->pid);
113         char* resc = va_list_to_string("%s_drone",child->appname);
114
115         if(!osrf_system_bootstrap_client_resc( NULL, NULL, resc)) {
116                 osrfLogError("Unable to bootstrap client for osrf_prefork_run()");
117                 free(resc);
118                 return;
119         }
120
121         free(resc);
122
123         set_proc_title( "OpenSRF Drone [%s]", child->appname );
124 }
125
126 void prefork_child_process_request(prefork_child* child, char* data) {
127         if( !child ) return;
128
129         /* construct the message from the xml */
130         transport_message* msg = new_message_from_xml( data );
131
132         osrfAppSession* session = osrf_stack_transport_handler(msg, child->appname);
133         if(!session) return;
134
135         if( session->stateless && session->state != OSRF_SESSION_CONNECTED ) {
136                 osrfAppSessionFree( session );
137                 return;
138         }
139
140         /* keepalive loop for stateful sessions */
141
142                 osrfLogDebug("Entering keepalive loop for session %s", session->session_id );
143 }
144
145
146 prefork_simple*  prefork_simple_init( transport_client* client, 
147                 int max_requests, int min_children, int max_children ) {
148
149         if( min_children > max_children ) {
150                 osrfLogError( "min_children (%d) is greater "
151                                 "than max_children (%d)", min_children, max_children );
152                 return NULL;
153         }
154
155         if( max_children > ABS_MAX_CHILDREN ) {
156                 osrfLogError( "max_children (%d) is greater than ABS_MAX_CHILDREN (%d)",
157                                 max_children, ABS_MAX_CHILDREN );
158                 return NULL;
159         }
160
161         /* flesh out the struct */
162         prefork_simple* prefork = (prefork_simple*) safe_malloc(sizeof(prefork_simple));        
163         prefork->max_requests = max_requests;
164         prefork->min_children = min_children;
165         prefork->max_children = max_children;
166         prefork->first_child = NULL;
167         prefork->connection = client;
168
169         return prefork;
170 }
171
172 prefork_child*  launch_child( prefork_simple* forker ) {
173
174         pid_t pid;
175         int data_fd[2];
176         int status_fd[2];
177
178         /* Set up the data pipes and add the child struct to the parent */
179         if( pipe(data_fd) < 0 ) { /* build the data pipe*/
180                 osrfLogError( "Pipe making error" );
181                 return NULL;
182         }
183
184         if( pipe(status_fd) < 0 ) {/* build the status pipe */
185                 osrfLogError( "Pipe making error" );
186                 return NULL;
187         }
188
189         osrfLogInternal( "Pipes: %d %d %d %d", data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
190         prefork_child* child = prefork_child_init( forker->max_requests, data_fd[0], 
191                         data_fd[1], status_fd[0], status_fd[1] );
192
193         child->appname = strdup(forker->appname);
194
195
196         add_prefork_child( forker, child );
197
198         if( (pid=fork()) < 0 ) {
199                 osrfLogError( "Forking Error" );
200                 return NULL;
201         }
202
203         if( pid > 0 ) {  /* parent */
204
205                 signal(SIGCHLD, sigchld_handler);
206                 (forker->current_num_children)++;
207                 child->pid = pid;
208
209                 osrfLogDebug( "Parent launched %d", pid );
210                 /* *no* child pipe FD's can be closed or the parent will re-use fd's that
211                         the children are currently using */
212                 return child;
213         }
214
215         else { /* child */
216
217                 osrfLogInternal("I am  new child with read_data_fd = %d and write_status_fd = %d",
218                         child->read_data_fd, child->write_status_fd );
219
220                 child->pid = getpid();
221                 close( child->write_data_fd );
222                 close( child->read_status_fd );
223
224                 /* do the initing */
225                 prefork_child_init_hook(child);
226
227                 prefork_child_wait( child );
228                 exit(0); /* just to be sure */
229          }
230         return NULL;
231 }
232
233
234 void prefork_launch_children( prefork_simple* forker ) {
235         if(!forker) return;
236         int c = 0;
237         while( c++ < forker->min_children )
238                 launch_child( forker );
239 }
240
241
242 void sigchld_handler( int sig ) {
243         signal(SIGCHLD, sigchld_handler);
244         child_dead = 1;
245 }
246
247
248 void reap_children( prefork_simple* forker ) {
249
250         pid_t child_pid;
251         int status;
252
253         while( (child_pid=waitpid(-1,&status,WNOHANG)) > 0) 
254                 del_prefork_child( forker, child_pid ); 
255
256         /* replenish */
257         while( forker->current_num_children < forker->min_children ) 
258                 launch_child( forker );
259
260         child_dead = 0;
261 }
262
263 void prefork_run(prefork_simple* forker) {
264
265         if( forker->first_child == NULL )
266                 return;
267
268         transport_message* cur_msg = NULL;
269
270
271         while(1) {
272
273                 if( forker->first_child == NULL ) {/* no more children */
274                         osrfLogWarning("No more children..." );
275                         return;
276                 }
277
278                 osrfLogDebug("Forker going into wait for data...");
279                 cur_msg = client_recv( forker->connection, -1 );
280
281                 //fprintf(stderr, "Got Data %f\n", get_timestamp_millis() );
282
283                 if( cur_msg == NULL ) continue;
284
285                 int honored = 0;        /* true if we've serviced the request */
286
287                 while( ! honored ) {
288
289                         check_children( forker ); 
290
291                         osrfLogDebug( "Server received inbound data" );
292                         int k;
293                         prefork_child* cur_child = forker->first_child;
294
295                         /* Look for an available child */
296                         for( k = 0; k < forker->current_num_children; k++ ) {
297
298                                 osrfLogInternal("Searching for available child. cur_child->pid = %d", cur_child->pid );
299                                 osrfLogInternal("Current num children %d and loop %d", forker->current_num_children, k);
300                         
301                                 if( cur_child->available ) {
302                                         osrfLogDebug( "forker sending data to %d", cur_child->pid );
303
304                                         message_prepare_xml( cur_msg );
305                                         char* data = cur_msg->msg_xml;
306                                         if( ! data || strlen(data) < 1 ) break;
307
308                                         cur_child->available = 0;
309                                         osrfLogInternal( "Writing to child fd %d", cur_child->write_data_fd );
310
311                                         int written = 0;
312                                         //fprintf(stderr, "Writing Data %f\n", get_timestamp_millis() );
313                                         if( (written = write( cur_child->write_data_fd, data, strlen(data) + 1 )) < 0 ) {
314                                                 osrfLogWarning("Write returned error %d", errno);
315                                                 cur_child = cur_child->next;
316                                                 continue;
317                                         }
318
319                                         //fprintf(stderr, "Wrote %d bytes to child\n", written);
320
321                                         forker->first_child = cur_child->next;
322                                         honored = 1;
323                                         break;
324                                 } else 
325                                         cur_child = cur_child->next;
326                         } 
327
328                         /* if none available, add a new child if we can */
329                         if( ! honored ) {
330                                 osrfLogDebug("Not enough children, attempting to add...");
331                                 if( forker->current_num_children < forker->max_children ) {
332                                         osrfLogDebug( "Launching new child with current_num = %d",
333                                                         forker->current_num_children );
334
335                                         prefork_child* new_child = launch_child( forker );
336                                         message_prepare_xml( cur_msg );
337                                         char* data = cur_msg->msg_xml;
338                                         if( ! data || strlen(data) < 1 ) break;
339                                         new_child->available = 0;
340                                         osrfLogDebug( "Writing to new child fd %d : pid %d", 
341                                                         new_child->write_data_fd, new_child->pid );
342                                         write( new_child->write_data_fd, data, strlen(data) + 1 );
343                                         forker->first_child = new_child->next;
344                                         honored = 1;
345                                 }
346                         }
347
348                         if( !honored ) {
349                                 osrfLogWarning( "No children available, sleeping and looping..." );
350                                 usleep( 50000 ); /* 50 milliseconds */
351                         }
352
353                         if( child_dead )
354                                 reap_children(forker);
355
356
357                         //fprintf(stderr, "Parent done with request %f\n", get_timestamp_millis() );
358
359                 } // honored?
360
361                 message_free( cur_msg );
362
363         } /* top level listen loop */
364
365 }
366
367
368 void check_children( prefork_simple* forker ) {
369
370         //check_begin:
371
372         int select_ret;
373         fd_set read_set;
374         FD_ZERO(&read_set);
375         int max_fd = 0;
376         int n;
377
378         struct timeval tv;
379         tv.tv_sec       = 0;
380         tv.tv_usec      = 0;
381
382         if( child_dead )
383                 reap_children(forker);
384
385         prefork_child* cur_child = forker->first_child;
386
387         int i;
388         for( i = 0; i!= forker->current_num_children; i++ ) {
389
390                 if( cur_child->read_status_fd > max_fd )
391                         max_fd = cur_child->read_status_fd;
392                 FD_SET( cur_child->read_status_fd, &read_set );
393                 cur_child = cur_child->next;
394         }
395
396         FD_CLR(0,&read_set);/* just to be sure */
397
398         if( (select_ret=select( max_fd + 1 , &read_set, NULL, NULL, &tv)) == -1 ) {
399                 osrfLogWarning( "Select returned error %d on check_children", errno );
400         }
401
402         if( select_ret == 0 )
403                 return;
404
405         /* see if one of a child has told us it's done */
406         cur_child = forker->first_child;
407         int j;
408         int num_handled = 0;
409         for( j = 0; j!= forker->current_num_children && num_handled < select_ret ; j++ ) {
410
411                 if( FD_ISSET( cur_child->read_status_fd, &read_set ) ) {
412                         //printf( "Server received status from a child %d\n", cur_child->pid );
413                         osrfLogDebug( "Server received status from a child %d", cur_child->pid );
414
415                         num_handled++;
416
417                         /* now suck off the data */
418                         char buf[64];
419                         memset( buf, 0, 64);
420                         if( (n=read(cur_child->read_status_fd, buf, 63))  < 0 ) {
421                                 osrfLogWarning("Read error afer select in child status read with errno %d", errno);
422                         }
423
424                         osrfLogDebug( "Read %d bytes from status buffer: %s", n, buf );
425                         cur_child->available = 1;
426                 }
427                 cur_child = cur_child->next;
428         } 
429
430 }
431
432
433 void prefork_child_wait( prefork_child* child ) {
434
435         int i,n;
436         growing_buffer* gbuf = buffer_init( READ_BUFSIZE );
437         char buf[READ_BUFSIZE];
438         memset( buf, 0, READ_BUFSIZE );
439
440         for( i = 0; i!= child->max_requests; i++ ) {
441
442                 n = -1;
443                 clr_fl(child->read_data_fd, O_NONBLOCK );
444                 while( (n=read(child->read_data_fd, buf, READ_BUFSIZE-1)) > 0 ) {
445                         buffer_add( gbuf, buf );
446                         memset( buf, 0, READ_BUFSIZE );
447
448                         //fprintf(stderr, "Child read %d bytes\n", n);
449
450                         if( n == READ_BUFSIZE ) { 
451                                 //fprintf(stderr, "We read READ_BUFSIZE data....\n");
452                                 /* XXX */
453                                 /* either we have exactly READ_BUFSIZE data, 
454                                         or there's more waiting that we need to grab*/
455                                 /* must set to non-block for reading more */
456                         } else {
457                                 //fprintf(stderr, "Read Data %f\n", get_timestamp_millis() );
458                                 prefork_child_process_request(child, gbuf->buf);
459                                 buffer_reset( gbuf );
460                                 break;
461                         }
462                 }
463
464                 if( n < 0 ) {
465                         osrfLogWarning( "Child read returned error with errno %d", errno );
466                         break;
467                 }
468
469                 if( i < child->max_requests - 1 ) 
470                         write( child->write_status_fd, "available" /*less than 64 bytes*/, 9 );
471         }
472
473         buffer_free(gbuf);
474
475         osrfLogDebug("Child exiting...[%d]", getpid() );
476
477         exit(0);
478 }
479
480
481 void add_prefork_child( prefork_simple* forker, prefork_child* child ) {
482         
483         if( forker->first_child == NULL ) {
484                 forker->first_child = child;
485                 child->next = child;
486                 return;
487         }
488
489         /* we put the child in as the last because, regardless, 
490                 we have to do the DLL splice dance, and this is the
491            simplest way */
492
493         prefork_child* start_child = forker->first_child;
494         while(1) {
495                 if( forker->first_child->next == start_child ) 
496                         break;
497                 forker->first_child = forker->first_child->next;
498         }
499
500         /* here we know that forker->first_child is the last element 
501                 in the list and start_child is the first.  Insert the
502                 new child between them*/
503
504         forker->first_child->next = child;
505         child->next = start_child;
506         return;
507 }
508
509 prefork_child* find_prefork_child( prefork_simple* forker, pid_t pid ) {
510
511         if( forker->first_child == NULL ) { return NULL; }
512         prefork_child* start_child = forker->first_child;
513         do {
514                 if( forker->first_child->pid == pid ) 
515                         return forker->first_child;
516         } while( (forker->first_child = forker->first_child->next) != start_child );
517
518         return NULL;
519 }
520
521
522 void del_prefork_child( prefork_simple* forker, pid_t pid ) { 
523
524         if( forker->first_child == NULL ) { return; }
525
526         (forker->current_num_children)--;
527         osrfLogDebug("Deleting Child: %d", pid );
528
529         prefork_child* start_child = forker->first_child; /* starting point */
530         prefork_child* cur_child        = start_child; /* current pointer */
531         prefork_child* prev_child       = start_child; /* the trailing pointer */
532
533         /* special case where there is only one in the list */
534         if( start_child == start_child->next ) {
535                 if( start_child->pid == pid ) {
536                         forker->first_child = NULL;
537
538                         close( start_child->read_data_fd );
539                         close( start_child->write_data_fd );
540                         close( start_child->read_status_fd );
541                         close( start_child->write_status_fd );
542
543                         prefork_child_free( start_child );
544                 }
545                 return;
546         }
547
548
549         /* special case where the first item in the list needs to be removed */
550         if( start_child->pid == pid ) { 
551
552                 /* find the last one so we can remove the start_child */
553                 do { 
554                         prev_child = cur_child;
555                         cur_child = cur_child->next;
556                 }while( cur_child != start_child );
557
558                 /* now cur_child == start_child */
559                 prev_child->next = cur_child->next;
560                 forker->first_child = prev_child;
561
562                 close( cur_child->read_data_fd );
563                 close( cur_child->write_data_fd );
564                 close( cur_child->read_status_fd );
565                 close( cur_child->write_status_fd );
566
567                 prefork_child_free( cur_child );
568                 return;
569         } 
570
571         do {
572                 prev_child = cur_child;
573                 cur_child = cur_child->next;
574
575                 if( cur_child->pid == pid ) {
576                         prev_child->next = cur_child->next;
577
578                         close( cur_child->read_data_fd );
579                         close( cur_child->write_data_fd );
580                         close( cur_child->read_status_fd );
581                         close( cur_child->write_status_fd );
582
583                         prefork_child_free( cur_child );
584                         return;
585                 }
586
587         } while(cur_child != start_child);
588 }
589
590
591
592
593 prefork_child* prefork_child_init( 
594         int max_requests, int read_data_fd, int write_data_fd, 
595         int read_status_fd, int write_status_fd ) {
596
597         prefork_child* child = (prefork_child*) safe_malloc(sizeof(prefork_child));
598         child->max_requests             = max_requests;
599         child->read_data_fd             = read_data_fd;
600         child->write_data_fd            = write_data_fd;
601         child->read_status_fd   = read_status_fd;
602         child->write_status_fd  = write_status_fd;
603         child->available                        = 1;
604
605         return child;
606 }
607
608
609 int prefork_free( prefork_simple* prefork ) {
610         
611         while( prefork->first_child != NULL ) {
612                 osrfLogInfo( "Killing children and sleeping 1 to reap..." );
613                 kill( 0,        SIGKILL );
614                 sleep(1);
615         }
616
617         client_free(prefork->connection);
618         free(prefork->appname);
619         free( prefork );
620         return 1;
621 }
622
623 int prefork_child_free( prefork_child* child ) { 
624         free(child->appname);
625         close(child->read_data_fd);
626         close(child->write_status_fd);
627         free( child ); 
628         return 1;
629 }
630