2 # -*- coding: utf-8 -*-
4 # Tests various schema replication scenarios
6 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 # export DC1=dc1_dns_name
25 # export DC2=dc2_dns_name
26 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
27 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN replica_sync -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
36 SCOPE_BASE, LdbError, ERR_NO_SUCH_OBJECT)
class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
    """Black box test case for the DsReplicaSync implementation.

    Exercises replication between two DCs with inbound replication
    enabled and disabled, including the handling of conflicting objects.
    """

    def setUp(self):
        """Replicate in both directions and reset the per-test OU handles."""
        super(DrsReplicaSyncTestCase, self).setUp()
        # Forced replication in both directions before every test.
        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
        # GUIDs of the OUs created by a test; tearDown() removes whatever
        # is set here.  Tests that create no OU leave them as None.
        self.ou1 = None
        self.ou2 = None

    def tearDown(self):
        """Remove leftover test OUs and re-enable inbound replication."""
        # getattr() keeps the cleanup working even if setUp() failed before
        # the attributes were assigned.
        for guid in (getattr(self, "ou1", None), getattr(self, "ou2", None)):
            if guid is not None:
                self._cleanup_object(guid)

        # re-enable replication
        self._enable_inbound_repl(self.dnsname_dc1)
        self._enable_inbound_repl(self.dnsname_dc2)

        super(DrsReplicaSyncTestCase, self).tearDown()
def _cleanup_object(self, guid):
    """Cleans up a test object on both DCs, if it still exists.

    :param guid: GUID string of the object to delete; None is ignored.

    The object is deleted together with its subtree (tree_delete control).
    An object that is already gone (ERR_NO_SUCH_OBJECT) is accepted; any
    other LDB error code fails the test.
    """
    if guid is None:
        return

    # Same order as before: DC2 first, then DC1.
    for samdb in (self.ldb_dc2, self.ldb_dc1):
        try:
            samdb.delete('<GUID=%s>' % guid, ["tree_delete:1"])
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_NO_SUCH_OBJECT)
def test_ReplEnabled(self):
    """Unforced replication DC2 -> DC1 works once inbound replication is enabled on DC1"""
    destination = self.dnsname_dc1
    self._enable_inbound_repl(destination)
    self._net_drs_replicate(DC=destination,
                            fromDC=self.dnsname_dc2,
                            forced=False)
def test_ReplDisabled(self):
    """Tests we can't replicate when replication is disabled"""
    self._disable_inbound_repl(self.dnsname_dc1)

    ccache_name = self.get_creds_ccache_name()

    # Tunnel the command line credentials down to the
    # subcommand to avoid a new kinit
    cmdline_auth = "--krb5-ccache=%s" % ccache_name

    # bin/samba-tool drs replicate <cmdline_auth> <Dest_DC_NAME> <Src_DC_NAME> <Naming Context>
    cmd_list = ["drs", "replicate", cmdline_auth,
                self.dnsname_dc1, self.dnsname_dc2, self.domain_dn]

    (result, out, err) = self.runsubcmd(*cmd_list)
    self.assertCmdFail(result)
    # Show the actual stderr on failure, otherwise the assertion only
    # reports "False is not true".
    self.assertTrue('WERR_DS_DRA_SINK_DISABLED' in err,
                    "Expected WERR_DS_DRA_SINK_DISABLED in stderr, got: %s" % err)
def test_ReplDisabledForced(self):
    """Tests we can force replicate when replication is disabled"""
    self._disable_inbound_repl(self.dnsname_dc1)
    # The return value is not inspected; the call itself is the test.
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
def test_ReplLocal(self):
    """A full, unforced sync DC2 -> DC1 written straight into the local db works"""
    self._enable_inbound_repl(self.dnsname_dc1)
    replicate_options = dict(forced=False, local=True, full_sync=True)
    self._net_drs_replicate(DC=self.dnsname_dc1,
                            fromDC=self.dnsname_dc2,
                            **replicate_options)
