1 #include "osrf_transgroup.h"
2 #include <sys/select.h>
5 osrfTransportGroupNode* osrfNewTransportGroupNode(
6 char* domain, int port, char* username, char* password, char* resource ) {
8 if(!(domain && port && username && password && resource)) return NULL;
10 osrfTransportGroupNode* node = safe_malloc(sizeof(osrfTransportGroupNode));
11 node->domain = strdup(domain);
13 node->username = strdup(username);
14 node->password = strdup(password);
15 node->domain = strdup(domain);
16 node->resource = strdup(resource);
19 node->connection = client_init( domain, port, NULL, 0 );
25 osrfTransportGroup* osrfNewTransportGroup( osrfTransportGroupNode* nodes[], int count ) {
26 if(!nodes || count < 1) return NULL;
28 osrfTransportGroup* grp = safe_malloc(sizeof(osrfTransportGroup));
29 grp->nodes = osrfNewHash();
30 grp->itr = osrfNewHashIterator(grp->nodes);
33 for( i = 0; i != count; i++ ) {
34 if(!(nodes[i] && nodes[i]->domain) ) return NULL;
35 osrfHashSet( grp->nodes, nodes[i], nodes[i]->domain );
36 debug_handler("Adding domain %s to TransportGroup", nodes[i]->domain);
43 /* connect all of the nodes to their servers */
44 int osrfTransportGroupConnectAll( osrfTransportGroup* grp ) {
48 osrfTransportGroupNode* node;
49 osrfHashIteratorReset(grp->itr);
51 while( (node = osrfHashIteratorNext(grp->itr)) ) {
52 info_handler("TransportGroup attempting to connect to domain %s",
53 node->connection->session->server);
55 if(client_connect( node->connection, node->username,
56 node->password, node->resource, 10, AUTH_DIGEST )) {
59 info_handler("TransportGroup successfully connected to domain %s",
60 node->connection->session->server);
62 warning_handler("TransportGroup unable to connect to domain %s",
63 node->connection->session->server);
67 osrfHashIteratorReset(grp->itr);
71 void osrfTransportGroupDisconnectAll( osrfTransportGroup* grp ) {
74 osrfTransportGroupNode* node;
75 osrfHashIteratorReset(grp->itr);
77 while( (node = osrfHashIteratorNext(grp->itr)) ) {
78 info_handler("TransportGroup disconnecting from domain %s",
79 node->connection->session->server);
80 client_disconnect(node->connection);
84 osrfHashIteratorReset(grp->itr);
88 int osrfTransportGroupSendMatch( osrfTransportGroup* grp, transport_message* msg ) {
89 if(!(grp && msg)) return -1;
93 jid_get_domain( msg->recipient, domain, 255 );
95 osrfTransportGroupNode* node = osrfHashGet(grp->nodes, domain);
97 if( (client_send_message( node->connection, msg )) == 0 )
101 return warning_handler("Error sending message to domain %s", domain );
104 int osrfTransportGroupSend( osrfTransportGroup* grp, transport_message* msg ) {
106 if(!(grp && msg)) return -1;
109 char domain[bufsize];
110 bzero(domain, bufsize);
111 jid_get_domain( msg->recipient, domain, bufsize - 1 );
113 char msgrecip[bufsize];
114 bzero(msgrecip, bufsize);
115 jid_get_username(msg->recipient, msgrecip, bufsize - 1);
117 char msgres[bufsize];
118 bzero(msgres, bufsize);
119 jid_get_resource(msg->recipient, msgres, bufsize - 1);
121 char* firstdomain = NULL;
125 /* if we don't host this domain, don't update the recipient but send it as is */
126 if(!osrfHashGet(grp->nodes, domain)) updateRecip = 0;
128 osrfTransportGroupNode* node;
132 node = osrfHashIteratorNext(grp->itr);
133 if(!node) osrfHashIteratorReset(grp->itr);
135 node = osrfHashIteratorNext(grp->itr);
138 if(firstdomain == NULL) {
139 firstdomain = node->domain;
142 if(!strcmp(firstdomain, node->domain)) { /* we've made a full loop */
143 return warning_handler("We've tried to send to all domains.. giving up");
147 /* update the recipient domain if necessary */
151 sprintf(newrcp, "%s@%s/%s", msgrecip, node->domain, msgres);
152 free(msg->recipient);
153 msg->recipient = strdup(newrcp);
156 if( (client_send_message( node->connection, msg )) == 0 )
164 static int __osrfTGWait( fd_set* fdset, int maxfd, int timeout ) {
165 if(!(fdset && maxfd)) return 0;
173 if( (retval = select( maxfd + 1, fdset, NULL, NULL, NULL)) == -1 )
177 if( (retval = select( maxfd + 1, fdset, NULL, NULL, &tv)) == -1 )
185 transport_message* osrfTransportGroupRecvAll( osrfTransportGroup* grp, int timeout ) {
186 if(!grp) return NULL;
192 osrfTransportGroupNode* node;
193 osrfHashIterator* itr = osrfNewHashIterator(grp->nodes);
195 while( (node = osrfHashIteratorNext(itr)) ) {
197 int fd = node->connection->session->sock_id;
198 if( fd < maxfd ) maxfd = fd;
199 FD_SET( fd, &fdset );
202 osrfHashIteratorReset(itr);
204 if( __osrfTGWait( &fdset, maxfd, timeout ) ) {
205 while( (node = osrfHashIteratorNext(itr)) ) {
207 int fd = node->connection->session->sock_id;
208 if( FD_ISSET( fd, &fdset ) ) {
209 return client_recv( node->connection, 0 );
215 osrfHashIteratorFree(itr);
219 transport_message* osrfTransportGroupRecv( osrfTransportGroup* grp, char* domain, int timeout ) {
220 if(!(grp && domain)) return NULL;
222 osrfTransportGroupNode* node = osrfHashGet(grp->nodes, domain);
223 if(!node && node->connection && node->connection->session) return NULL;
224 int fd = node->connection->session->sock_id;
228 FD_SET( fd, &fdset );
230 int active = __osrfTGWait( &fdset, fd, timeout );
231 if(active) return client_recv( node->connection, 0 );
236 void osrfTransportGroupSetInactive( osrfTransportGroup* grp, char* domain ) {
237 if(!(grp && domain)) return;
238 osrfTransportGroupNode* node = osrfHashGet(grp->nodes, domain );
239 if(node) node->active = 0;