1 #include "osrf_router.h"
3 #define ROUTER_SOCKFD connection->session->sock_id
4 #define ROUTER_REGISTER "register"
5 #define ROUTER_UNREGISTER "unregister"
8 #define ROUTER_REQUEST_CLASS_LIST "opensrf.router.info.class.list"
10 osrfRouter* osrfNewRouter(
11 char* domain, char* name,
12 char* resource, char* password, int port,
13 osrfStringArray* trustedClients, osrfStringArray* trustedServers ) {
15 if(!( domain && name && resource && password && port && trustedClients && trustedServers )) return NULL;
17 osrfRouter* router = safe_malloc(sizeof(osrfRouter));
18 router->domain = strdup(domain);
19 router->name = strdup(name);
20 router->password = strdup(password);
21 router->resource = strdup(resource);
24 router->trustedClients = trustedClients;
25 router->trustedServers = trustedServers;
28 router->classes = osrfNewHash();
29 router->classes->freeItem = &osrfRouterClassFree;
31 router->connection = client_init( domain, port, NULL, 0 );
38 int osrfRouterConnect( osrfRouter* router ) {
39 if(!router) return -1;
40 int ret = client_connect( router->connection, router->name,
41 router->password, router->resource, 10, AUTH_DIGEST );
42 if( ret == 0 ) return -1;
47 void osrfRouterRun( osrfRouter* router ) {
48 if(!(router && router->classes)) return;
50 int routerfd = router->ROUTER_SOCKFD;
56 int maxfd = __osrfRouterFillFDSet( router, &set );
59 if( (selectret = select(maxfd + 1, &set, NULL, NULL, NULL)) < 0 ) {
60 osrfLogWarning( OSRF_LOG_MARK, "Top level select call failed with errno %d", errno);
64 /* see if there is a top level router message */
66 if( FD_ISSET(routerfd, &set) ) {
67 osrfLogDebug( OSRF_LOG_MARK, "Top router socket is active: %d", routerfd );
69 osrfRouterHandleIncoming( router );
73 /* now check each of the connected classes and see if they have data to route */
74 while( numhandled < selectret ) {
76 osrfRouterClass* class;
77 osrfHashIterator* itr = osrfNewHashIterator(router->classes);
79 while( (class = osrfHashIteratorNext(itr)) ) {
81 char* classname = itr->current;
83 if( classname && (class = osrfRouterFindClass( router, classname )) ) {
85 osrfLogDebug( OSRF_LOG_MARK, "Checking %s for activity...", classname );
87 int sockfd = class->ROUTER_SOCKFD;
88 if(FD_ISSET( sockfd, &set )) {
89 osrfLogDebug( OSRF_LOG_MARK, "Socket is active: %d", sockfd );
91 osrfRouterClassHandleIncoming( router, classname, class );
96 osrfHashIteratorFree(itr);
102 void osrfRouterHandleIncoming( osrfRouter* router ) {
105 transport_message* msg = NULL;
107 if( (msg = client_recv( router->connection, 0 )) ) {
111 /* if the sender is not a trusted server, drop the message */
112 int len = strlen(msg->sender) + 1;
115 jid_get_domain( msg->sender, domain, len - 1 );
117 if(osrfStringArrayContains( router->trustedServers, domain))
118 osrfRouterHandleMessage( router, msg );
120 osrfLogWarning( OSRF_LOG_MARK, "Received message from un-trusted server domain %s", msg->sender);
127 int osrfRouterClassHandleIncoming( osrfRouter* router, char* classname, osrfRouterClass* class ) {
128 if(!(router && class)) return -1;
130 transport_message* msg;
131 osrfLogDebug( OSRF_LOG_MARK, "osrfRouterClassHandleIncoming()");
133 if( (msg = client_recv( class->connection, 0 )) ) {
137 /* if the client is not from a trusted domain, drop the message */
138 int len = strlen(msg->sender) + 1;
141 jid_get_domain( msg->sender, domain, len - 1 );
143 if(osrfStringArrayContains( router->trustedClients, domain)) {
145 transport_message* bouncedMessage = NULL;
146 if( msg->is_error ) {
148 /* handle bounced message */
149 if( !(bouncedMessage = osrfRouterClassHandleBounce( router, classname, class, msg )) )
150 return -1; /* we have no one to send the requested message to */
153 msg = bouncedMessage;
155 osrfRouterClassHandleMessage( router, class, msg );
158 osrfLogWarning( OSRF_LOG_MARK, "Received client message from untrusted client domain %s", domain );
171 int osrfRouterHandleMessage( osrfRouter* router, transport_message* msg ) {
172 if(!(router && msg)) return -1;
174 if( !msg->router_command || !strcmp(msg->router_command,""))
175 return osrfRouterHandleAppRequest( router, msg ); /* assume it's an app session level request */
177 if(!msg->router_class) return -1;
179 osrfRouterClass* class = NULL;
180 if(!strcmp(msg->router_command, ROUTER_REGISTER)) {
181 class = osrfRouterFindClass( router, msg->router_class );
183 osrfLogInfo( OSRF_LOG_MARK, "Registering class %s", msg->router_class );
185 if(!class) class = osrfRouterAddClass( router, msg->router_class );
189 if( osrfRouterClassFindNode( class, msg->sender ) )
192 osrfRouterClassAddNode( class, msg->sender );
196 } else if( !strcmp( msg->router_command, ROUTER_UNREGISTER ) ) {
198 if( msg->router_class && strcmp( msg->router_class, "") ) {
199 osrfLogInfo( OSRF_LOG_MARK, "Unregistering router class %s", msg->router_class );
200 osrfRouterClassRemoveNode( router, msg->router_class, msg->sender );
209 osrfRouterClass* osrfRouterAddClass( osrfRouter* router, char* classname ) {
210 if(!(router && router->classes && classname)) return NULL;
212 osrfRouterClass* class = safe_malloc(sizeof(osrfRouterClass));
213 class->nodes = osrfNewHash();
214 class->itr = osrfNewHashIterator(class->nodes);
215 class->nodes->freeItem = &osrfRouterNodeFree;
216 class->router = router;
218 class->connection = client_init( router->domain, router->port, NULL, 0 );
220 if(!client_connect( class->connection, router->name,
221 router->password, classname, 10, AUTH_DIGEST ) ) {
222 osrfRouterClassFree( classname, class );
226 osrfHashSet( router->classes, class, classname );
231 int osrfRouterClassAddNode( osrfRouterClass* rclass, char* remoteId ) {
232 if(!(rclass && rclass->nodes && remoteId)) return -1;
234 osrfLogInfo( OSRF_LOG_MARK, "Adding router node for remote id %s", remoteId );
236 osrfRouterNode* node = safe_malloc(sizeof(osrfRouterNode));
238 node->lastMessage = NULL;
239 node->remoteId = strdup(remoteId);
241 osrfHashSet( rclass->nodes, node, remoteId );
245 /* copy off the lastMessage, remove the offending node, send error if it's tht last node
246 ? return NULL if it's the last node ?
249 transport_message* osrfRouterClassHandleBounce(
250 osrfRouter* router, char* classname, osrfRouterClass* rclass, transport_message* msg ) {
252 osrfLogDebug( OSRF_LOG_MARK, "osrfRouterClassHandleBounce()");
254 osrfLogInfo( OSRF_LOG_MARK, "Received network layer error message from %s", msg->sender );
255 osrfRouterNode* node = osrfRouterClassFindNode( rclass, msg->sender );
256 transport_message* lastSent = NULL;
258 if( node && osrfHashGetCount(rclass->nodes) == 1 ) { /* the last node is dead */
260 if( node->lastMessage ) {
261 osrfLogWarning( OSRF_LOG_MARK, "We lost the last node in the class, responding with error and removing...");
263 transport_message* error = message_init(
264 node->lastMessage->body, node->lastMessage->subject,
265 node->lastMessage->thread, node->lastMessage->router_from, node->lastMessage->recipient );
266 set_msg_error( error, "cancel", 501 );
268 /* send the error message back to the original sender */
269 client_send_message( rclass->connection, error );
270 message_free( error );
277 if( node->lastMessage ) {
278 osrfLogDebug( OSRF_LOG_MARK, "Cloning lastMessage so next node can send it");
279 lastSent = message_init( node->lastMessage->body,
280 node->lastMessage->subject, node->lastMessage->thread, "", node->lastMessage->router_from );
281 message_set_router_info( lastSent, node->lastMessage->router_from, NULL, NULL, NULL, 0 );
285 /* remove the dead node */
286 osrfRouterClassRemoveNode( router, classname, msg->sender);
292 If we get a regular message, we send it to the next node in the list of nodes
293 if we get an error, it's a bounce back from a previous attempt. We take the
294 body and thread from the last sent on the node that had the bounced message
295 and propogate them on to the new message being sent
297 int osrfRouterClassHandleMessage(
298 osrfRouter* router, osrfRouterClass* rclass, transport_message* msg ) {
299 if(!(router && rclass && msg)) return -1;
301 osrfLogDebug( OSRF_LOG_MARK, "osrfRouterClassHandleMessage()");
303 osrfRouterNode* node = osrfHashIteratorNext( rclass->itr );
305 osrfHashIteratorReset(rclass->itr);
306 node = osrfHashIteratorNext( rclass->itr );
311 transport_message* new_msg= message_init( msg->body,
312 msg->subject, msg->thread, node->remoteId, msg->sender );
313 message_set_router_info( new_msg, msg->sender, NULL, NULL, NULL, 0 );
315 osrfLogInfo( OSRF_LOG_MARK, "Routing message:\nfrom: [%s]\nto: [%s]",
316 new_msg->router_from, new_msg->recipient );
318 message_free( node->lastMessage );
319 node->lastMessage = new_msg;
321 if ( client_send_message( rclass->connection, new_msg ) == 0 )
325 message_prepare_xml(new_msg);
326 osrfLogWarning( OSRF_LOG_MARK, "Error sending message from %s to %s\n%s",
327 new_msg->sender, new_msg->recipient, new_msg->msg_xml );
336 int osrfRouterRemoveClass( osrfRouter* router, char* classname ) {
337 if(!(router && router->classes && classname)) return -1;
338 osrfLogInfo( OSRF_LOG_MARK, "Removing router class %s", classname );
339 osrfHashRemove( router->classes, classname );
344 int osrfRouterClassRemoveNode(
345 osrfRouter* router, char* classname, char* remoteId ) {
347 if(!(router && router->classes && classname && remoteId)) return 0;
349 osrfLogInfo( OSRF_LOG_MARK, "Removing router node %s", remoteId );
351 osrfRouterClass* class = osrfRouterFindClass( router, classname );
355 osrfHashRemove( class->nodes, remoteId );
356 if( osrfHashGetCount(class->nodes) == 0 ) {
357 osrfRouterRemoveClass( router, classname );
368 void osrfRouterClassFree( char* classname, void* c ) {
369 if(!(classname && c)) return;
370 osrfRouterClass* rclass = (osrfRouterClass*) c;
371 client_disconnect( rclass->connection );
372 client_free( rclass->connection );
374 osrfHashIteratorReset( rclass->itr );
375 osrfRouterNode* node;
377 while( (node = osrfHashIteratorNext(rclass->itr)) )
378 osrfRouterClassRemoveNode( rclass->router, classname, node->remoteId );
384 void osrfRouterNodeFree( char* remoteId, void* n ) {
386 osrfRouterNode* node = (osrfRouterNode*) n;
387 free(node->remoteId);
388 message_free(node->lastMessage);
393 void osrfRouterFree( osrfRouter* router ) {
396 free(router->domain);
398 free(router->resource);
399 free(router->password);
401 osrfStringArrayFree( router->trustedClients );
402 osrfStringArrayFree( router->trustedServers );
404 client_free( router->connection );
410 osrfRouterClass* osrfRouterFindClass( osrfRouter* router, char* classname ) {
411 if(!( router && router->classes && classname )) return NULL;
412 return (osrfRouterClass*) osrfHashGet( router->classes, classname );
416 osrfRouterNode* osrfRouterClassFindNode( osrfRouterClass* rclass, char* remoteId ) {
417 if(!(rclass && remoteId)) return NULL;
418 return (osrfRouterNode*) osrfHashGet( rclass->nodes, remoteId );
422 int __osrfRouterFillFDSet( osrfRouter* router, fd_set* set ) {
423 if(!(router && router->classes && set)) return -1;
426 int maxfd = router->ROUTER_SOCKFD;
431 osrfRouterClass* class = NULL;
432 osrfHashIterator* itr = osrfNewHashIterator(router->classes);
434 while( (class = osrfHashIteratorNext(itr)) ) {
435 char* classname = itr->current;
437 if( classname && (class = osrfRouterFindClass( router, classname )) ) {
438 sockid = class->ROUTER_SOCKFD;
440 if( osrfUtilsCheckFileDescriptor( sockid ) ) {
441 osrfRouterRemoveClass( router, classname );
444 if( sockid > maxfd ) maxfd = sockid;
450 osrfHashIteratorFree(itr);
456 int osrfRouterHandleAppRequest( osrfRouter* router, transport_message* msg ) {
462 int num_msgs = osrf_message_deserialize( msg->body, arr, T );
463 osrfMessage* omsg = NULL;
466 for( i = 0; i != num_msgs; i++ ) {
468 if( !(omsg = arr[i]) ) continue;
470 switch( omsg->m_type ) {
473 osrfRouterRespondConnect( router, msg, omsg );
477 osrfRouterProcessAppRequest( router, msg, omsg );
483 osrfMessageFree( omsg );
489 int osrfRouterRespondConnect( osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
490 if(!(router && msg && omsg)) return -1;
492 osrfMessage* success = osrf_message_init( STATUS, omsg->thread_trace, omsg->protocol );
494 osrfLogDebug( OSRF_LOG_MARK, "router recevied a CONNECT message from %s", msg->sender );
496 osrf_message_set_status_info(
497 success, "osrfConnectStatus", "Connection Successful", OSRF_STATUS_OK );
499 char* data = osrf_message_serialize(success);
501 transport_message* return_m = message_init(
502 data, "", msg->thread, msg->sender, "" );
504 client_send_message(router->connection, return_m);
507 osrf_message_free(success);
508 message_free(return_m);
515 int osrfRouterProcessAppRequest( osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
517 if(!(router && msg && omsg && omsg->method_name)) return -1;
519 osrfLogInfo( OSRF_LOG_MARK, "Router received app request: %s", omsg->method_name );
521 jsonObject* jresponse = NULL;
522 if(!strcmp( omsg->method_name, ROUTER_REQUEST_CLASS_LIST )) {
525 jresponse = jsonParseString("[]");
527 osrfStringArray* keys = osrfHashKeys( router->classes );
528 for( i = 0; i != keys->size; i++ )
529 jsonObjectPush( jresponse, jsonNewObject(osrfStringArrayGetString( keys, i )) );
530 osrfStringArrayFree(keys);
535 return osrfRouterHandleMethodNFound( router, msg, omsg );
539 osrfRouterHandleAppResponse( router, msg, omsg, jresponse );
540 jsonObjectFree(jresponse);
548 int osrfRouterHandleMethodNFound(
549 osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
551 osrf_message* err = osrf_message_init( STATUS, omsg->thread_trace, 1);
552 osrf_message_set_status_info( err,
553 "osrfMethodException", "Router method not found", OSRF_STATUS_NOTFOUND );
555 char* data = osrf_message_serialize(err);
557 transport_message* tresponse = message_init(
558 data, "", msg->thread, msg->sender, msg->recipient );
560 client_send_message(router->connection, tresponse );
563 osrf_message_free( err );
564 message_free(tresponse);
570 int osrfRouterHandleAppResponse( osrfRouter* router,
571 transport_message* msg, osrfMessage* omsg, jsonObject* response ) {
573 if( response ) { /* send the response message */
575 osrfMessage* oresponse = osrf_message_init(
576 RESULT, omsg->thread_trace, omsg->protocol );
578 char* json = jsonObjectToJSON(response);
579 osrf_message_set_result_content( oresponse, json);
581 char* data = osrf_message_serialize(oresponse);
582 osrfLogDebug( OSRF_LOG_MARK, "Responding to client app request with data: \n%s\n", data );
584 transport_message* tresponse = message_init(
585 data, "", msg->thread, msg->sender, msg->recipient );
587 client_send_message(router->connection, tresponse );
589 osrfMessageFree(oresponse);
590 message_free(tresponse);
596 /* now send the 'request complete' message */
597 osrf_message* status = osrf_message_init( STATUS, omsg->thread_trace, 1);
598 osrf_message_set_status_info( status, "osrfConnectStatus", "Request Complete", OSRF_STATUS_COMPLETE );
600 char* statusdata = osrf_message_serialize(status);
602 transport_message* sresponse = message_init(
603 statusdata, "", msg->thread, msg->sender, msg->recipient );
604 client_send_message(router->connection, sresponse );
608 osrfMessageFree(status);
609 message_free(sresponse);