def _create_ou(self, samdb, name):
    """Create an organizationalUnit below the domain DN and return its GUID.

    :param samdb: database connection the OU is added to
    :param name: DN of the OU relative to the domain DN, e.g. "OU=Test"
    :return: the new object's objectGUID, converted by self._GUID_string()
    """
    ou_dn = "%s,%s" % (name, self.domain_dn)
    ldif = """
dn: %s
objectClass: organizationalUnit
""" % ou_dn
    samdb.add_ldif(ldif)
    # Read the object back to learn the GUID that was assigned to it.
    res = samdb.search(base=ou_dn,
                       scope=SCOPE_BASE, attrs=["objectGUID"])
    return self._GUID_string(res[0]["objectGUID"][0])
def _check_deleted(self, sam_ldb, guid):
    """Assert that the OU identified by *guid* is deleted on *sam_ldb*.

    :param sam_ldb: database connection to check
    :param guid: GUID string of the object

    Checks that exactly one object is found, that isDeleted is "TRUE",
    that objectCategory is absent and that the object's DN contains the
    Deleted Objects DN.
    """
    # search the OU by guid (with show_deleted) as it may be deleted
    res = sam_ldb.search(base='<GUID=%s>' % guid,
                         controls=["show_deleted:1"],
                         attrs=["isDeleted", "objectCategory", "ou"])
    self.assertEqual(len(res), 1)
    ou_cur = res[0]
    # Deleted Object base DN
    dodn = self._deleted_objects_dn(sam_ldb)
    # now check properties of the OU
    name_cur = ou_cur["ou"][0]
    # str() matches how attribute values are compared elsewhere in this file
    self.assertEqual(str(ou_cur["isDeleted"][0]), "TRUE")
    self.assertTrue(not("objectCategory" in ou_cur))
    self.assertTrue(dodn in str(ou_cur["dn"]),
                    "OU %s is deleted but it is not located under %s!" % (name_cur, dodn))
def test_ReplConflictsFullSync(self):
    """Tests that objects created in conflict become conflict DNs (honour full sync override)"""
    import time

    # First confirm local replication (so when we test against windows, this fails fast without creating objects)
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, local=True, forced=True, full_sync=True)

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create conflicting objects on DC1 and DC2, with DC1 object created first
    self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Full Sync")
    # We have to sleep to ensure that the two objects have different timestamps
    time.sleep(1)
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Full Sync")

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, local=True, forced=True, full_sync=True)

    # Check that DC2 got the DC1 object, and OU1 was made into a conflict
    res1 = self.ldb_dc2.search(base="<GUID=%s>" % self.ou1,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc2.search(base="<GUID=%s>" % self.ou2,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertFalse('CNF:%s' % self.ou2 in str(res2[0]["name"][0]))
    self.assertTrue('CNF:%s' % self.ou1 in str(res1[0]["name"][0]))
    # Neither object may have been moved to lostAndFound
    self.assertTrue(self._lost_and_found_dn(self.ldb_dc2, self.domain_dn) not in str(res1[0].dn))
    self.assertTrue(self._lost_and_found_dn(self.ldb_dc2, self.domain_dn) not in str(res2[0].dn))
    # The name attribute must agree with the RDN of the (possibly mangled) DN
    self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
    self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

    # Delete both objects by GUID on DC2
    self.ldb_dc2.delete('<GUID=%s>' % self.ou1)
    self.ldb_dc2.delete('<GUID=%s>' % self.ou2)

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=True)

    # Check deleted on DC1
    self._check_deleted(self.ldb_dc1, self.ou1)
    self._check_deleted(self.ldb_dc1, self.ou2)
    # Check deleted on DC2
    self._check_deleted(self.ldb_dc2, self.ou1)
    self._check_deleted(self.ldb_dc2, self.ou2)
def test_ReplConflictsRemoteWin(self):
    """Tests that objects created in conflict become conflict DNs (later remote object wins)"""
    import time

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create conflicting objects on DC1 and DC2, with DC1 object created first
    self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Remote Conflict")
    # We have to sleep to ensure that the two objects have different timestamps
    time.sleep(1)
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Remote Conflict")

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Check that DC1 got the DC2 object, and OU1 was made into a conflict
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertTrue('CNF:%s' % self.ou1 in str(res1[0]["name"][0]))
    # Neither object may have been moved to lostAndFound
    self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res1[0].dn))
    self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res2[0].dn))
    self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
    self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

    # Delete both objects by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

    # Check deleted on DC1
    self._check_deleted(self.ldb_dc1, self.ou1)
    self._check_deleted(self.ldb_dc1, self.ou2)
    # Check deleted on DC2
    self._check_deleted(self.ldb_dc2, self.ou1)
    self._check_deleted(self.ldb_dc2, self.ou2)
