2 # -*- coding: utf-8 -*-
4 # Test replication scenarios involving an RODC
6 # Copyright (C) Catalyst.Net Ltd. 2017
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 # export DC1=dc1_dns_name
25 # export DC2=dc1_dns_name [this is unused for the test, but it'll still try to connect]
26 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
27 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN repl_rodc -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
33 from ldb import SCOPE_BASE
35 from samba import WERRORError
36 from samba.join import dc_join
37 from samba.dcerpc import drsuapi, misc, drsblobs, security
38 from samba.drs_utils import drs_DsBind, drs_Replicate
39 from samba.ndr import ndr_unpack, ndr_pack
40 from samba.common import dsdb_Dn
41 from samba.credentials import Credentials
def drs_get_rodc_partial_attribute_set(samdb, samdb1, exceptions=None):
    """Get the list of attributes to request for RODC replication.

    :param samdb: database whose schema partition is searched for
        attributeSchema objects
    :param samdb1: database used to translate each lDAPDisplayName into
        an attid
    :param exceptions: optional container of attids to leave out of the
        result (defaults to leaving nothing out)
    :return: a version 1 drsuapi.DsPartialAttributeSet whose attids are
        sorted in ascending order
    """
    # None instead of a shared mutable default list.
    if exceptions is None:
        exceptions = ()

    partial_attribute_set = drsuapi.DsPartialAttributeSet()
    partial_attribute_set.version = 1

    # the exact list of attids we send is quite critical. Note that
    # we do ask for the secret attributes, but set SPECIAL_SECRET_PROCESSING
    schema_dn = samdb.get_schema_basedn()
    res = samdb.search(base=schema_dn, scope=ldb.SCOPE_SUBTREE,
                       expression="objectClass=attributeSchema",
                       attrs=["lDAPDisplayName", "systemFlags",
                              "searchFlags"])

    # Attributes carrying either of these system flags are not requested.
    skipped_system_flags = (samba.dsdb.DS_FLAG_ATTR_NOT_REPLICATED |
                            samba.dsdb.DS_FLAG_ATTR_IS_CONSTRUCTED)

    attids = []
    for r in res:
        ldap_display_name = r["lDAPDisplayName"][0]
        if "systemFlags" in r:
            if int(r["systemFlags"][0]) & skipped_system_flags:
                continue
        if "searchFlags" in r:
            if int(r["searchFlags"][0]) & samba.dsdb.SEARCH_FLAG_RODC_ATTRIBUTE:
                continue
        try:
            attid = samdb1.get_attid_from_lDAPDisplayName(ldap_display_name)
        except Exception:
            # NOTE(review): the error handling around this lookup was not
            # visible in the reviewed excerpt; attributes that samdb1
            # cannot translate are assumed to be skipped - confirm.
            continue
        if attid not in exceptions:
            attids.append(int(attid))

    # the attids do need to be sorted, or windows doesn't return
    # all the attributes we need
    attids.sort()
    partial_attribute_set.attids = attids
    partial_attribute_set.num_attids = len(attids)
    return partial_attribute_set
