1 #include "osrf_router.h"
3 #define ROUTER_SOCKFD connection->session->sock_id
4 #define ROUTER_REGISTER "register"
5 #define ROUTER_UNREGISTER "unregister"
8 #define ROUTER_REQUEST_CLASS_LIST "opensrf.router.info.class.list"
9 #define ROUTER_REQUEST_STATS_NODE_FULL "opensrf.router.info.stats.class.node.all"
10 #define ROUTER_REQUEST_STATS_CLASS_FULL "opensrf.router.info.stats.class.all"
11 #define ROUTER_REQUEST_STATS_CLASS "opensrf.router.info.stats.class"
12 #define ROUTER_REQUEST_STATS_CLASS_SUMMARY "opensrf.router.info.stats.class.summary"
14 osrfRouter* osrfNewRouter(
15 char* domain, char* name,
16 char* resource, char* password, int port,
17 osrfStringArray* trustedClients, osrfStringArray* trustedServers ) {
19 if(!( domain && name && resource && password && port && trustedClients && trustedServers )) return NULL;
21 osrfRouter* router = safe_malloc(sizeof(osrfRouter));
22 router->domain = strdup(domain);
23 router->name = strdup(name);
24 router->password = strdup(password);
25 router->resource = strdup(resource);
28 router->trustedClients = trustedClients;
29 router->trustedServers = trustedServers;
32 router->classes = osrfNewHash();
33 router->classes->freeItem = &osrfRouterClassFree;
35 router->connection = client_init( domain, port, NULL, 0 );
42 int osrfRouterConnect( osrfRouter* router ) {
43 if(!router) return -1;
44 int ret = client_connect( router->connection, router->name,
45 router->password, router->resource, 10, AUTH_DIGEST );
46 if( ret == 0 ) return -1;
51 void osrfRouterRun( osrfRouter* router ) {
52 if(!(router && router->classes)) return;
54 int routerfd = router->ROUTER_SOCKFD;
60 int maxfd = __osrfRouterFillFDSet( router, &set );
63 if( (selectret = select(maxfd + 1, &set, NULL, NULL, NULL)) < 0 ) {
64 osrfLogWarning( OSRF_LOG_MARK, "Top level select call failed with errno %d", errno);
68 /* see if there is a top level router message */
70 if( FD_ISSET(routerfd, &set) ) {
71 osrfLogDebug( OSRF_LOG_MARK, "Top router socket is active: %d", routerfd );
73 osrfRouterHandleIncoming( router );
77 /* now check each of the connected classes and see if they have data to route */
78 while( numhandled < selectret ) {
80 osrfRouterClass* class;
81 osrfHashIterator* itr = osrfNewHashIterator(router->classes);
83 while( (class = osrfHashIteratorNext(itr)) ) {
85 char* classname = itr->current;
87 if( classname && (class = osrfRouterFindClass( router, classname )) ) {
89 osrfLogDebug( OSRF_LOG_MARK, "Checking %s for activity...", classname );
91 int sockfd = class->ROUTER_SOCKFD;
92 if(FD_ISSET( sockfd, &set )) {
93 osrfLogDebug( OSRF_LOG_MARK, "Socket is active: %d", sockfd );
95 osrfRouterClassHandleIncoming( router, classname, class );
100 osrfHashIteratorFree(itr);
106 void osrfRouterHandleIncoming( osrfRouter* router ) {
109 transport_message* msg = NULL;
111 //if( (msg = client_recv( router->connection, 0 )) ) {
112 while( (msg = client_recv( router->connection, 0 )) ) {
116 osrfLogDebug(OSRF_LOG_MARK,
117 "osrfRouterHandleIncoming(): investigating message from %s", msg->sender);
119 /* if the sender is not a trusted server, drop the message */
120 int len = strlen(msg->sender) + 1;
123 jid_get_domain( msg->sender, domain, len - 1 );
125 if(osrfStringArrayContains( router->trustedServers, domain))
126 osrfRouterHandleMessage( router, msg );
128 osrfLogWarning( OSRF_LOG_MARK, "Received message from un-trusted server domain %s", msg->sender);
135 int osrfRouterClassHandleIncoming( osrfRouter* router, char* classname, osrfRouterClass* class ) {
136 if(!(router && class)) return -1;
138 transport_message* msg;
139 osrfLogDebug( OSRF_LOG_MARK, "osrfRouterClassHandleIncoming()");
141 while( (msg = client_recv( class->connection, 0 )) ) {
143 osrfLogSetXid(msg->osrf_xid);
147 osrfLogDebug(OSRF_LOG_MARK,
148 "osrfRouterClassHandleIncoming(): investigating message from %s", msg->sender);
150 /* if the client is not from a trusted domain, drop the message */
151 int len = strlen(msg->sender) + 1;
154 jid_get_domain( msg->sender, domain, len - 1 );
156 if(osrfStringArrayContains( router->trustedClients, domain)) {
158 transport_message* bouncedMessage = NULL;
159 if( msg->is_error ) {
161 /* handle bounced message */
162 if( !(bouncedMessage = osrfRouterClassHandleBounce( router, classname, class, msg )) )
163 return -1; /* we have no one to send the requested message to */
166 msg = bouncedMessage;
168 osrfRouterClassHandleMessage( router, class, msg );
171 osrfLogWarning( OSRF_LOG_MARK, "Received client message from untrusted client domain %s", domain );
185 int osrfRouterHandleMessage( osrfRouter* router, transport_message* msg ) {
186 if(!(router && msg)) return -1;
188 if( !msg->router_command || !strcmp(msg->router_command,""))
189 return osrfRouterHandleAppRequest( router, msg ); /* assume it's an app session level request */
191 if(!msg->router_class) return -1;
193 osrfRouterClass* class = NULL;
194 if(!strcmp(msg->router_command, ROUTER_REGISTER)) {
195 class = osrfRouterFindClass( router, msg->router_class );
197 osrfLogInfo( OSRF_LOG_MARK, "Registering class %s", msg->router_class );
199 if(!class) class = osrfRouterAddClass( router, msg->router_class );
203 if( osrfRouterClassFindNode( class, msg->sender ) )
206 osrfRouterClassAddNode( class, msg->sender );
210 } else if( !strcmp( msg->router_command, ROUTER_UNREGISTER ) ) {
212 if( msg->router_class && strcmp( msg->router_class, "") ) {
213 osrfLogInfo( OSRF_LOG_MARK, "Unregistering router class %s", msg->router_class );
214 osrfRouterClassRemoveNode( router, msg->router_class, msg->sender );
223 osrfRouterClass* osrfRouterAddClass( osrfRouter* router, char* classname ) {
224 if(!(router && router->classes && classname)) return NULL;
226 osrfRouterClass* class = safe_malloc(sizeof(osrfRouterClass));
227 class->nodes = osrfNewHash();
228 class->itr = osrfNewHashIterator(class->nodes);
229 class->nodes->freeItem = &osrfRouterNodeFree;
230 class->router = router;
232 class->connection = client_init( router->domain, router->port, NULL, 0 );
234 if(!client_connect( class->connection, router->name,
235 router->password, classname, 10, AUTH_DIGEST ) ) {
236 osrfRouterClassFree( classname, class );
240 osrfHashSet( router->classes, class, classname );
245 int osrfRouterClassAddNode( osrfRouterClass* rclass, char* remoteId ) {
246 if(!(rclass && rclass->nodes && remoteId)) return -1;
248 osrfLogInfo( OSRF_LOG_MARK, "Adding router node for remote id %s", remoteId );
250 osrfRouterNode* node = safe_malloc(sizeof(osrfRouterNode));
252 node->lastMessage = NULL;
253 node->remoteId = strdup(remoteId);
255 osrfHashSet( rclass->nodes, node, remoteId );
259 /* copy off the lastMessage, remove the offending node, send error if it's tht last node
260 ? return NULL if it's the last node ?
263 transport_message* osrfRouterClassHandleBounce(
264 osrfRouter* router, char* classname, osrfRouterClass* rclass, transport_message* msg ) {
266 osrfLogDebug( OSRF_LOG_MARK, "osrfRouterClassHandleBounce()");
268 osrfLogInfo( OSRF_LOG_MARK, "Received network layer error message from %s", msg->sender );
269 osrfRouterNode* node = osrfRouterClassFindNode( rclass, msg->sender );
270 transport_message* lastSent = NULL;
272 if( node && osrfHashGetCount(rclass->nodes) == 1 ) { /* the last node is dead */
274 if( node->lastMessage ) {
275 osrfLogWarning( OSRF_LOG_MARK, "We lost the last node in the class, responding with error and removing...");
277 transport_message* error = message_init(
278 node->lastMessage->body, node->lastMessage->subject,
279 node->lastMessage->thread, node->lastMessage->router_from, node->lastMessage->recipient );
280 message_set_osrf_xid(error, node->lastMessage->osrf_xid);
281 set_msg_error( error, "cancel", 501 );
283 /* send the error message back to the original sender */
284 client_send_message( rclass->connection, error );
285 message_free( error );
293 if( node->lastMessage ) {
294 osrfLogDebug( OSRF_LOG_MARK, "Cloning lastMessage so next node can send it");
295 lastSent = message_init( node->lastMessage->body,
296 node->lastMessage->subject, node->lastMessage->thread, "", node->lastMessage->router_from );
297 message_set_router_info( lastSent, node->lastMessage->router_from, NULL, NULL, NULL, 0 );
298 message_set_osrf_xid( lastSent, node->lastMessage->osrf_xid );
302 osrfLogInfo(OSRF_LOG_MARK, "network error occurred after we removed the class.. ignoring");
307 /* remove the dead node */
308 osrfRouterClassRemoveNode( router, classname, msg->sender);
314 If we get a regular message, we send it to the next node in the list of nodes
315 if we get an error, it's a bounce back from a previous attempt. We take the
316 body and thread from the last sent on the node that had the bounced message
317 and propogate them on to the new message being sent
319 int osrfRouterClassHandleMessage(
320 osrfRouter* router, osrfRouterClass* rclass, transport_message* msg ) {
321 if(!(router && rclass && msg)) return -1;
323 osrfLogDebug( OSRF_LOG_MARK, "osrfRouterClassHandleMessage()");
325 osrfRouterNode* node = osrfHashIteratorNext( rclass->itr );
327 osrfHashIteratorReset(rclass->itr);
328 node = osrfHashIteratorNext( rclass->itr );
333 transport_message* new_msg= message_init( msg->body,
334 msg->subject, msg->thread, node->remoteId, msg->sender );
335 message_set_router_info( new_msg, msg->sender, NULL, NULL, NULL, 0 );
336 message_set_osrf_xid( new_msg, msg->osrf_xid );
338 osrfLogInfo( OSRF_LOG_MARK, "Routing message:\nfrom: [%s]\nto: [%s]",
339 new_msg->router_from, new_msg->recipient );
341 message_free( node->lastMessage );
342 node->lastMessage = new_msg;
344 if ( client_send_message( rclass->connection, new_msg ) == 0 )
348 message_prepare_xml(new_msg);
349 osrfLogWarning( OSRF_LOG_MARK, "Error sending message from %s to %s\n%s",
350 new_msg->sender, new_msg->recipient, new_msg->msg_xml );
359 int osrfRouterRemoveClass( osrfRouter* router, char* classname ) {
360 if(!(router && router->classes && classname)) return -1;
361 osrfLogInfo( OSRF_LOG_MARK, "Removing router class %s", classname );
362 osrfHashRemove( router->classes, classname );
367 int osrfRouterClassRemoveNode(
368 osrfRouter* router, char* classname, char* remoteId ) {
370 if(!(router && router->classes && classname && remoteId)) return 0;
372 osrfLogInfo( OSRF_LOG_MARK, "Removing router node %s", remoteId );
374 osrfRouterClass* class = osrfRouterFindClass( router, classname );
378 osrfHashRemove( class->nodes, remoteId );
379 if( osrfHashGetCount(class->nodes) == 0 ) {
380 osrfRouterRemoveClass( router, classname );
391 void osrfRouterClassFree( char* classname, void* c ) {
392 if(!(classname && c)) return;
393 osrfRouterClass* rclass = (osrfRouterClass*) c;
394 client_disconnect( rclass->connection );
395 client_free( rclass->connection );
397 osrfHashIteratorReset( rclass->itr );
398 osrfRouterNode* node;
400 while( (node = osrfHashIteratorNext(rclass->itr)) )
401 osrfRouterClassRemoveNode( rclass->router, classname, node->remoteId );
403 osrfHashIteratorFree(rclass->itr);
404 osrfHashFree(rclass->nodes);
410 void osrfRouterNodeFree( char* remoteId, void* n ) {
412 osrfRouterNode* node = (osrfRouterNode*) n;
413 free(node->remoteId);
414 message_free(node->lastMessage);
419 void osrfRouterFree( osrfRouter* router ) {
422 free(router->domain);
424 free(router->resource);
425 free(router->password);
427 osrfStringArrayFree( router->trustedClients );
428 osrfStringArrayFree( router->trustedServers );
430 client_free( router->connection );
436 osrfRouterClass* osrfRouterFindClass( osrfRouter* router, char* classname ) {
437 if(!( router && router->classes && classname )) return NULL;
438 return (osrfRouterClass*) osrfHashGet( router->classes, classname );
442 osrfRouterNode* osrfRouterClassFindNode( osrfRouterClass* rclass, char* remoteId ) {
443 if(!(rclass && remoteId)) return NULL;
444 return (osrfRouterNode*) osrfHashGet( rclass->nodes, remoteId );
448 int __osrfRouterFillFDSet( osrfRouter* router, fd_set* set ) {
449 if(!(router && router->classes && set)) return -1;
452 int maxfd = router->ROUTER_SOCKFD;
457 osrfRouterClass* class = NULL;
458 osrfHashIterator* itr = osrfNewHashIterator(router->classes);
460 while( (class = osrfHashIteratorNext(itr)) ) {
461 char* classname = itr->current;
463 if( classname && (class = osrfRouterFindClass( router, classname )) ) {
464 sockid = class->ROUTER_SOCKFD;
466 if( osrfUtilsCheckFileDescriptor( sockid ) ) {
468 osrfLogWarning(OSRF_LOG_MARK,
469 "Removing router class '%s' because of a bad top-level file descriptor [%d]", classname, sockid);
470 osrfRouterRemoveClass( router, classname );
473 if( sockid > maxfd ) maxfd = sockid;
479 osrfHashIteratorFree(itr);
485 int osrfRouterHandleAppRequest( osrfRouter* router, transport_message* msg ) {
491 int num_msgs = osrf_message_deserialize( msg->body, arr, T );
492 osrfMessage* omsg = NULL;
495 for( i = 0; i != num_msgs; i++ ) {
497 if( !(omsg = arr[i]) ) continue;
499 switch( omsg->m_type ) {
502 osrfRouterRespondConnect( router, msg, omsg );
506 osrfRouterProcessAppRequest( router, msg, omsg );
512 osrfMessageFree( omsg );
518 int osrfRouterRespondConnect( osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
519 if(!(router && msg && omsg)) return -1;
521 osrfMessage* success = osrf_message_init( STATUS, omsg->thread_trace, omsg->protocol );
523 osrfLogDebug( OSRF_LOG_MARK, "router recevied a CONNECT message from %s", msg->sender );
525 osrf_message_set_status_info(
526 success, "osrfConnectStatus", "Connection Successful", OSRF_STATUS_OK );
528 char* data = osrf_message_serialize(success);
530 transport_message* return_m = message_init(
531 data, "", msg->thread, msg->sender, "" );
533 client_send_message(router->connection, return_m);
536 osrf_message_free(success);
537 message_free(return_m);
544 int osrfRouterProcessAppRequest( osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
546 if(!(router && msg && omsg && omsg->method_name)) return -1;
548 osrfLogInfo( OSRF_LOG_MARK, "Router received app request: %s", omsg->method_name );
550 jsonObject* jresponse = NULL;
551 if(!strcmp( omsg->method_name, ROUTER_REQUEST_CLASS_LIST )) {
554 jresponse = jsonParseString("[]");
556 osrfStringArray* keys = osrfHashKeys( router->classes );
557 for( i = 0; i != keys->size; i++ )
558 jsonObjectPush( jresponse, jsonNewObject(osrfStringArrayGetString( keys, i )) );
559 osrfStringArrayFree(keys);
562 } else if(!strcmp( omsg->method_name, ROUTER_REQUEST_STATS_CLASS_SUMMARY )) {
564 osrfRouterClass* class;
565 osrfRouterNode* node;
568 char* classname = jsonObjectToSimpleString( jsonObjectGetIndex( omsg->_params, 0 ) );
575 class = osrfHashGet(router->classes, classname);
577 osrfHashIterator* node_itr = osrfNewHashIterator(class->nodes);
578 while( (node = osrfHashIteratorNext(node_itr)) ) {
579 count += node->count;
580 //jsonObjectSetKey( class_res, node->remoteId, jsonNewNumberObject( (double) node->count ) );
582 osrfHashIteratorFree(node_itr);
584 jresponse = jsonNewNumberObject( (double) count );
586 } else if(!strcmp( omsg->method_name, ROUTER_REQUEST_STATS_CLASS )) {
588 osrfRouterClass* class;
589 osrfRouterNode* node;
591 char* classname = jsonObjectToSimpleString( jsonObjectGetIndex( omsg->_params, 0 ) );
598 jresponse = jsonParseString("{}");
599 class = osrfHashGet(router->classes, classname);
601 osrfHashIterator* node_itr = osrfNewHashIterator(class->nodes);
602 while( (node = osrfHashIteratorNext(node_itr)) ) {
603 jsonObjectSetKey( jresponse, node->remoteId, jsonNewNumberObject( (double) node->count ) );
605 osrfHashIteratorFree(node_itr);
607 } else if(!strcmp( omsg->method_name, ROUTER_REQUEST_STATS_CLASS_FULL )) {
609 osrfRouterClass* class;
610 osrfRouterNode* node;
611 jresponse = jsonParseString("{}");
613 osrfHashIterator* class_itr = osrfNewHashIterator(router->classes);
614 while( (class = osrfHashIteratorNext(class_itr)) ) {
616 jsonObject* class_res = jsonParseString("{}");
617 char* classname = class_itr->current;
619 osrfHashIterator* node_itr = osrfNewHashIterator(class->nodes);
620 while( (node = osrfHashIteratorNext(node_itr)) ) {
621 jsonObjectSetKey( class_res, node->remoteId, jsonNewNumberObject( (double) node->count ) );
623 osrfHashIteratorFree(node_itr);
625 jsonObjectSetKey( jresponse, classname, class_res );
628 osrfHashIteratorFree(class_itr);
630 } else if(!strcmp( omsg->method_name, ROUTER_REQUEST_STATS_NODE_FULL )) {
632 osrfRouterClass* class;
633 osrfRouterNode* node;
635 jresponse = jsonParseString("{}");
637 osrfHashIterator* class_itr = osrfNewHashIterator(router->classes);
638 while( (class = osrfHashIteratorNext(class_itr)) ) {
641 char* classname = class_itr->current;
643 osrfHashIterator* node_itr = osrfNewHashIterator(class->nodes);
644 while( (node = osrfHashIteratorNext(node_itr)) ) {
645 count += node->count;
647 osrfHashIteratorFree(node_itr);
649 jsonObjectSetKey( jresponse, classname, jsonNewNumberObject( (double) count ) );
652 osrfHashIteratorFree(class_itr);
656 return osrfRouterHandleMethodNFound( router, msg, omsg );
660 osrfRouterHandleAppResponse( router, msg, omsg, jresponse );
661 jsonObjectFree(jresponse);
669 int osrfRouterHandleMethodNFound(
670 osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
672 osrf_message* err = osrf_message_init( STATUS, omsg->thread_trace, 1);
673 osrf_message_set_status_info( err,
674 "osrfMethodException", "Router method not found", OSRF_STATUS_NOTFOUND );
676 char* data = osrf_message_serialize(err);
678 transport_message* tresponse = message_init(
679 data, "", msg->thread, msg->sender, msg->recipient );
681 client_send_message(router->connection, tresponse );
684 osrf_message_free( err );
685 message_free(tresponse);
691 int osrfRouterHandleAppResponse( osrfRouter* router,
692 transport_message* msg, osrfMessage* omsg, jsonObject* response ) {
694 if( response ) { /* send the response message */
696 osrfMessage* oresponse = osrf_message_init(
697 RESULT, omsg->thread_trace, omsg->protocol );
699 char* json = jsonObjectToJSON(response);
700 osrf_message_set_result_content( oresponse, json);
702 char* data = osrf_message_serialize(oresponse);
703 osrfLogDebug( OSRF_LOG_MARK, "Responding to client app request with data: \n%s\n", data );
705 transport_message* tresponse = message_init(
706 data, "", msg->thread, msg->sender, msg->recipient );
708 client_send_message(router->connection, tresponse );
710 osrfMessageFree(oresponse);
711 message_free(tresponse);
717 /* now send the 'request complete' message */
718 osrf_message* status = osrf_message_init( STATUS, omsg->thread_trace, 1);
719 osrf_message_set_status_info( status, "osrfConnectStatus", "Request Complete", OSRF_STATUS_COMPLETE );
721 char* statusdata = osrf_message_serialize(status);
723 transport_message* sresponse = message_init(
724 statusdata, "", msg->thread, msg->sender, msg->recipient );
725 client_send_message(router->connection, sresponse );
729 osrfMessageFree(status);
730 message_free(sresponse);