ldb: Intersect the index from SCOPE_ONELEVEL with the index for the search expression
authorAndrew Bartlett <abartlet@samba.org>
Mon, 18 Dec 2017 03:22:01 +0000 (16:22 +1300)
committerAndrew Bartlett <abartlet@samba.org>
Wed, 20 Dec 2017 03:22:09 +0000 (04:22 +0100)
This helps ensure we do not have to scan all objects at this level
which could be very many (one per DNS zone entry).

However, due to the O(n*m) behaviour in list_intersect() for older
databases, we only do this in the GUID index mode, leaving the behaviour
unchanged for existing callers that do not specify the GUID index mode.

NOTE WELL: the behaviour of disallowDNFilter is enforced
in the index code, so this fixes SCOPE_ONELEVEL to also
honour disallowDNFilter, hence the additional tests.

The change to select the SUBTREE index in the absense of
the ONELEVEL index enforces this.

BUG: https://bugzilla.samba.org/show_bug.cgi?id=13191

Signed-off-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Garming Sam <garming@catalyst.net.nz>
lib/ldb/ldb_tdb/ldb_index.c
lib/ldb/tests/python/api.py

index 0afeae57a607280491817446c00f0966c247512f..f2fce42eac723beddb7fe82cfec26e519c8b87b6 100644 (file)
@@ -151,6 +151,13 @@ ldb_schema_set_override_GUID_index() must be called.
 struct dn_list {
        unsigned int count;
        struct ldb_val *dn;
+       /*
+        * Do not optimise the intersection of this list,
+        * we must never return an entry not in this
+        * list.  This allows the index for
+        * SCOPE_ONELEVEL to be trusted.
+        */
+       bool strict;
 };
 
 struct ltdb_idxptr {
@@ -1029,10 +1036,10 @@ static bool list_intersect(struct ldb_context *ldb,
           what really matches, as all results are filtered by the
           full expression at the end - this shortcut avoids a lot of
           work in some cases */
-       if (list->count < 2 && list2->count > 10) {
+       if (list->count < 2 && list2->count > 10 && list2->strict == false) {
                return true;
        }
-       if (list2->count < 2 && list->count > 10) {
+       if (list2->count < 2 && list->count > 10 && list->strict == false) {
                list->count = list2->count;
                list->dn = list2->dn;
                /* note that list2 may not be the parent of list2->dn,
@@ -1073,6 +1080,7 @@ static bool list_intersect(struct ldb_context *ldb,
                }
        }
 
+       list->strict |= list2->strict;
        list->dn = talloc_steal(list, list3->dn);
        list->count = list3->count;
        talloc_free(list3);
@@ -1411,6 +1419,8 @@ static int ltdb_index_dn_one(struct ldb_module *module,
                             struct ldb_dn *parent_dn,
                             struct dn_list *list)
 {
+       /* Ensure we do not shortcut on intersection for this list */
+       list->strict = true;
        return ltdb_index_dn_attr(module, ltdb,
                                  LTDB_IDXONE, parent_dn, list);
 }
@@ -1630,9 +1640,11 @@ static void ltdb_dn_list_sort(struct ltdb_private *ltdb,
 */
 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
 {
+       struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
        struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
        struct dn_list *dn_list;
        int ret;
+       enum ldb_scope index_scope;
 
        /* see if indexing is enabled */
        if (!ltdb->cache->attribute_indexes &&
@@ -1647,7 +1659,19 @@ int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
                return ldb_module_oom(ac->module);
        }
 
-       switch (ac->scope) {
+       /*
+        * For the purposes of selecting the switch arm below, if we
+        * don't have a one-level index then treat it like a subtree
+        * search
+        */
+       if (ac->scope == LDB_SCOPE_ONELEVEL &&
+           !ltdb->cache->one_level_indexes) {
+               index_scope = LDB_SCOPE_SUBTREE;
+       } else {
+               index_scope = ac->scope;
+       }
+
+       switch (index_scope) {
        case LDB_SCOPE_BASE:
                /*
                 * If we ever start to also load the index values for
@@ -1663,10 +1687,6 @@ int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
                break;
 
        case LDB_SCOPE_ONELEVEL:
-               if (!ltdb->cache->one_level_indexes) {
-                       talloc_free(dn_list);
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
                /*
                 * If we ever start to also load the index values for
                 * the tree, we must ensure we strictly intersect with
@@ -1677,6 +1697,50 @@ int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
                        talloc_free(dn_list);
                        return ret;
                }
+
+               /*
+                * If we have too many matches, running the filter
+                * tree over the SCOPE_ONELEVEL can be quite expensive
+                * so we now check the filter tree index as well.
+                *
+                * We only do this in the GUID index mode, which is
+                * O(n*log(m)) otherwise the intersection below will
+                * be too costly at O(n*m).
+                *
+                * We don't set a heuristic for 'too many' but instead
+                * do it always and rely on the index lookup being
+                * fast enough in the small case.
+                */
+               if (ltdb->cache->GUID_index_attribute != NULL) {
+                       struct dn_list *idx_one_tree_list
+                               = talloc_zero(ac, struct dn_list);
+                       if (idx_one_tree_list == NULL) {
+                               return ldb_module_oom(ac->module);
+                       }
+
+                       if (!ltdb->cache->attribute_indexes) {
+                               talloc_free(idx_one_tree_list);
+                               talloc_free(dn_list);
+                               return LDB_ERR_OPERATIONS_ERROR;
+                       }
+                       /*
+                        * Here we load the index for the tree.
+                        */
+                       ret = ltdb_index_dn(ac->module, ltdb, ac->tree,
+                                           idx_one_tree_list);
+                       if (ret != LDB_SUCCESS) {
+                               talloc_free(idx_one_tree_list);
+                               talloc_free(dn_list);
+                               return ret;
+                       }
+
+                       if (!list_intersect(ldb, ltdb,
+                                           dn_list, idx_one_tree_list)) {
+                               talloc_free(idx_one_tree_list);
+                               talloc_free(dn_list);
+                               return LDB_ERR_OPERATIONS_ERROR;
+                       }
+               }
                break;
 
        case LDB_SCOPE_SUBTREE:
index 14bbdcb1718897925eaa3ba67bcbb14d2c1e9cac..409f446f1ea7f185b3ae0cac561e39221512e437 100755 (executable)
@@ -794,7 +794,16 @@ class SearchTests(TestCase):
 
     def test_check_base_error(self):
         """Testing a search"""
-        self.l.add({"dn": "@OPTIONS", "checkBaseOnSearch": b"TRUE"})
+        checkbaseonsearch = {"dn": "@OPTIONS",
+                             "checkBaseOnSearch": b"TRUE"}
+        try:
+            self.l.add(checkbaseonsearch)
+        except ldb.LdbError as err:
+            enum = err.args[0]
+            self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
+            m = ldb.Message.from_dict(self.l,
+                                      checkbaseonsearch)
+            self.l.modify(m)
 
         try:
             res11 = self.l.search(base="OU=OU11x,DC=SAMBA,DC=ORG",
@@ -973,26 +982,149 @@ class SearchTests(TestCase):
                               expression="(@IDXONE=DC=SAMBA,DC=ORG)")
         self.assertEqual(len(res11), 0)
 
+    def test_dn_filter_one(self):
+        """Testing that a dn= filter succeeds
+        (or fails with disallowDNFilter
+        set and IDXGUID or (IDX and not IDXONE) mode)
+        when the scope is SCOPE_ONELEVEL.
+
+        This should be made more consistent, but for now lock in
+        the behaviour
+
+        """
+
+        res11 = self.l.search(base="DC=SAMBA,DC=ORG",
+                              scope=ldb.SCOPE_ONELEVEL,
+                              expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
+        if hasattr(self, 'disallowDNFilter') and \
+           hasattr(self, 'IDX') and \
+           (hasattr(self, 'IDXGUID') or \
+            ((hasattr(self, 'IDXONE') == False and hasattr(self, 'IDX')))):
+            self.assertEqual(len(res11), 0)
+        else:
+            self.assertEqual(len(res11), 1)
+
+    def test_dn_filter_subtree(self):
+        """Testing that a dn= filter succeeds
+        (or fails with disallowDNFilter set)
+        when the scope is SCOPE_SUBTREE"""
+
+        res11 = self.l.search(base="DC=SAMBA,DC=ORG",
+                              scope=ldb.SCOPE_SUBTREE,
+                              expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
+        if hasattr(self, 'disallowDNFilter') \
+           and hasattr(self, 'IDX'):
+            self.assertEqual(len(res11), 0)
+        else:
+            self.assertEqual(len(res11), 1)
+
+    def test_dn_filter_base(self):
+        """Testing that (incorrectly) a dn= filter works
+        when the scope is SCOPE_BASE"""
+
+        res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
+                              scope=ldb.SCOPE_BASE,
+                              expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
+
+        # At some point we should fix this, but it isn't trivial
+        self.assertEqual(len(res11), 1)
+
+
 
 class IndexedSearchTests(SearchTests):
     """Test searches using the index, to ensure the index doesn't
        break things"""
     def setUp(self):
         super(IndexedSearchTests, self).setUp()
+        self.l.add({"dn": "@INDEXLIST",
+                    "@IDXATTR": [b"x", b"y", b"ou"]})
+        self.IDX = True
+
+class IndexedSearchDnFilterTests(SearchTests):
+    """Test searches using the index, to ensure the index doesn't
+       break things"""
+    def setUp(self):
+        super(IndexedSearchDnFilterTests, self).setUp()
+        self.l.add({"dn": "@OPTIONS",
+                    "disallowDNFilter": "TRUE"})
+        self.disallowDNFilter = True
+
+        self.l.add({"dn": "@INDEXLIST",
+                    "@IDXATTR": [b"x", b"y", b"ou"]})
+        self.IDX = True
+
+class IndexedAndOneLevelSearchTests(SearchTests):
+    """Test searches using the index including @IDXONE, to ensure
+       the index doesn't break things"""
+    def setUp(self):
+        super(IndexedAndOneLevelSearchTests, self).setUp()
+        self.l.add({"dn": "@INDEXLIST",
+                    "@IDXATTR": [b"x", b"y", b"ou"],
+                    "@IDXONE": [b"1"]})
+        self.IDX = True
+
+class IndexedAndOneLevelDNFilterSearchTests(SearchTests):
+    """Test searches using the index including @IDXONE, to ensure
+       the index doesn't break things"""
+    def setUp(self):
+        super(IndexedAndOneLevelDNFilterSearchTests, self).setUp()
+        self.l.add({"dn": "@OPTIONS",
+                    "disallowDNFilter": "TRUE"})
+        self.disallowDNFilter = True
+
         self.l.add({"dn": "@INDEXLIST",
                     "@IDXATTR": [b"x", b"y", b"ou"],
                     "@IDXONE": [b"1"]})
+        self.IDX = True
+        self.IDXONE = True
 
 class GUIDIndexedSearchTests(SearchTests):
     """Test searches using the index, to ensure the index doesn't
        break things"""
     def setUp(self):
         super(GUIDIndexedSearchTests, self).setUp()
+
+        self.l.add({"dn": "@INDEXLIST",
+                    "@IDXATTR": [b"x", b"y", b"ou"],
+                    "@IDXGUID": [b"objectUUID"],
+                    "@IDX_DN_GUID": [b"GUID"]})
+        self.IDXGUID = True
+        self.IDXONE = True
+
+class GUIDIndexedDNFilterSearchTests(SearchTests):
+    """Test searches using the index, to ensure the index doesn't
+       break things"""
+    def setUp(self):
+        super(GUIDIndexedDNFilterSearchTests, self).setUp()
+        self.l.add({"dn": "@OPTIONS",
+                    "disallowDNFilter": "TRUE"})
+        self.disallowDNFilter = True
+
+        self.l.add({"dn": "@INDEXLIST",
+                    "@IDXATTR": [b"x", b"y", b"ou"],
+                    "@IDXGUID": [b"objectUUID"],
+                    "@IDX_DN_GUID": [b"GUID"]})
+        self.IDX = True
+        self.IDXGUID = True
+
+class GUIDAndOneLevelIndexedSearchTests(SearchTests):
+    """Test searches using the index including @IDXONE, to ensure
+       the index doesn't break things"""
+    def setUp(self):
+        super(GUIDAndOneLevelIndexedSearchTests, self).setUp()
+        self.l.add({"dn": "@OPTIONS",
+                    "disallowDNFilter": "TRUE"})
+        self.disallowDNFilter = True
+
         self.l.add({"dn": "@INDEXLIST",
                     "@IDXATTR": [b"x", b"y", b"ou"],
                     "@IDXONE": [b"1"],
                     "@IDXGUID": [b"objectUUID"],
                     "@IDX_DN_GUID": [b"GUID"]})
+        self.IDX = True
+        self.IDXGUID = True
+        self.IDXONE = True
+
 
 class AddModifyTests(TestCase):
     def tearDown(self):