1 #include <sys/select.h>
3 #include "opensrf/utils.h"
4 #include "opensrf/log.h"
5 #include "opensrf/osrf_list.h"
6 #include "opensrf/string_array.h"
7 #include "opensrf/osrf_hash.h"
8 #include "osrf_router.h"
9 #include "opensrf/transport_client.h"
10 #include "opensrf/transport_message.h"
11 #include "opensrf/osrf_message.h"
15 @brief Implementation of osrfRouter.
17 The router opens multiple Jabber sessions for the same username and domain, one for
18 each server class. The Jabber IDs for these sessions are distinguished by the use of
19 the class names as Jabber resource names.
21 For each server class there may be multiple server nodes. Each node corresponds to a
22 listener process for a service.
26 @brief Collection of server classes, with connection parameters for Jabber.
/* Top-level router object: Jabber logon parameters, the trusted-domain lists, and the registered server classes. */
28 struct osrfRouterStruct {
31 @brief Hash store of server classes.
33 For each entry, the key is the class name, and the corresponding datum is an
37 osrfHashIterator* class_itr; /**< For traversing the list of classes. */
38 char* domain; /**< Domain name of Jabber server. */
39 char* name; /**< Router's username for the Jabber logon. */
40 char* resource; /**< Router's resource name for the Jabber logon. */
41 char* password; /**< Router's password for the Jabber logon. */
42 int port; /**< Jabber's port number. */
43 volatile sig_atomic_t stop; /**< To be set by signal handler to interrupt main loop. */
45 /** Array of client domains that we allow to send requests through us. */
46 osrfStringArray* trustedClients;
47 /** Array of server domains that we allow to register, etc. with us. */
48 osrfStringArray* trustedServers;
49 /** List of osrfMessages to be returned from osrfMessageDeserialize() */
50 osrfList* message_list;
/* Top-level Jabber connection of the router itself; each server class has its own separate connection. */
52 transport_client* connection;
56 @brief Maintains a set of server nodes belonging to the same class.
/* Per-class state: the owning router, the registered nodes plus the iterator used to pick the next node, and the Jabber connection opened for the class. */
58 struct _osrfRouterClassStruct {
59 osrfRouter* router; /**< The osrfRouter that owns this osrfRouterClass. */
60 osrfHashIterator* itr; /**< Iterator for set of osrfRouterNodes. */
62 @brief Hash store of server nodes.
64 The key of each entry is a node name, and the associated data is an osrfRouterNode.
65 We install a callback for freeing the entries.
68 /** The transport_client used for communicating with this server. */
69 transport_client* connection;
71 typedef struct _osrfRouterClassStruct osrfRouterClass;
74 @brief Represents a link to a single server's inbound connection.
/* One registered listener for a class, identified by the Jabber ID it registered from. */
76 struct _osrfRouterNodeStruct {
77 char* remoteId; /**< Send message to me via this login. */
78 int count; /**< How many messages have been sent to this node. */
79 transport_message* lastMessage; /**< Most recent message routed to this node; reused if the node bounces. */
81 typedef struct _osrfRouterNodeStruct osrfRouterNode;
/* Forward declarations of the file-local (static) helpers defined further down. */
83 static osrfRouterClass* osrfRouterAddClass( osrfRouter* router, const char* classname );
84 static void osrfRouterClassAddNode( osrfRouterClass* rclass, const char* remoteId );
85 static void osrfRouterHandleCommand( osrfRouter* router, const transport_message* msg );
86 static void osrfRouterClassHandleMessage( osrfRouter* router,
87 osrfRouterClass* rclass, const transport_message* msg );
88 static void osrfRouterRemoveClass( osrfRouter* router, const char* classname );
89 static void osrfRouterClassRemoveNode( osrfRouter* router, const char* classname,
90 const char* remoteId );
/* The next two match the callback shape passed to osrfHashSetCallback(). */
91 static void osrfRouterClassFree( char* classname, void* rclass );
92 static void osrfRouterNodeFree( char* remoteId, void* node );
93 static osrfRouterClass* osrfRouterFindClass( osrfRouter* router, const char* classname );
94 static osrfRouterNode* osrfRouterClassFindNode( osrfRouterClass* rclass,
95 const char* remoteId );
96 static int _osrfRouterFillFDSet( osrfRouter* router, fd_set* set );
97 static void osrfRouterHandleIncoming( osrfRouter* router );
98 static void osrfRouterClassHandleIncoming( osrfRouter* router,
99 const char* classname, osrfRouterClass* class );
100 static transport_message* osrfRouterClassHandleBounce( osrfRouter* router,
101 const char* classname, osrfRouterClass* rclass, const transport_message* msg );
102 static void osrfRouterHandleAppRequest( osrfRouter* router, const transport_message* msg );
103 static void osrfRouterRespondConnect( osrfRouter* router, const transport_message* msg,
104 const osrfMessage* omsg );
105 static void osrfRouterProcessAppRequest( osrfRouter* router, const transport_message* msg,
106 const osrfMessage* omsg );
107 static void osrfRouterSendAppResponse( osrfRouter* router, const transport_message* msg,
108 const osrfMessage* omsg, const jsonObject* response );
109 static void osrfRouterHandleMethodNFound( osrfRouter* router,
110 const transport_message* msg, const osrfMessage* omsg );
/* Router commands; compared against the router_command of a message in osrfRouterHandleCommand(). */
112 #define ROUTER_REGISTER "register"
113 #define ROUTER_UNREGISTER "unregister"
/* Method names of the informational requests; compared against omsg->method_name in osrfRouterProcessAppRequest(). */
115 #define ROUTER_REQUEST_CLASS_LIST "opensrf.router.info.class.list"
116 #define ROUTER_REQUEST_STATS_NODE_FULL "opensrf.router.info.stats.class.node.all"
117 #define ROUTER_REQUEST_STATS_CLASS_FULL "opensrf.router.info.stats.class.all"
118 #define ROUTER_REQUEST_STATS_CLASS "opensrf.router.info.stats.class"
119 #define ROUTER_REQUEST_STATS_CLASS_SUMMARY "opensrf.router.info.stats.class.summary"
122 @brief Stop the otherwise endless main loop of the router.
123 @param router Pointer to the osrfRouter to be stopped.
125 To be called by a signal handler. We don't stop the loop immediately; we just set
126 a switch that the main loop checks on each iteration.
128 void router_stop( osrfRouter* router )
135 @brief Allocate and initialize a new osrfRouter.
136 @param domain Domain name of Jabber server.
137 @param name Router's username for the Jabber logon.
138 @param resource Router's resource name for the Jabber logon.
139 @param password Router's password for the Jabber logon.
140 @param port Jabber's port number.
141 @param trustedClients Array of client domains that we allow to send requests through us.
142 @param trustedServers Array of server domains that we allow to register, etc. with us.
143 @return Pointer to the newly allocated osrfRouter, or NULL upon error.
145 Don't connect to Jabber yet. We'll do that later, upon a call to osrfRouterConnect().
147 The calling code is responsible for freeing the osrfRouter by calling osrfRouterFree().
/* Constructor: copies the logon strings, stores the two trusted-domain arrays by pointer, creates the (initially empty) class hash, and prepares the top-level connection without logging on. */
149 osrfRouter* osrfNewRouter(
150 const char* domain, const char* name,
151 const char* resource, const char* password, int port,
152 osrfStringArray* trustedClients, osrfStringArray* trustedServers ) {
// Every argument is required; note that a port of 0 fails this test as well.
154 if(!( domain && name && resource && password && port && trustedClients && trustedServers ))
157 osrfRouter* router = safe_malloc(sizeof(osrfRouter));
// Private copies of the strings; osrfRouterFree() releases them.
158 router->domain = strdup(domain);
159 router->name = strdup(name);
160 router->password = strdup(password);
161 router->resource = strdup(resource);
// The arrays are not copied; osrfRouterFree() calls osrfStringArrayFree() on these same pointers.
165 router->trustedClients = trustedClients;
166 router->trustedServers = trustedServers;
// Entries removed from the class hash are released through osrfRouterClassFree().
169 router->classes = osrfNewHash();
170 osrfHashSetCallback(router->classes, &osrfRouterClassFree);
171 router->class_itr = osrfNewHashIterator( router->classes );
172 router->message_list = NULL; // We'll allocate one later
174 // Prepare to connect to Jabber, as a non-component, over TCP (not UNIX domain).
175 router->connection = client_init( domain, port, NULL, 0 );
181 @brief Connect to Jabber.
182 @param router Pointer to the osrfRouter to connect to Jabber.
183 @return 0 if successful, or -1 on error.
185 Allow up to 10 seconds for the logon to succeed.
187 We connect over TCP (not over a UNIX domain socket), as a non-component.
/* Log the top-level connection on to Jabber with the credentials stored in the router. */
189 int osrfRouterConnect( osrfRouter* router ) {
190 if(!router) return -1;
// The 10 is the logon timeout in seconds mentioned in the header comment.
191 int ret = client_connect( router->connection, router->name,
192 router->password, router->resource, 10, AUTH_DIGEST );
// client_connect() signals failure with 0; translate that to this function's -1.
193 if( ret == 0 ) return -1;
198 @brief Enter endless loop to receive and respond to input.
199 @param router Pointer to the osrfRouter that's looping.
201 On each iteration: wait for incoming messages to arrive on any of our sockets -- i.e.
202 either the top level socket belonging to the router or any of the lower level sockets
203 belonging to the classes. React to the incoming activity as needed.
205 We don't exit the loop until we receive a signal to stop, or until we encounter an error.
/* Main loop: select() across the router socket and every class socket, then dispatch whatever became readable. */
// NOTE(review): the embedded line numbers show gaps in this listing; the declarations of set and selectret are among the lines not visible.
207 void osrfRouterRun( osrfRouter* router ) {
208 if(!(router && router->classes)) return;
210 int routerfd = client_sock_fd( router->connection );
213 // Loop until a signal handler sets router->stop
214 while( ! router->stop ) {
// The fd_set is rebuilt on every pass; the helper returns the highest descriptor it loaded.
// NOTE(review): the helper can return -1; no check for that appears on the visible lines.
217 int maxfd = _osrfRouterFillFDSet( router, &set );
219 // Wait indefinitely for an incoming message
220 if( (selectret = select(maxfd + 1, &set, NULL, NULL, NULL)) < 0 ) {
221 if( EINTR == errno ) {
223 osrfLogWarning( OSRF_LOG_MARK, "Top level select call interrupted by signal" );
227 continue; // Irrelevant signal; ignore it
229 osrfLogWarning( OSRF_LOG_MARK, "Top level select call failed with errno %d: %s",
230 errno, strerror( errno ) );
235 /* see if there is a top level router message */
236 if( FD_ISSET(routerfd, &set) ) {
237 osrfLogDebug( OSRF_LOG_MARK, "Top router socket is active: %d", routerfd );
238 osrfRouterHandleIncoming( router );
241 /* Check each of the connected classes and see if they have data to route */
242 osrfRouterClass* class;
243 osrfHashIterator* itr = router->class_itr; // remove a layer of indirection
244 osrfHashIteratorReset( itr );
246 while( (class = osrfHashIteratorNext(itr)) ) { // for each class
248 const char* classname = osrfHashIteratorKey(itr);
252 osrfLogDebug( OSRF_LOG_MARK, "Checking %s for activity...", classname );
254 int sockfd = client_sock_fd( class->connection );
255 if(FD_ISSET( sockfd, &set )) {
256 osrfLogDebug( OSRF_LOG_MARK, "Socket is active: %d", sockfd );
// NOTE(review): handling a bounce can remove this class while itr still points at it; confirm the iterator tolerates that.
257 osrfRouterClassHandleIncoming( router, classname, class );
266 @brief Handle incoming requests to the router.
267 @param router Pointer to the osrfRouter.
269 Read all available input messages from the top-level transport_client. For each one:
270 if the domain of the sender's Jabber id is on the list of approved domains, pass the
271 message to osrfRouterHandleCommand() (if there's a command) or osrfRouterHandleAppRequest()
272 (if there isn't). If the domain is @em not on the approved list, log a warning and
/* Drain the top-level connection and dispatch each message from a trusted server domain. */
275 static void osrfRouterHandleIncoming( osrfRouter* router ) {
278 transport_message* msg = NULL;
// Keep receiving until client_recv() returns NULL (second argument presumably a zero timeout -- confirm).
280 while( (msg = client_recv( router->connection, 0 )) ) { // for each message
284 osrfLogDebug(OSRF_LOG_MARK,
285 "osrfRouterHandleIncoming(): investigating message from %s", msg->sender);
287 /* if the server is not on a trusted domain, drop the message */
// len is the size of the whole sender JID plus terminator; the domain buffer itself is declared on a line not visible here.
288 int len = strlen(msg->sender) + 1;
290 jid_get_domain( msg->sender, domain, len - 1 );
// Top-level traffic is checked against the trusted *server* list.
292 if(osrfStringArrayContains( router->trustedServers, domain)) {
294 // If there's a command, obey it. Otherwise, treat
295 // the message as an app session level request.
296 if( msg->router_command && *msg->router_command )
297 osrfRouterHandleCommand( router, msg );
299 osrfRouterHandleAppRequest( router, msg );
302 osrfLogWarning( OSRF_LOG_MARK,
303 "Received message from un-trusted server domain %s", msg->sender);
311 @brief Handle all available incoming messages for a router class.
312 @param router Pointer to the osrfRouter.
313 @param classname Class name.
314 @param class Pointer to the osrfRouterClass.
316 For each message: if the sender is on a trusted domain, process the message.
/* Drain one class connection: forward ordinary messages to a node, and re-route (or give up on) bounced ones. */
318 static void osrfRouterClassHandleIncoming( osrfRouter* router, const char* classname,
319 osrfRouterClass* class ) {
320 if(!(router && class)) return;
322 transport_message* msg;
323 osrfLogDebug( OSRF_LOG_MARK, "osrfRouterClassHandleIncoming()");
325 // For each incoming message for this class:
326 while( (msg = client_recv( class->connection, 0 )) ) {
328 // Save the transaction id so that we can incorporate it
329 // into any relevant messages
330 osrfLogSetXid(msg->osrf_xid);
334 osrfLogDebug(OSRF_LOG_MARK,
335 "osrfRouterClassHandleIncoming(): investigating message from %s", msg->sender);
337 /* if the client is not from a trusted domain, drop the message */
// len is the size of the whole sender JID plus terminator; the domain buffer is declared on a line not visible here.
338 int len = strlen(msg->sender) + 1;
340 jid_get_domain( msg->sender, domain, len - 1 );
// Class-level traffic is checked against the trusted *client* list.
342 if(osrfStringArrayContains( router->trustedClients, domain )) {
344 if( msg->is_error ) {
346 // A previous message bounced. Try to send a clone of it to a
347 // different node of the same class.
349 // First make a local copy of the class name. If the class gets
350 // deleted, the classname parameter becomes invalid.
351 char classname_copy[ strlen( classname ) + 1 ];
352 strcpy( classname_copy, classname );
// NOTE(review): the original classname pointer, not classname_copy, is handed to the bounce handler, which may remove the class -- confirm this is safe.
354 transport_message* bouncedMessage = osrfRouterClassHandleBounce(
355 router, classname, class, msg );
356 /* handle bounced message */
357 if( !bouncedMessage ) {
358 /* we have no one to send the requested message to */
362 // See if the class still exists
363 if( osrfHashGet( router->classes, classname_copy ) )
364 continue; // It does; keep going
366 break; // It doesn't; don't try to read from it any more
// A clone was produced: route it to another node, then release the clone.
368 osrfRouterClassHandleMessage( router, class, bouncedMessage );
369 message_free( bouncedMessage );
371 osrfRouterClassHandleMessage( router, class, msg );
374 osrfLogWarning( OSRF_LOG_MARK,
375 "Received client message from untrusted client domain %s", domain );
380 osrfLogClearXid(); // We're done with this transaction id
385 @brief Handle a top level router command.
386 @param router Pointer to the osrfRouter.
387 @param msg Pointer to the transport_message to be handled.
389 Currently supported commands:
390 - "register" -- Add a server class and/or a server node to our lists.
391 - "unregister" -- Remove a node from a class, and the class as well if no nodes are
/* Execute a "register" or "unregister" command carried by a top-level message. */
394 static void osrfRouterHandleCommand( osrfRouter* router, const transport_message* msg ) {
// router_command itself is not checked here; the caller in osrfRouterHandleIncoming() only dispatches when it is non-empty.
395 if(!(router && msg && msg->router_class)) return;
397 if( !strcmp( msg->router_command, ROUTER_REGISTER ) ) {
399 osrfLogInfo( OSRF_LOG_MARK, "Registering class %s", msg->router_class );
401 // Add the server class to the list, if it isn't already there
402 osrfRouterClass* class = osrfRouterFindClass( router, msg->router_class );
404 class = osrfRouterAddClass( router, msg->router_class );
406 // Add the node to the osrfRouterClass's list, if it isn't already there
// The sender's Jabber ID serves as the node key.
407 if(class && ! osrfRouterClassFindNode( class, msg->sender ) )
408 osrfRouterClassAddNode( class, msg->sender );
410 } else if( !strcmp( msg->router_command, ROUTER_UNREGISTER ) ) {
412 if( msg->router_class && *msg->router_class ) {
413 osrfLogInfo( OSRF_LOG_MARK, "Unregistering router class %s", msg->router_class );
// Removing the last node also removes the class (see osrfRouterClassRemoveNode()).
414 osrfRouterClassRemoveNode( router, msg->router_class, msg->sender );
421 @brief Add an osrfRouterClass to a router, and open a connection for it.
422 @param router Pointer to the osrfRouter.
423 @param classname The name of the class to be added.
424 @return A pointer to the new osrfRouterClass, or NULL upon error.
426 Open a Jabber session to be used for this server class. The Jabber ID incorporates the
427 class name as the resource name.
/* Create a class entry, open a dedicated Jabber session for it, and store it in the router under its class name. */
429 static osrfRouterClass* osrfRouterAddClass( osrfRouter* router, const char* classname ) {
430 if(!(router && router->classes && classname)) return NULL;
432 osrfRouterClass* class = safe_malloc(sizeof(osrfRouterClass));
433 class->nodes = osrfNewHash();
434 class->itr = osrfNewHashIterator(class->nodes);
// Nodes removed from the hash are released through osrfRouterNodeFree().
435 osrfHashSetCallback(class->nodes, &osrfRouterNodeFree);
436 class->router = router;
// Same server and port as the router itself.
438 class->connection = client_init( router->domain, router->port, NULL, 0 );
// Same username and password as the router; the class name is passed where the router passes its resource name.
440 if(!client_connect( class->connection, router->name,
441 router->password, classname, 10, AUTH_DIGEST ) ) {
442 // Cast away the constness of classname. Though ugly, this
443 // cast is benign because osrfRouterClassFree doesn't actually
444 // write through the pointer. We can't readily change its
445 // signature because it is used for a function pointer, and
446 // we would have to change other signatures the same way.
447 osrfRouterClassFree( (char *) classname, class );
// Logon succeeded: register the class under its name.
451 osrfHashSet( router->classes, class, classname );
457 @brief Add a new server node to an osrfRouterClass.
458 @param rclass Pointer to the osrfRouterClass to which we are to add the node.
459 @param remoteId The remote login of the osrfRouterNode.
/* Allocate a node for remoteId and store it in the class's node hash, keyed by remoteId. */
461 static void osrfRouterClassAddNode( osrfRouterClass* rclass, const char* remoteId ) {
462 if(!(rclass && rclass->nodes && remoteId)) return;
464 osrfLogInfo( OSRF_LOG_MARK, "Adding router node for remote id %s", remoteId );
466 osrfRouterNode* node = safe_malloc(sizeof(osrfRouterNode));
// Nothing has been routed to this node yet.
468 node->lastMessage = NULL;
// The node keeps its own copy of the id; osrfRouterNodeFree() frees it.
469 node->remoteId = strdup(remoteId);
471 osrfHashSet( rclass->nodes, node, remoteId );
475 @brief Handle an input message representing a Jabber error stanza.
476 @param router Pointer to the current osrfRouter.
477 @param classname Name of the class to which the error stanza was sent.
478 @param rclass Pointer to the osrfRouterClass to which the error stanza was sent.
479 @param msg Pointer to the transport_message representing the error stanza.
480 @return Pointer to a newly allocated transport_message; or NULL (see remarks).
482 The presumption is that the relevant node is dead. If another node is available for
483 the same class, then remove the dead one, create a clone of the message to be sent
484 elsewhere, and return a pointer to it. If there is no other node for the same class,
485 send a cancel message back to the sender, remove both the node and the class it belongs
486 to, and return NULL. If we can't even do that because the entire class is dead, log
487 a message to that effect and return NULL.
/* React to an error stanza from a node: drop the node and either clone its last message for another node or, if it was the only node, send an error back to the requester. */
489 static transport_message* osrfRouterClassHandleBounce( osrfRouter* router,
490 const char* classname, osrfRouterClass* rclass, const transport_message* msg ) {
492 osrfLogDebug( OSRF_LOG_MARK, "osrfRouterClassHandleBounce()");
494 osrfLogInfo( OSRF_LOG_MARK, "Received network layer error message from %s", msg->sender );
// The sender of the error stanza identifies the node that failed.
495 osrfRouterNode* node = osrfRouterClassFindNode( rclass, msg->sender );
497 osrfLogInfo( OSRF_LOG_MARK,
498 "network error occurred after we removed the class.. ignoring");
502 if( osrfHashGetCount(rclass->nodes) == 1 ) { /* the last node is dead */
504 if( node->lastMessage ) {
505 osrfLogWarning( OSRF_LOG_MARK,
506 "We lost the last node in the class, responding with error and removing...");
// Build the reply from the saved message: it goes to router_from, with the old recipient in the sender position.
508 transport_message* error = message_init(
509 node->lastMessage->body, node->lastMessage->subject,
510 node->lastMessage->thread, node->lastMessage->router_from,
511 node->lastMessage->recipient );
512 message_set_osrf_xid(error, node->lastMessage->osrf_xid);
513 set_msg_error( error, "cancel", 501 );
515 /* send the error message back to the original sender */
516 client_send_message( rclass->connection, error );
517 message_free( error );
520 /* remove the dead node */
// This was the only node, so the class is removed along with it.
521 osrfRouterClassRemoveNode( router, classname, msg->sender);
// Other nodes remain: prepare a clone that the caller can route to one of them.
526 transport_message* lastSent = NULL;
528 if( node->lastMessage ) {
529 osrfLogDebug( OSRF_LOG_MARK, "Cloning lastMessage so next node can send it");
// Recipient is left empty; osrfRouterClassHandleMessage() fills in the node it picks.
530 lastSent = message_init( node->lastMessage->body,
531 node->lastMessage->subject, node->lastMessage->thread, "",
532 node->lastMessage->router_from );
533 message_set_router_info( lastSent, node->lastMessage->router_from,
534 NULL, NULL, NULL, 0 );
535 message_set_osrf_xid( lastSent, node->lastMessage->osrf_xid );
538 /* remove the dead node */
// The clone is built before this call because removing the node also frees node->lastMessage.
539 osrfRouterClassRemoveNode( router, classname, msg->sender);
546 @brief Forward a class-level message to a listener for the corresponding service.
547 @param router Pointer to the current osrfRouter.
548 @param rclass Pointer to the class to which the message is directed.
549 @param msg Pointer to the message to be forwarded.
551 Pick a node for the specified class, and forward the message to it.
553 We use an iterator, stored with the class, to maintain a position in the class's list
554 of nodes. Advance the iterator to pick the next node, and if we reach the end, go
555 back to the beginning of the list.
/* Forward msg to the next node of rclass, remembering the forwarded copy on that node. */
557 static void osrfRouterClassHandleMessage(
558 osrfRouter* router, osrfRouterClass* rclass, const transport_message* msg ) {
559 if(!(router && rclass && msg)) return;
561 osrfLogDebug( OSRF_LOG_MARK, "osrfRouterClassHandleMessage()");
563 // Pick a node, in a round-robin.
564 osrfRouterNode* node = osrfHashIteratorNext( rclass->itr );
565 if(!node) { // wrap around to the beginning of the list
566 osrfHashIteratorReset(rclass->itr);
567 node = osrfHashIteratorNext( rclass->itr );
570 if(node) { // should always be true -- no class without a node
572 // Build a transport message
// Addressed to the chosen node; the original sender is kept both as sender and as router_from.
573 transport_message* new_msg = message_init( msg->body,
574 msg->subject, msg->thread, node->remoteId, msg->sender );
575 message_set_router_info( new_msg, msg->sender, NULL, NULL, NULL, 0 );
576 message_set_osrf_xid( new_msg, msg->osrf_xid );
578 osrfLogInfo( OSRF_LOG_MARK, "Routing message:\nfrom: [%s]\nto: [%s]",
579 new_msg->router_from, new_msg->recipient );
581 // Save it for possible future reference
// The previously saved message is released first.
582 message_free( node->lastMessage );
583 node->lastMessage = new_msg;
// A return of 0 is the success case; the statement taken on success is on a line not visible here, and the warning below is the failure path.
586 if ( client_send_message( rclass->connection, new_msg ) == 0 )
590 message_prepare_xml(new_msg);
591 osrfLogWarning( OSRF_LOG_MARK, "Error sending message from %s to %s\n%s",
592 new_msg->sender, new_msg->recipient, new_msg->msg_xml );
594 // We don't free new_msg here because we saved it as node->lastMessage.
600 @brief Remove a given osrfRouterClass from an osrfRouter
601 @param router Pointer to the osrfRouter.
602 @param classname The name of the class to be removed.
604 Delete an osrfRouterClass from the router's list of classes. Indirectly (via a callback
605 function installed in the osrfHash), free the osrfRouterClass and any associated nodes.
/* Remove classname from the router's class hash; the hash callback (osrfRouterClassFree) releases the entry. */
607 static void osrfRouterRemoveClass( osrfRouter* router, const char* classname ) {
608 if( router && router->classes && classname ) {
609 osrfLogInfo( OSRF_LOG_MARK, "Removing router class %s", classname );
610 osrfHashRemove( router->classes, classname );
616 @brief Remove a node from a class. If the class thereby becomes empty, remove it as well.
617 @param router Pointer to the current osrfRouter.
618 @param classname Class name.
619 @param remoteId Identifier for the node to be removed.
/* Remove the node remoteId from class classname, then remove the class too if no nodes are left. */
621 static void osrfRouterClassRemoveNode(
622 osrfRouter* router, const char* classname, const char* remoteId ) {
624 if(!(router && router->classes && classname && remoteId)) // sanity check
627 osrfLogInfo( OSRF_LOG_MARK, "Removing router node %s", remoteId );
629 osrfRouterClass* class = osrfRouterFindClass( router, classname );
// NOTE(review): the lookup can return NULL; any guard for that is on a line not visible here.
631 osrfHashRemove( class->nodes, remoteId );
// An empty class is not kept around.
632 if( osrfHashGetCount(class->nodes) == 0 ) {
633 osrfRouterRemoveClass( router, classname );
640 @brief Free a router class object.
641 @param classname Class name (not used).
642 @param c Pointer to the osrfRouterClass, cast to a void pointer.
644 This function is invoked as a callback when we remove an osrfRouterClass from the
645 router's list of classes.
/* Hash callback: close the class's Jabber session and release its nodes, iterator and node hash. */
647 static void osrfRouterClassFree( char* classname, void* c ) {
650 osrfRouterClass* rclass = (osrfRouterClass*) c;
// Log off first, then release the connection object.
651 client_disconnect( rclass->connection );
652 client_free( rclass->connection );
// Remove every node explicitly; each removal goes through the node hash's free callback.
// NOTE(review): entries are removed while the same hash is being iterated -- confirm osrfHashIterator supports this.
654 osrfHashIteratorReset( rclass->itr );
655 osrfRouterNode* node;
657 while( (node = osrfHashIteratorNext(rclass->itr)) )
658 osrfHashRemove( rclass->nodes, node->remoteId );
660 osrfHashIteratorFree(rclass->itr);
661 osrfHashFree(rclass->nodes);
667 @brief Free an osrfRouterNode.
668 @param remoteId Jabber ID of node (not used).
669 @param n Pointer to the osrfRouterNode to be freed, cast to a void pointer.
671 This is a callback installed in an osrfHash (the nodes member of an osrfRouterClass).
/* Hash callback: release what a node owns, namely its remoteId copy and its saved last message. */
673 static void osrfRouterNodeFree( char* remoteId, void* n ) {
675 osrfRouterNode* node = (osrfRouterNode*) n;
676 free(node->remoteId);
677 message_free(node->lastMessage);
683 @brief Free an osrfRouter and everything it owns.
684 @param router Pointer to the osrfRouter to be freed.
686 The osrfRouterClasses and osrfRouterNodes are freed by callback functions installed in
/* Destructor: release the class hash and its iterator, the logon strings, the trusted-domain arrays, the message list and the top-level connection. */
689 void osrfRouterFree( osrfRouter* router ) {
692 osrfHashIteratorFree( router->class_itr);
// Freeing the hash is what releases the classes and nodes, per the header comment about callbacks.
693 osrfHashFree(router->classes);
694 free(router->domain);
696 free(router->resource);
697 free(router->password);
// These arrays were handed in to osrfNewRouter(); they are freed here, so the caller must not free them again.
699 osrfStringArrayFree( router->trustedClients );
700 osrfStringArrayFree( router->trustedServers );
701 osrfListFree( router->message_list );
703 client_free( router->connection );
709 @brief Given a class name, find the corresponding osrfRouterClass.
710 @param router Pointer to the osrfRouter that owns the osrfRouterClass.
711 @param classname Name of the class.
712 @return Pointer to a matching osrfRouterClass if found, or NULL if not.
/* Look up a class by name in the router's class hash; NULL arguments yield NULL. */
714 static osrfRouterClass* osrfRouterFindClass( osrfRouter* router, const char* classname ) {
715 if(!( router && router->classes && classname )) return NULL;
716 return (osrfRouterClass*) osrfHashGet( router->classes, classname );
721 @brief Find a given node for a given class.
722 @param rclass Pointer to the osrfRouterClass in which to search.
723 @param remoteId Jabber ID of the node for which to search.
724 @return Pointer to the matching osrfRouterNode, if found; otherwise NULL.
/* Look up a node by Jabber ID in the class's node hash; NULL arguments yield NULL. */
726 static osrfRouterNode* osrfRouterClassFindNode( osrfRouterClass* rclass,
727 const char* remoteId ) {
728 if(!(rclass && remoteId)) return NULL;
729 return (osrfRouterNode*) osrfHashGet( rclass->nodes, remoteId );
734 @brief Fill an fd_set with all the sockets owned by the osrfRouter.
735 @param router Pointer to the osrfRouter whose sockets are to be used.
736 @param set Pointer to the fd_set that is to be filled.
737 @return The largest file descriptor loaded into the fd_set; or -1 upon error.
739 There's one socket for the osrfRouter as a whole, and one for each osrfRouterClass
740 that belongs to it. We load them all.
/* Load the router socket and every class socket into set, dropping classes whose descriptor is bad, and track the largest descriptor. */
// NOTE(review): the lines that clear and fill the fd_set and declare sockid are not visible in this listing.
742 static int _osrfRouterFillFDSet( osrfRouter* router, fd_set* set ) {
743 if(!(router && router->classes && set)) return -1;
// Start with the router's own socket as the running maximum.
746 int maxfd = client_sock_fd( router->connection );
751 osrfRouterClass* class = NULL;
752 osrfHashIterator* itr = router->class_itr;
753 osrfHashIteratorReset( itr );
755 while( (class = osrfHashIteratorNext(itr)) ) {
756 const char* classname = osrfHashIteratorKey(itr);
// The class is looked up again by name even though the iterator already returned it.
758 if( classname && (class = osrfRouterFindClass( router, classname )) ) {
759 sockid = client_sock_fd( class->connection );
// A non-zero result is treated as a bad descriptor.
761 if( osrfUtilsCheckFileDescriptor( sockid ) ) {
763 osrfLogWarning(OSRF_LOG_MARK,
764 "Removing router class '%s' because of a bad top-level file descriptor [%d]",
// NOTE(review): this removes an entry from the hash currently being iterated -- confirm the iterator tolerates it.
766 osrfRouterRemoveClass( router, classname );
769 if( sockid > maxfd ) maxfd = sockid;
779 @brief Handle a router-level message that isn't a command; presumed to be an app request.
780 @param router Pointer to the current osrfRouter.
781 @param msg Pointer to the incoming message.
783 The body of the transport_message is a JSON string, specifically a JSON array.
784 Translate the JSON into a series of osrfMessages, one for each element of the JSON
785 array. Process each osrfMessage in turn. Each message is either a CONNECT or a
/* Deserialize the message body into osrfMessages and dispatch each one by message type. */
788 static void osrfRouterHandleAppRequest( osrfRouter* router, const transport_message* msg ) {
790 // Translate the JSON into a list of osrfMessages
// The router's existing list is passed in and the returned list is stored back on the router.
// NOTE(review): message_list is dereferenced in the loop below with no NULL check visible -- confirm osrfMessageDeserialize() cannot return NULL for a malformed body.
791 router->message_list = osrfMessageDeserialize( msg->body, router->message_list );
792 osrfMessage* omsg = NULL;
794 // Process each osrfMessage
796 for( i = 0; i < router->message_list->size; ++i ) {
798 omsg = osrfListGetIndex( router->message_list, i );
// The case labels are on lines not visible here; per the header comment they are CONNECT and REQUEST.
801 switch( omsg->m_type ) {
804 osrfRouterRespondConnect( router, msg, omsg );
808 osrfRouterProcessAppRequest( router, msg, omsg );
// Each message is freed as soon as it has been handled.
815 osrfMessageFree( omsg );
823 @brief Respond to a CONNECT message.
824 @param router Pointer to the current osrfRouter.
825 @param msg Pointer to the transport_message that the osrfMessage came from.
826 @param omsg Pointer to the osrfMessage to be processed.
828 An application is trying to connect to the router. Reply with a STATUS message
831 "CONNECT" is a bit of a misnomer. We don't establish a stateful session; i.e. we
832 don't retain any memory of this message. We just confirm that the router is alive,
833 and that the client has a good address for it.
/* Answer a CONNECT with a STATUS message reporting a successful connection, sent over the top-level connection. */
835 static void osrfRouterRespondConnect( osrfRouter* router, const transport_message* msg,
836 const osrfMessage* omsg ) {
837 if(!(router && msg && omsg))
840 osrfLogDebug( OSRF_LOG_MARK, "router received a CONNECT message from %s", msg->sender );
842 // Build a success message
// The reply reuses the thread_trace and protocol of the incoming osrfMessage.
843 osrfMessage* success = osrf_message_init( STATUS, omsg->thread_trace, omsg->protocol );
844 osrf_message_set_status_info(
845 success, "osrfConnectStatus", "Connection Successful", OSRF_STATUS_OK );
847 // Translate the success message into JSON,
848 // and then package the JSON in a transport message
849 char* data = osrf_message_serialize(success);
// Only the serialized text is needed from here on.
850 osrfMessageFree( success );
851 transport_message* return_msg = message_init(
852 data, // message payload, translated from osrfMessage
854 msg->thread, // same thread
855 msg->sender, // destination (client's Jabber ID)
856 "" // don't send our address; client already has it
860 client_send_message( router->connection, return_msg );
861 message_free( return_msg );
866 @brief Respond to a REQUEST message.
867 @param router Pointer to the current osrfRouter.
868 @param msg Pointer to the transport_message that the osrfMessage came from.
869 @param omsg Pointer to the osrfMessage to be processed.
871 Respond to an information request from an application. Most types of request involve
872 one or more counts of messages successfully routed.
874 Request types currently supported:
875 - "opensrf.router.info.class.list" -- list of class names.
876 - "opensrf.router.info.stats.class.summary" -- total count for a specified class.
877 - "opensrf.router.info.stats.class" -- count for every node of a specified class.
878 - "opensrf.router.info.stats.class.all" -- count for every node of every class.
879 - "opensrf.router.info.stats.class.node.all" -- total count for every class.
881 static void osrfRouterProcessAppRequest( osrfRouter* router, const transport_message* msg,
882 const osrfMessage* omsg ) {
884 if(!(router && msg && omsg && omsg->method_name))
887 osrfLogInfo( OSRF_LOG_MARK, "Router received app request: %s", omsg->method_name );
889 // Branch on the request type. Build a jsonObject as an answer to the request.
890 jsonObject* jresponse = NULL;
891 if(!strcmp( omsg->method_name, ROUTER_REQUEST_CLASS_LIST )) {
893 // Prepare an array of class names.
895 jresponse = jsonNewObjectType(JSON_ARRAY);
897 osrfStringArray* keys = osrfHashKeys( router->classes );
898 for( i = 0; i != keys->size; i++ )
899 jsonObjectPush( jresponse, jsonNewObject(osrfStringArrayGetString( keys, i )) );
900 osrfStringArrayFree(keys);
902 } else if(!strcmp( omsg->method_name, ROUTER_REQUEST_STATS_CLASS_SUMMARY )) {
904 // Prepare a count of all the messages successfully routed for a given class.
907 // class name is the first parameter
908 const char* classname = jsonObjectGetString( jsonObjectGetIndex( omsg->_params, 0 ) );
912 osrfRouterClass* class = osrfHashGet(router->classes, classname);
914 // For each node: add the count to the total.
915 osrfRouterNode* node;
916 osrfHashIterator* node_itr = osrfNewHashIterator(class->nodes);
917 while( (node = osrfHashIteratorNext(node_itr)) ) {
918 count += node->count;
919 // jsonObjectSetKey( class_res, node->remoteId,
920 // jsonNewNumberObject( (double) node->count ) );
922 osrfHashIteratorFree(node_itr);
924 jresponse = jsonNewNumberObject( (double) count );
926 } else if(!strcmp( omsg->method_name, ROUTER_REQUEST_STATS_CLASS )) {
928 // Prepare a hash for a given class. Key: the remoteId of a node. Datum: the
929 // number of messages successfully routed for that node.
931 // class name is the first parameter
932 const char* classname = jsonObjectGetString( jsonObjectGetIndex( omsg->_params, 0 ) );
936 jresponse = jsonNewObjectType(JSON_HASH);
937 osrfRouterClass* class = osrfHashGet(router->classes, classname);
939 // For each node: get the count and store it in the hash.
940 osrfRouterNode* node;
941 osrfHashIterator* node_itr = osrfNewHashIterator(class->nodes);
942 while( (node = osrfHashIteratorNext(node_itr)) ) {
943 jsonObjectSetKey( jresponse, node->remoteId,
944 jsonNewNumberObject( (double) node->count ) );
946 osrfHashIteratorFree(node_itr);
948 } else if(!strcmp( omsg->method_name, ROUTER_REQUEST_STATS_CLASS_FULL )) {
950 // Prepare a hash of hashes, giving the message counts for each node for each class.
952 osrfRouterClass* class;
953 osrfRouterNode* node;
954 jresponse = jsonNewObjectType(JSON_HASH); // Key: class name.
956 // Traverse the list of classes.
957 osrfHashIterator* class_itr = osrfNewHashIterator(router->classes);
958 while( (class = osrfHashIteratorNext(class_itr)) ) {
960 jsonObject* class_res = jsonNewObjectType(JSON_HASH); // Key: remoteId of node.
961 const char* classname = osrfHashIteratorKey(class_itr);
963 // Traverse the list of nodes for the current class.
964 osrfHashIterator* node_itr = osrfNewHashIterator(class->nodes);
965 while( (node = osrfHashIteratorNext(node_itr)) ) {
966 jsonObjectSetKey( class_res, node->remoteId,
967 jsonNewNumberObject( (double) node->count ) );
969 osrfHashIteratorFree(node_itr);
971 jsonObjectSetKey( jresponse, classname, class_res );
974 osrfHashIteratorFree(class_itr);
976 } else if(!strcmp( omsg->method_name, ROUTER_REQUEST_STATS_NODE_FULL )) {
978 // Prepare a hash. Key: class name. Datum: total number of successfully routed
979 // messages routed for nodes of that class.
981 osrfRouterClass* class;
982 osrfRouterNode* node;
983 jresponse = jsonNewObjectType(JSON_HASH);
985 osrfHashIterator* class_itr = osrfNewHashIterator(router->classes);
986 while( (class = osrfHashIteratorNext(class_itr)) ) { // For each class
989 const char* classname = osrfHashIteratorKey(class_itr);
991 osrfHashIterator* node_itr = osrfNewHashIterator(class->nodes);
992 while( (node = osrfHashIteratorNext(node_itr)) ) { // For each node
993 count += node->count;
995 osrfHashIteratorFree(node_itr);
997 jsonObjectSetKey( jresponse, classname, jsonNewNumberObject( (double) count ) );
1000 osrfHashIteratorFree(class_itr);
1002 } else { // None of the above
1004 osrfRouterHandleMethodNFound( router, msg, omsg );
1008 // Send the result back to the requester.
1009 osrfRouterSendAppResponse( router, msg, omsg, jresponse );
1010 jsonObjectFree(jresponse);
1015 @brief Respond to an invalid REQUEST message.
1016 @param router Pointer to the current osrfRouter.
1017 @param msg Pointer to the transport_message that contained the REQUEST message.
1018 @param omsg Pointer to the osrfMessage that contained the REQUEST.
1020 static void osrfRouterHandleMethodNFound( osrfRouter* router,
1021 const transport_message* msg, const osrfMessage* omsg ) {
1023 // Create an exception message
1024 osrfMessage* err = osrf_message_init( STATUS, omsg->thread_trace, 1 );
1025 osrf_message_set_status_info( err,
1026 "osrfMethodException", "Router method not found", OSRF_STATUS_NOTFOUND );
1028 // Translate it into JSON
1029 char* data = osrf_message_serialize(err);
1030 osrfMessageFree( err );
1032 // Wrap the JSON up in a transport_message
1033 transport_message* tresponse = message_init(
1034 data, "", msg->thread, msg->sender, msg->recipient );
1038 client_send_message( router->connection, tresponse );
1039 message_free( tresponse );
1044 @brief Send a response to a router REQUEST message.
1045 @param router Pointer to the current osrfRouter.
1046 @param msg Pointer to the transport_message that contained the REQUEST message.
1047 @param omsg Pointer to the osrfMessage that contained the REQUEST.
1048 @param response Pointer to the jsonObject that constitutes the response.
1050 static void osrfRouterSendAppResponse( osrfRouter* router, const transport_message* msg,
1051 const osrfMessage* omsg, const jsonObject* response ) {
1053 if( response ) { /* send the response message */
1055 // Turn the jsonObject into JSON, and load it into an osrfMessage
1056 osrfMessage* oresponse = osrf_message_init(
1057 RESULT, omsg->thread_trace, omsg->protocol );
1059 char* json = jsonObjectToJSON(response);
1060 osrf_message_set_result_content( oresponse, json );
1063 // Package the osrfMessage into a transport_message, and send it
1064 char* data = osrf_message_serialize(oresponse);
1065 osrfMessageFree(oresponse);
1066 osrfLogDebug( OSRF_LOG_MARK, "Responding to client app request with data: \n%s\n", data );
1068 transport_message* tresponse = message_init(
1069 data, "", msg->thread, msg->sender, msg->recipient );
1072 client_send_message(router->connection, tresponse );
1073 message_free(tresponse);
1076 /* now send the 'request complete' message */
1077 osrfMessage* status = osrf_message_init( STATUS, omsg->thread_trace, 1);
1078 osrf_message_set_status_info( status, "osrfConnectStatus", "Request Complete",
1079 OSRF_STATUS_COMPLETE );
1080 char* statusdata = osrf_message_serialize(status);
1081 osrfMessageFree(status);
1083 transport_message* sresponse = message_init(
1084 statusdata, "", msg->thread, msg->sender, msg->recipient );
1087 client_send_message(router->connection, sresponse );
1088 message_free(sresponse);