a65dd13d99e67153f49f00e3bc7a5a85c7e12eeb
[nivanova/samba-autobuild/.git] / source4 / torture / drs / python / getnc_unpriv.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 #
4 # Tests replication scenarios with different user privileges.
5 # We want to test every replication scenario we can think of against:
6 # - users with only GET_CHANGES privileges
7 # - users with only GET_ALL_CHANGES privileges
8 # - users with both GET_CHANGES and GET_ALL_CHANGES privileges
9 # - users with no privileges
10 #
11 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
12 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
13 #
14 # This program is free software; you can redistribute it and/or modify
15 # it under the terms of the GNU General Public License as published by
16 # the Free Software Foundation; either version 3 of the License, or
17 # (at your option) any later version.
18 #
19 # This program is distributed in the hope that it will be useful,
20 # but WITHOUT ANY WARRANTY; without even the implied warranty of
21 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 # GNU General Public License for more details.
23 #
24 # You should have received a copy of the GNU General Public License
25 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
26 #
27
28 #
29 # Usage:
30 #  export DC1=dc1_dns_name
31 #  export DC2=dc2_dns_name
32 #  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
33 #  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN getnc_unpriv -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
34 #
35
36 import drs_base
37 import samba.tests
38 from samba import werror, WERRORError
39
40 from samba import sd_utils
41 import ldb
42 from ldb import SCOPE_BASE
43
44 from samba.dcerpc import drsuapi, security
45 from samba.credentials import DONT_USE_KERBEROS
46
class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
    """Confirm the behaviour of DsGetNCChanges for unprivileged users.

    Each test grants a freshly created user a different combination of the
    GET_CHANGES / GET_ALL_CHANGES control access rights on the domain's
    base DN, then issues DsGetNCChanges requests over a DRS connection
    bound with that user's credentials and checks which ones are accepted.
    """

    def setUp(self):
        """Create the test OU and user, and bind to DC1 as that user."""
        super(DrsReplicaSyncUnprivTestCase, self).setUp()
        self.get_changes_user = "get-changes-user"
        self.base_dn = self.ldb_dc1.get_default_basedn()
        self.ou = "OU=test_getncchanges,%s" % self.base_dn
        self.user_pass = samba.generate_random_password(12, 16)
        self.ldb_dc1.add({
            "dn": self.ou,
            "objectclass": "organizationalUnit"})
        self.ldb_dc1.newuser(self.get_changes_user, self.user_pass,
                             userou="OU=test_getncchanges")
        (self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1)

        self.sd_utils = sd_utils.SDUtils(self.ldb_dc1)
        self.user_dn = "cn=%s,%s" % (self.get_changes_user, self.ou)
        user_sid = self.sd_utils.get_object_sid(self.user_dn)

        # ACEs (in SDDL form) that grant the test user each control access
        # right; the individual tests add them to the base DN as required.
        self.acl_mod_get_changes = "(OA;;CR;%s;;%s)" % (
            security.GUID_DRS_GET_CHANGES, str(user_sid))
        self.acl_mod_get_all_changes = "(OA;;CR;%s;;%s)" % (
            security.GUID_DRS_GET_ALL_CHANGES, str(user_sid))

        # Remember the original security descriptor so that tearDown can
        # undo whatever ACEs a test added.
        self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn)

        # We set DONT_USE_KERBEROS to avoid a race with getting the
        # user replicated to our selected KDC
        self.user_creds = self.insta_creds(template=self.get_credentials(),
                                           username=self.get_changes_user,
                                           userpass=self.user_pass,
                                           kerberos_state=DONT_USE_KERBEROS)
        (self.user_drs, self.user_drs_handle) = self._ds_bind(
            self.dnsname_dc1, self.user_creds)

    def tearDown(self):
        """Restore the base DN's security descriptor and delete the test OU.

        The steps are nested in try/finally so that a failure in an earlier
        cleanup step does not prevent the later ones from running.
        """
        try:
            self.sd_utils.modify_sd_on_dn(self.base_dn, self.desc_sddl)
        finally:
            try:
                self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
            except ldb.LdbError as err:
                # 'except X as (a, b)' is Python 2 only; unpacking .args
                # works on both Python 2 and Python 3.
                (enum, _estr) = err.args
                if enum == ldb.ERR_NO_SUCH_OBJECT:
                    # The OU is already gone, nothing left to clean up.
                    pass
                # NOTE(review): any other LDB error is also ignored here,
                # so this cleanup is best-effort - confirm that is intended.
            finally:
                super(DrsReplicaSyncUnprivTestCase, self).tearDown()

    def _test_repl_exop(self, exop, repl_obj, expected_error, dest_dsa=None,
                        partial_attribute_set=None):
        """
        Common function to send a replication request and check the result
        matches what's expected.

        :param exop: DRSUAPI_EXOP_* extended operation to request
        :param repl_obj: DN of the object/NC the request is made against
        :param expected_error: None if the request should be accepted,
            otherwise a list of the WERROR codes that are acceptable
        :param dest_dsa: optional destination DSA passed on to the request
        :param partial_attribute_set: optional partial attribute set passed
            on to the request
        """
        req8 = self._exop_req8(dest_dsa=dest_dsa,
                               invocation_id=self.ldb_dc1.get_invocation_id(),
                               nc_dn_str=repl_obj,
                               exop=exop,
                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP,
                               partial_attribute_set=partial_attribute_set)

        if expected_error is None:
            # user is OK, request should be accepted without throwing an
            # error (the reply itself is not inspected)
            self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
            return

        # check the request is rejected (with the error we're expecting)
        try:
            self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
        except WERRORError as err:
            # 'except X as (a, b)' is Python 2 only; unpacking .args
            # works on both Python 2 and Python 3.
            (enum, estr) = err.args
            self.assertIn(enum, expected_error,
                          "Got unexpected error: %s" % estr)
        else:
            self.fail("Should have failed with user denied access")

    def _test_repl_single_obj(self, repl_obj, expected_error,
                              partial_attribute_set=None):
        """
        Checks that replication on a single object either succeeds or fails as
        expected (based on the user's access rights)
        """
        self._test_repl_exop(exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
                             repl_obj=repl_obj,
                             expected_error=expected_error,
                             partial_attribute_set=partial_attribute_set)

    def _test_repl_secret(self, repl_obj, expected_error, dest_dsa=None):
        """
        Checks that REPL_SECRET on an object either succeeds or fails as
        expected (based on the user's access rights)
        """
        self._test_repl_exop(exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
                             repl_obj=repl_obj,
                             expected_error=expected_error,
                             dest_dsa=dest_dsa)

    def _test_repl_full(self, expected_error, partial_attribute_set=None):
        """
        Checks that a full replication either succeeds or fails as expected
        (based on the user's access rights)
        """
        self._test_repl_exop(exop=drsuapi.DRSUAPI_EXOP_NONE,
                             repl_obj=self.ldb_dc1.get_default_basedn(),
                             expected_error=expected_error,
                             partial_attribute_set=partial_attribute_set)

    def _test_repl_full_on_ou(self, expected_error):
        """
        Full replication on a specific OU should always fail (it should be done
        against a base NC). The error may vary based on the user's access rights
        """
        # Just try against the OU created in the test setup
        self._test_repl_exop(exop=drsuapi.DRSUAPI_EXOP_NONE,
                             repl_obj=self.ou,
                             expected_error=expected_error)

    def test_repl_getchanges_userpriv(self):
        """
        Tests various replication requests made by a user with only GET_CHANGES
        rights. Some requests will be accepted, but most will be rejected.
        """

        # Assign the user GET_CHANGES rights
        self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod_get_changes)

        access_denied = [werror.WERR_DS_DRA_ACCESS_DENIED]

        self._test_repl_single_obj(repl_obj=self.ou,
                                   expected_error=access_denied)

        self._test_repl_secret(repl_obj=self.ou,
                               expected_error=access_denied)
        self._test_repl_secret(repl_obj=self.user_dn,
                               expected_error=access_denied)
        self._test_repl_secret(repl_obj=self.user_dn,
                               dest_dsa=self.ldb_dc1.get_ntds_GUID(),
                               expected_error=access_denied)

        self._test_repl_full(expected_error=access_denied)
        self._test_repl_full_on_ou(
            expected_error=[werror.WERR_DS_CANT_FIND_EXPECTED_NC,
                            werror.WERR_DS_DRA_ACCESS_DENIED])

        # Partial Attribute Sets don't require GET_ALL_CHANGES rights, so we
        # expect the following to succeed
        self._test_repl_single_obj(
            repl_obj=self.ou,
            expected_error=None,
            partial_attribute_set=self.get_partial_attribute_set())
        self._test_repl_full(
            expected_error=None,
            partial_attribute_set=self.get_partial_attribute_set())

    def test_repl_getallchanges_userpriv(self):
        """
        Tests various replication requests made by a user with only
        GET_ALL_CHANGES rights. Note that assigning these rights is possible,
        but doesn't make a lot of sense. We test it anyway for consistency.
        """

        # Assign the user GET_ALL_CHANGES rights
        self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod_get_all_changes)

        # We can expect to get the same responses as an unprivileged user,
        # i.e. we have permission to see the results, but don't have permission
        # to ask
        self.test_repl_no_userpriv()

    def test_repl_both_userpriv(self):
        """
        Tests various replication requests made by a privileged user (i.e. has
        both GET_CHANGES and GET_ALL_CHANGES). We expect any valid requests
        to be accepted.
        """

        # Assign the user both GET_CHANGES and GET_ALL_CHANGES rights
        both_rights = self.acl_mod_get_changes + self.acl_mod_get_all_changes
        self.sd_utils.dacl_add_ace(self.base_dn, both_rights)

        self._test_repl_single_obj(repl_obj=self.ou,
                                   expected_error=None)

        # Microsoft returns DB_ERROR, Samba returns ACCESS_DENIED
        db_error_or_denied = [werror.WERR_DS_DRA_DB_ERROR,
                              werror.WERR_DS_DRA_ACCESS_DENIED]
        self._test_repl_secret(repl_obj=self.ou,
                               expected_error=db_error_or_denied)
        self._test_repl_secret(repl_obj=self.user_dn,
                               expected_error=db_error_or_denied)
        # Note that Windows accepts this but Samba rejects it
        self._test_repl_secret(
            repl_obj=self.user_dn,
            dest_dsa=self.ldb_dc1.get_ntds_GUID(),
            expected_error=[werror.WERR_DS_DRA_ACCESS_DENIED])

        self._test_repl_full(expected_error=None)
        self._test_repl_full_on_ou(
            expected_error=[werror.WERR_DS_CANT_FIND_EXPECTED_NC])

        self._test_repl_single_obj(
            repl_obj=self.ou,
            expected_error=None,
            partial_attribute_set=self.get_partial_attribute_set())
        self._test_repl_full(
            expected_error=None,
            partial_attribute_set=self.get_partial_attribute_set())

    def test_repl_no_userpriv(self):
        """
        Tests various replication requests made by an unprivileged user.
        We expect all these requests to be rejected.
        """

        # Microsoft usually returns BAD_DN, Samba returns ACCESS_DENIED
        usual_error = [werror.WERR_DS_DRA_BAD_DN,
                       werror.WERR_DS_DRA_ACCESS_DENIED]

        self._test_repl_single_obj(repl_obj=self.ou,
                                   expected_error=usual_error)

        self._test_repl_secret(repl_obj=self.ou,
                               expected_error=usual_error)
        self._test_repl_secret(repl_obj=self.user_dn,
                               expected_error=usual_error)
        self._test_repl_secret(repl_obj=self.user_dn,
                               dest_dsa=self.ldb_dc1.get_ntds_GUID(),
                               expected_error=usual_error)

        self._test_repl_full(
            expected_error=[werror.WERR_DS_DRA_ACCESS_DENIED])
        self._test_repl_full_on_ou(expected_error=usual_error)

        self._test_repl_single_obj(
            repl_obj=self.ou,
            expected_error=usual_error,
            partial_attribute_set=self.get_partial_attribute_set())
        self._test_repl_full(
            expected_error=[werror.WERR_DS_DRA_ACCESS_DENIED],
            partial_attribute_set=self.get_partial_attribute_set())
268
269