1. Moved several macros from the header to the implementation file. They aren't...
[OpenSRF.git] / src / libopensrf / socket_bundle.c
1 /**
2         @file socket_bundle.c
3         @brief Collection of socket-handling routines.
4 */
5
6 #include <opensrf/socket_bundle.h>
7
8 #define LISTENER_SOCKET   1
9 #define DATA_SOCKET       2
10
11 #define INET 10
12 #define UNIX 11
13
14 /**
15         @brief Represents a socket owned by a socket_manager.
16
17         A socket_manager owns a linked list of socket_nodes representing the collection of
18         sockets that it manages.  It may contain a single socket for passing data, or it may
19         contain a listener socket (conceivably more than one) together with any associated
20         sockets created by accept() for communicating with a client.
21 */
22 struct socket_node_struct {
23         int endpoint;       /**< Role of socket: LISTENER_SOCKET or DATA_SOCKET. */
24         int addr_type;      /**< INET or UNIX. */
25         int sock_fd;        /**< File descriptor for socket. */
26         int parent_id;      /**< For a socket created by accept() for a listener socket,
27                                 this is the listener socket we spawned from. */
28         struct socket_node_struct* next;  /**< Linkage pointer for linked list. */
29 };
30
31 /** @brief Size of buffer used to read from the sockets */
32 #define RBUFSIZE 1024
33
34 static socket_node* _socket_add_node(socket_manager* mgr,
35                 int endpoint, int addr_type, int sock_fd, int parent_id );
36 static socket_node* socket_find_node(socket_manager* mgr, int sock_fd);
37 static void socket_remove_node(socket_manager*, int sock_fd);
38 static int _socket_send(int sock_fd, const char* data, int flags);
39 static int _socket_handle_new_client(socket_manager* mgr, socket_node* node);
40 static int _socket_handle_client_data(socket_manager* mgr, socket_node* node);
41
42
43 /* --------------------------------------------------------------------
44         Test Code
45         -------------------------------------------------------------------- */
46 /*
47 int count = 0;
48 void printme(void* blob, socket_manager* mgr,
49                 int sock_fd, char* data, int parent_id) {
50
51         fprintf(stderr, "Got data from socket %d with parent %d => %s",
52                         sock_fd, parent_id, data );
53
54         socket_send(sock_fd, data);
55
56         if(count++ > 2) {
57                 socket_disconnect(mgr, sock_fd);
58                 _socket_print_list(mgr);
59         }
60 }
61
62 int main(int argc, char* argv[]) {
63         socket_manager manager;
64         memset(&manager, 0, sizeof(socket_manager));
65         int port = 11000;
66         if(argv[1])
67                 port = atoi(argv[1]);
68
69         manager.data_received = &printme;
70         socket_open_tcp_server(&manager, port);
71
72         while(1)
73                 socket_wait_all(&manager, -1);
74
75         return 0;
76 }
77 */
78 /* -------------------------------------------------------------------- */
79
80
81 /**
82         @brief Create a new socket_node and add it to a socket_manager's list.
83         @param mgr Pointer to the socket_manager.
84         @param endpoint LISTENER_SOCKET or DATA_SOCKET, denoting how the socket is to be used.
85         @param addr_type address type: INET or UNIX.
86         @param sock_fd sock_fd for the new socket_node.
87         @param parent_id parent_id for the new node.
88         @return Pointer to the new socket_node.
89
90         If @a parent_id is negative, the new socket_node receives a parent_id of 0.
91 */
92 static socket_node* _socket_add_node(socket_manager* mgr,
93                 int endpoint, int addr_type, int sock_fd, int parent_id ) {
94
95         if(mgr == NULL) return NULL;
96         osrfLogInternal( OSRF_LOG_MARK, "Adding socket node with fd %d", sock_fd);
97         socket_node* new_node = safe_malloc(sizeof(socket_node));
98
99         new_node->endpoint      = endpoint;
100         new_node->addr_type     = addr_type;
101         new_node->sock_fd       = sock_fd;
102         new_node->next          = NULL;
103         new_node->parent_id = 0;
104         if(parent_id > 0)
105                 new_node->parent_id = parent_id;
106
107         new_node->next                  = mgr->socket;
108         mgr->socket                             = new_node;
109         return new_node;
110 }
111
112 /**
113         @brief Create an TCP INET listener socket and add it to a socket_manager's list.
114         @param mgr Pointer to the socket manager that will own the socket.
115         @param port The port number to bind to.
116         @param listen_ip The IP address to bind to; or, NULL for INADDR_ANY.
117         @return The socket's file descriptor if successful; otherwise -1.
118
119         Calls: socket(), bind(), and listen().  Creates a LISTENER_SOCKET.
120 */
121 int socket_open_tcp_server(socket_manager* mgr, int port, const char* listen_ip) {
122
123         if( mgr == NULL ) {
124                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): NULL mgr");
125                 return -1;
126         }
127
128         int sock_fd;
129         struct sockaddr_in server_addr;
130
131         server_addr.sin_family = AF_INET;
132
133         if(listen_ip != NULL) {
134                 struct in_addr addr;
135                 if( inet_aton( listen_ip, &addr ) )
136                         server_addr.sin_addr.s_addr = addr.s_addr;
137                 else {
138                         osrfLogError( OSRF_LOG_MARK, "Listener address is invalid: %s", listen_ip );
139                         return -1;
140                 }
141         } else {
142                 server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
143         }
144
145         server_addr.sin_port = htons(port);
146
147         errno = 0;
148         sock_fd = socket(AF_INET, SOCK_STREAM, 0);
149         if(sock_fd < 0) {
150                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): Unable to create TCP socket: %s",
151                         strerror( errno ) );
152                 return -1;
153         }
154
155         errno = 0;
156         if(bind( sock_fd, (struct sockaddr*) &server_addr, sizeof(server_addr)) < 0) {
157                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): cannot bind to port %d: %s",
158                         port, strerror( errno ) );
159                 close( sock_fd );
160                 return -1;
161         }
162
163         errno = 0;
164         if(listen(sock_fd, 20) == -1) {
165                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): listen() returned error: %s",
166                         strerror( errno ) );
167                 close( sock_fd );
168                 return -1;
169         }
170
171         _socket_add_node(mgr, LISTENER_SOCKET, INET, sock_fd, 0);
172         return sock_fd;
173 }
174
175 /**
176         @brief Create a UNIX domain listener socket and add it to the socket_manager's list.
177         @param mgr Pointer to the socket_manager that will own the socket.
178         @param path Name of the socket within the file system.
179         @return The socket's file descriptor if successful; otherwise -1.
180
181         Calls: socket(), bind(), listen().  Creates a LISTENER_SOCKET.
182
183         Apply socket option TCP_NODELAY in order to reduce latency.
184 */
185 int socket_open_unix_server(socket_manager* mgr, const char* path) {
186         if(mgr == NULL || path == NULL) return -1;
187
188         osrfLogDebug( OSRF_LOG_MARK, "opening unix socket at %s", path);
189         int sock_fd;
190         struct sockaddr_un server_addr;
191
192         if(strlen(path) > sizeof(server_addr.sun_path) - 1)
193         {
194                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): path too long: %s",
195                         path );
196                 return -1;
197         }
198
199         errno = 0;
200         sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
201         if(sock_fd < 0){
202                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): socket() failed: %s",
203                         strerror( errno ) );
204                 return -1;
205         }
206
207         server_addr.sun_family = AF_UNIX;
208         strcpy(server_addr.sun_path, path);
209
210         errno = 0;
211         if( bind(sock_fd, (struct sockaddr*) &server_addr,
212                                 sizeof(struct sockaddr_un)) < 0) {
213                 osrfLogWarning( OSRF_LOG_MARK,
214                         "socket_open_unix_server(): cannot bind to unix port %s: %s",
215                         path, strerror( errno ) );
216                 close( sock_fd );
217                 return -1;
218         }
219
220         errno = 0;
221         if(listen(sock_fd, 20) == -1) {
222                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): listen() returned error: %s",
223                         strerror( errno ) );
224                 close( sock_fd );
225                 return -1;
226         }
227
228         osrfLogDebug( OSRF_LOG_MARK, "unix socket successfully opened");
229
230         int i = 1;
231
232         /* causing problems with router for some reason ... */
233         //osrfLogDebug( OSRF_LOG_MARK, "Setting SO_REUSEADDR");
234         //setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));
235
236         //osrfLogDebug( OSRF_LOG_MARK, "Setting TCP_NODELAY");
237         setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
238
239         _socket_add_node(mgr, LISTENER_SOCKET, UNIX, sock_fd, 0);
240         return sock_fd;
241 }
242
243
244 /**
245         @brief Create a UDP socket for a server, and add it to a socket_manager's list.
246         @param mgr Pointer to the socket_manager that will own the socket.
247         @param port The port number to bind to.
248         @param listen_ip The IP address to bind to, or NULL for INADDR_ANY.
249         @return The socket's file descriptor if successful; otherwise -1.
250
251         Calls: socket(), bind().  Creates a DATA_SOCKET.
252 */
253 int socket_open_udp_server(
254                 socket_manager* mgr, int port, const char* listen_ip ) {
255
256         int sockfd;
257         struct sockaddr_in server_addr;
258
259         server_addr.sin_family = AF_INET;
260         server_addr.sin_port = htons(port);
261         if(listen_ip) {
262                 struct in_addr addr;
263                 if( inet_aton( listen_ip, &addr ) )
264                         server_addr.sin_addr.s_addr = addr.s_addr;
265                 else {
266                         osrfLogError( OSRF_LOG_MARK, "UDP listener address is invalid: %s", listen_ip );
267                         return -1;
268                 }
269         } else
270                 server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
271
272         errno = 0;
273         if( (sockfd = socket( AF_INET, SOCK_DGRAM, 0 )) < 0 ) {
274                 osrfLogWarning( OSRF_LOG_MARK, "Unable to create UDP socket: %s", strerror( errno ) );
275                 return -1;
276         }
277
278         errno = 0;
279         if( (bind (sockfd, (struct sockaddr *) &server_addr,sizeof(server_addr))) ) {
280                 osrfLogWarning( OSRF_LOG_MARK, "Unable to bind to UDP port %d: %s",
281                         port, strerror( errno ) );
282                 close( sockfd );
283                 return -1;
284         }
285
286         _socket_add_node(mgr, DATA_SOCKET, INET, sockfd, 0);
287         return sockfd;
288 }
289
290
291 /**
292         @brief Create a client TCP socket, connect with it, and add it to a socket_manager's list.
293         @param mgr Pointer to the socket_manager that will own the socket.
294         @param port What port number to connect to.
295         @param dest_addr Host name or IP address of the server to which we are connecting.
296         @return The socket's file descriptor if successful; otherwise -1.
297
298         Calls: getaddrinfo(), socket(), connect().  Creates a DATA_SOCKET.
299
300         Applies socket option TCP_NODELAY in order to reduce latency.
301 */
302 int socket_open_tcp_client(socket_manager* mgr, int port, const char* dest_addr) {
303
304         struct sockaddr_in remoteAddr;
305         int sock_fd;
306
307         // ------------------------------------------------------------------
308         // Get the IP address of the hostname (for TCP only)
309         // ------------------------------------------------------------------
310         struct addrinfo hints = { 0, 0, 0, 0, 0, NULL, NULL, NULL };
311         hints.ai_socktype = SOCK_STREAM;
312         struct addrinfo* addr_info = NULL;
313         errno = 0;
314         int rc = getaddrinfo( dest_addr, NULL, &hints, &addr_info );
315         if( rc || ! addr_info ) {
316                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): No Such Host => %s: %s",
317                         dest_addr, gai_strerror( rc ) );
318                 return -1;
319         }
320
321         // Look for an address supporting IPv4.  Someday we'll look for
322         // either IPv4 or IPv6, and branch according to what we find.
323         while( addr_info && addr_info->ai_family != PF_INET ) {
324                 addr_info = addr_info->ai_next;
325         }
326
327         if( ! addr_info ) {
328                 osrfLogWarning( OSRF_LOG_MARK,
329                         "socket_open_tcp_client(): Host %s does not support IPV4", dest_addr );
330                 return -1;
331         }
332
333         // ------------------------------------------------------------------
334         // Create the socket
335         // ------------------------------------------------------------------
336         errno = 0;
337         if( (sock_fd = socket( AF_INET, SOCK_STREAM, 0 )) < 0 ) {
338                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): Cannot create TCP socket: %s",
339                         strerror( errno ) );
340                 return -1;
341         }
342
343         int i = 1;
344         setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
345
346         // ------------------------------------------------------------------
347         // Construct server info struct
348         // ------------------------------------------------------------------
349         memset( &remoteAddr, 0, sizeof(remoteAddr));
350         remoteAddr.sin_family = AF_INET;
351         remoteAddr.sin_port = htons( port );
352         struct sockaddr_in* ai_addr_in = (struct sockaddr_in*) addr_info->ai_addr;
353         remoteAddr.sin_addr.s_addr = ai_addr_in->sin_addr.s_addr;
354
355         freeaddrinfo( addr_info );
356
357         // ------------------------------------------------------------------
358         // Connect to server
359         // ------------------------------------------------------------------
360         errno = 0;
361         if( connect( sock_fd, (struct sockaddr*) &remoteAddr, sizeof( struct sockaddr_in ) ) < 0 ) {
362                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): Cannot connect to server %s: %s",
363                         dest_addr, strerror(errno) );
364                 close( sock_fd );
365                 return -1;
366         }
367
368         _socket_add_node(mgr, DATA_SOCKET, INET, sock_fd, -1 );
369
370         return sock_fd;
371 }
372
373
374 /**
375         @brief Create a client UDP socket and add it to a socket_manager's list.
376         @param mgr Pointer to the socket_manager that will own the socket.
377         @return The socket's file descriptor if successful; otherwise -1.
378
379         Calls: socket().  Creates a DATA_SOCKET.
380 */
381 int socket_open_udp_client( socket_manager* mgr ) {
382
383         int sockfd;
384
385         errno = 0;
386         if( (sockfd = socket(AF_INET,SOCK_DGRAM,0)) < 0 ) {
387                 osrfLogWarning( OSRF_LOG_MARK,
388                         "socket_open_udp_client(): Unable to create UDP socket: %s", strerror( errno ) );
389                 return -1;
390         }
391
392         _socket_add_node(mgr, DATA_SOCKET, INET, sockfd, -1 );
393
394         return sockfd;
395 }
396
397
398 /**
399         @brief Create a UNIX domain client socket, connect with it, add it to the socket_manager's list
400         @param mgr Pointer to the socket_manager that will own the socket.
401         @param sock_path Name of the socket within the file system.
402         @return The socket's file descriptor if successful; otherwise -1.
403
404         Calls: socket(), connect().  Creates a DATA_SOCKET.
405 */
406 int socket_open_unix_client(socket_manager* mgr, const char* sock_path) {
407
408         int sock_fd, len;
409         struct sockaddr_un usock;
410
411         if(strlen(sock_path) > sizeof(usock.sun_path) - 1)
412         {
413                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_client(): path too long: %s",
414                    sock_path );
415                 return -1;
416         }
417
418         errno = 0;
419         if( (sock_fd = socket( AF_UNIX, SOCK_STREAM, 0 )) < 0 ) {
420                 osrfLogWarning(  OSRF_LOG_MARK, "socket_open_unix_client(): Cannot create UNIX socket: %s", strerror( errno ) );
421                 return -1;
422         }
423
424         usock.sun_family = AF_UNIX;
425         strcpy( usock.sun_path, sock_path );
426
427         len = sizeof( usock.sun_family ) + strlen( usock.sun_path );
428
429         errno = 0;
430         if( connect( sock_fd, (struct sockaddr *) &usock, len ) < 0 ) {
431                 osrfLogWarning(  OSRF_LOG_MARK, "Error connecting to unix socket: %s",
432                         strerror( errno ) );
433                 close( sock_fd );
434                 return -1;
435         }
436
437         _socket_add_node(mgr, DATA_SOCKET, UNIX, sock_fd, -1 );
438
439         return sock_fd;
440 }
441
442
443 /**
444         @brief Search a socket_manager's list for a socket node for a given file descriptor.
445         @param mgr Pointer to the socket manager.
446         @param sock_fd The file descriptor to be sought.
447         @return A pointer to the socket_node if found; otherwise NULL.
448
449         Traverse a linked list owned by the socket_manager.
450 */
451 static socket_node* socket_find_node(socket_manager* mgr, int sock_fd) {
452         if(mgr == NULL) return NULL;
453         socket_node* node = mgr->socket;
454         while(node) {
455                 if(node->sock_fd == sock_fd)
456                         return node;
457                 node = node->next;
458         }
459         return NULL;
460 }
461
462 /* removes the node with the given sock_fd from the list and frees it */
463 /**
464         @brief Remove a socket node for a given fd from a socket_manager's list.
465         @param mgr Pointer to the socket_manager.
466         @param sock_fd The file descriptor whose socket_node is to be removed.
467
468         This function does @em not close the socket.  It just removes a node from the list, and
469         frees it.  The disposition of the socket is the responsibility of the calling code.
470 */
471 static void socket_remove_node(socket_manager* mgr, int sock_fd) {
472
473         if(mgr == NULL) return;
474
475         osrfLogDebug( OSRF_LOG_MARK, "removing socket %d", sock_fd);
476
477         socket_node* head = mgr->socket;
478         socket_node* tail = head;
479         if(head == NULL) return;
480
481         /* if removing the first node in the list */
482         if(head->sock_fd == sock_fd) {
483                 mgr->socket = head->next;
484                 free(head);
485                 return;
486         }
487
488         head = head->next;
489
490         /* if removing any other node */
491         while(head) {
492                 if(head->sock_fd == sock_fd) {
493                         tail->next = head->next;
494                         free(head);
495                         return;
496                 }
497                 tail = head;
498                 head = head->next;
499         }
500 }
501
502
503 /**
504         @brief Write to the log: a list of socket_nodes in a socket_manager's list.
505         @param mgr Pointer to the socket_manager.
506
507         For testing and debugging.
508
509         The messages are issued as DEBG messages, and show each file descriptor and its parent.
510 */
511 void _socket_print_list(socket_manager* mgr) {
512         if(mgr == NULL) return;
513         socket_node* node = mgr->socket;
514         osrfLogDebug( OSRF_LOG_MARK, "socket_node list: [");
515         while(node) {
516                 osrfLogDebug( OSRF_LOG_MARK, "sock_fd: %d | parent_id: %d",
517                                 node->sock_fd, node->parent_id);
518                 node = node->next;
519         }
520         osrfLogDebug( OSRF_LOG_MARK, "]");
521 }
522
523 /**
524         @brief Send a nul-terminated string over a socket.
525         @param sock_fd The file descriptor for the socket.
526         @param data Pointer to the string to be sent.
527         @return 0 if successful, -1 if not.
528
529         This function is a thin wrapper for _socket_send().
530 */
531 int socket_send(int sock_fd, const char* data) {
532         return _socket_send( sock_fd, data, 0);
533 }
534
535 /**
536         @brief Send a nul-terminated string over a socket.
537         @param sock_fd The file descriptor for the socket.
538         @param data Pointer to the string to be sent.
539         @param flags A set of bitflags to be passed to send().
540         @return 0 if successful, -1 if not.
541
542         This function is the final common pathway for all outgoing socket traffic.
543 */
544 static int _socket_send(int sock_fd, const char* data, int flags) {
545
546         signal(SIGPIPE, SIG_IGN); /* in case a unix socket was closed */
547
548         errno = 0;
549         size_t r = send( sock_fd, data, strlen(data), flags );
550         int local_errno = errno;
551
552         if( r == -1 ) {
553                 osrfLogWarning( OSRF_LOG_MARK, "_socket_send(): Error sending data with return %d", r );
554                 osrfLogWarning( OSRF_LOG_MARK, "Last Sys Error: %s", strerror(local_errno));
555                 return -1;
556         }
557
558         return 0;
559 }
560
561
562 /* sends the given data to the given socket.
563  * sets the send flag MSG_DONTWAIT which will allow the
564  * process to continue even if the socket buffer is full
565  * returns 0 on success, -1 otherwise */
566 //int socket_send_nowait( int sock_fd, const char* data) {
567 //      return _socket_send( sock_fd, data, MSG_DONTWAIT);
568 //}
569
570
571 /**
572         @brief Wait for a socket to be ready to send, and then send a string over it.
573         @param sock_fd File descriptor of the socket.
574         @param data Pointer to a nul-terminated string to be sent.
575         @param usecs How long to wait, in microseconds, before timing out.
576         @return 0 if successful, -1 if not.
577
578         The socket may not accept all the data we want to give it.
579 */
580 int socket_send_timeout( int sock_fd, const char* data, int usecs ) {
581
582         fd_set write_set;
583         FD_ZERO( &write_set );
584         FD_SET( sock_fd, &write_set );
585
586         const int mil = 1000000;
587         int secs = (int) usecs / mil;
588         usecs = usecs - (secs * mil);
589
590         struct timeval tv;
591         tv.tv_sec = secs;
592         tv.tv_usec = usecs;
593
594         errno = 0;
595         int ret = select( sock_fd + 1, NULL, &write_set, NULL, &tv);
596         if( ret > 0 ) return _socket_send( sock_fd, data, 0);
597
598         osrfLogError(OSRF_LOG_MARK, "socket_send_timeout(): "
599                 "timed out on send for socket %d after %d secs, %d usecs: %s",
600                 sock_fd, secs, usecs, strerror( errno ) );
601
602         return -1;
603 }
604
605
606 /* disconnects the node with the given sock_fd and removes
607         it from the socket set */
608 /**
609         @brief Close a socket, and remove it from the socket_manager's list.
610         @param mgr Pointer to the socket_manager.
611         @param sock_fd File descriptor for the socket to be closed.
612
613         We close the socket before determining whether it belongs to the socket_manager in question.
614 */
615 void socket_disconnect(socket_manager* mgr, int sock_fd) {
616         osrfLogInternal( OSRF_LOG_MARK, "Closing socket %d", sock_fd);
617         close( sock_fd );
618         socket_remove_node(mgr, sock_fd);
619 }
620
621
622 /**
623         @brief Determine whether a socket is valid.
624         @param sock_fd File descriptor for the socket.
625         @return 1 if the socket is valid, or 0 if it isn't.
626
627         The test is based on a call to select().  If the socket is valid but is not ready to be
628         written to, we wait until it is ready, then return 1.
629
630         If the select() fails, it may be because it was interrupted by a signal.  In that case
631         we try again.  Otherwise we assume that the socket is no longer valid.  This can happen
632         if, for example, the other end of a connection has closed the connection.
633
634         The select() can also fail if it is unable to allocate enough memory for its own internal
635         use.  If that happens, we may erroneously report a valid socket as invalid, but we
636         probably wouldn't be able to use it anyway if we're that close to exhausting memory.
637 */
638 int socket_connected(int sock_fd) {
639         fd_set read_set;
640         FD_ZERO( &read_set );
641         FD_SET( sock_fd, &read_set );
642         while( 1 ) {
643                 if( select( sock_fd + 1, &read_set, NULL, NULL, NULL) == -1 )
644                         return 0;
645                 else if( EINTR == errno )
646                         continue;
647                 else
648                         return 1;
649         }
650 }
651
652 /**
653         @brief Look for input on a given socket.  If you find some, react to it.
654         @param mgr Pointer to the socket_manager that presumably owns the socket.
655         @param timeout Timeout interval, in seconds (see notes).
656         @param sock_fd The file descriptor to look at.
657         @return 0 if successful, or -1 if a timeout or other error occurs, or if the sender
658                 closes the connection.
659
660         If @a timeout is -1, wait indefinitely for input activity to appear.  If @a timeout is
661         zero, don't wait at all.  If @a timeout is positive, wait that number of seconds
662         before timing out.  If @a timeout has a negative value other than -1, the results are not
663         well defined, but we'll probably get an EINVAL error from select().
664
665         If we detect activity, branch on the type of socket:
666
667         - If it's a listener, accept a new connection, and add the new socket to the
668         socket_manager's list, without actually reading any data.
669         - Otherwise, read as much data as is available from the input socket, passing it a
670         buffer at a time to whatever callback function has been defined to the socket_manager.
671 */
672 int socket_wait( socket_manager* mgr, int timeout, int sock_fd ) {
673
674         int retval = 0;
675         fd_set read_set;
676         FD_ZERO( &read_set );
677         FD_SET( sock_fd, &read_set );
678
679         struct timeval tv;
680         tv.tv_sec = timeout;
681         tv.tv_usec = 0;
682         errno = 0;
683
684         if( timeout < 0 ) {
685
686                 // If timeout is -1, we block indefinitely
687                 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, NULL)) == -1 ) {
688                         osrfLogDebug( OSRF_LOG_MARK, "Call to select() interrupted: Sys Error: %s",
689                                         strerror(errno));
690                         return -1;
691                 }
692
693         } else if( timeout > 0 ) { /* timeout of 0 means don't block */
694
695                 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, &tv)) == -1 ) {
696                         osrfLogDebug( OSRF_LOG_MARK, "Call to select() interrupted: Sys Error: %s",
697                                         strerror(errno));
698                         return -1;
699                 }
700         }
701
702         osrfLogInternal( OSRF_LOG_MARK, "%d active sockets after select()", retval);
703
704         socket_node* node = socket_find_node(mgr, sock_fd);
705         if( node ) {
706                 if( node->endpoint == LISTENER_SOCKET ) {
707                         _socket_handle_new_client( mgr, node );  // accept new connection
708                 } else {
709                         int status = _socket_handle_client_data( mgr, node );   // read data
710                         if( status == -1 ) {
711                                 close( sock_fd );
712                                 socket_remove_node( mgr, sock_fd );
713                                 return -1;
714                         }
715                 }
716                 return 0;
717         }
718         else
719                 return -1;    // No such file descriptor for this socket_manager
720 }
721
722
723 /**
724         @brief Wait for input on all of a socket_manager's sockets; react to any input found.
725         @param mgr Pointer to the socket_manager.
726         @param timeout How many seconds to wait before timing out (see notes).
727         @return 0 if successful, or -1 if a timeout or other error occurs.
728
729         If @a timeout is -1, wait indefinitely for input activity to appear.  If @a timeout is
730         zero, don't wait at all.  If @a timeout is positive, wait that number of seconds
731         before timing out.  If @a timeout has a negative value other than -1, the results are not
732         well defined, but we'll probably get an EINVAL error from select().
733
734         For each active socket found:
735
736         - If it's a listener, accept a new connection, and add the new socket to the
737         socket_manager's list, without actually reading any data.
738         - Otherwise, read as much data as is available from the input socket, passing it a
739         buffer at a time to whatever callback function has been defined to the socket_manager.
740 */
741 int socket_wait_all(socket_manager* mgr, int timeout) {
742
743         if(mgr == NULL) {
744                 osrfLogWarning( OSRF_LOG_MARK,  "socket_wait_all(): null mgr" );
745                 return -1;
746         }
747
748         int num_active = 0;
749         fd_set read_set;
750         FD_ZERO( &read_set );
751
752         socket_node* node = mgr->socket;
753         int max_fd = 0;
754         while(node) {
755                 osrfLogInternal( OSRF_LOG_MARK, "Adding socket fd %d to select set",node->sock_fd);
756                 FD_SET( node->sock_fd, &read_set );
757                 if(node->sock_fd > max_fd) max_fd = node->sock_fd;
758                 node = node->next;
759         }
760         max_fd += 1;
761
762         struct timeval tv;
763         tv.tv_sec = timeout;
764         tv.tv_usec = 0;
765         errno = 0;
766
767         if( timeout < 0 ) {
768
769                 // If timeout is -1, there is no timeout passed to the call to select
770                 if( (num_active = select( max_fd, &read_set, NULL, NULL, NULL)) == -1 ) {
771                         osrfLogWarning( OSRF_LOG_MARK, "select() call aborted: %s", strerror(errno));
772                         return -1;
773                 }
774
775         } else if( timeout != 0 ) { /* timeout of 0 means don't block */
776
777                 if( (num_active = select( max_fd, &read_set, NULL, NULL, &tv)) == -1 ) {
778                         osrfLogWarning( OSRF_LOG_MARK, "select() call aborted: %s", strerror(errno));
779                         return -1;
780                 }
781         }
782
783         osrfLogDebug( OSRF_LOG_MARK, "%d active sockets after select()", num_active);
784
785         node = mgr->socket;
786         int handled = 0;
787
788         while(node && (handled < num_active)) {
789
790                 socket_node* next_node = node->next;
791                 int sock_fd = node->sock_fd;
792
793                 /* does this socket have data? */
794                 if( FD_ISSET( sock_fd, &read_set ) ) {
795
796                         osrfLogInternal( OSRF_LOG_MARK, "Socket %d active", sock_fd);
797                         handled++;
798                         FD_CLR(sock_fd, &read_set);
799
800                         if(node->endpoint == LISTENER_SOCKET)
801                                 _socket_handle_new_client(mgr, node);
802
803                         else {
804                                 if( _socket_handle_client_data(mgr, node) == -1 ) {
805                                         /* someone may have yanked a socket_node out from under us */
806                                         close( sock_fd );
807                                         socket_remove_node( mgr, sock_fd );
808                                 }
809                         }
810                 }
811
812                 node = next_node;
813         } // is_set
814
815         return 0;
816 }
817
818 /**
819         @brief Accept a new socket from a listener, and add it to the socket_manager's list.
820         @param mgr Pointer to the socket_manager that will own the new socket.
821         @param node Pointer to the socket_node for the listener socket.
822         @return 0 if successful, or -1 if not.
823
824         Call: accept().  Creates a DATA_SOCKET (even though the socket resides on the server).
825 */
826 static int _socket_handle_new_client(socket_manager* mgr, socket_node* node) {
827         if(mgr == NULL || node == NULL) return -1;
828
829         errno = 0;
830         int new_sock_fd;
831         new_sock_fd = accept(node->sock_fd, NULL, NULL);
832         if(new_sock_fd < 0) {
833                 osrfLogWarning( OSRF_LOG_MARK, "_socket_handle_new_client(): accept() failed: %s",
834                         strerror( errno ) );
835                 return -1;
836         }
837
838         if(node->addr_type == INET) {
839                 _socket_add_node(mgr, DATA_SOCKET, INET, new_sock_fd, node->sock_fd);
840                 osrfLogDebug( OSRF_LOG_MARK, "Adding new INET client for %d", node->sock_fd);
841
842         } else if(node->addr_type == UNIX) {
843                 _socket_add_node(mgr, DATA_SOCKET, UNIX, new_sock_fd, node->sock_fd);
844                 osrfLogDebug( OSRF_LOG_MARK, "Adding new UNIX client for %d", node->sock_fd);
845         }
846
847         return 0;
848 }
849
850
851 /**
852         @brief Receive data on a streaming socket.
853         @param mgr Pointer to the socket_manager that owns the socket_node.
854         @param node Pointer to the socket_node that owns the socket.
855         @return 0 if successful, or -1 upon failure.
856
857         Receive one or more buffers until no more bytes are available for receipt.  Add a
858         terminal nul to each buffer and pass it to a callback function previously defined by the
859         application to the socket_manager.
860
861         If the sender closes the connection, call another callback function, if one has been
862         defined.
863
864         Even when the function returns successfully, the received message may not be complete --
865         there may be more data that hasn't arrived yet.  It is the responsibility of the
866         calling code to recognize message boundaries.
867
868         Called only for a DATA_SOCKET.
869 */
870 static int _socket_handle_client_data(socket_manager* mgr, socket_node* node) {
871         if(mgr == NULL || node == NULL) return -1;
872
873         char buf[RBUFSIZE];
874         int read_bytes;
875         int sock_fd = node->sock_fd;
876
877         set_fl(sock_fd, O_NONBLOCK);
878
879         osrfLogInternal( OSRF_LOG_MARK, "%ld : Received data at %f\n",
880                         (long) getpid(), get_timestamp_millis());
881
882         while( (read_bytes = recv(sock_fd, buf, RBUFSIZE-1, 0) ) > 0 ) {
883                 buf[read_bytes] = '\0';
884                 osrfLogInternal( OSRF_LOG_MARK, "Socket %d Read %d bytes and data: %s",
885                                 sock_fd, read_bytes, buf);
886                 if(mgr->data_received)
887                         mgr->data_received(mgr->blob, mgr, sock_fd, buf, node->parent_id);
888         }
889         int local_errno = errno; /* capture errno as set by recv() */
890
891         if(socket_find_node(mgr, sock_fd)) {  /* someone may have closed this socket */
892                 clr_fl(sock_fd, O_NONBLOCK);
893                 if(read_bytes < 0) {
894                         // EAGAIN would have meant that no more data was available
895                         if(local_errno != EAGAIN)   // but if that's not the case...
896                                 osrfLogWarning( OSRF_LOG_MARK, " * Error reading socket with error %s",
897                                         strerror(local_errno) );
898                 }
899
900         } else { return -1; } /* inform the caller that this node has been tampered with */
901
902         if(read_bytes == 0) {  /* socket closed by client */
903                 if(mgr->on_socket_closed) {
904                         mgr->on_socket_closed(mgr->blob, sock_fd);
905                 }
906                 return -1;
907         }
908
909         return 0;
910
911 }
912
913
914 /**
915         @brief Destroy a socket_manager, and close all of its sockets.
916         @param mgr Pointer to the socket_manager to be destroyed.
917 */
918 void socket_manager_free(socket_manager* mgr) {
919         if(mgr == NULL) return;
920         socket_node* tmp;
921         while(mgr->socket) {
922                 tmp = mgr->socket->next;
923                 socket_disconnect(mgr, mgr->socket->sock_fd);
924                 mgr->socket = tmp;
925         }
926         free(mgr);
927
928 }