]> git.samba.org - vlendec/samba-autobuild/.git/commitdiff
ctdb-daemon: Implement new controls DB_PULL and DB_PUSH_START/DB_PUSH_CONFIRM
authorAmitay Isaacs <amitay@gmail.com>
Fri, 19 Feb 2016 06:32:09 +0000 (17:32 +1100)
committerMartin Schwenke <martins@samba.org>
Fri, 25 Mar 2016 02:26:15 +0000 (03:26 +0100)
Signed-off-by: Amitay Isaacs <amitay@gmail.com>
Reviewed-by: Martin Schwenke <martin@meltin.net>
ctdb/include/ctdb_private.h
ctdb/server/ctdb_control.c
ctdb/server/ctdb_recover.c

index b7c3e5d31a6229161cba34fc12cb0313e2740216..04574fe9c4c45900f5a6020ad237e247ffb3e826 100644 (file)
@@ -437,6 +437,9 @@ struct ctdb_db_context {
        bool freeze_transaction_started;
        uint32_t freeze_transaction_id;
        uint32_t generation;
+
+       bool push_started;
+       void *push_state;
 };
 
 
@@ -873,6 +876,14 @@ int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata,
                             TDB_DATA *outdata);
 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata);
 
+int32_t ctdb_control_db_pull(struct ctdb_context *ctdb,
+                            struct ctdb_req_control_old *c,
+                            TDB_DATA indata, TDB_DATA *outdata);
+int32_t ctdb_control_db_push_start(struct ctdb_context *ctdb,
+                                  TDB_DATA indata);
+int32_t ctdb_control_db_push_confirm(struct ctdb_context *ctdb,
+                                    TDB_DATA indata, TDB_DATA *outdata);
+
 int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb);
 
 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
index e38832653b20082d0e9fa04cec94a84262fa2ef0..7d18969b1b01089d404ae241d035cd5c7614f1d2 100644 (file)
@@ -705,6 +705,18 @@ static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb,
                CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
                return ctdb_control_db_transaction_cancel(ctdb, indata);
 
+       case CTDB_CONTROL_DB_PULL:
+               CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_pulldb_ext));
+               return ctdb_control_db_pull(ctdb, c, indata, outdata);
+
+       case CTDB_CONTROL_DB_PUSH_START:
+               CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_pulldb_ext));
+               return ctdb_control_db_push_start(ctdb, indata);
+
+       case CTDB_CONTROL_DB_PUSH_CONFIRM:
+               CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
+               return ctdb_control_db_push_confirm(ctdb, indata, outdata);
+
        default:
                DEBUG(DEBUG_CRIT,(__location__ " Unknown CTDB control opcode %u\n", opcode));
                return -1;
index 79b5b2b9748d061f7e31dff179b5d9cf669a6e4a..102854564bc9e8fe97c48aa6b07bd5f6689f6125 100644 (file)
@@ -313,6 +313,133 @@ int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DAT
        return 0;
 }
 
