2 # -*- coding: utf-8 -*-
4 # Tests replication scenarios with different user privileges.
5 # We want to test every replication scenario we can think of against:
6 # - users with only GET_CHANGES privileges
7 # - users with only GET_ALL_CHANGES privileges
8 # - users with both GET_CHANGES and GET_ALL_CHANGES privileges
9 # - users with no privileges
11 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
12 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
14 # This program is free software; you can redistribute it and/or modify
15 # it under the terms of the GNU General Public License as published by
16 # the Free Software Foundation; either version 3 of the License, or
17 # (at your option) any later version.
19 # This program is distributed in the hope that it will be useful,
20 # but WITHOUT ANY WARRANTY; without even the implied warranty of
21 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22 # GNU General Public License for more details.
24 # You should have received a copy of the GNU General Public License
25 # along with this program. If not, see <http://www.gnu.org/licenses/>.
30 # export DC1=dc1_dns_name
31 # export DC2=dc2_dns_name
32 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
33 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN getnc_unpriv -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
38 from samba import werror, WERRORError
40 from samba import sd_utils
42 from ldb import SCOPE_BASE
45 from samba.dcerpc import drsuapi, security
46 from samba.credentials import DONT_USE_KERBEROS
48 class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
49 """Confirm the behaviour of DsGetNCChanges for unprivileged users"""
52 super(DrsReplicaSyncUnprivTestCase, self).setUp()
53 self.get_changes_user = "get-changes-user"
54 self.base_dn = self.ldb_dc1.get_default_basedn()
55 self.user_pass = samba.generate_random_password(12, 16)
57 # add some randomness to the test OU. (Deletion of the last test's
58 # objects can be slow to replicate out. So the OU created by a previous
59 # testenv may still exist at this point).
60 rand = random.randint(1, 10000000)
61 test_ou = "OU=test_getnc_unpriv%d" %rand
62 self.ou = "%s,%s" %(test_ou, self.base_dn)
65 "objectclass": "organizationalUnit"})
66 self.ldb_dc1.newuser(self.get_changes_user, self.user_pass,
68 (self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1)
70 self.sd_utils = sd_utils.SDUtils(self.ldb_dc1)
71 self.user_dn = "cn=%s,%s" % (self.get_changes_user, self.ou)
72 user_sid = self.sd_utils.get_object_sid(self.user_dn)
73 self.acl_mod_get_changes = "(OA;;CR;%s;;%s)" % (security.GUID_DRS_GET_CHANGES,
75 self.acl_mod_get_all_changes = "(OA;;CR;%s;;%s)" % (security.GUID_DRS_GET_ALL_CHANGES,
77 self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn)
79 # We set DONT_USE_KERBEROS to avoid a race with getting the
80 # user replicated to our selected KDC
81 self.user_creds = self.insta_creds(template=self.get_credentials(),
82 username=self.get_changes_user,
83 userpass=self.user_pass,
84 kerberos_state=DONT_USE_KERBEROS)
85 (self.user_drs, self.user_drs_handle) = self._ds_bind(self.dnsname_dc1,
89 self.sd_utils.modify_sd_on_dn(self.base_dn, self.desc_sddl)
91 self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
92 except ldb.LdbError as e1:
93 (enum, string) = e1.args
94 if enum == ldb.ERR_NO_SUCH_OBJECT:
96 super(DrsReplicaSyncUnprivTestCase, self).tearDown()
def _test_repl_exop(self, exop, repl_obj, expected_error, dest_dsa=None,
                    partial_attribute_set=None):
    """
    Common function to send a replication request over self.user_drs and
    check the result matches what's expected.

    :param exop: DRSUAPI extended operation to put in the request
    :param repl_obj: DN of the object (or NC) the request is made against
    :param expected_error: None if the request must be accepted, otherwise
        a collection of WERROR codes, any one of which is acceptable
    :param dest_dsa: optional destination DSA, passed through to the
        request builder
    :param partial_attribute_set: optional partial attribute set, passed
        through to the request builder
    """
    # NOTE(review): the nc_dn_str/exop keyword names and the request level
    # (8) are assumed from the req8 naming and the drs_base helper --
    # confirm against drs_base._exop_req8.
    req8 = self._exop_req8(dest_dsa=dest_dsa,
                           invocation_id=self.ldb_dc1.get_invocation_id(),
                           nc_dn_str=repl_obj,
                           exop=exop,
                           replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP,
                           partial_attribute_set=partial_attribute_set)

    if expected_error is None:
        # user is OK, request should be accepted without throwing an error
        # (the reply itself is not inspected)
        self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
        return

    # check the request is rejected (with the error we're expecting)
    try:
        self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
    except WERRORError as e:
        (enum, estr) = e.args
        # assertIn reports both the code received and the accepted codes
        self.assertIn(enum, expected_error,
                      "Got unexpected error: %s" % estr)
    else:
        self.fail("Should have failed with user denied access")
def _test_repl_single_obj(self, repl_obj, expected_error,
                          partial_attribute_set=None):
    """
    Request replication of one object (EXOP_REPL_OBJ) and verify the
    outcome: the request must succeed when expected_error is None, and
    otherwise must fail with one of the listed errors.
    """
    request = dict(exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
                   repl_obj=repl_obj,
                   expected_error=expected_error,
                   partial_attribute_set=partial_attribute_set)
    self._test_repl_exop(**request)
def _test_repl_secret(self, repl_obj, expected_error, dest_dsa=None):
    """
    Issue an EXOP_REPL_SECRET request for repl_obj and verify the outcome:
    the request must succeed when expected_error is None, and otherwise
    must fail with one of the listed errors.
    """
    secret_exop = drsuapi.DRSUAPI_EXOP_REPL_SECRET
    self._test_repl_exop(secret_exop, repl_obj, expected_error,
                         dest_dsa=dest_dsa)
def _test_repl_full(self, expected_error, partial_attribute_set=None):
    """
    Request a full replication (no extended operation) of DC1's default
    base DN and verify the outcome: the request must succeed when
    expected_error is None, and otherwise must fail with one of the
    listed errors.
    """
    no_exop = drsuapi.DRSUAPI_EXOP_NONE
    nc_root = self.ldb_dc1.get_default_basedn()
    self._test_repl_exop(no_exop, nc_root, expected_error,
                         partial_attribute_set=partial_attribute_set)
def _test_repl_full_on_ou(self, repl_obj, expected_error):
    """
    Request a full replication (no extended operation) against repl_obj
    rather than a base NC. Such a request is always meant to fail; which
    error comes back can depend on the access rights of the user.
    """
    # Callers pass the OU from the test setup, or a DN underneath it
    no_exop = drsuapi.DRSUAPI_EXOP_NONE
    self._test_repl_exop(no_exop, repl_obj, expected_error)
def test_repl_getchanges_userpriv(self):
    """
    Exercise replication requests as a user holding only the GET_CHANGES
    right. Most requests are expected to be refused; only the ones limited
    to a partial attribute set should go through.
    """
    denied = [werror.WERR_DS_DRA_ACCESS_DENIED]
    bad_dn = [werror.WERR_DS_DRA_BAD_DN]
    bad_ou = "OU=bad_obj,%s" % self.ou

    # Add the GET_CHANGES ACE to the base DN
    self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod_get_changes)

    # Single-object replication
    self._test_repl_single_obj(repl_obj=self.ou, expected_error=denied)
    self._test_repl_single_obj(repl_obj=bad_ou,
                               expected_error=bad_dn + denied)

    # Secret replication
    for target in (self.ou, self.user_dn):
        self._test_repl_secret(repl_obj=target, expected_error=denied)
    self._test_repl_secret(repl_obj=self.user_dn,
                           dest_dsa=self.ldb_dc1.get_ntds_GUID(),
                           expected_error=denied)
    self._test_repl_secret(repl_obj=bad_ou, expected_error=bad_dn)

    # Full replication
    self._test_repl_full(expected_error=denied)
    self._test_repl_full_on_ou(
        repl_obj=self.ou,
        expected_error=[werror.WERR_DS_CANT_FIND_EXPECTED_NC] + denied)
    self._test_repl_full_on_ou(
        repl_obj=bad_ou,
        expected_error=[werror.WERR_DS_DRA_BAD_NC] + denied)

    # Requests restricted to a Partial Attribute Set do not need the
    # GET_ALL_CHANGES right, so these two must be accepted
    self._test_repl_single_obj(
        repl_obj=self.ou,
        expected_error=None,
        partial_attribute_set=self.get_partial_attribute_set())
    self._test_repl_full(
        expected_error=None,
        partial_attribute_set=self.get_partial_attribute_set())
def test_repl_getallchanges_userpriv(self):
    """
    Exercise replication requests as a user holding only the
    GET_ALL_CHANGES right. Granting this right on its own is allowed but
    not very meaningful; it is covered here for consistency.
    """
    # Add the GET_ALL_CHANGES ACE to the base DN
    get_all_ace = self.acl_mod_get_all_changes
    self.sd_utils.dacl_add_ace(self.base_dn, get_all_ace)

    # The responses should match those for a user with no replication
    # rights, so reuse the unprivileged checks as they are
    self.test_repl_no_userpriv()
def test_repl_both_userpriv(self):
    """
    Exercise replication requests as a privileged user, i.e. one holding
    both the GET_CHANGES and GET_ALL_CHANGES rights. Valid requests are
    expected to be accepted.
    """
    denied = [werror.WERR_DS_DRA_ACCESS_DENIED]
    bad_dn = [werror.WERR_DS_DRA_BAD_DN]
    db_error_or_denied = [werror.WERR_DS_DRA_DB_ERROR] + denied
    bad_ou = "OU=bad_obj,%s" % self.ou

    # Add both the GET_CHANGES and GET_ALL_CHANGES ACEs to the base DN
    combined_aces = self.acl_mod_get_changes + self.acl_mod_get_all_changes
    self.sd_utils.dacl_add_ace(self.base_dn, combined_aces)

    # Single-object replication
    self._test_repl_single_obj(repl_obj=self.ou, expected_error=None)
    self._test_repl_single_obj(repl_obj=bad_ou, expected_error=bad_dn)

    # Secret replication: Microsoft returns DB_ERROR, Samba returns
    # ACCESS_DENIED
    for target in (self.ou, self.user_dn):
        self._test_repl_secret(repl_obj=target,
                               expected_error=db_error_or_denied)
    # Windows accepts this request, whereas Samba rejects it
    self._test_repl_secret(repl_obj=self.user_dn,
                           dest_dsa=self.ldb_dc1.get_ntds_GUID(),
                           expected_error=denied)
    self._test_repl_secret(repl_obj=bad_ou, expected_error=bad_dn)

    # Full replication
    self._test_repl_full(expected_error=None)
    self._test_repl_full_on_ou(
        repl_obj=self.ou,
        expected_error=[werror.WERR_DS_CANT_FIND_EXPECTED_NC])
    self._test_repl_full_on_ou(
        repl_obj=bad_ou,
        expected_error=[werror.WERR_DS_DRA_BAD_NC] + bad_dn)

    # Requests restricted to a partial attribute set
    self._test_repl_single_obj(
        repl_obj=self.ou,
        expected_error=None,
        partial_attribute_set=self.get_partial_attribute_set())
    self._test_repl_full(
        expected_error=None,
        partial_attribute_set=self.get_partial_attribute_set())
def test_repl_no_userpriv(self):
    """
    Exercise replication requests as an unprivileged user. Every one of
    these requests is expected to be rejected.
    """
    denied = [werror.WERR_DS_DRA_ACCESS_DENIED]
    # Microsoft usually returns BAD_DN, Samba returns ACCESS_DENIED
    usual_error = [werror.WERR_DS_DRA_BAD_DN] + denied
    bad_nc_or_denied = [werror.WERR_DS_DRA_BAD_NC] + denied
    bad_ou = "OU=bad_obj,%s" % self.ou

    # Single-object replication
    for target in (self.ou, bad_ou):
        self._test_repl_single_obj(repl_obj=target,
                                   expected_error=usual_error)

    # Secret replication
    for target in (self.ou, self.user_dn):
        self._test_repl_secret(repl_obj=target,
                               expected_error=usual_error)
    self._test_repl_secret(repl_obj=self.user_dn,
                           dest_dsa=self.ldb_dc1.get_ntds_GUID(),
                           expected_error=usual_error)
    self._test_repl_secret(repl_obj=bad_ou, expected_error=usual_error)

    # Full replication
    self._test_repl_full(expected_error=denied)
    self._test_repl_full_on_ou(repl_obj=self.ou,
                               expected_error=usual_error)
    self._test_repl_full_on_ou(repl_obj=bad_ou,
                               expected_error=bad_nc_or_denied)

    # Requests restricted to a partial attribute set
    self._test_repl_single_obj(
        repl_obj=self.ou,
        expected_error=usual_error,
        partial_attribute_set=self.get_partial_attribute_set())
    self._test_repl_full(
        expected_error=denied,
        partial_attribute_set=self.get_partial_attribute_set())