# Check that the user has been added to msDSRevealedUsers
(packed_attrs_1, unpacked_attrs_1) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
+ def test_msDSRevealedUsers_using_other_RODC(self):
+ """
+ Ensure that the machine account is tied to the destination DSA.
+ """
+ # Create a new identical RODC with just the first letter missing
+ other_rodc_name = self.rodc_name[1:]
+ other_rodc_ctx = dc_join(server=self.ldb_dc1.host_dns_name(), creds=self.get_credentials(), lp=self.get_loadparm(),
+ site=self.site, netbios_name=other_rodc_name,
+ targetdir=None, domain=None, machinepass=self.rodc_pass)
+ self._create_rodc(other_rodc_ctx)
+
+ other_rodc_creds = Credentials()
+ other_rodc_creds.guess(other_rodc_ctx.lp)
+ other_rodc_creds.set_username(other_rodc_name+'$')
+ other_rodc_creds.set_password(self.rodc_pass)
+
+ (other_rodc_drs, other_rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, other_rodc_creds)
+
+ rand = random.randint(1, 10000000)
+ expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
+ drsuapi.DRSUAPI_ATTID_supplementalCredentials,
+ drsuapi.DRSUAPI_ATTID_ntPwdHistory,
+ drsuapi.DRSUAPI_ATTID_unicodePwd,
+ drsuapi.DRSUAPI_ATTID_dBCSPwd]
+
+ user_name = "test_rodcF_%s" % rand
+ user_dn = "CN=%s,%s" % (user_name, self.ou)
+ self.ldb_dc1.add({
+ "dn": user_dn,
+ "objectclass": "user",
+ "sAMAccountName": user_name
+ })
+
+ # Store some secret on this user
+ self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, 'penguin12#', False, user_name)
+ self.ldb_dc1.add_remove_group_members("Allowed RODC Password Replication Group",
+ [user_name],
+ add_members_operation=True)
+
+ req10 = self._getnc_req10(dest_dsa=str(other_rodc_ctx.ntds_guid),
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=user_dn,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+ partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
+ max_objects=133,
+ replica_flags=0)
+
+ try:
+ (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
+ self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
+ except WERRORError as (enum, estr):
+ self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
+
+ req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=user_dn,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+ partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
+ max_objects=133,
+ replica_flags=0)
+
+ try:
+ (level, ctr) = other_rodc_drs.DsGetNCChanges(other_rodc_drs_handle, 10, req10)
+ self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
+ except WERRORError as (enum, estr):
+ self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
+
def _assert_in_revealed_users(self, user_dn, attrlist):
res = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=self.computer_dn,
attrs=["msDS-RevealedUsers"])