]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/libopensrf/socket_bundle.c
cb86a0b24d9702027ad04de54fd386782dd1db16
[OpenSRF.git] / src / libopensrf / socket_bundle.c
1 /**
2         @file socket_bundle.c
3         @brief Collection of socket-handling routines.
4 */
5
6 #include <opensrf/socket_bundle.h>
7
8 /**
9         @brief Represents a socket owned by a socket_manager.
10
11         A socket_manager owns a linked list of socket_nodes representing the collection of
12         sockets that it manages.  It may contain a single socket for passing data, or it may
13         contain a listener socket together with any associated sockets (created by accept())
14         for communicating with a client.
15 */
16 struct socket_node_struct {
17         int endpoint;       /**< Role of socket: SERVER_SOCKET or CLIENT_SOCKET. */
18         int addr_type;      /**< INET or UNIX. */
19         int sock_fd;        /**< File descriptor for socket. */
20         int parent_id;      /**< If we're a new client for a server socket,
21                                 this is the listener socket we spawned from. */
22         struct socket_node_struct* next;  /**< Linkage pointer for linked list. */
23 };
24
25 /** @brief Size of buffer used to read from the sockets */
26 #define RBUFSIZE 1024
27
28 static socket_node* _socket_add_node(socket_manager* mgr,
29                 int endpoint, int addr_type, int sock_fd, int parent_id );
30 static socket_node* socket_find_node(socket_manager* mgr, int sock_fd);
31 static void socket_remove_node(socket_manager*, int sock_fd);
32 static int _socket_send(int sock_fd, const char* data, int flags);
33 static int _socket_handle_new_client(socket_manager* mgr, socket_node* node);
34 static int _socket_handle_client_data(socket_manager* mgr, socket_node* node);
35
36
37 /* -------------------------------------------------------------------- 
38         Test Code 
39         -------------------------------------------------------------------- */
40 /*
41 int count = 0;
42 void printme(void* blob, socket_manager* mgr, 
43                 int sock_fd, char* data, int parent_id) {
44
45         fprintf(stderr, "Got data from socket %d with parent %d => %s",
46                         sock_fd, parent_id, data );
47
48         socket_send(sock_fd, data);
49
50         if(count++ > 2) {
51                 socket_disconnect(mgr, sock_fd);
52                 _socket_print_list(mgr);
53         }
54 }
55
56 int main(int argc, char* argv[]) {
57         socket_manager manager;
58         memset(&manager, 0, sizeof(socket_manager));
59         int port = 11000;
60         if(argv[1])
61                 port = atoi(argv[1]);
62
63         manager.data_received = &printme;
64         socket_open_tcp_server(&manager, port);
65
66         while(1)
67                 socket_wait_all(&manager, -1);
68
69         return 0;
70 }
71 */
72 /* -------------------------------------------------------------------- */
73
74
75 /**
76         @brief Create a new socket_node and add it to a socket_manager's list.
77         @param mgr Pointer to the socket_manager.
78         @param endpoint SERVER_SOCKET or CLIENT_SOCKET, denoting how the socket is to be used.
79         @param addr_type address type: INET or UNIX.
80         @param sock_fd sock_fd for the new socket_node.
81         @param parent_id parent_id for the new node.
82         @return Pointer to the new socket_node.
83
84         If @a parent_id is negative, the new socket_node receives a parent_id of 0.
85 */
86 static socket_node* _socket_add_node(socket_manager* mgr, 
87                 int endpoint, int addr_type, int sock_fd, int parent_id ) {
88
89         if(mgr == NULL) return NULL;
90         osrfLogInternal( OSRF_LOG_MARK, "Adding socket node with fd %d", sock_fd);
91         socket_node* new_node = safe_malloc(sizeof(socket_node));
92
93         new_node->endpoint      = endpoint;
94         new_node->addr_type     = addr_type;
95         new_node->sock_fd       = sock_fd;
96         new_node->next          = NULL;
97         new_node->parent_id = 0;
98         if(parent_id > 0)
99                 new_node->parent_id = parent_id;
100
101         new_node->next                  = mgr->socket;
102         mgr->socket                             = new_node;
103         return new_node;
104 }
105
106 /**
107         @brief Create an TCP INET listener socket and add it to a socket_manager's list.
108         @param mgr Pointer to the socket manager that will own the socket.
109         @param port The port number to bind to.
110         @param listen_ip The IP address to bind to; or, NULL for INADDR_ANY.
111         @return The socket's file descriptor if successful; otherwise -1.
112
113         Calls: socket(), bind(), and listen().  Creates a SERVER_SOCKET.
114 */
115 int socket_open_tcp_server(socket_manager* mgr, int port, const char* listen_ip) {
116
117         if( mgr == NULL ) {
118                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): NULL mgr"); 
119                 return -1;
120         }
121
122         int sock_fd;
123         struct sockaddr_in server_addr;
124
125         server_addr.sin_family = AF_INET;
126
127         if(listen_ip != NULL) {
128                 struct in_addr addr;
129                 if( inet_aton( listen_ip, &addr ) )
130                         server_addr.sin_addr.s_addr = addr.s_addr;
131                 else {
132                         osrfLogError( OSRF_LOG_MARK, "Listener address is invalid: %s", listen_ip );
133                         return -1;
134                 }
135         } else {
136                 server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
137         }
138
139         server_addr.sin_port = htons(port);
140
141         errno = 0;
142         sock_fd = socket(AF_INET, SOCK_STREAM, 0);
143         if(sock_fd < 0) {
144                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): Unable to create TCP socket: %s",
145                         strerror( errno ) );
146                 return -1;
147         }
148
149         errno = 0;
150         if(bind( sock_fd, (struct sockaddr*) &server_addr, sizeof(server_addr)) < 0) {
151                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): cannot bind to port %d: %s",
152                         port, strerror( errno ) );
153                 close( sock_fd );
154                 return -1;
155         }
156
157         errno = 0;
158         if(listen(sock_fd, 20) == -1) {
159                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): listen() returned error: %s",
160                         strerror( errno ) );
161                 close( sock_fd );
162                 return -1;
163         }
164
165         _socket_add_node(mgr, SERVER_SOCKET, INET, sock_fd, 0);
166         return sock_fd;
167 }
168
169 /**
170         @brief Create a UNIX domain listener socket and add it to the socket_manager's list.
171         @param mgr Pointer to the socket_manager that will own the socket.
172         @param path Name of the socket within the file system.
173         @return The socket's file descriptor if successful; otherwise -1.
174
175         Calls: socket(), bind(), listen().  Creates a SERVER_SOCKET.
176
177         Apply socket option TCP_NODELAY in order to reduce latency.
178 */
179 int socket_open_unix_server(socket_manager* mgr, const char* path) {
180         if(mgr == NULL || path == NULL) return -1;
181
182         osrfLogDebug( OSRF_LOG_MARK, "opening unix socket at %s", path);
183         int sock_fd;
184         struct sockaddr_un server_addr;
185
186         if(strlen(path) > sizeof(server_addr.sun_path) - 1)
187         {
188                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): path too long: %s",
189                         path );
190                 return -1;
191         }
192
193         errno = 0;
194         sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
195         if(sock_fd < 0){
196                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): socket() failed: %s",
197                         strerror( errno ) );
198                 return -1;
199         }
200
201         server_addr.sun_family = AF_UNIX;
202         strcpy(server_addr.sun_path, path);
203
204         errno = 0;
205         if( bind(sock_fd, (struct sockaddr*) &server_addr, 
206                                 sizeof(struct sockaddr_un)) < 0) {
207                 osrfLogWarning( OSRF_LOG_MARK, 
208                         "socket_open_unix_server(): cannot bind to unix port %s: %s",
209                         path, strerror( errno ) );
210                 close( sock_fd );
211                 return -1;
212         }
213
214         errno = 0;
215         if(listen(sock_fd, 20) == -1) {
216                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): listen() returned error: %s",
217                         strerror( errno ) );
218                 close( sock_fd );
219                 return -1;
220         }
221
222         osrfLogDebug( OSRF_LOG_MARK, "unix socket successfully opened");
223         
224         int i = 1;
225
226         /* causing problems with router for some reason ... */
227         //osrfLogDebug( OSRF_LOG_MARK, "Setting SO_REUSEADDR");
228         //setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));
229         
230         //osrfLogDebug( OSRF_LOG_MARK, "Setting TCP_NODELAY");
231         setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
232
233         _socket_add_node(mgr, SERVER_SOCKET, UNIX, sock_fd, 0);
234         return sock_fd;
235 }
236
237
238 /**
239         @brief Create a UDP socket for a server, and add it to a socket_manager's list.
240         @param mgr Pointer to the socket_manager that will own the socket.
241         @param port The port number to bind to.
242         @param listen_ip The IP address to bind to, or NULL for INADDR_ANY.
243         @return The socket's file descriptor if successful; otherwise -1.
244
245         Calls: socket(), bind().  Creates a SERVER_SOCKET.
246 */
247 int socket_open_udp_server( 
248                 socket_manager* mgr, int port, const char* listen_ip ) {
249
250         int sockfd;
251         struct sockaddr_in server_addr;
252
253         server_addr.sin_family = AF_INET;
254         server_addr.sin_port = htons(port);
255         if(listen_ip) {
256                 struct in_addr addr;
257                 if( inet_aton( listen_ip, &addr ) )
258                         server_addr.sin_addr.s_addr = addr.s_addr;
259                 else {
260                         osrfLogError( OSRF_LOG_MARK, "UDP listener address is invalid: %s", listen_ip );
261                         return -1;
262                 }
263         } else
264                 server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
265
266         errno = 0;
267         if( (sockfd = socket( AF_INET, SOCK_DGRAM, 0 )) < 0 ) {
268                 osrfLogWarning( OSRF_LOG_MARK, "Unable to create UDP socket: %s", strerror( errno ) );
269                 return -1;
270         }
271
272         errno = 0;
273         if( (bind (sockfd, (struct sockaddr *) &server_addr,sizeof(server_addr))) ) {
274                 osrfLogWarning( OSRF_LOG_MARK, "Unable to bind to UDP port %d: %s",
275                         port, strerror( errno ) );
276                 close( sockfd );
277                 return -1;
278         }
279
280         _socket_add_node(mgr, SERVER_SOCKET, INET, sockfd, 0);
281         return sockfd;
282 }
283
284
285 /**
286         @brief Create a client TCP socket, connect with it, and add it to a socket_manager's list.
287         @param mgr Pointer to the socket_manager that will own the socket.
288         @param port What port number to connect to.
289         @param dest_addr Host name or IP address of the server to which we are connecting.
290         @return The socket's file descriptor if successful; otherwise -1.
291
292         Calls: getaddrinfo(), socket(), connect().  Creates a CLIENT_SOCKET.
293
294         Applies socket option TCP_NODELAY in order to reduce latency.
295 */
296 int socket_open_tcp_client(socket_manager* mgr, int port, const char* dest_addr) {
297
298         struct sockaddr_in remoteAddr;
299         int sock_fd;
300
301         // ------------------------------------------------------------------
302         // Get the IP address of the hostname (for TCP only)
303         // ------------------------------------------------------------------
304         struct addrinfo hints = { 0, 0, 0, 0, 0, NULL, NULL, NULL };
305         hints.ai_socktype = SOCK_STREAM;
306         struct addrinfo* addr_info = NULL;
307         errno = 0;
308         int rc = getaddrinfo( dest_addr, NULL, &hints, &addr_info );
309         if( rc || ! addr_info ) {
310                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): No Such Host => %s: %s",
311                         dest_addr, gai_strerror( rc ) );
312                 return -1;
313         }
314
315         // Look for an address supporting IPv4.  Someday we'll look for
316         // either IPv4 or IPv6, and branch according to what we find.
317         while( addr_info && addr_info->ai_family != PF_INET ) {
318                 addr_info = addr_info->ai_next;
319         }
320
321         if( ! addr_info ) {
322                 osrfLogWarning( OSRF_LOG_MARK,
323                         "socket_open_tcp_client(): Host %s does not support IPV4", dest_addr );
324                 return -1;      
325         }
326
327         // ------------------------------------------------------------------
328         // Create the socket
329         // ------------------------------------------------------------------
330         errno = 0;
331         if( (sock_fd = socket( AF_INET, SOCK_STREAM, 0 )) < 0 ) {
332                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): Cannot create TCP socket: %s",
333                         strerror( errno ) );
334                 return -1;
335         }
336
337         int i = 1;
338         setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
339
340         // ------------------------------------------------------------------
341         // Construct server info struct
342         // ------------------------------------------------------------------
343         memset( &remoteAddr, 0, sizeof(remoteAddr));
344         remoteAddr.sin_family = AF_INET;
345         remoteAddr.sin_port = htons( port );
346         struct sockaddr_in* ai_addr_in = (struct sockaddr_in*) addr_info->ai_addr;
347         remoteAddr.sin_addr.s_addr = ai_addr_in->sin_addr.s_addr;
348
349         freeaddrinfo( addr_info );
350
351         // ------------------------------------------------------------------
352         // Connect to server
353         // ------------------------------------------------------------------
354         errno = 0;
355         if( connect( sock_fd, (struct sockaddr*) &remoteAddr, sizeof( struct sockaddr_in ) ) < 0 ) {
356                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): Cannot connect to server %s: %s",
357                         dest_addr, strerror(errno) );
358                 close( sock_fd );
359                 return -1;
360         }
361
362         _socket_add_node(mgr, CLIENT_SOCKET, INET, sock_fd, -1 );
363
364         return sock_fd;
365 }
366
367
368 /**
369         @brief Create a client UDP socket and add it to a socket_manager's list.
370         @param mgr Pointer to the socket_manager that will own the socket.
371         @return The socket's file descriptor if successful; otherwise -1.
372
373         Calls: socket().  Creates a CLIENT_SOCKET.
374 */
375 int socket_open_udp_client( socket_manager* mgr ) {
376
377         int sockfd;
378
379         errno = 0;
380         if( (sockfd = socket(AF_INET,SOCK_DGRAM,0)) < 0 ) {
381                 osrfLogWarning( OSRF_LOG_MARK,
382                         "socket_open_udp_client(): Unable to create UDP socket: %s", strerror( errno ) );
383                 return -1;
384         }
385
386         _socket_add_node(mgr, CLIENT_SOCKET, INET, sockfd, -1 );
387
388         return sockfd;
389 }
390
391
392 /**
393         @brief Create a UNIX domain client socket, connect with it, add it to the socket_manager's list
394         @param mgr Pointer to the socket_manager that will own the socket.
395         @param sock_path Name of the socket within the file system.
396         @return The socket's file descriptor if successful; otherwise -1.
397
398         Calls: socket(), connect().  Creates a CLIENT_SOCKET.
399 */
400 int socket_open_unix_client(socket_manager* mgr, const char* sock_path) {
401
402         int sock_fd, len;
403         struct sockaddr_un usock;
404
405         if(strlen(sock_path) > sizeof(usock.sun_path) - 1)
406         {
407                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_client(): path too long: %s",
408                    sock_path );
409                 return -1;
410         }
411
412         errno = 0;
413         if( (sock_fd = socket( AF_UNIX, SOCK_STREAM, 0 )) < 0 ) {
414                 osrfLogWarning(  OSRF_LOG_MARK, "socket_open_unix_client(): Cannot create UNIX socket: %s", strerror( errno ) );
415                 return -1;
416         }
417
418         usock.sun_family = AF_UNIX;
419         strcpy( usock.sun_path, sock_path );
420
421         len = sizeof( usock.sun_family ) + strlen( usock.sun_path );
422
423         errno = 0;
424         if( connect( sock_fd, (struct sockaddr *) &usock, len ) < 0 ) {
425                 osrfLogWarning(  OSRF_LOG_MARK, "Error connecting to unix socket: %s",
426                         strerror( errno ) );
427                 close( sock_fd );
428                 return -1;
429         }
430
431         _socket_add_node(mgr, CLIENT_SOCKET, UNIX, sock_fd, -1 );
432
433         return sock_fd;
434 }
435
436
437 /**
438         @brief Search a socket_manager's list for a socket node for a given file descriptor.
439         @param mgr Pointer to the socket manager.
440         @param sock_fd The file descriptor to be sought.
441         @return A pointer to the socket_node if found; otherwise NULL.
442
443         Traverse a linked list owned by the socket_manager.
444 */
445 static socket_node* socket_find_node(socket_manager* mgr, int sock_fd) {
446         if(mgr == NULL) return NULL;
447         socket_node* node = mgr->socket;
448         while(node) {
449                 if(node->sock_fd == sock_fd)
450                         return node;
451                 node = node->next;
452         }
453         return NULL;
454 }
455
456 /* removes the node with the given sock_fd from the list and frees it */
457 /**
458         @brief Remove a socket node for a given fd from a socket_manager's list.
459         @param mgr Pointer to the socket_manager.
460         @param sock_fd The file descriptor whose socket_node is to be removed.
461
462         This function does @em not close the socket.  It just removes a node from the list, and
463         frees it.  The disposition of the socket is the responsibility of the calling code.
464 */
465 static void socket_remove_node(socket_manager* mgr, int sock_fd) {
466
467         if(mgr == NULL) return;
468
469         osrfLogDebug( OSRF_LOG_MARK, "removing socket %d", sock_fd);
470
471         socket_node* head = mgr->socket;
472         socket_node* tail = head;
473         if(head == NULL) return;
474
475         /* if removing the first node in the list */
476         if(head->sock_fd == sock_fd) {
477                 mgr->socket = head->next;
478                 free(head);
479                 return;
480         }
481
482         head = head->next;
483
484         /* if removing any other node */
485         while(head) {
486                 if(head->sock_fd == sock_fd) {
487                         tail->next = head->next;
488                         free(head);
489                         return;
490                 }
491                 tail = head;
492                 head = head->next;
493         }
494 }
495
496
497 /**
498         @brief Write to the log: a list of socket_nodes in a socket_manager's list.
499         @param mgr Pointer to the socket_manager.
500
501         For testing and debugging.
502
503         The messages are issued as DEBG messages, and show each file descriptor and its parent.
504 */
505 void _socket_print_list(socket_manager* mgr) {
506         if(mgr == NULL) return;
507         socket_node* node = mgr->socket;
508         osrfLogDebug( OSRF_LOG_MARK, "socket_node list: [");
509         while(node) {
510                 osrfLogDebug( OSRF_LOG_MARK, "sock_fd: %d | parent_id: %d", 
511                                 node->sock_fd, node->parent_id);
512                 node = node->next;
513         }
514         osrfLogDebug( OSRF_LOG_MARK, "]");
515 }
516
517 /**
518         @brief Send a nul-terminated string over a socket.
519         @param sock_fd The file descriptor for the socket.
520         @param data Pointer to the string to be sent.
521         @return 0 if successful, -1 if not.
522
523         This function is a thin wrapper for _socket_send().
524 */
525 int socket_send(int sock_fd, const char* data) {
526         return _socket_send( sock_fd, data, 0);
527 }
528
529 /**
530         @brief Send a nul-terminated string over a socket.
531         @param sock_fd The file descriptor for the socket.
532         @param data Pointer to the string to be sent.
533         @param flags A set of bitflags to be passed to send().
534         @return 0 if successful, -1 if not.
535
536         This function is the final common pathway for all outgoing socket traffic.
537 */
538 static int _socket_send(int sock_fd, const char* data, int flags) {
539
540         signal(SIGPIPE, SIG_IGN); /* in case a unix socket was closed */
541
542         errno = 0;
543         size_t r = send( sock_fd, data, strlen(data), flags );
544         int local_errno = errno;
545
546         if( r == -1 ) {
547                 osrfLogWarning( OSRF_LOG_MARK, "_socket_send(): Error sending data with return %d", r );
548                 osrfLogWarning( OSRF_LOG_MARK, "Last Sys Error: %s", strerror(local_errno));
549                 return -1;
550         }
551
552         return 0;
553 }
554
555
556 /* sends the given data to the given socket. 
557  * sets the send flag MSG_DONTWAIT which will allow the 
558  * process to continue even if the socket buffer is full
559  * returns 0 on success, -1 otherwise */
560 //int socket_send_nowait( int sock_fd, const char* data) {
561 //      return _socket_send( sock_fd, data, MSG_DONTWAIT);
562 //}
563
564
565 /**
566         @brief Wait for a socket to be ready to send, and then send a string over it.
567         @param sock_fd File descriptor of the socket.
568         @param data Pointer to a nul-terminated string to be sent.
569         @param usecs How long to wait, in microseconds, before timing out.
570         @return 0 if successful, -1 if not.
571
572         The socket may not accept all the data we want to give it.
573 */
574 int socket_send_timeout( int sock_fd, const char* data, int usecs ) {
575
576         fd_set write_set;
577         FD_ZERO( &write_set );
578         FD_SET( sock_fd, &write_set );
579
580         const int mil = 1000000;
581         int secs = (int) usecs / mil;
582         usecs = usecs - (secs * mil);
583
584         struct timeval tv;
585         tv.tv_sec = secs;
586         tv.tv_usec = usecs;
587
588         errno = 0;
589         int ret = select( sock_fd + 1, NULL, &write_set, NULL, &tv);
590         if( ret > 0 ) return _socket_send( sock_fd, data, 0);
591
592         osrfLogError(OSRF_LOG_MARK, "socket_send_timeout(): "
593                 "timed out on send for socket %d after %d secs, %d usecs: %s",
594                 sock_fd, secs, usecs, strerror( errno ) );
595
596         return -1;
597 }
598
599
600 /* disconnects the node with the given sock_fd and removes
601         it from the socket set */
602 /**
603         @brief Close a socket, and remove it from the socket_manager's list.
604         @param mgr Pointer to the socket_manager.
605         @param sock_fd File descriptor for the socket to be closed.
606
607         We close the socket before determining whether it belongs to the socket_manager in question.
608 */
609 void socket_disconnect(socket_manager* mgr, int sock_fd) {
610         osrfLogInternal( OSRF_LOG_MARK, "Closing socket %d", sock_fd);
611         close( sock_fd );
612         socket_remove_node(mgr, sock_fd);
613 }
614
615
616 /**
617         @brief Determine whether a socket is valid.
618         @param sock_fd File descriptor for the socket.
619         @return 1 if the socket is valid, or 0 if it isn't.
620
621         The test is based on a call to select().  If the socket is valid but is not ready to be
622         written to, we wait until it is ready, then return 1.
623
624         If the select() fails, it may be because it was interrupted by a signal.  In that case
625         we try again.  Otherwise we assume that the socket is no longer valid.  This can happen
626         if, for example, the other end of a connection has closed the connection.
627
628         The select() can also fail if it is unable to allocate enough memory for its own internal
629         use.  If that happens, we may erroneously report a valid socket as invalid, but we
630         probably wouldn't be able to use it anyway if we're that close to exhausting memory.
631 */
632 int socket_connected(int sock_fd) {
633         fd_set read_set;
634         FD_ZERO( &read_set );
635         FD_SET( sock_fd, &read_set );
636         while( 1 ) {
637                 if( select( sock_fd + 1, &read_set, NULL, NULL, NULL) == -1 )
638                         return 0;
639                 else if( EINTR == errno )
640                         continue;
641                 else
642                         return 1;
643         }
644 }
645
646 /**
647         @brief Look for input on a given socket.  If you find some, react to it.
648         @param mgr Pointer to the socket_manager that presumably owns the socket.
649         @param timeout Timeout interval, in seconds (see notes).
650         @param sock_fd The file descriptor to look at.
651         @return 0 if successful, or -1 if a timeout or other error occurs, or if the sender
652                 closes the connection.
653
654         If @a timeout is -1, wait indefinitely for input activity to appear.  If @a timeout is
655         zero, don't wait at all.  If @a timeout is positive, wait that number of seconds
656         before timing out.  If @a timeout has a negative value other than -1, the results are not
657         well defined, but we'll probably get an EINVAL error from select().
658
659         If we detect activity, branch on the type of socket:
660
661         - If it's a listener, accept a new connection, and add the new socket to the
662         socket_manager's list, without actually reading any data.
663         - Otherwise, read as much data as is available from the input socket, passing it a
664         buffer at a time to whatever callback function has been defined to the socket_manager.
665 */
666 int socket_wait( socket_manager* mgr, int timeout, int sock_fd ) {
667
668         int retval = 0;
669         fd_set read_set;
670         FD_ZERO( &read_set );
671         FD_SET( sock_fd, &read_set );
672
673         struct timeval tv;
674         tv.tv_sec = timeout;
675         tv.tv_usec = 0;
676         errno = 0;
677
678         if( timeout < 0 ) {
679
680                 // If timeout is -1, we block indefinitely
681                 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, NULL)) == -1 ) {
682                         osrfLogDebug( OSRF_LOG_MARK, "Call to select() interrupted: Sys Error: %s",
683                                         strerror(errno));
684                         return -1;
685                 }
686
687         } else if( timeout > 0 ) { /* timeout of 0 means don't block */
688
689                 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, &tv)) == -1 ) {
690                         osrfLogDebug( OSRF_LOG_MARK, "Call to select() interrupted: Sys Error: %s",
691                                         strerror(errno));
692                         return -1;
693                 }
694         }
695
696         osrfLogInternal( OSRF_LOG_MARK, "%d active sockets after select()", retval);
697
698         socket_node* node = socket_find_node(mgr, sock_fd);
699         if( node ) {
700                 if( node->endpoint == SERVER_SOCKET ) {
701                         _socket_handle_new_client( mgr, node );  // accept new connection
702                 } else {
703                         int status = _socket_handle_client_data( mgr, node );   // read data
704                         if( status == -1 ) {
705                                 socket_remove_node(mgr, sock_fd);
706                                 return -1;
707                         }
708                 }
709                 return 0;
710         }
711         else
712                 return -1;    // No such file descriptor for this socket_manager
713 }
714
715
716 /**
717         @brief Wait for input on all of a socket_manager's sockets; react to any input found.
718         @param mgr Pointer to the socket_manager.
719         @param timeout How many seconds to wait before timing out (see notes).
720         @return 0 if successful, or -1 if a timeout or other error occurs.
721
722         If @a timeout is -1, wait indefinitely for input activity to appear.  If @a timeout is
723         zero, don't wait at all.  If @a timeout is positive, wait that number of seconds
724         before timing out.  If @a timeout has a negative value other than -1, the results are not
725         well defined, but we'll probably get an EINVAL error from select().
726
727         For each active socket found:
728
729         - If it's a listener, accept a new connection, and add the new socket to the
730         socket_manager's list, without actually reading any data.
731         - Otherwise, read as much data as is available from the input socket, passing it a
732         buffer at a time to whatever callback function has been defined to the socket_manager.
733  */
734 int socket_wait_all(socket_manager* mgr, int timeout) {
735
736         if(mgr == NULL) {
737                 osrfLogWarning( OSRF_LOG_MARK,  "socket_wait_all(): null mgr" );
738                 return -1;
739         }
740
741         int num_active = 0;
742         fd_set read_set;
743         FD_ZERO( &read_set );
744
745         socket_node* node = mgr->socket;
746         int max_fd = 0;
747         while(node) {
748                 osrfLogInternal( OSRF_LOG_MARK, "Adding socket fd %d to select set",node->sock_fd);
749                 FD_SET( node->sock_fd, &read_set );
750                 if(node->sock_fd > max_fd) max_fd = node->sock_fd;
751                 node = node->next;
752         }
753         max_fd += 1;
754
755         struct timeval tv;
756         tv.tv_sec = timeout;
757         tv.tv_usec = 0;
758         errno = 0;
759
760         if( timeout < 0 ) {  
761
762                 // If timeout is -1, there is no timeout passed to the call to select
763                 if( (num_active = select( max_fd, &read_set, NULL, NULL, NULL)) == -1 ) {
764                         osrfLogWarning( OSRF_LOG_MARK, "select() call aborted: %s", strerror(errno));
765                         return -1;
766                 }
767
768         } else if( timeout != 0 ) { /* timeout of 0 means don't block */
769
770                 if( (num_active = select( max_fd, &read_set, NULL, NULL, &tv)) == -1 ) {
771                         osrfLogWarning( OSRF_LOG_MARK, "select() call aborted: %s", strerror(errno));
772                         return -1;
773                 }
774         }
775
776         osrfLogDebug( OSRF_LOG_MARK, "%d active sockets after select()", num_active);
777         
778         node = mgr->socket;
779         int handled = 0;
780
781         while(node && (handled < num_active)) {
782
783                 socket_node* next_node = node->next;
784                 int sock_fd = node->sock_fd;
785
786                 /* does this socket have data? */
787                 if( FD_ISSET( sock_fd, &read_set ) ) {
788
789                         osrfLogInternal( OSRF_LOG_MARK, "Socket %d active", sock_fd);
790                         handled++;
791                         FD_CLR(sock_fd, &read_set);
792
793                         if(node->endpoint == SERVER_SOCKET) 
794                                 _socket_handle_new_client(mgr, node);
795
796                         else {
797                                 if( _socket_handle_client_data(mgr, node) == -1 ) {
798                                         /* someone may have yanked a socket_node out from under us */
799                                         socket_remove_node( mgr, sock_fd );
800                                 }
801                         }
802                 }
803
804                 node = next_node;
805         } // is_set
806
807         return 0;
808 }
809
810 /**
811         @brief Accept a new socket from a listener, and add it to the socket_manager's list.
812         @param mgr Pointer to the socket_manager that will own the new socket.
813         @param node Pointer to the socket_node for the listener socket.
814         @return 0 if successful, or -1 if not.
815
816         Call: accept().  Creates a CLIENT_SOCKET (even though the socket resides on the server).
817 */
818 static int _socket_handle_new_client(socket_manager* mgr, socket_node* node) {
819         if(mgr == NULL || node == NULL) return -1;
820
821         errno = 0;
822         int new_sock_fd;
823         new_sock_fd = accept(node->sock_fd, NULL, NULL);
824         if(new_sock_fd < 0) {
825                 osrfLogWarning( OSRF_LOG_MARK, "_socket_handle_new_client(): accept() failed: %s",
826                         strerror( errno ) );
827                 return -1;
828         }
829
830         if(node->addr_type == INET) {
831                 _socket_add_node(mgr, CLIENT_SOCKET, INET, new_sock_fd, node->sock_fd);
832                 osrfLogDebug( OSRF_LOG_MARK, "Adding new INET client for %d", node->sock_fd);
833
834         } else if(node->addr_type == UNIX) {
835                 _socket_add_node(mgr, CLIENT_SOCKET, UNIX, new_sock_fd, node->sock_fd);
836                 osrfLogDebug( OSRF_LOG_MARK, "Adding new UNIX client for %d", node->sock_fd);
837         }
838
839         return 0;
840 }
841
842
843 /**
844         @brief Receive data on a streaming socket.
845         @param mgr Pointer to the socket_manager that owns the socket_node.
846         @param node Pointer to the socket_node that owns the socket.
847         @return 0 if successful, or -1 upon failure.
848
849         Receive one or more buffers until no more bytes are available for receipt.  Add a
850         terminal nul to each buffer and pass it to a callback function previously defined by the
851         application to the socket_manager.
852
853         If the sender closes the connection, call another callback function, if one has been
854         defined.
855
856         Even when the function returns successfully, the received message may not be complete --
857         there may be more data that hasn't arrived yet.  It is the responsibility of the
858         calling code to recognize message boundaries.
859
860         Called only for a CLIENT_SOCKET.
861 */
862 static int _socket_handle_client_data(socket_manager* mgr, socket_node* node) {
863         if(mgr == NULL || node == NULL) return -1;
864
865         char buf[RBUFSIZE];
866         int read_bytes;
867         int sock_fd = node->sock_fd;
868
869         set_fl(sock_fd, O_NONBLOCK);
870
871         osrfLogInternal( OSRF_LOG_MARK, "%ld : Received data at %f\n",
872                         (long) getpid(), get_timestamp_millis());
873
874         while( (read_bytes = recv(sock_fd, buf, RBUFSIZE-1, 0) ) > 0 ) {
875                 buf[read_bytes] = '\0';
876                 osrfLogInternal( OSRF_LOG_MARK, "Socket %d Read %d bytes and data: %s",
877                                 sock_fd, read_bytes, buf);
878                 if(mgr->data_received)
879                         mgr->data_received(mgr->blob, mgr, sock_fd, buf, node->parent_id);
880         }
881         int local_errno = errno; /* capture errno as set by recv() */
882
883         if(socket_find_node(mgr, sock_fd)) {  /* someone may have closed this socket */
884                 clr_fl(sock_fd, O_NONBLOCK);
885                 if(read_bytes < 0) {
886                         // EAGAIN would have meant that no more data was available
887                         if(local_errno != EAGAIN)   // but if that's not the case...
888                                 osrfLogWarning( OSRF_LOG_MARK, " * Error reading socket with error %s",
889                                         strerror(local_errno) );
890                 }
891
892         } else { return -1; } /* inform the caller that this node has been tampered with */
893
894         if(read_bytes == 0) {  /* socket closed by client */
895                 if(mgr->on_socket_closed) {
896                         mgr->on_socket_closed(mgr->blob, sock_fd);
897                 }
898                 return -1;
899         }
900
901         return 0;
902
903 }
904
905
906 /**
907         @brief Destroy a socket_manager, and close all of its sockets.
908         @param mgr Pointer to the socket_manager to be destroyed.
909 */
910 void socket_manager_free(socket_manager* mgr) {
911         if(mgr == NULL) return;
912         socket_node* tmp;
913         while(mgr->socket) {
914                 tmp = mgr->socket->next;
915                 socket_disconnect(mgr, mgr->socket->sock_fd);
916                 mgr->socket = tmp;
917         }
918         free(mgr);
919
920 }