s3:dbwrap_ctdb: improve the check for skipping the __db_sequence_number__ record...
[kai/samba.git] / source3 / lib / dbwrap / dbwrap_ctdb.c
index 454a28399f9e18f9a497b352e43a99214034a4b6..4c5cd64da042efaeeabbf96d0d8e66018ede49e9 100644 (file)
 #include "system/filesys.h"
 #include "lib/util/tdb_wrap.h"
 #include "util_tdb.h"
+#include "dbwrap/dbwrap_rbt.h"
+
 #ifdef CLUSTER_SUPPORT
+
+/*
+ * It is not possible to include ctdb.h and tdb_compat.h (included via
+ * some other include above) without warnings. This fixes those
+ * warnings.
+ */
+
+#ifdef typesafe_cb
+#undef typesafe_cb
+#endif
+
+#ifdef typesafe_cb_preargs
+#undef typesafe_cb_preargs
+#endif
+
+#ifdef typesafe_cb_postargs
+#undef typesafe_cb_postargs
+#endif
+
 #include "ctdb.h"
 #include "ctdb_private.h"
 #include "ctdbd_conn.h"
 #include "dbwrap/dbwrap.h"
+#include "dbwrap/dbwrap_private.h"
+#include "dbwrap/dbwrap_ctdb.h"
 #include "g_lock.h"
 #include "messages.h"
 
 struct db_ctdb_transaction_handle {
        struct db_ctdb_ctx *ctx;
        /*
-        * we store the reads and writes done under a transaction:
-        * - one list stores both reads and writes (m_all),
-        * - the other just writes (m_write)
+        * we store the writes done under a transaction:
         */
-       struct ctdb_marshall_buffer *m_all;
        struct ctdb_marshall_buffer *m_write;
        uint32_t nesting;
        bool nested_cancel;
@@ -310,7 +330,8 @@ static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
 
        status = g_lock_unlock(h->ctx->lock_ctx, h->lock_name);
        if (!NT_STATUS_IS_OK(status)) {
-               DEBUG(0, ("g_lock_unlock failed: %s\n", nt_errstr(status)));
+               DEBUG(0, ("g_lock_unlock failed for %s: %s\n", h->lock_name,
+                         nt_errstr(status)));
                return -1;
        }
        return 0;
@@ -335,6 +356,8 @@ static int db_ctdb_transaction_start(struct db_context *db)
 
        if (ctx->transaction) {
                ctx->transaction->nesting++;
+               DEBUG(5, (__location__ " transaction start on db 0x%08x: nesting %d -> %d\n",
+                         ctx->db_id, ctx->transaction->nesting - 1, ctx->transaction->nesting));
                return 0;
        }
 
@@ -369,7 +392,7 @@ static int db_ctdb_transaction_start(struct db_context *db)
 
        ctx->transaction = h;
 
-       DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
+       DEBUG(5,(__location__ " transaction started on db 0x%08x\n", ctx->db_id));
 
        return 0;
 }
@@ -466,16 +489,6 @@ static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
                return -1;
        }
 
-       h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key,
-                                       NULL, *data);
-       if (h->m_all == NULL) {
-               DEBUG(0,(__location__ " Failed to add to marshalling "
-                        "record\n"));
-               data->dsize = 0;
-               talloc_free(data->dptr);
-               return -1;
-       }
-
        return 0;
 }
 
@@ -652,15 +665,6 @@ static NTSTATUS db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
        header.dmaster = get_my_vnn();
        header.rsn++;
 
-       h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key,
-                                       NULL, data);
-       if (h->m_all == NULL) {
-               DEBUG(0,(__location__ " Failed to add to marshalling "
-                        "record\n"));
-               talloc_free(tmp_ctx);
-               return NT_STATUS_NO_MEMORY;
-       }
-
        h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
        if (h->m_write == NULL) {
                DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
@@ -786,6 +790,8 @@ static int db_ctdb_transaction_commit(struct db_context *db)
 
        if (h->nesting != 0) {
                h->nesting--;
+               DEBUG(5, (__location__ " transaction commit on db 0x%08x: nesting %d -> %d\n",
+                         ctx->db_id, ctx->transaction->nesting + 1, ctx->transaction->nesting));
                return 0;
        }
 
@@ -798,7 +804,7 @@ static int db_ctdb_transaction_commit(struct db_context *db)
                goto done;
        }
 
