c049661612d3358302e896cafa9b4c69a07cf688
[kai/samba-autobuild/.git] / source4 / dsdb / tests / python / tombstone_reanimation.py
1 #!/usr/bin/env python
2 #
3 # Tombstone reanimation tests
4 #
5 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2014
6 # Copyright (C) Nadezhda Ivanova <nivanova@symas.com> 2014
7 #
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
21 import sys
22 import unittest
23
24 sys.path.insert(0, "bin/python")
25 import samba
26
27 import samba.tests
28 from ldb import (SCOPE_BASE, FLAG_MOD_ADD, FLAG_MOD_DELETE, FLAG_MOD_REPLACE, Dn, Message,
29                  MessageElement, LdbError,
30                  ERR_ATTRIBUTE_OR_VALUE_EXISTS, ERR_NO_SUCH_OBJECT, ERR_ENTRY_ALREADY_EXISTS,
31                  ERR_OPERATIONS_ERROR, ERR_UNWILLING_TO_PERFORM)
32
33
class RestoredObjectAttributesBaseTestCase(samba.tests.TestCase):
    """ verify Samba restores required attributes when
        user restores a Deleted object

    This base class holds the SamDB connection and the helpers shared by
    the concrete test cases: object lookup (by GUID or DN), object
    creation, restoring a deleted object and comparing attribute values.
    """

    def setUp(self):
        """Connect to the test server and adjust password-related settings.

        The previous "dSHeuristics" and "minPwdAge" values are remembered
        on the instance so that tearDown() can put them back.
        """
        super(RestoredObjectAttributesBaseTestCase, self).setUp()
        self.samdb = samba.tests.connect_samdb_env("TEST_SERVER", "TEST_USERNAME", "TEST_PASSWORD")
        self.base_dn = self.samdb.domain_dn()
        self.schema_dn = self.samdb.get_schema_basedn().get_linearized()
        self.configuration_dn = self.samdb.get_config_basedn().get_linearized()
        # Get the old "dSHeuristics" if it was set
        self.dsheuristics = self.samdb.get_dsheuristics()
        # Set the "dSHeuristics" to activate the correct "userPassword" behaviour
        self.samdb.set_dsheuristics("000000001")
        # Get the old "minPwdAge"
        self.minPwdAge = self.samdb.get_minPwdAge()
        # Set it temporarily to "0"
        self.samdb.set_minPwdAge("0")

    def tearDown(self):
        """Restore the "dSHeuristics" and "minPwdAge" saved by setUp()."""
        super(RestoredObjectAttributesBaseTestCase, self).tearDown()
        # Reset the "dSHeuristics" as they were before
        self.samdb.set_dsheuristics(self.dsheuristics)
        # Reset the "minPwdAge" as it was before
        self.samdb.set_minPwdAge(self.minPwdAge)

    def GUID_string(self, guid):
        """Return guid formatted by the schema as an "objectGUID" value.

        :param guid: raw objectGUID value as read from a search result
        """
        return self.samdb.schema_format_value("objectGUID", guid)

    def search_guid(self, guid):
        """Return the single object with the given GUID, deleted or not.

        The search uses the "show_deleted" control and asserts that
        exactly one entry is found.

        :param guid: raw objectGUID value of the object to find
        """
        res = self.samdb.search(base="<GUID=%s>" % self.GUID_string(guid),
                                scope=SCOPE_BASE, controls=["show_deleted:1"])
        self.assertEqual(len(res), 1)
        return res[0]

    def search_dn(self, dn):
        """Return the single object stored at dn.

        The search uses the "show_recycled" control and asserts that
        exactly one entry is found.

        :param dn: DN of the object to find
        """
        res = self.samdb.search(expression="(objectClass=*)",
                                base=dn,
                                scope=SCOPE_BASE,
                                controls=["show_recycled:1"])
        self.assertEqual(len(res), 1)
        return res[0]

    def _create_object(self, msg):
        """Create an object and return it as read back from the directory.

        :param msg: dict with dn and attributes to create an object from
        """
        # delete an object if leftover from previous test
        samba.tests.delete_force(self.samdb, msg['dn'])
        self.samdb.add(msg)
        return self.search_dn(msg['dn'])

    def _attr_to_ldif(self, attr, value):
        """Render a single attribute value as LDIF text.

        :param attr: attribute name
        :param value: MessageElement, or any other value (it is then
            converted with str() and wrapped in a MessageElement)
        """
        if not isinstance(value, MessageElement):
            value = MessageElement(str(value), 0, attr)
        m = Message()
        m.add(value)
        return self.samdb.write_ldif(m, 0)

    def assertAttributesEqual(self, obj_orig, attrs_orig, obj_restored, attrs_rest):
        """Assert that a restored object carries the expected attributes.

        First the two attribute-name sets must be equal. Then, for every
        expected attribute that the original object actually has, the
        LDIF rendering of the original and restored values must match
        case-insensitively.

        :param obj_orig: object as it was before deletion
        :param attrs_orig: set of attribute names expected after restore
        :param obj_restored: object as read back after the restore
        :param attrs_rest: set of attribute names found on obj_restored
        """
        self.assertSetEqual(attrs_orig, attrs_rest)
        # Volatile attributes can't be equal, so leave them out. A new set
        # is built (instead of "-=") so the caller's set is not modified.
        attrs_to_compare = attrs_orig - {"uSNChanged", "dSCorePropagationData", "whenChanged"}
        for attr in attrs_to_compare:
            orig_val = obj_orig.get(attr)
            if orig_val is None:
                # expected only on the restored object, nothing to compare
                continue
            rest_val = obj_restored.get(attr)
            self.assertIsNotNone(rest_val)
            # compare generated ldif's
            orig_ldif = self._attr_to_ldif(attr, orig_val)
            rest_ldif = self._attr_to_ldif(attr, rest_val)
            self.assertEqual(orig_ldif.lower(), rest_ldif.lower())

    @staticmethod
    def restore_deleted_object(samdb, del_dn, new_dn, new_attrs=None):
        """Restores a deleted object

        The restore is a single modify request that removes "isDeleted"
        and replaces "distinguishedName", sent with the "show_deleted"
        control.

        :param samdb: SamDB connection to SAM
        :param del_dn: str Deleted object DN
        :param new_dn: str Where to restore the object
        :param new_attrs: dict Additional attributes to set
        """
        msg = Message()
        msg.dn = Dn(samdb, str(del_dn))
        msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
        msg["distinguishedName"] = MessageElement([str(new_dn)], FLAG_MOD_REPLACE, "distinguishedName")
        if new_attrs is not None:
            assert isinstance(new_attrs, dict)
            for attr, value in new_attrs.items():
                msg[attr] = MessageElement(value, FLAG_MOD_REPLACE, attr)
        samdb.modify(msg, ["show_deleted:1"])
