]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/libopensrf/socket_bundle.c
6945fc48b9cd4491076d04c2fe97a427339ed753
[OpenSRF.git] / src / libopensrf / socket_bundle.c
1 #include <opensrf/socket_bundle.h>
2
3 struct socket_node_struct {
4         int endpoint;           /* SERVER_SOCKET or CLIENT_SOCKET */
5         int addr_type;          /* INET or UNIX */
6         int sock_fd;
7         int parent_id;          /* if we're a new client for a server socket, 
8         this points to the server socket we spawned from */
9         struct socket_node_struct* next;
10 };
11
12 /* buffer used to read from the sockets */
13 #define RBUFSIZE 1024
14
15 static socket_node* _socket_add_node(socket_manager* mgr,
16                 int endpoint, int addr_type, int sock_fd, int parent_id );
17 static socket_node* socket_find_node(socket_manager* mgr, int sock_fd);
18 static void socket_remove_node(socket_manager*, int sock_fd);
19 static int _socket_send(int sock_fd, const char* data, int flags);
20 static int _socket_route_data(socket_manager* mgr, int num_active, fd_set* read_set);
21 static int _socket_route_data_id( socket_manager* mgr, int sock_id);
22 static int _socket_handle_new_client(socket_manager* mgr, socket_node* node);
23 static int _socket_handle_client_data(socket_manager* mgr, socket_node* node);
24
25
26 /* -------------------------------------------------------------------- 
27         Test Code 
28         -------------------------------------------------------------------- */
29 /*
30 int count = 0;
31 void printme(void* blob, socket_manager* mgr, 
32                 int sock_fd, char* data, int parent_id) {
33
34         fprintf(stderr, "Got data from socket %d with parent %d => %s", 
35                         sock_fd, parent_id, data );
36
37         socket_send(sock_fd, data);
38
39         if(count++ > 2) {
40                 socket_disconnect(mgr, sock_fd);
41                 _socket_print_list(mgr);
42         }
43 }
44
45 int main(int argc, char* argv[]) {
46         socket_manager manager;
47         memset(&manager, 0, sizeof(socket_manager));
48         int port = 11000;
49         if(argv[1])
50                 port = atoi(argv[1]);
51
52         manager.data_received = &printme;
53         socket_open_tcp_server(&manager, port);
54
55         while(1)
56                 socket_wait_all(&manager, -1);
57
58         return 0;
59 }
60 */
61 /* -------------------------------------------------------------------- */
62
63
64 /**
65         @brief Create a new socket_node and add it to a socket_manager's list.
66         @param mgr Pointer to the socket_manager.
67         @param endpoint SERVER_SOCKET or CLIENT_SOCKET, denoting how the socket is to be used.
68         @param addr_type address type: INET or UNIX.
69         @param sock_fd sock_fd for the new socket_node.
70         @param parent_id parent_id for the new node.
71         @return Pointer to the new socket_node.
72
73         If @a parent_id is negative, the new socket_node receives a parent_id of 0.
74 */
75 static socket_node* _socket_add_node(socket_manager* mgr, 
76                 int endpoint, int addr_type, int sock_fd, int parent_id ) {
77
78         if(mgr == NULL) return NULL;
79         osrfLogInternal( OSRF_LOG_MARK, "Adding socket node with fd %d", sock_fd);
80         socket_node* new_node = safe_malloc(sizeof(socket_node));
81
82         new_node->endpoint      = endpoint;
83         new_node->addr_type     = addr_type;
84         new_node->sock_fd       = sock_fd;
85         new_node->next          = NULL;
86         new_node->parent_id = 0;
87         if(parent_id > 0)
88                 new_node->parent_id = parent_id;
89
90         new_node->next                  = mgr->socket;
91         mgr->socket                             = new_node;
92         return new_node;
93 }
94
95 /**
96         @brief Create an TCP INET listener socket and add it to a socket_manager's list.
97         @param mgr Pointer to the socket manager that will own the socket.
98         @param port The port number to bind to.
99         @param listen_ip The IP address to bind to; or, NULL for INADDR_ANY.
100         @return The socket fd if successful; otherwise -1.
101
102         Calls: socket(), bind(), and listen().
103 */
104 int socket_open_tcp_server(socket_manager* mgr, int port, const char* listen_ip) {
105
106         if( mgr == NULL ) {
107                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): NULL mgr"); 
108                 return -1;
109         }
110
111         int sock_fd;
112         struct sockaddr_in server_addr;
113
114         server_addr.sin_family = AF_INET;
115
116         if(listen_ip != NULL) {
117                 struct in_addr addr;
118                 if( inet_aton( listen_ip, &addr ) )
119                         server_addr.sin_addr.s_addr = addr.s_addr;
120                 else {
121                         osrfLogError( OSRF_LOG_MARK, "Listener address is invalid: %s", listen_ip );
122                         return -1;
123                 }
124         } else {
125                 server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
126         }
127
128         server_addr.sin_port = htons(port);
129
130         errno = 0;
131         sock_fd = socket(AF_INET, SOCK_STREAM, 0);
132         if(sock_fd < 0) {
133                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): Unable to create TCP socket: %s",
134                         strerror( errno ) );
135                 return -1;
136         }
137
138         errno = 0;
139         if(bind( sock_fd, (struct sockaddr*) &server_addr, sizeof(server_addr)) < 0) {
140                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): cannot bind to port %d: %s",
141                         port, strerror( errno ) );
142                 close( sock_fd );
143                 return -1;
144         }
145
146         errno = 0;
147         if(listen(sock_fd, 20) == -1) {
148                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): listen() returned error: %s",
149                         strerror( errno ) );
150                 close( sock_fd );
151                 return -1;
152         }
153
154         _socket_add_node(mgr, SERVER_SOCKET, INET, sock_fd, 0);
155         return sock_fd;
156 }
157
158 /**
159         @brief Create a UNIX domain listener socket and add it to the socket_manager's list.
160         @param mgr Pointer to the socket_manager that will own the socket.
161         @param path Name of the socket within the file system.
162         @return The socket fd if successful; otherwise -1.
163
164         Calls: socket(), bind(), listen().
165 */
166 int socket_open_unix_server(socket_manager* mgr, const char* path) {
167         if(mgr == NULL || path == NULL) return -1;
168
169         osrfLogDebug( OSRF_LOG_MARK, "opening unix socket at %s", path);
170         int sock_fd;
171         struct sockaddr_un server_addr;
172
173         if(strlen(path) > sizeof(server_addr.sun_path) - 1)
174         {
175                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): path too long: %s",
176                         path );
177                 return -1;
178         }
179
180         errno = 0;
181         sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
182         if(sock_fd < 0){
183                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): socket() failed: %s",
184                         strerror( errno ) );
185                 return -1;
186         }
187
188         server_addr.sun_family = AF_UNIX;
189         strcpy(server_addr.sun_path, path);
190
191         errno = 0;
192         if( bind(sock_fd, (struct sockaddr*) &server_addr, 
193                                 sizeof(struct sockaddr_un)) < 0) {
194                 osrfLogWarning( OSRF_LOG_MARK, 
195                         "socket_open_unix_server(): cannot bind to unix port %s: %s",
196                         path, strerror( errno ) );
197                 close( sock_fd );
198                 return -1;
199         }
200
201         errno = 0;
202         if(listen(sock_fd, 20) == -1) {
203                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): listen() returned error: %s",
204                         strerror( errno ) );
205                 close( sock_fd );
206                 return -1;
207         }
208
209         osrfLogDebug( OSRF_LOG_MARK, "unix socket successfully opened");
210         
211         int i = 1;
212
213         /* causing problems with router for some reason ... */
214         //osrfLogDebug( OSRF_LOG_MARK, "Setting SO_REUSEADDR");
215         //setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));
216         
217         //osrfLogDebug( OSRF_LOG_MARK, "Setting TCP_NODELAY");
218         setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
219
220         _socket_add_node(mgr, SERVER_SOCKET, UNIX, sock_fd, 0);
221         return sock_fd;
222 }
223
224
225 /**
226         @brief Create a UDP socket for a server, and add it to a socket_manager's list.
227         @param mgr Pointer to the socket_manager that will own the socket.
228         @param port The port number to bind to.
229         @param listen_ip The IP address to bind to, or NULL for INADDR_ANY.
230         @return The socket fd if successful; otherwise -1.
231
232         Calls: socket(), bind().
233 */
234 int socket_open_udp_server( 
235                 socket_manager* mgr, int port, const char* listen_ip ) {
236
237         int sockfd;
238         struct sockaddr_in server_addr;
239
240         server_addr.sin_family = AF_INET;
241         server_addr.sin_port = htons(port);
242         if(listen_ip) {
243                 struct in_addr addr;
244                 if( inet_aton( listen_ip, &addr ) )
245                         server_addr.sin_addr.s_addr = addr.s_addr;
246                 else {
247                         osrfLogError( OSRF_LOG_MARK, "UDP listener address is invalid: %s", listen_ip );
248                         return -1;
249                 }
250         } else
251                 server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
252
253         errno = 0;
254         if( (sockfd = socket( AF_INET, SOCK_DGRAM, 0 )) < 0 ) {
255                 osrfLogWarning( OSRF_LOG_MARK, "Unable to create UDP socket: %s", strerror( errno ) );
256                 return -1;
257         }
258
259         errno = 0;
260         if( (bind (sockfd, (struct sockaddr *) &server_addr,sizeof(server_addr))) ) {
261                 osrfLogWarning( OSRF_LOG_MARK, "Unable to bind to UDP port %d: %s",
262                         port, strerror( errno ) );
263                 close( sockfd );
264                 return -1;
265         }
266
267         _socket_add_node(mgr, SERVER_SOCKET, INET, sockfd, 0);
268         return sockfd;
269 }
270
271
272 /**
273         @brief Create a client TCP socket, connect with it, and add it to a socket_manager's list.
274         @param mgr Pointer to the socket_manager that will own the socket.
275         @param port What port number to connect to.
276         @param dest_addr Host name or IP address of the server to which we are connecting.
277         @return The socket fd if successful; otherwise -1.
278
279         Calls: getaddrinfo(), socket(), connect().
280 */
281 int socket_open_tcp_client(socket_manager* mgr, int port, const char* dest_addr) {
282
283         struct sockaddr_in remoteAddr;
284         int sock_fd;
285
286         // ------------------------------------------------------------------
287         // Get the IP address of the hostname (for TCP only)
288         // ------------------------------------------------------------------
289         struct addrinfo hints = { 0, 0, 0, 0, 0, NULL, NULL, NULL };
290         hints.ai_socktype = SOCK_STREAM;
291         struct addrinfo* addr_info = NULL;
292         errno = 0;
293         int rc = getaddrinfo( dest_addr, NULL, &hints, &addr_info );
294         if( rc || ! addr_info ) {
295                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): No Such Host => %s: %s",
296                         dest_addr, gai_strerror( rc ) );
297                 return -1;
298         }
299
300         // Look for an address supporting IPv4.  Someday we'll look for
301         // either IPv4 or IPv6, and branch according to what we find.
302         while( addr_info && addr_info->ai_family != PF_INET ) {
303                 addr_info = addr_info->ai_next;
304         }
305
306         if( ! addr_info ) {
307                 osrfLogWarning( OSRF_LOG_MARK,
308                         "socket_open_tcp_client(): Host %s does not support IPV4", dest_addr );
309                 return -1;      
310         }
311
312         // ------------------------------------------------------------------
313         // Create the socket
314         // ------------------------------------------------------------------
315         errno = 0;
316         if( (sock_fd = socket( AF_INET, SOCK_STREAM, 0 )) < 0 ) {
317                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): Cannot create TCP socket: %s",
318                         strerror( errno ) );
319                 return -1;
320         }
321
322         int i = 1;
323         setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
324
325         // ------------------------------------------------------------------
326         // Construct server info struct
327         // ------------------------------------------------------------------
328         memset( &remoteAddr, 0, sizeof(remoteAddr));
329         remoteAddr.sin_family = AF_INET;
330         remoteAddr.sin_port = htons( port );
331         struct sockaddr_in* ai_addr_in = (struct sockaddr_in*) addr_info->ai_addr;
332         remoteAddr.sin_addr.s_addr = ai_addr_in->sin_addr.s_addr;
333
334         freeaddrinfo( addr_info );
335
336         // ------------------------------------------------------------------
337         // Connect to server
338         // ------------------------------------------------------------------
339         errno = 0;
340         if( connect( sock_fd, (struct sockaddr*) &remoteAddr, sizeof( struct sockaddr_in ) ) < 0 ) {
341                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): Cannot connect to server %s: %s",
342                         dest_addr, strerror(errno) );
343                 close( sock_fd );
344                 return -1;
345         }
346
347         _socket_add_node(mgr, CLIENT_SOCKET, INET, sock_fd, -1 );
348
349         return sock_fd;
350 }
351
352
353 /**
354         @brief Create a client UDP socket and add it to a socket_manager's list.
355         @param mgr Pointer to the socket_manager that will own the socket.
356         @return The socket fd if successful; otherwise -1.
357
358         Calls: socket().
359 */
360 int socket_open_udp_client( socket_manager* mgr ) {
361
362         int sockfd;
363
364         errno = 0;
365         if( (sockfd = socket(AF_INET,SOCK_DGRAM,0)) < 0 ) {
366                 osrfLogWarning( OSRF_LOG_MARK,
367                         "socket_open_udp_client(): Unable to create UDP socket: %s", strerror( errno ) );
368                 return -1;
369         }
370
371         _socket_add_node(mgr, CLIENT_SOCKET, INET, sockfd, -1 );
372
373         return sockfd;
374 }
375
376
377 /**
378         @brief Create a UNIX domain client socket, connect with it, add it to the socket_manager's list
379         @param mgr Pointer to the socket_manager that will own the socket.
380         @param sock_path Name of the socket within the file system.
381         @return The socket fd if successful; otherwise -1.
382
383         Calls: socket(), connect().
384 */
385 int socket_open_unix_client(socket_manager* mgr, const char* sock_path) {
386
387         int sock_fd, len;
388         struct sockaddr_un usock;
389
390         if(strlen(sock_path) > sizeof(usock.sun_path) - 1)
391         {
392                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_client(): path too long: %s",
393                    sock_path );
394                 return -1;
395         }
396
397         errno = 0;
398         if( (sock_fd = socket( AF_UNIX, SOCK_STREAM, 0 )) < 0 ) {
399                 osrfLogWarning(  OSRF_LOG_MARK, "socket_open_unix_client(): Cannot create UNIX socket: %s", strerror( errno ) );
400                 return -1;
401         }
402
403         usock.sun_family = AF_UNIX;
404         strcpy( usock.sun_path, sock_path );
405
406         len = sizeof( usock.sun_family ) + strlen( usock.sun_path );
407
408         errno = 0;
409         if( connect( sock_fd, (struct sockaddr *) &usock, len ) < 0 ) {
410                 osrfLogWarning(  OSRF_LOG_MARK, "Error connecting to unix socket: %s",
411                         strerror( errno ) );
412                 close( sock_fd );
413                 return -1;
414         }
415
416         _socket_add_node(mgr, CLIENT_SOCKET, UNIX, sock_fd, -1 );
417
418         return sock_fd;
419 }
420
421
422 /**
423         @brief Search a socket_manager's list for a socket node for a given file descriptor.
424         @param mgr Pointer to the socket manager.
425         @param sock_fd The file descriptor to be sought.
426         @return A pointer to the socket_node if found; otherwise NULL.
427
428         Traverse a linked list owned by the socket_manager.
429 */
430 static socket_node* socket_find_node(socket_manager* mgr, int sock_fd) {
431         if(mgr == NULL) return NULL;
432         socket_node* node = mgr->socket;
433         while(node) {
434                 if(node->sock_fd == sock_fd)
435                         return node;
436                 node = node->next;
437         }
438         return NULL;
439 }
440
441 /* removes the node with the given sock_fd from the list and frees it */
442 /**
443         @brief Remove a socket node for a given fd from a socket_manager's list.
444         @param mgr Pointer to the socket_manager.
445         @param sock_fd The file descriptor whose socket_node is to be removed.
446
447         This function does @em not close the socket.  It just removes a node from the list, and
448         frees it.  It is the responsibility of the calling code to close the socket.
449 */
450 static void socket_remove_node(socket_manager* mgr, int sock_fd) {
451
452         if(mgr == NULL) return;
453
454         osrfLogDebug( OSRF_LOG_MARK, "removing socket %d", sock_fd);
455
456         socket_node* head = mgr->socket;
457         socket_node* tail = head;
458         if(head == NULL) return;
459
460         /* if removing the first node in the list */
461         if(head->sock_fd == sock_fd) {
462                 mgr->socket = head->next;
463                 free(head);
464                 return;
465         }
466
467         head = head->next;
468
469         /* if removing any other node */
470         while(head) {
471                 if(head->sock_fd == sock_fd) {
472                         tail->next = head->next;
473                         free(head);
474                         return;
475                 }
476                 tail = head;
477                 head = head->next;
478         }
479 }
480
481
482 /**
483         @brief Write to the log: a list of socket_nodes in a socket_manager's list.
484         @param mgr Pointer to the socket_manager.
485
486         For testing and debugging.
487
488         The messages are issued as DEBG messages, and show each file descriptor and its parent.
489 */
490 void _socket_print_list(socket_manager* mgr) {
491         if(mgr == NULL) return;
492         socket_node* node = mgr->socket;
493         osrfLogDebug( OSRF_LOG_MARK, "socket_node list: [");
494         while(node) {
495                 osrfLogDebug( OSRF_LOG_MARK, "sock_fd: %d | parent_id: %d", 
496                                 node->sock_fd, node->parent_id);
497                 node = node->next;
498         }
499         osrfLogDebug( OSRF_LOG_MARK, "]");
500 }
501
502 /* sends the given data to the given socket */
503 int socket_send(int sock_fd, const char* data) {
504         return _socket_send( sock_fd, data, 0);
505 }
506
507 /* utility method */
508 static int _socket_send(int sock_fd, const char* data, int flags) {
509
510         signal(SIGPIPE, SIG_IGN); /* in case a unix socket was closed */
511
512         errno = 0;
513         size_t r = send( sock_fd, data, strlen(data), flags );
514         int local_errno = errno;
515         
516         if( r == -1 ) {
517                 osrfLogWarning( OSRF_LOG_MARK, "_socket_send(): Error sending data with return %d", r );
518                 osrfLogWarning( OSRF_LOG_MARK, "Last Sys Error: %s", strerror(local_errno));
519                 return -1;
520         }
521
522         return 0;
523 }
524
525
526 /* sends the given data to the given socket. 
527  * sets the send flag MSG_DONTWAIT which will allow the 
528  * process to continue even if the socket buffer is full
529  * returns 0 on success, -1 otherwise */
530 //int socket_send_nowait( int sock_fd, const char* data) {
531 //      return _socket_send( sock_fd, data, MSG_DONTWAIT);
532 //}
533
534
535 /*
536  * Waits at most usecs microseconds for the send buffer of the given
537  * socket to accept new data.  This does not guarantee that the 
538  * socket will accept all the data we want to give it.
539  */
540 int socket_send_timeout( int sock_fd, const char* data, int usecs ) {
541
542         fd_set write_set;
543         FD_ZERO( &write_set );
544         FD_SET( sock_fd, &write_set );
545
546         int mil = 1000000;
547         int secs = (int) usecs / mil;
548         usecs = usecs - (secs * mil);
549
550         struct timeval tv;
551         tv.tv_sec = secs;
552         tv.tv_usec = usecs;
553
554         errno = 0;
555         int ret = select( sock_fd + 1, NULL, &write_set, NULL, &tv);
556         if( ret > 0 ) return _socket_send( sock_fd, data, 0);
557
558         osrfLogError(OSRF_LOG_MARK, "socket_send_timeout(): "
559                 "timed out on send for socket %d after %d secs, %d usecs: %s",
560                 sock_fd, secs, usecs, strerror( errno ) );
561
562         return -1;
563 }
564
565
566 /* disconnects the node with the given sock_fd and removes
567         it from the socket set */
568 void socket_disconnect(socket_manager* mgr, int sock_fd) {
569         osrfLogInternal( OSRF_LOG_MARK, "Closing socket %d", sock_fd);
570         close( sock_fd );
571         socket_remove_node(mgr, sock_fd);
572 }
573
574
575 /* we assume that if select() fails, the socket is no longer valid */
576 int socket_connected(int sock_fd) {
577         fd_set read_set;
578         FD_ZERO( &read_set );
579         FD_SET( sock_fd, &read_set );
580         if( select( sock_fd + 1, &read_set, NULL, NULL, NULL) == -1 ) 
581                 return 0;
582         return 1;
583
584 }
585
586 /* this only waits on the server socket and does not handle the actual
587         data coming in from the client..... XXX */
588 int socket_wait(socket_manager* mgr, int timeout, int sock_fd) {
589
590         int retval = 0;
591         fd_set read_set;
592         FD_ZERO( &read_set );
593         FD_SET( sock_fd, &read_set );
594
595         struct timeval tv;
596         tv.tv_sec = timeout;
597         tv.tv_usec = 0;
598         errno = 0;
599
600         if( timeout < 0 ) {  
601
602                 // If timeout is -1, we block indefinitely
603                 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, NULL)) == -1 ) {
604                         osrfLogDebug( OSRF_LOG_MARK, "Call to select() interrupted: Sys Error: %s", strerror(errno));
605                         return -1;
606                 }
607
608         } else if( timeout > 0 ) { /* timeout of 0 means don't block */
609
610                 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, &tv)) == -1 ) {
611                         osrfLogDebug( OSRF_LOG_MARK, "Call to select() interrupted: Sys Error: %s", strerror(errno));
612                         return -1;
613                 }
614         }
615
616         osrfLogInternal( OSRF_LOG_MARK, "%d active sockets after select()", retval);
617         return _socket_route_data_id(mgr, sock_fd);
618 }
619
620
621 int socket_wait_all(socket_manager* mgr, int timeout) {
622
623         if(mgr == NULL) {
624                 osrfLogWarning( OSRF_LOG_MARK,  "socket_wait_all(): null mgr" );
625                 return -1;
626         }
627
628         int retval = 0;
629         fd_set read_set;
630         FD_ZERO( &read_set );
631
632         socket_node* node = mgr->socket;
633         int max_fd = 0;
634         while(node) {
635                 osrfLogInternal( OSRF_LOG_MARK, "Adding socket fd %d to select set",node->sock_fd);
636                 FD_SET( node->sock_fd, &read_set );
637                 if(node->sock_fd > max_fd) max_fd = node->sock_fd;
638                 node = node->next;
639         }
640         max_fd += 1;
641
642         struct timeval tv;
643         tv.tv_sec = timeout;
644         tv.tv_usec = 0;
645         errno = 0;
646
647         if( timeout < 0 ) {  
648
649                 // If timeout is -1, there is no timeout passed to the call to select
650                 if( (retval = select( max_fd, &read_set, NULL, NULL, NULL)) == -1 ) {
651                         osrfLogWarning( OSRF_LOG_MARK, "select() call aborted: %s", strerror(errno));
652                         return -1;
653                 }
654
655         } else if( timeout != 0 ) { /* timeout of 0 means don't block */
656
657                 if( (retval = select( max_fd, &read_set, NULL, NULL, &tv)) == -1 ) {
658                         osrfLogWarning( OSRF_LOG_MARK, "select() call aborted: %s", strerror(errno));
659                         return -1;
660                 }
661         }
662
663         osrfLogDebug( OSRF_LOG_MARK, "%d active sockets after select()", retval);
664         return _socket_route_data(mgr, retval, &read_set);
665 }
666
667 /* iterates over the sockets in the set and handles active sockets.
668         new sockets connecting to server sockets cause the creation
669         of a new socket node.
670         Any new data read is is passed off to the data_received callback
671         as it arrives */
672 /* determines if we're receiving a new client or data
673         on an existing client */
674 static int _socket_route_data(
675         socket_manager* mgr, int num_active, fd_set* read_set) {
676
677         if(!(mgr && read_set)) return -1;
678
679         int last_failed_id = -1;
680
681
682         /* come back here if someone yanks a socket_node from beneath us */
683         while(1) {
684
685                 socket_node* node = mgr->socket;
686                 int handled = 0;
687                 int status = 0;
688                 
689                 while(node && (handled < num_active)) {
690         
691                         int sock_fd = node->sock_fd;
692                         
693                         if(last_failed_id != -1) {
694                                 /* in case it was not removed by our overlords */
695                                 osrfLogInternal( OSRF_LOG_MARK, "Attempting to remove last_failed_id of %d", last_failed_id);
696                                 socket_remove_node( mgr, last_failed_id );
697                                 last_failed_id = -1;
698                                 status = -1;
699                                 break;
700                         }
701         
702                         /* does this socket have data? */
703                         if( FD_ISSET( sock_fd, read_set ) ) {
704         
705                                 osrfLogInternal( OSRF_LOG_MARK, "Socket %d active", sock_fd);
706                                 handled++;
707                                 FD_CLR(sock_fd, read_set);
708         
709                                 if(node->endpoint == SERVER_SOCKET) 
710                                         _socket_handle_new_client(mgr, node);
711         
712                                 else
713                                         status = _socket_handle_client_data(mgr, node);
714         
715                                 /* someone may have yanked a socket_node out from under 
716                                         us...start over with the first socket */
717                                 if(status == -1)  {
718                                         last_failed_id = sock_fd;
719                                         osrfLogInternal( OSRF_LOG_MARK, "Backtracking back to start of loop because "
720                                                         "of -1 return code from _socket_handle_client_data()");
721                                 }
722                         }
723
724                         if(status == -1) break;
725                         node = node->next;
726
727                 } // is_set
728
729                 if(status == 0) break;
730                 if(status == -1) status = 0;
731         } 
732
733         return 0;
734 }
735
736
737 /* routes data from a single known socket */
738 static int _socket_route_data_id( socket_manager* mgr, int sock_id) {
739         socket_node* node = socket_find_node(mgr, sock_id);     
740         int status = 0;
741
742         if(node) {
743                 if(node->endpoint == SERVER_SOCKET) 
744                         _socket_handle_new_client(mgr, node);
745         
746                 if(node->endpoint == CLIENT_SOCKET ) 
747                         status = _socket_handle_client_data(mgr, node);
748
749                 if(status == -1) {
750                         socket_remove_node(mgr, sock_id);
751                         return -1;
752                 }
753                 return 0;
754         } 
755
756         return -1;
757 }
758
759
760 static int _socket_handle_new_client(socket_manager* mgr, socket_node* node) {
761         if(mgr == NULL || node == NULL) return -1;
762
763         errno = 0;
764         int new_sock_fd;
765         new_sock_fd = accept(node->sock_fd, NULL, NULL);
766         if(new_sock_fd < 0) {
767                 osrfLogWarning( OSRF_LOG_MARK, "_socket_handle_new_client(): accept() failed: %s",
768                         strerror( errno ) );
769                 return -1;
770         }
771
772         if(node->addr_type == INET) {
773                 _socket_add_node(mgr, CLIENT_SOCKET, INET, new_sock_fd, node->sock_fd);
774                 osrfLogDebug( OSRF_LOG_MARK, "Adding new INET client for %d", node->sock_fd);
775
776         } else if(node->addr_type == UNIX) {
777                 _socket_add_node(mgr, CLIENT_SOCKET, UNIX, new_sock_fd, node->sock_fd);
778                 osrfLogDebug( OSRF_LOG_MARK, "Adding new UNIX client for %d", node->sock_fd);
779         }
780
781         return 0;
782 }
783
784
785 static int _socket_handle_client_data(socket_manager* mgr, socket_node* node) {
786         if(mgr == NULL || node == NULL) return -1;
787
788         char buf[RBUFSIZE];
789         int read_bytes;
790         int sock_fd = node->sock_fd;
791
792         osrf_clearbuf(buf, sizeof(buf));
793         set_fl(sock_fd, O_NONBLOCK);
794
795         osrfLogInternal( OSRF_LOG_MARK, "%ld : Received data at %f\n", (long) getpid(), get_timestamp_millis());
796
797         while( (read_bytes = recv(sock_fd, buf, RBUFSIZE-1, 0) ) > 0 ) {
798                 buf[read_bytes] = '\0';
799                 osrfLogInternal( OSRF_LOG_MARK, "Socket %d Read %d bytes and data: %s", sock_fd, read_bytes, buf);
800                 if(mgr->data_received)
801                         mgr->data_received(mgr->blob, mgr, sock_fd, buf, node->parent_id);
802
803                 osrf_clearbuf(buf, sizeof(buf));
804         }
805     int local_errno = errno; /* capture errno as set by recv() */
806
807         if(socket_find_node(mgr, sock_fd)) {  /* someone may have closed this socket */
808                 clr_fl(sock_fd, O_NONBLOCK); 
809                 if(read_bytes < 0) { 
810                         if(local_errno != EAGAIN) 
811                                 osrfLogWarning(OSRF_LOG_MARK,  " * Error reading socket with error %s", strerror(local_errno));
812                 }
813
814         } else { return -1; } /* inform the caller that this node has been tampered with */
815
816         if(read_bytes == 0) {  /* socket closed by client */
817                 if(mgr->on_socket_closed) {
818                         mgr->on_socket_closed(mgr->blob, sock_fd);
819                 }
820                 return -1;
821         }
822
823         return 0;
824
825 }
826
827
828 void socket_manager_free(socket_manager* mgr) {
829         if(mgr == NULL) return;
830         socket_node* tmp;
831         while(mgr->socket) {
832                 tmp = mgr->socket->next;
833                 socket_disconnect(mgr, mgr->socket->sock_fd);
834                 mgr->socket = tmp;
835         }
836         free(mgr);
837
838 }
839