lib ldb key value: fix index buffering
authorGary Lockyer <gary@catalyst.net.nz>
Wed, 6 Mar 2019 02:28:45 +0000 (15:28 +1300)
committerAndrew Bartlett <abartlet@samba.org>
Fri, 21 Jun 2019 04:27:13 +0000 (04:27 +0000)
As a performance enhancement the key value layer maintains a cache of
the index records, which is written to disk as part of a prepare commit.
This patch adds an extra cache at the operation layer to ensure that the
cached indexes remain consistent in the event of an operation failing.

Add test to test for index corruption in a failed modify.

Signed-off-by: Gary Lockyer <gary@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Pair-Programmed-With: Andrew Bartlett <abartlet@samba.org>
Signed-off-by: Andrew Bartlett <abartlet@samba.org>
lib/ldb/ldb_key_value/ldb_kv.c
lib/ldb/ldb_key_value/ldb_kv.h
lib/ldb/ldb_key_value/ldb_kv_index.c
lib/ldb/tests/python/api.py

index 189da288f62660a1a4071ddaaa99cd0edd3a065c..56316eb89757e46321301185c6e01b0933d7d2c6 100644 (file)
@@ -469,14 +469,6 @@ static bool ldb_kv_single_valued(const struct ldb_schema_attribute *a,
        return false;
 }
 
-
-/*
- * Place holder actual implementation to be added in subsequent commits
- */
-int ldb_kv_index_sub_transaction_start(struct ldb_kv_private *ldb_kv)
-{
-       return LDB_SUCCESS;
-}
 /*
  * Starts a sub transaction if they are supported by the backend
  */
@@ -491,13 +483,6 @@ static int ldb_kv_sub_transaction_start(struct ldb_kv_private *ldb_kv)
        return ret;
 }
 
-/*
- * Place holder actual implementation to be added in subsequent commits
- */
-int ldb_kv_index_sub_transaction_commit(struct ldb_kv_private *ldb_kv)
-{
-       return LDB_SUCCESS;
-}
 /*
  * Commits a sub transaction if they are supported by the backend
  */
@@ -513,13 +498,6 @@ static int ldb_kv_sub_transaction_commit(struct ldb_kv_private *ldb_kv)
        return ret;
 }
 
-/*
- * Place holder actual implementation to be added in subsequent commits
- */
-int ldb_kv_index_sub_transaction_cancel(struct ldb_kv_private *ldb_kv)
-{
-       return LDB_SUCCESS;
-}
 /*
  * Cancels a sub transaction if they are supported by the backend
  */
