]> git.evergreen-ils.org Git - Evergreen.git/blob - OpenSRF/src/libstack/osrf_prefork.c
small bug fixes
[Evergreen.git] / OpenSRF / src / libstack / osrf_prefork.c
1 #include "osrf_prefork.h"
2 #include <signal.h>
3 #include "osrf_app_session.h"
4
5 /* true if we just deleted a child.  This will allow us to make sure we're
6         not trying to use freed memory */
7 int child_dead;
8
9 int main();
10 void sigchld_handler( int sig );
11
12 int osrf_prefork_run(char* appname) {
13
14         if(!appname) fatal_handler("osrf_prefork_run requires an appname to run!");
15
16         set_proc_title( "OpenSRF Listener [%s]", appname );
17
18         int maxr = 1000; 
19         int maxc = 10;
20         int minc = 3;
21
22         info_handler("Loading config in osrf_forker for app %s", appname);
23
24         jsonObject* max_req = osrf_settings_host_value_object("/apps/%s/unix_config/max_requests", appname);
25         jsonObject* min_children = osrf_settings_host_value_object("/apps/%s/unix_config/min_children", appname);
26         jsonObject* max_children = osrf_settings_host_value_object("/apps/%s/unix_config/max_children", appname);
27
28         
29         if(!max_req) warning_handler("Max requests not defined, assuming 1000");
30         else maxr = (int) jsonObjectGetNumber(max_req);
31
32         if(!min_children) warning_handler("Min children not defined, assuming 3");
33         else minc = (int) jsonObjectGetNumber(min_children);
34
35         if(!max_children) warning_handler("Max children not defined, assuming 10");
36         else maxc = (int) jsonObjectGetNumber(max_children);
37
38         jsonObjectFree(max_req);
39         jsonObjectFree(min_children);
40         jsonObjectFree(max_children);
41         /* --------------------------------------------------- */
42
43         char* resc = va_list_to_string("%s_listener", appname);
44
45         if(!osrf_system_bootstrap_client_resc( NULL, NULL, resc ))
46                 fatal_handler("Unable to bootstrap client for osrf_prefork_run()");
47         free(resc);
48
49         prefork_simple* forker = prefork_simple_init(
50                 osrfSystemGetTransportClient(), maxr, minc, maxc);
51
52         forker->appname = strdup(appname);
53
54         if(forker == NULL)
55                 fatal_handler("osrf_prefork_run() failed to create prefork_simple object");
56
57         prefork_launch_children(forker);
58
59         osrf_prefork_register_routers(appname);
60         
61         info_handler("Launching osrf_forker for app %s", appname);
62         prefork_run(forker);
63         
64         warning_handler("prefork_run() retuned - how??");
65         prefork_free(forker);
66         return 0;
67
68 }
69
70 void osrf_prefork_register_routers( char* appname ) {
71
72         osrfStringArray* arr = osrfNewStringArray(4);
73
74         int c = osrfConfigGetValueList( NULL, arr, "/routers/router" );
75         char* routerName = osrfConfigGetValue( NULL, "/router_name" );
76         transport_client* client = osrfSystemGetTransportClient();
77
78         info_handler("router name is %s and we have %d routers to connect to", routerName, c );
79
80         while( c ) {
81                 char* domain = osrfStringArrayGetString(arr, --c);
82                 if(domain) {
83
84                         char* jid = va_list_to_string( "%s@%s/router", routerName, domain );
85                         info_handler("Registering with router %s", jid );
86
87                         transport_message* msg = message_init("registering", NULL, NULL, jid, NULL );
88                         message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
89
90                         client_send_message( client, msg );
91                         message_free( msg );
92                         free(jid);
93                 }
94         }
95
96         free(routerName);
97         osrfStringArrayFree(arr);
98 }
99
100 void prefork_child_init_hook(prefork_child* child) {
101
102         if(!child) return;
103         info_handler("Child init hook for child %d", child->pid);
104         char* resc = va_list_to_string("%s_drone",child->appname);
105         if(!osrf_system_bootstrap_client_resc( NULL, NULL, resc)) 
106                 fatal_handler("Unable to bootstrap client for osrf_prefork_run()");
107         free(resc);
108
109         set_proc_title( "OpenSRF Drone [%s]", child->appname );
110 }
111
112 void prefork_child_process_request(prefork_child* child, char* data) {
113         if( !child ) return;
114
115         /* construct the message from the xml */
116         transport_message* msg = new_message_from_xml( data );
117
118         osrfAppSession* session = osrf_stack_transport_handler(msg, child->appname);
119         if(!session) return;
120
121         if( session->stateless && session->state != OSRF_SESSION_CONNECTED ) return;
122
123         /* keepalive loop for stateful sessions */
124
125                 debug_handler("Entering keepalive loop for session %s", session->session_id );
126 }
127
128
129 prefork_simple*  prefork_simple_init( transport_client* client, 
130                 int max_requests, int min_children, int max_children ) {
131
132         if( min_children > max_children )
133                 fatal_handler( "min_children (%d) is greater "
134                                 "than max_children (%d)", min_children, max_children );
135
136         if( max_children > ABS_MAX_CHILDREN )
137                 fatal_handler( "max_children (%d) is greater than ABS_MAX_CHILDREN (%d)",
138                                 max_children, ABS_MAX_CHILDREN );
139
140         /* flesh out the struct */
141         prefork_simple* prefork = (prefork_simple*) safe_malloc(sizeof(prefork_simple));        
142         prefork->max_requests = max_requests;
143         prefork->min_children = min_children;
144         prefork->max_children = max_children;
145         prefork->first_child = NULL;
146         prefork->connection = client;
147
148         return prefork;
149 }
150
151 prefork_child*  launch_child( prefork_simple* forker ) {
152
153         pid_t pid;
154         int data_fd[2];
155         int status_fd[2];
156
157         /* Set up the data pipes and add the child struct to the parent */
158         if( pipe(data_fd) < 0 ) /* build the data pipe*/
159                 fatal_handler( "Pipe making error" );
160
161         if( pipe(status_fd) < 0 ) /* build the status pipe */
162                 fatal_handler( "Pipe making error" );
163
164         debug_handler( "Pipes: %d %d %d %d", data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
165         prefork_child* child = prefork_child_init( forker->max_requests, data_fd[0], 
166                         data_fd[1], status_fd[0], status_fd[1] );
167
168         child->appname = strdup(forker->appname);
169
170
171         add_prefork_child( forker, child );
172
173         if( (pid=fork()) < 0 ) fatal_handler( "Forking Error" );
174
175         if( pid > 0 ) {  /* parent */
176
177                 signal(SIGCHLD, sigchld_handler);
178                 (forker->current_num_children)++;
179                 child->pid = pid;
180
181                 info_handler( "Parent launched %d", pid );
182                 /* *no* child pipe FD's can be closed or the parent will re-use fd's that
183                         the children are currently using */
184                 return child;
185         }
186
187         else { /* child */
188
189                 debug_handler("I am  new child with read_data_fd = %d and write_status_fd = %d",
190                         child->read_data_fd, child->write_status_fd );
191
192                 child->pid = getpid();
193                 close( child->write_data_fd );
194                 close( child->read_status_fd );
195
196                 /* do the initing */
197                 prefork_child_init_hook(child);
198
199                 prefork_child_wait( child );
200                 exit(0); /* just to be sure */
201          }
202         return NULL;
203 }
204
205
206 void prefork_launch_children( prefork_simple* forker ) {
207         if(!forker) return;
208         int c = 0;
209         while( c++ < forker->min_children )
210                 launch_child( forker );
211 }
212
213
214 void sigchld_handler( int sig ) {
215         signal(SIGCHLD, sigchld_handler);
216         child_dead = 1;
217 }
218
219
220 void reap_children( prefork_simple* forker ) {
221
222         pid_t child_pid;
223         int status;
224
225         while( (child_pid=waitpid(-1,&status,WNOHANG)) > 0) 
226                 del_prefork_child( forker, child_pid ); 
227
228         /* replenish */
229         while( forker->current_num_children < forker->min_children ) 
230                 launch_child( forker );
231
232         child_dead = 0;
233 }
234
235 void prefork_run(prefork_simple* forker) {
236
237         if( forker->first_child == NULL )
238                 return;
239
240         transport_message* cur_msg = NULL;
241
242
243         while(1) {
244
245                 if( forker->first_child == NULL ) {/* no more children */
246                         warning_handler("No more children..." );
247                         return;
248                 }
249
250                 debug_handler("Forker going into wait for data...");
251                 cur_msg = client_recv( forker->connection, -1 );
252
253                 //fprintf(stderr, "Got Data %f\n", get_timestamp_millis() );
254
255                 if( cur_msg == NULL ) continue;
256
257                 int honored = 0;        /* true if we've serviced the request */
258
259                 while( ! honored ) {
260
261                         check_children( forker ); 
262
263                         debug_handler( "Server received inbound data" );
264                         int k;
265                         prefork_child* cur_child = forker->first_child;
266
267                         /* Look for an available child */
268                         for( k = 0; k < forker->current_num_children; k++ ) {
269
270                                 debug_handler("Searching for available child. cur_child->pid = %d", cur_child->pid );
271                                 debug_handler("Current num children %d and loop %d", forker->current_num_children, k);
272                         
273                                 if( cur_child->available ) {
274                                         debug_handler( "sending data to %d", cur_child->pid );
275
276                                         message_prepare_xml( cur_msg );
277                                         char* data = cur_msg->msg_xml;
278                                         if( ! data || strlen(data) < 1 ) break;
279
280                                         cur_child->available = 0;
281                                         debug_handler( "Writing to child fd %d", cur_child->write_data_fd );
282
283                                         int written = 0;
284                                         //fprintf(stderr, "Writing Data %f\n", get_timestamp_millis() );
285                                         if( (written = write( cur_child->write_data_fd, data, strlen(data) + 1 )) < 0 ) {
286                                                 warning_handler("Write returned error %d", errno);
287                                                 cur_child = cur_child->next;
288                                                 continue;
289                                         }
290
291                                         //fprintf(stderr, "Wrote %d bytes to child\n", written);
292
293                                         forker->first_child = cur_child->next;
294                                         honored = 1;
295                                         break;
296                                 } else 
297                                         cur_child = cur_child->next;
298                         } 
299
300                         /* if none available, add a new child if we can */
301                         if( ! honored ) {
302                                 debug_handler("Not enough children, attempting to add...");
303                                 if( forker->current_num_children < forker->max_children ) {
304                                         debug_handler( "Launching new child with current_num = %d",
305                                                         forker->current_num_children );
306
307                                         prefork_child* new_child = launch_child( forker );
308                                         message_prepare_xml( cur_msg );
309                                         char* data = cur_msg->msg_xml;
310                                         if( ! data || strlen(data) < 1 ) break;
311                                         new_child->available = 0;
312                                         debug_handler( "sending data to %d", new_child->pid );
313                                         debug_handler( "Writing to new child fd %d", new_child->write_data_fd );
314                                         write( new_child->write_data_fd, data, strlen(data) + 1 );
315                                         forker->first_child = new_child->next;
316                                         honored = 1;
317                                 }
318                         }
319
320                         if( !honored ) {
321                                 warning_handler( "No children available, sleeping and looping..." );
322                                 usleep( 50000 ); /* 50 milliseconds */
323                         }
324
325                         if( child_dead )
326                                 reap_children(forker);
327
328
329                         //fprintf(stderr, "Parent done with request %f\n", get_timestamp_millis() );
330
331                 } // honored?
332
333                 message_free( cur_msg );
334
335         } /* top level listen loop */
336
337 }
338
339
340 void check_children( prefork_simple* forker ) {
341
342         //check_begin:
343
344         int select_ret;
345         fd_set read_set;
346         FD_ZERO(&read_set);
347         int max_fd = 0;
348         int n;
349
350         struct timeval tv;
351         tv.tv_sec       = 0;
352         tv.tv_usec      = 0;
353
354         if( child_dead )
355                 reap_children(forker);
356
357         prefork_child* cur_child = forker->first_child;
358
359         int i;
360         for( i = 0; i!= forker->current_num_children; i++ ) {
361
362                 if( cur_child->read_status_fd > max_fd )
363                         max_fd = cur_child->read_status_fd;
364                 FD_SET( cur_child->read_status_fd, &read_set );
365                 cur_child = cur_child->next;
366         }
367
368         FD_CLR(0,&read_set);/* just to be sure */
369
370         if( (select_ret=select( max_fd + 1 , &read_set, NULL, NULL, &tv)) == -1 ) {
371                 warning_handler( "Select returned error %d on check_children", errno );
372         }
373
374         if( select_ret == 0 )
375                 return;
376
377         /* see if one of a child has told us it's done */
378         cur_child = forker->first_child;
379         int j;
380         int num_handled = 0;
381         for( j = 0; j!= forker->current_num_children && num_handled < select_ret ; j++ ) {
382
383                 if( FD_ISSET( cur_child->read_status_fd, &read_set ) ) {
384                         //printf( "Server received status from a child %d\n", cur_child->pid );
385                         debug_handler( "Server received status from a child %d", cur_child->pid );
386
387                         num_handled++;
388
389                         /* now suck off the data */
390                         char buf[64];
391                         memset( buf, 0, 64);
392                         if( (n=read(cur_child->read_status_fd, buf, 63))  < 0 ) {
393                                 warning_handler("Read error afer select in child status read with errno %d", errno);
394                         }
395
396                         debug_handler( "Read %d bytes from status buffer: %s", n, buf );
397                         cur_child->available = 1;
398                 }
399                 cur_child = cur_child->next;
400         } 
401
402 }
403
404
405 void prefork_child_wait( prefork_child* child ) {
406
407         int i,n;
408         growing_buffer* gbuf = buffer_init( READ_BUFSIZE );
409         char buf[READ_BUFSIZE];
410         memset( buf, 0, READ_BUFSIZE );
411
412         for( i = 0; i!= child->max_requests; i++ ) {
413
414                 n = -1;
415                 clr_fl(child->read_data_fd, O_NONBLOCK );
416                 while( (n=read(child->read_data_fd, buf, READ_BUFSIZE-1)) > 0 ) {
417                         buffer_add( gbuf, buf );
418                         memset( buf, 0, READ_BUFSIZE );
419
420                         //fprintf(stderr, "Child read %d bytes\n", n);
421
422                         if( n == READ_BUFSIZE ) { 
423                                 //fprintf(stderr, "We read READ_BUFSIZE data....\n");
424                                 /* XXX */
425                                 /* either we have exactly READ_BUFSIZE data, 
426                                         or there's more waiting that we need to grab*/
427                                 /* must set to non-block for reading more */
428                         } else {
429                                 //fprintf(stderr, "Read Data %f\n", get_timestamp_millis() );
430                                 prefork_child_process_request(child, gbuf->buf);
431                                 buffer_reset( gbuf );
432                                 break;
433                         }
434                 }
435
436                 if( n < 0 ) {
437                         warning_handler( "Child read returned error with errno %d", errno );
438                         break;
439                 }
440
441                 if( i < child->max_requests - 1 ) 
442                         write( child->write_status_fd, "available" /*less than 64 bytes*/, 9 );
443         }
444
445         buffer_free(gbuf);
446
447         debug_handler("Child exiting...[%d]", getpid() );
448
449         exit(0);
450 }
451
452
453 void add_prefork_child( prefork_simple* forker, prefork_child* child ) {
454         
455         if( forker->first_child == NULL ) {
456                 forker->first_child = child;
457                 child->next = child;
458                 return;
459         }
460
461         /* we put the child in as the last because, regardless, 
462                 we have to do the DLL splice dance, and this is the
463            simplest way */
464
465         prefork_child* start_child = forker->first_child;
466         while(1) {
467                 if( forker->first_child->next == start_child ) 
468                         break;
469                 forker->first_child = forker->first_child->next;
470         }
471
472         /* here we know that forker->first_child is the last element 
473                 in the list and start_child is the first.  Insert the
474                 new child between them*/
475
476         forker->first_child->next = child;
477         child->next = start_child;
478         return;
479 }
480
481 prefork_child* find_prefork_child( prefork_simple* forker, pid_t pid ) {
482
483         if( forker->first_child == NULL ) { return NULL; }
484         prefork_child* start_child = forker->first_child;
485         do {
486                 if( forker->first_child->pid == pid ) 
487                         return forker->first_child;
488         } while( (forker->first_child = forker->first_child->next) != start_child );
489
490         return NULL;
491 }
492
493
494 void del_prefork_child( prefork_simple* forker, pid_t pid ) { 
495
496         if( forker->first_child == NULL ) { return; }
497
498         (forker->current_num_children)--;
499         debug_handler("Deleting Child: %d", pid );
500
501         prefork_child* start_child = forker->first_child; /* starting point */
502         prefork_child* cur_child        = start_child; /* current pointer */
503         prefork_child* prev_child       = start_child; /* the trailing pointer */
504
505         /* special case where there is only one in the list */
506         if( start_child == start_child->next ) {
507                 if( start_child->pid == pid ) {
508                         forker->first_child = NULL;
509
510                         close( start_child->read_data_fd );
511                         close( start_child->write_data_fd );
512                         close( start_child->read_status_fd );
513                         close( start_child->write_status_fd );
514
515                         prefork_child_free( start_child );
516                 }
517                 return;
518         }
519
520
521         /* special case where the first item in the list needs to be removed */
522         if( start_child->pid == pid ) { 
523
524                 /* find the last one so we can remove the start_child */
525                 do { 
526                         prev_child = cur_child;
527                         cur_child = cur_child->next;
528                 }while( cur_child != start_child );
529
530                 /* now cur_child == start_child */
531                 prev_child->next = cur_child->next;
532                 forker->first_child = prev_child;
533
534                 close( cur_child->read_data_fd );
535                 close( cur_child->write_data_fd );
536                 close( cur_child->read_status_fd );
537                 close( cur_child->write_status_fd );
538
539                 prefork_child_free( cur_child );
540                 return;
541         } 
542
543         do {
544                 prev_child = cur_child;
545                 cur_child = cur_child->next;
546
547                 if( cur_child->pid == pid ) {
548                         prev_child->next = cur_child->next;
549
550                         close( cur_child->read_data_fd );
551                         close( cur_child->write_data_fd );
552                         close( cur_child->read_status_fd );
553                         close( cur_child->write_status_fd );
554
555                         prefork_child_free( cur_child );
556                         return;
557                 }
558
559         } while(cur_child != start_child);
560 }
561
562
563
564
565 prefork_child* prefork_child_init( 
566         int max_requests, int read_data_fd, int write_data_fd, 
567         int read_status_fd, int write_status_fd ) {
568
569         prefork_child* child = (prefork_child*) safe_malloc(sizeof(prefork_child));
570         child->max_requests             = max_requests;
571         child->read_data_fd             = read_data_fd;
572         child->write_data_fd            = write_data_fd;
573         child->read_status_fd   = read_status_fd;
574         child->write_status_fd  = write_status_fd;
575         child->available                        = 1;
576
577         return child;
578 }
579
580
581 int prefork_free( prefork_simple* prefork ) {
582         
583         while( prefork->first_child != NULL ) {
584                 info_handler( "Killing children and sleeping 1 to reap..." );
585                 kill( 0,        SIGKILL );
586                 sleep(1);
587         }
588
589         client_free(prefork->connection);
590         free(prefork->appname);
591         free( prefork );
592         return 1;
593 }
594
595 int prefork_child_free( prefork_child* child ) { 
596         free(child->appname);
597         close(child->read_data_fd);
598         close(child->write_status_fd);
599         free( child ); 
600         return 1;
601 }
602