1 #include "socket_bundle.h"
3 /* --------------------------------------------------------------------
5 -------------------------------------------------------------------- */
/*
 * Data-received callback installed by main() in this test section.
 *
 * Visible behavior: writes the socket fd, parent id and received text to
 * stderr, sends the same text back on the socket with socket_send(), and
 * calls socket_disconnect() followed by _socket_print_list() on the manager.
 * The blob argument is not used in the visible lines.
 *
 * NOTE(review): this listing has gaps (the leading numbers skip), so the
 * disconnect/print calls may sit inside a condition that is not shown, and
 * the closing brace of the function is not present here -- confirm against
 * the pristine file.
 */
8 void printme(void* blob, socket_manager* mgr,
9 int sock_fd, char* data, int parent_id) {
11 fprintf(stderr, "Got data from socket %d with parent %d => %s",
12 sock_fd, parent_id, data );
/* echo the data straight back to the sender */
14 socket_send(sock_fd, data);
17 socket_disconnect(mgr, sock_fd);
18 _socket_print_list(mgr);
/*
 * Test driver: zeroes a stack-allocated socket_manager, installs printme()
 * as its data_received callback, opens a TCP server socket and waits on all
 * managed sockets with a timeout argument of -1.
 *
 * NOTE(review): `port` is not declared in the visible lines, and the call
 * below passes two arguments while socket_open_tcp_server() is defined in
 * this file with three (mgr, port, listen_ip).  The closing brace / return
 * are also not visible.  Presumably this section is stale test code --
 * confirm before building it.
 */
22 int main(int argc, char* argv[]) {
23 socket_manager manager;
24 memset(&manager, 0, sizeof(socket_manager));
29 manager.data_received = &printme;
30 socket_open_tcp_server(&manager, port);
33 socket_wait_all(&manager, -1);
38 /* -------------------------------------------------------------------- */
/*
 * Allocates a socket_node with safe_malloc(), fills in endpoint, addr_type,
 * sock_fd and parent_id, and pushes it onto the head of mgr->socket.
 * Returns NULL immediately when mgr is NULL.
 *
 * NOTE(review): the return statement and closing brace are not visible in
 * this listing; presumably the new node is returned -- confirm.
 */
42 socket_node* _socket_add_node(socket_manager* mgr,
43 int endpoint, int addr_type, int sock_fd, int parent_id ) {
45 if(mgr == NULL) return NULL;
46 osrfLogInternal( OSRF_LOG_MARK, "Adding socket node with fd %d", sock_fd);
47 socket_node* new_node = safe_malloc(sizeof(socket_node));
49 new_node->endpoint = endpoint;
50 new_node->addr_type = addr_type;
51 new_node->sock_fd = sock_fd;
52 new_node->next = NULL;
/* NOTE(review): parent_id is set to 0 and then to the argument; a guarding
   condition may be missing between these two lines (the client-open
   functions in this file pass -1) -- confirm against the pristine file. */
53 new_node->parent_id = 0;
55 new_node->parent_id = parent_id;
/* link the new node in at the head of the manager's list */
57 new_node->next = mgr->socket;
58 mgr->socket = new_node;
62 /* creates a new server socket node and adds it to the socket set.
63 returns new socket fd on success. -1 on failure.
64 socket_type is one of INET or UNIX */
65 int socket_open_tcp_server(socket_manager* mgr, int port, char* listen_ip) {
68 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): NULL mgr");
73 struct sockaddr_in server_addr;
75 sock_fd = socket(AF_INET, SOCK_STREAM, 0);
78 osrfLogWarning( OSRF_LOG_MARK, "tcp_server_connect(): Unable to create socket");
82 server_addr.sin_family = AF_INET;
84 if(listen_ip != NULL) {
85 server_addr.sin_addr.s_addr = inet_addr(listen_ip);
87 server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
90 server_addr.sin_port = htons(port);
92 if(bind( sock_fd, (struct sockaddr*) &server_addr, sizeof(server_addr)) < 0) {
93 osrfLogWarning( OSRF_LOG_MARK, "tcp_server_connect(): cannot bind to port %d", port );
97 if(listen(sock_fd, 20) == -1) {
98 osrfLogWarning( OSRF_LOG_MARK, "tcp_server_connect(): listen() returned error");
102 _socket_add_node(mgr, SERVER_SOCKET, INET, sock_fd, 0);
106 int socket_open_unix_server(socket_manager* mgr, char* path) {
107 if(mgr == NULL || path == NULL) return -1;
109 osrfLogDebug( OSRF_LOG_MARK, "opening unix socket at %s", path);
111 struct sockaddr_un server_addr;
113 sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
115 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): socket() failed");
119 server_addr.sun_family = AF_UNIX;
120 strcpy(server_addr.sun_path, path);
122 if( bind(sock_fd, (struct sockaddr*) &server_addr,
123 sizeof(struct sockaddr_un)) < 0) {
124 osrfLogWarning( OSRF_LOG_MARK,
125 "socket_open_unix_server(): cannot bind to unix port %s", path );
129 if(listen(sock_fd, 20) == -1) {
130 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): listen() returned error");
134 osrfLogDebug( OSRF_LOG_MARK, "unix socket successfully opened");
138 /* causing problems with router for some reason ... */
139 //osrfLogDebug( OSRF_LOG_MARK, "Setting SO_REUSEADDR");
140 //setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));
142 //osrfLogDebug( OSRF_LOG_MARK, "Setting TCP_NODELAY");
143 setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
145 _socket_add_node(mgr, SERVER_SOCKET, UNIX, sock_fd, 0);
151 int socket_open_udp_server(
152 socket_manager* mgr, int port, char* listen_ip ) {
155 struct sockaddr_in server_addr;
157 if( (sockfd = socket( AF_INET, SOCK_DGRAM, 0 )) < 0 ) {
158 osrfLogWarning( OSRF_LOG_MARK, "Unable to create UDP socket");
162 server_addr.sin_family = AF_INET;
163 server_addr.sin_port = htons(port);
164 if(listen_ip) server_addr.sin_addr.s_addr = inet_addr(listen_ip);
165 else server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
167 if( (bind (sockfd, (struct sockaddr *) &server_addr,sizeof(server_addr))) ) {
168 osrfLogWarning( OSRF_LOG_MARK, "Unable to bind to UDP port %d", port);
172 _socket_add_node(mgr, SERVER_SOCKET, INET, sockfd, 0);
177 int socket_open_tcp_client(socket_manager* mgr, int port, char* dest_addr) {
179 struct sockaddr_in remoteAddr, localAddr;
180 struct hostent *hptr;
183 // ------------------------------------------------------------------
185 // ------------------------------------------------------------------
186 if( (sock_fd = socket( AF_INET, SOCK_STREAM, 0 )) < 0 ) {
187 osrfLogWarning( OSRF_LOG_MARK, "tcp_connect(): Cannot create socket" );
192 //osrfLogDebug( OSRF_LOG_MARK, "Setting TCP_NODELAY");
193 setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
196 // ------------------------------------------------------------------
198 // ------------------------------------------------------------------
199 if( (hptr = gethostbyname( dest_addr ) ) == NULL ) {
200 osrfLogWarning( OSRF_LOG_MARK, "tcp_connect(): Unknown Host => %s", dest_addr );
204 // ------------------------------------------------------------------
205 // Construct server info struct
206 // ------------------------------------------------------------------
207 memset( &remoteAddr, 0, sizeof(remoteAddr));
208 remoteAddr.sin_family = AF_INET;
209 remoteAddr.sin_port = htons( port );
210 memcpy( (char*) &remoteAddr.sin_addr.s_addr,
211 hptr->h_addr_list[0], hptr->h_length );
213 // ------------------------------------------------------------------
214 // Construct local info struct
215 // ------------------------------------------------------------------
216 memset( &localAddr, 0, sizeof( localAddr ) );
217 localAddr.sin_family = AF_INET;
218 localAddr.sin_addr.s_addr = htonl( INADDR_ANY );
219 localAddr.sin_port = htons(0);
221 // ------------------------------------------------------------------
222 // Bind to a local port
223 // ------------------------------------------------------------------
224 if( bind( sock_fd, (struct sockaddr *) &localAddr, sizeof( localAddr ) ) < 0 ) {
225 osrfLogWarning( OSRF_LOG_MARK, "tcp_connect(): Cannot bind to local port" );
229 // ------------------------------------------------------------------
231 // ------------------------------------------------------------------
232 if( connect( sock_fd, (struct sockaddr*) &remoteAddr, sizeof( struct sockaddr_in ) ) < 0 ) {
233 osrfLogWarning( OSRF_LOG_MARK, "tcp_connect(): Cannot connect to server %s", dest_addr );
237 _socket_add_node(mgr, CLIENT_SOCKET, INET, sock_fd, -1 );
243 int socket_open_udp_client(
244 socket_manager* mgr, int port, char* dest_addr) {
247 struct sockaddr_in client_addr, server_addr;
248 struct hostent* host;
250 if( (host = gethostbyname(dest_addr)) == NULL) {
251 osrfLogWarning( OSRF_LOG_MARK, "Unable to resolve host: %s", dest_addr);
255 server_addr.sin_family = host->h_addrtype;
256 memcpy((char *) &server_addr.sin_addr.s_addr,
257 host->h_addr_list[0], host->h_length);
258 server_addr.sin_port = htons(port);
260 if( (sockfd = socket(AF_INET,SOCK_DGRAM,0)) < 0 ) {
261 osrfLogWarning( OSRF_LOG_MARK, "Unable to create UDP socket");
265 client_addr.sin_family = AF_INET;
266 client_addr.sin_addr.s_addr = htonl(INADDR_ANY);
267 client_addr.sin_port = htons(0);
269 if( (bind(sockfd, (struct sockaddr *) &client_addr, sizeof(client_addr))) < 0 ) {
270 osrfLogWarning( OSRF_LOG_MARK, "Unable to bind UDP socket");
274 _socket_add_node(mgr, CLIENT_SOCKET, INET, sockfd, -1 );
280 int socket_open_unix_client(socket_manager* mgr, char* sock_path) {
283 struct sockaddr_un usock;
285 if( (sock_fd = socket( AF_UNIX, SOCK_STREAM, 0 )) < 0 ) {
286 osrfLogWarning( OSRF_LOG_MARK, "Cannot create socket" );
290 usock.sun_family = AF_UNIX;
291 strcpy( usock.sun_path, sock_path );
293 len = sizeof( usock.sun_family ) + strlen( usock.sun_path );
295 if( connect( sock_fd, (struct sockaddr *) &usock, len ) < 0 ) {
296 osrfLogWarning( OSRF_LOG_MARK, "Error connecting to unix socket" );
300 _socket_add_node(mgr, CLIENT_SOCKET, UNIX, sock_fd, -1 );
307 /* returns the socket_node with the given sock_fd */
308 socket_node* socket_find_node(socket_manager* mgr, int sock_fd) {
309 if(mgr == NULL) return NULL;
310 socket_node* node = mgr->socket;
312 if(node->sock_fd == sock_fd)
319 /* removes the node with the given sock_fd from the list and frees it */
/*
 * Does nothing when mgr is NULL or the list is empty.  `head` walks the
 * list and `tail` starts out equal to it; in the visible lines the
 * first-node case re-points mgr->socket at head->next, and the general
 * case re-points tail->next at head->next.
 *
 * NOTE(review): the traversal loop, the call that frees the node, the
 * return statements and the closing braces are not visible in this listing;
 * confirm against the pristine file before editing.
 */
320 void socket_remove_node(socket_manager* mgr, int sock_fd) {
322 if(mgr == NULL) return;
324 osrfLogDebug( OSRF_LOG_MARK, "removing socket %d", sock_fd);
326 socket_node* head = mgr->socket;
327 socket_node* tail = head;
328 if(head == NULL) return;
330 /* if removing the first node in the list */
331 if(head->sock_fd == sock_fd) {
332 mgr->socket = head->next;
339 /* if removing any other node */
341 if(head->sock_fd == sock_fd) {
342 tail->next = head->next;
353 void _socket_print_list(socket_manager* mgr) {
354 if(mgr == NULL) return;
355 socket_node* node = mgr->socket;
356 osrfLogDebug( OSRF_LOG_MARK, "socket_node list: [");
358 osrfLogDebug( OSRF_LOG_MARK, "sock_fd: %d | parent_id: %d",
359 node->sock_fd, node->parent_id);
362 osrfLogDebug( OSRF_LOG_MARK, "]");
/*
 * Sends the NUL-terminated string `data` to the given socket with no
 * send() flags.  Returns whatever _socket_send() returns.
 */
int socket_send(int sock_fd, const char* data) {
	return _socket_send( sock_fd, data, 0);
}
371 int _socket_send(int sock_fd, const char* data, int flags) {
373 signal(SIGPIPE, SIG_IGN); /* in case a unix socket was closed */
375 size_t r = send( sock_fd, data, strlen(data), flags );
378 osrfLogWarning( OSRF_LOG_MARK, "tcp_server_send(): Error sending data with return %d", r );
379 osrfLogWarning( OSRF_LOG_MARK, "Last Sys Error: %s", strerror(errno));
/*
 * Sends the NUL-terminated string `data` to the given socket with
 * MSG_DONTWAIT, so the call does not block when the send buffer is full.
 * Returns whatever _socket_send() returns.
 */
int socket_send_nowait( int sock_fd, const char* data) {
	return _socket_send( sock_fd, data, MSG_DONTWAIT);
}
393 * Waits at most usecs microseconds for the send buffer of the given
394 * socket to accept new data. This does not guarantee that the
395 * socket will accept all the data we want to give it.
397 int socket_send_timeout( int sock_fd, const char* data, int usecs ) {
400 FD_ZERO( &write_set );
401 FD_SET( sock_fd, &write_set );
404 int secs = (int) usecs / mil;
405 usecs = usecs - (secs * mil);
411 int ret = select( sock_fd + 1, NULL, &write_set, NULL, &tv);
412 if( ret > 0 ) return _socket_send( sock_fd, data, 0);
414 osrfLogError(OSRF_LOG_MARK, "socket_send_timeout(): "
415 "timed out on send for socket %d after %d secs, %d usecs", sock_fd, secs, usecs );
421 /* disconnects the node with the given sock_fd and removes
422 it from the socket set */
/*
 * Visible behavior: logs "Closing socket <fd>" at internal level and calls
 * socket_remove_node() to drop the node from mgr's list.
 *
 * NOTE(review): no close()/shutdown() call on sock_fd is visible in this
 * listing (a line appears to be missing between the log call and the node
 * removal); confirm the descriptor really is closed, otherwise it leaks.
 * The closing brace is also not shown.
 */
423 void socket_disconnect(socket_manager* mgr, int sock_fd) {
424 osrfLogInternal( OSRF_LOG_MARK, "Closing socket %d", sock_fd);
426 socket_remove_node(mgr, sock_fd);
/*
 * Reports whether sock_fd still refers to a usable descriptor.
 *
 * We assume that if select() fails, the socket is no longer valid.  The
 * descriptor is polled with a zero timeout so the check never blocks; the
 * previous revision passed a NULL timeout, which made the call wait until
 * the socket became readable.
 *
 * Returns 1 when select() accepts the descriptor, 0 when it rejects it or
 * when sock_fd is outside the range an fd_set can hold.
 */
int socket_connected(int sock_fd) {

	/* FD_SET on a descriptor outside [0, FD_SETSIZE) is undefined */
	if( sock_fd < 0 || sock_fd >= FD_SETSIZE )
		return 0;

	for(;;) {
		fd_set read_set;
		struct timeval tv;

		FD_ZERO( &read_set );
		FD_SET( sock_fd, &read_set );
		tv.tv_sec = 0;
		tv.tv_usec = 0;

		if( select( sock_fd + 1, &read_set, NULL, NULL, &tv) != -1 )
			return 1;

		/* an interrupted call says nothing about the socket; try again */
		if( errno != EINTR )
			return 0;
	}
}
441 /* this only waits on the server socket and does not handle the actual
442 data coming in from the client..... XXX */
/*
 * Waits with select() for sock_fd to become readable and then hands the
 * descriptor to _socket_route_data_id(), whose result is returned.
 *
 * Visible timeout handling: one branch calls select() with a NULL timeout
 * (per the comment below, used when timeout is -1), another runs when
 * timeout > 0 and passes a struct timeval `tv`.  A select() result of -1
 * is logged at debug level together with strerror(errno).
 *
 * NOTE(review): the unit of `timeout`, the setup of `tv`, the declarations
 * of read_set/retval and what is returned after a failed select() are not
 * visible in this listing.  Note also that socket_wait_all() tests
 * `timeout != 0` where this function tests `timeout > 0` -- confirm whether
 * the difference is intended.
 */
443 int socket_wait(socket_manager* mgr, int timeout, int sock_fd) {
447 FD_ZERO( &read_set );
448 FD_SET( sock_fd, &read_set );
456 // If timeout is -1, we block indefinitely
457 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, NULL)) == -1 ) {
458 osrfLogDebug( OSRF_LOG_MARK, "Call to select() interrupted: Sys Error: %s", strerror(errno));
462 } else if( timeout > 0 ) { /* timeout of 0 means don't block */
464 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, &tv)) == -1 ) {
465 osrfLogDebug( OSRF_LOG_MARK, "Call to select() interrupted: Sys Error: %s", strerror(errno));
470 osrfLogInternal( OSRF_LOG_MARK, "%d active sockets after select()", retval);
471 return _socket_route_data_id(mgr, sock_fd);
/*
 * Waits with select() on every socket in mgr's list and passes the
 * resulting read set, together with select()'s return value, to
 * _socket_route_data(); that function's result is returned.
 *
 * Visible behavior: a warning is logged for a NULL mgr; each node's fd is
 * added to read_set while max_fd tracks the largest fd seen; select() is
 * called with a NULL timeout in one branch (per the comment below, used
 * when timeout is -1) and with `tv` when timeout != 0; a select() result
 * of -1 is logged as a warning with strerror(errno).
 *
 * NOTE(review): select()'s first argument must be the highest descriptor
 * plus one.  Only `max_fd` is passed here and no increment is visible in
 * this listing -- confirm that one exists in the missing lines.  The list
 * traversal loop, the setup of `tv`, the unit of `timeout` and the early
 * returns are likewise not shown.
 */
475 int socket_wait_all(socket_manager* mgr, int timeout) {
478 osrfLogWarning( OSRF_LOG_MARK, "tcp_wait(): null mgr" );
484 FD_ZERO( &read_set );
486 socket_node* node = mgr->socket;
489 osrfLogInternal( OSRF_LOG_MARK, "Adding socket fd %d to select set",node->sock_fd);
490 FD_SET( node->sock_fd, &read_set );
491 if(node->sock_fd > max_fd) max_fd = node->sock_fd;
502 // If timeout is -1, there is no timeout passed to the call to select
503 if( (retval = select( max_fd, &read_set, NULL, NULL, NULL)) == -1 ) {
504 osrfLogWarning( OSRF_LOG_MARK, "Call to select interrupted (returned -1)");
505 osrfLogWarning( OSRF_LOG_MARK, "Sys Error: %s", strerror(errno));
509 } else if( timeout != 0 ) { /* timeout of 0 means don't block */
511 if( (retval = select( max_fd, &read_set, NULL, NULL, &tv)) == -1 ) {
512 osrfLogWarning( OSRF_LOG_MARK, "Call to select interrupted (returned -1)" );
513 osrfLogWarning( OSRF_LOG_MARK, "Sys Error: %s", strerror(errno));
518 osrfLogDebug( OSRF_LOG_MARK, "%d active sockets after select()", retval);
519 return _socket_route_data(mgr, retval, &read_set);
522 /* determines if we're receiving a new client or data
523 on an existing client */
/*
 * Walks mgr's socket list and, for each node whose fd is set in read_set,
 * clears the bit and dispatches: SERVER_SOCKET nodes go to
 * _socket_handle_new_client(), the other branch stores the result of
 * _socket_handle_client_data() in `status`.  The inner loop stops once
 * `handled` reaches num_active or the list ends.
 *
 * A fd recorded in last_failed_id is removed from the list with
 * socket_remove_node() on the next pass, because (per the comments below)
 * a handler may have pulled a node out of the list during the walk and the
 * scan is then restarted from the first socket.
 *
 * Returns -1 when mgr or read_set is NULL.
 *
 * NOTE(review): the outer restart loop, the increment of `handled`, the
 * `else` pairing, the condition guarding the last_failed_id assignment,
 * the node advance and the final return are not visible in this listing;
 * the statement order here is delicate, so confirm against the pristine
 * file before changing anything.
 */
524 int _socket_route_data(
525 socket_manager* mgr, int num_active, fd_set* read_set) {
527 if(!(mgr && read_set)) return -1;
529 int last_failed_id = -1;
532 /* come back here if someone yanks a socket_node from beneath us */
535 socket_node* node = mgr->socket;
539 while(node && (handled < num_active)) {
541 int sock_fd = node->sock_fd;
543 if(last_failed_id != -1) {
544 /* in case it was not removed by our overlords */
545 osrfLogInternal( OSRF_LOG_MARK, "Attempting to remove last_failed_id of %d", last_failed_id);
546 socket_remove_node( mgr, last_failed_id );
552 /* does this socket have data? */
553 if( FD_ISSET( sock_fd, read_set ) ) {
555 osrfLogInternal( OSRF_LOG_MARK, "Socket %d active", sock_fd);
/* clear the bit so a restarted scan does not service this fd twice */
557 FD_CLR(sock_fd, read_set);
559 if(node->endpoint == SERVER_SOCKET)
560 _socket_handle_new_client(mgr, node);
563 status = _socket_handle_client_data(mgr, node);
565 /* someone may have yanked a socket_node out from under
566 us...start over with the first socket */
568 last_failed_id = sock_fd;
569 osrfLogInternal( OSRF_LOG_MARK, "Backtracking back to start of loop because "
570 "of -1 return code from _socket_handle_client_data()");
574 if(status == -1) break;
579 if(status == 0) break;
580 if(status == -1) status = 0;
/*
 * Single-socket counterpart of _socket_route_data(): looks up the node for
 * sock_id with socket_find_node() and dispatches on its endpoint type --
 * SERVER_SOCKET nodes go to _socket_handle_new_client(), CLIENT_SOCKET
 * nodes to _socket_handle_client_data(), whose result is kept in `status`.
 *
 * NOTE(review): the NULL check on `node`, the declaration of `status`, the
 * condition under which socket_remove_node() is called and the return
 * value are not visible in this listing -- confirm against the pristine
 * file.
 */
587 int _socket_route_data_id( socket_manager* mgr, int sock_id) {
588 socket_node* node = socket_find_node(mgr, sock_id);
592 if(node->endpoint == SERVER_SOCKET)
593 _socket_handle_new_client(mgr, node);
595 if(node->endpoint == CLIENT_SOCKET )
596 status = _socket_handle_client_data(mgr, node);
599 socket_remove_node(mgr, sock_id);
/*
 * Accepts a pending connection on the listening socket held by `node` and
 * registers the accepted fd with the manager as a CLIENT_SOCKET node whose
 * parent id is the listening socket's fd.  The new node gets the same
 * address type (INET or UNIX) as the listening node.
 *
 * Returns -1 when mgr or node is NULL.  A failed accept() is logged as a
 * warning (the message names _socket_route_data()).
 *
 * NOTE(review): the peer address is discarded (accept() is called with
 * NULL address arguments).  The return values after a failed accept() and
 * on success, and the closing braces, are not visible in this listing.
 */
609 int _socket_handle_new_client(socket_manager* mgr, socket_node* node) {
610 if(mgr == NULL || node == NULL) return -1;
613 new_sock_fd = accept(node->sock_fd, NULL, NULL);
614 if(new_sock_fd < 0) {
615 osrfLogWarning( OSRF_LOG_MARK, "_socket_route_data(): accept() failed");
619 if(node->addr_type == INET) {
620 _socket_add_node(mgr, CLIENT_SOCKET, INET, new_sock_fd, node->sock_fd);
621 osrfLogDebug( OSRF_LOG_MARK, "Adding new INET client for %d", node->sock_fd);
623 } else if(node->addr_type == UNIX) {
624 _socket_add_node(mgr, CLIENT_SOCKET, UNIX, new_sock_fd, node->sock_fd);
625 osrfLogDebug( OSRF_LOG_MARK, "Adding new UNIX client for %d", node->sock_fd);
/*
 * Drains all currently available data from the client socket held by
 * `node` and feeds it to the manager's data_received callback.
 *
 * The socket is switched to non-blocking mode with set_fl(), then recv()
 * is called repeatedly for at most RBUFSIZE-1 bytes at a time.  The buffer
 * is zero-filled before each read, so each chunk handed to the callback is
 * NUL-terminated.  The callback receives mgr->blob, the manager, the fd,
 * the chunk and the node's parent id.
 *
 * After the loop, if the fd is still present in the manager's list, the
 * non-blocking flag is cleared again; otherwise -1 is returned to tell the
 * caller that the node was removed (e.g. by the callback).  When recv()
 * reported 0 bytes (peer closed the connection) the on_socket_closed
 * callback, if set, is invoked with mgr->blob and the fd.
 *
 * Returns -1 when mgr or node is NULL.
 *
 * NOTE(review): the declarations of buf/read_bytes, the condition that
 * encloses the errno test, and the remaining return values are not visible
 * in this listing.  errno is examined after socket_find_node() and
 * clr_fl() have run; if either can modify errno, the EAGAIN test may not
 * reflect the recv() result -- confirm.
 */
632 int _socket_handle_client_data(socket_manager* mgr, socket_node* node) {
633 if(mgr == NULL || node == NULL) return -1;
637 int sock_fd = node->sock_fd;
639 memset(buf, 0, RBUFSIZE);
640 set_fl(sock_fd, O_NONBLOCK);
642 osrfLogInternal( OSRF_LOG_MARK, "%d : Received data at %f\n", getpid(), get_timestamp_millis());
/* read one byte less than the buffer size so the chunk stays NUL-terminated */
644 while( (read_bytes = recv(sock_fd, buf, RBUFSIZE-1, 0) ) > 0 ) {
645 osrfLogInternal( OSRF_LOG_MARK, "Socket %d Read %d bytes and data: %s", sock_fd, read_bytes, buf);
646 if(mgr->data_received)
647 mgr->data_received(mgr->blob, mgr, sock_fd, buf, node->parent_id);
649 memset(buf, 0, RBUFSIZE);
652 if(socket_find_node(mgr, sock_fd)) { /* someone may have closed this socket */
653 clr_fl(sock_fd, O_NONBLOCK);
655 if( errno != EAGAIN )
656 osrfLogWarning( OSRF_LOG_MARK, " * Error reading socket with errno %d", errno );
659 } else { return -1; } /* inform the caller that this node has been tampered with */
661 if(read_bytes == 0) { /* socket closed by client */
662 if(mgr->on_socket_closed) {
663 mgr->on_socket_closed(mgr->blob, sock_fd);
/*
 * Tears down a socket manager: in the visible lines the successor of the
 * head node is saved in `tmp` and the head node's socket is passed to
 * socket_disconnect().  Does nothing when mgr is NULL.
 *
 * NOTE(review): the loop around these statements, the declaration of
 * `tmp`, any release of mgr itself and the end of the function are not
 * visible in this listing.  mgr->socket is dereferenced here, so the
 * enclosing loop must guarantee it is non-NULL -- confirm.
 */
673 void socket_manager_free(socket_manager* mgr) {
674 if(mgr == NULL) return;
677 tmp = mgr->socket->next;
678 socket_disconnect(mgr, mgr->socket->sock_fd);