477b0dc338ee1e30b781521b178623d5f0cb8bcf
[OpenSRF.git] / src / libstack / osrf_prefork.c
1 #include "osrf_prefork.h"
2 #include <signal.h>
3 #include "osrf_app_session.h"
4
/* Set to 1 by sigchld_handler() when a child process has exited and still
	needs to be reaped; reap_children() resets it to 0.  Checking this flag
	lets us make sure we're not trying to use freed memory */
7 int child_dead;
8
9 int main();
10 void sigchld_handler( int sig );
11
12 int osrf_prefork_run(char* appname) {
13
14         if(!appname) fatal_handler("osrf_prefork_run requires an appname to run!");
15
16         set_proc_title( "opensrf listener [%s]", appname );
17
18         int maxr = 1000; 
19         int maxc = 10;
20         int minc = 3;
21
22         info_handler("Loading config in osrf_forker for app %s", appname);
23
24         jsonObject* max_req = osrf_settings_host_value_object("/apps/%s/unix_config/max_requests", appname);
25         jsonObject* min_children = osrf_settings_host_value_object("/apps/%s/unix_config/min_children", appname);
26         jsonObject* max_children = osrf_settings_host_value_object("/apps/%s/unix_config/max_children", appname);
27
28         if(!max_req) warning_handler("Max requests not defined, assuming 1000");
29         else maxr = (int) jsonObjectGetNumber(max_req);
30
31         if(!min_children) warning_handler("Min children not defined, assuming 3");
32         else minc = (int) jsonObjectGetNumber(min_children);
33
34         if(!max_children) warning_handler("Max children not defined, assuming 10");
35         else maxc = (int) jsonObjectGetNumber(max_children);
36
37         jsonObjectFree(max_req);
38         jsonObjectFree(min_children);
39         jsonObjectFree(max_children);
40         /* --------------------------------------------------- */
41
42         char* resc = va_list_to_string("%s_listener", appname);
43
44         if(!osrf_system_bootstrap_client_resc( NULL, NULL, resc ))
45                 fatal_handler("Unable to bootstrap client for osrf_prefork_run()");
46         free(resc);
47
48         prefork_simple* forker = prefork_simple_init(
49                 osrfSystemGetTransportClient(), maxr, minc, maxc);
50
51         forker->appname = strdup(appname);
52
53         if(forker == NULL)
54                 fatal_handler("osrf_prefork_run() failed to create prefork_simple object");
55
56         prefork_launch_children(forker);
57
58         osrf_prefork_register_routers(appname);
59         
60         info_handler("Launching osrf_forker for app %s", appname);
61         prefork_run(forker);
62         
63         warning_handler("prefork_run() retuned - how??");
64         prefork_free(forker);
65         return 0;
66
67 }
68
69 void osrf_prefork_register_routers( char* appname ) {
70
71         osrfStringArray* arr = osrfNewStringArray(4);
72
73         int c = osrfConfigGetValueList( NULL, arr, "/routers/router" );
74         char* routerName = osrfConfigGetValue( NULL, "/router_name" );
75         transport_client* client = osrfSystemGetTransportClient();
76
77         info_handler("router name is %s and we have %d routers to connect to", routerName, c );
78
79         while( c ) {
80                 char* domain = osrfStringArrayGetString(arr, --c);
81                 if(domain) {
82
83                         char* jid = va_list_to_string( "%s@%s/router", routerName, domain );
84                         info_handler("Registering with router %s", jid );
85
86                         transport_message* msg = message_init("registering", NULL, NULL, jid, NULL );
87                         message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
88
89                         client_send_message( client, msg );
90                         message_free( msg );
91                         free(jid);
92                 }
93         }
94
95         free(routerName);
96         osrfStringArrayFree(arr);
97 }
98
99 void prefork_child_init_hook(prefork_child* child) {
100
101         if(!child) return;
102         info_handler("Child init hook for child %d", child->pid);
103         char* resc = va_list_to_string("%s_drone",child->appname);
104         if(!osrf_system_bootstrap_client_resc( NULL, NULL, resc)) 
105                 fatal_handler("Unable to bootstrap client for osrf_prefork_run()");
106         free(resc);
107
108         set_proc_title( "opensrf drone [%s]", child->appname );
109 }
110
111 void prefork_child_process_request(prefork_child* child, char* data) {
112         if( !child ) return;
113
114         /* construct the message from the xml */
115         transport_message* msg = new_message_from_xml( data );
116
117         osrfAppSession* session = osrf_stack_transport_handler(msg, child->appname);
118
119         if( ! session->stateless ) { /* keepalive loop for stateful sessions */
120
121                 debug_handler("Entering keepalive loop for session %s", session->session_id );
122         }
123 }
124
125
126 prefork_simple*  prefork_simple_init( transport_client* client, 
127                 int max_requests, int min_children, int max_children ) {
128
129         if( min_children > max_children )
130                 fatal_handler( "min_children (%d) is greater "
131                                 "than max_children (%d)", min_children, max_children );
132
133         if( max_children > ABS_MAX_CHILDREN )
134                 fatal_handler( "max_children (%d) is greater than ABS_MAX_CHILDREN (%d)",
135                                 max_children, ABS_MAX_CHILDREN );
136
137         /* flesh out the struct */
138         prefork_simple* prefork = (prefork_simple*) safe_malloc(sizeof(prefork_simple));        
139         prefork->max_requests = max_requests;
140         prefork->min_children = min_children;
141         prefork->max_children = max_children;
142         prefork->first_child = NULL;
143         prefork->connection = client;
144
145         return prefork;
146 }
147
148 prefork_child*  launch_child( prefork_simple* forker ) {
149
150         pid_t pid;
151         int data_fd[2];
152         int status_fd[2];
153
154         /* Set up the data pipes and add the child struct to the parent */
155         if( pipe(data_fd) < 0 ) /* build the data pipe*/
156                 fatal_handler( "Pipe making error" );
157
158         if( pipe(status_fd) < 0 ) /* build the status pipe */
159                 fatal_handler( "Pipe making error" );
160
161         debug_handler( "Pipes: %d %d %d %d", data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
162         prefork_child* child = prefork_child_init( forker->max_requests, data_fd[0], 
163                         data_fd[1], status_fd[0], status_fd[1] );
164
165         child->appname = strdup(forker->appname);
166
167
168         add_prefork_child( forker, child );
169
170         if( (pid=fork()) < 0 ) fatal_handler( "Forking Error" );
171
172         if( pid > 0 ) {  /* parent */
173
174                 signal(SIGCHLD, sigchld_handler);
175                 (forker->current_num_children)++;
176                 child->pid = pid;
177
178                 info_handler( "Parent launched %d", pid );
179                 /* *no* child pipe FD's can be closed or the parent will re-use fd's that
180                         the children are currently using */
181                 return child;
182         }
183
184         else { /* child */
185
186                 debug_handler("I am  new child with read_data_fd = %d and write_status_fd = %d",
187                         child->read_data_fd, child->write_status_fd );
188
189                 child->pid = getpid();
190                 close( child->write_data_fd );
191                 close( child->read_status_fd );
192
193                 /* do the initing */
194                 prefork_child_init_hook(child);
195
196                 prefork_child_wait( child );
197                 exit(0); /* just to be sure */
198          }
199         return NULL;
200 }
201
202
203 void prefork_launch_children( prefork_simple* forker ) {
204         if(!forker) return;
205         int c = 0;
206         while( c++ < forker->min_children )
207                 launch_child( forker );
208 }
209
210
211 void sigchld_handler( int sig ) {
212         signal(SIGCHLD, sigchld_handler);
213         child_dead = 1;
214 }
215
216
217 void reap_children( prefork_simple* forker ) {
218
219         pid_t child_pid;
220         int status;
221
222         while( (child_pid=waitpid(-1,&status,WNOHANG)) > 0) 
223                 del_prefork_child( forker, child_pid ); 
224
225         /* replenish */
226         while( forker->current_num_children < forker->min_children ) 
227                 launch_child( forker );
228
229         child_dead = 0;
230 }
231
232 void prefork_run(prefork_simple* forker) {
233
234         if( forker->first_child == NULL )
235                 return;
236
237         transport_message* cur_msg = NULL;
238
239
240         while(1) {
241
242                 if( forker->first_child == NULL ) {/* no more children */
243                         warning_handler("No more children..." );
244                         return;
245                 }
246
247                 debug_handler("Forker going into wait for data...");
248                 cur_msg = client_recv( forker->connection, -1 );
249
250                 //fprintf(stderr, "Got Data %f\n", get_timestamp_millis() );
251
252                 if( cur_msg == NULL ) continue;
253
254                 int honored = 0;        /* true if we've serviced the request */
255
256                 while( ! honored ) {
257
258                         check_children( forker ); 
259
260                         debug_handler( "Server received inbound data" );
261                         int k;
262                         prefork_child* cur_child = forker->first_child;
263
264                         /* Look for an available child */
265                         for( k = 0; k < forker->current_num_children; k++ ) {
266
267                                 debug_handler("Searching for available child. cur_child->pid = %d", cur_child->pid );
268                                 debug_handler("Current num children %d and loop %d", forker->current_num_children, k);
269                         
270                                 if( cur_child->available ) {
271                                         debug_handler( "sending data to %d", cur_child->pid );
272
273                                         message_prepare_xml( cur_msg );
274                                         char* data = cur_msg->msg_xml;
275                                         if( ! data || strlen(data) < 1 ) break;
276
277                                         cur_child->available = 0;
278                                         debug_handler( "Writing to child fd %d", cur_child->write_data_fd );
279
280                                         int written = 0;
281                                         //fprintf(stderr, "Writing Data %f\n", get_timestamp_millis() );
282                                         if( (written = write( cur_child->write_data_fd, data, strlen(data) + 1 )) < 0 ) {
283                                                 warning_handler("Write returned error %d", errno);
284                                                 cur_child = cur_child->next;
285                                                 continue;
286                                         }
287
288                                         //fprintf(stderr, "Wrote %d bytes to child\n", written);
289
290                                         forker->first_child = cur_child->next;
291                                         honored = 1;
292                                         break;
293                                 } else 
294                                         cur_child = cur_child->next;
295                         } 
296
297                         /* if none available, add a new child if we can */
298                         if( ! honored ) {
299                                 debug_handler("Not enough children, attempting to add...");
300                                 if( forker->current_num_children < forker->max_children ) {
301                                         debug_handler( "Launching new child with current_num = %d",
302                                                         forker->current_num_children );
303
304                                         prefork_child* new_child = launch_child( forker );
305                                         message_prepare_xml( cur_msg );
306                                         char* data = cur_msg->msg_xml;
307                                         if( ! data || strlen(data) < 1 ) break;
308                                         new_child->available = 0;
309                                         debug_handler( "sending data to %d", new_child->pid );
310                                         debug_handler( "Writing to new child fd %d", new_child->write_data_fd );
311                                         write( new_child->write_data_fd, data, strlen(data) + 1 );
312                                         forker->first_child = new_child->next;
313                                         honored = 1;
314                                 }
315                         }
316
317                         if( !honored ) {
318                                 warning_handler( "No children available, sleeping and looping..." );
319                                 usleep( 50000 ); /* 50 milliseconds */
320                         }
321
322                         if( child_dead )
323                                 reap_children(forker);
324
325
326                         //fprintf(stderr, "Parent done with request %f\n", get_timestamp_millis() );
327
328                 } // honored?
329
330                 message_free( cur_msg );
331
332         } /* top level listen loop */
333
334 }
335
336
337 void check_children( prefork_simple* forker ) {
338
339         //check_begin:
340
341         int select_ret;
342         fd_set read_set;
343         FD_ZERO(&read_set);
344         int max_fd = 0;
345         int n;
346
347         struct timeval tv;
348         tv.tv_sec       = 0;
349         tv.tv_usec      = 0;
350
351         if( child_dead )
352                 reap_children(forker);
353
354         prefork_child* cur_child = forker->first_child;
355
356         int i;
357         for( i = 0; i!= forker->current_num_children; i++ ) {
358
359                 if( cur_child->read_status_fd > max_fd )
360                         max_fd = cur_child->read_status_fd;
361                 FD_SET( cur_child->read_status_fd, &read_set );
362                 cur_child = cur_child->next;
363         }
364
365         FD_CLR(0,&read_set);/* just to be sure */
366
367         if( (select_ret=select( max_fd + 1 , &read_set, NULL, NULL, &tv)) == -1 ) {
368                 warning_handler( "Select returned error %d on check_children", errno );
369         }
370
371         if( select_ret == 0 )
372                 return;
373
374         /* see if one of a child has told us it's done */
375         cur_child = forker->first_child;
376         int j;
377         int num_handled = 0;
378         for( j = 0; j!= forker->current_num_children && num_handled < select_ret ; j++ ) {
379
380                 if( FD_ISSET( cur_child->read_status_fd, &read_set ) ) {
381                         //printf( "Server received status from a child %d\n", cur_child->pid );
382                         debug_handler( "Server received status from a child %d", cur_child->pid );
383
384                         num_handled++;
385
386                         /* now suck off the data */
387                         char buf[64];
388                         memset( buf, 0, 64);
389                         if( (n=read(cur_child->read_status_fd, buf, 63))  < 0 ) {
390                                 warning_handler("Read error afer select in child status read with errno %d", errno);
391                         }
392
393                         debug_handler( "Read %d bytes from status buffer: %s", n, buf );
394                         cur_child->available = 1;
395                 }
396                 cur_child = cur_child->next;
397         } 
398
399 }
400
401
402 void prefork_child_wait( prefork_child* child ) {
403
404         int i,n;
405         growing_buffer* gbuf = buffer_init( READ_BUFSIZE );
406         char buf[READ_BUFSIZE];
407         memset( buf, 0, READ_BUFSIZE );
408
409         for( i = 0; i!= child->max_requests; i++ ) {
410
411                 n = -1;
412                 clr_fl(child->read_data_fd, O_NONBLOCK );
413                 while( (n=read(child->read_data_fd, buf, READ_BUFSIZE-1)) > 0 ) {
414                         buffer_add( gbuf, buf );
415                         memset( buf, 0, READ_BUFSIZE );
416
417                         //fprintf(stderr, "Child read %d bytes\n", n);
418
419                         if( n == READ_BUFSIZE ) { 
420                                 //fprintf(stderr, "We read READ_BUFSIZE data....\n");
421                                 /* XXX */
422                                 /* either we have exactly READ_BUFSIZE data, 
423                                         or there's more waiting that we need to grab*/
424                                 /* must set to non-block for reading more */
425                         } else {
426                                 //fprintf(stderr, "Read Data %f\n", get_timestamp_millis() );
427                                 prefork_child_process_request(child, gbuf->buf);
428                                 buffer_reset( gbuf );
429                                 break;
430                         }
431                 }
432
433                 if( n < 0 ) {
434                         warning_handler( "Child read returned error with errno %d", errno );
435                         break;
436                 }
437
438                 if( i < child->max_requests - 1 ) 
439                         write( child->write_status_fd, "available" /*less than 64 bytes*/, 9 );
440         }
441
442         buffer_free(gbuf);
443
444         debug_handler("Child exiting...[%d]", getpid() );
445
446         exit(0);
447 }
448
449
450 void add_prefork_child( prefork_simple* forker, prefork_child* child ) {
451         
452         if( forker->first_child == NULL ) {
453                 forker->first_child = child;
454                 child->next = child;
455                 return;
456         }
457
458         /* we put the child in as the last because, regardless, 
459                 we have to do the DLL splice dance, and this is the
460            simplest way */
461
462         prefork_child* start_child = forker->first_child;
463         while(1) {
464                 if( forker->first_child->next == start_child ) 
465                         break;
466                 forker->first_child = forker->first_child->next;
467         }
468
469         /* here we know that forker->first_child is the last element 
470                 in the list and start_child is the first.  Insert the
471                 new child between them*/
472
473         forker->first_child->next = child;
474         child->next = start_child;
475         return;
476 }
477
478 prefork_child* find_prefork_child( prefork_simple* forker, pid_t pid ) {
479
480         if( forker->first_child == NULL ) { return NULL; }
481         prefork_child* start_child = forker->first_child;
482         do {
483                 if( forker->first_child->pid == pid ) 
484                         return forker->first_child;
485         } while( (forker->first_child = forker->first_child->next) != start_child );
486
487         return NULL;
488 }
489
490
491 void del_prefork_child( prefork_simple* forker, pid_t pid ) { 
492
493         if( forker->first_child == NULL ) { return; }
494
495         (forker->current_num_children)--;
496         debug_handler("Deleting Child: %d", pid );
497
498         prefork_child* start_child = forker->first_child; /* starting point */
499         prefork_child* cur_child        = start_child; /* current pointer */
500         prefork_child* prev_child       = start_child; /* the trailing pointer */
501
502         /* special case where there is only one in the list */
503         if( start_child == start_child->next ) {
504                 if( start_child->pid == pid ) {
505                         forker->first_child = NULL;
506
507                         close( start_child->read_data_fd );
508                         close( start_child->write_data_fd );
509                         close( start_child->read_status_fd );
510                         close( start_child->write_status_fd );
511
512                         prefork_child_free( start_child );
513                 }
514                 return;
515         }
516
517
518         /* special case where the first item in the list needs to be removed */
519         if( start_child->pid == pid ) { 
520
521                 /* find the last one so we can remove the start_child */
522                 do { 
523                         prev_child = cur_child;
524                         cur_child = cur_child->next;
525                 }while( cur_child != start_child );
526
527                 /* now cur_child == start_child */
528                 prev_child->next = cur_child->next;
529                 forker->first_child = prev_child;
530
531                 close( cur_child->read_data_fd );
532                 close( cur_child->write_data_fd );
533                 close( cur_child->read_status_fd );
534                 close( cur_child->write_status_fd );
535
536                 prefork_child_free( cur_child );
537                 return;
538         } 
539
540         do {
541                 prev_child = cur_child;
542                 cur_child = cur_child->next;
543
544                 if( cur_child->pid == pid ) {
545                         prev_child->next = cur_child->next;
546
547                         close( cur_child->read_data_fd );
548                         close( cur_child->write_data_fd );
549                         close( cur_child->read_status_fd );
550                         close( cur_child->write_status_fd );
551
552                         prefork_child_free( cur_child );
553                         return;
554                 }
555
556         } while(cur_child != start_child);
557 }
558
559
560
561
562 prefork_child* prefork_child_init( 
563         int max_requests, int read_data_fd, int write_data_fd, 
564         int read_status_fd, int write_status_fd ) {
565
566         prefork_child* child = (prefork_child*) safe_malloc(sizeof(prefork_child));
567         child->max_requests             = max_requests;
568         child->read_data_fd             = read_data_fd;
569         child->write_data_fd            = write_data_fd;
570         child->read_status_fd   = read_status_fd;
571         child->write_status_fd  = write_status_fd;
572         child->available                        = 1;
573
574         return child;
575 }
576
577
578 int prefork_free( prefork_simple* prefork ) {
579         
580         while( prefork->first_child != NULL ) {
581                 info_handler( "Killing children and sleeping 1 to reap..." );
582                 kill( 0,        SIGKILL );
583                 sleep(1);
584         }
585
586         client_free(prefork->connection);
587         free(prefork->appname);
588         free( prefork );
589         return 1;
590 }
591
592 int prefork_child_free( prefork_child* child ) { 
593         free(child->appname);
594         close(child->read_data_fd);
595         close(child->write_status_fd);
596         free( child ); 
597         return 1;
598 }
599