be49f16b1f78733b027579c8efead6f433577c47
[samba.git] / python / samba / tests / krb5 / raw_testcase.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Isaac Boukris 2020
3 # Copyright (C) Stefan Metzmacher 2020
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18
19 import sys
20 import socket
21 import struct
22 import time
23 import datetime
24 import random
25 import binascii
26 import itertools
27 import collections
28
29 from pyasn1.codec.der.decoder import decode as pyasn1_der_decode
30 from pyasn1.codec.der.encoder import encode as pyasn1_der_encode
31 from pyasn1.codec.native.decoder import decode as pyasn1_native_decode
32 from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
33
34 from pyasn1.codec.ber.encoder import BitStringEncoder
35
36 from samba.credentials import Credentials
37 from samba.dcerpc import security
38 from samba.gensec import FEATURE_SEAL
39
40 import samba.tests
41 from samba.tests import TestCaseInTempDir
42
43 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
44 from samba.tests.krb5.rfc4120_constants import (
45     AD_IF_RELEVANT,
46     AD_WIN2K_PAC,
47     FX_FAST_ARMOR_AP_REQUEST,
48     KDC_ERR_GENERIC,
49     KDC_ERR_PREAUTH_FAILED,
50     KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS,
51     KRB_AP_REQ,
52     KRB_AS_REP,
53     KRB_AS_REQ,
54     KRB_ERROR,
55     KRB_TGS_REP,
56     KRB_TGS_REQ,
57     KU_AP_REQ_AUTH,
58     KU_AS_REP_ENC_PART,
59     KU_ENC_CHALLENGE_KDC,
60     KU_FAST_ENC,
61     KU_FAST_FINISHED,
62     KU_FAST_REP,
63     KU_FAST_REQ_CHKSUM,
64     KU_NON_KERB_CKSUM_SALT,
65     KU_TGS_REP_ENC_PART_SESSION,
66     KU_TGS_REP_ENC_PART_SUB_KEY,
67     KU_TGS_REQ_AUTH,
68     KU_TGS_REQ_AUTH_CKSUM,
69     KU_TGS_REQ_AUTH_DAT_SESSION,
70     KU_TGS_REQ_AUTH_DAT_SUBKEY,
71     KU_TICKET,
72     NT_SRV_INST,
73     NT_WELLKNOWN,
74     PADATA_ENCRYPTED_CHALLENGE,
75     PADATA_ENC_TIMESTAMP,
76     PADATA_ETYPE_INFO,
77     PADATA_ETYPE_INFO2,
78     PADATA_FOR_USER,
79     PADATA_FX_COOKIE,
80     PADATA_FX_ERROR,
81     PADATA_FX_FAST,
82     PADATA_KDC_REQ,
83     PADATA_PAC_OPTIONS,
84     PADATA_PAC_REQUEST,
85     PADATA_PK_AS_REQ,
86     PADATA_PK_AS_REP_19,
87     PADATA_PW_SALT,
88     PADATA_SUPPORTED_ETYPES
89 )
90 import samba.tests.krb5.kcrypto as kcrypto
91
92
def BitStringEncoder_encodeValue32(
        self, value, asn1Spec, encodeFun, **options):
    """Encode a BIT STRING value, zero-padded to at least 32 bits.

    BitStrings like KDCOptions or TicketFlags should be at least 32 bits
    wide on the wire, so values that occupy fewer than four octets get
    trailing zero octets appended.

    If asn1Spec is given, value is first cloned through it.  encodeFun
    and options are accepted but not used here.

    Returns a 3-tuple: the encoded octets (a leading zero octet followed
    by the padded bit string), False and True.
    """
    if asn1Spec is not None:
        # TODO: try to avoid ASN.1 schema instantiation
        value = asn1Spec.clone(value)

    # Shift the bits up so that the value ends on an octet boundary.
    trailing_bits = len(value) % 8
    aligned = value << (8 - trailing_bits) if trailing_bits else value

    octets = aligned.asOctets()
    # We need at least 32-Bit / 4-Bytes
    missing = max(0, 4 - len(octets))
    return b'\x00' + octets + (b'\x00' * missing), False, True
118
119
# Replace pyasn1's BIT STRING value encoder with the variant above that
# pads values to at least 32 bits.
setattr(BitStringEncoder, 'encodeValue', BitStringEncoder_encodeValue32)
121
122
def BitString_NamedValues_prettyPrint(self, scope=0):
    """Render a bit string as its binary form plus a list of named bits.

    The first 32 bit positions are considered.  A position is listed
    when it has an entry in self.prettyPrintNamedValues (shown with its
    current value, 0 or 1) or when it is set but has no name (shown as
    "unknown-bit-N").  scope is the number of spaces used to indent the
    listing.
    """
    width = 32
    out = "%s" % self.asBinary()

    # Expand every octet into its bits, most significant bit first.
    bits = [(octet >> shift) & 1
            for octet in self.asNumbers()
            for shift in range(7, -1, -1)]
    # Values shorter than the displayed width are zero-extended.
    bits.extend([0] * (width - len(bits)))

    indent = " " * scope
    # The first listed bit opens the parenthesis; later ones are
    # separated by a comma and a newline.
    separator = ": (\n%s " % indent
    for pos in range(width):
        bit = bits[pos]
        if pos in self.prettyPrintNamedValues:
            label = self.prettyPrintNamedValues[pos]
        elif bit:
            label = "unknown-bit-%u" % pos
        else:
            continue
        out += "%s%s:%u" % (separator, label, bit)
        separator = ",\n%s " % indent
    out += "\n%s)" % indent
    return out
151
152
def _use_named_bit_pretty_printer(flags_type, named_values):
    """Attach named_values and the named-bit pretty printer to flags_type."""
    flags_type.prettyPrintNamedValues = named_values
    flags_type.namedValues = named_values
    flags_type.prettyPrint = BitString_NamedValues_prettyPrint


# Teach the bit string types how to print themselves with named bits.
_use_named_bit_pretty_printer(
    krb5_asn1.TicketFlags,
    krb5_asn1.TicketFlagsValues.namedValues)
_use_named_bit_pretty_printer(
    krb5_asn1.KDCOptions,
    krb5_asn1.KDCOptionsValues.namedValues)
_use_named_bit_pretty_printer(
    krb5_asn1.APOptions,
    krb5_asn1.APOptionsValues.namedValues)
_use_named_bit_pretty_printer(
    krb5_asn1.PACOptionFlags,
    krb5_asn1.PACOptionFlagsValues.namedValues)
177
178
def Integer_NamedValues_prettyPrint(self, scope=0):
    """Render an integer as decimal, hex and its symbolic name.

    The name is looked up in self.prettyPrintNamedValues; values with no
    entry are labelled "<__unknown__>".  scope is accepted but not used.
    """
    number = int(self)
    known = self.prettyPrintNamedValues
    label = known[number] if number in known else "<__unknown__>"
    return "%d (0x%x) %s" % (number, number, label)
187
188
def _use_named_int_pretty_printer(int_type, named_values):
    """Attach named_values and the named-integer pretty printer to int_type."""
    int_type.prettyPrintNamedValues = named_values
    int_type.prettyPrint = Integer_NamedValues_prettyPrint


# Teach the enumerated integer types how to print their symbolic names.
_use_named_int_pretty_printer(
    krb5_asn1.NameType,
    krb5_asn1.NameTypeValues.namedValues)
_use_named_int_pretty_printer(
    krb5_asn1.AuthDataType,
    krb5_asn1.AuthDataTypeValues.namedValues)
_use_named_int_pretty_printer(
    krb5_asn1.PADataType,
    krb5_asn1.PADataTypeValues.namedValues)
_use_named_int_pretty_printer(
    krb5_asn1.EncryptionType,
    krb5_asn1.EncryptionTypeValues.namedValues)
_use_named_int_pretty_printer(
    krb5_asn1.ChecksumType,
    krb5_asn1.ChecksumTypeValues.namedValues)
_use_named_int_pretty_printer(
    krb5_asn1.KerbErrorDataType,
    krb5_asn1.KerbErrorDataTypeValues.namedValues)
213
214
class Krb5EncryptionKey:
    """A kcrypto key bundled with its enctype, checksum type and kvno."""

    def __init__(self, key, kvno):
        """Wrap key, deriving the default checksum type from its enctype.

        Raises KeyError if key.enctype is not AES256, AES128 or RC4.
        """
        default_ctypes = {
            kcrypto.Enctype.AES256: kcrypto.Cksumtype.SHA1_AES256,
            kcrypto.Enctype.AES128: kcrypto.Cksumtype.SHA1_AES128,
            kcrypto.Enctype.RC4: kcrypto.Cksumtype.HMAC_MD5,
        }
        self.key = key
        self.etype = key.enctype
        self.ctype = default_ctypes[self.etype]
        self.kvno = kvno

    def encrypt(self, usage, plaintext):
        """Return plaintext encrypted with this key for the key usage."""
        return kcrypto.encrypt(self.key, usage, plaintext)

    def decrypt(self, usage, ciphertext):
        """Return ciphertext decrypted with this key for the key usage."""
        return kcrypto.decrypt(self.key, usage, ciphertext)

    def make_zeroed_checksum(self, ctype=None):
        """Return zero bytes as long as a checksum of type ctype.

        ctype defaults to this key's own checksum type.
        """
        if ctype is None:
            ctype = self.ctype

        return bytes(kcrypto.checksum_len(ctype))

    def make_checksum(self, usage, plaintext, ctype=None):
        """Return the checksum of plaintext for the key usage.

        ctype defaults to this key's own checksum type.
        """
        if ctype is None:
            ctype = self.ctype
        return kcrypto.make_checksum(ctype, self.key, usage, plaintext)

    def verify_checksum(self, usage, plaintext, ctype, cksum):
        """Verify cksum over plaintext using kcrypto.

        Raises AssertionError if ctype is not this key's checksum type.
        """
        if self.ctype != ctype:
            raise AssertionError(f'{self.ctype} != {ctype}')

        kcrypto.verify_checksum(ctype, self.key, usage, plaintext, cksum)

    def export_obj(self):
        """Return the key as a dict with 'keytype' and 'keyvalue' entries."""
        return {
            'keytype': self.etype,
            'keyvalue': self.key.contents,
        }
264
265
class KerberosCredentials(Credentials):
    """Credentials extended with the Kerberos details used by the tests.

    On top of the base class this tracks the supported encryption type
    bitmasks for AS, TGS and AP exchanges, the key version number,
    explicitly supplied ("forced") keys and salt, and the account DN.
    """

    def __init__(self):
        super().__init__()
        # By default every encryption type known here is supported.
        all_enc_types = (security.KERB_ENCTYPE_RC4_HMAC_MD5 |
                         security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96 |
                         security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96)

        self.as_supported_enctypes = all_enc_types
        self.tgs_supported_enctypes = all_enc_types
        self.ap_supported_enctypes = all_enc_types

        self.kvno = None
        # Maps an integer enctype to a Krb5EncryptionKey.
        self.forced_keys = {}

        self.forced_salt = None

        self.dn = None

    def set_as_supported_enctypes(self, value):
        """Set the AS enctype bitmask; value is converted with int()."""
        self.as_supported_enctypes = int(value)

    def set_tgs_supported_enctypes(self, value):
        """Set the TGS enctype bitmask; value is converted with int()."""
        self.tgs_supported_enctypes = int(value)

    def set_ap_supported_enctypes(self, value):
        """Set the AP enctype bitmask; value is converted with int()."""
        self.ap_supported_enctypes = int(value)

    def _get_krb5_etypes(self, supported_enctypes):
        """Translate an enctype bitmask into a tuple of kcrypto enctypes.

        The result is ordered AES256, AES128, RC4.
        """
        etypes = ()

        if supported_enctypes & security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96:
            etypes += (kcrypto.Enctype.AES256,)
        if supported_enctypes & security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96:
            etypes += (kcrypto.Enctype.AES128,)
        if supported_enctypes & security.KERB_ENCTYPE_RC4_HMAC_MD5:
            etypes += (kcrypto.Enctype.RC4,)

        return etypes

    def get_as_krb5_etypes(self):
        """Return the kcrypto enctypes supported for AS exchanges."""
        return self._get_krb5_etypes(self.as_supported_enctypes)

    def get_tgs_krb5_etypes(self):
        """Return the kcrypto enctypes supported for TGS exchanges."""
        return self._get_krb5_etypes(self.tgs_supported_enctypes)

    def get_ap_krb5_etypes(self):
        """Return the kcrypto enctypes supported for AP exchanges."""
        return self._get_krb5_etypes(self.ap_supported_enctypes)

    def set_kvno(self, kvno):
        """Set the key version number.

        kvno is converted with int() first, like the enctype setters do,
        so a numeric string is accepted as well as an integer.  Values
        with bit 31 set are sign-extended from 32 bits.
        """
        kvno = int(kvno)
        # Sign-extend from 32 bits.
        if kvno & 1 << 31:
            kvno |= -1 << 31
        self.kvno = kvno

    def get_kvno(self):
        """Return the key version number, or None if it was never set."""
        return self.kvno

    def set_forced_key(self, etype, hexkey):
        """Store a key, given as a hex string, for the enctype.

        The key is tagged with the kvno current at the time of the call,
        so set_kvno() has to be called first for the kvno to be recorded.
        """
        etype = int(etype)
        contents = binascii.a2b_hex(hexkey)
        key = kcrypto.Key(etype, contents)
        self.forced_keys[etype] = Krb5EncryptionKey(key, self.kvno)

    def get_forced_key(self, etype):
        """Return the forced key for the enctype, or None if there is none."""
        etype = int(etype)
        return self.forced_keys.get(etype)

    def set_forced_salt(self, salt):
        """Force get_salt() to return bytes(salt)."""
        self.forced_salt = bytes(salt)

    def get_forced_salt(self):
        """Return the forced salt, or None if none was set."""
        return self.forced_salt

    def get_salt(self):
        """Return the salt as UTF-8 encoded bytes.

        A forced salt wins.  Otherwise, when a workstation name is set,
        the salt is the upper-case realm, "host", the lower-case username
        up to its last '$', a dot and the lower-case realm; without a
        workstation it is the upper-case realm followed by the username.
        """
        if self.forced_salt is not None:
            return self.forced_salt

        if self.get_workstation():
            salt_string = '%shost%s.%s' % (
                self.get_realm().upper(),
                self.get_username().lower().rsplit('$', 1)[0],
                self.get_realm().lower())
        else:
            salt_string = self.get_realm().upper() + self.get_username()

        return salt_string.encode('utf-8')

    def set_dn(self, dn):
        """Record the DN associated with these credentials."""
        self.dn = dn

    def get_dn(self):
        """Return the recorded DN, or None if none was set."""
        return self.dn
359
360
class KerberosTicketCreds:
    """Plain container for a ticket and the values that accompany it.

    Every constructor argument is stored unchanged under an attribute of
    the same name; only ticket and session_key are mandatory.
    """

    def __init__(self, ticket, session_key,
                 crealm=None, cname=None,
                 srealm=None, sname=None,
                 decryption_key=None,
                 ticket_private=None,
                 encpart_private=None):
        # The ticket itself and the key shared with the service.
        self.ticket = ticket
        self.session_key = session_key

        # Client and server identification.
        self.crealm, self.cname = crealm, cname
        self.srealm, self.sname = srealm, sname

        # Optional key and private parts.
        self.decryption_key = decryption_key
        self.ticket_private = ticket_private
        self.encpart_private = encpart_private
