/* OpenSRF.git: src/libstack/osrf_prefork.c
   (blob f9761d22499876af968847674d2c9adae419a390) */
#include "osrf_prefork.h"

#include <errno.h>
#include <signal.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#include "osrf_app_session.h"
#include "osrf_application.h"

/* Set to 1 by sigchld_handler() when a child process exits; reap_children()
   resets it when it collects the dead children.  Because it is assigned from
   a signal handler it must be a volatile sig_atomic_t -- the only object type
   a handler may portably write to. */
volatile sig_atomic_t child_dead;

int main();
void sigchld_handler( int sig );

13 int osrf_prefork_run(char* appname) {
14
15         if(!appname) {
16                 osrfLogError( OSRF_LOG_MARK, "osrf_prefork_run requires an appname to run!");
17                 return -1;
18         }
19
20         set_proc_title( "OpenSRF Listener [%s]", appname );
21
22         int maxr = 1000; 
23         int maxc = 10;
24         int minc = 3;
25
26         osrfLogInfo( OSRF_LOG_MARK, "Loading config in osrf_forker for app %s", appname);
27
28         jsonObject* max_req = osrf_settings_host_value_object("/apps/%s/unix_config/max_requests", appname);
29         jsonObject* min_children = osrf_settings_host_value_object("/apps/%s/unix_config/min_children", appname);
30         jsonObject* max_children = osrf_settings_host_value_object("/apps/%s/unix_config/max_children", appname);
31
32         
33         if(!max_req) osrfLogWarning( OSRF_LOG_MARK, "Max requests not defined, assuming 1000");
34         else maxr = (int) jsonObjectGetNumber(max_req);
35
36         if(!min_children) osrfLogWarning( OSRF_LOG_MARK, "Min children not defined, assuming 3");
37         else minc = (int) jsonObjectGetNumber(min_children);
38
39         if(!max_children) osrfLogWarning( OSRF_LOG_MARK, "Max children not defined, assuming 10");
40         else maxc = (int) jsonObjectGetNumber(max_children);
41
42         jsonObjectFree(max_req);
43         jsonObjectFree(min_children);
44         jsonObjectFree(max_children);
45         /* --------------------------------------------------- */
46
47         char* resc = va_list_to_string("%s_listener", appname);
48
49         if(!osrf_system_bootstrap_client_resc( NULL, NULL, resc )) {
50                 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()");
51                 free(resc);
52                 return -1;
53         }
54
55         free(resc);
56
57         prefork_simple* forker = prefork_simple_init(
58                 osrfSystemGetTransportClient(), maxr, minc, maxc);
59
60         forker->appname = strdup(appname);
61
62         if(forker == NULL) {
63                 osrfLogError( OSRF_LOG_MARK, "osrf_prefork_run() failed to create prefork_simple object");
64                 return -1;
65         }
66
67         prefork_launch_children(forker);
68
69         osrf_prefork_register_routers(appname);
70         
71         osrfLogInfo( OSRF_LOG_MARK, "Launching osrf_forker for app %s", appname);
72         prefork_run(forker);
73         
74         osrfLogWarning( OSRF_LOG_MARK, "prefork_run() retuned - how??");
75         prefork_free(forker);
76         return 0;
77
78 }
79
80 void osrf_prefork_register_routers( char* appname ) {
81
82         osrfStringArray* arr = osrfNewStringArray(4);
83
84         int c = osrfConfigGetValueList( NULL, arr, "/routers/router" );
85         char* routerName = osrfConfigGetValue( NULL, "/router_name" );
86         transport_client* client = osrfSystemGetTransportClient();
87
88         osrfLogInfo( OSRF_LOG_MARK, "router name is %s and we have %d routers to connect to", routerName, c );
89
90         while( c ) {
91                 char* domain = osrfStringArrayGetString(arr, --c);
92                 if(domain) {
93
94                         char* jid = va_list_to_string( "%s@%s/router", routerName, domain );
95                         osrfLogInfo( OSRF_LOG_MARK, "Registering with router %s", jid );
96
97                         transport_message* msg = message_init("registering", NULL, NULL, jid, NULL );
98                         message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
99
100                         client_send_message( client, msg );
101                         message_free( msg );
102                         free(jid);
103                 }
104         }
105
106         free(routerName);
107         osrfStringArrayFree(arr);
108 }
109
110 void prefork_child_init_hook(prefork_child* child) {
111
112         if(!child) return;
113         osrfLogDebug( OSRF_LOG_MARK, "Child init hook for child %d", child->pid);
114         char* resc = va_list_to_string("%s_drone",child->appname);
115
116         /* we want to remove traces of our parents socket connection 
117          * so we can have our own */
118         osrfSystemIgnoreTransportClient();
119
120         if(!osrf_system_bootstrap_client_resc( NULL, NULL, resc)) {
121                 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()");
122                 free(resc);
123                 return;
124         }
125
126         free(resc);
127
128         if( ! osrfAppRunChildInit(child->appname) ) {
129                 osrfLogDebug(OSRF_LOG_MARK, "Prefork child_init succeeded\n");
130         } else {
131                 osrfLogError(OSRF_LOG_MARK, "Prefork child_init failed\n");
132         }
133
134         set_proc_title( "OpenSRF Drone [%s]", child->appname );
135 }
136
137 void prefork_child_process_request(prefork_child* child, char* data) {
138         if( !child ) return;
139
140         /* construct the message from the xml */
141         transport_message* msg = new_message_from_xml( data );
142
143         osrfAppSession* session = osrf_stack_transport_handler(msg, child->appname);
144         if(!session) return;
145
146         if( session->stateless && session->state != OSRF_SESSION_CONNECTED ) {
147                 osrfAppSessionFree( session );
148                 return;
149         }
150
151         /* keepalive loop for stateful sessions */
152
153                 osrfLogDebug( OSRF_LOG_MARK, "Entering keepalive loop for session %s", session->session_id );
154 }
155
156
157 prefork_simple*  prefork_simple_init( transport_client* client, 
158                 int max_requests, int min_children, int max_children ) {
159
160         if( min_children > max_children ) {
161                 osrfLogError( OSRF_LOG_MARK,  "min_children (%d) is greater "
162                                 "than max_children (%d)", min_children, max_children );
163                 return NULL;
164         }
165
166         if( max_children > ABS_MAX_CHILDREN ) {
167                 osrfLogError( OSRF_LOG_MARK,  "max_children (%d) is greater than ABS_MAX_CHILDREN (%d)",
168                                 max_children, ABS_MAX_CHILDREN );
169                 return NULL;
170         }
171
172         osrfLogInfo(OSRF_LOG_MARK, "Prefork launching child with max_request=%d,"
173                 "min_children=%d, max_children=%d", max_requests, min_children, max_children );
174
175         /* flesh out the struct */
176         prefork_simple* prefork = (prefork_simple*) safe_malloc(sizeof(prefork_simple));        
177         prefork->max_requests = max_requests;
178         prefork->min_children = min_children;
179         prefork->max_children = max_children;
180         prefork->first_child = NULL;
181         prefork->connection = client;
182
183         return prefork;
184 }
185
186 prefork_child*  launch_child( prefork_simple* forker ) {
187
188         pid_t pid;
189         int data_fd[2];
190         int status_fd[2];
191
192         /* Set up the data pipes and add the child struct to the parent */
193         if( pipe(data_fd) < 0 ) { /* build the data pipe*/
194                 osrfLogError( OSRF_LOG_MARK,  "Pipe making error" );
195                 return NULL;
196         }
197
198         if( pipe(status_fd) < 0 ) {/* build the status pipe */
199                 osrfLogError( OSRF_LOG_MARK,  "Pipe making error" );
200                 return NULL;
201         }
202
203         osrfLogInternal( OSRF_LOG_MARK,  "Pipes: %d %d %d %d", data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
204         prefork_child* child = prefork_child_init( forker->max_requests, data_fd[0], 
205                         data_fd[1], status_fd[0], status_fd[1] );
206
207         child->appname = strdup(forker->appname);
208
209
210         add_prefork_child( forker, child );
211
212         if( (pid=fork()) < 0 ) {
213                 osrfLogError( OSRF_LOG_MARK,  "Forking Error" );
214                 return NULL;
215         }
216
217         if( pid > 0 ) {  /* parent */
218
219                 signal(SIGCHLD, sigchld_handler);
220                 (forker->current_num_children)++;
221                 child->pid = pid;
222
223                 osrfLogDebug( OSRF_LOG_MARK,  "Parent launched %d", pid );
224                 /* *no* child pipe FD's can be closed or the parent will re-use fd's that
225                         the children are currently using */
226                 return child;
227         }
228
229         else { /* child */
230
231                 osrfLogInternal( OSRF_LOG_MARK, "I am  new child with read_data_fd = %d and write_status_fd = %d",
232                         child->read_data_fd, child->write_status_fd );
233
234                 child->pid = getpid();
235                 close( child->write_data_fd );
236                 close( child->read_status_fd );
237
238                 /* do the initing */
239                 prefork_child_init_hook(child);
240
241                 prefork_child_wait( child );
242                 exit(0); /* just to be sure */
243          }
244         return NULL;
245 }
246
247
248 void prefork_launch_children( prefork_simple* forker ) {
249         if(!forker) return;
250         int c = 0;
251         while( c++ < forker->min_children )
252                 launch_child( forker );
253 }
254
255
256 void sigchld_handler( int sig ) {
257         signal(SIGCHLD, sigchld_handler);
258         child_dead = 1;
259 }
260
261
262 void reap_children( prefork_simple* forker ) {
263
264         pid_t child_pid;
265         int status;
266
267         while( (child_pid=waitpid(-1,&status,WNOHANG)) > 0) 
268                 del_prefork_child( forker, child_pid ); 
269
270         /* replenish */
271         while( forker->current_num_children < forker->min_children ) 
272                 launch_child( forker );
273
274         child_dead = 0;
275 }
276
277 void prefork_run(prefork_simple* forker) {
278
279         if( forker->first_child == NULL )
280                 return;
281
282         transport_message* cur_msg = NULL;
283
284
285         while(1) {
286
287                 if( forker->first_child == NULL ) {/* no more children */
288                         osrfLogWarning( OSRF_LOG_MARK, "No more children..." );
289                         return;
290                 }
291
292                 osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data...");
293                 cur_msg = client_recv( forker->connection, -1 );
294
295                 //fprintf(stderr, "Got Data %f\n", get_timestamp_millis() );
296
297                 if( cur_msg == NULL ) continue;
298
299                 int honored = 0;        /* true if we've serviced the request */
300
301                 while( ! honored ) {
302
303                         check_children( forker ); 
304
305                         osrfLogDebug( OSRF_LOG_MARK,  "Server received inbound data" );
306                         int k;
307                         prefork_child* cur_child = forker->first_child;
308
309                         /* Look for an available child */
310                         for( k = 0; k < forker->current_num_children; k++ ) {
311
312                                 osrfLogInternal( OSRF_LOG_MARK, "Searching for available child. cur_child->pid = %d", cur_child->pid );
313                                 osrfLogInternal( OSRF_LOG_MARK, "Current num children %d and loop %d", forker->current_num_children, k);
314                         
315                                 if( cur_child->available ) {
316                                         osrfLogDebug( OSRF_LOG_MARK,  "forker sending data to %d", cur_child->pid );
317
318                                         message_prepare_xml( cur_msg );
319                                         char* data = cur_msg->msg_xml;
320                                         if( ! data || strlen(data) < 1 ) break;
321
322                                         cur_child->available = 0;
323                                         osrfLogInternal( OSRF_LOG_MARK,  "Writing to child fd %d", cur_child->write_data_fd );
324
325                                         int written = 0;
326                                         //fprintf(stderr, "Writing Data %f\n", get_timestamp_millis() );
327                                         if( (written = write( cur_child->write_data_fd, data, strlen(data) + 1 )) < 0 ) {
328                                                 osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d", errno);
329                                                 cur_child = cur_child->next;
330                                                 continue;
331                                         }
332
333                                         //fprintf(stderr, "Wrote %d bytes to child\n", written);
334
335                                         forker->first_child = cur_child->next;
336                                         honored = 1;
337                                         break;
338                                 } else 
339                                         cur_child = cur_child->next;
340                         } 
341
342                         /* if none available, add a new child if we can */
343                         if( ! honored ) {
344                                 osrfLogDebug( OSRF_LOG_MARK, "Not enough children, attempting to add...");
345                                 if( forker->current_num_children < forker->max_children ) {
346                                         osrfLogDebug( OSRF_LOG_MARK,  "Launching new child with current_num = %d",
347                                                         forker->current_num_children );
348
349                                         prefork_child* new_child = launch_child( forker );
350                                         message_prepare_xml( cur_msg );
351                                         char* data = cur_msg->msg_xml;
352                                         if( ! data || strlen(data) < 1 ) break;
353                                         new_child->available = 0;
354                                         osrfLogDebug( OSRF_LOG_MARK,  "Writing to new child fd %d : pid %d", 
355                                                         new_child->write_data_fd, new_child->pid );
356                                         write( new_child->write_data_fd, data, strlen(data) + 1 );
357                                         forker->first_child = new_child->next;
358                                         honored = 1;
359                                 }
360                         }
361
362                         if( !honored ) {
363                                 osrfLogWarning( OSRF_LOG_MARK,  "No children available, sleeping and looping..." );
364                                 usleep( 50000 ); /* 50 milliseconds */
365                         }
366
367                         if( child_dead )
368                                 reap_children(forker);
369
370
371                         //fprintf(stderr, "Parent done with request %f\n", get_timestamp_millis() );
372
373                 } // honored?
374
375                 message_free( cur_msg );
376
377         } /* top level listen loop */
378
379 }
380
381
382 void check_children( prefork_simple* forker ) {
383
384         //check_begin:
385
386         int select_ret;
387         fd_set read_set;
388         FD_ZERO(&read_set);
389         int max_fd = 0;
390         int n;
391
392         struct timeval tv;
393         tv.tv_sec       = 0;
394         tv.tv_usec      = 0;
395
396         if( child_dead )
397                 reap_children(forker);
398
399         prefork_child* cur_child = forker->first_child;
400
401         int i;
402         for( i = 0; i!= forker->current_num_children; i++ ) {
403
404                 if( cur_child->read_status_fd > max_fd )
405                         max_fd = cur_child->read_status_fd;
406                 FD_SET( cur_child->read_status_fd, &read_set );
407                 cur_child = cur_child->next;
408         }
409
410         FD_CLR(0,&read_set);/* just to be sure */
411
412         if( (select_ret=select( max_fd + 1 , &read_set, NULL, NULL, &tv)) == -1 ) {
413                 osrfLogWarning( OSRF_LOG_MARK,  "Select returned error %d on check_children", errno );
414         }
415
416         if( select_ret == 0 )
417                 return;
418
419         /* see if one of a child has told us it's done */
420         cur_child = forker->first_child;
421         int j;
422         int num_handled = 0;
423         for( j = 0; j!= forker->current_num_children && num_handled < select_ret ; j++ ) {
424
425                 if( FD_ISSET( cur_child->read_status_fd, &read_set ) ) {
426                         //printf( "Server received status from a child %d\n", cur_child->pid );
427                         osrfLogDebug( OSRF_LOG_MARK,  "Server received status from a child %d", cur_child->pid );
428
429                         num_handled++;
430
431                         /* now suck off the data */
432                         char buf[64];
433                         memset( buf, 0, 64);
434                         if( (n=read(cur_child->read_status_fd, buf, 63))  < 0 ) {
435                                 osrfLogWarning( OSRF_LOG_MARK, "Read error afer select in child status read with errno %d", errno);
436                         }
437
438                         osrfLogDebug( OSRF_LOG_MARK,  "Read %d bytes from status buffer: %s", n, buf );
439                         cur_child->available = 1;
440                 }
441                 cur_child = cur_child->next;
442         } 
443
444 }
445
446
447 void prefork_child_wait( prefork_child* child ) {
448
449         int i,n;
450         growing_buffer* gbuf = buffer_init( READ_BUFSIZE );
451         char buf[READ_BUFSIZE];
452         memset( buf, 0, READ_BUFSIZE );
453
454         for( i = 0; i < child->max_requests; i++ ) {
455
456                 n = -1;
457                 clr_fl(child->read_data_fd, O_NONBLOCK );
458                 while( (n=read(child->read_data_fd, buf, READ_BUFSIZE-1)) > 0 ) {
459                         buffer_add( gbuf, buf );
460                         memset( buf, 0, READ_BUFSIZE );
461
462                         //fprintf(stderr, "Child read %d bytes\n", n);
463
464                         if( n == READ_BUFSIZE ) { 
465                                 //fprintf(stderr, "We read READ_BUFSIZE data....\n");
466                                 /* XXX */
467                                 /* either we have exactly READ_BUFSIZE data, 
468                                         or there's more waiting that we need to grab*/
469                                 /* must set to non-block for reading more */
470                         } else {
471                                 //fprintf(stderr, "Read Data %f\n", get_timestamp_millis() );
472                                 prefork_child_process_request(child, gbuf->buf);
473                                 buffer_reset( gbuf );
474                                 break;
475                         }
476                 }
477
478                 if( n < 0 ) {
479                         osrfLogWarning( OSRF_LOG_MARK,  "Prefork child read returned error with errno %d", errno );
480                         break;
481                 }
482
483                 if( i < child->max_requests - 1 ) 
484                         write( child->write_status_fd, "available" /*less than 64 bytes*/, 9 );
485         }
486
487         buffer_free(gbuf);
488
489         osrfLogDebug( OSRF_LOG_MARK, "Child with max-requests=%d, num-served=%d exiting...[%d]", 
490                         child->max_requests, i, getpid() );
491
492         exit(0);
493 }
494
495
496 void add_prefork_child( prefork_simple* forker, prefork_child* child ) {
497         
498         if( forker->first_child == NULL ) {
499                 forker->first_child = child;
500                 child->next = child;
501                 return;
502         }
503
504         /* we put the child in as the last because, regardless, 
505                 we have to do the DLL splice dance, and this is the
506            simplest way */
507
508         prefork_child* start_child = forker->first_child;
509         while(1) {
510                 if( forker->first_child->next == start_child ) 
511                         break;
512                 forker->first_child = forker->first_child->next;
513         }
514
515         /* here we know that forker->first_child is the last element 
516                 in the list and start_child is the first.  Insert the
517                 new child between them*/
518
519         forker->first_child->next = child;
520         child->next = start_child;
521         return;
522 }
523
524 prefork_child* find_prefork_child( prefork_simple* forker, pid_t pid ) {
525
526         if( forker->first_child == NULL ) { return NULL; }
527         prefork_child* start_child = forker->first_child;
528         do {
529                 if( forker->first_child->pid == pid ) 
530                         return forker->first_child;
531         } while( (forker->first_child = forker->first_child->next) != start_child );
532
533         return NULL;
534 }
535
536
537 void del_prefork_child( prefork_simple* forker, pid_t pid ) { 
538
539         if( forker->first_child == NULL ) { return; }
540
541         (forker->current_num_children)--;
542         osrfLogDebug( OSRF_LOG_MARK, "Deleting Child: %d", pid );
543
544         prefork_child* start_child = forker->first_child; /* starting point */
545         prefork_child* cur_child        = start_child; /* current pointer */
546         prefork_child* prev_child       = start_child; /* the trailing pointer */
547
548         /* special case where there is only one in the list */
549         if( start_child == start_child->next ) {
550                 if( start_child->pid == pid ) {
551                         forker->first_child = NULL;
552
553                         close( start_child->read_data_fd );
554                         close( start_child->write_data_fd );
555                         close( start_child->read_status_fd );
556                         close( start_child->write_status_fd );
557
558                         prefork_child_free( start_child );
559                 }
560                 return;
561         }
562
563
564         /* special case where the first item in the list needs to be removed */
565         if( start_child->pid == pid ) { 
566
567                 /* find the last one so we can remove the start_child */
568                 do { 
569                         prev_child = cur_child;
570                         cur_child = cur_child->next;
571                 }while( cur_child != start_child );
572
573                 /* now cur_child == start_child */
574                 prev_child->next = cur_child->next;
575                 forker->first_child = prev_child;
576
577                 close( cur_child->read_data_fd );
578                 close( cur_child->write_data_fd );
579                 close( cur_child->read_status_fd );
580                 close( cur_child->write_status_fd );
581
582                 prefork_child_free( cur_child );
583                 return;
584         } 
585
586         do {
587                 prev_child = cur_child;
588                 cur_child = cur_child->next;
589
590                 if( cur_child->pid == pid ) {
591                         prev_child->next = cur_child->next;
592
593                         close( cur_child->read_data_fd );
594                         close( cur_child->write_data_fd );
595                         close( cur_child->read_status_fd );
596                         close( cur_child->write_status_fd );
597
598                         prefork_child_free( cur_child );
599                         return;
600                 }
601
602         } while(cur_child != start_child);
603 }
604
605
606
607
608 prefork_child* prefork_child_init( 
609         int max_requests, int read_data_fd, int write_data_fd, 
610         int read_status_fd, int write_status_fd ) {
611
612         prefork_child* child = (prefork_child*) safe_malloc(sizeof(prefork_child));
613         child->max_requests             = max_requests;
614         child->read_data_fd             = read_data_fd;
615         child->write_data_fd            = write_data_fd;
616         child->read_status_fd   = read_status_fd;
617         child->write_status_fd  = write_status_fd;
618         child->available                        = 1;
619
620         return child;
621 }
622
623
624 int prefork_free( prefork_simple* prefork ) {
625         
626         while( prefork->first_child != NULL ) {
627                 osrfLogInfo( OSRF_LOG_MARK,  "Killing children and sleeping 1 to reap..." );
628                 kill( 0,        SIGKILL );
629                 sleep(1);
630         }
631
632         client_free(prefork->connection);
633         free(prefork->appname);
634         free( prefork );
635         return 1;
636 }
637
638 int prefork_child_free( prefork_child* child ) { 
639         free(child->appname);
640         close(child->read_data_fd);
641         close(child->write_status_fd);
642         free( child ); 
643         return 1;
644 }
645