+struct db_pull_state {
+       struct ctdb_context *ctdb;
+       struct ctdb_db_context *ctdb_db;
+       struct ctdb_marshall_buffer *recs;
+       uint32_t pnn;
+       uint64_t srvid;
+       uint32_t num_records;
+};
+
+static int traverse_db_pull(struct tdb_context *tdb, TDB_DATA key,
+                           TDB_DATA data, void *private_data)
+{
+       struct db_pull_state *state = (struct db_pull_state *)private_data;
+       struct ctdb_marshall_buffer *recs;
+
+       recs = ctdb_marshall_add(state->ctdb, state->recs,
+                                state->ctdb_db->db_id, 0, key, NULL, data);
+       if (recs == NULL) {
+               TALLOC_FREE(state->recs);
+               return -1;
+       }
+       state->recs = recs;
+
+       if (talloc_get_size(state->recs) >=
+                       state->ctdb->tunable.rec_buffer_size_limit) {
+               TDB_DATA buffer;
+               int ret;
+
+               buffer = ctdb_marshall_finish(state->recs);
+               ret = ctdb_daemon_send_message(state->ctdb, state->pnn,
+                                              state->srvid, buffer);
+               if (ret != 0) {
+                       TALLOC_FREE(state->recs);
+                       return -1;
+               }
+
+               state->num_records += state->recs->count;
+               TALLOC_FREE(state->recs);
+       }
+
+       return 0;
+}
+
+int32_t ctdb_control_db_pull(struct ctdb_context *ctdb,
+                            struct ctdb_req_control_old *c,
+                            TDB_DATA indata, TDB_DATA *outdata)
+{
+       struct ctdb_pulldb_ext *pulldb_ext;
+       struct ctdb_db_context *ctdb_db;
+       struct db_pull_state state;
+       int ret;
+
+       pulldb_ext = (struct ctdb_pulldb_ext *)indata.dptr;
+
+       ctdb_db = find_ctdb_db(ctdb, pulldb_ext->db_id);
+       if (ctdb_db == NULL) {
+               DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n",
+                                pulldb_ext->db_id));
+               return -1;
+       }
+
+       if (!ctdb_db_frozen(ctdb_db)) {
+               DEBUG(DEBUG_ERR,
+                     ("rejecting ctdb_control_pull_db when not frozen\n"));
+               return -1;
+       }
+
+       if (ctdb_db->unhealthy_reason) {
+               /* this is just a warning, as the tdb should be empty anyway */
+               DEBUG(DEBUG_WARNING,
+                     ("db(%s) unhealty in ctdb_control_db_pull: %s\n",
+                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
+       }
+
+       state.ctdb = ctdb;
+       state.ctdb_db = ctdb_db;
+       state.recs = NULL;
+       state.pnn = c->hdr.srcnode;
+       state.srvid = pulldb_ext->srvid;
+       state.num_records = 0;
+
+       if (ctdb_lockdb_mark(ctdb_db) != 0) {
+               DEBUG(DEBUG_ERR,
+                     (__location__ " Failed to get lock on entire db - failing\n"));
+               return -1;
+       }
+
+       ret = tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_db_pull, &state);
+       if (ret == -1) {
+               DEBUG(DEBUG_ERR,
+                     (__location__ " Failed to get traverse db '%s'\n",
+                      ctdb_db->db_name));
+               ctdb_lockdb_unmark(ctdb_db);
+               return -1;
+       }
+
+       /* Last few records */
+       if (state.recs != NULL) {
+               TDB_DATA buffer;
+
+               buffer = ctdb_marshall_finish(state.recs);
+               ret = ctdb_daemon_send_message(state.ctdb, state.pnn,
+                                              state.srvid, buffer);
+               if (ret != 0) {
+                       TALLOC_FREE(state.recs);
+                       ctdb_lockdb_unmark(ctdb_db);
+                       return -1;
+               }
+
+               state.num_records += state.recs->count;
+               TALLOC_FREE(state.recs);
+       }
+
+       ctdb_lockdb_unmark(ctdb_db);
+
+       outdata->dptr = talloc_size(outdata, sizeof(uint32_t));
+       if (outdata->dptr == NULL) {
+               DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
+               return -1;
+       }
+
+       memcpy(outdata->dptr, (uint8_t *)&state.num_records, sizeof(uint32_t));
+       outdata->dsize = sizeof(uint32_t);
+
+       return 0;
+}
+
 /*
   push a bunch of records into a ltdb, filtering by rsn
  */
@@ -407,6 +534,219 @@ failed:
        return -1;
 }
 
