2 # -*- coding: utf-8 -*-
4 # Test replication scenarios involving an RODC
6 # Copyright (C) Catalyst.Net Ltd. 2017
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 # export DC1=dc1_dns_name
25 # export DC2=dc1_dns_name [this is unused for the test, but it'll still try to connect]
26 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
27 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN repl_rodc -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
33 from ldb import SCOPE_BASE
35 from samba import WERRORError
36 from samba.join import DCJoinContext
37 from samba.dcerpc import drsuapi, misc, drsblobs, security
38 from samba.drs_utils import drs_DsBind, drs_Replicate
39 from samba.ndr import ndr_unpack, ndr_pack
40 from samba.common import dsdb_Dn
41 from samba.credentials import Credentials
47 def drs_get_rodc_partial_attribute_set(samdb, samdb1, exceptions=[]):
48 '''get a list of attributes for RODC replication'''
49 partial_attribute_set = drsuapi.DsPartialAttributeSet()
50 partial_attribute_set.version = 1
54 # the exact list of attids we send is quite critical. Note that
55 # we do ask for the secret attributes, but set SPECIAL_SECRET_PROCESSING
57 schema_dn = samdb.get_schema_basedn()
58 res = samdb.search(base=schema_dn, scope=ldb.SCOPE_SUBTREE,
59 expression="objectClass=attributeSchema",
60 attrs=["lDAPDisplayName", "systemFlags",
64 ldap_display_name = r["lDAPDisplayName"][0]
65 if "systemFlags" in r:
66 system_flags = r["systemFlags"][0]
67 if (int(system_flags) & (samba.dsdb.DS_FLAG_ATTR_NOT_REPLICATED |
68 samba.dsdb.DS_FLAG_ATTR_IS_CONSTRUCTED)):
70 if "searchFlags" in r:
71 search_flags = r["searchFlags"][0]
72 if (int(search_flags) & samba.dsdb.SEARCH_FLAG_RODC_ATTRIBUTE):
75 attid = samdb1.get_attid_from_lDAPDisplayName(ldap_display_name)
76 if not attid in exceptions:
77 attids.append(int(attid))
81 # the attids do need to be sorted, or windows doesn't return
82 # all the attributes we need
84 partial_attribute_set.attids = attids
85 partial_attribute_set.num_attids = len(attids)
86 return partial_attribute_set
89 class DrsRodcTestCase(drs_base.DrsBaseTestCase):
90 """Intended as a semi-black box test case for replication involving
94 super(DrsRodcTestCase, self).setUp()
95 self.base_dn = self.ldb_dc1.get_default_basedn()
97 self.ou = samba.tests.create_test_ou(self.ldb_dc1, "test_drs_rodc")
98 self.allowed_group = "CN=Allowed RODC Password Replication Group,CN=Users,%s" % self.base_dn
100 self.site = self.ldb_dc1.server_site_name()
101 self.rodc_name = "TESTRODCDRS%s" % random.randint(1, 10000000)
102 self.rodc_pass = "password12#"
103 self.computer_dn = "CN=%s,OU=Domain Controllers,%s" % (self.rodc_name, self.base_dn)
105 self.rodc_ctx = DCJoinContext(server=self.ldb_dc1.host_dns_name(),
106 creds=self.get_credentials(),
107 lp=self.get_loadparm(), site=self.site,
108 netbios_name=self.rodc_name,
109 targetdir=None, domain=None,
110 machinepass=self.rodc_pass)
111 self._create_rodc(self.rodc_ctx)
112 self.rodc_ctx.create_tmp_samdb()
113 self.tmp_samdb = self.rodc_ctx.tmp_samdb
115 rodc_creds = Credentials()
116 rodc_creds.guess(self.rodc_ctx.lp)
117 rodc_creds.set_username(self.rodc_name + '$')
118 rodc_creds.set_password(self.rodc_pass)
119 self.rodc_creds = rodc_creds
121 (self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1)
122 (self.rodc_drs, self.rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, rodc_creds)
125 self.rodc_ctx.cleanup_old_join()
126 super(DrsRodcTestCase, self).tearDown()
128 def test_admin_repl_secrets(self):
130 When a secret attribute is set to be replicated to an RODC with the
131 admin credentials, it should always replicate regardless of whether
132 or not it's in the Allowed RODC Password Replication Group.
134 rand = random.randint(1, 10000000)
135 expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
136 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
137 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
138 drsuapi.DRSUAPI_ATTID_unicodePwd,
139 drsuapi.DRSUAPI_ATTID_dBCSPwd]
141 user_name = "test_rodcA_%s" % rand
142 user_dn = "CN=%s,%s" % (user_name, self.ou)
145 "objectclass": "user",
146 "sAMAccountName": user_name
149 # Store some secret on this user
150 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, 'penguin12#', False, user_name)
152 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
153 invocation_id=self.ldb_dc1.get_invocation_id(),
155 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
156 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
159 (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
161 # Check that the user has been added to msDSRevealedUsers
162 self._assert_in_revealed_users(user_dn, expected_user_attributes)
164 def test_rodc_repl_secrets(self):
166 When a secret attribute is set to be replicated to an RODC with
167 the RODC account credentials, it should not replicate if it's in
168 the Allowed RODC Password Replication Group. Once it is added to
169 the group, it should replicate.
171 rand = random.randint(1, 10000000)
172 expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
173 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
174 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
175 drsuapi.DRSUAPI_ATTID_unicodePwd,
176 drsuapi.DRSUAPI_ATTID_dBCSPwd]
178 user_name = "test_rodcB_%s" % rand
179 user_dn = "CN=%s,%s" % (user_name, self.ou)
182 "objectclass": "user",
183 "sAMAccountName": user_name
186 # Store some secret on this user
187 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, 'penguin12#', False, user_name)
189 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
190 invocation_id=self.ldb_dc1.get_invocation_id(),
192 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
193 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
198 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
199 self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
200 except WERRORError as e:
201 (enum, estr) = e.args
202 self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
204 # send the same request again and we should get the same response
206 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
207 self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
208 except WERRORError as e1:
209 (enum, estr) = e1.args
210 self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
212 # Retry with Administrator credentials, ignores password replication groups
213 (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
215 # Check that the user has been added to msDSRevealedUsers
216 self._assert_in_revealed_users(user_dn, expected_user_attributes)
218 def test_rodc_repl_secrets_follow_on_req(self):
220 Checks that an RODC can't subvert an existing (valid) GetNCChanges
221 request to reveal secrets it shouldn't have access to.
224 # send an acceptable request that will match as many GUIDs as possible.
225 # Here we set the SPECIAL_SECRET_PROCESSING flag so that the request gets accepted.
226 # (On the server, this builds up the getnc_state->guids array)
227 req8 = self._exop_req8(dest_dsa=str(self.rodc_ctx.ntds_guid),
228 invocation_id=self.ldb_dc1.get_invocation_id(),
229 nc_dn_str=self.ldb_dc1.domain_dn(),
230 exop=drsuapi.DRSUAPI_EXOP_NONE,
232 replica_flags=drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
233 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 8, req8)
235 # Get the next replication chunk, but set REPL_SECRET this time. This
236 # is following on the the previous accepted request, but we've changed
237 # exop to now request secrets. This request should fail
239 req8 = self._exop_req8(dest_dsa=str(self.rodc_ctx.ntds_guid),
240 invocation_id=self.ldb_dc1.get_invocation_id(),
241 nc_dn_str=self.ldb_dc1.domain_dn(),
242 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET)
243 req8.highwatermark = ctr.new_highwatermark
245 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 8, req8)
247 self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
248 except RuntimeError as e2:
249 (enum, estr) = e2.args
252 def test_msDSRevealedUsers_admin(self):
254 When a secret attribute is to be replicated to an RODC, the contents
255 of the attribute should be added to the msDSRevealedUsers attribute
256 of the computer object corresponding to the RODC.
259 rand = random.randint(1, 10000000)
260 expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
261 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
262 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
263 drsuapi.DRSUAPI_ATTID_unicodePwd,
264 drsuapi.DRSUAPI_ATTID_dBCSPwd]
266 # Add a user on DC1, add it to allowed password replication
267 # group, and replicate to RODC with EXOP_REPL_SECRETS
268 user_name = "test_rodcC_%s" % rand
269 password = "password12#"
270 user_dn = "CN=%s,%s" % (user_name, self.ou)
273 "objectclass": "user",
274 "sAMAccountName": user_name
277 # Store some secret on this user
278 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password, False, user_name)
280 self.ldb_dc1.add_remove_group_members("Allowed RODC Password Replication Group",
282 add_members_operation=True)
284 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
285 invocation_id=self.ldb_dc1.get_invocation_id(),
287 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
288 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
291 (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
293 # Check that the user has been added to msDSRevealedUsers
294 (packed_attrs_1, unpacked_attrs_1) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
296 # Change the user's password on DC1
297 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password + "1", False, user_name)
299 (packed_attrs_2, unpacked_attrs_2) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
300 self._assert_attrlist_equals(unpacked_attrs_1, unpacked_attrs_2)
302 # Replicate to RODC again with EXOP_REPL_SECRETS
303 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
304 invocation_id=self.ldb_dc1.get_invocation_id(),
306 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
307 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
310 (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
312 # This is important for Windows, because the entry won't have been
313 # updated in time if we don't have it. Even with this sleep, it only
314 # passes some of the time...
317 # Check that the entry in msDSRevealedUsers has been updated
318 (packed_attrs_3, unpacked_attrs_3) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
319 self._assert_attrlist_changed(unpacked_attrs_2, unpacked_attrs_3, expected_user_attributes)
321 # We should be able to delete the user
322 self.ldb_dc1.deleteuser(user_name)
324 res = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=self.computer_dn,
325 attrs=["msDS-RevealedUsers"])
326 self.assertFalse("msDS-RevealedUsers" in res[0])
328 def test_msDSRevealedUsers(self):
330 When a secret attribute is to be replicated to an RODC, the contents
331 of the attribute should be added to the msDSRevealedUsers attribute
332 of the computer object corresponding to the RODC.
335 rand = random.randint(1, 10000000)
336 expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
337 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
338 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
339 drsuapi.DRSUAPI_ATTID_unicodePwd,
340 drsuapi.DRSUAPI_ATTID_dBCSPwd]
342 # Add a user on DC1, add it to allowed password replication
343 # group, and replicate to RODC with EXOP_REPL_SECRETS
344 user_name = "test_rodcD_%s" % rand
345 password = "password12#"
346 user_dn = "CN=%s,%s" % (user_name, self.ou)
349 "objectclass": "user",
350 "sAMAccountName": user_name
353 # Store some secret on this user
354 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password, False, user_name)
356 self.ldb_dc1.add_remove_group_members("Allowed RODC Password Replication Group",
358 add_members_operation=True)
360 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
361 invocation_id=self.ldb_dc1.get_invocation_id(),
363 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
364 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
367 (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
369 # Check that the user has been added to msDSRevealedUsers
370 (packed_attrs_1, unpacked_attrs_1) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
372 # Change the user's password on DC1
373 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password + "1", False, user_name)
375 (packed_attrs_2, unpacked_attrs_2) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
376 self._assert_attrlist_equals(unpacked_attrs_1, unpacked_attrs_2)
378 # Replicate to RODC again with EXOP_REPL_SECRETS
379 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
380 invocation_id=self.ldb_dc1.get_invocation_id(),
382 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
383 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
386 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
388 # This is important for Windows, because the entry won't have been
389 # updated in time if we don't have it. Even with this sleep, it only
390 # passes some of the time...
393 # Check that the entry in msDSRevealedUsers has been updated
394 (packed_attrs_3, unpacked_attrs_3) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
395 self._assert_attrlist_changed(unpacked_attrs_2, unpacked_attrs_3, expected_user_attributes)
397 # We should be able to delete the user
398 self.ldb_dc1.deleteuser(user_name)
400 res = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=self.computer_dn,
401 attrs=["msDS-RevealedUsers"])
402 self.assertFalse("msDS-RevealedUsers" in res[0])
404 def test_msDSRevealedUsers_pas(self):
406 If we provide a Partial Attribute Set when replicating to an RODC,
407 we should ignore it and replicate all of the secret attributes anyway
408 msDSRevealedUsers attribute.
410 rand = random.randint(1, 10000000)
411 expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
412 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
413 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
414 drsuapi.DRSUAPI_ATTID_unicodePwd,
415 drsuapi.DRSUAPI_ATTID_dBCSPwd]
416 pas_exceptions = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
417 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
418 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
419 drsuapi.DRSUAPI_ATTID_dBCSPwd]
421 # Add a user on DC1, add it to allowed password replication
422 # group, and replicate to RODC with EXOP_REPL_SECRETS
423 user_name = "test_rodcE_%s" % rand
424 password = "password12#"
425 user_dn = "CN=%s,%s" % (user_name, self.ou)
428 "objectclass": "user",
429 "sAMAccountName": user_name
432 # Store some secret on this user
433 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password, False, user_name)
435 self.ldb_dc1.add_remove_group_members("Allowed RODC Password Replication Group",
437 add_members_operation=True)
439 pas = drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb, exceptions=pas_exceptions)
440 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
441 invocation_id=self.ldb_dc1.get_invocation_id(),
443 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
444 partial_attribute_set=pas,
447 (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
449 # Make sure that we still replicate the secrets
450 for attribute in ctr.first_object.object.attribute_ctr.attributes:
451 if attribute.attid in pas_exceptions:
452 pas_exceptions.remove(attribute.attid)
453 for attribute in pas_exceptions:
454 self.fail("%d was not replicated even though the partial attribute set should be ignored."
457 # Check that the user has been added to msDSRevealedUsers
458 (packed_attrs_1, unpacked_attrs_1) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
460 def test_msDSRevealedUsers_using_other_RODC(self):
462 Ensure that the machine account is tied to the destination DSA.
464 # Create a new identical RODC with just the first letter missing
465 other_rodc_name = self.rodc_name[1:]
466 other_rodc_ctx = DCJoinContext(server=self.ldb_dc1.host_dns_name(),
467 creds=self.get_credentials(),
468 lp=self.get_loadparm(), site=self.site,
469 netbios_name=other_rodc_name,
470 targetdir=None, domain=None,
471 machinepass=self.rodc_pass)
472 self._create_rodc(other_rodc_ctx)
474 other_rodc_creds = Credentials()
475 other_rodc_creds.guess(other_rodc_ctx.lp)
476 other_rodc_creds.set_username(other_rodc_name + '$')
477 other_rodc_creds.set_password(self.rodc_pass)
479 (other_rodc_drs, other_rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, other_rodc_creds)
481 rand = random.randint(1, 10000000)
482 expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
483 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
484 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
485 drsuapi.DRSUAPI_ATTID_unicodePwd,
486 drsuapi.DRSUAPI_ATTID_dBCSPwd]
488 user_name = "test_rodcF_%s" % rand
489 user_dn = "CN=%s,%s" % (user_name, self.ou)
492 "objectclass": "user",
493 "sAMAccountName": user_name
496 # Store some secret on this user
497 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, 'penguin12#', False, user_name)
498 self.ldb_dc1.add_remove_group_members("Allowed RODC Password Replication Group",
500 add_members_operation=True)
502 req10 = self._getnc_req10(dest_dsa=str(other_rodc_ctx.ntds_guid),
503 invocation_id=self.ldb_dc1.get_invocation_id(),
505 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
506 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
511 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
512 self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
513 except WERRORError as e3:
514 (enum, estr) = e3.args
515 self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
517 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
518 invocation_id=self.ldb_dc1.get_invocation_id(),
520 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
521 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
526 (level, ctr) = other_rodc_drs.DsGetNCChanges(other_rodc_drs_handle, 10, req10)
527 self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
528 except WERRORError as e4:
529 (enum, estr) = e4.args
530 self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
532 def test_msDSRevealedUsers_local_deny_allow(self):
534 Ensure that the deny trumps allow, and we can modify these
535 attributes directly instead of the global groups.
537 This may fail on Windows due to tokenGroup calculation caching.
539 rand = random.randint(1, 10000000)
540 expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
541 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
542 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
543 drsuapi.DRSUAPI_ATTID_unicodePwd,
544 drsuapi.DRSUAPI_ATTID_dBCSPwd]
546 # Add a user on DC1, add it to allowed password replication
547 # group, and replicate to RODC with EXOP_REPL_SECRETS
548 user_name = "test_rodcF_%s" % rand
549 password = "password12#"
550 user_dn = "CN=%s,%s" % (user_name, self.ou)
553 "objectclass": "user",
554 "sAMAccountName": user_name
557 # Store some secret on this user
558 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password, False, user_name)
560 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
561 invocation_id=self.ldb_dc1.get_invocation_id(),
563 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
564 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
569 m.dn = ldb.Dn(self.ldb_dc1, self.computer_dn)
571 m["msDS-RevealOnDemandGroup"] = \
572 ldb.MessageElement(user_dn, ldb.FLAG_MOD_ADD,
573 "msDS-RevealOnDemandGroup")
574 self.ldb_dc1.modify(m)
576 # In local allow, should be success
578 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
580 self.fail("Should have succeeded when in local allow group")
582 self._assert_in_revealed_users(user_dn, expected_user_attributes)
584 (self.rodc_drs, self.rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, self.rodc_creds)
587 m.dn = ldb.Dn(self.ldb_dc1, self.computer_dn)
589 m["msDS-NeverRevealGroup"] = \
590 ldb.MessageElement(user_dn, ldb.FLAG_MOD_ADD,
591 "msDS-NeverRevealGroup")
592 self.ldb_dc1.modify(m)
594 # In local allow and deny, should be failure
596 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
597 self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
598 except WERRORError as e5:
599 (enum, estr) = e5.args
600 self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
603 m.dn = ldb.Dn(self.ldb_dc1, self.computer_dn)
605 m["msDS-RevealOnDemandGroup"] = \
606 ldb.MessageElement(user_dn, ldb.FLAG_MOD_DELETE,
607 "msDS-RevealOnDemandGroup")
608 self.ldb_dc1.modify(m)
610 # In local deny, should be failure
611 (self.rodc_drs, self.rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, self.rodc_creds)
613 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
614 self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
615 except WERRORError as e6:
616 (enum, estr) = e6.args
617 self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
619 def _assert_in_revealed_users(self, user_dn, attrlist):
620 res = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=self.computer_dn,
621 attrs=["msDS-RevealedUsers"])
622 revealed_users = res[0]["msDS-RevealedUsers"]
626 for attribute in revealed_users:
627 dsdb_dn = dsdb_Dn(self.ldb_dc1, attribute.decode('utf8'))
628 metadata = ndr_unpack(drsblobs.replPropertyMetaData1, dsdb_dn.get_bytes())
629 if user_dn in attribute:
630 unpacked_attrs.append(metadata)
631 packed_attrs.append(dsdb_dn.get_bytes())
632 actual_attrids.append(metadata.attid)
634 self.assertEquals(sorted(actual_attrids), sorted(attrlist))
636 return (packed_attrs, unpacked_attrs)
638 def _assert_attrlist_equals(self, list_1, list_2):
639 return self._assert_attrlist_changed(list_1, list_2, [], num_changes=0, expected_new_usn=False)
641 def _assert_attrlist_changed(self, list_1, list_2, changed_attributes, num_changes=1, expected_new_usn=True):
642 for i in range(len(list_2)):
643 self.assertEquals(list_1[i].attid, list_2[i].attid)
644 self.assertEquals(list_1[i].originating_invocation_id, list_2[i].originating_invocation_id)
645 self.assertEquals(list_1[i].version + num_changes, list_2[i].version)
648 self.assertTrue(list_1[i].originating_usn < list_2[i].originating_usn)
649 self.assertTrue(list_1[i].local_usn < list_2[i].local_usn)
651 self.assertEquals(list_1[i].originating_usn, list_2[i].originating_usn)
652 self.assertEquals(list_1[i].local_usn, list_2[i].local_usn)
654 if list_1[i].attid in changed_attributes:
655 # We do the changes too quickly, so unless we put sleeps
656 # inbetween calls, these remain the same. Checking the USNs
659 #self.assertTrue(list_1[i].originating_change_time < list_2[i].originating_change_time)
661 self.assertEquals(list_1[i].originating_change_time, list_2[i].originating_change_time)
663 def _create_rodc(self, ctx):
664 ctx.nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
665 ctx.full_nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
666 ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn)
668 ctx.never_reveal_sid = ["<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_DENY),
669 "<SID=%s>" % security.SID_BUILTIN_ADMINISTRATORS,
670 "<SID=%s>" % security.SID_BUILTIN_SERVER_OPERATORS,
671 "<SID=%s>" % security.SID_BUILTIN_BACKUP_OPERATORS,
672 "<SID=%s>" % security.SID_BUILTIN_ACCOUNT_OPERATORS]
673 ctx.reveal_sid = "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_ALLOW)
675 mysid = ctx.get_mysid()
676 admin_dn = "<SID=%s>" % mysid
677 ctx.managedby = admin_dn
679 ctx.userAccountControl = (samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT |
680 samba.dsdb.UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
681 samba.dsdb.UF_PARTIAL_SECRETS_ACCOUNT)
683 ctx.connection_dn = "CN=RODC Connection (FRS),%s" % ctx.ntds_dn
684 ctx.secure_channel_type = misc.SEC_CHAN_RODC
686 ctx.replica_flags = (drsuapi.DRSUAPI_DRS_INIT_SYNC |
687 drsuapi.DRSUAPI_DRS_PER_SYNC |
688 drsuapi.DRSUAPI_DRS_GET_ANC |
689 drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
690 drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
692 ctx.join_add_objects()