s4-dsdb-tests: Some tests for deleted objects undelete operation
authorNadezhda Ivanova <nivanova@symas.com>
Tue, 21 Oct 2014 13:35:30 +0000 (16:35 +0300)
committerAndrew Bartlett <abartlet@samba.org>
Tue, 3 Feb 2015 04:02:10 +0000 (05:02 +0100)
Based on MS-ADTS 3.1.1.5.3.7.2

Signed-off-by: Nadezhda Ivanova <nivanova@symas.com>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Garming Sam <garming@catalyst.net.nz>
Change-Id: I650b315601fce574f9302435f812d1dd4b177e68

source4/dsdb/tests/python/deletetest.py

index 370f56cdea44db4c18f73da2ee48548fb57fd450..cb08db404d3a18eb33985550dfc8a715294607d3 100755 (executable)
@@ -13,9 +13,9 @@ from samba.tests.subunitrun import SubunitOptions, TestProgram
 import samba.getopt as options
 
 from samba.auth import system_session
-from ldb import SCOPE_BASE, LdbError
-from ldb import ERR_NO_SUCH_OBJECT, ERR_NOT_ALLOWED_ON_NON_LEAF
-from ldb import ERR_UNWILLING_TO_PERFORM
+from ldb import SCOPE_BASE, LdbError, Message, MessageElement, Dn, FLAG_MOD_ADD, FLAG_MOD_DELETE, FLAG_MOD_REPLACE
+from ldb import ERR_NO_SUCH_OBJECT, ERR_NOT_ALLOWED_ON_NON_LEAF, ERR_ENTRY_ALREADY_EXISTS, ERR_ATTRIBUTE_OR_VALUE_EXISTS
+from ldb import ERR_UNWILLING_TO_PERFORM, ERR_OPERATIONS_ERROR
 from samba.samdb import SamDB
 from samba.tests import delete_force
 
@@ -39,13 +39,13 @@ host = args[0]
 lp = sambaopts.get_loadparm()
 creds = credopts.get_credentials(lp)
 
-class BasicDeleteTests(samba.tests.TestCase):
+class BaseDeleteTests(samba.tests.TestCase):
 
     def GUID_string(self, guid):
         return self.ldb.schema_format_value("objectGUID", guid)
 
     def setUp(self):
-        super(BasicDeleteTests, self).setUp()
+        super(BaseDeleteTests, self).setUp()
         self.ldb = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp)
 
         self.base_dn = self.ldb.domain_dn()
@@ -69,6 +69,12 @@ class BasicDeleteTests(samba.tests.TestCase):
         self.assertEquals(len(res), 1)
         return res[0]
 
+
+class BasicDeleteTests(BaseDeleteTests):
+
+    def setUp(self):
+        super(BasicDeleteTests, self).setUp()
+
     def del_attr_values(self, delObj):
         print "Checking attributes for %s" % delObj["dn"]
 
@@ -373,6 +379,193 @@ class BasicDeleteTests(samba.tests.TestCase):
         self.assertFalse("CN=Deleted Objects" in str(objDeleted6.dn))
         self.assertFalse("CN=Deleted Objects" in str(objDeleted7.dn))
 
