s4:dsdb/tests: improve the RestoreUserObjectTestCase test
authorStefan Metzmacher <metze@samba.org>
Fri, 8 Jul 2016 13:26:18 +0000 (15:26 +0200)
committerStefan Metzmacher <metze@samba.org>
Sat, 9 Jul 2016 13:06:19 +0000 (15:06 +0200)
We verify attributes, values and their replication metadata after
each step (add, delete, reanimate).

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
source4/dsdb/tests/python/tombstone_reanimation.py

index 3fcdca3854def116037be335ecfa6463eaafdc3c..cabde6ec357e9b25061f0d1648cede0b95372ecf 100755 (executable)
@@ -24,6 +24,12 @@ import unittest
 sys.path.insert(0, "bin/python")
 import samba
 
+from samba.ndr import ndr_unpack, ndr_print
+from samba.dcerpc import misc
+from samba.dcerpc import security
+from samba.dcerpc import drsblobs
+from samba.dcerpc.drsuapi import *
+
 import samba.tests
 from ldb import (SCOPE_BASE, FLAG_MOD_ADD, FLAG_MOD_DELETE, FLAG_MOD_REPLACE, Dn, Message,
                  MessageElement, LdbError,
@@ -61,9 +67,10 @@ class RestoredObjectAttributesBaseTestCase(samba.tests.TestCase):
     def GUID_string(self, guid):
         return self.samdb.schema_format_value("objectGUID", guid)
 
-    def search_guid(self, guid):
+    def search_guid(self, guid, attrs=["*"]):
         res = self.samdb.search(base="<GUID=%s>" % self.GUID_string(guid),
-                                scope=SCOPE_BASE, controls=["show_deleted:1"])
+                                scope=SCOPE_BASE, attrs=attrs,
+                                controls=["show_deleted:1"])
         self.assertEquals(len(res), 1)
         return res[0]
 
@@ -133,6 +140,39 @@ class RestoredObjectAttributesBaseTestCase(samba.tests.TestCase):
                              "Unexpected value (%s) for '%s', expected (%s)" % (
                              str(actual_val), name, expected_val))
 
+    def _check_metadata(self, metadata, expected):
+        repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, str(metadata[0]))
+
+        repl_array = []
+        for o in repl.ctr.array:
+            repl_array.append((o.attid, o.version))
+        repl_set = set(repl_array)
+
+        expected_set = set(expected)
+        self.assertEqual(len(repl_set), len(expected),
+                         "Unexpected metadata, missing from expected (%s), extra (%s)), repl: \n%s" % (
+                         str(expected_set.difference(repl_set)),
+                         str(repl_set.difference(expected_set)),
+                         ndr_print(repl)))
+
+        i = 0
+        for o in repl.ctr.array:
+            e = expected[i]
+            (attid, version) = e
+            self.assertEquals(attid, o.attid,
+                              "(LDAP) Wrong attid "
+                              "for expected value %d, wanted 0x%08x got 0x%08x, "
+                              "repl: \n%s"
+                              % (i, attid, o.attid, ndr_print(repl)))
+            # Allow version to be skipped when it does not matter
+            if version is not None:
+                self.assertEquals(o.version, version,
+                                  "(LDAP) Wrong version for expected value %d, "
+                                  "attid 0x%08x, "
+                                  "wanted %d got %d, repl: \n%s"
+                                  % (i, o.attid,
+                                     version, o.version, ndr_print(repl)))
+            i = i + 1
 
     @staticmethod
     def restore_deleted_object(samdb, del_dn, new_dn, new_attrs=None):
@@ -314,7 +354,7 @@ class BaseRestoreObjectTestCase(RestoredObjectAttributesBaseTestCase):
 class RestoreUserObjectTestCase(RestoredObjectAttributesBaseTestCase):
     """Test cases for delete/reanimate user objects"""
 
