r22775: For the cluster code I've developed a wrapper around tdb to put different
authorVolker Lendecke <vlendec@samba.org>
Thu, 10 May 2007 10:42:13 +0000 (10:42 +0000)
committerGerald (Jerry) Carter <jerry@samba.org>
Wed, 10 Oct 2007 17:21:56 +0000 (12:21 -0500)
database backends in place dynamically.

The main abstractions are db_context and db_record, it should be mainly
self-describing, see include/dbwrap.h.  You open the db just as you would open
a tdb, this time with db_open(). If you want to fetch a record, just do the
db->fetch() call, if you want to do operations on it, you need to get it with
fetch_locked().

I added dbwrap_file.c (not heavily tested lately) as an example for what can
be done with that abstraction, uses a file per key. So if anybody is willing
to shape that up, we might have a chance on reiserfs again.... :-)

This abstraction works fine for brlock.tdb, locking.tdb, connections.tdb and
sessionid.tdb. It should work fine for the others as well, I just did not yet
get around to convert them.

If nobody loudly screams NO, then I will import the code that uses this soon.

Volker
(This used to be commit e9d7484ca246cfca4a1fd23be35edc2783136ebe)

source3/Makefile.in
source3/include/dbwrap.h [new file with mode: 0644]
source3/include/includes.h
source3/lib/dbwrap.c [new file with mode: 0644]
source3/lib/dbwrap_file.c [new file with mode: 0644]
source3/lib/dbwrap_tdb.c [new file with mode: 0644]

index 0346217ff7fd613307823869ab1e7e53bf387245..93e3dd0e44328c7898d68d55bd09a2f13fc6c4ca 100644 (file)
@@ -206,7 +206,8 @@ TDBBASE_OBJ = lib/tdb/common/tdb.o lib/tdb/common/dump.o lib/tdb/common/error.o
        lib/tdb/common/open.o lib/tdb/common/transaction.o \
        lib/tdb/common/traverse.o
 
-TDB_OBJ = $(TDBBASE_OBJ) lib/util_tdb.o
+TDB_OBJ = $(TDBBASE_OBJ) lib/util_tdb.o\
+       lib/dbwrap.o lib/dbwrap_tdb.o
 
 SMBLDAP_OBJ = @SMBLDAP@ @SMBLDAPUTIL@
 
diff --git a/source3/include/dbwrap.h b/source3/include/dbwrap.h
new file mode 100644 (file)
index 0000000..17d36b8
--- /dev/null
@@ -0,0 +1,51 @@
+/* 
+   Unix SMB/CIFS implementation.
+   Database interface wrapper around tdb
+   Copyright (C) Volker Lendecke 2005-2007
+   
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 2 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+*/
+
+#ifndef __FILEDB_H__
+#define __FILEDB_H__
+
+struct db_record {
+       TDB_DATA key, value;
+       NTSTATUS (*store)(struct db_record *rec, TDB_DATA data, int flag);
+       NTSTATUS (*delete_rec)(struct db_record *rec);
+       void *private_data;
+};
+
+struct db_context {
+       struct db_record *(*fetch_locked)(struct db_context *db,
+                                         TALLOC_CTX *mem_ctx,
+                                         TDB_DATA key);
+       int (*fetch)(struct db_context *db, TALLOC_CTX *mem_ctx,
+                    TDB_DATA key, TDB_DATA *data);
+       int (*traverse)(struct db_context *db,
+                       int (*f)(struct db_record *db,
+                                void *private_data),
+                       void *private_data);
+       int (*get_seqnum)(struct db_context *db);
+       void *private_data;
+};
+
+struct db_context *db_open(TALLOC_CTX *mem_ctx,
+                          const char *name,
+                          int hash_size, int tdb_flags,
+                          int open_flags, mode_t mode);
+
+
+#endif /* __FILEDB_H__ */
index 9a368f8084fcfb3c8c03ecd01073dd19dafb22ed..5acd7abc3600b3883747250668ebf031248432b5 100644 (file)
@@ -703,6 +703,7 @@ typedef int BOOL;
 #include "spnego.h"
 #include "rpc_client.h"
 #include "event.h"
