Initial revision
[OpenSRF.git] / src / router / router.c
1 #include "transport_router.h"
2 #include <sys/types.h>
3 #include <signal.h>
4
5
6 char* router_resource;
7 transport_router_registrar* routt;
8
9 void sig_hup_handler( int a ) { 
10         router_registrar_free( routt ); 
11         config_reader_free();   
12         log_free();
13         free( router_resource );
14         exit(0); 
15 }
16
17
18 int main( int argc, char* argv[] ) {
19
20         if( argc < 2 ) {
21                 fatal_handler( "Usage: %s <path_to_config_file>", argv[0] );
22                 exit(0);
23         }
24
25
26         config_reader_init( argv[1] );  
27         if( conf_reader == NULL ) fatal_handler( "main(): Config is NULL" ); 
28
29         /* laod the config options */
30         char* server                    = config_value("//router/transport/server");
31         char* port                              = config_value("//router/transport/port");
32         char* username                  = config_value("//router/transport/username");
33         char* password                  = config_value("//router/transport/password");
34         router_resource         = config_value("//router/transport/resource");
35         char* con_timeout               = config_value("//router/transport/connect_timeout" );
36         char* max_retries               = config_value("//router/transport/max_reconnect_attempts" );
37
38         fprintf(stderr, "%s %s %s %s", server, port, username, password );
39
40         int iport                       = atoi( port );
41         int con_itimeout        = atoi( con_timeout );
42         int max_retries_        = atoi(max_retries);
43
44         if( iport < 1 ) { 
45                 fatal_handler( "Port is negative or 0" );
46                 return 99;
47         }
48
49
50         /* build the router_registrar */
51         transport_router_registrar* router_registrar = 
52                 router_registrar_init( server, iport, username, password, router_resource, 0, con_itimeout ); 
53
54         routt = router_registrar;
55
56         free(server);
57         free(port);
58         free(username);
59         free(password);
60         free(con_timeout);
61         free(max_retries);
62
63         signal(SIGHUP,sig_hup_handler);
64
65
66         int counter = 0;
67         /* wait for incoming... */
68         while( ++counter <= max_retries_ ) {
69
70                 /* connect to jabber */
71                 if( router_registrar_connect( router_registrar ) )  {
72                         info_handler( "Connected..." );
73                         counter = 0;
74                         listen_loop( router_registrar );
75                 } else  
76                         warning_handler( "Could not connect to Jabber" );
77
78                 /* this should never happen */
79                 warning_handler( "Jabber server probably went away, attempting reconnect" );
80
81                 sleep(5);
82         }
83
84
85         router_registrar_free( router_registrar );
86         config_reader_free();   
87         return 1;
88
89 }
90
91 transport_router_registrar* router_registrar_init( char* server, 
92                 int port, char* username, char* password, 
93                 char* resource, int client_timeout, int con_timeout ) {
94
95         if( server == NULL ) { return NULL; }
96         
97         /* allocate a new router_registrar object */
98         size_t size = sizeof( transport_router_registrar );
99         transport_router_registrar* router_registrar = (transport_router_registrar*) safe_malloc( size );
100
101         router_registrar->client_timeout        = client_timeout;
102         router_registrar->jabber = jabber_connect_init( server, port, username, password, resource, con_timeout );
103         return router_registrar;
104
105 }
106
107 jabber_connect* jabber_connect_init( char* server, 
108                 int port, char* username, char* password, char* resource, int connect_timeout ) {
109
110         size_t len = sizeof(jabber_connect);
111         jabber_connect* jabber = (jabber_connect*) safe_malloc( len );
112
113         jabber->port                            = port;
114         jabber->connect_timeout = connect_timeout;
115
116         jabber->server                          = strdup(server);
117         jabber->username                        = strdup(username);
118         jabber->password                        = strdup(password);
119         jabber->resource                        = strdup(resource);
120
121         if( jabber->server == NULL || jabber->username == NULL ||
122                         jabber->password == NULL || jabber->resource == NULL ) {
123                 fatal_handler( "jabber_init(): Out of Memory" );
124                 return NULL;
125         }
126
127         /* build the transport client */
128         jabber->t_client = client_init( jabber->server, jabber->port );
129
130         return jabber;
131 }
132
133 /* connect the router_registrar to jabber */
134 int router_registrar_connect( transport_router_registrar* router ) {
135         return j_connect( router->jabber );
136 }
137
138 /* connect a jabber_connect object jabber */
139 int j_connect( jabber_connect* jabber ) {
140         if( jabber == NULL ) { return 0; }
141         return client_connect( jabber->t_client, 
142                         jabber->username, jabber->password, jabber->resource, jabber->connect_timeout );
143 }
144
145 int fill_fd_set( transport_router_registrar* router, fd_set* set ) {
146         
147         int max_fd;
148         FD_ZERO(set);
149
150         int router_fd = router->jabber->t_client->session->sock_obj->sock_fd;
151         max_fd = router_fd;
152         FD_SET( router_fd, set );
153
154         server_class_node* cur_node = router->server_class_list;
155         while( cur_node != NULL ) {
156                 int cur_class_fd = cur_node->jabber->t_client->session->sock_obj->sock_fd;
157                 if( cur_class_fd > max_fd ) 
158                         max_fd = cur_class_fd;
159                 FD_SET( cur_class_fd, set );
160                 cur_node = cur_node->next;
161         }
162
163         FD_CLR( 0, set );
164         return max_fd;
165 }
166
167
168 void listen_loop( transport_router_registrar* router ) {
169
170         if( router == NULL )
171                 return;
172
173         int select_ret;
174         int router_fd = router->jabber->t_client->session->sock_obj->sock_fd;
175         transport_message* cur_msg;
176
177         while(1) {
178
179                 fd_set listen_set;
180                 int max_fd = fill_fd_set( router, &listen_set );
181
182                 if( max_fd < 1 ) 
183                         fatal_handler( "fill_fd_set return bogus max_fd: %d", max_fd );
184
185                 int num_handled = 0;
186                 info_handler( "Going into select" );
187
188                 if( (select_ret=select(max_fd+ 1, &listen_set, NULL, NULL, NULL)) < 0 ) {
189
190                         warning_handler( "Select returned error %d", select_ret );
191                         warning_handler( "Select Error %d on fd %d", errno );
192                         perror( "Select Error" );
193                         warning_handler( "Errors: EBADF %d, EINTR %d, EINVAL %d, ENOMEM %d",
194                                         EBADF, EINTR, EINVAL, ENOMEM );
195                         continue;
196
197                 } else {
198
199                         info_handler( "Select returned %d", select_ret );
200                         
201                         if( FD_ISSET( router_fd, &listen_set ) ) {
202                                 cur_msg = client_recv( router->jabber->t_client, 1 );
203                                 router_registrar_handle_msg( router, cur_msg );
204                                 message_free( cur_msg );
205                                 if( ++num_handled == select_ret ) 
206                                         continue;
207                         }
208
209                         /* cycle through the children and find any whose fd's are ready for reading */
210                         server_class_node* cur_node = router->server_class_list;
211                         while( cur_node != NULL ) {
212                                 info_handler("searching child activity" );
213                                 int cur_fd = cur_node->jabber->t_client->session->sock_obj->sock_fd;
214
215                                 if( FD_ISSET(cur_fd, &listen_set) ) {
216                                         ++num_handled;
217                                         FD_CLR(cur_fd,&listen_set);
218                                         info_handler( "found active child %s", cur_node->server_class );
219
220                                         cur_msg = client_recv( cur_node->jabber->t_client, 1 );
221                                         info_handler( "%s received from %s", cur_node->server_class, cur_msg->sender );
222                                         int handle_ret = server_class_handle_msg( router, cur_node, cur_msg );
223
224                                         if( handle_ret == -1 ) {
225                                                 warning_handler( "server_class_handle_msg() returned -1" );
226                                                 cur_node = router->server_class_list; /*start over*/
227                                                 continue;
228
229                                         } else if( handle_ret == 0 ) {
230                                                 /* delete and continue */
231                                                 warning_handler( "server_class_handle_msg() returned 0" );
232                                                 server_class_node* tmp_node = cur_node->next;
233                                                 remove_server_class( router, cur_node );        
234                                                 cur_node = tmp_node;
235                                                 continue;
236                                         } 
237
238                                         info_handler( "%s handled message successfully", cur_node->server_class );
239                                         /* dont free message here */
240                                         if( num_handled == select_ret ) 
241                                                 break;
242                                 }
243                                 if( num_handled == select_ret ) 
244                                         break;
245                                 cur_node = cur_node->next;
246
247                         } /* cycling through the server_class list */
248
249                 } /* no select errors */
250         } 
251 }
252
253
254 /* determine where to route top level messages */
255 int router_registrar_handle_msg( transport_router_registrar* router_registrar, transport_message* msg ) {
256
257         info_handler( "Received class: %s : command %s:  body: %s", msg->router_class, msg->router_command, msg->body );
258
259         if( router_registrar == NULL || msg == NULL ) { return 0; }
260
261         info_handler("Looking for server_class_node %s...",msg->router_class);
262         server_class_node* active_class_node = find_server_class( router_registrar, msg->router_class );
263
264         if( active_class_node == NULL ) { 
265                 info_handler("Could not find server_class_node %s, creating one.",msg->router_class);
266
267                 /* there is no server_class for msg->router_class so we build it here */
268                 if( strcmp( msg->router_command, "register") == 0 ) {
269
270                         info_handler("Adding server_class_node for %s",msg->router_class);
271                         active_class_node = 
272                                 init_server_class( router_registrar, msg->sender, msg->router_class ); 
273
274                         if( active_class_node == NULL ) {
275                                 fatal_handler( "router_listen(): active_class_node == NULL for %s", msg->sender );
276                                 return 0;
277                         }
278
279                         if (router_registrar->server_class_list != NULL) {
280                                 active_class_node->next = router_registrar->server_class_list;
281                                 router_registrar->server_class_list->prev = active_class_node;
282                         }
283                         router_registrar->server_class_list = active_class_node;
284
285                         //spawn_server_class( (void*) active_class_node );
286
287                 } else {
288                         warning_handler( "router_register_handler_msg(): Bad Command [%s] for class [%s]",
289                                 msg->router_command, msg->router_class );
290                 }
291
292         } else if( strcmp( msg->router_command, "register") == 0 ) {
293                 /* there is a server_class for msg->router_class so we 
294                         need to either add a new server_node or update the existing one */
295
296                 
297                 server_node* s_node = find_server_node( active_class_node, msg->sender );
298
299                 if( s_node != NULL ) {
300                         s_node->available = 1;
301                         s_node->upd_time = time(NULL);
302                         info_handler( "Found matching registered server: %s. Updating.",
303                                         s_node->remote_id );
304                 } else {
305                         s_node = init_server_node( msg->sender );
306
307                         info_handler( "Adding server_node for: %s.", s_node->remote_id );
308
309                         if (s_node == NULL ) {
310                                 warning_handler( " Could not create new xerver_node for %s.",
311                                         msg->sender );
312                                 return 0;
313                         }
314
315                         s_node->next = active_class_node->current_server_node->next;
316                         s_node->prev = active_class_node->current_server_node;
317
318                         active_class_node->current_server_node->next->prev = s_node;
319                         active_class_node->current_server_node->next = s_node;
320                 }
321
322
323         } else if( strcmp( msg->router_command, "unregister") == 0 ) {
324
325                 if( ! unregister_server_node( active_class_node, msg->sender ) )
326                         remove_server_class( router_registrar, active_class_node );
327
328         } else {
329                 warning_handler( "router_register_handler_msg(): Bad Command [%s] for class [%s]",
330                         msg->router_command, msg->router_class );
331         }
332
333         return 1;
334 }
335
336
337 /* removes a server class node from the top level router_registrar */
338 int unregister_server_node( server_class_node* active_class_node, char* remote_id ) {
339
340         server_node* d_node = find_server_node( active_class_node, remote_id );
341
342         if ( d_node != NULL ) {
343
344                 info_handler( "Removing server_node for: %s.", d_node->remote_id );
345
346                 if ( d_node->next == NULL ) {
347                         warning_handler( "NEXT is NULL in ring [%s] -- "
348                                 "THIS SHOULD NEVER HAPPEN",
349                                 d_node->remote_id );
350
351                 }
352                 
353                 if ( d_node->prev == NULL ) {
354                         warning_handler( "PREV is NULL in a ring [%s] -- "
355                                 "THIS SHOULD NEVER HAPPEN",
356                                 d_node->remote_id );
357
358                 }
359
360                 if ( d_node->next == d_node && d_node->prev == d_node) {
361                         info_handler( "Last node, setting ring to NULL: %s.",
362                                 d_node->remote_id );
363
364                         active_class_node->current_server_node = NULL;
365
366                         server_node_free( d_node );
367                         return 0;
368
369                 } else {
370                         info_handler( "Nodes remain, splicing: %s, %s",
371                                 d_node->prev->remote_id,
372                                 d_node->next->remote_id);
373
374                 info_handler( "d_node => %x, next => %x, prev => %x",
375                                         d_node, d_node->next, d_node->prev );
376
377
378                         d_node->prev->next = d_node->next;
379                         d_node->next->prev = d_node->prev;
380
381                         info_handler( "prev => %x, prev->next => %x, prev->prev => %x",
382                                 d_node->prev, d_node->prev->next, d_node->prev->prev );
383
384                         info_handler( "next => %x, next->next => %x, next->prev => %x",
385                                 d_node->next, d_node->next->next, d_node->next->prev );
386                                 
387                         if (active_class_node->current_server_node == d_node)
388                                 active_class_node->current_server_node = d_node->next;
389
390
391                         server_node_free( d_node );
392                 }
393         } 
394
395         return 1;
396 }
397
398 server_node * find_server_node ( server_class_node * class, const char * remote_id ) {
399
400         if ( class == NULL ) {
401                 warning_handler(" find_server_node(): bad arg!");
402                 return NULL;
403         }
404
405         server_node * start_node = class->current_server_node;
406         server_node * node = class->current_server_node;
407
408         do {
409                 if (node == NULL)
410                         return NULL;
411
412                 if ( strcmp(node->remote_id, remote_id) == 0 )
413                         return node;
414
415                 node = node->next;
416
417         } while ( node != start_node );
418
419         return NULL;
420 }
421
422 /* if we return -1, then we just deleted the server_class you were looking for
423         if we return 0, then some other error has occured
424         we return 1 otherwise */
425 int remove_server_class( transport_router_registrar* router, server_class_node* class ) {
426         if( class == NULL )
427                 return 0;
428
429         transport_message * msg = NULL;
430         while ( (msg = client_recv(class->jabber->t_client, 0)) != NULL ) {
431                 server_class_handle_msg(router, class, msg);
432                 message_free(msg);
433         }
434         
435         free( class->server_class );
436         class->server_class = NULL;
437
438         find_server_class( router, router_resource ); /* find deletes for us */
439
440         if( router->server_class_list == NULL ) 
441                 return 0;
442         return 1;
443 }
444
445 server_class_node * find_server_class ( transport_router_registrar * router, const char * class_id ) {
446
447         if ( router == NULL ) {
448                 warning_handler(" find_server_class(): bad arg!");
449                 return NULL;
450         }
451
452         info_handler( "Finding server class for %s", class_id );
453         server_class_node * class = router->server_class_list;
454         server_class_node * dead_class = NULL;
455
456         while ( class != NULL ) {
457
458                 if ( class->server_class == NULL ) {
459                         info_handler( "Found an empty server class" );
460
461                         if ( class->prev != NULL ) {
462                                 class->prev->next = class->next;
463                                 if( class->next != NULL ) {
464                                         class->next->prev = class->prev;
465                                 }
466
467                         } else {
468                                 info_handler( "Empty class is the first on the list" );
469                                 if( class->next != NULL ) 
470                                         router->server_class_list = class->next;
471
472                                 else { /* we're the last class node in the class node list */
473                                         info_handler( "Empty class is the last on the list" );
474                                         server_class_node_free( router->server_class_list );
475                                         router->server_class_list = NULL;
476                                         break;
477                                 }
478                                         
479                         }
480
481                         dead_class = class;
482                         class = class->next;
483
484                         info_handler( "Tossing our dead class" );
485                         server_class_node_free( dead_class );
486
487                         if ( class == NULL )
488                                 return NULL;
489                 }
490
491                 if ( strcmp(class->server_class, class_id) == 0 )
492                         return class;
493                 info_handler( "%s != %s", class->server_class, class_id );
494
495                 class = class->next;
496         }
497
498         return NULL;
499 }
500
501 /* builds a new server class and connects to the jabber server with the new resource */
502 server_class_node* init_server_class( 
503                 transport_router_registrar* router, char* remote_id, char* server_class ) {
504
505         size_t len = sizeof( server_class_node );
506         server_class_node* node = (server_class_node*) safe_malloc( len );
507
508         node->jabber = jabber_connect_init( router->jabber->server,
509                         router->jabber->port, router->jabber->username, 
510                         router->jabber->password, server_class, router->jabber->connect_timeout );
511
512
513
514         node->server_class = strdup( server_class );
515         if( server_class == NULL ) {
516                 fatal_handler( "imit_server_class(): out of memory for %s", server_class );
517                 return NULL;
518         }
519
520         info_handler( "Received class to init_server_class: %s", server_class );
521         node->current_server_node = init_server_node( remote_id );
522         if( node->current_server_node == NULL ) {
523                 fatal_handler( "init_server_class(): NULL server_node for %s", remote_id );
524                 return NULL;
525         }
526
527
528         if( ! j_connect( node->jabber ) ) {
529                 fatal_handler( "Unable to init server class %s", node->server_class );
530                 return NULL;
531         }
532
533         info_handler( "Jabber address in init for %s : address %x : username %s : resource %s", 
534                         node->server_class, node->jabber->t_client->session->sock_obj->sock_fd, 
535                         node->jabber->username,  node->jabber->resource );
536
537         return node;
538
539 }
540
541 /* builds a new server_node to be added to the ring of server_nodes */
542 server_node* init_server_node(  char* remote_id ) {
543
544         info_handler( "Initing server node for %s", remote_id );
545         server_node* current_server_node;
546         size_t size = sizeof( server_node);
547         current_server_node = (server_node*) safe_malloc( size );
548
549         current_server_node->remote_id = strdup(remote_id);
550         if( current_server_node->remote_id == NULL ) {
551                 fatal_handler("init_server_class(): Out of Memory for %s", remote_id );
552                 return NULL;
553         }
554         
555         current_server_node->reg_time = time(NULL);     
556         current_server_node->available = 1;
557         current_server_node->next = current_server_node;
558         current_server_node->prev = current_server_node;
559
560
561         return current_server_node;
562
563 }
564
565 int  server_class_handle_msg( transport_router_registrar* router, 
566                 server_class_node* s_node, transport_message* msg ) {
567
568         if( s_node->current_server_node == NULL ) {
569                 /* return error to client ??!*/
570                 /* WE have no one to send the message to */
571                 warning_handler( "We no longer have any servers for %s : " 
572                                 "no one to send the message to. Sending error message to %s", s_node->server_class, msg->sender );
573                 free( msg->recipient );  
574
575                 char* rec = strdup( msg->sender );
576                 if( rec == NULL ) {
577                         fatal_handler( "class msg_handler: out of memory");
578                         return 0;
579                 }
580
581                 info_handler( "Building error message to return for %s", s_node->server_class);
582                 msg->recipient = rec;
583                 set_msg_error(msg, "cancel", 501);
584
585                 client_send_message( s_node->jabber->t_client, msg );
586                 message_free( msg );
587
588                 remove_server_class( router, s_node );
589
590                 return -1;
591         }
592
593         info_handler( "[%s] Received from %s to \n%s", 
594                         s_node->server_class, msg->sender, msg->recipient );
595
596         if( msg->is_error ) {
597                 warning_handler( "We've received an error message type: %s : code: %d", 
598                                 msg->error_type, msg->error_code );
599
600                 if( strcmp( msg->error_type, "cancel" ) == 0 ) {
601                         warning_handler( "Looks like we've lost a server!" );
602                         server_node* dead_node = find_server_node( s_node, msg->sender );
603
604                         if( dead_node != NULL ) { 
605                                 //message_free( msg );
606                                 transport_message* tmp = dead_node->last_sent;
607
608                                 /* copy over last sent, it will be freed in the unregister function */
609                                 transport_message* tmp2 = message_init( tmp->body, tmp->subject, tmp->thread,
610                                                 tmp->recipient, tmp->sender );
611                                         
612                                 message_set_router_info( tmp2, tmp->router_from,  
613                                                 tmp->router_to, tmp->router_class, tmp->router_command, tmp->broadcast );
614
615                                 if( ! unregister_server_node( s_node, dead_node->remote_id ) ) { 
616                                         /* WE have no one to send the message to */
617                                         warning_handler( "We no longer have any servers for %s : " 
618                                                         "no one to send the message to.", s_node->server_class );
619                                         free( msg->recipient );  
620
621                                         char* rec = strdup( msg->router_from );
622                                         if( rec == NULL ) {
623                                                 fatal_handler( "class msg_handler: out of memory");
624                                                 return 0;
625                                         }
626
627                                         info_handler( "Building error message to return for %s", s_node->server_class);
628                                         msg->recipient = rec;
629                                         client_send_message( s_node->jabber->t_client, msg );
630                                         message_free( tmp2 );
631                                         message_free( msg );
632                                         return 0;
633
634                                 } else {
635                                         msg = tmp2;
636                                 }
637                         }
638                 }
639         } 
640
641
642         server_node* c_node = s_node->current_server_node->next;
643
644         /* not implemented yet */
645         while( ! c_node->available ) {
646                 if( c_node == s_node->current_server_node ) {
647                         warning_handler("No server_node's are available for %s", s_node->server_class );
648                         /* XXX send error message to client */
649                         return 0;
650                 }
651                 c_node = c_node->next;
652         }
653         s_node->current_server_node = c_node;
654
655         transport_message * new_msg =
656                 message_init(   msg->body, msg->subject, msg->thread, 
657                                 s_node->current_server_node->remote_id, msg->sender );
658
659         message_set_router_info( new_msg, NULL, msg->sender, NULL, NULL, 0 );
660
661         info_handler( "[%s] Routing message from [%s]\nto [%s]", s_node->server_class, new_msg->sender, new_msg->recipient );
662
663         message_free( s_node->current_server_node->last_sent );
664         s_node->current_server_node->last_sent = msg;
665
666         if ( new_msg != NULL && client_send_message( s_node->jabber->t_client, new_msg ) ) {
667                 s_node->current_server_node->serve_count++;
668                 s_node->current_server_node->la_time = time(NULL);
669                 message_free( new_msg ); // XXX
670                 return 1;
671         }
672         info_handler( "message sent" );
673         message_free( new_msg ); // XXX
674
675         return 0;
676 }
677
678 int router_registrar_free( transport_router_registrar* router_registrar ) {
679         if( router_registrar == NULL ) return 0;
680         jabber_connect_free( router_registrar->jabber );
681
682         /* free the server_class list XXX */
683         while( router_registrar->server_class_list != NULL ) {
684                 remove_server_class(router_registrar, router_registrar->server_class_list);
685         }
686
687         free( router_registrar );
688
689
690
691         return 1;
692 }
693
694
695 int server_class_node_free( server_class_node* node ) {
696         if( node == NULL ) { return 0; }
697         if( node->server_class != NULL ) 
698                 free( node->server_class );
699
700         jabber_connect_free( node->jabber );
701
702         /* just in case, free the list */
703         while( node->current_server_node != NULL ) {
704                 unregister_server_node( node, node->current_server_node->remote_id );
705         }
706         free( node );
707         return 1;
708 }
709
710 int server_node_free( server_node* node ) {
711         if( node == NULL ) { return 0; }
712         message_free( node->last_sent );
713         free( node->remote_id );
714         free( node );
715         return 1;
716 }
717
718 int jabber_connect_free( jabber_connect* jabber ) {
719         if( jabber == NULL ) { return 0; }
720         client_free( jabber->t_client );
721         free( jabber->username );
722         free( jabber->password );
723         free( jabber->resource );
724         free( jabber->server );
725         free( jabber );
726         return 1;
727 }
728
729