dbwrap ctdb: add db_ctdb_delete_persistent() and use it for persistent DBs
[gd/samba/.git] / source3 / lib / dbwrap_ctdb.c
index 73d8eeffea10cd256c5b42f588ee12aeb7804bea..1cccecbad93efd6035dc2fa5f44291aa4a1c1292 100644 (file)
 */
 
 #include "includes.h"
-
 #ifdef CLUSTER_SUPPORT
-
 #include "ctdb.h"
 #include "ctdb_private.h"
+#include "ctdbd_conn.h"
 
 struct db_ctdb_ctx {
        struct tdb_wrap *wtdb;
        uint32 db_id;
-       struct ctdbd_connection *conn;
 };
 
 struct db_ctdb_rec {
@@ -35,7 +33,10 @@ struct db_ctdb_rec {
        struct ctdb_ltdb_header header;
 };
 
-static struct ctdbd_connection *db_ctdbd_conn(struct db_ctdb_ctx *ctx);
+static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
+                                              TALLOC_CTX *mem_ctx,
+                                              TDB_DATA key,
+                                              bool persistent);
 
 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
 {
@@ -60,24 +61,157 @@ static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
        return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
 }
 
+
+/*
+ * Store a record in a persistent database.
+ *
+ * For persistent databases the store is a bit different: after the
+ * local tdb_store() we have to ask the ctdb daemon to push the record
+ * to all nodes (ctdbd_persistent_store()). The whole sequence is
+ * attempted up to "max store retries" times (parametric option in the
+ * "dbwrap ctdb" namespace, default 5) until it succeeds.
+ *
+ * rec:  the record to store to; rec->key is used for the store, the
+ *       unlock and the ctdbd calls
+ * data: the new payload; it is stored behind a copy of the record's
+ *       ctdb ltdb header, whose rsn is incremented on every attempt
+ * flag: not evaluated, the local store always uses TDB_REPLACE
+ *
+ * Returns NT_STATUS_OK on success, NT_STATUS_NO_MEMORY if the store
+ * buffer cannot be allocated or the record cannot be re-fetched for a
+ * retry, NT_STATUS_UNSUCCESSFUL if not a single attempt was made, and
+ * otherwise the status of the step that failed in the last attempt.
+ *
+ * Side effect: once an attempt has reached the ctdbd calls, the chain
+ * lock on rec->key has been dropped and the talloc destructor of the
+ * record has been removed, i.e. the record is NOT locked anymore when
+ * this function returns.
+ */
+static NTSTATUS db_ctdb_store_persistent(struct db_record *rec, TDB_DATA data, int flag)
+{
+       struct db_ctdb_rec *crec = NULL;
+       struct db_record *record;
+       TDB_DATA cdata;
+       int ret;
+       NTSTATUS status;
+       uint32_t count;
+       int max_retries = lp_parm_int(-1, "dbwrap ctdb", "max store retries", 5);
+
+       /*
+        * cdata.dptr is freed unconditionally after the loop, so it must
+        * have a defined value even if the loop body never runs
+        * (max_retries == 0).
+        */
+       ZERO_STRUCT(cdata);
+
+       /*
+        * NOTE(review): count is unsigned while max_retries is a signed
+        * int, so a negative setting is compared as a very large unsigned
+        * value - confirm whether negative values should be rejected.
+        */
+       for (count = 0, status = NT_STATUS_UNSUCCESSFUL, record = rec;
+            (count < max_retries) && !NT_STATUS_IS_OK(status);
+            count++)
+       {
+               if (count > 0) {
+                       /* retry */
+                       /*
+                        * There is a hack here: We use rec as a memory
+                        * context and re-use it as the record struct ptr.
+                        * We don't free the record data allocated
+                        * in each turn. So all gets freed when the caller
+                        * releases the original record. This is because
+                        * we don't get the record passed in by reference
+                        * in the first place and the caller relies on
+                        * having to free the record himself.
+                        *
+                        * crec is still the one of the previous attempt
+                        * here, it only serves to get at the ctdb context.
+                        */
+                       record = fetch_locked_internal(crec->ctdb_ctx,
+                                                      rec,
+                                                      rec->key,
+                                                      true /* persistent */);
+                       if (record == NULL) {
+                               DEBUG(5, ("fetch_locked_internal failed.\n"));
+                               status = NT_STATUS_NO_MEMORY;
+                               break;
+                       }
+               }
+
+               crec = talloc_get_type_abort(record->private_data,
+                                            struct db_ctdb_rec);
+
+               /* the stored blob is the ltdb header followed by the data */
+               cdata.dsize = sizeof(crec->header) + data.dsize;
+
+               if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
+                       return NT_STATUS_NO_MEMORY;
+               }
+
+               crec->header.rsn++;
+
+               memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
+               memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
+
+               status = ctdbd_start_persistent_update(
+                               messaging_ctdbd_connection(),
+                               crec->ctdb_ctx->db_id,
+                               rec->key,
+                               cdata);
+
+               if (NT_STATUS_IS_OK(status)) {
+                       ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key,
+                                       cdata, TDB_REPLACE);
+                       status = (ret == 0) ? NT_STATUS_OK
+                                           : NT_STATUS_INTERNAL_DB_CORRUPTION;
+               }
+
+               /*
+                * Release the lock *now* in order to prevent deadlocks.
+                *
+                * There is a tradeoff: Usually, the record is still locked
+                * after the db->store operation. This lock is usually
+                * released via the talloc destructor with the TALLOC_FREE
+                * of the record. So we have two choices:
+                *
+                * - Either re-lock the record after the call to
+                *   persistent_store or cancel_persistent_update and this
+                *   way not change any assumptions callers may have about
+                *   the state, but possibly introduce new race conditions.
+                *
+                * - Or don't lock the record again but just remove the
+                *   talloc destructor. This is less racy but assumes that
+                *   the lock is always released via TALLOC_FREE of the
+                *   record.
+                *
+                * The second variant is what is implemented below (unlock,
+                * then clear the destructor) since it seems less racy.
+                * We can't guarantee that we succeed in getting the lock
+                * again anyways. The only real danger here is that a caller
+                * performs multiple store operations after a fetch_locked()
+                * which is currently not the case.
+                */
+               tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, rec->key);
+               talloc_set_destructor(record, NULL);
+
+               /* now tell ctdbd to update this record on all other nodes */
+               if (NT_STATUS_IS_OK(status)) {
+                       status = ctdbd_persistent_store(
+                                       messaging_ctdbd_connection(),
+                                       crec->ctdb_ctx->db_id,
+                                       rec->key,
+                                       cdata);
+               } else {
+                       ctdbd_cancel_persistent_update(
+                                       messaging_ctdbd_connection(),
+                                       crec->ctdb_ctx->db_id,
+                                       rec->key,
+                                       cdata);
+               }
+
+               SAFE_FREE(cdata.dptr);
+       } /* retry-loop */
+
+       if (!NT_STATUS_IS_OK(status)) {
+               DEBUG(5, ("ctdbd_persistent_store still failed after "
+                         "%d retries with error %s - giving up.\n",
+                         (int)count, nt_errstr(status)));
+       }
+
+       /* no-op after a completed attempt; cdata was zeroed up front */
+       SAFE_FREE(cdata.dptr);
+
+       return status;
+}
+
 static NTSTATUS db_ctdb_delete(struct db_record *rec)
 {
-       struct db_ctdb_rec *crec = talloc_get_type_abort(
-               rec->private_data, struct db_ctdb_rec);
        TDB_DATA data;
-       int ret;
 
        /*
         * We have to store the header with empty data. TODO: Fix the
         * tdb-level cleanup
         */
 
-       data.dptr = (uint8 *)&crec->header;
-       data.dsize = sizeof(crec->header);
+       ZERO_STRUCT(data);
 
-       ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, data, TDB_REPLACE);
+       return db_ctdb_store(rec, data, 0);
+
+}
+
+static NTSTATUS db_ctdb_delete_persistent(struct db_record *rec)
+{
+       TDB_DATA data;
+
+       /*
+        * We have to store the header with empty data. TODO: Fix the
+        * tdb-level cleanup
+        */
+
+       ZERO_STRUCT(data);
+
+       return db_ctdb_store_persistent(rec, data, 0);
 
-       return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
 }
 
 static int db_ctdb_record_destr(struct db_record* data)
