1 #include "osrf_router.h"
/* Member-access suffix for a connection's socket descriptor. It is written
   after `->`, as in router->ROUTER_SOCKFD or class->ROUTER_SOCKFD, so the
   owning struct must have a `connection` member. */
3 #define ROUTER_SOCKFD connection->session->sock_id
/* router_command values that osrfRouterHandleMessage() compares against. */
4 #define ROUTER_REGISTER "register"
5 #define ROUTER_UNREGISTER "unregister"
/* Method name that osrfRouterProcessAppRequest() answers with the list of
   currently known class names. */
8 #define ROUTER_REQUEST_CLASS_LIST "opensrf.router.info.class.list"
/* Allocates and initializes a router object.
   Returns NULL when any pointer argument is NULL or when port is 0.
   domain, name, password and resource are duplicated with strdup();
   trustedClients and trustedServers are stored by pointer, not copied.
   NOTE(review): the success return is not visible in this listing, and no
   assignment to router->port is visible although osrfRouterAddClass() reads
   router->port -- confirm against the complete file. */
10 osrfRouter* osrfNewRouter(
11 char* domain, char* name,
12 char* resource, char* password, int port,
13 osrfStringArray* trustedClients, osrfStringArray* trustedServers ) {
15 if(!( domain && name && resource && password && port && trustedClients && trustedServers )) return NULL;
17 osrfRouter* router = safe_malloc(sizeof(osrfRouter));
18 router->domain = strdup(domain);
19 router->name = strdup(name);
20 router->password = strdup(password);
21 router->resource = strdup(resource);
/* The caller's arrays are kept as-is; osrfRouterFree() later frees them. */
24 router->trustedClients = trustedClients;
25 router->trustedServers = trustedServers;
/* Hash of router classes; osrfRouterClassFree is installed as the hash's
   freeItem callback. */
28 router->classes = osrfNewHash();
29 router->classes->freeItem = &osrfRouterClassFree;
/* Only creates the client object here; osrfRouterConnect() performs the connect. */
31 router->connection = client_init( domain, port, NULL, 0 );
/* Connects the router's top-level connection using its name, password and
   resource with AUTH_DIGEST.
   Returns -1 when router is NULL or when client_connect() returns 0.
   NOTE(review): the success return value is not visible in this listing. */
38 int osrfRouterConnect( osrfRouter* router ) {
39 if(!router) return -1;
/* The literal 10 is passed straight to client_connect(); presumably a
   connect timeout -- confirm against the transport client API. */
40 int ret = client_connect( router->connection, router->name,
41 router->password, router->resource, 10, AUTH_DIGEST );
42 if( ret == 0 ) return -1;
/* Main dispatch routine: waits with select() on the router socket and the
   class sockets, then hands active sockets to the matching handler.
   Returns immediately when router or router->classes is NULL.
   NOTE(review): `set`, `selectret` and `numhandled` are used below but their
   declarations, and the enclosing loop structure, are not visible in this
   listing -- confirm against the complete file. */
47 void osrfRouterRun( osrfRouter* router ) {
48 if(!(router && router->classes)) return;
50 int routerfd = router->ROUTER_SOCKFD;
/* The helper's return value is used as the highest descriptor for select(). */
56 int maxfd = __osrfRouterFillFDSet( router, &set );
/* A NULL timeout makes select() block until a descriptor is readable. */
59 if( (selectret = select(maxfd + 1, &set, NULL, NULL, NULL)) < 0 ) {
60 osrfLogWarning( OSRF_LOG_MARK, "Top level select call failed with errno %d", errno);
64 /* see if there is a top level router message */
66 if( FD_ISSET(routerfd, &set) ) {
67 osrfLogDebug( OSRF_LOG_MARK, "Top router socket is active: %d", routerfd );
69 osrfRouterHandleIncoming( router );
73 /* now check each of the connected classes and see if they have data to route */
74 while( numhandled < selectret ) {
76 osrfRouterClass* class;
77 osrfHashIterator* itr = osrfNewHashIterator(router->classes);
79 while( (class = osrfHashIteratorNext(itr)) ) {
/* itr->current is used as the key (class name) of the item just returned. */
81 char* classname = itr->current;
/* The class is looked up again by name rather than using the iterator's
   value; presumably a guard against entries removed mid-iteration -- confirm. */
83 if( classname && (class = osrfRouterFindClass( router, classname )) ) {
85 osrfLogDebug( OSRF_LOG_MARK, "Checking %s for activity...", classname );
87 int sockfd = class->ROUTER_SOCKFD;
88 if(FD_ISSET( sockfd, &set )) {
89 osrfLogDebug( OSRF_LOG_MARK, "Socket is active: %d", sockfd );
91 osrfRouterClassHandleIncoming( router, classname, class );
96 osrfHashIteratorFree(itr);
/* Drains messages waiting on the router's top-level connection.
   A message whose sender domain is listed in router->trustedServers is
   passed to osrfRouterHandleMessage(); otherwise a warning is logged.
   NOTE(review): the `domain` buffer declaration and the freeing of `msg`
   are not visible in this listing. */
102 void osrfRouterHandleIncoming( osrfRouter* router ) {
105 transport_message* msg = NULL;
107 //if( (msg = client_recv( router->connection, 0 )) ) {
/* Loops until client_recv() returns NULL; the 0 argument is presumably a
   zero timeout -- confirm against the transport client API. */
108 while( (msg = client_recv( router->connection, 0 )) ) {
112 osrfLogDebug(OSRF_LOG_MARK,
113 "osrfRouterHandleIncoming(): investigating message from %s", msg->sender);
115 /* if the sender is not a trusted server, drop the message */
/* len is the sender length plus one; len - 1 is passed to jid_get_domain(). */
116 int len = strlen(msg->sender) + 1;
119 jid_get_domain( msg->sender, domain, len - 1 );
121 if(osrfStringArrayContains( router->trustedServers, domain))
122 osrfRouterHandleMessage( router, msg );
/* Note: the warning prints the full sender address, not just the domain. */
124 osrfLogWarning( OSRF_LOG_MARK, "Received message from un-trusted server domain %s", msg->sender);
/* Drains messages waiting on a class connection and routes each one.
   Returns -1 when router or class is NULL, and -1 when a bounced (error)
   message cannot be turned into a resendable message.
   Messages whose sender domain is not in router->trustedClients are only
   logged with a warning.
   NOTE(review): the `domain` buffer declaration, message freeing and the
   normal return value are not visible in this listing. */
131 int osrfRouterClassHandleIncoming( osrfRouter* router, char* classname, osrfRouterClass* class ) {
132 if(!(router && class)) return -1;
134 transport_message* msg;
135 osrfLogDebug( OSRF_LOG_MARK, "osrfRouterClassHandleIncoming()");
137 while( (msg = client_recv( class->connection, 0 )) ) {
141 osrfLogDebug(OSRF_LOG_MARK,
142 "osrfRouterClassHandleIncoming(): investigating message from %s", msg->sender);
144 /* if the client is not from a trusted domain, drop the message */
145 int len = strlen(msg->sender) + 1;
148 jid_get_domain( msg->sender, domain, len - 1 );
150 if(osrfStringArrayContains( router->trustedClients, domain)) {
152 transport_message* bouncedMessage = NULL;
153 if( msg->is_error ) {
155 /* handle bounced message */
/* NOTE(review): no release of `msg` is visible before this early return --
   confirm it is not leaked on this path. */
156 if( !(bouncedMessage = osrfRouterClassHandleBounce( router, classname, class, msg )) )
157 return -1; /* we have no one to send the requested message to */
/* From here on the message rebuilt by the bounce handler is routed in
   place of the error message. */
160 msg = bouncedMessage;
162 osrfRouterClassHandleMessage( router, class, msg );
165 osrfLogWarning( OSRF_LOG_MARK, "Received client message from untrusted client domain %s", domain );
/* Handles a message received on the router's top-level connection.
   - Returns -1 when router or msg is NULL.
   - A missing or empty router_command is treated as an application request
     and delegated to osrfRouterHandleAppRequest().
   - Otherwise returns -1 when router_class is NULL.
   - "register" finds or creates the class and adds the sender as a node.
   - "unregister" removes the sender's node from a non-empty class name.
   NOTE(review): the final return value and the body of the "already
   registered" check are not visible in this listing. */
178 int osrfRouterHandleMessage( osrfRouter* router, transport_message* msg ) {
179 if(!(router && msg)) return -1;
181 if( !msg->router_command || !strcmp(msg->router_command,""))
182 return osrfRouterHandleAppRequest( router, msg ); /* assume it's an app session level request */
184 if(!msg->router_class) return -1;
186 osrfRouterClass* class = NULL;
187 if(!strcmp(msg->router_command, ROUTER_REGISTER)) {
188 class = osrfRouterFindClass( router, msg->router_class );
190 osrfLogInfo( OSRF_LOG_MARK, "Registering class %s", msg->router_class );
/* class can still be NULL after this call; osrfRouterClassFindNode() and
   osrfRouterClassAddNode() both reject a NULL class. */
192 if(!class) class = osrfRouterAddClass( router, msg->router_class );
/* Presumably the add below runs only when the sender is not already a
   node -- the connecting else is not visible here; confirm. */
196 if( osrfRouterClassFindNode( class, msg->sender ) )
199 osrfRouterClassAddNode( class, msg->sender );
203 } else if( !strcmp( msg->router_command, ROUTER_UNREGISTER ) ) {
205 if( msg->router_class && strcmp( msg->router_class, "") ) {
206 osrfLogInfo( OSRF_LOG_MARK, "Unregistering router class %s", msg->router_class );
207 osrfRouterClassRemoveNode( router, msg->router_class, msg->sender );
/* Creates a router class, opens a dedicated connection for it and stores
   it in router->classes under classname.
   Returns NULL when router, router->classes or classname is NULL.
   NOTE(review): the return statements after the connect failure and on
   success are not visible in this listing. */
216 osrfRouterClass* osrfRouterAddClass( osrfRouter* router, char* classname ) {
217 if(!(router && router->classes && classname)) return NULL;
219 osrfRouterClass* class = safe_malloc(sizeof(osrfRouterClass));
220 class->nodes = osrfNewHash();
/* Long-lived iterator over the nodes; osrfRouterClassHandleMessage() uses
   it to pick the node for each routed message. */
221 class->itr = osrfNewHashIterator(class->nodes);
222 class->nodes->freeItem = &osrfRouterNodeFree;
/* Back-pointer used by osrfRouterClassFree(). */
223 class->router = router;
225 class->connection = client_init( router->domain, router->port, NULL, 0 );
/* Same credentials as the router, but classname is passed in the argument
   position where osrfRouterConnect() passes router->resource. */
227 if(!client_connect( class->connection, router->name,
228 router->password, classname, 10, AUTH_DIGEST ) ) {
229 osrfRouterClassFree( classname, class );
233 osrfHashSet( router->classes, class, classname );
/* Adds a node for remoteId to the class's node hash.
   Returns -1 when rclass, rclass->nodes or remoteId is NULL.
   The node starts with no lastMessage and holds its own copy of remoteId.
   NOTE(review): the success return value is not visible in this listing. */
238 int osrfRouterClassAddNode( osrfRouterClass* rclass, char* remoteId ) {
239 if(!(rclass && rclass->nodes && remoteId)) return -1;
241 osrfLogInfo( OSRF_LOG_MARK, "Adding router node for remote id %s", remoteId );
243 osrfRouterNode* node = safe_malloc(sizeof(osrfRouterNode));
245 node->lastMessage = NULL;
246 node->remoteId = strdup(remoteId);
/* Stored under remoteId, the same key osrfRouterClassFindNode() looks up. */
248 osrfHashSet( rclass->nodes, node, remoteId );
252 /* copy off the lastMessage, remove the offending node, send error if it's the last node
253 ? return NULL if it's the last node ?
/* Handles a network-level error (bounce) from a node in a class.
   From the visible lines: looks up the node that sent the error; when it is
   the only node left and has a lastMessage, sends an error built from that
   lastMessage; when a lastMessage exists it can also be cloned into
   `lastSent`; finally the node is removed from the class.
   msg is dereferenced without a NULL check.
   NOTE(review): the else branches and the return statement are not visible
   in this listing. The caller treats a NULL result as "no one to send the
   message to"; the result is presumably `lastSent` -- confirm. */
256 transport_message* osrfRouterClassHandleBounce(
257 osrfRouter* router, char* classname, osrfRouterClass* rclass, transport_message* msg ) {
259 osrfLogDebug( OSRF_LOG_MARK, "osrfRouterClassHandleBounce()");
261 osrfLogInfo( OSRF_LOG_MARK, "Received network layer error message from %s", msg->sender );
262 osrfRouterNode* node = osrfRouterClassFindNode( rclass, msg->sender );
263 transport_message* lastSent = NULL;
265 if( node && osrfHashGetCount(rclass->nodes) == 1 ) { /* the last node is dead */
267 if( node->lastMessage ) {
268 osrfLogWarning( OSRF_LOG_MARK, "We lost the last node in the class, responding with error and removing...");
/* The error reuses the last message's body, subject and thread; its
   router_from and recipient are passed as the last two message_init()
   arguments. */
270 transport_message* error = message_init(
271 node->lastMessage->body, node->lastMessage->subject,
272 node->lastMessage->thread, node->lastMessage->router_from, node->lastMessage->recipient );
273 set_msg_error( error, "cancel", 501 );
275 /* send the error message back to the original sender */
276 client_send_message( rclass->connection, error );
277 message_free( error );
/* NOTE(review): which branch encloses the block below is not visible here;
   node is dereferenced, so it must be non-NULL on this path -- confirm. */
285 if( node->lastMessage ) {
286 osrfLogDebug( OSRF_LOG_MARK, "Cloning lastMessage so next node can send it");
/* The clone gets an empty string in the fourth message_init() argument and
   the last message's router_from in the fifth and as router info. */
287 lastSent = message_init( node->lastMessage->body,
288 node->lastMessage->subject, node->lastMessage->thread, "", node->lastMessage->router_from );
289 message_set_router_info( lastSent, node->lastMessage->router_from, NULL, NULL, NULL, 0 );
293 osrfLogInfo(OSRF_LOG_MARK, "network error occurred after we removed the class.. ignoring");
298 /* remove the dead node */
299 osrfRouterClassRemoveNode( router, classname, msg->sender);
305 If we get a regular message, we send it to the next node in the list of nodes
306 if we get an error, it's a bounce back from a previous attempt. We take the
307 body and thread from the last sent on the node that had the bounced message
308 and propagate them on to the new message being sent
/* Forwards msg to a node of the class and remembers it as that node's
   lastMessage.
   Returns -1 when router, rclass or msg is NULL.
   NOTE(review): the condition guarding the iterator reset, the handling of
   a class with no nodes, and the return values are not visible in this
   listing. */
310 int osrfRouterClassHandleMessage(
311 osrfRouter* router, osrfRouterClass* rclass, transport_message* msg ) {
312 if(!(router && rclass && msg)) return -1;
314 osrfLogDebug( OSRF_LOG_MARK, "osrfRouterClassHandleMessage()");
/* Advances the class's long-lived iterator to select the target node. */
316 osrfRouterNode* node = osrfHashIteratorNext( rclass->itr );
/* Presumably runs only when the iterator was exhausted, wrapping back to
   the first node -- the guarding condition is not visible; confirm. */
318 osrfHashIteratorReset(rclass->itr);
319 node = osrfHashIteratorNext( rclass->itr );
/* Copy of the message: node->remoteId and msg->sender are the last two
   message_init() arguments, and msg->sender is also set as router info. */
324 transport_message* new_msg= message_init( msg->body,
325 msg->subject, msg->thread, node->remoteId, msg->sender );
326 message_set_router_info( new_msg, msg->sender, NULL, NULL, NULL, 0 );
328 osrfLogInfo( OSRF_LOG_MARK, "Routing message:\nfrom: [%s]\nto: [%s]",
329 new_msg->router_from, new_msg->recipient );
/* The node keeps new_msg as lastMessage (the bounce handler reads it), so
   new_msg is not freed here; the previous lastMessage is released first. */
331 message_free( node->lastMessage );
332 node->lastMessage = new_msg;
334 if ( client_send_message( rclass->connection, new_msg ) == 0 )
/* Presumably reached only when the send did not return 0 -- confirm. */
338 message_prepare_xml(new_msg);
339 osrfLogWarning( OSRF_LOG_MARK, "Error sending message from %s to %s\n%s",
340 new_msg->sender, new_msg->recipient, new_msg->msg_xml );
/* Removes the named class from router->classes.
   Returns -1 when router, router->classes or classname is NULL.
   The hash has osrfRouterClassFree installed as freeItem, so removal
   presumably frees the class -- confirm against the osrfHash API.
   NOTE(review): the success return value is not visible in this listing. */
349 int osrfRouterRemoveClass( osrfRouter* router, char* classname ) {
350 if(!(router && router->classes && classname)) return -1;
351 osrfLogInfo( OSRF_LOG_MARK, "Removing router class %s", classname );
352 osrfHashRemove( router->classes, classname );
/* Removes the node remoteId from the named class, and removes the class
   itself once it has no nodes left.
   Returns 0 when any argument (or router->classes) is NULL.
   NOTE(review): a NULL check on the looked-up class and the other return
   values are not visible in this listing. */
357 int osrfRouterClassRemoveNode(
358 osrfRouter* router, char* classname, char* remoteId ) {
360 if(!(router && router->classes && classname && remoteId)) return 0;
362 osrfLogInfo( OSRF_LOG_MARK, "Removing router node %s", remoteId );
364 osrfRouterClass* class = osrfRouterFindClass( router, classname );
368 osrfHashRemove( class->nodes, remoteId );
/* An empty class is dropped from the router. */
369 if( osrfHashGetCount(class->nodes) == 0 ) {
370 osrfRouterRemoveClass( router, classname );
/* Releases a router class; installed as the freeItem callback of
   router->classes. `c` is the osrfRouterClass being freed.
   Does nothing when classname or c is NULL.
   NOTE(review): freeing of the iterator, the node hash and the class struct
   is not visible in this listing. */
381 void osrfRouterClassFree( char* classname, void* c ) {
382 if(!(classname && c)) return;
383 osrfRouterClass* rclass = (osrfRouterClass*) c;
384 client_disconnect( rclass->connection );
385 client_free( rclass->connection );
387 osrfHashIteratorReset( rclass->itr );
388 osrfRouterNode* node;
/* NOTE(review): each call removes an entry from the hash being iterated,
   and osrfRouterClassRemoveNode() calls osrfRouterRemoveClass() when the
   node count reaches 0 -- confirm this cannot re-enter this function or
   invalidate the iterator. */
390 while( (node = osrfHashIteratorNext(rclass->itr)) )
391 osrfRouterClassRemoveNode( rclass->router, classname, node->remoteId );
/* Releases a router node; installed as the freeItem callback of a class's
   node hash. Frees the node's remoteId copy and its lastMessage.
   The remoteId parameter is not used in the visible lines.
   NOTE(review): a NULL check on n and the freeing of the node struct are
   not visible in this listing. */
397 void osrfRouterNodeFree( char* remoteId, void* n ) {
399 osrfRouterNode* node = (osrfRouterNode*) n;
400 free(node->remoteId);
401 message_free(node->lastMessage);
/* Releases a router and what it holds.
   NOTE(review): a NULL check, the freeing of router->name and
   router->classes, and the freeing of the router struct are not visible in
   this listing -- confirm against the complete file. */
406 void osrfRouterFree( osrfRouter* router ) {
409 free(router->domain);
411 free(router->resource);
412 free(router->password);
/* These arrays were handed in by the caller of osrfNewRouter() and stored
   by pointer, so freeing them here means the router owns them. */
414 osrfStringArrayFree( router->trustedClients );
415 osrfStringArrayFree( router->trustedServers );
417 client_free( router->connection );
/* Looks up a class by name in router->classes.
   Returns NULL when router, router->classes or classname is NULL; otherwise
   returns whatever osrfHashGet() yields for that key. */
423 osrfRouterClass* osrfRouterFindClass( osrfRouter* router, char* classname ) {
424 if(!( router && router->classes && classname )) return NULL;
425 return (osrfRouterClass*) osrfHashGet( router->classes, classname );
/* Looks up a node by remote id in the class's node hash.
   Returns NULL when rclass or remoteId is NULL; otherwise returns whatever
   osrfHashGet() yields for that key. rclass->nodes itself is not checked. */
429 osrfRouterNode* osrfRouterClassFindNode( osrfRouterClass* rclass, char* remoteId ) {
430 if(!(rclass && remoteId)) return NULL;
431 return (osrfRouterNode*) osrfHashGet( rclass->nodes, remoteId );
/* Prepares the descriptor set for select() and tracks the highest
   descriptor among the router socket and the class sockets.
   Returns -1 when router, router->classes or set is NULL.
   Classes whose socket fails osrfUtilsCheckFileDescriptor() are removed.
   NOTE(review): the `sockid` declaration, the FD_ZERO/FD_SET calls and the
   final return are not visible in this listing; osrfRouterRun() uses the
   result as the max descriptor. */
435 int __osrfRouterFillFDSet( osrfRouter* router, fd_set* set ) {
436 if(!(router && router->classes && set)) return -1;
439 int maxfd = router->ROUTER_SOCKFD;
444 osrfRouterClass* class = NULL;
445 osrfHashIterator* itr = osrfNewHashIterator(router->classes);
447 while( (class = osrfHashIteratorNext(itr)) ) {
/* itr->current is used as the key (class name) of the item just returned. */
448 char* classname = itr->current;
450 if( classname && (class = osrfRouterFindClass( router, classname )) ) {
451 sockid = class->ROUTER_SOCKFD;
/* A non-zero result is treated as a bad descriptor. */
453 if( osrfUtilsCheckFileDescriptor( sockid ) ) {
455 osrfLogWarning(OSRF_LOG_MARK,
456 "Removing router class '%s' because of a bad top-level file descriptor [%d]", classname, sockid);
/* NOTE(review): this removes from router->classes while it is being
   iterated -- confirm the hash iterator tolerates that. */
457 osrfRouterRemoveClass( router, classname );
460 if( sockid > maxfd ) maxfd = sockid;
466 osrfHashIteratorFree(itr);
/* Handles an application-level request addressed to the router itself.
   Deserializes msg->body into osrfMessage objects, dispatches each by
   m_type and frees it.
   NOTE(review): the declarations of `arr`, `T` and `i`, the case labels of
   the switch and the return value are not visible in this listing. */
472 int osrfRouterHandleAppRequest( osrfRouter* router, transport_message* msg ) {
/* T is presumably the capacity of arr -- confirm. */
478 int num_msgs = osrf_message_deserialize( msg->body, arr, T );
479 osrfMessage* omsg = NULL;
482 for( i = 0; i != num_msgs; i++ ) {
/* Empty slots are skipped. */
484 if( !(omsg = arr[i]) ) continue;
486 switch( omsg->m_type ) {
489 osrfRouterRespondConnect( router, msg, omsg );
493 osrfRouterProcessAppRequest( router, msg, omsg );
499 osrfMessageFree( omsg );
/* Answers a CONNECT request with a "Connection Successful" status message
   sent back to msg->sender on the router's connection.
   Returns -1 when router, msg or omsg is NULL.
   NOTE(review): freeing of the serialized `data` string and the success
   return value are not visible in this listing. */
505 int osrfRouterRespondConnect( osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
506 if(!(router && msg && omsg)) return -1;
/* The reply reuses the request's thread_trace and protocol. */
508 osrfMessage* success = osrf_message_init( STATUS, omsg->thread_trace, omsg->protocol );
510 osrfLogDebug( OSRF_LOG_MARK, "router recevied a CONNECT message from %s", msg->sender );
512 osrf_message_set_status_info(
513 success, "osrfConnectStatus", "Connection Successful", OSRF_STATUS_OK );
515 char* data = osrf_message_serialize(success);
/* The reply keeps the request's thread and passes msg->sender as the
   fourth message_init() argument; subject and last argument are empty. */
517 transport_message* return_m = message_init(
518 data, "", msg->thread, msg->sender, "" );
520 client_send_message(router->connection, return_m);
523 osrf_message_free(success);
524 message_free(return_m);
/* Executes a router-level method named by omsg->method_name.
   Returns -1 when router, msg, omsg or omsg->method_name is NULL.
   The only method visible here is ROUTER_REQUEST_CLASS_LIST, answered with
   a JSON array of the keys of router->classes; the not-found helper is the
   fallback (its enclosing else is not visible here).
   NOTE(review): the declaration of `i` and the final return value are not
   visible in this listing. */
531 int osrfRouterProcessAppRequest( osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
533 if(!(router && msg && omsg && omsg->method_name)) return -1;
535 osrfLogInfo( OSRF_LOG_MARK, "Router received app request: %s", omsg->method_name );
537 jsonObject* jresponse = NULL;
538 if(!strcmp( omsg->method_name, ROUTER_REQUEST_CLASS_LIST )) {
/* Start from an empty JSON array and push one entry per class name. */
541 jresponse = jsonParseString("[]");
543 osrfStringArray* keys = osrfHashKeys( router->classes );
544 for( i = 0; i != keys->size; i++ )
545 jsonObjectPush( jresponse, jsonNewObject(osrfStringArrayGetString( keys, i )) );
546 osrfStringArrayFree(keys);
551 return osrfRouterHandleMethodNFound( router, msg, omsg );
555 osrfRouterHandleAppResponse( router, msg, omsg, jresponse );
556 jsonObjectFree(jresponse);
/* Sends a "Router method not found" status (OSRF_STATUS_NOTFOUND) back to
   msg->sender on the router's connection.
   No NULL checks on the arguments are visible.
   NOTE(review): freeing of the serialized `data` string and the return
   value are not visible in this listing. */
564 int osrfRouterHandleMethodNFound(
565 osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
/* The third argument is a literal 1 where osrfRouterRespondConnect()
   passes omsg->protocol. */
567 osrf_message* err = osrf_message_init( STATUS, omsg->thread_trace, 1);
568 osrf_message_set_status_info( err,
569 "osrfMethodException", "Router method not found", OSRF_STATUS_NOTFOUND );
571 char* data = osrf_message_serialize(err);
573 transport_message* tresponse = message_init(
574 data, "", msg->thread, msg->sender, msg->recipient );
576 client_send_message(router->connection, tresponse );
579 osrf_message_free( err );
580 message_free(tresponse);
/* Sends the result of a router-level request back to msg->sender.
   When response is non-NULL a RESULT message carrying its JSON text is
   sent. A "Request Complete" STATUS message is also sent; presumably after
   the if block closes, though that brace is not visible here -- confirm.
   No NULL checks on router, msg or omsg are visible.
   NOTE(review): freeing of `json`, `data` and `statusdata`, and the return
   value, are not visible in this listing. */
586 int osrfRouterHandleAppResponse( osrfRouter* router,
587 transport_message* msg, osrfMessage* omsg, jsonObject* response ) {
589 if( response ) { /* send the response message */
591 osrfMessage* oresponse = osrf_message_init(
592 RESULT, omsg->thread_trace, omsg->protocol );
/* The response object is serialized to JSON text and set as result content. */
594 char* json = jsonObjectToJSON(response);
595 osrf_message_set_result_content( oresponse, json);
597 char* data = osrf_message_serialize(oresponse);
598 osrfLogDebug( OSRF_LOG_MARK, "Responding to client app request with data: \n%s\n", data );
600 transport_message* tresponse = message_init(
601 data, "", msg->thread, msg->sender, msg->recipient );
603 client_send_message(router->connection, tresponse );
605 osrfMessageFree(oresponse);
606 message_free(tresponse);
612 /* now send the 'request complete' message */
/* The third argument is a literal 1 here, whereas the RESULT message above
   uses omsg->protocol. */
613 osrf_message* status = osrf_message_init( STATUS, omsg->thread_trace, 1);
614 osrf_message_set_status_info( status, "osrfConnectStatus", "Request Complete", OSRF_STATUS_COMPLETE );
616 char* statusdata = osrf_message_serialize(status);
618 transport_message* sresponse = message_init(
619 statusdata, "", msg->thread, msg->sender, msg->recipient );
620 client_send_message(router->connection, sresponse );
624 osrfMessageFree(status);
625 message_free(sresponse);