r17425: Add the multi-key wrapper. If it's necessary to add general blobs as keys,
authorVolker Lendecke <vlendec@samba.org>
Sat, 5 Aug 2006 17:49:35 +0000 (17:49 +0000)
committerGerald (Jerry) Carter <jerry@samba.org>
Wed, 10 Oct 2007 16:38:33 +0000 (11:38 -0500)
this can trivially be added later.

Volker
(This used to be commit 6915adb9780052952e4a1d9e1c3e6cac06f48463)

source3/Makefile.in
source3/lib/tdb_multikey.c [new file with mode: 0644]
source3/script/tests/test_smbtorture_s3.sh
source3/tdb/tdbutil.c
source3/tdb/tdbutil.h
source3/torture/torture.c

index 0ff5872063bb5f40829ac40d9e2b27b60c740b66..fad87632475b54728bea47dd575fd3212897d873 100644 (file)
@@ -225,7 +225,7 @@ LIB_OBJ = $(VERSION_OBJ) lib/charcnv.o lib/debug.o lib/fault.o \
          lib/md5.o lib/hmacmd5.o lib/arc4.o lib/iconv.o \
          nsswitch/wb_client.o $(WBCOMMON_OBJ) \
          lib/pam_errors.o intl/lang_tdb.o \
-         lib/adt_tree.o lib/gencache.o $(TDB_OBJ) \
+         lib/adt_tree.o lib/gencache.o $(TDB_OBJ) lib/tdb_multikey.o \
          lib/module.o lib/events.o lib/ldap_escape.o @CHARSET_STATIC@ \
          lib/secdesc.o lib/util_seaccess.o lib/secace.o lib/secacl.o @SOCKWRAP@ \
          libads/krb5_errs.o lib/system_smbd.o lib/audit.o