@@ -85,7 +219,10 @@ static int db_ctdb_record_destr(struct db_record* data)
        struct db_ctdb_rec *crec = talloc_get_type_abort(
                data->private_data, struct db_ctdb_rec);
 
-       DEBUG(10, ("Unlocking key %s\n",
+       DEBUG(10, (DEBUGLEVEL > 10
+                  ? "Unlocking db %u key %s\n"
+                  : "Unlocking db %u key %.20s\n",
+                  (int)crec->ctdb_ctx->db_id,
                   hex_encode(data, (unsigned char *)data->key.dptr,
                              data->key.dsize)));
 
@@ -97,16 +234,16 @@ static int db_ctdb_record_destr(struct db_record* data)
        return 0;
 }
 
-static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
-                                             TALLOC_CTX *mem_ctx,
-                                             TDB_DATA key)
+static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
+                                              TALLOC_CTX *mem_ctx,
+                                              TDB_DATA key,
+                                              bool persistent)
 {
-       struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
-                                                       struct db_ctdb_ctx);
        struct db_record *result;
        struct db_ctdb_rec *crec;
        NTSTATUS status;
        TDB_DATA ctdb_data;
+       int migrate_attempts = 0;
 
        if (!(result = talloc(mem_ctx, struct db_record))) {
                DEBUG(0, ("talloc failed\n"));
@@ -135,9 +272,14 @@ static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
         */
 again:
 
-       DEBUG(10, ("Locking key %s\n",
-                  hex_encode(result, (unsigned char *)key.dptr,
-                             key.dsize)));
+       if (DEBUGLEVEL >= 10) {
+               char *keystr = hex_encode(result, key.dptr, key.dsize);
+               DEBUG(10, (DEBUGLEVEL > 10
+                          ? "Locking db %u key %s\n"
+                          : "Locking db %u key %.20s\n",
+                          (int)crec->ctdb_ctx->db_id, keystr));
+               TALLOC_FREE(keystr);
+       }
        
        if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
                DEBUG(3, ("tdb_chainlock failed\n"));
@@ -145,8 +287,13 @@ again:
                return NULL;
        }
 