127
128
class BaseRestoreObjectTestCase(RestoredObjectAttributesBaseTestCase):
    """Basic undelete scenarios: plain restore, restore with extra
    modifications, restore to a new DN and the expected failure cases.
    """

    @staticmethod
    def _testuser_attrs(dn):
        """Return the add-request dict for the standard test user at dn."""
        return {
            "dn": dn,
            "objectclass": "user",
            "description": "test user description",
            "samaccountname": "testuser"}

    def _create_testuser(self, dn):
        """Create the standard test user at dn and return the live object.

        Goes through _create_object(), so an object left at dn by an
        earlier test is removed first instead of making the add fail.
        """
        return self._create_object(self._testuser_attrs(dn))

    def enable_recycle_bin(self):
        """Request the optional feature via "enableOptionalFeature".

        The modify is sent to the empty DN. An
        ERR_ATTRIBUTE_OR_VALUE_EXISTS error is accepted; any other
        LdbError fails the assertion.
        """
        msg = Message()
        msg.dn = Dn(self.samdb, "")
        msg["enableOptionalFeature"] = MessageElement(
            "CN=Partitions," + self.configuration_dn + ":766ddcd8-acd0-445e-f3b9-a7f9b6744f2a",
            FLAG_MOD_ADD, "enableOptionalFeature")
        try:
            self.samdb.modify(msg)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS)

    def test_undelete(self):
        """A deleted user can be restored to its original DN."""
        print("Testing standard undelete operation")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        objLive1 = self._create_testuser(usr1)
        guid1 = objLive1["objectGUID"][0]
        self.samdb.delete(usr1)
        objDeleted1 = self.search_guid(guid1)
        self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1)
        objLive2 = self.search_dn(usr1)
        self.assertEqual(str(objLive2.dn).lower(), str(objLive1.dn).lower())
        samba.tests.delete_force(self.samdb, usr1)

    def test_rename(self):
        """A plain rename must not bring a deleted object back."""
        print("Testing attempt to rename deleted object")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        objLive1 = self._create_testuser(usr1)
        guid1 = objLive1["objectGUID"][0]
        self.samdb.delete(usr1)
        objDeleted1 = self.search_guid(guid1)
        # just to make sure we get the correct error if the show deleted is missing
        try:
            self.samdb.rename(str(objDeleted1.dn), usr1)
            self.fail("rename of a deleted object without show_deleted succeeded")
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_NO_SUCH_OBJECT)

        try:
            self.samdb.rename(str(objDeleted1.dn), usr1, ["show_deleted:1"])
            self.fail("rename of a deleted object with show_deleted succeeded")
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)

    def test_undelete_with_mod(self):
        """Extra attributes passed with the restore request are applied."""
        print("Testing standard undelete operation with modification of additional attributes")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        objLive1 = self._create_testuser(usr1)
        guid1 = objLive1["objectGUID"][0]
        self.samdb.delete(usr1)
        objDeleted1 = self.search_guid(guid1)
        self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1, {"url": "www.samba.org"})
        objLive2 = self.search_dn(usr1)
        self.assertEqual(objLive2["url"][0], "www.samba.org")
        samba.tests.delete_force(self.samdb, usr1)

    def test_undelete_newuser(self):
        """A deleted user can be restored under a different DN."""
        print("Testing undelete user with a different dn")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        usr2 = "cn=testuser2,cn=users," + self.base_dn
        objLive1 = self._create_testuser(usr1)
        guid1 = objLive1["objectGUID"][0]
        self.samdb.delete(usr1)
        objDeleted1 = self.search_guid(guid1)
        self.restore_deleted_object(self.samdb, objDeleted1.dn, usr2)
        # search_dn() asserts that exactly one object exists at the new DN
        self.search_dn(usr2)
        samba.tests.delete_force(self.samdb, usr1)
        samba.tests.delete_force(self.samdb, usr2)

    def test_undelete_existing(self):
        """Restoring onto a DN that is in use again must be rejected."""
        print("Testing undelete user after a user with the same dn has been created")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        objLive1 = self._create_testuser(usr1)
        guid1 = objLive1["objectGUID"][0]
        self.samdb.delete(usr1)
        # occupy the original DN with a brand new object
        self.samdb.add(self._testuser_attrs(usr1))
        objDeleted1 = self.search_guid(guid1)
        try:
            self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1)
            self.fail("undelete onto an existing dn succeeded")
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_ENTRY_ALREADY_EXISTS)
        # do not leave the second user behind for the following tests
        samba.tests.delete_force(self.samdb, usr1)

    def test_undelete_cross_nc(self):
        """Restoring into another naming context fails, same NC works."""
        print("Cross NC undelete")
        c1 = "cn=ldaptestcontainer," + self.base_dn
        c2 = "cn=ldaptestcontainer2," + self.configuration_dn
        c3 = "cn=ldaptestcontainer," + self.configuration_dn
        c4 = "cn=ldaptestcontainer2," + self.base_dn
        for dn in (c1, c2, c3, c4):
            samba.tests.delete_force(self.samdb, dn)
        self.samdb.add({
            "dn": c1,
            "objectclass": "container"})
        self.samdb.add({
            "dn": c2,
            "objectclass": "container"})
        objLive1 = self.search_dn(c1)
        objLive2 = self.search_dn(c2)
        guid1 = objLive1["objectGUID"][0]
        guid2 = objLive2["objectGUID"][0]
        self.samdb.delete(c1)
        self.samdb.delete(c2)
        objDeleted1 = self.search_guid(guid1)
        objDeleted2 = self.search_guid(guid2)
        # try to undelete from base dn to config
        try:
            self.restore_deleted_object(self.samdb, objDeleted1.dn, c3)
            self.fail("undelete from base dn to config succeeded")
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_OPERATIONS_ERROR)
        # try to undelete from config to base dn
        try:
            self.restore_deleted_object(self.samdb, objDeleted2.dn, c4)
            self.fail("undelete from config to base dn succeeded")
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_OPERATIONS_ERROR)
        # assert undeletion will work in same nc
        self.restore_deleted_object(self.samdb, objDeleted1.dn, c4)
        self.restore_deleted_object(self.samdb, objDeleted2.dn, c3)
        # remove the restored containers again
        samba.tests.delete_force(self.samdb, c3)
        samba.tests.delete_force(self.samdb, c4)
