1 #include "osrf_router.h"
3 #define ROUTER_SOCKFD connection->session->sock_id
4 #define ROUTER_REGISTER "register"
5 #define ROUTER_UNREGISTER "unregister"
8 #define ROUTER_REQUEST_CLASS_LIST "opensrf.router.info.class.list"
9 #define ROUTER_REQUEST_STATS_NODE_FULL "opensrf.router.info.stats.class.node.all"
10 #define ROUTER_REQUEST_STATS_CLASS_FULL "opensrf.router.info.stats.class.all"
11 #define ROUTER_REQUEST_STATS_CLASS "opensrf.router.info.stats.class"
12 #define ROUTER_REQUEST_STATS_CLASS_SUMMARY "opensrf.router.info.stats.class.summary"
14 osrfRouter* osrfNewRouter(
15 const char* domain, const char* name,
16 const char* resource, const char* password, int port,
17 osrfStringArray* trustedClients, osrfStringArray* trustedServers ) {
19 if(!( domain && name && resource && password && port && trustedClients && trustedServers )) return NULL;
21 osrfRouter* router = safe_malloc(sizeof(osrfRouter));
22 router->domain = strdup(domain);
23 router->name = strdup(name);
24 router->password = strdup(password);
25 router->resource = strdup(resource);
28 router->trustedClients = trustedClients;
29 router->trustedServers = trustedServers;
32 router->classes = osrfNewHash();
33 router->classes->freeItem = &osrfRouterClassFree;
35 router->connection = client_init( domain, port, NULL, 0 );
42 int osrfRouterConnect( osrfRouter* router ) {
43 if(!router) return -1;
44 int ret = client_connect( router->connection, router->name,
45 router->password, router->resource, 10, AUTH_DIGEST );
46 if( ret == 0 ) return -1;
51 void osrfRouterRun( osrfRouter* router ) {
52 if(!(router && router->classes)) return;
54 int routerfd = router->ROUTER_SOCKFD;
60 int maxfd = __osrfRouterFillFDSet( router, &set );
63 if( (selectret = select(maxfd + 1, &set, NULL, NULL, NULL)) < 0 ) {
64 osrfLogWarning( OSRF_LOG_MARK, "Top level select call failed with errno %d", errno);
68 /* see if there is a top level router message */
70 if( FD_ISSET(routerfd, &set) ) {
71 osrfLogDebug( OSRF_LOG_MARK, "Top router socket is active: %d", routerfd );
73 osrfRouterHandleIncoming( router );
77 /* now check each of the connected classes and see if they have data to route */
78 while( numhandled < selectret ) {
80 osrfRouterClass* class;
81 osrfHashIterator* itr = osrfNewHashIterator(router->classes);
83 while( (class = osrfHashIteratorNext(itr)) ) {
85 char* classname = itr->current;
87 if( classname && (class = osrfRouterFindClass( router, classname )) ) {
89 osrfLogDebug( OSRF_LOG_MARK, "Checking %s for activity...", classname );
91 int sockfd = class->ROUTER_SOCKFD;
92 if(FD_ISSET( sockfd, &set )) {
93 osrfLogDebug( OSRF_LOG_MARK, "Socket is active: %d", sockfd );
95 osrfRouterClassHandleIncoming( router, classname, class );
100 osrfHashIteratorFree(itr);
106 void osrfRouterHandleIncoming( osrfRouter* router ) {
109 transport_message* msg = NULL;
111 //if( (msg = client_recv( router->connection, 0 )) ) {
112 while( (msg = client_recv( router->connection, 0 )) ) {
116 osrfLogDebug(OSRF_LOG_MARK,
117 "osrfRouterHandleIncoming(): investigating message from %s", msg->sender);
119 /* if the sender is not a trusted server, drop the message */
120 int len = strlen(msg->sender) + 1;
122 memset(domain, 0, sizeof(domain));
123 jid_get_domain( msg->sender, domain, len - 1 );
125 if(osrfStringArrayContains( router->trustedServers, domain))
126 osrfRouterHandleMessage( router, msg );
128 osrfLogWarning( OSRF_LOG_MARK, "Received message from un-trusted server domain %s", msg->sender);
135 int osrfRouterClassHandleIncoming( osrfRouter* router, const char* classname,
136 osrfRouterClass* class ) {
137 if(!(router && class)) return -1;
139 transport_message* msg;
140 osrfLogDebug( OSRF_LOG_MARK, "osrfRouterClassHandleIncoming()");
142 while( (msg = client_recv( class->connection, 0 )) ) {
144 osrfLogSetXid(msg->osrf_xid);
148 osrfLogDebug(OSRF_LOG_MARK,
149 "osrfRouterClassHandleIncoming(): investigating message from %s", msg->sender);
151 /* if the client is not from a trusted domain, drop the message */
152 int len = strlen(msg->sender) + 1;
154 memset(domain, 0, sizeof(domain));
155 jid_get_domain( msg->sender, domain, len - 1 );
157 if(osrfStringArrayContains( router->trustedClients, domain)) {
159 transport_message* bouncedMessage = NULL;
160 if( msg->is_error ) {
162 /* handle bounced message */
163 if( !(bouncedMessage = osrfRouterClassHandleBounce( router, classname, class, msg )) )
164 return -1; /* we have no one to send the requested message to */
167 msg = bouncedMessage;
169 osrfRouterClassHandleMessage( router, class, msg );
172 osrfLogWarning( OSRF_LOG_MARK, "Received client message from untrusted client domain %s", domain );
186 int osrfRouterHandleMessage( osrfRouter* router, transport_message* msg ) {
187 if(!(router && msg)) return -1;
189 if( !msg->router_command || !strcmp(msg->router_command,""))
190 return osrfRouterHandleAppRequest( router, msg ); /* assume it's an app session level request */
192 if(!msg->router_class) return -1;
194 osrfRouterClass* class = NULL;
195 if(!strcmp(msg->router_command, ROUTER_REGISTER)) {
196 class = osrfRouterFindClass( router, msg->router_class );
198 osrfLogInfo( OSRF_LOG_MARK, "Registering class %s", msg->router_class );
200 if(!class) class = osrfRouterAddClass( router, msg->router_class );
204 if( osrfRouterClassFindNode( class, msg->sender ) )
207 osrfRouterClassAddNode( class, msg->sender );
211 } else if( !strcmp( msg->router_command, ROUTER_UNREGISTER ) ) {
213 if( msg->router_class && strcmp( msg->router_class, "") ) {
214 osrfLogInfo( OSRF_LOG_MARK, "Unregistering router class %s", msg->router_class );
215 osrfRouterClassRemoveNode( router, msg->router_class, msg->sender );
224 osrfRouterClass* osrfRouterAddClass( osrfRouter* router, const char* classname ) {
225 if(!(router && router->classes && classname)) return NULL;
227 osrfRouterClass* class = safe_malloc(sizeof(osrfRouterClass));
228 class->nodes = osrfNewHash();
229 class->itr = osrfNewHashIterator(class->nodes);
230 class->nodes->freeItem = &osrfRouterNodeFree;
231 class->router = router;
233 class->connection = client_init( router->domain, router->port, NULL, 0 );
235 if(!client_connect( class->connection, router->name,
236 router->password, classname, 10, AUTH_DIGEST ) ) {
237 // We cast away the constness of classname. Though ugly, this
238 // cast is benign because osrfRouterClassFree doesn't actually
239 // write through the pointer. We can't readily change its
240 // signature because it is used for a function pointer, and
241 // we would have to change other signatures the same way.
242 osrfRouterClassFree( (char *) classname, class );
246 osrfHashSet( router->classes, class, classname );
251 int osrfRouterClassAddNode( osrfRouterClass* rclass, const char* remoteId ) {
252 if(!(rclass && rclass->nodes && remoteId)) return -1;
254 osrfLogInfo( OSRF_LOG_MARK, "Adding router node for remote id %s", remoteId );
256 osrfRouterNode* node = safe_malloc(sizeof(osrfRouterNode));
258 node->lastMessage = NULL;
259 node->remoteId = strdup(remoteId);
261 osrfHashSet( rclass->nodes, node, remoteId );
265 /* copy off the lastMessage, remove the offending node, send error if it's tht last node
266 ? return NULL if it's the last node ?
269 transport_message* osrfRouterClassHandleBounce( osrfRouter* router,
270 const char* classname, osrfRouterClass* rclass, transport_message* msg ) {
272 osrfLogDebug( OSRF_LOG_MARK, "osrfRouterClassHandleBounce()");
274 osrfLogInfo( OSRF_LOG_MARK, "Received network layer error message from %s", msg->sender );
275 osrfRouterNode* node = osrfRouterClassFindNode( rclass, msg->sender );
276 transport_message* lastSent = NULL;
278 if( node && osrfHashGetCount(rclass->nodes) == 1 ) { /* the last node is dead */
280 if( node->lastMessage ) {
281 osrfLogWarning( OSRF_LOG_MARK, "We lost the last node in the class, responding with error and removing...");
283 transport_message* error = message_init(
284 node->lastMessage->body, node->lastMessage->subject,
285 node->lastMessage->thread, node->lastMessage->router_from, node->lastMessage->recipient );
286 message_set_osrf_xid(error, node->lastMessage->osrf_xid);
287 set_msg_error( error, "cancel", 501 );
289 /* send the error message back to the original sender */
290 client_send_message( rclass->connection, error );
291 message_free( error );
299 if( node->lastMessage ) {
300 osrfLogDebug( OSRF_LOG_MARK, "Cloning lastMessage so next node can send it");
301 lastSent = message_init( node->lastMessage->body,
302 node->lastMessage->subject, node->lastMessage->thread, "", node->lastMessage->router_from );
303 message_set_router_info( lastSent, node->lastMessage->router_from, NULL, NULL, NULL, 0 );
304 message_set_osrf_xid( lastSent, node->lastMessage->osrf_xid );
308 osrfLogInfo(OSRF_LOG_MARK, "network error occurred after we removed the class.. ignoring");
313 /* remove the dead node */
314 osrfRouterClassRemoveNode( router, classname, msg->sender);
320 If we get a regular message, we send it to the next node in the list of nodes
321 if we get an error, it's a bounce back from a previous attempt. We take the
322 body and thread from the last sent on the node that had the bounced message
323 and propogate them on to the new message being sent
325 int osrfRouterClassHandleMessage(
326 osrfRouter* router, osrfRouterClass* rclass, transport_message* msg ) {
327 if(!(router && rclass && msg)) return -1;
329 osrfLogDebug( OSRF_LOG_MARK, "osrfRouterClassHandleMessage()");
331 osrfRouterNode* node = osrfHashIteratorNext( rclass->itr );
333 osrfHashIteratorReset(rclass->itr);
334 node = osrfHashIteratorNext( rclass->itr );
339 transport_message* new_msg= message_init( msg->body,
340 msg->subject, msg->thread, node->remoteId, msg->sender );
341 message_set_router_info( new_msg, msg->sender, NULL, NULL, NULL, 0 );
342 message_set_osrf_xid( new_msg, msg->osrf_xid );
344 osrfLogInfo( OSRF_LOG_MARK, "Routing message:\nfrom: [%s]\nto: [%s]",
345 new_msg->router_from, new_msg->recipient );
347 message_free( node->lastMessage );
348 node->lastMessage = new_msg;
350 if ( client_send_message( rclass->connection, new_msg ) == 0 )
354 message_prepare_xml(new_msg);
355 osrfLogWarning( OSRF_LOG_MARK, "Error sending message from %s to %s\n%s",
356 new_msg->sender, new_msg->recipient, new_msg->msg_xml );
365 int osrfRouterRemoveClass( osrfRouter* router, const char* classname ) {
366 if(!(router && router->classes && classname)) return -1;
367 osrfLogInfo( OSRF_LOG_MARK, "Removing router class %s", classname );
368 osrfHashRemove( router->classes, classname );
373 int osrfRouterClassRemoveNode(
374 osrfRouter* router, const char* classname, const char* remoteId ) {
376 if(!(router && router->classes && classname && remoteId)) return 0;
378 osrfLogInfo( OSRF_LOG_MARK, "Removing router node %s", remoteId );
380 osrfRouterClass* class = osrfRouterFindClass( router, classname );
384 osrfHashRemove( class->nodes, remoteId );
385 if( osrfHashGetCount(class->nodes) == 0 ) {
386 osrfRouterRemoveClass( router, classname );
397 void osrfRouterClassFree( char* classname, void* c ) {
398 if(!(classname && c)) return;
399 osrfRouterClass* rclass = (osrfRouterClass*) c;
400 client_disconnect( rclass->connection );
401 client_free( rclass->connection );
403 osrfHashIteratorReset( rclass->itr );
404 osrfRouterNode* node;
406 while( (node = osrfHashIteratorNext(rclass->itr)) )
407 osrfRouterClassRemoveNode( rclass->router, classname, node->remoteId );
409 osrfHashIteratorFree(rclass->itr);
410 osrfHashFree(rclass->nodes);
416 void osrfRouterNodeFree( char* remoteId, void* n ) {
418 osrfRouterNode* node = (osrfRouterNode*) n;
419 free(node->remoteId);
420 message_free(node->lastMessage);
425 void osrfRouterFree( osrfRouter* router ) {
428 free(router->domain);
430 free(router->resource);
431 free(router->password);
433 osrfStringArrayFree( router->trustedClients );
434 osrfStringArrayFree( router->trustedServers );
436 client_free( router->connection );
442 osrfRouterClass* osrfRouterFindClass( osrfRouter* router, const char* classname ) {
443 if(!( router && router->classes && classname )) return NULL;
444 return (osrfRouterClass*) osrfHashGet( router->classes, classname );
448 osrfRouterNode* osrfRouterClassFindNode( osrfRouterClass* rclass, const char* remoteId ) {
449 if(!(rclass && remoteId)) return NULL;
450 return (osrfRouterNode*) osrfHashGet( rclass->nodes, remoteId );
454 int __osrfRouterFillFDSet( osrfRouter* router, fd_set* set ) {
455 if(!(router && router->classes && set)) return -1;
458 int maxfd = router->ROUTER_SOCKFD;
463 osrfRouterClass* class = NULL;
464 osrfHashIterator* itr = osrfNewHashIterator(router->classes);
466 while( (class = osrfHashIteratorNext(itr)) ) {
467 char* classname = itr->current;
469 if( classname && (class = osrfRouterFindClass( router, classname )) ) {
470 sockid = class->ROUTER_SOCKFD;
472 if( osrfUtilsCheckFileDescriptor( sockid ) ) {
474 osrfLogWarning(OSRF_LOG_MARK,
475 "Removing router class '%s' because of a bad top-level file descriptor [%d]", classname, sockid);
476 osrfRouterRemoveClass( router, classname );
479 if( sockid > maxfd ) maxfd = sockid;
485 osrfHashIteratorFree(itr);
491 int osrfRouterHandleAppRequest( osrfRouter* router, transport_message* msg ) {
495 memset(arr, 0, sizeof(arr));
497 int num_msgs = osrf_message_deserialize( msg->body, arr, T );
498 osrfMessage* omsg = NULL;
501 for( i = 0; i != num_msgs; i++ ) {
503 if( !(omsg = arr[i]) ) continue;
505 switch( omsg->m_type ) {
508 osrfRouterRespondConnect( router, msg, omsg );
512 osrfRouterProcessAppRequest( router, msg, omsg );
518 osrfMessageFree( omsg );
524 int osrfRouterRespondConnect( osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
525 if(!(router && msg && omsg)) return -1;
527 osrfMessage* success = osrf_message_init( STATUS, omsg->thread_trace, omsg->protocol );
529 osrfLogDebug( OSRF_LOG_MARK, "router received a CONNECT message from %s", msg->sender );
531 osrf_message_set_status_info(
532 success, "osrfConnectStatus", "Connection Successful", OSRF_STATUS_OK );
534 char* data = osrf_message_serialize(success);
536 transport_message* return_m = message_init(
537 data, "", msg->thread, msg->sender, "" );
539 client_send_message(router->connection, return_m);
542 osrf_message_free(success);
543 message_free(return_m);
550 int osrfRouterProcessAppRequest( osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
552 if(!(router && msg && omsg && omsg->method_name)) return -1;
554 osrfLogInfo( OSRF_LOG_MARK, "Router received app request: %s", omsg->method_name );
556 jsonObject* jresponse = NULL;
557 if(!strcmp( omsg->method_name, ROUTER_REQUEST_CLASS_LIST )) {
560 jresponse = jsonParseString("[]");
562 osrfStringArray* keys = osrfHashKeys( router->classes );
563 for( i = 0; i != keys->size; i++ )
564 jsonObjectPush( jresponse, jsonNewObject(osrfStringArrayGetString( keys, i )) );
565 osrfStringArrayFree(keys);
568 } else if(!strcmp( omsg->method_name, ROUTER_REQUEST_STATS_CLASS_SUMMARY )) {
570 osrfRouterClass* class;
571 osrfRouterNode* node;
574 char* classname = jsonObjectToSimpleString( jsonObjectGetIndex( omsg->_params, 0 ) );
581 class = osrfHashGet(router->classes, classname);
583 osrfHashIterator* node_itr = osrfNewHashIterator(class->nodes);
584 while( (node = osrfHashIteratorNext(node_itr)) ) {
585 count += node->count;
586 //jsonObjectSetKey( class_res, node->remoteId, jsonNewNumberObject( (double) node->count ) );
588 osrfHashIteratorFree(node_itr);
590 jresponse = jsonNewNumberObject( (double) count );
592 } else if(!strcmp( omsg->method_name, ROUTER_REQUEST_STATS_CLASS )) {
594 osrfRouterClass* class;
595 osrfRouterNode* node;
597 char* classname = jsonObjectToSimpleString( jsonObjectGetIndex( omsg->_params, 0 ) );
604 jresponse = jsonParseString("{}");
605 class = osrfHashGet(router->classes, classname);
607 osrfHashIterator* node_itr = osrfNewHashIterator(class->nodes);
608 while( (node = osrfHashIteratorNext(node_itr)) ) {
609 jsonObjectSetKey( jresponse, node->remoteId, jsonNewNumberObject( (double) node->count ) );
611 osrfHashIteratorFree(node_itr);
613 } else if(!strcmp( omsg->method_name, ROUTER_REQUEST_STATS_CLASS_FULL )) {
615 osrfRouterClass* class;
616 osrfRouterNode* node;
617 jresponse = jsonParseString("{}");
619 osrfHashIterator* class_itr = osrfNewHashIterator(router->classes);
620 while( (class = osrfHashIteratorNext(class_itr)) ) {
622 jsonObject* class_res = jsonParseString("{}");
623 char* classname = class_itr->current;
625 osrfHashIterator* node_itr = osrfNewHashIterator(class->nodes);
626 while( (node = osrfHashIteratorNext(node_itr)) ) {
627 jsonObjectSetKey( class_res, node->remoteId, jsonNewNumberObject( (double) node->count ) );
629 osrfHashIteratorFree(node_itr);
631 jsonObjectSetKey( jresponse, classname, class_res );
634 osrfHashIteratorFree(class_itr);
636 } else if(!strcmp( omsg->method_name, ROUTER_REQUEST_STATS_NODE_FULL )) {
638 osrfRouterClass* class;
639 osrfRouterNode* node;
641 jresponse = jsonParseString("{}");
643 osrfHashIterator* class_itr = osrfNewHashIterator(router->classes);
644 while( (class = osrfHashIteratorNext(class_itr)) ) {
647 char* classname = class_itr->current;
649 osrfHashIterator* node_itr = osrfNewHashIterator(class->nodes);
650 while( (node = osrfHashIteratorNext(node_itr)) ) {
651 count += node->count;
653 osrfHashIteratorFree(node_itr);
655 jsonObjectSetKey( jresponse, classname, jsonNewNumberObject( (double) count ) );
658 osrfHashIteratorFree(class_itr);
662 return osrfRouterHandleMethodNFound( router, msg, omsg );
666 osrfRouterHandleAppResponse( router, msg, omsg, jresponse );
667 jsonObjectFree(jresponse);
675 int osrfRouterHandleMethodNFound(
676 osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
678 osrf_message* err = osrf_message_init( STATUS, omsg->thread_trace, 1);
679 osrf_message_set_status_info( err,
680 "osrfMethodException", "Router method not found", OSRF_STATUS_NOTFOUND );
682 char* data = osrf_message_serialize(err);
684 transport_message* tresponse = message_init(
685 data, "", msg->thread, msg->sender, msg->recipient );
687 client_send_message(router->connection, tresponse );
690 osrf_message_free( err );
691 message_free(tresponse);
697 int osrfRouterHandleAppResponse( osrfRouter* router,
698 transport_message* msg, osrfMessage* omsg, const jsonObject* response ) {
700 if( response ) { /* send the response message */
702 osrfMessage* oresponse = osrf_message_init(
703 RESULT, omsg->thread_trace, omsg->protocol );
705 char* json = jsonObjectToJSON(response);
706 osrf_message_set_result_content( oresponse, json);
708 char* data = osrf_message_serialize(oresponse);
709 osrfLogDebug( OSRF_LOG_MARK, "Responding to client app request with data: \n%s\n", data );
711 transport_message* tresponse = message_init(
712 data, "", msg->thread, msg->sender, msg->recipient );
714 client_send_message(router->connection, tresponse );
716 osrfMessageFree(oresponse);
717 message_free(tresponse);
723 /* now send the 'request complete' message */
724 osrf_message* status = osrf_message_init( STATUS, omsg->thread_trace, 1);
725 osrf_message_set_status_info( status, "osrfConnectStatus", "Request Complete", OSRF_STATUS_COMPLETE );
727 char* statusdata = osrf_message_serialize(status);
729 transport_message* sresponse = message_init(
730 statusdata, "", msg->thread, msg->sender, msg->recipient );
731 client_send_message(router->connection, sresponse );
735 osrfMessageFree(status);
736 message_free(sresponse);