X-Git-Url: http://git.samba.org/?p=samba.git;a=blobdiff_plain;f=python%2Fsamba%2Ftests%2Fdsdb.py;h=34a9435ea445a7d0e24c2e230bced757285663e4;hp=f38393e71a01a6cadea506309b54e27f947b6eb0;hb=073328673f0fb0a9797290f922ca8230e3f5fdf3;hpb=c5b4cbf34e21ac77124a2ca421c1fdd179329783 diff --git a/python/samba/tests/dsdb.py b/python/samba/tests/dsdb.py index f38393e71a0..34a9435ea44 100644 --- a/python/samba/tests/dsdb.py +++ b/python/samba/tests/dsdb.py @@ -21,13 +21,12 @@ from samba.credentials import Credentials from samba.samdb import SamDB from samba.auth import system_session from samba.tests import TestCase +from samba.tests import delete_force from samba.ndr import ndr_unpack, ndr_pack -from samba.dcerpc import drsblobs +from samba.dcerpc import drsblobs, security +from samba import dsdb import ldb -import os import samba -import gc -import time class DsdbTests(TestCase): @@ -41,14 +40,27 @@ class DsdbTests(TestCase): credentials=self.creds, lp=self.lp) + # Create a test user + user_name = "samdb-testuser" + user_pass = samba.generate_random_password(32, 32) + user_description = "Test user for dsdb test" + + base_dn = self.samdb.domain_dn() + + self.account_dn = "cn=" + user_name + ",cn=Users," + base_dn + delete_force(self.samdb, self.account_dn) + self.samdb.newuser(username=user_name, + password=user_pass, + description=user_description) + def test_get_oid_from_attrid(self): oid = self.samdb.get_oid_from_attid(591614) self.assertEquals(oid, "1.2.840.113556.1.4.1790") def test_error_replpropertymetadata(self): - res = self.samdb.search(expression="cn=Administrator", - scope=ldb.SCOPE_SUBTREE, - attrs=["replPropertyMetaData"]) + res = self.samdb.search(scope=ldb.SCOPE_SUBTREE, + base=self.account_dn, + attrs=["replPropertyMetaData"]) repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, str(res[0]["replPropertyMetaData"])) ctr = repl.ctr @@ -64,9 +76,9 @@ class DsdbTests(TestCase): self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"]) def test_error_replpropertymetadata_nochange(self): - res = self.samdb.search(expression="cn=Administrator", - scope=ldb.SCOPE_SUBTREE, - attrs=["replPropertyMetaData"]) + res = self.samdb.search(scope=ldb.SCOPE_SUBTREE, + base=self.account_dn, + attrs=["replPropertyMetaData"]) repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, str(res[0]["replPropertyMetaData"])) replBlob = ndr_pack(repl) @@ -76,9 +88,9 @@ class DsdbTests(TestCase): self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"]) def test_error_replpropertymetadata_allow_sort(self): - res = self.samdb.search(expression="cn=Administrator", - scope=ldb.SCOPE_SUBTREE, - attrs=["replPropertyMetaData"]) + res = self.samdb.search(scope=ldb.SCOPE_SUBTREE, + base=self.account_dn, + attrs=["replPropertyMetaData"]) repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, str(res[0]["replPropertyMetaData"])) replBlob = ndr_pack(repl) @@ -88,9 +100,9 @@ class DsdbTests(TestCase): self.samdb.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0", "local_oid:1.3.6.1.4.1.7165.4.3.25:0"]) def test_twoatt_replpropertymetadata(self): - res = self.samdb.search(expression="cn=Administrator", - scope=ldb.SCOPE_SUBTREE, - attrs=["replPropertyMetaData", "uSNChanged"]) + res = self.samdb.search(scope=ldb.SCOPE_SUBTREE, + base=self.account_dn, + attrs=["replPropertyMetaData", "uSNChanged"]) repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, str(res[0]["replPropertyMetaData"])) ctr = repl.ctr @@ -108,9 +120,9 @@ class DsdbTests(TestCase): self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"]) def test_set_replpropertymetadata(self): - res = self.samdb.search(expression="cn=Administrator", - scope=ldb.SCOPE_SUBTREE, - attrs=["replPropertyMetaData", "uSNChanged"]) + res = self.samdb.search(scope=ldb.SCOPE_SUBTREE, + base=self.account_dn, + attrs=["replPropertyMetaData", "uSNChanged"]) repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, str(res[0]["replPropertyMetaData"])) ctr = repl.ctr @@ -134,124 +146,119 @@ class DsdbTests(TestCase): self.assertEquals(self.samdb.get_attribute_from_attid(11979), None) def test_get_attribute_replmetadata_version(self): - res = self.samdb.search(expression="cn=Administrator", - scope=ldb.SCOPE_SUBTREE, - attrs=["dn"]) + res = self.samdb.search(scope=ldb.SCOPE_SUBTREE, + base=self.account_dn, + attrs=["dn"]) self.assertEquals(len(res), 1) dn = str(res[0].dn) - self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "unicodePwd"), 1) + self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "unicodePwd"), 2) def test_set_attribute_replmetadata_version(self): - res = self.samdb.search(expression="cn=Administrator", - scope=ldb.SCOPE_SUBTREE, - attrs=["dn"]) + res = self.samdb.search(scope=ldb.SCOPE_SUBTREE, + base=self.account_dn, + attrs=["dn"]) self.assertEquals(len(res), 1) dn = str(res[0].dn) version = self.samdb.get_attribute_replmetadata_version(dn, "description") self.samdb.set_attribute_replmetadata_version(dn, "description", version + 2) self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "description"), version + 2) - def test_db_lock(self): - basedn = self.samdb.get_default_basedn() - (r1, w1) = os.pipe() - - pid = os.fork() - if pid == 0: - # In the child, close the main DB, re-open just one DB - del(self.samdb) - gc.collect() - self.samdb = SamDB(session_info=self.session, - credentials=self.creds, - lp=self.lp) - - self.samdb.transaction_start() - - self.samdb.add({ - "dn": "cn=test_db_lock_user,cn=users," + str(basedn), - "objectclass": "user", - }) - - # Obtain a write lock - self.samdb.transaction_prepare_commit() - os.write(w1, b"added") - time.sleep(2) - - # Drop the write lock + def test_no_error_on_invalid_control(self): + try: + res = self.samdb.search(scope=ldb.SCOPE_SUBTREE, + base=self.account_dn, + attrs=["replPropertyMetaData"], + controls=["local_oid:%s:0" + % dsdb.DSDB_CONTROL_INVALID_NOT_IMPLEMENTED]) + except ldb.LdbError as e: + self.fail("Should have not raised an exception") + + def test_error_on_invalid_critical_control(self): + try: + res = self.samdb.search(scope=ldb.SCOPE_SUBTREE, + base=self.account_dn, + attrs=["replPropertyMetaData"], + controls=["local_oid:%s:1" + % dsdb.DSDB_CONTROL_INVALID_NOT_IMPLEMENTED]) + except ldb.LdbError as e: + if e[0] != ldb.ERR_UNSUPPORTED_CRITICAL_EXTENSION: + self.fail("Got %s should have got ERR_UNSUPPORTED_CRITICAL_EXTENSION" + % e[1]) + + # Allocate a unique RID for use in the objectSID tests. + # + def allocate_rid(self): + self.samdb.transaction_start() + try: + rid = self.samdb.allocate_rid() + except: self.samdb.transaction_cancel() - os._exit(0) - - self.assertEqual(os.read(r1, 5), b"added") - - start = time.time() - - # We need to hold this iterator open to hold the all-record lock. - res = self.samdb.search_iterator() - - # This should take at least 2 seconds because the transaction - # has a write lock on one backend db open - - # Release the locks - for l in res: - pass - - end = time.time() - self.assertGreater(end - start, 1.9) - - (got_pid, status) = os.waitpid(pid, 0) - self.assertEqual(got_pid, pid) - self.assertTrue(os.WIFEXITED(status)) - self.assertEqual(os.WEXITSTATUS(status), 0) - - - def test_full_db_lock(self): - basedn = self.samdb.get_default_basedn() - backend_filename = "%s.ldb" % basedn.get_casefold() - backend_subpath = os.path.join("sam.ldb.d", - backend_filename) - backend_path = self.lp.private_path(backend_subpath) - (r1, w1) = os.pipe() - - pid = os.fork() - if pid == 0: - # In the child, close the main DB, re-open just one DB - del(self.samdb) - gc.collect() - - backenddb = ldb.Ldb(backend_path) - - - backenddb.transaction_start() - - backenddb.add({"dn":"@DSDB_LOCK_TEST"}) - backenddb.delete("@DSDB_LOCK_TEST") - - # Obtain a write lock - backenddb.transaction_prepare_commit() - os.write(w1, b"added") - time.sleep(2) - - # Drop the write lock - backenddb.transaction_cancel() - os._exit(0) - - self.assertEqual(os.read(r1, 5), b"added") - - start = time.time() - - # We need to hold this iterator open to hold the all-record lock. - res = self.samdb.search_iterator() - - # This should take at least 2 seconds because the transaction - # has a write lock on one backend db open - - end = time.time() - self.assertGreater(end - start, 1.9) - - # Release the locks - for l in res: - pass - - (got_pid, status) = os.waitpid(pid, 0) - self.assertEqual(got_pid, pid) - self.assertTrue(os.WIFEXITED(status)) - self.assertEqual(os.WEXITSTATUS(status), 0) + raise + self.samdb.transaction_commit() + return str(rid) + + # Ensure that duplicate objectSID's are permitted for foreign security + # principals. + # + def test_duplicate_objectSIDs_allowed_on_foreign_security_principals(self): + + # + # We need to build a foreign security principal SID + # i.e a SID not in the current domain. + # + dom_sid = self.samdb.get_domain_sid() + if str(dom_sid)[:-1] == "0": + c = "9" + else: + c = "0" + sid = str(dom_sid)[:-1] + c + "-1000" + basedn = self.samdb.get_default_basedn() + dn = "CN=%s,CN=ForeignSecurityPrincipals,%s" % (sid, basedn) + self.samdb.add({ + "dn": dn, + "objectClass": "foreignSecurityPrincipal"}) + + self.samdb.delete(dn) + + try: + self.samdb.add({ + "dn": dn, + "objectClass": "foreignSecurityPrincipal"}) + except ldb.LdbError as e: + (code, msg) = e + self.fail("Got unexpected exception %d - %s " + % (code, msg)) + + # + # Duplicate objectSID's should not be permitted for sids in the local + # domain. The test sequence is add an object, delete it, then attempt to + # re-add it, this should fail with a constraint violation + # + def test_duplicate_objectSIDs_not_allowed_on_local_objects(self): + + dom_sid = self.samdb.get_domain_sid() + rid = self.allocate_rid() + sid_str = str(dom_sid) + "-" + rid + sid = ndr_pack(security.dom_sid(sid_str)) + basedn = self.samdb.get_default_basedn() + cn = "dsdb_test_01" + dn = "cn=%s,cn=Users,%s" % (cn, basedn) + + self.samdb.add({ + "dn": dn, + "objectClass": "user", + "objectSID": sid}) + self.samdb.delete(dn) + + try: + self.samdb.add({ + "dn": dn, + "objectClass": "user", + "objectSID": sid}) + self.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION") + except ldb.LdbError as e: + (code, msg) = e + if code != ldb.ERR_CONSTRAINT_VIOLATION: + self.fail("Got %d - %s should have got " + "LDB_ERR_CONSTRAINT_VIOLATION" + % (code, msg))