285
286
class RestoreUserObjectTestCase(RestoredObjectAttributesBaseTestCase):
    """Test cases for delete/reanimate user objects"""

    def test_restore_user(self):
        """A restored user has the expected set of attribute names."""
        print("Test restored user attributes")
        username = "restore_user"
        usr_dn = "cn=%s,cn=users,%s" % (username, self.base_dn)
        samba.tests.delete_force(self.samdb, usr_dn)
        self.samdb.add({
            "dn": usr_dn,
            "objectClass": "user",
            "sAMAccountName": username})
        obj = self.search_dn(usr_dn)
        guid = obj["objectGUID"][0]
        self.samdb.delete(usr_dn)
        obj_del = self.search_guid(guid)
        # restore the user and fetch what's restored
        self.restore_deleted_object(self.samdb, obj_del.dn, usr_dn)
        obj_restore = self.search_guid(guid)
        # check that the original and restored attribute names are the same
        # (only the names are compared here, not the values)
        orig_attrs = set(obj.keys())
        # windows restores more attributes than we originally have
        orig_attrs.update(['adminCount', 'operatorCount', 'lastKnownParent'])
        rest_attrs = set(obj_restore.keys())
        self.assertSetEqual(orig_attrs, rest_attrs)
312
313
class RestoreGroupObjectTestCase(RestoredObjectAttributesBaseTestCase):
    """Test different scenarios for delete/reanimate group objects"""

    def _make_object_dn(self, name):
        """Return the DN of an object called name under cn=users."""
        return "cn=%s,cn=users,%s" % (name, self.base_dn)

    def _create_test_user(self, user_name):
        """Create a user under cn=users and return the live object.

        :param user_name: used for both the RDN and sAMAccountName
        """
        user_dn = self._make_object_dn(user_name)
        ldif = {
            "dn": user_dn,
            "objectClass": "user",
            "sAMAccountName": user_name,
        }
        # delete an object if leftover from previous test
        samba.tests.delete_force(self.samdb, user_dn)
        # finally, create the user
        self.samdb.add(ldif)
        return self.search_dn(user_dn)

    def _create_test_group(self, group_name, members=None):
        """Create a group under cn=users and return the live object.

        :param group_name: used for both the RDN and sAMAccountName
        :param members: optional iterable of member DNs; when None the
            group is created without a "member" attribute
        """
        group_dn = self._make_object_dn(group_name)
        ldif = {
            "dn": group_dn,
            "objectClass": "group",
            "sAMAccountName": group_name,
        }
        # An explicit None check replaces catching TypeError, which would
        # also have hidden unrelated errors raised while building the list.
        if members is not None:
            ldif["member"] = [str(usr_dn) for usr_dn in members]
        # delete an object if leftover from previous test
        samba.tests.delete_force(self.samdb, group_dn)
        # finally, create the group
        self.samdb.add(ldif)
        return self.search_dn(group_dn)

    def test_plain_group(self):
        """A restored group without members keeps its attribute values."""
        print("Test restored Group attributes")
        # create test group
        obj = self._create_test_group("r_group")
        guid = obj["objectGUID"][0]
        # delete the group
        self.samdb.delete(str(obj.dn))
        obj_del = self.search_guid(guid)
        # restore the Group and fetch what's restored
        self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
        obj_restore = self.search_guid(guid)
        # check original attributes and restored one are same
        attr_orig = set(obj.keys())
        # windows restores more attributes than we originally have
        attr_orig.update(['adminCount', 'operatorCount', 'lastKnownParent'])
        attr_rest = set(obj_restore.keys())
        self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)

    def test_group_with_members(self):
        """A restored group keeps its attributes except "member"."""
        print("Test restored Group with members attributes")
        # create test group
        usr1 = self._create_test_user("r_user_1")
        usr2 = self._create_test_user("r_user_2")
        obj = self._create_test_group("r_group", [usr1.dn, usr2.dn])
        guid = obj["objectGUID"][0]
        # delete the group
        self.samdb.delete(str(obj.dn))
        obj_del = self.search_guid(guid)
        # restore the Group and fetch what's restored
        self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
        obj_restore = self.search_guid(guid)
        # check original attributes and restored one are same
        attr_orig = set(obj.keys())
        # windows restores more attributes than we originally have
        attr_orig.update(['adminCount', 'operatorCount', 'lastKnownParent'])
        # and does not restore following attributes
        attr_orig.remove("member")
        attr_rest = set(obj_restore.keys())
        self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
