s4-dsdb/tests: Assert on expected set of attributes for restored objects
[kai/samba-autobuild/.git] / source4 / dsdb / tests / python / tombstone_reanimation.py
1 #!/usr/bin/env python
2 #
3 # Tombstone reanimation tests
4 #
5 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2014
6 # Copyright (C) Nadezhda Ivanova <nivanova@symas.com> 2014
7 #
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
21 import sys
22 import unittest
23
24 sys.path.insert(0, "bin/python")
25 import samba
26
27 import samba.tests
28 from ldb import (SCOPE_BASE, FLAG_MOD_ADD, FLAG_MOD_DELETE, FLAG_MOD_REPLACE, Dn, Message,
29                  MessageElement, LdbError,
30                  ERR_ATTRIBUTE_OR_VALUE_EXISTS, ERR_NO_SUCH_OBJECT, ERR_ENTRY_ALREADY_EXISTS,
31                  ERR_OPERATIONS_ERROR, ERR_UNWILLING_TO_PERFORM)
32
33
class RestoredObjectAttributesBaseTestCase(samba.tests.TestCase):
    """Common fixture and helpers for tombstone reanimation tests.

    Verifies that Samba restores the required attributes when a user
    restores a deleted object.
    """

    def setUp(self):
        """Connect to the test server and temporarily adjust domain settings.

        The previous "dSHeuristics" and "minPwdAge" values are remembered so
        that tearDown() can put them back.
        """
        super(RestoredObjectAttributesBaseTestCase, self).setUp()
        self.samdb = samba.tests.connect_samdb_env("TEST_SERVER", "TEST_USERNAME", "TEST_PASSWORD")
        self.base_dn = self.samdb.domain_dn()
        self.schema_dn = self.samdb.get_schema_basedn().get_linearized()
        self.configuration_dn = self.samdb.get_config_basedn().get_linearized()
        # Get the old "dSHeuristics" if it was set
        self.dsheuristics = self.samdb.get_dsheuristics()
        # Set the "dSHeuristics" to activate the correct "userPassword" behaviour
        self.samdb.set_dsheuristics("000000001")
        # Get the old "minPwdAge"
        self.minPwdAge = self.samdb.get_minPwdAge()
        # Set it temporarily to "0"
        self.samdb.set_minPwdAge("0")

    def tearDown(self):
        """Put back the "dSHeuristics" and "minPwdAge" saved by setUp()."""
        super(RestoredObjectAttributesBaseTestCase, self).tearDown()
        # Reset the "dSHeuristics" as they were before
        self.samdb.set_dsheuristics(self.dsheuristics)
        # Reset the "minPwdAge" as it was before
        self.samdb.set_minPwdAge(self.minPwdAge)

    def GUID_string(self, guid):
        """Return objectGUID value `guid` formatted by the schema."""
        return self.samdb.schema_format_value("objectGUID", guid)

    def search_guid(self, guid):
        """Fetch the single object with the given objectGUID.

        The search uses the "show_deleted" control and asserts that exactly
        one entry is returned.
        """
        res = self.samdb.search(base="<GUID=%s>" % self.GUID_string(guid),
                                scope=SCOPE_BASE, controls=["show_deleted:1"])
        self.assertEqual(len(res), 1)
        return res[0]

    def search_dn(self, dn):
        """Fetch the single object stored at `dn`.

        The search uses the "show_recycled" control and asserts that exactly
        one entry is returned.
        """
        res = self.samdb.search(expression="(objectClass=*)",
                                base=dn,
                                scope=SCOPE_BASE,
                                controls=["show_recycled:1"])
        self.assertEqual(len(res), 1)
        return res[0]

    def _create_object(self, msg):
        """Create an object, replacing any leftover from a previous test.

        :param msg: dict with dn and attributes to create an object from
        :return: the freshly created object as found by search_dn()
        """
        # delete an object if leftover from previous test
        samba.tests.delete_force(self.samdb, msg['dn'])
        self.samdb.add(msg)
        return self.search_dn(msg['dn'])

    def _attr_as_ldif(self, attr, value):
        """Render one attribute value as LDIF text using samdb.write_ldif()."""
        # values that are not a MessageElement are wrapped from their string form
        if not isinstance(value, MessageElement):
            value = MessageElement(str(value), 0, attr)
        msg = Message()
        msg.add(value)
        return self.samdb.write_ldif(msg, 0)

    def assertAttributesEqual(self, obj_orig, attrs_orig, obj_restored, attrs_rest):
        """Assert that a restored object carries the original attribute values.

        :param obj_orig: object as it was before deletion
        :param attrs_orig: set of attribute names expected after restore
        :param obj_restored: object as it is after restore
        :param attrs_rest: set of attribute names actually present after restore

        The two name sets must be equal. Values are then compared
        case-insensitively through their LDIF rendering; attributes that are
        absent from `obj_orig` are skipped.
        """
        self.assertEqual(attrs_orig, attrs_rest, "Actual object does not has expected attributes")
        # Remove volatile attributes, they can't be equal.  Build a new set
        # instead of using "-=" so the caller's set is left untouched.
        volatile = set(["uSNChanged", "dSCorePropagationData", "whenChanged"])
        for attr in set(attrs_orig) - volatile:
            orig_val = obj_orig.get(attr)
            if orig_val is None:
                # attribute only appears on restore, nothing to compare against
                continue
            # convert original attr value to ldif
            orig_ldif = self._attr_as_ldif(attr, orig_val)
            # convert restored attr value to ldif
            rest_val = obj_restored.get(attr)
            self.assertFalse(rest_val is None,
                             "No value for attribute '%s' in restored object" % attr)
            rest_ldif = self._attr_as_ldif(attr, rest_val)
            # compare generated ldif's
            self.assertEqual(orig_ldif.lower(), rest_ldif.lower())

    def assertAttributesExists(self, attr_expected, obj_msg):
        """Check object contains exactly the expected attributes.

        :param attr_expected: dict of expected attributes with values. ** is any value
        :param obj_msg: Ldb.Message for the object under test
        """
        actual_names = set(obj_msg.keys())
        # Samba does not use 'dSCorePropagationData', so skip it
        actual_names.discard('dSCorePropagationData')
        self.assertEqual(set(attr_expected.keys()), actual_names, "Actual object does not has expected attributes")
        for name, expected_val in attr_expected.items():
            actual_val = obj_msg.get(name)
            self.assertFalse(actual_val is None, "No value for attribute '%s'" % name)
            if expected_val == "**":
                # "**" values means "any"
                continue
            self.assertEqual(expected_val.lower(), str(actual_val).lower(),
                             "Unexpected value for '%s'" % name)

    @staticmethod
    def restore_deleted_object(samdb, del_dn, new_dn, new_attrs=None):
        """Restores a deleted object

        The restore is a single modify that removes "isDeleted" and replaces
        "distinguishedName", sent with the "show_deleted" control.

        :param samdb: SamDB connection to SAM
        :param del_dn: str Deleted object DN
        :param new_dn: str Where to restore the object
        :param new_attrs: dict Additional attributes to set
        """
        msg = Message()
        msg.dn = Dn(samdb, str(del_dn))
        msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
        msg["distinguishedName"] = MessageElement([str(new_dn)], FLAG_MOD_REPLACE, "distinguishedName")
        if new_attrs is not None:
            assert isinstance(new_attrs, dict)
            for attr, value in new_attrs.items():
                msg[attr] = MessageElement(value, FLAG_MOD_REPLACE, attr)
        samdb.modify(msg, ["show_deleted:1"])
