adding router commands to gather some stats about request counts
[OpenSRF.git] / src / router / osrf_router.c
1 #include "osrf_router.h"
2
/* Member path from a connection-owning struct (osrfRouter, osrfRouterClass)
 * to its socket descriptor.  Used in member position, e.g.
 * router->ROUTER_SOCKFD  ==>  router->connection->session->sock_id */
#define ROUTER_SOCKFD connection->session->sock_id

/* transport_message->router_command values understood by the router */
#define ROUTER_REGISTER "register"
#define ROUTER_UNREGISTER "unregister"

/* method names the router answers itself (see osrfRouterProcessAppRequest) */
#define ROUTER_REQUEST_CLASS_LIST "opensrf.router.info.class.list"
#define ROUTER_REQUEST_FULL_STATS "opensrf.router.info.stats.class.node.all"
#define ROUTER_REQUEST_CLASS_STATS "opensrf.router.info.stats.class.all"
11
12 osrfRouter* osrfNewRouter( 
13                 char* domain, char* name, 
14                 char* resource, char* password, int port, 
15                 osrfStringArray* trustedClients, osrfStringArray* trustedServers ) {
16
17         if(!( domain && name && resource && password && port && trustedClients && trustedServers )) return NULL;
18
19         osrfRouter* router      = safe_malloc(sizeof(osrfRouter));
20         router->domain                  = strdup(domain);
21         router->name                    = strdup(name);
22         router->password                = strdup(password);
23         router->resource                = strdup(resource);
24         router->port                    = port;
25
26         router->trustedClients = trustedClients;
27         router->trustedServers = trustedServers;
28
29         
30         router->classes = osrfNewHash(); 
31         router->classes->freeItem = &osrfRouterClassFree;
32
33         router->connection = client_init( domain, port, NULL, 0 );
34
35         return router;
36 }
37
38
39
40 int osrfRouterConnect( osrfRouter* router ) {
41         if(!router) return -1;
42         int ret = client_connect( router->connection, router->name, 
43                         router->password, router->resource, 10, AUTH_DIGEST );
44         if( ret == 0 ) return -1;
45         return 0;
46 }
47
48
49 void osrfRouterRun( osrfRouter* router ) {
50         if(!(router && router->classes)) return;
51
52         int routerfd = router->ROUTER_SOCKFD;
53         int selectret = 0;
54
55         while(1) {
56
57                 fd_set set;
58                 int maxfd = __osrfRouterFillFDSet( router, &set );
59                 int numhandled = 0;
60
61                 if( (selectret = select(maxfd + 1, &set, NULL, NULL, NULL)) < 0 ) {
62                         osrfLogWarning( OSRF_LOG_MARK, "Top level select call failed with errno %d", errno);
63                         continue;
64                 }
65
66                 /* see if there is a top level router message */
67
68                 if( FD_ISSET(routerfd, &set) ) {
69                         osrfLogDebug( OSRF_LOG_MARK, "Top router socket is active: %d", routerfd );
70                         numhandled++;
71                         osrfRouterHandleIncoming( router );
72                 }
73
74
75                 /* now check each of the connected classes and see if they have data to route */
76                 while( numhandled < selectret ) {
77
78                         osrfRouterClass* class;
79                         osrfHashIterator* itr = osrfNewHashIterator(router->classes);
80
81                         while( (class = osrfHashIteratorNext(itr)) ) {
82
83                                 char* classname = itr->current;
84
85                                 if( classname && (class = osrfRouterFindClass( router, classname )) ) {
86
87                                         osrfLogDebug( OSRF_LOG_MARK, "Checking %s for activity...", classname );
88
89                                         int sockfd = class->ROUTER_SOCKFD;
90                                         if(FD_ISSET( sockfd, &set )) {
91                                                 osrfLogDebug( OSRF_LOG_MARK, "Socket is active: %d", sockfd );
92                                                 numhandled++;
93                                                 osrfRouterClassHandleIncoming( router, classname, class );
94                                         }
95                                 }
96                         }
97
98                         osrfHashIteratorFree(itr);
99                 }
100         }
101 }
102
103
104 void osrfRouterHandleIncoming( osrfRouter* router ) {
105         if(!router) return;
106
107         transport_message* msg = NULL;
108
109         //if( (msg = client_recv( router->connection, 0 )) ) { 
110         while( (msg = client_recv( router->connection, 0 )) ) { 
111
112                 if( msg->sender ) {
113
114                         osrfLogDebug(OSRF_LOG_MARK, 
115                                 "osrfRouterHandleIncoming(): investigating message from %s", msg->sender);
116
117                         /* if the sender is not a trusted server, drop the message */
118                         int len = strlen(msg->sender) + 1;
119                         char domain[len];
120                         bzero(domain, len);
121                         jid_get_domain( msg->sender, domain, len - 1 );
122
123                         if(osrfStringArrayContains( router->trustedServers, domain)) 
124                                 osrfRouterHandleMessage( router, msg );
125                          else 
126                                 osrfLogWarning( OSRF_LOG_MARK, "Received message from un-trusted server domain %s", msg->sender);
127                 }
128
129                 message_free(msg);
130         }
131 }
132
133 int osrfRouterClassHandleIncoming( osrfRouter* router, char* classname, osrfRouterClass* class ) {
134         if(!(router && class)) return -1;
135
136         transport_message* msg;
137         osrfLogDebug( OSRF_LOG_MARK, "osrfRouterClassHandleIncoming()");
138
139         while( (msg = client_recv( class->connection, 0 )) ) {
140
141       osrfLogSetXid(msg->osrf_xid);
142
143                 if( msg->sender ) {
144
145                         osrfLogDebug(OSRF_LOG_MARK, 
146                                 "osrfRouterClassHandleIncoming(): investigating message from %s", msg->sender);
147
148                         /* if the client is not from a trusted domain, drop the message */
149                         int len = strlen(msg->sender) + 1;
150                         char domain[len];
151                         bzero(domain, len);
152                         jid_get_domain( msg->sender, domain, len - 1 );
153
154                         if(osrfStringArrayContains( router->trustedClients, domain)) {
155
156                                 transport_message* bouncedMessage = NULL;
157                                 if( msg->is_error )  {
158
159                                         /* handle bounced message */
160                                         if( !(bouncedMessage = osrfRouterClassHandleBounce( router, classname, class, msg )) ) 
161                                                 return -1; /* we have no one to send the requested message to */
162
163                                         message_free( msg );
164                                         msg = bouncedMessage;
165                                 }
166                                 osrfRouterClassHandleMessage( router, class, msg );
167
168                         } else {
169                                 osrfLogWarning( OSRF_LOG_MARK, "Received client message from untrusted client domain %s", domain );
170                         }
171                 }
172
173       osrfLogClearXid();
174                 message_free( msg );
175         }
176
177         return 0;
178 }
179
180
181
182
183 int osrfRouterHandleMessage( osrfRouter* router, transport_message* msg ) {
184         if(!(router && msg)) return -1;
185
186         if( !msg->router_command || !strcmp(msg->router_command,"")) 
187                 return osrfRouterHandleAppRequest( router, msg ); /* assume it's an app session level request */
188
189         if(!msg->router_class) return -1;
190
191         osrfRouterClass* class = NULL;
192         if(!strcmp(msg->router_command, ROUTER_REGISTER)) {
193                 class = osrfRouterFindClass( router, msg->router_class );
194
195                 osrfLogInfo( OSRF_LOG_MARK, "Registering class %s", msg->router_class );
196
197                 if(!class) class = osrfRouterAddClass( router, msg->router_class );
198
199                 if(class) { 
200
201                         if( osrfRouterClassFindNode( class, msg->sender ) )
202                                 return 0;
203                         else 
204                                 osrfRouterClassAddNode( class, msg->sender );
205
206                 } 
207
208         } else if( !strcmp( msg->router_command, ROUTER_UNREGISTER ) ) {
209
210                 if( msg->router_class && strcmp( msg->router_class, "") ) {
211                         osrfLogInfo( OSRF_LOG_MARK, "Unregistering router class %s", msg->router_class );
212                         osrfRouterClassRemoveNode( router, msg->router_class, msg->sender );
213                 }
214         }
215
216         return 0;
217 }
218
219
220
221 osrfRouterClass* osrfRouterAddClass( osrfRouter* router, char* classname ) {
222         if(!(router && router->classes && classname)) return NULL;
223
224         osrfRouterClass* class = safe_malloc(sizeof(osrfRouterClass));
225         class->nodes = osrfNewHash();
226         class->itr = osrfNewHashIterator(class->nodes);
227         class->nodes->freeItem = &osrfRouterNodeFree;
228         class->router   = router;
229
230         class->connection = client_init( router->domain, router->port, NULL, 0 );
231
232         if(!client_connect( class->connection, router->name, 
233                         router->password, classname, 10, AUTH_DIGEST ) ) {
234                 osrfRouterClassFree( classname, class );
235                 return NULL;
236         }
237         
238         osrfHashSet( router->classes, class, classname );
239         return class;
240 }
241
242
243 int osrfRouterClassAddNode( osrfRouterClass* rclass, char* remoteId ) {
244         if(!(rclass && rclass->nodes && remoteId)) return -1;
245
246         osrfLogInfo( OSRF_LOG_MARK, "Adding router node for remote id %s", remoteId );
247
248         osrfRouterNode* node = safe_malloc(sizeof(osrfRouterNode));
249         node->count = 0;
250         node->lastMessage = NULL;
251         node->remoteId = strdup(remoteId);
252
253         osrfHashSet( rclass->nodes, node, remoteId );
254         return 0;
255 }
256
257 /* copy off the lastMessage, remove the offending node, send error if it's tht last node 
258         ? return NULL if it's the last node ?
259  */
260
261 transport_message* osrfRouterClassHandleBounce( 
262                 osrfRouter* router, char* classname, osrfRouterClass* rclass, transport_message* msg ) {
263
264         osrfLogDebug( OSRF_LOG_MARK, "osrfRouterClassHandleBounce()");
265
266         osrfLogInfo( OSRF_LOG_MARK, "Received network layer error message from %s", msg->sender );
267         osrfRouterNode* node = osrfRouterClassFindNode( rclass, msg->sender );
268         transport_message* lastSent = NULL;
269
270         if( node && osrfHashGetCount(rclass->nodes) == 1 ) { /* the last node is dead */
271
272                 if( node->lastMessage ) {
273                         osrfLogWarning( OSRF_LOG_MARK, "We lost the last node in the class, responding with error and removing...");
274         
275                         transport_message* error = message_init( 
276                                 node->lastMessage->body, node->lastMessage->subject, 
277                                 node->lastMessage->thread, node->lastMessage->router_from, node->lastMessage->recipient );
278          message_set_osrf_xid(error, node->lastMessage->osrf_xid);
279                         set_msg_error( error, "cancel", 501 );
280         
281                         /* send the error message back to the original sender */
282                         client_send_message( rclass->connection, error );
283                         message_free( error );
284                 }
285         
286                 return NULL;
287         
288         } else { 
289
290                 if( node ) {
291                         if( node->lastMessage ) {
292                                 osrfLogDebug( OSRF_LOG_MARK, "Cloning lastMessage so next node can send it");
293                                 lastSent = message_init( node->lastMessage->body,
294                                         node->lastMessage->subject, node->lastMessage->thread, "", node->lastMessage->router_from );
295                                 message_set_router_info( lastSent, node->lastMessage->router_from, NULL, NULL, NULL, 0 );
296             message_set_osrf_xid( lastSent, node->lastMessage->osrf_xid );
297                         }
298                 } else {
299
300                         osrfLogInfo(OSRF_LOG_MARK, "network error occurred after we removed the class.. ignoring");
301                         return NULL;
302                 }
303         }
304
305         /* remove the dead node */
306         osrfRouterClassRemoveNode( router, classname, msg->sender);
307         return lastSent;
308 }
309
310
311 /**
312   If we get a regular message, we send it to the next node in the list of nodes
313   if we get an error, it's a bounce back from a previous attempt.  We take the
314   body and thread from the last sent on the node that had the bounced message
315   and propogate them on to the new message being sent
316   */
317 int osrfRouterClassHandleMessage( 
318                 osrfRouter* router, osrfRouterClass* rclass, transport_message* msg ) {
319         if(!(router && rclass && msg)) return -1;
320
321         osrfLogDebug( OSRF_LOG_MARK, "osrfRouterClassHandleMessage()");
322
323         osrfRouterNode* node = osrfHashIteratorNext( rclass->itr );
324         if(!node) {
325                 osrfHashIteratorReset(rclass->itr);
326                 node = osrfHashIteratorNext( rclass->itr );
327         }
328
329         if(node) {
330
331                 transport_message* new_msg= message_init(       msg->body, 
332                                 msg->subject, msg->thread, node->remoteId, msg->sender );
333                 message_set_router_info( new_msg, msg->sender, NULL, NULL, NULL, 0 );
334       message_set_osrf_xid( new_msg, msg->osrf_xid );
335
336                 osrfLogInfo( OSRF_LOG_MARK,  "Routing message:\nfrom: [%s]\nto: [%s]", 
337                                 new_msg->router_from, new_msg->recipient );
338
339                 message_free( node->lastMessage );
340                 node->lastMessage = new_msg;
341
342                 if ( client_send_message( rclass->connection, new_msg ) == 0 ) 
343                         node->count++;
344
345                 else {
346                         message_prepare_xml(new_msg);
347                         osrfLogWarning( OSRF_LOG_MARK, "Error sending message from %s to %s\n%s", 
348                                         new_msg->sender, new_msg->recipient, new_msg->msg_xml );
349                 }
350
351         } 
352
353         return 0;
354 }
355
356
357 int osrfRouterRemoveClass( osrfRouter* router, char* classname ) {
358         if(!(router && router->classes && classname)) return -1;
359         osrfLogInfo( OSRF_LOG_MARK, "Removing router class %s", classname );
360         osrfHashRemove( router->classes, classname );
361         return 0;
362 }
363
364
365 int osrfRouterClassRemoveNode( 
366                 osrfRouter* router, char* classname, char* remoteId ) {
367
368         if(!(router && router->classes && classname && remoteId)) return 0;
369
370         osrfLogInfo( OSRF_LOG_MARK, "Removing router node %s", remoteId );
371
372         osrfRouterClass* class = osrfRouterFindClass( router, classname );
373
374         if( class ) {
375
376                 osrfHashRemove( class->nodes, remoteId );
377                 if( osrfHashGetCount(class->nodes) == 0 ) {
378                         osrfRouterRemoveClass( router, classname );
379                         return 1;
380                 }
381
382                 return 0;
383         }
384
385         return -1;
386 }
387
388
389 void osrfRouterClassFree( char* classname, void* c ) {
390         if(!(classname && c)) return;
391         osrfRouterClass* rclass = (osrfRouterClass*) c;
392         client_disconnect( rclass->connection );        
393         client_free( rclass->connection );      
394
395         osrfHashIteratorReset( rclass->itr );
396         osrfRouterNode* node;
397
398         while( (node = osrfHashIteratorNext(rclass->itr)) ) 
399                 osrfRouterClassRemoveNode( rclass->router, classname, node->remoteId );
400
401    osrfHashIteratorFree(rclass->itr);
402    osrfHashFree(rclass->nodes);
403
404         free(rclass);
405 }
406
407
408 void osrfRouterNodeFree( char* remoteId, void* n ) {
409         if(!n) return;
410         osrfRouterNode* node = (osrfRouterNode*) n;
411         free(node->remoteId);
412         message_free(node->lastMessage);
413         free(node);
414 }
415
416
417 void osrfRouterFree( osrfRouter* router ) {
418         if(!router) return;
419
420         free(router->domain);           
421         free(router->name);
422         free(router->resource);
423         free(router->password);
424
425         osrfStringArrayFree( router->trustedClients );
426         osrfStringArrayFree( router->trustedServers );
427
428         client_free( router->connection );
429         free(router);
430 }
431
432
433
434 osrfRouterClass* osrfRouterFindClass( osrfRouter* router, char* classname ) {
435         if(!( router && router->classes && classname )) return NULL;
436         return (osrfRouterClass*) osrfHashGet( router->classes, classname );
437 }
438
439
440 osrfRouterNode* osrfRouterClassFindNode( osrfRouterClass* rclass, char* remoteId ) {
441         if(!(rclass && remoteId))  return NULL;
442         return (osrfRouterNode*) osrfHashGet( rclass->nodes, remoteId );
443 }
444
445
446 int __osrfRouterFillFDSet( osrfRouter* router, fd_set* set ) {
447         if(!(router && router->classes && set)) return -1;
448
449         FD_ZERO(set);
450         int maxfd = router->ROUTER_SOCKFD;
451         FD_SET(maxfd, set);
452
453         int sockid;
454
455         osrfRouterClass* class = NULL;
456         osrfHashIterator* itr = osrfNewHashIterator(router->classes);
457
458         while( (class = osrfHashIteratorNext(itr)) ) {
459                 char* classname = itr->current;
460
461                 if( classname && (class = osrfRouterFindClass( router, classname )) ) {
462                         sockid = class->ROUTER_SOCKFD;
463         
464                         if( osrfUtilsCheckFileDescriptor( sockid ) ) {
465
466                                 osrfLogWarning(OSRF_LOG_MARK, 
467                                         "Removing router class '%s' because of a bad top-level file descriptor [%d]", classname, sockid);
468                                 osrfRouterRemoveClass( router, classname );
469         
470                         } else {
471                                 if( sockid > maxfd ) maxfd = sockid;
472                                 FD_SET(sockid, set);
473                         }
474                 }
475         }
476
477         osrfHashIteratorFree(itr);
478         return maxfd;
479 }
480
481
482
483 int osrfRouterHandleAppRequest( osrfRouter* router, transport_message* msg ) {
484
485         int T = 32;
486         osrfMessage* arr[T];
487         memset(arr, 0, T );
488
489         int num_msgs = osrf_message_deserialize( msg->body, arr, T );
490         osrfMessage* omsg = NULL;
491
492         int i;
493         for( i = 0; i != num_msgs; i++ ) {
494
495                 if( !(omsg = arr[i]) ) continue;
496
497                 switch( omsg->m_type ) {
498
499                         case CONNECT:
500                                 osrfRouterRespondConnect( router, msg, omsg );
501                                 break;
502
503                         case REQUEST:
504                                 osrfRouterProcessAppRequest( router, msg, omsg );
505                                 break;
506
507                         default: break;
508                 }
509
510                 osrfMessageFree( omsg );
511         }
512
513         return 0;
514 }
515
516 int osrfRouterRespondConnect( osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
517         if(!(router && msg && omsg)) return -1;
518
519         osrfMessage* success = osrf_message_init( STATUS, omsg->thread_trace, omsg->protocol );
520
521         osrfLogDebug( OSRF_LOG_MARK, "router recevied a CONNECT message from %s", msg->sender );
522
523         osrf_message_set_status_info( 
524                 success, "osrfConnectStatus", "Connection Successful", OSRF_STATUS_OK );
525
526         char* data      = osrf_message_serialize(success);
527
528         transport_message* return_m = message_init( 
529                 data, "", msg->thread, msg->sender, "" );
530
531         client_send_message(router->connection, return_m);
532
533         free(data);
534         osrf_message_free(success);
535         message_free(return_m);
536
537         return 0;
538 }
539
540
541
542 int osrfRouterProcessAppRequest( osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
543
544         if(!(router && msg && omsg && omsg->method_name)) return -1;
545
546         osrfLogInfo( OSRF_LOG_MARK, "Router received app request: %s", omsg->method_name );
547
548         jsonObject* jresponse = NULL;
549         if(!strcmp( omsg->method_name, ROUTER_REQUEST_CLASS_LIST )) {
550
551                 int i;
552                 jresponse = jsonParseString("[]");
553
554                 osrfStringArray* keys = osrfHashKeys( router->classes );
555                 for( i = 0; i != keys->size; i++ )
556                         jsonObjectPush( jresponse, jsonNewObject(osrfStringArrayGetString( keys, i )) );
557                 osrfStringArrayFree(keys);
558
559
560         } else if(!strcmp( omsg->method_name, ROUTER_REQUEST_FULL_STATS )) {
561
562                 osrfRouterClass* class;
563                 osrfRouterNode* node;
564                 jresponse = jsonParseString("{}");
565
566                 osrfHashIterator* class_itr = osrfNewHashIterator(router->classes);
567                 while( (class = osrfHashIteratorNext(class_itr)) ) {
568
569                         jsonObject* class_res = jsonParseString("{}");
570                         char* classname = class_itr->current;
571
572                         osrfHashIterator* node_itr = osrfNewHashIterator(class->nodes);
573                         while( (node = osrfHashIteratorNext(node_itr)) ) {
574                                 jsonObjectSetKey( class_res, node->remoteId, jsonNewNumberObject( (double) node->count ) );
575                         }
576                         osrfHashIteratorFree(node_itr);
577
578                         jsonObjectSetKey( jresponse, classname, class_res );
579                 }
580
581                 osrfHashIteratorFree(class_itr);
582
583         } else if(!strcmp( omsg->method_name, ROUTER_REQUEST_CLASS_STATS )) {
584
585                 osrfRouterClass* class;
586                 osrfRouterNode* node;
587                 int count;
588                 jresponse = jsonParseString("{}");
589
590                 osrfHashIterator* class_itr = osrfNewHashIterator(router->classes);
591                 while( (class = osrfHashIteratorNext(class_itr)) ) {
592
593                         count = 0;
594                         char* classname = class_itr->current;
595
596                         osrfHashIterator* node_itr = osrfNewHashIterator(class->nodes);
597                         while( (node = osrfHashIteratorNext(node_itr)) ) {
598                                 count += node->count;
599                         }
600                         osrfHashIteratorFree(node_itr);
601
602                         jsonObjectSetKey( jresponse, classname, jsonNewNumberObject( (double) count ) );
603                 }
604
605                 osrfHashIteratorFree(class_itr);
606
607         } else {
608
609                 return osrfRouterHandleMethodNFound( router, msg, omsg );
610         }
611
612
613         osrfRouterHandleAppResponse( router, msg, omsg, jresponse );
614         jsonObjectFree(jresponse); 
615
616         return 0;
617
618 }
619
620
621
622 int osrfRouterHandleMethodNFound( 
623                 osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
624
625         osrf_message* err = osrf_message_init( STATUS, omsg->thread_trace, 1);
626                 osrf_message_set_status_info( err, 
627                                 "osrfMethodException", "Router method not found", OSRF_STATUS_NOTFOUND );
628
629                 char* data =  osrf_message_serialize(err);
630
631                 transport_message* tresponse = message_init(
632                                 data, "", msg->thread, msg->sender, msg->recipient );
633
634                 client_send_message(router->connection, tresponse );
635
636                 free(data);
637                 osrf_message_free( err );
638                 message_free(tresponse);
639                 return 0;
640 }
641
642
643
644 int osrfRouterHandleAppResponse( osrfRouter* router, 
645         transport_message* msg, osrfMessage* omsg, jsonObject* response ) {
646
647         if( response ) { /* send the response message */
648
649                 osrfMessage* oresponse = osrf_message_init(
650                                 RESULT, omsg->thread_trace, omsg->protocol );
651         
652                 char* json = jsonObjectToJSON(response);
653                 osrf_message_set_result_content( oresponse, json);
654         
655                 char* data =  osrf_message_serialize(oresponse);
656                 osrfLogDebug( OSRF_LOG_MARK,  "Responding to client app request with data: \n%s\n", data );
657
658                 transport_message* tresponse = message_init(
659                                 data, "", msg->thread, msg->sender, msg->recipient );
660         
661                 client_send_message(router->connection, tresponse );
662
663                 osrfMessageFree(oresponse); 
664                 message_free(tresponse);
665                 free(json);
666                 free(data);
667         }
668
669
670         /* now send the 'request complete' message */
671         osrf_message* status = osrf_message_init( STATUS, omsg->thread_trace, 1);
672         osrf_message_set_status_info( status, "osrfConnectStatus", "Request Complete", OSRF_STATUS_COMPLETE );
673
674         char* statusdata = osrf_message_serialize(status);
675
676         transport_message* sresponse = message_init(
677                         statusdata, "", msg->thread, msg->sender, msg->recipient );
678         client_send_message(router->connection, sresponse );
679
680
681         free(statusdata);
682         osrfMessageFree(status);
683         message_free(sresponse);
684
685         return 0;
686 }
687
688
689
690