diff --git a/source3/lib/tdb_multikey.c b/source3/lib/tdb_multikey.c
new file mode 100644 (file)
index 0000000..77e63c5
--- /dev/null
@@ -0,0 +1,530 @@
+/* 
+ *  Unix SMB/CIFS implementation.
+ *  TDB multi-key wrapper
+ *  Copyright (C) Volker Lendecke 2006
+ *  
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; either version 2 of the License, or
+ *  (at your option) any later version.
+ *  
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *  
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#include "includes.h"
+
+static struct { enum TDB_ERROR t; NTSTATUS n; } tdb_to_ntstatus_map[] = {
+       { TDB_ERR_CORRUPT, NT_STATUS_INTERNAL_DB_CORRUPTION },
+       { TDB_ERR_IO, NT_STATUS_UNEXPECTED_IO_ERROR },
+       { TDB_ERR_LOCK, NT_STATUS_FILE_LOCK_CONFLICT },
+       { TDB_ERR_OOM, NT_STATUS_NO_MEMORY },
+       { TDB_ERR_EXISTS, NT_STATUS_OBJECTID_EXISTS },
+       { TDB_ERR_NOLOCK, NT_STATUS_NOT_LOCKED },
+       { TDB_ERR_LOCK_TIMEOUT, NT_STATUS_IO_TIMEOUT },
+       { TDB_ERR_NOEXIST, NT_STATUS_NOT_FOUND },
+       { TDB_ERR_EINVAL, NT_STATUS_INVALID_PARAMETER },
+       { TDB_ERR_RDONLY, NT_STATUS_ACCESS_DENIED },
+       { 0, NT_STATUS_OK },
+};     
+
+static NTSTATUS map_ntstatus_from_tdb(struct tdb_context *t)
+{
+       enum TDB_ERROR err = tdb_error(t);
+       int i = 0;
+
+       while (tdb_to_ntstatus_map[i].t != 0) {
+               if (tdb_to_ntstatus_map[i].t == err) {
+                       return tdb_to_ntstatus_map[i].n;
+               }
+               i += 1;
+       }
+
+       return NT_STATUS_INTERNAL_ERROR;
+}
+
+#define KEY_VERSION (1)
+#define PRIMARY_KEY_LENGTH (24)
+
+/*
+ * Check that the keying version is acceptable. Change operations are very
+ * expensive under transactions anyway, so we do this upon every change to
+ * avoid damage when someone changes the key format while we have the db open.
+ *
+ * To be called only within a transaction, we don't do locking here.
+ */
+
+static BOOL tdb_check_keyversion(struct tdb_context *tdb)
+{
+       const char *versionkey = "KEYVERSION";
+       TDB_DATA key, data;
+       NTSTATUS status;
+       unsigned long version;
+       char *endptr;
+
+       key.dptr = CONST_DISCARD(char *, versionkey);
+       key.dsize = strlen(versionkey)+1;
+
+       data = tdb_fetch(tdb, key);
+       if (data.dptr == NULL) {
+               char *vstr;
+               int res;
+
+               asprintf(&vstr, "%d", KEY_VERSION);
+               if (vstr == NULL) {
+                       DEBUG(0, ("asprintf failed\n"));
+                       return False;
+               }
+               data.dptr = vstr;
+               data.dsize = strlen(vstr)+1;
+
+               res = tdb_store(tdb, key, data, TDB_INSERT);
+               SAFE_FREE(vstr);
+
+               if (res < 0) {
+                       status = map_ntstatus_from_tdb(tdb);
+                       DEBUG(5, ("Could not store key: %s\n",
+                                 nt_errstr(status)));
+                       return False;
+               }
+
+               return True;
+       }
+
+       /*
+        * We have a key, check it
+        */
+
+       SMB_ASSERT(data.dsize > 0);
+       if (data.dptr[data.dsize-1] != '\0') {
+               DEBUG(1, ("Key field not NUL terminated\n"));
+               SAFE_FREE(data.dptr);
+               return False;
+       }
+
+       version = strtoul(data.dptr, &endptr, 10);
+       if (endptr != data.dptr+data.dsize-1) {
+               DEBUG(1, ("Invalid version string\n"));
+               SAFE_FREE(data.dptr);
+               return False;
+       }
+       SAFE_FREE(data.dptr);
+
+       if (version != KEY_VERSION) {
+               DEBUG(1, ("Wrong key version: %ld, expected %d\n",
+                         version, KEY_VERSION));
+               return False;
+       }
+
+       return True;
+}
+
+/*
+ * Find a record according to a key and value expected in that key. The
+ * primary_key is returned for later reference in tdb_idx_update or
+ * tdb_idx_delete.
+ */
+
+NTSTATUS tdb_find_keyed(TALLOC_CTX *ctx, struct tdb_context *tdb,
+                       int keynumber, const char *value,
+                       TDB_DATA *result, char **primary_key)
+{
+       TDB_DATA key, prim, data;
+       NTSTATUS status;
+
+       prim.dptr = data.dptr = NULL;
+
+       key.dptr = talloc_asprintf(ctx, "KEY/%d/%s", keynumber, value);
+       if (key.dptr == NULL) {
+               DEBUG(0, ("talloc_asprintf failed\n"));
+               status = NT_STATUS_NO_MEMORY;
+               goto fail;
+       }
+       key.dsize = strlen(key.dptr)+1;
+
+       prim = tdb_fetch(tdb, key);
+       if (prim.dptr == NULL) {
+               status = NT_STATUS_NOT_FOUND;
+               goto fail;
+       }
+
+       data = tdb_fetch(tdb, prim);
+       if (data.dptr == NULL) {
+               DEBUG(1, ("Did not find record %s for key %s\n",
+                         prim.dptr, key.dptr));
+               status = NT_STATUS_INTERNAL_DB_CORRUPTION;
+               goto fail;
+       }
+
+       if (primary_key != NULL) {
+               *primary_key = talloc_strndup(ctx, prim.dptr, prim.dsize);
+               if (*primary_key == NULL) {
+                       status = NT_STATUS_NO_MEMORY;
+                       goto fail;
+               }
+       }
+       
+       /*
+        * The following copy will be removed when tdb_fetch takes a
+        * TALLOC_CTX as parameter.
+        */
+
+       result->dptr = (char *)talloc_memdup(ctx, data.dptr, data.dsize);
+       if (result->dptr == NULL) {
+               status = NT_STATUS_NO_MEMORY;
+               goto fail;
+       }
+       result->dsize = data.dsize;
+
+       status = NT_STATUS_OK;
+
+ fail:
+       TALLOC_FREE(key.dptr);
+       SAFE_FREE(prim.dptr);
+       SAFE_FREE(data.dptr);
+       return status;
+}
+
+/*
+ * Store all the key entries for a data entry. Best called within a tdb
+ * transaction.
+ */
+
+static NTSTATUS set_keys(struct tdb_context *tdb,
+                        char **(*getkeys)(TALLOC_CTX *mem_ctx, TDB_DATA data,
+                                          void *private_data),
+                        TDB_DATA primary_key, TDB_DATA user_data,
+                        void *private_data)
+{
+       int i;
+       char **keys = getkeys(NULL, user_data, private_data);
+
+       if (keys == NULL) {
+               DEBUG(5, ("Could not get keys\n"));
+               return NT_STATUS_NO_MEMORY;
+       }
+
+       for (i=0; keys[i] != NULL; i++) {
+               NTSTATUS status;
+               TDB_DATA key;
+
+               key.dptr = talloc_asprintf(keys, "KEY/%d/%s", i, keys[i]);
+               if (key.dptr == NULL) {
+                       DEBUG(0, ("talloc_asprintf failed\n"));
+                       TALLOC_FREE(keys);
+                       return NT_STATUS_NO_MEMORY;
+               }
+               key.dsize = strlen(key.dptr)+1;
+
+               if (tdb_store(tdb, key, primary_key, TDB_INSERT) < 0) {
+                       status = map_ntstatus_from_tdb(tdb);
+                       DEBUG(5, ("Could not store key %d: %s\n", i,
+                                 nt_errstr(status)));
+                       TALLOC_FREE(keys);
+                       return status;
+               }
+       }
+
+       TALLOC_FREE(keys);
+       return NT_STATUS_OK;
+}
+
+/*
+ * Delete all the key entries for a data entry. Best called within a tdb
+ * transaction.
+ */
+
+static NTSTATUS del_keys(struct tdb_context *tdb,
+                        char **(*getkeys)(TALLOC_CTX *mem_ctx, TDB_DATA data,
+                                          void *private_data),
+                        TDB_DATA primary_key, void *private_data)
+{
+       TDB_DATA data;
+       int i;
+       char **keys;
+
+       /*
+        * We need the data record to be able to fetch all the keys, so pull
+        * the user data
+        */
+
+       data = tdb_fetch(tdb, primary_key);
+       if (data.dptr == NULL) {
+               DEBUG(5, ("Could not find record for key %s\n",
+                         primary_key.dptr));
+               return NT_STATUS_NOT_FOUND;
+       }
+
+       keys = getkeys(NULL, data, private_data);
+       if (keys == NULL) {
+               DEBUG(5, ("Could not get keys\n"));
+               return NT_STATUS_NO_MEMORY;
+       }
+
+       SAFE_FREE(data.dptr);
+
+       for (i=0; keys[i] != NULL; i++) {
+               NTSTATUS status;
+               TDB_DATA key;
+
+               key.dptr = talloc_asprintf(keys, "KEY/%d/%s", i, keys[i]);
+               if (key.dptr == NULL) {
+                       DEBUG(0, ("talloc_asprintf failed\n"));
+                       TALLOC_FREE(keys);
+                       return NT_STATUS_NO_MEMORY;
+               }
+               key.dsize = strlen(key.dptr)+1;
+
+               if (tdb_delete(tdb, key) < 0) {
+                       status = map_ntstatus_from_tdb(tdb);
+                       DEBUG(5, ("Could not delete key %d: %s\n", i,
+                                 nt_errstr(status)));
+                       TALLOC_FREE(keys);
+                       return status;
+               }
+       }
+
+       TALLOC_FREE(keys);
+       return NT_STATUS_OK;
+}
+
+/*
+ * Generate a unique primary key
+ */
+
+static TDB_DATA new_primary_key(struct tdb_context *tdb)
+{
+       TDB_DATA key;
+       int i;
+
+       /*
+        * Generate a new primary key, the for loop is for the very unlikely
+        * collisions.
+        */
+
+       for (i=0; i<20; i++) {
+               TDB_DATA data;
+               asprintf(&key.dptr, "KEYPRIM/%s", generate_random_str(16));
+               if (key.dptr == NULL) {
+                       DEBUG(0, ("talloc_asprintf failed\n"));
+                       return key;
+               }
+
+#ifdef DEVELOPER
+               SMB_ASSERT(strlen(key.dptr) == PRIMARY_KEY_LENGTH);
+#endif
+               key.dsize = PRIMARY_KEY_LENGTH+1;
+
+               data = tdb_fetch(tdb, key);
+               if (data.dptr == NULL) {
+                       return key;
+               }
+               SAFE_FREE(key.dptr);
+               SAFE_FREE(data.dptr);
+       }
+
+       DEBUG(0, ("Did not find a unique key string!\n"));
+       key.dptr = NULL;
+       key.dsize = 0;
+       return key;
+}
+
+/*
+ * Add a new record to the database
+ */
+
+NTSTATUS tdb_add_keyed(struct tdb_context *tdb,
+                      char **(*getkeys)(TALLOC_CTX *mem_ctx, TDB_DATA data,
+                                        void *private_data),
+                      TDB_DATA data, void *private_data)
+{
+       NTSTATUS status = NT_STATUS_OK;
+       TDB_DATA key;
+
+       key.dptr = NULL;
+
+       if (tdb_transaction_start(tdb) < 0) {
+               status = map_ntstatus_from_tdb(tdb);
+               DEBUG(5, ("Could not start transaction: %s\n",
+                         nt_errstr(status)));
+               return status;
+       }
+
+       if (!tdb_check_keyversion(tdb)) {
+               status = NT_STATUS_INTERNAL_DB_CORRUPTION;
+               goto fail;
+       }
+
+       key = new_primary_key(tdb);
+       if (key.dptr == NULL) {
+               status = NT_STATUS_NO_MEMORY;
+               goto fail;
+       }
+
+       if (tdb_store(tdb, key, data, TDB_INSERT) < 0) {
+               status = map_ntstatus_from_tdb(tdb);
+               DEBUG(5, ("Could not store record: %s\n", nt_errstr(status)));
+               goto fail;
+       }
+
+       status = set_keys(tdb, getkeys, key, data, private_data);
+       if (!NT_STATUS_IS_OK(status)) {
+               DEBUG(5, ("set_keys failed: %s\n", nt_errstr(status)));
+               goto fail;
+       }
+
+       if (tdb_transaction_commit(tdb) < 0) {
+               status = map_ntstatus_from_tdb(tdb);
+               DEBUG(5, ("tdb_transaction_commit failed: %s\n",
+                         nt_errstr(status)));
+               goto fail;
+       }
+
+       SAFE_FREE(key.dptr);
+       return NT_STATUS_OK;
+
+ fail:
+       if (tdb_transaction_cancel(tdb) < 0) {
+               smb_panic("tdb_cancel_transaction failed\n");
+       }
+
+       SAFE_FREE(key.dptr);
+       return status;
+}
+
+/*
+ * Delete a record from the database, given its primary key
+ */
+
+NTSTATUS tdb_del_keyed(struct tdb_context *tdb,
+                      char **(*getkeys)(TALLOC_CTX *mem_ctx, TDB_DATA data,
+                                        void *private_data),
+                      const char *primary_key, void *private_data)
+{
+       NTSTATUS status = NT_STATUS_OK;
+       TDB_DATA key;
+
+       if ((primary_key == NULL) ||
+           (strlen(primary_key) != PRIMARY_KEY_LENGTH) ||
+           (strncmp(primary_key, "KEYPRIM/", 7) != 0)) {
+               return NT_STATUS_INVALID_PARAMETER;
+       }
+
+       if (tdb_transaction_start(tdb) < 0) {
+               status = map_ntstatus_from_tdb(tdb);
+               DEBUG(5, ("Could not start transaction: %s\n",
+                         nt_errstr(status)));
+               return status;
+       }
+
+       if (!tdb_check_keyversion(tdb)) {
+               status = NT_STATUS_INTERNAL_DB_CORRUPTION;
+               goto fail;
+       }
+
+       key.dptr = CONST_DISCARD(char *, primary_key);
+       key.dsize = PRIMARY_KEY_LENGTH+1;
+
+       status = del_keys(tdb, getkeys, key, private_data);
+       if (!NT_STATUS_IS_OK(status)) {
+               DEBUG(0, ("del_keys failed: %s\n", nt_errstr(status)));
+               goto fail;
+       }
+
+       if (tdb_delete(tdb, key) < 0) {
+               DEBUG(5, ("Could not delete record %s\n", primary_key));
+               status = NT_STATUS_INTERNAL_DB_CORRUPTION;
+               goto fail;
+       }
+
+       if (tdb_transaction_commit(tdb) < 0) {
+               status = map_ntstatus_from_tdb(tdb);
+               DEBUG(5, ("tdb_transaction_commit failed: %s\n",
+                         nt_errstr(status)));
+               goto fail;
+       }
+
+       return NT_STATUS_OK;
+
+ fail:
+       if (tdb_transaction_cancel(tdb) < 0) {
+               smb_panic("tdb_cancel_transaction failed\n");
+       }
+
+       return status;
+}
+
+/*
+ * Update a record that has previously been fetched and then changed.
+ */
+
+NTSTATUS tdb_update_keyed(struct tdb_context *tdb, const char *primary_key,
+                         char **(*getkeys)(TALLOC_CTX *mem_ctx,
+                                           TDB_DATA data, void *private_data),
+                         TDB_DATA data, void *private_data)
+{
+       NTSTATUS status = NT_STATUS_OK;
+       TDB_DATA key;
+
+       if ((primary_key == NULL) ||
+           (strlen(primary_key) != PRIMARY_KEY_LENGTH) ||
+           (strncmp(primary_key, "KEYPRIM/", 7) != 0)) {
+               return NT_STATUS_INVALID_PARAMETER;
+       }
+
+       if (tdb_transaction_start(tdb) < 0) {
+               status = map_ntstatus_from_tdb(tdb);
+               DEBUG(5, ("Could not start transaction: %s\n",
+                         nt_errstr(status)));
+               return status;
+       }
+
+       if (!tdb_check_keyversion(tdb)) {
+               status = NT_STATUS_INTERNAL_DB_CORRUPTION;
+               goto fail;
+       }
+
+       key.dptr = CONST_DISCARD(char *, primary_key);
+       key.dsize = PRIMARY_KEY_LENGTH+1;
+
+       status = del_keys(tdb, getkeys, key, private_data);
+       if (!NT_STATUS_IS_OK(status)) {
+               DEBUG(5, ("del_keys failed: %s\n", nt_errstr(status)));
+               goto fail;
+       }
+
+       if (tdb_store(tdb, key, data, TDB_REPLACE) < 0) {
+               status = map_ntstatus_from_tdb(tdb);
+               DEBUG(5, ("Could not store new record: %s\n",
+                         nt_errstr(status)));
+               goto fail;
+       }
+
+       status = set_keys(tdb, getkeys, key, data, private_data);
+       if (!NT_STATUS_IS_OK(status)) {
+               DEBUG(5, ("set_keys failed: %s\n", nt_errstr(status)));
+               goto fail;
+       }
+
+       if (tdb_transaction_commit(tdb) < 0) {
+               status = map_ntstatus_from_tdb(tdb);
+               DEBUG(5, ("tdb_transaction_commit failed: %s\n",
+                         nt_errstr(status)));
+               goto fail;
+       }
+
+       return NT_STATUS_OK;
+
+ fail:
+       if (tdb_transaction_cancel(tdb) < 0) {
+               smb_panic("tdb_cancel_transaction failed\n");
+       }
+
+       return status;
+}
index 842d914ecff2f7d476abbeae8814c1a92751d66a..3675f39257332e099d38806aec437a94438cb69a 100755 (executable)
@@ -25,6 +25,7 @@ tests="$tests OPLOCK1 OPLOCK2 OPLOCK3"
 tests="$tests DIR DIR1 TCON TCONDEV RW1 RW2 RW3"
 tests="$tests OPEN XCOPY RENAME DELETE PROPERTIES W2K"
 tests="$tests PIPE_NUMBER TCON2 IOCTL CHKPATH FDSESS LOCAL-SUBSTITUTE"
