1 #include <opensrf/socket_bundle.h>
3 /* buffer used to read from the sockets */
6 static socket_node* _socket_add_node(socket_manager* mgr,
7 int endpoint, int addr_type, int sock_fd, int parent_id );
8 static socket_node* socket_find_node(socket_manager* mgr, int sock_fd);
9 static void socket_remove_node(socket_manager*, int sock_fd);
10 static int _socket_send(int sock_fd, const char* data, int flags);
11 static int _socket_route_data(socket_manager* mgr, int num_active, fd_set* read_set);
12 static int _socket_route_data_id( socket_manager* mgr, int sock_id);
13 static int _socket_handle_new_client(socket_manager* mgr, socket_node* node);
14 static int _socket_handle_client_data(socket_manager* mgr, socket_node* node);
17 /* --------------------------------------------------------------------
19 -------------------------------------------------------------------- */
22 void printme(void* blob, socket_manager* mgr,
23 int sock_fd, char* data, int parent_id) {
25 fprintf(stderr, "Got data from socket %d with parent %d => %s",
26 sock_fd, parent_id, data );
28 socket_send(sock_fd, data);
31 socket_disconnect(mgr, sock_fd);
32 _socket_print_list(mgr);
36 int main(int argc, char* argv[]) {
37 socket_manager manager;
38 memset(&manager, 0, sizeof(socket_manager));
43 manager.data_received = &printme;
44 socket_open_tcp_server(&manager, port);
47 socket_wait_all(&manager, -1);
52 /* -------------------------------------------------------------------- */
55 /* allocates and inserts a new socket node into the nodeset.
56 if parent_id is positive and non-zero, it will be set */
57 static socket_node* _socket_add_node(socket_manager* mgr,
58 int endpoint, int addr_type, int sock_fd, int parent_id ) {
60 if(mgr == NULL) return NULL;
61 osrfLogInternal( OSRF_LOG_MARK, "Adding socket node with fd %d", sock_fd);
62 socket_node* new_node = safe_malloc(sizeof(socket_node));
64 new_node->endpoint = endpoint;
65 new_node->addr_type = addr_type;
66 new_node->sock_fd = sock_fd;
67 new_node->next = NULL;
68 new_node->parent_id = 0;
70 new_node->parent_id = parent_id;
72 new_node->next = mgr->socket;
73 mgr->socket = new_node;
77 /* creates a new server socket node and adds it to the socket set.
78 returns new socket fd on success. -1 on failure.
79 socket_type is one of INET or UNIX */
80 int socket_open_tcp_server(socket_manager* mgr, int port, const char* listen_ip) {
83 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): NULL mgr");
88 struct sockaddr_in server_addr;
91 sock_fd = socket(AF_INET, SOCK_STREAM, 0);
93 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): Unable to create TCP socket: %s",
98 server_addr.sin_family = AF_INET;
100 if(listen_ip != NULL) {
102 if( inet_aton( listen_ip, &addr ) )
103 server_addr.sin_addr.s_addr = addr.s_addr;
105 osrfLogError( OSRF_LOG_MARK, "Listener address is invalid: %s", listen_ip );
109 server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
112 server_addr.sin_port = htons(port);
115 if(bind( sock_fd, (struct sockaddr*) &server_addr, sizeof(server_addr)) < 0) {
116 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): cannot bind to port %d: %s",
117 port, strerror( errno ) );
122 if(listen(sock_fd, 20) == -1) {
123 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): listen() returned error: %s",
128 _socket_add_node(mgr, SERVER_SOCKET, INET, sock_fd, 0);
132 int socket_open_unix_server(socket_manager* mgr, const char* path) {
133 if(mgr == NULL || path == NULL) return -1;
135 osrfLogDebug( OSRF_LOG_MARK, "opening unix socket at %s", path);
137 struct sockaddr_un server_addr;
139 if(strlen(path) > sizeof(server_addr.sun_path) - 1)
141 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): path too long: %s",
147 sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
149 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): socket() failed: %s",
154 server_addr.sun_family = AF_UNIX;
155 strcpy(server_addr.sun_path, path);
158 if( bind(sock_fd, (struct sockaddr*) &server_addr,
159 sizeof(struct sockaddr_un)) < 0) {
160 osrfLogWarning( OSRF_LOG_MARK,
161 "socket_open_unix_server(): cannot bind to unix port %s: %s",
162 path, strerror( errno ) );
167 if(listen(sock_fd, 20) == -1) {
168 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): listen() returned error: %s",
173 osrfLogDebug( OSRF_LOG_MARK, "unix socket successfully opened");
177 /* causing problems with router for some reason ... */
178 //osrfLogDebug( OSRF_LOG_MARK, "Setting SO_REUSEADDR");
179 //setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));
181 //osrfLogDebug( OSRF_LOG_MARK, "Setting TCP_NODELAY");
182 setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
184 _socket_add_node(mgr, SERVER_SOCKET, UNIX, sock_fd, 0);
190 int socket_open_udp_server(
191 socket_manager* mgr, int port, const char* listen_ip ) {
194 struct sockaddr_in server_addr;
197 if( (sockfd = socket( AF_INET, SOCK_DGRAM, 0 )) < 0 ) {
198 osrfLogWarning( OSRF_LOG_MARK, "Unable to create UDP socket: %s", strerror( errno ) );
202 server_addr.sin_family = AF_INET;
203 server_addr.sin_port = htons(port);
206 if( inet_aton( listen_ip, &addr ) )
207 server_addr.sin_addr.s_addr = addr.s_addr;
209 osrfLogError( OSRF_LOG_MARK, "UDP listener address is invalid: %s", listen_ip );
212 } else server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
215 if( (bind (sockfd, (struct sockaddr *) &server_addr,sizeof(server_addr))) ) {
216 osrfLogWarning( OSRF_LOG_MARK, "Unable to bind to UDP port %d: %s",
217 port, strerror( errno ) );
221 _socket_add_node(mgr, SERVER_SOCKET, INET, sockfd, 0);
226 int socket_open_tcp_client(socket_manager* mgr, int port, const char* dest_addr) {
228 struct sockaddr_in remoteAddr, localAddr;
229 struct hostent *hptr;
232 // ------------------------------------------------------------------
234 // ------------------------------------------------------------------
236 if( (sock_fd = socket( AF_INET, SOCK_STREAM, 0 )) < 0 ) {
237 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): Cannot create TCP socket: %s",
243 //osrfLogDebug( OSRF_LOG_MARK, "Setting TCP_NODELAY");
244 setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
247 // ------------------------------------------------------------------
249 // ------------------------------------------------------------------
251 if( (hptr = gethostbyname( dest_addr ) ) == NULL ) {
252 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): Unknown Host => %s: %s",
253 dest_addr, strerror( errno ) );
257 // ------------------------------------------------------------------
258 // Construct server info struct
259 // ------------------------------------------------------------------
260 memset( &remoteAddr, 0, sizeof(remoteAddr));
261 remoteAddr.sin_family = AF_INET;
262 remoteAddr.sin_port = htons( port );
263 memcpy( (char*) &remoteAddr.sin_addr.s_addr,
264 hptr->h_addr_list[0], hptr->h_length );
266 // ------------------------------------------------------------------
267 // Construct local info struct
268 // ------------------------------------------------------------------
269 memset( &localAddr, 0, sizeof( localAddr ) );
270 localAddr.sin_family = AF_INET;
271 localAddr.sin_addr.s_addr = htonl( INADDR_ANY );
272 localAddr.sin_port = htons(0);
274 // ------------------------------------------------------------------
275 // Bind to a local port
276 // ------------------------------------------------------------------
278 if( bind( sock_fd, (struct sockaddr *) &localAddr, sizeof( localAddr ) ) < 0 ) {
279 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): Cannot bind to local port: %s",
284 // ------------------------------------------------------------------
286 // ------------------------------------------------------------------
288 if( connect( sock_fd, (struct sockaddr*) &remoteAddr, sizeof( struct sockaddr_in ) ) < 0 ) {
289 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): Cannot connect to server %s: %s",
290 dest_addr, strerror(errno) );
294 _socket_add_node(mgr, CLIENT_SOCKET, INET, sock_fd, -1 );
300 int socket_open_udp_client(
301 socket_manager* mgr, int port, const char* dest_addr) {
304 struct sockaddr_in client_addr, server_addr;
305 struct hostent* host;
308 if( (host = gethostbyname(dest_addr)) == NULL) {
309 osrfLogWarning( OSRF_LOG_MARK, "Unable to resolve host: %s: %s",
310 dest_addr, strerror( errno ) );
314 server_addr.sin_family = host->h_addrtype;
315 memcpy((char *) &server_addr.sin_addr.s_addr,
316 host->h_addr_list[0], host->h_length);
317 server_addr.sin_port = htons(port);
320 if( (sockfd = socket(AF_INET,SOCK_DGRAM,0)) < 0 ) {
321 osrfLogWarning( OSRF_LOG_MARK, "socket_open_udp_client(): Unable to create UDP socket: %s", strerror( errno ) );
325 client_addr.sin_family = AF_INET;
326 client_addr.sin_addr.s_addr = htonl(INADDR_ANY);
327 client_addr.sin_port = htons(0);
330 if( (bind(sockfd, (struct sockaddr *) &client_addr, sizeof(client_addr))) < 0 ) {
331 osrfLogWarning( OSRF_LOG_MARK, "Unable to bind UDP socket: %s", strerror( errno ) );
335 _socket_add_node(mgr, CLIENT_SOCKET, INET, sockfd, -1 );
341 int socket_open_unix_client(socket_manager* mgr, const char* sock_path) {
344 struct sockaddr_un usock;
346 if(strlen(sock_path) > sizeof(usock.sun_path) - 1)
348 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_client(): path too long: %s",
354 if( (sock_fd = socket( AF_UNIX, SOCK_STREAM, 0 )) < 0 ) {
355 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_client(): Cannot create UNIX socket: %s", strerror( errno ) );
359 usock.sun_family = AF_UNIX;
360 strcpy( usock.sun_path, sock_path );
362 len = sizeof( usock.sun_family ) + strlen( usock.sun_path );
365 if( connect( sock_fd, (struct sockaddr *) &usock, len ) < 0 ) {
366 osrfLogWarning( OSRF_LOG_MARK, "Error connecting to unix socket: %s",
371 _socket_add_node(mgr, CLIENT_SOCKET, UNIX, sock_fd, -1 );
377 /* returns the socket_node with the given sock_fd */
378 static socket_node* socket_find_node(socket_manager* mgr, int sock_fd) {
379 if(mgr == NULL) return NULL;
380 socket_node* node = mgr->socket;
382 if(node->sock_fd == sock_fd)
389 /* removes the node with the given sock_fd from the list and frees it */
390 static void socket_remove_node(socket_manager* mgr, int sock_fd) {
392 if(mgr == NULL) return;
394 osrfLogDebug( OSRF_LOG_MARK, "removing socket %d", sock_fd);
396 socket_node* head = mgr->socket;
397 socket_node* tail = head;
398 if(head == NULL) return;
400 /* if removing the first node in the list */
401 if(head->sock_fd == sock_fd) {
402 mgr->socket = head->next;
409 /* if removing any other node */
411 if(head->sock_fd == sock_fd) {
412 tail->next = head->next;
422 void _socket_print_list(socket_manager* mgr) {
423 if(mgr == NULL) return;
424 socket_node* node = mgr->socket;
425 osrfLogDebug( OSRF_LOG_MARK, "socket_node list: [");
427 osrfLogDebug( OSRF_LOG_MARK, "sock_fd: %d | parent_id: %d",
428 node->sock_fd, node->parent_id);
431 osrfLogDebug( OSRF_LOG_MARK, "]");
434 /* sends the given data to the given socket */
435 int socket_send(int sock_fd, const char* data) {
436 return _socket_send( sock_fd, data, 0);
440 static int _socket_send(int sock_fd, const char* data, int flags) {
442 signal(SIGPIPE, SIG_IGN); /* in case a unix socket was closed */
445 size_t r = send( sock_fd, data, strlen(data), flags );
446 int local_errno = errno;
449 osrfLogWarning( OSRF_LOG_MARK, "_socket_send(): Error sending data with return %d", r );
450 osrfLogWarning( OSRF_LOG_MARK, "Last Sys Error: %s", strerror(local_errno));
458 /* sends the given data to the given socket.
459 * sets the send flag MSG_DONTWAIT which will allow the
460 * process to continue even if the socket buffer is full
461 * returns 0 on success, -1 otherwise */
462 //int socket_send_nowait( int sock_fd, const char* data) {
463 // return _socket_send( sock_fd, data, MSG_DONTWAIT);
468 * Waits at most usecs microseconds for the send buffer of the given
469 * socket to accept new data. This does not guarantee that the
470 * socket will accept all the data we want to give it.
472 int socket_send_timeout( int sock_fd, const char* data, int usecs ) {
475 FD_ZERO( &write_set );
476 FD_SET( sock_fd, &write_set );
479 int secs = (int) usecs / mil;
480 usecs = usecs - (secs * mil);
487 int ret = select( sock_fd + 1, NULL, &write_set, NULL, &tv);
488 if( ret > 0 ) return _socket_send( sock_fd, data, 0);
490 osrfLogError(OSRF_LOG_MARK, "socket_send_timeout(): "
491 "timed out on send for socket %d after %d secs, %d usecs: %s",
492 sock_fd, secs, usecs, strerror( errno ) );
498 /* disconnects the node with the given sock_fd and removes
499 it from the socket set */
500 void socket_disconnect(socket_manager* mgr, int sock_fd) {
501 osrfLogInternal( OSRF_LOG_MARK, "Closing socket %d", sock_fd);
503 socket_remove_node(mgr, sock_fd);
507 /* we assume that if select() fails, the socket is no longer valid */
508 int socket_connected(int sock_fd) {
510 FD_ZERO( &read_set );
511 FD_SET( sock_fd, &read_set );
512 if( select( sock_fd + 1, &read_set, NULL, NULL, NULL) == -1 )
518 /* this only waits on the server socket and does not handle the actual
519 data coming in from the client..... XXX */
520 int socket_wait(socket_manager* mgr, int timeout, int sock_fd) {
524 FD_ZERO( &read_set );
525 FD_SET( sock_fd, &read_set );
534 // If timeout is -1, we block indefinitely
535 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, NULL)) == -1 ) {
536 osrfLogDebug( OSRF_LOG_MARK, "Call to select() interrupted: Sys Error: %s", strerror(errno));
540 } else if( timeout > 0 ) { /* timeout of 0 means don't block */
542 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, &tv)) == -1 ) {
543 osrfLogDebug( OSRF_LOG_MARK, "Call to select() interrupted: Sys Error: %s", strerror(errno));
548 osrfLogInternal( OSRF_LOG_MARK, "%d active sockets after select()", retval);
549 return _socket_route_data_id(mgr, sock_fd);
553 int socket_wait_all(socket_manager* mgr, int timeout) {
556 osrfLogWarning( OSRF_LOG_MARK, "socket_wait_all(): null mgr" );
562 FD_ZERO( &read_set );
564 socket_node* node = mgr->socket;
567 osrfLogInternal( OSRF_LOG_MARK, "Adding socket fd %d to select set",node->sock_fd);
568 FD_SET( node->sock_fd, &read_set );
569 if(node->sock_fd > max_fd) max_fd = node->sock_fd;
581 // If timeout is -1, there is no timeout passed to the call to select
582 if( (retval = select( max_fd, &read_set, NULL, NULL, NULL)) == -1 ) {
583 osrfLogWarning( OSRF_LOG_MARK, "select() call aborted: %s", strerror(errno));
587 } else if( timeout != 0 ) { /* timeout of 0 means don't block */
589 if( (retval = select( max_fd, &read_set, NULL, NULL, &tv)) == -1 ) {
590 osrfLogWarning( OSRF_LOG_MARK, "select() call aborted: %s", strerror(errno));
595 osrfLogDebug( OSRF_LOG_MARK, "%d active sockets after select()", retval);
596 return _socket_route_data(mgr, retval, &read_set);
599 /* iterates over the sockets in the set and handles active sockets.
600 new sockets connecting to server sockets cause the creation
601 of a new socket node.
602 Any new data read is is passed off to the data_received callback
604 /* determines if we're receiving a new client or data
605 on an existing client */
606 static int _socket_route_data(
607 socket_manager* mgr, int num_active, fd_set* read_set) {
609 if(!(mgr && read_set)) return -1;
611 int last_failed_id = -1;
614 /* come back here if someone yanks a socket_node from beneath us */
617 socket_node* node = mgr->socket;
621 while(node && (handled < num_active)) {
623 int sock_fd = node->sock_fd;
625 if(last_failed_id != -1) {
626 /* in case it was not removed by our overlords */
627 osrfLogInternal( OSRF_LOG_MARK, "Attempting to remove last_failed_id of %d", last_failed_id);
628 socket_remove_node( mgr, last_failed_id );
634 /* does this socket have data? */
635 if( FD_ISSET( sock_fd, read_set ) ) {
637 osrfLogInternal( OSRF_LOG_MARK, "Socket %d active", sock_fd);
639 FD_CLR(sock_fd, read_set);
641 if(node->endpoint == SERVER_SOCKET)
642 _socket_handle_new_client(mgr, node);
645 status = _socket_handle_client_data(mgr, node);
647 /* someone may have yanked a socket_node out from under
648 us...start over with the first socket */
650 last_failed_id = sock_fd;
651 osrfLogInternal( OSRF_LOG_MARK, "Backtracking back to start of loop because "
652 "of -1 return code from _socket_handle_client_data()");
656 if(status == -1) break;
661 if(status == 0) break;
662 if(status == -1) status = 0;
669 /* routes data from a single known socket */
670 static int _socket_route_data_id( socket_manager* mgr, int sock_id) {
671 socket_node* node = socket_find_node(mgr, sock_id);
675 if(node->endpoint == SERVER_SOCKET)
676 _socket_handle_new_client(mgr, node);
678 if(node->endpoint == CLIENT_SOCKET )
679 status = _socket_handle_client_data(mgr, node);
682 socket_remove_node(mgr, sock_id);
692 static int _socket_handle_new_client(socket_manager* mgr, socket_node* node) {
693 if(mgr == NULL || node == NULL) return -1;
697 new_sock_fd = accept(node->sock_fd, NULL, NULL);
698 if(new_sock_fd < 0) {
699 osrfLogWarning( OSRF_LOG_MARK, "_socket_handle_new_client(): accept() failed: %s",
704 if(node->addr_type == INET) {
705 _socket_add_node(mgr, CLIENT_SOCKET, INET, new_sock_fd, node->sock_fd);
706 osrfLogDebug( OSRF_LOG_MARK, "Adding new INET client for %d", node->sock_fd);
708 } else if(node->addr_type == UNIX) {
709 _socket_add_node(mgr, CLIENT_SOCKET, UNIX, new_sock_fd, node->sock_fd);
710 osrfLogDebug( OSRF_LOG_MARK, "Adding new UNIX client for %d", node->sock_fd);
717 static int _socket_handle_client_data(socket_manager* mgr, socket_node* node) {
718 if(mgr == NULL || node == NULL) return -1;
722 int sock_fd = node->sock_fd;
724 osrf_clearbuf(buf, sizeof(buf));
725 set_fl(sock_fd, O_NONBLOCK);
727 osrfLogInternal( OSRF_LOG_MARK, "%ld : Received data at %f\n", (long) getpid(), get_timestamp_millis());
729 while( (read_bytes = recv(sock_fd, buf, RBUFSIZE-1, 0) ) > 0 ) {
730 buf[read_bytes] = '\0';
731 osrfLogInternal( OSRF_LOG_MARK, "Socket %d Read %d bytes and data: %s", sock_fd, read_bytes, buf);
732 if(mgr->data_received)
733 mgr->data_received(mgr->blob, mgr, sock_fd, buf, node->parent_id);
735 osrf_clearbuf(buf, sizeof(buf));
737 int local_errno = errno; /* capture errno as set by recv() */
739 if(socket_find_node(mgr, sock_fd)) { /* someone may have closed this socket */
740 clr_fl(sock_fd, O_NONBLOCK);
742 if(local_errno != EAGAIN)
743 osrfLogWarning(OSRF_LOG_MARK, " * Error reading socket with error %s", strerror(local_errno));
746 } else { return -1; } /* inform the caller that this node has been tampered with */
748 if(read_bytes == 0) { /* socket closed by client */
749 if(mgr->on_socket_closed) {
750 mgr->on_socket_closed(mgr->blob, sock_fd);
760 void socket_manager_free(socket_manager* mgr) {
761 if(mgr == NULL) return;
764 tmp = mgr->socket->next;
765 socket_disconnect(mgr, mgr->socket->sock_fd);