377
378
379 class RawKerberosTest(TestCaseInTempDir):
380     """A raw Kerberos Test case."""
381
382     etypes_to_test = (
383         {"value": -1111, "name": "dummy", },
384         {"value": kcrypto.Enctype.AES256, "name": "aes128", },
385         {"value": kcrypto.Enctype.AES128, "name": "aes256", },
386         {"value": kcrypto.Enctype.RC4, "name": "rc4", },
387     )
388
389     setup_etype_test_permutations_done = False
390
391     @classmethod
392     def setup_etype_test_permutations(cls):
393         if cls.setup_etype_test_permutations_done:
394             return
395
396         res = []
397
398         num_idxs = len(cls.etypes_to_test)
399         permutations = []
400         for num in range(1, num_idxs + 1):
401             chunk = list(itertools.permutations(range(num_idxs), num))
402             for e in chunk:
403                 el = list(e)
404                 permutations.append(el)
405
406         for p in permutations:
407             name = None
408             etypes = ()
409             for idx in p:
410                 n = cls.etypes_to_test[idx]["name"]
411                 if name is None:
412                     name = n
413                 else:
414                     name += "_%s" % n
415                 etypes += (cls.etypes_to_test[idx]["value"],)
416
417             r = {"name": name, "etypes": etypes, }
418             res.append(r)
419
420         cls.etype_test_permutations = res
421         cls.setup_etype_test_permutations_done = True
422
423     @classmethod
424     def etype_test_permutation_name_idx(cls):
425         cls.setup_etype_test_permutations()
426         res = []
427         idx = 0
428         for e in cls.etype_test_permutations:
429             r = (e['name'], idx)
430             idx += 1
431             res.append(r)
432         return res
433
434     def etype_test_permutation_by_idx(self, idx):
435         e = self.etype_test_permutations[idx]
436         return (e['name'], e['etypes'])
437
438     @classmethod
439     def setUpClass(cls):
440         super().setUpClass()
441
442         cls.host = samba.tests.env_get_var_value('SERVER')
443         cls.dc_host = samba.tests.env_get_var_value('DC_SERVER')
444
445         # A dictionary containing credentials that have already been
446         # obtained.
447         cls.creds_dict = {}
448
449         cls.kdc_fast_support = False
450
451     def setUp(self):
452         super().setUp()
453         self.do_asn1_print = False
454         self.do_hexdump = False
455
456         strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING',
457                                                         allow_missing=True)
458         if strict_checking is None:
459             strict_checking = '1'
460         self.strict_checking = bool(int(strict_checking))
461
462         self.s = None
463
464         self.unspecified_kvno = object()
465
466     def tearDown(self):
467         self._disconnect("tearDown")
468         super().tearDown()
469
470     def _disconnect(self, reason):
471         if self.s is None:
472             return
473         self.s.close()
474         self.s = None
475         if self.do_hexdump:
476             sys.stderr.write("disconnect[%s]\n" % reason)
477
478     def _connect_tcp(self, host):
479         tcp_port = 88
480         try:
481             self.a = socket.getaddrinfo(host, tcp_port, socket.AF_UNSPEC,
482                                         socket.SOCK_STREAM, socket.SOL_TCP,
483                                         0)
484             self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
485             self.s.settimeout(10)
486             self.s.connect(self.a[0][4])
487         except socket.error:
488             self.s.close()
489             raise
490         except IOError:
491             self.s.close()
492             raise
493
494     def connect(self, host):
495         self.assertNotConnected()
496         self._connect_tcp(host)
497         if self.do_hexdump:
498             sys.stderr.write("connected[%s]\n" % host)
499
500     def env_get_var(self, varname, prefix,
501                     fallback_default=True,
502                     allow_missing=False):
503         val = None
504         if prefix is not None:
505             allow_missing_prefix = allow_missing or fallback_default
506             val = samba.tests.env_get_var_value(
507                 '%s_%s' % (prefix, varname),
508                 allow_missing=allow_missing_prefix)
509         else:
510             fallback_default = True
511         if val is None and fallback_default:
512             val = samba.tests.env_get_var_value(varname,
513                                                 allow_missing=allow_missing)
514         return val
515
516     def _get_krb5_creds_from_env(self, prefix,
517                                  default_username=None,
518                                  allow_missing_password=False,
519                                  allow_missing_keys=True,
520                                  require_strongest_key=False):
521         c = KerberosCredentials()
522         c.guess()
523
524         domain = self.env_get_var('DOMAIN', prefix)
525         realm = self.env_get_var('REALM', prefix)
526         allow_missing_username = default_username is not None
527         username = self.env_get_var('USERNAME', prefix,
528                                     fallback_default=False,
529                                     allow_missing=allow_missing_username)
530         if username is None:
531             username = default_username
532         password = self.env_get_var('PASSWORD', prefix,
533                                     fallback_default=False,
534                                     allow_missing=allow_missing_password)
535         c.set_domain(domain)
536         c.set_realm(realm)
537         c.set_username(username)
538         if password is not None:
539             c.set_password(password)
540         as_supported_enctypes = self.env_get_var('AS_SUPPORTED_ENCTYPES',
541                                                  prefix, allow_missing=True)
542         if as_supported_enctypes is not None:
543             c.set_as_supported_enctypes(as_supported_enctypes)
544         tgs_supported_enctypes = self.env_get_var('TGS_SUPPORTED_ENCTYPES',
545                                                   prefix, allow_missing=True)
546         if tgs_supported_enctypes is not None:
547             c.set_tgs_supported_enctypes(tgs_supported_enctypes)
548         ap_supported_enctypes = self.env_get_var('AP_SUPPORTED_ENCTYPES',
549                                                  prefix, allow_missing=True)
550         if ap_supported_enctypes is not None:
551             c.set_ap_supported_enctypes(ap_supported_enctypes)
552
553         if require_strongest_key:
554             kvno_allow_missing = False
555             if password is None:
556                 aes256_allow_missing = False
557             else:
558                 aes256_allow_missing = True
559         else:
560             kvno_allow_missing = allow_missing_keys
561             aes256_allow_missing = allow_missing_keys
562         kvno = self.env_get_var('KVNO', prefix,
563                                 fallback_default=False,
564                                 allow_missing=kvno_allow_missing)
565         if kvno is not None:
566             c.set_kvno(kvno)
567         aes256_key = self.env_get_var('AES256_KEY_HEX', prefix,
568                                       fallback_default=False,
569                                       allow_missing=aes256_allow_missing)
570         if aes256_key is not None:
571             c.set_forced_key(kcrypto.Enctype.AES256, aes256_key)
572         aes128_key = self.env_get_var('AES128_KEY_HEX', prefix,
573                                       fallback_default=False,
574                                       allow_missing=True)
575         if aes128_key is not None:
576             c.set_forced_key(kcrypto.Enctype.AES128, aes128_key)
577         rc4_key = self.env_get_var('RC4_KEY_HEX', prefix,
578                                    fallback_default=False, allow_missing=True)
579         if rc4_key is not None:
580             c.set_forced_key(kcrypto.Enctype.RC4, rc4_key)
581
582         if not allow_missing_keys:
583             self.assertTrue(c.forced_keys,
584                             'Please supply %s encryption keys '
585                             'in environment' % prefix)
586
587         return c
588
589     def _get_krb5_creds(self,
590                         prefix,
591                         default_username=None,
592                         allow_missing_password=False,
593                         allow_missing_keys=True,
594                         require_strongest_key=False,
595                         fallback_creds_fn=None):
596         if prefix in self.creds_dict:
597             return self.creds_dict[prefix]
598
599         # We don't have the credentials already
600         creds = None
601         env_err = None
602         try:
603             # Try to obtain them from the environment
604             creds = self._get_krb5_creds_from_env(
605                 prefix,
606                 default_username=default_username,
607                 allow_missing_password=allow_missing_password,
608                 allow_missing_keys=allow_missing_keys,
609                 require_strongest_key=require_strongest_key)
610         except Exception as err:
611             # An error occurred, so save it for later
612             env_err = err
613         else:
614             self.assertIsNotNone(creds)
615             # Save the obtained credentials
616             self.creds_dict[prefix] = creds
617             return creds
618
619         if fallback_creds_fn is not None:
620             try:
621                 # Try to use the fallback method
622                 creds = fallback_creds_fn()
623             except Exception as err:
624                 print("ERROR FROM ENV: %r" % (env_err))
625                 print("FALLBACK-FN: %s" % (fallback_creds_fn))
626                 print("FALLBACK-ERROR: %r" % (err))
627             else:
628                 self.assertIsNotNone(creds)
629                 # Save the obtained credentials
630                 self.creds_dict[prefix] = creds
631                 return creds
632
633         # Both methods failed, so raise the exception from the
634         # environment method
635         raise env_err
636
637     def get_user_creds(self,
638                        allow_missing_password=False,
639                        allow_missing_keys=True):
640         c = self._get_krb5_creds(prefix=None,
641                                  allow_missing_password=allow_missing_password,
642                                  allow_missing_keys=allow_missing_keys)
643         return c
644
645     def get_service_creds(self,
646                           allow_missing_password=False,
647                           allow_missing_keys=True):
648         c = self._get_krb5_creds(prefix='SERVICE',
649                                  allow_missing_password=allow_missing_password,
650                                  allow_missing_keys=allow_missing_keys)
651         return c
652
653     def get_client_creds(self,
654                          allow_missing_password=False,
655                          allow_missing_keys=True):
656         c = self._get_krb5_creds(prefix='CLIENT',
657                                  allow_missing_password=allow_missing_password,
658                                  allow_missing_keys=allow_missing_keys)
659         return c
660
661     def get_server_creds(self,
662                          allow_missing_password=False,
663                          allow_missing_keys=True):
664         c = self._get_krb5_creds(prefix='SERVER',
665                                  allow_missing_password=allow_missing_password,
666                                  allow_missing_keys=allow_missing_keys)
667         return c
668
669     def get_admin_creds(self,
670                         allow_missing_password=False,
671                         allow_missing_keys=True):
672         c = self._get_krb5_creds(prefix='ADMIN',
673                                  allow_missing_password=allow_missing_password,
674                                  allow_missing_keys=allow_missing_keys)
675         c.set_gensec_features(c.get_gensec_features() | FEATURE_SEAL)
676         return c
677
678     def get_krbtgt_creds(self,
679                          require_keys=True,
680                          require_strongest_key=False):
681         if require_strongest_key:
682             self.assertTrue(require_keys)
683         c = self._get_krb5_creds(prefix='KRBTGT',
684                                  default_username='krbtgt',
685                                  allow_missing_password=True,
686                                  allow_missing_keys=not require_keys,
687                                  require_strongest_key=require_strongest_key)
688         return c
689
690     def get_anon_creds(self):
691         c = Credentials()
692         c.set_anonymous()
693         return c
694
695     def asn1_dump(self, name, obj, asn1_print=None):
696         if asn1_print is None:
697             asn1_print = self.do_asn1_print
698         if asn1_print:
699             if name is not None:
700                 sys.stderr.write("%s:\n%s" % (name, obj))
701             else:
702                 sys.stderr.write("%s" % (obj))
703
704     def hex_dump(self, name, blob, hexdump=None):
705         if hexdump is None:
706             hexdump = self.do_hexdump
707         if hexdump:
708             sys.stderr.write(
709                 "%s: %d\n%s" % (name, len(blob), self.hexdump(blob)))
710
711     def der_decode(
712             self,
713             blob,
714             asn1Spec=None,
715             native_encode=True,
716             asn1_print=None,
717             hexdump=None):
718         if asn1Spec is not None:
719             class_name = type(asn1Spec).__name__.split(':')[0]
720         else:
721             class_name = "<None-asn1Spec>"
722         self.hex_dump(class_name, blob, hexdump=hexdump)
723         obj, _ = pyasn1_der_decode(blob, asn1Spec=asn1Spec)
724         self.asn1_dump(None, obj, asn1_print=asn1_print)
725         if native_encode:
726             obj = pyasn1_native_encode(obj)
727         return obj
728
729     def der_encode(
730             self,
731             obj,
732             asn1Spec=None,
733             native_decode=True,
734             asn1_print=None,
735             hexdump=None):
736         if native_decode:
737             obj = pyasn1_native_decode(obj, asn1Spec=asn1Spec)
738         class_name = type(obj).__name__.split(':')[0]
739         if class_name is not None:
740             self.asn1_dump(None, obj, asn1_print=asn1_print)
741         blob = pyasn1_der_encode(obj)
742         if class_name is not None:
743             self.hex_dump(class_name, blob, hexdump=hexdump)
744         return blob
745
746     def send_pdu(self, req, asn1_print=None, hexdump=None):
747         try:
748             k5_pdu = self.der_encode(
749                 req, native_decode=False, asn1_print=asn1_print, hexdump=False)
750             header = struct.pack('>I', len(k5_pdu))
751             req_pdu = header
752             req_pdu += k5_pdu
753             self.hex_dump("send_pdu", header, hexdump=hexdump)
754             self.hex_dump("send_pdu", k5_pdu, hexdump=hexdump)
755             while True:
756                 sent = self.s.send(req_pdu, 0)
757                 if sent == len(req_pdu):
758                     break
759                 req_pdu = req_pdu[sent:]
760         except socket.error as e:
761             self._disconnect("send_pdu: %s" % e)
762             raise
763         except IOError as e:
764             self._disconnect("send_pdu: %s" % e)
765             raise
766
767     def recv_raw(self, num_recv=0xffff, hexdump=None, timeout=None):
768         rep_pdu = None
769         try:
770             if timeout is not None:
771                 self.s.settimeout(timeout)
772             rep_pdu = self.s.recv(num_recv, 0)
773             self.s.settimeout(10)
774             if len(rep_pdu) == 0:
775                 self._disconnect("recv_raw: EOF")
776                 return None
777             self.hex_dump("recv_raw", rep_pdu, hexdump=hexdump)
778         except socket.timeout:
779             self.s.settimeout(10)
780             sys.stderr.write("recv_raw: TIMEOUT\n")
781         except socket.error as e:
782             self._disconnect("recv_raw: %s" % e)
783             raise
784         except IOError as e:
785             self._disconnect("recv_raw: %s" % e)
786             raise
787         return rep_pdu
788
789     def recv_pdu_raw(self, asn1_print=None, hexdump=None, timeout=None):
790         rep_pdu = None
791         rep = None
792         raw_pdu = self.recv_raw(
793             num_recv=4, hexdump=hexdump, timeout=timeout)
794         if raw_pdu is None:
795             return (None, None)
796         header = struct.unpack(">I", raw_pdu[0:4])
797         k5_len = header[0]
798         if k5_len == 0:
799             return (None, "")
800         missing = k5_len
801         rep_pdu = b''
802         while missing > 0:
803             raw_pdu = self.recv_raw(
804                 num_recv=missing, hexdump=hexdump, timeout=timeout)
805             self.assertGreaterEqual(len(raw_pdu), 1)
806             rep_pdu += raw_pdu
807             missing = k5_len - len(rep_pdu)
808         k5_raw = self.der_decode(
809             rep_pdu,
810             asn1Spec=None,
811             native_encode=False,
812             asn1_print=False,
813             hexdump=False)
814         pvno = k5_raw['field-0']
815         self.assertEqual(pvno, 5)
816         msg_type = k5_raw['field-1']
817         self.assertIn(msg_type, [KRB_AS_REP, KRB_TGS_REP, KRB_ERROR])
818         if msg_type == KRB_AS_REP:
819             asn1Spec = krb5_asn1.AS_REP()
820         elif msg_type == KRB_TGS_REP:
821             asn1Spec = krb5_asn1.TGS_REP()
822         elif msg_type == KRB_ERROR:
823             asn1Spec = krb5_asn1.KRB_ERROR()
824         rep = self.der_decode(rep_pdu, asn1Spec=asn1Spec,
825                               asn1_print=asn1_print, hexdump=False)
826         return (rep, rep_pdu)
827
828     def recv_pdu(self, asn1_print=None, hexdump=None, timeout=None):
829         (rep, rep_pdu) = self.recv_pdu_raw(asn1_print=asn1_print,
830                                            hexdump=hexdump,
831                                            timeout=timeout)
832         return rep
833
834     def assertIsConnected(self):
835         self.assertIsNotNone(self.s, msg="Not connected")
836
837     def assertNotConnected(self):
838         self.assertIsNone(self.s, msg="Is connected")
839
840     def send_recv_transaction(
841             self,
842             req,
843             asn1_print=None,
844             hexdump=None,
845             timeout=None,
846             to_rodc=False):
847         host = self.host if to_rodc else self.dc_host
848         self.connect(host)
849         try:
850             self.send_pdu(req, asn1_print=asn1_print, hexdump=hexdump)
851             rep = self.recv_pdu(
852                 asn1_print=asn1_print, hexdump=hexdump, timeout=timeout)
853         except Exception:
854             self._disconnect("transaction failed")
855             raise
856         self._disconnect("transaction done")
857         return rep
858
859     def assertNoValue(self, value):
860         self.assertTrue(value.isNoValue)
861
862     def assertHasValue(self, value):
863         self.assertIsNotNone(value)
864
865     def getElementValue(self, obj, elem):
866         return obj.get(elem)
867
868     def assertElementMissing(self, obj, elem):
869         v = self.getElementValue(obj, elem)
870         self.assertIsNone(v)
871
872     def assertElementPresent(self, obj, elem):
873         v = self.getElementValue(obj, elem)
874         self.assertIsNotNone(v)
875         if self.strict_checking:
876             if isinstance(v, collections.abc.Container):
877                 self.assertNotEqual(0, len(v))
878
879     def assertElementEqual(self, obj, elem, value):
880         v = self.getElementValue(obj, elem)
881         self.assertIsNotNone(v)
882         self.assertEqual(v, value)
883
884     def assertElementEqualUTF8(self, obj, elem, value):
885         v = self.getElementValue(obj, elem)
886         self.assertIsNotNone(v)
887         self.assertEqual(v, bytes(value, 'utf8'))
888
889     def assertPrincipalEqual(self, princ1, princ2):
890         self.assertEqual(princ1['name-type'], princ2['name-type'])
891         self.assertEqual(
892             len(princ1['name-string']),
893             len(princ2['name-string']),
894             msg="princ1=%s != princ2=%s" % (princ1, princ2))
895         for idx in range(len(princ1['name-string'])):
896             self.assertEqual(
897                 princ1['name-string'][idx],
898                 princ2['name-string'][idx],
899                 msg="princ1=%s != princ2=%s" % (princ1, princ2))
900
901     def assertElementEqualPrincipal(self, obj, elem, value):
902         v = self.getElementValue(obj, elem)
903         self.assertIsNotNone(v)
904         v = pyasn1_native_decode(v, asn1Spec=krb5_asn1.PrincipalName())
905         self.assertPrincipalEqual(v, value)
906
907     def assertElementKVNO(self, obj, elem, value):
908         v = self.getElementValue(obj, elem)
909         if value == "autodetect":
910             value = v
911         if value is not None:
912             self.assertIsNotNone(v)
913             # The value on the wire should never be 0
914             self.assertNotEqual(v, 0)
915             # unspecified_kvno means we don't know the kvno,
916             # but want to enforce its presence
917             if value is not self.unspecified_kvno:
918                 value = int(value)
919                 self.assertNotEqual(value, 0)
920                 self.assertEqual(v, value)
921         else:
922             self.assertIsNone(v)
923
924     def assertElementFlags(self, obj, elem, expected, unexpected):
925         v = self.getElementValue(obj, elem)
926         self.assertIsNotNone(v)
927         if expected is not None:
928             self.assertIsInstance(expected, krb5_asn1.KDCOptions)
929             for i, flag in enumerate(expected):
930                 if flag == 1:
931                     self.assertEqual('1', v[i],
932                                      f"'{expected.namedValues[i]}' "
933                                      f"expected in {v}")
934         if unexpected is not None:
935             self.assertIsInstance(unexpected, krb5_asn1.KDCOptions)
936             for i, flag in enumerate(unexpected):
937                 if flag == 1:
938                     self.assertEqual('0', v[i],
939                                      f"'{unexpected.namedValues[i]}' "
940                                      f"unexpected in {v}")
941
942     def get_KerberosTimeWithUsec(self, epoch=None, offset=None):
943         if epoch is None:
944             epoch = time.time()
945         if offset is not None:
946             epoch = epoch + int(offset)
947         dt = datetime.datetime.fromtimestamp(epoch, tz=datetime.timezone.utc)
948         return (dt.strftime("%Y%m%d%H%M%SZ"), dt.microsecond)
949
950     def get_KerberosTime(self, epoch=None, offset=None):
951         (s, _) = self.get_KerberosTimeWithUsec(epoch=epoch, offset=offset)
952         return s
953
954     def get_EpochFromKerberosTime(self, kerberos_time):
955         if isinstance(kerberos_time, bytes):
956             kerberos_time = kerberos_time.decode()
957
958         epoch = datetime.datetime.strptime(kerberos_time,
959                                            '%Y%m%d%H%M%SZ')
960         epoch = epoch.replace(tzinfo=datetime.timezone.utc)
961         epoch = int(epoch.timestamp())
962
963         return epoch
964
965     def get_Nonce(self):
966         nonce_min = 0x7f000000
967         nonce_max = 0x7fffffff
968         v = random.randint(nonce_min, nonce_max)
969         return v
970
971     def get_pa_dict(self, pa_data):
972         pa_dict = {}
973
974         if pa_data is not None:
975             for pa in pa_data:
976                 pa_type = pa['padata-type']
977                 if pa_type in pa_dict:
978                     raise RuntimeError(f'Duplicate type {pa_type}')
979                 pa_dict[pa_type] = pa['padata-value']
980
981         return pa_dict
982
983     def SessionKey_create(self, etype, contents, kvno=None):
984         key = kcrypto.Key(etype, contents)
985         return Krb5EncryptionKey(key, kvno)
986
987     def PasswordKey_create(self, etype=None, pwd=None, salt=None, kvno=None):
988         self.assertIsNotNone(pwd)
989         self.assertIsNotNone(salt)
990         key = kcrypto.string_to_key(etype, pwd, salt)
991         return Krb5EncryptionKey(key, kvno)
992
993     def PasswordKey_from_etype_info2(self, creds, etype_info2, kvno=None):
994         e = etype_info2['etype']
995
996         salt = etype_info2.get('salt')
997
998         if e == kcrypto.Enctype.RC4:
999             nthash = creds.get_nt_hash()
1000             return self.SessionKey_create(etype=e, contents=nthash, kvno=kvno)
1001
1002         password = creds.get_password()
1003         return self.PasswordKey_create(
1004             etype=e, pwd=password, salt=salt, kvno=kvno)
1005
1006     def TicketDecryptionKey_from_creds(self, creds, etype=None):
1007
1008         if etype is None:
1009             etypes = creds.get_tgs_krb5_etypes()
1010             etype = etypes[0]
1011
1012         forced_key = creds.get_forced_key(etype)
1013         if forced_key is not None:
1014             return forced_key
1015
1016         kvno = creds.get_kvno()
1017
1018         fail_msg = ("%s has no fixed key for etype[%s] kvno[%s] "
1019                     "nor a password specified, " % (
1020                         creds.get_username(), etype, kvno))
1021
1022         if etype == kcrypto.Enctype.RC4:
1023             nthash = creds.get_nt_hash()
1024             self.assertIsNotNone(nthash, msg=fail_msg)
1025             return self.SessionKey_create(etype=etype,
1026                                           contents=nthash,
1027                                           kvno=kvno)
1028
1029         password = creds.get_password()
1030         self.assertIsNotNone(password, msg=fail_msg)
1031         salt = creds.get_salt()
1032         return self.PasswordKey_create(etype=etype,
1033                                        pwd=password,
1034                                        salt=salt,
1035                                        kvno=kvno)
1036
1037     def RandomKey(self, etype):
1038         e = kcrypto._get_enctype_profile(etype)
1039         contents = samba.generate_random_bytes(e.keysize)
1040         return self.SessionKey_create(etype=etype, contents=contents)
1041
1042     def EncryptionKey_import(self, EncryptionKey_obj):
1043         return self.SessionKey_create(EncryptionKey_obj['keytype'],
1044                                       EncryptionKey_obj['keyvalue'])
1045
1046     def EncryptedData_create(self, key, usage, plaintext):
1047         # EncryptedData   ::= SEQUENCE {
1048         #         etype   [0] Int32 -- EncryptionType --,
1049         #         kvno    [1] Int32 OPTIONAL,
1050         #         cipher  [2] OCTET STRING -- ciphertext
1051         # }
1052         ciphertext = key.encrypt(usage, plaintext)
1053         EncryptedData_obj = {
1054             'etype': key.etype,
1055             'cipher': ciphertext
1056         }
1057         if key.kvno is not None:
1058             EncryptedData_obj['kvno'] = key.kvno
1059         return EncryptedData_obj
1060
1061     def Checksum_create(self, key, usage, plaintext, ctype=None):
1062         # Checksum        ::= SEQUENCE {
1063         #        cksumtype       [0] Int32,
1064         #        checksum        [1] OCTET STRING
1065         # }
1066         if ctype is None:
1067             ctype = key.ctype
1068         checksum = key.make_checksum(usage, plaintext, ctype=ctype)
1069         Checksum_obj = {
1070             'cksumtype': ctype,
1071             'checksum': checksum,
1072         }
1073         return Checksum_obj
1074
1075     @classmethod
1076     def PrincipalName_create(cls, name_type, names):
1077         # PrincipalName   ::= SEQUENCE {
1078         #         name-type       [0] Int32,
1079         #         name-string     [1] SEQUENCE OF KerberosString
1080         # }
1081         PrincipalName_obj = {
1082             'name-type': name_type,
1083             'name-string': names,
1084         }
1085         return PrincipalName_obj
1086
1087     def AuthorizationData_create(self, ad_type, ad_data):
1088         # AuthorizationData ::= SEQUENCE {
1089         #         ad-type         [0] Int32,
1090         #         ad-data         [1] OCTET STRING
1091         # }
1092         AUTH_DATA_obj = {
1093             'ad-type': ad_type,
1094             'ad-data': ad_data
1095         }
1096         return AUTH_DATA_obj
1097
1098     def PA_DATA_create(self, padata_type, padata_value):
1099         # PA-DATA         ::= SEQUENCE {
1100         #         -- NOTE: first tag is [1], not [0]
1101         #         padata-type     [1] Int32,
1102         #         padata-value    [2] OCTET STRING -- might be encoded AP-REQ
1103         # }
1104         PA_DATA_obj = {
1105             'padata-type': padata_type,
1106             'padata-value': padata_value,
1107         }
1108         return PA_DATA_obj
1109
1110     def PA_ENC_TS_ENC_create(self, ts, usec):
1111         # PA-ENC-TS-ENC ::= SEQUENCE {
1112         #        patimestamp[0]          KerberosTime, -- client's time
1113         #        pausec[1]               krb5int32 OPTIONAL
1114         # }
1115         PA_ENC_TS_ENC_obj = {
1116             'patimestamp': ts,
1117             'pausec': usec,
1118         }
1119         return PA_ENC_TS_ENC_obj
1120
1121     def PA_PAC_OPTIONS_create(self, options):
1122         # PA-PAC-OPTIONS  ::= SEQUENCE {
1123         #         options         [0] PACOptionFlags
1124         # }
1125         PA_PAC_OPTIONS_obj = {
1126             'options': options
1127         }
1128         return PA_PAC_OPTIONS_obj
1129
1130     def KRB_FAST_ARMOR_create(self, armor_type, armor_value):
1131         # KrbFastArmor    ::= SEQUENCE {
1132         #         armor-type      [0] Int32,
1133         #         armor-value     [1] OCTET STRING,
1134         #         ...
1135         # }
1136         KRB_FAST_ARMOR_obj = {
1137             'armor-type': armor_type,
1138             'armor-value': armor_value
1139         }
1140         return KRB_FAST_ARMOR_obj
1141
1142     def KRB_FAST_REQ_create(self, fast_options, padata, req_body):
1143         # KrbFastReq      ::= SEQUENCE {
1144         #         fast-options    [0] FastOptions,
1145         #         padata          [1] SEQUENCE OF PA-DATA,
1146         #         req-body        [2] KDC-REQ-BODY,
1147         #         ...
1148         # }
1149         KRB_FAST_REQ_obj = {
1150             'fast-options': fast_options,
1151             'padata': padata,
1152             'req-body': req_body
1153         }
1154         return KRB_FAST_REQ_obj
1155
1156     def KRB_FAST_ARMORED_REQ_create(self, armor, req_checksum, enc_fast_req):
1157         # KrbFastArmoredReq ::= SEQUENCE {
1158         #         armor           [0] KrbFastArmor OPTIONAL,
1159         #         req-checksum    [1] Checksum,
1160         #         enc-fast-req    [2] EncryptedData -- KrbFastReq --
1161         # }
1162         KRB_FAST_ARMORED_REQ_obj = {
1163             'req-checksum': req_checksum,
1164             'enc-fast-req': enc_fast_req
1165         }
1166         if armor is not None:
1167             KRB_FAST_ARMORED_REQ_obj['armor'] = armor
1168         return KRB_FAST_ARMORED_REQ_obj
1169
1170     def PA_FX_FAST_REQUEST_create(self, armored_data):
1171         # PA-FX-FAST-REQUEST ::= CHOICE {
1172         #         armored-data    [0] KrbFastArmoredReq,
1173         #         ...
1174         # }
1175         PA_FX_FAST_REQUEST_obj = {
1176             'armored-data': armored_data
1177         }
1178         return PA_FX_FAST_REQUEST_obj
1179
1180     def KERB_PA_PAC_REQUEST_create(self, include_pac, pa_data_create=True):
1181         # KERB-PA-PAC-REQUEST ::= SEQUENCE {
1182         #         include-pac[0] BOOLEAN --If TRUE, and no pac present,
1183         #                                --    include PAC.
1184         #                                --If FALSE, and PAC present,
1185         #                                --    remove PAC.
1186         # }
1187         KERB_PA_PAC_REQUEST_obj = {
1188             'include-pac': include_pac,
1189         }
1190         if not pa_data_create:
1191             return KERB_PA_PAC_REQUEST_obj
1192         pa_pac = self.der_encode(KERB_PA_PAC_REQUEST_obj,
1193                                  asn1Spec=krb5_asn1.KERB_PA_PAC_REQUEST())
1194         pa_data = self.PA_DATA_create(PADATA_PAC_REQUEST, pa_pac)
1195         return pa_data
1196
1197     def get_pa_pac_options(self, options):
1198         pac_options = self.PA_PAC_OPTIONS_create(options)
1199         pac_options = self.der_encode(pac_options,
1200                                       asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
1201         pac_options = self.PA_DATA_create(PADATA_PAC_OPTIONS, pac_options)
1202
1203         return pac_options
1204
1205     def KDC_REQ_BODY_create(self,
1206                             kdc_options,
1207                             cname,
1208                             realm,
1209                             sname,
1210                             from_time,
1211                             till_time,
1212                             renew_time,
1213                             nonce,
1214                             etypes,
1215                             addresses,
1216                             additional_tickets,
1217                             EncAuthorizationData,
1218                             EncAuthorizationData_key,
1219                             EncAuthorizationData_usage,
1220                             asn1_print=None,
1221                             hexdump=None):
1222         # KDC-REQ-BODY    ::= SEQUENCE {
1223         #        kdc-options             [0] KDCOptions,
1224         #        cname                   [1] PrincipalName OPTIONAL
1225         #                                    -- Used only in AS-REQ --,
1226         #        realm                   [2] Realm
1227         #                                    -- Server's realm
1228         #                                    -- Also client's in AS-REQ --,
1229         #        sname                   [3] PrincipalName OPTIONAL,
1230         #        from                    [4] KerberosTime OPTIONAL,
1231         #        till                    [5] KerberosTime,
1232         #        rtime                   [6] KerberosTime OPTIONAL,
1233         #        nonce                   [7] UInt32,
1234         #        etype                   [8] SEQUENCE OF Int32
1235         #                                    -- EncryptionType
1236         #                                    -- in preference order --,
1237         #        addresses               [9] HostAddresses OPTIONAL,
1238         #        enc-authorization-data  [10] EncryptedData OPTIONAL
1239         #                                    -- AuthorizationData --,
1240         #        additional-tickets      [11] SEQUENCE OF Ticket OPTIONAL
1241         #                                        -- NOTE: not empty
1242         # }
1243         if EncAuthorizationData is not None:
1244             enc_ad_plain = self.der_encode(
1245                 EncAuthorizationData,
1246                 asn1Spec=krb5_asn1.AuthorizationData(),
1247                 asn1_print=asn1_print,
1248                 hexdump=hexdump)
1249             enc_ad = self.EncryptedData_create(EncAuthorizationData_key,
1250                                                EncAuthorizationData_usage,
1251                                                enc_ad_plain)
1252         else:
1253             enc_ad = None
1254         KDC_REQ_BODY_obj = {
1255             'kdc-options': kdc_options,
1256             'realm': realm,
1257             'till': till_time,
1258             'nonce': nonce,
1259             'etype': etypes,
1260         }
1261         if cname is not None:
1262             KDC_REQ_BODY_obj['cname'] = cname
1263         if sname is not None:
1264             KDC_REQ_BODY_obj['sname'] = sname
1265         if from_time is not None:
1266             KDC_REQ_BODY_obj['from'] = from_time
1267         if renew_time is not None:
1268             KDC_REQ_BODY_obj['rtime'] = renew_time
1269         if addresses is not None:
1270             KDC_REQ_BODY_obj['addresses'] = addresses
1271         if enc_ad is not None:
1272             KDC_REQ_BODY_obj['enc-authorization-data'] = enc_ad
1273         if additional_tickets is not None:
1274             KDC_REQ_BODY_obj['additional-tickets'] = additional_tickets
1275         return KDC_REQ_BODY_obj
1276
1277     def KDC_REQ_create(self,
1278                        msg_type,
1279                        padata,
1280                        req_body,
1281                        asn1Spec=None,
1282                        asn1_print=None,
1283                        hexdump=None):
1284         # KDC-REQ         ::= SEQUENCE {
1285         #        -- NOTE: first tag is [1], not [0]
1286         #        pvno            [1] INTEGER (5) ,
1287         #        msg-type        [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1288         #        padata          [3] SEQUENCE OF PA-DATA OPTIONAL
1289         #                            -- NOTE: not empty --,
1290         #        req-body        [4] KDC-REQ-BODY
1291         # }
1292         #
1293         KDC_REQ_obj = {
1294             'pvno': 5,
1295             'msg-type': msg_type,
1296             'req-body': req_body,
1297         }
1298         if padata is not None:
1299             KDC_REQ_obj['padata'] = padata
1300         if asn1Spec is not None:
1301             KDC_REQ_decoded = pyasn1_native_decode(
1302                 KDC_REQ_obj, asn1Spec=asn1Spec)
1303         else:
1304             KDC_REQ_decoded = None
1305         return KDC_REQ_obj, KDC_REQ_decoded
1306
1307     def AS_REQ_create(self,
1308                       padata,       # optional
1309                       kdc_options,  # required
1310                       cname,        # optional
1311                       realm,        # required
1312                       sname,        # optional
1313                       from_time,    # optional
1314                       till_time,    # required
1315                       renew_time,   # optional
1316                       nonce,        # required
1317                       etypes,       # required
1318                       addresses,    # optional
1319                       additional_tickets,
1320                       native_decoded_only=True,
1321                       asn1_print=None,
1322                       hexdump=None):
1323         # KDC-REQ         ::= SEQUENCE {
1324         #        -- NOTE: first tag is [1], not [0]
1325         #        pvno            [1] INTEGER (5) ,
1326         #        msg-type        [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1327         #        padata          [3] SEQUENCE OF PA-DATA OPTIONAL
1328         #                            -- NOTE: not empty --,
1329         #        req-body        [4] KDC-REQ-BODY
1330         # }
1331         #
1332         # KDC-REQ-BODY    ::= SEQUENCE {
1333         #        kdc-options             [0] KDCOptions,
1334         #        cname                   [1] PrincipalName OPTIONAL
1335         #                                    -- Used only in AS-REQ --,
1336         #        realm                   [2] Realm
1337         #                                    -- Server's realm
1338         #                                    -- Also client's in AS-REQ --,
1339         #        sname                   [3] PrincipalName OPTIONAL,
1340         #        from                    [4] KerberosTime OPTIONAL,
1341         #        till                    [5] KerberosTime,
1342         #        rtime                   [6] KerberosTime OPTIONAL,
1343         #        nonce                   [7] UInt32,
1344         #        etype                   [8] SEQUENCE OF Int32
1345         #                                    -- EncryptionType
1346         #                                    -- in preference order --,
1347         #        addresses               [9] HostAddresses OPTIONAL,
1348         #        enc-authorization-data  [10] EncryptedData OPTIONAL
1349         #                                    -- AuthorizationData --,
1350         #        additional-tickets      [11] SEQUENCE OF Ticket OPTIONAL
1351         #                                        -- NOTE: not empty
1352         # }
1353         KDC_REQ_BODY_obj = self.KDC_REQ_BODY_create(
1354             kdc_options,
1355             cname,
1356             realm,
1357             sname,
1358             from_time,
1359             till_time,
1360             renew_time,
1361             nonce,
1362             etypes,
1363             addresses,
1364             additional_tickets,
1365             EncAuthorizationData=None,
1366             EncAuthorizationData_key=None,
1367             EncAuthorizationData_usage=None,
1368             asn1_print=asn1_print,
1369             hexdump=hexdump)
1370         obj, decoded = self.KDC_REQ_create(
1371             msg_type=KRB_AS_REQ,
1372             padata=padata,
1373             req_body=KDC_REQ_BODY_obj,
1374             asn1Spec=krb5_asn1.AS_REQ(),
1375             asn1_print=asn1_print,
1376             hexdump=hexdump)
1377         if native_decoded_only:
1378             return decoded
1379         return decoded, obj
1380
1381     def AP_REQ_create(self, ap_options, ticket, authenticator):
1382         # AP-REQ          ::= [APPLICATION 14] SEQUENCE {
1383         #        pvno            [0] INTEGER (5),
1384         #        msg-type        [1] INTEGER (14),
1385         #        ap-options      [2] APOptions,
1386         #        ticket          [3] Ticket,
1387         #        authenticator   [4] EncryptedData -- Authenticator
1388         # }
1389         AP_REQ_obj = {
1390             'pvno': 5,
1391             'msg-type': KRB_AP_REQ,
1392             'ap-options': ap_options,
1393             'ticket': ticket,
1394             'authenticator': authenticator,
1395         }
1396         return AP_REQ_obj
1397
1398     def Authenticator_create(
1399             self, crealm, cname, cksum, cusec, ctime, subkey, seq_number,
1400             authorization_data):
1401         # -- Unencrypted authenticator
1402         # Authenticator   ::= [APPLICATION 2] SEQUENCE  {
1403         #        authenticator-vno       [0] INTEGER (5),
1404         #        crealm                  [1] Realm,
1405         #        cname                   [2] PrincipalName,
1406         #        cksum                   [3] Checksum OPTIONAL,
1407         #        cusec                   [4] Microseconds,
1408         #        ctime                   [5] KerberosTime,
1409         #        subkey                  [6] EncryptionKey OPTIONAL,
1410         #        seq-number              [7] UInt32 OPTIONAL,
1411         #        authorization-data      [8] AuthorizationData OPTIONAL
1412         # }
1413         Authenticator_obj = {
1414             'authenticator-vno': 5,
1415             'crealm': crealm,
1416             'cname': cname,
1417             'cusec': cusec,
1418             'ctime': ctime,
1419         }
1420         if cksum is not None:
1421             Authenticator_obj['cksum'] = cksum
1422         if subkey is not None:
1423             Authenticator_obj['subkey'] = subkey
1424         if seq_number is not None:
1425             Authenticator_obj['seq-number'] = seq_number
1426         if authorization_data is not None:
1427             Authenticator_obj['authorization-data'] = authorization_data
1428         return Authenticator_obj
1429
1430     def TGS_REQ_create(self,
1431                        padata,       # optional
1432                        cusec,
1433                        ctime,
1434                        ticket,
1435                        kdc_options,  # required
1436                        cname,        # optional
1437                        realm,        # required
1438                        sname,        # optional
1439                        from_time,    # optional
1440                        till_time,    # required
1441                        renew_time,   # optional
1442                        nonce,        # required
1443                        etypes,       # required
1444                        addresses,    # optional
1445                        EncAuthorizationData,
1446                        EncAuthorizationData_key,
1447                        additional_tickets,
1448                        ticket_session_key,
1449                        authenticator_subkey=None,
1450                        body_checksum_type=None,
1451                        native_decoded_only=True,
1452                        asn1_print=None,
1453                        hexdump=None):
1454         # KDC-REQ         ::= SEQUENCE {
1455         #        -- NOTE: first tag is [1], not [0]
1456         #        pvno            [1] INTEGER (5) ,
1457         #        msg-type        [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1458         #        padata          [3] SEQUENCE OF PA-DATA OPTIONAL
1459         #                            -- NOTE: not empty --,
1460         #        req-body        [4] KDC-REQ-BODY
1461         # }
1462         #
1463         # KDC-REQ-BODY    ::= SEQUENCE {
1464         #        kdc-options             [0] KDCOptions,
1465         #        cname                   [1] PrincipalName OPTIONAL
1466         #                                    -- Used only in AS-REQ --,
1467         #        realm                   [2] Realm
1468         #                                    -- Server's realm
1469         #                                    -- Also client's in AS-REQ --,
1470         #        sname                   [3] PrincipalName OPTIONAL,
1471         #        from                    [4] KerberosTime OPTIONAL,
1472         #        till                    [5] KerberosTime,
1473         #        rtime                   [6] KerberosTime OPTIONAL,
1474         #        nonce                   [7] UInt32,
1475         #        etype                   [8] SEQUENCE OF Int32
1476         #                                    -- EncryptionType
1477         #                                    -- in preference order --,
1478         #        addresses               [9] HostAddresses OPTIONAL,
1479         #        enc-authorization-data  [10] EncryptedData OPTIONAL
1480         #                                    -- AuthorizationData --,
1481         #        additional-tickets      [11] SEQUENCE OF Ticket OPTIONAL
1482         #                                        -- NOTE: not empty
1483         # }
1484
1485         if authenticator_subkey is not None:
1486             EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SUBKEY
1487         else:
1488             EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SESSION
1489
1490         req_body = self.KDC_REQ_BODY_create(
1491             kdc_options=kdc_options,
1492             cname=None,
1493             realm=realm,
1494             sname=sname,
1495             from_time=from_time,
1496             till_time=till_time,
1497             renew_time=renew_time,
1498             nonce=nonce,
1499             etypes=etypes,
1500             addresses=addresses,
1501             additional_tickets=additional_tickets,
1502             EncAuthorizationData=EncAuthorizationData,
1503             EncAuthorizationData_key=EncAuthorizationData_key,
1504             EncAuthorizationData_usage=EncAuthorizationData_usage)
1505         req_body_blob = self.der_encode(req_body,
1506                                         asn1Spec=krb5_asn1.KDC_REQ_BODY(),
1507                                         asn1_print=asn1_print, hexdump=hexdump)
1508
1509         req_body_checksum = self.Checksum_create(ticket_session_key,
1510                                                  KU_TGS_REQ_AUTH_CKSUM,
1511                                                  req_body_blob,
1512                                                  ctype=body_checksum_type)
1513
1514         subkey_obj = None
1515         if authenticator_subkey is not None:
1516             subkey_obj = authenticator_subkey.export_obj()
1517         seq_number = random.randint(0, 0xfffffffe)
1518         authenticator = self.Authenticator_create(
1519             crealm=realm,
1520             cname=cname,
1521             cksum=req_body_checksum,
1522             cusec=cusec,
1523             ctime=ctime,
1524             subkey=subkey_obj,
1525             seq_number=seq_number,
1526             authorization_data=None)
1527         authenticator = self.der_encode(
1528             authenticator,
1529             asn1Spec=krb5_asn1.Authenticator(),
1530             asn1_print=asn1_print,
1531             hexdump=hexdump)
1532
1533         authenticator = self.EncryptedData_create(
1534             ticket_session_key, KU_TGS_REQ_AUTH, authenticator)
1535
1536         ap_options = krb5_asn1.APOptions('0')
1537         ap_req = self.AP_REQ_create(ap_options=str(ap_options),
1538                                     ticket=ticket,
1539                                     authenticator=authenticator)
1540         ap_req = self.der_encode(ap_req, asn1Spec=krb5_asn1.AP_REQ(),
1541                                  asn1_print=asn1_print, hexdump=hexdump)
1542         pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
1543         if padata is not None:
1544             padata.append(pa_tgs_req)
1545         else:
1546             padata = [pa_tgs_req]
1547
1548         obj, decoded = self.KDC_REQ_create(
1549             msg_type=KRB_TGS_REQ,
1550             padata=padata,
1551             req_body=req_body,
1552             asn1Spec=krb5_asn1.TGS_REQ(),
1553             asn1_print=asn1_print,
1554             hexdump=hexdump)
1555         if native_decoded_only:
1556             return decoded
1557         return decoded, obj
1558
1559     def PA_S4U2Self_create(self, name, realm, tgt_session_key, ctype=None):
1560         # PA-S4U2Self     ::= SEQUENCE {
1561         #        name            [0] PrincipalName,
1562         #        realm           [1] Realm,
1563         #        cksum           [2] Checksum,
1564         #        auth            [3] GeneralString
1565         # }
1566         cksum_data = name['name-type'].to_bytes(4, byteorder='little')
1567         for n in name['name-string']:
1568             cksum_data += n.encode()
1569         cksum_data += realm.encode()
1570         cksum_data += "Kerberos".encode()
1571         cksum = self.Checksum_create(tgt_session_key,
1572                                      KU_NON_KERB_CKSUM_SALT,
1573                                      cksum_data,
1574                                      ctype)
1575
1576         PA_S4U2Self_obj = {
1577             'name': name,
1578             'realm': realm,
1579             'cksum': cksum,
1580             'auth': "Kerberos",
1581         }
1582         pa_s4u2self = self.der_encode(
1583             PA_S4U2Self_obj, asn1Spec=krb5_asn1.PA_S4U2Self())
1584         return self.PA_DATA_create(PADATA_FOR_USER, pa_s4u2self)
1585
    def _generic_kdc_exchange(self,
                              kdc_exchange_dict,  # required
                              cname=None,  # optional
                              realm=None,  # required
                              sname=None,  # optional
                              from_time=None,  # optional
                              till_time=None,  # required
                              renew_time=None,  # optional
                              etypes=None,  # required
                              addresses=None,  # optional
                              additional_tickets=None,  # optional
                              EncAuthorizationData=None,  # optional
                              EncAuthorizationData_key=None,  # optional
                              EncAuthorizationData_usage=None):  # optional
        """Build a KDC request, send it, and dispatch the reply to a checker.

        The request body is created with KDC_REQ_BODY_create() from the
        keyword arguments.  Everything else (generator and checker
        callbacks, message types, KDC options, PAC related padata and the
        per-field inner/outer body overrides) is read from
        kdc_exchange_dict.

        kdc_exchange_dict is updated as a side effect:
          'nonce'       - stored if it was not already present
          'req_padata'  - the padata placed in the outer request (or None)
          'fast_padata' - the padata handed to generate_fast_fn
          'req_body'    - the inner request body (after 'inner_req'
                          overrides)

        Exactly one of 'check_error_fn' and 'check_rep_fn' must be set;
        this is asserted after the reply has been received.

        Returns the return value of check_error_fn when the reply is a
        KRB_ERROR, otherwise the return value of check_rep_fn.
        """

        check_error_fn = kdc_exchange_dict['check_error_fn']
        check_rep_fn = kdc_exchange_dict['check_rep_fn']
        generate_fast_fn = kdc_exchange_dict['generate_fast_fn']
        generate_fast_armor_fn = kdc_exchange_dict['generate_fast_armor_fn']
        generate_fast_padata_fn = kdc_exchange_dict['generate_fast_padata_fn']
        generate_padata_fn = kdc_exchange_dict['generate_padata_fn']
        callback_dict = kdc_exchange_dict['callback_dict']
        req_msg_type = kdc_exchange_dict['req_msg_type']
        req_asn1Spec = kdc_exchange_dict['req_asn1Spec']
        rep_msg_type = kdc_exchange_dict['rep_msg_type']

        expected_error_mode = kdc_exchange_dict['expected_error_mode']
        kdc_options = kdc_exchange_dict['kdc_options']

        pac_request = kdc_exchange_dict['pac_request']
        pac_options = kdc_exchange_dict['pac_options']

        # Parameters specific to the inner request body
        inner_req = kdc_exchange_dict['inner_req']

        # Parameters specific to the outer request body
        outer_req = kdc_exchange_dict['outer_req']

        if till_time is None:
            till_time = self.get_KerberosTime(offset=36000)

        # Reuse a nonce supplied by the caller; otherwise generate one and
        # remember it in the dictionary so the reply checks can compare
        # against it.
        if 'nonce' in kdc_exchange_dict:
            nonce = kdc_exchange_dict['nonce']
        else:
            nonce = self.get_Nonce()
            kdc_exchange_dict['nonce'] = nonce

        req_body = self.KDC_REQ_BODY_create(
            kdc_options=kdc_options,
            cname=cname,
            realm=realm,
            sname=sname,
            from_time=from_time,
            till_time=till_time,
            renew_time=renew_time,
            nonce=nonce,
            etypes=etypes,
            addresses=addresses,
            additional_tickets=additional_tickets,
            EncAuthorizationData=EncAuthorizationData,
            EncAuthorizationData_key=EncAuthorizationData_key,
            EncAuthorizationData_usage=EncAuthorizationData_usage)

        # The inner body starts out as a shallow copy of the outer one.
        # Each override either replaces a field (value is not None) or
        # removes it from the body (value is None); removing a field that
        # is not present raises KeyError.
        inner_req_body = dict(req_body)
        if inner_req is not None:
            for key, value in inner_req.items():
                if value is not None:
                    inner_req_body[key] = value
                else:
                    del inner_req_body[key]
        if outer_req is not None:
            for key, value in outer_req.items():
                if value is not None:
                    req_body[key] = value
                else:
                    del req_body[key]

        # PAC related padata; where it ends up (FAST padata or outer
        # padata) is decided further down.
        additional_padata = []
        if pac_request is not None:
            pa_pac_request = self.KERB_PA_PAC_REQUEST_create(pac_request)
            additional_padata.append(pa_pac_request)
        if pac_options is not None:
            pa_pac_options = self.get_pa_pac_options(pac_options)
            additional_padata.append(pa_pac_options)

        if req_msg_type == KRB_AS_REQ:
            tgs_req = None
            tgs_req_padata = None
        else:
            self.assertEqual(KRB_TGS_REQ, req_msg_type)

            # The AP-REQ for the TGS-REQ is generated from the outer
            # request body as it stands at this point.
            tgs_req = self.generate_ap_req(kdc_exchange_dict,
                                           callback_dict,
                                           req_body,
                                           armor=False)
            tgs_req_padata = self.PA_DATA_create(PADATA_KDC_REQ, tgs_req)

        if generate_fast_padata_fn is not None:
            self.assertIsNotNone(generate_fast_fn)
            # This can alter req_body...
            fast_padata, req_body = generate_fast_padata_fn(kdc_exchange_dict,
                                                            callback_dict,
                                                            req_body)

            # Extends the list returned by the callback in place.
            fast_padata += additional_padata
        else:
            fast_padata = []

        if generate_fast_armor_fn is not None:
            self.assertIsNotNone(generate_fast_fn)
            fast_ap_req = generate_fast_armor_fn(kdc_exchange_dict,
                                                 callback_dict,
                                                 req_body,
                                                 armor=True)

            fast_armor_type = kdc_exchange_dict['fast_armor_type']
            fast_armor = self.KRB_FAST_ARMOR_create(fast_armor_type,
                                                    fast_ap_req)
        else:
            fast_armor = None

        if generate_padata_fn is not None:
            # This can alter req_body...
            outer_padata, req_body = generate_padata_fn(kdc_exchange_dict,
                                                        callback_dict,
                                                        req_body)
            self.assertIsNotNone(outer_padata)
            # The PADATA_KDC_REQ element is produced above, so the
            # callback must not supply one of its own.
            self.assertNotIn(PADATA_KDC_REQ,
                             [pa['padata-type'] for pa in outer_padata],
                             'Don\'t create TGS-REQ manually')
        else:
            outer_padata = None

        if generate_fast_fn is not None:
            armor_key = kdc_exchange_dict['armor_key']
            self.assertIsNotNone(armor_key)

            # The FAST request checksum covers the encoded outer request
            # body for an AS-REQ and the AP-REQ for a TGS-REQ.
            if req_msg_type == KRB_AS_REQ:
                checksum_blob = self.der_encode(
                    req_body,
                    asn1Spec=krb5_asn1.KDC_REQ_BODY())
            else:
                self.assertEqual(KRB_TGS_REQ, req_msg_type)
                checksum_blob = tgs_req

            checksum = self.Checksum_create(armor_key,
                                            KU_FAST_REQ_CHKSUM,
                                            checksum_blob)

            fast = generate_fast_fn(kdc_exchange_dict,
                                    callback_dict,
                                    inner_req_body,
                                    fast_padata,
                                    fast_armor,
                                    checksum)
        else:
            fast = None

        # Assemble the outer padata: TGS AP-REQ first, then the FAST
        # element, then whatever generate_padata_fn returned.
        padata = []

        if tgs_req_padata is not None:
            padata.append(tgs_req_padata)

        if fast is not None:
            padata.append(fast)

        if outer_padata is not None:
            padata += outer_padata

        # Without FAST the PAC related padata is sent in the outer
        # request.
        if fast is None:
            padata += additional_padata

        # An empty list is passed on as None.
        if not padata:
            padata = None

        # Record what is being sent so that the reply checkers can refer
        # to it.  Note that 'req_body' is the *inner* request body.
        kdc_exchange_dict['req_padata'] = padata
        kdc_exchange_dict['fast_padata'] = fast_padata
        kdc_exchange_dict['req_body'] = inner_req_body

        req_obj, req_decoded = self.KDC_REQ_create(msg_type=req_msg_type,
                                                   padata=padata,
                                                   req_body=req_body,
                                                   asn1Spec=req_asn1Spec())

        to_rodc = kdc_exchange_dict['to_rodc']

        rep = self.send_recv_transaction(req_decoded, to_rodc=to_rodc)
        self.assertIsNotNone(rep)

        msg_type = self.getElementValue(rep, 'msg-type')
        self.assertIsNotNone(msg_type)

        # Exactly one checker may be configured: an error checker needs a
        # non-empty expected_error_mode that does not contain 0, a reply
        # checker needs an empty one.
        expected_msg_type = None
        if check_error_fn is not None:
            expected_msg_type = KRB_ERROR
            self.assertIsNone(check_rep_fn)
            self.assertNotEqual(0, len(expected_error_mode))
            self.assertNotIn(0, expected_error_mode)
        if check_rep_fn is not None:
            expected_msg_type = rep_msg_type
            self.assertIsNone(check_error_fn)
            self.assertEqual(0, len(expected_error_mode))
        self.assertIsNotNone(expected_msg_type)
        self.assertEqual(msg_type, expected_msg_type)

        if msg_type == KRB_ERROR:
            return check_error_fn(kdc_exchange_dict,
                                  callback_dict,
                                  rep)

        return check_rep_fn(kdc_exchange_dict, callback_dict, rep)
1797
1798     def as_exchange_dict(self,
1799                          expected_crealm=None,
1800                          expected_cname=None,
1801                          expected_anon=False,
1802                          expected_srealm=None,
1803                          expected_sname=None,
1804                          expected_flags=None,
1805                          unexpected_flags=None,
1806                          ticket_decryption_key=None,
1807                          generate_fast_fn=None,
1808                          generate_fast_armor_fn=None,
1809                          generate_fast_padata_fn=None,
1810                          fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
1811                          generate_padata_fn=None,
1812                          check_error_fn=None,
1813                          check_rep_fn=None,
1814                          check_kdc_private_fn=None,
1815                          callback_dict=None,
1816                          expected_error_mode=0,
1817                          expected_status=None,
1818                          client_as_etypes=None,
1819                          expected_salt=None,
1820                          authenticator_subkey=None,
1821                          preauth_key=None,
1822                          armor_key=None,
1823                          armor_tgt=None,
1824                          armor_subkey=None,
1825                          auth_data=None,
1826                          kdc_options='',
1827                          inner_req=None,
1828                          outer_req=None,
1829                          pac_request=None,
1830                          pac_options=None,
1831                          to_rodc=False):
1832         if expected_error_mode == 0:
1833             expected_error_mode = ()
1834         elif not isinstance(expected_error_mode, collections.abc.Container):
1835             expected_error_mode = (expected_error_mode,)
1836
1837         kdc_exchange_dict = {
1838             'req_msg_type': KRB_AS_REQ,
1839             'req_asn1Spec': krb5_asn1.AS_REQ,
1840             'rep_msg_type': KRB_AS_REP,
1841             'rep_asn1Spec': krb5_asn1.AS_REP,
1842             'rep_encpart_asn1Spec': krb5_asn1.EncASRepPart,
1843             'expected_crealm': expected_crealm,
1844             'expected_cname': expected_cname,
1845             'expected_anon': expected_anon,
1846             'expected_srealm': expected_srealm,
1847             'expected_sname': expected_sname,
1848             'expected_flags': expected_flags,
1849             'unexpected_flags': unexpected_flags,
1850             'ticket_decryption_key': ticket_decryption_key,
1851             'generate_fast_fn': generate_fast_fn,
1852             'generate_fast_armor_fn': generate_fast_armor_fn,
1853             'generate_fast_padata_fn': generate_fast_padata_fn,
1854             'fast_armor_type': fast_armor_type,
1855             'generate_padata_fn': generate_padata_fn,
1856             'check_error_fn': check_error_fn,
1857             'check_rep_fn': check_rep_fn,
1858             'check_kdc_private_fn': check_kdc_private_fn,
1859             'callback_dict': callback_dict,
1860             'expected_error_mode': expected_error_mode,
1861             'expected_status': expected_status,
1862             'client_as_etypes': client_as_etypes,
1863             'expected_salt': expected_salt,
1864             'authenticator_subkey': authenticator_subkey,
1865             'preauth_key': preauth_key,
1866             'armor_key': armor_key,
1867             'armor_tgt': armor_tgt,
1868             'armor_subkey': armor_subkey,
1869             'auth_data': auth_data,
1870             'kdc_options': kdc_options,
1871             'inner_req': inner_req,
1872             'outer_req': outer_req,
1873             'pac_request': pac_request,
1874             'pac_options': pac_options,
1875             'to_rodc': to_rodc
1876         }
1877         if callback_dict is None:
1878             callback_dict = {}
1879
1880         return kdc_exchange_dict
1881
1882     def tgs_exchange_dict(self,
1883                           expected_crealm=None,
1884                           expected_cname=None,
1885                           expected_anon=False,
1886                           expected_srealm=None,
1887                           expected_sname=None,
1888                           expected_flags=None,
1889                           unexpected_flags=None,
1890                           ticket_decryption_key=None,
1891                           generate_fast_fn=None,
1892                           generate_fast_armor_fn=None,
1893                           generate_fast_padata_fn=None,
1894                           fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
1895                           generate_padata_fn=None,
1896                           check_error_fn=None,
1897                           check_rep_fn=None,
1898                           check_kdc_private_fn=None,
1899                           expected_error_mode=0,
1900                           expected_status=None,
1901                           callback_dict=None,
1902                           tgt=None,
1903                           armor_key=None,
1904                           armor_tgt=None,
1905                           armor_subkey=None,
1906                           authenticator_subkey=None,
1907                           auth_data=None,
1908                           body_checksum_type=None,
1909                           kdc_options='',
1910                           inner_req=None,
1911                           outer_req=None,
1912                           pac_request=None,
1913                           pac_options=None,
1914                           to_rodc=False):
1915         if expected_error_mode == 0:
1916             expected_error_mode = ()
1917         elif not isinstance(expected_error_mode, collections.abc.Container):
1918             expected_error_mode = (expected_error_mode,)
1919
1920         kdc_exchange_dict = {
1921             'req_msg_type': KRB_TGS_REQ,
1922             'req_asn1Spec': krb5_asn1.TGS_REQ,
1923             'rep_msg_type': KRB_TGS_REP,
1924             'rep_asn1Spec': krb5_asn1.TGS_REP,
1925             'rep_encpart_asn1Spec': krb5_asn1.EncTGSRepPart,
1926             'expected_crealm': expected_crealm,
1927             'expected_cname': expected_cname,
1928             'expected_anon': expected_anon,
1929             'expected_srealm': expected_srealm,
1930             'expected_sname': expected_sname,
1931             'expected_flags': expected_flags,
1932             'unexpected_flags': unexpected_flags,
1933             'ticket_decryption_key': ticket_decryption_key,
1934             'generate_fast_fn': generate_fast_fn,
1935             'generate_fast_armor_fn': generate_fast_armor_fn,
1936             'generate_fast_padata_fn': generate_fast_padata_fn,
1937             'fast_armor_type': fast_armor_type,
1938             'generate_padata_fn': generate_padata_fn,
1939             'check_error_fn': check_error_fn,
1940             'check_rep_fn': check_rep_fn,
1941             'check_kdc_private_fn': check_kdc_private_fn,
1942             'callback_dict': callback_dict,
1943             'expected_error_mode': expected_error_mode,
1944             'expected_status': expected_status,
1945             'tgt': tgt,
1946             'body_checksum_type': body_checksum_type,
1947             'armor_key': armor_key,
1948             'armor_tgt': armor_tgt,
1949             'armor_subkey': armor_subkey,
1950             'auth_data': auth_data,
1951             'authenticator_subkey': authenticator_subkey,
1952             'kdc_options': kdc_options,
1953             'inner_req': inner_req,
1954             'outer_req': outer_req,
1955             'pac_request': pac_request,
1956             'pac_options': pac_options,
1957             'to_rodc': to_rodc
1958         }
1959         if callback_dict is None:
1960             callback_dict = {}
1961
1962         return kdc_exchange_dict
1963
1964     def generic_check_kdc_rep(self,
1965                               kdc_exchange_dict,
1966                               callback_dict,
1967                               rep):
1968
1969         expected_crealm = kdc_exchange_dict['expected_crealm']
1970         expected_anon = kdc_exchange_dict['expected_anon']
1971         expected_srealm = kdc_exchange_dict['expected_srealm']
1972         expected_sname = kdc_exchange_dict['expected_sname']
1973         ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
1974         check_kdc_private_fn = kdc_exchange_dict['check_kdc_private_fn']
1975         rep_encpart_asn1Spec = kdc_exchange_dict['rep_encpart_asn1Spec']
1976         msg_type = kdc_exchange_dict['rep_msg_type']
1977         armor_key = kdc_exchange_dict['armor_key']
1978
1979         self.assertElementEqual(rep, 'msg-type', msg_type)  # AS-REP | TGS-REP
1980         padata = self.getElementValue(rep, 'padata')
1981         if self.strict_checking:
1982             self.assertElementEqualUTF8(rep, 'crealm', expected_crealm)
1983             if expected_anon:
1984                 expected_cname = self.PrincipalName_create(
1985                     name_type=NT_WELLKNOWN,
1986                     names=['WELLKNOWN', 'ANONYMOUS'])
1987             else:
1988                 expected_cname = kdc_exchange_dict['expected_cname']
1989             self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
1990         self.assertElementPresent(rep, 'ticket')
1991         ticket = self.getElementValue(rep, 'ticket')
1992         ticket_encpart = None
1993         ticket_cipher = None
1994         self.assertIsNotNone(ticket)
1995         if ticket is not None:  # Never None, but gives indentation
1996             self.assertElementEqual(ticket, 'tkt-vno', 5)
1997             self.assertElementEqualUTF8(ticket, 'realm', expected_srealm)
1998             self.assertElementEqualPrincipal(ticket, 'sname', expected_sname)
1999             self.assertElementPresent(ticket, 'enc-part')
2000             ticket_encpart = self.getElementValue(ticket, 'enc-part')
2001             self.assertIsNotNone(ticket_encpart)
2002             if ticket_encpart is not None:  # Never None, but gives indentation
2003                 self.assertElementPresent(ticket_encpart, 'etype')
2004                 # 'unspecified' means present, with any value != 0
2005                 self.assertElementKVNO(ticket_encpart, 'kvno',
2006                                        self.unspecified_kvno)
2007                 self.assertElementPresent(ticket_encpart, 'cipher')
2008                 ticket_cipher = self.getElementValue(ticket_encpart, 'cipher')
2009         self.assertElementPresent(rep, 'enc-part')
2010         encpart = self.getElementValue(rep, 'enc-part')
2011         encpart_cipher = None
2012         self.assertIsNotNone(encpart)
2013         if encpart is not None:  # Never None, but gives indentation
2014             self.assertElementPresent(encpart, 'etype')
2015             self.assertElementKVNO(ticket_encpart, 'kvno', 'autodetect')
2016             self.assertElementPresent(encpart, 'cipher')
2017             encpart_cipher = self.getElementValue(encpart, 'cipher')
2018
2019         ticket_checksum = None
2020
2021         # Get the decryption key for the encrypted part
2022         encpart_decryption_key, encpart_decryption_usage = (
2023             self.get_preauth_key(kdc_exchange_dict))
2024
2025         if armor_key is not None:
2026             pa_dict = self.get_pa_dict(padata)
2027
2028             if PADATA_FX_FAST in pa_dict:
2029                 fx_fast_data = pa_dict[PADATA_FX_FAST]
2030                 fast_response = self.check_fx_fast_data(kdc_exchange_dict,
2031                                                         fx_fast_data,
2032                                                         armor_key,
2033                                                         finished=True)
2034
2035                 if 'strengthen-key' in fast_response:
2036                     strengthen_key = self.EncryptionKey_import(
2037                         fast_response['strengthen-key'])
2038                     encpart_decryption_key = (
2039                         self.generate_strengthen_reply_key(
2040                             strengthen_key,
2041                             encpart_decryption_key))
2042
2043                 fast_finished = fast_response.get('finished')
2044                 if fast_finished is not None:
2045                     ticket_checksum = fast_finished['ticket-checksum']
2046
2047                 self.check_rep_padata(kdc_exchange_dict,
2048                                       callback_dict,
2049                                       rep,
2050                                       fast_response['padata'],
2051                                       error_code=0)
2052
2053         ticket_private = None
2054         if ticket_decryption_key is not None:
2055             self.assertElementEqual(ticket_encpart, 'etype',
2056                                     ticket_decryption_key.etype)
2057             self.assertElementKVNO(ticket_encpart, 'kvno',
2058                                    ticket_decryption_key.kvno)
2059             ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET,
2060                                                            ticket_cipher)
2061             ticket_private = self.der_decode(
2062                 ticket_decpart,
2063                 asn1Spec=krb5_asn1.EncTicketPart())
2064
2065         encpart_private = None
2066         self.assertIsNotNone(encpart_decryption_key)
2067         if encpart_decryption_key is not None:
2068             self.assertElementEqual(encpart, 'etype',
2069                                     encpart_decryption_key.etype)
2070             if self.strict_checking:
2071                 self.assertElementKVNO(encpart, 'kvno',
2072                                        encpart_decryption_key.kvno)
2073             rep_decpart = encpart_decryption_key.decrypt(
2074                 encpart_decryption_usage,
2075                 encpart_cipher)
2076             # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
2077             # application tag 26
2078             try:
2079                 encpart_private = self.der_decode(
2080                     rep_decpart,
2081                     asn1Spec=rep_encpart_asn1Spec())
2082             except Exception:
2083                 encpart_private = self.der_decode(
2084                     rep_decpart,
2085                     asn1Spec=krb5_asn1.EncTGSRepPart())
2086
2087         self.assertIsNotNone(check_kdc_private_fn)
2088         if check_kdc_private_fn is not None:
2089             check_kdc_private_fn(kdc_exchange_dict, callback_dict,
2090                                  rep, ticket_private, encpart_private,
2091                                  ticket_checksum)
2092
2093         return rep
2094
2095     def check_fx_fast_data(self,
2096                            kdc_exchange_dict,
2097                            fx_fast_data,
2098                            armor_key,
2099                            finished=False,
2100                            expect_strengthen_key=True):
2101         fx_fast_data = self.der_decode(fx_fast_data,
2102                                        asn1Spec=krb5_asn1.PA_FX_FAST_REPLY())
2103
2104         enc_fast_rep = fx_fast_data['armored-data']['enc-fast-rep']
2105         self.assertEqual(enc_fast_rep['etype'], armor_key.etype)
2106
2107         fast_rep = armor_key.decrypt(KU_FAST_REP, enc_fast_rep['cipher'])
2108
2109         fast_response = self.der_decode(fast_rep,
2110                                         asn1Spec=krb5_asn1.KrbFastResponse())
2111
2112         if expect_strengthen_key and self.strict_checking:
2113             self.assertIn('strengthen-key', fast_response)
2114
2115         if finished:
2116             self.assertIn('finished', fast_response)
2117
2118         # Ensure that the nonce matches the nonce in the body of the request
2119         # (RFC6113 5.4.3).
2120         nonce = kdc_exchange_dict['nonce']
2121         self.assertEqual(nonce, fast_response['nonce'])
2122
2123         return fast_response
2124
2125     def generic_check_kdc_private(self,
2126                                   kdc_exchange_dict,
2127                                   callback_dict,
2128                                   rep,
2129                                   ticket_private,
2130                                   encpart_private,
2131                                   ticket_checksum):
2132         kdc_options = kdc_exchange_dict['kdc_options']
2133         canon_pos = len(tuple(krb5_asn1.KDCOptions('canonicalize'))) - 1
2134         canonicalize = (canon_pos < len(kdc_options)
2135                         and kdc_options[canon_pos] == '1')
2136         renewable_pos = len(tuple(krb5_asn1.KDCOptions('renewable'))) - 1
2137         renewable = (renewable_pos < len(kdc_options)
2138                      and kdc_options[renewable_pos] == '1')
2139
2140         expected_crealm = kdc_exchange_dict['expected_crealm']
2141         expected_cname = kdc_exchange_dict['expected_cname']
2142         expected_srealm = kdc_exchange_dict['expected_srealm']
2143         expected_sname = kdc_exchange_dict['expected_sname']
2144         ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
2145
2146         rep_msg_type = kdc_exchange_dict['rep_msg_type']
2147
2148         expected_flags = kdc_exchange_dict.get('expected_flags')
2149         unexpected_flags = kdc_exchange_dict.get('unexpected_flags')
2150
2151         ticket = self.getElementValue(rep, 'ticket')
2152
2153         if ticket_checksum is not None:
2154             armor_key = kdc_exchange_dict['armor_key']
2155             self.verify_ticket_checksum(ticket, ticket_checksum, armor_key)
2156
2157         ticket_session_key = None
2158         if ticket_private is not None:
2159             self.assertElementFlags(ticket_private, 'flags',
2160                                     expected_flags,
2161                                     unexpected_flags)
2162             self.assertElementPresent(ticket_private, 'key')
2163             ticket_key = self.getElementValue(ticket_private, 'key')
2164             self.assertIsNotNone(ticket_key)
2165             if ticket_key is not None:  # Never None, but gives indentation
2166                 self.assertElementPresent(ticket_key, 'keytype')
2167                 self.assertElementPresent(ticket_key, 'keyvalue')
2168                 ticket_session_key = self.EncryptionKey_import(ticket_key)
2169             self.assertElementEqualUTF8(ticket_private, 'crealm',
2170                                         expected_crealm)
2171             if self.strict_checking:
2172                 self.assertElementEqualPrincipal(ticket_private, 'cname',
2173                                                  expected_cname)
2174             self.assertElementPresent(ticket_private, 'transited')
2175             self.assertElementPresent(ticket_private, 'authtime')
2176             if self.strict_checking:
2177                 self.assertElementPresent(ticket_private, 'starttime')
2178             self.assertElementPresent(ticket_private, 'endtime')
2179             if renewable:
2180                 if self.strict_checking:
2181                     self.assertElementPresent(ticket_private, 'renew-till')
2182             else:
2183                 self.assertElementMissing(ticket_private, 'renew-till')
2184             if self.strict_checking:
2185                 self.assertElementEqual(ticket_private, 'caddr', [])
2186             self.assertElementPresent(ticket_private, 'authorization-data')
2187
2188         encpart_session_key = None
2189         if encpart_private is not None:
2190             self.assertElementPresent(encpart_private, 'key')
2191             encpart_key = self.getElementValue(encpart_private, 'key')
2192             self.assertIsNotNone(encpart_key)
2193             if encpart_key is not None:  # Never None, but gives indentation
2194                 self.assertElementPresent(encpart_key, 'keytype')
2195                 self.assertElementPresent(encpart_key, 'keyvalue')
2196                 encpart_session_key = self.EncryptionKey_import(encpart_key)
2197             self.assertElementPresent(encpart_private, 'last-req')
2198             self.assertElementEqual(encpart_private, 'nonce',
2199                                     kdc_exchange_dict['nonce'])
2200             if rep_msg_type == KRB_AS_REP:
2201                 if self.strict_checking:
2202                     self.assertElementPresent(encpart_private,
2203                                               'key-expiration')
2204             else:
2205                 self.assertElementMissing(encpart_private,
2206                                           'key-expiration')
2207             self.assertElementFlags(encpart_private, 'flags',
2208                                     expected_flags,
2209                                     unexpected_flags)
2210             self.assertElementPresent(encpart_private, 'authtime')
2211             if self.strict_checking:
2212                 self.assertElementPresent(encpart_private, 'starttime')
2213             self.assertElementPresent(encpart_private, 'endtime')
2214             if renewable:
2215                 if self.strict_checking:
2216                     self.assertElementPresent(encpart_private, 'renew-till')
2217             else:
2218                 self.assertElementMissing(encpart_private, 'renew-till')
2219             self.assertElementEqualUTF8(encpart_private, 'srealm',
2220                                         expected_srealm)
2221             self.assertElementEqualPrincipal(encpart_private, 'sname',
2222                                              expected_sname)
2223             if self.strict_checking:
2224                 self.assertElementEqual(encpart_private, 'caddr', [])
2225
2226             sent_claims = self.sent_claims(kdc_exchange_dict)
2227
2228             if self.strict_checking:
2229                 if sent_claims or canonicalize:
2230                     self.assertElementPresent(encpart_private,
2231                                               'encrypted-pa-data')
2232                     enc_pa_dict = self.get_pa_dict(
2233                         encpart_private['encrypted-pa-data'])
2234                     if canonicalize:
2235                         self.assertIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)
2236
2237                         (supported_etypes,) = struct.unpack(
2238                             '<L',
2239                             enc_pa_dict[PADATA_SUPPORTED_ETYPES])
2240
2241                         self.assertTrue(
2242                             security.KERB_ENCTYPE_FAST_SUPPORTED
2243                             & supported_etypes)
2244                         self.assertTrue(
2245                             security.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED
2246                             & supported_etypes)
2247                         self.assertTrue(
2248                             security.KERB_ENCTYPE_CLAIMS_SUPPORTED
2249                             & supported_etypes)
2250                     else:
2251                         self.assertNotIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)
2252
2253                     # ClaimsCompIdFASTSupported registry key
2254                     if sent_claims:
2255                         self.assertIn(PADATA_PAC_OPTIONS, enc_pa_dict)
2256
2257                         self.check_pac_options_claims_support(
2258                             enc_pa_dict[PADATA_PAC_OPTIONS])
2259                     else:
2260                         self.assertNotIn(PADATA_PAC_OPTIONS, enc_pa_dict)
2261                 else:
2262                     self.assertElementEqual(encpart_private,
2263                                             'encrypted-pa-data',
2264                                             [])
2265
2266         if ticket_session_key is not None and encpart_session_key is not None:
2267             self.assertEqual(ticket_session_key.etype,
2268                              encpart_session_key.etype)
2269             self.assertEqual(ticket_session_key.key.contents,
2270                              encpart_session_key.key.contents)
2271         if encpart_session_key is not None:
2272             session_key = encpart_session_key
2273         else:
2274             session_key = ticket_session_key
2275         ticket_creds = KerberosTicketCreds(
2276             ticket,
2277             session_key,
2278             crealm=expected_crealm,
2279             cname=expected_cname,
2280             srealm=expected_srealm,
2281             sname=expected_sname,
2282             decryption_key=ticket_decryption_key,
2283             ticket_private=ticket_private,
2284             encpart_private=encpart_private)
2285
2286         kdc_exchange_dict['rep_ticket_creds'] = ticket_creds
2287
2288     def check_pac_options_claims_support(self, pac_options):
2289         pac_options = self.der_decode(pac_options,
2290                                       asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
2291         self.assertEqual('1', pac_options['options'][0])  # claims bit
2292
    def generic_check_kdc_error(self,
                                kdc_exchange_dict,
                                callback_dict,
                                rep,
                                inner=False):
        """Validate a KRB-ERROR reply and, when present, its e-data.

        Checks the fixed fields of ``rep`` (pvno, msg-type, error-code, the
        server time stamps and, under strict checking, the client/server
        names, realm and e-text).  When e-data is expected and present it is
        decoded into padata, a FAST wrapper is unwrapped if FAST was sent,
        and the padata are handed to check_rep_padata().  The value returned
        by that call is stored in kdc_exchange_dict['preauth_etype_info2'].

        :param kdc_exchange_dict: per-exchange state.  Keys read here:
            'rep_msg_type', 'expected_anon', 'expected_srealm',
            'expected_sname', 'expected_error_mode', 'fast_armor_type' and,
            when FAST was sent and e-data is present, 'armor_key'.
        :param callback_dict: passed through unchanged to check_rep_padata().
        :param rep: the decoded KRB-ERROR to check.
        :param inner: when true, cname is expected to be absent (strict
            checking) and e-data must be missing; check_rep_padata() passes
            True for the error it decodes from a PADATA_FX_ERROR value.
        :return: ``rep`` itself.
        """

        rep_msg_type = kdc_exchange_dict['rep_msg_type']

        expected_anon = kdc_exchange_dict['expected_anon']
        expected_srealm = kdc_exchange_dict['expected_srealm']
        expected_sname = kdc_exchange_dict['expected_sname']
        expected_error_mode = kdc_exchange_dict['expected_error_mode']

        sent_fast = self.sent_fast(kdc_exchange_dict)

        fast_armor_type = kdc_exchange_dict['fast_armor_type']

        self.assertElementEqual(rep, 'pvno', 5)
        self.assertElementEqual(rep, 'msg-type', KRB_ERROR)
        error_code = self.getElementValue(rep, 'error-code')
        # expected_error_mode is used as a container of acceptable codes.
        self.assertIn(error_code, expected_error_mode)
        if self.strict_checking:
            self.assertElementMissing(rep, 'ctime')
            self.assertElementMissing(rep, 'cusec')
        self.assertElementPresent(rep, 'stime')
        self.assertElementPresent(rep, 'susec')
        # error-code checked above
        if self.strict_checking:
            self.assertElementMissing(rep, 'crealm')
            # Only the outer error of an anonymous exchange carries a cname,
            # and it must be the well-known anonymous principal.
            if expected_anon and not inner:
                expected_cname = self.PrincipalName_create(
                    name_type=NT_WELLKNOWN,
                    names=['WELLKNOWN', 'ANONYMOUS'])
                self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
            else:
                self.assertElementMissing(rep, 'cname')
            self.assertElementEqualUTF8(rep, 'realm', expected_srealm)
            # A generic error on a FAST request is expected to name the
            # krbtgt principal rather than the requested service.
            if sent_fast and error_code == KDC_ERR_GENERIC:
                self.assertElementEqualPrincipal(rep, 'sname',
                                                 self.get_krbtgt_sname())
            else:
                self.assertElementEqualPrincipal(rep, 'sname', expected_sname)
            self.assertElementMissing(rep, 'e-text')
        # Cases in which no e-data is expected at all: nothing more to check.
        if (error_code == KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS
                or (rep_msg_type == KRB_TGS_REP
                    and not sent_fast)
                or (sent_fast and fast_armor_type is not None
                    and fast_armor_type != FX_FAST_ARMOR_AP_REQUEST)
                or inner):
            self.assertElementMissing(rep, 'e-data')
            return rep
        edata = self.getElementValue(rep, 'e-data')
        if self.strict_checking:
            if error_code != KDC_ERR_GENERIC:
                # Predicting whether an ERR_GENERIC error contains e-data is
                # more complicated.
                self.assertIsNotNone(edata)
        if edata is not None:
            # NOTE(review): the early return above already handles
            # "TGS-REP without FAST", so the first branch below looks
            # unreachable here - confirm before relying on it.
            if rep_msg_type == KRB_TGS_REP and not sent_fast:
                rep_padata = [self.der_decode(edata,
                                              asn1Spec=krb5_asn1.PA_DATA())]
            else:
                rep_padata = self.der_decode(edata,
                                             asn1Spec=krb5_asn1.METHOD_DATA())
            self.assertGreater(len(rep_padata), 0)

            if sent_fast:
                # With FAST the outer e-data must hold exactly one element,
                # the PA-FX-FAST wrapper; the padata to check are the ones
                # carried inside the FAST response.
                self.assertEqual(1, len(rep_padata))
                rep_pa_dict = self.get_pa_dict(rep_padata)
                self.assertIn(PADATA_FX_FAST, rep_pa_dict)

                armor_key = kdc_exchange_dict['armor_key']
                self.assertIsNotNone(armor_key)
                fast_response = self.check_fx_fast_data(
                    kdc_exchange_dict,
                    rep_pa_dict[PADATA_FX_FAST],
                    armor_key,
                    expect_strengthen_key=False)

                rep_padata = fast_response['padata']

            etype_info2 = self.check_rep_padata(kdc_exchange_dict,
                                                callback_dict,
                                                rep,
                                                rep_padata,
                                                error_code)

            # Remember the result so later steps of the exchange can read it.
            kdc_exchange_dict['preauth_etype_info2'] = etype_info2

        return rep
2383
    def check_rep_padata(self,
                         kdc_exchange_dict,
                         callback_dict,
                         rep,
                         rep_padata,
                         error_code):
        """Check the padata of a KDC reply against what the request implies.

        Derives the expected padata types (in order) from the request body,
        from whether FAST / an encrypted challenge / claims were sent, and
        from ``error_code``; compares them with ``rep_padata`` under strict
        checking; then validates the individual padata values.

        Side effects: a PADATA_FX_COOKIE value is stored in
        kdc_exchange_dict['fast_cookie'], and a PADATA_FX_ERROR value is
        decoded as a KRB-ERROR and checked with
        generic_check_kdc_error(..., inner=True).

        :param kdc_exchange_dict: per-exchange state.  Keys read here include
            'rep_msg_type', 'req_body', 'client_as_etypes' (optional),
            'expected_status' (optional unless PW-SALT is present),
            'armor_key' and 'expected_salt'.
        :param callback_dict: only forwarded to generic_check_kdc_error().
        :param rep: not referenced in this method's body.
        :param rep_padata: sequence of padata elements to check.
        :param error_code: error code of the reply; compared against 0,
            KDC_ERR_GENERIC and KDC_ERR_PREAUTH_FAILED.
        :return: the decoded ETYPE-INFO2 value, or None when none was found
            or none could be expected.
        """
        rep_msg_type = kdc_exchange_dict['rep_msg_type']

        req_body = kdc_exchange_dict['req_body']
        proposed_etypes = req_body['etype']
        client_as_etypes = kdc_exchange_dict.get('client_as_etypes', [])

        sent_fast = self.sent_fast(kdc_exchange_dict)
        sent_enc_challenge = self.sent_enc_challenge(kdc_exchange_dict)

        if rep_msg_type == KRB_TGS_REP:
            self.assertTrue(sent_fast)

        # Work out which etype-info padata to expect.  ETYPE-INFO is only
        # expected when RC4 is proposed and no AES type is proposed; the
        # ETYPE-INFO2 list is built from proposed etypes that also appear in
        # client_as_etypes (largest AES type first, then RC4).
        expect_etype_info2 = ()
        expect_etype_info = False
        unexpect_etype_info = True
        expected_aes_type = 0
        expected_rc4_type = 0
        if kcrypto.Enctype.RC4 in proposed_etypes:
            expect_etype_info = True
        for etype in proposed_etypes:
            if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
                expect_etype_info = False
            if etype not in client_as_etypes:
                continue
            if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
                if etype > expected_aes_type:
                    expected_aes_type = etype
            if etype in (kcrypto.Enctype.RC4,) and error_code != 0:
                unexpect_etype_info = False
                if etype > expected_rc4_type:
                    expected_rc4_type = etype

        if expected_aes_type != 0:
            expect_etype_info2 += (expected_aes_type,)
        if expected_rc4_type != 0:
            expect_etype_info2 += (expected_rc4_type,)

        # Build the expected padata types.  Order matters: under strict
        # checking the reply is compared position by position.
        expected_patypes = ()
        if sent_fast and error_code != 0:
            expected_patypes += (PADATA_FX_ERROR,)
            expected_patypes += (PADATA_FX_COOKIE,)

        if rep_msg_type == KRB_TGS_REP:
            # NOTE(review): sent_fast was asserted true for TGS-REP above, so
            # the PW-SALT branch below looks unreachable - confirm.
            if not sent_fast and error_code != 0:
                expected_patypes += (PADATA_PW_SALT,)
            else:
                sent_claims = self.sent_claims(kdc_exchange_dict)
                if sent_claims and error_code not in (0, KDC_ERR_GENERIC):
                    expected_patypes += (PADATA_PAC_OPTIONS,)
        elif error_code != KDC_ERR_GENERIC:
            if expect_etype_info:
                self.assertGreater(len(expect_etype_info2), 0)
                expected_patypes += (PADATA_ETYPE_INFO,)
            if len(expect_etype_info2) != 0:
                expected_patypes += (PADATA_ETYPE_INFO2,)

            if error_code != KDC_ERR_PREAUTH_FAILED:
                if sent_fast:
                    expected_patypes += (PADATA_ENCRYPTED_CHALLENGE,)
                else:
                    expected_patypes += (PADATA_ENC_TIMESTAMP,)

                if not sent_enc_challenge:
                    expected_patypes += (PADATA_PK_AS_REQ,)
                    expected_patypes += (PADATA_PK_AS_REP_19,)

            if (self.kdc_fast_support
                    and not sent_fast
                    and not sent_enc_challenge):
                expected_patypes += (PADATA_FX_FAST,)
                expected_patypes += (PADATA_FX_COOKIE,)

        if self.strict_checking:
            # NOTE(review): if rep_padata is shorter than expected_patypes,
            # rep_padata[i] may raise IndexError before the length assertion
            # below is reached (depends on the sequence type) - confirm.
            for i, patype in enumerate(expected_patypes):
                self.assertElementEqual(rep_padata[i], 'padata-type', patype)
            self.assertEqual(len(rep_padata), len(expected_patypes))

        # Collect each known padata value, asserting that no type occurs
        # twice.  Unknown padata types are silently skipped.
        etype_info2 = None
        etype_info = None
        enc_timestamp = None
        enc_challenge = None
        pk_as_req = None
        pk_as_rep19 = None
        fast_cookie = None
        fast_error = None
        fx_fast = None
        pac_options = None
        pw_salt = None
        for pa in rep_padata:
            patype = self.getElementValue(pa, 'padata-type')
            pavalue = self.getElementValue(pa, 'padata-value')
            if patype == PADATA_ETYPE_INFO2:
                self.assertIsNone(etype_info2)
                etype_info2 = self.der_decode(pavalue,
                                              asn1Spec=krb5_asn1.ETYPE_INFO2())
                continue
            if patype == PADATA_ETYPE_INFO:
                self.assertIsNone(etype_info)
                etype_info = self.der_decode(pavalue,
                                             asn1Spec=krb5_asn1.ETYPE_INFO())
                continue
            if patype == PADATA_ENC_TIMESTAMP:
                # Expected to be an empty value.
                self.assertIsNone(enc_timestamp)
                enc_timestamp = pavalue
                self.assertEqual(len(enc_timestamp), 0)
                continue
            if patype == PADATA_ENCRYPTED_CHALLENGE:
                # Content is validated further below.
                self.assertIsNone(enc_challenge)
                enc_challenge = pavalue
                continue
            if patype == PADATA_PK_AS_REQ:
                self.assertIsNone(pk_as_req)
                pk_as_req = pavalue
                self.assertEqual(len(pk_as_req), 0)
                continue
            if patype == PADATA_PK_AS_REP_19:
                self.assertIsNone(pk_as_rep19)
                pk_as_rep19 = pavalue
                self.assertEqual(len(pk_as_rep19), 0)
                continue
            if patype == PADATA_FX_COOKIE:
                self.assertIsNone(fast_cookie)
                fast_cookie = pavalue
                self.assertIsNotNone(fast_cookie)
                continue
            if patype == PADATA_FX_ERROR:
                self.assertIsNone(fast_error)
                fast_error = pavalue
                self.assertIsNotNone(fast_error)
                continue
            if patype == PADATA_FX_FAST:
                self.assertIsNone(fx_fast)
                fx_fast = pavalue
                self.assertEqual(len(fx_fast), 0)
                continue
            if patype == PADATA_PAC_OPTIONS:
                self.assertIsNone(pac_options)
                pac_options = pavalue
                self.assertIsNotNone(pac_options)
                continue
            if patype == PADATA_PW_SALT:
                self.assertIsNone(pw_salt)
                pw_salt = pavalue
                self.assertIsNotNone(pw_salt)
                continue

        # Keep the cookie so that it is available in the exchange state.
        if fast_cookie is not None:
            kdc_exchange_dict['fast_cookie'] = fast_cookie

        # The FX-ERROR value is itself a KRB-ERROR; check it as an inner
        # error (no cname, no e-data).
        if fast_error is not None:
            fast_error = self.der_decode(fast_error,
                                         asn1Spec=krb5_asn1.KRB_ERROR())
            self.generic_check_kdc_error(kdc_exchange_dict,
                                         callback_dict,
                                         fast_error,
                                         inner=True)

        if pac_options is not None:
            self.check_pac_options_claims_support(pac_options)

        if pw_salt is not None:
            # 12 bytes: a little-endian status in the first four bytes and
            # little-endian flags in the last four; the middle four bytes
            # are not examined.
            self.assertEqual(12, len(pw_salt))

            status = int.from_bytes(pw_salt[:4], 'little')
            flags = int.from_bytes(pw_salt[8:], 'little')

            expected_status = kdc_exchange_dict['expected_status']
            self.assertEqual(expected_status, status)

            self.assertEqual(3, flags)
        else:
            # Without PW-SALT the exchange must not expect a status.
            self.assertIsNone(kdc_exchange_dict.get('expected_status'))

        if enc_challenge is not None:
            if not sent_enc_challenge:
                # We did not send a challenge, so the value must be empty.
                self.assertEqual(len(enc_challenge), 0)
            else:
                armor_key = kdc_exchange_dict['armor_key']
                self.assertIsNotNone(armor_key)

                preauth_key, _ = self.get_preauth_key(kdc_exchange_dict)

                kdc_challenge_key = self.generate_kdc_challenge_key(
                    armor_key, preauth_key)

                # Ensure that the encrypted challenge FAST factor is supported
                # (RFC6113 5.4.6).
                if self.strict_checking:
                    self.assertNotEqual(len(enc_challenge), 0)
                if len(enc_challenge) != 0:
                    encrypted_challenge = self.der_decode(
                        enc_challenge,
                        asn1Spec=krb5_asn1.EncryptedData())
                    self.assertEqual(encrypted_challenge['etype'],
                                     kdc_challenge_key.etype)

                    challenge = kdc_challenge_key.decrypt(
                        KU_ENC_CHALLENGE_KDC,
                        encrypted_challenge['cipher'])
                    challenge = self.der_decode(
                        challenge,
                        asn1Spec=krb5_asn1.PA_ENC_TS_ENC())

                    # Retrieve the returned timestamp.
                    rep_patime = challenge['patimestamp']
                    self.assertIn('pausec', challenge)

                    # Ensure the returned time is within five minutes of the
                    # current time.
                    rep_time = self.get_EpochFromKerberosTime(rep_patime)
                    current_time = time.time()

                    self.assertLess(current_time - 300, rep_time)
                    self.assertLess(rep_time, current_time + 300)

        # None of AES256/AES128/RC4 is both proposed and listed in
        # client_as_etypes: no etype info can be expected, so finish early.
        if all(etype not in client_as_etypes or etype not in proposed_etypes
               for etype in (kcrypto.Enctype.AES256,
                             kcrypto.Enctype.AES128,
                             kcrypto.Enctype.RC4)):
            self.assertIsNone(etype_info2)
            self.assertIsNone(etype_info)
            if rep_msg_type == KRB_AS_REP:
                if self.strict_checking:
                    if sent_fast:
                        self.assertIsNotNone(enc_challenge)
                        self.assertIsNone(enc_timestamp)
                    else:
                        self.assertIsNotNone(enc_timestamp)
                        self.assertIsNone(enc_challenge)
                    self.assertIsNotNone(pk_as_req)
                    self.assertIsNotNone(pk_as_rep19)
            else:
                self.assertIsNone(enc_timestamp)
                self.assertIsNone(enc_challenge)
                self.assertIsNone(pk_as_req)
                self.assertIsNone(pk_as_rep19)
            return None

        if error_code != KDC_ERR_GENERIC:
            if self.strict_checking:
                self.assertIsNotNone(etype_info2)
        else:
            self.assertIsNone(etype_info2)
        if expect_etype_info:
            self.assertIsNotNone(etype_info)
        else:
            if self.strict_checking:
                self.assertIsNone(etype_info)
        if unexpect_etype_info:
            self.assertIsNone(etype_info)

        # Compare the returned ETYPE-INFO2 entries, in order, with the
        # expected etypes, and check each entry's salt and s2kparams.
        if error_code != KDC_ERR_GENERIC and self.strict_checking:
            self.assertGreaterEqual(len(etype_info2), 1)
            self.assertEqual(len(etype_info2), len(expect_etype_info2))
            for i in range(0, len(etype_info2)):
                e = self.getElementValue(etype_info2[i], 'etype')
                self.assertEqual(e, expect_etype_info2[i])
                salt = self.getElementValue(etype_info2[i], 'salt')
                if e == kcrypto.Enctype.RC4:
                    self.assertIsNone(salt)
                else:
                    self.assertIsNotNone(salt)
                    expected_salt = kdc_exchange_dict['expected_salt']
                    if expected_salt is not None:
                        self.assertEqual(salt, expected_salt)
                s2kparams = self.getElementValue(etype_info2[i], 's2kparams')
                if self.strict_checking:
                    self.assertIsNone(s2kparams)
        if etype_info is not None:
            # ETYPE-INFO must hold a single RC4 entry with an empty salt.
            self.assertEqual(len(etype_info), 1)
            e = self.getElementValue(etype_info[0], 'etype')
            self.assertEqual(e, kcrypto.Enctype.RC4)
            self.assertEqual(e, expect_etype_info2[0])
            salt = self.getElementValue(etype_info[0], 'salt')
            if self.strict_checking:
                self.assertIsNotNone(salt)
                self.assertEqual(len(salt), 0)

        # Finally check which pre-authentication hints are present.
        if error_code not in (KDC_ERR_PREAUTH_FAILED,
                              KDC_ERR_GENERIC):
            if sent_fast:
                self.assertIsNotNone(enc_challenge)
                if self.strict_checking:
                    self.assertIsNone(enc_timestamp)
            else:
                self.assertIsNotNone(enc_timestamp)
                if self.strict_checking:
                    self.assertIsNone(enc_challenge)
            if not sent_enc_challenge:
                if self.strict_checking:
                    self.assertIsNotNone(pk_as_req)
                    self.assertIsNotNone(pk_as_rep19)
            else:
                self.assertIsNone(pk_as_req)
                self.assertIsNone(pk_as_rep19)
        else:
            if self.strict_checking:
                self.assertIsNone(enc_timestamp)
                self.assertIsNone(enc_challenge)
            self.assertIsNone(pk_as_req)
            self.assertIsNone(pk_as_rep19)

        return etype_info2
2693
2694     def generate_simple_fast(self,
2695                              kdc_exchange_dict,
2696                              _callback_dict,
2697                              req_body,
2698                              fast_padata,
2699                              fast_armor,
2700                              checksum,
2701                              fast_options=''):
2702         armor_key = kdc_exchange_dict['armor_key']
2703
2704         fast_req = self.KRB_FAST_REQ_create(fast_options,
2705                                             fast_padata,
2706                                             req_body)
2707         fast_req = self.der_encode(fast_req,
2708                                    asn1Spec=krb5_asn1.KrbFastReq())
2709         fast_req = self.EncryptedData_create(armor_key,
2710                                              KU_FAST_ENC,
2711                                              fast_req)
2712
2713         fast_armored_req = self.KRB_FAST_ARMORED_REQ_create(fast_armor,
2714                                                             checksum,
2715                                                             fast_req)
2716
2717         fx_fast_request = self.PA_FX_FAST_REQUEST_create(fast_armored_req)
2718         fx_fast_request = self.der_encode(
2719             fx_fast_request,
2720             asn1Spec=krb5_asn1.PA_FX_FAST_REQUEST())
2721
2722         fast_padata = self.PA_DATA_create(PADATA_FX_FAST,
2723                                           fx_fast_request)
2724
2725         return fast_padata
2726
2727     def generate_ap_req(self,
2728                         kdc_exchange_dict,
2729                         _callback_dict,
2730                         req_body,
2731                         armor):
2732         if armor:
2733             tgt = kdc_exchange_dict['armor_tgt']
2734             authenticator_subkey = kdc_exchange_dict['armor_subkey']
2735
2736             req_body_checksum = None
2737         else:
2738             tgt = kdc_exchange_dict['tgt']
2739             authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
2740             body_checksum_type = kdc_exchange_dict['body_checksum_type']
2741
2742             req_body_blob = self.der_encode(req_body,
2743                                             asn1Spec=krb5_asn1.KDC_REQ_BODY())
2744
2745             req_body_checksum = self.Checksum_create(tgt.session_key,
2746                                                      KU_TGS_REQ_AUTH_CKSUM,
2747                                                      req_body_blob,
2748                                                      ctype=body_checksum_type)
2749
2750         auth_data = kdc_exchange_dict['auth_data']
2751
2752         subkey_obj = None
2753         if authenticator_subkey is not None:
2754             subkey_obj = authenticator_subkey.export_obj()
2755         seq_number = random.randint(0, 0xfffffffe)
2756         (ctime, cusec) = self.get_KerberosTimeWithUsec()
2757         authenticator_obj = self.Authenticator_create(
2758             crealm=tgt.crealm,
2759             cname=tgt.cname,
2760             cksum=req_body_checksum,
2761             cusec=cusec,
2762             ctime=ctime,
2763             subkey=subkey_obj,
2764             seq_number=seq_number,
2765             authorization_data=auth_data)
2766         authenticator_blob = self.der_encode(
2767             authenticator_obj,
2768             asn1Spec=krb5_asn1.Authenticator())
2769
2770         usage = KU_AP_REQ_AUTH if armor else KU_TGS_REQ_AUTH
2771         authenticator = self.EncryptedData_create(tgt.session_key,
2772                                                   usage,
2773                                                   authenticator_blob)
2774
2775         ap_options = krb5_asn1.APOptions('0')
2776         ap_req_obj = self.AP_REQ_create(ap_options=str(ap_options),
2777                                         ticket=tgt.ticket,
2778                                         authenticator=authenticator)
2779         ap_req = self.der_encode(ap_req_obj, asn1Spec=krb5_asn1.AP_REQ())
2780
2781         return ap_req
2782
2783     def generate_simple_tgs_padata(self,
2784                                    kdc_exchange_dict,
2785                                    callback_dict,
2786                                    req_body):
2787         ap_req = self.generate_ap_req(kdc_exchange_dict,
2788                                       callback_dict,
2789                                       req_body,
2790                                       armor=False)
2791         pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
2792         padata = [pa_tgs_req]
2793
2794         return padata, req_body
2795
2796     def get_preauth_key(self, kdc_exchange_dict):
2797         msg_type = kdc_exchange_dict['rep_msg_type']
2798
2799         if msg_type == KRB_AS_REP:
2800             key = kdc_exchange_dict['preauth_key']
2801             usage = KU_AS_REP_ENC_PART
2802         else:  # KRB_TGS_REP
2803             authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
2804             if authenticator_subkey is not None:
2805                 key = authenticator_subkey
2806                 usage = KU_TGS_REP_ENC_PART_SUB_KEY
2807             else:
2808                 tgt = kdc_exchange_dict['tgt']
2809                 key = tgt.session_key
2810                 usage = KU_TGS_REP_ENC_PART_SESSION
2811
2812         self.assertIsNotNone(key)
2813
2814         return key, usage
2815
2816     def generate_armor_key(self, subkey, session_key):
2817         armor_key = kcrypto.cf2(subkey.key,
2818                                 session_key.key,
2819                                 b'subkeyarmor',
2820                                 b'ticketarmor')
2821         armor_key = Krb5EncryptionKey(armor_key, None)
2822
2823         return armor_key
2824
2825     def generate_strengthen_reply_key(self, strengthen_key, reply_key):
2826         strengthen_reply_key = kcrypto.cf2(strengthen_key.key,
2827                                            reply_key.key,
2828                                            b'strengthenkey',
2829                                            b'replykey')
2830         strengthen_reply_key = Krb5EncryptionKey(strengthen_reply_key,
2831                                                  reply_key.kvno)
2832
2833         return strengthen_reply_key
2834
2835     def generate_client_challenge_key(self, armor_key, longterm_key):
2836         client_challenge_key = kcrypto.cf2(armor_key.key,
2837                                            longterm_key.key,
2838                                            b'clientchallengearmor',
2839                                            b'challengelongterm')
2840         client_challenge_key = Krb5EncryptionKey(client_challenge_key, None)
2841
2842         return client_challenge_key
2843
2844     def generate_kdc_challenge_key(self, armor_key, longterm_key):
2845         kdc_challenge_key = kcrypto.cf2(armor_key.key,
2846                                         longterm_key.key,
2847                                         b'kdcchallengearmor',
2848                                         b'challengelongterm')
2849         kdc_challenge_key = Krb5EncryptionKey(kdc_challenge_key, None)
2850
2851         return kdc_challenge_key
2852
2853     def verify_ticket_checksum(self, ticket, expected_checksum, armor_key):
2854         expected_type = expected_checksum['cksumtype']
2855         self.assertEqual(armor_key.ctype, expected_type)
2856
2857         ticket_blob = self.der_encode(ticket,
2858                                       asn1Spec=krb5_asn1.Ticket())
2859         checksum = self.Checksum_create(armor_key,
2860                                         KU_FAST_FINISHED,
2861                                         ticket_blob)
2862         self.assertEqual(expected_checksum, checksum)
2863
2864     def replace_pac(self, auth_data, new_pac, expect_pac=True):
2865         if new_pac is not None:
2866             self.assertElementEqual(new_pac, 'ad-type', AD_WIN2K_PAC)
2867             self.assertElementPresent(new_pac, 'ad-data')
2868
2869         new_auth_data = []
2870
2871         ad_relevant = None
2872         old_pac = None
2873
2874         for authdata_elem in auth_data:
2875             if authdata_elem['ad-type'] == AD_IF_RELEVANT:
2876                 ad_relevant = self.der_decode(
2877                     authdata_elem['ad-data'],
2878                     asn1Spec=krb5_asn1.AD_IF_RELEVANT())
2879
2880                 relevant_elems = []
2881                 for relevant_elem in ad_relevant:
2882                     if relevant_elem['ad-type'] == AD_WIN2K_PAC:
2883                         self.assertIsNone(old_pac, 'Multiple PACs detected')
2884                         old_pac = relevant_elem['ad-data']
2885
2886                         if new_pac is not None:
2887                             relevant_elems.append(new_pac)
2888                     else:
2889                         relevant_elems.append(relevant_elem)
2890                 if expect_pac:
2891                     self.assertIsNotNone(old_pac, 'Expected PAC')
2892
2893                 ad_relevant = self.der_encode(
2894                     relevant_elems,
2895                     asn1Spec=krb5_asn1.AD_IF_RELEVANT())
2896
2897                 authdata_elem = self.AuthorizationData_create(AD_IF_RELEVANT,
2898                                                               ad_relevant)
2899
2900             new_auth_data.append(authdata_elem)
2901
2902         if expect_pac:
2903             self.assertIsNotNone(ad_relevant, 'Expected AD-RELEVANT')
2904
2905         return new_auth_data, old_pac
2906
2907     def get_outer_pa_dict(self, kdc_exchange_dict):
2908         return self.get_pa_dict(kdc_exchange_dict['req_padata'])
2909
2910     def get_fast_pa_dict(self, kdc_exchange_dict):
2911         req_pa_dict = self.get_pa_dict(kdc_exchange_dict['fast_padata'])
2912
2913         if req_pa_dict:
2914             return req_pa_dict
2915
2916         return self.get_outer_pa_dict(kdc_exchange_dict)
2917
2918     def sent_fast(self, kdc_exchange_dict):
2919         outer_pa_dict = self.get_outer_pa_dict(kdc_exchange_dict)
2920
2921         return PADATA_FX_FAST in outer_pa_dict
2922
2923     def sent_enc_challenge(self, kdc_exchange_dict):
2924         fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)
2925
2926         return PADATA_ENCRYPTED_CHALLENGE in fast_pa_dict
2927
2928     def sent_claims(self, kdc_exchange_dict):
2929         fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)
2930
2931         if PADATA_PAC_OPTIONS not in fast_pa_dict:
2932             return False
2933
2934         pac_options = self.der_decode(fast_pa_dict[PADATA_PAC_OPTIONS],
2935                                       asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
2936         pac_options = pac_options['options']
2937         claims_pos = len(tuple(krb5_asn1.PACOptionFlags('claims'))) - 1
2938
2939         return (claims_pos < len(pac_options)
2940                 and pac_options[claims_pos] == '1')
2941
2942     def get_krbtgt_sname(self):
2943         krbtgt_creds = self.get_krbtgt_creds()
2944         krbtgt_username = krbtgt_creds.get_username()
2945         krbtgt_realm = krbtgt_creds.get_realm()
2946         krbtgt_sname = self.PrincipalName_create(
2947             name_type=NT_SRV_INST, names=[krbtgt_username, krbtgt_realm])
2948
2949         return krbtgt_sname
2950
2951     def _test_as_exchange(self,
2952                           cname,
2953                           realm,
2954                           sname,
2955                           till,
2956                           client_as_etypes,
2957                           expected_error_mode,
2958                           expected_crealm,
2959                           expected_cname,
2960                           expected_srealm,
2961                           expected_sname,
2962                           expected_salt,
2963                           etypes,
2964                           padata,
2965                           kdc_options,
2966                           expected_flags=None,
2967                           unexpected_flags=None,
2968                           preauth_key=None,
2969                           ticket_decryption_key=None,
2970                           pac_request=None,
2971                           pac_options=None,
2972                           to_rodc=False):
2973
2974         def _generate_padata_copy(_kdc_exchange_dict,
2975                                   _callback_dict,
2976                                   req_body):
2977             return padata, req_body
2978
2979         if not expected_error_mode:
2980             check_error_fn = None
2981             check_rep_fn = self.generic_check_kdc_rep
2982         else:
2983             check_error_fn = self.generic_check_kdc_error
2984             check_rep_fn = None
2985
2986         if padata is not None:
2987             generate_padata_fn = _generate_padata_copy
2988         else:
2989             generate_padata_fn = None
2990
2991         kdc_exchange_dict = self.as_exchange_dict(
2992             expected_crealm=expected_crealm,
2993             expected_cname=expected_cname,
2994             expected_srealm=expected_srealm,
2995             expected_sname=expected_sname,
2996             ticket_decryption_key=ticket_decryption_key,
2997             generate_padata_fn=generate_padata_fn,
2998             check_error_fn=check_error_fn,
2999             check_rep_fn=check_rep_fn,
3000             check_kdc_private_fn=self.generic_check_kdc_private,
3001             expected_error_mode=expected_error_mode,
3002             client_as_etypes=client_as_etypes,
3003             expected_salt=expected_salt,
3004             expected_flags=expected_flags,
3005             unexpected_flags=unexpected_flags,
3006             preauth_key=preauth_key,
3007             kdc_options=str(kdc_options),
3008             pac_request=pac_request,
3009             pac_options=pac_options,
3010             to_rodc=to_rodc)
3011
3012         rep = self._generic_kdc_exchange(kdc_exchange_dict,
3013                                          cname=cname,
3014                                          realm=realm,
3015                                          sname=sname,
3016                                          till_time=till,
3017                                          etypes=etypes)
3018
3019         return rep, kdc_exchange_dict