2 # -*- coding: utf-8 -*-
4 # Tests various schema replication scenarios
6 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 # export DC1=dc1_dns_name
25 # export DC2=dc2_dns_name
26 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
27 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN replica_sync -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
30 from __future__ import print_function
37 SCOPE_BASE, LdbError, ERR_NO_SUCH_OBJECT)
39 class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
40 """Intended as a black box test case for DsReplicaSync
41 implementation. It should test the behavior of this
42 case in cases when inbound replication is disabled"""
def setUp(self):
    """Create a private test OU and bring both DCs into sync.

    Initialises ``self.ou1`` / ``self.ou2`` to ``None`` so that
    tearDown can always read them, even for tests that never create
    any OU.
    """
    super(DrsReplicaSyncTestCase, self).setUp()

    # tearDown passes these to _cleanup_object() unconditionally
    self.ou1 = None
    self.ou2 = None

    # This OU avoids this test conflicting with anything
    # that may already be in the DB
    # NOTE(review): the name argument is assumed; "replica_sync" matches
    # the test module name used in the run instructions - confirm.
    self.top_ou = samba.tests.create_test_ou(self.ldb_dc1,
                                             "replica_sync")
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
def tearDown(self):
    """Remove the test objects and switch inbound replication back on."""
    try:
        self._cleanup_object(self.ou1)
        self._cleanup_object(self.ou2)
        self._cleanup_dn(self.top_ou)
    finally:
        # re-enable replication
        # (done in a finally block so that a failing cleanup cannot
        # leave inbound replication disabled on either DC)
        self._enable_inbound_repl(self.dnsname_dc1)
        self._enable_inbound_repl(self.dnsname_dc2)

        super(DrsReplicaSyncTestCase, self).tearDown()
def _cleanup_dn(self, dn):
    """Tree-delete ``dn`` on DC2 and then on DC1.

    A delete that fails with ERR_NO_SUCH_OBJECT is accepted (the object
    is already gone); any other LdbError code fails the test.
    """
    for samdb in (self.ldb_dc2, self.ldb_dc1):
        try:
            samdb.delete(dn, ["tree_delete:1"])
        except LdbError as e:
            # first element of the exception args is the ldb error code
            num = e.args[0]
            self.assertEqual(num, ERR_NO_SUCH_OBJECT)
def _cleanup_object(self, guid):
    """Cleans up a test object, if it still exists

    ``guid`` is the GUID string of the object; ``None`` means the test
    never created the object, in which case nothing is done.
    """
    if guid is None:
        # avoid asking the DCs to delete the bogus DN '<GUID=None>'
        return
    self._cleanup_dn('<GUID=%s>' % guid)
def test_ReplEnabled(self):
    """Checks that an unforced replication works while inbound replication is enabled"""
    destination = self.dnsname_dc1
    self._enable_inbound_repl(destination)
    self._net_drs_replicate(DC=destination,
                            fromDC=self.dnsname_dc2,
                            forced=False)
def test_ReplDisabled(self):
    """Checks that an unforced replication is refused while inbound replication is disabled"""
    self._disable_inbound_repl(self.dnsname_dc1)

    # Hand the command line credentials cache to the subcommand so
    # that it does not need to kinit again
    krb5_option = "--krb5-ccache=%s" % self.get_creds_ccache_name()

    # bin/samba-tool drs replicate <cmdline_auth> <Dest_DC_NAME> <Src_DC_NAME> <Naming Context>
    argv = ["drs", "replicate", krb5_option,
            self.dnsname_dc1, self.dnsname_dc2, self.domain_dn]

    (result, out, err) = self.runsubcmd(*argv)
    self.assertCmdFail(result)
    self.assertTrue('WERR_DS_DRA_SINK_DISABLED' in err)
def test_ReplDisabledForced(self):
    """Tests we can force replicate when replication is disabled"""
    self._disable_inbound_repl(self.dnsname_dc1)
    # The return value is not inspected; the call itself is the check.
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
def test_ReplLocal(self):
    """Checks that a full sync can be replicated straight into the local db"""
    self._enable_inbound_repl(self.dnsname_dc1)
    options = dict(DC=self.dnsname_dc1,
                   fromDC=self.dnsname_dc2,
                   forced=False,
                   local=True,
                   full_sync=True)
    self._net_drs_replicate(**options)
