ctdb-recovery-helper: Introduce push database abstraction
authorAmitay Isaacs <amitay@gmail.com>
Thu, 25 Feb 2016 07:07:11 +0000 (18:07 +1100)
committerMartin Schwenke <martins@samba.org>
Fri, 25 Mar 2016 02:26:15 +0000 (03:26 +0100)
This abstraction uses capabilities of the remote nodes to either send
older PUSH_DB controls or newer DB_PUSH_START and DB_PUSH_CONFIRM
controls.

Signed-off-by: Amitay Isaacs <amitay@gmail.com>
Reviewed-by: Martin Schwenke <martin@meltin.net>
ctdb/server/ctdb_recovery_helper.c

index 0558bc76e70f94113491c3930df9850ceb05acf7..4be992a0e5c9153f1f18ccbfeac148b4db768a77 100644 (file)
@@ -426,6 +426,9 @@ static int recdb_file(struct recdb_context *recdb, TALLOC_CTX *mem_ctx,
        }
        state.num_buffers += 1;
 
+       LOG("Wrote %d buffers of recovery records for %s\n",
+           state.num_buffers, recdb_name(recdb));
+
        return state.num_buffers;
 }
 
@@ -688,6 +691,521 @@ static bool pull_database_recv(struct tevent_req *req, int *perr)
        return generic_recv(req, perr);
 }
 
