CVE-2022-37967 Add new PAC checksum
[abartlet/samba-autobuild/.git] / python / samba / tests / krb5 / kdc_tgs_tests.py
1 #!/usr/bin/env python3
2 # Unix SMB/CIFS implementation.
3 # Copyright (C) Stefan Metzmacher 2020
4 # Copyright (C) 2020 Catalyst.Net Ltd
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 #
19
20 import sys
21 import os
22
23 sys.path.insert(0, "bin/python")
24 os.environ["PYTHONUNBUFFERED"] = "1"
25
26 import ldb
27
28 from samba import dsdb
29
30 from samba.dcerpc import krb5pac, security
31
32
33 import samba.tests.krb5.kcrypto as kcrypto
34 from samba.tests.krb5.kdc_base_test import KDCBaseTest
35 from samba.tests.krb5.raw_testcase import Krb5EncryptionKey
36 from samba.tests.krb5.rfc4120_constants import (
37     AES256_CTS_HMAC_SHA1_96,
38     ARCFOUR_HMAC_MD5,
39     KRB_ERROR,
40     KRB_TGS_REP,
41     KDC_ERR_BADMATCH,
42     KDC_ERR_GENERIC,
43     KDC_ERR_MODIFIED,
44     KDC_ERR_NOT_US,
45     KDC_ERR_POLICY,
46     KDC_ERR_PREAUTH_REQUIRED,
47     KDC_ERR_C_PRINCIPAL_UNKNOWN,
48     KDC_ERR_S_PRINCIPAL_UNKNOWN,
49     KDC_ERR_TGT_REVOKED,
50     KRB_ERR_TKT_NYV,
51     KDC_ERR_WRONG_REALM,
52     NT_ENTERPRISE_PRINCIPAL,
53     NT_PRINCIPAL,
54     NT_SRV_INST,
55 )
56 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
57
58 global_asn1_print = False
59 global_hexdump = False
60
61
62 class KdcTgsBaseTests(KDCBaseTest):
63     def _as_req(self,
64                 creds,
65                 expected_error,
66                 target_creds,
67                 etype):
68         user_name = creds.get_username()
69         cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
70                                           names=user_name.split('/'))
71
72         target_name = target_creds.get_username()
73         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
74                                           names=['host', target_name[:-1]])
75
76         if expected_error:
77             expected_sname = sname
78         else:
79             expected_sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
80                                                        names=[target_name])
81
82         realm = creds.get_realm()
83         salt = creds.get_salt()
84
85         till = self.get_KerberosTime(offset=36000)
86
87         ticket_decryption_key = (
88             self.TicketDecryptionKey_from_creds(target_creds))
89         expected_etypes = target_creds.tgs_supported_enctypes
90
91         kdc_options = ('forwardable,'
92                        'renewable,'
93                        'canonicalize,'
94                        'renewable-ok')
95         kdc_options = krb5_asn1.KDCOptions(kdc_options)
96
97         if expected_error:
98             initial_error = (KDC_ERR_PREAUTH_REQUIRED, expected_error)
99         else:
100             initial_error = KDC_ERR_PREAUTH_REQUIRED
101
102         rep, kdc_exchange_dict = self._test_as_exchange(
103             cname=cname,
104             realm=realm,
105             sname=sname,
106             till=till,
107             client_as_etypes=etype,
108             expected_error_mode=initial_error,
109             expected_crealm=realm,
110             expected_cname=cname,
111             expected_srealm=realm,
112             expected_sname=sname,
113             expected_salt=salt,
114             expected_supported_etypes=expected_etypes,
115             etypes=etype,
116             padata=None,
117             kdc_options=kdc_options,
118             preauth_key=None,
119             ticket_decryption_key=ticket_decryption_key)
120         self.assertIsNotNone(rep)
121         self.assertEqual(KRB_ERROR, rep['msg-type'])
122         error_code = rep['error-code']
123         if expected_error:
124             self.assertIn(error_code, initial_error)
125             if error_code == expected_error:
126                 return
127         else:
128             self.assertEqual(initial_error, error_code)
129
130         etype_info2 = kdc_exchange_dict['preauth_etype_info2']
131
132         preauth_key = self.PasswordKey_from_etype_info2(creds,
133                                                         etype_info2[0],
134                                                         creds.get_kvno())
135
136         ts_enc_padata = self.get_enc_timestamp_pa_data_from_key(preauth_key)
137
138         padata = [ts_enc_padata]
139
140         expected_realm = realm.upper()
141
142         rep, kdc_exchange_dict = self._test_as_exchange(
143             cname=cname,
144             realm=realm,
145             sname=sname,
146             till=till,
147             client_as_etypes=etype,
148             expected_error_mode=expected_error,
149             expected_crealm=expected_realm,
150             expected_cname=cname,
151             expected_srealm=expected_realm,
152             expected_sname=expected_sname,
153             expected_salt=salt,
154             expected_supported_etypes=expected_etypes,
155             etypes=etype,
156             padata=padata,
157             kdc_options=kdc_options,
158             preauth_key=preauth_key,
159             ticket_decryption_key=ticket_decryption_key,
160             expect_edata=False)
161         if expected_error:
162             self.check_error_rep(rep, expected_error)
163             return None
164
165         self.check_as_reply(rep)
166         return kdc_exchange_dict['rep_ticket_creds']
167
168     def _tgs_req(self, tgt, expected_error, target_creds,
169                  armor_tgt=None,
170                  kdc_options='0',
171                  expected_cname=None,
172                  expected_sname=None,
173                  additional_ticket=None,
174                  generate_padata_fn=None,
175                  sname=None,
176                  srealm=None,
177                  use_fast=False,
178                  till=None,
179                  etypes=None,
180                  expect_pac=True,
181                  expect_pac_attrs=None,
182                  expect_pac_attrs_pac_request=None,
183                  expect_requester_sid=None,
184                  expect_edata=False,
185                  expected_sid=None,
186                  expected_status=None):
187         if srealm is False:
188             srealm = None
189         elif srealm is None:
190             srealm = target_creds.get_realm()
191
192         if sname is False:
193             sname = None
194             if expected_sname is None:
195                 expected_sname = self.get_krbtgt_sname()
196         else:
197             if sname is None:
198                 target_name = target_creds.get_username()
199                 if target_name == 'krbtgt':
200                     sname = self.PrincipalName_create(
201                         name_type=NT_SRV_INST,
202                         names=[target_name, srealm])
203                 else:
204                     if target_name[-1] == '$':
205                         target_name = target_name[:-1]
206                     sname = self.PrincipalName_create(
207                         name_type=NT_PRINCIPAL,
208                         names=['host', target_name])
209
210             if expected_sname is None:
211                 expected_sname = sname
212
213         if additional_ticket is not None:
214             additional_tickets = [additional_ticket.ticket]
215             decryption_key = additional_ticket.session_key
216         else:
217             additional_tickets = None
218             decryption_key = self.TicketDecryptionKey_from_creds(
219                 target_creds)
220
221         subkey = self.RandomKey(tgt.session_key.etype)
222
223         if armor_tgt is not None:
224             armor_subkey = self.RandomKey(subkey.etype)
225             explicit_armor_key = self.generate_armor_key(armor_subkey,
226                                                          armor_tgt.session_key)
227             armor_key = kcrypto.cf2(explicit_armor_key.key,
228                                     subkey.key,
229                                     b'explicitarmor',
230                                     b'tgsarmor')
231             armor_key = Krb5EncryptionKey(armor_key, None)
232
233             generate_fast_fn = self.generate_simple_fast
234             generate_fast_armor_fn = self.generate_ap_req
235
236             pac_options = '1'  # claims support
237         else:
238             armor_subkey = None
239             armor_key = None
240             generate_fast_fn = None
241             generate_fast_armor_fn = None
242
243             pac_options = None
244
245         if etypes is None:
246             etypes = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
247
248         if expected_error:
249             check_error_fn = self.generic_check_kdc_error
250             check_rep_fn = None
251         else:
252             check_error_fn = None
253             check_rep_fn = self.generic_check_kdc_rep
254
255         if expected_cname is None:
256             expected_cname = tgt.cname
257
258         kdc_exchange_dict = self.tgs_exchange_dict(
259             expected_crealm=tgt.crealm,
260             expected_cname=expected_cname,
261             expected_srealm=srealm,
262             expected_sname=expected_sname,
263             ticket_decryption_key=decryption_key,
264             generate_padata_fn=generate_padata_fn,
265             generate_fast_fn=generate_fast_fn,
266             generate_fast_armor_fn=generate_fast_armor_fn,
267             check_error_fn=check_error_fn,
268             check_rep_fn=check_rep_fn,
269             check_kdc_private_fn=self.generic_check_kdc_private,
270             expected_error_mode=expected_error,
271             expected_status=expected_status,
272             tgt=tgt,
273             armor_key=armor_key,
274             armor_tgt=armor_tgt,
275             armor_subkey=armor_subkey,
276             pac_options=pac_options,
277             authenticator_subkey=subkey,
278             kdc_options=kdc_options,
279             expect_edata=expect_edata,
280             expect_pac=expect_pac,
281             expect_pac_attrs=expect_pac_attrs,
282             expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
283             expect_requester_sid=expect_requester_sid,
284             expected_sid=expected_sid)
285
286         rep = self._generic_kdc_exchange(kdc_exchange_dict,
287                                          cname=None,
288                                          realm=srealm,
289                                          sname=sname,
290                                          till_time=till,
291                                          etypes=etypes,
292                                          additional_tickets=additional_tickets)
293         if expected_error:
294             self.check_error_rep(rep, expected_error)
295             return None
296         else:
297             self.check_reply(rep, KRB_TGS_REP)
298             return kdc_exchange_dict['rep_ticket_creds']
299
300
301 class KdcTgsTests(KdcTgsBaseTests):
302
303     def setUp(self):
304         super().setUp()
305         self.do_asn1_print = global_asn1_print
306         self.do_hexdump = global_hexdump
307
308     def test_tgs_req_cname_does_not_not_match_authenticator_cname(self):
309         ''' Try and obtain a ticket from the TGS, but supply a cname
310             that differs from that provided to the krbtgt
311         '''
312         # Create the user account
313         samdb = self.get_samdb()
314         user_name = "tsttktusr"
315         (uc, _) = self.create_account(samdb, user_name)
316         realm = uc.get_realm().lower()
317
318         # Do the initial AS-REQ, should get a pre-authentication required
319         # response
320         etype = (AES256_CTS_HMAC_SHA1_96,)
321         cname = self.PrincipalName_create(
322             name_type=NT_PRINCIPAL, names=[user_name])
323         sname = self.PrincipalName_create(
324             name_type=NT_SRV_INST, names=["krbtgt", realm])
325
326         rep = self.as_req(cname, sname, realm, etype)
327         self.check_pre_authentication(rep)
328
329         # Do the next AS-REQ
330         padata = self.get_enc_timestamp_pa_data(uc, rep)
331         key = self.get_as_rep_key(uc, rep)
332         rep = self.as_req(cname, sname, realm, etype, padata=[padata])
333         self.check_as_reply(rep)
334
335         # Request a service ticket, but use a cname that does not match
336         # that in the original AS-REQ
337         enc_part2 = self.get_as_rep_enc_data(key, rep)
338         key = self.EncryptionKey_import(enc_part2['key'])
339         ticket = rep['ticket']
340
341         cname = self.PrincipalName_create(
342             name_type=NT_PRINCIPAL,
343             names=["Administrator"])
344         sname = self.PrincipalName_create(
345             name_type=NT_PRINCIPAL,
346             names=["host", samdb.host_dns_name()])
347
348         (rep, enc_part) = self.tgs_req(cname, sname, realm, ticket, key, etype,
349                                        expected_error_mode=KDC_ERR_BADMATCH,
350                                        expect_edata=False)
351
352         self.assertIsNone(
353             enc_part,
354             "rep = {%s}, enc_part = {%s}" % (rep, enc_part))
355         self.assertEqual(KRB_ERROR, rep['msg-type'], "rep = {%s}" % rep)
356         self.assertEqual(
357             KDC_ERR_BADMATCH,
358             rep['error-code'],
359             "rep = {%s}" % rep)
360
361     def test_ldap_service_ticket(self):
362         '''Get a ticket to the ldap service
363         '''
364         # Create the user account
365         samdb = self.get_samdb()
366         user_name = "tsttktusr"
367         (uc, _) = self.create_account(samdb, user_name)
368         realm = uc.get_realm().lower()
369
370         # Do the initial AS-REQ, should get a pre-authentication required
371         # response
372         etype = (AES256_CTS_HMAC_SHA1_96,)
373         cname = self.PrincipalName_create(
374             name_type=NT_PRINCIPAL, names=[user_name])
375         sname = self.PrincipalName_create(
376             name_type=NT_SRV_INST, names=["krbtgt", realm])
377
378         rep = self.as_req(cname, sname, realm, etype)
379         self.check_pre_authentication(rep)
380
381         # Do the next AS-REQ
382         padata = self.get_enc_timestamp_pa_data(uc, rep)
383         key = self.get_as_rep_key(uc, rep)
384         rep = self.as_req(cname, sname, realm, etype, padata=[padata])
385         self.check_as_reply(rep)
386
387         enc_part2 = self.get_as_rep_enc_data(key, rep)
388         key = self.EncryptionKey_import(enc_part2['key'])
389         ticket = rep['ticket']
390
391         # Request a ticket to the ldap service
392         sname = self.PrincipalName_create(
393             name_type=NT_SRV_INST,
394             names=["ldap", samdb.host_dns_name()])
395
396         (rep, _) = self.tgs_req(
397             cname, sname, uc.get_realm(), ticket, key, etype,
398             service_creds=self.get_dc_creds())
399
400         self.check_tgs_reply(rep)
401
402     def test_get_ticket_for_host_service_of_machine_account(self):
403
404         # Create a user and machine account for the test.
405         #
406         samdb = self.get_samdb()
407         user_name = "tsttktusr"
408         (uc, dn) = self.create_account(samdb, user_name)
409         (mc, _) = self.create_account(samdb, "tsttktmac",
410                                       account_type=self.AccountType.COMPUTER)
411         realm = uc.get_realm().lower()
412
413         # Do the initial AS-REQ, should get a pre-authentication required
414         # response
415         etype = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
416         cname = self.PrincipalName_create(
417             name_type=NT_PRINCIPAL, names=[user_name])
418         sname = self.PrincipalName_create(
419             name_type=NT_SRV_INST, names=["krbtgt", realm])
420
421         rep = self.as_req(cname, sname, realm, etype)
422         self.check_pre_authentication(rep)
423
424         # Do the next AS-REQ
425         padata = self.get_enc_timestamp_pa_data(uc, rep)
426         key = self.get_as_rep_key(uc, rep)
427         rep = self.as_req(cname, sname, realm, etype, padata=[padata])
428         self.check_as_reply(rep)
429
430         # Request a ticket to the host service on the machine account
431         ticket = rep['ticket']
432         enc_part2 = self.get_as_rep_enc_data(key, rep)
433         key = self.EncryptionKey_import(enc_part2['key'])
434         cname = self.PrincipalName_create(
435             name_type=NT_PRINCIPAL,
436             names=[user_name])
437         sname = self.PrincipalName_create(
438             name_type=NT_PRINCIPAL,
439             names=[mc.get_username()])
440
441         (rep, enc_part) = self.tgs_req(
442             cname, sname, uc.get_realm(), ticket, key, etype,
443             service_creds=mc)
444         self.check_tgs_reply(rep)
445
446         # Check the contents of the service ticket
447         ticket = rep['ticket']
448         enc_part = self.decode_service_ticket(mc, ticket)
449
450         pac_data = self.get_pac_data(enc_part['authorization-data'])
451         sid = self.get_objectSid(samdb, dn)
452         upn = "%s@%s" % (uc.get_username(), realm)
453         self.assertEqual(
454             uc.get_username(),
455             str(pac_data.account_name),
456             "rep = {%s},%s" % (rep, pac_data))
457         self.assertEqual(
458             uc.get_username(),
459             pac_data.logon_name,
460             "rep = {%s},%s" % (rep, pac_data))
461         self.assertEqual(
462             uc.get_realm(),
463             pac_data.domain_name,
464             "rep = {%s},%s" % (rep, pac_data))
465         self.assertEqual(
466             upn,
467             pac_data.upn,
468             "rep = {%s},%s" % (rep, pac_data))
469         self.assertEqual(
470             sid,
471             pac_data.account_sid,
472             "rep = {%s},%s" % (rep, pac_data))
473
474     def test_request(self):
475         client_creds = self.get_client_creds()
476         service_creds = self.get_service_creds()
477
478         tgt = self.get_tgt(client_creds)
479
480         pac = self.get_ticket_pac(tgt)
481         self.assertIsNotNone(pac)
482
483         ticket = self._make_tgs_request(client_creds, service_creds, tgt)
484
485         pac = self.get_ticket_pac(ticket)
486         self.assertIsNotNone(pac)
487
488     def test_request_no_pac(self):
489         client_creds = self.get_client_creds()
490         service_creds = self.get_service_creds()
491
492         tgt = self.get_tgt(client_creds, pac_request=False)
493
494         pac = self.get_ticket_pac(tgt)
495         self.assertIsNotNone(pac)
496
497         ticket = self._make_tgs_request(client_creds, service_creds, tgt,
498                                         pac_request=False, expect_pac=False)
499
500         pac = self.get_ticket_pac(ticket, expect_pac=False)
501         self.assertIsNone(pac)
502
503     def test_request_enterprise_canon(self):
504         upn = self.get_new_username()
505         client_creds = self.get_cached_creds(
506             account_type=self.AccountType.USER,
507             opts={'upn': upn})
508         service_creds = self.get_service_creds()
509
510         user_name = client_creds.get_username()
511         realm = client_creds.get_realm()
512         client_account = f'{user_name}@{realm}'
513
514         expected_cname = self.PrincipalName_create(
515             name_type=NT_PRINCIPAL,
516             names=[user_name])
517
518         kdc_options = 'canonicalize'
519
520         tgt = self.get_tgt(client_creds,
521                            client_account=client_account,
522                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
523                            expected_cname=expected_cname,
524                            expected_account_name=user_name,
525                            kdc_options=kdc_options)
526
527         self._make_tgs_request(
528             client_creds, service_creds, tgt,
529             client_account=client_account,
530             client_name_type=NT_ENTERPRISE_PRINCIPAL,
531             expected_cname=expected_cname,
532             expected_account_name=user_name,
533             kdc_options=kdc_options)
534
535     def test_request_enterprise_canon_case(self):
536         upn = self.get_new_username()
537         client_creds = self.get_cached_creds(
538             account_type=self.AccountType.USER,
539             opts={'upn': upn})
540         service_creds = self.get_service_creds()
541
542         user_name = client_creds.get_username()
543         realm = client_creds.get_realm().lower()
544         client_account = f'{user_name}@{realm}'
545
546         expected_cname = self.PrincipalName_create(
547             name_type=NT_PRINCIPAL,
548             names=[user_name])
549
550         kdc_options = 'canonicalize'
551
552         tgt = self.get_tgt(client_creds,
553                            client_account=client_account,
554                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
555                            expected_cname=expected_cname,
556                            expected_account_name=user_name,
557                            kdc_options=kdc_options)
558
559         self._make_tgs_request(
560             client_creds, service_creds, tgt,
561             client_account=client_account,
562             client_name_type=NT_ENTERPRISE_PRINCIPAL,
563             expected_cname=expected_cname,
564             expected_account_name=user_name,
565             kdc_options=kdc_options)
566
567     def test_request_enterprise_canon_mac(self):
568         upn = self.get_new_username()
569         client_creds = self.get_cached_creds(
570             account_type=self.AccountType.COMPUTER,
571             opts={'upn': upn})
572         service_creds = self.get_service_creds()
573
574         user_name = client_creds.get_username()
575         realm = client_creds.get_realm()
576         client_account = f'{user_name}@{realm}'
577
578         expected_cname = self.PrincipalName_create(
579             name_type=NT_PRINCIPAL,
580             names=[user_name])
581
582         kdc_options = 'canonicalize'
583
584         tgt = self.get_tgt(client_creds,
585                            client_account=client_account,
586                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
587                            expected_cname=expected_cname,
588                            expected_account_name=user_name,
589                            kdc_options=kdc_options)
590
591         self._make_tgs_request(
592             client_creds, service_creds, tgt,
593             client_account=client_account,
594             client_name_type=NT_ENTERPRISE_PRINCIPAL,
595             expected_cname=expected_cname,
596             expected_account_name=user_name,
597             kdc_options=kdc_options)
598
599     def test_request_enterprise_canon_case_mac(self):
600         upn = self.get_new_username()
601         client_creds = self.get_cached_creds(
602             account_type=self.AccountType.COMPUTER,
603             opts={'upn': upn})
604         service_creds = self.get_service_creds()
605
606         user_name = client_creds.get_username()
607         realm = client_creds.get_realm().lower()
608         client_account = f'{user_name}@{realm}'
609
610         expected_cname = self.PrincipalName_create(
611             name_type=NT_PRINCIPAL,
612             names=[user_name])
613
614         kdc_options = 'canonicalize'
615
616         tgt = self.get_tgt(client_creds,
617                            client_account=client_account,
618                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
619                            expected_cname=expected_cname,
620                            expected_account_name=user_name,
621                            kdc_options=kdc_options)
622
623         self._make_tgs_request(
624             client_creds, service_creds, tgt,
625             client_account=client_account,
626             client_name_type=NT_ENTERPRISE_PRINCIPAL,
627             expected_cname=expected_cname,
628             expected_account_name=user_name,
629             kdc_options=kdc_options)
630
631     def test_request_enterprise_no_canon(self):
632         upn = self.get_new_username()
633         client_creds = self.get_cached_creds(
634             account_type=self.AccountType.USER,
635             opts={'upn': upn})
636         service_creds = self.get_service_creds()
637
638         user_name = client_creds.get_username()
639         realm = client_creds.get_realm()
640         client_account = f'{user_name}@{realm}'
641
642         kdc_options = '0'
643
644         tgt = self.get_tgt(client_creds,
645                            client_account=client_account,
646                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
647                            expected_account_name=user_name,
648                            kdc_options=kdc_options)
649
650         self._make_tgs_request(
651             client_creds, service_creds, tgt,
652             client_account=client_account,
653             client_name_type=NT_ENTERPRISE_PRINCIPAL,
654             expected_account_name=user_name,
655             kdc_options=kdc_options)
656
657     def test_request_enterprise_no_canon_case(self):
658         upn = self.get_new_username()
659         client_creds = self.get_cached_creds(
660             account_type=self.AccountType.USER,
661             opts={'upn': upn})
662         service_creds = self.get_service_creds()
663
664         user_name = client_creds.get_username()
665         realm = client_creds.get_realm().lower()
666         client_account = f'{user_name}@{realm}'
667
668         kdc_options = '0'
669
670         tgt = self.get_tgt(client_creds,
671                            client_account=client_account,
672                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
673                            expected_account_name=user_name,
674                            kdc_options=kdc_options)
675
676         self._make_tgs_request(
677             client_creds, service_creds, tgt,
678             client_account=client_account,
679             client_name_type=NT_ENTERPRISE_PRINCIPAL,
680             expected_account_name=user_name,
681             kdc_options=kdc_options)
682
683     def test_request_enterprise_no_canon_mac(self):
684         upn = self.get_new_username()
685         client_creds = self.get_cached_creds(
686             account_type=self.AccountType.COMPUTER,
687             opts={'upn': upn})
688         service_creds = self.get_service_creds()
689
690         user_name = client_creds.get_username()
691         realm = client_creds.get_realm()
692         client_account = f'{user_name}@{realm}'
693
694         kdc_options = '0'
695
696         tgt = self.get_tgt(client_creds,
697                            client_account=client_account,
698                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
699                            expected_account_name=user_name,
700                            kdc_options=kdc_options)
701
702         self._make_tgs_request(
703             client_creds, service_creds, tgt,
704             client_account=client_account,
705             client_name_type=NT_ENTERPRISE_PRINCIPAL,
706             expected_account_name=user_name,
707             kdc_options=kdc_options)
708
709     def test_request_enterprise_no_canon_case_mac(self):
710         upn = self.get_new_username()
711         client_creds = self.get_cached_creds(
712             account_type=self.AccountType.COMPUTER,
713             opts={'upn': upn})
714         service_creds = self.get_service_creds()
715
716         user_name = client_creds.get_username()
717         realm = client_creds.get_realm().lower()
718         client_account = f'{user_name}@{realm}'
719
720         kdc_options = '0'
721
722         tgt = self.get_tgt(client_creds,
723                            client_account=client_account,
724                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
725                            expected_account_name=user_name,
726                            kdc_options=kdc_options)
727
728         self._make_tgs_request(
729             client_creds, service_creds, tgt,
730             client_account=client_account,
731             client_name_type=NT_ENTERPRISE_PRINCIPAL,
732             expected_account_name=user_name,
733             kdc_options=kdc_options)
734
735     def test_client_no_auth_data_required(self):
736         client_creds = self.get_cached_creds(
737             account_type=self.AccountType.USER,
738             opts={'no_auth_data_required': True})
739         service_creds = self.get_service_creds()
740
741         tgt = self.get_tgt(client_creds)
742
743         pac = self.get_ticket_pac(tgt)
744         self.assertIsNotNone(pac)
745
746         ticket = self._make_tgs_request(client_creds, service_creds, tgt)
747
748         pac = self.get_ticket_pac(ticket)
749         self.assertIsNotNone(pac)
750
751     def test_no_pac_client_no_auth_data_required(self):
752         client_creds = self.get_cached_creds(
753             account_type=self.AccountType.USER,
754             opts={'no_auth_data_required': True})
755         service_creds = self.get_service_creds()
756
757         tgt = self.get_tgt(client_creds)
758
759         pac = self.get_ticket_pac(tgt)
760         self.assertIsNotNone(pac)
761
762         ticket = self._make_tgs_request(client_creds, service_creds, tgt,
763                                         pac_request=False, expect_pac=True)
764
765         pac = self.get_ticket_pac(ticket)
766         self.assertIsNotNone(pac)
767
768     def test_service_no_auth_data_required(self):
769         client_creds = self.get_client_creds()
770         service_creds = self.get_cached_creds(
771             account_type=self.AccountType.COMPUTER,
772             opts={'no_auth_data_required': True})
773
774         tgt = self.get_tgt(client_creds)
775
776         pac = self.get_ticket_pac(tgt)
777         self.assertIsNotNone(pac)
778
779         ticket = self._make_tgs_request(client_creds, service_creds, tgt,
780                                         expect_pac=False)
781
782         pac = self.get_ticket_pac(ticket, expect_pac=False)
783         self.assertIsNone(pac)
784
785     def test_no_pac_service_no_auth_data_required(self):
786         client_creds = self.get_client_creds()
787         service_creds = self.get_cached_creds(
788             account_type=self.AccountType.COMPUTER,
789             opts={'no_auth_data_required': True})
790
791         tgt = self.get_tgt(client_creds, pac_request=False)
792
793         pac = self.get_ticket_pac(tgt)
794         self.assertIsNotNone(pac)
795
796         ticket = self._make_tgs_request(client_creds, service_creds, tgt,
797                                         pac_request=False, expect_pac=False)
798
799         pac = self.get_ticket_pac(ticket, expect_pac=False)
800         self.assertIsNone(pac)
801
802     def test_remove_pac_service_no_auth_data_required(self):
803         client_creds = self.get_client_creds()
804         service_creds = self.get_cached_creds(
805             account_type=self.AccountType.COMPUTER,
806             opts={'no_auth_data_required': True})
807
808         tgt = self.modified_ticket(self.get_tgt(client_creds),
809                                    exclude_pac=True)
810
811         pac = self.get_ticket_pac(tgt, expect_pac=False)
812         self.assertIsNone(pac)
813
814         self._make_tgs_request(client_creds, service_creds, tgt,
815                                expect_error=True)
816
817     def test_remove_pac_client_no_auth_data_required(self):
818         client_creds = self.get_cached_creds(
819             account_type=self.AccountType.USER,
820             opts={'no_auth_data_required': True})
821         service_creds = self.get_service_creds()
822
823         tgt = self.modified_ticket(self.get_tgt(client_creds),
824                                    exclude_pac=True)
825
826         pac = self.get_ticket_pac(tgt, expect_pac=False)
827         self.assertIsNone(pac)
828
829         self._make_tgs_request(client_creds, service_creds, tgt,
830                                expect_error=True)
831
832     def test_remove_pac(self):
833         client_creds = self.get_client_creds()
834         service_creds = self.get_service_creds()
835
836         tgt = self.modified_ticket(self.get_tgt(client_creds),
837                                    exclude_pac=True)
838
839         pac = self.get_ticket_pac(tgt, expect_pac=False)
840         self.assertIsNone(pac)
841
842         self._make_tgs_request(client_creds, service_creds, tgt,
843                                expect_error=True)
844
845     def test_upn_dns_info_ex_user(self):
846         client_creds = self.get_client_creds()
847         self._run_upn_dns_info_ex_test(client_creds)
848
849     def test_upn_dns_info_ex_mac(self):
850         mach_creds = self.get_mach_creds()
851         self._run_upn_dns_info_ex_test(mach_creds)
852
853     def test_upn_dns_info_ex_upn_user(self):
854         client_creds = self.get_cached_creds(
855             account_type=self.AccountType.USER,
856             opts={'upn': 'upn_dns_info_test_upn0@bar'})
857         self._run_upn_dns_info_ex_test(client_creds)
858
859     def test_upn_dns_info_ex_upn_mac(self):
860         mach_creds = self.get_cached_creds(
861             account_type=self.AccountType.COMPUTER,
862             opts={'upn': 'upn_dns_info_test_upn1@bar'})
863         self._run_upn_dns_info_ex_test(mach_creds)
864
865     def _run_upn_dns_info_ex_test(self, client_creds):
866         service_creds = self.get_service_creds()
867
868         samdb = self.get_samdb()
869         dn = client_creds.get_dn()
870
871         account_name = client_creds.get_username()
872         upn_name = client_creds.get_upn()
873         if upn_name is None:
874             realm = client_creds.get_realm().lower()
875             upn_name = f'{account_name}@{realm}'
876         sid = self.get_objectSid(samdb, dn)
877
878         tgt = self.get_tgt(client_creds,
879                            expected_account_name=account_name,
880                            expected_upn_name=upn_name,
881                            expected_sid=sid)
882
883         self._make_tgs_request(client_creds, service_creds, tgt,
884                                expected_account_name=account_name,
885                                expected_upn_name=upn_name,
886                                expected_sid=sid)
887
888     # Test making a TGS request.
889     def test_tgs_req(self):
890         creds = self._get_creds()
891         tgt = self._get_tgt(creds)
892         self._run_tgs(tgt, expected_error=0)
893
894     def test_renew_req(self):
895         creds = self._get_creds()
896         tgt = self._get_tgt(creds, renewable=True)
897         self._renew_tgt(tgt, expected_error=0,
898                         expect_pac_attrs=True,
899                         expect_pac_attrs_pac_request=True,
900                         expect_requester_sid=True)
901
902     def test_validate_req(self):
903         creds = self._get_creds()
904         tgt = self._get_tgt(creds, invalid=True)
905         self._validate_tgt(tgt, expected_error=0,
906                            expect_pac_attrs=True,
907                            expect_pac_attrs_pac_request=True,
908                            expect_requester_sid=True)
909
910     def test_s4u2self_req(self):
911         creds = self._get_creds()
912         tgt = self._get_tgt(creds)
913         self._s4u2self(tgt, creds, expected_error=0)
914
915     def test_user2user_req(self):
916         creds = self._get_creds()
917         tgt = self._get_tgt(creds)
918         self._user2user(tgt, creds, expected_error=0)
919
920     def test_fast_req(self):
921         creds = self._get_creds()
922         tgt = self._get_tgt(creds)
923         self._fast(tgt, creds, expected_error=0)
924
925     def test_tgs_req_invalid(self):
926         creds = self._get_creds()
927         tgt = self._get_tgt(creds, invalid=True)
928         self._run_tgs(tgt, expected_error=KRB_ERR_TKT_NYV)
929
930     def test_s4u2self_req_invalid(self):
931         creds = self._get_creds()
932         tgt = self._get_tgt(creds, invalid=True)
933         self._s4u2self(tgt, creds, expected_error=KRB_ERR_TKT_NYV)
934
935     def test_user2user_req_invalid(self):
936         creds = self._get_creds()
937         tgt = self._get_tgt(creds, invalid=True)
938         self._user2user(tgt, creds, expected_error=KRB_ERR_TKT_NYV)
939
940     def test_fast_req_invalid(self):
941         creds = self._get_creds()
942         tgt = self._get_tgt(creds, invalid=True)
943         self._fast(tgt, creds, expected_error=KRB_ERR_TKT_NYV,
944                    expected_sname=self.get_krbtgt_sname())
945
946     def test_tgs_req_no_requester_sid(self):
947         creds = self._get_creds()
948         tgt = self._get_tgt(creds, remove_requester_sid=True)
949
950         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
951
952     def test_tgs_req_no_pac_attrs(self):
953         creds = self._get_creds()
954         tgt = self._get_tgt(creds, remove_pac_attrs=True)
955
956         self._run_tgs(tgt, expected_error=0, expect_pac=True,
957                       expect_pac_attrs=False)
958
959     def test_tgs_req_from_rodc_no_requester_sid(self):
960         creds = self._get_creds(replication_allowed=True,
961                                 revealed_to_rodc=True)
962         tgt = self._get_tgt(creds, from_rodc=True, remove_requester_sid=True)
963
964         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
965
966     def test_tgs_req_from_rodc_no_pac_attrs(self):
967         creds = self._get_creds(replication_allowed=True,
968                                 revealed_to_rodc=True)
969         tgt = self._get_tgt(creds, from_rodc=True, remove_pac_attrs=True)
970         self._run_tgs(tgt, expected_error=0, expect_pac=True,
971                       expect_pac_attrs=False)
972
973     # Test making a request without a PAC.
974     def test_tgs_no_pac(self):
975         creds = self._get_creds()
976         tgt = self._get_tgt(creds, remove_pac=True)
977         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
978
979     def test_renew_no_pac(self):
980         creds = self._get_creds()
981         tgt = self._get_tgt(creds, renewable=True, remove_pac=True)
982         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
983
984     def test_validate_no_pac(self):
985         creds = self._get_creds()
986         tgt = self._get_tgt(creds, invalid=True, remove_pac=True)
987         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
988
989     def test_s4u2self_no_pac(self):
990         creds = self._get_creds()
991         tgt = self._get_tgt(creds, remove_pac=True)
992         self._s4u2self(tgt, creds,
993                        expected_error=KDC_ERR_TGT_REVOKED,
994                        expect_edata=False)
995
996     def test_user2user_no_pac(self):
997         creds = self._get_creds()
998         tgt = self._get_tgt(creds, remove_pac=True)
999         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1000
1001     def test_fast_no_pac(self):
1002         creds = self._get_creds()
1003         tgt = self._get_tgt(creds, remove_pac=True)
1004         self._fast(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED,
1005                    expected_sname=self.get_krbtgt_sname())
1006
1007     # Test making a request with authdata and without a PAC.
1008     def test_tgs_authdata_no_pac(self):
1009         creds = self._get_creds()
1010         tgt = self._get_tgt(creds, remove_pac=True, allow_empty_authdata=True)
1011         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1012
1013     def test_renew_authdata_no_pac(self):
1014         creds = self._get_creds()
1015         tgt = self._get_tgt(creds, renewable=True, remove_pac=True,
1016                             allow_empty_authdata=True)
1017         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1018
1019     def test_validate_authdata_no_pac(self):
1020         creds = self._get_creds()
1021         tgt = self._get_tgt(creds, invalid=True, remove_pac=True,
1022                             allow_empty_authdata=True)
1023         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1024
1025     def test_s4u2self_authdata_no_pac(self):
1026         creds = self._get_creds()
1027         tgt = self._get_tgt(creds, remove_pac=True, allow_empty_authdata=True)
1028         self._s4u2self(tgt, creds,
1029                        expected_error=KDC_ERR_TGT_REVOKED,
1030                        expect_edata=False)
1031
1032     def test_user2user_authdata_no_pac(self):
1033         creds = self._get_creds()
1034         tgt = self._get_tgt(creds, remove_pac=True, allow_empty_authdata=True)
1035         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1036
1037     def test_fast_authdata_no_pac(self):
1038         creds = self._get_creds()
1039         tgt = self._get_tgt(creds, remove_pac=True, allow_empty_authdata=True)
1040         self._fast(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED,
1041                    expected_sname=self.get_krbtgt_sname())
1042
1043     # Test changing the SID in the PAC to that of another account.
1044     def test_tgs_sid_mismatch_existing(self):
1045         creds = self._get_creds()
1046         existing_rid = self._get_existing_rid()
1047         tgt = self._get_tgt(creds, new_rid=existing_rid)
1048         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1049
1050     def test_renew_sid_mismatch_existing(self):
1051         creds = self._get_creds()
1052         existing_rid = self._get_existing_rid()
1053         tgt = self._get_tgt(creds, renewable=True, new_rid=existing_rid)
1054         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1055
1056     def test_validate_sid_mismatch_existing(self):
1057         creds = self._get_creds()
1058         existing_rid = self._get_existing_rid()
1059         tgt = self._get_tgt(creds, invalid=True, new_rid=existing_rid)
1060         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1061
1062     def test_s4u2self_sid_mismatch_existing(self):
1063         creds = self._get_creds()
1064         existing_rid = self._get_existing_rid()
1065         tgt = self._get_tgt(creds, new_rid=existing_rid)
1066         self._s4u2self(tgt, creds,
1067                        expected_error=KDC_ERR_TGT_REVOKED)
1068
1069     def test_user2user_sid_mismatch_existing(self):
1070         creds = self._get_creds()
1071         existing_rid = self._get_existing_rid()
1072         tgt = self._get_tgt(creds, new_rid=existing_rid)
1073         self._user2user(tgt, creds,
1074                         expected_error=KDC_ERR_TGT_REVOKED)
1075
1076     def test_fast_sid_mismatch_existing(self):
1077         creds = self._get_creds()
1078         existing_rid = self._get_existing_rid()
1079         tgt = self._get_tgt(creds, new_rid=existing_rid)
1080         self._fast(tgt, creds,
1081                    expected_error=KDC_ERR_TGT_REVOKED,
1082                    expected_sname=self.get_krbtgt_sname())
1083
1084     def test_requester_sid_mismatch_existing(self):
1085         creds = self._get_creds()
1086         existing_rid = self._get_existing_rid()
1087         tgt = self._get_tgt(creds, new_rid=existing_rid,
1088                             can_modify_logon_info=False)
1089         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1090
1091     def test_logon_info_sid_mismatch_existing(self):
1092         creds = self._get_creds()
1093         existing_rid = self._get_existing_rid()
1094         tgt = self._get_tgt(creds, new_rid=existing_rid,
1095                             can_modify_requester_sid=False)
1096         self._run_tgs(tgt, expected_error=0)
1097
1098     def test_logon_info_only_sid_mismatch_existing(self):
1099         creds = self._get_creds()
1100         existing_rid = self._get_existing_rid()
1101         tgt = self._get_tgt(creds, new_rid=existing_rid,
1102                             remove_requester_sid=True)
1103         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1104
1105     # Test changing the SID in the PAC to a non-existent one.
1106     def test_tgs_sid_mismatch_nonexisting(self):
1107         creds = self._get_creds()
1108         nonexistent_rid = self._get_non_existent_rid()
1109         tgt = self._get_tgt(creds, new_rid=nonexistent_rid)
1110         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1111
1112     def test_renew_sid_mismatch_nonexisting(self):
1113         creds = self._get_creds()
1114         nonexistent_rid = self._get_non_existent_rid()
1115         tgt = self._get_tgt(creds, renewable=True,
1116                             new_rid=nonexistent_rid)
1117         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1118
1119     def test_validate_sid_mismatch_nonexisting(self):
1120         creds = self._get_creds()
1121         nonexistent_rid = self._get_non_existent_rid()
1122         tgt = self._get_tgt(creds, invalid=True,
1123                             new_rid=nonexistent_rid)
1124         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1125
1126     def test_s4u2self_sid_mismatch_nonexisting(self):
1127         creds = self._get_creds()
1128         nonexistent_rid = self._get_non_existent_rid()
1129         tgt = self._get_tgt(creds, new_rid=nonexistent_rid)
1130         self._s4u2self(tgt, creds,
1131                        expected_error=KDC_ERR_TGT_REVOKED)
1132
1133     def test_user2user_sid_mismatch_nonexisting(self):
1134         creds = self._get_creds()
1135         nonexistent_rid = self._get_non_existent_rid()
1136         tgt = self._get_tgt(creds, new_rid=nonexistent_rid)
1137         self._user2user(tgt, creds,
1138                         expected_error=KDC_ERR_TGT_REVOKED)
1139
1140     def test_fast_sid_mismatch_nonexisting(self):
1141         creds = self._get_creds()
1142         nonexistent_rid = self._get_non_existent_rid()
1143         tgt = self._get_tgt(creds, new_rid=nonexistent_rid)
1144         self._fast(tgt, creds,
1145                    expected_error=KDC_ERR_TGT_REVOKED,
1146                    expected_sname=self.get_krbtgt_sname())
1147
1148     def test_requester_sid_mismatch_nonexisting(self):
1149         creds = self._get_creds()
1150         nonexistent_rid = self._get_non_existent_rid()
1151         tgt = self._get_tgt(creds, new_rid=nonexistent_rid,
1152                             can_modify_logon_info=False)
1153         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1154
1155     def test_logon_info_sid_mismatch_nonexisting(self):
1156         creds = self._get_creds()
1157         nonexistent_rid = self._get_non_existent_rid()
1158         tgt = self._get_tgt(creds, new_rid=nonexistent_rid,
1159                             can_modify_requester_sid=False)
1160         self._run_tgs(tgt, expected_error=0)
1161
1162     def test_logon_info_only_sid_mismatch_nonexisting(self):
1163         creds = self._get_creds()
1164         nonexistent_rid = self._get_non_existent_rid()
1165         tgt = self._get_tgt(creds, new_rid=nonexistent_rid,
1166                             remove_requester_sid=True)
1167         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1168
1169     # Test with an RODC-issued ticket where the client is revealed to the RODC.
1170     def test_tgs_rodc_revealed(self):
1171         creds = self._get_creds(replication_allowed=True,
1172                                 revealed_to_rodc=True)
1173         tgt = self._get_tgt(creds, from_rodc=True)
1174         self._run_tgs(tgt, expected_error=0)
1175
1176     def test_renew_rodc_revealed(self):
1177         creds = self._get_creds(replication_allowed=True,
1178                                 revealed_to_rodc=True)
1179         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1180         self._renew_tgt(tgt, expected_error=0,
1181                         expect_pac_attrs=False,
1182                         expect_requester_sid=True)
1183
1184     def test_validate_rodc_revealed(self):
1185         creds = self._get_creds(replication_allowed=True,
1186                                 revealed_to_rodc=True)
1187         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1188         self._validate_tgt(tgt, expected_error=0,
1189                            expect_pac_attrs=False,
1190                            expect_requester_sid=True)
1191
1192     # This test fails on Windows, which gives KDC_ERR_C_PRINCIPAL_UNKNOWN when
1193     # attempting to use S4U2Self with a TGT from an RODC.
1194     def test_s4u2self_rodc_revealed(self):
1195         creds = self._get_creds(replication_allowed=True,
1196                                 revealed_to_rodc=True)
1197         tgt = self._get_tgt(creds, from_rodc=True)
1198         self._s4u2self(tgt, creds,
1199                        expected_error=KDC_ERR_C_PRINCIPAL_UNKNOWN)
1200
1201     def test_user2user_rodc_revealed(self):
1202         creds = self._get_creds(replication_allowed=True,
1203                                 revealed_to_rodc=True)
1204         tgt = self._get_tgt(creds, from_rodc=True)
1205         self._user2user(tgt, creds, expected_error=0)
1206
1207     # Test with an RODC-issued ticket where the SID in the PAC is changed to
1208     # that of another account.
1209     def test_tgs_rodc_sid_mismatch_existing(self):
1210         creds = self._get_creds(replication_allowed=True,
1211                                 revealed_to_rodc=True)
1212         existing_rid = self._get_existing_rid(replication_allowed=True,
1213                                               revealed_to_rodc=True)
1214         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid)
1215         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1216
1217     def test_renew_rodc_sid_mismatch_existing(self):
1218         creds = self._get_creds(replication_allowed=True,
1219                                 revealed_to_rodc=True)
1220         existing_rid = self._get_existing_rid(replication_allowed=True,
1221                                               revealed_to_rodc=True)
1222         tgt = self._get_tgt(creds, renewable=True, from_rodc=True,
1223                             new_rid=existing_rid)
1224         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1225
1226     def test_validate_rodc_sid_mismatch_existing(self):
1227         creds = self._get_creds(replication_allowed=True,
1228                                 revealed_to_rodc=True)
1229         existing_rid = self._get_existing_rid(replication_allowed=True,
1230                                        revealed_to_rodc=True)
1231         tgt = self._get_tgt(creds, invalid=True, from_rodc=True,
1232                             new_rid=existing_rid)
1233         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1234
1235     def test_s4u2self_rodc_sid_mismatch_existing(self):
1236         creds = self._get_creds(replication_allowed=True,
1237                                 revealed_to_rodc=True)
1238         existing_rid = self._get_existing_rid(replication_allowed=True,
1239                                               revealed_to_rodc=True)
1240         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid)
1241         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1242
1243     def test_user2user_rodc_sid_mismatch_existing(self):
1244         creds = self._get_creds(replication_allowed=True,
1245                                 revealed_to_rodc=True)
1246         existing_rid = self._get_existing_rid(replication_allowed=True,
1247                                               revealed_to_rodc=True)
1248         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid)
1249         self._user2user(tgt, creds,
1250                         expected_error=KDC_ERR_TGT_REVOKED)
1251
1252     def test_fast_rodc_sid_mismatch_existing(self):
1253         creds = self._get_creds(replication_allowed=True,
1254                                 revealed_to_rodc=True)
1255         existing_rid = self._get_existing_rid(replication_allowed=True,
1256                                               revealed_to_rodc=True)
1257         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid)
1258         self._fast(tgt, creds,
1259                    expected_error=KDC_ERR_TGT_REVOKED,
1260                    expected_sname=self.get_krbtgt_sname())
1261
1262     def test_tgs_rodc_requester_sid_mismatch_existing(self):
1263         creds = self._get_creds(replication_allowed=True,
1264                                 revealed_to_rodc=True)
1265         existing_rid = self._get_existing_rid(replication_allowed=True,
1266                                               revealed_to_rodc=True)
1267         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid,
1268                             can_modify_logon_info=False)
1269         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1270
1271     def test_tgs_rodc_logon_info_sid_mismatch_existing(self):
1272         creds = self._get_creds(replication_allowed=True,
1273                                 revealed_to_rodc=True)
1274         existing_rid = self._get_existing_rid(replication_allowed=True,
1275                                               revealed_to_rodc=True)
1276         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid,
1277                             can_modify_requester_sid=False)
1278         self._run_tgs(tgt, expected_error=0)
1279
1280     def test_tgs_rodc_logon_info_only_sid_mismatch_existing(self):
1281         creds = self._get_creds(replication_allowed=True,
1282                                 revealed_to_rodc=True)
1283         existing_rid = self._get_existing_rid(replication_allowed=True,
1284                                               revealed_to_rodc=True)
1285         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid,
1286                             remove_requester_sid=True)
1287         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1288
1289     # Test with an RODC-issued ticket where the SID in the PAC is changed to a
1290     # non-existent one.
1291     def test_tgs_rodc_sid_mismatch_nonexisting(self):
1292         creds = self._get_creds(replication_allowed=True,
1293                                 revealed_to_rodc=True)
1294         nonexistent_rid = self._get_non_existent_rid()
1295         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid)
1296         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1297
1298     def test_renew_rodc_sid_mismatch_nonexisting(self):
1299         creds = self._get_creds(replication_allowed=True,
1300                                 revealed_to_rodc=True)
1301         nonexistent_rid = self._get_non_existent_rid()
1302         tgt = self._get_tgt(creds, renewable=True, from_rodc=True,
1303                             new_rid=nonexistent_rid)
1304         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1305
1306     def test_validate_rodc_sid_mismatch_nonexisting(self):
1307         creds = self._get_creds(replication_allowed=True,
1308                                 revealed_to_rodc=True)
1309         nonexistent_rid = self._get_non_existent_rid()
1310         tgt = self._get_tgt(creds, invalid=True, from_rodc=True,
1311                             new_rid=nonexistent_rid)
1312         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1313
1314     def test_s4u2self_rodc_sid_mismatch_nonexisting(self):
1315         creds = self._get_creds(replication_allowed=True,
1316                                 revealed_to_rodc=True)
1317         nonexistent_rid = self._get_non_existent_rid()
1318         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid)
1319         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1320
1321     def test_user2user_rodc_sid_mismatch_nonexisting(self):
1322         creds = self._get_creds(replication_allowed=True,
1323                                 revealed_to_rodc=True)
1324         nonexistent_rid = self._get_non_existent_rid()
1325         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid)
1326         self._user2user(tgt, creds,
1327                         expected_error=KDC_ERR_TGT_REVOKED)
1328
1329     def test_fast_rodc_sid_mismatch_nonexisting(self):
1330         creds = self._get_creds(replication_allowed=True,
1331                                 revealed_to_rodc=True)
1332         nonexistent_rid = self._get_non_existent_rid()
1333         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid)
1334         self._fast(tgt, creds,
1335                    expected_error=KDC_ERR_TGT_REVOKED,
1336                    expected_sname=self.get_krbtgt_sname())
1337
1338     def test_tgs_rodc_requester_sid_mismatch_nonexisting(self):
1339         creds = self._get_creds(replication_allowed=True,
1340                                 revealed_to_rodc=True)
1341         nonexistent_rid = self._get_non_existent_rid()
1342         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid,
1343                             can_modify_logon_info=False)
1344         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1345
1346     def test_tgs_rodc_logon_info_sid_mismatch_nonexisting(self):
1347         creds = self._get_creds(replication_allowed=True,
1348                                 revealed_to_rodc=True)
1349         nonexistent_rid = self._get_non_existent_rid()
1350         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid,
1351                             can_modify_requester_sid=False)
1352         self._run_tgs(tgt, expected_error=0)
1353
1354     def test_tgs_rodc_logon_info_only_sid_mismatch_nonexisting(self):
1355         creds = self._get_creds(replication_allowed=True,
1356                                 revealed_to_rodc=True)
1357         nonexistent_rid = self._get_non_existent_rid()
1358         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid,
1359                             remove_requester_sid=True)
1360         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1361
1362     # Test with an RODC-issued ticket where the client is not revealed to the
1363     # RODC.
1364     def test_tgs_rodc_not_revealed(self):
1365         creds = self._get_creds(replication_allowed=True)
1366         tgt = self._get_tgt(creds, from_rodc=True)
1367         # TODO: error code
1368         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1369
1370     def test_renew_rodc_not_revealed(self):
1371         creds = self._get_creds(replication_allowed=True)
1372         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1373         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1374
1375     def test_validate_rodc_not_revealed(self):
1376         creds = self._get_creds(replication_allowed=True)
1377         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1378         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1379
1380     def test_s4u2self_rodc_not_revealed(self):
1381         creds = self._get_creds(replication_allowed=True)
1382         tgt = self._get_tgt(creds, from_rodc=True)
1383         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1384
1385     def test_user2user_rodc_not_revealed(self):
1386         creds = self._get_creds(replication_allowed=True)
1387         tgt = self._get_tgt(creds, from_rodc=True)
1388         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1389
1390     # Test with an RODC-issued ticket where the RODC account does not have the
1391     # PARTIAL_SECRETS bit set.
1392     def test_tgs_rodc_no_partial_secrets(self):
1393         creds = self._get_creds(replication_allowed=True,
1394                                 revealed_to_rodc=True)
1395         tgt = self._get_tgt(creds, from_rodc=True)
1396         self._remove_rodc_partial_secrets()
1397         self._run_tgs(tgt, expected_error=KDC_ERR_POLICY)
1398
1399     def test_renew_rodc_no_partial_secrets(self):
1400         creds = self._get_creds(replication_allowed=True,
1401                                 revealed_to_rodc=True)
1402         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1403         self._remove_rodc_partial_secrets()
1404         self._renew_tgt(tgt, expected_error=KDC_ERR_POLICY)
1405
1406     def test_validate_rodc_no_partial_secrets(self):
1407         creds = self._get_creds(replication_allowed=True,
1408                                 revealed_to_rodc=True)
1409         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1410         self._remove_rodc_partial_secrets()
1411         self._validate_tgt(tgt, expected_error=KDC_ERR_POLICY)
1412
1413     def test_s4u2self_rodc_no_partial_secrets(self):
1414         creds = self._get_creds(replication_allowed=True,
1415                                 revealed_to_rodc=True)
1416         tgt = self._get_tgt(creds, from_rodc=True)
1417         self._remove_rodc_partial_secrets()
1418         self._s4u2self(tgt, creds, expected_error=KDC_ERR_POLICY)
1419
1420     def test_user2user_rodc_no_partial_secrets(self):
1421         creds = self._get_creds(replication_allowed=True,
1422                                 revealed_to_rodc=True)
1423         tgt = self._get_tgt(creds, from_rodc=True)
1424         self._remove_rodc_partial_secrets()
1425         self._user2user(tgt, creds, expected_error=KDC_ERR_POLICY)
1426
1427     def test_fast_rodc_no_partial_secrets(self):
1428         creds = self._get_creds(replication_allowed=True,
1429                                 revealed_to_rodc=True)
1430         tgt = self._get_tgt(creds, from_rodc=True)
1431         self._remove_rodc_partial_secrets()
1432         self._fast(tgt, creds, expected_error=KDC_ERR_POLICY,
1433                    expected_sname=self.get_krbtgt_sname())
1434
1435     # Test with an RODC-issued ticket where the RODC account does not have an
1436     # msDS-KrbTgtLink.
1437     def test_tgs_rodc_no_krbtgt_link(self):
1438         creds = self._get_creds(replication_allowed=True,
1439                                 revealed_to_rodc=True)
1440         tgt = self._get_tgt(creds, from_rodc=True)
1441         self._remove_rodc_krbtgt_link()
1442         self._run_tgs(tgt, expected_error=KDC_ERR_POLICY)
1443
1444     def test_renew_rodc_no_krbtgt_link(self):
1445         creds = self._get_creds(replication_allowed=True,
1446                                 revealed_to_rodc=True)
1447         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1448         self._remove_rodc_krbtgt_link()
1449         self._renew_tgt(tgt, expected_error=KDC_ERR_POLICY)
1450
1451     def test_validate_rodc_no_krbtgt_link(self):
1452         creds = self._get_creds(replication_allowed=True,
1453                                 revealed_to_rodc=True)
1454         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1455         self._remove_rodc_krbtgt_link()
1456         self._validate_tgt(tgt, expected_error=KDC_ERR_POLICY)
1457
1458     def test_s4u2self_rodc_no_krbtgt_link(self):
1459         creds = self._get_creds(replication_allowed=True,
1460                                 revealed_to_rodc=True)
1461         tgt = self._get_tgt(creds, from_rodc=True)
1462         self._remove_rodc_krbtgt_link()
1463         self._s4u2self(tgt, creds, expected_error=KDC_ERR_POLICY)
1464
1465     def test_user2user_rodc_no_krbtgt_link(self):
1466         creds = self._get_creds(replication_allowed=True,
1467                                 revealed_to_rodc=True)
1468         tgt = self._get_tgt(creds, from_rodc=True)
1469         self._remove_rodc_krbtgt_link()
1470         self._user2user(tgt, creds, expected_error=KDC_ERR_POLICY)
1471
1472     def test_fast_rodc_no_krbtgt_link(self):
1473         creds = self._get_creds(replication_allowed=True,
1474                                 revealed_to_rodc=True)
1475         tgt = self._get_tgt(creds, from_rodc=True)
1476         self._remove_rodc_krbtgt_link()
1477         self._fast(tgt, creds, expected_error=KDC_ERR_POLICY,
1478                    expected_sname=self.get_krbtgt_sname())
1479
1480     # Test with an RODC-issued ticket where the client is not allowed to
1481     # replicate to the RODC.
1482     def test_tgs_rodc_not_allowed(self):
1483         creds = self._get_creds(revealed_to_rodc=True)
1484         tgt = self._get_tgt(creds, from_rodc=True)
1485         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1486
1487     def test_renew_rodc_not_allowed(self):
1488         creds = self._get_creds(revealed_to_rodc=True)
1489         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1490         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1491
1492     def test_validate_rodc_not_allowed(self):
1493         creds = self._get_creds(revealed_to_rodc=True)
1494         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1495         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1496
1497     def test_s4u2self_rodc_not_allowed(self):
1498         creds = self._get_creds(revealed_to_rodc=True)
1499         tgt = self._get_tgt(creds, from_rodc=True)
1500         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1501
1502     def test_user2user_rodc_not_allowed(self):
1503         creds = self._get_creds(revealed_to_rodc=True)
1504         tgt = self._get_tgt(creds, from_rodc=True)
1505         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1506
1507     def test_fast_rodc_not_allowed(self):
1508         creds = self._get_creds(revealed_to_rodc=True)
1509         tgt = self._get_tgt(creds, from_rodc=True)
1510         self._fast(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED,
1511                    expected_sname=self.get_krbtgt_sname())
1512
1513     # Test with an RODC-issued ticket where the client is denied from
1514     # replicating to the RODC.
1515     def test_tgs_rodc_denied(self):
1516         creds = self._get_creds(replication_denied=True,
1517                                 revealed_to_rodc=True)
1518         tgt = self._get_tgt(creds, from_rodc=True)
1519         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1520
1521     def test_renew_rodc_denied(self):
1522         creds = self._get_creds(replication_denied=True,
1523                                 revealed_to_rodc=True)
1524         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1525         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1526
1527     def test_validate_rodc_denied(self):
1528         creds = self._get_creds(replication_denied=True,
1529                                 revealed_to_rodc=True)
1530         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1531         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1532
1533     def test_s4u2self_rodc_denied(self):
1534         creds = self._get_creds(replication_denied=True,
1535                                 revealed_to_rodc=True)
1536         tgt = self._get_tgt(creds, from_rodc=True)
1537         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1538
1539     def test_user2user_rodc_denied(self):
1540         creds = self._get_creds(replication_denied=True,
1541                                 revealed_to_rodc=True)
1542         tgt = self._get_tgt(creds, from_rodc=True)
1543         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1544
1545     def test_fast_rodc_denied(self):
1546         creds = self._get_creds(replication_denied=True,
1547                                 revealed_to_rodc=True)
1548         tgt = self._get_tgt(creds, from_rodc=True)
1549         self._fast(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED,
1550                    expected_sname=self.get_krbtgt_sname())
1551
1552     # Test with an RODC-issued ticket where the client is both allowed and
1553     # denied replicating to the RODC.
1554     def test_tgs_rodc_allowed_denied(self):
1555         creds = self._get_creds(replication_allowed=True,
1556                                 replication_denied=True,
1557                                 revealed_to_rodc=True)
1558         tgt = self._get_tgt(creds, from_rodc=True)
1559         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1560
1561     def test_renew_rodc_allowed_denied(self):
1562         creds = self._get_creds(replication_allowed=True,
1563                                 replication_denied=True,
1564                                 revealed_to_rodc=True)
1565         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1566         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1567
1568     def test_validate_rodc_allowed_denied(self):
1569         creds = self._get_creds(replication_allowed=True,
1570                                 replication_denied=True,
1571                                 revealed_to_rodc=True)
1572         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1573         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1574
1575     def test_s4u2self_rodc_allowed_denied(self):
1576         creds = self._get_creds(replication_allowed=True,
1577                                 replication_denied=True,
1578                                 revealed_to_rodc=True)
1579         tgt = self._get_tgt(creds, from_rodc=True)
1580         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1581
1582     def test_user2user_rodc_allowed_denied(self):
1583         creds = self._get_creds(replication_allowed=True,
1584                                 replication_denied=True,
1585                                 revealed_to_rodc=True)
1586         tgt = self._get_tgt(creds, from_rodc=True)
1587         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1588
1589     def test_fast_rodc_allowed_denied(self):
1590         creds = self._get_creds(replication_allowed=True,
1591                                 replication_denied=True,
1592                                 revealed_to_rodc=True)
1593         tgt = self._get_tgt(creds, from_rodc=True)
1594         self._fast(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED,
1595                    expected_sname=self.get_krbtgt_sname())
1596
1597     # Test making a TGS request with an RC4-encrypted TGT.
1598     def test_tgs_rc4(self):
1599         creds = self._get_creds()
1600         tgt = self._get_tgt(creds, etype=kcrypto.Enctype.RC4)
1601         self._run_tgs(tgt, expected_error=KDC_ERR_GENERIC)
1602
1603     def test_renew_rc4(self):
1604         creds = self._get_creds()
1605         tgt = self._get_tgt(creds, renewable=True, etype=kcrypto.Enctype.RC4)
1606         self._renew_tgt(tgt, expected_error=KDC_ERR_GENERIC,
1607                         expect_pac_attrs=True,
1608                         expect_pac_attrs_pac_request=True,
1609                         expect_requester_sid=True)
1610
1611     def test_validate_rc4(self):
1612         creds = self._get_creds()
1613         tgt = self._get_tgt(creds, invalid=True, etype=kcrypto.Enctype.RC4)
1614         self._validate_tgt(tgt, expected_error=KDC_ERR_GENERIC,
1615                            expect_pac_attrs=True,
1616                            expect_pac_attrs_pac_request=True,
1617                            expect_requester_sid=True)
1618
1619     def test_s4u2self_rc4(self):
1620         creds = self._get_creds()
1621         tgt = self._get_tgt(creds, etype=kcrypto.Enctype.RC4)
1622         self._s4u2self(tgt, creds, expected_error=KDC_ERR_GENERIC)
1623
1624     def test_user2user_rc4(self):
1625         creds = self._get_creds()
1626         tgt = self._get_tgt(creds, etype=kcrypto.Enctype.RC4)
1627         self._user2user(tgt, creds, expected_error=KDC_ERR_GENERIC)
1628
1629     def test_fast_rc4(self):
1630         creds = self._get_creds()
1631         tgt = self._get_tgt(creds, etype=kcrypto.Enctype.RC4)
1632         self._fast(tgt, creds, expected_error=KDC_ERR_GENERIC)
1633
1634     # Test user-to-user with incorrect service principal names.
1635     def test_user2user_matching_sname_host(self):
1636         creds = self._get_creds()
1637         tgt = self._get_tgt(creds)
1638
1639         user_name = creds.get_username()
1640         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1641                                           names=['host', user_name])
1642
1643         self._user2user(tgt, creds, sname=sname,
1644                         expected_error=KDC_ERR_S_PRINCIPAL_UNKNOWN)
1645
1646     def test_user2user_matching_sname_no_host(self):
1647         creds = self._get_creds()
1648         tgt = self._get_tgt(creds)
1649
1650         user_name = creds.get_username()
1651         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1652                                           names=[user_name])
1653
1654         self._user2user(tgt, creds, sname=sname, expected_error=0)
1655
1656     def test_user2user_wrong_sname(self):
1657         creds = self._get_creds()
1658         tgt = self._get_tgt(creds)
1659
1660         other_creds = self._get_mach_creds()
1661         user_name = other_creds.get_username()
1662         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1663                                           names=[user_name])
1664
1665         self._user2user(tgt, creds, sname=sname,
1666                         expected_error=KDC_ERR_BADMATCH)
1667
1668     def test_user2user_other_sname(self):
1669         other_name = self.get_new_username()
1670         spn = f'host/{other_name}'
1671         creds = self.get_cached_creds(
1672             account_type=self.AccountType.COMPUTER,
1673             opts={'spn': spn})
1674         tgt = self._get_tgt(creds)
1675
1676         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1677                                           names=['host', other_name])
1678
1679         self._user2user(tgt, creds, sname=sname, expected_error=0)
1680
1681     def test_user2user_wrong_sname_krbtgt(self):
1682         creds = self._get_creds()
1683         tgt = self._get_tgt(creds)
1684
1685         sname = self.get_krbtgt_sname()
1686
1687         self._user2user(tgt, creds, sname=sname,
1688                         expected_error=KDC_ERR_BADMATCH)
1689
1690     def test_user2user_wrong_srealm(self):
1691         creds = self._get_creds()
1692         tgt = self._get_tgt(creds)
1693
1694         self._user2user(tgt, creds, srealm='OTHER.REALM',
1695                         expected_error=(KDC_ERR_WRONG_REALM,
1696                                         KDC_ERR_S_PRINCIPAL_UNKNOWN))
1697
1698     def test_user2user_tgt_correct_realm(self):
1699         creds = self._get_creds()
1700         tgt = self._get_tgt(creds)
1701
1702         realm = creds.get_realm().encode('utf-8')
1703         tgt = self._modify_tgt(tgt, realm)
1704
1705         self._user2user(tgt, creds,
1706                         expected_error=0)
1707
1708     def test_user2user_tgt_wrong_realm(self):
1709         creds = self._get_creds()
1710         tgt = self._get_tgt(creds)
1711
1712         tgt = self._modify_tgt(tgt, b'OTHER.REALM')
1713
1714         self._user2user(tgt, creds,
1715                         expected_error=0)
1716
1717     def test_user2user_tgt_correct_cname(self):
1718         creds = self._get_creds()
1719         tgt = self._get_tgt(creds)
1720
1721         user_name = creds.get_username()
1722         user_name = user_name.encode('utf-8')
1723         cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1724                                           names=[user_name])
1725
1726         tgt = self._modify_tgt(tgt, cname=cname)
1727
1728         self._user2user(tgt, creds, expected_error=0)
1729
1730     def test_user2user_tgt_other_cname(self):
1731         samdb = self.get_samdb()
1732
1733         other_name = self.get_new_username()
1734         upn = f'{other_name}@{samdb.domain_dns_name()}'
1735
1736         creds = self.get_cached_creds(
1737             account_type=self.AccountType.COMPUTER,
1738             opts={'upn': upn})
1739         tgt = self._get_tgt(creds)
1740
1741         cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1742                                           names=[other_name.encode('utf-8')])
1743
1744         tgt = self._modify_tgt(tgt, cname=cname)
1745
1746         self._user2user(tgt, creds, expected_error=0)
1747
1748     def test_user2user_tgt_cname_host(self):
1749         creds = self._get_creds()
1750         tgt = self._get_tgt(creds)
1751
1752         user_name = creds.get_username()
1753         user_name = user_name.encode('utf-8')
1754         cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1755                                           names=[b'host', user_name])
1756
1757         tgt = self._modify_tgt(tgt, cname=cname)
1758
1759         self._user2user(tgt, creds,
1760                         expected_error=(KDC_ERR_TGT_REVOKED,
1761                                         KDC_ERR_C_PRINCIPAL_UNKNOWN))
1762
1763     def test_user2user_non_existent_sname(self):
1764         creds = self._get_creds()
1765         tgt = self._get_tgt(creds)
1766
1767         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1768                                           names=['host', 'non_existent_user'])
1769
1770         self._user2user(tgt, creds, sname=sname,
1771                         expected_error=KDC_ERR_S_PRINCIPAL_UNKNOWN)
1772
1773     def test_user2user_no_sname(self):
1774         creds = self._get_creds()
1775         tgt = self._get_tgt(creds)
1776
1777         self._user2user(tgt, creds, sname=False,
1778                         expected_error=(KDC_ERR_GENERIC,
1779                                         KDC_ERR_S_PRINCIPAL_UNKNOWN))
1780
1781     def test_tgs_service_ticket(self):
1782         creds = self._get_creds()
1783         tgt = self._get_tgt(creds)
1784
1785         service_creds = self.get_service_creds()
1786         service_ticket = self.get_service_ticket(tgt, service_creds)
1787
1788         self._run_tgs(service_ticket,
1789                       expected_error=(KDC_ERR_NOT_US, KDC_ERR_POLICY))
1790
1791     def test_renew_service_ticket(self):
1792         creds = self._get_creds()
1793         tgt = self._get_tgt(creds)
1794
1795         service_creds = self.get_service_creds()
1796         service_ticket = self.get_service_ticket(tgt, service_creds)
1797
1798         service_ticket = self.modified_ticket(
1799             service_ticket,
1800             modify_fn=self._modify_renewable,
1801             checksum_keys=self.get_krbtgt_checksum_key())
1802
1803         self._renew_tgt(service_ticket,
1804                         expected_error=KDC_ERR_POLICY)
1805
1806     def test_validate_service_ticket(self):
1807         creds = self._get_creds()
1808         tgt = self._get_tgt(creds)
1809
1810         service_creds = self.get_service_creds()
1811         service_ticket = self.get_service_ticket(tgt, service_creds)
1812
1813         service_ticket = self.modified_ticket(
1814             service_ticket,
1815             modify_fn=self._modify_invalid,
1816             checksum_keys=self.get_krbtgt_checksum_key())
1817
1818         self._validate_tgt(service_ticket,
1819                            expected_error=KDC_ERR_POLICY)
1820
1821     def test_s4u2self_service_ticket(self):
1822         creds = self._get_creds()
1823         tgt = self._get_tgt(creds)
1824
1825         service_creds = self.get_service_creds()
1826         service_ticket = self.get_service_ticket(tgt, service_creds)
1827
1828         self._s4u2self(service_ticket, creds,
1829                        expected_error=(KDC_ERR_NOT_US, KDC_ERR_POLICY))
1830
1831     def test_user2user_service_ticket(self):
1832         creds = self._get_creds()
1833         tgt = self._get_tgt(creds)
1834
1835         service_creds = self.get_service_creds()
1836         service_ticket = self.get_service_ticket(tgt, service_creds)
1837
1838         self._user2user(service_ticket, creds,
1839                         expected_error=(KDC_ERR_MODIFIED, KDC_ERR_POLICY))
1840
1841     # Expected to fail against Windows, which does not produce an error.
1842     def test_fast_service_ticket(self):
1843         creds = self._get_creds()
1844         tgt = self._get_tgt(creds)
1845
1846         service_creds = self.get_service_creds()
1847         service_ticket = self.get_service_ticket(tgt, service_creds)
1848
1849         self._fast(service_ticket, creds,
1850                    expected_error=(KDC_ERR_POLICY,
1851                                    KDC_ERR_S_PRINCIPAL_UNKNOWN))
1852
1853     def test_pac_attrs_none(self):
1854         creds = self._get_creds()
1855         self.get_tgt(creds, pac_request=None,
1856                      expect_pac=True,
1857                      expect_pac_attrs=True,
1858                      expect_pac_attrs_pac_request=None)
1859
1860     def test_pac_attrs_false(self):
1861         creds = self._get_creds()
1862         self.get_tgt(creds, pac_request=False,
1863                      expect_pac=True,
1864                      expect_pac_attrs=True,
1865                      expect_pac_attrs_pac_request=False)
1866
1867     def test_pac_attrs_true(self):
1868         creds = self._get_creds()
1869         self.get_tgt(creds, pac_request=True,
1870                      expect_pac=True,
1871                      expect_pac_attrs=True,
1872                      expect_pac_attrs_pac_request=True)
1873
1874     def test_pac_attrs_renew_none(self):
1875         creds = self._get_creds()
1876         tgt = self.get_tgt(creds, pac_request=None,
1877                            expect_pac=True,
1878                            expect_pac_attrs=True,
1879                            expect_pac_attrs_pac_request=None)
1880         tgt = self._modify_tgt(tgt, renewable=True)
1881
1882         self._renew_tgt(tgt, expected_error=0,
1883                         expect_pac=True,
1884                         expect_pac_attrs=True,
1885                         expect_pac_attrs_pac_request=None,
1886                         expect_requester_sid=True)
1887
1888     def test_pac_attrs_renew_false(self):
1889         creds = self._get_creds()
1890         tgt = self.get_tgt(creds, pac_request=False,
1891                            expect_pac=True,
1892                            expect_pac_attrs=True,
1893                            expect_pac_attrs_pac_request=False)
1894         tgt = self._modify_tgt(tgt, renewable=True)
1895
1896         self._renew_tgt(tgt, expected_error=0,
1897                         expect_pac=True,
1898                         expect_pac_attrs=True,
1899                         expect_pac_attrs_pac_request=False,
1900                         expect_requester_sid=True)
1901
1902     def test_pac_attrs_renew_true(self):
1903         creds = self._get_creds()
1904         tgt = self.get_tgt(creds, pac_request=True,
1905                            expect_pac=True,
1906                            expect_pac_attrs=True,
1907                            expect_pac_attrs_pac_request=True)
1908         tgt = self._modify_tgt(tgt, renewable=True)
1909
1910         self._renew_tgt(tgt, expected_error=0,
1911                         expect_pac=True,
1912                         expect_pac_attrs=True,
1913                         expect_pac_attrs_pac_request=True,
1914                         expect_requester_sid=True)
1915
1916     def test_pac_attrs_rodc_renew_none(self):
1917         creds = self._get_creds(replication_allowed=True,
1918                                 revealed_to_rodc=True)
1919         tgt = self.get_tgt(creds, pac_request=None,
1920                            expect_pac=True,
1921                            expect_pac_attrs=True,
1922                            expect_pac_attrs_pac_request=None)
1923         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True)
1924
1925         self._renew_tgt(tgt, expected_error=0,
1926                         expect_pac=True,
1927                         expect_pac_attrs=False,
1928                         expect_requester_sid=True)
1929
1930     def test_pac_attrs_rodc_renew_false(self):
1931         creds = self._get_creds(replication_allowed=True,
1932                                 revealed_to_rodc=True)
1933         tgt = self.get_tgt(creds, pac_request=False,
1934                            expect_pac=True,
1935                            expect_pac_attrs=True,
1936                            expect_pac_attrs_pac_request=False)
1937         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True)
1938
1939         self._renew_tgt(tgt, expected_error=0,
1940                         expect_pac=True,
1941                         expect_pac_attrs=False,
1942                         expect_requester_sid=True)
1943
1944     def test_pac_attrs_rodc_renew_true(self):
1945         creds = self._get_creds(replication_allowed=True,
1946                                 revealed_to_rodc=True)
1947         tgt = self.get_tgt(creds, pac_request=True,
1948                            expect_pac=True,
1949                            expect_pac_attrs=True,
1950                            expect_pac_attrs_pac_request=True)
1951         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True)
1952
1953         self._renew_tgt(tgt, expected_error=0,
1954                         expect_pac=True,
1955                         expect_pac_attrs=False,
1956                         expect_requester_sid=True)
1957
1958     def test_pac_attrs_missing_renew_none(self):
1959         creds = self._get_creds()
1960         tgt = self.get_tgt(creds, pac_request=None,
1961                            expect_pac=True,
1962                            expect_pac_attrs=True,
1963                            expect_pac_attrs_pac_request=None)
1964         tgt = self._modify_tgt(tgt, renewable=True,
1965                                remove_pac_attrs=True)
1966
1967         self._renew_tgt(tgt, expected_error=0,
1968                         expect_pac=True,
1969                         expect_pac_attrs=False,
1970                         expect_requester_sid=True)
1971
1972     def test_pac_attrs_missing_renew_false(self):
1973         creds = self._get_creds()
1974         tgt = self.get_tgt(creds, pac_request=False,
1975                            expect_pac=True,
1976                            expect_pac_attrs=True,
1977                            expect_pac_attrs_pac_request=False)
1978         tgt = self._modify_tgt(tgt, renewable=True,
1979                                remove_pac_attrs=True)
1980
1981         self._renew_tgt(tgt, expected_error=0,
1982                         expect_pac=True,
1983                         expect_pac_attrs=False,
1984                         expect_requester_sid=True)
1985
1986     def test_pac_attrs_missing_renew_true(self):
1987         creds = self._get_creds()
1988         tgt = self.get_tgt(creds, pac_request=True,
1989                            expect_pac=True,
1990                            expect_pac_attrs=True,
1991                            expect_pac_attrs_pac_request=True)
1992         tgt = self._modify_tgt(tgt, renewable=True,
1993                                remove_pac_attrs=True)
1994
1995         self._renew_tgt(tgt, expected_error=0,
1996                         expect_pac=True,
1997                         expect_pac_attrs=False,
1998                         expect_requester_sid=True)
1999
2000     def test_pac_attrs_missing_rodc_renew_none(self):
2001         creds = self._get_creds(replication_allowed=True,
2002                                 revealed_to_rodc=True)
2003         tgt = self.get_tgt(creds, pac_request=None,
2004                            expect_pac=True,
2005                            expect_pac_attrs=True,
2006                            expect_pac_attrs_pac_request=None)
2007         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True,
2008                                remove_pac_attrs=True)
2009
2010         self._renew_tgt(tgt, expected_error=0,
2011                         expect_pac=True,
2012                         expect_pac_attrs=False,
2013                         expect_requester_sid=True)
2014
2015     def test_pac_attrs_missing_rodc_renew_false(self):
2016         creds = self._get_creds(replication_allowed=True,
2017                                 revealed_to_rodc=True)
2018         tgt = self.get_tgt(creds, pac_request=False,
2019                            expect_pac=True,
2020                            expect_pac_attrs=True,
2021                            expect_pac_attrs_pac_request=False)
2022         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True,
2023                                remove_pac_attrs=True)
2024
2025         self._renew_tgt(tgt, expected_error=0,
2026                         expect_pac=True,
2027                         expect_pac_attrs=False,
2028                         expect_requester_sid=True)
2029
2030     def test_pac_attrs_missing_rodc_renew_true(self):
2031         creds = self._get_creds(replication_allowed=True,
2032                                 revealed_to_rodc=True)
2033         tgt = self.get_tgt(creds, pac_request=True,
2034                            expect_pac=True,
2035                            expect_pac_attrs=True,
2036                            expect_pac_attrs_pac_request=True)
2037         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True,
2038                                remove_pac_attrs=True)
2039
2040         self._renew_tgt(tgt, expected_error=0,
2041                         expect_pac=True,
2042                         expect_pac_attrs=False,
2043                         expect_requester_sid=True)
2044
2045     def test_tgs_pac_attrs_none(self):
2046         creds = self._get_creds()
2047         tgt = self.get_tgt(creds, pac_request=None,
2048                            expect_pac=True,
2049                            expect_pac_attrs=True,
2050                            expect_pac_attrs_pac_request=None)
2051
2052         self._run_tgs(tgt, expected_error=0, expect_pac=True,
2053                       expect_pac_attrs=False)
2054
2055     def test_tgs_pac_attrs_false(self):
2056         creds = self._get_creds()
2057         tgt = self.get_tgt(creds, pac_request=False,
2058                            expect_pac=True,
2059                            expect_pac_attrs=True,
2060                            expect_pac_attrs_pac_request=False)
2061
2062         self._run_tgs(tgt, expected_error=0, expect_pac=False,
2063                       expect_pac_attrs=False)
2064
2065     def test_tgs_pac_attrs_true(self):
2066         creds = self._get_creds()
2067         tgt = self.get_tgt(creds, pac_request=True,
2068                            expect_pac=True,
2069                            expect_pac_attrs=True,
2070                            expect_pac_attrs_pac_request=True)
2071
2072         self._run_tgs(tgt, expected_error=0, expect_pac=True,
2073                       expect_pac_attrs=False)
2074
2075     def test_as_requester_sid(self):
2076         creds = self._get_creds()
2077
2078         samdb = self.get_samdb()
2079         sid = self.get_objectSid(samdb, creds.get_dn())
2080
2081         self.get_tgt(creds, pac_request=None,
2082                      expect_pac=True,
2083                      expected_sid=sid,
2084                      expect_requester_sid=True)
2085
2086     def test_tgs_requester_sid(self):
2087         creds = self._get_creds()
2088
2089         samdb = self.get_samdb()
2090         sid = self.get_objectSid(samdb, creds.get_dn())
2091
2092         tgt = self.get_tgt(creds, pac_request=None,
2093                            expect_pac=True,
2094                            expected_sid=sid,
2095                            expect_requester_sid=True)
2096
2097         self._run_tgs(tgt, expected_error=0, expect_pac=True,
2098                       expect_requester_sid=False)
2099
2100     def test_tgs_requester_sid_renew(self):
2101         creds = self._get_creds()
2102
2103         samdb = self.get_samdb()
2104         sid = self.get_objectSid(samdb, creds.get_dn())
2105
2106         tgt = self.get_tgt(creds, pac_request=None,
2107                            expect_pac=True,
2108                            expected_sid=sid,
2109                            expect_requester_sid=True)
2110         tgt = self._modify_tgt(tgt, renewable=True)
2111
2112         self._renew_tgt(tgt, expected_error=0, expect_pac=True,
2113                         expect_pac_attrs=True,
2114                         expect_pac_attrs_pac_request=None,
2115                         expected_sid=sid,
2116                         expect_requester_sid=True)
2117
2118     def test_tgs_requester_sid_rodc_renew(self):
2119         creds = self._get_creds(replication_allowed=True,
2120                                 revealed_to_rodc=True)
2121
2122         samdb = self.get_samdb()
2123         sid = self.get_objectSid(samdb, creds.get_dn())
2124
2125         tgt = self.get_tgt(creds, pac_request=None,
2126                            expect_pac=True,
2127                            expected_sid=sid,
2128                            expect_requester_sid=True)
2129         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True)
2130
2131         self._renew_tgt(tgt, expected_error=0, expect_pac=True,
2132                         expect_pac_attrs=False,
2133                         expected_sid=sid,
2134                         expect_requester_sid=True)
2135
2136     def test_tgs_requester_sid_missing_renew(self):
2137         creds = self._get_creds()
2138
2139         samdb = self.get_samdb()
2140         sid = self.get_objectSid(samdb, creds.get_dn())
2141
2142         tgt = self.get_tgt(creds, pac_request=None,
2143                            expect_pac=True,
2144                            expected_sid=sid,
2145                            expect_requester_sid=True)
2146         tgt = self._modify_tgt(tgt, renewable=True,
2147                                remove_requester_sid=True)
2148
2149         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
2150
2151     def test_tgs_requester_sid_missing_rodc_renew(self):
2152         creds = self._get_creds(replication_allowed=True,
2153                                 revealed_to_rodc=True)
2154
2155         samdb = self.get_samdb()
2156         sid = self.get_objectSid(samdb, creds.get_dn())
2157
2158         tgt = self.get_tgt(creds, pac_request=None,
2159                            expect_pac=True,
2160                            expected_sid=sid,
2161                            expect_requester_sid=True)
2162         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True,
2163                                remove_requester_sid=True)
2164
2165         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
2166
2167     def test_tgs_requester_sid_validate(self):
2168         creds = self._get_creds()
2169
2170         samdb = self.get_samdb()
2171         sid = self.get_objectSid(samdb, creds.get_dn())
2172
2173         tgt = self.get_tgt(creds, pac_request=None,
2174                            expect_pac=True,
2175                            expected_sid=sid,
2176                            expect_requester_sid=True)
2177         tgt = self._modify_tgt(tgt, invalid=True)
2178
2179         self._validate_tgt(tgt, expected_error=0, expect_pac=True,
2180                            expect_pac_attrs=True,
2181                            expect_pac_attrs_pac_request=None,
2182                            expected_sid=sid,
2183                            expect_requester_sid=True)
2184
2185     def test_tgs_requester_sid_rodc_validate(self):
2186         creds = self._get_creds(replication_allowed=True,
2187                                 revealed_to_rodc=True)
2188
2189         samdb = self.get_samdb()
2190         sid = self.get_objectSid(samdb, creds.get_dn())
2191
2192         tgt = self.get_tgt(creds, pac_request=None,
2193                            expect_pac=True,
2194                            expected_sid=sid,
2195                            expect_requester_sid=True)
2196         tgt = self._modify_tgt(tgt, from_rodc=True, invalid=True)
2197
2198         self._validate_tgt(tgt, expected_error=0, expect_pac=True,
2199                            expect_pac_attrs=False,
2200                            expected_sid=sid,
2201                            expect_requester_sid=True)
2202
2203     def test_tgs_requester_sid_missing_validate(self):
2204         creds = self._get_creds()
2205
2206         samdb = self.get_samdb()
2207         sid = self.get_objectSid(samdb, creds.get_dn())
2208
2209         tgt = self.get_tgt(creds, pac_request=None,
2210                            expect_pac=True,
2211                            expected_sid=sid,
2212                            expect_requester_sid=True)
2213         tgt = self._modify_tgt(tgt, invalid=True,
2214                                remove_requester_sid=True)
2215
2216         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
2217
2218     def test_tgs_requester_sid_missing_rodc_validate(self):
2219         creds = self._get_creds(replication_allowed=True,
2220                                 revealed_to_rodc=True)
2221
2222         samdb = self.get_samdb()
2223         sid = self.get_objectSid(samdb, creds.get_dn())
2224
2225         tgt = self.get_tgt(creds, pac_request=None,
2226                            expect_pac=True,
2227                            expected_sid=sid,
2228                            expect_requester_sid=True)
2229         tgt = self._modify_tgt(tgt, from_rodc=True, invalid=True,
2230                                remove_requester_sid=True)
2231
2232         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
2233
2234     def test_tgs_pac_request_none(self):
2235         creds = self._get_creds()
2236         tgt = self.get_tgt(creds, pac_request=None)
2237
2238         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2239
2240         pac = self.get_ticket_pac(ticket)
2241         self.assertIsNotNone(pac)
2242
2243     def test_tgs_pac_request_false(self):
2244         creds = self._get_creds()
2245         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2246
2247         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=False)
2248
2249         pac = self.get_ticket_pac(ticket, expect_pac=False)
2250         self.assertIsNone(pac)
2251
2252     def test_tgs_pac_request_true(self):
2253         creds = self._get_creds()
2254         tgt = self.get_tgt(creds, pac_request=True)
2255
2256         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2257
2258         pac = self.get_ticket_pac(ticket)
2259         self.assertIsNotNone(pac)
2260
2261     def test_renew_pac_request_none(self):
2262         creds = self._get_creds()
2263         tgt = self.get_tgt(creds, pac_request=None)
2264         tgt = self._modify_tgt(tgt, renewable=True)
2265
2266         tgt = self._renew_tgt(tgt, expected_error=0, expect_pac=None,
2267                               expect_pac_attrs=True,
2268                               expect_pac_attrs_pac_request=None,
2269                               expect_requester_sid=True)
2270
2271         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2272
2273         pac = self.get_ticket_pac(ticket)
2274         self.assertIsNotNone(pac)
2275
2276     def test_renew_pac_request_false(self):
2277         creds = self._get_creds()
2278         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2279         tgt = self._modify_tgt(tgt, renewable=True)
2280
2281         tgt = self._renew_tgt(tgt, expected_error=0, expect_pac=None,
2282                               expect_pac_attrs=True,
2283                               expect_pac_attrs_pac_request=False,
2284                               expect_requester_sid=True)
2285
2286         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=False)
2287
2288         pac = self.get_ticket_pac(ticket, expect_pac=False)
2289         self.assertIsNone(pac)
2290
2291     def test_renew_pac_request_true(self):
2292         creds = self._get_creds()
2293         tgt = self.get_tgt(creds, pac_request=True)
2294         tgt = self._modify_tgt(tgt, renewable=True)
2295
2296         tgt = self._renew_tgt(tgt, expected_error=0, expect_pac=None,
2297                               expect_pac_attrs=True,
2298                               expect_pac_attrs_pac_request=True,
2299                               expect_requester_sid=True)
2300
2301         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2302
2303         pac = self.get_ticket_pac(ticket)
2304         self.assertIsNotNone(pac)
2305
2306     def test_rodc_renew_pac_request_none(self):
2307         creds = self._get_creds(replication_allowed=True,
2308                                 revealed_to_rodc=True)
2309         tgt = self.get_tgt(creds, pac_request=None)
2310         tgt = self._modify_tgt(tgt, renewable=True, from_rodc=True)
2311
2312         tgt = self._renew_tgt(tgt, expected_error=0, expect_pac=None,
2313                               expect_pac_attrs=False,
2314                               expect_requester_sid=True)
2315
2316         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2317
2318         pac = self.get_ticket_pac(ticket)
2319         self.assertIsNotNone(pac)
2320
2321     def test_rodc_renew_pac_request_false(self):
2322         creds = self._get_creds(replication_allowed=True,
2323                                 revealed_to_rodc=True)
2324         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2325         tgt = self._modify_tgt(tgt, renewable=True, from_rodc=True)
2326
2327         tgt = self._renew_tgt(tgt, expected_error=0, expect_pac=None,
2328                               expect_pac_attrs=False,
2329                               expect_requester_sid=True)
2330
2331         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2332
2333         pac = self.get_ticket_pac(ticket)
2334         self.assertIsNotNone(pac)
2335
2336     def test_rodc_renew_pac_request_true(self):
2337         creds = self._get_creds(replication_allowed=True,
2338                                 revealed_to_rodc=True)
2339         tgt = self.get_tgt(creds, pac_request=True)
2340         tgt = self._modify_tgt(tgt, renewable=True, from_rodc=True)
2341
2342         tgt = self._renew_tgt(tgt, expected_error=0, expect_pac=None,
2343                               expect_pac_attrs=False,
2344                               expect_requester_sid=True)
2345
2346         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2347
2348         pac = self.get_ticket_pac(ticket)
2349         self.assertIsNotNone(pac)
2350
2351     def test_validate_pac_request_none(self):
2352         creds = self._get_creds()
2353         tgt = self.get_tgt(creds, pac_request=None)
2354         tgt = self._modify_tgt(tgt, invalid=True)
2355
2356         tgt = self._validate_tgt(tgt, expected_error=0, expect_pac=None,
2357                                  expect_pac_attrs=True,
2358                                  expect_pac_attrs_pac_request=None,
2359                                  expect_requester_sid=True)
2360
2361         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2362
2363         pac = self.get_ticket_pac(ticket)
2364         self.assertIsNotNone(pac)
2365
2366     def test_validate_pac_request_false(self):
2367         creds = self._get_creds()
2368         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2369         tgt = self._modify_tgt(tgt, invalid=True)
2370
2371         tgt = self._validate_tgt(tgt, expected_error=0, expect_pac=None,
2372                                  expect_pac_attrs=True,
2373                                  expect_pac_attrs_pac_request=False,
2374                                  expect_requester_sid=True)
2375
2376         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=False)
2377
2378         pac = self.get_ticket_pac(ticket, expect_pac=False)
2379         self.assertIsNone(pac)
2380
2381     def test_validate_pac_request_true(self):
2382         creds = self._get_creds()
2383         tgt = self.get_tgt(creds, pac_request=True)
2384         tgt = self._modify_tgt(tgt, invalid=True)
2385
2386         tgt = self._validate_tgt(tgt, expected_error=0, expect_pac=None,
2387                                  expect_pac_attrs=True,
2388                                  expect_pac_attrs_pac_request=True,
2389                                  expect_requester_sid=True)
2390
2391         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2392
2393         pac = self.get_ticket_pac(ticket)
2394         self.assertIsNotNone(pac)
2395
2396     def test_rodc_validate_pac_request_none(self):
2397         creds = self._get_creds(replication_allowed=True,
2398                                 revealed_to_rodc=True)
2399         tgt = self.get_tgt(creds, pac_request=None)
2400         tgt = self._modify_tgt(tgt, invalid=True, from_rodc=True)
2401
2402         tgt = self._validate_tgt(tgt, expected_error=0, expect_pac=None,
2403                                  expect_pac_attrs=False,
2404                                  expect_requester_sid=True)
2405
2406         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2407
2408         pac = self.get_ticket_pac(ticket)
2409         self.assertIsNotNone(pac)
2410
2411     def test_rodc_validate_pac_request_false(self):
2412         creds = self._get_creds(replication_allowed=True,
2413                                 revealed_to_rodc=True)
2414         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2415         tgt = self._modify_tgt(tgt, invalid=True, from_rodc=True)
2416
2417         tgt = self._validate_tgt(tgt, expected_error=0, expect_pac=None,
2418                                  expect_pac_attrs=False,
2419                                  expect_requester_sid=True)
2420
2421         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2422
2423         pac = self.get_ticket_pac(ticket)
2424         self.assertIsNotNone(pac)
2425
2426     def test_rodc_validate_pac_request_true(self):
2427         creds = self._get_creds(replication_allowed=True,
2428                                 revealed_to_rodc=True)
2429         tgt = self.get_tgt(creds, pac_request=True)
2430         tgt = self._modify_tgt(tgt, invalid=True, from_rodc=True)
2431
2432         tgt = self._validate_tgt(tgt, expected_error=0, expect_pac=None,
2433                                  expect_pac_attrs=False,
2434                                  expect_requester_sid=True)
2435
2436         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2437
2438         pac = self.get_ticket_pac(ticket)
2439         self.assertIsNotNone(pac)
2440
2441     def test_s4u2self_pac_request_none(self):
2442         creds = self._get_creds()
2443         tgt = self.get_tgt(creds, pac_request=None)
2444
2445         ticket = self._s4u2self(tgt, creds, expected_error=0, expect_pac=True)
2446
2447         pac = self.get_ticket_pac(ticket)
2448         self.assertIsNotNone(pac)
2449
2450     def test_s4u2self_pac_request_false(self):
2451         creds = self._get_creds()
2452         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2453
2454         ticket = self._s4u2self(tgt, creds, expected_error=0, expect_pac=True)
2455
2456         pac = self.get_ticket_pac(ticket)
2457         self.assertIsNotNone(pac)
2458
2459     def test_s4u2self_pac_request_true(self):
2460         creds = self._get_creds()
2461         tgt = self.get_tgt(creds, pac_request=True)
2462
2463         ticket = self._s4u2self(tgt, creds, expected_error=0, expect_pac=True)
2464
2465         pac = self.get_ticket_pac(ticket)
2466         self.assertIsNotNone(pac)
2467
2468     def test_user2user_pac_request_none(self):
2469         creds = self._get_creds()
2470         tgt = self.get_tgt(creds, pac_request=None)
2471
2472         ticket = self._user2user(tgt, creds, expected_error=0, expect_pac=True)
2473
2474         pac = self.get_ticket_pac(ticket)
2475         self.assertIsNotNone(pac)
2476
2477     def test_user2user_pac_request_false(self):
2478         creds = self._get_creds()
2479         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2480
2481         ticket = self._user2user(tgt, creds, expected_error=0,
2482                                  expect_pac=True)
2483
2484         pac = self.get_ticket_pac(ticket, expect_pac=True)
2485         self.assertIsNotNone(pac)
2486
2487     def test_user2user_pac_request_true(self):
2488         creds = self._get_creds()
2489         tgt = self.get_tgt(creds, pac_request=True)
2490
2491         ticket = self._user2user(tgt, creds, expected_error=0, expect_pac=True)
2492
2493         pac = self.get_ticket_pac(ticket)
2494         self.assertIsNotNone(pac)
2495
2496     def test_user2user_user_pac_request_none(self):
2497         creds = self._get_creds()
2498         tgt = self.get_tgt(creds)
2499
2500         user_creds = self._get_mach_creds()
2501         user_tgt = self.get_tgt(user_creds, pac_request=None)
2502
2503         ticket = self._user2user(tgt, creds, expected_error=0,
2504                                  user_tgt=user_tgt, expect_pac=True)
2505
2506         pac = self.get_ticket_pac(ticket)
2507         self.assertIsNotNone(pac)
2508
2509     def test_user2user_user_pac_request_false(self):
2510         creds = self._get_creds()
2511         tgt = self.get_tgt(creds)
2512
2513         user_creds = self._get_mach_creds()
2514         user_tgt = self.get_tgt(user_creds, pac_request=False, expect_pac=None)
2515
2516         ticket = self._user2user(tgt, creds, expected_error=0,
2517                                  user_tgt=user_tgt, expect_pac=False)
2518
2519         pac = self.get_ticket_pac(ticket, expect_pac=False)
2520         self.assertIsNone(pac)
2521
2522     def test_user2user_user_pac_request_true(self):
2523         creds = self._get_creds()
2524         tgt = self.get_tgt(creds)
2525
2526         user_creds = self._get_mach_creds()
2527         user_tgt = self.get_tgt(user_creds, pac_request=True)
2528
2529         ticket = self._user2user(tgt, creds, expected_error=0,
2530                                  user_tgt=user_tgt, expect_pac=True)
2531
2532         pac = self.get_ticket_pac(ticket)
2533         self.assertIsNotNone(pac)
2534
2535     def test_fast_pac_request_none(self):
2536         creds = self._get_creds()
2537         tgt = self.get_tgt(creds, pac_request=None)
2538
2539         ticket = self._fast(tgt, creds, expected_error=0, expect_pac=True)
2540
2541         pac = self.get_ticket_pac(ticket)
2542         self.assertIsNotNone(pac)
2543
2544     def test_fast_pac_request_false(self):
2545         creds = self._get_creds()
2546         tgt = self.get_tgt(creds, pac_request=False)
2547
2548         ticket = self._fast(tgt, creds, expected_error=0,
2549                             expect_pac=True)
2550
2551         pac = self.get_ticket_pac(ticket, expect_pac=True)
2552         self.assertIsNotNone(pac)
2553
2554     def test_fast_pac_request_true(self):
2555         creds = self._get_creds()
2556         tgt = self.get_tgt(creds, pac_request=True)
2557
2558         ticket = self._fast(tgt, creds, expected_error=0, expect_pac=True)
2559
2560         pac = self.get_ticket_pac(ticket)
2561         self.assertIsNotNone(pac)
2562
2563     def test_tgs_rodc_pac_request_none(self):
2564         creds = self._get_creds(replication_allowed=True,
2565                                 revealed_to_rodc=True)
2566         tgt = self.get_tgt(creds, pac_request=None)
2567         tgt = self._modify_tgt(tgt, from_rodc=True)
2568
2569         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2570
2571         pac = self.get_ticket_pac(ticket)
2572         self.assertIsNotNone(pac)
2573
2574     def test_tgs_rodc_pac_request_false(self):
2575         creds = self._get_creds(replication_allowed=True,
2576                                 revealed_to_rodc=True)
2577         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2578         tgt = self._modify_tgt(tgt, from_rodc=True)
2579
2580         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2581
2582         pac = self.get_ticket_pac(ticket)
2583         self.assertIsNotNone(pac)
2584
2585     def test_tgs_rodc_pac_request_true(self):
2586         creds = self._get_creds(replication_allowed=True,
2587                                 revealed_to_rodc=True)
2588         tgt = self.get_tgt(creds, pac_request=True)
2589         tgt = self._modify_tgt(tgt, from_rodc=True)
2590
2591         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2592
2593         pac = self.get_ticket_pac(ticket)
2594         self.assertIsNotNone(pac)
2595
2596     def test_tgs_rename(self):
2597         creds = self.get_cached_creds(account_type=self.AccountType.USER,
2598                                       use_cache=False)
2599         tgt = self.get_tgt(creds)
2600
2601         # Rename the account.
2602         new_name = self.get_new_username()
2603
2604         samdb = self.get_samdb()
2605         msg = ldb.Message(creds.get_dn())
2606         msg['sAMAccountName'] = ldb.MessageElement(new_name,
2607                                                    ldb.FLAG_MOD_REPLACE,
2608                                                    'sAMAccountName')
2609         samdb.modify(msg)
2610
2611         self._run_tgs(tgt, expected_error=(KDC_ERR_TGT_REVOKED,
2612                                            KDC_ERR_C_PRINCIPAL_UNKNOWN))
2613
2614     # Test making a TGS request for a ticket expiring post-2038.
2615     def test_tgs_req_future_till(self):
2616         creds = self._get_creds()
2617         tgt = self._get_tgt(creds)
2618
2619         target_creds = self.get_service_creds()
2620         self._tgs_req(
2621             tgt=tgt,
2622             expected_error=0,
2623             target_creds=target_creds,
2624             till='99990913024805Z')
2625
2626     def _modify_renewable(self, enc_part):
2627         # Set the renewable flag.
2628         enc_part = self.modify_ticket_flag(enc_part, 'renewable', value=True)
2629
2630         # Set the renew-till time to be in the future.
2631         renew_till = self.get_KerberosTime(offset=100 * 60 * 60)
2632         enc_part['renew-till'] = renew_till
2633
2634         return enc_part
2635
2636     def _modify_invalid(self, enc_part):
2637         # Set the invalid flag.
2638         enc_part = self.modify_ticket_flag(enc_part, 'invalid', value=True)
2639
2640         # Set the ticket start time to be in the past.
2641         past_time = self.get_KerberosTime(offset=-100 * 60 * 60)
2642         enc_part['starttime'] = past_time
2643
2644         return enc_part
2645
2646     def _get_tgt(self,
2647                  client_creds,
2648                  renewable=False,
2649                  invalid=False,
2650                  from_rodc=False,
2651                  new_rid=None,
2652                  remove_pac=False,
2653                  allow_empty_authdata=False,
2654                  can_modify_logon_info=True,
2655                  can_modify_requester_sid=True,
2656                  remove_pac_attrs=False,
2657                  remove_requester_sid=False,
2658                  etype=None,
2659                  cksum_etype=None):
2660         self.assertFalse(renewable and invalid)
2661
2662         if remove_pac:
2663             self.assertIsNone(new_rid)
2664
2665         tgt = self.get_tgt(client_creds)
2666
2667         return self._modify_tgt(
2668             tgt=tgt,
2669             renewable=renewable,
2670             invalid=invalid,
2671             from_rodc=from_rodc,
2672             new_rid=new_rid,
2673             remove_pac=remove_pac,
2674             allow_empty_authdata=allow_empty_authdata,
2675             can_modify_logon_info=can_modify_logon_info,
2676             can_modify_requester_sid=can_modify_requester_sid,
2677             remove_pac_attrs=remove_pac_attrs,
2678             remove_requester_sid=remove_requester_sid,
2679             etype=None,
2680             cksum_etype=cksum_etype)
2681
2682     def _modify_tgt(self,
2683                     tgt,
2684                     renewable=False,
2685                     invalid=False,
2686                     from_rodc=False,
2687                     new_rid=None,
2688                     remove_pac=False,
2689                     allow_empty_authdata=False,
2690                     cname=None,
2691                     crealm=None,
2692                     can_modify_logon_info=True,
2693                     can_modify_requester_sid=True,
2694                     remove_pac_attrs=False,
2695                     remove_requester_sid=False,
2696                     etype=None,
2697                     cksum_etype=None):
2698         if from_rodc:
2699             krbtgt_creds = self.get_mock_rodc_krbtgt_creds()
2700         else:
2701             krbtgt_creds = self.get_krbtgt_creds()
2702
2703         if new_rid is not None or remove_requester_sid or remove_pac_attrs:
2704             def change_sid_fn(pac):
2705                 pac_buffers = pac.buffers
2706                 for pac_buffer in pac_buffers:
2707                     if pac_buffer.type == krb5pac.PAC_TYPE_LOGON_INFO:
2708                         if new_rid is not None and can_modify_logon_info:
2709                             logon_info = pac_buffer.info.info
2710
2711                             logon_info.info3.base.rid = new_rid
2712                     elif pac_buffer.type == krb5pac.PAC_TYPE_REQUESTER_SID:
2713                         if remove_requester_sid:
2714                             pac.num_buffers -= 1
2715                             pac_buffers.remove(pac_buffer)
2716                         elif new_rid is not None and can_modify_requester_sid:
2717                             requester_sid = pac_buffer.info
2718
2719                             samdb = self.get_samdb()
2720                             domain_sid = samdb.get_domain_sid()
2721
2722                             new_sid = f'{domain_sid}-{new_rid}'
2723
2724                             requester_sid.sid = security.dom_sid(new_sid)
2725                     elif pac_buffer.type == krb5pac.PAC_TYPE_ATTRIBUTES_INFO:
2726                         if remove_pac_attrs:
2727                             pac.num_buffers -= 1
2728                             pac_buffers.remove(pac_buffer)
2729
2730                 pac.buffers = pac_buffers
2731
2732                 return pac
2733         else:
2734             change_sid_fn = None
2735
2736         krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds,
2737                                                          etype)
2738
2739         if remove_pac:
2740             checksum_keys = None
2741         else:
2742             if etype == cksum_etype:
2743                 cksum_key = krbtgt_key
2744             else:
2745                 cksum_key = self.TicketDecryptionKey_from_creds(krbtgt_creds,
2746                                                                 cksum_etype)
2747             checksum_keys = {
2748                 krb5pac.PAC_TYPE_KDC_CHECKSUM: cksum_key
2749             }
2750
2751         if renewable:
2752             flags_modify_fn = self._modify_renewable
2753         elif invalid:
2754             flags_modify_fn = self._modify_invalid
2755         else:
2756             flags_modify_fn = None
2757
2758         if cname is not None or crealm is not None:
2759             def modify_fn(enc_part):
2760                 if flags_modify_fn is not None:
2761                     enc_part = flags_modify_fn(enc_part)
2762
2763                 if cname is not None:
2764                     enc_part['cname'] = cname
2765
2766                 if crealm is not None:
2767                     enc_part['crealm'] = crealm
2768
2769                 return enc_part
2770         else:
2771             modify_fn = flags_modify_fn
2772
2773         if cname is not None:
2774             def modify_pac_fn(pac):
2775                 if change_sid_fn is not None:
2776                     pac = change_sid_fn(pac)
2777
2778                 for pac_buffer in pac.buffers:
2779                     if pac_buffer.type == krb5pac.PAC_TYPE_LOGON_NAME:
2780                         logon_info = pac_buffer.info
2781
2782                         logon_info.account_name = (
2783                             cname['name-string'][0].decode('utf-8'))
2784
2785                 return pac
2786         else:
2787             modify_pac_fn = change_sid_fn
2788
2789         return self.modified_ticket(
2790             tgt,
2791             new_ticket_key=krbtgt_key,
2792             modify_fn=modify_fn,
2793             modify_pac_fn=modify_pac_fn,
2794             exclude_pac=remove_pac,
2795             allow_empty_authdata=allow_empty_authdata,
2796             update_pac_checksums=not remove_pac,
2797             checksum_keys=checksum_keys)
2798
2799     def _remove_rodc_partial_secrets(self):
2800         samdb = self.get_samdb()
2801
2802         rodc_ctx = self.get_mock_rodc_ctx()
2803         rodc_dn = ldb.Dn(samdb, rodc_ctx.acct_dn)
2804
2805         def add_rodc_partial_secrets():
2806             msg = ldb.Message()
2807             msg.dn = rodc_dn
2808             msg['userAccountControl'] = ldb.MessageElement(
2809                 str(rodc_ctx.userAccountControl),
2810                 ldb.FLAG_MOD_REPLACE,
2811                 'userAccountControl')
2812             samdb.modify(msg)
2813
2814         self.addCleanup(add_rodc_partial_secrets)
2815
2816         uac = rodc_ctx.userAccountControl & ~dsdb.UF_PARTIAL_SECRETS_ACCOUNT
2817
2818         msg = ldb.Message()
2819         msg.dn = rodc_dn
2820         msg['userAccountControl'] = ldb.MessageElement(
2821             str(uac),
2822             ldb.FLAG_MOD_REPLACE,
2823             'userAccountControl')
2824         samdb.modify(msg)
2825
2826     def _remove_rodc_krbtgt_link(self):
2827         samdb = self.get_samdb()
2828
2829         rodc_ctx = self.get_mock_rodc_ctx()
2830         rodc_dn = ldb.Dn(samdb, rodc_ctx.acct_dn)
2831
2832         def add_rodc_krbtgt_link():
2833             msg = ldb.Message()
2834             msg.dn = rodc_dn
2835             msg['msDS-KrbTgtLink'] = ldb.MessageElement(
2836                 rodc_ctx.new_krbtgt_dn,
2837                 ldb.FLAG_MOD_ADD,
2838                 'msDS-KrbTgtLink')
2839             samdb.modify(msg)
2840
2841         self.addCleanup(add_rodc_krbtgt_link)
2842
2843         msg = ldb.Message()
2844         msg.dn = rodc_dn
2845         msg['msDS-KrbTgtLink'] = ldb.MessageElement(
2846             [],
2847             ldb.FLAG_MOD_DELETE,
2848             'msDS-KrbTgtLink')
2849         samdb.modify(msg)
2850
2851     def _get_creds(self,
2852                    replication_allowed=False,
2853                    replication_denied=False,
2854                    revealed_to_rodc=False):
2855         return self.get_cached_creds(
2856             account_type=self.AccountType.COMPUTER,
2857             opts={
2858                 'allowed_replication_mock': replication_allowed,
2859                 'denied_replication_mock': replication_denied,
2860                 'revealed_to_mock_rodc': revealed_to_rodc,
2861                 'id': 0
2862             })
2863
2864     def _get_existing_rid(self,
2865                           replication_allowed=False,
2866                           replication_denied=False,
2867                           revealed_to_rodc=False):
2868         other_creds = self.get_cached_creds(
2869             account_type=self.AccountType.COMPUTER,
2870             opts={
2871                 'allowed_replication_mock': replication_allowed,
2872                 'denied_replication_mock': replication_denied,
2873                 'revealed_to_mock_rodc': revealed_to_rodc,
2874                 'id': 1
2875             })
2876
2877         samdb = self.get_samdb()
2878
2879         other_dn = other_creds.get_dn()
2880         other_sid = self.get_objectSid(samdb, other_dn)
2881
2882         other_rid = int(other_sid.rsplit('-', 1)[1])
2883
2884         return other_rid
2885
2886     def _get_mach_creds(self):
2887         return self.get_cached_creds(
2888             account_type=self.AccountType.COMPUTER,
2889             opts={
2890                 'allowed_replication_mock': True,
2891                 'denied_replication_mock': False,
2892                 'revealed_to_mock_rodc': True,
2893                 'id': 2
2894             })
2895
2896     def _get_non_existent_rid(self):
2897         return (1 << 30) - 1
2898
2899     def _run_tgs(self, tgt, expected_error, expect_pac=True,
2900                  expect_pac_attrs=None, expect_pac_attrs_pac_request=None,
2901                  expect_requester_sid=None, expected_sid=None):
2902         target_creds = self.get_service_creds()
2903         return self._tgs_req(
2904             tgt, expected_error, target_creds,
2905             expect_pac=expect_pac,
2906             expect_pac_attrs=expect_pac_attrs,
2907             expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
2908             expect_requester_sid=expect_requester_sid,
2909             expected_sid=expected_sid)
2910
2911     # These tests fail against Windows, which does not implement ticket
2912     # renewal.
2913     def _renew_tgt(self, tgt, expected_error, expect_pac=True,
2914                    expect_pac_attrs=None, expect_pac_attrs_pac_request=None,
2915                    expect_requester_sid=None, expected_sid=None):
2916         krbtgt_creds = self.get_krbtgt_creds()
2917         kdc_options = str(krb5_asn1.KDCOptions('renew'))
2918         return self._tgs_req(
2919             tgt, expected_error, krbtgt_creds,
2920             kdc_options=kdc_options,
2921             expect_pac=expect_pac,
2922             expect_pac_attrs=expect_pac_attrs,
2923             expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
2924             expect_requester_sid=expect_requester_sid,
2925             expected_sid=expected_sid)
2926
2927     # These tests fail against Windows, which does not implement ticket
2928     # validation.
2929     def _validate_tgt(self, tgt, expected_error, expect_pac=True,
2930                       expect_pac_attrs=None,
2931                       expect_pac_attrs_pac_request=None,
2932                       expect_requester_sid=None,
2933                       expected_sid=None):
2934         krbtgt_creds = self.get_krbtgt_creds()
2935         kdc_options = str(krb5_asn1.KDCOptions('validate'))
2936         return self._tgs_req(
2937             tgt, expected_error, krbtgt_creds,
2938             kdc_options=kdc_options,
2939             expect_pac=expect_pac,
2940             expect_pac_attrs=expect_pac_attrs,
2941             expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
2942             expect_requester_sid=expect_requester_sid,
2943             expected_sid=expected_sid)
2944
2945     def _s4u2self(self, tgt, tgt_creds, expected_error, expect_pac=True,
2946                   expect_edata=False, expected_status=None):
2947         user_creds = self._get_mach_creds()
2948
2949         user_name = user_creds.get_username()
2950         user_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
2951                                                names=[user_name])
2952         user_realm = user_creds.get_realm()
2953
2954         def generate_s4u2self_padata(_kdc_exchange_dict,
2955                                      _callback_dict,
2956                                      req_body):
2957             padata = self.PA_S4U2Self_create(
2958                 name=user_cname,
2959                 realm=user_realm,
2960                 tgt_session_key=tgt.session_key,
2961                 ctype=None)
2962
2963             return [padata], req_body
2964
2965         return self._tgs_req(tgt, expected_error, tgt_creds,
2966                              expected_cname=user_cname,
2967                              generate_padata_fn=generate_s4u2self_padata,
2968                              expect_edata=expect_edata,
2969                              expected_status=expected_status,
2970                              expect_pac=expect_pac)
2971
2972     def _user2user(self, tgt, tgt_creds, expected_error, sname=None,
2973                    srealm=None, user_tgt=None, expect_pac=True):
2974         if user_tgt is None:
2975             user_creds = self._get_mach_creds()
2976             user_tgt = self.get_tgt(user_creds)
2977
2978         kdc_options = str(krb5_asn1.KDCOptions('enc-tkt-in-skey'))
2979         return self._tgs_req(user_tgt, expected_error, tgt_creds,
2980                              kdc_options=kdc_options,
2981                              additional_ticket=tgt,
2982                              sname=sname,
2983                              srealm=srealm,
2984                              expect_pac=expect_pac)
2985
2986     def _fast(self, armor_tgt, armor_tgt_creds, expected_error,
2987               expected_sname=None, expect_pac=True):
2988         user_creds = self._get_mach_creds()
2989         user_tgt = self.get_tgt(user_creds)
2990
2991         target_creds = self.get_service_creds()
2992
2993         return self._tgs_req(user_tgt, expected_error, target_creds,
2994                              armor_tgt=armor_tgt,
2995                              expected_sname=expected_sname,
2996                              expect_pac=expect_pac)
2997
2998
2999 if __name__ == "__main__":
3000     global_asn1_print = False
3001     global_hexdump = False
3002     import unittest
3003     unittest.main()