-       result->store = db_ctdb_store;
-       result->delete_rec = db_ctdb_delete;
+       if (persistent) {
+               result->store = db_ctdb_store_persistent;
+               result->delete_rec = db_ctdb_delete_persistent;
+       } else {
+               result->store = db_ctdb_store;
+               result->delete_rec = db_ctdb_delete;
+       }
        talloc_set_destructor(result, db_ctdb_record_destr);
 
        ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
@@ -167,12 +314,14 @@ again:
                tdb_chainunlock(ctx->wtdb->tdb, key);
                talloc_set_destructor(result, NULL);
 
+               migrate_attempts += 1;
+
                DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
                           ctdb_data.dptr, ctdb_data.dptr ?
                           ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
                           get_my_vnn()));
 
-               status = ctdbd_migrate(db_ctdbd_conn(ctx), ctx->db_id, key);
+               status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
                if (!NT_STATUS_IS_OK(status)) {
                        DEBUG(5, ("ctdb_migrate failed: %s\n",
                                  nt_errstr(status)));
@@ -183,6 +332,11 @@ again:
                goto again;
        }
 
+       if (migrate_attempts > 10) {
+               DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
+                         migrate_attempts));
+       }
+
        memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
 
        result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
@@ -201,6 +355,16 @@ again:
        return result;
 }
 
