1 #include "osrf_router.h"
/* Member path to a connection's socket descriptor; used in this file as
 * router->ROUTER_SOCKFD and class->ROUTER_SOCKFD. */
3 #define ROUTER_SOCKFD connection->session->sock_id
/* router_command value that registers the sender as a node of router_class. */
4 #define ROUTER_REGISTER "register"
/* router_command value that removes the sender from router_class. */
5 #define ROUTER_UNREGISTER "unregister"
/* App-level method name the router answers itself with its list of class names. */
8 #define ROUTER_REQUEST_CLASS_LIST "opensrf.router.info.class.list"
/* Allocate and populate a new osrfRouter.
 *
 * domain, name, resource and password are copied with strdup();
 * trustedClients and trustedServers are stored as passed (not copied), and
 * osrfRouterFree() later calls osrfStringArrayFree() on both.
 *
 * Returns NULL when any string argument is NULL or port is 0.
 * NOTE(review): the embedded line numbers skip inside this function, so the
 * return statement and any assignment of router->port are not visible in
 * this listing (osrfRouterAddClass() reads router->port) -- confirm. */
10 osrfRouter* osrfNewRouter(
11 char* domain, char* name,
12 char* resource, char* password, int port,
13 osrfStringArray* trustedClients, osrfStringArray* trustedServers ) {
/* All four strings and a non-zero port are mandatory. */
15 if(!( domain && name && resource && password && port )) return NULL;
17 osrfRouter* router = safe_malloc(sizeof(osrfRouter));
18 router->domain = strdup(domain);
19 router->name = strdup(name);
20 router->password = strdup(password);
21 router->resource = strdup(resource);
24 router->trustedClients = trustedClients;
25 router->trustedServers = trustedServers;
/* Registered classes live in a hash whose per-item free callback is
 * osrfRouterClassFree. */
27 router->classes = osrfNewHash();
28 router->classes->freeItem = &osrfRouterClassFree;
/* The top-level connection is created here; it is connected later by
 * osrfRouterConnect(). */
30 router->connection = client_init( domain, port, NULL, 0 );
/* Connect the router's top-level connection using its name, password and
 * resource with AUTH_DIGEST.
 *
 * Returns -1 when router is NULL or client_connect() returns 0.
 * NOTE(review): the success return value is not visible in this listing;
 * the literal 10 is presumably a connect timeout -- confirm against
 * client_connect(). */
37 int osrfRouterConnect( osrfRouter* router ) {
38 if(!router) return -1;
39 int ret = client_connect( router->connection, router->name,
40 router->password, router->resource, 10, AUTH_DIGEST );
/* client_connect() reports failure as 0. */
41 if( ret == 0 ) return -1;
/* Main dispatch routine: wait for activity on the router socket and on every
 * class socket, then hand each active socket to its handler.
 *
 * Returns immediately when router or router->classes is NULL.
 * NOTE(review): the embedded line numbers skip throughout this function; the
 * declarations of set, selectret and numhandled, any enclosing loop, and the
 * place where numhandled is incremented are not visible in this listing. */
46 void osrfRouterRun( osrfRouter* router ) {
47 if(!(router && router->classes)) return;
49 int routerfd = router->ROUTER_SOCKFD;
/* Build the read set from the router socket plus all class sockets. */
55 int maxfd = __osrfRouterFillFDSet( router, &set );
/* The timeout argument is NULL, so select() waits without a time limit. */
58 if( (selectret = select(maxfd + 1, &set, NULL, NULL, NULL)) < 0 ) {
59 warning_handler("Top level select call failed with errno %d", errno);
63 /* see if there is a top level router message */
65 if( FD_ISSET(routerfd, &set) ) {
66 debug_handler("Top router socket is active: %d", routerfd );
68 osrfRouterHandleIncoming( router );
72 /* now check each of the connected classes and see if they have data to route */
73 while( numhandled < selectret ) {
75 osrfRouterClass* class;
76 osrfHashIterator* itr = osrfNewHashIterator(router->classes);
78 while( (class = osrfHashIteratorNext(itr)) ) {
/* itr->current is used as the class name (hash key) of the item just
 * returned. */
80 char* classname = itr->current;
/* The class is looked up again by name rather than using the iterator's
 * value directly. */
82 if( classname && (class = osrfRouterFindClass( router, classname )) ) {
84 debug_handler("Checking %s for activity...", classname );
86 int sockfd = class->ROUTER_SOCKFD;
87 if(FD_ISSET( sockfd, &set )) {
88 debug_handler("Socket is active: %d", sockfd );
90 osrfRouterClassHandleIncoming( router, classname, class );
95 osrfHashIteratorFree(itr);
/* Receive one message on the router's top-level connection and pass it to
 * osrfRouterHandleMessage() when the sender's domain is listed in
 * router->trustedServers.
 *
 * NOTE(review): the declaration of domain, the else keyword in front of the
 * warning, and any message_free(msg) are not visible in this listing. */
101 void osrfRouterHandleIncoming( osrfRouter* router ) {
104 transport_message* msg = NULL;
106 if( (msg = client_recv( router->connection, 0 )) ) {
110 /* if the sender is not a trusted server, drop the message */
/* len covers the whole sender string plus its terminator. */
111 int len = strlen(msg->sender) + 1;
114 jid_get_domain( msg->sender, domain );
116 if(osrfStringArrayContains( router->trustedServers, domain))
117 osrfRouterHandleMessage( router, msg );
/* The warning prints the full sender address, not just the extracted domain. */
119 warning_handler("Received message from un-trusted server domain %s", msg->sender);
/* Receive one message on a class connection and route it when the sender's
 * domain is listed in router->trustedClients.
 *
 * An error message (msg->is_error) is treated as a bounce: it is replaced by
 * the message returned from osrfRouterClassHandleBounce() before routing.
 *
 * Returns -1 when router or class is NULL, or when a bounce yields no
 * message to resend.
 * NOTE(review): the declaration of domain, the else keyword in front of the
 * warning, the final return and any message_free(msg) are not visible in
 * this listing. */
126 int osrfRouterClassHandleIncoming( osrfRouter* router, char* classname, osrfRouterClass* class ) {
127 if(!(router && class)) return -1;
129 transport_message* msg;
130 debug_handler("osrfRouterClassHandleIncoming()");
132 if( (msg = client_recv( class->connection, 0 )) ) {
136 /* if the client is not from a trusted domain, drop the message */
137 int len = strlen(msg->sender) + 1;
140 jid_get_domain( msg->sender, domain );
142 if(osrfStringArrayContains( router->trustedClients, domain)) {
144 transport_message* bouncedMessage = NULL;
145 if( msg->is_error ) {
147 /* handle bounced message */
/* NOTE(review): this early return is the direct body of the if, so msg is
 * not freed on this path -- confirm whether that leaks the received message. */
148 if( !(bouncedMessage = osrfRouterClassHandleBounce( router, classname, class, msg )) )
149 return -1; /* we have no one to send the requested message to */
/* From here on msg refers to the cloned message to be re-routed. */
152 msg = bouncedMessage;
154 osrfRouterClassHandleMessage( router, class, msg );
157 warning_handler("Received client message from untrusted client domain %s", domain );
/* Dispatch a message that arrived on the router's top-level connection.
 *
 * - NULL or empty router_command: handled as an app-level request.
 * - ROUTER_REGISTER: find (or create) the class named by router_class and
 *   add the sender to it as a node.
 * - ROUTER_UNREGISTER: remove the sender from the named class.
 *
 * Returns -1 when router or msg is NULL, or when a router command arrives
 * without router_class.
 * NOTE(review): the statement executed when the node already exists, and the
 * final return, are not visible in this listing. */
170 int osrfRouterHandleMessage( osrfRouter* router, transport_message* msg ) {
171 if(!(router && msg)) return -1;
173 if( !msg->router_command || !strcmp(msg->router_command,""))
174 return osrfRouterHandleAppRequest( router, msg ); /* assume it's an app session level request */
176 if(!msg->router_class) return -1;
178 osrfRouterClass* class = NULL;
179 if(!strcmp(msg->router_command, ROUTER_REGISTER)) {
180 class = osrfRouterFindClass( router, msg->router_class );
182 info_handler("Registering class %s", msg->router_class );
/* osrfRouterAddClass() can leave class NULL; both calls below check their
 * class argument for NULL. */
184 if(!class) class = osrfRouterAddClass( router, msg->router_class );
188 if( osrfRouterClassFindNode( class, msg->sender ) )
191 osrfRouterClassAddNode( class, msg->sender );
195 } else if( !strcmp( msg->router_command, ROUTER_UNREGISTER ) ) {
/* router_class is already known to be non-NULL here; this also rejects the
 * empty string. */
197 if( msg->router_class && strcmp( msg->router_class, "") ) {
198 info_handler("Unregistering router class %s", msg->router_class );
199 osrfRouterClassRemoveNode( router, msg->router_class, msg->sender );
/* Create a router class named classname, open a dedicated connection for it
 * and store it in router->classes under that name.
 *
 * Returns NULL when router, router->classes or classname is NULL.
 * NOTE(review): the return statements after the failed connect and at the
 * end of the function are not visible in this listing. */
208 osrfRouterClass* osrfRouterAddClass( osrfRouter* router, char* classname ) {
209 if(!(router && router->classes && classname)) return NULL;
211 osrfRouterClass* class = safe_malloc(sizeof(osrfRouterClass));
/* nodes holds the class's registered nodes; itr is a long-lived iterator over
 * them, advanced by osrfRouterClassHandleMessage(). */
212 class->nodes = osrfNewHash();
213 class->itr = osrfNewHashIterator(class->nodes);
214 class->nodes->freeItem = &osrfRouterNodeFree;
215 class->router = router;
217 class->connection = client_init( router->domain, router->port, NULL, 0 );
/* The class connects with the router's name and password, passing classname
 * in the argument position where osrfRouterConnect() passes router->resource. */
219 if(!client_connect( class->connection, router->name,
220 router->password, classname, 10, AUTH_DIGEST ) ) {
/* Connect failed: release the partially built class. */
221 osrfRouterClassFree( classname, class );
225 osrfHashSet( router->classes, class, classname );
/* Add a node identified by remoteId to rclass.
 *
 * The node starts with no lastMessage and owns a strdup() copy of remoteId;
 * it is stored in rclass->nodes under remoteId.
 *
 * Returns -1 when rclass, rclass->nodes or remoteId is NULL.
 * NOTE(review): the success return is not visible in this listing. */
230 int osrfRouterClassAddNode( osrfRouterClass* rclass, char* remoteId ) {
231 if(!(rclass && rclass->nodes && remoteId)) return -1;
233 info_handler("Adding router node for remote id %s", remoteId );
235 osrfRouterNode* node = safe_malloc(sizeof(osrfRouterNode));
237 node->lastMessage = NULL;
238 node->remoteId = strdup(remoteId);
240 osrfHashSet( rclass->nodes, node, remoteId );
244 /* copy off the lastMessage, remove the offending node, send error if it's the last node
245 ? return NULL if it's the last node ?
/* Handle an error message that came back from a node of rclass.
 *
 * The bouncing node is looked up by msg->sender. If it is the only node left
 * in the class and has a lastMessage, an error built from that message is
 * sent over the class connection. Otherwise the node's lastMessage is cloned
 * into lastSent so it can be routed again. The node is then removed from the
 * class.
 *
 * NOTE(review): the else keyword between the two lastMessage branches, the
 * closing braces and the return statement are not visible in this listing;
 * the result is presumably lastSent (NULL when nothing can be resent) --
 * confirm. */
248 transport_message* osrfRouterClassHandleBounce(
249 osrfRouter* router, char* classname, osrfRouterClass* rclass, transport_message* msg ) {
251 debug_handler("osrfRouterClassHandleBounce()");
253 warning_handler("Received network layer error message from %s", msg->sender );
254 osrfRouterNode* node = osrfRouterClassFindNode( rclass, msg->sender );
255 transport_message* lastSent = NULL;
257 if( node && osrfHashGetCount(rclass->nodes) == 1 ) { /* the last node is dead */
259 if( node->lastMessage ) {
260 warning_handler("We lost the last node in the class, responding with error and removing...");
/* The error reuses the body, subject and thread of the last message and takes
 * router_from and recipient as its last two message_init() arguments. */
262 transport_message* error = message_init(
263 node->lastMessage->body, node->lastMessage->subject,
264 node->lastMessage->thread, node->lastMessage->router_from, node->lastMessage->recipient );
265 set_msg_error( error, "cancel", 501 );
267 /* send the error message back to the original sender */
268 client_send_message( rclass->connection, error );
269 message_free( error );
/* NOTE(review): node is dereferenced here; whether this branch is guarded
 * against node == NULL depends on lines not visible in this listing. */
276 if( node->lastMessage ) {
277 debug_handler("Cloning lastMessage so next node can send it");
/* The clone is built with an empty fourth argument; the caller hands it to
 * osrfRouterClassHandleMessage(), which builds a new message for the next
 * node. */
278 lastSent = message_init( node->lastMessage->body,
279 node->lastMessage->subject, node->lastMessage->thread, "", node->lastMessage->router_from );
280 message_set_router_info( lastSent, node->lastMessage->router_from, NULL, NULL, NULL, 0 );
284 /* remove the dead node */
285 osrfRouterClassRemoveNode( router, classname, msg->sender);
291 If we get a regular message, we send it to the next node in the list of nodes.
292 If we get an error, it's a bounce back from a previous attempt. We take the
293 body and thread from the last message sent on the node that had the bounced message
294 and propagate them on to the new message being sent.
/* Forward msg to the next node of rclass.
 *
 * The node comes from the class's long-lived iterator. A new message is
 * built from msg's body, subject and thread, with node->remoteId and
 * msg->sender as the last two message_init() arguments, and is remembered as
 * the node's lastMessage before being sent on the class connection.
 *
 * Returns -1 when router, rclass or msg is NULL.
 * NOTE(review): the condition guarding the iterator reset, the statement run
 * when client_send_message() returns 0, and the return statements are not
 * visible in this listing. */
296 int osrfRouterClassHandleMessage(
297 osrfRouter* router, osrfRouterClass* rclass, transport_message* msg ) {
298 if(!(router && rclass && msg)) return -1;
300 debug_handler("osrfRouterClassHandleMessage()");
302 osrfRouterNode* node = osrfHashIteratorNext( rclass->itr );
/* Restart the iterator from the beginning and take the first node. */
304 osrfHashIteratorReset(rclass->itr);
305 node = osrfHashIteratorNext( rclass->itr );
/* NOTE(review): node is dereferenced below; if the class has no nodes the
 * iterator may still yield NULL -- confirm a guard exists in the elided lines. */
310 transport_message* new_msg= message_init( msg->body,
311 msg->subject, msg->thread, node->remoteId, msg->sender );
312 message_set_router_info( new_msg, msg->sender, NULL, NULL, NULL, 0 );
314 info_handler( "Routing message:\nfrom: [%s]\nto: [%s]",
315 new_msg->router_from, new_msg->recipient );
/* The node keeps new_msg as its lastMessage (replacing and freeing the previous
 * one) so a later bounce can resend it; new_msg is therefore not freed here. */
317 message_free( node->lastMessage );
318 node->lastMessage = new_msg;
320 if ( client_send_message( rclass->connection, new_msg ) == 0 )
/* Serialize the message so the warning can include its XML. */
324 message_prepare_xml(new_msg);
325 warning_handler("Error sending message from %s to %s\n%s",
326 new_msg->sender, new_msg->recipient, new_msg->msg_xml );
/* Remove the class named classname from router->classes.
 *
 * Returns -1 when router, router->classes or classname is NULL.
 * NOTE(review): the success return is not visible in this listing. The hash's
 * freeItem callback is osrfRouterClassFree, so removal presumably frees the
 * class -- confirm against osrfHashRemove(). */
335 int osrfRouterRemoveClass( osrfRouter* router, char* classname ) {
336 if(!(router && router->classes && classname)) return -1;
337 info_handler("Removing router class %s", classname );
338 osrfHashRemove( router->classes, classname );
/* Remove the node remoteId from the class named classname, and remove the
 * class itself once it has no nodes left.
 *
 * Returns 0 when router, router->classes, classname or remoteId is NULL
 * (unlike the other functions in this file, which return -1 for bad
 * arguments).
 * NOTE(review): any check for an unknown class (class == NULL) and the
 * remaining return statements are not visible in this listing. */
343 int osrfRouterClassRemoveNode(
344 osrfRouter* router, char* classname, char* remoteId ) {
346 if(!(router && router->classes && classname && remoteId)) return 0;
348 info_handler("Removing router node %s", remoteId );
350 osrfRouterClass* class = osrfRouterFindClass( router, classname );
354 osrfHashRemove( class->nodes, remoteId );
/* An empty class is dropped from the router. */
355 if( osrfHashGetCount(class->nodes) == 0 ) {
356 osrfRouterRemoveClass( router, classname );
/* Free callback for entries of router->classes; it is also called directly by
 * osrfRouterAddClass() when the class connection cannot be established.
 *
 * Disconnects and frees the class connection, then removes every node of the
 * class by name. Does nothing when classname or c is NULL.
 * NOTE(review): freeing of the iterator, the nodes hash and the class struct
 * itself is not visible in this listing. */
367 void osrfRouterClassFree( char* classname, void* c ) {
368 if(!(classname && c)) return;
369 osrfRouterClass* rclass = (osrfRouterClass*) c;
370 client_disconnect( rclass->connection );
371 client_free( rclass->connection );
373 osrfHashIteratorReset( rclass->itr );
374 osrfRouterNode* node;
/* NOTE(review): osrfRouterClassRemoveNode() looks the class up by name in
 * router->classes and calls osrfRouterRemoveClass() when the node count
 * reaches 0, while this loop is still iterating the same nodes hash -- confirm
 * this cannot re-enter the callback or invalidate the iterator. */
376 while( (node = osrfHashIteratorNext(rclass->itr)) )
377 osrfRouterClassRemoveNode( rclass->router, classname, node->remoteId );
/* Free callback for entries of a class's nodes hash: releases the node's
 * copy of its remote id and its last message.
 *
 * The remoteId parameter (the hash key) is not used in the visible lines.
 * NOTE(review): any NULL check on n and the free of the node struct itself
 * are not visible in this listing. */
383 void osrfRouterNodeFree( char* remoteId, void* n ) {
385 osrfRouterNode* node = (osrfRouterNode*) n;
386 free(node->remoteId);
387 message_free(node->lastMessage);
/* Release a router created by osrfNewRouter().
 *
 * Frees the copied strings, both trusted-domain arrays (which were passed in
 * by the caller of osrfNewRouter() and are stored uncopied) and the
 * top-level connection.
 * NOTE(review): the NULL check on router, the frees of router->name and
 * router->classes, and the free of the router struct itself are not visible
 * in this listing. */
392 void osrfRouterFree( osrfRouter* router ) {
395 free(router->domain);
397 free(router->resource);
398 free(router->password);
400 osrfStringArrayFree( router->trustedClients );
401 osrfStringArrayFree( router->trustedServers );
403 client_free( router->connection );
/* Look up a class by name in router->classes.
 *
 * Returns the osrfHashGet() result, or NULL when router, router->classes or
 * classname is NULL. */
409 osrfRouterClass* osrfRouterFindClass( osrfRouter* router, char* classname ) {
410 if(!( router && router->classes && classname )) return NULL;
411 return (osrfRouterClass*) osrfHashGet( router->classes, classname );
/* Look up a node by remote id in rclass->nodes.
 *
 * Returns the osrfHashGet() result, or NULL when rclass or remoteId is NULL.
 * rclass->nodes itself is not checked here. */
415 osrfRouterNode* osrfRouterClassFindNode( osrfRouterClass* rclass, char* remoteId ) {
416 if(!(rclass && remoteId)) return NULL;
417 return (osrfRouterNode*) osrfHashGet( rclass->nodes, remoteId );
/* Walk the router socket and every class socket to prepare the fd_set used
 * by osrfRouterRun(), tracking the highest descriptor seen.
 *
 * A class whose socket makes osrfUtilsCheckFileDescriptor() return non-zero
 * is removed from the router.
 *
 * Returns -1 when router, router->classes or set is NULL.
 * NOTE(review): the declaration of sockid, the FD_ZERO/FD_SET calls, the
 * else branch around the maxfd update and the final return (presumably
 * maxfd, given how osrfRouterRun() uses the result) are not visible in this
 * listing. */
421 int __osrfRouterFillFDSet( osrfRouter* router, fd_set* set ) {
422 if(!(router && router->classes && set)) return -1;
/* Start with the router's own socket as the highest descriptor. */
425 int maxfd = router->ROUTER_SOCKFD;
430 osrfRouterClass* class = NULL;
431 osrfHashIterator* itr = osrfNewHashIterator(router->classes);
433 while( (class = osrfHashIteratorNext(itr)) ) {
434 char* classname = itr->current;
436 if( classname && (class = osrfRouterFindClass( router, classname )) ) {
437 sockid = class->ROUTER_SOCKFD;
/* NOTE(review): this removes an entry from router->classes while itr is
 * iterating that same hash -- confirm the iterator tolerates removal. */
439 if( osrfUtilsCheckFileDescriptor( sockid ) ) {
440 osrfRouterRemoveClass( router, classname );
443 if( sockid > maxfd ) maxfd = sockid;
449 osrfHashIteratorFree(itr);
/* Handle a message addressed to the router itself as an application-level
 * request: deserialize msg->body into osrfMessage objects and dispatch each
 * one by its m_type.
 *
 * NOTE(review): the declarations of arr, T and i, the case labels of the
 * switch and the return statement are not visible in this listing. Given the
 * function names, the two visible calls presumably handle CONNECT and
 * REQUEST messages respectively -- confirm. */
455 int osrfRouterHandleAppRequest( osrfRouter* router, transport_message* msg ) {
461 int num_msgs = osrf_message_deserialize( msg->body, arr, T );
462 osrfMessage* omsg = NULL;
465 for( i = 0; i != num_msgs; i++ ) {
/* Skip empty slots in the deserialized array. */
467 if( !(omsg = arr[i]) ) continue;
469 switch( omsg->m_type ) {
472 osrfRouterRespondConnect( router, msg, omsg );
476 osrfRouterProcessAppRequest( router, msg, omsg );
/* Each deserialized message is freed here. */
482 osrfMessageFree( omsg );
/* Answer a connect request with a STATUS message carrying OSRF_STATUS_OK.
 *
 * The reply reuses the request's thread_trace and protocol, is serialized,
 * wrapped in a transport message on the same thread with msg->sender as the
 * fourth message_init() argument, and sent on the router's top-level
 * connection.
 *
 * Returns -1 when router, msg or omsg is NULL.
 * NOTE(review): the free of data and the success return are not visible in
 * this listing. */
488 int osrfRouterRespondConnect( osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
489 if(!(router && msg && omsg)) return -1;
491 osrfMessage* success = osrf_message_init( STATUS, omsg->thread_trace, omsg->protocol );
493 debug_handler("router recevied a CONNECT message from %s", msg->sender );
495 osrf_message_set_status_info(
496 success, "osrfConnectStatus", "Connection Successful", OSRF_STATUS_OK );
498 char* data = osrf_message_serialize(success);
500 transport_message* return_m = message_init(
501 data, "", msg->thread, msg->sender, "" );
503 client_send_message(router->connection, return_m);
506 osrf_message_free(success);
507 message_free(return_m);
/* Execute a router-level method requested by an application message.
 *
 * The one method handled in the visible lines is ROUTER_REQUEST_CLASS_LIST,
 * which answers with a JSON array of the keys of router->classes. Unknown
 * methods are answered through osrfRouterHandleMethodNFound().
 *
 * Returns -1 when router, msg, omsg or omsg->method_name is NULL.
 * NOTE(review): the declaration of i, the else branch structure and the
 * final return are not visible in this listing. */
514 int osrfRouterProcessAppRequest( osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
516 if(!(router && msg && omsg && omsg->method_name)) return -1;
518 info_handler("Router received app request: %s", omsg->method_name );
520 jsonObject* jresponse = NULL;
521 if(!strcmp( omsg->method_name, ROUTER_REQUEST_CLASS_LIST )) {
/* Start from an empty JSON array and push one entry per class name. */
524 jresponse = jsonParseString("[]");
526 osrfStringArray* keys = osrfHashKeys( router->classes );
527 for( i = 0; i != keys->size; i++ )
528 jsonObjectPush( jresponse, jsonNewObject(osrfStringArrayGetString( keys, i )) );
529 osrfStringArrayFree(keys);
534 return osrfRouterHandleMethodNFound( router, msg, omsg );
/* Send the result followed by the request-complete status, then release the
 * JSON response. */
538 osrfRouterHandleAppResponse( router, msg, omsg, jresponse );
539 jsonObjectFree(jresponse);
/* Reply to an unknown router method with a STATUS message carrying
 * OSRF_STATUS_NOTFOUND.
 *
 * The reply reuses the request's thread_trace, is serialized, wrapped in a
 * transport message on the same thread with msg->sender and msg->recipient
 * as the last two message_init() arguments, and sent on the router's
 * top-level connection.
 *
 * No argument is checked for NULL in the visible lines.
 * NOTE(review): the free of data and the return statement are not visible in
 * this listing. */
547 int osrfRouterHandleMethodNFound(
548 osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
/* The third argument is the literal 1 here, whereas osrfRouterRespondConnect()
 * passes omsg->protocol in that position. */
550 osrf_message* err = osrf_message_init( STATUS, omsg->thread_trace, 1);
551 osrf_message_set_status_info( err,
552 "osrfMethodException", "Router method not found", OSRF_STATUS_NOTFOUND );
554 char* data = osrf_message_serialize(err);
556 transport_message* tresponse = message_init(
557 data, "", msg->thread, msg->sender, msg->recipient );
559 client_send_message(router->connection, tresponse );
562 osrf_message_free( err );
563 message_free(tresponse);
/* Send the outcome of a router-level method back to the requester.
 *
 * When response is non-NULL it is serialized to JSON and sent as a RESULT
 * message. A STATUS message carrying OSRF_STATUS_COMPLETE is then sent on
 * the same thread. Both go out on the router's top-level connection.
 *
 * NOTE(review): the closing brace of the if, the frees of json, data and
 * statusdata, and the return statement are not visible in this listing. */
569 int osrfRouterHandleAppResponse( osrfRouter* router,
570 transport_message* msg, osrfMessage* omsg, jsonObject* response ) {
572 if( response ) { /* send the response message */
574 osrfMessage* oresponse = osrf_message_init(
575 RESULT, omsg->thread_trace, omsg->protocol );
577 char* json = jsonObjectToJSON(response);
578 osrf_message_set_result_content( oresponse, json);
580 char* data = osrf_message_serialize(oresponse);
581 debug_handler( "Responding to client app request with data: \n%s\n", data );
583 transport_message* tresponse = message_init(
584 data, "", msg->thread, msg->sender, msg->recipient );
586 client_send_message(router->connection, tresponse );
588 osrfMessageFree(oresponse);
589 message_free(tresponse);
595 /* now send the 'request complete' message */
/* The third argument is the literal 1 here, whereas the RESULT message above
 * passes omsg->protocol in that position. */
596 osrf_message* status = osrf_message_init( STATUS, omsg->thread_trace, 1);
597 osrf_message_set_status_info( status, "osrfConnectStatus", "Request Complete", OSRF_STATUS_COMPLETE );
599 char* statusdata = osrf_message_serialize(status);
601 transport_message* sresponse = message_init(
602 statusdata, "", msg->thread, msg->sender, msg->recipient );
603 client_send_message(router->connection, sresponse );
607 osrfMessageFree(status);
608 message_free(sresponse);