146
147
class BaseRestoreObjectTestCase(RestoredObjectAttributesBaseTestCase):
    """Basic undelete scenarios: restore, rename, restore with modifications,
    restore to a new DN, restore over an existing DN and cross-NC restore.
    """

    def setUp(self):
        super(BaseRestoreObjectTestCase, self).setUp()

    def _create_testuser(self, user_dn):
        """Create the standard test user at `user_dn`.

        Goes through _create_object(), so a leftover object at the same DN is
        removed first and the test does not depend on what ran before it.
        """
        return self._create_object({
            "dn": user_dn,
            "objectclass": "user",
            "description": "test user description",
            "samaccountname": "testuser"})

    def _assert_ldb_error(self, expected_num, operation, *args):
        """Call operation(*args) and require an LdbError with `expected_num`.

        The test fails if the call succeeds.
        """
        try:
            operation(*args)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, expected_num)
        else:
            self.fail()

    def enable_recycle_bin(self):
        """Request the optional feature on the rootDSE.

        An "attribute or value exists" error is accepted, any other LdbError
        code fails the test.
        """
        msg = Message()
        msg.dn = Dn(self.samdb, "")
        msg["enableOptionalFeature"] = MessageElement(
            "CN=Partitions," + self.configuration_dn + ":766ddcd8-acd0-445e-f3b9-a7f9b6744f2a",
            FLAG_MOD_ADD, "enableOptionalFeature")
        try:
            self.samdb.modify(msg)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS)

    def test_undelete(self):
        """Delete a user and restore it to its original DN."""
        print("Testing standard undelete operation")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        objLive1 = self._create_testuser(usr1)
        guid1 = objLive1["objectGUID"][0]
        self.samdb.delete(usr1)
        objDeleted1 = self.search_guid(guid1)
        self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1)
        objLive2 = self.search_dn(usr1)
        self.assertEqual(str(objLive2.dn).lower(), str(objLive1.dn).lower())
        samba.tests.delete_force(self.samdb, usr1)

    def test_rename(self):
        """A plain rename of a deleted object must be refused."""
        print("Testing attempt to rename deleted object")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        objLive1 = self._create_testuser(usr1)
        guid1 = objLive1["objectGUID"][0]
        self.samdb.delete(usr1)
        objDeleted1 = self.search_guid(guid1)
        # just to make sure we get the correct error if the show deleted is missing
        self._assert_ldb_error(ERR_NO_SUCH_OBJECT,
                               self.samdb.rename, str(objDeleted1.dn), usr1)
        self._assert_ldb_error(ERR_UNWILLING_TO_PERFORM,
                               self.samdb.rename, str(objDeleted1.dn), usr1,
                               ["show_deleted:1"])

    def test_undelete_with_mod(self):
        """Restore a user while setting an additional attribute."""
        print("Testing standard undelete operation with modification of additional attributes")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        objLive1 = self._create_testuser(usr1)
        guid1 = objLive1["objectGUID"][0]
        self.samdb.delete(usr1)
        objDeleted1 = self.search_guid(guid1)
        self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1, {"url": "www.samba.org"})
        objLive2 = self.search_dn(usr1)
        self.assertEqual(objLive2["url"][0], "www.samba.org")
        samba.tests.delete_force(self.samdb, usr1)

    def test_undelete_newuser(self):
        """Restore a deleted user under a different DN."""
        print("Testing undelete user with a different dn")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        usr2 = "cn=testuser2,cn=users," + self.base_dn
        objLive1 = self._create_testuser(usr1)
        guid1 = objLive1["objectGUID"][0]
        self.samdb.delete(usr1)
        objDeleted1 = self.search_guid(guid1)
        self.restore_deleted_object(self.samdb, objDeleted1.dn, usr2)
        # search_dn() asserts that the restored object exists at the new DN
        self.search_dn(usr2)
        samba.tests.delete_force(self.samdb, usr1)
        samba.tests.delete_force(self.samdb, usr2)

    def test_undelete_existing(self):
        """Restoring onto a DN that is in use again must be refused."""
        print("Testing undelete user after a user with the same dn has been created")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        objLive1 = self._create_testuser(usr1)
        guid1 = objLive1["objectGUID"][0]
        self.samdb.delete(usr1)
        # occupy the original DN with a brand new object
        self.samdb.add({
            "dn": usr1,
            "objectclass": "user",
            "description": "test user description",
            "samaccountname": "testuser"})
        objDeleted1 = self.search_guid(guid1)
        self._assert_ldb_error(ERR_ENTRY_ALREADY_EXISTS,
                               self.restore_deleted_object,
                               self.samdb, objDeleted1.dn, usr1)

    def test_undelete_cross_nc(self):
        """Restoring into another naming context fails, same-NC restore works."""
        print("Cross NC undelete")
        c1 = "cn=ldaptestcontainer," + self.base_dn
        c2 = "cn=ldaptestcontainer2," + self.configuration_dn
        c3 = "cn=ldaptestcontainer," + self.configuration_dn
        c4 = "cn=ldaptestcontainer2," + self.base_dn
        # the restore targets must be free before we start
        samba.tests.delete_force(self.samdb, c3)
        samba.tests.delete_force(self.samdb, c4)
        objLive1 = self._create_object({
            "dn": c1,
            "objectclass": "container"})
        objLive2 = self._create_object({
            "dn": c2,
            "objectclass": "container"})
        guid1 = objLive1["objectGUID"][0]
        guid2 = objLive2["objectGUID"][0]
        self.samdb.delete(c1)
        self.samdb.delete(c2)
        objDeleted1 = self.search_guid(guid1)
        objDeleted2 = self.search_guid(guid2)
        # try to undelete from base dn to config
        self._assert_ldb_error(ERR_OPERATIONS_ERROR,
                               self.restore_deleted_object,
                               self.samdb, objDeleted1.dn, c3)
        # try to undelete from config to base dn
        self._assert_ldb_error(ERR_OPERATIONS_ERROR,
                               self.restore_deleted_object,
                               self.samdb, objDeleted2.dn, c4)
        # assert undeletion will work in same nc
        self.restore_deleted_object(self.samdb, objDeleted1.dn, c4)
        self.restore_deleted_object(self.samdb, objDeleted2.dn, c3)
