2 # -*- coding: utf-8 -*-
4 # Test replication scenarios involving an RODC
6 # Copyright (C) Catalyst.Net Ltd. 2017
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 # export DC1=dc1_dns_name
25 # export DC2=dc1_dns_name [this is unused for the test, but it'll still try to connect]
26 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
27 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN repl_rodc -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
33 from ldb import SCOPE_BASE
35 from samba import WERRORError
36 from samba.join import dc_join
37 from samba.dcerpc import drsuapi, misc, drsblobs, security
38 from samba.drs_utils import drs_DsBind, drs_Replicate
39 from samba.ndr import ndr_unpack, ndr_pack
40 from samba.common import dsdb_Dn
41 from samba.credentials import Credentials
def drs_get_rodc_partial_attribute_set(samdb, samdb1, exceptions=None):
    """Build the partial attribute set used for RODC replication requests.

    Every ``attributeSchema`` object below the schema base DN of ``samdb``
    is considered. An attribute is left out when its ``systemFlags`` carry
    DS_FLAG_ATTR_NOT_REPLICATED or DS_FLAG_ATTR_IS_CONSTRUCTED, or when its
    ``searchFlags`` carry SEARCH_FLAG_RODC_ATTRIBUTE. The remaining
    lDAPDisplayName values are translated to attids through ``samdb1``.

    :param samdb: database that is searched for the schema attributes
    :param samdb1: database used to map lDAPDisplayName to attid
    :param exceptions: optional collection of attids to leave out of the
        result; ``None`` (the default) excludes nothing
    :return: a version 1 ``drsuapi.DsPartialAttributeSet`` whose ``attids``
        are sorted and whose ``num_attids`` matches their count
    """
    # A None default avoids sharing one mutable list between calls.
    excluded = () if exceptions is None else exceptions

    partial_attribute_set = drsuapi.DsPartialAttributeSet()
    partial_attribute_set.version = 1

    # the exact list of attids we send is quite critical. Note that
    # we do ask for the secret attributes, but set SPECIAL_SECRET_PROCESSING
    schema_dn = samdb.get_schema_basedn()
    res = samdb.search(base=schema_dn, scope=ldb.SCOPE_SUBTREE,
                       expression="objectClass=attributeSchema",
                       attrs=["lDAPDisplayName", "systemFlags",
                              "searchFlags"])

    # Loop-invariant mask of the system flags that disqualify an attribute.
    skipped_system_flags = (samba.dsdb.DS_FLAG_ATTR_NOT_REPLICATED |
                            samba.dsdb.DS_FLAG_ATTR_IS_CONSTRUCTED)

    attids = []
    for r in res:
        ldap_display_name = r["lDAPDisplayName"][0]
        if "systemFlags" in r:
            if int(r["systemFlags"][0]) & skipped_system_flags:
                continue
        if "searchFlags" in r:
            if int(r["searchFlags"][0]) & samba.dsdb.SEARCH_FLAG_RODC_ATTRIBUTE:
                continue
        # NOTE(review): an attribute whose attid cannot be resolved is
        # skipped rather than treated as fatal -- confirm this best-effort
        # behaviour is what the callers rely on.
        try:
            attid = samdb1.get_attid_from_lDAPDisplayName(ldap_display_name)
        except Exception:
            continue
        if attid not in excluded:
            attids.append(int(attid))

    # the attids do need to be sorted, or windows doesn't return
    # all the attributes we need
    attids.sort()
    partial_attribute_set.attids = attids
    partial_attribute_set.num_attids = len(attids)
    return partial_attribute_set