-    def _expected_user_attributes(self, username, user_dn, category):
+    def _expected_user_add_attributes(self, username, user_dn, category):
         return {'dn': user_dn,
                 'objectClass': '**',
                 'cn': username,
@@ -335,17 +375,149 @@ class RestoreUserObjectTestCase(RestoredObjectAttributesBaseTestCase):
                 'lastLogoff': '0',
                 'pwdLastSet': '0',
                 'primaryGroupID': '513',
-                'operatorCount': '0',
                 'objectSid': '**',
-                'adminCount': '0',
                 'accountExpires': '9223372036854775807',
                 'logonCount': '0',
                 'sAMAccountName': username,
                 'sAMAccountType': '805306368',
+                'objectCategory': 'CN=%s,%s' % (category, self.schema_dn)
+                }
+
+    def _expected_user_add_metadata(self):
+        return [
+            (DRSUAPI_ATTID_objectClass, 1),
+            (DRSUAPI_ATTID_cn, 1),
+            (DRSUAPI_ATTID_instanceType, 1),
+            (DRSUAPI_ATTID_whenCreated, 1),
+            (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
+            (DRSUAPI_ATTID_name, 1),
+            (DRSUAPI_ATTID_userAccountControl, None),
+            (DRSUAPI_ATTID_codePage, 1),
+            (DRSUAPI_ATTID_countryCode, 1),
+            (DRSUAPI_ATTID_dBCSPwd, 1),
+            (DRSUAPI_ATTID_logonHours, 1),
+            (DRSUAPI_ATTID_unicodePwd, 1),
+            (DRSUAPI_ATTID_ntPwdHistory, 1),
+            (DRSUAPI_ATTID_pwdLastSet, 1),
+            (DRSUAPI_ATTID_primaryGroupID, 1),
+            (DRSUAPI_ATTID_objectSid, 1),
+            (DRSUAPI_ATTID_accountExpires, 1),
+            (DRSUAPI_ATTID_lmPwdHistory, 1),
+            (DRSUAPI_ATTID_sAMAccountName, 1),
+            (DRSUAPI_ATTID_sAMAccountType, 1),
+            (DRSUAPI_ATTID_objectCategory, 1)]
+
+    def _expected_user_del_attributes(self, username, _guid, _sid):
+        guid = ndr_unpack(misc.GUID, _guid)
+        dn = "CN=%s\\0ADEL:%s,CN=Deleted Objects,%s" % (username, guid, self.base_dn)
+        cn = "%s\nDEL:%s" % (username, guid)
+        return {'dn': dn,
+                'objectClass': '**',
+                'cn': cn,
+                'distinguishedName': dn,
+                'isDeleted': 'TRUE',
+                'isRecycled': 'TRUE',
+                'instanceType': '4',
+                'whenCreated': '**',
+                'whenChanged': '**',
+                'uSNCreated': '**',
+                'uSNChanged': '**',
+                'name': cn,
+                'objectGUID': _guid,
+                'userAccountControl': '546',
+                'objectSid': _sid,
+                'sAMAccountName': username,
+                'lastKnownParent': 'CN=Users,%s' % self.base_dn,
+                }
+
+    def _expected_user_del_metadata(self):
+        return [
+            (DRSUAPI_ATTID_objectClass, 1),
+            (DRSUAPI_ATTID_cn, 2),
+            (DRSUAPI_ATTID_instanceType, 1),
+            (DRSUAPI_ATTID_whenCreated, 1),
+            (DRSUAPI_ATTID_isDeleted, 1),
+            (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
+            (DRSUAPI_ATTID_name, 2),
+            (DRSUAPI_ATTID_userAccountControl, None),
+            (DRSUAPI_ATTID_codePage, 2),
+            (DRSUAPI_ATTID_countryCode, 2),
+            (DRSUAPI_ATTID_dBCSPwd, 1),
+            (DRSUAPI_ATTID_logonHours, 1),
+            (DRSUAPI_ATTID_unicodePwd, 1),
+            (DRSUAPI_ATTID_ntPwdHistory, 1),
+            (DRSUAPI_ATTID_pwdLastSet, 2),
+            (DRSUAPI_ATTID_primaryGroupID, 2),
+            (DRSUAPI_ATTID_objectSid, 1),
+            (DRSUAPI_ATTID_accountExpires, 2),
+            (DRSUAPI_ATTID_lmPwdHistory, 1),
+            (DRSUAPI_ATTID_sAMAccountName, 1),
+            (DRSUAPI_ATTID_sAMAccountType, 2),
+            (DRSUAPI_ATTID_lastKnownParent, 1),
+            (DRSUAPI_ATTID_objectCategory, 2),
+            (DRSUAPI_ATTID_isRecycled, 1)]
+
+    def _expected_user_restore_attributes(self, username, guid, sid, user_dn, category):
+        return {'dn': user_dn,
+                'objectClass': '**',
+                'cn': username,
+                'distinguishedName': user_dn,
+                'instanceType': '4',
+                'whenCreated': '**',
+                'whenChanged': '**',
+                'uSNCreated': '**',
+                'uSNChanged': '**',
+                'name': username,
+                'objectGUID': guid,
+                'userAccountControl': '546',
+                'badPwdCount': '0',
+                'badPasswordTime': '0',
+                'codePage': '0',
+                'countryCode': '0',
+                'lastLogon': '0',
+                'lastLogoff': '0',
+                'pwdLastSet': '0',
+                'primaryGroupID': '513',
+                'operatorCount': '0',
+                'objectSid': sid,
+                'adminCount': '0',
+                'accountExpires': '0',
+                'logonCount': '0',
+                'sAMAccountName': username,
+                'sAMAccountType': '805306368',
                 'lastKnownParent': 'CN=Users,%s' % self.base_dn,
                 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn)
                 }
 
+    def _expected_user_restore_metadata(self):
+        return [
+            (DRSUAPI_ATTID_objectClass, 1),
+            (DRSUAPI_ATTID_cn, 3),
+            (DRSUAPI_ATTID_instanceType, 1),
+            (DRSUAPI_ATTID_whenCreated, 1),
+            (DRSUAPI_ATTID_isDeleted, 2),
+            (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
+            (DRSUAPI_ATTID_name, 3),
+            (DRSUAPI_ATTID_userAccountControl, None),
+            (DRSUAPI_ATTID_codePage, 3),
+            (DRSUAPI_ATTID_countryCode, 3),
+            (DRSUAPI_ATTID_dBCSPwd, 1),
+            (DRSUAPI_ATTID_logonHours, 1),
+            (DRSUAPI_ATTID_unicodePwd, 1),
+            (DRSUAPI_ATTID_ntPwdHistory, 1),
+            (DRSUAPI_ATTID_pwdLastSet, 3),
+            (DRSUAPI_ATTID_primaryGroupID, 3),
+            (DRSUAPI_ATTID_operatorCount, 1),
+            (DRSUAPI_ATTID_objectSid, 1),
+            (DRSUAPI_ATTID_adminCount, 1),
+            (DRSUAPI_ATTID_accountExpires, 3),
+            (DRSUAPI_ATTID_lmPwdHistory, 1),
+            (DRSUAPI_ATTID_sAMAccountName, 1),
+            (DRSUAPI_ATTID_sAMAccountType, 3),
+            (DRSUAPI_ATTID_lastKnownParent, 1),
+            (DRSUAPI_ATTID_objectCategory, 3),
+            (DRSUAPI_ATTID_isRecycled, 2)]
+
     def test_restore_user(self):
         print "Test restored user attributes"
         username = "restore_user"
@@ -357,18 +529,31 @@ class RestoreUserObjectTestCase(RestoredObjectAttributesBaseTestCase):
             "sAMAccountName": username})
         obj = self.search_dn(usr_dn)
         guid = obj["objectGUID"][0]
