CVE-2020-10760 dsdb: Add tests for paged_results and VLV over the Global Catalog...
authorAndrew Bartlett <abartlet@samba.org>
Mon, 8 Jun 2020 04:32:14 +0000 (16:32 +1200)
committerKarolin Seeger <kseeger@samba.org>
Thu, 2 Jul 2020 09:01:41 +0000 (09:01 +0000)
This should avoid a regression.

Signed-off-by: Andrew Bartlett <abartlet@samba.org>
selftest/knownfail.d/vlv
source4/dsdb/tests/python/vlv.py

index f187a2ed55eb3d5ea0dedf91e3edc1828a52c4dc..7ae02baf17b872cd2435067e019a02a9be51964e 100644 (file)
@@ -1,2 +1,2 @@
 samba4.ldap.vlv.python.*__main__.VLVTests.test_vlv_change_search_expr
-samba4.ldap.vlv.python.*__main__.PagedResultsTests.test_paged_cant_change_controls_data
+samba4.ldap.vlv.python.*__main__.PagedResultsTestsRW.test_paged_cant_change_controls_data
index 0adde9799f01683f456dfa1f29add6284fd7d42f..90a29ab6ec2decd3de03a298e6b9680b4c8e84c7 100644 (file)
@@ -152,7 +152,7 @@ class TestsWithUserOU(samba.tests.TestCase):
         super(TestsWithUserOU, self).setUp()
         self.ldb = SamDB(host, credentials=creds,
                          session_info=system_session(lp), lp=lp)
-
+        self.ldb_ro = self.ldb
         self.base_dn = self.ldb.domain_dn()
         self.tree_dn = "ou=vlvtesttree,%s" % self.base_dn
         self.ou = "ou=vlvou,%s" % self.tree_dn
@@ -199,8 +199,60 @@ class TestsWithUserOU(samba.tests.TestCase):
             self.ldb.delete(self.tree_dn, ['tree_delete:1'])
 
 
-class VLVTests(TestsWithUserOU):
+class VLVTestsBase(TestsWithUserOU):
+
+    # Run a vlv search and return important fields of the response control
+    def vlv_search(self, attr, expr, cookie="", after_count=0, offset=1):
+        sort_ctrl = "server_sort:1:0:%s" % attr
+        ctrl = "vlv:1:0:%d:%d:0" % (after_count, offset)
+        if cookie:
+            ctrl += ":" + cookie
+
+        res = self.ldb_ro.search(self.ou,
+                              expression=expr,
+                              scope=ldb.SCOPE_ONELEVEL,
+                              attrs=[attr],
+                              controls=[ctrl, sort_ctrl])
+        results = [str(x[attr][0]) for x in res]
+
+        ctrls = [str(c) for c in res.controls if
+                 str(c).startswith('vlv')]
+        self.assertEqual(len(ctrls), 1)
+
+        spl = ctrls[0].rsplit(':')
+        cookie = ""
+        if len(spl) == 6:
+            cookie = spl[-1]
+
+        return results, cookie
+
+
+class VLVTestsRO(VLVTestsBase):
+    def test_vlv_simple_double_run(self):
+        """Do the simplest possible VLV query to confirm if VLV
+           works at all.  Useful for showing VLV as a whole works
+           on Global Catalog (for example)"""
+        attr = 'roomNumber'
+        expr = "(objectclass=user)"
 
+        # Start new search
+        full_results, cookie = self.vlv_search(attr, expr,
+                                               after_count=len(self.users))
+
+        results, cookie = self.vlv_search(attr, expr, cookie=cookie,
+                                          after_count=len(self.users))
+        expected_results = full_results
+        self.assertEqual(results, expected_results)
+
+
+class VLVTestsGC(VLVTestsRO):
+    def setUp(self):
+        super(VLVTestsRO, self).setUp()
+        self.ldb_ro = SamDB(host + ":3268", credentials=creds,
+                            session_info=system_session(lp), lp=lp)
+
+
+class VLVTests(VLVTestsBase):
     def get_full_list(self, attr, include_cn=False):
         """Fetch the whole list sorted on the attribute, using the VLV.
         This way you get a VLV cookie."""
@@ -1081,31 +1133,6 @@ class VLVTests(TestsWithUserOU):
                        controls=[sort_control,
                                  "vlv:0:1:1:1:0:%s" % vlv_cookies[-1]])
 
-    # Run a vlv search and return important fields of the response control
-    def vlv_search(self, attr, expr, cookie="", after_count=0, offset=1):
-        sort_ctrl = "server_sort:1:0:%s" % attr
-        ctrl = "vlv:1:0:%d:%d:0" % (after_count, offset)
-        if cookie:
-            ctrl += ":" + cookie
-
-        res = self.ldb.search(self.ou,
-                              expression=expr,
-                              scope=ldb.SCOPE_ONELEVEL,
-                              attrs=[attr],
-                              controls=[ctrl, sort_ctrl])
-        results = [str(x[attr][0]) for x in res]
-
-        ctrls = [str(c) for c in res.controls if
-                 str(c).startswith('vlv')]
-        self.assertEqual(len(ctrls), 1)
-
-        spl = ctrls[0].rsplit(':')
-        cookie = ""
-        if len(spl) == 6:
-            cookie = spl[-1]
-
-        return results, cookie
-
     def test_vlv_modify_during_view(self):
         attr = 'roomNumber'
         expr = "(objectclass=user)"
