moved C code to a unified logging framework which currently supports syslogging and...
[OpenSRF.git] / src / router / osrf_router.c
1 #include "osrf_router.h"
2
3 #define ROUTER_SOCKFD connection->session->sock_id
4 #define ROUTER_REGISTER "register"
5 #define ROUTER_UNREGISTER "unregister"
6
7
8 #define ROUTER_REQUEST_CLASS_LIST "opensrf.router.info.class.list"
9
10 osrfRouter* osrfNewRouter( 
11                 char* domain, char* name, 
12                 char* resource, char* password, int port, 
13                 osrfStringArray* trustedClients, osrfStringArray* trustedServers ) {
14
15         if(!( domain && name && resource && password && port && trustedClients && trustedServers )) return NULL;
16
17         osrfRouter* router      = safe_malloc(sizeof(osrfRouter));
18         router->domain                  = strdup(domain);
19         router->name                    = strdup(name);
20         router->password                = strdup(password);
21         router->resource                = strdup(resource);
22         router->port                    = port;
23
24         router->trustedClients = trustedClients;
25         router->trustedServers = trustedServers;
26
27         
28         router->classes = osrfNewHash(); 
29         router->classes->freeItem = &osrfRouterClassFree;
30
31         router->connection = client_init( domain, port, NULL, 0 );
32
33         return router;
34 }
35
36
37
38 int osrfRouterConnect( osrfRouter* router ) {
39         if(!router) return -1;
40         int ret = client_connect( router->connection, router->name, 
41                         router->password, router->resource, 10, AUTH_DIGEST );
42         if( ret == 0 ) return -1;
43         return 0;
44 }
45
46
47 void osrfRouterRun( osrfRouter* router ) {
48         if(!(router && router->classes)) return;
49
50         int routerfd = router->ROUTER_SOCKFD;
51         int selectret = 0;
52
53         while(1) {
54
55                 fd_set set;
56                 int maxfd = __osrfRouterFillFDSet( router, &set );
57                 int numhandled = 0;
58
59                 if( (selectret = select(maxfd + 1, &set, NULL, NULL, NULL)) < 0 ) {
60                         osrfLogWarning("Top level select call failed with errno %d", errno);
61                         continue;
62                 }
63
64                 /* see if there is a top level router message */
65
66                 if( FD_ISSET(routerfd, &set) ) {
67                         osrfLogDebug("Top router socket is active: %d", routerfd );
68                         numhandled++;
69                         osrfRouterHandleIncoming( router );
70                 }
71
72
73                 /* now check each of the connected classes and see if they have data to route */
74                 while( numhandled < selectret ) {
75
76                         osrfRouterClass* class;
77                         osrfHashIterator* itr = osrfNewHashIterator(router->classes);
78
79                         while( (class = osrfHashIteratorNext(itr)) ) {
80
81                                 char* classname = itr->current;
82
83                                 if( classname && (class = osrfRouterFindClass( router, classname )) ) {
84
85                                         osrfLogDebug("Checking %s for activity...", classname );
86
87                                         int sockfd = class->ROUTER_SOCKFD;
88                                         if(FD_ISSET( sockfd, &set )) {
89                                                 osrfLogDebug("Socket is active: %d", sockfd );
90                                                 numhandled++;
91                                                 osrfRouterClassHandleIncoming( router, classname, class );
92                                         }
93                                 }
94                         }
95
96                         osrfHashIteratorFree(itr);
97                 }
98         }
99 }
100
101
102 void osrfRouterHandleIncoming( osrfRouter* router ) {
103         if(!router) return;
104
105         transport_message* msg = NULL;
106
107         if( (msg = client_recv( router->connection, 0 )) ) { 
108
109                 if( msg->sender ) {
110
111                         /* if the sender is not a trusted server, drop the message */
112                         int len = strlen(msg->sender) + 1;
113                         char domain[len];
114                         bzero(domain, len);
115                         jid_get_domain( msg->sender, domain, len - 1 );
116
117                         if(osrfStringArrayContains( router->trustedServers, domain)) 
118                                 osrfRouterHandleMessage( router, msg );
119                          else 
120                                 osrfLogWarning("Received message from un-trusted server domain %s", msg->sender);
121                 }
122
123                 message_free(msg);
124         }
125 }
126
127 int osrfRouterClassHandleIncoming( osrfRouter* router, char* classname, osrfRouterClass* class ) {
128         if(!(router && class)) return -1;
129
130         transport_message* msg;
131         osrfLogDebug("osrfRouterClassHandleIncoming()");
132
133         if( (msg = client_recv( class->connection, 0 )) ) {
134
135                 if( msg->sender ) {
136
137                         /* if the client is not from a trusted domain, drop the message */
138                         int len = strlen(msg->sender) + 1;
139                         char domain[len];
140                         bzero(domain, len);
141                         jid_get_domain( msg->sender, domain, len - 1 );
142
143                         if(osrfStringArrayContains( router->trustedClients, domain)) {
144
145                                 transport_message* bouncedMessage = NULL;
146                                 if( msg->is_error )  {
147
148                                         /* handle bounced message */
149                                         if( !(bouncedMessage = osrfRouterClassHandleBounce( router, classname, class, msg )) ) 
150                                                 return -1; /* we have no one to send the requested message to */
151
152                                         message_free( msg );
153                                         msg = bouncedMessage;
154                                 }
155                                 osrfRouterClassHandleMessage( router, class, msg );
156
157                         } else {
158                                 osrfLogWarning("Received client message from untrusted client domain %s", domain );
159                         }
160                 }
161
162                 message_free( msg );
163         }
164
165         return 0;
166 }
167
168
169
170
171 int osrfRouterHandleMessage( osrfRouter* router, transport_message* msg ) {
172         if(!(router && msg)) return -1;
173
174         if( !msg->router_command || !strcmp(msg->router_command,"")) 
175                 return osrfRouterHandleAppRequest( router, msg ); /* assume it's an app session level request */
176
177         if(!msg->router_class) return -1;
178
179         osrfRouterClass* class = NULL;
180         if(!strcmp(msg->router_command, ROUTER_REGISTER)) {
181                 class = osrfRouterFindClass( router, msg->router_class );
182
183                 osrfLogInfo("Registering class %s", msg->router_class );
184
185                 if(!class) class = osrfRouterAddClass( router, msg->router_class );
186
187                 if(class) { 
188
189                         if( osrfRouterClassFindNode( class, msg->sender ) )
190                                 return 0;
191                         else 
192                                 osrfRouterClassAddNode( class, msg->sender );
193
194                 } 
195
196         } else if( !strcmp( msg->router_command, ROUTER_UNREGISTER ) ) {
197
198                 if( msg->router_class && strcmp( msg->router_class, "") ) {
199                         osrfLogInfo("Unregistering router class %s", msg->router_class );
200                         osrfRouterClassRemoveNode( router, msg->router_class, msg->sender );
201                 }
202         }
203
204         return 0;
205 }
206
207
208
209 osrfRouterClass* osrfRouterAddClass( osrfRouter* router, char* classname ) {
210         if(!(router && router->classes && classname)) return NULL;
211
212         osrfRouterClass* class = safe_malloc(sizeof(osrfRouterClass));
213         class->nodes = osrfNewHash();
214         class->itr = osrfNewHashIterator(class->nodes);
215         class->nodes->freeItem = &osrfRouterNodeFree;
216         class->router   = router;
217
218         class->connection = client_init( router->domain, router->port, NULL, 0 );
219
220         if(!client_connect( class->connection, router->name, 
221                         router->password, classname, 10, AUTH_DIGEST ) ) {
222                 osrfRouterClassFree( classname, class );
223                 return NULL;
224         }
225         
226         osrfHashSet( router->classes, class, classname );
227         return class;
228 }
229
230
231 int osrfRouterClassAddNode( osrfRouterClass* rclass, char* remoteId ) {
232         if(!(rclass && rclass->nodes && remoteId)) return -1;
233
234         osrfLogInfo("Adding router node for remote id %s", remoteId );
235
236         osrfRouterNode* node = safe_malloc(sizeof(osrfRouterNode));
237         node->count = 0;
238         node->lastMessage = NULL;
239         node->remoteId = strdup(remoteId);
240
241         osrfHashSet( rclass->nodes, node, remoteId );
242         return 0;
243 }
244
245 /* copy off the lastMessage, remove the offending node, send error if it's tht last node 
246         ? return NULL if it's the last node ?
247  */
248
249 transport_message* osrfRouterClassHandleBounce( 
250                 osrfRouter* router, char* classname, osrfRouterClass* rclass, transport_message* msg ) {
251
252         osrfLogDebug("osrfRouterClassHandleBounce()");
253
254         osrfLogWarning("Received network layer error message from %s", msg->sender );
255         osrfRouterNode* node = osrfRouterClassFindNode( rclass, msg->sender );
256         transport_message* lastSent = NULL;
257
258         if( node && osrfHashGetCount(rclass->nodes) == 1 ) { /* the last node is dead */
259
260                 if( node->lastMessage ) {
261                         osrfLogWarning("We lost the last node in the class, responding with error and removing...");
262         
263                         transport_message* error = message_init( 
264                                 node->lastMessage->body, node->lastMessage->subject, 
265                                 node->lastMessage->thread, node->lastMessage->router_from, node->lastMessage->recipient );
266                         set_msg_error( error, "cancel", 501 );
267         
268                         /* send the error message back to the original sender */
269                         client_send_message( rclass->connection, error );
270                         message_free( error );
271                 }
272         
273                 return NULL;
274         
275         } else { 
276
277                 if( node->lastMessage ) {
278                         osrfLogDebug("Cloning lastMessage so next node can send it");
279                         lastSent = message_init( node->lastMessage->body,
280                                 node->lastMessage->subject, node->lastMessage->thread, "", node->lastMessage->router_from );
281                         message_set_router_info( lastSent, node->lastMessage->router_from, NULL, NULL, NULL, 0 );
282                 }
283         }
284
285         /* remove the dead node */
286         osrfRouterClassRemoveNode( router, classname, msg->sender);
287         return lastSent;
288 }
289
290
291 /**
292   If we get a regular message, we send it to the next node in the list of nodes
293   if we get an error, it's a bounce back from a previous attempt.  We take the
294   body and thread from the last sent on the node that had the bounced message
295   and propogate them on to the new message being sent
296   */
297 int osrfRouterClassHandleMessage( 
298                 osrfRouter* router, osrfRouterClass* rclass, transport_message* msg ) {
299         if(!(router && rclass && msg)) return -1;
300
301         osrfLogDebug("osrfRouterClassHandleMessage()");
302
303         osrfRouterNode* node = osrfHashIteratorNext( rclass->itr );
304         if(!node) {
305                 osrfHashIteratorReset(rclass->itr);
306                 node = osrfHashIteratorNext( rclass->itr );
307         }
308
309         if(node) {
310
311                 transport_message* new_msg= message_init(       msg->body, 
312                                 msg->subject, msg->thread, node->remoteId, msg->sender );
313                 message_set_router_info( new_msg, msg->sender, NULL, NULL, NULL, 0 );
314
315                 osrfLogInfo( "Routing message:\nfrom: [%s]\nto: [%s]", 
316                                 new_msg->router_from, new_msg->recipient );
317
318                 message_free( node->lastMessage );
319                 node->lastMessage = new_msg;
320
321                 if ( client_send_message( rclass->connection, new_msg ) == 0 ) 
322                         node->count++;
323
324                 else {
325                         message_prepare_xml(new_msg);
326                         osrfLogWarning("Error sending message from %s to %s\n%s", 
327                                         new_msg->sender, new_msg->recipient, new_msg->msg_xml );
328                 }
329
330         } 
331
332         return 0;
333 }
334
335
336 int osrfRouterRemoveClass( osrfRouter* router, char* classname ) {
337         if(!(router && router->classes && classname)) return -1;
338         osrfLogInfo("Removing router class %s", classname );
339         osrfHashRemove( router->classes, classname );
340         return 0;
341 }
342
343
344 int osrfRouterClassRemoveNode( 
345                 osrfRouter* router, char* classname, char* remoteId ) {
346
347         if(!(router && router->classes && classname && remoteId)) return 0;
348
349         osrfLogInfo("Removing router node %s", remoteId );
350
351         osrfRouterClass* class = osrfRouterFindClass( router, classname );
352
353         if( class ) {
354
355                 osrfHashRemove( class->nodes, remoteId );
356                 if( osrfHashGetCount(class->nodes) == 0 ) {
357                         osrfRouterRemoveClass( router, classname );
358                         return 1;
359                 }
360
361                 return 0;
362         }
363
364         return -1;
365 }
366
367
368 void osrfRouterClassFree( char* classname, void* c ) {
369         if(!(classname && c)) return;
370         osrfRouterClass* rclass = (osrfRouterClass*) c;
371         client_disconnect( rclass->connection );        
372         client_free( rclass->connection );      
373
374         osrfHashIteratorReset( rclass->itr );
375         osrfRouterNode* node;
376
377         while( (node = osrfHashIteratorNext(rclass->itr)) ) 
378                 osrfRouterClassRemoveNode( rclass->router, classname, node->remoteId );
379
380         free(rclass);
381 }
382
383
384 void osrfRouterNodeFree( char* remoteId, void* n ) {
385         if(!n) return;
386         osrfRouterNode* node = (osrfRouterNode*) n;
387         free(node->remoteId);
388         message_free(node->lastMessage);
389         free(node);
390 }
391
392
393 void osrfRouterFree( osrfRouter* router ) {
394         if(!router) return;
395
396         free(router->domain);           
397         free(router->name);
398         free(router->resource);
399         free(router->password);
400
401         osrfStringArrayFree( router->trustedClients );
402         osrfStringArrayFree( router->trustedServers );
403
404         client_free( router->connection );
405         free(router);
406 }
407
408
409
410 osrfRouterClass* osrfRouterFindClass( osrfRouter* router, char* classname ) {
411         if(!( router && router->classes && classname )) return NULL;
412         return (osrfRouterClass*) osrfHashGet( router->classes, classname );
413 }
414
415
416 osrfRouterNode* osrfRouterClassFindNode( osrfRouterClass* rclass, char* remoteId ) {
417         if(!(rclass && remoteId))  return NULL;
418         return (osrfRouterNode*) osrfHashGet( rclass->nodes, remoteId );
419 }
420
421
422 int __osrfRouterFillFDSet( osrfRouter* router, fd_set* set ) {
423         if(!(router && router->classes && set)) return -1;
424
425         FD_ZERO(set);
426         int maxfd = router->ROUTER_SOCKFD;
427         FD_SET(maxfd, set);
428
429         int sockid;
430
431         osrfRouterClass* class = NULL;
432         osrfHashIterator* itr = osrfNewHashIterator(router->classes);
433
434         while( (class = osrfHashIteratorNext(itr)) ) {
435                 char* classname = itr->current;
436
437                 if( classname && (class = osrfRouterFindClass( router, classname )) ) {
438                         sockid = class->ROUTER_SOCKFD;
439         
440                         if( osrfUtilsCheckFileDescriptor( sockid ) ) {
441                                 osrfRouterRemoveClass( router, classname );
442         
443                         } else {
444                                 if( sockid > maxfd ) maxfd = sockid;
445                                 FD_SET(sockid, set);
446                         }
447                 }
448         }
449
450         osrfHashIteratorFree(itr);
451         return maxfd;
452 }
453
454
455
456 int osrfRouterHandleAppRequest( osrfRouter* router, transport_message* msg ) {
457
458         int T = 32;
459         osrfMessage* arr[T];
460         memset(arr, 0, T );
461
462         int num_msgs = osrf_message_deserialize( msg->body, arr, T );
463         osrfMessage* omsg = NULL;
464
465         int i;
466         for( i = 0; i != num_msgs; i++ ) {
467
468                 if( !(omsg = arr[i]) ) continue;
469
470                 switch( omsg->m_type ) {
471
472                         case CONNECT:
473                                 osrfRouterRespondConnect( router, msg, omsg );
474                                 break;
475
476                         case REQUEST:
477                                 osrfRouterProcessAppRequest( router, msg, omsg );
478                                 break;
479
480                         default: break;
481                 }
482
483                 osrfMessageFree( omsg );
484         }
485
486         return 0;
487 }
488
489 int osrfRouterRespondConnect( osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
490         if(!(router && msg && omsg)) return -1;
491
492         osrfMessage* success = osrf_message_init( STATUS, omsg->thread_trace, omsg->protocol );
493
494         osrfLogDebug("router recevied a CONNECT message from %s", msg->sender );
495
496         osrf_message_set_status_info( 
497                 success, "osrfConnectStatus", "Connection Successful", OSRF_STATUS_OK );
498
499         char* data      = osrf_message_serialize(success);
500
501         transport_message* return_m = message_init( 
502                 data, "", msg->thread, msg->sender, "" );
503
504         client_send_message(router->connection, return_m);
505
506         free(data);
507         osrf_message_free(success);
508         message_free(return_m);
509
510         return 0;
511 }
512
513
514
515 int osrfRouterProcessAppRequest( osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
516
517         if(!(router && msg && omsg && omsg->method_name)) return -1;
518
519         osrfLogInfo("Router received app request: %s", omsg->method_name );
520
521         jsonObject* jresponse = NULL;
522         if(!strcmp( omsg->method_name, ROUTER_REQUEST_CLASS_LIST )) {
523
524                 int i;
525                 jresponse = jsonParseString("[]");
526
527                 osrfStringArray* keys = osrfHashKeys( router->classes );
528                 for( i = 0; i != keys->size; i++ )
529                         jsonObjectPush( jresponse, jsonNewObject(osrfStringArrayGetString( keys, i )) );
530                 osrfStringArrayFree(keys);
531
532
533         } else {
534
535                 return osrfRouterHandleMethodNFound( router, msg, omsg );
536         }
537
538
539         osrfRouterHandleAppResponse( router, msg, omsg, jresponse );
540         jsonObjectFree(jresponse); 
541
542         return 0;
543
544 }
545
546
547
548 int osrfRouterHandleMethodNFound( 
549                 osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
550
551         osrf_message* err = osrf_message_init( STATUS, omsg->thread_trace, 1);
552                 osrf_message_set_status_info( err, 
553                                 "osrfMethodException", "Router method not found", OSRF_STATUS_NOTFOUND );
554
555                 char* data =  osrf_message_serialize(err);
556
557                 transport_message* tresponse = message_init(
558                                 data, "", msg->thread, msg->sender, msg->recipient );
559
560                 client_send_message(router->connection, tresponse );
561
562                 free(data);
563                 osrf_message_free( err );
564                 message_free(tresponse);
565                 return 0;
566 }
567
568
569
570 int osrfRouterHandleAppResponse( osrfRouter* router, 
571         transport_message* msg, osrfMessage* omsg, jsonObject* response ) {
572
573         if( response ) { /* send the response message */
574
575                 osrfMessage* oresponse = osrf_message_init(
576                                 RESULT, omsg->thread_trace, omsg->protocol );
577         
578                 char* json = jsonObjectToJSON(response);
579                 osrf_message_set_result_content( oresponse, json);
580         
581                 char* data =  osrf_message_serialize(oresponse);
582                 osrfLogDebug( "Responding to client app request with data: \n%s\n", data );
583
584                 transport_message* tresponse = message_init(
585                                 data, "", msg->thread, msg->sender, msg->recipient );
586         
587                 client_send_message(router->connection, tresponse );
588
589                 osrfMessageFree(oresponse); 
590                 message_free(tresponse);
591                 free(json);
592                 free(data);
593         }
594
595
596         /* now send the 'request complete' message */
597         osrf_message* status = osrf_message_init( STATUS, omsg->thread_trace, 1);
598         osrf_message_set_status_info( status, "osrfConnectStatus", "Request Complete", OSRF_STATUS_COMPLETE );
599
600         char* statusdata = osrf_message_serialize(status);
601
602         transport_message* sresponse = message_init(
603                         statusdata, "", msg->thread, msg->sender, msg->recipient );
604         client_send_message(router->connection, sresponse );
605
606
607         free(statusdata);
608         osrfMessageFree(status);
609         message_free(sresponse);
610
611         return 0;
612 }
613
614
615
616