test samr: Extra tests for samr_EnumDomainGroups
authorGary Lockyer <gary@catalyst.net.nz>
Thu, 11 Oct 2018 22:21:10 +0000 (11:21 +1300)
committerAndrew Bartlett <abartlet@samba.org>
Tue, 20 Nov 2018 21:14:17 +0000 (22:14 +0100)
Add extra tests to test the content returned by samr_EnumDomainGroups,
and tests for the result caching added in the following commit.

Signed-off-by: Gary Lockyer <gary@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
python/samba/tests/dcerpc/sam.py
selftest/knownfail.d/samr [new file with mode: 0644]

index f709ba4..11ed9b9 100644 (file)
@@ -32,6 +32,7 @@ from samba.dsdb import (
     GTYPE_SECURITY_UNIVERSAL_GROUP,
     GTYPE_SECURITY_GLOBAL_GROUP)
 from samba import generate_random_password
+from samba.ndr import ndr_unpack
 import os
 
 
@@ -40,6 +41,24 @@ def toArray(handle, array, num_entries):
     return [(entry.idx, entry.name) for entry in array.entries[:num_entries]]
 
 
+# Extract the rid from an ldb message, assumes that the message has a
+# objectSID attribute
+#
+def rid(msg):
+    sid = ndr_unpack(security.dom_sid, msg["objectSID"][0])
+    (_, rid) = sid.split()
+    return rid
+
+
+# Calculate the request size for EnumDomainUsers and EnumDomainGroups calls
+# to hold the specified number of entries.
+# We use the w2k3 element size value of 54, code under test
+# rounds this up i.e. (1+(max_size/SAMR_ENUM_USERS_MULTIPLIER))
+#
+def calc_max_size(num_entries):
+    return (num_entries - 1) * 54
+
+
 class SamrTests(RpcInterfaceTestCase):
 
     def setUp(self):
@@ -72,6 +91,18 @@ class SamrTests(RpcInterfaceTestCase):
         self.domain_handle = self.conn.OpenDomain(
             self.handle, security.SEC_FLAG_MAXIMUM_ALLOWED, self.domain_sid)
 
+    # Filter a list of records, removing those that are not part of the
+    # current domain.
+    #
+    def filter_domain(self, unfiltered):
+        def sid(msg):
+            sid = ndr_unpack(security.dom_sid, msg["objectSID"][0])
+            (x, _) = sid.split()
+            return x
+
+        dom_sid = security.dom_sid(self.samdb.get_domain_sid())
+        return [x for x in unfiltered if sid(x) == dom_sid]
+
     def test_connect5(self):
         (level, info, handle) =\
             self.conn.Connect5(None, 0, 1, samr.ConnectInfo1())
@@ -435,3 +466,141 @@ class SamrTests(RpcInterfaceTestCase):
             5, check_results, select, attributes, self.create_groups)
 
         self.delete_dns(dns)