def _create_ou(self, samdb, name):
    """Create an organizationalUnit ``name`` under the test OU on ``samdb``.

    ``name`` is the relative DN (e.g. "OU=Foo", or "OU=Child,OU=Parent");
    it is placed directly under ``self.top_ou``.

    Returns the new object's objectGUID, converted with
    ``self._GUID_string``.
    """
    ou_dn = "%s,%s" % (name, self.top_ou)
    ldif = "\ndn: %s\nobjectClass: organizationalUnit\n" % ou_dn
    # NOTE(review): assumes samdb offers the ldb add_ldif() call - confirm
    samdb.add_ldif(ldif)
    # read the object back to learn the GUID the server assigned
    res = samdb.search(base=ou_dn,
                       scope=SCOPE_BASE, attrs=["objectGUID"])
    return self._GUID_string(res[0]["objectGUID"][0])
def _check_deleted(self, sam_ldb, guid):
    """Assert that the OU identified by ``guid`` is a deleted object on ``sam_ldb``.

    Checks that exactly one entry is found, that isDeleted is "TRUE",
    that objectCategory is absent and that the DN contains the
    Deleted Objects DN.
    """
    # search the OU by guid as it may be deleted
    res = sam_ldb.search(base='<GUID=%s>' % guid,
                         controls=["show_deleted:1"],
                         attrs=["isDeleted", "objectCategory", "ou"])
    self.assertEqual(len(res), 1)
    ou_cur = res[0]
    # Deleted Object base DN
    dodn = self._deleted_objects_dn(sam_ldb)
    # now check properties of the OU
    name_cur = ou_cur["ou"][0]
    # str() as done for attribute values elsewhere in this file
    self.assertEqual(str(ou_cur["isDeleted"][0]), "TRUE")
    self.assertNotIn("objectCategory", ou_cur)
    self.assertIn(dodn, str(ou_cur["dn"]),
                  "OU %s is deleted but it is not located under %s!" % (name_cur, dodn))