389
390
class RestoreContainerObjectTestCase(RestoredObjectAttributesBaseTestCase):
    """Test different scenarios for delete/reanimate OU/container objects"""

    def _create_test_ou(self, rdn, name=None, description=None):
        """Create an OU directly under the domain DN and return it.

        :param rdn: value of the OU= component
        :param name: passed through to SamDB.create_ou()
        :param description: passed through to SamDB.create_ou()
        """
        ou_dn = "OU=%s,%s" % (rdn, self.base_dn)
        # delete an object if leftover from previous test
        samba.tests.delete_force(self.samdb, ou_dn)
        # create ou and return created object
        self.samdb.create_ou(ou_dn, name=name, description=description)
        return self.search_dn(ou_dn)

    def test_ou_with_name_description(self):
        """A restored OU keeps its attributes except "description"."""
        print("Test OU reanimation")
        # create OU to test with
        obj = self._create_test_ou(rdn="r_ou",
                                   name="r_ou name",
                                   description="r_ou description")
        guid = obj["objectGUID"][0]
        # delete the object
        self.samdb.delete(str(obj.dn))
        obj_del = self.search_guid(guid)
        # restore the Object and fetch what's restored
        self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
        obj_restore = self.search_guid(guid)
        # check original attributes and restored one are same
        attr_orig = set(obj.keys())
        attr_rest = set(obj_restore.keys())
        # windows restores more attributes than we originally have
        attr_orig.update(["lastKnownParent"])
        # and does not restore following attributes
        attr_orig -= {"description"}
        self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)

    def test_container(self):
        """A restored container keeps its attributes except
        "showInAdvancedViewOnly".
        """
        print("Test Container reanimation")
        # create test Container
        obj = self._create_object({
            "dn": "CN=r_container,CN=Users,%s" % self.base_dn,
            "objectClass": "container"
        })
        guid = obj["objectGUID"][0]
        # delete the object
        self.samdb.delete(str(obj.dn))
        obj_del = self.search_guid(guid)
        # restore the Object and fetch what's restored
        self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
        obj_restore = self.search_guid(guid)
        # check original attributes and restored one are same
        attr_orig = set(obj.keys())
        attr_rest = set(obj_restore.keys())
        # windows restores more attributes than we originally have
        attr_orig.update(["lastKnownParent"])
        # and does not restore following attributes
        attr_orig -= {"showInAdvancedViewOnly"}
        self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
446
447
# Run all test cases of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()