1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Stefan Metzmacher 2020
3 # Copyright (C) 2020-2021 Catalyst.Net Ltd
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 from datetime import datetime, timezone
28 from collections import namedtuple
30 from ldb import SCOPE_BASE
31 from samba import generate_random_password
32 from samba.auth import system_session
33 from samba.credentials import (
39 from samba.dcerpc import drsblobs, drsuapi, misc, krb5pac, krb5ccache, security
40 from samba.drs_utils import drs_Replicate, drsuapi_connect
41 from samba.dsdb import (
42 DSDB_SYNTAX_BINARY_DN,
43 DS_DOMAIN_FUNCTION_2000,
44 DS_DOMAIN_FUNCTION_2008,
45 DS_GUID_COMPUTERS_CONTAINER,
46 DS_GUID_DOMAIN_CONTROLLERS_CONTAINER,
47 DS_GUID_USERS_CONTAINER,
48 UF_WORKSTATION_TRUST_ACCOUNT,
49 UF_NO_AUTH_DATA_REQUIRED,
52 UF_PARTIAL_SECRETS_ACCOUNT,
53 UF_SERVER_TRUST_ACCOUNT,
54 UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION
56 from samba.join import DCJoinContext
57 from samba.ndr import ndr_pack, ndr_unpack
59 from samba.samdb import SamDB, dsdb_Dn
61 from samba.tests import delete_force
62 import samba.tests.krb5.kcrypto as kcrypto
63 from samba.tests.krb5.raw_testcase import (
68 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
69 from samba.tests.krb5.rfc4120_constants import (
72 AES256_CTS_HMAC_SHA1_96,
74 KDC_ERR_PREAUTH_REQUIRED,
80 KU_ENC_CHALLENGE_CLIENT,
85 PADATA_ENCRYPTED_CHALLENGE,
# Put "bin/python" at the front of the module search path.
sys.path.insert(0, "bin/python")
# NOTE(review): presumably meant to keep Python output unbuffered during the
# test run -- confirm which processes actually pick this variable up.
os.environ["PYTHONUNBUFFERED"] = "1"

# Module-level defaults; they are assigned to self.do_asn1_print and
# self.do_hexdump further down in this file.
global_asn1_print = False
global_hexdump = False
97 class KDCBaseTest(RawKerberosTest):
98 """ Base class for KDC tests.
101 class AccountType(Enum):
115 cls._functional_level = None
117 # An identifier to ensure created accounts have unique names. Windows
118 # caches accounts based on usernames, so account names being different
119 # across test runs avoids previous test runs affecting the results.
120 cls.account_base = f'{secrets.token_hex(4)}_'
123 # A list containing DNs of accounts created as part of testing.
126 cls.account_cache = {}
131 cls.ldb_cleanups = []
134 def tearDownClass(cls):
135 # Clean up any accounts created by create_account. This is
136 # done in tearDownClass() rather than tearDown(), so that
137 # accounts need only be created once for permutation tests.
138 if cls._ldb is not None:
139 for cleanup in reversed(cls.ldb_cleanups):
141 cls._ldb.modify(cleanup)
145 for dn in reversed(cls.accounts):
146 delete_force(cls._ldb, dn)
148 if cls._rodc_ctx is not None:
149 cls._rodc_ctx.cleanup_old_join(force=True)
151 super().tearDownClass()
155 self.do_asn1_print = global_asn1_print
156 self.do_hexdump = global_hexdump
160 type(self)._lp = self.get_loadparm()
165 if self._ldb is None:
166 creds = self.get_admin_creds()
169 session = system_session()
170 type(self)._ldb = SamDB(url="ldap://%s" % self.dc_host,
171 session_info=session,
177 def get_rodc_samdb(self):
178 if self._rodc_ldb is None:
179 creds = self.get_admin_creds()
182 session = system_session()
183 type(self)._rodc_ldb = SamDB(url="ldap://%s" % self.host,
184 session_info=session,
189 return self._rodc_ldb
def get_server_dn(self, samdb):
    """Return the DN stored in the serverReference attribute of the
    server object that samdb reports via get_serverName().
    """
    server_name = samdb.get_serverName()

    # A base-scope search on the server object itself; only the
    # serverReference attribute is requested.
    result = samdb.search(base=server_name,
                          scope=ldb.SCOPE_BASE,
                          attrs=['serverReference'])

    reference = result[0]['serverReference'][0]
    return ldb.Dn(samdb, reference.decode('utf8'))
201 def get_mock_rodc_ctx(self):
202 if self._rodc_ctx is None:
203 admin_creds = self.get_admin_creds()
206 rodc_name = self.get_new_username()
207 site_name = 'Default-First-Site-Name'
209 rodc_ctx = DCJoinContext(server=self.dc_host,
213 netbios_name=rodc_name,
216 self.create_rodc(rodc_ctx)
218 type(self)._rodc_ctx = rodc_ctx
220 return self._rodc_ctx
222 def get_domain_functional_level(self, ldb):
223 if self._functional_level is None:
224 res = ldb.search(base='',
226 attrs=['domainFunctionality'])
228 functional_level = int(res[0]['domainFunctionality'][0])
230 functional_level = DS_DOMAIN_FUNCTION_2000
232 type(self)._functional_level = functional_level
234 return self._functional_level
def get_default_enctypes(self):
    """Return the set of encryption types expected by default.

    RC4 is always part of the result; AES256 and AES128 are included
    only when the domain functional level is at least
    DS_DOMAIN_FUNCTION_2008.
    """
    level = self.get_domain_functional_level(self.get_samdb())

    # RC4 should always be supported
    enctypes = {kcrypto.Enctype.RC4}

    if level < DS_DOMAIN_FUNCTION_2008:
        return enctypes

    # AES is only supported at functional level 2008 or higher
    enctypes |= {kcrypto.Enctype.AES256, kcrypto.Enctype.AES128}
    return enctypes
249 def create_group(self, samdb, name, ou=None):
251 ou = samdb.get_wellknown_dn(samdb.get_default_basedn(),
252 DS_GUID_USERS_CONTAINER)
254 dn = f'CN={name},{ou}'
256 # Remove the group if it exists; this will happen if a previous test
258 delete_force(samdb, dn)
260 # Save the group name so it can be deleted in tearDownClass.
261 self.accounts.append(dn)
265 'objectClass': 'group'
271 def create_account(self, samdb, name, account_type=AccountType.USER,
272 spn=None, upn=None, additional_details=None,
273 ou=None, account_control=0, add_dollar=True,
274 expired_password=False):
275 '''Create an account for testing.
276 The dn of the created account is added to self.accounts,
277 which is used by tearDownClass to clean up the created accounts.
280 if account_type is self.AccountType.COMPUTER:
281 guid = DS_GUID_COMPUTERS_CONTAINER
282 elif account_type is self.AccountType.SERVER:
283 guid = DS_GUID_DOMAIN_CONTROLLERS_CONTAINER
285 guid = DS_GUID_USERS_CONTAINER
287 ou = samdb.get_wellknown_dn(samdb.get_default_basedn(), guid)
289 dn = "CN=%s,%s" % (name, ou)
291 # remove the account if it exists, this will happen if a previous test
293 delete_force(samdb, dn)
295 if account_type is self.AccountType.USER:
296 object_class = "user"
297 account_control |= UF_NORMAL_ACCOUNT
299 object_class = "computer"
302 if account_type is self.AccountType.COMPUTER:
303 account_control |= UF_WORKSTATION_TRUST_ACCOUNT
304 elif account_type is self.AccountType.SERVER:
305 account_control |= UF_SERVER_TRUST_ACCOUNT
309 password = generate_random_password(32, 32)
310 utf16pw = ('"%s"' % password).encode('utf-16-le')
314 "objectclass": object_class,
315 "sAMAccountName": account_name,
316 "userAccountControl": str(account_control),
317 "unicodePwd": utf16pw}
319 upn = upn.format(account=account_name)
321 if isinstance(spn, str):
322 spn = spn.format(account=account_name)
324 spn = tuple(s.format(account=account_name) for s in spn)
325 details["servicePrincipalName"] = spn
327 details["userPrincipalName"] = upn
329 details["pwdLastSet"] = "0"
330 if additional_details is not None:
331 details.update(additional_details)
332 # Save the account name so it can be deleted in tearDownClass
333 self.accounts.append(dn)
336 creds = KerberosCredentials()
337 creds.guess(self.get_lp())
338 creds.set_realm(samdb.domain_dns_name().upper())
339 creds.set_domain(samdb.domain_netbios_name().upper())
340 creds.set_password(password)
341 creds.set_username(account_name)
342 if account_type is self.AccountType.USER:
343 creds.set_workstation('')
345 creds.set_workstation(name)
346 creds.set_dn(ldb.Dn(samdb, dn))
350 self.creds_set_enctypes(creds)
352 res = samdb.search(base=dn,
353 scope=ldb.SCOPE_BASE,
354 attrs=['msDS-KeyVersionNumber'])
355 kvno = res[0].get('msDS-KeyVersionNumber', idx=0)
357 self.assertEqual(int(kvno), 1)
362 def get_security_descriptor(self, dn):
363 samdb = self.get_samdb()
365 sid = self.get_objectSid(samdb, dn)
367 owner_sid = security.dom_sid(security.SID_BUILTIN_ADMINISTRATORS)
370 ace.access_mask = security.SEC_ADS_CONTROL_ACCESS
372 ace.trustee = security.dom_sid(sid)
374 dacl = security.acl()
375 dacl.revision = security.SECURITY_ACL_REVISION_ADS
379 security_desc = security.descriptor()
380 security_desc.type |= security.SEC_DESC_DACL_PRESENT
381 security_desc.owner_sid = owner_sid
382 security_desc.dacl = dacl
384 return ndr_pack(security_desc)
386 def create_rodc(self, ctx):
387 ctx.nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
388 ctx.full_nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
389 ctx.krbtgt_dn = f'CN=krbtgt_{ctx.myname},CN=Users,{ctx.base_dn}'
391 ctx.never_reveal_sid = [f'<SID={ctx.domsid}-{security.DOMAIN_RID_RODC_DENY}>',
392 f'<SID={security.SID_BUILTIN_ADMINISTRATORS}>',
393 f'<SID={security.SID_BUILTIN_SERVER_OPERATORS}>',
394 f'<SID={security.SID_BUILTIN_BACKUP_OPERATORS}>',
395 f'<SID={security.SID_BUILTIN_ACCOUNT_OPERATORS}>']
396 ctx.reveal_sid = f'<SID={ctx.domsid}-{security.DOMAIN_RID_RODC_ALLOW}>'
398 mysid = ctx.get_mysid()
399 admin_dn = f'<SID={mysid}>'
400 ctx.managedby = admin_dn
402 ctx.userAccountControl = (UF_WORKSTATION_TRUST_ACCOUNT |
403 UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
404 UF_PARTIAL_SECRETS_ACCOUNT)
406 ctx.connection_dn = f'CN=RODC Connection (FRS),{ctx.ntds_dn}'
407 ctx.secure_channel_type = misc.SEC_CHAN_RODC
409 ctx.replica_flags = (drsuapi.DRSUAPI_DRS_INIT_SYNC |
410 drsuapi.DRSUAPI_DRS_PER_SYNC |
411 drsuapi.DRSUAPI_DRS_GET_ANC |
412 drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
413 drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
414 ctx.domain_replica_flags = ctx.replica_flags | drsuapi.DRSUAPI_DRS_CRITICAL_ONLY
418 ctx.cleanup_old_join()
421 ctx.join_add_objects()
423 # cleanup the failed join (checking we still have a live LDB
424 # connection to the remote DC first)
425 ctx.refresh_ldb_connection()
426 ctx.cleanup_old_join()
429 def replicate_account_to_rodc(self, dn):
430 samdb = self.get_samdb()
431 rodc_samdb = self.get_rodc_samdb()
433 repl_val = f'{samdb.get_dsServiceName()}:{dn}:SECRETS_ONLY'
436 msg.dn = ldb.Dn(rodc_samdb, '')
437 msg['replicateSingleObject'] = ldb.MessageElement(
439 ldb.FLAG_MOD_REPLACE,
440 'replicateSingleObject')
443 # Try replication using the replicateSingleObject rootDSE
445 rodc_samdb.modify(msg)
446 except ldb.LdbError as err:
447 enum, estr = err.args
448 self.assertEqual(enum, ldb.ERR_UNWILLING_TO_PERFORM)
449 self.assertIn('rootdse_modify: unknown attribute to change!',
452 # If that method wasn't supported, we may be in the rodc:local test
453 # environment, where we can try replicating to the local database.
457 rodc_creds = Credentials()
459 rodc_creds.set_machine_account(lp)
461 local_samdb = SamDB(url=None, session_info=system_session(),
462 credentials=rodc_creds, lp=lp)
464 destination_dsa_guid = misc.GUID(local_samdb.get_ntds_GUID())
466 repl = drs_Replicate(f'ncacn_ip_tcp:{self.dc_host}[seal]',
468 local_samdb, destination_dsa_guid)
470 source_dsa_invocation_id = misc.GUID(samdb.invocation_id)
473 source_dsa_invocation_id,
474 destination_dsa_guid,
475 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
478 def reveal_account_to_mock_rodc(self, dn):
479 samdb = self.get_samdb()
480 rodc_ctx = self.get_mock_rodc_ctx()
485 destination_dsa_guid=rodc_ctx.ntds_guid,
486 source_dsa_invocation_id=misc.GUID(samdb.invocation_id))
def check_revealed(self, dn, rodc_dn, revealed=True):
    """Assert whether dn appears in msDS-RevealedUsers of rodc_dn.

    dn -- the account DN to look for
    rodc_dn -- the object whose msDS-RevealedUsers attribute is read
    revealed -- True to require that dn is listed, False to require
                that it is not
    """
    samdb = self.get_samdb()

    entry = samdb.search(base=rodc_dn,
                         scope=ldb.SCOPE_BASE,
                         attrs=['msDS-RevealedUsers'])[0]

    revealed_users = entry.get('msDS-RevealedUsers')
    if revealed_users is None:
        # The attribute is absent, so nothing at all has been revealed.
        self.assertFalse(revealed)
        return

    # Each value is parsed as a binary DN; only the DN part is compared.
    revealed_dns = {
        str(dsdb_Dn(samdb, str(user),
                    syntax_oid=DSDB_SYNTAX_BINARY_DN).dn)
        for user in revealed_users
    }

    membership_check = self.assertIn if revealed else self.assertNotIn
    membership_check(str(dn), revealed_dns)
509 def get_secrets(self, samdb, dn,
510 destination_dsa_guid,
511 source_dsa_invocation_id):
512 admin_creds = self.get_admin_creds()
514 dns_hostname = samdb.host_dns_name()
515 (bind, handle, _) = drsuapi_connect(dns_hostname,
519 req = drsuapi.DsGetNCChangesRequest8()
521 req.destination_dsa_guid = destination_dsa_guid
522 req.source_dsa_invocation_id = source_dsa_invocation_id
524 naming_context = drsuapi.DsReplicaObjectIdentifier()
525 naming_context.dn = dn
527 req.naming_context = naming_context
529 hwm = drsuapi.DsReplicaHighWaterMark()
530 hwm.tmp_highest_usn = 0
534 req.highwatermark = hwm
535 req.uptodateness_vector = None
537 req.replica_flags = 0
539 req.max_object_count = 1
540 req.max_ndr_size = 402116
541 req.extended_op = drsuapi.DRSUAPI_EXOP_REPL_SECRET
543 attids = [drsuapi.DRSUAPI_ATTID_supplementalCredentials,
544 drsuapi.DRSUAPI_ATTID_unicodePwd,
545 drsuapi.DRSUAPI_ATTID_ntPwdHistory]
547 partial_attribute_set = drsuapi.DsPartialAttributeSet()
548 partial_attribute_set.version = 1
549 partial_attribute_set.attids = attids
550 partial_attribute_set.num_attids = len(attids)
552 req.partial_attribute_set = partial_attribute_set
554 req.partial_attribute_set_ex = None
555 req.mapping_ctr.num_mappings = 0
556 req.mapping_ctr.mappings = None
558 _, ctr = bind.DsGetNCChanges(handle, 8, req)
560 self.assertEqual(1, ctr.object_count)
562 identifier = ctr.first_object.object.identifier
563 attributes = ctr.first_object.object.attribute_ctr.attributes
565 self.assertEqual(dn, identifier.dn)
567 return bind, identifier, attributes
569 def get_keys(self, samdb, dn, expected_etypes=None):
570 admin_creds = self.get_admin_creds()
572 bind, identifier, attributes = self.get_secrets(
575 destination_dsa_guid=misc.GUID(samdb.get_ntds_GUID()),
576 source_dsa_invocation_id=misc.GUID())
578 rid = identifier.sid.split()[1]
580 net_ctx = net.Net(admin_creds)
584 for attr in attributes:
585 if attr.attid == drsuapi.DRSUAPI_ATTID_supplementalCredentials:
586 net_ctx.replicate_decrypt(bind, attr, rid)
587 attr_val = attr.value_ctr.values[0].blob
589 spl = ndr_unpack(drsblobs.supplementalCredentialsBlob,
591 for pkg in spl.sub.packages:
592 if pkg.name == 'Primary:Kerberos-Newer-Keys':
593 krb5_new_keys_raw = binascii.a2b_hex(pkg.data)
594 krb5_new_keys = ndr_unpack(
595 drsblobs.package_PrimaryKerberosBlob,
597 for key in krb5_new_keys.ctr.keys:
598 keytype = key.keytype
599 if keytype in (kcrypto.Enctype.AES256,
600 kcrypto.Enctype.AES128):
601 keys[keytype] = key.value.hex()
602 elif attr.attid == drsuapi.DRSUAPI_ATTID_unicodePwd:
603 net_ctx.replicate_decrypt(bind, attr, rid)
604 if attr.value_ctr.num_values > 0:
605 pwd = attr.value_ctr.values[0].blob
606 keys[kcrypto.Enctype.RC4] = pwd.hex()
608 if expected_etypes is None:
609 expected_etypes = self.get_default_enctypes()
611 self.assertCountEqual(expected_etypes, keys)
615 def creds_set_keys(self, creds, keys):
617 for enctype, key in keys.items():
618 creds.set_forced_key(enctype, key)
def creds_set_enctypes(self, creds):
    """Copy the account's msDS-SupportedEncryptionTypes onto creds.

    The attribute is read from the object at creds.get_dn(); when it is
    not present, 0 is used.  The same value is applied to the AS, TGS
    and AP supported-enctypes settings of creds.
    """
    attr_name = 'msDS-SupportedEncryptionTypes'

    found = self.get_samdb().search(creds.get_dn(),
                                    scope=ldb.SCOPE_BASE,
                                    attrs=[attr_name])

    enctype_bits = found[0].get(attr_name, idx=0)
    if enctype_bits is None:
        # No value stored on the account.
        enctype_bits = 0

    creds.set_as_supported_enctypes(enctype_bits)
    creds.set_tgs_supported_enctypes(enctype_bits)
    creds.set_ap_supported_enctypes(enctype_bits)
635 def creds_set_default_enctypes(self, creds,
637 claims_support=False,
638 compound_id_support=False):
639 default_enctypes = self.get_default_enctypes()
640 supported_enctypes = KerberosCredentials.etypes_to_bits(
644 supported_enctypes |= security.KERB_ENCTYPE_FAST_SUPPORTED
646 supported_enctypes |= security.KERB_ENCTYPE_CLAIMS_SUPPORTED
647 if compound_id_support:
648 supported_enctypes |= (
649 security.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED)
651 creds.set_as_supported_enctypes(supported_enctypes)
652 creds.set_tgs_supported_enctypes(supported_enctypes)
653 creds.set_ap_supported_enctypes(supported_enctypes)
655 def add_to_group(self, account_dn, group_dn, group_attr, expect_attr=True):
656 samdb = self.get_samdb()
659 res = samdb.search(base=group_dn,
660 scope=ldb.SCOPE_BASE,
662 except ldb.LdbError as err:
664 if num != ldb.ERR_NO_SUCH_OBJECT:
670 members = orig_msg.get(group_attr)
672 self.assertIsNotNone(members)
673 elif members is None:
676 members = list(members)
677 members.append(account_dn)
681 msg[group_attr] = ldb.MessageElement(members,
682 ldb.FLAG_MOD_REPLACE,
685 cleanup = samdb.msg_diff(msg, orig_msg)
686 self.ldb_cleanups.append(cleanup)
691 def remove_from_group(self, account_dn, group_dn):
692 samdb = self.get_samdb()
694 res = samdb.search(base=group_dn,
695 scope=ldb.SCOPE_BASE,
698 self.assertIn('member', orig_msg)
699 members = list(orig_msg['member'])
701 account_dn = str(account_dn).encode('utf-8')
702 self.assertIn(account_dn, members)
703 members.remove(account_dn)
707 msg['member'] = ldb.MessageElement(members,
708 ldb.FLAG_MOD_REPLACE,
711 cleanup = samdb.msg_diff(msg, orig_msg)
712 self.ldb_cleanups.append(cleanup)
717 def get_cached_creds(self, *,
730 'additional_details': None,
731 'allowed_replication': False,
732 'allowed_replication_mock': False,
733 'denied_replication': False,
734 'denied_replication_mock': False,
735 'revealed_to_rodc': False,
736 'revealed_to_mock_rodc': False,
737 'no_auth_data_required': False,
738 'expired_password': False,
739 'supported_enctypes': None,
740 'not_delegated': False,
741 'delegation_to_spn': None,
742 'delegation_from_dn': None,
743 'trusted_to_auth_for_delegation': False,
744 'fast_support': False,
746 'kerberos_enabled': True,
747 'secure_channel_type': None,
752 'account_type': account_type,
757 cache_key = tuple(sorted(account_opts.items()))
760 creds = self.account_cache.get(cache_key)
761 if creds is not None:
764 creds = self.create_account_opts(**account_opts)
766 self.account_cache[cache_key] = creds
770 def create_account_opts(self, *,
779 allowed_replication_mock,
781 denied_replication_mock,
783 revealed_to_mock_rodc,
784 no_auth_data_required,
790 trusted_to_auth_for_delegation,
796 if account_type is self.AccountType.USER:
797 self.assertIsNone(spn)
798 self.assertIsNone(delegation_to_spn)
799 self.assertIsNone(delegation_from_dn)
800 self.assertFalse(trusted_to_auth_for_delegation)
802 self.assertFalse(not_delegated)
804 samdb = self.get_samdb()
806 user_name = self.get_new_username()
807 if name_prefix is not None:
808 user_name = name_prefix + user_name
809 if name_suffix is not None:
810 user_name += name_suffix
812 user_account_control = 0
813 if trusted_to_auth_for_delegation:
814 user_account_control |= UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION
816 user_account_control |= UF_NOT_DELEGATED
817 if no_auth_data_required:
818 user_account_control |= UF_NO_AUTH_DATA_REQUIRED
820 if additional_details:
821 details = {k: v for k, v in additional_details}
825 enctypes = supported_enctypes
827 enctypes = enctypes or 0
828 enctypes |= KerberosCredentials.fast_supported_bits
830 if enctypes is not None:
831 details['msDS-SupportedEncryptionTypes'] = str(enctypes)
833 if delegation_to_spn:
834 details['msDS-AllowedToDelegateTo'] = delegation_to_spn
836 if delegation_from_dn:
837 security_descriptor = self.get_security_descriptor(
839 details['msDS-AllowedToActOnBehalfOfOtherIdentity'] = (
842 if spn is None and account_type is not self.AccountType.USER:
843 spn = 'host/' + user_name
845 creds, dn = self.create_account(samdb, user_name,
846 account_type=account_type,
849 additional_details=details,
850 account_control=user_account_control,
851 add_dollar=add_dollar,
852 expired_password=expired_password)
854 keys = self.get_keys(samdb, dn)
855 self.creds_set_keys(creds, keys)
857 # Handle secret replication to the RODC.
859 if allowed_replication or revealed_to_rodc:
860 rodc_samdb = self.get_rodc_samdb()
861 rodc_dn = self.get_server_dn(rodc_samdb)
863 # Allow replicating this account's secrets if requested, or allow
864 # it only temporarily if we're about to replicate them.
865 allowed_cleanup = self.add_to_group(
867 'msDS-RevealOnDemandGroup')
870 # Replicate this account's secrets to the RODC.
871 self.replicate_account_to_rodc(dn)
873 if not allowed_replication:
874 # If we don't want replicating secrets to be allowed for this
875 # account, disable it again.
876 samdb.modify(allowed_cleanup)
878 self.check_revealed(dn,
880 revealed=revealed_to_rodc)
882 if denied_replication:
883 rodc_samdb = self.get_rodc_samdb()
884 rodc_dn = self.get_server_dn(rodc_samdb)
886 # Deny replicating this account's secrets to the RODC.
887 self.add_to_group(dn, rodc_dn, 'msDS-NeverRevealGroup')
889 # Handle secret replication to the mock RODC.
891 if allowed_replication_mock or revealed_to_mock_rodc:
892 # Allow replicating this account's secrets if requested, or allow
893 # it only temporarily if we want to add the account to the mock
894 # RODC's msDS-RevealedUsers.
895 rodc_ctx = self.get_mock_rodc_ctx()
896 mock_rodc_dn = ldb.Dn(samdb, rodc_ctx.acct_dn)
898 allowed_mock_cleanup = self.add_to_group(
900 'msDS-RevealOnDemandGroup')
902 if revealed_to_mock_rodc:
903 # Request replicating this account's secrets to the mock RODC,
904 # which updates msDS-RevealedUsers.
905 self.reveal_account_to_mock_rodc(dn)
907 if not allowed_replication_mock:
908 # If we don't want replicating secrets to be allowed for this
909 # account, disable it again.
910 samdb.modify(allowed_mock_cleanup)
912 self.check_revealed(dn,
914 revealed=revealed_to_mock_rodc)
916 if denied_replication_mock:
917 # Deny replicating this account's secrets to the mock RODC.
918 rodc_ctx = self.get_mock_rodc_ctx()
919 mock_rodc_dn = ldb.Dn(samdb, rodc_ctx.acct_dn)
921 self.add_to_group(dn, mock_rodc_dn, 'msDS-NeverRevealGroup')
923 if member_of is not None:
924 for group_dn in member_of:
925 self.add_to_group(dn, ldb.Dn(samdb, group_dn), 'member',
929 creds.set_kerberos_state(MUST_USE_KERBEROS)
931 creds.set_kerberos_state(DONT_USE_KERBEROS)
933 if secure_channel_type is not None:
934 creds.set_secure_channel_type(secure_channel_type)
def get_new_username(self):
    """Return a fresh account name and advance the class-wide counter.

    The name is account_base followed by the current account_id; the
    counter is incremented on the class, so it is shared by all
    instances of that class.
    """
    allocated = f'{self.account_base}{self.account_id}'

    owner = type(self)
    owner.account_id = owner.account_id + 1

    return allocated
def get_client_creds(self,
                     allow_missing_password=False,
                     allow_missing_keys=True):
    """Return the client credentials.

    Delegates to _get_krb5_creds() with the 'CLIENT' prefix.  The
    fallback it is handed obtains a cached account of type USER and is
    only evaluated if _get_krb5_creds() decides to call it.
    """
    return self._get_krb5_creds(
        prefix='CLIENT',
        allow_missing_password=allow_missing_password,
        allow_missing_keys=allow_missing_keys,
        fallback_creds_fn=lambda: self.get_cached_creds(
            account_type=self.AccountType.USER))
def get_mach_creds(self,
                   allow_missing_password=False,
                   allow_missing_keys=True):
    """Return the machine account credentials.

    Delegates to _get_krb5_creds() with the 'MAC' prefix.  The fallback
    it is handed obtains a cached COMPUTER account created with the
    'fast_support' option enabled.
    """
    return self._get_krb5_creds(
        prefix='MAC',
        allow_missing_password=allow_missing_password,
        allow_missing_keys=allow_missing_keys,
        fallback_creds_fn=lambda: self.get_cached_creds(
            account_type=self.AccountType.COMPUTER,
            opts={'fast_support': True}))
969 def get_service_creds(self,
970 allow_missing_password=False,
971 allow_missing_keys=True):
972 def create_service_account():
973 return self.get_cached_creds(
974 account_type=self.AccountType.COMPUTER,
976 'trusted_to_auth_for_delegation': True,
980 c = self._get_krb5_creds(prefix='SERVICE',
981 allow_missing_password=allow_missing_password,
982 allow_missing_keys=allow_missing_keys,
983 fallback_creds_fn=create_service_account)
986 def get_rodc_krbtgt_creds(self,
988 require_strongest_key=False):
989 if require_strongest_key:
990 self.assertTrue(require_keys)
992 def download_rodc_krbtgt_creds():
993 samdb = self.get_samdb()
994 rodc_samdb = self.get_rodc_samdb()
996 rodc_dn = self.get_server_dn(rodc_samdb)
998 res = samdb.search(rodc_dn,
999 scope=ldb.SCOPE_BASE,
1000 attrs=['msDS-KrbTgtLink'])
1001 krbtgt_dn = res[0]['msDS-KrbTgtLink'][0]
1003 res = samdb.search(krbtgt_dn,
1004 scope=ldb.SCOPE_BASE,
1005 attrs=['sAMAccountName',
1006 'msDS-KeyVersionNumber',
1007 'msDS-SecondaryKrbTgtNumber'])
1008 krbtgt_dn = res[0].dn
1009 username = str(res[0]['sAMAccountName'])
1011 creds = KerberosCredentials()
1012 creds.set_domain(self.env_get_var('DOMAIN', 'RODC_KRBTGT'))
1013 creds.set_realm(self.env_get_var('REALM', 'RODC_KRBTGT'))
1014 creds.set_username(username)
1016 kvno = int(res[0]['msDS-KeyVersionNumber'][0])
1017 krbtgt_number = int(res[0]['msDS-SecondaryKrbTgtNumber'][0])
1019 rodc_kvno = krbtgt_number << 16 | kvno
1020 creds.set_kvno(rodc_kvno)
1021 creds.set_dn(krbtgt_dn)
1023 keys = self.get_keys(samdb, krbtgt_dn)
1024 self.creds_set_keys(creds, keys)
1026 # The RODC krbtgt account should support the default enctypes,
1027 # although it might not have the msDS-SupportedEncryptionTypes
1029 self.creds_set_default_enctypes(
1031 fast_support=self.kdc_fast_support,
1032 claims_support=self.kdc_claims_support,
1033 compound_id_support=self.kdc_compound_id_support)
1037 c = self._get_krb5_creds(prefix='RODC_KRBTGT',
1038 allow_missing_password=True,
1039 allow_missing_keys=not require_keys,
1040 require_strongest_key=require_strongest_key,
1041 fallback_creds_fn=download_rodc_krbtgt_creds)
1044 def get_mock_rodc_krbtgt_creds(self,
1046 require_strongest_key=False):
1047 if require_strongest_key:
1048 self.assertTrue(require_keys)
1050 def create_rodc_krbtgt_account():
1051 samdb = self.get_samdb()
1053 rodc_ctx = self.get_mock_rodc_ctx()
1055 krbtgt_dn = rodc_ctx.new_krbtgt_dn
1057 res = samdb.search(base=ldb.Dn(samdb, krbtgt_dn),
1058 scope=ldb.SCOPE_BASE,
1059 attrs=['msDS-KeyVersionNumber',
1060 'msDS-SecondaryKrbTgtNumber'])
1062 username = str(rodc_ctx.krbtgt_name)
1064 creds = KerberosCredentials()
1065 creds.set_domain(self.env_get_var('DOMAIN', 'RODC_KRBTGT'))
1066 creds.set_realm(self.env_get_var('REALM', 'RODC_KRBTGT'))
1067 creds.set_username(username)
1069 kvno = int(res[0]['msDS-KeyVersionNumber'][0])
1070 krbtgt_number = int(res[0]['msDS-SecondaryKrbTgtNumber'][0])
1072 rodc_kvno = krbtgt_number << 16 | kvno
1073 creds.set_kvno(rodc_kvno)
1076 keys = self.get_keys(samdb, dn)
1077 self.creds_set_keys(creds, keys)
1079 self.creds_set_enctypes(creds)
1083 c = self._get_krb5_creds(prefix='MOCK_RODC_KRBTGT',
1084 allow_missing_password=True,
1085 allow_missing_keys=not require_keys,
1086 require_strongest_key=require_strongest_key,
1087 fallback_creds_fn=create_rodc_krbtgt_account)
1090 def get_krbtgt_creds(self,
1092 require_strongest_key=False):
1093 if require_strongest_key:
1094 self.assertTrue(require_keys)
1096 def download_krbtgt_creds():
1097 samdb = self.get_samdb()
1099 krbtgt_rid = security.DOMAIN_RID_KRBTGT
1100 krbtgt_sid = '%s-%d' % (samdb.get_domain_sid(), krbtgt_rid)
1102 res = samdb.search(base='<SID=%s>' % krbtgt_sid,
1103 scope=ldb.SCOPE_BASE,
1104 attrs=['sAMAccountName',
1105 'msDS-KeyVersionNumber'])
1107 username = str(res[0]['sAMAccountName'])
1109 creds = KerberosCredentials()
1110 creds.set_domain(self.env_get_var('DOMAIN', 'KRBTGT'))
1111 creds.set_realm(self.env_get_var('REALM', 'KRBTGT'))
1112 creds.set_username(username)
1114 kvno = int(res[0]['msDS-KeyVersionNumber'][0])
1115 creds.set_kvno(kvno)
1118 keys = self.get_keys(samdb, dn)
1119 self.creds_set_keys(creds, keys)
1121 # The krbtgt account should support the default enctypes, although
1122 # it might not (on Samba) have the msDS-SupportedEncryptionTypes
1124 self.creds_set_default_enctypes(
1126 fast_support=self.kdc_fast_support,
1127 claims_support=self.kdc_claims_support,
1128 compound_id_support=self.kdc_compound_id_support)
1132 c = self._get_krb5_creds(prefix='KRBTGT',
1133 default_username='krbtgt',
1134 allow_missing_password=True,
1135 allow_missing_keys=not require_keys,
1136 require_strongest_key=require_strongest_key,
1137 fallback_creds_fn=download_krbtgt_creds)
1140 def get_dc_creds(self,
1142 require_strongest_key=False):
1143 if require_strongest_key:
1144 self.assertTrue(require_keys)
1146 def download_dc_creds():
1147 samdb = self.get_samdb()
1150 dc_sid = '%s-%d' % (samdb.get_domain_sid(), dc_rid)
1152 res = samdb.search(base='<SID=%s>' % dc_sid,
1153 scope=ldb.SCOPE_BASE,
1154 attrs=['sAMAccountName',
1155 'msDS-KeyVersionNumber'])
1157 username = str(res[0]['sAMAccountName'])
1159 creds = KerberosCredentials()
1160 creds.set_domain(self.env_get_var('DOMAIN', 'DC'))
1161 creds.set_realm(self.env_get_var('REALM', 'DC'))
1162 creds.set_username(username)
1164 kvno = int(res[0]['msDS-KeyVersionNumber'][0])
1165 creds.set_kvno(kvno)
1166 creds.set_workstation(username[:-1])
1169 keys = self.get_keys(samdb, dn)
1170 self.creds_set_keys(creds, keys)
1172 self.creds_set_enctypes(creds)
1176 c = self._get_krb5_creds(prefix='DC',
1177 allow_missing_password=True,
1178 allow_missing_keys=not require_keys,
1179 require_strongest_key=require_strongest_key,
1180 fallback_creds_fn=download_dc_creds)
1183 def get_server_creds(self,
1185 require_strongest_key=False):
1186 if require_strongest_key:
1187 self.assertTrue(require_keys)
1189 def download_server_creds():
1190 samdb = self.get_samdb()
1192 res = samdb.search(base=samdb.get_default_basedn(),
1193 expression=(f'(|(sAMAccountName={self.host}*)'
1194 f'(dNSHostName={self.host}))'),
1195 scope=ldb.SCOPE_SUBTREE,
1196 attrs=['sAMAccountName',
1197 'msDS-KeyVersionNumber'])
1198 self.assertEqual(1, len(res))
1200 username = str(res[0]['sAMAccountName'])
1202 creds = KerberosCredentials()
1203 creds.set_domain(self.env_get_var('DOMAIN', 'SERVER'))
1204 creds.set_realm(self.env_get_var('REALM', 'SERVER'))
1205 creds.set_username(username)
1207 kvno = int(res[0]['msDS-KeyVersionNumber'][0])
1208 creds.set_kvno(kvno)
1211 keys = self.get_keys(samdb, dn)
1212 self.creds_set_keys(creds, keys)
1214 self.creds_set_enctypes(creds)
1218 c = self._get_krb5_creds(prefix='SERVER',
1219 allow_missing_password=True,
1220 allow_missing_keys=not require_keys,
1221 require_strongest_key=require_strongest_key,
1222 fallback_creds_fn=download_server_creds)
1225 def as_req(self, cname, sname, realm, etypes, padata=None, kdc_options=0):
1226 '''Send a Kerberos AS_REQ, returns the undecoded response
1229 till = self.get_KerberosTime(offset=36000)
1231 req = self.AS_REQ_create(padata=padata,
1232 kdc_options=str(kdc_options),
1242 additional_tickets=None)
1243 rep = self.send_recv_transaction(req)
1246 def get_as_rep_key(self, creds, rep):
1247 '''Extract the session key from an AS-REP
1249 rep_padata = self.der_decode(
1251 asn1Spec=krb5_asn1.METHOD_DATA())
1253 for pa in rep_padata:
1254 if pa['padata-type'] == PADATA_ETYPE_INFO2:
1255 padata_value = pa['padata-value']
1258 etype_info2 = self.der_decode(
1259 padata_value, asn1Spec=krb5_asn1.ETYPE_INFO2())
1261 key = self.PasswordKey_from_etype_info2(creds, etype_info2[0],
def get_enc_timestamp_pa_data(self, creds, rep, skew=0):
    '''Build the encrypted-timestamp pa_data element for an AS-REQ.

    The key is obtained from get_as_rep_key(creds, rep); skew is passed
    through unchanged to get_enc_timestamp_pa_data_from_key().
    '''
    return self.get_enc_timestamp_pa_data_from_key(
        self.get_as_rep_key(creds, rep),
        skew=skew)
def get_enc_timestamp_pa_data_from_key(self, key, skew=0):
    '''Build a PADATA_ENC_TIMESTAMP pa_data element encrypted with key.

    key -- the key handed to EncryptedData_create()
    skew -- offset passed to get_KerberosTimeWithUsec()
    '''
    timestamp, usec = self.get_KerberosTimeWithUsec(offset=skew)

    # Encode the timestamp structure ...
    ts_enc = self.PA_ENC_TS_ENC_create(timestamp, usec)
    ts_enc_der = self.der_encode(ts_enc,
                                 asn1Spec=krb5_asn1.PA_ENC_TS_ENC())

    # ... encrypt it with the timestamp key usage and encode the result ...
    encrypted = self.EncryptedData_create(key,
                                          KU_PA_ENC_TIMESTAMP,
                                          ts_enc_der)
    encrypted_der = self.der_encode(encrypted,
                                    asn1Spec=krb5_asn1.EncryptedData())

    # ... and wrap it as a pa_data element.
    return self.PA_DATA_create(PADATA_ENC_TIMESTAMP, encrypted_der)
def get_challenge_pa_data(self, client_challenge_key, skew=0):
    '''Build a PADATA_ENCRYPTED_CHALLENGE pa_data element.

    client_challenge_key -- the key handed to EncryptedData_create()
    skew -- offset passed to get_KerberosTimeWithUsec()
    '''
    timestamp, usec = self.get_KerberosTimeWithUsec(offset=skew)

    # Encode the timestamp structure ...
    ts_enc = self.PA_ENC_TS_ENC_create(timestamp, usec)
    ts_enc_der = self.der_encode(ts_enc,
                                 asn1Spec=krb5_asn1.PA_ENC_TS_ENC())

    # ... encrypt it with the client challenge key usage and encode the
    # result ...
    encrypted = self.EncryptedData_create(client_challenge_key,
                                          KU_ENC_CHALLENGE_CLIENT,
                                          ts_enc_der)
    encrypted_der = self.der_encode(encrypted,
                                    asn1Spec=krb5_asn1.EncryptedData())

    # ... and wrap it as a pa_data element.
    return self.PA_DATA_create(PADATA_ENCRYPTED_CHALLENGE, encrypted_der)
def get_as_rep_enc_data(self, key, rep):
    '''Decrypt and decode the encrypted part of an AS-REP.

    The cipher text in rep['enc-part'] is decrypted with key under the
    KU_AS_REP_ENC_PART key usage and the decoded structure is returned.
    '''
    plaintext = key.decrypt(KU_AS_REP_ENC_PART, rep['enc-part']['cipher'])

    # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
    # application tag 26, so fall back to the TGS form if the AS form
    # cannot be decoded.
    try:
        return self.der_decode(
            plaintext, asn1Spec=krb5_asn1.EncASRepPart())
    except Exception:
        return self.der_decode(
            plaintext, asn1Spec=krb5_asn1.EncTGSRepPart())
def check_pre_authentication(self, rep):
    """ Assert that the KDC answered with "pre-authentication required".

        The reply is validated by check_error_rep() against
        KDC_ERR_PREAUTH_REQUIRED.
    """
    expected_error = KDC_ERR_PREAUTH_REQUIRED
    self.check_error_rep(rep, expected_error)
def check_as_reply(self, rep):
    """ Assert that the KDC response is a valid AS-REP.

        All checks are delegated to check_reply() with KRB_AS_REP as the
        required message type.
    """
    self.check_reply(rep, KRB_AS_REP)
def check_tgs_reply(self, rep):
    """ Assert that the KDC response is a valid TGS-REP.

        All checks are delegated to check_reply() with KRB_TGS_REP as the
        required message type.
    """
    self.check_reply(rep, KRB_TGS_REP)
def check_reply(self, rep, msg_type):
    """ Assert that rep is a well-formed KDC reply of type msg_type.

        Checks, in order: a reply is present, its msg-type equals
        msg_type, pvno is 5, the ticket's tkt-vno is 5 and, when the
        enc-part carries a kvno, that none of its upper 16 bits are set.
    """
    # There must be a reply at all before anything else is inspected.
    self.assertIsNotNone(rep)

    # Every failure message carries the whole reply.
    context = "rep = {%s}" % rep

    # The reply must be of the requested message type.
    self.assertEqual(rep['msg-type'], msg_type, context)

    # Protocol version number should be 5
    self.assertEqual(5, int(rep['pvno']), context)

    # The ticket version number should be 5
    self.assertEqual(5, int(rep['ticket']['tkt-vno']), context)

    # Check that the kvno is not an RODC kvno
    # MIT kerberos does not provide the kvno, so we treat it as optional.
    # This is tested in compatability_test.py
    enc_part = rep['enc-part']
    if 'kvno' not in enc_part:
        return

    # If the high order bits are set this is an RODC kvno.
    high_bits = int(enc_part['kvno']) & 0xFFFF0000
    self.assertEqual(0, high_bits, context)
def check_error_rep(self, rep, expected):
    """ Assert that the reply is a KRB_ERROR carrying an expected code.

        expected is either a single error code, compared for equality, or
        a container of acceptable codes, checked for membership.
    """
    self.assertIsNotNone(rep)

    # Every failure message carries the whole reply.
    context = "rep = {%s}" % rep
    self.assertEqual(rep['msg-type'], KRB_ERROR, context)

    if not isinstance(expected, collections.abc.Container):
        self.assertEqual(rep['error-code'], expected, context)
        return

    self.assertIn(rep['error-code'], expected, context)
1377 def tgs_req(self, cname, sname, realm, ticket, key, etypes,
1378 expected_error_mode=0, padata=None, kdc_options=0,
1379 to_rodc=False, service_creds=None, expect_pac=True,
1380 expect_edata=None, expected_flags=None, unexpected_flags=None):
1381 '''Send a TGS-REQ, returns the response and the decrypted and
1385 subkey = self.RandomKey(key.etype)
1387 (ctime, cusec) = self.get_KerberosTimeWithUsec()
1389 tgt = KerberosTicketCreds(ticket,
1394 if service_creds is not None:
1395 decryption_key = self.TicketDecryptionKey_from_creds(
1397 expected_supported_etypes = service_creds.tgs_supported_enctypes
1399 decryption_key = None
1400 expected_supported_etypes = None
1402 if not expected_error_mode:
1403 check_error_fn = None
1404 check_rep_fn = self.generic_check_kdc_rep
1406 check_error_fn = self.generic_check_kdc_error
1409 def generate_padata(_kdc_exchange_dict,
1413 return padata, req_body
1415 kdc_exchange_dict = self.tgs_exchange_dict(
1416 expected_crealm=realm,
1417 expected_cname=cname,
1418 expected_srealm=realm,
1419 expected_sname=sname,
1420 expected_error_mode=expected_error_mode,
1421 expected_flags=expected_flags,
1422 unexpected_flags=unexpected_flags,
1423 expected_supported_etypes=expected_supported_etypes,
1424 check_error_fn=check_error_fn,
1425 check_rep_fn=check_rep_fn,
1426 check_kdc_private_fn=self.generic_check_kdc_private,
1427 ticket_decryption_key=decryption_key,
1428 generate_padata_fn=generate_padata if padata is not None else None,
1430 authenticator_subkey=subkey,
1431 kdc_options=str(kdc_options),
1432 expect_edata=expect_edata,
1433 expect_pac=expect_pac,
1436 rep = self._generic_kdc_exchange(kdc_exchange_dict,
1442 if expected_error_mode:
1445 ticket_creds = kdc_exchange_dict['rep_ticket_creds']
1446 enc_part = ticket_creds.encpart_private
1448 return rep, enc_part
1450 def get_service_ticket(self, tgt, target_creds, service='host',
1451 target_name=None, till=None, rc4_support=True,
1452 to_rodc=False, kdc_options=None,
1453 expected_flags=None, unexpected_flags=None,
1454 pac_request=True, expect_pac=True, fresh=False):
1455 user_name = tgt.cname['name-string'][0]
1456 ticket_sname = tgt.sname
1457 if target_name is None:
1458 target_name = target_creds.get_username()[:-1]
1459 cache_key = (user_name, target_name, service, to_rodc, kdc_options,
1460 pac_request, str(expected_flags), str(unexpected_flags),
1466 ticket = self.tkt_cache.get(cache_key)
1468 if ticket is not None:
1471 etype = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
1473 if kdc_options is None:
1475 kdc_options = str(krb5_asn1.KDCOptions(kdc_options))
1477 sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1478 names=[service, target_name])
1479 srealm = target_creds.get_realm()
1481 authenticator_subkey = self.RandomKey(kcrypto.Enctype.AES256)
1483 decryption_key = self.TicketDecryptionKey_from_creds(target_creds)
1485 kdc_exchange_dict = self.tgs_exchange_dict(
1486 expected_crealm=tgt.crealm,
1487 expected_cname=tgt.cname,
1488 expected_srealm=srealm,
1489 expected_sname=sname,
1490 expected_supported_etypes=target_creds.tgs_supported_enctypes,
1491 expected_flags=expected_flags,
1492 unexpected_flags=unexpected_flags,
1493 ticket_decryption_key=decryption_key,
1494 check_rep_fn=self.generic_check_kdc_rep,
1495 check_kdc_private_fn=self.generic_check_kdc_private,
1497 authenticator_subkey=authenticator_subkey,
1498 kdc_options=kdc_options,
1499 pac_request=pac_request,
1500 expect_pac=expect_pac,
1501 rc4_support=rc4_support,
1504 rep = self._generic_kdc_exchange(kdc_exchange_dict,
1510 self.check_tgs_reply(rep)
1512 service_ticket_creds = kdc_exchange_dict['rep_ticket_creds']
1515 krbtgt_creds = self.get_rodc_krbtgt_creds()
1517 krbtgt_creds = self.get_krbtgt_creds()
1518 krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
1519 self.verify_ticket(service_ticket_creds, krbtgt_key,
1520 service_ticket=True, expect_pac=expect_pac,
1521 expect_ticket_checksum=self.tkt_sig_support)
1523 self.tkt_cache[cache_key] = service_ticket_creds
1525 return service_ticket_creds
1527 def get_tgt(self, creds, to_rodc=False, kdc_options=None,
1528 client_account=None, client_name_type=NT_PRINCIPAL,
1529 expected_flags=None, unexpected_flags=None,
1530 expected_account_name=None, expected_upn_name=None,
1531 expected_cname=None,
1533 sname=None, realm=None,
1534 pac_request=True, expect_pac=True,
1535 expect_pac_attrs=None, expect_pac_attrs_pac_request=None,
1536 expect_requester_sid=None,
1539 if client_account is not None:
1540 user_name = client_account
1542 user_name = creds.get_username()
1544 cache_key = (user_name, to_rodc, kdc_options, pac_request,
1546 str(expected_flags), str(unexpected_flags),
1547 expected_account_name, expected_upn_name, expected_sid,
1548 str(sname), str(realm),
1549 str(expected_cname),
1551 expect_pac, expect_pac_attrs,
1552 expect_pac_attrs_pac_request, expect_requester_sid)
1555 tgt = self.tkt_cache.get(cache_key)
1561 realm = creds.get_realm()
1563 salt = creds.get_salt()
1565 etype = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
1566 cname = self.PrincipalName_create(name_type=client_name_type,
1567 names=user_name.split('/'))
1569 sname = self.PrincipalName_create(name_type=NT_SRV_INST,
1570 names=['krbtgt', realm])
1571 expected_sname = self.PrincipalName_create(
1572 name_type=NT_SRV_INST, names=['krbtgt', realm.upper()])
1574 expected_sname = sname
1576 if expected_cname is None:
1577 expected_cname = cname
1579 till = self.get_KerberosTime(offset=36000)
1582 krbtgt_creds = self.get_rodc_krbtgt_creds()
1584 krbtgt_creds = self.get_krbtgt_creds()
1585 ticket_decryption_key = (
1586 self.TicketDecryptionKey_from_creds(krbtgt_creds))
1588 expected_etypes = krbtgt_creds.tgs_supported_enctypes
1590 if kdc_options is None:
1591 kdc_options = ('forwardable,'
1595 kdc_options = krb5_asn1.KDCOptions(kdc_options)
1597 pac_options = '1' # supports claims
1599 rep, kdc_exchange_dict = self._test_as_exchange(
1604 client_as_etypes=etype,
1605 expected_error_mode=KDC_ERR_PREAUTH_REQUIRED,
1606 expected_crealm=realm,
1607 expected_cname=expected_cname,
1608 expected_srealm=realm,
1609 expected_sname=sname,
1610 expected_account_name=expected_account_name,
1611 expected_upn_name=expected_upn_name,
1612 expected_sid=expected_sid,
1614 expected_flags=expected_flags,
1615 unexpected_flags=unexpected_flags,
1616 expected_supported_etypes=expected_etypes,
1619 kdc_options=kdc_options,
1621 ticket_decryption_key=ticket_decryption_key,
1622 pac_request=pac_request,
1623 pac_options=pac_options,
1624 expect_pac=expect_pac,
1625 expect_pac_attrs=expect_pac_attrs,
1626 expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
1627 expect_requester_sid=expect_requester_sid,
1628 rc4_support=rc4_support,
1630 self.check_pre_authentication(rep)
1632 etype_info2 = kdc_exchange_dict['preauth_etype_info2']
1634 preauth_key = self.PasswordKey_from_etype_info2(creds,
1638 ts_enc_padata = self.get_enc_timestamp_pa_data_from_key(preauth_key)
1640 padata = [ts_enc_padata]
1642 expected_realm = realm.upper()
1644 rep, kdc_exchange_dict = self._test_as_exchange(
1649 client_as_etypes=etype,
1650 expected_error_mode=0,
1651 expected_crealm=expected_realm,
1652 expected_cname=expected_cname,
1653 expected_srealm=expected_realm,
1654 expected_sname=expected_sname,
1655 expected_account_name=expected_account_name,
1656 expected_upn_name=expected_upn_name,
1657 expected_sid=expected_sid,
1659 expected_flags=expected_flags,
1660 unexpected_flags=unexpected_flags,
1661 expected_supported_etypes=expected_etypes,
1664 kdc_options=kdc_options,
1665 preauth_key=preauth_key,
1666 ticket_decryption_key=ticket_decryption_key,
1667 pac_request=pac_request,
1668 pac_options=pac_options,
1669 expect_pac=expect_pac,
1670 expect_pac_attrs=expect_pac_attrs,
1671 expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
1672 expect_requester_sid=expect_requester_sid,
1673 rc4_support=rc4_support,
1675 self.check_as_reply(rep)
1677 ticket_creds = kdc_exchange_dict['rep_ticket_creds']
1679 self.tkt_cache[cache_key] = ticket_creds
1683 def _make_tgs_request(self, client_creds, service_creds, tgt,
1684 client_account=None,
1685 client_name_type=NT_PRINCIPAL,
1687 pac_request=None, expect_pac=True,
1689 expected_cname=None,
1690 expected_account_name=None,
1691 expected_upn_name=None,
1693 if client_account is None:
1694 client_account = client_creds.get_username()
1695 cname = self.PrincipalName_create(name_type=client_name_type,
1696 names=client_account.split('/'))
1698 service_account = service_creds.get_username()
1699 sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1700 names=[service_account])
1702 realm = service_creds.get_realm()
1704 expected_crealm = realm
1705 if expected_cname is None:
1706 expected_cname = cname
1707 expected_srealm = realm
1708 expected_sname = sname
1710 expected_supported_etypes = service_creds.tgs_supported_enctypes
1712 etypes = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
1714 if kdc_options is None:
1715 kdc_options = 'canonicalize'
1716 kdc_options = str(krb5_asn1.KDCOptions(kdc_options))
1718 target_decryption_key = self.TicketDecryptionKey_from_creds(
1721 authenticator_subkey = self.RandomKey(kcrypto.Enctype.AES256)
1724 expected_error_mode = KDC_ERR_TGT_REVOKED
1725 check_error_fn = self.generic_check_kdc_error
1728 expected_error_mode = 0
1729 check_error_fn = None
1730 check_rep_fn = self.generic_check_kdc_rep
1732 kdc_exchange_dict = self.tgs_exchange_dict(
1733 expected_crealm=expected_crealm,
1734 expected_cname=expected_cname,
1735 expected_srealm=expected_srealm,
1736 expected_sname=expected_sname,
1737 expected_account_name=expected_account_name,
1738 expected_upn_name=expected_upn_name,
1739 expected_sid=expected_sid,
1740 expected_supported_etypes=expected_supported_etypes,
1741 ticket_decryption_key=target_decryption_key,
1742 check_error_fn=check_error_fn,
1743 check_rep_fn=check_rep_fn,
1744 check_kdc_private_fn=self.generic_check_kdc_private,
1745 expected_error_mode=expected_error_mode,
1747 authenticator_subkey=authenticator_subkey,
1748 kdc_options=kdc_options,
1749 pac_request=pac_request,
1750 expect_pac=expect_pac,
1753 rep = self._generic_kdc_exchange(kdc_exchange_dict,
1759 self.check_error_rep(rep, expected_error_mode)
1763 self.check_reply(rep, KRB_TGS_REP)
1765 return kdc_exchange_dict['rep_ticket_creds']
# Record type holding the values of interest extracted from a decoded PAC.
PacData = namedtuple(
    "PacData",
    ["account_name", "account_sid", "logon_name", "upn", "domain_name"])
1772 def get_pac_data(self, authorization_data):
1773 '''Decode the PAC element contained in the authorization-data element
1781 # The PAC data will be wrapped in an AD_IF_RELEVANT element
1782 ad_if_relevant_elements = (
1783 x for x in authorization_data if x['ad-type'] == AD_IF_RELEVANT)
1784 for dt in ad_if_relevant_elements:
1785 buf = self.der_decode(
1786 dt['ad-data'], asn1Spec=krb5_asn1.AD_IF_RELEVANT())
1787 # The PAC data is further wrapped in a AD_WIN2K_PAC element
1788 for ad in (x for x in buf if x['ad-type'] == AD_WIN2K_PAC):
1789 pb = ndr_unpack(krb5pac.PAC_DATA, ad['ad-data'])
1790 for pac in pb.buffers:
1791 if pac.type == krb5pac.PAC_TYPE_LOGON_INFO:
1793 pac.info.info.info3.base.account_name)
1795 str(pac.info.info.info3.base.domain_sid)
1796 + "-" + str(pac.info.info.info3.base.rid))
1797 elif pac.type == krb5pac.PAC_TYPE_LOGON_NAME:
1798 logon_name = pac.info.account_name
1799 elif pac.type == krb5pac.PAC_TYPE_UPN_DNS_INFO:
1800 upn = pac.info.upn_name
1801 domain_name = pac.info.dns_domain_name
1803 return self.PacData(
1810 def decode_service_ticket(self, creds, ticket):
1811 '''Decrypt and decode a service ticket
1814 name = creds.get_username()
1815 if name.endswith('$'):
1817 realm = creds.get_realm()
1818 salt = "%s.%s@%s" % (name, realm.lower(), realm.upper())
1820 key = self.PasswordKey_create(
1821 ticket['enc-part']['etype'],
1822 creds.get_password(),
1824 ticket['enc-part']['kvno'])
1826 enc_part = key.decrypt(KU_TICKET, ticket['enc-part']['cipher'])
1827 enc_ticket_part = self.der_decode(
1828 enc_part, asn1Spec=krb5_asn1.EncTicketPart())
1829 return enc_ticket_part
def modify_ticket_flag(self, enc_part, flag, value):
    """Set or clear one ticket flag in enc_part['flags'].

    flag is passed to krb5_asn1.TicketFlags; value must be a bool.
    enc_part is updated in place and also returned.
    """
    self.assertIsInstance(value, bool)

    # Presumably the index of the named flag's bit within the flag
    # string - TODO confirm against krb5_asn1.TicketFlags.
    bit_index = len(tuple(krb5_asn1.TicketFlags(flag))) - 1

    current = enc_part['flags']
    self.assertLessEqual(bit_index, len(current))

    # Splice the new '0'/'1' character in at the flag's position.
    bit = str(int(value))
    enc_part['flags'] = current[:bit_index] + bit + current[bit_index + 1:]

    return enc_part
def get_objectSid(self, samdb, dn):
    ''' Return the objectSID of dn as a string.

        Note: performs an Ldb query (a base-scope search on dn) and
        asserts that exactly one entry is returned.
    '''
    matches = samdb.search(dn, scope=SCOPE_BASE, attrs=["objectSID"])
    self.assertTrue(len(matches) == 1, "did not get objectSid for %s" % dn)

    raw_sid = matches[0]["objectSID"][0]
    formatted = samdb.schema_format_value("objectSID", raw_sid)
    return formatted.decode('utf8')
1854 def add_attribute(self, samdb, dn_str, name, value):
1855 if isinstance(value, list):
1859 flag = ldb.FLAG_MOD_ADD
1861 dn = ldb.Dn(samdb, dn_str)
1862 msg = ldb.Message(dn)
1863 msg[name] = ldb.MessageElement(values, flag, name)
1866 def modify_attribute(self, samdb, dn_str, name, value):
1867 if isinstance(value, list):
1871 flag = ldb.FLAG_MOD_REPLACE
1873 dn = ldb.Dn(samdb, dn_str)
1874 msg = ldb.Message(dn)
1875 msg[name] = ldb.MessageElement(values, flag, name)
1878 def create_ccache(self, cname, ticket, enc_part):
1879 """ Lay out a version 4 on-disk credentials cache, to be read using the
1883 field = krb5ccache.DELTATIME_TAG()
1884 field.kdc_sec_offset = 0
1885 field.kdc_usec_offset = 0
1887 v4tag = krb5ccache.V4TAG()
1891 v4tags = krb5ccache.V4TAGS()
1893 v4tags.further_tags = b''
1895 optional_header = krb5ccache.V4HEADER()
1896 optional_header.v4tags = v4tags
1898 cname_string = cname['name-string']
1900 cprincipal = krb5ccache.PRINCIPAL()
1901 cprincipal.name_type = cname['name-type']
1902 cprincipal.component_count = len(cname_string)
1903 cprincipal.realm = ticket['realm']
1904 cprincipal.components = cname_string
1906 sname = ticket['sname']
1907 sname_string = sname['name-string']
1909 sprincipal = krb5ccache.PRINCIPAL()
1910 sprincipal.name_type = sname['name-type']
1911 sprincipal.component_count = len(sname_string)
1912 sprincipal.realm = ticket['realm']
1913 sprincipal.components = sname_string
1915 key = self.EncryptionKey_import(enc_part['key'])
1917 key_data = key.export_obj()
1918 keyblock = krb5ccache.KEYBLOCK()
1919 keyblock.enctype = key_data['keytype']
1920 keyblock.data = key_data['keyvalue']
1922 addresses = krb5ccache.ADDRESSES()
1926 authdata = krb5ccache.AUTHDATA()
1930 # Re-encode the ticket, since it was decoded by another layer.
1931 ticket_data = self.der_encode(ticket, asn1Spec=krb5_asn1.Ticket())
1933 authtime = enc_part['authtime']
1934 starttime = enc_part.get('starttime', authtime)
1935 endtime = enc_part['endtime']
1937 cred = krb5ccache.CREDENTIAL()
1938 cred.client = cprincipal
1939 cred.server = sprincipal
1940 cred.keyblock = keyblock
1941 cred.authtime = self.get_EpochFromKerberosTime(authtime)
1942 cred.starttime = self.get_EpochFromKerberosTime(starttime)
1943 cred.endtime = self.get_EpochFromKerberosTime(endtime)
1945 # Account for clock skew of up to five minutes.
1946 self.assertLess(cred.authtime - 5 * 60,
1947 datetime.now(timezone.utc).timestamp(),
1948 "Ticket not yet valid - clocks may be out of sync.")
1949 self.assertLess(cred.starttime - 5 * 60,
1950 datetime.now(timezone.utc).timestamp(),
1951 "Ticket not yet valid - clocks may be out of sync.")
1952 self.assertGreater(cred.endtime - 60 * 60,
1953 datetime.now(timezone.utc).timestamp(),
1954 "Ticket already expired/about to expire - "
1955 "clocks may be out of sync.")
1957 cred.renew_till = cred.endtime
1959 cred.ticket_flags = int(enc_part['flags'], 2)
1960 cred.addresses = addresses
1961 cred.authdata = authdata
1962 cred.ticket = ticket_data
1963 cred.second_ticket = b''
1965 ccache = krb5ccache.CCACHE()
1968 ccache.optional_header = optional_header
1969 ccache.principal = cprincipal
1972 # Serialise the credentials cache structure.
1973 result = ndr_pack(ccache)
1975 # Create a temporary file and write the credentials.
1976 cachefile = tempfile.NamedTemporaryFile(dir=self.tempdir, delete=False)
1977 cachefile.write(result)
1982 def create_ccache_with_user(self, user_credentials, mach_credentials,
1983 service="host", target_name=None, pac=True):
1984 # Obtain a service ticket authorising the user and place it into a
1985 # newly created credentials cache file.
1987 user_name = user_credentials.get_username()
1988 realm = user_credentials.get_realm()
1990 cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1993 tgt = self.get_tgt(user_credentials)
1995 # Request a ticket to the host service on the machine account
1996 ticket = self.get_service_ticket(tgt, mach_credentials,
1998 target_name=target_name)
2001 ticket = self.modified_ticket(ticket, exclude_pac=True)
2003 # Write the ticket into a credentials cache file that can be ingested
2004 # by the main credentials code.
2005 cachefile = self.create_ccache(cname, ticket.ticket,
2006 ticket.encpart_private)
2008 # Create a credentials object to reference the credentials cache.
2009 creds = Credentials()
2010 creds.set_kerberos_state(MUST_USE_KERBEROS)
2011 creds.set_username(user_name, SPECIFIED)
2012 creds.set_realm(realm)
2013 creds.set_named_ccache(cachefile.name, SPECIFIED, self.get_lp())
2015 # Return the credentials along with the cache file.
2016 return (creds, cachefile)