pytests: heed assertEquals deprecation warning en-masse
[samba.git] / source4 / torture / drs / python / replica_sync.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 #
4 # Tests various schema replication scenarios
5 #
6 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
7 #
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21
22 #
23 # Usage:
24 #  export DC1=dc1_dns_name
25 #  export DC2=dc2_dns_name
26 #  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
27 #  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN replica_sync -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
28 #
29
30 from __future__ import print_function
31 import drs_base
32 import samba.tests
33 import time
34 import ldb
35
36 from ldb import (
37     SCOPE_BASE, LdbError, ERR_NO_SUCH_OBJECT)
38
39
40 class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
41     """Intended as a black box test case for DsReplicaSync
42        implementation. It should test the behavior of this
43        case in cases when inbound replication is disabled"""
44
45     def setUp(self):
46         super(DrsReplicaSyncTestCase, self).setUp()
47
48         # This OU avoids this test conflicting with anything
49         # that may already be in the DB
50         self.top_ou = samba.tests.create_test_ou(self.ldb_dc1,
51                                                  "replica_sync")
52         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
53         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
54         self.ou1 = None
55         self.ou2 = None
56
57     def tearDown(self):
58         self._cleanup_object(self.ou1)
59         self._cleanup_object(self.ou2)
60         self._cleanup_dn(self.top_ou)
61
62         # re-enable replication
63         self._enable_inbound_repl(self.dnsname_dc1)
64         self._enable_inbound_repl(self.dnsname_dc2)
65
66         super(DrsReplicaSyncTestCase, self).tearDown()
67
68     def _cleanup_dn(self, dn):
69         try:
70             self.ldb_dc2.delete(dn, ["tree_delete:1"])
71         except LdbError as e:
72             (num, _) = e.args
73             self.assertEqual(num, ERR_NO_SUCH_OBJECT)
74         try:
75             self.ldb_dc1.delete(dn, ["tree_delete:1"])
76         except LdbError as e1:
77             (num, _) = e1.args
78             self.assertEqual(num, ERR_NO_SUCH_OBJECT)
79
80     def _cleanup_object(self, guid):
81         """Cleans up a test object, if it still exists"""
82         if guid is not None:
83             self._cleanup_dn('<GUID=%s>' % guid)
84
85     def test_ReplEnabled(self):
86         """Tests we can replicate when replication is enabled"""
87         self._enable_inbound_repl(self.dnsname_dc1)
88         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=False)
89
90     def test_ReplDisabled(self):
91         """Tests we cann't replicate when replication is disabled"""
92         self._disable_inbound_repl(self.dnsname_dc1)
93
94         ccache_name = self.get_creds_ccache_name()
95
96         # Tunnel the command line credentials down to the
97         # subcommand to avoid a new kinit
98         cmdline_auth = "--krb5-ccache=%s" % ccache_name
99
100         # bin/samba-tool drs <drs_command> <cmdline_auth>
101         cmd_list = ["drs", "replicate", cmdline_auth]
102
103         nc_dn = self.domain_dn
104         # bin/samba-tool drs replicate <Dest_DC_NAME> <Src_DC_NAME> <Naming Context>
105         cmd_list += [self.dnsname_dc1, self.dnsname_dc2, nc_dn]
106
107         (result, out, err) = self.runsubcmd(*cmd_list)
108         self.assertCmdFail(result)
109         self.assertTrue('WERR_DS_DRA_SINK_DISABLED' in err)
110
111     def test_ReplDisabledForced(self):
112         """Tests we can force replicate when replication is disabled"""
113         self._disable_inbound_repl(self.dnsname_dc1)
114         out = self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
115
116     def test_ReplLocal(self):
117         """Tests we can replicate direct to the local db"""
118         self._enable_inbound_repl(self.dnsname_dc1)
119         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=False, local=True, full_sync=True)
120
121     def _create_ou(self, samdb, name):
122         ldif = """
123 dn: %s,%s
124 objectClass: organizationalUnit
125 """ % (name, self.top_ou)
126         samdb.add_ldif(ldif)
127         res = samdb.search(base="%s,%s" % (name, self.top_ou),
128                            scope=SCOPE_BASE, attrs=["objectGUID"])
129         return self._GUID_string(res[0]["objectGUID"][0])
130
131     def _check_deleted(self, sam_ldb, guid):
132         # search the user by guid as it may be deleted
133         res = sam_ldb.search(base='<GUID=%s>' % guid,
134                              controls=["show_deleted:1"],
135                              attrs=["isDeleted", "objectCategory", "ou"])
136         self.assertEqual(len(res), 1)
137         ou_cur = res[0]
138         # Deleted Object base DN
139         dodn = self._deleted_objects_dn(sam_ldb)
140         # now check properties of the user
141         name_cur = ou_cur["ou"][0]
142         self.assertEqual(ou_cur["isDeleted"][0], b"TRUE")
143         self.assertTrue(not(b"objectCategory" in ou_cur))
144         self.assertTrue(dodn in str(ou_cur["dn"]),
145                         "OU %s is deleted but it is not located under %s!" % (name_cur, dodn))
146
147     def test_ReplConflictsFullSync(self):
148         """Tests that objects created in conflict become conflict DNs (honour full sync override)"""
149
150         # First confirm local replication (so when we test against windows, this fails fast without creating objects)
151         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, local=True, forced=True, full_sync=True)
152
153         self._disable_inbound_repl(self.dnsname_dc1)
154         self._disable_inbound_repl(self.dnsname_dc2)
155
156         # Create conflicting objects on DC1 and DC2, with DC1 object created first
157         self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Full Sync")
158         # We have to sleep to ensure that the two objects have different timestamps
159         time.sleep(1)
160         self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Full Sync")
161
162         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, local=True, forced=True, full_sync=True)
163
164         # Check that DC2 got the DC1 object, and OU1 was make into conflict
165         res1 = self.ldb_dc2.search(base="<GUID=%s>" % self.ou1,
166                                    scope=SCOPE_BASE, attrs=["name"])
167         res2 = self.ldb_dc2.search(base="<GUID=%s>" % self.ou2,
168                                    scope=SCOPE_BASE, attrs=["name"])
169         print(res1[0]["name"][0])
170         print(res2[0]["name"][0])
171         self.assertFalse('CNF:%s' % self.ou2 in str(res2[0]["name"][0]))
172         self.assertTrue('CNF:%s' % self.ou1 in str(res1[0]["name"][0]))
173         self.assertTrue(self._lost_and_found_dn(self.ldb_dc2, self.domain_dn) not in str(res1[0].dn))
174         self.assertTrue(self._lost_and_found_dn(self.ldb_dc2, self.domain_dn) not in str(res2[0].dn))
175         self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
176         self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())
177
178         # Delete both objects by GUID on DC2
179
180         self.ldb_dc2.delete('<GUID=%s>' % self.ou1)
181         self.ldb_dc2.delete('<GUID=%s>' % self.ou2)
182
183         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=True)
184
185         self._check_deleted(self.ldb_dc1, self.ou1)
186         self._check_deleted(self.ldb_dc1, self.ou2)
187         # Check deleted on DC2
188         self._check_deleted(self.ldb_dc2, self.ou1)
189         self._check_deleted(self.ldb_dc2, self.ou2)
190
191     def test_ReplConflictsRemoteWin(self):
192         """Tests that objects created in conflict become conflict DNs"""
193         self._disable_inbound_repl(self.dnsname_dc1)
194         self._disable_inbound_repl(self.dnsname_dc2)
195
196         # Create conflicting objects on DC1 and DC2, with DC1 object created first
197         self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Remote Conflict")
198         # We have to sleep to ensure that the two objects have different timestamps
199         time.sleep(1)
200         self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Remote Conflict")
201
202         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)
203
204         # Check that DC2 got the DC1 object, and OU1 was make into conflict
205         res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
206                                    scope=SCOPE_BASE, attrs=["name"])
207         res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
208                                    scope=SCOPE_BASE, attrs=["name"])
209         print(res1[0]["name"][0])
210         print(res2[0]["name"][0])
211         self.assertTrue('CNF:%s' % self.ou1 in str(res1[0]["name"][0]))
212         self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res1[0].dn))
213         self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res2[0].dn))
214         self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
215         self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())
216
217         # Delete both objects by GUID on DC1
218
219         self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
220         self.ldb_dc1.delete('<GUID=%s>' % self.ou2)
221
222         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)
223
224         self._check_deleted(self.ldb_dc1, self.ou1)
225         self._check_deleted(self.ldb_dc1, self.ou2)
226         # Check deleted on DC2
227         self._check_deleted(self.ldb_dc2, self.ou1)
228         self._check_deleted(self.ldb_dc2, self.ou2)
229
230     def test_ReplConflictsLocalWin(self):
231         """Tests that objects created in conflict become conflict DNs"""
232         self._disable_inbound_repl(self.dnsname_dc1)
233         self._disable_inbound_repl(self.dnsname_dc2)
234
235         # Create conflicting objects on DC1 and DC2, with DC2 object created first
236         self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Local Conflict")
237         # We have to sleep to ensure that the two objects have different timestamps
238         time.sleep(1)
239         self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Local Conflict")
240
241         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)
242
243         # Check that DC2 got the DC1 object, and OU2 was make into conflict
244         res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
245                                    scope=SCOPE_BASE, attrs=["name"])
246         res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
247                                    scope=SCOPE_BASE, attrs=["name"])
248         print(res1[0]["name"][0])
249         print(res2[0]["name"][0])
250         self.assertTrue('CNF:%s' % self.ou2 in str(res2[0]["name"][0]), "Got %s for %s" % (str(res2[0]["name"][0]), self.ou2))
251         self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res1[0].dn))
252         self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res2[0].dn))
253         self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
254         self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())
255
256         # Delete both objects by GUID on DC1
257
258         self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
259         self.ldb_dc1.delete('<GUID=%s>' % self.ou2)
260
261         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)
262
263         self._check_deleted(self.ldb_dc1, self.ou1)
264         self._check_deleted(self.ldb_dc1, self.ou2)
265         # Check deleted on DC2
266         self._check_deleted(self.ldb_dc2, self.ou1)
267         self._check_deleted(self.ldb_dc2, self.ou2)
268
269     def test_ReplConflictsRemoteWin_with_child(self):
270         """Tests that objects created in conflict become conflict DNs"""
271         self._disable_inbound_repl(self.dnsname_dc1)
272         self._disable_inbound_repl(self.dnsname_dc2)
273
274         # Create conflicting objects on DC1 and DC2, with DC1 object created first
275         self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Parent Remote Conflict")
276         # We have to sleep to ensure that the two objects have different timestamps
277         time.sleep(1)
278         self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Parent Remote Conflict")
279         # Create children on DC2
280         ou1_child = self._create_ou(self.ldb_dc1, "OU=Test Child,OU=Test Parent Remote Conflict")
281         ou2_child = self._create_ou(self.ldb_dc2, "OU=Test Child,OU=Test Parent Remote Conflict")
282
283         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)
284
285         # Check that DC2 got the DC1 object, and SELF.OU1 was make into conflict
286         res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
287                                    scope=SCOPE_BASE, attrs=["name"])
288         res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
289                                    scope=SCOPE_BASE, attrs=["name"])
290         print(res1[0]["name"][0])
291         print(res2[0]["name"][0])
292         self.assertTrue('CNF:%s' % self.ou1 in str(res1[0]["name"][0]))
293         self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res1[0].dn))
294         self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res2[0].dn))
295         self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
296         self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())
297
298         # Delete both objects by GUID on DC1
299
300         self.ldb_dc1.delete('<GUID=%s>' % self.ou1, ["tree_delete:1"])
301         self.ldb_dc1.delete('<GUID=%s>' % self.ou2, ["tree_delete:1"])
302
303         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)
304
305         self._check_deleted(self.ldb_dc1, self.ou1)
306         self._check_deleted(self.ldb_dc1, self.ou2)
307         # Check deleted on DC2
308         self._check_deleted(self.ldb_dc2, self.ou1)
309         self._check_deleted(self.ldb_dc2, self.ou2)
310
311         self._check_deleted(self.ldb_dc1, ou1_child)
312         self._check_deleted(self.ldb_dc1, ou2_child)
313         # Check deleted on DC2
314         self._check_deleted(self.ldb_dc2, ou1_child)
315         self._check_deleted(self.ldb_dc2, ou2_child)
316
317     def test_ReplConflictsRenamedVsNewRemoteWin(self):
318         """Tests resolving a DN conflict between a renamed object and a new object"""
319         self._disable_inbound_repl(self.dnsname_dc1)
320         self._disable_inbound_repl(self.dnsname_dc2)
321
322         # Create an OU and rename it on DC1
323         self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Remote Rename Conflict orig")
324         self.ldb_dc1.rename("<GUID=%s>" % self.ou1, "OU=Test Remote Rename Conflict,%s" % self.top_ou)
325
326         # We have to sleep to ensure that the two objects have different timestamps
327         time.sleep(1)
328
329         # create a conflicting object with the same DN on DC2
330         self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Remote Rename Conflict")
331
332         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)
333
334         # Check that DC2 got the DC1 object, and SELF.OU1 was made into conflict
335         res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
336                                    scope=SCOPE_BASE, attrs=["name"])
337         res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
338                                    scope=SCOPE_BASE, attrs=["name"])
339         print(res1[0]["name"][0])
340         print(res2[0]["name"][0])
341         self.assertTrue('CNF:%s' % self.ou1 in str(res1[0]["name"][0]))
342         self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res1[0].dn))
343         self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res2[0].dn))
344         self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
345         self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())
346
347         # Delete both objects by GUID on DC1
348         self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
349         self.ldb_dc1.delete('<GUID=%s>' % self.ou2)
350
351         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)
352
353         self._check_deleted(self.ldb_dc1, self.ou1)
354         self._check_deleted(self.ldb_dc1, self.ou2)
355         # Check deleted on DC2
356         self._check_deleted(self.ldb_dc2, self.ou1)
357         self._check_deleted(self.ldb_dc2, self.ou2)
358
359     def test_ReplConflictsRenamedVsNewLocalWin(self):
360         """Tests resolving a DN conflict between a renamed object and a new object"""
361         self._disable_inbound_repl(self.dnsname_dc1)
362         self._disable_inbound_repl(self.dnsname_dc2)
363
364         # Create conflicting objects on DC1 and DC2, where the DC2 object has been renamed
365         self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Rename Local Conflict orig")
366         self.ldb_dc2.rename("<GUID=%s>" % self.ou2, "OU=Test Rename Local Conflict,%s" % self.top_ou)
367         # We have to sleep to ensure that the two objects have different timestamps
368         time.sleep(1)
369         self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Rename Local Conflict")
370
371         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)
372
373         # Check that DC2 got the DC1 object, and OU2 was made into conflict
374         res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
375                                    scope=SCOPE_BASE, attrs=["name"])
376         res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
377                                    scope=SCOPE_BASE, attrs=["name"])
378         print(res1[0]["name"][0])
379         print(res2[0]["name"][0])
380         self.assertTrue('CNF:%s' % self.ou2 in str(res2[0]["name"][0]))
381         self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res1[0].dn))
382         self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res2[0].dn))
383         self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
384         self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())
385
386         # Delete both objects by GUID on DC1
387         self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
388         self.ldb_dc1.delete('<GUID=%s>' % self.ou2)
389
390         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)
391
392         self._check_deleted(self.ldb_dc1, self.ou1)
393         self._check_deleted(self.ldb_dc1, self.ou2)
394         # Check deleted on DC2
395         self._check_deleted(self.ldb_dc2, self.ou1)
396         self._check_deleted(self.ldb_dc2, self.ou2)
397
398     def test_ReplConflictsRenameRemoteWin(self):
399         """Tests that objects created in conflict become conflict DNs"""
400         self._disable_inbound_repl(self.dnsname_dc1)
401         self._disable_inbound_repl(self.dnsname_dc2)
402
403         # Create conflicting objects on DC1 and DC2, with DC1 object created first
404         self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Remote Rename Conflict")
405         self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Remote Rename Conflict 2")
406
407         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)
408
409         self.ldb_dc1.rename("<GUID=%s>" % self.ou1, "OU=Test Remote Rename Conflict 3,%s" % self.top_ou)
410         # We have to sleep to ensure that the two objects have different timestamps
411         time.sleep(1)
412         self.ldb_dc2.rename("<GUID=%s>" % self.ou2, "OU=Test Remote Rename Conflict 3,%s" % self.top_ou)
413
414         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)
415
416         # Check that DC2 got the DC1 object, and SELF.OU1 was make into conflict
417         res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
418                                    scope=SCOPE_BASE, attrs=["name"])
419         res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
420                                    scope=SCOPE_BASE, attrs=["name"])
421         print(res1[0]["name"][0])
422         print(res2[0]["name"][0])
423         self.assertTrue('CNF:%s' % self.ou1 in str(res1[0]["name"][0]))
424         self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res1[0].dn))
425         self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res2[0].dn))
426         self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
427         self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())
428
429         # Delete both objects by GUID on DC1
430
431         self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
432         self.ldb_dc1.delete('<GUID=%s>' % self.ou2)
433
434         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)
435
436         self._check_deleted(self.ldb_dc1, self.ou1)
437         self._check_deleted(self.ldb_dc1, self.ou2)
438         # Check deleted on DC2
439         self._check_deleted(self.ldb_dc2, self.ou1)
440         self._check_deleted(self.ldb_dc2, self.ou2)
441
442     def test_ReplConflictsRenameRemoteWin_with_child(self):
443         """Tests that objects created in conflict become conflict DNs"""
444         self._disable_inbound_repl(self.dnsname_dc1)
445         self._disable_inbound_repl(self.dnsname_dc2)
446
447         # Create conflicting objects on DC1 and DC2, with DC1 object created first
448         self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Parent Remote Rename Conflict")
449         self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Parent Remote Rename Conflict 2")
450         # Create children on DC2
451         ou1_child = self._create_ou(self.ldb_dc1, "OU=Test Child,OU=Test Parent Remote Rename Conflict")
452         ou2_child = self._create_ou(self.ldb_dc2, "OU=Test Child,OU=Test Parent Remote Rename Conflict 2")
453
454         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)
455
456         self.ldb_dc1.rename("<GUID=%s>" % self.ou1, "OU=Test Parent Remote Rename Conflict 3,%s" % self.top_ou)
457         # We have to sleep to ensure that the two objects have different timestamps
458         time.sleep(1)
459         self.ldb_dc2.rename("<GUID=%s>" % self.ou2, "OU=Test Parent Remote Rename Conflict 3,%s" % self.top_ou)
460
461         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)
462
463         # Check that DC2 got the DC1 object, and SELF.OU1 was make into conflict
464         res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
465                                    scope=SCOPE_BASE, attrs=["name"])
466         res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
467                                    scope=SCOPE_BASE, attrs=["name"])
468         print(res1[0]["name"][0])
469         print(res2[0]["name"][0])
470         self.assertTrue('CNF:%s' % self.ou1 in str(res1[0]["name"][0]))
471         self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res1[0].dn))
472         self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res2[0].dn))
473         self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
474         self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())
475
476         # Delete both objects by GUID on DC1
477
478         self.ldb_dc1.delete('<GUID=%s>' % self.ou1, ["tree_delete:1"])
479         self.ldb_dc1.delete('<GUID=%s>' % self.ou2, ["tree_delete:1"])
480
481         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)
482
483         self._check_deleted(self.ldb_dc1, self.ou1)
484         self._check_deleted(self.ldb_dc1, self.ou2)
485         # Check deleted on DC2
486         self._check_deleted(self.ldb_dc2, self.ou1)
487         self._check_deleted(self.ldb_dc2, self.ou2)
488
489         self._check_deleted(self.ldb_dc1, ou1_child)
490         self._check_deleted(self.ldb_dc1, ou2_child)
491         # Check deleted on DC2
492         self._check_deleted(self.ldb_dc2, ou1_child)
493         self._check_deleted(self.ldb_dc2, ou2_child)
494
495     def test_ReplConflictsRenameLocalWin(self):
496         """Tests that objects created in conflict become conflict DNs"""
497         self._disable_inbound_repl(self.dnsname_dc1)
498         self._disable_inbound_repl(self.dnsname_dc2)
499
500         # Create conflicting objects on DC1 and DC2, with DC1 object created first
501         self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Rename Local Conflict")
502         self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Rename Local Conflict 2")
503
504         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)
505
506         self.ldb_dc2.rename("<GUID=%s>" % self.ou2, "OU=Test Rename Local Conflict 3,%s" % self.top_ou)
507         # We have to sleep to ensure that the two objects have different timestamps
508         time.sleep(1)
509         self.ldb_dc1.rename("<GUID=%s>" % self.ou1, "OU=Test Rename Local Conflict 3,%s" % self.top_ou)
510
511         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)
512
513         # Check that DC2 got the DC1 object, and OU2 was make into conflict
514         res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
515                                    scope=SCOPE_BASE, attrs=["name"])
516         res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
517                                    scope=SCOPE_BASE, attrs=["name"])
518         print(res1[0]["name"][0])
519         print(res2[0]["name"][0])
520         self.assertTrue('CNF:%s' % self.ou2 in str(res2[0]["name"][0]))
521         self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res1[0].dn))
522         self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res2[0].dn))
523         self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
524         self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())
525
526         # Delete both objects by GUID on DC1
527
528         self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
529         self.ldb_dc1.delete('<GUID=%s>' % self.ou2)
530
531         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)
532
533         self._check_deleted(self.ldb_dc1, self.ou1)
534         self._check_deleted(self.ldb_dc1, self.ou2)
535         # Check deleted on DC2
536         self._check_deleted(self.ldb_dc2, self.ou1)
537         self._check_deleted(self.ldb_dc2, self.ou2)
538
539     def test_ReplLostAndFound(self):
540         """Tests that objects created under a OU deleted eleswhere end up in lostAndFound"""
541         self._disable_inbound_repl(self.dnsname_dc1)
542         self._disable_inbound_repl(self.dnsname_dc2)
543
544         # Create two OUs on DC2
545         self.ou1 = self._create_ou(self.ldb_dc2, "OU=Deleted parent")
546         self.ou2 = self._create_ou(self.ldb_dc2, "OU=Deleted parent 2")
547
548         # replicate them from DC2 to DC1
549         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)
550
551         # Delete both objects by GUID on DC1
552
553         self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
554         self.ldb_dc1.delete('<GUID=%s>' % self.ou2)
555
556         # Create children on DC2
557         ou1_child = self._create_ou(self.ldb_dc2, "OU=Test Child,OU=Deleted parent")
558         ou2_child = self._create_ou(self.ldb_dc2, "OU=Test Child,OU=Deleted parent 2")
559
560         # Replicate from DC2
561         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)
562
563         # Check the sub-OUs are now in lostAndFound and the first one is a conflict DN
564
565         # Check that DC2 got the DC1 object, and one or other object was make into conflict
566         res1 = self.ldb_dc1.search(base="<GUID=%s>" % ou1_child,
567                                    scope=SCOPE_BASE, attrs=["name"])
568         res2 = self.ldb_dc1.search(base="<GUID=%s>" % ou2_child,
569                                    scope=SCOPE_BASE, attrs=["name"])
570         print(res1[0]["name"][0])
571         print(res2[0]["name"][0])
572         self.assertTrue('CNF:%s' % ou1_child in str(res1[0]["name"][0]) or 'CNF:%s' % ou2_child in str(res2[0]["name"][0]))
573         self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) in str(res1[0].dn))
574         self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) in str(res2[0].dn))
575         self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
576         self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())
577
578         # Delete all objects by GUID on DC1
579
580         self.ldb_dc1.delete('<GUID=%s>' % ou1_child)
581         self.ldb_dc1.delete('<GUID=%s>' % ou2_child)
582
583         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)
584
585         # Check all deleted on DC1
586         self._check_deleted(self.ldb_dc1, self.ou1)
587         self._check_deleted(self.ldb_dc1, self.ou2)
588         self._check_deleted(self.ldb_dc1, ou1_child)
589         self._check_deleted(self.ldb_dc1, ou2_child)
590         # Check all deleted on DC2
591         self._check_deleted(self.ldb_dc2, self.ou1)
592         self._check_deleted(self.ldb_dc2, self.ou2)
593         self._check_deleted(self.ldb_dc2, ou1_child)
594         self._check_deleted(self.ldb_dc2, ou2_child)
595
596     def test_ReplRenames(self):
597         """Tests that objects created under a OU deleted eleswhere end up in lostAndFound"""
598         self._disable_inbound_repl(self.dnsname_dc1)
599         self._disable_inbound_repl(self.dnsname_dc2)
600
601         # Create two OUs on DC2
602         self.ou1 = self._create_ou(self.ldb_dc2, "OU=Original parent")
603         self.ou2 = self._create_ou(self.ldb_dc2, "OU=Original parent 2")
604
605         # replicate them from DC2 to DC1
606         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)
607
608         # Create children on DC1
609         ou1_child = self._create_ou(self.ldb_dc1, "OU=Test Child,OU=Original parent")
610         ou2_child = self._create_ou(self.ldb_dc1, "OU=Test Child 2,OU=Original parent")
611         ou3_child = self._create_ou(self.ldb_dc1, "OU=Test Case Child,OU=Original parent")
612
613         # replicate them from DC1 to DC2
614         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)
615
616         self.ldb_dc1.rename("<GUID=%s>" % ou2_child, "OU=Test Child 3,OU=Original parent 2,%s" % self.top_ou)
617         self.ldb_dc1.rename("<GUID=%s>" % ou1_child, "OU=Test Child 2,OU=Original parent 2,%s" % self.top_ou)
618         self.ldb_dc1.rename("<GUID=%s>" % ou2_child, "OU=Test Child,OU=Original parent 2,%s" % self.top_ou)
619         self.ldb_dc1.rename("<GUID=%s>" % ou3_child, "OU=Test CASE Child,OU=Original parent,%s" % self.top_ou)
620         self.ldb_dc2.rename("<GUID=%s>" % self.ou2, "OU=Original parent 3,%s" % self.top_ou)
621         self.ldb_dc2.rename("<GUID=%s>" % self.ou1, "OU=Original parent 2,%s" % self.top_ou)
622
623         # replicate them from DC1 to DC2
624         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)
625
626         # Check the sub-OUs are now under Original Parent 3 (original
627         # parent 2 for Test CASE Child), and both have the right names
628
629         # Check that DC2 got the DC1 object, and the renames are all correct
630         res1 = self.ldb_dc2.search(base="<GUID=%s>" % ou1_child,
631                                    scope=SCOPE_BASE, attrs=["name"])
632         res2 = self.ldb_dc2.search(base="<GUID=%s>" % ou2_child,
633                                    scope=SCOPE_BASE, attrs=["name"])
634         res3 = self.ldb_dc2.search(base="<GUID=%s>" % ou3_child,
635                                    scope=SCOPE_BASE, attrs=["name"])
636         print(res1[0].dn)
637         print(res2[0].dn)
638         print(res3[0].dn)
639         self.assertEqual('Test Child 2', str(res1[0]["name"][0]))
640         self.assertEqual('Test Child', str(res2[0]["name"][0]))
641         self.assertEqual('Test CASE Child', str(res3[0]["name"][0]))
642         self.assertEqual(str(res1[0].dn), "OU=Test Child 2,OU=Original parent 3,%s" % self.top_ou)
643         self.assertEqual(str(res2[0].dn), "OU=Test Child,OU=Original parent 3,%s" % self.top_ou)
644         self.assertEqual(str(res3[0].dn), "OU=Test CASE Child,OU=Original parent 2,%s" % self.top_ou)
645
646         # replicate them from DC2 to DC1
647         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)
648
649         # Check that DC1 got the DC2 object, and the renames are all correct
650         res1 = self.ldb_dc1.search(base="<GUID=%s>" % ou1_child,
651                                    scope=SCOPE_BASE, attrs=["name"])
652         res2 = self.ldb_dc1.search(base="<GUID=%s>" % ou2_child,
653                                    scope=SCOPE_BASE, attrs=["name"])
654         res3 = self.ldb_dc1.search(base="<GUID=%s>" % ou3_child,
655                                    scope=SCOPE_BASE, attrs=["name"])
656         print(res1[0].dn)
657         print(res2[0].dn)
658         print(res3[0].dn)
659         self.assertEqual('Test Child 2', str(res1[0]["name"][0]))
660         self.assertEqual('Test Child', str(res2[0]["name"][0]))
661         self.assertEqual('Test CASE Child', str(res3[0]["name"][0]))
662         self.assertEqual(str(res1[0].dn), "OU=Test Child 2,OU=Original parent 3,%s" % self.top_ou)
663         self.assertEqual(str(res2[0].dn), "OU=Test Child,OU=Original parent 3,%s" % self.top_ou)
664         self.assertEqual(str(res3[0].dn), "OU=Test CASE Child,OU=Original parent 2,%s" % self.top_ou)
665
666         # Delete all objects by GUID on DC1
667
668         self.ldb_dc1.delete('<GUID=%s>' % ou1_child)
669         self.ldb_dc1.delete('<GUID=%s>' % ou2_child)
670         self.ldb_dc1.delete('<GUID=%s>' % ou3_child)
671         self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
672         self.ldb_dc1.delete('<GUID=%s>' % self.ou2)
673
674         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)
675
676         # Check all deleted on DC1
677         self._check_deleted(self.ldb_dc1, self.ou1)
678         self._check_deleted(self.ldb_dc1, self.ou2)
679         self._check_deleted(self.ldb_dc1, ou1_child)
680         self._check_deleted(self.ldb_dc1, ou2_child)
681         self._check_deleted(self.ldb_dc1, ou3_child)
682         # Check all deleted on DC2
683         self._check_deleted(self.ldb_dc2, self.ou1)
684         self._check_deleted(self.ldb_dc2, self.ou2)
685         self._check_deleted(self.ldb_dc2, ou1_child)
686         self._check_deleted(self.ldb_dc2, ou2_child)
687         self._check_deleted(self.ldb_dc2, ou3_child)
688
689     def reanimate_object(self, samdb, guid, new_dn):
690         """Re-animates a deleted object"""
691         res = samdb.search(base="<GUID=%s>" % guid, attrs=["isDeleted"],
692                            controls=['show_deleted:1'], scope=SCOPE_BASE)
693         if len(res) != 1:
694             return
695
696         msg = ldb.Message()
697         msg.dn = res[0].dn
698         msg["isDeleted"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "isDeleted")
699         msg["distinguishedName"] = ldb.MessageElement([new_dn], ldb.FLAG_MOD_REPLACE, "distinguishedName")
700         samdb.modify(msg, ["show_deleted:1"])
701
702     def test_ReplReanimationConflict(self):
703         """
704         Checks that if a reanimated object conflicts with a new object, then
705         the conflict is resolved correctly.
706         """
707
708         self._disable_inbound_repl(self.dnsname_dc1)
709         self._disable_inbound_repl(self.dnsname_dc2)
710
711         # create an object, "accidentally" delete it, and replicate the changes to both DCs
712         self.ou1 = self._create_ou(self.ldb_dc2, "OU=Conflict object")
713         self.ldb_dc2.delete('<GUID=%s>' % self.ou1)
714         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)
715
716         # Now pretend that the admin for one DC resolves the problem by
717         # re-animating the object...
718         self.reanimate_object(self.ldb_dc1, self.ou1, "OU=Conflict object,%s" % self.top_ou)
719
720         # ...whereas another admin just creates a user with the same name
721         # again on a different DC
722         time.sleep(1)
723         self.ou2 = self._create_ou(self.ldb_dc2, "OU=Conflict object")
724
725         # Now sync the DCs to resolve the conflict
726         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)
727
728         # Check the latest change won and SELF.OU1 was made into a conflict
729         res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
730                                    scope=SCOPE_BASE, attrs=["name"])
731         res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
732                                    scope=SCOPE_BASE, attrs=["name"])
733         print(res1[0]["name"][0])
734         print(res2[0]["name"][0])
735         self.assertTrue('CNF:%s' % self.ou1 in str(res1[0]["name"][0]))
736         self.assertFalse('CNF:%s' % self.ou2 in str(res2[0]["name"][0]))
737
738         # Delete both objects by GUID on DC1
739         self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
740         self.ldb_dc1.delete('<GUID=%s>' % self.ou2)
741
742         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)
743
744         self._check_deleted(self.ldb_dc1, self.ou1)
745         self._check_deleted(self.ldb_dc1, self.ou2)
746         # Check deleted on DC2
747         self._check_deleted(self.ldb_dc2, self.ou1)
748         self._check_deleted(self.ldb_dc2, self.ou2)