1 #include "transport_router.h"
7 transport_router_registrar* routt;
9 void sig_hup_handler( int a ) {
10 router_registrar_free( routt );
13 free( router_resource );
18 int main( int argc, char* argv[] ) {
21 fatal_handler( "Usage: %s <path_to_config_file>", argv[0] );
26 config_reader_init( argv[1] );
27 if( conf_reader == NULL ) fatal_handler( "main(): Config is NULL" );
29 /* laod the config options */
30 char* server = config_value("//router/transport/server");
31 char* port = config_value("//router/transport/port");
32 char* username = config_value("//router/transport/username");
33 char* password = config_value("//router/transport/password");
34 router_resource = config_value("//router/transport/resource");
35 char* con_timeout = config_value("//router/transport/connect_timeout" );
36 char* max_retries = config_value("//router/transport/max_reconnect_attempts" );
38 fprintf(stderr, "%s %s %s %s", server, port, username, password );
40 int iport = atoi( port );
41 int con_itimeout = atoi( con_timeout );
42 int max_retries_ = atoi(max_retries);
45 fatal_handler( "Port is negative or 0" );
50 /* build the router_registrar */
51 transport_router_registrar* router_registrar =
52 router_registrar_init( server, iport, username, password, router_resource, 0, con_itimeout );
54 routt = router_registrar;
63 signal(SIGHUP,sig_hup_handler);
67 /* wait for incoming... */
68 while( ++counter <= max_retries_ ) {
70 /* connect to jabber */
71 if( router_registrar_connect( router_registrar ) ) {
72 info_handler( "Connected..." );
74 listen_loop( router_registrar );
76 warning_handler( "Could not connect to Jabber" );
78 /* this should never happen */
79 warning_handler( "Jabber server probably went away, attempting reconnect" );
85 router_registrar_free( router_registrar );
91 transport_router_registrar* router_registrar_init( char* server,
92 int port, char* username, char* password,
93 char* resource, int client_timeout, int con_timeout ) {
95 if( server == NULL ) { return NULL; }
97 /* allocate a new router_registrar object */
98 size_t size = sizeof( transport_router_registrar );
99 transport_router_registrar* router_registrar = (transport_router_registrar*) safe_malloc( size );
101 router_registrar->client_timeout = client_timeout;
102 router_registrar->jabber = jabber_connect_init( server, port, username, password, resource, con_timeout );
103 return router_registrar;
107 jabber_connect* jabber_connect_init( char* server,
108 int port, char* username, char* password, char* resource, int connect_timeout ) {
110 size_t len = sizeof(jabber_connect);
111 jabber_connect* jabber = (jabber_connect*) safe_malloc( len );
114 jabber->connect_timeout = connect_timeout;
116 jabber->server = strdup(server);
117 jabber->username = strdup(username);
118 jabber->password = strdup(password);
119 jabber->resource = strdup(resource);
121 if( jabber->server == NULL || jabber->username == NULL ||
122 jabber->password == NULL || jabber->resource == NULL ) {
123 fatal_handler( "jabber_init(): Out of Memory" );
127 /* build the transport client */
128 jabber->t_client = client_init( jabber->server, jabber->port );
133 /* connect the router_registrar to jabber */
134 int router_registrar_connect( transport_router_registrar* router ) {
135 return j_connect( router->jabber );
138 /* connect a jabber_connect object jabber */
139 int j_connect( jabber_connect* jabber ) {
140 if( jabber == NULL ) { return 0; }
141 return client_connect( jabber->t_client,
142 jabber->username, jabber->password, jabber->resource, jabber->connect_timeout );
145 int fill_fd_set( transport_router_registrar* router, fd_set* set ) {
150 int router_fd = router->jabber->t_client->session->sock_obj->sock_fd;
152 FD_SET( router_fd, set );
154 server_class_node* cur_node = router->server_class_list;
155 while( cur_node != NULL ) {
156 int cur_class_fd = cur_node->jabber->t_client->session->sock_obj->sock_fd;
157 if( cur_class_fd > max_fd )
158 max_fd = cur_class_fd;
159 FD_SET( cur_class_fd, set );
160 cur_node = cur_node->next;
168 void listen_loop( transport_router_registrar* router ) {
174 int router_fd = router->jabber->t_client->session->sock_obj->sock_fd;
175 transport_message* cur_msg;
180 int max_fd = fill_fd_set( router, &listen_set );
183 fatal_handler( "fill_fd_set return bogus max_fd: %d", max_fd );
186 info_handler( "Going into select" );
188 if( (select_ret=select(max_fd+ 1, &listen_set, NULL, NULL, NULL)) < 0 ) {
190 warning_handler( "Select returned error %d", select_ret );
191 warning_handler( "Select Error %d on fd %d", errno );
192 perror( "Select Error" );
193 warning_handler( "Errors: EBADF %d, EINTR %d, EINVAL %d, ENOMEM %d",
194 EBADF, EINTR, EINVAL, ENOMEM );
199 info_handler( "Select returned %d", select_ret );
201 if( FD_ISSET( router_fd, &listen_set ) ) {
202 cur_msg = client_recv( router->jabber->t_client, 1 );
203 router_registrar_handle_msg( router, cur_msg );
204 message_free( cur_msg );
205 if( ++num_handled == select_ret )
209 /* cycle through the children and find any whose fd's are ready for reading */
210 server_class_node* cur_node = router->server_class_list;
211 while( cur_node != NULL ) {
212 info_handler("searching child activity" );
213 int cur_fd = cur_node->jabber->t_client->session->sock_obj->sock_fd;
215 if( FD_ISSET(cur_fd, &listen_set) ) {
217 FD_CLR(cur_fd,&listen_set);
218 info_handler( "found active child %s", cur_node->server_class );
220 cur_msg = client_recv( cur_node->jabber->t_client, 1 );
221 info_handler( "%s received from %s", cur_node->server_class, cur_msg->sender );
222 int handle_ret = server_class_handle_msg( router, cur_node, cur_msg );
224 if( handle_ret == -1 ) {
225 warning_handler( "server_class_handle_msg() returned -1" );
226 cur_node = router->server_class_list; /*start over*/
229 } else if( handle_ret == 0 ) {
230 /* delete and continue */
231 warning_handler( "server_class_handle_msg() returned 0" );
232 server_class_node* tmp_node = cur_node->next;
233 remove_server_class( router, cur_node );
238 info_handler( "%s handled message successfully", cur_node->server_class );
239 /* dont free message here */
240 if( num_handled == select_ret )
243 if( num_handled == select_ret )
245 cur_node = cur_node->next;
247 } /* cycling through the server_class list */
249 } /* no select errors */
254 /* determine where to route top level messages */
255 int router_registrar_handle_msg( transport_router_registrar* router_registrar, transport_message* msg ) {
257 info_handler( "Received class: %s : command %s: body: %s", msg->router_class, msg->router_command, msg->body );
259 if( router_registrar == NULL || msg == NULL ) { return 0; }
261 info_handler("Looking for server_class_node %s...",msg->router_class);
262 server_class_node* active_class_node = find_server_class( router_registrar, msg->router_class );
264 if( active_class_node == NULL ) {
265 info_handler("Could not find server_class_node %s, creating one.",msg->router_class);
267 /* there is no server_class for msg->router_class so we build it here */
268 if( strcmp( msg->router_command, "register") == 0 ) {
270 info_handler("Adding server_class_node for %s",msg->router_class);
272 init_server_class( router_registrar, msg->sender, msg->router_class );
274 if( active_class_node == NULL ) {
275 fatal_handler( "router_listen(): active_class_node == NULL for %s", msg->sender );
279 if (router_registrar->server_class_list != NULL) {
280 active_class_node->next = router_registrar->server_class_list;
281 router_registrar->server_class_list->prev = active_class_node;
283 router_registrar->server_class_list = active_class_node;
285 //spawn_server_class( (void*) active_class_node );
288 warning_handler( "router_register_handler_msg(): Bad Command [%s] for class [%s]",
289 msg->router_command, msg->router_class );
292 } else if( strcmp( msg->router_command, "register") == 0 ) {
293 /* there is a server_class for msg->router_class so we
294 need to either add a new server_node or update the existing one */
297 server_node* s_node = find_server_node( active_class_node, msg->sender );
299 if( s_node != NULL ) {
300 s_node->available = 1;
301 s_node->upd_time = time(NULL);
302 info_handler( "Found matching registered server: %s. Updating.",
305 s_node = init_server_node( msg->sender );
307 info_handler( "Adding server_node for: %s.", s_node->remote_id );
309 if (s_node == NULL ) {
310 warning_handler( " Could not create new xerver_node for %s.",
315 s_node->next = active_class_node->current_server_node->next;
316 s_node->prev = active_class_node->current_server_node;
318 active_class_node->current_server_node->next->prev = s_node;
319 active_class_node->current_server_node->next = s_node;
323 } else if( strcmp( msg->router_command, "unregister") == 0 ) {
325 if( ! unregister_server_node( active_class_node, msg->sender ) )
326 remove_server_class( router_registrar, active_class_node );
329 warning_handler( "router_register_handler_msg(): Bad Command [%s] for class [%s]",
330 msg->router_command, msg->router_class );
337 /* removes a server class node from the top level router_registrar */
338 int unregister_server_node( server_class_node* active_class_node, char* remote_id ) {
340 server_node* d_node = find_server_node( active_class_node, remote_id );
342 if ( d_node != NULL ) {
344 info_handler( "Removing server_node for: %s.", d_node->remote_id );
346 if ( d_node->next == NULL ) {
347 warning_handler( "NEXT is NULL in ring [%s] -- "
348 "THIS SHOULD NEVER HAPPEN",
353 if ( d_node->prev == NULL ) {
354 warning_handler( "PREV is NULL in a ring [%s] -- "
355 "THIS SHOULD NEVER HAPPEN",
360 if ( d_node->next == d_node && d_node->prev == d_node) {
361 info_handler( "Last node, setting ring to NULL: %s.",
364 active_class_node->current_server_node = NULL;
366 server_node_free( d_node );
370 info_handler( "Nodes remain, splicing: %s, %s",
371 d_node->prev->remote_id,
372 d_node->next->remote_id);
374 info_handler( "d_node => %x, next => %x, prev => %x",
375 d_node, d_node->next, d_node->prev );
378 d_node->prev->next = d_node->next;
379 d_node->next->prev = d_node->prev;
381 info_handler( "prev => %x, prev->next => %x, prev->prev => %x",
382 d_node->prev, d_node->prev->next, d_node->prev->prev );
384 info_handler( "next => %x, next->next => %x, next->prev => %x",
385 d_node->next, d_node->next->next, d_node->next->prev );
387 if (active_class_node->current_server_node == d_node)
388 active_class_node->current_server_node = d_node->next;
391 server_node_free( d_node );
398 server_node * find_server_node ( server_class_node * class, const char * remote_id ) {
400 if ( class == NULL ) {
401 warning_handler(" find_server_node(): bad arg!");
405 server_node * start_node = class->current_server_node;
406 server_node * node = class->current_server_node;
412 if ( strcmp(node->remote_id, remote_id) == 0 )
417 } while ( node != start_node );
422 /* if we return -1, then we just deleted the server_class you were looking for
423 if we return 0, then some other error has occured
424 we return 1 otherwise */
425 int remove_server_class( transport_router_registrar* router, server_class_node* class ) {
429 transport_message * msg = NULL;
430 while ( (msg = client_recv(class->jabber->t_client, 0)) != NULL ) {
431 server_class_handle_msg(router, class, msg);
435 free( class->server_class );
436 class->server_class = NULL;
438 find_server_class( router, router_resource ); /* find deletes for us */
440 if( router->server_class_list == NULL )
445 server_class_node * find_server_class ( transport_router_registrar * router, const char * class_id ) {
447 if ( router == NULL ) {
448 warning_handler(" find_server_class(): bad arg!");
452 info_handler( "Finding server class for %s", class_id );
453 server_class_node * class = router->server_class_list;
454 server_class_node * dead_class = NULL;
456 while ( class != NULL ) {
458 if ( class->server_class == NULL ) {
459 info_handler( "Found an empty server class" );
461 if ( class->prev != NULL ) {
462 class->prev->next = class->next;
463 if( class->next != NULL ) {
464 class->next->prev = class->prev;
468 info_handler( "Empty class is the first on the list" );
469 if( class->next != NULL )
470 router->server_class_list = class->next;
472 else { /* we're the last class node in the class node list */
473 info_handler( "Empty class is the last on the list" );
474 server_class_node_free( router->server_class_list );
475 router->server_class_list = NULL;
484 info_handler( "Tossing our dead class" );
485 server_class_node_free( dead_class );
491 if ( strcmp(class->server_class, class_id) == 0 )
493 info_handler( "%s != %s", class->server_class, class_id );
501 /* builds a new server class and connects to the jabber server with the new resource */
502 server_class_node* init_server_class(
503 transport_router_registrar* router, char* remote_id, char* server_class ) {
505 size_t len = sizeof( server_class_node );
506 server_class_node* node = (server_class_node*) safe_malloc( len );
508 node->jabber = jabber_connect_init( router->jabber->server,
509 router->jabber->port, router->jabber->username,
510 router->jabber->password, server_class, router->jabber->connect_timeout );
514 node->server_class = strdup( server_class );
515 if( server_class == NULL ) {
516 fatal_handler( "imit_server_class(): out of memory for %s", server_class );
520 info_handler( "Received class to init_server_class: %s", server_class );
521 node->current_server_node = init_server_node( remote_id );
522 if( node->current_server_node == NULL ) {
523 fatal_handler( "init_server_class(): NULL server_node for %s", remote_id );
528 if( ! j_connect( node->jabber ) ) {
529 fatal_handler( "Unable to init server class %s", node->server_class );
533 info_handler( "Jabber address in init for %s : address %x : username %s : resource %s",
534 node->server_class, node->jabber->t_client->session->sock_obj->sock_fd,
535 node->jabber->username, node->jabber->resource );
541 /* builds a new server_node to be added to the ring of server_nodes */
542 server_node* init_server_node( char* remote_id ) {
544 info_handler( "Initing server node for %s", remote_id );
545 server_node* current_server_node;
546 size_t size = sizeof( server_node);
547 current_server_node = (server_node*) safe_malloc( size );
549 current_server_node->remote_id = strdup(remote_id);
550 if( current_server_node->remote_id == NULL ) {
551 fatal_handler("init_server_class(): Out of Memory for %s", remote_id );
555 current_server_node->reg_time = time(NULL);
556 current_server_node->available = 1;
557 current_server_node->next = current_server_node;
558 current_server_node->prev = current_server_node;
561 return current_server_node;
565 int server_class_handle_msg( transport_router_registrar* router,
566 server_class_node* s_node, transport_message* msg ) {
568 if( s_node->current_server_node == NULL ) {
569 /* return error to client ??!*/
570 /* WE have no one to send the message to */
571 warning_handler( "We no longer have any servers for %s : "
572 "no one to send the message to. Sending error message to %s", s_node->server_class, msg->sender );
573 free( msg->recipient );
575 char* rec = strdup( msg->sender );
577 fatal_handler( "class msg_handler: out of memory");
581 info_handler( "Building error message to return for %s", s_node->server_class);
582 msg->recipient = rec;
583 set_msg_error(msg, "cancel", 501);
585 client_send_message( s_node->jabber->t_client, msg );
588 remove_server_class( router, s_node );
593 info_handler( "[%s] Received from %s to \n%s",
594 s_node->server_class, msg->sender, msg->recipient );
596 if( msg->is_error ) {
597 warning_handler( "We've received an error message type: %s : code: %d",
598 msg->error_type, msg->error_code );
600 if( strcmp( msg->error_type, "cancel" ) == 0 ) {
601 warning_handler( "Looks like we've lost a server!" );
602 server_node* dead_node = find_server_node( s_node, msg->sender );
604 if( dead_node != NULL ) {
605 //message_free( msg );
606 transport_message* tmp = dead_node->last_sent;
608 /* copy over last sent, it will be freed in the unregister function */
609 transport_message* tmp2 = message_init( tmp->body, tmp->subject, tmp->thread,
610 tmp->recipient, tmp->sender );
612 message_set_router_info( tmp2, tmp->router_from,
613 tmp->router_to, tmp->router_class, tmp->router_command, tmp->broadcast );
615 if( ! unregister_server_node( s_node, dead_node->remote_id ) ) {
616 /* WE have no one to send the message to */
617 warning_handler( "We no longer have any servers for %s : "
618 "no one to send the message to.", s_node->server_class );
619 free( msg->recipient );
621 char* rec = strdup( msg->router_from );
623 fatal_handler( "class msg_handler: out of memory");
627 info_handler( "Building error message to return for %s", s_node->server_class);
628 msg->recipient = rec;
629 client_send_message( s_node->jabber->t_client, msg );
630 message_free( tmp2 );
642 server_node* c_node = s_node->current_server_node->next;
644 /* not implemented yet */
645 while( ! c_node->available ) {
646 if( c_node == s_node->current_server_node ) {
647 warning_handler("No server_node's are available for %s", s_node->server_class );
648 /* XXX send error message to client */
651 c_node = c_node->next;
653 s_node->current_server_node = c_node;
655 transport_message * new_msg =
656 message_init( msg->body, msg->subject, msg->thread,
657 s_node->current_server_node->remote_id, msg->sender );
659 message_set_router_info( new_msg, NULL, msg->sender, NULL, NULL, 0 );
661 info_handler( "[%s] Routing message from [%s]\nto [%s]", s_node->server_class, new_msg->sender, new_msg->recipient );
663 message_free( s_node->current_server_node->last_sent );
664 s_node->current_server_node->last_sent = msg;
666 if ( new_msg != NULL && client_send_message( s_node->jabber->t_client, new_msg ) ) {
667 s_node->current_server_node->serve_count++;
668 s_node->current_server_node->la_time = time(NULL);
669 message_free( new_msg ); // XXX
672 info_handler( "message sent" );
673 message_free( new_msg ); // XXX
678 int router_registrar_free( transport_router_registrar* router_registrar ) {
679 if( router_registrar == NULL ) return 0;
680 jabber_connect_free( router_registrar->jabber );
682 /* free the server_class list XXX */
683 while( router_registrar->server_class_list != NULL ) {
684 remove_server_class(router_registrar, router_registrar->server_class_list);
687 free( router_registrar );
695 int server_class_node_free( server_class_node* node ) {
696 if( node == NULL ) { return 0; }
697 if( node->server_class != NULL )
698 free( node->server_class );
700 jabber_connect_free( node->jabber );
702 /* just in case, free the list */
703 while( node->current_server_node != NULL ) {
704 unregister_server_node( node, node->current_server_node->remote_id );
710 int server_node_free( server_node* node ) {
711 if( node == NULL ) { return 0; }
712 message_free( node->last_sent );
713 free( node->remote_id );
718 int jabber_connect_free( jabber_connect* jabber ) {
719 if( jabber == NULL ) { return 0; }
720 client_free( jabber->t_client );
721 free( jabber->username );
722 free( jabber->password );
723 free( jabber->resource );
724 free( jabber->server );