We default to non-deterministic IPs now, where IPs are "sticky" and don't change
[sahlberg/ctdb.git] / tcp / tcp_connect.c
index 79717845f51470afcda609fc1a343b46de044386..d7a0b3368773ff05d7dc5f1ed8b8c4dff5cd087b 100644 (file)
@@ -2,10 +2,11 @@
    ctdb over TCP
 
    Copyright (C) Andrew Tridgell  2006
+   Copyright (C) Ronnie Sahlberg  2008
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
+   the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.
    
    This program is distributed in the hope that it will be useful,
    GNU General Public License for more details.
    
    You should have received a copy of the GNU General Public License
-   along with this program; if not, write to the Free Software
-   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+   along with this program; if not, see <http://www.gnu.org/licenses/>.
 */
 
 #include "includes.h"
-#include "lib/events/events.h"
+#include "lib/tevent/tevent.h"
 #include "lib/tdb/include/tdb.h"
 #include "system/network.h"
 #include "system/filesys.h"
 #include "../include/ctdb_private.h"
 #include "ctdb_tcp.h"
 
+/*
+  stop any connection (established or pending) to a node
+
+  Detaches the outgoing queue from its fd (sets it to -1), frees the
+  pending connect timer and fd events, and closes tnode->fd if one is
+  still open.  Afterwards connect_te/connect_fde are NULL and
+  tnode->fd is -1, so callers can schedule a fresh connect attempt.
+ */
+void ctdb_tcp_stop_connection(struct ctdb_node *node)
+{
+       struct ctdb_tcp_node *tnode = talloc_get_type(
+               node->private_data, struct ctdb_tcp_node);
+       
+       ctdb_queue_set_fd(tnode->out_queue, -1); /* presumably drops the queue's established fd - confirm in ctdb_queue_set_fd */
+       talloc_free(tnode->connect_te);
+       talloc_free(tnode->connect_fde);
+       tnode->connect_fde = NULL; /* clear the freed pointers so a later call does not free them again */
+       tnode->connect_te = NULL;
+       if (tnode->fd != -1) { /* socket not yet handed over to the queue */
+               close(tnode->fd);
+               tnode->fd = -1;
+       }
+}
+
+
 /*
   called when a complete packet has come in - should not happen on this socket
+  unless the other side closes the connection with RST or FIN
  */
 void ctdb_tcp_tnode_cb(uint8_t *data, size_t cnt, void *private_data)
 {
@@ -39,12 +60,10 @@ void ctdb_tcp_tnode_cb(uint8_t *data, size_t cnt, void *private_data)
                node->ctdb->upcalls->node_dead(node);
        }
 