304
305
class RestoreUserObjectTestCase(RestoredObjectAttributesBaseTestCase):
    """Test cases for delete/reanimate user objects"""

    def _expected_user_attributes(self, username, user_dn, category):
        """Return the attributes a restored user is expected to have.

        :param username: value expected for cn, name and sAMAccountName
        :param user_dn: DN the user was restored to
        :param category: RDN value of the expected objectCategory in the schema
        :return: dict of attribute name -> expected value, "**" meaning any value
        """
        return {'dn': user_dn,
                'objectClass': '**',
                'cn': username,
                'distinguishedName': user_dn,
                'instanceType': '4',
                'whenCreated': '**',
                'whenChanged': '**',
                'uSNCreated': '**',
                'uSNChanged': '**',
                'name': username,
                'objectGUID': '**',
                'userAccountControl': '546',
                'badPwdCount': '0',
                'badPasswordTime': '0',
                'codePage': '0',
                'countryCode': '0',
                'lastLogon': '0',
                'lastLogoff': '0',
                'pwdLastSet': '0',
                'primaryGroupID': '513',
                'operatorCount': '0',
                'objectSid': '**',
                'adminCount': '0',
                'accountExpires': '9223372036854775807',
                'logonCount': '0',
                'sAMAccountName': username,
                'sAMAccountType': '805306368',
                'lastKnownParent': 'CN=Users,%s' % self.base_dn,
                'objectCategory': 'CN=%s,%s' % (category, self.schema_dn)
                }

    def test_restore_user(self):
        """Delete and restore a user, then check the restored attribute set."""
        print("Test restored user attributes")
        username = "restore_user"
        usr_dn = "cn=%s,cn=users,%s" % (username, self.base_dn)
        # _create_object() removes any leftover, adds the user and fetches it
        obj = self._create_object({
            "dn": usr_dn,
            "objectClass": "user",
            "sAMAccountName": username})
        guid = obj["objectGUID"][0]
        self.samdb.delete(usr_dn)
        obj_del = self.search_guid(guid)
        # restore the user and fetch what's restored
        self.restore_deleted_object(self.samdb, obj_del.dn, usr_dn)
        obj_restore = self.search_guid(guid)
        # check original attributes and restored one are same
        orig_attrs = set(obj.keys())
        # windows restore more attributes that originally we have
        orig_attrs.update(['adminCount', 'operatorCount', 'lastKnownParent'])
        rest_attrs = set(obj_restore.keys())
        self.assertEqual(orig_attrs, rest_attrs, "Actual object does not has expected attributes")
        self.assertAttributesExists(self._expected_user_attributes(username, usr_dn, "Person"), obj_restore)
