# PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN getnc_exop -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
#
+import random
+
import drs_base
+from drs_base import AbstractLink
+
import samba.tests
+import random
+from samba import werror, WERRORError
import ldb
from ldb import SCOPE_BASE
# Ascending target object GUID
return cmp(ndr_pack(la1_target), ndr_pack(la2_target))
-class AbstractLink:
- def __init__(self, attid, flags, identifier, targetGUID):
- self.attid = attid
- self.flags = flags
- self.identifier = identifier
- self.targetGUID = targetGUID
-
- def __eq__(self, other):
- return isinstance(other, AbstractLink) and \
- ((self.attid, self.flags, self.identifier, self.targetGUID) ==
- (other.attid, other.flags, other.identifier, other.targetGUID))
-
- def __hash__(self):
- return hash((self.attid, self.flags, self.identifier, self.targetGUID))
-
-class ExopBaseTest:
- def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop,
- replica_flags=0, max_objects=0, partial_attribute_set=None,
- partial_attribute_set_ex=None, mapping_ctr=None):
- req8 = drsuapi.DsGetNCChangesRequest8()
-
- req8.destination_dsa_guid = misc.GUID(dest_dsa) if dest_dsa else misc.GUID()
- req8.source_dsa_invocation_id = misc.GUID(invocation_id)
- req8.naming_context = drsuapi.DsReplicaObjectIdentifier()
- req8.naming_context.dn = unicode(nc_dn_str)
- req8.highwatermark = drsuapi.DsReplicaHighWaterMark()
- req8.highwatermark.tmp_highest_usn = 0
- req8.highwatermark.reserved_usn = 0
- req8.highwatermark.highest_usn = 0
- req8.uptodateness_vector = None
- req8.replica_flags = replica_flags
- req8.max_object_count = max_objects
- req8.max_ndr_size = 402116
- req8.extended_op = exop
- req8.fsmo_info = 0
- req8.partial_attribute_set = partial_attribute_set
- req8.partial_attribute_set_ex = partial_attribute_set_ex
- if mapping_ctr:
- req8.mapping_ctr = mapping_ctr
- else:
- req8.mapping_ctr.num_mappings = 0
- req8.mapping_ctr.mappings = None
-
- return req8
-
- def _ds_bind(self, server_name):
- binding_str = "ncacn_ip_tcp:%s[seal]" % server_name
-
- drs = drsuapi.drsuapi(binding_str, self.get_loadparm(), self.get_credentials())
- (drs_handle, supported_extensions) = drs_DsBind(drs)
- return (drs, drs_handle)
-
-class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase, ExopBaseTest):
+class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
"""Intended as a semi-black box test case for DsGetNCChanges
implementation for extended operations. It should be testing
how DsGetNCChanges handles different input params (mostly invalid).
def setUp(self):
super(DrsReplicaSyncTestCase, self).setUp()
+ self.base_dn = self.ldb_dc1.get_default_basedn()
+ self.ou = "OU=test_getncchanges,%s" % self.base_dn
+ self.ldb_dc1.add({
+ "dn": self.ou,
+ "objectclass": "organizationalUnit"})
+ (self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1)
+ (self.default_hwm, self.default_utdv) = self._get_highest_hwm_utdv(self.ldb_dc1)
def tearDown(self):
+ try:
+ self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
+ except ldb.LdbError as (enum, string):
+ if enum == ldb.ERR_NO_SUCH_OBJECT:
+ pass
super(DrsReplicaSyncTestCase, self).tearDown()
def _determine_fSMORoleOwner(self, fsmo_obj_dn):
self.assertEqual(ctr6.linked_attributes, [])
self.assertEqual(ctr6.drs_error[0], 0)
+ def test_do_single_repl(self):
+ """
+ Make sure that DRSU_EXOP_REPL_OBJ never replicates more than
+ one object, even when we use DRS_GET_ANC.
+ """
+
+ ou1 = "OU=get_anc1,%s" % self.ou
+ self.ldb_dc1.add({
+ "dn": ou1,
+ "objectclass": "organizationalUnit"
+ })
+ ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+ ou2 = "OU=get_anc2,%s" % ou1
+ self.ldb_dc1.add({
+ "dn": ou2,
+ "objectclass": "organizationalUnit"
+ })
+ ou2_id = self._get_identifier(self.ldb_dc1, ou2)
+ dc3 = "CN=test_anc_dc_%u,%s" % (random.randint(0, 4294967295), ou2)
+ self.ldb_dc1.add({
+ "dn": dc3,
+ "objectclass": "computer",
+ "userAccountControl": "%d" % (samba.dsdb.UF_ACCOUNTDISABLE | samba.dsdb.UF_SERVER_TRUST_ACCOUNT)
+ })
+ dc3_id = self._get_identifier(self.ldb_dc1, dc3)
+
+ req8 = self._exop_req8(dest_dsa=None,
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=ou1,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
+ replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+ (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 8, req8)
+ self._check_ctr6(ctr, [ou1])
+
+ # DRSUAPI_DRS_WRIT_REP means that we should only replicate the dn we give (dc3).
+ # DRSUAPI_DRS_GET_ANC means that we should also replicate its ancestors, but
+ # Windows doesn't do this if we use both.
+ req8 = self._exop_req8(dest_dsa=None,
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=dc3,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
+ replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP |
+ drsuapi.DRSUAPI_DRS_GET_ANC)
+ (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 8, req8)
+ self._check_ctr6(ctr, [dc3])
+
+ # Even though the ancestor of ou2 (ou1) has changed since last hwm, and we're
+ # sending DRSUAPI_DRS_GET_ANC, the expected response is that it will only try
+ # and replicate the single object still.
+ req8 = self._exop_req8(dest_dsa=None,
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=ou2,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
+ replica_flags=drsuapi.DRSUAPI_DRS_CRITICAL_ONLY |
+ drsuapi.DRSUAPI_DRS_GET_ANC)
+ (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 8, req8)
+ self._check_ctr6(ctr, [ou2])
+
+ def test_do_full_repl_on_ou(self):
+ """
+ Make sure that a full replication on a not-an-nc fails with
+ the right error code
+ """
+
+ non_nc_ou = "OU=not-an-NC,%s" % self.ou
+ self.ldb_dc1.add({
+ "dn": non_nc_ou,
+ "objectclass": "organizationalUnit"
+ })
+ req8 = self._exop_req8(dest_dsa=None,
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=non_nc_ou,
+ exop=drsuapi.DRSUAPI_EXOP_NONE,
+ replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+
+ try:
+ (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 8, req8)
+ self.fail("Expected DsGetNCChanges to fail with WERR_DS_CANT_FIND_EXPECTED_NC")
+ except WERRORError as (enum, estr):
+ self.assertEquals(enum, werror.WERR_DS_CANT_FIND_EXPECTED_NC)
+
+ def test_link_utdv_hwm(self):
+ """Test verify the DRS_GET_ANC behavior."""
+
+ ou1 = "OU=get_anc1,%s" % self.ou
+ self.ldb_dc1.add({
+ "dn": ou1,
+ "objectclass": "organizationalUnit"
+ })
+ ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+ ou2 = "OU=get_anc2,%s" % ou1
+ self.ldb_dc1.add({
+ "dn": ou2,
+ "objectclass": "organizationalUnit"
+ })
+ ou2_id = self._get_identifier(self.ldb_dc1, ou2)
+ dc3 = "CN=test_anc_dc_%u,%s" % (random.randint(0, 4294967295), ou2)
+ self.ldb_dc1.add({
+ "dn": dc3,
+ "objectclass": "computer",
+ "userAccountControl": "%d" % (samba.dsdb.UF_ACCOUNTDISABLE | samba.dsdb.UF_SERVER_TRUST_ACCOUNT)
+ })
+ dc3_id = self._get_identifier(self.ldb_dc1, dc3)
+
+ (hwm1, utdv1) = self._check_replication([ou1,ou2,dc3],
+ drsuapi.DRSUAPI_DRS_WRIT_REP)
+
+ self._check_replication([ou1,ou2,dc3],
+ drsuapi.DRSUAPI_DRS_WRIT_REP|
+ drsuapi.DRSUAPI_DRS_GET_ANC)
+
+ self._check_replication([dc3],
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY)
+
+ self._check_replication([ou1,ou2,dc3],
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY|
+ drsuapi.DRSUAPI_DRS_GET_ANC)
+
+ m = ldb.Message()
+ m.dn = ldb.Dn(self.ldb_dc1, ou1)
+ m["displayName"] = ldb.MessageElement("OU1", ldb.FLAG_MOD_ADD, "displayName")
+ self.ldb_dc1.modify(m)
+
+ (hwm2, utdv2) = self._check_replication([ou2,dc3,ou1],
+ drsuapi.DRSUAPI_DRS_WRIT_REP)
+
+ self._check_replication([ou1,ou2,dc3],
+ drsuapi.DRSUAPI_DRS_WRIT_REP|
+ drsuapi.DRSUAPI_DRS_GET_ANC)
+
+ self._check_replication([dc3],
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY)
+
+ self._check_replication([ou1,ou2,dc3],
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY|
+ drsuapi.DRSUAPI_DRS_GET_ANC)
+
+ self._check_replication([ou1],
+ drsuapi.DRSUAPI_DRS_WRIT_REP,
+ highwatermark=hwm1)
+
+ self._check_replication([ou1],
+ drsuapi.DRSUAPI_DRS_WRIT_REP|
+ drsuapi.DRSUAPI_DRS_GET_ANC,
+ highwatermark=hwm1)
+
+ self._check_replication([ou1],
+ drsuapi.DRSUAPI_DRS_WRIT_REP|
+ drsuapi.DRSUAPI_DRS_GET_ANC,
+ uptodateness_vector=utdv1)
+
+ m = ldb.Message()
+ m.dn = ldb.Dn(self.ldb_dc1, ou2)
+ m["displayName"] = ldb.MessageElement("OU2", ldb.FLAG_MOD_ADD, "displayName")
+ self.ldb_dc1.modify(m)
+
+ (hwm3, utdv3) = self._check_replication([dc3,ou1,ou2],
+ drsuapi.DRSUAPI_DRS_WRIT_REP)
+
+ self._check_replication([ou1,ou2,dc3],
+ drsuapi.DRSUAPI_DRS_WRIT_REP|
+ drsuapi.DRSUAPI_DRS_GET_ANC)
+
+ self._check_replication([dc3],
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY)
+
+ self._check_replication([ou1,ou2,dc3],
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY|
+ drsuapi.DRSUAPI_DRS_GET_ANC)
+
+ self._check_replication([ou1,ou2],
+ drsuapi.DRSUAPI_DRS_WRIT_REP,
+ highwatermark=hwm1)
+
+ self._check_replication([ou1,ou2],
+ drsuapi.DRSUAPI_DRS_WRIT_REP|
+ drsuapi.DRSUAPI_DRS_GET_ANC,
+ highwatermark=hwm1)
+
+ self._check_replication([ou1,ou2],
+ drsuapi.DRSUAPI_DRS_WRIT_REP|
+ drsuapi.DRSUAPI_DRS_GET_ANC,
+ uptodateness_vector=utdv1)
+
+ m = ldb.Message()
+ m.dn = ldb.Dn(self.ldb_dc1, self.ou)
+ m["displayName"] = ldb.MessageElement("OU", ldb.FLAG_MOD_ADD, "displayName")
+ self.ldb_dc1.modify(m)
+
+ (hwm4, utdv4) = self._check_replication([dc3,ou1,ou2,self.ou],
+ drsuapi.DRSUAPI_DRS_WRIT_REP)
+
+ self._check_replication([self.ou,ou1,ou2,dc3],
+ drsuapi.DRSUAPI_DRS_WRIT_REP|
+ drsuapi.DRSUAPI_DRS_GET_ANC)
+
+ self._check_replication([dc3],
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY)
+
+ self._check_replication([self.ou,ou1,ou2,dc3],
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY|
+ drsuapi.DRSUAPI_DRS_GET_ANC)
+
+ self._check_replication([self.ou,ou2],
+ drsuapi.DRSUAPI_DRS_WRIT_REP|
+ drsuapi.DRSUAPI_DRS_GET_ANC,
+ uptodateness_vector=utdv2)
+
+ cn3 = "CN=get_anc3,%s" % ou2
+ self.ldb_dc1.add({
+ "dn": cn3,
+ "objectclass": "container",
+ })
+ cn3_id = self._get_identifier(self.ldb_dc1, cn3)
+
+ (hwm5, utdv5) = self._check_replication([dc3,ou1,ou2,self.ou,cn3],
+ drsuapi.DRSUAPI_DRS_WRIT_REP)
+
+ self._check_replication([self.ou,ou1,ou2,dc3,cn3],
+ drsuapi.DRSUAPI_DRS_WRIT_REP|
+ drsuapi.DRSUAPI_DRS_GET_ANC)
+
+ self._check_replication([dc3],
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY)
+
+ self._check_replication([self.ou,ou1,ou2,dc3],
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY|
+ drsuapi.DRSUAPI_DRS_GET_ANC)
+
+ m = ldb.Message()
+ m.dn = ldb.Dn(self.ldb_dc1, ou2)
+ m["managedBy"] = ldb.MessageElement(dc3, ldb.FLAG_MOD_ADD, "managedBy")
+ self.ldb_dc1.modify(m)
+ ou2_managedBy_dc3 = AbstractLink(drsuapi.DRSUAPI_ATTID_managedBy,
+ drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
+ ou2_id.guid, dc3_id.guid)
+
+ (hwm6, utdv6) = self._check_replication([dc3,ou1,self.ou,cn3,ou2],
+ drsuapi.DRSUAPI_DRS_WRIT_REP,
+ expected_links=[ou2_managedBy_dc3])
+
+ # Can fail against Windows due to equal precedence of dc3, cn3
+ self._check_replication([self.ou,ou1,ou2,dc3,cn3],
+ drsuapi.DRSUAPI_DRS_WRIT_REP|
+ drsuapi.DRSUAPI_DRS_GET_ANC,
+ expected_links=[ou2_managedBy_dc3])
+
+ self._check_replication([dc3],
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY)
+
+ self._check_replication([self.ou,ou1,ou2,dc3],
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY|
+ drsuapi.DRSUAPI_DRS_GET_ANC)
+
+ self._check_replication([],
+ drsuapi.DRSUAPI_DRS_WRIT_REP,
+ uptodateness_vector=utdv5,
+ expected_links=[ou2_managedBy_dc3])
+
+ self._check_replication([],
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY,
+ uptodateness_vector=utdv5)
+
+ self._check_replication([],
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY,
+ uptodateness_vector=utdv5)
+
+ m = ldb.Message()
+ m.dn = ldb.Dn(self.ldb_dc1, dc3)
+ m["managedBy"] = ldb.MessageElement(ou1, ldb.FLAG_MOD_ADD, "managedBy")
+ self.ldb_dc1.modify(m)
+ dc3_managedBy_ou1 = AbstractLink(drsuapi.DRSUAPI_ATTID_managedBy,
+ drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
+ dc3_id.guid, ou1_id.guid)
+
+ (hwm7, utdv7) = self._check_replication([ou1,self.ou,cn3,ou2,dc3],
+ drsuapi.DRSUAPI_DRS_WRIT_REP,
+ expected_links=[ou2_managedBy_dc3,dc3_managedBy_ou1])
+
+ # Can fail against Windows due to equal precedence of dc3, cn3
+ #self._check_replication([self.ou,ou1,ou2,dc3,cn3],
+ # drsuapi.DRSUAPI_DRS_WRIT_REP|
+ # drsuapi.DRSUAPI_DRS_GET_ANC,
+ # expected_links=[ou2_managedBy_dc3,dc3_managedBy_ou1])
+
+ self._check_replication([dc3],
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY,
+ expected_links=[dc3_managedBy_ou1])
+
+ self._check_replication([dc3],
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY,
+ expected_links=[dc3_managedBy_ou1])
+
+ self._check_replication([self.ou,ou1,ou2,dc3],
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY|
+ drsuapi.DRSUAPI_DRS_GET_ANC,
+ expected_links=[dc3_managedBy_ou1])
+
+ # GET_TGT seems to override DRS_CRITICAL_ONLY and also returns any
+ # object(s) that relate to the linked attributes (similar to GET_ANC)
+ self._check_replication([ou1, dc3],
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY,
+ more_flags=drsuapi.DRSUAPI_DRS_GET_TGT,
+ expected_links=[dc3_managedBy_ou1], dn_ordered=False)
+
+ # Change DC3's managedBy to OU2 instead of OU1
+ # Note that the OU1 managedBy linked attribute will still exist as
+ # a tombstone object (and so will be returned in the replication still)
+ m = ldb.Message()
+ m.dn = ldb.Dn(self.ldb_dc1, dc3)
+ m["managedBy"] = ldb.MessageElement(ou2, ldb.FLAG_MOD_REPLACE, "managedBy")
+ self.ldb_dc1.modify(m)
+ dc3_managedBy_ou1.flags &= ~drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE
+ dc3_managedBy_ou2 = AbstractLink(drsuapi.DRSUAPI_ATTID_managedBy,
+ drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
+ dc3_id.guid, ou2_id.guid)
+
+ (hwm8, utdv8) = self._check_replication([ou1,self.ou,cn3,ou2,dc3],
+ drsuapi.DRSUAPI_DRS_WRIT_REP,
+ expected_links=[ou2_managedBy_dc3,dc3_managedBy_ou1,dc3_managedBy_ou2])
+
+ # Can fail against Windows due to equal precedence of dc3, cn3
+ #self._check_replication([self.ou,ou1,ou2,dc3,cn3],
+ # drsuapi.DRSUAPI_DRS_WRIT_REP|
+ # drsuapi.DRSUAPI_DRS_GET_ANC,
+ # expected_links=[ou2_managedBy_dc3,dc3_managedBy_ou1,dc3_managedBy_ou2])
+
+ self._check_replication([dc3],
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY,
+ expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2])
+
+ self._check_replication([self.ou,ou1,ou2,dc3],
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY|
+ drsuapi.DRSUAPI_DRS_GET_ANC,
+ expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2])
+
+ # GET_TGT will also return any DNs referenced by the linked attributes
+ # (including the Tombstone attribute)
+ self._check_replication([ou1, ou2, dc3],
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY,
+ more_flags=drsuapi.DRSUAPI_DRS_GET_TGT,
+ expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2], dn_ordered=False)
+
+ # Use the highwater-mark prior to changing ManagedBy - this should
+ # only return the old/Tombstone and new linked attributes (we already
+ # know all the DNs)
+ self._check_replication([],
+ drsuapi.DRSUAPI_DRS_WRIT_REP,
+ expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2],
+ highwatermark=hwm7)
+
+ self._check_replication([],
+ drsuapi.DRSUAPI_DRS_WRIT_REP|
+ drsuapi.DRSUAPI_DRS_GET_ANC,
+ expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2],
+ highwatermark=hwm7)
+
+ self._check_replication([],
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY,
+ expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2],
+ highwatermark=hwm7)
+
+ self._check_replication([],
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY|
+ drsuapi.DRSUAPI_DRS_GET_ANC,
+ expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2],
+ highwatermark=hwm7)
+
+ self._check_replication([],
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY,
+ more_flags=drsuapi.DRSUAPI_DRS_GET_TGT,
+ expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2],
+ highwatermark=hwm7)
+
+ # Repeat the above set of tests using the uptodateness_vector
+ # instead of the highwater-mark
+ self._check_replication([],
+ drsuapi.DRSUAPI_DRS_WRIT_REP,
+ expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2],
+ uptodateness_vector=utdv7)
+
+ self._check_replication([],
+ drsuapi.DRSUAPI_DRS_WRIT_REP|
+ drsuapi.DRSUAPI_DRS_GET_ANC,
+ expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2],
+ uptodateness_vector=utdv7)
+
+ self._check_replication([],
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY,
+ expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2],
+ uptodateness_vector=utdv7)
+
+ self._check_replication([],
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY|
+ drsuapi.DRSUAPI_DRS_GET_ANC,
+ expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2],
+ uptodateness_vector=utdv7)
+
+ self._check_replication([],
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY,
+ more_flags=drsuapi.DRSUAPI_DRS_GET_TGT,
+ expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2],
+ uptodateness_vector=utdv7)
+
def test_FSMONotOwner(self):
"""Test role transfer with against DC not owner of the role"""
fsmo_dn = self.ldb_dc1.get_schema_basedn()
self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
-class DrsReplicaPrefixMapTestCase(drs_base.DrsBaseTestCase, ExopBaseTest):
+class DrsReplicaPrefixMapTestCase(drs_base.DrsBaseTestCase):
def setUp(self):
super(DrsReplicaPrefixMapTestCase, self).setUp()
self.base_dn = self.ldb_dc1.get_default_basedn()
if enum == ldb.ERR_NO_SUCH_OBJECT:
pass
- def get_partial_attribute_set(self, attids=[drsuapi.DRSUAPI_ATTID_objectClass]):
- partial_attribute_set = drsuapi.DsPartialAttributeSet()
- partial_attribute_set.attids = attids
- partial_attribute_set.num_attids = len(attids)
- return partial_attribute_set
-
def test_missing_prefix_map_dsa(self):
partial_attribute_set = self.get_partial_attribute_set()
pfm.ctr.num_mappings += 1
return pfm.ctr
-class DrsReplicaSyncSortTestCase(drs_base.DrsBaseTestCase, ExopBaseTest):
+class DrsReplicaSyncSortTestCase(drs_base.DrsBaseTestCase):
def setUp(self):
super(DrsReplicaSyncSortTestCase, self).setUp()
self.base_dn = self.ldb_dc1.get_default_basedn()
self.ldb_dc1.add({"dn": user3_dn, "objectclass": "user"})
self.ldb_dc1.add({"dn": group_dn, "objectclass": "group"})
- u1_guid = str(misc.GUID(self.ldb_dc1.search(base=user1_dn,
- attrs=["objectGUID"])[0]['objectGUID'][0]))
- u2_guid = str(misc.GUID(self.ldb_dc1.search(base=user2_dn,
- attrs=["objectGUID"])[0]['objectGUID'][0]))
- u3_guid = str(misc.GUID(self.ldb_dc1.search(base=user3_dn,
- attrs=["objectGUID"])[0]['objectGUID'][0]))
- g_guid = str(misc.GUID(self.ldb_dc1.search(base=group_dn,
- attrs=["objectGUID"])[0]['objectGUID'][0]))
+ u1_guid = misc.GUID(self.ldb_dc1.search(base=user1_dn,
+ attrs=["objectGUID"])[0]['objectGUID'][0])
+ u2_guid = misc.GUID(self.ldb_dc1.search(base=user2_dn,
+ attrs=["objectGUID"])[0]['objectGUID'][0])
+ u3_guid = misc.GUID(self.ldb_dc1.search(base=user3_dn,
+ attrs=["objectGUID"])[0]['objectGUID'][0])
+ g_guid = misc.GUID(self.ldb_dc1.search(base=group_dn,
+ attrs=["objectGUID"])[0]['objectGUID'][0])
self.add_linked_attribute(group_dn, user1_dn,
attr='member')
link.value.blob).guid
no_inactive.append((link, target_guid))
self.assertTrue(AbstractLink(link.attid, link.flags,
- str(link.identifier.guid),
- str(target_guid)) in expected_links)
+ link.identifier.guid,
+ target_guid) in expected_links)
no_inactive.sort(cmp=_linked_attribute_compare)
link.value.blob).guid
has_inactive.append((link, target_guid))
self.assertTrue(AbstractLink(link.attid, link.flags,
- str(link.identifier.guid),
- str(target_guid)) in expected_links)
+ link.identifier.guid,
+ target_guid) in expected_links)
has_inactive.sort(cmp=_linked_attribute_compare)