]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/libopensrf/socket_bundle.c
Merged libopensrf source directories (libtransport, libstack, and utils) into a singl...
[OpenSRF.git] / src / libopensrf / socket_bundle.c
1 #include <opensrf/socket_bundle.h>
2
3 /* buffer used to read from the sockets */
4 #define RBUFSIZE 1024
5
6 static socket_node* _socket_add_node(socket_manager* mgr,
7                 int endpoint, int addr_type, int sock_fd, int parent_id );
8 static socket_node* socket_find_node(socket_manager* mgr, int sock_fd);
9 static void socket_remove_node(socket_manager*, int sock_fd);
10 static int _socket_send(int sock_fd, const char* data, int flags);
11 static int _socket_route_data(socket_manager* mgr, int num_active, fd_set* read_set);
12 static int _socket_route_data_id( socket_manager* mgr, int sock_id);
13 static int _socket_handle_new_client(socket_manager* mgr, socket_node* node);
14 static int _socket_handle_client_data(socket_manager* mgr, socket_node* node);
15
16
17 /* -------------------------------------------------------------------- 
18         Test Code 
19         -------------------------------------------------------------------- */
20 /*
21 int count = 0;
22 void printme(void* blob, socket_manager* mgr, 
23                 int sock_fd, char* data, int parent_id) {
24
25         fprintf(stderr, "Got data from socket %d with parent %d => %s", 
26                         sock_fd, parent_id, data );
27
28         socket_send(sock_fd, data);
29
30         if(count++ > 2) {
31                 socket_disconnect(mgr, sock_fd);
32                 _socket_print_list(mgr);
33         }
34 }
35
36 int main(int argc, char* argv[]) {
37         socket_manager manager;
38         memset(&manager, 0, sizeof(socket_manager));
39         int port = 11000;
40         if(argv[1])
41                 port = atoi(argv[1]);
42
43         manager.data_received = &printme;
44         socket_open_tcp_server(&manager, port);
45
46         while(1)
47                 socket_wait_all(&manager, -1);
48
49         return 0;
50 }
51 */
52 /* -------------------------------------------------------------------- */
53
54
55 /* allocates and inserts a new socket node into the nodeset.
56         if parent_id is positive and non-zero, it will be set */
57 static socket_node* _socket_add_node(socket_manager* mgr, 
58                 int endpoint, int addr_type, int sock_fd, int parent_id ) {
59
60         if(mgr == NULL) return NULL;
61         osrfLogInternal( OSRF_LOG_MARK, "Adding socket node with fd %d", sock_fd);
62         socket_node* new_node = safe_malloc(sizeof(socket_node));
63
64         new_node->endpoint      = endpoint;
65         new_node->addr_type     = addr_type;
66         new_node->sock_fd               = sock_fd;
67         new_node->next                  = NULL;
68         new_node->parent_id = 0;
69         if(parent_id > 0)
70                 new_node->parent_id = parent_id;
71
72         new_node->next                  = mgr->socket;
73         mgr->socket                             = new_node;
74         return new_node;
75 }
76
77 /* creates a new server socket node and adds it to the socket set.
78         returns new socket fd on success.  -1 on failure.
79         socket_type is one of INET or UNIX  */
80 int socket_open_tcp_server(socket_manager* mgr, int port, char* listen_ip) {
81
82         if( mgr == NULL ) {
83                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): NULL mgr"); 
84                 return -1;
85         }
86
87         int sock_fd;
88         struct sockaddr_in server_addr;
89
90         errno = 0;
91         sock_fd = socket(AF_INET, SOCK_STREAM, 0);
92         if(sock_fd < 0) {
93                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): Unable to create TCP socket: %s",
94                         strerror( errno ) );
95                 return -1;
96         }
97
98         server_addr.sin_family = AF_INET;
99
100         if(listen_ip != NULL) {
101                 server_addr.sin_addr.s_addr = inet_addr(listen_ip);
102         } else {
103                 server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
104         }
105
106         server_addr.sin_port = htons(port);
107
108         errno = 0;
109         if(bind( sock_fd, (struct sockaddr*) &server_addr, sizeof(server_addr)) < 0) {
110                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): cannot bind to port %d: %s",
111                         port, strerror( errno ) );
112                 return -1;
113         }
114
115         errno = 0;
116         if(listen(sock_fd, 20) == -1) {
117                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): listen() returned error: %s",
118                         strerror( errno ) );
119                 return -1;
120         }
121
122         _socket_add_node(mgr, SERVER_SOCKET, INET, sock_fd, 0);
123         return sock_fd;
124 }
125
126 int socket_open_unix_server(socket_manager* mgr, char* path) {
127         if(mgr == NULL || path == NULL) return -1;
128
129         osrfLogDebug( OSRF_LOG_MARK, "opening unix socket at %s", path);
130         int sock_fd;
131         struct sockaddr_un server_addr;
132
133         errno = 0;
134         sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
135         if(sock_fd < 0){
136                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): socket() failed: %s",
137                         strerror( errno ) );
138                 return -1;
139         }
140
141         server_addr.sun_family = AF_UNIX;
142         strcpy(server_addr.sun_path, path);
143
144         errno = 0;
145         if( bind(sock_fd, (struct sockaddr*) &server_addr, 
146                                 sizeof(struct sockaddr_un)) < 0) {
147                 osrfLogWarning( OSRF_LOG_MARK, 
148                         "socket_open_unix_server(): cannot bind to unix port %s: %s",
149                         path, strerror( errno ) );
150                 return -1;
151         }
152
153         errno = 0;
154         if(listen(sock_fd, 20) == -1) {
155                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): listen() returned error: %s",
156                         strerror( errno ) );
157                 return -1;
158         }
159
160         osrfLogDebug( OSRF_LOG_MARK, "unix socket successfully opened");
161         
162         int i = 1;
163
164         /* causing problems with router for some reason ... */
165         //osrfLogDebug( OSRF_LOG_MARK, "Setting SO_REUSEADDR");
166         //setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));
167         
168         //osrfLogDebug( OSRF_LOG_MARK, "Setting TCP_NODELAY");
169         setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
170
171         _socket_add_node(mgr, SERVER_SOCKET, UNIX, sock_fd, 0);
172         return sock_fd;
173 }
174
175
176
177 int socket_open_udp_server( 
178                 socket_manager* mgr, int port, char* listen_ip ) {
179
180         int sockfd;
181         struct sockaddr_in server_addr;
182
183         errno = 0;
184         if( (sockfd = socket( AF_INET, SOCK_DGRAM, 0 )) < 0 ) {
185                 osrfLogWarning( OSRF_LOG_MARK, "Unable to create UDP socket: %s", strerror( errno ) );
186                 return -1;
187         }
188
189         server_addr.sin_family = AF_INET;
190         server_addr.sin_port = htons(port);
191         if(listen_ip) server_addr.sin_addr.s_addr = inet_addr(listen_ip);
192         else server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
193
194         errno = 0;
195         if( (bind (sockfd, (struct sockaddr *) &server_addr,sizeof(server_addr))) ) {
196                 osrfLogWarning( OSRF_LOG_MARK, "Unable to bind to UDP port %d: %s",
197                         port, strerror( errno ) );
198                 return -1;
199         }
200
201         _socket_add_node(mgr, SERVER_SOCKET, INET, sockfd, 0);
202         return sockfd;
203 }
204
205
206 int socket_open_tcp_client(socket_manager* mgr, int port, char* dest_addr) {
207
208         struct sockaddr_in remoteAddr, localAddr;
209    struct hostent *hptr;
210    int sock_fd;
211
212    // ------------------------------------------------------------------
213    // Create the socket
214    // ------------------------------------------------------------------
215    errno = 0;
216    if( (sock_fd = socket( AF_INET, SOCK_STREAM, 0 )) < 0 ) {
217            osrfLogWarning( OSRF_LOG_MARK,  "socket_open_tcp_client(): Cannot create TCP socket: %s",
218                         strerror( errno ) );
219       return -1;
220    }
221
222         int i = 1;
223         //osrfLogDebug( OSRF_LOG_MARK, "Setting TCP_NODELAY");
224         setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
225
226
227    // ------------------------------------------------------------------
228    // Get the hostname
229    // ------------------------------------------------------------------
230    errno = 0;
231    if( (hptr = gethostbyname( dest_addr ) ) == NULL ) {
232            osrfLogWarning(  OSRF_LOG_MARK, "socket_open_tcp_client(): Unknown Host => %s: %s",
233                                                 dest_addr, strerror( errno ) );
234       return -1;
235    }
236
237    // ------------------------------------------------------------------
238    // Construct server info struct
239    // ------------------------------------------------------------------
240    memset( &remoteAddr, 0, sizeof(remoteAddr));
241    remoteAddr.sin_family = AF_INET;
242    remoteAddr.sin_port = htons( port );
243    memcpy( (char*) &remoteAddr.sin_addr.s_addr,
244          hptr->h_addr_list[0], hptr->h_length );
245
246    // ------------------------------------------------------------------
247    // Construct local info struct
248    // ------------------------------------------------------------------
249    memset( &localAddr, 0, sizeof( localAddr ) );
250    localAddr.sin_family = AF_INET;
251    localAddr.sin_addr.s_addr = htonl( INADDR_ANY );
252    localAddr.sin_port = htons(0);
253
254    // ------------------------------------------------------------------
255    // Bind to a local port
256    // ------------------------------------------------------------------
257    errno = 0;
258    if( bind( sock_fd, (struct sockaddr *) &localAddr, sizeof( localAddr ) ) < 0 ) {
259            osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): Cannot bind to local port: %s",
260                 strerror( errno ) );
261       return -1;
262    }
263
264    // ------------------------------------------------------------------
265    // Connect to server
266    // ------------------------------------------------------------------
267    errno = 0;
268    if( connect( sock_fd, (struct sockaddr*) &remoteAddr, sizeof( struct sockaddr_in ) ) < 0 ) {
269            osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): Cannot connect to server %s: %s",
270                    dest_addr, strerror(errno) );
271            return -1;
272    }
273
274         _socket_add_node(mgr, CLIENT_SOCKET, INET, sock_fd, -1 );
275
276    return sock_fd;
277 }
278
279
280 int socket_open_udp_client( 
281                 socket_manager* mgr, int port, char* dest_addr) {
282
283         int sockfd;
284         struct sockaddr_in client_addr, server_addr;
285         struct hostent* host;
286
287         errno = 0;
288         if( (host = gethostbyname(dest_addr)) == NULL) {
289                 osrfLogWarning( OSRF_LOG_MARK, "Unable to resolve host: %s: %s",
290                         dest_addr, strerror( errno ) );
291                 return -1;
292         }
293
294         server_addr.sin_family = host->h_addrtype;
295         memcpy((char *) &server_addr.sin_addr.s_addr,
296                              host->h_addr_list[0], host->h_length);
297         server_addr.sin_port = htons(port);
298
299         errno = 0;
300         if( (sockfd = socket(AF_INET,SOCK_DGRAM,0)) < 0 ) {
301                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_udp_client(): Unable to create UDP socket: %s", strerror( errno ) );
302                 return -1;
303         }
304
305         client_addr.sin_family = AF_INET;
306         client_addr.sin_addr.s_addr = htonl(INADDR_ANY);
307         client_addr.sin_port = htons(0);
308
309         errno = 0;
310         if( (bind(sockfd, (struct sockaddr *) &client_addr, sizeof(client_addr))) < 0 ) {
311                 osrfLogWarning( OSRF_LOG_MARK, "Unable to bind UDP socket: %s", strerror( errno ) );
312                 return -1;
313         }
314
315         _socket_add_node(mgr, CLIENT_SOCKET, INET, sockfd, -1 );
316
317         return sockfd;
318 }
319
320
321 int socket_open_unix_client(socket_manager* mgr, char* sock_path) {
322
323         int sock_fd, len;
324    struct sockaddr_un usock;
325
326    errno = 0;
327    if( (sock_fd = socket( AF_UNIX, SOCK_STREAM, 0 )) < 0 ) {
328            osrfLogWarning(  OSRF_LOG_MARK, "socket_open_unix_client(): Cannot create UNIX socket: %s", strerror( errno ) );
329                 return -1;
330         }
331
332    usock.sun_family = AF_UNIX;
333    strcpy( usock.sun_path, sock_path );
334
335    len = sizeof( usock.sun_family ) + strlen( usock.sun_path );
336
337    errno = 0;
338    if( connect( sock_fd, (struct sockaddr *) &usock, len ) < 0 ) {
339            osrfLogWarning(  OSRF_LOG_MARK, "Error connecting to unix socket: %s",
340                         strerror( errno ) );
341                 return -1;
342         }
343
344         _socket_add_node(mgr, CLIENT_SOCKET, UNIX, sock_fd, -1 );
345
346    return sock_fd;
347 }
348
349
350 /* returns the socket_node with the given sock_fd */
351 static socket_node* socket_find_node(socket_manager* mgr, int sock_fd) {
352         if(mgr == NULL) return NULL;
353         socket_node* node = mgr->socket;
354         while(node) {
355                 if(node->sock_fd == sock_fd)
356                         return node;
357                 node = node->next;
358         }
359         return NULL;
360 }
361
362 /* removes the node with the given sock_fd from the list and frees it */
363 static void socket_remove_node(socket_manager* mgr, int sock_fd) {
364
365         if(mgr == NULL) return;
366
367         osrfLogDebug( OSRF_LOG_MARK, "removing socket %d", sock_fd);
368
369         socket_node* head = mgr->socket;
370         socket_node* tail = head;
371         if(head == NULL) return;
372
373         /* if removing the first node in the list */
374         if(head->sock_fd == sock_fd) {
375                 mgr->socket = head->next;
376                 free(head);
377                 return;
378         }
379
380         head = head->next;
381
382         /* if removing any other node */
383         while(head) {
384                 if(head->sock_fd == sock_fd) {
385                         tail->next = head->next;
386                         free(head);
387                         return;
388                 }
389                 tail = head;
390                 head = head->next;
391         }
392 }
393
394
395 void _socket_print_list(socket_manager* mgr) {
396         if(mgr == NULL) return;
397         socket_node* node = mgr->socket;
398         osrfLogDebug( OSRF_LOG_MARK, "socket_node list: [");
399         while(node) {
400                 osrfLogDebug( OSRF_LOG_MARK, "sock_fd: %d | parent_id: %d", 
401                                 node->sock_fd, node->parent_id);
402                 node = node->next;
403         }
404         osrfLogDebug( OSRF_LOG_MARK, "]");
405 }
406
407 /* sends the given data to the given socket */
408 int socket_send(int sock_fd, const char* data) {
409         return _socket_send( sock_fd, data, 0);
410 }
411
412 /* utility method */
413 static int _socket_send(int sock_fd, const char* data, int flags) {
414
415         signal(SIGPIPE, SIG_IGN); /* in case a unix socket was closed */
416
417         errno = 0;
418         size_t r = send( sock_fd, data, strlen(data), flags );
419         int local_errno = errno;
420         
421         if( r == -1 ) {
422                 osrfLogWarning( OSRF_LOG_MARK, "_socket_send(): Error sending data with return %d", r );
423                 osrfLogWarning( OSRF_LOG_MARK, "Last Sys Error: %s", strerror(local_errno));
424                 return -1;
425         }
426
427         return 0;
428 }
429
430
431 /* sends the given data to the given socket. 
432  * sets the send flag MSG_DONTWAIT which will allow the 
433  * process to continue even if the socket buffer is full
434  * returns 0 on success, -1 otherwise */
435 //int socket_send_nowait( int sock_fd, const char* data) {
436 //      return _socket_send( sock_fd, data, MSG_DONTWAIT);
437 //}
438
439
440 /*
441  * Waits at most usecs microseconds for the send buffer of the given
442  * socket to accept new data.  This does not guarantee that the 
443  * socket will accept all the data we want to give it.
444  */
445 int socket_send_timeout( int sock_fd, const char* data, int usecs ) {
446
447         fd_set write_set;
448         FD_ZERO( &write_set );
449         FD_SET( sock_fd, &write_set );
450
451         int mil = 1000000;
452         int secs = (int) usecs / mil;
453         usecs = usecs - (secs * mil);
454
455         struct timeval tv;
456         tv.tv_sec = secs;
457         tv.tv_usec = usecs;
458
459         errno = 0;
460         int ret = select( sock_fd + 1, NULL, &write_set, NULL, &tv);
461         if( ret > 0 ) return _socket_send( sock_fd, data, 0);
462
463         osrfLogError(OSRF_LOG_MARK, "socket_send_timeout(): "
464                 "timed out on send for socket %d after %d secs, %d usecs: %s",
465                 sock_fd, secs, usecs, strerror( errno ) );
466
467         return -1;
468 }
469
470
471 /* disconnects the node with the given sock_fd and removes
472         it from the socket set */
473 void socket_disconnect(socket_manager* mgr, int sock_fd) {
474         osrfLogInternal( OSRF_LOG_MARK, "Closing socket %d", sock_fd);
475         close( sock_fd );
476         socket_remove_node(mgr, sock_fd);
477 }
478
479
480 /* we assume that if select() fails, the socket is no longer valid */
481 int socket_connected(int sock_fd) {
482         fd_set read_set;
483         FD_ZERO( &read_set );
484         FD_SET( sock_fd, &read_set );
485         if( select( sock_fd + 1, &read_set, NULL, NULL, NULL) == -1 ) 
486                 return 0;
487         return 1;
488
489 }
490
491 /* this only waits on the server socket and does not handle the actual
492         data coming in from the client..... XXX */
493 int socket_wait(socket_manager* mgr, int timeout, int sock_fd) {
494
495         int retval = 0;
496         fd_set read_set;
497         FD_ZERO( &read_set );
498         FD_SET( sock_fd, &read_set );
499
500         struct timeval tv;
501         tv.tv_sec = timeout;
502         tv.tv_usec = 0;
503         errno = 0;
504
505         if( timeout < 0 ) {  
506
507                 // If timeout is -1, we block indefinitely
508                 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, NULL)) == -1 ) {
509                         osrfLogDebug( OSRF_LOG_MARK, "Call to select() interrupted: Sys Error: %s", strerror(errno));
510                         return -1;
511                 }
512
513         } else if( timeout > 0 ) { /* timeout of 0 means don't block */
514
515                 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, &tv)) == -1 ) {
516                         osrfLogDebug( OSRF_LOG_MARK, "Call to select() interrupted: Sys Error: %s", strerror(errno));
517                         return -1;
518                 }
519         }
520
521         osrfLogInternal( OSRF_LOG_MARK, "%d active sockets after select()", retval);
522         return _socket_route_data_id(mgr, sock_fd);
523 }
524
525
526 int socket_wait_all(socket_manager* mgr, int timeout) {
527
528         if(mgr == NULL) {
529                 osrfLogWarning( OSRF_LOG_MARK,  "socket_wait_all(): null mgr" );
530                 return -1;
531         }
532
533         int retval = 0;
534         fd_set read_set;
535         FD_ZERO( &read_set );
536
537         socket_node* node = mgr->socket;
538         int max_fd = 0;
539         while(node) {
540                 osrfLogInternal( OSRF_LOG_MARK, "Adding socket fd %d to select set",node->sock_fd);
541                 FD_SET( node->sock_fd, &read_set );
542                 if(node->sock_fd > max_fd) max_fd = node->sock_fd;
543                 node = node->next;
544         }
545         max_fd += 1;
546
547         struct timeval tv;
548         tv.tv_sec = timeout;
549         tv.tv_usec = 0;
550         errno = 0;
551
552         if( timeout < 0 ) {  
553
554                 // If timeout is -1, there is no timeout passed to the call to select
555                 if( (retval = select( max_fd, &read_set, NULL, NULL, NULL)) == -1 ) {
556                         osrfLogWarning( OSRF_LOG_MARK, "select() call aborted: %s", strerror(errno));
557                         return -1;
558                 }
559
560         } else if( timeout != 0 ) { /* timeout of 0 means don't block */
561
562                 if( (retval = select( max_fd, &read_set, NULL, NULL, &tv)) == -1 ) {
563                         osrfLogWarning( OSRF_LOG_MARK, "select() call aborted: %s", strerror(errno));
564                         return -1;
565                 }
566         }
567
568         osrfLogDebug( OSRF_LOG_MARK, "%d active sockets after select()", retval);
569         return _socket_route_data(mgr, retval, &read_set);
570 }
571
572 /* iterates over the sockets in the set and handles active sockets.
573         new sockets connecting to server sockets cause the creation
574         of a new socket node.
575         Any new data read is is passed off to the data_received callback
576         as it arrives */
577 /* determines if we're receiving a new client or data
578         on an existing client */
579 static int _socket_route_data(
580         socket_manager* mgr, int num_active, fd_set* read_set) {
581
582         if(!(mgr && read_set)) return -1;
583
584         int last_failed_id = -1;
585
586
587         /* come back here if someone yanks a socket_node from beneath us */
588         while(1) {
589
590                 socket_node* node = mgr->socket;
591                 int handled = 0;
592                 int status = 0;
593                 
594                 while(node && (handled < num_active)) {
595         
596                         int sock_fd = node->sock_fd;
597                         
598                         if(last_failed_id != -1) {
599                                 /* in case it was not removed by our overlords */
600                                 osrfLogInternal( OSRF_LOG_MARK, "Attempting to remove last_failed_id of %d", last_failed_id);
601                                 socket_remove_node( mgr, last_failed_id );
602                                 last_failed_id = -1;
603                                 status = -1;
604                                 break;
605                         }
606         
607                         /* does this socket have data? */
608                         if( FD_ISSET( sock_fd, read_set ) ) {
609         
610                                 osrfLogInternal( OSRF_LOG_MARK, "Socket %d active", sock_fd);
611                                 handled++;
612                                 FD_CLR(sock_fd, read_set);
613         
614                                 if(node->endpoint == SERVER_SOCKET) 
615                                         _socket_handle_new_client(mgr, node);
616         
617                                 else
618                                         status = _socket_handle_client_data(mgr, node);
619         
620                                 /* someone may have yanked a socket_node out from under 
621                                         us...start over with the first socket */
622                                 if(status == -1)  {
623                                         last_failed_id = sock_fd;
624                                         osrfLogInternal( OSRF_LOG_MARK, "Backtracking back to start of loop because "
625                                                         "of -1 return code from _socket_handle_client_data()");
626                                 }
627                         }
628
629                         if(status == -1) break;
630                         node = node->next;
631
632                 } // is_set
633
634                 if(status == 0) break;
635                 if(status == -1) status = 0;
636         } 
637
638         return 0;
639 }
640
641
642 /* routes data from a single known socket */
643 static int _socket_route_data_id( socket_manager* mgr, int sock_id) {
644         socket_node* node = socket_find_node(mgr, sock_id);     
645         int status = 0;
646
647         if(node) {
648                 if(node->endpoint == SERVER_SOCKET) 
649                         _socket_handle_new_client(mgr, node);
650         
651                 if(node->endpoint == CLIENT_SOCKET ) 
652                         status = _socket_handle_client_data(mgr, node);
653
654                 if(status == -1) {
655                         socket_remove_node(mgr, sock_id);
656                         return -1;
657                 }
658                 return 0;
659         } 
660
661         return -1;
662 }
663
664
665 static int _socket_handle_new_client(socket_manager* mgr, socket_node* node) {
666         if(mgr == NULL || node == NULL) return -1;
667
668         errno = 0;
669         int new_sock_fd;
670         new_sock_fd = accept(node->sock_fd, NULL, NULL);
671         if(new_sock_fd < 0) {
672                 osrfLogWarning( OSRF_LOG_MARK, "_socket_handle_new_client(): accept() failed: %s",
673                         strerror( errno ) );
674                 return -1;
675         }
676
677         if(node->addr_type == INET) {
678                 _socket_add_node(mgr, CLIENT_SOCKET, INET, new_sock_fd, node->sock_fd);
679                 osrfLogDebug( OSRF_LOG_MARK, "Adding new INET client for %d", node->sock_fd);
680
681         } else if(node->addr_type == UNIX) {
682                 _socket_add_node(mgr, CLIENT_SOCKET, UNIX, new_sock_fd, node->sock_fd);
683                 osrfLogDebug( OSRF_LOG_MARK, "Adding new UNIX client for %d", node->sock_fd);
684         }
685
686         return 0;
687 }
688
689
690 static int _socket_handle_client_data(socket_manager* mgr, socket_node* node) {
691         if(mgr == NULL || node == NULL) return -1;
692
693         char buf[RBUFSIZE];
694         int read_bytes;
695         int sock_fd = node->sock_fd;
696
697         memset(buf, 0, RBUFSIZE);
698         set_fl(sock_fd, O_NONBLOCK);
699
700         osrfLogInternal( OSRF_LOG_MARK, "%ld : Received data at %f\n", (long) getpid(), get_timestamp_millis());
701
702         while( (read_bytes = recv(sock_fd, buf, RBUFSIZE-1, 0) ) > 0 ) {
703                 osrfLogInternal( OSRF_LOG_MARK, "Socket %d Read %d bytes and data: %s", sock_fd, read_bytes, buf);
704                 if(mgr->data_received)
705                         mgr->data_received(mgr->blob, mgr, sock_fd, buf, node->parent_id);
706
707                 memset(buf, 0, RBUFSIZE);
708         }
709     int local_errno = errno; /* capture errno as set by recv() */
710
711         if(socket_find_node(mgr, sock_fd)) {  /* someone may have closed this socket */
712                 clr_fl(sock_fd, O_NONBLOCK); 
713                 if(read_bytes < 0) { 
714                         if(local_errno != EAGAIN) 
715                                 osrfLogWarning(OSRF_LOG_MARK,  " * Error reading socket with error %s", strerror(local_errno));
716                 }
717
718         } else { return -1; } /* inform the caller that this node has been tampered with */
719
720         if(read_bytes == 0) {  /* socket closed by client */
721                 if(mgr->on_socket_closed) {
722                         mgr->on_socket_closed(mgr->blob, sock_fd);
723                 }
724                 return -1;
725         }
726
727         return 0;
728
729 }
730
731
732 void socket_manager_free(socket_manager* mgr) {
733         if(mgr == NULL) return;
734         socket_node* tmp;
735         while(mgr->socket) {
736                 tmp = mgr->socket->next;
737                 socket_disconnect(mgr, mgr->socket->sock_fd);
738                 mgr->socket = tmp;
739         }
740         free(mgr);
741
742 }
743