def test_ReplConflictsLocalWin(self):
    """Tests that objects created in conflict become conflict DNs (later local object wins)"""
    import time

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create conflicting objects on DC1 and DC2, with DC2 object created first
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Local Conflict")
    # We have to sleep to ensure that the two objects have different timestamps
    time.sleep(1)
    self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Local Conflict")

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Check that DC1 got the DC2 object, and OU2 was made into a conflict
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertTrue('CNF:%s' % self.ou2 in str(res2[0]["name"][0]), "Got %s for %s" % (str(res2[0]["name"][0]), self.ou2))
    # Neither object may have been moved to lostAndFound
    self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res1[0].dn))
    self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res2[0].dn))
    self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
    self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

    # Delete both objects by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

    # Check deleted on DC1
    self._check_deleted(self.ldb_dc1, self.ou1)
    self._check_deleted(self.ldb_dc1, self.ou2)
    # Check deleted on DC2
    self._check_deleted(self.ldb_dc2, self.ou1)
    self._check_deleted(self.ldb_dc2, self.ou2)
def test_ReplConflictsRemoteWin_with_child(self):
    """Tests that conflicting parent objects with children become conflict DNs"""
    import time

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create conflicting objects on DC1 and DC2, with DC1 object created first
    self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Parent Remote Conflict")
    # We have to sleep to ensure that the two objects have different timestamps
    time.sleep(1)
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Parent Remote Conflict")
    # Create a child under each parent, on DC1 and DC2 respectively
    ou1_child = self._create_ou(self.ldb_dc1, "OU=Test Child,OU=Test Parent Remote Conflict")
    ou2_child = self._create_ou(self.ldb_dc2, "OU=Test Child,OU=Test Parent Remote Conflict")

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Check that DC1 got the DC2 object, and self.ou1 was made into a conflict
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertTrue('CNF:%s' % self.ou1 in str(res1[0]["name"][0]))
    # Neither parent may have been moved to lostAndFound
    self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res1[0].dn))
    self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res2[0].dn))
    self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
    self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

    # Delete both objects (and their children) by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1, ["tree_delete:1"])
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2, ["tree_delete:1"])

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

    # Check parents deleted on DC1
    self._check_deleted(self.ldb_dc1, self.ou1)
    self._check_deleted(self.ldb_dc1, self.ou2)
    # Check parents deleted on DC2
    self._check_deleted(self.ldb_dc2, self.ou1)
    self._check_deleted(self.ldb_dc2, self.ou2)

    # Check children deleted on DC1
    self._check_deleted(self.ldb_dc1, ou1_child)
    self._check_deleted(self.ldb_dc1, ou2_child)
    # Check children deleted on DC2
    self._check_deleted(self.ldb_dc2, ou1_child)
    self._check_deleted(self.ldb_dc2, ou2_child)
