1 #include "socket_bundle.h"
3 /* --------------------------------------------------------------------
5 -------------------------------------------------------------------- */
/*
 * Sample data_received callback (main() installs it on the manager).
 * Logs the received data to stderr, echoes it back on the same socket,
 * then disconnects that socket and prints the manager's node list.
 * The blob argument is not referenced in the lines shown.
 */
8 void printme(void* blob, socket_manager* mgr,
9 int sock_fd, char* data, int parent_id) {
11 fprintf(stderr, "Got data from socket %d with parent %d => %s",
12 sock_fd, parent_id, data );
/* echo the payload back to the sender */
14 socket_send(sock_fd, data);
/* close the descriptor and unlink its node from mgr */
17 socket_disconnect(mgr, sock_fd);
18 _socket_print_list(mgr);
/*
 * Test driver: zeroes a socket_manager, installs printme as its
 * data_received callback, opens a TCP server socket and then waits on
 * every socket the manager knows about.
 */
22 int main(int argc, char* argv[]) {
23 socket_manager manager;
/* start from an all-zero manager (empty socket list, no callbacks) */
24 memset(&manager, 0, sizeof(socket_manager));
29 manager.data_received = &printme;
/* NOTE(review): socket_open_tcp_server() is defined in this file with three
   parameters (mgr, port, listen_ip) but is called here with two -- confirm
   this driver still compiles. */
30 socket_open_tcp_server(&manager, port);
/* -1: per the comment in socket_wait_all(), no timeout is passed to select() */
33 socket_wait_all(&manager, -1);
38 /* -------------------------------------------------------------------- */
/*
 * Stand-in debug logger: writes the printf-style message to stderr and
 * terminates it with a newline.
 */
42 int osrfLogDebug(char* msg, ...) {
/* NOTE(review): args must be a va_list initialised with va_start(args, msg)
   before this call and released with va_end() afterwards -- confirm. */
45 vfprintf(stderr, msg, args);
47 fprintf( stderr, "\n" );
/*
 * Stand-in warning logger: writes the printf-style message to stderr and
 * terminates it with a newline (same output path as osrfLogDebug).
 */
51 int osrfLogWarning(char* msg, ...) {
/* NOTE(review): args must be a va_list initialised with va_start(args, msg)
   before this call and released with va_end() afterwards -- confirm. */
54 vfprintf(stderr, msg, args);
56 fprintf( stderr, "\n" );
/*
 * Allocates a socket_node describing sock_fd and pushes it onto the front
 * of mgr's socket list.
 *
 * mgr        manager that owns the list; NULL makes the call return NULL
 * endpoint   stored as-is (callers pass SERVER_SOCKET or CLIENT_SOCKET)
 * addr_type  stored as-is (callers pass INET or UNIX)
 * sock_fd    descriptor the node represents
 * parent_id  descriptor of the server socket that produced this one;
 *            callers in this file pass 0 for servers and -1 for clients
 *            opened locally
 */
62 socket_node* _socket_add_node(socket_manager* mgr,
63 int endpoint, int addr_type, int sock_fd, int parent_id ) {
65 if(mgr == NULL) return NULL;
66 osrfLogInternal("Adding socket node with fd %d", sock_fd);
/* NOTE(review): assumes safe_malloc() never returns NULL -- confirm */
67 socket_node* new_node = safe_malloc(sizeof(socket_node));
69 new_node->endpoint = endpoint;
70 new_node->addr_type = addr_type;
71 new_node->sock_fd = sock_fd;
72 new_node->next = NULL;
/* default parent; replaced by the caller-supplied value just below */
73 new_node->parent_id = 0;
75 new_node->parent_id = parent_id;
/* link the new node in as the head of the list */
77 new_node->next = mgr->socket;
78 mgr->socket = new_node;
82 /* creates a new server socket node and adds it to the socket set.
83 returns new socket fd on success. -1 on failure.
84 socket_type is one of INET or UNIX */
/*
 * Creates an AF_INET/SOCK_STREAM socket, binds it to port (on listen_ip
 * when that is non-NULL), puts it into the listening state and registers
 * it with mgr as a SERVER_SOCKET/INET node with parent id 0. Each failing
 * step is reported through osrfLogWarning().
 */
85 int socket_open_tcp_server(socket_manager* mgr, int port, char* listen_ip) {
88 osrfLogWarning("socket_open_tcp_server(): NULL mgr");
/* NOTE(review): confirm server_addr is zeroed before its fields are set */
93 struct sockaddr_in server_addr;
95 sock_fd = socket(AF_INET, SOCK_STREAM, 0);
/* NOTE(review): this and the following messages name tcp_server_connect(),
   not socket_open_tcp_server() -- likely a stale function name. */
98 osrfLogWarning("tcp_server_connect(): Unable to create socket");
102 server_addr.sin_family = AF_INET;
104 if(listen_ip != NULL) {
/* NOTE(review): inet_addr() signals a malformed address with INADDR_NONE;
   that result is not checked in the lines shown. */
105 server_addr.sin_addr.s_addr = inet_addr(listen_ip);
/* INADDR_ANY is the wildcard address (accept on every local interface) */
107 server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
110 server_addr.sin_port = htons(port);
112 if(bind( sock_fd, (struct sockaddr*) &server_addr, sizeof(server_addr)) < 0) {
113 osrfLogWarning("tcp_server_connect(): cannot bind to port %d", port );
/* backlog of 20 pending connections */
117 if(listen(sock_fd, 20) == -1) {
118 osrfLogWarning("tcp_server_connect(): listen() returned error");
/* track the listening socket so the wait/route functions can accept on it */
122 _socket_add_node(mgr, SERVER_SOCKET, INET, sock_fd, 0);
/*
 * Creates an AF_UNIX/SOCK_STREAM socket bound to the filesystem path,
 * puts it into the listening state and registers it with mgr as a
 * SERVER_SOCKET/UNIX node with parent id 0.
 * Returns -1 immediately when mgr or path is NULL.
 */
126 int socket_open_unix_server(socket_manager* mgr, char* path) {
127 if(mgr == NULL || path == NULL) return -1;
129 osrfLogDebug("opening unix socket at %s", path);
131 struct sockaddr_un server_addr;
133 sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
135 osrfLogWarning("socket_open_unix_server(): socket() failed");
139 server_addr.sun_family = AF_UNIX;
/* NOTE(review): sun_path is a fixed-size array and the length of path is not
   checked in the lines shown; an over-long path would overflow it. */
140 strcpy(server_addr.sun_path, path);
142 if( bind(sock_fd, (struct sockaddr*) &server_addr,
143 sizeof(struct sockaddr_un)) < 0) {
145 "socket_open_unix_server(): cannot bind to unix port %s", path );
/* backlog of 20 pending connections */
149 if(listen(sock_fd, 20) == -1) {
150 osrfLogWarning("socket_open_unix_server(): listen() returned error");
154 osrfLogDebug("unix socket successfully opened");
158 /* causing problems with router for some reason ... */
159 //osrfLogDebug("Setting SO_REUSEADDR");
160 //setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));
162 //osrfLogDebug("Setting TCP_NODELAY");
/* NOTE(review): TCP_NODELAY at level IPPROTO_TCP is a TCP option but sock_fd
   is an AF_UNIX socket here; the call presumably fails and its return value
   is ignored -- confirm whether it can be dropped. */
163 setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
165 _socket_add_node(mgr, SERVER_SOCKET, UNIX, sock_fd, 0);
/*
 * Creates an AF_INET/SOCK_DGRAM socket bound to port on listen_ip (or on
 * INADDR_ANY when listen_ip is NULL) and registers it with mgr as a
 * SERVER_SOCKET/INET node with parent id 0.
 */
171 int socket_open_udp_server(
172 socket_manager* mgr, int port, char* listen_ip ) {
/* NOTE(review): confirm server_addr is zeroed before its fields are set */
175 struct sockaddr_in server_addr;
177 if( (sockfd = socket( AF_INET, SOCK_DGRAM, 0 )) < 0 ) {
178 osrfLogWarning("Unable to create UDP socket");
182 server_addr.sin_family = AF_INET;
183 server_addr.sin_port = htons(port);
/* NOTE(review): the inet_addr() result is not checked for INADDR_NONE */
184 if(listen_ip) server_addr.sin_addr.s_addr = inet_addr(listen_ip);
185 else server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
/* any non-zero return from bind() is treated as failure */
187 if( (bind (sockfd, (struct sockaddr *) &server_addr,sizeof(server_addr))) ) {
188 osrfLogWarning("Unable to bind to UDP port %d", port);
192 _socket_add_node(mgr, SERVER_SOCKET, INET, sockfd, 0);
/*
 * Opens a TCP connection to dest_addr:port. The steps shown are: create
 * the socket, enable TCP_NODELAY, resolve dest_addr with gethostbyname(),
 * bind to an ephemeral local port, connect, and register the descriptor
 * with mgr as a CLIENT_SOCKET/INET node with parent id -1.
 */
197 int socket_open_tcp_client(socket_manager* mgr, int port, char* dest_addr) {
199 struct sockaddr_in remoteAddr, localAddr;
200 struct hostent *hptr;
203 // ------------------------------------------------------------------
205 // ------------------------------------------------------------------
206 if( (sock_fd = socket( AF_INET, SOCK_STREAM, 0 )) < 0 ) {
207 osrfLogWarning( "tcp_connect(): Cannot create socket" );
212 //osrfLogDebug("Setting TCP_NODELAY");
/* return value of setsockopt() is ignored */
213 setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
216 // ------------------------------------------------------------------
218 // ------------------------------------------------------------------
/* NOTE(review): gethostbyname() is IPv4-oriented and returns a pointer to
   static storage; getaddrinfo() is the modern replacement. */
219 if( (hptr = gethostbyname( dest_addr ) ) == NULL ) {
220 osrfLogWarning( "tcp_connect(): Unknown Host => %s", dest_addr );
224 // ------------------------------------------------------------------
225 // Construct server info struct
226 // ------------------------------------------------------------------
227 memset( &remoteAddr, 0, sizeof(remoteAddr));
228 remoteAddr.sin_family = AF_INET;
229 remoteAddr.sin_port = htons( port );
/* use the first address returned by the resolver */
230 memcpy( (char*) &remoteAddr.sin_addr.s_addr,
231 hptr->h_addr_list[0], hptr->h_length );
233 // ------------------------------------------------------------------
234 // Construct local info struct
235 // ------------------------------------------------------------------
236 memset( &localAddr, 0, sizeof( localAddr ) );
237 localAddr.sin_family = AF_INET;
238 localAddr.sin_addr.s_addr = htonl( INADDR_ANY );
/* port 0 lets the kernel choose the local port */
239 localAddr.sin_port = htons(0);
241 // ------------------------------------------------------------------
242 // Bind to a local port
243 // ------------------------------------------------------------------
244 if( bind( sock_fd, (struct sockaddr *) &localAddr, sizeof( localAddr ) ) < 0 ) {
245 osrfLogWarning( "tcp_connect(): Cannot bind to local port" );
249 // ------------------------------------------------------------------
251 // ------------------------------------------------------------------
252 if( connect( sock_fd, (struct sockaddr*) &remoteAddr, sizeof( struct sockaddr_in ) ) < 0 ) {
253 osrfLogWarning( "tcp_connect(): Cannot connect to server %s", dest_addr );
/* parent id -1: this client was not accepted from a local server socket */
257 _socket_add_node(mgr, CLIENT_SOCKET, INET, sock_fd, -1 );
/*
 * Creates a UDP socket for talking to dest_addr:port. The steps shown are:
 * resolve dest_addr, fill in server_addr, create an AF_INET/SOCK_DGRAM
 * socket, bind it to an ephemeral local port, and register it with mgr as
 * a CLIENT_SOCKET/INET node with parent id -1.
 */
263 int socket_open_udp_client(
264 socket_manager* mgr, int port, char* dest_addr) {
267 struct sockaddr_in client_addr, server_addr;
268 struct hostent* host;
270 if( (host = gethostbyname(dest_addr)) == NULL) {
271 osrfLogWarning("Unable to resolve host: %s", dest_addr);
/* NOTE(review): server_addr is filled in below, but the lines shown never
   pass it to connect() or sendto() -- confirm it is actually used. */
275 server_addr.sin_family = host->h_addrtype;
276 memcpy((char *) &server_addr.sin_addr.s_addr,
277 host->h_addr_list[0], host->h_length);
278 server_addr.sin_port = htons(port);
280 if( (sockfd = socket(AF_INET,SOCK_DGRAM,0)) < 0 ) {
281 osrfLogWarning("Unable to create UDP socket");
285 client_addr.sin_family = AF_INET;
286 client_addr.sin_addr.s_addr = htonl(INADDR_ANY);
/* port 0 lets the kernel choose the local port */
287 client_addr.sin_port = htons(0);
289 if( (bind(sockfd, (struct sockaddr *) &client_addr, sizeof(client_addr))) < 0 ) {
290 osrfLogWarning("Unable to bind UDP socket");
294 _socket_add_node(mgr, CLIENT_SOCKET, INET, sockfd, -1 );
/*
 * Connects to the UNIX-domain stream socket at sock_path and registers the
 * descriptor with mgr as a CLIENT_SOCKET/UNIX node with parent id -1.
 */
300 int socket_open_unix_client(socket_manager* mgr, char* sock_path) {
303 struct sockaddr_un usock;
305 if( (sock_fd = socket( AF_UNIX, SOCK_STREAM, 0 )) < 0 ) {
306 osrfLogWarning( "Cannot create socket" );
310 usock.sun_family = AF_UNIX;
/* NOTE(review): sun_path is a fixed-size array and the length of sock_path
   is not checked in the lines shown; an over-long path would overflow it. */
311 strcpy( usock.sun_path, sock_path );
/* address length = family field + path characters (no NUL terminator) */
313 len = sizeof( usock.sun_family ) + strlen( usock.sun_path );
315 if( connect( sock_fd, (struct sockaddr *) &usock, len ) < 0 ) {
316 osrfLogWarning( "Error connecting to unix socket" );
320 _socket_add_node(mgr, CLIENT_SOCKET, UNIX, sock_fd, -1 );
327 /* returns the socket_node with the given sock_fd */
/*
 * Looks up a node by descriptor, starting from the head of mgr's socket
 * list. Returns NULL when mgr is NULL.
 */
328 socket_node* socket_find_node(socket_manager* mgr, int sock_fd) {
329 if(mgr == NULL) return NULL;
330 socket_node* node = mgr->socket;
/* a node matches when its descriptor equals the requested one */
332 if(node->sock_fd == sock_fd)
339 /* removes the node with the given sock_fd from the list and frees it */
/*
 * Unlinks the node whose sock_fd matches from mgr's socket list.
 * Does nothing when mgr is NULL or the list is empty. The descriptor
 * itself is not closed in the lines shown (socket_disconnect() closes it
 * before calling this function).
 */
340 void socket_remove_node(socket_manager* mgr, int sock_fd) {
342 if(mgr == NULL) return;
344 osrfLogDebug("removing socket %d", sock_fd);
/* head is the node under inspection; tail is presumably its predecessor */
346 socket_node* head = mgr->socket;
347 socket_node* tail = head;
348 if(head == NULL) return;
350 /* if removing the first node in the list */
351 if(head->sock_fd == sock_fd) {
/* the second node (possibly NULL) becomes the new list head */
352 mgr->socket = head->next;
359 /* if removing any other node */
361 if(head->sock_fd == sock_fd) {
/* bypass the matching node */
362 tail->next = head->next;
/*
 * Debug helper: logs the descriptor and parent id of nodes in mgr's socket
 * list through osrfLogDebug(). Does nothing when mgr is NULL.
 */
373 void _socket_print_list(socket_manager* mgr) {
374 if(mgr == NULL) return;
375 socket_node* node = mgr->socket;
376 osrfLogDebug("socket_node list: [");
378 osrfLogDebug("sock_fd: %d | parent_id: %d",
379 node->sock_fd, node->parent_id);
385 /* sends the given data to the given socket */
/*
 * Sends the NUL-terminated string data on sock_fd; strlen(data) bytes are
 * written, so the terminator itself is not transmitted. A negative return
 * from send() is reported through osrfLogWarning().
 */
386 int socket_send(int sock_fd, const char* data) {
387 osrfLogInternal( "socket_bundle sending to %d data %s",
/* NOTE(review): this changes the process-wide SIGPIPE disposition on every
   call; installing it once at start-up (or passing MSG_NOSIGNAL to send()
   where available) would avoid the repeated signal() call -- confirm. */
390 signal(SIGPIPE, SIG_IGN); /* in case a unix socket was closed */
/* NOTE(review): send() may write fewer bytes than requested; only the
   negative (error) case is checked in the lines shown. */
391 if( send( sock_fd, data, strlen(data), 0 ) < 0 ) {
392 osrfLogWarning( "tcp_server_send(): Error sending data" );
399 /* disconnects the node with the given sock_fd and removes
400 it from the socket set */
/*
 * Closes sock_fd and removes its node from mgr's socket list. A close()
 * failure is logged but the node is removed regardless.
 */
401 void socket_disconnect(socket_manager* mgr, int sock_fd) {
403 osrfLogDebug("Closing socket %d", sock_fd);
405 if( close( sock_fd ) == -1 )
406 osrfLogWarning( "socket_disconnect(): Error closing socket, removing anyway" );
/* socket_remove_node() tolerates a NULL mgr and an unknown descriptor */
409 socket_remove_node(mgr, sock_fd);
414 /* we assume that if select() fails, the socket is no longer valid */
/*
 * Probes sock_fd by running select() on a read set that contains only that
 * descriptor; a return of -1 from select() is taken as "not connected".
 */
415 int socket_connected(int sock_fd) {
417 FD_ZERO( &read_set );
418 FD_SET( sock_fd, &read_set );
/* NOTE(review): with a NULL timeout select() blocks until sock_fd becomes
   readable or an error occurs, so this probe can stall on an idle but
   healthy connection -- confirm that is intended. */
419 if( select( sock_fd + 1, &read_set, NULL, NULL, NULL) == -1 )
425 /* this only waits on the server socket and does not handle the actual
426 data coming in from the client..... XXX */
/*
 * Waits with select() for sock_fd alone to become readable, then hands the
 * descriptor to _socket_route_data_id() and returns that function's result.
 *
 * timeout  -1 blocks indefinitely (per the comment below); a positive
 *          value selects with the timeval tv.
 */
427 int socket_wait(socket_manager* mgr, int timeout, int sock_fd) {
431 FD_ZERO( &read_set );
432 FD_SET( sock_fd, &read_set );
440 // If timeout is -1, we block indefinitely
441 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, NULL)) == -1 ) {
442 osrfLogWarning("Sys Error: %s", strerror(errno));
443 osrfLogWarning("Call to select interrupted");
/* NOTE(review): this branch requires timeout > 0 whereas socket_wait_all()
   tests timeout != 0, so other negative timeouts are handled differently
   by the two functions -- confirm which is intended. */
447 } else if( timeout > 0 ) { /* timeout of 0 means don't block */
449 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, &tv)) == -1 ) {
450 osrfLogWarning("Sys Error: %s", strerror(errno));
451 osrfLogWarning( "Call to select interrupted" );
456 osrfLogInternal("%d active sockets after select()", retval);
/* accept a new client or read pending data, depending on the node type */
457 return _socket_route_data_id(mgr, sock_fd);
/*
 * Waits with select() for any socket in mgr's list to become readable,
 * then passes the ready set to _socket_route_data() and returns its result.
 *
 * timeout  -1 passes no timeout to select() (per the comment below); any
 *          other non-zero value selects with the timeval tv.
 */
461 int socket_wait_all(socket_manager* mgr, int timeout) {
/* NOTE(review): the message names tcp_wait(), not socket_wait_all() */
464 osrfLogWarning( "tcp_wait(): null mgr" );
470 FD_ZERO( &read_set );
472 socket_node* node = mgr->socket;
/* add each node's descriptor to the read set and track the largest one */
475 osrfLogInternal("Adding socket fd %d to select set",node->sock_fd);
476 FD_SET( node->sock_fd, &read_set );
477 if(node->sock_fd > max_fd) max_fd = node->sock_fd;
/* NOTE(review): select()'s first argument must be the highest descriptor
   plus one; confirm max_fd has been incremented before these calls. */
488 // If timeout is -1, there is no timeout passed to the call to select
489 if( (retval = select( max_fd, &read_set, NULL, NULL, NULL)) == -1 ) {
490 osrfLogWarning("Call to select interrupted (returned -1)");
491 osrfLogWarning("Sys Error: %s", strerror(errno));
495 } else if( timeout != 0 ) { /* timeout of 0 means don't block */
497 if( (retval = select( max_fd, &read_set, NULL, NULL, &tv)) == -1 ) {
498 osrfLogWarning( "Call to select interrupted (returned -1)" );
499 osrfLogWarning("Sys Error: %s", strerror(errno));
504 osrfLogDebug("%d active sockets after select()", retval);
/* retval is the number of ready descriptors reported by select() */
505 return _socket_route_data(mgr, retval, &read_set);
508 /* determines if we're receiving a new client or data
509 on an existing client */
/*
 * Walks mgr's socket list and services descriptors that are set in
 * read_set: server sockets accept a new client, other sockets have their
 * pending data read. The walk is bounded by num_active, the count of ready
 * descriptors. Returns -1 when mgr or read_set is NULL.
 *
 * A handler may cause nodes to be removed from the list while it is being
 * walked (see the in-line comments); the function copes with that by
 * remembering the failing descriptor in last_failed_id and starting over.
 */
510 int _socket_route_data(
511 socket_manager* mgr, int num_active, fd_set* read_set) {
513 if(!(mgr && read_set)) return -1;
/* descriptor whose handler last returned -1, or -1 when there is none */
515 int last_failed_id = -1;
518 /* come back here if someone yanks a socket_node from beneath us */
521 socket_node* node = mgr->socket;
525 while(node && (handled < num_active)) {
/* copy the descriptor: the node itself may be gone after a handler runs */
527 int sock_fd = node->sock_fd;
529 if(last_failed_id != -1) {
530 /* in case it was not removed by our overlords */
531 osrfLogInternal("Attempting to remove last_failed_id of %d", last_failed_id);
532 socket_remove_node( mgr, last_failed_id );
538 /* does this socket have data? */
539 if( FD_ISSET( sock_fd, read_set ) ) {
541 osrfLogInternal("Socket %d active", sock_fd);
/* clear the bit so a restarted walk does not service this socket twice */
543 FD_CLR(sock_fd, read_set);
545 if(node->endpoint == SERVER_SOCKET)
546 _socket_handle_new_client(mgr, node);
549 status = _socket_handle_client_data(mgr, node);
551 /* someone may have yanked a socket_node out from under
552 us...start over with the first socket */
554 last_failed_id = sock_fd;
555 osrfLogInternal("Backtracking back to start of loop because "
556 "of -1 return code from _socket_handle_client_data()");
/* leave the inner walk; the status checks below decide whether to restart */
560 if(status == -1) break;
565 if(status == 0) break;
566 if(status == -1) status = 0;
/*
 * Services a single descriptor: looks up its node, accepts a new client
 * when the node is a server socket, or reads pending data when it is a
 * client socket. The node is removed from the list when the client-data
 * handler reports -1.
 */
573 int _socket_route_data_id( socket_manager* mgr, int sock_id) {
/* NOTE(review): socket_find_node() returns NULL for a NULL mgr; confirm
   node is NULL-checked before the dereferences below. */
574 socket_node* node = socket_find_node(mgr, sock_id);
578 if(node->endpoint == SERVER_SOCKET)
579 _socket_handle_new_client(mgr, node);
581 if(node->endpoint == CLIENT_SOCKET )
582 status = _socket_handle_client_data(mgr, node);
/* -1 from the handler means the node is no longer valid; drop it */
584 if(status == -1) socket_remove_node(mgr, sock_id);
/*
 * Accepts a pending connection on the server socket described by node and
 * registers the new descriptor with mgr as a CLIENT_SOCKET node. The new
 * node inherits the server's address type and records the server's
 * descriptor as its parent id. Returns -1 when mgr or node is NULL.
 */
592 int _socket_handle_new_client(socket_manager* mgr, socket_node* node) {
593 if(mgr == NULL || node == NULL) return -1;
/* the peer address is not requested (both address arguments are NULL) */
596 new_sock_fd = accept(node->sock_fd, NULL, NULL);
597 if(new_sock_fd < 0) {
/* NOTE(review): the message names _socket_route_data(), not this function */
598 osrfLogWarning("_socket_route_data(): accept() failed");
602 if(node->addr_type == INET) {
603 _socket_add_node(mgr, CLIENT_SOCKET, INET, new_sock_fd, node->sock_fd);
604 osrfLogDebug("Adding new INET client for %d", node->sock_fd);
606 } else if(node->addr_type == UNIX) {
607 _socket_add_node(mgr, CLIENT_SOCKET, UNIX, new_sock_fd, node->sock_fd);
608 osrfLogDebug("Adding new UNIX client for %d", node->sock_fd);
/*
 * Drains the data currently available on node's socket. The descriptor is
 * switched to non-blocking mode, read in chunks of at most RBUFSIZE-1
 * bytes, and each chunk is passed to mgr->data_received (when set) together
 * with mgr->blob and the node's parent id.
 *
 * Returns -1 when mgr or node is NULL, or when the node is no longer in
 * mgr's list after reading (a callback may have closed the socket). When
 * recv() reports 0 bytes the peer has closed the connection and
 * mgr->on_socket_closed is invoked if it is set.
 */
615 int _socket_handle_client_data(socket_manager* mgr, socket_node* node) {
616 if(mgr == NULL || node == NULL) return -1;
/* keep a copy: node may be freed by a callback during the read loop */
620 int sock_fd = node->sock_fd;
/* buf is presumably RBUFSIZE bytes; zeroing it and reading at most
   RBUFSIZE-1 bytes keeps every chunk NUL-terminated */
622 memset(buf, 0, RBUFSIZE);
/* non-blocking so the read loop ends when no more data is queued */
623 set_fl(sock_fd, O_NONBLOCK);
625 osrfLogInternal("%d : Received data at %lf\n", getpid(), get_timestamp_millis());
627 while( (read_bytes = recv(sock_fd, buf, RBUFSIZE-1, 0) ) > 0 ) {
628 osrfLogInternal("Socket %d Read %d bytes and data: %s", sock_fd, read_bytes, buf);
629 if(mgr->data_received)
/* NOTE(review): node->parent_id is read after earlier callback invocations
   may already have removed the node -- confirm this cannot happen. */
630 mgr->data_received(mgr->blob, mgr, sock_fd, buf, node->parent_id);
632 memset(buf, 0, RBUFSIZE);
635 if(socket_find_node(mgr, sock_fd)) { /* someone may have closed this socket */
/* restore blocking mode for later users of the descriptor */
636 clr_fl(sock_fd, O_NONBLOCK);
/* NOTE(review): errno is inspected here rather than immediately after
   recv(); the intervening calls may have changed it -- confirm. */
638 if( errno != EAGAIN )
639 osrfLogWarning( " * Error reading socket with errno %d", errno );
642 } else { return -1; } /* inform the caller that this node has been tampered with */
644 if(read_bytes == 0) { /* socket closed by client */
645 if(mgr->on_socket_closed) {
646 mgr->on_socket_closed(mgr->blob, sock_fd);
656 void socket_manager_free(socket_manager* mgr) {
657 if(mgr == NULL) return;
660 tmp = mgr->socket->next;
661 socket_disconnect(mgr, mgr->socket->sock_fd);