selftest: Extend further getnc_unpriv tests to pass against windows 2012R2
[nivanova/samba-autobuild/.git] / source4 / torture / drs / python / getnc_unpriv.py
index dd6142aa816d1a9fe0aebf4e8951b90b4d6fe55e..41d96110492b666a7f45cff4e6c1b399a78c360b 100644 (file)
@@ -1,7 +1,12 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
 #
-# Tests replication scenarios with different user privileges
+# Tests replication scenarios with different user privileges.
+# We want to test every replication scenario we can think of against:
+# - users with only GET_CHANGES privileges
+# - users with only GET_ALL_CHANGES privileges
+# - users with both GET_CHANGES and GET_ALL_CHANGES privileges
+# - users with no privileges
 #
 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
 
 import drs_base
 import samba.tests
+from samba import werror, WERRORError
 
 from samba import sd_utils
 import ldb
 from ldb import SCOPE_BASE
 
-from samba.dcerpc import drsuapi
+from samba.dcerpc import drsuapi, security
 from samba.credentials import DONT_USE_KERBEROS
 
 class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
@@ -55,10 +61,13 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
         (self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1)
 
         self.sd_utils = sd_utils.SDUtils(self.ldb_dc1)
-        user_dn = "cn=%s,%s" % (self.get_changes_user, self.ou)
-        user_sid = self.sd_utils.get_object_sid(user_dn)
-        mod = "(A;;CR;1131f6aa-9c07-11d1-f79f-00c04fc2dcd2;;%s)" % str(user_sid)
-        self.sd_utils.dacl_add_ace(self.base_dn, mod)
+        self.user_dn = "cn=%s,%s" % (self.get_changes_user, self.ou)
+        user_sid = self.sd_utils.get_object_sid(self.user_dn)
+        self.acl_mod_get_changes = "(OA;;CR;%s;;%s)" % (security.GUID_DRS_GET_CHANGES,
+                                                        str(user_sid))
+        self.acl_mod_get_all_changes = "(OA;;CR;%s;;%s)" % (security.GUID_DRS_GET_ALL_CHANGES,
+                                                            str(user_sid))
+        self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn)
 
         # We set DONT_USE_KERBEROS to avoid a race with getting the
         # user replicated to our selected KDC
@@ -70,6 +79,7 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
                                                               self.user_creds)
 
     def tearDown(self):
+        self.sd_utils.modify_sd_on_dn(self.base_dn, self.desc_sddl)
         try:
             self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
         except ldb.LdbError as (enum, string):
@@ -77,35 +87,175 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
                 pass
         super(DrsReplicaSyncUnprivTestCase, self).tearDown()
 
-    def test_do_single_repl(self):
+    def _test_repl_exop(self, exop, repl_obj, expected_error, dest_dsa=None,
+                        partial_attribute_set=None):
         """
-        Make sure that DRSU_EXOP_REPL_OBJ works as a less-privileged
-        user with the correct GET_CHANGES rights
+        Common function to send a replication request and check the result
+        matches what's expected.
         """
