dbwrap_ctdb: Pass on mutex flags to tdb_open
[obnox/samba/samba-obnox.git] / source3 / lib / dbwrap / dbwrap_ctdb.c
index fe2af3609cf36e166d06eca65c68bc2b52e8e8a1..e6dcc0e987c35908e854e97c7a35500f5752a9aa 100644 (file)
@@ -1,4 +1,4 @@
-/* 
+/*
    Unix SMB/CIFS implementation.
    Database interface wrapper around ctdbd
    Copyright (C) Volker Lendecke 2007-2009
 #include "system/filesys.h"
 #include "lib/tdb_wrap/tdb_wrap.h"
 #include "util_tdb.h"
+#include "dbwrap/dbwrap.h"
 #include "dbwrap/dbwrap_ctdb.h"
 #include "dbwrap/dbwrap_rbt.h"
 #include "lib/param/param.h"
 
-#ifdef CLUSTER_SUPPORT
-
 /*
  * It is not possible to include ctdb.h and tdb_compat.h (included via
  * some other include above) without warnings. This fixes those
@@ -69,9 +68,15 @@ struct db_ctdb_transaction_handle {
 struct db_ctdb_ctx {
        struct db_context *db;
        struct tdb_wrap *wtdb;
-       uint32 db_id;
+       uint32_t db_id;
        struct db_ctdb_transaction_handle *transaction;
        struct g_lock_ctx *lock_ctx;
+
+       /* thresholds for warning messages */
+       int warn_unlock_msecs;
+       int warn_migrate_msecs;
+       int warn_migrate_attempts;
+       int warn_locktime_msecs;
 };
 
 struct db_ctdb_rec {
@@ -87,58 +92,48 @@ static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
        return map_nt_error_from_tdb(tret);
 }
 
+struct db_ctdb_ltdb_parse_state {
+       void (*parser)(TDB_DATA key, struct ctdb_ltdb_header *header,
+                      TDB_DATA data, void *private_data);
+       void *private_data;
+};
 
-/**
- * fetch a record from the tdb, separating out the header
- * information and returning the body of the record.
- */
-static NTSTATUS db_ctdb_ltdb_fetch(struct db_ctdb_ctx *db,
-                                  TDB_DATA key,
-                                  struct ctdb_ltdb_header *header,
-                                  TALLOC_CTX *mem_ctx,
-                                  TDB_DATA *data)
+static int db_ctdb_ltdb_parser(TDB_DATA key, TDB_DATA data,
+                              void *private_data)
 {
-       TDB_DATA rec;
-       NTSTATUS status;
+       struct db_ctdb_ltdb_parse_state *state =
+               (struct db_ctdb_ltdb_parse_state *)private_data;
 
-       rec = tdb_fetch_compat(db->wtdb->tdb, key);
-       if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
-               status = NT_STATUS_NOT_FOUND;
-               if (data) {
-                       ZERO_STRUCTP(data);
-               }
-               if (header) {
-                       header->dmaster = (uint32_t)-1;
-                       header->rsn = 0;
-               }
-               goto done;
+       if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
+               return -1;
        }
 
-       if (header) {
-               *header = *(struct ctdb_ltdb_header *)rec.dptr;
-       }
+       state->parser(
+               key, (struct ctdb_ltdb_header *)data.dptr,
+               make_tdb_data(data.dptr + sizeof(struct ctdb_ltdb_header),
+                             data.dsize - sizeof(struct ctdb_ltdb_header)),
+               state->private_data);
+       return 0;
+}
 
-       if (data) {
-               data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
-               if (data->dsize == 0) {
-                       data->dptr = NULL;
-               } else {
-                       data->dptr = (unsigned char *)talloc_memdup(mem_ctx,
-                                       rec.dptr
-                                        + sizeof(struct ctdb_ltdb_header),
-                                       data->dsize);
-                       if (data->dptr == NULL) {
-                               status = NT_STATUS_NO_MEMORY;
-                               goto done;
-                       }
-               }
-       }
+static NTSTATUS db_ctdb_ltdb_parse(
+       struct db_ctdb_ctx *db, TDB_DATA key,
+       void (*parser)(TDB_DATA key, struct ctdb_ltdb_header *header,
+                      TDB_DATA data, void *private_data),
+       void *private_data)
+{
+       struct db_ctdb_ltdb_parse_state state;
+       int ret;
 
-       status = NT_STATUS_OK;
+       state.parser = parser;
+       state.private_data = private_data;
 
-done:
-       SAFE_FREE(rec.dptr);
-       return status;
+       ret = tdb_parse_record(db->wtdb->tdb, key, db_ctdb_ltdb_parser,
+                              &state);
+       if (ret == -1) {
+               return NT_STATUS_NOT_FOUND;
+       }
+       return NT_STATUS_OK;
 }
 
 /*
@@ -150,15 +145,13 @@ static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
                                   struct ctdb_ltdb_header *header,
                                   TDB_DATA data)
 {
-       TALLOC_CTX *tmp_ctx = talloc_stackframe();
        TDB_DATA rec;
        int ret;
 
        rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
-       rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
+       rec.dptr = (uint8_t *)talloc_size(talloc_tos(), rec.dsize);
 
        if (rec.dptr == NULL) {
-               talloc_free(tmp_ctx);
                return NT_STATUS_NO_MEMORY;
        }
 
@@ -167,7 +160,7 @@ static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
 
        ret = tdb_store(db->wtdb->tdb, key, rec, TDB_REPLACE);
 
-       talloc_free(tmp_ctx);
+       talloc_free(rec.dptr);
 
        return (ret == 0) ? NT_STATUS_OK
                          : tdb_error_to_ntstatus(db->wtdb->tdb);
@@ -176,20 +169,17 @@ static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
 
 /*
   form a ctdb_rec_data record from a key/data pair
-
-  note that header may be NULL. If not NULL then it is included in the data portion
-  of the record
  */