def test_ReplConflictsFullSync(self):
    """Tests that objects created in conflict become conflict DNs (honour full sync override)

    The same OU is created on DC1 and then on DC2 with inbound
    replication disabled.  After a local, forced full sync of DC2 from
    DC1, the DC1 object (self.ou1) must carry a 'CNF:<guid>' name on
    DC2 and the DC2 object (self.ou2) must not.
    """
    # function-scope import keeps this method self-contained
    import time

    # First confirm local replication (so when we test against windows, this fails fast without creating objects)
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, local=True, forced=True, full_sync=True)

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create conflicting objects on DC1 and DC2, with DC1 object created first
    self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Full Sync")
    # We have to sleep to ensure that the two objects have different timestamps
    # (one second is assumed to be enough - TODO confirm)
    time.sleep(1)
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Full Sync")

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, local=True, forced=True, full_sync=True)

    # Check that DC2 got the DC1 object, and OU1 was made into a conflict
    res1 = self.ldb_dc2.search(base="<GUID=%s>" % self.ou1,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc2.search(base="<GUID=%s>" % self.ou2,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertNotIn('CNF:%s' % self.ou2, str(res2[0]["name"][0]))
    self.assertIn('CNF:%s' % self.ou1, str(res1[0]["name"][0]))
    lost_and_found = self._lost_and_found_dn(self.ldb_dc2, self.domain_dn)
    self.assertNotIn(lost_and_found, str(res1[0].dn))
    self.assertNotIn(lost_and_found, str(res2[0].dn))
    self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
    self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

    # Delete both objects by GUID on DC2
    self.ldb_dc2.delete('<GUID=%s>' % self.ou1)
    self.ldb_dc2.delete('<GUID=%s>' % self.ou2)

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=True)

    # Check deleted on DC1, then on DC2
    for samdb in (self.ldb_dc1, self.ldb_dc2):
        self._check_deleted(samdb, self.ou1)
        self._check_deleted(samdb, self.ou2)
def test_ReplConflictsRemoteWin(self):
    """Tests that objects created in conflict become conflict DNs

    The same OU is created on DC1 and then on DC2 with inbound
    replication disabled.  After DC1 replicates from DC2, the DC1
    object (self.ou1) must carry a 'CNF:<guid>' name on DC1.
    """
    # function-scope import keeps this method self-contained
    import time

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create conflicting objects on DC1 and DC2, with DC1 object created first
    self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Remote Conflict")
    # We have to sleep to ensure that the two objects have different timestamps
    # (one second is assumed to be enough - TODO confirm)
    time.sleep(1)
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Remote Conflict")

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Check that DC1 got the DC2 object, and OU1 was made into a conflict
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertIn('CNF:%s' % self.ou1, str(res1[0]["name"][0]))
    lost_and_found = self._lost_and_found_dn(self.ldb_dc1, self.domain_dn)
    self.assertNotIn(lost_and_found, str(res1[0].dn))
    self.assertNotIn(lost_and_found, str(res2[0].dn))
    self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
    self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

    # Delete both objects by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

    # Check deleted on DC1, then on DC2
    for samdb in (self.ldb_dc1, self.ldb_dc2):
        self._check_deleted(samdb, self.ou1)
        self._check_deleted(samdb, self.ou2)
def test_ReplConflictsLocalWin(self):
    """Tests that objects created in conflict become conflict DNs

    The same OU is created on DC2 and then on DC1 with inbound
    replication disabled.  After DC1 replicates from DC2, the DC2
    object (self.ou2) must carry a 'CNF:<guid>' name on DC1.
    """
    # function-scope import keeps this method self-contained
    import time

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create conflicting objects on DC1 and DC2, with DC2 object created first
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Local Conflict")
    # We have to sleep to ensure that the two objects have different timestamps
    # (one second is assumed to be enough - TODO confirm)
    time.sleep(1)
    self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Local Conflict")

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Check that DC1 got the DC2 object, and OU2 was made into a conflict
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertIn('CNF:%s' % self.ou2, str(res2[0]["name"][0]),
                  "Got %s for %s" % (str(res2[0]["name"][0]), self.ou2))
    lost_and_found = self._lost_and_found_dn(self.ldb_dc1, self.domain_dn)
    self.assertNotIn(lost_and_found, str(res1[0].dn))
    self.assertNotIn(lost_and_found, str(res2[0].dn))
    self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
    self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

    # Delete both objects by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

    # Check deleted on DC1, then on DC2
    for samdb in (self.ldb_dc1, self.ldb_dc2):
        self._check_deleted(samdb, self.ou1)
        self._check_deleted(samdb, self.ou2)
def test_ReplConflictsRemoteWin_with_child(self):
    """Tests that objects created in conflict become conflict DNs

    As test_ReplConflictsRemoteWin, but each conflicting parent OU also
    gets a child OU, and the parents are removed with a tree delete.
    The children must end up deleted on both DCs as well.
    """
    # function-scope import keeps this method self-contained
    import time

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create conflicting objects on DC1 and DC2, with DC1 object created first
    self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Parent Remote Conflict")
    # We have to sleep to ensure that the two objects have different timestamps
    # (one second is assumed to be enough - TODO confirm)
    time.sleep(1)
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Parent Remote Conflict")
    # Create one child under each parent, on the DC that owns the parent
    ou1_child = self._create_ou(self.ldb_dc1, "OU=Test Child,OU=Test Parent Remote Conflict")
    ou2_child = self._create_ou(self.ldb_dc2, "OU=Test Child,OU=Test Parent Remote Conflict")

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Check that DC1 got the DC2 object, and self.ou1 was made into a conflict
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertIn('CNF:%s' % self.ou1, str(res1[0]["name"][0]))
    lost_and_found = self._lost_and_found_dn(self.ldb_dc1, self.domain_dn)
    self.assertNotIn(lost_and_found, str(res1[0].dn))
    self.assertNotIn(lost_and_found, str(res2[0].dn))
    self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
    self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

    # Delete both objects (and their children) by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1, ["tree_delete:1"])
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2, ["tree_delete:1"])

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

    # Check the parents are deleted on DC1, then on DC2
    for samdb in (self.ldb_dc1, self.ldb_dc2):
        self._check_deleted(samdb, self.ou1)
        self._check_deleted(samdb, self.ou2)

    # Check the children are deleted on DC1, then on DC2
    for samdb in (self.ldb_dc1, self.ldb_dc2):
        self._check_deleted(samdb, ou1_child)
        self._check_deleted(samdb, ou2_child)
