#include "system/filesys.h"
#include "lib/util/tdb_wrap.h"
#include "util_tdb.h"
+#include "dbwrap/dbwrap_rbt.h"
+
#ifdef CLUSTER_SUPPORT
+
+/*
+ * It is not possible to include ctdb.h and tdb_compat.h (included via
+ * some other include above) without warnings. This fixes those
+ * warnings.
+ */
+
+#ifdef typesafe_cb
+#undef typesafe_cb
+#endif
+
+#ifdef typesafe_cb_preargs
+#undef typesafe_cb_preargs
+#endif
+
+#ifdef typesafe_cb_postargs
+#undef typesafe_cb_postargs
+#endif
+
#include "ctdb.h"
#include "ctdb_private.h"
#include "ctdbd_conn.h"
#include "dbwrap/dbwrap.h"
+#include "dbwrap/dbwrap_private.h"
+#include "dbwrap/dbwrap_ctdb.h"
#include "g_lock.h"
#include "messages.h"
struct db_ctdb_transaction_handle {
struct db_ctdb_ctx *ctx;
/*
- * we store the reads and writes done under a transaction:
- * - one list stores both reads and writes (m_all),
- * - the other just writes (m_write)
+ * we store the writes done under a transaction:
*/
- struct ctdb_marshall_buffer *m_all;
struct ctdb_marshall_buffer *m_write;
uint32_t nesting;
bool nested_cancel;
status = g_lock_unlock(h->ctx->lock_ctx, h->lock_name);
if (!NT_STATUS_IS_OK(status)) {
- DEBUG(0, ("g_lock_unlock failed: %s\n", nt_errstr(status)));
+ DEBUG(0, ("g_lock_unlock failed for %s: %s\n", h->lock_name,
+ nt_errstr(status)));
return -1;
}
return 0;
if (ctx->transaction) {
ctx->transaction->nesting++;
+ DEBUG(5, (__location__ " transaction start on db 0x%08x: nesting %d -> %d\n",
+ ctx->db_id, ctx->transaction->nesting - 1, ctx->transaction->nesting));
return 0;
}
ctx->transaction = h;
- DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
+ DEBUG(5,(__location__ " transaction started on db 0x%08x\n", ctx->db_id));
return 0;
}
return -1;
}
- h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key,
- NULL, *data);
- if (h->m_all == NULL) {
- DEBUG(0,(__location__ " Failed to add to marshalling "
- "record\n"));
- data->dsize = 0;
- talloc_free(data->dptr);
- return -1;
- }
-
return 0;
}
header.dmaster = get_my_vnn();
header.rsn++;
- h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key,
- NULL, data);
- if (h->m_all == NULL) {
- DEBUG(0,(__location__ " Failed to add to marshalling "
- "record\n"));
- talloc_free(tmp_ctx);
- return NT_STATUS_NO_MEMORY;
- }
-
h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
if (h->m_write == NULL) {
DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
if (h->nesting != 0) {
h->nesting--;
+ DEBUG(5, (__location__ " transaction commit on db 0x%08x: nesting %d -> %d\n",
+ ctx->db_id, ctx->transaction->nesting + 1, ctx->transaction->nesting));
return 0;
}
goto done;
}
- DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
+ DEBUG(5,(__location__ " transaction commit on db 0x%08x\n", ctx->db_id));
/*
* As the last db action before committing, bump the database sequence
if (h->nesting != 0) {
h->nesting--;
h->nested_cancel = true;
+ DEBUG(5, (__location__ " transaction cancel on db 0x%08x: nesting %d -> %d\n",
+ ctx->db_id, ctx->transaction->nesting + 1, ctx->transaction->nesting));
return 0;
}
struct db_record *rec;
TALLOC_CTX *tmp_ctx = talloc_new(state->db);
int ret = 0;
+
+ /*
+ * Skip the __db_sequence_number__ key:
+ * This is used for persistent transactions internally.
+ */
+ if (kbuf.dsize == strlen(CTDB_DB_SEQNUM_KEY) + 1 &&
+ strcmp((const char*)kbuf.dptr, CTDB_DB_SEQNUM_KEY) == 0)
+ {
+ goto done;
+ }
+
/* we have to give them a locked record to prevent races */
rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
if (rec && rec->value.dsize > 0) {
ret = state->fn(rec, state->private_data);
}
+
+done:
talloc_free(tmp_ctx);
return ret;
}
+/* wrapper to use traverse_persistent_callback with dbwrap */
+static int traverse_persistent_callback_dbwrap(struct db_record *rec, void* data)
+{
+ return traverse_persistent_callback(NULL, rec->key, rec->value, data);
+}
+
+
static int db_ctdb_traverse(struct db_context *db,
int (*fn)(struct db_record *rec,
void *private_data),
state.private_data = private_data;
if (db->persistent) {
+ struct tdb_context *ltdb = ctx->wtdb->tdb;
+ int ret;
+
/* for persistent databases we don't need to do a ctdb traverse,
we can do a faster local traverse */
- return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
+ ret = tdb_traverse(ltdb, traverse_persistent_callback, &state);
+ if (ret < 0) {
+ return ret;
+ }
+ if (ctx->transaction && ctx->transaction->m_write) {
+ /* we now have to handle keys not yet present at transaction start */
+ struct db_context *newkeys = db_open_rbt(talloc_tos());
+ struct ctdb_marshall_buffer *mbuf = ctx->transaction->m_write;
+ struct ctdb_rec_data *rec=NULL;
+ NTSTATUS status;
+ int i;
+ for (i=0; i<mbuf->count; i++) {
+ TDB_DATA key;
+ rec =db_ctdb_marshall_loop_next(mbuf, rec,
+ NULL, NULL,
+ &key, NULL);
+ SMB_ASSERT(rec != NULL);
+
+ if (!tdb_exists(ltdb, key)) {
+ dbwrap_store(newkeys, key, tdb_null, 0);
+ }
+ }
+ status = dbwrap_traverse(newkeys,
+ traverse_persistent_callback_dbwrap,
+ &state,
+ NULL);
+ ret = NT_STATUS_IS_OK(status) ? 0 : -1;
+ talloc_free(newkeys);
+ }
+ return ret;
}
{
struct traverse_state *state = (struct traverse_state *)private_data;
struct db_record rec;
+
+ /*
+ * Skip the __db_sequence_number__ key:
+ * This is used for persistent transactions internally.
+ */
+ if (kbuf.dsize == strlen(CTDB_DB_SEQNUM_KEY) + 1 &&
+ strcmp((const char*)kbuf.dptr, CTDB_DB_SEQNUM_KEY) == 0)
+ {
+ return 0;
+ }
+
rec.key = kbuf;
rec.value = dbuf;
rec.store = db_ctdb_store_deny;