-       /* start a new connect cycle to try to re-establish the
-          link */
-       ctdb_queue_set_fd(tnode->queue, -1);
-       tnode->fd = -1;
-       event_add_timed(node->ctdb->ev, tnode, timeval_zero(), 
-                       ctdb_tcp_node_connect, node);
+       ctdb_tcp_stop_connection(node);
+       tnode->connect_te = event_add_timed(node->ctdb->ev, tnode,
+                                           timeval_current_ofs(3, 0),
+                                           ctdb_tcp_node_connect, node);
 }
 
 /*
@@ -67,37 +86,32 @@ static void ctdb_node_connect_write(struct event_context *ev, struct fd_event *f
 
        if (getsockopt(tnode->fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0 ||
            error != 0) {
-               talloc_free(fde);
-               close(tnode->fd);
-               tnode->fd = -1;
-               event_add_timed(ctdb->ev, tnode, timeval_current_ofs(1, 0), 
-                               ctdb_tcp_node_connect, node);
+               ctdb_tcp_stop_connection(node);
+               tnode->connect_te = event_add_timed(ctdb->ev, tnode, 
+                                                   timeval_current_ofs(1, 0),
+                                                   ctdb_tcp_node_connect, node);
                return;
        }
 
-       talloc_free(fde);
-       
+       talloc_free(tnode->connect_fde);
+       tnode->connect_fde = NULL;
+
         setsockopt(tnode->fd,IPPROTO_TCP,TCP_NODELAY,(char *)&one,sizeof(one));
         setsockopt(tnode->fd,SOL_SOCKET,SO_KEEPALIVE,(char *)&one,sizeof(one));
 
-       ctdb_queue_set_fd(tnode->queue, tnode->fd);
+       ctdb_queue_set_fd(tnode->out_queue, tnode->fd);
 
-       /* tell the ctdb layer we are connected */
-       node->ctdb->upcalls->node_connected(node);
+       /* the queue subsystem now owns this fd */
+       tnode->fd = -1;
 }
 
 
 static int ctdb_tcp_get_address(struct ctdb_context *ctdb,
-                               const char *address, struct in_addr *addr)
+                               const char *address, ctdb_sock_addr *addr)
 {
-       if (inet_pton(AF_INET, address, addr) <= 0) {
-               struct hostent *he = gethostbyname(address);
-               if (he == NULL || he->h_length > sizeof(*addr)) {
-                       ctdb_set_error(ctdb, "invalid nework address '%s'\n", 
-                                      address);
-                       return -1;
-               }
-               memcpy(addr, he->h_addr, he->h_length);
+       if (parse_ip(address, NULL, 0, addr) == 0) {
+               DEBUG(DEBUG_CRIT, (__location__ " Unparsable address : %s.\n", address));
+               return -1;
        }
        return 0;
 }
@@ -113,31 +127,38 @@ void ctdb_tcp_node_connect(struct event_context *ev, struct timed_event *te,
        struct ctdb_tcp_node *tnode = talloc_get_type(node->private_data, 
                                                      struct ctdb_tcp_node);
        struct ctdb_context *ctdb = node->ctdb;
-        struct sockaddr_in sock_in;
-        struct sockaddr_in sock_out;
+        ctdb_sock_addr sock_in;
+       int sockin_size;
+       int sockout_size;
+        ctdb_sock_addr sock_out;
 
-       if (tnode->fd != -1) {
-               talloc_free(tnode->connect_fde);
-               tnode->connect_fde = NULL;
-               close(tnode->fd);
-               tnode->fd = -1;
-       }
-
-       tnode->fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
-
-       set_nonblocking(tnode->fd);
-       set_close_on_exec(tnode->fd);
+       ctdb_tcp_stop_connection(node);
 
        ZERO_STRUCT(sock_out);
 #ifdef HAVE_SOCK_SIN_LEN
-       sock_out.sin_len = sizeof(sock_out);
+       sock_out.ip.sin_len = sizeof(sock_out);
 #endif
-       if (ctdb_tcp_get_address(ctdb, node->address.address, &sock_out.sin_addr) != 0) {
+       if (ctdb_tcp_get_address(ctdb, node->address.address, &sock_out) != 0) {
+               return;
+       }
+       switch (sock_out.sa.sa_family) {
+       case AF_INET:
+               sock_out.ip.sin_port = htons(node->address.port);
+               break;
+       case AF_INET6:
+               sock_out.ip6.sin6_port = htons(node->address.port);
+               break;
+       default:
+               DEBUG(DEBUG_ERR, (__location__ " unknown family %u\n",
+                       sock_out.sa.sa_family));
                return;
        }
-       sock_out.sin_port = htons(node->address.port);
-       sock_out.sin_family = PF_INET;
 
+       tnode->fd = socket(sock_out.sa.sa_family, SOCK_STREAM, IPPROTO_TCP);
+       set_nonblocking(tnode->fd);
+       set_close_on_exec(tnode->fd);
+
+       DEBUG(DEBUG_DEBUG, (__location__ " Created TCP SOCKET FD:%d\n", tnode->fd));
 
        /* Bind our side of the socketpair to the same address we use to listen
         * on incoming CTDB traffic.
@@ -146,23 +167,44 @@ void ctdb_tcp_node_connect(struct event_context *ev, struct timed_event *te,
         * a dedicated non-routeable network.
         */
        ZERO_STRUCT(sock_in);
-#ifdef HAVE_SOCK_SIN_LEN
-       sock_in.sin_len = sizeof(sock_in);
-#endif
-       if (ctdb_tcp_get_address(ctdb, ctdb->address.address, &sock_in.sin_addr) != 0) {
+       if (ctdb_tcp_get_address(ctdb, ctdb->address.address, &sock_in) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Failed to find our address. Failing bind.\n"));
+               close(tnode->fd);
                return;
        }
-       sock_in.sin_port = htons(0); /* INPORT_ANY is not always available */
-       sock_in.sin_family = PF_INET;
-       bind(tnode->fd, (struct sockaddr *)&sock_in, sizeof(sock_in));
 
-       if (connect(tnode->fd, (struct sockaddr *)&sock_out, sizeof(sock_out)) != 0 &&
-           errno != EINPROGRESS) {
-               /* try again once a second */
+       /* AIX libs check to see if the socket address and length
+          arguments are consistent with each other on calls like
+          connect().   Can not get by with just sizeof(sock_in),
+          need sizeof(sock_in.ip).
+       */
+       switch (sock_in.sa.sa_family) {
+       case AF_INET:
+               sockin_size = sizeof(sock_in.ip);
+               sockout_size = sizeof(sock_out.ip);
+               break;
+       case AF_INET6:
+               sockin_size = sizeof(sock_in.ip6);
+               sockout_size = sizeof(sock_out.ip6);
+               break;
+       default:
+               DEBUG(DEBUG_ERR, (__location__ " unknown family %u\n",
+                       sock_in.sa.sa_family));
                close(tnode->fd);
-               tnode->fd = -1;
-               event_add_timed(ctdb->ev, tnode, timeval_current_ofs(1, 0), 
-                               ctdb_tcp_node_connect, node);
+               return;
+       }
+#ifdef HAVE_SOCK_SIN_LEN
+       sock_in.ip.sin_len = sockin_size;
+       sock_out.ip.sin_len = sockout_size;
+#endif
+       bind(tnode->fd, (struct sockaddr *)&sock_in, sockin_size);
+
+       if (connect(tnode->fd, (struct sockaddr *)&sock_out, sockout_size) != 0 &&
+           errno != EINPROGRESS) {
+               ctdb_tcp_stop_connection(node);
+               tnode->connect_te = event_add_timed(ctdb->ev, tnode, 
+                                                   timeval_current_ofs(1, 0),
+                                                   ctdb_tcp_node_connect, node);
                return;
        }
 
@@ -188,17 +230,27 @@ static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde,
 {
        struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
        struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private_data, struct ctdb_tcp);
-       struct sockaddr_in addr;
+       ctdb_sock_addr addr;
        socklen_t len;
-       int fd;
+       int fd, nodeid;
        struct ctdb_incoming *in;
        int one = 1;
+       const char *incoming_node;
 
        memset(&addr, 0, sizeof(addr));
        len = sizeof(addr);
        fd = accept(ctcp->listen_fd, (struct sockaddr *)&addr, &len);
        if (fd == -1) return;
 
+       incoming_node = ctdb_addr_to_str(&addr);
+       nodeid = ctdb_ip_to_nodeid(ctdb, incoming_node);
+
+       if (nodeid == -1) {
+               DEBUG(DEBUG_ERR, ("Refused connection from unknown node %s\n", incoming_node));
+               close(fd);
+               return;
+       }
+
        in = talloc_zero(ctcp, struct ctdb_incoming);
        in->fd = fd;
        in->ctdb = ctdb;
@@ -206,10 +258,12 @@ static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde,
        set_nonblocking(in->fd);
        set_close_on_exec(in->fd);
 
+       DEBUG(DEBUG_DEBUG, (__location__ " Created SOCKET FD:%d to incoming ctdb connection\n", fd));
+
         setsockopt(in->fd,SOL_SOCKET,SO_KEEPALIVE,(char *)&one,sizeof(one));
 
        in->queue = ctdb_queue_setup(ctdb, in, in->fd, CTDB_TCP_ALIGNMENT, 
-                                    ctdb_tcp_read_cb, in);
+                                    ctdb_tcp_read_cb, in, "ctdbd-%s", incoming_node);
 }
 
 
@@ -220,10 +274,13 @@ static int ctdb_tcp_listen_automatic(struct ctdb_context *ctdb)
 {
        struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private_data,
                                                struct ctdb_tcp);
-        struct sockaddr_in sock;
+        ctdb_sock_addr sock;
        int lock_fd, i;
        const char *lock_path = "/tmp/.ctdb_socket_lock";
        struct flock lock;
+       int one = 1;
+       int sock_size;
+       struct tevent_fd *fde;
 
        /* in order to ensure that we don't get two nodes with the
           same address, we must make the bind() and listen() calls
@@ -231,7 +288,7 @@ static int ctdb_tcp_listen_automatic(struct ctdb_context *ctdb)
           binds if the first socket is in LISTEN state  */
        lock_fd = open(lock_path, O_RDWR|O_CREAT, 0666);
        if (lock_fd == -1) {
-               DEBUG(0,("Unable to open %s\n", lock_path));
+               DEBUG(DEBUG_CRIT,("Unable to open %s\n", lock_path));
                return -1;
        }
 
@@ -242,50 +299,106 @@ static int ctdb_tcp_listen_automatic(struct ctdb_context *ctdb)
        lock.l_pid = 0;
 
        if (fcntl(lock_fd, F_SETLKW, &lock) != 0) {
-               DEBUG(0,("Unable to lock %s\n", lock_path));
+               DEBUG(DEBUG_CRIT,("Unable to lock %s\n", lock_path));
                close(lock_fd);
                return -1;
        }
 
-       for (i=0;i<ctdb->num_nodes;i++) {
+       for (i=0; i < ctdb->num_nodes; i++) {
+               if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
+                       continue;
+               }
+
+               /* if node_ip is specified we will only try to bind to that
+                  ip.
+               */
+               if (ctdb->node_ip != NULL) {
+                       if (strcmp(ctdb->node_ip, ctdb->nodes[i]->address.address)) {
+                               continue;
+                       }
+               }
+
                ZERO_STRUCT(sock);
+               if (ctdb_tcp_get_address(ctdb,
+                               ctdb->nodes[i]->address.address, 
+                               &sock) != 0) {
+                       continue;
+               }
+       
+               switch (sock.sa.sa_family) {
+               case AF_INET:
+                       sock.ip.sin_port = htons(ctdb->nodes[i]->address.port);
+                       sock_size = sizeof(sock.ip);
+                       break;
+               case AF_INET6:
+                       sock.ip6.sin6_port = htons(ctdb->nodes[i]->address.port);
+                       sock_size = sizeof(sock.ip6);
+                       break;
+               default:
+                       DEBUG(DEBUG_ERR, (__location__ " unknown family %u\n",
+                               sock.sa.sa_family));
+                       continue;
+               }
 #ifdef HAVE_SOCK_SIN_LEN
-               sock.sin_len = sizeof(sock);
+               sock.ip.sin_len = sock_size;
 #endif
-               sock.sin_port = htons(ctdb->nodes[i]->address.port);
-               sock.sin_family = PF_INET;
-               if (ctdb_tcp_get_address(ctdb, ctdb->nodes[i]->address.address, 
-                                        &sock.sin_addr) != 0) {
+
+               ctcp->listen_fd = socket(sock.sa.sa_family, SOCK_STREAM, IPPROTO_TCP);
+               if (ctcp->listen_fd == -1) {
+                       ctdb_set_error(ctdb, "socket failed\n");
                        continue;
                }
-               
-               if (bind(ctcp->listen_fd, (struct sockaddr * )&sock, 
-                        sizeof(sock)) == 0) {
+
+               set_close_on_exec(ctcp->listen_fd);
+
+               setsockopt(ctcp->listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&one,sizeof(one));
+
+               if (bind(ctcp->listen_fd, (struct sockaddr * )&sock, sock_size) == 0) {
                        break;
                }
+
+               if (errno == EADDRNOTAVAIL) {
+                       DEBUG(DEBUG_DEBUG,(__location__ " Failed to bind() to socket. %s(%d)\n",
+                                       strerror(errno), errno));
+               } else {
+                       DEBUG(DEBUG_ERR,(__location__ " Failed to bind() to socket. %s(%d)\n",
+                                       strerror(errno), errno));
+               }
        }
        
        if (i == ctdb->num_nodes) {
-               DEBUG(0,("Unable to bind to any of the node addresses - giving up\n"));
+               DEBUG(DEBUG_CRIT,("Unable to bind to any of the node addresses - giving up\n"));
                goto failed;
        }
-       ctdb->address = ctdb->nodes[i]->address;
+       ctdb->address.address = talloc_strdup(ctdb, ctdb->nodes[i]->address.address);
+       ctdb->address.port    = ctdb->nodes[i]->address.port;
        ctdb->name = talloc_asprintf(ctdb, "%s:%u", 
                                     ctdb->address.address, 
                                     ctdb->address.port);
-       ctdb->vnn = ctdb->nodes[i]->vnn;
+       ctdb->pnn = ctdb->nodes[i]->pnn;
        ctdb->nodes[i]->flags &= ~NODE_FLAGS_DISCONNECTED;
-       DEBUG(1,("ctdb chose network address %s:%u vnn %u\n", 
+       DEBUG(DEBUG_INFO,("ctdb chose network address %s:%u pnn %u\n", 
                 ctdb->address.address, 
                 ctdb->address.port, 
-                ctdb->vnn));
-
+                ctdb->pnn));
+       /* do we start out in DISABLED mode? */
+       if (ctdb->start_as_disabled != 0) {
+               DEBUG(DEBUG_INFO, ("This node is configured to start in DISABLED state\n"));
+               ctdb->nodes[i]->flags |= NODE_FLAGS_DISABLED;
+       }
+       /* do we start out in STOPPED mode? */
+       if (ctdb->start_as_stopped != 0) {
+               DEBUG(DEBUG_INFO, ("This node is configured to start in STOPPED state\n"));
+               ctdb->nodes[i]->flags |= NODE_FLAGS_STOPPED;
+       }
+       
        if (listen(ctcp->listen_fd, 10) == -1) {
                goto failed;
        }
 
-       event_add_fd(ctdb->ev, ctcp, ctcp->listen_fd, EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
-                    ctdb_listen_event, ctdb);  
+       fde = event_add_fd(ctdb->ev, ctcp, ctcp->listen_fd, EVENT_FD_READ,
+                          ctdb_listen_event, ctdb);
+       tevent_fd_set_auto_close(fde);
 
        close(lock_fd);
        return 0;
@@ -305,18 +418,10 @@ int ctdb_tcp_listen(struct ctdb_context *ctdb)
 {
        struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private_data,
                                                struct ctdb_tcp);
-        struct sockaddr_in sock;
+        ctdb_sock_addr sock;
+       int sock_size;
        int one = 1;
-
-       ctcp->listen_fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
-       if (ctcp->listen_fd == -1) {
-               ctdb_set_error(ctdb, "socket failed\n");
-               return -1;
-       }
-
-       set_close_on_exec(ctcp->listen_fd);
-
-        setsockopt(ctcp->listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&one,sizeof(one));
+       struct tevent_fd *fde;
 
        /* we can either auto-bind to the first available address, or we can
           use a specified address */
@@ -325,18 +430,41 @@ int ctdb_tcp_listen(struct ctdb_context *ctdb)
        }
 
        ZERO_STRUCT(sock);
-#ifdef HAVE_SOCK_SIN_LEN
-       sock.sin_len = sizeof(sock);
-#endif
-       sock.sin_port = htons(ctdb->address.port);
-       sock.sin_family = PF_INET;
-       
        if (ctdb_tcp_get_address(ctdb, ctdb->address.address, 
-                                &sock.sin_addr) != 0) {
+                                &sock) != 0) {
                goto failed;
        }
        
-       if (bind(ctcp->listen_fd, (struct sockaddr * )&sock, sizeof(sock)) != 0) {
+       switch (sock.sa.sa_family) {
+       case AF_INET:
+               sock.ip.sin_port = htons(ctdb->address.port);
+               sock_size = sizeof(sock.ip);
+               break;
+       case AF_INET6:
+               sock.ip6.sin6_port = htons(ctdb->address.port);
+               sock_size = sizeof(sock.ip6);
+               break;
+       default:
+               DEBUG(DEBUG_ERR, (__location__ " unknown family %u\n",
+                       sock.sa.sa_family));
+               goto failed;
+       }
+#ifdef HAVE_SOCK_SIN_LEN
+       sock.ip.sin_len = sock_size;
+#endif
+
+       ctcp->listen_fd = socket(sock.sa.sa_family, SOCK_STREAM, IPPROTO_TCP);
+       if (ctcp->listen_fd == -1) {
+               ctdb_set_error(ctdb, "socket failed\n");
+               return -1;
+       }
+
+       set_close_on_exec(ctcp->listen_fd);
+
+        setsockopt(ctcp->listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&one,sizeof(one));
+
+       if (bind(ctcp->listen_fd, (struct sockaddr * )&sock, sock_size) != 0) {
+               DEBUG(DEBUG_ERR,(__location__ " Failed to bind() to socket. %s(%d)\n", strerror(errno), errno));
                goto failed;
        }
 
@@ -344,13 +472,16 @@ int ctdb_tcp_listen(struct ctdb_context *ctdb)
                goto failed;
        }
 
-       event_add_fd(ctdb->ev, ctcp, ctcp->listen_fd, EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
+       fde = event_add_fd(ctdb->ev, ctcp, ctcp->listen_fd, EVENT_FD_READ,
                     ctdb_listen_event, ctdb);  
+       tevent_fd_set_auto_close(fde);
 
        return 0;
 
 failed:
-       close(ctcp->listen_fd);
+       if (ctcp->listen_fd != -1) {
+               close(ctcp->listen_fd);
+       }
        ctcp->listen_fd = -1;
        return -1;
 }