Merge commit 'ronnie/master'
[sahlberg/ctdb.git] / tcp / tcp_connect.c
1 /* 
2    ctdb over TCP
3
4    Copyright (C) Andrew Tridgell  2006
5    Copyright (C) Ronnie Sahlberg  2008
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "../include/ctdb_private.h"
27 #include "ctdb_tcp.h"
28
29 /*
30   stop any connecting (established or pending) to a node
31  */
32 void ctdb_tcp_stop_connection(struct ctdb_node *node)
33 {
34         struct ctdb_tcp_node *tnode = talloc_get_type(
35                 node->private_data, struct ctdb_tcp_node);
36         
37         ctdb_queue_set_fd(tnode->out_queue, -1);
38         talloc_free(tnode->connect_te);
39         talloc_free(tnode->connect_fde);
40         tnode->connect_fde = NULL;
41         tnode->connect_te = NULL;
42         if (tnode->fd != -1) {
43                 close(tnode->fd);
44                 tnode->fd = -1;
45         }
46 }
47
48
49 /*
50   called when a complete packet has come in - should not happen on this socket
51   unless the other side closes the connection with RST or FIN
52  */
53 void ctdb_tcp_tnode_cb(uint8_t *data, size_t cnt, void *private_data)
54 {
55         struct ctdb_node *node = talloc_get_type(private_data, struct ctdb_node);
56         struct ctdb_tcp_node *tnode = talloc_get_type(
57                 node->private_data, struct ctdb_tcp_node);
58
59         if (data == NULL) {
60                 node->ctdb->upcalls->node_dead(node);
61         }
62
63         ctdb_tcp_stop_connection(node);
64         tnode->connect_te = event_add_timed(node->ctdb->ev, tnode,
65                                             timeval_current_ofs(3, 0),
66                                             ctdb_tcp_node_connect, node);
67 }
68
69 /*
70   called when socket becomes writeable on connect
71 */
72 static void ctdb_node_connect_write(struct event_context *ev, struct fd_event *fde, 
73                                     uint16_t flags, void *private_data)
74 {
75         struct ctdb_node *node = talloc_get_type(private_data,
76                                                  struct ctdb_node);
77         struct ctdb_tcp_node *tnode = talloc_get_type(node->private_data,
78                                                       struct ctdb_tcp_node);
79         struct ctdb_context *ctdb = node->ctdb;
80         int error = 0;
81         socklen_t len = sizeof(error);
82         int one = 1;
83
84         talloc_free(tnode->connect_te);
85         tnode->connect_te = NULL;
86
87         if (getsockopt(tnode->fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0 ||
88             error != 0) {
89                 ctdb_tcp_stop_connection(node);
90                 tnode->connect_te = event_add_timed(ctdb->ev, tnode, 
91                                                     timeval_current_ofs(1, 0),
92                                                     ctdb_tcp_node_connect, node);
93                 return;
94         }
95
96         talloc_free(tnode->connect_fde);
97         tnode->connect_fde = NULL;
98
99         setsockopt(tnode->fd,IPPROTO_TCP,TCP_NODELAY,(char *)&one,sizeof(one));
100         setsockopt(tnode->fd,SOL_SOCKET,SO_KEEPALIVE,(char *)&one,sizeof(one));
101
102         ctdb_queue_set_fd(tnode->out_queue, tnode->fd);
103
104         /* the queue subsystem now owns this fd */
105         tnode->fd = -1;
106 }
107
108
109 static int ctdb_tcp_get_address(struct ctdb_context *ctdb,
110                                 const char *address, ctdb_sock_addr *addr)
111 {
112         if (parse_ip(address, NULL, addr) == 0) {
113                 DEBUG(DEBUG_CRIT, (__location__ " Unparsable address : %s.\n", address));
114                 return -1;
115         }
116         return 0;
117 }
118
119 /*
120   called when we should try and establish a tcp connection to a node
121 */
122 void ctdb_tcp_node_connect(struct event_context *ev, struct timed_event *te, 
123                            struct timeval t, void *private_data)
124 {
125         struct ctdb_node *node = talloc_get_type(private_data,
126                                                  struct ctdb_node);
127         struct ctdb_tcp_node *tnode = talloc_get_type(node->private_data, 
128                                                       struct ctdb_tcp_node);
129         struct ctdb_context *ctdb = node->ctdb;
130         ctdb_sock_addr sock_in;
131         int sockin_size;
132         ctdb_sock_addr sock_out;
133
134         ctdb_tcp_stop_connection(node);
135
136         ZERO_STRUCT(sock_out);
137 #ifdef HAVE_SOCK_SIN_LEN
138         sock_out.ip.sin_len = sizeof(sock_out);
139 #endif
140         if (ctdb_tcp_get_address(ctdb, node->address.address, &sock_out) != 0) {
141                 return;
142         }
143         switch (sock_out.sa.sa_family) {
144         case AF_INET:
145                 sock_out.ip.sin_port = htons(node->address.port);
146                 break;
147         case AF_INET6:
148                 sock_out.ip6.sin6_port = htons(node->address.port);
149                 break;
150         default:
151                 DEBUG(DEBUG_ERR, (__location__ " unknown family %u\n",
152                         sock_out.sa.sa_family));
153                 return;
154         }
155
156         tnode->fd = socket(sock_out.sa.sa_family, SOCK_STREAM, IPPROTO_TCP);
157         set_nonblocking(tnode->fd);
158         set_close_on_exec(tnode->fd);
159
160         /* Bind our side of the socketpair to the same address we use to listen
161          * on incoming CTDB traffic.
162          * We must specify this address to make sure that the address we expose to
163          * the remote side is actually routable in case CTDB traffic will run on
164          * a dedicated non-routeable network.
165          */
166         ZERO_STRUCT(sock_in);
167         if (ctdb_tcp_get_address(ctdb, ctdb->address.address, &sock_in) != 0) {
168                 return;
169         }
170         switch (sock_in.sa.sa_family) {
171         case AF_INET:
172                 sockin_size = sizeof(sock_in.ip);
173                 break;
174         case AF_INET6:
175                 sockin_size = sizeof(sock_in.ip6);
176                 break;
177         default:
178                 DEBUG(DEBUG_ERR, (__location__ " unknown family %u\n",
179                         sock_in.sa.sa_family));
180                 return;
181         }
182 #ifdef HAVE_SOCK_SIN_LEN
183         sock_in.ip.sin_len = sockin_size;
184 #endif
185         bind(tnode->fd, (struct sockaddr *)&sock_in, sockin_size);
186
187         if (connect(tnode->fd, (struct sockaddr *)&sock_out, sizeof(sock_out)) != 0 &&
188             errno != EINPROGRESS) {
189                 ctdb_tcp_stop_connection(node);
190                 tnode->connect_te = event_add_timed(ctdb->ev, tnode, 
191                                                     timeval_current_ofs(1, 0),
192                                                     ctdb_tcp_node_connect, node);
193                 return;
194         }
195
196         /* non-blocking connect - wait for write event */
197         tnode->connect_fde = event_add_fd(node->ctdb->ev, tnode, tnode->fd, 
198                                           EVENT_FD_WRITE|EVENT_FD_READ, 
199                                           ctdb_node_connect_write, node);
200
201         /* don't give it long to connect - retry in one second. This ensures
202            that we find a node is up quickly (tcp normally backs off a syn reply
203            delay by quite a lot) */
204         tnode->connect_te = event_add_timed(ctdb->ev, tnode, timeval_current_ofs(1, 0), 
205                                             ctdb_tcp_node_connect, node);
206 }
207
208 /*
209   called when we get contacted by another node
210   currently makes no attempt to check if the connection is really from a ctdb
211   node in our cluster
212 */
213 static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde, 
214                               uint16_t flags, void *private_data)
215 {
216         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
217         struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private_data, struct ctdb_tcp);
218         ctdb_sock_addr addr;
219         socklen_t len;
220         int fd, nodeid;
221         struct ctdb_incoming *in;
222         int one = 1;
223         const char *incoming_node;
224
225         memset(&addr, 0, sizeof(addr));
226         len = sizeof(addr);
227         fd = accept(ctcp->listen_fd, (struct sockaddr *)&addr, &len);
228         if (fd == -1) return;
229
230         incoming_node = ctdb_addr_to_str(&addr);
231         nodeid = ctdb_ip_to_nodeid(ctdb, incoming_node);
232
233         if (nodeid == -1) {
234                 DEBUG(DEBUG_ERR, ("Refused connection from unknown node %s\n", incoming_node));
235                 close(fd);
236                 return;
237         }
238
239         in = talloc_zero(ctcp, struct ctdb_incoming);
240         in->fd = fd;
241         in->ctdb = ctdb;
242
243         set_nonblocking(in->fd);
244         set_close_on_exec(in->fd);
245
246         setsockopt(in->fd,SOL_SOCKET,SO_KEEPALIVE,(char *)&one,sizeof(one));
247
248         in->queue = ctdb_queue_setup(ctdb, in, in->fd, CTDB_TCP_ALIGNMENT, 
249                                      ctdb_tcp_read_cb, in);
250 }
251
252
253 /*
254   automatically find which address to listen on
255 */
256 static int ctdb_tcp_listen_automatic(struct ctdb_context *ctdb)
257 {
258         struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private_data,
259                                                 struct ctdb_tcp);
260         ctdb_sock_addr sock;
261         int lock_fd, i;
262         const char *lock_path = "/tmp/.ctdb_socket_lock";
263         struct flock lock;
264         int one = 1;
265         int sock_size;
266
267         /* in order to ensure that we don't get two nodes with the
268            same adddress, we must make the bind() and listen() calls
269            atomic. The SO_REUSEADDR setsockopt only prevents double
270            binds if the first socket is in LISTEN state  */
271         lock_fd = open(lock_path, O_RDWR|O_CREAT, 0666);
272         if (lock_fd == -1) {
273                 DEBUG(DEBUG_CRIT,("Unable to open %s\n", lock_path));
274                 return -1;
275         }
276
277         lock.l_type = F_WRLCK;
278         lock.l_whence = SEEK_SET;
279         lock.l_start = 0;
280         lock.l_len = 1;
281         lock.l_pid = 0;
282
283         if (fcntl(lock_fd, F_SETLKW, &lock) != 0) {
284                 DEBUG(DEBUG_CRIT,("Unable to lock %s\n", lock_path));
285                 close(lock_fd);
286                 return -1;
287         }
288
289         for (i=0;i<ctdb->num_nodes;i++) {
290                 /* if node_ip is specified we will only try to bind to that
291                    ip.
292                 */
293                 if (ctdb->node_ip != NULL) {
294                         if (strcmp(ctdb->node_ip, ctdb->nodes[i]->address.address)) {
295                                 continue;
296                         }
297                 }
298
299                 ZERO_STRUCT(sock);
300                 if (ctdb_tcp_get_address(ctdb,
301                                 ctdb->nodes[i]->address.address, 
302                                 &sock) != 0) {
303                         continue;
304                 }
305         
306                 switch (sock.sa.sa_family) {
307                 case AF_INET:
308                         sock.ip.sin_port = htons(ctdb->nodes[i]->address.port);
309                         sock_size = sizeof(sock.ip);
310                         break;
311                 case AF_INET6:
312                         sock.ip6.sin6_port = htons(ctdb->nodes[i]->address.port);
313                         sock_size = sizeof(sock.ip6);
314                         break;
315                 default:
316                         DEBUG(DEBUG_ERR, (__location__ " unknown family %u\n",
317                                 sock.sa.sa_family));
318                         continue;
319                 }
320 #ifdef HAVE_SOCK_SIN_LEN
321                 sock.ip.sin_len = sock_size;
322 #endif
323
324                 ctcp->listen_fd = socket(sock.sa.sa_family, SOCK_STREAM, IPPROTO_TCP);
325                 if (ctcp->listen_fd == -1) {
326                         ctdb_set_error(ctdb, "socket failed\n");
327                         continue;
328                 }
329
330                 set_close_on_exec(ctcp->listen_fd);
331
332                 setsockopt(ctcp->listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&one,sizeof(one));
333
334                 if (bind(ctcp->listen_fd, (struct sockaddr * )&sock, sock_size) == 0) {
335                         DEBUG(DEBUG_ERR,(__location__ " Failed to bind() to socket. %s(%d)\n", strerror(errno), errno));
336                         break;
337                 }
338         }
339         
340         if (i == ctdb->num_nodes) {
341                 DEBUG(DEBUG_CRIT,("Unable to bind to any of the node addresses - giving up\n"));
342                 goto failed;
343         }
344         ctdb->address.address = talloc_strdup(ctdb, ctdb->nodes[i]->address.address);
345         ctdb->address.port    = ctdb->nodes[i]->address.port;
346         ctdb->name = talloc_asprintf(ctdb, "%s:%u", 
347                                      ctdb->address.address, 
348                                      ctdb->address.port);
349         ctdb->pnn = ctdb->nodes[i]->pnn;
350         ctdb->nodes[i]->flags &= ~NODE_FLAGS_DISCONNECTED;
351         DEBUG(DEBUG_INFO,("ctdb chose network address %s:%u pnn %u\n", 
352                  ctdb->address.address, 
353                  ctdb->address.port, 
354                  ctdb->pnn));
355         /* do we start out in DISABLED mode? */
356         if (ctdb->start_as_disabled != 0) {
357                 DEBUG(DEBUG_INFO, ("This node is configured to start in DISABLED state\n"));
358                 ctdb->nodes[i]->flags |= NODE_FLAGS_DISABLED;
359         }
360         
361         if (listen(ctcp->listen_fd, 10) == -1) {
362                 goto failed;
363         }
364
365         event_add_fd(ctdb->ev, ctcp, ctcp->listen_fd, EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
366                      ctdb_listen_event, ctdb);  
367
368         close(lock_fd);
369         return 0;
370         
371 failed:
372         close(lock_fd);
373         close(ctcp->listen_fd);
374         ctcp->listen_fd = -1;
375         return -1;
376 }
377
378
379 /*
380   listen on our own address
381 */
382 int ctdb_tcp_listen(struct ctdb_context *ctdb)
383 {
384         struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private_data,
385                                                 struct ctdb_tcp);
386         ctdb_sock_addr sock;
387         int sock_size;
388         int one = 1;
389
390         /* we can either auto-bind to the first available address, or we can
391            use a specified address */
392         if (!ctdb->address.address) {
393                 return ctdb_tcp_listen_automatic(ctdb);
394         }
395
396         ZERO_STRUCT(sock);
397         if (ctdb_tcp_get_address(ctdb, ctdb->address.address, 
398                                  &sock) != 0) {
399                 goto failed;
400         }
401         
402         switch (sock.sa.sa_family) {
403         case AF_INET:
404                 sock.ip.sin_port = htons(ctdb->address.port);
405                 sock_size = sizeof(sock.ip);
406                 break;
407         case AF_INET6:
408                 sock.ip6.sin6_port = htons(ctdb->address.port);
409                 sock_size = sizeof(sock.ip6);
410                 break;
411         default:
412                 DEBUG(DEBUG_ERR, (__location__ " unknown family %u\n",
413                         sock.sa.sa_family));
414                 goto failed;
415         }
416 #ifdef HAVE_SOCK_SIN_LEN
417         sock.ip.sin_len = sock_size;
418 #endif
419
420         ctcp->listen_fd = socket(sock.sa.sa_family, SOCK_STREAM, IPPROTO_TCP);
421         if (ctcp->listen_fd == -1) {
422                 ctdb_set_error(ctdb, "socket failed\n");
423                 return -1;
424         }
425
426         set_close_on_exec(ctcp->listen_fd);
427
428         setsockopt(ctcp->listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&one,sizeof(one));
429
430         if (bind(ctcp->listen_fd, (struct sockaddr * )&sock, sock_size) != 0) {
431                 DEBUG(DEBUG_ERR,(__location__ " Failed to bind() to socket. %s(%d)\n", strerror(errno), errno));
432                 goto failed;
433         }
434
435         if (listen(ctcp->listen_fd, 10) == -1) {
436                 goto failed;
437         }
438
439         event_add_fd(ctdb->ev, ctcp, ctcp->listen_fd, EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
440                      ctdb_listen_event, ctdb);  
441
442         return 0;
443
444 failed:
445         if (ctcp->listen_fd != -1) {
446                 close(ctcp->listen_fd);
447         }
448         ctcp->listen_fd = -1;
449         return -1;
450 }
451