def test_ReplConflictsRenamedVsNewRemoteWin(self):
    """Tests resolving a DN conflict between a renamed object and a new object"""
    import time

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create an OU and rename it on DC1
    self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Remote Rename Conflict orig")
    self.ldb_dc1.rename("<GUID=%s>" % self.ou1, "OU=Test Remote Rename Conflict,%s" % self.domain_dn)

    # We have to sleep to ensure that the two objects have different timestamps
    time.sleep(1)

    # create a conflicting object with the same DN on DC2
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Remote Rename Conflict")

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Check that DC1 got the DC2 object, and self.ou1 was made into a conflict
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertTrue('CNF:%s' % self.ou1 in str(res1[0]["name"][0]))
    # Neither object may have been moved to lostAndFound
    self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res1[0].dn))
    self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res2[0].dn))
    self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
    self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

    # Delete both objects by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

    # Check deleted on DC1
    self._check_deleted(self.ldb_dc1, self.ou1)
    self._check_deleted(self.ldb_dc1, self.ou2)
    # Check deleted on DC2
    self._check_deleted(self.ldb_dc2, self.ou1)
    self._check_deleted(self.ldb_dc2, self.ou2)
def test_ReplConflictsRenamedVsNewLocalWin(self):
    """Tests resolving a DN conflict between a renamed object and a new object"""
    import time

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create conflicting objects on DC1 and DC2, where the DC2 object has been renamed
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Rename Local Conflict orig")
    self.ldb_dc2.rename("<GUID=%s>" % self.ou2, "OU=Test Rename Local Conflict,%s" % self.domain_dn)
    # We have to sleep to ensure that the two objects have different timestamps
    time.sleep(1)
    self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Rename Local Conflict")

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Check that DC1 got the DC2 object, and OU2 was made into a conflict
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertTrue('CNF:%s' % self.ou2 in str(res2[0]["name"][0]))
    # Neither object may have been moved to lostAndFound
    self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res1[0].dn))
    self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res2[0].dn))
    self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
    self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

    # Delete both objects by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

    # Check deleted on DC1
    self._check_deleted(self.ldb_dc1, self.ou1)
    self._check_deleted(self.ldb_dc1, self.ou2)
    # Check deleted on DC2
    self._check_deleted(self.ldb_dc2, self.ou1)
    self._check_deleted(self.ldb_dc2, self.ou2)
def test_ReplConflictsRenameRemoteWin(self):
    """Tests that objects renamed into conflict become conflict DNs (later remote rename wins)"""
    import time

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create two differently named objects, one on DC1 and one on DC2
    self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Remote Rename Conflict")
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Remote Rename Conflict 2")

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Rename both to the same DN, DC1 first
    self.ldb_dc1.rename("<GUID=%s>" % self.ou1, "OU=Test Remote Rename Conflict 3,%s" % self.domain_dn)
    # We have to sleep to ensure that the two objects have different timestamps
    time.sleep(1)
    self.ldb_dc2.rename("<GUID=%s>" % self.ou2, "OU=Test Remote Rename Conflict 3,%s" % self.domain_dn)

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Check that DC1 got the DC2 rename, and self.ou1 was made into a conflict
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertTrue('CNF:%s' % self.ou1 in str(res1[0]["name"][0]))
    # Neither object may have been moved to lostAndFound
    self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res1[0].dn))
    self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res2[0].dn))
    self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
    self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

    # Delete both objects by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

    # Check deleted on DC1
    self._check_deleted(self.ldb_dc1, self.ou1)
    self._check_deleted(self.ldb_dc1, self.ou2)
    # Check deleted on DC2
    self._check_deleted(self.ldb_dc2, self.ou1)
    self._check_deleted(self.ldb_dc2, self.ou2)
