]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/libstack/osrf_transgroup.c
cleaned up the code some
[OpenSRF.git] / src / libstack / osrf_transgroup.c
1 #include "osrf_transgroup.h"
2 #include <sys/select.h>
3
4
5 osrfTransportGroupNode* osrfNewTransportGroupNode( 
6                 char* domain, int port, char* username, char* password, char* resource ) {
7
8         if(!(domain && port && username && password && resource)) return NULL;
9
10         osrfTransportGroupNode* node = safe_malloc(sizeof(osrfTransportGroupNode));
11         node->domain    = strdup(domain);
12         node->port              = port;
13         node->username = strdup(username);
14         node->password = strdup(password);
15         node->domain    = strdup(domain);
16         node->resource  = strdup(resource);
17         node->active    = 0;
18         node->lastsent  = 0;
19         node->connection = client_init( domain, port, NULL, 0 );
20
21         return node;
22 }
23
24
25 osrfTransportGroup* osrfNewTransportGroup( osrfTransportGroupNode* nodes[], int count ) {
26         if(!nodes || count < 1) return NULL;
27
28         osrfTransportGroup* grp = safe_malloc(sizeof(osrfTransportGroup));
29         grp->nodes                                      = osrfNewHash();
30         grp->itr                                                = osrfNewHashIterator(grp->nodes);
31
32         int i;
33         for( i = 0; i != count; i++ ) {
34                 if(!(nodes[i] && nodes[i]->domain) ) return NULL;
35                 osrfHashSet( grp->nodes, nodes[i], nodes[i]->domain );
36         }
37
38         return grp;
39 }
40
41
42 /* connect all of the nodes to their servers */
43 int osrfTransportGroupConnect( osrfTransportGroup* grp ) {
44         if(!grp) return -1;
45         int active = 0;
46
47         osrfTransportGroupNode* node;
48         osrfHashIteratorReset(grp->itr);
49
50         while( (node = osrfHashIteratorNext(grp->itr)) ) {
51                 if(client_connect( node->connection, node->username, 
52                                         node->password, node->resource, 10, AUTH_DIGEST )) {
53                         node->active = 1;
54                         active++;
55                 }
56         }
57
58         osrfHashIteratorReset(grp->itr);
59         return active;
60 }
61
62
63 int osrfTransportGroupSendMatch( osrfTransportGroup* grp, transport_message* msg ) {
64         if(!(grp && msg)) return -1;
65
66         char domain[256];
67         bzero(domain, 256);
68         jid_get_domain( msg->recipient, domain, 255 );
69
70         osrfTransportGroupNode* node = osrfHashGet(grp->nodes, domain);
71         if(node) {
72                 if( (client_send_message( node->connection, msg )) == 0 )
73                         return 0;
74         }
75
76         return warning_handler("Error sending message to domain %s", domain );
77 }
78
79 int osrfTransportGroupSend( osrfTransportGroup* grp, transport_message* msg ) {
80
81         if(!(grp && msg)) return -1;
82         int bufsize = 256;
83
84         char domain[bufsize];
85         bzero(domain, bufsize);
86         jid_get_domain( msg->recipient, domain, bufsize - 1 );
87
88         char msgrecip[bufsize];
89         bzero(msgrecip, bufsize);
90         jid_get_username(msg->recipient, msgrecip, bufsize - 1);
91
92         char msgres[bufsize];
93         bzero(msgres, bufsize);
94         jid_get_resource(msg->recipient, msgres, bufsize - 1);
95
96         char* firstdomain = NULL;
97         char newrcp[1024];
98
99         int updateRecip = 1;
100         /* if we don't host this domain, don't update the recipient but send it as is */
101         if(!osrfHashGet(grp->nodes, domain)) updateRecip = 0;
102
103         osrfTransportGroupNode* node;
104
105         do {
106
107                 node = osrfHashIteratorNext(grp->itr);
108                 if(!node) osrfHashIteratorReset(grp->itr);
109
110                 node = osrfHashIteratorNext(grp->itr);
111                 if(!node) return -1;
112
113                 if(firstdomain == NULL) {
114                         firstdomain = node->domain;
115
116                 } else {
117                         if(!strcmp(firstdomain, node->domain)) { /* we've made a full loop */
118                                 return warning_handler("We've tried to send to all domains.. giving up");
119                         }
120                 }
121
122                 /* update the recipient domain if necessary */
123
124                 if(updateRecip) {
125                         bzero(newrcp, 1024);
126                         sprintf(newrcp, "%s@%s/%s", msgrecip, node->domain, msgres);
127                         free(msg->recipient);
128                         msg->recipient = strdup(newrcp);
129                 }
130
131                 if( (client_send_message( node->connection, msg )) == 0 ) 
132                         return 0;
133
134         } while(1);
135
136         return -1;
137 }
138
/**
 * Waits with select() until a descriptor in fdset becomes readable.
 *
 * @param fdset   Descriptors to watch for reading; modified by select().
 * @param maxfd   Highest descriptor in fdset.  A value of 0 is treated as
 *                "nothing to wait on" and returns immediately.
 * @param timeout Seconds to wait; a negative value blocks with no
 *                timeout.
 * @return The value returned by select() (number of ready descriptors,
 *         0 on timeout), or 0 if fdset is NULL, maxfd is 0, or select()
 *         fails.
 */
