5935bf446655f8182e455f4a10459a11a05e8633
[amitay/samba.git] / ctdb / tcp / tcp_connect.c
1 /* 
2    ctdb over TCP
3
4    Copyright (C) Andrew Tridgell  2006
5    Copyright (C) Ronnie Sahlberg  2008
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "../include/ctdb_private.h"
26 #include "common/system.h"
27 #include "ctdb_tcp.h"
28
29 /*
30   stop any connecting (established or pending) to a node
31  */
32 void ctdb_tcp_stop_connection(struct ctdb_node *node)
33 {
34         struct ctdb_tcp_node *tnode = talloc_get_type(
35                 node->private_data, struct ctdb_tcp_node);
36         
37         ctdb_queue_set_fd(tnode->out_queue, -1);
38         talloc_free(tnode->connect_te);
39         talloc_free(tnode->connect_fde);
40         tnode->connect_fde = NULL;
41         tnode->connect_te = NULL;
42         if (tnode->fd != -1) {
43                 close(tnode->fd);
44                 tnode->fd = -1;
45         }
46 }
47
48
49 /*
50   called when a complete packet has come in - should not happen on this socket
51   unless the other side closes the connection with RST or FIN
52  */
53 void ctdb_tcp_tnode_cb(uint8_t *data, size_t cnt, void *private_data)
54 {
55         struct ctdb_node *node = talloc_get_type(private_data, struct ctdb_node);
56         struct ctdb_tcp_node *tnode = talloc_get_type(
57                 node->private_data, struct ctdb_tcp_node);
58
59         if (data == NULL) {
60                 node->ctdb->upcalls->node_dead(node);
61         }
62
63         ctdb_tcp_stop_connection(node);
64         tnode->connect_te = event_add_timed(node->ctdb->ev, tnode,
65                                             timeval_current_ofs(3, 0),
66                                             ctdb_tcp_node_connect, node);
67 }
68
69 /*
70   called when socket becomes writeable on connect
71 */
72 static void ctdb_node_connect_write(struct event_context *ev, struct fd_event *fde, 
73                                     uint16_t flags, void *private_data)
74 {
75         struct ctdb_node *node = talloc_get_type(private_data,
76                                                  struct ctdb_node);
77         struct ctdb_tcp_node *tnode = talloc_get_type(node->private_data,
78                                                       struct ctdb_tcp_node);
79         struct ctdb_context *ctdb = node->ctdb;
80         int error = 0;
81         socklen_t len = sizeof(error);
82         int one = 1;
83
84         talloc_free(tnode->connect_te);
85         tnode->connect_te = NULL;
86
87         if (getsockopt(tnode->fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0 ||
88             error != 0) {
89                 ctdb_tcp_stop_connection(node);
90                 tnode->connect_te = event_add_timed(ctdb->ev, tnode, 
91                                                     timeval_current_ofs(1, 0),
92                                                     ctdb_tcp_node_connect, node);
93                 return;
94         }
95
96         talloc_free(tnode->connect_fde);
97         tnode->connect_fde = NULL;
98
99         if (setsockopt(tnode->fd,IPPROTO_TCP,TCP_NODELAY,(char *)&one,sizeof(one)) == -1) {
100                 DEBUG(DEBUG_WARNING, ("Failed to set TCP_NODELAY on fd - %s\n",
101                                       strerror(errno)));
102         }
103         if (setsockopt(tnode->fd,SOL_SOCKET,SO_KEEPALIVE,(char *)&one,sizeof(one)) == -1) {
104                 DEBUG(DEBUG_WARNING, ("Failed to set KEEPALIVE on fd - %s\n",
105                                       strerror(errno)));
106         }
107
108         ctdb_queue_set_fd(tnode->out_queue, tnode->fd);
109
110         /* the queue subsystem now owns this fd */
111         tnode->fd = -1;
112 }
113
114
115 /*
116   called when we should try and establish a tcp connection to a node
117 */
118 void ctdb_tcp_node_connect(struct event_context *ev, struct timed_event *te, 
119                            struct timeval t, void *private_data)
120 {
121         struct ctdb_node *node = talloc_get_type(private_data,
122                                                  struct ctdb_node);
123         struct ctdb_tcp_node *tnode = talloc_get_type(node->private_data, 
124                                                       struct ctdb_tcp_node);
125         struct ctdb_context *ctdb = node->ctdb;
126         ctdb_sock_addr sock_in;
127         int sockin_size;
128         int sockout_size;
129         ctdb_sock_addr sock_out;
130
131         ctdb_tcp_stop_connection(node);
132
133         sock_out = node->address;
134
135         tnode->fd = socket(sock_out.sa.sa_family, SOCK_STREAM, IPPROTO_TCP);
136         if (tnode->fd == -1) {
137                 DEBUG(DEBUG_ERR, (__location__ "Failed to create socket\n"));
138                 return;
139         }
140         set_nonblocking(tnode->fd);
141         set_close_on_exec(tnode->fd);
142
143         DEBUG(DEBUG_DEBUG, (__location__ " Created TCP SOCKET FD:%d\n", tnode->fd));
144
145         /* Bind our side of the socketpair to the same address we use to listen
146          * on incoming CTDB traffic.
147          * We must specify this address to make sure that the address we expose to
148          * the remote side is actually routable in case CTDB traffic will run on
149          * a dedicated non-routeable network.
150          */
151         sock_in = *ctdb->address;
152
153         /* AIX libs check to see if the socket address and length
154            arguments are consistent with each other on calls like
155            connect().   Can not get by with just sizeof(sock_in),
156            need sizeof(sock_in.ip).
157         */
158         switch (sock_in.sa.sa_family) {
159         case AF_INET:
160                 sock_in.ip.sin_port = 0 /* Any port */;
161                 sockin_size = sizeof(sock_in.ip);
162                 sockout_size = sizeof(sock_out.ip);
163                 break;
164         case AF_INET6:
165                 sock_in.ip6.sin6_port = 0 /* Any port */;
166                 sockin_size = sizeof(sock_in.ip6);
167                 sockout_size = sizeof(sock_out.ip6);
168                 break;
169         default:
170                 DEBUG(DEBUG_ERR, (__location__ " unknown family %u\n",
171                         sock_in.sa.sa_family));
172                 close(tnode->fd);
173                 return;
174         }
175
176         if (bind(tnode->fd, (struct sockaddr *)&sock_in, sockin_size) == -1) {
177                 DEBUG(DEBUG_ERR, (__location__ "Failed to bind socket %s(%d)\n",
178                                   strerror(errno), errno));
179                 close(tnode->fd);
180                 return;
181         }
182
183         if (connect(tnode->fd, (struct sockaddr *)&sock_out, sockout_size) != 0 &&
184             errno != EINPROGRESS) {
185                 ctdb_tcp_stop_connection(node);
186                 tnode->connect_te = event_add_timed(ctdb->ev, tnode, 
187                                                     timeval_current_ofs(1, 0),
188                                                     ctdb_tcp_node_connect, node);
189                 return;
190         }
191
192         /* non-blocking connect - wait for write event */
193         tnode->connect_fde = event_add_fd(node->ctdb->ev, tnode, tnode->fd, 
194                                           EVENT_FD_WRITE|EVENT_FD_READ, 
195                                           ctdb_node_connect_write, node);
196
197         /* don't give it long to connect - retry in one second. This ensures
198            that we find a node is up quickly (tcp normally backs off a syn reply
199            delay by quite a lot) */
200         tnode->connect_te = event_add_timed(ctdb->ev, tnode, timeval_current_ofs(1, 0), 
201                                             ctdb_tcp_node_connect, node);
202 }
203
204 /*
205   called when we get contacted by another node
206   currently makes no attempt to check if the connection is really from a ctdb
207   node in our cluster
208 */
209 static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde, 
210                               uint16_t flags, void *private_data)
211 {
212         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
213         struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private_data, struct ctdb_tcp);
214         ctdb_sock_addr addr;
215         socklen_t len;
216         int fd, nodeid;
217         struct ctdb_incoming *in;
218         int one = 1;
219
220         memset(&addr, 0, sizeof(addr));
221         len = sizeof(addr);
222         fd = accept(ctcp->listen_fd, (struct sockaddr *)&addr, &len);
223         if (fd == -1) return;
224
225         nodeid = ctdb_ip_to_nodeid(ctdb, &addr);
226
227         if (nodeid == -1) {
228                 DEBUG(DEBUG_ERR, ("Refused connection from unknown node %s\n", ctdb_addr_to_str(&addr)));
229                 close(fd);
230                 return;
231         }
232
233         in = talloc_zero(ctcp, struct ctdb_incoming);
234         in->fd = fd;
235         in->ctdb = ctdb;
236
237         set_nonblocking(in->fd);
238         set_close_on_exec(in->fd);
239
240         DEBUG(DEBUG_DEBUG, (__location__ " Created SOCKET FD:%d to incoming ctdb connection\n", fd));
241
242         if (setsockopt(in->fd,SOL_SOCKET,SO_KEEPALIVE,(char *)&one,sizeof(one)) == -1) {
243                 DEBUG(DEBUG_WARNING, ("Failed to set KEEPALIVE on fd - %s\n",
244                                       strerror(errno)));
245         }
246
247         in->queue = ctdb_queue_setup(ctdb, in, in->fd, CTDB_TCP_ALIGNMENT,
248                                      ctdb_tcp_read_cb, in, "ctdbd-%s", ctdb_addr_to_str(&addr));
249 }
250
251
252 /*
253   automatically find which address to listen on
254 */
255 static int ctdb_tcp_listen_automatic(struct ctdb_context *ctdb)
256 {
257         struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private_data,
258                                                 struct ctdb_tcp);
259         ctdb_sock_addr sock;
260         int lock_fd, i;
261         const char *lock_path = CTDB_RUNDIR "/.socket_lock";
262         struct flock lock;
263         int one = 1;
264         int sock_size;
265         struct tevent_fd *fde;
266
267         /* If there are no nodes, then it won't be possible to find
268          * the first one.  Log a failure and short circuit the whole
269          * process.
270          */
271         if (ctdb->num_nodes == 0) {
272                 DEBUG(DEBUG_CRIT,("No nodes available to attempt bind to - is the nodes file empty?\n"));
273                 return -1;
274         }
275
276         /* in order to ensure that we don't get two nodes with the
277            same adddress, we must make the bind() and listen() calls
278            atomic. The SO_REUSEADDR setsockopt only prevents double
279            binds if the first socket is in LISTEN state  */
280         lock_fd = open(lock_path, O_RDWR|O_CREAT, 0666);
281         if (lock_fd == -1) {
282                 DEBUG(DEBUG_CRIT,("Unable to open %s\n", lock_path));
283                 return -1;
284         }
285
286         lock.l_type = F_WRLCK;
287         lock.l_whence = SEEK_SET;
288         lock.l_start = 0;
289         lock.l_len = 1;
290         lock.l_pid = 0;
291
292         if (fcntl(lock_fd, F_SETLKW, &lock) != 0) {
293                 DEBUG(DEBUG_CRIT,("Unable to lock %s\n", lock_path));
294                 close(lock_fd);
295                 return -1;
296         }
297
298         for (i=0; i < ctdb->num_nodes; i++) {
299                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
300                         continue;
301                 }
302                 sock = ctdb->nodes[i]->address;
303
304                 switch (sock.sa.sa_family) {
305                 case AF_INET:
306                         sock_size = sizeof(sock.ip);
307                         break;
308                 case AF_INET6:
309                         sock_size = sizeof(sock.ip6);
310                         break;
311                 default:
312                         DEBUG(DEBUG_ERR, (__location__ " unknown family %u\n",
313                                 sock.sa.sa_family));
314                         continue;
315                 }
316
317                 ctcp->listen_fd = socket(sock.sa.sa_family, SOCK_STREAM, IPPROTO_TCP);
318                 if (ctcp->listen_fd == -1) {
319                         ctdb_set_error(ctdb, "socket failed\n");
320                         continue;
321                 }
322
323                 set_close_on_exec(ctcp->listen_fd);
324
325                 if (setsockopt(ctcp->listen_fd,SOL_SOCKET,SO_REUSEADDR,
326                                (char *)&one,sizeof(one)) == -1) {
327                         DEBUG(DEBUG_WARNING, ("Failed to set REUSEADDR on fd - %s\n",
328                                               strerror(errno)));
329                 }
330
331                 if (bind(ctcp->listen_fd, (struct sockaddr * )&sock, sock_size) == 0) {
332                         break;
333                 }
334
335                 if (errno == EADDRNOTAVAIL) {
336                         DEBUG(DEBUG_DEBUG,(__location__ " Failed to bind() to socket. %s(%d)\n",
337                                         strerror(errno), errno));
338                 } else {
339                         DEBUG(DEBUG_ERR,(__location__ " Failed to bind() to socket. %s(%d)\n",
340                                         strerror(errno), errno));
341                 }
342
343                 close(ctcp->listen_fd);
344                 ctcp->listen_fd = -1;
345         }
346
347         if (i == ctdb->num_nodes) {
348                 DEBUG(DEBUG_CRIT,("Unable to bind to any of the node addresses - giving up\n"));
349                 goto failed;
350         }
351         ctdb->address = talloc_memdup(ctdb,
352                                       &ctdb->nodes[i]->address,
353                                       sizeof(ctdb_sock_addr));
354         if (ctdb->address == NULL) {
355                 ctdb_set_error(ctdb, "Out of memory at %s:%d",
356                                __FILE__, __LINE__);
357                 goto failed;
358         }
359
360         ctdb->name = talloc_asprintf(ctdb, "%s:%u",
361                                      ctdb_addr_to_str(ctdb->address),
362                                      ctdb_addr_to_port(ctdb->address));
363         if (ctdb->name == NULL) {
364                 ctdb_set_error(ctdb, "Out of memory at %s:%d",
365                                __FILE__, __LINE__);
366                 goto failed;
367         }
368         DEBUG(DEBUG_INFO,("ctdb chose network address %s\n", ctdb->name));
369
370         if (listen(ctcp->listen_fd, 10) == -1) {
371                 goto failed;
372         }
373
374         fde = event_add_fd(ctdb->ev, ctcp, ctcp->listen_fd, EVENT_FD_READ,
375                            ctdb_listen_event, ctdb);
376         tevent_fd_set_auto_close(fde);
377
378         close(lock_fd);
379
380         return 0;
381
382 failed:
383         close(lock_fd);
384         if (ctcp->listen_fd != -1) {
385                 close(ctcp->listen_fd);
386                 ctcp->listen_fd = -1;
387         }
388         return -1;
389 }
390
391
392 /*
393   listen on our own address
394 */
395 int ctdb_tcp_listen(struct ctdb_context *ctdb)
396 {
397         struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private_data,
398                                                 struct ctdb_tcp);
399         ctdb_sock_addr sock;
400         int sock_size;
401         int one = 1;
402         struct tevent_fd *fde;
403
404         /* we can either auto-bind to the first available address, or we can
405            use a specified address */
406         if (!ctdb->address) {
407                 return ctdb_tcp_listen_automatic(ctdb);
408         }
409
410         sock = *ctdb->address;
411
412         switch (sock.sa.sa_family) {
413         case AF_INET:
414                 sock_size = sizeof(sock.ip);
415                 break;
416         case AF_INET6:
417                 sock_size = sizeof(sock.ip6);
418                 break;
419         default:
420                 DEBUG(DEBUG_ERR, (__location__ " unknown family %u\n",
421                         sock.sa.sa_family));
422                 goto failed;
423         }
424
425         ctcp->listen_fd = socket(sock.sa.sa_family, SOCK_STREAM, IPPROTO_TCP);
426         if (ctcp->listen_fd == -1) {
427                 ctdb_set_error(ctdb, "socket failed\n");
428                 return -1;
429         }
430
431         set_close_on_exec(ctcp->listen_fd);
432
433         if (setsockopt(ctcp->listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&one,sizeof(one)) == -1) {
434                 DEBUG(DEBUG_WARNING, ("Failed to set REUSEADDR on fd - %s\n",
435                                       strerror(errno)));
436         }
437
438         if (bind(ctcp->listen_fd, (struct sockaddr * )&sock, sock_size) != 0) {
439                 DEBUG(DEBUG_ERR,(__location__ " Failed to bind() to socket. %s(%d)\n", strerror(errno), errno));
440                 goto failed;
441         }
442
443         if (listen(ctcp->listen_fd, 10) == -1) {
444                 goto failed;
445         }
446
447         fde = event_add_fd(ctdb->ev, ctcp, ctcp->listen_fd, EVENT_FD_READ,
448                      ctdb_listen_event, ctdb);  
449         tevent_fd_set_auto_close(fde);
450
451         return 0;
452
453 failed:
454         if (ctcp->listen_fd != -1) {
455                 close(ctcp->listen_fd);
456         }
457         ctcp->listen_fd = -1;
458         return -1;
459 }
460