def test_ReplConflictsRenameRemoteWin_with_child(self):
    """Tests that parents with children renamed into conflict become conflict DNs"""
    import time

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create two differently named parent objects, one on DC1 and one on DC2
    self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Parent Remote Rename Conflict")
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Parent Remote Rename Conflict 2")
    # Create a child under each parent, on DC1 and DC2 respectively
    ou1_child = self._create_ou(self.ldb_dc1, "OU=Test Child,OU=Test Parent Remote Rename Conflict")
    ou2_child = self._create_ou(self.ldb_dc2, "OU=Test Child,OU=Test Parent Remote Rename Conflict 2")

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Rename both parents to the same DN, DC1 first
    self.ldb_dc1.rename("<GUID=%s>" % self.ou1, "OU=Test Parent Remote Rename Conflict 3,%s" % self.domain_dn)
    # We have to sleep to ensure that the two objects have different timestamps
    time.sleep(1)
    self.ldb_dc2.rename("<GUID=%s>" % self.ou2, "OU=Test Parent Remote Rename Conflict 3,%s" % self.domain_dn)

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Check that DC1 got the DC2 rename, and self.ou1 was made into a conflict
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertTrue('CNF:%s' % self.ou1 in str(res1[0]["name"][0]))
    # Neither parent may have been moved to lostAndFound
    self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res1[0].dn))
    self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res2[0].dn))
    self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
    self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

    # Delete both objects (and their children) by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1, ["tree_delete:1"])
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2, ["tree_delete:1"])

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

    # Check parents deleted on DC1
    self._check_deleted(self.ldb_dc1, self.ou1)
    self._check_deleted(self.ldb_dc1, self.ou2)
    # Check parents deleted on DC2
    self._check_deleted(self.ldb_dc2, self.ou1)
    self._check_deleted(self.ldb_dc2, self.ou2)

    # Check children deleted on DC1
    self._check_deleted(self.ldb_dc1, ou1_child)
    self._check_deleted(self.ldb_dc1, ou2_child)
    # Check children deleted on DC2
    self._check_deleted(self.ldb_dc2, ou1_child)
    self._check_deleted(self.ldb_dc2, ou2_child)
def test_ReplConflictsRenameLocalWin(self):
    """Tests that objects renamed into conflict become conflict DNs (later local rename wins)"""
    import time

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create two differently named objects, one on DC1 and one on DC2
    self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Rename Local Conflict")
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Rename Local Conflict 2")

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Rename both to the same DN, DC2 first
    self.ldb_dc2.rename("<GUID=%s>" % self.ou2, "OU=Test Rename Local Conflict 3,%s" % self.domain_dn)
    # We have to sleep to ensure that the two objects have different timestamps
    time.sleep(1)
    self.ldb_dc1.rename("<GUID=%s>" % self.ou1, "OU=Test Rename Local Conflict 3,%s" % self.domain_dn)

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Check that DC1 got the DC2 rename, and OU2 was made into a conflict
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertTrue('CNF:%s' % self.ou2 in str(res2[0]["name"][0]))
    # Neither object may have been moved to lostAndFound
    self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res1[0].dn))
    self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res2[0].dn))
    self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
    self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

    # Delete both objects by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

    # Check deleted on DC1
    self._check_deleted(self.ldb_dc1, self.ou1)
    self._check_deleted(self.ldb_dc1, self.ou2)
    # Check deleted on DC2
    self._check_deleted(self.ldb_dc2, self.ou1)
    self._check_deleted(self.ldb_dc2, self.ou2)
