]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/libopensrf/socket_bundle.c
Eliminated _socket_route_data_id() as a separate function, incorporating
[OpenSRF.git] / src / libopensrf / socket_bundle.c
1 /**
2         @file socket_bundle.c
3         @brief Collection of socket-handling routines.
4 */
5
6 #include <opensrf/socket_bundle.h>
7
8 struct socket_node_struct {
9         int endpoint;           /* SERVER_SOCKET or CLIENT_SOCKET */
10         int addr_type;          /* INET or UNIX */
11         int sock_fd;
12         int parent_id;          /* if we're a new client for a server socket, 
13         this points to the server socket we spawned from */
14         struct socket_node_struct* next;
15 };
16
17 /* buffer used to read from the sockets */
18 #define RBUFSIZE 1024
19
20 static socket_node* _socket_add_node(socket_manager* mgr,
21                 int endpoint, int addr_type, int sock_fd, int parent_id );
22 static socket_node* socket_find_node(socket_manager* mgr, int sock_fd);
23 static void socket_remove_node(socket_manager*, int sock_fd);
24 static int _socket_send(int sock_fd, const char* data, int flags);
25 static int _socket_route_data(socket_manager* mgr, int num_active, fd_set* read_set);
26 static int _socket_handle_new_client(socket_manager* mgr, socket_node* node);
27 static int _socket_handle_client_data(socket_manager* mgr, socket_node* node);
28
29
30 /* -------------------------------------------------------------------- 
31         Test Code 
32         -------------------------------------------------------------------- */
33 /*
34 int count = 0;
35 void printme(void* blob, socket_manager* mgr, 
36                 int sock_fd, char* data, int parent_id) {
37
38         fprintf(stderr, "Got data from socket %d with parent %d => %s", 
39                         sock_fd, parent_id, data );
40
41         socket_send(sock_fd, data);
42
43         if(count++ > 2) {
44                 socket_disconnect(mgr, sock_fd);
45                 _socket_print_list(mgr);
46         }
47 }
48
49 int main(int argc, char* argv[]) {
50         socket_manager manager;
51         memset(&manager, 0, sizeof(socket_manager));
52         int port = 11000;
53         if(argv[1])
54                 port = atoi(argv[1]);
55
56         manager.data_received = &printme;
57         socket_open_tcp_server(&manager, port);
58
59         while(1)
60                 socket_wait_all(&manager, -1);
61
62         return 0;
63 }
64 */
65 /* -------------------------------------------------------------------- */
66
67
68 /**
69         @brief Create a new socket_node and add it to a socket_manager's list.
70         @param mgr Pointer to the socket_manager.
71         @param endpoint SERVER_SOCKET or CLIENT_SOCKET, denoting how the socket is to be used.
72         @param addr_type address type: INET or UNIX.
73         @param sock_fd sock_fd for the new socket_node.
74         @param parent_id parent_id for the new node.
75         @return Pointer to the new socket_node.
76
77         If @a parent_id is negative, the new socket_node receives a parent_id of 0.
78 */
79 static socket_node* _socket_add_node(socket_manager* mgr, 
80                 int endpoint, int addr_type, int sock_fd, int parent_id ) {
81
82         if(mgr == NULL) return NULL;
83         osrfLogInternal( OSRF_LOG_MARK, "Adding socket node with fd %d", sock_fd);
84         socket_node* new_node = safe_malloc(sizeof(socket_node));
85
86         new_node->endpoint      = endpoint;
87         new_node->addr_type     = addr_type;
88         new_node->sock_fd       = sock_fd;
89         new_node->next          = NULL;
90         new_node->parent_id = 0;
91         if(parent_id > 0)
92                 new_node->parent_id = parent_id;
93
94         new_node->next                  = mgr->socket;
95         mgr->socket                             = new_node;
96         return new_node;
97 }
98
99 /**
100         @brief Create an TCP INET listener socket and add it to a socket_manager's list.
101         @param mgr Pointer to the socket manager that will own the socket.
102         @param port The port number to bind to.
103         @param listen_ip The IP address to bind to; or, NULL for INADDR_ANY.
104         @return The socket fd if successful; otherwise -1.
105
106         Calls: socket(), bind(), and listen().  Creates a SERVER_SOCKET.
107 */
108 int socket_open_tcp_server(socket_manager* mgr, int port, const char* listen_ip) {
109
110         if( mgr == NULL ) {
111                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): NULL mgr"); 
112                 return -1;
113         }
114
115         int sock_fd;
116         struct sockaddr_in server_addr;
117
118         server_addr.sin_family = AF_INET;
119
120         if(listen_ip != NULL) {
121                 struct in_addr addr;
122                 if( inet_aton( listen_ip, &addr ) )
123                         server_addr.sin_addr.s_addr = addr.s_addr;
124                 else {
125                         osrfLogError( OSRF_LOG_MARK, "Listener address is invalid: %s", listen_ip );
126                         return -1;
127                 }
128         } else {
129                 server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
130         }
131
132         server_addr.sin_port = htons(port);
133
134         errno = 0;
135         sock_fd = socket(AF_INET, SOCK_STREAM, 0);
136         if(sock_fd < 0) {
137                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): Unable to create TCP socket: %s",
138                         strerror( errno ) );
139                 return -1;
140         }
141
142         errno = 0;
143         if(bind( sock_fd, (struct sockaddr*) &server_addr, sizeof(server_addr)) < 0) {
144                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): cannot bind to port %d: %s",
145                         port, strerror( errno ) );
146                 close( sock_fd );
147                 return -1;
148         }
149
150         errno = 0;
151         if(listen(sock_fd, 20) == -1) {
152                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): listen() returned error: %s",
153                         strerror( errno ) );
154                 close( sock_fd );
155                 return -1;
156         }
157
158         _socket_add_node(mgr, SERVER_SOCKET, INET, sock_fd, 0);
159         return sock_fd;
160 }
161
162 /**
163         @brief Create a UNIX domain listener socket and add it to the socket_manager's list.
164         @param mgr Pointer to the socket_manager that will own the socket.
165         @param path Name of the socket within the file system.
166         @return The socket fd if successful; otherwise -1.
167
168         Calls: socket(), bind(), listen().  Creates a SERVER_SOCKET.
169
170         Applies socket option TCP_NODELAY in order to reduce latency.
171  */
172 int socket_open_unix_server(socket_manager* mgr, const char* path) {
173         if(mgr == NULL || path == NULL) return -1;
174
175         osrfLogDebug( OSRF_LOG_MARK, "opening unix socket at %s", path);
176         int sock_fd;
177         struct sockaddr_un server_addr;
178
179         if(strlen(path) > sizeof(server_addr.sun_path) - 1)
180         {
181                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): path too long: %s",
182                         path );
183                 return -1;
184         }
185
186         errno = 0;
187         sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
188         if(sock_fd < 0){
189                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): socket() failed: %s",
190                         strerror( errno ) );
191                 return -1;
192         }
193
194         server_addr.sun_family = AF_UNIX;
195         strcpy(server_addr.sun_path, path);
196
197         errno = 0;
198         if( bind(sock_fd, (struct sockaddr*) &server_addr, 
199                                 sizeof(struct sockaddr_un)) < 0) {
200                 osrfLogWarning( OSRF_LOG_MARK, 
201                         "socket_open_unix_server(): cannot bind to unix port %s: %s",
202                         path, strerror( errno ) );
203                 close( sock_fd );
204                 return -1;
205         }
206
207         errno = 0;
208         if(listen(sock_fd, 20) == -1) {
209                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): listen() returned error: %s",
210                         strerror( errno ) );
211                 close( sock_fd );
212                 return -1;
213         }
214
215         osrfLogDebug( OSRF_LOG_MARK, "unix socket successfully opened");
216         
217         int i = 1;
218
219         /* causing problems with router for some reason ... */
220         //osrfLogDebug( OSRF_LOG_MARK, "Setting SO_REUSEADDR");
221         //setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));
222         
223         //osrfLogDebug( OSRF_LOG_MARK, "Setting TCP_NODELAY");
224         setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
225
226         _socket_add_node(mgr, SERVER_SOCKET, UNIX, sock_fd, 0);
227         return sock_fd;
228 }
229
230
231 /**
232         @brief Create a UDP socket for a server, and add it to a socket_manager's list.
233         @param mgr Pointer to the socket_manager that will own the socket.
234         @param port The port number to bind to.
235         @param listen_ip The IP address to bind to, or NULL for INADDR_ANY.
236         @return The socket fd if successful; otherwise -1.
237
238         Calls: socket(), bind().  Creates a SERVER_SOCKET.
239 */
240 int socket_open_udp_server( 
241                 socket_manager* mgr, int port, const char* listen_ip ) {
242
243         int sockfd;
244         struct sockaddr_in server_addr;
245
246         server_addr.sin_family = AF_INET;
247         server_addr.sin_port = htons(port);
248         if(listen_ip) {
249                 struct in_addr addr;
250                 if( inet_aton( listen_ip, &addr ) )
251                         server_addr.sin_addr.s_addr = addr.s_addr;
252                 else {
253                         osrfLogError( OSRF_LOG_MARK, "UDP listener address is invalid: %s", listen_ip );
254                         return -1;
255                 }
256         } else
257                 server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
258
259         errno = 0;
260         if( (sockfd = socket( AF_INET, SOCK_DGRAM, 0 )) < 0 ) {
261                 osrfLogWarning( OSRF_LOG_MARK, "Unable to create UDP socket: %s", strerror( errno ) );
262                 return -1;
263         }
264
265         errno = 0;
266         if( (bind (sockfd, (struct sockaddr *) &server_addr,sizeof(server_addr))) ) {
267                 osrfLogWarning( OSRF_LOG_MARK, "Unable to bind to UDP port %d: %s",
268                         port, strerror( errno ) );
269                 close( sockfd );
270                 return -1;
271         }
272
273         _socket_add_node(mgr, SERVER_SOCKET, INET, sockfd, 0);
274         return sockfd;
275 }
276
277
278 /**
279         @brief Create a client TCP socket, connect with it, and add it to a socket_manager's list.
280         @param mgr Pointer to the socket_manager that will own the socket.
281         @param port What port number to connect to.
282         @param dest_addr Host name or IP address of the server to which we are connecting.
283         @return The socket fd if successful; otherwise -1.
284
285         Calls: getaddrinfo(), socket(), connect().  Creates a CLIENT_SOCKET.
286
287         Applies socket option TCP_NODELAY in order to reduce latency.
288  */
289 int socket_open_tcp_client(socket_manager* mgr, int port, const char* dest_addr) {
290
291         struct sockaddr_in remoteAddr;
292         int sock_fd;
293
294         // ------------------------------------------------------------------
295         // Get the IP address of the hostname (for TCP only)
296         // ------------------------------------------------------------------
297         struct addrinfo hints = { 0, 0, 0, 0, 0, NULL, NULL, NULL };
298         hints.ai_socktype = SOCK_STREAM;
299         struct addrinfo* addr_info = NULL;
300         errno = 0;
301         int rc = getaddrinfo( dest_addr, NULL, &hints, &addr_info );
302         if( rc || ! addr_info ) {
303                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): No Such Host => %s: %s",
304                         dest_addr, gai_strerror( rc ) );
305                 return -1;
306         }
307
308         // Look for an address supporting IPv4.  Someday we'll look for
309         // either IPv4 or IPv6, and branch according to what we find.
310         while( addr_info && addr_info->ai_family != PF_INET ) {
311                 addr_info = addr_info->ai_next;
312         }
313
314         if( ! addr_info ) {
315                 osrfLogWarning( OSRF_LOG_MARK,
316                         "socket_open_tcp_client(): Host %s does not support IPV4", dest_addr );
317                 return -1;      
318         }
319
320         // ------------------------------------------------------------------
321         // Create the socket
322         // ------------------------------------------------------------------
323         errno = 0;
324         if( (sock_fd = socket( AF_INET, SOCK_STREAM, 0 )) < 0 ) {
325                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): Cannot create TCP socket: %s",
326                         strerror( errno ) );
327                 return -1;
328         }
329
330         int i = 1;
331         setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
332
333         // ------------------------------------------------------------------
334         // Construct server info struct
335         // ------------------------------------------------------------------
336         memset( &remoteAddr, 0, sizeof(remoteAddr));
337         remoteAddr.sin_family = AF_INET;
338         remoteAddr.sin_port = htons( port );
339         struct sockaddr_in* ai_addr_in = (struct sockaddr_in*) addr_info->ai_addr;
340         remoteAddr.sin_addr.s_addr = ai_addr_in->sin_addr.s_addr;
341
342         freeaddrinfo( addr_info );
343
344         // ------------------------------------------------------------------
345         // Connect to server
346         // ------------------------------------------------------------------
347         errno = 0;
348         if( connect( sock_fd, (struct sockaddr*) &remoteAddr, sizeof( struct sockaddr_in ) ) < 0 ) {
349                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): Cannot connect to server %s: %s",
350                         dest_addr, strerror(errno) );
351                 close( sock_fd );
352                 return -1;
353         }
354
355         _socket_add_node(mgr, CLIENT_SOCKET, INET, sock_fd, -1 );
356
357         return sock_fd;
358 }
359
360
361 /**
362         @brief Create a client UDP socket and add it to a socket_manager's list.
363         @param mgr Pointer to the socket_manager that will own the socket.
364         @return The socket fd if successful; otherwise -1.
365
366         Calls: socket().  Creates a CLIENT_SOCKET.
367 */
368 int socket_open_udp_client( socket_manager* mgr ) {
369
370         int sockfd;
371
372         errno = 0;
373         if( (sockfd = socket(AF_INET,SOCK_DGRAM,0)) < 0 ) {
374                 osrfLogWarning( OSRF_LOG_MARK,
375                         "socket_open_udp_client(): Unable to create UDP socket: %s", strerror( errno ) );
376                 return -1;
377         }
378
379         _socket_add_node(mgr, CLIENT_SOCKET, INET, sockfd, -1 );
380
381         return sockfd;
382 }
383
384
385 /**
386         @brief Create a UNIX domain client socket, connect with it, add it to the socket_manager's list
387         @param mgr Pointer to the socket_manager that will own the socket.
388         @param sock_path Name of the socket within the file system.
389         @return The socket fd if successful; otherwise -1.
390
391         Calls: socket(), connect().  Creates a CLIENT_SOCKET.
392 */
393 int socket_open_unix_client(socket_manager* mgr, const char* sock_path) {
394
395         int sock_fd, len;
396         struct sockaddr_un usock;
397
398         if(strlen(sock_path) > sizeof(usock.sun_path) - 1)
399         {
400                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_client(): path too long: %s",
401                    sock_path );
402                 return -1;
403         }
404
405         errno = 0;
406         if( (sock_fd = socket( AF_UNIX, SOCK_STREAM, 0 )) < 0 ) {
407                 osrfLogWarning(  OSRF_LOG_MARK, "socket_open_unix_client(): Cannot create UNIX socket: %s", strerror( errno ) );
408                 return -1;
409         }
410
411         usock.sun_family = AF_UNIX;
412         strcpy( usock.sun_path, sock_path );
413
414         len = sizeof( usock.sun_family ) + strlen( usock.sun_path );
415
416         errno = 0;
417         if( connect( sock_fd, (struct sockaddr *) &usock, len ) < 0 ) {
418                 osrfLogWarning(  OSRF_LOG_MARK, "Error connecting to unix socket: %s",
419                         strerror( errno ) );
420                 close( sock_fd );
421                 return -1;
422         }
423
424         _socket_add_node(mgr, CLIENT_SOCKET, UNIX, sock_fd, -1 );
425
426         return sock_fd;
427 }
428
429
430 /**
431         @brief Search a socket_manager's list for a socket node for a given file descriptor.
432         @param mgr Pointer to the socket manager.
433         @param sock_fd The file descriptor to be sought.
434         @return A pointer to the socket_node if found; otherwise NULL.
435
436         Traverse a linked list owned by the socket_manager.
437 */
438 static socket_node* socket_find_node(socket_manager* mgr, int sock_fd) {
439         if(mgr == NULL) return NULL;
440         socket_node* node = mgr->socket;
441         while(node) {
442                 if(node->sock_fd == sock_fd)
443                         return node;
444                 node = node->next;
445         }
446         return NULL;
447 }
448
449 /* removes the node with the given sock_fd from the list and frees it */
450 /**
451         @brief Remove a socket node for a given fd from a socket_manager's list.
452         @param mgr Pointer to the socket_manager.
453         @param sock_fd The file descriptor whose socket_node is to be removed.
454
455         This function does @em not close the socket.  It just removes a node from the list, and
456         frees it.  The disposition of the socket is the responsibility of the calling code.
457 */
458 static void socket_remove_node(socket_manager* mgr, int sock_fd) {
459
460         if(mgr == NULL) return;
461
462         osrfLogDebug( OSRF_LOG_MARK, "removing socket %d", sock_fd);
463
464         socket_node* head = mgr->socket;
465         socket_node* tail = head;
466         if(head == NULL) return;
467
468         /* if removing the first node in the list */
469         if(head->sock_fd == sock_fd) {
470                 mgr->socket = head->next;
471                 free(head);
472                 return;
473         }
474
475         head = head->next;
476
477         /* if removing any other node */
478         while(head) {
479                 if(head->sock_fd == sock_fd) {
480                         tail->next = head->next;
481                         free(head);
482                         return;
483                 }
484                 tail = head;
485                 head = head->next;
486         }
487 }
488
489
490 /**
491         @brief Write to the log: a list of socket_nodes in a socket_manager's list.
492         @param mgr Pointer to the socket_manager.
493
494         For testing and debugging.
495
496         The messages are issued as DEBG messages, and show each file descriptor and its parent.
497 */
498 void _socket_print_list(socket_manager* mgr) {
499         if(mgr == NULL) return;
500         socket_node* node = mgr->socket;
501         osrfLogDebug( OSRF_LOG_MARK, "socket_node list: [");
502         while(node) {
503                 osrfLogDebug( OSRF_LOG_MARK, "sock_fd: %d | parent_id: %d", 
504                                 node->sock_fd, node->parent_id);
505                 node = node->next;
506         }
507         osrfLogDebug( OSRF_LOG_MARK, "]");
508 }
509
510 /**
511         @brief Send a nul-terminated string over a socket.
512         @param sock_fd The file descriptor for the socket.
513         @param data Pointer to the string to be sent.
514         @return 0 if successful, -1 if not.
515
516         This function is a thin wrapper for _socket_send().
517 */
518 int socket_send(int sock_fd, const char* data) {
519         return _socket_send( sock_fd, data, 0);
520 }
521
522 /**
523         @brief Send a nul-terminated string over a socket.
524         @param sock_fd The file descriptor for the socket.
525         @param data Pointer to the string to be sent.
526         @param flags A set of bitflags to be passed to send().
527         @return 0 if successful, -1 if not.
528
529         This function is the final common pathway for all outgoing socket traffic.
530 */
531 static int _socket_send(int sock_fd, const char* data, int flags) {
532
533         signal(SIGPIPE, SIG_IGN); /* in case a unix socket was closed */
534
535         errno = 0;
536         size_t r = send( sock_fd, data, strlen(data), flags );
537         int local_errno = errno;
538
539         if( r == -1 ) {
540                 osrfLogWarning( OSRF_LOG_MARK, "_socket_send(): Error sending data with return %d", r );
541                 osrfLogWarning( OSRF_LOG_MARK, "Last Sys Error: %s", strerror(local_errno));
542                 return -1;
543         }
544
545         return 0;
546 }
547
548
549 /* sends the given data to the given socket. 
550  * sets the send flag MSG_DONTWAIT which will allow the 
551  * process to continue even if the socket buffer is full
552  * returns 0 on success, -1 otherwise */
553 //int socket_send_nowait( int sock_fd, const char* data) {
554 //      return _socket_send( sock_fd, data, MSG_DONTWAIT);
555 //}
556
557
558 /**
559         @brief Wait for a socket to be ready to send, and then send a string over it.
560         @param sock_fd File descriptor of the socket.
561         @param data Pointer to a nul-terminated string to be sent.
562         @param usecs How long to wait, in microseconds, before timing out.
563         @return 0 if successful, -1 if not.
564
565         The socket may not accept all the data we want to give it.
566 */
567 int socket_send_timeout( int sock_fd, const char* data, int usecs ) {
568
569         fd_set write_set;
570         FD_ZERO( &write_set );
571         FD_SET( sock_fd, &write_set );
572
573         const int mil = 1000000;
574         int secs = (int) usecs / mil;
575         usecs = usecs - (secs * mil);
576
577         struct timeval tv;
578         tv.tv_sec = secs;
579         tv.tv_usec = usecs;
580
581         errno = 0;
582         int ret = select( sock_fd + 1, NULL, &write_set, NULL, &tv);
583         if( ret > 0 ) return _socket_send( sock_fd, data, 0);
584
585         osrfLogError(OSRF_LOG_MARK, "socket_send_timeout(): "
586                 "timed out on send for socket %d after %d secs, %d usecs: %s",
587                 sock_fd, secs, usecs, strerror( errno ) );
588
589         return -1;
590 }
591
592
593 /* disconnects the node with the given sock_fd and removes
594         it from the socket set */
595 /**
596         @brief Close a socket, and remove it from the socket_manager's list.
597         @param mgr Pointer to the socket_manager.
598         @param sock_fd File descriptor for the socket to be closed.
599
600         We close the socket before determining whether it belongs to the socket_manager in question.
601 */
602 void socket_disconnect(socket_manager* mgr, int sock_fd) {
603         osrfLogInternal( OSRF_LOG_MARK, "Closing socket %d", sock_fd);
604         close( sock_fd );
605         socket_remove_node(mgr, sock_fd);
606 }
607
608
609 /**
610         @brief Determine whether a socket is valid.
611         @param sock_fd File descriptor for the socket.
612         @return 1 if the socket is valid, or 0 if it isn't.
613
614         The test is based on a call to select().  If the socket is valid but is not ready to be
615         written to, we wait until it is ready, then return 1.
616
617         If the select() fails, it may be because it was interrupted by a signal.  In that case
618         we try again.  Otherwise we assume that the socket is no longer valid.  This can happen
619         if, for example, the other end of a connection has closed the connection.
620
621         The select() can also fail if it is unable to allocate enough memory for its own internal
622         use.  If that happens, we may erroneously report a valid socket as invalid, but we
623         probably wouldn't be able to use it anyway if we're that close to exhausting memory.
624 */
625 int socket_connected(int sock_fd) {
626         fd_set read_set;
627         FD_ZERO( &read_set );
628         FD_SET( sock_fd, &read_set );
629         while( 1 ) {
630                 if( select( sock_fd + 1, &read_set, NULL, NULL, NULL) == -1 )
631                         return 0;
632                 else if( EINTR == errno )
633                         continue;
634                 else
635                         return 1;
636         }
637 }
638
639 /**
640         @brief Look for input on a given socket.  If you find some, react to it.
641         @param mgr Pointer to the socket_manager that presumably owns the socket.
642         @param timeout Timeout interval, in seconds (see notes).
643         @param sock_fd The file descriptor to look at.
644         @return 0 if successful, or -1 if a timeout or other error occurs.
645
646         If @a timeout is -1, wait indefinitely for input activity to appear.  If @a timeout is
647         zero, don't wait at all.  If @a timeout is positive, wait that number of seconds
648         before timing out.  If @a timeout has a negative value other than -1, the results are not
649         well defined, but we'll probably get an EINVAL error from select().
650
651         If we detect activity, branch on the type of socket:
652
653         - If it's a listener, accept a new connection, and add the new socket to the
654         socket_manager's list, without actually reading any data.
655         - Otherwise, read as much data as is available from the input socket, passing it a
656         buffer at a time to whatever callback function has been defined to the socket_manager.
657 */
658 int socket_wait( socket_manager* mgr, int timeout, int sock_fd ) {
659
660         int retval = 0;
661         fd_set read_set;
662         FD_ZERO( &read_set );
663         FD_SET( sock_fd, &read_set );
664
665         struct timeval tv;
666         tv.tv_sec = timeout;
667         tv.tv_usec = 0;
668         errno = 0;
669
670         if( timeout < 0 ) {
671
672                 // If timeout is -1, we block indefinitely
673                 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, NULL)) == -1 ) {
674                         osrfLogDebug( OSRF_LOG_MARK, "Call to select() interrupted: Sys Error: %s",
675                                         strerror(errno));
676                         return -1;
677                 }
678
679         } else if( timeout > 0 ) { /* timeout of 0 means don't block */
680
681                 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, &tv)) == -1 ) {
682                         osrfLogDebug( OSRF_LOG_MARK, "Call to select() interrupted: Sys Error: %s",
683                                         strerror(errno));
684                         return -1;
685                 }
686         }
687
688         osrfLogInternal( OSRF_LOG_MARK, "%d active sockets after select()", retval);
689
690         socket_node* node = socket_find_node(mgr, sock_fd);
691         if( node ) {
692                 if( node->endpoint == SERVER_SOCKET ) {
693                         _socket_handle_new_client( mgr, node );  // accept new connection
694                 } else {
695                         int status = _socket_handle_client_data( mgr, node );   // read data
696                         if( status == -1 ) {
697                                 socket_remove_node(mgr, sock_fd);
698                                 return -1;
699                         }
700                 }
701                 return 0;
702         }
703         else
704                 return -1;    // No such file descriptor for this socket_manager
705 }
706
707
708 int socket_wait_all(socket_manager* mgr, int timeout) {
709
710         if(mgr == NULL) {
711                 osrfLogWarning( OSRF_LOG_MARK,  "socket_wait_all(): null mgr" );
712                 return -1;
713         }
714
715         int retval = 0;
716         fd_set read_set;
717         FD_ZERO( &read_set );
718
719         socket_node* node = mgr->socket;
720         int max_fd = 0;
721         while(node) {
722                 osrfLogInternal( OSRF_LOG_MARK, "Adding socket fd %d to select set",node->sock_fd);
723                 FD_SET( node->sock_fd, &read_set );
724                 if(node->sock_fd > max_fd) max_fd = node->sock_fd;
725                 node = node->next;
726         }
727         max_fd += 1;
728
729         struct timeval tv;
730         tv.tv_sec = timeout;
731         tv.tv_usec = 0;
732         errno = 0;
733
734         if( timeout < 0 ) {  
735
736                 // If timeout is -1, there is no timeout passed to the call to select
737                 if( (retval = select( max_fd, &read_set, NULL, NULL, NULL)) == -1 ) {
738                         osrfLogWarning( OSRF_LOG_MARK, "select() call aborted: %s", strerror(errno));
739                         return -1;
740                 }
741
742         } else if( timeout != 0 ) { /* timeout of 0 means don't block */
743
744                 if( (retval = select( max_fd, &read_set, NULL, NULL, &tv)) == -1 ) {
745                         osrfLogWarning( OSRF_LOG_MARK, "select() call aborted: %s", strerror(errno));
746                         return -1;
747                 }
748         }
749
750         osrfLogDebug( OSRF_LOG_MARK, "%d active sockets after select()", retval);
751         return _socket_route_data(mgr, retval, &read_set);
752 }
753
754 /* iterates over the sockets in the set and handles active sockets.
755         new sockets connecting to server sockets cause the creation
756         of a new socket node.
757         Any new data read is is passed off to the data_received callback
758         as it arrives */
759 /* determines if we're receiving a new client or data
760         on an existing client */
761 static int _socket_route_data(
762         socket_manager* mgr, int num_active, fd_set* read_set) {
763
764         if(!(mgr && read_set)) return -1;
765
766         int last_failed_id = -1;
767
768
769         /* come back here if someone yanks a socket_node from beneath us */
770         while(1) {
771
772                 socket_node* node = mgr->socket;
773                 int handled = 0;
774                 int status = 0;
775                 
776                 while(node && (handled < num_active)) {
777         
778                         int sock_fd = node->sock_fd;
779                         
780                         if(last_failed_id != -1) {
781                                 /* in case it was not removed by our overlords */
782                                 osrfLogInternal( OSRF_LOG_MARK, "Attempting to remove last_failed_id of %d", last_failed_id);
783                                 socket_remove_node( mgr, last_failed_id );
784                                 last_failed_id = -1;
785                                 status = -1;
786                                 break;
787                         }
788         
789                         /* does this socket have data? */
790                         if( FD_ISSET( sock_fd, read_set ) ) {
791         
792                                 osrfLogInternal( OSRF_LOG_MARK, "Socket %d active", sock_fd);
793                                 handled++;
794                                 FD_CLR(sock_fd, read_set);
795         
796                                 if(node->endpoint == SERVER_SOCKET) 
797                                         _socket_handle_new_client(mgr, node);
798         
799                                 else
800                                         status = _socket_handle_client_data(mgr, node);
801         
802                                 /* someone may have yanked a socket_node out from under 
803                                         us...start over with the first socket */
804                                 if(status == -1)  {
805                                         last_failed_id = sock_fd;
806                                         osrfLogInternal( OSRF_LOG_MARK, "Backtracking back to start of loop because "
807                                                         "of -1 return code from _socket_handle_client_data()");
808                                 }
809                         }
810
811                         if(status == -1) break;
812                         node = node->next;
813
814                 } // is_set
815
816                 if(status == 0) break;
817                 if(status == -1) status = 0;
818         } 
819
820         return 0;
821 }
822
823
824 /**
825         @brief Accept a new socket from a listener, and add it to the socket_manager's list.
826         @param mgr Pointer to the socket_manager that will own the new socket.
827         @param node Pointer to the socket_node for the listener socket.
828         @return 0 if successful, or -1 if not.
829
830         Call: accept().  Creates a CLIENT_SOCKET (even though the socket resides on the server).
831 */
832 static int _socket_handle_new_client(socket_manager* mgr, socket_node* node) {
833         if(mgr == NULL || node == NULL) return -1;
834
835         errno = 0;
836         int new_sock_fd;
837         new_sock_fd = accept(node->sock_fd, NULL, NULL);
838         if(new_sock_fd < 0) {
839                 osrfLogWarning( OSRF_LOG_MARK, "_socket_handle_new_client(): accept() failed: %s",
840                         strerror( errno ) );
841                 return -1;
842         }
843
844         if(node->addr_type == INET) {
845                 _socket_add_node(mgr, CLIENT_SOCKET, INET, new_sock_fd, node->sock_fd);
846                 osrfLogDebug( OSRF_LOG_MARK, "Adding new INET client for %d", node->sock_fd);
847
848         } else if(node->addr_type == UNIX) {
849                 _socket_add_node(mgr, CLIENT_SOCKET, UNIX, new_sock_fd, node->sock_fd);
850                 osrfLogDebug( OSRF_LOG_MARK, "Adding new UNIX client for %d", node->sock_fd);
851         }
852
853         return 0;
854 }
855
856
857 /**
858         @brief Receive data on a streaming socket.
859         @param mgr Pointer to the socket_manager that owns the socket_node.
860         @param node Pointer to the socket_node that owns the socket.
861         @return 0 if successful, or -1 upon failure.
862
863         Receive one or more buffers until no more bytes are available for receipt.  Add a
864         terminal nul to each buffer and pass it to a callback function previously defined by the
865         application to the socket_manager.
866
867         If the sender closes the connection, call another callback function, if one has been
868         defined.
869
870         Even when the function returns successfully, the received message may not be complete --
871         there may be more data that hasn't arrived yet.  It is the responsibility of the
872         calling code to recognize message boundaries.
873
874         Called only for a CLIENT_SOCKET.
875 */
876 static int _socket_handle_client_data(socket_manager* mgr, socket_node* node) {
877         if(mgr == NULL || node == NULL) return -1;
878
879         char buf[RBUFSIZE];
880         int read_bytes;
881         int sock_fd = node->sock_fd;
882
883         set_fl(sock_fd, O_NONBLOCK);
884
885         osrfLogInternal( OSRF_LOG_MARK, "%ld : Received data at %f\n",
886                         (long) getpid(), get_timestamp_millis());
887
888         while( (read_bytes = recv(sock_fd, buf, RBUFSIZE-1, 0) ) > 0 ) {
889                 buf[read_bytes] = '\0';
890                 osrfLogInternal( OSRF_LOG_MARK, "Socket %d Read %d bytes and data: %s",
891                                 sock_fd, read_bytes, buf);
892                 if(mgr->data_received)
893                         mgr->data_received(mgr->blob, mgr, sock_fd, buf, node->parent_id);
894         }
895         int local_errno = errno; /* capture errno as set by recv() */
896
897         if(socket_find_node(mgr, sock_fd)) {  /* someone may have closed this socket */
898                 clr_fl(sock_fd, O_NONBLOCK);
899                 if(read_bytes < 0) {
900                         // EAGAIN would have meant that no more data was available
901                         if(local_errno != EAGAIN)   // but if that's not the case...
902                                 osrfLogWarning( OSRF_LOG_MARK, " * Error reading socket with error %s",
903                                         strerror(local_errno) );
904                 }
905
906         } else { return -1; } /* inform the caller that this node has been tampered with */
907
908         if(read_bytes == 0) {  /* socket closed by client */
909                 if(mgr->on_socket_closed) {
910                         mgr->on_socket_closed(mgr->blob, sock_fd);
911                 }
912                 return -1;
913         }
914
915         return 0;
916
917 }
918
919
920 /**
921         @brief Destroy a socket_manager, and close all of its sockets.
922         @param mgr Pointer to the socket_manager to be destroyed.
923 */
924 void socket_manager_free(socket_manager* mgr) {
925         if(mgr == NULL) return;
926         socket_node* tmp;
927         while(mgr->socket) {
928                 tmp = mgr->socket->next;
929                 socket_disconnect(mgr, mgr->socket->sock_fd);
930                 mgr->socket = tmp;
931         }
932         free(mgr);
933
934 }
935