s4-test/repl_schema: Refactor the test a little bit to
authorKamen Mazdrashki <kamenim@samba.org>
Tue, 23 Nov 2010 15:15:21 +0000 (17:15 +0200)
committerKamen Mazdrashki <kamenim@samba.org>
Tue, 23 Nov 2010 17:44:46 +0000 (18:44 +0100)
- Cleanup imports
- make sure we have testtools and subunit imported
- use dictionaries instead of LDIFs for schema modification
  so now callers for _make_class can easily add new attributes
  for created class
- simplify a little bit classSchema creation
- test attributeSchema replication

Autobuild-User: Kamen Mazdrashki <kamenim@samba.org>
Autobuild-Date: Tue Nov 23 18:44:46 CET 2010 on sn-devel-104

source4/torture/drs/python/repl_schema.py

index 4016ed210543079bb0e99c1925f560e0a569d5ca..16f00acdc56e51948b8c8e2e3fc81eae6bc764d5 100644 (file)
@@ -33,11 +33,14 @@ import random
 import os
 
 sys.path.append("bin/python")
+import samba
+samba.ensure_external_module("testtools", "testtools")
+samba.ensure_external_module("subunit", "subunit/python")
 
-from samba.auth import system_session
 from ldb import LdbError, ERR_NO_SUCH_OBJECT
-from ldb import SCOPE_BASE, SCOPE_SUBTREE
-from samba.samdb import SamDB
+from ldb import SCOPE_BASE
+from ldb import Message
+from ldb import FLAG_MOD_REPLACE
 
 import samba.tests
 
@@ -96,34 +99,57 @@ class DrsReplSchemaTestCase(samba.tests.TestCase):
         return self.ldb_dc1.schema_format_value("objectGUID", guid)
 
     def _ldap_schemaUpdateNow(self, sam_db):
-        ldif = """
-dn:
-changetype: modify
-add: schemaUpdateNow
-schemaUpdateNow: 1
-"""
-        sam_db.modify_ldif(ldif)
+        rec = {"dn": "",
+               "schemaUpdateNow": "1"}
+        m = Message.from_dict(sam_db, rec, FLAG_MOD_REPLACE)
+        sam_db.modify(m)
 
     def _make_obj_names(self, base_name):
         '''Try to create a unique name for an object
            that is to be added to schema'''
         obj_name = self.obj_prefix + base_name
+        obj_ldn = obj_name.replace("-", "")
         obj_dn = "CN=%s,%s" % (obj_name, self.schema_dn)
-        return (obj_name, obj_dn)
-
-    def _make_class_ldif(self, class_name, class_dn, attrs=None):
-        ldif = """
-dn: """ + class_dn + """
-objectClass: top
-objectClass: classSchema
-cn: """ + class_name + """
-governsId: 1.2.840.""" + str(random.randint(1,100000)) + """.1.5.13
-instanceType: 4
-objectClassCategory: 1
-subClassOf: organizationalPerson
-systemOnly: FALSE
-"""
-        return ldif
+        return (obj_dn, obj_name, obj_ldn)
+
+    def _schema_new_class(self, ldb_ctx, base_name, attrs=None):
+        (class_dn, class_name, class_ldn) = self._make_obj_names(base_name)
+        rec = {"dn": class_dn,
+               "objectClass": ["top", "classSchema"],
+               "cn": class_name,
+               "lDAPDisplayName": class_ldn,
+               "governsId": "1.2.840." + str(random.randint(1,100000)) + ".1.5.13",
+               "instanceType": "4",
+               "objectClassCategory": "1",
+               "subClassOf": "organizationalPerson",
+               "systemOnly": "FALSE"}
+        # allow overriding/adding attributes
+        if not attrs is None:
+            rec.update(attrs)
+        # add it to the Schema
+        ldb_ctx.add(rec)
+        self._ldap_schemaUpdateNow(ldb_ctx)
+        return (rec["lDAPDisplayName"], rec["dn"])
+
+    def _schema_new_attr(self, ldb_ctx, base_name, attrs=None):
+        (attr_dn, attr_name, attr_ldn) = self._make_obj_names(base_name)
+        rec = {"dn": attr_dn,
+               "objectClass": ["top", "attributeSchema"],
+               "cn": attr_name,
+               "lDAPDisplayName": attr_ldn,
+               "attributeId": "1.2.841." + str(random.randint(1,100000)) + ".1.5.13",
+               "attributeSyntax": "2.5.5.12",
+               "omSyntax": "64",
+               "instanceType": "4",
+               "isSingleValued": "TRUE",
+               "systemOnly": "FALSE"}
+        # allow overriding/adding attributes
+        if not attrs is None:
+            rec.update(attrs)
+        # add it to the Schema
+        ldb_ctx.add(rec)
+        self._ldap_schemaUpdateNow(ldb_ctx)
+        return (rec["lDAPDisplayName"], rec["dn"])
 
     def _check_object(self, obj_dn):
         '''Check if object obj_dn exists on both DCs'''
@@ -146,12 +172,14 @@ systemOnly: FALSE
            and attributeSchema objects, replicate Schema NC
            and then check all objects are replicated correctly"""
 
+        # add new attributeSchema object
+        (a_ldn, a_dn) = self._schema_new_attr(self.ldb_dc1, "attr-A")
         # add new classSchema object
-        (class_name, class_dn) = self._make_obj_names("cls-A")
-        ldif = self._make_class_ldif(class_name, class_dn)
-        self.ldb_dc1.add_ldif(ldif)
-        self._ldap_schemaUpdateNow(self.ldb_dc1)
+        (c_ldn, c_dn) = self._schema_new_class(self.ldb_dc1, "cls-A")
+
         # force replication from DC1 to DC2
         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, nc_dn=self.schema_dn)
-        # check object is replicated
-        self._check_object(class_dn)
+        
+        # check objects are replicated
+        self._check_object(c_dn)
+        self._check_object(a_dn)