tests/dsdb.py: test creation of foreignSecurityPrincipal via 'attr: <SID=...>'
[samba.git] / python / samba / tests / dsdb.py
1 # Unix SMB/CIFS implementation. Tests for dsdb
2 # Copyright (C) Matthieu Patou <mat@matws.net> 2010
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 """Tests for samba.dsdb."""
19
20 from samba.credentials import Credentials
21 from samba.samdb import SamDB
22 from samba.auth import system_session
23 from samba.tests import TestCase
24 from samba.tests import delete_force
25 from samba.ndr import ndr_unpack, ndr_pack
26 from samba.dcerpc import drsblobs, security, misc
27 from samba import dsdb
28 from samba import werror
29 import ldb
30 import samba
31 import uuid
32
33 class DsdbTests(TestCase):
34
35     def setUp(self):
36         super(DsdbTests, self).setUp()
37         self.lp = samba.tests.env_loadparm()
38         self.creds = Credentials()
39         self.creds.guess(self.lp)
40         self.session = system_session()
41         self.samdb = SamDB(session_info=self.session,
42                            credentials=self.creds,
43                            lp=self.lp)
44
45         # Create a test user
46         user_name = "dsdb-user-" + str(uuid.uuid4().hex[0:6])
47         user_pass = samba.generate_random_password(32, 32)
48         user_description = "Test user for dsdb test"
49
50         base_dn = self.samdb.domain_dn()
51
52         self.account_dn = "cn=" + user_name + ",cn=Users," + base_dn
53         self.samdb.newuser(username=user_name,
54                            password=user_pass,
55                            description=user_description)
56         # Cleanup (teardown)
57         self.addCleanup(delete_force, self.samdb, self.account_dn)
58
59     def test_get_oid_from_attrid(self):
60         oid = self.samdb.get_oid_from_attid(591614)
61         self.assertEquals(oid, "1.2.840.113556.1.4.1790")
62
63     def test_error_replpropertymetadata(self):
64         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
65                                 base=self.account_dn,
66                                 attrs=["replPropertyMetaData"])
67         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
68                           res[0]["replPropertyMetaData"][0])
69         ctr = repl.ctr
70         for o in ctr.array:
71             # Search for Description
72             if o.attid == 13:
73                 old_version = o.version
74                 o.version = o.version + 1
75         replBlob = ndr_pack(repl)
76         msg = ldb.Message()
77         msg.dn = res[0].dn
78         msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
79         self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
80
81     def test_error_replpropertymetadata_nochange(self):
82         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
83                                 base=self.account_dn,
84                                 attrs=["replPropertyMetaData"])
85         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
86                           res[0]["replPropertyMetaData"][0])
87         replBlob = ndr_pack(repl)
88         msg = ldb.Message()
89         msg.dn = res[0].dn
90         msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
91         self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
92
93     def test_error_replpropertymetadata_allow_sort(self):
94         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
95                                 base=self.account_dn,
96                                 attrs=["replPropertyMetaData"])
97         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
98                           res[0]["replPropertyMetaData"][0])
99         replBlob = ndr_pack(repl)
100         msg = ldb.Message()
101         msg.dn = res[0].dn
102         msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
103         self.samdb.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0", "local_oid:1.3.6.1.4.1.7165.4.3.25:0"])
104
105     def test_twoatt_replpropertymetadata(self):
106         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
107                                 base=self.account_dn,
108                                 attrs=["replPropertyMetaData", "uSNChanged"])
109         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
110                           res[0]["replPropertyMetaData"][0])
111         ctr = repl.ctr
112         for o in ctr.array:
113             # Search for Description
114             if o.attid == 13:
115                 old_version = o.version
116                 o.version = o.version + 1
117                 o.local_usn = int(str(res[0]["uSNChanged"])) + 1
118         replBlob = ndr_pack(repl)
119         msg = ldb.Message()
120         msg.dn = res[0].dn
121         msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
122         msg["description"] = ldb.MessageElement("new val", ldb.FLAG_MOD_REPLACE, "description")
123         self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
124
125     def test_set_replpropertymetadata(self):
126         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
127                                 base=self.account_dn,
128                                 attrs=["replPropertyMetaData", "uSNChanged"])
129         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
130                           res[0]["replPropertyMetaData"][0])
131         ctr = repl.ctr
132         for o in ctr.array:
133             # Search for Description
134             if o.attid == 13:
135                 old_version = o.version
136                 o.version = o.version + 1
137                 o.local_usn = int(str(res[0]["uSNChanged"])) + 1
138                 o.originating_usn = int(str(res[0]["uSNChanged"])) + 1
139         replBlob = ndr_pack(repl)
140         msg = ldb.Message()
141         msg.dn = res[0].dn
142         msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
143         self.samdb.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
144
145     def test_ok_get_attribute_from_attid(self):
146         self.assertEquals(self.samdb.get_attribute_from_attid(13), "description")
147
148     def test_ko_get_attribute_from_attid(self):
149         self.assertEquals(self.samdb.get_attribute_from_attid(11979), None)
150
151     def test_get_attribute_replmetadata_version(self):
152         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
153                                 base=self.account_dn,
154                                 attrs=["dn"])
155         self.assertEquals(len(res), 1)
156         dn = str(res[0].dn)
157         self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "unicodePwd"), 2)
158
159     def test_set_attribute_replmetadata_version(self):
160         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
161                                 base=self.account_dn,
162                                 attrs=["dn"])
163         self.assertEquals(len(res), 1)
164         dn = str(res[0].dn)
165         version = self.samdb.get_attribute_replmetadata_version(dn, "description")
166         self.samdb.set_attribute_replmetadata_version(dn, "description", version + 2)
167         self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "description"), version + 2)
168
169     def test_no_error_on_invalid_control(self):
170         try:
171             res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
172                                     base=self.account_dn,
173                                     attrs=["replPropertyMetaData"],
174                                     controls=["local_oid:%s:0"
175                                               % dsdb.DSDB_CONTROL_INVALID_NOT_IMPLEMENTED])
176         except ldb.LdbError as e:
177             self.fail("Should have not raised an exception")
178
179     def test_error_on_invalid_critical_control(self):
180         try:
181             res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
182                                     base=self.account_dn,
183                                     attrs=["replPropertyMetaData"],
184                                     controls=["local_oid:%s:1"
185                                               % dsdb.DSDB_CONTROL_INVALID_NOT_IMPLEMENTED])
186         except ldb.LdbError as e:
187             (errno, estr) = e.args
188             if errno != ldb.ERR_UNSUPPORTED_CRITICAL_EXTENSION:
189                 self.fail("Got %s should have got ERR_UNSUPPORTED_CRITICAL_EXTENSION"
190                           % e[1])
191
192     # Allocate a unique RID for use in the objectSID tests.
193     #
194     def allocate_rid(self):
195         self.samdb.transaction_start()
196         try:
197             rid = self.samdb.allocate_rid()
198         except:
199             self.samdb.transaction_cancel()
200             raise
201         self.samdb.transaction_commit()
202         return str(rid)
203
204     # Ensure that duplicate objectSIDs are permitted for foreign security
205     # principals.
206     #
207     def test_duplicate_objectSIDs_allowed_on_foreign_security_principals(self):
208
209         #
210         # We need to build a foreign security principal SID
211         # i.e a  SID not in the current domain.
212         #
213         dom_sid = self.samdb.get_domain_sid()
214         if str(dom_sid).endswith("0"):
215             c = "9"
216         else:
217             c = "0"
218         sid_str = str(dom_sid)[:-1] + c + "-1000"
219         sid     = ndr_pack(security.dom_sid(sid_str))
220         basedn  = self.samdb.get_default_basedn()
221         dn      = "CN=%s,CN=ForeignSecurityPrincipals,%s" % (sid_str, basedn)
222
223         #
224         # First without control
225         #
226
227         try:
228             self.samdb.add({
229                 "dn": dn,
230                 "objectClass": "foreignSecurityPrincipal"})
231             self.fail("No exception should get ERR_OBJECT_CLASS_VIOLATION")
232         except ldb.LdbError as e:
233             (code, msg) = e.args
234             self.assertEqual(code, ldb.ERR_OBJECT_CLASS_VIOLATION, str(e))
235             werr = "%08X" % werror.WERR_DS_MISSING_REQUIRED_ATT
236             self.assertTrue(werr in msg, msg)
237
238         try:
239             self.samdb.add({
240                 "dn": dn,
241                 "objectClass": "foreignSecurityPrincipal",
242                 "objectSid": sid})
243             self.fail("No exception should get ERR_UNWILLING_TO_PERFORM")
244         except ldb.LdbError as e:
245             (code, msg) = e.args
246             self.assertEqual(code, ldb.ERR_UNWILLING_TO_PERFORM, str(e))
247             werr = "%08X" % werror.WERR_DS_ILLEGAL_MOD_OPERATION
248             self.assertTrue(werr in msg, msg)
249
250         #
251         # We need to use the provision control
252         # in order to add foreignSecurityPrincipal
253         # objects
254         #
255
256         controls = ["provision:0"]
257         self.samdb.add({
258             "dn": dn,
259             "objectClass": "foreignSecurityPrincipal"},
260             controls=controls)
261
262         self.samdb.delete(dn)
263
264         try:
265             self.samdb.add({
266                 "dn": dn,
267                 "objectClass": "foreignSecurityPrincipal"},
268                 controls=controls)
269         except ldb.LdbError as e:
270             (code, msg) = e.args
271             self.fail("Got unexpected exception %d - %s "
272                       % (code, msg))
273
274         # cleanup
275         self.samdb.delete(dn)
276
277     def _test_foreignSecurityPrincipal(self, obj_class, fpo_attr):
278
279         dom_sid = self.samdb.get_domain_sid()
280         lsid_str = str(dom_sid) + "-4294967294"
281         bsid_str = "S-1-5-32-4294967294"
282         fsid_str = "S-1-5-4294967294"
283         basedn   = self.samdb.get_default_basedn()
284         cn       = "dsdb_test_fpo"
285         dn_str   = "cn=%s,cn=Users,%s" % (cn, basedn)
286         dn = ldb.Dn(self.samdb, dn_str)
287
288         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
289                                 base=basedn,
290                                 expression="(objectSid=%s)" % lsid_str,
291                                 attrs=[])
292         self.assertEqual(len(res), 0)
293         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
294                                 base=basedn,
295                                 expression="(objectSid=%s)" % bsid_str,
296                                 attrs=[])
297         self.assertEqual(len(res), 0)
298         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
299                                 base=basedn,
300                                 expression="(objectSid=%s)" % fsid_str,
301                                 attrs=[])
302         self.assertEqual(len(res), 0)
303
304         self.addCleanup(delete_force, self.samdb, dn_str)
305
306         self.samdb.add({
307             "dn": dn_str,
308             "objectClass": obj_class})
309
310         msg = ldb.Message()
311         msg.dn = dn
312         msg[fpo_attr] = ldb.MessageElement("<SID=%s>" % lsid_str,
313                                            ldb.FLAG_MOD_ADD,
314                                            fpo_attr)
315         try:
316             self.samdb.modify(msg)
317             self.fail("No exception should get LDB_ERR_UNWILLING_TO_PERFORM")
318         except ldb.LdbError as e:
319             (code, msg) = e.args
320             self.assertEqual(code, ldb.ERR_UNWILLING_TO_PERFORM, str(e))
321             werr = "%08X" % werror.WERR_DS_INVALID_GROUP_TYPE
322             self.assertTrue(werr in msg, msg)
323
324         msg = ldb.Message()
325         msg.dn = dn
326         msg[fpo_attr] = ldb.MessageElement("<SID=%s>" % bsid_str,
327                                            ldb.FLAG_MOD_ADD,
328                                            fpo_attr)
329         try:
330             self.samdb.modify(msg)
331             self.fail("No exception should get LDB_ERR_NO_SUCH_OBJECT")
332         except ldb.LdbError as e:
333             (code, msg) = e.args
334             self.assertEqual(code, ldb.ERR_NO_SUCH_OBJECT, str(e))
335             werr = "%08X" % werror.WERR_NO_SUCH_MEMBER
336             self.assertTrue(werr in msg, msg)
337
338         msg = ldb.Message()
339         msg.dn = dn
340         msg[fpo_attr] = ldb.MessageElement("<SID=%s>" % fsid_str,
341                                            ldb.FLAG_MOD_ADD,
342                                            fpo_attr)
343         try:
344             self.samdb.modify(msg)
345         except ldb.LdbError as e:
346             self.fail("Should have not raised an exception")
347
348         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
349                                 base=basedn,
350                                 expression="(objectSid=%s)" % fsid_str,
351                                 attrs=[])
352         self.assertEqual(len(res), 1)
353         self.samdb.delete(res[0].dn)
354         self.samdb.delete(dn)
355         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
356                                 base=basedn,
357                                 expression="(objectSid=%s)" % fsid_str,
358                                 attrs=[])
359         self.assertEqual(len(res), 0)
360
361     def test_foreignSecurityPrincipal_member(self):
362         return self._test_foreignSecurityPrincipal(
363                 "group", "member")
364
365     def test_foreignSecurityPrincipal_MembersForAzRole(self):
366         return self._test_foreignSecurityPrincipal(
367                 "msDS-AzRole", "msDS-MembersForAzRole")
368
369     def test_foreignSecurityPrincipal_NeverRevealGroup(self):
370         return self._test_foreignSecurityPrincipal(
371                 "computer", "msDS-NeverRevealGroup")
372
373     def test_foreignSecurityPrincipal_RevealOnDemandGroup(self):
374         return self._test_foreignSecurityPrincipal(
375                 "computer", "msDS-RevealOnDemandGroup")
376
377     def _test_fail_foreignSecurityPrincipal(self, obj_class, fpo_attr,
378                                             msg_exp, lerr_exp, werr_exp,
379                                             allow_reference=True):
380
381         dom_sid = self.samdb.get_domain_sid()
382         lsid_str = str(dom_sid) + "-4294967294"
383         bsid_str = "S-1-5-32-4294967294"
384         fsid_str = "S-1-5-4294967294"
385         basedn   = self.samdb.get_default_basedn()
386         cn1       = "dsdb_test_fpo1"
387         dn1_str   = "cn=%s,cn=Users,%s" % (cn1, basedn)
388         dn1 = ldb.Dn(self.samdb, dn1_str)
389         cn2       = "dsdb_test_fpo2"
390         dn2_str   = "cn=%s,cn=Users,%s" % (cn2, basedn)
391         dn2 = ldb.Dn(self.samdb, dn2_str)
392
393         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
394                                 base=basedn,
395                                 expression="(objectSid=%s)" % lsid_str,
396                                 attrs=[])
397         self.assertEqual(len(res), 0)
398         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
399                                 base=basedn,
400                                 expression="(objectSid=%s)" % bsid_str,
401                                 attrs=[])
402         self.assertEqual(len(res), 0)
403         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
404                                 base=basedn,
405                                 expression="(objectSid=%s)" % fsid_str,
406                                 attrs=[])
407         self.assertEqual(len(res), 0)
408
409         self.addCleanup(delete_force, self.samdb, dn1_str)
410         self.addCleanup(delete_force, self.samdb, dn2_str)
411
412         self.samdb.add({
413             "dn": dn1_str,
414             "objectClass": obj_class})
415
416         self.samdb.add({
417             "dn": dn2_str,
418             "objectClass": obj_class})
419
420         msg = ldb.Message()
421         msg.dn = dn1
422         msg[fpo_attr] = ldb.MessageElement("<SID=%s>" % lsid_str,
423                                            ldb.FLAG_MOD_ADD,
424                                            fpo_attr)
425         try:
426             self.samdb.modify(msg)
427             self.fail("No exception should get %s" % msg_exp)
428         except ldb.LdbError as e:
429             (code, msg) = e.args
430             self.assertEqual(code, lerr_exp, str(e))
431             werr = "%08X" % werr_exp
432             self.assertTrue(werr in msg, msg)
433
434         msg = ldb.Message()
435         msg.dn = dn1
436         msg[fpo_attr] = ldb.MessageElement("<SID=%s>" % bsid_str,
437                                            ldb.FLAG_MOD_ADD,
438                                            fpo_attr)
439         try:
440             self.samdb.modify(msg)
441             self.fail("No exception should get %s" % msg_exp)
442         except ldb.LdbError as e:
443             (code, msg) = e.args
444             self.assertEqual(code, lerr_exp, str(e))
445             werr = "%08X" % werr_exp
446             self.assertTrue(werr in msg, msg)
447
448         msg = ldb.Message()
449         msg.dn = dn1
450         msg[fpo_attr] = ldb.MessageElement("<SID=%s>" % fsid_str,
451                                            ldb.FLAG_MOD_ADD,
452                                            fpo_attr)
453         try:
454             self.samdb.modify(msg)
455             self.fail("No exception should get %s" % msg)
456         except ldb.LdbError as e:
457             (code, msg) = e.args
458             self.assertEqual(code, lerr_exp, str(e))
459             werr = "%08X" % werr_exp
460             self.assertTrue(werr in msg, msg)
461
462         msg = ldb.Message()
463         msg.dn = dn1
464         msg[fpo_attr] = ldb.MessageElement("%s" % dn2,
465                                            ldb.FLAG_MOD_ADD,
466                                            fpo_attr)
467         try:
468             self.samdb.modify(msg)
469             if not allow_reference:
470                 sel.fail("No exception should get %s" % msg_exp)
471         except ldb.LdbError as e:
472             if allow_reference:
473                 self.fail("Should have not raised an exception: %s" % e)
474             (code, msg) = e.args
475             self.assertEqual(code, lerr_exp, str(e))
476             werr = "%08X" % werr_exp
477             self.assertTrue(werr in msg, msg)
478
479         self.samdb.delete(dn2)
480         self.samdb.delete(dn1)
481
482     def test_foreignSecurityPrincipal_NonMembers(self):
483         return self._test_fail_foreignSecurityPrincipal(
484                 "group", "msDS-NonMembers",
485                 "LDB_ERR_UNWILLING_TO_PERFORM/WERR_NOT_SUPPORTED",
486                 ldb.ERR_UNWILLING_TO_PERFORM, werror.WERR_NOT_SUPPORTED,
487                 allow_reference=False)
488
489     def test_foreignSecurityPrincipal_HostServiceAccount(self):
490         return self._test_fail_foreignSecurityPrincipal(
491                 "computer", "msDS-HostServiceAccount",
492                 "LDB_ERR_CONSTRAINT_VIOLATION/WERR_DS_NAME_REFERENCE_INVALID",
493                 ldb.ERR_CONSTRAINT_VIOLATION,
494                 werror.WERR_DS_NAME_REFERENCE_INVALID)
495
496     def test_foreignSecurityPrincipal_manager(self):
497         return self._test_fail_foreignSecurityPrincipal(
498                 "user", "manager",
499                 "LDB_ERR_CONSTRAINT_VIOLATION/WERR_DS_NAME_REFERENCE_INVALID",
500                 ldb.ERR_CONSTRAINT_VIOLATION,
501                 werror.WERR_DS_NAME_REFERENCE_INVALID)
502
503     #
504     # Duplicate objectSIDs should not be permitted for SIDs in the local
505     # domain. The test sequence is: add an object, delete it, then attempt to
506     # re-add it; this should fail with a constraint violation.
507     #
508     def test_duplicate_objectSIDs_not_allowed_on_local_objects(self):
509
510         dom_sid = self.samdb.get_domain_sid()
511         rid     = self.allocate_rid()
512         sid_str = str(dom_sid) + "-" + rid
513         sid     = ndr_pack(security.dom_sid(sid_str))
514         basedn  = self.samdb.get_default_basedn()
515         cn       = "dsdb_test_01"
516         dn      = "cn=%s,cn=Users,%s" % (cn, basedn)
517
518         self.samdb.add({
519             "dn": dn,
520             "objectClass": "user",
521             "objectSID": sid})
522         self.samdb.delete(dn)
523
524         try:
525             self.samdb.add({
526                 "dn": dn,
527                 "objectClass": "user",
528                 "objectSID": sid})
529             self.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION")
530         except ldb.LdbError as e:
531             (code, msg) = e.args
532             if code != ldb.ERR_CONSTRAINT_VIOLATION:
533                 self.fail("Got %d - %s should have got "
534                           "LDB_ERR_CONSTRAINT_VIOLATION"
535                           % (code, msg))
536
537     def test_linked_vs_non_linked_reference(self):
538         basedn   = self.samdb.get_default_basedn()
539         kept_dn_str   = "cn=reference_kept,cn=Users,%s" % (basedn)
540         removed_dn_str   = "cn=reference_removed,cn=Users,%s" % (basedn)
541         dom_sid = self.samdb.get_domain_sid()
542         none_sid_str = str(dom_sid) + "-4294967294"
543         none_guid_str = "afafafaf-fafa-afaf-fafa-afafafafafaf"
544
545         self.addCleanup(delete_force, self.samdb, kept_dn_str)
546         self.addCleanup(delete_force, self.samdb, removed_dn_str)
547
548         self.samdb.add({
549             "dn": kept_dn_str,
550             "objectClass": "user"})
551         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
552                                 base=kept_dn_str,
553                                 attrs=["objectGUID", "objectSID"])
554         self.assertEqual(len(res), 1)
555         kept_guid = ndr_unpack(misc.GUID, res[0]["objectGUID"][0])
556         kept_sid = ndr_unpack(security.dom_sid, res[0]["objectSid"][0])
557         kept_dn = res[0].dn
558
559         self.samdb.add({
560             "dn": removed_dn_str,
561             "objectClass": "user"})
562         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
563                                 base=removed_dn_str,
564                                 attrs=["objectGUID", "objectSID"])
565         self.assertEqual(len(res), 1)
566         removed_guid = ndr_unpack(misc.GUID, res[0]["objectGUID"][0])
567         removed_sid = ndr_unpack(security.dom_sid, res[0]["objectSid"][0])
568         self.samdb.delete(removed_dn_str)
569
570         #
571         # First try the linked attribute 'manager'
572         # by GUID and SID
573         #
574
575         msg = ldb.Message()
576         msg.dn = kept_dn
577         msg["manager"] = ldb.MessageElement("<SID=%s>" % removed_sid,
578                                            ldb.FLAG_MOD_ADD,
579                                            "manager")
580         try:
581             self.samdb.modify(msg)
582             self.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION")
583         except ldb.LdbError as e:
584             (code, msg) = e.args
585             self.assertEqual(code, ldb.ERR_CONSTRAINT_VIOLATION, str(e))
586             werr = "%08X" % werror.WERR_DS_NAME_REFERENCE_INVALID
587             self.assertTrue(werr in msg, msg)
588
589         msg = ldb.Message()
590         msg.dn = kept_dn
591         msg["manager"] = ldb.MessageElement("<GUID=%s>" % removed_guid,
592                                            ldb.FLAG_MOD_ADD,
593                                            "manager")
594         try:
595             self.samdb.modify(msg)
596             self.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION")
597         except ldb.LdbError as e:
598             (code, msg) = e.args
599             self.assertEqual(code, ldb.ERR_CONSTRAINT_VIOLATION, str(e))
600             werr = "%08X" % werror.WERR_DS_NAME_REFERENCE_INVALID
601             self.assertTrue(werr in msg, msg)
602
603         #
604         # Try the non-linked attribute 'assistant'
605         # by GUID and SID, which should work.
606         #
607         msg = ldb.Message()
608         msg.dn = kept_dn
609         msg["assistant"] = ldb.MessageElement("<SID=%s>" % removed_sid,
610                                               ldb.FLAG_MOD_ADD,
611                                               "assistant")
612         self.samdb.modify(msg)
613         msg = ldb.Message()
614         msg.dn = kept_dn
615         msg["assistant"] = ldb.MessageElement("<SID=%s>" % removed_sid,
616                                               ldb.FLAG_MOD_DELETE,
617                                               "assistant")
618         self.samdb.modify(msg)
619
620         msg = ldb.Message()
621         msg.dn = kept_dn
622         msg["assistant"] = ldb.MessageElement("<GUID=%s>" % removed_guid,
623                                               ldb.FLAG_MOD_ADD,
624                                               "assistant")
625         self.samdb.modify(msg)
626         msg = ldb.Message()
627         msg.dn = kept_dn
628         msg["assistant"] = ldb.MessageElement("<GUID=%s>" % removed_guid,
629                                               ldb.FLAG_MOD_DELETE,
630                                               "assistant")
631         self.samdb.modify(msg)
632
633         #
634         # Finally ry the non-linked attribute 'assistant'
635         # but with non existing GUID, SID, DN
636         #
637         msg = ldb.Message()
638         msg.dn = kept_dn
639         msg["assistant"] = ldb.MessageElement("CN=NoneNone,%s" % (basedn),
640                                               ldb.FLAG_MOD_ADD,
641                                               "assistant")
642         try:
643             self.samdb.modify(msg)
644             self.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION")
645         except ldb.LdbError as e:
646             (code, msg) = e.args
647             self.assertEqual(code, ldb.ERR_CONSTRAINT_VIOLATION, str(e))
648             werr = "%08X" % werror.WERR_DS_NAME_REFERENCE_INVALID
649             self.assertTrue(werr in msg, msg)
650
651         msg = ldb.Message()
652         msg.dn = kept_dn
653         msg["assistant"] = ldb.MessageElement("<SID=%s>" % none_sid_str,
654                                               ldb.FLAG_MOD_ADD,
655                                               "assistant")
656         try:
657             self.samdb.modify(msg)
658             self.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION")
659         except ldb.LdbError as e:
660             (code, msg) = e.args
661             self.assertEqual(code, ldb.ERR_CONSTRAINT_VIOLATION, str(e))
662             werr = "%08X" % werror.WERR_DS_NAME_REFERENCE_INVALID
663             self.assertTrue(werr in msg, msg)
664
665         msg = ldb.Message()
666         msg.dn = kept_dn
667         msg["assistant"] = ldb.MessageElement("<GUID=%s>" % none_guid_str,
668                                               ldb.FLAG_MOD_ADD,
669                                               "assistant")
670         try:
671             self.samdb.modify(msg)
672             self.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION")
673         except ldb.LdbError as e:
674             (code, msg) = e.args
675             self.assertEqual(code, ldb.ERR_CONSTRAINT_VIOLATION, str(e))
676             werr = "%08X" % werror.WERR_DS_NAME_REFERENCE_INVALID
677             self.assertTrue(werr in msg, msg)
678
679         self.samdb.delete(kept_dn)
680
681     def test_normalize_dn_in_domain_full(self):
682         domain_dn = self.samdb.domain_dn()
683
684         part_dn = ldb.Dn(self.samdb, "CN=Users")
685
686         full_dn = part_dn
687         full_dn.add_base(domain_dn)
688
689         full_str = str(full_dn)
690
691         # That is, no change
692         self.assertEqual(full_dn,
693                          self.samdb.normalize_dn_in_domain(full_str))
694
695     def test_normalize_dn_in_domain_part(self):
696         domain_dn = self.samdb.domain_dn()
697
698         part_str = "CN=Users"
699
700         full_dn = ldb.Dn(self.samdb, part_str)
701         full_dn.add_base(domain_dn)
702
703         # That is, the domain DN appended
704         self.assertEqual(full_dn,
705                          self.samdb.normalize_dn_in_domain(part_str))
706
707     def test_normalize_dn_in_domain_full_dn(self):
708         domain_dn = self.samdb.domain_dn()
709
710         part_dn = ldb.Dn(self.samdb, "CN=Users")
711
712         full_dn = part_dn
713         full_dn.add_base(domain_dn)
714
715         # That is, no change
716         self.assertEqual(full_dn,
717                          self.samdb.normalize_dn_in_domain(full_dn))
718
719     def test_normalize_dn_in_domain_part_dn(self):
720         domain_dn = self.samdb.domain_dn()
721
722         part_dn = ldb.Dn(self.samdb, "CN=Users")
723
724         # That is, the domain DN appended
725         self.assertEqual(ldb.Dn(self.samdb,
726                                 str(part_dn) + "," + str(domain_dn)),
727                          self.samdb.normalize_dn_in_domain(part_dn))
728