def test_ReplConflictsRenamedVsNewRemoteWin(self):
    """Tests resolving a DN conflict between a renamed object and a new object

    An OU is created and renamed on DC1; afterwards a new OU with the
    renamed DN is created on DC2.  After DC1 replicates from DC2, the
    renamed object (self.ou1) must carry a 'CNF:<guid>' name on DC1.
    """
    # function-scope import keeps this method self-contained
    import time

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create an OU and rename it on DC1
    self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Remote Rename Conflict orig")
    self.ldb_dc1.rename("<GUID=%s>" % self.ou1, "OU=Test Remote Rename Conflict,%s" % self.top_ou)

    # We have to sleep to ensure that the two objects have different timestamps
    # (one second is assumed to be enough - TODO confirm)
    time.sleep(1)

    # create a conflicting object with the same DN on DC2
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Remote Rename Conflict")

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Check that DC1 got the DC2 object, and self.ou1 was made into a conflict
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertIn('CNF:%s' % self.ou1, str(res1[0]["name"][0]))
    lost_and_found = self._lost_and_found_dn(self.ldb_dc1, self.domain_dn)
    self.assertNotIn(lost_and_found, str(res1[0].dn))
    self.assertNotIn(lost_and_found, str(res2[0].dn))
    self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
    self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

    # Delete both objects by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

    # Check deleted on DC1, then on DC2
    for samdb in (self.ldb_dc1, self.ldb_dc2):
        self._check_deleted(samdb, self.ou1)
        self._check_deleted(samdb, self.ou2)
def test_ReplConflictsRenamedVsNewLocalWin(self):
    """Tests resolving a DN conflict between a renamed object and a new object

    An OU is created and renamed on DC2; afterwards a new OU with the
    renamed DN is created on DC1.  After DC1 replicates from DC2, the
    renamed object (self.ou2) must carry a 'CNF:<guid>' name on DC1.
    """
    # function-scope import keeps this method self-contained
    import time

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create conflicting objects on DC1 and DC2, where the DC2 object has been renamed
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Rename Local Conflict orig")
    self.ldb_dc2.rename("<GUID=%s>" % self.ou2, "OU=Test Rename Local Conflict,%s" % self.top_ou)
    # We have to sleep to ensure that the two objects have different timestamps
    # (one second is assumed to be enough - TODO confirm)
    time.sleep(1)
    self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Rename Local Conflict")

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Check that DC1 got the DC2 object, and OU2 was made into a conflict
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertIn('CNF:%s' % self.ou2, str(res2[0]["name"][0]))
    lost_and_found = self._lost_and_found_dn(self.ldb_dc1, self.domain_dn)
    self.assertNotIn(lost_and_found, str(res1[0].dn))
    self.assertNotIn(lost_and_found, str(res2[0].dn))
    self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
    self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

    # Delete both objects by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

    # Check deleted on DC1, then on DC2
    for samdb in (self.ldb_dc1, self.ldb_dc2):
        self._check_deleted(samdb, self.ou1)
        self._check_deleted(samdb, self.ou2)
def test_ReplConflictsRenameRemoteWin(self):
    """Tests that objects renamed into conflict become conflict DNs

    Two differently named OUs are created (one per DC) and replicated
    to DC1.  Then the DC1 object is renamed first and the DC2 object
    is renamed to the same DN afterwards.  After DC1 replicates from
    DC2, self.ou1 must carry a 'CNF:<guid>' name on DC1.
    """
    # function-scope import keeps this method self-contained
    import time

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create non-conflicting objects on DC1 and DC2
    self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Remote Rename Conflict")
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Remote Rename Conflict 2")

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Rename both to the same DN, the DC1 object first
    self.ldb_dc1.rename("<GUID=%s>" % self.ou1, "OU=Test Remote Rename Conflict 3,%s" % self.top_ou)
    # We have to sleep to ensure that the two objects have different timestamps
    # (one second is assumed to be enough - TODO confirm)
    time.sleep(1)
    self.ldb_dc2.rename("<GUID=%s>" % self.ou2, "OU=Test Remote Rename Conflict 3,%s" % self.top_ou)

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Check that DC1 got the DC2 rename, and self.ou1 was made into a conflict
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertIn('CNF:%s' % self.ou1, str(res1[0]["name"][0]))
    lost_and_found = self._lost_and_found_dn(self.ldb_dc1, self.domain_dn)
    self.assertNotIn(lost_and_found, str(res1[0].dn))
    self.assertNotIn(lost_and_found, str(res2[0].dn))
    self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
    self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

    # Delete both objects by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

    # Check deleted on DC1, then on DC2
    for samdb in (self.ldb_dc1, self.ldb_dc2):
        self._check_deleted(samdb, self.ou1)
        self._check_deleted(samdb, self.ou2)