@@ -1219,11 +1246,11 @@ class PagedResultsTests(TestsWithUserOU):
         if subtree:
             scope = ldb.SCOPE_SUBTREE
 
-        res = self.ldb.search(ou,
-                              expression=expr,
-                              scope=scope,
-                              controls=controls,
-                              **kwargs)
+        res = self.ldb_ro.search(ou,
+                                 expression=expr,
+                                 scope=scope,
+                                 controls=controls,
+                                 **kwargs)
         results = [str(r['cn'][0]) for r in res]
 
         ctrls = [str(c) for c in res.controls if
@@ -1236,6 +1263,53 @@ class PagedResultsTests(TestsWithUserOU):
             cookie = spl[-1]
         return results, cookie
 
+
+class PagedResultsTestsRO(PagedResultsTests):
+
+    def test_paged_search_lockstep(self):
+        expr = "(objectClass=*)"
+        ps = 3
+
+        all_results, _ = self.paged_search(expr, page_size=len(self.users)+1)
+
+        # Run two different but overlapping paged searches simultaneously.
+        set_1_index = int((len(all_results))//3)
+        set_2_index = int((2*len(all_results))//3)
+        set_1 = all_results[set_1_index:]
+        set_2 = all_results[:set_2_index+1]
+        set_1_expr = "(cn>=%s)" % (all_results[set_1_index])
+        set_2_expr = "(cn<=%s)" % (all_results[set_2_index])
+
+        results, cookie1 = self.paged_search(set_1_expr, page_size=ps)
+        self.assertEqual(results, set_1[:ps])
+        results, cookie2 = self.paged_search(set_2_expr, page_size=ps)
+        self.assertEqual(results, set_2[:ps])
+
+        results, cookie1 = self.paged_search(set_1_expr, cookie=cookie1,
+                                             page_size=ps)
+        self.assertEqual(results, set_1[ps:ps*2])
+        results, cookie2 = self.paged_search(set_2_expr, cookie=cookie2,
+                                             page_size=ps)
+        self.assertEqual(results, set_2[ps:ps*2])
+
+        results, _ = self.paged_search(set_1_expr, cookie=cookie1,
+                                       page_size=len(self.users))
+        self.assertEqual(results, set_1[ps*2:])
+        results, _ = self.paged_search(set_2_expr, cookie=cookie2,
+                                       page_size=len(self.users))
+        self.assertEqual(results, set_2[ps*2:])
+
+
+class PagedResultsTestsGC(PagedResultsTestsRO):
+
+    def setUp(self):
+        super(PagedResultsTestsRO, self).setUp()
+        self.ldb_ro = SamDB(host + ":3268", credentials=creds,
+                            session_info=system_session(lp), lp=lp)
+
+
+class PagedResultsTestsRW(PagedResultsTests):
+
     def test_paged_delete_during_search(self, sort=True):
         expr = "(objectClass=*)"
 
@@ -1637,39 +1711,6 @@ class PagedResultsTests(TestsWithUserOU):
                                      cookie, attrs=changed_attrs,
                                      extra_ctrls=[])
 
-    def test_paged_search_lockstep(self):
-        expr = "(objectClass=*)"
-        ps = 3
-
-        all_results, _ = self.paged_search(expr, page_size=len(self.users)+1)
-
-        # Run two different but overlapping paged searches simultaneously.
-        set_1_index = int((len(all_results))//3)
-        set_2_index = int((2*len(all_results))//3)
-        set_1 = all_results[set_1_index:]
-        set_2 = all_results[:set_2_index+1]
-        set_1_expr = "(cn>=%s)" % (all_results[set_1_index])
-        set_2_expr = "(cn<=%s)" % (all_results[set_2_index])
-
-        results, cookie1 = self.paged_search(set_1_expr, page_size=ps)
-        self.assertEqual(results, set_1[:ps])
-        results, cookie2 = self.paged_search(set_2_expr, page_size=ps)
-        self.assertEqual(results, set_2[:ps])
-
-        results, cookie1 = self.paged_search(set_1_expr, cookie=cookie1,
-                                             page_size=ps)
-        self.assertEqual(results, set_1[ps:ps*2])
-        results, cookie2 = self.paged_search(set_2_expr, cookie=cookie2,
-                                             page_size=ps)
-        self.assertEqual(results, set_2[ps:ps*2])
-
-        results, _ = self.paged_search(set_1_expr, cookie=cookie1,
-                                       page_size=len(self.users))
-        self.assertEqual(results, set_1[ps*2:])
-        results, _ = self.paged_search(set_2_expr, cookie=cookie2,
-                                       page_size=len(self.users))
-        self.assertEqual(results, set_2[ps*2:])
-
     def test_vlv_paged(self):
         """Testing behaviour with VLV and paged_results set.