selftest: Disable all replication during most replication tests
[samba.git] / source4 / torture / drs / python / delete_object.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 #
4 # Unix SMB/CIFS implementation.
5 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2010
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 #
20
21 #
22 # Usage:
23 #  export DC1=dc1_dns_name
24 #  export DC2=dc2_dns_name
25 #  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
26 #  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN delete_object -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
27 #
28
29 import time
30
31
32 from ldb import (
33     SCOPE_SUBTREE,
34     )
35
36 import drs_base, ldb
37
38
39 class DrsDeleteObjectTestCase(drs_base.DrsBaseTestCase):
40
41     def setUp(self):
42         super(DrsDeleteObjectTestCase, self).setUp()
43         # disable automatic replication temporary
44         self._disable_all_repl(self.dnsname_dc1)
45         self._disable_all_repl(self.dnsname_dc2)
46         # make sure DCs are synchronized before the test
47         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
48         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
49
50     def tearDown(self):
51         self._enable_all_repl(self.dnsname_dc1)
52         self._enable_all_repl(self.dnsname_dc2)
53         super(DrsDeleteObjectTestCase, self).tearDown()
54
55     def _make_username(self):
56         return "DrsDelObjUser_" + time.strftime("%s", time.gmtime())
57
58     # now also used to check the group
59     def _check_obj(self, sam_ldb, obj_orig, is_deleted):
60         # search the user by guid as it may be deleted
61         guid_str = self._GUID_string(obj_orig["objectGUID"][0])
62         expression = "(objectGUID=%s)" % guid_str
63         res = sam_ldb.search(base=self.domain_dn,
64                              expression=expression,
65                              controls=["show_deleted:1"])
66         self.assertEquals(len(res), 1)
67         user_cur = res[0]
68         # Deleted Object base DN
69         dodn = self._deleted_objects_dn(sam_ldb)
70         # now check properties of the user
71         cn_orig = obj_orig["cn"][0]
72         cn_cur  = user_cur["cn"][0]
73         name_orig = obj_orig["name"][0]
74         name_cur  = user_cur["name"][0]
75         if is_deleted:
76             self.assertEquals(user_cur["isDeleted"][0],"TRUE")
77             self.assertFalse("objectCategory" in user_cur)
78             self.assertFalse("sAMAccountType" in user_cur)
79             self.assertFalse("description" in user_cur)
80             self.assertFalse("memberOf" in user_cur)
81             self.assertFalse("member" in user_cur)
82             self.assertTrue(dodn in str(user_cur["dn"]),
83                             "User %s is deleted but it is not located under %s (found at %s)!" % (name_orig, dodn, user_cur["dn"]))
84             self.assertEquals(name_cur, name_orig + "\nDEL:" + guid_str)
85             self.assertEquals(name_cur, user_cur.dn.get_rdn_value())
86             self.assertEquals(cn_cur, cn_orig + "\nDEL:" + guid_str)
87             self.assertEquals(name_cur, cn_cur)
88         else:
89             self.assertFalse("isDeleted" in user_cur)
90             self.assertEquals(name_cur, name_orig)
91             self.assertEquals(name_cur, user_cur.dn.get_rdn_value())
92             self.assertEquals(cn_cur, cn_orig)
93             self.assertEquals(name_cur, cn_cur)
94             self.assertEquals(obj_orig["dn"], user_cur["dn"])
95             self.assertTrue(dodn not in str(user_cur["dn"]))
96         return user_cur
97
98     def test_ReplicateDeletedObject1(self):
99         """Verifies how a deleted-object is replicated between two DCs.
100            This test should verify that:
101             - deleted-object is replicated properly
102             - We verify that after replication,
103               object's state to conform to a tombstone-object state
104             - This test replicates the object modifications to
105               the server with the user deleted first
106
107            TODO:  It will also be great if check replPropertyMetaData.
108            TODO:  Check for deleted-object state, depending on DC's features
109                   when recycle-bin is enabled
110            """
111         # work-out unique username to test with
112         username = self._make_username()
113
114         # create user on DC1
115         self.ldb_dc1.newuser(username=username, password="P@sswOrd!")
116         ldb_res = self.ldb_dc1.search(base=self.domain_dn,
117                                       scope=SCOPE_SUBTREE,
118                                       expression="(samAccountName=%s)" % username)
119         self.assertEquals(len(ldb_res), 1)
120         user_orig = ldb_res[0]
121         user_dn   = ldb_res[0]["dn"]
122
123         # check user info on DC1
124         print "Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0]))
125         self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False)
126
127         # trigger replication from DC1 to DC2
128         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
129
130         # delete user on DC1
131         self.ldb_dc1.delete(user_dn)
132         # check user info on DC1 - should be deleted
133         self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=True)
134         # check user info on DC2 - should be valid user
135         user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=False)
136
137         # The user should not have a description or memberOf yet
138         self.assertFalse("description" in user_cur)
139         self.assertFalse("memberOf" in user_cur)
140
141         self.ldb_dc2.newgroup("group_%s" % username)
142
143         self.ldb_dc2.newgroup("group2_%s" % username)
144
145         ldb_res = self.ldb_dc2.search(base=self.domain_dn,
146                                       scope=SCOPE_SUBTREE,
147                                       expression="(samAccountName=group_%s)" % username)
148         self.assertTrue(len(ldb_res) == 1)
149         self.assertTrue("sAMAccountName" in ldb_res[0])
150         group_orig = ldb_res[0]
151         group_dn = ldb_res[0]["dn"]
152
153         # modify user on DC2 to have a description and be a member of the group
154         m = ldb.Message()
155         m.dn = user_dn
156         m["description"] = ldb.MessageElement("a description",
157                                               ldb.FLAG_MOD_ADD, "description")
158         self.ldb_dc2.modify(m)
159         m = ldb.Message()
160         m.dn = group_dn
161         m["member"] = ldb.MessageElement(str(user_dn),
162                                            ldb.FLAG_MOD_ADD, "member")
163         self.ldb_dc2.modify(m)
164
165         ldb_res = self.ldb_dc2.search(base=self.domain_dn,
166                                       scope=SCOPE_SUBTREE,
167                                       expression="(samAccountName=group2_%s)" % username)
168         self.assertTrue(len(ldb_res) == 1)
169         self.assertTrue("sAMAccountName" in ldb_res[0])
170         group2_dn = ldb_res[0]["dn"]
171         group2_orig = ldb_res[0]
172
173         m = ldb.Message()
174         m.dn = group2_dn
175         m["member"] = ldb.MessageElement(str(group_dn),
176                                          ldb.FLAG_MOD_ADD, "member")
177         self.ldb_dc2.modify(m)
178
179         # check user info on DC2 - should be valid user
180         user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=False)
181
182         # The user should not have a description yet
183         self.assertTrue("description" in user_cur)
184         self.assertTrue("memberOf" in user_cur)
185
186         ldb_res = self.ldb_dc2.search(base=self.domain_dn,
187                                       scope=SCOPE_SUBTREE,
188                                       expression="(samAccountName=group_%s)" % username)
189         self.assertTrue(len(ldb_res) == 1)
190
191         # This group is a member of another group
192         self.assertTrue("memberOf" in ldb_res[0])
193
194         # The user was deleted on DC1, but check the modify we just did on DC2
195         self.assertTrue("member" in ldb_res[0])
196
197         # trigger replication from DC2 to DC1
198         # to check if deleted object gets restored
199         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
200         # check user info on DC1 - should be deleted
201         self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=True)
202         # check user info on DC2 - should be valid user
203         self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=False)
204
205         ldb_res = self.ldb_dc1.search(base=self.domain_dn,
206                                       scope=SCOPE_SUBTREE,
207                                       expression="(samAccountName=group_%s)" % username)
208         self.assertTrue(len(ldb_res) == 1)
209
210         # This group is a member of another group
211         self.assertTrue("memberOf" in ldb_res[0])
212
213         # The user was deleted on DC1, but the modify we did on DC2, check it never replicated in
214         self.assertFalse("member" in ldb_res[0])
215
216         # trigger replication from DC1 to DC2
217         # to check if deleted object is replicated
218         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
219         # check user info on DC1 - should be deleted
220         self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=True)
221         # check user info on DC2 - should be deleted
222         self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=True)
223
224         # delete group on DC1
225         self.ldb_dc1.delete(group_dn)
226
227         # trigger replication from DC1 to DC2
228         # to check if deleted object is replicated
229         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
230
231         # check group info on DC1 - should be deleted
232         self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=group_orig, is_deleted=True)
233         # check group info on DC2 - should be deleted
234         self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=group_orig, is_deleted=True)
235
236         ldb_res = self.ldb_dc2.search(base=self.domain_dn,
237                                       scope=SCOPE_SUBTREE,
238                                       expression="(samAccountName=group2_%s)" % username)
239         self.assertTrue(len(ldb_res) == 1)
240         self.assertFalse("member" in ldb_res[0])
241
242         # delete group on DC1
243         self.ldb_dc1.delete(group2_dn)
244
245         # trigger replication from DC1 to DC2
246         # to check if deleted object is replicated
247         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
248
249         # check group info on DC1 - should be deleted
250         self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=group2_orig, is_deleted=True)
251         # check group info on DC2 - should be deleted
252         self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=group2_orig, is_deleted=True)
253
254     def test_ReplicateDeletedObject2(self):
255         """Verifies how a deleted-object is replicated between two DCs.
256            This test should verify that:
257             - deleted-object is replicated properly
258             - We verify that after replication,
259               object's state to conform to a tombstone-object state
260             - This test replicates the delete to the server with the
261               object modifications first
262
263            TODO:  It will also be great if check replPropertyMetaData.
264            TODO:  Check for deleted-object state, depending on DC's features
265                   when recycle-bin is enabled
266            """
267         # work-out unique username to test with
268         username = self._make_username()
269
270         # create user on DC1
271         self.ldb_dc1.newuser(username=username, password="P@sswOrd!")
272         ldb_res = self.ldb_dc1.search(base=self.domain_dn,
273                                       scope=SCOPE_SUBTREE,
274                                       expression="(samAccountName=%s)" % username)
275         self.assertEquals(len(ldb_res), 1)
276         user_orig = ldb_res[0]
277         user_dn   = ldb_res[0]["dn"]
278
279         # check user info on DC1
280         print "Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0]))
281         self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False)
282
283         # trigger replication from DC1 to DC2
284         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
285
286         # delete user on DC1
287         self.ldb_dc1.delete(user_dn)
288         # check user info on DC1 - should be deleted
289         self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=True)
290         # check user info on DC2 - should be valid user
291         user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=False)
292
293         # The user should not have a description or memberOf yet
294         self.assertFalse("description" in user_cur)
295         self.assertFalse("memberOf" in user_cur)
296
297         self.ldb_dc2.newgroup("group_%s" % username)
298
299         self.ldb_dc2.newgroup("group2_%s" % username)
300
301         ldb_res = self.ldb_dc2.search(base=self.domain_dn,
302                                       scope=SCOPE_SUBTREE,
303                                       expression="(samAccountName=group_%s)" % username)
304         self.assertTrue(len(ldb_res) == 1)
305         self.assertTrue("sAMAccountName" in ldb_res[0])
306         group_orig = ldb_res[0]
307         group_dn = ldb_res[0]["dn"]
308
309         # modify user on DC2 to have a description and be a member of the group
310         m = ldb.Message()
311         m.dn = user_dn
312         m["description"] = ldb.MessageElement("a description",
313                                               ldb.FLAG_MOD_ADD, "description")
314         self.ldb_dc2.modify(m)
315         m = ldb.Message()
316         m.dn = group_dn
317         m["member"] = ldb.MessageElement(str(user_dn),
318                                            ldb.FLAG_MOD_ADD, "member")
319         self.ldb_dc2.modify(m)
320
321         ldb_res = self.ldb_dc2.search(base=self.domain_dn,
322                                       scope=SCOPE_SUBTREE,
323                                       expression="(samAccountName=group2_%s)" % username)
324         self.assertTrue(len(ldb_res) == 1)
325         self.assertTrue("sAMAccountName" in ldb_res[0])
326         group2_dn = ldb_res[0]["dn"]
327         group2_orig = ldb_res[0]
328
329         m = ldb.Message()
330         m.dn = group2_dn
331         m["member"] = ldb.MessageElement(str(group_dn),
332                                          ldb.FLAG_MOD_ADD, "member")
333         self.ldb_dc2.modify(m)
334
335         # check user info on DC2 - should be valid user
336         user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=False)
337
338         # The user should not have a description yet
339         self.assertTrue("description" in user_cur)
340         self.assertTrue("memberOf" in user_cur)
341
342         # trigger replication from DC1 to DC2
343         # to check if deleted object gets restored
344         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
345         # check user info on DC1 - should be deleted
346         self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=True)
347         # check user info on DC2 - should be deleted
348         self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=True)
349
350         ldb_res = self.ldb_dc2.search(base=self.domain_dn,
351                                       scope=SCOPE_SUBTREE,
352                                       expression="(samAccountName=group_%s)" % username)
353         self.assertTrue(len(ldb_res) == 1)
354         self.assertTrue("memberOf" in ldb_res[0])
355         self.assertFalse("member" in ldb_res[0])
356
357         # trigger replication from DC2 to DC1
358         # to check if deleted object is replicated
359         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
360         # check user info on DC1 - should be deleted
361         self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=True)
362         # check user info on DC2 - should be deleted
363         self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=True)
364
365         ldb_res = self.ldb_dc1.search(base=self.domain_dn,
366                                       scope=SCOPE_SUBTREE,
367                                       expression="(samAccountName=group_%s)" % username)
368         self.assertTrue(len(ldb_res) == 1)
369         self.assertTrue("memberOf" in ldb_res[0])
370         self.assertFalse("member" in ldb_res[0])
371
372         # delete group on DC1
373         self.ldb_dc1.delete(group_dn)
374         self.ldb_dc1.delete(group2_dn)
375
376         # trigger replication from DC1 to DC2, for cleanup
377         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
378