from samba.samdb import SamDB
from samba.auth import system_session
from samba.tests import TestCase
+from samba.tests import delete_force
from samba.ndr import ndr_unpack, ndr_pack
-from samba.dcerpc import drsblobs
+from samba.dcerpc import drsblobs, security
+from samba import dsdb
import ldb
-import os
import samba
-import gc
-import time
class DsdbTests(TestCase):
credentials=self.creds,
lp=self.lp)
+ # Create a test user
+ user_name = "samdb-testuser"
+ user_pass = samba.generate_random_password(32, 32)
+ user_description = "Test user for dsdb test"
+
+ base_dn = self.samdb.domain_dn()
+
+ self.account_dn = "cn=" + user_name + ",cn=Users," + base_dn
+ delete_force(self.samdb, self.account_dn)
+ self.samdb.newuser(username=user_name,
+ password=user_pass,
+ description=user_description)
+
def test_get_oid_from_attrid(self):
oid = self.samdb.get_oid_from_attid(591614)
self.assertEquals(oid, "1.2.840.113556.1.4.1790")
def test_error_replpropertymetadata(self):
- res = self.samdb.search(expression="cn=Administrator",
- scope=ldb.SCOPE_SUBTREE,
- attrs=["replPropertyMetaData"])
+ res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
+ base=self.account_dn,
+ attrs=["replPropertyMetaData"])
repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
str(res[0]["replPropertyMetaData"]))
ctr = repl.ctr
self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
def test_error_replpropertymetadata_nochange(self):
- res = self.samdb.search(expression="cn=Administrator",
- scope=ldb.SCOPE_SUBTREE,
- attrs=["replPropertyMetaData"])
+ res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
+ base=self.account_dn,
+ attrs=["replPropertyMetaData"])
repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
str(res[0]["replPropertyMetaData"]))
replBlob = ndr_pack(repl)
self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
def test_error_replpropertymetadata_allow_sort(self):
- res = self.samdb.search(expression="cn=Administrator",
- scope=ldb.SCOPE_SUBTREE,
- attrs=["replPropertyMetaData"])
+ res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
+ base=self.account_dn,
+ attrs=["replPropertyMetaData"])
repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
str(res[0]["replPropertyMetaData"]))
replBlob = ndr_pack(repl)
self.samdb.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0", "local_oid:1.3.6.1.4.1.7165.4.3.25:0"])
def test_twoatt_replpropertymetadata(self):
- res = self.samdb.search(expression="cn=Administrator",
- scope=ldb.SCOPE_SUBTREE,
- attrs=["replPropertyMetaData", "uSNChanged"])
+ res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
+ base=self.account_dn,
+ attrs=["replPropertyMetaData", "uSNChanged"])
repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
str(res[0]["replPropertyMetaData"]))
ctr = repl.ctr
self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
def test_set_replpropertymetadata(self):
- res = self.samdb.search(expression="cn=Administrator",
- scope=ldb.SCOPE_SUBTREE,
- attrs=["replPropertyMetaData", "uSNChanged"])
+ res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
+ base=self.account_dn,
+ attrs=["replPropertyMetaData", "uSNChanged"])
repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
str(res[0]["replPropertyMetaData"]))
ctr = repl.ctr
self.assertEquals(self.samdb.get_attribute_from_attid(11979), None)
def test_get_attribute_replmetadata_version(self):
- res = self.samdb.search(expression="cn=Administrator",
- scope=ldb.SCOPE_SUBTREE,
- attrs=["dn"])
+ res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
+ base=self.account_dn,
+ attrs=["dn"])
self.assertEquals(len(res), 1)
dn = str(res[0].dn)
- self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "unicodePwd"), 1)
+ self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "unicodePwd"), 2)
def test_set_attribute_replmetadata_version(self):
- res = self.samdb.search(expression="cn=Administrator",
- scope=ldb.SCOPE_SUBTREE,
- attrs=["dn"])
+ res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
+ base=self.account_dn,
+ attrs=["dn"])
self.assertEquals(len(res), 1)
dn = str(res[0].dn)
version = self.samdb.get_attribute_replmetadata_version(dn, "description")
self.samdb.set_attribute_replmetadata_version(dn, "description", version + 2)
self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "description"), version + 2)
- def test_db_lock1(self):
- basedn = self.samdb.get_default_basedn()
- (r1, w1) = os.pipe()
-
- pid = os.fork()
- if pid == 0:
- # In the child, close the main DB, re-open just one DB
- del(self.samdb)
- gc.collect()
- self.samdb = SamDB(session_info=self.session,
- credentials=self.creds,
- lp=self.lp)
-
- self.samdb.transaction_start()
-
- dn = "cn=test_db_lock_user,cn=users," + str(basedn)
- self.samdb.add({
- "dn": dn,
- "objectclass": "user",
- })
- self.samdb.delete(dn)
-
- # Obtain a write lock
- self.samdb.transaction_prepare_commit()
- os.write(w1, b"prepared")
- time.sleep(2)
-
- # Drop the write lock
- self.samdb.transaction_cancel()
- os._exit(0)
-
- self.assertEqual(os.read(r1, 8), b"prepared")
-
- start = time.time()
-
- # We need to hold this iterator open to hold the all-record lock.
- res = self.samdb.search_iterator()
-
- # This should take at least 2 seconds because the transaction
- # has a write lock on one backend db open
-
- # Release the locks
- for l in res:
- pass
-
- end = time.time()
- self.assertGreater(end - start, 1.9)
-
- (got_pid, status) = os.waitpid(pid, 0)
- self.assertEqual(got_pid, pid)
- self.assertTrue(os.WIFEXITED(status))
- self.assertEqual(os.WEXITSTATUS(status), 0)
-
- def test_db_lock2(self):
- basedn = self.samdb.get_default_basedn()
- (r1, w1) = os.pipe()
- (r2, w2) = os.pipe()
-
- pid = os.fork()
- if pid == 0:
- # In the child, close the main DB, re-open
- del(self.samdb)
- gc.collect()
- self.samdb = SamDB(session_info=self.session,
- credentials=self.creds,
- lp=self.lp)
-
- # We need to hold this iterator open to hold the all-record lock.
- res = self.samdb.search_iterator()
-
- os.write(w2, b"start")
- if (os.read(r1, 7) != b"started"):
- os._exit(1)
-
- os.write(w2, b"add")
- if (os.read(r1, 5) != b"added"):
- os._exit(2)
-
- # Wait 2 seconds to block prepare_commit() in the child.
- os.write(w2, b"prepare")
- time.sleep(2)
-
- # Release the locks
- for l in res:
- pass
-
- if (os.read(r1, 8) != b"prepared"):
- os._exit(3)
-
- os._exit(0)
-
- # We can start the transaction during the search
- # because both just grab the all-record read lock.
- self.assertEqual(os.read(r2, 5), b"start")
+ def test_no_error_on_invalid_control(self):
+ try:
+ res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
+ base=self.account_dn,
+ attrs=["replPropertyMetaData"],
+ controls=["local_oid:%s:0"
+ % dsdb.DSDB_CONTROL_INVALID_NOT_IMPLEMENTED])
+ except ldb.LdbError as e:
+ self.fail("Should have not raised an exception")
+
+ def test_error_on_invalid_critical_control(self):
+ try:
+ res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
+ base=self.account_dn,
+ attrs=["replPropertyMetaData"],
+ controls=["local_oid:%s:1"
+ % dsdb.DSDB_CONTROL_INVALID_NOT_IMPLEMENTED])
+ except ldb.LdbError as e:
+ if e[0] != ldb.ERR_UNSUPPORTED_CRITICAL_EXTENSION:
+ self.fail("Got %s should have got ERR_UNSUPPORTED_CRITICAL_EXTENSION"
+ % e[1])
+
+ # Allocate a unique RID for use in the objectSID tests.
+ #
+ def allocate_rid(self):
self.samdb.transaction_start()
- os.write(w1, b"started")
-
- self.assertEqual(os.read(r2, 3), b"add")
- dn = "cn=test_db_lock_user,cn=users," + str(basedn)
- self.samdb.add({
- "dn": dn,
- "objectclass": "user",
- })
- self.samdb.delete(dn)
- os.write(w1, b"added")
-
- # Obtain a write lock, this will block until
- # the parent releases the read lock.
- self.assertEqual(os.read(r2, 7), b"prepare")
- start = time.time()
- self.samdb.transaction_prepare_commit()
- end = time.time()
try:
- self.assertGreater(end - start, 1.9)
+ rid = self.samdb.allocate_rid()
except:
- raise
- finally:
- os.write(w1, b"prepared")
-
- # Drop the write lock
self.samdb.transaction_cancel()
+ raise
+ self.samdb.transaction_commit()
+ return str(rid)
+
+ # Ensure that duplicate objectSID's are permitted for foreign security
+ # principals.
+ #
+ def test_duplicate_objectSIDs_allowed_on_foreign_security_principals(self):
+
+ #
+ # We need to build a foreign security principal SID
+ # i.e a SID not in the current domain.
+ #
+ dom_sid = self.samdb.get_domain_sid()
+ if str(dom_sid)[:-1] == "0":
+ c = "9"
+ else:
+ c = "0"
+ sid = str(dom_sid)[:-1] + c + "-1000"
+ basedn = self.samdb.get_default_basedn()
+ dn = "CN=%s,CN=ForeignSecurityPrincipals,%s" % (sid, basedn)
+ self.samdb.add({
+ "dn": dn,
+ "objectClass": "foreignSecurityPrincipal"})
- (got_pid, status) = os.waitpid(pid, 0)
- self.assertEqual(got_pid, pid)
- self.assertTrue(os.WIFEXITED(status))
- self.assertEqual(os.WEXITSTATUS(status), 0)
-
- def test_full_db_lock1(self):
- basedn = self.samdb.get_default_basedn()
- backend_filename = "%s.ldb" % basedn.get_casefold()
- backend_subpath = os.path.join("sam.ldb.d",
- backend_filename)
- backend_path = self.lp.private_path(backend_subpath)
- (r1, w1) = os.pipe()
-
- pid = os.fork()
- if pid == 0:
- # In the child, close the main DB, re-open just one DB
- del(self.samdb)
- gc.collect()
-
- backenddb = ldb.Ldb(backend_path)
-
-
- backenddb.transaction_start()
-
- backenddb.add({"dn":"@DSDB_LOCK_TEST"})
- backenddb.delete("@DSDB_LOCK_TEST")
-
- # Obtain a write lock
- backenddb.transaction_prepare_commit()
- os.write(w1, b"prepared")
- time.sleep(2)
-
- # Drop the write lock
- backenddb.transaction_cancel()
- os._exit(0)
-
- self.assertEqual(os.read(r1, 8), b"prepared")
-
- start = time.time()
-
- # We need to hold this iterator open to hold the all-record lock.
- res = self.samdb.search_iterator()
-
- # This should take at least 2 seconds because the transaction
- # has a write lock on one backend db open
-
- end = time.time()
- self.assertGreater(end - start, 1.9)
-
- # Release the locks
- for l in res:
- pass
-
- (got_pid, status) = os.waitpid(pid, 0)
- self.assertEqual(got_pid, pid)
- self.assertTrue(os.WIFEXITED(status))
- self.assertEqual(os.WEXITSTATUS(status), 0)
-
- def test_full_db_lock2(self):
- basedn = self.samdb.get_default_basedn()
- backend_filename = "%s.ldb" % basedn.get_casefold()
- backend_subpath = os.path.join("sam.ldb.d",
- backend_filename)
- backend_path = self.lp.private_path(backend_subpath)
- (r1, w1) = os.pipe()
- (r2, w2) = os.pipe()
-
- pid = os.fork()
- if pid == 0:
-
- # In the child, close the main DB, re-open
- del(self.samdb)
- gc.collect()
- self.samdb = SamDB(session_info=self.session,
- credentials=self.creds,
- lp=self.lp)
-
- # We need to hold this iterator open to hold the all-record lock.
- res = self.samdb.search_iterator()
-
- os.write(w2, b"start")
- if (os.read(r1, 7) != b"started"):
- os._exit(1)
- os.write(w2, b"add")
- if (os.read(r1, 5) != b"added"):
- os._exit(2)
-
- # Wait 2 seconds to block prepare_commit() in the child.
- os.write(w2, b"prepare")
- time.sleep(2)
-
- # Release the locks
- for l in res:
- pass
-
- if (os.read(r1, 8) != b"prepared"):
- os._exit(3)
-
- os._exit(0)
-
- # In the parent, close the main DB, re-open just one DB
- del(self.samdb)
- gc.collect()
- backenddb = ldb.Ldb(backend_path)
-
- # We can start the transaction during the search
- # because both just grab the all-record read lock.
- self.assertEqual(os.read(r2, 5), b"start")
- backenddb.transaction_start()
- os.write(w1, b"started")
-
- self.assertEqual(os.read(r2, 3), b"add")
- backenddb.add({"dn":"@DSDB_LOCK_TEST"})
- backenddb.delete("@DSDB_LOCK_TEST")
- os.write(w1, b"added")
-
- # Obtain a write lock, this will block until
- # the child releases the read lock.
- self.assertEqual(os.read(r2, 7), b"prepare")
- start = time.time()
- backenddb.transaction_prepare_commit()
- end = time.time()
+ self.samdb.delete(dn)
try:
- self.assertGreater(end - start, 1.9)
- except:
- raise
- finally:
- os.write(w1, b"prepared")
+ self.samdb.add({
+ "dn": dn,
+ "objectClass": "foreignSecurityPrincipal"})
+ except ldb.LdbError as e:
+ (code, msg) = e
+ self.fail("Got unexpected exception %d - %s "
+ % (code, msg))
+
+ #
+ # Duplicate objectSID's should not be permitted for sids in the local
+ # domain. The test sequence is add an object, delete it, then attempt to
+ # re-add it, this should fail with a constraint violation
+ #
+ def test_duplicate_objectSIDs_not_allowed_on_local_objects(self):
+
+ dom_sid = self.samdb.get_domain_sid()
+ rid = self.allocate_rid()
+ sid_str = str(dom_sid) + "-" + rid
+ sid = ndr_pack(security.dom_sid(sid_str))
+ basedn = self.samdb.get_default_basedn()
+ cn = "dsdb_test_01"
+ dn = "cn=%s,cn=Users,%s" % (cn, basedn)
- # Drop the write lock
- backenddb.transaction_cancel()
+ self.samdb.add({
+ "dn": dn,
+ "objectClass": "user",
+ "objectSID": sid})
+ self.samdb.delete(dn)
- (got_pid, status) = os.waitpid(pid, 0)
- self.assertEqual(got_pid, pid)
- self.assertTrue(os.WIFEXITED(status))
- self.assertEqual(os.WEXITSTATUS(status), 0)
+ try:
+ self.samdb.add({
+ "dn": dn,
+ "objectClass": "user",
+ "objectSID": sid})
+ self.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION")
+ except ldb.LdbError as e:
+ (code, msg) = e
+ if code != ldb.ERR_CONSTRAINT_VIOLATION:
+ self.fail("Got %d - %s should have got "
+ "LDB_ERR_CONSTRAINT_VIOLATION"
+ % (code, msg))