ctdb over TCP
Copyright (C) Andrew Tridgell 2006
+ Copyright (C) Ronnie Sahlberg 2008
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
+ the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
#include "includes.h"
-#include "lib/events/events.h"
+#include "lib/tevent/tevent.h"
#include "lib/tdb/include/tdb.h"
#include "system/network.h"
#include "system/filesys.h"
#include "../include/ctdb_private.h"
#include "ctdb_tcp.h"
+/*
+ stop any connecting (established or pending) to a node
+ */
+void ctdb_tcp_stop_connection(struct ctdb_node *node)
+{
+ struct ctdb_tcp_node *tnode = talloc_get_type(
+ node->private_data, struct ctdb_tcp_node);
+
+ ctdb_queue_set_fd(tnode->out_queue, -1);
+ talloc_free(tnode->connect_te);
+ talloc_free(tnode->connect_fde);
+ tnode->connect_fde = NULL;
+ tnode->connect_te = NULL;
+ if (tnode->fd != -1) {
+ close(tnode->fd);
+ tnode->fd = -1;
+ }
+}
+
+
/*
called when a complete packet has come in - should not happen on this socket
+ unless the other side closes the connection with RST or FIN
*/
void ctdb_tcp_tnode_cb(uint8_t *data, size_t cnt, void *private_data)
{
node->ctdb->upcalls->node_dead(node);
}
- /* start a new connect cycle to try to re-establish the
- link */
- ctdb_queue_set_fd(tnode->queue, -1);
- tnode->fd = -1;
- event_add_timed(node->ctdb->ev, tnode, timeval_zero(),
- ctdb_tcp_node_connect, node);
+ ctdb_tcp_stop_connection(node);
+ tnode->connect_te = event_add_timed(node->ctdb->ev, tnode,
+ timeval_current_ofs(3, 0),
+ ctdb_tcp_node_connect, node);
}
/*
if (getsockopt(tnode->fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0 ||
error != 0) {
- talloc_free(fde);
- close(tnode->fd);
- tnode->fd = -1;
- event_add_timed(ctdb->ev, tnode, timeval_current_ofs(1, 0),
- ctdb_tcp_node_connect, node);
+ ctdb_tcp_stop_connection(node);
+ tnode->connect_te = event_add_timed(ctdb->ev, tnode,
+ timeval_current_ofs(1, 0),
+ ctdb_tcp_node_connect, node);
return;
}
- talloc_free(fde);
-
+ talloc_free(tnode->connect_fde);
+ tnode->connect_fde = NULL;
+
setsockopt(tnode->fd,IPPROTO_TCP,TCP_NODELAY,(char *)&one,sizeof(one));
setsockopt(tnode->fd,SOL_SOCKET,SO_KEEPALIVE,(char *)&one,sizeof(one));
- ctdb_queue_set_fd(tnode->queue, tnode->fd);
+ ctdb_queue_set_fd(tnode->out_queue, tnode->fd);
- /* tell the ctdb layer we are connected */
- node->ctdb->upcalls->node_connected(node);
+ /* the queue subsystem now owns this fd */
+ tnode->fd = -1;
}
static int ctdb_tcp_get_address(struct ctdb_context *ctdb,
- const char *address, struct in_addr *addr)
+ const char *address, ctdb_sock_addr *addr)
{
- if (inet_pton(AF_INET, address, addr) <= 0) {
- struct hostent *he = gethostbyname(address);
- if (he == NULL || he->h_length > sizeof(*addr)) {
- ctdb_set_error(ctdb, "invalid nework address '%s'\n",
- address);
- return -1;
- }
- memcpy(addr, he->h_addr, he->h_length);
+ if (parse_ip(address, NULL, 0, addr) == 0) {
+ DEBUG(DEBUG_CRIT, (__location__ " Unparsable address : %s.\n", address));
+ return -1;
}
return 0;
}
struct ctdb_tcp_node *tnode = talloc_get_type(node->private_data,
struct ctdb_tcp_node);
struct ctdb_context *ctdb = node->ctdb;
- struct sockaddr_in sock_in;
- struct sockaddr_in sock_out;
+ ctdb_sock_addr sock_in;
+ int sockin_size;
+ int sockout_size;
+ ctdb_sock_addr sock_out;
- if (tnode->fd != -1) {
- talloc_free(tnode->connect_fde);
- tnode->connect_fde = NULL;
- close(tnode->fd);
- tnode->fd = -1;
- }
-
- tnode->fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
-
- set_nonblocking(tnode->fd);
- set_close_on_exec(tnode->fd);
+ ctdb_tcp_stop_connection(node);
ZERO_STRUCT(sock_out);
#ifdef HAVE_SOCK_SIN_LEN
- sock_out.sin_len = sizeof(sock_out);
+ sock_out.ip.sin_len = sizeof(sock_out);
#endif
- if (ctdb_tcp_get_address(ctdb, node->address.address, &sock_out.sin_addr) != 0) {
+ if (ctdb_tcp_get_address(ctdb, node->address.address, &sock_out) != 0) {
+ return;
+ }
+ switch (sock_out.sa.sa_family) {
+ case AF_INET:
+ sock_out.ip.sin_port = htons(node->address.port);
+ break;
+ case AF_INET6:
+ sock_out.ip6.sin6_port = htons(node->address.port);
+ break;
+ default:
+ DEBUG(DEBUG_ERR, (__location__ " unknown family %u\n",
+ sock_out.sa.sa_family));
return;
}
- sock_out.sin_port = htons(node->address.port);
- sock_out.sin_family = PF_INET;
+ tnode->fd = socket(sock_out.sa.sa_family, SOCK_STREAM, IPPROTO_TCP);
+ set_nonblocking(tnode->fd);
+ set_close_on_exec(tnode->fd);
+
+ DEBUG(DEBUG_DEBUG, (__location__ " Created TCP SOCKET FD:%d\n", tnode->fd));
/* Bind our side of the socketpair to the same address we use to listen
* on incoming CTDB traffic.
* a dedicated non-routeable network.
*/
ZERO_STRUCT(sock_in);
-#ifdef HAVE_SOCK_SIN_LEN
- sock_in.sin_len = sizeof(sock_in);
-#endif
- if (ctdb_tcp_get_address(ctdb, ctdb->address.address, &sock_in.sin_addr) != 0) {
+ if (ctdb_tcp_get_address(ctdb, ctdb->address.address, &sock_in) != 0) {
+ DEBUG(DEBUG_ERR, (__location__ " Failed to find our address. Failing bind.\n"));
+ close(tnode->fd);
return;
}
- sock_in.sin_port = htons(0); /* INPORT_ANY is not always available */
- sock_in.sin_family = PF_INET;
- bind(tnode->fd, (struct sockaddr *)&sock_in, sizeof(sock_in));
- if (connect(tnode->fd, (struct sockaddr *)&sock_out, sizeof(sock_out)) != 0 &&
- errno != EINPROGRESS) {
- /* try again once a second */
+ /* AIX libs check to see if the socket address and length
+ arguments are consistent with each other on calls like
+ connect(). Can not get by with just sizeof(sock_in),
+ need sizeof(sock_in.ip).
+ */
+ switch (sock_in.sa.sa_family) {
+ case AF_INET:
+ sockin_size = sizeof(sock_in.ip);
+ sockout_size = sizeof(sock_out.ip);
+ break;
+ case AF_INET6:
+ sockin_size = sizeof(sock_in.ip6);
+ sockout_size = sizeof(sock_out.ip6);
+ break;
+ default:
+ DEBUG(DEBUG_ERR, (__location__ " unknown family %u\n",
+ sock_in.sa.sa_family));
close(tnode->fd);
- tnode->fd = -1;
- event_add_timed(ctdb->ev, tnode, timeval_current_ofs(1, 0),
- ctdb_tcp_node_connect, node);
+ return;
+ }
+#ifdef HAVE_SOCK_SIN_LEN
+ sock_in.ip.sin_len = sockin_size;
+ sock_out.ip.sin_len = sockout_size;
+#endif
+ bind(tnode->fd, (struct sockaddr *)&sock_in, sockin_size);
+
+ if (connect(tnode->fd, (struct sockaddr *)&sock_out, sockout_size) != 0 &&
+ errno != EINPROGRESS) {
+ ctdb_tcp_stop_connection(node);
+ tnode->connect_te = event_add_timed(ctdb->ev, tnode,
+ timeval_current_ofs(1, 0),
+ ctdb_tcp_node_connect, node);
return;
}
{
struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private_data, struct ctdb_tcp);
- struct sockaddr_in addr;
+ ctdb_sock_addr addr;
socklen_t len;
- int fd;
+ int fd, nodeid;
struct ctdb_incoming *in;
int one = 1;
+ const char *incoming_node;
memset(&addr, 0, sizeof(addr));
len = sizeof(addr);
fd = accept(ctcp->listen_fd, (struct sockaddr *)&addr, &len);
if (fd == -1) return;
+ incoming_node = ctdb_addr_to_str(&addr);
+ nodeid = ctdb_ip_to_nodeid(ctdb, incoming_node);
+
+ if (nodeid == -1) {
+ DEBUG(DEBUG_ERR, ("Refused connection from unknown node %s\n", incoming_node));
+ close(fd);
+ return;
+ }
+
in = talloc_zero(ctcp, struct ctdb_incoming);
in->fd = fd;
in->ctdb = ctdb;
set_nonblocking(in->fd);
set_close_on_exec(in->fd);
+ DEBUG(DEBUG_DEBUG, (__location__ " Created SOCKET FD:%d to incoming ctdb connection\n", fd));
+
setsockopt(in->fd,SOL_SOCKET,SO_KEEPALIVE,(char *)&one,sizeof(one));
in->queue = ctdb_queue_setup(ctdb, in, in->fd, CTDB_TCP_ALIGNMENT,
- ctdb_tcp_read_cb, in);
+ ctdb_tcp_read_cb, in, "ctdbd-%s", incoming_node);
}
{
struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private_data,
struct ctdb_tcp);
- struct sockaddr_in sock;
+ ctdb_sock_addr sock;
int lock_fd, i;
const char *lock_path = "/tmp/.ctdb_socket_lock";
struct flock lock;
+ int one = 1;
+ int sock_size;
+ struct tevent_fd *fde;
/* in order to ensure that we don't get two nodes with the
same adddress, we must make the bind() and listen() calls
binds if the first socket is in LISTEN state */
lock_fd = open(lock_path, O_RDWR|O_CREAT, 0666);
if (lock_fd == -1) {
- DEBUG(0,("Unable to open %s\n", lock_path));
+ DEBUG(DEBUG_CRIT,("Unable to open %s\n", lock_path));
return -1;
}
lock.l_pid = 0;
if (fcntl(lock_fd, F_SETLKW, &lock) != 0) {
- DEBUG(0,("Unable to lock %s\n", lock_path));
+ DEBUG(DEBUG_CRIT,("Unable to lock %s\n", lock_path));
close(lock_fd);
return -1;
}
- for (i=0;i<ctdb->num_nodes;i++) {
+ for (i=0; i < ctdb->num_nodes; i++) {
+ if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
+ continue;
+ }
+
+ /* if node_ip is specified we will only try to bind to that
+ ip.
+ */
+ if (ctdb->node_ip != NULL) {
+ if (strcmp(ctdb->node_ip, ctdb->nodes[i]->address.address)) {
+ continue;
+ }
+ }
+
ZERO_STRUCT(sock);
+ if (ctdb_tcp_get_address(ctdb,
+ ctdb->nodes[i]->address.address,
+ &sock) != 0) {
+ continue;
+ }
+
+ switch (sock.sa.sa_family) {
+ case AF_INET:
+ sock.ip.sin_port = htons(ctdb->nodes[i]->address.port);
+ sock_size = sizeof(sock.ip);
+ break;
+ case AF_INET6:
+ sock.ip6.sin6_port = htons(ctdb->nodes[i]->address.port);
+ sock_size = sizeof(sock.ip6);
+ break;
+ default:
+ DEBUG(DEBUG_ERR, (__location__ " unknown family %u\n",
+ sock.sa.sa_family));
+ continue;
+ }
#ifdef HAVE_SOCK_SIN_LEN
- sock.sin_len = sizeof(sock);
+ sock.ip.sin_len = sock_size;
#endif
- sock.sin_port = htons(ctdb->nodes[i]->address.port);
- sock.sin_family = PF_INET;
- if (ctdb_tcp_get_address(ctdb, ctdb->nodes[i]->address.address,
- &sock.sin_addr) != 0) {
+
+ ctcp->listen_fd = socket(sock.sa.sa_family, SOCK_STREAM, IPPROTO_TCP);
+ if (ctcp->listen_fd == -1) {
+ ctdb_set_error(ctdb, "socket failed\n");
continue;
}
-
- if (bind(ctcp->listen_fd, (struct sockaddr * )&sock,
- sizeof(sock)) == 0) {
+
+ set_close_on_exec(ctcp->listen_fd);
+
+ setsockopt(ctcp->listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&one,sizeof(one));
+
+ if (bind(ctcp->listen_fd, (struct sockaddr * )&sock, sock_size) == 0) {
break;
}
+
+ if (errno == EADDRNOTAVAIL) {
+ DEBUG(DEBUG_DEBUG,(__location__ " Failed to bind() to socket. %s(%d)\n",
+ strerror(errno), errno));
+ } else {
+ DEBUG(DEBUG_ERR,(__location__ " Failed to bind() to socket. %s(%d)\n",
+ strerror(errno), errno));
+ }
}
if (i == ctdb->num_nodes) {
- DEBUG(0,("Unable to bind to any of the node addresses - giving up\n"));
+ DEBUG(DEBUG_CRIT,("Unable to bind to any of the node addresses - giving up\n"));
goto failed;
}
- ctdb->address = ctdb->nodes[i]->address;
+ ctdb->address.address = talloc_strdup(ctdb, ctdb->nodes[i]->address.address);
+ ctdb->address.port = ctdb->nodes[i]->address.port;
ctdb->name = talloc_asprintf(ctdb, "%s:%u",
ctdb->address.address,
ctdb->address.port);
- ctdb->vnn = ctdb->nodes[i]->vnn;
+ ctdb->pnn = ctdb->nodes[i]->pnn;
ctdb->nodes[i]->flags &= ~NODE_FLAGS_DISCONNECTED;
- DEBUG(1,("ctdb chose network address %s:%u vnn %u\n",
+ DEBUG(DEBUG_INFO,("ctdb chose network address %s:%u pnn %u\n",
ctdb->address.address,
ctdb->address.port,
- ctdb->vnn));
-
+ ctdb->pnn));
+ /* do we start out in DISABLED mode? */
+ if (ctdb->start_as_disabled != 0) {
+ DEBUG(DEBUG_INFO, ("This node is configured to start in DISABLED state\n"));
+ ctdb->nodes[i]->flags |= NODE_FLAGS_DISABLED;
+ }
+ /* do we start out in STOPPED mode? */
+ if (ctdb->start_as_stopped != 0) {
+ DEBUG(DEBUG_INFO, ("This node is configured to start in STOPPED state\n"));
+ ctdb->nodes[i]->flags |= NODE_FLAGS_STOPPED;
+ }
+
if (listen(ctcp->listen_fd, 10) == -1) {
goto failed;
}
- event_add_fd(ctdb->ev, ctcp, ctcp->listen_fd, EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
- ctdb_listen_event, ctdb);
+ fde = event_add_fd(ctdb->ev, ctcp, ctcp->listen_fd, EVENT_FD_READ,
+ ctdb_listen_event, ctdb);
+ tevent_fd_set_auto_close(fde);
close(lock_fd);
return 0;
{
struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private_data,
struct ctdb_tcp);
- struct sockaddr_in sock;
+ ctdb_sock_addr sock;
+ int sock_size;
int one = 1;
-
- ctcp->listen_fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
- if (ctcp->listen_fd == -1) {
- ctdb_set_error(ctdb, "socket failed\n");
- return -1;
- }
-
- set_close_on_exec(ctcp->listen_fd);
-
- setsockopt(ctcp->listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&one,sizeof(one));
+ struct tevent_fd *fde;
/* we can either auto-bind to the first available address, or we can
use a specified address */
}
ZERO_STRUCT(sock);
-#ifdef HAVE_SOCK_SIN_LEN
- sock.sin_len = sizeof(sock);
-#endif
- sock.sin_port = htons(ctdb->address.port);
- sock.sin_family = PF_INET;
-
if (ctdb_tcp_get_address(ctdb, ctdb->address.address,
- &sock.sin_addr) != 0) {
+ &sock) != 0) {
goto failed;
}
- if (bind(ctcp->listen_fd, (struct sockaddr * )&sock, sizeof(sock)) != 0) {
+ switch (sock.sa.sa_family) {
+ case AF_INET:
+ sock.ip.sin_port = htons(ctdb->address.port);
+ sock_size = sizeof(sock.ip);
+ break;
+ case AF_INET6:
+ sock.ip6.sin6_port = htons(ctdb->address.port);
+ sock_size = sizeof(sock.ip6);
+ break;
+ default:
+ DEBUG(DEBUG_ERR, (__location__ " unknown family %u\n",
+ sock.sa.sa_family));
+ goto failed;
+ }
+#ifdef HAVE_SOCK_SIN_LEN
+ sock.ip.sin_len = sock_size;
+#endif
+
+ ctcp->listen_fd = socket(sock.sa.sa_family, SOCK_STREAM, IPPROTO_TCP);
+ if (ctcp->listen_fd == -1) {
+ ctdb_set_error(ctdb, "socket failed\n");
+ return -1;
+ }
+
+ set_close_on_exec(ctcp->listen_fd);
+
+ setsockopt(ctcp->listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&one,sizeof(one));
+
+ if (bind(ctcp->listen_fd, (struct sockaddr * )&sock, sock_size) != 0) {
+ DEBUG(DEBUG_ERR,(__location__ " Failed to bind() to socket. %s(%d)\n", strerror(errno), errno));
goto failed;
}
goto failed;
}
- event_add_fd(ctdb->ev, ctcp, ctcp->listen_fd, EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
+ fde = event_add_fd(ctdb->ev, ctcp, ctcp->listen_fd, EVENT_FD_READ,
ctdb_listen_event, ctdb);
+ tevent_fd_set_auto_close(fde);
return 0;
failed:
- close(ctcp->listen_fd);
+ if (ctcp->listen_fd != -1) {
+ close(ctcp->listen_fd);
+ }
ctcp->listen_fd = -1;
return -1;
}