3 # Tombstone reanimation tests
5 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2014
6 # Copyright (C) Nadezhda Ivanova <nivanova@symas.com> 2014
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 from __future__ import print_function
25 sys.path.insert(0, "bin/python")
28 from samba.ndr import ndr_unpack, ndr_print
29 from samba.dcerpc import misc
30 from samba.dcerpc import security
31 from samba.dcerpc import drsblobs
32 from samba.dcerpc.drsuapi import *
33 from samba.tests.password_test import PasswordCommon
36 from ldb import (SCOPE_BASE, FLAG_MOD_ADD, FLAG_MOD_DELETE, FLAG_MOD_REPLACE, Dn, Message,
37 MessageElement, LdbError,
38 ERR_ATTRIBUTE_OR_VALUE_EXISTS, ERR_NO_SUCH_OBJECT, ERR_ENTRY_ALREADY_EXISTS,
39 ERR_OPERATIONS_ERROR, ERR_UNWILLING_TO_PERFORM)
42 class RestoredObjectAttributesBaseTestCase(samba.tests.TestCase):
43 """ verify Samba restores required attributes when
44 user restores a Deleted object
48 super(RestoredObjectAttributesBaseTestCase, self).setUp()
49 self.samdb = samba.tests.connect_samdb_env("TEST_SERVER", "TEST_USERNAME", "TEST_PASSWORD")
50 self.base_dn = self.samdb.domain_dn()
51 self.schema_dn = self.samdb.get_schema_basedn().get_linearized()
52 self.configuration_dn = self.samdb.get_config_basedn().get_linearized()
54 # permit password changes during this test
55 PasswordCommon.allow_password_changes(self, self.samdb)
58 super(RestoredObjectAttributesBaseTestCase, self).tearDown()
60 def GUID_string(self, guid):
61 return self.samdb.schema_format_value("objectGUID", guid)
63 def search_guid(self, guid, attrs=["*"]):
64 res = self.samdb.search(base="<GUID=%s>" % self.GUID_string(guid),
65 scope=SCOPE_BASE, attrs=attrs,
66 controls=["show_deleted:1"])
67 self.assertEquals(len(res), 1)
70 def search_dn(self, dn):
71 res = self.samdb.search(expression="(objectClass=*)",
74 controls=["show_recycled:1"])
75 self.assertEquals(len(res), 1)
78 def _create_object(self, msg):
79 """:param msg: dict with dn and attributes to create an object from"""
80 # delete an object if leftover from previous test
81 samba.tests.delete_force(self.samdb, msg['dn'])
83 return self.search_dn(msg['dn'])
85 def assertNamesEqual(self, attrs_expected, attrs_extra):
86 self.assertEqual(attrs_expected, attrs_extra,
87 "Actual object does not have expected attributes, missing from expected (%s), extra (%s)"
88 % (str(attrs_expected.difference(attrs_extra)), str(attrs_extra.difference(attrs_expected))))
90 def assertAttributesEqual(self, obj_orig, attrs_orig, obj_restored, attrs_rest):
91 self.assertNamesEqual(attrs_orig, attrs_rest)
92 # remove volatile attributes, they can't be equal
93 attrs_orig -= set(["uSNChanged", "dSCorePropagationData", "whenChanged"])
94 for attr in attrs_orig:
95 # convert original attr value to ldif
96 orig_val = obj_orig.get(attr)
99 if not isinstance(orig_val, MessageElement):
100 orig_val = MessageElement(str(orig_val), 0, attr)
103 orig_ldif = self.samdb.write_ldif(m, 0)
104 # convert restored attr value to ldif
105 rest_val = obj_restored.get(attr)
106 self.assertFalse(rest_val is None)
108 if not isinstance(rest_val, MessageElement):
109 rest_val = MessageElement(str(rest_val), 0, attr)
111 rest_ldif = self.samdb.write_ldif(m, 0)
112 # compare generated ldif's
113 self.assertEqual(orig_ldif, rest_ldif)
115 def assertAttributesExists(self, attr_expected, obj_msg):
116 """Check object contains at least expected attrbigutes
117 :param attr_expected: dict of expected attributes with values. ** is any value
118 :param obj_msg: Ldb.Message for the object under test
120 actual_names = set(obj_msg.keys())
121 # Samba does not use 'dSCorePropagationData', so skip it
122 actual_names -= set(['dSCorePropagationData'])
123 expected_names = set(attr_expected.keys())
124 self.assertNamesEqual(expected_names, actual_names)
125 for name in attr_expected.keys():
126 expected_val = attr_expected[name]
127 actual_val = obj_msg.get(name)
128 self.assertFalse(actual_val is None, "No value for attribute '%s'" % name)
129 if expected_val == "**":
130 # "**" values means "any"
132 self.assertEqual(expected_val, str(actual_val),
133 "Unexpected value (%s) for '%s', expected (%s)" % (
134 str(actual_val), name, expected_val))
136 def _check_metadata(self, metadata, expected):
137 repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, str(metadata[0]))
140 for o in repl.ctr.array:
141 repl_array.append((o.attid, o.version))
142 repl_set = set(repl_array)
144 expected_set = set(expected)
145 self.assertEqual(len(repl_set), len(expected),
146 "Unexpected metadata, missing from expected (%s), extra (%s)), repl: \n%s" % (
147 str(expected_set.difference(repl_set)),
148 str(repl_set.difference(expected_set)),
152 for o in repl.ctr.array:
155 self.assertEquals(attid, o.attid,
156 "(LDAP) Wrong attid "
157 "for expected value %d, wanted 0x%08x got 0x%08x, "
159 % (i, attid, o.attid, ndr_print(repl)))
160 # Allow version to be skipped when it does not matter
161 if version is not None:
162 self.assertEquals(o.version, version,
163 "(LDAP) Wrong version for expected value %d, "
165 "wanted %d got %d, repl: \n%s"
167 version, o.version, ndr_print(repl)))
171 def restore_deleted_object(samdb, del_dn, new_dn, new_attrs=None):
172 """Restores a deleted object
173 :param samdb: SamDB connection to SAM
174 :param del_dn: str Deleted object DN
175 :param new_dn: str Where to restore the object
176 :param new_attrs: dict Additional attributes to set
179 msg.dn = Dn(samdb, str(del_dn))
180 msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
181 msg["distinguishedName"] = MessageElement([str(new_dn)], FLAG_MOD_REPLACE, "distinguishedName")
182 if new_attrs is not None:
183 assert isinstance(new_attrs, dict)
184 for attr in new_attrs:
185 msg[attr] = MessageElement(new_attrs[attr], FLAG_MOD_REPLACE, attr)
186 samdb.modify(msg, ["show_deleted:1"])
189 class BaseRestoreObjectTestCase(RestoredObjectAttributesBaseTestCase):
191 super(BaseRestoreObjectTestCase, self).setUp()
193 def enable_recycle_bin(self):
195 msg.dn = Dn(self.samdb, "")
196 msg["enableOptionalFeature"] = MessageElement(
197 "CN=Partitions," + self.configuration_dn + ":766ddcd8-acd0-445e-f3b9-a7f9b6744f2a",
198 FLAG_MOD_ADD, "enableOptionalFeature")
200 self.samdb.modify(msg)
201 except LdbError as e:
203 self.assertEquals(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS)
205 def test_undelete(self):
206 print("Testing standard undelete operation")
207 usr1 = "cn=testuser,cn=users," + self.base_dn
208 samba.tests.delete_force(self.samdb, usr1)
211 "objectclass": "user",
212 "description": "test user description",
213 "samaccountname": "testuser"})
214 objLive1 = self.search_dn(usr1)
215 guid1 = objLive1["objectGUID"][0]
216 self.samdb.delete(usr1)
217 objDeleted1 = self.search_guid(guid1)
218 self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1)
219 objLive2 = self.search_dn(usr1)
220 self.assertEqual(str(objLive2.dn).lower(), str(objLive1.dn).lower())
221 samba.tests.delete_force(self.samdb, usr1)
223 def test_rename(self):
224 print("Testing attempt to rename deleted object")
225 usr1 = "cn=testuser,cn=users," + self.base_dn
228 "objectclass": "user",
229 "description": "test user description",
230 "samaccountname": "testuser"})
231 objLive1 = self.search_dn(usr1)
232 guid1 = objLive1["objectGUID"][0]
233 self.samdb.delete(usr1)
234 objDeleted1 = self.search_guid(guid1)
235 # just to make sure we get the correct error if the show deleted is missing
237 self.samdb.rename(str(objDeleted1.dn), usr1)
239 except LdbError as e1:
241 self.assertEquals(num, ERR_NO_SUCH_OBJECT)
244 self.samdb.rename(str(objDeleted1.dn), usr1, ["show_deleted:1"])
246 except LdbError as e2:
248 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
250 def test_undelete_with_mod(self):
251 print("Testing standard undelete operation with modification of additional attributes")
252 usr1 = "cn=testuser,cn=users," + self.base_dn
255 "objectclass": "user",
256 "description": "test user description",
257 "samaccountname": "testuser"})
258 objLive1 = self.search_dn(usr1)
259 guid1 = objLive1["objectGUID"][0]
260 self.samdb.delete(usr1)
261 objDeleted1 = self.search_guid(guid1)
262 self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1, {"url": "www.samba.org"})
263 objLive2 = self.search_dn(usr1)
264 self.assertEqual(objLive2["url"][0], "www.samba.org")
265 samba.tests.delete_force(self.samdb, usr1)
267 def test_undelete_newuser(self):
268 print("Testing undelete user with a different dn")
269 usr1 = "cn=testuser,cn=users," + self.base_dn
270 usr2 = "cn=testuser2,cn=users," + self.base_dn
271 samba.tests.delete_force(self.samdb, usr1)
274 "objectclass": "user",
275 "description": "test user description",
276 "samaccountname": "testuser"})
277 objLive1 = self.search_dn(usr1)
278 guid1 = objLive1["objectGUID"][0]
279 self.samdb.delete(usr1)
280 objDeleted1 = self.search_guid(guid1)
281 self.restore_deleted_object(self.samdb, objDeleted1.dn, usr2)
282 objLive2 = self.search_dn(usr2)
283 samba.tests.delete_force(self.samdb, usr1)
284 samba.tests.delete_force(self.samdb, usr2)
286 def test_undelete_existing(self):
287 print("Testing undelete user after a user with the same dn has been created")
288 usr1 = "cn=testuser,cn=users," + self.base_dn
291 "objectclass": "user",
292 "description": "test user description",
293 "samaccountname": "testuser"})
294 objLive1 = self.search_dn(usr1)
295 guid1 = objLive1["objectGUID"][0]
296 self.samdb.delete(usr1)
299 "objectclass": "user",
300 "description": "test user description",
301 "samaccountname": "testuser"})
302 objDeleted1 = self.search_guid(guid1)
304 self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1)
306 except LdbError as e3:
308 self.assertEquals(num, ERR_ENTRY_ALREADY_EXISTS)
310 def test_undelete_cross_nc(self):
311 print("Cross NC undelete")
312 c1 = "cn=ldaptestcontainer," + self.base_dn
313 c2 = "cn=ldaptestcontainer2," + self.configuration_dn
314 c3 = "cn=ldaptestcontainer," + self.configuration_dn
315 c4 = "cn=ldaptestcontainer2," + self.base_dn
316 samba.tests.delete_force(self.samdb, c1)
317 samba.tests.delete_force(self.samdb, c2)
318 samba.tests.delete_force(self.samdb, c3)
319 samba.tests.delete_force(self.samdb, c4)
322 "objectclass": "container"})
325 "objectclass": "container"})
326 objLive1 = self.search_dn(c1)
327 objLive2 = self.search_dn(c2)
328 guid1 = objLive1["objectGUID"][0]
329 guid2 = objLive2["objectGUID"][0]
330 self.samdb.delete(c1)
331 self.samdb.delete(c2)
332 objDeleted1 = self.search_guid(guid1)
333 objDeleted2 = self.search_guid(guid2)
334 # try to undelete from base dn to config
336 self.restore_deleted_object(self.samdb, objDeleted1.dn, c3)
338 except LdbError as e4:
340 self.assertEquals(num, ERR_OPERATIONS_ERROR)
341 #try to undelete from config to base dn
343 self.restore_deleted_object(self.samdb, objDeleted2.dn, c4)
345 except LdbError as e5:
347 self.assertEquals(num, ERR_OPERATIONS_ERROR)
348 #assert undeletion will work in same nc
349 self.restore_deleted_object(self.samdb, objDeleted1.dn, c4)
350 self.restore_deleted_object(self.samdb, objDeleted2.dn, c3)
353 class RestoreUserObjectTestCase(RestoredObjectAttributesBaseTestCase):
354 """Test cases for delete/reanimate user objects"""
356 def _expected_user_add_attributes(self, username, user_dn, category):
357 return {'dn': user_dn,
360 'distinguishedName': user_dn,
368 'userAccountControl': '546',
370 'badPasswordTime': '0',
376 'primaryGroupID': '513',
378 'accountExpires': '9223372036854775807',
380 'sAMAccountName': username,
381 'sAMAccountType': '805306368',
382 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn)
385 def _expected_user_add_metadata(self):
387 (DRSUAPI_ATTID_objectClass, 1),
388 (DRSUAPI_ATTID_cn, 1),
389 (DRSUAPI_ATTID_instanceType, 1),
390 (DRSUAPI_ATTID_whenCreated, 1),
391 (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
392 (DRSUAPI_ATTID_name, 1),
393 (DRSUAPI_ATTID_userAccountControl, None),
394 (DRSUAPI_ATTID_codePage, 1),
395 (DRSUAPI_ATTID_countryCode, 1),
396 (DRSUAPI_ATTID_dBCSPwd, 1),
397 (DRSUAPI_ATTID_logonHours, 1),
398 (DRSUAPI_ATTID_unicodePwd, 1),
399 (DRSUAPI_ATTID_ntPwdHistory, 1),
400 (DRSUAPI_ATTID_pwdLastSet, 1),
401 (DRSUAPI_ATTID_primaryGroupID, 1),
402 (DRSUAPI_ATTID_objectSid, 1),
403 (DRSUAPI_ATTID_accountExpires, 1),
404 (DRSUAPI_ATTID_lmPwdHistory, 1),
405 (DRSUAPI_ATTID_sAMAccountName, 1),
406 (DRSUAPI_ATTID_sAMAccountType, 1),
407 (DRSUAPI_ATTID_objectCategory, 1)]
409 def _expected_user_del_attributes(self, username, _guid, _sid):
410 guid = ndr_unpack(misc.GUID, _guid)
411 dn = "CN=%s\\0ADEL:%s,CN=Deleted Objects,%s" % (username, guid, self.base_dn)
412 cn = "%s\nDEL:%s" % (username, guid)
416 'distinguishedName': dn,
418 'isRecycled': 'TRUE',
426 'userAccountControl': '546',
428 'sAMAccountName': username,
429 'lastKnownParent': 'CN=Users,%s' % self.base_dn,
432 def _expected_user_del_metadata(self):
434 (DRSUAPI_ATTID_objectClass, 1),
435 (DRSUAPI_ATTID_cn, 2),
436 (DRSUAPI_ATTID_instanceType, 1),
437 (DRSUAPI_ATTID_whenCreated, 1),
438 (DRSUAPI_ATTID_isDeleted, 1),
439 (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
440 (DRSUAPI_ATTID_name, 2),
441 (DRSUAPI_ATTID_userAccountControl, None),
442 (DRSUAPI_ATTID_codePage, 2),
443 (DRSUAPI_ATTID_countryCode, 2),
444 (DRSUAPI_ATTID_dBCSPwd, 1),
445 (DRSUAPI_ATTID_logonHours, 1),
446 (DRSUAPI_ATTID_unicodePwd, 1),
447 (DRSUAPI_ATTID_ntPwdHistory, 1),
448 (DRSUAPI_ATTID_pwdLastSet, 2),
449 (DRSUAPI_ATTID_primaryGroupID, 2),
450 (DRSUAPI_ATTID_objectSid, 1),
451 (DRSUAPI_ATTID_accountExpires, 2),
452 (DRSUAPI_ATTID_lmPwdHistory, 1),
453 (DRSUAPI_ATTID_sAMAccountName, 1),
454 (DRSUAPI_ATTID_sAMAccountType, 2),
455 (DRSUAPI_ATTID_lastKnownParent, 1),
456 (DRSUAPI_ATTID_objectCategory, 2),
457 (DRSUAPI_ATTID_isRecycled, 1)]
459 def _expected_user_restore_attributes(self, username, guid, sid, user_dn, category):
460 return {'dn': user_dn,
463 'distinguishedName': user_dn,
471 'userAccountControl': '546',
473 'badPasswordTime': '0',
479 'primaryGroupID': '513',
480 'operatorCount': '0',
483 'accountExpires': '0',
485 'sAMAccountName': username,
486 'sAMAccountType': '805306368',
487 'lastKnownParent': 'CN=Users,%s' % self.base_dn,
488 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn)
491 def _expected_user_restore_metadata(self):
493 (DRSUAPI_ATTID_objectClass, 1),
494 (DRSUAPI_ATTID_cn, 3),
495 (DRSUAPI_ATTID_instanceType, 1),
496 (DRSUAPI_ATTID_whenCreated, 1),
497 (DRSUAPI_ATTID_isDeleted, 2),
498 (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
499 (DRSUAPI_ATTID_name, 3),
500 (DRSUAPI_ATTID_userAccountControl, None),
501 (DRSUAPI_ATTID_codePage, 3),
502 (DRSUAPI_ATTID_countryCode, 3),
503 (DRSUAPI_ATTID_dBCSPwd, 1),
504 (DRSUAPI_ATTID_logonHours, 1),
505 (DRSUAPI_ATTID_unicodePwd, 1),
506 (DRSUAPI_ATTID_ntPwdHistory, 1),
507 (DRSUAPI_ATTID_pwdLastSet, 3),
508 (DRSUAPI_ATTID_primaryGroupID, 3),
509 (DRSUAPI_ATTID_operatorCount, 1),
510 (DRSUAPI_ATTID_objectSid, 1),
511 (DRSUAPI_ATTID_adminCount, 1),
512 (DRSUAPI_ATTID_accountExpires, 3),
513 (DRSUAPI_ATTID_lmPwdHistory, 1),
514 (DRSUAPI_ATTID_sAMAccountName, 1),
515 (DRSUAPI_ATTID_sAMAccountType, 3),
516 (DRSUAPI_ATTID_lastKnownParent, 1),
517 (DRSUAPI_ATTID_objectCategory, 3),
518 (DRSUAPI_ATTID_isRecycled, 2)]
520 def test_restore_user(self):
521 print("Test restored user attributes")
522 username = "restore_user"
523 usr_dn = "CN=%s,CN=Users,%s" % (username, self.base_dn)
524 samba.tests.delete_force(self.samdb, usr_dn)
527 "objectClass": "user",
528 "sAMAccountName": username})
529 obj = self.search_dn(usr_dn)
530 guid = obj["objectGUID"][0]
531 sid = obj["objectSID"][0]
532 obj_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
533 self.assertAttributesExists(self._expected_user_add_attributes(username, usr_dn, "Person"), obj)
534 self._check_metadata(obj_rmd["replPropertyMetaData"],
535 self._expected_user_add_metadata())
536 self.samdb.delete(usr_dn)
537 obj_del = self.search_guid(guid)
538 obj_del_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
539 orig_attrs = set(obj.keys())
540 del_attrs = set(obj_del.keys())
541 self.assertAttributesExists(self._expected_user_del_attributes(username, guid, sid), obj_del)
542 self._check_metadata(obj_del_rmd["replPropertyMetaData"],
543 self._expected_user_del_metadata())
544 # restore the user and fetch what's restored
545 self.restore_deleted_object(self.samdb, obj_del.dn, usr_dn)
546 obj_restore = self.search_guid(guid)
547 obj_restore_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
548 # check original attributes and restored one are same
549 orig_attrs = set(obj.keys())
550 # windows restore more attributes that originally we have
551 orig_attrs.update(['adminCount', 'operatorCount', 'lastKnownParent'])
552 rest_attrs = set(obj_restore.keys())
553 self.assertAttributesExists(self._expected_user_restore_attributes(username, guid, sid, usr_dn, "Person"), obj_restore)
554 self._check_metadata(obj_restore_rmd["replPropertyMetaData"],
555 self._expected_user_restore_metadata())
557 class RestoreUserPwdObjectTestCase(RestoredObjectAttributesBaseTestCase):
558 """Test cases for delete/reanimate user objects with password"""
560 def _expected_userpw_add_attributes(self, username, user_dn, category):
561 return {'dn': user_dn,
564 'distinguishedName': user_dn,
572 'userAccountControl': '546',
574 'badPasswordTime': '0',
580 'primaryGroupID': '513',
582 'accountExpires': '9223372036854775807',
584 'sAMAccountName': username,
585 'sAMAccountType': '805306368',
586 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn)
589 def _expected_userpw_add_metadata(self):
591 (DRSUAPI_ATTID_objectClass, 1),
592 (DRSUAPI_ATTID_cn, 1),
593 (DRSUAPI_ATTID_instanceType, 1),
594 (DRSUAPI_ATTID_whenCreated, 1),
595 (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
596 (DRSUAPI_ATTID_name, 1),
597 (DRSUAPI_ATTID_userAccountControl, None),
598 (DRSUAPI_ATTID_codePage, 1),
599 (DRSUAPI_ATTID_countryCode, 1),
600 (DRSUAPI_ATTID_dBCSPwd, 1),
601 (DRSUAPI_ATTID_logonHours, 1),
602 (DRSUAPI_ATTID_unicodePwd, 1),
603 (DRSUAPI_ATTID_ntPwdHistory, 1),
604 (DRSUAPI_ATTID_pwdLastSet, 1),
605 (DRSUAPI_ATTID_primaryGroupID, 1),
606 (DRSUAPI_ATTID_supplementalCredentials, 1),
607 (DRSUAPI_ATTID_objectSid, 1),
608 (DRSUAPI_ATTID_accountExpires, 1),
609 (DRSUAPI_ATTID_lmPwdHistory, 1),
610 (DRSUAPI_ATTID_sAMAccountName, 1),
611 (DRSUAPI_ATTID_sAMAccountType, 1),
612 (DRSUAPI_ATTID_objectCategory, 1)]
614 def _expected_userpw_del_attributes(self, username, _guid, _sid):
615 guid = ndr_unpack(misc.GUID, _guid)
616 dn = "CN=%s\\0ADEL:%s,CN=Deleted Objects,%s" % (username, guid, self.base_dn)
617 cn = "%s\nDEL:%s" % (username, guid)
621 'distinguishedName': dn,
623 'isRecycled': 'TRUE',
631 'userAccountControl': '546',
633 'sAMAccountName': username,
634 'lastKnownParent': 'CN=Users,%s' % self.base_dn,
637 def _expected_userpw_del_metadata(self):
639 (DRSUAPI_ATTID_objectClass, 1),
640 (DRSUAPI_ATTID_cn, 2),
641 (DRSUAPI_ATTID_instanceType, 1),
642 (DRSUAPI_ATTID_whenCreated, 1),
643 (DRSUAPI_ATTID_isDeleted, 1),
644 (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
645 (DRSUAPI_ATTID_name, 2),
646 (DRSUAPI_ATTID_userAccountControl, None),
647 (DRSUAPI_ATTID_codePage, 2),
648 (DRSUAPI_ATTID_countryCode, 2),
649 (DRSUAPI_ATTID_dBCSPwd, 1),
650 (DRSUAPI_ATTID_logonHours, 1),
651 (DRSUAPI_ATTID_unicodePwd, 2),
652 (DRSUAPI_ATTID_ntPwdHistory, 2),
653 (DRSUAPI_ATTID_pwdLastSet, 2),
654 (DRSUAPI_ATTID_primaryGroupID, 2),
655 (DRSUAPI_ATTID_supplementalCredentials, 2),
656 (DRSUAPI_ATTID_objectSid, 1),
657 (DRSUAPI_ATTID_accountExpires, 2),
658 (DRSUAPI_ATTID_lmPwdHistory, 2),
659 (DRSUAPI_ATTID_sAMAccountName, 1),
660 (DRSUAPI_ATTID_sAMAccountType, 2),
661 (DRSUAPI_ATTID_lastKnownParent, 1),
662 (DRSUAPI_ATTID_objectCategory, 2),
663 (DRSUAPI_ATTID_isRecycled, 1)]
665 def _expected_userpw_restore_attributes(self, username, guid, sid, user_dn, category):
666 return {'dn': user_dn,
669 'distinguishedName': user_dn,
677 'userAccountControl': '546',
679 'badPasswordTime': '0',
685 'primaryGroupID': '513',
686 'operatorCount': '0',
689 'accountExpires': '0',
691 'sAMAccountName': username,
692 'sAMAccountType': '805306368',
693 'lastKnownParent': 'CN=Users,%s' % self.base_dn,
694 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn)
697 def _expected_userpw_restore_metadata(self):
699 (DRSUAPI_ATTID_objectClass, 1),
700 (DRSUAPI_ATTID_cn, 3),
701 (DRSUAPI_ATTID_instanceType, 1),
702 (DRSUAPI_ATTID_whenCreated, 1),
703 (DRSUAPI_ATTID_isDeleted, 2),
704 (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
705 (DRSUAPI_ATTID_name, 3),
706 (DRSUAPI_ATTID_userAccountControl, None),
707 (DRSUAPI_ATTID_codePage, 3),
708 (DRSUAPI_ATTID_countryCode, 3),
709 (DRSUAPI_ATTID_dBCSPwd, 2),
710 (DRSUAPI_ATTID_logonHours, 1),
711 (DRSUAPI_ATTID_unicodePwd, 3),
712 (DRSUAPI_ATTID_ntPwdHistory, 3),
713 (DRSUAPI_ATTID_pwdLastSet, 4),
714 (DRSUAPI_ATTID_primaryGroupID, 3),
715 (DRSUAPI_ATTID_supplementalCredentials, 3),
716 (DRSUAPI_ATTID_operatorCount, 1),
717 (DRSUAPI_ATTID_objectSid, 1),
718 (DRSUAPI_ATTID_adminCount, 1),
719 (DRSUAPI_ATTID_accountExpires, 3),
720 (DRSUAPI_ATTID_lmPwdHistory, 3),
721 (DRSUAPI_ATTID_sAMAccountName, 1),
722 (DRSUAPI_ATTID_sAMAccountType, 3),
723 (DRSUAPI_ATTID_lastKnownParent, 1),
724 (DRSUAPI_ATTID_objectCategory, 3),
725 (DRSUAPI_ATTID_isRecycled, 2)]
727 def test_restorepw_user(self):
728 print("Test restored user attributes")
729 username = "restorepw_user"
730 usr_dn = "CN=%s,CN=Users,%s" % (username, self.base_dn)
731 samba.tests.delete_force(self.samdb, usr_dn)
734 "objectClass": "user",
735 "userPassword": "thatsAcomplPASS0",
736 "sAMAccountName": username})
737 obj = self.search_dn(usr_dn)
738 guid = obj["objectGUID"][0]
739 sid = obj["objectSID"][0]
740 obj_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
741 self.assertAttributesExists(self._expected_userpw_add_attributes(username, usr_dn, "Person"), obj)
742 self._check_metadata(obj_rmd["replPropertyMetaData"],
743 self._expected_userpw_add_metadata())
744 self.samdb.delete(usr_dn)
745 obj_del = self.search_guid(guid)
746 obj_del_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
747 orig_attrs = set(obj.keys())
748 del_attrs = set(obj_del.keys())
749 self.assertAttributesExists(self._expected_userpw_del_attributes(username, guid, sid), obj_del)
750 self._check_metadata(obj_del_rmd["replPropertyMetaData"],
751 self._expected_userpw_del_metadata())
752 # restore the user and fetch what's restored
753 self.restore_deleted_object(self.samdb, obj_del.dn, usr_dn, {"userPassword": ["thatsAcomplPASS1"]})
754 obj_restore = self.search_guid(guid)
755 obj_restore_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
756 # check original attributes and restored one are same
757 orig_attrs = set(obj.keys())
758 # windows restore more attributes that originally we have
759 orig_attrs.update(['adminCount', 'operatorCount', 'lastKnownParent'])
760 rest_attrs = set(obj_restore.keys())
761 self.assertAttributesExists(self._expected_userpw_restore_attributes(username, guid, sid, usr_dn, "Person"), obj_restore)
762 self._check_metadata(obj_restore_rmd["replPropertyMetaData"],
763 self._expected_userpw_restore_metadata())
765 class RestoreGroupObjectTestCase(RestoredObjectAttributesBaseTestCase):
766 """Test different scenarios for delete/reanimate group objects"""
768 def _make_object_dn(self, name):
769 return "CN=%s,CN=Users,%s" % (name, self.base_dn)
771 def _create_test_user(self, user_name):
772 user_dn = self._make_object_dn(user_name)
775 "objectClass": "user",
776 "sAMAccountName": user_name,
778 # delete an object if leftover from previous test
779 samba.tests.delete_force(self.samdb, user_dn)
780 # finally, create the group
782 return self.search_dn(user_dn)
784 def _create_test_group(self, group_name, members=None):
785 group_dn = self._make_object_dn(group_name)
788 "objectClass": "group",
789 "sAMAccountName": group_name,
792 ldif["member"] = [str(usr_dn) for usr_dn in members]
795 # delete an object if leftover from previous test
796 samba.tests.delete_force(self.samdb, group_dn)
797 # finally, create the group
799 return self.search_dn(group_dn)
801 def _expected_group_attributes(self, groupname, group_dn, category):
802 return {'dn': group_dn,
803 'groupType': '-2147483646',
804 'distinguishedName': group_dn,
805 'sAMAccountName': groupname,
807 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn),
810 'lastKnownParent': 'CN=Users,%s' % self.base_dn,
812 'sAMAccountType': '268435456',
816 'operatorCount': '0',
822 def test_plain_group(self):
823 print("Test restored Group attributes")
825 obj = self._create_test_group("r_group")
826 guid = obj["objectGUID"][0]
828 self.samdb.delete(str(obj.dn))
829 obj_del = self.search_guid(guid)
830 # restore the Group and fetch what's restored
831 self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
832 obj_restore = self.search_guid(guid)
833 # check original attributes and restored one are same
834 attr_orig = set(obj.keys())
835 # windows restore more attributes that originally we have
836 attr_orig.update(['adminCount', 'operatorCount', 'lastKnownParent'])
837 attr_rest = set(obj_restore.keys())
838 self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
839 self.assertAttributesExists(self._expected_group_attributes("r_group", str(obj.dn), "Group"), obj_restore)
841 def test_group_with_members(self):
842 print("Test restored Group with members attributes")
844 usr1 = self._create_test_user("r_user_1")
845 usr2 = self._create_test_user("r_user_2")
846 obj = self._create_test_group("r_group", [usr1.dn, usr2.dn])
847 guid = obj["objectGUID"][0]
849 self.samdb.delete(str(obj.dn))
850 obj_del = self.search_guid(guid)
851 # restore the Group and fetch what's restored
852 self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
853 obj_restore = self.search_guid(guid)
854 # check original attributes and restored one are same
855 attr_orig = set(obj.keys())
856 # windows restore more attributes that originally we have
857 attr_orig.update(['adminCount', 'operatorCount', 'lastKnownParent'])
858 # and does not restore following attributes
859 attr_orig.remove("member")
860 attr_rest = set(obj_restore.keys())
861 self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
862 self.assertAttributesExists(self._expected_group_attributes("r_group", str(obj.dn), "Group"), obj_restore)
865 class RestoreContainerObjectTestCase(RestoredObjectAttributesBaseTestCase):
866 """Test different scenarios for delete/reanimate OU/container objects"""
868 def _expected_container_attributes(self, rdn, name, dn, category):
870 lastKnownParent = '%s' % self.base_dn
872 lastKnownParent = 'CN=Users,%s' % self.base_dn
874 'distinguishedName': dn,
876 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn),
879 'lastKnownParent': lastKnownParent,
887 def _create_test_ou(self, rdn, name=None, description=None):
888 ou_dn = "OU=%s,%s" % (rdn, self.base_dn)
889 # delete an object if leftover from previous test
890 samba.tests.delete_force(self.samdb, ou_dn)
891 # create ou and return created object
892 self.samdb.create_ou(ou_dn, name=name, description=description)
893 return self.search_dn(ou_dn)
895 def test_ou_with_name_description(self):
896 print("Test OU reanimation")
897 # create OU to test with
898 obj = self._create_test_ou(rdn="r_ou",
900 description="r_ou description")
901 guid = obj["objectGUID"][0]
903 self.samdb.delete(str(obj.dn))
904 obj_del = self.search_guid(guid)
905 # restore the Object and fetch what's restored
906 self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
907 obj_restore = self.search_guid(guid)
908 # check original attributes and restored one are same
909 attr_orig = set(obj.keys())
910 attr_rest = set(obj_restore.keys())
911 # windows restore more attributes that originally we have
912 attr_orig.update(["lastKnownParent"])
913 # and does not restore following attributes
914 attr_orig -= set(["description"])
915 self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
916 expected_attrs = self._expected_container_attributes("OU", "r_ou", str(obj.dn), "Organizational-Unit")
917 self.assertAttributesExists(expected_attrs, obj_restore)
919 def test_container(self):
920 print("Test Container reanimation")
921 # create test Container
922 obj = self._create_object({
923 "dn": "CN=r_container,CN=Users,%s" % self.base_dn,
924 "objectClass": "container"
926 guid = obj["objectGUID"][0]
928 self.samdb.delete(str(obj.dn))
929 obj_del = self.search_guid(guid)
930 # restore the Object and fetch what's restored
931 self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
932 obj_restore = self.search_guid(guid)
933 # check original attributes and restored one are same
934 attr_orig = set(obj.keys())
935 attr_rest = set(obj_restore.keys())
936 # windows restore more attributes that originally we have
937 attr_orig.update(["lastKnownParent"])
938 # and does not restore following attributes
939 attr_orig -= set(["showInAdvancedViewOnly"])
940 self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
941 expected_attrs = self._expected_container_attributes("CN", "r_container",
942 str(obj.dn), "Container")
943 self.assertAttributesExists(expected_attrs, obj_restore)
946 if __name__ == '__main__':