first cut at adding full transactions for ctdb to samba3
authorAndrew Tridgell <tridge@samba.org>
Thu, 7 Aug 2008 06:20:05 +0000 (16:20 +1000)
committerMichael Adam <obnox@samba.org>
Wed, 13 Aug 2008 09:54:08 +0000 (11:54 +0200)
(This used to be commit f91a3e0f7b7737c1d0667cd961ea950e2b93e592)

18 files changed:
source3/Makefile.in
source3/groupdb/mapping_tdb.c
source3/include/ctdbd_conn.h
source3/include/dbwrap.h
source3/lib/account_pol.c
source3/lib/ctdbd_conn.c
source3/lib/dbwrap.c
source3/lib/dbwrap_ctdb.c
source3/lib/dbwrap_tdb2.c [deleted file]
source3/lib/messages_ctdbd.c
source3/lib/sharesec.c
source3/nmbd/nmbd.c
source3/passdb/pdb_tdb.c
source3/passdb/secrets.c
source3/registry/reg_backend_db.c
source3/smbd/server.c
source3/utils/status.c
source3/winbindd/winbindd.c

index 6fe234da70090dfd0c91c6828b8dce2110925d69..aac58d38a8f048510e2d0d59c7af31b0727eac51 100644 (file)
@@ -230,7 +230,7 @@ MODULES = $(VFS_MODULES) $(PDB_MODULES) $(RPC_MODULES) $(IDMAP_MODULES) \
 
 TDB_OBJ = lib/util_tdb.o \
          lib/dbwrap.o lib/dbwrap_tdb.o \
-         lib/dbwrap_tdb2.o lib/dbwrap_ctdb.o \
+         lib/dbwrap_ctdb.o \
          lib/dbwrap_rbt.o @LIBTDB_STATIC@
 
 SMBLDAP_OBJ = @SMBLDAP@ @SMBLDAPUTIL@
index f1168887f986fc831c2d8f5fe52e8bb635f8e18b..59c692297b3f7aa89f1735920acf8998f4e473a9 100644 (file)
@@ -38,7 +38,7 @@ static bool init_group_mapping(void)
                return true;
        }
 
