*/
#include "includes.h"
-
#ifdef CLUSTER_SUPPORT
-
#include "ctdb.h"
#include "ctdb_private.h"
+#include "ctdbd_conn.h"
/* Private data of a ctdb-backed struct db_context (see db->private_data). */
struct db_ctdb_ctx {
struct tdb_wrap *wtdb; /* local tdb, opened with tdb_wrap_open() in db_open_ctdb() */
uint32 db_id; /* database id filled in by ctdbd_db_attach() */
- struct ctdbd_connection *conn;
};
struct db_ctdb_rec {
struct ctdb_ltdb_header header;
};
-static struct ctdbd_connection *db_ctdbd_conn(struct db_ctdb_ctx *ctx);
+static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
+ TALLOC_CTX *mem_ctx,
+ TDB_DATA key,
+ bool persistent);
static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
{
return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
}
+
+/*
+ * Store operation for records of persistent databases.
+ *
+ * For persistent databases the store is a bit different: besides the
+ * local tdb_store() we have to ask the ctdb daemon to push the record
+ * to all nodes after the store.
+ *
+ * The sequence "start persistent update / local store / push to the
+ * other nodes" is attempted until it succeeds, but at most as often as
+ * the value looked up via
+ * lp_parm_int(-1, "dbwrap ctdb", "max store retries", 5).
+ * A value of zero or less means that no attempt is made at all and
+ * NT_STATUS_UNSUCCESSFUL is returned.
+ *
+ * rec:  the record as handed out by fetch_locked; its key is used for
+ *       every attempt
+ * data: the new value; it is stored behind the (rsn-incremented) ctdb
+ *       header. Empty data is what the delete operation stores.
+ * flag: not evaluated, the local store always uses TDB_REPLACE
+ *
+ * Returns the status of the last attempt, or NT_STATUS_NO_MEMORY if
+ * the record could not be re-fetched or the store buffer could not be
+ * allocated.
+ *
+ * After every completed attempt the chain lock on rec->key has been
+ * released and the destructor of the record used in that attempt has
+ * been removed, see the comment in the loop.
+ */
+static NTSTATUS db_ctdb_store_persistent(struct db_record *rec, TDB_DATA data, int flag)
+{
+	struct db_ctdb_rec *crec;
+	struct db_record *record;
+	TDB_DATA cdata;
+	int ret;
+	NTSTATUS status;
+	/*
+	 * Signed like max_retries: comparing an unsigned counter against
+	 * a negative limit would convert the limit to a huge unsigned
+	 * value instead of skipping the loop.
+	 */
+	int count;
+	int max_retries = lp_parm_int(-1, "dbwrap ctdb", "max store retries", 5);
+
+	/*
+	 * cdata.dptr is freed again after the loop, so it has to be
+	 * NULL in case the loop body is never entered.
+	 */
+	ZERO_STRUCT(cdata);
+
+	for (count = 0, status = NT_STATUS_UNSUCCESSFUL, record = rec;
+	     (count < max_retries) && !NT_STATUS_IS_OK(status);
+	     count++)
+	{
+		if (count > 0) {
+			/* retry */
+			/*
+			 * There is a hack here: We use rec as a memory
+			 * context and re-use it as the record struct ptr.
+			 * We don't free the record data allocated
+			 * in each turn. So all gets freed when the caller
+			 * releases the original record. This is because
+			 * we don't get the record passed in by reference
+			 * in the first place and the caller relies on
+			 * having to free the record himself.
+			 *
+			 * crec still points to the private data of the
+			 * record used in the previous turn here.
+			 */
+			record = fetch_locked_internal(crec->ctdb_ctx,
+						       rec,
+						       rec->key,
+						       true /* persistent */);
+			if (record == NULL) {
+				DEBUG(5, ("fetch_locked_internal failed.\n"));
+				status = NT_STATUS_NO_MEMORY;
+				break;
+			}
+		}
+
+		crec = talloc_get_type_abort(record->private_data,
+					     struct db_ctdb_rec);
+
+		/* the stored blob is the ctdb header followed by the data */
+		cdata.dsize = sizeof(crec->header) + data.dsize;
+
+		if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
+			return NT_STATUS_NO_MEMORY;
+		}
+
+		crec->header.rsn++;
+
+		memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
+		memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
+
+		status = ctdbd_start_persistent_update(
+				messaging_ctdbd_connection(),
+				crec->ctdb_ctx->db_id,
+				rec->key,
+				cdata);
+
+		if (NT_STATUS_IS_OK(status)) {
+			ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key,
+					cdata, TDB_REPLACE);
+			status = (ret == 0) ? NT_STATUS_OK
+					    : NT_STATUS_INTERNAL_DB_CORRUPTION;
+		}
+
+		/*
+		 * release the lock *now* in order to prevent deadlocks.
+		 *
+		 * There is a tradeoff: Usually, the record is still locked
+		 * after db->store operation. This lock is usually released
+		 * via the talloc destructor with the TALLOC_FREE to
+		 * the record. So we have two choices:
+		 *
+		 * - Either re-lock the record after the call to persistent_store
+		 *   or cancel_persistent update and this way not changing any
+		 *   assumptions callers may have about the state, but possibly
+		 *   introducing new race conditions.
+		 *
+		 * - Or don't lock the record again but just remove the
+		 *   talloc_destructor. This is less racy but assumes that
+		 *   the lock is always released via TALLOC_FREE of the record.
+		 *
+		 * The second variant is what is implemented here: the chain
+		 * is unlocked, the destructor is removed and the record is
+		 * not locked again.
+		 * We can't guarantee that we succeed in getting the lock
+		 * anyways. The only real danger here is that a caller
+		 * performs multiple store operations after a fetch_locked()
+		 * which is currently not the case.
+		 */
+		tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, rec->key);
+		talloc_set_destructor(record, NULL);
+
+		/* now tell ctdbd to update this record on all other nodes */
+		if (NT_STATUS_IS_OK(status)) {
+			status = ctdbd_persistent_store(
+					messaging_ctdbd_connection(),
+					crec->ctdb_ctx->db_id,
+					rec->key,
+					cdata);
+		} else {
+			ctdbd_cancel_persistent_update(
+					messaging_ctdbd_connection(),
+					crec->ctdb_ctx->db_id,
+					rec->key,
+					cdata);
+		}
+
+		SAFE_FREE(cdata.dptr);
+	} /* retry-loop */
+
+	if (!NT_STATUS_IS_OK(status)) {
+		DEBUG(5, ("ctdbd_persistent_store still failed after "
+			  "%d retries with error %s - giving up.\n",
+			  count, nt_errstr(status)));
+	}
+
+	SAFE_FREE(cdata.dptr);
+
+	return status;
+}
+
static NTSTATUS db_ctdb_delete(struct db_record *rec)
{
- struct db_ctdb_rec *crec = talloc_get_type_abort(
- rec->private_data, struct db_ctdb_rec);
TDB_DATA data;
- int ret;
/*
* We have to store the header with empty data. TODO: Fix the
* tdb-level cleanup
*
* Deleting is therefore expressed as a store of a zeroed TDB_DATA
* through db_ctdb_store(); the status of that store is what the
* caller gets back.
*/
- data.dptr = (uint8 *)&crec->header;
- data.dsize = sizeof(crec->header);
+ ZERO_STRUCT(data);
- ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, data, TDB_REPLACE);
+ return db_ctdb_store(rec, data, 0);
+
+}
+
+static NTSTATUS db_ctdb_delete_persistent(struct db_record *rec)
+{
+ TDB_DATA data;
+
+ /*
+ * Delete operation for records of persistent databases.
+ *
+ * We have to store the header with empty data. TODO: Fix the
+ * tdb-level cleanup
+ *
+ * The delete is therefore a store of a zeroed TDB_DATA through
+ * db_ctdb_store_persistent(), whose status is returned unchanged.
+ */
+
+ ZERO_STRUCT(data);
+
+ return db_ctdb_store_persistent(rec, data, 0);
- return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
}
static int db_ctdb_record_destr(struct db_record* data)
struct db_ctdb_rec *crec = talloc_get_type_abort(
data->private_data, struct db_ctdb_rec);
- DEBUG(10, ("Unlocking key %s\n",
+ DEBUG(10, (DEBUGLEVEL > 10
+ ? "Unlocking db %u key %s\n"
+ : "Unlocking db %u key %.20s\n",
+ (int)crec->ctdb_ctx->db_id,
hex_encode(data, (unsigned char *)data->key.dptr,
data->key.dsize)));
return 0;
}
-static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
- TALLOC_CTX *mem_ctx,
- TDB_DATA key)
+static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
+ TALLOC_CTX *mem_ctx,
+ TDB_DATA key,
+ bool persistent)
{
- struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
- struct db_ctdb_ctx);
struct db_record *result;
struct db_ctdb_rec *crec;
NTSTATUS status;
TDB_DATA ctdb_data;
+ int migrate_attempts = 0;
if (!(result = talloc(mem_ctx, struct db_record))) {
DEBUG(0, ("talloc failed\n"));
*/
again:
- DEBUG(10, ("Locking key %s\n",
- hex_encode(result, (unsigned char *)key.dptr,
- key.dsize)));
+ if (DEBUGLEVEL >= 10) {
+ char *keystr = hex_encode(result, key.dptr, key.dsize);
+ DEBUG(10, (DEBUGLEVEL > 10
+ ? "Locking db %u key %s\n"
+ : "Locking db %u key %.20s\n",
+ (int)crec->ctdb_ctx->db_id, keystr));
+ TALLOC_FREE(keystr);
+ }
if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
DEBUG(3, ("tdb_chainlock failed\n"));
return NULL;
}
- result->store = db_ctdb_store;
- result->delete_rec = db_ctdb_delete;
+ if (persistent) {
+ result->store = db_ctdb_store_persistent;
+ result->delete_rec = db_ctdb_delete_persistent;
+ } else {
+ result->store = db_ctdb_store;
+ result->delete_rec = db_ctdb_delete;
+ }
talloc_set_destructor(result, db_ctdb_record_destr);
ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
tdb_chainunlock(ctx->wtdb->tdb, key);
talloc_set_destructor(result, NULL);
+ migrate_attempts += 1;
+
DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
ctdb_data.dptr, ctdb_data.dptr ?
((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
get_my_vnn()));
- status = ctdbd_migrate(db_ctdbd_conn(ctx), ctx->db_id, key);
+ status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
if (!NT_STATUS_IS_OK(status)) {
DEBUG(5, ("ctdb_migrate failed: %s\n",
nt_errstr(status)));
goto again;
}
+ if (migrate_attempts > 10) {
+ DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
+ migrate_attempts));
+ }
+
memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
return result;
}
+/*
+ * fetch_locked entry point of the ctdb backend.
+ *
+ * Looks up the backend context stored in db->private_data and hands
+ * the request to fetch_locked_internal(), passing on whether the
+ * database is persistent. Returns whatever fetch_locked_internal()
+ * returns.
+ */
+static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
+					      TALLOC_CTX *mem_ctx,
+					      TDB_DATA key)
+{
+	struct db_ctdb_ctx *ctdb_ctx;
+
+	ctdb_ctx = talloc_get_type_abort(db->private_data,
+					 struct db_ctdb_ctx);
+
+	return fetch_locked_internal(ctdb_ctx, mem_ctx, key, db->persistent);
+}
+
/*
fetch (unlocked, no migration) operation on ctdb
*/
/*
* See if we have a valid record and we are the dmaster. If so, we can
* take the shortcut and just return it.
+ * we bypass the dmaster check for persistent databases
*/
if ((ctdb_data.dptr != NULL) &&
(ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
- ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn()) {
+ (db->persistent ||
+ ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
/* we are the dmaster - avoid the ctdb protocol op */
data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
SAFE_FREE(ctdb_data.dptr);
/* we weren't able to get it locally - ask ctdb to fetch it for us */
- status = ctdbd_fetch(db_ctdbd_conn(ctx), ctx->db_id, key, mem_ctx,
- data);
+ status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
if (!NT_STATUS_IS_OK(status)) {
DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
return -1;
talloc_free(tmp_ctx);
}
+/*
+ * tdb_traverse() callback used by db_ctdb_traverse() for the local
+ * traverse of persistent databases.
+ *
+ * private_data is the struct traverse_state holding the database, the
+ * user function and its private pointer. Each key is fetched again as
+ * a locked record and, unless the fetch failed or the value is empty,
+ * handed to the user function.
+ *
+ * Returns the result of the user function, 0 for records that were
+ * skipped and -1 if the temporary talloc context could not be created
+ * (presumably a non-zero return ends the tdb traverse - confirm
+ * against the tdb documentation).
+ */
+static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
+					void *private_data)
+{
+	struct traverse_state *state = (struct traverse_state *)private_data;
+	struct db_record *rec;
+	TALLOC_CTX *tmp_ctx = talloc_new(state->db);
+	int ret = 0;
+
+	if (tmp_ctx == NULL) {
+		/*
+		 * Without a parent context the locked record fetched
+		 * below would not be released by the talloc_free() at
+		 * the end, so do not fetch it in the first place.
+		 */
+		DEBUG(0, ("talloc failed\n"));
+		return -1;
+	}
+
+	/* we have to give them a locked record to prevent races */
+	rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
+	if (rec && rec->value.dsize > 0) {
+		ret = state->fn(rec, state->private_data);
+	}
+
+	/* frees the record as a child of tmp_ctx */
+	talloc_free(tmp_ctx);
+	return ret;
+}
+
static int db_ctdb_traverse(struct db_context *db,
int (*fn)(struct db_record *rec,
void *private_data),
state.fn = fn;
state.private_data = private_data;
+ if (db->persistent) {
+ /* for persistent databases we don't need to do a ctdb traverse,
+ we can do a faster local traverse */
+ return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
+ }
+
+
ctdbd_traverse(ctx->db_id, traverse_callback, &state);
return 0;
}
state->fn(&rec, state->private_data);
}
+/*
+ * tdb_traverse_read() callback used by db_ctdb_traverse_read() for
+ * the local traverse of persistent databases.
+ *
+ * Values that are not larger than the ctdb header are deleted records
+ * and are skipped with a return of 0. For all other entries a record
+ * on the stack is set up whose value points behind the header and
+ * whose store/delete operations are the "deny" variants; the record
+ * is passed to the user function and that function's result is
+ * returned.
+ */
+static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
+					     void *private_data)
+{
+	struct traverse_state *state = (struct traverse_state *)private_data;
+	struct db_record rec;
+
+	if (dbuf.dsize <= sizeof(struct ctdb_ltdb_header)) {
+		/* a deleted record */
+		return 0;
+	}
+
+	rec.key = kbuf;
+
+	/* hide the ctdb header from the caller */
+	rec.value.dptr = dbuf.dptr + sizeof(struct ctdb_ltdb_header);
+	rec.value.dsize = dbuf.dsize - sizeof(struct ctdb_ltdb_header);
+
+	/* read-only traverse: modifications are refused */
+	rec.store = db_ctdb_store_deny;
+	rec.delete_rec = db_ctdb_delete_deny;
+	rec.private_data = state->db;
+
+	return state->fn(&rec, state->private_data);
+}
+
static int db_ctdb_traverse_read(struct db_context *db,
int (*fn)(struct db_record *rec,
void *private_data),
state.fn = fn;
state.private_data = private_data;
+ if (db->persistent) {
+ /* for persistent databases we don't need to do a ctdb traverse,
+ we can do a faster local traverse */
+ return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
+ }
+
ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
return 0;
}
return tdb_get_seqnum(ctx->wtdb->tdb);
}
-/*
- * Get the ctdbd connection for a database. If possible, re-use the messaging
- * ctdbd connection
- */
-static struct ctdbd_connection *db_ctdbd_conn(struct db_ctdb_ctx *ctx)
+static int db_ctdb_trans_dummy(struct db_context *db)
{
- struct ctdbd_connection *result;
-
- result = messaging_ctdbd_connection();
-
- if (result != NULL) {
-
- if (ctx->conn == NULL) {
- /*
- * Someone has initialized messaging since we
- * initialized our own connection, we don't need it
- * anymore.
- */
- TALLOC_FREE(ctx->conn);
- }
-
- return result;
- }
-
- if (ctx->conn == NULL) {
- NTSTATUS status;
- status = ctdbd_init_connection(ctx, &ctx->conn);
- if (!NT_STATUS_IS_OK(status)) {
- return NULL;
- }
- set_my_vnn(ctdbd_vnn(ctx->conn));
- }
-
- return ctx->conn;
+ /*
+ * Not implemented yet, just return ok
+ *
+ * db_open_ctdb() installs this one function as transaction_start,
+ * transaction_commit and transaction_cancel, so all three are
+ * no-ops that report success (0) and db is not looked at.
+ */
+ return 0;
}
struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
struct db_context *result;
struct db_ctdb_ctx *db_ctdb;
char *db_path;
- NTSTATUS status;
if (!lp_clustering()) {
DEBUG(10, ("Clustering disabled -- no ctdb\n"));
return NULL;
}
- db_ctdb->conn = NULL;
-
- status = ctdbd_db_attach(db_ctdbd_conn(db_ctdb), name,
- &db_ctdb->db_id, tdb_flags);
-
- if (!NT_STATUS_IS_OK(status)) {
- DEBUG(0, ("ctdbd_db_attach failed for %s: %s\n", name,
- nt_errstr(status)));
+ if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
+ DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
TALLOC_FREE(result);
return NULL;
}
- db_path = ctdbd_dbpath(db_ctdbd_conn(db_ctdb), db_ctdb,
- db_ctdb->db_id);
+ db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
+
+ result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
/* only pass through specific flags */
tdb_flags &= TDB_SEQNUM;
+ /* honor permissions if user has specified O_CREAT */
+ if (open_flags & O_CREAT) {
+ chmod(db_path, mode);
+ }
+
db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
if (db_ctdb->wtdb == NULL) {
DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
result->traverse = db_ctdb_traverse;
result->traverse_read = db_ctdb_traverse_read;
result->get_seqnum = db_ctdb_get_seqnum;
+ result->transaction_start = db_ctdb_trans_dummy;
+ result->transaction_commit = db_ctdb_trans_dummy;
+ result->transaction_cancel = db_ctdb_trans_dummy;
DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
name, db_ctdb->db_id));
return result;
}
-
-#else
-
-struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
- const char *name,
- int hash_size, int tdb_flags,
- int open_flags, mode_t mode)
-{
- DEBUG(0, ("no clustering compiled in\n"));
- return NULL;
-}
-
#endif