dbwrap_ctdb: Pass on mutex flags to tdb_open
[obnox/samba/samba-obnox.git] / source3 / lib / dbwrap / dbwrap_ctdb.c
index 95c23eeb42562082aa032eed15daacd828f36718..e6dcc0e987c35908e854e97c7a35500f5752a9aa 100644 (file)
@@ -1,4 +1,4 @@
-/* 
+/*
    Unix SMB/CIFS implementation.
    Database interface wrapper around ctdbd
    Copyright (C) Volker Lendecke 2007-2009
 
 #include "includes.h"
 #include "system/filesys.h"
-#include "lib/util/tdb_wrap.h"
+#include "lib/tdb_wrap/tdb_wrap.h"
 #include "util_tdb.h"
-#ifdef CLUSTER_SUPPORT
+#include "dbwrap/dbwrap.h"
+#include "dbwrap/dbwrap_ctdb.h"
+#include "dbwrap/dbwrap_rbt.h"
+#include "lib/param/param.h"
 
 /*
  * It is not possible to include ctdb.h and tdb_compat.h (included via
 struct db_ctdb_transaction_handle {
        struct db_ctdb_ctx *ctx;
        /*
-        * we store the reads and writes done under a transaction:
-        * - one list stores both reads and writes (m_all),
-        * - the other just writes (m_write)
+        * we store the writes done under a transaction:
         */
-       struct ctdb_marshall_buffer *m_all;
        struct ctdb_marshall_buffer *m_write;
        uint32_t nesting;
        bool nested_cancel;
@@ -68,9 +68,15 @@ struct db_ctdb_transaction_handle {
 struct db_ctdb_ctx {
        struct db_context *db;
        struct tdb_wrap *wtdb;
-       uint32 db_id;
+       uint32_t db_id;
        struct db_ctdb_transaction_handle *transaction;
        struct g_lock_ctx *lock_ctx;
+
+       /* thresholds for warning messages; exceeding one logs at level 0 */
+       int warn_unlock_msecs;  /* tdb_chainunlock() duration, in ms */
+       int warn_migrate_msecs; /* fetch-locked migrate duration, in ms */
+       int warn_migrate_attempts; /* migrate attempts per fetch-locked */
+       int warn_locktime_msecs; /* chainlock hold time in ms, 0 = off */
 };
 
 struct db_ctdb_rec {
@@ -81,76 +87,53 @@ struct db_ctdb_rec {
 
 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
 {
-       NTSTATUS status;
        enum TDB_ERROR tret = tdb_error(tdb);
 
-       switch (tret) {
-       case TDB_ERR_EXISTS:
-               status = NT_STATUS_OBJECT_NAME_COLLISION;
-               break;
-       case TDB_ERR_NOEXIST:
-               status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
-               break;
-       default:
-               status = NT_STATUS_INTERNAL_DB_CORRUPTION;
-               break;
-       }
-
-       return status;
+       return map_nt_error_from_tdb(tret); /* NOTE(review): helper defined elsewhere; confirm its mapping */
 }
 
+struct db_ctdb_ltdb_parse_state {
+       void (*parser)(TDB_DATA key, struct ctdb_ltdb_header *header,
+                      TDB_DATA data, void *private_data); /* user callback */
+       void *private_data;     /* handed to parser unchanged */
+};
 
-/**
- * fetch a record from the tdb, separating out the header
- * information and returning the body of the record.
- */
-static NTSTATUS db_ctdb_ltdb_fetch(struct db_ctdb_ctx *db,
-                                  TDB_DATA key,
-                                  struct ctdb_ltdb_header *header,
-                                  TALLOC_CTX *mem_ctx,
-                                  TDB_DATA *data)
+static int db_ctdb_ltdb_parser(TDB_DATA key, TDB_DATA data,
+                              void *private_data) /* tdb_parse_record() callback */
 {
-       TDB_DATA rec;
-       NTSTATUS status;
+       struct db_ctdb_ltdb_parse_state *state =
+               (struct db_ctdb_ltdb_parse_state *)private_data;
 
-       rec = tdb_fetch_compat(db->wtdb->tdb, key);
-       if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
-               status = NT_STATUS_NOT_FOUND;
-               if (data) {
-                       ZERO_STRUCTP(data);
-               }
-               if (header) {
-                       header->dmaster = (uint32_t)-1;
-                       header->rsn = 0;
-               }
-               goto done;
+       if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
+               return -1;      /* record too short to hold a ctdb header */
        }
 
-       if (header) {
-               *header = *(struct ctdb_ltdb_header *)rec.dptr;
-       }
+       state->parser(          /* header and payload are passed in place, not copied */
+               key, (struct ctdb_ltdb_header *)data.dptr,
+               make_tdb_data(data.dptr + sizeof(struct ctdb_ltdb_header),
+                             data.dsize - sizeof(struct ctdb_ltdb_header)),
+               state->private_data);
+       return 0;
+}
 
-       if (data) {
-               data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
-               if (data->dsize == 0) {
-                       data->dptr = NULL;
-               } else {
-                       data->dptr = (unsigned char *)talloc_memdup(mem_ctx,
-                                       rec.dptr
-                                        + sizeof(struct ctdb_ltdb_header),
-                                       data->dsize);
-                       if (data->dptr == NULL) {
-                               status = NT_STATUS_NO_MEMORY;
-                               goto done;
-                       }
-               }
-       }
+static NTSTATUS db_ctdb_ltdb_parse(
+       struct db_ctdb_ctx *db, TDB_DATA key,
+       void (*parser)(TDB_DATA key, struct ctdb_ltdb_header *header,
+                      TDB_DATA data, void *private_data),
+       void *private_data)
+{      /* run parser on key's record in db->wtdb->tdb, header split off */
+       struct db_ctdb_ltdb_parse_state state;
+       int ret;
 
-       status = NT_STATUS_OK;
+       state.parser = parser;
+       state.private_data = private_data;
 
-done:
-       SAFE_FREE(rec.dptr);
-       return status;
+       ret = tdb_parse_record(db->wtdb->tdb, key, db_ctdb_ltdb_parser,
+                              &state);
+       if (ret == -1) {        /* lookup failed or db_ctdb_ltdb_parser returned -1 */
+               return NT_STATUS_NOT_FOUND;
+       }
+       return NT_STATUS_OK;
 }
 
 /*
@@ -162,15 +145,13 @@ static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
                                   struct ctdb_ltdb_header *header,
                                   TDB_DATA data)
 {
-       TALLOC_CTX *tmp_ctx = talloc_stackframe();
        TDB_DATA rec;
        int ret;
 
        rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
-       rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
+       rec.dptr = (uint8_t *)talloc_size(talloc_tos(), rec.dsize);
 
        if (rec.dptr == NULL) {
-               talloc_free(tmp_ctx);
                return NT_STATUS_NO_MEMORY;
        }
 
@@ -179,7 +160,7 @@ static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
 
        ret = tdb_store(db->wtdb->tdb, key, rec, TDB_REPLACE);
 
-       talloc_free(tmp_ctx);
+       talloc_free(rec.dptr);
 
        return (ret == 0) ? NT_STATUS_OK
                          : tdb_error_to_ntstatus(db->wtdb->tdb);
@@ -188,20 +169,17 @@ static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
 
 /*
   form a ctdb_rec_data record from a key/data pair
-
-  note that header may be NULL. If not NULL then it is included in the data portion
-  of the record
  */
-static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,      
-                                                 TDB_DATA key, 
+static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
+                                                 TDB_DATA key,
                                                  struct ctdb_ltdb_header *header,
                                                  TDB_DATA data)
 {
        size_t length;
        struct ctdb_rec_data *d;
 
-       length = offsetof(struct ctdb_rec_data, data) + key.dsize + 
-               data.dsize + (header?sizeof(*header):0);
+       length = offsetof(struct ctdb_rec_data, data) + key.dsize +
+               data.dsize + sizeof(*header);
        d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
        if (d == NULL) {
                return NULL;
@@ -210,20 +188,16 @@ static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32
        d->reqid = reqid;
        d->keylen = key.dsize;
        memcpy(&d->data[0], key.dptr, key.dsize);
-       if (header) {
-               d->datalen = data.dsize + sizeof(*header);
-               memcpy(&d->data[key.dsize], header, sizeof(*header));
-               memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
-       } else {
-               d->datalen = data.dsize;
-               memcpy(&d->data[key.dsize], data.dptr, data.dsize);
-       }
+
+       d->datalen = data.dsize + sizeof(*header);
+       memcpy(&d->data[key.dsize], header, sizeof(*header));
+       memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
        return d;
 }
 
 
 /* helper function for marshalling multiple records */
-static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx, 
+static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
                                               struct ctdb_marshall_buffer *m,
                                               uint64_t db_id,
                                               uint32_t reqid,
@@ -278,16 +252,14 @@ static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
        return data;
 }
 
-/* 
-   loop over a marshalling buffer 
+/*
+   loop over a marshalling buffer
 
      - pass r==NULL to start
      - loop the number of times indicated by m->count
 */
-static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
-                                                    uint32_t *reqid,
-                                                    struct ctdb_ltdb_header *header,
-                                                    TDB_DATA *key, TDB_DATA *data)
+static struct ctdb_rec_data *db_ctdb_marshall_loop_next_key(
+       struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r, TDB_DATA *key)
 {
        if (r == NULL) {
                r = (struct ctdb_rec_data *)&m->data[0];
@@ -295,31 +267,27 @@ static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buf
                r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
        }
 
-       if (reqid != NULL) {
-               *reqid = r->reqid;
-       }
+       key->dptr   = &r->data[0];
+       key->dsize  = r->keylen;
+       return r;
+}
 
