Parially, a patch from Scott McKellar:
[OpenSRF.git] / src / libopensrf / socket_bundle.c
1 #include <opensrf/socket_bundle.h>
2
3 /* buffer used to read from the sockets */
4 #define RBUFSIZE 1024
5
6 static socket_node* _socket_add_node(socket_manager* mgr,
7                 int endpoint, int addr_type, int sock_fd, int parent_id );
8 static socket_node* socket_find_node(socket_manager* mgr, int sock_fd);
9 static void socket_remove_node(socket_manager*, int sock_fd);
10 static int _socket_send(int sock_fd, const char* data, int flags);
11 static int _socket_route_data(socket_manager* mgr, int num_active, fd_set* read_set);
12 static int _socket_route_data_id( socket_manager* mgr, int sock_id);
13 static int _socket_handle_new_client(socket_manager* mgr, socket_node* node);
14 static int _socket_handle_client_data(socket_manager* mgr, socket_node* node);
15
16
17 /* -------------------------------------------------------------------- 
18         Test Code 
19         -------------------------------------------------------------------- */
20 /*
21 int count = 0;
22 void printme(void* blob, socket_manager* mgr, 
23                 int sock_fd, char* data, int parent_id) {
24
25         fprintf(stderr, "Got data from socket %d with parent %d => %s", 
26                         sock_fd, parent_id, data );
27
28         socket_send(sock_fd, data);
29
30         if(count++ > 2) {
31                 socket_disconnect(mgr, sock_fd);
32                 _socket_print_list(mgr);
33         }
34 }
35
36 int main(int argc, char* argv[]) {
37         socket_manager manager;
38         memset(&manager, 0, sizeof(socket_manager));
39         int port = 11000;
40         if(argv[1])
41                 port = atoi(argv[1]);
42
43         manager.data_received = &printme;
44         socket_open_tcp_server(&manager, port);
45
46         while(1)
47                 socket_wait_all(&manager, -1);
48
49         return 0;
50 }
51 */
52 /* -------------------------------------------------------------------- */
53
54
55 /* allocates and inserts a new socket node into the nodeset.
56         if parent_id is positive and non-zero, it will be set */
57 static socket_node* _socket_add_node(socket_manager* mgr, 
58                 int endpoint, int addr_type, int sock_fd, int parent_id ) {
59
60         if(mgr == NULL) return NULL;
61         osrfLogInternal( OSRF_LOG_MARK, "Adding socket node with fd %d", sock_fd);
62         socket_node* new_node = safe_malloc(sizeof(socket_node));
63
64         new_node->endpoint      = endpoint;
65         new_node->addr_type     = addr_type;
66         new_node->sock_fd               = sock_fd;
67         new_node->next                  = NULL;
68         new_node->parent_id = 0;
69         if(parent_id > 0)
70                 new_node->parent_id = parent_id;
71
72         new_node->next                  = mgr->socket;
73         mgr->socket                             = new_node;
74         return new_node;
75 }
76
77 /* creates a new server socket node and adds it to the socket set.
78         returns new socket fd on success.  -1 on failure.
79         socket_type is one of INET or UNIX  */
80 int socket_open_tcp_server(socket_manager* mgr, int port, const char* listen_ip) {
81
82         if( mgr == NULL ) {
83                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): NULL mgr"); 
84                 return -1;
85         }
86
87         int sock_fd;
88         struct sockaddr_in server_addr;
89
90         errno = 0;
91         sock_fd = socket(AF_INET, SOCK_STREAM, 0);
92         if(sock_fd < 0) {
93                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): Unable to create TCP socket: %s",
94                         strerror( errno ) );
95                 return -1;
96         }
97
98         server_addr.sin_family = AF_INET;
99
100         if(listen_ip != NULL) {
101                 struct in_addr addr;
102                 if( inet_aton( listen_ip, &addr ) )
103                         server_addr.sin_addr.s_addr = addr.s_addr;
104                 else {
105                         osrfLogError( OSRF_LOG_MARK, "Listener address is invalid: %s", listen_ip );
106                         return -1;
107                 }
108         } else {
109                 server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
110         }
111
112         server_addr.sin_port = htons(port);
113
114         errno = 0;
115         if(bind( sock_fd, (struct sockaddr*) &server_addr, sizeof(server_addr)) < 0) {
116                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): cannot bind to port %d: %s",
117                         port, strerror( errno ) );
118                 return -1;
119         }
120
121         errno = 0;
122         if(listen(sock_fd, 20) == -1) {
123                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): listen() returned error: %s",
124                         strerror( errno ) );
125                 return -1;
126         }
127
128         _socket_add_node(mgr, SERVER_SOCKET, INET, sock_fd, 0);
129         return sock_fd;
130 }
131
132 int socket_open_unix_server(socket_manager* mgr, const char* path) {
133         if(mgr == NULL || path == NULL) return -1;
134
135         osrfLogDebug( OSRF_LOG_MARK, "opening unix socket at %s", path);
136         int sock_fd;
137         struct sockaddr_un server_addr;
138
139         if(strlen(path) > sizeof(server_addr.sun_path) - 1)
140         {
141                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): path too long: %s",
142                         path );
143                 return -1;
144         }
145
146         errno = 0;
147         sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
148         if(sock_fd < 0){
149                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): socket() failed: %s",
150                         strerror( errno ) );
151                 return -1;
152         }
153
154         server_addr.sun_family = AF_UNIX;
155         strcpy(server_addr.sun_path, path);
156
157         errno = 0;
158         if( bind(sock_fd, (struct sockaddr*) &server_addr, 
159                                 sizeof(struct sockaddr_un)) < 0) {
160                 osrfLogWarning( OSRF_LOG_MARK, 
161                         "socket_open_unix_server(): cannot bind to unix port %s: %s",
162                         path, strerror( errno ) );
163                 return -1;
164         }
165
166         errno = 0;
167         if(listen(sock_fd, 20) == -1) {
168                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): listen() returned error: %s",
169                         strerror( errno ) );
170                 return -1;
171         }
172
173         osrfLogDebug( OSRF_LOG_MARK, "unix socket successfully opened");
174         
175         int i = 1;
176
177         /* causing problems with router for some reason ... */
178         //osrfLogDebug( OSRF_LOG_MARK, "Setting SO_REUSEADDR");
179         //setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));
180         
181         //osrfLogDebug( OSRF_LOG_MARK, "Setting TCP_NODELAY");
182         setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
183
184         _socket_add_node(mgr, SERVER_SOCKET, UNIX, sock_fd, 0);
185         return sock_fd;
186 }
187
188
189
190 int socket_open_udp_server( 
191                 socket_manager* mgr, int port, const char* listen_ip ) {
192
193         int sockfd;
194         struct sockaddr_in server_addr;
195
196         errno = 0;
197         if( (sockfd = socket( AF_INET, SOCK_DGRAM, 0 )) < 0 ) {
198                 osrfLogWarning( OSRF_LOG_MARK, "Unable to create UDP socket: %s", strerror( errno ) );
199                 return -1;
200         }
201
202         server_addr.sin_family = AF_INET;
203         server_addr.sin_port = htons(port);
204         if(listen_ip) {
205                 struct in_addr addr;
206                 if( inet_aton( listen_ip, &addr ) )
207                         server_addr.sin_addr.s_addr = addr.s_addr;
208                 else {
209                         osrfLogError( OSRF_LOG_MARK, "UDP listener address is invalid: %s", listen_ip );
210                         return -1;
211                 }
212         } else server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
213
214         errno = 0;
215         if( (bind (sockfd, (struct sockaddr *) &server_addr,sizeof(server_addr))) ) {
216                 osrfLogWarning( OSRF_LOG_MARK, "Unable to bind to UDP port %d: %s",
217                         port, strerror( errno ) );
218                 return -1;
219         }
220
221         _socket_add_node(mgr, SERVER_SOCKET, INET, sockfd, 0);
222         return sockfd;
223 }
224
225
226 int socket_open_tcp_client(socket_manager* mgr, int port, const char* dest_addr) {
227
228         struct sockaddr_in remoteAddr, localAddr;
229    struct hostent *hptr;
230    int sock_fd;
231
232    // ------------------------------------------------------------------
233    // Create the socket
234    // ------------------------------------------------------------------
235    errno = 0;
236    if( (sock_fd = socket( AF_INET, SOCK_STREAM, 0 )) < 0 ) {
237            osrfLogWarning( OSRF_LOG_MARK,  "socket_open_tcp_client(): Cannot create TCP socket: %s",
238                         strerror( errno ) );
239       return -1;
240    }
241
242         int i = 1;
243         //osrfLogDebug( OSRF_LOG_MARK, "Setting TCP_NODELAY");
244         setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
245
246
247    // ------------------------------------------------------------------
248    // Get the hostname
249    // ------------------------------------------------------------------
250    errno = 0;
251    if( (hptr = gethostbyname( dest_addr ) ) == NULL ) {
252            osrfLogWarning(  OSRF_LOG_MARK, "socket_open_tcp_client(): Unknown Host => %s: %s",
253                                                 dest_addr, strerror( errno ) );
254       return -1;
255    }
256
257    // ------------------------------------------------------------------
258    // Construct server info struct
259    // ------------------------------------------------------------------
260    memset( &remoteAddr, 0, sizeof(remoteAddr));
261    remoteAddr.sin_family = AF_INET;
262    remoteAddr.sin_port = htons( port );
263    memcpy( (char*) &remoteAddr.sin_addr.s_addr,
264          hptr->h_addr_list[0], hptr->h_length );
265
266    // ------------------------------------------------------------------
267    // Construct local info struct
268    // ------------------------------------------------------------------
269    memset( &localAddr, 0, sizeof( localAddr ) );
270    localAddr.sin_family = AF_INET;
271    localAddr.sin_addr.s_addr = htonl( INADDR_ANY );
272    localAddr.sin_port = htons(0);
273
274    // ------------------------------------------------------------------
275    // Bind to a local port
276    // ------------------------------------------------------------------
277    errno = 0;
278    if( bind( sock_fd, (struct sockaddr *) &localAddr, sizeof( localAddr ) ) < 0 ) {
279            osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): Cannot bind to local port: %s",
280                 strerror( errno ) );
281       return -1;
282    }
283
284    // ------------------------------------------------------------------
285    // Connect to server
286    // ------------------------------------------------------------------
287    errno = 0;
288    if( connect( sock_fd, (struct sockaddr*) &remoteAddr, sizeof( struct sockaddr_in ) ) < 0 ) {
289            osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): Cannot connect to server %s: %s",
290                    dest_addr, strerror(errno) );
291            return -1;
292    }
293
294         _socket_add_node(mgr, CLIENT_SOCKET, INET, sock_fd, -1 );
295
296    return sock_fd;
297 }
298
299
300 int socket_open_udp_client( 
301                 socket_manager* mgr, int port, const char* dest_addr) {
302
303         int sockfd;
304         struct sockaddr_in client_addr, server_addr;
305         struct hostent* host;
306
307         errno = 0;
308         if( (host = gethostbyname(dest_addr)) == NULL) {
309                 osrfLogWarning( OSRF_LOG_MARK, "Unable to resolve host: %s: %s",
310                         dest_addr, strerror( errno ) );
311                 return -1;
312         }
313
314         server_addr.sin_family = host->h_addrtype;
315         memcpy((char *) &server_addr.sin_addr.s_addr,
316                              host->h_addr_list[0], host->h_length);
317         server_addr.sin_port = htons(port);
318
319         errno = 0;
320         if( (sockfd = socket(AF_INET,SOCK_DGRAM,0)) < 0 ) {
321                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_udp_client(): Unable to create UDP socket: %s", strerror( errno ) );
322                 return -1;
323         }
324
325         client_addr.sin_family = AF_INET;
326         client_addr.sin_addr.s_addr = htonl(INADDR_ANY);
327         client_addr.sin_port = htons(0);
328
329         errno = 0;
330         if( (bind(sockfd, (struct sockaddr *) &client_addr, sizeof(client_addr))) < 0 ) {
331                 osrfLogWarning( OSRF_LOG_MARK, "Unable to bind UDP socket: %s", strerror( errno ) );
332                 return -1;
333         }
334
335         _socket_add_node(mgr, CLIENT_SOCKET, INET, sockfd, -1 );
336
337         return sockfd;
338 }
339
340
341 int socket_open_unix_client(socket_manager* mgr, const char* sock_path) {
342
343         int sock_fd, len;
344    struct sockaddr_un usock;
345
346    if(strlen(sock_path) > sizeof(usock.sun_path) - 1)
347    {
348            osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_client(): path too long: %s",
349                    sock_path );
350            return -1;
351    }
352
353    errno = 0;
354    if( (sock_fd = socket( AF_UNIX, SOCK_STREAM, 0 )) < 0 ) {
355            osrfLogWarning(  OSRF_LOG_MARK, "socket_open_unix_client(): Cannot create UNIX socket: %s", strerror( errno ) );
356                 return -1;
357         }
358
359    usock.sun_family = AF_UNIX;
360    strcpy( usock.sun_path, sock_path );
361
362    len = sizeof( usock.sun_family ) + strlen( usock.sun_path );
363
364    errno = 0;
365    if( connect( sock_fd, (struct sockaddr *) &usock, len ) < 0 ) {
366            osrfLogWarning(  OSRF_LOG_MARK, "Error connecting to unix socket: %s",
367                         strerror( errno ) );
368                 return -1;
369         }
370
371         _socket_add_node(mgr, CLIENT_SOCKET, UNIX, sock_fd, -1 );
372
373    return sock_fd;
374 }
375
376
377 /* returns the socket_node with the given sock_fd */
378 static socket_node* socket_find_node(socket_manager* mgr, int sock_fd) {
379         if(mgr == NULL) return NULL;
380         socket_node* node = mgr->socket;
381         while(node) {
382                 if(node->sock_fd == sock_fd)
383                         return node;
384                 node = node->next;
385         }
386         return NULL;
387 }
388
389 /* removes the node with the given sock_fd from the list and frees it */
390 static void socket_remove_node(socket_manager* mgr, int sock_fd) {
391
392         if(mgr == NULL) return;
393
394         osrfLogDebug( OSRF_LOG_MARK, "removing socket %d", sock_fd);
395
396         socket_node* head = mgr->socket;
397         socket_node* tail = head;
398         if(head == NULL) return;
399
400         /* if removing the first node in the list */
401         if(head->sock_fd == sock_fd) {
402                 mgr->socket = head->next;
403                 free(head);
404                 return;
405         }
406
407         head = head->next;
408
409         /* if removing any other node */
410         while(head) {
411                 if(head->sock_fd == sock_fd) {
412                         tail->next = head->next;
413                         free(head);
414                         return;
415                 }
416                 tail = head;
417                 head = head->next;
418         }
419 }
420
421
422 void _socket_print_list(socket_manager* mgr) {
423         if(mgr == NULL) return;
424         socket_node* node = mgr->socket;
425         osrfLogDebug( OSRF_LOG_MARK, "socket_node list: [");
426         while(node) {
427                 osrfLogDebug( OSRF_LOG_MARK, "sock_fd: %d | parent_id: %d", 
428                                 node->sock_fd, node->parent_id);
429                 node = node->next;
430         }
431         osrfLogDebug( OSRF_LOG_MARK, "]");
432 }
433
434 /* sends the given data to the given socket */
435 int socket_send(int sock_fd, const char* data) {
436         return _socket_send( sock_fd, data, 0);
437 }
438
439 /* utility method */
440 static int _socket_send(int sock_fd, const char* data, int flags) {
441
442         signal(SIGPIPE, SIG_IGN); /* in case a unix socket was closed */
443
444         errno = 0;
445         size_t r = send( sock_fd, data, strlen(data), flags );
446         int local_errno = errno;
447         
448         if( r == -1 ) {
449                 osrfLogWarning( OSRF_LOG_MARK, "_socket_send(): Error sending data with return %d", r );
450                 osrfLogWarning( OSRF_LOG_MARK, "Last Sys Error: %s", strerror(local_errno));
451                 return -1;
452         }
453
454         return 0;
455 }
456
457
458 /* sends the given data to the given socket. 
459  * sets the send flag MSG_DONTWAIT which will allow the 
460  * process to continue even if the socket buffer is full
461  * returns 0 on success, -1 otherwise */
462 //int socket_send_nowait( int sock_fd, const char* data) {
463 //      return _socket_send( sock_fd, data, MSG_DONTWAIT);
464 //}
465
466
467 /*
468  * Waits at most usecs microseconds for the send buffer of the given
469  * socket to accept new data.  This does not guarantee that the 
470  * socket will accept all the data we want to give it.
471  */
472 int socket_send_timeout( int sock_fd, const char* data, int usecs ) {
473
474         fd_set write_set;
475         FD_ZERO( &write_set );
476         FD_SET( sock_fd, &write_set );
477
478         int mil = 1000000;
479         int secs = (int) usecs / mil;
480         usecs = usecs - (secs * mil);
481
482         struct timeval tv;
483         tv.tv_sec = secs;
484         tv.tv_usec = usecs;
485
486         errno = 0;
487         int ret = select( sock_fd + 1, NULL, &write_set, NULL, &tv);
488         if( ret > 0 ) return _socket_send( sock_fd, data, 0);
489
490         osrfLogError(OSRF_LOG_MARK, "socket_send_timeout(): "
491                 "timed out on send for socket %d after %d secs, %d usecs: %s",
492                 sock_fd, secs, usecs, strerror( errno ) );
493
494         return -1;
495 }
496
497
498 /* disconnects the node with the given sock_fd and removes
499         it from the socket set */
500 void socket_disconnect(socket_manager* mgr, int sock_fd) {
501         osrfLogInternal( OSRF_LOG_MARK, "Closing socket %d", sock_fd);
502         close( sock_fd );
503         socket_remove_node(mgr, sock_fd);
504 }
505
506
507 /* we assume that if select() fails, the socket is no longer valid */
508 int socket_connected(int sock_fd) {
509         fd_set read_set;
510         FD_ZERO( &read_set );
511         FD_SET( sock_fd, &read_set );
512         if( select( sock_fd + 1, &read_set, NULL, NULL, NULL) == -1 ) 
513                 return 0;
514         return 1;
515
516 }
517
518 /* this only waits on the server socket and does not handle the actual
519         data coming in from the client..... XXX */
520 int socket_wait(socket_manager* mgr, int timeout, int sock_fd) {
521
522         int retval = 0;
523         fd_set read_set;
524         FD_ZERO( &read_set );
525         FD_SET( sock_fd, &read_set );
526
527         struct timeval tv;
528         tv.tv_sec = timeout;
529         tv.tv_usec = 0;
530         errno = 0;
531
532         if( timeout < 0 ) {  
533
534                 // If timeout is -1, we block indefinitely
535                 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, NULL)) == -1 ) {
536                         osrfLogDebug( OSRF_LOG_MARK, "Call to select() interrupted: Sys Error: %s", strerror(errno));
537                         return -1;
538                 }
539
540         } else if( timeout > 0 ) { /* timeout of 0 means don't block */
541
542                 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, &tv)) == -1 ) {
543                         osrfLogDebug( OSRF_LOG_MARK, "Call to select() interrupted: Sys Error: %s", strerror(errno));
544                         return -1;
545                 }
546         }
547
548         osrfLogInternal( OSRF_LOG_MARK, "%d active sockets after select()", retval);
549         return _socket_route_data_id(mgr, sock_fd);
550 }
551
552
553 int socket_wait_all(socket_manager* mgr, int timeout) {
554
555         if(mgr == NULL) {
556                 osrfLogWarning( OSRF_LOG_MARK,  "socket_wait_all(): null mgr" );
557                 return -1;
558         }
559
560         int retval = 0;
561         fd_set read_set;
562         FD_ZERO( &read_set );
563
564         socket_node* node = mgr->socket;
565         int max_fd = 0;
566         while(node) {
567                 osrfLogInternal( OSRF_LOG_MARK, "Adding socket fd %d to select set",node->sock_fd);
568                 FD_SET( node->sock_fd, &read_set );
569                 if(node->sock_fd > max_fd) max_fd = node->sock_fd;
570                 node = node->next;
571         }
572         max_fd += 1;
573
574         struct timeval tv;
575         tv.tv_sec = timeout;
576         tv.tv_usec = 0;
577         errno = 0;
578
579         if( timeout < 0 ) {  
580
581                 // If timeout is -1, there is no timeout passed to the call to select
582                 if( (retval = select( max_fd, &read_set, NULL, NULL, NULL)) == -1 ) {
583                         osrfLogWarning( OSRF_LOG_MARK, "select() call aborted: %s", strerror(errno));
584                         return -1;
585                 }
586
587         } else if( timeout != 0 ) { /* timeout of 0 means don't block */
588
589                 if( (retval = select( max_fd, &read_set, NULL, NULL, &tv)) == -1 ) {
590                         osrfLogWarning( OSRF_LOG_MARK, "select() call aborted: %s", strerror(errno));
591                         return -1;
592                 }
593         }
594
595         osrfLogDebug( OSRF_LOG_MARK, "%d active sockets after select()", retval);
596         return _socket_route_data(mgr, retval, &read_set);
597 }
598
599 /* iterates over the sockets in the set and handles active sockets.
600         new sockets connecting to server sockets cause the creation
601         of a new socket node.
602         Any new data read is is passed off to the data_received callback
603         as it arrives */
604 /* determines if we're receiving a new client or data
605         on an existing client */
606 static int _socket_route_data(
607         socket_manager* mgr, int num_active, fd_set* read_set) {
608
609         if(!(mgr && read_set)) return -1;
610
611         int last_failed_id = -1;
612
613
614         /* come back here if someone yanks a socket_node from beneath us */
615         while(1) {
616
617                 socket_node* node = mgr->socket;
618                 int handled = 0;
619                 int status = 0;
620                 
621                 while(node && (handled < num_active)) {
622         
623                         int sock_fd = node->sock_fd;
624                         
625                         if(last_failed_id != -1) {
626                                 /* in case it was not removed by our overlords */
627                                 osrfLogInternal( OSRF_LOG_MARK, "Attempting to remove last_failed_id of %d", last_failed_id);
628                                 socket_remove_node( mgr, last_failed_id );
629                                 last_failed_id = -1;
630                                 status = -1;
631                                 break;
632                         }
633         
634                         /* does this socket have data? */
635                         if( FD_ISSET( sock_fd, read_set ) ) {
636         
637                                 osrfLogInternal( OSRF_LOG_MARK, "Socket %d active", sock_fd);
638                                 handled++;
639                                 FD_CLR(sock_fd, read_set);
640         
641                                 if(node->endpoint == SERVER_SOCKET) 
642                                         _socket_handle_new_client(mgr, node);
643         
644                                 else
645                                         status = _socket_handle_client_data(mgr, node);
646         
647                                 /* someone may have yanked a socket_node out from under 
648                                         us...start over with the first socket */
649                                 if(status == -1)  {
650                                         last_failed_id = sock_fd;
651                                         osrfLogInternal( OSRF_LOG_MARK, "Backtracking back to start of loop because "
652                                                         "of -1 return code from _socket_handle_client_data()");
653                                 }
654                         }
655
656                         if(status == -1) break;
657                         node = node->next;
658
659                 } // is_set
660
661                 if(status == 0) break;
662                 if(status == -1) status = 0;
663         } 
664
665         return 0;
666 }
667
668
669 /* routes data from a single known socket */
670 static int _socket_route_data_id( socket_manager* mgr, int sock_id) {
671         socket_node* node = socket_find_node(mgr, sock_id);     
672         int status = 0;
673
674         if(node) {
675                 if(node->endpoint == SERVER_SOCKET) 
676                         _socket_handle_new_client(mgr, node);
677         
678                 if(node->endpoint == CLIENT_SOCKET ) 
679                         status = _socket_handle_client_data(mgr, node);
680
681                 if(status == -1) {
682                         socket_remove_node(mgr, sock_id);
683                         return -1;
684                 }
685                 return 0;
686         } 
687
688         return -1;
689 }
690
691
692 static int _socket_handle_new_client(socket_manager* mgr, socket_node* node) {
693         if(mgr == NULL || node == NULL) return -1;
694
695         errno = 0;
696         int new_sock_fd;
697         new_sock_fd = accept(node->sock_fd, NULL, NULL);
698         if(new_sock_fd < 0) {
699                 osrfLogWarning( OSRF_LOG_MARK, "_socket_handle_new_client(): accept() failed: %s",
700                         strerror( errno ) );
701                 return -1;
702         }
703
704         if(node->addr_type == INET) {
705                 _socket_add_node(mgr, CLIENT_SOCKET, INET, new_sock_fd, node->sock_fd);
706                 osrfLogDebug( OSRF_LOG_MARK, "Adding new INET client for %d", node->sock_fd);
707
708         } else if(node->addr_type == UNIX) {
709                 _socket_add_node(mgr, CLIENT_SOCKET, UNIX, new_sock_fd, node->sock_fd);
710                 osrfLogDebug( OSRF_LOG_MARK, "Adding new UNIX client for %d", node->sock_fd);
711         }
712
713         return 0;
714 }
715
716
717 static int _socket_handle_client_data(socket_manager* mgr, socket_node* node) {
718         if(mgr == NULL || node == NULL) return -1;
719
720         char buf[RBUFSIZE];
721         int read_bytes;
722         int sock_fd = node->sock_fd;
723
724         osrf_clearbuf(buf, sizeof(buf));
725         set_fl(sock_fd, O_NONBLOCK);
726
727         osrfLogInternal( OSRF_LOG_MARK, "%ld : Received data at %f\n", (long) getpid(), get_timestamp_millis());
728
729         while( (read_bytes = recv(sock_fd, buf, RBUFSIZE-1, 0) ) > 0 ) {
730                 buf[read_bytes] = '\0';
731                 osrfLogInternal( OSRF_LOG_MARK, "Socket %d Read %d bytes and data: %s", sock_fd, read_bytes, buf);
732                 if(mgr->data_received)
733                         mgr->data_received(mgr->blob, mgr, sock_fd, buf, node->parent_id);
734
735                 osrf_clearbuf(buf, sizeof(buf));
736         }
737     int local_errno = errno; /* capture errno as set by recv() */
738
739         if(socket_find_node(mgr, sock_fd)) {  /* someone may have closed this socket */
740                 clr_fl(sock_fd, O_NONBLOCK); 
741                 if(read_bytes < 0) { 
742                         if(local_errno != EAGAIN) 
743                                 osrfLogWarning(OSRF_LOG_MARK,  " * Error reading socket with error %s", strerror(local_errno));
744                 }
745
746         } else { return -1; } /* inform the caller that this node has been tampered with */
747
748         if(read_bytes == 0) {  /* socket closed by client */
749                 if(mgr->on_socket_closed) {
750                         mgr->on_socket_closed(mgr->blob, sock_fd);
751                 }
752                 return -1;
753         }
754
755         return 0;
756
757 }
758
759
760 void socket_manager_free(socket_manager* mgr) {
761         if(mgr == NULL) return;
762         socket_node* tmp;
763         while(mgr->socket) {
764                 tmp = mgr->socket->next;
765                 socket_disconnect(mgr, mgr->socket->sock_fd);
766                 mgr->socket = tmp;
767         }
768         free(mgr);
769
770 }
771