CVE-2022-37966 python:tests/krb5: allow ticket/supported_etypes to be passed KdcTgsBa...
[abartlet/samba-autobuild/.git] / python / samba / tests / krb5 / kdc_tgs_tests.py
1 #!/usr/bin/env python3
2 # Unix SMB/CIFS implementation.
3 # Copyright (C) Stefan Metzmacher 2020
4 # Copyright (C) 2020 Catalyst.Net Ltd
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 #
19
20 import sys
21 import os
22
23 import ldb
24
25
26 from samba import dsdb, ntstatus
27
28 from samba.dcerpc import krb5pac, security
29
30 sys.path.insert(0, "bin/python")
31 os.environ["PYTHONUNBUFFERED"] = "1"
32
33 import samba.tests.krb5.kcrypto as kcrypto
34 from samba.tests.krb5.kdc_base_test import KDCBaseTest
35 from samba.tests.krb5.raw_testcase import Krb5EncryptionKey
36 from samba.tests.krb5.rfc4120_constants import (
37     AES256_CTS_HMAC_SHA1_96,
38     ARCFOUR_HMAC_MD5,
39     KRB_ERROR,
40     KRB_TGS_REP,
41     KDC_ERR_BADKEYVER,
42     KDC_ERR_BADMATCH,
43     KDC_ERR_ETYPE_NOSUPP,
44     KDC_ERR_GENERIC,
45     KDC_ERR_MODIFIED,
46     KDC_ERR_NOT_US,
47     KDC_ERR_POLICY,
48     KDC_ERR_PREAUTH_REQUIRED,
49     KDC_ERR_C_PRINCIPAL_UNKNOWN,
50     KDC_ERR_S_PRINCIPAL_UNKNOWN,
51     KDC_ERR_TGT_REVOKED,
52     KRB_ERR_TKT_NYV,
53     KDC_ERR_WRONG_REALM,
54     NT_ENTERPRISE_PRINCIPAL,
55     NT_PRINCIPAL,
56     NT_SRV_INST,
57 )
58 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
59
# Module-level debug switches.  KdcTgsTests.setUp() copies them onto each
# test instance as do_asn1_print and do_hexdump.
global_asn1_print = False
global_hexdump = False
62
63
class KdcTgsBaseTests(KDCBaseTest):
    '''Shared AS-REQ and TGS-REQ exchange helpers for KDC ticket tests.'''

    def _as_req(self,
                creds,
                expected_error,
                target_creds,
                etype,
                expected_ticket_etype=None):
        '''Run an AS-REQ exchange for creds against a host service.

        The requested service principal is host/<name>, where <name> is
        the username of target_creds without its last character.  The
        request is first sent without pre-authentication data and then
        repeated with an encrypted-timestamp padata derived from the
        etype-info2 the KDC returned.

        :param creds: credentials of the client; supply the username,
            realm, salt and kvno used in the exchange.
        :param expected_error: error code the exchange is expected to end
            with, or a false value if a ticket is expected.
        :param target_creds: credentials of the target account; supply
            the username, the ticket decryption key and
            tgs_supported_enctypes.
        :param etype: etypes passed as both client_as_etypes and etypes.
        :param expected_ticket_etype: passed as etype when looking up the
            ticket decryption key of target_creds.
        :return: kdc_exchange_dict['rep_ticket_creds'] on success, or
            None when expected_error was returned by the KDC.
        '''
        user_name = creds.get_username()
        cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
                                          names=user_name.split('/'))

        target_name = target_creds.get_username()
        # NOTE(review): this drops the last character unconditionally,
        # presumably the trailing '$' of a machine account; _tgs_req only
        # strips it when it really is '$'.  Confirm that callers always
        # pass machine accounts here.
        sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
                                          names=['host', target_name[:-1]])

        if expected_error:
            expected_sname = sname
        else:
            # On success the reply is expected to name the target account
            # itself (single component) rather than host/<name>.
            expected_sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
                                                       names=[target_name])

        realm = creds.get_realm()
        salt = creds.get_salt()

        till = self.get_KerberosTime(offset=36000)

        ticket_decryption_key = (
            self.TicketDecryptionKey_from_creds(target_creds,
                                                etype=expected_ticket_etype))
        expected_etypes = target_creds.tgs_supported_enctypes

        kdc_options = ('forwardable,'
                       'renewable,'
                       'canonicalize,'
                       'renewable-ok')
        kdc_options = krb5_asn1.KDCOptions(kdc_options)

        # The request without padata may be answered either with
        # "pre-authentication required" or directly with expected_error.
        if expected_error:
            initial_error = (KDC_ERR_PREAUTH_REQUIRED, expected_error)
        else:
            initial_error = KDC_ERR_PREAUTH_REQUIRED

        # First exchange: no padata and no pre-authentication key.  Note
        # that it expects the requested sname and the realm as given.
        rep, kdc_exchange_dict = self._test_as_exchange(
            cname=cname,
            realm=realm,
            sname=sname,
            till=till,
            client_as_etypes=etype,
            expected_error_mode=initial_error,
            expected_crealm=realm,
            expected_cname=cname,
            expected_srealm=realm,
            expected_sname=sname,
            expected_salt=salt,
            expected_supported_etypes=expected_etypes,
            etypes=etype,
            padata=None,
            kdc_options=kdc_options,
            preauth_key=None,
            ticket_decryption_key=ticket_decryption_key)
        self.assertIsNotNone(rep)
        self.assertEqual(KRB_ERROR, rep['msg-type'])
        error_code = rep['error-code']
        if expected_error:
            self.assertIn(error_code, initial_error)
            if error_code == expected_error:
                # The KDC already produced the expected error, so there
                # is nothing to retry (implicitly returns None).
                return
        else:
            self.assertEqual(initial_error, error_code)

        etype_info2 = kdc_exchange_dict['preauth_etype_info2']

        # Derive the pre-authentication key from the first etype-info2
        # entry offered by the KDC.
        preauth_key = self.PasswordKey_from_etype_info2(creds,
                                                        etype_info2[0],
                                                        creds.get_kvno())

        ts_enc_padata = self.get_enc_timestamp_pa_data_from_key(preauth_key)

        padata = [ts_enc_padata]

        # The pre-authenticated exchange expects the realm in upper case.
        expected_realm = realm.upper()

        # Second exchange: same request, now carrying the timestamp padata.
        rep, kdc_exchange_dict = self._test_as_exchange(
            cname=cname,
            realm=realm,
            sname=sname,
            till=till,
            client_as_etypes=etype,
            expected_error_mode=expected_error,
            expected_crealm=expected_realm,
            expected_cname=cname,
            expected_srealm=expected_realm,
            expected_sname=expected_sname,
            expected_salt=salt,
            expected_supported_etypes=expected_etypes,
            etypes=etype,
            padata=padata,
            kdc_options=kdc_options,
            preauth_key=preauth_key,
            ticket_decryption_key=ticket_decryption_key,
            expect_edata=False)
        if expected_error:
            self.check_error_rep(rep, expected_error)
            return None

        self.check_as_reply(rep)
        return kdc_exchange_dict['rep_ticket_creds']

    def _tgs_req(self, tgt, expected_error, target_creds,
                 armor_tgt=None,
                 kdc_options='0',
                 expected_cname=None,
                 expected_sname=None,
                 additional_ticket=None,
                 generate_padata_fn=None,
                 sname=None,
                 srealm=None,
                 use_fast=False,
                 expect_claims=True,
                 etypes=None,
                 expected_ticket_etype=None,
                 expected_supported_etypes=None,
                 expect_pac=True,
                 expect_pac_attrs=None,
                 expect_pac_attrs_pac_request=None,
                 expect_requester_sid=None,
                 expect_edata=False,
                 expected_sid=None,
                 expected_status=None):
        '''Run a TGS-REQ exchange using tgt and check the reply.

        :param tgt: ticket used for the request; its session key etype is
            used for the random authenticator subkey, and its crealm and
            cname provide the expected client identity.
        :param expected_error: error code expected from the KDC, or a
            false value if a ticket is expected.
        :param target_creds: credentials of the target service; supply
            the default realm, the default sname and the ticket
            decryption key.
        :param armor_tgt: if not None, the request is wrapped in FAST
            using an armor key derived from this ticket's session key.
        :param kdc_options: passed through to the exchange dict.
        :param expected_cname: defaults to tgt.cname.
        :param expected_sname: defaults to sname, or to the krbtgt sname
            when sname is False.
        :param additional_ticket: if not None, its ticket is sent as the
            additional ticket and its session key is used as the ticket
            decryption key instead of the key of target_creds.
        :param generate_padata_fn: passed through to the exchange dict.
        :param sname: None derives the name from target_creds; False
            sends the request without an sname.
        :param srealm: None uses the realm of target_creds; False sends
            the request without a realm.
        :param use_fast: accepted but not referenced in this method;
            FAST is enabled by passing armor_tgt.
        :param etypes: requested etypes; defaults to AES256 and RC4.
        :param expected_ticket_etype: passed as etype when looking up the
            ticket decryption key of target_creds.

        The remaining expect_* / expected_* parameters are passed through
        unchanged to tgs_exchange_dict().

        :return: kdc_exchange_dict['rep_ticket_creds'] on success, or
            None when expected_error is set.
        '''
        # False is a sentinel meaning "omit the realm from the request".
        if srealm is False:
            srealm = None
        elif srealm is None:
            srealm = target_creds.get_realm()

        # False is a sentinel meaning "omit the sname from the request".
        if sname is False:
            sname = None
            if expected_sname is None:
                expected_sname = self.get_krbtgt_sname()
        else:
            if sname is None:
                target_name = target_creds.get_username()
                if target_name == 'krbtgt':
                    sname = self.PrincipalName_create(
                        name_type=NT_SRV_INST,
                        names=[target_name, srealm])
                else:
                    # Strip the trailing '$' of a machine account name.
                    if target_name[-1] == '$':
                        target_name = target_name[:-1]
                    sname = self.PrincipalName_create(
                        name_type=NT_PRINCIPAL,
                        names=['host', target_name])

            if expected_sname is None:
                expected_sname = sname

        if additional_ticket is not None:
            # With an additional ticket, the reply ticket is decrypted
            # with that ticket's session key.
            additional_tickets = [additional_ticket.ticket]
            decryption_key = additional_ticket.session_key
        else:
            additional_tickets = None
            decryption_key = self.TicketDecryptionKey_from_creds(
                target_creds, etype=expected_ticket_etype)

        subkey = self.RandomKey(tgt.session_key.etype)

        if armor_tgt is not None:
            # Combine the explicit armor key (armor subkey + armor TGT
            # session key) with the authenticator subkey.
            armor_subkey = self.RandomKey(subkey.etype)
            explicit_armor_key = self.generate_armor_key(armor_subkey,
                                                         armor_tgt.session_key)
            armor_key = kcrypto.cf2(explicit_armor_key.key,
                                    subkey.key,
                                    b'explicitarmor',
                                    b'tgsarmor')
            armor_key = Krb5EncryptionKey(armor_key, None)

            generate_fast_fn = self.generate_simple_fast
            generate_fast_armor_fn = self.generate_ap_req

            pac_options = '1'  # claims support
        else:
            armor_subkey = None
            armor_key = None
            generate_fast_fn = None
            generate_fast_armor_fn = None

            pac_options = None

        if etypes is None:
            etypes = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)

        # Install exactly one of the error / reply checkers, depending on
        # the expected outcome.
        if expected_error:
            check_error_fn = self.generic_check_kdc_error
            check_rep_fn = None
        else:
            check_error_fn = None
            check_rep_fn = self.generic_check_kdc_rep

        if expected_cname is None:
            expected_cname = tgt.cname

        kdc_exchange_dict = self.tgs_exchange_dict(
            expected_crealm=tgt.crealm,
            expected_cname=expected_cname,
            expected_srealm=srealm,
            expected_sname=expected_sname,
            ticket_decryption_key=decryption_key,
            generate_padata_fn=generate_padata_fn,
            generate_fast_fn=generate_fast_fn,
            generate_fast_armor_fn=generate_fast_armor_fn,
            check_error_fn=check_error_fn,
            check_rep_fn=check_rep_fn,
            check_kdc_private_fn=self.generic_check_kdc_private,
            expected_error_mode=expected_error,
            expected_status=expected_status,
            tgt=tgt,
            armor_key=armor_key,
            armor_tgt=armor_tgt,
            armor_subkey=armor_subkey,
            pac_options=pac_options,
            authenticator_subkey=subkey,
            kdc_options=kdc_options,
            expected_supported_etypes=expected_supported_etypes,
            expect_edata=expect_edata,
            expect_pac=expect_pac,
            expect_pac_attrs=expect_pac_attrs,
            expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
            expect_requester_sid=expect_requester_sid,
            expected_sid=expected_sid,
            expect_claims=expect_claims)

        # No cname is passed for the TGS exchange itself.
        rep = self._generic_kdc_exchange(kdc_exchange_dict,
                                         cname=None,
                                         realm=srealm,
                                         sname=sname,
                                         etypes=etypes,
                                         additional_tickets=additional_tickets)
        if expected_error:
            self.check_error_rep(rep, expected_error)
            return None
        else:
            self.check_reply(rep, KRB_TGS_REP)
            return kdc_exchange_dict['rep_ticket_creds']