+class BasicUndeleteTests(BaseDeleteTests):
+
+    def setUp(self):
+        super(BasicUndeleteTests, self).setUp()
+
+    def enable_recycle_bin(self):
+        msg = Message()
+        msg.dn = Dn(ldb, "")
+        msg["enableOptionalFeature"] = MessageElement(
+            "CN=Partitions," +  self.configuration_dn + ":766ddcd8-acd0-445e-f3b9-a7f9b6744f2a",
+            FLAG_MOD_ADD, "enableOptionalFeature")
+        try:
+            ldb.modify(msg)
+        except LdbError, (num, _):
+            self.assertEquals(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS)
+
+    def get_ldb_connection(self, target_username, target_password):
+        creds_tmp = Credentials()
+        creds_tmp.set_username(target_username)
+        creds_tmp.set_password(target_password)
+        creds_tmp.set_domain(creds.get_domain())
+        creds_tmp.set_realm(creds.get_realm())
+        creds_tmp.set_workstation(creds.get_workstation())
+        creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
+                                      | gensec.FEATURE_SEAL)
+        creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop
+        ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
+        return ldb_target
+
+    def undelete_deleted(self, olddn, newdn, samldb):
+        msg = Message()
+        msg.dn = Dn(ldb, olddn)
+        msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
+        msg["distinguishedName"] = MessageElement([newdn], FLAG_MOD_REPLACE, "distinguishedName")
+        res = samldb.modify(msg, ["show_deleted:1"])
+
+    def undelete_deleted_with_mod(self, olddn, newdn):
+        msg = Message()
+        msg.dn = Dn(ldb, olddn)
+        msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
+        msg["distinguishedName"] = MessageElement([newdn], FLAG_MOD_REPLACE, "distinguishedName")
+        msg["url"] = MessageElement(["www.samba.org"], FLAG_MOD_REPLACE, "url")
+        res = ldb.modify(msg, ["show_deleted:1"])
+
+
+    def test_undelete(self):
+        print "Testing standard undelete operation"
+        usr1="cn=testuser,cn=users," + self.base_dn
+        delete_force(self.ldb, usr1)
+        ldb.add({
+            "dn": usr1,
+            "objectclass": "user",
+            "description": "test user description",
+            "samaccountname": "testuser"})
+        objLive1 = self.search_dn(usr1)
+        guid1=objLive1["objectGUID"][0]
+        ldb.delete(usr1)
+        objDeleted1 = self.search_guid(guid1)
+        self.undelete_deleted(str(objDeleted1.dn), usr1, ldb)
+        objLive2 = self.search_dn(usr1)
+        self.assertEqual(str(objLive2.dn),str(objLive1.dn))
+        delete_force(self.ldb, usr1)
+
+    def test_rename(self):
+        print "Testing attempt to rename deleted object"
+        usr1="cn=testuser,cn=users," + self.base_dn
+        ldb.add({
+            "dn": usr1,
+            "objectclass": "user",
+            "description": "test user description",
+            "samaccountname": "testuser"})
+        objLive1 = self.search_dn(usr1)
+        guid1=objLive1["objectGUID"][0]
+        ldb.delete(usr1)
+        objDeleted1 = self.search_guid(guid1)
+        #just to make sure we get the correct error if the show deleted is missing
+        try:
+            ldb.rename(str(objDeleted1.dn), usr1)
+            self.fail()
+        except LdbError, (num, _):
+            self.assertEquals(num,ERR_NO_SUCH_OBJECT)
+
+        try:
+            ldb.rename(str(objDeleted1.dn), usr1, ["show_deleted:1"])
+            self.fail()
+        except LdbError, (num, _):
+            self.assertEquals(num,ERR_UNWILLING_TO_PERFORM)
+
+    def test_undelete_with_mod(self):
+        print "Testing standard undelete operation with modification of additional attributes"
+        usr1="cn=testuser,cn=users," + self.base_dn
+        ldb.add({
+            "dn": usr1,
+            "objectclass": "user",
+            "description": "test user description",
+            "samaccountname": "testuser"})
+        objLive1 = self.search_dn(usr1)
+        guid1=objLive1["objectGUID"][0]
+        ldb.delete(usr1)
+        objDeleted1 = self.search_guid(guid1)
+        self.undelete_deleted_with_mod(str(objDeleted1.dn), usr1)
+        objLive2 = self.search_dn(usr1)
+        self.assertEqual(objLive2["url"][0],"www.samba.org")
+        delete_force(self.ldb, usr1)
+
+    def test_undelete_newuser(self):
+        print "Testing undelete user with a different dn"
+        usr1="cn=testuser,cn=users," + self.base_dn
+        usr2="cn=testuser2,cn=users," + self.base_dn
+        delete_force(self.ldb, usr1)
+        ldb.add({
+            "dn": usr1,
+            "objectclass": "user",
+            "description": "test user description",
+            "samaccountname": "testuser"})
+        objLive1 = self.search_dn(usr1)
+        guid1=objLive1["objectGUID"][0]
+        ldb.delete(usr1)
+        objDeleted1 = self.search_guid(guid1)
+        self.undelete_deleted(str(objDeleted1.dn), usr2, ldb)
+        objLive2 = self.search_dn(usr2)
+        delete_force(self.ldb, usr1)
+        delete_force(self.ldb, usr2)
+
+    def test_undelete_existing(self):
+        print "Testing undelete user after a user with the same dn has been created"
+        usr1="cn=testuser,cn=users," + self.base_dn
+        ldb.add({
+            "dn": usr1,
+            "objectclass": "user",
+            "description": "test user description",
+            "samaccountname": "testuser"})
+        objLive1 = self.search_dn(usr1)
+        guid1=objLive1["objectGUID"][0]
+        ldb.delete(usr1)
+        ldb.add({
+            "dn": usr1,
+            "objectclass": "user",
+            "description": "test user description",
+            "samaccountname": "testuser"})
+        objDeleted1 = self.search_guid(guid1)
+        try:
+            self.undelete_deleted(str(objDeleted1.dn), usr1, ldb)
+            self.fail()
+        except LdbError, (num, _):
+            self.assertEquals(num,ERR_ENTRY_ALREADY_EXISTS)
+
+    def test_undelete_cross_nc(self):
+        print "Cross NC undelete"
+        c1 = "cn=ldaptestcontainer," + self.base_dn;
+        c2 = "cn=ldaptestcontainer2," + self.configuration_dn
+        c3 = "cn=ldaptestcontainer," + self.configuration_dn
+        c4 = "cn=ldaptestcontainer2," + self.base_dn;
+        ldb.add({
+            "dn": c1,
+            "objectclass": "container"})
+        ldb.add({
+            "dn": c2,
+            "objectclass": "container"})
+        objLive1 = self.search_dn(c1)
+        objLive2 = self.search_dn(c2)
+        guid1=objLive1["objectGUID"][0]
+        guid2=objLive2["objectGUID"][0]
+        ldb.delete(c1)
+        ldb.delete(c2)
+        objDeleted1 = self.search_guid(guid1)
+        objDeleted2 = self.search_guid(guid2)
+        #try to undelete from base dn to config
+        try:
+            self.undelete_deleted(str(objDeleted1.dn), c3, ldb)
+            self.fail()
+        except LdbError, (num, _):
+            self.assertEquals(num, ERR_OPERATIONS_ERROR)
+        #try to undelete from config to base dn
+        try:
+            self.undelete_deleted(str(objDeleted2.dn), c4, ldb)
+            self.fail()
+        except LdbError, (num, _):
+            self.assertEquals(num, ERR_OPERATIONS_ERROR)
+        #assert undeletion will work in same nc
+        self.undelete_deleted(str(objDeleted1.dn), c4, ldb)
+        self.undelete_deleted(str(objDeleted2.dn), c3, ldb)
+        delete_force(self.ldb, c3)
+        delete_force(self.ldb, c4)
+
+
+
 if not "://" in host:
     if os.path.isfile(host):
         host = "tdb://%s" % host