+        sid = obj["objectSID"][0]
+        obj_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
+        self.assertAttributesExists(self._expected_user_add_attributes(username, usr_dn, "Person"), obj)
+        self._check_metadata(obj_rmd["replPropertyMetaData"],
+                             self._expected_user_add_metadata())
         self.samdb.delete(usr_dn)
         obj_del = self.search_guid(guid)
+        obj_del_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
+        orig_attrs = set(obj.keys())
+        del_attrs = set(obj_del.keys())
+        self.assertAttributesExists(self._expected_user_del_attributes(username, guid, sid), obj_del)
+        self._check_metadata(obj_del_rmd["replPropertyMetaData"],
+                             self._expected_user_del_metadata())
         # restore the user and fetch what's restored
         self.restore_deleted_object(self.samdb, obj_del.dn, usr_dn)
         obj_restore = self.search_guid(guid)
+        obj_restore_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
         # check original attributes and restored one are same
         orig_attrs = set(obj.keys())
         # windows restore more attributes that originally we have
         orig_attrs.update(['adminCount', 'operatorCount', 'lastKnownParent'])
         rest_attrs = set(obj_restore.keys())
-        self.assertAttributesEqual(obj, orig_attrs, obj_restore, rest_attrs)
-        self.assertAttributesExists(self._expected_user_attributes(username, usr_dn, "Person"), obj_restore)
+        self.assertAttributesExists(self._expected_user_restore_attributes(username, guid, sid, usr_dn, "Person"), obj_restore)
+        self._check_metadata(obj_restore_rmd["replPropertyMetaData"],
+                             self._expected_user_restore_metadata())
 
 
 class RestoreGroupObjectTestCase(RestoredObjectAttributesBaseTestCase):