-       if (key != NULL) {
-               key->dptr   = &r->data[0];
-               key->dsize  = r->keylen;
-       }
-       if (data != NULL) {
-               data->dptr  = &r->data[r->keylen];
-               data->dsize = r->datalen;
-               if (header != NULL) {
-                       data->dptr += sizeof(*header);
-                       data->dsize -= sizeof(*header);
-               }
+static bool db_ctdb_marshall_buf_parse(
+       struct ctdb_rec_data *r, uint32_t *reqid,
+       struct ctdb_ltdb_header **header, TDB_DATA *data) /* outputs point into r */
+{
+       if (r->datalen < sizeof(struct ctdb_ltdb_header)) {
+               return false;   /* too short to carry a ctdb header */
        }
 
-       if (header != NULL) {
-               if (r->datalen < sizeof(*header)) {
-                       return NULL;
-               }
-               *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
-       }
+       *reqid = r->reqid;
 
-       return r;
+       data->dptr  = &r->data[r->keylen] + sizeof(struct ctdb_ltdb_header);
+       data->dsize = r->datalen - sizeof(struct ctdb_ltdb_header);
+
+       *header = (struct ctdb_ltdb_header *)&r->data[r->keylen]; /* no copy */
+
+       return true;
 }
 
 /**
@@ -364,7 +332,7 @@ static int db_ctdb_transaction_start(struct db_context *db)
 
        h = talloc_zero(db, struct db_ctdb_transaction_handle);
        if (h == NULL) {
-               DEBUG(0,(__location__ " oom for transaction handle\n"));                
+               DEBUG(0,(__location__ " oom for transaction handle\n"));
                return -1;
        }
 
@@ -398,15 +366,14 @@ static int db_ctdb_transaction_start(struct db_context *db)
        return 0;
 }
 
-static bool pull_newest_from_marshall_buffer(struct ctdb_marshall_buffer *buf,
-                                            TDB_DATA key,
-                                            struct ctdb_ltdb_header *pheader,
-                                            TALLOC_CTX *mem_ctx,
-                                            TDB_DATA *pdata)
+static bool parse_newest_in_marshall_buffer(
+       struct ctdb_marshall_buffer *buf, TDB_DATA key,
+       void (*parser)(TDB_DATA key, struct ctdb_ltdb_header *header,
+                      TDB_DATA data, void *private_data),
+       void *private_data)
 {
        struct ctdb_rec_data *rec = NULL;
-       struct ctdb_ltdb_header h;
-       bool found = false;
+       struct ctdb_ltdb_header *h = NULL;
        TDB_DATA data;
        int i;
 
@@ -414,9 +381,6 @@ static bool pull_newest_from_marshall_buffer(struct ctdb_marshall_buffer *buf,
                return false;
        }
 
-       ZERO_STRUCT(h);
-       ZERO_STRUCT(data);
-
        /*
         * Walk the list of records written during this
         * transaction. If we want to read one we have already
@@ -426,109 +390,77 @@ static bool pull_newest_from_marshall_buffer(struct ctdb_marshall_buffer *buf,
         */
 
        for (i=0; i<buf->count; i++) {
-               TDB_DATA tkey, tdata;
+               TDB_DATA tkey;
                uint32_t reqid;
-               struct ctdb_ltdb_header hdr;
 
-               ZERO_STRUCT(hdr);
-
-               rec = db_ctdb_marshall_loop_next(buf, rec, &reqid, &hdr, &tkey,
-                                                &tdata);
+               rec = db_ctdb_marshall_loop_next_key(buf, rec, &tkey);
                if (rec == NULL) {
                        return false;
                }
 
-               if (tdb_data_equal(key, tkey)) {
-                       found = true;
-                       data = tdata;
-                       h = hdr;
+               if (!tdb_data_equal(key, tkey)) {
+                       continue;
                }
-       }
-
-       if (!found) {
-               return false;
-       }
 
-       if (pdata != NULL) {
-               data.dptr = (uint8_t *)talloc_memdup(mem_ctx, data.dptr,
-                                                    data.dsize);
-               if ((data.dsize != 0) && (data.dptr == NULL)) {
+               if (!db_ctdb_marshall_buf_parse(rec, &reqid, &h, &data)) {
                        return false;
                }
-               *pdata = data;
        }
 
-       if (pheader != NULL) {
-               *pheader = h;
+       if (h == NULL) {
+               return false;
        }
 
+       parser(key, h, data, private_data);
+
        return true;
 }
 
-/*
-  fetch a record inside a transaction
- */
-static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db, 
-                                    TALLOC_CTX *mem_ctx, 
-                                    TDB_DATA key, TDB_DATA *data)
-{
-       struct db_ctdb_transaction_handle *h = db->transaction;
-       NTSTATUS status;
-       bool found;
-
-       found = pull_newest_from_marshall_buffer(h->m_write, key, NULL,
-                                                mem_ctx, data);
-       if (found) {
-               return 0;
-       }
+struct pull_newest_from_marshall_buffer_state {
+       struct ctdb_ltdb_header *pheader;       /* optional header output */
+       TALLOC_CTX *mem_ctx;    /* parent for the copied payload */
+       TDB_DATA *pdata;        /* optional payload output */
+};
 
-       status = db_ctdb_ltdb_fetch(h->ctx, key, NULL, mem_ctx, data);
+static void pull_newest_from_marshall_buffer_parser(
+       TDB_DATA key, struct ctdb_ltdb_header *header,
+       TDB_DATA data, void *private_data)
+{
+       struct pull_newest_from_marshall_buffer_state *state =
+               (struct pull_newest_from_marshall_buffer_state *)private_data;
 
-       if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
-               *data = tdb_null;
-       } else if (!NT_STATUS_IS_OK(status)) {
-               return -1;
+       if (state->pheader != NULL) {   /* header output is optional */
+               memcpy(state->pheader, header, sizeof(*state->pheader));
        }
-
-       h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key,
-                                       NULL, *data);
-       if (h->m_all == NULL) {
-               DEBUG(0,(__location__ " Failed to add to marshalling "
-                        "record\n"));
-               data->dsize = 0;
-               talloc_free(data->dptr);
-               return -1;
+       if (state->pdata != NULL) {     /* payload output is optional */
+               state->pdata->dsize = data.dsize;
+               state->pdata->dptr = (uint8_t *)talloc_memdup(
+                       state->mem_ctx, data.dptr, data.dsize); /* result not checked here */
        }
-
-       return 0;
 }
 
-/**
- * Fetch a record from a persistent database
- * without record locking and without an active transaction.
- *
- * This just fetches from the local database copy.
- * Since the databases are kept in syc cluster-wide,
- * there is no point in doing a ctdb call to fetch the
- * record from the lmaster. It does even harm since migration
- * of records bump their RSN and hence render the persistent
- * database inconsistent.
- */
-static int db_ctdb_fetch_persistent(struct db_ctdb_ctx *db,
-                                   TALLOC_CTX *mem_ctx,
-                                   TDB_DATA key, TDB_DATA *data)
+static bool pull_newest_from_marshall_buffer(struct ctdb_marshall_buffer *buf,
+                                            TDB_DATA key,
+                                            struct ctdb_ltdb_header *pheader,
+                                            TALLOC_CTX *mem_ctx,
+                                            TDB_DATA *pdata) /* pheader and pdata may be NULL */
 {
-       NTSTATUS status;
+       struct pull_newest_from_marshall_buffer_state state;
 
-       status = db_ctdb_ltdb_fetch(db, key, NULL, mem_ctx, data);
+       state.pheader = pheader;
+       state.mem_ctx = mem_ctx;
+       state.pdata = pdata;
 
-       if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
-               *data = tdb_null;
-       } else if (!NT_STATUS_IS_OK(status)) {
-               return -1;
+       if (!parse_newest_in_marshall_buffer(
+                   buf, key, pull_newest_from_marshall_buffer_parser,
+                   &state)) {
+               return false;   /* no usable record for key in buf */
        }
-
-       return 0;
+       if ((pdata != NULL) && (pdata->dsize != 0) && (pdata->dptr == NULL)) {
+               /* ENOMEM: talloc_memdup() in the parser returned NULL */
+               return false;
+       }
+       return true;
 }
 
 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
@@ -546,10 +478,12 @@ static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ct
                return NULL;
        }
 
+       result->db = ctx->db;
        result->private_data = ctx->transaction;
 
        result->key.dsize = key.dsize;
-       result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
+       result->key.dptr = (uint8_t *)talloc_memdup(result, key.dptr,
+                                                   key.dsize);
        if (result->key.dptr == NULL) {
                DEBUG(0, ("talloc failed\n"));
                TALLOC_FREE(result);
@@ -575,7 +509,7 @@ static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ct
        result->value.dptr = NULL;
 
        if ((result->value.dsize != 0)
-           && !(result->value.dptr = (uint8 *)talloc_memdup(
+           && !(result->value.dptr = (uint8_t *)talloc_memdup(
                         result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
                         result->value.dsize))) {
                DEBUG(0, ("talloc failed\n"));
@@ -616,7 +550,7 @@ static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx
 
        rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
        if (rec == NULL) {
-               ctx->db->transaction_cancel(ctx->db);           
+               ctx->db->transaction_cancel(ctx->db);
                return NULL;
        }
 
@@ -676,15 +610,6 @@ static NTSTATUS db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
        header.dmaster = get_my_vnn();
        header.rsn++;
 
-       h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key,
-                                       NULL, data);
-       if (h->m_all == NULL) {
-               DEBUG(0,(__location__ " Failed to add to marshalling "
-                        "record\n"));
-               talloc_free(tmp_ctx);
-               return NT_STATUS_NO_MEMORY;
-       }
-
        h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
        if (h->m_write == NULL) {
                DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
@@ -710,7 +635,7 @@ static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data,
        return status;
 }
 
-/* 
+/*
    a record delete inside a transaction
  */
 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
@@ -723,6 +648,19 @@ static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
        return status;
 }
 
+static void db_ctdb_fetch_db_seqnum_parser(
+       TDB_DATA key, struct ctdb_ltdb_header *header,
+       TDB_DATA data, void *private_data)
+{
+       uint64_t *seqnum = (uint64_t *)private_data; /* result slot */
+
+       if (data.dsize != sizeof(uint64_t)) {
+               *seqnum = 0;    /* payload is not exactly 64 bits: report 0 */
+               return;
+       }
+       memcpy(seqnum, data.dptr, sizeof(*seqnum)); /* copy without a pointer cast */
+}
+
 /**
  * Fetch the db sequence number of a persistent db directly from the db.
  */
@@ -730,36 +668,24 @@ static NTSTATUS db_ctdb_fetch_db_seqnum_from_db(struct db_ctdb_ctx *db,
                                                uint64_t *seqnum)
 {
        NTSTATUS status;
-       const char *keyname = CTDB_DB_SEQNUM_KEY;
        TDB_DATA key;
-       TDB_DATA data;
-       struct ctdb_ltdb_header header;
-       TALLOC_CTX *mem_ctx = talloc_stackframe();
 
        if (seqnum == NULL) {
                return NT_STATUS_INVALID_PARAMETER;
        }
 
-       key = string_term_tdb_data(keyname);
-
-       status = db_ctdb_ltdb_fetch(db, key, &header, mem_ctx, &data);
-       if (!NT_STATUS_IS_OK(status) &&
-           !NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND))
-       {
-               goto done;
-       }
+       key = string_term_tdb_data(CTDB_DB_SEQNUM_KEY);
 
-       status = NT_STATUS_OK;
+       status = db_ctdb_ltdb_parse(
+               db, key, db_ctdb_fetch_db_seqnum_parser, seqnum);
 
-       if (data.dsize != sizeof(uint64_t)) {
+       if (NT_STATUS_IS_OK(status)) {
+               return NT_STATUS_OK;
+       }
+       if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
                *seqnum = 0;
-               goto done;
+               return NT_STATUS_OK;
        }
-
-       *seqnum = *(uint64_t *)data.dptr;
-
-done:
-       TALLOC_FREE(mem_ctx);
        return status;
 }
 
@@ -876,7 +802,8 @@ again:
                if (new_seqnum == old_seqnum) {
                        /* Recovery prevented all our changes: retry. */
                        goto again;
-               } else if (new_seqnum != (old_seqnum + 1)) {
+               }
+               if (new_seqnum != (old_seqnum + 1)) {
                        DEBUG(0, (__location__ " ERROR: new_seqnum[%lu] != "
                                  "old_seqnum[%lu] + (0 or 1) after failed "
                                  "TRANS3_COMMIT - this should not happen!\n",
@@ -940,7 +867,6 @@ static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
 
 
 
-#ifdef HAVE_CTDB_CONTROL_SCHEDULE_FOR_DELETION_DECL
 static NTSTATUS db_ctdb_send_schedule_for_deletion(struct db_record *rec)
 {
        NTSTATUS status;
@@ -984,11 +910,9 @@ static NTSTATUS db_ctdb_send_schedule_for_deletion(struct db_record *rec)
 
        return status;
 }
-#endif
 
 static NTSTATUS db_ctdb_delete(struct db_record *rec)
 {
-       TDB_DATA data;
        NTSTATUS status;
 
        /*
@@ -996,17 +920,12 @@ static NTSTATUS db_ctdb_delete(struct db_record *rec)
         * tdb-level cleanup
         */
 
-       ZERO_STRUCT(data);
-
-       status = db_ctdb_store(rec, data, 0);
+       status = db_ctdb_store(rec, tdb_null, 0);
        if (!NT_STATUS_IS_OK(status)) {
                return status;
        }
 
-#ifdef HAVE_CTDB_CONTROL_SCHEDULE_FOR_DELETION_DECL
        status = db_ctdb_send_schedule_for_deletion(rec);
-#endif
-
        return status;
 }
 
@@ -1015,6 +934,9 @@ static int db_ctdb_record_destr(struct db_record* data)
        struct db_ctdb_rec *crec = talloc_get_type_abort(
                data->private_data, struct db_ctdb_rec);
        int threshold;
+       int ret;
+       struct timeval before;
+       double timediff;
 
        DEBUG(10, (DEBUGLEVEL > 10
                   ? "Unlocking db %u key %s\n"
@@ -1023,28 +945,97 @@ static int db_ctdb_record_destr(struct db_record* data)
                   hex_encode_talloc(data, (unsigned char *)data->key.dptr,
                              data->key.dsize)));
 
-       tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key);
+       before = timeval_current();
+
+       ret = tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key);
+
+       timediff = timeval_elapsed(&before);
+       timediff *= 1000;       /* convert seconds to milliseconds */
 
-       threshold = lp_ctdb_locktime_warn_threshold();
+       if (timediff > crec->ctdb_ctx->warn_unlock_msecs) {
+               char *key;
+               key = hex_encode_talloc(talloc_tos(),
+                                       (unsigned char *)data->key.dptr,
+                                       data->key.dsize);
+               DEBUG(0, ("tdb_chainunlock on db %s, key %s took %f milliseconds\n",
+                         tdb_name(crec->ctdb_ctx->wtdb->tdb), key,
+                         timediff));
+               TALLOC_FREE(key);
+       }
+
+       if (ret != 0) {
+               DEBUG(0, ("tdb_chainunlock failed\n"));
+               return -1;
+       }
+
+       threshold = crec->ctdb_ctx->warn_locktime_msecs;
        if (threshold != 0) {
-               double timediff = timeval_elapsed(&crec->lock_time);
-               if ((timediff * 1000) > threshold) {
-                       DEBUG(0, ("Held tdb lock %f seconds\n", timediff));
+               timediff = timeval_elapsed(&crec->lock_time) * 1000;
+               if (timediff > threshold) {
+                       const char *key;
+
+                       key = hex_encode_talloc(data,
+                                               (unsigned char *)data->key.dptr,
+                                               data->key.dsize);
+                       DEBUG(0, ("Held tdb lock on db %s, key %s "
+                                 "%f milliseconds\n",
+                                 tdb_name(crec->ctdb_ctx->wtdb->tdb),
+                                 key, timediff));
                }
        }
 
        return 0;
 }
 
+/**
+ * Check whether the local record copy described by hdr may be used,
+ * for reading (read_only == true) or for writing (read_only == false).
+ */
+static bool db_ctdb_can_use_local_hdr(const struct ctdb_ltdb_header *hdr,
+                                     bool read_only)
+{
+       if (hdr->dmaster != get_my_vnn()) {
+               /* Not dmaster: usable for reads only, and only as r/o copy. */
+               return read_only && (hdr->flags & CTDB_REC_RO_HAVE_READONLY);
+       }
+
+       /*
+        * We are dmaster: reads are fine; writes need no r/o delegations.
+        */
+       return read_only || !(hdr->flags & CTDB_REC_RO_HAVE_DELEGATIONS);
+}
+
+static bool db_ctdb_can_use_local_copy(TDB_DATA ctdb_data, bool read_only) /* ctdb_data: header + payload */
+{
+       if (ctdb_data.dptr == NULL) {
+               return false;   /* no record data at all */
+       }
+
+       if (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) {
+               return false;   /* too short to contain a ctdb header */
+       }
+
+       return db_ctdb_can_use_local_hdr(
+               (struct ctdb_ltdb_header *)ctdb_data.dptr, read_only);
+}
+
 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
                                               TALLOC_CTX *mem_ctx,
-                                              TDB_DATA key)
+                                              TDB_DATA key,
+                                              bool tryonly)
 {
        struct db_record *result;
        struct db_ctdb_rec *crec;
        NTSTATUS status;
        TDB_DATA ctdb_data;
-       int migrate_attempts = 0;
+       int migrate_attempts;
+       struct timeval migrate_start;
+       struct timeval chainlock_start;
+       struct timeval ctdb_start_time;
+       double chainlock_time = 0;
+       double ctdb_time = 0;
+       int duration_msecs;
+       int lockret;
 
        if (!(result = talloc(mem_ctx, struct db_record))) {
                DEBUG(0, ("talloc failed\n"));
@@ -1057,17 +1048,22 @@ static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
                return NULL;
        }
 
+       result->db = ctx->db;
        result->private_data = (void *)crec;
        crec->ctdb_ctx = ctx;
 
        result->key.dsize = key.dsize;
-       result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
+       result->key.dptr = (uint8_t *)talloc_memdup(result, key.dptr,
+                                                   key.dsize);
        if (result->key.dptr == NULL) {
                DEBUG(0, ("talloc failed\n"));
                TALLOC_FREE(result);
                return NULL;
        }
 
+       migrate_attempts = 0;
+       GetTimeOfDay(&migrate_start);
+
        /*
         * Do a blocking lock on the record
         */
@@ -1082,7 +1078,13 @@ again:
                TALLOC_FREE(keystr);
        }
 
-       if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
+       GetTimeOfDay(&chainlock_start);
+       lockret = tryonly
+               ? tdb_chainlock_nonblock(ctx->wtdb->tdb, key)
+               : tdb_chainlock(ctx->wtdb->tdb, key);
+       chainlock_time += timeval_elapsed(&chainlock_start);
+
+       if (lockret != 0) {
                DEBUG(3, ("tdb_chainlock failed\n"));
                TALLOC_FREE(result);
                return NULL;
@@ -1099,26 +1101,31 @@ again:
         * take the shortcut and just return it.
         */
 
-       if ((ctdb_data.dptr == NULL) ||
-           (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
-           ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
-#if 0
-           || (random() % 2 != 0)
-#endif
-) {
+       if (!db_ctdb_can_use_local_copy(ctdb_data, false)) {
                SAFE_FREE(ctdb_data.dptr);
                tdb_chainunlock(ctx->wtdb->tdb, key);
                talloc_set_destructor(result, NULL);
 
+               if (tryonly && (migrate_attempts != 0)) {
+                       DEBUG(5, ("record migrated away again\n"));
+                       TALLOC_FREE(result);
+                       return NULL;
+               }
+
                migrate_attempts += 1;
 
-               DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
+               DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u) %u\n",
                           ctdb_data.dptr, ctdb_data.dptr ?
                           ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
-                          get_my_vnn()));
+                          get_my_vnn(),
+                          ctdb_data.dptr ?
+                          ((struct ctdb_ltdb_header *)ctdb_data.dptr)->flags : 0));
 
+               GetTimeOfDay(&ctdb_start_time);
                status = ctdbd_migrate(messaging_ctdbd_connection(), ctx->db_id,
                                       key);
+               ctdb_time += timeval_elapsed(&ctdb_start_time);
+
                if (!NT_STATUS_IS_OK(status)) {
                        DEBUG(5, ("ctdb_migrate failed: %s\n",
                                  nt_errstr(status)));
@@ -1129,9 +1136,38 @@ again:
                goto again;
        }
 
-       if (migrate_attempts > 10) {
-               DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
-                         migrate_attempts));
+       {
+               double duration;
+               duration = timeval_elapsed(&migrate_start);
+
+               /*
+                * Convert the duration to milliseconds to avoid a
+                * floating-point division of
+                * lp_parm_int("migrate_duration") by 1000.
+                */
+               duration_msecs = duration * 1000;
+       }
+
+       if ((migrate_attempts > ctx->warn_migrate_attempts) ||
+           (duration_msecs > ctx->warn_migrate_msecs)) {
+               int chain = 0;
+
+               if (tdb_get_flags(ctx->wtdb->tdb) & TDB_INCOMPATIBLE_HASH) {
+                       chain = tdb_jenkins_hash(&key) %
+                               tdb_hash_size(ctx->wtdb->tdb);
+               }
+
+               DEBUG(0, ("db_ctdb_fetch_locked for %s key %s, chain %d "
+                         "needed %d attempts, %d milliseconds, "
+                         "chainlock: %f ms, CTDB %f ms\n",
+                         tdb_name(ctx->wtdb->tdb),
+                         hex_encode_talloc(talloc_tos(),
+                                           (unsigned char *)key.dptr,
+                                           key.dsize),
+                         chain,
+                         migrate_attempts, duration_msecs,
+                         chainlock_time * 1000.0,
+                         ctdb_time * 1000.0));
        }
 
        GetTimeOfDay(&crec->lock_time);