-       DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
+       DEBUG(5,(__location__ " transaction commit on db 0x%08x\n", ctx->db_id));
 
        /*
         * As the last db action before committing, bump the database sequence
@@ -891,6 +897,8 @@ static int db_ctdb_transaction_cancel(struct db_context *db)
        if (h->nesting != 0) {
                h->nesting--;
                h->nested_cancel = true;
+               DEBUG(5, (__location__ " transaction cancel on db 0x%08x: nesting %d -> %d\n",
+                         ctx->db_id, ctx->transaction->nesting + 1, ctx->transaction->nesting));
                return 0;
        }
 
@@ -1235,15 +1243,35 @@ static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DAT
        struct db_record *rec;
        TALLOC_CTX *tmp_ctx = talloc_new(state->db);
        int ret = 0;
+
+       /*
+        * Skip the __db_sequence_number__ key:
+        * This is used for persistent transactions internally.
+        */
+       if (kbuf.dsize == strlen(CTDB_DB_SEQNUM_KEY) + 1 &&
+           strcmp((const char*)kbuf.dptr, CTDB_DB_SEQNUM_KEY) == 0)
+       {
+               goto done;
+       }
+
        /* we have to give them a locked record to prevent races */
        rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
        if (rec && rec->value.dsize > 0) {
                ret = state->fn(rec, state->private_data);
        }
+
+done:
        talloc_free(tmp_ctx);
        return ret;
 }
 
+/* dbwrap traverse adapter: forwards rec->key, rec->value and the opaque data pointer to traverse_persistent_callback (the tdb argument is passed as NULL) and returns its result */
+static int traverse_persistent_callback_dbwrap(struct db_record *rec, void* data)
+{
+       return traverse_persistent_callback(NULL, rec->key, rec->value, data);
+}
+
+
 static int db_ctdb_traverse(struct db_context *db,
                            int (*fn)(struct db_record *rec,
                                      void *private_data),
@@ -1258,9 +1286,41 @@ static int db_ctdb_traverse(struct db_context *db,
        state.private_data = private_data;
 
        if (db->persistent) {
+               struct tdb_context *ltdb = ctx->wtdb->tdb;
+               int ret;
+
                /* for persistent databases we don't need to do a ctdb traverse,
                   we can do a faster local traverse */
-               return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
+               ret = tdb_traverse(ltdb, traverse_persistent_callback, &state);
+               if (ret < 0) {
+                       return ret;
+               }
+               if (ctx->transaction && ctx->transaction->m_write) {
+                       /* we now have to handle keys not yet present at transaction start */
+                       struct db_context *newkeys = db_open_rbt(talloc_tos());
+                       struct ctdb_marshall_buffer *mbuf = ctx->transaction->m_write;
+                       struct ctdb_rec_data *rec=NULL;
+                       NTSTATUS status;
+                       int i;
+                       for (i=0; i<mbuf->count; i++) {
+                               TDB_DATA key;
+                               rec =db_ctdb_marshall_loop_next(mbuf, rec,
+                                                               NULL, NULL,
+                                                               &key, NULL);
+                               SMB_ASSERT(rec != NULL);
+
+                               if (!tdb_exists(ltdb, key)) {
+                                       dbwrap_store(newkeys, key, tdb_null, 0);
+                               }
+                       }
+                       status = dbwrap_traverse(newkeys,
+                                                traverse_persistent_callback_dbwrap,
+                                                &state,
+                                                NULL);
+                       ret = NT_STATUS_IS_OK(status) ? 0 : -1;
+                       talloc_free(newkeys);
+               }
+               return ret;
        }
 
 
@@ -1295,6 +1355,17 @@ static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TD
 {
        struct traverse_state *state = (struct traverse_state *)private_data;
        struct db_record rec;
+
+       /*
+        * Skip the __db_sequence_number__ key:
+        * This is used for persistent transactions internally.
+        */
+       if (kbuf.dsize == strlen(CTDB_DB_SEQNUM_KEY) + 1 &&
+           strcmp((const char*)kbuf.dptr, CTDB_DB_SEQNUM_KEY) == 0)
+       {
+               return 0;
+       }
+
        rec.key = kbuf;
        rec.value = dbuf;
        rec.store = db_ctdb_store_deny;