82f2e746192e061f237d54fe4df7d31f3e546437
[samba.git] / ctdb / tcp / tcp_connect.c
1 /* 
2    ctdb over TCP
3
4    Copyright (C) Andrew Tridgell  2006
5    Copyright (C) Ronnie Sahlberg  2008
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "replace.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24
25 #include <talloc.h>
26 #include <tevent.h>
27
28 #include "lib/util/debug.h"
29 #include "lib/util/time.h"
30 #include "lib/util/blocking.h"
31
32 #include "ctdb_private.h"
33
34 #include "common/system.h"
35 #include "common/common.h"
36 #include "common/logging.h"
37
38 #include "ctdb_tcp.h"
39
40 /*
41   stop any connecting (established or pending) to a node
42  */
43 void ctdb_tcp_stop_connection(struct ctdb_node *node)
44 {
45         struct ctdb_tcp_node *tnode = talloc_get_type(
46                 node->private_data, struct ctdb_tcp_node);
47         
48         ctdb_queue_set_fd(tnode->out_queue, -1);
49         talloc_free(tnode->connect_te);
50         talloc_free(tnode->connect_fde);
51         tnode->connect_fde = NULL;
52         tnode->connect_te = NULL;
53         if (tnode->fd != -1) {
54                 close(tnode->fd);
55                 tnode->fd = -1;
56         }
57 }
58
59
60 /*
61   called when a complete packet has come in - should not happen on this socket
62   unless the other side closes the connection with RST or FIN
63  */
64 void ctdb_tcp_tnode_cb(uint8_t *data, size_t cnt, void *private_data)
65 {
66         struct ctdb_node *node = talloc_get_type(private_data, struct ctdb_node);
67         struct ctdb_tcp_node *tnode = talloc_get_type(
68                 node->private_data, struct ctdb_tcp_node);
69
70         if (data == NULL) {
71                 node->ctdb->upcalls->node_dead(node);
72         }
73
74         ctdb_tcp_stop_connection(node);
75         tnode->connect_te = tevent_add_timer(node->ctdb->ev, tnode,
76                                              timeval_current_ofs(3, 0),
77                                              ctdb_tcp_node_connect, node);
78 }
79
80 /*
81   called when socket becomes writeable on connect
82 */
83 static void ctdb_node_connect_write(struct tevent_context *ev,
84                                     struct tevent_fd *fde,
85                                     uint16_t flags, void *private_data)
86 {
87         struct ctdb_node *node = talloc_get_type(private_data,
88                                                  struct ctdb_node);
89         struct ctdb_tcp_node *tnode = talloc_get_type(node->private_data,
90                                                       struct ctdb_tcp_node);
91         struct ctdb_context *ctdb = node->ctdb;
92         int error = 0;
93         socklen_t len = sizeof(error);
94         int one = 1;
95
96         talloc_free(tnode->connect_te);
97         tnode->connect_te = NULL;
98
99         if (getsockopt(tnode->fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0 ||
100             error != 0) {
101                 ctdb_tcp_stop_connection(node);
102                 tnode->connect_te = tevent_add_timer(ctdb->ev, tnode,
103                                                     timeval_current_ofs(1, 0),
104                                                     ctdb_tcp_node_connect, node);
105                 return;
106         }
107
108         talloc_free(tnode->connect_fde);
109         tnode->connect_fde = NULL;
110
111         if (setsockopt(tnode->fd,IPPROTO_TCP,TCP_NODELAY,(char *)&one,sizeof(one)) == -1) {
112                 DEBUG(DEBUG_WARNING, ("Failed to set TCP_NODELAY on fd - %s\n",
113                                       strerror(errno)));
114         }
115         if (setsockopt(tnode->fd,SOL_SOCKET,SO_KEEPALIVE,(char *)&one,sizeof(one)) == -1) {
116                 DEBUG(DEBUG_WARNING, ("Failed to set KEEPALIVE on fd - %s\n",
117                                       strerror(errno)));
118         }
119
120         ctdb_queue_set_fd(tnode->out_queue, tnode->fd);
121
122         /* the queue subsystem now owns this fd */
123         tnode->fd = -1;
124 }
125
126
127 /*
128   called when we should try and establish a tcp connection to a node
129 */
130 void ctdb_tcp_node_connect(struct tevent_context *ev, struct tevent_timer *te,
131                            struct timeval t, void *private_data)
132 {
133         struct ctdb_node *node = talloc_get_type(private_data,
134                                                  struct ctdb_node);
135         struct ctdb_tcp_node *tnode = talloc_get_type(node->private_data, 
136                                                       struct ctdb_tcp_node);
137         struct ctdb_context *ctdb = node->ctdb;
138         ctdb_sock_addr sock_in;
139         int sockin_size;
140         int sockout_size;
141         ctdb_sock_addr sock_out;
142         int ret;
143
144         ctdb_tcp_stop_connection(node);
145
146         sock_out = node->address;
147
148         tnode->fd = socket(sock_out.sa.sa_family, SOCK_STREAM, IPPROTO_TCP);
149         if (tnode->fd == -1) {
150                 DEBUG(DEBUG_ERR, (__location__ " Failed to create socket\n"));
151                 return;
152         }
153
154         ret = set_blocking(tnode->fd, false);
155         if (ret != 0) {
156                 DEBUG(DEBUG_ERR,
157                       (__location__
158                        " failed to set socket non-blocking (%s)\n",
159                        strerror(errno)));
160                 close(tnode->fd);
161                 tnode->fd = -1;
162                 return;
163         }
164
165         set_close_on_exec(tnode->fd);
166
167         DEBUG(DEBUG_DEBUG, (__location__ " Created TCP SOCKET FD:%d\n", tnode->fd));
168
169         /* Bind our side of the socketpair to the same address we use to listen
170          * on incoming CTDB traffic.
171          * We must specify this address to make sure that the address we expose to
172          * the remote side is actually routable in case CTDB traffic will run on
173          * a dedicated non-routeable network.
174          */
175         sock_in = *ctdb->address;
176
177         /* AIX libs check to see if the socket address and length
178            arguments are consistent with each other on calls like
179            connect().   Can not get by with just sizeof(sock_in),
180            need sizeof(sock_in.ip).
181         */
182         switch (sock_in.sa.sa_family) {
183         case AF_INET:
184                 sock_in.ip.sin_port = 0 /* Any port */;
185                 sockin_size = sizeof(sock_in.ip);
186                 sockout_size = sizeof(sock_out.ip);
187                 break;
188         case AF_INET6:
189                 sock_in.ip6.sin6_port = 0 /* Any port */;
190                 sockin_size = sizeof(sock_in.ip6);
191                 sockout_size = sizeof(sock_out.ip6);
192                 break;
193         default:
194                 DEBUG(DEBUG_ERR, (__location__ " unknown family %u\n",
195                         sock_in.sa.sa_family));
196                 close(tnode->fd);
197                 tnode->fd = -1;
198                 return;
199         }
200
201         if (bind(tnode->fd, (struct sockaddr *)&sock_in, sockin_size) == -1) {
202                 DEBUG(DEBUG_ERR, (__location__ " Failed to bind socket %s(%d)\n",
203                                   strerror(errno), errno));
204                 close(tnode->fd);
205                 tnode->fd = -1;
206                 return;
207         }
208
209         if (connect(tnode->fd, (struct sockaddr *)&sock_out, sockout_size) != 0 &&
210             errno != EINPROGRESS) {
211                 ctdb_tcp_stop_connection(node);
212                 tnode->connect_te = tevent_add_timer(ctdb->ev, tnode,
213                                                      timeval_current_ofs(1, 0),
214                                                      ctdb_tcp_node_connect, node);
215                 return;
216         }
217
218         /* non-blocking connect - wait for write event */
219         tnode->connect_fde = tevent_add_fd(node->ctdb->ev, tnode, tnode->fd,
220                                            TEVENT_FD_WRITE|TEVENT_FD_READ,
221                                            ctdb_node_connect_write, node);
222
223         /* don't give it long to connect - retry in one second. This ensures
224            that we find a node is up quickly (tcp normally backs off a syn reply
225            delay by quite a lot) */
226         tnode->connect_te = tevent_add_timer(ctdb->ev, tnode,
227                                              timeval_current_ofs(1, 0),
228                                              ctdb_tcp_node_connect, node);
229 }
230
231 /*
232   called when we get contacted by another node
233   currently makes no attempt to check if the connection is really from a ctdb
234   node in our cluster
235 */
236 static void ctdb_listen_event(struct tevent_context *ev, struct tevent_fd *fde,
237                               uint16_t flags, void *private_data)
238 {
239         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
240         struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private_data, struct ctdb_tcp);
241         ctdb_sock_addr addr;
242         socklen_t len;
243         int fd, nodeid;
244         struct ctdb_incoming *in;
245         int one = 1;
246         int ret;
247
248         memset(&addr, 0, sizeof(addr));
249         len = sizeof(addr);
250         fd = accept(ctcp->listen_fd, (struct sockaddr *)&addr, &len);
251         if (fd == -1) return;
252
253         nodeid = ctdb_ip_to_nodeid(ctdb, &addr);
254
255         if (nodeid == -1) {
256                 DEBUG(DEBUG_ERR, ("Refused connection from unknown node %s\n", ctdb_addr_to_str(&addr)));
257                 close(fd);
258                 return;
259         }
260
261         in = talloc_zero(ctcp, struct ctdb_incoming);
262         in->fd = fd;
263         in->ctdb = ctdb;
264
265         ret = set_blocking(in->fd, false);
266         if (ret != 0) {
267                 DEBUG(DEBUG_ERR,
268                       (__location__
269                        " failed to set socket non-blocking (%s)\n",
270                        strerror(errno)));
271                 close(in->fd);
272                 in->fd = -1;
273                 return;
274         }
275
276         set_close_on_exec(in->fd);
277
278         DEBUG(DEBUG_DEBUG, (__location__ " Created SOCKET FD:%d to incoming ctdb connection\n", fd));
279
280         if (setsockopt(in->fd,SOL_SOCKET,SO_KEEPALIVE,(char *)&one,sizeof(one)) == -1) {
281                 DEBUG(DEBUG_WARNING, ("Failed to set KEEPALIVE on fd - %s\n",
282                                       strerror(errno)));
283         }
284
285         in->queue = ctdb_queue_setup(ctdb, in, in->fd, CTDB_TCP_ALIGNMENT,
286                                      ctdb_tcp_read_cb, in, "ctdbd-%s", ctdb_addr_to_str(&addr));
287 }
288
289
290 /*
291   automatically find which address to listen on
292 */
293 static int ctdb_tcp_listen_automatic(struct ctdb_context *ctdb)
294 {
295         struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private_data,
296                                                 struct ctdb_tcp);
297         ctdb_sock_addr sock;
298         int lock_fd, i;
299         const char *lock_path = CTDB_RUNDIR "/.socket_lock";
300         struct flock lock;
301         int one = 1;
302         int sock_size;
303         struct tevent_fd *fde;
304
305         /* If there are no nodes, then it won't be possible to find
306          * the first one.  Log a failure and short circuit the whole
307          * process.
308          */
309         if (ctdb->num_nodes == 0) {
310                 DEBUG(DEBUG_CRIT,("No nodes available to attempt bind to - is the nodes file empty?\n"));
311                 return -1;
312         }
313
314         /* in order to ensure that we don't get two nodes with the
315            same adddress, we must make the bind() and listen() calls
316            atomic. The SO_REUSEADDR setsockopt only prevents double
317            binds if the first socket is in LISTEN state  */
318         lock_fd = open(lock_path, O_RDWR|O_CREAT, 0666);
319         if (lock_fd == -1) {
320                 DEBUG(DEBUG_CRIT,("Unable to open %s\n", lock_path));
321                 return -1;
322         }
323
324         lock.l_type = F_WRLCK;
325         lock.l_whence = SEEK_SET;
326         lock.l_start = 0;
327         lock.l_len = 1;
328         lock.l_pid = 0;
329
330         if (fcntl(lock_fd, F_SETLKW, &lock) != 0) {
331                 DEBUG(DEBUG_CRIT,("Unable to lock %s\n", lock_path));
332                 close(lock_fd);
333                 return -1;
334         }
335
336         for (i=0; i < ctdb->num_nodes; i++) {
337                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
338                         continue;
339                 }
340                 sock = ctdb->nodes[i]->address;
341
342                 switch (sock.sa.sa_family) {
343                 case AF_INET:
344                         sock_size = sizeof(sock.ip);
345                         break;
346                 case AF_INET6:
347                         sock_size = sizeof(sock.ip6);
348                         break;
349                 default:
350                         DEBUG(DEBUG_ERR, (__location__ " unknown family %u\n",
351                                 sock.sa.sa_family));
352                         continue;
353                 }
354
355                 ctcp->listen_fd = socket(sock.sa.sa_family, SOCK_STREAM, IPPROTO_TCP);
356                 if (ctcp->listen_fd == -1) {
357                         ctdb_set_error(ctdb, "socket failed\n");
358                         continue;
359                 }
360
361                 set_close_on_exec(ctcp->listen_fd);
362
363                 if (setsockopt(ctcp->listen_fd,SOL_SOCKET,SO_REUSEADDR,
364                                (char *)&one,sizeof(one)) == -1) {
365                         DEBUG(DEBUG_WARNING, ("Failed to set REUSEADDR on fd - %s\n",
366                                               strerror(errno)));
367                 }
368
369                 if (bind(ctcp->listen_fd, (struct sockaddr * )&sock, sock_size) == 0) {
370                         break;
371                 }
372
373                 if (errno == EADDRNOTAVAIL) {
374                         DEBUG(DEBUG_DEBUG,(__location__ " Failed to bind() to socket. %s(%d)\n",
375                                         strerror(errno), errno));
376                 } else {
377                         DEBUG(DEBUG_ERR,(__location__ " Failed to bind() to socket. %s(%d)\n",
378                                         strerror(errno), errno));
379                 }
380
381                 close(ctcp->listen_fd);
382                 ctcp->listen_fd = -1;
383         }
384
385         if (i == ctdb->num_nodes) {
386                 DEBUG(DEBUG_CRIT,("Unable to bind to any of the node addresses - giving up\n"));
387                 goto failed;
388         }
389         ctdb->address = talloc_memdup(ctdb,
390                                       &ctdb->nodes[i]->address,
391                                       sizeof(ctdb_sock_addr));
392         if (ctdb->address == NULL) {
393                 ctdb_set_error(ctdb, "Out of memory at %s:%d",
394                                __FILE__, __LINE__);
395                 goto failed;
396         }
397
398         ctdb->name = talloc_asprintf(ctdb, "%s:%u",
399                                      ctdb_addr_to_str(ctdb->address),
400                                      ctdb_addr_to_port(ctdb->address));
401         if (ctdb->name == NULL) {
402                 ctdb_set_error(ctdb, "Out of memory at %s:%d",
403                                __FILE__, __LINE__);
404                 goto failed;
405         }
406         DEBUG(DEBUG_INFO,("ctdb chose network address %s\n", ctdb->name));
407
408         if (listen(ctcp->listen_fd, 10) == -1) {
409                 goto failed;
410         }
411
412         fde = tevent_add_fd(ctdb->ev, ctcp, ctcp->listen_fd, TEVENT_FD_READ,
413                             ctdb_listen_event, ctdb);
414         tevent_fd_set_auto_close(fde);
415
416         close(lock_fd);
417
418         return 0;
419
420 failed:
421         close(lock_fd);
422         if (ctcp->listen_fd != -1) {
423                 close(ctcp->listen_fd);
424                 ctcp->listen_fd = -1;
425         }
426         return -1;
427 }
428
429
430 /*
431   listen on our own address
432 */
433 int ctdb_tcp_listen(struct ctdb_context *ctdb)
434 {
435         struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private_data,
436                                                 struct ctdb_tcp);
437         ctdb_sock_addr sock;
438         int sock_size;
439         int one = 1;
440         struct tevent_fd *fde;
441
442         /* we can either auto-bind to the first available address, or we can
443            use a specified address */
444         if (!ctdb->address) {
445                 return ctdb_tcp_listen_automatic(ctdb);
446         }
447
448         sock = *ctdb->address;
449
450         switch (sock.sa.sa_family) {
451         case AF_INET:
452                 sock_size = sizeof(sock.ip);
453                 break;
454         case AF_INET6:
455                 sock_size = sizeof(sock.ip6);
456                 break;
457         default:
458                 DEBUG(DEBUG_ERR, (__location__ " unknown family %u\n",
459                         sock.sa.sa_family));
460                 goto failed;
461         }
462
463         ctcp->listen_fd = socket(sock.sa.sa_family, SOCK_STREAM, IPPROTO_TCP);
464         if (ctcp->listen_fd == -1) {
465                 ctdb_set_error(ctdb, "socket failed\n");
466                 return -1;
467         }
468
469         set_close_on_exec(ctcp->listen_fd);
470
471         if (setsockopt(ctcp->listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&one,sizeof(one)) == -1) {
472                 DEBUG(DEBUG_WARNING, ("Failed to set REUSEADDR on fd - %s\n",
473                                       strerror(errno)));
474         }
475
476         if (bind(ctcp->listen_fd, (struct sockaddr * )&sock, sock_size) != 0) {
477                 DEBUG(DEBUG_ERR,(__location__ " Failed to bind() to socket. %s(%d)\n", strerror(errno), errno));
478                 goto failed;
479         }
480
481         if (listen(ctcp->listen_fd, 10) == -1) {
482                 goto failed;
483         }
484
485         fde = tevent_add_fd(ctdb->ev, ctcp, ctcp->listen_fd, TEVENT_FD_READ,
486                             ctdb_listen_event, ctdb);
487         tevent_fd_set_auto_close(fde);
488
489         return 0;
490
491 failed:
492         if (ctcp->listen_fd != -1) {
493                 close(ctcp->listen_fd);
494         }
495         ctcp->listen_fd = -1;
496         return -1;
497 }
498