+/*
+ * Push database to specified nodes (old style)
+ */
+
+struct push_database_old_state {
+       struct tevent_context *ev;
+       struct ctdb_client_context *client;
+       struct recdb_context *recdb;
+       uint32_t *pnn_list;
+       int count;
+       struct ctdb_rec_buffer *recbuf;
+       int index;
+};
+
+static void push_database_old_push_done(struct tevent_req *subreq);
+
+static struct tevent_req *push_database_old_send(
+                       TALLOC_CTX *mem_ctx,
+                       struct tevent_context *ev,
+                       struct ctdb_client_context *client,
+                       uint32_t *pnn_list, int count,
+                       struct recdb_context *recdb)
+{
+       struct tevent_req *req, *subreq;
+       struct push_database_old_state *state;
+       struct ctdb_req_control request;
+       uint32_t pnn;
+
+       req = tevent_req_create(mem_ctx, &state,
+                               struct push_database_old_state);
+       if (req == NULL) {
+               return NULL;
+       }
+
+       state->ev = ev;
+       state->client = client;
+       state->recdb = recdb;
+       state->pnn_list = pnn_list;
+       state->count = count;
+       state->index = 0;
+
+       state->recbuf = recdb_records(recdb, state,
+                                     ctdb_client_pnn(client));
+       if (tevent_req_nomem(state->recbuf, req)) {
+               return tevent_req_post(req, ev);
+       }
+
+       pnn = state->pnn_list[state->index];
+
+       ctdb_req_control_push_db(&request, state->recbuf);
+       subreq = ctdb_client_control_send(state, ev, client, pnn,
+                                         TIMEOUT(), &request);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, push_database_old_push_done, req);
+
+       return req;
+}
+
+static void push_database_old_push_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct push_database_old_state *state = tevent_req_data(
+               req, struct push_database_old_state);
+       struct ctdb_req_control request;
+       uint32_t pnn;
+       int ret;
+       bool status;
+
+       status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
+       TALLOC_FREE(subreq);
+       if (! status) {
+               LOG("control PUSH_DB failed for db %s on node %u, ret=%d\n",
+                   recdb_name(state->recdb), state->pnn_list[state->index],
+                   ret);
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       state->index += 1;
+       if (state->index == state->count) {
+               TALLOC_FREE(state->recbuf);
+               tevent_req_done(req);
+               return;
+       }
+
+       pnn = state->pnn_list[state->index];
+
+       ctdb_req_control_push_db(&request, state->recbuf);
+       subreq = ctdb_client_control_send(state, state->ev, state->client,
+                                         pnn, TIMEOUT(), &request);
+       if (tevent_req_nomem(subreq, req)) {
+               return;
+       }
+       tevent_req_set_callback(subreq, push_database_old_push_done, req);
+}
+
+static bool push_database_old_recv(struct tevent_req *req, int *perr)
+{
+       return generic_recv(req, perr);
+}
+
+/*
+ * Push database to specified nodes (new style)
+ */
+
+struct push_database_new_state {
+       struct tevent_context *ev;
+       struct ctdb_client_context *client;
+       struct recdb_context *recdb;
+       uint32_t *pnn_list;
+       int count;
+       uint64_t srvid;
+       uint32_t dmaster;
+       int fd;
+       int num_buffers;
+       int num_buffers_sent;
+       int num_records;
+};
+
+static void push_database_new_started(struct tevent_req *subreq);
+static void push_database_new_send_msg(struct tevent_req *req);
+static void push_database_new_send_done(struct tevent_req *subreq);
+static void push_database_new_confirmed(struct tevent_req *subreq);
+
+static struct tevent_req *push_database_new_send(
+                       TALLOC_CTX *mem_ctx,
+                       struct tevent_context *ev,
+                       struct ctdb_client_context *client,
+                       uint32_t *pnn_list, int count,
+                       struct recdb_context *recdb,
+                       int max_size)
+{
+       struct tevent_req *req, *subreq;
+       struct push_database_new_state *state;
+       struct ctdb_req_control request;
+       struct ctdb_pulldb_ext pulldb_ext;
+       char *filename;
+       off_t offset;
+
+       req = tevent_req_create(mem_ctx, &state,
+                               struct push_database_new_state);
+       if (req == NULL) {
+               return NULL;
+       }
+
+       state->ev = ev;
+       state->client = client;
+       state->recdb = recdb;
+       state->pnn_list = pnn_list;
+       state->count = count;
+
+       state->srvid = srvid_next();
+       state->dmaster = ctdb_client_pnn(client);
+       state->num_buffers_sent = 0;
+       state->num_records = 0;
+
+       filename = talloc_asprintf(state, "%s.dat", recdb_path(recdb));
+       if (tevent_req_nomem(filename, req)) {
+               return tevent_req_post(req, ev);
+       }
+
+       state->fd = open(filename, O_RDWR|O_CREAT, 0644);
+       if (state->fd == -1) {
+               tevent_req_error(req, errno);
+               return tevent_req_post(req, ev);
+       }
+       unlink(filename);
+       talloc_free(filename);
+
+       state->num_buffers = recdb_file(recdb, state, state->dmaster,
+                                       state->fd, max_size);
+       if (state->num_buffers == -1) {
+               tevent_req_error(req, ENOMEM);
+               return tevent_req_post(req, ev);
+       }
+
+       offset = lseek(state->fd, 0, SEEK_SET);
+       if (offset != 0) {
+               tevent_req_error(req, EIO);
+               return tevent_req_post(req, ev);
+       }
+
+       pulldb_ext.db_id = recdb_id(recdb);
+       pulldb_ext.srvid = state->srvid;
+
+       ctdb_req_control_db_push_start(&request, &pulldb_ext);
+       subreq = ctdb_client_control_multi_send(state, ev, client,
+                                               pnn_list, count,
+                                               TIMEOUT(), &request);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, push_database_new_started, req);
+
+       return req;
+}
+
+static void push_database_new_started(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct push_database_new_state *state = tevent_req_data(
+               req, struct push_database_new_state);
+       int *err_list;
+       int ret;
+       bool status;
+
+       status = ctdb_client_control_multi_recv(subreq, &ret, state,
+                                               &err_list, NULL);
+       TALLOC_FREE(subreq);
+       if (! status) {
+               int ret2;
+               uint32_t pnn;
+
+               ret2 = ctdb_client_control_multi_error(state->pnn_list,
+                                                      state->count,
+                                                      err_list, &pnn);
+               if (ret2 != 0) {
+                       LOG("control DB_PUSH_START failed for db %s "
+                           "on node %u, ret=%d\n",
+                           recdb_name(state->recdb), pnn, ret2);
+               } else {
+                       LOG("control DB_PUSH_START failed for db %s, ret=%d\n",
+                           recdb_name(state->recdb), ret);
+               }
+               talloc_free(err_list);
+
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       push_database_new_send_msg(req);
+}
+
+static void push_database_new_send_msg(struct tevent_req *req)
+{
+       struct push_database_new_state *state = tevent_req_data(
+               req, struct push_database_new_state);
+       struct tevent_req *subreq;
+       struct ctdb_rec_buffer *recbuf;
+       struct ctdb_req_message message;
+       TDB_DATA data;
+       int ret;
+
+       if (state->num_buffers_sent == state->num_buffers) {
+               struct ctdb_req_control request;
+
+               ctdb_req_control_db_push_confirm(&request,
+                                                recdb_id(state->recdb));
+               subreq = ctdb_client_control_multi_send(state, state->ev,
+                                                       state->client,
+                                                       state->pnn_list,
+                                                       state->count,
+                                                       TIMEOUT(), &request);
+               if (tevent_req_nomem(subreq, req)) {
+                       return;
+               }
+               tevent_req_set_callback(subreq, push_database_new_confirmed,
+                                       req);
+               return;
+       }
+
+       ret = ctdb_rec_buffer_read(state->fd, state, &recbuf);
+       if (ret != 0) {
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       data.dsize = ctdb_rec_buffer_len(recbuf);
+       data.dptr = talloc_size(state, data.dsize);
+       if (tevent_req_nomem(data.dptr, req)) {
+               return;
+       }
+
+       ctdb_rec_buffer_push(recbuf, data.dptr);
+
+       message.srvid = state->srvid;
+       message.data.data = data;
+
+       LOG("Pushing buffer %d with %d records for %s\n",
+           state->num_buffers_sent, recbuf->count, recdb_name(state->recdb));
+
+       subreq = ctdb_client_message_multi_send(state, state->ev,
+                                               state->client,
+                                               state->pnn_list, state->count,
+                                               &message);
+       if (tevent_req_nomem(subreq, req)) {
+               return;
+       }
+       tevent_req_set_callback(subreq, push_database_new_send_done, req);
+
+       state->num_records += recbuf->count;
+
+       talloc_free(data.dptr);
+       talloc_free(recbuf);
+}
+
+static void push_database_new_send_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct push_database_new_state *state = tevent_req_data(
+               req, struct push_database_new_state);
+       bool status;
+       int ret;
+
+       status = ctdb_client_message_multi_recv(subreq, &ret, NULL, NULL);
+       TALLOC_FREE(subreq);
+       if (! status) {
+               LOG("Sending recovery records failed for %s\n",
+                   recdb_name(state->recdb));
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       state->num_buffers_sent += 1;
+
+       push_database_new_send_msg(req);
+}
+
+static void push_database_new_confirmed(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct push_database_new_state *state = tevent_req_data(
+               req, struct push_database_new_state);
+       struct ctdb_reply_control **reply;
+       int *err_list;
+       bool status;
+       int ret, i;
+       uint32_t num_records;
+
+       status = ctdb_client_control_multi_recv(subreq, &ret, state,
+                                               &err_list, &reply);
+       TALLOC_FREE(subreq);
+       if (! status) {
+               int ret2;
+               uint32_t pnn;
+
+               ret2 = ctdb_client_control_multi_error(state->pnn_list,
+                                                      state->count, err_list,
+                                                      &pnn);
+               if (ret2 != 0) {
+                       LOG("control DB_PUSH_CONFIRM failed for %s on node %u,"
+                           " ret=%d\n", recdb_name(state->recdb), pnn, ret2);
+               } else {
+                       LOG("control DB_PUSH_CONFIRM failed for %s, ret=%d\n",
+                           recdb_name(state->recdb), ret);
+               }
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       for (i=0; i<state->count; i++) {
+               ret = ctdb_reply_control_db_push_confirm(reply[i],
+                                                        &num_records);
+               if (ret != 0) {
+                       tevent_req_error(req, EPROTO);
+                       return;
+               }
+
+               if (num_records != state->num_records) {
+                       LOG("Node %u received %d of %d records for %s\n",
+                           state->pnn_list[i], num_records,
+                           state->num_records, recdb_name(state->recdb));
+                       tevent_req_error(req, EPROTO);
+                       return;
+               }
+       }
+
+       talloc_free(reply);
+
+       LOG("Pushed %d records for db %s\n",
+           state->num_records, recdb_name(state->recdb));
+
+       tevent_req_done(req);
+}
+
+static bool push_database_new_recv(struct tevent_req *req, int *perr)
+{
+       return generic_recv(req, perr);
+}
+
+/*
+ * wrapper for push_database_old and push_database_new
+ */
+
+struct push_database_state {
+       bool old_done, new_done;
+};
+
+static void push_database_old_done(struct tevent_req *subreq);
+static void push_database_new_done(struct tevent_req *subreq);
+
+static struct tevent_req *push_database_send(
+                       TALLOC_CTX *mem_ctx,
+                       struct tevent_context *ev,
+                       struct ctdb_client_context *client,
+                       uint32_t *pnn_list, int count, uint32_t *caps,
+                       struct ctdb_tunable_list *tun_list,
+                       struct recdb_context *recdb)
+{
+       struct tevent_req *req, *subreq;
+       struct push_database_state *state;
+       uint32_t *old_list, *new_list;
+       int old_count, new_count;
+       int i;
+
+       req = tevent_req_create(mem_ctx, &state, struct push_database_state);
+       if (req == NULL) {
+               return NULL;
+       }
+
+       state->old_done = false;
+       state->new_done = false;
+
+       old_count = 0;
+       new_count = 0;
+       old_list = talloc_array(state, uint32_t, count);
+       new_list = talloc_array(state, uint32_t, count);
+       if (tevent_req_nomem(old_list, req) ||
+           tevent_req_nomem(new_list,req)) {
+               return tevent_req_post(req, ev);
+       }
+
+       for (i=0; i<count; i++) {
+               uint32_t pnn = pnn_list[i];
+
+               if (caps[pnn] & CTDB_CAP_FRAGMENTED_CONTROLS) {
+                       new_list[new_count] = pnn;
+                       new_count += 1;
+               } else {
+                       old_list[old_count] = pnn;
+                       old_count += 1;
+               }
+       }
+
+       if (old_count > 0) {
+               subreq = push_database_old_send(state, ev, client,
+                                               old_list, old_count, recdb);
+               if (tevent_req_nomem(subreq, req)) {
+                       return tevent_req_post(req, ev);
+               }
+               tevent_req_set_callback(subreq, push_database_old_done, req);
+       } else {
+               state->old_done = true;
+       }
+
+       if (new_count > 0) {
+               subreq = push_database_new_send(state, ev, client,
+                                               new_list, new_count, recdb,
+                                               tun_list->rec_buffer_size_limit);
+               if (tevent_req_nomem(subreq, req)) {
+                       return tevent_req_post(req, ev);
+               }
+               tevent_req_set_callback(subreq, push_database_new_done, req);
+       } else {
+               state->new_done = true;
+       }
+
+       return req;
+}
+
+static void push_database_old_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct push_database_state *state = tevent_req_data(
+               req, struct push_database_state);
+       bool status;
+       int ret;
+
+       status = push_database_old_recv(subreq, &ret);
+       if (! status) {
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       state->old_done = true;
+
+       if (state->old_done && state->new_done) {
+               tevent_req_done(req);
+       }
+}
+
+static void push_database_new_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct push_database_state *state = tevent_req_data(
+               req, struct push_database_state);
+       bool status;
+       int ret;
+
+       status = push_database_new_recv(subreq, &ret);
+       if (! status) {
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       state->new_done = true;
+
+       if (state->old_done && state->new_done) {
+               tevent_req_done(req);
+       }
+}
+
+static bool push_database_recv(struct tevent_req *req, int *perr)
+{
+       return generic_recv(req, perr);
+}
+
 /*
  * Collect databases using highest sequence number
  */
@@ -951,8 +1469,6 @@ struct recover_db_state {
 
        const char *db_name, *db_path;
        struct recdb_context *recdb;
-       struct ctdb_rec_buffer *recbuf;
-
 };
 
 static void recover_db_name_done(struct tevent_req *subreq);
@@ -1225,7 +1741,6 @@ static void recover_db_wipedb_done(struct tevent_req *subreq)
                subreq, struct tevent_req);
        struct recover_db_state *state = tevent_req_data(
                req, struct recover_db_state);
-       struct ctdb_req_control request;
        int *err_list;
        int ret;
        bool status;
@@ -1251,18 +1766,10 @@ static void recover_db_wipedb_done(struct tevent_req *subreq)
                return;
        }
 
-       state->recbuf = recdb_records(state->recdb, state, state->destnode);
-       if (tevent_req_nomem(state->recbuf, req)) {
-               return;
-       }
-
-       TALLOC_FREE(state->recdb);
-
-       ctdb_req_control_push_db(&request, state->recbuf);
-       subreq = ctdb_client_control_multi_send(state, state->ev,
-                                               state->client,
-                                               state->pnn_list, state->count,
-                                               TIMEOUT(), &request);
+       subreq = push_database_send(state, state->ev, state->client,
+                                   state->pnn_list, state->count,
+                                   state->caps, state->tun_list,
+                                   state->recdb);
        if (tevent_req_nomem(subreq, req)) {
                return;
        }
@@ -1276,32 +1783,17 @@ static void recover_db_pushdb_done(struct tevent_req *subreq)
        struct recover_db_state *state = tevent_req_data(
                req, struct recover_db_state);
        struct ctdb_req_control request;
-       int *err_list;
        int ret;
        bool status;
 
-       status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
-                                               NULL);
+       status = push_database_recv(subreq, &ret);
        TALLOC_FREE(subreq);
        if (! status) {
-               int ret2;
-               uint32_t pnn;
-
-               ret2 = ctdb_client_control_multi_error(state->pnn_list,
-                                                      state->count,
-                                                      err_list, &pnn);
-               if (ret2 != 0) {
-                       LOG("control PUSHDB failed for db %s on node %u,"
-                           " ret=%d\n", state->db_name, pnn, ret2);
-               } else {
-                       LOG("control PUSHDB failed for db %s, ret=%d\n",
-                           state->db_name, ret);
-               }
                tevent_req_error(req, ret);
                return;
        }
 
-       TALLOC_FREE(state->recbuf);
+       TALLOC_FREE(state->recdb);
 
        ctdb_req_control_db_transaction_commit(&request, &state->transdb);
        subreq = ctdb_client_control_multi_send(state, state->ev,