2 # -*- coding: utf-8 -*-
4 # Test replication scenarios involving an RODC
6 # Copyright (C) Catalyst.Net Ltd. 2017
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 # export DC1=dc1_dns_name
25 # export DC2=dc1_dns_name [this is unused for the test, but it'll still try to connect]
26 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
27 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN repl_rodc -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
33 from ldb import SCOPE_BASE
35 from samba import WERRORError
36 from samba.join import DCJoinContext
37 from samba.dcerpc import drsuapi, misc, drsblobs, security
38 from samba.drs_utils import drs_DsBind, drs_Replicate
39 from samba.ndr import ndr_unpack, ndr_pack
40 from samba.common import dsdb_Dn
41 from samba.credentials import Credentials
46 def drs_get_rodc_partial_attribute_set(samdb, samdb1, exceptions=[]):
47 '''get a list of attributes for RODC replication'''
48 partial_attribute_set = drsuapi.DsPartialAttributeSet()
49 partial_attribute_set.version = 1
53 # the exact list of attids we send is quite critical. Note that
54 # we do ask for the secret attributes, but set SPECIAL_SECRET_PROCESSING
56 schema_dn = samdb.get_schema_basedn()
57 res = samdb.search(base=schema_dn, scope=ldb.SCOPE_SUBTREE,
58 expression="objectClass=attributeSchema",
59 attrs=["lDAPDisplayName", "systemFlags",
63 ldap_display_name = r["lDAPDisplayName"][0]
64 if "systemFlags" in r:
65 system_flags = r["systemFlags"][0]
66 if (int(system_flags) & (samba.dsdb.DS_FLAG_ATTR_NOT_REPLICATED |
67 samba.dsdb.DS_FLAG_ATTR_IS_CONSTRUCTED)):
69 if "searchFlags" in r:
70 search_flags = r["searchFlags"][0]
71 if (int(search_flags) & samba.dsdb.SEARCH_FLAG_RODC_ATTRIBUTE):
74 attid = samdb1.get_attid_from_lDAPDisplayName(ldap_display_name)
75 if not attid in exceptions:
76 attids.append(int(attid))
80 # the attids do need to be sorted, or windows doesn't return
81 # all the attributes we need
83 partial_attribute_set.attids = attids
84 partial_attribute_set.num_attids = len(attids)
85 return partial_attribute_set
87 class DrsRodcTestCase(drs_base.DrsBaseTestCase):
88 """Intended as a semi-black box test case for replication involving
92 super(DrsRodcTestCase, self).setUp()
93 self.base_dn = self.ldb_dc1.get_default_basedn()
95 self.ou = samba.tests.create_test_ou(self.ldb_dc1, "test_drs_rodc")
96 self.allowed_group = "CN=Allowed RODC Password Replication Group,CN=Users,%s" % self.base_dn
98 self.site = self.ldb_dc1.server_site_name()
99 self.rodc_name = "TESTRODCDRS%s" % random.randint(1, 10000000)
100 self.rodc_pass = "password12#"
101 self.computer_dn = "CN=%s,OU=Domain Controllers,%s" % (self.rodc_name, self.base_dn)
104 self.rodc_ctx = DCJoinContext(server=self.ldb_dc1.host_dns_name(),
105 creds=self.get_credentials(),
106 lp=self.get_loadparm(), site=self.site,
107 netbios_name=self.rodc_name,
108 targetdir=None, domain=None,
109 machinepass=self.rodc_pass)
110 self._create_rodc(self.rodc_ctx)
111 self.rodc_ctx.create_tmp_samdb()
112 self.tmp_samdb = self.rodc_ctx.tmp_samdb
114 rodc_creds = Credentials()
115 rodc_creds.guess(self.rodc_ctx.lp)
116 rodc_creds.set_username(self.rodc_name+'$')
117 rodc_creds.set_password(self.rodc_pass)
118 self.rodc_creds = rodc_creds
120 (self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1)
121 (self.rodc_drs, self.rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, rodc_creds)
124 self.rodc_ctx.cleanup_old_join()
125 super(DrsRodcTestCase, self).tearDown()
127 def test_admin_repl_secrets(self):
129 When a secret attribute is set to be replicated to an RODC with the
130 admin credentials, it should always replicate regardless of whether
131 or not it's in the Allowed RODC Password Replication Group.
133 rand = random.randint(1, 10000000)
134 expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
135 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
136 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
137 drsuapi.DRSUAPI_ATTID_unicodePwd,
138 drsuapi.DRSUAPI_ATTID_dBCSPwd]
140 user_name = "test_rodcA_%s" % rand
141 user_dn = "CN=%s,%s" % (user_name, self.ou)
144 "objectclass": "user",
145 "sAMAccountName": user_name
148 # Store some secret on this user
149 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, 'penguin12#', False, user_name)
151 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
152 invocation_id=self.ldb_dc1.get_invocation_id(),
154 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
155 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
158 (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
160 # Check that the user has been added to msDSRevealedUsers
161 self._assert_in_revealed_users(user_dn, expected_user_attributes)
163 def test_rodc_repl_secrets(self):
165 When a secret attribute is set to be replicated to an RODC with
166 the RODC account credentials, it should not replicate if it's in
167 the Allowed RODC Password Replication Group. Once it is added to
168 the group, it should replicate.
170 rand = random.randint(1, 10000000)
171 expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
172 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
173 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
174 drsuapi.DRSUAPI_ATTID_unicodePwd,
175 drsuapi.DRSUAPI_ATTID_dBCSPwd]
177 user_name = "test_rodcB_%s" % rand
178 user_dn = "CN=%s,%s" % (user_name, self.ou)
181 "objectclass": "user",
182 "sAMAccountName": user_name
185 # Store some secret on this user
186 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, 'penguin12#', False, user_name)
188 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
189 invocation_id=self.ldb_dc1.get_invocation_id(),
191 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
192 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
197 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
198 self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
199 except WERRORError as e:
200 (enum, estr) = e.args
201 self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
203 # send the same request again and we should get the same response
205 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
206 self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
207 except WERRORError as e1:
208 (enum, estr) = e1.args
209 self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
211 # Retry with Administrator credentials, ignores password replication groups
212 (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
214 # Check that the user has been added to msDSRevealedUsers
215 self._assert_in_revealed_users(user_dn, expected_user_attributes)
217 def test_rodc_repl_secrets_follow_on_req(self):
219 Checks that an RODC can't subvert an existing (valid) GetNCChanges
220 request to reveal secrets it shouldn't have access to.
223 # send an acceptable request that will match as many GUIDs as possible.
224 # Here we set the SPECIAL_SECRET_PROCESSING flag so that the request gets accepted.
225 # (On the server, this builds up the getnc_state->guids array)
226 req8 = self._exop_req8(dest_dsa=str(self.rodc_ctx.ntds_guid),
227 invocation_id=self.ldb_dc1.get_invocation_id(),
228 nc_dn_str=self.ldb_dc1.domain_dn(),
229 exop=drsuapi.DRSUAPI_EXOP_NONE,
231 replica_flags=drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
232 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 8, req8)
234 # Get the next replication chunk, but set REPL_SECRET this time. This
235 # is following on the the previous accepted request, but we've changed
236 # exop to now request secrets. This request should fail
238 req8 = self._exop_req8(dest_dsa=str(self.rodc_ctx.ntds_guid),
239 invocation_id=self.ldb_dc1.get_invocation_id(),
240 nc_dn_str=self.ldb_dc1.domain_dn(),
241 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET)
242 req8.highwatermark = ctr.new_highwatermark
244 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 8, req8)
246 self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
247 except RuntimeError as e2:
248 (enum, estr) = e2.args
251 def test_msDSRevealedUsers_admin(self):
253 When a secret attribute is to be replicated to an RODC, the contents
254 of the attribute should be added to the msDSRevealedUsers attribute
255 of the computer object corresponding to the RODC.
258 rand = random.randint(1, 10000000)
259 expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
260 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
261 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
262 drsuapi.DRSUAPI_ATTID_unicodePwd,
263 drsuapi.DRSUAPI_ATTID_dBCSPwd]
265 # Add a user on DC1, add it to allowed password replication
266 # group, and replicate to RODC with EXOP_REPL_SECRETS
267 user_name = "test_rodcC_%s" % rand
268 password = "password12#"
269 user_dn = "CN=%s,%s" % (user_name, self.ou)
272 "objectclass": "user",
273 "sAMAccountName": user_name
276 # Store some secret on this user
277 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password, False, user_name)
279 self.ldb_dc1.add_remove_group_members("Allowed RODC Password Replication Group",
281 add_members_operation=True)
283 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
284 invocation_id=self.ldb_dc1.get_invocation_id(),
286 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
287 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
290 (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
292 # Check that the user has been added to msDSRevealedUsers
293 (packed_attrs_1, unpacked_attrs_1) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
295 # Change the user's password on DC1
296 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password+"1", False, user_name)
298 (packed_attrs_2, unpacked_attrs_2) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
299 self._assert_attrlist_equals(unpacked_attrs_1, unpacked_attrs_2)
301 # Replicate to RODC again with EXOP_REPL_SECRETS
302 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
303 invocation_id=self.ldb_dc1.get_invocation_id(),
305 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
306 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
309 (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
311 # This is important for Windows, because the entry won't have been
312 # updated in time if we don't have it. Even with this sleep, it only
313 # passes some of the time...
316 # Check that the entry in msDSRevealedUsers has been updated
317 (packed_attrs_3, unpacked_attrs_3) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
318 self._assert_attrlist_changed(unpacked_attrs_2, unpacked_attrs_3, expected_user_attributes)
320 # We should be able to delete the user
321 self.ldb_dc1.deleteuser(user_name)
323 res = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=self.computer_dn,
324 attrs=["msDS-RevealedUsers"])
325 self.assertFalse("msDS-RevealedUsers" in res[0])
327 def test_msDSRevealedUsers(self):
329 When a secret attribute is to be replicated to an RODC, the contents
330 of the attribute should be added to the msDSRevealedUsers attribute
331 of the computer object corresponding to the RODC.
334 rand = random.randint(1, 10000000)
335 expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
336 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
337 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
338 drsuapi.DRSUAPI_ATTID_unicodePwd,
339 drsuapi.DRSUAPI_ATTID_dBCSPwd]
341 # Add a user on DC1, add it to allowed password replication
342 # group, and replicate to RODC with EXOP_REPL_SECRETS
343 user_name = "test_rodcD_%s" % rand
344 password = "password12#"
345 user_dn = "CN=%s,%s" % (user_name, self.ou)
348 "objectclass": "user",
349 "sAMAccountName": user_name
352 # Store some secret on this user
353 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password, False, user_name)
355 self.ldb_dc1.add_remove_group_members("Allowed RODC Password Replication Group",
357 add_members_operation=True)
359 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
360 invocation_id=self.ldb_dc1.get_invocation_id(),
362 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
363 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
366 (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
368 # Check that the user has been added to msDSRevealedUsers
369 (packed_attrs_1, unpacked_attrs_1) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
371 # Change the user's password on DC1
372 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password+"1", False, user_name)
374 (packed_attrs_2, unpacked_attrs_2) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
375 self._assert_attrlist_equals(unpacked_attrs_1, unpacked_attrs_2)
377 # Replicate to RODC again with EXOP_REPL_SECRETS
378 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
379 invocation_id=self.ldb_dc1.get_invocation_id(),
381 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
382 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
385 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
387 # This is important for Windows, because the entry won't have been
388 # updated in time if we don't have it. Even with this sleep, it only
389 # passes some of the time...
392 # Check that the entry in msDSRevealedUsers has been updated
393 (packed_attrs_3, unpacked_attrs_3) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
394 self._assert_attrlist_changed(unpacked_attrs_2, unpacked_attrs_3, expected_user_attributes)
396 # We should be able to delete the user
397 self.ldb_dc1.deleteuser(user_name)
399 res = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=self.computer_dn,
400 attrs=["msDS-RevealedUsers"])
401 self.assertFalse("msDS-RevealedUsers" in res[0])
403 def test_msDSRevealedUsers_pas(self):
405 If we provide a Partial Attribute Set when replicating to an RODC,
406 we should ignore it and replicate all of the secret attributes anyway
407 msDSRevealedUsers attribute.
409 rand = random.randint(1, 10000000)
410 expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
411 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
412 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
413 drsuapi.DRSUAPI_ATTID_unicodePwd,
414 drsuapi.DRSUAPI_ATTID_dBCSPwd]
415 pas_exceptions = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
416 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
417 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
418 drsuapi.DRSUAPI_ATTID_dBCSPwd]
420 # Add a user on DC1, add it to allowed password replication
421 # group, and replicate to RODC with EXOP_REPL_SECRETS
422 user_name = "test_rodcE_%s" % rand
423 password = "password12#"
424 user_dn = "CN=%s,%s" % (user_name, self.ou)
427 "objectclass": "user",
428 "sAMAccountName": user_name
431 # Store some secret on this user
432 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password, False, user_name)
434 self.ldb_dc1.add_remove_group_members("Allowed RODC Password Replication Group",
436 add_members_operation=True)
438 pas = drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb, exceptions=pas_exceptions)
439 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
440 invocation_id=self.ldb_dc1.get_invocation_id(),
442 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
443 partial_attribute_set=pas,
446 (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
448 # Make sure that we still replicate the secrets
449 for attribute in ctr.first_object.object.attribute_ctr.attributes:
450 if attribute.attid in pas_exceptions:
451 pas_exceptions.remove(attribute.attid)
452 for attribute in pas_exceptions:
453 self.fail("%d was not replicated even though the partial attribute set should be ignored."
456 # Check that the user has been added to msDSRevealedUsers
457 (packed_attrs_1, unpacked_attrs_1) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
459 def test_msDSRevealedUsers_using_other_RODC(self):
461 Ensure that the machine account is tied to the destination DSA.
463 # Create a new identical RODC with just the first letter missing
464 other_rodc_name = self.rodc_name[1:]
465 other_rodc_ctx = DCJoinContext(server=self.ldb_dc1.host_dns_name(),
466 creds=self.get_credentials(),
467 lp=self.get_loadparm(), site=self.site,
468 netbios_name=other_rodc_name,
469 targetdir=None, domain=None,
470 machinepass=self.rodc_pass)
471 self._create_rodc(other_rodc_ctx)
473 other_rodc_creds = Credentials()
474 other_rodc_creds.guess(other_rodc_ctx.lp)
475 other_rodc_creds.set_username(other_rodc_name+'$')
476 other_rodc_creds.set_password(self.rodc_pass)
478 (other_rodc_drs, other_rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, other_rodc_creds)
480 rand = random.randint(1, 10000000)
481 expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
482 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
483 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
484 drsuapi.DRSUAPI_ATTID_unicodePwd,
485 drsuapi.DRSUAPI_ATTID_dBCSPwd]
487 user_name = "test_rodcF_%s" % rand
488 user_dn = "CN=%s,%s" % (user_name, self.ou)
491 "objectclass": "user",
492 "sAMAccountName": user_name
495 # Store some secret on this user
496 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, 'penguin12#', False, user_name)
497 self.ldb_dc1.add_remove_group_members("Allowed RODC Password Replication Group",
499 add_members_operation=True)
501 req10 = self._getnc_req10(dest_dsa=str(other_rodc_ctx.ntds_guid),
502 invocation_id=self.ldb_dc1.get_invocation_id(),
504 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
505 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
510 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
511 self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
512 except WERRORError as e3:
513 (enum, estr) = e3.args
514 self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
516 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
517 invocation_id=self.ldb_dc1.get_invocation_id(),
519 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
520 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
525 (level, ctr) = other_rodc_drs.DsGetNCChanges(other_rodc_drs_handle, 10, req10)
526 self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
527 except WERRORError as e4:
528 (enum, estr) = e4.args
529 self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
531 def test_msDSRevealedUsers_local_deny_allow(self):
533 Ensure that the deny trumps allow, and we can modify these
534 attributes directly instead of the global groups.
536 This may fail on Windows due to tokenGroup calculation caching.
538 rand = random.randint(1, 10000000)
539 expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
540 drsuapi.DRSUAPI_ATTID_supplementalCredentials,
541 drsuapi.DRSUAPI_ATTID_ntPwdHistory,
542 drsuapi.DRSUAPI_ATTID_unicodePwd,
543 drsuapi.DRSUAPI_ATTID_dBCSPwd]
545 # Add a user on DC1, add it to allowed password replication
546 # group, and replicate to RODC with EXOP_REPL_SECRETS
547 user_name = "test_rodcF_%s" % rand
548 password = "password12#"
549 user_dn = "CN=%s,%s" % (user_name, self.ou)
552 "objectclass": "user",
553 "sAMAccountName": user_name
556 # Store some secret on this user
557 self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password, False, user_name)
559 req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
560 invocation_id=self.ldb_dc1.get_invocation_id(),
562 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
563 partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
568 m.dn = ldb.Dn(self.ldb_dc1, self.computer_dn)
570 m["msDS-RevealOnDemandGroup"] = \
571 ldb.MessageElement(user_dn, ldb.FLAG_MOD_ADD,
572 "msDS-RevealOnDemandGroup")
573 self.ldb_dc1.modify(m)
575 # In local allow, should be success
577 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
579 self.fail("Should have succeeded when in local allow group")
581 self._assert_in_revealed_users(user_dn, expected_user_attributes)
583 (self.rodc_drs, self.rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, self.rodc_creds)
586 m.dn = ldb.Dn(self.ldb_dc1, self.computer_dn)
588 m["msDS-NeverRevealGroup"] = \
589 ldb.MessageElement(user_dn, ldb.FLAG_MOD_ADD,
590 "msDS-NeverRevealGroup")
591 self.ldb_dc1.modify(m)
593 # In local allow and deny, should be failure
595 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
596 self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
597 except WERRORError as e5:
598 (enum, estr) = e5.args
599 self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
602 m.dn = ldb.Dn(self.ldb_dc1, self.computer_dn)
604 m["msDS-RevealOnDemandGroup"] = \
605 ldb.MessageElement(user_dn, ldb.FLAG_MOD_DELETE,
606 "msDS-RevealOnDemandGroup")
607 self.ldb_dc1.modify(m)
609 # In local deny, should be failure
610 (self.rodc_drs, self.rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, self.rodc_creds)
612 (level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
613 self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
614 except WERRORError as e6:
615 (enum, estr) = e6.args
616 self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
618 def _assert_in_revealed_users(self, user_dn, attrlist):
619 res = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=self.computer_dn,
620 attrs=["msDS-RevealedUsers"])
621 revealed_users = res[0]["msDS-RevealedUsers"]
625 for attribute in revealed_users:
626 dsdb_dn = dsdb_Dn(self.ldb_dc1, attribute.decode('utf8'))
627 metadata = ndr_unpack(drsblobs.replPropertyMetaData1, dsdb_dn.get_bytes())
628 if user_dn in attribute:
629 unpacked_attrs.append(metadata)
630 packed_attrs.append(dsdb_dn.get_bytes())
631 actual_attrids.append(metadata.attid)
633 self.assertEquals(sorted(actual_attrids), sorted(attrlist))
635 return (packed_attrs, unpacked_attrs)
637 def _assert_attrlist_equals(self, list_1, list_2):
638 return self._assert_attrlist_changed(list_1, list_2, [], num_changes=0, expected_new_usn=False)
640 def _assert_attrlist_changed(self, list_1, list_2, changed_attributes, num_changes=1, expected_new_usn=True):
641 for i in range(len(list_2)):
642 self.assertEquals(list_1[i].attid, list_2[i].attid)
643 self.assertEquals(list_1[i].originating_invocation_id, list_2[i].originating_invocation_id)
644 self.assertEquals(list_1[i].version + num_changes, list_2[i].version)
647 self.assertTrue(list_1[i].originating_usn < list_2[i].originating_usn)
648 self.assertTrue(list_1[i].local_usn < list_2[i].local_usn)
650 self.assertEquals(list_1[i].originating_usn, list_2[i].originating_usn)
651 self.assertEquals(list_1[i].local_usn, list_2[i].local_usn)
653 if list_1[i].attid in changed_attributes:
654 # We do the changes too quickly, so unless we put sleeps
655 # inbetween calls, these remain the same. Checking the USNs
658 #self.assertTrue(list_1[i].originating_change_time < list_2[i].originating_change_time)
660 self.assertEquals(list_1[i].originating_change_time, list_2[i].originating_change_time)
663 def _create_rodc(self, ctx):
664 ctx.nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
665 ctx.full_nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
666 ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn)
668 ctx.never_reveal_sid = [ "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_DENY),
669 "<SID=%s>" % security.SID_BUILTIN_ADMINISTRATORS,
670 "<SID=%s>" % security.SID_BUILTIN_SERVER_OPERATORS,
671 "<SID=%s>" % security.SID_BUILTIN_BACKUP_OPERATORS,
672 "<SID=%s>" % security.SID_BUILTIN_ACCOUNT_OPERATORS ]
673 ctx.reveal_sid = "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_ALLOW)
675 mysid = ctx.get_mysid()
676 admin_dn = "<SID=%s>" % mysid
677 ctx.managedby = admin_dn
679 ctx.userAccountControl = (samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT |
680 samba.dsdb.UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
681 samba.dsdb.UF_PARTIAL_SECRETS_ACCOUNT)
683 ctx.connection_dn = "CN=RODC Connection (FRS),%s" % ctx.ntds_dn
684 ctx.secure_channel_type = misc.SEC_CHAN_RODC
686 ctx.replica_flags = (drsuapi.DRSUAPI_DRS_INIT_SYNC |
687 drsuapi.DRSUAPI_DRS_PER_SYNC |
688 drsuapi.DRSUAPI_DRS_GET_ANC |
689 drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
690 drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
692 ctx.join_add_objects()