1 #include <opensrf/socket_bundle.h>
3 struct socket_node_struct {
4 int endpoint; /* SERVER_SOCKET or CLIENT_SOCKET */
5 int addr_type; /* INET or UNIX */
7 int parent_id; /* if we're a new client for a server socket,
8 this points to the server socket we spawned from */
9 struct socket_node_struct* next;
12 /* buffer used to read from the sockets */
15 static socket_node* _socket_add_node(socket_manager* mgr,
16 int endpoint, int addr_type, int sock_fd, int parent_id );
17 static socket_node* socket_find_node(socket_manager* mgr, int sock_fd);
18 static void socket_remove_node(socket_manager*, int sock_fd);
19 static int _socket_send(int sock_fd, const char* data, int flags);
20 static int _socket_route_data(socket_manager* mgr, int num_active, fd_set* read_set);
21 static int _socket_route_data_id( socket_manager* mgr, int sock_id);
22 static int _socket_handle_new_client(socket_manager* mgr, socket_node* node);
23 static int _socket_handle_client_data(socket_manager* mgr, socket_node* node);
26 /* --------------------------------------------------------------------
28 -------------------------------------------------------------------- */
31 void printme(void* blob, socket_manager* mgr,
32 int sock_fd, char* data, int parent_id) {
34 fprintf(stderr, "Got data from socket %d with parent %d => %s",
35 sock_fd, parent_id, data );
37 socket_send(sock_fd, data);
40 socket_disconnect(mgr, sock_fd);
41 _socket_print_list(mgr);
45 int main(int argc, char* argv[]) {
46 socket_manager manager;
47 memset(&manager, 0, sizeof(socket_manager));
52 manager.data_received = &printme;
53 socket_open_tcp_server(&manager, port);
56 socket_wait_all(&manager, -1);
61 /* -------------------------------------------------------------------- */
65 @brief Create a new socket_node and add it to a socket_manager's list.
66 @param mgr Pointer to the socket_manager.
67 @param endpoint SERVER_SOCKET or CLIENT_SOCKET, denoting how the socket is to be used.
68 @param addr_type address type: INET or UNIX.
69 @param sock_fd sock_fd for the new socket_node.
70 @param parent_id parent_id for the new node.
71 @return Pointer to the new socket_node.
73 If @a parent_id is negative, the new socket_node receives a parent_id of 0.
75 static socket_node* _socket_add_node(socket_manager* mgr,
76 int endpoint, int addr_type, int sock_fd, int parent_id ) {
78 if(mgr == NULL) return NULL;
79 osrfLogInternal( OSRF_LOG_MARK, "Adding socket node with fd %d", sock_fd);
80 socket_node* new_node = safe_malloc(sizeof(socket_node));
82 new_node->endpoint = endpoint;
83 new_node->addr_type = addr_type;
84 new_node->sock_fd = sock_fd;
85 new_node->next = NULL;
86 new_node->parent_id = 0;
88 new_node->parent_id = parent_id;
90 new_node->next = mgr->socket;
91 mgr->socket = new_node;
96 @brief Create an TCP INET listener socket and add it to a socket_manager's list.
97 @param mgr Pointer to the socket manager that will own the socket.
98 @param port The port number to bind to.
99 @param listen_ip The IP address to bind to; or, NULL for INADDR_ANY.
100 @return The socket fd if successful; otherwise -1.
102 Calls: socket(), bind(), and listen().
104 int socket_open_tcp_server(socket_manager* mgr, int port, const char* listen_ip) {
107 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): NULL mgr");
112 struct sockaddr_in server_addr;
114 server_addr.sin_family = AF_INET;
116 if(listen_ip != NULL) {
118 if( inet_aton( listen_ip, &addr ) )
119 server_addr.sin_addr.s_addr = addr.s_addr;
121 osrfLogError( OSRF_LOG_MARK, "Listener address is invalid: %s", listen_ip );
125 server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
128 server_addr.sin_port = htons(port);
131 sock_fd = socket(AF_INET, SOCK_STREAM, 0);
133 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): Unable to create TCP socket: %s",
139 if(bind( sock_fd, (struct sockaddr*) &server_addr, sizeof(server_addr)) < 0) {
140 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): cannot bind to port %d: %s",
141 port, strerror( errno ) );
147 if(listen(sock_fd, 20) == -1) {
148 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): listen() returned error: %s",
154 _socket_add_node(mgr, SERVER_SOCKET, INET, sock_fd, 0);
159 @brief Create a UNIX domain listener socket and add it to the socket_manager's list.
160 @param mgr Pointer to the socket_manager that will own the socket.
161 @param path Name of the socket within the file system.
162 @return The socket fd if successful; otherwise -1.
164 Calls: socket(), bind(), listen().
166 int socket_open_unix_server(socket_manager* mgr, const char* path) {
167 if(mgr == NULL || path == NULL) return -1;
169 osrfLogDebug( OSRF_LOG_MARK, "opening unix socket at %s", path);
171 struct sockaddr_un server_addr;
173 if(strlen(path) > sizeof(server_addr.sun_path) - 1)
175 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): path too long: %s",
181 sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
183 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): socket() failed: %s",
188 server_addr.sun_family = AF_UNIX;
189 strcpy(server_addr.sun_path, path);
192 if( bind(sock_fd, (struct sockaddr*) &server_addr,
193 sizeof(struct sockaddr_un)) < 0) {
194 osrfLogWarning( OSRF_LOG_MARK,
195 "socket_open_unix_server(): cannot bind to unix port %s: %s",
196 path, strerror( errno ) );
202 if(listen(sock_fd, 20) == -1) {
203 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): listen() returned error: %s",
209 osrfLogDebug( OSRF_LOG_MARK, "unix socket successfully opened");
213 /* causing problems with router for some reason ... */
214 //osrfLogDebug( OSRF_LOG_MARK, "Setting SO_REUSEADDR");
215 //setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));
217 //osrfLogDebug( OSRF_LOG_MARK, "Setting TCP_NODELAY");
218 setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
220 _socket_add_node(mgr, SERVER_SOCKET, UNIX, sock_fd, 0);
226 @brief Create a UDP socket for a server, and add it to a socket_manager's list.
227 @param mgr Pointer to the socket_manager that will own the socket.
228 @param port The port number to bind to.
229 @param listen_ip The IP address to bind to, or NULL for INADDR_ANY.
230 @return The socket fd if successful; otherwise -1.
232 Calls: socket(), bind().
234 int socket_open_udp_server(
235 socket_manager* mgr, int port, const char* listen_ip ) {
238 struct sockaddr_in server_addr;
240 server_addr.sin_family = AF_INET;
241 server_addr.sin_port = htons(port);
244 if( inet_aton( listen_ip, &addr ) )
245 server_addr.sin_addr.s_addr = addr.s_addr;
247 osrfLogError( OSRF_LOG_MARK, "UDP listener address is invalid: %s", listen_ip );
251 server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
254 if( (sockfd = socket( AF_INET, SOCK_DGRAM, 0 )) < 0 ) {
255 osrfLogWarning( OSRF_LOG_MARK, "Unable to create UDP socket: %s", strerror( errno ) );
260 if( (bind (sockfd, (struct sockaddr *) &server_addr,sizeof(server_addr))) ) {
261 osrfLogWarning( OSRF_LOG_MARK, "Unable to bind to UDP port %d: %s",
262 port, strerror( errno ) );
267 _socket_add_node(mgr, SERVER_SOCKET, INET, sockfd, 0);
273 @brief Create a client TCP socket, connect with it, and add it to a socket_manager's list.
274 @param mgr Pointer to the socket_manager that will own the socket.
275 @param port What port number to connect to.
276 @param dest_addr Host name or IP address of the server to which we are connecting.
277 @return The socket fd if successful; otherwise -1.
279 Calls: getaddrinfo(), socket(), connect().
281 int socket_open_tcp_client(socket_manager* mgr, int port, const char* dest_addr) {
283 struct sockaddr_in remoteAddr;
286 // ------------------------------------------------------------------
287 // Get the IP address of the hostname (for TCP only)
288 // ------------------------------------------------------------------
289 struct addrinfo hints = { 0, 0, 0, 0, 0, NULL, NULL, NULL };
290 hints.ai_socktype = SOCK_STREAM;
291 struct addrinfo* addr_info = NULL;
293 int rc = getaddrinfo( dest_addr, NULL, &hints, &addr_info );
294 if( rc || ! addr_info ) {
295 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): No Such Host => %s: %s",
296 dest_addr, gai_strerror( rc ) );
300 // Look for an address supporting IPv4. Someday we'll look for
301 // either IPv4 or IPv6, and branch according to what we find.
302 while( addr_info && addr_info->ai_family != PF_INET ) {
303 addr_info = addr_info->ai_next;
307 osrfLogWarning( OSRF_LOG_MARK,
308 "socket_open_tcp_client(): Host %s does not support IPV4", dest_addr );
312 // ------------------------------------------------------------------
314 // ------------------------------------------------------------------
316 if( (sock_fd = socket( AF_INET, SOCK_STREAM, 0 )) < 0 ) {
317 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): Cannot create TCP socket: %s",
323 setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
325 // ------------------------------------------------------------------
326 // Construct server info struct
327 // ------------------------------------------------------------------
328 memset( &remoteAddr, 0, sizeof(remoteAddr));
329 remoteAddr.sin_family = AF_INET;
330 remoteAddr.sin_port = htons( port );
331 struct sockaddr_in* ai_addr_in = (struct sockaddr_in*) addr_info->ai_addr;
332 remoteAddr.sin_addr.s_addr = ai_addr_in->sin_addr.s_addr;
334 freeaddrinfo( addr_info );
336 // ------------------------------------------------------------------
338 // ------------------------------------------------------------------
340 if( connect( sock_fd, (struct sockaddr*) &remoteAddr, sizeof( struct sockaddr_in ) ) < 0 ) {
341 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): Cannot connect to server %s: %s",
342 dest_addr, strerror(errno) );
347 _socket_add_node(mgr, CLIENT_SOCKET, INET, sock_fd, -1 );
354 @brief Create a client UDP socket and add it to a socket_manager's list.
355 @param mgr Pointer to the socket_manager that will own the socket.
356 @return The socket fd if successful; otherwise -1.
360 int socket_open_udp_client( socket_manager* mgr ) {
365 if( (sockfd = socket(AF_INET,SOCK_DGRAM,0)) < 0 ) {
366 osrfLogWarning( OSRF_LOG_MARK,
367 "socket_open_udp_client(): Unable to create UDP socket: %s", strerror( errno ) );
371 _socket_add_node(mgr, CLIENT_SOCKET, INET, sockfd, -1 );
378 @brief Create a UNIX domain client socket, connect with it, add it to the socket_manager's list
379 @param mgr Pointer to the socket_manager that will own the socket.
380 @param sock_path Name of the socket within the file system.
381 @return The socket fd if successful; otherwise -1.
383 Calls: socket(), connect().
385 int socket_open_unix_client(socket_manager* mgr, const char* sock_path) {
388 struct sockaddr_un usock;
390 if(strlen(sock_path) > sizeof(usock.sun_path) - 1)
392 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_client(): path too long: %s",
398 if( (sock_fd = socket( AF_UNIX, SOCK_STREAM, 0 )) < 0 ) {
399 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_client(): Cannot create UNIX socket: %s", strerror( errno ) );
403 usock.sun_family = AF_UNIX;
404 strcpy( usock.sun_path, sock_path );
406 len = sizeof( usock.sun_family ) + strlen( usock.sun_path );
409 if( connect( sock_fd, (struct sockaddr *) &usock, len ) < 0 ) {
410 osrfLogWarning( OSRF_LOG_MARK, "Error connecting to unix socket: %s",
416 _socket_add_node(mgr, CLIENT_SOCKET, UNIX, sock_fd, -1 );
423 @brief Search a socket_manager's list for a socket node for a given file descriptor.
424 @param mgr Pointer to the socket manager.
425 @param sock_fd The file descriptor to be sought.
426 @return A pointer to the socket_node if found; otherwise NULL.
428 Traverse a linked list owned by the socket_manager.
430 static socket_node* socket_find_node(socket_manager* mgr, int sock_fd) {
431 if(mgr == NULL) return NULL;
432 socket_node* node = mgr->socket;
434 if(node->sock_fd == sock_fd)
441 /* removes the node with the given sock_fd from the list and frees it */
443 @brief Remove a socket node for a given fd from a socket_manager's list.
444 @param mgr Pointer to the socket_manager.
445 @param sock_fd The file descriptor whose socket_node is to be removed.
447 This function does @em not close the socket. It just removes a node from the list, and
448 frees it. It is the responsibility of the calling code to close the socket.
450 static void socket_remove_node(socket_manager* mgr, int sock_fd) {
452 if(mgr == NULL) return;
454 osrfLogDebug( OSRF_LOG_MARK, "removing socket %d", sock_fd);
456 socket_node* head = mgr->socket;
457 socket_node* tail = head;
458 if(head == NULL) return;
460 /* if removing the first node in the list */
461 if(head->sock_fd == sock_fd) {
462 mgr->socket = head->next;
469 /* if removing any other node */
471 if(head->sock_fd == sock_fd) {
472 tail->next = head->next;
483 @brief Write to the log: a list of socket_nodes in a socket_manager's list.
484 @param mgr Pointer to the socket_manager.
486 For testing and debugging.
488 The messages are issued as DEBG messages, and show each file descriptor and its parent.
490 void _socket_print_list(socket_manager* mgr) {
491 if(mgr == NULL) return;
492 socket_node* node = mgr->socket;
493 osrfLogDebug( OSRF_LOG_MARK, "socket_node list: [");
495 osrfLogDebug( OSRF_LOG_MARK, "sock_fd: %d | parent_id: %d",
496 node->sock_fd, node->parent_id);
499 osrfLogDebug( OSRF_LOG_MARK, "]");
502 /* sends the given data to the given socket */
503 int socket_send(int sock_fd, const char* data) {
504 return _socket_send( sock_fd, data, 0);
508 static int _socket_send(int sock_fd, const char* data, int flags) {
510 signal(SIGPIPE, SIG_IGN); /* in case a unix socket was closed */
513 size_t r = send( sock_fd, data, strlen(data), flags );
514 int local_errno = errno;
517 osrfLogWarning( OSRF_LOG_MARK, "_socket_send(): Error sending data with return %d", r );
518 osrfLogWarning( OSRF_LOG_MARK, "Last Sys Error: %s", strerror(local_errno));
526 /* sends the given data to the given socket.
527 * sets the send flag MSG_DONTWAIT which will allow the
528 * process to continue even if the socket buffer is full
529 * returns 0 on success, -1 otherwise */
530 //int socket_send_nowait( int sock_fd, const char* data) {
531 // return _socket_send( sock_fd, data, MSG_DONTWAIT);
536 * Waits at most usecs microseconds for the send buffer of the given
537 * socket to accept new data. This does not guarantee that the
538 * socket will accept all the data we want to give it.
540 int socket_send_timeout( int sock_fd, const char* data, int usecs ) {
543 FD_ZERO( &write_set );
544 FD_SET( sock_fd, &write_set );
547 int secs = (int) usecs / mil;
548 usecs = usecs - (secs * mil);
555 int ret = select( sock_fd + 1, NULL, &write_set, NULL, &tv);
556 if( ret > 0 ) return _socket_send( sock_fd, data, 0);
558 osrfLogError(OSRF_LOG_MARK, "socket_send_timeout(): "
559 "timed out on send for socket %d after %d secs, %d usecs: %s",
560 sock_fd, secs, usecs, strerror( errno ) );
566 /* disconnects the node with the given sock_fd and removes
567 it from the socket set */
568 void socket_disconnect(socket_manager* mgr, int sock_fd) {
569 osrfLogInternal( OSRF_LOG_MARK, "Closing socket %d", sock_fd);
571 socket_remove_node(mgr, sock_fd);
575 /* we assume that if select() fails, the socket is no longer valid */
576 int socket_connected(int sock_fd) {
578 FD_ZERO( &read_set );
579 FD_SET( sock_fd, &read_set );
580 if( select( sock_fd + 1, &read_set, NULL, NULL, NULL) == -1 )
586 /* this only waits on the server socket and does not handle the actual
587 data coming in from the client..... XXX */
588 int socket_wait(socket_manager* mgr, int timeout, int sock_fd) {
592 FD_ZERO( &read_set );
593 FD_SET( sock_fd, &read_set );
602 // If timeout is -1, we block indefinitely
603 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, NULL)) == -1 ) {
604 osrfLogDebug( OSRF_LOG_MARK, "Call to select() interrupted: Sys Error: %s", strerror(errno));
608 } else if( timeout > 0 ) { /* timeout of 0 means don't block */
610 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, &tv)) == -1 ) {
611 osrfLogDebug( OSRF_LOG_MARK, "Call to select() interrupted: Sys Error: %s", strerror(errno));
616 osrfLogInternal( OSRF_LOG_MARK, "%d active sockets after select()", retval);
617 return _socket_route_data_id(mgr, sock_fd);
621 int socket_wait_all(socket_manager* mgr, int timeout) {
624 osrfLogWarning( OSRF_LOG_MARK, "socket_wait_all(): null mgr" );
630 FD_ZERO( &read_set );
632 socket_node* node = mgr->socket;
635 osrfLogInternal( OSRF_LOG_MARK, "Adding socket fd %d to select set",node->sock_fd);
636 FD_SET( node->sock_fd, &read_set );
637 if(node->sock_fd > max_fd) max_fd = node->sock_fd;
649 // If timeout is -1, there is no timeout passed to the call to select
650 if( (retval = select( max_fd, &read_set, NULL, NULL, NULL)) == -1 ) {
651 osrfLogWarning( OSRF_LOG_MARK, "select() call aborted: %s", strerror(errno));
655 } else if( timeout != 0 ) { /* timeout of 0 means don't block */
657 if( (retval = select( max_fd, &read_set, NULL, NULL, &tv)) == -1 ) {
658 osrfLogWarning( OSRF_LOG_MARK, "select() call aborted: %s", strerror(errno));
663 osrfLogDebug( OSRF_LOG_MARK, "%d active sockets after select()", retval);
664 return _socket_route_data(mgr, retval, &read_set);
667 /* iterates over the sockets in the set and handles active sockets.
668 new sockets connecting to server sockets cause the creation
669 of a new socket node.
670 Any new data read is is passed off to the data_received callback
672 /* determines if we're receiving a new client or data
673 on an existing client */
674 static int _socket_route_data(
675 socket_manager* mgr, int num_active, fd_set* read_set) {
677 if(!(mgr && read_set)) return -1;
679 int last_failed_id = -1;
682 /* come back here if someone yanks a socket_node from beneath us */
685 socket_node* node = mgr->socket;
689 while(node && (handled < num_active)) {
691 int sock_fd = node->sock_fd;
693 if(last_failed_id != -1) {
694 /* in case it was not removed by our overlords */
695 osrfLogInternal( OSRF_LOG_MARK, "Attempting to remove last_failed_id of %d", last_failed_id);
696 socket_remove_node( mgr, last_failed_id );
702 /* does this socket have data? */
703 if( FD_ISSET( sock_fd, read_set ) ) {
705 osrfLogInternal( OSRF_LOG_MARK, "Socket %d active", sock_fd);
707 FD_CLR(sock_fd, read_set);
709 if(node->endpoint == SERVER_SOCKET)
710 _socket_handle_new_client(mgr, node);
713 status = _socket_handle_client_data(mgr, node);
715 /* someone may have yanked a socket_node out from under
716 us...start over with the first socket */
718 last_failed_id = sock_fd;
719 osrfLogInternal( OSRF_LOG_MARK, "Backtracking back to start of loop because "
720 "of -1 return code from _socket_handle_client_data()");
724 if(status == -1) break;
729 if(status == 0) break;
730 if(status == -1) status = 0;
737 /* routes data from a single known socket */
738 static int _socket_route_data_id( socket_manager* mgr, int sock_id) {
739 socket_node* node = socket_find_node(mgr, sock_id);
743 if(node->endpoint == SERVER_SOCKET)
744 _socket_handle_new_client(mgr, node);
746 if(node->endpoint == CLIENT_SOCKET )
747 status = _socket_handle_client_data(mgr, node);
750 socket_remove_node(mgr, sock_id);
760 static int _socket_handle_new_client(socket_manager* mgr, socket_node* node) {
761 if(mgr == NULL || node == NULL) return -1;
765 new_sock_fd = accept(node->sock_fd, NULL, NULL);
766 if(new_sock_fd < 0) {
767 osrfLogWarning( OSRF_LOG_MARK, "_socket_handle_new_client(): accept() failed: %s",
772 if(node->addr_type == INET) {
773 _socket_add_node(mgr, CLIENT_SOCKET, INET, new_sock_fd, node->sock_fd);
774 osrfLogDebug( OSRF_LOG_MARK, "Adding new INET client for %d", node->sock_fd);
776 } else if(node->addr_type == UNIX) {
777 _socket_add_node(mgr, CLIENT_SOCKET, UNIX, new_sock_fd, node->sock_fd);
778 osrfLogDebug( OSRF_LOG_MARK, "Adding new UNIX client for %d", node->sock_fd);
785 static int _socket_handle_client_data(socket_manager* mgr, socket_node* node) {
786 if(mgr == NULL || node == NULL) return -1;
790 int sock_fd = node->sock_fd;
792 osrf_clearbuf(buf, sizeof(buf));
793 set_fl(sock_fd, O_NONBLOCK);
795 osrfLogInternal( OSRF_LOG_MARK, "%ld : Received data at %f\n", (long) getpid(), get_timestamp_millis());
797 while( (read_bytes = recv(sock_fd, buf, RBUFSIZE-1, 0) ) > 0 ) {
798 buf[read_bytes] = '\0';
799 osrfLogInternal( OSRF_LOG_MARK, "Socket %d Read %d bytes and data: %s", sock_fd, read_bytes, buf);
800 if(mgr->data_received)
801 mgr->data_received(mgr->blob, mgr, sock_fd, buf, node->parent_id);
803 osrf_clearbuf(buf, sizeof(buf));
805 int local_errno = errno; /* capture errno as set by recv() */
807 if(socket_find_node(mgr, sock_fd)) { /* someone may have closed this socket */
808 clr_fl(sock_fd, O_NONBLOCK);
810 if(local_errno != EAGAIN)
811 osrfLogWarning(OSRF_LOG_MARK, " * Error reading socket with error %s", strerror(local_errno));
814 } else { return -1; } /* inform the caller that this node has been tampered with */
816 if(read_bytes == 0) { /* socket closed by client */
817 if(mgr->on_socket_closed) {
818 mgr->on_socket_closed(mgr->blob, sock_fd);
828 void socket_manager_free(socket_manager* mgr) {
829 if(mgr == NULL) return;
832 tmp = mgr->socket->next;
833 socket_disconnect(mgr, mgr->socket->sock_fd);