-static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,      
-                                                 TDB_DATA key, 
+static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
+                                                 TDB_DATA key,
                                                  struct ctdb_ltdb_header *header,
                                                  TDB_DATA data)
 {
        size_t length;
        struct ctdb_rec_data *d;
 
-       length = offsetof(struct ctdb_rec_data, data) + key.dsize + 
-               data.dsize + (header?sizeof(*header):0);
+       length = offsetof(struct ctdb_rec_data, data) + key.dsize +
+               data.dsize + sizeof(*header);
        d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
        if (d == NULL) {
                return NULL;
@@ -198,20 +188,16 @@ static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32
        d->reqid = reqid;
        d->keylen = key.dsize;
        memcpy(&d->data[0], key.dptr, key.dsize);
-       if (header) {
-               d->datalen = data.dsize + sizeof(*header);
-               memcpy(&d->data[key.dsize], header, sizeof(*header));
-               memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
-       } else {
-               d->datalen = data.dsize;
-               memcpy(&d->data[key.dsize], data.dptr, data.dsize);
-       }
+
+       d->datalen = data.dsize + sizeof(*header);
+       memcpy(&d->data[key.dsize], header, sizeof(*header));
+       memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
        return d;
 }
 
 
 /* helper function for marshalling multiple records */
-static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx, 
+static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
                                               struct ctdb_marshall_buffer *m,
                                               uint64_t db_id,
                                               uint32_t reqid,
@@ -266,16 +252,14 @@ static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
        return data;
 }
 
-/* 
-   loop over a marshalling buffer 
+/*
+   loop over a marshalling buffer
 
      - pass r==NULL to start
      - loop the number of times indicated by m->count
 */
-static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
-                                                    uint32_t *reqid,
-                                                    struct ctdb_ltdb_header *header,
-                                                    TDB_DATA *key, TDB_DATA *data)
+static struct ctdb_rec_data *db_ctdb_marshall_loop_next_key(
+       struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r, TDB_DATA *key)
 {
        if (r == NULL) {
                r = (struct ctdb_rec_data *)&m->data[0];
@@ -283,31 +267,27 @@ static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buf
                r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
        }
 
-       if (reqid != NULL) {
-               *reqid = r->reqid;
-       }
+       key->dptr   = &r->data[0];
+       key->dsize  = r->keylen;
+       return r;
+}
 