def test_ReplLostAndFound(self):
    """Tests that objects created under a OU deleted elsewhere end up in lostAndFound"""
    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create two OUs on DC2
    self.ou1 = self._create_ou(self.ldb_dc2, "OU=Deleted parent")
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Deleted parent 2")

    # replicate them from DC2 to DC1
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Delete both objects by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

    # Create children on DC2, which still has both parents
    ou1_child = self._create_ou(self.ldb_dc2, "OU=Test Child,OU=Deleted parent")
    ou2_child = self._create_ou(self.ldb_dc2, "OU=Test Child,OU=Deleted parent 2")

    # Replicate the children from DC2 to DC1, where their parents are deleted
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Check on DC1 that the sub-OUs are now in lostAndFound and that one or
    # other of them (they share the RDN "Test Child") was made into a conflict
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % ou1_child,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % ou2_child,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertTrue('CNF:%s' % ou1_child in str(res1[0]["name"][0]) or 'CNF:%s' % ou2_child in str(res2[0]["name"][0]))
    self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) in str(res1[0].dn))
    self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) in str(res2[0].dn))
    self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
    self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

    # Delete the children by GUID on DC1 (the parents are already deleted there)
    self.ldb_dc1.delete('<GUID=%s>' % ou1_child)
    self.ldb_dc1.delete('<GUID=%s>' % ou2_child)

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

    # Check all deleted on DC1
    self._check_deleted(self.ldb_dc1, self.ou1)
    self._check_deleted(self.ldb_dc1, self.ou2)
    self._check_deleted(self.ldb_dc1, ou1_child)
    self._check_deleted(self.ldb_dc1, ou2_child)
    # Check all deleted on DC2
    self._check_deleted(self.ldb_dc2, self.ou1)
    self._check_deleted(self.ldb_dc2, self.ou2)
    self._check_deleted(self.ldb_dc2, ou1_child)
    self._check_deleted(self.ldb_dc2, ou2_child)
def test_ReplRenames(self):
    """Tests that renames made independently on both DCs replicate correctly.

    Children are renamed and moved on DC1 while their parents are renamed
    on DC2; afterwards both DCs must agree on the final names and DNs.
    """
    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create two OUs on DC2
    self.ou1 = self._create_ou(self.ldb_dc2, "OU=Original parent")
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Original parent 2")

    # replicate them from DC2 to DC1
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Create children on DC1
    ou1_child = self._create_ou(self.ldb_dc1, "OU=Test Child,OU=Original parent")
    ou2_child = self._create_ou(self.ldb_dc1, "OU=Test Child 2,OU=Original parent")
    ou3_child = self._create_ou(self.ldb_dc1, "OU=Test Case Child,OU=Original parent")

    # replicate them from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

    # On DC1: move the first two children under the second parent, swapping
    # their names via a temporary name, and change only the case of the third.
    self.ldb_dc1.rename("<GUID=%s>" % ou2_child, "OU=Test Child 3,OU=Original parent 2,%s" % self.domain_dn)
    self.ldb_dc1.rename("<GUID=%s>" % ou1_child, "OU=Test Child 2,OU=Original parent 2,%s" % self.domain_dn)
    self.ldb_dc1.rename("<GUID=%s>" % ou2_child, "OU=Test Child,OU=Original parent 2,%s" % self.domain_dn)
    self.ldb_dc1.rename("<GUID=%s>" % ou3_child, "OU=Test CASE Child,OU=Original parent,%s" % self.domain_dn)
    # On DC2: rename the parents, so the first parent takes over the second
    # parent's old name.
    self.ldb_dc2.rename("<GUID=%s>" % self.ou2, "OU=Original parent 3,%s" % self.domain_dn)
    self.ldb_dc2.rename("<GUID=%s>" % self.ou1, "OU=Original parent 2,%s" % self.domain_dn)

    def check_renames(samdb):
        # The first two children must be under "Original parent 3" and the
        # third under "Original parent 2", all with their final names.
        res1 = samdb.search(base="<GUID=%s>" % ou1_child,
                            scope=SCOPE_BASE, attrs=["name"])
        res2 = samdb.search(base="<GUID=%s>" % ou2_child,
                            scope=SCOPE_BASE, attrs=["name"])
        res3 = samdb.search(base="<GUID=%s>" % ou3_child,
                            scope=SCOPE_BASE, attrs=["name"])
        # str() matches how name values are compared elsewhere in this file
        self.assertEqual('Test Child 2', str(res1[0]["name"][0]))
        self.assertEqual('Test Child', str(res2[0]["name"][0]))
        self.assertEqual('Test CASE Child', str(res3[0]["name"][0]))
        self.assertEqual(str(res1[0].dn), "OU=Test Child 2,OU=Original parent 3,%s" % self.domain_dn)
        self.assertEqual(str(res2[0].dn), "OU=Test Child,OU=Original parent 3,%s" % self.domain_dn)
        self.assertEqual(str(res3[0].dn), "OU=Test CASE Child,OU=Original parent 2,%s" % self.domain_dn)

    # replicate the child renames from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

    # Check that DC2 got the DC1 changes, and the renames are all correct
    check_renames(self.ldb_dc2)

    # replicate the parent renames from DC2 to DC1
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Check that DC1 got the DC2 changes, and the renames are all correct
    check_renames(self.ldb_dc1)

    # Delete all objects by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % ou1_child)
    self.ldb_dc1.delete('<GUID=%s>' % ou2_child)
    self.ldb_dc1.delete('<GUID=%s>' % ou3_child)
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

    # Check all deleted on DC1
    self._check_deleted(self.ldb_dc1, self.ou1)
    self._check_deleted(self.ldb_dc1, self.ou2)
    self._check_deleted(self.ldb_dc1, ou1_child)
    self._check_deleted(self.ldb_dc1, ou2_child)
    self._check_deleted(self.ldb_dc1, ou3_child)
    # Check all deleted on DC2
    self._check_deleted(self.ldb_dc2, self.ou1)
    self._check_deleted(self.ldb_dc2, self.ou2)
    self._check_deleted(self.ldb_dc2, ou1_child)
    self._check_deleted(self.ldb_dc2, ou2_child)
    self._check_deleted(self.ldb_dc2, ou3_child)