@@ -1142,7 +1178,7 @@ again:
        result->value.dptr = NULL;
 
        if ((result->value.dsize != 0)
-           && !(result->value.dptr = (uint8 *)talloc_memdup(
+           && !(result->value.dptr = (uint8_t *)talloc_memdup(
                         result, ctdb_data.dptr + sizeof(crec->header),
                         result->value.dsize))) {
                DEBUG(0, ("talloc failed\n"));
@@ -1169,78 +1205,119 @@ static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
                return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
        }
 
-       return fetch_locked_internal(ctx, mem_ctx, key);
+       return fetch_locked_internal(ctx, mem_ctx, key, false);
 }
 
-/*
-  fetch (unlocked, no migration) operation on ctdb
- */
-static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
-                        TDB_DATA key, TDB_DATA *data)
+static struct db_record *db_ctdb_try_fetch_locked(struct db_context *db,
+                                                 TALLOC_CTX *mem_ctx,
+                                                 TDB_DATA key)
 {
        struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
                                                        struct db_ctdb_ctx);
-       NTSTATUS status;
-       TDB_DATA ctdb_data;
 
-       if (ctx->transaction) {
-               return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
+       if (ctx->transaction != NULL) {
+               return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
        }
 
        if (db->persistent) {
-               return db_ctdb_fetch_persistent(ctx, mem_ctx, key, data);
+               return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
        }
 
-       /* try a direct fetch */
-       ctdb_data = tdb_fetch_compat(ctx->wtdb->tdb, key);
+       return fetch_locked_internal(ctx, mem_ctx, key, true);
+}
 