class DrsRodcTestCase(drs_base.DrsBaseTestCase):
    """Intended as a semi-black box test case for replication involving
    an RODC."""

    # attids every test expects to find recorded in msDS-RevealedUsers
    # once a user's secrets have been replicated with EXOP_REPL_SECRET.
    _SECRET_ATTIDS = (drsuapi.DRSUAPI_ATTID_lmPwdHistory,
                      drsuapi.DRSUAPI_ATTID_supplementalCredentials,
                      drsuapi.DRSUAPI_ATTID_ntPwdHistory,
                      drsuapi.DRSUAPI_ATTID_unicodePwd,
                      drsuapi.DRSUAPI_ATTID_dBCSPwd)

    def setUp(self):
        """Create a scratch OU and a freshly joined RODC account on DC1,
        then bind to DC1's DRS interface twice: once with the test
        credentials and once with the RODC machine credentials."""
        super(DrsRodcTestCase, self).setUp()
        self.base_dn = self.ldb_dc1.get_default_basedn()

        rand = random.randint(1, 10000000)

        self.ou = "OU=test_drs_rodc%s,%s" % (rand, self.base_dn)
        self.ldb_dc1.add({
            "dn": self.ou,
            "objectclass": "organizationalUnit"
        })
        self.allowed_group = "CN=Allowed RODC Password Replication Group,CN=Users,%s" % self.base_dn

        self.site = self.ldb_dc1.server_site_name()
        self.rodc_name = "TESTRODCDRS%s" % rand
        self.rodc_pass = "password12#"
        self.computer_dn = "CN=%s,OU=Domain Controllers,%s" % (self.rodc_name, self.base_dn)

        self.rodc_ctx = dc_join(server=self.ldb_dc1.host_dns_name(), creds=self.get_credentials(), lp=self.get_loadparm(),
                                site=self.site, netbios_name=self.rodc_name,
                                targetdir=None, domain=None, machinepass=self.rodc_pass)
        self._create_rodc(self.rodc_ctx)
        self.rodc_ctx.create_tmp_samdb()
        self.tmp_samdb = self.rodc_ctx.tmp_samdb

        rodc_creds = Credentials()
        rodc_creds.guess(self.rodc_ctx.lp)
        rodc_creds.set_username(self.rodc_name+'$')
        rodc_creds.set_password(self.rodc_pass)

        (self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1)
        (self.rodc_drs, self.rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, rodc_creds)

    def tearDown(self):
        """Remove the RODC join created in setUp before the base teardown."""
        self.rodc_ctx.cleanup_old_join()
        super(DrsRodcTestCase, self).tearDown()

    def _add_user_with_secret(self, name_prefix, password):
        """Create a randomly named user in the test OU on DC1 and give it
        a password so that it has secret attributes to replicate.

        :param name_prefix: prefix of the sAMAccountName; a random number
            is appended after an underscore
        :param password: password to set on the new user
        :return: (user_name, user_dn)
        """
        user_name = "%s_%s" % (name_prefix, random.randint(1, 10000000))
        user_dn = "CN=%s,%s" % (user_name, self.ou)
        self.ldb_dc1.add({
            "dn": user_dn,
            "objectclass": "user",
            "sAMAccountName": user_name
        })

        # Store some secret on this user
        self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password, False, user_name)
        return (user_name, user_dn)

    def _allow_password_replication(self, user_name):
        """Add the user to the Allowed RODC Password Replication Group."""
        self.ldb_dc1.add_remove_group_members("Allowed RODC Password Replication Group",
                                              [user_name],
                                              add_members_operation=True)

    def _repl_secret_req10(self, user_dn, partial_attribute_set=None):
        """Build a level 10 GetNCChanges request asking DC1 to replicate
        the secrets of user_dn to the test RODC.

        :param user_dn: DN of the object whose secrets are requested
        :param partial_attribute_set: attribute set to send; when None
            the full RODC set from drs_get_rodc_partial_attribute_set()
            is used
        """
        if partial_attribute_set is None:
            partial_attribute_set = drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb)
        # NOTE(review): nc_dn_str, max_objects and replica_flags were on
        # lines elided from the reviewed excerpt; the values below are
        # assumed - confirm against the repository copy.
        return self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
                                 invocation_id=self.ldb_dc1.get_invocation_id(),
                                 nc_dn_str=user_dn,
                                 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
                                 partial_attribute_set=partial_attribute_set,
                                 max_objects=133,
                                 replica_flags=0)

    def _check_revealed_users_refresh(self, name_prefix, second_drs, second_drs_handle):
        """Shared body of the msDSRevealedUsers update tests.

        Replicates a new user's secrets with the default bind, changes
        the password, replicates again over second_drs and checks that
        the msDS-RevealedUsers metadata only moves on after the second
        replication, and that it disappears once the user is deleted.

        :param name_prefix: sAMAccountName prefix for the test user
        :param second_drs: DRS connection used for the second replication
        :param second_drs_handle: bind handle belonging to second_drs
        """
        expected_user_attributes = self._SECRET_ATTIDS

        # Add a user on DC1, add it to allowed password replication
        # group, and replicate to RODC with EXOP_REPL_SECRETS
        password = "password12#"
        (user_name, user_dn) = self._add_user_with_secret(name_prefix, password)
        self._allow_password_replication(user_name)

        req10 = self._repl_secret_req10(user_dn)
        (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)

        # Check that the user has been added to msDSRevealedUsers
        unpacked_attrs_1 = self._assert_in_revealed_users(user_dn, expected_user_attributes)[1]

        # Change the user's password on DC1
        self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password+"1", False, user_name)

        # Nothing has been replicated since, so the entry must be unchanged
        unpacked_attrs_2 = self._assert_in_revealed_users(user_dn, expected_user_attributes)[1]
        self._assert_attrlist_equals(unpacked_attrs_1, unpacked_attrs_2)

        # Replicate to RODC again with EXOP_REPL_SECRETS
        req10 = self._repl_secret_req10(user_dn)
        (level, ctr) = second_drs.DsGetNCChanges(second_drs_handle, 10, req10)

        # This is important for Windows, because the entry won't have been
        # updated in time if we don't have it. Even with this sleep, it only
        # passes some of the time...
        # NOTE(review): the sleep statement itself was elided from the
        # reviewed excerpt; the 5 second duration is assumed - confirm.
        time.sleep(5)

        # Check that the entry in msDSRevealedUsers has been updated
        unpacked_attrs_3 = self._assert_in_revealed_users(user_dn, expected_user_attributes)[1]
        self._assert_attrlist_changed(unpacked_attrs_2, unpacked_attrs_3, expected_user_attributes)

        # We should be able to delete the user
        self.ldb_dc1.deleteuser(user_name)

        res = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=self.computer_dn,
                                  attrs=["msDS-RevealedUsers"])
        self.assertFalse("msDS-RevealedUsers" in res[0])

    def test_admin_repl_secrets(self):
        """
        When a secret attribute is set to be replicated to an RODC with the
        admin credentials, it should always replicate regardless of whether
        or not it's in the Allowed RODC Password Replication Group.
        """
        (user_name, user_dn) = self._add_user_with_secret("test_rodcA", 'penguin12#')

        req10 = self._repl_secret_req10(user_dn)
        (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)

        # Check that the user has been added to msDSRevealedUsers
        self._assert_in_revealed_users(user_dn, self._SECRET_ATTIDS)

    def test_rodc_repl_secrets(self):
        """
        When a secret attribute is set to be replicated to an RODC with
        the RODC account credentials, it should not replicate if the user
        is not in the Allowed RODC Password Replication Group. The same
        request made with the admin credentials should still replicate.
        """
        (user_name, user_dn) = self._add_user_with_secret("test_rodcB", 'penguin12#')

        req10 = self._repl_secret_req10(user_dn)

        try:
            (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
        except WERRORError as e:
            # Unpack from args: tuple parameters in an except clause are
            # Python 2 only syntax.
            (enum, estr) = e.args
            self.assertEqual(enum, 8630)  # ERROR_DS_DRA_SECRETS_DENIED
        else:
            self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")

        # Retry with Administrator credentials, ignores password replication groups
        (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)

        # Check that the user has been added to msDSRevealedUsers
        self._assert_in_revealed_users(user_dn, self._SECRET_ATTIDS)

    def test_msDSRevealedUsers_admin(self):
        """
        When a secret attribute is to be replicated to an RODC, the contents
        of the attribute should be added to the msDSRevealedUsers attribute
        of the computer object corresponding to the RODC.

        Both replications are requested over the default (admin) bind.
        """
        self._check_revealed_users_refresh("test_rodcC", self.drs, self.drs_handle)

    def test_msDSRevealedUsers(self):
        """
        When a secret attribute is to be replicated to an RODC, the contents
        of the attribute should be added to the msDSRevealedUsers attribute
        of the computer object corresponding to the RODC.

        The second replication is requested over the RODC account's bind.
        """
        self._check_revealed_users_refresh("test_rodcD", self.rodc_drs, self.rodc_drs_handle)

    def test_msDSRevealedUsers_pas(self):
        """
        If we provide a Partial Attribute Set when replicating to an RODC,
        we should ignore it and replicate all of the secret attributes
        anyway, and record all of them in the msDSRevealedUsers attribute.
        """
        pas_exceptions = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
                          drsuapi.DRSUAPI_ATTID_supplementalCredentials,
                          drsuapi.DRSUAPI_ATTID_ntPwdHistory,
                          drsuapi.DRSUAPI_ATTID_dBCSPwd]

        # Add a user on DC1, add it to allowed password replication
        # group, and replicate to RODC with EXOP_REPL_SECRETS
        (user_name, user_dn) = self._add_user_with_secret("test_rodcE", "password12#")
        self._allow_password_replication(user_name)

        pas = drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb, exceptions=pas_exceptions)
        req10 = self._repl_secret_req10(user_dn, partial_attribute_set=pas)
        (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)

        # Make sure that we still replicate the secrets
        replicated = set(attribute.attid
                         for attribute in ctr.first_object.object.attribute_ctr.attributes)
        for attid in pas_exceptions:
            if attid not in replicated:
                self.fail("%d was not replicated even though the partial attribute set should be ignored."
                          % attid)

        # Check that the user has been added to msDSRevealedUsers
        self._assert_in_revealed_users(user_dn, self._SECRET_ATTIDS)

    def _assert_in_revealed_users(self, user_dn, attrlist):
        """Assert that the RODC computer object's msDS-RevealedUsers lists
        exactly the attids in attrlist for user_dn.

        :param user_dn: DN of the user to look for in each value
        :param attrlist: attids expected for that user (order ignored)
        :return: (packed_attrs, unpacked_attrs): the raw binary part of
            each matching value and the replPropertyMetaData1 structure
            unpacked from it, in the order the values were returned
        """
        res = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=self.computer_dn,
                                  attrs=["msDS-RevealedUsers"])
        revealed_users = res[0]["msDS-RevealedUsers"]

        actual_attrids = []
        packed_attrs = []
        unpacked_attrs = []
        for attribute in revealed_users:
            dsdb_dn = dsdb_Dn(self.ldb_dc1, attribute)
            packed = dsdb_dn.get_bytes()
            metadata = ndr_unpack(drsblobs.replPropertyMetaData1, packed)
            if user_dn in attribute:
                unpacked_attrs.append(metadata)
                packed_attrs.append(packed)
                actual_attrids.append(metadata.attid)

        self.assertEqual(sorted(actual_attrids), sorted(attrlist))

        return (packed_attrs, unpacked_attrs)

    def _assert_attrlist_equals(self, list_1, list_2):
        """Assert that two metadata lists are identical, entry by entry:
        no version bump, no USN change and no change-time difference."""
        return self._assert_attrlist_changed(list_1, list_2, [], num_changes=0, expected_new_usn=False)

    def _assert_attrlist_changed(self, list_1, list_2, changed_attributes, num_changes=1, expected_new_usn=True):
        """Compare two metadata lists taken before and after a change.

        :param list_1: metadata entries from before the change
        :param list_2: metadata entries from after, in the same order
        :param changed_attributes: attids that were expected to change;
            the originating change time must match for all other attids
        :param num_changes: amount each entry's version must have grown
        :param expected_new_usn: when True both USNs must have increased,
            otherwise they must be unchanged
        """
        # Pairing the entries only makes sense for lists of equal length.
        self.assertEqual(len(list_1), len(list_2))

        for (before, after) in zip(list_1, list_2):
            self.assertEqual(before.attid, after.attid)
            self.assertEqual(before.originating_invocation_id, after.originating_invocation_id)
            self.assertEqual(before.version + num_changes, after.version)

            if expected_new_usn:
                self.assertTrue(before.originating_usn < after.originating_usn)
                self.assertTrue(before.local_usn < after.local_usn)
            else:
                self.assertEqual(before.originating_usn, after.originating_usn)
                self.assertEqual(before.local_usn, after.local_usn)

            # We do the changes too quickly, so unless we put sleeps
            # inbetween calls, the change times of changed attributes
            # remain the same. Checking the USNs is enough for those, so
            # only the attributes that should not have changed are compared.
            if before.attid not in changed_attributes:
                self.assertEqual(before.originating_change_time, after.originating_change_time)

    def _create_rodc(self, ctx):
        """Fill in the RODC-specific fields of the join context and add
        the join's objects by calling ctx.join_add_objects().

        :param ctx: the dc_join context created in setUp
        """
        ctx.nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
        ctx.full_nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
        ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn)

        ctx.never_reveal_sid = ["<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_DENY),
                                "<SID=%s>" % security.SID_BUILTIN_ADMINISTRATORS,
                                "<SID=%s>" % security.SID_BUILTIN_SERVER_OPERATORS,
                                "<SID=%s>" % security.SID_BUILTIN_BACKUP_OPERATORS,
                                "<SID=%s>" % security.SID_BUILTIN_ACCOUNT_OPERATORS]
        ctx.reveal_sid = "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_ALLOW)

        # The account performing the join is recorded as managedby
        mysid = ctx.get_mysid()
        admin_dn = "<SID=%s>" % mysid
        ctx.managedby = admin_dn

        ctx.userAccountControl = (samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT |
                                  samba.dsdb.UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
                                  samba.dsdb.UF_PARTIAL_SECRETS_ACCOUNT)

        ctx.connection_dn = "CN=RODC Connection (FRS),%s" % ctx.ntds_dn
        ctx.secure_channel_type = misc.SEC_CHAN_RODC

        ctx.replica_flags = (drsuapi.DRSUAPI_DRS_INIT_SYNC |
                             drsuapi.DRSUAPI_DRS_PER_SYNC |
                             drsuapi.DRSUAPI_DRS_GET_ANC |
                             drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
                             drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)

        ctx.join_add_objects()