]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/libopensrf/socket_bundle.c
e3b7f1e628d132b910251906a7fe1f5f5e281fc3
[OpenSRF.git] / src / libopensrf / socket_bundle.c
1 /**
2         @file socket_bundle.c
3         @brief Collection of socket-handling routines.
4 */
5
6 #include <opensrf/socket_bundle.h>
7
8 struct socket_node_struct {
9         int endpoint;           /* SERVER_SOCKET or CLIENT_SOCKET */
10         int addr_type;          /* INET or UNIX */
11         int sock_fd;
12         int parent_id;          /* if we're a new client for a server socket, 
13         this points to the server socket we spawned from */
14         struct socket_node_struct* next;
15 };
16
17 /* buffer used to read from the sockets */
18 #define RBUFSIZE 1024
19
20 static socket_node* _socket_add_node(socket_manager* mgr,
21                 int endpoint, int addr_type, int sock_fd, int parent_id );
22 static socket_node* socket_find_node(socket_manager* mgr, int sock_fd);
23 static void socket_remove_node(socket_manager*, int sock_fd);
24 static int _socket_send(int sock_fd, const char* data, int flags);
25 static int _socket_route_data(socket_manager* mgr, int num_active, fd_set* read_set);
26 static int _socket_route_data_id( socket_manager* mgr, int sock_id);
27 static int _socket_handle_new_client(socket_manager* mgr, socket_node* node);
28 static int _socket_handle_client_data(socket_manager* mgr, socket_node* node);
29
30
31 /* -------------------------------------------------------------------- 
32         Test Code 
33         -------------------------------------------------------------------- */
34 /*
35 int count = 0;
36 void printme(void* blob, socket_manager* mgr, 
37                 int sock_fd, char* data, int parent_id) {
38
39         fprintf(stderr, "Got data from socket %d with parent %d => %s", 
40                         sock_fd, parent_id, data );
41
42         socket_send(sock_fd, data);
43
44         if(count++ > 2) {
45                 socket_disconnect(mgr, sock_fd);
46                 _socket_print_list(mgr);
47         }
48 }
49
50 int main(int argc, char* argv[]) {
51         socket_manager manager;
52         memset(&manager, 0, sizeof(socket_manager));
53         int port = 11000;
54         if(argv[1])
55                 port = atoi(argv[1]);
56
57         manager.data_received = &printme;
58         socket_open_tcp_server(&manager, port);
59
60         while(1)
61                 socket_wait_all(&manager, -1);
62
63         return 0;
64 }
65 */
66 /* -------------------------------------------------------------------- */
67
68
69 /**
70         @brief Create a new socket_node and add it to a socket_manager's list.
71         @param mgr Pointer to the socket_manager.
72         @param endpoint SERVER_SOCKET or CLIENT_SOCKET, denoting how the socket is to be used.
73         @param addr_type address type: INET or UNIX.
74         @param sock_fd sock_fd for the new socket_node.
75         @param parent_id parent_id for the new node.
76         @return Pointer to the new socket_node.
77
78         If @a parent_id is negative, the new socket_node receives a parent_id of 0.
79 */
80 static socket_node* _socket_add_node(socket_manager* mgr, 
81                 int endpoint, int addr_type, int sock_fd, int parent_id ) {
82
83         if(mgr == NULL) return NULL;
84         osrfLogInternal( OSRF_LOG_MARK, "Adding socket node with fd %d", sock_fd);
85         socket_node* new_node = safe_malloc(sizeof(socket_node));
86
87         new_node->endpoint      = endpoint;
88         new_node->addr_type     = addr_type;
89         new_node->sock_fd       = sock_fd;
90         new_node->next          = NULL;
91         new_node->parent_id = 0;
92         if(parent_id > 0)
93                 new_node->parent_id = parent_id;
94
95         new_node->next                  = mgr->socket;
96         mgr->socket                             = new_node;
97         return new_node;
98 }
99
100 /**
101         @brief Create an TCP INET listener socket and add it to a socket_manager's list.
102         @param mgr Pointer to the socket manager that will own the socket.
103         @param port The port number to bind to.
104         @param listen_ip The IP address to bind to; or, NULL for INADDR_ANY.
105         @return The socket fd if successful; otherwise -1.
106
107         Calls: socket(), bind(), and listen().  Creates a SERVER_SOCKET.
108 */
109 int socket_open_tcp_server(socket_manager* mgr, int port, const char* listen_ip) {
110
111         if( mgr == NULL ) {
112                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): NULL mgr"); 
113                 return -1;
114         }
115
116         int sock_fd;
117         struct sockaddr_in server_addr;
118
119         server_addr.sin_family = AF_INET;
120
121         if(listen_ip != NULL) {
122                 struct in_addr addr;
123                 if( inet_aton( listen_ip, &addr ) )
124                         server_addr.sin_addr.s_addr = addr.s_addr;
125                 else {
126                         osrfLogError( OSRF_LOG_MARK, "Listener address is invalid: %s", listen_ip );
127                         return -1;
128                 }
129         } else {
130                 server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
131         }
132
133         server_addr.sin_port = htons(port);
134
135         errno = 0;
136         sock_fd = socket(AF_INET, SOCK_STREAM, 0);
137         if(sock_fd < 0) {
138                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): Unable to create TCP socket: %s",
139                         strerror( errno ) );
140                 return -1;
141         }
142
143         errno = 0;
144         if(bind( sock_fd, (struct sockaddr*) &server_addr, sizeof(server_addr)) < 0) {
145                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): cannot bind to port %d: %s",
146                         port, strerror( errno ) );
147                 close( sock_fd );
148                 return -1;
149         }
150
151         errno = 0;
152         if(listen(sock_fd, 20) == -1) {
153                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): listen() returned error: %s",
154                         strerror( errno ) );
155                 close( sock_fd );
156                 return -1;
157         }
158
159         _socket_add_node(mgr, SERVER_SOCKET, INET, sock_fd, 0);
160         return sock_fd;
161 }
162
163 /**
164         @brief Create a UNIX domain listener socket and add it to the socket_manager's list.
165         @param mgr Pointer to the socket_manager that will own the socket.
166         @param path Name of the socket within the file system.
167         @return The socket fd if successful; otherwise -1.
168
169         Calls: socket(), bind(), listen().  Creates a SERVER_SOCKET.
170
171         Applies socket option TCP_NODELAY in order to reduce latency.
172  */
173 int socket_open_unix_server(socket_manager* mgr, const char* path) {
174         if(mgr == NULL || path == NULL) return -1;
175
176         osrfLogDebug( OSRF_LOG_MARK, "opening unix socket at %s", path);
177         int sock_fd;
178         struct sockaddr_un server_addr;
179
180         if(strlen(path) > sizeof(server_addr.sun_path) - 1)
181         {
182                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): path too long: %s",
183                         path );
184                 return -1;
185         }
186
187         errno = 0;
188         sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
189         if(sock_fd < 0){
190                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): socket() failed: %s",
191                         strerror( errno ) );
192                 return -1;
193         }
194
195         server_addr.sun_family = AF_UNIX;
196         strcpy(server_addr.sun_path, path);
197
198         errno = 0;
199         if( bind(sock_fd, (struct sockaddr*) &server_addr, 
200                                 sizeof(struct sockaddr_un)) < 0) {
201                 osrfLogWarning( OSRF_LOG_MARK, 
202                         "socket_open_unix_server(): cannot bind to unix port %s: %s",
203                         path, strerror( errno ) );
204                 close( sock_fd );
205                 return -1;
206         }
207
208         errno = 0;
209         if(listen(sock_fd, 20) == -1) {
210                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): listen() returned error: %s",
211                         strerror( errno ) );
212                 close( sock_fd );
213                 return -1;
214         }
215
216         osrfLogDebug( OSRF_LOG_MARK, "unix socket successfully opened");
217         
218         int i = 1;
219
220         /* causing problems with router for some reason ... */
221         //osrfLogDebug( OSRF_LOG_MARK, "Setting SO_REUSEADDR");
222         //setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));
223         
224         //osrfLogDebug( OSRF_LOG_MARK, "Setting TCP_NODELAY");
225         setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
226
227         _socket_add_node(mgr, SERVER_SOCKET, UNIX, sock_fd, 0);
228         return sock_fd;
229 }
230
231
232 /**
233         @brief Create a UDP socket for a server, and add it to a socket_manager's list.
234         @param mgr Pointer to the socket_manager that will own the socket.
235         @param port The port number to bind to.
236         @param listen_ip The IP address to bind to, or NULL for INADDR_ANY.
237         @return The socket fd if successful; otherwise -1.
238
239         Calls: socket(), bind().  Creates a SERVER_SOCKET.
240 */
241 int socket_open_udp_server( 
242                 socket_manager* mgr, int port, const char* listen_ip ) {
243
244         int sockfd;
245         struct sockaddr_in server_addr;
246
247         server_addr.sin_family = AF_INET;
248         server_addr.sin_port = htons(port);
249         if(listen_ip) {
250                 struct in_addr addr;
251                 if( inet_aton( listen_ip, &addr ) )
252                         server_addr.sin_addr.s_addr = addr.s_addr;
253                 else {
254                         osrfLogError( OSRF_LOG_MARK, "UDP listener address is invalid: %s", listen_ip );
255                         return -1;
256                 }
257         } else
258                 server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
259
260         errno = 0;
261         if( (sockfd = socket( AF_INET, SOCK_DGRAM, 0 )) < 0 ) {
262                 osrfLogWarning( OSRF_LOG_MARK, "Unable to create UDP socket: %s", strerror( errno ) );
263                 return -1;
264         }
265
266         errno = 0;
267         if( (bind (sockfd, (struct sockaddr *) &server_addr,sizeof(server_addr))) ) {
268                 osrfLogWarning( OSRF_LOG_MARK, "Unable to bind to UDP port %d: %s",
269                         port, strerror( errno ) );
270                 close( sockfd );
271                 return -1;
272         }
273
274         _socket_add_node(mgr, SERVER_SOCKET, INET, sockfd, 0);
275         return sockfd;
276 }
277
278
279 /**
280         @brief Create a client TCP socket, connect with it, and add it to a socket_manager's list.
281         @param mgr Pointer to the socket_manager that will own the socket.
282         @param port What port number to connect to.
283         @param dest_addr Host name or IP address of the server to which we are connecting.
284         @return The socket fd if successful; otherwise -1.
285
286         Calls: getaddrinfo(), socket(), connect().  Creates a CLIENT_SOCKET.
287
288         Applies socket option TCP_NODELAY in order to reduce latency.
289  */
290 int socket_open_tcp_client(socket_manager* mgr, int port, const char* dest_addr) {
291
292         struct sockaddr_in remoteAddr;
293         int sock_fd;
294
295         // ------------------------------------------------------------------
296         // Get the IP address of the hostname (for TCP only)
297         // ------------------------------------------------------------------
298         struct addrinfo hints = { 0, 0, 0, 0, 0, NULL, NULL, NULL };
299         hints.ai_socktype = SOCK_STREAM;
300         struct addrinfo* addr_info = NULL;
301         errno = 0;
302         int rc = getaddrinfo( dest_addr, NULL, &hints, &addr_info );
303         if( rc || ! addr_info ) {
304                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): No Such Host => %s: %s",
305                         dest_addr, gai_strerror( rc ) );
306                 return -1;
307         }
308
309         // Look for an address supporting IPv4.  Someday we'll look for
310         // either IPv4 or IPv6, and branch according to what we find.
311         while( addr_info && addr_info->ai_family != PF_INET ) {
312                 addr_info = addr_info->ai_next;
313         }
314
315         if( ! addr_info ) {
316                 osrfLogWarning( OSRF_LOG_MARK,
317                         "socket_open_tcp_client(): Host %s does not support IPV4", dest_addr );
318                 return -1;      
319         }
320
321         // ------------------------------------------------------------------
322         // Create the socket
323         // ------------------------------------------------------------------
324         errno = 0;
325         if( (sock_fd = socket( AF_INET, SOCK_STREAM, 0 )) < 0 ) {
326                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): Cannot create TCP socket: %s",
327                         strerror( errno ) );
328                 return -1;
329         }
330
331         int i = 1;
332         setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
333
334         // ------------------------------------------------------------------
335         // Construct server info struct
336         // ------------------------------------------------------------------
337         memset( &remoteAddr, 0, sizeof(remoteAddr));
338         remoteAddr.sin_family = AF_INET;
339         remoteAddr.sin_port = htons( port );
340         struct sockaddr_in* ai_addr_in = (struct sockaddr_in*) addr_info->ai_addr;
341         remoteAddr.sin_addr.s_addr = ai_addr_in->sin_addr.s_addr;
342
343         freeaddrinfo( addr_info );
344
345         // ------------------------------------------------------------------
346         // Connect to server
347         // ------------------------------------------------------------------
348         errno = 0;
349         if( connect( sock_fd, (struct sockaddr*) &remoteAddr, sizeof( struct sockaddr_in ) ) < 0 ) {
350                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): Cannot connect to server %s: %s",
351                         dest_addr, strerror(errno) );
352                 close( sock_fd );
353                 return -1;
354         }
355
356         _socket_add_node(mgr, CLIENT_SOCKET, INET, sock_fd, -1 );
357
358         return sock_fd;
359 }
360
361
362 /**
363         @brief Create a client UDP socket and add it to a socket_manager's list.
364         @param mgr Pointer to the socket_manager that will own the socket.
365         @return The socket fd if successful; otherwise -1.
366
367         Calls: socket().  Creates a CLIENT_SOCKET.
368 */
369 int socket_open_udp_client( socket_manager* mgr ) {
370
371         int sockfd;
372
373         errno = 0;
374         if( (sockfd = socket(AF_INET,SOCK_DGRAM,0)) < 0 ) {
375                 osrfLogWarning( OSRF_LOG_MARK,
376                         "socket_open_udp_client(): Unable to create UDP socket: %s", strerror( errno ) );
377                 return -1;
378         }
379
380         _socket_add_node(mgr, CLIENT_SOCKET, INET, sockfd, -1 );
381
382         return sockfd;
383 }
384
385
386 /**
387         @brief Create a UNIX domain client socket, connect with it, add it to the socket_manager's list
388         @param mgr Pointer to the socket_manager that will own the socket.
389         @param sock_path Name of the socket within the file system.
390         @return The socket fd if successful; otherwise -1.
391
392         Calls: socket(), connect().  Creates a CLIENT_SOCKET.
393 */
394 int socket_open_unix_client(socket_manager* mgr, const char* sock_path) {
395
396         int sock_fd, len;
397         struct sockaddr_un usock;
398
399         if(strlen(sock_path) > sizeof(usock.sun_path) - 1)
400         {
401                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_client(): path too long: %s",
402                    sock_path );
403                 return -1;
404         }
405
406         errno = 0;
407         if( (sock_fd = socket( AF_UNIX, SOCK_STREAM, 0 )) < 0 ) {
408                 osrfLogWarning(  OSRF_LOG_MARK, "socket_open_unix_client(): Cannot create UNIX socket: %s", strerror( errno ) );
409                 return -1;
410         }
411
412         usock.sun_family = AF_UNIX;
413         strcpy( usock.sun_path, sock_path );
414
415         len = sizeof( usock.sun_family ) + strlen( usock.sun_path );
416
417         errno = 0;
418         if( connect( sock_fd, (struct sockaddr *) &usock, len ) < 0 ) {
419                 osrfLogWarning(  OSRF_LOG_MARK, "Error connecting to unix socket: %s",
420                         strerror( errno ) );
421                 close( sock_fd );
422                 return -1;
423         }
424
425         _socket_add_node(mgr, CLIENT_SOCKET, UNIX, sock_fd, -1 );
426
427         return sock_fd;
428 }
429
430
431 /**
432         @brief Search a socket_manager's list for a socket node for a given file descriptor.
433         @param mgr Pointer to the socket manager.
434         @param sock_fd The file descriptor to be sought.
435         @return A pointer to the socket_node if found; otherwise NULL.
436
437         Traverse a linked list owned by the socket_manager.
438 */
439 static socket_node* socket_find_node(socket_manager* mgr, int sock_fd) {
440         if(mgr == NULL) return NULL;
441         socket_node* node = mgr->socket;
442         while(node) {
443                 if(node->sock_fd == sock_fd)
444                         return node;
445                 node = node->next;
446         }
447         return NULL;
448 }
449
450 /* removes the node with the given sock_fd from the list and frees it */
451 /**
452         @brief Remove a socket node for a given fd from a socket_manager's list.
453         @param mgr Pointer to the socket_manager.
454         @param sock_fd The file descriptor whose socket_node is to be removed.
455
456         This function does @em not close the socket.  It just removes a node from the list, and
457         frees it.  The disposition of the socket is the responsibility of the calling code.
458 */
459 static void socket_remove_node(socket_manager* mgr, int sock_fd) {
460
461         if(mgr == NULL) return;
462
463         osrfLogDebug( OSRF_LOG_MARK, "removing socket %d", sock_fd);
464
465         socket_node* head = mgr->socket;
466         socket_node* tail = head;
467         if(head == NULL) return;
468
469         /* if removing the first node in the list */
470         if(head->sock_fd == sock_fd) {
471                 mgr->socket = head->next;
472                 free(head);
473                 return;
474         }
475
476         head = head->next;
477
478         /* if removing any other node */
479         while(head) {
480                 if(head->sock_fd == sock_fd) {
481                         tail->next = head->next;
482                         free(head);
483                         return;
484                 }
485                 tail = head;
486                 head = head->next;
487         }
488 }
489
490
491 /**
492         @brief Write to the log: a list of socket_nodes in a socket_manager's list.
493         @param mgr Pointer to the socket_manager.
494
495         For testing and debugging.
496
497         The messages are issued as DEBG messages, and show each file descriptor and its parent.
498 */
499 void _socket_print_list(socket_manager* mgr) {
500         if(mgr == NULL) return;
501         socket_node* node = mgr->socket;
502         osrfLogDebug( OSRF_LOG_MARK, "socket_node list: [");
503         while(node) {
504                 osrfLogDebug( OSRF_LOG_MARK, "sock_fd: %d | parent_id: %d", 
505                                 node->sock_fd, node->parent_id);
506                 node = node->next;
507         }
508         osrfLogDebug( OSRF_LOG_MARK, "]");
509 }
510
511 /**
512         @brief Send a nul-terminated string over a socket.
513         @param sock_fd The file descriptor for the socket.
514         @param data Pointer to the string to be sent.
515         @return 0 if successful, -1 if not.
516
517         This function is a thin wrapper for _socket_send().
518 */
519 int socket_send(int sock_fd, const char* data) {
520         return _socket_send( sock_fd, data, 0);
521 }
522
523 /**
524         @brief Send a nul-terminated string over a socket.
525         @param sock_fd The file descriptor for the socket.
526         @param data Pointer to the string to be sent.
527         @param flags A set of bitflags to be passed to send().
528         @return 0 if successful, -1 if not.
529
530         This function is the final common pathway for all outgoing socket traffic.
531 */
532 static int _socket_send(int sock_fd, const char* data, int flags) {
533
534         signal(SIGPIPE, SIG_IGN); /* in case a unix socket was closed */
535
536         errno = 0;
537         size_t r = send( sock_fd, data, strlen(data), flags );
538         int local_errno = errno;
539
540         if( r == -1 ) {
541                 osrfLogWarning( OSRF_LOG_MARK, "_socket_send(): Error sending data with return %d", r );
542                 osrfLogWarning( OSRF_LOG_MARK, "Last Sys Error: %s", strerror(local_errno));
543                 return -1;
544         }
545
546         return 0;
547 }
548
549
550 /* sends the given data to the given socket. 
551  * sets the send flag MSG_DONTWAIT which will allow the 
552  * process to continue even if the socket buffer is full
553  * returns 0 on success, -1 otherwise */
554 //int socket_send_nowait( int sock_fd, const char* data) {
555 //      return _socket_send( sock_fd, data, MSG_DONTWAIT);
556 //}
557
558
559 /**
560         @brief Wait for a socket to be ready to send, and then send a string over it.
561         @param sock_fd File descriptor of the socket.
562         @param data Pointer to a nul-terminated string to be sent.
563         @param usecs How long to wait, in microseconds, before timing out.
564         @return 0 if successful, -1 if not.
565
566         The socket may not accept all the data we want to give it.
567 */
568 int socket_send_timeout( int sock_fd, const char* data, int usecs ) {
569
570         fd_set write_set;
571         FD_ZERO( &write_set );
572         FD_SET( sock_fd, &write_set );
573
574         const int mil = 1000000;
575         int secs = (int) usecs / mil;
576         usecs = usecs - (secs * mil);
577
578         struct timeval tv;
579         tv.tv_sec = secs;
580         tv.tv_usec = usecs;
581
582         errno = 0;
583         int ret = select( sock_fd + 1, NULL, &write_set, NULL, &tv);
584         if( ret > 0 ) return _socket_send( sock_fd, data, 0);
585
586         osrfLogError(OSRF_LOG_MARK, "socket_send_timeout(): "
587                 "timed out on send for socket %d after %d secs, %d usecs: %s",
588                 sock_fd, secs, usecs, strerror( errno ) );
589
590         return -1;
591 }
592
593
594 /* disconnects the node with the given sock_fd and removes
595         it from the socket set */
596 /**
597         @brief Close a socket, and remove it from the socket_manager's list.
598         @param mgr Pointer to the socket_manager.
599         @param sock_fd File descriptor for the socket to be closed.
600
601         We close the socket before determining whether it belongs to the socket_manager in question.
602 */
603 void socket_disconnect(socket_manager* mgr, int sock_fd) {
604         osrfLogInternal( OSRF_LOG_MARK, "Closing socket %d", sock_fd);
605         close( sock_fd );
606         socket_remove_node(mgr, sock_fd);
607 }
608
609
610 /**
611         @brief Determine whether a socket is valid.
612         @param sock_fd File descriptor for the socket.
613         @return 1 if the socket is valid, or 0 if it isn't.
614
615         The test is based on a call to select().  If the socket is valid but is not ready to be
616         written to, we wait until it is ready, then return 1.
617
618         If the select() fails, it may be because it was interrupted by a signal.  In that case
619         we try again.  Otherwise we assume that the socket is no longer valid.  This can happen
620         if, for example, the other end of a connection has closed the connection.
621
622         The select() can also fail if it is unable to allocate enough memory for its own internal
623         use.  If that happens, we may erroneously report a valid socket as invalid, but we
624         probably wouldn't be able to use it anyway if we're that close to exhausting memory.
625 */
626 int socket_connected(int sock_fd) {
627         fd_set read_set;
628         FD_ZERO( &read_set );
629         FD_SET( sock_fd, &read_set );
630         while( 1 ) {
631                 if( select( sock_fd + 1, &read_set, NULL, NULL, NULL) == -1 )
632                         return 0;
633                 else if( EINTR == errno )
634                         continue;
635                 else
636                         return 1;
637         }
638 }
639
640 /* this only waits on the server socket and does not handle the actual
641         data coming in from the client..... XXX */
642 int socket_wait(socket_manager* mgr, int timeout, int sock_fd) {
643
644         int retval = 0;
645         fd_set read_set;
646         FD_ZERO( &read_set );
647         FD_SET( sock_fd, &read_set );
648
649         struct timeval tv;
650         tv.tv_sec = timeout;
651         tv.tv_usec = 0;
652         errno = 0;
653
654         if( timeout < 0 ) {  
655
656                 // If timeout is -1, we block indefinitely
657                 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, NULL)) == -1 ) {
658                         osrfLogDebug( OSRF_LOG_MARK, "Call to select() interrupted: Sys Error: %s",
659                                                   strerror(errno));
660                         return -1;
661                 }
662
663         } else if( timeout > 0 ) { /* timeout of 0 means don't block */
664
665                 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, &tv)) == -1 ) {
666                         osrfLogDebug( OSRF_LOG_MARK, "Call to select() interrupted: Sys Error: %s", strerror(errno));
667                         return -1;
668                 }
669         }
670
671         osrfLogInternal( OSRF_LOG_MARK, "%d active sockets after select()", retval);
672         return _socket_route_data_id(mgr, sock_fd);
673 }
674
675
676 int socket_wait_all(socket_manager* mgr, int timeout) {
677
678         if(mgr == NULL) {
679                 osrfLogWarning( OSRF_LOG_MARK,  "socket_wait_all(): null mgr" );
680                 return -1;
681         }
682
683         int retval = 0;
684         fd_set read_set;
685         FD_ZERO( &read_set );
686
687         socket_node* node = mgr->socket;
688         int max_fd = 0;
689         while(node) {
690                 osrfLogInternal( OSRF_LOG_MARK, "Adding socket fd %d to select set",node->sock_fd);
691                 FD_SET( node->sock_fd, &read_set );
692                 if(node->sock_fd > max_fd) max_fd = node->sock_fd;
693                 node = node->next;
694         }
695         max_fd += 1;
696
697         struct timeval tv;
698         tv.tv_sec = timeout;
699         tv.tv_usec = 0;
700         errno = 0;
701
702         if( timeout < 0 ) {  
703
704                 // If timeout is -1, there is no timeout passed to the call to select
705                 if( (retval = select( max_fd, &read_set, NULL, NULL, NULL)) == -1 ) {
706                         osrfLogWarning( OSRF_LOG_MARK, "select() call aborted: %s", strerror(errno));
707                         return -1;
708                 }
709
710         } else if( timeout != 0 ) { /* timeout of 0 means don't block */
711
712                 if( (retval = select( max_fd, &read_set, NULL, NULL, &tv)) == -1 ) {
713                         osrfLogWarning( OSRF_LOG_MARK, "select() call aborted: %s", strerror(errno));
714                         return -1;
715                 }
716         }
717
718         osrfLogDebug( OSRF_LOG_MARK, "%d active sockets after select()", retval);
719         return _socket_route_data(mgr, retval, &read_set);
720 }
721
722 /* iterates over the sockets in the set and handles active sockets.
723         new sockets connecting to server sockets cause the creation
724         of a new socket node.
725         Any new data read is is passed off to the data_received callback
726         as it arrives */
727 /* determines if we're receiving a new client or data
728         on an existing client */
729 static int _socket_route_data(
730         socket_manager* mgr, int num_active, fd_set* read_set) {
731
732         if(!(mgr && read_set)) return -1;
733
734         int last_failed_id = -1;
735
736
737         /* come back here if someone yanks a socket_node from beneath us */
738         while(1) {
739
740                 socket_node* node = mgr->socket;
741                 int handled = 0;
742                 int status = 0;
743                 
744                 while(node && (handled < num_active)) {
745         
746                         int sock_fd = node->sock_fd;
747                         
748                         if(last_failed_id != -1) {
749                                 /* in case it was not removed by our overlords */
750                                 osrfLogInternal( OSRF_LOG_MARK, "Attempting to remove last_failed_id of %d", last_failed_id);
751                                 socket_remove_node( mgr, last_failed_id );
752                                 last_failed_id = -1;
753                                 status = -1;
754                                 break;
755                         }
756         
757                         /* does this socket have data? */
758                         if( FD_ISSET( sock_fd, read_set ) ) {
759         
760                                 osrfLogInternal( OSRF_LOG_MARK, "Socket %d active", sock_fd);
761                                 handled++;
762                                 FD_CLR(sock_fd, read_set);
763         
764                                 if(node->endpoint == SERVER_SOCKET) 
765                                         _socket_handle_new_client(mgr, node);
766         
767                                 else
768                                         status = _socket_handle_client_data(mgr, node);
769         
770                                 /* someone may have yanked a socket_node out from under 
771                                         us...start over with the first socket */
772                                 if(status == -1)  {
773                                         last_failed_id = sock_fd;
774                                         osrfLogInternal( OSRF_LOG_MARK, "Backtracking back to start of loop because "
775                                                         "of -1 return code from _socket_handle_client_data()");
776                                 }
777                         }
778
779                         if(status == -1) break;
780                         node = node->next;
781
782                 } // is_set
783
784                 if(status == 0) break;
785                 if(status == -1) status = 0;
786         } 
787
788         return 0;
789 }
790
791
792 /* routes data from a single known socket */
793 static int _socket_route_data_id( socket_manager* mgr, int sock_id) {
794         socket_node* node = socket_find_node(mgr, sock_id);     
795         int status = 0;
796
797         if(node) {
798                 if(node->endpoint == SERVER_SOCKET) 
799                         _socket_handle_new_client(mgr, node);
800         
801                 if(node->endpoint == CLIENT_SOCKET ) 
802                         status = _socket_handle_client_data(mgr, node);
803
804                 if(status == -1) {
805                         socket_remove_node(mgr, sock_id);
806                         return -1;
807                 }
808                 return 0;
809         } 
810
811         return -1;
812 }
813
814 /* Creates a CLIENT_SOCKET. */
815 static int _socket_handle_new_client(socket_manager* mgr, socket_node* node) {
816         if(mgr == NULL || node == NULL) return -1;
817
818         errno = 0;
819         int new_sock_fd;
820         new_sock_fd = accept(node->sock_fd, NULL, NULL);
821         if(new_sock_fd < 0) {
822                 osrfLogWarning( OSRF_LOG_MARK, "_socket_handle_new_client(): accept() failed: %s",
823                         strerror( errno ) );
824                 return -1;
825         }
826
827         if(node->addr_type == INET) {
828                 _socket_add_node(mgr, CLIENT_SOCKET, INET, new_sock_fd, node->sock_fd);
829                 osrfLogDebug( OSRF_LOG_MARK, "Adding new INET client for %d", node->sock_fd);
830
831         } else if(node->addr_type == UNIX) {
832                 _socket_add_node(mgr, CLIENT_SOCKET, UNIX, new_sock_fd, node->sock_fd);
833                 osrfLogDebug( OSRF_LOG_MARK, "Adding new UNIX client for %d", node->sock_fd);
834         }
835
836         return 0;
837 }
838
839
840 /**
841         @brief Receive data on a streaming socket.
842         @param mgr Pointer to the socket_manager that owns the socket_node.
843         @param node Pointer to the socket_node that owns the socket.
844         @return 0 if successful, or -1 upon failure.
845
846         Receive one or more buffers until no more bytes are available for receipt.  Add a
847         terminal nul to each buffer and pass it to a callback function previously defined by the
848         application to the socket_manager.
849
850         If the sender closes the connection, call another callback function, if one has been
851         defined.
852
853         Even when the function returns successfully, the received message may not be complete --
854         there may be more data that hasn't arrived yet.  It is the responsibility of the
855         calling code to recognize message boundaries.
856
857         Called only for a CLIENT_SOCKET.
858 */
859 static int _socket_handle_client_data(socket_manager* mgr, socket_node* node) {
860         if(mgr == NULL || node == NULL) return -1;
861
862         char buf[RBUFSIZE];
863         int read_bytes;
864         int sock_fd = node->sock_fd;
865
866         set_fl(sock_fd, O_NONBLOCK);
867
868         osrfLogInternal( OSRF_LOG_MARK, "%ld : Received data at %f\n", 
869                         (long) getpid(), get_timestamp_millis());
870
871         while( (read_bytes = recv(sock_fd, buf, RBUFSIZE-1, 0) ) > 0 ) {
872                 buf[read_bytes] = '\0';
873                 osrfLogInternal( OSRF_LOG_MARK, "Socket %d Read %d bytes and data: %s",
874                                 sock_fd, read_bytes, buf);
875                 if(mgr->data_received)
876                         mgr->data_received(mgr->blob, mgr, sock_fd, buf, node->parent_id);
877         }
878         int local_errno = errno; /* capture errno as set by recv() */
879
880         if(socket_find_node(mgr, sock_fd)) {  /* someone may have closed this socket */
881                 clr_fl(sock_fd, O_NONBLOCK); 
882                 if(read_bytes < 0) { 
883                         // EAGAIN would have meant that no more data was available
884                         if(local_errno != EAGAIN)   // but if that's not the case...
885                                 osrfLogWarning( OSRF_LOG_MARK, " * Error reading socket with error %s",
886                                         strerror(local_errno) );
887                 }
888
889         } else { return -1; } /* inform the caller that this node has been tampered with */
890
891         if(read_bytes == 0) {  /* socket closed by client */
892                 if(mgr->on_socket_closed) {
893                         mgr->on_socket_closed(mgr->blob, sock_fd);
894                 }
895                 return -1;
896         }
897
898         return 0;
899
900 }
901
902
903 /**
904         @brief Destroy a socket_manager, and close all of its sockets.
905         @param mgr Pointer to the socket_manager to be destroyed.
906 */
907 void socket_manager_free(socket_manager* mgr) {
908         if(mgr == NULL) return;
909         socket_node* tmp;
910         while(mgr->socket) {
911                 tmp = mgr->socket->next;
912                 socket_disconnect(mgr, mgr->socket->sock_fd);
913                 mgr->socket = tmp;
914         }
915         free(mgr);
916
917 }
918