environ={'DC1': "$DC_SERVER", 'DC2': '$%s_SERVER' % env.upper()},
extra_args=['-U$DOMAIN/$DC_USERNAME%$DC_PASSWORD'])
+# A side-effect of the getncchanges tests is that they will create hundreds of
+# tombstone objects, so run them last to avoid interferring with (and slowing
+# down) the other DRS tests
+for env in ['vampire_dc', 'promoted_dc']:
+ planoldpythontestsuite(env, "getncchanges",
+ extra_path=[os.path.join(samba4srcdir, 'torture/drs/python')],
+ name="samba4.drs.getncchanges.python(%s)" % env,
+ environ={'DC1': "$DC_SERVER", 'DC2': '$%s_SERVER' % env.upper()},
+ extra_args=['-U$DOMAIN/$DC_USERNAME%$DC_PASSWORD'])
+
for env in ['ad_dc_ntvfs']:
planoldpythontestsuite(env, "repl_rodc",
extra_path=[os.path.join(samba4srcdir, 'torture/drs/python')],
id.dn = str(res[0].dn)
return id
- def _check_replication(self, expected_dns, replica_flags, expected_links=[],
- drs_error=drsuapi.DRSUAPI_EXOP_ERR_NONE, drs=None, drs_handle=None,
- highwatermark=None, uptodateness_vector=None,
- more_flags=0, more_data=False,
- dn_ordered=True, links_ordered=True,
- max_objects=133, exop=0,
- dest_dsa=drsuapi.DRSUAPI_DS_BIND_GUID_W2K3,
- source_dsa=None, invocation_id=None, nc_dn_str=None,
- nc_object_count=0, nc_linked_attributes_count=0):
+ def _get_replication(self, replica_flags,
+ drs_error=drsuapi.DRSUAPI_EXOP_ERR_NONE, drs=None, drs_handle=None,
+ highwatermark=None, uptodateness_vector=None,
+ more_flags=0, max_objects=133, exop=0,
+ dest_dsa=drsuapi.DRSUAPI_DS_BIND_GUID_W2K3,
+ source_dsa=None, invocation_id=None, nc_dn_str=None):
"""
- Makes sure that replication returns the specific error given.
+ Builds a DsGetNCChanges request based on the information provided
+ and returns the response received from the DC.
"""
if source_dsa is None:
source_dsa = self.ldb_dc1.get_ntds_GUID()
self.assertEqual(level, 6, "expected level 6 response!")
self.assertEqual(ctr.source_dsa_guid, misc.GUID(source_dsa))
self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(invocation_id))
- ctr6 = ctr
- self.assertEqual(ctr6.extended_ret, drs_error)
+ self.assertEqual(ctr.extended_ret, drs_error)
+
+ return ctr
+
+ def _check_replication(self, expected_dns, replica_flags, expected_links=[],
+ drs_error=drsuapi.DRSUAPI_EXOP_ERR_NONE, drs=None, drs_handle=None,
+ highwatermark=None, uptodateness_vector=None,
+ more_flags=0, more_data=False,
+ dn_ordered=True, links_ordered=True,
+ max_objects=133, exop=0,
+ dest_dsa=drsuapi.DRSUAPI_DS_BIND_GUID_W2K3,
+ source_dsa=None, invocation_id=None, nc_dn_str=None,
+ nc_object_count=0, nc_linked_attributes_count=0):
+ """
+ Makes sure that replication returns the specific error given.
+ """
+
+ # send a DsGetNCChanges to the DC
+ ctr6 = self._get_replication(replica_flags,
+ drs_error, drs, drs_handle,
+ highwatermark, uptodateness_vector,
+ more_flags, max_objects, exop, dest_dsa,
+ source_dsa, invocation_id, nc_dn_str)
+
+ # check the response is what we expect
self._check_ctr6(ctr6, expected_dns, expected_links,
- nc_object_count=nc_object_count)
+ nc_object_count=nc_object_count, more_data=more_data,
+ dn_ordered=dn_ordered)
return (ctr6.new_highwatermark, ctr6.uptodateness_vector)
+
+ def _get_ctr6_dn_list(self, ctr6):
+ """
+ Returns the DNs contained in a DsGetNCChanges response.
+ """
+ dn_list = []
+ next_object = ctr6.first_object
+ for i in range(0, ctr6.object_count):
+ dn_list.append(next_object.object.identifier.dn)
+ next_object = next_object.next_object
+ self.assertEqual(next_object, None)
+
+ return dn_list
+
+
def _check_ctr6(self, ctr6, expected_dns=[], expected_links=[],
dn_ordered=True, links_ordered=True,
more_data=False, nc_object_count=0,
self.assertEqual(ctr6.nc_linked_attributes_count, nc_linked_attributes_count)
self.assertEqual(ctr6.drs_error[0], drs_error)
- ctr6_dns = []
- next_object = ctr6.first_object
- for i in range(0, ctr6.object_count):
- ctr6_dns.append(next_object.object.identifier.dn)
- next_object = next_object.next_object
- self.assertEqual(next_object, None)
+ ctr6_dns = self._get_ctr6_dn_list(ctr6)
i = 0
for dn in expected_dns:
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Tests various schema replication scenarios
+#
+# Copyright (C) Catalyst.Net Ltd. 2017
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+#
+# Usage:
+# export DC1=dc1_dns_name
+# export DC2=dc2_dns_name
+# export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
+# PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN getncchanges -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
+#
+
+import drs_base
+import samba.tests
+import ldb
+from ldb import SCOPE_BASE
+
+from samba.dcerpc import drsuapi
+
+class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
+ def setUp(self):
+ super(DrsReplicaSyncIntegrityTestCase, self).setUp()
+ self.base_dn = self.ldb_dc1.get_default_basedn()
+ self.ou = "OU=uptodateness_test,%s" % self.base_dn
+ self.ldb_dc1.add({
+ "dn": self.ou,
+ "objectclass": "organizationalUnit"})
+ (self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1)
+ (self.default_hwm, self.default_utdv) = self._get_highest_hwm_utdv(self.ldb_dc1)
+ self._debug = True
+
+ def tearDown(self):
+ super(DrsReplicaSyncIntegrityTestCase, self).tearDown()
+ # tidyup groups and users
+ try:
+ self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
+ except ldb.LdbError as (enum, string):
+ if enum == ldb.ERR_NO_SUCH_OBJECT:
+ pass
+
+ def add_object(self, dn):
+ """Adds an OU object"""
+ self.ldb_dc1.add({"dn": dn, "objectclass": "organizationalunit"})
+ res = self.ldb_dc1.search(base=dn, scope=SCOPE_BASE)
+ self.assertEquals(len(res), 1)
+
+ def modify_object(self, dn, attr, value):
+ """Modifies an object's USN by adding an attribute value to it"""
+ m = ldb.Message()
+ m.dn = ldb.Dn(self.ldb_dc1, dn)
+ m[attr] = ldb.MessageElement(value, ldb.FLAG_MOD_ADD, attr)
+ self.ldb_dc1.modify(m)
+
+ def create_object_range(self, start, end, prefix=""):
+ """
+ Creates a block of objects. Object names are numbered sequentially,
+ using the optional prefix supplied.
+ """
+ dn_list = []
+
+ # Create the parents first, then the children.
+ # This makes it easier to see in debug when GET_ANC takes effect
+ # because the parent/children become interleaved (by default,
+ # this approach means the objects are organized into blocks of
+ # parents and blocks of children together)
+ for x in range(start, end):
+ ou = "OU=test_ou_%s%d,%s" % (prefix, x, self.ou)
+ self.add_object(ou)
+ dn_list.append(ou)
+
+ return dn_list
+
+ def assert_expected_data(self, received_list, expected_list):
+ """
+ Asserts that we received all the DNs that we expected and
+ none are missing.
+ """
+
+ # Note that with GET_ANC Windows can end up sending the same parent
+ # object multiple times, so this might be noteworthy but doesn't
+ # warrant failing the test
+ if (len(received_list) != len(expected_list)):
+ print("Note: received %d objects but expected %d" %(len(received_list),
+ len(expected_list)))
+
+ # Check that we received every object that we were expecting
+ for dn in expected_list:
+ self.assertTrue(dn in received_list, "DN '%s' missing from replication." % dn)
+
+ def test_repl_integrity(self):
+ """
+ Modify the objects being replicated while the replication is still
+ in progress and check that no object loss occurs.
+ """
+
+ # The server behaviour differs between samba and Windows. Samba returns
+ # the objects in the original order (up to the pre-modify HWM). Windows
+ # incorporates the modified objects and returns them in the new order
+ # (i.e. modified objects last), up to the post-modify HWM. The Microsoft
+ # docs state the Windows behaviour is optional.
+
+ # Create a range of objects to replicate.
+ expected_dn_list = self.create_object_range(0, 400)
+ (orig_hwm, unused) = self._get_highest_hwm_utdv(self.ldb_dc1)
+
+ # We ask for the first page of 100 objects.
+ # For this test, we don't care what order we receive the objects in,
+ # so long as by the end we've received everything
+ rxd_dn_list = []
+ ctr6 = self._get_replication(drsuapi.DRSUAPI_DRS_WRIT_REP, max_objects=100)
+ rxd_dn_list = self._get_ctr6_dn_list(ctr6)
+
+ # Modify some of the second page of objects. This should bump the highwatermark
+ for x in range(100, 200):
+ self.modify_object(expected_dn_list[x], "displayName", "OU%d" % x)
+
+ (post_modify_hwm, unused) = self._get_highest_hwm_utdv(self.ldb_dc1)
+ self.assertTrue(post_modify_hwm.highest_usn > orig_hwm.highest_usn)
+
+ # Get the remaining blocks of data
+ while ctr6.more_data:
+ ctr6 = self._get_replication(drsuapi.DRSUAPI_DRS_WRIT_REP, max_objects=100,
+ highwatermark=ctr6.new_highwatermark,
+ uptodateness_vector=ctr6.uptodateness_vector)
+ rxd_dn_list += self._get_ctr6_dn_list(ctr6)
+
+ # Check we still receive all the objects we're expecting
+ self.assert_expected_data(rxd_dn_list, expected_dn_list)
+
+