364
365
class RestoreGroupObjectTestCase(RestoredObjectAttributesBaseTestCase):
    """Test different scenarios for delete/reanimate group objects"""

    def _make_object_dn(self, name):
        """Return the DN of object `name` inside CN=Users of the domain."""
        return "cn=%s,cn=users,%s" % (name, self.base_dn)

    def _create_test_user(self, user_name):
        """Create user `user_name` under CN=Users and return the stored object."""
        # _create_object() deletes a leftover from a previous test, creates
        # the user and fetches it back
        return self._create_object({
            "dn": self._make_object_dn(user_name),
            "objectClass": "user",
            "sAMAccountName": user_name,
        })

    def _create_test_group(self, group_name, members=None):
        """Create group `group_name` under CN=Users and return the stored object.

        :param group_name: name used for the RDN and sAMAccountName
        :param members: optional iterable of member DNs; None means no
            "member" attribute is sent at all
        """
        msg = {
            "dn": self._make_object_dn(group_name),
            "objectClass": "group",
            "sAMAccountName": group_name,
        }
        # Explicit None check: a wrongly typed `members` now raises instead
        # of silently creating a group without members.
        if members is not None:
            msg["member"] = [str(usr_dn) for usr_dn in members]
        # _create_object() deletes a leftover from a previous test, creates
        # the group and fetches it back
        return self._create_object(msg)

    def _expected_group_attributes(self, groupname, group_dn, category):
        """Return the attributes a restored group is expected to have.

        :param groupname: value expected for cn, name and sAMAccountName
        :param group_dn: DN the group was restored to
        :param category: RDN value of the expected objectCategory in the schema
        :return: dict of attribute name -> expected value, "**" meaning any value
        """
        return {'dn': group_dn,
                'groupType': '-2147483646',
                'distinguishedName': group_dn,
                'sAMAccountName': groupname,
                'name': groupname,
                'objectCategory': 'CN=%s,%s' % (category, self.schema_dn),
                'objectClass': '**',
                'objectGUID': '**',
                'lastKnownParent': 'CN=Users,%s' % self.base_dn,
                'whenChanged': '**',
                'sAMAccountType': '268435456',
                'objectSid': '**',
                'whenCreated': '**',
                'uSNCreated': '**',
                'operatorCount': '0',
                'uSNChanged': '**',
                'instanceType': '4',
                'adminCount': '0',
                'cn': groupname}

    def test_plain_group(self):
        """Delete and restore a group without members."""
        print("Test restored Group attributes")
        # create test group
        obj = self._create_test_group("r_group")
        guid = obj["objectGUID"][0]
        # delete the group
        self.samdb.delete(str(obj.dn))
        obj_del = self.search_guid(guid)
        # restore the Group and fetch what's restored
        self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
        obj_restore = self.search_guid(guid)
        # check original attributes and restored one are same
        attr_orig = set(obj.keys())
        # windows restore more attributes that originally we have
        attr_orig.update(['adminCount', 'operatorCount', 'lastKnownParent'])
        attr_rest = set(obj_restore.keys())
        self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
        self.assertAttributesExists(self._expected_group_attributes("r_group", str(obj.dn), "Group"), obj_restore)

    def test_group_with_members(self):
        """Delete and restore a group that had two members."""
        print("Test restored Group with members attributes")
        # create test group
        usr1 = self._create_test_user("r_user_1")
        usr2 = self._create_test_user("r_user_2")
        obj = self._create_test_group("r_group", [usr1.dn, usr2.dn])
        guid = obj["objectGUID"][0]
        # delete the group
        self.samdb.delete(str(obj.dn))
        obj_del = self.search_guid(guid)
        # restore the Group and fetch what's restored
        self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
        obj_restore = self.search_guid(guid)
        # check original attributes and restored one are same
        attr_orig = set(obj.keys())
        # windows restore more attributes that originally we have
        attr_orig.update(['adminCount', 'operatorCount', 'lastKnownParent'])
        # and does not restore following attributes
        attr_orig.remove("member")
        attr_rest = set(obj_restore.keys())
        self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
        self.assertAttributesExists(self._expected_group_attributes("r_group", str(obj.dn), "Group"), obj_restore)
