34a9435ea445a7d0e24c2e230bced757285663e4
[samba.git] / python / samba / tests / dsdb.py
1 # Unix SMB/CIFS implementation. Tests for dsdb
2 # Copyright (C) Matthieu Patou <mat@matws.net> 2010
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 """Tests for samba.dsdb."""
19
20 from samba.credentials import Credentials
21 from samba.samdb import SamDB
22 from samba.auth import system_session
23 from samba.tests import TestCase
24 from samba.tests import delete_force
25 from samba.ndr import ndr_unpack, ndr_pack
26 from samba.dcerpc import drsblobs, security
27 from samba import dsdb
28 import ldb
29 import samba
30
31 class DsdbTests(TestCase):
32
33     def setUp(self):
34         super(DsdbTests, self).setUp()
35         self.lp = samba.tests.env_loadparm()
36         self.creds = Credentials()
37         self.creds.guess(self.lp)
38         self.session = system_session()
39         self.samdb = SamDB(session_info=self.session,
40                            credentials=self.creds,
41                            lp=self.lp)
42
43         # Create a test user
44         user_name = "samdb-testuser"
45         user_pass = samba.generate_random_password(32, 32)
46         user_description = "Test user for dsdb test"
47
48         base_dn = self.samdb.domain_dn()
49
50         self.account_dn = "cn=" + user_name + ",cn=Users," + base_dn
51         delete_force(self.samdb, self.account_dn)
52         self.samdb.newuser(username=user_name,
53                            password=user_pass,
54                            description=user_description)
55
56     def test_get_oid_from_attrid(self):
57         oid = self.samdb.get_oid_from_attid(591614)
58         self.assertEquals(oid, "1.2.840.113556.1.4.1790")
59
60     def test_error_replpropertymetadata(self):
61         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
62                                 base=self.account_dn,
63                                 attrs=["replPropertyMetaData"])
64         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
65                             str(res[0]["replPropertyMetaData"]))
66         ctr = repl.ctr
67         for o in ctr.array:
68             # Search for Description
69             if o.attid == 13:
70                 old_version = o.version
71                 o.version = o.version + 1
72         replBlob = ndr_pack(repl)
73         msg = ldb.Message()
74         msg.dn = res[0].dn
75         msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
76         self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
77
78     def test_error_replpropertymetadata_nochange(self):
79         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
80                                 base=self.account_dn,
81                                 attrs=["replPropertyMetaData"])
82         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
83                             str(res[0]["replPropertyMetaData"]))
84         replBlob = ndr_pack(repl)
85         msg = ldb.Message()
86         msg.dn = res[0].dn
87         msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
88         self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
89
90     def test_error_replpropertymetadata_allow_sort(self):
91         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
92                                 base=self.account_dn,
93                                 attrs=["replPropertyMetaData"])
94         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
95                             str(res[0]["replPropertyMetaData"]))
96         replBlob = ndr_pack(repl)
97         msg = ldb.Message()
98         msg.dn = res[0].dn
99         msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
100         self.samdb.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0", "local_oid:1.3.6.1.4.1.7165.4.3.25:0"])
101
102     def test_twoatt_replpropertymetadata(self):
103         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
104                                 base=self.account_dn,
105                                 attrs=["replPropertyMetaData", "uSNChanged"])
106         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
107                             str(res[0]["replPropertyMetaData"]))
108         ctr = repl.ctr
109         for o in ctr.array:
110             # Search for Description
111             if o.attid == 13:
112                 old_version = o.version
113                 o.version = o.version + 1
114                 o.local_usn = long(str(res[0]["uSNChanged"])) + 1
115         replBlob = ndr_pack(repl)
116         msg = ldb.Message()
117         msg.dn = res[0].dn
118         msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
119         msg["description"] = ldb.MessageElement("new val", ldb.FLAG_MOD_REPLACE, "description")
120         self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
121
122     def test_set_replpropertymetadata(self):
123         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
124                                 base=self.account_dn,
125                                 attrs=["replPropertyMetaData", "uSNChanged"])
126         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
127                             str(res[0]["replPropertyMetaData"]))
128         ctr = repl.ctr
129         for o in ctr.array:
130             # Search for Description
131             if o.attid == 13:
132                 old_version = o.version
133                 o.version = o.version + 1
134                 o.local_usn = long(str(res[0]["uSNChanged"])) + 1
135                 o.originating_usn = long(str(res[0]["uSNChanged"])) + 1
136         replBlob = ndr_pack(repl)
137         msg = ldb.Message()
138         msg.dn = res[0].dn
139         msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
140         self.samdb.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
141
142     def test_ok_get_attribute_from_attid(self):
143         self.assertEquals(self.samdb.get_attribute_from_attid(13), "description")
144
145     def test_ko_get_attribute_from_attid(self):
146         self.assertEquals(self.samdb.get_attribute_from_attid(11979), None)
147
148     def test_get_attribute_replmetadata_version(self):
149         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
150                                 base=self.account_dn,
151                                 attrs=["dn"])
152         self.assertEquals(len(res), 1)
153         dn = str(res[0].dn)
154         self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "unicodePwd"), 2)
155
156     def test_set_attribute_replmetadata_version(self):
157         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
158                                 base=self.account_dn,
159                                 attrs=["dn"])
160         self.assertEquals(len(res), 1)
161         dn = str(res[0].dn)
162         version = self.samdb.get_attribute_replmetadata_version(dn, "description")
163         self.samdb.set_attribute_replmetadata_version(dn, "description", version + 2)
164         self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "description"), version + 2)
165
166     def test_no_error_on_invalid_control(self):
167         try:
168             res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
169                                     base=self.account_dn,
170                                     attrs=["replPropertyMetaData"],
171                                     controls=["local_oid:%s:0"
172                                               % dsdb.DSDB_CONTROL_INVALID_NOT_IMPLEMENTED])
173         except ldb.LdbError as e:
174             self.fail("Should have not raised an exception")
175
176     def test_error_on_invalid_critical_control(self):
177         try:
178             res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
179                                     base=self.account_dn,
180                                     attrs=["replPropertyMetaData"],
181                                     controls=["local_oid:%s:1"
182                                               % dsdb.DSDB_CONTROL_INVALID_NOT_IMPLEMENTED])
183         except ldb.LdbError as e:
184             if e[0] != ldb.ERR_UNSUPPORTED_CRITICAL_EXTENSION:
185                 self.fail("Got %s should have got ERR_UNSUPPORTED_CRITICAL_EXTENSION"
186                           % e[1])
187
188     # Allocate a unique RID for use in the objectSID tests.
189     #
190     def allocate_rid(self):
191         self.samdb.transaction_start()
192         try:
193             rid = self.samdb.allocate_rid()
194         except:
195             self.samdb.transaction_cancel()
196             raise
197         self.samdb.transaction_commit()
198         return str(rid)
199
    # Ensure that duplicate objectSIDs are permitted for foreign security
    # principals.
    #
