2 # -*- coding: utf-8 -*-
4 # Test replication scenarios involving an RODC
6 # Copyright (C) Catalyst.Net Ltd. 2017
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 # export DC1=dc1_dns_name
25 # export DC2=dc1_dns_name [this is unused for the test, but it'll still try to connect]
26 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
27 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN repl_rodc -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
33 from ldb import SCOPE_BASE
35 from samba import WERRORError
36 from samba.join import DCJoinContext
37 from samba.dcerpc import drsuapi, misc, drsblobs, security
38 from samba.drs_utils import drs_DsBind, drs_Replicate
39 from samba.ndr import ndr_unpack, ndr_pack
40 from samba.common import dsdb_Dn
41 from samba.credentials import Credentials
def drs_get_rodc_partial_attribute_set(samdb, samdb1, exceptions=None):
    """Build the partial attribute set used for RODC replication requests.

    Every attributeSchema object found under the schema partition of
    ``samdb`` is considered.  Attributes flagged as not replicated or
    constructed (systemFlags), or flagged as an RODC attribute
    (searchFlags), are left out.  The remaining lDAPDisplayNames are
    translated to attids through ``samdb1``.

    :param samdb: connection whose schema partition is searched
    :param samdb1: connection used to map lDAPDisplayName to attid
    :param exceptions: optional collection of attids to leave out of the
        result (default: nothing is left out)
    :return: a version 1 drsuapi.DsPartialAttributeSet whose ``attids``
        are sorted and whose ``num_attids`` matches their count
    """
    # A fresh empty tuple per call avoids sharing a mutable default.
    if exceptions is None:
        exceptions = ()

    partial_attribute_set = drsuapi.DsPartialAttributeSet()
    partial_attribute_set.version = 1

    attids = []

    # Loop invariant: system flags that exclude an attribute outright.
    skip_system_flags = (samba.dsdb.DS_FLAG_ATTR_NOT_REPLICATED |
                         samba.dsdb.DS_FLAG_ATTR_IS_CONSTRUCTED)

    # The exact list of attids we send is quite critical.  Secret
    # attributes are deliberately not filtered out here; only the
    # flag checks below remove anything.
    schema_dn = samdb.get_schema_basedn()
    res = samdb.search(base=schema_dn, scope=ldb.SCOPE_SUBTREE,
                       expression="objectClass=attributeSchema",
                       attrs=["lDAPDisplayName", "systemFlags",
                              "searchFlags"])

    for r in res:
        ldap_display_name = r["lDAPDisplayName"][0]
        if "systemFlags" in r:
            if int(r["systemFlags"][0]) & skip_system_flags:
                continue
        if "searchFlags" in r:
            if int(r["searchFlags"][0]) & samba.dsdb.SEARCH_FLAG_RODC_ATTRIBUTE:
                continue
        try:
            attid = samdb1.get_attid_from_lDAPDisplayName(ldap_display_name)
        except Exception:
            # Best effort: an attribute that cannot be mapped to an attid
            # is skipped rather than aborting the whole request.
            continue
        if attid not in exceptions:
            attids.append(int(attid))

    # the attids do need to be sorted, or windows doesn't return
    # all the attributes we need
    attids.sort()
    partial_attribute_set.attids = attids
    partial_attribute_set.num_attids = len(attids)
    return partial_attribute_set