static int __osrfTGWait( fd_set* fdset, int maxfd, int timeout ) {
	if( fdset == NULL || maxfd == 0 ) return 0;

	struct timeval limit;
	limit.tv_sec = timeout;
	limit.tv_usec = 0;

	/* a NULL timeval makes select() block until something is readable */
	struct timeval* limitp = ( timeout < 0 ) ? NULL : &limit;

	int ready = select( maxfd + 1, fdset, NULL, NULL, limitp );

	/* a select() failure is reported to callers as "nothing ready" */
	return ( ready == -1 ) ? 0 : ready;
}
158
159
160 transport_message* osrfTransportGroupRecvAll( osrfTransportGroup* grp, int timeout ) {
161         if(!grp) return NULL;
162
163         int maxfd = 0;
164         fd_set fdset;
165         FD_ZERO( &fdset );
166
167         osrfTransportGroupNode* node;
168         osrfHashIterator* itr = osrfNewHashIterator(grp->nodes);
169
170         while( (node = osrfHashIteratorNext(itr)) ) {
171                 if(node->active) {
172                         int fd = node->connection->session->sock_id;
173                         if( fd < maxfd ) maxfd = fd;
174                         FD_SET( fd, &fdset );
175                 }
176         }
177         osrfHashIteratorReset(itr);
178
179         if( __osrfTGWait( &fdset, maxfd, timeout ) ) {
180                 while( (node = osrfHashIteratorNext(itr)) ) {
181                         if(node->active) {
182                                 int fd = node->connection->session->sock_id;
183                                 if( FD_ISSET( fd, &fdset ) ) {
184                                         return client_recv( node->connection, 0 );
185                                 }
186                         }
187                 }
188         }
189
190         osrfHashIteratorFree(itr);
191         return NULL;
192 }
193
194 transport_message* osrfTransportGroupRecv( osrfTransportGroup* grp, char* domain, int timeout ) {
195         if(!(grp && domain)) return NULL;
196
197         osrfTransportGroupNode* node = osrfHashGet(grp->nodes, domain);
198         if(!node && node->connection && node->connection->session) return NULL;
199         int fd = node->connection->session->sock_id;
200
201         fd_set fdset;
202         FD_ZERO( &fdset );
203         FD_SET( fd, &fdset );
204
205         int active = __osrfTGWait( &fdset, fd, timeout );
206         if(active) return client_recv( node->connection, 0 );
207         
208         return NULL;
209 }
210
211 void osrfTransportGroupSetInactive( osrfTransportGroup* grp, char* domain ) {
212         if(!(grp && domain)) return;
213         osrfTransportGroupNode* node = osrfHashGet(grp->nodes, domain );
214         if(node) node->active = 0;
215 }
216
217