PEP8: fix E202: whitespace before ')'
[garming/samba-autobuild/.git] / source4 / dsdb / tests / python / tombstone_reanimation.py
1 #!/usr/bin/env python
2 #
3 # Tombstone reanimation tests
4 #
5 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2014
6 # Copyright (C) Nadezhda Ivanova <nivanova@symas.com> 2014
7 #
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
21 from __future__ import print_function
22 import sys
23 import unittest
24
25 sys.path.insert(0, "bin/python")
26 import samba
27
28 from samba.ndr import ndr_unpack, ndr_print
29 from samba.dcerpc import misc
30 from samba.dcerpc import security
31 from samba.dcerpc import drsblobs
32 from samba.dcerpc.drsuapi import *
33 from samba.tests.password_test import PasswordCommon
34
35 import samba.tests
36 from ldb import (SCOPE_BASE, FLAG_MOD_ADD, FLAG_MOD_DELETE, FLAG_MOD_REPLACE, Dn, Message,
37                  MessageElement, LdbError,
38                  ERR_ATTRIBUTE_OR_VALUE_EXISTS, ERR_NO_SUCH_OBJECT, ERR_ENTRY_ALREADY_EXISTS,
39                  ERR_OPERATIONS_ERROR, ERR_UNWILLING_TO_PERFORM)
40
41
42 class RestoredObjectAttributesBaseTestCase(samba.tests.TestCase):
43     """ verify Samba restores required attributes when
44         user restores a Deleted object
45     """
46
47     def setUp(self):
48         super(RestoredObjectAttributesBaseTestCase, self).setUp()
49         self.samdb = samba.tests.connect_samdb_env("TEST_SERVER", "TEST_USERNAME", "TEST_PASSWORD")
50         self.base_dn = self.samdb.domain_dn()
51         self.schema_dn = self.samdb.get_schema_basedn().get_linearized()
52         self.configuration_dn = self.samdb.get_config_basedn().get_linearized()
53
54         # permit password changes during this test
55         PasswordCommon.allow_password_changes(self, self.samdb)
56
57     def tearDown(self):
58         super(RestoredObjectAttributesBaseTestCase, self).tearDown()
59
60     def GUID_string(self, guid):
61         return self.samdb.schema_format_value("objectGUID", guid)
62
63     def search_guid(self, guid, attrs=["*"]):
64         res = self.samdb.search(base="<GUID=%s>" % self.GUID_string(guid),
65                                 scope=SCOPE_BASE, attrs=attrs,
66                                 controls=["show_deleted:1"])
67         self.assertEquals(len(res), 1)
68         return res[0]
69
70     def search_dn(self, dn):
71         res = self.samdb.search(expression="(objectClass=*)",
72                                 base=dn,
73                                 scope=SCOPE_BASE,
74                                 controls=["show_recycled:1"])
75         self.assertEquals(len(res), 1)
76         return res[0]
77
78     def _create_object(self, msg):
79         """:param msg: dict with dn and attributes to create an object from"""
80         # delete an object if leftover from previous test
81         samba.tests.delete_force(self.samdb, msg['dn'])
82         self.samdb.add(msg)
83         return self.search_dn(msg['dn'])
84
85     def assertNamesEqual(self, attrs_expected, attrs_extra):
86         self.assertEqual(attrs_expected, attrs_extra,
87                          "Actual object does not have expected attributes, missing from expected (%s), extra (%s)"
88                          % (str(attrs_expected.difference(attrs_extra)), str(attrs_extra.difference(attrs_expected))))
89
90     def assertAttributesEqual(self, obj_orig, attrs_orig, obj_restored, attrs_rest):
91         self.assertNamesEqual(attrs_orig, attrs_rest)
92         # remove volatile attributes, they can't be equal
93         attrs_orig -= set(["uSNChanged", "dSCorePropagationData", "whenChanged"])
94         for attr in attrs_orig:
95             # convert original attr value to ldif
96             orig_val = obj_orig.get(attr)
97             if orig_val is None:
98                 continue
99             if not isinstance(orig_val, MessageElement):
100                 orig_val = MessageElement(str(orig_val), 0, attr)
101             m = Message()
102             m.add(orig_val)
103             orig_ldif = self.samdb.write_ldif(m, 0)
104             # convert restored attr value to ldif
105             rest_val = obj_restored.get(attr)
106             self.assertFalse(rest_val is None)
107             m = Message()
108             if not isinstance(rest_val, MessageElement):
109                 rest_val = MessageElement(str(rest_val), 0, attr)
110             m.add(rest_val)
111             rest_ldif = self.samdb.write_ldif(m, 0)
112             # compare generated ldif's
113             self.assertEqual(orig_ldif, rest_ldif)
114
115     def assertAttributesExists(self, attr_expected, obj_msg):
116         """Check object contains at least expected attrbigutes
117         :param attr_expected: dict of expected attributes with values. ** is any value
118         :param obj_msg: Ldb.Message for the object under test
119         """
120         actual_names = set(obj_msg.keys())
121         # Samba does not use 'dSCorePropagationData', so skip it
122         actual_names -= set(['dSCorePropagationData'])
123         expected_names = set(attr_expected.keys())
124         self.assertNamesEqual(expected_names, actual_names)
125         for name in attr_expected.keys():
126             expected_val = attr_expected[name]
127             actual_val = obj_msg.get(name)
128             self.assertFalse(actual_val is None, "No value for attribute '%s'" % name)
129             if expected_val == "**":
130                 # "**" values means "any"
131                 continue
132             self.assertEqual(expected_val, str(actual_val),
133                              "Unexpected value (%s) for '%s', expected (%s)" % (
134                              str(actual_val), name, expected_val))
135
136     def _check_metadata(self, metadata, expected):
137         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, str(metadata[0]))
138
139         repl_array = []
140         for o in repl.ctr.array:
141             repl_array.append((o.attid, o.version))
142         repl_set = set(repl_array)
143
144         expected_set = set(expected)
145         self.assertEqual(len(repl_set), len(expected),
146                          "Unexpected metadata, missing from expected (%s), extra (%s)), repl: \n%s" % (
147                          str(expected_set.difference(repl_set)),
148                          str(repl_set.difference(expected_set)),
149                          ndr_print(repl)))
150
151         i = 0
152         for o in repl.ctr.array:
153             e = expected[i]
154             (attid, version) = e
155             self.assertEquals(attid, o.attid,
156                               "(LDAP) Wrong attid "
157                               "for expected value %d, wanted 0x%08x got 0x%08x, "
158                               "repl: \n%s"
159                               % (i, attid, o.attid, ndr_print(repl)))
160             # Allow version to be skipped when it does not matter
161             if version is not None:
162                 self.assertEquals(o.version, version,
163                                   "(LDAP) Wrong version for expected value %d, "
164                                   "attid 0x%08x, "
165                                   "wanted %d got %d, repl: \n%s"
166                                   % (i, o.attid,
167                                      version, o.version, ndr_print(repl)))
168             i = i + 1
169
170     @staticmethod
171     def restore_deleted_object(samdb, del_dn, new_dn, new_attrs=None):
172         """Restores a deleted object
173         :param samdb: SamDB connection to SAM
174         :param del_dn: str Deleted object DN
175         :param new_dn: str Where to restore the object
176         :param new_attrs: dict Additional attributes to set
177         """
178         msg = Message()
179         msg.dn = Dn(samdb, str(del_dn))
180         msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
181         msg["distinguishedName"] = MessageElement([str(new_dn)], FLAG_MOD_REPLACE, "distinguishedName")
182         if new_attrs is not None:
183             assert isinstance(new_attrs, dict)
184             for attr in new_attrs:
185                 msg[attr] = MessageElement(new_attrs[attr], FLAG_MOD_REPLACE, attr)
186         samdb.modify(msg, ["show_deleted:1"])
187
188
189 class BaseRestoreObjectTestCase(RestoredObjectAttributesBaseTestCase):
190     def setUp(self):
191         super(BaseRestoreObjectTestCase, self).setUp()
192
193     def enable_recycle_bin(self):
194         msg = Message()
195         msg.dn = Dn(self.samdb, "")
196         msg["enableOptionalFeature"] = MessageElement(
197             "CN=Partitions," + self.configuration_dn + ":766ddcd8-acd0-445e-f3b9-a7f9b6744f2a",
198             FLAG_MOD_ADD, "enableOptionalFeature")
199         try:
200             self.samdb.modify(msg)
201         except LdbError as e:
202             (num, _) = e.args
203             self.assertEquals(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS)
204
205     def test_undelete(self):
206         print("Testing standard undelete operation")
207         usr1 = "cn=testuser,cn=users," + self.base_dn
208         samba.tests.delete_force(self.samdb, usr1)
209         self.samdb.add({
210             "dn": usr1,
211             "objectclass": "user",
212             "description": "test user description",
213             "samaccountname": "testuser"})
214         objLive1 = self.search_dn(usr1)
215         guid1 = objLive1["objectGUID"][0]
216         self.samdb.delete(usr1)
217         objDeleted1 = self.search_guid(guid1)
218         self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1)
219         objLive2 = self.search_dn(usr1)
220         self.assertEqual(str(objLive2.dn).lower(), str(objLive1.dn).lower())
221         samba.tests.delete_force(self.samdb, usr1)
222
223     def test_rename(self):
224         print("Testing attempt to rename deleted object")
225         usr1 = "cn=testuser,cn=users," + self.base_dn
226         self.samdb.add({
227             "dn": usr1,
228             "objectclass": "user",
229             "description": "test user description",
230             "samaccountname": "testuser"})
231         objLive1 = self.search_dn(usr1)
232         guid1 = objLive1["objectGUID"][0]
233         self.samdb.delete(usr1)
234         objDeleted1 = self.search_guid(guid1)
235         # just to make sure we get the correct error if the show deleted is missing
236         try:
237             self.samdb.rename(str(objDeleted1.dn), usr1)
238             self.fail()
239         except LdbError as e1:
240             (num, _) = e1.args
241             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
242
243         try:
244             self.samdb.rename(str(objDeleted1.dn), usr1, ["show_deleted:1"])
245             self.fail()
246         except LdbError as e2:
247             (num, _) = e2.args
248             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
249
250     def test_undelete_with_mod(self):
251         print("Testing standard undelete operation with modification of additional attributes")
252         usr1 = "cn=testuser,cn=users," + self.base_dn
253         self.samdb.add({
254             "dn": usr1,
255             "objectclass": "user",
256             "description": "test user description",
257             "samaccountname": "testuser"})
258         objLive1 = self.search_dn(usr1)
259         guid1 = objLive1["objectGUID"][0]
260         self.samdb.delete(usr1)
261         objDeleted1 = self.search_guid(guid1)
262         self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1, {"url": "www.samba.org"})
263         objLive2 = self.search_dn(usr1)
264         self.assertEqual(objLive2["url"][0], "www.samba.org")
265         samba.tests.delete_force(self.samdb, usr1)
266
267     def test_undelete_newuser(self):
268         print("Testing undelete user with a different dn")
269         usr1 = "cn=testuser,cn=users," + self.base_dn
270         usr2 = "cn=testuser2,cn=users," + self.base_dn
271         samba.tests.delete_force(self.samdb, usr1)
272         self.samdb.add({
273             "dn": usr1,
274             "objectclass": "user",
275             "description": "test user description",
276             "samaccountname": "testuser"})
277         objLive1 = self.search_dn(usr1)
278         guid1 = objLive1["objectGUID"][0]
279         self.samdb.delete(usr1)
280         objDeleted1 = self.search_guid(guid1)
281         self.restore_deleted_object(self.samdb, objDeleted1.dn, usr2)
282         objLive2 = self.search_dn(usr2)
283         samba.tests.delete_force(self.samdb, usr1)
284         samba.tests.delete_force(self.samdb, usr2)
285
286     def test_undelete_existing(self):
287         print("Testing undelete user after a user with the same dn has been created")
288         usr1 = "cn=testuser,cn=users," + self.base_dn
289         self.samdb.add({
290             "dn": usr1,
291             "objectclass": "user",
292             "description": "test user description",
293             "samaccountname": "testuser"})
294         objLive1 = self.search_dn(usr1)
295         guid1 = objLive1["objectGUID"][0]
296         self.samdb.delete(usr1)
297         self.samdb.add({
298             "dn": usr1,
299             "objectclass": "user",
300             "description": "test user description",
301             "samaccountname": "testuser"})
302         objDeleted1 = self.search_guid(guid1)
303         try:
304             self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1)
305             self.fail()
306         except LdbError as e3:
307             (num, _) = e3.args
308             self.assertEquals(num, ERR_ENTRY_ALREADY_EXISTS)
309
310     def test_undelete_cross_nc(self):
311         print("Cross NC undelete")
312         c1 = "cn=ldaptestcontainer," + self.base_dn
313         c2 = "cn=ldaptestcontainer2," + self.configuration_dn
314         c3 = "cn=ldaptestcontainer," + self.configuration_dn
315         c4 = "cn=ldaptestcontainer2," + self.base_dn
316         samba.tests.delete_force(self.samdb, c1)
317         samba.tests.delete_force(self.samdb, c2)
318         samba.tests.delete_force(self.samdb, c3)
319         samba.tests.delete_force(self.samdb, c4)
320         self.samdb.add({
321             "dn": c1,
322             "objectclass": "container"})
323         self.samdb.add({
324             "dn": c2,
325             "objectclass": "container"})
326         objLive1 = self.search_dn(c1)
327         objLive2 = self.search_dn(c2)
328         guid1 = objLive1["objectGUID"][0]
329         guid2 = objLive2["objectGUID"][0]
330         self.samdb.delete(c1)
331         self.samdb.delete(c2)
332         objDeleted1 = self.search_guid(guid1)
333         objDeleted2 = self.search_guid(guid2)
334         # try to undelete from base dn to config
335         try:
336             self.restore_deleted_object(self.samdb, objDeleted1.dn, c3)
337             self.fail()
338         except LdbError as e4:
339             (num, _) = e4.args
340             self.assertEquals(num, ERR_OPERATIONS_ERROR)
341         #try to undelete from config to base dn
342         try:
343             self.restore_deleted_object(self.samdb, objDeleted2.dn, c4)
344             self.fail()
345         except LdbError as e5:
346             (num, _) = e5.args
347             self.assertEquals(num, ERR_OPERATIONS_ERROR)
348         #assert undeletion will work in same nc
349         self.restore_deleted_object(self.samdb, objDeleted1.dn, c4)
350         self.restore_deleted_object(self.samdb, objDeleted2.dn, c3)
351
352
353 class RestoreUserObjectTestCase(RestoredObjectAttributesBaseTestCase):
354     """Test cases for delete/reanimate user objects"""
355
356     def _expected_user_add_attributes(self, username, user_dn, category):
357         return {'dn': user_dn,
358                 'objectClass': '**',
359                 'cn': username,
360                 'distinguishedName': user_dn,
361                 'instanceType': '4',
362                 'whenCreated': '**',
363                 'whenChanged': '**',
364                 'uSNCreated': '**',
365                 'uSNChanged': '**',
366                 'name': username,
367                 'objectGUID': '**',
368                 'userAccountControl': '546',
369                 'badPwdCount': '0',
370                 'badPasswordTime': '0',
371                 'codePage': '0',
372                 'countryCode': '0',
373                 'lastLogon': '0',
374                 'lastLogoff': '0',
375                 'pwdLastSet': '0',
376                 'primaryGroupID': '513',
377                 'objectSid': '**',
378                 'accountExpires': '9223372036854775807',
379                 'logonCount': '0',
380                 'sAMAccountName': username,
381                 'sAMAccountType': '805306368',
382                 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn)
383                 }
384
385     def _expected_user_add_metadata(self):
386         return [
387             (DRSUAPI_ATTID_objectClass, 1),
388             (DRSUAPI_ATTID_cn, 1),
389             (DRSUAPI_ATTID_instanceType, 1),
390             (DRSUAPI_ATTID_whenCreated, 1),
391             (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
392             (DRSUAPI_ATTID_name, 1),
393             (DRSUAPI_ATTID_userAccountControl, None),
394             (DRSUAPI_ATTID_codePage, 1),
395             (DRSUAPI_ATTID_countryCode, 1),
396             (DRSUAPI_ATTID_dBCSPwd, 1),
397             (DRSUAPI_ATTID_logonHours, 1),
398             (DRSUAPI_ATTID_unicodePwd, 1),
399             (DRSUAPI_ATTID_ntPwdHistory, 1),
400             (DRSUAPI_ATTID_pwdLastSet, 1),
401             (DRSUAPI_ATTID_primaryGroupID, 1),
402             (DRSUAPI_ATTID_objectSid, 1),
403             (DRSUAPI_ATTID_accountExpires, 1),
404             (DRSUAPI_ATTID_lmPwdHistory, 1),
405             (DRSUAPI_ATTID_sAMAccountName, 1),
406             (DRSUAPI_ATTID_sAMAccountType, 1),
407             (DRSUAPI_ATTID_objectCategory, 1)]
408
409     def _expected_user_del_attributes(self, username, _guid, _sid):
410         guid = ndr_unpack(misc.GUID, _guid)
411         dn = "CN=%s\\0ADEL:%s,CN=Deleted Objects,%s" % (username, guid, self.base_dn)
412         cn = "%s\nDEL:%s" % (username, guid)
413         return {'dn': dn,
414                 'objectClass': '**',
415                 'cn': cn,
416                 'distinguishedName': dn,
417                 'isDeleted': 'TRUE',
418                 'isRecycled': 'TRUE',
419                 'instanceType': '4',
420                 'whenCreated': '**',
421                 'whenChanged': '**',
422                 'uSNCreated': '**',
423                 'uSNChanged': '**',
424                 'name': cn,
425                 'objectGUID': _guid,
426                 'userAccountControl': '546',
427                 'objectSid': _sid,
428                 'sAMAccountName': username,
429                 'lastKnownParent': 'CN=Users,%s' % self.base_dn,
430                 }
431
432     def _expected_user_del_metadata(self):
433         return [
434             (DRSUAPI_ATTID_objectClass, 1),
435             (DRSUAPI_ATTID_cn, 2),
436             (DRSUAPI_ATTID_instanceType, 1),
437             (DRSUAPI_ATTID_whenCreated, 1),
438             (DRSUAPI_ATTID_isDeleted, 1),
439             (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
440             (DRSUAPI_ATTID_name, 2),
441             (DRSUAPI_ATTID_userAccountControl, None),
442             (DRSUAPI_ATTID_codePage, 2),
443             (DRSUAPI_ATTID_countryCode, 2),
444             (DRSUAPI_ATTID_dBCSPwd, 1),
445             (DRSUAPI_ATTID_logonHours, 1),
446             (DRSUAPI_ATTID_unicodePwd, 1),
447             (DRSUAPI_ATTID_ntPwdHistory, 1),
448             (DRSUAPI_ATTID_pwdLastSet, 2),
449             (DRSUAPI_ATTID_primaryGroupID, 2),
450             (DRSUAPI_ATTID_objectSid, 1),
451             (DRSUAPI_ATTID_accountExpires, 2),
452             (DRSUAPI_ATTID_lmPwdHistory, 1),
453             (DRSUAPI_ATTID_sAMAccountName, 1),
454             (DRSUAPI_ATTID_sAMAccountType, 2),
455             (DRSUAPI_ATTID_lastKnownParent, 1),
456             (DRSUAPI_ATTID_objectCategory, 2),
457             (DRSUAPI_ATTID_isRecycled, 1)]
458
459     def _expected_user_restore_attributes(self, username, guid, sid, user_dn, category):
460         return {'dn': user_dn,
461                 'objectClass': '**',
462                 'cn': username,
463                 'distinguishedName': user_dn,
464                 'instanceType': '4',
465                 'whenCreated': '**',
466                 'whenChanged': '**',
467                 'uSNCreated': '**',
468                 'uSNChanged': '**',
469                 'name': username,
470                 'objectGUID': guid,
471                 'userAccountControl': '546',
472                 'badPwdCount': '0',
473                 'badPasswordTime': '0',
474                 'codePage': '0',
475                 'countryCode': '0',
476                 'lastLogon': '0',
477                 'lastLogoff': '0',
478                 'pwdLastSet': '0',
479                 'primaryGroupID': '513',
480                 'operatorCount': '0',
481                 'objectSid': sid,
482                 'adminCount': '0',
483                 'accountExpires': '0',
484                 'logonCount': '0',
485                 'sAMAccountName': username,
486                 'sAMAccountType': '805306368',
487                 'lastKnownParent': 'CN=Users,%s' % self.base_dn,
488                 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn)
489                 }
490
491     def _expected_user_restore_metadata(self):
492         return [
493             (DRSUAPI_ATTID_objectClass, 1),
494             (DRSUAPI_ATTID_cn, 3),
495             (DRSUAPI_ATTID_instanceType, 1),
496             (DRSUAPI_ATTID_whenCreated, 1),
497             (DRSUAPI_ATTID_isDeleted, 2),
498             (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
499             (DRSUAPI_ATTID_name, 3),
500             (DRSUAPI_ATTID_userAccountControl, None),
501             (DRSUAPI_ATTID_codePage, 3),
502             (DRSUAPI_ATTID_countryCode, 3),
503             (DRSUAPI_ATTID_dBCSPwd, 1),
504             (DRSUAPI_ATTID_logonHours, 1),
505             (DRSUAPI_ATTID_unicodePwd, 1),
506             (DRSUAPI_ATTID_ntPwdHistory, 1),
507             (DRSUAPI_ATTID_pwdLastSet, 3),
508             (DRSUAPI_ATTID_primaryGroupID, 3),
509             (DRSUAPI_ATTID_operatorCount, 1),
510             (DRSUAPI_ATTID_objectSid, 1),
511             (DRSUAPI_ATTID_adminCount, 1),
512             (DRSUAPI_ATTID_accountExpires, 3),
513             (DRSUAPI_ATTID_lmPwdHistory, 1),
514             (DRSUAPI_ATTID_sAMAccountName, 1),
515             (DRSUAPI_ATTID_sAMAccountType, 3),
516             (DRSUAPI_ATTID_lastKnownParent, 1),
517             (DRSUAPI_ATTID_objectCategory, 3),
518             (DRSUAPI_ATTID_isRecycled, 2)]
519
520     def test_restore_user(self):
521         print("Test restored user attributes")
522         username = "restore_user"
523         usr_dn = "CN=%s,CN=Users,%s" % (username, self.base_dn)
524         samba.tests.delete_force(self.samdb, usr_dn)
525         self.samdb.add({
526             "dn": usr_dn,
527             "objectClass": "user",
528             "sAMAccountName": username})
529         obj = self.search_dn(usr_dn)
530         guid = obj["objectGUID"][0]
531         sid = obj["objectSID"][0]
532         obj_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
533         self.assertAttributesExists(self._expected_user_add_attributes(username, usr_dn, "Person"), obj)
534         self._check_metadata(obj_rmd["replPropertyMetaData"],
535                              self._expected_user_add_metadata())
536         self.samdb.delete(usr_dn)
537         obj_del = self.search_guid(guid)
538         obj_del_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
539         orig_attrs = set(obj.keys())
540         del_attrs = set(obj_del.keys())
541         self.assertAttributesExists(self._expected_user_del_attributes(username, guid, sid), obj_del)
542         self._check_metadata(obj_del_rmd["replPropertyMetaData"],
543                              self._expected_user_del_metadata())
544         # restore the user and fetch what's restored
545         self.restore_deleted_object(self.samdb, obj_del.dn, usr_dn)
546         obj_restore = self.search_guid(guid)
547         obj_restore_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
548         # check original attributes and restored one are same
549         orig_attrs = set(obj.keys())
550         # windows restore more attributes that originally we have
551         orig_attrs.update(['adminCount', 'operatorCount', 'lastKnownParent'])
552         rest_attrs = set(obj_restore.keys())
553         self.assertAttributesExists(self._expected_user_restore_attributes(username, guid, sid, usr_dn, "Person"), obj_restore)
554         self._check_metadata(obj_restore_rmd["replPropertyMetaData"],
555                              self._expected_user_restore_metadata())
556
557 class RestoreUserPwdObjectTestCase(RestoredObjectAttributesBaseTestCase):
558     """Test cases for delete/reanimate user objects with password"""
559
560     def _expected_userpw_add_attributes(self, username, user_dn, category):
561         return {'dn': user_dn,
562                 'objectClass': '**',
563                 'cn': username,
564                 'distinguishedName': user_dn,
565                 'instanceType': '4',
566                 'whenCreated': '**',
567                 'whenChanged': '**',
568                 'uSNCreated': '**',
569                 'uSNChanged': '**',
570                 'name': username,
571                 'objectGUID': '**',
572                 'userAccountControl': '546',
573                 'badPwdCount': '0',
574                 'badPasswordTime': '0',
575                 'codePage': '0',
576                 'countryCode': '0',
577                 'lastLogon': '0',
578                 'lastLogoff': '0',
579                 'pwdLastSet': '**',
580                 'primaryGroupID': '513',
581                 'objectSid': '**',
582                 'accountExpires': '9223372036854775807',
583                 'logonCount': '0',
584                 'sAMAccountName': username,
585                 'sAMAccountType': '805306368',
586                 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn)
587                 }
588
589     def _expected_userpw_add_metadata(self):
590         return [
591             (DRSUAPI_ATTID_objectClass, 1),
592             (DRSUAPI_ATTID_cn, 1),
593             (DRSUAPI_ATTID_instanceType, 1),
594             (DRSUAPI_ATTID_whenCreated, 1),
595             (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
596             (DRSUAPI_ATTID_name, 1),
597             (DRSUAPI_ATTID_userAccountControl, None),
598             (DRSUAPI_ATTID_codePage, 1),
599             (DRSUAPI_ATTID_countryCode, 1),
600             (DRSUAPI_ATTID_dBCSPwd, 1),
601             (DRSUAPI_ATTID_logonHours, 1),
602             (DRSUAPI_ATTID_unicodePwd, 1),
603             (DRSUAPI_ATTID_ntPwdHistory, 1),
604             (DRSUAPI_ATTID_pwdLastSet, 1),
605             (DRSUAPI_ATTID_primaryGroupID, 1),
606             (DRSUAPI_ATTID_supplementalCredentials, 1),
607             (DRSUAPI_ATTID_objectSid, 1),
608             (DRSUAPI_ATTID_accountExpires, 1),
609             (DRSUAPI_ATTID_lmPwdHistory, 1),
610             (DRSUAPI_ATTID_sAMAccountName, 1),
611             (DRSUAPI_ATTID_sAMAccountType, 1),
612             (DRSUAPI_ATTID_objectCategory, 1)]
613
614     def _expected_userpw_del_attributes(self, username, _guid, _sid):
615         guid = ndr_unpack(misc.GUID, _guid)
616         dn = "CN=%s\\0ADEL:%s,CN=Deleted Objects,%s" % (username, guid, self.base_dn)
617         cn = "%s\nDEL:%s" % (username, guid)
618         return {'dn': dn,
619                 'objectClass': '**',
620                 'cn': cn,
621                 'distinguishedName': dn,
622                 'isDeleted': 'TRUE',
623                 'isRecycled': 'TRUE',
624                 'instanceType': '4',
625                 'whenCreated': '**',
626                 'whenChanged': '**',
627                 'uSNCreated': '**',
628                 'uSNChanged': '**',
629                 'name': cn,
630                 'objectGUID': _guid,
631                 'userAccountControl': '546',
632                 'objectSid': _sid,
633                 'sAMAccountName': username,
634                 'lastKnownParent': 'CN=Users,%s' % self.base_dn,
635                 }
636
637     def _expected_userpw_del_metadata(self):
638         return [
639             (DRSUAPI_ATTID_objectClass, 1),
640             (DRSUAPI_ATTID_cn, 2),
641             (DRSUAPI_ATTID_instanceType, 1),
642             (DRSUAPI_ATTID_whenCreated, 1),
643             (DRSUAPI_ATTID_isDeleted, 1),
644             (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
645             (DRSUAPI_ATTID_name, 2),
646             (DRSUAPI_ATTID_userAccountControl, None),
647             (DRSUAPI_ATTID_codePage, 2),
648             (DRSUAPI_ATTID_countryCode, 2),
649             (DRSUAPI_ATTID_dBCSPwd, 1),
650             (DRSUAPI_ATTID_logonHours, 1),
651             (DRSUAPI_ATTID_unicodePwd, 2),
652             (DRSUAPI_ATTID_ntPwdHistory, 2),
653             (DRSUAPI_ATTID_pwdLastSet, 2),
654             (DRSUAPI_ATTID_primaryGroupID, 2),
655             (DRSUAPI_ATTID_supplementalCredentials, 2),
656             (DRSUAPI_ATTID_objectSid, 1),
657             (DRSUAPI_ATTID_accountExpires, 2),
658             (DRSUAPI_ATTID_lmPwdHistory, 2),
659             (DRSUAPI_ATTID_sAMAccountName, 1),
660             (DRSUAPI_ATTID_sAMAccountType, 2),
661             (DRSUAPI_ATTID_lastKnownParent, 1),
662             (DRSUAPI_ATTID_objectCategory, 2),
663             (DRSUAPI_ATTID_isRecycled, 1)]
664
665     def _expected_userpw_restore_attributes(self, username, guid, sid, user_dn, category):
666         return {'dn': user_dn,
667                 'objectClass': '**',
668                 'cn': username,
669                 'distinguishedName': user_dn,
670                 'instanceType': '4',
671                 'whenCreated': '**',
672                 'whenChanged': '**',
673                 'uSNCreated': '**',
674                 'uSNChanged': '**',
675                 'name': username,
676                 'objectGUID': guid,
677                 'userAccountControl': '546',
678                 'badPwdCount': '0',
679                 'badPasswordTime': '0',
680                 'codePage': '0',
681                 'countryCode': '0',
682                 'lastLogon': '0',
683                 'lastLogoff': '0',
684                 'pwdLastSet': '**',
685                 'primaryGroupID': '513',
686                 'operatorCount': '0',
687                 'objectSid': sid,
688                 'adminCount': '0',
689                 'accountExpires': '0',
690                 'logonCount': '0',
691                 'sAMAccountName': username,
692                 'sAMAccountType': '805306368',
693                 'lastKnownParent': 'CN=Users,%s' % self.base_dn,
694                 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn)
695                 }
696
697     def _expected_userpw_restore_metadata(self):
698         return [
699             (DRSUAPI_ATTID_objectClass, 1),
700             (DRSUAPI_ATTID_cn, 3),
701             (DRSUAPI_ATTID_instanceType, 1),
702             (DRSUAPI_ATTID_whenCreated, 1),
703             (DRSUAPI_ATTID_isDeleted, 2),
704             (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
705             (DRSUAPI_ATTID_name, 3),
706             (DRSUAPI_ATTID_userAccountControl, None),
707             (DRSUAPI_ATTID_codePage, 3),
708             (DRSUAPI_ATTID_countryCode, 3),
709             (DRSUAPI_ATTID_dBCSPwd, 2),
710             (DRSUAPI_ATTID_logonHours, 1),
711             (DRSUAPI_ATTID_unicodePwd, 3),
712             (DRSUAPI_ATTID_ntPwdHistory, 3),
713             (DRSUAPI_ATTID_pwdLastSet, 4),
714             (DRSUAPI_ATTID_primaryGroupID, 3),
715             (DRSUAPI_ATTID_supplementalCredentials, 3),
716             (DRSUAPI_ATTID_operatorCount, 1),
717             (DRSUAPI_ATTID_objectSid, 1),
718             (DRSUAPI_ATTID_adminCount, 1),
719             (DRSUAPI_ATTID_accountExpires, 3),
720             (DRSUAPI_ATTID_lmPwdHistory, 3),
721             (DRSUAPI_ATTID_sAMAccountName, 1),
722             (DRSUAPI_ATTID_sAMAccountType, 3),
723             (DRSUAPI_ATTID_lastKnownParent, 1),
724             (DRSUAPI_ATTID_objectCategory, 3),
725             (DRSUAPI_ATTID_isRecycled, 2)]
726
727     def test_restorepw_user(self):
728         print("Test restored user attributes")
729         username = "restorepw_user"
730         usr_dn = "CN=%s,CN=Users,%s" % (username, self.base_dn)
731         samba.tests.delete_force(self.samdb, usr_dn)
732         self.samdb.add({
733             "dn": usr_dn,
734             "objectClass": "user",
735             "userPassword": "thatsAcomplPASS0",
736             "sAMAccountName": username})
737         obj = self.search_dn(usr_dn)
738         guid = obj["objectGUID"][0]
739         sid = obj["objectSID"][0]
740         obj_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
741         self.assertAttributesExists(self._expected_userpw_add_attributes(username, usr_dn, "Person"), obj)
742         self._check_metadata(obj_rmd["replPropertyMetaData"],
743                              self._expected_userpw_add_metadata())
744         self.samdb.delete(usr_dn)
745         obj_del = self.search_guid(guid)
746         obj_del_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
747         orig_attrs = set(obj.keys())
748         del_attrs = set(obj_del.keys())
749         self.assertAttributesExists(self._expected_userpw_del_attributes(username, guid, sid), obj_del)
750         self._check_metadata(obj_del_rmd["replPropertyMetaData"],
751                              self._expected_userpw_del_metadata())
752         # restore the user and fetch what's restored
753         self.restore_deleted_object(self.samdb, obj_del.dn, usr_dn, {"userPassword": ["thatsAcomplPASS1"]})
754         obj_restore = self.search_guid(guid)
755         obj_restore_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
756         # check original attributes and restored one are same
757         orig_attrs = set(obj.keys())
758         # windows restore more attributes that originally we have
759         orig_attrs.update(['adminCount', 'operatorCount', 'lastKnownParent'])
760         rest_attrs = set(obj_restore.keys())
761         self.assertAttributesExists(self._expected_userpw_restore_attributes(username, guid, sid, usr_dn, "Person"), obj_restore)
762         self._check_metadata(obj_restore_rmd["replPropertyMetaData"],
763                              self._expected_userpw_restore_metadata())
764
765 class RestoreGroupObjectTestCase(RestoredObjectAttributesBaseTestCase):
766     """Test different scenarios for delete/reanimate group objects"""
767
768     def _make_object_dn(self, name):
769         return "CN=%s,CN=Users,%s" % (name, self.base_dn)
770
771     def _create_test_user(self, user_name):
772         user_dn = self._make_object_dn(user_name)
773         ldif = {
774             "dn": user_dn,
775             "objectClass": "user",
776             "sAMAccountName": user_name,
777         }
778         # delete an object if leftover from previous test
779         samba.tests.delete_force(self.samdb, user_dn)
780         # finally, create the group
781         self.samdb.add(ldif)
782         return self.search_dn(user_dn)
783
784     def _create_test_group(self, group_name, members=None):
785         group_dn = self._make_object_dn(group_name)
786         ldif = {
787             "dn": group_dn,
788             "objectClass": "group",
789             "sAMAccountName": group_name,
790         }
791         try:
792             ldif["member"] = [str(usr_dn) for usr_dn in members]
793         except TypeError:
794             pass
795         # delete an object if leftover from previous test
796         samba.tests.delete_force(self.samdb, group_dn)
797         # finally, create the group
798         self.samdb.add(ldif)
799         return self.search_dn(group_dn)
800
801     def _expected_group_attributes(self, groupname, group_dn, category):
802         return {'dn': group_dn,
803                 'groupType': '-2147483646',
804                 'distinguishedName': group_dn,
805                 'sAMAccountName': groupname,
806                 'name': groupname,
807                 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn),
808                 'objectClass': '**',
809                 'objectGUID': '**',
810                 'lastKnownParent': 'CN=Users,%s' % self.base_dn,
811                 'whenChanged': '**',
812                 'sAMAccountType': '268435456',
813                 'objectSid': '**',
814                 'whenCreated': '**',
815                 'uSNCreated': '**',
816                 'operatorCount': '0',
817                 'uSNChanged': '**',
818                 'instanceType': '4',
819                 'adminCount': '0',
820                 'cn': groupname}
821
822     def test_plain_group(self):
823         print("Test restored Group attributes")
824         # create test group
825         obj = self._create_test_group("r_group")
826         guid = obj["objectGUID"][0]
827         # delete the group
828         self.samdb.delete(str(obj.dn))
829         obj_del = self.search_guid(guid)
830         # restore the Group and fetch what's restored
831         self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
832         obj_restore = self.search_guid(guid)
833         # check original attributes and restored one are same
834         attr_orig = set(obj.keys())
835         # windows restore more attributes that originally we have
836         attr_orig.update(['adminCount', 'operatorCount', 'lastKnownParent'])
837         attr_rest = set(obj_restore.keys())
838         self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
839         self.assertAttributesExists(self._expected_group_attributes("r_group", str(obj.dn), "Group"), obj_restore)
840
841     def test_group_with_members(self):
842         print("Test restored Group with members attributes")
843         # create test group
844         usr1 = self._create_test_user("r_user_1")
845         usr2 = self._create_test_user("r_user_2")
846         obj = self._create_test_group("r_group", [usr1.dn, usr2.dn])
847         guid = obj["objectGUID"][0]
848         # delete the group
849         self.samdb.delete(str(obj.dn))
850         obj_del = self.search_guid(guid)
851         # restore the Group and fetch what's restored
852         self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
853         obj_restore = self.search_guid(guid)
854         # check original attributes and restored one are same
855         attr_orig = set(obj.keys())
856         # windows restore more attributes that originally we have
857         attr_orig.update(['adminCount', 'operatorCount', 'lastKnownParent'])
858         # and does not restore following attributes
859         attr_orig.remove("member")
860         attr_rest = set(obj_restore.keys())
861         self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
862         self.assertAttributesExists(self._expected_group_attributes("r_group", str(obj.dn), "Group"), obj_restore)
863
864
865 class RestoreContainerObjectTestCase(RestoredObjectAttributesBaseTestCase):
866     """Test different scenarios for delete/reanimate OU/container objects"""
867
868     def _expected_container_attributes(self, rdn, name, dn, category):
869         if rdn == 'OU':
870             lastKnownParent = '%s' % self.base_dn
871         else:
872             lastKnownParent = 'CN=Users,%s' % self.base_dn
873         return {'dn': dn,
874                 'distinguishedName': dn,
875                 'name': name,
876                 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn),
877                 'objectClass': '**',
878                 'objectGUID': '**',
879                 'lastKnownParent': lastKnownParent,
880                 'whenChanged': '**',
881                 'whenCreated': '**',
882                 'uSNCreated': '**',
883                 'uSNChanged': '**',
884                 'instanceType': '4',
885                 rdn.lower(): name}
886
887     def _create_test_ou(self, rdn, name=None, description=None):
888         ou_dn = "OU=%s,%s" % (rdn, self.base_dn)
889         # delete an object if leftover from previous test
890         samba.tests.delete_force(self.samdb, ou_dn)
891         # create ou and return created object
892         self.samdb.create_ou(ou_dn, name=name, description=description)
893         return self.search_dn(ou_dn)
894
895     def test_ou_with_name_description(self):
896         print("Test OU reanimation")
897         # create OU to test with
898         obj = self._create_test_ou(rdn="r_ou",
899                                    name="r_ou name",
900                                    description="r_ou description")
901         guid = obj["objectGUID"][0]
902         # delete the object
903         self.samdb.delete(str(obj.dn))
904         obj_del = self.search_guid(guid)
905         # restore the Object and fetch what's restored
906         self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
907         obj_restore = self.search_guid(guid)
908         # check original attributes and restored one are same
909         attr_orig = set(obj.keys())
910         attr_rest = set(obj_restore.keys())
911         # windows restore more attributes that originally we have
912         attr_orig.update(["lastKnownParent"])
913         # and does not restore following attributes
914         attr_orig -= set(["description"])
915         self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
916         expected_attrs = self._expected_container_attributes("OU", "r_ou", str(obj.dn), "Organizational-Unit")
917         self.assertAttributesExists(expected_attrs, obj_restore)
918
919     def test_container(self):
920         print("Test Container reanimation")
921         # create test Container
922         obj = self._create_object({
923             "dn": "CN=r_container,CN=Users,%s" % self.base_dn,
924             "objectClass": "container"
925         })
926         guid = obj["objectGUID"][0]
927         # delete the object
928         self.samdb.delete(str(obj.dn))
929         obj_del = self.search_guid(guid)
930         # restore the Object and fetch what's restored
931         self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
932         obj_restore = self.search_guid(guid)
933         # check original attributes and restored one are same
934         attr_orig = set(obj.keys())
935         attr_rest = set(obj_restore.keys())
936         # windows restore more attributes that originally we have
937         attr_orig.update(["lastKnownParent"])
938         # and does not restore following attributes
939         attr_orig -= set(["showInAdvancedViewOnly"])
940         self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
941         expected_attrs = self._expected_container_attributes("CN", "r_container",
942                                                              str(obj.dn), "Container")
943         self.assertAttributesExists(expected_attrs, obj_restore)
944
945
946 if __name__ == '__main__':
947     unittest.main()