-       if (key != NULL) {
-               key->dptr   = &r->data[0];
-               key->dsize  = r->keylen;
-       }
-       if (data != NULL) {
-               data->dptr  = &r->data[r->keylen];
-               data->dsize = r->datalen;
-               if (header != NULL) {
-                       data->dptr += sizeof(*header);
-                       data->dsize -= sizeof(*header);
-               }
+static bool db_ctdb_marshall_buf_parse(
+       struct ctdb_rec_data *r, uint32_t *reqid,
+       struct ctdb_ltdb_header **header, TDB_DATA *data)
+{
+       if (r->datalen < sizeof(struct ctdb_ltdb_header)) {
+               return false;
        }
 
-       if (header != NULL) {
-               if (r->datalen < sizeof(*header)) {
-                       return NULL;
-               }
-               *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
-       }
+       *reqid = r->reqid;
 
-       return r;
+       data->dptr  = &r->data[r->keylen] + sizeof(struct ctdb_ltdb_header);
+       data->dsize = r->datalen - sizeof(struct ctdb_ltdb_header);
+
+       *header = (struct ctdb_ltdb_header *)&r->data[r->keylen];
+
+       return true;
 }
 
 /**
@@ -352,7 +332,7 @@ static int db_ctdb_transaction_start(struct db_context *db)
 
        h = talloc_zero(db, struct db_ctdb_transaction_handle);
        if (h == NULL) {
-               DEBUG(0,(__location__ " oom for transaction handle\n"));                
+               DEBUG(0,(__location__ " oom for transaction handle\n"));
                return -1;
        }
 
@@ -386,15 +366,14 @@ static int db_ctdb_transaction_start(struct db_context *db)
        return 0;
 }
 
-static bool pull_newest_from_marshall_buffer(struct ctdb_marshall_buffer *buf,
-                                            TDB_DATA key,
-                                            struct ctdb_ltdb_header *pheader,
-                                            TALLOC_CTX *mem_ctx,
-                                            TDB_DATA *pdata)
+static bool parse_newest_in_marshall_buffer(
+       struct ctdb_marshall_buffer *buf, TDB_DATA key,
+       void (*parser)(TDB_DATA key, struct ctdb_ltdb_header *header,
+                      TDB_DATA data, void *private_data),
+       void *private_data)
 {
        struct ctdb_rec_data *rec = NULL;
-       struct ctdb_ltdb_header h;
-       bool found = false;
+       struct ctdb_ltdb_header *h = NULL;
        TDB_DATA data;
        int i;
 
@@ -402,9 +381,6 @@ static bool pull_newest_from_marshall_buffer(struct ctdb_marshall_buffer *buf,
                return false;
        }
 
-       ZERO_STRUCT(h);
-       ZERO_STRUCT(data);
-
        /*
         * Walk the list of records written during this
         * transaction. If we want to read one we have already
@@ -414,95 +390,77 @@ static bool pull_newest_from_marshall_buffer(struct ctdb_marshall_buffer *buf,
         */
 
        for (i=0; i<buf->count; i++) {
-               TDB_DATA tkey, tdata;
+               TDB_DATA tkey;
                uint32_t reqid;
-               struct ctdb_ltdb_header hdr;
-
-               ZERO_STRUCT(hdr);
 
-               rec = db_ctdb_marshall_loop_next(buf, rec, &reqid, &hdr, &tkey,
-                                                &tdata);
+               rec = db_ctdb_marshall_loop_next_key(buf, rec, &tkey);
                if (rec == NULL) {
                        return false;
                }
 
-               if (tdb_data_equal(key, tkey)) {
-                       found = true;
-                       data = tdata;
-                       h = hdr;
+               if (!tdb_data_equal(key, tkey)) {
+                       continue;
                }
-       }
-
-       if (!found) {
-               return false;
-       }
 
-       if (pdata != NULL) {
-               data.dptr = (uint8_t *)talloc_memdup(mem_ctx, data.dptr,
-                                                    data.dsize);
-               if ((data.dsize != 0) && (data.dptr == NULL)) {
+               if (!db_ctdb_marshall_buf_parse(rec, &reqid, &h, &data)) {
                        return false;
                }
-               *pdata = data;
        }
 
-       if (pheader != NULL) {
-               *pheader = h;
+       if (h == NULL) {
+               return false;
        }
 
+       parser(key, h, data, private_data);
+
        return true;
 }
 
-/*
-  fetch a record inside a transaction
- */
-static NTSTATUS db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
-                                         TALLOC_CTX *mem_ctx,
-                                         TDB_DATA key, TDB_DATA *data)
+struct pull_newest_from_marshall_buffer_state {
+       struct ctdb_ltdb_header *pheader;
+       TALLOC_CTX *mem_ctx;
+       TDB_DATA *pdata;
+};
+
+static void pull_newest_from_marshall_buffer_parser(
+       TDB_DATA key, struct ctdb_ltdb_header *header,
+       TDB_DATA data, void *private_data)
 {
-       struct db_ctdb_transaction_handle *h = db->transaction;
-       NTSTATUS status;
-       bool found;
+       struct pull_newest_from_marshall_buffer_state *state =
+               (struct pull_newest_from_marshall_buffer_state *)private_data;
 
-       found = pull_newest_from_marshall_buffer(h->m_write, key, NULL,
-                                                mem_ctx, data);
-       if (found) {
-               return NT_STATUS_OK;
+       if (state->pheader != NULL) {
+               memcpy(state->pheader, header, sizeof(*state->pheader));
        }
-
-       status = db_ctdb_ltdb_fetch(h->ctx, key, NULL, mem_ctx, data);
-
-       if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
-               *data = tdb_null;
+       if (state->pdata != NULL) {
+               state->pdata->dsize = data.dsize;
+               state->pdata->dptr = (uint8_t *)talloc_memdup(
+                       state->mem_ctx, data.dptr, data.dsize);
        }
-
-       return status;
 }
 
-/**
- * Fetch a record from a persistent database
- * without record locking and without an active transaction.
- *
- * This just fetches from the local database copy.
- * Since the databases are kept in syc cluster-wide,
- * there is no point in doing a ctdb call to fetch the
- * record from the lmaster. It does even harm since migration
- * of records bump their RSN and hence render the persistent
- * database inconsistent.
- */
-static NTSTATUS db_ctdb_fetch_persistent(struct db_ctdb_ctx *db,
-                                        TALLOC_CTX *mem_ctx,
-                                        TDB_DATA key, TDB_DATA *data)
+static bool pull_newest_from_marshall_buffer(struct ctdb_marshall_buffer *buf,
+                                            TDB_DATA key,
+                                            struct ctdb_ltdb_header *pheader,
+                                            TALLOC_CTX *mem_ctx,
+                                            TDB_DATA *pdata)
 {
-       NTSTATUS status;
+       struct pull_newest_from_marshall_buffer_state state;
 
-       status = db_ctdb_ltdb_fetch(db, key, NULL, mem_ctx, data);
+       state.pheader = pheader;
+       state.mem_ctx = mem_ctx;
+       state.pdata = pdata;
 
-       if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
-               *data = tdb_null;
+       if (!parse_newest_in_marshall_buffer(
+                   buf, key, pull_newest_from_marshall_buffer_parser,
+                   &state)) {
+               return false;
        }
-
-       return status;
+       if ((pdata != NULL) && (pdata->dsize != 0) && (pdata->dptr == NULL)) {
+               /* ENOMEM */
+               return false;
+       }
+       return true;
 }
 
 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
@@ -520,10 +478,12 @@ static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ct
                return NULL;
        }
 
+       result->db = ctx->db;
        result->private_data = ctx->transaction;
 
        result->key.dsize = key.dsize;
-       result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
+       result->key.dptr = (uint8_t *)talloc_memdup(result, key.dptr,
+                                                   key.dsize);
        if (result->key.dptr == NULL) {
                DEBUG(0, ("talloc failed\n"));
                TALLOC_FREE(result);
@@ -549,7 +509,7 @@ static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ct
        result->value.dptr = NULL;
 
        if ((result->value.dsize != 0)
-           && !(result->value.dptr = (uint8 *)talloc_memdup(
+           && !(result->value.dptr = (uint8_t *)talloc_memdup(
                         result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
                         result->value.dsize))) {
                DEBUG(0, ("talloc failed\n"));
@@ -590,7 +550,7 @@ static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx
 
        rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
        if (rec == NULL) {
-               ctx->db->transaction_cancel(ctx->db);           
+               ctx->db->transaction_cancel(ctx->db);
                return NULL;
        }
 
@@ -675,7 +635,7 @@ static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data,
        return status;
 }
 
-/* 
+/*
    a record delete inside a transaction
  */
 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
@@ -688,6 +648,19 @@ static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
        return status;
 }
 
+static void db_ctdb_fetch_db_seqnum_parser(
+       TDB_DATA key, struct ctdb_ltdb_header *header,
+       TDB_DATA data, void *private_data)
+{
+       uint64_t *seqnum = (uint64_t *)private_data;
+
+       if (data.dsize != sizeof(uint64_t)) {
+               *seqnum = 0;
+               return;
+       }
+       memcpy(seqnum, data.dptr, sizeof(*seqnum));
+}
+
 /**
  * Fetch the db sequence number of a persistent db directly from the db.
  */
@@ -695,36 +668,24 @@ static NTSTATUS db_ctdb_fetch_db_seqnum_from_db(struct db_ctdb_ctx *db,
                                                uint64_t *seqnum)
 {
        NTSTATUS status;
-       const char *keyname = CTDB_DB_SEQNUM_KEY;
        TDB_DATA key;
-       TDB_DATA data;
-       struct ctdb_ltdb_header header;
-       TALLOC_CTX *mem_ctx = talloc_stackframe();
 
        if (seqnum == NULL) {
                return NT_STATUS_INVALID_PARAMETER;
        }
 
-       key = string_term_tdb_data(keyname);
-
-       status = db_ctdb_ltdb_fetch(db, key, &header, mem_ctx, &data);
-       if (!NT_STATUS_IS_OK(status) &&
-           !NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND))
-       {
-               goto done;
-       }
+       key = string_term_tdb_data(CTDB_DB_SEQNUM_KEY);
 
-       status = NT_STATUS_OK;
+       status = db_ctdb_ltdb_parse(
+               db, key, db_ctdb_fetch_db_seqnum_parser, seqnum);
 
-       if (data.dsize != sizeof(uint64_t)) {
+       if (NT_STATUS_IS_OK(status)) {
+               return NT_STATUS_OK;
+       }
+       if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
                *seqnum = 0;
-               goto done;
+               return NT_STATUS_OK;
        }
-
-       *seqnum = *(uint64_t *)data.dptr;
-
-done:
-       TALLOC_FREE(mem_ctx);
        return status;
 }
 
@@ -841,7 +802,8 @@ again:
                if (new_seqnum == old_seqnum) {
                        /* Recovery prevented all our changes: retry. */
                        goto again;
-               } else if (new_seqnum != (old_seqnum + 1)) {
+               }
+               if (new_seqnum != (old_seqnum + 1)) {
                        DEBUG(0, (__location__ " ERROR: new_seqnum[%lu] != "
                                  "old_seqnum[%lu] + (0 or 1) after failed "
                                  "TRANS3_COMMIT - this should not happen!\n",
@@ -905,7 +867,6 @@ static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
 
 
 
-#ifdef HAVE_CTDB_CONTROL_SCHEDULE_FOR_DELETION_DECL
 static NTSTATUS db_ctdb_send_schedule_for_deletion(struct db_record *rec)
 {
        NTSTATUS status;
@@ -949,11 +910,9 @@ static NTSTATUS db_ctdb_send_schedule_for_deletion(struct db_record *rec)
 
        return status;
 }
-#endif
 
 static NTSTATUS db_ctdb_delete(struct db_record *rec)
 {
-       TDB_DATA data;
        NTSTATUS status;
 
        /*
@@ -961,17 +920,12 @@ static NTSTATUS db_ctdb_delete(struct db_record *rec)
         * tdb-level cleanup
         */
 
-       ZERO_STRUCT(data);
-
-       status = db_ctdb_store(rec, data, 0);
+       status = db_ctdb_store(rec, tdb_null, 0);
        if (!NT_STATUS_IS_OK(status)) {
                return status;
        }
 
-#ifdef HAVE_CTDB_CONTROL_SCHEDULE_FOR_DELETION_DECL
        status = db_ctdb_send_schedule_for_deletion(rec);
-#endif
-
        return status;
 }
 
@@ -980,6 +934,9 @@ static int db_ctdb_record_destr(struct db_record* data)
        struct db_ctdb_rec *crec = talloc_get_type_abort(
                data->private_data, struct db_ctdb_rec);
        int threshold;
+       int ret;
+       struct timeval before;
+       double timediff;
 
        DEBUG(10, (DEBUGLEVEL > 10
                   ? "Unlocking db %u key %s\n"
@@ -988,42 +945,78 @@ static int db_ctdb_record_destr(struct db_record* data)
                   hex_encode_talloc(data, (unsigned char *)data->key.dptr,
                              data->key.dsize)));
 
-       tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key);
+       before = timeval_current();
+
+       ret = tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key);
+
+       timediff = timeval_elapsed(&before);
+       timediff *= 1000;       /* get us milliseconds */
+
+       if (timediff > crec->ctdb_ctx->warn_unlock_msecs) {
+               char *key;
+               key = hex_encode_talloc(talloc_tos(),
+                                       (unsigned char *)data->key.dptr,
+                                       data->key.dsize);
+               DEBUG(0, ("tdb_chainunlock on db %s, key %s took %f milliseconds\n",
+                         tdb_name(crec->ctdb_ctx->wtdb->tdb), key,
+                         timediff));
+               TALLOC_FREE(key);
+       }
+
+       if (ret != 0) {
+               DEBUG(0, ("tdb_chainunlock failed\n"));
+               return -1;
+       }
 
-       threshold = lp_ctdb_locktime_warn_threshold();
+       threshold = crec->ctdb_ctx->warn_locktime_msecs;
        if (threshold != 0) {
-               double timediff = timeval_elapsed(&crec->lock_time);
-               if ((timediff * 1000) > threshold) {
-                       DEBUG(0, ("Held tdb lock %f seconds\n", timediff));
+               timediff = timeval_elapsed(&crec->lock_time) * 1000;
+               if (timediff > threshold) {
+                       const char *key;
+
+                       key = hex_encode_talloc(data,
+                                               (unsigned char *)data->key.dptr,
+                                               data->key.dsize);
+                       DEBUG(0, ("Held tdb lock on db %s, key %s "
+                                 "%f milliseconds\n",
+                                 tdb_name(crec->ctdb_ctx->wtdb->tdb),
+                                 key, timediff));
                }
        }
 
        return 0;
 }
 
-/* Do I own this record? */
-static bool db_ctdb_own_record(TDB_DATA ctdb_data, bool read_only)
+/**
+ * Check whether we have a valid local copy of the given record,
+ * either for reading or for writing.
+ */
+static bool db_ctdb_can_use_local_hdr(const struct ctdb_ltdb_header *hdr,
+                                     bool read_only)
 {
-       struct ctdb_ltdb_header *hdr;
-
-       if (ctdb_data.dptr == NULL)
-               return false;
-
-       if (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header))
-               return false;
-
-#ifdef HAVE_CTDB_WANT_READONLY_DECL
-       hdr = (struct ctdb_ltdb_header *)ctdb_data.dptr;
        if (hdr->dmaster != get_my_vnn()) {
                /* If we're not dmaster, it must be r/o copy. */
                return read_only && (hdr->flags & CTDB_REC_RO_HAVE_READONLY);
        }
 
-       /* If we want write access, noone can have r/o copies. */
+       /*
+        * If we want write access, no one may have r/o copies.
+        */
        return read_only || !(hdr->flags & CTDB_REC_RO_HAVE_DELEGATIONS);
-#else
-       return !read_only;
-#endif
+}
+
+static bool db_ctdb_can_use_local_copy(TDB_DATA ctdb_data, bool read_only)
+{
+       if (ctdb_data.dptr == NULL) {
+               return false;
+       }
+
+       if (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) {
+               return false;
+       }
+
+       return db_ctdb_can_use_local_hdr(
+               (struct ctdb_ltdb_header *)ctdb_data.dptr, read_only);
 }
 
 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
@@ -1035,7 +1028,13 @@ static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
        struct db_ctdb_rec *crec;
        NTSTATUS status;
        TDB_DATA ctdb_data;
-       int migrate_attempts = 0;
+       int migrate_attempts;
+       struct timeval migrate_start;
+       struct timeval chainlock_start;
+       struct timeval ctdb_start_time;
+       double chainlock_time = 0;
+       double ctdb_time = 0;
+       int duration_msecs;
        int lockret;
 
        if (!(result = talloc(mem_ctx, struct db_record))) {
@@ -1049,17 +1048,22 @@ static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
                return NULL;
        }
 
+       result->db = ctx->db;
        result->private_data = (void *)crec;
        crec->ctdb_ctx = ctx;
 
        result->key.dsize = key.dsize;
-       result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
+       result->key.dptr = (uint8_t *)talloc_memdup(result, key.dptr,
+                                                   key.dsize);
        if (result->key.dptr == NULL) {
                DEBUG(0, ("talloc failed\n"));
                TALLOC_FREE(result);
                return NULL;
        }
 
+       migrate_attempts = 0;
+       GetTimeOfDay(&migrate_start);
+
        /*
         * Do a blocking lock on the record
         */
@@ -1074,9 +1078,12 @@ again:
                TALLOC_FREE(keystr);
        }
 
+       GetTimeOfDay(&chainlock_start);
        lockret = tryonly
                ? tdb_chainlock_nonblock(ctx->wtdb->tdb, key)
                : tdb_chainlock(ctx->wtdb->tdb, key);
+       chainlock_time += timeval_elapsed(&chainlock_start);
+
        if (lockret != 0) {
                DEBUG(3, ("tdb_chainlock failed\n"));
                TALLOC_FREE(result);
@@ -1094,11 +1101,7 @@ again:
         * take the shortcut and just return it.
         */
 
-       if (!db_ctdb_own_record(ctdb_data, false)
-#if 0
-           || (random() % 2 != 0)
-#endif
-) {
+       if (!db_ctdb_can_use_local_copy(ctdb_data, false)) {
                SAFE_FREE(ctdb_data.dptr);
                tdb_chainunlock(ctx->wtdb->tdb, key);
                talloc_set_destructor(result, NULL);
@@ -1118,8 +1121,11 @@ again:
                           ctdb_data.dptr ?
                           ((struct ctdb_ltdb_header *)ctdb_data.dptr)->flags : 0));
 
+               GetTimeOfDay(&ctdb_start_time);
                status = ctdbd_migrate(messaging_ctdbd_connection(), ctx->db_id,
                                       key);
+               ctdb_time += timeval_elapsed(&ctdb_start_time);
+
                if (!NT_STATUS_IS_OK(status)) {
                        DEBUG(5, ("ctdb_migrate failed: %s\n",
                                  nt_errstr(status)));
@@ -1130,13 +1136,38 @@ again:
                goto again;
        }
 
-       if (migrate_attempts > 10) {
-               DEBUG(0, ("db_ctdb_fetch_locked for %s key %s needed %d "
-                         "attempts\n", tdb_name(ctx->wtdb->tdb),
+       {
+               double duration;
+               duration = timeval_elapsed(&migrate_start);
+
+               /*
+                * Convert the duration to milliseconds to avoid a
+                * floating-point division of
+                * lp_parm_int("migrate_duration") by 1000.
+                */
+               duration_msecs = duration * 1000;
+       }
+
+       if ((migrate_attempts > ctx->warn_migrate_attempts) ||
+           (duration_msecs > ctx->warn_migrate_msecs)) {
+               int chain = 0;
+
+               if (tdb_get_flags(ctx->wtdb->tdb) & TDB_INCOMPATIBLE_HASH) {
+                       chain = tdb_jenkins_hash(&key) %
+                               tdb_hash_size(ctx->wtdb->tdb);
+               }
+
+               DEBUG(0, ("db_ctdb_fetch_locked for %s key %s, chain %d "
+                         "needed %d attempts, %d milliseconds, "
+                         "chainlock: %f ms, CTDB %f ms\n",
+                         tdb_name(ctx->wtdb->tdb),
                          hex_encode_talloc(talloc_tos(),
                                            (unsigned char *)key.dptr,
                                            key.dsize),
-                         migrate_attempts));
+                         chain,
+                         migrate_attempts, duration_msecs,
+                         chainlock_time * 1000.0,
+                         ctdb_time * 1000.0));
        }
 
        GetTimeOfDay(&crec->lock_time);
@@ -1147,7 +1178,7 @@ again:
        result->value.dptr = NULL;
 
        if ((result->value.dsize != 0)
-           && !(result->value.dptr = (uint8 *)talloc_memdup(
+           && !(result->value.dptr = (uint8_t *)talloc_memdup(
                         result, ctdb_data.dptr + sizeof(crec->header),
                         result->value.dsize))) {
                DEBUG(0, ("talloc failed\n"));
@@ -1195,89 +1226,98 @@ static struct db_record *db_ctdb_try_fetch_locked(struct db_context *db,
        return fetch_locked_internal(ctx, mem_ctx, key, true);
 }
 
-/*
-  fetch (unlocked, no migration) operation on ctdb
- */
-static NTSTATUS db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
-                             TDB_DATA key, TDB_DATA *data)
+struct db_ctdb_parse_record_state {
+       void (*parser)(TDB_DATA key, TDB_DATA data, void *private_data);
+       void *private_data;
+       bool ask_for_readonly_copy;
+       bool done;
+};
+
+static void db_ctdb_parse_record_parser(
+       TDB_DATA key, struct ctdb_ltdb_header *header,
+       TDB_DATA data, void *private_data)
 {
-       struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
-                                                       struct db_ctdb_ctx);
-       NTSTATUS status;
-       TDB_DATA ctdb_data;
+       struct db_ctdb_parse_record_state *state =
+               (struct db_ctdb_parse_record_state *)private_data;
+       state->parser(key, data, state->private_data);
+}
 
-       if (ctx->transaction) {
-               return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
-       }
+static void db_ctdb_parse_record_parser_nonpersistent(
+       TDB_DATA key, struct ctdb_ltdb_header *header,
+       TDB_DATA data, void *private_data)
+{
+       struct db_ctdb_parse_record_state *state =
+               (struct db_ctdb_parse_record_state *)private_data;
 
-       if (db->persistent) {
-               return db_ctdb_fetch_persistent(ctx, mem_ctx, key, data);
+       if (db_ctdb_can_use_local_hdr(header, true)) {
+               state->parser(key, data, state->private_data);
+               state->done = true;
+       } else {
+               /*
+                * We found something in the db, so it seems that this record,
+                * while not usable locally right now, is popular. Ask for a
+                * R/O copy.
+                */
+               state->ask_for_readonly_copy = true;
        }
+}
 
-       /* try a direct fetch */
-       ctdb_data = tdb_fetch_compat(ctx->wtdb->tdb, key);
+static NTSTATUS db_ctdb_parse_record(struct db_context *db, TDB_DATA key,
+                                    void (*parser)(TDB_DATA key,
+                                                   TDB_DATA data,
+                                                   void *private_data),
+                                    void *private_data)
+{
+       struct db_ctdb_ctx *ctx = talloc_get_type_abort(
+               db->private_data, struct db_ctdb_ctx);
+       struct db_ctdb_parse_record_state state;
+       NTSTATUS status;
 
-       /*
-        * See if we have a valid record and we are the dmaster. If so, we can
-        * take the shortcut and just return it.
-        * we bypass the dmaster check for persistent databases
-        */
-       if (db_ctdb_own_record(ctdb_data, true)) {
-               /* we are the dmaster - avoid the ctdb protocol op */
+       state.parser = parser;
+       state.private_data = private_data;
 
-               data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
+       if (ctx->transaction != NULL) {
+               struct db_ctdb_transaction_handle *h = ctx->transaction;
+               bool found;
 
-               data->dptr = (uint8 *)talloc_memdup(
-                       mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
-                       data->dsize);
+               /*
+                * Transactions only happen for persistent db's.
+                */
 
-               SAFE_FREE(ctdb_data.dptr);
+               found = parse_newest_in_marshall_buffer(
+                       h->m_write, key, db_ctdb_parse_record_parser, &state);
 
-               if (data->dptr == NULL) {
-                       return NT_STATUS_NO_MEMORY;
+               if (found) {
+                       return NT_STATUS_OK;
                }
-               return NT_STATUS_OK;
        }
 
-       SAFE_FREE(ctdb_data.dptr);
-
-       /*
-        * We weren't able to get it locally - ask ctdb to fetch it for us.
-        * If we already had *something*, it's probably worth making a local
-        * read-only copy.
-        */
-       status = ctdbd_fetch(messaging_ctdbd_connection(), ctx->db_id, key,
-                            mem_ctx, data,
-                            ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header));
-       if (!NT_STATUS_IS_OK(status)) {
-               DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
+       if (db->persistent) {
+               /*
+                * Persistent db, but not found in the transaction buffer
+                */
+               return db_ctdb_ltdb_parse(
+                       ctx, key, db_ctdb_parse_record_parser, &state);
        }
 
-       return status;
-}
-
-static NTSTATUS db_ctdb_parse_record(struct db_context *db, TDB_DATA key,
-                                    void (*parser)(TDB_DATA key,
-                                                   TDB_DATA data,
-                                                   void *private_data),
-                                    void *private_data)
-{
-       NTSTATUS status;
-       TDB_DATA data;
+       state.done = false;
+       state.ask_for_readonly_copy = false;
 
-       status = db_ctdb_fetch(db, talloc_tos(), key, &data);
-       if (!NT_STATUS_IS_OK(status)) {
-               return status;
+       status = db_ctdb_ltdb_parse(
+               ctx, key, db_ctdb_parse_record_parser_nonpersistent, &state);
+       if (NT_STATUS_IS_OK(status) && state.done) {
+               return NT_STATUS_OK;
        }
-       parser(key, data, private_data);
-       TALLOC_FREE(data.dptr);
-       return NT_STATUS_OK;
+
+       return ctdbd_parse(messaging_ctdbd_connection(), ctx->db_id, key,
+                          state.ask_for_readonly_copy, parser, private_data);
 }
 
 struct traverse_state {
        struct db_context *db;
        int (*fn)(struct db_record *rec, void *private_data);
        void *private_data;
+       int count;
 };
 
 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
@@ -1334,6 +1374,7 @@ static int db_ctdb_traverse(struct db_context *db,
                                      void *private_data),
                            void *private_data)
 {
+       NTSTATUS status;
         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
                                                         struct db_ctdb_ctx);
        struct traverse_state state;
@@ -1341,6 +1382,7 @@ static int db_ctdb_traverse(struct db_context *db,
        state.db = db;
        state.fn = fn;
        state.private_data = private_data;
+       state.count = 0;
 
        if (db->persistent) {
                struct tdb_context *ltdb = ctx->wtdb->tdb;
@@ -1360,7 +1402,6 @@ static int db_ctdb_traverse(struct db_context *db,
                        struct db_context *newkeys = db_open_rbt(talloc_tos());
                        struct ctdb_marshall_buffer *mbuf = ctx->transaction->m_write;
                        struct ctdb_rec_data *rec=NULL;
-                       NTSTATUS status;
                        int i;
                        int count = 0;
 
@@ -1370,9 +1411,8 @@ static int db_ctdb_traverse(struct db_context *db,
 
                        for (i=0; i<mbuf->count; i++) {
                                TDB_DATA key;
-                               rec =db_ctdb_marshall_loop_next(mbuf, rec,
-                                                               NULL, NULL,
-                                                               &key, NULL);
+                               rec = db_ctdb_marshall_loop_next_key(
+                                       mbuf, rec, &key);
                                SMB_ASSERT(rec != NULL);
 
                                if (!tdb_exists(ltdb, key)) {
@@ -1392,9 +1432,11 @@ static int db_ctdb_traverse(struct db_context *db,
                return ret;
        }
 
-
-       ctdbd_traverse(ctx->db_id, traverse_callback, &state);
-       return 0;
+       status = ctdbd_traverse(ctx->db_id, traverse_callback, &state);
+       if (!NT_STATUS_IS_OK(status)) {
+               return -1;
+       }
+       return state.count;
 }
 
 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
@@ -1411,12 +1453,16 @@ static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_da
 {
        struct traverse_state *state = (struct traverse_state *)private_data;
        struct db_record rec;
+
+       ZERO_STRUCT(rec);
+       rec.db = state->db;
        rec.key = key;
        rec.value = data;
        rec.store = db_ctdb_store_deny;
        rec.delete_rec = db_ctdb_delete_deny;
-       rec.private_data = state->db;
+       rec.private_data = NULL;
        state->fn(&rec, state->private_data);
+       state->count++;
 }
 
 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
@@ -1435,11 +1481,13 @@ static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TD
                return 0;
        }
 
+       ZERO_STRUCT(rec);
+       rec.db = state->db;
        rec.key = kbuf;
        rec.value = dbuf;
        rec.store = db_ctdb_store_deny;
        rec.delete_rec = db_ctdb_delete_deny;
-       rec.private_data = state->db;
+       rec.private_data = NULL;
 
        if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
                /* a deleted record */
@@ -1448,6 +1496,7 @@ static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TD
        rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
        rec.value.dptr += sizeof(struct ctdb_ltdb_header);
 
+       state->count++;
        return state->fn(&rec, state->private_data);
 }
 
@@ -1456,6 +1505,7 @@ static int db_ctdb_traverse_read(struct db_context *db,
                                           void *private_data),
                                 void *private_data)
 {
+       NTSTATUS status;
         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
                                                         struct db_ctdb_ctx);
        struct traverse_state state;
@@ -1463,6 +1513,7 @@ static int db_ctdb_traverse_read(struct db_context *db,
        state.db = db;
        state.fn = fn;
        state.private_data = private_data;
+       state.count = 0;
 
        if (db->persistent) {
                /* for persistent databases we don't need to do a ctdb traverse,
@@ -1470,8 +1521,11 @@ static int db_ctdb_traverse_read(struct db_context *db,
                return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
        }
 
-       ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
-       return 0;
+       status = ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
+       if (!NT_STATUS_IS_OK(status)) {
+               return -1;
+       }
+       return state.count;
 }
 
 static int db_ctdb_get_seqnum(struct db_context *db)
@@ -1481,18 +1535,22 @@ static int db_ctdb_get_seqnum(struct db_context *db)
        return tdb_get_seqnum(ctx->wtdb->tdb);
 }
 
-static int db_ctdb_get_flags(struct db_context *db)
+static void db_ctdb_id(struct db_context *db, const uint8_t **id,
+                      size_t *idlen)
 {
-        struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
-                                                        struct db_ctdb_ctx);
-       return tdb_get_flags(ctx->wtdb->tdb);
+       struct db_ctdb_ctx *ctx = talloc_get_type_abort(
+               db->private_data, struct db_ctdb_ctx);
+
+       *id = (uint8_t *)&ctx->db_id;
+       *idlen = sizeof(ctx->db_id);
 }
 
 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
                                const char *name,
                                int hash_size, int tdb_flags,
                                int open_flags, mode_t mode,
-                               enum dbwrap_lock_order lock_order)
+                               enum dbwrap_lock_order lock_order,
+                               uint64_t dbwrap_flags)
 {
        struct db_context *result;
        struct db_ctdb_ctx *db_ctdb;
@@ -1520,6 +1578,13 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
                return NULL;
        }
 
+       result->name = talloc_strdup(result, name);
+       if (result->name == NULL) {
+               DEBUG(0, ("talloc failed\n"));
+               TALLOC_FREE(result);
+               return NULL;
+       }
+
        db_ctdb->transaction = NULL;
        db_ctdb->db = result;
 
@@ -1542,7 +1607,8 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
        result->lock_order = lock_order;
 
        /* only pass through specific flags */
-       tdb_flags &= TDB_SEQNUM;
+       tdb_flags &= TDB_SEQNUM|TDB_VOLATILE|
+               TDB_MUTEX_LOCKING|TDB_CLEAR_IF_FIRST;
 
        /* honor permissions if user has specified O_CREAT */
        if (open_flags & O_CREAT) {
@@ -1564,10 +1630,34 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
                return NULL;
        }
 
-       lp_ctx = loadparm_init_s3(db_path, loadparm_s3_context());
+       if (!result->persistent &&
+           (dbwrap_flags & DBWRAP_FLAG_OPTIMIZE_READONLY_ACCESS))
+       {
+               TDB_DATA indata;
+
+               indata = make_tdb_data((uint8_t *)&db_ctdb->db_id,
+                                      sizeof(db_ctdb->db_id));
 
-       db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags,
-                                     O_RDWR, 0, lp_ctx);
+               status = ctdbd_control_local(
+                       conn, CTDB_CONTROL_SET_DB_READONLY, 0, 0, indata,
+                       NULL, NULL, &cstatus);
+               if (!NT_STATUS_IS_OK(status) || (cstatus != 0)) {
+                       DEBUG(1, ("CTDB_CONTROL_SET_DB_READONLY failed: "
+                                 "%s, %d\n", nt_errstr(status), cstatus));
+                       TALLOC_FREE(result);
+                       return NULL;
+               }
+       }
+
+       lp_ctx = loadparm_init_s3(db_path, loadparm_s3_helpers());
+
+       if (hash_size == 0) {
+               hash_size = lpcfg_tdb_hash_size(lp_ctx, db_path);
+       }
+
+       db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size,
+                                     lpcfg_tdb_flags(lp_ctx, tdb_flags),
+                                     O_RDWR, 0);
        talloc_unlink(db_path, lp_ctx);
        if (db_ctdb->wtdb == NULL) {
                DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
@@ -1586,6 +1676,14 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
                }
        }
 
+       db_ctdb->warn_unlock_msecs = lp_parm_int(-1, "ctdb",
+                                                "unlock_warn_threshold", 5);
+       db_ctdb->warn_migrate_attempts = lp_parm_int(-1, "ctdb",
+                                                    "migrate_attempts", 10);
+       db_ctdb->warn_migrate_msecs = lp_parm_int(-1, "ctdb",
+                                                 "migrate_duration", 5000);
+       db_ctdb->warn_locktime_msecs = lp_ctdb_locktime_warn_threshold();
+
        result->private_data = (void *)db_ctdb;
        result->fetch_locked = db_ctdb_fetch_locked;
        result->try_fetch_locked = db_ctdb_try_fetch_locked;
@@ -1593,27 +1691,14 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
        result->traverse = db_ctdb_traverse;
        result->traverse_read = db_ctdb_traverse_read;
        result->get_seqnum = db_ctdb_get_seqnum;
-       result->get_flags = db_ctdb_get_flags;
        result->transaction_start = db_ctdb_transaction_start;
        result->transaction_commit = db_ctdb_transaction_commit;
        result->transaction_cancel = db_ctdb_transaction_cancel;
+       result->id = db_ctdb_id;
+       result->stored_callback = NULL;
 
        DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
                 name, db_ctdb->db_id));
 
        return result;
 }
-
-#else /* CLUSTER_SUPPORT */
-
-struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
-                               const char *name,
-                               int hash_size, int tdb_flags,
-                               int open_flags, mode_t mode,
-                               enum dbwrap_lock_order lock_order)
-{
-       DEBUG(3, ("db_open_ctdb: no cluster support!\n"));
-       return NULL;
-}
-
-#endif