# Fixture statements of the RODC replication test case.
# Set-up part: chains to the base class setUp(), creates the
# "test_drs_rodc" OU on DC1, picks a random RODC name, prepares a dc_join
# context for it and hands that to self._create_rodc(), keeps the context's
# temporary samdb, builds Credentials for the RODC machine account
# ("<name>$" with self.rodc_pass), and opens two DRS binds to DC1: one
# without explicit credentials (self.drs) and one with the RODC machine
# credentials (self.rodc_drs).
# Tear-down part (last two statements): removes the join through
# cleanup_old_join() and chains to the base class tearDown().
# NOTE(review): the `def` lines of both fixture methods and the end of the
# class docstring are not present in this span.
87 class DrsRodcTestCase(drs_base.DrsBaseTestCase):
88 """Intended as a semi-black box test case for replication involving
92 super(DrsRodcTestCase, self).setUp()
93 self.base_dn = self.ldb_dc1.get_default_basedn()
95 self.ou = samba.tests.create_test_ou(self.ldb_dc1, "test_drs_rodc")
96 self.allowed_group = "CN=Allowed RODC Password Replication Group,CN=Users,%s" % self.base_dn
98 self.site = self.ldb_dc1.server_site_name()
99 self.rodc_name = "TESTRODCDRS%s" % random.randint(1, 10000000)
100 self.rodc_pass = "password12#"
101 self.computer_dn = "CN=%s,OU=Domain Controllers,%s" % (self.rodc_name, self.base_dn)
104 self.rodc_ctx = dc_join(server=self.ldb_dc1.host_dns_name(), creds=self.get_credentials(), lp=self.get_loadparm(),
105 site=self.site, netbios_name=self.rodc_name,
106 targetdir=None, domain=None, machinepass=self.rodc_pass)
107 self._create_rodc(self.rodc_ctx)
108 self.rodc_ctx.create_tmp_samdb()
109 self.tmp_samdb = self.rodc_ctx.tmp_samdb
111 rodc_creds = Credentials()
112 rodc_creds.guess(self.rodc_ctx.lp)
113 rodc_creds.set_username(self.rodc_name+'$')
114 rodc_creds.set_password(self.rodc_pass)
115 self.rodc_creds = rodc_creds
117 (self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1)
118 (self.rodc_drs, self.rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, rodc_creds)
121 self.rodc_ctx.cleanup_old_join()
122 super(DrsRodcTestCase, self).tearDown()
# Sets a password on a randomly named test user in the test OU, builds a
# level-10 GetNCChanges request with EXOP_REPL_SECRET whose destination DSA
# is the RODC's NTDS GUID, sends it over self.drs (the bind made without
# RODC credentials) and then asserts that the five secret attids listed in
# expected_user_attributes are recorded for that user in
# msDS-RevealedUsers. The user is not added to any group in this test.
124 def test_admin_repl_secrets(self):
126 When a secret attribute is set to be replicated to an RODC with the
127 admin credentials, it should always replicate regardless of whether
128 or not it's in the Allowed RODC Password Replication Group.
130 rand = random.randint(1, 10000000)
131 expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
132 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
133 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
134 drsuapi.DRSUAPI_ATTID_unicodePwd,
135 drsuapi.DRSUAPI_ATTID_dBCSPwd]
137 user_name = "test_rodcA_%s" % rand
138 user_dn = "CN=%s,%s" % (user_name, self.ou)
141 "objectclass": "user",
142 "sAMAccountName": user_name
145 # Store some secret on this user
146 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, 'penguin12#', False, user_name)
148 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
149 invocation_id=self.ldb_dc1.get_invocation_id(),
151 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
152 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
155 (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
157 # Check that the user has been added to msDSRevealedUsers
158 self._assert_in_revealed_users(user_dn, expected_user_attributes)
# Sends the same EXOP_REPL_SECRET request twice over the RODC-credential
# bind (self.rodc_drs); each attempt must raise WERRORError with code 8630
# (ERROR_DS_DRA_SECRETS_DENIED), and reaching the self.fail() line after a
# call means the secrets were handed out. The request is then repeated over
# self.drs and the user must show up in msDS-RevealedUsers.
# NOTE(review): the description below says the secret should not replicate
# "if it's in" the allowed group, yet no statement visible in this test
# adds the user to that group -- presumably "if it's not in" was meant.
160 def test_rodc_repl_secrets(self):
162 When a secret attribute is set to be replicated to an RODC with
163 the RODC account credentials, it should not replicate if it's in
164 the Allowed RODC Password Replication Group. Once it is added to
165 the group, it should replicate.
167 rand = random.randint(1, 10000000)
168 expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
169 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
170 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
171 drsuapi.DRSUAPI_ATTID_unicodePwd,
172 drsuapi.DRSUAPI_ATTID_dBCSPwd]
174 user_name = "test_rodcB_%s" % rand
175 user_dn = "CN=%s,%s" % (user_name, self.ou)
178 "objectclass": "user",
179 "sAMAccountName": user_name
182 # Store some secret on this user
183 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, 'penguin12#', False, user_name)
185 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
186 invocation_id=self.ldb_dc1.get_invocation_id(),
188 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
189 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
194 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
195 self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
196 except WERRORError as e:
197 (enum, estr) = e.args
198 self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
200 # send the same request again and we should get the same response
202 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
203 self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
204 except WERRORError as e1:
205 (enum, estr) = e1.args
206 self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
208 # Retry with Administrator credentials, ignores password replication groups
209 (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
211 # Check that the user has been added to msDSRevealedUsers
212 self._assert_in_revealed_users(user_dn, expected_user_attributes)
# Two-step request over the RODC-credential bind against the domain NC.
# Step one is a level-8 request with EXOP_NONE and the
# SPECIAL_SECRET_PROCESSING replica flag. Step two builds a fresh level-8
# request with EXOP_REPL_SECRET, copies the high watermark returned by step
# one into it and sends it; reaching the self.fail() line means the server
# accepted it, while a RuntimeError is the handled outcome.
# NOTE(review): whatever is asserted on the unpacked error code is not
# visible in this span.
214 def test_rodc_repl_secrets_follow_on_req(self):
216 Checks that an RODC can't subvert an existing (valid) GetNCChanges
217 request to reveal secrets it shouldn't have access to.
220 # send an acceptable request that will match as many GUIDs as possible.
221 # Here we set the SPECIAL_SECRET_PROCESSING flag so that the request gets accepted.
222 # (On the server, this builds up the getnc_state->guids array)
223 req8 = self._exop_req8(dest_dsa=str(self.rodc_ctx.ntds_guid),
224 invocation_id=self.ldb_dc1.get_invocation_id(),
225 nc_dn_str=self.ldb_dc1.domain_dn(),
226 exop=drsuapi.DRSUAPI_EXOP_NONE,
228 replica_flags=drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
229 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 8, req8)
231 # Get the next replication chunk, but set REPL_SECRET this time. This
232 # is following on the the previous accepted request, but we've changed
233 # exop to now request secrets. This request should fail
235 req8 = self._exop_req8(dest_dsa=str(self.rodc_ctx.ntds_guid),
236 invocation_id=self.ldb_dc1.get_invocation_id(),
237 nc_dn_str=self.ldb_dc1.domain_dn(),
238 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET)
239 req8.highwatermark = ctr.new_highwatermark
241 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 8, req8)
243 self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
244 except RuntimeError as e2:
245 (enum, estr) = e2.args
# Life cycle of the msDS-RevealedUsers entry, driven over self.drs (the
# bind made without RODC credentials):
#   1. set a password, add to the allowed replication group, replicate
#      with EXOP_REPL_SECRET and snapshot the revealed-user metadata;
#   2. change the password and check the metadata is still unchanged
#      (_assert_attrlist_equals);
#   3. replicate again and check the metadata changed
#      (_assert_attrlist_changed);
#   4. delete the user and check the RODC computer object no longer has a
#      msDS-RevealedUsers attribute.
248 def test_msDSRevealedUsers_admin(self):
250 When a secret attribute is to be replicated to an RODC, the contents
251 of the attribute should be added to the msDSRevealedUsers attribute
252 of the computer object corresponding to the RODC.
255 rand = random.randint(1, 10000000)
256 expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
257 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
258 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
259 drsuapi.DRSUAPI_ATTID_unicodePwd,
260 drsuapi.DRSUAPI_ATTID_dBCSPwd]
262 # Add a user on DC1, add it to allowed password replication
263 # group, and replicate to RODC with EXOP_REPL_SECRETS
264 user_name = "test_rodcC_%s" % rand
265 password = "password12#"
266 user_dn = "CN=%s,%s" % (user_name, self.ou)
269 "objectclass": "user",
270 "sAMAccountName": user_name
273 # Store some secret on this user
274 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password, False, user_name)
276 self.ldb_dc1.add_remove_group_members("Allowed RODC Password Replication Group",
278 add_members_operation=True)
280 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
281 invocation_id=self.ldb_dc1.get_invocation_id(),
283 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
284 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
287 (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
289 # Check that the user has been added to msDSRevealedUsers
290 (packed_attrs_1, unpacked_attrs_1) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
292 # Change the user's password on DC1
293 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password+"1", False, user_name)
295 (packed_attrs_2, unpacked_attrs_2) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
296 self._assert_attrlist_equals(unpacked_attrs_1, unpacked_attrs_2)
298 # Replicate to RODC again with EXOP_REPL_SECRETS
299 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
300 invocation_id=self.ldb_dc1.get_invocation_id(),
302 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
303 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
306 (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
308 # This is important for Windows, because the entry won't have been
309 # updated in time if we don't have it. Even with this sleep, it only
310 # passes some of the time...
313 # Check that the entry in msDSRevealedUsers has been updated
314 (packed_attrs_3, unpacked_attrs_3) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
315 self._assert_attrlist_changed(unpacked_attrs_2, unpacked_attrs_3, expected_user_attributes)
317 # We should be able to delete the user
318 self.ldb_dc1.deleteuser(user_name)
320 res = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=self.computer_dn,
321 attrs=["msDS-RevealedUsers"])
322 self.assertFalse("msDS-RevealedUsers" in res[0])
# Same life cycle as test_msDSRevealedUsers_admin, with one difference:
# the first EXOP_REPL_SECRET request goes over self.drs, but the second one
# (after the password change) goes over the RODC-credential bind
# (self.rodc_drs). The test then checks that the revealed-user metadata
# changed, deletes the user and checks that the RODC computer object no
# longer has a msDS-RevealedUsers attribute.
324 def test_msDSRevealedUsers(self):
326 When a secret attribute is to be replicated to an RODC, the contents
327 of the attribute should be added to the msDSRevealedUsers attribute
328 of the computer object corresponding to the RODC.
331 rand = random.randint(1, 10000000)
332 expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
333 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
334 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
335 drsuapi.DRSUAPI_ATTID_unicodePwd,
336 drsuapi.DRSUAPI_ATTID_dBCSPwd]
338 # Add a user on DC1, add it to allowed password replication
339 # group, and replicate to RODC with EXOP_REPL_SECRETS
340 user_name = "test_rodcD_%s" % rand
341 password = "password12#"
342 user_dn = "CN=%s,%s" % (user_name, self.ou)
345 "objectclass": "user",
346 "sAMAccountName": user_name
349 # Store some secret on this user
350 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password, False, user_name)
352 self.ldb_dc1.add_remove_group_members("Allowed RODC Password Replication Group",
354 add_members_operation=True)
356 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
357 invocation_id=self.ldb_dc1.get_invocation_id(),
359 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
360 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
363 (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
365 # Check that the user has been added to msDSRevealedUsers
366 (packed_attrs_1, unpacked_attrs_1) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
368 # Change the user's password on DC1
369 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password+"1", False, user_name)
371 (packed_attrs_2, unpacked_attrs_2) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
372 self._assert_attrlist_equals(unpacked_attrs_1, unpacked_attrs_2)
374 # Replicate to RODC again with EXOP_REPL_SECRETS
375 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
376 invocation_id=self.ldb_dc1.get_invocation_id(),
378 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
379 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
382 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
384 # This is important for Windows, because the entry won't have been
385 # updated in time if we don't have it. Even with this sleep, it only
386 # passes some of the time...
389 # Check that the entry in msDSRevealedUsers has been updated
390 (packed_attrs_3, unpacked_attrs_3) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
391 self._assert_attrlist_changed(unpacked_attrs_2, unpacked_attrs_3, expected_user_attributes)
393 # We should be able to delete the user
394 self.ldb_dc1.deleteuser(user_name)
396 res = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=self.computer_dn,
397 attrs=["msDS-RevealedUsers"])
398 self.assertFalse("msDS-RevealedUsers" in res[0])
# Builds a partial attribute set that leaves out four of the secret attids
# (pas_exceptions), sends an EXOP_REPL_SECRET request with it over
# self.drs, and then strikes from pas_exceptions every attid found on the
# first returned object. Any attid still left in pas_exceptions fails the
# test. Finally all five secret attids must be recorded for the user in
# msDS-RevealedUsers.
# Note that pas_exceptions is consumed (mutated) by the check loop.
400 def test_msDSRevealedUsers_pas(self):
402 If we provide a Partial Attribute Set when replicating to an RODC,
403 we should ignore it and replicate all of the secret attributes anyway
404 msDSRevealedUsers attribute.
406 rand = random.randint(1, 10000000)
407 expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
408 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
409 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
410 drsuapi.DRSUAPI_ATTID_unicodePwd,
411 drsuapi.DRSUAPI_ATTID_dBCSPwd]
412 pas_exceptions = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
413 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
414 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
415 drsuapi.DRSUAPI_ATTID_dBCSPwd]
417 # Add a user on DC1, add it to allowed password replication
418 # group, and replicate to RODC with EXOP_REPL_SECRETS
419 user_name = "test_rodcE_%s" % rand
420 password = "password12#"
421 user_dn = "CN=%s,%s" % (user_name, self.ou)
424 "objectclass": "user",
425 "sAMAccountName": user_name
428 # Store some secret on this user
429 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password, False, user_name)
431 self.ldb_dc1.add_remove_group_members("Allowed RODC Password Replication Group",
433 add_members_operation=True)
435 pas = drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb, exceptions=pas_exceptions)
436 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
437 invocation_id=self.ldb_dc1.get_invocation_id(),
439 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
440 partial_attribute_set=pas,
443 (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
445 # Make sure that we still replicate the secrets
446 for attribute in ctr.first_object.object.attribute_ctr.attributes:
447 if attribute.attid in pas_exceptions:
448 pas_exceptions.remove(attribute.attid)
449 for attribute in pas_exceptions:
450 self.fail("%d was not replicated even though the partial attribute set should be ignored."
453 # Check that the user has been added to msDSRevealedUsers
454 (packed_attrs_1, unpacked_attrs_1) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
# Creates a second RODC account whose name is the first RODC's name with
# the leading character dropped, and binds to DC1 with its credentials.
# Two cross-over requests are then made for a user in the allowed group:
#   * destination DSA = the other RODC, sent over the first RODC's bind;
#   * destination DSA = the first RODC, sent over the other RODC's bind.
# Each must raise WERRORError 8630 (ERROR_DS_DRA_SECRETS_DENIED).
# NOTE(review): no cleanup of other_rodc_ctx is visible in this span.
456 def test_msDSRevealedUsers_using_other_RODC(self):
458 Ensure that the machine account is tied to the destination DSA.
460 # Create a new identical RODC with just the first letter missing
461 other_rodc_name = self.rodc_name[1:]
462 other_rodc_ctx = dc_join(server=self.ldb_dc1.host_dns_name(), creds=self.get_credentials(), lp=self.get_loadparm(),
463 site=self.site, netbios_name=other_rodc_name,
464 targetdir=None, domain=None, machinepass=self.rodc_pass)
465 self._create_rodc(other_rodc_ctx)
467 other_rodc_creds = Credentials()
468 other_rodc_creds.guess(other_rodc_ctx.lp)
469 other_rodc_creds.set_username(other_rodc_name+'$')
470 other_rodc_creds.set_password(self.rodc_pass)
472 (other_rodc_drs, other_rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, other_rodc_creds)
474 rand = random.randint(1, 10000000)
475 expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
476 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
477 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
478 drsuapi.DRSUAPI_ATTID_unicodePwd,
479 drsuapi.DRSUAPI_ATTID_dBCSPwd]
481 user_name = "test_rodcF_%s" % rand
482 user_dn = "CN=%s,%s" % (user_name, self.ou)
485 "objectclass": "user",
486 "sAMAccountName": user_name
489 # Store some secret on this user
490 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, 'penguin12#', False, user_name)
491 self.ldb_dc1.add_remove_group_members("Allowed RODC Password Replication Group",
493 add_members_operation=True)
495 req10 = self._getnc_req10(dest_dsa=str(other_rodc_ctx.ntds_guid),
496 invocation_id=self.ldb_dc1.get_invocation_id(),
498 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
499 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
504 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
505 self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
506 except WERRORError as e3:
507 (enum, estr) = e3.args
508 self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
510 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
511 invocation_id=self.ldb_dc1.get_invocation_id(),
513 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
514 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
519 (level, ctr) = other_rodc_drs.DsGetNCChanges(other_rodc_drs_handle, 10, req10)
520 self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
521 except WERRORError as e4:
522 (enum, estr) = e4.args
523 self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
# Exercises the per-RODC allow/deny attributes on the RODC computer object
# instead of the global groups, always over the RODC-credential bind:
#   1. add the user to msDS-RevealOnDemandGroup -> replication must
#      succeed and the user must appear in msDS-RevealedUsers;
#   2. rebind, add the user to msDS-NeverRevealGroup as well -> must fail
#      with WERRORError 8630 (ERROR_DS_DRA_SECRETS_DENIED);
#   3. remove the user from msDS-RevealOnDemandGroup, rebind -> must still
#      fail with 8630.
# NOTE(review): user_name reuses the "test_rodcF_" prefix that
# test_msDSRevealedUsers_using_other_RODC also uses; only the random
# suffix keeps the two apart.
525 def test_msDSRevealedUsers_local_deny_allow(self):
527 Ensure that the deny trumps allow, and we can modify these
528 attributes directly instead of the global groups.
530 This may fail on Windows due to tokenGroup calculation caching.
532 rand = random.randint(1, 10000000)
533 expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
534 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
535 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
536 drsuapi.DRSUAPI_ATTID_unicodePwd,
537 drsuapi.DRSUAPI_ATTID_dBCSPwd]
539 # Add a user on DC1, add it to allowed password replication
540 # group, and replicate to RODC with EXOP_REPL_SECRETS
541 user_name = "test_rodcF_%s" % rand
542 password = "password12#"
543 user_dn = "CN=%s,%s" % (user_name, self.ou)
546 "objectclass": "user",
547 "sAMAccountName": user_name
550 # Store some secret on this user
551 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password, False, user_name)
553 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
554 invocation_id=self.ldb_dc1.get_invocation_id(),
556 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
557 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
562 m.dn = ldb.Dn(self.ldb_dc1, self.computer_dn)
564 m["msDS-RevealOnDemandGroup"] = \
565 ldb.MessageElement(user_dn, ldb.FLAG_MOD_ADD,
566 "msDS-RevealOnDemandGroup")
567 self.ldb_dc1.modify(m)
569 # In local allow, should be success
571 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
573 self.fail("Should have succeeded when in local allow group")
575 self._assert_in_revealed_users(user_dn, expected_user_attributes)
577 (self.rodc_drs, self.rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, self.rodc_creds)
580 m.dn = ldb.Dn(self.ldb_dc1, self.computer_dn)
582 m["msDS-NeverRevealGroup"] = \
583 ldb.MessageElement(user_dn, ldb.FLAG_MOD_ADD,
584 "msDS-NeverRevealGroup")
585 self.ldb_dc1.modify(m)
587 # In local allow and deny, should be failure
589 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
590 self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
591 except WERRORError as e5:
592 (enum, estr) = e5.args
593 self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
596 m.dn = ldb.Dn(self.ldb_dc1, self.computer_dn)
598 m["msDS-RevealOnDemandGroup"] = \
599 ldb.MessageElement(user_dn, ldb.FLAG_MOD_DELETE,
600 "msDS-RevealOnDemandGroup")
601 self.ldb_dc1.modify(m)
603 # In local deny, should be failure
604 (self.rodc_drs, self.rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, self.rodc_creds)
606 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
607 self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
608 except WERRORError as e6:
609 (enum, estr) = e6.args
610 self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
def _assert_in_revealed_users(self, user_dn, attrlist):
    """Assert which attributes of a user are recorded as revealed.

    Reads ``msDS-RevealedUsers`` from the RODC computer object
    (``self.computer_dn``) on DC1, keeps the values whose text contains
    ``user_dn`` and asserts that their attids are exactly ``attrlist``
    (compared after sorting, so order does not matter).

    :param user_dn: DN string of the user to look for
    :param attrlist: attids expected to be recorded for that user
    :return: tuple ``(packed_attrs, unpacked_attrs)`` -- for each matching
        value, the binary part as returned by ``dsdb_Dn.get_bytes()`` and
        the same data unpacked as ``drsblobs.replPropertyMetaData1``
    """
    res = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=self.computer_dn,
                              attrs=["msDS-RevealedUsers"])
    revealed_users = res[0]["msDS-RevealedUsers"]

    packed_attrs = []
    unpacked_attrs = []
    actual_attrids = []
    for attribute in revealed_users:
        # Decode once and use the text for both the DN parser and the
        # substring test: checking a str against the undecoded bytes
        # value raises TypeError on Python 3.
        attribute_text = attribute.decode('utf8')
        dsdb_dn = dsdb_Dn(self.ldb_dc1, attribute_text)
        metadata = ndr_unpack(drsblobs.replPropertyMetaData1, dsdb_dn.get_bytes())
        if user_dn in attribute_text:
            unpacked_attrs.append(metadata)
            packed_attrs.append(dsdb_dn.get_bytes())
            actual_attrids.append(metadata.attid)

    # assertEqual, not the assertEquals alias removed in Python 3.12.
    self.assertEqual(sorted(actual_attrids), sorted(attrlist))

    return (packed_attrs, unpacked_attrs)
def _assert_attrlist_equals(self, list_1, list_2):
    """Assert that two metadata snapshots are identical.

    Delegates to ``_assert_attrlist_changed`` with no attribute marked as
    changed, zero version bumps and unchanged USNs expected.
    """
    nothing_changed = []
    outcome = self._assert_attrlist_changed(list_1, list_2, nothing_changed,
                                            num_changes=0,
                                            expected_new_usn=False)
    return outcome
def _assert_attrlist_changed(self, list_1, list_2, changed_attributes, num_changes=1, expected_new_usn=True):
    """Compare two snapshots of revealed-attribute metadata entry by entry.

    For every pair of entries the attid and originating invocation id must
    match and the version must have advanced by ``num_changes``.

    :param list_1: metadata entries taken before the change
    :param list_2: metadata entries taken after the change
    :param changed_attributes: attids whose originating change time is
        allowed to differ; for all other attids it must be identical
    :param num_changes: expected version increment per entry
    :param expected_new_usn: when true, originating and local USN must
        both have grown; when false, both must be unchanged
    """
    # Both snapshots must cover the same number of attributes, otherwise
    # a pairwise walk would silently skip entries or run off the end.
    self.assertEqual(len(list_1), len(list_2))

    for before, after in zip(list_1, list_2):
        self.assertEqual(before.attid, after.attid)
        self.assertEqual(before.originating_invocation_id,
                         after.originating_invocation_id)
        self.assertEqual(before.version + num_changes, after.version)

        if expected_new_usn:
            self.assertLess(before.originating_usn, after.originating_usn)
            self.assertLess(before.local_usn, after.local_usn)
        else:
            self.assertEqual(before.originating_usn, after.originating_usn)
            self.assertEqual(before.local_usn, after.local_usn)

        # We do the changes too quickly, so unless we put sleeps
        # in between calls, the change time of a changed attribute
        # remains the same; for those the USN checks above have to do.
        # Only attributes that were not changed get their time compared.
        if before.attid not in changed_attributes:
            self.assertEqual(before.originating_change_time,
                             after.originating_change_time)
def _create_rodc(self, ctx):
    """Fill in the RODC-specific fields of a join context and add its objects.

    Sets the naming-context lists, krbtgt DN, reveal / never-reveal SIDs,
    manager, account-control flags, connection DN, secure channel type and
    replica flags on ``ctx``, then calls ``ctx.join_add_objects()``.

    :param ctx: join context to configure; it is modified in place
    """
    # Two independent lists with the same three naming contexts.
    ctx.nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
    ctx.full_nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
    ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn)

    # Domain-relative deny group first, then the well-known builtin groups.
    never_reveal = ["<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_DENY)]
    for builtin_sid in (security.SID_BUILTIN_ADMINISTRATORS,
                        security.SID_BUILTIN_SERVER_OPERATORS,
                        security.SID_BUILTIN_BACKUP_OPERATORS,
                        security.SID_BUILTIN_ACCOUNT_OPERATORS):
        never_reveal.append("<SID=%s>" % builtin_sid)
    ctx.never_reveal_sid = never_reveal
    ctx.reveal_sid = "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_ALLOW)

    # The account performing the join becomes the manager of the RODC.
    ctx.managedby = "<SID=%s>" % ctx.get_mysid()

    account_flags = samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT
    account_flags |= samba.dsdb.UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION
    account_flags |= samba.dsdb.UF_PARTIAL_SECRETS_ACCOUNT
    ctx.userAccountControl = account_flags

    ctx.connection_dn = "CN=RODC Connection (FRS),%s" % ctx.ntds_dn
    ctx.secure_channel_type = misc.SEC_CHAN_RODC

    replica_flags = drsuapi.DRSUAPI_DRS_INIT_SYNC
    replica_flags |= drsuapi.DRSUAPI_DRS_PER_SYNC
    replica_flags |= drsuapi.DRSUAPI_DRS_GET_ANC
    replica_flags |= drsuapi.DRSUAPI_DRS_NEVER_SYNCED
    replica_flags |= drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING
    ctx.replica_flags = replica_flags

    ctx.join_add_objects()