from samba.auth import system_session
from samba.credentials import Credentials, SPECIFIED, MUST_USE_KERBEROS
from samba.dcerpc import drsblobs, drsuapi, misc, krb5pac, krb5ccache, security
-from samba.drs_utils import drsuapi_connect
+from samba.drs_utils import drs_Replicate, drsuapi_connect
from samba.dsdb import (
+ DSDB_SYNTAX_BINARY_DN,
DS_DOMAIN_FUNCTION_2000,
DS_DOMAIN_FUNCTION_2008,
DS_GUID_COMPUTERS_CONTAINER,
)
from samba.ndr import ndr_pack, ndr_unpack
from samba import net
-from samba.samdb import SamDB
+from samba.samdb import SamDB, dsdb_Dn
from samba.tests import delete_force
import samba.tests.krb5.kcrypto as kcrypto
cls.account_cache = {}
+ cls.ldb_cleanups = []
+
@classmethod
def tearDownClass(cls):
# Clean up any accounts created by create_account. This is
# done in tearDownClass() rather than tearDown(), so that
# accounts need only be created once for permutation tests.
if cls._ldb is not None:
+ for cleanup in reversed(cls.ldb_cleanups):
+ try:
+ cls._ldb.modify(cleanup)
+ except ldb.LdbError:
+ pass
+
for dn in cls.accounts:
delete_force(cls._ldb, dn)
super().tearDownClass()
return (creds, dn)
+ def replicate_account_to_rodc(self, dn):
+ samdb = self.get_samdb()
+ rodc_samdb = self.get_rodc_samdb()
+
+ repl_val = f'{samdb.get_dsServiceName()}:{dn}:SECRETS_ONLY'
+
+ msg = ldb.Message()
+ msg.dn = ldb.Dn(rodc_samdb, '')
+ msg['replicateSingleObject'] = ldb.MessageElement(
+ repl_val,
+ ldb.FLAG_MOD_REPLACE,
+ 'replicateSingleObject')
+
+ try:
+ # Try replication using the replicateSingleObject rootDSE
+ # operation.
+ rodc_samdb.modify(msg)
+ except ldb.LdbError as err:
+ enum, estr = err.args
+ self.assertEqual(enum, ldb.ERR_UNWILLING_TO_PERFORM)
+ self.assertIn('rootdse_modify: unknown attribute to change!',
+ estr)
+
+ # If that method wasn't supported, we may be in the rodc:local test
+ # environment, where we can try replicating to the local database.
+
+ lp = self.get_lp()
+
+ rodc_creds = Credentials()
+ rodc_creds.guess(lp)
+ rodc_creds.set_machine_account(lp)
+
+ local_samdb = SamDB(url=None, session_info=system_session(),
+ credentials=rodc_creds, lp=lp)
+
+ destination_dsa_guid = misc.GUID(local_samdb.get_ntds_GUID())
+
+ repl = drs_Replicate(f'ncacn_ip_tcp:{self.dc_host}[seal]',
+ lp, rodc_creds,
+ local_samdb, destination_dsa_guid)
+
+ source_dsa_invocation_id = misc.GUID(samdb.invocation_id)
+
+ repl.replicate(dn,
+ source_dsa_invocation_id,
+ destination_dsa_guid,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+ rodc=True)
+
+ def check_revealed(self, dn, rodc_dn, revealed=True):
+ samdb = self.get_samdb()
+
+ res = samdb.search(base=rodc_dn,
+ scope=ldb.SCOPE_BASE,
+ attrs=['msDS-RevealedUsers'])
+
+ revealed_users = res[0].get('msDS-RevealedUsers')
+ if revealed_users is None:
+ self.assertFalse(revealed)
+ return
+
+ revealed_dns = set(str(dsdb_Dn(samdb, str(user),
+ syntax_oid=DSDB_SYNTAX_BINARY_DN).dn)
+ for user in revealed_users)
+
+ if revealed:
+ self.assertIn(str(dn), revealed_dns)
+ else:
+ self.assertNotIn(str(dn), revealed_dns)
+
def get_secrets(self, samdb, dn,
destination_dsa_guid,
source_dsa_invocation_id):
creds.set_tgs_supported_enctypes(supported_enctypes)
creds.set_ap_supported_enctypes(supported_enctypes)
+ def add_to_group(self, account_dn, group_dn, group_attr):
+ samdb = self.get_samdb()
+
+ res = samdb.search(base=group_dn,
+ scope=ldb.SCOPE_BASE,
+ attrs=[group_attr])
+ orig_msg = res[0]
+
+ members = list(orig_msg[group_attr])
+ members.append(account_dn)
+
+ msg = ldb.Message()
+ msg.dn = group_dn
+ msg[group_attr] = ldb.MessageElement(members,
+ ldb.FLAG_MOD_REPLACE,
+ group_attr)
+
+ cleanup = samdb.msg_diff(msg, orig_msg)
+ self.ldb_cleanups.append(cleanup)
+ samdb.modify(msg)
+
+ return cleanup
+
def get_cached_creds(self, *,
machine_account,
opts=None):
opts = {}
opts_default = {
+ 'allowed_replication': False,
+ 'denied_replication': False,
+ 'revealed_to_rodc': False,
'no_auth_data_required': False,
'supported_enctypes': None,
'not_delegated': False,
def create_account_opts(self, *,
machine_account,
+ allowed_replication,
+ denied_replication,
+ revealed_to_rodc,
no_auth_data_required,
supported_enctypes,
not_delegated,
self.assertFalse(trusted_to_auth_for_delegation)
samdb = self.get_samdb()
+ rodc_samdb = self.get_rodc_samdb()
+
+ rodc_dn = self.get_server_dn(rodc_samdb)
user_name = self.account_base + str(self.account_id)
type(self).account_id += 1
creds.set_tgs_supported_enctypes(tgs_enctypes)
+ # Handle secret replication to the RODC.
+
+ if allowed_replication or revealed_to_rodc:
+ # Allow replicating this account's secrets if requested, or allow
+ # it only temporarily if we're about to replicate them.
+ allowed_cleanup = self.add_to_group(
+ dn, rodc_dn,
+ 'msDS-RevealOnDemandGroup')
+
+ if revealed_to_rodc:
+ # Replicate this account's secrets to the RODC.
+ self.replicate_account_to_rodc(dn)
+
+ if not allowed_replication:
+ # If we don't want replicating secrets to be allowed for this
+ # account, disable it again.
+ samdb.modify(allowed_cleanup)
+
+ self.check_revealed(dn,
+ rodc_dn,
+ revealed=revealed_to_rodc)
+
+ if denied_replication:
+ # Deny replicating this account's secrets to the RODC.
+ self.add_to_group(dn, rodc_dn, 'msDS-NeverRevealGroup')
+
return creds
def get_client_creds(self,