-       /*
-        * See if we have a valid record and we are the dmaster. If so, we can
-        * take the shortcut and just return it.
-        * we bypass the dmaster check for persistent databases
-        */
-       if ((ctdb_data.dptr != NULL) &&
-           (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
-           ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())
-       {
-               /* we are the dmaster - avoid the ctdb protocol op */
+struct db_ctdb_parse_record_state {
+       void (*parser)(TDB_DATA key, TDB_DATA data, void *private_data);
+       void *private_data;
+       bool ask_for_readonly_copy;
+       bool done;
+};
 
-               data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
-               if (data->dsize == 0) {
-                       SAFE_FREE(ctdb_data.dptr);
-                       data->dptr = NULL;
-                       return 0;
-               }
+static void db_ctdb_parse_record_parser(
+       TDB_DATA key, struct ctdb_ltdb_header *header,
+       TDB_DATA data, void *private_data)
+{
+       struct db_ctdb_parse_record_state *state =
+               (struct db_ctdb_parse_record_state *)private_data;
+       state->parser(key, data, state->private_data);
+}
 
-               data->dptr = (uint8 *)talloc_memdup(
-                       mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
-                       data->dsize);
+static void db_ctdb_parse_record_parser_nonpersistent(
+       TDB_DATA key, struct ctdb_ltdb_header *header,
+       TDB_DATA data, void *private_data)
+{
+       struct db_ctdb_parse_record_state *state =
+               (struct db_ctdb_parse_record_state *)private_data;
 
-               SAFE_FREE(ctdb_data.dptr);
+       if (db_ctdb_can_use_local_hdr(header, true)) {
+               state->parser(key, data, state->private_data);
+               state->done = true;
+       } else {
+               /*
+                * We found something in the db, so it seems that this record,
+                * while not usable locally right now, is popular. Ask for a
+                * read-only (R/O) copy.
+                */
+               state->ask_for_readonly_copy = true;
+       }
+}
 
-               if (data->dptr == NULL) {
-                       return -1;
+static NTSTATUS db_ctdb_parse_record(struct db_context *db, TDB_DATA key,
+                                    void (*parser)(TDB_DATA key,
+                                                   TDB_DATA data,
+                                                   void *private_data),
+                                    void *private_data)
+{
+       struct db_ctdb_ctx *ctx = talloc_get_type_abort(
+               db->private_data, struct db_ctdb_ctx);
+       struct db_ctdb_parse_record_state state;
+       NTSTATUS status;
+
+       state.parser = parser;
+       state.private_data = private_data;
+
+       if (ctx->transaction != NULL) {
+               struct db_ctdb_transaction_handle *h = ctx->transaction;
+               bool found;
+
+               /*
+                * Transactions only happen for persistent databases.
+                */
+
+               found = parse_newest_in_marshall_buffer(
+                       h->m_write, key, db_ctdb_parse_record_parser, &state);
+
+               if (found) {
+                       return NT_STATUS_OK;
                }
-               return 0;
        }
 
-       SAFE_FREE(ctdb_data.dptr);
+       if (db->persistent) {
+               /*
+                * Persistent db, but not found in the transaction buffer
+                */
+               return db_ctdb_ltdb_parse(
+                       ctx, key, db_ctdb_parse_record_parser, &state);
+       }
 
-       /* we weren't able to get it locally - ask ctdb to fetch it for us */
-       status = ctdbd_fetch(messaging_ctdbd_connection(), ctx->db_id, key,
-                            mem_ctx, data);
-       if (!NT_STATUS_IS_OK(status)) {
-               DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
-               return -1;
+       state.done = false;
+       state.ask_for_readonly_copy = false;
+
+       status = db_ctdb_ltdb_parse(
+               ctx, key, db_ctdb_parse_record_parser_nonpersistent, &state);
+       if (NT_STATUS_IS_OK(status) && state.done) {
+               return NT_STATUS_OK;
        }
 
-       return 0;
+       return ctdbd_parse(messaging_ctdbd_connection(), ctx->db_id, key,
+                          state.ask_for_readonly_copy, parser, private_data);
 }
 
 struct traverse_state {
        struct db_context *db;
        int (*fn)(struct db_record *rec, void *private_data);
        void *private_data;
+       int count;
 };
 
 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
@@ -1269,7 +1346,7 @@ static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DAT
         * This is used for persistent transactions internally.
         */
        if (kbuf.dsize == strlen(CTDB_DB_SEQNUM_KEY) + 1 &&
-           strcmp((const char*)kbuf.dptr, CTDB_DB_SEQNUM_KEY))
+           strcmp((const char*)kbuf.dptr, CTDB_DB_SEQNUM_KEY) == 0)
        {
                goto done;
        }
@@ -1297,6 +1374,7 @@ static int db_ctdb_traverse(struct db_context *db,
                                      void *private_data),
                            void *private_data)
 {
+       NTSTATUS status;
         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
                                                         struct db_ctdb_ctx);
        struct traverse_state state;
@@ -1304,6 +1382,7 @@ static int db_ctdb_traverse(struct db_context *db,
        state.db = db;
        state.fn = fn;
        state.private_data = private_data;
+       state.count = 0;
 
        if (db->persistent) {
                struct tdb_context *ltdb = ctx->wtdb->tdb;
@@ -1316,17 +1395,24 @@ static int db_ctdb_traverse(struct db_context *db,
                        return ret;
                }
                if (ctx->transaction && ctx->transaction->m_write) {
-                       /* we now have to handle keys not yet present at transaction start */
+                       /*
+                        * we now have to handle keys not yet
+                        * present at transaction start
+                        */
                        struct db_context *newkeys = db_open_rbt(talloc_tos());
                        struct ctdb_marshall_buffer *mbuf = ctx->transaction->m_write;
                        struct ctdb_rec_data *rec=NULL;
-                       NTSTATUS status;
                        int i;
+                       int count = 0;
+
+                       if (newkeys == NULL) {
+                               return -1;
+                       }
+
                        for (i=0; i<mbuf->count; i++) {
                                TDB_DATA key;
-                               rec =db_ctdb_marshall_loop_next(mbuf, rec,
-                                                               NULL, NULL,
-                                                               &key, NULL);
+                               rec = db_ctdb_marshall_loop_next_key(
+                                       mbuf, rec, &key);
                                SMB_ASSERT(rec != NULL);
 
                                if (!tdb_exists(ltdb, key)) {
@@ -1335,16 +1421,22 @@ static int db_ctdb_traverse(struct db_context *db,
                        }
                        status = dbwrap_traverse(newkeys,
                                                 traverse_persistent_callback_dbwrap,
-                                                &state);
-                       ret = NT_STATUS_IS_OK(status) ? 0 : -1;
+                                                &state,
+                                                &count);
                        talloc_free(newkeys);
+                       if (!NT_STATUS_IS_OK(status)) {
+                               return -1;
+                       }
+                       ret += count;
                }
                return ret;
        }
 
-
-       ctdbd_traverse(ctx->db_id, traverse_callback, &state);
-       return 0;
+       status = ctdbd_traverse(ctx->db_id, traverse_callback, &state);
+       if (!NT_STATUS_IS_OK(status)) {
+               return -1;
+       }
+       return state.count;
 }
 
 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
@@ -1361,12 +1453,16 @@ static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_da
 {
        struct traverse_state *state = (struct traverse_state *)private_data;
        struct db_record rec;
+
+       ZERO_STRUCT(rec);
+       rec.db = state->db;
        rec.key = key;
        rec.value = data;
        rec.store = db_ctdb_store_deny;
        rec.delete_rec = db_ctdb_delete_deny;
-       rec.private_data = state->db;
+       rec.private_data = NULL;
        state->fn(&rec, state->private_data);
+       state->count++;
 }
 
 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
@@ -1380,17 +1476,18 @@ static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TD
         * This is used for persistent transactions internally.
         */
        if (kbuf.dsize == strlen(CTDB_DB_SEQNUM_KEY) + 1 &&
-           strncmp((const char*)kbuf.dptr, CTDB_DB_SEQNUM_KEY,
-                   strlen(CTDB_DB_SEQNUM_KEY)) == 0)
+           strcmp((const char*)kbuf.dptr, CTDB_DB_SEQNUM_KEY) == 0)
        {
                return 0;
        }
 
+       ZERO_STRUCT(rec);
+       rec.db = state->db;
        rec.key = kbuf;
        rec.value = dbuf;
        rec.store = db_ctdb_store_deny;
        rec.delete_rec = db_ctdb_delete_deny;
-       rec.private_data = state->db;
+       rec.private_data = NULL;
 
        if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
                /* a deleted record */
@@ -1399,6 +1496,7 @@ static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TD
        rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
        rec.value.dptr += sizeof(struct ctdb_ltdb_header);
 
+       state->count++;
        return state->fn(&rec, state->private_data);
 }
 
@@ -1407,6 +1505,7 @@ static int db_ctdb_traverse_read(struct db_context *db,
                                           void *private_data),
                                 void *private_data)
 {
+       NTSTATUS status;
         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
                                                         struct db_ctdb_ctx);
        struct traverse_state state;
