2c89bb85faac8a72cfcf363380e86b3625e2e1ba
[OpenSRF.git] / src / libstack / osrf_prefork.c
1 #include "osrf_prefork.h"
2 #include <signal.h>
3 #include "osrf_app_session.h"
4
5 /* true if we just deleted a child.  This will allow us to make sure we're
6         not trying to use freed memory */
7 int child_dead;
8
9 int main();
10 void sigchld_handler( int sig );
11
12 int osrf_prefork_run(char* appname) {
13
14         if(!appname) fatal_handler("osrf_prefork_run requires an appname to run!");
15
16         set_proc_title( "OpenSRF Listener [%s]", appname );
17
18         int maxr = 1000; 
19         int maxc = 10;
20         int minc = 3;
21
22         info_handler("Loading config in osrf_forker for app %s", appname);
23
24         jsonObject* max_req = osrf_settings_host_value_object("/apps/%s/unix_config/max_requests", appname);
25         jsonObject* min_children = osrf_settings_host_value_object("/apps/%s/unix_config/min_children", appname);
26         jsonObject* max_children = osrf_settings_host_value_object("/apps/%s/unix_config/max_children", appname);
27
28         
29         if(!max_req) warning_handler("Max requests not defined, assuming 1000");
30         else maxr = (int) jsonObjectGetNumber(max_req);
31
32         if(!min_children) warning_handler("Min children not defined, assuming 3");
33         else minc = (int) jsonObjectGetNumber(min_children);
34
35         if(!max_children) warning_handler("Max children not defined, assuming 10");
36         else maxc = (int) jsonObjectGetNumber(max_children);
37
38         jsonObjectFree(max_req);
39         jsonObjectFree(min_children);
40         jsonObjectFree(max_children);
41         /* --------------------------------------------------- */
42
43         char* resc = va_list_to_string("%s_listener", appname);
44
45         if(!osrf_system_bootstrap_client_resc( NULL, NULL, resc ))
46                 fatal_handler("Unable to bootstrap client for osrf_prefork_run()");
47         free(resc);
48
49         prefork_simple* forker = prefork_simple_init(
50                 osrfSystemGetTransportClient(), maxr, minc, maxc);
51
52         forker->appname = strdup(appname);
53
54         if(forker == NULL)
55                 fatal_handler("osrf_prefork_run() failed to create prefork_simple object");
56
57         prefork_launch_children(forker);
58
59         osrf_prefork_register_routers(appname);
60         
61         info_handler("Launching osrf_forker for app %s", appname);
62         prefork_run(forker);
63         
64         warning_handler("prefork_run() retuned - how??");
65         prefork_free(forker);
66         return 0;
67
68 }
69
70 void osrf_prefork_register_routers( char* appname ) {
71
72         osrfStringArray* arr = osrfNewStringArray(4);
73
74         int c = osrfConfigGetValueList( NULL, arr, "/routers/router" );
75         char* routerName = osrfConfigGetValue( NULL, "/router_name" );
76         transport_client* client = osrfSystemGetTransportClient();
77
78         info_handler("router name is %s and we have %d routers to connect to", routerName, c );
79
80         while( c ) {
81                 char* domain = osrfStringArrayGetString(arr, --c);
82                 if(domain) {
83
84                         char* jid = va_list_to_string( "%s@%s/router", routerName, domain );
85                         info_handler("Registering with router %s", jid );
86
87                         transport_message* msg = message_init("registering", NULL, NULL, jid, NULL );
88                         message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
89
90                         client_send_message( client, msg );
91                         message_free( msg );
92                         free(jid);
93                 }
94         }
95
96         free(routerName);
97         osrfStringArrayFree(arr);
98 }
99
100 void prefork_child_init_hook(prefork_child* child) {
101
102         if(!child) return;
103         info_handler("Child init hook for child %d", child->pid);
104         char* resc = va_list_to_string("%s_drone",child->appname);
105         if(!osrf_system_bootstrap_client_resc( NULL, NULL, resc)) 
106                 fatal_handler("Unable to bootstrap client for osrf_prefork_run()");
107         free(resc);
108
109         set_proc_title( "OpenSRF Drone [%s]", child->appname );
110 }
111
112 void prefork_child_process_request(prefork_child* child, char* data) {
113         if( !child ) return;
114
115         /* construct the message from the xml */
116         transport_message* msg = new_message_from_xml( data );
117
118         osrfAppSession* session = osrf_stack_transport_handler(msg, child->appname);
119         if(!session) return;
120
121         if( session->stateless && session->state != OSRF_SESSION_CONNECTED ) {
122                 osrfAppSessionFree( session );
123                 return;
124         }
125
126         /* keepalive loop for stateful sessions */
127
128                 debug_handler("Entering keepalive loop for session %s", session->session_id );
129 }
130
131
132 prefork_simple*  prefork_simple_init( transport_client* client, 
133                 int max_requests, int min_children, int max_children ) {
134
135         if( min_children > max_children )
136                 fatal_handler( "min_children (%d) is greater "
137                                 "than max_children (%d)", min_children, max_children );
138
139         if( max_children > ABS_MAX_CHILDREN )
140                 fatal_handler( "max_children (%d) is greater than ABS_MAX_CHILDREN (%d)",
141                                 max_children, ABS_MAX_CHILDREN );
142
143         /* flesh out the struct */
144         prefork_simple* prefork = (prefork_simple*) safe_malloc(sizeof(prefork_simple));        
145         prefork->max_requests = max_requests;
146         prefork->min_children = min_children;
147         prefork->max_children = max_children;
148         prefork->first_child = NULL;
149         prefork->connection = client;
150
151         return prefork;
152 }
153
154 prefork_child*  launch_child( prefork_simple* forker ) {
155
156         pid_t pid;
157         int data_fd[2];
158         int status_fd[2];
159
160         /* Set up the data pipes and add the child struct to the parent */
161         if( pipe(data_fd) < 0 ) /* build the data pipe*/
162                 fatal_handler( "Pipe making error" );
163
164         if( pipe(status_fd) < 0 ) /* build the status pipe */
165                 fatal_handler( "Pipe making error" );
166
167         debug_handler( "Pipes: %d %d %d %d", data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
168         prefork_child* child = prefork_child_init( forker->max_requests, data_fd[0], 
169                         data_fd[1], status_fd[0], status_fd[1] );
170
171         child->appname = strdup(forker->appname);
172
173
174         add_prefork_child( forker, child );
175
176         if( (pid=fork()) < 0 ) fatal_handler( "Forking Error" );
177
178         if( pid > 0 ) {  /* parent */
179
180                 signal(SIGCHLD, sigchld_handler);
181                 (forker->current_num_children)++;
182                 child->pid = pid;
183
184                 info_handler( "Parent launched %d", pid );
185                 /* *no* child pipe FD's can be closed or the parent will re-use fd's that
186                         the children are currently using */
187                 return child;
188         }
189
190         else { /* child */
191
192                 debug_handler("I am  new child with read_data_fd = %d and write_status_fd = %d",
193                         child->read_data_fd, child->write_status_fd );
194
195                 child->pid = getpid();
196                 close( child->write_data_fd );
197                 close( child->read_status_fd );
198
199                 /* do the initing */
200                 prefork_child_init_hook(child);
201
202                 prefork_child_wait( child );
203                 exit(0); /* just to be sure */
204          }
205         return NULL;
206 }
207
208
209 void prefork_launch_children( prefork_simple* forker ) {
210         if(!forker) return;
211         int c = 0;
212         while( c++ < forker->min_children )
213                 launch_child( forker );
214 }
215
216
217 void sigchld_handler( int sig ) {
218         signal(SIGCHLD, sigchld_handler);
219         child_dead = 1;
220 }
221
222
223 void reap_children( prefork_simple* forker ) {
224
225         pid_t child_pid;
226         int status;
227
228         while( (child_pid=waitpid(-1,&status,WNOHANG)) > 0) 
229                 del_prefork_child( forker, child_pid ); 
230
231         /* replenish */
232         while( forker->current_num_children < forker->min_children ) 
233                 launch_child( forker );
234
235         child_dead = 0;
236 }
237
238 void prefork_run(prefork_simple* forker) {
239
240         if( forker->first_child == NULL )
241                 return;
242
243         transport_message* cur_msg = NULL;
244
245
246         while(1) {
247
248                 if( forker->first_child == NULL ) {/* no more children */
249                         warning_handler("No more children..." );
250                         return;
251                 }
252
253                 debug_handler("Forker going into wait for data...");
254                 cur_msg = client_recv( forker->connection, -1 );
255
256                 //fprintf(stderr, "Got Data %f\n", get_timestamp_millis() );
257
258                 if( cur_msg == NULL ) continue;
259
260                 int honored = 0;        /* true if we've serviced the request */
261
262                 while( ! honored ) {
263
264                         check_children( forker ); 
265
266                         debug_handler( "Server received inbound data" );
267                         int k;
268                         prefork_child* cur_child = forker->first_child;
269
270                         /* Look for an available child */
271                         for( k = 0; k < forker->current_num_children; k++ ) {
272
273                                 debug_handler("Searching for available child. cur_child->pid = %d", cur_child->pid );
274                                 debug_handler("Current num children %d and loop %d", forker->current_num_children, k);
275                         
276                                 if( cur_child->available ) {
277                                         debug_handler( "sending data to %d", cur_child->pid );
278
279                                         message_prepare_xml( cur_msg );
280                                         char* data = cur_msg->msg_xml;
281                                         if( ! data || strlen(data) < 1 ) break;
282
283                                         cur_child->available = 0;
284                                         debug_handler( "Writing to child fd %d", cur_child->write_data_fd );
285
286                                         int written = 0;
287                                         //fprintf(stderr, "Writing Data %f\n", get_timestamp_millis() );
288                                         if( (written = write( cur_child->write_data_fd, data, strlen(data) + 1 )) < 0 ) {
289                                                 warning_handler("Write returned error %d", errno);
290                                                 cur_child = cur_child->next;
291                                                 continue;
292                                         }
293
294                                         //fprintf(stderr, "Wrote %d bytes to child\n", written);
295
296                                         forker->first_child = cur_child->next;
297                                         honored = 1;
298                                         break;
299                                 } else 
300                                         cur_child = cur_child->next;
301                         } 
302
303                         /* if none available, add a new child if we can */
304                         if( ! honored ) {
305                                 debug_handler("Not enough children, attempting to add...");
306                                 if( forker->current_num_children < forker->max_children ) {
307                                         debug_handler( "Launching new child with current_num = %d",
308                                                         forker->current_num_children );
309
310                                         prefork_child* new_child = launch_child( forker );
311                                         message_prepare_xml( cur_msg );
312                                         char* data = cur_msg->msg_xml;
313                                         if( ! data || strlen(data) < 1 ) break;
314                                         new_child->available = 0;
315                                         debug_handler( "sending data to %d", new_child->pid );
316                                         debug_handler( "Writing to new child fd %d", new_child->write_data_fd );
317                                         write( new_child->write_data_fd, data, strlen(data) + 1 );
318                                         forker->first_child = new_child->next;
319                                         honored = 1;
320                                 }
321                         }
322
323                         if( !honored ) {
324                                 warning_handler( "No children available, sleeping and looping..." );
325                                 usleep( 50000 ); /* 50 milliseconds */
326                         }
327
328                         if( child_dead )
329                                 reap_children(forker);
330
331
332                         //fprintf(stderr, "Parent done with request %f\n", get_timestamp_millis() );
333
334                 } // honored?
335
336                 message_free( cur_msg );
337
338         } /* top level listen loop */
339
340 }
341
342
343 void check_children( prefork_simple* forker ) {
344
345         //check_begin:
346
347         int select_ret;
348         fd_set read_set;
349         FD_ZERO(&read_set);
350         int max_fd = 0;
351         int n;
352
353         struct timeval tv;
354         tv.tv_sec       = 0;
355         tv.tv_usec      = 0;
356
357         if( child_dead )
358                 reap_children(forker);
359
360         prefork_child* cur_child = forker->first_child;
361
362         int i;
363         for( i = 0; i!= forker->current_num_children; i++ ) {
364
365                 if( cur_child->read_status_fd > max_fd )
366                         max_fd = cur_child->read_status_fd;
367                 FD_SET( cur_child->read_status_fd, &read_set );
368                 cur_child = cur_child->next;
369         }
370
371         FD_CLR(0,&read_set);/* just to be sure */
372
373         if( (select_ret=select( max_fd + 1 , &read_set, NULL, NULL, &tv)) == -1 ) {
374                 warning_handler( "Select returned error %d on check_children", errno );
375         }
376
377         if( select_ret == 0 )
378                 return;
379
380         /* see if one of a child has told us it's done */
381         cur_child = forker->first_child;
382         int j;
383         int num_handled = 0;
384         for( j = 0; j!= forker->current_num_children && num_handled < select_ret ; j++ ) {
385
386                 if( FD_ISSET( cur_child->read_status_fd, &read_set ) ) {
387                         //printf( "Server received status from a child %d\n", cur_child->pid );
388                         debug_handler( "Server received status from a child %d", cur_child->pid );
389
390                         num_handled++;
391
392                         /* now suck off the data */
393                         char buf[64];
394                         memset( buf, 0, 64);
395                         if( (n=read(cur_child->read_status_fd, buf, 63))  < 0 ) {
396                                 warning_handler("Read error afer select in child status read with errno %d", errno);
397                         }
398
399                         debug_handler( "Read %d bytes from status buffer: %s", n, buf );
400                         cur_child->available = 1;
401                 }
402                 cur_child = cur_child->next;
403         } 
404
405 }
406
407
408 void prefork_child_wait( prefork_child* child ) {
409
410         int i,n;
411         growing_buffer* gbuf = buffer_init( READ_BUFSIZE );
412         char buf[READ_BUFSIZE];
413         memset( buf, 0, READ_BUFSIZE );
414
415         for( i = 0; i!= child->max_requests; i++ ) {
416
417                 n = -1;
418                 clr_fl(child->read_data_fd, O_NONBLOCK );
419                 while( (n=read(child->read_data_fd, buf, READ_BUFSIZE-1)) > 0 ) {
420                         buffer_add( gbuf, buf );
421                         memset( buf, 0, READ_BUFSIZE );
422
423                         //fprintf(stderr, "Child read %d bytes\n", n);
424
425                         if( n == READ_BUFSIZE ) { 
426                                 //fprintf(stderr, "We read READ_BUFSIZE data....\n");
427                                 /* XXX */
428                                 /* either we have exactly READ_BUFSIZE data, 
429                                         or there's more waiting that we need to grab*/
430                                 /* must set to non-block for reading more */
431                         } else {
432                                 //fprintf(stderr, "Read Data %f\n", get_timestamp_millis() );
433                                 prefork_child_process_request(child, gbuf->buf);
434                                 buffer_reset( gbuf );
435                                 break;
436                         }
437                 }
438
439                 if( n < 0 ) {
440                         warning_handler( "Child read returned error with errno %d", errno );
441                         break;
442                 }
443
444                 if( i < child->max_requests - 1 ) 
445                         write( child->write_status_fd, "available" /*less than 64 bytes*/, 9 );
446         }
447
448         buffer_free(gbuf);
449
450         debug_handler("Child exiting...[%d]", getpid() );
451
452         exit(0);
453 }
454
455
456 void add_prefork_child( prefork_simple* forker, prefork_child* child ) {
457         
458         if( forker->first_child == NULL ) {
459                 forker->first_child = child;
460                 child->next = child;
461                 return;
462         }
463
464         /* we put the child in as the last because, regardless, 
465                 we have to do the DLL splice dance, and this is the
466            simplest way */
467
468         prefork_child* start_child = forker->first_child;
469         while(1) {
470                 if( forker->first_child->next == start_child ) 
471                         break;
472                 forker->first_child = forker->first_child->next;
473         }
474
475         /* here we know that forker->first_child is the last element 
476                 in the list and start_child is the first.  Insert the
477                 new child between them*/
478
479         forker->first_child->next = child;
480         child->next = start_child;
481         return;
482 }
483
484 prefork_child* find_prefork_child( prefork_simple* forker, pid_t pid ) {
485
486         if( forker->first_child == NULL ) { return NULL; }
487         prefork_child* start_child = forker->first_child;
488         do {
489                 if( forker->first_child->pid == pid ) 
490                         return forker->first_child;
491         } while( (forker->first_child = forker->first_child->next) != start_child );
492
493         return NULL;
494 }
495
496
497 void del_prefork_child( prefork_simple* forker, pid_t pid ) { 
498
499         if( forker->first_child == NULL ) { return; }
500
501         (forker->current_num_children)--;
502         debug_handler("Deleting Child: %d", pid );
503
504         prefork_child* start_child = forker->first_child; /* starting point */
505         prefork_child* cur_child        = start_child; /* current pointer */
506         prefork_child* prev_child       = start_child; /* the trailing pointer */
507
508         /* special case where there is only one in the list */
509         if( start_child == start_child->next ) {
510                 if( start_child->pid == pid ) {
511                         forker->first_child = NULL;
512
513                         close( start_child->read_data_fd );
514                         close( start_child->write_data_fd );
515                         close( start_child->read_status_fd );
516                         close( start_child->write_status_fd );
517
518                         prefork_child_free( start_child );
519                 }
520                 return;
521         }
522
523
524         /* special case where the first item in the list needs to be removed */
525         if( start_child->pid == pid ) { 
526
527                 /* find the last one so we can remove the start_child */
528                 do { 
529                         prev_child = cur_child;
530                         cur_child = cur_child->next;
531                 }while( cur_child != start_child );
532
533                 /* now cur_child == start_child */
534                 prev_child->next = cur_child->next;
535                 forker->first_child = prev_child;
536
537                 close( cur_child->read_data_fd );
538                 close( cur_child->write_data_fd );
539                 close( cur_child->read_status_fd );
540                 close( cur_child->write_status_fd );
541
542                 prefork_child_free( cur_child );
543                 return;
544         } 
545
546         do {
547                 prev_child = cur_child;
548                 cur_child = cur_child->next;
549
550                 if( cur_child->pid == pid ) {
551                         prev_child->next = cur_child->next;
552
553                         close( cur_child->read_data_fd );
554                         close( cur_child->write_data_fd );
555                         close( cur_child->read_status_fd );
556                         close( cur_child->write_status_fd );
557
558                         prefork_child_free( cur_child );
559                         return;
560                 }
561
562         } while(cur_child != start_child);
563 }
564
565
566
567
568 prefork_child* prefork_child_init( 
569         int max_requests, int read_data_fd, int write_data_fd, 
570         int read_status_fd, int write_status_fd ) {
571
572         prefork_child* child = (prefork_child*) safe_malloc(sizeof(prefork_child));
573         child->max_requests             = max_requests;
574         child->read_data_fd             = read_data_fd;
575         child->write_data_fd            = write_data_fd;
576         child->read_status_fd   = read_status_fd;
577         child->write_status_fd  = write_status_fd;
578         child->available                        = 1;
579
580         return child;
581 }
582
583
584 int prefork_free( prefork_simple* prefork ) {
585         
586         while( prefork->first_child != NULL ) {
587                 info_handler( "Killing children and sleeping 1 to reap..." );
588                 kill( 0,        SIGKILL );
589                 sleep(1);
590         }
591
592         client_free(prefork->connection);
593         free(prefork->appname);
594         free( prefork );
595         return 1;
596 }
597
598 int prefork_child_free( prefork_child* child ) { 
599         free(child->appname);
600         close(child->read_data_fd);
601         close(child->write_status_fd);
602         free( child ); 
603         return 1;
604 }
605