306
307
308 class KdcTgsTests(KdcTgsBaseTests):
309
310     def setUp(self):
311         super().setUp()
312         self.do_asn1_print = global_asn1_print
313         self.do_hexdump = global_hexdump
314
315     def test_tgs_req_cname_does_not_not_match_authenticator_cname(self):
316         ''' Try and obtain a ticket from the TGS, but supply a cname
317             that differs from that provided to the krbtgt
318         '''
319         # Create the user account
320         samdb = self.get_samdb()
321         user_name = "tsttktusr"
322         (uc, _) = self.create_account(samdb, user_name)
323         realm = uc.get_realm().lower()
324
325         # Do the initial AS-REQ, should get a pre-authentication required
326         # response
327         etype = (AES256_CTS_HMAC_SHA1_96,)
328         cname = self.PrincipalName_create(
329             name_type=NT_PRINCIPAL, names=[user_name])
330         sname = self.PrincipalName_create(
331             name_type=NT_SRV_INST, names=["krbtgt", realm])
332
333         rep = self.as_req(cname, sname, realm, etype)
334         self.check_pre_authentication(rep)
335
336         # Do the next AS-REQ
337         padata = self.get_enc_timestamp_pa_data(uc, rep)
338         key = self.get_as_rep_key(uc, rep)
339         rep = self.as_req(cname, sname, realm, etype, padata=[padata])
340         self.check_as_reply(rep)
341
342         # Request a service ticket, but use a cname that does not match
343         # that in the original AS-REQ
344         enc_part2 = self.get_as_rep_enc_data(key, rep)
345         key = self.EncryptionKey_import(enc_part2['key'])
346         ticket = rep['ticket']
347
348         cname = self.PrincipalName_create(
349             name_type=NT_PRINCIPAL,
350             names=["Administrator"])
351         sname = self.PrincipalName_create(
352             name_type=NT_PRINCIPAL,
353             names=["host", samdb.host_dns_name()])
354
355         (rep, enc_part) = self.tgs_req(cname, sname, realm, ticket, key, etype,
356                                        expected_error_mode=KDC_ERR_BADMATCH,
357                                        expect_edata=False)
358
359         self.assertIsNone(
360             enc_part,
361             "rep = {%s}, enc_part = {%s}" % (rep, enc_part))
362         self.assertEqual(KRB_ERROR, rep['msg-type'], "rep = {%s}" % rep)
363         self.assertEqual(
364             KDC_ERR_BADMATCH,
365             rep['error-code'],
366             "rep = {%s}" % rep)
367
368     def test_ldap_service_ticket(self):
369         '''Get a ticket to the ldap service
370         '''
371         # Create the user account
372         samdb = self.get_samdb()
373         user_name = "tsttktusr"
374         (uc, _) = self.create_account(samdb, user_name)
375         realm = uc.get_realm().lower()
376
377         # Do the initial AS-REQ, should get a pre-authentication required
378         # response
379         etype = (AES256_CTS_HMAC_SHA1_96,)
380         cname = self.PrincipalName_create(
381             name_type=NT_PRINCIPAL, names=[user_name])
382         sname = self.PrincipalName_create(
383             name_type=NT_SRV_INST, names=["krbtgt", realm])
384
385         rep = self.as_req(cname, sname, realm, etype)
386         self.check_pre_authentication(rep)
387
388         # Do the next AS-REQ
389         padata = self.get_enc_timestamp_pa_data(uc, rep)
390         key = self.get_as_rep_key(uc, rep)
391         rep = self.as_req(cname, sname, realm, etype, padata=[padata])
392         self.check_as_reply(rep)
393
394         enc_part2 = self.get_as_rep_enc_data(key, rep)
395         key = self.EncryptionKey_import(enc_part2['key'])
396         ticket = rep['ticket']
397
398         # Request a ticket to the ldap service
399         sname = self.PrincipalName_create(
400             name_type=NT_SRV_INST,
401             names=["ldap", samdb.host_dns_name()])
402
403         (rep, _) = self.tgs_req(
404             cname, sname, uc.get_realm(), ticket, key, etype,
405             service_creds=self.get_dc_creds())
406
407         self.check_tgs_reply(rep)
408
409     def test_get_ticket_for_host_service_of_machine_account(self):
410
411         # Create a user and machine account for the test.
412         #
413         samdb = self.get_samdb()
414         user_name = "tsttktusr"
415         (uc, dn) = self.create_account(samdb, user_name)
416         (mc, _) = self.create_account(samdb, "tsttktmac",
417                                       account_type=self.AccountType.COMPUTER)
418         realm = uc.get_realm().lower()
419
420         # Do the initial AS-REQ, should get a pre-authentication required
421         # response
422         etype = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
423         cname = self.PrincipalName_create(
424             name_type=NT_PRINCIPAL, names=[user_name])
425         sname = self.PrincipalName_create(
426             name_type=NT_SRV_INST, names=["krbtgt", realm])
427
428         rep = self.as_req(cname, sname, realm, etype)
429         self.check_pre_authentication(rep)
430
431         # Do the next AS-REQ
432         padata = self.get_enc_timestamp_pa_data(uc, rep)
433         key = self.get_as_rep_key(uc, rep)
434         rep = self.as_req(cname, sname, realm, etype, padata=[padata])
435         self.check_as_reply(rep)
436
437         # Request a ticket to the host service on the machine account
438         ticket = rep['ticket']
439         enc_part2 = self.get_as_rep_enc_data(key, rep)
440         key = self.EncryptionKey_import(enc_part2['key'])
441         cname = self.PrincipalName_create(
442             name_type=NT_PRINCIPAL,
443             names=[user_name])
444         sname = self.PrincipalName_create(
445             name_type=NT_PRINCIPAL,
446             names=[mc.get_username()])
447
448         (rep, enc_part) = self.tgs_req(
449             cname, sname, uc.get_realm(), ticket, key, etype,
450             service_creds=mc)
451         self.check_tgs_reply(rep)
452
453         # Check the contents of the service ticket
454         ticket = rep['ticket']
455         enc_part = self.decode_service_ticket(mc, ticket)
456
457         pac_data = self.get_pac_data(enc_part['authorization-data'])
458         sid = self.get_objectSid(samdb, dn)
459         upn = "%s@%s" % (uc.get_username(), realm)
460         self.assertEqual(
461             uc.get_username(),
462             str(pac_data.account_name),
463             "rep = {%s},%s" % (rep, pac_data))
464         self.assertEqual(
465             uc.get_username(),
466             pac_data.logon_name,
467             "rep = {%s},%s" % (rep, pac_data))
468         self.assertEqual(
469             uc.get_realm(),
470             pac_data.domain_name,
471             "rep = {%s},%s" % (rep, pac_data))
472         self.assertEqual(
473             upn,
474             pac_data.upn,
475             "rep = {%s},%s" % (rep, pac_data))
476         self.assertEqual(
477             sid,
478             pac_data.account_sid,
479             "rep = {%s},%s" % (rep, pac_data))
480
481     def test_request(self):
482         client_creds = self.get_client_creds()
483         service_creds = self.get_service_creds()
484
485         tgt = self.get_tgt(client_creds)
486
487         pac = self.get_ticket_pac(tgt)
488         self.assertIsNotNone(pac)
489
490         ticket = self._make_tgs_request(client_creds, service_creds, tgt)
491
492         pac = self.get_ticket_pac(ticket)
493         self.assertIsNotNone(pac)
494
495     def test_request_no_pac(self):
496         client_creds = self.get_client_creds()
497         service_creds = self.get_service_creds()
498
499         tgt = self.get_tgt(client_creds, pac_request=False)
500
501         pac = self.get_ticket_pac(tgt)
502         self.assertIsNotNone(pac)
503
504         ticket = self._make_tgs_request(client_creds, service_creds, tgt,
505                                         pac_request=False, expect_pac=False)
506
507         pac = self.get_ticket_pac(ticket, expect_pac=False)
508         self.assertIsNone(pac)
509
510     def test_request_enterprise_canon(self):
511         upn = self.get_new_username()
512         client_creds = self.get_cached_creds(
513             account_type=self.AccountType.USER,
514             opts={'upn': upn})
515         service_creds = self.get_service_creds()
516
517         user_name = client_creds.get_username()
518         realm = client_creds.get_realm()
519         client_account = f'{user_name}@{realm}'
520
521         expected_cname = self.PrincipalName_create(
522             name_type=NT_PRINCIPAL,
523             names=[user_name])
524
525         kdc_options = 'canonicalize'
526
527         tgt = self.get_tgt(client_creds,
528                            client_account=client_account,
529                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
530                            expected_cname=expected_cname,
531                            expected_account_name=user_name,
532                            kdc_options=kdc_options)
533
534         self._make_tgs_request(
535             client_creds, service_creds, tgt,
536             client_account=client_account,
537             client_name_type=NT_ENTERPRISE_PRINCIPAL,
538             expected_cname=expected_cname,
539             expected_account_name=user_name,
540             kdc_options=kdc_options)
541
542     def test_request_enterprise_canon_case(self):
543         upn = self.get_new_username()
544         client_creds = self.get_cached_creds(
545             account_type=self.AccountType.USER,
546             opts={'upn': upn})
547         service_creds = self.get_service_creds()
548
549         user_name = client_creds.get_username()
550         realm = client_creds.get_realm().lower()
551         client_account = f'{user_name}@{realm}'
552
553         expected_cname = self.PrincipalName_create(
554             name_type=NT_PRINCIPAL,
555             names=[user_name])
556
557         kdc_options = 'canonicalize'
558
559         tgt = self.get_tgt(client_creds,
560                            client_account=client_account,
561                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
562                            expected_cname=expected_cname,
563                            expected_account_name=user_name,
564                            kdc_options=kdc_options)
565
566         self._make_tgs_request(
567             client_creds, service_creds, tgt,
568             client_account=client_account,
569             client_name_type=NT_ENTERPRISE_PRINCIPAL,
570             expected_cname=expected_cname,
571             expected_account_name=user_name,
572             kdc_options=kdc_options)
573
574     def test_request_enterprise_canon_mac(self):
575         upn = self.get_new_username()
576         client_creds = self.get_cached_creds(
577             account_type=self.AccountType.COMPUTER,
578             opts={'upn': upn})
579         service_creds = self.get_service_creds()
580
581         user_name = client_creds.get_username()
582         realm = client_creds.get_realm()
583         client_account = f'{user_name}@{realm}'
584
585         expected_cname = self.PrincipalName_create(
586             name_type=NT_PRINCIPAL,
587             names=[user_name])
588
589         kdc_options = 'canonicalize'
590
591         tgt = self.get_tgt(client_creds,
592                            client_account=client_account,
593                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
594                            expected_cname=expected_cname,
595                            expected_account_name=user_name,
596                            kdc_options=kdc_options)
597
598         self._make_tgs_request(
599             client_creds, service_creds, tgt,
600             client_account=client_account,
601             client_name_type=NT_ENTERPRISE_PRINCIPAL,
602             expected_cname=expected_cname,
603             expected_account_name=user_name,
604             kdc_options=kdc_options)
605
606     def test_request_enterprise_canon_case_mac(self):
607         upn = self.get_new_username()
608         client_creds = self.get_cached_creds(
609             account_type=self.AccountType.COMPUTER,
610             opts={'upn': upn})
611         service_creds = self.get_service_creds()
612
613         user_name = client_creds.get_username()
614         realm = client_creds.get_realm().lower()
615         client_account = f'{user_name}@{realm}'
616
617         expected_cname = self.PrincipalName_create(
618             name_type=NT_PRINCIPAL,
619             names=[user_name])
620
621         kdc_options = 'canonicalize'
622
623         tgt = self.get_tgt(client_creds,
624                            client_account=client_account,
625                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
626                            expected_cname=expected_cname,
627                            expected_account_name=user_name,
628                            kdc_options=kdc_options)
629
630         self._make_tgs_request(
631             client_creds, service_creds, tgt,
632             client_account=client_account,
633             client_name_type=NT_ENTERPRISE_PRINCIPAL,
634             expected_cname=expected_cname,
635             expected_account_name=user_name,
636             kdc_options=kdc_options)
637
638     def test_request_enterprise_no_canon(self):
639         upn = self.get_new_username()
640         client_creds = self.get_cached_creds(
641             account_type=self.AccountType.USER,
642             opts={'upn': upn})
643         service_creds = self.get_service_creds()
644
645         user_name = client_creds.get_username()
646         realm = client_creds.get_realm()
647         client_account = f'{user_name}@{realm}'
648
649         kdc_options = '0'
650
651         tgt = self.get_tgt(client_creds,
652                            client_account=client_account,
653                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
654                            expected_account_name=user_name,
655                            kdc_options=kdc_options)
656
657         self._make_tgs_request(
658             client_creds, service_creds, tgt,
659             client_account=client_account,
660             client_name_type=NT_ENTERPRISE_PRINCIPAL,
661             expected_account_name=user_name,
662             kdc_options=kdc_options)
663
664     def test_request_enterprise_no_canon_case(self):
665         upn = self.get_new_username()
666         client_creds = self.get_cached_creds(
667             account_type=self.AccountType.USER,
668             opts={'upn': upn})
669         service_creds = self.get_service_creds()
670
671         user_name = client_creds.get_username()
672         realm = client_creds.get_realm().lower()
673         client_account = f'{user_name}@{realm}'
674
675         kdc_options = '0'
676
677         tgt = self.get_tgt(client_creds,
678                            client_account=client_account,
679                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
680                            expected_account_name=user_name,
681                            kdc_options=kdc_options)
682
683         self._make_tgs_request(
684             client_creds, service_creds, tgt,
685             client_account=client_account,
686             client_name_type=NT_ENTERPRISE_PRINCIPAL,
687             expected_account_name=user_name,
688             kdc_options=kdc_options)
689
690     def test_request_enterprise_no_canon_mac(self):
691         upn = self.get_new_username()
692         client_creds = self.get_cached_creds(
693             account_type=self.AccountType.COMPUTER,
694             opts={'upn': upn})
695         service_creds = self.get_service_creds()
696
697         user_name = client_creds.get_username()
698         realm = client_creds.get_realm()
699         client_account = f'{user_name}@{realm}'
700
701         kdc_options = '0'
702
703         tgt = self.get_tgt(client_creds,
704                            client_account=client_account,
705                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
706                            expected_account_name=user_name,
707                            kdc_options=kdc_options)
708
709         self._make_tgs_request(
710             client_creds, service_creds, tgt,
711             client_account=client_account,
712             client_name_type=NT_ENTERPRISE_PRINCIPAL,
713             expected_account_name=user_name,
714             kdc_options=kdc_options)
715
716     def test_request_enterprise_no_canon_case_mac(self):
717         upn = self.get_new_username()
718         client_creds = self.get_cached_creds(
719             account_type=self.AccountType.COMPUTER,
720             opts={'upn': upn})
721         service_creds = self.get_service_creds()
722
723         user_name = client_creds.get_username()
724         realm = client_creds.get_realm().lower()
725         client_account = f'{user_name}@{realm}'
726
727         kdc_options = '0'
728
729         tgt = self.get_tgt(client_creds,
730                            client_account=client_account,
731                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
732                            expected_account_name=user_name,
733                            kdc_options=kdc_options)
734
735         self._make_tgs_request(
736             client_creds, service_creds, tgt,
737             client_account=client_account,
738             client_name_type=NT_ENTERPRISE_PRINCIPAL,
739             expected_account_name=user_name,
740             kdc_options=kdc_options)
741
742     def test_client_no_auth_data_required(self):
743         client_creds = self.get_cached_creds(
744             account_type=self.AccountType.USER,
745             opts={'no_auth_data_required': True})
746         service_creds = self.get_service_creds()
747
748         tgt = self.get_tgt(client_creds)
749
750         pac = self.get_ticket_pac(tgt)
751         self.assertIsNotNone(pac)
752
753         ticket = self._make_tgs_request(client_creds, service_creds, tgt)
754
755         pac = self.get_ticket_pac(ticket)
756         self.assertIsNotNone(pac)
757
758     def test_no_pac_client_no_auth_data_required(self):
759         client_creds = self.get_cached_creds(
760             account_type=self.AccountType.USER,
761             opts={'no_auth_data_required': True})
762         service_creds = self.get_service_creds()
763
764         tgt = self.get_tgt(client_creds)
765
766         pac = self.get_ticket_pac(tgt)
767         self.assertIsNotNone(pac)
768
769         ticket = self._make_tgs_request(client_creds, service_creds, tgt,
770                                         pac_request=False, expect_pac=True)
771
772         pac = self.get_ticket_pac(ticket)
773         self.assertIsNotNone(pac)
774
775     def test_service_no_auth_data_required(self):
776         client_creds = self.get_client_creds()
777         service_creds = self.get_cached_creds(
778             account_type=self.AccountType.COMPUTER,
779             opts={'no_auth_data_required': True})
780
781         tgt = self.get_tgt(client_creds)
782
783         pac = self.get_ticket_pac(tgt)
784         self.assertIsNotNone(pac)
785
786         ticket = self._make_tgs_request(client_creds, service_creds, tgt,
787                                         expect_pac=False)
788
789         pac = self.get_ticket_pac(ticket, expect_pac=False)
790         self.assertIsNone(pac)
791
792     def test_no_pac_service_no_auth_data_required(self):
793         client_creds = self.get_client_creds()
794         service_creds = self.get_cached_creds(
795             account_type=self.AccountType.COMPUTER,
796             opts={'no_auth_data_required': True})
797
798         tgt = self.get_tgt(client_creds, pac_request=False)
799
800         pac = self.get_ticket_pac(tgt)
801         self.assertIsNotNone(pac)
802
803         ticket = self._make_tgs_request(client_creds, service_creds, tgt,
804                                         pac_request=False, expect_pac=False)
805
806         pac = self.get_ticket_pac(ticket, expect_pac=False)
807         self.assertIsNone(pac)
808
809     def test_remove_pac_service_no_auth_data_required(self):
810         client_creds = self.get_client_creds()
811         service_creds = self.get_cached_creds(
812             account_type=self.AccountType.COMPUTER,
813             opts={'no_auth_data_required': True})
814
815         tgt = self.modified_ticket(self.get_tgt(client_creds),
816                                    exclude_pac=True)
817
818         pac = self.get_ticket_pac(tgt, expect_pac=False)
819         self.assertIsNone(pac)
820
821         self._make_tgs_request(client_creds, service_creds, tgt,
822                                expect_error=True)
823
824     def test_remove_pac_client_no_auth_data_required(self):
825         client_creds = self.get_cached_creds(
826             account_type=self.AccountType.USER,
827             opts={'no_auth_data_required': True})
828         service_creds = self.get_service_creds()
829
830         tgt = self.modified_ticket(self.get_tgt(client_creds),
831                                    exclude_pac=True)
832
833         pac = self.get_ticket_pac(tgt, expect_pac=False)
834         self.assertIsNone(pac)
835
836         self._make_tgs_request(client_creds, service_creds, tgt,
837                                expect_error=True)
838
839     def test_remove_pac(self):
840         client_creds = self.get_client_creds()
841         service_creds = self.get_service_creds()
842
843         tgt = self.modified_ticket(self.get_tgt(client_creds),
844                                    exclude_pac=True)
845
846         pac = self.get_ticket_pac(tgt, expect_pac=False)
847         self.assertIsNone(pac)
848
849         self._make_tgs_request(client_creds, service_creds, tgt,
850                                expect_error=True)
851
852     def test_upn_dns_info_ex_user(self):
853         client_creds = self.get_client_creds()
854         self._run_upn_dns_info_ex_test(client_creds)
855
856     def test_upn_dns_info_ex_mac(self):
857         mach_creds = self.get_mach_creds()
858         self._run_upn_dns_info_ex_test(mach_creds)
859
860     def test_upn_dns_info_ex_upn_user(self):
861         client_creds = self.get_cached_creds(
862             account_type=self.AccountType.USER,
863             opts={'upn': 'upn_dns_info_test_upn0@bar'})
864         self._run_upn_dns_info_ex_test(client_creds)
865
866     def test_upn_dns_info_ex_upn_mac(self):
867         mach_creds = self.get_cached_creds(
868             account_type=self.AccountType.COMPUTER,
869             opts={'upn': 'upn_dns_info_test_upn1@bar'})
870         self._run_upn_dns_info_ex_test(mach_creds)
871
872     def _run_upn_dns_info_ex_test(self, client_creds):
873         service_creds = self.get_service_creds()
874
875         samdb = self.get_samdb()
876         dn = client_creds.get_dn()
877
878         account_name = client_creds.get_username()
879         upn_name = client_creds.get_upn()
880         if upn_name is None:
881             realm = client_creds.get_realm().lower()
882             upn_name = f'{account_name}@{realm}'
883         sid = self.get_objectSid(samdb, dn)
884
885         tgt = self.get_tgt(client_creds,
886                            expected_account_name=account_name,
887                            expected_upn_name=upn_name,
888                            expected_sid=sid)
889
890         self._make_tgs_request(client_creds, service_creds, tgt,
891                                expected_account_name=account_name,
892                                expected_upn_name=upn_name,
893                                expected_sid=sid)
894
895     # Test making a TGS request.
896     def test_tgs_req(self):
897         creds = self._get_creds()
898         tgt = self._get_tgt(creds)
899         self._run_tgs(tgt, expected_error=0)
900
901     def test_renew_req(self):
902         creds = self._get_creds()
903         tgt = self._get_tgt(creds, renewable=True)
904         self._renew_tgt(tgt, expected_error=0,
905                         expect_pac_attrs=True,
906                         expect_pac_attrs_pac_request=True,
907                         expect_requester_sid=True)
908
909     def test_validate_req(self):
910         creds = self._get_creds()
911         tgt = self._get_tgt(creds, invalid=True)
912         self._validate_tgt(tgt, expected_error=0,
913                            expect_pac_attrs=True,
914                            expect_pac_attrs_pac_request=True,
915                            expect_requester_sid=True)
916
917     def test_s4u2self_req(self):
918         creds = self._get_creds()
919         tgt = self._get_tgt(creds)
920         self._s4u2self(tgt, creds, expected_error=0)
921
922     def test_user2user_req(self):
923         creds = self._get_creds()
924         tgt = self._get_tgt(creds)
925         self._user2user(tgt, creds, expected_error=0)
926
927     def test_fast_req(self):
928         creds = self._get_creds()
929         tgt = self._get_tgt(creds)
930         self._fast(tgt, creds, expected_error=0)
931
932     def test_tgs_req_invalid(self):
933         creds = self._get_creds()
934         tgt = self._get_tgt(creds, invalid=True)
935         self._run_tgs(tgt, expected_error=KRB_ERR_TKT_NYV)
936
937     def test_s4u2self_req_invalid(self):
938         creds = self._get_creds()
939         tgt = self._get_tgt(creds, invalid=True)
940         self._s4u2self(tgt, creds, expected_error=KRB_ERR_TKT_NYV)
941
942     def test_user2user_req_invalid(self):
943         creds = self._get_creds()
944         tgt = self._get_tgt(creds, invalid=True)
945         self._user2user(tgt, creds, expected_error=KRB_ERR_TKT_NYV)
946
947     def test_fast_req_invalid(self):
948         creds = self._get_creds()
949         tgt = self._get_tgt(creds, invalid=True)
950         self._fast(tgt, creds, expected_error=KRB_ERR_TKT_NYV,
951                    expected_sname=self.get_krbtgt_sname())
952
953     def test_tgs_req_no_requester_sid(self):
954         creds = self._get_creds()
955         tgt = self._get_tgt(creds, remove_requester_sid=True)
956
957         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
958
959     def test_tgs_req_no_pac_attrs(self):
960         creds = self._get_creds()
961         tgt = self._get_tgt(creds, remove_pac_attrs=True)
962
963         self._run_tgs(tgt, expected_error=0, expect_pac=True,
964                       expect_pac_attrs=False)
965
966     def test_tgs_req_from_rodc_no_requester_sid(self):
967         creds = self._get_creds(replication_allowed=True,
968                                 revealed_to_rodc=True)
969         tgt = self._get_tgt(creds, from_rodc=True, remove_requester_sid=True)
970
971         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
972
973     def test_tgs_req_from_rodc_no_pac_attrs(self):
974         creds = self._get_creds(replication_allowed=True,
975                                 revealed_to_rodc=True)
976         tgt = self._get_tgt(creds, from_rodc=True, remove_pac_attrs=True)
977         self._run_tgs(tgt, expected_error=0, expect_pac=True,
978                       expect_pac_attrs=False)
979
980     # Test making a request without a PAC.
981     def test_tgs_no_pac(self):
982         creds = self._get_creds()
983         tgt = self._get_tgt(creds, remove_pac=True)
984         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
985
986     def test_renew_no_pac(self):
987         creds = self._get_creds()
988         tgt = self._get_tgt(creds, renewable=True, remove_pac=True)
989         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
990
991     def test_validate_no_pac(self):
992         creds = self._get_creds()
993         tgt = self._get_tgt(creds, invalid=True, remove_pac=True)
994         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
995
996     def test_s4u2self_no_pac(self):
997         creds = self._get_creds()
998         tgt = self._get_tgt(creds, remove_pac=True)
999         self._s4u2self(tgt, creds,
1000                        expected_error=KDC_ERR_TGT_REVOKED,
1001                        expect_edata=False)
1002
1003     def test_user2user_no_pac(self):
1004         creds = self._get_creds()
1005         tgt = self._get_tgt(creds, remove_pac=True)
1006         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1007
1008     def test_fast_no_pac(self):
1009         creds = self._get_creds()
1010         tgt = self._get_tgt(creds, remove_pac=True)
1011         self._fast(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED,
1012                    expected_sname=self.get_krbtgt_sname())
1013
1014     # Test making a request with authdata and without a PAC.
1015     def test_tgs_authdata_no_pac(self):
1016         creds = self._get_creds()
1017         tgt = self._get_tgt(creds, remove_pac=True, allow_empty_authdata=True)
1018         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1019
1020     def test_renew_authdata_no_pac(self):
1021         creds = self._get_creds()
1022         tgt = self._get_tgt(creds, renewable=True, remove_pac=True,
1023                             allow_empty_authdata=True)
1024         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1025
1026     def test_validate_authdata_no_pac(self):
1027         creds = self._get_creds()
1028         tgt = self._get_tgt(creds, invalid=True, remove_pac=True,
1029                             allow_empty_authdata=True)
1030         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1031
1032     def test_s4u2self_authdata_no_pac(self):
1033         creds = self._get_creds()
1034         tgt = self._get_tgt(creds, remove_pac=True, allow_empty_authdata=True)
1035         self._s4u2self(tgt, creds,
1036                        expected_error=KDC_ERR_TGT_REVOKED,
1037                        expect_edata=False)
1038
1039     def test_user2user_authdata_no_pac(self):
1040         creds = self._get_creds()
1041         tgt = self._get_tgt(creds, remove_pac=True, allow_empty_authdata=True)
1042         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1043
1044     def test_fast_authdata_no_pac(self):
1045         creds = self._get_creds()
1046         tgt = self._get_tgt(creds, remove_pac=True, allow_empty_authdata=True)
1047         self._fast(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED,
1048                    expected_sname=self.get_krbtgt_sname())
1049
1050     # Test changing the SID in the PAC to that of another account.
1051     def test_tgs_sid_mismatch_existing(self):
1052         creds = self._get_creds()
1053         existing_rid = self._get_existing_rid()
1054         tgt = self._get_tgt(creds, new_rid=existing_rid)
1055         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1056
1057     def test_renew_sid_mismatch_existing(self):
1058         creds = self._get_creds()
1059         existing_rid = self._get_existing_rid()
1060         tgt = self._get_tgt(creds, renewable=True, new_rid=existing_rid)
1061         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1062
1063     def test_validate_sid_mismatch_existing(self):
1064         creds = self._get_creds()
1065         existing_rid = self._get_existing_rid()
1066         tgt = self._get_tgt(creds, invalid=True, new_rid=existing_rid)
1067         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1068
1069     def test_s4u2self_sid_mismatch_existing(self):
1070         creds = self._get_creds()
1071         existing_rid = self._get_existing_rid()
1072         tgt = self._get_tgt(creds, new_rid=existing_rid)
1073         self._s4u2self(tgt, creds,
1074                        expected_error=KDC_ERR_TGT_REVOKED)
1075
1076     def test_user2user_sid_mismatch_existing(self):
1077         creds = self._get_creds()
1078         existing_rid = self._get_existing_rid()
1079         tgt = self._get_tgt(creds, new_rid=existing_rid)
1080         self._user2user(tgt, creds,
1081                         expected_error=KDC_ERR_TGT_REVOKED)
1082
1083     def test_fast_sid_mismatch_existing(self):
1084         creds = self._get_creds()
1085         existing_rid = self._get_existing_rid()
1086         tgt = self._get_tgt(creds, new_rid=existing_rid)
1087         self._fast(tgt, creds,
1088                    expected_error=KDC_ERR_TGT_REVOKED,
1089                    expected_sname=self.get_krbtgt_sname())
1090
1091     def test_requester_sid_mismatch_existing(self):
1092         creds = self._get_creds()
1093         existing_rid = self._get_existing_rid()
1094         tgt = self._get_tgt(creds, new_rid=existing_rid,
1095                             can_modify_logon_info=False)
1096         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1097
1098     def test_logon_info_sid_mismatch_existing(self):
1099         creds = self._get_creds()
1100         existing_rid = self._get_existing_rid()
1101         tgt = self._get_tgt(creds, new_rid=existing_rid,
1102                             can_modify_requester_sid=False)
1103         self._run_tgs(tgt, expected_error=0)
1104
1105     def test_logon_info_only_sid_mismatch_existing(self):
1106         creds = self._get_creds()
1107         existing_rid = self._get_existing_rid()
1108         tgt = self._get_tgt(creds, new_rid=existing_rid,
1109                             remove_requester_sid=True)
1110         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1111
1112     # Test changing the SID in the PAC to a non-existent one.
1113     def test_tgs_sid_mismatch_nonexisting(self):
1114         creds = self._get_creds()
1115         nonexistent_rid = self._get_non_existent_rid()
1116         tgt = self._get_tgt(creds, new_rid=nonexistent_rid)
1117         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1118
1119     def test_renew_sid_mismatch_nonexisting(self):
1120         creds = self._get_creds()
1121         nonexistent_rid = self._get_non_existent_rid()
1122         tgt = self._get_tgt(creds, renewable=True,
1123                             new_rid=nonexistent_rid)
1124         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1125
1126     def test_validate_sid_mismatch_nonexisting(self):
1127         creds = self._get_creds()
1128         nonexistent_rid = self._get_non_existent_rid()
1129         tgt = self._get_tgt(creds, invalid=True,
1130                             new_rid=nonexistent_rid)
1131         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1132
1133     def test_s4u2self_sid_mismatch_nonexisting(self):
1134         creds = self._get_creds()
1135         nonexistent_rid = self._get_non_existent_rid()
1136         tgt = self._get_tgt(creds, new_rid=nonexistent_rid)
1137         self._s4u2self(tgt, creds,
1138                        expected_error=KDC_ERR_TGT_REVOKED)
1139
1140     def test_user2user_sid_mismatch_nonexisting(self):
1141         creds = self._get_creds()
1142         nonexistent_rid = self._get_non_existent_rid()
1143         tgt = self._get_tgt(creds, new_rid=nonexistent_rid)
1144         self._user2user(tgt, creds,
1145                         expected_error=KDC_ERR_TGT_REVOKED)
1146
1147     def test_fast_sid_mismatch_nonexisting(self):
1148         creds = self._get_creds()
1149         nonexistent_rid = self._get_non_existent_rid()
1150         tgt = self._get_tgt(creds, new_rid=nonexistent_rid)
1151         self._fast(tgt, creds,
1152                    expected_error=KDC_ERR_TGT_REVOKED,
1153                    expected_sname=self.get_krbtgt_sname())
1154
1155     def test_requester_sid_mismatch_nonexisting(self):
1156         creds = self._get_creds()
1157         nonexistent_rid = self._get_non_existent_rid()
1158         tgt = self._get_tgt(creds, new_rid=nonexistent_rid,
1159                             can_modify_logon_info=False)
1160         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1161
1162     def test_logon_info_sid_mismatch_nonexisting(self):
1163         creds = self._get_creds()
1164         nonexistent_rid = self._get_non_existent_rid()
1165         tgt = self._get_tgt(creds, new_rid=nonexistent_rid,
1166                             can_modify_requester_sid=False)
1167         self._run_tgs(tgt, expected_error=0)
1168
1169     def test_logon_info_only_sid_mismatch_nonexisting(self):
1170         creds = self._get_creds()
1171         nonexistent_rid = self._get_non_existent_rid()
1172         tgt = self._get_tgt(creds, new_rid=nonexistent_rid,
1173                             remove_requester_sid=True)
1174         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1175
1176     # Test with an RODC-issued ticket where the client is revealed to the RODC.
1177     def test_tgs_rodc_revealed(self):
1178         creds = self._get_creds(replication_allowed=True,
1179                                 revealed_to_rodc=True)
1180         tgt = self._get_tgt(creds, from_rodc=True)
1181         self._run_tgs(tgt, expected_error=0)
1182
1183     def test_renew_rodc_revealed(self):
1184         creds = self._get_creds(replication_allowed=True,
1185                                 revealed_to_rodc=True)
1186         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1187         self._renew_tgt(tgt, expected_error=0,
1188                         expect_pac_attrs=False,
1189                         expect_requester_sid=True)
1190
1191     def test_validate_rodc_revealed(self):
1192         creds = self._get_creds(replication_allowed=True,
1193                                 revealed_to_rodc=True)
1194         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1195         self._validate_tgt(tgt, expected_error=0,
1196                            expect_pac_attrs=False,
1197                            expect_requester_sid=True)
1198
1199     # This test fails on Windows, which gives KDC_ERR_C_PRINCIPAL_UNKNOWN when
1200     # attempting to use S4U2Self with a TGT from an RODC.
1201     def test_s4u2self_rodc_revealed(self):
1202         creds = self._get_creds(replication_allowed=True,
1203                                 revealed_to_rodc=True)
1204         tgt = self._get_tgt(creds, from_rodc=True)
1205         self._s4u2self(tgt, creds, expected_error=0)
1206
1207     def test_user2user_rodc_revealed(self):
1208         creds = self._get_creds(replication_allowed=True,
1209                                 revealed_to_rodc=True)
1210         tgt = self._get_tgt(creds, from_rodc=True)
1211         self._user2user(tgt, creds, expected_error=0)
1212
1213     # Test with an RODC-issued ticket where the SID in the PAC is changed to
1214     # that of another account.
1215     def test_tgs_rodc_sid_mismatch_existing(self):
1216         creds = self._get_creds(replication_allowed=True,
1217                                 revealed_to_rodc=True)
1218         existing_rid = self._get_existing_rid(replication_allowed=True,
1219                                               revealed_to_rodc=True)
1220         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid)
1221         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1222
1223     def test_renew_rodc_sid_mismatch_existing(self):
1224         creds = self._get_creds(replication_allowed=True,
1225                                 revealed_to_rodc=True)
1226         existing_rid = self._get_existing_rid(replication_allowed=True,
1227                                               revealed_to_rodc=True)
1228         tgt = self._get_tgt(creds, renewable=True, from_rodc=True,
1229                             new_rid=existing_rid)
1230         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1231
1232     def test_validate_rodc_sid_mismatch_existing(self):
1233         creds = self._get_creds(replication_allowed=True,
1234                                 revealed_to_rodc=True)
1235         existing_rid = self._get_existing_rid(replication_allowed=True,
1236                                        revealed_to_rodc=True)
1237         tgt = self._get_tgt(creds, invalid=True, from_rodc=True,
1238                             new_rid=existing_rid)
1239         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1240
1241     def test_s4u2self_rodc_sid_mismatch_existing(self):
1242         creds = self._get_creds(replication_allowed=True,
1243                                 revealed_to_rodc=True)
1244         existing_rid = self._get_existing_rid(replication_allowed=True,
1245                                               revealed_to_rodc=True)
1246         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid)
1247         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1248
1249     def test_user2user_rodc_sid_mismatch_existing(self):
1250         creds = self._get_creds(replication_allowed=True,
1251                                 revealed_to_rodc=True)
1252         existing_rid = self._get_existing_rid(replication_allowed=True,
1253                                               revealed_to_rodc=True)
1254         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid)
1255         self._user2user(tgt, creds,
1256                         expected_error=KDC_ERR_TGT_REVOKED)
1257
1258     def test_fast_rodc_sid_mismatch_existing(self):
1259         creds = self._get_creds(replication_allowed=True,
1260                                 revealed_to_rodc=True)
1261         existing_rid = self._get_existing_rid(replication_allowed=True,
1262                                               revealed_to_rodc=True)
1263         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid)
1264         self._fast(tgt, creds,
1265                    expected_error=KDC_ERR_TGT_REVOKED,
1266                    expected_sname=self.get_krbtgt_sname())
1267
1268     def test_tgs_rodc_requester_sid_mismatch_existing(self):
1269         creds = self._get_creds(replication_allowed=True,
1270                                 revealed_to_rodc=True)
1271         existing_rid = self._get_existing_rid(replication_allowed=True,
1272                                               revealed_to_rodc=True)
1273         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid,
1274                             can_modify_logon_info=False)
1275         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1276
1277     def test_tgs_rodc_logon_info_sid_mismatch_existing(self):
1278         creds = self._get_creds(replication_allowed=True,
1279                                 revealed_to_rodc=True)
1280         existing_rid = self._get_existing_rid(replication_allowed=True,
1281                                               revealed_to_rodc=True)
1282         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid,
1283                             can_modify_requester_sid=False)
1284         self._run_tgs(tgt, expected_error=0)
1285
1286     def test_tgs_rodc_logon_info_only_sid_mismatch_existing(self):
1287         creds = self._get_creds(replication_allowed=True,
1288                                 revealed_to_rodc=True)
1289         existing_rid = self._get_existing_rid(replication_allowed=True,
1290                                               revealed_to_rodc=True)
1291         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid,
1292                             remove_requester_sid=True)
1293         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1294
1295     # Test with an RODC-issued ticket where the SID in the PAC is changed to a
1296     # non-existent one.
1297     def test_tgs_rodc_sid_mismatch_nonexisting(self):
1298         creds = self._get_creds(replication_allowed=True,
1299                                 revealed_to_rodc=True)
1300         nonexistent_rid = self._get_non_existent_rid()
1301         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid)
1302         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1303
1304     def test_renew_rodc_sid_mismatch_nonexisting(self):
1305         creds = self._get_creds(replication_allowed=True,
1306                                 revealed_to_rodc=True)
1307         nonexistent_rid = self._get_non_existent_rid()
1308         tgt = self._get_tgt(creds, renewable=True, from_rodc=True,
1309                             new_rid=nonexistent_rid)
1310         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1311
1312     def test_validate_rodc_sid_mismatch_nonexisting(self):
1313         creds = self._get_creds(replication_allowed=True,
1314                                 revealed_to_rodc=True)
1315         nonexistent_rid = self._get_non_existent_rid()
1316         tgt = self._get_tgt(creds, invalid=True, from_rodc=True,
1317                             new_rid=nonexistent_rid)
1318         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1319
1320     def test_s4u2self_rodc_sid_mismatch_nonexisting(self):
1321         creds = self._get_creds(replication_allowed=True,
1322                                 revealed_to_rodc=True)
1323         nonexistent_rid = self._get_non_existent_rid()
1324         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid)
1325         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1326
1327     def test_user2user_rodc_sid_mismatch_nonexisting(self):
1328         creds = self._get_creds(replication_allowed=True,
1329                                 revealed_to_rodc=True)
1330         nonexistent_rid = self._get_non_existent_rid()
1331         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid)
1332         self._user2user(tgt, creds,
1333                         expected_error=KDC_ERR_TGT_REVOKED)
1334
1335     def test_fast_rodc_sid_mismatch_nonexisting(self):
1336         creds = self._get_creds(replication_allowed=True,
1337                                 revealed_to_rodc=True)
1338         nonexistent_rid = self._get_non_existent_rid()
1339         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid)
1340         self._fast(tgt, creds,
1341                    expected_error=KDC_ERR_TGT_REVOKED,
1342                    expected_sname=self.get_krbtgt_sname())
1343
1344     def test_tgs_rodc_requester_sid_mismatch_nonexisting(self):
1345         creds = self._get_creds(replication_allowed=True,
1346                                 revealed_to_rodc=True)
1347         nonexistent_rid = self._get_non_existent_rid()
1348         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid,
1349                             can_modify_logon_info=False)
1350         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1351
1352     def test_tgs_rodc_logon_info_sid_mismatch_nonexisting(self):
1353         creds = self._get_creds(replication_allowed=True,
1354                                 revealed_to_rodc=True)
1355         nonexistent_rid = self._get_non_existent_rid()
1356         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid,
1357                             can_modify_requester_sid=False)
1358         self._run_tgs(tgt, expected_error=0)
1359
1360     def test_tgs_rodc_logon_info_only_sid_mismatch_nonexisting(self):
1361         creds = self._get_creds(replication_allowed=True,
1362                                 revealed_to_rodc=True)
1363         nonexistent_rid = self._get_non_existent_rid()
1364         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid,
1365                             remove_requester_sid=True)
1366         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1367
1368     # Test with an RODC-issued ticket where the client is not revealed to the
1369     # RODC.
1370     def test_tgs_rodc_not_revealed(self):
1371         creds = self._get_creds(replication_allowed=True)
1372         tgt = self._get_tgt(creds, from_rodc=True)
1373         # TODO: error code
1374         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1375
1376     def test_renew_rodc_not_revealed(self):
1377         creds = self._get_creds(replication_allowed=True)
1378         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1379         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1380
1381     def test_validate_rodc_not_revealed(self):
1382         creds = self._get_creds(replication_allowed=True)
1383         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1384         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1385
1386     def test_s4u2self_rodc_not_revealed(self):
1387         creds = self._get_creds(replication_allowed=True)
1388         tgt = self._get_tgt(creds, from_rodc=True)
1389         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1390
1391     def test_user2user_rodc_not_revealed(self):
1392         creds = self._get_creds(replication_allowed=True)
1393         tgt = self._get_tgt(creds, from_rodc=True)
1394         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1395
1396     # Test with an RODC-issued ticket where the RODC account does not have the
1397     # PARTIAL_SECRETS bit set.
1398     def test_tgs_rodc_no_partial_secrets(self):
1399         creds = self._get_creds(replication_allowed=True,
1400                                 revealed_to_rodc=True)
1401         tgt = self._get_tgt(creds, from_rodc=True)
1402         self._remove_rodc_partial_secrets()
1403         self._run_tgs(tgt, expected_error=KDC_ERR_POLICY)
1404
1405     def test_renew_rodc_no_partial_secrets(self):
1406         creds = self._get_creds(replication_allowed=True,
1407                                 revealed_to_rodc=True)
1408         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1409         self._remove_rodc_partial_secrets()
1410         self._renew_tgt(tgt, expected_error=KDC_ERR_POLICY)
1411
1412     def test_validate_rodc_no_partial_secrets(self):
1413         creds = self._get_creds(replication_allowed=True,
1414                                 revealed_to_rodc=True)
1415         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1416         self._remove_rodc_partial_secrets()
1417         self._validate_tgt(tgt, expected_error=KDC_ERR_POLICY)
1418
1419     def test_s4u2self_rodc_no_partial_secrets(self):
1420         creds = self._get_creds(replication_allowed=True,
1421                                 revealed_to_rodc=True)
1422         tgt = self._get_tgt(creds, from_rodc=True)
1423         self._remove_rodc_partial_secrets()
1424         self._s4u2self(tgt, creds, expected_error=KDC_ERR_POLICY)
1425
1426     def test_user2user_rodc_no_partial_secrets(self):
1427         creds = self._get_creds(replication_allowed=True,
1428                                 revealed_to_rodc=True)
1429         tgt = self._get_tgt(creds, from_rodc=True)
1430         self._remove_rodc_partial_secrets()
1431         self._user2user(tgt, creds, expected_error=KDC_ERR_POLICY)
1432
1433     def test_fast_rodc_no_partial_secrets(self):
1434         creds = self._get_creds(replication_allowed=True,
1435                                 revealed_to_rodc=True)
1436         tgt = self._get_tgt(creds, from_rodc=True)
1437         self._remove_rodc_partial_secrets()
1438         self._fast(tgt, creds, expected_error=KDC_ERR_POLICY,
1439                    expected_sname=self.get_krbtgt_sname())
1440
1441     # Test with an RODC-issued ticket where the RODC account does not have an
1442     # msDS-KrbTgtLink.
1443     def test_tgs_rodc_no_krbtgt_link(self):
1444         creds = self._get_creds(replication_allowed=True,
1445                                 revealed_to_rodc=True)
1446         tgt = self._get_tgt(creds, from_rodc=True)
1447         self._remove_rodc_krbtgt_link()
1448         self._run_tgs(tgt, expected_error=KDC_ERR_POLICY)
1449
1450     def test_renew_rodc_no_krbtgt_link(self):
1451         creds = self._get_creds(replication_allowed=True,
1452                                 revealed_to_rodc=True)
1453         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1454         self._remove_rodc_krbtgt_link()
1455         self._renew_tgt(tgt, expected_error=KDC_ERR_POLICY)
1456
1457     def test_validate_rodc_no_krbtgt_link(self):
1458         creds = self._get_creds(replication_allowed=True,
1459                                 revealed_to_rodc=True)
1460         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1461         self._remove_rodc_krbtgt_link()
1462         self._validate_tgt(tgt, expected_error=KDC_ERR_POLICY)
1463
1464     def test_s4u2self_rodc_no_krbtgt_link(self):
1465         creds = self._get_creds(replication_allowed=True,
1466                                 revealed_to_rodc=True)
1467         tgt = self._get_tgt(creds, from_rodc=True)
1468         self._remove_rodc_krbtgt_link()
1469         self._s4u2self(tgt, creds, expected_error=KDC_ERR_POLICY)
1470
1471     def test_user2user_rodc_no_krbtgt_link(self):
1472         creds = self._get_creds(replication_allowed=True,
1473                                 revealed_to_rodc=True)
1474         tgt = self._get_tgt(creds, from_rodc=True)
1475         self._remove_rodc_krbtgt_link()
1476         self._user2user(tgt, creds, expected_error=KDC_ERR_POLICY)
1477
1478     def test_fast_rodc_no_krbtgt_link(self):
1479         creds = self._get_creds(replication_allowed=True,
1480                                 revealed_to_rodc=True)
1481         tgt = self._get_tgt(creds, from_rodc=True)
1482         self._remove_rodc_krbtgt_link()
1483         self._fast(tgt, creds, expected_error=KDC_ERR_POLICY,
1484                    expected_sname=self.get_krbtgt_sname())
1485
1486     # Test with an RODC-issued ticket where the client is not allowed to
1487     # replicate to the RODC.
1488     def test_tgs_rodc_not_allowed(self):
1489         creds = self._get_creds(revealed_to_rodc=True)
1490         tgt = self._get_tgt(creds, from_rodc=True)
1491         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1492
1493     def test_renew_rodc_not_allowed(self):
1494         creds = self._get_creds(revealed_to_rodc=True)
1495         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1496         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1497
1498     def test_validate_rodc_not_allowed(self):
1499         creds = self._get_creds(revealed_to_rodc=True)
1500         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1501         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1502
1503     def test_s4u2self_rodc_not_allowed(self):
1504         creds = self._get_creds(revealed_to_rodc=True)
1505         tgt = self._get_tgt(creds, from_rodc=True)
1506         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1507
1508     def test_user2user_rodc_not_allowed(self):
1509         creds = self._get_creds(revealed_to_rodc=True)
1510         tgt = self._get_tgt(creds, from_rodc=True)
1511         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1512
1513     def test_fast_rodc_not_allowed(self):
1514         creds = self._get_creds(revealed_to_rodc=True)
1515         tgt = self._get_tgt(creds, from_rodc=True)
1516         self._fast(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED,
1517                    expected_sname=self.get_krbtgt_sname())
1518
1519     # Test with an RODC-issued ticket where the client is denied from
1520     # replicating to the RODC.
1521     def test_tgs_rodc_denied(self):
1522         creds = self._get_creds(replication_denied=True,
1523                                 revealed_to_rodc=True)
1524         tgt = self._get_tgt(creds, from_rodc=True)
1525         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1526
1527     def test_renew_rodc_denied(self):
1528         creds = self._get_creds(replication_denied=True,
1529                                 revealed_to_rodc=True)
1530         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1531         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1532
1533     def test_validate_rodc_denied(self):
1534         creds = self._get_creds(replication_denied=True,
1535                                 revealed_to_rodc=True)
1536         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1537         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1538
1539     def test_s4u2self_rodc_denied(self):
1540         creds = self._get_creds(replication_denied=True,
1541                                 revealed_to_rodc=True)
1542         tgt = self._get_tgt(creds, from_rodc=True)
1543         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1544
1545     def test_user2user_rodc_denied(self):
1546         creds = self._get_creds(replication_denied=True,
1547                                 revealed_to_rodc=True)
1548         tgt = self._get_tgt(creds, from_rodc=True)
1549         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1550
1551     def test_fast_rodc_denied(self):
1552         creds = self._get_creds(replication_denied=True,
1553                                 revealed_to_rodc=True)
1554         tgt = self._get_tgt(creds, from_rodc=True)
1555         self._fast(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED,
1556                    expected_sname=self.get_krbtgt_sname())
1557
1558     # Test with an RODC-issued ticket where the client is both allowed and
1559     # denied replicating to the RODC.
1560     def test_tgs_rodc_allowed_denied(self):
1561         creds = self._get_creds(replication_allowed=True,
1562                                 replication_denied=True,
1563                                 revealed_to_rodc=True)
1564         tgt = self._get_tgt(creds, from_rodc=True)
1565         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1566
1567     def test_renew_rodc_allowed_denied(self):
1568         creds = self._get_creds(replication_allowed=True,
1569                                 replication_denied=True,
1570                                 revealed_to_rodc=True)
1571         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1572         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1573
1574     def test_validate_rodc_allowed_denied(self):
1575         creds = self._get_creds(replication_allowed=True,
1576                                 replication_denied=True,
1577                                 revealed_to_rodc=True)
1578         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1579         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1580
1581     def test_s4u2self_rodc_allowed_denied(self):
1582         creds = self._get_creds(replication_allowed=True,
1583                                 replication_denied=True,
1584                                 revealed_to_rodc=True)
1585         tgt = self._get_tgt(creds, from_rodc=True)
1586         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1587
1588     def test_user2user_rodc_allowed_denied(self):
1589         creds = self._get_creds(replication_allowed=True,
1590                                 replication_denied=True,
1591                                 revealed_to_rodc=True)
1592         tgt = self._get_tgt(creds, from_rodc=True)
1593         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1594
1595     def test_fast_rodc_allowed_denied(self):
1596         creds = self._get_creds(replication_allowed=True,
1597                                 replication_denied=True,
1598                                 revealed_to_rodc=True)
1599         tgt = self._get_tgt(creds, from_rodc=True)
1600         self._fast(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED,
1601                    expected_sname=self.get_krbtgt_sname())
1602
1603     # Test making a TGS request with an RC4-encrypted TGT.
1604     def test_tgs_rc4(self):
1605         creds = self._get_creds()
1606         tgt = self._get_tgt(creds, etype=kcrypto.Enctype.RC4)
1607         self._run_tgs(tgt, expected_error=(KDC_ERR_GENERIC,
1608                                            KDC_ERR_BADKEYVER),
1609                       expect_edata=True,
1610                       expected_status=ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES)
1611
1612     def test_renew_rc4(self):
1613         creds = self._get_creds()
1614         tgt = self._get_tgt(creds, renewable=True, etype=kcrypto.Enctype.RC4)
1615         self._renew_tgt(tgt, expected_error=(KDC_ERR_GENERIC,
1616                                              KDC_ERR_BADKEYVER),
1617                         expect_pac_attrs=True,
1618                         expect_pac_attrs_pac_request=True,
1619                         expect_requester_sid=True)
1620
1621     def test_validate_rc4(self):
1622         creds = self._get_creds()
1623         tgt = self._get_tgt(creds, invalid=True, etype=kcrypto.Enctype.RC4)
1624         self._validate_tgt(tgt, expected_error=(KDC_ERR_GENERIC,
1625                                                 KDC_ERR_BADKEYVER),
1626                            expect_pac_attrs=True,
1627                            expect_pac_attrs_pac_request=True,
1628                            expect_requester_sid=True)
1629
1630     def test_s4u2self_rc4(self):
1631         creds = self._get_creds()
1632         tgt = self._get_tgt(creds, etype=kcrypto.Enctype.RC4)
1633         self._s4u2self(tgt, creds, expected_error=(KDC_ERR_GENERIC,
1634                                                    KDC_ERR_BADKEYVER),
1635                        expect_edata=True,
1636                        expected_status=ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES)
1637
1638     def test_user2user_rc4(self):
1639         creds = self._get_creds()
1640         tgt = self._get_tgt(creds, etype=kcrypto.Enctype.RC4)
1641         self._user2user(tgt, creds, expected_error=KDC_ERR_ETYPE_NOSUPP)
1642
1643     def test_fast_rc4(self):
1644         creds = self._get_creds()
1645         tgt = self._get_tgt(creds, etype=kcrypto.Enctype.RC4)
1646         self._fast(tgt, creds, expected_error=KDC_ERR_GENERIC,
1647                    expect_edata=self.expect_padata_outer)
1648
1649     # Test user-to-user with incorrect service principal names.
1650     def test_user2user_matching_sname_host(self):
1651         creds = self._get_creds()
1652         tgt = self._get_tgt(creds)
1653
1654         user_name = creds.get_username()
1655         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1656                                           names=['host', user_name])
1657
1658         self._user2user(tgt, creds, sname=sname,
1659                         expected_error=KDC_ERR_S_PRINCIPAL_UNKNOWN)
1660
1661     def test_user2user_matching_sname_no_host(self):
1662         creds = self._get_creds()
1663         tgt = self._get_tgt(creds)
1664
1665         user_name = creds.get_username()
1666         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1667                                           names=[user_name])
1668
1669         self._user2user(tgt, creds, sname=sname, expected_error=0)
1670
1671     def test_user2user_wrong_sname(self):
1672         creds = self._get_creds()
1673         tgt = self._get_tgt(creds)
1674
1675         other_creds = self._get_mach_creds()
1676         user_name = other_creds.get_username()
1677         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1678                                           names=[user_name])
1679
1680         self._user2user(tgt, creds, sname=sname,
1681                         expected_error=KDC_ERR_BADMATCH)
1682
1683     def test_user2user_other_sname(self):
1684         other_name = self.get_new_username()
1685         spn = f'host/{other_name}'
1686         creds = self.get_cached_creds(
1687             account_type=self.AccountType.COMPUTER,
1688             opts={'spn': spn})
1689         tgt = self._get_tgt(creds)
1690
1691         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1692                                           names=['host', other_name])
1693
1694         self._user2user(tgt, creds, sname=sname, expected_error=0)
1695
1696     def test_user2user_wrong_sname_krbtgt(self):
1697         creds = self._get_creds()
1698         tgt = self._get_tgt(creds)
1699
1700         sname = self.get_krbtgt_sname()
1701
1702         self._user2user(tgt, creds, sname=sname,
1703                         expected_error=KDC_ERR_BADMATCH)
1704
1705     def test_user2user_wrong_srealm(self):
1706         creds = self._get_creds()
1707         tgt = self._get_tgt(creds)
1708
1709         self._user2user(tgt, creds, srealm='OTHER.REALM',
1710                         expected_error=(KDC_ERR_WRONG_REALM,
1711                                         KDC_ERR_S_PRINCIPAL_UNKNOWN))
1712
1713     def test_user2user_tgt_correct_realm(self):
1714         creds = self._get_creds()
1715         tgt = self._get_tgt(creds)
1716
1717         realm = creds.get_realm().encode('utf-8')
1718         tgt = self._modify_tgt(tgt, realm)
1719
1720         self._user2user(tgt, creds,
1721                         expected_error=0)
1722
1723     def test_user2user_tgt_wrong_realm(self):
1724         creds = self._get_creds()
1725         tgt = self._get_tgt(creds)
1726
1727         tgt = self._modify_tgt(tgt, b'OTHER.REALM')
1728
1729         self._user2user(tgt, creds,
1730                         expected_error=0)
1731
1732     def test_user2user_tgt_correct_cname(self):
1733         creds = self._get_creds()
1734         tgt = self._get_tgt(creds)
1735
1736         user_name = creds.get_username()
1737         user_name = user_name.encode('utf-8')
1738         cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1739                                           names=[user_name])
1740
1741         tgt = self._modify_tgt(tgt, cname=cname)
1742
1743         self._user2user(tgt, creds, expected_error=0)
1744
1745     def test_user2user_tgt_other_cname(self):
1746         samdb = self.get_samdb()
1747
1748         other_name = self.get_new_username()
1749         upn = f'{other_name}@{samdb.domain_dns_name()}'
1750
1751         creds = self.get_cached_creds(
1752             account_type=self.AccountType.COMPUTER,
1753             opts={'upn': upn})
1754         tgt = self._get_tgt(creds)
1755
1756         cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1757                                           names=[other_name.encode('utf-8')])
1758
1759         tgt = self._modify_tgt(tgt, cname=cname)
1760
1761         self._user2user(tgt, creds, expected_error=0)
1762
1763     def test_user2user_tgt_cname_host(self):
1764         creds = self._get_creds()
1765         tgt = self._get_tgt(creds)
1766
1767         user_name = creds.get_username()
1768         user_name = user_name.encode('utf-8')
1769         cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1770                                           names=[b'host', user_name])
1771
1772         tgt = self._modify_tgt(tgt, cname=cname)
1773
1774         self._user2user(tgt, creds,
1775                         expected_error=(KDC_ERR_TGT_REVOKED,
1776                                         KDC_ERR_C_PRINCIPAL_UNKNOWN))
1777
1778     def test_user2user_non_existent_sname(self):
1779         creds = self._get_creds()
1780         tgt = self._get_tgt(creds)
1781
1782         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1783                                           names=['host', 'non_existent_user'])
1784
1785         self._user2user(tgt, creds, sname=sname,
1786                         expected_error=KDC_ERR_S_PRINCIPAL_UNKNOWN)
1787
1788     def test_user2user_no_sname(self):
1789         creds = self._get_creds()
1790         tgt = self._get_tgt(creds)
1791
1792         self._user2user(tgt, creds, sname=False,
1793                         expected_error=(KDC_ERR_GENERIC,
1794                                         KDC_ERR_S_PRINCIPAL_UNKNOWN))
1795
1796     def test_tgs_service_ticket(self):
1797         creds = self._get_creds()
1798         tgt = self._get_tgt(creds)
1799
1800         service_creds = self.get_service_creds()
1801         service_ticket = self.get_service_ticket(tgt, service_creds)
1802
1803         self._run_tgs(service_ticket,
1804                       expected_error=(KDC_ERR_NOT_US, KDC_ERR_POLICY))
1805
1806     def test_renew_service_ticket(self):
1807         creds = self._get_creds()
1808         tgt = self._get_tgt(creds)
1809
1810         service_creds = self.get_service_creds()
1811         service_ticket = self.get_service_ticket(tgt, service_creds)
1812
1813         service_ticket = self.modified_ticket(
1814             service_ticket,
1815             modify_fn=self._modify_renewable,
1816             checksum_keys=self.get_krbtgt_checksum_key())
1817
1818         self._renew_tgt(service_ticket,
1819                         expected_error=KDC_ERR_POLICY)
1820
1821     def test_validate_service_ticket(self):
1822         creds = self._get_creds()
1823         tgt = self._get_tgt(creds)
1824
1825         service_creds = self.get_service_creds()
1826         service_ticket = self.get_service_ticket(tgt, service_creds)
1827
1828         service_ticket = self.modified_ticket(
1829             service_ticket,
1830             modify_fn=self._modify_invalid,
1831             checksum_keys=self.get_krbtgt_checksum_key())
1832
1833         self._validate_tgt(service_ticket,
1834                            expected_error=KDC_ERR_POLICY)
1835
1836     def test_s4u2self_service_ticket(self):
1837         creds = self._get_creds()
1838         tgt = self._get_tgt(creds)
1839
1840         service_creds = self.get_service_creds()
1841         service_ticket = self.get_service_ticket(tgt, service_creds)
1842
1843         self._s4u2self(service_ticket, creds,
1844                        expected_error=(KDC_ERR_NOT_US, KDC_ERR_POLICY))
1845
1846     def test_user2user_service_ticket(self):
1847         creds = self._get_creds()
1848         tgt = self._get_tgt(creds)
1849
1850         service_creds = self.get_service_creds()
1851         service_ticket = self.get_service_ticket(tgt, service_creds)
1852
1853         self._user2user(service_ticket, creds,
1854                         expected_error=(KDC_ERR_MODIFIED, KDC_ERR_POLICY))
1855
1856     # Expected to fail against Windows, which does not produce an error.
1857     def test_fast_service_ticket(self):
1858         creds = self._get_creds()
1859         tgt = self._get_tgt(creds)
1860
1861         service_creds = self.get_service_creds()
1862         service_ticket = self.get_service_ticket(tgt, service_creds)
1863
1864         self._fast(service_ticket, creds,
1865                    expected_error=(KDC_ERR_POLICY,
1866                                    KDC_ERR_S_PRINCIPAL_UNKNOWN))
1867
1868     def test_pac_attrs_none(self):
1869         creds = self._get_creds()
1870         self.get_tgt(creds, pac_request=None,
1871                      expect_pac=True,
1872                      expect_pac_attrs=True,
1873                      expect_pac_attrs_pac_request=None)
1874
1875     def test_pac_attrs_false(self):
1876         creds = self._get_creds()
1877         self.get_tgt(creds, pac_request=False,
1878                      expect_pac=True,
1879                      expect_pac_attrs=True,
1880                      expect_pac_attrs_pac_request=False)
1881
1882     def test_pac_attrs_true(self):
1883         creds = self._get_creds()
1884         self.get_tgt(creds, pac_request=True,
1885                      expect_pac=True,
1886                      expect_pac_attrs=True,
1887                      expect_pac_attrs_pac_request=True)
1888
1889     def test_pac_attrs_renew_none(self):
1890         creds = self._get_creds()
1891         tgt = self.get_tgt(creds, pac_request=None,
1892                            expect_pac=True,
1893                            expect_pac_attrs=True,
1894                            expect_pac_attrs_pac_request=None)
1895         tgt = self._modify_tgt(tgt, renewable=True)
1896
1897         self._renew_tgt(tgt, expected_error=0,
1898                         expect_pac=True,
1899                         expect_pac_attrs=True,
1900                         expect_pac_attrs_pac_request=None,
1901                         expect_requester_sid=True)
1902
1903     def test_pac_attrs_renew_false(self):
1904         creds = self._get_creds()
1905         tgt = self.get_tgt(creds, pac_request=False,
1906                            expect_pac=True,
1907                            expect_pac_attrs=True,
1908                            expect_pac_attrs_pac_request=False)
1909         tgt = self._modify_tgt(tgt, renewable=True)
1910
1911         self._renew_tgt(tgt, expected_error=0,
1912                         expect_pac=True,
1913                         expect_pac_attrs=True,
1914                         expect_pac_attrs_pac_request=False,
1915                         expect_requester_sid=True)
1916
1917     def test_pac_attrs_renew_true(self):
1918         creds = self._get_creds()
1919         tgt = self.get_tgt(creds, pac_request=True,
1920                            expect_pac=True,
1921                            expect_pac_attrs=True,
1922                            expect_pac_attrs_pac_request=True)
1923         tgt = self._modify_tgt(tgt, renewable=True)
1924
1925         self._renew_tgt(tgt, expected_error=0,
1926                         expect_pac=True,
1927                         expect_pac_attrs=True,
1928                         expect_pac_attrs_pac_request=True,
1929                         expect_requester_sid=True)
1930
1931     def test_pac_attrs_rodc_renew_none(self):
1932         creds = self._get_creds(replication_allowed=True,
1933                                 revealed_to_rodc=True)
1934         tgt = self.get_tgt(creds, pac_request=None,
1935                            expect_pac=True,
1936                            expect_pac_attrs=True,
1937                            expect_pac_attrs_pac_request=None)
1938         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True)
1939
1940         self._renew_tgt(tgt, expected_error=0,
1941                         expect_pac=True,
1942                         expect_pac_attrs=False,
1943                         expect_requester_sid=True)
1944
1945     def test_pac_attrs_rodc_renew_false(self):
1946         creds = self._get_creds(replication_allowed=True,
1947                                 revealed_to_rodc=True)
1948         tgt = self.get_tgt(creds, pac_request=False,
1949                            expect_pac=True,
1950                            expect_pac_attrs=True,
1951                            expect_pac_attrs_pac_request=False)
1952         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True)
1953
1954         self._renew_tgt(tgt, expected_error=0,
1955                         expect_pac=True,
1956                         expect_pac_attrs=False,
1957                         expect_requester_sid=True)
1958
1959     def test_pac_attrs_rodc_renew_true(self):
1960         creds = self._get_creds(replication_allowed=True,
1961                                 revealed_to_rodc=True)
1962         tgt = self.get_tgt(creds, pac_request=True,
1963                            expect_pac=True,
1964                            expect_pac_attrs=True,
1965                            expect_pac_attrs_pac_request=True)
1966         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True)
1967
1968         self._renew_tgt(tgt, expected_error=0,
1969                         expect_pac=True,
1970                         expect_pac_attrs=False,
1971                         expect_requester_sid=True)
1972
1973     def test_pac_attrs_missing_renew_none(self):
1974         creds = self._get_creds()
1975         tgt = self.get_tgt(creds, pac_request=None,
1976                            expect_pac=True,
1977                            expect_pac_attrs=True,
1978                            expect_pac_attrs_pac_request=None)
1979         tgt = self._modify_tgt(tgt, renewable=True,
1980                                remove_pac_attrs=True)
1981
1982         self._renew_tgt(tgt, expected_error=0,
1983                         expect_pac=True,
1984                         expect_pac_attrs=False,
1985                         expect_requester_sid=True)
1986
1987     def test_pac_attrs_missing_renew_false(self):
1988         creds = self._get_creds()
1989         tgt = self.get_tgt(creds, pac_request=False,
1990                            expect_pac=True,
1991                            expect_pac_attrs=True,
1992                            expect_pac_attrs_pac_request=False)
1993         tgt = self._modify_tgt(tgt, renewable=True,
1994                                remove_pac_attrs=True)
1995
1996         self._renew_tgt(tgt, expected_error=0,
1997                         expect_pac=True,
1998                         expect_pac_attrs=False,
1999                         expect_requester_sid=True)
2000
2001     def test_pac_attrs_missing_renew_true(self):
2002         creds = self._get_creds()
2003         tgt = self.get_tgt(creds, pac_request=True,
2004                            expect_pac=True,
2005                            expect_pac_attrs=True,
2006                            expect_pac_attrs_pac_request=True)
2007         tgt = self._modify_tgt(tgt, renewable=True,
2008                                remove_pac_attrs=True)
2009
2010         self._renew_tgt(tgt, expected_error=0,
2011                         expect_pac=True,
2012                         expect_pac_attrs=False,
2013                         expect_requester_sid=True)
2014
2015     def test_pac_attrs_missing_rodc_renew_none(self):
2016         creds = self._get_creds(replication_allowed=True,
2017                                 revealed_to_rodc=True)
2018         tgt = self.get_tgt(creds, pac_request=None,
2019                            expect_pac=True,
2020                            expect_pac_attrs=True,
2021                            expect_pac_attrs_pac_request=None)
2022         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True,
2023                                remove_pac_attrs=True)
2024
2025         self._renew_tgt(tgt, expected_error=0,
2026                         expect_pac=True,
2027                         expect_pac_attrs=False,
2028                         expect_requester_sid=True)
2029
2030     def test_pac_attrs_missing_rodc_renew_false(self):
2031         creds = self._get_creds(replication_allowed=True,
2032                                 revealed_to_rodc=True)
2033         tgt = self.get_tgt(creds, pac_request=False,
2034                            expect_pac=True,
2035                            expect_pac_attrs=True,
2036                            expect_pac_attrs_pac_request=False)
2037         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True,
2038                                remove_pac_attrs=True)
2039
2040         self._renew_tgt(tgt, expected_error=0,
2041                         expect_pac=True,
2042                         expect_pac_attrs=False,
2043                         expect_requester_sid=True)
2044
2045     def test_pac_attrs_missing_rodc_renew_true(self):
2046         creds = self._get_creds(replication_allowed=True,
2047                                 revealed_to_rodc=True)
2048         tgt = self.get_tgt(creds, pac_request=True,
2049                            expect_pac=True,
2050                            expect_pac_attrs=True,
2051                            expect_pac_attrs_pac_request=True)
2052         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True,
2053                                remove_pac_attrs=True)
2054
2055         self._renew_tgt(tgt, expected_error=0,
2056                         expect_pac=True,
2057                         expect_pac_attrs=False,
2058                         expect_requester_sid=True)
2059
2060     def test_tgs_pac_attrs_none(self):
2061         creds = self._get_creds()
2062         tgt = self.get_tgt(creds, pac_request=None,
2063                            expect_pac=True,
2064                            expect_pac_attrs=True,
2065                            expect_pac_attrs_pac_request=None)
2066
2067         self._run_tgs(tgt, expected_error=0, expect_pac=True,
2068                       expect_pac_attrs=False)
2069
2070     def test_tgs_pac_attrs_false(self):
2071         creds = self._get_creds()
2072         tgt = self.get_tgt(creds, pac_request=False,
2073                            expect_pac=True,
2074                            expect_pac_attrs=True,
2075                            expect_pac_attrs_pac_request=False)
2076
2077         self._run_tgs(tgt, expected_error=0, expect_pac=False,
2078                       expect_pac_attrs=False)
2079
2080     def test_tgs_pac_attrs_true(self):
2081         creds = self._get_creds()
2082         tgt = self.get_tgt(creds, pac_request=True,
2083                            expect_pac=True,
2084                            expect_pac_attrs=True,
2085                            expect_pac_attrs_pac_request=True)
2086
2087         self._run_tgs(tgt, expected_error=0, expect_pac=True,
2088                       expect_pac_attrs=False)
2089
2090     def test_as_requester_sid(self):
2091         creds = self._get_creds()
2092
2093         samdb = self.get_samdb()
2094         sid = self.get_objectSid(samdb, creds.get_dn())
2095
2096         self.get_tgt(creds, pac_request=None,
2097                      expect_pac=True,
2098                      expected_sid=sid,
2099                      expect_requester_sid=True)
2100
2101     def test_tgs_requester_sid(self):
2102         creds = self._get_creds()
2103
2104         samdb = self.get_samdb()
2105         sid = self.get_objectSid(samdb, creds.get_dn())
2106
2107         tgt = self.get_tgt(creds, pac_request=None,
2108                            expect_pac=True,
2109                            expected_sid=sid,
2110                            expect_requester_sid=True)
2111
2112         self._run_tgs(tgt, expected_error=0, expect_pac=True,
2113                       expect_requester_sid=False)
2114
2115     def test_tgs_requester_sid_renew(self):
2116         creds = self._get_creds()
2117
2118         samdb = self.get_samdb()
2119         sid = self.get_objectSid(samdb, creds.get_dn())
2120
2121         tgt = self.get_tgt(creds, pac_request=None,
2122                            expect_pac=True,
2123                            expected_sid=sid,
2124                            expect_requester_sid=True)
2125         tgt = self._modify_tgt(tgt, renewable=True)
2126
2127         self._renew_tgt(tgt, expected_error=0, expect_pac=True,
2128                         expect_pac_attrs=True,
2129                         expect_pac_attrs_pac_request=None,
2130                         expected_sid=sid,
2131                         expect_requester_sid=True)
2132
2133     def test_tgs_requester_sid_rodc_renew(self):
2134         creds = self._get_creds(replication_allowed=True,
2135                                 revealed_to_rodc=True)
2136
2137         samdb = self.get_samdb()
2138         sid = self.get_objectSid(samdb, creds.get_dn())
2139
2140         tgt = self.get_tgt(creds, pac_request=None,
2141                            expect_pac=True,
2142                            expected_sid=sid,
2143                            expect_requester_sid=True)
2144         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True)
2145
2146         self._renew_tgt(tgt, expected_error=0, expect_pac=True,
2147                         expect_pac_attrs=False,
2148                         expected_sid=sid,
2149                         expect_requester_sid=True)
2150
2151     def test_tgs_requester_sid_missing_renew(self):
2152         creds = self._get_creds()
2153
2154         samdb = self.get_samdb()
2155         sid = self.get_objectSid(samdb, creds.get_dn())
2156
2157         tgt = self.get_tgt(creds, pac_request=None,
2158                            expect_pac=True,
2159                            expected_sid=sid,
2160                            expect_requester_sid=True)
2161         tgt = self._modify_tgt(tgt, renewable=True,
2162                                remove_requester_sid=True)
2163
2164         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
2165
2166     def test_tgs_requester_sid_missing_rodc_renew(self):
2167         creds = self._get_creds(replication_allowed=True,
2168                                 revealed_to_rodc=True)
2169
2170         samdb = self.get_samdb()
2171         sid = self.get_objectSid(samdb, creds.get_dn())
2172
2173         tgt = self.get_tgt(creds, pac_request=None,
2174                            expect_pac=True,
2175                            expected_sid=sid,
2176                            expect_requester_sid=True)
2177         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True,
2178                                remove_requester_sid=True)
2179
2180         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
2181
2182     def test_tgs_requester_sid_validate(self):
2183         creds = self._get_creds()
2184
2185         samdb = self.get_samdb()
2186         sid = self.get_objectSid(samdb, creds.get_dn())
2187
2188         tgt = self.get_tgt(creds, pac_request=None,
2189                            expect_pac=True,
2190                            expected_sid=sid,
2191                            expect_requester_sid=True)
2192         tgt = self._modify_tgt(tgt, invalid=True)
2193
2194         self._validate_tgt(tgt, expected_error=0, expect_pac=True,
2195                            expect_pac_attrs=True,
2196                            expect_pac_attrs_pac_request=None,
2197                            expected_sid=sid,
2198                            expect_requester_sid=True)
2199
2200     def test_tgs_requester_sid_rodc_validate(self):
2201         creds = self._get_creds(replication_allowed=True,
2202                                 revealed_to_rodc=True)
2203
2204         samdb = self.get_samdb()
2205         sid = self.get_objectSid(samdb, creds.get_dn())
2206
2207         tgt = self.get_tgt(creds, pac_request=None,
2208                            expect_pac=True,
2209                            expected_sid=sid,
2210                            expect_requester_sid=True)
2211         tgt = self._modify_tgt(tgt, from_rodc=True, invalid=True)
2212
2213         self._validate_tgt(tgt, expected_error=0, expect_pac=True,
2214                            expect_pac_attrs=False,
2215                            expected_sid=sid,
2216                            expect_requester_sid=True)
2217
2218     def test_tgs_requester_sid_missing_validate(self):
2219         creds = self._get_creds()
2220
2221         samdb = self.get_samdb()
2222         sid = self.get_objectSid(samdb, creds.get_dn())
2223
2224         tgt = self.get_tgt(creds, pac_request=None,
2225                            expect_pac=True,
2226                            expected_sid=sid,
2227                            expect_requester_sid=True)
2228         tgt = self._modify_tgt(tgt, invalid=True,
2229                                remove_requester_sid=True)
2230
2231         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
2232
2233     def test_tgs_requester_sid_missing_rodc_validate(self):
2234         creds = self._get_creds(replication_allowed=True,
2235                                 revealed_to_rodc=True)
2236
2237         samdb = self.get_samdb()
2238         sid = self.get_objectSid(samdb, creds.get_dn())
2239
2240         tgt = self.get_tgt(creds, pac_request=None,
2241                            expect_pac=True,
2242                            expected_sid=sid,
2243                            expect_requester_sid=True)
2244         tgt = self._modify_tgt(tgt, from_rodc=True, invalid=True,
2245                                remove_requester_sid=True)
2246
2247         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
2248
2249     def test_tgs_pac_request_none(self):
2250         creds = self._get_creds()
2251         tgt = self.get_tgt(creds, pac_request=None)
2252
2253         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2254
2255         pac = self.get_ticket_pac(ticket)
2256         self.assertIsNotNone(pac)
2257
2258     def test_tgs_pac_request_false(self):
2259         creds = self._get_creds()
2260         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2261
2262         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=False)
2263
2264         pac = self.get_ticket_pac(ticket, expect_pac=False)
2265         self.assertIsNone(pac)
2266
2267     def test_tgs_pac_request_true(self):
2268         creds = self._get_creds()
2269         tgt = self.get_tgt(creds, pac_request=True)
2270
2271         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2272
2273         pac = self.get_ticket_pac(ticket)
2274         self.assertIsNotNone(pac)
2275
2276     def test_renew_pac_request_none(self):
2277         creds = self._get_creds()
2278         tgt = self.get_tgt(creds, pac_request=None)
2279         tgt = self._modify_tgt(tgt, renewable=True)
2280
2281         tgt = self._renew_tgt(tgt, expected_error=0, expect_pac=None,
2282                               expect_pac_attrs=True,
2283                               expect_pac_attrs_pac_request=None,
2284                               expect_requester_sid=True)
2285
2286         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2287
2288         pac = self.get_ticket_pac(ticket)
2289         self.assertIsNotNone(pac)
2290
2291     def test_renew_pac_request_false(self):
2292         creds = self._get_creds()
2293         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2294         tgt = self._modify_tgt(tgt, renewable=True)
2295
2296         tgt = self._renew_tgt(tgt, expected_error=0, expect_pac=None,
2297                               expect_pac_attrs=True,
2298                               expect_pac_attrs_pac_request=False,
2299                               expect_requester_sid=True)
2300
2301         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=False)
2302
2303         pac = self.get_ticket_pac(ticket, expect_pac=False)
2304         self.assertIsNone(pac)
2305
2306     def test_renew_pac_request_true(self):
2307         creds = self._get_creds()
2308         tgt = self.get_tgt(creds, pac_request=True)
2309         tgt = self._modify_tgt(tgt, renewable=True)
2310
2311         tgt = self._renew_tgt(tgt, expected_error=0, expect_pac=None,
2312                               expect_pac_attrs=True,
2313                               expect_pac_attrs_pac_request=True,
2314                               expect_requester_sid=True)
2315
2316         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2317
2318         pac = self.get_ticket_pac(ticket)
2319         self.assertIsNotNone(pac)
2320
2321     def test_rodc_renew_pac_request_none(self):
2322         creds = self._get_creds(replication_allowed=True,
2323                                 revealed_to_rodc=True)
2324         tgt = self.get_tgt(creds, pac_request=None)
2325         tgt = self._modify_tgt(tgt, renewable=True, from_rodc=True)
2326
2327         tgt = self._renew_tgt(tgt, expected_error=0, expect_pac=None,
2328                               expect_pac_attrs=False,
2329                               expect_requester_sid=True)
2330
2331         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2332
2333         pac = self.get_ticket_pac(ticket)
2334         self.assertIsNotNone(pac)
2335
2336     def test_rodc_renew_pac_request_false(self):
2337         creds = self._get_creds(replication_allowed=True,
2338                                 revealed_to_rodc=True)
2339         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2340         tgt = self._modify_tgt(tgt, renewable=True, from_rodc=True)
2341
2342         tgt = self._renew_tgt(tgt, expected_error=0, expect_pac=None,
2343                               expect_pac_attrs=False,
2344                               expect_requester_sid=True)
2345
2346         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2347
2348         pac = self.get_ticket_pac(ticket)
2349         self.assertIsNotNone(pac)
2350
2351     def test_rodc_renew_pac_request_true(self):
2352         creds = self._get_creds(replication_allowed=True,
2353                                 revealed_to_rodc=True)
2354         tgt = self.get_tgt(creds, pac_request=True)
2355         tgt = self._modify_tgt(tgt, renewable=True, from_rodc=True)
2356
2357         tgt = self._renew_tgt(tgt, expected_error=0, expect_pac=None,
2358                               expect_pac_attrs=False,
2359                               expect_requester_sid=True)
2360
2361         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2362
2363         pac = self.get_ticket_pac(ticket)
2364         self.assertIsNotNone(pac)
2365
2366     def test_validate_pac_request_none(self):
2367         creds = self._get_creds()
2368         tgt = self.get_tgt(creds, pac_request=None)
2369         tgt = self._modify_tgt(tgt, invalid=True)
2370
2371         tgt = self._validate_tgt(tgt, expected_error=0, expect_pac=None,
2372                                  expect_pac_attrs=True,
2373                                  expect_pac_attrs_pac_request=None,
2374                                  expect_requester_sid=True)
2375
2376         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2377
2378         pac = self.get_ticket_pac(ticket)
2379         self.assertIsNotNone(pac)
2380
2381     def test_validate_pac_request_false(self):
2382         creds = self._get_creds()
2383         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2384         tgt = self._modify_tgt(tgt, invalid=True)
2385
2386         tgt = self._validate_tgt(tgt, expected_error=0, expect_pac=None,
2387                                  expect_pac_attrs=True,
2388                                  expect_pac_attrs_pac_request=False,
2389                                  expect_requester_sid=True)
2390
2391         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=False)
2392
2393         pac = self.get_ticket_pac(ticket, expect_pac=False)
2394         self.assertIsNone(pac)
2395
2396     def test_validate_pac_request_true(self):
2397         creds = self._get_creds()
2398         tgt = self.get_tgt(creds, pac_request=True)
2399         tgt = self._modify_tgt(tgt, invalid=True)
2400
2401         tgt = self._validate_tgt(tgt, expected_error=0, expect_pac=None,
2402                                  expect_pac_attrs=True,
2403                                  expect_pac_attrs_pac_request=True,
2404                                  expect_requester_sid=True)
2405
2406         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2407
2408         pac = self.get_ticket_pac(ticket)
2409         self.assertIsNotNone(pac)
2410
2411     def test_rodc_validate_pac_request_none(self):
2412         creds = self._get_creds(replication_allowed=True,
2413                                 revealed_to_rodc=True)
2414         tgt = self.get_tgt(creds, pac_request=None)
2415         tgt = self._modify_tgt(tgt, invalid=True, from_rodc=True)
2416
2417         tgt = self._validate_tgt(tgt, expected_error=0, expect_pac=None,
2418                                  expect_pac_attrs=False,
2419                                  expect_requester_sid=True)
2420
2421         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2422
2423         pac = self.get_ticket_pac(ticket)
2424         self.assertIsNotNone(pac)
2425
2426     def test_rodc_validate_pac_request_false(self):
2427         creds = self._get_creds(replication_allowed=True,
2428                                 revealed_to_rodc=True)
2429         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2430         tgt = self._modify_tgt(tgt, invalid=True, from_rodc=True)
2431
2432         tgt = self._validate_tgt(tgt, expected_error=0, expect_pac=None,
2433                                  expect_pac_attrs=False,
2434                                  expect_requester_sid=True)
2435
2436         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2437
2438         pac = self.get_ticket_pac(ticket)
2439         self.assertIsNotNone(pac)
2440
2441     def test_rodc_validate_pac_request_true(self):
2442         creds = self._get_creds(replication_allowed=True,
2443                                 revealed_to_rodc=True)
2444         tgt = self.get_tgt(creds, pac_request=True)
2445         tgt = self._modify_tgt(tgt, invalid=True, from_rodc=True)
2446
2447         tgt = self._validate_tgt(tgt, expected_error=0, expect_pac=None,
2448                                  expect_pac_attrs=False,
2449                                  expect_requester_sid=True)
2450
2451         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2452
2453         pac = self.get_ticket_pac(ticket)
2454         self.assertIsNotNone(pac)
2455
2456     def test_s4u2self_pac_request_none(self):
2457         creds = self._get_creds()
2458         tgt = self.get_tgt(creds, pac_request=None)
2459
2460         ticket = self._s4u2self(tgt, creds, expected_error=0, expect_pac=True)
2461
2462         pac = self.get_ticket_pac(ticket)
2463         self.assertIsNotNone(pac)
2464
2465     def test_s4u2self_pac_request_false(self):
2466         creds = self._get_creds()
2467         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2468
2469         ticket = self._s4u2self(tgt, creds, expected_error=0, expect_pac=True)
2470
2471         pac = self.get_ticket_pac(ticket)
2472         self.assertIsNotNone(pac)
2473
2474     def test_s4u2self_pac_request_true(self):
2475         creds = self._get_creds()
2476         tgt = self.get_tgt(creds, pac_request=True)
2477
2478         ticket = self._s4u2self(tgt, creds, expected_error=0, expect_pac=True)
2479
2480         pac = self.get_ticket_pac(ticket)
2481         self.assertIsNotNone(pac)
2482
2483     def test_user2user_pac_request_none(self):
2484         creds = self._get_creds()
2485         tgt = self.get_tgt(creds, pac_request=None)
2486
2487         ticket = self._user2user(tgt, creds, expected_error=0, expect_pac=True)
2488
2489         pac = self.get_ticket_pac(ticket)
2490         self.assertIsNotNone(pac)
2491
2492     def test_user2user_pac_request_false(self):
2493         creds = self._get_creds()
2494         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2495
2496         ticket = self._user2user(tgt, creds, expected_error=0,
2497                                  expect_pac=True)
2498
2499         pac = self.get_ticket_pac(ticket, expect_pac=True)
2500         self.assertIsNotNone(pac)
2501
2502     def test_user2user_pac_request_true(self):
2503         creds = self._get_creds()
2504         tgt = self.get_tgt(creds, pac_request=True)
2505
2506         ticket = self._user2user(tgt, creds, expected_error=0, expect_pac=True)
2507
2508         pac = self.get_ticket_pac(ticket)
2509         self.assertIsNotNone(pac)
2510
2511     def test_user2user_user_pac_request_none(self):
2512         creds = self._get_creds()
2513         tgt = self.get_tgt(creds)
2514
2515         user_creds = self._get_mach_creds()
2516         user_tgt = self.get_tgt(user_creds, pac_request=None)
2517
2518         ticket = self._user2user(tgt, creds, expected_error=0,
2519                                  user_tgt=user_tgt, expect_pac=True)
2520
2521         pac = self.get_ticket_pac(ticket)
2522         self.assertIsNotNone(pac)
2523
2524     def test_user2user_user_pac_request_false(self):
2525         creds = self._get_creds()
2526         tgt = self.get_tgt(creds)
2527
2528         user_creds = self._get_mach_creds()
2529         user_tgt = self.get_tgt(user_creds, pac_request=False, expect_pac=None)
2530
2531         ticket = self._user2user(tgt, creds, expected_error=0,
2532                                  user_tgt=user_tgt, expect_pac=False)
2533
2534         pac = self.get_ticket_pac(ticket, expect_pac=False)
2535         self.assertIsNone(pac)
2536
2537     def test_user2user_user_pac_request_true(self):
2538         creds = self._get_creds()
2539         tgt = self.get_tgt(creds)
2540
2541         user_creds = self._get_mach_creds()
2542         user_tgt = self.get_tgt(user_creds, pac_request=True)
2543
2544         ticket = self._user2user(tgt, creds, expected_error=0,
2545                                  user_tgt=user_tgt, expect_pac=True)
2546
2547         pac = self.get_ticket_pac(ticket)
2548         self.assertIsNotNone(pac)
2549
2550     def test_fast_pac_request_none(self):
2551         creds = self._get_creds()
2552         tgt = self.get_tgt(creds, pac_request=None)
2553
2554         ticket = self._fast(tgt, creds, expected_error=0, expect_pac=True)
2555
2556         pac = self.get_ticket_pac(ticket)
2557         self.assertIsNotNone(pac)
2558
2559     def test_fast_pac_request_false(self):
2560         creds = self._get_creds()
2561         tgt = self.get_tgt(creds, pac_request=False)
2562
2563         ticket = self._fast(tgt, creds, expected_error=0,
2564                             expect_pac=True)
2565
2566         pac = self.get_ticket_pac(ticket, expect_pac=True)
2567         self.assertIsNotNone(pac)
2568
2569     def test_fast_pac_request_true(self):
2570         creds = self._get_creds()
2571         tgt = self.get_tgt(creds, pac_request=True)
2572
2573         ticket = self._fast(tgt, creds, expected_error=0, expect_pac=True)
2574
2575         pac = self.get_ticket_pac(ticket)
2576         self.assertIsNotNone(pac)
2577
2578     def test_tgs_rodc_pac_request_none(self):
2579         creds = self._get_creds(replication_allowed=True,
2580                                 revealed_to_rodc=True)
2581         tgt = self.get_tgt(creds, pac_request=None)
2582         tgt = self._modify_tgt(tgt, from_rodc=True)
2583
2584         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2585
2586         pac = self.get_ticket_pac(ticket)
2587         self.assertIsNotNone(pac)
2588
2589     def test_tgs_rodc_pac_request_false(self):
2590         creds = self._get_creds(replication_allowed=True,
2591                                 revealed_to_rodc=True)
2592         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2593         tgt = self._modify_tgt(tgt, from_rodc=True)
2594
2595         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2596
2597         pac = self.get_ticket_pac(ticket)
2598         self.assertIsNotNone(pac)
2599
2600     def test_tgs_rodc_pac_request_true(self):
2601         creds = self._get_creds(replication_allowed=True,
2602                                 revealed_to_rodc=True)
2603         tgt = self.get_tgt(creds, pac_request=True)
2604         tgt = self._modify_tgt(tgt, from_rodc=True)
2605
2606         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2607
2608         pac = self.get_ticket_pac(ticket)
2609         self.assertIsNotNone(pac)
2610
2611     def test_tgs_rename(self):
2612         creds = self.get_cached_creds(account_type=self.AccountType.USER,
2613                                       use_cache=False)
2614         tgt = self.get_tgt(creds)
2615
2616         # Rename the account.
2617         new_name = self.get_new_username()
2618
2619         samdb = self.get_samdb()
2620         msg = ldb.Message(creds.get_dn())
2621         msg['sAMAccountName'] = ldb.MessageElement(new_name,
2622                                                    ldb.FLAG_MOD_REPLACE,
2623                                                    'sAMAccountName')
2624         samdb.modify(msg)
2625
2626         self._run_tgs(tgt, expected_error=(KDC_ERR_TGT_REVOKED,
2627                                            KDC_ERR_C_PRINCIPAL_UNKNOWN))
2628
2629     def _modify_renewable(self, enc_part):
2630         # Set the renewable flag.
2631         enc_part = self.modify_ticket_flag(enc_part, 'renewable', value=True)
2632
2633         # Set the renew-till time to be in the future.
2634         renew_till = self.get_KerberosTime(offset=100 * 60 * 60)
2635         enc_part['renew-till'] = renew_till
2636
2637         return enc_part
2638
2639     def _modify_invalid(self, enc_part):
2640         # Set the invalid flag.
2641         enc_part = self.modify_ticket_flag(enc_part, 'invalid', value=True)
2642
2643         # Set the ticket start time to be in the past.
2644         past_time = self.get_KerberosTime(offset=-100 * 60 * 60)
2645         enc_part['starttime'] = past_time
2646
2647         return enc_part
2648
2649     def _get_tgt(self,
2650                  client_creds,
2651                  renewable=False,
2652                  invalid=False,
2653                  from_rodc=False,
2654                  new_rid=None,
2655                  remove_pac=False,
2656                  allow_empty_authdata=False,
2657                  can_modify_logon_info=True,
2658                  can_modify_requester_sid=True,
2659                  remove_pac_attrs=False,
2660                  remove_requester_sid=False,
2661                  etype=None,
2662                  cksum_etype=None):
2663         self.assertFalse(renewable and invalid)
2664
2665         if remove_pac:
2666             self.assertIsNone(new_rid)
2667
2668         tgt = self.get_tgt(client_creds)
2669
2670         return self._modify_tgt(
2671             tgt=tgt,
2672             renewable=renewable,
2673             invalid=invalid,
2674             from_rodc=from_rodc,
2675             new_rid=new_rid,
2676             remove_pac=remove_pac,
2677             allow_empty_authdata=allow_empty_authdata,
2678             can_modify_logon_info=can_modify_logon_info,
2679             can_modify_requester_sid=can_modify_requester_sid,
2680             remove_pac_attrs=remove_pac_attrs,
2681             remove_requester_sid=remove_requester_sid,
2682             etype=etype,
2683             cksum_etype=cksum_etype)
2684
2685     def _modify_tgt(self,
2686                     tgt,
2687                     renewable=False,
2688                     invalid=False,
2689                     from_rodc=False,
2690                     new_rid=None,
2691                     remove_pac=False,
2692                     allow_empty_authdata=False,
2693                     cname=None,
2694                     crealm=None,
2695                     can_modify_logon_info=True,
2696                     can_modify_requester_sid=True,
2697                     remove_pac_attrs=False,
2698                     remove_requester_sid=False,
2699                     etype=None,
2700                     cksum_etype=None):
2701         if from_rodc:
2702             krbtgt_creds = self.get_mock_rodc_krbtgt_creds()
2703         else:
2704             krbtgt_creds = self.get_krbtgt_creds()
2705
2706         if new_rid is not None or remove_requester_sid or remove_pac_attrs:
2707             def change_sid_fn(pac):
2708                 pac_buffers = pac.buffers
2709                 for pac_buffer in pac_buffers:
2710                     if pac_buffer.type == krb5pac.PAC_TYPE_LOGON_INFO:
2711                         if new_rid is not None and can_modify_logon_info:
2712                             logon_info = pac_buffer.info.info
2713
2714                             logon_info.info3.base.rid = new_rid
2715                     elif pac_buffer.type == krb5pac.PAC_TYPE_REQUESTER_SID:
2716                         if remove_requester_sid:
2717                             pac.num_buffers -= 1
2718                             pac_buffers.remove(pac_buffer)
2719                         elif new_rid is not None and can_modify_requester_sid:
2720                             requester_sid = pac_buffer.info
2721
2722                             samdb = self.get_samdb()
2723                             domain_sid = samdb.get_domain_sid()
2724
2725                             new_sid = f'{domain_sid}-{new_rid}'
2726
2727                             requester_sid.sid = security.dom_sid(new_sid)
2728                     elif pac_buffer.type == krb5pac.PAC_TYPE_ATTRIBUTES_INFO:
2729                         if remove_pac_attrs:
2730                             pac.num_buffers -= 1
2731                             pac_buffers.remove(pac_buffer)
2732
2733                 pac.buffers = pac_buffers
2734
2735                 return pac
2736         else:
2737             change_sid_fn = None
2738
2739         krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds,
2740                                                          etype)
2741
2742         if remove_pac:
2743             checksum_keys = None
2744         else:
2745             if etype == cksum_etype:
2746                 cksum_key = krbtgt_key
2747             else:
2748                 cksum_key = self.TicketDecryptionKey_from_creds(krbtgt_creds,
2749                                                                 cksum_etype)
2750             checksum_keys = {
2751                 krb5pac.PAC_TYPE_KDC_CHECKSUM: cksum_key
2752             }
2753
2754         if renewable:
2755             flags_modify_fn = self._modify_renewable
2756         elif invalid:
2757             flags_modify_fn = self._modify_invalid
2758         else:
2759             flags_modify_fn = None
2760
2761         if cname is not None or crealm is not None:
2762             def modify_fn(enc_part):
2763                 if flags_modify_fn is not None:
2764                     enc_part = flags_modify_fn(enc_part)
2765
2766                 if cname is not None:
2767                     enc_part['cname'] = cname
2768
2769                 if crealm is not None:
2770                     enc_part['crealm'] = crealm
2771
2772                 return enc_part
2773         else:
2774             modify_fn = flags_modify_fn
2775
2776         if cname is not None:
2777             def modify_pac_fn(pac):
2778                 if change_sid_fn is not None:
2779                     pac = change_sid_fn(pac)
2780
2781                 for pac_buffer in pac.buffers:
2782                     if pac_buffer.type == krb5pac.PAC_TYPE_LOGON_NAME:
2783                         logon_info = pac_buffer.info
2784
2785                         logon_info.account_name = (
2786                             cname['name-string'][0].decode('utf-8'))
2787
2788                 return pac
2789         else:
2790             modify_pac_fn = change_sid_fn
2791
2792         return self.modified_ticket(
2793             tgt,
2794             new_ticket_key=krbtgt_key,
2795             modify_fn=modify_fn,
2796             modify_pac_fn=modify_pac_fn,
2797             exclude_pac=remove_pac,
2798             allow_empty_authdata=allow_empty_authdata,
2799             update_pac_checksums=not remove_pac,
2800             checksum_keys=checksum_keys)
2801
2802     def _remove_rodc_partial_secrets(self):
2803         samdb = self.get_samdb()
2804
2805         rodc_ctx = self.get_mock_rodc_ctx()
2806         rodc_dn = ldb.Dn(samdb, rodc_ctx.acct_dn)
2807
2808         def add_rodc_partial_secrets():
2809             msg = ldb.Message()
2810             msg.dn = rodc_dn
2811             msg['userAccountControl'] = ldb.MessageElement(
2812                 str(rodc_ctx.userAccountControl),
2813                 ldb.FLAG_MOD_REPLACE,
2814                 'userAccountControl')
2815             samdb.modify(msg)
2816
2817         self.addCleanup(add_rodc_partial_secrets)
2818
2819         uac = rodc_ctx.userAccountControl & ~dsdb.UF_PARTIAL_SECRETS_ACCOUNT
2820
2821         msg = ldb.Message()
2822         msg.dn = rodc_dn
2823         msg['userAccountControl'] = ldb.MessageElement(
2824             str(uac),
2825             ldb.FLAG_MOD_REPLACE,
2826             'userAccountControl')
2827         samdb.modify(msg)
2828
2829     def _remove_rodc_krbtgt_link(self):
2830         samdb = self.get_samdb()
2831
2832         rodc_ctx = self.get_mock_rodc_ctx()
2833         rodc_dn = ldb.Dn(samdb, rodc_ctx.acct_dn)
2834
2835         def add_rodc_krbtgt_link():
2836             msg = ldb.Message()
2837             msg.dn = rodc_dn
2838             msg['msDS-KrbTgtLink'] = ldb.MessageElement(
2839                 rodc_ctx.new_krbtgt_dn,
2840                 ldb.FLAG_MOD_ADD,
2841                 'msDS-KrbTgtLink')
2842             samdb.modify(msg)
2843
2844         self.addCleanup(add_rodc_krbtgt_link)
2845
2846         msg = ldb.Message()
2847         msg.dn = rodc_dn
2848         msg['msDS-KrbTgtLink'] = ldb.MessageElement(
2849             [],
2850             ldb.FLAG_MOD_DELETE,
2851             'msDS-KrbTgtLink')
2852         samdb.modify(msg)
2853
2854     def _get_creds(self,
2855                    replication_allowed=False,
2856                    replication_denied=False,
2857                    revealed_to_rodc=False):
2858         return self.get_cached_creds(
2859             account_type=self.AccountType.COMPUTER,
2860             opts={
2861                 'allowed_replication_mock': replication_allowed,
2862                 'denied_replication_mock': replication_denied,
2863                 'revealed_to_mock_rodc': revealed_to_rodc,
2864                 'id': 0
2865             })
2866
2867     def _get_existing_rid(self,
2868                           replication_allowed=False,
2869                           replication_denied=False,
2870                           revealed_to_rodc=False):
2871         other_creds = self.get_cached_creds(
2872             account_type=self.AccountType.COMPUTER,
2873             opts={
2874                 'allowed_replication_mock': replication_allowed,
2875                 'denied_replication_mock': replication_denied,
2876                 'revealed_to_mock_rodc': revealed_to_rodc,
2877                 'id': 1
2878             })
2879
2880         samdb = self.get_samdb()
2881
2882         other_dn = other_creds.get_dn()
2883         other_sid = self.get_objectSid(samdb, other_dn)
2884
2885         other_rid = int(other_sid.rsplit('-', 1)[1])
2886
2887         return other_rid
2888
2889     def _get_mach_creds(self):
2890         return self.get_cached_creds(
2891             account_type=self.AccountType.COMPUTER,
2892             opts={
2893                 'allowed_replication_mock': True,
2894                 'denied_replication_mock': False,
2895                 'revealed_to_mock_rodc': True,
2896                 'id': 2
2897             })
2898
2899     def _get_non_existent_rid(self):
2900         return (1 << 30) - 1
2901
2902     def _run_tgs(self, tgt, expected_error, expect_pac=True,
2903                  expect_pac_attrs=None, expect_pac_attrs_pac_request=None,
2904                  expect_requester_sid=None, expected_sid=None,
2905                  expect_edata=False, expected_status=None):
2906         target_creds = self.get_service_creds()
2907         return self._tgs_req(
2908             tgt, expected_error, target_creds,
2909             expect_pac=expect_pac,
2910             expect_pac_attrs=expect_pac_attrs,
2911             expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
2912             expect_requester_sid=expect_requester_sid,
2913             expected_sid=expected_sid,
2914             expect_edata=expect_edata,
2915             expected_status=expected_status)
2916
2917     # These tests fail against Windows, which does not implement ticket
2918     # renewal.
2919     def _renew_tgt(self, tgt, expected_error, expect_pac=True,
2920                    expect_pac_attrs=None, expect_pac_attrs_pac_request=None,
2921                    expect_requester_sid=None, expected_sid=None):
2922         krbtgt_creds = self.get_krbtgt_creds()
2923         kdc_options = str(krb5_asn1.KDCOptions('renew'))
2924         return self._tgs_req(
2925             tgt, expected_error, krbtgt_creds,
2926             kdc_options=kdc_options,
2927             expect_pac=expect_pac,
2928             expect_pac_attrs=expect_pac_attrs,
2929             expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
2930             expect_requester_sid=expect_requester_sid,
2931             expected_sid=expected_sid)
2932
2933     # These tests fail against Windows, which does not implement ticket
2934     # validation.
2935     def _validate_tgt(self, tgt, expected_error, expect_pac=True,
2936                       expect_pac_attrs=None,
2937                       expect_pac_attrs_pac_request=None,
2938                       expect_requester_sid=None,
2939                       expected_sid=None):
2940         krbtgt_creds = self.get_krbtgt_creds()
2941         kdc_options = str(krb5_asn1.KDCOptions('validate'))
2942         return self._tgs_req(
2943             tgt, expected_error, krbtgt_creds,
2944             kdc_options=kdc_options,
2945             expect_pac=expect_pac,
2946             expect_pac_attrs=expect_pac_attrs,
2947             expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
2948             expect_requester_sid=expect_requester_sid,
2949             expected_sid=expected_sid)
2950
2951     def _s4u2self(self, tgt, tgt_creds, expected_error, expect_pac=True,
2952                   expect_edata=False, expected_status=None):
2953         user_creds = self._get_mach_creds()
2954
2955         user_name = user_creds.get_username()
2956         user_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
2957                                                names=[user_name])
2958         user_realm = user_creds.get_realm()
2959
2960         def generate_s4u2self_padata(_kdc_exchange_dict,
2961                                      _callback_dict,
2962                                      req_body):
2963             padata = self.PA_S4U2Self_create(
2964                 name=user_cname,
2965                 realm=user_realm,
2966                 tgt_session_key=tgt.session_key,
2967                 ctype=None)
2968
2969             return [padata], req_body
2970
2971         return self._tgs_req(tgt, expected_error, tgt_creds,
2972                              expected_cname=user_cname,
2973                              generate_padata_fn=generate_s4u2self_padata,
2974                              expect_claims=False, expect_edata=expect_edata,
2975                              expected_status=expected_status,
2976                              expect_pac=expect_pac)
2977
2978     def _user2user(self, tgt, tgt_creds, expected_error, sname=None,
2979                    srealm=None, user_tgt=None, expect_pac=True,
2980                    expected_status=None):
2981         if user_tgt is None:
2982             user_creds = self._get_mach_creds()
2983             user_tgt = self.get_tgt(user_creds)
2984
2985         kdc_options = str(krb5_asn1.KDCOptions('enc-tkt-in-skey'))
2986         return self._tgs_req(user_tgt, expected_error, tgt_creds,
2987                              kdc_options=kdc_options,
2988                              additional_ticket=tgt,
2989                              sname=sname,
2990                              srealm=srealm,
2991                              expect_pac=expect_pac,
2992                              expected_status=expected_status)
2993
2994     def _fast(self, armor_tgt, armor_tgt_creds, expected_error,
2995               expected_sname=None, expect_pac=True, expect_edata=False):
2996         user_creds = self._get_mach_creds()
2997         user_tgt = self.get_tgt(user_creds)
2998
2999         target_creds = self.get_service_creds()
3000
3001         return self._tgs_req(user_tgt, expected_error, target_creds,
3002                              armor_tgt=armor_tgt,
3003                              expected_sname=expected_sname,
3004                              expect_pac=expect_pac,
3005                              expect_edata=expect_edata)
3006
3007
if __name__ == "__main__":
    import unittest

    # Both module-level flags are switched off before the tests run.
    # NOTE(review): presumably these are debug-output toggles; nothing in
    # the visible part of this file reads them -- confirm where they are
    # consumed.
    global_asn1_print = global_hexdump = False

    # Run the tests defined in this module when executed as a script.
    unittest.main()