PEP8: fix E225: missing whitespace around operator
[nivanova/samba-autobuild/.git] / source4 / torture / drs / python / getnc_unpriv.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 #
4 # Tests replication scenarios with different user privileges.
5 # We want to test every replication scenario we can think of against:
6 # - users with only GET_CHANGES privileges
7 # - users with only GET_ALL_CHANGES privileges
8 # - users with both GET_CHANGES and GET_ALL_CHANGES privileges
9 # - users with no privileges
10 #
11 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
12 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
13 #
14 # This program is free software; you can redistribute it and/or modify
15 # it under the terms of the GNU General Public License as published by
16 # the Free Software Foundation; either version 3 of the License, or
17 # (at your option) any later version.
18 #
19 # This program is distributed in the hope that it will be useful,
20 # but WITHOUT ANY WARRANTY; without even the implied warranty of
21 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 # GNU General Public License for more details.
23 #
24 # You should have received a copy of the GNU General Public License
25 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
26 #
27
28 #
29 # Usage:
30 #  export DC1=dc1_dns_name
31 #  export DC2=dc2_dns_name
32 #  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
33 #  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN getnc_unpriv -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
34 #
35
36 import drs_base
37 import samba.tests
38 from samba import werror, WERRORError
39
40 from samba import sd_utils
41 import ldb
42 from ldb import SCOPE_BASE
43 import random
44
45 from samba.dcerpc import drsuapi, security
46 from samba.credentials import DONT_USE_KERBEROS
47
class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
    """Confirm the behaviour of DsGetNCChanges for unprivileged users"""

    def setUp(self):
        """
        Create a randomly-named test OU holding a fresh user on DC1, remember
        the base DN's security descriptor so tearDown can restore it, and open
        two DRSUAPI binds to DC1: one without explicit credentials and one as
        the newly created user.
        """
        super(DrsReplicaSyncUnprivTestCase, self).setUp()
        self.get_changes_user = "get-changes-user"
        self.base_dn = self.ldb_dc1.get_default_basedn()
        self.user_pass = samba.generate_random_password(12, 16)

        # add some randomness to the test OU. (Deletion of the last test's
        # objects can be slow to replicate out. So the OU created by a previous
        # testenv may still exist at this point).
        rand = random.randint(1, 10000000)
        test_ou = "OU=test_getnc_unpriv%d" % rand
        self.ou = "%s,%s" % (test_ou, self.base_dn)
        self.ldb_dc1.add({
            "dn": self.ou,
            "objectclass": "organizationalUnit"})
        self.ldb_dc1.newuser(self.get_changes_user, self.user_pass,
                             userou=test_ou)
        (self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1)

        self.sd_utils = sd_utils.SDUtils(self.ldb_dc1)
        self.user_dn = "cn=%s,%s" % (self.get_changes_user, self.ou)
        user_sid = self.sd_utils.get_object_sid(self.user_dn)

        # ACEs that the individual tests add to the base DN's DACL in order
        # to hand the test user GET_CHANGES and/or GET_ALL_CHANGES rights.
        ace_template = "(OA;;CR;%s;;%s)"
        sid_str = str(user_sid)
        self.acl_mod_get_changes = ace_template % (
            security.GUID_DRS_GET_CHANGES, sid_str)
        self.acl_mod_get_all_changes = ace_template % (
            security.GUID_DRS_GET_ALL_CHANGES, sid_str)

        # Snapshot of the base DN's SD, written back in tearDown
        self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn)

        # We set DONT_USE_KERBEROS to avoid a race with getting the
        # user replicated to our selected KDC
        self.user_creds = self.insta_creds(template=self.get_credentials(),
                                           username=self.get_changes_user,
                                           userpass=self.user_pass,
                                           kerberos_state=DONT_USE_KERBEROS)
        (self.user_drs, self.user_drs_handle) = self._ds_bind(
            self.dnsname_dc1, self.user_creds)

    def tearDown(self):
        """
        Restore the base DN's security descriptor saved in setUp and remove
        the test OU (and everything under it). The base class tearDown is
        always run, even if restoring the descriptor raises.
        """
        try:
            self.sd_utils.modify_sd_on_dn(self.base_dn, self.desc_sddl)
            try:
                self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
            except ldb.LdbError:
                # Removing the OU is best-effort: it may already be gone
                # (ldb.ERR_NO_SUCH_OBJECT), and any other LDB failure while
                # cleaning up is likewise not treated as a test failure.
                pass
        finally:
            super(DrsReplicaSyncUnprivTestCase, self).tearDown()

    def _test_repl_exop(self, exop, repl_obj, expected_error, dest_dsa=None,
                        partial_attribute_set=None):
        """
        Common function to send a replication request and check the result
        matches what's expected.

        The request is always sent over the test user's DRSUAPI bind.

        :param exop: extended operation to request (a drsuapi.DRSUAPI_EXOP_*
            value)
        :param repl_obj: DN of the object/NC the request is made against
        :param expected_error: None if the request must succeed, otherwise a
            list of WERROR codes, one of which the server must return
        :param dest_dsa: passed through to _exop_req8() as the destination DSA
        :param partial_attribute_set: passed through to _exop_req8()
        """
        req8 = self._exop_req8(dest_dsa=dest_dsa,
                               invocation_id=self.ldb_dc1.get_invocation_id(),
                               nc_dn_str=repl_obj,
                               exop=exop,
                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP,
                               partial_attribute_set=partial_attribute_set)

        if expected_error is None:
            # user is OK, request should be accepted without throwing an error
            self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
            return

        # check the request is rejected (with the error we're expecting)
        try:
            self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
        except WERRORError as e:
            (enum, estr) = e.args
            self.assertIn(enum, expected_error,
                          "Got unexpected error: %s" % estr)
        else:
            self.fail("Should have failed with user denied access")

    def _test_repl_single_obj(self, repl_obj, expected_error,
                              partial_attribute_set=None):
        """
        Checks that replication on a single object either succeeds or fails as
        expected (based on the user's access rights)
        """
        self._test_repl_exop(exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
                             repl_obj=repl_obj,
                             expected_error=expected_error,
                             partial_attribute_set=partial_attribute_set)

    def _test_repl_secret(self, repl_obj, expected_error, dest_dsa=None):
        """
        Checks that REPL_SECRET on an object either succeeds or fails as
        expected (based on the user's access rights)
        """
        self._test_repl_exop(exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
                             repl_obj=repl_obj,
                             expected_error=expected_error,
                             dest_dsa=dest_dsa)

    def _test_repl_full(self, expected_error, partial_attribute_set=None):
        """
        Checks that a full replication either succeeds or fails as expected
        (based on the user's access rights)
        """
        self._test_repl_exop(exop=drsuapi.DRSUAPI_EXOP_NONE,
                             repl_obj=self.ldb_dc1.get_default_basedn(),
                             expected_error=expected_error,
                             partial_attribute_set=partial_attribute_set)

    def _test_repl_full_on_ou(self, repl_obj, expected_error):
        """
        Full replication on a specific OU should always fail (it should be done
        against a base NC). The error may vary based on the user's access rights
        """
        self._test_repl_exop(exop=drsuapi.DRSUAPI_EXOP_NONE,
                             repl_obj=repl_obj,
                             expected_error=expected_error)

    def test_repl_getchanges_userpriv(self):
        """
        Tests various replication requests made by a user with only GET_CHANGES
        rights. Some requests will be accepted, but most will be rejected.
        """

        # Assign the user GET_CHANGES rights
        self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod_get_changes)

        access_denied = [werror.WERR_DS_DRA_ACCESS_DENIED]
        bad_ou = "OU=bad_obj,%s" % self.ou

        self._test_repl_single_obj(repl_obj=self.ou,
                                   expected_error=access_denied)
        self._test_repl_single_obj(
            repl_obj=bad_ou,
            expected_error=[werror.WERR_DS_DRA_BAD_DN,
                            werror.WERR_DS_DRA_ACCESS_DENIED])

        self._test_repl_secret(repl_obj=self.ou,
                               expected_error=access_denied)
        self._test_repl_secret(repl_obj=self.user_dn,
                               expected_error=access_denied)
        self._test_repl_secret(repl_obj=self.user_dn,
                               dest_dsa=self.ldb_dc1.get_ntds_GUID(),
                               expected_error=access_denied)
        self._test_repl_secret(repl_obj=bad_ou,
                               expected_error=[werror.WERR_DS_DRA_BAD_DN])

        self._test_repl_full(expected_error=access_denied)
        self._test_repl_full_on_ou(
            repl_obj=self.ou,
            expected_error=[werror.WERR_DS_CANT_FIND_EXPECTED_NC,
                            werror.WERR_DS_DRA_ACCESS_DENIED])
        self._test_repl_full_on_ou(
            repl_obj=bad_ou,
            expected_error=[werror.WERR_DS_DRA_BAD_NC,
                            werror.WERR_DS_DRA_ACCESS_DENIED])

        # Partial Attribute Sets don't require GET_ALL_CHANGES rights, so we
        # expect the following to succeed
        self._test_repl_single_obj(
            repl_obj=self.ou,
            expected_error=None,
            partial_attribute_set=self.get_partial_attribute_set())
        self._test_repl_full(
            expected_error=None,
            partial_attribute_set=self.get_partial_attribute_set())

    def test_repl_getallchanges_userpriv(self):
        """
        Tests various replication requests made by a user with only
        GET_ALL_CHANGES rights. Note that assigning these rights is possible,
        but doesn't make a lot of sense. We test it anyway for consistency.
        """

        # Assign the user GET_ALL_CHANGES rights
        self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod_get_all_changes)

        # We can expect to get the same responses as an unprivileged user,
        # i.e. we have permission to see the results, but don't have permission
        # to ask
        self.test_repl_no_userpriv()

    def test_repl_both_userpriv(self):
        """
        Tests various replication requests made by a privileged user (i.e. has
        both GET_CHANGES and GET_ALL_CHANGES). We expect any valid requests
        to be accepted.
        """

        # Assign the user both GET_CHANGES and GET_ALL_CHANGES rights
        both_rights = self.acl_mod_get_changes + self.acl_mod_get_all_changes
        self.sd_utils.dacl_add_ace(self.base_dn, both_rights)

        bad_ou = "OU=bad_obj,%s" % self.ou

        self._test_repl_single_obj(repl_obj=self.ou,
                                   expected_error=None)
        self._test_repl_single_obj(repl_obj=bad_ou,
                                   expected_error=[werror.WERR_DS_DRA_BAD_DN])

        # Microsoft returns DB_ERROR, Samba returns ACCESS_DENIED
        db_error_or_denied = [werror.WERR_DS_DRA_DB_ERROR,
                              werror.WERR_DS_DRA_ACCESS_DENIED]
        self._test_repl_secret(repl_obj=self.ou,
                               expected_error=db_error_or_denied)
        self._test_repl_secret(repl_obj=self.user_dn,
                               expected_error=db_error_or_denied)
        # Note that Windows accepts this but Samba rejects it
        self._test_repl_secret(
            repl_obj=self.user_dn,
            dest_dsa=self.ldb_dc1.get_ntds_GUID(),
            expected_error=[werror.WERR_DS_DRA_ACCESS_DENIED])

        self._test_repl_secret(repl_obj=bad_ou,
                               expected_error=[werror.WERR_DS_DRA_BAD_DN])

        self._test_repl_full(expected_error=None)
        self._test_repl_full_on_ou(
            repl_obj=self.ou,
            expected_error=[werror.WERR_DS_CANT_FIND_EXPECTED_NC])
        self._test_repl_full_on_ou(
            repl_obj=bad_ou,
            expected_error=[werror.WERR_DS_DRA_BAD_NC,
                            werror.WERR_DS_DRA_BAD_DN])

        self._test_repl_single_obj(
            repl_obj=self.ou,
            expected_error=None,
            partial_attribute_set=self.get_partial_attribute_set())
        self._test_repl_full(
            expected_error=None,
            partial_attribute_set=self.get_partial_attribute_set())

    def test_repl_no_userpriv(self):
        """
        Tests various replication requests made by an unprivileged user.
        We expect all these requests to be rejected.

        Also invoked by test_repl_getallchanges_userpriv() once it has granted
        GET_ALL_CHANGES, since the same rejections are expected there.
        """

        # Microsoft usually returns BAD_DN, Samba returns ACCESS_DENIED
        usual_error = [werror.WERR_DS_DRA_BAD_DN,
                       werror.WERR_DS_DRA_ACCESS_DENIED]
        access_denied = [werror.WERR_DS_DRA_ACCESS_DENIED]
        bad_ou = "OU=bad_obj,%s" % self.ou

        self._test_repl_single_obj(repl_obj=self.ou,
                                   expected_error=usual_error)
        self._test_repl_single_obj(repl_obj=bad_ou,
                                   expected_error=usual_error)

        self._test_repl_secret(repl_obj=self.ou,
                               expected_error=usual_error)
        self._test_repl_secret(repl_obj=self.user_dn,
                               expected_error=usual_error)
        self._test_repl_secret(repl_obj=self.user_dn,
                               dest_dsa=self.ldb_dc1.get_ntds_GUID(),
                               expected_error=usual_error)
        self._test_repl_secret(repl_obj=bad_ou,
                               expected_error=usual_error)

        self._test_repl_full(expected_error=access_denied)
        self._test_repl_full_on_ou(repl_obj=self.ou,
                                   expected_error=usual_error)
        self._test_repl_full_on_ou(
            repl_obj=bad_ou,
            expected_error=[werror.WERR_DS_DRA_BAD_NC,
                            werror.WERR_DS_DRA_ACCESS_DENIED])

        self._test_repl_single_obj(
            repl_obj=self.ou,
            expected_error=usual_error,
            partial_attribute_set=self.get_partial_attribute_set())
        self._test_repl_full(
            expected_error=access_denied,
            partial_attribute_set=self.get_partial_attribute_set())
306
307