rodc_creds.guess(self.rodc_ctx.lp)
rodc_creds.set_username(self.rodc_name+'$')
rodc_creds.set_password(self.rodc_pass)
+ self.rodc_creds = rodc_creds
(self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1)
(self.rodc_drs, self.rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, rodc_creds)
except WERRORError as (enum, estr):
self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
+ # send the same request again and we should get the same response
+ try:
+ (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
+ self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
+ except WERRORError as (enum, estr):
+ self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
+
# Retry with Administrator credentials, ignores password replication groups
(level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
# Check that the user has been added to msDSRevealedUsers
self._assert_in_revealed_users(user_dn, expected_user_attributes)
+ def test_rodc_repl_secrets_follow_on_req(self):
+ """
+ Checks that an RODC can't subvert an existing (valid) GetNCChanges
+ request to reveal secrets it shouldn't have access to.
+ """
+
+ # send an acceptable request that will match as many GUIDs as possible.
+ # Here we set the SPECIAL_SECRET_PROCESSING flag so that the request gets accepted.
+ # (On the server, this builds up the getnc_state->guids array)
+ req8 = self._exop_req8(dest_dsa=str(self.rodc_ctx.ntds_guid),
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=self.ldb_dc1.domain_dn(),
+ exop=drsuapi.DRSUAPI_EXOP_NONE,
+ max_objects=1,
+ replica_flags=drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
+ (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 8, req8)
+
+ # Get the next replication chunk, but set REPL_SECRET this time. This
+ # is following on the the previous accepted request, but we've changed
+ # exop to now request secrets. This request should fail
+ try:
+ req8 = self._exop_req8(dest_dsa=str(self.rodc_ctx.ntds_guid),
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=self.ldb_dc1.domain_dn(),
+ exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET)
+ req8.highwatermark = ctr.new_highwatermark
+
+ (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 8, req8)
+
+ self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
+ except RuntimeError as (enum, estr):
+ pass
+
def test_msDSRevealedUsers_admin(self):
"""
When a secret attribute is to be replicated to an RODC, the contents
# Check that the user has been added to msDSRevealedUsers
(packed_attrs_1, unpacked_attrs_1) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
+ def test_msDSRevealedUsers_using_other_RODC(self):
+ """
+ Ensure that the machine account is tied to the destination DSA.
+ """
+ # Create a new identical RODC with just the first letter missing
+ other_rodc_name = self.rodc_name[1:]
+ other_rodc_ctx = dc_join(server=self.ldb_dc1.host_dns_name(), creds=self.get_credentials(), lp=self.get_loadparm(),
+ site=self.site, netbios_name=other_rodc_name,
+ targetdir=None, domain=None, machinepass=self.rodc_pass)
+ self._create_rodc(other_rodc_ctx)
+
+ other_rodc_creds = Credentials()
+ other_rodc_creds.guess(other_rodc_ctx.lp)
+ other_rodc_creds.set_username(other_rodc_name+'$')
+ other_rodc_creds.set_password(self.rodc_pass)
+
+ (other_rodc_drs, other_rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, other_rodc_creds)
+
+ rand = random.randint(1, 10000000)
+ expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
+ drsuapi.DRSUAPI_ATTID_supplementalCredentials,
+ drsuapi.DRSUAPI_ATTID_ntPwdHistory,
+ drsuapi.DRSUAPI_ATTID_unicodePwd,
+ drsuapi.DRSUAPI_ATTID_dBCSPwd]
+
+ user_name = "test_rodcF_%s" % rand
+ user_dn = "CN=%s,%s" % (user_name, self.ou)
+ self.ldb_dc1.add({
+ "dn": user_dn,
+ "objectclass": "user",
+ "sAMAccountName": user_name
+ })
+
+ # Store some secret on this user
+ self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, 'penguin12#', False, user_name)
+ self.ldb_dc1.add_remove_group_members("Allowed RODC Password Replication Group",
+ [user_name],
+ add_members_operation=True)
+
+ req10 = self._getnc_req10(dest_dsa=str(other_rodc_ctx.ntds_guid),
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=user_dn,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+ partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
+ max_objects=133,
+ replica_flags=0)
+
+ try:
+ (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
+ self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
+ except WERRORError as (enum, estr):
+ self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
+
+ req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=user_dn,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+ partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
+ max_objects=133,
+ replica_flags=0)
+
+ try:
+ (level, ctr) = other_rodc_drs.DsGetNCChanges(other_rodc_drs_handle, 10, req10)
+ self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
+ except WERRORError as (enum, estr):
+ self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
+
+ def test_msDSRevealedUsers_local_deny_allow(self):
+ """
+ Ensure that the deny trumps allow, and we can modify these
+ attributes directly instead of the global groups.
+
+ This may fail on Windows due to tokenGroup calculation caching.
+ """
+ rand = random.randint(1, 10000000)
+ expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
+ drsuapi.DRSUAPI_ATTID_supplementalCredentials,
+ drsuapi.DRSUAPI_ATTID_ntPwdHistory,
+ drsuapi.DRSUAPI_ATTID_unicodePwd,
+ drsuapi.DRSUAPI_ATTID_dBCSPwd]
+
+ # Add a user on DC1, add it to allowed password replication
+ # group, and replicate to RODC with EXOP_REPL_SECRETS
+ user_name = "test_rodcF_%s" % rand
+ password = "password12#"
+ user_dn = "CN=%s,%s" % (user_name, self.ou)
+ self.ldb_dc1.add({
+ "dn": user_dn,
+ "objectclass": "user",
+ "sAMAccountName": user_name
+ })
+
+ # Store some secret on this user
+ self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password, False, user_name)
+
+ req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=user_dn,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+ partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
+ max_objects=133,
+ replica_flags=0)
+
+ m = ldb.Message()
+ m.dn = ldb.Dn(self.ldb_dc1, self.computer_dn)
+
+ m["msDS-RevealOnDemandGroup"] = \
+ ldb.MessageElement(user_dn, ldb.FLAG_MOD_ADD,
+ "msDS-RevealOnDemandGroup")
+ self.ldb_dc1.modify(m)
+
+ # In local allow, should be success
+ try:
+ (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
+ except:
+ self.fail("Should have succeeded when in local allow group")
+
+ self._assert_in_revealed_users(user_dn, expected_user_attributes)
+
+ (self.rodc_drs, self.rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, self.rodc_creds)
+
+ m = ldb.Message()
+ m.dn = ldb.Dn(self.ldb_dc1, self.computer_dn)
+
+ m["msDS-NeverRevealGroup"] = \
+ ldb.MessageElement(user_dn, ldb.FLAG_MOD_ADD,
+ "msDS-NeverRevealGroup")
+ self.ldb_dc1.modify(m)
+
+ # In local allow and deny, should be failure
+ try:
+ (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
+ self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
+ except WERRORError as (enum, estr):
+ self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
+
+ m = ldb.Message()
+ m.dn = ldb.Dn(self.ldb_dc1, self.computer_dn)
+
+ m["msDS-RevealOnDemandGroup"] = \
+ ldb.MessageElement(user_dn, ldb.FLAG_MOD_DELETE,
+ "msDS-RevealOnDemandGroup")
+ self.ldb_dc1.modify(m)
+
+ # In local deny, should be failure
+ (self.rodc_drs, self.rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, self.rodc_creds)
+ try:
+ (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
+ self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
+ except WERRORError as (enum, estr):
+ self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
+
def _assert_in_revealed_users(self, user_dn, attrlist):
res = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=self.computer_dn,
attrs=["msDS-RevealedUsers"])