Broad patch from Dan Scott to move towards better memory management:
[OpenSRF.git] / src / libopensrf / osrf_transgroup.c
1 #include <opensrf/osrf_transgroup.h>
2 #include <sys/select.h>
3
4
5 osrfTransportGroupNode* osrfNewTransportGroupNode( 
6                 char* domain, int port, char* username, char* password, char* resource ) {
7
8         if(!(domain && port && username && password && resource)) return NULL;
9
10         osrfTransportGroupNode* node = safe_malloc(sizeof(osrfTransportGroupNode));
11         node->domain    = strdup(domain);
12         node->port              = port;
13         node->username = strdup(username);
14         node->password = strdup(password);
15         node->domain    = strdup(domain);
16         node->resource  = strdup(resource);
17         node->active    = 0;
18         node->lastsent  = 0;
19         node->connection = client_init( domain, port, NULL, 0 );
20
21         return node;
22 }
23
24
25 osrfTransportGroup* osrfNewTransportGroup( osrfTransportGroupNode* nodes[], int count ) {
26         if(!nodes || count < 1) return NULL;
27
28         osrfTransportGroup* grp = safe_malloc(sizeof(osrfTransportGroup));
29         grp->nodes                                      = osrfNewHash();
30         grp->itr                                                = osrfNewHashIterator(grp->nodes);
31
32         int i;
33         for( i = 0; i != count; i++ ) {
34                 if(!(nodes[i] && nodes[i]->domain) ) return NULL;
35                 osrfHashSet( grp->nodes, nodes[i], nodes[i]->domain );
36                 osrfLogDebug( OSRF_LOG_MARK, "Adding domain %s to TransportGroup", nodes[i]->domain);
37         }
38
39         return grp;
40 }
41
42
43 /* connect all of the nodes to their servers */
44 int osrfTransportGroupConnectAll( osrfTransportGroup* grp ) {
45         if(!grp) return -1;
46         int active = 0;
47
48         osrfTransportGroupNode* node;
49         osrfHashIteratorReset(grp->itr);
50
51         while( (node = osrfHashIteratorNext(grp->itr)) ) {
52                 osrfLogInfo( OSRF_LOG_MARK, "TransportGroup attempting to connect to domain %s", 
53                                                          node->connection->session->server);
54
55                 if(client_connect( node->connection, node->username, 
56                                         node->password, node->resource, 10, AUTH_DIGEST )) {
57                         node->active = 1;
58                         active++;
59                         osrfLogInfo( OSRF_LOG_MARK, "TransportGroup successfully connected to domain %s", 
60                                                          node->connection->session->server);
61                 } else {
62                         osrfLogWarning( OSRF_LOG_MARK, "TransportGroup unable to connect to domain %s", 
63                                                          node->connection->session->server);
64                 }
65         }
66
67         osrfHashIteratorReset(grp->itr);
68         return active;
69 }
70
71 void osrfTransportGroupDisconnectAll( osrfTransportGroup* grp ) {
72         if(!grp) return;
73
74         osrfTransportGroupNode* node;
75         osrfHashIteratorReset(grp->itr);
76
77         while( (node = osrfHashIteratorNext(grp->itr)) ) {
78                 osrfLogInfo( OSRF_LOG_MARK, "TransportGroup disconnecting from domain %s", 
79                                                          node->connection->session->server);
80                 client_disconnect(node->connection);
81                 node->active = 0;
82         }
83
84         osrfHashIteratorReset(grp->itr);
85 }
86
87
88 int osrfTransportGroupSendMatch( osrfTransportGroup* grp, transport_message* msg ) {
89         if(!(grp && msg)) return -1;
90
91         char domain[256];
92         memset(domain, 0, sizeof(domain));
93         jid_get_domain( msg->recipient, domain, 255 );
94
95         osrfTransportGroupNode* node = osrfHashGet(grp->nodes, domain);
96         if(node) {
97                 if( (client_send_message( node->connection, msg )) == 0 )
98                         return 0;
99         }
100
101         osrfLogWarning( OSRF_LOG_MARK, "Error sending message to domain %s", domain );
102         return -1;
103 }
104
105 int osrfTransportGroupSend( osrfTransportGroup* grp, transport_message* msg ) {
106
107         if(!(grp && msg)) return -1;
108         int bufsize = 256;
109
110         char domain[bufsize];
111         memset(domain, 0, sizeof(domain));
112         jid_get_domain( msg->recipient, domain, bufsize - 1 );
113
114         char msgrecip[bufsize];
115         memset(msgrecip, 0, sizeof(msgrecip));
116         jid_get_username(msg->recipient, msgrecip, bufsize - 1);
117
118         char msgres[bufsize];
119         memset(msgres, 0, sizeof(msgres));
120         jid_get_resource(msg->recipient, msgres, bufsize - 1);
121
122         char* firstdomain = NULL;
123         char newrcp[1024];
124
125         int updateRecip = 1;
126         /* if we don't host this domain, don't update the recipient but send it as is */
127         if(!osrfHashGet(grp->nodes, domain)) updateRecip = 0;
128
129         osrfTransportGroupNode* node;
130
131         do {
132
133                 node = osrfHashIteratorNext(grp->itr);
134                 if(!node) osrfHashIteratorReset(grp->itr);
135
136                 node = osrfHashIteratorNext(grp->itr);
137                 if(!node) return -1;
138
139                 if(firstdomain == NULL) {
140                         firstdomain = node->domain;
141
142                 } else {
143                         if(!strcmp(firstdomain, node->domain)) { /* we've made a full loop */
144                                 osrfLogWarning( OSRF_LOG_MARK, "We've tried to send to all domains.. giving up");
145                                 return -1;
146                         }
147                 }
148
149                 /* update the recipient domain if necessary */
150
151                 if(updateRecip) {
152                         snprintf(newrcp, sizeof(newrcp), "%s@%s/%s", msgrecip, node->domain, msgres);
153                         free(msg->recipient);
154                         msg->recipient = strdup(newrcp);
155                 }
156
157                 if( (client_send_message( node->connection, msg )) == 0 ) 
158                         return 0;
159
160         } while(1);
161
162         return -1;
163 }
164
/**
 * Waits with select() for any descriptor in fdset to become readable.
 *
 * @param fdset   Descriptor set to watch; select() modifies it in place.
 * @param maxfd   Highest descriptor number contained in fdset.
 * @param timeout Seconds to wait; a negative value waits with no time limit.
 * @return The value returned by select(), except that a select() error (-1)
 *         is reported as 0.  Also 0 when fdset is NULL or maxfd is 0.
 */