464
465
class RestoreContainerObjectTestCase(RestoredObjectAttributesBaseTestCase):
    """Test different scenarios for delete/reanimate OU/container objects"""

    def _expected_container_attributes(self, rdn, name, dn, category):
        """Return the attributes a restored OU/container is expected to have.

        :param rdn: naming attribute of the object, 'ou' or 'cn'
        :param name: value expected for the naming attribute and for 'name'
        :param dn: DN the object was restored to
        :param category: RDN value of the expected objectCategory in the schema
        :return: dict of attribute name -> expected value, "**" meaning any value
        """
        # the tests create OUs directly under the domain and containers
        # under CN=Users
        if rdn == 'ou':
            lastKnownParent = '%s' % self.base_dn
        else:
            lastKnownParent = 'CN=Users,%s' % self.base_dn
        return {'dn': dn,
                'distinguishedName': dn,
                'name': name,
                'objectCategory': 'CN=%s,%s' % (category, self.schema_dn),
                'objectClass': '**',
                'objectGUID': '**',
                'lastKnownParent': lastKnownParent,
                'whenChanged': '**',
                'whenCreated': '**',
                'uSNCreated': '**',
                'uSNChanged': '**',
                'instanceType': '4',
                rdn: name}

    def _create_test_ou(self, rdn, name=None, description=None):
        """Create OU=<rdn> directly under the domain and return the stored object.

        `name` and `description` are passed through to samdb.create_ou().
        """
        ou_dn = "OU=%s,%s" % (rdn, self.base_dn)
        # delete an object if leftover from previous test
        samba.tests.delete_force(self.samdb, ou_dn)
        # create ou and return created object
        self.samdb.create_ou(ou_dn, name=name, description=description)
        return self.search_dn(ou_dn)

    def test_ou_with_name_description(self):
        """Delete and restore an OU that was created with name and description."""
        print("Test OU reanimation")
        # create OU to test with
        obj = self._create_test_ou(rdn="r_ou",
                                   name="r_ou name",
                                   description="r_ou description")
        guid = obj["objectGUID"][0]
        # delete the object
        self.samdb.delete(str(obj.dn))
        obj_del = self.search_guid(guid)
        # restore the Object and fetch what's restored
        self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
        obj_restore = self.search_guid(guid)
        # check original attributes and restored one are same
        attr_orig = set(obj.keys())
        attr_rest = set(obj_restore.keys())
        # windows restore more attributes that originally we have
        attr_orig.update(["lastKnownParent"])
        # and does not restore following attributes
        attr_orig -= set(["description"])
        self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
        expected_attrs = self._expected_container_attributes("ou", "r_ou", str(obj.dn), "Organizational-Unit")
        self.assertAttributesExists(expected_attrs, obj_restore)

    def test_container(self):
        """Delete and restore a plain container under CN=Users."""
        print("Test Container reanimation")
        # create test Container
        obj = self._create_object({
            "dn": "CN=r_container,CN=Users,%s" % self.base_dn,
            "objectClass": "container"
        })
        guid = obj["objectGUID"][0]
        # delete the object
        self.samdb.delete(str(obj.dn))
        obj_del = self.search_guid(guid)
        # restore the Object and fetch what's restored
        self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
        obj_restore = self.search_guid(guid)
        # check original attributes and restored one are same
        attr_orig = set(obj.keys())
        attr_rest = set(obj_restore.keys())
        # windows restore more attributes that originally we have
        attr_orig.update(["lastKnownParent"])
        # and does not restore following attributes
        attr_orig -= set(["showInAdvancedViewOnly"])
        self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
        expected_attrs = self._expected_container_attributes("cn", "r_container",
                                                             str(obj.dn), "container")
        self.assertAttributesExists(expected_attrs, obj_restore)
545
546
# Run every test case in this module when the file is executed as a script.
if "__main__" == __name__:
    unittest.main()