tests: Add test-case for 'group list --verbose'
[samba.git] / python / samba / tests / dsdb.py
1 # Unix SMB/CIFS implementation. Tests for dsdb
2 # Copyright (C) Matthieu Patou <mat@matws.net> 2010
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 """Tests for samba.dsdb."""
19
20 from samba.credentials import Credentials
21 from samba.samdb import SamDB
22 from samba.auth import system_session
23 from samba.tests import TestCase
24 from samba.tests import delete_force
25 from samba.ndr import ndr_unpack, ndr_pack
26 from samba.dcerpc import drsblobs, security, misc
27 from samba import dsdb
28 from samba import werror
29 import ldb
30 import samba
31 import uuid
32
33
34 class DsdbTests(TestCase):
35
36     def setUp(self):
37         super(DsdbTests, self).setUp()
38         self.lp = samba.tests.env_loadparm()
39         self.creds = Credentials()
40         self.creds.guess(self.lp)
41         self.session = system_session()
42         self.samdb = SamDB(session_info=self.session,
43                            credentials=self.creds,
44                            lp=self.lp)
45
46         # Create a test user
47         user_name = "dsdb-user-" + str(uuid.uuid4().hex[0:6])
48         user_pass = samba.generate_random_password(32, 32)
49         user_description = "Test user for dsdb test"
50
51         base_dn = self.samdb.domain_dn()
52
53         self.account_dn = "cn=" + user_name + ",cn=Users," + base_dn
54         self.samdb.newuser(username=user_name,
55                            password=user_pass,
56                            description=user_description)
57         # Cleanup (teardown)
58         self.addCleanup(delete_force, self.samdb, self.account_dn)
59
60     def test_get_oid_from_attrid(self):
61         oid = self.samdb.get_oid_from_attid(591614)
62         self.assertEquals(oid, "1.2.840.113556.1.4.1790")
63
64     def test_error_replpropertymetadata(self):
65         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
66                                 base=self.account_dn,
67                                 attrs=["replPropertyMetaData"])
68         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
69                           res[0]["replPropertyMetaData"][0])
70         ctr = repl.ctr
71         for o in ctr.array:
72             # Search for Description
73             if o.attid == 13:
74                 old_version = o.version
75                 o.version = o.version + 1
76         replBlob = ndr_pack(repl)
77         msg = ldb.Message()
78         msg.dn = res[0].dn
79         msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
80         self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
81
82     def test_error_replpropertymetadata_nochange(self):
83         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
84                                 base=self.account_dn,
85                                 attrs=["replPropertyMetaData"])
86         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
87                           res[0]["replPropertyMetaData"][0])
88         replBlob = ndr_pack(repl)
89         msg = ldb.Message()
90         msg.dn = res[0].dn
91         msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
92         self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
93
94     def test_error_replpropertymetadata_allow_sort(self):
95         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
96                                 base=self.account_dn,
97                                 attrs=["replPropertyMetaData"])
98         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
99                           res[0]["replPropertyMetaData"][0])
100         replBlob = ndr_pack(repl)
101         msg = ldb.Message()
102         msg.dn = res[0].dn
103         msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
104         self.samdb.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0", "local_oid:1.3.6.1.4.1.7165.4.3.25:0"])
105
106     def test_twoatt_replpropertymetadata(self):
107         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
108                                 base=self.account_dn,
109                                 attrs=["replPropertyMetaData", "uSNChanged"])
110         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
111                           res[0]["replPropertyMetaData"][0])
112         ctr = repl.ctr
113         for o in ctr.array:
114             # Search for Description
115             if o.attid == 13:
116                 old_version = o.version
117                 o.version = o.version + 1
118                 o.local_usn = int(str(res[0]["uSNChanged"])) + 1
119         replBlob = ndr_pack(repl)
120         msg = ldb.Message()
121         msg.dn = res[0].dn
122         msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
123         msg["description"] = ldb.MessageElement("new val", ldb.FLAG_MOD_REPLACE, "description")
124         self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
125
126     def test_set_replpropertymetadata(self):
127         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
128                                 base=self.account_dn,
129                                 attrs=["replPropertyMetaData", "uSNChanged"])
130         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
131                           res[0]["replPropertyMetaData"][0])
132         ctr = repl.ctr
133         for o in ctr.array:
134             # Search for Description
135             if o.attid == 13:
136                 old_version = o.version
137                 o.version = o.version + 1
138                 o.local_usn = int(str(res[0]["uSNChanged"])) + 1
139                 o.originating_usn = int(str(res[0]["uSNChanged"])) + 1
140         replBlob = ndr_pack(repl)
141         msg = ldb.Message()
142         msg.dn = res[0].dn
143         msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
144         self.samdb.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
145
146     def test_ok_get_attribute_from_attid(self):
147         self.assertEquals(self.samdb.get_attribute_from_attid(13), "description")
148
149     def test_ko_get_attribute_from_attid(self):
150         self.assertEquals(self.samdb.get_attribute_from_attid(11979), None)
151
152     def test_get_attribute_replmetadata_version(self):
153         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
154                                 base=self.account_dn,
155                                 attrs=["dn"])
156         self.assertEquals(len(res), 1)
157         dn = str(res[0].dn)
158         self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "unicodePwd"), 2)
159
160     def test_set_attribute_replmetadata_version(self):
161         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
162                                 base=self.account_dn,
163                                 attrs=["dn"])
164         self.assertEquals(len(res), 1)
165         dn = str(res[0].dn)
166         version = self.samdb.get_attribute_replmetadata_version(dn, "description")
167         self.samdb.set_attribute_replmetadata_version(dn, "description", version + 2)
168         self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "description"), version + 2)
169
170     def test_no_error_on_invalid_control(self):
171         try:
172             res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
173                                     base=self.account_dn,
174                                     attrs=["replPropertyMetaData"],
175                                     controls=["local_oid:%s:0"
176                                               % dsdb.DSDB_CONTROL_INVALID_NOT_IMPLEMENTED])
177         except ldb.LdbError as e:
178             self.fail("Should have not raised an exception")
179
180     def test_error_on_invalid_critical_control(self):
181         try:
182             res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
183                                     base=self.account_dn,
184                                     attrs=["replPropertyMetaData"],
185                                     controls=["local_oid:%s:1"
186                                               % dsdb.DSDB_CONTROL_INVALID_NOT_IMPLEMENTED])
187         except ldb.LdbError as e:
188             (errno, estr) = e.args
189             if errno != ldb.ERR_UNSUPPORTED_CRITICAL_EXTENSION:
190                 self.fail("Got %s should have got ERR_UNSUPPORTED_CRITICAL_EXTENSION"
191                           % e[1])
192
193     # Allocate a unique RID for use in the objectSID tests.
194     #
195     def allocate_rid(self):
196         self.samdb.transaction_start()
197         try:
198             rid = self.samdb.allocate_rid()
199         except:
200             self.samdb.transaction_cancel()
201             raise
202         self.samdb.transaction_commit()
203         return str(rid)
204
205     # Ensure that duplicate objectSID's are permitted for foreign security
206     # principals.
207     #
208     def test_duplicate_objectSIDs_allowed_on_foreign_security_principals(self):
209
210         #
211         # We need to build a foreign security principal SID
212         # i.e a  SID not in the current domain.
213         #
214         dom_sid = self.samdb.get_domain_sid()
215         if str(dom_sid).endswith("0"):
216             c = "9"
217         else:
218             c = "0"
219         sid_str = str(dom_sid)[:-1] + c + "-1000"
220         sid     = ndr_pack(security.dom_sid(sid_str))
221         basedn  = self.samdb.get_default_basedn()
222         dn      = "CN=%s,CN=ForeignSecurityPrincipals,%s" % (sid_str, basedn)
223
224         #
225         # First without control
226         #
227
228         try:
229             self.samdb.add({
230                 "dn": dn,
231                 "objectClass": "foreignSecurityPrincipal"})
232             self.fail("No exception should get ERR_OBJECT_CLASS_VIOLATION")
233         except ldb.LdbError as e:
234             (code, msg) = e.args
235             self.assertEqual(code, ldb.ERR_OBJECT_CLASS_VIOLATION, str(e))
236             werr = "%08X" % werror.WERR_DS_MISSING_REQUIRED_ATT
237             self.assertTrue(werr in msg, msg)
238
239         try:
240             self.samdb.add({
241                 "dn": dn,
242                 "objectClass": "foreignSecurityPrincipal",
243                 "objectSid": sid})
244             self.fail("No exception should get ERR_UNWILLING_TO_PERFORM")
245         except ldb.LdbError as e:
246             (code, msg) = e.args
247             self.assertEqual(code, ldb.ERR_UNWILLING_TO_PERFORM, str(e))
248             werr = "%08X" % werror.WERR_DS_ILLEGAL_MOD_OPERATION
249             self.assertTrue(werr in msg, msg)
250
251         #
252         # We need to use the provision control
253         # in order to add foreignSecurityPrincipal
254         # objects
255         #
256
257         controls = ["provision:0"]
258         self.samdb.add({
259             "dn": dn,
260             "objectClass": "foreignSecurityPrincipal"},
261             controls=controls)
262
263         self.samdb.delete(dn)
264
265         try:
266             self.samdb.add({
267                 "dn": dn,
268                 "objectClass": "foreignSecurityPrincipal"},
269                 controls=controls)
270         except ldb.LdbError as e:
271             (code, msg) = e.args
272             self.fail("Got unexpected exception %d - %s "
273                       % (code, msg))
274
275         # cleanup
276         self.samdb.delete(dn)
277
    def _test_foreignSecurityPrincipal(self, obj_class, fpo_attr):
        """Check how <SID=...> values are handled when added to fpo_attr.

        An object of class obj_class is created under cn=Users and three
        SID references are added to its fpo_attr attribute in turn:

        * a SID below the local domain SID: must fail with
          ERR_UNWILLING_TO_PERFORM / WERR_DS_INVALID_GROUP_TYPE;
        * a SID below S-1-5-32: must fail with
          ERR_NO_SUCH_OBJECT / WERR_NO_SUCH_MEMBER;
        * the SID S-1-5-4294967294: must succeed, after which exactly one
          object with that objectSid exists (presumably an automatically
          created foreignSecurityPrincipal -- TODO confirm).

        :param obj_class: objectClass of the temporary object
        :param fpo_attr: name of the attribute that receives the references
        """

        dom_sid = self.samdb.get_domain_sid()
        lsid_str = str(dom_sid) + "-4294967294"
        bsid_str = "S-1-5-32-4294967294"
        fsid_str = "S-1-5-4294967294"
        basedn   = self.samdb.get_default_basedn()
        cn       = "dsdb_test_fpo"
        dn_str   = "cn=%s,cn=Users,%s" % (cn, basedn)
        dn = ldb.Dn(self.samdb, dn_str)

        # Precondition: none of the three SIDs is in use yet.
        res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
                                base=basedn,
                                expression="(objectSid=%s)" % lsid_str,
                                attrs=[])
        self.assertEqual(len(res), 0)
        res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
                                base=basedn,
                                expression="(objectSid=%s)" % bsid_str,
                                attrs=[])
        self.assertEqual(len(res), 0)
        res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
                                base=basedn,
                                expression="(objectSid=%s)" % fsid_str,
                                attrs=[])
        self.assertEqual(len(res), 0)

        # Registered before the add so the object is removed on any failure.
        self.addCleanup(delete_force, self.samdb, dn_str)

        self.samdb.add({
            "dn": dn_str,
            "objectClass": obj_class})

        # Case 1: SID below the local domain SID.
        msg = ldb.Message()
        msg.dn = dn
        msg[fpo_attr] = ldb.MessageElement("<SID=%s>" % lsid_str,
                                           ldb.FLAG_MOD_ADD,
                                           fpo_attr)
        try:
            self.samdb.modify(msg)
            self.fail("No exception should get LDB_ERR_UNWILLING_TO_PERFORM")
        except ldb.LdbError as e:
            # Note: 'msg' is rebound to the error string from here on.
            (code, msg) = e.args
            self.assertEqual(code, ldb.ERR_UNWILLING_TO_PERFORM, str(e))
            werr = "%08X" % werror.WERR_DS_INVALID_GROUP_TYPE
            self.assertTrue(werr in msg, msg)

        # Case 2: SID below S-1-5-32.
        msg = ldb.Message()
        msg.dn = dn
        msg[fpo_attr] = ldb.MessageElement("<SID=%s>" % bsid_str,
                                           ldb.FLAG_MOD_ADD,
                                           fpo_attr)
        try:
            self.samdb.modify(msg)
            self.fail("No exception should get LDB_ERR_NO_SUCH_OBJECT")
        except ldb.LdbError as e:
            (code, msg) = e.args
            self.assertEqual(code, ldb.ERR_NO_SUCH_OBJECT, str(e))
            werr = "%08X" % werror.WERR_NO_SUCH_MEMBER
            self.assertTrue(werr in msg, msg)

        # Case 3: SID outside both of the above; this one must be accepted.
        msg = ldb.Message()
        msg.dn = dn
        msg[fpo_attr] = ldb.MessageElement("<SID=%s>" % fsid_str,
                                           ldb.FLAG_MOD_ADD,
                                           fpo_attr)
        try:
            self.samdb.modify(msg)
        except ldb.LdbError as e:
            self.fail("Should have not raised an exception")

        # The successful add must have left exactly one object carrying the
        # SID; remove it together with the temporary object and verify that
        # the SID is gone again.
        res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
                                base=basedn,
                                expression="(objectSid=%s)" % fsid_str,
                                attrs=[])
        self.assertEqual(len(res), 1)
        self.samdb.delete(res[0].dn)
        self.samdb.delete(dn)
        res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
                                base=basedn,
                                expression="(objectSid=%s)" % fsid_str,
                                attrs=[])
        self.assertEqual(len(res), 0)
