1 #include <sys/select.h>
3 #include "opensrf/utils.h"
4 #include "opensrf/log.h"
5 #include "opensrf/osrf_list.h"
6 #include "opensrf/string_array.h"
7 #include "opensrf/osrf_hash.h"
8 #include "osrf_router.h"
9 #include "opensrf/transport_client.h"
10 #include "opensrf/transport_message.h"
11 #include "opensrf/osrf_message.h"
15 @brief Implementation of osrfRouter.
17 The router opens multiple Jabber sessions for the same username and domain, one for
18 each server class. The Jabber IDs for these sessions are distinguished by the use of
19 the class names as Jabber resource names.
21 For each server class there may be multiple server nodes. Each node corresponds to a
22 listener process for a service.
26 @brief Collection of server classes, with connection parameters for Jabber.
// Top-level router state: the Jabber logon parameters, an iterator over the
// registered server classes, the trusted-domain lists, a reusable message
// list, and the transport_client used for the router-level session.
// NOTE(review): other code in this file reads router->classes, but the
// declaration of that member is not visible in this view -- confirm it.
28 struct osrfRouterStruct {
31 @brief Hash store of server classes.
33 For each entry, the key is the class name, and the corresponding datum is an
37 osrfHashIterator* class_itr; /**< For traversing the list of classes. */
38 char* domain; /**< Domain name of Jabber server. */
39 char* name; /**< Router's username for the Jabber logon. */
40 char* resource; /**< Router's resource name for the Jabber logon. */
41 char* password; /**< Router's password for the Jabber logon. */
42 int port; /**< Jabber's port number. */
43 volatile sig_atomic_t stop; /**< To be set by signal handler to interrupt main loop. */
// Both arrays are stored by pointer in osrfNewRouter() and released in
// osrfRouterFree(), so the router ends up owning them.
45 /** Array of client domains that we allow to send requests through us. */
46 osrfStringArray* trustedClients;
47 /** Array of server domains that we allow to register, etc. with us. */
48 osrfStringArray* trustedServers;
// Starts out NULL and is handed back into osrfMessageDeserialize() on each
// call from osrfRouterHandleAppRequest().
49 /** List of osrfMessages to be returned from osrfMessageDeserialize() */
50 osrfList* message_list;
// Router-level connection: created by client_init() in osrfNewRouter(), logged
// on in osrfRouterConnect(), and read from in osrfRouterHandleIncoming().
52 transport_client* connection;
56 @brief Maintains a set of server nodes belonging to the same class.
// Per-class routing state: the owning router, a persistent iterator over the
// registered nodes, and a dedicated Jabber connection for the class.
// NOTE(review): class->nodes is used throughout this file, but the declaration
// of that member is not visible in this view -- confirm it.
58 struct _osrfRouterClassStruct {
59 osrfRouter* router; /**< The osrfRouter that owns this osrfRouterClass. */
// The iterator keeps its position between calls; osrfRouterClassHandleMessage()
// advances it to choose the next node and resets it when it runs off the end.
60 osrfHashIterator* itr; /**< Iterator for set of osrfRouterNodes. */
62 @brief Hash store of server nodes.
64 The key of each entry is a node name, and the associated data is an osrfRouterNode.
65 We install a callback for freeing the entries.
// Logged on in osrfRouterAddClass() with the class name passed in the position
// where the router-level logon passes its resource name.
68 /** The transport_client used for communicating with this server. */
69 transport_client* connection;
71 typedef struct _osrfRouterClassStruct osrfRouterClass;
74 @brief Represents a link to a single server's inbound connection.
// One registered listener within a class. remoteId is a strdup-ed copy made in
// osrfRouterClassAddNode() and released in osrfRouterNodeFree().
76 struct _osrfRouterNodeStruct {
77 char* remoteId; /**< Send message to me via this login. */
78 int count; /**< How many messages have been sent to this node. */
// Most recent message routed to this node. osrfRouterClassHandleBounce() reads
// it to build either an error reply or a clone for another node.
79 transport_message* lastMessage;
81 typedef struct _osrfRouterNodeStruct osrfRouterNode;
// Forward declarations of the file-local (static) helpers, so that they can be
// called before their definitions appear.
// Class and node bookkeeping, command handling, and hash callbacks:
83 static osrfRouterClass* osrfRouterAddClass( osrfRouter* router, const char* classname );
84 static void osrfRouterClassAddNode( osrfRouterClass* rclass, const char* remoteId );
85 static void osrfRouterHandleCommand( osrfRouter* router, const transport_message* msg );
86 static void osrfRouterClassHandleMessage( osrfRouter* router,
87 osrfRouterClass* rclass, const transport_message* msg );
88 static void osrfRouterRemoveClass( osrfRouter* router, const char* classname );
89 static void osrfRouterClassRemoveNode( osrfRouter* router, const char* classname,
90 const char* remoteId );
91 static void osrfRouterClassFree( char* classname, void* rclass );
92 static void osrfRouterNodeFree( char* remoteId, void* node );
93 static osrfRouterClass* osrfRouterFindClass( osrfRouter* router, const char* classname );
94 static osrfRouterNode* osrfRouterClassFindNode( osrfRouterClass* rclass,
95 const char* remoteId );
// Socket polling and incoming-message dispatch:
96 static int _osrfRouterFillFDSet( osrfRouter* router, fd_set* set );
97 static void osrfRouterHandleIncoming( osrfRouter* router );
98 static void osrfRouterClassHandleIncoming( osrfRouter* router,
99 const char* classname, osrfRouterClass* class );
100 static transport_message* osrfRouterClassHandleBounce( osrfRouter* router,
101 const char* classname, osrfRouterClass* rclass, const transport_message* msg );
// Router-level application requests (CONNECT and the info/stats methods):
102 static void osrfRouterHandleAppRequest( osrfRouter* router, const transport_message* msg );
103 static void osrfRouterRespondConnect( osrfRouter* router, const transport_message* msg,
104 const osrfMessage* omsg );
105 static void osrfRouterProcessAppRequest( osrfRouter* router, const transport_message* msg,
106 const osrfMessage* omsg );
107 static void osrfRouterSendAppResponse( osrfRouter* router, const transport_message* msg,
108 const osrfMessage* omsg, const jsonObject* response );
109 static void osrfRouterHandleMethodNFound( osrfRouter* router,
110 const transport_message* msg, const osrfMessage* omsg );
// Values of router_command that osrfRouterHandleCommand() recognizes.
112 #define ROUTER_REGISTER "register"
113 #define ROUTER_UNREGISTER "unregister"
// Method names that osrfRouterProcessAppRequest() compares with
// omsg->method_name to select the kind of report to build.
115 #define ROUTER_REQUEST_CLASS_LIST "opensrf.router.info.class.list"
116 #define ROUTER_REQUEST_STATS_NODE_FULL "opensrf.router.info.stats.class.node.all"
117 #define ROUTER_REQUEST_STATS_CLASS_FULL "opensrf.router.info.stats.class.all"
118 #define ROUTER_REQUEST_STATS_CLASS "opensrf.router.info.stats.class"
119 #define ROUTER_REQUEST_STATS_CLASS_SUMMARY "opensrf.router.info.stats.class.summary"
122 @brief Stop the otherwise endless main loop of the router.
123 @param router Pointer to the osrfRouter to be stopped.
125 To be called by a signal handler. We don't stop the loop immediately; we just set
126 a switch that the main loop checks on each iteration.
// NOTE(review): only the signature is visible in this view; the body is not
// shown. The main loop in osrfRouterRun() tests router->stop, so presumably
// this function sets that flag -- confirm against the full file.
128 void router_stop( osrfRouter* router )
135 @brief Allocate and initialize a new osrfRouter.
136 @param domain Domain name of Jabber server.
137 @param name Router's username for the Jabber logon.
138 @param resource Router's resource name for the Jabber logon.
139 @param password Router's password for the Jabber logon.
140 @param port Jabber's port number.
141 @param trustedClients Array of client domains that we allow to send requests through us.
142 @param trustedServers Array of server domains that we allow to register, etc. with us.
143 @return Pointer to the newly allocated osrfRouter, or NULL upon error.
145 Don't connect to Jabber yet. We'll do that later, upon a call to osrfRouterConnect().
147 The calling code is responsible for freeing the osrfRouter by calling osrfRouterFree().
// Build an osrfRouter from the supplied logon parameters and trusted-domain
// arrays, create its (empty) hash of classes, and prepare, but do not open,
// the router-level transport_client.
// NOTE(review): some lines of this function are not visible in this view,
// including the statement taken when the argument check fails, the stores to
// router->port and router->stop, and the final return.
149 osrfRouter* osrfNewRouter(
150 const char* domain, const char* name,
151 const char* resource, const char* password, int port,
152 osrfStringArray* trustedClients, osrfStringArray* trustedServers ) {
// Every argument is required; a port of 0 fails this test as well.
154 if(!( domain && name && resource && password && port && trustedClients && trustedServers ))
157 osrfRouter* router = safe_malloc(sizeof(osrfRouter));
// The strings are duplicated; osrfRouterFree() releases the copies.
158 router->domain = strdup(domain);
159 router->name = strdup(name);
160 router->password = strdup(password);
161 router->resource = strdup(resource);
// The arrays are stored by pointer, not copied. osrfRouterFree() frees them,
// so ownership passes from the caller to the router.
165 router->trustedClients = trustedClients;
166 router->trustedServers = trustedServers;
// Install osrfRouterClassFree as the callback on the hash of classes, and keep
// one long-lived iterator over that hash for the polling code.
169 router->classes = osrfNewHash();
170 osrfHashSetCallback(router->classes, &osrfRouterClassFree);
171 router->class_itr = osrfNewHashIterator( router->classes );
172 router->message_list = NULL; // We'll allocate one later
174 // Prepare to connect to Jabber, as a non-component, over TCP (not UNIX domain).
175 router->connection = client_init( domain, port, NULL, 0 );
181 @brief Connect to Jabber.
182 @param router Pointer to the osrfRouter to connect to Jabber.
183 @return 0 if successful, or -1 on error.
185 Allow up to 10 seconds for the logon to succeed.
187 We connect over TCP (not over a UNIX domain socket), as a non-component.
// Log the router-level transport_client on to Jabber using the stored
// username, password and resource name. Returns -1 when router is NULL or
// when client_connect() returns 0.
// NOTE(review): the return statement for the success path is not visible in
// this view.
189 int osrfRouterConnect( osrfRouter* router ) {
190 if(!router) return -1;
// The literal 10 is presumably the logon timeout in seconds -- confirm against
// the client_connect() declaration.
191 int ret = client_connect( router->connection, router->name,
192 router->password, router->resource, 10, AUTH_DIGEST );
// A zero result from client_connect() is treated as failure here.
193 if( ret == 0 ) return -1;
198 @brief Enter endless loop to receive and respond to input.
199 @param router Pointer to the osrfRouter that's looping.
201 On each iteration: wait for incoming messages to arrive on any of our sockets -- i.e.
202 either the top level socket belonging to the router or any of the lower level sockets
203 belonging to the classes. React to the incoming activity as needed.
205 We don't exit the loop until we receive a signal to stop, or until we encounter an error.
// Main loop. While router->stop is clear: build an fd_set covering the
// router-level socket and every class socket, block in select(), then hand
// each readable socket to the matching incoming-message handler.
// NOTE(review): the declarations of set and selectret, parts of the select()
// error branch, and the closing braces are not visible in this view.
207 void osrfRouterRun( osrfRouter* router ) {
208 if(!(router && router->classes)) return;
// The router-level descriptor is fetched once, before the loop.
210 int routerfd = client_sock_fd( router->connection );
213 // Loop until a signal handler sets router->stop
214 while( ! router->stop ) {
// The read set is rebuilt on every pass; _osrfRouterFillFDSet() returns the
// highest descriptor it loaded, which select() needs plus one.
217 int maxfd = _osrfRouterFillFDSet( router, &set );
219 // Wait indefinitely for an incoming message
220 if( (selectret = select(maxfd + 1, &set, NULL, NULL, NULL)) < 0 ) {
// NOTE(review): lines between the EINTR test and the continue are missing
// here, so how an interrupting signal is told apart from a stop request, and
// what happens after a hard failure, cannot be confirmed from this view.
221 if( EINTR == errno ) {
223 osrfLogWarning( OSRF_LOG_MARK, "Top level select call interrupted by signal" );
227 continue; // Irrelevant signal; ignore it
229 osrfLogWarning( OSRF_LOG_MARK, "Top level select call failed with errno %d: %s",
230 errno, strerror( errno ) );
235 /* see if there is a top level router message */
236 if( FD_ISSET(routerfd, &set) ) {
237 osrfLogDebug( OSRF_LOG_MARK, "Top router socket is active: %d", routerfd );
238 osrfRouterHandleIncoming( router );
241 /* Check each of the connected classes and see if they have data to route */
242 osrfRouterClass* class;
// This is the same shared iterator that _osrfRouterFillFDSet() resets and
// walks; here it is reset again before use.
243 osrfHashIterator* itr = router->class_itr; // remove a layer of indirection
244 osrfHashIteratorReset( itr );
246 while( (class = osrfHashIteratorNext(itr)) ) { // for each class
248 const char* classname = osrfHashIteratorKey(itr);
252 osrfLogDebug( OSRF_LOG_MARK, "Checking %s for activity...", classname );
254 int sockfd = client_sock_fd( class->connection );
255 if(FD_ISSET( sockfd, &set )) {
256 osrfLogDebug( OSRF_LOG_MARK, "Socket is active: %d", sockfd );
// NOTE(review): the handler below can remove the current class from
// router->classes (bounce path, then osrfRouterClassRemoveNode, then
// osrfRouterRemoveClass) while itr is still live -- confirm that the hash
// iterator tolerates removal during traversal.
257 osrfRouterClassHandleIncoming( router, classname, class );
266 @brief Handle incoming requests to the router.
267 @param router Pointer to the osrfRouter.
269 Read all available input messages from the top-level transport_client. For each one:
270 if the domain of the sender's Jabber id is on the list of approved domains, pass the
271 message to osrfRouterHandleCommand() (if there's a command) or osrfRouterHandleAppRequest()
272 (if there isn't). If the domain is @em not on the approved list, log a warning and
// Drain the router-level connection. For each message, extract the domain of
// the sender; if it is listed in router->trustedServers, dispatch to
// osrfRouterHandleCommand() when router_command is non-empty, otherwise to
// osrfRouterHandleAppRequest(). Messages from other domains are only logged.
// NOTE(review): several lines are not visible in this view, including the
// declaration of domain, any NULL check on msg->sender, the else keywords, and
// the release of msg.
275 static void osrfRouterHandleIncoming( osrfRouter* router ) {
278 transport_message* msg = NULL;
// The loop ends when client_recv() returns NULL. The 0 argument is presumably
// a zero timeout (do not block) -- confirm against client_recv().
280 while( (msg = client_recv( router->connection, 0 )) ) { // for each message
284 osrfLogDebug(OSRF_LOG_MARK,
285 "osrfRouterHandleIncoming(): investigating message from %s", msg->sender);
287 /* if the server is not on a trusted domain, drop the message */
// len is the sender length plus one; jid_get_domain() is given len - 1 as its
// size argument. Presumably domain is a buffer of len bytes -- confirm.
288 int len = strlen(msg->sender) + 1;
290 jid_get_domain( msg->sender, domain, len - 1 );
292 if(osrfStringArrayContains( router->trustedServers, domain)) {
294 // If there's a command, obey it. Otherwise, treat
295 // the message as an app session level request.
296 if( msg->router_command && *msg->router_command )
297 osrfRouterHandleCommand( router, msg );
299 osrfRouterHandleAppRequest( router, msg );
// NOTE(review): the text says domain but the argument is the full sender id;
// the class-level handler passes domain in the equivalent warning.
302 osrfLogWarning( OSRF_LOG_MARK,
303 "Received message from un-trusted server domain %s", msg->sender);
311 @brief Handle all available incoming messages for a router class.
312 @param router Pointer to the osrfRouter.
313 @param classname Class name.
314 @param class Pointer to the osrfRouterClass.
316 For each message: if the sender is on a trusted domain, process the message.
// Drain the connection belonging to one class. Messages whose sender domain is
// listed in router->trustedClients are routed to a node of the class; error
// stanzas first go through osrfRouterClassHandleBounce(), and the clone it
// returns (if any) is routed instead. Other messages are logged and not routed.
// NOTE(review): several lines are not visible in this view, including the
// declaration of domain, any NULL check on msg->sender, the else keywords, the
// body of the no-clone branch, and the release of msg.
318 static void osrfRouterClassHandleIncoming( osrfRouter* router, const char* classname,
319 osrfRouterClass* class ) {
320 if(!(router && class)) return;
322 transport_message* msg;
323 osrfLogDebug( OSRF_LOG_MARK, "osrfRouterClassHandleIncoming()");
325 // For each incoming message for this class:
326 while( (msg = client_recv( class->connection, 0 )) ) {
328 // Save the transaction id so that we can incorporate it
329 // into any relevant messages
330 osrfLogSetXid(msg->osrf_xid);
334 osrfLogDebug(OSRF_LOG_MARK,
335 "osrfRouterClassHandleIncoming(): investigating message from %s", msg->sender);
337 /* if the client is not from a trusted domain, drop the message */
// Same domain extraction as in the router-level handler.
338 int len = strlen(msg->sender) + 1;
340 jid_get_domain( msg->sender, domain, len - 1 );
342 if(osrfStringArrayContains( router->trustedClients, domain )) {
344 if( msg->is_error ) {
346 // A previous message bounced. Try to send a clone of it to a
347 // different node of the same class.
348 transport_message* bouncedMessage = osrfRouterClassHandleBounce(
349 router, classname, class, msg );
350 /* handle bounced message */
// NOTE(review): when the bounce handler removes the last node, the class is
// removed from the router as well, and osrfRouterClassFree() disconnects and
// frees class->connection. The statements of this branch are not visible;
// confirm that nothing (including the loop condition above) touches class
// after that has happened.
351 if( !bouncedMessage ) {
352 /* we have no one to send the requested message to */
// The clone is owned here: it is routed and then freed.
357 osrfRouterClassHandleMessage( router, class, bouncedMessage );
358 message_free( bouncedMessage );
// Ordinary (non-error) message: route it as is.
360 osrfRouterClassHandleMessage( router, class, msg );
363 osrfLogWarning( OSRF_LOG_MARK,
364 "Received client message from untrusted client domain %s", domain );
369 osrfLogClearXid(); // We're done with this transaction id
374 @brief Handle a top level router command.
375 @param router Pointer to the osrfRouter.
376 @param msg Pointer to the transport_message to be handled.
378 Currently supported commands:
379 - "register" -- Add a server class and/or a server node to our lists.
380 - "unregister" -- Remove a node from a class, and the class as well if no nodes are
// Act on a router command carried by msg. For the register command: find or
// create the class named by msg->router_class, then add the sender as a node
// if it is not already one. For the unregister command: remove the sender from
// the named class.
// msg->router_command is passed to strcmp() without a NULL check here; the
// caller visible in this file only calls in when it is non-NULL and non-empty.
// NOTE(review): the test that guards the call to osrfRouterAddClass() is not
// visible in this view; presumably it runs only when the lookup found nothing.
383 static void osrfRouterHandleCommand( osrfRouter* router, const transport_message* msg ) {
384 if(!(router && msg && msg->router_class)) return;
386 if( !strcmp( msg->router_command, ROUTER_REGISTER ) ) {
388 osrfLogInfo( OSRF_LOG_MARK, "Registering class %s", msg->router_class );
390 // Add the server class to the list, if it isn't already there
391 osrfRouterClass* class = osrfRouterFindClass( router, msg->router_class );
393 class = osrfRouterAddClass( router, msg->router_class );
395 // Add the node to the osrfRouterClass's list, if it isn't already there
// class is tested because osrfRouterAddClass() is documented to return NULL
// upon error.
396 if(class && ! osrfRouterClassFindNode( class, msg->sender ) )
397 osrfRouterClassAddNode( class, msg->sender );
399 } else if( !strcmp( msg->router_command, ROUTER_UNREGISTER ) ) {
// An empty class name is ignored for unregister.
401 if( msg->router_class && *msg->router_class ) {
402 osrfLogInfo( OSRF_LOG_MARK, "Unregistering router class %s", msg->router_class );
403 osrfRouterClassRemoveNode( router, msg->router_class, msg->sender );
410 @brief Add an osrfRouterClass to a router, and open a connection for it.
411 @param router Pointer to the osrfRouter.
412 @param classname The name of the class this node handles.
413 @return A pointer to the new osrfRouterClass, or NULL upon error.
415 Open a Jabber session to be used for this server class. The Jabber ID incorporates the
416 class name as the resource name.
// Create an osrfRouterClass for classname: allocate it, give it an empty hash
// of nodes (with osrfRouterNodeFree as the callback) and an iterator over that
// hash, open a Jabber session for it, and store it in router->classes.
// NOTE(review): the return statements (after the failed logon and at the end)
// are not visible in this view.
418 static osrfRouterClass* osrfRouterAddClass( osrfRouter* router, const char* classname ) {
419 if(!(router && router->classes && classname)) return NULL;
421 osrfRouterClass* class = safe_malloc(sizeof(osrfRouterClass));
422 class->nodes = osrfNewHash();
423 class->itr = osrfNewHashIterator(class->nodes);
424 osrfHashSetCallback(class->nodes, &osrfRouterNodeFree);
425 class->router = router;
// Same domain and port as the router-level connection.
427 class->connection = client_init( router->domain, router->port, NULL, 0 );
// Log on with the router username and password, passing classname where the
// router-level logon passes its resource name. A zero result means the logon
// failed, in which case the half-built class is torn down again.
429 if(!client_connect( class->connection, router->name,
430 router->password, classname, 10, AUTH_DIGEST ) ) {
431 // Cast away the constness of classname. Though ugly, this
432 // cast is benign because osrfRouterClassFree doesn't actually
433 // write through the pointer. We can't readily change its
434 // signature because it is used for a function pointer, and
435 // we would have to change other signatures the same way.
436 osrfRouterClassFree( (char *) classname, class );
// Register the class under its name.
440 osrfHashSet( router->classes, class, classname );
446 @brief Add a new server node to an osrfRouterClass.
447 @param rclass Pointer to the osrfRouterClass to which we are to add the node.
448 @param remoteId The remote login of the osrfRouterNode.
// Allocate an osrfRouterNode for remoteId and store it in rclass->nodes, keyed
// by remoteId. The node holds its own strdup-ed copy of the id and starts with
// no saved message.
// NOTE(review): a line between the allocation and the lastMessage store is not
// visible in this view; presumably it initializes node->count -- confirm.
450 static void osrfRouterClassAddNode( osrfRouterClass* rclass, const char* remoteId ) {
451 if(!(rclass && rclass->nodes && remoteId)) return;
453 osrfLogInfo( OSRF_LOG_MARK, "Adding router node for remote id %s", remoteId );
455 osrfRouterNode* node = safe_malloc(sizeof(osrfRouterNode));
457 node->lastMessage = NULL;
458 node->remoteId = strdup(remoteId);
460 osrfHashSet( rclass->nodes, node, remoteId );
464 @brief Handle an input message representing a Jabber error stanza.
465 @param router Pointer to the current osrfRouter.
466 @param classname Name of the class to which the error stanza was sent.
467 @param rclass Pointer to the osrfRouterClass to which the error stanza was sent.
468 @param msg Pointer to the transport_message representing the error stanza.
469 @return Pointer to a newly allocated transport_message; or NULL (see remarks).
471 The presumption is that the relevant node is dead. If another node is available for
472 the same class, then remove the dead one, create a clone of the message to be sent
473 elsewhere, and return a pointer to it. If there is no other node for the same class,
474 send a cancel message back to the sender, remove both the node and the class it belongs
475 to, and return NULL. If we can't even do that because the entire class is dead, log
476 a message to that effect and return NULL.
// React to an error stanza received on the connection of a class. The sender
// of the stanza is looked up as a node of rclass. If it is the only node left,
// an error built from the last message sent to it is returned to the original
// requester and the node is removed. Otherwise the last message is cloned for
// another node and the dead node is removed.
// NOTE(review): the test on node after the lookup, the else keyword between
// the two cases, and every return statement are not visible in this view.
478 static transport_message* osrfRouterClassHandleBounce( osrfRouter* router,
479 const char* classname, osrfRouterClass* rclass, const transport_message* msg ) {
481 osrfLogDebug( OSRF_LOG_MARK, "osrfRouterClassHandleBounce()");
483 osrfLogInfo( OSRF_LOG_MARK, "Received network layer error message from %s", msg->sender );
484 osrfRouterNode* node = osrfRouterClassFindNode( rclass, msg->sender );
// Presumably reached only when the lookup found no node -- confirm.
486 osrfLogInfo( OSRF_LOG_MARK,
487 "network error occurred after we removed the class.. ignoring");
491 if( osrfHashGetCount(rclass->nodes) == 1 ) { /* the last node is dead */
493 if( node->lastMessage ) {
494 osrfLogWarning( OSRF_LOG_MARK,
495 "We lost the last node in the class, responding with error and removing...");
// Build the reply from the saved message: same body, subject and thread, with
// router_from of the saved message in the fourth argument and its recipient in
// the fifth. Presumably those are the recipient and sender of the new message,
// as in the other message_init() calls in this file -- confirm.
497 transport_message* error = message_init(
498 node->lastMessage->body, node->lastMessage->subject,
499 node->lastMessage->thread, node->lastMessage->router_from,
500 node->lastMessage->recipient );
501 message_set_osrf_xid(error, node->lastMessage->osrf_xid);
502 set_msg_error( error, "cancel", 501 );
504 /* send the error message back to the original sender */
505 client_send_message( rclass->connection, error );
506 message_free( error );
509 /* remove the dead node */
// Removing the last node also removes the class (see
// osrfRouterClassRemoveNode), so rclass must not be used after this call.
510 osrfRouterClassRemoveNode( router, classname, msg->sender);
515 transport_message* lastSent = NULL;
517 if( node->lastMessage ) {
518 osrfLogDebug( OSRF_LOG_MARK, "Cloning lastMessage so next node can send it");
// The clone is given an empty string as its fourth argument;
// osrfRouterClassHandleMessage() supplies the remoteId of the chosen node when
// it builds the message that is actually sent.
519 lastSent = message_init( node->lastMessage->body,
520 node->lastMessage->subject, node->lastMessage->thread, "",
521 node->lastMessage->router_from );
522 message_set_router_info( lastSent, node->lastMessage->router_from,
523 NULL, NULL, NULL, 0 );
524 message_set_osrf_xid( lastSent, node->lastMessage->osrf_xid );
527 /* remove the dead node */
// The clone was built before this call because removing the node frees
// node->lastMessage (see osrfRouterNodeFree).
528 osrfRouterClassRemoveNode( router, classname, msg->sender);
535 @brief Forward a class-level message to a listener for the corresponding service.
536 @param router Pointer to the current osrfRouter.
537 @param rclass Pointer to the class to which the message is directed.
538 @param msg Pointer to the message to be forwarded.
540 Pick a node for the specified class, and forward the message to it.
542 We use an iterator, stored with the class, to maintain a position in the class's list
543 of nodes. Advance the iterator to pick the next node, and if we reach the end, go
544 back to the beginning of the list.
// Forward msg to one node of rclass. The node is chosen by advancing the
// iterator stored in the class, wrapping to the start when it runs off the
// end. A new transport_message is built for that node, saved as the
// lastMessage of the node, and sent on the connection of the class.
// NOTE(review): the statement executed when client_send_message() returns 0,
// the else keyword, and the closing braces are not visible in this view.
546 static void osrfRouterClassHandleMessage(
547 osrfRouter* router, osrfRouterClass* rclass, const transport_message* msg ) {
548 if(!(router && rclass && msg)) return;
550 osrfLogDebug( OSRF_LOG_MARK, "osrfRouterClassHandleMessage()");
552 // Pick a node, in a round-robin.
553 osrfRouterNode* node = osrfHashIteratorNext( rclass->itr );
554 if(!node) { // wrap around to the beginning of the list
555 osrfHashIteratorReset(rclass->itr);
556 node = osrfHashIteratorNext( rclass->itr );
559 if(node) { // should always be true -- no class without a node
561 // Build a transport message
// Copy body, subject and thread; node->remoteId is the fourth argument and the
// original sender the fifth. The original sender is also recorded through
// message_set_router_info() and is what the log line below prints as
// router_from.
562 transport_message* new_msg = message_init( msg->body,
563 msg->subject, msg->thread, node->remoteId, msg->sender );
564 message_set_router_info( new_msg, msg->sender, NULL, NULL, NULL, 0 );
565 message_set_osrf_xid( new_msg, msg->osrf_xid );
567 osrfLogInfo( OSRF_LOG_MARK, "Routing message:\nfrom: [%s]\nto: [%s]",
568 new_msg->router_from, new_msg->recipient );
570 // Save it for possible future reference
// The previously saved message, if any, is released first.
571 message_free( node->lastMessage );
572 node->lastMessage = new_msg;
// A zero result from client_send_message() is the success case here; the
// warning below is presumably the failure path -- confirm.
575 if ( client_send_message( rclass->connection, new_msg ) == 0 )
579 message_prepare_xml(new_msg);
580 osrfLogWarning( OSRF_LOG_MARK, "Error sending message from %s to %s\n%s",
581 new_msg->sender, new_msg->recipient, new_msg->msg_xml );
583 // We don't free new_msg here because we saved it as node->lastMessage.
589 @brief Remove a given osrfRouterClass from an osrfRouter
590 @param router Pointer to the osrfRouter.
591 @param classname The name of the class to be removed.
593 Delete an osrfRouterClass from the router's list of classes. Indirectly (via a callback
594 function installed in the osrfHash), free the osrfRouterClass and any associated nodes.
// Remove the entry for classname from router->classes. The hash was given
// osrfRouterClassFree as its callback in osrfNewRouter(), which is how the
// removed class gets released. Does nothing when any argument is NULL.
596 static void osrfRouterRemoveClass( osrfRouter* router, const char* classname ) {
597 if( router && router->classes && classname ) {
598 osrfLogInfo( OSRF_LOG_MARK, "Removing router class %s", classname );
599 osrfHashRemove( router->classes, classname );
605 @brief Remove a node from a class. If the class thereby becomes empty, remove it as well.
606 @param router Pointer to the current osrfRouter.
607 @param classname Class name.
608 @param remoteId Identifier for the node to be removed.
// Remove the node remoteId from the class named classname; when that leaves
// the class with no nodes, remove the class from the router as well.
// NOTE(review): the statement taken when the sanity check fails, and any test
// on the result of the class lookup, are not visible in this view. As shown,
// class is dereferenced directly after the lookup -- confirm that a NULL check
// exists in the full file.
610 static void osrfRouterClassRemoveNode(
611 osrfRouter* router, const char* classname, const char* remoteId ) {
613 if(!(router && router->classes && classname && remoteId)) // sanity check
616 osrfLogInfo( OSRF_LOG_MARK, "Removing router node %s", remoteId );
618 osrfRouterClass* class = osrfRouterFindClass( router, classname );
620 osrfHashRemove( class->nodes, remoteId );
621 if( osrfHashGetCount(class->nodes) == 0 ) {
622 osrfRouterRemoveClass( router, classname );
629 @brief Free a router class object.
630 @param classname Class name (not used).
631 @param c Pointer to the osrfRouterClass, cast to a void pointer.
633 This function is invoked as a callback when we remove an osrfRouterClass from the
634 router's list of classes.
// Hash callback that tears down one osrfRouterClass: disconnect and free its
// transport_client, remove every node from its hash, then free the iterator
// and the hash. Also called directly by osrfRouterAddClass() when the logon
// for a new class fails. classname is not used in the visible lines.
// NOTE(review): any NULL check on c, and the release of the osrfRouterClass
// itself, are not visible in this view.
636 static void osrfRouterClassFree( char* classname, void* c ) {
639 osrfRouterClass* rclass = (osrfRouterClass*) c;
640 client_disconnect( rclass->connection );
641 client_free( rclass->connection );
// Entries are removed from the hash while it is being traversed with its own
// iterator. NOTE(review): confirm the iterator tolerates that; the nodes hash
// has a free callback installed (osrfRouterNodeFree), so osrfHashFree() below
// may already release remaining entries by itself.
643 osrfHashIteratorReset( rclass->itr );
644 osrfRouterNode* node;
646 while( (node = osrfHashIteratorNext(rclass->itr)) )
647 osrfHashRemove( rclass->nodes, node->remoteId );
649 osrfHashIteratorFree(rclass->itr);
650 osrfHashFree(rclass->nodes);
656 @brief Free an osrfRouterNode.
657 @param remoteId Jabber ID of node (not used).
658 @param n Pointer to the osrfRouterNode to be freed, cast to a void pointer.
660 This is a callback installed in an osrfHash (the nodes member of an osrfRouterClass).
// Hash callback for the nodes of a class: release the copy of the remote id
// and the saved last message that belong to one osrfRouterNode. remoteId (the
// key) is not used in the visible lines.
// NOTE(review): any NULL check on n, and the release of the node structure
// itself, are not visible in this view.
662 static void osrfRouterNodeFree( char* remoteId, void* n ) {
664 osrfRouterNode* node = (osrfRouterNode*) n;
665 free(node->remoteId);
666 message_free(node->lastMessage);
672 @brief Free an osrfRouter and everything it owns.
673 @param router Pointer to the osrfRouter to be freed.
675 The osrfRouterClasses and osrfRouterNodes are freed by callback functions installed in
// Release an osrfRouter: the class iterator, the hash of classes, the copied
// logon strings, both trusted-domain arrays, the message list, and the
// router-level transport_client.
// NOTE(review): any NULL check on router, the release of router->name (there
// is a gap between the domain and resource lines), and the release of the
// router structure itself are not visible in this view -- confirm them.
678 void osrfRouterFree( osrfRouter* router ) {
// The iterator is freed before the hash it traverses.
681 osrfHashIteratorFree( router->class_itr);
682 osrfHashFree(router->classes);
683 free(router->domain);
685 free(router->resource);
686 free(router->password);
// These arrays were handed in by the caller of osrfNewRouter().
688 osrfStringArrayFree( router->trustedClients );
689 osrfStringArrayFree( router->trustedServers );
690 osrfListFree( router->message_list );
692 client_free( router->connection );
698 @brief Given a class name, find the corresponding osrfRouterClass.
699 @param router Pointer to the osrfRouter that owns the osrfRouterClass.
700 @param classname Name of the class.
701 @return Pointer to a matching osrfRouterClass if found, or NULL if not.
// Look up classname in router->classes and return whatever osrfHashGet()
// yields, cast to osrfRouterClass*. Returns NULL when any argument, or
// router->classes, is NULL.
703 static osrfRouterClass* osrfRouterFindClass( osrfRouter* router, const char* classname ) {
704 if(!( router && router->classes && classname )) return NULL;
705 return (osrfRouterClass*) osrfHashGet( router->classes, classname );
710 @brief Find a given node for a given class.
711 @param rclass Pointer to the osrfRouterClass in which to search.
712 @param remoteId Jabber ID of the node for which to search.
713 @return Pointer to the matching osrfRouterNode, if found; otherwise NULL.
// Look up remoteId in rclass->nodes and return whatever osrfHashGet() yields,
// cast to osrfRouterNode*. Returns NULL when either argument is NULL;
// rclass->nodes itself is not checked here.
715 static osrfRouterNode* osrfRouterClassFindNode( osrfRouterClass* rclass,
716 const char* remoteId ) {
717 if(!(rclass && remoteId)) return NULL;
718 return (osrfRouterNode*) osrfHashGet( rclass->nodes, remoteId );
723 @brief Fill an fd_set with all the sockets owned by the osrfRouter.
724 @param router Pointer to the osrfRouter whose sockets are to be used.
725 @param set Pointer to the fd_set that is to be filled.
726 @return The largest file descriptor loaded into the fd_set; or -1 upon error.
728 There's one socket for the osrfRouter as a whole, and one for each osrfRouterClass
729 that belongs to it. We load them all.
// Walk every class of the router, check the descriptor of its connection, and
// track the largest descriptor seen, starting from the router-level socket.
// A class whose descriptor fails osrfUtilsCheckFileDescriptor() is removed.
// NOTE(review): the FD_ZERO and FD_SET calls, the declaration of sockid, the
// else keyword, and the final return are not visible in this view.
731 static int _osrfRouterFillFDSet( osrfRouter* router, fd_set* set ) {
732 if(!(router && router->classes && set)) return -1;
735 int maxfd = client_sock_fd( router->connection );
740 osrfRouterClass* class = NULL;
// Uses the shared router->class_itr, the same iterator osrfRouterRun() walks.
741 osrfHashIterator* itr = router->class_itr;
742 osrfHashIteratorReset( itr );
744 while( (class = osrfHashIteratorNext(itr)) ) {
745 const char* classname = osrfHashIteratorKey(itr);
// The class is looked up again by name even though the iterator has just
// returned it.
747 if( classname && (class = osrfRouterFindClass( router, classname )) ) {
748 sockid = client_sock_fd( class->connection );
// A nonzero result from osrfUtilsCheckFileDescriptor() is treated as a bad
// descriptor.
750 if( osrfUtilsCheckFileDescriptor( sockid ) ) {
752 osrfLogWarning(OSRF_LOG_MARK,
753 "Removing router class '%s' because of a bad top-level file descriptor [%d]",
// NOTE(review): this removes an entry from the hash that itr is currently
// traversing, and classname is the key of that entry -- confirm that both stay
// valid across the removal.
755 osrfRouterRemoveClass( router, classname );
758 if( sockid > maxfd ) maxfd = sockid;
768 @brief Handle a router-level message that isn't a command; presumed to be an app request.
769 @param router Pointer to the current osrfRouter.
770 @param msg Pointer to the incoming message.
772 The body of the transport_message is a JSON string, specifically a JSON array.
773 Translate the JSON into a series of osrfMessages, one for each element of the JSON
774 array. Process each osrfMessage in turn. Each message is either a CONNECT or a
// Deserialize the body of msg into router->message_list, then walk the list:
// dispatch each osrfMessage on its m_type to osrfRouterRespondConnect() or
// osrfRouterProcessAppRequest(), and free it.
// NOTE(review): the declaration of i, any NULL test on omsg, the case labels
// and break statements of the switch, and the closing braces are not visible
// in this view, so which m_type values map to which call cannot be confirmed
// here.
777 static void osrfRouterHandleAppRequest( osrfRouter* router, const transport_message* msg ) {
779 // Translate the JSON into a list of osrfMessages
// The existing list is passed in and the result is stored back, so the list
// object is carried over from one call to the next.
780 router->message_list = osrfMessageDeserialize( msg->body, router->message_list );
781 osrfMessage* omsg = NULL;
783 // Process each osrfMessage
785 for( i = 0; i < router->message_list->size; ++i ) {
787 omsg = osrfListGetIndex( router->message_list, i );
790 switch( omsg->m_type ) {
793 osrfRouterRespondConnect( router, msg, omsg );
797 osrfRouterProcessAppRequest( router, msg, omsg );
// Each message is freed here, after it has been handled.
804 osrfMessageFree( omsg );
812 @brief Respond to a CONNECT message.
813 @param router Pointer to the current osrfRouter.
814 @param msg Pointer to the transport_message that the osrfMessage came from.
815 @param omsg Pointer to the osrfMessage to be processed.
817 An application is trying to connect to the router. Reply with a STATUS message
820 "CONNECT" is a bit of a misnomer. We don't establish a stateful session; i.e. we
821 don't retain any memory of this message. We just confirm that the router is alive,
822 and that the client has a good address for it.
// Answer a CONNECT: build a STATUS osrfMessage carrying OSRF_STATUS_OK with
// the thread_trace and protocol of the request, serialize it, wrap it in a
// transport_message addressed to the sender of msg, and send it on the
// router-level connection.
// NOTE(review): the statement taken when the argument check fails, one
// argument line of message_init() (between data and msg->thread), and the
// release of data are not visible in this view.
824 static void osrfRouterRespondConnect( osrfRouter* router, const transport_message* msg,
825 const osrfMessage* omsg ) {
826 if(!(router && msg && omsg))
829 osrfLogDebug( OSRF_LOG_MARK, "router received a CONNECT message from %s", msg->sender );
831 // Build a success message
832 osrfMessage* success = osrf_message_init( STATUS, omsg->thread_trace, omsg->protocol );
833 osrf_message_set_status_info(
834 success, "osrfConnectStatus", "Connection Successful", OSRF_STATUS_OK );
836 // Translate the success message into JSON,
837 // and then package the JSON in a transport message
// The osrfMessage is freed as soon as it has been serialized; only the
// serialized string is needed from here on.
838 char* data = osrf_message_serialize(success);
839 osrfMessageFree( success );
840 transport_message* return_msg = message_init(
841 data, // message payload, translated from osrfMessage
843 msg->thread, // same thread
844 msg->sender, // destination (client's Jabber ID)
845 "" // don't send our address; client already has it
849 client_send_message( router->connection, return_msg );
850 message_free( return_msg );
855 @brief Respond to a REQUEST message.
856 @param router Pointer to the current osrfRouter.
857 @param msg Pointer to the transport_message that the osrfMessage came from.
858 @param omsg Pointer to the osrfMessage to be processed.
860 Respond to an information request from an application. Most types of request involve
861 one or more counts of messages successfully routed.
863 Request types currently supported:
864 - "opensrf.router.info.class.list" -- list of class names.
865 - "opensrf.router.info.stats.class.summary" -- total count for a specified class.
866 - "opensrf.router.info.stats.class" -- count for every node of a specified class.
867 - "opensrf.router.info.stats.class.all" -- count for every node of every class.
868 - "opensrf.router.info.stats.class.node.all" -- total count for every class.
870 static void osrfRouterProcessAppRequest( osrfRouter* router, const transport_message* msg,
871 const osrfMessage* omsg ) {
873 if(!(router && msg && omsg && omsg->method_name))
876 osrfLogInfo( OSRF_LOG_MARK, "Router received app request: %s", omsg->method_name );
878 // Branch on the request type. Build a jsonObject as an answer to the request.
879 jsonObject* jresponse = NULL;
880 if(!strcmp( omsg->method_name, ROUTER_REQUEST_CLASS_LIST )) {
882 // Prepare an array of class names.
884 jresponse = jsonNewObjectType(JSON_ARRAY);
886 osrfStringArray* keys = osrfHashKeys( router->classes );
887 for( i = 0; i != keys->size; i++ )
888 jsonObjectPush( jresponse, jsonNewObject(osrfStringArrayGetString( keys, i )) );
889 osrfStringArrayFree(keys);
891 } else if(!strcmp( omsg->method_name, ROUTER_REQUEST_STATS_CLASS_SUMMARY )) {
893 // Prepare a count of all the messages successfully routed for a given class.
896 // class name is the first parameter
897 const char* classname = jsonObjectGetString( jsonObjectGetIndex( omsg->_params, 0 ) );
901 osrfRouterClass* class = osrfHashGet(router->classes, classname);
903 // For each node: add the count to the total.
904 osrfRouterNode* node;
905 osrfHashIterator* node_itr = osrfNewHashIterator(class->nodes);
906 while( (node = osrfHashIteratorNext(node_itr)) ) {
907 count += node->count;
908 // jsonObjectSetKey( class_res, node->remoteId,
909 // jsonNewNumberObject( (double) node->count ) );
911 osrfHashIteratorFree(node_itr);
913 jresponse = jsonNewNumberObject( (double) count );
915 } else if(!strcmp( omsg->method_name, ROUTER_REQUEST_STATS_CLASS )) {
917 // Prepare a hash for a given class. Key: the remoteId of a node. Datum: the
918 // number of messages successfully routed for that node.
920 // class name is the first parameter
921 const char* classname = jsonObjectGetString( jsonObjectGetIndex( omsg->_params, 0 ) );
925 jresponse = jsonNewObjectType(JSON_HASH);
926 osrfRouterClass* class = osrfHashGet(router->classes, classname);
928 // For each node: get the count and store it in the hash.
929 osrfRouterNode* node;
930 osrfHashIterator* node_itr = osrfNewHashIterator(class->nodes);
931 while( (node = osrfHashIteratorNext(node_itr)) ) {
932 jsonObjectSetKey( jresponse, node->remoteId,
933 jsonNewNumberObject( (double) node->count ) );
935 osrfHashIteratorFree(node_itr);
937 } else if(!strcmp( omsg->method_name, ROUTER_REQUEST_STATS_CLASS_FULL )) {
939 // Prepare a hash of hashes, giving the message counts for each node for each class.
941 osrfRouterClass* class;
942 osrfRouterNode* node;
943 jresponse = jsonNewObjectType(JSON_HASH); // Key: class name.
945 // Traverse the list of classes.
946 osrfHashIterator* class_itr = osrfNewHashIterator(router->classes);
947 while( (class = osrfHashIteratorNext(class_itr)) ) {
949 jsonObject* class_res = jsonNewObjectType(JSON_HASH); // Key: remoteId of node.
950 const char* classname = osrfHashIteratorKey(class_itr);
952 // Traverse the list of nodes for the current class.
953 osrfHashIterator* node_itr = osrfNewHashIterator(class->nodes);
954 while( (node = osrfHashIteratorNext(node_itr)) ) {
955 jsonObjectSetKey( class_res, node->remoteId,
956 jsonNewNumberObject( (double) node->count ) );
958 osrfHashIteratorFree(node_itr);
960 jsonObjectSetKey( jresponse, classname, class_res );
963 osrfHashIteratorFree(class_itr);
965 } else if(!strcmp( omsg->method_name, ROUTER_REQUEST_STATS_NODE_FULL )) {
967 // Prepare a hash. Key: class name. Datum: total number of messages successfully
968 // routed for the nodes of that class.
970 osrfRouterClass* class;
971 osrfRouterNode* node;
972 jresponse = jsonNewObjectType(JSON_HASH);
974 osrfHashIterator* class_itr = osrfNewHashIterator(router->classes);
975 while( (class = osrfHashIteratorNext(class_itr)) ) { // For each class
978 const char* classname = osrfHashIteratorKey(class_itr);
980 osrfHashIterator* node_itr = osrfNewHashIterator(class->nodes);
981 while( (node = osrfHashIteratorNext(node_itr)) ) { // For each node
982 count += node->count;
984 osrfHashIteratorFree(node_itr);
986 jsonObjectSetKey( jresponse, classname, jsonNewNumberObject( (double) count ) );
989 osrfHashIteratorFree(class_itr);
991 } else { // None of the above
993 osrfRouterHandleMethodNFound( router, msg, omsg );
997 // Send the result back to the requester.
998 osrfRouterSendAppResponse( router, msg, omsg, jresponse );
999 jsonObjectFree(jresponse);
1004 @brief Respond to an invalid REQUEST message.
1005 @param router Pointer to the current osrfRouter.
1006 @param msg Pointer to the transport_message that contained the REQUEST message.
1007 @param omsg Pointer to the osrfMessage that contained the REQUEST.
1009 static void osrfRouterHandleMethodNFound( osrfRouter* router,
1010 const transport_message* msg, const osrfMessage* omsg ) {
1012 // Create an exception message
1013 osrfMessage* err = osrf_message_init( STATUS, omsg->thread_trace, 1 );
1014 osrf_message_set_status_info( err,
1015 "osrfMethodException", "Router method not found", OSRF_STATUS_NOTFOUND );
1017 // Translate it into JSON
1018 char* data = osrf_message_serialize(err);
1019 osrfMessageFree( err );
1021 // Wrap the JSON up in a transport_message
1022 transport_message* tresponse = message_init(
1023 data, "", msg->thread, msg->sender, msg->recipient );
1027 client_send_message( router->connection, tresponse );
1028 message_free( tresponse );
1033 @brief Send a response to a router REQUEST message.
1034 @param router Pointer to the current osrfRouter.
1035 @param msg Pointer to the transport_message that contained the REQUEST message.
1036 @param omsg Pointer to the osrfMessage that contained the REQUEST.
1037 @param response Pointer to the jsonObject that constitutes the response.
1039 static void osrfRouterSendAppResponse( osrfRouter* router, const transport_message* msg,
1040 const osrfMessage* omsg, const jsonObject* response ) {
1042 if( response ) { /* send the response message */
1044 // Turn the jsonObject into JSON, and load it into an osrfMessage
1045 osrfMessage* oresponse = osrf_message_init(
1046 RESULT, omsg->thread_trace, omsg->protocol );
1048 char* json = jsonObjectToJSON(response);
1049 osrf_message_set_result_content( oresponse, json );
1052 // Package the osrfMessage into a transport_message, and send it
1053 char* data = osrf_message_serialize(oresponse);
1054 osrfMessageFree(oresponse);
1055 osrfLogDebug( OSRF_LOG_MARK, "Responding to client app request with data: \n%s\n", data );
1057 transport_message* tresponse = message_init(
1058 data, "", msg->thread, msg->sender, msg->recipient );
1061 client_send_message(router->connection, tresponse );
1062 message_free(tresponse);
1065 /* now send the 'request complete' message */
1066 osrfMessage* status = osrf_message_init( STATUS, omsg->thread_trace, 1);
1067 osrf_message_set_status_info( status, "osrfConnectStatus", "Request Complete",
1068 OSRF_STATUS_COMPLETE );
1069 char* statusdata = osrf_message_serialize(status);
1070 osrfMessageFree(status);
1072 transport_message* sresponse = message_init(
1073 statusdata, "", msg->thread, msg->sender, msg->recipient );
1076 client_send_message(router->connection, sresponse );
1077 message_free(sresponse);