]> git.evergreen-ils.org Git - Evergreen.git/blob - OpenSRF/src/libstack/osrf_prefork.c
like the Perl server, you can now no longer re-bootstrap clients in a process that...
[Evergreen.git] / OpenSRF / src / libstack / osrf_prefork.c
1 #include "osrf_prefork.h"
2 #include <signal.h>
3 #include "osrf_app_session.h"
4
5 /* true if we just deleted a child.  This will allow us to make sure we're
6         not trying to use freed memory */
7 int child_dead;
8
9 int main();
10 void sigchld_handler( int sig );
11
12 int osrf_prefork_run(char* appname) {
13
14         if(!appname) {
15                 osrfLogError( OSRF_LOG_MARK, "osrf_prefork_run requires an appname to run!");
16                 return -1;
17         }
18
19         set_proc_title( "OpenSRF Listener [%s]", appname );
20
21         int maxr = 1000; 
22         int maxc = 10;
23         int minc = 3;
24
25         osrfLogInfo( OSRF_LOG_MARK, "Loading config in osrf_forker for app %s", appname);
26
27         jsonObject* max_req = osrf_settings_host_value_object("/apps/%s/unix_config/max_requests", appname);
28         jsonObject* min_children = osrf_settings_host_value_object("/apps/%s/unix_config/min_children", appname);
29         jsonObject* max_children = osrf_settings_host_value_object("/apps/%s/unix_config/max_children", appname);
30
31         
32         if(!max_req) osrfLogWarning( OSRF_LOG_MARK, "Max requests not defined, assuming 1000");
33         else maxr = (int) jsonObjectGetNumber(max_req);
34
35         if(!min_children) osrfLogWarning( OSRF_LOG_MARK, "Min children not defined, assuming 3");
36         else minc = (int) jsonObjectGetNumber(min_children);
37
38         if(!max_children) osrfLogWarning( OSRF_LOG_MARK, "Max children not defined, assuming 10");
39         else maxc = (int) jsonObjectGetNumber(max_children);
40
41         jsonObjectFree(max_req);
42         jsonObjectFree(min_children);
43         jsonObjectFree(max_children);
44         /* --------------------------------------------------- */
45
46         char* resc = va_list_to_string("%s_listener", appname);
47
48         if(!osrf_system_bootstrap_client_resc( NULL, NULL, resc )) {
49                 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()");
50                 free(resc);
51                 return -1;
52         }
53
54         free(resc);
55
56         prefork_simple* forker = prefork_simple_init(
57                 osrfSystemGetTransportClient(), maxr, minc, maxc);
58
59         forker->appname = strdup(appname);
60
61         if(forker == NULL) {
62                 osrfLogError( OSRF_LOG_MARK, "osrf_prefork_run() failed to create prefork_simple object");
63                 return -1;
64         }
65
66         prefork_launch_children(forker);
67
68         osrf_prefork_register_routers(appname);
69         
70         osrfLogInfo( OSRF_LOG_MARK, "Launching osrf_forker for app %s", appname);
71         prefork_run(forker);
72         
73         osrfLogWarning( OSRF_LOG_MARK, "prefork_run() retuned - how??");
74         prefork_free(forker);
75         return 0;
76
77 }
78
79 void osrf_prefork_register_routers( char* appname ) {
80
81         osrfStringArray* arr = osrfNewStringArray(4);
82
83         int c = osrfConfigGetValueList( NULL, arr, "/routers/router" );
84         char* routerName = osrfConfigGetValue( NULL, "/router_name" );
85         transport_client* client = osrfSystemGetTransportClient();
86
87         osrfLogInfo( OSRF_LOG_MARK, "router name is %s and we have %d routers to connect to", routerName, c );
88
89         while( c ) {
90                 char* domain = osrfStringArrayGetString(arr, --c);
91                 if(domain) {
92
93                         char* jid = va_list_to_string( "%s@%s/router", routerName, domain );
94                         osrfLogInfo( OSRF_LOG_MARK, "Registering with router %s", jid );
95
96                         transport_message* msg = message_init("registering", NULL, NULL, jid, NULL );
97                         message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
98
99                         client_send_message( client, msg );
100                         message_free( msg );
101                         free(jid);
102                 }
103         }
104
105         free(routerName);
106         osrfStringArrayFree(arr);
107 }
108
109 void prefork_child_init_hook(prefork_child* child) {
110
111         if(!child) return;
112         osrfLogDebug( OSRF_LOG_MARK, "Child init hook for child %d", child->pid);
113         char* resc = va_list_to_string("%s_drone",child->appname);
114
115         /* we want to remove traces of our parents socket connection 
116          * so we can have our own */
117         osrfSystemIgnoreTransportClient();
118
119         if(!osrf_system_bootstrap_client_resc( NULL, NULL, resc)) {
120                 osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()");
121                 free(resc);
122                 return;
123         }
124
125         free(resc);
126
127         set_proc_title( "OpenSRF Drone [%s]", child->appname );
128 }
129
130 void prefork_child_process_request(prefork_child* child, char* data) {
131         if( !child ) return;
132
133         /* construct the message from the xml */
134         transport_message* msg = new_message_from_xml( data );
135
136         osrfAppSession* session = osrf_stack_transport_handler(msg, child->appname);
137         if(!session) return;
138
139         if( session->stateless && session->state != OSRF_SESSION_CONNECTED ) {
140                 osrfAppSessionFree( session );
141                 return;
142         }
143
144         /* keepalive loop for stateful sessions */
145
146                 osrfLogDebug( OSRF_LOG_MARK, "Entering keepalive loop for session %s", session->session_id );
147 }
148
149
150 prefork_simple*  prefork_simple_init( transport_client* client, 
151                 int max_requests, int min_children, int max_children ) {
152
153         if( min_children > max_children ) {
154                 osrfLogError( OSRF_LOG_MARK,  "min_children (%d) is greater "
155                                 "than max_children (%d)", min_children, max_children );
156                 return NULL;
157         }
158
159         if( max_children > ABS_MAX_CHILDREN ) {
160                 osrfLogError( OSRF_LOG_MARK,  "max_children (%d) is greater than ABS_MAX_CHILDREN (%d)",
161                                 max_children, ABS_MAX_CHILDREN );
162                 return NULL;
163         }
164
165         /* flesh out the struct */
166         prefork_simple* prefork = (prefork_simple*) safe_malloc(sizeof(prefork_simple));        
167         prefork->max_requests = max_requests;
168         prefork->min_children = min_children;
169         prefork->max_children = max_children;
170         prefork->first_child = NULL;
171         prefork->connection = client;
172
173         return prefork;
174 }
175
176 prefork_child*  launch_child( prefork_simple* forker ) {
177
178         pid_t pid;
179         int data_fd[2];
180         int status_fd[2];
181
182         /* Set up the data pipes and add the child struct to the parent */
183         if( pipe(data_fd) < 0 ) { /* build the data pipe*/
184                 osrfLogError( OSRF_LOG_MARK,  "Pipe making error" );
185                 return NULL;
186         }
187
188         if( pipe(status_fd) < 0 ) {/* build the status pipe */
189                 osrfLogError( OSRF_LOG_MARK,  "Pipe making error" );
190                 return NULL;
191         }
192
193         osrfLogInternal( OSRF_LOG_MARK,  "Pipes: %d %d %d %d", data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
194         prefork_child* child = prefork_child_init( forker->max_requests, data_fd[0], 
195                         data_fd[1], status_fd[0], status_fd[1] );
196
197         child->appname = strdup(forker->appname);
198
199
200         add_prefork_child( forker, child );
201
202         if( (pid=fork()) < 0 ) {
203                 osrfLogError( OSRF_LOG_MARK,  "Forking Error" );
204                 return NULL;
205         }
206
207         if( pid > 0 ) {  /* parent */
208
209                 signal(SIGCHLD, sigchld_handler);
210                 (forker->current_num_children)++;
211                 child->pid = pid;
212
213                 osrfLogDebug( OSRF_LOG_MARK,  "Parent launched %d", pid );
214                 /* *no* child pipe FD's can be closed or the parent will re-use fd's that
215                         the children are currently using */
216                 return child;
217         }
218
219         else { /* child */
220
221                 osrfLogInternal( OSRF_LOG_MARK, "I am  new child with read_data_fd = %d and write_status_fd = %d",
222                         child->read_data_fd, child->write_status_fd );
223
224                 child->pid = getpid();
225                 close( child->write_data_fd );
226                 close( child->read_status_fd );
227
228                 /* do the initing */
229                 prefork_child_init_hook(child);
230
231                 prefork_child_wait( child );
232                 exit(0); /* just to be sure */
233          }
234         return NULL;
235 }
236
237
238 void prefork_launch_children( prefork_simple* forker ) {
239         if(!forker) return;
240         int c = 0;
241         while( c++ < forker->min_children )
242                 launch_child( forker );
243 }
244
245
246 void sigchld_handler( int sig ) {
247         signal(SIGCHLD, sigchld_handler);
248         child_dead = 1;
249 }
250
251
252 void reap_children( prefork_simple* forker ) {
253
254         pid_t child_pid;
255         int status;
256
257         while( (child_pid=waitpid(-1,&status,WNOHANG)) > 0) 
258                 del_prefork_child( forker, child_pid ); 
259
260         /* replenish */
261         while( forker->current_num_children < forker->min_children ) 
262                 launch_child( forker );
263
264         child_dead = 0;
265 }
266
267 void prefork_run(prefork_simple* forker) {
268
269         if( forker->first_child == NULL )
270                 return;
271
272         transport_message* cur_msg = NULL;
273
274
275         while(1) {
276
277                 if( forker->first_child == NULL ) {/* no more children */
278                         osrfLogWarning( OSRF_LOG_MARK, "No more children..." );
279                         return;
280                 }
281
282                 osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data...");
283                 cur_msg = client_recv( forker->connection, -1 );
284
285                 //fprintf(stderr, "Got Data %f\n", get_timestamp_millis() );
286
287                 if( cur_msg == NULL ) continue;
288
289                 int honored = 0;        /* true if we've serviced the request */
290
291                 while( ! honored ) {
292
293                         check_children( forker ); 
294
295                         osrfLogDebug( OSRF_LOG_MARK,  "Server received inbound data" );
296                         int k;
297                         prefork_child* cur_child = forker->first_child;
298
299                         /* Look for an available child */
300                         for( k = 0; k < forker->current_num_children; k++ ) {
301
302                                 osrfLogInternal( OSRF_LOG_MARK, "Searching for available child. cur_child->pid = %d", cur_child->pid );
303                                 osrfLogInternal( OSRF_LOG_MARK, "Current num children %d and loop %d", forker->current_num_children, k);
304                         
305                                 if( cur_child->available ) {
306                                         osrfLogDebug( OSRF_LOG_MARK,  "forker sending data to %d", cur_child->pid );
307
308                                         message_prepare_xml( cur_msg );
309                                         char* data = cur_msg->msg_xml;
310                                         if( ! data || strlen(data) < 1 ) break;
311
312                                         cur_child->available = 0;
313                                         osrfLogInternal( OSRF_LOG_MARK,  "Writing to child fd %d", cur_child->write_data_fd );
314
315                                         int written = 0;
316                                         //fprintf(stderr, "Writing Data %f\n", get_timestamp_millis() );
317                                         if( (written = write( cur_child->write_data_fd, data, strlen(data) + 1 )) < 0 ) {
318                                                 osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d", errno);
319                                                 cur_child = cur_child->next;
320                                                 continue;
321                                         }
322
323                                         //fprintf(stderr, "Wrote %d bytes to child\n", written);
324
325                                         forker->first_child = cur_child->next;
326                                         honored = 1;
327                                         break;
328                                 } else 
329                                         cur_child = cur_child->next;
330                         } 
331
332                         /* if none available, add a new child if we can */
333                         if( ! honored ) {
334                                 osrfLogDebug( OSRF_LOG_MARK, "Not enough children, attempting to add...");
335                                 if( forker->current_num_children < forker->max_children ) {
336                                         osrfLogDebug( OSRF_LOG_MARK,  "Launching new child with current_num = %d",
337                                                         forker->current_num_children );
338
339                                         prefork_child* new_child = launch_child( forker );
340                                         message_prepare_xml( cur_msg );
341                                         char* data = cur_msg->msg_xml;
342                                         if( ! data || strlen(data) < 1 ) break;
343                                         new_child->available = 0;
344                                         osrfLogDebug( OSRF_LOG_MARK,  "Writing to new child fd %d : pid %d", 
345                                                         new_child->write_data_fd, new_child->pid );
346                                         write( new_child->write_data_fd, data, strlen(data) + 1 );
347                                         forker->first_child = new_child->next;
348                                         honored = 1;
349                                 }
350                         }
351
352                         if( !honored ) {
353                                 osrfLogWarning( OSRF_LOG_MARK,  "No children available, sleeping and looping..." );
354                                 usleep( 50000 ); /* 50 milliseconds */
355                         }
356
357                         if( child_dead )
358                                 reap_children(forker);
359
360
361                         //fprintf(stderr, "Parent done with request %f\n", get_timestamp_millis() );
362
363                 } // honored?
364
365                 message_free( cur_msg );
366
367         } /* top level listen loop */
368
369 }
370
371
372 void check_children( prefork_simple* forker ) {
373
374         //check_begin:
375
376         int select_ret;
377         fd_set read_set;
378         FD_ZERO(&read_set);
379         int max_fd = 0;
380         int n;
381
382         struct timeval tv;
383         tv.tv_sec       = 0;
384         tv.tv_usec      = 0;
385
386         if( child_dead )
387                 reap_children(forker);
388
389         prefork_child* cur_child = forker->first_child;
390
391         int i;
392         for( i = 0; i!= forker->current_num_children; i++ ) {
393
394                 if( cur_child->read_status_fd > max_fd )
395                         max_fd = cur_child->read_status_fd;
396                 FD_SET( cur_child->read_status_fd, &read_set );
397                 cur_child = cur_child->next;
398         }
399
400         FD_CLR(0,&read_set);/* just to be sure */
401
402         if( (select_ret=select( max_fd + 1 , &read_set, NULL, NULL, &tv)) == -1 ) {
403                 osrfLogWarning( OSRF_LOG_MARK,  "Select returned error %d on check_children", errno );
404         }
405
406         if( select_ret == 0 )
407                 return;
408
409         /* see if one of a child has told us it's done */
410         cur_child = forker->first_child;
411         int j;
412         int num_handled = 0;
413         for( j = 0; j!= forker->current_num_children && num_handled < select_ret ; j++ ) {
414
415                 if( FD_ISSET( cur_child->read_status_fd, &read_set ) ) {
416                         //printf( "Server received status from a child %d\n", cur_child->pid );
417                         osrfLogDebug( OSRF_LOG_MARK,  "Server received status from a child %d", cur_child->pid );
418
419                         num_handled++;
420
421                         /* now suck off the data */
422                         char buf[64];
423                         memset( buf, 0, 64);
424                         if( (n=read(cur_child->read_status_fd, buf, 63))  < 0 ) {
425                                 osrfLogWarning( OSRF_LOG_MARK, "Read error afer select in child status read with errno %d", errno);
426                         }
427
428                         osrfLogDebug( OSRF_LOG_MARK,  "Read %d bytes from status buffer: %s", n, buf );
429                         cur_child->available = 1;
430                 }
431                 cur_child = cur_child->next;
432         } 
433
434 }
435
436
437 void prefork_child_wait( prefork_child* child ) {
438
439         int i,n;
440         growing_buffer* gbuf = buffer_init( READ_BUFSIZE );
441         char buf[READ_BUFSIZE];
442         memset( buf, 0, READ_BUFSIZE );
443
444         for( i = 0; i!= child->max_requests; i++ ) {
445
446                 n = -1;
447                 clr_fl(child->read_data_fd, O_NONBLOCK );
448                 while( (n=read(child->read_data_fd, buf, READ_BUFSIZE-1)) > 0 ) {
449                         buffer_add( gbuf, buf );
450                         memset( buf, 0, READ_BUFSIZE );
451
452                         //fprintf(stderr, "Child read %d bytes\n", n);
453
454                         if( n == READ_BUFSIZE ) { 
455                                 //fprintf(stderr, "We read READ_BUFSIZE data....\n");
456                                 /* XXX */
457                                 /* either we have exactly READ_BUFSIZE data, 
458                                         or there's more waiting that we need to grab*/
459                                 /* must set to non-block for reading more */
460                         } else {
461                                 //fprintf(stderr, "Read Data %f\n", get_timestamp_millis() );
462                                 prefork_child_process_request(child, gbuf->buf);
463                                 buffer_reset( gbuf );
464                                 break;
465                         }
466                 }
467
468                 if( n < 0 ) {
469                         osrfLogWarning( OSRF_LOG_MARK,  "Child read returned error with errno %d", errno );
470                         break;
471                 }
472
473                 if( i < child->max_requests - 1 ) 
474                         write( child->write_status_fd, "available" /*less than 64 bytes*/, 9 );
475         }
476
477         buffer_free(gbuf);
478
479         osrfLogDebug( OSRF_LOG_MARK, "Child exiting...[%d]", getpid() );
480
481         exit(0);
482 }
483
484
485 void add_prefork_child( prefork_simple* forker, prefork_child* child ) {
486         
487         if( forker->first_child == NULL ) {
488                 forker->first_child = child;
489                 child->next = child;
490                 return;
491         }
492
493         /* we put the child in as the last because, regardless, 
494                 we have to do the DLL splice dance, and this is the
495            simplest way */
496
497         prefork_child* start_child = forker->first_child;
498         while(1) {
499                 if( forker->first_child->next == start_child ) 
500                         break;
501                 forker->first_child = forker->first_child->next;
502         }
503
504         /* here we know that forker->first_child is the last element 
505                 in the list and start_child is the first.  Insert the
506                 new child between them*/
507
508         forker->first_child->next = child;
509         child->next = start_child;
510         return;
511 }
512
513 prefork_child* find_prefork_child( prefork_simple* forker, pid_t pid ) {
514
515         if( forker->first_child == NULL ) { return NULL; }
516         prefork_child* start_child = forker->first_child;
517         do {
518                 if( forker->first_child->pid == pid ) 
519                         return forker->first_child;
520         } while( (forker->first_child = forker->first_child->next) != start_child );
521
522         return NULL;
523 }
524
525
526 void del_prefork_child( prefork_simple* forker, pid_t pid ) { 
527
528         if( forker->first_child == NULL ) { return; }
529
530         (forker->current_num_children)--;
531         osrfLogDebug( OSRF_LOG_MARK, "Deleting Child: %d", pid );
532
533         prefork_child* start_child = forker->first_child; /* starting point */
534         prefork_child* cur_child        = start_child; /* current pointer */
535         prefork_child* prev_child       = start_child; /* the trailing pointer */
536
537         /* special case where there is only one in the list */
538         if( start_child == start_child->next ) {
539                 if( start_child->pid == pid ) {
540                         forker->first_child = NULL;
541
542                         close( start_child->read_data_fd );
543                         close( start_child->write_data_fd );
544                         close( start_child->read_status_fd );
545                         close( start_child->write_status_fd );
546
547                         prefork_child_free( start_child );
548                 }
549                 return;
550         }
551
552
553         /* special case where the first item in the list needs to be removed */
554         if( start_child->pid == pid ) { 
555
556                 /* find the last one so we can remove the start_child */
557                 do { 
558                         prev_child = cur_child;
559                         cur_child = cur_child->next;
560                 }while( cur_child != start_child );
561
562                 /* now cur_child == start_child */
563                 prev_child->next = cur_child->next;
564                 forker->first_child = prev_child;
565
566                 close( cur_child->read_data_fd );
567                 close( cur_child->write_data_fd );
568                 close( cur_child->read_status_fd );
569                 close( cur_child->write_status_fd );
570
571                 prefork_child_free( cur_child );
572                 return;
573         } 
574
575         do {
576                 prev_child = cur_child;
577                 cur_child = cur_child->next;
578
579                 if( cur_child->pid == pid ) {
580                         prev_child->next = cur_child->next;
581
582                         close( cur_child->read_data_fd );
583                         close( cur_child->write_data_fd );
584                         close( cur_child->read_status_fd );
585                         close( cur_child->write_status_fd );
586
587                         prefork_child_free( cur_child );
588                         return;
589                 }
590
591         } while(cur_child != start_child);
592 }
593
594
595
596
597 prefork_child* prefork_child_init( 
598         int max_requests, int read_data_fd, int write_data_fd, 
599         int read_status_fd, int write_status_fd ) {
600
601         prefork_child* child = (prefork_child*) safe_malloc(sizeof(prefork_child));
602         child->max_requests             = max_requests;
603         child->read_data_fd             = read_data_fd;
604         child->write_data_fd            = write_data_fd;
605         child->read_status_fd   = read_status_fd;
606         child->write_status_fd  = write_status_fd;
607         child->available                        = 1;
608
609         return child;
610 }
611
612
613 int prefork_free( prefork_simple* prefork ) {
614         
615         while( prefork->first_child != NULL ) {
616                 osrfLogInfo( OSRF_LOG_MARK,  "Killing children and sleeping 1 to reap..." );
617                 kill( 0,        SIGKILL );
618                 sleep(1);
619         }
620
621         client_free(prefork->connection);
622         free(prefork->appname);
623         free( prefork );
624         return 1;
625 }
626
627 int prefork_child_free( prefork_child* child ) { 
628         free(child->appname);
629         close(child->read_data_fd);
630         close(child->write_status_fd);
631         free( child ); 
632         return 1;
633 }
634