tests/getnc_exop: Ensure that all attids are valid in a given PAS
authorGarming Sam <garming@catalyst.net.nz>
Wed, 17 Aug 2016 02:26:55 +0000 (14:26 +1200)
committerGarming Sam <garming@samba.org>
Thu, 25 Aug 2016 08:32:08 +0000 (10:32 +0200)
On Windows this does not seem to fail, but causes silent errors.

Pair-programmed-with: Garming Sam <garming@catalyst.net.nz>

Signed-off-by: Garming Sam <garming@catalyst.net.nz>
Signed-off-by: Bob Campbell <bobcampbell@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
source4/torture/drs/python/getnc_exop.py

index 97894bc1361f42762e115320c74bce0bb558c135..858d02ed4c32a504f8db8eed55211a83db1ff27a 100644 (file)
@@ -79,7 +79,8 @@ class AbstractLink:
 
 class ExopBaseTest:
     def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop,
-                   replica_flags=0, max_objects=0, partial_attribute_set=None):
+                   replica_flags=0, max_objects=0, partial_attribute_set=None,
+                   mapping_ctr=None):
         req8 = drsuapi.DsGetNCChangesRequest8()
 
         req8.destination_dsa_guid = misc.GUID(dest_dsa) if dest_dsa else misc.GUID()
@@ -98,8 +99,11 @@ class ExopBaseTest:
         req8.fsmo_info = 0
         req8.partial_attribute_set = partial_attribute_set
         req8.partial_attribute_set_ex = None
-        req8.mapping_ctr.num_mappings = 0
-        req8.mapping_ctr.mappings = None
+        if mapping_ctr:
+            req8.mapping_ctr = mapping_ctr
+        else:
+            req8.mapping_ctr.num_mappings = 0
+            req8.mapping_ctr.mappings = None
 
         return req8
 
@@ -306,11 +310,8 @@ class DrsReplicaPrefixMapTestCase(drs_base.DrsBaseTestCase, ExopBaseTest):
             if enum == ldb.ERR_NO_SUCH_OBJECT:
                 pass
 
-    def get_partial_attribute_set(self):
+    def get_partial_attribute_set(self, attids=[drsuapi.DRSUAPI_ATTID_objectClass]):
         partial_attribute_set = drsuapi.DsPartialAttributeSet()
-        attids = [drsuapi.DRSUAPI_ATTID_objectClass,
-                  drsuapi.DRSUAPI_ATTID_description,
-                  drsuapi.DRSUAPI_ATTID_displayName]
         partial_attribute_set.attids = attids
         partial_attribute_set.num_attids = len(attids)
         return partial_attribute_set
@@ -334,6 +335,57 @@ class DrsReplicaPrefixMapTestCase(drs_base.DrsBaseTestCase, ExopBaseTest):
         except Exception:
             self.fail("Missing prefixmap shouldn't have triggered an error")
 
+    def test_invalid_prefix_map_attid(self):
+        # Request for invalid attid
+        partial_attribute_set = self.get_partial_attribute_set([99999])
+
+        pfm = self._samdb_fetch_pfm_and_schi()
+
+        dc_guid_1 = self.ldb_dc1.get_invocation_id()
+
+        drs, drs_handle = self._ds_bind(self.dnsname_dc1)
+
+        req8 = self._exop_req8(dest_dsa=None,
+                               invocation_id=dc_guid_1,
+                               nc_dn_str=self.user,
+                               exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
+                               partial_attribute_set=partial_attribute_set,
+                               mapping_ctr=pfm)
+
+        try:
+            (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
+            self.fail("Invalid attid (99999) should have triggered an error")
+        except Exception as (ecode, emsg):
+            self.assertEqual(ecode, 0x000020E2, "Error code should have been "
+                             "WERR_DS_DRA_SCHEMA_MISMATCH")
+
+    def _samdb_fetch_pfm_and_schi(self):
+        """Fetch prefixMap and schemaInfo stored in SamDB using LDB connection"""
+        samdb = self.ldb_dc1
+        res = samdb.search(base=samdb.get_schema_basedn(), scope=SCOPE_BASE,
+                           attrs=["prefixMap", "schemaInfo"])
+
+        pfm = ndr_unpack(drsblobs.prefixMapBlob,
+                         str(res[0]['prefixMap']))
+
+        schi = drsuapi.DsReplicaOIDMapping()
+        schi.id_prefix = 0
+
+        if 'schemaInfo' in res[0]:
+            schi.oid.length = len(map(ord, str(res[0]['schemaInfo'])))
+            schi.oid.binary_oid = map(ord, str(res[0]['schemaInfo']))
+        else:
+            schema_info = drsblobs.schemaInfoBlob()
+            schema_info.revision = 0
+            schema_info.marker = 0xFF
+            schema_info.invocation_id = misc.GUID(samdb.get_invocation_id())
+            schi.oid.length = len(map(ord, ndr_pack(schema_info)))
+            schi.oid.binary_oid = map(ord, ndr_pack(schema_info))
+
+        pfm.ctr.mappings = pfm.ctr.mappings + [schi]
+        pfm.ctr.num_mappings += 1
+        return pfm.ctr
+
 class DrsReplicaSyncSortTestCase(drs_base.DrsBaseTestCase, ExopBaseTest):
     def setUp(self):
         super(DrsReplicaSyncSortTestCase, self).setUp()