def reanimate_object(self, samdb, guid, new_dn):
    """Re-animates a deleted object.

    :param samdb: database connection holding the deleted object
    :param guid: GUID string of the deleted object
    :param new_dn: DN the object gets once it is re-animated
    """
    res = samdb.search(base="<GUID=%s>" % guid, attrs=["isDeleted"],
                       controls=['show_deleted:1'], scope=SCOPE_BASE)
    self.assertEqual(len(res), 1,
                     "Expected exactly one object for GUID %s" % guid)

    # One modify that removes isDeleted and sets the new DN; show_deleted
    # is needed so the modify can address the deleted object.
    msg = ldb.Message()
    msg.dn = res[0].dn
    msg["isDeleted"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "isDeleted")
    msg["distinguishedName"] = ldb.MessageElement([new_dn], ldb.FLAG_MOD_REPLACE, "distinguishedName")
    samdb.modify(msg, ["show_deleted:1"])
def test_ReplReanimationConflict(self):
    """
    Checks that if a reanimated object conflicts with a new object, then
    the conflict is resolved correctly.
    """
    import time

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # create an object, "accidentally" delete it, and replicate the changes to both DCs
    self.ou1 = self._create_ou(self.ldb_dc2, "OU=Conflict object")
    self.ldb_dc2.delete('<GUID=%s>' % self.ou1)
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Now pretend that the admin for one DC resolves the problem by
    # re-animating the object...
    self.reanimate_object(self.ldb_dc1, self.ou1, "OU=Conflict object,%s" % self.domain_dn)

    # ...whereas another admin just creates an object with the same name
    # again on a different DC.  Sleep first so that the new object gets a
    # later timestamp than the re-animation.
    time.sleep(1)
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Conflict object")

    # Now sync the DCs to resolve the conflict
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

    # Check the latest change won and self.ou1 was made into a conflict
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertTrue('CNF:%s' % self.ou1 in str(res1[0]["name"][0]))
    self.assertFalse('CNF:%s' % self.ou2 in str(res2[0]["name"][0]))

    # Delete both objects by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

    # Check deleted on DC1
    self._check_deleted(self.ldb_dc1, self.ou1)
    self._check_deleted(self.ldb_dc1, self.ou2)
    # Check deleted on DC2
    self._check_deleted(self.ldb_dc2, self.ou1)
    self._check_deleted(self.ldb_dc2, self.ou2)