1 #include "osrf_router.h"
3 #define ROUTER_SOCKFD connection->session->sock_id
4 #define ROUTER_REGISTER "register"
5 #define ROUTER_UNREGISTER "unregister"
8 #define ROUTER_REQUEST_CLASS_LIST "opensrf.router.info.class.list"
9 #define ROUTER_REQUEST_FULL_STATS "opensrf.router.info.stats.class.node.all"
10 #define ROUTER_REQUEST_CLASS_STATS "opensrf.router.info.stats.class.all"
12 osrfRouter* osrfNewRouter(
13 char* domain, char* name,
14 char* resource, char* password, int port,
15 osrfStringArray* trustedClients, osrfStringArray* trustedServers ) {
17 if(!( domain && name && resource && password && port && trustedClients && trustedServers )) return NULL;
19 osrfRouter* router = safe_malloc(sizeof(osrfRouter));
20 router->domain = strdup(domain);
21 router->name = strdup(name);
22 router->password = strdup(password);
23 router->resource = strdup(resource);
26 router->trustedClients = trustedClients;
27 router->trustedServers = trustedServers;
30 router->classes = osrfNewHash();
31 router->classes->freeItem = &osrfRouterClassFree;
33 router->connection = client_init( domain, port, NULL, 0 );
40 int osrfRouterConnect( osrfRouter* router ) {
41 if(!router) return -1;
42 int ret = client_connect( router->connection, router->name,
43 router->password, router->resource, 10, AUTH_DIGEST );
44 if( ret == 0 ) return -1;
49 void osrfRouterRun( osrfRouter* router ) {
50 if(!(router && router->classes)) return;
52 int routerfd = router->ROUTER_SOCKFD;
58 int maxfd = __osrfRouterFillFDSet( router, &set );
61 if( (selectret = select(maxfd + 1, &set, NULL, NULL, NULL)) < 0 ) {
62 osrfLogWarning( OSRF_LOG_MARK, "Top level select call failed with errno %d", errno);
66 /* see if there is a top level router message */
68 if( FD_ISSET(routerfd, &set) ) {
69 osrfLogDebug( OSRF_LOG_MARK, "Top router socket is active: %d", routerfd );
71 osrfRouterHandleIncoming( router );
75 /* now check each of the connected classes and see if they have data to route */
76 while( numhandled < selectret ) {
78 osrfRouterClass* class;
79 osrfHashIterator* itr = osrfNewHashIterator(router->classes);
81 while( (class = osrfHashIteratorNext(itr)) ) {
83 char* classname = itr->current;
85 if( classname && (class = osrfRouterFindClass( router, classname )) ) {
87 osrfLogDebug( OSRF_LOG_MARK, "Checking %s for activity...", classname );
89 int sockfd = class->ROUTER_SOCKFD;
90 if(FD_ISSET( sockfd, &set )) {
91 osrfLogDebug( OSRF_LOG_MARK, "Socket is active: %d", sockfd );
93 osrfRouterClassHandleIncoming( router, classname, class );
98 osrfHashIteratorFree(itr);
104 void osrfRouterHandleIncoming( osrfRouter* router ) {
107 transport_message* msg = NULL;
109 //if( (msg = client_recv( router->connection, 0 )) ) {
110 while( (msg = client_recv( router->connection, 0 )) ) {
114 osrfLogDebug(OSRF_LOG_MARK,
115 "osrfRouterHandleIncoming(): investigating message from %s", msg->sender);
117 /* if the sender is not a trusted server, drop the message */
118 int len = strlen(msg->sender) + 1;
121 jid_get_domain( msg->sender, domain, len - 1 );
123 if(osrfStringArrayContains( router->trustedServers, domain))
124 osrfRouterHandleMessage( router, msg );
126 osrfLogWarning( OSRF_LOG_MARK, "Received message from un-trusted server domain %s", msg->sender);
133 int osrfRouterClassHandleIncoming( osrfRouter* router, char* classname, osrfRouterClass* class ) {
134 if(!(router && class)) return -1;
136 transport_message* msg;
137 osrfLogDebug( OSRF_LOG_MARK, "osrfRouterClassHandleIncoming()");
139 while( (msg = client_recv( class->connection, 0 )) ) {
141 osrfLogSetXid(msg->osrf_xid);
145 osrfLogDebug(OSRF_LOG_MARK,
146 "osrfRouterClassHandleIncoming(): investigating message from %s", msg->sender);
148 /* if the client is not from a trusted domain, drop the message */
149 int len = strlen(msg->sender) + 1;
152 jid_get_domain( msg->sender, domain, len - 1 );
154 if(osrfStringArrayContains( router->trustedClients, domain)) {
156 transport_message* bouncedMessage = NULL;
157 if( msg->is_error ) {
159 /* handle bounced message */
160 if( !(bouncedMessage = osrfRouterClassHandleBounce( router, classname, class, msg )) )
161 return -1; /* we have no one to send the requested message to */
164 msg = bouncedMessage;
166 osrfRouterClassHandleMessage( router, class, msg );
169 osrfLogWarning( OSRF_LOG_MARK, "Received client message from untrusted client domain %s", domain );
183 int osrfRouterHandleMessage( osrfRouter* router, transport_message* msg ) {
184 if(!(router && msg)) return -1;
186 if( !msg->router_command || !strcmp(msg->router_command,""))
187 return osrfRouterHandleAppRequest( router, msg ); /* assume it's an app session level request */
189 if(!msg->router_class) return -1;
191 osrfRouterClass* class = NULL;
192 if(!strcmp(msg->router_command, ROUTER_REGISTER)) {
193 class = osrfRouterFindClass( router, msg->router_class );
195 osrfLogInfo( OSRF_LOG_MARK, "Registering class %s", msg->router_class );
197 if(!class) class = osrfRouterAddClass( router, msg->router_class );
201 if( osrfRouterClassFindNode( class, msg->sender ) )
204 osrfRouterClassAddNode( class, msg->sender );
208 } else if( !strcmp( msg->router_command, ROUTER_UNREGISTER ) ) {
210 if( msg->router_class && strcmp( msg->router_class, "") ) {
211 osrfLogInfo( OSRF_LOG_MARK, "Unregistering router class %s", msg->router_class );
212 osrfRouterClassRemoveNode( router, msg->router_class, msg->sender );
221 osrfRouterClass* osrfRouterAddClass( osrfRouter* router, char* classname ) {
222 if(!(router && router->classes && classname)) return NULL;
224 osrfRouterClass* class = safe_malloc(sizeof(osrfRouterClass));
225 class->nodes = osrfNewHash();
226 class->itr = osrfNewHashIterator(class->nodes);
227 class->nodes->freeItem = &osrfRouterNodeFree;
228 class->router = router;
230 class->connection = client_init( router->domain, router->port, NULL, 0 );
232 if(!client_connect( class->connection, router->name,
233 router->password, classname, 10, AUTH_DIGEST ) ) {
234 osrfRouterClassFree( classname, class );
238 osrfHashSet( router->classes, class, classname );
243 int osrfRouterClassAddNode( osrfRouterClass* rclass, char* remoteId ) {
244 if(!(rclass && rclass->nodes && remoteId)) return -1;
246 osrfLogInfo( OSRF_LOG_MARK, "Adding router node for remote id %s", remoteId );
248 osrfRouterNode* node = safe_malloc(sizeof(osrfRouterNode));
250 node->lastMessage = NULL;
251 node->remoteId = strdup(remoteId);
253 osrfHashSet( rclass->nodes, node, remoteId );
257 /* copy off the lastMessage, remove the offending node, send error if it's tht last node
258 ? return NULL if it's the last node ?
261 transport_message* osrfRouterClassHandleBounce(
262 osrfRouter* router, char* classname, osrfRouterClass* rclass, transport_message* msg ) {
264 osrfLogDebug( OSRF_LOG_MARK, "osrfRouterClassHandleBounce()");
266 osrfLogInfo( OSRF_LOG_MARK, "Received network layer error message from %s", msg->sender );
267 osrfRouterNode* node = osrfRouterClassFindNode( rclass, msg->sender );
268 transport_message* lastSent = NULL;
270 if( node && osrfHashGetCount(rclass->nodes) == 1 ) { /* the last node is dead */
272 if( node->lastMessage ) {
273 osrfLogWarning( OSRF_LOG_MARK, "We lost the last node in the class, responding with error and removing...");
275 transport_message* error = message_init(
276 node->lastMessage->body, node->lastMessage->subject,
277 node->lastMessage->thread, node->lastMessage->router_from, node->lastMessage->recipient );
278 message_set_osrf_xid(error, node->lastMessage->osrf_xid);
279 set_msg_error( error, "cancel", 501 );
281 /* send the error message back to the original sender */
282 client_send_message( rclass->connection, error );
283 message_free( error );
291 if( node->lastMessage ) {
292 osrfLogDebug( OSRF_LOG_MARK, "Cloning lastMessage so next node can send it");
293 lastSent = message_init( node->lastMessage->body,
294 node->lastMessage->subject, node->lastMessage->thread, "", node->lastMessage->router_from );
295 message_set_router_info( lastSent, node->lastMessage->router_from, NULL, NULL, NULL, 0 );
296 message_set_osrf_xid( lastSent, node->lastMessage->osrf_xid );
300 osrfLogInfo(OSRF_LOG_MARK, "network error occurred after we removed the class.. ignoring");
305 /* remove the dead node */
306 osrfRouterClassRemoveNode( router, classname, msg->sender);
312 If we get a regular message, we send it to the next node in the list of nodes
313 if we get an error, it's a bounce back from a previous attempt. We take the
314 body and thread from the last sent on the node that had the bounced message
315 and propogate them on to the new message being sent
317 int osrfRouterClassHandleMessage(
318 osrfRouter* router, osrfRouterClass* rclass, transport_message* msg ) {
319 if(!(router && rclass && msg)) return -1;
321 osrfLogDebug( OSRF_LOG_MARK, "osrfRouterClassHandleMessage()");
323 osrfRouterNode* node = osrfHashIteratorNext( rclass->itr );
325 osrfHashIteratorReset(rclass->itr);
326 node = osrfHashIteratorNext( rclass->itr );
331 transport_message* new_msg= message_init( msg->body,
332 msg->subject, msg->thread, node->remoteId, msg->sender );
333 message_set_router_info( new_msg, msg->sender, NULL, NULL, NULL, 0 );
334 message_set_osrf_xid( new_msg, msg->osrf_xid );
336 osrfLogInfo( OSRF_LOG_MARK, "Routing message:\nfrom: [%s]\nto: [%s]",
337 new_msg->router_from, new_msg->recipient );
339 message_free( node->lastMessage );
340 node->lastMessage = new_msg;
342 if ( client_send_message( rclass->connection, new_msg ) == 0 )
346 message_prepare_xml(new_msg);
347 osrfLogWarning( OSRF_LOG_MARK, "Error sending message from %s to %s\n%s",
348 new_msg->sender, new_msg->recipient, new_msg->msg_xml );
357 int osrfRouterRemoveClass( osrfRouter* router, char* classname ) {
358 if(!(router && router->classes && classname)) return -1;
359 osrfLogInfo( OSRF_LOG_MARK, "Removing router class %s", classname );
360 osrfHashRemove( router->classes, classname );
365 int osrfRouterClassRemoveNode(
366 osrfRouter* router, char* classname, char* remoteId ) {
368 if(!(router && router->classes && classname && remoteId)) return 0;
370 osrfLogInfo( OSRF_LOG_MARK, "Removing router node %s", remoteId );
372 osrfRouterClass* class = osrfRouterFindClass( router, classname );
376 osrfHashRemove( class->nodes, remoteId );
377 if( osrfHashGetCount(class->nodes) == 0 ) {
378 osrfRouterRemoveClass( router, classname );
389 void osrfRouterClassFree( char* classname, void* c ) {
390 if(!(classname && c)) return;
391 osrfRouterClass* rclass = (osrfRouterClass*) c;
392 client_disconnect( rclass->connection );
393 client_free( rclass->connection );
395 osrfHashIteratorReset( rclass->itr );
396 osrfRouterNode* node;
398 while( (node = osrfHashIteratorNext(rclass->itr)) )
399 osrfRouterClassRemoveNode( rclass->router, classname, node->remoteId );
401 osrfHashIteratorFree(rclass->itr);
402 osrfHashFree(rclass->nodes);
408 void osrfRouterNodeFree( char* remoteId, void* n ) {
410 osrfRouterNode* node = (osrfRouterNode*) n;
411 free(node->remoteId);
412 message_free(node->lastMessage);
417 void osrfRouterFree( osrfRouter* router ) {
420 free(router->domain);
422 free(router->resource);
423 free(router->password);
425 osrfStringArrayFree( router->trustedClients );
426 osrfStringArrayFree( router->trustedServers );
428 client_free( router->connection );
434 osrfRouterClass* osrfRouterFindClass( osrfRouter* router, char* classname ) {
435 if(!( router && router->classes && classname )) return NULL;
436 return (osrfRouterClass*) osrfHashGet( router->classes, classname );
440 osrfRouterNode* osrfRouterClassFindNode( osrfRouterClass* rclass, char* remoteId ) {
441 if(!(rclass && remoteId)) return NULL;
442 return (osrfRouterNode*) osrfHashGet( rclass->nodes, remoteId );
446 int __osrfRouterFillFDSet( osrfRouter* router, fd_set* set ) {
447 if(!(router && router->classes && set)) return -1;
450 int maxfd = router->ROUTER_SOCKFD;
455 osrfRouterClass* class = NULL;
456 osrfHashIterator* itr = osrfNewHashIterator(router->classes);
458 while( (class = osrfHashIteratorNext(itr)) ) {
459 char* classname = itr->current;
461 if( classname && (class = osrfRouterFindClass( router, classname )) ) {
462 sockid = class->ROUTER_SOCKFD;
464 if( osrfUtilsCheckFileDescriptor( sockid ) ) {
466 osrfLogWarning(OSRF_LOG_MARK,
467 "Removing router class '%s' because of a bad top-level file descriptor [%d]", classname, sockid);
468 osrfRouterRemoveClass( router, classname );
471 if( sockid > maxfd ) maxfd = sockid;
477 osrfHashIteratorFree(itr);
483 int osrfRouterHandleAppRequest( osrfRouter* router, transport_message* msg ) {
489 int num_msgs = osrf_message_deserialize( msg->body, arr, T );
490 osrfMessage* omsg = NULL;
493 for( i = 0; i != num_msgs; i++ ) {
495 if( !(omsg = arr[i]) ) continue;
497 switch( omsg->m_type ) {
500 osrfRouterRespondConnect( router, msg, omsg );
504 osrfRouterProcessAppRequest( router, msg, omsg );
510 osrfMessageFree( omsg );
516 int osrfRouterRespondConnect( osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
517 if(!(router && msg && omsg)) return -1;
519 osrfMessage* success = osrf_message_init( STATUS, omsg->thread_trace, omsg->protocol );
521 osrfLogDebug( OSRF_LOG_MARK, "router recevied a CONNECT message from %s", msg->sender );
523 osrf_message_set_status_info(
524 success, "osrfConnectStatus", "Connection Successful", OSRF_STATUS_OK );
526 char* data = osrf_message_serialize(success);
528 transport_message* return_m = message_init(
529 data, "", msg->thread, msg->sender, "" );
531 client_send_message(router->connection, return_m);
534 osrf_message_free(success);
535 message_free(return_m);
542 int osrfRouterProcessAppRequest( osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
544 if(!(router && msg && omsg && omsg->method_name)) return -1;
546 osrfLogInfo( OSRF_LOG_MARK, "Router received app request: %s", omsg->method_name );
548 jsonObject* jresponse = NULL;
549 if(!strcmp( omsg->method_name, ROUTER_REQUEST_CLASS_LIST )) {
552 jresponse = jsonParseString("[]");
554 osrfStringArray* keys = osrfHashKeys( router->classes );
555 for( i = 0; i != keys->size; i++ )
556 jsonObjectPush( jresponse, jsonNewObject(osrfStringArrayGetString( keys, i )) );
557 osrfStringArrayFree(keys);
560 } else if(!strcmp( omsg->method_name, ROUTER_REQUEST_FULL_STATS )) {
562 osrfRouterClass* class;
563 osrfRouterNode* node;
564 jresponse = jsonParseString("{}");
566 osrfHashIterator* class_itr = osrfNewHashIterator(router->classes);
567 while( (class = osrfHashIteratorNext(class_itr)) ) {
569 jsonObject* class_res = jsonParseString("{}");
570 char* classname = class_itr->current;
572 osrfHashIterator* node_itr = osrfNewHashIterator(class->nodes);
573 while( (node = osrfHashIteratorNext(node_itr)) ) {
574 jsonObjectSetKey( class_res, node->remoteId, jsonNewNumberObject( (double) node->count ) );
576 osrfHashIteratorFree(node_itr);
578 jsonObjectSetKey( jresponse, classname, class_res );
581 osrfHashIteratorFree(class_itr);
583 } else if(!strcmp( omsg->method_name, ROUTER_REQUEST_CLASS_STATS )) {
585 osrfRouterClass* class;
586 osrfRouterNode* node;
588 jresponse = jsonParseString("{}");
590 osrfHashIterator* class_itr = osrfNewHashIterator(router->classes);
591 while( (class = osrfHashIteratorNext(class_itr)) ) {
594 char* classname = class_itr->current;
596 osrfHashIterator* node_itr = osrfNewHashIterator(class->nodes);
597 while( (node = osrfHashIteratorNext(node_itr)) ) {
598 count += node->count;
600 osrfHashIteratorFree(node_itr);
602 jsonObjectSetKey( jresponse, classname, jsonNewNumberObject( (double) count ) );
605 osrfHashIteratorFree(class_itr);
609 return osrfRouterHandleMethodNFound( router, msg, omsg );
613 osrfRouterHandleAppResponse( router, msg, omsg, jresponse );
614 jsonObjectFree(jresponse);
622 int osrfRouterHandleMethodNFound(
623 osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
625 osrf_message* err = osrf_message_init( STATUS, omsg->thread_trace, 1);
626 osrf_message_set_status_info( err,
627 "osrfMethodException", "Router method not found", OSRF_STATUS_NOTFOUND );
629 char* data = osrf_message_serialize(err);
631 transport_message* tresponse = message_init(
632 data, "", msg->thread, msg->sender, msg->recipient );
634 client_send_message(router->connection, tresponse );
637 osrf_message_free( err );
638 message_free(tresponse);
644 int osrfRouterHandleAppResponse( osrfRouter* router,
645 transport_message* msg, osrfMessage* omsg, jsonObject* response ) {
647 if( response ) { /* send the response message */
649 osrfMessage* oresponse = osrf_message_init(
650 RESULT, omsg->thread_trace, omsg->protocol );
652 char* json = jsonObjectToJSON(response);
653 osrf_message_set_result_content( oresponse, json);
655 char* data = osrf_message_serialize(oresponse);
656 osrfLogDebug( OSRF_LOG_MARK, "Responding to client app request with data: \n%s\n", data );
658 transport_message* tresponse = message_init(
659 data, "", msg->thread, msg->sender, msg->recipient );
661 client_send_message(router->connection, tresponse );
663 osrfMessageFree(oresponse);
664 message_free(tresponse);
670 /* now send the 'request complete' message */
671 osrf_message* status = osrf_message_init( STATUS, omsg->thread_trace, 1);
672 osrf_message_set_status_info( status, "osrfConnectStatus", "Request Complete", OSRF_STATUS_COMPLETE );
674 char* statusdata = osrf_message_serialize(status);
676 transport_message* sresponse = message_init(
677 statusdata, "", msg->thread, msg->sender, msg->recipient );
678 client_send_message(router->connection, sresponse );
682 osrfMessageFree(status);
683 message_free(sresponse);