# -*- coding: utf-8 -*-
#
# Unix SMB/CIFS implementation.
# Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2010
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
#
# Usage:
#  export DC1=dc1_dns_name
#  export DC2=dc2_dns_name
#  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
#  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN delete_object -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
#
class DrsDeleteObjectTestCase(drs_base.DrsBaseTestCase):
    """Replication tests for an object deleted on one DC and modified on another.

    Each test creates a user on DC1, replicates it to DC2, deletes it on
    DC1 and modifies it on DC2 (description and group membership), then
    replicates in both directions and checks that the object ends up as a
    tombstone on both DCs.

    The connections (``ldb_dc1``, ``ldb_dc2``), host names
    (``dnsname_dc1``, ``dnsname_dc2``), ``domain_dn`` and the
    ``_net_drs_replicate`` / ``_disable_all_repl`` / ``_enable_all_repl`` /
    ``_GUID_string`` / ``_deleted_objects_dn`` helpers are not defined in
    this class; presumably they come from ``drs_base.DrsBaseTestCase``.
    """

    def setUp(self):
        """Stop automatic replication and bring both DCs in sync."""
        super(DrsDeleteObjectTestCase, self).setUp()
        # disable automatic replication temporarily
        self._disable_all_repl(self.dnsname_dc1)
        self._disable_all_repl(self.dnsname_dc2)
        # make sure DCs are synchronized before the test
        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)

    def tearDown(self):
        """Re-enable automatic replication on both DCs."""
        self._enable_all_repl(self.dnsname_dc1)
        self._enable_all_repl(self.dnsname_dc2)
        super(DrsDeleteObjectTestCase, self).tearDown()

    def _make_username(self):
        """Return a user name that is unique per second of wall-clock time."""
        # int(time.time()) replaces strftime("%s"), which is a non-portable
        # libc extension and not a documented Python directive.
        return "DrsDelObjUser_%d" % int(time.time())

    def _find_single_by_sam(self, sam_ldb, account_name):
        """Search sam_ldb for samAccountName=account_name; assert one hit and return it."""
        res = sam_ldb.search(base=self.domain_dn,
                             scope=ldb.SCOPE_SUBTREE,
                             expression="(samAccountName=%s)" % account_name)
        self.assertEqual(len(res), 1)
        return res[0]

    def _add_attr_value(self, sam_ldb, dn, attr_name, value):
        """Apply a FLAG_MOD_ADD of attr_name=value to the object at dn on sam_ldb."""
        msg = ldb.Message()
        msg.dn = dn
        msg[attr_name] = ldb.MessageElement(value, ldb.FLAG_MOD_ADD, attr_name)
        sam_ldb.modify(msg)

    # now also used to check the group
    def _check_obj(self, sam_ldb, obj_orig, is_deleted):
        """Assert that an object is in the expected live/deleted state.

        :param sam_ldb: connection to the DC to inspect
        :param obj_orig: the object as it was read right after creation;
            its "objectGUID", "cn", "name" and "dn" are used
        :param is_deleted: True to expect a tombstone, False to expect an
            unchanged live object
        :return: the object as currently stored on sam_ldb
        """
        # search the object by guid as it may be deleted (and thus renamed)
        guid_str = self._GUID_string(obj_orig["objectGUID"][0])
        expression = "(objectGUID=%s)" % guid_str
        res = sam_ldb.search(base=self.domain_dn,
                             expression=expression,
                             controls=["show_deleted:1"])
        self.assertEqual(len(res), 1)
        user_cur = res[0]
        # Deleted Object base DN
        dodn = self._deleted_objects_dn(sam_ldb)
        # now check properties of the object
        cn_orig = obj_orig["cn"][0]
        cn_cur = user_cur["cn"][0]
        name_orig = obj_orig["name"][0]
        name_cur = user_cur["name"][0]
        if is_deleted:
            # tombstone: flagged as deleted, stripped of these attributes,
            # moved under the Deleted Objects container and renamed
            self.assertEqual(user_cur["isDeleted"][0], "TRUE")
            self.assertFalse("objectCategory" in user_cur)
            self.assertFalse("sAMAccountType" in user_cur)
            self.assertFalse("description" in user_cur)
            self.assertFalse("memberOf" in user_cur)
            self.assertFalse("member" in user_cur)
            self.assertTrue(dodn in str(user_cur["dn"]),
                            "User %s is deleted but it is not located under %s (found at %s)!" % (name_orig, dodn, user_cur["dn"]))
            self.assertEqual(name_cur, name_orig + "\nDEL:" + guid_str)
            self.assertEqual(name_cur, user_cur.dn.get_rdn_value())
            self.assertEqual(cn_cur, cn_orig + "\nDEL:" + guid_str)
            self.assertEqual(name_cur, cn_cur)
        else:
            # live object: same name and DN as when it was created
            self.assertFalse("isDeleted" in user_cur)
            self.assertEqual(name_cur, name_orig)
            self.assertEqual(name_cur, user_cur.dn.get_rdn_value())
            self.assertEqual(cn_cur, cn_orig)
            self.assertEqual(name_cur, cn_cur)
            self.assertEqual(obj_orig["dn"], user_cur["dn"])
            self.assertTrue(dodn not in str(user_cur["dn"]))
        return user_cur

    def test_ReplicateDeletedObject1(self):
        """Verifies how a deleted-object is replicated between two DCs.
           This test should verify that:
            - deleted-object is replicated properly
            - after replication, the object's state conforms
              to a tombstone-object state
            - This test replicates the object modifications to
              the server with the user deleted first

           TODO: It would also be great to check replPropertyMetaData.
           TODO: Check for deleted-object state, depending on DC's features
                 when recycle-bin is enabled
           """
        # work-out unique username to test with
        username = self._make_username()

        # create user on DC1
        self.ldb_dc1.newuser(username=username, password="P@sswOrd!")
        user_orig = self._find_single_by_sam(self.ldb_dc1, username)
        user_dn = user_orig["dn"]

        # check user info on DC1
        print("Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0])))
        self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False)

        # trigger replication from DC1 to DC2
        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)

        # delete user on DC1
        self.ldb_dc1.delete(user_dn)
        # check user info on DC1 - should be deleted
        self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=True)
        # check user info on DC2 - should be valid user
        user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=False)

        # The user should not have a description or memberOf yet
        self.assertFalse("description" in user_cur)
        self.assertFalse("memberOf" in user_cur)

        # create both groups on DC2
        self.ldb_dc2.newgroup("group_%s" % username)
        self.ldb_dc2.newgroup("group2_%s" % username)

        group_orig = self._find_single_by_sam(self.ldb_dc2, "group_%s" % username)
        self.assertTrue("sAMAccountName" in group_orig)
        group_dn = group_orig["dn"]

        # modify user on DC2 to have a description and be a member of the group
        self._add_attr_value(self.ldb_dc2, user_dn, "description", "a description")
        self._add_attr_value(self.ldb_dc2, group_dn, "member", str(user_dn))

        group2_orig = self._find_single_by_sam(self.ldb_dc2, "group2_%s" % username)
        self.assertTrue("sAMAccountName" in group2_orig)
        group2_dn = group2_orig["dn"]

        # make the first group a member of the second group on DC2
        self._add_attr_value(self.ldb_dc2, group2_dn, "member", str(group_dn))

        # check user info on DC2 - should be valid user
        user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=False)

        # The user should now have a description and a memberOf
        self.assertTrue("description" in user_cur)
        self.assertTrue("memberOf" in user_cur)

        group_cur = self._find_single_by_sam(self.ldb_dc2, "group_%s" % username)

        # This group is a member of another group
        self.assertTrue("memberOf" in group_cur)

        # The user was deleted on DC1, but check the modify we just did on DC2
        self.assertTrue("member" in group_cur)

        # trigger replication from DC2 to DC1
        # to check if deleted object gets restored
        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
        # check user info on DC1 - should be deleted
        self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=True)
        # check user info on DC2 - should be valid user
        self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=False)

        group_cur = self._find_single_by_sam(self.ldb_dc1, "group_%s" % username)

        # This group is a member of another group
        self.assertTrue("memberOf" in group_cur)

        # The user was deleted on DC1, so the membership added on DC2
        # must never have replicated in
        self.assertFalse("member" in group_cur)

        # trigger replication from DC1 to DC2
        # to check if deleted object is replicated
        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
        # check user info on DC1 - should be deleted
        self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=True)
        # check user info on DC2 - should be deleted
        self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=True)

        # delete group on DC1
        self.ldb_dc1.delete(group_dn)

        # trigger replication from DC1 to DC2
        # to check if deleted object is replicated
        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)

        # check group info on DC1 - should be deleted
        self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=group_orig, is_deleted=True)
        # check group info on DC2 - should be deleted
        self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=group_orig, is_deleted=True)

        # the second group must have lost its (now deleted) member on DC2
        group2_cur = self._find_single_by_sam(self.ldb_dc2, "group2_%s" % username)
        self.assertFalse("member" in group2_cur)

        # delete second group on DC1
        self.ldb_dc1.delete(group2_dn)

        # trigger replication from DC1 to DC2
        # to check if deleted object is replicated
        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)

        # check second group info on DC1 - should be deleted
        self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=group2_orig, is_deleted=True)
        # check second group info on DC2 - should be deleted
        self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=group2_orig, is_deleted=True)

    def test_ReplicateDeletedObject2(self):
        """Verifies how a deleted-object is replicated between two DCs.
           This test should verify that:
            - deleted-object is replicated properly
            - after replication, the object's state conforms
              to a tombstone-object state
            - This test replicates the delete to the server with the
              object modifications first

           TODO: It would also be great to check replPropertyMetaData.
           TODO: Check for deleted-object state, depending on DC's features
                 when recycle-bin is enabled
           """
        # work-out unique username to test with
        username = self._make_username()

        # create user on DC1
        self.ldb_dc1.newuser(username=username, password="P@sswOrd!")
        user_orig = self._find_single_by_sam(self.ldb_dc1, username)
        user_dn = user_orig["dn"]

        # check user info on DC1
        print("Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0])))
        self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False)

        # trigger replication from DC1 to DC2
        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)

        # delete user on DC1
        self.ldb_dc1.delete(user_dn)
        # check user info on DC1 - should be deleted
        self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=True)
        # check user info on DC2 - should be valid user
        user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=False)

        # The user should not have a description or memberOf yet
        self.assertFalse("description" in user_cur)
        self.assertFalse("memberOf" in user_cur)

        # create both groups on DC2
        self.ldb_dc2.newgroup("group_%s" % username)
        self.ldb_dc2.newgroup("group2_%s" % username)

        group_orig = self._find_single_by_sam(self.ldb_dc2, "group_%s" % username)
        self.assertTrue("sAMAccountName" in group_orig)
        group_dn = group_orig["dn"]

        # modify user on DC2 to have a description and be a member of the group
        self._add_attr_value(self.ldb_dc2, user_dn, "description", "a description")
        self._add_attr_value(self.ldb_dc2, group_dn, "member", str(user_dn))

        group2_orig = self._find_single_by_sam(self.ldb_dc2, "group2_%s" % username)
        self.assertTrue("sAMAccountName" in group2_orig)
        group2_dn = group2_orig["dn"]

        # make the first group a member of the second group on DC2
        self._add_attr_value(self.ldb_dc2, group2_dn, "member", str(group_dn))

        # check user info on DC2 - should be valid user
        user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=False)

        # The user should now have a description and a memberOf
        self.assertTrue("description" in user_cur)
        self.assertTrue("memberOf" in user_cur)

        # trigger replication from DC1 to DC2
        # to check that the delete wins over the modifications made on DC2
        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
        # check user info on DC1 - should be deleted
        self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=True)
        # check user info on DC2 - should be deleted
        self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=True)

        # on DC2 the group keeps its own membership but loses the deleted user
        group_cur = self._find_single_by_sam(self.ldb_dc2, "group_%s" % username)
        self.assertTrue("memberOf" in group_cur)
        self.assertFalse("member" in group_cur)

        # trigger replication from DC2 to DC1
        # to check if deleted object is replicated
        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
        # check user info on DC1 - should be deleted
        self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=True)
        # check user info on DC2 - should be deleted
        self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=True)

        # same expectations for the group as seen from DC1
        group_cur = self._find_single_by_sam(self.ldb_dc1, "group_%s" % username)
        self.assertTrue("memberOf" in group_cur)
        self.assertFalse("member" in group_cur)

        # delete both groups on DC1
        self.ldb_dc1.delete(group_dn)
        self.ldb_dc1.delete(group2_dn)

        # trigger replication from DC1 to DC2, for cleanup
        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)