static int __osrfTGWait( fd_set* fdset, int maxfd, int timeout ) {
	if( fdset == NULL || maxfd == 0 ) return 0;

	struct timeval limit;
	limit.tv_sec  = timeout;
	limit.tv_usec = 0;

	/* A NULL timeval tells select() to wait without a time limit. */
	struct timeval* limitp = ( timeout < 0 ) ? NULL : &limit;

	int ready = select( maxfd + 1, fdset, NULL, NULL, limitp );

	return ( ready == -1 ) ? 0 : ready;
}
184
185
186 transport_message* osrfTransportGroupRecvAll( osrfTransportGroup* grp, int timeout ) {
187         if(!grp) return NULL;
188
189         int maxfd = 0;
190         fd_set fdset;
191         FD_ZERO( &fdset );
192
193         osrfTransportGroupNode* node;
194         osrfHashIterator* itr = osrfNewHashIterator(grp->nodes);
195
196         while( (node = osrfHashIteratorNext(itr)) ) {
197                 if(node->active) {
198                         int fd = node->connection->session->sock_id;
199                         if( fd < maxfd ) maxfd = fd;
200                         FD_SET( fd, &fdset );
201                 }
202         }
203         osrfHashIteratorReset(itr);
204
205         if( __osrfTGWait( &fdset, maxfd, timeout ) ) {
206                 while( (node = osrfHashIteratorNext(itr)) ) {
207                         if(node->active) {
208                                 int fd = node->connection->session->sock_id;
209                                 if( FD_ISSET( fd, &fdset ) ) {
210                                         return client_recv( node->connection, 0 );
211                                 }
212                         }
213                 }
214         }
215
216         osrfHashIteratorFree(itr);
217         return NULL;
218 }
219
220 transport_message* osrfTransportGroupRecv( osrfTransportGroup* grp, char* domain, int timeout ) {
221         if(!(grp && domain)) return NULL;
222
223         osrfTransportGroupNode* node = osrfHashGet(grp->nodes, domain);
224         if(!node && node->connection && node->connection->session) return NULL;
225         int fd = node->connection->session->sock_id;
226
227         fd_set fdset;
228         FD_ZERO( &fdset );
229         FD_SET( fd, &fdset );
230
231         int active = __osrfTGWait( &fdset, fd, timeout );
232         if(active) return client_recv( node->connection, 0 );
233         
234         return NULL;
235 }
236
237 void osrfTransportGroupSetInactive( osrfTransportGroup* grp, char* domain ) {
238         if(!(grp && domain)) return;
239         osrfTransportGroupNode* node = osrfHashGet(grp->nodes, domain );
240         if(node) node->active = 0;
241 }
242
243