paged results: testing suite for new paged results module
authorAaron Haslett <aaronhaslett@catalyst.net.nz>
Mon, 12 Nov 2018 01:35:40 +0000 (14:35 +1300)
committerGary Lockyer <gary@samba.org>
Fri, 21 Dec 2018 10:10:30 +0000 (11:10 +0100)
Testing the new GUID list based paged results module

Signed-off-by: Aaron Haslett <aaronhaslett@catalyst.net.nz>
Reviewed-by: Gary Lockyer <gary@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Autobuild-User(master): Gary Lockyer <gary@samba.org>
Autobuild-Date(master): Fri Dec 21 11:10:30 CET 2018 on sn-devel-144

selftest/knownfail.d/vlv
source4/dsdb/tests/python/vlv.py

index ee4970cad8bce8faec19774bbbbe5116ec487ff1..f187a2ed55eb3d5ea0dedf91e3edc1828a52c4dc 100644 (file)
@@ -1 +1,2 @@
 samba4.ldap.vlv.python.*__main__.VLVTests.test_vlv_change_search_expr
+samba4.ldap.vlv.python.*__main__.PagedResultsTests.test_paged_cant_change_controls_data
index 04bd89926cff17d7e43ee8cfb2c41d90989a945e..8550a38e2870cffbc9fe180f9d2fca30d8bba0a2 100644 (file)
@@ -95,7 +95,7 @@ def get_cookie(controls, expected_n=None):
     raise ValueError("there is no VLV response")
 
 
-class VLVTests(samba.tests.TestCase):
+class TestsWithUserOU(samba.tests.TestCase):
 
     def create_user(self, i, n, prefix='vlvtest', suffix='', attrs=None):
         name = "%s%d%s" % (prefix, i, suffix)
@@ -148,7 +148,7 @@ class VLVTests(samba.tests.TestCase):
         return user
 
     def setUp(self):
-        super(VLVTests, self).setUp()
+        super(TestsWithUserOU, self).setUp()
         self.ldb = SamDB(host, credentials=creds,
                          session_info=system_session(lp), lp=lp)
 
@@ -189,10 +189,13 @@ class VLVTests(samba.tests.TestCase):
         self.delicate_keys = ['cn']
 
     def tearDown(self):
-        super(VLVTests, self).tearDown()
+        super(TestsWithUserOU, self).tearDown()
         if not opts.delete_in_setup:
             self.ldb.delete(self.ou, ['tree_delete:1'])
 
+
+class VLVTests(TestsWithUserOU):
+
     def get_full_list(self, attr, include_cn=False):
         """Fetch the whole list sorted on the attribute, using the VLV.
         This way you get a VLV cookie."""
@@ -1183,6 +1186,417 @@ class VLVTests(samba.tests.TestCase):
         expected_results = [r for r in full_results if r != del_user[attr]]
         self.assertEqual(results, expected_results)
 