-       db = db_open_trans(NULL, state_path("group_mapping.tdb"), 0,
+       db = db_open(NULL, state_path("group_mapping.tdb"), 0,
                           TDB_DEFAULT, O_RDWR|O_CREAT, 0600);
        if (db == NULL) {
                DEBUG(0, ("Failed to open group mapping database: %s\n",
index 3ea895d133e5c74be69e3395dd4128984c9b9d93..39f50c2cfcf31e0248a3cc1edc912b0f852efa34 100644 (file)
@@ -69,4 +69,9 @@ NTSTATUS ctdbd_persistent_store(struct ctdbd_connection *conn, uint32_t db_id, T
 NTSTATUS ctdbd_start_persistent_update(struct ctdbd_connection *conn, uint32_t db_id, TDB_DATA key, TDB_DATA data);
 NTSTATUS ctdbd_cancel_persistent_update(struct ctdbd_connection *conn, uint32_t db_id, TDB_DATA key, TDB_DATA data);
 
+NTSTATUS ctdbd_control_local(struct ctdbd_connection *conn, uint32 opcode, 
+                            uint64_t srvid, uint32_t flags, TDB_DATA data, 
+                            TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
+                            int *cstatus);
+
 #endif /* _CTDBD_CONN_H */
index 1f388165dba2aa42064df5d91e7e5bd12a3ac369..46833fabdcec619c132b0f2508c11fb105dd58db 100644 (file)
@@ -54,11 +54,6 @@ struct db_context *db_open(TALLOC_CTX *mem_ctx,
                           int hash_size, int tdb_flags,
                           int open_flags, mode_t mode);
 
-struct db_context *db_open_trans(TALLOC_CTX *mem_ctx,
-                                const char *name,
-                                int hash_size, int tdb_flags,
-                                int open_flags, mode_t mode);
-
 struct db_context *db_open_rbt(TALLOC_CTX *mem_ctx);
 
 struct db_context *db_open_tdb(TALLOC_CTX *mem_ctx,
@@ -72,7 +67,6 @@ struct db_context *db_open_tdb2(TALLOC_CTX *mem_ctx,
                                int open_flags, mode_t mode);
 
 struct messaging_context;
-void db_tdb2_setup_messaging(struct messaging_context *msg_ctx, bool server);
 
 #ifdef CLUSTER_SUPPORT
 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
index e415d10d8ec2fec7e4f28733c6f974757fa70d2d..7fc565121c7568b868ee2b8d4bde19ad787d29b1 100644 (file)
@@ -212,12 +212,12 @@ bool init_account_policy(void)
                return True;
        }
 
-       db = db_open_trans(NULL, state_path("account_policy.tdb"), 0, TDB_DEFAULT,
+       db = db_open(NULL, state_path("account_policy.tdb"), 0, TDB_DEFAULT,
                     O_RDWR, 0600);
 
        if (db == NULL) { /* the account policies files does not exist or open
                           * failed, try to create a new one */
-               db = db_open_trans(NULL, state_path("account_policy.tdb"), 0,
+               db = db_open(NULL, state_path("account_policy.tdb"), 0,
                             TDB_DEFAULT, O_RDWR|O_CREAT, 0600);
                if (db == NULL) {
                        DEBUG(0,("Failed to open account policy database\n"));
index 51fefa93d420c2b95ef46c4150606964719764ec..966fd06602e6032e7b95805313ba2422a204db55 100644 (file)
@@ -43,7 +43,7 @@ struct ctdbd_connection {
 
 static NTSTATUS ctdbd_control(struct ctdbd_connection *conn,
                              uint32_t vnn, uint32 opcode, 
-                             uint64_t srvid, TDB_DATA data, 
+                             uint64_t srvid, uint32_t flags, TDB_DATA data, 
                              TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
                              int *cstatus);
 
@@ -83,7 +83,7 @@ static NTSTATUS register_with_ctdbd(struct ctdbd_connection *conn,
 
        int cstatus;
        return ctdbd_control(conn, CTDB_CURRENT_NODE,
-                            CTDB_CONTROL_REGISTER_SRVID, srvid,
+                            CTDB_CONTROL_REGISTER_SRVID, srvid, 0,
                             tdb_null, NULL, NULL, &cstatus);
 }
 
@@ -95,7 +95,7 @@ static NTSTATUS get_cluster_vnn(struct ctdbd_connection *conn, uint32 *vnn)
        int32_t cstatus=-1;
        NTSTATUS status;
        status = ctdbd_control(conn,
-                              CTDB_CURRENT_NODE, CTDB_CONTROL_GET_PNN, 0,
+                              CTDB_CURRENT_NODE, CTDB_CONTROL_GET_PNN, 0, 0,
                               tdb_null, NULL, NULL, &cstatus);
        if (!NT_STATUS_IS_OK(status)) {
                cluster_fatal("ctdbd_control failed\n");
@@ -680,7 +680,8 @@ NTSTATUS ctdbd_messaging_send(struct ctdbd_connection *conn,
  */
 static NTSTATUS ctdbd_control(struct ctdbd_connection *conn,
                              uint32_t vnn, uint32 opcode, 
-                             uint64_t srvid, TDB_DATA data, 
+                             uint64_t srvid, uint32_t flags, 
+                             TDB_DATA data, 
                              TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
                              int *cstatus)
 {
@@ -732,6 +733,11 @@ static NTSTATUS ctdbd_control(struct ctdbd_connection *conn,
                cluster_fatal("cluster dispatch daemon control write error\n");
        }
 
+       if (flags & CTDB_CTRL_FLAG_NOREPLY) {
+               TALLOC_FREE(new_conn);
+               return NT_STATUS_OK;
+       }
+
        status = ctdb_read_req(conn, req.hdr.reqid, NULL, (void *)&reply);
 
        if (!NT_STATUS_IS_OK(status)) {
@@ -776,7 +782,7 @@ bool ctdbd_process_exists(struct ctdbd_connection *conn, uint32 vnn, pid_t pid)
        data.dptr = (uint8_t*)&pid;
        data.dsize = sizeof(pid);
 
-       status = ctdbd_control(conn, vnn, CTDB_CONTROL_PROCESS_EXISTS, 0,
+       status = ctdbd_control(conn, vnn, CTDB_CONTROL_PROCESS_EXISTS, 0, 0,
                               data, NULL, NULL, &cstatus);
        if (!NT_STATUS_IS_OK(status)) {
                DEBUG(0, (__location__ " ctdb_control for process_exists "
@@ -801,7 +807,7 @@ char *ctdbd_dbpath(struct ctdbd_connection *conn,
        data.dsize = sizeof(db_id);
 
        status = ctdbd_control(conn, CTDB_CURRENT_NODE,
-                              CTDB_CONTROL_GETDBPATH, 0, data, 
+                              CTDB_CONTROL_GETDBPATH, 0, 0, data, 
                               mem_ctx, &data, &cstatus);
        if (!NT_STATUS_IS_OK(status) || cstatus != 0) {
                DEBUG(0,(__location__ " ctdb_control for getdbpath failed\n"));
@@ -829,7 +835,7 @@ NTSTATUS ctdbd_db_attach(struct ctdbd_connection *conn,
                               persistent
                               ? CTDB_CONTROL_DB_ATTACH_PERSISTENT
                               : CTDB_CONTROL_DB_ATTACH,
-                              0, data, NULL, &data, &cstatus);
+                              0, 0, data, NULL, &data, &cstatus);
        if (!NT_STATUS_IS_OK(status)) {
                DEBUG(0, (__location__ " ctdb_control for db_attach "
                          "failed: %s\n", nt_errstr(status)));
@@ -852,7 +858,7 @@ NTSTATUS ctdbd_db_attach(struct ctdbd_connection *conn,
        data.dsize = sizeof(*db_id);
 
        status = ctdbd_control(conn, CTDB_CURRENT_NODE,
-                              CTDB_CONTROL_ENABLE_SEQNUM, 0, data, 
+                              CTDB_CONTROL_ENABLE_SEQNUM, 0, 0, data, 
                               NULL, NULL, &cstatus);
        if (!NT_STATUS_IS_OK(status) || cstatus != 0) {
                DEBUG(0,(__location__ " ctdb_control for enable seqnum "
@@ -1087,7 +1093,7 @@ NTSTATUS ctdbd_traverse(uint32 db_id,
        data.dsize = sizeof(t);
 
        status = ctdbd_control(conn, CTDB_CURRENT_NODE,
-                              CTDB_CONTROL_TRAVERSE_START, conn->rand_srvid,
+                              CTDB_CONTROL_TRAVERSE_START, conn->rand_srvid, 0,
                               data, NULL, NULL, &cstatus);
 
        if (!NT_STATUS_IS_OK(status) || (cstatus != 0)) {
@@ -1194,7 +1200,7 @@ NTSTATUS ctdbd_register_ips(struct ctdbd_connection *conn,
        data.dsize = sizeof(p);
 
        return ctdbd_control(conn, CTDB_CURRENT_NODE, 
-                            CTDB_CONTROL_TCP_CLIENT, 
+                            CTDB_CONTROL_TCP_CLIENT, 0,
                             CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL, NULL);
 }
 
@@ -1233,7 +1239,7 @@ static NTSTATUS ctdbd_persistent_call(struct ctdbd_connection *conn, uint32_t op
        recdata.dsize = length;
 
        status = ctdbd_control(conn, CTDB_CURRENT_NODE, opcode,
-                              0, recdata, NULL, NULL, &cstatus);
+                              0, 0, recdata, NULL, NULL, &cstatus);
        if (cstatus != 0) {
                return NT_STATUS_INTERNAL_DB_CORRUPTION;
        }
@@ -1275,6 +1281,20 @@ NTSTATUS ctdbd_cancel_persistent_update(struct ctdbd_connection *conn, uint32_t
                                db_id, key, data);
 }
 
+/*
+  call a control on the local node
+ */
+NTSTATUS ctdbd_control_local(struct ctdbd_connection *conn, uint32 opcode, 
+                            uint64_t srvid, uint32_t flags, TDB_DATA data, 
+                            TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
+                            int *cstatus)
+{
+       return ctdbd_control(conn, CTDB_CURRENT_NODE, opcode, srvid, flags, data, mem_ctx, outdata, cstatus);
+}
+
+
+
+
 #else
 
 NTSTATUS ctdbd_init_connection(TALLOC_CTX *mem_ctx,
index ff200c35c0662b5a639bb4fcd789eefbac0b3b94..73c2761a1b3d069cf482a31757e9c150fe1f00a0 100644 (file)
@@ -43,7 +43,7 @@ static int dbwrap_fallback_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
 }
 
 /**
- * If you need transaction support use db_open_trans()
+ * open a database
  */
 struct db_context *db_open(TALLOC_CTX *mem_ctx,
                           const char *name,
@@ -105,80 +105,6 @@ struct db_context *db_open(TALLOC_CTX *mem_ctx,
        return result;
 }
 
-/**
- * If you use this you can only modify with a transaction
- */
-struct db_context *db_open_trans(TALLOC_CTX *mem_ctx,
-                                const char *name,
-                                int hash_size, int tdb_flags,
-                                int open_flags, mode_t mode)
-{
-       bool use_tdb2 = lp_parm_bool(-1, "dbwrap", "use_tdb2", false);
-#ifdef CLUSTER_SUPPORT
-       const char *sockname = lp_ctdbd_socket();
-#endif
-
-       if (tdb_flags & TDB_CLEAR_IF_FIRST) {
-               DEBUG(0,("db_open_trans: called with TDB_CLEAR_IF_FIRST: %s\n",
-                        name));
-               return NULL;
-       }
-
-#ifdef CLUSTER_SUPPORT
-       if(!sockname || !*sockname) {
-               sockname = CTDB_PATH;
-       }
-
-       if (lp_clustering()) {
-               const char *partname;
-
-               if (!socket_exist(sockname)) {
-                       DEBUG(1, ("ctdb socket does not exist - is ctdb not "
-                                 "running?\n"));
-                       return NULL;
-               }
-
-               /* ctdb only wants the file part of the name */
-               partname = strrchr(name, '/');
-               if (partname) {
-                       partname++;
-               } else {
-                       partname = name;
-               }
-               /* allow ctdb for individual databases to be disabled */
-               if (lp_parm_bool(-1, "ctdb", partname, true)) {
-                       struct db_context *result = NULL;
-                       result = db_open_ctdb(mem_ctx, partname, hash_size,
-                                             tdb_flags, open_flags, mode);
-                       if (result == NULL) {
-                               DEBUG(0,("failed to attach to ctdb %s\n",
-                                        partname));
-                       }
-                       return result;
-               }
-       }
-#endif
-
-       if (use_tdb2) {
-               const char *partname;
-               /* tdb2 only wants the file part of the name */
-               partname = strrchr(name, '/');
-               if (partname) {
-                       partname++;
-               } else {
-                       partname = name;
-               }
-               /* allow ctdb for individual databases to be disabled */
-               if (lp_parm_bool(-1, "tdb2", partname, true)) {
-                       return db_open_tdb2(mem_ctx, partname, hash_size,
-                                           tdb_flags, open_flags, mode);
-               }
-       }
-
-       return db_open_tdb(mem_ctx, name, hash_size,
-                          tdb_flags, open_flags, mode);
-}
-
 NTSTATUS dbwrap_delete_bystring(struct db_context *db, const char *key)
 {
        struct db_record *rec;
index 77319464f932bf399bc6ac9596058771581bd414..c1ca350de92587e4d3e5fed4fb66958cfedab381 100644 (file)
 #include "ctdb_private.h"
 #include "ctdbd_conn.h"
 
+struct db_ctdb_transaction_handle {
+       struct db_ctdb_ctx *ctx;
+       bool in_replay;
+       /* we store the reads and writes done under a transaction one
+          list stores both reads and writes, the other just writes
+       */
+       struct ctdb_marshall_buffer *m_all;
+       struct ctdb_marshall_buffer *m_write;
+};
+
 struct db_ctdb_ctx {
        struct tdb_wrap *wtdb;
        uint32 db_id;
+       struct db_ctdb_transaction_handle *transaction;
 };
 
 struct db_ctdb_rec {
@@ -58,6 +69,584 @@ static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
        return status;
 }
 
+
+
+/*
+  form a ctdb_rec_data record from a key/data pair
+  
+  note that header may be NULL. If not NULL then it is included in the data portion
+  of the record
+ */
+static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,      
+                                                 TDB_DATA key, 
+                                                 struct ctdb_ltdb_header *header,
+                                                 TDB_DATA data)
+{
+       size_t length;
+       struct ctdb_rec_data *d;
+
+       length = offsetof(struct ctdb_rec_data, data) + key.dsize + 
+               data.dsize + (header?sizeof(*header):0);
+       d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
+       if (d == NULL) {
+               return NULL;
+       }
+       d->length = length;
+       d->reqid = reqid;
+       d->keylen = key.dsize;
+       memcpy(&d->data[0], key.dptr, key.dsize);
+       if (header) {
+               d->datalen = data.dsize + sizeof(*header);
+               memcpy(&d->data[key.dsize], header, sizeof(*header));
+               memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
+       } else {
+               d->datalen = data.dsize;
+               memcpy(&d->data[key.dsize], data.dptr, data.dsize);
+       }
+       return d;
+}
+
+
+/* helper function for marshalling multiple records */
+static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx, 
+                                              struct ctdb_marshall_buffer *m,
+                                              uint64_t db_id,
+                                              uint32_t reqid,
+                                              TDB_DATA key,
+                                              struct ctdb_ltdb_header *header,
+                                              TDB_DATA data)
+{
+       struct ctdb_rec_data *r;
+       size_t m_size, r_size;
+       struct ctdb_marshall_buffer *m2;
+
+       r = db_ctdb_marshall_record(mem_ctx, reqid, key, header, data);
+       if (r == NULL) {
+               talloc_free(m);
+               return NULL;
+       }
+
+       if (m == NULL) {
+               m = talloc_zero_size(mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
+               if (m == NULL) {
+                       return NULL;
+               }
+               m->db_id = db_id;
+       }
+
+       m_size = talloc_get_size(m);
+       r_size = talloc_get_size(r);
+
+       m2 = talloc_realloc_size(mem_ctx, m,  m_size + r_size);
+       if (m2 == NULL) {
+               talloc_free(m);
+               return NULL;
+       }
+
+       memcpy(m_size + (uint8_t *)m2, r, r_size);
+
+       talloc_free(r);
+
+       m2->count++;
+
+       return m2;
+}
+
+/* we've finished marshalling, return a data blob with the marshalled records */
+static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
+{
+       TDB_DATA data;
+       data.dptr = (uint8_t *)m;
+       data.dsize = talloc_get_size(m);
+       return data;
+}
+
+/* 
+   loop over a marshalling buffer 
+   
+     - pass r==NULL to start
+     - loop the number of times indicated by m->count
+*/
+static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
+                                                    uint32_t *reqid,
+                                                    struct ctdb_ltdb_header *header,
+                                                    TDB_DATA *key, TDB_DATA *data)
+{
+       if (r == NULL) {
+               r = (struct ctdb_rec_data *)&m->data[0];
+       } else {
+               r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
+       }
+
+       if (reqid != NULL) {
+               *reqid = r->reqid;
+       }
+       
+       if (key != NULL) {
+               key->dptr   = &r->data[0];
+               key->dsize  = r->keylen;
+       }
+       if (data != NULL) {
+               data->dptr  = &r->data[r->keylen];
+               data->dsize = r->datalen;
+               if (header != NULL) {
+                       data->dptr += sizeof(*header);
+                       data->dsize -= sizeof(*header);
+               }
+       }
+
+       if (header != NULL) {
+               if (r->datalen < sizeof(*header)) {
+                       return NULL;
+               }
+               *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
+       }
+
+       return r;
+}
+
+
+
+/* start a transaction on a database */
+static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
+{
+       tdb_transaction_cancel(h->ctx->wtdb->tdb);
+       return 0;
+}
+
+/* start a transaction on a database */
+static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
+{
+       struct db_record *rh;
+       TDB_DATA key;
+       struct ctdb_ltdb_header header;
+       TALLOC_CTX *tmp_ctx;
+       const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
+       int ret;
+       struct db_ctdb_ctx *ctx = h->ctx;
+       TDB_DATA data;
+
+       key.dptr = discard_const(keyname);
+       key.dsize = strlen(keyname);
+
+again:
+       tmp_ctx = talloc_new(h);
+
+       rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
+       if (rh == NULL) {
+               DEBUG(0,(__location__ " Failed to fetch_lock database\n"));             
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+       talloc_free(rh);
+
+       ret = tdb_transaction_start(ctx->wtdb->tdb);
+       if (ret != 0) {
+               DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       data = tdb_fetch(ctx->wtdb->tdb, key);
+       if ((data.dptr == NULL) ||
+           (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
+           ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
+               SAFE_FREE(data.dptr);
+               tdb_transaction_cancel(ctx->wtdb->tdb);
+               talloc_free(tmp_ctx);
+               goto again;
+       }
+
+       SAFE_FREE(data.dptr);
+       talloc_free(tmp_ctx);
+
+       return 0;
+}
+
+
+/* start a transaction on a database */
+static int db_ctdb_transaction_start(struct db_context *db)
+{
+       struct db_ctdb_transaction_handle *h;
+       int ret;
+       struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
+                                                       struct db_ctdb_ctx);
+
+       if (!db->persistent) {
+               DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n", 
+                        ctx->db_id));
+               return -1;
+       }
+
+       if (ctx->transaction) {
+               DEBUG(0,("Nested transactions not supported on db 0x%08x\n", ctx->db_id));
+               return -1;
+       }
+
+       h = talloc_zero(db, struct db_ctdb_transaction_handle);
+       if (h == NULL) {
+               DEBUG(0,(__location__ " oom for transaction handle\n"));                
+               return -1;
+       }
+
+       h->ctx = ctx;
+
+       ret = db_ctdb_transaction_fetch_start(h);
+       if (ret != 0) {
+               talloc_free(h);
+               return -1;
+       }
+
+       talloc_set_destructor(h, db_ctdb_transaction_destructor);
+
+       ctx->transaction = h;
+
+       return 0;
+}
+
+
+
+/*
+  fetch a record inside a transaction
+ */
+static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db, 
+                                    TALLOC_CTX *mem_ctx, 
+                                    TDB_DATA key, TDB_DATA *data)
+{
+       struct db_ctdb_transaction_handle *h = db->transaction;
+
+       *data = tdb_fetch(h->ctx->wtdb->tdb, key);
+
+       if (data->dptr != NULL) {
+               uint8_t *oldptr = (uint8_t *)data->dptr;
+               data->dsize -= sizeof(struct ctdb_ltdb_header);
+               data->dptr = (uint8 *)
+                       talloc_memdup(
+                               mem_ctx, data->dptr+sizeof(struct ctdb_ltdb_header),
+                               data->dsize);
+               SAFE_FREE(oldptr);
+               if (data->dptr == NULL) {
+                       return -1;
+               }
+       }
+
+       if (!h->in_replay) {
+               h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
+               if (h->m_all == NULL) {
+                       DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
+                       data->dsize = 0;
+                       talloc_free(data->dptr);
+                       return -1;
+               }
+       }
+
+       return 0;
+}
+
+
+static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
+static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
+
+static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
+                                                         TALLOC_CTX *mem_ctx,
+                                                         TDB_DATA key)
+{
+       struct db_record *result;
+       NTSTATUS status;
+       TDB_DATA ctdb_data;
+       int migrate_attempts = 0;
+
+       if (!(result = talloc(mem_ctx, struct db_record))) {
+               DEBUG(0, ("talloc failed\n"));
+               return NULL;
+       }
+
+       result->private_data = ctx->transaction;
+
+       result->key.dsize = key.dsize;
+       result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
+       if (result->key.dptr == NULL) {
+               DEBUG(0, ("talloc failed\n"));
+               TALLOC_FREE(result);
+               return NULL;
+       }
+
+       result->store = db_ctdb_store_transaction;
+       result->delete_rec = db_ctdb_delete_transaction;
+
+       ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
+       if (ctdb_data.dptr == NULL) {
+               /* create the record */
+               result->value = tdb_null;
+               return result;
+       }
+
+       result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
+       result->value.dptr = NULL;
+
+       if ((result->value.dsize != 0)
+           && !(result->value.dptr = (uint8 *)talloc_memdup(
+                        result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
+                        result->value.dsize))) {
+               DEBUG(0, ("talloc failed\n"));
+               TALLOC_FREE(result);
+       }
+
+       SAFE_FREE(ctdb_data.dptr);
+
+       return result;
+}
+
+
+/*
+  stores a record inside a transaction
+ */
+static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h, 
+                                    TDB_DATA key, TDB_DATA data)
+{
+       TALLOC_CTX *tmp_ctx = talloc_new(h);
+       int ret;
+       TDB_DATA rec;
+       struct ctdb_ltdb_header header;
+
+       /* we need the header so we can update the RSN */
+       rec = tdb_fetch(h->ctx->wtdb->tdb, key);
+       if (rec.dptr == NULL) {
+               /* the record doesn't exist - create one with us as dmaster.
+                  This is only safe because we are in a transaction and this
+                  is a persistent database */
+               ZERO_STRUCT(header);
+               header.dmaster = get_my_vnn();
+       } else {
+               memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
+               SAFE_FREE(rec.dptr);
+       }
+
+       header.rsn++;
+
+       if (!h->in_replay) {
+               h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
+               if (h->m_all == NULL) {
+                       DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
+                       talloc_free(tmp_ctx);
+                       return -1;
+               }
+               
+               h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
+               if (h->m_write == NULL) {
+                       DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
+                       talloc_free(tmp_ctx);
+                       return -1;
+               }
+       }
+       
+       rec.dptr = talloc_size(tmp_ctx, data.dsize + sizeof(struct ctdb_ltdb_header));
+       if (rec.dptr == NULL) {
+               DEBUG(0,(__location__ " Failed to alloc record\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+       memcpy(rec.dptr, &header, sizeof(struct ctdb_ltdb_header));
+       memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
+
+       ret = tdb_store(h->ctx->wtdb->tdb, key, rec, TDB_REPLACE);
+
+       talloc_free(tmp_ctx);
+       
+       return ret;
+}
+
+
+/* 
+   a record store inside a transaction
+ */
+static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
+{
+       struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
+               rec->private_data, struct db_ctdb_transaction_handle);
+       int ret;
+
+       ret = db_ctdb_transaction_store(h, rec->key, data);
+       if (ret != 0) {
+               return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
+       }
+       return NT_STATUS_OK;
+}
+
+/* 
+   a record delete inside a transaction
+ */
+static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
+{
+       struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
+               rec->private_data, struct db_ctdb_transaction_handle);
+       int ret;
+
+       ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
+       if (ret != 0) {
+               return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
+       }
+       return NT_STATUS_OK;
+}
+
+
+/*
+  replay a transaction
+ */
+static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
+{
+       int ret, i;
+       struct ctdb_rec_data *rec = NULL;
+
+       h->in_replay = true;
+
+       ret = db_ctdb_transaction_fetch_start(h);
+       if (ret != 0) {
+               return ret;
+       }
+
+       for (i=0;i<h->m_all->count;i++) {
+               TDB_DATA key, data;
+
+               rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
+               if (rec == NULL) {
+                       DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
+                       goto failed;
+               }
+
+               if (rec->reqid == 0) {
+                       /* its a store */
+                       if (db_ctdb_transaction_store(h, key, data) != 0) {
+                               goto failed;
+                       }
+               } else {
+                       TDB_DATA data2;
+                       TALLOC_CTX *tmp_ctx = talloc_new(h);
+
+                       if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
+                               talloc_free(tmp_ctx);
+                               goto failed;
+                       }
+                       if (data2.dsize != data.dsize ||
+                           memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
+                               /* the record has changed on us - we have to give up */
+                               talloc_free(tmp_ctx);
+                               goto failed;
+                       }
+                       talloc_free(tmp_ctx);
+               }
+       }
+       
+       return 0;
+
+failed:
+       tdb_transaction_cancel(h->ctx->wtdb->tdb);
+       return -1;
+}
+
+
+/*
+  commit a transaction
+ */
+static int db_ctdb_transaction_commit(struct db_context *db)
+{
+       struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
+                                                       struct db_ctdb_ctx);
+       NTSTATUS rets;
+       int ret;
+       int status;
+       struct timeval timeout;
+       struct db_ctdb_transaction_handle *h = ctx->transaction;
+
+       if (h == NULL) {
+               DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
+               return -1;
+       }
+
+       talloc_set_destructor(h, NULL);
+
+       if (h->m_write == NULL) {
+               /* no changes were made */
+               talloc_free(h);
+               return 0;
+       }
+
+       /* our commit strategy is quite complex.
+
+          - we first try to commit the changes to all other nodes
+
+          - if that works, then we commit locally and we are done
+
+          - if a commit on another node fails, then we need to cancel
+            the transaction, then restart the transaction (thus
+            opening a window of time for a pending recovery to
+            complete), then replay the transaction, checking all the
+            reads and writes (checking that reads give the same data,
+            and writes succeed). Then we retry the transaction to the
+            other nodes
+       */
+
+again:
+       /* tell ctdbd to commit to the other nodes */
+       rets = ctdbd_control_local(messaging_ctdbd_connection(), 
+                                 CTDB_CONTROL_TRANS2_COMMIT, h->ctx->db_id, 0,
+                                 db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
+       if (!NT_STATUS_IS_OK(rets) || status != 0) {
+               tdb_transaction_cancel(h->ctx->wtdb->tdb);
+               sleep(1);
+               if (ctdb_replay_transaction(h) != 0) {
+                       DEBUG(0,(__location__ " Failed to replay transaction\n"));
+                       ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_ERROR,
+                                           h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
+                                           tdb_null, NULL, NULL, NULL);
+                       h->ctx->transaction = NULL;
+                       talloc_free(h);
+                       return -1;
+               }
+               goto again;
+       }
+
+       /* do the real commit locally */
+       ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
+       if (ret != 0) {
+               DEBUG(0,(__location__ " Failed to commit transaction\n"));
+               ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_ERROR, h->ctx->db_id, 
+                                   CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
+               h->ctx->transaction = NULL;
+               talloc_free(h);
+               return ret;
+       }
+
+       /* tell ctdbd that we are finished with our local commit */
+       ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED, 
+                           h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
+                           tdb_null, NULL, NULL, NULL);
+       h->ctx->transaction = NULL;
+       talloc_free(h);
+       return 0;
+}
+
+
+/*
+  cancel a transaction
+ */
+static int db_ctdb_transaction_cancel(struct db_context *db)
+{
+       struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
+                                                       struct db_ctdb_ctx);
+       struct db_ctdb_transaction_handle *h = ctx->transaction;
+
+       if (h == NULL) {
+               DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
+               return -1;
+       }
+
+       ctx->transaction = NULL;
+       talloc_free(h);
+       return 0;
+}
+
+
 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
 {
        struct db_ctdb_rec *crec = talloc_get_type_abort(
@@ -269,6 +858,10 @@ static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
        TDB_DATA ctdb_data;
        int migrate_attempts = 0;
 
+       if (ctx->transaction != NULL) {
+               return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
+       }
+
        if (!(result = talloc(mem_ctx, struct db_record))) {
                DEBUG(0, ("talloc failed\n"));
                return NULL;
@@ -400,6 +993,10 @@ static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
        NTSTATUS status;
        TDB_DATA ctdb_data;
 
+       if (ctx->transaction) {
+               return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
+       }
+
        /* try a direct fetch */
        ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
 
@@ -643,9 +1240,9 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
        result->traverse = db_ctdb_traverse;
        result->traverse_read = db_ctdb_traverse_read;
        result->get_seqnum = db_ctdb_get_seqnum;
-       result->transaction_start = db_ctdb_trans_dummy;
-       result->transaction_commit = db_ctdb_trans_dummy;
-       result->transaction_cancel = db_ctdb_trans_dummy;
+       result->transaction_start = db_ctdb_transaction_start;
+       result->transaction_commit = db_ctdb_transaction_commit;
+       result->transaction_cancel = db_ctdb_transaction_cancel;
 
        DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
                 name, db_ctdb->db_id));
diff --git a/source3/lib/dbwrap_tdb2.c b/source3/lib/dbwrap_tdb2.c
deleted file mode 100644 (file)
index 9f68ef4..0000000
+++ /dev/null
@@ -1,1265 +0,0 @@
-/*
-   Unix SMB/CIFS implementation.
-
-   Database interface wrapper around tdb/ctdb
-
-   Copyright (C) Volker Lendecke 2005-2007
-   Copyright (C) Stefan Metzmacher 2008
-
-   This program is free software; you can redistribute it and/or modify
-   it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 3 of the License, or
-   (at your option) any later version.
-
-   This program is distributed in the hope that it will be useful,
-   but WITHOUT ANY WARRANTY; without even the implied warranty of
-   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-   GNU General Public License for more details.
-
-   You should have received a copy of the GNU General Public License
-   along with this program.  If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "includes.h"
-#include "librpc/gen_ndr/ndr_messaging.h"
-
-struct db_tdb2_ctx {
-       struct db_context *db;
-       const char *name;
-       struct tdb_wrap *mtdb;
-       const char *mtdb_path;
-       bool master_transaction;
-       struct {
-               int hash_size;
-               int tdb_flags;
-               int open_flags;
-               mode_t mode;
-       } open;
-       struct tdb_wrap *ltdb;
-       const char *ltdb_path;
-       bool local_transaction;
-       int transaction;
-       bool out_of_sync;
-       uint32_t lseqnum;
-       uint32_t mseqnum;
-#define DB_TDB2_MASTER_SEQNUM_KEYSTR "DB_TDB2_MASTER_SEQNUM_KEYSTR"
-       TDB_DATA mseqkey;
-       uint32_t max_buffer_size;
-       uint32_t current_buffer_size;
-       struct dbwrap_tdb2_changes changes;
-};
-
-
-static NTSTATUS db_tdb2_store(struct db_record *rec, TDB_DATA data, int flag);
-static NTSTATUS db_tdb2_delete(struct db_record *rec);
-
-static void db_tdb2_queue_change(struct db_tdb2_ctx *db_ctx, const TDB_DATA key);
-static void db_tdb2_send_notify(struct db_tdb2_ctx *db_ctx);
-
-static struct db_context *db_open_tdb2_ex(TALLOC_CTX *mem_ctx,
-                                         const char *name,
-                                         int hash_size, int tdb_flags,
-                                         int open_flags, mode_t mode,
-                                         const struct dbwrap_tdb2_changes *chgs);
-
-static int db_tdb2_sync_from_master(struct db_tdb2_ctx *db_ctx,
-                                   const struct dbwrap_tdb2_changes *changes);
-
-static int db_tdb2_open_master(struct db_tdb2_ctx *db_ctx, bool transaction,
-                              const struct dbwrap_tdb2_changes *changes);
-static int db_tdb2_commit_local(struct db_tdb2_ctx *db_ctx, uint32_t mseqnum);
-static int db_tdb2_close_master(struct db_tdb2_ctx *db_ctx);
-static int db_tdb2_transaction_cancel(struct db_context *db);
-
-static void db_tdb2_receive_changes(struct messaging_context *msg,
-                                   void *private_data,
-                                   uint32_t msg_type,
-                                   struct server_id server_id,
-                                   DATA_BLOB *data);
-
-static struct messaging_context *global_tdb2_msg_ctx;
-static bool global_tdb2_msg_ctx_initialized;
-
-void db_tdb2_setup_messaging(struct messaging_context *msg_ctx, bool server)
-{
-       global_tdb2_msg_ctx = msg_ctx;
-
-       global_tdb2_msg_ctx_initialized = true;
-
-       if (!server) {
-               return;
-       }
-
-       if (!lp_parm_bool(-1, "dbwrap", "use_tdb2", false)) {
-               return;
-       }
-
-       messaging_register(msg_ctx, NULL, MSG_DBWRAP_TDB2_CHANGES,
-                          db_tdb2_receive_changes);
-}
-
-static struct messaging_context *db_tdb2_get_global_messaging_context(void)
-{
-       struct messaging_context *msg_ctx;
-
-       if (global_tdb2_msg_ctx_initialized) {
-               return global_tdb2_msg_ctx;
-       }
-
-       msg_ctx = messaging_init(NULL, procid_self(),
-                                event_context_init(NULL));
-
-       db_tdb2_setup_messaging(msg_ctx, false);
-
-       return global_tdb2_msg_ctx;
-}
-
-struct tdb_fetch_locked_state {
-       TALLOC_CTX *mem_ctx;
-       struct db_record *result;
-};
-
-static int db_tdb2_fetchlock_parse(TDB_DATA key, TDB_DATA data,
-                                 void *private_data)
-{
-       struct tdb_fetch_locked_state *state =
-               (struct tdb_fetch_locked_state *)private_data;
-
-       state->result = (struct db_record *)talloc_size(
-               state->mem_ctx,
-               sizeof(struct db_record) + key.dsize + data.dsize);
-
-       if (state->result == NULL) {
-               return 0;
-       }
-
-       state->result->key.dsize = key.dsize;
-       state->result->key.dptr = ((uint8 *)state->result)
-               + sizeof(struct db_record);
-       memcpy(state->result->key.dptr, key.dptr, key.dsize);
-
-       state->result->value.dsize = data.dsize;
-
-       if (data.dsize > 0) {
-               state->result->value.dptr = state->result->key.dptr+key.dsize;
-               memcpy(state->result->value.dptr, data.dptr, data.dsize);
-       }
-       else {
-               state->result->value.dptr = NULL;
-       }
-
-       return 0;
-}
-
-static struct db_record *db_tdb2_fetch_locked(struct db_context *db,
-                                             TALLOC_CTX *mem_ctx, TDB_DATA key)
-{
-       struct db_tdb2_ctx *ctx = talloc_get_type_abort(db->private_data,
-                                                       struct db_tdb2_ctx);
-       struct tdb_fetch_locked_state state;
-
-       /* Do not accidently allocate/deallocate w/o need when debug level is lower than needed */
-       if(DEBUGLEVEL >= 10) {
-               char *keystr = hex_encode(NULL, (unsigned char*)key.dptr, key.dsize);
-               DEBUG(10, (DEBUGLEVEL > 10
-                          ? "Locking key %s\n" : "Locking key %.20s\n",
-                          keystr));
-               TALLOC_FREE(keystr);
-       }
-
-       /*
-        * we only support modifications within a
-        * started transaction.
-        */
-       if (ctx->transaction == 0) {
-               DEBUG(0, ("db_tdb2_fetch_locked[%s]: no transaction started\n",
-                         ctx->name));
-               smb_panic("no transaction");
-               return NULL;
-       }
-
-       state.mem_ctx = mem_ctx;
-       state.result = NULL;
-
-       tdb_parse_record(ctx->mtdb->tdb, key, db_tdb2_fetchlock_parse, &state);
-
-       if (state.result == NULL) {
-               db_tdb2_fetchlock_parse(key, tdb_null, &state);
-       }
-
-       if (state.result == NULL) {
-               return NULL;
-       }
-
-       state.result->private_data = talloc_reference(state.result, ctx);
-       state.result->store = db_tdb2_store;
-       state.result->delete_rec = db_tdb2_delete;
-
-       DEBUG(10, ("Allocated locked data 0x%p\n", state.result));
-
-       return state.result;
-}
-
-struct tdb_fetch_state {
-       TALLOC_CTX *mem_ctx;
-       int result;
-       TDB_DATA data;
-};
-
-static int db_tdb2_fetch_parse(TDB_DATA key, TDB_DATA data,
-                              void *private_data)
-{
-       struct tdb_fetch_state *state =
-               (struct tdb_fetch_state *)private_data;
-
-       state->data.dptr = (uint8 *)talloc_memdup(state->mem_ctx, data.dptr,
-                                                 data.dsize);
-       if (state->data.dptr == NULL) {
-               state->result = -1;
-               return 0;
-       }
-
-       state->data.dsize = data.dsize;
-       return 0;
-}
-
-static void db_tdb2_resync_before_read(struct db_tdb2_ctx *db_ctx, TDB_DATA *kbuf)
-{
-       if (db_ctx->mtdb) {
-               return;
-       }
-
-       if (!db_ctx->out_of_sync) {
-               return;
-       }
-
-       /*
-        * this function operates on the local copy,
-        * so hide the DB_TDB2_MASTER_SEQNUM_KEYSTR from the caller.
-        */
-       if (kbuf && (db_ctx->mseqkey.dsize == kbuf->dsize) &&
-           (memcmp(db_ctx->mseqkey.dptr, kbuf->dptr, kbuf->dsize) == 0)) {
-               return;
-       }
-
-       DEBUG(0,("resync_before_read[%s/%s]\n",
-                db_ctx->mtdb_path, db_ctx->ltdb_path));
-
-       db_tdb2_open_master(db_ctx, false, NULL);
-       db_tdb2_close_master(db_ctx);
-}
-
-static int db_tdb2_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
-                        TDB_DATA key, TDB_DATA *pdata)
-{
-       struct db_tdb2_ctx *ctx = talloc_get_type_abort(
-               db->private_data, struct db_tdb2_ctx);
-
-       struct tdb_fetch_state state;
-
-       db_tdb2_resync_before_read(ctx, &key);
-
-       if (ctx->out_of_sync) {
-               DEBUG(0,("out of sync[%s] failing fetch\n",
-                        ctx->ltdb_path));
-               errno = EIO;
-               return -1;
-       }
-
-       state.mem_ctx = mem_ctx;
-       state.result = 0;
-       state.data = tdb_null;
-
-       tdb_parse_record(ctx->ltdb->tdb, key, db_tdb2_fetch_parse, &state);
-
-       if (state.result == -1) {
-               return -1;
-       }
-
-       *pdata = state.data;
-       return 0;
-}
-
-static NTSTATUS db_tdb2_store(struct db_record *rec, TDB_DATA data, int flag)
-{
-       struct db_tdb2_ctx *ctx = talloc_get_type_abort(rec->private_data,
-                                                      struct db_tdb2_ctx);
-       int ret;
-
-       /*
-        * This has a bug: We need to replace rec->value for correct
-        * operation, but right now brlock and locking don't use the value
-        * anymore after it was stored.
-        */
-
-       /* first store it to the master copy */
-       ret = tdb_store(ctx->mtdb->tdb, rec->key, data, flag);
-       if (ret != 0) {
-               return NT_STATUS_UNSUCCESSFUL;
-       }
-
-       /* then store it to the local copy */
-       ret = tdb_store(ctx->ltdb->tdb, rec->key, data, flag);
-       if (ret != 0) {
-               /* try to restore the old value in the master copy */
-               if (rec->value.dptr) {
-                       tdb_store(ctx->mtdb->tdb, rec->key,
-                                 rec->value, TDB_REPLACE);
-               } else {
-                       tdb_delete(ctx->mtdb->tdb, rec->key);
-               }
-               return NT_STATUS_INTERNAL_DB_CORRUPTION;
-       }
-
-       db_tdb2_queue_change(ctx, rec->key);
-
-       return NT_STATUS_OK;
-}
-
-static NTSTATUS db_tdb2_delete(struct db_record *rec)
-{
-       struct db_tdb2_ctx *ctx = talloc_get_type_abort(rec->private_data,
-                                                      struct db_tdb2_ctx);
-       int ret;
-
-       ret = tdb_delete(ctx->mtdb->tdb, rec->key);
-       if (ret != 0) {
-               if (tdb_error(ctx->mtdb->tdb) == TDB_ERR_NOEXIST) {
-                       return NT_STATUS_NOT_FOUND;
-               }
-
-               return NT_STATUS_UNSUCCESSFUL;
-       }
-
-       ret = tdb_delete(ctx->ltdb->tdb, rec->key);
-       if (ret != 0) {
-               /* try to restore the value in the master copy */
-               tdb_store(ctx->mtdb->tdb, rec->key,
-                         rec->value, TDB_REPLACE);
-               return NT_STATUS_INTERNAL_DB_CORRUPTION;
-       }
-
-       db_tdb2_queue_change(ctx, rec->key);
-
-       return NT_STATUS_OK;
-}
-
-struct db_tdb2_traverse_ctx {
-       struct db_tdb2_ctx *db_ctx;
-       int (*f)(struct db_record *rec, void *private_data);
-       void *private_data;
-};
-
-static int db_tdb2_traverse_func(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
-                               void *private_data)
-{
-       struct db_tdb2_traverse_ctx *ctx =
-               (struct db_tdb2_traverse_ctx *)private_data;
-       struct db_record rec;
-
-       /* this function operates on the master copy */
-
-       rec.key = kbuf;
-       rec.value = dbuf;
-       rec.store = db_tdb2_store;
-       rec.delete_rec = db_tdb2_delete;
-       rec.private_data = ctx->db_ctx;
-
-       return ctx->f(&rec, ctx->private_data);
-}
-
-static int db_tdb2_traverse(struct db_context *db,
-                          int (*f)(struct db_record *rec, void *private_data),
-                          void *private_data)
-{
-       struct db_tdb2_ctx *db_ctx =
-               talloc_get_type_abort(db->private_data, struct db_tdb2_ctx);
-       struct db_tdb2_traverse_ctx ctx;
-
-       /*
-        * we only support modifications within a
-        * started transaction.
-        */
-       if (db_ctx->transaction == 0) {
-               DEBUG(0, ("db_tdb2_traverse[%s]: no transaction started\n",
-                         db_ctx->name));
-               smb_panic("no transaction");
-               return -1;
-       }
-
-       /* here we traverse the master copy */
-       ctx.db_ctx = db_ctx;
-       ctx.f = f;
-       ctx.private_data = private_data;
-       return tdb_traverse(db_ctx->mtdb->tdb, db_tdb2_traverse_func, &ctx);
-}
-
-static NTSTATUS db_tdb2_store_deny(struct db_record *rec, TDB_DATA data, int flag)
-{
-       return NT_STATUS_MEDIA_WRITE_PROTECTED;
-}
-
-static NTSTATUS db_tdb2_delete_deny(struct db_record *rec)
-{
-       return NT_STATUS_MEDIA_WRITE_PROTECTED;
-}
-
-static int db_tdb2_traverse_read_func(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
-                               void *private_data)
-{
-       struct db_tdb2_traverse_ctx *ctx =
-               (struct db_tdb2_traverse_ctx *)private_data;
-       struct db_record rec;
-
-       /*
-        * this function operates on the local copy,
-        * so hide the DB_TDB2_MASTER_SEQNUM_KEYSTR from the caller.
-        */
-       if ((ctx->db_ctx->mseqkey.dsize == kbuf.dsize) &&
-           (memcmp(ctx->db_ctx->mseqkey.dptr, kbuf.dptr, kbuf.dsize) == 0)) {
-               return 0;
-       }
-
-       rec.key = kbuf;
-       rec.value = dbuf;
-       rec.store = db_tdb2_store_deny;
-       rec.delete_rec = db_tdb2_delete_deny;
-       rec.private_data = ctx->db_ctx;
-
-       return ctx->f(&rec, ctx->private_data);
-}
-
-static int db_tdb2_traverse_read(struct db_context *db,
-                          int (*f)(struct db_record *rec, void *private_data),
-                          void *private_data)
-{
-       struct db_tdb2_ctx *db_ctx =
-               talloc_get_type_abort(db->private_data, struct db_tdb2_ctx);
-       struct db_tdb2_traverse_ctx ctx;
-       int ret;
-
-       db_tdb2_resync_before_read(db_ctx, NULL);
-
-       if (db_ctx->out_of_sync) {
-               DEBUG(0,("out of sync[%s] failing traverse_read\n",
-                        db_ctx->ltdb_path));
-               errno = EIO;
-               return -1;
-       }
-
-       /* here we traverse the local copy */
-       ctx.db_ctx = db_ctx;
-       ctx.f = f;
-       ctx.private_data = private_data;
-       ret = tdb_traverse_read(db_ctx->ltdb->tdb, db_tdb2_traverse_read_func, &ctx);
-       if (ret > 0) {
-               /* we have filtered one entry */
-               ret--;
-       }
-
-       return ret;
-}
-
-static int db_tdb2_get_seqnum(struct db_context *db)
-
-{
-       struct db_tdb2_ctx *db_ctx =
-               talloc_get_type_abort(db->private_data, struct db_tdb2_ctx);
-       uint32_t nlseq;
-       uint32_t nmseq;
-       bool ok;
-
-       nlseq = tdb_get_seqnum(db_ctx->ltdb->tdb);
-
-       if (nlseq == db_ctx->lseqnum) {
-               return db_ctx->mseqnum;
-       }
-
-       ok = tdb_fetch_uint32_byblob(db_ctx->ltdb->tdb,
-                                    db_ctx->mseqkey,
-                                    &nmseq);
-       if (!ok) {
-               /* TODO: what should we do here? */
-               return db_ctx->mseqnum;
-       }
-
-       db_ctx->lseqnum = nlseq;
-       db_ctx->mseqnum = nmseq;
-
-       return db_ctx->mseqnum;
-}
-
-static int db_tdb2_transaction_start(struct db_context *db)
-{
-       struct db_tdb2_ctx *db_ctx =
-               talloc_get_type_abort(db->private_data, struct db_tdb2_ctx);
-       int ret;
-
-       if (db_ctx->transaction) {
-               db_ctx->transaction++;
-               return 0;
-       }
-
-       /* we need to open the master tdb in order to */
-       ret = db_tdb2_open_master(db_ctx, true, NULL);
-       if (ret != 0) {
-               return ret;
-       }
-
-       ret = tdb_transaction_start(db_ctx->ltdb->tdb);
-       if (ret != 0) {
-               db_tdb2_close_master(db_ctx);
-               return ret;
-       }
-
-       db_ctx->local_transaction = true;
-       db_ctx->transaction = 1;
-
-       return 0;
-}
-
-static void db_tdb2_queue_change(struct db_tdb2_ctx *db_ctx, const TDB_DATA key)
-{
-       size_t size_needed = 4 + key.dsize;
-       size_t size_new = db_ctx->current_buffer_size + size_needed;
-       uint32_t i;
-       DATA_BLOB *keys;
-
-       db_ctx->changes.num_changes++;
-
-       if (db_ctx->changes.num_changes > 1 &&
-           db_ctx->changes.keys == NULL) {
-               /*
-                * this means we already overflowed
-                */
-               return;
-       }
-
-       if (db_ctx->changes.num_changes == 1) {
-               db_ctx->changes.old_seqnum = db_ctx->mseqnum;
-       }
-
-       for (i=0; i < db_ctx->changes.num_keys; i++) {
-               int ret;
-
-               if (key.dsize != db_ctx->changes.keys[i].length) {
-                       continue;
-               }
-               ret = memcmp(key.dptr, db_ctx->changes.keys[i].data, key.dsize);
-               if (ret != 0) {
-                       continue;
-               }
-
-               /*
-                * the key is already in the list
-                * so we're done
-                */
-               return;
-       }
-
-       if (db_ctx->max_buffer_size < size_new) {
-               goto overflow;
-       }
-
-       keys = TALLOC_REALLOC_ARRAY(db_ctx, db_ctx->changes.keys,
-                                   DATA_BLOB,
-                                   db_ctx->changes.num_keys + 1);
-       if (!keys) {
-               goto overflow;
-       }
-       db_ctx->changes.keys = keys;
-
-       keys[db_ctx->changes.num_keys].data = (uint8_t *)talloc_memdup(keys,
-                                                               key.dptr,
-                                                               key.dsize);
-       if (!keys[db_ctx->changes.num_keys].data) {
-               goto overflow;
-       }
-       keys[db_ctx->changes.num_keys].length = key.dsize;
-       db_ctx->changes.num_keys++;
-       db_ctx->current_buffer_size = size_new;
-
-       return;
-
-overflow:
-       /*
-        * on overflow discard the buffer and let
-        * the others reload the whole tdb
-        */
-       db_ctx->current_buffer_size = 0;
-       db_ctx->changes.num_keys = 0;
-       TALLOC_FREE(db_ctx->changes.keys);
-       return;
-}
-
-static void db_tdb2_send_notify(struct db_tdb2_ctx *db_ctx)
-{
-       enum ndr_err_code ndr_err;
-       bool ok;
-       DATA_BLOB blob;
-       struct messaging_context *msg_ctx;
-       int num_msgs = 0;
-       struct server_id self = procid_self();
-
-       msg_ctx = db_tdb2_get_global_messaging_context();
-
-       db_ctx->changes.name = db_ctx->name;
-
-       DEBUG(10,("%s[%s] size[%u/%u] changes[%u] keys[%u] seqnum[%u=>%u]\n",
-                __FUNCTION__,
-                db_ctx->changes.name,
-                db_ctx->current_buffer_size,
-                db_ctx->max_buffer_size,
-                db_ctx->changes.num_changes,
-                db_ctx->changes.num_keys,
-                db_ctx->changes.old_seqnum,
-                db_ctx->changes.new_seqnum));
-
-       if (db_ctx->changes.num_changes == 0) {
-               DEBUG(10,("db_tdb2_send_notify[%s]: no changes\n",
-                       db_ctx->changes.name));
-               goto done;
-       }
-
-       if (!msg_ctx) {
-               DEBUG(1,("db_tdb2_send_notify[%s]: skipped (no msg ctx)\n",
-                       db_ctx->changes.name));
-               goto done;
-       }
-
-       ndr_err = ndr_push_struct_blob(
-               &blob, talloc_tos(), &db_ctx->changes,
-               (ndr_push_flags_fn_t)ndr_push_dbwrap_tdb2_changes);
-       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
-               DEBUG(0,("db_tdb2_send_notify[%s]: failed to push changes: %s\n",
-                       db_ctx->changes.name,
-                       nt_errstr(ndr_map_error2ntstatus(ndr_err))));
-               goto done;
-       }
-
-       ok = message_send_all(msg_ctx, MSG_DBWRAP_TDB2_CHANGES,
-                             blob.data, blob.length, &num_msgs);
-       if (!ok) {
-               DEBUG(0,("db_tdb2_send_notify[%s]: failed to send changes\n",
-                       db_ctx->changes.name));
-               goto done;
-       }
-
-       DEBUG(10,("db_tdb2_send_notify[%s]: pid %s send %u messages\n",
-               db_ctx->name, procid_str_static(&self), num_msgs));
-
-done:
-       TALLOC_FREE(db_ctx->changes.keys);
-       ZERO_STRUCT(db_ctx->changes);
-
-       return;
-}
-
-static void db_tdb2_receive_changes(struct messaging_context *msg,
-                                   void *private_data,
-                                   uint32_t msg_type,
-                                   struct server_id server_id,
-                                   DATA_BLOB *data)
-{
-       enum ndr_err_code ndr_err;
-       struct dbwrap_tdb2_changes changes;
-       struct db_context *db;
-       struct server_id self;
-
-       if (procid_is_me(&server_id)) {
-               DEBUG(0,("db_tdb2_receive_changes: ignore selfpacket\n"));
-               return;
-       }
-
-       self = procid_self();
-
-       DEBUG(10,("db_tdb2_receive_changes: from %s to %s\n",
-               procid_str(debug_ctx(), &server_id),
-               procid_str(debug_ctx(), &self)));
-
-       ndr_err = ndr_pull_struct_blob_all(
-               data, talloc_tos(), &changes,
-               (ndr_pull_flags_fn_t)ndr_pull_dbwrap_tdb2_changes);
-       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
-               DEBUG(0,("db_tdb2_receive_changes: failed to pull changes: %s\n",
-                       nt_errstr(ndr_map_error2ntstatus(ndr_err))));
-               goto done;
-       }
-
-       if(DEBUGLEVEL >= 10) {
-               NDR_PRINT_DEBUG(dbwrap_tdb2_changes, &changes);
-       }
-
-       /* open the db, this will sync it */
-       db = db_open_tdb2_ex(talloc_tos(), changes.name, 0,
-                            0, O_RDWR, 0600, &changes);
-       TALLOC_FREE(db);
-done:
-       return;
-}
-
-static int db_tdb2_transaction_commit(struct db_context *db)
-{
-       struct db_tdb2_ctx *db_ctx =
-               talloc_get_type_abort(db->private_data, struct db_tdb2_ctx);
-       int ret;
-       uint32_t mseqnum;
-
-       if (db_ctx->transaction == 0) {
-               return -1;
-       } else if (db_ctx->transaction > 1) {
-               db_ctx->transaction--;
-               return 0;
-       }
-
-       mseqnum = tdb_get_seqnum(db_ctx->mtdb->tdb);
-       db_ctx->changes.new_seqnum = mseqnum;
-
-       /* first commit to the master copy */
-       ret = tdb_transaction_commit(db_ctx->mtdb->tdb);
-       db_ctx->master_transaction = false;
-       if (ret != 0) {
-               int saved_errno = errno;
-               db_tdb2_transaction_cancel(db);
-               errno = saved_errno;
-               return ret;
-       }
-
-       /*
-        * Note: as we've already commited the changes to the master copy
-        *       so we ignore errors in the following functions
-        */
-       ret = db_tdb2_commit_local(db_ctx, mseqnum);
-       if (ret == 0) {
-               db_ctx->out_of_sync = false;
-       } else {
-               db_ctx->out_of_sync = true;
-       }
-
-       db_ctx->transaction = 0;
-
-       db_tdb2_close_master(db_ctx);
-
-       db_tdb2_send_notify(db_ctx);
-
-       return 0;
-}
-
-static int db_tdb2_transaction_cancel(struct db_context *db)
-{
-       struct db_tdb2_ctx *db_ctx =
-               talloc_get_type_abort(db->private_data, struct db_tdb2_ctx);
-       int saved_errno;
-       int ret;
-
-       if (db_ctx->transaction == 0) {
-               return -1;
-       }
-       if (db_ctx->transaction > 1) {
-               db_ctx->transaction--;
-               return 0;
-       }
-
-       /* cancel the transaction and close the master copy */
-       ret = db_tdb2_close_master(db_ctx);
-       saved_errno = errno;
-
-       /* now cancel on the local copy and ignore any error */
-       tdb_transaction_cancel(db_ctx->ltdb->tdb);
-       db_ctx->local_transaction = false;
-
-       db_ctx->transaction = 0;
-
-       errno = saved_errno;
-       return ret;
-}
-
-static int db_tdb2_open_master(struct db_tdb2_ctx *db_ctx, bool transaction,
-                              const struct dbwrap_tdb2_changes *changes)
-{
-       int ret;
-
-       db_ctx->mtdb = tdb_wrap_open(db_ctx,
-                                    db_ctx->mtdb_path,
-                                    db_ctx->open.hash_size,
-                                    db_ctx->open.tdb_flags|TDB_NOMMAP|TDB_SEQNUM,
-                                    db_ctx->open.open_flags,
-                                    db_ctx->open.mode);
-       if (db_ctx->mtdb == NULL) {
-               DEBUG(0, ("Could not open master tdb[%s]: %s\n",
-                         db_ctx->mtdb_path,
-                         strerror(errno)));
-               return -1;
-       }
-       DEBUG(10,("open_master[%s]\n", db_ctx->mtdb_path));
-
-       if (!db_ctx->ltdb) {
-               struct stat st;
-
-               if (fstat(tdb_fd(db_ctx->mtdb->tdb), &st) == 0) {
-                       db_ctx->open.mode = st.st_mode;
-               }
-
-               /* make sure the local one uses the same hash size as the master one */
-               db_ctx->open.hash_size = tdb_hash_size(db_ctx->mtdb->tdb);
-
-               db_ctx->ltdb = tdb_wrap_open(db_ctx,
-                                            db_ctx->ltdb_path,
-                                            db_ctx->open.hash_size,
-                                            db_ctx->open.tdb_flags|TDB_SEQNUM,
-                                            db_ctx->open.open_flags|O_CREAT,
-                                            db_ctx->open.mode);
-               if (db_ctx->ltdb == NULL) {
-                       DEBUG(0, ("Could not open local tdb[%s]: %s\n",
-                                 db_ctx->ltdb_path,
-                                 strerror(errno)));
-                       TALLOC_FREE(db_ctx->mtdb);
-                       return -1;
-               }
-               DEBUG(10,("open_local[%s]\n", db_ctx->ltdb_path));
-       }
-
-       if (transaction) {
-               ret = tdb_transaction_start(db_ctx->mtdb->tdb);
-               if (ret != 0) {
-                       DEBUG(0,("open failed to start transaction[%s]\n",
-                                db_ctx->mtdb_path));
-                       db_tdb2_close_master(db_ctx);
-                       return ret;
-               }
-               db_ctx->master_transaction = true;
-       }
-
-       ret = db_tdb2_sync_from_master(db_ctx, changes);
-       if (ret != 0) {
-               DEBUG(0,("open failed to sync from master[%s]\n",
-                        db_ctx->ltdb_path));
-               db_tdb2_close_master(db_ctx);
-               return ret;
-       }
-
-       return 0;
-}
-
-static int db_tdb2_commit_local(struct db_tdb2_ctx *db_ctx, uint32_t mseqnum)
-{
-       bool ok;
-       int ret;
-
-       /* first fetch the master seqnum */
-       db_ctx->mseqnum = mseqnum;
-
-       /* now we try to store the master seqnum in the local tdb */
-       ok = tdb_store_uint32_byblob(db_ctx->ltdb->tdb,
-                                    db_ctx->mseqkey,
-                                    db_ctx->mseqnum);
-       if (!ok) {
-               tdb_transaction_cancel(db_ctx->ltdb->tdb);
-               db_ctx->local_transaction = false;
-               DEBUG(0,("local failed[%s] store mseq[%u]\n",
-                        db_ctx->ltdb_path, db_ctx->mseqnum));
-               return -1;
-       }
-
-       /* now commit all changes to the local tdb */
-       ret = tdb_transaction_commit(db_ctx->ltdb->tdb);
-       db_ctx->local_transaction = false;
-       if (ret != 0) {
-               DEBUG(0,("local failed[%s] commit mseq[%u]\n",
-                        db_ctx->ltdb_path, db_ctx->mseqnum));
-               return ret;
-       }
-
-       /*
-        * and update the cached local seqnum this is needed to
-        * let us cache the master seqnum.
-        */
-       db_ctx->lseqnum = tdb_get_seqnum(db_ctx->ltdb->tdb);
-       DEBUG(10,("local updated[%s] mseq[%u]\n",
-                 db_ctx->ltdb_path, db_ctx->mseqnum));
-
-       return 0;
-}
-
-static int db_tdb2_close_master(struct db_tdb2_ctx *db_ctx)
-{
-       if (db_ctx->master_transaction) {
-               tdb_transaction_cancel(db_ctx->mtdb->tdb);
-       }
-       db_ctx->master_transaction = false;
-       /* now we can close the master handle */
-       TALLOC_FREE(db_ctx->mtdb);
-
-       DEBUG(10,("close_master[%s] ok\n", db_ctx->mtdb_path));
-       return 0;
-}
-
-static int db_tdb2_traverse_sync_all_func(TDB_CONTEXT *tdb,
-                                         TDB_DATA kbuf, TDB_DATA dbuf,
-                                         void *private_data)
-{
-       struct db_tdb2_traverse_ctx *ctx =
-               (struct db_tdb2_traverse_ctx *)private_data;
-       uint32_t *seqnum = (uint32_t *)ctx->private_data;
-       int ret;
-
-       DEBUG(10,("sync_entry[%s]\n", ctx->db_ctx->mtdb_path));
-
-       /* Do not accidently allocate/deallocate w/o need when debug level is lower than needed */
-       if(DEBUGLEVEL >= 10) {
-               char *keystr = hex_encode(NULL, (unsigned char*)kbuf.dptr, kbuf.dsize);
-               DEBUG(10, (DEBUGLEVEL > 10
-                          ? "Locking key %s\n" : "Locking key %.20s\n",
-                          keystr));
-               TALLOC_FREE(keystr);
-       }
-
-       ret = tdb_store(ctx->db_ctx->ltdb->tdb, kbuf, dbuf, TDB_INSERT);
-       if (ret != 0) {
-               DEBUG(0,("sync_entry[%s] %d: %s\n",
-                       ctx->db_ctx->ltdb_path, ret,
-                       tdb_errorstr(ctx->db_ctx->ltdb->tdb)));
-               return ret;
-       }
-
-       *seqnum = tdb_get_seqnum(ctx->db_ctx->mtdb->tdb);
-
-       return 0;
-}
-
-static int db_tdb2_sync_all(struct db_tdb2_ctx *db_ctx, uint32_t *seqnum)
-{
-       struct db_tdb2_traverse_ctx ctx;
-       int ret;
-
-       ret = tdb_wipe_all(db_ctx->ltdb->tdb);
-       if (ret != 0) {
-               DEBUG(0,("tdb_wipe_all[%s] failed %d: %s\n",
-                        db_ctx->ltdb_path, ret,
-                        tdb_errorstr(db_ctx->ltdb->tdb)));
-               return ret;
-       }
-
-       ctx.db_ctx = db_ctx;
-       ctx.f = NULL;
-       ctx.private_data = seqnum;
-       ret = tdb_traverse_read(db_ctx->mtdb->tdb,
-                               db_tdb2_traverse_sync_all_func,
-                               &ctx);
-       DEBUG(10,("db_tdb2_sync_all[%s] count[%d]\n",
-                 db_ctx->mtdb_path, ret));
-       if (ret < 0) {
-               return ret;
-       }
-
-       return 0;
-}
-
-static int db_tdb2_sync_changes(struct db_tdb2_ctx *db_ctx,
-                               const struct dbwrap_tdb2_changes *changes,
-                               uint32_t *seqnum)
-{
-       uint32_t cseqnum;
-       uint32_t mseqnum;
-       uint32_t i;
-       int ret;
-       bool need_full_sync = false;
-
-       DEBUG(10,("db_tdb2_sync_changes[%s] changes[%u]\n",
-                 changes->name, changes->num_changes));
-       if(DEBUGLEVEL >= 10) {
-               NDR_PRINT_DEBUG(dbwrap_tdb2_changes, discard_const(changes));
-       }
-
-       /* for the master tdb for reading */
-       ret = tdb_lockall_read(db_ctx->mtdb->tdb);
-       if (ret != 0) {
-               DEBUG(0,("tdb_lockall_read[%s] %d\n", db_ctx->mtdb_path, ret));
-               return ret;
-       }
-
-       /* first fetch seqnum we know about */
-       cseqnum = db_tdb2_get_seqnum(db_ctx->db);
-
-       /* then fetch the master seqnum */
-       mseqnum = tdb_get_seqnum(db_ctx->mtdb->tdb);
-
-       if (cseqnum == mseqnum) {
-               DEBUG(10,("db_tdb2_sync_changes[%s] uptodate[%u]\n",
-                         db_ctx->mtdb_path, mseqnum));
-               /* we hit a race before and now noticed we're uptodate */
-               goto done;
-       }
-
-       /* now see if the changes describe what we need */
-       if (changes->old_seqnum != cseqnum) {
-               need_full_sync = true;
-       }
-
-       if (changes->new_seqnum != mseqnum) {
-               need_full_sync = true;
-       }
-
-       /* this was the overflow case */
-       if (changes->num_keys == 0) {
-               need_full_sync = true;
-       }
-
-       if (need_full_sync) {
-               tdb_unlockall_read(db_ctx->mtdb->tdb);
-               DEBUG(0,("fallback to full sync[%s] seq[%u=>%u] keys[%u]\n",
-                       db_ctx->ltdb_path, cseqnum, mseqnum,
-                       changes->num_keys));
-               return db_tdb2_sync_all(db_ctx, &mseqnum);
-       }
-
-       for (i=0; i < changes->num_keys; i++) {
-               const char *op = NULL;
-               bool del = false;
-               TDB_DATA key;
-               TDB_DATA val;
-
-               key.dsize = changes->keys[i].length;
-               key.dptr = changes->keys[i].data;
-
-               val = tdb_fetch(db_ctx->mtdb->tdb, key);
-               ret = tdb_error(db_ctx->mtdb->tdb);
-               if (ret == TDB_ERR_NOEXIST) {
-                       del = true;
-               } else if (ret != 0) {
-                       DEBUG(0,("sync_changes[%s] failure %d\n",
-                                db_ctx->mtdb_path, ret));
-                       goto failed;
-               }
-
-               if (del) {
-                       op = "delete";
-                       ret = tdb_delete(db_ctx->ltdb->tdb, key);
-                       DEBUG(10,("sync_changes[%s] delete key[%u] %d\n",
-                                 db_ctx->mtdb_path, i, ret));
-               } else {
-                       op = "store";
-                       ret = tdb_store(db_ctx->ltdb->tdb, key,
-                                       val, TDB_REPLACE);
-                       DEBUG(10,("sync_changes[%s] store key[%u] %d\n",
-                                 db_ctx->mtdb_path, i, ret));
-               }
-               SAFE_FREE(val.dptr);
-               if (ret != 0) {
-                       DEBUG(0,("sync_changes[%s] %s key[%u] failed %d\n",
-                                db_ctx->mtdb_path, op, i, ret));
-                       goto failed;
-               }
-       }
-
-done:
-       tdb_unlockall_read(db_ctx->mtdb->tdb);
-
-       *seqnum = mseqnum;
-       return 0;
-failed:
-       tdb_unlockall_read(db_ctx->mtdb->tdb);
-       return ret;
-}
-
-static int db_tdb2_sync_from_master(struct db_tdb2_ctx *db_ctx,
-                                   const struct dbwrap_tdb2_changes *changes)
-{
-       int ret;
-       uint32_t cseqnum;
-       uint32_t mseqnum;
-       bool force = false;
-
-       /* first fetch seqnum we know about */
-       cseqnum = db_tdb2_get_seqnum(db_ctx->db);
-
-       /* then fetch the master seqnum */
-       mseqnum = tdb_get_seqnum(db_ctx->mtdb->tdb);
-
-       if (db_ctx->lseqnum == 0) {
-               force = true;
-       }
-
-       if (!force && cseqnum == mseqnum) {
-               DEBUG(10,("uptodate[%s] mseq[%u]\n",
-                         db_ctx->ltdb_path, mseqnum));
-               /* the local copy is uptodate, close the master db */
-               return 0;
-       }
-       DEBUG(10,("not uptodate[%s] seq[%u=>%u]\n",
-                 db_ctx->ltdb_path, cseqnum, mseqnum));
-
-       ret = tdb_transaction_start(db_ctx->ltdb->tdb);
-       if (ret != 0) {
-               DEBUG(0,("failed to start transaction[%s] %d: %s\n",
-                        db_ctx->ltdb_path, ret,
-                        tdb_errorstr(db_ctx->ltdb->tdb)));
-               db_ctx->out_of_sync = true;
-               return ret;
-       }
-       db_ctx->local_transaction = true;
-
-       if (changes && !force) {
-               ret = db_tdb2_sync_changes(db_ctx, changes, &mseqnum);
-               if (ret != 0) {
-                       db_ctx->out_of_sync = true;
-                       tdb_transaction_cancel(db_ctx->ltdb->tdb);
-                       db_ctx->local_transaction = false;
-                       return ret;
-               }
-       } else {
-               ret = db_tdb2_sync_all(db_ctx, &mseqnum);
-               if (ret != 0) {
-                       db_ctx->out_of_sync = true;
-                       tdb_transaction_cancel(db_ctx->ltdb->tdb);
-                       db_ctx->local_transaction = false;
-                       return ret;
-               }
-       }
-
-       ret = db_tdb2_commit_local(db_ctx, mseqnum);
-       if (ret != 0) {
-               db_ctx->out_of_sync = true;
-               return ret;
-       }
-
-       db_ctx->out_of_sync = false;
-
-       return 0;
-}
-
-static int db_tdb2_ctx_destructor(struct db_tdb2_ctx *db_tdb2)
-{
-       db_tdb2_close_master(db_tdb2);
-       if (db_tdb2->local_transaction) {
-               tdb_transaction_cancel(db_tdb2->ltdb->tdb);
-       }
-       db_tdb2->local_transaction = false;
-       TALLOC_FREE(db_tdb2->ltdb);
-       return 0;
-}
-
-static struct db_context *db_open_tdb2_ex(TALLOC_CTX *mem_ctx,
-                                         const char *name,
-                                         int hash_size, int tdb_flags,
-                                         int open_flags, mode_t mode,
-                                         const struct dbwrap_tdb2_changes *chgs)
-{
-       struct db_context *result = NULL;
-       struct db_tdb2_ctx *db_tdb2;
-       int ret;
-       const char *md;
-       const char *ld;
-       const char *bn;
-
-       bn = strrchr_m(name, '/');
-       if (bn) {
-               bn++;
-               DEBUG(3,("db_open_tdb2: use basename[%s] of abspath[%s]:\n",
-                       bn, name));
-       } else {
-               bn = name;
-       }
-
-       md = lp_parm_const_string(-1, "dbwrap_tdb2", "master directory", NULL);
-       if (!md) {
-               DEBUG(0,("'dbwrap_tdb2:master directory' empty\n"));
-               goto fail;
-       }
-
-       ld = lp_parm_const_string(-1, "dbwrap_tdb2", "local directory", NULL);
-       if (!ld) {
-               DEBUG(0,("'dbwrap_tdb2:local directory' empty\n"));
-               goto fail;
-       }
-
-       result = TALLOC_ZERO_P(mem_ctx, struct db_context);
-       if (result == NULL) {
-               DEBUG(0, ("talloc failed\n"));
-               goto fail;
-       }
-
-       result->private_data = db_tdb2 = TALLOC_ZERO_P(result, struct db_tdb2_ctx);
-       if (db_tdb2 == NULL) {
-               DEBUG(0, ("talloc failed\n"));
-               goto fail;
-       }
-
-       db_tdb2->db = result;
-
-       db_tdb2->open.hash_size = hash_size;
-       db_tdb2->open.tdb_flags = tdb_flags;
-       db_tdb2->open.open_flags= open_flags;
-       db_tdb2->open.mode      = mode;
-
-       db_tdb2->max_buffer_size = lp_parm_ulong(-1, "dbwrap_tdb2",
-                                                "notify buffer size", 512);
-
-       db_tdb2->name = talloc_strdup(db_tdb2, bn);
-       if (db_tdb2->name == NULL) {
-               DEBUG(0, ("talloc_strdup failed\n"));
-               goto fail;
-       }
-
-       db_tdb2->mtdb_path = talloc_asprintf(db_tdb2, "%s/%s",
-                                            md, bn);
-       if (db_tdb2->mtdb_path == NULL) {
-               DEBUG(0, ("talloc_asprintf failed\n"));
-               goto fail;
-       }
-
-       db_tdb2->ltdb_path = talloc_asprintf(db_tdb2, "%s/%s.tdb2",
-                                            ld, bn);
-       if (db_tdb2->ltdb_path == NULL) {
-               DEBUG(0, ("talloc_asprintf failed\n"));
-               goto fail;
-       }
-
-       db_tdb2->mseqkey = string_term_tdb_data(DB_TDB2_MASTER_SEQNUM_KEYSTR);
-
-       /*
-        * this implicit opens the local one if as it's not yet open
-        * it syncs the local copy.
-        */
-       ret = db_tdb2_open_master(db_tdb2, false, chgs);
-       if (ret != 0) {
-               goto fail;
-       }
-
-       ret = db_tdb2_close_master(db_tdb2);
-       if (ret != 0) {
-               goto fail;
-       }
-
-       DEBUG(10,("db_open_tdb2[%s] opened with mseq[%u]\n",
-                 db_tdb2->name, db_tdb2->mseqnum));
-
-       result->fetch_locked = db_tdb2_fetch_locked;
-       result->fetch = db_tdb2_fetch;
-       result->traverse = db_tdb2_traverse;
-       result->traverse_read = db_tdb2_traverse_read;
-       result->get_seqnum = db_tdb2_get_seqnum;
-       result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
-       result->transaction_start = db_tdb2_transaction_start;
-       result->transaction_commit = db_tdb2_transaction_commit;
-       result->transaction_cancel = db_tdb2_transaction_cancel;
-
-       talloc_set_destructor(db_tdb2, db_tdb2_ctx_destructor);
-
-       return result;
-
- fail:
-       if (result != NULL) {
-               TALLOC_FREE(result);
-       }
-       return NULL;
-}
-
-struct db_context *db_open_tdb2(TALLOC_CTX *mem_ctx,
-                               const char *name,
-                               int hash_size, int tdb_flags,
-                               int open_flags, mode_t mode)
-{
-       return db_open_tdb2_ex(mem_ctx, name, hash_size,
-                              tdb_flags, open_flags, mode, NULL);
-}
index cf780c4e47666893ff6f427b176602f49fb0d946..847ab0fe6a9733f943debeab7d6303dde0a26688 100644 (file)
@@ -55,8 +55,6 @@ struct ctdbd_connection *messaging_ctdbd_connection(void)
                        DEBUG(0,("messaging_init failed\n"));
                        return NULL;
                }
-
-               db_tdb2_setup_messaging(msg, false);
        }
 
        if (global_ctdb_connection_pid != getpid()) {
index d89434782dfeed2f91466b9d434b7893db16f401..e64611f1549b368bb8df0ca8f6b5032a1a2a1399 100644 (file)
@@ -51,7 +51,7 @@ static bool share_info_db_init(void)
                return True;
        }
 
-       share_db = db_open_trans(NULL, state_path("share_info.tdb"), 0,
+       share_db = db_open(NULL, state_path("share_info.tdb"), 0,
                                 TDB_DEFAULT, O_RDWR|O_CREAT, 0600);
        if (share_db == NULL) {
                DEBUG(0,("Failed to open share info database %s (%s)\n",
index 46c3f1dd3c90c7e1914e4f3facf45dc9d0ec8d52..83005f05bda8a6cee9250d20a0f9d69d9555a971 100644 (file)
@@ -765,8 +765,6 @@ static bool open_sockets(bool isdaemon, int port)
        };
        TALLOC_CTX *frame = talloc_stackframe(); /* Setup tos. */
 
-       db_tdb2_setup_messaging(NULL, false);
-
        load_case_tables();
 
        global_nmb_port = NMB_PORT;
@@ -861,8 +859,6 @@ static bool open_sockets(bool isdaemon, int port)
                return 1;
        }
 
-       db_tdb2_setup_messaging(nmbd_messaging_context(), true);
-
        if ( !reload_nmbd_services(False) )
                return(-1);
 
index 9c8c7b85179044387e5691dbad041688e64c1bd2..824e61b063dcafe6765a84f96700c7e3182f6f97 100644 (file)
@@ -834,7 +834,7 @@ static bool tdbsam_open( const char *name )
 
        /* Try to open tdb passwd.  Create a new one if necessary */
 
-       db_sam = db_open_trans(NULL, name, 0, TDB_DEFAULT, O_CREAT|O_RDWR, 0600);
+       db_sam = db_open(NULL, name, 0, TDB_DEFAULT, O_CREAT|O_RDWR, 0600);
        if (db_sam == NULL) {
                DEBUG(0, ("tdbsam_open: Failed to open/create TDB passwd "
                          "[%s]\n", name));
index fc01aa19b975aa15b30427c27ba532e38ce20e24..4527ae712766e47fb0a719450d312e15e573c7bd 100644 (file)
@@ -64,8 +64,8 @@ bool secrets_init(void)
                return false;
        }
 
-       db_ctx = db_open_trans(NULL, fname, 0,
-                              TDB_DEFAULT, O_RDWR|O_CREAT, 0600);
+       db_ctx = db_open(NULL, fname, 0,
+                        TDB_DEFAULT, O_RDWR|O_CREAT, 0600);
 
        if (db_ctx == NULL) {
                DEBUG(0,("Failed to open %s\n", fname));
index 489f076ce75ee19e14c25133bf4b7aad4e47ef23..6f4c614b9a5a003154092cce685668c254b4d912 100644 (file)
@@ -390,10 +390,10 @@ WERROR regdb_init(void)
                return WERR_OK;
        }
 
-       regdb = db_open_trans(NULL, state_path("registry.tdb"), 0,
+       regdb = db_open(NULL, state_path("registry.tdb"), 0,
                              REG_TDB_FLAGS, O_RDWR, 0600);
        if (!regdb) {
-               regdb = db_open_trans(NULL, state_path("registry.tdb"), 0,
+               regdb = db_open(NULL, state_path("registry.tdb"), 0,
                                      REG_TDB_FLAGS, O_RDWR|O_CREAT, 0600);
                if (!regdb) {
                        werr = ntstatus_to_werror(map_nt_error_from_unix(errno));
@@ -444,7 +444,7 @@ WERROR regdb_open( void )
        
        become_root();
 
-       regdb = db_open_trans(NULL, state_path("registry.tdb"), 0,
+       regdb = db_open(NULL, state_path("registry.tdb"), 0,
                              REG_TDB_FLAGS, O_RDWR, 0600);
        if ( !regdb ) {
                result = ntstatus_to_werror( map_nt_error_from_unix( errno ) );
index 4a0e60a8d7adc143b4e09b208567e6ed40d78ea6..05f11f6753c7ab7f43d736af7fc3f5232f94aac7 100644 (file)
@@ -1097,8 +1097,6 @@ extern void build_options(bool screen);
 
        TimeInit();
 
-       db_tdb2_setup_messaging(NULL, false);
-
 #ifdef HAVE_SET_AUTH_PARAMETERS
        set_auth_parameters(argc,argv);
 #endif
@@ -1230,11 +1228,6 @@ extern void build_options(bool screen);
        if (smbd_messaging_context() == NULL)
                exit(1);
 
-       /*
-        * Do this before reload_services.
-        */
-       db_tdb2_setup_messaging(smbd_messaging_context(), true);
-
        if (!reload_services(False))
                return(-1);     
 
index 45de872db6cade74bd14938097ebd82d77d0ccef..1a66af949a1830135f2657ca6789d65c745894ba 100644 (file)
@@ -381,8 +381,6 @@ static int traverse_sessionid(struct db_record *db, void *state)
                goto done;
        }
 
-       db_tdb2_setup_messaging(msg_ctx, true);
-
        if (!lp_load(get_dyn_CONFIGFILE(),False,False,False,True)) {
                fprintf(stderr, "Can't load %s - run testparm to debug it\n",
                        get_dyn_CONFIGFILE());
index ec20d8c1951ecef824383dc39102c5a2130441ae..f75af64f8f7000562c3e8a2ca914540155c48cb9 100644 (file)
@@ -1024,8 +1024,6 @@ int main(int argc, char **argv, char **envp)
 
        load_case_tables();
 
-       db_tdb2_setup_messaging(NULL, false);
-
        /* Initialise for running in non-root mode */
 
        sec_init();
@@ -1116,8 +1114,6 @@ int main(int argc, char **argv, char **envp)
                exit(1);
        }
 
-       db_tdb2_setup_messaging(winbind_messaging_context(), true);
-
        if (!reload_services_file(NULL)) {
                DEBUG(0, ("error opening config file\n"));
                exit(1);