]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/libopensrf/socket_bundle.c
added cache cleanup code
[OpenSRF.git] / src / libopensrf / socket_bundle.c
1 #include <opensrf/socket_bundle.h>
2
3 /* buffer used to read from the sockets */
4 #define RBUFSIZE 1024
5
6 static socket_node* _socket_add_node(socket_manager* mgr,
7                 int endpoint, int addr_type, int sock_fd, int parent_id );
8 static socket_node* socket_find_node(socket_manager* mgr, int sock_fd);
9 static void socket_remove_node(socket_manager*, int sock_fd);
10 static int _socket_send(int sock_fd, const char* data, int flags);
11 static int _socket_route_data(socket_manager* mgr, int num_active, fd_set* read_set);
12 static int _socket_route_data_id( socket_manager* mgr, int sock_id);
13 static int _socket_handle_new_client(socket_manager* mgr, socket_node* node);
14 static int _socket_handle_client_data(socket_manager* mgr, socket_node* node);
15
16
17 /* -------------------------------------------------------------------- 
18         Test Code 
19         -------------------------------------------------------------------- */
20 /*
21 int count = 0;
22 void printme(void* blob, socket_manager* mgr, 
23                 int sock_fd, char* data, int parent_id) {
24
25         fprintf(stderr, "Got data from socket %d with parent %d => %s", 
26                         sock_fd, parent_id, data );
27
28         socket_send(sock_fd, data);
29
30         if(count++ > 2) {
31                 socket_disconnect(mgr, sock_fd);
32                 _socket_print_list(mgr);
33         }
34 }
35
36 int main(int argc, char* argv[]) {
37         socket_manager manager;
38         memset(&manager, 0, sizeof(socket_manager));
39         int port = 11000;
40         if(argv[1])
41                 port = atoi(argv[1]);
42
43         manager.data_received = &printme;
44         socket_open_tcp_server(&manager, port);
45
46         while(1)
47                 socket_wait_all(&manager, -1);
48
49         return 0;
50 }
51 */
52 /* -------------------------------------------------------------------- */
53
54
55 /* allocates and inserts a new socket node into the nodeset.
56         if parent_id is positive and non-zero, it will be set */
57 static socket_node* _socket_add_node(socket_manager* mgr, 
58                 int endpoint, int addr_type, int sock_fd, int parent_id ) {
59
60         if(mgr == NULL) return NULL;
61         osrfLogInternal( OSRF_LOG_MARK, "Adding socket node with fd %d", sock_fd);
62         socket_node* new_node = safe_malloc(sizeof(socket_node));
63
64         new_node->endpoint      = endpoint;
65         new_node->addr_type     = addr_type;
66         new_node->sock_fd               = sock_fd;
67         new_node->next                  = NULL;
68         new_node->parent_id = 0;
69         if(parent_id > 0)
70                 new_node->parent_id = parent_id;
71
72         new_node->next                  = mgr->socket;
73         mgr->socket                             = new_node;
74         return new_node;
75 }
76
77 /* creates a new server socket node and adds it to the socket set.
78         returns new socket fd on success.  -1 on failure.
79         socket_type is one of INET or UNIX  */
80 int socket_open_tcp_server(socket_manager* mgr, int port, const char* listen_ip) {
81
82         if( mgr == NULL ) {
83                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): NULL mgr"); 
84                 return -1;
85         }
86
87         int sock_fd;
88         struct sockaddr_in server_addr;
89
90         errno = 0;
91         sock_fd = socket(AF_INET, SOCK_STREAM, 0);
92         if(sock_fd < 0) {
93                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): Unable to create TCP socket: %s",
94                         strerror( errno ) );
95                 return -1;
96         }
97
98         server_addr.sin_family = AF_INET;
99
100         if(listen_ip != NULL) {
101                 struct in_addr addr;
102                 if( inet_aton( listen_ip, &addr ) )
103                         server_addr.sin_addr.s_addr = addr.s_addr;
104                 else {
105                         osrfLogError( OSRF_LOG_MARK, "Listener address is invalid: %s", listen_ip );
106                         return -1;
107                 }
108         } else {
109                 server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
110         }
111
112         server_addr.sin_port = htons(port);
113
114         errno = 0;
115         if(bind( sock_fd, (struct sockaddr*) &server_addr, sizeof(server_addr)) < 0) {
116                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): cannot bind to port %d: %s",
117                         port, strerror( errno ) );
118                 return -1;
119         }
120
121         errno = 0;
122         if(listen(sock_fd, 20) == -1) {
123                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_server(): listen() returned error: %s",
124                         strerror( errno ) );
125                 return -1;
126         }
127
128         _socket_add_node(mgr, SERVER_SOCKET, INET, sock_fd, 0);
129         return sock_fd;
130 }
131
132 int socket_open_unix_server(socket_manager* mgr, char* path) {
133         if(mgr == NULL || path == NULL) return -1;
134
135         osrfLogDebug( OSRF_LOG_MARK, "opening unix socket at %s", path);
136         int sock_fd;
137         struct sockaddr_un server_addr;
138
139         errno = 0;
140         sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
141         if(sock_fd < 0){
142                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): socket() failed: %s",
143                         strerror( errno ) );
144                 return -1;
145         }
146
147         server_addr.sun_family = AF_UNIX;
148         strcpy(server_addr.sun_path, path);
149
150         errno = 0;
151         if( bind(sock_fd, (struct sockaddr*) &server_addr, 
152                                 sizeof(struct sockaddr_un)) < 0) {
153                 osrfLogWarning( OSRF_LOG_MARK, 
154                         "socket_open_unix_server(): cannot bind to unix port %s: %s",
155                         path, strerror( errno ) );
156                 return -1;
157         }
158
159         errno = 0;
160         if(listen(sock_fd, 20) == -1) {
161                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_unix_server(): listen() returned error: %s",
162                         strerror( errno ) );
163                 return -1;
164         }
165
166         osrfLogDebug( OSRF_LOG_MARK, "unix socket successfully opened");
167         
168         int i = 1;
169
170         /* causing problems with router for some reason ... */
171         //osrfLogDebug( OSRF_LOG_MARK, "Setting SO_REUSEADDR");
172         //setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));
173         
174         //osrfLogDebug( OSRF_LOG_MARK, "Setting TCP_NODELAY");
175         setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
176
177         _socket_add_node(mgr, SERVER_SOCKET, UNIX, sock_fd, 0);
178         return sock_fd;
179 }
180
181
182
183 int socket_open_udp_server( 
184                 socket_manager* mgr, int port, const char* listen_ip ) {
185
186         int sockfd;
187         struct sockaddr_in server_addr;
188
189         errno = 0;
190         if( (sockfd = socket( AF_INET, SOCK_DGRAM, 0 )) < 0 ) {
191                 osrfLogWarning( OSRF_LOG_MARK, "Unable to create UDP socket: %s", strerror( errno ) );
192                 return -1;
193         }
194
195         server_addr.sin_family = AF_INET;
196         server_addr.sin_port = htons(port);
197         if(listen_ip) {
198                 struct in_addr addr;
199                 if( inet_aton( listen_ip, &addr ) )
200                         server_addr.sin_addr.s_addr = addr.s_addr;
201                 else {
202                         osrfLogError( OSRF_LOG_MARK, "UDP listener address is invalid: %s", listen_ip );
203                         return -1;
204                 }
205         } else server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
206
207         errno = 0;
208         if( (bind (sockfd, (struct sockaddr *) &server_addr,sizeof(server_addr))) ) {
209                 osrfLogWarning( OSRF_LOG_MARK, "Unable to bind to UDP port %d: %s",
210                         port, strerror( errno ) );
211                 return -1;
212         }
213
214         _socket_add_node(mgr, SERVER_SOCKET, INET, sockfd, 0);
215         return sockfd;
216 }
217
218
219 int socket_open_tcp_client(socket_manager* mgr, int port, const char* dest_addr) {
220
221         struct sockaddr_in remoteAddr, localAddr;
222    struct hostent *hptr;
223    int sock_fd;
224
225    // ------------------------------------------------------------------
226    // Create the socket
227    // ------------------------------------------------------------------
228    errno = 0;
229    if( (sock_fd = socket( AF_INET, SOCK_STREAM, 0 )) < 0 ) {
230            osrfLogWarning( OSRF_LOG_MARK,  "socket_open_tcp_client(): Cannot create TCP socket: %s",
231                         strerror( errno ) );
232       return -1;
233    }
234
235         int i = 1;
236         //osrfLogDebug( OSRF_LOG_MARK, "Setting TCP_NODELAY");
237         setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
238
239
240    // ------------------------------------------------------------------
241    // Get the hostname
242    // ------------------------------------------------------------------
243    errno = 0;
244    if( (hptr = gethostbyname( dest_addr ) ) == NULL ) {
245            osrfLogWarning(  OSRF_LOG_MARK, "socket_open_tcp_client(): Unknown Host => %s: %s",
246                                                 dest_addr, strerror( errno ) );
247       return -1;
248    }
249
250    // ------------------------------------------------------------------
251    // Construct server info struct
252    // ------------------------------------------------------------------
253    memset( &remoteAddr, 0, sizeof(remoteAddr));
254    remoteAddr.sin_family = AF_INET;
255    remoteAddr.sin_port = htons( port );
256    memcpy( (char*) &remoteAddr.sin_addr.s_addr,
257          hptr->h_addr_list[0], hptr->h_length );
258
259    // ------------------------------------------------------------------
260    // Construct local info struct
261    // ------------------------------------------------------------------
262    memset( &localAddr, 0, sizeof( localAddr ) );
263    localAddr.sin_family = AF_INET;
264    localAddr.sin_addr.s_addr = htonl( INADDR_ANY );
265    localAddr.sin_port = htons(0);
266
267    // ------------------------------------------------------------------
268    // Bind to a local port
269    // ------------------------------------------------------------------
270    errno = 0;
271    if( bind( sock_fd, (struct sockaddr *) &localAddr, sizeof( localAddr ) ) < 0 ) {
272            osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): Cannot bind to local port: %s",
273                 strerror( errno ) );
274       return -1;
275    }
276
277    // ------------------------------------------------------------------
278    // Connect to server
279    // ------------------------------------------------------------------
280    errno = 0;
281    if( connect( sock_fd, (struct sockaddr*) &remoteAddr, sizeof( struct sockaddr_in ) ) < 0 ) {
282            osrfLogWarning( OSRF_LOG_MARK, "socket_open_tcp_client(): Cannot connect to server %s: %s",
283                    dest_addr, strerror(errno) );
284            return -1;
285    }
286
287         _socket_add_node(mgr, CLIENT_SOCKET, INET, sock_fd, -1 );
288
289    return sock_fd;
290 }
291
292
293 int socket_open_udp_client( 
294                 socket_manager* mgr, int port, char* dest_addr) {
295
296         int sockfd;
297         struct sockaddr_in client_addr, server_addr;
298         struct hostent* host;
299
300         errno = 0;
301         if( (host = gethostbyname(dest_addr)) == NULL) {
302                 osrfLogWarning( OSRF_LOG_MARK, "Unable to resolve host: %s: %s",
303                         dest_addr, strerror( errno ) );
304                 return -1;
305         }
306
307         server_addr.sin_family = host->h_addrtype;
308         memcpy((char *) &server_addr.sin_addr.s_addr,
309                              host->h_addr_list[0], host->h_length);
310         server_addr.sin_port = htons(port);
311
312         errno = 0;
313         if( (sockfd = socket(AF_INET,SOCK_DGRAM,0)) < 0 ) {
314                 osrfLogWarning( OSRF_LOG_MARK, "socket_open_udp_client(): Unable to create UDP socket: %s", strerror( errno ) );
315                 return -1;
316         }
317
318         client_addr.sin_family = AF_INET;
319         client_addr.sin_addr.s_addr = htonl(INADDR_ANY);
320         client_addr.sin_port = htons(0);
321
322         errno = 0;
323         if( (bind(sockfd, (struct sockaddr *) &client_addr, sizeof(client_addr))) < 0 ) {
324                 osrfLogWarning( OSRF_LOG_MARK, "Unable to bind UDP socket: %s", strerror( errno ) );
325                 return -1;
326         }
327
328         _socket_add_node(mgr, CLIENT_SOCKET, INET, sockfd, -1 );
329
330         return sockfd;
331 }
332
333
334 int socket_open_unix_client(socket_manager* mgr, const char* sock_path) {
335
336         int sock_fd, len;
337    struct sockaddr_un usock;
338
339    errno = 0;
340    if( (sock_fd = socket( AF_UNIX, SOCK_STREAM, 0 )) < 0 ) {
341            osrfLogWarning(  OSRF_LOG_MARK, "socket_open_unix_client(): Cannot create UNIX socket: %s", strerror( errno ) );
342                 return -1;
343         }
344
345    usock.sun_family = AF_UNIX;
346    strcpy( usock.sun_path, sock_path );
347
348    len = sizeof( usock.sun_family ) + strlen( usock.sun_path );
349
350    errno = 0;
351    if( connect( sock_fd, (struct sockaddr *) &usock, len ) < 0 ) {
352            osrfLogWarning(  OSRF_LOG_MARK, "Error connecting to unix socket: %s",
353                         strerror( errno ) );
354                 return -1;
355         }
356
357         _socket_add_node(mgr, CLIENT_SOCKET, UNIX, sock_fd, -1 );
358
359    return sock_fd;
360 }
361
362
363 /* returns the socket_node with the given sock_fd */
364 static socket_node* socket_find_node(socket_manager* mgr, int sock_fd) {
365         if(mgr == NULL) return NULL;
366         socket_node* node = mgr->socket;
367         while(node) {
368                 if(node->sock_fd == sock_fd)
369                         return node;
370                 node = node->next;
371         }
372         return NULL;
373 }
374
375 /* removes the node with the given sock_fd from the list and frees it */
376 static void socket_remove_node(socket_manager* mgr, int sock_fd) {
377
378         if(mgr == NULL) return;
379
380         osrfLogDebug( OSRF_LOG_MARK, "removing socket %d", sock_fd);
381
382         socket_node* head = mgr->socket;
383         socket_node* tail = head;
384         if(head == NULL) return;
385
386         /* if removing the first node in the list */
387         if(head->sock_fd == sock_fd) {
388                 mgr->socket = head->next;
389                 free(head);
390                 return;
391         }
392
393         head = head->next;
394
395         /* if removing any other node */
396         while(head) {
397                 if(head->sock_fd == sock_fd) {
398                         tail->next = head->next;
399                         free(head);
400                         return;
401                 }
402                 tail = head;
403                 head = head->next;
404         }
405 }
406
407
408 void _socket_print_list(socket_manager* mgr) {
409         if(mgr == NULL) return;
410         socket_node* node = mgr->socket;
411         osrfLogDebug( OSRF_LOG_MARK, "socket_node list: [");
412         while(node) {
413                 osrfLogDebug( OSRF_LOG_MARK, "sock_fd: %d | parent_id: %d", 
414                                 node->sock_fd, node->parent_id);
415                 node = node->next;
416         }
417         osrfLogDebug( OSRF_LOG_MARK, "]");
418 }
419
420 /* sends the given data to the given socket */
421 int socket_send(int sock_fd, const char* data) {
422         return _socket_send( sock_fd, data, 0);
423 }
424
425 /* utility method */
426 static int _socket_send(int sock_fd, const char* data, int flags) {
427
428         signal(SIGPIPE, SIG_IGN); /* in case a unix socket was closed */
429
430         errno = 0;
431         size_t r = send( sock_fd, data, strlen(data), flags );
432         int local_errno = errno;
433         
434         if( r == -1 ) {
435                 osrfLogWarning( OSRF_LOG_MARK, "_socket_send(): Error sending data with return %d", r );
436                 osrfLogWarning( OSRF_LOG_MARK, "Last Sys Error: %s", strerror(local_errno));
437                 return -1;
438         }
439
440         return 0;
441 }
442
443
444 /* sends the given data to the given socket. 
445  * sets the send flag MSG_DONTWAIT which will allow the 
446  * process to continue even if the socket buffer is full
447  * returns 0 on success, -1 otherwise */
448 //int socket_send_nowait( int sock_fd, const char* data) {
449 //      return _socket_send( sock_fd, data, MSG_DONTWAIT);
450 //}
451
452
453 /*
454  * Waits at most usecs microseconds for the send buffer of the given
455  * socket to accept new data.  This does not guarantee that the 
456  * socket will accept all the data we want to give it.
457  */
458 int socket_send_timeout( int sock_fd, const char* data, int usecs ) {
459
460         fd_set write_set;
461         FD_ZERO( &write_set );
462         FD_SET( sock_fd, &write_set );
463
464         int mil = 1000000;
465         int secs = (int) usecs / mil;
466         usecs = usecs - (secs * mil);
467
468         struct timeval tv;
469         tv.tv_sec = secs;
470         tv.tv_usec = usecs;
471
472         errno = 0;
473         int ret = select( sock_fd + 1, NULL, &write_set, NULL, &tv);
474         if( ret > 0 ) return _socket_send( sock_fd, data, 0);
475
476         osrfLogError(OSRF_LOG_MARK, "socket_send_timeout(): "
477                 "timed out on send for socket %d after %d secs, %d usecs: %s",
478                 sock_fd, secs, usecs, strerror( errno ) );
479
480         return -1;
481 }
482
483
484 /* disconnects the node with the given sock_fd and removes
485         it from the socket set */
486 void socket_disconnect(socket_manager* mgr, int sock_fd) {
487         osrfLogInternal( OSRF_LOG_MARK, "Closing socket %d", sock_fd);
488         close( sock_fd );
489         socket_remove_node(mgr, sock_fd);
490 }
491
492
493 /* we assume that if select() fails, the socket is no longer valid */
494 int socket_connected(int sock_fd) {
495         fd_set read_set;
496         FD_ZERO( &read_set );
497         FD_SET( sock_fd, &read_set );
498         if( select( sock_fd + 1, &read_set, NULL, NULL, NULL) == -1 ) 
499                 return 0;
500         return 1;
501
502 }
503
504 /* this only waits on the server socket and does not handle the actual
505         data coming in from the client..... XXX */
506 int socket_wait(socket_manager* mgr, int timeout, int sock_fd) {
507
508         int retval = 0;
509         fd_set read_set;
510         FD_ZERO( &read_set );
511         FD_SET( sock_fd, &read_set );
512
513         struct timeval tv;
514         tv.tv_sec = timeout;
515         tv.tv_usec = 0;
516         errno = 0;
517
518         if( timeout < 0 ) {  
519
520                 // If timeout is -1, we block indefinitely
521                 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, NULL)) == -1 ) {
522                         osrfLogDebug( OSRF_LOG_MARK, "Call to select() interrupted: Sys Error: %s", strerror(errno));
523                         return -1;
524                 }
525
526         } else if( timeout > 0 ) { /* timeout of 0 means don't block */
527
528                 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, &tv)) == -1 ) {
529                         osrfLogDebug( OSRF_LOG_MARK, "Call to select() interrupted: Sys Error: %s", strerror(errno));
530                         return -1;
531                 }
532         }
533
534         osrfLogInternal( OSRF_LOG_MARK, "%d active sockets after select()", retval);
535         return _socket_route_data_id(mgr, sock_fd);
536 }
537
538
539 int socket_wait_all(socket_manager* mgr, int timeout) {
540
541         if(mgr == NULL) {
542                 osrfLogWarning( OSRF_LOG_MARK,  "socket_wait_all(): null mgr" );
543                 return -1;
544         }
545
546         int retval = 0;
547         fd_set read_set;
548         FD_ZERO( &read_set );
549
550         socket_node* node = mgr->socket;
551         int max_fd = 0;
552         while(node) {
553                 osrfLogInternal( OSRF_LOG_MARK, "Adding socket fd %d to select set",node->sock_fd);
554                 FD_SET( node->sock_fd, &read_set );
555                 if(node->sock_fd > max_fd) max_fd = node->sock_fd;
556                 node = node->next;
557         }
558         max_fd += 1;
559
560         struct timeval tv;
561         tv.tv_sec = timeout;
562         tv.tv_usec = 0;
563         errno = 0;
564
565         if( timeout < 0 ) {  
566
567                 // If timeout is -1, there is no timeout passed to the call to select
568                 if( (retval = select( max_fd, &read_set, NULL, NULL, NULL)) == -1 ) {
569                         osrfLogWarning( OSRF_LOG_MARK, "select() call aborted: %s", strerror(errno));
570                         return -1;
571                 }
572
573         } else if( timeout != 0 ) { /* timeout of 0 means don't block */
574
575                 if( (retval = select( max_fd, &read_set, NULL, NULL, &tv)) == -1 ) {
576                         osrfLogWarning( OSRF_LOG_MARK, "select() call aborted: %s", strerror(errno));
577                         return -1;
578                 }
579         }
580
581         osrfLogDebug( OSRF_LOG_MARK, "%d active sockets after select()", retval);
582         return _socket_route_data(mgr, retval, &read_set);
583 }
584
585 /* iterates over the sockets in the set and handles active sockets.
586         new sockets connecting to server sockets cause the creation
587         of a new socket node.
588         Any new data read is is passed off to the data_received callback
589         as it arrives */
590 /* determines if we're receiving a new client or data
591         on an existing client */
592 static int _socket_route_data(
593         socket_manager* mgr, int num_active, fd_set* read_set) {
594
595         if(!(mgr && read_set)) return -1;
596
597         int last_failed_id = -1;
598
599
600         /* come back here if someone yanks a socket_node from beneath us */
601         while(1) {
602
603                 socket_node* node = mgr->socket;
604                 int handled = 0;
605                 int status = 0;
606                 
607                 while(node && (handled < num_active)) {
608         
609                         int sock_fd = node->sock_fd;
610                         
611                         if(last_failed_id != -1) {
612                                 /* in case it was not removed by our overlords */
613                                 osrfLogInternal( OSRF_LOG_MARK, "Attempting to remove last_failed_id of %d", last_failed_id);
614                                 socket_remove_node( mgr, last_failed_id );
615                                 last_failed_id = -1;
616                                 status = -1;
617                                 break;
618                         }
619         
620                         /* does this socket have data? */
621                         if( FD_ISSET( sock_fd, read_set ) ) {
622         
623                                 osrfLogInternal( OSRF_LOG_MARK, "Socket %d active", sock_fd);
624                                 handled++;
625                                 FD_CLR(sock_fd, read_set);
626         
627                                 if(node->endpoint == SERVER_SOCKET) 
628                                         _socket_handle_new_client(mgr, node);
629         
630                                 else
631                                         status = _socket_handle_client_data(mgr, node);
632         
633                                 /* someone may have yanked a socket_node out from under 
634                                         us...start over with the first socket */
635                                 if(status == -1)  {
636                                         last_failed_id = sock_fd;
637                                         osrfLogInternal( OSRF_LOG_MARK, "Backtracking back to start of loop because "
638                                                         "of -1 return code from _socket_handle_client_data()");
639                                 }
640                         }
641
642                         if(status == -1) break;
643                         node = node->next;
644
645                 } // is_set
646
647                 if(status == 0) break;
648                 if(status == -1) status = 0;
649         } 
650
651         return 0;
652 }
653
654
655 /* routes data from a single known socket */
656 static int _socket_route_data_id( socket_manager* mgr, int sock_id) {
657         socket_node* node = socket_find_node(mgr, sock_id);     
658         int status = 0;
659
660         if(node) {
661                 if(node->endpoint == SERVER_SOCKET) 
662                         _socket_handle_new_client(mgr, node);
663         
664                 if(node->endpoint == CLIENT_SOCKET ) 
665                         status = _socket_handle_client_data(mgr, node);
666
667                 if(status == -1) {
668                         socket_remove_node(mgr, sock_id);
669                         return -1;
670                 }
671                 return 0;
672         } 
673
674         return -1;
675 }
676
677
678 static int _socket_handle_new_client(socket_manager* mgr, socket_node* node) {
679         if(mgr == NULL || node == NULL) return -1;
680
681         errno = 0;
682         int new_sock_fd;
683         new_sock_fd = accept(node->sock_fd, NULL, NULL);
684         if(new_sock_fd < 0) {
685                 osrfLogWarning( OSRF_LOG_MARK, "_socket_handle_new_client(): accept() failed: %s",
686                         strerror( errno ) );
687                 return -1;
688         }
689
690         if(node->addr_type == INET) {
691                 _socket_add_node(mgr, CLIENT_SOCKET, INET, new_sock_fd, node->sock_fd);
692                 osrfLogDebug( OSRF_LOG_MARK, "Adding new INET client for %d", node->sock_fd);
693
694         } else if(node->addr_type == UNIX) {
695                 _socket_add_node(mgr, CLIENT_SOCKET, UNIX, new_sock_fd, node->sock_fd);
696                 osrfLogDebug( OSRF_LOG_MARK, "Adding new UNIX client for %d", node->sock_fd);
697         }
698
699         return 0;
700 }
701
702
703 static int _socket_handle_client_data(socket_manager* mgr, socket_node* node) {
704         if(mgr == NULL || node == NULL) return -1;
705
706         char buf[RBUFSIZE];
707         int read_bytes;
708         int sock_fd = node->sock_fd;
709
710         memset(buf, 0, RBUFSIZE);
711         set_fl(sock_fd, O_NONBLOCK);
712
713         osrfLogInternal( OSRF_LOG_MARK, "%ld : Received data at %f\n", (long) getpid(), get_timestamp_millis());
714
715         while( (read_bytes = recv(sock_fd, buf, RBUFSIZE-1, 0) ) > 0 ) {
716                 osrfLogInternal( OSRF_LOG_MARK, "Socket %d Read %d bytes and data: %s", sock_fd, read_bytes, buf);
717                 if(mgr->data_received)
718                         mgr->data_received(mgr->blob, mgr, sock_fd, buf, node->parent_id);
719
720                 memset(buf, 0, RBUFSIZE);
721         }
722     int local_errno = errno; /* capture errno as set by recv() */
723
724         if(socket_find_node(mgr, sock_fd)) {  /* someone may have closed this socket */
725                 clr_fl(sock_fd, O_NONBLOCK); 
726                 if(read_bytes < 0) { 
727                         if(local_errno != EAGAIN) 
728                                 osrfLogWarning(OSRF_LOG_MARK,  " * Error reading socket with error %s", strerror(local_errno));
729                 }
730
731         } else { return -1; } /* inform the caller that this node has been tampered with */
732
733         if(read_bytes == 0) {  /* socket closed by client */
734                 if(mgr->on_socket_closed) {
735                         mgr->on_socket_closed(mgr->blob, sock_fd);
736                 }
737                 return -1;
738         }
739
740         return 0;
741
742 }
743
744
745 void socket_manager_free(socket_manager* mgr) {
746         if(mgr == NULL) return;
747         socket_node* tmp;
748         while(mgr->socket) {
749                 tmp = mgr->socket->next;
750                 socket_disconnect(mgr, mgr->socket->sock_fd);
751                 mgr->socket = tmp;
752         }
753         free(mgr);
754
755 }
756