361
362     def test_foreignSecurityPrincipal_member(self):
363         return self._test_foreignSecurityPrincipal(
364                 "group", "member")
365
366     def test_foreignSecurityPrincipal_MembersForAzRole(self):
367         return self._test_foreignSecurityPrincipal(
368                 "msDS-AzRole", "msDS-MembersForAzRole")
369
370     def test_foreignSecurityPrincipal_NeverRevealGroup(self):
371         return self._test_foreignSecurityPrincipal(
372                 "computer", "msDS-NeverRevealGroup")
373
374     def test_foreignSecurityPrincipal_RevealOnDemandGroup(self):
375         return self._test_foreignSecurityPrincipal(
376                 "computer", "msDS-RevealOnDemandGroup")
377
378     def _test_fail_foreignSecurityPrincipal(self, obj_class, fpo_attr,
379                                             msg_exp, lerr_exp, werr_exp,
380                                             allow_reference=True):
381
382         dom_sid = self.samdb.get_domain_sid()
383         lsid_str = str(dom_sid) + "-4294967294"
384         bsid_str = "S-1-5-32-4294967294"
385         fsid_str = "S-1-5-4294967294"
386         basedn   = self.samdb.get_default_basedn()
387         cn1       = "dsdb_test_fpo1"
388         dn1_str   = "cn=%s,cn=Users,%s" % (cn1, basedn)
389         dn1 = ldb.Dn(self.samdb, dn1_str)
390         cn2       = "dsdb_test_fpo2"
391         dn2_str   = "cn=%s,cn=Users,%s" % (cn2, basedn)
392         dn2 = ldb.Dn(self.samdb, dn2_str)
393
394         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
395                                 base=basedn,
396                                 expression="(objectSid=%s)" % lsid_str,
397                                 attrs=[])
398         self.assertEqual(len(res), 0)
399         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
400                                 base=basedn,
401                                 expression="(objectSid=%s)" % bsid_str,
402                                 attrs=[])
403         self.assertEqual(len(res), 0)
404         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
405                                 base=basedn,
406                                 expression="(objectSid=%s)" % fsid_str,
407                                 attrs=[])
408         self.assertEqual(len(res), 0)
409
410         self.addCleanup(delete_force, self.samdb, dn1_str)
411         self.addCleanup(delete_force, self.samdb, dn2_str)
412
413         self.samdb.add({
414             "dn": dn1_str,
415             "objectClass": obj_class})
416
417         self.samdb.add({
418             "dn": dn2_str,
419             "objectClass": obj_class})
420
421         msg = ldb.Message()
422         msg.dn = dn1
423         msg[fpo_attr] = ldb.MessageElement("<SID=%s>" % lsid_str,
424                                            ldb.FLAG_MOD_ADD,
425                                            fpo_attr)
426         try:
427             self.samdb.modify(msg)
428             self.fail("No exception should get %s" % msg_exp)
429         except ldb.LdbError as e:
430             (code, msg) = e.args
431             self.assertEqual(code, lerr_exp, str(e))
432             werr = "%08X" % werr_exp
433             self.assertTrue(werr in msg, msg)
434
435         msg = ldb.Message()
436         msg.dn = dn1
437         msg[fpo_attr] = ldb.MessageElement("<SID=%s>" % bsid_str,
438                                            ldb.FLAG_MOD_ADD,
439                                            fpo_attr)
440         try:
441             self.samdb.modify(msg)
442             self.fail("No exception should get %s" % msg_exp)
443         except ldb.LdbError as e:
444             (code, msg) = e.args
445             self.assertEqual(code, lerr_exp, str(e))
446             werr = "%08X" % werr_exp
447             self.assertTrue(werr in msg, msg)
448
449         msg = ldb.Message()
450         msg.dn = dn1
451         msg[fpo_attr] = ldb.MessageElement("<SID=%s>" % fsid_str,
452                                            ldb.FLAG_MOD_ADD,
453                                            fpo_attr)
454         try:
455             self.samdb.modify(msg)
456             self.fail("No exception should get %s" % msg)
457         except ldb.LdbError as e:
458             (code, msg) = e.args
459             self.assertEqual(code, lerr_exp, str(e))
460             werr = "%08X" % werr_exp
461             self.assertTrue(werr in msg, msg)
462
463         msg = ldb.Message()
464         msg.dn = dn1
465         msg[fpo_attr] = ldb.MessageElement("%s" % dn2,
466                                            ldb.FLAG_MOD_ADD,
467                                            fpo_attr)
468         try:
469             self.samdb.modify(msg)
470             if not allow_reference:
471                 self.fail("No exception should get %s" % msg_exp)
472         except ldb.LdbError as e:
473             if allow_reference:
474                 self.fail("Should have not raised an exception: %s" % e)
475             (code, msg) = e.args
476             self.assertEqual(code, lerr_exp, str(e))
477             werr = "%08X" % werr_exp
478             self.assertTrue(werr in msg, msg)
479
480         self.samdb.delete(dn2)
481         self.samdb.delete(dn1)
482
483     def test_foreignSecurityPrincipal_NonMembers(self):
484         return self._test_fail_foreignSecurityPrincipal(
485                 "group", "msDS-NonMembers",
486                 "LDB_ERR_UNWILLING_TO_PERFORM/WERR_NOT_SUPPORTED",
487                 ldb.ERR_UNWILLING_TO_PERFORM, werror.WERR_NOT_SUPPORTED,
488                 allow_reference=False)
489
490     def test_foreignSecurityPrincipal_HostServiceAccount(self):
491         return self._test_fail_foreignSecurityPrincipal(
492                 "computer", "msDS-HostServiceAccount",
493                 "LDB_ERR_CONSTRAINT_VIOLATION/WERR_DS_NAME_REFERENCE_INVALID",
494                 ldb.ERR_CONSTRAINT_VIOLATION,
495                 werror.WERR_DS_NAME_REFERENCE_INVALID)
496
497     def test_foreignSecurityPrincipal_manager(self):
498         return self._test_fail_foreignSecurityPrincipal(
499                 "user", "manager",
500                 "LDB_ERR_CONSTRAINT_VIOLATION/WERR_DS_NAME_REFERENCE_INVALID",
501                 ldb.ERR_CONSTRAINT_VIOLATION,
502                 werror.WERR_DS_NAME_REFERENCE_INVALID)
503
504     #
505     # Duplicate objectSID's should not be permitted for sids in the local
506     # domain. The test sequence is add an object, delete it, then attempt to
507     # re-add it, this should fail with a constraint violation
508     #
509     def test_duplicate_objectSIDs_not_allowed_on_local_objects(self):
510
511         dom_sid = self.samdb.get_domain_sid()
512         rid     = self.allocate_rid()
513         sid_str = str(dom_sid) + "-" + rid
514         sid     = ndr_pack(security.dom_sid(sid_str))
515         basedn  = self.samdb.get_default_basedn()
516         cn       = "dsdb_test_01"
517         dn      = "cn=%s,cn=Users,%s" % (cn, basedn)
518
519         self.samdb.add({
520             "dn": dn,
521             "objectClass": "user",
522             "objectSID": sid})
523         self.samdb.delete(dn)
524
525         try:
526             self.samdb.add({
527                 "dn": dn,
528                 "objectClass": "user",
529                 "objectSID": sid})
530             self.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION")
531         except ldb.LdbError as e:
532             (code, msg) = e.args
533             if code != ldb.ERR_CONSTRAINT_VIOLATION:
534                 self.fail("Got %d - %s should have got "
535                           "LDB_ERR_CONSTRAINT_VIOLATION"
536                           % (code, msg))
537
    def test_linked_vs_non_linked_reference(self):
        """Compare references to a deleted object via 'manager' and 'assistant'.

        Two users are created and one of them is deleted again after its
        objectGUID and objectSid have been recorded.  On the remaining user:

        * adding the deleted user to 'manager' by SID or GUID must fail with
          ERR_CONSTRAINT_VIOLATION / WERR_DS_NAME_REFERENCE_INVALID;
        * adding and then removing it on 'assistant' by SID or GUID must
          succeed;
        * adding a DN, SID or GUID that never existed to 'assistant' must
          fail with the same constraint violation.
        """
        basedn   = self.samdb.get_default_basedn()
        kept_dn_str   = "cn=reference_kept,cn=Users,%s" % (basedn)
        removed_dn_str   = "cn=reference_removed,cn=Users,%s" % (basedn)
        dom_sid = self.samdb.get_domain_sid()
        none_sid_str = str(dom_sid) + "-4294967294"
        none_guid_str = "afafafaf-fafa-afaf-fafa-afafafafafaf"

        # Registered before the adds so both objects are removed on failure.
        self.addCleanup(delete_force, self.samdb, kept_dn_str)
        self.addCleanup(delete_force, self.samdb, removed_dn_str)

        self.samdb.add({
            "dn": kept_dn_str,
            "objectClass": "user"})
        res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
                                base=kept_dn_str,
                                attrs=["objectGUID", "objectSID"])
        self.assertEqual(len(res), 1)
        # kept_guid and kept_sid are not used further down.
        kept_guid = ndr_unpack(misc.GUID, res[0]["objectGUID"][0])
        kept_sid = ndr_unpack(security.dom_sid, res[0]["objectSid"][0])
        kept_dn = res[0].dn

        # Record the identifiers of the second user, then delete it so the
        # GUID and SID refer to an object that no longer exists.
        self.samdb.add({
            "dn": removed_dn_str,
            "objectClass": "user"})
        res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
                                base=removed_dn_str,
                                attrs=["objectGUID", "objectSID"])
        self.assertEqual(len(res), 1)
        removed_guid = ndr_unpack(misc.GUID, res[0]["objectGUID"][0])
        removed_sid = ndr_unpack(security.dom_sid, res[0]["objectSid"][0])
        self.samdb.delete(removed_dn_str)

        #
        # First try the linked attribute 'manager'
        # by GUID and SID
        #

        msg = ldb.Message()
        msg.dn = kept_dn
        msg["manager"] = ldb.MessageElement("<SID=%s>" % removed_sid,
                                            ldb.FLAG_MOD_ADD,
                                            "manager")
        try:
            self.samdb.modify(msg)
            self.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION")
        except ldb.LdbError as e:
            # Note: 'msg' is rebound to the error string from here on.
            (code, msg) = e.args
            self.assertEqual(code, ldb.ERR_CONSTRAINT_VIOLATION, str(e))
            werr = "%08X" % werror.WERR_DS_NAME_REFERENCE_INVALID
            self.assertTrue(werr in msg, msg)

        msg = ldb.Message()
        msg.dn = kept_dn
        msg["manager"] = ldb.MessageElement("<GUID=%s>" % removed_guid,
                                            ldb.FLAG_MOD_ADD,
                                            "manager")
        try:
            self.samdb.modify(msg)
            self.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION")
        except ldb.LdbError as e:
            (code, msg) = e.args
            self.assertEqual(code, ldb.ERR_CONSTRAINT_VIOLATION, str(e))
            werr = "%08X" % werror.WERR_DS_NAME_REFERENCE_INVALID
            self.assertTrue(werr in msg, msg)

        #
        # Try the non-linked attribute 'assistant'
        # by GUID and SID, which should work.
        #
        msg = ldb.Message()
        msg.dn = kept_dn
        msg["assistant"] = ldb.MessageElement("<SID=%s>" % removed_sid,
                                              ldb.FLAG_MOD_ADD,
                                              "assistant")
        self.samdb.modify(msg)
        msg = ldb.Message()
        msg.dn = kept_dn
        msg["assistant"] = ldb.MessageElement("<SID=%s>" % removed_sid,
                                              ldb.FLAG_MOD_DELETE,
                                              "assistant")
        self.samdb.modify(msg)

        msg = ldb.Message()
        msg.dn = kept_dn
        msg["assistant"] = ldb.MessageElement("<GUID=%s>" % removed_guid,
                                              ldb.FLAG_MOD_ADD,
                                              "assistant")
        self.samdb.modify(msg)
        msg = ldb.Message()
        msg.dn = kept_dn
        msg["assistant"] = ldb.MessageElement("<GUID=%s>" % removed_guid,
                                              ldb.FLAG_MOD_DELETE,
                                              "assistant")
        self.samdb.modify(msg)

        #
        # Finally try the non-linked attribute 'assistant'
        # but with non existing GUID, SID, DN
        #
        msg = ldb.Message()
        msg.dn = kept_dn
        msg["assistant"] = ldb.MessageElement("CN=NoneNone,%s" % (basedn),
                                              ldb.FLAG_MOD_ADD,
                                              "assistant")
        try:
            self.samdb.modify(msg)
            self.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION")
        except ldb.LdbError as e:
            (code, msg) = e.args
            self.assertEqual(code, ldb.ERR_CONSTRAINT_VIOLATION, str(e))
            werr = "%08X" % werror.WERR_DS_NAME_REFERENCE_INVALID
            self.assertTrue(werr in msg, msg)

        msg = ldb.Message()
        msg.dn = kept_dn
        msg["assistant"] = ldb.MessageElement("<SID=%s>" % none_sid_str,
                                              ldb.FLAG_MOD_ADD,
                                              "assistant")
        try:
            self.samdb.modify(msg)
            self.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION")
        except ldb.LdbError as e:
            (code, msg) = e.args
            self.assertEqual(code, ldb.ERR_CONSTRAINT_VIOLATION, str(e))
            werr = "%08X" % werror.WERR_DS_NAME_REFERENCE_INVALID
            self.assertTrue(werr in msg, msg)

        msg = ldb.Message()
        msg.dn = kept_dn
        msg["assistant"] = ldb.MessageElement("<GUID=%s>" % none_guid_str,
                                              ldb.FLAG_MOD_ADD,
                                              "assistant")
        try:
            self.samdb.modify(msg)
            self.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION")
        except ldb.LdbError as e:
            (code, msg) = e.args
            self.assertEqual(code, ldb.ERR_CONSTRAINT_VIOLATION, str(e))
            werr = "%08X" % werror.WERR_DS_NAME_REFERENCE_INVALID
            self.assertTrue(werr in msg, msg)

        self.samdb.delete(kept_dn)