+
+    def test_EnumDomainGroups(self):
+        def check_results(expected, actual):
+            for (e, a) in zip(expected, actual):
+                self.assertTrue(isinstance(a, samr.SamEntry))
+                self.assertEquals(
+                    str(e["sAMAccountName"]), str(a.name.string))
+
+        # Create four groups
+        # to ensure that we have the minimum needed for the tests.
+        dns = self.create_groups([1, 2, 3, 4])
+
+        #
+        # Get the expected results by querying the samdb database directly.
+        # We do this rather than use a list of expected results as this runs
+        # with other tests so we do not have a known fixed list of elements
+        select = "(&(|(groupType=%d)(groupType=%d))(objectClass=group))" % (
+            GTYPE_SECURITY_UNIVERSAL_GROUP,
+            GTYPE_SECURITY_GLOBAL_GROUP)
+        attributes = ["sAMAccountName", "objectSID"]
+        unfiltered = self.samdb.search(expression=select, attrs=attributes)
+        filtered = self.filter_domain(unfiltered)
+        self.assertTrue(len(filtered) > 4)
+
+        # Sort the expected results by rid
+        expected = sorted(list(filtered), key=rid)
+
+        #
+        # Perform EnumDomainGroups with max size greater than the expected
+        # number of results. Allow for an extra 10 entries
+        #
+        max_size = calc_max_size(len(expected) + 10)
+        (resume_handle, actual, num_entries) = self.conn.EnumDomainGroups(
+            self.domain_handle, 0, max_size)
+        self.assertEquals(len(expected), num_entries)
+        check_results(expected, actual.entries)
+
+        #
+        # Perform EnumDomainGroups with size set to so that it contains
+        # 4 entries.
+        #
+        max_size = calc_max_size(4)
+        (resume_handle, actual, num_entries) = self.conn.EnumDomainGroups(
+            self.domain_handle, 0, max_size)
+        self.assertEquals(4, num_entries)
+        check_results(expected[:4], actual.entries)
+
+        #
+        # Try calling with resume_handle greater than number of entries
+        # Should return no results and a resume handle of 0
+        max_size = calc_max_size(1)
+        rh = len(expected)
+        self.conn.Close(self.handle)
+        (resume_handle, a, num_entries) = self.conn.EnumDomainGroups(
+            self.domain_handle, rh, max_size)
+
+        self.assertEquals(0, num_entries)
+        self.assertEquals(0, resume_handle)
+
+        #
+        # Enumerate through the domain groups one element at a time.
+        #
+        max_size = calc_max_size(1)
+        actual = []
+        (resume_handle, a, num_entries) = self.conn.EnumDomainGroups(
+            self.domain_handle, 0, max_size)
+        while resume_handle:
+            self.assertEquals(1, num_entries)
+            actual.append(a.entries[0])
+            (resume_handle, a, num_entries) = self.conn.EnumDomainGroups(
+                self.domain_handle, resume_handle, max_size)
+        if num_entries:
+            actual.append(a.entries[0])
+
+        #
+        # Check that the cached results are being returned.
+        # Obtain a new resume_handle and insert new entries into the
+        # into the DB
+        #
+        actual = []
+        max_size = calc_max_size(1)
+        (resume_handle, a, num_entries) = self.conn.EnumDomainGroups(
+            self.domain_handle, 0, max_size)
+        extra_dns = self.create_groups([1000, 1002, 1003, 1004])
+        while resume_handle:
+            self.assertEquals(1, num_entries)
+            actual.append(a.entries[0])
+            (resume_handle, a, num_entries) = self.conn.EnumDomainGroups(
+                self.domain_handle, resume_handle, max_size)
+        if num_entries:
+            actual.append(a.entries[0])
+
+        self.assertEquals(len(expected), len(actual))
+        check_results(expected, actual)
+
+        #
+        # Perform EnumDomainGroups, we should read the newly added domains
+        #
+        max_size = calc_max_size(len(expected) + len(extra_dns) + 10)
+        (resume_handle, actual, num_entries) = self.conn.EnumDomainGroups(
+            self.domain_handle, 0, max_size)
+        self.assertEquals(len(expected) + len(extra_dns), num_entries)
+
+        #
+        # Get a new expected result set by querying the database directly
+        unfiltered01 = self.samdb.search(expression=select, attrs=attributes)
+        filtered01 = self.filter_domain(unfiltered01)
+        self.assertTrue(len(filtered01) > len(expected))
+
+        # Sort the expected results by rid
+        expected01 = sorted(list(filtered01), key=rid)
+
+        #
+        # Now check that we read the new entries.
+        #
+        check_results(expected01, actual.entries)
+
+        #
+        # Check that deleted results are handled correctly.
+        # Obtain a new resume_handle and delete entries from the DB.
+        #
+        actual = []
+        max_size = calc_max_size(1)
+        (resume_handle, a, num_entries) = self.conn.EnumDomainGroups(
+            self.domain_handle, 0, max_size)
+        self.delete_dns(extra_dns)
+        while resume_handle and num_entries:
+            self.assertEquals(1, num_entries)
+            actual.append(a.entries[0])
+            (resume_handle, a, num_entries) = self.conn.EnumDomainGroups(
+                self.domain_handle, resume_handle, max_size)
+        if num_entries:
+            actual.append(a.entries[0])
+
+        self.assertEquals(len(expected), len(actual))
+        check_results(expected, actual)
+
+        self.delete_dns(dns)
diff --git a/selftest/knownfail.d/samr b/selftest/knownfail.d/samr
new file mode 100644 (file)
index 0000000..23e5e15
--- /dev/null
@@ -0,0 +1,2 @@
+^samba.tests.dcerpc.sam.samba.tests.dcerpc.sam.SamrTests.test_EnumDomainGroups\(ad_dc_ntvfs:local\)
+^samba.tests.dcerpc.sam.python3.samba.tests.dcerpc.sam.SamrTests.test_EnumDomainGroups\(ad_dc_ntvfs:local\)