s4-drsuapi: Refuse to replicate an NC is that not actually an NC
[nivanova/samba-autobuild/.git] / source4 / torture / drs / python / getnc_unpriv.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 #
4 # Tests replication scenarios with different user privileges
5 #
6 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
7 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
8 #
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 #
22
23 #
24 # Usage:
25 #  export DC1=dc1_dns_name
26 #  export DC2=dc2_dns_name
27 #  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
28 #  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN getnc_unpriv -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
29 #
30
31 import drs_base
32 import samba.tests
33
34 from samba import sd_utils
35 import ldb
36 from ldb import SCOPE_BASE
37
38 from samba.dcerpc import drsuapi
39 from samba.credentials import DONT_USE_KERBEROS
40
41 class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
42     """Confirm the behaviour of DsGetNCChanges for unprivileged users"""
43
44     def setUp(self):
45         super(DrsReplicaSyncUnprivTestCase, self).setUp()
46         self.get_changes_user = "get-changes-user"
47         self.base_dn = self.ldb_dc1.get_default_basedn()
48         self.ou = "OU=test_getncchanges,%s" % self.base_dn
49         self.user_pass = samba.generate_random_password(12, 16)
50         self.ldb_dc1.add({
51             "dn": self.ou,
52             "objectclass": "organizationalUnit"})
53         self.ldb_dc1.newuser(self.get_changes_user, self.user_pass,
54                              userou="OU=test_getncchanges")
55         (self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1)
56
57         self.sd_utils = sd_utils.SDUtils(self.ldb_dc1)
58         user_dn = "cn=%s,%s" % (self.get_changes_user, self.ou)
59         user_sid = self.sd_utils.get_object_sid(user_dn)
60         mod = "(A;;CR;1131f6aa-9c07-11d1-f79f-00c04fc2dcd2;;%s)" % str(user_sid)
61         self.sd_utils.dacl_add_ace(self.base_dn, mod)
62
63         # We set DONT_USE_KERBEROS to avoid a race with getting the
64         # user replicated to our selected KDC
65         self.user_creds = self.insta_creds(template=self.get_credentials(),
66                                            username=self.get_changes_user,
67                                            userpass=self.user_pass,
68                                            kerberos_state=DONT_USE_KERBEROS)
69         (self.user_drs, self.user_drs_handle) = self._ds_bind(self.dnsname_dc1,
70                                                               self.user_creds)
71
72     def tearDown(self):
73         try:
74             self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
75         except ldb.LdbError as (enum, string):
76             if enum == ldb.ERR_NO_SUCH_OBJECT:
77                 pass
78         super(DrsReplicaSyncUnprivTestCase, self).tearDown()
79
80     def test_do_single_repl(self):
81         """
82         Make sure that DRSU_EXOP_REPL_OBJ works as a less-privileged
83         user with the correct GET_CHANGES rights
84         """
85
86         ou1 = "OU=single_obj,%s" % self.ou
87         self.ldb_dc1.add({
88             "dn": ou1,
89             "objectclass": "organizationalUnit"
90             })
91         req8 = self._exop_req8(dest_dsa=None,
92                                invocation_id=self.ldb_dc1.get_invocation_id(),
93                                nc_dn_str=ou1,
94                                exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
95                                replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
96         (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
97         self._check_ctr6(ctr, [ou1])
98
99     def test_do_full_repl(self):
100         """
101         Make sure that full replication works as a less-privileged
102         user with the correct GET_CHANGES rights
103         """
104
105         req8 = self._exop_req8(dest_dsa=None,
106                                invocation_id=self.ldb_dc1.get_invocation_id(),
107                                nc_dn_str=self.ldb_dc1.get_default_basedn(),
108                                exop=drsuapi.DRSUAPI_EXOP_NONE,
109                                replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
110         (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
111         self.assertEqual(ctr.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_NONE)