index f344155eedd2a5eef7a17c6e3fbdfa8030e17ac8..7ebda13692e2eae0a235b292f98f6e843ab8d63f 100644 (file)
@@ -85,7 +85,19 @@ struct ldb_kv_private {
 
        bool check_base;
        bool disallow_dn_filter;
+       /*
+        * To improve the performance of batch operations we maintain a cache
+        * of index records, these entries get written to disk in the
+        * prepare_commit phase.
+        */
        struct ldb_kv_idxptr *idxptr;
+       /*
+        * To ensure that the indexes in idxptr are consistent we cache any
+        * index updates during an operation, i.e. ldb_kv_add, ldb_kv_delete ...
+        * Once the changes to the data record have been commited to disk
+        * the contents of this cache are copied to idxptr
+        */
+       struct ldb_kv_idxptr *nested_idx_ptr;
        bool prepared_commit;
        int read_lock_count;
 
index 5697c2f9c43dc0ac2e49c8b87a7f68a2357cba5b..2671d55a4d348c5aac989acf2ade60bda0152859 100644 (file)
@@ -163,6 +163,11 @@ struct dn_list {
 };
 
 struct ldb_kv_idxptr {
+       /*
+        * In memory tdb to cache the index updates performed during a
+        * transaction.  This improves the performance of operations like
+        * re-index and join
+        */
        struct tdb_context *itdb;
        int error;
 };
@@ -347,6 +352,11 @@ static struct dn_list *ldb_kv_index_idxptr(struct ldb_module *module,
        return list;
 }
 
+enum dn_list_will_be_read_only {
+       DN_LIST_MUTABLE = 0,
+       DN_LIST_WILL_BE_READ_ONLY = 1,
+};
+
 /*
   return the @IDX list in an index entry for a dn as a
   struct dn_list
@@ -354,27 +364,44 @@ static struct dn_list *ldb_kv_index_idxptr(struct ldb_module *module,
 static int ldb_kv_dn_list_load(struct ldb_module *module,
                               struct ldb_kv_private *ldb_kv,
                               struct ldb_dn *dn,
-                              struct dn_list *list)
+                              struct dn_list *list,
+                              enum dn_list_will_be_read_only read_only)
 {
        struct ldb_message *msg;
        int ret, version;
        struct ldb_message_element *el;
-       TDB_DATA rec;
+       TDB_DATA rec = {0};
        struct dn_list *list2;
-       TDB_DATA key;
+       bool from_primary_cache = false;
+       TDB_DATA key = {0};
 
        list->dn = NULL;
        list->count = 0;
+       list->strict = false;
 
-       /* see if we have any in-memory index entries */
-       if (ldb_kv->idxptr == NULL || ldb_kv->idxptr->itdb == NULL) {
+       /*
+        * See if we have an in memory index cache
+        */
+       if (ldb_kv->idxptr == NULL) {
                goto normal_index;
        }
 
        key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
        key.dsize = strlen((char *)key.dptr);
 
-       rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
+       /*
+        * Have we cached this index record?
+        * If we have a nested transaction cache try that first.
+        * then try the transaction cache.
+        * if the record is not cached it will need to be read from disk.
+        */
+       if (ldb_kv->nested_idx_ptr != NULL) {
+               rec = tdb_fetch(ldb_kv->nested_idx_ptr->itdb, key);
+       }
+       if (rec.dptr == NULL) {
+               from_primary_cache = true;
+               rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
+       }
        if (rec.dptr == NULL) {
                goto normal_index;
        }
@@ -387,9 +414,72 @@ static int ldb_kv_dn_list_load(struct ldb_module *module,
        }
        free(rec.dptr);
 
-       *list = *list2;
+       /*
+        * If this is a read only transaction the indexes will not be
+        * changed so we don't need a copy in the event of a rollback
+        *
+        * In this case make an early return
+        */
+       if (read_only == DN_LIST_WILL_BE_READ_ONLY) {
+               *list = *list2;
+               return LDB_SUCCESS;
+       }
+
+       /*
+        * record was read from the sub transaction cache, so we have
+        * already copied the primary cache record
+        */
+       if (!from_primary_cache) {
+               *list = *list2;
+               return LDB_SUCCESS;
+       }
+
+       /*
+        * No index sub transaction active, so no need to cache a copy
+        */
+       if (ldb_kv->nested_idx_ptr == NULL) {
+               *list = *list2;
+               return LDB_SUCCESS;
+       }
+
+       /*
+        * There is an active index sub transaction, and the record was
+        * found in the primary index transaction cache.  A copy of the
+        * record needs be taken to prevent the original entry being
+        * altered, until the index sub transaction is committed.
+        */
+
+       {
+               struct ldb_val *dns = NULL;
+               size_t x = 0;
+
+               dns = talloc_array(
+                       list,
+                       struct ldb_val,
+                       list2->count);
+               if (dns == NULL) {
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+               for (x = 0; x < list2->count; x++) {
+                       dns[x].length = list2->dn[x].length;
+                       dns[x].data = talloc_memdup(
+                               dns,
+                               list2->dn[x].data,
+                               list2->dn[x].length);
+                       if (dns[x].data == NULL) {
+                               TALLOC_FREE(dns);
+                               return LDB_ERR_OPERATIONS_ERROR;
+                       }
+               }
+               list->dn = dns;
+               list->count = list2->count;
+       }
        return LDB_SUCCESS;
 
+       /*
+        * Index record not found in the caches, read it from the
+        * database.
+        */
 normal_index:
        msg = ldb_msg_new(list);
        if (msg == NULL) {
@@ -718,13 +808,12 @@ static int ldb_kv_dn_list_store(struct ldb_module *module,
 {
        struct ldb_kv_private *ldb_kv = talloc_get_type(
            ldb_module_get_private(module), struct ldb_kv_private);
-       TDB_DATA rec, key;
-       int ret;
-       struct dn_list *list2;
+       TDB_DATA rec = {0};
+       TDB_DATA key = {0};
 
-       if (ldb_kv->idxptr == NULL) {
-               return ldb_kv_dn_list_store_full(module, ldb_kv, dn, list);
-       }
+       int ret = LDB_SUCCESS;
+       struct dn_list *list2 = NULL;
+       struct ldb_kv_idxptr *idxptr = NULL;
 
        key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
        if (key.dptr == NULL) {
@@ -732,7 +821,24 @@ static int ldb_kv_dn_list_store(struct ldb_module *module,
        }
        key.dsize = strlen((char *)key.dptr);
 
-       rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
+       /*
+        * If there is an index sub transaction active, update the
+        * sub transaction index cache.  Otherwise update the
+        * primary index cache
+        */
+       if (ldb_kv->nested_idx_ptr != NULL) {
+               idxptr = ldb_kv->nested_idx_ptr;
+       } else {
+               idxptr = ldb_kv->idxptr;
+       }
+       /*
+        * Get the cache entry for the index
+        *
+        * As the value in the cache is a pointer to a dn_list we update
+        * the dn_list directly.
+        *
+        */
+       rec = tdb_fetch(idxptr->itdb, key);
        if (rec.dptr != NULL) {
                list2 = ldb_kv_index_idxptr(module, rec);
                if (list2 == NULL) {
@@ -740,12 +846,18 @@ static int ldb_kv_dn_list_store(struct ldb_module *module,
                        return LDB_ERR_OPERATIONS_ERROR;
                }
                free(rec.dptr);
-               list2->dn = talloc_steal(list2, list->dn);
-               list2->count = list->count;
+               /* Now put the updated pointer back in the cache */
+               if (list->dn == NULL) {
+                       list2->dn = NULL;
+                       list2->count = 0;
+               } else {
+                       list2->dn = talloc_steal(list2, list->dn);
+                       list2->count = list->count;
+               }
                return LDB_SUCCESS;
        }
 
-       list2 = talloc(ldb_kv->idxptr, struct dn_list);
+       list2 = talloc(idxptr, struct dn_list);
        if (list2 == NULL) {
                return LDB_ERR_OPERATIONS_ERROR;
        }
@@ -759,10 +871,13 @@ static int ldb_kv_dn_list_store(struct ldb_module *module,
        /*
         * This is not a store into the main DB, but into an in-memory
         * TDB, so we don't need a guard on ltdb->read_only
+        *
+        * Also as we directly update the in memory dn_list for existing
+        * cache entries we must be adding a new entry to the cache.
         */
-       ret = tdb_store(ldb_kv->idxptr->itdb, key, rec, TDB_INSERT);
+       ret = tdb_store(idxptr->itdb, key, rec, TDB_INSERT);
        if (ret != 0) {
-               return ltdb_err_map(tdb_error(ldb_kv->idxptr->itdb));
+               return ltdb_err_map( tdb_error(idxptr->itdb));
        }
        return LDB_SUCCESS;
 }
@@ -846,8 +961,11 @@ int ldb_kv_index_transaction_cancel(struct ldb_module *module)
        if (ldb_kv->idxptr && ldb_kv->idxptr->itdb) {
                tdb_close(ldb_kv->idxptr->itdb);
        }
-       talloc_free(ldb_kv->idxptr);
-       ldb_kv->idxptr = NULL;
+       TALLOC_FREE(ldb_kv->idxptr);
+       if (ldb_kv->nested_idx_ptr && ldb_kv->nested_idx_ptr->itdb) {
+               tdb_close(ldb_kv->nested_idx_ptr->itdb);
+       }
+       TALLOC_FREE(ldb_kv->nested_idx_ptr);
        return LDB_SUCCESS;
 }
 
@@ -1166,7 +1284,8 @@ static int ldb_kv_index_dn_simple(struct ldb_module *module,
         */
        if (!dn) return LDB_ERR_OPERATIONS_ERROR;
 
-       ret = ldb_kv_dn_list_load(module, ldb_kv, dn, list);
+       ret = ldb_kv_dn_list_load(module, ldb_kv, dn, list,
+                                 DN_LIST_WILL_BE_READ_ONLY);
        talloc_free(dn);
        return ret;
 }
@@ -1971,7 +2090,8 @@ static int ldb_kv_index_dn_attr(struct ldb_module *module,
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       ret = ldb_kv_dn_list_load(module, ldb_kv, key, list);
+       ret = ldb_kv_dn_list_load(module, ldb_kv, key, list,
+                                 DN_LIST_WILL_BE_READ_ONLY);
        talloc_free(key);
        if (ret != LDB_SUCCESS) {
                return ret;
@@ -2560,7 +2680,8 @@ static int ldb_kv_index_add1(struct ldb_module *module,
        }
        talloc_steal(list, dn_key);
 
-       ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list);
+       ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list,
+                                 DN_LIST_MUTABLE);
        if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
                talloc_free(list);
                return ret;
@@ -3075,7 +3196,8 @@ int ldb_kv_index_del_value(struct ldb_module *module,
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list);
+       ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list,
+                                 DN_LIST_MUTABLE);
        if (ret == LDB_ERR_NO_SUCH_OBJECT) {
                /* it wasn't indexed. Did we have an earlier error? If we did then
                   its gone now */
@@ -3523,6 +3645,9 @@ int ldb_kv_reindex(struct ldb_module *module)
         * re-index
         */
        ldb_kv_index_transaction_cancel(module);
+       if (ldb_kv->nested_idx_ptr != NULL) {
+               ldb_kv_index_sub_transaction_cancel(ldb_kv);
+       }
 
        /*
         * Calculate the size of the index cache that we'll need for
@@ -3533,6 +3658,9 @@ int ldb_kv_reindex(struct ldb_module *module)
                index_cache_size = DEFAULT_INDEX_CACHE_SIZE;
        }
 
+       /*
+        * Note that we don't start an index sub transaction for re-indexing
+        */
        ret = ldb_kv_index_transaction_start(module, index_cache_size);
        if (ret != LDB_SUCCESS) {
                return ret;
@@ -3593,3 +3721,172 @@ int ldb_kv_reindex(struct ldb_module *module)
        }
        return LDB_SUCCESS;
 }
+
+/*
+ * Copy the contents of the nested transaction index cache record to the
+ * transaction index cache.
+ *
+ * During this 'commit' of the subtransaction to the main transaction
+ * (cache), care must be taken to free any existing index at the top
+ * level because otherwise we would leak memory.
+ */
+static int ldb_kv_sub_transaction_traverse(
+       struct tdb_context *tdb,
+       TDB_DATA key,
+       TDB_DATA data,
+       void *state)
+{
+       struct ldb_module *module = state;
+       struct ldb_kv_private *ldb_kv = talloc_get_type(
+           ldb_module_get_private(module), struct ldb_kv_private);
+       TDB_DATA rec = {0};
+       struct dn_list *index_in_subtransaction = NULL;
+       struct dn_list *index_in_top_level = NULL;
+       int ret = 0;
+
+       /*
+        * This unwraps the pointer in the DB into a pointer in
+        * memory, we are abusing TDB as a hash map, not a linearised
+        * database store
+        */
+       index_in_subtransaction = ldb_kv_index_idxptr(module, data);
+       if (index_in_subtransaction == NULL) {
+               ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
+               return -1;
+       }
+
+       /*
+        * Do we already have an entry in the primary transaction cache
+        * If so free it's dn_list and replace it with the dn_list from
+        * the secondary cache
+        *
+        * The TDB and so the fetched rec contains NO DATA, just a
+        * pointer to data held in memory.
+        */
+       rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
+       if (rec.dptr != NULL) {
+               index_in_top_level = ldb_kv_index_idxptr(module, rec);
+               free(rec.dptr);
+               if (index_in_top_level == NULL) {
+                       abort();
+               }
+               /*
+                * We had this key at the top level.  However we made a copy
+                * at the sub-transaction level so that we could possibly
+                * roll back.  We have to free the top level index memory
+                * otherwise we would leak
+                */
+               if (index_in_top_level->count > 0) {
+                       TALLOC_FREE(index_in_top_level->dn);
+               }
+               index_in_top_level->dn
+                       = talloc_steal(index_in_top_level,
+                                      index_in_subtransaction->dn);
+               index_in_top_level->count = index_in_subtransaction->count;
+               return 0;
+       }
+
+       index_in_top_level = talloc(ldb_kv->idxptr, struct dn_list);
+       if (index_in_top_level == NULL) {
+               ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
+               return -1;
+       }
+       index_in_top_level->dn
+               = talloc_steal(index_in_top_level,
+                              index_in_subtransaction->dn);
+       index_in_top_level->count = index_in_subtransaction->count;
+
+       rec.dptr = (uint8_t *)&index_in_top_level;
+       rec.dsize = sizeof(void *);
+
+
+       /*
+        * This is not a store into the main DB, but into an in-memory
+        * TDB, so we don't need a guard on ltdb->read_only
+        */
+       ret = tdb_store(ldb_kv->idxptr->itdb, key, rec, TDB_INSERT);
+       if (ret != 0) {
+               ldb_kv->idxptr->error = ltdb_err_map(
+                   tdb_error(ldb_kv->idxptr->itdb));
+               return -1;
+       }
+       return 0;
+}
+
+/*
+ * Initialise the index cache for a sub transaction.
+ */
+int ldb_kv_index_sub_transaction_start(struct ldb_kv_private *ldb_kv)
+{
+       ldb_kv->nested_idx_ptr = talloc_zero(ldb_kv, struct ldb_kv_idxptr);
+       if (ldb_kv->nested_idx_ptr == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       /*
+        * We use a tiny hash size for the sub-database (11).
+        *
+        * The sub-transaction is only for one record at a time, we
+        * would use a linked list but that would make the code even
+        * more complex when manipulating the index, as it would have
+        * to know if we were in a nested transaction (normal
+        * operations) or the top one (a reindex).
+        */
+       ldb_kv->nested_idx_ptr->itdb =
+               tdb_open(NULL, 11, TDB_INTERNAL, O_RDWR, 0);
+       if (ldb_kv->nested_idx_ptr->itdb == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       return LDB_SUCCESS;
+}
+
+/*
+ * Clear the contents of the nested transaction index cache when the nested
+ * transaction is cancelled.
+ */
+int ldb_kv_index_sub_transaction_cancel(struct ldb_kv_private *ldb_kv)
+{
+       if (ldb_kv->nested_idx_ptr != NULL) {
+               tdb_close(ldb_kv->nested_idx_ptr->itdb);
+               TALLOC_FREE(ldb_kv->nested_idx_ptr);
+       }
+       return LDB_SUCCESS;
+}
+
+/*
+ * Commit a nested transaction,
+ * Copy the contents of the nested transaction index cache to the
+ * transaction index cache.
+ */
+int ldb_kv_index_sub_transaction_commit(struct ldb_kv_private *ldb_kv)
+{
+       int ret = 0;
+
+       if (ldb_kv->nested_idx_ptr == NULL) {
+               return LDB_SUCCESS;
+       }
+       if (ldb_kv->nested_idx_ptr->itdb == NULL) {
+               return LDB_SUCCESS;
+       }
+       tdb_traverse(
+           ldb_kv->nested_idx_ptr->itdb,
+           ldb_kv_sub_transaction_traverse,
+           ldb_kv->module);
+       tdb_close(ldb_kv->nested_idx_ptr->itdb);
+       ldb_kv->nested_idx_ptr->itdb = NULL;
+
+       ret = ldb_kv->nested_idx_ptr->error;
+       if (ret != LDB_SUCCESS) {
+               struct ldb_context *ldb = ldb_module_get_ctx(ldb_kv->module);
+               if (!ldb_errstring(ldb)) {
+                       ldb_set_errstring(ldb, ldb_strerror(ret));
+               }
+               ldb_asprintf_errstring(
+                       ldb,
+                       __location__": Failed to update index records in "
+                       "sub transaction commit: %s",
+                       ldb_errstring(ldb));
+       }
+       TALLOC_FREE(ldb_kv->nested_idx_ptr);
+       return ret;
+}
index 462a35c4a30f93776e65e2a77214f8d1aa6537d9..f47cb28e3f01d3c3955bcfa311f82065fd245397 100755 (executable)
@@ -1790,7 +1790,7 @@ class IndexedAddModifyTests(AddModifyTests):
     def setUp(self):
         if not hasattr(self, 'index'):
             self.index = {"dn": "@INDEXLIST",
-                          "@IDXATTR": [b"x", b"y", b"ou", b"objectUUID"],
+                          "@IDXATTR": [b"x", b"y", b"ou", b"objectUUID", b"z"],
                           "@IDXONE": [b"1"]}
         super(IndexedAddModifyTests, self).setUp()
 
@@ -1862,6 +1862,16 @@ class IndexedAddModifyTests(AddModifyTests):
                     "x": "z", "y": "a",
                     "objectUUID": b"0123456789abcde2"})
 
+    def test_duplicate_index_values(self):
+        self.l.add({"dn": "OU=DIV1,DC=SAMBA,DC=ORG",
+                    "name": b"Admins",
+                    "z": "1",
+                    "objectUUID": b"0123456789abcdff"})
+        self.l.add({"dn": "OU=DIV2,DC=SAMBA,DC=ORG",
+                    "name": b"Admins",
+                    "z": "1",
+                    "objectUUID": b"0123456789abcdfd"})
+
 
 class GUIDIndexedAddModifyTests(IndexedAddModifyTests):
     """Test searches using the index, to ensure the index doesn't
@@ -2071,6 +2081,62 @@ class BadIndexTests(LdbBaseTest):
             # https://bugzilla.samba.org/show_bug.cgi?id=13361
             self.assertEqual(len(res), 4)
 
+    def test_modify_transaction(self):
+        self.ldb.add({"dn": "x=y,dc=samba,dc=org",
+                      "objectUUID": b"0123456789abcde1",
+                      "y": "2",
+                      "z": "2"})
+
+        res = self.ldb.search(expression="(y=2)",
+                              base="dc=samba,dc=org")
+        self.assertEqual(len(res), 1)
+
+        self.ldb.add({"dn": "@ATTRIBUTES",
+                      "y": "UNIQUE_INDEX"})
+
+        self.ldb.transaction_start()
+
+        m = ldb.Message()
+        m.dn = ldb.Dn(self.ldb, "x=y,dc=samba,dc=org")
+        m["0"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "y")
+        m["1"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "not-here")
+
+        try:
+            self.ldb.modify(m)
+            self.fail()
+
+        except ldb.LdbError as err:
+            enum = err.args[0]
+            self.assertEqual(enum, ldb.ERR_NO_SUCH_ATTRIBUTE)
+
+        try:
+            self.ldb.transaction_commit()
+            # We should fail here, but we want to be sure
+            # we fail below
+
+        except ldb.LdbError as err:
+            enum = err.args[0]
+            self.assertEqual(enum, ldb.ERR_OPERATIONS_ERROR)
+
+        # The index should still be pointing to x=y
+        res = self.ldb.search(expression="(y=2)",
+                              base="dc=samba,dc=org")
+        self.assertEqual(len(res), 1)
+
+        try:
+            self.ldb.add({"dn": "x=y2,dc=samba,dc=org",
+                        "objectUUID": b"0123456789abcde2",
+                        "y": "2"})
+            self.fail("Added unique attribute twice")
+        except ldb.LdbError as err:
+            enum = err.args[0]
+            self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)
+
+        res = self.ldb.search(expression="(y=2)",
+                              base="dc=samba,dc=org")
+        self.assertEqual(len(res), 1)
+        self.assertEqual(str(res[0].dn), "x=y,dc=samba,dc=org")
+
     def tearDown(self):
         super(BadIndexTests, self).tearDown()
 
@@ -2084,6 +2150,20 @@ class GUIDBadIndexTests(BadIndexTests):
         super(GUIDBadIndexTests, self).setUp()
 
 
+class GUIDBadIndexTestsLmdb(BadIndexTests):
+
+    def setUp(self):
+        if os.environ.get('HAVE_LMDB', '1') == '0':
+            self.skipTest("No lmdb backend")
+        self.prefix = MDB_PREFIX
+        self.index = MDB_INDEX_OBJ
+        self.IDXGUID = True
+        super(GUIDBadIndexTestsLmdb, self).setUp()
+
+    def tearDown(self):
+        super(GUIDBadIndexTestsLmdb, self).tearDown()
+
+
 class DnTests(TestCase):
 
     def setUp(self):