import samba.getopt as options
from samba.auth import system_session
-from ldb import SCOPE_BASE, LdbError
-from ldb import ERR_NO_SUCH_OBJECT, ERR_NOT_ALLOWED_ON_NON_LEAF
-from ldb import ERR_UNWILLING_TO_PERFORM
+from ldb import SCOPE_BASE, LdbError, Message, MessageElement, Dn, FLAG_MOD_ADD, FLAG_MOD_DELETE, FLAG_MOD_REPLACE
+from ldb import ERR_NO_SUCH_OBJECT, ERR_NOT_ALLOWED_ON_NON_LEAF, ERR_ENTRY_ALREADY_EXISTS, ERR_ATTRIBUTE_OR_VALUE_EXISTS
+from ldb import ERR_UNWILLING_TO_PERFORM, ERR_OPERATIONS_ERROR
from samba.samdb import SamDB
from samba.tests import delete_force
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
-class BasicDeleteTests(samba.tests.TestCase):
+class BaseDeleteTests(samba.tests.TestCase):
def GUID_string(self, guid):
return self.ldb.schema_format_value("objectGUID", guid)
def setUp(self):
- super(BasicDeleteTests, self).setUp()
+ super(BaseDeleteTests, self).setUp()
self.ldb = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp)
self.base_dn = self.ldb.domain_dn()
self.assertEquals(len(res), 1)
return res[0]
+
+class BasicDeleteTests(BaseDeleteTests):
+
+ def setUp(self):
+ super(BasicDeleteTests, self).setUp()
+
def del_attr_values(self, delObj):
print "Checking attributes for %s" % delObj["dn"]
self.assertFalse("CN=Deleted Objects" in str(objDeleted6.dn))
self.assertFalse("CN=Deleted Objects" in str(objDeleted7.dn))
+class BasicUndeleteTests(BaseDeleteTests):
+
+ def setUp(self):
+ super(BasicUndeleteTests, self).setUp()
+
+ def enable_recycle_bin(self):
+ msg = Message()
+ msg.dn = Dn(ldb, "")
+ msg["enableOptionalFeature"] = MessageElement(
+ "CN=Partitions," + self.configuration_dn + ":766ddcd8-acd0-445e-f3b9-a7f9b6744f2a",
+ FLAG_MOD_ADD, "enableOptionalFeature")
+ try:
+ ldb.modify(msg)
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS)
+
+ def get_ldb_connection(self, target_username, target_password):
+ creds_tmp = Credentials()
+ creds_tmp.set_username(target_username)
+ creds_tmp.set_password(target_password)
+ creds_tmp.set_domain(creds.get_domain())
+ creds_tmp.set_realm(creds.get_realm())
+ creds_tmp.set_workstation(creds.get_workstation())
+ creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
+ | gensec.FEATURE_SEAL)
+ creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop
+ ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
+ return ldb_target
+
+ def undelete_deleted(self, olddn, newdn, samldb):
+ msg = Message()
+ msg.dn = Dn(ldb, olddn)
+ msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
+ msg["distinguishedName"] = MessageElement([newdn], FLAG_MOD_REPLACE, "distinguishedName")
+ res = samldb.modify(msg, ["show_deleted:1"])
+
+ def undelete_deleted_with_mod(self, olddn, newdn):
+ msg = Message()
+ msg.dn = Dn(ldb, olddn)
+ msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
+ msg["distinguishedName"] = MessageElement([newdn], FLAG_MOD_REPLACE, "distinguishedName")
+ msg["url"] = MessageElement(["www.samba.org"], FLAG_MOD_REPLACE, "url")
+ res = ldb.modify(msg, ["show_deleted:1"])
+
+
+ def test_undelete(self):
+ print "Testing standard undelete operation"
+ usr1="cn=testuser,cn=users," + self.base_dn
+ delete_force(self.ldb, usr1)
+ ldb.add({
+ "dn": usr1,
+ "objectclass": "user",
+ "description": "test user description",
+ "samaccountname": "testuser"})
+ objLive1 = self.search_dn(usr1)
+ guid1=objLive1["objectGUID"][0]
+ ldb.delete(usr1)
+ objDeleted1 = self.search_guid(guid1)
+ self.undelete_deleted(str(objDeleted1.dn), usr1, ldb)
+ objLive2 = self.search_dn(usr1)
+ self.assertEqual(str(objLive2.dn),str(objLive1.dn))
+ delete_force(self.ldb, usr1)
+
+ def test_rename(self):
+ print "Testing attempt to rename deleted object"
+ usr1="cn=testuser,cn=users," + self.base_dn
+ ldb.add({
+ "dn": usr1,
+ "objectclass": "user",
+ "description": "test user description",
+ "samaccountname": "testuser"})
+ objLive1 = self.search_dn(usr1)
+ guid1=objLive1["objectGUID"][0]
+ ldb.delete(usr1)
+ objDeleted1 = self.search_guid(guid1)
+ #just to make sure we get the correct error if the show deleted is missing
+ try:
+ ldb.rename(str(objDeleted1.dn), usr1)
+ self.fail()
+ except LdbError, (num, _):
+ self.assertEquals(num,ERR_NO_SUCH_OBJECT)
+
+ try:
+ ldb.rename(str(objDeleted1.dn), usr1, ["show_deleted:1"])
+ self.fail()
+ except LdbError, (num, _):
+ self.assertEquals(num,ERR_UNWILLING_TO_PERFORM)
+
+ def test_undelete_with_mod(self):
+ print "Testing standard undelete operation with modification of additional attributes"
+ usr1="cn=testuser,cn=users," + self.base_dn
+ ldb.add({
+ "dn": usr1,
+ "objectclass": "user",
+ "description": "test user description",
+ "samaccountname": "testuser"})
+ objLive1 = self.search_dn(usr1)
+ guid1=objLive1["objectGUID"][0]
+ ldb.delete(usr1)
+ objDeleted1 = self.search_guid(guid1)
+ self.undelete_deleted_with_mod(str(objDeleted1.dn), usr1)
+ objLive2 = self.search_dn(usr1)
+ self.assertEqual(objLive2["url"][0],"www.samba.org")
+ delete_force(self.ldb, usr1)
+
+ def test_undelete_newuser(self):
+ print "Testing undelete user with a different dn"
+ usr1="cn=testuser,cn=users," + self.base_dn
+ usr2="cn=testuser2,cn=users," + self.base_dn
+ delete_force(self.ldb, usr1)
+ ldb.add({
+ "dn": usr1,
+ "objectclass": "user",
+ "description": "test user description",
+ "samaccountname": "testuser"})
+ objLive1 = self.search_dn(usr1)
+ guid1=objLive1["objectGUID"][0]
+ ldb.delete(usr1)
+ objDeleted1 = self.search_guid(guid1)
+ self.undelete_deleted(str(objDeleted1.dn), usr2, ldb)
+ objLive2 = self.search_dn(usr2)
+ delete_force(self.ldb, usr1)
+ delete_force(self.ldb, usr2)
+
+ def test_undelete_existing(self):
+ print "Testing undelete user after a user with the same dn has been created"
+ usr1="cn=testuser,cn=users," + self.base_dn
+ ldb.add({
+ "dn": usr1,
+ "objectclass": "user",
+ "description": "test user description",
+ "samaccountname": "testuser"})
+ objLive1 = self.search_dn(usr1)
+ guid1=objLive1["objectGUID"][0]
+ ldb.delete(usr1)
+ ldb.add({
+ "dn": usr1,
+ "objectclass": "user",
+ "description": "test user description",
+ "samaccountname": "testuser"})
+ objDeleted1 = self.search_guid(guid1)
+ try:
+ self.undelete_deleted(str(objDeleted1.dn), usr1, ldb)
+ self.fail()
+ except LdbError, (num, _):
+ self.assertEquals(num,ERR_ENTRY_ALREADY_EXISTS)
+
+ def test_undelete_cross_nc(self):
+ print "Cross NC undelete"
+ c1 = "cn=ldaptestcontainer," + self.base_dn;
+ c2 = "cn=ldaptestcontainer2," + self.configuration_dn
+ c3 = "cn=ldaptestcontainer," + self.configuration_dn
+ c4 = "cn=ldaptestcontainer2," + self.base_dn;
+ ldb.add({
+ "dn": c1,
+ "objectclass": "container"})
+ ldb.add({
+ "dn": c2,
+ "objectclass": "container"})
+ objLive1 = self.search_dn(c1)
+ objLive2 = self.search_dn(c2)
+ guid1=objLive1["objectGUID"][0]
+ guid2=objLive2["objectGUID"][0]
+ ldb.delete(c1)
+ ldb.delete(c2)
+ objDeleted1 = self.search_guid(guid1)
+ objDeleted2 = self.search_guid(guid2)
+ #try to undelete from base dn to config
+ try:
+ self.undelete_deleted(str(objDeleted1.dn), c3, ldb)
+ self.fail()
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_OPERATIONS_ERROR)
+ #try to undelete from config to base dn
+ try:
+ self.undelete_deleted(str(objDeleted2.dn), c4, ldb)
+ self.fail()
+ except LdbError, (num, _):
+ self.assertEquals(num, ERR_OPERATIONS_ERROR)
+ #assert undeletion will work in same nc
+ self.undelete_deleted(str(objDeleted1.dn), c4, ldb)
+ self.undelete_deleted(str(objDeleted2.dn), c3, ldb)
+ delete_force(self.ldb, c3)
+ delete_force(self.ldb, c4)
+
+
+
if not "://" in host:
if os.path.isfile(host):
host = "tdb://%s" % host