f1ae7208b96b1ae098e1100c7edf6f5a85129648
[Evergreen.git] / OpenSRF / src / utils / socket_bundle.c
1 #include "socket_bundle.h"
2
3 /* -------------------------------------------------------------------- 
4         Test Code 
5         -------------------------------------------------------------------- */
6 /*
7 int count = 0;
8 void printme(void* blob, socket_manager* mgr, 
9                 int sock_fd, char* data, int parent_id) {
10
11         fprintf(stderr, "Got data from socket %d with parent %d => %s", 
12                         sock_fd, parent_id, data );
13
14         socket_send(sock_fd, data);
15
16         if(count++ > 2) {
17                 socket_disconnect(mgr, sock_fd);
18                 _socket_print_list(mgr);
19         }
20 }
21
22 int main(int argc, char* argv[]) {
23         socket_manager manager;
24         memset(&manager, 0, sizeof(socket_manager));
25         int port = 11000;
26         if(argv[1])
27                 port = atoi(argv[1]);
28
29         manager.data_received = &printme;
30         socket_open_tcp_server(&manager, port);
31
32         while(1)
33                 socket_wait_all(&manager, -1);
34
35         return 0;
36 }
37 */
38 /* -------------------------------------------------------------------- */
39
40
41 /*
42 int osrfLogDebug(char* msg, ...) {
43         va_list args;
44         va_start(args, msg);
45         vfprintf(stderr, msg, args);
46         va_end(args);
47         fprintf( stderr, "\n" );
48         return -1;
49 }
50
51 int osrfLogWarning(char* msg, ...) {
52         va_list args;
53         va_start(args, msg);
54         vfprintf(stderr, msg, args);
55         va_end(args);
56         fprintf( stderr, "\n" );
57         return -1;
58 }
59 */
60
61
62 socket_node* _socket_add_node(socket_manager* mgr, 
63                 int endpoint, int addr_type, int sock_fd, int parent_id ) {
64
65         if(mgr == NULL) return NULL;
66         osrfLogDebug("Adding socket node with fd %d", sock_fd);
67         socket_node* new_node = safe_malloc(sizeof(socket_node));
68
69         new_node->endpoint      = endpoint;
70         new_node->addr_type     = addr_type;
71         new_node->sock_fd               = sock_fd;
72         new_node->next                  = NULL;
73         new_node->parent_id = 0;
74         if(parent_id > 0)
75                 new_node->parent_id = parent_id;
76
77         new_node->next                  = mgr->socket;
78         mgr->socket                             = new_node;
79         return new_node;
80 }
81
82 /* creates a new server socket node and adds it to the socket set.
83         returns new socket fd on success.  -1 on failure.
84         socket_type is one of INET or UNIX  */
85 int socket_open_tcp_server(socket_manager* mgr, int port, char* listen_ip) {
86
87         if( mgr == NULL ) {
88                 osrfLogWarning("socket_open_tcp_server(): NULL mgr"); 
89                 return -1;
90         }
91
92         int sock_fd;
93         struct sockaddr_in server_addr;
94
95         sock_fd = socket(AF_INET, SOCK_STREAM, 0);
96
97         if(sock_fd < 0) {
98                 osrfLogWarning("tcp_server_connect(): Unable to create socket");
99                 return -1;
100         }
101
102         server_addr.sin_family = AF_INET;
103
104         if(listen_ip != NULL) {
105                 server_addr.sin_addr.s_addr = inet_addr(listen_ip);
106         } else {
107                 server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
108         }
109
110         server_addr.sin_port = htons(port);
111
112         if(bind( sock_fd, (struct sockaddr*) &server_addr, sizeof(server_addr)) < 0) {
113                 osrfLogWarning("tcp_server_connect(): cannot bind to port %d", port );
114                 return -1;
115         }
116
117         if(listen(sock_fd, 20) == -1) {
118                 osrfLogWarning("tcp_server_connect(): listen() returned error");
119                 return -1;
120         }
121
122         _socket_add_node(mgr, SERVER_SOCKET, INET, sock_fd, 0);
123         return sock_fd;
124 }
125
126 int socket_open_unix_server(socket_manager* mgr, char* path) {
127         if(mgr == NULL || path == NULL) return -1;
128
129         osrfLogDebug("opening unix socket at %s", path);
130         int sock_fd;
131         struct sockaddr_un server_addr;
132
133         sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
134         if(sock_fd < 0){
135                 osrfLogWarning("socket_open_unix_server(): socket() failed");
136                 return -1;
137         }
138
139         server_addr.sun_family = AF_UNIX;
140         strcpy(server_addr.sun_path, path);
141
142         if( bind(sock_fd, (struct sockaddr*) &server_addr, 
143                                 sizeof(struct sockaddr_un)) < 0) {
144                 osrfLogWarning(
145                         "socket_open_unix_server(): cannot bind to unix port %s", path );
146                 return -1;
147         }
148
149         if(listen(sock_fd, 20) == -1) {
150                 osrfLogWarning("socket_open_unix_server(): listen() returned error");
151                 return -1;
152         }
153
154         osrfLogDebug("unix socket successfully opened");
155         
156         int i = 1;
157
158         /* causing problems with router for some reason ... */
159         //osrfLogDebug("Setting SO_REUSEADDR");
160         //setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));
161         
162         osrfLogDebug("Setting TCP_NODELAY");
163         setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
164
165         _socket_add_node(mgr, SERVER_SOCKET, UNIX, sock_fd, 0);
166         return sock_fd;
167 }
168
169
170
171 int socket_open_udp_server( 
172                 socket_manager* mgr, int port, char* listen_ip ) {
173
174         int sockfd;
175         struct sockaddr_in server_addr;
176
177         if( (sockfd = socket( AF_INET, SOCK_DGRAM, 0 )) < 0 ) {
178                 osrfLogWarning("Unable to create UDP socket");
179                 return -1;
180         }
181
182         server_addr.sin_family = AF_INET;
183         server_addr.sin_port = htons(port);
184         if(listen_ip) server_addr.sin_addr.s_addr = inet_addr(listen_ip);
185         else server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
186
187         if( (bind (sockfd, (struct sockaddr *) &server_addr,sizeof(server_addr))) ) {
188                 osrfLogWarning("Unable to bind to UDP port %d", port);
189                 return -1;
190         }
191
192         _socket_add_node(mgr, SERVER_SOCKET, INET, sockfd, 0);
193         return sockfd;
194 }
195
196
197 int socket_open_tcp_client(socket_manager* mgr, int port, char* dest_addr) {
198
199         struct sockaddr_in remoteAddr, localAddr;
200    struct hostent *hptr;
201    int sock_fd;
202
203    // ------------------------------------------------------------------
204    // Create the socket
205    // ------------------------------------------------------------------
206    if( (sock_fd = socket( AF_INET, SOCK_STREAM, 0 )) < 0 ) {
207       osrfLogWarning( "tcp_connect(): Cannot create socket" );
208       return -1;
209    }
210
211         int i = 1;
212         osrfLogDebug("Setting TCP_NODELAY");
213         setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
214
215
216    // ------------------------------------------------------------------
217    // Get the hostname
218    // ------------------------------------------------------------------
219    if( (hptr = gethostbyname( dest_addr ) ) == NULL ) {
220       osrfLogWarning( "tcp_connect(): Unknown Host => %s", dest_addr );
221       return -1;
222    }
223
224    // ------------------------------------------------------------------
225    // Construct server info struct
226    // ------------------------------------------------------------------
227    memset( &remoteAddr, 0, sizeof(remoteAddr));
228    remoteAddr.sin_family = AF_INET;
229    remoteAddr.sin_port = htons( port );
230    memcpy( (char*) &remoteAddr.sin_addr.s_addr,
231          hptr->h_addr_list[0], hptr->h_length );
232
233    // ------------------------------------------------------------------
234    // Construct local info struct
235    // ------------------------------------------------------------------
236    memset( &localAddr, 0, sizeof( localAddr ) );
237    localAddr.sin_family = AF_INET;
238    localAddr.sin_addr.s_addr = htonl( INADDR_ANY );
239    localAddr.sin_port = htons(0);
240
241    // ------------------------------------------------------------------
242    // Bind to a local port
243    // ------------------------------------------------------------------
244    if( bind( sock_fd, (struct sockaddr *) &localAddr, sizeof( localAddr ) ) < 0 ) {
245       osrfLogWarning( "tcp_connect(): Cannot bind to local port" );
246       return -1;
247    }
248
249    // ------------------------------------------------------------------
250    // Connect to server
251    // ------------------------------------------------------------------
252    if( connect( sock_fd, (struct sockaddr*) &remoteAddr, sizeof( struct sockaddr_in ) ) < 0 ) {
253       osrfLogWarning( "tcp_connect(): Cannot connect to server %s", dest_addr );
254       return -1;
255    }
256
257         _socket_add_node(mgr, CLIENT_SOCKET, INET, sock_fd, -1 );
258
259    return sock_fd;
260 }
261
262
263 int socket_open_udp_client( 
264                 socket_manager* mgr, int port, char* dest_addr) {
265
266         int sockfd;
267         struct sockaddr_in client_addr, server_addr;
268         struct hostent* host;
269
270         if( (host = gethostbyname(dest_addr)) == NULL) {
271                 osrfLogWarning("Unable to resolve host: %s", dest_addr);
272                 return -1;
273         }
274
275         server_addr.sin_family = host->h_addrtype;
276         memcpy((char *) &server_addr.sin_addr.s_addr,
277                              host->h_addr_list[0], host->h_length);
278         server_addr.sin_port = htons(port);
279
280         if( (sockfd = socket(AF_INET,SOCK_DGRAM,0)) < 0 ) {
281                 osrfLogWarning("Unable to create UDP socket");
282                 return -1;
283         }
284
285         client_addr.sin_family = AF_INET;
286         client_addr.sin_addr.s_addr = htonl(INADDR_ANY);
287         client_addr.sin_port = htons(0);
288
289         if( (bind(sockfd, (struct sockaddr *) &client_addr, sizeof(client_addr))) < 0 ) {
290                 osrfLogWarning("Unable to bind UDP socket");
291                 return -1;
292         }
293
294         _socket_add_node(mgr, CLIENT_SOCKET, INET, sockfd, -1 );
295
296         return sockfd;
297 }
298
299
300 int socket_open_unix_client(socket_manager* mgr, char* sock_path) {
301
302         int sock_fd, len;
303    struct sockaddr_un usock;
304
305    if( (sock_fd = socket( AF_UNIX, SOCK_STREAM, 0 )) < 0 ) {
306                 osrfLogWarning( "Cannot create socket" );
307                 return -1;
308         }
309
310    usock.sun_family = AF_UNIX;
311    strcpy( usock.sun_path, sock_path );
312
313    len = sizeof( usock.sun_family ) + strlen( usock.sun_path );
314
315    if( connect( sock_fd, (struct sockaddr *) &usock, len ) < 0 ) {
316       osrfLogWarning( "Error connecting to unix socket" );
317                 return -1;
318         }
319
320         _socket_add_node(mgr, CLIENT_SOCKET, UNIX, sock_fd, -1 );
321
322    return sock_fd;
323 }
324
325
326
327 /* returns the socket_node with the given sock_fd */
328 socket_node* socket_find_node(socket_manager* mgr, int sock_fd) {
329         if(mgr == NULL) return NULL;
330         socket_node* node = mgr->socket;
331         while(node) {
332                 if(node->sock_fd == sock_fd)
333                         return node;
334                 node = node->next;
335         }
336         return NULL;
337 }
338
339 /* removes the node with the given sock_fd from the list and frees it */
340 void socket_remove_node(socket_manager* mgr, int sock_fd) {
341
342         if(mgr == NULL) return;
343
344         osrfLogDebug("removing socket %d", sock_fd);
345
346         socket_node* head = mgr->socket;
347         socket_node* tail = head;
348         if(head == NULL) return;
349
350         /* if removing the first node in the list */
351         if(head->sock_fd == sock_fd) {
352                 mgr->socket = head->next;
353                 free(head);
354                 osrfLogDebug("removing first socket in list");
355                 return;
356         }
357
358         head = head->next;
359
360         /* if removing any other node */
361         while(head) {
362                 if(head->sock_fd == sock_fd) {
363                         tail->next = head->next;
364                         free(head);
365                         return;
366                 }
367                 tail = head;
368                 head = head->next;
369         }
370 }
371
372
373
374 void _socket_print_list(socket_manager* mgr) {
375         if(mgr == NULL) return;
376         socket_node* node = mgr->socket;
377         osrfLogDebug("socket_node list: [");
378         while(node) {
379                 osrfLogDebug("sock_fd: %d | parent_id: %d", 
380                                 node->sock_fd, node->parent_id);
381                 node = node->next;
382         }
383         osrfLogDebug("]");
384 }
385
386 /* sends the given data to the given socket */
387 int socket_send(int sock_fd, const char* data) {
388         osrfLogDebug( "socket_bundle sending to %d data %s",
389                 sock_fd, data);
390
391         osrfLogDebug("%d : Sending data at %lf\n", getpid(), get_timestamp_millis());
392         signal(SIGPIPE, SIG_IGN); /* in case a unix socket was closed */
393         if( send( sock_fd, data, strlen(data), 0 ) < 0 ) {
394                 osrfLogWarning( "tcp_server_send(): Error sending data" );
395                 return -1;
396         }
397
398         return 0;
399 }
400
401 /* disconnects the node with the given sock_fd and removes
402         it from the socket set */
403 void socket_disconnect(socket_manager* mgr, int sock_fd) {
404
405         osrfLogDebug("Closing socket %d", sock_fd);
406
407         if( shutdown( sock_fd, SHUT_RDWR ) )
408                 osrfLogWarning( "socket_disconnect(): Error shuting down socket, removing anyway" );
409
410         if( close( sock_fd ) == -1 ) 
411                 osrfLogWarning( "socket_disconnect(): Error closing socket, removing anyway" );
412
413         if(mgr != NULL) 
414                 socket_remove_node(mgr, sock_fd);
415         
416 }
417
418
419 /* we assume that if select() fails, the socket is no longer valid */
420 int socket_connected(int sock_fd) {
421         fd_set read_set;
422         FD_ZERO( &read_set );
423         FD_SET( sock_fd, &read_set );
424         if( select( sock_fd + 1, &read_set, NULL, NULL, NULL) == -1 ) 
425                 return 0;
426         return 1;
427
428 }
429
430 /* this only waits on the server socket and does not handle the actual
431         data coming in from the client..... XXX */
432 int socket_wait(socket_manager* mgr, int timeout, int sock_fd) {
433
434         int retval = 0;
435         fd_set read_set;
436         FD_ZERO( &read_set );
437         FD_SET( sock_fd, &read_set );
438
439         struct timeval tv;
440         tv.tv_sec = timeout;
441         tv.tv_usec = 0;
442
443         if( timeout < 0 ) {  
444
445                 // If timeout is -1, we block indefinitely
446                 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, NULL)) == -1 ) {
447                         osrfLogWarning("Sys Error: %s", strerror(errno));
448                         osrfLogWarning("Call to select interrupted");
449                         return -1;
450                 }
451
452         } else if( timeout > 0 ) { /* timeout of 0 means don't block */
453
454                 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, &tv)) == -1 ) {
455                         osrfLogWarning("Sys Error: %s", strerror(errno));
456                         osrfLogWarning( "Call to select interrupted" );
457                         return -1;
458                 }
459         }
460
461         osrfLogDebug("%d active sockets after select()", retval);
462         return _socket_route_data_id(mgr, sock_fd);
463 }
464
465
466 int socket_wait_all(socket_manager* mgr, int timeout) {
467
468         if(mgr == NULL) {
469                 osrfLogWarning( "tcp_wait(): null mgr" );
470                 return -1;
471         }
472
473         int retval = 0;
474         fd_set read_set;
475         FD_ZERO( &read_set );
476
477         socket_node* node = mgr->socket;
478         int max_fd = 0;
479         while(node) {
480                 //osrfLogDebug("Adding socket %d to select set",node->sock_fd);
481                 FD_SET( node->sock_fd, &read_set );
482                 if(node->sock_fd > max_fd) max_fd = node->sock_fd;
483                 node = node->next;
484         }
485         max_fd += 1;
486
487         struct timeval tv;
488         tv.tv_sec = timeout;
489         tv.tv_usec = 0;
490
491         if( timeout < 0 ) {  
492
493                 // If timeout is -1, there is no timeout passed to the call to select
494                 if( (retval = select( max_fd, &read_set, NULL, NULL, NULL)) == -1 ) {
495                         osrfLogWarning("Sys Error: %s", strerror(errno));
496                         osrfLogWarning("Call to select interrupted");
497                         return -1;
498                 }
499
500         } else if( timeout != 0 ) { /* timeout of 0 means don't block */
501
502                 if( (retval = select( max_fd, &read_set, NULL, NULL, &tv)) == -1 ) {
503                         osrfLogWarning("Sys Error: %s", strerror(errno));
504                         osrfLogWarning( "Call to select interrupted" );
505                         return -1;
506                 }
507         }
508
509         osrfLogDebug("%d active sockets after select()", retval);
510         return _socket_route_data(mgr, retval, &read_set);
511 }
512
513 /* determines if we'er receiving a new client or data
514         on an existing client */
515 int _socket_route_data(
516         socket_manager* mgr, int num_active, fd_set* read_set) {
517
518         if(!(mgr && read_set)) return -1;
519
520         int last_failed_id = -1;
521
522
523         /* come back here if someone yanks a socket_node from beneath us */
524         while(1) {
525
526                 socket_node* node = mgr->socket;
527                 int handled = 0;
528                 int status = 0;
529                 
530                 while(node && (handled < num_active)) {
531         
532                         int sock_fd = node->sock_fd;
533                         
534                         if(last_failed_id != -1) {
535                                 /* in case it was not removed by our overlords */
536                                 osrfLogDebug("Attempting to remove last_failed_id of %d", last_failed_id);
537                                 socket_remove_node( mgr, last_failed_id );
538                                 last_failed_id = -1;
539                                 status = -1;
540                                 break;
541                         }
542         
543                         /* does this socket have data? */
544                         if( FD_ISSET( sock_fd, read_set ) ) {
545         
546                                 osrfLogDebug("Socket %d active", sock_fd);
547                                 handled++;
548                                 FD_CLR(sock_fd, read_set);
549         
550                                 if(node->endpoint == SERVER_SOCKET) 
551                                         _socket_handle_new_client(mgr, node);
552         
553                                 else
554                                         status = _socket_handle_client_data(mgr, node);
555         
556                                 /* someone may have yanked a socket_node out from under 
557                                         us...start over with the first socket */
558                                 if(status == -1)  {
559                                         last_failed_id = sock_fd;
560                                         osrfLogDebug("Backtracking back to start of loop because "
561                                                         "of -1 return code from _socket_handle_client_data()");
562                                 }
563                         }
564
565                         if(status == -1) break;
566                         node = node->next;
567
568                 } // is_set
569
570                 if(status == 0) break;
571                 if(status == -1) status = 0;
572         } 
573
574         return 0;
575 }
576
577
578 int _socket_route_data_id( socket_manager* mgr, int sock_id) {
579         socket_node* node = socket_find_node(mgr, sock_id);     
580         int status = 0;
581
582         if(node) {
583                 if(node->endpoint == SERVER_SOCKET) 
584                         _socket_handle_new_client(mgr, node);
585         
586                 if(node->endpoint == CLIENT_SOCKET ) 
587                         status = _socket_handle_client_data(mgr, node);
588
589                 if(status == -1) socket_remove_node(mgr, sock_id);
590                 return 0;
591         } 
592
593         return -1;
594 }
595
596
597 int _socket_handle_new_client(socket_manager* mgr, socket_node* node) {
598         if(mgr == NULL || node == NULL) return -1;
599
600         int new_sock_fd;
601         new_sock_fd = accept(node->sock_fd, NULL, NULL);
602         if(new_sock_fd < 0) {
603                 osrfLogWarning("_socket_route_data(): accept() failed");
604                 return -1;
605         }
606
607         if(node->addr_type == INET) {
608                 _socket_add_node(mgr, CLIENT_SOCKET, INET, new_sock_fd, node->sock_fd);
609                 osrfLogDebug("Adding new INET client for %d", node->sock_fd);
610
611         } else if(node->addr_type == UNIX) {
612                 _socket_add_node(mgr, CLIENT_SOCKET, UNIX, new_sock_fd, node->sock_fd);
613                 osrfLogDebug("Adding new UNIX client for %d", node->sock_fd);
614         }
615
616         return 0;
617 }
618
619
620 int _socket_handle_client_data(socket_manager* mgr, socket_node* node) {
621         if(mgr == NULL || node == NULL) return -1;
622
623         char buf[RBUFSIZE];
624         int read_bytes;
625         int sock_fd = node->sock_fd;
626
627         memset(buf, 0, RBUFSIZE);
628         set_fl(sock_fd, O_NONBLOCK);
629         osrfLogDebug("Gathering client data for %d", node->sock_fd);
630
631         osrfLogDebug("%d : Received data at %lf\n", getpid(), get_timestamp_millis());
632
633         while( (read_bytes = recv(sock_fd, buf, RBUFSIZE-1, 0) ) > 0 ) {
634                 osrfLogDebug("Socket %d Read %d bytes and data: %s", sock_fd, read_bytes, buf);
635                 if(mgr->data_received)
636                         mgr->data_received(mgr->blob, mgr, sock_fd, buf, node->parent_id);
637
638                 memset(buf, 0, RBUFSIZE);
639         }
640
641         if(socket_find_node(mgr, sock_fd)) {  /* someone may have closed this socket */
642                 clr_fl(sock_fd, O_NONBLOCK); 
643                 if(read_bytes < 0) { 
644                         if( errno != EAGAIN ) 
645                                 osrfLogWarning( " * Error reading socket with errno %d", errno );
646                 }
647
648         } else { return -1; } /* inform the caller that this node has been tampered with */
649
650         if(read_bytes == 0) {  /* socket closed by client */
651                 if(mgr->on_socket_closed) {
652                         mgr->on_socket_closed(mgr->blob, sock_fd);
653                         return -1;
654                 }
655         }
656
657         return 0;
658
659 }
660
661
662 void socket_manager_free(socket_manager* mgr) {
663         if(mgr == NULL) return;
664         socket_node* tmp;
665         while(mgr->socket) {
666                 tmp = mgr->socket->next;
667                 socket_disconnect(mgr, mgr->socket->sock_fd);
668                 mgr->socket = tmp;
669         }
670         free(mgr);
671
672 }