def test_ReplConflictsRenameRemoteWin_with_child(self):
    """Tests that objects renamed into conflict become conflict DNs

    As test_ReplConflictsRenameRemoteWin, but each parent OU also gets
    a child OU, and the parents are removed with a tree delete.  The
    children must end up deleted on both DCs as well.
    """
    # function-scope import keeps this method self-contained
    import time

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create non-conflicting parent objects on DC1 and DC2
    self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Parent Remote Rename Conflict")
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Parent Remote Rename Conflict 2")
    # Create one child under each parent, on the DC that owns the parent
    ou1_child = self._create_ou(self.ldb_dc1, "OU=Test Child,OU=Test Parent Remote Rename Conflict")
    ou2_child = self._create_ou(self.ldb_dc2, "OU=Test Child,OU=Test Parent Remote Rename Conflict 2")

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Rename both parents to the same DN, the DC1 object first
    self.ldb_dc1.rename("<GUID=%s>" % self.ou1, "OU=Test Parent Remote Rename Conflict 3,%s" % self.top_ou)
    # We have to sleep to ensure that the two objects have different timestamps
    # (one second is assumed to be enough - TODO confirm)
    time.sleep(1)
    self.ldb_dc2.rename("<GUID=%s>" % self.ou2, "OU=Test Parent Remote Rename Conflict 3,%s" % self.top_ou)

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Check that DC1 got the DC2 rename, and self.ou1 was made into a conflict
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertIn('CNF:%s' % self.ou1, str(res1[0]["name"][0]))
    lost_and_found = self._lost_and_found_dn(self.ldb_dc1, self.domain_dn)
    self.assertNotIn(lost_and_found, str(res1[0].dn))
    self.assertNotIn(lost_and_found, str(res2[0].dn))
    self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
    self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

    # Delete both objects (and their children) by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1, ["tree_delete:1"])
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2, ["tree_delete:1"])

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

    # Check the parents are deleted on DC1, then on DC2
    for samdb in (self.ldb_dc1, self.ldb_dc2):
        self._check_deleted(samdb, self.ou1)
        self._check_deleted(samdb, self.ou2)

    # Check the children are deleted on DC1, then on DC2
    for samdb in (self.ldb_dc1, self.ldb_dc2):
        self._check_deleted(samdb, ou1_child)
        self._check_deleted(samdb, ou2_child)
def test_ReplConflictsRenameLocalWin(self):
    """Tests that objects renamed into conflict become conflict DNs

    Two differently named OUs are created (one per DC) and replicated
    to DC1.  Then the DC2 object is renamed first and the DC1 object
    is renamed to the same DN afterwards.  After DC1 replicates from
    DC2, self.ou2 must carry a 'CNF:<guid>' name on DC1.
    """
    # function-scope import keeps this method self-contained
    import time

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create non-conflicting objects on DC1 and DC2
    self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Rename Local Conflict")
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Rename Local Conflict 2")

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Rename both to the same DN, the DC2 object first
    self.ldb_dc2.rename("<GUID=%s>" % self.ou2, "OU=Test Rename Local Conflict 3,%s" % self.top_ou)
    # We have to sleep to ensure that the two objects have different timestamps
    # (one second is assumed to be enough - TODO confirm)
    time.sleep(1)
    self.ldb_dc1.rename("<GUID=%s>" % self.ou1, "OU=Test Rename Local Conflict 3,%s" % self.top_ou)

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Check that DC1 got the DC2 rename, and OU2 was made into a conflict
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertIn('CNF:%s' % self.ou2, str(res2[0]["name"][0]))
    lost_and_found = self._lost_and_found_dn(self.ldb_dc1, self.domain_dn)
    self.assertNotIn(lost_and_found, str(res1[0].dn))
    self.assertNotIn(lost_and_found, str(res2[0].dn))
    self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
    self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

    # Delete both objects by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

    # Check deleted on DC1, then on DC2
    for samdb in (self.ldb_dc1, self.ldb_dc2):
        self._check_deleted(samdb, self.ou1)
        self._check_deleted(samdb, self.ou2)
