41012c5d1d55c5cd57fd832caa2a86d32b61c39b
[OpenSRF.git] / src / utils / socket_bundle.c
1 #include "socket_bundle.h"
2
3 /* -------------------------------------------------------------------- 
4         Test Code 
5         -------------------------------------------------------------------- */
6 /*
7 int count = 0;
8 void printme(void* blob, socket_manager* mgr, 
9                 int sock_fd, char* data, int parent_id) {
10
11         fprintf(stderr, "Got data from socket %d with parent %d => %s", 
12                         sock_fd, parent_id, data );
13
14         socket_send(sock_fd, data);
15
16         if(count++ > 2) {
17                 socket_disconnect(mgr, sock_fd);
18                 _socket_print_list(mgr);
19         }
20 }
21
22 int main(int argc, char* argv[]) {
23         socket_manager manager;
24         memset(&manager, 0, sizeof(socket_manager));
25         int port = 11000;
26         if(argv[1])
27                 port = atoi(argv[1]);
28
29         manager.data_received = &printme;
30         socket_open_tcp_server(&manager, port);
31
32         while(1)
33                 socket_wait_all(&manager, -1);
34
35         return 0;
36 }
37 */
38 /* -------------------------------------------------------------------- */
39
40
41 /*
42 int debug_handler(char* msg, ...) {
43         va_list args;
44         va_start(args, msg);
45         vfprintf(stderr, msg, args);
46         va_end(args);
47         fprintf( stderr, "\n" );
48         return -1;
49 }
50
51 int warning_handler(char* msg, ...) {
52         va_list args;
53         va_start(args, msg);
54         vfprintf(stderr, msg, args);
55         va_end(args);
56         fprintf( stderr, "\n" );
57         return -1;
58 }
59 */
60
61
62 socket_node* _socket_add_node(socket_manager* mgr, 
63                 int endpoint, int addr_type, int sock_fd, int parent_id ) {
64
65         if(mgr == NULL) return NULL;
66         debug_handler("Adding socket node with fd %d", sock_fd);
67         socket_node* new_node = safe_malloc(sizeof(socket_node));
68
69         new_node->endpoint      = endpoint;
70         new_node->addr_type     = addr_type;
71         new_node->sock_fd               = sock_fd;
72         new_node->next                  = NULL;
73         new_node->parent_id = 0;
74         if(parent_id > 0)
75                 new_node->parent_id = parent_id;
76
77         new_node->next                  = mgr->socket;
78         mgr->socket                             = new_node;
79         return new_node;
80 }
81
82 /* creates a new server socket node and adds it to the socket set.
83         returns new socket fd on success.  -1 on failure.
84         socket_type is one of INET or UNIX  */
85 int socket_open_tcp_server(socket_manager* mgr, int port, char* listen_ip) {
86
87         if( mgr == NULL ) return warning_handler("socket_open_tcp_server(): NULL mgr"); 
88
89         int sock_fd;
90         struct sockaddr_in server_addr;
91
92         sock_fd = socket(AF_INET, SOCK_STREAM, 0);
93
94         if(sock_fd < 0) 
95                 return warning_handler("tcp_server_connect(): Unable to create socket");
96
97         server_addr.sin_family = AF_INET;
98
99         if(listen_ip != NULL) {
100                 server_addr.sin_addr.s_addr = inet_addr(listen_ip);
101         } else {
102                 server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
103         }
104
105         server_addr.sin_port = htons(port);
106
107         if(bind( sock_fd, (struct sockaddr*) &server_addr, sizeof(server_addr)) < 0)
108                 return warning_handler("tcp_server_connect(): cannot bind to port %d", port );
109
110         if(listen(sock_fd, 20) == -1) 
111                 return warning_handler("tcp_server_connect(): listen() returned error");
112
113         _socket_add_node(mgr, SERVER_SOCKET, INET, sock_fd, 0);
114         return sock_fd;
115 }
116
117 int socket_open_unix_server(socket_manager* mgr, char* path) {
118         if(mgr == NULL || path == NULL) return -1;
119
120         debug_handler("opening unix socket at %s", path);
121         int sock_fd;
122         struct sockaddr_un server_addr;
123
124         sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
125         if(sock_fd < 0)
126                 return warning_handler("socket_open_unix_server(): socket() failed");
127
128         server_addr.sun_family = AF_UNIX;
129         strcpy(server_addr.sun_path, path);
130
131         if( bind(sock_fd, (struct sockaddr*) &server_addr, 
132                                 sizeof(struct sockaddr_un)) < 0) {
133                 return warning_handler(
134                         "socket_open_unix_server(): cannot bind to unix port %s", path );
135         }
136
137         if(listen(sock_fd, 20) == -1) 
138                 return warning_handler("socket_open_unix_server(): listen() returned error");
139
140         debug_handler("unix socket successfully opened");
141         
142         int i = 1;
143
144         /* causing problems with router for some reason ... */
145         //debug_handler("Setting SO_REUSEADDR");
146         //setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));
147         
148         debug_handler("Setting TCP_NODELAY");
149         setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
150
151         _socket_add_node(mgr, SERVER_SOCKET, UNIX, sock_fd, 0);
152         return sock_fd;
153 }
154
155
156 int socket_open_tcp_client(socket_manager* mgr, int port, char* dest_addr) {
157
158         struct sockaddr_in remoteAddr, localAddr;
159    struct hostent *hptr;
160    int sock_fd;
161
162    // ------------------------------------------------------------------
163    // Create the socket
164    // ------------------------------------------------------------------
165    if( (sock_fd = socket( AF_INET, SOCK_STREAM, 0 )) < 0 ) {
166       warning_handler( "tcp_connect(): Cannot create socket" );
167       return -1;
168    }
169
170         int i = 1;
171         debug_handler("Setting TCP_NODELAY");
172         setsockopt(sock_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i));
173
174
175    // ------------------------------------------------------------------
176    // Get the hostname
177    // ------------------------------------------------------------------
178    if( (hptr = gethostbyname( dest_addr ) ) == NULL ) {
179       warning_handler( "tcp_connect(): Unknown Host => %s", dest_addr );
180       return -1;
181    }
182
183    // ------------------------------------------------------------------
184    // Construct server info struct
185    // ------------------------------------------------------------------
186    memset( &remoteAddr, 0, sizeof(remoteAddr));
187    remoteAddr.sin_family = AF_INET;
188    remoteAddr.sin_port = htons( port );
189    memcpy( (char*) &remoteAddr.sin_addr.s_addr,
190          hptr->h_addr_list[0], hptr->h_length );
191
192    // ------------------------------------------------------------------
193    // Construct local info struct
194    // ------------------------------------------------------------------
195    memset( &localAddr, 0, sizeof( localAddr ) );
196    localAddr.sin_family = AF_INET;
197    localAddr.sin_addr.s_addr = htonl( INADDR_ANY );
198    localAddr.sin_port = htons(0);
199
200    // ------------------------------------------------------------------
201    // Bind to a local port
202    // ------------------------------------------------------------------
203    if( bind( sock_fd, (struct sockaddr *) &localAddr, sizeof( localAddr ) ) < 0 ) {
204       warning_handler( "tcp_connect(): Cannot bind to local port" );
205       return -1;
206    }
207
208    // ------------------------------------------------------------------
209    // Connect to server
210    // ------------------------------------------------------------------
211    if( connect( sock_fd, (struct sockaddr*) &remoteAddr, sizeof( struct sockaddr_in ) ) < 0 ) {
212       warning_handler( "tcp_connect(): Cannot connect to server %s", dest_addr );
213       return -1;
214    }
215
216         _socket_add_node(mgr, CLIENT_SOCKET, INET, sock_fd, -1 );
217
218    return sock_fd;
219 }
220
221
222 int socket_open_unix_client(socket_manager* mgr, char* sock_path) {
223
224         int sock_fd, len;
225    struct sockaddr_un usock;
226
227    if( (sock_fd = socket( AF_UNIX, SOCK_STREAM, 0 )) < 0 )
228       return warning_handler( "Cannot create socket" );
229
230    usock.sun_family = AF_UNIX;
231    strcpy( usock.sun_path, sock_path );
232
233    len = sizeof( usock.sun_family ) + strlen( usock.sun_path );
234
235    if( connect( sock_fd, (struct sockaddr *) &usock, len ) < 0 )
236       return warning_handler( "Error connecting to unix socket" );
237
238         _socket_add_node(mgr, CLIENT_SOCKET, UNIX, sock_fd, -1 );
239
240    return sock_fd;
241 }
242
243
244
245 /* returns the socket_node with the given sock_fd */
246 socket_node* socket_find_node(socket_manager* mgr, int sock_fd) {
247         if(mgr == NULL) return NULL;
248         socket_node* node = mgr->socket;
249         while(node) {
250                 if(node->sock_fd == sock_fd)
251                         return node;
252                 node = node->next;
253         }
254         return NULL;
255 }
256
257 /* removes the node with the given sock_fd from the list and frees it */
258 void socket_remove_node(socket_manager* mgr, int sock_fd) {
259
260         if(mgr == NULL) return;
261
262         debug_handler("removing socket %d", sock_fd);
263
264         socket_node* head = mgr->socket;
265         socket_node* tail = head;
266         if(head == NULL) return;
267
268         /* if removing the first node in the list */
269         if(head->sock_fd == sock_fd) {
270                 mgr->socket = head->next;
271                 free(head);
272                 debug_handler("removing first socket in list");
273                 return;
274         }
275
276         head = head->next;
277
278         /* if removing any other node */
279         while(head) {
280                 if(head->sock_fd == sock_fd) {
281                         tail->next = head->next;
282                         free(head);
283                         return;
284                 }
285                 tail = head;
286                 head = head->next;
287         }
288 }
289
290
291
292 void _socket_print_list(socket_manager* mgr) {
293         if(mgr == NULL) return;
294         socket_node* node = mgr->socket;
295         debug_handler("socket_node list: [");
296         while(node) {
297                 debug_handler("sock_fd: %d | parent_id: %d", 
298                                 node->sock_fd, node->parent_id);
299                 node = node->next;
300         }
301         debug_handler("]");
302 }
303
304 /* sends the given data to the given socket */
305 int socket_send(int sock_fd, const char* data) {
306         debug_handler( "socket_bundle sending to %d data %s",
307                 sock_fd, data);
308
309         debug_handler("%d : Sending data at %lf\n", getpid(), get_timestamp_millis());
310         signal(SIGPIPE, SIG_IGN); /* in case a unix socket was closed */
311         if( send( sock_fd, data, strlen(data), 0 ) < 0 ) {
312                 return warning_handler( "tcp_server_send(): Error sending data" );
313         }
314
315         return 0;
316 }
317
318 /* disconnects the node with the given sock_fd and removes
319         it from the socket set */
320 void socket_disconnect(socket_manager* mgr, int sock_fd) {
321
322         debug_handler("Closing socket %d", sock_fd);
323
324         if( shutdown( sock_fd, SHUT_RDWR ) )
325                 warning_handler( "socket_disconnect(): Error shuting down socket, removing anyway" );
326
327         if( close( sock_fd ) == -1 ) 
328                 warning_handler( "socket_disconnect(): Error closing socket, removing anyway" );
329
330         if(mgr != NULL) 
331                 socket_remove_node(mgr, sock_fd);
332         
333 }
334
335
336 /* we assume that if select() fails, the socket is no longer valid */
337 int socket_connected(int sock_fd) {
338         fd_set read_set;
339         FD_ZERO( &read_set );
340         FD_SET( sock_fd, &read_set );
341         if( select( sock_fd + 1, &read_set, NULL, NULL, NULL) == -1 ) 
342                 return 0;
343         return 1;
344
345 }
346
347 /* this only waits on the server socket and does not handle the actual
348         data coming in from the client..... XXX */
349 int socket_wait(socket_manager* mgr, int timeout, int sock_fd) {
350
351         int retval = 0;
352         fd_set read_set;
353         FD_ZERO( &read_set );
354         FD_SET( sock_fd, &read_set );
355
356         struct timeval tv;
357         tv.tv_sec = timeout;
358         tv.tv_usec = 0;
359
360         if( timeout < 0 ) {  
361
362                 // If timeout is -1, we block indefinitely
363                 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, NULL)) == -1 ) {
364                         warning_handler("Sys Error: %s", strerror(errno));
365                         return warning_handler("Call to select interrupted");
366                 }
367
368         } else if( timeout > 0 ) { /* timeout of 0 means don't block */
369
370                 if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, &tv)) == -1 ) {
371                         warning_handler("Sys Error: %s", strerror(errno));
372                         return warning_handler( "Call to select interrupted" );
373                 }
374         }
375
376         debug_handler("%d active sockets after select()", retval);
377         return _socket_route_data_id(mgr, sock_fd);
378 }
379
380
381 int socket_wait_all(socket_manager* mgr, int timeout) {
382
383         if(mgr == NULL) return warning_handler( "tcp_wait(): null mgr" );
384
385         int retval = 0;
386         fd_set read_set;
387         FD_ZERO( &read_set );
388
389         socket_node* node = mgr->socket;
390         int max_fd = 0;
391         while(node) {
392                 //debug_handler("Adding socket %d to select set",node->sock_fd);
393                 FD_SET( node->sock_fd, &read_set );
394                 if(node->sock_fd > max_fd) max_fd = node->sock_fd;
395                 node = node->next;
396         }
397         max_fd += 1;
398
399         struct timeval tv;
400         tv.tv_sec = timeout;
401         tv.tv_usec = 0;
402
403         if( timeout < 0 ) {  
404
405                 // If timeout is -1, there is no timeout passed to the call to select
406                 if( (retval = select( max_fd, &read_set, NULL, NULL, NULL)) == -1 ) {
407                         warning_handler("Sys Error: %s", strerror(errno));
408                         return warning_handler("Call to select interrupted");
409                 }
410
411         } else if( timeout != 0 ) { /* timeout of 0 means don't block */
412
413                 if( (retval = select( max_fd, &read_set, NULL, NULL, &tv)) == -1 ) {
414                         warning_handler("Sys Error: %s", strerror(errno));
415                         return warning_handler( "Call to select interrupted" );
416                 }
417         }
418
419         debug_handler("%d active sockets after select()", retval);
420         return _socket_route_data(mgr, retval, &read_set);
421 }
422
423 /* determines if we'er receiving a new client or data
424         on an existing client */
425 int _socket_route_data(
426         socket_manager* mgr, int num_active, fd_set* read_set) {
427
428         if(!(mgr && read_set)) return -1;
429
430         int last_failed_id = -1;
431
432
433         /* come back here if someone yanks a socket_node from beneath us */
434         while(1) {
435
436                 socket_node* node = mgr->socket;
437                 int handled = 0;
438                 int status = 0;
439                 
440                 while(node && (handled < num_active)) {
441         
442                         int sock_fd = node->sock_fd;
443                         
444                         if(last_failed_id != -1) {
445                                 /* in case it was not removed by our overlords */
446                                 debug_handler("Attempting to remove last_failed_id of %d", last_failed_id);
447                                 socket_remove_node( mgr, last_failed_id );
448                                 last_failed_id = -1;
449                                 status = -1;
450                                 break;
451                         }
452         
453                         /* does this socket have data? */
454                         if( FD_ISSET( sock_fd, read_set ) ) {
455         
456                                 debug_handler("Socket %d active", sock_fd);
457                                 handled++;
458                                 FD_CLR(sock_fd, read_set);
459         
460                                 if(node->endpoint == SERVER_SOCKET) 
461                                         _socket_handle_new_client(mgr, node);
462         
463                                 else
464                                         status = _socket_handle_client_data(mgr, node);
465         
466                                 /* someone may have yanked a socket_node out from under 
467                                         us...start over with the first socket */
468                                 if(status == -1)  {
469                                         last_failed_id = sock_fd;
470                                         debug_handler("Backtracking back to start of loop because "
471                                                         "of -1 return code from _socket_handle_client_data()");
472                                 }
473                         }
474
475                         if(status == -1) break;
476                         node = node->next;
477
478                 } // is_set
479
480                 if(status == 0) break;
481                 if(status == -1) status = 0;
482         } 
483
484         return 0;
485 }
486
487
488 int _socket_route_data_id( socket_manager* mgr, int sock_id) {
489         socket_node* node = socket_find_node(mgr, sock_id);     
490         int status = 0;
491
492         if(node) {
493                 if(node->endpoint == SERVER_SOCKET) 
494                         _socket_handle_new_client(mgr, node);
495         
496                 if(node->endpoint == CLIENT_SOCKET ) 
497                         status = _socket_handle_client_data(mgr, node);
498
499                 if(status == -1) socket_remove_node(mgr, sock_id);
500                 return 0;
501         } 
502
503         return -1;
504 }
505
506
507 int _socket_handle_new_client(socket_manager* mgr, socket_node* node) {
508         if(mgr == NULL || node == NULL) return -1;
509
510         int new_sock_fd;
511         new_sock_fd = accept(node->sock_fd, NULL, NULL);
512         if(new_sock_fd < 0)
513                 return warning_handler("_socket_route_data(): accept() failed");
514
515         if(node->addr_type == INET) {
516                 _socket_add_node(mgr, CLIENT_SOCKET, INET, new_sock_fd, node->sock_fd);
517                 debug_handler("Adding new INET client for %d", node->sock_fd);
518
519         } else if(node->addr_type == UNIX) {
520                 _socket_add_node(mgr, CLIENT_SOCKET, UNIX, new_sock_fd, node->sock_fd);
521                 debug_handler("Adding new UNIX client for %d", node->sock_fd);
522         }
523
524         return 0;
525 }
526
527
528 int _socket_handle_client_data(socket_manager* mgr, socket_node* node) {
529         if(mgr == NULL || node == NULL) return -1;
530
531         char buf[RBUFSIZE];
532         int read_bytes;
533         int sock_fd = node->sock_fd;
534
535         memset(buf, 0, RBUFSIZE);
536         set_fl(sock_fd, O_NONBLOCK);
537         debug_handler("Gathering client data for %d", node->sock_fd);
538
539         debug_handler("%d : Received data at %lf\n", getpid(), get_timestamp_millis());
540
541         while( (read_bytes = recv(sock_fd, buf, RBUFSIZE-1, 0) ) > 0 ) {
542                 debug_handler("Socket %d Read %d bytes and data: %s", sock_fd, read_bytes, buf);
543                 if(mgr->data_received)
544                         mgr->data_received(mgr->blob, mgr, sock_fd, buf, node->parent_id);
545
546                 memset(buf, 0, RBUFSIZE);
547         }
548
549         if(socket_find_node(mgr, sock_fd)) {  /* someone may have closed this socket */
550                 clr_fl(sock_fd, O_NONBLOCK); 
551                 if(read_bytes < 0) { 
552                         if( errno != EAGAIN ) 
553                                 warning_handler( " * Error reading socket with errno %d", errno );
554                 }
555
556         } else { return -1; } /* inform the caller that this node has been tampered with */
557
558         if(read_bytes == 0) {  /* socket closed by client */
559                 if(mgr->on_socket_closed) {
560                         mgr->on_socket_closed(mgr->blob, sock_fd);
561                         return -1;
562                 }
563         }
564
565         return 0;
566
567 }
568
569
570 void socket_manager_free(socket_manager* mgr) {
571         if(mgr == NULL) return;
572         socket_node* tmp;
573         while(mgr->socket) {
574                 tmp = mgr->socket->next;
575                 socket_disconnect(mgr, mgr->socket->sock_fd);
576                 mgr->socket = tmp;
577         }
578         free(mgr);
579
580 }