2 # -*- coding: utf-8 -*-
# Tests that no objects are lost during DRS GetNCChanges replication
6 # Copyright (C) Catalyst.Net Ltd. 2017
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 # export DC1=dc1_dns_name
25 # export DC2=dc2_dns_name
26 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
27 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN getncchanges -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
33 from ldb import SCOPE_BASE
35 from samba.dcerpc import drsuapi
class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
    """
    Replication integrity tests driven through self._get_replication().

    Each test works inside a dedicated OU on DC1 (self.ou) which is
    created in setUp() and deleted, together with its subtree, in
    tearDown().
    """

    def setUp(self):
        """Creates the test OU on DC1 and records the starting HWM/UTDV."""
        super(DrsReplicaSyncIntegrityTestCase, self).setUp()
        self.base_dn = self.ldb_dc1.get_default_basedn()
        self.ou = "OU=uptodateness_test,%s" % self.base_dn
        self.ldb_dc1.add({
            "dn": self.ou,
            "objectclass": "organizationalUnit"})
        (self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1)
        (self.default_hwm, self.default_utdv) = \
            self._get_highest_hwm_utdv(self.ldb_dc1)

    def tearDown(self):
        """Deletes the test OU (and everything below it) from DC1."""
        super(DrsReplicaSyncIntegrityTestCase, self).tearDown()
        # Tidy up the test OU; tree_delete removes the objects beneath it too
        try:
            self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
        except ldb.LdbError as e:
            # 'except ... as (enum, string)' is only valid on Python 2.
            # Reading the error code from e.args works on Python 2 and 3.
            enum = e.args[0]
            if enum == ldb.ERR_NO_SUCH_OBJECT:
                # The OU is already gone, so there is nothing to clean up
                pass
            # NOTE(review): any other LdbError is also not propagated here,
            # i.e. the cleanup is best-effort - confirm that is intended.

    def add_object(self, dn):
        """Adds an OU object and checks that it can be found afterwards"""
        self.ldb_dc1.add({"dn": dn, "objectclass": "organizationalunit"})
        res = self.ldb_dc1.search(base=dn, scope=SCOPE_BASE)
        # assertEquals is a deprecated alias (removed in Python 3.12)
        self.assertEqual(len(res), 1)

    def modify_object(self, dn, attr, value):
        """Modifies an object's USN by adding an attribute value to it"""
        m = ldb.Message()
        m.dn = ldb.Dn(self.ldb_dc1, dn)
        m[attr] = ldb.MessageElement(value, ldb.FLAG_MOD_ADD, attr)
        self.ldb_dc1.modify(m)

    def create_object_range(self, start, end, prefix=""):
        """
        Creates a block of objects. Object names are numbered sequentially,
        using the optional prefix supplied.

        The objects are numbered from start up to (but not including) end
        and are all created directly beneath self.ou.

        Returns the list of DNs created, in creation order.
        """
        dn_list = []

        for x in range(start, end):
            ou = "OU=test_ou_%s%d,%s" % (prefix, x, self.ou)
            self.add_object(ou)
            dn_list.append(ou)

        return dn_list

    def assert_expected_data(self, received_list, expected_list):
        """
        Asserts that we received all the DNs that we expected, i.e. that
        every entry of expected_list is also present in received_list.
        """

        # Note that with GET_ANC Windows can end up sending the same parent
        # object multiple times, so this might be noteworthy but doesn't
        # warrant failing the test
        if len(received_list) != len(expected_list):
            print("Note: received %d objects but expected %d" %
                  (len(received_list), len(expected_list)))

        # Check that we received every object that we were expecting
        for dn in expected_list:
            self.assertTrue(dn in received_list,
                            "DN '%s' missing from replication." % dn)

    def test_repl_integrity(self):
        """
        Modify the objects being replicated while the replication is still
        in progress and check that no object loss occurs.
        """

        # The server behaviour differs between samba and Windows. Samba returns
        # the objects in the original order (up to the pre-modify HWM). Windows
        # incorporates the modified objects and returns them in the new order
        # (i.e. modified objects last), up to the post-modify HWM. The Microsoft
        # docs state the Windows behaviour is optional.

        # Create a range of objects to replicate.
        expected_dn_list = self.create_object_range(0, 400)
        (orig_hwm, _) = self._get_highest_hwm_utdv(self.ldb_dc1)

        # We ask for the first page of 100 objects.
        # For this test, we don't care what order we receive the objects in,
        # so long as by the end we've received everything
        ctr6 = self._get_replication(drsuapi.DRSUAPI_DRS_WRIT_REP,
                                     max_objects=100)
        rxd_dn_list = self._get_ctr6_dn_list(ctr6)

        # Modify some of the second page of objects. This should bump the
        # highwatermark
        for x in range(100, 200):
            self.modify_object(expected_dn_list[x], "displayName", "OU%d" % x)

        (post_modify_hwm, _) = self._get_highest_hwm_utdv(self.ldb_dc1)
        self.assertTrue(post_modify_hwm.highest_usn > orig_hwm.highest_usn)

        # Get the remaining blocks of data, resuming each request from the
        # highwatermark/up-to-dateness vector returned by the previous reply
        while ctr6.more_data:
            ctr6 = self._get_replication(
                drsuapi.DRSUAPI_DRS_WRIT_REP, max_objects=100,
                highwatermark=ctr6.new_highwatermark,
                uptodateness_vector=ctr6.uptodateness_vector)
            rxd_dn_list += self._get_ctr6_dn_list(ctr6)

        # Check we still receive all the objects we're expecting
        self.assert_expected_data(rxd_dn_list, expected_dn_list)