+
+class PagedResultsTests(TestsWithUserOU):
+
+    def paged_search(self, expr, cookie="", page_size=0, extra_ctrls=None,
+                     attrs=None, ou=None, subtree=False):
+        ou = ou or self.ou
+        if cookie:
+            cookie = ":" + cookie
+        ctrl = "paged_results:1:" + str(page_size) + cookie
+        controls = [ctrl]
+
+        # If extra controls are provided then add them, else default to
+        # sort control on 'cn' attribute
+        if extra_ctrls is not None:
+            controls += extra_ctrls
+        else:
+            sort_ctrl = "server_sort:1:0:cn"
+            controls.append(sort_ctrl)
+
+        kwargs = {}
+        if attrs is not None:
+            kwargs = {"attrs": attrs}
+
+        scope = ldb.SCOPE_ONELEVEL
+        if subtree:
+            scope = ldb.SCOPE_SUBTREE
+
+        res = self.ldb.search(ou,
+                              expression=expr,
+                              scope=scope,
+                              controls=controls,
+                              **kwargs)
+        results = [str(r['cn'][0]) for r in res]
+
+        ctrls = [str(c) for c in res.controls if
+                 str(c).startswith("paged_results")]
+        assert len(ctrls) == 1, "no paged_results response"
+
+        spl = ctrls[0].rsplit(':', 3)
+        cookie = ""
+        if len(spl) == 3:
+            cookie = spl[-1]
+        return results, cookie
+
+    def test_paged_delete_during_search(self):
+        expr = "(objectClass=*)"
+
+        # Start new search
+        first_page_size = 3
+        results, cookie = self.paged_search(expr, page_size=first_page_size)
+
+        # Run normal search to get expected results
+        unedited_results, _ = self.paged_search(expr,
+                                                page_size=len(self.users))
+
+        # Get remaining users not returned by the search above
+        unreturned_users = [u for u in self.users if u['cn'] not in results]
+
+        # Delete one of the users
+        del_index = len(self.users)//2
+        del_user = unreturned_users[del_index]
+        self.ldb.delete(del_user['dn'])
+
+        # Run test
+        results, _ = self.paged_search(expr, cookie=cookie,
+                                       page_size=len(self.users))
+        expected_results = [r for r in unedited_results[first_page_size:]
+                            if r != del_user['cn']]
+        self.assertEqual(results, expected_results)
+
+    def test_paged_show_deleted(self):
+        unique = time.strftime("%s", time.gmtime())[-5:]
+        prefix = "show_deleted_test_%s_" % (unique)
+        expr = "(&(objectClass=user)(cn=%s*))" % (prefix)
+        del_ctrl = "show_deleted:1"
+
+        num_users = 10
+        users = []
+        for i in range(num_users):
+            user = self.create_user(i, num_users, prefix=prefix)
+            users.append(user)
+
+        first_user = users[0]
+        self.ldb.delete(first_user['dn'])
+
+        # Start new search
+        first_page_size = 3
+        results, cookie = self.paged_search(expr, page_size=first_page_size,
+                                            extra_ctrls=[del_ctrl],
+                                            ou=self.base_dn,
+                                            subtree=True)
+
+        # Get remaining users not returned by the search above
+        unreturned_users = [u for u in users if u['cn'] not in results]
+
+        # Delete one of the users
+        del_index = len(users)//2
+        del_user = unreturned_users[del_index]
+        self.ldb.delete(del_user['dn'])
+
+        results2, _ = self.paged_search(expr, cookie=cookie,
+                                        page_size=len(users)*2,
+                                        extra_ctrls=[del_ctrl],
+                                        ou=self.base_dn,
+                                        subtree=True)
+
+        user_cns = {str(u['cn']) for u in users}
+        deleted_cns = {first_user['cn'], del_user['cn']}
+
+        all_results = results + results2
+        normal_results = {r for r in all_results if "DEL:" not in r}
+        self.assertEqual(normal_results, user_cns - deleted_cns)
+
+        # Deleted results get "\nDEL:<GUID>" added to the CN, so cut it out.
+        deleted_results = {r[:r.index('\n')] for r in all_results
+                           if "DEL:" in r}
+        self.assertEqual(deleted_results, deleted_cns)
+
+    def test_paged_add_during_search(self):
+        expr = "(objectClass=*)"
+
+        # Start new search
+        first_page_size = 3
+        results, cookie = self.paged_search(expr, page_size=first_page_size)
+
+        unedited_results, _ = self.paged_search(expr,
+                                                page_size=len(self.users)+1)
+
+        # Get remaining users not returned by the search above
+        unwalked_users = [cn for cn in unedited_results if cn not in results]
+
+        # Add a user in the middle of the sort order
+        middle_index = len(unwalked_users)//2
+        middle_user = unwalked_users[middle_index]
+
+        user = {'cn': middle_user + '_2', "objectclass": "user"}
+        user['dn'] = "cn=%s,%s" % (user['cn'], self.ou)
+        self.ldb.add(user)
+
+        results, _ = self.paged_search(expr, cookie=cookie,
+                                       page_size=len(self.users)+1)
+        expected_results = unwalked_users[:]
+
+        # Uncomment this line to assert that adding worked.
+        # expected_results.insert(middle_index+1, user['cn'])
+
+        self.assertEqual(results, expected_results)
+
+    def test_paged_modify_during_search(self):
+        expr = "(objectClass=*)"
+
+        # Start new search
+        first_page_size = 3
+        results, cookie = self.paged_search(expr, page_size=first_page_size)
+
+        unedited_results, _ = self.paged_search(expr,
+                                                page_size=len(self.users)+1)
+
+        # Modify user in the middle of the remaining sort order
+        unwalked_users = [cn for cn in unedited_results if cn not in results]
+        middle_index = len(unwalked_users)//2
+        middle_cn = unwalked_users[middle_index]
+
+        # Find user object
+        users_with_middle_cn = [u for u in self.users if u['cn'] == middle_cn]
+        self.assertEqual(len(users_with_middle_cn), 1)
+        middle_user = users_with_middle_cn[0]
+
+        # Rename object
+        edit_cn = "z_" + middle_cn
+        new_dn = middle_user['dn'].replace(middle_cn, edit_cn)
+        self.ldb.rename(middle_user['dn'], new_dn)
+
+        results, _ = self.paged_search(expr, cookie=cookie,
+                                       page_size=len(self.users)+1)
+        expected_results = unwalked_users[:]
+        expected_results[middle_index] = edit_cn
+        self.assertEqual(results, expected_results)
+
+    def test_paged_modify_object_scope(self):
+        expr = "(objectClass=*)"
+
+        ou2 = "OU=vlvtestou2,%s" % (self.base_dn)
+        try:
+            self.ldb.delete(ou2, ['tree_delete:1'])
+        except ldb.LdbError:
+            pass
+        self.ldb.add({"dn": ou2, "objectclass": "organizationalUnit"})
+
+        # Do a separate, full search to get all results
+        unedited_results, _ = self.paged_search(expr,
+                                                page_size=len(self.users)+1)
+
+        # Rename before starting a search
+        first_cn = self.users[0]['cn']
+        new_dn = "CN=%s,%s" % (first_cn, ou2)
+        self.ldb.rename(self.users[0]['dn'], new_dn)
+
+        # Start new search under the original OU
+        first_page_size = 3
+        results, cookie = self.paged_search(expr, page_size=first_page_size)
+        self.assertEqual(results, unedited_results[1:1+first_page_size])
+
+        # Get one of the users that is yet to be returned
+        unwalked_users = [cn for cn in unedited_results if cn not in results]
+        middle_index = len(unwalked_users)//2
+        middle_cn = unwalked_users[middle_index]
+
+        # Find user object
+        users_with_middle_cn = [u for u in self.users if u['cn'] == middle_cn]
+        self.assertEqual(len(users_with_middle_cn), 1)
+        middle_user = users_with_middle_cn[0]
+
+        # Rename
+        new_dn = "CN=%s,%s" % (middle_cn, ou2)
+        self.ldb.rename(middle_user['dn'], new_dn)
+
+        results, _ = self.paged_search(expr, cookie=cookie,
+                                       page_size=len(self.users)+1)
+
+        expected_results = unwalked_users[:]
+
+        # We should really expect that the object renamed into a different
+        # OU should vanish from the results, but turns out Windows does return
+        # the object in this case.  Our module matches the Windows behaviour.
+
+        # If behaviour changes, this line inverts the test's expectations to
+        # what you might expect.
+        # del expected_results[middle_index]
+
+        # But still expect the user we removed before the search to be gone
+        del expected_results[0]
+
+        self.assertEqual(results, expected_results)
+
+    def assertPagedSearchRaises(self, err_num, expr, cookie, attrs=None,
+                                extra_ctrls=None):
+        try:
+            results, _ = self.paged_search(expr, cookie=cookie,
+                                           page_size=2,
+                                           extra_ctrls=extra_ctrls,
+                                           attrs=attrs)
+        except ldb.LdbError as e:
+            self.assertEqual(e.args[0], err_num)
+            return
+
+        self.fail("No error raised by invalid search")
+
+    def test_paged_changed_expr(self):
+        # Initiate search then use a different expr in subsequent req
+        expr = "(objectClass=*)"
+        results, cookie = self.paged_search(expr, page_size=3)
+        expr = "cn>=a"
+        expected_error_num = 12
+        self.assertPagedSearchRaises(expected_error_num, expr, cookie)
+
+    def test_paged_changed_controls(self):
+        expr = "(objectClass=*)"
+        sort_ctrl = "server_sort:1:0:cn"
+        del_ctrl = "show_deleted:1"
+        expected_error_num = 12
+        ps = 3
+
+        # Initiate search with a sort control then remove in subsequent req
+        results, cookie = self.paged_search(expr, page_size=ps,
+                                            extra_ctrls=[sort_ctrl])
+        self.assertPagedSearchRaises(expected_error_num, expr,
+                                     cookie, extra_ctrls=[])
+
+        # Initiate search with no sort control then add one in subsequent req
+        results, cookie = self.paged_search(expr, page_size=ps,
+                                            extra_ctrls=[])
+        self.assertPagedSearchRaises(expected_error_num, expr,
+                                     cookie, extra_ctrls=[sort_ctrl])
+
+        # Initiate search with show-deleted control then
+        # remove it in subsequent req
+        results, cookie = self.paged_search(expr, page_size=ps,
+                                            extra_ctrls=[del_ctrl])
+        self.assertPagedSearchRaises(expected_error_num, expr,
+                                     cookie, extra_ctrls=[])
+
+        # Initiate normal search then add show-deleted control
+        # in subsequent req
+        results, cookie = self.paged_search(expr, page_size=ps,
+                                            extra_ctrls=[])
+        self.assertPagedSearchRaises(expected_error_num, expr,
+                                     cookie, extra_ctrls=[del_ctrl])
+
+        # Changing order of controls shouldn't break the search
+        results, cookie = self.paged_search(expr, page_size=ps,
+                                            extra_ctrls=[del_ctrl, sort_ctrl])
+        try:
+            results, cookie = self.paged_search(expr, page_size=ps,
+                                                extra_ctrls=[sort_ctrl,
+                                                             del_ctrl])
+        except ldb.LdbError as e:
+            self.fail(e)
+
+    def test_paged_cant_change_controls_data(self):
+        # Some defaults for the rest of the tests
+        expr = "(objectClass=*)"
+        sort_ctrl = "server_sort:1:0:cn"
+        expected_error_num = 12
+
+        # Initiate search with sort control then change it in subsequent req
+        results, cookie = self.paged_search(expr, page_size=3,
+                                            extra_ctrls=[sort_ctrl])
+        changed_sort_ctrl = "server_sort:1:0:roomNumber"
+        self.assertPagedSearchRaises(expected_error_num, expr,
+                                     cookie, extra_ctrls=[changed_sort_ctrl])
+
+        # Initiate search with a control with crit=1, then use crit=0
+        results, cookie = self.paged_search(expr, page_size=3,
+                                            extra_ctrls=[sort_ctrl])
+        changed_sort_ctrl = "server_sort:0:0:cn"
+        self.assertPagedSearchRaises(expected_error_num, expr,
+                                     cookie, extra_ctrls=[changed_sort_ctrl])
+
+    def test_paged_search_referrals(self):
+        expr = "(objectClass=*)"
+        paged_ctrl = "paged_results:1:5"
+        res = self.ldb.search(self.base_dn,
+                              expression=expr,
+                              attrs=['cn'],
+                              scope=ldb.SCOPE_SUBTREE,
+                              controls=[paged_ctrl])
+
+        # Do a paged search walk over the whole database and save a list
+        # of all the referrals returned by each search.
+        referral_lists = []
+
+        while True:
+            referral_lists.append(res.referals)
+
+            ctrls = [str(c) for c in res.controls if
+                     str(c).startswith("paged_results")]
+            self.assertEqual(len(ctrls), 1)
+            spl = ctrls[0].rsplit(':')
+            if len(spl) != 3:
+                break
+
+            cookie = spl[-1]
+            res = self.ldb.search(self.base_dn,
+                                  expression=expr,
+                                  attrs=['cn'],
+                                  scope=ldb.SCOPE_SUBTREE,
+                                  controls=[paged_ctrl + ":" + cookie])
+
+        ref_list = referral_lists[0]
+
+        # Sanity check to make sure the search actually did something
+        self.assertGreater(len(referral_lists), 2)
+
+        # Check the first referral set contains stuff
+        self.assertGreater(len(ref_list), 0)
+
+        # Check the others don't
+        self.assertTrue(all([len(l) == 0 for l in referral_lists[1:]]))
+
+        # Check the entries in the first referral list look like referrals
+        self.assertTrue(all([s.startswith('ldap://') for s in ref_list]))
+
+    def test_paged_change_attrs(self):
+        expr = "(objectClass=*)"
+        attrs = ['cn']
+        expected_error_num = 12
+
+        results, cookie = self.paged_search(expr, page_size=3, attrs=attrs)
+        results, cookie = self.paged_search(expr, cookie=cookie, page_size=3,
+                                            attrs=attrs)
+
+        changed_attrs = attrs + ['roomNumber']
+        self.assertPagedSearchRaises(expected_error_num, expr,
+                                     cookie, attrs=changed_attrs,
+                                     extra_ctrls=[])
+
+    def test_paged_search_lockstep(self):
+        expr = "(objectClass=*)"
+        ps = 3
+
+        all_results, _ = self.paged_search(expr, page_size=len(self.users)+1)
+
+        # Run two different but overlapping paged searches simultaneously.
+        set_1_index = int((len(all_results))//3)
+        set_2_index = int((2*len(all_results))//3)
+        set_1 = all_results[set_1_index:]
+        set_2 = all_results[:set_2_index+1]
+        set_1_expr = "(cn>=%s)" % (all_results[set_1_index])
+        set_2_expr = "(cn<=%s)" % (all_results[set_2_index])
+
+        results, cookie1 = self.paged_search(set_1_expr, page_size=ps)
+        self.assertEqual(results, set_1[:ps])
+        results, cookie2 = self.paged_search(set_2_expr, page_size=ps)
+        self.assertEqual(results, set_2[:ps])
+
+        results, cookie1 = self.paged_search(set_1_expr, cookie=cookie1,
+                                             page_size=ps)
+        self.assertEqual(results, set_1[ps:ps*2])
+        results, cookie2 = self.paged_search(set_2_expr, cookie=cookie2,
+                                             page_size=ps)
+        self.assertEqual(results, set_2[ps:ps*2])
+
+        results, _ = self.paged_search(set_1_expr, cookie=cookie1,
+                                       page_size=len(self.users))
+        self.assertEqual(results, set_1[ps*2:])
+        results, _ = self.paged_search(set_2_expr, cookie=cookie2,
+                                       page_size=len(self.users))
+        self.assertEqual(results, set_2[ps*2:])
+
+
 if "://" not in host:
     if os.path.isfile(host):
         host = "tdb://%s" % host