1 #include "osrf_transgroup.h"
2 #include <sys/select.h>
5 osrfTransportGroupNode* osrfNewTransportGroupNode(
6 char* domain, int port, char* username, char* password, char* resource ) {
8 if(!(domain && port && username && password && resource)) return NULL;
10 osrfTransportGroupNode* node = safe_malloc(sizeof(osrfTransportGroupNode));
11 node->domain = strdup(domain);
13 node->username = strdup(username);
14 node->password = strdup(password);
15 node->domain = strdup(domain);
16 node->resource = strdup(resource);
19 node->connection = client_init( domain, port, NULL, 0 );
25 osrfTransportGroup* osrfNewTransportGroup( osrfTransportGroupNode* nodes[], int count ) {
26 if(!nodes || count < 1) return NULL;
28 osrfTransportGroup* grp = safe_malloc(sizeof(osrfTransportGroup));
29 grp->nodes = osrfNewHash();
30 grp->itr = osrfNewHashIterator(grp->nodes);
33 for( i = 0; i != count; i++ ) {
34 if(!(nodes[i] && nodes[i]->domain) ) return NULL;
35 osrfHashSet( grp->nodes, nodes[i], nodes[i]->domain );
36 osrfLogDebug("Adding domain %s to TransportGroup", nodes[i]->domain);
43 /* connect all of the nodes to their servers */
44 int osrfTransportGroupConnectAll( osrfTransportGroup* grp ) {
48 osrfTransportGroupNode* node;
49 osrfHashIteratorReset(grp->itr);
51 while( (node = osrfHashIteratorNext(grp->itr)) ) {
52 osrfLogInfo("TransportGroup attempting to connect to domain %s",
53 node->connection->session->server);
55 if(client_connect( node->connection, node->username,
56 node->password, node->resource, 10, AUTH_DIGEST )) {
59 osrfLogInfo("TransportGroup successfully connected to domain %s",
60 node->connection->session->server);
62 osrfLogWarning("TransportGroup unable to connect to domain %s",
63 node->connection->session->server);
67 osrfHashIteratorReset(grp->itr);
71 void osrfTransportGroupDisconnectAll( osrfTransportGroup* grp ) {
74 osrfTransportGroupNode* node;
75 osrfHashIteratorReset(grp->itr);
77 while( (node = osrfHashIteratorNext(grp->itr)) ) {
78 osrfLogInfo("TransportGroup disconnecting from domain %s",
79 node->connection->session->server);
80 client_disconnect(node->connection);
84 osrfHashIteratorReset(grp->itr);
88 int osrfTransportGroupSendMatch( osrfTransportGroup* grp, transport_message* msg ) {
89 if(!(grp && msg)) return -1;
93 jid_get_domain( msg->recipient, domain, 255 );
95 osrfTransportGroupNode* node = osrfHashGet(grp->nodes, domain);
97 if( (client_send_message( node->connection, msg )) == 0 )
101 osrfLogWarning("Error sending message to domain %s", domain );
105 int osrfTransportGroupSend( osrfTransportGroup* grp, transport_message* msg ) {
107 if(!(grp && msg)) return -1;
110 char domain[bufsize];
111 bzero(domain, bufsize);
112 jid_get_domain( msg->recipient, domain, bufsize - 1 );
114 char msgrecip[bufsize];
115 bzero(msgrecip, bufsize);
116 jid_get_username(msg->recipient, msgrecip, bufsize - 1);
118 char msgres[bufsize];
119 bzero(msgres, bufsize);
120 jid_get_resource(msg->recipient, msgres, bufsize - 1);
122 char* firstdomain = NULL;
126 /* if we don't host this domain, don't update the recipient but send it as is */
127 if(!osrfHashGet(grp->nodes, domain)) updateRecip = 0;
129 osrfTransportGroupNode* node;
133 node = osrfHashIteratorNext(grp->itr);
134 if(!node) osrfHashIteratorReset(grp->itr);
136 node = osrfHashIteratorNext(grp->itr);
139 if(firstdomain == NULL) {
140 firstdomain = node->domain;
143 if(!strcmp(firstdomain, node->domain)) { /* we've made a full loop */
144 osrfLogWarning("We've tried to send to all domains.. giving up");
149 /* update the recipient domain if necessary */
153 sprintf(newrcp, "%s@%s/%s", msgrecip, node->domain, msgres);
154 free(msg->recipient);
155 msg->recipient = strdup(newrcp);
158 if( (client_send_message( node->connection, msg )) == 0 )
166 static int __osrfTGWait( fd_set* fdset, int maxfd, int timeout ) {
167 if(!(fdset && maxfd)) return 0;
175 if( (retval = select( maxfd + 1, fdset, NULL, NULL, NULL)) == -1 )
179 if( (retval = select( maxfd + 1, fdset, NULL, NULL, &tv)) == -1 )
187 transport_message* osrfTransportGroupRecvAll( osrfTransportGroup* grp, int timeout ) {
188 if(!grp) return NULL;
194 osrfTransportGroupNode* node;
195 osrfHashIterator* itr = osrfNewHashIterator(grp->nodes);
197 while( (node = osrfHashIteratorNext(itr)) ) {
199 int fd = node->connection->session->sock_id;
200 if( fd < maxfd ) maxfd = fd;
201 FD_SET( fd, &fdset );
204 osrfHashIteratorReset(itr);
206 if( __osrfTGWait( &fdset, maxfd, timeout ) ) {
207 while( (node = osrfHashIteratorNext(itr)) ) {
209 int fd = node->connection->session->sock_id;
210 if( FD_ISSET( fd, &fdset ) ) {
211 return client_recv( node->connection, 0 );
217 osrfHashIteratorFree(itr);
221 transport_message* osrfTransportGroupRecv( osrfTransportGroup* grp, char* domain, int timeout ) {
222 if(!(grp && domain)) return NULL;
224 osrfTransportGroupNode* node = osrfHashGet(grp->nodes, domain);
225 if(!node && node->connection && node->connection->session) return NULL;
226 int fd = node->connection->session->sock_id;
230 FD_SET( fd, &fdset );
232 int active = __osrfTGWait( &fdset, fd, timeout );
233 if(active) return client_recv( node->connection, 0 );
238 void osrfTransportGroupSetInactive( osrfTransportGroup* grp, char* domain ) {
239 if(!(grp && domain)) return;
240 osrfTransportGroupNode* node = osrfHashGet(grp->nodes, domain );
241 if(node) node->active = 0;