+#include "dbwrap.h"
 
 /*
  * Type for wide character dirent structure.
diff --git a/source3/lib/dbwrap.c b/source3/lib/dbwrap.c
new file mode 100644 (file)
index 0000000..9f74a9e
--- /dev/null
@@ -0,0 +1,61 @@
+/* 
+   Unix SMB/CIFS implementation.
+   Database interface wrapper
+   Copyright (C) Jim McDonough <jmcd@us.ibm.com> 2006
+
+   Major code contributions from Aleksey Fedoseev (fedoseev@ru.ibm.com)
+   
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 2 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+*/
+
+#include "includes.h"
+
+/*
+ * Fall back using fetch_locked if no genuine fetch operation is provided
+ */
+
+static int dbwrap_fallback_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
+                                TDB_DATA key, TDB_DATA *data)
+{
+       struct db_record *rec;
+
+       if (!(rec = db->fetch_locked(db, mem_ctx, key))) {
+               return -1;
+       }
+
+       data->dsize = rec->value.dsize;
+       data->dptr = talloc_move(mem_ctx, &rec->value.dptr);
+       TALLOC_FREE(rec);
+       return 0;
+}
+
+struct db_context *db_open(TALLOC_CTX *mem_ctx,
+                          const char *name,
+                          int hash_size, int tdb_flags,
+                          int open_flags, mode_t mode)
+{
+       struct db_context *result = NULL;
+
+       if (result == NULL) {
+               result = db_open_tdb(mem_ctx, name, hash_size,
+                                    tdb_flags, open_flags, mode);
+       }
+
+       if ((result != NULL) && (result->fetch == NULL)) {
+               result->fetch = dbwrap_fallback_fetch;
+       }
+
+       return result;
+}
diff --git a/source3/lib/dbwrap_file.c b/source3/lib/dbwrap_file.c
new file mode 100644 (file)
index 0000000..9e18c6b
--- /dev/null
@@ -0,0 +1,413 @@
+/* 
+   Unix SMB/CIFS implementation.
+   Database interface using a file per record
+   Copyright (C) Volker Lendecke 2005
+   
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 2 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+*/
+
+/*
+ * Be aware that this is just sample code that has not seen too much testing
+ */
+
+#include "includes.h"
+
+struct db_file_ctx {
+       const char *dirname;
+
+       /* We only support one locked record at a time -- everything else
+        * would lead to a potential deadlock anyway! */
+       struct db_record *locked_record;
+};
+
+struct db_locked_file {
+       int fd;
+       uint8 hash;
+       const char *name;
+       const char *path;
+       struct db_file_ctx *parent;
+};
+
+/* Copy from statcache.c... */
+
+static uint32 fsh(const uint8 *p, int len)
+{
+        uint32 n = 0;
+       int i;
+        for (i=0; i<len; i++) {
+                n = ((n << 5) + n) ^ (uint32)(p[i]);
+        }
+        return n;
+}
+
+static int db_locked_file_destr(struct db_locked_file *data)
+{
+       if (data->parent != NULL) {
+               data->parent->locked_record = NULL;
+       }
+
+       if (close(data->fd) != 0) {
+               DEBUG(3, ("close failed: %s\n", strerror(errno)));
+               return -1;
+       }
+
+       return 0;
+}
+
+static NTSTATUS db_file_store(struct db_record *rec, TDB_DATA data, int flag);
+static NTSTATUS db_file_delete(struct db_record *rec);
+
+static struct db_record *db_file_fetch_locked(struct db_context *db,
+                                             TALLOC_CTX *mem_ctx,
+                                             TDB_DATA key)
+{
+       struct db_file_ctx *ctx = talloc_get_type_abort(db->private_data,
+                                                       struct db_file_ctx);
+       struct db_record *result;
+       struct db_locked_file *file;
+       struct flock fl;
+       SMB_STRUCT_STAT statbuf;
+       ssize_t nread;
+       int ret;
+
+       SMB_ASSERT(ctx->locked_record == NULL);
+
+ again:
+       if (!(result = TALLOC_P(mem_ctx, struct db_record))) {
+               DEBUG(0, ("talloc failed\n"));
+               return NULL;
+       }
+
+       if (!(file = TALLOC_P(result, struct db_locked_file))) {
+               DEBUG(0, ("talloc failed\n"));
+               TALLOC_FREE(result);
+               return NULL;
+       }
+
+       result->private_data = file;
+       result->store = db_file_store;
+       result->delete_rec = db_file_delete;
+
+       result->key.dsize = key.dsize;
+       result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
+       if (result->key.dptr == NULL) {
+               DEBUG(0, ("talloc failed\n"));
+               TALLOC_FREE(result);
+               return NULL;
+       }
+
+       /* Cut to 8 bits */
+       file->hash = fsh(key.dptr, key.dsize);
+       file->name = hex_encode(file, (unsigned char *)key.dptr, key.dsize);
+       if (file->name == NULL) {
+               DEBUG(0, ("hex_encode failed\n"));
+               TALLOC_FREE(result);
+               return NULL;
+       }
+
+       file->path = talloc_asprintf(file, "%s/%2.2X/%s", ctx->dirname,
+                                    file->hash, file->name);
+       if (file->path == NULL) {
+               DEBUG(0, ("talloc_asprintf failed\n"));
+               TALLOC_FREE(result);
+               return NULL;
+       }
+
+       become_root();
+       file->fd = open(file->path, O_RDWR|O_CREAT, 0644);
+       unbecome_root();
+
+       if (file->fd < 0) {
+               DEBUG(3, ("Could not open/create %s: %s\n",
+                         file->path, strerror(errno)));
+               TALLOC_FREE(result);
+               return NULL;
+       }
+
+       talloc_set_destructor(file, db_locked_file_destr);
+
+       fl.l_type = F_WRLCK;
+       fl.l_whence = SEEK_SET;
+       fl.l_start = 0;
+       fl.l_len = 1;
+       fl.l_pid = 0;
+
+       do {
+               ret = fcntl(file->fd, F_SETLKW, &fl);
+       } while ((ret == -1) && (errno == EINTR));
+
+       if (ret == -1) {
+               DEBUG(3, ("Could not get lock on %s: %s\n",
+                         file->path, strerror(errno)));
+               TALLOC_FREE(result);
+               return NULL;
+       }
+
+       if (sys_fstat(file->fd, &statbuf) != 0) {
+               DEBUG(3, ("Could not fstat %s: %s\n",
+                         file->path, strerror(errno)));
+               TALLOC_FREE(result);
+               return NULL;
+       }
+
+       if (statbuf.st_nlink == 0) {
+               /* Someone has deleted it under the lock, retry */
+               TALLOC_FREE(result);
+               goto again;
+       }
+
+       result->value.dsize = 0;
+       result->value.dptr = NULL;
+
+       if (statbuf.st_size != 0) {
+               result->value.dsize = statbuf.st_size;
+               result->value.dptr = TALLOC_ARRAY(result, uint8,
+                                                 statbuf.st_size);
+               if (result->value.dptr == NULL) {
+                       DEBUG(1, ("talloc failed\n"));
+                       TALLOC_FREE(result);
+                       return NULL;
+               }
+
+               nread = read_data(file->fd, (char *)result->value.dptr,
+                                 result->value.dsize);
+               if (nread != result->value.dsize) {
+                       DEBUG(3, ("read_data failed: %s\n", strerror(errno)));
+                       TALLOC_FREE(result);
+                       return NULL;
+               }
+       }
+
+       ctx->locked_record = result;
+       file->parent = (struct db_file_ctx *)talloc_reference(file, ctx);
+
+       return result;
+}
+
+static NTSTATUS db_file_store_root(int fd, TDB_DATA data)
+{
+       if (sys_lseek(fd, 0, SEEK_SET) != 0) {
+               DEBUG(0, ("sys_lseek failed: %s\n", strerror(errno)));
+               return map_nt_error_from_unix(errno);
+       }
+
+       if (write_data(fd, (char *)data.dptr, data.dsize) != data.dsize) {
+               DEBUG(3, ("write_data failed: %s\n", strerror(errno)));
+               return map_nt_error_from_unix(errno);
+       }
+
+       if (sys_ftruncate(fd, data.dsize) != 0) {
+               DEBUG(3, ("sys_ftruncate failed: %s\n", strerror(errno)));
+               return map_nt_error_from_unix(errno);
+       }
+
+       return NT_STATUS_OK;
+}
+
+static NTSTATUS db_file_store(struct db_record *rec, TDB_DATA data, int flag)
+{
+       struct db_locked_file *file =
+               talloc_get_type_abort(rec->private_data,
+                                     struct db_locked_file);
+       NTSTATUS status;
+
+       become_root();
+       status = db_file_store_root(file->fd, data);
+       unbecome_root();
+
+       return status;
+}
+
+static NTSTATUS db_file_delete(struct db_record *rec)
+{
+       struct db_locked_file *file =
+               talloc_get_type_abort(rec->private_data,
+                                     struct db_locked_file);
+       int res;
+
+       become_root();
+       res = unlink(file->path);
+       unbecome_root();
+
+       if (res == -1) {
+               DEBUG(3, ("unlink(%s) failed: %s\n", file->path,
+                         strerror(errno)));
+               return map_nt_error_from_unix(errno);
+       }
+
+       return NT_STATUS_OK;
+}
+
+static int db_file_traverse(struct db_context *db,
+                           int (*fn)(struct db_record *rec,
+                                     void *private_data),
+                           void *private_data)
+{
+       struct db_file_ctx *ctx = talloc_get_type_abort(db->private_data,
+                                                       struct db_file_ctx);
+       TALLOC_CTX *mem_ctx = talloc_init("traversal %s\n", ctx->dirname);
+       
+       int i;
+       int count = 0;
+
+       for (i=0; i<256; i++) {
+               const char *dirname = talloc_asprintf(mem_ctx, "%s/%2.2X",
+                                                     ctx->dirname, i);
+               DIR *dir;
+               struct dirent *dirent;
+
+               if (dirname == NULL) {
+                       DEBUG(0, ("talloc failed\n"));
+                       TALLOC_FREE(mem_ctx);
+                       return -1;
+               }
+
+               dir = opendir(dirname);
+               if (dir == NULL) {
+                       DEBUG(3, ("Could not open dir %s: %s\n", dirname,
+                                 strerror(errno)));
+                       TALLOC_FREE(mem_ctx);
+                       return -1;
+               }
+
+               while ((dirent = readdir(dir)) != NULL) {
+                       DATA_BLOB keyblob;
+                       TDB_DATA key;
+                       struct db_record *rec;
+
+                       if ((dirent->d_name[0] == '.') &&
+                           ((dirent->d_name[1] == '\0') ||
+                            ((dirent->d_name[1] == '.') &&
+                             (dirent->d_name[2] == '\0')))) {
+                               continue;
+                       }
+
+                       keyblob = strhex_to_data_blob(mem_ctx, dirent->d_name);
+                       if (keyblob.data == NULL) {
+                               DEBUG(5, ("strhex_to_data_blob failed\n"));
+                               continue;
+                       }
+
+                       key.dptr = keyblob.data;
+                       key.dsize = keyblob.length;
+
+                       if ((ctx->locked_record != NULL) &&
+                           (key.dsize == ctx->locked_record->key.dsize) &&
+                           (memcmp(key.dptr, ctx->locked_record->key.dptr,
+                                   key.dsize) == 0)) {
+                               count += 1;
+                               if (fn(ctx->locked_record,
+                                      private_data) != 0) {
+                                       TALLOC_FREE(mem_ctx);
+                                       closedir(dir);
+                                       return count;
+                               }
+                       }
+
+                       rec = db_file_fetch_locked(db, mem_ctx, key);
+                       if (rec == NULL) {
+                               /* Someone might have deleted it */
+                               continue;
+                       }
+
+                       if (rec->value.dptr == NULL) {
+                               TALLOC_FREE(rec);
+                               continue;
+                       }
+
+                       count += 1;
+
+                       if (fn(rec, private_data) != 0) {
+                               TALLOC_FREE(mem_ctx);
+                               closedir(dir);
+                               return count;
+                       }
+                       TALLOC_FREE(rec);
+               }
+
+               closedir(dir);
+       }
+
+       TALLOC_FREE(mem_ctx);
+       return count;
+}
+
+struct db_context *db_open_file(TALLOC_CTX *mem_ctx,
+                               struct messaging_context *msg_ctx,
+                               const char *name,
+                               int hash_size, int tdb_flags,
+                               int open_flags, mode_t mode)
+{
+       struct db_context *result = NULL;
+       struct db_file_ctx *ctx;
+
+       if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
+               DEBUG(0, ("talloc failed\n"));
+               return NULL;
+       }
+
+       if (!(ctx = TALLOC_P(result, struct db_file_ctx))) {
+               DEBUG(0, ("talloc failed\n"));
+               TALLOC_FREE(result);
+               return NULL;
+       }
+
+       result->private_data = ctx;
+       result->fetch_locked = db_file_fetch_locked;
+       result->traverse = db_file_traverse;
+
+       ctx->locked_record = NULL;
+       if (!(ctx->dirname = talloc_strdup(ctx, name))) {
+               DEBUG(0, ("talloc failed\n"));
+               TALLOC_FREE(result);
+               return NULL;
+       }
+
+       if (open_flags & O_CREAT) {
+               int ret, i;
+
+               mode |= (mode & S_IRUSR) ? S_IXUSR : 0;
+               mode |= (mode & S_IRGRP) ? S_IXGRP : 0;
+               mode |= (mode & S_IROTH) ? S_IXOTH : 0;
+
+               ret = mkdir(name, mode);
+               if ((ret != 0) && (errno != EEXIST)) {
+                       DEBUG(5, ("mkdir(%s,%o) failed: %s\n", name, mode,
+                                 strerror(errno)));
+                       TALLOC_FREE(result);
+                       return NULL;
+               }
+
+               for (i=0; i<256; i++) {
+                       char *path;
+                       path = talloc_asprintf(result, "%s/%2.2X", name, i);
+                       if (path == NULL) {
+                               DEBUG(0, ("asprintf failed\n"));
+                               TALLOC_FREE(result);
+                               return NULL;
+                       }
+                       ret = mkdir(path, mode);
+                       if ((ret != 0) && (errno != EEXIST)) {
+                               DEBUG(5, ("mkdir(%s,%o) failed: %s\n", path,
+                                         mode, strerror(errno)));
+                               TALLOC_FREE(result);
+                               return NULL;
+                       }
+                       TALLOC_FREE(path);
+               }
+       }
+
+       return result;
+}
diff --git a/source3/lib/dbwrap_tdb.c b/source3/lib/dbwrap_tdb.c
new file mode 100644 (file)
index 0000000..238ba51
--- /dev/null
@@ -0,0 +1,227 @@
+/* 
+   Unix SMB/CIFS implementation.
+   Database interface wrapper around tdb
+   Copyright (C) Volker Lendecke 2005-2007
+   
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 2 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+*/
+
+#include "includes.h"
+
+struct db_tdb_ctx {
+       TDB_CONTEXT *tdb;
+};
+
+static NTSTATUS db_tdb_store(struct db_record *rec, TDB_DATA data, int flag);
+static NTSTATUS db_tdb_delete(struct db_record *rec);
+
+static int db_tdb_record_destr(struct db_record* data)
+{
+       struct db_tdb_ctx *ctx =
+               talloc_get_type_abort(data->private_data, struct db_tdb_ctx);
+
+       DEBUG(10, ("Unlocking key %s\n",
+                  hex_encode(data, (unsigned char *)data->key.dptr,
+                             data->key.dsize)));
+
+       if (tdb_chainunlock(ctx->tdb, data->key) != 0) {
+               DEBUG(0, ("tdb_chainunlock failed\n"));
+               return -1;
+       }
+       return 0;
+}
+
+static struct db_record *db_tdb_fetch_locked(struct db_context *db,
+                                    TALLOC_CTX *mem_ctx, TDB_DATA key)
+{
+       struct db_tdb_ctx *ctx = talloc_get_type_abort(db->private_data,
+                                                      struct db_tdb_ctx);
+       struct db_record *result;
+       TDB_DATA value;
+
+       result = TALLOC_P(mem_ctx, struct db_record);
+       if (result == NULL) {
+               DEBUG(0, ("talloc failed\n"));
+               return NULL;
+       }
+
+       result->key.dsize = key.dsize;
+       result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
+       if (result->key.dptr == NULL) {
+               DEBUG(0, ("talloc failed\n"));
+               TALLOC_FREE(result);
+               return NULL;
+       }
+
+       result->value.dptr = NULL;
+       result->value.dsize = 0;
+       result->private_data = talloc_reference(result, ctx);
+       result->store = db_tdb_store;
+       result->delete_rec = db_tdb_delete;
+
+       {
+               char *keystr = hex_encode(NULL, (unsigned char *)key.dptr,
+                                         key.dsize);
+               DEBUG(10, ("Locking key %s\n", keystr));
+               TALLOC_FREE(keystr);
+       }
+
+       if (tdb_chainlock(ctx->tdb, key) != 0) {
+               DEBUG(3, ("tdb_chainlock failed\n"));
+               TALLOC_FREE(result);
+               return NULL;
+       }
+
+       talloc_set_destructor(result, db_tdb_record_destr);
+
+       value = tdb_fetch(ctx->tdb, key);
+
+       if (value.dptr == NULL) {
+               return result;
+       }
+
+       result->value.dsize = value.dsize;
+       result->value.dptr = (uint8 *)talloc_memdup(result, value.dptr,
+                                                   value.dsize);
+       if (result->value.dptr == NULL) {
+               DEBUG(3, ("talloc failed\n"));
+               TALLOC_FREE(result);
+               return NULL;
+       }
+
+       SAFE_FREE(value.dptr);
+
+       DEBUG(10, ("Allocated locked data 0x%p\n", result));
+
+       return result;
+}
+
+static NTSTATUS db_tdb_store(struct db_record *rec, TDB_DATA data, int flag)
+{
+       struct db_tdb_ctx *ctx = talloc_get_type_abort(rec->private_data,
+                                                      struct db_tdb_ctx);
+
+       /*
+        * This has a bug: We need to replace rec->value for correct
+        * operation, but right now brlock and locking don't use the value
+        * anymore after it was stored.
+        */
+
+       return (tdb_store(ctx->tdb, rec->key, data, flag) == 0) ?
+               NT_STATUS_OK : NT_STATUS_UNSUCCESSFUL;
+}
+
+static NTSTATUS db_tdb_delete(struct db_record *rec)
+{
+       struct db_tdb_ctx *ctx = talloc_get_type_abort(rec->private_data,
+                                                      struct db_tdb_ctx);
+
+       return (tdb_delete(ctx->tdb, rec->key) == 0) ?
+               NT_STATUS_OK : NT_STATUS_UNSUCCESSFUL;
+}
+
+struct db_tdb_traverse_ctx {
+       struct db_context *db;
+       int (*f)(struct db_record *rec, void *private_data);
+       void *private_data;
+};
+
+static int db_tdb_traverse_func(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
+                               void *private_data)
+{
+       struct db_tdb_traverse_ctx *ctx =
+               (struct db_tdb_traverse_ctx *)private_data;
+       struct db_record rec;
+
+       rec.key = kbuf;
+       rec.value = dbuf;
+       rec.store = db_tdb_store;
+       rec.delete_rec = db_tdb_delete;
+       rec.private_data = ctx->db->private_data;
+
+       return ctx->f(&rec, ctx->private_data);
+}
+
+static int db_tdb_traverse(struct db_context *db,
+                          int (*f)(struct db_record *rec, void *private_data),
+                          void *private_data)
+{
+       struct db_tdb_ctx *db_ctx =
+               talloc_get_type_abort(db->private_data, struct db_tdb_ctx);
+       struct db_tdb_traverse_ctx ctx;
+
+       ctx.db = db;
+       ctx.f = f;
+       ctx.private_data = private_data;
+       return tdb_traverse(db_ctx->tdb, db_tdb_traverse_func, &ctx);
+}
+
+static int db_tdb_get_seqnum(struct db_context *db)
+
+{
+       struct db_tdb_ctx *db_ctx =
+               talloc_get_type_abort(db->private_data, struct db_tdb_ctx);
+       return tdb_get_seqnum(db_ctx->tdb);
+}
+
+static int db_tdb_ctx_destr(struct db_tdb_ctx *ctx)
+{
+       if (tdb_close(ctx->tdb) != 0) {
+               DEBUG(0, ("Failed to close tdb: %s\n", strerror(errno)));
+               return -1;
+       }
+
+       return 0;
+}
+
+struct db_context *db_open_tdb(TALLOC_CTX *mem_ctx,
+                              const char *name,
+                              int hash_size, int tdb_flags,
+                              int open_flags, mode_t mode)
+{
+       struct db_context *result = NULL;
+       struct db_tdb_ctx *db_tdb;
+
+       result = TALLOC_ZERO_P(mem_ctx, struct db_context);
+       if (result == NULL) {
+               DEBUG(0, ("talloc failed\n"));
+               goto fail;
+       }
+
+       result->private_data = db_tdb = TALLOC_P(result, struct db_tdb_ctx);
+       if (db_tdb == NULL) {
+               DEBUG(0, ("talloc failed\n"));
+               goto fail;
+       }
+
+       db_tdb->tdb = tdb_open_log(name, hash_size, tdb_flags,
+                                  open_flags, mode);
+       if (db_tdb->tdb == NULL) {
+               DEBUG(3, ("Could not open tdb: %s\n", strerror(errno)));
+               goto fail;
+       }
+
+       talloc_set_destructor(db_tdb, db_tdb_ctx_destr);
+       result->fetch_locked = db_tdb_fetch_locked;
+       result->traverse = db_tdb_traverse;
+       result->get_seqnum = db_tdb_get_seqnum;
+       return result;
+
+ fail:
+       if (result != NULL) {
+               TALLOC_FREE(result);
+       }
+       return NULL;
+}