class DrsRodcTestCase(drs_base.DrsBaseTestCase):
    """Intended as a semi-black box test case for replication involving
       an RODC."""

    # WERROR code expected when a secret replication request is refused.
    _ERROR_DS_DRA_SECRETS_DENIED = 8630

    # Secret attributes expected to be recorded in msDS-RevealedUsers once
    # a user's secrets have been replicated.
    _SECRET_ATTIDS = (drsuapi.DRSUAPI_ATTID_lmPwdHistory,
                      drsuapi.DRSUAPI_ATTID_supplementalCredentials,
                      drsuapi.DRSUAPI_ATTID_ntPwdHistory,
                      drsuapi.DRSUAPI_ATTID_unicodePwd,
                      drsuapi.DRSUAPI_ATTID_dBCSPwd)

    _SECRETS_REPLICATED_MSG = ("Successfully replicated secrets to an RODC "
                               "that shouldn't have been replicated.")

    def setUp(self):
        """Create a test OU and an RODC account on DC1, then bind to DC1
        twice: once with the default credentials and once as the RODC."""
        super(DrsRodcTestCase, self).setUp()
        self.base_dn = self.ldb_dc1.get_default_basedn()

        self.ou = samba.tests.create_test_ou(self.ldb_dc1, "test_drs_rodc")
        self.allowed_group = "CN=Allowed RODC Password Replication Group,CN=Users,%s" % self.base_dn

        self.site = self.ldb_dc1.server_site_name()
        self.rodc_name = "TESTRODCDRS%s" % random.randint(1, 10000000)
        self.rodc_pass = "password12#"
        self.computer_dn = "CN=%s,OU=Domain Controllers,%s" % (self.rodc_name, self.base_dn)

        self.rodc_ctx = self._new_rodc_join_ctx(self.rodc_name)
        self._create_rodc(self.rodc_ctx)
        self.rodc_ctx.create_tmp_samdb()
        self.tmp_samdb = self.rodc_ctx.tmp_samdb

        self.rodc_creds = self._machine_creds(self.rodc_ctx.lp, self.rodc_name)

        (self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1)
        (self.rodc_drs, self.rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, self.rodc_creds)

    def tearDown(self):
        """Remove the RODC join created in setUp."""
        self.rodc_ctx.cleanup_old_join()
        super(DrsRodcTestCase, self).tearDown()

    # ------------------------------------------------------------------
    # helpers shared by the tests below
    # ------------------------------------------------------------------

    def _new_rodc_join_ctx(self, netbios_name):
        """Return a DCJoinContext against DC1 for an RODC called netbios_name."""
        return DCJoinContext(server=self.ldb_dc1.host_dns_name(),
                             creds=self.get_credentials(),
                             lp=self.get_loadparm(), site=self.site,
                             netbios_name=netbios_name,
                             targetdir=None, domain=None,
                             machinepass=self.rodc_pass)

    def _machine_creds(self, lp, netbios_name):
        """Return Credentials for the machine account '<netbios_name>$'
        using the shared RODC test password."""
        creds = Credentials()
        creds.guess(lp)
        creds.set_username(netbios_name + '$')
        creds.set_password(self.rodc_pass)
        return creds

    def _add_test_user(self, prefix, password, allow_replication=False):
        """Create a user with a random suffix in the test OU and set its
        password.

        :param prefix: user name prefix; a random number is appended
        :param password: password stored on the new user
        :param allow_replication: when True, also add the user to the
            Allowed RODC Password Replication Group
        :return: (user_name, user_dn)
        """
        user_name = "%s_%s" % (prefix, random.randint(1, 10000000))
        user_dn = "CN=%s,%s" % (user_name, self.ou)
        self.ldb_dc1.add({
            "dn": user_dn,
            "objectclass": "user",
            "sAMAccountName": user_name
        })

        # Store some secret on this user
        self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password, False, user_name)

        if allow_replication:
            self.ldb_dc1.add_remove_group_members("Allowed RODC Password Replication Group",
                                                  [user_name],
                                                  add_members_operation=True)
        return (user_name, user_dn)

    def _repl_secret_req10(self, user_dn, dest_ctx=None, partial_attribute_set=None):
        """Build a level 10 EXOP_REPL_SECRET request for user_dn.

        :param user_dn: DN of the object whose secrets are requested
        :param dest_ctx: join context whose ntds_guid is used as the
            destination DSA (default: the RODC created in setUp)
        :param partial_attribute_set: attribute set to send (default: the
            full RODC set from drs_get_rodc_partial_attribute_set)
        """
        if dest_ctx is None:
            dest_ctx = self.rodc_ctx
        if partial_attribute_set is None:
            partial_attribute_set = drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb)
        # NOTE(review): nc_dn_str, max_objects and replica_flags were not
        # visible in the reviewed text; values restored by inference,
        # confirm against the upstream test.
        return self._getnc_req10(dest_dsa=str(dest_ctx.ntds_guid),
                                 invocation_id=self.ldb_dc1.get_invocation_id(),
                                 nc_dn_str=user_dn,
                                 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
                                 partial_attribute_set=partial_attribute_set,
                                 max_objects=133,
                                 replica_flags=0)

    def _assert_secrets_denied(self, drs, drs_handle, req10):
        """Send req10 over the given binding and require that it is refused
        with ERROR_DS_DRA_SECRETS_DENIED."""
        try:
            drs.DsGetNCChanges(drs_handle, 10, req10)
        except WERRORError as e:
            (enum, estr) = e.args
            self.assertEqual(enum, self._ERROR_DS_DRA_SECRETS_DENIED)
        else:
            self.fail(self._SECRETS_REPLICATED_MSG)

    def _modify_rodc_computer(self, attr_name, value, flag):
        """Apply a single-attribute modify to the RODC's computer object."""
        m = ldb.Message()
        m.dn = ldb.Dn(self.ldb_dc1, self.computer_dn)
        m[attr_name] = ldb.MessageElement(value, flag, attr_name)
        self.ldb_dc1.modify(m)

    def _check_revealed_users_updated(self, user_prefix, drs, drs_handle):
        """Common body of the msDSRevealedUsers update tests.

        Replicates a user's secrets with the default credentials, changes
        the password, replicates again over (drs, drs_handle) and checks
        that the msDS-RevealedUsers metadata only changes on the second
        replication and disappears when the user is deleted.
        """
        # Local import: only this helper needs the module.
        import time

        password = "password12#"

        # Add a user on DC1, add it to allowed password replication
        # group, and replicate to RODC with EXOP_REPL_SECRETS
        (user_name, user_dn) = self._add_test_user(user_prefix, password,
                                                   allow_replication=True)

        req10 = self._repl_secret_req10(user_dn)
        self.drs.DsGetNCChanges(self.drs_handle, 10, req10)

        # Check that the user has been added to msDSRevealedUsers
        (_, unpacked_attrs_1) = self._assert_in_revealed_users(user_dn, self._SECRET_ATTIDS)

        # Change the user's password on DC1
        self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password + "1", False, user_name)

        # Nothing has been replicated since, so the entry must be unchanged
        (_, unpacked_attrs_2) = self._assert_in_revealed_users(user_dn, self._SECRET_ATTIDS)
        self._assert_attrlist_equals(unpacked_attrs_1, unpacked_attrs_2)

        # Replicate to RODC again with EXOP_REPL_SECRETS
        req10 = self._repl_secret_req10(user_dn)
        drs.DsGetNCChanges(drs_handle, 10, req10)

        # This is important for Windows, because the entry won't have been
        # updated in time if we don't have it. Even with this sleep, it only
        # passes some of the time...
        # NOTE(review): the sleep duration was not visible in the reviewed
        # text; confirm against the upstream test.
        time.sleep(5)

        # Check that the entry in msDSRevealedUsers has been updated
        (_, unpacked_attrs_3) = self._assert_in_revealed_users(user_dn, self._SECRET_ATTIDS)
        self._assert_attrlist_changed(unpacked_attrs_2, unpacked_attrs_3, self._SECRET_ATTIDS)

        # We should be able to delete the user
        self.ldb_dc1.deleteuser(user_name)

        res = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=self.computer_dn,
                                  attrs=["msDS-RevealedUsers"])
        self.assertNotIn("msDS-RevealedUsers", res[0])

    # ------------------------------------------------------------------
    # tests
    # ------------------------------------------------------------------

    def test_admin_repl_secrets(self):
        """
        When a secret attribute is set to be replicated to an RODC with the
        admin credentials, it should always replicate regardless of whether
        or not it's in the Allowed RODC Password Replication Group.
        """
        (_, user_dn) = self._add_test_user("test_rodcA", 'penguin12#')

        req10 = self._repl_secret_req10(user_dn)
        self.drs.DsGetNCChanges(self.drs_handle, 10, req10)

        # Check that the user has been added to msDSRevealedUsers
        self._assert_in_revealed_users(user_dn, self._SECRET_ATTIDS)

    def test_rodc_repl_secrets(self):
        """
        When a secret attribute is requested with the RODC account
        credentials for a user that is not in the Allowed RODC Password
        Replication Group, the request should be denied, and denied again
        when repeated. The same request sent with the admin credentials
        should replicate.
        """
        (_, user_dn) = self._add_test_user("test_rodcB", 'penguin12#')

        req10 = self._repl_secret_req10(user_dn)

        self._assert_secrets_denied(self.rodc_drs, self.rodc_drs_handle, req10)

        # send the same request again and we should get the same response
        self._assert_secrets_denied(self.rodc_drs, self.rodc_drs_handle, req10)

        # Retry with Administrator credentials, ignores password replication groups
        self.drs.DsGetNCChanges(self.drs_handle, 10, req10)

        # Check that the user has been added to msDSRevealedUsers
        self._assert_in_revealed_users(user_dn, self._SECRET_ATTIDS)

    def test_rodc_repl_secrets_follow_on_req(self):
        """
        Checks that an RODC can't subvert an existing (valid) GetNCChanges
        request to reveal secrets it shouldn't have access to.
        """
        # send an acceptable request that will match as many GUIDs as possible.
        # Here we set the SPECIAL_SECRET_PROCESSING flag so that the request gets accepted.
        # (On the server, this builds up the getnc_state->guids array)
        # NOTE(review): max_objects was not visible in the reviewed text;
        # value restored by inference, confirm against the upstream test.
        req8 = self._exop_req8(dest_dsa=str(self.rodc_ctx.ntds_guid),
                               invocation_id=self.ldb_dc1.get_invocation_id(),
                               nc_dn_str=self.ldb_dc1.domain_dn(),
                               exop=drsuapi.DRSUAPI_EXOP_NONE,
                               max_objects=1,
                               replica_flags=drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
        (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 8, req8)

        # Get the next replication chunk, but set REPL_SECRET this time. This
        # is following on the previous accepted request, but we've changed
        # exop to now request secrets. This request should fail
        req8 = self._exop_req8(dest_dsa=str(self.rodc_ctx.ntds_guid),
                               invocation_id=self.ldb_dc1.get_invocation_id(),
                               nc_dn_str=self.ldb_dc1.domain_dn(),
                               exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET)
        req8.highwatermark = ctr.new_highwatermark

        try:
            self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 8, req8)
        except RuntimeError:
            # Any RuntimeError from the call counts as the expected
            # refusal; the specific error code is not checked here.
            pass
        else:
            self.fail(self._SECRETS_REPLICATED_MSG)

    def test_msDSRevealedUsers_admin(self):
        """
        When a secret attribute is to be replicated to an RODC, the contents
        of the attribute should be added to the msDSRevealedUsers attribute
        of the computer object corresponding to the RODC.

        Both replications are requested with the admin credentials.
        """
        self._check_revealed_users_updated("test_rodcC", self.drs, self.drs_handle)

    def test_msDSRevealedUsers(self):
        """
        When a secret attribute is to be replicated to an RODC, the contents
        of the attribute should be added to the msDSRevealedUsers attribute
        of the computer object corresponding to the RODC.

        The second replication is requested with the RODC's own credentials.
        """
        self._check_revealed_users_updated("test_rodcD", self.rodc_drs, self.rodc_drs_handle)

    def test_msDSRevealedUsers_pas(self):
        """
        If we provide a Partial Attribute Set when replicating to an RODC,
        we should ignore it and replicate all of the secret attributes
        anyway, and record all of them in the msDSRevealedUsers attribute.
        """
        # Secret attids deliberately left out of the partial attribute set
        pas_exceptions = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
                          drsuapi.DRSUAPI_ATTID_supplementalCredentials,
                          drsuapi.DRSUAPI_ATTID_ntPwdHistory,
                          drsuapi.DRSUAPI_ATTID_dBCSPwd]

        # Add a user on DC1, add it to allowed password replication
        # group, and replicate to RODC with EXOP_REPL_SECRETS
        (_, user_dn) = self._add_test_user("test_rodcE", "password12#",
                                           allow_replication=True)

        pas = drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb, exceptions=pas_exceptions)
        req10 = self._repl_secret_req10(user_dn, partial_attribute_set=pas)
        (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)

        # Make sure that we still replicate the secrets
        replicated = set(attribute.attid for attribute in
                         ctr.first_object.object.attribute_ctr.attributes)
        for attid in pas_exceptions:
            if attid not in replicated:
                self.fail("%d was not replicated even though the partial attribute set should be ignored."
                          % attid)

        # Check that the user has been added to msDSRevealedUsers
        self._assert_in_revealed_users(user_dn, self._SECRET_ATTIDS)

    def test_msDSRevealedUsers_using_other_RODC(self):
        """
        Ensure that the machine account is tied to the destination DSA.
        """
        # Create a new identical RODC with just the first letter missing
        other_rodc_name = self.rodc_name[1:]
        other_rodc_ctx = self._new_rodc_join_ctx(other_rodc_name)
        # Registered before the join objects are created so they are
        # removed even if creation fails part-way.
        self.addCleanup(other_rodc_ctx.cleanup_old_join)
        self._create_rodc(other_rodc_ctx)

        other_rodc_creds = self._machine_creds(other_rodc_ctx.lp, other_rodc_name)
        (other_rodc_drs, other_rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, other_rodc_creds)

        (_, user_dn) = self._add_test_user("test_rodcF", 'penguin12#',
                                           allow_replication=True)

        # Our RODC's credentials, but the other RODC as destination DSA
        req10 = self._repl_secret_req10(user_dn, dest_ctx=other_rodc_ctx)
        self._assert_secrets_denied(self.rodc_drs, self.rodc_drs_handle, req10)

        # The other RODC's credentials, but our RODC as destination DSA
        req10 = self._repl_secret_req10(user_dn)
        self._assert_secrets_denied(other_rodc_drs, other_rodc_drs_handle, req10)

    def test_msDSRevealedUsers_local_deny_allow(self):
        """
        Ensure that the deny trumps allow, and we can modify these
        attributes directly instead of the global groups.

        This may fail on Windows due to tokenGroup calculation caching.
        """
        # Add a user on DC1; it is NOT added to the global allowed group,
        # only to the RODC's own allow/deny attributes below.
        (_, user_dn) = self._add_test_user("test_rodcF", "password12#")

        req10 = self._repl_secret_req10(user_dn)

        self._modify_rodc_computer("msDS-RevealOnDemandGroup", user_dn, ldb.FLAG_MOD_ADD)

        # In local allow, should be success
        try:
            self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
        except Exception:
            self.fail("Should have succeeded when in local allow group")

        self._assert_in_revealed_users(user_dn, self._SECRET_ATTIDS)

        # Bind again as the RODC before re-testing with changed groups
        (self.rodc_drs, self.rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, self.rodc_creds)

        self._modify_rodc_computer("msDS-NeverRevealGroup", user_dn, ldb.FLAG_MOD_ADD)

        # In local allow and deny, should be failure
        self._assert_secrets_denied(self.rodc_drs, self.rodc_drs_handle, req10)

        self._modify_rodc_computer("msDS-RevealOnDemandGroup", user_dn, ldb.FLAG_MOD_DELETE)

        # In local deny, should be failure
        (self.rodc_drs, self.rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, self.rodc_creds)
        self._assert_secrets_denied(self.rodc_drs, self.rodc_drs_handle, req10)

    # ------------------------------------------------------------------
    # assertion helpers
    # ------------------------------------------------------------------

    def _assert_in_revealed_users(self, user_dn, attrlist):
        """Assert that the RODC computer object's msDS-RevealedUsers holds
        exactly the attids in attrlist for user_dn.

        :param user_dn: DN string looked for inside each attribute value
        :param attrlist: attids expected for that user (any order)
        :return: (packed_attrs, unpacked_attrs) - the raw metadata blobs and
            the unpacked replPropertyMetaData1 entries for user_dn
        """
        res = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=self.computer_dn,
                                  attrs=["msDS-RevealedUsers"])
        self.assertIn("msDS-RevealedUsers", res[0])
        revealed_users = res[0]["msDS-RevealedUsers"]

        actual_attrids = []
        packed_attrs = []
        unpacked_attrs = []
        for attribute in revealed_users:
            # Decode once and reuse: the substring test below needs text,
            # comparing a str DN against raw bytes fails on Python 3.
            attribute = attribute.decode('utf8')
            dsdb_dn = dsdb_Dn(self.ldb_dc1, attribute)
            metadata = ndr_unpack(drsblobs.replPropertyMetaData1, dsdb_dn.get_bytes())
            if user_dn in attribute:
                unpacked_attrs.append(metadata)
                packed_attrs.append(dsdb_dn.get_bytes())
                actual_attrids.append(metadata.attid)

        self.assertEqual(sorted(actual_attrids), sorted(attrlist))

        return (packed_attrs, unpacked_attrs)

    def _assert_attrlist_equals(self, list_1, list_2):
        """Assert that two metadata lists are identical entry by entry
        (same version, USNs and change time)."""
        return self._assert_attrlist_changed(list_1, list_2, [], num_changes=0, expected_new_usn=False)

    def _assert_attrlist_changed(self, list_1, list_2, changed_attributes, num_changes=1, expected_new_usn=True):
        """Compare two metadata lists pairwise, list_1 being the older one.

        :param list_1: metadata entries before the change
        :param list_2: metadata entries after the change
        :param changed_attributes: attids whose originating_change_time is
            allowed to differ; for all others it must be equal
        :param num_changes: expected version increase per entry
        :param expected_new_usn: when True both USNs must have grown,
            otherwise they must be equal
        """
        self.assertEqual(len(list_1), len(list_2))

        for (old, new) in zip(list_1, list_2):
            self.assertEqual(old.attid, new.attid)
            self.assertEqual(old.originating_invocation_id, new.originating_invocation_id)
            self.assertEqual(old.version + num_changes, new.version)

            if expected_new_usn:
                self.assertLess(old.originating_usn, new.originating_usn)
                self.assertLess(old.local_usn, new.local_usn)
            else:
                self.assertEqual(old.originating_usn, new.originating_usn)
                self.assertEqual(old.local_usn, new.local_usn)

            # For changed attributes the change time is not checked: the
            # changes happen too quickly, so unless sleeps are put between
            # calls the times remain the same. The USN checks above cover
            # those entries.
            if old.attid not in changed_attributes:
                self.assertEqual(old.originating_change_time, new.originating_change_time)

    def _create_rodc(self, ctx):
        """Fill in the RODC-specific settings on the join context ctx and
        create its objects with ctx.join_add_objects()."""
        ctx.nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
        ctx.full_nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
        ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn)

        ctx.never_reveal_sid = ["<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_DENY),
                                "<SID=%s>" % security.SID_BUILTIN_ADMINISTRATORS,
                                "<SID=%s>" % security.SID_BUILTIN_SERVER_OPERATORS,
                                "<SID=%s>" % security.SID_BUILTIN_BACKUP_OPERATORS,
                                "<SID=%s>" % security.SID_BUILTIN_ACCOUNT_OPERATORS]
        ctx.reveal_sid = "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_ALLOW)

        # The account running the test manages the new RODC
        mysid = ctx.get_mysid()
        admin_dn = "<SID=%s>" % mysid
        ctx.managedby = admin_dn

        ctx.userAccountControl = (samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT |
                                  samba.dsdb.UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
                                  samba.dsdb.UF_PARTIAL_SECRETS_ACCOUNT)

        ctx.connection_dn = "CN=RODC Connection (FRS),%s" % ctx.ntds_dn
        ctx.secure_channel_type = misc.SEC_CHAN_RODC

        ctx.replica_flags = (drsuapi.DRSUAPI_DRS_INIT_SYNC |
                             drsuapi.DRSUAPI_DRS_PER_SYNC |
                             drsuapi.DRSUAPI_DRS_GET_ANC |
                             drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
                             drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)

        ctx.join_add_objects()