2 # -*- coding: utf-8 -*-
4 # Tests replication scenarios with different user privileges.
5 # We want to test every replication scenario we can think of against:
6 # - users with only GET_CHANGES privileges
7 # - users with only GET_ALL_CHANGES privileges
8 # - users with both GET_CHANGES and GET_ALL_CHANGES privileges
9 # - users with no privileges
11 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
12 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
14 # This program is free software; you can redistribute it and/or modify
15 # it under the terms of the GNU General Public License as published by
16 # the Free Software Foundation; either version 3 of the License, or
17 # (at your option) any later version.
19 # This program is distributed in the hope that it will be useful,
20 # but WITHOUT ANY WARRANTY; without even the implied warranty of
21 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22 # GNU General Public License for more details.
24 # You should have received a copy of the GNU General Public License
25 # along with this program. If not, see <http://www.gnu.org/licenses/>.
30 # export DC1=dc1_dns_name
31 # export DC2=dc2_dns_name
32 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
33 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN getnc_unpriv -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
38 from samba import werror, WERRORError
40 from samba import sd_utils
42 from ldb import SCOPE_BASE
44 from samba.dcerpc import drsuapi, security
45 from samba.credentials import DONT_USE_KERBEROS
class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
    """Confirm the behaviour of DsGetNCChanges for unprivileged users.

    Each test grants a freshly created user a different combination of the
    GET_CHANGES / GET_ALL_CHANGES control access rights on the base DN and
    then checks which DsGetNCChanges requests DC1 accepts or rejects.
    """

    def setUp(self):
        """Create the test OU and user on DC1 and bind to DRSUAPI as that user.

        Also records the base DN's current security descriptor so that
        tearDown() can undo any ACEs added by the individual tests.
        """
        super(DrsReplicaSyncUnprivTestCase, self).setUp()
        self.get_changes_user = "get-changes-user"
        self.base_dn = self.ldb_dc1.get_default_basedn()
        self.ou = "OU=test_getncchanges,%s" % self.base_dn
        self.user_pass = samba.generate_random_password(12, 16)
        self.ldb_dc1.add({
            "dn": self.ou,
            "objectclass": "organizationalUnit"})
        self.ldb_dc1.newuser(self.get_changes_user, self.user_pass,
                             userou="OU=test_getncchanges")
        (self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1)

        self.sd_utils = sd_utils.SDUtils(self.ldb_dc1)
        self.user_dn = "cn=%s,%s" % (self.get_changes_user, self.ou)
        user_sid = self.sd_utils.get_object_sid(self.user_dn)

        # Object ACEs granting the test user a single control access right
        # (CR) each; the tests add them to the base DN's DACL as required.
        self.acl_mod_get_changes = "(OA;;CR;%s;;%s)" % (
            security.GUID_DRS_GET_CHANGES, str(user_sid))
        self.acl_mod_get_all_changes = "(OA;;CR;%s;;%s)" % (
            security.GUID_DRS_GET_ALL_CHANGES, str(user_sid))

        # Original descriptor of the base DN, restored in tearDown()
        self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn)

        # We set DONT_USE_KERBEROS to avoid a race with getting the
        # user replicated to our selected KDC
        self.user_creds = self.insta_creds(template=self.get_credentials(),
                                           username=self.get_changes_user,
                                           userpass=self.user_pass,
                                           kerberos_state=DONT_USE_KERBEROS)
        (self.user_drs, self.user_drs_handle) = self._ds_bind(
            self.dnsname_dc1, self.user_creds)

    def tearDown(self):
        """Restore the base DN's security descriptor and delete the test OU."""
        self.sd_utils.modify_sd_on_dn(self.base_dn, self.desc_sddl)
        try:
            self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
        except ldb.LdbError:
            # Cleanup is best-effort. The expected failure here is
            # ldb.ERR_NO_SUCH_OBJECT (the OU is already gone); as before,
            # any other LDB error is ignored as well so that the base-class
            # tearDown() always runs.
            pass
        super(DrsReplicaSyncUnprivTestCase, self).tearDown()

    def _test_repl_exop(self, exop, repl_obj, expected_error, dest_dsa=None,
                        partial_attribute_set=None):
        """
        Common function to send a replication request and check the result
        matches what's expected.

        :param exop: DRSUAPI extended operation to request
        :param repl_obj: DN of the object (or NC) to replicate
        :param expected_error: None if the request must succeed, otherwise
            a list of the WERROR codes that are acceptable as a rejection
        :param dest_dsa: optional destination DSA GUID for the request
        :param partial_attribute_set: optional partial attribute set
        """
        req8 = self._exop_req8(dest_dsa=dest_dsa,
                               invocation_id=self.ldb_dc1.get_invocation_id(),
                               nc_dn_str=repl_obj,
                               exop=exop,
                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP,
                               partial_attribute_set=partial_attribute_set)

        if expected_error is None:
            # user is OK, request should be accepted without throwing an error
            self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
            return

        # check the request is rejected (with the error we're expecting)
        try:
            self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
        except WERRORError as err:
            # err.args unpacking works on both Python 2 and Python 3, unlike
            # the old "except X as (a, b)" form.
            (enum, estr) = err.args
            self.assertIn(enum, expected_error,
                          "Got unexpected error: %s" % estr)
        else:
            self.fail("Should have failed with user denied access")

    def _test_repl_single_obj(self, repl_obj, expected_error,
                              partial_attribute_set=None):
        """
        Checks that replication on a single object either succeeds or fails as
        expected (based on the user's access rights)
        """
        self._test_repl_exop(exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
                             repl_obj=repl_obj,
                             expected_error=expected_error,
                             partial_attribute_set=partial_attribute_set)

    def _test_repl_secret(self, repl_obj, expected_error, dest_dsa=None):
        """
        Checks that REPL_SECRET on an object either succeeds or fails as
        expected (based on the user's access rights)
        """
        self._test_repl_exop(exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
                             repl_obj=repl_obj,
                             expected_error=expected_error,
                             dest_dsa=dest_dsa)

    def _test_repl_full(self, expected_error, partial_attribute_set=None):
        """
        Checks that a full replication either succeeds or fails as expected
        (based on the user's access rights)
        """
        self._test_repl_exop(exop=drsuapi.DRSUAPI_EXOP_NONE,
                             repl_obj=self.ldb_dc1.get_default_basedn(),
                             expected_error=expected_error,
                             partial_attribute_set=partial_attribute_set)

    def _test_repl_full_on_ou(self, expected_error):
        """
        Full replication on a specific OU should always fail (it should be done
        against a base NC). The error may vary based on the user's access rights
        """
        # Just try against the OU created in the test setup
        self._test_repl_exop(exop=drsuapi.DRSUAPI_EXOP_NONE,
                             repl_obj=self.ou,
                             expected_error=expected_error)

    def test_repl_getchanges_userpriv(self):
        """
        Tests various replication requests made by a user with only GET_CHANGES
        rights. Some requests will be accepted, but most will be rejected.
        """
        access_denied = [werror.WERR_DS_DRA_ACCESS_DENIED]

        # Assign the user GET_CHANGES rights
        self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod_get_changes)

        self._test_repl_single_obj(repl_obj=self.ou,
                                   expected_error=access_denied)

        self._test_repl_secret(repl_obj=self.ou,
                               expected_error=access_denied)
        self._test_repl_secret(repl_obj=self.user_dn,
                               expected_error=access_denied)
        self._test_repl_secret(repl_obj=self.user_dn,
                               dest_dsa=self.ldb_dc1.get_ntds_GUID(),
                               expected_error=access_denied)

        self._test_repl_full(expected_error=access_denied)
        self._test_repl_full_on_ou(
            expected_error=[werror.WERR_DS_CANT_FIND_EXPECTED_NC,
                            werror.WERR_DS_DRA_ACCESS_DENIED])

        # Partial Attribute Sets don't require GET_ALL_CHANGES rights, so we
        # expect the following to succeed
        self._test_repl_single_obj(
            repl_obj=self.ou,
            expected_error=None,
            partial_attribute_set=self.get_partial_attribute_set())
        self._test_repl_full(
            expected_error=None,
            partial_attribute_set=self.get_partial_attribute_set())

    def test_repl_getallchanges_userpriv(self):
        """
        Tests various replication requests made by a user with only
        GET_ALL_CHANGES rights. Note that assigning these rights is possible,
        but doesn't make a lot of sense. We test it anyway for consistency.
        """
        # Assign the user GET_ALL_CHANGES rights
        self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod_get_all_changes)

        # Without GET_CHANGES the user cannot issue replication requests at
        # all, so we expect the same responses as for an unprivileged user.
        self.test_repl_no_userpriv()

    def test_repl_both_userpriv(self):
        """
        Tests various replication requests made by a privileged user (i.e. has
        both GET_CHANGES and GET_ALL_CHANGES). We expect any valid requests
        to be accepted.
        """
        # Assign the user both GET_CHANGES and GET_ALL_CHANGES rights
        both_rights = self.acl_mod_get_changes + self.acl_mod_get_all_changes
        self.sd_utils.dacl_add_ace(self.base_dn, both_rights)

        self._test_repl_single_obj(repl_obj=self.ou,
                                   expected_error=None)

        # Microsoft returns DB_ERROR, Samba returns ACCESS_DENIED
        secret_errors = [werror.WERR_DS_DRA_DB_ERROR,
                         werror.WERR_DS_DRA_ACCESS_DENIED]
        self._test_repl_secret(repl_obj=self.ou,
                               expected_error=secret_errors)
        self._test_repl_secret(repl_obj=self.user_dn,
                               expected_error=secret_errors)
        # Note that Windows accepts this but Samba rejects it
        self._test_repl_secret(
            repl_obj=self.user_dn,
            dest_dsa=self.ldb_dc1.get_ntds_GUID(),
            expected_error=[werror.WERR_DS_DRA_ACCESS_DENIED])

        self._test_repl_full(expected_error=None)
        self._test_repl_full_on_ou(
            expected_error=[werror.WERR_DS_CANT_FIND_EXPECTED_NC])

        self._test_repl_single_obj(
            repl_obj=self.ou,
            expected_error=None,
            partial_attribute_set=self.get_partial_attribute_set())
        self._test_repl_full(
            expected_error=None,
            partial_attribute_set=self.get_partial_attribute_set())

    def test_repl_no_userpriv(self):
        """
        Tests various replication requests made by an unprivileged user.
        We expect all these requests to be rejected.
        """
        # Microsoft usually returns BAD_DN, Samba returns ACCESS_DENIED
        usual_error = [werror.WERR_DS_DRA_BAD_DN,
                       werror.WERR_DS_DRA_ACCESS_DENIED]
        access_denied = [werror.WERR_DS_DRA_ACCESS_DENIED]

        self._test_repl_single_obj(repl_obj=self.ou,
                                   expected_error=usual_error)

        self._test_repl_secret(repl_obj=self.ou,
                               expected_error=usual_error)
        self._test_repl_secret(repl_obj=self.user_dn,
                               expected_error=usual_error)
        self._test_repl_secret(repl_obj=self.user_dn,
                               dest_dsa=self.ldb_dc1.get_ntds_GUID(),
                               expected_error=usual_error)

        self._test_repl_full(expected_error=access_denied)
        self._test_repl_full_on_ou(expected_error=usual_error)

        self._test_repl_single_obj(
            repl_obj=self.ou,
            expected_error=usual_error,
            partial_attribute_set=self.get_partial_attribute_set())
        self._test_repl_full(
            expected_error=access_denied,
            partial_attribute_set=self.get_partial_attribute_set())