+struct db_push_state {
+       struct ctdb_context *ctdb;
+       struct ctdb_db_context *ctdb_db;
+       uint64_t srvid;
+       uint32_t num_records;
+       bool failed;
+};
+
+static void db_push_msg_handler(uint64_t srvid, TDB_DATA indata,
+                               void *private_data)
+{
+       struct db_push_state *state = talloc_get_type(
+               private_data, struct db_push_state);
+       struct ctdb_marshall_buffer *recs;
+       struct ctdb_rec_data_old *rec;
+       int i, ret;
+
+       if (state->failed) {
+               return;
+       }
+
+       recs = (struct ctdb_marshall_buffer *)indata.dptr;
+       rec = (struct ctdb_rec_data_old *)&recs->data[0];
+
+       DEBUG(DEBUG_INFO, ("starting push of %u records for dbid 0x%x\n",
+                          recs->count, recs->db_id));
+
+       for (i=0; i<recs->count; i++) {
+               TDB_DATA key, data;
+               struct ctdb_ltdb_header *hdr;
+
+               key.dptr = &rec->data[0];
+               key.dsize = rec->keylen;
+               data.dptr = &rec->data[key.dsize];
+               data.dsize = rec->datalen;
+
+               if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
+                       DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
+                       goto failed;
+               }
+
+               hdr = (struct ctdb_ltdb_header *)data.dptr;
+               /* Strip off any read only record flags.
+                * All readonly records are revoked implicitely by a recovery.
+                */
+               hdr->flags &= ~CTDB_REC_RO_FLAGS;
+
+               data.dptr += sizeof(*hdr);
+               data.dsize -= sizeof(*hdr);
+
+               ret = ctdb_ltdb_store(state->ctdb_db, key, hdr, data);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,
+                             (__location__ " Unable to store record\n"));
+                       goto failed;
+               }
+
+               rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
+       }
+
+       DEBUG(DEBUG_DEBUG, ("finished push of %u records for dbid 0x%x\n",
+                           recs->count, recs->db_id));
+
+       state->num_records += recs->count;
+       return;
+
+failed:
+       state->failed = true;
+}
+
+int32_t ctdb_control_db_push_start(struct ctdb_context *ctdb, TDB_DATA indata)
+{
+       struct ctdb_pulldb_ext *pulldb_ext;
+       struct ctdb_db_context *ctdb_db;
+       struct db_push_state *state;
+       int ret;
+
+       pulldb_ext = (struct ctdb_pulldb_ext *)indata.dptr;
+
+       ctdb_db = find_ctdb_db(ctdb, pulldb_ext->db_id);
+       if (ctdb_db == NULL) {
+               DEBUG(DEBUG_ERR,
+                     (__location__ " Unknown db 0x%08x\n", pulldb_ext->db_id));
+               return -1;
+       }
+
+       if (!ctdb_db_frozen(ctdb_db)) {
+               DEBUG(DEBUG_ERR,
+                     ("rejecting ctdb_control_db_push_start when not frozen\n"));
+               return -1;
+       }
+
+       if (ctdb_db->push_started) {
+               DEBUG(DEBUG_WARNING,
+                     (__location__ " DB push already started for %s\n",
+                      ctdb_db->db_name));
+
+               /* De-register old state */
+               state = (struct db_push_state *)ctdb_db->push_state;
+               if (state != NULL) {
+                       srvid_deregister(ctdb->srv, state->srvid, state);
+                       talloc_free(state);
+                       ctdb_db->push_state = NULL;
+               }
+       }
+
+       state = talloc_zero(ctdb_db, struct db_push_state);
+       if (state == NULL) {
+               DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
+               return -1;
+       }
+
+       state->ctdb = ctdb;
+       state->ctdb_db = ctdb_db;
+       state->srvid = pulldb_ext->srvid;
+       state->failed = false;
+
+       ret = srvid_register(ctdb->srv, state, state->srvid,
+                            db_push_msg_handler, state);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,
+                     (__location__ " Failed to register srvid for db push\n"));
+               talloc_free(state);
+               return -1;
+       }
+
+       if (ctdb_lockdb_mark(ctdb_db) != 0) {
+               DEBUG(DEBUG_ERR,
+                     (__location__ " Failed to get lock on entire db - failing\n"));
+               srvid_deregister(ctdb->srv, state->srvid, state);
+               talloc_free(state);
+               return -1;
+       }
+
+       ctdb_db->push_started = true;
+       ctdb_db->push_state = state;
+
+       return 0;
+}
+
+int32_t ctdb_control_db_push_confirm(struct ctdb_context *ctdb,
+                                    TDB_DATA indata, TDB_DATA *outdata)
+{
+       uint32_t db_id;
+       struct ctdb_db_context *ctdb_db;
+       struct db_push_state *state;
+
+       db_id = *(uint32_t *)indata.dptr;
+
+       ctdb_db = find_ctdb_db(ctdb, db_id);
+       if (ctdb_db == NULL) {
+               DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
+               return -1;
+       }
+
+       if (!ctdb_db_frozen(ctdb_db)) {
+               DEBUG(DEBUG_ERR,
+                     ("rejecting ctdb_control_db_push_confirm when not frozen\n"));
+               return -1;
+       }
+
+       if (!ctdb_db->push_started) {
+               DEBUG(DEBUG_ERR, (__location__ " DB push not started\n"));
+               return -1;
+       }
+
+       if (ctdb_db->readonly) {
+               DEBUG(DEBUG_ERR,
+                     ("Clearing the tracking database for dbid 0x%x\n",
+                      ctdb_db->db_id));
+               if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
+                       DEBUG(DEBUG_ERR,
+                             ("Failed to wipe tracking database for 0x%x."
+                              " Dropping read-only delegation support\n",
+                              ctdb_db->db_id));
+                       ctdb_db->readonly = false;
+                       tdb_close(ctdb_db->rottdb);
+                       ctdb_db->rottdb = NULL;
+                       ctdb_db->readonly = false;
+               }
+
+               while (ctdb_db->revokechild_active != NULL) {
+                       talloc_free(ctdb_db->revokechild_active);
+               }
+       }
+
+       ctdb_lockdb_unmark(ctdb_db);
+
+       state = (struct db_push_state *)ctdb_db->push_state;
+       if (state == NULL) {
+               DEBUG(DEBUG_ERR, (__location__ " Missing push db state\n"));
+               return -1;
+       }
+
+       srvid_deregister(ctdb->srv, state->srvid, state);
+
+       outdata->dptr = talloc_size(outdata, sizeof(uint32_t));
+       if (outdata->dptr == NULL) {
+               DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
+               talloc_free(state);
+               ctdb_db->push_state = NULL;
+               return -1;
+       }
+
+       memcpy(outdata->dptr, (uint8_t *)&state->num_records, sizeof(uint32_t));
+       outdata->dsize = sizeof(uint32_t);
+
+       talloc_free(state);
+       ctdb_db->push_state = NULL;
+
+       return 0;
+}
+
 struct ctdb_set_recmode_state {
        struct ctdb_context *ctdb;
        struct ctdb_req_control_old *c;