repl_meta_data: Correctly use msDS-IntId for custom schema, not the prefixMap value
[abartlet/samba.git/.git] / source4 / dsdb / tests / python / ldap_schema.py
index fb0ef9ca564d6c90ba24fc935753d408de630fb2..2d20b48768494b07936635cb3498d4586a030a43 100755 (executable)
@@ -2,6 +2,23 @@
 # -*- coding: utf-8 -*-
 # This is a port of the original in testprogs/ejs/ldap.js
 
+# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2008-2011
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+
+
 import optparse
 import sys
 import time
@@ -10,8 +27,7 @@ import os
 
 sys.path.insert(0, "bin/python")
 import samba
-samba.ensure_external_module("testtools", "testtools")
-samba.ensure_external_module("subunit", "subunit/python")
+from samba.tests.subunitrun import TestProgram, SubunitOptions
 
 import samba.getopt as options
 
@@ -25,9 +41,8 @@ from ldb import FLAG_MOD_REPLACE
 from samba.samdb import SamDB
 from samba.dsdb import DS_DOMAIN_FUNCTION_2003
 from samba.tests import delete_force
-
-from subunit.run import SubunitTestRunner
-import unittest
+from samba.ndr import ndr_unpack
+from samba.dcerpc import drsblobs
 
 parser = optparse.OptionParser("ldap_schema.py [options] <host>")
 sambaopts = options.SambaOptions(parser)
@@ -36,6 +51,8 @@ parser.add_option_group(options.VersionOptions(parser))
 # use command line creds if available
 credopts = options.CredentialsOptions(parser)
 parser.add_option_group(credopts)
+subunitopts = SubunitOptions(parser)
+parser.add_option_group(subunitopts)
 opts, args = parser.parse_args()
 
 if len(args) < 1:
@@ -52,9 +69,10 @@ class SchemaTests(samba.tests.TestCase):
 
     def setUp(self):
         super(SchemaTests, self).setUp()
-        self.ldb = ldb
-        self.base_dn = ldb.domain_dn()
-        self.schema_dn = ldb.get_schema_basedn().get_linearized()
+        self.ldb = SamDB(host, credentials=creds,
+            session_info=system_session(lp), lp=lp, options=ldb_options)
+        self.base_dn = self.ldb.domain_dn()
+        self.schema_dn = self.ldb.get_schema_basedn().get_linearized()
 
     def test_generated_schema(self):
         """Testing we can read the generated schema via LDAP"""
@@ -108,10 +126,16 @@ schemaUpdateNow: 1
         # Search for created attribute
         res = []
         res = self.ldb.search("cn=%s,%s" % (attr_name, self.schema_dn), scope=SCOPE_BASE,
-                              attrs=["lDAPDisplayName","schemaIDGUID"])
+                              attrs=["lDAPDisplayName","schemaIDGUID", "msDS-IntID"])
         self.assertEquals(len(res), 1)
         self.assertEquals(res[0]["lDAPDisplayName"][0], attr_ldap_display_name)
         self.assertTrue("schemaIDGUID" in res[0])
+        if "msDS-IntId" in res[0]:
+            msDS_IntId = int(res[0]["msDS-IntId"][0])
+            if msDS_IntId < 0:
+                msDS_IntId += (1 << 32)
+        else:
+            msDS_IntId = None
 
         class_name = "test-Class" + time.strftime("%s", time.gmtime())
         class_ldap_display_name = class_name.replace("-", "")
@@ -195,9 +219,24 @@ name: """ + object_name + """
         self.ldb.add_ldif(ldif)
 
         # Search for created object
-        res = []
-        res = self.ldb.search("cn=%s,cn=Users,%s" % (object_name, self.base_dn), scope=SCOPE_BASE, attrs=["dn"])
-        self.assertEquals(len(res), 1)
+        obj_res = self.ldb.search("cn=%s,cn=Users,%s" % (object_name, self.base_dn), scope=SCOPE_BASE, attrs=["replPropertyMetaData"])
+
+        self.assertEquals(len(obj_res), 1)
+        self.assertTrue("replPropertyMetaData" in obj_res[0])
+        val = obj_res[0]["replPropertyMetaData"][0]
+        repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, str(val))
+        obj = repl.ctr
+
+        # Windows 2000 functional level won't have this.  It is too
+        # hard to work it out from the prefixmap however, so we skip
+        # this test in that case.
+        if msDS_IntId is not None:
+            found = False
+            for o in repl.ctr.array:
+                if o.attid == msDS_IntId:
+                    found = True
+                    break
+            self.assertTrue(found, "Did not find 0x%08x in replPropertyMetaData" % msDS_IntId)
         # Delete the object
         delete_force(self.ldb, "cn=%s,cn=Users,%s" % (object_name, self.base_dn))
 
@@ -264,8 +303,9 @@ class SchemaTests_msDS_IntId(samba.tests.TestCase):
 
     def setUp(self):
         super(SchemaTests_msDS_IntId, self).setUp()
-        self.ldb = ldb
-        res = ldb.search(base="", expression="", scope=SCOPE_BASE,
+        self.ldb = SamDB(host, credentials=creds,
+            session_info=system_session(lp), lp=lp, options=ldb_options)
+        res = self.ldb.search(base="", expression="", scope=SCOPE_BASE,
                          attrs=["schemaNamingContext", "defaultNamingContext",
                                 "forestFunctionality"])
         self.assertEquals(len(res), 1)
@@ -540,8 +580,9 @@ class SchemaTests_msDS_isRODC(samba.tests.TestCase):
 
     def setUp(self):
         super(SchemaTests_msDS_isRODC, self).setUp()
-        self.ldb = ldb
-        res = ldb.search(base="", expression="", scope=SCOPE_BASE, attrs=["defaultNamingContext"])
+        self.ldb =  SamDB(host, credentials=creds,
+            session_info=system_session(lp), lp=lp, options=ldb_options)
+        res = self.ldb.search(base="", expression="", scope=SCOPE_BASE, attrs=["defaultNamingContext"])
         self.assertEquals(len(res), 1)
         self.base_dn = res[0]["defaultNamingContext"][0]
 
@@ -585,15 +626,4 @@ if host.startswith("ldap://"):
     # user 'paged_search' module when connecting remotely
     ldb_options = ["modules:paged_searches"]
 
-ldb = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp, options=ldb_options)
-
-runner = SubunitTestRunner()
-rc = 0
-if not runner.run(unittest.makeSuite(SchemaTests)).wasSuccessful():
-    rc = 1
-if not runner.run(unittest.makeSuite(SchemaTests_msDS_IntId)).wasSuccessful():
-    rc = 1
-if not runner.run(unittest.makeSuite(SchemaTests_msDS_isRODC)).wasSuccessful():
-    rc = 1
-
-sys.exit(rc)
+TestProgram(module=__name__, opts=subunitopts)