def test_ReplLostAndFound(self):
    """Tests that objects created under an OU deleted elsewhere end up in lostAndFound"""
    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Both parent OUs start their life on DC2 ...
    self.ou1 = self._create_ou(self.ldb_dc2, "OU=Deleted parent")
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Deleted parent 2")

    # ... and are then pulled over to DC1
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # DC1 removes the two parents by GUID
    for parent_guid in (self.ou1, self.ou2):
        self.ldb_dc1.delete('<GUID=%s>' % parent_guid)

    # DC2, unaware of the deletion, adds a child below each parent
    ou1_child = self._create_ou(self.ldb_dc2, "OU=Test Child,OU=Deleted parent")
    ou2_child = self._create_ou(self.ldb_dc2, "OU=Test Child,OU=Deleted parent 2")

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # On DC1 the children must now live in lostAndFound, and at least
    # one of them must have been turned into a conflict DN
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % ou1_child,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % ou2_child,
                               scope=SCOPE_BASE, attrs=["name"])
    child1_name = res1[0]["name"][0]
    child2_name = res2[0]["name"][0]
    print(child1_name)
    print(child2_name)
    self.assertTrue('CNF:%s' % ou1_child in str(child1_name) or 'CNF:%s' % ou2_child in str(child2_name))
    for found in (res1, res2):
        self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) in str(found[0].dn))
    for found in (res1, res2):
        self.assertEqual(str(found[0]["name"][0]), found[0].dn.get_rdn_value())

    # Remove the children by GUID on DC1 and push the deletions to DC2
    for child_guid in (ou1_child, ou2_child):
        self.ldb_dc1.delete('<GUID=%s>' % child_guid)

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

    # Everything must be deleted, first on DC1 and then on DC2
    for samdb in (self.ldb_dc1, self.ldb_dc2):
        for guid in (self.ou1, self.ou2, ou1_child, ou2_child):
            self._check_deleted(samdb, guid)
def test_ReplRenames(self):
    """Tests that renames replicate correctly in both directions

    Children are renamed and moved between parents on DC1 (including a
    case-only rename) while the parents themselves are renamed on DC2.
    After replicating DC1 -> DC2 and then DC2 -> DC1, both DCs must
    show the same final names and DNs.
    """
    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create two OUs on DC2
    self.ou1 = self._create_ou(self.ldb_dc2, "OU=Original parent")
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Original parent 2")

    # replicate them from DC2 to DC1
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Create children on DC1
    ou1_child = self._create_ou(self.ldb_dc1, "OU=Test Child,OU=Original parent")
    ou2_child = self._create_ou(self.ldb_dc1, "OU=Test Child 2,OU=Original parent")
    ou3_child = self._create_ou(self.ldb_dc1, "OU=Test Case Child,OU=Original parent")

    # replicate them from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

    # Shuffle the children on DC1 (ou2_child goes via a temporary name
    # so that ou1_child can take over "Test Child 2") ...
    self.ldb_dc1.rename("<GUID=%s>" % ou2_child, "OU=Test Child 3,OU=Original parent 2,%s" % self.top_ou)
    self.ldb_dc1.rename("<GUID=%s>" % ou1_child, "OU=Test Child 2,OU=Original parent 2,%s" % self.top_ou)
    self.ldb_dc1.rename("<GUID=%s>" % ou2_child, "OU=Test Child,OU=Original parent 2,%s" % self.top_ou)
    self.ldb_dc1.rename("<GUID=%s>" % ou3_child, "OU=Test CASE Child,OU=Original parent,%s" % self.top_ou)
    # ... and rename the parents on DC2
    self.ldb_dc2.rename("<GUID=%s>" % self.ou2, "OU=Original parent 3,%s" % self.top_ou)
    self.ldb_dc2.rename("<GUID=%s>" % self.ou1, "OU=Original parent 2,%s" % self.top_ou)

    # replicate them from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

    # Check the sub-OUs are now under Original Parent 3 (original
    # parent 2 for Test CASE Child), and both have the right names

    # Check that DC2 got the DC1 object, and the renames are all correct
    res1 = self.ldb_dc2.search(base="<GUID=%s>" % ou1_child,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc2.search(base="<GUID=%s>" % ou2_child,
                               scope=SCOPE_BASE, attrs=["name"])
    res3 = self.ldb_dc2.search(base="<GUID=%s>" % ou3_child,
                               scope=SCOPE_BASE, attrs=["name"])
    # str() as done for "name" values in the other tests of this file
    self.assertEqual('Test Child 2', str(res1[0]["name"][0]))
    self.assertEqual('Test Child', str(res2[0]["name"][0]))
    self.assertEqual('Test CASE Child', str(res3[0]["name"][0]))
    self.assertEqual(str(res1[0].dn), "OU=Test Child 2,OU=Original parent 3,%s" % self.top_ou)
    self.assertEqual(str(res2[0].dn), "OU=Test Child,OU=Original parent 3,%s" % self.top_ou)
    self.assertEqual(str(res3[0].dn), "OU=Test CASE Child,OU=Original parent 2,%s" % self.top_ou)

    # replicate them from DC2 to DC1
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Check that DC1 got the DC2 object, and the renames are all correct
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % ou1_child,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % ou2_child,
                               scope=SCOPE_BASE, attrs=["name"])
    res3 = self.ldb_dc1.search(base="<GUID=%s>" % ou3_child,
                               scope=SCOPE_BASE, attrs=["name"])
    self.assertEqual('Test Child 2', str(res1[0]["name"][0]))
    self.assertEqual('Test Child', str(res2[0]["name"][0]))
    self.assertEqual('Test CASE Child', str(res3[0]["name"][0]))
    self.assertEqual(str(res1[0].dn), "OU=Test Child 2,OU=Original parent 3,%s" % self.top_ou)
    self.assertEqual(str(res2[0].dn), "OU=Test Child,OU=Original parent 3,%s" % self.top_ou)
    self.assertEqual(str(res3[0].dn), "OU=Test CASE Child,OU=Original parent 2,%s" % self.top_ou)

    # Delete all objects by GUID on DC1 (children before their parents)
    self.ldb_dc1.delete('<GUID=%s>' % ou1_child)
    self.ldb_dc1.delete('<GUID=%s>' % ou2_child)
    self.ldb_dc1.delete('<GUID=%s>' % ou3_child)
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

    # Check all deleted on DC1, then on DC2
    for samdb in (self.ldb_dc1, self.ldb_dc2):
        self._check_deleted(samdb, self.ou1)
        self._check_deleted(samdb, self.ou2)
        self._check_deleted(samdb, ou1_child)
        self._check_deleted(samdb, ou2_child)
        self._check_deleted(samdb, ou3_child)