@@ -1414,6 +1513,7 @@ static int db_ctdb_traverse_read(struct db_context *db,
        state.db = db;
        state.fn = fn;
        state.private_data = private_data;
+       state.count = 0;
 
        if (db->persistent) {
                /* for persistent databases we don't need to do a ctdb traverse,
@@ -1421,8 +1521,11 @@ static int db_ctdb_traverse_read(struct db_context *db,
                return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
        }
 
-       ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
-       return 0;
+       status = ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
+       if (!NT_STATUS_IS_OK(status)) {
+               return -1;
+       }
+       return state.count;
 }
 
 static int db_ctdb_get_seqnum(struct db_context *db)
@@ -1432,22 +1535,31 @@ static int db_ctdb_get_seqnum(struct db_context *db)
        return tdb_get_seqnum(ctx->wtdb->tdb);
 }
 
-static int db_ctdb_get_flags(struct db_context *db)
+static void db_ctdb_id(struct db_context *db, const uint8_t **id,
+                      size_t *idlen)
 {
-        struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
-                                                        struct db_ctdb_ctx);
-       return tdb_get_flags(ctx->wtdb->tdb);
+       struct db_ctdb_ctx *ctx = talloc_get_type_abort(
+               db->private_data, struct db_ctdb_ctx);
+
+       *id = (uint8_t *)&ctx->db_id;
+       *idlen = sizeof(ctx->db_id);
 }
 
 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
                                const char *name,
                                int hash_size, int tdb_flags,
-                               int open_flags, mode_t mode)
+                               int open_flags, mode_t mode,
+                               enum dbwrap_lock_order lock_order,
+                               uint64_t dbwrap_flags)
 {
        struct db_context *result;
        struct db_ctdb_ctx *db_ctdb;
        char *db_path;
        struct ctdbd_connection *conn;
+       struct loadparm_context *lp_ctx;
+       struct ctdb_db_priority prio;
+       NTSTATUS status;
+       int cstatus;
 
        if (!lp_clustering()) {
                DEBUG(10, ("Clustering disabled -- no ctdb\n"));
@@ -1466,6 +1578,13 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
                return NULL;
        }
 
+       result->name = talloc_strdup(result, name);
+       if (result->name == NULL) {
+               DEBUG(0, ("talloc failed\n"));
+               TALLOC_FREE(result);
+               return NULL;
+       }
+
        db_ctdb->transaction = NULL;
        db_ctdb->db = result;
 
@@ -1485,16 +1604,61 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
        db_path = ctdbd_dbpath(conn, db_ctdb, db_ctdb->db_id);
 
        result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
+       result->lock_order = lock_order;
 
        /* only pass through specific flags */
-       tdb_flags &= TDB_SEQNUM;
+       tdb_flags &= TDB_SEQNUM|TDB_VOLATILE|
+               TDB_MUTEX_LOCKING|TDB_CLEAR_IF_FIRST;
 
        /* honor permissions if user has specified O_CREAT */
        if (open_flags & O_CREAT) {
                chmod(db_path, mode);
        }
 
-       db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
+       prio.db_id = db_ctdb->db_id;
+       prio.priority = lock_order;
+
+       status = ctdbd_control_local(
+               conn, CTDB_CONTROL_SET_DB_PRIORITY, 0, 0,
+               make_tdb_data((uint8_t *)&prio, sizeof(prio)),
+               NULL, NULL, &cstatus);
+
+       if (!NT_STATUS_IS_OK(status) || (cstatus != 0)) {
+               DEBUG(1, ("CTDB_CONTROL_SET_DB_PRIORITY failed: %s, %d\n",
+                         nt_errstr(status), cstatus));
+               TALLOC_FREE(result);
+               return NULL;
+       }
+
+       if (!result->persistent &&
+           (dbwrap_flags & DBWRAP_FLAG_OPTIMIZE_READONLY_ACCESS))
+       {
+               TDB_DATA indata;
+
+               indata = make_tdb_data((uint8_t *)&db_ctdb->db_id,
+                                      sizeof(db_ctdb->db_id));
+
+               status = ctdbd_control_local(
+                       conn, CTDB_CONTROL_SET_DB_READONLY, 0, 0, indata,
+                       NULL, NULL, &cstatus);
+               if (!NT_STATUS_IS_OK(status) || (cstatus != 0)) {
+                       DEBUG(1, ("CTDB_CONTROL_SET_DB_READONLY failed: "
+                                 "%s, %d\n", nt_errstr(status), cstatus));
+                       TALLOC_FREE(result);
+                       return NULL;
+               }
+       }
+
+       lp_ctx = loadparm_init_s3(db_path, loadparm_s3_helpers());
+
+       if (hash_size == 0) {
+               hash_size = lpcfg_tdb_hash_size(lp_ctx, db_path);
+       }
+
+       db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size,
+                                     lpcfg_tdb_flags(lp_ctx, tdb_flags),
+                                     O_RDWR, 0);
+       talloc_unlink(db_path, lp_ctx);
        if (db_ctdb->wtdb == NULL) {
                DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
                TALLOC_FREE(result);
@@ -1512,20 +1676,29 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
                }
        }
 
