tests/repl_rodc: Ensure that the machine account is tied to the destination DSA
[nivanova/samba-autobuild/.git] / source4 / torture / drs / python / repl_rodc.py
index f5d08d472a6de20638ea1ebe34d55dbce4af7983..535bd9380b342ebfaf06797cd06f7296dd181f06 100644 (file)
@@ -415,6 +415,73 @@ class DrsRodcTestCase(drs_base.DrsBaseTestCase):
         # Check that the user has been added to msDSRevealedUsers
         (packed_attrs_1, unpacked_attrs_1) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
 
+    def test_msDSRevealedUsers_using_other_RODC(self):
+        """
+        Ensure that the machine account is tied to the destination DSA.
+        """
+        # Create a new identical RODC with just the first letter missing
+        other_rodc_name = self.rodc_name[1:]
+        other_rodc_ctx = dc_join(server=self.ldb_dc1.host_dns_name(), creds=self.get_credentials(), lp=self.get_loadparm(),
+                                 site=self.site, netbios_name=other_rodc_name,
+                                 targetdir=None, domain=None, machinepass=self.rodc_pass)
+        self._create_rodc(other_rodc_ctx)
+
+        other_rodc_creds = Credentials()
+        other_rodc_creds.guess(other_rodc_ctx.lp)
+        other_rodc_creds.set_username(other_rodc_name+'$')
+        other_rodc_creds.set_password(self.rodc_pass)
+
+        (other_rodc_drs, other_rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, other_rodc_creds)
+
+        rand = random.randint(1, 10000000)
+        expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
+                                    drsuapi.DRSUAPI_ATTID_supplementalCredentials,
+                                    drsuapi.DRSUAPI_ATTID_ntPwdHistory,
+                                    drsuapi.DRSUAPI_ATTID_unicodePwd,
+                                    drsuapi.DRSUAPI_ATTID_dBCSPwd]
+
+        user_name = "test_rodcF_%s" % rand
+        user_dn = "CN=%s,%s" % (user_name, self.ou)
+        self.ldb_dc1.add({
+            "dn": user_dn,
+            "objectclass": "user",
+            "sAMAccountName": user_name
+        })
+
+        # Store some secret on this user
+        self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, 'penguin12#', False, user_name)
+        self.ldb_dc1.add_remove_group_members("Allowed RODC Password Replication Group",
+                                              [user_name],
+                                              add_members_operation=True)
+
+        req10 = self._getnc_req10(dest_dsa=str(other_rodc_ctx.ntds_guid),
+                                  invocation_id=self.ldb_dc1.get_invocation_id(),
+                                  nc_dn_str=user_dn,
+                                  exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+                                  partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
+                                  max_objects=133,
+                                  replica_flags=0)
+
+        try:
+            (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
+            self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
+        except WERRORError as (enum, estr):
+            self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
+
+        req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
+                                  invocation_id=self.ldb_dc1.get_invocation_id(),
+                                  nc_dn_str=user_dn,
+                                  exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+                                  partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
+                                  max_objects=133,
+                                  replica_flags=0)
+
+        try:
+            (level, ctr) = other_rodc_drs.DsGetNCChanges(other_rodc_drs_handle, 10, req10)
+            self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
+        except WERRORError as (enum, estr):
+            self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
+
     def _assert_in_revealed_users(self, user_dn, attrlist):
         res = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=self.computer_dn,
                                   attrs=["msDS-RevealedUsers"])