]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/libopensrf/socket_bundle.c
Plug a minor memory leak that could occur when we opened a TCP connection
[OpenSRF.git] / src / libopensrf / socket_bundle.c
1 /**
2         @file socket_bundle.c
3         @brief Collection of socket-handling routines.
4 */
5
6 #include <opensrf/socket_bundle.h>
7
8 #define LISTENER_SOCKET   1
9 #define DATA_SOCKET       2
10
11 #define INET 10
12 #define UNIX 11
13
14 /**
15         @brief Represents a socket owned by a socket_manager.
16
17         A socket_manager owns a linked list of socket_nodes representing the collection of
18         sockets that it manages.  It may contain a single socket for passing data, or it may
19         contain a listener socket (conceivably more than one) together with any associated
20         sockets created by accept() for communicating with a client.
21 */
22 struct socket_node_struct {
23         int endpoint;       /**< Role of socket: LISTENER_SOCKET or DATA_SOCKET. */
24         int addr_type;      /**< INET or UNIX. */
25         int sock_fd;        /**< File descriptor for socket. */
26         int parent_id;      /**< For a socket created by accept() for a listener socket,
27                                 this is the listener socket we spawned from. */
28         struct socket_node_struct* next;  /**< Linkage pointer for linked list. */
29 };
30
31 /** @brief Size of buffer used to read from the sockets */
32 #define RBUFSIZE 1024
33
34 static socket_node* _socket_add_node(socket_manager* mgr,
35                 int endpoint, int addr_type, int sock_fd, int parent_id );
36 static socket_node* socket_find_node(socket_manager* mgr, int sock_fd);
37 static void socket_remove_node(socket_manager*, int sock_fd);
38 static int _socket_send(int sock_fd, const char* data, int flags);
39 static int _socket_handle_new_client(socket_manager* mgr, socket_node* node);
40 static int _socket_handle_client_data(socket_manager* mgr, socket_node* node);
41
42
43 /* --------------------------------------------------------------------
44         Test Code
45         -------------------------------------------------------------------- */
46 /*
47 int count = 0;
48 void printme(void* blob, socket_manager* mgr,
49                 int sock_fd, char* data, int parent_id) {
50
51         fprintf(stderr, "Got data from socket %d with parent %d => %s",
52                         sock_fd, parent_id, data );
53
54         socket_send(sock_fd, data);
55
56         if(count++ > 2) {
57                 socket_disconnect(mgr, sock_fd);
58                 _socket_print_list(mgr);
59         }
60 }
61
62 int main(int argc, char* argv[]) {
63         socket_manager manager;
64         memset(&manager, 0, sizeof(socket_manager));
65         int port = 11000;
66         if(argv[1])
67                 port = atoi(argv[1]);
68
69         manager.data_received = &printme;
70         socket_open_tcp_server(&manager, port);
71
72         while(1)
73                 socket_wait_all(&manager, -1);
74
75         return 0;
76 }
77 */
78 /* -------------------------------------------------------------------- */
79
80
81 /**
82         @brief Create a new socket_node and add it to a socket_manager's list.
83         @param mgr Pointer to the socket_manager.
84         @param endpoint LISTENER_SOCKET or DATA_SOCKET, denoting how the socket is to be used.
85         @param addr_type address type: INET or UNIX.
86         @param sock_fd sock_fd for the new socket_node.
87         @param parent_id parent_id for the new node.
88         @return Pointer to the new socket_node.
89
90         If @a parent_id is negative, the new socket_node receives a parent_id of 0.
91 */
92 static socket_node* _socket_add_node(socket_manager* mgr,
93                 int endpoint, int addr_type, int sock_fd, int parent_id ) {
94
95         if(mgr == NULL) return NULL;
96         osrfLogInternal( OSRF_LOG_MARK, "Adding socket node with fd %d", sock_fd);
97         socket_node* new_node = safe_malloc(sizeof(socket_node));
98
99         new_node->endpoint      = endpoint;
100         new_node->addr_type     = addr_type;
101         new_node->sock_fd       = sock_fd;
102         new_node->next          = NULL;
103         new_node->parent_id = 0;
104         if(parent_id > 0)
105                 new_node->parent_id = parent_id;
106
107         new_node->next                  = mgr->socket;
108         mgr->socket                             = new_node;
109         return new_node;
110 }
111
112 /**
113         @brief Create an TCP INET listener socket and add it to a socket_manager's list.
114         @param mgr Pointer to the socket manager that will own the socket.
115         @param port The port number to bind to.
116         @param listen_ip The IP address to bind to; or, NULL for INADDR_ANY.
117         @return The socket's file descriptor if successful; otherwise -1.
118
119         Calls: socket(), bind(), and listen().  Creates a LISTENER_SOCKET.
120 */
121 int socket_open_tcp_server(socket_manager* mgr, int port, const char* listen_ip) {
122
123         if( mgr == NULL ) {
124                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): NULL mgr");
125                 return -1;
126         }
127
128         int sock_fd;
129         struct sockaddr_in server_addr;
130
131         server_addr.sin_family = AF_INET;
132
133         if(listen_ip != NULL) {
134                 struct in_addr addr;
135                 if( inet_aton( listen_ip, &addr ) )
136                         server_addr.sin_addr.s_addr = addr.s_addr;
137                 else {
138                         osrfLogError( OSRF_LOG_MARK, "Listener address is invalid: %s", listen_ip );
139                         return -1;
140                 }
141         } else {
142                 server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
143         }
144
145         server_addr.sin_port = htons(port);
146
147         errno = 0;
148         sock_fd = socket(AF_INET, SOCK_STREAM, 0);
149         if(sock_fd < 0) {
150                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): Unable to create TCP socket: %s",
151                         strerror( errno ) );
152                 return -1;
153         }
154
155         errno = 0;
156         if(bind( sock_fd, (struct sockaddr*) &server_addr, sizeof(server_addr)) < 0) {
157                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): cannot bind to port %d: %s",
158                         port, strerror( errno ) );
159                 close( sock_fd );
160                 return -1;
161         }
162
163         errno = 0;
164         if(listen(sock_fd, 20) == -1) {
165                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): listen() returned error: %s",
166                         strerror( errno ) );
167                 close( sock_fd );
168                 return -1;
169         }
170
171         _socket_add_node(mgr, LISTENER_SOCKET, INET, sock_fd, 0);
172         return sock_fd;
173 }
174
175 /**
176         @brief Create a UNIX domain listener socket and add it to the socket_manager's list.
177         @param mgr Pointer to the socket_manager that will own the socket.
178         @param path Name of the socket within the file system.
179         @return The socket's file descriptor if successful; otherwise -1.
180
181         Calls: socket(), bind(), listen().  Creates a LISTENER_SOCKET.
182
183         Apply socket option TCP_NODELAY in order to reduce latency.
184 */
185 int socket_open_unix_server(socket_manager* mgr, const char* path) {
186         if(mgr == NULL || path == NULL) return -1;
187
188         osrfLogDebug( OSRF_LOG_MARK, "opening unix socket at %s", path);
189         int sock_fd;
190         struct sockaddr_un server_addr;
191
192         if(strlen(path) > sizeof(server_addr.sun_path) - 1)
193         {
194                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): path too long: %s",
195                         path );
196                 return -1;
197         }
198
199         errno = 0;
200         sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
201         if(sock_fd < 0){
202                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): socket() failed: %s",
203                         strerror( errno ) );
204                 return -1;
205         }
206
207         server_addr.sun_family = AF_UNIX;
208         strcpy(server_addr.sun_path, path);
209
210         errno = 0;
211         if( bind(sock_fd, (struct sockaddr*) &server_addr,
212                                 sizeof(struct sockaddr_un)) < 0) {
213                 osrfLogWarning( OSRF_LOG_MARK,
214                         "socket_open_unix_server(): cannot bind to unix port %s: %s",
215                         path, strerror( errno ) );
216                 close( sock_fd );
217                 return -1;
218         }
219
220         errno = 0;
221         if(listen(sock_fd, 20) == -1) {
222                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): listen() returned error: %s",
223                         strerror( errno ) );
224                 close( sock_fd );
225                 return -1;
226         }
227
228         osrfLogDebug( OSRF_LOG_MARK, "unix socket successfully opened");
229
230         int i = 1;
231
232         /* causing problems with router for some reason ... */
233         //osrfLogDebug( OSRF_LOG_MARK, "Setting SO_REUSEADDR");
234         //setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));
235
236         //osrfLogDebug( OSRF_LOG_MARK, "Setting TCP_NODELAY");
237         setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
238
239         _socket_add_node(mgr, LISTENER_SOCKET, UNIX, sock_fd, 0);
240         return sock_fd;
241 }
242
243
244 /**
245         @brief Create a UDP socket for a server, and add it to a socket_manager's list.
246         @param mgr Pointer to the socket_manager that will own the socket.
247         @param port The port number to bind to.
248         @param listen_ip The IP address to bind to, or NULL for INADDR_ANY.
249         @return The socket's file descriptor if successful; otherwise -1.
250
251         Calls: socket(), bind().  Creates a DATA_SOCKET.
252 */
253 int socket_open_udp_server(
254                 socket_manager* mgr, int port, const char* listen_ip ) {
255
256         int sockfd;
257         struct sockaddr_in server_addr;
258
259         server_addr.sin_family = AF_INET;
260         server_addr.sin_port = htons(port);
261         if(listen_ip) {
262                 struct in_addr addr;
263                 if( inet_aton( listen_ip, &addr ) )
264                         server_addr.sin_addr.s_addr = addr.s_addr;
265                 else {
266                         osrfLogError( OSRF_LOG_MARK, "UDP listener address is invalid: %s", listen_ip );
267                         return -1;
268                 }
269         } else
270                 server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
271
272         errno = 0;
273         if( (sockfd = socket( AF_INET, SOCK_DGRAM, 0 )) < 0 ) {
274                 osrfLogWarning( OSRF_LOG_MARK, "Unable to create UDP socket: %s", strerror( errno ) );
275                 return -1;
276         }
277
278         errno = 0;
279         if( (bind (sockfd, (struct sockaddr *) &server_addr,sizeof(server_addr))) ) {
280                 osrfLogWarning( OSRF_LOG_MARK, "Unable to bind to UDP port %d: %s",
281                         port, strerror( errno ) );
282                 close( sockfd );
283                 return -1;
284         }
285
286         _socket_add_node(mgr, DATA_SOCKET, INET, sockfd, 0);
287         return sockfd;
288 }
289
290
291 /**
292         @brief Create a client TCP socket, connect with it, and add it to a socket_manager's list.
293         @param mgr Pointer to the socket_manager that will own the socket.
294         @param port What port number to connect to.
295         @param dest_addr Host name or IP address of the server to which we are connecting.
296         @return The socket's file descriptor if successful; otherwise -1.
297
298         Calls: getaddrinfo(), socket(), connect().  Creates a DATA_SOCKET.
299
300         Applies socket option TCP_NODELAY in order to reduce latency.
301 */
302 int socket_open_tcp_client(socket_manager* mgr, int port, const char* dest_addr) {
303
304         struct sockaddr_in remoteAddr;
305         int sock_fd;
306
307         // ------------------------------------------------------------------
308         // Get the IP address of the hostname (for TCP only)
309         // ------------------------------------------------------------------
310         struct addrinfo hints = { 0, 0, 0, 0, 0, NULL, NULL, NULL };
311         hints.ai_socktype = SOCK_STREAM;
312         struct addrinfo* addr_info_list = NULL;
313         errno = 0;
314         int rc = getaddrinfo( dest_addr, NULL, &hints, &addr_info_list );
315         if( rc || ! addr_info_list ) {
316                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): No Such Host => %s: %s",
317                         dest_addr, gai_strerror( rc ) );
318                 if( addr_info_list )
319                         freeaddrinfo( addr_info_list );
320                 return -1;
321         }
322
323         // Look for an address supporting IPv4.  Someday we'll look for
324         // either IPv4 or IPv6, and branch according to what we find.
325         const struct addrinfo* addr_info = addr_info_list;
326         while( addr_info && addr_info->ai_family != PF_INET ) {
327                 addr_info = addr_info->ai_next;
328         }
329
330         if( ! addr_info ) {
331                 osrfLogWarning( OSRF_LOG_MARK,
332                         "socket_open_tcp_client(): Host %s does not support IPV4", dest_addr );
333                 if( addr_info_list )
334                         freeaddrinfo( addr_info_list );
335                 return -1;
336         }
337
338         // ------------------------------------------------------------------
339         // Create the socket
340         // ------------------------------------------------------------------
341         errno = 0;
342         if( (sock_fd = socket( AF_INET, SOCK_STREAM, 0 )) < 0 ) {
343                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): Cannot create TCP socket: %s",
344                         strerror( errno ) );
345                 if( addr_info_list )
346                         freeaddrinfo( addr_info_list );
347                 return -1;
348         }
349
350         int i = 1;
351         setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
352
353         // ------------------------------------------------------------------
354         // Construct server info struct
355         // ------------------------------------------------------------------
356         memset( &remoteAddr, 0, sizeof(remoteAddr));
357         remoteAddr.sin_family = AF_INET;
358         remoteAddr.sin_port = htons( port );
359         struct sockaddr_in* ai_addr_in = (struct sockaddr_in*) addr_info->ai_addr;
360         remoteAddr.sin_addr.s_addr = ai_addr_in->sin_addr.s_addr;
361
362         freeaddrinfo( addr_info_list );
363
364         // ------------------------------------------------------------------
365         // Connect to server
366         // ------------------------------------------------------------------
367         errno = 0;
368         if( connect( sock_fd, (struct sockaddr*) &remoteAddr, sizeof( struct sockaddr_in ) ) < 0 ) {
369                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): Cannot connect to server %s: %s",
370                         dest_addr, strerror(errno) );
371                 close( sock_fd );
372                 return -1;
373         }
374
375         _socket_add_node(mgr, DATA_SOCKET, INET, sock_fd, -1 );
376
377         return sock_fd;
378 }
379
380
381 /**
382         @brief Create a client UDP socket and add it to a socket_manager's list.
383         @param mgr Pointer to the socket_manager that will own the socket.
384         @return The socket's file descriptor if successful; otherwise -1.
385
386         Calls: socket().  Creates a DATA_SOCKET.
387 */
388 int socket_open_udp_client( socket_manager* mgr ) {
389
390         int sockfd;
391
392         errno = 0;
393         if( (sockfd = socket(AF_INET,SOCK_DGRAM,0)) < 0 ) {
394                 osrfLogWarning( OSRF_LOG_MARK,
395                         "socket_open_udp_client(): Unable to create UDP socket: %s", strerror( errno ) );
396                 return -1;
397         }
398
399         _socket_add_node(mgr, DATA_SOCKET, INET, sockfd, -1 );
400
401         return sockfd;
402 }
403
404
405 /**
406         @brief Create a UNIX domain client socket, connect with it, add it to the socket_manager's list
407         @param mgr Pointer to the socket_manager that will own the socket.
408         @param sock_path Name of the socket within the file system.
409         @return The socket's file descriptor if successful; otherwise -1.
410
411         Calls: socket(), connect().  Creates a DATA_SOCKET.
412 */
413 int socket_open_unix_client(socket_manager* mgr, const char* sock_path) {
414
415         int sock_fd, len;
416         struct sockaddr_un usock;
417
418         if(strlen(sock_path) > sizeof(usock.sun_path) - 1)
419         {
420                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_client(): path too long: %s",
421                    sock_path );
422                 return -1;
423         }
424
425         errno = 0;
426         if( (sock_fd = socket( AF_UNIX, SOCK_STREAM, 0 )) < 0 ) {
427                 osrfLogWarning(  OSRF_LOG_MARK, "socket_open_unix_client(): Cannot create UNIX socket: %s", strerror( errno ) );
428                 return -1;
429         }
430
431         usock.sun_family = AF_UNIX;
432         strcpy( usock.sun_path, sock_path );
433
434         len = sizeof( usock.sun_family ) + strlen( usock.sun_path );
435
436         errno = 0;
437         if( connect( sock_fd, (struct sockaddr *) &usock, len ) < 0 ) {
438                 osrfLogWarning(  OSRF_LOG_MARK, "Error connecting to unix socket: %s",
439                         strerror( errno ) );
440                 close( sock_fd );
441                 return -1;
442         }
443
444         _socket_add_node(mgr, DATA_SOCKET, UNIX, sock_fd, -1 );
445
446         return sock_fd;
447 }
448
449
450 /**
451         @brief Search a socket_manager's list for a socket node for a given file descriptor.
452         @param mgr Pointer to the socket manager.
453         @param sock_fd The file descriptor to be sought.
454         @return A pointer to the socket_node if found; otherwise NULL.
455
456         Traverse a linked list owned by the socket_manager.
457 */
458 static socket_node* socket_find_node(socket_manager* mgr, int sock_fd) {
459         if(mgr == NULL) return NULL;
460         socket_node* node = mgr->socket;
461         while(node) {
462                 if(node->sock_fd == sock_fd)
463                         return node;
464                 node = node->next;
465         }
466         return NULL;
467 }
468
469 /* removes the node with the given sock_fd from the list and frees it */
470 /**
471         @brief Remove a socket node for a given fd from a socket_manager's list.
472         @param mgr Pointer to the socket_manager.
473         @param sock_fd The file descriptor whose socket_node is to be removed.
474
475         This function does @em not close the socket.  It just removes a node from the list, and
476         frees it.  The disposition of the socket is the responsibility of the calling code.
477 */
478 static void socket_remove_node(socket_manager* mgr, int sock_fd) {
479
480         if(mgr == NULL) return;
481
482         osrfLogDebug( OSRF_LOG_MARK, "removing socket %d", sock_fd);
483
484         socket_node* head = mgr->socket;
485         socket_node* tail = head;
486         if(head == NULL) return;
487
488         /* if removing the first node in the list */
489         if(head->sock_fd == sock_fd) {
490                 mgr->socket = head->next;
491                 free(head);
492                 return;
493         }
494
495         head = head->next;
496
497         /* if removing any other node */
498         while(head) {
499                 if(head->sock_fd == sock_fd) {
500                         tail->next = head->next;
501                         free(head);
502                         return;
503                 }
504                 tail = head;
505                 head = head->next;
506         }
507 }
508
509
510 /**
511         @brief Write to the log: a list of socket_nodes in a socket_manager's list.
512         @param mgr Pointer to the socket_manager.
513
514         For testing and debugging.
515
516         The messages are issued as DEBG messages, and show each file descriptor and its parent.
517 */
518 void _socket_print_list(socket_manager* mgr) {
519         if(mgr == NULL) return;
520         socket_node* node = mgr->socket;
521         osrfLogDebug( OSRF_LOG_MARK, "socket_node list: [");
522         while(node) {
523                 osrfLogDebug( OSRF_LOG_MARK, "sock_fd: %d | parent_id: %d",
524                                 node->sock_fd, node->parent_id);
525                 node = node->next;
526         }
527         osrfLogDebug( OSRF_LOG_MARK, "]");
528 }
529
530 /**
531         @brief Send a nul-terminated string over a socket.
532         @param sock_fd The file descriptor for the socket.
533         @param data Pointer to the string to be sent.
534         @return 0 if successful, -1 if not.
535
536         This function is a thin wrapper for _socket_send().
537 */
538 int socket_send(int sock_fd, const char* data) {
539         return _socket_send( sock_fd, data, 0);
540 }
541
542 /**
543         @brief Send a nul-terminated string over a socket.
544         @param sock_fd The file descriptor for the socket.
545         @param data Pointer to the string to be sent.
546         @param flags A set of bitflags to be passed to send().
547         @return 0 if successful, -1 if not.
548
549         This function is the final common pathway for all outgoing socket traffic.
550 */
551 static int _socket_send(int sock_fd, const char* data, int flags) {
552
553         signal(SIGPIPE, SIG_IGN); /* in case a unix socket was closed */
554
555         errno = 0;
556         size_t r = send( sock_fd, data, strlen(data), flags );
557         int local_errno = errno;
558
559         if( r == -1 ) {
560                 osrfLogWarning( OSRF_LOG_MARK, "_socket_send(): Error sending data with return %d", r );
561                 osrfLogWarning( OSRF_LOG_MARK, "Last Sys Error: %s", strerror(local_errno));
562                 return -1;
563         }
564
565         return 0;
566 }
567
568
569 /* sends the given data to the given socket.
570  * sets the send flag MSG_DONTWAIT which will allow the
571  * process to continue even if the socket buffer is full
572  * returns 0 on success, -1 otherwise */
573 //int socket_send_nowait( int sock_fd, const char* data) {
574 //      return _socket_send( sock_fd, data, MSG_DONTWAIT);
575 //}
576
577
578 /**
579         @brief Wait for a socket to be ready to send, and then send a string over it.
580         @param sock_fd File descriptor of the socket.
581         @param data Pointer to a nul-terminated string to be sent.
582         @param usecs How long to wait, in microseconds, before timing out.
583         @return 0 if successful, -1 if not.
584
585         The socket may not accept all the data we want to give it.
586 */
587 int socket_send_timeout( int sock_fd, const char* data, int usecs ) {
588
589         fd_set write_set;
590         FD_ZERO( &write_set );
591         FD_SET( sock_fd, &write_set );
592
593         const int mil = 1000000;
594         int secs = (int) usecs / mil;
595         usecs = usecs - (secs * mil);
596
597         struct timeval tv;
598         tv.tv_sec = secs;
599         tv.tv_usec = usecs;
600
601         errno = 0;
602         int ret = select( sock_fd + 1, NULL, &write_set, NULL, &tv);
603         if( ret > 0 ) return _socket_send( sock_fd, data, 0);
604
605         osrfLogError(OSRF_LOG_MARK, "socket_send_timeout(): "
606                 "timed out on send for socket %d after %d secs, %d usecs: %s",
607                 sock_fd, secs, usecs, strerror( errno ) );
608
609         return -1;
610 }
611
612
613 /* disconnects the node with the given sock_fd and removes
614         it from the socket set */
615 /**
616         @brief Close a socket, and remove it from the socket_manager's list.
617         @param mgr Pointer to the socket_manager.
618         @param sock_fd File descriptor for the socket to be closed.
619
620         We close the socket before determining whether it belongs to the socket_manager in question.
621 */
622 void socket_disconnect(socket_manager* mgr, int sock_fd) {
623         osrfLogInternal( OSRF_LOG_MARK, "Closing socket %d", sock_fd);
624         close( sock_fd );
625         socket_remove_node(mgr, sock_fd);
626 }
627
628
629 /**
630         @brief Determine whether a socket is valid.
631         @param sock_fd File descriptor for the socket.
632         @return 1 if the socket is valid, or 0 if it isn't.
633
634         The test is based on a call to select().  If the socket is valid but is not ready to be
635         written to, we wait until it is ready, then return 1.
636
637         If the select() fails, it may be because it was interrupted by a signal.  In that case
638         we try again.  Otherwise we assume that the socket is no longer valid.  This can happen
639         if, for example, the other end of a connection has closed the connection.
640
641         The select() can also fail if it is unable to allocate enough memory for its own internal
642         use.  If that happens, we may erroneously report a valid socket as invalid, but we
643         probably wouldn't be able to use it anyway if we're that close to exhausting memory.
644 */
645 int socket_connected(int sock_fd) {
646         fd_set read_set;
647         FD_ZERO( &read_set );
648         FD_SET( sock_fd, &read_set );
649         while( 1 ) {
650                 if( select( sock_fd + 1, &read_set, NULL, NULL, NULL) == -1 )
651                         return 0;
652                 else if( EINTR == errno )
653                         continue;
654                 else
655                         return 1;
656         }
657 }
658
659 /**
660         @brief Look for input on a given socket.  If you find some, react to it.
661         @param mgr Pointer to the socket_manager that presumably owns the socket.
662         @param timeout Timeout interval, in seconds (see notes).
663         @param sock_fd The file descriptor to look at.
664         @return 0 if successful, or -1 if a timeout or other error occurs, or if the sender
665                 closes the connection.
666
667         If @a timeout is -1, wait indefinitely for input activity to appear.  If @a timeout is
668         zero, don't wait at all.  If @a timeout is positive, wait that number of seconds
669         before timing out.  If @a timeout has a negative value other than -1, the results are not
670         well defined, but we'll probably get an EINVAL error from select().
671
672         If we detect activity, branch on the type of socket:
673
674         - If it's a listener, accept a new connection, and add the new socket to the
675         socket_manager's list, without actually reading any data.
676         - Otherwise, read as much data as is available from the input socket, passing it a
677         buffer at a time to whatever callback function has been defined to the socket_manager.
678 */
679 int socket_wait( socket_manager* mgr, int timeout, int sock_fd ) {
680
681         int retval = 0;
682         fd_set read_set;
683         FD_ZERO( &read_set );
684         FD_SET( sock_fd, &read_set );
685
686         struct timeval tv;
687         tv.tv_sec = timeout;
688         tv.tv_usec = 0;
689         errno = 0;
690
691         if( timeout < 0 ) {
692
693                 // If timeout is -1, we block indefinitely
694                 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, NULL)) == -1 ) {
695                         osrfLogDebug( OSRF_LOG_MARK, "Call to select() interrupted: Sys Error: %s",
696                                         strerror(errno));
697                         return -1;
698                 }
699
700         } else if( timeout > 0 ) { /* timeout of 0 means don't block */
701
702                 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, &tv)) == -1 ) {
703                         osrfLogDebug( OSRF_LOG_MARK, "Call to select() interrupted: Sys Error: %s",
704                                         strerror(errno));
705                         return -1;
706                 }
707         }
708
709         osrfLogInternal( OSRF_LOG_MARK, "%d active sockets after select()", retval);
710
711         socket_node* node = socket_find_node(mgr, sock_fd);
712         if( node ) {
713                 if( node->endpoint == LISTENER_SOCKET ) {
714                         _socket_handle_new_client( mgr, node );  // accept new connection
715                 } else {
716                         int status = _socket_handle_client_data( mgr, node );   // read data
717                         if( status == -1 ) {
718                                 close( sock_fd );
719                                 socket_remove_node( mgr, sock_fd );
720                                 return -1;
721                         }
722                 }
723                 return 0;
724         }
725         else
726                 return -1;    // No such file descriptor for this socket_manager
727 }
728
729
730 /**
731         @brief Wait for input on all of a socket_manager's sockets; react to any input found.
732         @param mgr Pointer to the socket_manager.
733         @param timeout How many seconds to wait before timing out (see notes).
734         @return 0 if successful, or -1 if a timeout or other error occurs.
735
736         If @a timeout is -1, wait indefinitely for input activity to appear.  If @a timeout is
737         zero, don't wait at all.  If @a timeout is positive, wait that number of seconds
738         before timing out.  If @a timeout has a negative value other than -1, the results are not
739         well defined, but we'll probably get an EINVAL error from select().
740
741         For each active socket found:
742
743         - If it's a listener, accept a new connection, and add the new socket to the
744         socket_manager's list, without actually reading any data.
745         - Otherwise, read as much data as is available from the input socket, passing it a
746         buffer at a time to whatever callback function has been defined to the socket_manager.
747 */
748 int socket_wait_all(socket_manager* mgr, int timeout) {
749
750         if(mgr == NULL) {
751                 osrfLogWarning( OSRF_LOG_MARK,  "socket_wait_all(): null mgr" );
752                 return -1;
753         }
754
755         int num_active = 0;
756         fd_set read_set;
757         FD_ZERO( &read_set );
758
759         socket_node* node = mgr->socket;
760         int max_fd = 0;
761         while(node) {
762                 osrfLogInternal( OSRF_LOG_MARK, "Adding socket fd %d to select set",node->sock_fd);
763                 FD_SET( node->sock_fd, &read_set );
764                 if(node->sock_fd > max_fd) max_fd = node->sock_fd;
765                 node = node->next;
766         }
767         max_fd += 1;
768
769         struct timeval tv;
770         tv.tv_sec = timeout;
771         tv.tv_usec = 0;
772         errno = 0;
773
774         if( timeout < 0 ) {
775
776                 // If timeout is -1, there is no timeout passed to the call to select
777                 if( (num_active = select( max_fd, &read_set, NULL, NULL, NULL)) == -1 ) {
778                         osrfLogWarning( OSRF_LOG_MARK, "select() call aborted: %s", strerror(errno));
779                         return -1;
780                 }
781
782         } else if( timeout != 0 ) { /* timeout of 0 means don't block */
783
784                 if( (num_active = select( max_fd, &read_set, NULL, NULL, &tv)) == -1 ) {
785                         osrfLogWarning( OSRF_LOG_MARK, "select() call aborted: %s", strerror(errno));
786                         return -1;
787                 }
788         }
789
790         osrfLogDebug( OSRF_LOG_MARK, "%d active sockets after select()", num_active);
791
792         node = mgr->socket;
793         int handled = 0;
794
795         while(node && (handled < num_active)) {
796
797                 socket_node* next_node = node->next;
798                 int sock_fd = node->sock_fd;
799
800                 /* does this socket have data? */
801                 if( FD_ISSET( sock_fd, &read_set ) ) {
802
803                         osrfLogInternal( OSRF_LOG_MARK, "Socket %d active", sock_fd);
804                         handled++;
805                         FD_CLR(sock_fd, &read_set);
806
807                         if(node->endpoint == LISTENER_SOCKET)
808                                 _socket_handle_new_client(mgr, node);
809
810                         else {
811                                 if( _socket_handle_client_data(mgr, node) == -1 ) {
812                                         /* someone may have yanked a socket_node out from under us */
813                                         close( sock_fd );
814                                         socket_remove_node( mgr, sock_fd );
815                                 }
816                         }
817                 }
818
819                 node = next_node;
820         } // is_set
821
822         return 0;
823 }
824
825 /**
826         @brief Accept a new socket from a listener, and add it to the socket_manager's list.
827         @param mgr Pointer to the socket_manager that will own the new socket.
828         @param node Pointer to the socket_node for the listener socket.
829         @return 0 if successful, or -1 if not.
830
831         Call: accept().  Creates a DATA_SOCKET (even though the socket resides on the server).
832 */
833 static int _socket_handle_new_client(socket_manager* mgr, socket_node* node) {
834         if(mgr == NULL || node == NULL) return -1;
835
836         errno = 0;
837         int new_sock_fd;
838         new_sock_fd = accept(node->sock_fd, NULL, NULL);
839         if(new_sock_fd < 0) {
840                 osrfLogWarning( OSRF_LOG_MARK, "_socket_handle_new_client(): accept() failed: %s",
841                         strerror( errno ) );
842                 return -1;
843         }
844
845         if(node->addr_type == INET) {
846                 _socket_add_node(mgr, DATA_SOCKET, INET, new_sock_fd, node->sock_fd);
847                 osrfLogDebug( OSRF_LOG_MARK, "Adding new INET client for %d", node->sock_fd);
848
849         } else if(node->addr_type == UNIX) {
850                 _socket_add_node(mgr, DATA_SOCKET, UNIX, new_sock_fd, node->sock_fd);
851                 osrfLogDebug( OSRF_LOG_MARK, "Adding new UNIX client for %d", node->sock_fd);
852         }
853
854         return 0;
855 }
856
857
858 /**
859         @brief Receive data on a streaming socket.
860         @param mgr Pointer to the socket_manager that owns the socket_node.
861         @param node Pointer to the socket_node that owns the socket.
862         @return 0 if successful, or -1 upon failure.
863
864         Receive one or more buffers until no more bytes are available for receipt.  Add a
865         terminal nul to each buffer and pass it to a callback function previously defined by the
866         application to the socket_manager.
867
868         If the sender closes the connection, call another callback function, if one has been
869         defined.
870
871         Even when the function returns successfully, the received message may not be complete --
872         there may be more data that hasn't arrived yet.  It is the responsibility of the
873         calling code to recognize message boundaries.
874
875         Called only for a DATA_SOCKET.
876 */
877 static int _socket_handle_client_data(socket_manager* mgr, socket_node* node) {
878         if(mgr == NULL || node == NULL) return -1;
879
880         char buf[RBUFSIZE];
881         int read_bytes;
882         int sock_fd = node->sock_fd;
883
884         set_fl(sock_fd, O_NONBLOCK);
885
886         osrfLogInternal( OSRF_LOG_MARK, "%ld : Received data at %f\n",
887                         (long) getpid(), get_timestamp_millis());
888
889         while( (read_bytes = recv(sock_fd, buf, RBUFSIZE-1, 0) ) > 0 ) {
890                 buf[read_bytes] = '\0';
891                 osrfLogInternal( OSRF_LOG_MARK, "Socket %d Read %d bytes and data: %s",
892                                 sock_fd, read_bytes, buf);
893                 if(mgr->data_received)
894                         mgr->data_received(mgr->blob, mgr, sock_fd, buf, node->parent_id);
895         }
896         int local_errno = errno; /* capture errno as set by recv() */
897
898         if(socket_find_node(mgr, sock_fd)) {  /* someone may have closed this socket */
899                 clr_fl(sock_fd, O_NONBLOCK);
900                 if(read_bytes < 0) {
901                         // EAGAIN would have meant that no more data was available
902                         if(local_errno != EAGAIN)   // but if that's not the case...
903                                 osrfLogWarning( OSRF_LOG_MARK, " * Error reading socket with error %s",
904                                         strerror(local_errno) );
905                 }
906
907         } else { return -1; } /* inform the caller that this node has been tampered with */
908
909         if(read_bytes == 0) {  /* socket closed by client */
910                 if(mgr->on_socket_closed) {
911                         mgr->on_socket_closed(mgr->blob, sock_fd);
912                 }
913                 return -1;
914         }
915
916         return 0;
917
918 }
919
920
921 /**
922         @brief Destroy a socket_manager, and close all of its sockets.
923         @param mgr Pointer to the socket_manager to be destroyed.
924 */
925 void socket_manager_free(socket_manager* mgr) {
926         if(mgr == NULL) return;
927         socket_node* tmp;
928         while(mgr->socket) {
929                 tmp = mgr->socket->next;
930                 socket_disconnect(mgr, mgr->socket->sock_fd);
931                 mgr->socket = tmp;
932         }
933         free(mgr);
934
935 }