1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Isaac Boukris 2020
3 # Copyright (C) Stefan Metzmacher 2020
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
29 from pyasn1.codec.der.decoder import decode as pyasn1_der_decode
30 from pyasn1.codec.der.encoder import encode as pyasn1_der_encode
31 from pyasn1.codec.native.decoder import decode as pyasn1_native_decode
32 from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
34 from pyasn1.codec.ber.encoder import BitStringEncoder
36 from samba.credentials import Credentials
37 from samba.dcerpc import security
38 from samba.gensec import FEATURE_SEAL
41 from samba.tests import TestCaseInTempDir
43 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
44 from samba.tests.krb5.rfc4120_constants import (
47 FX_FAST_ARMOR_AP_REQUEST,
49 KDC_ERR_PREAUTH_FAILED,
50 KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS,
64 KU_NON_KERB_CKSUM_SALT,
65 KU_TGS_REP_ENC_PART_SESSION,
66 KU_TGS_REP_ENC_PART_SUB_KEY,
68 KU_TGS_REQ_AUTH_CKSUM,
69 KU_TGS_REQ_AUTH_DAT_SESSION,
70 KU_TGS_REQ_AUTH_DAT_SUBKEY,
74 PADATA_ENCRYPTED_CHALLENGE,
88 PADATA_SUPPORTED_ETYPES
90 import samba.tests.krb5.kcrypto as kcrypto
93 def BitStringEncoder_encodeValue32(
94 self, value, asn1Spec, encodeFun, **options):
96 # BitStrings like KDCOptions or TicketFlags should at least
97 # be 32-Bit on the wire
99 if asn1Spec is not None:
100 # TODO: try to avoid ASN.1 schema instantiation
101 value = asn1Spec.clone(value)
103 valueLength = len(value)
105 alignedValue = value << (8 - valueLength % 8)
109 substrate = alignedValue.asOctets()
110 length = len(substrate)
111 # We need at least 32-Bit / 4-Bytes
116 ret = b'\x00' + substrate + (b'\x00' * padding)
117 return ret, False, True
120 BitStringEncoder.encodeValue = BitStringEncoder_encodeValue32
123 def BitString_NamedValues_prettyPrint(self, scope=0):
124 ret = "%s" % self.asBinary()
127 for byte in self.asNumbers():
128 for bit in [7, 6, 5, 4, 3, 2, 1, 0]:
135 if len(bits) < highest_bit:
136 for bitPosition in range(len(bits), highest_bit):
139 delim = ": (\n%s " % indent
140 for bitPosition in range(highest_bit):
141 if bitPosition in self.prettyPrintNamedValues:
142 name = self.prettyPrintNamedValues[bitPosition]
143 elif bits[bitPosition] != 0:
144 name = "unknown-bit-%u" % bitPosition
147 ret += "%s%s:%u" % (delim, name, bits[bitPosition])
148 delim = ",\n%s " % indent
149 ret += "\n%s)" % indent
153 krb5_asn1.TicketFlags.prettyPrintNamedValues =\
154 krb5_asn1.TicketFlagsValues.namedValues
155 krb5_asn1.TicketFlags.namedValues =\
156 krb5_asn1.TicketFlagsValues.namedValues
157 krb5_asn1.TicketFlags.prettyPrint =\
158 BitString_NamedValues_prettyPrint
159 krb5_asn1.KDCOptions.prettyPrintNamedValues =\
160 krb5_asn1.KDCOptionsValues.namedValues
161 krb5_asn1.KDCOptions.namedValues =\
162 krb5_asn1.KDCOptionsValues.namedValues
163 krb5_asn1.KDCOptions.prettyPrint =\
164 BitString_NamedValues_prettyPrint
165 krb5_asn1.APOptions.prettyPrintNamedValues =\
166 krb5_asn1.APOptionsValues.namedValues
167 krb5_asn1.APOptions.namedValues =\
168 krb5_asn1.APOptionsValues.namedValues
169 krb5_asn1.APOptions.prettyPrint =\
170 BitString_NamedValues_prettyPrint
171 krb5_asn1.PACOptionFlags.prettyPrintNamedValues =\
172 krb5_asn1.PACOptionFlagsValues.namedValues
173 krb5_asn1.PACOptionFlags.namedValues =\
174 krb5_asn1.PACOptionFlagsValues.namedValues
175 krb5_asn1.PACOptionFlags.prettyPrint =\
176 BitString_NamedValues_prettyPrint
179 def Integer_NamedValues_prettyPrint(self, scope=0):
181 if intval in self.prettyPrintNamedValues:
182 name = self.prettyPrintNamedValues[intval]
184 name = "<__unknown__>"
185 ret = "%d (0x%x) %s" % (intval, intval, name)
189 krb5_asn1.NameType.prettyPrintNamedValues =\
190 krb5_asn1.NameTypeValues.namedValues
191 krb5_asn1.NameType.prettyPrint =\
192 Integer_NamedValues_prettyPrint
193 krb5_asn1.AuthDataType.prettyPrintNamedValues =\
194 krb5_asn1.AuthDataTypeValues.namedValues
195 krb5_asn1.AuthDataType.prettyPrint =\
196 Integer_NamedValues_prettyPrint
197 krb5_asn1.PADataType.prettyPrintNamedValues =\
198 krb5_asn1.PADataTypeValues.namedValues
199 krb5_asn1.PADataType.prettyPrint =\
200 Integer_NamedValues_prettyPrint
201 krb5_asn1.EncryptionType.prettyPrintNamedValues =\
202 krb5_asn1.EncryptionTypeValues.namedValues
203 krb5_asn1.EncryptionType.prettyPrint =\
204 Integer_NamedValues_prettyPrint
205 krb5_asn1.ChecksumType.prettyPrintNamedValues =\
206 krb5_asn1.ChecksumTypeValues.namedValues
207 krb5_asn1.ChecksumType.prettyPrint =\
208 Integer_NamedValues_prettyPrint
209 krb5_asn1.KerbErrorDataType.prettyPrintNamedValues =\
210 krb5_asn1.KerbErrorDataTypeValues.namedValues
211 krb5_asn1.KerbErrorDataType.prettyPrint =\
212 Integer_NamedValues_prettyPrint
215 class Krb5EncryptionKey:
216 def __init__(self, key, kvno):
218 kcrypto.Enctype.AES256: kcrypto.Cksumtype.SHA1_AES256,
219 kcrypto.Enctype.AES128: kcrypto.Cksumtype.SHA1_AES128,
220 kcrypto.Enctype.RC4: kcrypto.Cksumtype.HMAC_MD5,
223 self.etype = key.enctype
224 self.ctype = EncTypeChecksum[self.etype]
227 def encrypt(self, usage, plaintext):
228 ciphertext = kcrypto.encrypt(self.key, usage, plaintext)
231 def decrypt(self, usage, ciphertext):
232 plaintext = kcrypto.decrypt(self.key, usage, ciphertext)
235 def make_zeroed_checksum(self, ctype=None):
239 checksum_len = kcrypto.checksum_len(ctype)
240 return bytes(checksum_len)
242 def make_checksum(self, usage, plaintext, ctype=None):
245 cksum = kcrypto.make_checksum(ctype, self.key, usage, plaintext)
248 def verify_checksum(self, usage, plaintext, ctype, cksum):
249 if self.ctype != ctype:
250 raise AssertionError(f'{self.ctype} != {ctype}')
252 kcrypto.verify_checksum(ctype,
258 def export_obj(self):
259 EncryptionKey_obj = {
260 'keytype': self.etype,
261 'keyvalue': self.key.contents,
263 return EncryptionKey_obj
266 class KerberosCredentials(Credentials):
268 super(KerberosCredentials, self).__init__()
270 all_enc_types |= security.KERB_ENCTYPE_RC4_HMAC_MD5
271 all_enc_types |= security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
272 all_enc_types |= security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
274 self.as_supported_enctypes = all_enc_types
275 self.tgs_supported_enctypes = all_enc_types
276 self.ap_supported_enctypes = all_enc_types
279 self.forced_keys = {}
281 self.forced_salt = None
285 def set_as_supported_enctypes(self, value):
286 self.as_supported_enctypes = int(value)
288 def set_tgs_supported_enctypes(self, value):
289 self.tgs_supported_enctypes = int(value)
291 def set_ap_supported_enctypes(self, value):
292 self.ap_supported_enctypes = int(value)
294 def _get_krb5_etypes(self, supported_enctypes):
297 if supported_enctypes & security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96:
298 etypes += (kcrypto.Enctype.AES256,)
299 if supported_enctypes & security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96:
300 etypes += (kcrypto.Enctype.AES128,)
301 if supported_enctypes & security.KERB_ENCTYPE_RC4_HMAC_MD5:
302 etypes += (kcrypto.Enctype.RC4,)
306 def get_as_krb5_etypes(self):
307 return self._get_krb5_etypes(self.as_supported_enctypes)
309 def get_tgs_krb5_etypes(self):
310 return self._get_krb5_etypes(self.tgs_supported_enctypes)
312 def get_ap_krb5_etypes(self):
313 return self._get_krb5_etypes(self.ap_supported_enctypes)
315 def set_kvno(self, kvno):
316 # Sign-extend from 32 bits.
324 def set_forced_key(self, etype, hexkey):
326 contents = binascii.a2b_hex(hexkey)
327 key = kcrypto.Key(etype, contents)
328 self.forced_keys[etype] = Krb5EncryptionKey(key, self.kvno)
330 def get_forced_key(self, etype):
332 return self.forced_keys.get(etype)
334 def set_forced_salt(self, salt):
335 self.forced_salt = bytes(salt)
337 def get_forced_salt(self):
338 return self.forced_salt
341 if self.forced_salt is not None:
342 return self.forced_salt
344 if self.get_workstation():
345 salt_string = '%shost%s.%s' % (
346 self.get_realm().upper(),
347 self.get_username().lower().rsplit('$', 1)[0],
348 self.get_realm().lower())
350 salt_string = self.get_realm().upper() + self.get_username()
352 return salt_string.encode('utf-8')
354 def set_dn(self, dn):
361 class KerberosTicketCreds:
362 def __init__(self, ticket, session_key,
363 crealm=None, cname=None,
364 srealm=None, sname=None,
367 encpart_private=None):
369 self.session_key = session_key
374 self.decryption_key = decryption_key
375 self.ticket_private = ticket_private
376 self.encpart_private = encpart_private
379 class RawKerberosTest(TestCaseInTempDir):
380 """A raw Kerberos Test case."""
383 {"value": -1111, "name": "dummy", },
384 {"value": kcrypto.Enctype.AES256, "name": "aes128", },
385 {"value": kcrypto.Enctype.AES128, "name": "aes256", },
386 {"value": kcrypto.Enctype.RC4, "name": "rc4", },
389 setup_etype_test_permutations_done = False
392 def setup_etype_test_permutations(cls):
393 if cls.setup_etype_test_permutations_done:
398 num_idxs = len(cls.etypes_to_test)
400 for num in range(1, num_idxs + 1):
401 chunk = list(itertools.permutations(range(num_idxs), num))
404 permutations.append(el)
406 for p in permutations:
410 n = cls.etypes_to_test[idx]["name"]
415 etypes += (cls.etypes_to_test[idx]["value"],)
417 r = {"name": name, "etypes": etypes, }
420 cls.etype_test_permutations = res
421 cls.setup_etype_test_permutations_done = True
424 def etype_test_permutation_name_idx(cls):
425 cls.setup_etype_test_permutations()
428 for e in cls.etype_test_permutations:
434 def etype_test_permutation_by_idx(self, idx):
435 e = self.etype_test_permutations[idx]
436 return (e['name'], e['etypes'])
442 cls.host = samba.tests.env_get_var_value('SERVER')
443 cls.dc_host = samba.tests.env_get_var_value('DC_SERVER')
445 # A dictionary containing credentials that have already been
449 cls.kdc_fast_support = False
453 self.do_asn1_print = False
454 self.do_hexdump = False
456 strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING',
458 if strict_checking is None:
459 strict_checking = '1'
460 self.strict_checking = bool(int(strict_checking))
464 self.unspecified_kvno = object()
467 self._disconnect("tearDown")
470 def _disconnect(self, reason):
476 sys.stderr.write("disconnect[%s]\n" % reason)
478 def _connect_tcp(self, host):
481 self.a = socket.getaddrinfo(host, tcp_port, socket.AF_UNSPEC,
482 socket.SOCK_STREAM, socket.SOL_TCP,
484 self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
485 self.s.settimeout(10)
486 self.s.connect(self.a[0][4])
494 def connect(self, host):
495 self.assertNotConnected()
496 self._connect_tcp(host)
498 sys.stderr.write("connected[%s]\n" % host)
500 def env_get_var(self, varname, prefix,
501 fallback_default=True,
502 allow_missing=False):
504 if prefix is not None:
505 allow_missing_prefix = allow_missing or fallback_default
506 val = samba.tests.env_get_var_value(
507 '%s_%s' % (prefix, varname),
508 allow_missing=allow_missing_prefix)
510 fallback_default = True
511 if val is None and fallback_default:
512 val = samba.tests.env_get_var_value(varname,
513 allow_missing=allow_missing)
516 def _get_krb5_creds_from_env(self, prefix,
517 default_username=None,
518 allow_missing_password=False,
519 allow_missing_keys=True,
520 require_strongest_key=False):
521 c = KerberosCredentials()
524 domain = self.env_get_var('DOMAIN', prefix)
525 realm = self.env_get_var('REALM', prefix)
526 allow_missing_username = default_username is not None
527 username = self.env_get_var('USERNAME', prefix,
528 fallback_default=False,
529 allow_missing=allow_missing_username)
531 username = default_username
532 password = self.env_get_var('PASSWORD', prefix,
533 fallback_default=False,
534 allow_missing=allow_missing_password)
537 c.set_username(username)
538 if password is not None:
539 c.set_password(password)
540 as_supported_enctypes = self.env_get_var('AS_SUPPORTED_ENCTYPES',
541 prefix, allow_missing=True)
542 if as_supported_enctypes is not None:
543 c.set_as_supported_enctypes(as_supported_enctypes)
544 tgs_supported_enctypes = self.env_get_var('TGS_SUPPORTED_ENCTYPES',
545 prefix, allow_missing=True)
546 if tgs_supported_enctypes is not None:
547 c.set_tgs_supported_enctypes(tgs_supported_enctypes)
548 ap_supported_enctypes = self.env_get_var('AP_SUPPORTED_ENCTYPES',
549 prefix, allow_missing=True)
550 if ap_supported_enctypes is not None:
551 c.set_ap_supported_enctypes(ap_supported_enctypes)
553 if require_strongest_key:
554 kvno_allow_missing = False
556 aes256_allow_missing = False
558 aes256_allow_missing = True
560 kvno_allow_missing = allow_missing_keys
561 aes256_allow_missing = allow_missing_keys
562 kvno = self.env_get_var('KVNO', prefix,
563 fallback_default=False,
564 allow_missing=kvno_allow_missing)
567 aes256_key = self.env_get_var('AES256_KEY_HEX', prefix,
568 fallback_default=False,
569 allow_missing=aes256_allow_missing)
570 if aes256_key is not None:
571 c.set_forced_key(kcrypto.Enctype.AES256, aes256_key)
572 aes128_key = self.env_get_var('AES128_KEY_HEX', prefix,
573 fallback_default=False,
575 if aes128_key is not None:
576 c.set_forced_key(kcrypto.Enctype.AES128, aes128_key)
577 rc4_key = self.env_get_var('RC4_KEY_HEX', prefix,
578 fallback_default=False, allow_missing=True)
579 if rc4_key is not None:
580 c.set_forced_key(kcrypto.Enctype.RC4, rc4_key)
582 if not allow_missing_keys:
583 self.assertTrue(c.forced_keys,
584 'Please supply %s encryption keys '
585 'in environment' % prefix)
589 def _get_krb5_creds(self,
591 default_username=None,
592 allow_missing_password=False,
593 allow_missing_keys=True,
594 require_strongest_key=False,
595 fallback_creds_fn=None):
596 if prefix in self.creds_dict:
597 return self.creds_dict[prefix]
599 # We don't have the credentials already
603 # Try to obtain them from the environment
604 creds = self._get_krb5_creds_from_env(
606 default_username=default_username,
607 allow_missing_password=allow_missing_password,
608 allow_missing_keys=allow_missing_keys,
609 require_strongest_key=require_strongest_key)
610 except Exception as err:
611 # An error occurred, so save it for later
614 self.assertIsNotNone(creds)
615 # Save the obtained credentials
616 self.creds_dict[prefix] = creds
619 if fallback_creds_fn is not None:
621 # Try to use the fallback method
622 creds = fallback_creds_fn()
623 except Exception as err:
624 print("ERROR FROM ENV: %r" % (env_err))
625 print("FALLBACK-FN: %s" % (fallback_creds_fn))
626 print("FALLBACK-ERROR: %r" % (err))
628 self.assertIsNotNone(creds)
629 # Save the obtained credentials
630 self.creds_dict[prefix] = creds
633 # Both methods failed, so raise the exception from the
637 def get_user_creds(self,
638 allow_missing_password=False,
639 allow_missing_keys=True):
640 c = self._get_krb5_creds(prefix=None,
641 allow_missing_password=allow_missing_password,
642 allow_missing_keys=allow_missing_keys)
645 def get_service_creds(self,
646 allow_missing_password=False,
647 allow_missing_keys=True):
648 c = self._get_krb5_creds(prefix='SERVICE',
649 allow_missing_password=allow_missing_password,
650 allow_missing_keys=allow_missing_keys)
653 def get_client_creds(self,
654 allow_missing_password=False,
655 allow_missing_keys=True):
656 c = self._get_krb5_creds(prefix='CLIENT',
657 allow_missing_password=allow_missing_password,
658 allow_missing_keys=allow_missing_keys)
661 def get_server_creds(self,
662 allow_missing_password=False,
663 allow_missing_keys=True):
664 c = self._get_krb5_creds(prefix='SERVER',
665 allow_missing_password=allow_missing_password,
666 allow_missing_keys=allow_missing_keys)
669 def get_admin_creds(self,
670 allow_missing_password=False,
671 allow_missing_keys=True):
672 c = self._get_krb5_creds(prefix='ADMIN',
673 allow_missing_password=allow_missing_password,
674 allow_missing_keys=allow_missing_keys)
675 c.set_gensec_features(c.get_gensec_features() | FEATURE_SEAL)
678 def get_krbtgt_creds(self,
680 require_strongest_key=False):
681 if require_strongest_key:
682 self.assertTrue(require_keys)
683 c = self._get_krb5_creds(prefix='KRBTGT',
684 default_username='krbtgt',
685 allow_missing_password=True,
686 allow_missing_keys=not require_keys,
687 require_strongest_key=require_strongest_key)
690 def get_anon_creds(self):
695 def asn1_dump(self, name, obj, asn1_print=None):
696 if asn1_print is None:
697 asn1_print = self.do_asn1_print
700 sys.stderr.write("%s:\n%s" % (name, obj))
702 sys.stderr.write("%s" % (obj))
704 def hex_dump(self, name, blob, hexdump=None):
706 hexdump = self.do_hexdump
709 "%s: %d\n%s" % (name, len(blob), self.hexdump(blob)))
718 if asn1Spec is not None:
719 class_name = type(asn1Spec).__name__.split(':')[0]
721 class_name = "<None-asn1Spec>"
722 self.hex_dump(class_name, blob, hexdump=hexdump)
723 obj, _ = pyasn1_der_decode(blob, asn1Spec=asn1Spec)
724 self.asn1_dump(None, obj, asn1_print=asn1_print)
726 obj = pyasn1_native_encode(obj)
737 obj = pyasn1_native_decode(obj, asn1Spec=asn1Spec)
738 class_name = type(obj).__name__.split(':')[0]
739 if class_name is not None:
740 self.asn1_dump(None, obj, asn1_print=asn1_print)
741 blob = pyasn1_der_encode(obj)
742 if class_name is not None:
743 self.hex_dump(class_name, blob, hexdump=hexdump)
746 def send_pdu(self, req, asn1_print=None, hexdump=None):
748 k5_pdu = self.der_encode(
749 req, native_decode=False, asn1_print=asn1_print, hexdump=False)
750 header = struct.pack('>I', len(k5_pdu))
753 self.hex_dump("send_pdu", header, hexdump=hexdump)
754 self.hex_dump("send_pdu", k5_pdu, hexdump=hexdump)
756 sent = self.s.send(req_pdu, 0)
757 if sent == len(req_pdu):
759 req_pdu = req_pdu[sent:]
760 except socket.error as e:
761 self._disconnect("send_pdu: %s" % e)
764 self._disconnect("send_pdu: %s" % e)
767 def recv_raw(self, num_recv=0xffff, hexdump=None, timeout=None):
770 if timeout is not None:
771 self.s.settimeout(timeout)
772 rep_pdu = self.s.recv(num_recv, 0)
773 self.s.settimeout(10)
774 if len(rep_pdu) == 0:
775 self._disconnect("recv_raw: EOF")
777 self.hex_dump("recv_raw", rep_pdu, hexdump=hexdump)
778 except socket.timeout:
779 self.s.settimeout(10)
780 sys.stderr.write("recv_raw: TIMEOUT\n")
781 except socket.error as e:
782 self._disconnect("recv_raw: %s" % e)
785 self._disconnect("recv_raw: %s" % e)
789 def recv_pdu_raw(self, asn1_print=None, hexdump=None, timeout=None):
792 raw_pdu = self.recv_raw(
793 num_recv=4, hexdump=hexdump, timeout=timeout)
796 header = struct.unpack(">I", raw_pdu[0:4])
803 raw_pdu = self.recv_raw(
804 num_recv=missing, hexdump=hexdump, timeout=timeout)
805 self.assertGreaterEqual(len(raw_pdu), 1)
807 missing = k5_len - len(rep_pdu)
808 k5_raw = self.der_decode(
814 pvno = k5_raw['field-0']
815 self.assertEqual(pvno, 5)
816 msg_type = k5_raw['field-1']
817 self.assertIn(msg_type, [KRB_AS_REP, KRB_TGS_REP, KRB_ERROR])
818 if msg_type == KRB_AS_REP:
819 asn1Spec = krb5_asn1.AS_REP()
820 elif msg_type == KRB_TGS_REP:
821 asn1Spec = krb5_asn1.TGS_REP()
822 elif msg_type == KRB_ERROR:
823 asn1Spec = krb5_asn1.KRB_ERROR()
824 rep = self.der_decode(rep_pdu, asn1Spec=asn1Spec,
825 asn1_print=asn1_print, hexdump=False)
826 return (rep, rep_pdu)
828 def recv_pdu(self, asn1_print=None, hexdump=None, timeout=None):
829 (rep, rep_pdu) = self.recv_pdu_raw(asn1_print=asn1_print,
834 def assertIsConnected(self):
835 self.assertIsNotNone(self.s, msg="Not connected")
837 def assertNotConnected(self):
838 self.assertIsNone(self.s, msg="Is connected")
840 def send_recv_transaction(
847 host = self.host if to_rodc else self.dc_host
850 self.send_pdu(req, asn1_print=asn1_print, hexdump=hexdump)
852 asn1_print=asn1_print, hexdump=hexdump, timeout=timeout)
854 self._disconnect("transaction failed")
856 self._disconnect("transaction done")
859 def assertNoValue(self, value):
860 self.assertTrue(value.isNoValue)
862 def assertHasValue(self, value):
863 self.assertIsNotNone(value)
865 def getElementValue(self, obj, elem):
868 def assertElementMissing(self, obj, elem):
869 v = self.getElementValue(obj, elem)
872 def assertElementPresent(self, obj, elem):
873 v = self.getElementValue(obj, elem)
874 self.assertIsNotNone(v)
875 if self.strict_checking:
876 if isinstance(v, collections.abc.Container):
877 self.assertNotEqual(0, len(v))
879 def assertElementEqual(self, obj, elem, value):
880 v = self.getElementValue(obj, elem)
881 self.assertIsNotNone(v)
882 self.assertEqual(v, value)
884 def assertElementEqualUTF8(self, obj, elem, value):
885 v = self.getElementValue(obj, elem)
886 self.assertIsNotNone(v)
887 self.assertEqual(v, bytes(value, 'utf8'))
889 def assertPrincipalEqual(self, princ1, princ2):
890 self.assertEqual(princ1['name-type'], princ2['name-type'])
892 len(princ1['name-string']),
893 len(princ2['name-string']),
894 msg="princ1=%s != princ2=%s" % (princ1, princ2))
895 for idx in range(len(princ1['name-string'])):
897 princ1['name-string'][idx],
898 princ2['name-string'][idx],
899 msg="princ1=%s != princ2=%s" % (princ1, princ2))
901 def assertElementEqualPrincipal(self, obj, elem, value):
902 v = self.getElementValue(obj, elem)
903 self.assertIsNotNone(v)
904 v = pyasn1_native_decode(v, asn1Spec=krb5_asn1.PrincipalName())
905 self.assertPrincipalEqual(v, value)
907 def assertElementKVNO(self, obj, elem, value):
908 v = self.getElementValue(obj, elem)
909 if value == "autodetect":
911 if value is not None:
912 self.assertIsNotNone(v)
913 # The value on the wire should never be 0
914 self.assertNotEqual(v, 0)
915 # unspecified_kvno means we don't know the kvno,
916 # but want to enforce its presence
917 if value is not self.unspecified_kvno:
919 self.assertNotEqual(value, 0)
920 self.assertEqual(v, value)
924 def assertElementFlags(self, obj, elem, expected, unexpected):
925 v = self.getElementValue(obj, elem)
926 self.assertIsNotNone(v)
927 if expected is not None:
928 self.assertIsInstance(expected, krb5_asn1.KDCOptions)
929 for i, flag in enumerate(expected):
931 self.assertEqual('1', v[i],
932 f"'{expected.namedValues[i]}' "
934 if unexpected is not None:
935 self.assertIsInstance(unexpected, krb5_asn1.KDCOptions)
936 for i, flag in enumerate(unexpected):
938 self.assertEqual('0', v[i],
939 f"'{unexpected.namedValues[i]}' "
940 f"unexpected in {v}")
942 def get_KerberosTimeWithUsec(self, epoch=None, offset=None):
945 if offset is not None:
946 epoch = epoch + int(offset)
947 dt = datetime.datetime.fromtimestamp(epoch, tz=datetime.timezone.utc)
948 return (dt.strftime("%Y%m%d%H%M%SZ"), dt.microsecond)
950 def get_KerberosTime(self, epoch=None, offset=None):
951 (s, _) = self.get_KerberosTimeWithUsec(epoch=epoch, offset=offset)
954 def get_EpochFromKerberosTime(self, kerberos_time):
955 if isinstance(kerberos_time, bytes):
956 kerberos_time = kerberos_time.decode()
958 epoch = datetime.datetime.strptime(kerberos_time,
960 epoch = epoch.replace(tzinfo=datetime.timezone.utc)
961 epoch = int(epoch.timestamp())
966 nonce_min = 0x7f000000
967 nonce_max = 0x7fffffff
968 v = random.randint(nonce_min, nonce_max)
971 def get_pa_dict(self, pa_data):
974 if pa_data is not None:
976 pa_type = pa['padata-type']
977 if pa_type in pa_dict:
978 raise RuntimeError(f'Duplicate type {pa_type}')
979 pa_dict[pa_type] = pa['padata-value']
983 def SessionKey_create(self, etype, contents, kvno=None):
984 key = kcrypto.Key(etype, contents)
985 return Krb5EncryptionKey(key, kvno)
987 def PasswordKey_create(self, etype=None, pwd=None, salt=None, kvno=None):
988 self.assertIsNotNone(pwd)
989 self.assertIsNotNone(salt)
990 key = kcrypto.string_to_key(etype, pwd, salt)
991 return Krb5EncryptionKey(key, kvno)
993 def PasswordKey_from_etype_info2(self, creds, etype_info2, kvno=None):
994 e = etype_info2['etype']
996 salt = etype_info2.get('salt')
998 if e == kcrypto.Enctype.RC4:
999 nthash = creds.get_nt_hash()
1000 return self.SessionKey_create(etype=e, contents=nthash, kvno=kvno)
1002 password = creds.get_password()
1003 return self.PasswordKey_create(
1004 etype=e, pwd=password, salt=salt, kvno=kvno)
1006 def TicketDecryptionKey_from_creds(self, creds, etype=None):
1009 etypes = creds.get_tgs_krb5_etypes()
1012 forced_key = creds.get_forced_key(etype)
1013 if forced_key is not None:
1016 kvno = creds.get_kvno()
1018 fail_msg = ("%s has no fixed key for etype[%s] kvno[%s] "
1019 "nor a password specified, " % (
1020 creds.get_username(), etype, kvno))
1022 if etype == kcrypto.Enctype.RC4:
1023 nthash = creds.get_nt_hash()
1024 self.assertIsNotNone(nthash, msg=fail_msg)
1025 return self.SessionKey_create(etype=etype,
1029 password = creds.get_password()
1030 self.assertIsNotNone(password, msg=fail_msg)
1031 salt = creds.get_salt()
1032 return self.PasswordKey_create(etype=etype,
1037 def RandomKey(self, etype):
1038 e = kcrypto._get_enctype_profile(etype)
1039 contents = samba.generate_random_bytes(e.keysize)
1040 return self.SessionKey_create(etype=etype, contents=contents)
1042 def EncryptionKey_import(self, EncryptionKey_obj):
1043 return self.SessionKey_create(EncryptionKey_obj['keytype'],
1044 EncryptionKey_obj['keyvalue'])
1046 def EncryptedData_create(self, key, usage, plaintext):
1047 # EncryptedData ::= SEQUENCE {
1048 # etype [0] Int32 -- EncryptionType --,
1049 # kvno [1] Int32 OPTIONAL,
1050 # cipher [2] OCTET STRING -- ciphertext
1052 ciphertext = key.encrypt(usage, plaintext)
1053 EncryptedData_obj = {
1055 'cipher': ciphertext
1057 if key.kvno is not None:
1058 EncryptedData_obj['kvno'] = key.kvno
1059 return EncryptedData_obj
1061 def Checksum_create(self, key, usage, plaintext, ctype=None):
1062 # Checksum ::= SEQUENCE {
1063 # cksumtype [0] Int32,
1064 # checksum [1] OCTET STRING
1068 checksum = key.make_checksum(usage, plaintext, ctype=ctype)
1071 'checksum': checksum,
1076 def PrincipalName_create(cls, name_type, names):
1077 # PrincipalName ::= SEQUENCE {
1078 # name-type [0] Int32,
1079 # name-string [1] SEQUENCE OF KerberosString
1081 PrincipalName_obj = {
1082 'name-type': name_type,
1083 'name-string': names,
1085 return PrincipalName_obj
1087 def AuthorizationData_create(self, ad_type, ad_data):
1088 # AuthorizationData ::= SEQUENCE {
1089 # ad-type [0] Int32,
1090 # ad-data [1] OCTET STRING
1096 return AUTH_DATA_obj
1098 def PA_DATA_create(self, padata_type, padata_value):
1099 # PA-DATA ::= SEQUENCE {
1100 # -- NOTE: first tag is [1], not [0]
1101 # padata-type [1] Int32,
1102 # padata-value [2] OCTET STRING -- might be encoded AP-REQ
1105 'padata-type': padata_type,
1106 'padata-value': padata_value,
1110 def PA_ENC_TS_ENC_create(self, ts, usec):
1111 # PA-ENC-TS-ENC ::= SEQUENCE {
1112 # patimestamp[0] KerberosTime, -- client's time
1113 # pausec[1] krb5int32 OPTIONAL
1115 PA_ENC_TS_ENC_obj = {
1119 return PA_ENC_TS_ENC_obj
1121 def PA_PAC_OPTIONS_create(self, options):
1122 # PA-PAC-OPTIONS ::= SEQUENCE {
1123 # options [0] PACOptionFlags
1125 PA_PAC_OPTIONS_obj = {
1128 return PA_PAC_OPTIONS_obj
1130 def KRB_FAST_ARMOR_create(self, armor_type, armor_value):
1131 # KrbFastArmor ::= SEQUENCE {
1132 # armor-type [0] Int32,
1133 # armor-value [1] OCTET STRING,
1136 KRB_FAST_ARMOR_obj = {
1137 'armor-type': armor_type,
1138 'armor-value': armor_value
1140 return KRB_FAST_ARMOR_obj
1142 def KRB_FAST_REQ_create(self, fast_options, padata, req_body):
1143 # KrbFastReq ::= SEQUENCE {
1144 # fast-options [0] FastOptions,
1145 # padata [1] SEQUENCE OF PA-DATA,
1146 # req-body [2] KDC-REQ-BODY,
1149 KRB_FAST_REQ_obj = {
1150 'fast-options': fast_options,
1152 'req-body': req_body
1154 return KRB_FAST_REQ_obj
1156 def KRB_FAST_ARMORED_REQ_create(self, armor, req_checksum, enc_fast_req):
1157 # KrbFastArmoredReq ::= SEQUENCE {
1158 # armor [0] KrbFastArmor OPTIONAL,
1159 # req-checksum [1] Checksum,
1160 # enc-fast-req [2] EncryptedData -- KrbFastReq --
1162 KRB_FAST_ARMORED_REQ_obj = {
1163 'req-checksum': req_checksum,
1164 'enc-fast-req': enc_fast_req
1166 if armor is not None:
1167 KRB_FAST_ARMORED_REQ_obj['armor'] = armor
1168 return KRB_FAST_ARMORED_REQ_obj
1170 def PA_FX_FAST_REQUEST_create(self, armored_data):
1171 # PA-FX-FAST-REQUEST ::= CHOICE {
1172 # armored-data [0] KrbFastArmoredReq,
1175 PA_FX_FAST_REQUEST_obj = {
1176 'armored-data': armored_data
1178 return PA_FX_FAST_REQUEST_obj
1180 def KERB_PA_PAC_REQUEST_create(self, include_pac, pa_data_create=True):
1181 # KERB-PA-PAC-REQUEST ::= SEQUENCE {
1182 # include-pac[0] BOOLEAN --If TRUE, and no pac present,
1184 # --If FALSE, and PAC present,
1187 KERB_PA_PAC_REQUEST_obj = {
1188 'include-pac': include_pac,
1190 if not pa_data_create:
1191 return KERB_PA_PAC_REQUEST_obj
1192 pa_pac = self.der_encode(KERB_PA_PAC_REQUEST_obj,
1193 asn1Spec=krb5_asn1.KERB_PA_PAC_REQUEST())
1194 pa_data = self.PA_DATA_create(PADATA_PAC_REQUEST, pa_pac)
1197 def get_pa_pac_options(self, options):
1198 pac_options = self.PA_PAC_OPTIONS_create(options)
1199 pac_options = self.der_encode(pac_options,
1200 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
1201 pac_options = self.PA_DATA_create(PADATA_PAC_OPTIONS, pac_options)
1205 def KDC_REQ_BODY_create(self,
1217 EncAuthorizationData,
1218 EncAuthorizationData_key,
1219 EncAuthorizationData_usage,
1222 # KDC-REQ-BODY ::= SEQUENCE {
1223 # kdc-options [0] KDCOptions,
1224 # cname [1] PrincipalName OPTIONAL
1225 # -- Used only in AS-REQ --,
1228 # -- Also client's in AS-REQ --,
1229 # sname [3] PrincipalName OPTIONAL,
1230 # from [4] KerberosTime OPTIONAL,
1231 # till [5] KerberosTime,
1232 # rtime [6] KerberosTime OPTIONAL,
1234 # etype [8] SEQUENCE OF Int32
1236 # -- in preference order --,
1237 # addresses [9] HostAddresses OPTIONAL,
1238 # enc-authorization-data [10] EncryptedData OPTIONAL
1239 # -- AuthorizationData --,
1240 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1241 # -- NOTE: not empty
1243 if EncAuthorizationData is not None:
1244 enc_ad_plain = self.der_encode(
1245 EncAuthorizationData,
1246 asn1Spec=krb5_asn1.AuthorizationData(),
1247 asn1_print=asn1_print,
1249 enc_ad = self.EncryptedData_create(EncAuthorizationData_key,
1250 EncAuthorizationData_usage,
1254 KDC_REQ_BODY_obj = {
1255 'kdc-options': kdc_options,
1261 if cname is not None:
1262 KDC_REQ_BODY_obj['cname'] = cname
1263 if sname is not None:
1264 KDC_REQ_BODY_obj['sname'] = sname
1265 if from_time is not None:
1266 KDC_REQ_BODY_obj['from'] = from_time
1267 if renew_time is not None:
1268 KDC_REQ_BODY_obj['rtime'] = renew_time
1269 if addresses is not None:
1270 KDC_REQ_BODY_obj['addresses'] = addresses
1271 if enc_ad is not None:
1272 KDC_REQ_BODY_obj['enc-authorization-data'] = enc_ad
1273 if additional_tickets is not None:
1274 KDC_REQ_BODY_obj['additional-tickets'] = additional_tickets
1275 return KDC_REQ_BODY_obj
1277 def KDC_REQ_create(self,
1284 # KDC-REQ ::= SEQUENCE {
1285 # -- NOTE: first tag is [1], not [0]
1286 # pvno [1] INTEGER (5) ,
1287 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1288 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1289 # -- NOTE: not empty --,
1290 # req-body [4] KDC-REQ-BODY
1295 'msg-type': msg_type,
1296 'req-body': req_body,
1298 if padata is not None:
1299 KDC_REQ_obj['padata'] = padata
1300 if asn1Spec is not None:
1301 KDC_REQ_decoded = pyasn1_native_decode(
1302 KDC_REQ_obj, asn1Spec=asn1Spec)
1304 KDC_REQ_decoded = None
1305 return KDC_REQ_obj, KDC_REQ_decoded
1307 def AS_REQ_create(self,
1309 kdc_options, # required
1313 from_time, # optional
1314 till_time, # required
1315 renew_time, # optional
1318 addresses, # optional
1320 native_decoded_only=True,
1323 # KDC-REQ ::= SEQUENCE {
1324 # -- NOTE: first tag is [1], not [0]
1325 # pvno [1] INTEGER (5) ,
1326 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1327 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1328 # -- NOTE: not empty --,
1329 # req-body [4] KDC-REQ-BODY
1332 # KDC-REQ-BODY ::= SEQUENCE {
1333 # kdc-options [0] KDCOptions,
1334 # cname [1] PrincipalName OPTIONAL
1335 # -- Used only in AS-REQ --,
1338 # -- Also client's in AS-REQ --,
1339 # sname [3] PrincipalName OPTIONAL,
1340 # from [4] KerberosTime OPTIONAL,
1341 # till [5] KerberosTime,
1342 # rtime [6] KerberosTime OPTIONAL,
1344 # etype [8] SEQUENCE OF Int32
1346 # -- in preference order --,
1347 # addresses [9] HostAddresses OPTIONAL,
1348 # enc-authorization-data [10] EncryptedData OPTIONAL
1349 # -- AuthorizationData --,
1350 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1351 # -- NOTE: not empty
1353 KDC_REQ_BODY_obj = self.KDC_REQ_BODY_create(
1365 EncAuthorizationData=None,
1366 EncAuthorizationData_key=None,
1367 EncAuthorizationData_usage=None,
1368 asn1_print=asn1_print,
1370 obj, decoded = self.KDC_REQ_create(
1371 msg_type=KRB_AS_REQ,
1373 req_body=KDC_REQ_BODY_obj,
1374 asn1Spec=krb5_asn1.AS_REQ(),
1375 asn1_print=asn1_print,
1377 if native_decoded_only:
1381 def AP_REQ_create(self, ap_options, ticket, authenticator):
1382 # AP-REQ ::= [APPLICATION 14] SEQUENCE {
1383 # pvno [0] INTEGER (5),
1384 # msg-type [1] INTEGER (14),
1385 # ap-options [2] APOptions,
1386 # ticket [3] Ticket,
1387 # authenticator [4] EncryptedData -- Authenticator
1391 'msg-type': KRB_AP_REQ,
1392 'ap-options': ap_options,
1394 'authenticator': authenticator,
1398 def Authenticator_create(
1399 self, crealm, cname, cksum, cusec, ctime, subkey, seq_number,
1400 authorization_data):
1401 # -- Unencrypted authenticator
1402 # Authenticator ::= [APPLICATION 2] SEQUENCE {
1403 # authenticator-vno [0] INTEGER (5),
1405 # cname [2] PrincipalName,
1406 # cksum [3] Checksum OPTIONAL,
1407 # cusec [4] Microseconds,
1408 # ctime [5] KerberosTime,
1409 # subkey [6] EncryptionKey OPTIONAL,
1410 # seq-number [7] UInt32 OPTIONAL,
1411 # authorization-data [8] AuthorizationData OPTIONAL
1413 Authenticator_obj = {
1414 'authenticator-vno': 5,
1420 if cksum is not None:
1421 Authenticator_obj['cksum'] = cksum
1422 if subkey is not None:
1423 Authenticator_obj['subkey'] = subkey
1424 if seq_number is not None:
1425 Authenticator_obj['seq-number'] = seq_number
1426 if authorization_data is not None:
1427 Authenticator_obj['authorization-data'] = authorization_data
1428 return Authenticator_obj
1430 def TGS_REQ_create(self,
1435 kdc_options, # required
1439 from_time, # optional
1440 till_time, # required
1441 renew_time, # optional
1444 addresses, # optional
1445 EncAuthorizationData,
1446 EncAuthorizationData_key,
1449 authenticator_subkey=None,
1450 body_checksum_type=None,
1451 native_decoded_only=True,
1454 # KDC-REQ ::= SEQUENCE {
1455 # -- NOTE: first tag is [1], not [0]
1456 # pvno [1] INTEGER (5) ,
1457 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1458 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1459 # -- NOTE: not empty --,
1460 # req-body [4] KDC-REQ-BODY
1463 # KDC-REQ-BODY ::= SEQUENCE {
1464 # kdc-options [0] KDCOptions,
1465 # cname [1] PrincipalName OPTIONAL
1466 # -- Used only in AS-REQ --,
1469 # -- Also client's in AS-REQ --,
1470 # sname [3] PrincipalName OPTIONAL,
1471 # from [4] KerberosTime OPTIONAL,
1472 # till [5] KerberosTime,
1473 # rtime [6] KerberosTime OPTIONAL,
1475 # etype [8] SEQUENCE OF Int32
1477 # -- in preference order --,
1478 # addresses [9] HostAddresses OPTIONAL,
1479 # enc-authorization-data [10] EncryptedData OPTIONAL
1480 # -- AuthorizationData --,
1481 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1482 # -- NOTE: not empty
1485 if authenticator_subkey is not None:
1486 EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SUBKEY
1488 EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SESSION
1490 req_body = self.KDC_REQ_BODY_create(
1491 kdc_options=kdc_options,
1495 from_time=from_time,
1496 till_time=till_time,
1497 renew_time=renew_time,
1500 addresses=addresses,
1501 additional_tickets=additional_tickets,
1502 EncAuthorizationData=EncAuthorizationData,
1503 EncAuthorizationData_key=EncAuthorizationData_key,
1504 EncAuthorizationData_usage=EncAuthorizationData_usage)
1505 req_body_blob = self.der_encode(req_body,
1506 asn1Spec=krb5_asn1.KDC_REQ_BODY(),
1507 asn1_print=asn1_print, hexdump=hexdump)
1509 req_body_checksum = self.Checksum_create(ticket_session_key,
1510 KU_TGS_REQ_AUTH_CKSUM,
1512 ctype=body_checksum_type)
1515 if authenticator_subkey is not None:
1516 subkey_obj = authenticator_subkey.export_obj()
1517 seq_number = random.randint(0, 0xfffffffe)
1518 authenticator = self.Authenticator_create(
1521 cksum=req_body_checksum,
1525 seq_number=seq_number,
1526 authorization_data=None)
1527 authenticator = self.der_encode(
1529 asn1Spec=krb5_asn1.Authenticator(),
1530 asn1_print=asn1_print,
1533 authenticator = self.EncryptedData_create(
1534 ticket_session_key, KU_TGS_REQ_AUTH, authenticator)
1536 ap_options = krb5_asn1.APOptions('0')
1537 ap_req = self.AP_REQ_create(ap_options=str(ap_options),
1539 authenticator=authenticator)
1540 ap_req = self.der_encode(ap_req, asn1Spec=krb5_asn1.AP_REQ(),
1541 asn1_print=asn1_print, hexdump=hexdump)
1542 pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
1543 if padata is not None:
1544 padata.append(pa_tgs_req)
1546 padata = [pa_tgs_req]
1548 obj, decoded = self.KDC_REQ_create(
1549 msg_type=KRB_TGS_REQ,
1552 asn1Spec=krb5_asn1.TGS_REQ(),
1553 asn1_print=asn1_print,
1555 if native_decoded_only:
1559 def PA_S4U2Self_create(self, name, realm, tgt_session_key, ctype=None):
1560 # PA-S4U2Self ::= SEQUENCE {
1561 # name [0] PrincipalName,
1563 # cksum [2] Checksum,
1564 # auth [3] GeneralString
1566 cksum_data = name['name-type'].to_bytes(4, byteorder='little')
1567 for n in name['name-string']:
1568 cksum_data += n.encode()
1569 cksum_data += realm.encode()
1570 cksum_data += "Kerberos".encode()
1571 cksum = self.Checksum_create(tgt_session_key,
1572 KU_NON_KERB_CKSUM_SALT,
1582 pa_s4u2self = self.der_encode(
1583 PA_S4U2Self_obj, asn1Spec=krb5_asn1.PA_S4U2Self())
1584 return self.PA_DATA_create(PADATA_FOR_USER, pa_s4u2self)
1586 def _generic_kdc_exchange(self,
1587 kdc_exchange_dict, # required
1588 cname=None, # optional
1589 realm=None, # required
1590 sname=None, # optional
1591 from_time=None, # optional
1592 till_time=None, # required
1593 renew_time=None, # optional
1594 etypes=None, # required
1595 addresses=None, # optional
1596 additional_tickets=None, # optional
1597 EncAuthorizationData=None, # optional
1598 EncAuthorizationData_key=None, # optional
1599 EncAuthorizationData_usage=None): # optional
1601 check_error_fn = kdc_exchange_dict['check_error_fn']
1602 check_rep_fn = kdc_exchange_dict['check_rep_fn']
1603 generate_fast_fn = kdc_exchange_dict['generate_fast_fn']
1604 generate_fast_armor_fn = kdc_exchange_dict['generate_fast_armor_fn']
1605 generate_fast_padata_fn = kdc_exchange_dict['generate_fast_padata_fn']
1606 generate_padata_fn = kdc_exchange_dict['generate_padata_fn']
1607 callback_dict = kdc_exchange_dict['callback_dict']
1608 req_msg_type = kdc_exchange_dict['req_msg_type']
1609 req_asn1Spec = kdc_exchange_dict['req_asn1Spec']
1610 rep_msg_type = kdc_exchange_dict['rep_msg_type']
1612 expected_error_mode = kdc_exchange_dict['expected_error_mode']
1613 kdc_options = kdc_exchange_dict['kdc_options']
1615 pac_request = kdc_exchange_dict['pac_request']
1616 pac_options = kdc_exchange_dict['pac_options']
1618 # Parameters specific to the inner request body
1619 inner_req = kdc_exchange_dict['inner_req']
1621 # Parameters specific to the outer request body
1622 outer_req = kdc_exchange_dict['outer_req']
1624 if till_time is None:
1625 till_time = self.get_KerberosTime(offset=36000)
1627 if 'nonce' in kdc_exchange_dict:
1628 nonce = kdc_exchange_dict['nonce']
1630 nonce = self.get_Nonce()
1631 kdc_exchange_dict['nonce'] = nonce
1633 req_body = self.KDC_REQ_BODY_create(
1634 kdc_options=kdc_options,
1638 from_time=from_time,
1639 till_time=till_time,
1640 renew_time=renew_time,
1643 addresses=addresses,
1644 additional_tickets=additional_tickets,
1645 EncAuthorizationData=EncAuthorizationData,
1646 EncAuthorizationData_key=EncAuthorizationData_key,
1647 EncAuthorizationData_usage=EncAuthorizationData_usage)
1649 inner_req_body = dict(req_body)
1650 if inner_req is not None:
1651 for key, value in inner_req.items():
1652 if value is not None:
1653 inner_req_body[key] = value
1655 del inner_req_body[key]
1656 if outer_req is not None:
1657 for key, value in outer_req.items():
1658 if value is not None:
1659 req_body[key] = value
1663 additional_padata = []
1664 if pac_request is not None:
1665 pa_pac_request = self.KERB_PA_PAC_REQUEST_create(pac_request)
1666 additional_padata.append(pa_pac_request)
1667 if pac_options is not None:
1668 pa_pac_options = self.get_pa_pac_options(pac_options)
1669 additional_padata.append(pa_pac_options)
1671 if req_msg_type == KRB_AS_REQ:
1673 tgs_req_padata = None
1675 self.assertEqual(KRB_TGS_REQ, req_msg_type)
1677 tgs_req = self.generate_ap_req(kdc_exchange_dict,
1681 tgs_req_padata = self.PA_DATA_create(PADATA_KDC_REQ, tgs_req)
1683 if generate_fast_padata_fn is not None:
1684 self.assertIsNotNone(generate_fast_fn)
1685 # This can alter req_body...
1686 fast_padata, req_body = generate_fast_padata_fn(kdc_exchange_dict,
1690 fast_padata += additional_padata
1694 if generate_fast_armor_fn is not None:
1695 self.assertIsNotNone(generate_fast_fn)
1696 fast_ap_req = generate_fast_armor_fn(kdc_exchange_dict,
1701 fast_armor_type = kdc_exchange_dict['fast_armor_type']
1702 fast_armor = self.KRB_FAST_ARMOR_create(fast_armor_type,
1707 if generate_padata_fn is not None:
1708 # This can alter req_body...
1709 outer_padata, req_body = generate_padata_fn(kdc_exchange_dict,
1712 self.assertIsNotNone(outer_padata)
1713 self.assertNotIn(PADATA_KDC_REQ,
1714 [pa['padata-type'] for pa in outer_padata],
1715 'Don\'t create TGS-REQ manually')
1719 if generate_fast_fn is not None:
1720 armor_key = kdc_exchange_dict['armor_key']
1721 self.assertIsNotNone(armor_key)
1723 if req_msg_type == KRB_AS_REQ:
1724 checksum_blob = self.der_encode(
1726 asn1Spec=krb5_asn1.KDC_REQ_BODY())
1728 self.assertEqual(KRB_TGS_REQ, req_msg_type)
1729 checksum_blob = tgs_req
1731 checksum = self.Checksum_create(armor_key,
1735 fast = generate_fast_fn(kdc_exchange_dict,
1746 if tgs_req_padata is not None:
1747 padata.append(tgs_req_padata)
1749 if fast is not None:
1752 if outer_padata is not None:
1753 padata += outer_padata
1756 padata += additional_padata
1761 kdc_exchange_dict['req_padata'] = padata
1762 kdc_exchange_dict['fast_padata'] = fast_padata
1763 kdc_exchange_dict['req_body'] = inner_req_body
1765 req_obj, req_decoded = self.KDC_REQ_create(msg_type=req_msg_type,
1768 asn1Spec=req_asn1Spec())
1770 to_rodc = kdc_exchange_dict['to_rodc']
1772 rep = self.send_recv_transaction(req_decoded, to_rodc=to_rodc)
1773 self.assertIsNotNone(rep)
1775 msg_type = self.getElementValue(rep, 'msg-type')
1776 self.assertIsNotNone(msg_type)
1778 expected_msg_type = None
1779 if check_error_fn is not None:
1780 expected_msg_type = KRB_ERROR
1781 self.assertIsNone(check_rep_fn)
1782 self.assertNotEqual(0, len(expected_error_mode))
1783 self.assertNotIn(0, expected_error_mode)
1784 if check_rep_fn is not None:
1785 expected_msg_type = rep_msg_type
1786 self.assertIsNone(check_error_fn)
1787 self.assertEqual(0, len(expected_error_mode))
1788 self.assertIsNotNone(expected_msg_type)
1789 self.assertEqual(msg_type, expected_msg_type)
1791 if msg_type == KRB_ERROR:
1792 return check_error_fn(kdc_exchange_dict,
1796 return check_rep_fn(kdc_exchange_dict, callback_dict, rep)
1798 def as_exchange_dict(self,
1799 expected_crealm=None,
1800 expected_cname=None,
1801 expected_anon=False,
1802 expected_srealm=None,
1803 expected_sname=None,
1804 expected_flags=None,
1805 unexpected_flags=None,
1806 ticket_decryption_key=None,
1807 generate_fast_fn=None,
1808 generate_fast_armor_fn=None,
1809 generate_fast_padata_fn=None,
1810 fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
1811 generate_padata_fn=None,
1812 check_error_fn=None,
1814 check_kdc_private_fn=None,
1816 expected_error_mode=0,
1817 expected_status=None,
1818 client_as_etypes=None,
1820 authenticator_subkey=None,
1832 if expected_error_mode == 0:
1833 expected_error_mode = ()
1834 elif not isinstance(expected_error_mode, collections.abc.Container):
1835 expected_error_mode = (expected_error_mode,)
1837 kdc_exchange_dict = {
1838 'req_msg_type': KRB_AS_REQ,
1839 'req_asn1Spec': krb5_asn1.AS_REQ,
1840 'rep_msg_type': KRB_AS_REP,
1841 'rep_asn1Spec': krb5_asn1.AS_REP,
1842 'rep_encpart_asn1Spec': krb5_asn1.EncASRepPart,
1843 'expected_crealm': expected_crealm,
1844 'expected_cname': expected_cname,
1845 'expected_anon': expected_anon,
1846 'expected_srealm': expected_srealm,
1847 'expected_sname': expected_sname,
1848 'expected_flags': expected_flags,
1849 'unexpected_flags': unexpected_flags,
1850 'ticket_decryption_key': ticket_decryption_key,
1851 'generate_fast_fn': generate_fast_fn,
1852 'generate_fast_armor_fn': generate_fast_armor_fn,
1853 'generate_fast_padata_fn': generate_fast_padata_fn,
1854 'fast_armor_type': fast_armor_type,
1855 'generate_padata_fn': generate_padata_fn,
1856 'check_error_fn': check_error_fn,
1857 'check_rep_fn': check_rep_fn,
1858 'check_kdc_private_fn': check_kdc_private_fn,
1859 'callback_dict': callback_dict,
1860 'expected_error_mode': expected_error_mode,
1861 'expected_status': expected_status,
1862 'client_as_etypes': client_as_etypes,
1863 'expected_salt': expected_salt,
1864 'authenticator_subkey': authenticator_subkey,
1865 'preauth_key': preauth_key,
1866 'armor_key': armor_key,
1867 'armor_tgt': armor_tgt,
1868 'armor_subkey': armor_subkey,
1869 'auth_data': auth_data,
1870 'kdc_options': kdc_options,
1871 'inner_req': inner_req,
1872 'outer_req': outer_req,
1873 'pac_request': pac_request,
1874 'pac_options': pac_options,
1877 if callback_dict is None:
1880 return kdc_exchange_dict
1882 def tgs_exchange_dict(self,
1883 expected_crealm=None,
1884 expected_cname=None,
1885 expected_anon=False,
1886 expected_srealm=None,
1887 expected_sname=None,
1888 expected_flags=None,
1889 unexpected_flags=None,
1890 ticket_decryption_key=None,
1891 generate_fast_fn=None,
1892 generate_fast_armor_fn=None,
1893 generate_fast_padata_fn=None,
1894 fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
1895 generate_padata_fn=None,
1896 check_error_fn=None,
1898 check_kdc_private_fn=None,
1899 expected_error_mode=0,
1900 expected_status=None,
1906 authenticator_subkey=None,
1908 body_checksum_type=None,
1915 if expected_error_mode == 0:
1916 expected_error_mode = ()
1917 elif not isinstance(expected_error_mode, collections.abc.Container):
1918 expected_error_mode = (expected_error_mode,)
1920 kdc_exchange_dict = {
1921 'req_msg_type': KRB_TGS_REQ,
1922 'req_asn1Spec': krb5_asn1.TGS_REQ,
1923 'rep_msg_type': KRB_TGS_REP,
1924 'rep_asn1Spec': krb5_asn1.TGS_REP,
1925 'rep_encpart_asn1Spec': krb5_asn1.EncTGSRepPart,
1926 'expected_crealm': expected_crealm,
1927 'expected_cname': expected_cname,
1928 'expected_anon': expected_anon,
1929 'expected_srealm': expected_srealm,
1930 'expected_sname': expected_sname,
1931 'expected_flags': expected_flags,
1932 'unexpected_flags': unexpected_flags,
1933 'ticket_decryption_key': ticket_decryption_key,
1934 'generate_fast_fn': generate_fast_fn,
1935 'generate_fast_armor_fn': generate_fast_armor_fn,
1936 'generate_fast_padata_fn': generate_fast_padata_fn,
1937 'fast_armor_type': fast_armor_type,
1938 'generate_padata_fn': generate_padata_fn,
1939 'check_error_fn': check_error_fn,
1940 'check_rep_fn': check_rep_fn,
1941 'check_kdc_private_fn': check_kdc_private_fn,
1942 'callback_dict': callback_dict,
1943 'expected_error_mode': expected_error_mode,
1944 'expected_status': expected_status,
1946 'body_checksum_type': body_checksum_type,
1947 'armor_key': armor_key,
1948 'armor_tgt': armor_tgt,
1949 'armor_subkey': armor_subkey,
1950 'auth_data': auth_data,
1951 'authenticator_subkey': authenticator_subkey,
1952 'kdc_options': kdc_options,
1953 'inner_req': inner_req,
1954 'outer_req': outer_req,
1955 'pac_request': pac_request,
1956 'pac_options': pac_options,
1959 if callback_dict is None:
1962 return kdc_exchange_dict
1964 def generic_check_kdc_rep(self,
1969 expected_crealm = kdc_exchange_dict['expected_crealm']
1970 expected_anon = kdc_exchange_dict['expected_anon']
1971 expected_srealm = kdc_exchange_dict['expected_srealm']
1972 expected_sname = kdc_exchange_dict['expected_sname']
1973 ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
1974 check_kdc_private_fn = kdc_exchange_dict['check_kdc_private_fn']
1975 rep_encpart_asn1Spec = kdc_exchange_dict['rep_encpart_asn1Spec']
1976 msg_type = kdc_exchange_dict['rep_msg_type']
1977 armor_key = kdc_exchange_dict['armor_key']
1979 self.assertElementEqual(rep, 'msg-type', msg_type) # AS-REP | TGS-REP
1980 padata = self.getElementValue(rep, 'padata')
1981 if self.strict_checking:
1982 self.assertElementEqualUTF8(rep, 'crealm', expected_crealm)
1984 expected_cname = self.PrincipalName_create(
1985 name_type=NT_WELLKNOWN,
1986 names=['WELLKNOWN', 'ANONYMOUS'])
1988 expected_cname = kdc_exchange_dict['expected_cname']
1989 self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
1990 self.assertElementPresent(rep, 'ticket')
1991 ticket = self.getElementValue(rep, 'ticket')
1992 ticket_encpart = None
1993 ticket_cipher = None
1994 self.assertIsNotNone(ticket)
1995 if ticket is not None: # Never None, but gives indentation
1996 self.assertElementEqual(ticket, 'tkt-vno', 5)
1997 self.assertElementEqualUTF8(ticket, 'realm', expected_srealm)
1998 self.assertElementEqualPrincipal(ticket, 'sname', expected_sname)
1999 self.assertElementPresent(ticket, 'enc-part')
2000 ticket_encpart = self.getElementValue(ticket, 'enc-part')
2001 self.assertIsNotNone(ticket_encpart)
2002 if ticket_encpart is not None: # Never None, but gives indentation
2003 self.assertElementPresent(ticket_encpart, 'etype')
2004 # 'unspecified' means present, with any value != 0
2005 self.assertElementKVNO(ticket_encpart, 'kvno',
2006 self.unspecified_kvno)
2007 self.assertElementPresent(ticket_encpart, 'cipher')
2008 ticket_cipher = self.getElementValue(ticket_encpart, 'cipher')
2009 self.assertElementPresent(rep, 'enc-part')
2010 encpart = self.getElementValue(rep, 'enc-part')
2011 encpart_cipher = None
2012 self.assertIsNotNone(encpart)
2013 if encpart is not None: # Never None, but gives indentation
2014 self.assertElementPresent(encpart, 'etype')
2015 self.assertElementKVNO(ticket_encpart, 'kvno', 'autodetect')
2016 self.assertElementPresent(encpart, 'cipher')
2017 encpart_cipher = self.getElementValue(encpart, 'cipher')
2019 ticket_checksum = None
2021 # Get the decryption key for the encrypted part
2022 encpart_decryption_key, encpart_decryption_usage = (
2023 self.get_preauth_key(kdc_exchange_dict))
2025 if armor_key is not None:
2026 pa_dict = self.get_pa_dict(padata)
2028 if PADATA_FX_FAST in pa_dict:
2029 fx_fast_data = pa_dict[PADATA_FX_FAST]
2030 fast_response = self.check_fx_fast_data(kdc_exchange_dict,
2035 if 'strengthen-key' in fast_response:
2036 strengthen_key = self.EncryptionKey_import(
2037 fast_response['strengthen-key'])
2038 encpart_decryption_key = (
2039 self.generate_strengthen_reply_key(
2041 encpart_decryption_key))
2043 fast_finished = fast_response.get('finished')
2044 if fast_finished is not None:
2045 ticket_checksum = fast_finished['ticket-checksum']
2047 self.check_rep_padata(kdc_exchange_dict,
2050 fast_response['padata'],
2053 ticket_private = None
2054 if ticket_decryption_key is not None:
2055 self.assertElementEqual(ticket_encpart, 'etype',
2056 ticket_decryption_key.etype)
2057 self.assertElementKVNO(ticket_encpart, 'kvno',
2058 ticket_decryption_key.kvno)
2059 ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET,
2061 ticket_private = self.der_decode(
2063 asn1Spec=krb5_asn1.EncTicketPart())
2065 encpart_private = None
2066 self.assertIsNotNone(encpart_decryption_key)
2067 if encpart_decryption_key is not None:
2068 self.assertElementEqual(encpart, 'etype',
2069 encpart_decryption_key.etype)
2070 if self.strict_checking:
2071 self.assertElementKVNO(encpart, 'kvno',
2072 encpart_decryption_key.kvno)
2073 rep_decpart = encpart_decryption_key.decrypt(
2074 encpart_decryption_usage,
2076 # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
2077 # application tag 26
2079 encpart_private = self.der_decode(
2081 asn1Spec=rep_encpart_asn1Spec())
2083 encpart_private = self.der_decode(
2085 asn1Spec=krb5_asn1.EncTGSRepPart())
2087 self.assertIsNotNone(check_kdc_private_fn)
2088 if check_kdc_private_fn is not None:
2089 check_kdc_private_fn(kdc_exchange_dict, callback_dict,
2090 rep, ticket_private, encpart_private,
2095 def check_fx_fast_data(self,
2100 expect_strengthen_key=True):
2101 fx_fast_data = self.der_decode(fx_fast_data,
2102 asn1Spec=krb5_asn1.PA_FX_FAST_REPLY())
2104 enc_fast_rep = fx_fast_data['armored-data']['enc-fast-rep']
2105 self.assertEqual(enc_fast_rep['etype'], armor_key.etype)
2107 fast_rep = armor_key.decrypt(KU_FAST_REP, enc_fast_rep['cipher'])
2109 fast_response = self.der_decode(fast_rep,
2110 asn1Spec=krb5_asn1.KrbFastResponse())
2112 if expect_strengthen_key and self.strict_checking:
2113 self.assertIn('strengthen-key', fast_response)
2116 self.assertIn('finished', fast_response)
2118 # Ensure that the nonce matches the nonce in the body of the request
2120 nonce = kdc_exchange_dict['nonce']
2121 self.assertEqual(nonce, fast_response['nonce'])
2123 return fast_response
2125 def generic_check_kdc_private(self,
2132 kdc_options = kdc_exchange_dict['kdc_options']
2133 canon_pos = len(tuple(krb5_asn1.KDCOptions('canonicalize'))) - 1
2134 canonicalize = (canon_pos < len(kdc_options)
2135 and kdc_options[canon_pos] == '1')
2136 renewable_pos = len(tuple(krb5_asn1.KDCOptions('renewable'))) - 1
2137 renewable = (renewable_pos < len(kdc_options)
2138 and kdc_options[renewable_pos] == '1')
2140 expected_crealm = kdc_exchange_dict['expected_crealm']
2141 expected_cname = kdc_exchange_dict['expected_cname']
2142 expected_srealm = kdc_exchange_dict['expected_srealm']
2143 expected_sname = kdc_exchange_dict['expected_sname']
2144 ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
2146 rep_msg_type = kdc_exchange_dict['rep_msg_type']
2148 expected_flags = kdc_exchange_dict.get('expected_flags')
2149 unexpected_flags = kdc_exchange_dict.get('unexpected_flags')
2151 ticket = self.getElementValue(rep, 'ticket')
2153 if ticket_checksum is not None:
2154 armor_key = kdc_exchange_dict['armor_key']
2155 self.verify_ticket_checksum(ticket, ticket_checksum, armor_key)
2157 ticket_session_key = None
2158 if ticket_private is not None:
2159 self.assertElementFlags(ticket_private, 'flags',
2162 self.assertElementPresent(ticket_private, 'key')
2163 ticket_key = self.getElementValue(ticket_private, 'key')
2164 self.assertIsNotNone(ticket_key)
2165 if ticket_key is not None: # Never None, but gives indentation
2166 self.assertElementPresent(ticket_key, 'keytype')
2167 self.assertElementPresent(ticket_key, 'keyvalue')
2168 ticket_session_key = self.EncryptionKey_import(ticket_key)
2169 self.assertElementEqualUTF8(ticket_private, 'crealm',
2171 if self.strict_checking:
2172 self.assertElementEqualPrincipal(ticket_private, 'cname',
2174 self.assertElementPresent(ticket_private, 'transited')
2175 self.assertElementPresent(ticket_private, 'authtime')
2176 if self.strict_checking:
2177 self.assertElementPresent(ticket_private, 'starttime')
2178 self.assertElementPresent(ticket_private, 'endtime')
2180 if self.strict_checking:
2181 self.assertElementPresent(ticket_private, 'renew-till')
2183 self.assertElementMissing(ticket_private, 'renew-till')
2184 if self.strict_checking:
2185 self.assertElementEqual(ticket_private, 'caddr', [])
2186 self.assertElementPresent(ticket_private, 'authorization-data')
2188 encpart_session_key = None
2189 if encpart_private is not None:
2190 self.assertElementPresent(encpart_private, 'key')
2191 encpart_key = self.getElementValue(encpart_private, 'key')
2192 self.assertIsNotNone(encpart_key)
2193 if encpart_key is not None: # Never None, but gives indentation
2194 self.assertElementPresent(encpart_key, 'keytype')
2195 self.assertElementPresent(encpart_key, 'keyvalue')
2196 encpart_session_key = self.EncryptionKey_import(encpart_key)
2197 self.assertElementPresent(encpart_private, 'last-req')
2198 self.assertElementEqual(encpart_private, 'nonce',
2199 kdc_exchange_dict['nonce'])
2200 if rep_msg_type == KRB_AS_REP:
2201 if self.strict_checking:
2202 self.assertElementPresent(encpart_private,
2205 self.assertElementMissing(encpart_private,
2207 self.assertElementFlags(encpart_private, 'flags',
2210 self.assertElementPresent(encpart_private, 'authtime')
2211 if self.strict_checking:
2212 self.assertElementPresent(encpart_private, 'starttime')
2213 self.assertElementPresent(encpart_private, 'endtime')
2215 if self.strict_checking:
2216 self.assertElementPresent(encpart_private, 'renew-till')
2218 self.assertElementMissing(encpart_private, 'renew-till')
2219 self.assertElementEqualUTF8(encpart_private, 'srealm',
2221 self.assertElementEqualPrincipal(encpart_private, 'sname',
2223 if self.strict_checking:
2224 self.assertElementEqual(encpart_private, 'caddr', [])
2226 sent_claims = self.sent_claims(kdc_exchange_dict)
2228 if self.strict_checking:
2229 if sent_claims or canonicalize:
2230 self.assertElementPresent(encpart_private,
2231 'encrypted-pa-data')
2232 enc_pa_dict = self.get_pa_dict(
2233 encpart_private['encrypted-pa-data'])
2235 self.assertIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)
2237 (supported_etypes,) = struct.unpack(
2239 enc_pa_dict[PADATA_SUPPORTED_ETYPES])
2242 security.KERB_ENCTYPE_FAST_SUPPORTED
2245 security.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED
2248 security.KERB_ENCTYPE_CLAIMS_SUPPORTED
2251 self.assertNotIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)
2253 # ClaimsCompIdFASTSupported registry key
2255 self.assertIn(PADATA_PAC_OPTIONS, enc_pa_dict)
2257 self.check_pac_options_claims_support(
2258 enc_pa_dict[PADATA_PAC_OPTIONS])
2260 self.assertNotIn(PADATA_PAC_OPTIONS, enc_pa_dict)
2262 self.assertElementEqual(encpart_private,
2263 'encrypted-pa-data',
2266 if ticket_session_key is not None and encpart_session_key is not None:
2267 self.assertEqual(ticket_session_key.etype,
2268 encpart_session_key.etype)
2269 self.assertEqual(ticket_session_key.key.contents,
2270 encpart_session_key.key.contents)
2271 if encpart_session_key is not None:
2272 session_key = encpart_session_key
2274 session_key = ticket_session_key
2275 ticket_creds = KerberosTicketCreds(
2278 crealm=expected_crealm,
2279 cname=expected_cname,
2280 srealm=expected_srealm,
2281 sname=expected_sname,
2282 decryption_key=ticket_decryption_key,
2283 ticket_private=ticket_private,
2284 encpart_private=encpart_private)
2286 kdc_exchange_dict['rep_ticket_creds'] = ticket_creds
2288 def check_pac_options_claims_support(self, pac_options):
2289 pac_options = self.der_decode(pac_options,
2290 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
2291 self.assertEqual('1', pac_options['options'][0]) # claims bit
2293 def generic_check_kdc_error(self,
2299 rep_msg_type = kdc_exchange_dict['rep_msg_type']
2301 expected_anon = kdc_exchange_dict['expected_anon']
2302 expected_srealm = kdc_exchange_dict['expected_srealm']
2303 expected_sname = kdc_exchange_dict['expected_sname']
2304 expected_error_mode = kdc_exchange_dict['expected_error_mode']
2306 sent_fast = self.sent_fast(kdc_exchange_dict)
2308 fast_armor_type = kdc_exchange_dict['fast_armor_type']
2310 self.assertElementEqual(rep, 'pvno', 5)
2311 self.assertElementEqual(rep, 'msg-type', KRB_ERROR)
2312 error_code = self.getElementValue(rep, 'error-code')
2313 self.assertIn(error_code, expected_error_mode)
2314 if self.strict_checking:
2315 self.assertElementMissing(rep, 'ctime')
2316 self.assertElementMissing(rep, 'cusec')
2317 self.assertElementPresent(rep, 'stime')
2318 self.assertElementPresent(rep, 'susec')
2319 # error-code checked above
2320 if self.strict_checking:
2321 self.assertElementMissing(rep, 'crealm')
2322 if expected_anon and not inner:
2323 expected_cname = self.PrincipalName_create(
2324 name_type=NT_WELLKNOWN,
2325 names=['WELLKNOWN', 'ANONYMOUS'])
2326 self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
2328 self.assertElementMissing(rep, 'cname')
2329 self.assertElementEqualUTF8(rep, 'realm', expected_srealm)
2330 if sent_fast and error_code == KDC_ERR_GENERIC:
2331 self.assertElementEqualPrincipal(rep, 'sname',
2332 self.get_krbtgt_sname())
2334 self.assertElementEqualPrincipal(rep, 'sname', expected_sname)
2335 self.assertElementMissing(rep, 'e-text')
2336 if (error_code == KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS
2337 or (rep_msg_type == KRB_TGS_REP
2339 or (sent_fast and fast_armor_type is not None
2340 and fast_armor_type != FX_FAST_ARMOR_AP_REQUEST)
2342 self.assertElementMissing(rep, 'e-data')
2344 edata = self.getElementValue(rep, 'e-data')
2345 if self.strict_checking:
2346 if error_code != KDC_ERR_GENERIC:
2347 # Predicting whether an ERR_GENERIC error contains e-data is
2349 self.assertIsNotNone(edata)
2350 if edata is not None:
2351 if rep_msg_type == KRB_TGS_REP and not sent_fast:
2352 rep_padata = [self.der_decode(edata,
2353 asn1Spec=krb5_asn1.PA_DATA())]
2355 rep_padata = self.der_decode(edata,
2356 asn1Spec=krb5_asn1.METHOD_DATA())
2357 self.assertGreater(len(rep_padata), 0)
2360 self.assertEqual(1, len(rep_padata))
2361 rep_pa_dict = self.get_pa_dict(rep_padata)
2362 self.assertIn(PADATA_FX_FAST, rep_pa_dict)
2364 armor_key = kdc_exchange_dict['armor_key']
2365 self.assertIsNotNone(armor_key)
2366 fast_response = self.check_fx_fast_data(
2368 rep_pa_dict[PADATA_FX_FAST],
2370 expect_strengthen_key=False)
2372 rep_padata = fast_response['padata']
2374 etype_info2 = self.check_rep_padata(kdc_exchange_dict,
2380 kdc_exchange_dict['preauth_etype_info2'] = etype_info2
2384 def check_rep_padata(self,
2390 rep_msg_type = kdc_exchange_dict['rep_msg_type']
2392 req_body = kdc_exchange_dict['req_body']
2393 proposed_etypes = req_body['etype']
2394 client_as_etypes = kdc_exchange_dict.get('client_as_etypes', [])
2396 sent_fast = self.sent_fast(kdc_exchange_dict)
2397 sent_enc_challenge = self.sent_enc_challenge(kdc_exchange_dict)
2399 if rep_msg_type == KRB_TGS_REP:
2400 self.assertTrue(sent_fast)
2402 expect_etype_info2 = ()
2403 expect_etype_info = False
2404 unexpect_etype_info = True
2405 expected_aes_type = 0
2406 expected_rc4_type = 0
2407 if kcrypto.Enctype.RC4 in proposed_etypes:
2408 expect_etype_info = True
2409 for etype in proposed_etypes:
2410 if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
2411 expect_etype_info = False
2412 if etype not in client_as_etypes:
2414 if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
2415 if etype > expected_aes_type:
2416 expected_aes_type = etype
2417 if etype in (kcrypto.Enctype.RC4,) and error_code != 0:
2418 unexpect_etype_info = False
2419 if etype > expected_rc4_type:
2420 expected_rc4_type = etype
2422 if expected_aes_type != 0:
2423 expect_etype_info2 += (expected_aes_type,)
2424 if expected_rc4_type != 0:
2425 expect_etype_info2 += (expected_rc4_type,)
2427 expected_patypes = ()
2428 if sent_fast and error_code != 0:
2429 expected_patypes += (PADATA_FX_ERROR,)
2430 expected_patypes += (PADATA_FX_COOKIE,)
2432 if rep_msg_type == KRB_TGS_REP:
2433 if not sent_fast and error_code != 0:
2434 expected_patypes += (PADATA_PW_SALT,)
2436 sent_claims = self.sent_claims(kdc_exchange_dict)
2437 if sent_claims and error_code not in (0, KDC_ERR_GENERIC):
2438 expected_patypes += (PADATA_PAC_OPTIONS,)
2439 elif error_code != KDC_ERR_GENERIC:
2440 if expect_etype_info:
2441 self.assertGreater(len(expect_etype_info2), 0)
2442 expected_patypes += (PADATA_ETYPE_INFO,)
2443 if len(expect_etype_info2) != 0:
2444 expected_patypes += (PADATA_ETYPE_INFO2,)
2446 if error_code != KDC_ERR_PREAUTH_FAILED:
2448 expected_patypes += (PADATA_ENCRYPTED_CHALLENGE,)
2450 expected_patypes += (PADATA_ENC_TIMESTAMP,)
2452 if not sent_enc_challenge:
2453 expected_patypes += (PADATA_PK_AS_REQ,)
2454 expected_patypes += (PADATA_PK_AS_REP_19,)
2456 if (self.kdc_fast_support
2458 and not sent_enc_challenge):
2459 expected_patypes += (PADATA_FX_FAST,)
2460 expected_patypes += (PADATA_FX_COOKIE,)
2462 if self.strict_checking:
2463 for i, patype in enumerate(expected_patypes):
2464 self.assertElementEqual(rep_padata[i], 'padata-type', patype)
2465 self.assertEqual(len(rep_padata), len(expected_patypes))
2469 enc_timestamp = None
2470 enc_challenge = None
2478 for pa in rep_padata:
2479 patype = self.getElementValue(pa, 'padata-type')
2480 pavalue = self.getElementValue(pa, 'padata-value')
2481 if patype == PADATA_ETYPE_INFO2:
2482 self.assertIsNone(etype_info2)
2483 etype_info2 = self.der_decode(pavalue,
2484 asn1Spec=krb5_asn1.ETYPE_INFO2())
2486 if patype == PADATA_ETYPE_INFO:
2487 self.assertIsNone(etype_info)
2488 etype_info = self.der_decode(pavalue,
2489 asn1Spec=krb5_asn1.ETYPE_INFO())
2491 if patype == PADATA_ENC_TIMESTAMP:
2492 self.assertIsNone(enc_timestamp)
2493 enc_timestamp = pavalue
2494 self.assertEqual(len(enc_timestamp), 0)
2496 if patype == PADATA_ENCRYPTED_CHALLENGE:
2497 self.assertIsNone(enc_challenge)
2498 enc_challenge = pavalue
2500 if patype == PADATA_PK_AS_REQ:
2501 self.assertIsNone(pk_as_req)
2503 self.assertEqual(len(pk_as_req), 0)
2505 if patype == PADATA_PK_AS_REP_19:
2506 self.assertIsNone(pk_as_rep19)
2507 pk_as_rep19 = pavalue
2508 self.assertEqual(len(pk_as_rep19), 0)
2510 if patype == PADATA_FX_COOKIE:
2511 self.assertIsNone(fast_cookie)
2512 fast_cookie = pavalue
2513 self.assertIsNotNone(fast_cookie)
2515 if patype == PADATA_FX_ERROR:
2516 self.assertIsNone(fast_error)
2517 fast_error = pavalue
2518 self.assertIsNotNone(fast_error)
2520 if patype == PADATA_FX_FAST:
2521 self.assertIsNone(fx_fast)
2523 self.assertEqual(len(fx_fast), 0)
2525 if patype == PADATA_PAC_OPTIONS:
2526 self.assertIsNone(pac_options)
2527 pac_options = pavalue
2528 self.assertIsNotNone(pac_options)
2530 if patype == PADATA_PW_SALT:
2531 self.assertIsNone(pw_salt)
2533 self.assertIsNotNone(pw_salt)
2536 if fast_cookie is not None:
2537 kdc_exchange_dict['fast_cookie'] = fast_cookie
2539 if fast_error is not None:
2540 fast_error = self.der_decode(fast_error,
2541 asn1Spec=krb5_asn1.KRB_ERROR())
2542 self.generic_check_kdc_error(kdc_exchange_dict,
2547 if pac_options is not None:
2548 self.check_pac_options_claims_support(pac_options)
2550 if pw_salt is not None:
2551 self.assertEqual(12, len(pw_salt))
2553 status = int.from_bytes(pw_salt[:4], 'little')
2554 flags = int.from_bytes(pw_salt[8:], 'little')
2556 expected_status = kdc_exchange_dict['expected_status']
2557 self.assertEqual(expected_status, status)
2559 self.assertEqual(3, flags)
2561 self.assertIsNone(kdc_exchange_dict.get('expected_status'))
2563 if enc_challenge is not None:
2564 if not sent_enc_challenge:
2565 self.assertEqual(len(enc_challenge), 0)
2567 armor_key = kdc_exchange_dict['armor_key']
2568 self.assertIsNotNone(armor_key)
2570 preauth_key, _ = self.get_preauth_key(kdc_exchange_dict)
2572 kdc_challenge_key = self.generate_kdc_challenge_key(
2573 armor_key, preauth_key)
2575 # Ensure that the encrypted challenge FAST factor is supported
2577 if self.strict_checking:
2578 self.assertNotEqual(len(enc_challenge), 0)
2579 if len(enc_challenge) != 0:
2580 encrypted_challenge = self.der_decode(
2582 asn1Spec=krb5_asn1.EncryptedData())
2583 self.assertEqual(encrypted_challenge['etype'],
2584 kdc_challenge_key.etype)
2586 challenge = kdc_challenge_key.decrypt(
2587 KU_ENC_CHALLENGE_KDC,
2588 encrypted_challenge['cipher'])
2589 challenge = self.der_decode(
2591 asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
2593 # Retrieve the returned timestamp.
2594 rep_patime = challenge['patimestamp']
2595 self.assertIn('pausec', challenge)
2597 # Ensure the returned time is within five minutes of the
2599 rep_time = self.get_EpochFromKerberosTime(rep_patime)
2600 current_time = time.time()
2602 self.assertLess(current_time - 300, rep_time)
2603 self.assertLess(rep_time, current_time + 300)
2605 if all(etype not in client_as_etypes or etype not in proposed_etypes
2606 for etype in (kcrypto.Enctype.AES256,
2607 kcrypto.Enctype.AES128,
2608 kcrypto.Enctype.RC4)):
2609 self.assertIsNone(etype_info2)
2610 self.assertIsNone(etype_info)
2611 if rep_msg_type == KRB_AS_REP:
2612 if self.strict_checking:
2614 self.assertIsNotNone(enc_challenge)
2615 self.assertIsNone(enc_timestamp)
2617 self.assertIsNotNone(enc_timestamp)
2618 self.assertIsNone(enc_challenge)
2619 self.assertIsNotNone(pk_as_req)
2620 self.assertIsNotNone(pk_as_rep19)
2622 self.assertIsNone(enc_timestamp)
2623 self.assertIsNone(enc_challenge)
2624 self.assertIsNone(pk_as_req)
2625 self.assertIsNone(pk_as_rep19)
2628 if error_code != KDC_ERR_GENERIC:
2629 if self.strict_checking:
2630 self.assertIsNotNone(etype_info2)
2632 self.assertIsNone(etype_info2)
2633 if expect_etype_info:
2634 self.assertIsNotNone(etype_info)
2636 if self.strict_checking:
2637 self.assertIsNone(etype_info)
2638 if unexpect_etype_info:
2639 self.assertIsNone(etype_info)
2641 if error_code != KDC_ERR_GENERIC and self.strict_checking:
2642 self.assertGreaterEqual(len(etype_info2), 1)
2643 self.assertEqual(len(etype_info2), len(expect_etype_info2))
2644 for i in range(0, len(etype_info2)):
2645 e = self.getElementValue(etype_info2[i], 'etype')
2646 self.assertEqual(e, expect_etype_info2[i])
2647 salt = self.getElementValue(etype_info2[i], 'salt')
2648 if e == kcrypto.Enctype.RC4:
2649 self.assertIsNone(salt)
2651 self.assertIsNotNone(salt)
2652 expected_salt = kdc_exchange_dict['expected_salt']
2653 if expected_salt is not None:
2654 self.assertEqual(salt, expected_salt)
2655 s2kparams = self.getElementValue(etype_info2[i], 's2kparams')
2656 if self.strict_checking:
2657 self.assertIsNone(s2kparams)
2658 if etype_info is not None:
2659 self.assertEqual(len(etype_info), 1)
2660 e = self.getElementValue(etype_info[0], 'etype')
2661 self.assertEqual(e, kcrypto.Enctype.RC4)
2662 self.assertEqual(e, expect_etype_info2[0])
2663 salt = self.getElementValue(etype_info[0], 'salt')
2664 if self.strict_checking:
2665 self.assertIsNotNone(salt)
2666 self.assertEqual(len(salt), 0)
2668 if error_code not in (KDC_ERR_PREAUTH_FAILED,
2671 self.assertIsNotNone(enc_challenge)
2672 if self.strict_checking:
2673 self.assertIsNone(enc_timestamp)
2675 self.assertIsNotNone(enc_timestamp)
2676 if self.strict_checking:
2677 self.assertIsNone(enc_challenge)
2678 if not sent_enc_challenge:
2679 if self.strict_checking:
2680 self.assertIsNotNone(pk_as_req)
2681 self.assertIsNotNone(pk_as_rep19)
2683 self.assertIsNone(pk_as_req)
2684 self.assertIsNone(pk_as_rep19)
2686 if self.strict_checking:
2687 self.assertIsNone(enc_timestamp)
2688 self.assertIsNone(enc_challenge)
2689 self.assertIsNone(pk_as_req)
2690 self.assertIsNone(pk_as_rep19)
2694 def generate_simple_fast(self,
2702 armor_key = kdc_exchange_dict['armor_key']
2704 fast_req = self.KRB_FAST_REQ_create(fast_options,
2707 fast_req = self.der_encode(fast_req,
2708 asn1Spec=krb5_asn1.KrbFastReq())
2709 fast_req = self.EncryptedData_create(armor_key,
2713 fast_armored_req = self.KRB_FAST_ARMORED_REQ_create(fast_armor,
2717 fx_fast_request = self.PA_FX_FAST_REQUEST_create(fast_armored_req)
2718 fx_fast_request = self.der_encode(
2720 asn1Spec=krb5_asn1.PA_FX_FAST_REQUEST())
2722 fast_padata = self.PA_DATA_create(PADATA_FX_FAST,
2727 def generate_ap_req(self,
2733 tgt = kdc_exchange_dict['armor_tgt']
2734 authenticator_subkey = kdc_exchange_dict['armor_subkey']
2736 req_body_checksum = None
2738 tgt = kdc_exchange_dict['tgt']
2739 authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
2740 body_checksum_type = kdc_exchange_dict['body_checksum_type']
2742 req_body_blob = self.der_encode(req_body,
2743 asn1Spec=krb5_asn1.KDC_REQ_BODY())
2745 req_body_checksum = self.Checksum_create(tgt.session_key,
2746 KU_TGS_REQ_AUTH_CKSUM,
2748 ctype=body_checksum_type)
2750 auth_data = kdc_exchange_dict['auth_data']
2753 if authenticator_subkey is not None:
2754 subkey_obj = authenticator_subkey.export_obj()
2755 seq_number = random.randint(0, 0xfffffffe)
2756 (ctime, cusec) = self.get_KerberosTimeWithUsec()
2757 authenticator_obj = self.Authenticator_create(
2760 cksum=req_body_checksum,
2764 seq_number=seq_number,
2765 authorization_data=auth_data)
2766 authenticator_blob = self.der_encode(
2768 asn1Spec=krb5_asn1.Authenticator())
2770 usage = KU_AP_REQ_AUTH if armor else KU_TGS_REQ_AUTH
2771 authenticator = self.EncryptedData_create(tgt.session_key,
2775 ap_options = krb5_asn1.APOptions('0')
2776 ap_req_obj = self.AP_REQ_create(ap_options=str(ap_options),
2778 authenticator=authenticator)
2779 ap_req = self.der_encode(ap_req_obj, asn1Spec=krb5_asn1.AP_REQ())
2783 def generate_simple_tgs_padata(self,
2787 ap_req = self.generate_ap_req(kdc_exchange_dict,
2791 pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
2792 padata = [pa_tgs_req]
2794 return padata, req_body
2796 def get_preauth_key(self, kdc_exchange_dict):
2797 msg_type = kdc_exchange_dict['rep_msg_type']
2799 if msg_type == KRB_AS_REP:
2800 key = kdc_exchange_dict['preauth_key']
2801 usage = KU_AS_REP_ENC_PART
2803 authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
2804 if authenticator_subkey is not None:
2805 key = authenticator_subkey
2806 usage = KU_TGS_REP_ENC_PART_SUB_KEY
2808 tgt = kdc_exchange_dict['tgt']
2809 key = tgt.session_key
2810 usage = KU_TGS_REP_ENC_PART_SESSION
2812 self.assertIsNotNone(key)
2816 def generate_armor_key(self, subkey, session_key):
2817 armor_key = kcrypto.cf2(subkey.key,
2821 armor_key = Krb5EncryptionKey(armor_key, None)
2825 def generate_strengthen_reply_key(self, strengthen_key, reply_key):
2826 strengthen_reply_key = kcrypto.cf2(strengthen_key.key,
2830 strengthen_reply_key = Krb5EncryptionKey(strengthen_reply_key,
2833 return strengthen_reply_key
2835 def generate_client_challenge_key(self, armor_key, longterm_key):
2836 client_challenge_key = kcrypto.cf2(armor_key.key,
2838 b'clientchallengearmor',
2839 b'challengelongterm')
2840 client_challenge_key = Krb5EncryptionKey(client_challenge_key, None)
2842 return client_challenge_key
2844 def generate_kdc_challenge_key(self, armor_key, longterm_key):
2845 kdc_challenge_key = kcrypto.cf2(armor_key.key,
2847 b'kdcchallengearmor',
2848 b'challengelongterm')
2849 kdc_challenge_key = Krb5EncryptionKey(kdc_challenge_key, None)
2851 return kdc_challenge_key
2853 def verify_ticket_checksum(self, ticket, expected_checksum, armor_key):
2854 expected_type = expected_checksum['cksumtype']
2855 self.assertEqual(armor_key.ctype, expected_type)
2857 ticket_blob = self.der_encode(ticket,
2858 asn1Spec=krb5_asn1.Ticket())
2859 checksum = self.Checksum_create(armor_key,
2862 self.assertEqual(expected_checksum, checksum)
2864 def replace_pac(self, auth_data, new_pac, expect_pac=True):
2865 if new_pac is not None:
2866 self.assertElementEqual(new_pac, 'ad-type', AD_WIN2K_PAC)
2867 self.assertElementPresent(new_pac, 'ad-data')
2874 for authdata_elem in auth_data:
2875 if authdata_elem['ad-type'] == AD_IF_RELEVANT:
2876 ad_relevant = self.der_decode(
2877 authdata_elem['ad-data'],
2878 asn1Spec=krb5_asn1.AD_IF_RELEVANT())
2881 for relevant_elem in ad_relevant:
2882 if relevant_elem['ad-type'] == AD_WIN2K_PAC:
2883 self.assertIsNone(old_pac, 'Multiple PACs detected')
2884 old_pac = relevant_elem['ad-data']
2886 if new_pac is not None:
2887 relevant_elems.append(new_pac)
2889 relevant_elems.append(relevant_elem)
2891 self.assertIsNotNone(old_pac, 'Expected PAC')
2893 ad_relevant = self.der_encode(
2895 asn1Spec=krb5_asn1.AD_IF_RELEVANT())
2897 authdata_elem = self.AuthorizationData_create(AD_IF_RELEVANT,
2900 new_auth_data.append(authdata_elem)
2903 self.assertIsNotNone(ad_relevant, 'Expected AD-RELEVANT')
2905 return new_auth_data, old_pac
2907 def get_outer_pa_dict(self, kdc_exchange_dict):
2908 return self.get_pa_dict(kdc_exchange_dict['req_padata'])
2910 def get_fast_pa_dict(self, kdc_exchange_dict):
2911 req_pa_dict = self.get_pa_dict(kdc_exchange_dict['fast_padata'])
2916 return self.get_outer_pa_dict(kdc_exchange_dict)
2918 def sent_fast(self, kdc_exchange_dict):
2919 outer_pa_dict = self.get_outer_pa_dict(kdc_exchange_dict)
2921 return PADATA_FX_FAST in outer_pa_dict
2923 def sent_enc_challenge(self, kdc_exchange_dict):
2924 fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)
2926 return PADATA_ENCRYPTED_CHALLENGE in fast_pa_dict
2928 def sent_claims(self, kdc_exchange_dict):
2929 fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)
2931 if PADATA_PAC_OPTIONS not in fast_pa_dict:
2934 pac_options = self.der_decode(fast_pa_dict[PADATA_PAC_OPTIONS],
2935 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
2936 pac_options = pac_options['options']
2937 claims_pos = len(tuple(krb5_asn1.PACOptionFlags('claims'))) - 1
2939 return (claims_pos < len(pac_options)
2940 and pac_options[claims_pos] == '1')
2942 def get_krbtgt_sname(self):
2943 krbtgt_creds = self.get_krbtgt_creds()
2944 krbtgt_username = krbtgt_creds.get_username()
2945 krbtgt_realm = krbtgt_creds.get_realm()
2946 krbtgt_sname = self.PrincipalName_create(
2947 name_type=NT_SRV_INST, names=[krbtgt_username, krbtgt_realm])
2951 def _test_as_exchange(self,
2957 expected_error_mode,
2966 expected_flags=None,
2967 unexpected_flags=None,
2969 ticket_decryption_key=None,
2974 def _generate_padata_copy(_kdc_exchange_dict,
2977 return padata, req_body
2979 if not expected_error_mode:
2980 check_error_fn = None
2981 check_rep_fn = self.generic_check_kdc_rep
2983 check_error_fn = self.generic_check_kdc_error
2986 if padata is not None:
2987 generate_padata_fn = _generate_padata_copy
2989 generate_padata_fn = None
2991 kdc_exchange_dict = self.as_exchange_dict(
2992 expected_crealm=expected_crealm,
2993 expected_cname=expected_cname,
2994 expected_srealm=expected_srealm,
2995 expected_sname=expected_sname,
2996 ticket_decryption_key=ticket_decryption_key,
2997 generate_padata_fn=generate_padata_fn,
2998 check_error_fn=check_error_fn,
2999 check_rep_fn=check_rep_fn,
3000 check_kdc_private_fn=self.generic_check_kdc_private,
3001 expected_error_mode=expected_error_mode,
3002 client_as_etypes=client_as_etypes,
3003 expected_salt=expected_salt,
3004 expected_flags=expected_flags,
3005 unexpected_flags=unexpected_flags,
3006 preauth_key=preauth_key,
3007 kdc_options=str(kdc_options),
3008 pac_request=pac_request,
3009 pac_options=pac_options,
3012 rep = self._generic_kdc_exchange(kdc_exchange_dict,
3019 return rep, kdc_exchange_dict