-
-        ou1 = "OU=single_obj,%s" % self.ou
-        self.ldb_dc1.add({
-            "dn": ou1,
-            "objectclass": "organizationalUnit"
-            })
-        req8 = self._exop_req8(dest_dsa=None,
+        req8 = self._exop_req8(dest_dsa=dest_dsa,
                                invocation_id=self.ldb_dc1.get_invocation_id(),
-                               nc_dn_str=ou1,
-                               exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
-                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
-        (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
-        self._check_ctr6(ctr, [ou1])
+                               nc_dn_str=repl_obj,
+                               exop=exop,
+                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP,
+                               partial_attribute_set=partial_attribute_set)
+
+        if expected_error is None:
+            # user is OK, request should be accepted without throwing an error
+            (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle,
+                                                        8, req8)
+        else:
+            # check the request is rejected (with the error we're expecting)
+            try:
+                (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle,
+                                                            8, req8)
+                self.fail("Should have failed with user denied access")
+            except WERRORError as (enum, estr):
+                self.assertEquals(enum, expected_error,
+                                  "Got unexpected error: %s" % estr)
 
-    def test_do_full_repl(self):
+    def _test_repl_single_obj(self, repl_obj, expected_error,
+                              partial_attribute_set=None):
         """
-        Make sure that full replication works as a less-privileged
-        user with the correct GET_CHANGES rights
+        Checks that replication on a single object either succeeds or fails as
+        expected (based on the user's access rights)
         """
+        self._test_repl_exop(exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
+                             repl_obj=repl_obj,
+                             expected_error=expected_error,
+                             partial_attribute_set=partial_attribute_set)
+
+    def _test_repl_secret(self, repl_obj, expected_error, dest_dsa=None):
+        """
+        Checks that REPL_SECRET on an object either succeeds or fails as
+        expected (based on the user's access rights)
+        """
+        self._test_repl_exop(exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+                             repl_obj=repl_obj,
+                             expected_error=expected_error,
+                             dest_dsa=dest_dsa)
+
+    def _test_repl_full(self, expected_error, partial_attribute_set=None):
+        """
+        Checks that a full replication either succeeds or fails as expected
+        (based on the user's access rights)
+        """
+        self._test_repl_exop(exop=drsuapi.DRSUAPI_EXOP_NONE,
+                             repl_obj=self.ldb_dc1.get_default_basedn(),
+                             expected_error=expected_error,
+                             partial_attribute_set=partial_attribute_set)
+
+    def _test_repl_full_on_ou(self, expected_error):
+        """
+        Full replication on a specific OU should always fail (it should be done
+        against a base NC). The error may vary based on the user's access rights
+        """
+        # Just try against the OU created in the test setup
+        self._test_repl_exop(exop=drsuapi.DRSUAPI_EXOP_NONE,
+                             repl_obj=self.ou,
+                             expected_error=expected_error)
+
+    def test_repl_getchanges_userpriv(self):
+        """
+        Tests various replication requests made by a user with only GET_CHANGES
+        rights. Some requests will be accepted, but most will be rejected.
+        """
+
+        # Assign the user GET_CHANGES rights
+        self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod_get_changes)
+
+        self._test_repl_single_obj(repl_obj=self.ou,
+                                   expected_error=werror.WERR_DS_DRA_ACCESS_DENIED)
+
+        self._test_repl_secret(repl_obj=self.ou,
+                               expected_error=werror.WERR_DS_DRA_ACCESS_DENIED)
+        self._test_repl_secret(repl_obj=self.user_dn,
+                               expected_error=werror.WERR_DS_DRA_ACCESS_DENIED)
+        self._test_repl_secret(repl_obj=self.user_dn,
+                               dest_dsa=self.ldb_dc1.get_ntds_GUID(),
+                               expected_error=werror.WERR_DS_DRA_ACCESS_DENIED)
+
+        self._test_repl_full(expected_error=werror.WERR_DS_DRA_ACCESS_DENIED)
+        self._test_repl_full_on_ou(expected_error=werror.WERR_DS_CANT_FIND_EXPECTED_NC)
+
+        # Partial Attribute Sets don't require GET_ALL_CHANGES rights, so we
+        # expect the following to succeed
+        self._test_repl_single_obj(repl_obj=self.ou,
+                                   expected_error=None,
+                                   partial_attribute_set=self.get_partial_attribute_set())
+        self._test_repl_full(expected_error=None,
+                             partial_attribute_set=self.get_partial_attribute_set())
+
+    def test_repl_getallchanges_userpriv(self):
+        """
+        Tests various replication requests made by a user with only
+        GET_ALL_CHANGES rights. Note that assigning these rights is possible,
+        but doesn't make a lot of sense. We test it anyway for consistency.
+        """
+
+        # Assign the user GET_ALL_CHANGES rights
+        self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod_get_all_changes)
+
+        # We can expect to get the same responses as an unprivileged user,
+        # i.e. we have permission to see the results, but don't have permission
+        # to ask
+        self.test_repl_no_userpriv()
+
+    def test_repl_both_userpriv(self):
+        """
+        Tests various replication requests made by a privileged user (i.e. has
+        both GET_CHANGES and GET_ALL_CHANGES). We expect any valid requests
+        to be accepted.
+        """
+
+        # Assign the user both GET_CHANGES and GET_ALL_CHANGES rights
+        both_rights = self.acl_mod_get_changes + self.acl_mod_get_all_changes
+        self.sd_utils.dacl_add_ace(self.base_dn, both_rights)
+
+        self._test_repl_single_obj(repl_obj=self.ou,
+                                   expected_error=None)
+
+        self._test_repl_secret(repl_obj=self.ou,
+                               expected_error=werror.WERR_DS_DRA_DB_ERROR)
+        self._test_repl_secret(repl_obj=self.user_dn,
+                               expected_error=werror.WERR_DS_DRA_DB_ERROR)
+        self._test_repl_secret(repl_obj=self.user_dn,
+                               dest_dsa=self.ldb_dc1.get_ntds_GUID(),
+                               expected_error=None)
+
+        self._test_repl_full(expected_error=None)
+        self._test_repl_full_on_ou(expected_error=werror.WERR_DS_CANT_FIND_EXPECTED_NC)
+
+        self._test_repl_single_obj(repl_obj=self.ou,
+                                   expected_error=None,
+                                   partial_attribute_set=self.get_partial_attribute_set())
+        self._test_repl_full(expected_error=None,
+                             partial_attribute_set=self.get_partial_attribute_set())
+
+    def test_repl_no_userpriv(self):
+        """
+        Tests various replication requests made by a unprivileged user.
+        We expect all these requests to be rejected.
+        """
+
+        self._test_repl_single_obj(repl_obj=self.ou,
+                                   expected_error=werror.WERR_DS_DRA_BAD_DN)
+
+        self._test_repl_secret(repl_obj=self.ou,
+                               expected_error=werror.WERR_DS_DRA_BAD_DN)
+        self._test_repl_secret(repl_obj=self.user_dn,
+                               expected_error=werror.WERR_DS_DRA_BAD_DN)
+        self._test_repl_secret(repl_obj=self.user_dn,
+                               dest_dsa=self.ldb_dc1.get_ntds_GUID(),
+                               expected_error=werror.WERR_DS_DRA_BAD_DN)
+
+        self._test_repl_full(expected_error=werror.WERR_DS_DRA_ACCESS_DENIED)
+        self._test_repl_full_on_ou(expected_error=werror.WERR_DS_DRA_BAD_DN)
+
+        self._test_repl_single_obj(repl_obj=self.ou,
+                                   expected_error=werror.WERR_DS_DRA_BAD_DN,
+                                   partial_attribute_set=self.get_partial_attribute_set())
+        self._test_repl_full(expected_error=werror.WERR_DS_DRA_ACCESS_DENIED,
+                             partial_attribute_set=self.get_partial_attribute_set())
+
 
-        req8 = self._exop_req8(dest_dsa=None,
-                               invocation_id=self.ldb_dc1.get_invocation_id(),
-                               nc_dn_str=self.ldb_dc1.get_default_basedn(),
-                               exop=drsuapi.DRSUAPI_EXOP_NONE,
-                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
-        (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
-        self.assertEqual(ctr.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_NONE)