+       db_ctdb->warn_unlock_msecs = lp_parm_int(-1, "ctdb",
+                                                "unlock_warn_threshold", 5);
+       db_ctdb->warn_migrate_attempts = lp_parm_int(-1, "ctdb",
+                                                    "migrate_attempts", 10);
+       db_ctdb->warn_migrate_msecs = lp_parm_int(-1, "ctdb",
+                                                 "migrate_duration", 5000);
+       db_ctdb->warn_locktime_msecs = lp_ctdb_locktime_warn_threshold();
+
        result->private_data = (void *)db_ctdb;
        result->fetch_locked = db_ctdb_fetch_locked;
-       result->fetch = db_ctdb_fetch;
+       result->try_fetch_locked = db_ctdb_try_fetch_locked;
+       result->parse_record = db_ctdb_parse_record;
        result->traverse = db_ctdb_traverse;
        result->traverse_read = db_ctdb_traverse_read;
        result->get_seqnum = db_ctdb_get_seqnum;
-       result->get_flags = db_ctdb_get_flags;
        result->transaction_start = db_ctdb_transaction_start;
        result->transaction_commit = db_ctdb_transaction_commit;
        result->transaction_cancel = db_ctdb_transaction_cancel;
+       result->id = db_ctdb_id;
+       result->stored_callback = NULL;
 
        DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
                 name, db_ctdb->db_id));
 
        return result;
 }
-#endif