681
682     def test_normalize_dn_in_domain_full(self):
683         domain_dn = self.samdb.domain_dn()
684
685         part_dn = ldb.Dn(self.samdb, "CN=Users")
686
687         full_dn = part_dn
688         full_dn.add_base(domain_dn)
689
690         full_str = str(full_dn)
691
692         # That is, no change
693         self.assertEqual(full_dn,
694                          self.samdb.normalize_dn_in_domain(full_str))
695
696     def test_normalize_dn_in_domain_part(self):
697         domain_dn = self.samdb.domain_dn()
698
699         part_str = "CN=Users"
700
701         full_dn = ldb.Dn(self.samdb, part_str)
702         full_dn.add_base(domain_dn)
703
704         # That is, the domain DN appended
705         self.assertEqual(full_dn,
706                          self.samdb.normalize_dn_in_domain(part_str))
707
708     def test_normalize_dn_in_domain_full_dn(self):
709         domain_dn = self.samdb.domain_dn()
710
711         part_dn = ldb.Dn(self.samdb, "CN=Users")
712
713         full_dn = part_dn
714         full_dn.add_base(domain_dn)
715
716         # That is, no change
717         self.assertEqual(full_dn,
718                          self.samdb.normalize_dn_in_domain(full_dn))
719
720     def test_normalize_dn_in_domain_part_dn(self):
721         domain_dn = self.samdb.domain_dn()
722
723         part_dn = ldb.Dn(self.samdb, "CN=Users")
724
725         # That is, the domain DN appended
726         self.assertEqual(ldb.Dn(self.samdb,
727                                 str(part_dn) + "," + str(domain_dn)),
728                          self.samdb.normalize_dn_in_domain(part_dn))
729
730
class DsdbFullScanTests(TestCase):
    """Checks that sam.ldb can be opened with full database scans disabled."""

    def setUp(self):
        """Prepare loadparm, credentials and a system session.

        No database connection is opened here; the test opens it itself.
        """
        super(DsdbFullScanTests, self).setUp()
        self.lp = samba.tests.env_loadparm()
        self.creds = Credentials()
        self.creds.guess(self.lp)
        self.session = system_session()

    def test_sam_ldb_open_no_full_scan(self):
        """Opening SamDB with the self-test option must not raise LdbError."""
        open_options = ["disable_full_db_scan_for_self_test:1"]
        try:
            self.samdb = SamDB(options=open_options,
                               lp=self.lp,
                               credentials=self.creds,
                               session_info=self.session)
        except ldb.LdbError as err:
            # The error text is extracted but not included in the message.
            estr = err.args[1]
            self.fail("sam.ldb required a full scan to start up")