+tests="$tests LOCAL-MULTIKEY"
 
 skipped1="RANDOMIPC NEGNOWAIT NBENCH ERRMAPEXTRACT TRANS2SCAN NTTRANSSCAN"
 skipped2="DENY1 DENY2 OPENATTR CASETABLE EATEST"
index b946f856aad7ca6c439486cda147447b7a1d4d05..21d593a26a988e66acb515fac9d80528ac153d85 100644 (file)
@@ -505,9 +505,10 @@ size_t tdb_pack(char *buf, int bufsize, const char *fmt, ...)
        return result;
 }
 
-BOOL tdb_pack_append(TALLOC_CTX *mem_ctx, uint8 **buf, size_t *len,
+BOOL tdb_pack_append(void *_mem_ctx, char **buf, size_t *len,
                     const char *fmt, ...)
 {
+       TALLOC_CTX *mem_ctx = (void *)_mem_ctx;
        va_list ap;
        size_t len1, len2;
 
@@ -515,13 +516,8 @@ BOOL tdb_pack_append(TALLOC_CTX *mem_ctx, uint8 **buf, size_t *len,
        len1 = tdb_pack_va(NULL, 0, fmt, ap);
        va_end(ap);
 
-       if (mem_ctx != NULL) {
-               *buf = TALLOC_REALLOC_ARRAY(mem_ctx, *buf, uint8,
-                                           (*len) + len1);
-       } else {
-               *buf = SMB_REALLOC_ARRAY(*buf, uint8, (*len) + len1);
-       }
-
+       *buf = TALLOC_REALLOC_ARRAY(mem_ctx, *buf, char,
+                                   (*len) + len1);
        if (*buf == NULL) {
                return False;
        }
index 44351619f451da25c159a697e197d4bce80eb19b..60107adacbb125a915235a887a683a6ca0bc8bca 100644 (file)
@@ -54,6 +54,8 @@ struct tdb_context *tdb_open_log(const char *name, int hash_size,
                                 int tdb_flags, int open_flags, mode_t mode);
 int tdb_unpack(char *buf, int bufsize, const char *fmt, ...);
 size_t tdb_pack(char *buf, int bufsize, const char *fmt, ...);
+BOOL tdb_pack_append(void *_mem_ctx, char **buf, size_t *len,
+                    const char *fmt, ...);
 TDB_DATA make_tdb_data(const char *dptr, size_t dsize);
 TDB_DATA string_tdb_data(const char *string);
 int tdb_trans_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf,
index 5876707d62ff0fa184be72cf201dcb169ec8b5a5..009eb84787c0ef2a25d721343448fcf184c7c674 100644 (file)
@@ -4797,6 +4797,169 @@ static BOOL run_local_substitute(int dummy)
        return (diff == 0);
 }
 
+static char **key_fn(TALLOC_CTX *mem_ctx, TDB_DATA data,
+                    void *private_data)
+{
+       fstring key, value;
+       char **result;
+
+       result = TALLOC_ARRAY(mem_ctx, char *, 3);
+       if (result == NULL) {
+               return NULL;
+       }
+
+       if (tdb_unpack(data.dptr, data.dsize, "ff", key, value) < 0) {
+               d_fprintf(stderr, "tdb_unpack failed\n");
+               TALLOC_FREE(result);
+               return NULL;
+       }
+       result[0] = talloc_strdup(result, key);
+       result[1] = talloc_strdup(result, value);
+       result[2] = NULL;
+
+       if ((result[0] == NULL) || (result[1] == NULL)) {
+               d_fprintf(stderr, "talloc_strdup failed\n");
+               TALLOC_FREE(result);
+               return NULL;
+       }
+
+       return result;
+}
+
+static NTSTATUS multikey_add(struct tdb_context *tdb, const char *key,
+                            const char *value)
+{
+       NTSTATUS status;
+       TDB_DATA data;
+
+       data.dptr = NULL;
+       data.dsize = 0;
+
+       if (!tdb_pack_append(NULL, &data.dptr, &data.dsize,
+                            "ff", key, value)) {
+               return NT_STATUS_NO_MEMORY;
+       }
+
+       status = tdb_add_keyed(tdb, key_fn, data, NULL);
+       TALLOC_FREE(data.dptr);
+       return status;
+}
+
+#define CHECK_STATUS(_status, _expected) do { \
+       if (!NT_STATUS_EQUAL(_status, _expected)) { \
+               printf("(%d) Incorrect status %s - should be %s\n", \
+                      __LINE__, nt_errstr(status), nt_errstr(_expected)); \
+               ret = False; \
+               goto fail; \
+       }} while (0)
+
+static BOOL run_local_multikey(int dummy)
+{
+       TALLOC_CTX *mem_ctx;
+       char *prim;
+       const char *tdbname = "multi_key_test.tdb";
+       struct tdb_context *tdb = NULL;
+       NTSTATUS status;
+       BOOL ret = False;
+       TDB_DATA data;
+       int i;
+       fstring key,value;
+
+       unlink(tdbname);
+
+       mem_ctx = talloc_init("run_local_multikey");
+       if (mem_ctx == NULL) {
+               d_fprintf(stderr, "talloc_init failed\n");
+               return False;
+       }
+
+       tdb = tdb_open(tdbname, 0, 0, O_CREAT|O_RDWR, 0644);
+       if (tdb == NULL) {
+               d_fprintf(stderr, "tdb_open failed: %s\n", strerror(errno));
+               goto fail;
+       }
+
+       for (i=0; i<200; i++) {
+               fstr_sprintf(key, "KEY%d", i);
+               fstr_sprintf(value, "VAL%d", i);
+
+               status = multikey_add(tdb, key, value);
+               if (!NT_STATUS_IS_OK(status)) {
+                       d_fprintf(stderr, "tdb_add_keyed failed: %s\n",
+                                 nt_errstr(status));
+                       goto fail;
+               }
+       }
+
+       status = multikey_add(tdb, "KEY35", "FOOO");
+       CHECK_STATUS(status, NT_STATUS_OBJECTID_EXISTS);
+       status = multikey_add(tdb, "KEY42", "VAL45");
+       CHECK_STATUS(status, NT_STATUS_OBJECTID_EXISTS);
+       status = multikey_add(tdb, "FOO", "VAL45");
+       CHECK_STATUS(status, NT_STATUS_OBJECTID_EXISTS);
+
+       for (i=0; i<200; i++) {
+               fstr_sprintf(key, "KEY%d", i);
+               fstr_sprintf(value, "VAL%d", i);
+
+               status = tdb_find_keyed(mem_ctx, tdb, 0, key, &data, &prim);
+               CHECK_STATUS(status, NT_STATUS_OK);
+               status = tdb_find_keyed(mem_ctx, tdb, 1, value, &data, &prim);
+               CHECK_STATUS(status, NT_STATUS_OK);
+               status = tdb_find_keyed(mem_ctx, tdb, 1, key, &data, &prim);
+               CHECK_STATUS(status, NT_STATUS_NOT_FOUND);
+               status = tdb_find_keyed(mem_ctx, tdb, 0, value, &data, &prim);
+               CHECK_STATUS(status, NT_STATUS_NOT_FOUND);
+       }
+
+       status = tdb_find_keyed(mem_ctx, tdb, 0, "FOO", &data, &prim);
+       CHECK_STATUS(status, NT_STATUS_NOT_FOUND);
+       status = tdb_find_keyed(mem_ctx, tdb, 1, "BAR", &data, &prim);
+       CHECK_STATUS(status, NT_STATUS_NOT_FOUND);
+
+       status = tdb_find_keyed(mem_ctx, tdb, 0, "KEY0", &data, &prim);
+       CHECK_STATUS(status, NT_STATUS_OK);
+
+       ZERO_STRUCT(data);
+       if (tdb_pack_append(mem_ctx, &data.dptr, &data.dsize, "ff",
+                           "NEWKEY", "NEWVAL") < 0) {
+               d_printf("tdb_pack_alloc failed\n");
+               goto fail;
+       }
+
+       status = tdb_update_keyed(tdb, prim, key_fn, data, NULL);
+       CHECK_STATUS(status, NT_STATUS_OK);
+
+       status = tdb_find_keyed(mem_ctx, tdb, 0, "KEY0", &data, &prim);
+       CHECK_STATUS(status, NT_STATUS_NOT_FOUND);
+       status = tdb_find_keyed(mem_ctx, tdb, 1, "VAL0", &data, &prim);
+       CHECK_STATUS(status, NT_STATUS_NOT_FOUND);
+       status = tdb_find_keyed(mem_ctx, tdb, 0, "NEWKEY", &data, &prim);
+       CHECK_STATUS(status, NT_STATUS_OK);
+       status = tdb_find_keyed(mem_ctx, tdb, 1, "NEWVAL", &data, &prim);
+       CHECK_STATUS(status, NT_STATUS_OK);
+
+       status = tdb_del_keyed(tdb, key_fn, prim, NULL);
+       CHECK_STATUS(status, NT_STATUS_OK);
+
+       for (i=1; i<200; i++) {
+               fstr_sprintf(key, "KEY%d", i);
+               status = tdb_find_keyed(mem_ctx, tdb, 0, key, &data, &prim);
+               CHECK_STATUS(status, NT_STATUS_OK);
+               status = tdb_del_keyed(tdb, key_fn, prim, NULL);
+               CHECK_STATUS(status, NT_STATUS_OK);
+       }
+
+       ret = True;
+ fail:
+       if (tdb != NULL) {
+               tdb_close(tdb);
+       }
+       unlink(tdbname);
+       TALLOC_FREE(mem_ctx);
+       return ret;
+}
+
 static double create_procs(BOOL (*fn)(int), BOOL *result)
 {
        int i, status;
@@ -4948,6 +5111,7 @@ static struct {
        {"FDSESS", run_fdsesstest, 0},
        { "EATEST", run_eatest, 0},
        { "LOCAL-SUBSTITUTE", run_local_substitute, 0},
+       { "LOCAL-MULTIKEY", run_local_multikey, 0},
        {NULL, NULL, 0}};