tests dsdb: Add tests for optionally unique objectSID's
[samba.git] / python / samba / tests / dsdb.py
index bafd040d8659529f64b1c65bb3eca02485f7224c..34a9435ea445a7d0e24c2e230bced757285663e4 100644 (file)
@@ -21,13 +21,12 @@ from samba.credentials import Credentials
 from samba.samdb import SamDB
 from samba.auth import system_session
 from samba.tests import TestCase
+from samba.tests import delete_force
 from samba.ndr import ndr_unpack, ndr_pack
-from samba.dcerpc import drsblobs
+from samba.dcerpc import drsblobs, security
+from samba import dsdb
 import ldb
-import os
 import samba
-import gc
-import time
 
 class DsdbTests(TestCase):
 
@@ -41,14 +40,27 @@ class DsdbTests(TestCase):
                            credentials=self.creds,
                            lp=self.lp)
 
+        # Create a test user
+        user_name = "samdb-testuser"
+        user_pass = samba.generate_random_password(32, 32)
+        user_description = "Test user for dsdb test"
+
+        base_dn = self.samdb.domain_dn()
+
+        self.account_dn = "cn=" + user_name + ",cn=Users," + base_dn
+        delete_force(self.samdb, self.account_dn)
+        self.samdb.newuser(username=user_name,
+                           password=user_pass,
+                           description=user_description)
+
     def test_get_oid_from_attrid(self):
         oid = self.samdb.get_oid_from_attid(591614)
         self.assertEquals(oid, "1.2.840.113556.1.4.1790")
 
     def test_error_replpropertymetadata(self):
-        res = self.samdb.search(expression="cn=Administrator",
-                            scope=ldb.SCOPE_SUBTREE,
-                            attrs=["replPropertyMetaData"])
+        res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
+                                base=self.account_dn,
+                                attrs=["replPropertyMetaData"])
         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
                             str(res[0]["replPropertyMetaData"]))
         ctr = repl.ctr
@@ -64,9 +76,9 @@ class DsdbTests(TestCase):
         self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
 
     def test_error_replpropertymetadata_nochange(self):
-        res = self.samdb.search(expression="cn=Administrator",
-                            scope=ldb.SCOPE_SUBTREE,
-                            attrs=["replPropertyMetaData"])
+        res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
+                                base=self.account_dn,
+                                attrs=["replPropertyMetaData"])
         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
                             str(res[0]["replPropertyMetaData"]))
         replBlob = ndr_pack(repl)
@@ -76,9 +88,9 @@ class DsdbTests(TestCase):
         self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
 
     def test_error_replpropertymetadata_allow_sort(self):
-        res = self.samdb.search(expression="cn=Administrator",
-                            scope=ldb.SCOPE_SUBTREE,
-                            attrs=["replPropertyMetaData"])
+        res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
+                                base=self.account_dn,
+                                attrs=["replPropertyMetaData"])
         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
                             str(res[0]["replPropertyMetaData"]))
         replBlob = ndr_pack(repl)
@@ -88,9 +100,9 @@ class DsdbTests(TestCase):
         self.samdb.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0", "local_oid:1.3.6.1.4.1.7165.4.3.25:0"])
 
     def test_twoatt_replpropertymetadata(self):
-        res = self.samdb.search(expression="cn=Administrator",
-                            scope=ldb.SCOPE_SUBTREE,
-                            attrs=["replPropertyMetaData", "uSNChanged"])
+        res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
+                                base=self.account_dn,
+                                attrs=["replPropertyMetaData", "uSNChanged"])
         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
                             str(res[0]["replPropertyMetaData"]))
         ctr = repl.ctr
@@ -108,9 +120,9 @@ class DsdbTests(TestCase):
         self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
 
     def test_set_replpropertymetadata(self):
-        res = self.samdb.search(expression="cn=Administrator",
-                            scope=ldb.SCOPE_SUBTREE,
-                            attrs=["replPropertyMetaData", "uSNChanged"])
+        res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
+                                base=self.account_dn,
+                                attrs=["replPropertyMetaData", "uSNChanged"])
         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
                             str(res[0]["replPropertyMetaData"]))
         ctr = repl.ctr
@@ -134,70 +146,119 @@ class DsdbTests(TestCase):
         self.assertEquals(self.samdb.get_attribute_from_attid(11979), None)
 
     def test_get_attribute_replmetadata_version(self):
-        res = self.samdb.search(expression="cn=Administrator",
-                            scope=ldb.SCOPE_SUBTREE,
-                            attrs=["dn"])
+        res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
+                                base=self.account_dn,
+                                attrs=["dn"])
         self.assertEquals(len(res), 1)
         dn = str(res[0].dn)
-        self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "unicodePwd"), 1)
+        self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "unicodePwd"), 2)
 
     def test_set_attribute_replmetadata_version(self):
-        res = self.samdb.search(expression="cn=Administrator",
-                            scope=ldb.SCOPE_SUBTREE,
-                            attrs=["dn"])
+        res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
+                                base=self.account_dn,
+                                attrs=["dn"])
         self.assertEquals(len(res), 1)
         dn = str(res[0].dn)
         version = self.samdb.get_attribute_replmetadata_version(dn, "description")
         self.samdb.set_attribute_replmetadata_version(dn, "description", version + 2)
         self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "description"), version + 2)
 
