2 # -*- coding: utf-8 -*-
4 # Test replication scenarios involving an RODC
6 # Copyright (C) Catalyst.Net Ltd. 2017
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 # export DC1=dc1_dns_name
25 # export DC2=dc1_dns_name [this is unused for the test, but it'll still try to connect]
26 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
27 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN repl_rodc -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
33 from ldb import SCOPE_BASE
35 from samba import WERRORError
36 from samba.join import dc_join
37 from samba.dcerpc import drsuapi, misc, drsblobs, security
38 from samba.drs_utils import drs_DsBind, drs_Replicate
39 from samba.ndr import ndr_unpack, ndr_pack
40 from samba.common import dsdb_Dn
41 from samba.credentials import Credentials
46 def drs_get_rodc_partial_attribute_set(samdb, samdb1, exceptions=[]):
47 '''get a list of attributes for RODC replication'''
48 partial_attribute_set = drsuapi.DsPartialAttributeSet()
49 partial_attribute_set.version = 1
53 # the exact list of attids we send is quite critical. Note that
54 # we do ask for the secret attributes, but set SPECIAL_SECRET_PROCESSING
56 schema_dn = samdb.get_schema_basedn()
57 res = samdb.search(base=schema_dn, scope=ldb.SCOPE_SUBTREE,
58 expression="objectClass=attributeSchema",
59 attrs=["lDAPDisplayName", "systemFlags",
63 ldap_display_name = r["lDAPDisplayName"][0]
64 if "systemFlags" in r:
65 system_flags = r["systemFlags"][0]
66 if (int(system_flags) & (samba.dsdb.DS_FLAG_ATTR_NOT_REPLICATED |
67 samba.dsdb.DS_FLAG_ATTR_IS_CONSTRUCTED)):
69 if "searchFlags" in r:
70 search_flags = r["searchFlags"][0]
71 if (int(search_flags) & samba.dsdb.SEARCH_FLAG_RODC_ATTRIBUTE):
74 attid = samdb1.get_attid_from_lDAPDisplayName(ldap_display_name)
75 if not attid in exceptions:
76 attids.append(int(attid))
80 # the attids do need to be sorted, or windows doesn't return
81 # all the attributes we need
83 partial_attribute_set.attids = attids
84 partial_attribute_set.num_attids = len(attids)
85 return partial_attribute_set
87 class DrsRodcTestCase(drs_base.DrsBaseTestCase):
88 """Intended as a semi-black box test case for replication involving
92 super(DrsRodcTestCase, self).setUp()
93 self.base_dn = self.ldb_dc1.get_default_basedn()
95 rand = random.randint(1, 10000000)
97 self.ou = "OU=test_drs_rodc%s,%s" % (rand, self.base_dn)
100 "objectclass": "organizationalUnit"
102 self.allowed_group = "CN=Allowed RODC Password Replication Group,CN=Users,%s" % self.base_dn
104 self.site = self.ldb_dc1.server_site_name()
105 self.rodc_name = "TESTRODCDRS%s" % rand
106 self.rodc_pass = "password12#"
107 self.computer_dn = "CN=%s,OU=Domain Controllers,%s" % (self.rodc_name, self.base_dn)
110 self.rodc_ctx = dc_join(server=self.ldb_dc1.host_dns_name(), creds=self.get_credentials(), lp=self.get_loadparm(),
111 site=self.site, netbios_name=self.rodc_name,
112 targetdir=None, domain=None, machinepass=self.rodc_pass)
113 self._create_rodc(self.rodc_ctx)
114 self.rodc_ctx.create_tmp_samdb()
115 self.tmp_samdb = self.rodc_ctx.tmp_samdb
117 rodc_creds = Credentials()
118 rodc_creds.guess(self.rodc_ctx.lp)
119 rodc_creds.set_username(self.rodc_name+'$')
120 rodc_creds.set_password(self.rodc_pass)
121 self.rodc_creds = rodc_creds
123 (self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1)
124 (self.rodc_drs, self.rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, rodc_creds)
127 self.rodc_ctx.cleanup_old_join()
128 super(DrsRodcTestCase, self).tearDown()
130 def test_admin_repl_secrets(self):
132 When a secret attribute is set to be replicated to an RODC with the
133 admin credentials, it should always replicate regardless of whether
134 or not it's in the Allowed RODC Password Replication Group.
136 rand = random.randint(1, 10000000)
137 expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
138 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
139 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
140 drsuapi.DRSUAPI_ATTID_unicodePwd,
141 drsuapi.DRSUAPI_ATTID_dBCSPwd]
143 user_name = "test_rodcA_%s" % rand
144 user_dn = "CN=%s,%s" % (user_name, self.ou)
147 "objectclass": "user",
148 "sAMAccountName": user_name
151 # Store some secret on this user
152 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, 'penguin12#', False, user_name)
154 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
155 invocation_id=self.ldb_dc1.get_invocation_id(),
157 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
158 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
161 (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
163 # Check that the user has been added to msDSRevealedUsers
164 self._assert_in_revealed_users(user_dn, expected_user_attributes)
166 def test_rodc_repl_secrets(self):
168 When a secret attribute is set to be replicated to an RODC with
169 the RODC account credentials, it should not replicate if it's in
170 the Allowed RODC Password Replication Group. Once it is added to
171 the group, it should replicate.
173 rand = random.randint(1, 10000000)
174 expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
175 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
176 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
177 drsuapi.DRSUAPI_ATTID_unicodePwd,
178 drsuapi.DRSUAPI_ATTID_dBCSPwd]
180 user_name = "test_rodcB_%s" % rand
181 user_dn = "CN=%s,%s" % (user_name, self.ou)
184 "objectclass": "user",
185 "sAMAccountName": user_name
188 # Store some secret on this user
189 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, 'penguin12#', False, user_name)
191 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
192 invocation_id=self.ldb_dc1.get_invocation_id(),
194 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
195 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
200 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
201 self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
202 except WERRORError as (enum, estr):
203 self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
205 # send the same request again and we should get the same response
207 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
208 self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
209 except WERRORError as (enum, estr):
210 self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
212 # Retry with Administrator credentials, ignores password replication groups
213 (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
215 # Check that the user has been added to msDSRevealedUsers
216 self._assert_in_revealed_users(user_dn, expected_user_attributes)
218 def test_rodc_repl_secrets_follow_on_req(self):
220 Checks that an RODC can't subvert an existing (valid) GetNCChanges
221 request to reveal secrets it shouldn't have access to.
224 # send an acceptable request that will match as many GUIDs as possible.
225 # Here we set the SPECIAL_SECRET_PROCESSING flag so that the request gets accepted.
226 # (On the server, this builds up the getnc_state->guids array)
227 req8 = self._exop_req8(dest_dsa=str(self.rodc_ctx.ntds_guid),
228 invocation_id=self.ldb_dc1.get_invocation_id(),
229 nc_dn_str=self.ldb_dc1.domain_dn(),
230 exop=drsuapi.DRSUAPI_EXOP_NONE,
232 replica_flags=drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
233 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 8, req8)
235 # Get the next replication chunk, but set REPL_SECRET this time. This
236 # is following on the the previous accepted request, but we've changed
237 # exop to now request secrets. This request should fail
239 req8 = self._exop_req8(dest_dsa=str(self.rodc_ctx.ntds_guid),
240 invocation_id=self.ldb_dc1.get_invocation_id(),
241 nc_dn_str=self.ldb_dc1.domain_dn(),
242 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET)
243 req8.highwatermark = ctr.new_highwatermark
245 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 8, req8)
247 self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
248 except RuntimeError as (enum, estr):
251 def test_msDSRevealedUsers_admin(self):
253 When a secret attribute is to be replicated to an RODC, the contents
254 of the attribute should be added to the msDSRevealedUsers attribute
255 of the computer object corresponding to the RODC.
258 rand = random.randint(1, 10000000)
259 expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
260 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
261 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
262 drsuapi.DRSUAPI_ATTID_unicodePwd,
263 drsuapi.DRSUAPI_ATTID_dBCSPwd]
265 # Add a user on DC1, add it to allowed password replication
266 # group, and replicate to RODC with EXOP_REPL_SECRETS
267 user_name = "test_rodcC_%s" % rand
268 password = "password12#"
269 user_dn = "CN=%s,%s" % (user_name, self.ou)
272 "objectclass": "user",
273 "sAMAccountName": user_name
276 # Store some secret on this user
277 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password, False, user_name)
279 self.ldb_dc1.add_remove_group_members("Allowed RODC Password Replication Group",
281 add_members_operation=True)
283 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
284 invocation_id=self.ldb_dc1.get_invocation_id(),
286 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
287 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
290 (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
292 # Check that the user has been added to msDSRevealedUsers
293 (packed_attrs_1, unpacked_attrs_1) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
295 # Change the user's password on DC1
296 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password+"1", False, user_name)
298 (packed_attrs_2, unpacked_attrs_2) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
299 self._assert_attrlist_equals(unpacked_attrs_1, unpacked_attrs_2)
301 # Replicate to RODC again with EXOP_REPL_SECRETS
302 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
303 invocation_id=self.ldb_dc1.get_invocation_id(),
305 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
306 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
309 (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
311 # This is important for Windows, because the entry won't have been
312 # updated in time if we don't have it. Even with this sleep, it only
313 # passes some of the time...
316 # Check that the entry in msDSRevealedUsers has been updated
317 (packed_attrs_3, unpacked_attrs_3) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
318 self._assert_attrlist_changed(unpacked_attrs_2, unpacked_attrs_3, expected_user_attributes)
320 # We should be able to delete the user
321 self.ldb_dc1.deleteuser(user_name)
323 res = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=self.computer_dn,
324 attrs=["msDS-RevealedUsers"])
325 self.assertFalse("msDS-RevealedUsers" in res[0])
327 def test_msDSRevealedUsers(self):
329 When a secret attribute is to be replicated to an RODC, the contents
330 of the attribute should be added to the msDSRevealedUsers attribute
331 of the computer object corresponding to the RODC.
334 rand = random.randint(1, 10000000)
335 expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
336 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
337 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
338 drsuapi.DRSUAPI_ATTID_unicodePwd,
339 drsuapi.DRSUAPI_ATTID_dBCSPwd]
341 # Add a user on DC1, add it to allowed password replication
342 # group, and replicate to RODC with EXOP_REPL_SECRETS
343 user_name = "test_rodcD_%s" % rand
344 password = "password12#"
345 user_dn = "CN=%s,%s" % (user_name, self.ou)
348 "objectclass": "user",
349 "sAMAccountName": user_name
352 # Store some secret on this user
353 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password, False, user_name)
355 self.ldb_dc1.add_remove_group_members("Allowed RODC Password Replication Group",
357 add_members_operation=True)
359 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
360 invocation_id=self.ldb_dc1.get_invocation_id(),
362 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
363 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
366 (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
368 # Check that the user has been added to msDSRevealedUsers
369 (packed_attrs_1, unpacked_attrs_1) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
371 # Change the user's password on DC1
372 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password+"1", False, user_name)
374 (packed_attrs_2, unpacked_attrs_2) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
375 self._assert_attrlist_equals(unpacked_attrs_1, unpacked_attrs_2)
377 # Replicate to RODC again with EXOP_REPL_SECRETS
378 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
379 invocation_id=self.ldb_dc1.get_invocation_id(),
381 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
382 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
385 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
387 # This is important for Windows, because the entry won't have been
388 # updated in time if we don't have it. Even with this sleep, it only
389 # passes some of the time...
392 # Check that the entry in msDSRevealedUsers has been updated
393 (packed_attrs_3, unpacked_attrs_3) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
394 self._assert_attrlist_changed(unpacked_attrs_2, unpacked_attrs_3, expected_user_attributes)
396 # We should be able to delete the user
397 self.ldb_dc1.deleteuser(user_name)
399 res = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=self.computer_dn,
400 attrs=["msDS-RevealedUsers"])
401 self.assertFalse("msDS-RevealedUsers" in res[0])
403 def test_msDSRevealedUsers_pas(self):
405 If we provide a Partial Attribute Set when replicating to an RODC,
406 we should ignore it and replicate all of the secret attributes anyway
407 msDSRevealedUsers attribute.
409 rand = random.randint(1, 10000000)
410 expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
411 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
412 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
413 drsuapi.DRSUAPI_ATTID_unicodePwd,
414 drsuapi.DRSUAPI_ATTID_dBCSPwd]
415 pas_exceptions = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
416 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
417 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
418 drsuapi.DRSUAPI_ATTID_dBCSPwd]
420 # Add a user on DC1, add it to allowed password replication
421 # group, and replicate to RODC with EXOP_REPL_SECRETS
422 user_name = "test_rodcE_%s" % rand
423 password = "password12#"
424 user_dn = "CN=%s,%s" % (user_name, self.ou)
427 "objectclass": "user",
428 "sAMAccountName": user_name
431 # Store some secret on this user
432 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password, False, user_name)
434 self.ldb_dc1.add_remove_group_members("Allowed RODC Password Replication Group",
436 add_members_operation=True)
438 pas = drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb, exceptions=pas_exceptions)
439 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
440 invocation_id=self.ldb_dc1.get_invocation_id(),
442 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
443 partial_attribute_set=pas,
446 (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
448 # Make sure that we still replicate the secrets
449 for attribute in ctr.first_object.object.attribute_ctr.attributes:
450 if attribute.attid in pas_exceptions:
451 pas_exceptions.remove(attribute.attid)
452 for attribute in pas_exceptions:
453 self.fail("%d was not replicated even though the partial attribute set should be ignored."
456 # Check that the user has been added to msDSRevealedUsers
457 (packed_attrs_1, unpacked_attrs_1) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
459 def test_msDSRevealedUsers_using_other_RODC(self):
461 Ensure that the machine account is tied to the destination DSA.
463 # Create a new identical RODC with just the first letter missing
464 other_rodc_name = self.rodc_name[1:]
465 other_rodc_ctx = dc_join(server=self.ldb_dc1.host_dns_name(), creds=self.get_credentials(), lp=self.get_loadparm(),
466 site=self.site, netbios_name=other_rodc_name,
467 targetdir=None, domain=None, machinepass=self.rodc_pass)
468 self._create_rodc(other_rodc_ctx)
470 other_rodc_creds = Credentials()
471 other_rodc_creds.guess(other_rodc_ctx.lp)
472 other_rodc_creds.set_username(other_rodc_name+'$')
473 other_rodc_creds.set_password(self.rodc_pass)
475 (other_rodc_drs, other_rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, other_rodc_creds)
477 rand = random.randint(1, 10000000)
478 expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
479 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
480 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
481 drsuapi.DRSUAPI_ATTID_unicodePwd,
482 drsuapi.DRSUAPI_ATTID_dBCSPwd]
484 user_name = "test_rodcF_%s" % rand
485 user_dn = "CN=%s,%s" % (user_name, self.ou)
488 "objectclass": "user",
489 "sAMAccountName": user_name
492 # Store some secret on this user
493 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, 'penguin12#', False, user_name)
494 self.ldb_dc1.add_remove_group_members("Allowed RODC Password Replication Group",
496 add_members_operation=True)
498 req10 = self._getnc_req10(dest_dsa=str(other_rodc_ctx.ntds_guid),
499 invocation_id=self.ldb_dc1.get_invocation_id(),
501 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
502 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
507 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
508 self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
509 except WERRORError as (enum, estr):
510 self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
512 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
513 invocation_id=self.ldb_dc1.get_invocation_id(),
515 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
516 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
521 (level, ctr) = other_rodc_drs.DsGetNCChanges(other_rodc_drs_handle, 10, req10)
522 self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
523 except WERRORError as (enum, estr):
524 self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
526 def test_msDSRevealedUsers_local_deny_allow(self):
528 Ensure that the deny trumps allow, and we can modify these
529 attributes directly instead of the global groups.
531 This may fail on Windows due to tokenGroup calculation caching.
533 rand = random.randint(1, 10000000)
534 expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
535 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
536 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
537 drsuapi.DRSUAPI_ATTID_unicodePwd,
538 drsuapi.DRSUAPI_ATTID_dBCSPwd]
540 # Add a user on DC1, add it to allowed password replication
541 # group, and replicate to RODC with EXOP_REPL_SECRETS
542 user_name = "test_rodcF_%s" % rand
543 password = "password12#"
544 user_dn = "CN=%s,%s" % (user_name, self.ou)
547 "objectclass": "user",
548 "sAMAccountName": user_name
551 # Store some secret on this user
552 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password, False, user_name)
554 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
555 invocation_id=self.ldb_dc1.get_invocation_id(),
557 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
558 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
563 m.dn = ldb.Dn(self.ldb_dc1, self.computer_dn)
565 m["msDS-RevealOnDemandGroup"] = \
566 ldb.MessageElement(user_dn, ldb.FLAG_MOD_ADD,
567 "msDS-RevealOnDemandGroup")
568 self.ldb_dc1.modify(m)
570 # In local allow, should be success
572 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
574 self.fail("Should have succeeded when in local allow group")
576 self._assert_in_revealed_users(user_dn, expected_user_attributes)
578 (self.rodc_drs, self.rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, self.rodc_creds)
581 m.dn = ldb.Dn(self.ldb_dc1, self.computer_dn)
583 m["msDS-NeverRevealGroup"] = \
584 ldb.MessageElement(user_dn, ldb.FLAG_MOD_ADD,
585 "msDS-NeverRevealGroup")
586 self.ldb_dc1.modify(m)
588 # In local allow and deny, should be failure
590 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
591 self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
592 except WERRORError as (enum, estr):
593 self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
596 m.dn = ldb.Dn(self.ldb_dc1, self.computer_dn)
598 m["msDS-RevealOnDemandGroup"] = \
599 ldb.MessageElement(user_dn, ldb.FLAG_MOD_DELETE,
600 "msDS-RevealOnDemandGroup")
601 self.ldb_dc1.modify(m)
603 # In local deny, should be failure
604 (self.rodc_drs, self.rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, self.rodc_creds)
606 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
607 self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
608 except WERRORError as (enum, estr):
609 self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
611 def _assert_in_revealed_users(self, user_dn, attrlist):
612 res = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=self.computer_dn,
613 attrs=["msDS-RevealedUsers"])
614 revealed_users = res[0]["msDS-RevealedUsers"]
618 for attribute in revealed_users:
619 dsdb_dn = dsdb_Dn(self.ldb_dc1, attribute)
620 metadata = ndr_unpack(drsblobs.replPropertyMetaData1, dsdb_dn.get_bytes())
621 if user_dn in attribute:
622 unpacked_attrs.append(metadata)
623 packed_attrs.append(dsdb_dn.get_bytes())
624 actual_attrids.append(metadata.attid)
626 self.assertEquals(sorted(actual_attrids), sorted(attrlist))
628 return (packed_attrs, unpacked_attrs)
630 def _assert_attrlist_equals(self, list_1, list_2):
631 return self._assert_attrlist_changed(list_1, list_2, [], num_changes=0, expected_new_usn=False)
633 def _assert_attrlist_changed(self, list_1, list_2, changed_attributes, num_changes=1, expected_new_usn=True):
634 for i in range(len(list_2)):
635 self.assertEquals(list_1[i].attid, list_2[i].attid)
636 self.assertEquals(list_1[i].originating_invocation_id, list_2[i].originating_invocation_id)
637 self.assertEquals(list_1[i].version + num_changes, list_2[i].version)
640 self.assertTrue(list_1[i].originating_usn < list_2[i].originating_usn)
641 self.assertTrue(list_1[i].local_usn < list_2[i].local_usn)
643 self.assertEquals(list_1[i].originating_usn, list_2[i].originating_usn)
644 self.assertEquals(list_1[i].local_usn, list_2[i].local_usn)
646 if list_1[i].attid in changed_attributes:
647 # We do the changes too quickly, so unless we put sleeps
648 # inbetween calls, these remain the same. Checking the USNs
651 #self.assertTrue(list_1[i].originating_change_time < list_2[i].originating_change_time)
653 self.assertEquals(list_1[i].originating_change_time, list_2[i].originating_change_time)
656 def _create_rodc(self, ctx):
657 ctx.nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
658 ctx.full_nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
659 ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn)
661 ctx.never_reveal_sid = [ "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_DENY),
662 "<SID=%s>" % security.SID_BUILTIN_ADMINISTRATORS,
663 "<SID=%s>" % security.SID_BUILTIN_SERVER_OPERATORS,
664 "<SID=%s>" % security.SID_BUILTIN_BACKUP_OPERATORS,
665 "<SID=%s>" % security.SID_BUILTIN_ACCOUNT_OPERATORS ]
666 ctx.reveal_sid = "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_ALLOW)
668 mysid = ctx.get_mysid()
669 admin_dn = "<SID=%s>" % mysid
670 ctx.managedby = admin_dn
672 ctx.userAccountControl = (samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT |
673 samba.dsdb.UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
674 samba.dsdb.UF_PARTIAL_SECRETS_ACCOUNT)
676 ctx.connection_dn = "CN=RODC Connection (FRS),%s" % ctx.ntds_dn
677 ctx.secure_channel_type = misc.SEC_CHAN_RODC
679 ctx.replica_flags = (drsuapi.DRSUAPI_DRS_INIT_SYNC |
680 drsuapi.DRSUAPI_DRS_PER_SYNC |
681 drsuapi.DRSUAPI_DRS_GET_ANC |
682 drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
683 drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
685 ctx.join_add_objects()