#
import time
+import uuid
+from samba.ndr import ndr_unpack
+from samba.dcerpc import drsblobs
+from samba.dcerpc import misc
+from samba.drs_utils import drs_DsBind
from ldb import (
SCOPE_BASE,
)
import drs_base, ldb
-
+from samba.dcerpc.drsuapi import *
class DrsMoveObjectTestCase(drs_base.DrsBaseTestCase):
+ def _ds_bind(self, server_name):
+ binding_str = "ncacn_ip_tcp:%s[print,seal]" % server_name
+
+ drs = drsuapi(binding_str, self.get_loadparm(), self.get_credentials())
+ (drs_handle, supported_extensions) = drs_DsBind(drs)
+ return (drs, drs_handle)
+
def setUp(self):
super(DrsMoveObjectTestCase, self).setUp()
# make sure DCs are synchronized before the test
# trigger replication from DC1 to DC2
self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+ self.dc1_guid = self.ldb_dc1.get_invocation_id()
+ self.dc2_guid = self.ldb_dc2.get_invocation_id()
+
+ self.drs_dc1 = self._ds_bind(self.dnsname_dc1)
+ self.drs_dc2 = self._ds_bind(self.dnsname_dc2)
def tearDown(self):
try:
def _make_username(self):
return "DrsMoveU_" + time.strftime("%s", time.gmtime())
+ def _check_metadata(self, user_dn, sam_ldb, drs, metadata, expected):
+ repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, str(metadata[0]))
+
+ # We should assert the entire array, but windows and Samba
+ # disagree both on sorting (on Windows the RDN is not sorted
+ # last in replPropertyMetaData, only over DRS), and internal
+ # implementation behaviours show up in terms of the version.
+ #
+ # In particular, Windows seems to add password attributes that
+ # Samba doesn't add until they are actually needed. So we
+ # optionally permit that all the password attributes are omited
+
+ password_attids = set([DRSUAPI_ATTID_dBCSPwd,
+ DRSUAPI_ATTID_unicodePwd,
+ DRSUAPI_ATTID_lmPwdHistory,
+ DRSUAPI_ATTID_ntPwdHistory,
+ DRSUAPI_ATTID_logonHours])
+
+ if len(repl.ctr.array) != len(expected):
+ new_expected = []
+ for e in expected:
+ (attid, orig_dsa, version) = e
+ if attid not in password_attids:
+ new_expected.append(e)
+ expected = new_expected
+
+ self.assertEqual(len(repl.ctr.array), len(expected))
+
+ i = 0
+ for o in repl.ctr.array:
+ e = expected[i]
+ (attid, orig_dsa, version) = e
+ self.assertEquals(attid, o.attid,
+ "(LDAP) Wrong attid "
+ "for expected value %d, wanted 0x%08x got 0x%08x"
+ % (i, attid, o.attid))
+ self.assertEquals(o.originating_invocation_id,
+ misc.GUID(orig_dsa),
+ "(LDAP) Wrong originating_invocation_id "
+ "for expected value %d, attid 0x%08x, wanted %s got %s"
+ % (i, o.attid,
+ misc.GUID(orig_dsa),
+ o.originating_invocation_id))
+ # Allow version to be skipped when it does not matter
+ if version is not None:
+ self.assertEquals(o.version, version,
+ "(LDAP) Wrong version for expected value %d, "
+ "attid 0x%08x, "
+ "wanted %d got %d"
+ % (i, o.attid,
+ version, o.version))
+ i = i + 1
+
+ if drs == None:
+ return
+
+ req8 = DsGetNCChangesRequest8()
+
+ req8.source_dsa_invocation_id = misc.GUID(sam_ldb.get_invocation_id())
+ req8.naming_context = DsReplicaObjectIdentifier()
+ req8.naming_context.dn = str(user_dn)
+ req8.highwatermark = DsReplicaHighWaterMark()
+ req8.highwatermark.tmp_highest_usn = 0
+ req8.highwatermark.reserved_usn = 0
+ req8.highwatermark.highest_usn = 0
+ req8.uptodateness_vector = None
+ req8.replica_flags = 0
+ req8.max_object_count = 1
+ req8.max_ndr_size = 402116
+ req8.extended_op = DRSUAPI_EXOP_REPL_OBJ
+ req8.fsmo_info = 0
+ req8.partial_attribute_set = None
+ req8.partial_attribute_set_ex = None
+ req8.mapping_ctr.num_mappings = 0
+ req8.mapping_ctr.mappings = None
+
+ (drs_conn, drs_handle) = drs
+
+ (level, drs_ctr) = drs_conn.DsGetNCChanges(drs_handle, 8, req8)
+ self.assertEqual(level, 6)
+ self.assertEqual(drs_ctr.object_count, 1)
+
+ self.assertEqual(len(drs_ctr.first_object.meta_data_ctr.meta_data), len(expected) - 1)
+ att_idx = 0
+ for o in drs_ctr.first_object.meta_data_ctr.meta_data:
+ i = 0
+ drs_attid = drs_ctr.first_object.object.attribute_ctr.attributes[att_idx]
+ e = expected[i];
+ (attid, orig_dsa, version) = e
+
+ # Skip the RDN from the expected set, it is not sent over DRS
+ if (user_dn.get_rdn_name().upper() == "CN" \
+ and attid == DRSUAPI_ATTID_cn) \
+ or (user_dn.get_rdn_name().upper() == "OU" \
+ and attid == DRSUAPI_ATTID_ou):
+ i = i + 1
+ e = expected[i];
+ (attid, orig_dsa, version) = e
+
+ self.assertEquals(attid, drs_attid.attid,
+ "(DRS) Wrong attid "
+ "for expected value %d, wanted 0x%08x got 0x%08x"
+ % (i, attid, drs_attid.attid))
+
+ self.assertEquals(o.originating_invocation_id,
+ misc.GUID(orig_dsa),
+ "(DRS) Wrong originating_invocation_id "
+ "for expected value %d, attid 0x%08x, wanted %s got %s"
+ % (i, attid,
+ misc.GUID(orig_dsa),
+ o.originating_invocation_id))
+ # Allow version to be skipped when it does not matter
+ if version is not None:
+ self.assertEquals(o.version, version,
+ "(DRS) Wrong version for expected value %d, "
+ "attid 0x%08x, "
+ "wanted %d got %d"
+ % (i, attid, version, o.version))
+ break
+ i = i + 1
+ att_idx = att_idx + 1
+
# now also used to check the group
- def _check_obj(self, sam_ldb, obj_orig, is_deleted):
+ def _check_obj(self, sam_ldb, obj_orig, is_deleted, expected_metadata=None, drs=None):
# search the user by guid as it may be deleted
guid_str = self._GUID_string(obj_orig["objectGUID"][0])
res = sam_ldb.search(base='<GUID=%s>' % guid_str,
controls=["show_deleted:1"],
- attrs=["*", "parentGUID"])
+ attrs=["*", "parentGUID",
+ "replPropertyMetaData"])
self.assertEquals(len(res), 1)
user_cur = res[0]
rdn_orig = obj_orig[user_cur.dn.get_rdn_name()][0]
self.assertEquals(name_cur, name_orig)
self.assertEquals(dn_cur, dn_orig)
self.assertEqual(name_cur, rdn_cur)
+ parent_cur = user_cur["parentGUID"][0]
+ try:
+ parent_orig = obj_orig["parentGUID"][0]
+ self.assertEqual(parent_orig, parent_cur)
+ except KeyError:
+ pass
self.assertEqual(name_cur, user_cur.dn.get_rdn_value())
+ if expected_metadata is not None:
+ self._check_metadata(dn_cur, sam_ldb, drs, user_cur["replPropertyMetaData"],
+ expected_metadata)
+
return user_cur
password=None, setpassword=False)
ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
scope=SCOPE_SUBTREE,
- expression="(samAccountName=%s)" % username)
+ expression="(samAccountName=%s)" % username,
+ attrs=["*", "parentGUID"])
self.assertEquals(len(ldb_res), 1)
user_orig = ldb_res[0]
user_dn = ldb_res[0]["dn"]
# check user info on DC1
print "Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0]))
- self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False)
+ initial_metadata = [
+ (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_cn, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_name, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
+ (DRSUAPI_ATTID_codePage, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)]
+
+ self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
+ obj_orig=user_orig, is_deleted=False,
+ expected_metadata=initial_metadata)
new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username)
new_dn.add_base(self.ou2_dn)
self.ldb_dc1.rename(user_dn, new_dn)
ldb_res = self.ldb_dc1.search(base=self.ou2_dn,
scope=SCOPE_SUBTREE,
- expression="(samAccountName=%s)" % username)
+ expression="(samAccountName=%s)" % username,
+ attrs=["*", "parentGUID"])
self.assertEquals(len(ldb_res), 1)
user_moved_orig = ldb_res[0]
user_moved_dn = ldb_res[0]["dn"]
+ moved_metadata = [
+ (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_cn, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_name, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
+ (DRSUAPI_ATTID_codePage, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)]
+
+ # check user info on DC1 after rename - should be valid user
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
+ obj_orig=user_moved_orig,
+ is_deleted=False,
+ expected_metadata=moved_metadata)
+
# trigger replication from DC1 to DC2
self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+ moved_metadata_dc2 = [
+ (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_cn, self.dc2_guid, 1),
+ (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_name, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
+ (DRSUAPI_ATTID_codePage, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)]
+
# check user info on DC2 - should be valid user
- user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=False)
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2,
+ obj_orig=user_moved_orig,
+ is_deleted=False,
+ expected_metadata=moved_metadata_dc2)
# delete user on DC1
self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0]))
+ deleted_metadata = [
+ (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_cn, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_name, self.dc1_guid, 3),
+ (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
+ (DRSUAPI_ATTID_codePage, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)]
+
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=True, expected_metadata=deleted_metadata)
+
# Modify description on DC2. This triggers a replication, but
# not of 'name' and so a bug in Samba regarding the DN.
msg = ldb.Message()
msg["description"] = ldb.MessageElement("User Description", ldb.FLAG_MOD_REPLACE, "description")
self.ldb_dc2.modify(msg)
+ modified_metadata = [
+ (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_cn, self.dc2_guid, 1),
+ (DRSUAPI_ATTID_description, self.dc2_guid, 1),
+ (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_name, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
+ (DRSUAPI_ATTID_codePage, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)]
+
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2,
+ obj_orig=user_moved_orig,
+ is_deleted=False,
+ expected_metadata=modified_metadata)
+
+
# trigger replication from DC1 to DC2, for cleanup
self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+ deleted_modified_metadata_dc2 = [
+ (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_cn, self.dc2_guid, 2),
+ (DRSUAPI_ATTID_description, self.dc2_guid, 2),
+ (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_name, self.dc1_guid, 3),
+ (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
+ (DRSUAPI_ATTID_codePage, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)]
+
# check user info on DC2 - should be delted user
- user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=True)
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2,
+ obj_orig=user_moved_orig,
+ is_deleted=True,
+ expected_metadata=deleted_modified_metadata_dc2)
self.assertFalse("description" in user_cur)
# trigger replication from DC2 to DC1, for cleanup
self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
+ deleted_modified_metadata_dc1 = [
+ (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_cn, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_description, self.dc2_guid, 2),
+ (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_name, self.dc1_guid, 3),
+ (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
+ (DRSUAPI_ATTID_codePage, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)]
+
# check user info on DC1 - should be delted user
- user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=True)
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
+ obj_orig=user_moved_orig,
+ is_deleted=True,
+ expected_metadata=deleted_modified_metadata_dc1)
self.assertFalse("description" in user_cur)
password=None, setpassword=False)
ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
scope=SCOPE_SUBTREE,
- expression="(samAccountName=%s)" % username)
+ expression="(samAccountName=%s)" % username,
+ attrs=["*", "parentGUID"])
self.assertEquals(len(ldb_res), 1)
user_orig = ldb_res[0]
user_dn = ldb_res[0]["dn"]
# check user info on DC1
print "Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0]))
- self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False)
+ initial_metadata = [
+ (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_cn, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_name, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
+ (DRSUAPI_ATTID_codePage, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)]
+
+ self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
+ obj_orig=user_orig, is_deleted=False,
+ expected_metadata=initial_metadata)
new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username)
new_dn.add_base(self.ou2_dn)
self.ldb_dc1.rename(user_dn, new_dn)
ldb_res = self.ldb_dc1.search(base=self.ou2_dn,
scope=SCOPE_SUBTREE,
- expression="(samAccountName=%s)" % username)
+ expression="(samAccountName=%s)" % username,
+ attrs=["*", "parentGUID"])
self.assertEquals(len(ldb_res), 1)
+ user_moved_orig = ldb_res[0]
+
+ moved_metadata = [
+ (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_cn, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_name, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
+ (DRSUAPI_ATTID_codePage, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)]
+
+ # check user info on DC1 after rename - should be valid user
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
+ obj_orig=user_moved_orig,
+ is_deleted=False,
+ expected_metadata=moved_metadata)
# check user info on DC2 - should not be there, we have not done replication
ldb_res = self.ldb_dc2.search(base=self.ou2_dn,
scope=SCOPE_SUBTREE,
- expression="(samAccountName=%s)" % username)
+ expression="(samAccountName=%s)" % username,
+ attrs=["*", "parentGUID"])
self.assertEquals(len(ldb_res), 0)
# delete user on DC1
self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0]))
+ deleted_metadata_dc1 = [
+ (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_cn, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_name, self.dc1_guid, 3),
+ (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
+ (DRSUAPI_ATTID_codePage, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)]
+
+ # check user info on DC1 - should be delted user
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
+ obj_orig=user_moved_orig,
+ is_deleted=True,
+ expected_metadata=deleted_metadata_dc1)
# trigger replication from DC1 to DC2, for cleanup
self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+ deleted_metadata_dc2 = [
+ (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_cn, self.dc2_guid, 1),
+ (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_name, self.dc1_guid, 3),
+ (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
+ (DRSUAPI_ATTID_codePage, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)]
+
# check user info on DC2 - should be delted user
- user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=True)
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2,
+ obj_orig=user_moved_orig,
+ is_deleted=True,
+ expected_metadata=deleted_metadata_dc2)
# trigger replication from DC2 to DC1, for cleanup
self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
# check user info on DC1 - should be delted user
- user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=True)
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
+ obj_orig=user_moved_orig,
+ is_deleted=True,
+ expected_metadata=deleted_metadata_dc1)
def test_ReplicateMoveObject3(self):
password=None, setpassword=False)
ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
scope=SCOPE_SUBTREE,
- expression="(samAccountName=%s)" % username)
+ expression="(samAccountName=%s)" % username,
+ attrs=["*", "parentGUID"])
self.assertEquals(len(ldb_res), 1)
user_orig = ldb_res[0]
user_dn = ldb_res[0]["dn"]
# check user info on DC1
print "Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0]))
- self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False)
+ initial_metadata = [
+ (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_cn, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_name, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
+ (DRSUAPI_ATTID_codePage, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)]
+
+ self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
+ obj_orig=user_orig, is_deleted=False,
+ expected_metadata=initial_metadata)
+
new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username)
new_dn.add_base(self.ou2_dn)
self.ldb_dc1.rename(user_dn, new_dn)
ldb_res = self.ldb_dc1.search(base=self.ou2_dn,
scope=SCOPE_SUBTREE,
- expression="(samAccountName=%s)" % username)
+ expression="(samAccountName=%s)" % username,
+ attrs=["*", "parentGUID"])
self.assertEquals(len(ldb_res), 1)
user_moved_orig = ldb_res[0]
user_moved_dn = ldb_res[0]["dn"]
- # trigger replication from DC2 (Which has never seen the object) to DC1
+ # trigger replication from DC1 to DC2
self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
- # check user info on DC1 - should be valid user
- user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=False)
+ moved_metadata = [
+ (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_cn, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_name, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
+ (DRSUAPI_ATTID_codePage, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)]
+
+ # check user info on DC1 after rename - should be valid user
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
+ obj_orig=user_moved_orig,
+ is_deleted=False,
+ expected_metadata=moved_metadata)
# delete user on DC1
self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0]))
+ deleted_metadata_dc1 = [
+ (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_cn, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_name, self.dc1_guid, 3),
+ (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
+ (DRSUAPI_ATTID_codePage, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)]
+
+ # check user info on DC1 - should be deleted user
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
+ obj_orig=user_moved_orig,
+ is_deleted=True,
+ expected_metadata=deleted_metadata_dc1)
+
# trigger replication from DC2 to DC1
self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
+
+ # check user info on DC1 - should be deleted user
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
+ obj_orig=user_moved_orig,
+ is_deleted=True,
+ expected_metadata=deleted_metadata_dc1)
+
+ # trigger replication from DC1 to DC2, for cleanup
+ self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+
+ deleted_metadata_dc2 = [
+ (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_cn, self.dc2_guid, 2),
+ (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_name, self.dc1_guid, 3),
+ (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
+ (DRSUAPI_ATTID_codePage, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)]
+
# check user info on DC2 - should be delted user
- user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=True)
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2,
+ obj_orig=user_moved_orig,
+ is_deleted=True,
+ expected_metadata=deleted_metadata_dc2)
+
+
+ def test_ReplicateMoveObject3b(self):
+ """Verifies how a moved container with a user inside is replicated between two DCs.
+ This test should verify that:
+ - the OU is created on DC1
+ - the OU is renamed on DC1
+ - We verify that after replication,
+ that the user has the correct DN (under OU2).
+
+ """
+ # work-out unique username to test with
+ username = self._make_username()
+
+ # create user on DC1
+ self.ldb_dc1.newuser(username=username,
+ userou="ou=%s" % self.ou1_dn.get_component_value(0),
+ password=None, setpassword=False)
+ ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
+ scope=SCOPE_SUBTREE,
+ expression="(samAccountName=%s)" % username,
+ attrs=["*", "parentGUID"])
+ self.assertEquals(len(ldb_res), 1)
+ user_orig = ldb_res[0]
+ user_dn = ldb_res[0]["dn"]
+
+ # check user info on DC1
+ print "Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0]))
+ initial_metadata = [
+ (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_cn, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_name, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
+ (DRSUAPI_ATTID_codePage, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)]
+
+ self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
+ obj_orig=user_orig, is_deleted=False,
+ expected_metadata=initial_metadata)
+
+
+ new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username)
+ new_dn.add_base(self.ou2_dn)
+ self.ldb_dc1.rename(user_dn, new_dn)
+ ldb_res = self.ldb_dc1.search(base=self.ou2_dn,
+ scope=SCOPE_SUBTREE,
+ expression="(samAccountName=%s)" % username,
+ attrs=["*", "parentGUID"])
+ self.assertEquals(len(ldb_res), 1)
+
+ user_moved_orig = ldb_res[0]
+ user_moved_dn = ldb_res[0]["dn"]
+
+ # trigger replication from DC2 (Which has never seen the object) to DC1
+ self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
+ moved_metadata = [
+ (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_cn, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_name, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
+ (DRSUAPI_ATTID_codePage, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)]
+
+ # check user info on DC1 after rename - should be valid user
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
+ obj_orig=user_moved_orig,
+ is_deleted=False,
+ expected_metadata=moved_metadata)
+
+ # delete user on DC1
+ self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0]))
+ deleted_metadata_dc1 = [
+ (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_cn, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_name, self.dc1_guid, 3),
+ (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
+ (DRSUAPI_ATTID_codePage, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)]
+
+ # check user info on DC1 - should be deleted user
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
+ obj_orig=user_moved_orig,
+ is_deleted=True,
+ expected_metadata=deleted_metadata_dc1)
+
+ # trigger replication from DC2 to DC1
+ self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
+
+ # check user info on DC1 - should be deleted user
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
+ obj_orig=user_moved_orig,
+ is_deleted=True,
+ expected_metadata=deleted_metadata_dc1)
# trigger replication from DC1 to DC2, for cleanup
self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+ deleted_metadata_dc2 = [
+ (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_cn, self.dc2_guid, 1),
+ (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_name, self.dc1_guid, 3),
+ (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
+ (DRSUAPI_ATTID_codePage, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)]
+
# check user info on DC2 - should be delted user
- user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=True)
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2,
+ obj_orig=user_moved_orig,
+ is_deleted=True,
+ expected_metadata=deleted_metadata_dc2)
def test_ReplicateMoveObject4(self):
password=None, setpassword=False)
ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
scope=SCOPE_SUBTREE,
- expression="(samAccountName=%s)" % username)
+ expression="(samAccountName=%s)" % username,
+ attrs=["*", "parentGUID"])
self.assertEquals(len(ldb_res), 1)
user_orig = ldb_res[0]
user_dn = ldb_res[0]["dn"]
# check user info on DC1
print "Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0]))
- self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False)
+ initial_metadata = [
+ (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_cn, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_name, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
+ (DRSUAPI_ATTID_codePage, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)]
+
+ self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
+ obj_orig=user_orig, is_deleted=False,
+ expected_metadata=initial_metadata)
# trigger replication from DC1 to DC2
self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+
+ initial_metadata_dc2 = [
+ (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_cn, self.dc2_guid, 1),
+ (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_name, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
+ (DRSUAPI_ATTID_codePage, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)]
+
# check user info on DC2 - should still be valid user
- user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False)
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2,
+ obj_orig=user_orig, is_deleted=False,
+ expected_metadata=initial_metadata_dc2)
new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username)
new_dn.add_base(self.ou2_dn)
self.ldb_dc1.rename(user_dn, new_dn)
ldb_res = self.ldb_dc1.search(base=self.ou2_dn,
scope=SCOPE_SUBTREE,
- expression="(samAccountName=%s)" % username)
+ expression="(samAccountName=%s)" % username,
+ attrs=["*", "parentGUID"])
self.assertEquals(len(ldb_res), 1)
user_moved_orig = ldb_res[0]
user_moved_dn = ldb_res[0]["dn"]
+ moved_metadata = [
+ (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_cn, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_name, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
+ (DRSUAPI_ATTID_codePage, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)]
+
+ # check user info on DC1 after rename - should be valid user
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
+ obj_orig=user_moved_orig,
+ is_deleted=False,
+ expected_metadata=moved_metadata)
+
# Modify description on DC2. This triggers a replication, but
# not of 'name' and so a bug in Samba regarding the DN.
msg = ldb.Message()
msg["description"] = ldb.MessageElement("User Description", ldb.FLAG_MOD_REPLACE, "description")
self.ldb_dc2.modify(msg)
+ modified_metadata = [
+ (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_cn, self.dc2_guid, 1),
+ (DRSUAPI_ATTID_description, self.dc2_guid, 1),
+ (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_name, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
+ (DRSUAPI_ATTID_codePage, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)]
+
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2,
+ obj_orig=user_orig,
+ is_deleted=False,
+ expected_metadata=modified_metadata)
+
# trigger replication from DC1 to DC2
self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+
+ modified_renamed_metadata = [
+ (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_cn, self.dc2_guid, 2),
+ (DRSUAPI_ATTID_description, self.dc2_guid, 1),
+ (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_name, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
+ (DRSUAPI_ATTID_codePage, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)]
+
# check user info on DC2 - should still be valid user
- user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=False)
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2,
+ obj_orig=user_moved_orig,
+ is_deleted=False,
+ expected_metadata=modified_renamed_metadata)
+
self.assertTrue("description" in user_cur)
# delete user on DC1
self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0]))
+ deleted_metadata_dc1 = [
+ (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_cn, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_name, self.dc1_guid, 3),
+ (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
+ (DRSUAPI_ATTID_codePage, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)]
+
+ # check user info on DC1 - should be deleted user
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
+ obj_orig=user_moved_orig,
+ is_deleted=True,
+ expected_metadata=deleted_metadata_dc1)
+
# trigger replication from DC2 to DC1
self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
- # check user info on DC2 - should be deleted user
- user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=True)
+ # check user info on DC2 - should still be valid user
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2,
+ obj_orig=user_moved_orig,
+ is_deleted=False,
+ expected_metadata=modified_renamed_metadata)
+
+ self.assertTrue("description" in user_cur)
+
+ deleted_metadata_dc1 = [
+ (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_cn, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_description, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_name, self.dc1_guid, 3),
+ (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
+ (DRSUAPI_ATTID_codePage, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)]
+
+
+ # check user info on DC1 - should be deleted user
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
+ obj_orig=user_moved_orig,
+ is_deleted=True,
+ expected_metadata=deleted_metadata_dc1)
+
self.assertFalse("description" in user_cur)
# trigger replication from DC1 to DC2, for cleanup
self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+ deleted_metadata_dc2 = [
+ (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_cn, self.dc2_guid, 3),
+ (DRSUAPI_ATTID_description, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_name, self.dc1_guid, 3),
+ (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
+ (DRSUAPI_ATTID_codePage, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1),
+ (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2),
+ (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)]
+
# check user info on DC2 - should be deleted user
- user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=True)
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2,
+ obj_orig=user_moved_orig,
+ is_deleted=True,
+ expected_metadata=deleted_metadata_dc2)
+
self.assertFalse("description" in user_cur)
def test_ReplicateMoveObject5(self):
password=None, setpassword=False)
ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
scope=SCOPE_SUBTREE,
- expression="(samAccountName=%s)" % username)
+ expression="(samAccountName=%s)" % username,
+ attrs=["*", "parentGUID"])
self.assertEquals(len(ldb_res), 1)
user_orig = ldb_res[0]
user_dn = ldb_res[0]["dn"]
self.ldb_dc1.rename(user_dn, new_dn)
ldb_res = self.ldb_dc1.search(base=self.ou2_dn,
scope=SCOPE_SUBTREE,
- expression="(samAccountName=%s)" % username)
+ expression="(samAccountName=%s)" % username,
+ attrs=["*", "parentGUID"])
self.assertEquals(len(ldb_res), 1)
user_moved_orig = ldb_res[0]
"""
ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
- scope=SCOPE_BASE)
+ scope=SCOPE_BASE,
+ attrs=["*", "parentGUID"])
self.assertEquals(len(ldb_res), 1)
ou_orig = ldb_res[0]
ou_dn = ldb_res[0]["dn"]
new_dn.add_base(self.ou2_dn)
self.ldb_dc1.rename(ou_dn, new_dn)
ldb_res = self.ldb_dc1.search(base=new_dn,
- scope=SCOPE_BASE)
+ scope=SCOPE_BASE,
+ attrs=["*", "parentGUID"])
self.assertEquals(len(ldb_res), 1)
ou_moved_orig = ldb_res[0]
"""
ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
- scope=SCOPE_BASE)
+ scope=SCOPE_BASE,
+ attrs=["*", "parentGUID"])
self.assertEquals(len(ldb_res), 1)
ou_orig = ldb_res[0]
ou_dn = ldb_res[0]["dn"]
new_dn.add_base(self.ou2_dn)
self.ldb_dc1.rename(ou_dn, new_dn)
ldb_res = self.ldb_dc1.search(base=new_dn,
- scope=SCOPE_BASE)
+ scope=SCOPE_BASE,
+ attrs=["*", "parentGUID"])
self.assertEquals(len(ldb_res), 1)
ou_moved_orig = ldb_res[0]
"""
ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
- scope=SCOPE_BASE)
+ scope=SCOPE_BASE,
+ attrs=["*", "parentGUID"])
self.assertEquals(len(ldb_res), 1)
ou_orig = ldb_res[0]
ou_dn = ldb_res[0]["dn"]
new_dn.add_base(self.ou1_dn.parent())
self.ldb_dc1.rename(ou_dn, new_dn)
ldb_res = self.ldb_dc1.search(base=new_dn,
- scope=SCOPE_BASE)
+ scope=SCOPE_BASE,
+ attrs=["*", "parentGUID"])
self.assertEquals(len(ldb_res), 1)
ou_moved_orig = ldb_res[0]
"""
ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
- scope=SCOPE_BASE)
+ scope=SCOPE_BASE,
+ attrs=["*", "parentGUID"])
self.assertEquals(len(ldb_res), 1)
ou_orig = ldb_res[0]
ou_dn = ldb_res[0]["dn"]
new_dn.add_base(self.ou1_dn.parent())
self.ldb_dc1.rename(ou_dn, new_dn)
ldb_res = self.ldb_dc1.search(base=new_dn,
- scope=SCOPE_BASE)
+ scope=SCOPE_BASE,
+ attrs=["*", "parentGUID"])
self.assertEquals(len(ldb_res), 1)
ou_moved_orig = ldb_res[0]
"""
ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
- scope=SCOPE_BASE)
+ scope=SCOPE_BASE,
+ attrs=["*", "parentGUID"])
self.assertEquals(len(ldb_res), 1)
ou_orig = ldb_res[0]
ou_dn = ldb_res[0]["dn"]
"""
ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
- scope=SCOPE_BASE)
+ scope=SCOPE_BASE,
+ attrs=["*", "parentGUID"])
self.assertEquals(len(ldb_res), 1)
ou_orig = ldb_res[0]
ou_dn = ldb_res[0]["dn"]