-    def test_db_lock(self):
-        basedn = self.samdb.get_default_basedn()
-        (r1, w1) = os.pipe()
-
-        pid = os.fork()
-        if pid == 0:
-            # In the child, close the main DB, re-open just one DB
-            del(self.samdb)
-            gc.collect()
-            self.samdb = SamDB(session_info=self.session,
-                               credentials=self.creds,
-                               lp=self.lp)
-
-            self.samdb.transaction_start()
+    def test_no_error_on_invalid_control(self):
+        try:
+            res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
+                                    base=self.account_dn,
+                                    attrs=["replPropertyMetaData"],
+                                    controls=["local_oid:%s:0"
+                                              % dsdb.DSDB_CONTROL_INVALID_NOT_IMPLEMENTED])
+        except ldb.LdbError as e:
+            self.fail("Should have not raised an exception")
 
-            self.samdb.add({
-                 "dn": "cn=test_db_lock_user,cn=users," + str(basedn),
-                 "objectclass": "user",
-            })
-
-            # Obtain a write lock
-            self.samdb.transaction_prepare_commit()
-            os.write(w1, b"added")
-            time.sleep(2)
+    def test_error_on_invalid_critical_control(self):
+        try:
+            res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
+                                    base=self.account_dn,
+                                    attrs=["replPropertyMetaData"],
+                                    controls=["local_oid:%s:1"
+                                              % dsdb.DSDB_CONTROL_INVALID_NOT_IMPLEMENTED])
+        except ldb.LdbError as e:
+            if e[0] != ldb.ERR_UNSUPPORTED_CRITICAL_EXTENSION:
+                self.fail("Got %s should have got ERR_UNSUPPORTED_CRITICAL_EXTENSION"
+                          % e[1])
 
-            # Drop the write lock
+    # Allocate a unique RID for use in the objectSID tests.
+    #
+    def allocate_rid(self):
+        self.samdb.transaction_start()
+        try:
+            rid = self.samdb.allocate_rid()
+        except:
             self.samdb.transaction_cancel()
-            os._exit(0)
+            raise
+        self.samdb.transaction_commit()
+        return str(rid)
 
-        self.assertEqual(os.read(r1, 5), b"added")
+    # Ensure that duplicate objectSID's are permitted for foreign security
+    # principals.
+    #
+    def test_duplicate_objectSIDs_allowed_on_foreign_security_principals(self):
 
-        start = time.time()
+        #
+        # We need to build a foreign security principal SID
+        # i.e a  SID not in the current domain.
+        #
+        dom_sid = self.samdb.get_domain_sid()
+        if str(dom_sid)[:-1] == "0":
+            c = "9"
+        else:
+            c = "0"
+        sid     = str(dom_sid)[:-1] + c + "-1000"
+        basedn  = self.samdb.get_default_basedn()
+        dn      = "CN=%s,CN=ForeignSecurityPrincipals,%s" % (sid, basedn)
+        self.samdb.add({
+            "dn": dn,
+            "objectClass": "foreignSecurityPrincipal"})
 
-        # We need to hold this iterator open to hold the all-record lock.
-        res = self.samdb.search_iterator()
+        self.samdb.delete(dn)
 
-        # This should take at least 2 seconds because the transaction
-        # has a write lock on one backend db open
+        try:
+            self.samdb.add({
+                "dn": dn,
+                "objectClass": "foreignSecurityPrincipal"})
+        except ldb.LdbError as e:
+            (code, msg) = e
+            self.fail("Got unexpected exception %d - %s "
+                      % (code, msg))
 
-        # Release the locks
-        for l in res:
-            pass
+    #
+    # Duplicate objectSID's should not be permitted for sids in the local
+    # domain. The test sequence is add an object, delete it, then attempt to
+    # re-add it, this should fail with a constraint violation
+    #
+    def test_duplicate_objectSIDs_not_allowed_on_local_objects(self):
 
-        end = time.time()
-        self.assertGreater(end - start, 1.9)
+        dom_sid = self.samdb.get_domain_sid()
+        rid     = self.allocate_rid()
+        sid_str = str(dom_sid) + "-" + rid
+        sid     = ndr_pack(security.dom_sid(sid_str))
+        basedn  = self.samdb.get_default_basedn()
+        cn       = "dsdb_test_01"
+        dn      = "cn=%s,cn=Users,%s" % (cn, basedn)
 
-        (got_pid, status) = os.waitpid(pid, 0)
-        self.assertEqual(got_pid, pid)
-        self.assertTrue(os.WIFEXITED(status))
-        self.assertEqual(os.WEXITSTATUS(status), 0)
+        self.samdb.add({
+            "dn": dn,
+            "objectClass": "user",
+            "objectSID": sid})
+        self.samdb.delete(dn)
+
+        try:
+            self.samdb.add({
+                "dn": dn,
+                "objectClass": "user",
+                "objectSID": sid})
+            self.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION")
+        except ldb.LdbError as e:
+            (code, msg) = e
+            if code != ldb.ERR_CONSTRAINT_VIOLATION:
+                self.fail("Got %d - %s should have got "
+                          "LDB_ERR_CONSTRAINT_VIOLATION"
+                          % (code, msg))