203     def test_duplicate_objectSIDs_allowed_on_foreign_security_principals(self):
204
205         #
206         # We need to build a foreign security principal SID
207         # i.e a  SID not in the current domain.
208         #
209         dom_sid = self.samdb.get_domain_sid()
210         if str(dom_sid)[:-1] == "0":
211             c = "9"
212         else:
213             c = "0"
214         sid     = str(dom_sid)[:-1] + c + "-1000"
215         basedn  = self.samdb.get_default_basedn()
216         dn      = "CN=%s,CN=ForeignSecurityPrincipals,%s" % (sid, basedn)
217         self.samdb.add({
218             "dn": dn,
219             "objectClass": "foreignSecurityPrincipal"})
220
221         self.samdb.delete(dn)
222
223         try:
224             self.samdb.add({
225                 "dn": dn,
226                 "objectClass": "foreignSecurityPrincipal"})
227         except ldb.LdbError as e:
228             (code, msg) = e
229             self.fail("Got unexpected exception %d - %s "
230                       % (code, msg))
231
    #
    # Duplicate objectSIDs should not be permitted for SIDs in the local
    # domain. The test sequence is: add an object, delete it, then attempt
    # to re-add it; this should fail with a constraint violation.
    #
237     def test_duplicate_objectSIDs_not_allowed_on_local_objects(self):
238
239         dom_sid = self.samdb.get_domain_sid()
240         rid     = self.allocate_rid()
241         sid_str = str(dom_sid) + "-" + rid
242         sid     = ndr_pack(security.dom_sid(sid_str))
243         basedn  = self.samdb.get_default_basedn()
244         cn       = "dsdb_test_01"
245         dn      = "cn=%s,cn=Users,%s" % (cn, basedn)
246
247         self.samdb.add({
248             "dn": dn,
249             "objectClass": "user",
250             "objectSID": sid})
251         self.samdb.delete(dn)
252
253         try:
254             self.samdb.add({
255                 "dn": dn,
256                 "objectClass": "user",
257                 "objectSID": sid})
258             self.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION")
259         except ldb.LdbError as e:
260             (code, msg) = e
261             if code != ldb.ERR_CONSTRAINT_VIOLATION:
262                 self.fail("Got %d - %s should have got "
263                           "LDB_ERR_CONSTRAINT_VIOLATION"
264                           % (code, msg))