def reanimate_object(self, samdb, guid, new_dn):
    """Re-animates a deleted object

    Looks the object up by ``guid`` (deleted objects included), then
    issues one modify that removes isDeleted and replaces
    distinguishedName with ``new_dn``.
    """
    res = samdb.search(base="<GUID=%s>" % guid, attrs=["isDeleted"],
                       controls=['show_deleted:1'], scope=SCOPE_BASE)

    # The modify has to be addressed to the object's current DN
    msg = ldb.Message()
    msg.dn = res[0].dn
    msg["isDeleted"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "isDeleted")
    msg["distinguishedName"] = ldb.MessageElement([new_dn], ldb.FLAG_MOD_REPLACE, "distinguishedName")
    samdb.modify(msg, ["show_deleted:1"])
def test_ReplReanimationConflict(self):
    """
    Checks that if a reanimated object conflicts with a new object, then
    the conflict is resolved correctly.

    The object re-animated on DC1 (self.ou1) is expected to become the
    'CNF:<guid>' entry; the OU created afterwards on DC2 (self.ou2)
    keeps its name.
    """
    # function-scope import keeps this method self-contained
    import time

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # create an object, "accidentally" delete it, and replicate the changes to both DCs
    self.ou1 = self._create_ou(self.ldb_dc2, "OU=Conflict object")
    self.ldb_dc2.delete('<GUID=%s>' % self.ou1)
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Now pretend that the admin for one DC resolves the problem by
    # re-animating the object...
    self.reanimate_object(self.ldb_dc1, self.ou1, "OU=Conflict object,%s" % self.top_ou)

    # ...whereas another admin just creates an OU with the same name
    # again on a different DC
    # Sleep first, as the other conflict tests in this file do, so the
    # new object gets a later timestamp than the re-animation
    # (one second is assumed to be enough - TODO confirm)
    time.sleep(1)
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Conflict object")

    # Now sync the DCs to resolve the conflict
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Check the latest change won and self.ou1 was made into a conflict
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertIn('CNF:%s' % self.ou1, str(res1[0]["name"][0]))
    self.assertNotIn('CNF:%s' % self.ou2, str(res2[0]["name"][0]))

    # Delete both objects by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

    # Check deleted on DC1, then on DC2
    for samdb in (self.ldb_dc1, self.ldb_dc2):
        self._check_deleted(samdb, self.ou1)
        self._check_deleted(samdb, self.ou2)