+/*
+ * Fetch a locked record from a ctdb backed database.
+ *
+ * Unwraps the ctdb context from db->private_data and hands over to
+ * fetch_locked_internal(), passing on whether the database is
+ * persistent. Returns whatever fetch_locked_internal() returns.
+ */
+static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
+                                             TALLOC_CTX *mem_ctx,
+                                             TDB_DATA key)
+{
+       struct db_ctdb_ctx *ctdb_ctx;
+
+       ctdb_ctx = talloc_get_type_abort(db->private_data,
+                                        struct db_ctdb_ctx);
+
+       return fetch_locked_internal(ctdb_ctx, mem_ctx, key, db->persistent);
+}
+
 /*
   fetch (unlocked, no migration) operation on ctdb
  */
@@ -218,10 +382,12 @@ static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
        /*
         * See if we have a valid record and we are the dmaster. If so, we can
         * take the shortcut and just return it.
+        * we bypass the dmaster check for persistent databases
         */
        if ((ctdb_data.dptr != NULL) &&
            (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
-           ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn()) {
+           (db->persistent ||
+            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
                /* we are the dmaster - avoid the ctdb protocol op */
 
                data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
@@ -246,8 +412,7 @@ static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
        SAFE_FREE(ctdb_data.dptr);
 
        /* we weren't able to get it locally - ask ctdb to fetch it for us */
-       status = ctdbd_fetch(db_ctdbd_conn(ctx), ctx->db_id, key, mem_ctx,
-                            data);
+       status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
        if (!NT_STATUS_IS_OK(status)) {
                DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
                return -1;
@@ -275,6 +440,22 @@ static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
        talloc_free(tmp_ctx);
 }
 
+/*
+ * tdb_traverse() callback used by db_ctdb_traverse() for persistent
+ * databases.
+ *
+ * Fetches the record for kbuf locked and passes it to the user callback
+ * stored in the traverse state. Records that cannot be fetched or that
+ * carry no data are skipped. Returns the user callback's return value,
+ * or 0 if the record was skipped.
+ */
+static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
+                                       void *private_data)
+{
+       struct traverse_state *state = (struct traverse_state *)private_data;
+       TALLOC_CTX *frame = talloc_new(state->db);
+       struct db_record *locked;
+       int result = 0;
+
+       /* we have to give them a locked record to prevent races */
+       locked = db_ctdb_fetch_locked(state->db, frame, kbuf);
+       if ((locked != NULL) && (locked->value.dsize > 0)) {
+               result = state->fn(locked, state->private_data);
+       }
+
+       /* the record was allocated on frame and goes away with it */
+       talloc_free(frame);
+
+       return result;
+}
+
 static int db_ctdb_traverse(struct db_context *db,
                            int (*fn)(struct db_record *rec,
                                      void *private_data),
@@ -288,6 +469,13 @@ static int db_ctdb_traverse(struct db_context *db,
        state.fn = fn;
        state.private_data = private_data;
 
+       if (db->persistent) {
+               /* for persistent databases we don't need to do a ctdb traverse,
+                  we can do a faster local traverse */
+               return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
+       }
+
+
        ctdbd_traverse(ctx->db_id, traverse_callback, &state);
        return 0;
 }
@@ -314,6 +502,27 @@ static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_da
        state->fn(&rec, state->private_data);
 }
 
+/*
+ * tdb_traverse_read() callback used by db_ctdb_traverse_read() for
+ * persistent databases.
+ *
+ * Presents the raw tdb entry to the user callback as a db_record whose
+ * value starts behind the ctdb ltdb header and whose store/delete
+ * operations are the "deny" variants. Entries that consist of nothing
+ * but the header (or less) count as deleted and are skipped with 0.
+ * Otherwise the user callback's return value is returned.
+ */
+static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
+                                       void *private_data)
+{
+       const size_t hdr_len = sizeof(struct ctdb_ltdb_header);
+       struct traverse_state *state = (struct traverse_state *)private_data;
+       struct db_record rec;
+
+       if (dbuf.dsize <= hdr_len) {
+               /* a deleted record */
+               return 0;
+       }
+
+       rec.key = kbuf;
+
+       /* skip the ltdb header, the callback only gets the payload */
+       rec.value = dbuf;
+       rec.value.dptr += hdr_len;
+       rec.value.dsize -= hdr_len;
+
+       rec.store = db_ctdb_store_deny;
+       rec.delete_rec = db_ctdb_delete_deny;
+       rec.private_data = state->db;
+
+       return state->fn(&rec, state->private_data);
+}
+
 static int db_ctdb_traverse_read(struct db_context *db,
                                 int (*fn)(struct db_record *rec,
                                           void *private_data),
@@ -327,6 +536,12 @@ static int db_ctdb_traverse_read(struct db_context *db,
        state.fn = fn;
        state.private_data = private_data;
 
+       if (db->persistent) {
+               /* for persistent databases we don't need to do a ctdb traverse,
+                  we can do a faster local traverse */
+               return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
+       }
+
        ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
        return 0;
 }
@@ -338,40 +553,12 @@ static int db_ctdb_get_seqnum(struct db_context *db)
        return tdb_get_seqnum(ctx->wtdb->tdb);
 }
 
-/*
- * Get the ctdbd connection for a database. If possible, re-use the messaging
- * ctdbd connection
- */
-static struct ctdbd_connection *db_ctdbd_conn(struct db_ctdb_ctx *ctx)
+static int db_ctdb_trans_dummy(struct db_context *db)
 {
-       struct ctdbd_connection *result;
-
-       result = messaging_ctdbd_connection();
-
-       if (result != NULL) {
-
-               if (ctx->conn == NULL) {
-                       /*
-                        * Someone has initialized messaging since we
-                        * initialized our own connection, we don't need it
-                        * anymore.
-                        */
-                       TALLOC_FREE(ctx->conn);
-               }
-
-               return result;
-       }
-
-       if (ctx->conn == NULL) {
-               NTSTATUS status;
-               status = ctdbd_init_connection(ctx, &ctx->conn);
-               if (!NT_STATUS_IS_OK(status)) {
-                       return NULL;
-               }
-               set_my_vnn(ctdbd_vnn(ctx->conn));
-       }
-
-       return ctx->conn;
+       /*
+        * Not implemented yet, just return ok
+        */
+       return 0;
 }
 
 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
@@ -382,7 +569,6 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
        struct db_context *result;
        struct db_ctdb_ctx *db_ctdb;
        char *db_path;
-       NTSTATUS status;
 
        if (!lp_clustering()) {
                DEBUG(10, ("Clustering disabled -- no ctdb\n"));
@@ -401,24 +587,24 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
                return NULL;
        }
 
-       db_ctdb->conn = NULL;
-
-       status = ctdbd_db_attach(db_ctdbd_conn(db_ctdb), name,
-                                &db_ctdb->db_id, tdb_flags);
-
-       if (!NT_STATUS_IS_OK(status)) {
-               DEBUG(0, ("ctdbd_db_attach failed for %s: %s\n", name,
-                         nt_errstr(status)));
+       if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
+               DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
                TALLOC_FREE(result);
                return NULL;
        }
 
-       db_path = ctdbd_dbpath(db_ctdbd_conn(db_ctdb), db_ctdb,
-                              db_ctdb->db_id);
+       db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
+
+       result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
 
        /* only pass through specific flags */
        tdb_flags &= TDB_SEQNUM;
 
+       /* honor permissions if user has specified O_CREAT */
+       if (open_flags & O_CREAT) {
+               chmod(db_path, mode);
+       }
+
        db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
        if (db_ctdb->wtdb == NULL) {
                DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
@@ -433,22 +619,13 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
        result->traverse = db_ctdb_traverse;
        result->traverse_read = db_ctdb_traverse_read;
        result->get_seqnum = db_ctdb_get_seqnum;
+       result->transaction_start = db_ctdb_trans_dummy;
+       result->transaction_commit = db_ctdb_trans_dummy;
+       result->transaction_cancel = db_ctdb_trans_dummy;
 
        DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
                 name, db_ctdb->db_id));
 
        return result;
 }
-
-#else
-
-struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
-                               const char *name,
-                               int hash_size, int tdb_flags,
-                               int open_flags, mode_t mode)
-{
-       DEBUG(0, ("no clustering compiled in\n"));
-       return NULL;
-}
-
 #endif