selftest: Extend further getnc_unpriv tests to pass against windows 2012R2
authorTim Beale <timbeale@catalyst.net.nz>
Wed, 16 Aug 2017 04:57:15 +0000 (16:57 +1200)
committerAndrew Bartlett <abartlet@samba.org>
Tue, 29 Aug 2017 05:23:28 +0000 (07:23 +0200)
An important change in this patch is changing the ACE type from
 A (Allow)
to
 AO (Object Allow)

as that will then respect the supplied GUID, which we also make use
the constant from the security.idl.

This reworks the tests to check replication with users with the
following rights:
- only GET_CHANGES
- only GET_ALL_CHANGES
- both GET_CHANGES and GET_ALL_CHANGES
- no rights

We basically want to test various different GetNCChanges requests
against each type of user rights, and the only difference is the
error/success value we get back. I've structured the tests this way, so
that we have 4 test_repl_xyz_userpriv() functions (to cover each of the
above user rights cases), and each test sends the same series of
GetNCChanges requests of varying validity.

Currently all these tests fail against Samba because Samba sends
different error codes to Windows.

Signed-off-by: Andrew Bartlett <abartlet@samba.org>
Signed-off-by: Tim Beale <timbeale@catalyst.net.nz>
Reviewed-by: Garming Sam <garming@catalyst.net.nz>
selftest/knownfail.d/getnc_unpriv [new file with mode: 0644]
source4/torture/drs/python/getnc_unpriv.py

diff --git a/selftest/knownfail.d/getnc_unpriv b/selftest/knownfail.d/getnc_unpriv
new file mode 100644 (file)
index 0000000..40977f9
--- /dev/null
@@ -0,0 +1,10 @@
+samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_repl_no_userpriv\(vampire_dc\)
+samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_repl_getchanges_userpriv\(vampire_dc\)
+samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_repl_getallchanges_userpriv\(vampire_dc\)
+samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_repl_both_userpriv\(vampire_dc\)
+samba4.drs.getnc_unpriv.python\(vampire_dc\)\(vampire_dc\)
+samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_repl_no_userpriv\(promoted_dc\)
+samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_repl_getchanges_userpriv\(promoted_dc\)
+samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_repl_getallchanges_userpriv\(promoted_dc\)
+samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_repl_both_userpriv\(promoted_dc\)
+samba4.drs.getnc_unpriv.python\(promoted_dc\)\(promoted_dc\)
index dd6142aa816d1a9fe0aebf4e8951b90b4d6fe55e..41d96110492b666a7f45cff4e6c1b399a78c360b 100644 (file)
@@ -1,7 +1,12 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
 #
-# Tests replication scenarios with different user privileges
+# Tests replication scenarios with different user privileges.
+# We want to test every replication scenario we can think of against:
+# - users with only GET_CHANGES privileges
+# - users with only GET_ALL_CHANGES privileges
+# - users with both GET_CHANGES and GET_ALL_CHANGES privileges
+# - users with no privileges
 #
 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
 
 import drs_base
 import samba.tests
+from samba import werror, WERRORError
 
 from samba import sd_utils
 import ldb
 from ldb import SCOPE_BASE
 
-from samba.dcerpc import drsuapi
+from samba.dcerpc import drsuapi, security
 from samba.credentials import DONT_USE_KERBEROS
 
 class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
@@ -55,10 +61,13 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
         (self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1)
 
         self.sd_utils = sd_utils.SDUtils(self.ldb_dc1)
-        user_dn = "cn=%s,%s" % (self.get_changes_user, self.ou)
-        user_sid = self.sd_utils.get_object_sid(user_dn)
-        mod = "(A;;CR;1131f6aa-9c07-11d1-f79f-00c04fc2dcd2;;%s)" % str(user_sid)
-        self.sd_utils.dacl_add_ace(self.base_dn, mod)
+        self.user_dn = "cn=%s,%s" % (self.get_changes_user, self.ou)
+        user_sid = self.sd_utils.get_object_sid(self.user_dn)
+        self.acl_mod_get_changes = "(OA;;CR;%s;;%s)" % (security.GUID_DRS_GET_CHANGES,
+                                                        str(user_sid))
+        self.acl_mod_get_all_changes = "(OA;;CR;%s;;%s)" % (security.GUID_DRS_GET_ALL_CHANGES,
+                                                            str(user_sid))
+        self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn)
 
         # We set DONT_USE_KERBEROS to avoid a race with getting the
         # user replicated to our selected KDC
