]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/utils/socket_bundle.c
5247328c5d9ff27003466d50b35430c928a68de1
[OpenSRF.git] / src / utils / socket_bundle.c
1 #include "socket_bundle.h"
2
3 /* -------------------------------------------------------------------- 
4         Test Code 
5         -------------------------------------------------------------------- */
6 /*
7 int count = 0;
8 void printme(void* blob, socket_manager* mgr, 
9                 int sock_fd, char* data, int parent_id) {
10
11         fprintf(stderr, "Got data from socket %d with parent %d => %s", 
12                         sock_fd, parent_id, data );
13
14         socket_send(sock_fd, data);
15
16         if(count++ > 2) {
17                 socket_disconnect(mgr, sock_fd);
18                 _socket_print_list(mgr);
19         }
20 }
21
22 int main(int argc, char* argv[]) {
23         socket_manager manager;
24         memset(&manager, 0, sizeof(socket_manager));
25         int port = 11000;
26         if(argv[1])
27                 port = atoi(argv[1]);
28
29         manager.data_received = &printme;
30         socket_open_tcp_server(&manager, port);
31
32         while(1)
33                 socket_wait_all(&manager, -1);
34
35         return 0;
36 }
37 */
38 /* -------------------------------------------------------------------- */
39
40
41
42 socket_node* _socket_add_node(socket_manager* mgr, 
43                 int endpoint, int addr_type, int sock_fd, int parent_id ) {
44
45         if(mgr == NULL) return NULL;
46         osrfLogInternal( OSRF_LOG_MARK, "Adding socket node with fd %d", sock_fd);
47         socket_node* new_node = safe_malloc(sizeof(socket_node));
48
49         new_node->endpoint      = endpoint;
50         new_node->addr_type     = addr_type;
51         new_node->sock_fd               = sock_fd;
52         new_node->next                  = NULL;
53         new_node->parent_id = 0;
54         if(parent_id > 0)
55                 new_node->parent_id = parent_id;
56
57         new_node->next                  = mgr->socket;
58         mgr->socket                             = new_node;
59         return new_node;
60 }
61
62 /* creates a new server socket node and adds it to the socket set.
63         returns new socket fd on success.  -1 on failure.
64         socket_type is one of INET or UNIX  */
65 int socket_open_tcp_server(socket_manager* mgr, int port, char* listen_ip) {
66
67         if( mgr == NULL ) {
68                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): NULL mgr"); 
69                 return -1;
70         }
71
72         int sock_fd;
73         struct sockaddr_in server_addr;
74
75         errno = 0;
76         sock_fd = socket(AF_INET, SOCK_STREAM, 0);
77         if(sock_fd < 0) {
78                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): Unable to create TCP socket: %s",
79                         strerror( errno ) );
80                 return -1;
81         }
82
83         server_addr.sin_family = AF_INET;
84
85         if(listen_ip != NULL) {
86                 server_addr.sin_addr.s_addr = inet_addr(listen_ip);
87         } else {
88                 server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
89         }
90
91         server_addr.sin_port = htons(port);
92
93         errno = 0;
94         if(bind( sock_fd, (struct sockaddr*) &server_addr, sizeof(server_addr)) < 0) {
95                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): cannot bind to port %d: %s",
96                         port, strerror( errno ) );
97                 return -1;
98         }
99
100         errno = 0;
101         if(listen(sock_fd, 20) == -1) {
102                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): listen() returned error: %s",
103                         strerror( errno ) );
104                 return -1;
105         }
106
107         _socket_add_node(mgr, SERVER_SOCKET, INET, sock_fd, 0);
108         return sock_fd;
109 }
110
111 int socket_open_unix_server(socket_manager* mgr, char* path) {
112         if(mgr == NULL || path == NULL) return -1;
113
114         osrfLogDebug( OSRF_LOG_MARK, "opening unix socket at %s", path);
115         int sock_fd;
116         struct sockaddr_un server_addr;
117
118         errno = 0;
119         sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
120         if(sock_fd < 0){
121                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): socket() failed: %s",
122                         strerror( errno ) );
123                 return -1;
124         }
125
126         server_addr.sun_family = AF_UNIX;
127         strcpy(server_addr.sun_path, path);
128
129         errno = 0;
130         if( bind(sock_fd, (struct sockaddr*) &server_addr, 
131                                 sizeof(struct sockaddr_un)) < 0) {
132                 osrfLogWarning( OSRF_LOG_MARK, 
133                         "socket_open_unix_server(): cannot bind to unix port %s: %s",
134                         path, strerror( errno ) );
135                 return -1;
136         }
137
138         errno = 0;
139         if(listen(sock_fd, 20) == -1) {
140                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): listen() returned error: %s",
141                         strerror( errno ) );
142                 return -1;
143         }
144
145         osrfLogDebug( OSRF_LOG_MARK, "unix socket successfully opened");
146         
147         int i = 1;
148
149         /* causing problems with router for some reason ... */
150         //osrfLogDebug( OSRF_LOG_MARK, "Setting SO_REUSEADDR");
151         //setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));
152         
153         //osrfLogDebug( OSRF_LOG_MARK, "Setting TCP_NODELAY");
154         setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
155
156         _socket_add_node(mgr, SERVER_SOCKET, UNIX, sock_fd, 0);
157         return sock_fd;
158 }
159
160
161
162 int socket_open_udp_server( 
163                 socket_manager* mgr, int port, char* listen_ip ) {
164
165         int sockfd;
166         struct sockaddr_in server_addr;
167
168         errno = 0;
169         if( (sockfd = socket( AF_INET, SOCK_DGRAM, 0 )) < 0 ) {
170                 osrfLogWarning( OSRF_LOG_MARK, "Unable to create UDP socket: %s", strerror( errno ) );
171                 return -1;
172         }
173
174         server_addr.sin_family = AF_INET;
175         server_addr.sin_port = htons(port);
176         if(listen_ip) server_addr.sin_addr.s_addr = inet_addr(listen_ip);
177         else server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
178
179         errno = 0;
180         if( (bind (sockfd, (struct sockaddr *) &server_addr,sizeof(server_addr))) ) {
181                 osrfLogWarning( OSRF_LOG_MARK, "Unable to bind to UDP port %d: %s",
182                         port, strerror( errno ) );
183                 return -1;
184         }
185
186         _socket_add_node(mgr, SERVER_SOCKET, INET, sockfd, 0);
187         return sockfd;
188 }
189
190
191 int socket_open_tcp_client(socket_manager* mgr, int port, char* dest_addr) {
192
193         struct sockaddr_in remoteAddr, localAddr;
194    struct hostent *hptr;
195    int sock_fd;
196
197    // ------------------------------------------------------------------
198    // Create the socket
199    // ------------------------------------------------------------------
200    errno = 0;
201    if( (sock_fd = socket( AF_INET, SOCK_STREAM, 0 )) < 0 ) {
202            osrfLogWarning( OSRF_LOG_MARK,  "socket_open_tcp_client(): Cannot create TCP socket: %s",
203                         strerror( errno ) );
204       return -1;
205    }
206
207         int i = 1;
208         //osrfLogDebug( OSRF_LOG_MARK, "Setting TCP_NODELAY");
209         setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
210
211
212    // ------------------------------------------------------------------
213    // Get the hostname
214    // ------------------------------------------------------------------
215    errno = 0;
216    if( (hptr = gethostbyname( dest_addr ) ) == NULL ) {
217            osrfLogWarning(  OSRF_LOG_MARK, "socket_open_tcp_client(): Unknown Host => %s: %s",
218                                                 dest_addr, strerror( errno ) );
219       return -1;
220    }
221
222    // ------------------------------------------------------------------
223    // Construct server info struct
224    // ------------------------------------------------------------------
225    memset( &remoteAddr, 0, sizeof(remoteAddr));
226    remoteAddr.sin_family = AF_INET;
227    remoteAddr.sin_port = htons( port );
228    memcpy( (char*) &remoteAddr.sin_addr.s_addr,
229          hptr->h_addr_list[0], hptr->h_length );
230
231    // ------------------------------------------------------------------
232    // Construct local info struct
233    // ------------------------------------------------------------------
234    memset( &localAddr, 0, sizeof( localAddr ) );
235    localAddr.sin_family = AF_INET;
236    localAddr.sin_addr.s_addr = htonl( INADDR_ANY );
237    localAddr.sin_port = htons(0);
238
239    // ------------------------------------------------------------------
240    // Bind to a local port
241    // ------------------------------------------------------------------
242    errno = 0;
243    if( bind( sock_fd, (struct sockaddr *) &localAddr, sizeof( localAddr ) ) < 0 ) {
244            osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): Cannot bind to local port: %s",
245                 strerror( errno ) );
246       return -1;
247    }
248
249    // ------------------------------------------------------------------
250    // Connect to server
251    // ------------------------------------------------------------------
252    errno = 0;
253    if( connect( sock_fd, (struct sockaddr*) &remoteAddr, sizeof( struct sockaddr_in ) ) < 0 ) {
254            osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): Cannot connect to server %s: %s",
255                    dest_addr, strerror(errno) );
256            return -1;
257    }
258
259         _socket_add_node(mgr, CLIENT_SOCKET, INET, sock_fd, -1 );
260
261    return sock_fd;
262 }
263
264
265 int socket_open_udp_client( 
266                 socket_manager* mgr, int port, char* dest_addr) {
267
268         int sockfd;
269         struct sockaddr_in client_addr, server_addr;
270         struct hostent* host;
271
272         errno = 0;
273         if( (host = gethostbyname(dest_addr)) == NULL) {
274                 osrfLogWarning( OSRF_LOG_MARK, "Unable to resolve host: %s: %s",
275                         dest_addr, strerror( errno ) );
276                 return -1;
277         }
278
279         server_addr.sin_family = host->h_addrtype;
280         memcpy((char *) &server_addr.sin_addr.s_addr,
281                              host->h_addr_list[0], host->h_length);
282         server_addr.sin_port = htons(port);
283
284         errno = 0;
285         if( (sockfd = socket(AF_INET,SOCK_DGRAM,0)) < 0 ) {
286                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_udp_client(): Unable to create UDP socket: %s", strerror( errno ) );
287                 return -1;
288         }
289
290         client_addr.sin_family = AF_INET;
291         client_addr.sin_addr.s_addr = htonl(INADDR_ANY);
292         client_addr.sin_port = htons(0);
293
294         errno = 0;
295         if( (bind(sockfd, (struct sockaddr *) &client_addr, sizeof(client_addr))) < 0 ) {
296                 osrfLogWarning( OSRF_LOG_MARK, "Unable to bind UDP socket: %s", strerror( errno ) );
297                 return -1;
298         }
299
300         _socket_add_node(mgr, CLIENT_SOCKET, INET, sockfd, -1 );
301
302         return sockfd;
303 }
304
305
306 int socket_open_unix_client(socket_manager* mgr, char* sock_path) {
307
308         int sock_fd, len;
309    struct sockaddr_un usock;
310
311    errno = 0;
312    if( (sock_fd = socket( AF_UNIX, SOCK_STREAM, 0 )) < 0 ) {
313            osrfLogWarning(  OSRF_LOG_MARK, "socket_open_unix_client(): Cannot create UNIX socket: %s", strerror( errno ) );
314                 return -1;
315         }
316
317    usock.sun_family = AF_UNIX;
318    strcpy( usock.sun_path, sock_path );
319
320    len = sizeof( usock.sun_family ) + strlen( usock.sun_path );
321
322    errno = 0;
323    if( connect( sock_fd, (struct sockaddr *) &usock, len ) < 0 ) {
324            osrfLogWarning(  OSRF_LOG_MARK, "Error connecting to unix socket: %s",
325                         strerror( errno ) );
326                 return -1;
327         }
328
329         _socket_add_node(mgr, CLIENT_SOCKET, UNIX, sock_fd, -1 );
330
331    return sock_fd;
332 }
333
334
335
336 /* returns the socket_node with the given sock_fd */
337 socket_node* socket_find_node(socket_manager* mgr, int sock_fd) {
338         if(mgr == NULL) return NULL;
339         socket_node* node = mgr->socket;
340         while(node) {
341                 if(node->sock_fd == sock_fd)
342                         return node;
343                 node = node->next;
344         }
345         return NULL;
346 }
347
348 /* removes the node with the given sock_fd from the list and frees it */
349 void socket_remove_node(socket_manager* mgr, int sock_fd) {
350
351         if(mgr == NULL) return;
352
353         osrfLogDebug( OSRF_LOG_MARK, "removing socket %d", sock_fd);
354
355         socket_node* head = mgr->socket;
356         socket_node* tail = head;
357         if(head == NULL) return;
358
359         /* if removing the first node in the list */
360         if(head->sock_fd == sock_fd) {
361                 mgr->socket = head->next;
362                 free(head);
363                 return;
364         }
365
366         head = head->next;
367
368         /* if removing any other node */
369         while(head) {
370                 if(head->sock_fd == sock_fd) {
371                         tail->next = head->next;
372                         free(head);
373                         return;
374                 }
375                 tail = head;
376                 head = head->next;
377         }
378 }
379
380
381
382 void _socket_print_list(socket_manager* mgr) {
383         if(mgr == NULL) return;
384         socket_node* node = mgr->socket;
385         osrfLogDebug( OSRF_LOG_MARK, "socket_node list: [");
386         while(node) {
387                 osrfLogDebug( OSRF_LOG_MARK, "sock_fd: %d | parent_id: %d", 
388                                 node->sock_fd, node->parent_id);
389                 node = node->next;
390         }
391         osrfLogDebug( OSRF_LOG_MARK, "]");
392 }
393
394 /* sends the given data to the given socket */
395 int socket_send(int sock_fd, const char* data) {
396         return _socket_send( sock_fd, data, 0);
397 }
398
399
400 int _socket_send(int sock_fd, const char* data, int flags) {
401
402         signal(SIGPIPE, SIG_IGN); /* in case a unix socket was closed */
403
404         errno = 0;
405         size_t r = send( sock_fd, data, strlen(data), flags );
406         int local_errno = errno;
407         
408         if( r == -1 ) {
409                 osrfLogWarning( OSRF_LOG_MARK, "_socket_send(): Error sending data with return %d", r );
410                 osrfLogWarning( OSRF_LOG_MARK, "Last Sys Error: %s", strerror(local_errno));
411                 return -1;
412         }
413
414         return 0;
415 }
416
417
418 int socket_send_nowait( int sock_fd, const char* data) {
419         return _socket_send( sock_fd, data, MSG_DONTWAIT);
420 }
421
422
423 /*
424  * Waits at most usecs microseconds for the send buffer of the given
425  * socket to accept new data.  This does not guarantee that the 
426  * socket will accept all the data we want to give it.
427  */
428 int socket_send_timeout( int sock_fd, const char* data, int usecs ) {
429
430         fd_set write_set;
431         FD_ZERO( &write_set );
432         FD_SET( sock_fd, &write_set );
433
434         int mil = 1000000;
435         int secs = (int) usecs / mil;
436         usecs = usecs - (secs * mil);
437
438         struct timeval tv;
439         tv.tv_sec = secs;
440         tv.tv_usec = usecs;
441
442         errno = 0;
443         int ret = select( sock_fd + 1, NULL, &write_set, NULL, &tv);
444         if( ret > 0 ) return _socket_send( sock_fd, data, 0);
445
446         osrfLogError(OSRF_LOG_MARK, "socket_send_timeout(): "
447                 "timed out on send for socket %d after %d secs, %d usecs: %s",
448                 sock_fd, secs, usecs, strerror( errno ) );
449
450         return -1;
451 }
452
453
454 /* disconnects the node with the given sock_fd and removes
455         it from the socket set */
456 void socket_disconnect(socket_manager* mgr, int sock_fd) {
457         osrfLogInternal( OSRF_LOG_MARK, "Closing socket %d", sock_fd);
458         close( sock_fd );
459         socket_remove_node(mgr, sock_fd);
460 }
461
462
463 /* we assume that if select() fails, the socket is no longer valid */
464 int socket_connected(int sock_fd) {
465         fd_set read_set;
466         FD_ZERO( &read_set );
467         FD_SET( sock_fd, &read_set );
468         if( select( sock_fd + 1, &read_set, NULL, NULL, NULL) == -1 ) 
469                 return 0;
470         return 1;
471
472 }
473
474 /* this only waits on the server socket and does not handle the actual
475         data coming in from the client..... XXX */
476 int socket_wait(socket_manager* mgr, int timeout, int sock_fd) {
477
478         int retval = 0;
479         fd_set read_set;
480         FD_ZERO( &read_set );
481         FD_SET( sock_fd, &read_set );
482
483         struct timeval tv;
484         tv.tv_sec = timeout;
485         tv.tv_usec = 0;
486         errno = 0;
487
488         if( timeout < 0 ) {  
489
490                 // If timeout is -1, we block indefinitely
491                 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, NULL)) == -1 ) {
492                         osrfLogDebug( OSRF_LOG_MARK, "Call to select() interrupted: Sys Error: %s", strerror(errno));
493                         return -1;
494                 }
495
496         } else if( timeout > 0 ) { /* timeout of 0 means don't block */
497
498                 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, &tv)) == -1 ) {
499                         osrfLogDebug( OSRF_LOG_MARK, "Call to select() interrupted: Sys Error: %s", strerror(errno));
500                         return -1;
501                 }
502         }
503
504         osrfLogInternal( OSRF_LOG_MARK, "%d active sockets after select()", retval);
505         return _socket_route_data_id(mgr, sock_fd);
506 }
507
508
509 int socket_wait_all(socket_manager* mgr, int timeout) {
510
511         if(mgr == NULL) {
512                 osrfLogWarning( OSRF_LOG_MARK,  "socket_wait_all(): null mgr" );
513                 return -1;
514         }
515
516         int retval = 0;
517         fd_set read_set;
518         FD_ZERO( &read_set );
519
520         socket_node* node = mgr->socket;
521         int max_fd = 0;
522         while(node) {
523                 osrfLogInternal( OSRF_LOG_MARK, "Adding socket fd %d to select set",node->sock_fd);
524                 FD_SET( node->sock_fd, &read_set );
525                 if(node->sock_fd > max_fd) max_fd = node->sock_fd;
526                 node = node->next;
527         }
528         max_fd += 1;
529
530         struct timeval tv;
531         tv.tv_sec = timeout;
532         tv.tv_usec = 0;
533         errno = 0;
534
535         if( timeout < 0 ) {  
536
537                 // If timeout is -1, there is no timeout passed to the call to select
538                 if( (retval = select( max_fd, &read_set, NULL, NULL, NULL)) == -1 ) {
539                         osrfLogWarning( OSRF_LOG_MARK, "select() call aborted: %s", strerror(errno));
540                         return -1;
541                 }
542
543         } else if( timeout != 0 ) { /* timeout of 0 means don't block */
544
545                 if( (retval = select( max_fd, &read_set, NULL, NULL, &tv)) == -1 ) {
546                         osrfLogWarning( OSRF_LOG_MARK, "select() call aborted: %s", strerror(errno));
547                         return -1;
548                 }
549         }
550
551         osrfLogDebug( OSRF_LOG_MARK, "%d active sockets after select()", retval);
552         return _socket_route_data(mgr, retval, &read_set);
553 }
554
555 /* determines if we'er receiving a new client or data
556         on an existing client */
557 int _socket_route_data(
558         socket_manager* mgr, int num_active, fd_set* read_set) {
559
560         if(!(mgr && read_set)) return -1;
561
562         int last_failed_id = -1;
563
564
565         /* come back here if someone yanks a socket_node from beneath us */
566         while(1) {
567
568                 socket_node* node = mgr->socket;
569                 int handled = 0;
570                 int status = 0;
571                 
572                 while(node && (handled < num_active)) {
573         
574                         int sock_fd = node->sock_fd;
575                         
576                         if(last_failed_id != -1) {
577                                 /* in case it was not removed by our overlords */
578                                 osrfLogInternal( OSRF_LOG_MARK, "Attempting to remove last_failed_id of %d", last_failed_id);
579                                 socket_remove_node( mgr, last_failed_id );
580                                 last_failed_id = -1;
581                                 status = -1;
582                                 break;
583                         }
584         
585                         /* does this socket have data? */
586                         if( FD_ISSET( sock_fd, read_set ) ) {
587         
588                                 osrfLogInternal( OSRF_LOG_MARK, "Socket %d active", sock_fd);
589                                 handled++;
590                                 FD_CLR(sock_fd, read_set);
591         
592                                 if(node->endpoint == SERVER_SOCKET) 
593                                         _socket_handle_new_client(mgr, node);
594         
595                                 else
596                                         status = _socket_handle_client_data(mgr, node);
597         
598                                 /* someone may have yanked a socket_node out from under 
599                                         us...start over with the first socket */
600                                 if(status == -1)  {
601                                         last_failed_id = sock_fd;
602                                         osrfLogInternal( OSRF_LOG_MARK, "Backtracking back to start of loop because "
603                                                         "of -1 return code from _socket_handle_client_data()");
604                                 }
605                         }
606
607                         if(status == -1) break;
608                         node = node->next;
609
610                 } // is_set
611
612                 if(status == 0) break;
613                 if(status == -1) status = 0;
614         } 
615
616         return 0;
617 }
618
619
620 int _socket_route_data_id( socket_manager* mgr, int sock_id) {
621         socket_node* node = socket_find_node(mgr, sock_id);     
622         int status = 0;
623
624         if(node) {
625                 if(node->endpoint == SERVER_SOCKET) 
626                         _socket_handle_new_client(mgr, node);
627         
628                 if(node->endpoint == CLIENT_SOCKET ) 
629                         status = _socket_handle_client_data(mgr, node);
630
631                 if(status == -1) {
632                         socket_remove_node(mgr, sock_id);
633                         return -1;
634                 }
635                 return 0;
636         } 
637
638         return -1;
639 }
640
641
642 int _socket_handle_new_client(socket_manager* mgr, socket_node* node) {
643         if(mgr == NULL || node == NULL) return -1;
644
645         errno = 0;
646         int new_sock_fd;
647         new_sock_fd = accept(node->sock_fd, NULL, NULL);
648         if(new_sock_fd < 0) {
649                 osrfLogWarning( OSRF_LOG_MARK, "_socket_handle_new_client(): accept() failed: %s",
650                         strerror( errno ) );
651                 return -1;
652         }
653
654         if(node->addr_type == INET) {
655                 _socket_add_node(mgr, CLIENT_SOCKET, INET, new_sock_fd, node->sock_fd);
656                 osrfLogDebug( OSRF_LOG_MARK, "Adding new INET client for %d", node->sock_fd);
657
658         } else if(node->addr_type == UNIX) {
659                 _socket_add_node(mgr, CLIENT_SOCKET, UNIX, new_sock_fd, node->sock_fd);
660                 osrfLogDebug( OSRF_LOG_MARK, "Adding new UNIX client for %d", node->sock_fd);
661         }
662
663         return 0;
664 }
665
666
667 int _socket_handle_client_data(socket_manager* mgr, socket_node* node) {
668         if(mgr == NULL || node == NULL) return -1;
669
670         char buf[RBUFSIZE];
671         int read_bytes;
672         int sock_fd = node->sock_fd;
673
674         memset(buf, 0, RBUFSIZE);
675         set_fl(sock_fd, O_NONBLOCK);
676
677         osrfLogInternal( OSRF_LOG_MARK, "%ld : Received data at %f\n", (long) getpid(), get_timestamp_millis());
678
679         while( (read_bytes = recv(sock_fd, buf, RBUFSIZE-1, 0) ) > 0 ) {
680                 osrfLogInternal( OSRF_LOG_MARK, "Socket %d Read %d bytes and data: %s", sock_fd, read_bytes, buf);
681                 if(mgr->data_received)
682                         mgr->data_received(mgr->blob, mgr, sock_fd, buf, node->parent_id);
683
684                 memset(buf, 0, RBUFSIZE);
685         }
686     int local_errno = errno; /* capture errno as set by recv() */
687
688         if(socket_find_node(mgr, sock_fd)) {  /* someone may have closed this socket */
689                 clr_fl(sock_fd, O_NONBLOCK); 
690                 if(read_bytes < 0) { 
691                         if(local_errno != EAGAIN) 
692                                 osrfLogWarning(OSRF_LOG_MARK,  " * Error reading socket with error %s", strerror(local_errno));
693                 }
694
695         } else { return -1; } /* inform the caller that this node has been tampered with */
696
697         if(read_bytes == 0) {  /* socket closed by client */
698                 if(mgr->on_socket_closed) {
699                         mgr->on_socket_closed(mgr->blob, sock_fd);
700                 }
701                 return -1;
702         }
703
704         return 0;
705
706 }
707
708
709 void socket_manager_free(socket_manager* mgr) {
710         if(mgr == NULL) return;
711         socket_node* tmp;
712         while(mgr->socket) {
713                 tmp = mgr->socket->next;
714                 socket_disconnect(mgr, mgr->socket->sock_fd);
715                 mgr->socket = tmp;
716         }
717         free(mgr);
718
719 }
720