tests/krb5: Allow replicating accounts to the RODC
authorJoseph Sutton <josephsutton@catalyst.net.nz>
Mon, 13 Sep 2021 10:13:24 +0000 (22:13 +1200)
committerAndrew Bartlett <abartlet@samba.org>
Wed, 15 Sep 2021 07:59:31 +0000 (07:59 +0000)
BUG: https://bugzilla.samba.org/show_bug.cgi?id=14642

Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
python/samba/tests/krb5/kdc_base_test.py

index 3681d26bb83f29504d4bddc78b2d8e2ef5c56cc5..56102cfc4ead62d26ce3e6aeec239e5f8f0607b4 100644 (file)
@@ -31,8 +31,9 @@ from samba import generate_random_password
 from samba.auth import system_session
 from samba.credentials import Credentials, SPECIFIED, MUST_USE_KERBEROS
 from samba.dcerpc import drsblobs, drsuapi, misc, krb5pac, krb5ccache, security
-from samba.drs_utils import drsuapi_connect
+from samba.drs_utils import drs_Replicate, drsuapi_connect
 from samba.dsdb import (
+    DSDB_SYNTAX_BINARY_DN,
     DS_DOMAIN_FUNCTION_2000,
     DS_DOMAIN_FUNCTION_2008,
     DS_GUID_COMPUTERS_CONTAINER,
@@ -45,7 +46,7 @@ from samba.dsdb import (
 )
 from samba.ndr import ndr_pack, ndr_unpack
 from samba import net
-from samba.samdb import SamDB
+from samba.samdb import SamDB, dsdb_Dn
 
 from samba.tests import delete_force
 import samba.tests.krb5.kcrypto as kcrypto
@@ -104,12 +105,20 @@ class KDCBaseTest(RawKerberosTest):
 
         cls.account_cache = {}
 
+        cls.ldb_cleanups = []
+
     @classmethod
     def tearDownClass(cls):
         # Clean up any accounts created by create_account. This is
         # done in tearDownClass() rather than tearDown(), so that
         # accounts need only be created once for permutation tests.
         if cls._ldb is not None:
+            for cleanup in reversed(cls.ldb_cleanups):
+                try:
+                    cls._ldb.modify(cleanup)
+                except ldb.LdbError:
+                    pass
+
             for dn in cls.accounts:
                 delete_force(cls._ldb, dn)
         super().tearDownClass()
@@ -250,6 +259,76 @@ class KDCBaseTest(RawKerberosTest):
 
         return (creds, dn)
 
+    def replicate_account_to_rodc(self, dn):
+        samdb = self.get_samdb()
+        rodc_samdb = self.get_rodc_samdb()
+
+        repl_val = f'{samdb.get_dsServiceName()}:{dn}:SECRETS_ONLY'
+
+        msg = ldb.Message()
+        msg.dn = ldb.Dn(rodc_samdb, '')
+        msg['replicateSingleObject'] = ldb.MessageElement(
+            repl_val,
+            ldb.FLAG_MOD_REPLACE,
+            'replicateSingleObject')
+
+        try:
+            # Try replication using the replicateSingleObject rootDSE
+            # operation.
+            rodc_samdb.modify(msg)
+        except ldb.LdbError as err:
+            enum, estr = err.args
+            self.assertEqual(enum, ldb.ERR_UNWILLING_TO_PERFORM)
+            self.assertIn('rootdse_modify: unknown attribute to change!',
+                          estr)
+
+            # If that method wasn't supported, we may be in the rodc:local test
+            # environment, where we can try replicating to the local database.
+
+            lp = self.get_lp()
+
+            rodc_creds = Credentials()
+            rodc_creds.guess(lp)
+            rodc_creds.set_machine_account(lp)
+
+            local_samdb = SamDB(url=None, session_info=system_session(),
+                                credentials=rodc_creds, lp=lp)
+
+            destination_dsa_guid = misc.GUID(local_samdb.get_ntds_GUID())
+
+            repl = drs_Replicate(f'ncacn_ip_tcp:{self.dc_host}[seal]',
+                                 lp, rodc_creds,
+                                 local_samdb, destination_dsa_guid)
+
+            source_dsa_invocation_id = misc.GUID(samdb.invocation_id)
+
+            repl.replicate(dn,
+                           source_dsa_invocation_id,
+                           destination_dsa_guid,
+                           exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+                           rodc=True)
+
+    def check_revealed(self, dn, rodc_dn, revealed=True):
+        samdb = self.get_samdb()
+
+        res = samdb.search(base=rodc_dn,
+                           scope=ldb.SCOPE_BASE,
+                           attrs=['msDS-RevealedUsers'])
+
+        revealed_users = res[0].get('msDS-RevealedUsers')
+        if revealed_users is None:
+            self.assertFalse(revealed)
+            return
+
+        revealed_dns = set(str(dsdb_Dn(samdb, str(user),
+                                       syntax_oid=DSDB_SYNTAX_BINARY_DN).dn)
+                           for user in revealed_users)
+
+        if revealed:
+            self.assertIn(str(dn), revealed_dns)
+        else:
+            self.assertNotIn(str(dn), revealed_dns)
+
     def get_secrets(self, samdb, dn,
                     destination_dsa_guid,
                     source_dsa_invocation_id):
@@ -375,6 +454,29 @@ class KDCBaseTest(RawKerberosTest):
         creds.set_tgs_supported_enctypes(supported_enctypes)
         creds.set_ap_supported_enctypes(supported_enctypes)
 
+    def add_to_group(self, account_dn, group_dn, group_attr):
+        samdb = self.get_samdb()
+
+        res = samdb.search(base=group_dn,
+                           scope=ldb.SCOPE_BASE,
+                           attrs=[group_attr])
+        orig_msg = res[0]
+
+        members = list(orig_msg[group_attr])
+        members.append(account_dn)
+
+        msg = ldb.Message()
+        msg.dn = group_dn
+        msg[group_attr] = ldb.MessageElement(members,
+                                             ldb.FLAG_MOD_REPLACE,
+                                             group_attr)
+
+        cleanup = samdb.msg_diff(msg, orig_msg)
+        self.ldb_cleanups.append(cleanup)
+        samdb.modify(msg)
+
+        return cleanup
+
     def get_cached_creds(self, *,
                          machine_account,
                          opts=None):
@@ -382,6 +484,9 @@ class KDCBaseTest(RawKerberosTest):
             opts = {}
 
         opts_default = {
+            'allowed_replication': False,
+            'denied_replication': False,
+            'revealed_to_rodc': False,
             'no_auth_data_required': False,
             'supported_enctypes': None,
             'not_delegated': False,
@@ -407,6 +512,9 @@ class KDCBaseTest(RawKerberosTest):
 
     def create_account_opts(self, *,
                             machine_account,
+                            allowed_replication,
+                            denied_replication,
+                            revealed_to_rodc,
                             no_auth_data_required,
                             supported_enctypes,
                             not_delegated,
@@ -420,6 +528,9 @@ class KDCBaseTest(RawKerberosTest):
             self.assertFalse(trusted_to_auth_for_delegation)
 
         samdb = self.get_samdb()
+        rodc_samdb = self.get_rodc_samdb()
+
+        rodc_dn = self.get_server_dn(rodc_samdb)
 
         user_name = self.account_base + str(self.account_id)
         type(self).account_id += 1
@@ -475,6 +586,32 @@ class KDCBaseTest(RawKerberosTest):
 
             creds.set_tgs_supported_enctypes(tgs_enctypes)
 
+        # Handle secret replication to the RODC.
+
+        if allowed_replication or revealed_to_rodc:
+            # Allow replicating this account's secrets if requested, or allow
+            # it only temporarily if we're about to replicate them.
+            allowed_cleanup = self.add_to_group(
+                dn, rodc_dn,
+                'msDS-RevealOnDemandGroup')
+
+            if revealed_to_rodc:
+                # Replicate this account's secrets to the RODC.
+                self.replicate_account_to_rodc(dn)
+
+            if not allowed_replication:
+                # If we don't want replicating secrets to be allowed for this
+                # account, disable it again.
+                samdb.modify(allowed_cleanup)
+
+            self.check_revealed(dn,
+                                rodc_dn,
+                                revealed=revealed_to_rodc)
+
+        if denied_replication:
+            # Deny replicating this account's secrets to the RODC.
+            self.add_to_group(dn, rodc_dn, 'msDS-NeverRevealGroup')
+
         return creds
 
     def get_client_creds(self,