pydsdb: Import testtools before subunit for those that don't have
[metze/samba/wip.git] / source4 / dsdb / tests / python / deletetest.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import optparse
5 import sys
6 import os
7
8 sys.path.append("bin/python")
9 import samba
10 samba.ensure_external_module("testtools", "testtools")
11 samba.ensure_external_module("subunit", "subunit/python")
12
13 import samba.getopt as options
14
15 from samba.auth import system_session
16 from ldb import SCOPE_BASE, LdbError
17 from ldb import ERR_NO_SUCH_OBJECT, ERR_NOT_ALLOWED_ON_NON_LEAF
18 from ldb import ERR_UNWILLING_TO_PERFORM
19 from samba import Ldb
20
21 from subunit.run import SubunitTestRunner
22 import unittest
23
24 parser = optparse.OptionParser("deletetest.py [options] <host|file>")
25 sambaopts = options.SambaOptions(parser)
26 parser.add_option_group(sambaopts)
27 parser.add_option_group(options.VersionOptions(parser))
28 # use command line creds if available
29 credopts = options.CredentialsOptions(parser)
30 parser.add_option_group(credopts)
31 opts, args = parser.parse_args()
32
33 if len(args) < 1:
34     parser.print_usage()
35     sys.exit(1)
36
37 host = args[0]
38
39 lp = sambaopts.get_loadparm()
40 creds = credopts.get_credentials(lp)
41
42 class BasicDeleteTests(unittest.TestCase):
43
44     def delete_force(self, ldb, dn):
45         try:
46             ldb.delete(dn)
47         except LdbError, (num, _):
48             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
49
50     def GUID_string(self, guid):
51         return self.ldb.schema_format_value("objectGUID", guid)
52
53     def find_basedn(self, ldb):
54         res = ldb.search(base="", expression="", scope=SCOPE_BASE,
55                          attrs=["defaultNamingContext"])
56         self.assertEquals(len(res), 1)
57         return res[0]["defaultNamingContext"][0]
58
59     def find_configurationdn(self, ldb):
60         res = ldb.search(base="", expression="", scope=SCOPE_BASE,
61                          attrs=["configurationNamingContext"])
62         self.assertEquals(len(res), 1)
63         return res[0]["configurationNamingContext"][0]
64
65     def setUp(self):
66         self.ldb = ldb
67         self.base_dn = self.find_basedn(ldb)
68         self.configuration_dn = self.find_configurationdn(ldb)
69
70     def search_guid(self, guid):
71         print "SEARCH by GUID %s" % self.GUID_string(guid)
72
73         res = ldb.search(base="<GUID=%s>" % self.GUID_string(guid),
74                          scope=SCOPE_BASE, controls=["show_deleted:1"])
75         self.assertEquals(len(res), 1)
76         return res[0]
77
78     def search_dn(self,dn):
79         print "SEARCH by DN %s" % dn
80
81         res = ldb.search(expression="(objectClass=*)",
82                          base=dn,
83                          scope=SCOPE_BASE,
84                          controls=["show_deleted:1"])
85         self.assertEquals(len(res), 1)
86         return res[0]
87
88     def del_attr_values(self, delObj):
89         print "Checking attributes for %s" % delObj["dn"]
90
91         self.assertEquals(delObj["isDeleted"][0],"TRUE")
92         self.assertTrue(not("objectCategory" in delObj))
93         self.assertTrue(not("sAMAccountType" in delObj))
94
95     def preserved_attributes_list(self, liveObj, delObj):
96         print "Checking for preserved attributes list"
97
98         preserved_list = ["nTSecurityDescriptor", "attributeID", "attributeSyntax", "dNReferenceUpdate", "dNSHostName",
99         "flatName", "governsID", "groupType", "instanceType", "lDAPDisplayName", "legacyExchangeDN",
100         "isDeleted", "isRecycled", "lastKnownParent", "msDS-LastKnownRDN", "mS-DS-CreatorSID",
101         "mSMQOwnerID", "nCName", "objectClass", "distinguishedName", "objectGUID", "objectSid",
102         "oMSyntax", "proxiedObjectName", "name", "replPropertyMetaData", "sAMAccountName",
103         "securityIdentifier", "sIDHistory", "subClassOf", "systemFlags", "trustPartner", "trustDirection",
104         "trustType", "trustAttributes", "userAccountControl", "uSNChanged", "uSNCreated", "whenCreated"]
105
106         for a in liveObj:
107             if a in preserved_list:
108                 self.assertTrue(a in delObj)
109
110     def check_rdn(self, liveObj, delObj, rdnName):
111         print "Checking for correct rDN"
112         rdn=liveObj[rdnName][0]
113         rdn2=delObj[rdnName][0]
114         name2=delObj[rdnName][0]
115         guid=liveObj["objectGUID"][0]
116         self.assertEquals(rdn2, rdn + "\nDEL:" + self.GUID_string(guid))
117         self.assertEquals(name2, rdn + "\nDEL:" + self.GUID_string(guid))
118
119     def delete_deleted(self, ldb, dn):
120         print "Testing the deletion of the already deleted dn %s" % dn
121
122         try:
123             ldb.delete(dn)
124             self.fail()
125         except LdbError, (num, _):
126             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
127
128     def test_delete_protection(self):
129         """Delete protection tests"""
130
131         print self.base_dn
132
133         self.delete_force(self.ldb, "cn=entry1,cn=ldaptestcontainer," + self.base_dn)
134         self.delete_force(self.ldb, "cn=entry2,cn=ldaptestcontainer," + self.base_dn)
135         self.delete_force(self.ldb, "cn=ldaptestcontainer," + self.base_dn)
136
137         ldb.add({
138             "dn": "cn=ldaptestcontainer," + self.base_dn,
139             "objectclass": "container"})
140         ldb.add({
141             "dn": "cn=entry1,cn=ldaptestcontainer," + self.base_dn,
142             "objectclass": "container"})
143         ldb.add({
144             "dn": "cn=entry2,cn=ldaptestcontainer," + self.base_dn,
145             "objectclass": "container"})
146
147         try:
148             ldb.delete("cn=ldaptestcontainer," + self.base_dn)
149             self.fail()
150         except LdbError, (num, _):
151             self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF)
152
153         ldb.delete("cn=ldaptestcontainer," + self.base_dn, ["tree_delete:1"])
154
155         try:
156             res = ldb.search("cn=ldaptestcontainer," + self.base_dn,
157                              scope=SCOPE_BASE, attrs=[])
158             self.fail()
159         except LdbError, (num, _):
160             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
161         try:
162             res = ldb.search("cn=entry1,cn=ldaptestcontainer," + self.base_dn,
163                              scope=SCOPE_BASE, attrs=[])
164             self.fail()
165         except LdbError, (num, _):
166             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
167         try:
168             res = ldb.search("cn=entry2,cn=ldaptestcontainer," + self.base_dn,
169                              scope=SCOPE_BASE, attrs=[])
170             self.fail()
171         except LdbError, (num, _):
172             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
173
174         self.delete_force(self.ldb, "cn=entry1,cn=ldaptestcontainer," + self.base_dn)
175         self.delete_force(self.ldb, "cn=entry2,cn=ldaptestcontainer," + self.base_dn)
176         self.delete_force(self.ldb, "cn=ldaptestcontainer," + self.base_dn)
177
178         # Performs some protected object delete testing
179
180         res = ldb.search(base="", expression="", scope=SCOPE_BASE,
181                          attrs=["dsServiceName", "dNSHostName"])
182         self.assertEquals(len(res), 1)
183
184         # Delete failing since DC's nTDSDSA object is protected
185         try:
186             ldb.delete(res[0]["dsServiceName"][0])
187             self.fail()
188         except LdbError, (num, _):
189             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
190
191         res = ldb.search(self.base_dn, attrs=["rIDSetReferences"],
192                          expression="(&(objectClass=computer)(dNSHostName=" + res[0]["dNSHostName"][0] + "))")
193         self.assertEquals(len(res), 1)
194
195         # Deletes failing since DC's rIDSet object is protected
196         try:
197             ldb.delete(res[0]["rIDSetReferences"][0])
198             self.fail()
199         except LdbError, (num, _):
200             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
201         try:
202             ldb.delete(res[0]["rIDSetReferences"][0], ["tree_delete:1"])
203             self.fail()
204         except LdbError, (num, _):
205             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
206
207         # Deletes failing since three main crossRef objects are protected
208
209         try:
210             ldb.delete("cn=Enterprise Schema,cn=Partitions," + self.configuration_dn)
211             self.fail()
212         except LdbError, (num, _):
213             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
214         try:
215             ldb.delete("cn=Enterprise Schema,cn=Partitions," + self.configuration_dn, ["tree_delete:1"])
216             self.fail()
217         except LdbError, (num, _):
218             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
219
220         try:
221             ldb.delete("cn=Enterprise Configuration,cn=Partitions," + self.configuration_dn)
222             self.fail()
223         except LdbError, (num, _):
224             self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF)
225         try:
226             ldb.delete("cn=Enterprise Configuration,cn=Partitions," + self.configuration_dn, ["tree_delete:1"])
227             self.fail()
228         except LdbError, (num, _):
229             self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF)
230
231         res = ldb.search("cn=Partitions," + self.configuration_dn, attrs=[],
232                          expression="(nCName=%s)" % self.base_dn)
233         self.assertEquals(len(res), 1)
234
235         try:
236             ldb.delete(res[0].dn)
237             self.fail()
238         except LdbError, (num, _):
239             self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF)
240         try:
241             ldb.delete(res[0].dn, ["tree_delete:1"])
242             self.fail()
243         except LdbError, (num, _):
244             self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF)
245
246         # Delete failing since "SYSTEM_FLAG_DISALLOW_DELETE"
247         try:
248             ldb.delete("CN=Users," + self.base_dn)
249             self.fail()
250         except LdbError, (num, _):
251             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
252
253         # Tree-delete failing since "isCriticalSystemObject"
254         try:
255             ldb.delete("CN=Computers," + self.base_dn, ["tree_delete:1"])
256             self.fail()
257         except LdbError, (num, _):
258             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
259
260     def test_all(self):
261         """Basic delete tests"""
262
263         print self.base_dn
264
265         usr1="cn=testuser,cn=users," + self.base_dn
266         usr2="cn=testuser2,cn=users," + self.base_dn
267         grp1="cn=testdelgroup1,cn=users," + self.base_dn
268         sit1="cn=testsite1,cn=sites," + self.configuration_dn
269         ss1="cn=NTDS Site Settings,cn=testsite1,cn=sites," + self.configuration_dn
270         srv1="cn=Servers,cn=testsite1,cn=sites," + self.configuration_dn
271         srv2="cn=TESTSRV,cn=Servers,cn=testsite1,cn=sites," + self.configuration_dn
272
273         self.delete_force(self.ldb, usr1)
274         self.delete_force(self.ldb, usr2)
275         self.delete_force(self.ldb, grp1)
276         self.delete_force(self.ldb, ss1)
277         self.delete_force(self.ldb, srv2)
278         self.delete_force(self.ldb, srv1)
279         self.delete_force(self.ldb, sit1)
280
281         ldb.add({
282             "dn": usr1,
283             "objectclass": "user",
284             "description": "test user description",
285             "samaccountname": "testuser"})
286
287         ldb.add({
288             "dn": usr2,
289             "objectclass": "user",
290             "description": "test user 2 description",
291             "samaccountname": "testuser2"})
292
293         ldb.add({
294             "dn": grp1,
295             "objectclass": "group",
296             "description": "test group",
297             "samaccountname": "testdelgroup1",
298             "member": [ usr1, usr2 ],
299             "isDeleted": "FALSE" })
300
301         ldb.add({
302             "dn": sit1,
303             "objectclass": "site" })
304
305         ldb.add({
306             "dn": ss1,
307             "objectclass": ["applicationSiteSettings", "nTDSSiteSettings"] })
308
309         ldb.add({
310             "dn": srv1,
311             "objectclass": "serversContainer" })
312
313         ldb.add({
314             "dn": srv2,
315             "objectClass": "server" })
316
317         objLive1 = self.search_dn(usr1)
318         guid1=objLive1["objectGUID"][0]
319
320         objLive2 = self.search_dn(usr2)
321         guid2=objLive2["objectGUID"][0]
322
323         objLive3 = self.search_dn(grp1)
324         guid3=objLive3["objectGUID"][0]
325
326         objLive4 = self.search_dn(sit1)
327         guid4=objLive4["objectGUID"][0]
328
329         objLive5 = self.search_dn(ss1)
330         guid5=objLive5["objectGUID"][0]
331
332         objLive6 = self.search_dn(srv1)
333         guid6=objLive6["objectGUID"][0]
334
335         objLive7 = self.search_dn(srv2)
336         guid7=objLive7["objectGUID"][0]
337
338         ldb.delete(usr1)
339         ldb.delete(usr2)
340         ldb.delete(grp1)
341         ldb.delete(srv1, ["tree_delete:1"])
342         ldb.delete(sit1, ["tree_delete:1"])
343
344         objDeleted1 = self.search_guid(guid1)
345         objDeleted2 = self.search_guid(guid2)
346         objDeleted3 = self.search_guid(guid3)
347         objDeleted4 = self.search_guid(guid4)
348         objDeleted5 = self.search_guid(guid5)
349         objDeleted6 = self.search_guid(guid6)
350         objDeleted7 = self.search_guid(guid7)
351
352         self.del_attr_values(objDeleted1)
353         self.del_attr_values(objDeleted2)
354         self.del_attr_values(objDeleted3)
355         self.del_attr_values(objDeleted4)
356         self.del_attr_values(objDeleted5)
357         self.del_attr_values(objDeleted6)
358         self.del_attr_values(objDeleted7)
359
360         self.preserved_attributes_list(objLive1, objDeleted1)
361         self.preserved_attributes_list(objLive2, objDeleted2)
362         self.preserved_attributes_list(objLive3, objDeleted3)
363         self.preserved_attributes_list(objLive4, objDeleted4)
364         self.preserved_attributes_list(objLive5, objDeleted5)
365         self.preserved_attributes_list(objLive6, objDeleted6)
366         self.preserved_attributes_list(objLive7, objDeleted7)
367
368         self.check_rdn(objLive1, objDeleted1, "cn")
369         self.check_rdn(objLive2, objDeleted2, "cn")
370         self.check_rdn(objLive3, objDeleted3, "cn")
371         self.check_rdn(objLive4, objDeleted4, "cn")
372         self.check_rdn(objLive5, objDeleted5, "cn")
373         self.check_rdn(objLive6, objDeleted6, "cn")
374         self.check_rdn(objLive7, objDeleted7, "cn")
375
376         self.delete_deleted(ldb, usr1)
377         self.delete_deleted(ldb, usr2)
378         self.delete_deleted(ldb, grp1)
379         self.delete_deleted(ldb, sit1)
380         self.delete_deleted(ldb, ss1)
381         self.delete_deleted(ldb, srv1)
382         self.delete_deleted(ldb, srv2)
383
384         self.assertTrue("CN=Deleted Objects" in str(objDeleted1.dn))
385         self.assertTrue("CN=Deleted Objects" in str(objDeleted2.dn))
386         self.assertTrue("CN=Deleted Objects" in str(objDeleted3.dn))
387         self.assertFalse("CN=Deleted Objects" in str(objDeleted4.dn))
388         self.assertTrue("CN=Deleted Objects" in str(objDeleted5.dn))
389         self.assertFalse("CN=Deleted Objects" in str(objDeleted6.dn))
390         self.assertFalse("CN=Deleted Objects" in str(objDeleted7.dn))
391
392 if not "://" in host:
393     if os.path.isfile(host):
394         host = "tdb://%s" % host
395     else:
396         host = "ldap://%s" % host
397
398 ldb = Ldb(host, credentials=creds, session_info=system_session(), lp=lp)
399
400 runner = SubunitTestRunner()
401 rc = 0
402 if not runner.run(unittest.makeSuite(BasicDeleteTests)).wasSuccessful():
403     rc = 1
404
405 sys.exit(rc)