4cbe2e858d71ad8092a7bfd232cb50bfb5671647
[samba.git] / source4 / torture / drs / python / delete_object.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 #
4 # Unix SMB/CIFS implementation.
5 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2010
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 #
20
21 #
22 # Usage:
23 #  export DC1=dc1_dns_name
24 #  export DC2=dc2_dns_name
25 #  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
26 #  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN delete_object -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
27 #
28
29 from __future__ import print_function
30 import time
31
32
33 from ldb import (
34     SCOPE_SUBTREE,
35 )
36
37 import drs_base, ldb
38
39
40 class DrsDeleteObjectTestCase(drs_base.DrsBaseTestCase):
41
42     def setUp(self):
43         super(DrsDeleteObjectTestCase, self).setUp()
44         # disable automatic replication temporary
45         self._disable_all_repl(self.dnsname_dc1)
46         self._disable_all_repl(self.dnsname_dc2)
47         # make sure DCs are synchronized before the test
48         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
49         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
50
51     def tearDown(self):
52         self._enable_all_repl(self.dnsname_dc1)
53         self._enable_all_repl(self.dnsname_dc2)
54         super(DrsDeleteObjectTestCase, self).tearDown()
55
56     def _make_username(self):
57         return "DrsDelObjUser_" + time.strftime("%s", time.gmtime())
58
59     # now also used to check the group
60     def _check_obj(self, sam_ldb, obj_orig, is_deleted):
61         # search the user by guid as it may be deleted
62         guid_str = self._GUID_string(obj_orig["objectGUID"][0])
63         expression = "(objectGUID=%s)" % guid_str
64         res = sam_ldb.search(base=self.domain_dn,
65                              expression=expression,
66                              controls=["show_deleted:1"])
67         self.assertEquals(len(res), 1)
68         user_cur = res[0]
69         # Deleted Object base DN
70         dodn = self._deleted_objects_dn(sam_ldb)
71         # now check properties of the user
72         cn_orig = obj_orig["cn"][0]
73         cn_cur  = user_cur["cn"][0]
74         name_orig = obj_orig["name"][0]
75         name_cur  = user_cur["name"][0]
76         if is_deleted:
77             self.assertEquals(user_cur["isDeleted"][0],"TRUE")
78             self.assertFalse("objectCategory" in user_cur)
79             self.assertFalse("sAMAccountType" in user_cur)
80             self.assertFalse("description" in user_cur)
81             self.assertFalse("memberOf" in user_cur)
82             self.assertFalse("member" in user_cur)
83             self.assertTrue(dodn in str(user_cur["dn"]),
84                             "User %s is deleted but it is not located under %s (found at %s)!" % (name_orig, dodn, user_cur["dn"]))
85             self.assertEquals(name_cur, name_orig + "\nDEL:" + guid_str)
86             self.assertEquals(name_cur, user_cur.dn.get_rdn_value())
87             self.assertEquals(cn_cur, cn_orig + "\nDEL:" + guid_str)
88             self.assertEquals(name_cur, cn_cur)
89         else:
90             self.assertFalse("isDeleted" in user_cur)
91             self.assertEquals(name_cur, name_orig)
92             self.assertEquals(name_cur, user_cur.dn.get_rdn_value())
93             self.assertEquals(cn_cur, cn_orig)
94             self.assertEquals(name_cur, cn_cur)
95             self.assertEquals(obj_orig["dn"], user_cur["dn"])
96             self.assertTrue(dodn not in str(user_cur["dn"]))
97         return user_cur
98
99     def test_ReplicateDeletedObject1(self):
100         """Verifies how a deleted-object is replicated between two DCs.
101            This test should verify that:
102             - deleted-object is replicated properly
103             - We verify that after replication,
104               object's state to conform to a tombstone-object state
105             - This test replicates the object modifications to
106               the server with the user deleted first
107
108            TODO:  It will also be great if check replPropertyMetaData.
109            TODO:  Check for deleted-object state, depending on DC's features
110                   when recycle-bin is enabled
111            """
112         # work-out unique username to test with
113         username = self._make_username()
114
115         # create user on DC1
116         self.ldb_dc1.newuser(username=username, password="P@sswOrd!")
117         ldb_res = self.ldb_dc1.search(base=self.domain_dn,
118                                       scope=SCOPE_SUBTREE,
119                                       expression="(samAccountName=%s)" % username)
120         self.assertEquals(len(ldb_res), 1)
121         user_orig = ldb_res[0]
122         user_dn   = ldb_res[0]["dn"]
123
124         # check user info on DC1
125         print("Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0])))
126         self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False)
127
128         # trigger replication from DC1 to DC2
129         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
130
131         # delete user on DC1
132         self.ldb_dc1.delete(user_dn)
133         # check user info on DC1 - should be deleted
134         self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=True)
135         # check user info on DC2 - should be valid user
136         user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=False)
137
138         # The user should not have a description or memberOf yet
139         self.assertFalse("description" in user_cur)
140         self.assertFalse("memberOf" in user_cur)
141
142         self.ldb_dc2.newgroup("group_%s" % username)
143
144         self.ldb_dc2.newgroup("group2_%s" % username)
145
146         ldb_res = self.ldb_dc2.search(base=self.domain_dn,
147                                       scope=SCOPE_SUBTREE,
148                                       expression="(samAccountName=group_%s)" % username)
149         self.assertTrue(len(ldb_res) == 1)
150         self.assertTrue("sAMAccountName" in ldb_res[0])
151         group_orig = ldb_res[0]
152         group_dn = ldb_res[0]["dn"]
153
154         # modify user on DC2 to have a description and be a member of the group
155         m = ldb.Message()
156         m.dn = user_dn
157         m["description"] = ldb.MessageElement("a description",
158                                               ldb.FLAG_MOD_ADD, "description")
159         self.ldb_dc2.modify(m)
160         m = ldb.Message()
161         m.dn = group_dn
162         m["member"] = ldb.MessageElement(str(user_dn),
163                                          ldb.FLAG_MOD_ADD, "member")
164         self.ldb_dc2.modify(m)
165
166         ldb_res = self.ldb_dc2.search(base=self.domain_dn,
167                                       scope=SCOPE_SUBTREE,
168                                       expression="(samAccountName=group2_%s)" % username)
169         self.assertTrue(len(ldb_res) == 1)
170         self.assertTrue("sAMAccountName" in ldb_res[0])
171         group2_dn = ldb_res[0]["dn"]
172         group2_orig = ldb_res[0]
173
174         m = ldb.Message()
175         m.dn = group2_dn
176         m["member"] = ldb.MessageElement(str(group_dn),
177                                          ldb.FLAG_MOD_ADD, "member")
178         self.ldb_dc2.modify(m)
179
180         # check user info on DC2 - should be valid user
181         user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=False)
182
183         # The user should not have a description yet
184         self.assertTrue("description" in user_cur)
185         self.assertTrue("memberOf" in user_cur)
186
187         ldb_res = self.ldb_dc2.search(base=self.domain_dn,
188                                       scope=SCOPE_SUBTREE,
189                                       expression="(samAccountName=group_%s)" % username)
190         self.assertTrue(len(ldb_res) == 1)
191
192         # This group is a member of another group
193         self.assertTrue("memberOf" in ldb_res[0])
194
195         # The user was deleted on DC1, but check the modify we just did on DC2
196         self.assertTrue("member" in ldb_res[0])
197
198         # trigger replication from DC2 to DC1
199         # to check if deleted object gets restored
200         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
201         # check user info on DC1 - should be deleted
202         self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=True)
203         # check user info on DC2 - should be valid user
204         self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=False)
205
206         ldb_res = self.ldb_dc1.search(base=self.domain_dn,
207                                       scope=SCOPE_SUBTREE,
208                                       expression="(samAccountName=group_%s)" % username)
209         self.assertTrue(len(ldb_res) == 1)
210
211         # This group is a member of another group
212         self.assertTrue("memberOf" in ldb_res[0])
213
214         # The user was deleted on DC1, but the modify we did on DC2, check it never replicated in
215         self.assertFalse("member" in ldb_res[0])
216
217         # trigger replication from DC1 to DC2
218         # to check if deleted object is replicated
219         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
220         # check user info on DC1 - should be deleted
221         self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=True)
222         # check user info on DC2 - should be deleted
223         self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=True)
224
225         # delete group on DC1
226         self.ldb_dc1.delete(group_dn)
227
228         # trigger replication from DC1 to DC2
229         # to check if deleted object is replicated
230         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
231
232         # check group info on DC1 - should be deleted
233         self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=group_orig, is_deleted=True)
234         # check group info on DC2 - should be deleted
235         self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=group_orig, is_deleted=True)
236
237         ldb_res = self.ldb_dc2.search(base=self.domain_dn,
238                                       scope=SCOPE_SUBTREE,
239                                       expression="(samAccountName=group2_%s)" % username)
240         self.assertTrue(len(ldb_res) == 1)
241         self.assertFalse("member" in ldb_res[0])
242
243         # delete group on DC1
244         self.ldb_dc1.delete(group2_dn)
245
246         # trigger replication from DC1 to DC2
247         # to check if deleted object is replicated
248         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
249
250         # check group info on DC1 - should be deleted
251         self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=group2_orig, is_deleted=True)
252         # check group info on DC2 - should be deleted
253         self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=group2_orig, is_deleted=True)
254
255     def test_ReplicateDeletedObject2(self):
256         """Verifies how a deleted-object is replicated between two DCs.
257            This test should verify that:
258             - deleted-object is replicated properly
259             - We verify that after replication,
260               object's state to conform to a tombstone-object state
261             - This test replicates the delete to the server with the
262               object modifications first
263
264            TODO:  It will also be great if check replPropertyMetaData.
265            TODO:  Check for deleted-object state, depending on DC's features
266                   when recycle-bin is enabled
267            """
268         # work-out unique username to test with
269         username = self._make_username()
270
271         # create user on DC1
272         self.ldb_dc1.newuser(username=username, password="P@sswOrd!")
273         ldb_res = self.ldb_dc1.search(base=self.domain_dn,
274                                       scope=SCOPE_SUBTREE,
275                                       expression="(samAccountName=%s)" % username)
276         self.assertEquals(len(ldb_res), 1)
277         user_orig = ldb_res[0]
278         user_dn   = ldb_res[0]["dn"]
279
280         # check user info on DC1
281         print("Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0])))
282         self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False)
283
284         # trigger replication from DC1 to DC2
285         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
286
287         # delete user on DC1
288         self.ldb_dc1.delete(user_dn)
289         # check user info on DC1 - should be deleted
290         self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=True)
291         # check user info on DC2 - should be valid user
292         user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=False)
293
294         # The user should not have a description or memberOf yet
295         self.assertFalse("description" in user_cur)
296         self.assertFalse("memberOf" in user_cur)
297
298         self.ldb_dc2.newgroup("group_%s" % username)
299
300         self.ldb_dc2.newgroup("group2_%s" % username)
301
302         ldb_res = self.ldb_dc2.search(base=self.domain_dn,
303                                       scope=SCOPE_SUBTREE,
304                                       expression="(samAccountName=group_%s)" % username)
305         self.assertTrue(len(ldb_res) == 1)
306         self.assertTrue("sAMAccountName" in ldb_res[0])
307         group_orig = ldb_res[0]
308         group_dn = ldb_res[0]["dn"]
309
310         # modify user on DC2 to have a description and be a member of the group
311         m = ldb.Message()
312         m.dn = user_dn
313         m["description"] = ldb.MessageElement("a description",
314                                               ldb.FLAG_MOD_ADD, "description")
315         self.ldb_dc2.modify(m)
316         m = ldb.Message()
317         m.dn = group_dn
318         m["member"] = ldb.MessageElement(str(user_dn),
319                                          ldb.FLAG_MOD_ADD, "member")
320         self.ldb_dc2.modify(m)
321
322         ldb_res = self.ldb_dc2.search(base=self.domain_dn,
323                                       scope=SCOPE_SUBTREE,
324                                       expression="(samAccountName=group2_%s)" % username)
325         self.assertTrue(len(ldb_res) == 1)
326         self.assertTrue("sAMAccountName" in ldb_res[0])
327         group2_dn = ldb_res[0]["dn"]
328         group2_orig = ldb_res[0]
329
330         m = ldb.Message()
331         m.dn = group2_dn
332         m["member"] = ldb.MessageElement(str(group_dn),
333                                          ldb.FLAG_MOD_ADD, "member")
334         self.ldb_dc2.modify(m)
335
336         # check user info on DC2 - should be valid user
337         user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=False)
338
339         # The user should not have a description yet
340         self.assertTrue("description" in user_cur)
341         self.assertTrue("memberOf" in user_cur)
342
343         # trigger replication from DC1 to DC2
344         # to check if deleted object gets restored
345         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
346         # check user info on DC1 - should be deleted
347         self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=True)
348         # check user info on DC2 - should be deleted
349         self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=True)
350
351         ldb_res = self.ldb_dc2.search(base=self.domain_dn,
352                                       scope=SCOPE_SUBTREE,
353                                       expression="(samAccountName=group_%s)" % username)
354         self.assertTrue(len(ldb_res) == 1)
355         self.assertTrue("memberOf" in ldb_res[0])
356         self.assertFalse("member" in ldb_res[0])
357
358         # trigger replication from DC2 to DC1
359         # to check if deleted object is replicated
360         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
361         # check user info on DC1 - should be deleted
362         self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=True)
363         # check user info on DC2 - should be deleted
364         self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=True)
365
366         ldb_res = self.ldb_dc1.search(base=self.domain_dn,
367                                       scope=SCOPE_SUBTREE,
368                                       expression="(samAccountName=group_%s)" % username)
369         self.assertTrue(len(ldb_res) == 1)
370         self.assertTrue("memberOf" in ldb_res[0])
371         self.assertFalse("member" in ldb_res[0])
372
373         # delete group on DC1
374         self.ldb_dc1.delete(group_dn)
375         self.ldb_dc1.delete(group2_dn)
376
377         # trigger replication from DC1 to DC2, for cleanup
378         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
379