selftest: Add sanity-check RODC can't use cache to reveal secrets
[bbaumbach/samba-autobuild/.git] / source4 / torture / drs / python / repl_rodc.py
index f5d08d472a6de20638ea1ebe34d55dbce4af7983..57679ee00badc933f666ea31afaf8c3b49b33135 100644 (file)
@@ -118,6 +118,7 @@ class DrsRodcTestCase(drs_base.DrsBaseTestCase):
         rodc_creds.guess(self.rodc_ctx.lp)
         rodc_creds.set_username(self.rodc_name+'$')
         rodc_creds.set_password(self.rodc_pass)
+        self.rodc_creds = rodc_creds
 
         (self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1)
         (self.rodc_drs, self.rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, rodc_creds)
@@ -201,12 +202,52 @@ class DrsRodcTestCase(drs_base.DrsBaseTestCase):
         except WERRORError as (enum, estr):
             self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
 
+        # send the same request again and we should get the same response
+        try:
+            (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
+            self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
+        except WERRORError as (enum, estr):
+            self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
+
         # Retry with Administrator credentials, ignores password replication groups
         (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
 
         # Check that the user has been added to msDSRevealedUsers
         self._assert_in_revealed_users(user_dn, expected_user_attributes)
 
+    def test_rodc_repl_secrets_follow_on_req(self):
+        """
+        Checks that an RODC can't subvert an existing (valid) GetNCChanges
+        request to reveal secrets it shouldn't have access to.
+        """
+
+        # send an acceptable request that will match as many GUIDs as possible.
+        # Here we set the SPECIAL_SECRET_PROCESSING flag so that the request gets accepted.
+        # (On the server, this builds up the getnc_state->guids array)
+        req8 = self._exop_req8(dest_dsa=str(self.rodc_ctx.ntds_guid),
+                               invocation_id=self.ldb_dc1.get_invocation_id(),
+                               nc_dn_str=self.ldb_dc1.domain_dn(),
+                               exop=drsuapi.DRSUAPI_EXOP_NONE,
+                               max_objects=1,
+                               replica_flags=drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
+        (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 8, req8)
+
+        # Get the next replication chunk, but set REPL_SECRET this time. This
+        # is following on the the previous accepted request, but we've changed
+        # exop to now request secrets. This request should fail
+        try:
+            req8 = self._exop_req8(dest_dsa=str(self.rodc_ctx.ntds_guid),
+                                   invocation_id=self.ldb_dc1.get_invocation_id(),
+                                   nc_dn_str=self.ldb_dc1.domain_dn(),
+                                   exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET)
+            req8.highwatermark = ctr.new_highwatermark
+
+            (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 8, req8)
+
+            self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
+        except RuntimeError as (enum, estr):
+            pass
+
     def test_msDSRevealedUsers_admin(self):
         """
         When a secret attribute is to be replicated to an RODC, the contents
@@ -415,6 +456,158 @@ class DrsRodcTestCase(drs_base.DrsBaseTestCase):
         # Check that the user has been added to msDSRevealedUsers
         (packed_attrs_1, unpacked_attrs_1) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
 
+    def test_msDSRevealedUsers_using_other_RODC(self):
+        """
+        Ensure that the machine account is tied to the destination DSA.
+        """
+        # Create a new identical RODC with just the first letter missing
+        other_rodc_name = self.rodc_name[1:]
+        other_rodc_ctx = dc_join(server=self.ldb_dc1.host_dns_name(), creds=self.get_credentials(), lp=self.get_loadparm(),
+                                 site=self.site, netbios_name=other_rodc_name,
+                                 targetdir=None, domain=None, machinepass=self.rodc_pass)
+        self._create_rodc(other_rodc_ctx)
+
+        other_rodc_creds = Credentials()
+        other_rodc_creds.guess(other_rodc_ctx.lp)
+        other_rodc_creds.set_username(other_rodc_name+'$')
+        other_rodc_creds.set_password(self.rodc_pass)
+
+        (other_rodc_drs, other_rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, other_rodc_creds)
+
+        rand = random.randint(1, 10000000)
+        expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
+                                    drsuapi.DRSUAPI_ATTID_supplementalCredentials,
+                                    drsuapi.DRSUAPI_ATTID_ntPwdHistory,
+                                    drsuapi.DRSUAPI_ATTID_unicodePwd,
+                                    drsuapi.DRSUAPI_ATTID_dBCSPwd]
+
+        user_name = "test_rodcF_%s" % rand
+        user_dn = "CN=%s,%s" % (user_name, self.ou)
+        self.ldb_dc1.add({
+            "dn": user_dn,
+            "objectclass": "user",
+            "sAMAccountName": user_name
+        })
+
+        # Store some secret on this user
+        self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, 'penguin12#', False, user_name)
+        self.ldb_dc1.add_remove_group_members("Allowed RODC Password Replication Group",
+                                              [user_name],
+                                              add_members_operation=True)
+
+        req10 = self._getnc_req10(dest_dsa=str(other_rodc_ctx.ntds_guid),
+                                  invocation_id=self.ldb_dc1.get_invocation_id(),
+                                  nc_dn_str=user_dn,
+                                  exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+                                  partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
+                                  max_objects=133,
+                                  replica_flags=0)
+
+        try:
+            (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
+            self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
+        except WERRORError as (enum, estr):
+            self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
+
+        req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
+                                  invocation_id=self.ldb_dc1.get_invocation_id(),
+                                  nc_dn_str=user_dn,
+                                  exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+                                  partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
+                                  max_objects=133,
+                                  replica_flags=0)
+
+        try:
+            (level, ctr) = other_rodc_drs.DsGetNCChanges(other_rodc_drs_handle, 10, req10)
+            self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
+        except WERRORError as (enum, estr):
+            self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
+
+    def test_msDSRevealedUsers_local_deny_allow(self):
+        """
+        Ensure that the deny trumps allow, and we can modify these
+        attributes directly instead of the global groups.
+
+        This may fail on Windows due to tokenGroup calculation caching.
+        """
+        rand = random.randint(1, 10000000)
+        expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
+                                    drsuapi.DRSUAPI_ATTID_supplementalCredentials,
+                                    drsuapi.DRSUAPI_ATTID_ntPwdHistory,
+                                    drsuapi.DRSUAPI_ATTID_unicodePwd,
+                                    drsuapi.DRSUAPI_ATTID_dBCSPwd]
+
+        # Add a user on DC1, add it to allowed password replication
+        # group, and replicate to RODC with EXOP_REPL_SECRETS
+        user_name = "test_rodcF_%s" % rand
+        password = "password12#"
+        user_dn = "CN=%s,%s" % (user_name, self.ou)
+        self.ldb_dc1.add({
+            "dn": user_dn,
+            "objectclass": "user",
+            "sAMAccountName": user_name
+        })
+
+        # Store some secret on this user
+        self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password, False, user_name)
+
+        req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
+                                  invocation_id=self.ldb_dc1.get_invocation_id(),
+                                  nc_dn_str=user_dn,
+                                  exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+                                  partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
+                                  max_objects=133,
+                                  replica_flags=0)
+
+        m = ldb.Message()
+        m.dn = ldb.Dn(self.ldb_dc1, self.computer_dn)
+
+        m["msDS-RevealOnDemandGroup"] = \
+            ldb.MessageElement(user_dn, ldb.FLAG_MOD_ADD,
+                               "msDS-RevealOnDemandGroup")
+        self.ldb_dc1.modify(m)
+
+        # In local allow, should be success
+        try:
+            (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
+        except:
+            self.fail("Should have succeeded when in local allow group")
+
+        self._assert_in_revealed_users(user_dn, expected_user_attributes)
+
+        (self.rodc_drs, self.rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, self.rodc_creds)
+
+        m = ldb.Message()
+        m.dn = ldb.Dn(self.ldb_dc1, self.computer_dn)
+
+        m["msDS-NeverRevealGroup"] = \
+            ldb.MessageElement(user_dn, ldb.FLAG_MOD_ADD,
+                               "msDS-NeverRevealGroup")
+        self.ldb_dc1.modify(m)
+
+        # In local allow and deny, should be failure
+        try:
+            (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
+            self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
+        except WERRORError as (enum, estr):
+            self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
+
+        m = ldb.Message()
+        m.dn = ldb.Dn(self.ldb_dc1, self.computer_dn)
+
+        m["msDS-RevealOnDemandGroup"] = \
+            ldb.MessageElement(user_dn, ldb.FLAG_MOD_DELETE,
+                               "msDS-RevealOnDemandGroup")
+        self.ldb_dc1.modify(m)
+
+        # In local deny, should be failure
+        (self.rodc_drs, self.rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, self.rodc_creds)
+        try:
+            (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
+            self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
+        except WERRORError as (enum, estr):
+            self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
+
     def _assert_in_revealed_users(self, user_dn, attrlist):
         res = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=self.computer_dn,
                                   attrs=["msDS-RevealedUsers"])