72b203f0375cc9e3c7d83c2521bb3367d33446b5
[OpenSRF.git] / src / libopensrf / osrf_transgroup.c
1 #include <opensrf/osrf_transgroup.h>
2 #include <sys/select.h>
3
4
5 osrfTransportGroupNode* osrfNewTransportGroupNode( 
6                 char* domain, int port, char* username, char* password, char* resource ) {
7
8         if(!(domain && port && username && password && resource)) return NULL;
9
10         osrfTransportGroupNode* node = safe_malloc(sizeof(osrfTransportGroupNode));
11         node->domain    = strdup(domain);
12         node->port              = port;
13         node->username = strdup(username);
14         node->password = strdup(password);
15         node->domain    = strdup(domain);
16         node->resource  = strdup(resource);
17         node->active    = 0;
18         node->lastsent  = 0;
19         node->connection = client_init( domain, port, NULL, 0 );
20
21         return node;
22 }
23
24
25 osrfTransportGroup* osrfNewTransportGroup( osrfTransportGroupNode* nodes[], int count ) {
26         if(!nodes || count < 1) return NULL;
27
28         osrfTransportGroup* grp = safe_malloc(sizeof(osrfTransportGroup));
29         grp->nodes                                      = osrfNewHash();
30         grp->itr                                                = osrfNewHashIterator(grp->nodes);
31
32         int i;
33         for( i = 0; i != count; i++ ) {
34                 if(!(nodes[i] && nodes[i]->domain) ) return NULL;
35                 osrfHashSet( grp->nodes, nodes[i], nodes[i]->domain );
36                 osrfLogDebug( OSRF_LOG_MARK, "Adding domain %s to TransportGroup", nodes[i]->domain);
37         }
38
39         return grp;
40 }
41
42
43 /* connect all of the nodes to their servers */
44 int osrfTransportGroupConnectAll( osrfTransportGroup* grp ) {
45         if(!grp) return -1;
46         int active = 0;
47
48         osrfTransportGroupNode* node;
49         osrfHashIteratorReset(grp->itr);
50
51         while( (node = osrfHashIteratorNext(grp->itr)) ) {
52                 osrfLogInfo( OSRF_LOG_MARK, "TransportGroup attempting to connect to domain %s", 
53                                                          node->connection->session->server);
54
55                 if(client_connect( node->connection, node->username, 
56                                         node->password, node->resource, 10, AUTH_DIGEST )) {
57                         node->active = 1;
58                         active++;
59                         osrfLogInfo( OSRF_LOG_MARK, "TransportGroup successfully connected to domain %s", 
60                                                          node->connection->session->server);
61                 } else {
62                         osrfLogWarning( OSRF_LOG_MARK, "TransportGroup unable to connect to domain %s", 
63                                                          node->connection->session->server);
64                 }
65         }
66
67         osrfHashIteratorReset(grp->itr);
68         return active;
69 }
70
71 void osrfTransportGroupDisconnectAll( osrfTransportGroup* grp ) {
72         if(!grp) return;
73
74         osrfTransportGroupNode* node;
75         osrfHashIteratorReset(grp->itr);
76
77         while( (node = osrfHashIteratorNext(grp->itr)) ) {
78                 osrfLogInfo( OSRF_LOG_MARK, "TransportGroup disconnecting from domain %s", 
79                                                          node->connection->session->server);
80                 client_disconnect(node->connection);
81                 node->active = 0;
82         }
83
84         osrfHashIteratorReset(grp->itr);
85 }
86
87
88 int osrfTransportGroupSendMatch( osrfTransportGroup* grp, transport_message* msg ) {
89         if(!(grp && msg)) return -1;
90
91         char domain[256];
92         bzero(domain, 256);
93         jid_get_domain( msg->recipient, domain, 255 );
94
95         osrfTransportGroupNode* node = osrfHashGet(grp->nodes, domain);
96         if(node) {
97                 if( (client_send_message( node->connection, msg )) == 0 )
98                         return 0;
99         }
100
101         osrfLogWarning( OSRF_LOG_MARK, "Error sending message to domain %s", domain );
102         return -1;
103 }
104
105 int osrfTransportGroupSend( osrfTransportGroup* grp, transport_message* msg ) {
106
107         if(!(grp && msg)) return -1;
108         int bufsize = 256;
109
110         char domain[bufsize];
111         bzero(domain, bufsize);
112         jid_get_domain( msg->recipient, domain, bufsize - 1 );
113
114         char msgrecip[bufsize];
115         bzero(msgrecip, bufsize);
116         jid_get_username(msg->recipient, msgrecip, bufsize - 1);
117
118         char msgres[bufsize];
119         bzero(msgres, bufsize);
120         jid_get_resource(msg->recipient, msgres, bufsize - 1);
121
122         char* firstdomain = NULL;
123         char newrcp[1024];
124
125         int updateRecip = 1;
126         /* if we don't host this domain, don't update the recipient but send it as is */
127         if(!osrfHashGet(grp->nodes, domain)) updateRecip = 0;
128
129         osrfTransportGroupNode* node;
130
131         do {
132
133                 node = osrfHashIteratorNext(grp->itr);
134                 if(!node) osrfHashIteratorReset(grp->itr);
135
136                 node = osrfHashIteratorNext(grp->itr);
137                 if(!node) return -1;
138
139                 if(firstdomain == NULL) {
140                         firstdomain = node->domain;
141
142                 } else {
143                         if(!strcmp(firstdomain, node->domain)) { /* we've made a full loop */
144                                 osrfLogWarning( OSRF_LOG_MARK, "We've tried to send to all domains.. giving up");
145                                 return -1;
146                         }
147                 }
148
149                 /* update the recipient domain if necessary */
150
151                 if(updateRecip) {
152                         bzero(newrcp, 1024);
153                         sprintf(newrcp, "%s@%s/%s", msgrecip, node->domain, msgres);
154                         free(msg->recipient);
155                         msg->recipient = strdup(newrcp);
156                 }
157
158                 if( (client_send_message( node->connection, msg )) == 0 ) 
159                         return 0;
160
161         } while(1);
162
163         return -1;
164 }
165
166 static int __osrfTGWait( fd_set* fdset, int maxfd, int timeout ) {
167         if(!(fdset && maxfd)) return 0;
168
169         struct timeval tv;
170         tv.tv_sec = timeout;
171         tv.tv_usec = 0;
172         int retval = 0;
173
174         if( timeout < 0 ) {
175                 if( (retval = select( maxfd + 1, fdset, NULL, NULL, NULL)) == -1 ) 
176                         return 0;
177
178         } else {
179                 if( (retval = select( maxfd + 1, fdset, NULL, NULL, &tv)) == -1 ) 
180                         return 0;
181         }
182
183         return retval;
184 }
185
186
187 transport_message* osrfTransportGroupRecvAll( osrfTransportGroup* grp, int timeout ) {
188         if(!grp) return NULL;
189
190         int maxfd = 0;
191         fd_set fdset;
192         FD_ZERO( &fdset );
193
194         osrfTransportGroupNode* node;
195         osrfHashIterator* itr = osrfNewHashIterator(grp->nodes);
196
197         while( (node = osrfHashIteratorNext(itr)) ) {
198                 if(node->active) {
199                         int fd = node->connection->session->sock_id;
200                         if( fd < maxfd ) maxfd = fd;
201                         FD_SET( fd, &fdset );
202                 }
203         }
204         osrfHashIteratorReset(itr);
205
206         if( __osrfTGWait( &fdset, maxfd, timeout ) ) {
207                 while( (node = osrfHashIteratorNext(itr)) ) {
208                         if(node->active) {
209                                 int fd = node->connection->session->sock_id;
210                                 if( FD_ISSET( fd, &fdset ) ) {
211                                         return client_recv( node->connection, 0 );
212                                 }
213                         }
214                 }
215         }
216
217         osrfHashIteratorFree(itr);
218         return NULL;
219 }
220
221 transport_message* osrfTransportGroupRecv( osrfTransportGroup* grp, char* domain, int timeout ) {
222         if(!(grp && domain)) return NULL;
223
224         osrfTransportGroupNode* node = osrfHashGet(grp->nodes, domain);
225         if(!node && node->connection && node->connection->session) return NULL;
226         int fd = node->connection->session->sock_id;
227
228         fd_set fdset;
229         FD_ZERO( &fdset );
230         FD_SET( fd, &fdset );
231
232         int active = __osrfTGWait( &fdset, fd, timeout );
233         if(active) return client_recv( node->connection, 0 );
234         
235         return NULL;
236 }
237
238 void osrfTransportGroupSetInactive( osrfTransportGroup* grp, char* domain ) {
239         if(!(grp && domain)) return;
240         osrfTransportGroupNode* node = osrfHashGet(grp->nodes, domain );
241         if(node) node->active = 0;
242 }
243
244