added list and hash code based on libJudy
[OpenSRF.git] / src / router / osrf_router.c
1 #include "osrf_router.h"
2
3 #define ROUTER_SOCKFD connection->session->sock_id
4 #define ROUTER_REGISTER "register"
5 #define ROUTER_UNREGISTER "unregister"
6
7
8 #define ROUTER_REQUEST_CLASS_LIST "opensrf.router.info.class.list"
9
10 osrfRouter* osrfNewRouter( 
11                 char* domain, char* name, 
12                 char* resource, char* password, int port, 
13                 osrfStringArray* trustedClients, osrfStringArray* trustedServers ) {
14
15         if(!( domain && name && resource && password && port )) return NULL;
16
17         osrfRouter* router      = safe_malloc(sizeof(osrfRouter));
18         router->domain                  = strdup(domain);
19         router->name                    = strdup(name);
20         router->password                = strdup(password);
21         router->resource                = strdup(resource);
22         router->port                    = port;
23
24         router->trustedClients = trustedClients;
25         router->trustedServers = trustedServers;
26
27         router->classes = osrfNewHash(); 
28         router->classes->freeItem = &osrfRouterClassFree;
29
30         router->connection = client_init( domain, port, NULL, 0 );
31
32         return router;
33 }
34
35
36
37 int osrfRouterConnect( osrfRouter* router ) {
38         if(!router) return -1;
39         int ret = client_connect( router->connection, router->name, 
40                         router->password, router->resource, 10, AUTH_DIGEST );
41         if( ret == 0 ) return -1;
42         return 0;
43 }
44
45
46 void osrfRouterRun( osrfRouter* router ) {
47         if(!(router && router->classes)) return;
48
49         int routerfd = router->ROUTER_SOCKFD;
50         int selectret = 0;
51
52         while(1) {
53
54                 fd_set set;
55                 int maxfd = __osrfRouterFillFDSet( router, &set );
56                 int numhandled = 0;
57
58                 if( (selectret = select(maxfd + 1, &set, NULL, NULL, NULL)) < 0 ) {
59                         warning_handler("Top level select call failed with errno %d", errno);
60                         continue;
61                 }
62
63                 /* see if there is a top level router message */
64
65                 if( FD_ISSET(routerfd, &set) ) {
66                         debug_handler("Top router socket is active: %d", routerfd );
67                         numhandled++;
68                         osrfRouterHandleIncoming( router );
69                 }
70
71
72                 /* now check each of the connected classes and see if they have data to route */
73                 while( numhandled < selectret ) {
74
75                         osrfRouterClass* class;
76                         osrfHashIterator* itr = osrfNewHashIterator(router->classes);
77
78                         while( (class = osrfHashIteratorNext(itr)) ) {
79
80                                 char* classname = itr->current;
81
82                                 if( classname && (class = osrfRouterFindClass( router, classname )) ) {
83
84                                         debug_handler("Checking %s for activity...", classname );
85
86                                         int sockfd = class->ROUTER_SOCKFD;
87                                         if(FD_ISSET( sockfd, &set )) {
88                                                 debug_handler("Socket is active: %d", sockfd );
89                                                 numhandled++;
90                                                 osrfRouterClassHandleIncoming( router, classname, class );
91                                         }
92                                 }
93                         }
94
95                         osrfHashIteratorFree(itr);
96                 }
97         }
98 }
99
100
101 void osrfRouterHandleIncoming( osrfRouter* router ) {
102         if(!router) return;
103
104         transport_message* msg = NULL;
105
106         if( (msg = client_recv( router->connection, 0 )) ) { 
107
108                 if( msg->sender ) {
109
110                         /* if the sender is not a trusted server, drop the message */
111                         int len = strlen(msg->sender) + 1;
112                         char domain[len];
113                         bzero(domain, len);
114                         jid_get_domain( msg->sender, domain );
115
116                         if(osrfStringArrayContains( router->trustedServers, domain)) 
117                                 osrfRouterHandleMessage( router, msg );
118                          else 
119                                 warning_handler("Received message from un-trusted server domain %s", msg->sender);
120                 }
121
122                 message_free(msg);
123         }
124 }
125
126 int osrfRouterClassHandleIncoming( osrfRouter* router, char* classname, osrfRouterClass* class ) {
127         if(!(router && class)) return -1;
128
129         transport_message* msg;
130         debug_handler("osrfRouterClassHandleIncoming()");
131
132         if( (msg = client_recv( class->connection, 0 )) ) {
133
134                 if( msg->sender ) {
135
136                         /* if the client is not from a trusted domain, drop the message */
137                         int len = strlen(msg->sender) + 1;
138                         char domain[len];
139                         bzero(domain, len);
140                         jid_get_domain( msg->sender, domain );
141
142                         if(osrfStringArrayContains( router->trustedClients, domain)) {
143
144                                 transport_message* bouncedMessage = NULL;
145                                 if( msg->is_error )  {
146
147                                         /* handle bounced message */
148                                         if( !(bouncedMessage = osrfRouterClassHandleBounce( router, classname, class, msg )) ) 
149                                                 return -1; /* we have no one to send the requested message to */
150
151                                         message_free( msg );
152                                         msg = bouncedMessage;
153                                 }
154                                 osrfRouterClassHandleMessage( router, class, msg );
155
156                         } else {
157                                 warning_handler("Received client message from untrusted client domain %s", domain );
158                         }
159                 }
160
161                 message_free( msg );
162         }
163
164         return 0;
165 }
166
167
168
169
170 int osrfRouterHandleMessage( osrfRouter* router, transport_message* msg ) {
171         if(!(router && msg)) return -1;
172
173         if( !msg->router_command || !strcmp(msg->router_command,"")) 
174                 return osrfRouterHandleAppRequest( router, msg ); /* assume it's an app session level request */
175
176         if(!msg->router_class) return -1;
177
178         osrfRouterClass* class = NULL;
179         if(!strcmp(msg->router_command, ROUTER_REGISTER)) {
180                 class = osrfRouterFindClass( router, msg->router_class );
181
182                 info_handler("Registering class %s", msg->router_class );
183
184                 if(!class) class = osrfRouterAddClass( router, msg->router_class );
185
186                 if(class) { 
187
188                         if( osrfRouterClassFindNode( class, msg->sender ) )
189                                 return 0;
190                         else 
191                                 osrfRouterClassAddNode( class, msg->sender );
192
193                 } 
194
195         } else if( !strcmp( msg->router_command, ROUTER_UNREGISTER ) ) {
196
197                 if( msg->router_class && strcmp( msg->router_class, "") ) {
198                         info_handler("Unregistering router class %s", msg->router_class );
199                         osrfRouterClassRemoveNode( router, msg->router_class, msg->sender );
200                 }
201         }
202
203         return 0;
204 }
205
206
207
208 osrfRouterClass* osrfRouterAddClass( osrfRouter* router, char* classname ) {
209         if(!(router && router->classes && classname)) return NULL;
210
211         osrfRouterClass* class = safe_malloc(sizeof(osrfRouterClass));
212         class->nodes = osrfNewHash();
213         class->itr = osrfNewHashIterator(class->nodes);
214         class->nodes->freeItem = &osrfRouterNodeFree;
215         class->router   = router;
216
217         class->connection = client_init( router->domain, router->port, NULL, 0 );
218
219         if(!client_connect( class->connection, router->name, 
220                         router->password, classname, 10, AUTH_DIGEST ) ) {
221                 osrfRouterClassFree( classname, class );
222                 return NULL;
223         }
224         
225         osrfHashSet( router->classes, class, classname );
226         return class;
227 }
228
229
230 int osrfRouterClassAddNode( osrfRouterClass* rclass, char* remoteId ) {
231         if(!(rclass && rclass->nodes && remoteId)) return -1;
232
233         info_handler("Adding router node for remote id %s", remoteId );
234
235         osrfRouterNode* node = safe_malloc(sizeof(osrfRouterNode));
236         node->count = 0;
237         node->lastMessage = NULL;
238         node->remoteId = strdup(remoteId);
239
240         osrfHashSet( rclass->nodes, node, remoteId );
241         return 0;
242 }
243
244 /* copy off the lastMessage, remove the offending node, send error if it's tht last node 
245         ? return NULL if it's the last node ?
246  */
247
248 transport_message* osrfRouterClassHandleBounce( 
249                 osrfRouter* router, char* classname, osrfRouterClass* rclass, transport_message* msg ) {
250
251         debug_handler("osrfRouterClassHandleBounce()");
252
253         warning_handler("Received network layer error message from %s", msg->sender );
254         osrfRouterNode* node = osrfRouterClassFindNode( rclass, msg->sender );
255         transport_message* lastSent = NULL;
256
257         if( node && osrfHashGetCount(rclass->nodes) == 1 ) { /* the last node is dead */
258
259                 if( node->lastMessage ) {
260                         warning_handler("We lost the last node in the class, responding with error and removing...");
261         
262                         transport_message* error = message_init( 
263                                 node->lastMessage->body, node->lastMessage->subject, 
264                                 node->lastMessage->thread, node->lastMessage->router_from, node->lastMessage->recipient );
265                         set_msg_error( error, "cancel", 501 );
266         
267                         /* send the error message back to the original sender */
268                         client_send_message( rclass->connection, error );
269                         message_free( error );
270                 }
271         
272                 return NULL;
273         
274         } else { 
275
276                 if( node->lastMessage ) {
277                         debug_handler("Cloning lastMessage so next node can send it");
278                         lastSent = message_init( node->lastMessage->body,
279                                 node->lastMessage->subject, node->lastMessage->thread, "", node->lastMessage->router_from );
280                         message_set_router_info( lastSent, node->lastMessage->router_from, NULL, NULL, NULL, 0 );
281                 }
282         }
283
284         /* remove the dead node */
285         osrfRouterClassRemoveNode( router, classname, msg->sender);
286         return lastSent;
287 }
288
289
290 /**
291   If we get a regular message, we send it to the next node in the list of nodes
292   if we get an error, it's a bounce back from a previous attempt.  We take the
293   body and thread from the last sent on the node that had the bounced message
294   and propogate them on to the new message being sent
295   */
296 int osrfRouterClassHandleMessage( 
297                 osrfRouter* router, osrfRouterClass* rclass, transport_message* msg ) {
298         if(!(router && rclass && msg)) return -1;
299
300         debug_handler("osrfRouterClassHandleMessage()");
301
302         osrfRouterNode* node = osrfHashIteratorNext( rclass->itr );
303         if(!node) {
304                 osrfHashIteratorReset(rclass->itr);
305                 node = osrfHashIteratorNext( rclass->itr );
306         }
307
308         if(node) {
309
310                 transport_message* new_msg= message_init(       msg->body, 
311                                 msg->subject, msg->thread, node->remoteId, msg->sender );
312                 message_set_router_info( new_msg, msg->sender, NULL, NULL, NULL, 0 );
313
314                 info_handler( "Routing message:\nfrom: [%s]\nto: [%s]", 
315                                 new_msg->router_from, new_msg->recipient );
316
317                 message_free( node->lastMessage );
318                 node->lastMessage = new_msg;
319
320                 if ( client_send_message( rclass->connection, new_msg ) == 0 ) 
321                         node->count++;
322
323                 else {
324                         message_prepare_xml(new_msg);
325                         warning_handler("Error sending message from %s to %s\n%s", 
326                                         new_msg->sender, new_msg->recipient, new_msg->msg_xml );
327                 }
328
329         } 
330
331         return 0;
332 }
333
334
335 int osrfRouterRemoveClass( osrfRouter* router, char* classname ) {
336         if(!(router && router->classes && classname)) return -1;
337         info_handler("Removing router class %s", classname );
338         osrfHashRemove( router->classes, classname );
339         return 0;
340 }
341
342
343 int osrfRouterClassRemoveNode( 
344                 osrfRouter* router, char* classname, char* remoteId ) {
345
346         if(!(router && router->classes && classname && remoteId)) return 0;
347
348         info_handler("Removing router node %s", remoteId );
349
350         osrfRouterClass* class = osrfRouterFindClass( router, classname );
351
352         if( class ) {
353
354                 osrfHashRemove( class->nodes, remoteId );
355                 if( osrfHashGetCount(class->nodes) == 0 ) {
356                         osrfRouterRemoveClass( router, classname );
357                         return 1;
358                 }
359
360                 return 0;
361         }
362
363         return -1;
364 }
365
366
367 void osrfRouterClassFree( char* classname, void* c ) {
368         if(!(classname && c)) return;
369         osrfRouterClass* rclass = (osrfRouterClass*) c;
370         client_disconnect( rclass->connection );        
371         client_free( rclass->connection );      
372
373         osrfHashIteratorReset( rclass->itr );
374         osrfRouterNode* node;
375
376         while( (node = osrfHashIteratorNext(rclass->itr)) ) 
377                 osrfRouterClassRemoveNode( rclass->router, classname, node->remoteId );
378
379         free(rclass);
380 }
381
382
383 void osrfRouterNodeFree( char* remoteId, void* n ) {
384         if(!n) return;
385         osrfRouterNode* node = (osrfRouterNode*) n;
386         free(node->remoteId);
387         message_free(node->lastMessage);
388         free(node);
389 }
390
391
392 void osrfRouterFree( osrfRouter* router ) {
393         if(!router) return;
394
395         free(router->domain);           
396         free(router->name);
397         free(router->resource);
398         free(router->password);
399
400         osrfStringArrayFree( router->trustedClients );
401         osrfStringArrayFree( router->trustedServers );
402
403         client_free( router->connection );
404         free(router);
405 }
406
407
408
409 osrfRouterClass* osrfRouterFindClass( osrfRouter* router, char* classname ) {
410         if(!( router && router->classes && classname )) return NULL;
411         return (osrfRouterClass*) osrfHashGet( router->classes, classname );
412 }
413
414
415 osrfRouterNode* osrfRouterClassFindNode( osrfRouterClass* rclass, char* remoteId ) {
416         if(!(rclass && remoteId))  return NULL;
417         return (osrfRouterNode*) osrfHashGet( rclass->nodes, remoteId );
418 }
419
420
421 int __osrfRouterFillFDSet( osrfRouter* router, fd_set* set ) {
422         if(!(router && router->classes && set)) return -1;
423
424         FD_ZERO(set);
425         int maxfd = router->ROUTER_SOCKFD;
426         FD_SET(maxfd, set);
427
428         int sockid;
429
430         osrfRouterClass* class = NULL;
431         osrfHashIterator* itr = osrfNewHashIterator(router->classes);
432
433         while( (class = osrfHashIteratorNext(itr)) ) {
434                 char* classname = itr->current;
435
436                 if( classname && (class = osrfRouterFindClass( router, classname )) ) {
437                         sockid = class->ROUTER_SOCKFD;
438         
439                         if( osrfUtilsCheckFileDescriptor( sockid ) ) {
440                                 osrfRouterRemoveClass( router, classname );
441         
442                         } else {
443                                 if( sockid > maxfd ) maxfd = sockid;
444                                 FD_SET(sockid, set);
445                         }
446                 }
447         }
448
449         osrfHashIteratorFree(itr);
450         return maxfd;
451 }
452
453
454
455 int osrfRouterHandleAppRequest( osrfRouter* router, transport_message* msg ) {
456
457         int T = 32;
458         osrfMessage* arr[T];
459         memset(arr, 0, T );
460
461         int num_msgs = osrf_message_deserialize( msg->body, arr, T );
462         osrfMessage* omsg = NULL;
463
464         int i;
465         for( i = 0; i != num_msgs; i++ ) {
466
467                 if( !(omsg = arr[i]) ) continue;
468
469                 switch( omsg->m_type ) {
470
471                         case CONNECT:
472                                 osrfRouterRespondConnect( router, msg, omsg );
473                                 break;
474
475                         case REQUEST:
476                                 osrfRouterProcessAppRequest( router, msg, omsg );
477                                 break;
478
479                         default: break;
480                 }
481
482                 osrfMessageFree( omsg );
483         }
484
485         return 0;
486 }
487
488 int osrfRouterRespondConnect( osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
489         if(!(router && msg && omsg)) return -1;
490
491         osrfMessage* success = osrf_message_init( STATUS, omsg->thread_trace, omsg->protocol );
492
493         debug_handler("router recevied a CONNECT message from %s", msg->sender );
494
495         osrf_message_set_status_info( 
496                 success, "osrfConnectStatus", "Connection Successful", OSRF_STATUS_OK );
497
498         char* data      = osrf_message_serialize(success);
499
500         transport_message* return_m = message_init( 
501                 data, "", msg->thread, msg->sender, "" );
502
503         client_send_message(router->connection, return_m);
504
505         free(data);
506         osrf_message_free(success);
507         message_free(return_m);
508
509         return 0;
510 }
511
512
513
514 int osrfRouterProcessAppRequest( osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
515
516         if(!(router && msg && omsg && omsg->method_name)) return -1;
517
518         info_handler("Router received app request: %s", omsg->method_name );
519
520         jsonObject* jresponse = NULL;
521         if(!strcmp( omsg->method_name, ROUTER_REQUEST_CLASS_LIST )) {
522
523                 int i;
524                 jresponse = jsonParseString("[]");
525
526                 osrfStringArray* keys = osrfHashKeys( router->classes );
527                 for( i = 0; i != keys->size; i++ )
528                         jsonObjectPush( jresponse, jsonNewObject(osrfStringArrayGetString( keys, i )) );
529                 osrfStringArrayFree(keys);
530
531
532         } else {
533
534                 return osrfRouterHandleMethodNFound( router, msg, omsg );
535         }
536
537
538         osrfRouterHandleAppResponse( router, msg, omsg, jresponse );
539         jsonObjectFree(jresponse); 
540
541         return 0;
542
543 }
544
545
546
547 int osrfRouterHandleMethodNFound( 
548                 osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
549
550         osrf_message* err = osrf_message_init( STATUS, omsg->thread_trace, 1);
551                 osrf_message_set_status_info( err, 
552                                 "osrfMethodException", "Router method not found", OSRF_STATUS_NOTFOUND );
553
554                 char* data =  osrf_message_serialize(err);
555
556                 transport_message* tresponse = message_init(
557                                 data, "", msg->thread, msg->sender, msg->recipient );
558
559                 client_send_message(router->connection, tresponse );
560
561                 free(data);
562                 osrf_message_free( err );
563                 message_free(tresponse);
564                 return 0;
565 }
566
567
568
569 int osrfRouterHandleAppResponse( osrfRouter* router, 
570         transport_message* msg, osrfMessage* omsg, jsonObject* response ) {
571
572         if( response ) { /* send the response message */
573
574                 osrfMessage* oresponse = osrf_message_init(
575                                 RESULT, omsg->thread_trace, omsg->protocol );
576         
577                 char* json = jsonObjectToJSON(response);
578                 osrf_message_set_result_content( oresponse, json);
579         
580                 char* data =  osrf_message_serialize(oresponse);
581                 debug_handler( "Responding to client app request with data: \n%s\n", data );
582
583                 transport_message* tresponse = message_init(
584                                 data, "", msg->thread, msg->sender, msg->recipient );
585         
586                 client_send_message(router->connection, tresponse );
587
588                 osrfMessageFree(oresponse); 
589                 message_free(tresponse);
590                 free(json);
591                 free(data);
592         }
593
594
595         /* now send the 'request complete' message */
596         osrf_message* status = osrf_message_init( STATUS, omsg->thread_trace, 1);
597         osrf_message_set_status_info( status, "osrfConnectStatus", "Request Complete", OSRF_STATUS_COMPLETE );
598
599         char* statusdata = osrf_message_serialize(status);
600
601         transport_message* sresponse = message_init(
602                         statusdata, "", msg->thread, msg->sender, msg->recipient );
603         client_send_message(router->connection, sresponse );
604
605
606         free(statusdata);
607         osrfMessageFree(status);
608         message_free(sresponse);
609
610         return 0;
611 }
612
613
614
615