1 #include <opensrf/osrf_transgroup.h>
2 #include <sys/select.h>
5 osrfTransportGroupNode* osrfNewTransportGroupNode(
6 char* domain, int port, char* username, char* password, char* resource ) {
8 if(!(domain && port && username && password && resource)) return NULL;
10 osrfTransportGroupNode* node = safe_malloc(sizeof(osrfTransportGroupNode));
11 node->domain = strdup(domain);
13 node->username = strdup(username);
14 node->password = strdup(password);
15 node->domain = strdup(domain);
16 node->resource = strdup(resource);
19 node->connection = client_init( domain, port, NULL, 0 );
25 osrfTransportGroup* osrfNewTransportGroup( osrfTransportGroupNode* nodes[], int count ) {
26 if(!nodes || count < 1) return NULL;
28 osrfTransportGroup* grp = safe_malloc(sizeof(osrfTransportGroup));
29 grp->nodes = osrfNewHash();
30 grp->itr = osrfNewHashIterator(grp->nodes);
33 for( i = 0; i != count; i++ ) {
34 if(!(nodes[i] && nodes[i]->domain) ) return NULL;
35 osrfHashSet( grp->nodes, nodes[i], nodes[i]->domain );
36 osrfLogDebug( OSRF_LOG_MARK, "Adding domain %s to TransportGroup", nodes[i]->domain);
43 /* connect all of the nodes to their servers */
44 int osrfTransportGroupConnectAll( osrfTransportGroup* grp ) {
48 osrfTransportGroupNode* node;
49 osrfHashIteratorReset(grp->itr);
51 while( (node = osrfHashIteratorNext(grp->itr)) ) {
52 osrfLogInfo( OSRF_LOG_MARK, "TransportGroup attempting to connect to domain %s",
53 node->connection->session->server);
55 if(client_connect( node->connection, node->username,
56 node->password, node->resource, 10, AUTH_DIGEST )) {
59 osrfLogInfo( OSRF_LOG_MARK, "TransportGroup successfully connected to domain %s",
60 node->connection->session->server);
62 osrfLogWarning( OSRF_LOG_MARK, "TransportGroup unable to connect to domain %s",
63 node->connection->session->server);
67 osrfHashIteratorReset(grp->itr);
71 void osrfTransportGroupDisconnectAll( osrfTransportGroup* grp ) {
74 osrfTransportGroupNode* node;
75 osrfHashIteratorReset(grp->itr);
77 while( (node = osrfHashIteratorNext(grp->itr)) ) {
78 osrfLogInfo( OSRF_LOG_MARK, "TransportGroup disconnecting from domain %s",
79 node->connection->session->server);
80 client_disconnect(node->connection);
84 osrfHashIteratorReset(grp->itr);
88 int osrfTransportGroupSendMatch( osrfTransportGroup* grp, transport_message* msg ) {
89 if(!(grp && msg)) return -1;
93 jid_get_domain( msg->recipient, domain, 255 );
95 osrfTransportGroupNode* node = osrfHashGet(grp->nodes, domain);
97 if( (client_send_message( node->connection, msg )) == 0 )
101 osrfLogWarning( OSRF_LOG_MARK, "Error sending message to domain %s", domain );
105 int osrfTransportGroupSend( osrfTransportGroup* grp, transport_message* msg ) {
107 if(!(grp && msg)) return -1;
110 char domain[bufsize];
112 jid_get_domain( msg->recipient, domain, bufsize - 1 );
114 char msgrecip[bufsize];
116 jid_get_username(msg->recipient, msgrecip, bufsize - 1);
118 char msgres[bufsize];
120 jid_get_resource(msg->recipient, msgres, bufsize - 1);
122 char* firstdomain = NULL;
126 /* if we don't host this domain, don't update the recipient but send it as is */
127 if(!osrfHashGet(grp->nodes, domain)) updateRecip = 0;
129 osrfTransportGroupNode* node;
133 node = osrfHashIteratorNext(grp->itr);
134 if(!node) osrfHashIteratorReset(grp->itr);
136 node = osrfHashIteratorNext(grp->itr);
139 if(firstdomain == NULL) {
140 firstdomain = node->domain;
143 if(!strcmp(firstdomain, node->domain)) { /* we've made a full loop */
144 osrfLogWarning( OSRF_LOG_MARK, "We've tried to send to all domains.. giving up");
149 /* update the recipient domain if necessary */
152 snprintf(newrcp, sizeof(newrcp), "%s@%s/%s", msgrecip, node->domain, msgres);
153 free(msg->recipient);
154 msg->recipient = strdup(newrcp);
157 if( (client_send_message( node->connection, msg )) == 0 )
165 static int __osrfTGWait( fd_set* fdset, int maxfd, int timeout ) {
166 if(!(fdset && maxfd)) return 0;
174 if( (retval = select( maxfd + 1, fdset, NULL, NULL, NULL)) == -1 )
178 if( (retval = select( maxfd + 1, fdset, NULL, NULL, &tv)) == -1 )
186 transport_message* osrfTransportGroupRecvAll( osrfTransportGroup* grp, int timeout ) {
187 if(!grp) return NULL;
193 osrfTransportGroupNode* node;
194 osrfHashIterator* itr = osrfNewHashIterator(grp->nodes);
196 while( (node = osrfHashIteratorNext(itr)) ) {
198 int fd = node->connection->session->sock_id;
199 if( fd < maxfd ) maxfd = fd;
200 FD_SET( fd, &fdset );
203 osrfHashIteratorReset(itr);
205 if( __osrfTGWait( &fdset, maxfd, timeout ) ) {
206 while( (node = osrfHashIteratorNext(itr)) ) {
208 int fd = node->connection->session->sock_id;
209 if( FD_ISSET( fd, &fdset ) ) {
210 return client_recv( node->connection, 0 );
216 osrfHashIteratorFree(itr);
220 transport_message* osrfTransportGroupRecv( osrfTransportGroup* grp, char* domain, int timeout ) {
221 if(!(grp && domain)) return NULL;
223 osrfTransportGroupNode* node = osrfHashGet(grp->nodes, domain);
224 if(!node && node->connection && node->connection->session) return NULL;
225 int fd = node->connection->session->sock_id;
229 FD_SET( fd, &fdset );
231 int active = __osrfTGWait( &fdset, fd, timeout );
232 if(active) return client_recv( node->connection, 0 );
237 void osrfTransportGroupSetInactive( osrfTransportGroup* grp, char* domain ) {
238 if(!(grp && domain)) return;
239 osrfTransportGroupNode* node = osrfHashGet(grp->nodes, domain );
240 if(node) node->active = 0;