@@ -70,6 +79,7 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
                                                               self.user_creds)
 
     def tearDown(self):
+        self.sd_utils.modify_sd_on_dn(self.base_dn, self.desc_sddl)
         try:
             self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
         except ldb.LdbError as (enum, string):
@@ -77,35 +87,175 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
                 pass
         super(DrsReplicaSyncUnprivTestCase, self).tearDown()
 
-    def test_do_single_repl(self):
+    def _test_repl_exop(self, exop, repl_obj, expected_error, dest_dsa=None,
+                        partial_attribute_set=None):
         """
-        Make sure that DRSU_EXOP_REPL_OBJ works as a less-privileged
-        user with the correct GET_CHANGES rights
+        Common function to send a replication request and check the result
+        matches what's expected.
         """
-
-        ou1 = "OU=single_obj,%s" % self.ou
-        self.ldb_dc1.add({
-            "dn": ou1,
-            "objectclass": "organizationalUnit"
-            })
-        req8 = self._exop_req8(dest_dsa=None,
+        req8 = self._exop_req8(dest_dsa=dest_dsa,
                                invocation_id=self.ldb_dc1.get_invocation_id(),
-                               nc_dn_str=ou1,
-                               exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
-                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
-        (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
-        self._check_ctr6(ctr, [ou1])
+                               nc_dn_str=repl_obj,
+                               exop=exop,
+                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP,
+                               partial_attribute_set=partial_attribute_set)
+
+        if expected_error is None:
+            # user is OK, request should be accepted without throwing an error
+            (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle,
+                                                        8, req8)
+        else:
+            # check the request is rejected (with the error we're expecting)
+            try:
+                (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle,
+                                                            8, req8)
+                self.fail("Should have failed with user denied access")
+            except WERRORError as (enum, estr):
+                self.assertEquals(enum, expected_error,
+                                  "Got unexpected error: %s" % estr)
 
-    def test_do_full_repl(self):
+    def _test_repl_single_obj(self, repl_obj, expected_error,
+                              partial_attribute_set=None):
         """
-        Make sure that full replication works as a less-privileged
-        user with the correct GET_CHANGES rights
+        Checks that replication on a single object either succeeds or fails as
+        expected (based on the user's access rights)
         """
+        self._test_repl_exop(exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
+                             repl_obj=repl_obj,
+                             expected_error=expected_error,
+                             partial_attribute_set=partial_attribute_set)
+
+    def _test_repl_secret(self, repl_obj, expected_error, dest_dsa=None):
+        """
+        Checks that REPL_SECRET on an object either succeeds or fails as
+        expected (based on the user's access rights)
+        """
+        self._test_repl_exop(exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+                             repl_obj=repl_obj,
+                             expected_error=expected_error,
+                             dest_dsa=dest_dsa)
+
+    def _test_repl_full(self, expected_error, partial_attribute_set=None):
+        """
+        Checks that a full replication either succeeds or fails as expected
+        (based on the user's access rights)
+        """
+        self._test_repl_exop(exop=drsuapi.DRSUAPI_EXOP_NONE,
+                             repl_obj=self.ldb_dc1.get_default_basedn(),
+                             expected_error=expected_error,
+                             partial_attribute_set=partial_attribute_set)
+
+    def _test_repl_full_on_ou(self, expected_error):
+        """
+        Full replication on a specific OU should always fail (it should be done
+        against a base NC). The error may vary based on the user's access rights
+        """
+        # Just try against the OU created in the test setup
+        self._test_repl_exop(exop=drsuapi.DRSUAPI_EXOP_NONE,
+                             repl_obj=self.ou,
+                             expected_error=expected_error)
+
+    def test_repl_getchanges_userpriv(self):
+        """
+        Tests various replication requests made by a user with only GET_CHANGES
+        rights. Some requests will be accepted, but most will be rejected.
+        """
+
+        # Assign the user GET_CHANGES rights
+        self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod_get_changes)
+
+        self._test_repl_single_obj(repl_obj=self.ou,
+                                   expected_error=werror.WERR_DS_DRA_ACCESS_DENIED)
+
+        self._test_repl_secret(repl_obj=self.ou,
+                               expected_error=werror.WERR_DS_DRA_ACCESS_DENIED)
+        self._test_repl_secret(repl_obj=self.user_dn,
+                               expected_error=werror.WERR_DS_DRA_ACCESS_DENIED)
+        self._test_repl_secret(repl_obj=self.user_dn,
+                               dest_dsa=self.ldb_dc1.get_ntds_GUID(),
+                               expected_error=werror.WERR_DS_DRA_ACCESS_DENIED)
+
+        self._test_repl_full(expected_error=werror.WERR_DS_DRA_ACCESS_DENIED)
+        self._test_repl_full_on_ou(expected_error=werror.WERR_DS_CANT_FIND_EXPECTED_NC)
+
+        # Partial Attribute Sets don't require GET_ALL_CHANGES rights, so we
+        # expect the following to succeed
+        self._test_repl_single_obj(repl_obj=self.ou,
+                                   expected_error=None,
+                                   partial_attribute_set=self.get_partial_attribute_set())
+        self._test_repl_full(expected_error=None,
+                             partial_attribute_set=self.get_partial_attribute_set())
+
+    def test_repl_getallchanges_userpriv(self):
+        """
+        Tests various replication requests made by a user with only
+        GET_ALL_CHANGES rights. Note that assigning these rights is possible,
+        but doesn't make a lot of sense. We test it anyway for consistency.
+        """
+
+        # Assign the user GET_ALL_CHANGES rights
+        self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod_get_all_changes)
+
+        # We can expect to get the same responses as an unprivileged user,
+        # i.e. we have permission to see the results, but don't have permission
+        # to ask
+        self.test_repl_no_userpriv()
+
+    def test_repl_both_userpriv(self):
+        """
+        Tests various replication requests made by a privileged user (i.e. has
+        both GET_CHANGES and GET_ALL_CHANGES). We expect any valid requests
+        to be accepted.
+        """
+
+        # Assign the user both GET_CHANGES and GET_ALL_CHANGES rights
+        both_rights = self.acl_mod_get_changes + self.acl_mod_get_all_changes
+        self.sd_utils.dacl_add_ace(self.base_dn, both_rights)
+
+        self._test_repl_single_obj(repl_obj=self.ou,
+                                   expected_error=None)
+
+        self._test_repl_secret(repl_obj=self.ou,
+                               expected_error=werror.WERR_DS_DRA_DB_ERROR)
+        self._test_repl_secret(repl_obj=self.user_dn,
+                               expected_error=werror.WERR_DS_DRA_DB_ERROR)
+        self._test_repl_secret(repl_obj=self.user_dn,
+                               dest_dsa=self.ldb_dc1.get_ntds_GUID(),
+                               expected_error=None)
+
+        self._test_repl_full(expected_error=None)
+        self._test_repl_full_on_ou(expected_error=werror.WERR_DS_CANT_FIND_EXPECTED_NC)
+
+        self._test_repl_single_obj(repl_obj=self.ou,
+                                   expected_error=None,
+                                   partial_attribute_set=self.get_partial_attribute_set())
+        self._test_repl_full(expected_error=None,
+                             partial_attribute_set=self.get_partial_attribute_set())
+
+    def test_repl_no_userpriv(self):
+        """
+        Tests various replication requests made by a unprivileged user.
+        We expect all these requests to be rejected.
+        """
+
+        self._test_repl_single_obj(repl_obj=self.ou,
+                                   expected_error=werror.WERR_DS_DRA_BAD_DN)
+
+        self._test_repl_secret(repl_obj=self.ou,
+                               expected_error=werror.WERR_DS_DRA_BAD_DN)
+        self._test_repl_secret(repl_obj=self.user_dn,
+                               expected_error=werror.WERR_DS_DRA_BAD_DN)
+        self._test_repl_secret(repl_obj=self.user_dn,
+                               dest_dsa=self.ldb_dc1.get_ntds_GUID(),
+                               expected_error=werror.WERR_DS_DRA_BAD_DN)
+
+        self._test_repl_full(expected_error=werror.WERR_DS_DRA_ACCESS_DENIED)
+        self._test_repl_full_on_ou(expected_error=werror.WERR_DS_DRA_BAD_DN)
+
+        self._test_repl_single_obj(repl_obj=self.ou,
+                                   expected_error=werror.WERR_DS_DRA_BAD_DN,
+                                   partial_attribute_set=self.get_partial_attribute_set())
+        self._test_repl_full(expected_error=werror.WERR_DS_DRA_ACCESS_DENIED,
+                             partial_attribute_set=self.get_partial_attribute_set())
+
 
-        req8 = self._exop_req8(dest_dsa=None,
-                               invocation_id=self.ldb_dc1.get_invocation_id(),
-                               nc_dn_str=self.ldb_dc1.get_default_basedn(),
-                               exop=drsuapi.DRSUAPI_EXOP_NONE,
-                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
-        (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
-        self.assertEqual(ctr.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_NONE)