return 0;
}
+struct db_pull_state {
+ struct ctdb_context *ctdb;
+ struct ctdb_db_context *ctdb_db;
+ struct ctdb_marshall_buffer *recs;
+ uint32_t pnn;
+ uint64_t srvid;
+ uint32_t num_records;
+};
+
+static int traverse_db_pull(struct tdb_context *tdb, TDB_DATA key,
+ TDB_DATA data, void *private_data)
+{
+ struct db_pull_state *state = (struct db_pull_state *)private_data;
+ struct ctdb_marshall_buffer *recs;
+
+ recs = ctdb_marshall_add(state->ctdb, state->recs,
+ state->ctdb_db->db_id, 0, key, NULL, data);
+ if (recs == NULL) {
+ TALLOC_FREE(state->recs);
+ return -1;
+ }
+ state->recs = recs;
+
+ if (talloc_get_size(state->recs) >=
+ state->ctdb->tunable.rec_buffer_size_limit) {
+ TDB_DATA buffer;
+ int ret;
+
+ buffer = ctdb_marshall_finish(state->recs);
+ ret = ctdb_daemon_send_message(state->ctdb, state->pnn,
+ state->srvid, buffer);
+ if (ret != 0) {
+ TALLOC_FREE(state->recs);
+ return -1;
+ }
+
+ state->num_records += state->recs->count;
+ TALLOC_FREE(state->recs);
+ }
+
+ return 0;
+}
+
+int32_t ctdb_control_db_pull(struct ctdb_context *ctdb,
+ struct ctdb_req_control_old *c,
+ TDB_DATA indata, TDB_DATA *outdata)
+{
+ struct ctdb_pulldb_ext *pulldb_ext;
+ struct ctdb_db_context *ctdb_db;
+ struct db_pull_state state;
+ int ret;
+
+ pulldb_ext = (struct ctdb_pulldb_ext *)indata.dptr;
+
+ ctdb_db = find_ctdb_db(ctdb, pulldb_ext->db_id);
+ if (ctdb_db == NULL) {
+ DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n",
+ pulldb_ext->db_id));
+ return -1;
+ }
+
+ if (!ctdb_db_frozen(ctdb_db)) {
+ DEBUG(DEBUG_ERR,
+ ("rejecting ctdb_control_pull_db when not frozen\n"));
+ return -1;
+ }
+
+ if (ctdb_db->unhealthy_reason) {
+ /* this is just a warning, as the tdb should be empty anyway */
+ DEBUG(DEBUG_WARNING,
+ ("db(%s) unhealty in ctdb_control_db_pull: %s\n",
+ ctdb_db->db_name, ctdb_db->unhealthy_reason));
+ }
+
+ state.ctdb = ctdb;
+ state.ctdb_db = ctdb_db;
+ state.recs = NULL;
+ state.pnn = c->hdr.srcnode;
+ state.srvid = pulldb_ext->srvid;
+ state.num_records = 0;
+
+ if (ctdb_lockdb_mark(ctdb_db) != 0) {
+ DEBUG(DEBUG_ERR,
+ (__location__ " Failed to get lock on entire db - failing\n"));
+ return -1;
+ }
+
+ ret = tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_db_pull, &state);
+ if (ret == -1) {
+ DEBUG(DEBUG_ERR,
+ (__location__ " Failed to get traverse db '%s'\n",
+ ctdb_db->db_name));
+ ctdb_lockdb_unmark(ctdb_db);
+ return -1;
+ }
+
+ /* Last few records */
+ if (state.recs != NULL) {
+ TDB_DATA buffer;
+
+ buffer = ctdb_marshall_finish(state.recs);
+ ret = ctdb_daemon_send_message(state.ctdb, state.pnn,
+ state.srvid, buffer);
+ if (ret != 0) {
+ TALLOC_FREE(state.recs);
+ ctdb_lockdb_unmark(ctdb_db);
+ return -1;
+ }
+
+ state.num_records += state.recs->count;
+ TALLOC_FREE(state.recs);
+ }
+
+ ctdb_lockdb_unmark(ctdb_db);
+
+ outdata->dptr = talloc_size(outdata, sizeof(uint32_t));
+ if (outdata->dptr == NULL) {
+ DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
+ return -1;
+ }
+
+ memcpy(outdata->dptr, (uint8_t *)&state.num_records, sizeof(uint32_t));
+ outdata->dsize = sizeof(uint32_t);
+
+ return 0;
+}
+
/*
push a bunch of records into a ltdb, filtering by rsn
*/
return -1;
}
+struct db_push_state {
+ struct ctdb_context *ctdb;
+ struct ctdb_db_context *ctdb_db;
+ uint64_t srvid;
+ uint32_t num_records;
+ bool failed;
+};
+
+static void db_push_msg_handler(uint64_t srvid, TDB_DATA indata,
+ void *private_data)
+{
+ struct db_push_state *state = talloc_get_type(
+ private_data, struct db_push_state);
+ struct ctdb_marshall_buffer *recs;
+ struct ctdb_rec_data_old *rec;
+ int i, ret;
+
+ if (state->failed) {
+ return;
+ }
+
+ recs = (struct ctdb_marshall_buffer *)indata.dptr;
+ rec = (struct ctdb_rec_data_old *)&recs->data[0];
+
+ DEBUG(DEBUG_INFO, ("starting push of %u records for dbid 0x%x\n",
+ recs->count, recs->db_id));
+
+ for (i=0; i<recs->count; i++) {
+ TDB_DATA key, data;
+ struct ctdb_ltdb_header *hdr;
+
+ key.dptr = &rec->data[0];
+ key.dsize = rec->keylen;
+ data.dptr = &rec->data[key.dsize];
+ data.dsize = rec->datalen;
+
+ if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
+ DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
+ goto failed;
+ }
+
+ hdr = (struct ctdb_ltdb_header *)data.dptr;
+ /* Strip off any read only record flags.
+ * All readonly records are revoked implicitely by a recovery.
+ */
+ hdr->flags &= ~CTDB_REC_RO_FLAGS;
+
+ data.dptr += sizeof(*hdr);
+ data.dsize -= sizeof(*hdr);
+
+ ret = ctdb_ltdb_store(state->ctdb_db, key, hdr, data);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR,
+ (__location__ " Unable to store record\n"));
+ goto failed;
+ }
+
+ rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
+ }
+
+ DEBUG(DEBUG_DEBUG, ("finished push of %u records for dbid 0x%x\n",
+ recs->count, recs->db_id));
+
+ state->num_records += recs->count;
+ return;
+
+failed:
+ state->failed = true;
+}
+
+int32_t ctdb_control_db_push_start(struct ctdb_context *ctdb, TDB_DATA indata)
+{
+ struct ctdb_pulldb_ext *pulldb_ext;
+ struct ctdb_db_context *ctdb_db;
+ struct db_push_state *state;
+ int ret;
+
+ pulldb_ext = (struct ctdb_pulldb_ext *)indata.dptr;
+
+ ctdb_db = find_ctdb_db(ctdb, pulldb_ext->db_id);
+ if (ctdb_db == NULL) {
+ DEBUG(DEBUG_ERR,
+ (__location__ " Unknown db 0x%08x\n", pulldb_ext->db_id));
+ return -1;
+ }
+
+ if (!ctdb_db_frozen(ctdb_db)) {
+ DEBUG(DEBUG_ERR,
+ ("rejecting ctdb_control_db_push_start when not frozen\n"));
+ return -1;
+ }
+
+ if (ctdb_db->push_started) {
+ DEBUG(DEBUG_WARNING,
+ (__location__ " DB push already started for %s\n",
+ ctdb_db->db_name));
+
+ /* De-register old state */
+ state = (struct db_push_state *)ctdb_db->push_state;
+ if (state != NULL) {
+ srvid_deregister(ctdb->srv, state->srvid, state);
+ talloc_free(state);
+ ctdb_db->push_state = NULL;
+ }
+ }
+
+ state = talloc_zero(ctdb_db, struct db_push_state);
+ if (state == NULL) {
+ DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
+ return -1;
+ }
+
+ state->ctdb = ctdb;
+ state->ctdb_db = ctdb_db;
+ state->srvid = pulldb_ext->srvid;
+ state->failed = false;
+
+ ret = srvid_register(ctdb->srv, state, state->srvid,
+ db_push_msg_handler, state);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR,
+ (__location__ " Failed to register srvid for db push\n"));
+ talloc_free(state);
+ return -1;
+ }
+
+ if (ctdb_lockdb_mark(ctdb_db) != 0) {
+ DEBUG(DEBUG_ERR,
+ (__location__ " Failed to get lock on entire db - failing\n"));
+ srvid_deregister(ctdb->srv, state->srvid, state);
+ talloc_free(state);
+ return -1;
+ }
+
+ ctdb_db->push_started = true;
+ ctdb_db->push_state = state;
+
+ return 0;
+}
+
+int32_t ctdb_control_db_push_confirm(struct ctdb_context *ctdb,
+ TDB_DATA indata, TDB_DATA *outdata)
+{
+ uint32_t db_id;
+ struct ctdb_db_context *ctdb_db;
+ struct db_push_state *state;
+
+ db_id = *(uint32_t *)indata.dptr;
+
+ ctdb_db = find_ctdb_db(ctdb, db_id);
+ if (ctdb_db == NULL) {
+ DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
+ return -1;
+ }
+
+ if (!ctdb_db_frozen(ctdb_db)) {
+ DEBUG(DEBUG_ERR,
+ ("rejecting ctdb_control_db_push_confirm when not frozen\n"));
+ return -1;
+ }
+
+ if (!ctdb_db->push_started) {
+ DEBUG(DEBUG_ERR, (__location__ " DB push not started\n"));
+ return -1;
+ }
+
+ if (ctdb_db->readonly) {
+ DEBUG(DEBUG_ERR,
+ ("Clearing the tracking database for dbid 0x%x\n",
+ ctdb_db->db_id));
+ if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
+ DEBUG(DEBUG_ERR,
+ ("Failed to wipe tracking database for 0x%x."
+ " Dropping read-only delegation support\n",
+ ctdb_db->db_id));
+ ctdb_db->readonly = false;
+ tdb_close(ctdb_db->rottdb);
+ ctdb_db->rottdb = NULL;
+ ctdb_db->readonly = false;
+ }
+
+ while (ctdb_db->revokechild_active != NULL) {
+ talloc_free(ctdb_db->revokechild_active);
+ }
+ }
+
+ ctdb_lockdb_unmark(ctdb_db);
+
+ state = (struct db_push_state *)ctdb_db->push_state;
+ if (state == NULL) {
+ DEBUG(DEBUG_ERR, (__location__ " Missing push db state\n"));
+ return -1;
+ }
+
+ srvid_deregister(ctdb->srv, state->srvid, state);
+
+ outdata->dptr = talloc_size(outdata, sizeof(uint32_t));
+ if (outdata->dptr == NULL) {
+ DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
+ talloc_free(state);
+ ctdb_db->push_state = NULL;
+ return -1;
+ }
+
+ memcpy(outdata->dptr, (uint8_t *)&state->num_records, sizeof(uint32_t));
+ outdata->dsize = sizeof(uint32_t);
+
+ talloc_free(state);
+ ctdb_db->push_state = NULL;
+
+ return 0;
+}
+
struct ctdb_set_recmode_state {
struct ctdb_context *ctdb;
struct ctdb_req_control_old *c;