changed router binary to opensrf_router to prevent opensrf_all from destroying
[OpenSRF.git] / src / libstack / osrf_prefork.c
1 #include "osrf_prefork.h"
2 #include <signal.h>
3 #include "osrf_app_session.h"
4
5 /* true if we just deleted a child.  This will allow us to make sure we're
6         not trying to use freed memory */
7 int child_dead;
8
9 int main();
10 void sigchld_handler( int sig );
11
12 int osrf_prefork_run(char* appname) {
13
14         if(!appname) fatal_handler("osrf_prefork_run requires an appname to run!");
15
16         set_proc_title( "OpenSRF Listener [%s]", appname );
17
18         int maxr = 1000; 
19         int maxc = 10;
20         int minc = 3;
21
22         info_handler("Loading config in osrf_forker for app %s", appname);
23
24         jsonObject* max_req = osrf_settings_host_value_object("/apps/%s/unix_config/max_requests", appname);
25         jsonObject* min_children = osrf_settings_host_value_object("/apps/%s/unix_config/min_children", appname);
26         jsonObject* max_children = osrf_settings_host_value_object("/apps/%s/unix_config/max_children", appname);
27
28         
29         if(!max_req) warning_handler("Max requests not defined, assuming 1000");
30         else maxr = (int) jsonObjectGetNumber(max_req);
31
32         if(!min_children) warning_handler("Min children not defined, assuming 3");
33         else minc = (int) jsonObjectGetNumber(min_children);
34
35         if(!max_children) warning_handler("Max children not defined, assuming 10");
36         else maxc = (int) jsonObjectGetNumber(max_children);
37
38         jsonObjectFree(max_req);
39         jsonObjectFree(min_children);
40         jsonObjectFree(max_children);
41         /* --------------------------------------------------- */
42
43         char* resc = va_list_to_string("%s_listener", appname);
44
45         if(!osrf_system_bootstrap_client_resc( NULL, NULL, resc ))
46                 fatal_handler("Unable to bootstrap client for osrf_prefork_run()");
47         free(resc);
48
49         prefork_simple* forker = prefork_simple_init(
50                 osrfSystemGetTransportClient(), maxr, minc, maxc);
51
52         forker->appname = strdup(appname);
53
54         if(forker == NULL)
55                 fatal_handler("osrf_prefork_run() failed to create prefork_simple object");
56
57         prefork_launch_children(forker);
58
59         osrf_prefork_register_routers(appname);
60         
61         info_handler("Launching osrf_forker for app %s", appname);
62         prefork_run(forker);
63         
64         warning_handler("prefork_run() retuned - how??");
65         prefork_free(forker);
66         return 0;
67
68 }
69
70 void osrf_prefork_register_routers( char* appname ) {
71
72         osrfStringArray* arr = osrfNewStringArray(4);
73
74         int c = osrfConfigGetValueList( NULL, arr, "/routers/router" );
75         char* routerName = osrfConfigGetValue( NULL, "/router_name" );
76         transport_client* client = osrfSystemGetTransportClient();
77
78         info_handler("router name is %s and we have %d routers to connect to", routerName, c );
79
80         while( c ) {
81                 char* domain = osrfStringArrayGetString(arr, --c);
82                 if(domain) {
83
84                         char* jid = va_list_to_string( "%s@%s/router", routerName, domain );
85                         info_handler("Registering with router %s", jid );
86
87                         transport_message* msg = message_init("registering", NULL, NULL, jid, NULL );
88                         message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
89
90                         client_send_message( client, msg );
91                         message_free( msg );
92                         free(jid);
93                 }
94         }
95
96         free(routerName);
97         osrfStringArrayFree(arr);
98 }
99
100 void prefork_child_init_hook(prefork_child* child) {
101
102         if(!child) return;
103         info_handler("Child init hook for child %d", child->pid);
104         char* resc = va_list_to_string("%s_drone",child->appname);
105         if(!osrf_system_bootstrap_client_resc( NULL, NULL, resc)) 
106                 fatal_handler("Unable to bootstrap client for osrf_prefork_run()");
107         free(resc);
108
109         set_proc_title( "OpenSRF Drone [%s]", child->appname );
110 }
111
112 void prefork_child_process_request(prefork_child* child, char* data) {
113         if( !child ) return;
114
115         /* construct the message from the xml */
116         transport_message* msg = new_message_from_xml( data );
117
118         osrfAppSession* session = osrf_stack_transport_handler(msg, child->appname);
119
120         if( session->stateless && session->state != OSRF_SESSION_CONNECTED ) return;
121
122         /* keepalive loop for stateful sessions */
123
124                 debug_handler("Entering keepalive loop for session %s", session->session_id );
125 }
126
127
128 prefork_simple*  prefork_simple_init( transport_client* client, 
129                 int max_requests, int min_children, int max_children ) {
130
131         if( min_children > max_children )
132                 fatal_handler( "min_children (%d) is greater "
133                                 "than max_children (%d)", min_children, max_children );
134
135         if( max_children > ABS_MAX_CHILDREN )
136                 fatal_handler( "max_children (%d) is greater than ABS_MAX_CHILDREN (%d)",
137                                 max_children, ABS_MAX_CHILDREN );
138
139         /* flesh out the struct */
140         prefork_simple* prefork = (prefork_simple*) safe_malloc(sizeof(prefork_simple));        
141         prefork->max_requests = max_requests;
142         prefork->min_children = min_children;
143         prefork->max_children = max_children;
144         prefork->first_child = NULL;
145         prefork->connection = client;
146
147         return prefork;
148 }
149
150 prefork_child*  launch_child( prefork_simple* forker ) {
151
152         pid_t pid;
153         int data_fd[2];
154         int status_fd[2];
155
156         /* Set up the data pipes and add the child struct to the parent */
157         if( pipe(data_fd) < 0 ) /* build the data pipe*/
158                 fatal_handler( "Pipe making error" );
159
160         if( pipe(status_fd) < 0 ) /* build the status pipe */
161                 fatal_handler( "Pipe making error" );
162
163         debug_handler( "Pipes: %d %d %d %d", data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
164         prefork_child* child = prefork_child_init( forker->max_requests, data_fd[0], 
165                         data_fd[1], status_fd[0], status_fd[1] );
166
167         child->appname = strdup(forker->appname);
168
169
170         add_prefork_child( forker, child );
171
172         if( (pid=fork()) < 0 ) fatal_handler( "Forking Error" );
173
174         if( pid > 0 ) {  /* parent */
175
176                 signal(SIGCHLD, sigchld_handler);
177                 (forker->current_num_children)++;
178                 child->pid = pid;
179
180                 info_handler( "Parent launched %d", pid );
181                 /* *no* child pipe FD's can be closed or the parent will re-use fd's that
182                         the children are currently using */
183                 return child;
184         }
185
186         else { /* child */
187
188                 debug_handler("I am  new child with read_data_fd = %d and write_status_fd = %d",
189                         child->read_data_fd, child->write_status_fd );
190
191                 child->pid = getpid();
192                 close( child->write_data_fd );
193                 close( child->read_status_fd );
194
195                 /* do the initing */
196                 prefork_child_init_hook(child);
197
198                 prefork_child_wait( child );
199                 exit(0); /* just to be sure */
200          }
201         return NULL;
202 }
203
204
205 void prefork_launch_children( prefork_simple* forker ) {
206         if(!forker) return;
207         int c = 0;
208         while( c++ < forker->min_children )
209                 launch_child( forker );
210 }
211
212
213 void sigchld_handler( int sig ) {
214         signal(SIGCHLD, sigchld_handler);
215         child_dead = 1;
216 }
217
218
219 void reap_children( prefork_simple* forker ) {
220
221         pid_t child_pid;
222         int status;
223
224         while( (child_pid=waitpid(-1,&status,WNOHANG)) > 0) 
225                 del_prefork_child( forker, child_pid ); 
226
227         /* replenish */
228         while( forker->current_num_children < forker->min_children ) 
229                 launch_child( forker );
230
231         child_dead = 0;
232 }
233
234 void prefork_run(prefork_simple* forker) {
235
236         if( forker->first_child == NULL )
237                 return;
238
239         transport_message* cur_msg = NULL;
240
241
242         while(1) {
243
244                 if( forker->first_child == NULL ) {/* no more children */
245                         warning_handler("No more children..." );
246                         return;
247                 }
248
249                 debug_handler("Forker going into wait for data...");
250                 cur_msg = client_recv( forker->connection, -1 );
251
252                 //fprintf(stderr, "Got Data %f\n", get_timestamp_millis() );
253
254                 if( cur_msg == NULL ) continue;
255
256                 int honored = 0;        /* true if we've serviced the request */
257
258                 while( ! honored ) {
259
260                         check_children( forker ); 
261
262                         debug_handler( "Server received inbound data" );
263                         int k;
264                         prefork_child* cur_child = forker->first_child;
265
266                         /* Look for an available child */
267                         for( k = 0; k < forker->current_num_children; k++ ) {
268
269                                 debug_handler("Searching for available child. cur_child->pid = %d", cur_child->pid );
270                                 debug_handler("Current num children %d and loop %d", forker->current_num_children, k);
271                         
272                                 if( cur_child->available ) {
273                                         debug_handler( "sending data to %d", cur_child->pid );
274
275                                         message_prepare_xml( cur_msg );
276                                         char* data = cur_msg->msg_xml;
277                                         if( ! data || strlen(data) < 1 ) break;
278
279                                         cur_child->available = 0;
280                                         debug_handler( "Writing to child fd %d", cur_child->write_data_fd );
281
282                                         int written = 0;
283                                         //fprintf(stderr, "Writing Data %f\n", get_timestamp_millis() );
284                                         if( (written = write( cur_child->write_data_fd, data, strlen(data) + 1 )) < 0 ) {
285                                                 warning_handler("Write returned error %d", errno);
286                                                 cur_child = cur_child->next;
287                                                 continue;
288                                         }
289
290                                         //fprintf(stderr, "Wrote %d bytes to child\n", written);
291
292                                         forker->first_child = cur_child->next;
293                                         honored = 1;
294                                         break;
295                                 } else 
296                                         cur_child = cur_child->next;
297                         } 
298
299                         /* if none available, add a new child if we can */
300                         if( ! honored ) {
301                                 debug_handler("Not enough children, attempting to add...");
302                                 if( forker->current_num_children < forker->max_children ) {
303                                         debug_handler( "Launching new child with current_num = %d",
304                                                         forker->current_num_children );
305
306                                         prefork_child* new_child = launch_child( forker );
307                                         message_prepare_xml( cur_msg );
308                                         char* data = cur_msg->msg_xml;
309                                         if( ! data || strlen(data) < 1 ) break;
310                                         new_child->available = 0;
311                                         debug_handler( "sending data to %d", new_child->pid );
312                                         debug_handler( "Writing to new child fd %d", new_child->write_data_fd );
313                                         write( new_child->write_data_fd, data, strlen(data) + 1 );
314                                         forker->first_child = new_child->next;
315                                         honored = 1;
316                                 }
317                         }
318
319                         if( !honored ) {
320                                 warning_handler( "No children available, sleeping and looping..." );
321                                 usleep( 50000 ); /* 50 milliseconds */
322                         }
323
324                         if( child_dead )
325                                 reap_children(forker);
326
327
328                         //fprintf(stderr, "Parent done with request %f\n", get_timestamp_millis() );
329
330                 } // honored?
331
332                 message_free( cur_msg );
333
334         } /* top level listen loop */
335
336 }
337
338
339 void check_children( prefork_simple* forker ) {
340
341         //check_begin:
342
343         int select_ret;
344         fd_set read_set;
345         FD_ZERO(&read_set);
346         int max_fd = 0;
347         int n;
348
349         struct timeval tv;
350         tv.tv_sec       = 0;
351         tv.tv_usec      = 0;
352
353         if( child_dead )
354                 reap_children(forker);
355
356         prefork_child* cur_child = forker->first_child;
357
358         int i;
359         for( i = 0; i!= forker->current_num_children; i++ ) {
360
361                 if( cur_child->read_status_fd > max_fd )
362                         max_fd = cur_child->read_status_fd;
363                 FD_SET( cur_child->read_status_fd, &read_set );
364                 cur_child = cur_child->next;
365         }
366
367         FD_CLR(0,&read_set);/* just to be sure */
368
369         if( (select_ret=select( max_fd + 1 , &read_set, NULL, NULL, &tv)) == -1 ) {
370                 warning_handler( "Select returned error %d on check_children", errno );
371         }
372
373         if( select_ret == 0 )
374                 return;
375
376         /* see if one of a child has told us it's done */
377         cur_child = forker->first_child;
378         int j;
379         int num_handled = 0;
380         for( j = 0; j!= forker->current_num_children && num_handled < select_ret ; j++ ) {
381
382                 if( FD_ISSET( cur_child->read_status_fd, &read_set ) ) {
383                         //printf( "Server received status from a child %d\n", cur_child->pid );
384                         debug_handler( "Server received status from a child %d", cur_child->pid );
385
386                         num_handled++;
387
388                         /* now suck off the data */
389                         char buf[64];
390                         memset( buf, 0, 64);
391                         if( (n=read(cur_child->read_status_fd, buf, 63))  < 0 ) {
392                                 warning_handler("Read error afer select in child status read with errno %d", errno);
393                         }
394
395                         debug_handler( "Read %d bytes from status buffer: %s", n, buf );
396                         cur_child->available = 1;
397                 }
398                 cur_child = cur_child->next;
399         } 
400
401 }
402
403
404 void prefork_child_wait( prefork_child* child ) {
405
406         int i,n;
407         growing_buffer* gbuf = buffer_init( READ_BUFSIZE );
408         char buf[READ_BUFSIZE];
409         memset( buf, 0, READ_BUFSIZE );
410
411         for( i = 0; i!= child->max_requests; i++ ) {
412
413                 n = -1;
414                 clr_fl(child->read_data_fd, O_NONBLOCK );
415                 while( (n=read(child->read_data_fd, buf, READ_BUFSIZE-1)) > 0 ) {
416                         buffer_add( gbuf, buf );
417                         memset( buf, 0, READ_BUFSIZE );
418
419                         //fprintf(stderr, "Child read %d bytes\n", n);
420
421                         if( n == READ_BUFSIZE ) { 
422                                 //fprintf(stderr, "We read READ_BUFSIZE data....\n");
423                                 /* XXX */
424                                 /* either we have exactly READ_BUFSIZE data, 
425                                         or there's more waiting that we need to grab*/
426                                 /* must set to non-block for reading more */
427                         } else {
428                                 //fprintf(stderr, "Read Data %f\n", get_timestamp_millis() );
429                                 prefork_child_process_request(child, gbuf->buf);
430                                 buffer_reset( gbuf );
431                                 break;
432                         }
433                 }
434
435                 if( n < 0 ) {
436                         warning_handler( "Child read returned error with errno %d", errno );
437                         break;
438                 }
439
440                 if( i < child->max_requests - 1 ) 
441                         write( child->write_status_fd, "available" /*less than 64 bytes*/, 9 );
442         }
443
444         buffer_free(gbuf);
445
446         debug_handler("Child exiting...[%d]", getpid() );
447
448         exit(0);
449 }
450
451
452 void add_prefork_child( prefork_simple* forker, prefork_child* child ) {
453         
454         if( forker->first_child == NULL ) {
455                 forker->first_child = child;
456                 child->next = child;
457                 return;
458         }
459
460         /* we put the child in as the last because, regardless, 
461                 we have to do the DLL splice dance, and this is the
462            simplest way */
463
464         prefork_child* start_child = forker->first_child;
465         while(1) {
466                 if( forker->first_child->next == start_child ) 
467                         break;
468                 forker->first_child = forker->first_child->next;
469         }
470
471         /* here we know that forker->first_child is the last element 
472                 in the list and start_child is the first.  Insert the
473                 new child between them*/
474
475         forker->first_child->next = child;
476         child->next = start_child;
477         return;
478 }
479
480 prefork_child* find_prefork_child( prefork_simple* forker, pid_t pid ) {
481
482         if( forker->first_child == NULL ) { return NULL; }
483         prefork_child* start_child = forker->first_child;
484         do {
485                 if( forker->first_child->pid == pid ) 
486                         return forker->first_child;
487         } while( (forker->first_child = forker->first_child->next) != start_child );
488
489         return NULL;
490 }
491
492
493 void del_prefork_child( prefork_simple* forker, pid_t pid ) { 
494
495         if( forker->first_child == NULL ) { return; }
496
497         (forker->current_num_children)--;
498         debug_handler("Deleting Child: %d", pid );
499
500         prefork_child* start_child = forker->first_child; /* starting point */
501         prefork_child* cur_child        = start_child; /* current pointer */
502         prefork_child* prev_child       = start_child; /* the trailing pointer */
503
504         /* special case where there is only one in the list */
505         if( start_child == start_child->next ) {
506                 if( start_child->pid == pid ) {
507                         forker->first_child = NULL;
508
509                         close( start_child->read_data_fd );
510                         close( start_child->write_data_fd );
511                         close( start_child->read_status_fd );
512                         close( start_child->write_status_fd );
513
514                         prefork_child_free( start_child );
515                 }
516                 return;
517         }
518
519
520         /* special case where the first item in the list needs to be removed */
521         if( start_child->pid == pid ) { 
522
523                 /* find the last one so we can remove the start_child */
524                 do { 
525                         prev_child = cur_child;
526                         cur_child = cur_child->next;
527                 }while( cur_child != start_child );
528
529                 /* now cur_child == start_child */
530                 prev_child->next = cur_child->next;
531                 forker->first_child = prev_child;
532
533                 close( cur_child->read_data_fd );
534                 close( cur_child->write_data_fd );
535                 close( cur_child->read_status_fd );
536                 close( cur_child->write_status_fd );
537
538                 prefork_child_free( cur_child );
539                 return;
540         } 
541
542         do {
543                 prev_child = cur_child;
544                 cur_child = cur_child->next;
545
546                 if( cur_child->pid == pid ) {
547                         prev_child->next = cur_child->next;
548
549                         close( cur_child->read_data_fd );
550                         close( cur_child->write_data_fd );
551                         close( cur_child->read_status_fd );
552                         close( cur_child->write_status_fd );
553
554                         prefork_child_free( cur_child );
555                         return;
556                 }
557
558         } while(cur_child != start_child);
559 }
560
561
562
563
564 prefork_child* prefork_child_init( 
565         int max_requests, int read_data_fd, int write_data_fd, 
566         int read_status_fd, int write_status_fd ) {
567
568         prefork_child* child = (prefork_child*) safe_malloc(sizeof(prefork_child));
569         child->max_requests             = max_requests;
570         child->read_data_fd             = read_data_fd;
571         child->write_data_fd            = write_data_fd;
572         child->read_status_fd   = read_status_fd;
573         child->write_status_fd  = write_status_fd;
574         child->available                        = 1;
575
576         return child;
577 }
578
579
580 int prefork_free( prefork_simple* prefork ) {
581         
582         while( prefork->first_child != NULL ) {
583                 info_handler( "Killing children and sleeping 1 to reap..." );
584                 kill( 0,        SIGKILL );
585                 sleep(1);
586         }
587
588         client_free(prefork->connection);
589         free(prefork->appname);
590         free( prefork );
591         return 1;
592 }
593
594 int prefork_child_free( prefork_child* child ) { 
595         free(child->appname);
596         close(child->read_data_fd);
597         close(child->write_status_fd);
598         free( child ); 
599         return 1;
600 }
601