ctdb-tools: Rework killtcp logic into a tevent_req-based computation
authorMartin Schwenke <martin@meltin.net>
Fri, 30 Jun 2017 09:50:43 +0000 (19:50 +1000)
committerMartin Schwenke <martins@samba.org>
Tue, 19 Sep 2017 11:30:27 +0000 (13:30 +0200)
Signed-off-by: Martin Schwenke <martin@meltin.net>
Reviewed-by: Amitay Isaacs <amitay@gmail.com>
ctdb/tools/ctdb_killtcp.c

index e37a373bb1f2abf42c607636a2654d64bf5b8b90..1917652ee66165a96407c4c9768fce4ad99b7084 100644 (file)
@@ -24,6 +24,7 @@
 #include "system/network.h"
 
 #include "lib/util/debug.h"
+#include "lib/util/tevent_unix.h"
 
 #include "protocol/protocol.h"
 #include "protocol/protocol_util.h"
 #include "common/logging.h"
 
 
-/* Contains the listening socket and the list of TCP connections to
- * kill */
-struct ctdb_kill_tcp {
+struct reset_connections_state {
+       struct tevent_context *ev;
        int capture_fd;
        struct tevent_fd *fde;
        struct db_hash_context *connections;
        void *private_data;
-       void *destructor_data;
        unsigned int attempts;
        unsigned int max_attempts;
        struct timeval retry_interval;
@@ -49,32 +48,41 @@ struct ctdb_kill_tcp {
 };
 
 
-static void capture_tcp_handler(struct tevent_context *ev,
-                               struct tevent_fd *fde,
-                               uint16_t flags, void *private_data);
+static void reset_connections_capture_tcp_handler(struct tevent_context *ev,
+                                                 struct tevent_fd *fde,
+                                                 uint16_t flags,
+                                                 void *private_data);
+static void reset_connections_batch(struct tevent_req *subreq);
+static int reset_connections_tickle_connection(
+                                       uint8_t *keybuf, size_t keylen,
+                                       uint8_t *databuf, size_t datalen,
+                                       void *private_data);
 
-static int ctdb_kill_tcp_init(TALLOC_CTX *mem_ctx,
+static struct tevent_req *reset_connections_send(
+                             TALLOC_CTX *mem_ctx,
                              struct tevent_context *ev,
                              const char *iface,
-                             struct ctdb_connection_list *conn_list,
-                             struct ctdb_kill_tcp **out)
+                             struct ctdb_connection_list *conn_list)
 {
-       struct ctdb_kill_tcp *state;
+       struct tevent_req *req, *subreq;
+       struct reset_connections_state *state;
        int i, ret;
 
-       state = talloc_zero(mem_ctx, struct ctdb_kill_tcp);
-       if (state == NULL) {
-               D_ERR("Out of memory\n");
-               return ENOMEM;
+       req = tevent_req_create(mem_ctx, &state,
+                               struct reset_connections_state);
+       if (req == NULL) {
+               return NULL;
        }
 
+       state->ev = ev;
+
        ret = db_hash_init(state, "connections", 2048, DB_HASH_SIMPLE,
                           &state->connections);
        if (ret != 0) {
                D_ERR("Failed to initialise connection hash (%s)\n",
                      strerror(ret));
-               talloc_free(state);
-               return ret;
+               tevent_req_error(req, ret);
+               return tevent_req_post(req, ev);
        }
 
        for (i = 0; i < conn_list->num; i++) {
@@ -87,8 +95,8 @@ static int ctdb_kill_tcp_init(TALLOC_CTX *mem_ctx,
                if (ret != 0) {
                        D_ERR("Error adding connection to hash (%s)\n",
                              strerror(ret));
-                       talloc_free(state);
-                       return ret;
+                       tevent_req_error(req, ret);
+                       return tevent_req_post(req, ev);
                }
        }
 
@@ -106,32 +114,38 @@ static int ctdb_kill_tcp_init(TALLOC_CTX *mem_ctx,
        if (state->capture_fd == -1) {
                D_ERR("Failed to open capture socket on iface '%s' (%s)\n",
                      iface, strerror(errno));
-               talloc_free(state);
-               return EIO;
+                       tevent_req_error(req, EIO);
+                       return tevent_req_post(req, ev);
        }
 
        state->fde = tevent_add_fd(ev, state, state->capture_fd,
-                                  TEVENT_FD_READ, capture_tcp_handler,
+                                  TEVENT_FD_READ,
+                                  reset_connections_capture_tcp_handler,
                                   state);
-       if (state->fde == NULL) {
-               D_ERR("Out of memory\n");
-               talloc_free(state);
-               return ENOMEM;
+       if (tevent_req_nomem(state->fde, req)) {
+               return tevent_req_post(req, ev);
        }
        tevent_fd_set_auto_close(state->fde);
 
-       *out = state;
-       return 0;
+       subreq = tevent_wakeup_send(state, ev, tevent_timeval_current_ofs(0,0));
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, reset_connections_batch, req);
+
+       return req;
 }
 
 /*
   called when we get a read event on the raw socket
  */
-static void capture_tcp_handler(struct tevent_context *ev,
-                               struct tevent_fd *fde,
-                               uint16_t flags, void *private_data)
+static void reset_connections_capture_tcp_handler(struct tevent_context *ev,
+                                                 struct tevent_fd *fde,
+                                                 uint16_t flags,
+                                                 void *private_data)
 {
-       struct ctdb_kill_tcp *killtcp = talloc_get_type(private_data, struct ctdb_kill_tcp);
+       struct reset_connections_state *state = talloc_get_type_abort(
+               private_data, struct reset_connections_state);
        /* 0 the parts that don't get set by ctdb_sys_read_tcp_packet */
        struct ctdb_connection conn;
        uint32_t ack_seq, seq;
@@ -139,8 +153,8 @@ static void capture_tcp_handler(struct tevent_context *ev,
        uint16_t window;
        int ret;
 
-       ret = ctdb_sys_read_tcp_packet(killtcp->capture_fd,
-                                      killtcp->private_data,
+       ret = ctdb_sys_read_tcp_packet(state->capture_fd,
+                                      state->private_data,
                                       &conn.server, &conn.client,
                                       &ack_seq, &seq, &rst, &window);
        if (ret != 0) {
@@ -153,13 +167,13 @@ static void capture_tcp_handler(struct tevent_context *ev,
                D_DEBUG("Ignoring packet: %s, "
                        "seq=%"PRIu32", ack_seq=%"PRIu32", "
                        "rst=%d, window=%"PRIu16"\n",
-                       ctdb_connection_to_string(killtcp, &conn, false),
+                       ctdb_connection_to_string(state, &conn, false),
                        seq, ack_seq, rst, ntohs(window));
                return;
        }
 
        /* Check if this connection is one being reset, if found then delete */
-       ret = db_hash_delete(killtcp->connections,
+       ret = db_hash_delete(state->connections,
                             (uint8_t*)&conn, sizeof(conn));
        if (ret == ENOENT) {
                /* Packet for some other connection, ignore */
@@ -170,8 +184,8 @@ static void capture_tcp_handler(struct tevent_context *ev,
                return;
        }
 
-       D_INFO("Sending a TCP RST to kill connection %s\n",
-              ctdb_connection_to_string(killtcp, &conn, true));
+       D_INFO("Sending a TCP RST to for connection %s\n",
+              ctdb_connection_to_string(state, &conn, true));
 
        ret = ctdb_sys_send_tcp(&conn.server, &conn.client, ack_seq, seq, 1);
        if (ret != 0) {
@@ -179,65 +193,44 @@ static void capture_tcp_handler(struct tevent_context *ev,
        }
 }
 
-
-static int tickle_connection_parser(uint8_t *keybuf, size_t keylen,
-                                   uint8_t *databuf, size_t datalen,
-                                   void *private_data)
-{
-       struct ctdb_kill_tcp *killtcp = talloc_get_type_abort(
-               private_data, struct ctdb_kill_tcp);
-       struct ctdb_connection *conn;
-       int ret;
-
-       if (keylen != sizeof(*conn)) {
-               DBG_WARNING("Unexpected data in connection hash\n");
-               return 0;
-       }
-
-       conn = (struct ctdb_connection *)keybuf;
-
-       killtcp->batch_count++;
-       if (killtcp->batch_count > killtcp->batch_size) {
-               /* Terminate the traverse */
-               return 1;
-       }
-
-       ret = ctdb_sys_send_tcp(&conn->server, &conn->client, 0, 0, 0);
-       if (ret != 0) {
-               DBG_ERR("Error sending tickle ACK\n");
-               /* continue */
-       }
-
-       return 0;
-}
-
 /*
  * Called periodically until all sentenced connections have been reset
  * or enough attempts have been made
  */
-static void ctdb_tickle_sentenced_connections(struct tevent_context *ev,
-                                             struct tevent_timer *te,
-                                             struct timeval t, void *private_data)
+static void reset_connections_batch(struct tevent_req *subreq)
 {
-       struct ctdb_kill_tcp *killtcp = talloc_get_type(private_data, struct ctdb_kill_tcp);
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct reset_connections_state *state = tevent_req_data(
+               req, struct reset_connections_state);
+       bool status;
        int count, ret;
 
+       status = tevent_wakeup_recv(subreq);
+       TALLOC_FREE(subreq);
+
+       if (! status) {
+               DBG_WARNING("Unexpected error on timer expiry\n");
+               /* Keep going... */
+       }
+
        /* loop over up to batch_size connections sending tickle ACKs */
-       killtcp->batch_count = 0;
-       ret = db_hash_traverse(killtcp->connections,
-                              tickle_connection_parser, killtcp, NULL);
+       state->batch_count = 0;
+       ret = db_hash_traverse(state->connections,
+                              reset_connections_tickle_connection,
+                              state, NULL);
        if (ret != 0) {
                DBG_WARNING("Unexpected error traversing connections (%s)\n",
                            strerror(ret));
        }
 
-       killtcp->attempts++;
+       state->attempts++;
 
        /*
         * If there are no more connections to kill or we have tried
-        * too many times we can remove the entire killtcp structure
+        * too many times we're finished
         */
-       ret = db_hash_traverse(killtcp->connections, NULL, NULL, &count);
+       ret = db_hash_traverse(state->connections, NULL, NULL, &count);
        if (ret != 0) {
                /* What now?  Try again until max_attempts reached */
                DBG_WARNING("Unexpected error traversing connections (%s)\n",
@@ -245,28 +238,68 @@ static void ctdb_tickle_sentenced_connections(struct tevent_context *ev,
                count = 1;
        }
        if (count == 0 ||
-           killtcp->attempts >= killtcp->max_attempts) {
-               talloc_free(killtcp);
+           state->attempts >= state->max_attempts) {
+               tevent_req_done(req);
                return;
        }
 
-       /* try tickling them again in a seconds time
-        */
-       tevent_add_timer(ev, killtcp,
-                        tevent_timeval_current_ofs(
-                                killtcp->retry_interval.tv_sec,
-                                killtcp->retry_interval.tv_usec),
-                        ctdb_tickle_sentenced_connections, killtcp);
+       /* Schedule next attempt */
+       subreq = tevent_wakeup_send(state, state->ev,
+                                   tevent_timeval_current_ofs(
+                                           state->retry_interval.tv_sec,
+                                           state->retry_interval.tv_usec));
+       if (tevent_req_nomem(subreq, req)) {
+               return;
+       }
+       tevent_req_set_callback(subreq, reset_connections_batch, req);
 }
 
-static int ctdb_killtcp_destructor(struct ctdb_kill_tcp *killtcp)
+static int reset_connections_tickle_connection(
+                                       uint8_t *keybuf, size_t keylen,
+                                       uint8_t *databuf, size_t datalen,
+                                       void *private_data)
 {
-       bool *done = killtcp->destructor_data;
-       *done = true;
+       struct reset_connections_state *state = talloc_get_type_abort(
+               private_data, struct reset_connections_state);
+       struct ctdb_connection *conn;
+       int ret;
+
+       if (keylen != sizeof(*conn)) {
+               DBG_WARNING("Unexpected data in connection hash\n");
+               return 0;
+       }
+
+       conn = (struct ctdb_connection *)keybuf;
+
+       state->batch_count++;
+       if (state->batch_count > state->batch_size) {
+               /* Terminate the traverse */
+               return 1;
+       }
+
+       ret = ctdb_sys_send_tcp(&conn->server, &conn->client, 0, 0, 0);
+       if (ret != 0) {
+               DBG_ERR("Error sending tickle ACK\n");
+               /* continue */
+       }
 
        return 0;
 }
 
+static bool reset_connections_recv(struct tevent_req *req, int *perr)
+{
+       int err;
+
+       if (tevent_req_is_unix_error(req, &err)) {
+               if (perr != NULL) {
+                       *perr = err;
+               }
+               return false;
+       }
+
+       return true;
+}
+
 static void usage(const char *prog)
 {
        printf("usage: %s <interface> [ <srcip:port> <dstip:port> ]\n", prog);
@@ -276,13 +309,13 @@ static void usage(const char *prog)
 int main(int argc, char **argv)
 {
        struct ctdb_connection conn;
-       struct ctdb_kill_tcp *killtcp = NULL;
        struct tevent_context *ev = NULL;
        struct TALLOC_CONTEXT *mem_ctx = NULL;
        struct ctdb_connection_list *conn_list = NULL;
        const char *t;
+       struct tevent_req *req;
        int debug_level;
-       bool done;
+       bool status;
        int ret;
 
        /* Set the debug level */
@@ -339,7 +372,7 @@ int main(int argc, char **argv)
                goto fail;
        }
 
-        ev = tevent_context_init(mem_ctx);
+       ev = tevent_context_init(mem_ctx);
        if (ev == NULL) {
                DEBUG(DEBUG_ERR, ("Failed to initialise tevent\n"));
                goto fail;
@@ -351,22 +384,17 @@ int main(int argc, char **argv)
                return 0;
        }
 
-       ret = ctdb_kill_tcp_init(mem_ctx, ev, argv[1], conn_list, &killtcp);
-       if (ret != 0) {
+       req = reset_connections_send(mem_ctx, ev, argv[1], conn_list);
+       if (req == NULL) {
                goto fail;
        }
 
-       done = false;
-       killtcp->destructor_data = &done;
-       talloc_set_destructor(killtcp, ctdb_killtcp_destructor);
+       tevent_req_poll(req, ev);
 
-       /* Do the initial processing of connections */
-       tevent_add_timer(ev, killtcp,
-                        tevent_timeval_current_ofs(0, 0),
-                        ctdb_tickle_sentenced_connections, killtcp);
-
-       while (!done) {
-               tevent_loop_once(ev);
+       status = reset_connections_recv(req, &ret);
+       if (! status) {
+               D_ERR("Failed to kill connections (%s)\n", strerror(ret));
+               goto fail;
        }
 
        talloc_free(mem_ctx);