tests/krb5: Be less particular about getting NTSTATUS codes for KDC TGS tests
[samba.git] / python / samba / tests / krb5 / kdc_tgs_tests.py
1 #!/usr/bin/env python3
2 # Unix SMB/CIFS implementation.
3 # Copyright (C) Stefan Metzmacher 2020
4 # Copyright (C) 2020 Catalyst.Net Ltd
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 #
19
20 import sys
21 import os
22
23 sys.path.insert(0, "bin/python")
24 os.environ["PYTHONUNBUFFERED"] = "1"
25
26 import ldb
27
28 from samba import dsdb, ntstatus
29
30 from samba.dcerpc import krb5pac, security
31
32
33 import samba.tests.krb5.kcrypto as kcrypto
34 from samba.tests.krb5.kdc_base_test import KDCBaseTest
35 from samba.tests.krb5.raw_testcase import Krb5EncryptionKey
36 from samba.tests.krb5.rfc4120_constants import (
37     AES256_CTS_HMAC_SHA1_96,
38     ARCFOUR_HMAC_MD5,
39     KRB_ERROR,
40     KDC_ERR_BADKEYVER,
41     KDC_ERR_BADMATCH,
42     KDC_ERR_ETYPE_NOSUPP,
43     KDC_ERR_GENERIC,
44     KDC_ERR_MODIFIED,
45     KDC_ERR_NOT_US,
46     KDC_ERR_POLICY,
47     KDC_ERR_PREAUTH_REQUIRED,
48     KDC_ERR_C_PRINCIPAL_UNKNOWN,
49     KDC_ERR_S_PRINCIPAL_UNKNOWN,
50     KDC_ERR_TGT_REVOKED,
51     KRB_ERR_TKT_NYV,
52     KDC_ERR_WRONG_REALM,
53     NT_ENTERPRISE_PRINCIPAL,
54     NT_PRINCIPAL,
55     NT_SRV_INST,
56 )
57 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
58
59 global_asn1_print = False
60 global_hexdump = False
61
62
63 class KdcTgsBaseTests(KDCBaseTest):
64     def _as_req(self,
65                 creds,
66                 expected_error,
67                 target_creds,
68                 etype,
69                 expected_ticket_etype=None):
70         user_name = creds.get_username()
71         cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
72                                           names=user_name.split('/'))
73
74         target_name = target_creds.get_username()
75         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
76                                           names=['host', target_name[:-1]])
77
78         if expected_error:
79             expected_sname = sname
80         else:
81             expected_sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
82                                                        names=[target_name])
83
84         realm = creds.get_realm()
85         salt = creds.get_salt()
86
87         till = self.get_KerberosTime(offset=36000)
88
89         ticket_decryption_key = (
90             self.TicketDecryptionKey_from_creds(target_creds,
91                                                 etype=expected_ticket_etype))
92         expected_etypes = target_creds.tgs_supported_enctypes
93
94         kdc_options = ('forwardable,'
95                        'renewable,'
96                        'canonicalize,'
97                        'renewable-ok')
98         kdc_options = krb5_asn1.KDCOptions(kdc_options)
99
100         if expected_error:
101             initial_error = (KDC_ERR_PREAUTH_REQUIRED, expected_error)
102         else:
103             initial_error = KDC_ERR_PREAUTH_REQUIRED
104
105         rep, kdc_exchange_dict = self._test_as_exchange(
106             creds=creds,
107             cname=cname,
108             realm=realm,
109             sname=sname,
110             till=till,
111             expected_error_mode=initial_error,
112             expected_crealm=realm,
113             expected_cname=cname,
114             expected_srealm=realm,
115             expected_sname=sname,
116             expected_salt=salt,
117             expected_supported_etypes=expected_etypes,
118             etypes=etype,
119             padata=None,
120             kdc_options=kdc_options,
121             preauth_key=None,
122             ticket_decryption_key=ticket_decryption_key)
123         self.assertIsNotNone(rep)
124         self.assertEqual(KRB_ERROR, rep['msg-type'])
125         error_code = rep['error-code']
126         if expected_error:
127             self.assertIn(error_code, initial_error)
128             if error_code == expected_error:
129                 return
130         else:
131             self.assertEqual(initial_error, error_code)
132
133         etype_info2 = kdc_exchange_dict['preauth_etype_info2']
134
135         preauth_key = self.PasswordKey_from_etype_info2(creds,
136                                                         etype_info2[0],
137                                                         creds.get_kvno())
138
139         ts_enc_padata = self.get_enc_timestamp_pa_data_from_key(preauth_key)
140
141         padata = [ts_enc_padata]
142
143         expected_realm = realm.upper()
144
145         rep, kdc_exchange_dict = self._test_as_exchange(
146             creds=creds,
147             cname=cname,
148             realm=realm,
149             sname=sname,
150             till=till,
151             expected_error_mode=expected_error,
152             expected_crealm=expected_realm,
153             expected_cname=cname,
154             expected_srealm=expected_realm,
155             expected_sname=expected_sname,
156             expected_salt=salt,
157             expected_supported_etypes=expected_etypes,
158             etypes=etype,
159             padata=padata,
160             kdc_options=kdc_options,
161             preauth_key=preauth_key,
162             ticket_decryption_key=ticket_decryption_key,
163             expect_edata=False)
164         if expected_error:
165             self.check_error_rep(rep, expected_error)
166             return None
167
168         self.check_as_reply(rep)
169         return kdc_exchange_dict['rep_ticket_creds']
170
171     def _tgs_req(self, tgt, expected_error, creds, target_creds, *,
172                  armor_tgt=None,
173                  kdc_options='0',
174                  pac_options=None,
175                  expected_cname=None,
176                  expected_sname=None,
177                  expected_account_name=None,
178                  additional_ticket=None,
179                  decryption_key=None,
180                  generate_padata_fn=None,
181                  generate_fast_padata_fn=None,
182                  sname=None,
183                  srealm=None,
184                  till=None,
185                  etypes=None,
186                  expected_ticket_etype=None,
187                  expected_supported_etypes=None,
188                  expect_pac=True,
189                  expect_pac_attrs=None,
190                  expect_pac_attrs_pac_request=None,
191                  expect_requester_sid=None,
192                  expect_edata=False,
193                  expected_sid=None,
194                  expect_status=None,
195                  expected_status=None,
196                  expected_proxy_target=None,
197                  expected_transited_services=None,
198                  check_patypes=True):
199         if srealm is False:
200             srealm = None
201         elif srealm is None:
202             srealm = target_creds.get_realm()
203
204         if sname is False:
205             sname = None
206             if expected_sname is None:
207                 expected_sname = self.get_krbtgt_sname()
208         else:
209             if sname is None:
210                 target_name = target_creds.get_username()
211                 if target_name == 'krbtgt':
212                     sname = self.PrincipalName_create(
213                         name_type=NT_SRV_INST,
214                         names=[target_name, srealm])
215                 else:
216                     if target_name[-1] == '$':
217                         target_name = target_name[:-1]
218                     sname = self.PrincipalName_create(
219                         name_type=NT_PRINCIPAL,
220                         names=['host', target_name])
221
222             if expected_sname is None:
223                 expected_sname = sname
224
225         if additional_ticket is not None:
226             additional_tickets = [additional_ticket.ticket]
227             if decryption_key is None:
228                 decryption_key = additional_ticket.session_key
229         else:
230             additional_tickets = None
231             if decryption_key is None:
232                 decryption_key = self.TicketDecryptionKey_from_creds(
233                     target_creds, etype=expected_ticket_etype)
234
235         subkey = self.RandomKey(tgt.session_key.etype)
236
237         if armor_tgt is not None:
238             armor_subkey = self.RandomKey(subkey.etype)
239             explicit_armor_key = self.generate_armor_key(armor_subkey,
240                                                          armor_tgt.session_key)
241             armor_key = kcrypto.cf2(explicit_armor_key.key,
242                                     subkey.key,
243                                     b'explicitarmor',
244                                     b'tgsarmor')
245             armor_key = Krb5EncryptionKey(armor_key, None)
246
247             generate_fast_fn = self.generate_simple_fast
248             generate_fast_armor_fn = self.generate_ap_req
249
250             if pac_options is None:
251                 pac_options = '1'  # claims support
252         else:
253             armor_subkey = None
254             armor_key = None
255             generate_fast_fn = None
256             generate_fast_armor_fn = None
257
258         if etypes is None:
259             etypes = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
260
261         if expected_error:
262             check_error_fn = self.generic_check_kdc_error
263             check_rep_fn = None
264         else:
265             check_error_fn = None
266             check_rep_fn = self.generic_check_kdc_rep
267
268         if expected_cname is None:
269             expected_cname = tgt.cname
270
271         kdc_exchange_dict = self.tgs_exchange_dict(
272             creds=creds,
273             expected_crealm=tgt.crealm,
274             expected_cname=expected_cname,
275             expected_srealm=srealm,
276             expected_sname=expected_sname,
277             expected_account_name=expected_account_name,
278             ticket_decryption_key=decryption_key,
279             generate_padata_fn=generate_padata_fn,
280             generate_fast_padata_fn=generate_fast_padata_fn,
281             generate_fast_fn=generate_fast_fn,
282             generate_fast_armor_fn=generate_fast_armor_fn,
283             check_error_fn=check_error_fn,
284             check_rep_fn=check_rep_fn,
285             check_kdc_private_fn=self.generic_check_kdc_private,
286             expected_error_mode=expected_error,
287             expect_status=expect_status,
288             expected_status=expected_status,
289             tgt=tgt,
290             armor_key=armor_key,
291             armor_tgt=armor_tgt,
292             armor_subkey=armor_subkey,
293             pac_options=pac_options,
294             authenticator_subkey=subkey,
295             kdc_options=kdc_options,
296             expected_supported_etypes=expected_supported_etypes,
297             expect_edata=expect_edata,
298             expect_pac=expect_pac,
299             expect_pac_attrs=expect_pac_attrs,
300             expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
301             expect_requester_sid=expect_requester_sid,
302             expected_sid=expected_sid,
303             expected_proxy_target=expected_proxy_target,
304             expected_transited_services=expected_transited_services,
305             check_patypes=check_patypes)
306
307         rep = self._generic_kdc_exchange(kdc_exchange_dict,
308                                          cname=None,
309                                          realm=srealm,
310                                          sname=sname,
311                                          till_time=till,
312                                          etypes=etypes,
313                                          additional_tickets=additional_tickets)
314         if expected_error:
315             self.check_error_rep(rep, expected_error)
316             return None
317         else:
318             self.check_tgs_reply(rep)
319             return kdc_exchange_dict['rep_ticket_creds']
320
321
322 class KdcTgsTests(KdcTgsBaseTests):
323
324     def setUp(self):
325         super().setUp()
326         self.do_asn1_print = global_asn1_print
327         self.do_hexdump = global_hexdump
328
329     def test_tgs_req_cname_does_not_not_match_authenticator_cname(self):
330         ''' Try and obtain a ticket from the TGS, but supply a cname
331             that differs from that provided to the krbtgt
332         '''
333         # Create the user account
334         samdb = self.get_samdb()
335         user_name = "tsttktusr"
336         (uc, _) = self.create_account(samdb, user_name)
337         realm = uc.get_realm().lower()
338
339         # Do the initial AS-REQ, should get a pre-authentication required
340         # response
341         etype = (AES256_CTS_HMAC_SHA1_96,)
342         cname = self.PrincipalName_create(
343             name_type=NT_PRINCIPAL, names=[user_name])
344         sname = self.PrincipalName_create(
345             name_type=NT_SRV_INST, names=["krbtgt", realm])
346
347         rep = self.as_req(cname, sname, realm, etype)
348         self.check_pre_authentication(rep)
349
350         # Do the next AS-REQ
351         padata = self.get_enc_timestamp_pa_data(uc, rep)
352         key = self.get_as_rep_key(uc, rep)
353         rep = self.as_req(cname, sname, realm, etype, padata=[padata])
354         self.check_as_reply(rep)
355
356         # Request a service ticket, but use a cname that does not match
357         # that in the original AS-REQ
358         enc_part2 = self.get_as_rep_enc_data(key, rep)
359         key = self.EncryptionKey_import(enc_part2['key'])
360         ticket = rep['ticket']
361
362         cname = self.PrincipalName_create(
363             name_type=NT_PRINCIPAL,
364             names=["Administrator"])
365         sname = self.PrincipalName_create(
366             name_type=NT_PRINCIPAL,
367             names=["host", samdb.host_dns_name()])
368
369         (rep, enc_part) = self.tgs_req(cname, sname, realm, ticket, key, etype,
370                                        creds=uc,
371                                        expected_error_mode=KDC_ERR_BADMATCH,
372                                        expect_edata=False)
373
374         self.assertIsNone(
375             enc_part,
376             "rep = {%s}, enc_part = {%s}" % (rep, enc_part))
377         self.assertEqual(KRB_ERROR, rep['msg-type'], "rep = {%s}" % rep)
378         self.assertEqual(
379             KDC_ERR_BADMATCH,
380             rep['error-code'],
381             "rep = {%s}" % rep)
382
383     def test_ldap_service_ticket(self):
384         '''Get a ticket to the ldap service
385         '''
386         # Create the user account
387         samdb = self.get_samdb()
388         user_name = "tsttktusr"
389         (uc, _) = self.create_account(samdb, user_name)
390         realm = uc.get_realm().lower()
391
392         # Do the initial AS-REQ, should get a pre-authentication required
393         # response
394         etype = (AES256_CTS_HMAC_SHA1_96,)
395         cname = self.PrincipalName_create(
396             name_type=NT_PRINCIPAL, names=[user_name])
397         sname = self.PrincipalName_create(
398             name_type=NT_SRV_INST, names=["krbtgt", realm])
399
400         rep = self.as_req(cname, sname, realm, etype)
401         self.check_pre_authentication(rep)
402
403         # Do the next AS-REQ
404         padata = self.get_enc_timestamp_pa_data(uc, rep)
405         key = self.get_as_rep_key(uc, rep)
406         rep = self.as_req(cname, sname, realm, etype, padata=[padata])
407         self.check_as_reply(rep)
408
409         enc_part2 = self.get_as_rep_enc_data(key, rep)
410         key = self.EncryptionKey_import(enc_part2['key'])
411         ticket = rep['ticket']
412
413         # Request a ticket to the ldap service
414         sname = self.PrincipalName_create(
415             name_type=NT_SRV_INST,
416             names=["ldap", samdb.host_dns_name()])
417
418         (rep, _) = self.tgs_req(
419             cname, sname, uc.get_realm(), ticket, key, etype,
420             service_creds=self.get_dc_creds())
421
422         self.check_tgs_reply(rep)
423
424     def test_get_ticket_for_host_service_of_machine_account(self):
425
426         # Create a user and machine account for the test.
427         #
428         samdb = self.get_samdb()
429         user_name = "tsttktusr"
430         (uc, dn) = self.create_account(samdb, user_name)
431         (mc, _) = self.create_account(samdb, "tsttktmac",
432                                       account_type=self.AccountType.COMPUTER)
433         realm = uc.get_realm().lower()
434
435         # Do the initial AS-REQ, should get a pre-authentication required
436         # response
437         etype = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
438         cname = self.PrincipalName_create(
439             name_type=NT_PRINCIPAL, names=[user_name])
440         sname = self.PrincipalName_create(
441             name_type=NT_SRV_INST, names=["krbtgt", realm])
442
443         rep = self.as_req(cname, sname, realm, etype)
444         self.check_pre_authentication(rep)
445
446         # Do the next AS-REQ
447         padata = self.get_enc_timestamp_pa_data(uc, rep)
448         key = self.get_as_rep_key(uc, rep)
449         rep = self.as_req(cname, sname, realm, etype, padata=[padata])
450         self.check_as_reply(rep)
451
452         # Request a ticket to the host service on the machine account
453         ticket = rep['ticket']
454         enc_part2 = self.get_as_rep_enc_data(key, rep)
455         key = self.EncryptionKey_import(enc_part2['key'])
456         cname = self.PrincipalName_create(
457             name_type=NT_PRINCIPAL,
458             names=[user_name])
459         sname = self.PrincipalName_create(
460             name_type=NT_PRINCIPAL,
461             names=[mc.get_username()])
462
463         (rep, enc_part) = self.tgs_req(
464             cname, sname, uc.get_realm(), ticket, key, etype,
465             service_creds=mc)
466         self.check_tgs_reply(rep)
467
468         # Check the contents of the service ticket
469         ticket = rep['ticket']
470         enc_part = self.decode_service_ticket(mc, ticket)
471
472         pac_data = self.get_pac_data(enc_part['authorization-data'])
473         sid = self.get_objectSid(samdb, dn)
474         upn = "%s@%s" % (uc.get_username(), realm)
475         self.assertEqual(
476             uc.get_username(),
477             str(pac_data.account_name),
478             "rep = {%s},%s" % (rep, pac_data))
479         self.assertEqual(
480             uc.get_username(),
481             pac_data.logon_name,
482             "rep = {%s},%s" % (rep, pac_data))
483         self.assertEqual(
484             uc.get_realm(),
485             pac_data.domain_name,
486             "rep = {%s},%s" % (rep, pac_data))
487         self.assertEqual(
488             upn,
489             pac_data.upn,
490             "rep = {%s},%s" % (rep, pac_data))
491         self.assertEqual(
492             sid,
493             pac_data.account_sid,
494             "rep = {%s},%s" % (rep, pac_data))
495
496     def test_request(self):
497         client_creds = self.get_client_creds()
498         service_creds = self.get_service_creds()
499
500         tgt = self.get_tgt(client_creds)
501
502         pac = self.get_ticket_pac(tgt)
503         self.assertIsNotNone(pac)
504
505         ticket = self._make_tgs_request(client_creds, service_creds, tgt)
506
507         pac = self.get_ticket_pac(ticket)
508         self.assertIsNotNone(pac)
509
510     def test_request_no_pac(self):
511         client_creds = self.get_client_creds()
512         service_creds = self.get_service_creds()
513
514         tgt = self.get_tgt(client_creds, pac_request=False)
515
516         pac = self.get_ticket_pac(tgt)
517         self.assertIsNotNone(pac)
518
519         ticket = self._make_tgs_request(client_creds, service_creds, tgt,
520                                         pac_request=False, expect_pac=False)
521
522         pac = self.get_ticket_pac(ticket, expect_pac=False)
523         self.assertIsNone(pac)
524
525     def test_request_enterprise_canon(self):
526         upn = self.get_new_username()
527         client_creds = self.get_cached_creds(
528             account_type=self.AccountType.USER,
529             opts={'upn': upn})
530         service_creds = self.get_service_creds()
531
532         user_name = client_creds.get_username()
533         realm = client_creds.get_realm()
534         client_account = f'{user_name}@{realm}'
535
536         expected_cname = self.PrincipalName_create(
537             name_type=NT_PRINCIPAL,
538             names=[user_name])
539
540         kdc_options = 'canonicalize'
541
542         tgt = self.get_tgt(client_creds,
543                            client_account=client_account,
544                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
545                            expected_cname=expected_cname,
546                            expected_account_name=user_name,
547                            kdc_options=kdc_options)
548
549         self._make_tgs_request(
550             client_creds, service_creds, tgt,
551             client_account=client_account,
552             client_name_type=NT_ENTERPRISE_PRINCIPAL,
553             expected_cname=expected_cname,
554             expected_account_name=user_name,
555             kdc_options=kdc_options)
556
557     def test_request_enterprise_canon_case(self):
558         upn = self.get_new_username()
559         client_creds = self.get_cached_creds(
560             account_type=self.AccountType.USER,
561             opts={'upn': upn})
562         service_creds = self.get_service_creds()
563
564         user_name = client_creds.get_username()
565         realm = client_creds.get_realm().lower()
566         client_account = f'{user_name}@{realm}'
567
568         expected_cname = self.PrincipalName_create(
569             name_type=NT_PRINCIPAL,
570             names=[user_name])
571
572         kdc_options = 'canonicalize'
573
574         tgt = self.get_tgt(client_creds,
575                            client_account=client_account,
576                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
577                            expected_cname=expected_cname,
578                            expected_account_name=user_name,
579                            kdc_options=kdc_options)
580
581         self._make_tgs_request(
582             client_creds, service_creds, tgt,
583             client_account=client_account,
584             client_name_type=NT_ENTERPRISE_PRINCIPAL,
585             expected_cname=expected_cname,
586             expected_account_name=user_name,
587             kdc_options=kdc_options)
588
589     def test_request_enterprise_canon_mac(self):
590         upn = self.get_new_username()
591         client_creds = self.get_cached_creds(
592             account_type=self.AccountType.COMPUTER,
593             opts={'upn': upn})
594         service_creds = self.get_service_creds()
595
596         user_name = client_creds.get_username()
597         realm = client_creds.get_realm()
598         client_account = f'{user_name}@{realm}'
599
600         expected_cname = self.PrincipalName_create(
601             name_type=NT_PRINCIPAL,
602             names=[user_name])
603
604         kdc_options = 'canonicalize'
605
606         tgt = self.get_tgt(client_creds,
607                            client_account=client_account,
608                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
609                            expected_cname=expected_cname,
610                            expected_account_name=user_name,
611                            kdc_options=kdc_options)
612
613         self._make_tgs_request(
614             client_creds, service_creds, tgt,
615             client_account=client_account,
616             client_name_type=NT_ENTERPRISE_PRINCIPAL,
617             expected_cname=expected_cname,
618             expected_account_name=user_name,
619             kdc_options=kdc_options)
620
621     def test_request_enterprise_canon_case_mac(self):
622         upn = self.get_new_username()
623         client_creds = self.get_cached_creds(
624             account_type=self.AccountType.COMPUTER,
625             opts={'upn': upn})
626         service_creds = self.get_service_creds()
627
628         user_name = client_creds.get_username()
629         realm = client_creds.get_realm().lower()
630         client_account = f'{user_name}@{realm}'
631
632         expected_cname = self.PrincipalName_create(
633             name_type=NT_PRINCIPAL,
634             names=[user_name])
635
636         kdc_options = 'canonicalize'
637
638         tgt = self.get_tgt(client_creds,
639                            client_account=client_account,
640                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
641                            expected_cname=expected_cname,
642                            expected_account_name=user_name,
643                            kdc_options=kdc_options)
644
645         self._make_tgs_request(
646             client_creds, service_creds, tgt,
647             client_account=client_account,
648             client_name_type=NT_ENTERPRISE_PRINCIPAL,
649             expected_cname=expected_cname,
650             expected_account_name=user_name,
651             kdc_options=kdc_options)
652
653     def test_request_enterprise_no_canon(self):
654         upn = self.get_new_username()
655         client_creds = self.get_cached_creds(
656             account_type=self.AccountType.USER,
657             opts={'upn': upn})
658         service_creds = self.get_service_creds()
659
660         user_name = client_creds.get_username()
661         realm = client_creds.get_realm()
662         client_account = f'{user_name}@{realm}'
663
664         kdc_options = '0'
665
666         tgt = self.get_tgt(client_creds,
667                            client_account=client_account,
668                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
669                            expected_account_name=user_name,
670                            kdc_options=kdc_options)
671
672         self._make_tgs_request(
673             client_creds, service_creds, tgt,
674             client_account=client_account,
675             client_name_type=NT_ENTERPRISE_PRINCIPAL,
676             expected_account_name=user_name,
677             kdc_options=kdc_options)
678
679     def test_request_enterprise_no_canon_case(self):
680         upn = self.get_new_username()
681         client_creds = self.get_cached_creds(
682             account_type=self.AccountType.USER,
683             opts={'upn': upn})
684         service_creds = self.get_service_creds()
685
686         user_name = client_creds.get_username()
687         realm = client_creds.get_realm().lower()
688         client_account = f'{user_name}@{realm}'
689
690         kdc_options = '0'
691
692         tgt = self.get_tgt(client_creds,
693                            client_account=client_account,
694                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
695                            expected_account_name=user_name,
696                            kdc_options=kdc_options)
697
698         self._make_tgs_request(
699             client_creds, service_creds, tgt,
700             client_account=client_account,
701             client_name_type=NT_ENTERPRISE_PRINCIPAL,
702             expected_account_name=user_name,
703             kdc_options=kdc_options)
704
705     def test_request_enterprise_no_canon_mac(self):
706         upn = self.get_new_username()
707         client_creds = self.get_cached_creds(
708             account_type=self.AccountType.COMPUTER,
709             opts={'upn': upn})
710         service_creds = self.get_service_creds()
711
712         user_name = client_creds.get_username()
713         realm = client_creds.get_realm()
714         client_account = f'{user_name}@{realm}'
715
716         kdc_options = '0'
717
718         tgt = self.get_tgt(client_creds,
719                            client_account=client_account,
720                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
721                            expected_account_name=user_name,
722                            kdc_options=kdc_options)
723
724         self._make_tgs_request(
725             client_creds, service_creds, tgt,
726             client_account=client_account,
727             client_name_type=NT_ENTERPRISE_PRINCIPAL,
728             expected_account_name=user_name,
729             kdc_options=kdc_options)
730
731     def test_request_enterprise_no_canon_case_mac(self):
732         upn = self.get_new_username()
733         client_creds = self.get_cached_creds(
734             account_type=self.AccountType.COMPUTER,
735             opts={'upn': upn})
736         service_creds = self.get_service_creds()
737
738         user_name = client_creds.get_username()
739         realm = client_creds.get_realm().lower()
740         client_account = f'{user_name}@{realm}'
741
742         kdc_options = '0'
743
744         tgt = self.get_tgt(client_creds,
745                            client_account=client_account,
746                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
747                            expected_account_name=user_name,
748                            kdc_options=kdc_options)
749
750         self._make_tgs_request(
751             client_creds, service_creds, tgt,
752             client_account=client_account,
753             client_name_type=NT_ENTERPRISE_PRINCIPAL,
754             expected_account_name=user_name,
755             kdc_options=kdc_options)
756
757     def test_client_no_auth_data_required(self):
758         client_creds = self.get_cached_creds(
759             account_type=self.AccountType.USER,
760             opts={'no_auth_data_required': True})
761         service_creds = self.get_service_creds()
762
763         tgt = self.get_tgt(client_creds)
764
765         pac = self.get_ticket_pac(tgt)
766         self.assertIsNotNone(pac)
767
768         ticket = self._make_tgs_request(client_creds, service_creds, tgt)
769
770         pac = self.get_ticket_pac(ticket)
771         self.assertIsNotNone(pac)
772
773     def test_no_pac_client_no_auth_data_required(self):
774         client_creds = self.get_cached_creds(
775             account_type=self.AccountType.USER,
776             opts={'no_auth_data_required': True})
777         service_creds = self.get_service_creds()
778
779         tgt = self.get_tgt(client_creds)
780
781         pac = self.get_ticket_pac(tgt)
782         self.assertIsNotNone(pac)
783
784         ticket = self._make_tgs_request(client_creds, service_creds, tgt,
785                                         pac_request=False, expect_pac=True)
786
787         pac = self.get_ticket_pac(ticket)
788         self.assertIsNotNone(pac)
789
790     def test_service_no_auth_data_required(self):
791         client_creds = self.get_client_creds()
792         service_creds = self.get_cached_creds(
793             account_type=self.AccountType.COMPUTER,
794             opts={'no_auth_data_required': True})
795
796         tgt = self.get_tgt(client_creds)
797
798         pac = self.get_ticket_pac(tgt)
799         self.assertIsNotNone(pac)
800
801         ticket = self._make_tgs_request(client_creds, service_creds, tgt,
802                                         expect_pac=False)
803
804         pac = self.get_ticket_pac(ticket, expect_pac=False)
805         self.assertIsNone(pac)
806
807     def test_no_pac_service_no_auth_data_required(self):
808         client_creds = self.get_client_creds()
809         service_creds = self.get_cached_creds(
810             account_type=self.AccountType.COMPUTER,
811             opts={'no_auth_data_required': True})
812
813         tgt = self.get_tgt(client_creds, pac_request=False)
814
815         pac = self.get_ticket_pac(tgt)
816         self.assertIsNotNone(pac)
817
818         ticket = self._make_tgs_request(client_creds, service_creds, tgt,
819                                         pac_request=False, expect_pac=False)
820
821         pac = self.get_ticket_pac(ticket, expect_pac=False)
822         self.assertIsNone(pac)
823
824     def test_remove_pac_service_no_auth_data_required(self):
825         client_creds = self.get_client_creds()
826         service_creds = self.get_cached_creds(
827             account_type=self.AccountType.COMPUTER,
828             opts={'no_auth_data_required': True})
829
830         tgt = self.modified_ticket(self.get_tgt(client_creds),
831                                    exclude_pac=True)
832
833         pac = self.get_ticket_pac(tgt, expect_pac=False)
834         self.assertIsNone(pac)
835
836         self._make_tgs_request(client_creds, service_creds, tgt,
837                                expect_error=True)
838
839     def test_remove_pac_client_no_auth_data_required(self):
840         client_creds = self.get_cached_creds(
841             account_type=self.AccountType.USER,
842             opts={'no_auth_data_required': True})
843         service_creds = self.get_service_creds()
844
845         tgt = self.modified_ticket(self.get_tgt(client_creds),
846                                    exclude_pac=True)
847
848         pac = self.get_ticket_pac(tgt, expect_pac=False)
849         self.assertIsNone(pac)
850
851         self._make_tgs_request(client_creds, service_creds, tgt,
852                                expect_error=True)
853
854     def test_remove_pac(self):
855         client_creds = self.get_client_creds()
856         service_creds = self.get_service_creds()
857
858         tgt = self.modified_ticket(self.get_tgt(client_creds),
859                                    exclude_pac=True)
860
861         pac = self.get_ticket_pac(tgt, expect_pac=False)
862         self.assertIsNone(pac)
863
864         self._make_tgs_request(client_creds, service_creds, tgt,
865                                expect_error=True)
866
867     def test_upn_dns_info_ex_user(self):
868         client_creds = self.get_client_creds()
869         self._run_upn_dns_info_ex_test(client_creds)
870
871     def test_upn_dns_info_ex_mac(self):
872         mach_creds = self.get_mach_creds()
873         self._run_upn_dns_info_ex_test(mach_creds)
874
875     def test_upn_dns_info_ex_upn_user(self):
876         client_creds = self.get_cached_creds(
877             account_type=self.AccountType.USER,
878             opts={'upn': 'upn_dns_info_test_upn0@bar'})
879         self._run_upn_dns_info_ex_test(client_creds)
880
881     def test_upn_dns_info_ex_upn_mac(self):
882         mach_creds = self.get_cached_creds(
883             account_type=self.AccountType.COMPUTER,
884             opts={'upn': 'upn_dns_info_test_upn1@bar'})
885         self._run_upn_dns_info_ex_test(mach_creds)
886
887     def _run_upn_dns_info_ex_test(self, client_creds):
888         service_creds = self.get_service_creds()
889
890         samdb = self.get_samdb()
891         dn = client_creds.get_dn()
892
893         account_name = client_creds.get_username()
894         upn_name = client_creds.get_upn()
895         if upn_name is None:
896             realm = client_creds.get_realm().lower()
897             upn_name = f'{account_name}@{realm}'
898         sid = self.get_objectSid(samdb, dn)
899
900         tgt = self.get_tgt(client_creds,
901                            expected_account_name=account_name,
902                            expected_upn_name=upn_name,
903                            expected_sid=sid)
904
905         self._make_tgs_request(client_creds, service_creds, tgt,
906                                expected_account_name=account_name,
907                                expected_upn_name=upn_name,
908                                expected_sid=sid)
909
910     # Test making a TGS request.
911     def test_tgs_req(self):
912         creds = self._get_creds()
913         tgt = self._get_tgt(creds)
914         self._run_tgs(tgt, creds, expected_error=0)
915
916     def test_renew_req(self):
917         creds = self._get_creds()
918         tgt = self._get_tgt(creds, renewable=True)
919         self._renew_tgt(tgt, creds, expected_error=0,
920                         expect_pac_attrs=True,
921                         expect_pac_attrs_pac_request=True,
922                         expect_requester_sid=True)
923
924     def test_validate_req(self):
925         creds = self._get_creds()
926         tgt = self._get_tgt(creds, invalid=True)
927         self._validate_tgt(tgt, creds, expected_error=0,
928                            expect_pac_attrs=True,
929                            expect_pac_attrs_pac_request=True,
930                            expect_requester_sid=True)
931
932     def test_s4u2self_req(self):
933         creds = self._get_creds()
934         tgt = self._get_tgt(creds)
935         self._s4u2self(tgt, creds, expected_error=0)
936
937     def test_user2user_req(self):
938         creds = self._get_creds()
939         tgt = self._get_tgt(creds)
940         self._user2user(tgt, creds, expected_error=0)
941
942     def test_fast_req(self):
943         creds = self._get_creds()
944         tgt = self._get_tgt(creds)
945         self._fast(tgt, creds, expected_error=0)
946
947     def test_tgs_req_invalid(self):
948         creds = self._get_creds()
949         tgt = self._get_tgt(creds, invalid=True)
950         self._run_tgs(tgt, creds, expected_error=KRB_ERR_TKT_NYV)
951
952     def test_s4u2self_req_invalid(self):
953         creds = self._get_creds()
954         tgt = self._get_tgt(creds, invalid=True)
955         self._s4u2self(tgt, creds, expected_error=KRB_ERR_TKT_NYV)
956
957     def test_user2user_req_invalid(self):
958         creds = self._get_creds()
959         tgt = self._get_tgt(creds, invalid=True)
960         self._user2user(tgt, creds, expected_error=KRB_ERR_TKT_NYV)
961
962     def test_fast_req_invalid(self):
963         creds = self._get_creds()
964         tgt = self._get_tgt(creds, invalid=True)
965         self._fast(tgt, creds, expected_error=KRB_ERR_TKT_NYV,
966                    expected_sname=self.get_krbtgt_sname())
967
968     def test_tgs_req_no_requester_sid(self):
969         creds = self._get_creds()
970         tgt = self._get_tgt(creds, remove_requester_sid=True)
971
972         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
973
974     def test_tgs_req_no_pac_attrs(self):
975         creds = self._get_creds()
976         tgt = self._get_tgt(creds, remove_pac_attrs=True)
977
978         self._run_tgs(tgt, creds, expected_error=0, expect_pac=True,
979                       expect_pac_attrs=False)
980
981     def test_tgs_req_from_rodc_no_requester_sid(self):
982         creds = self._get_creds(replication_allowed=True,
983                                 revealed_to_rodc=True)
984         tgt = self._get_tgt(creds, from_rodc=True, remove_requester_sid=True)
985
986         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
987
988     def test_tgs_req_from_rodc_no_pac_attrs(self):
989         creds = self._get_creds(replication_allowed=True,
990                                 revealed_to_rodc=True)
991         tgt = self._get_tgt(creds, from_rodc=True, remove_pac_attrs=True)
992         self._run_tgs(tgt, creds, expected_error=0, expect_pac=True,
993                       expect_pac_attrs=False)
994
995     # Test making a request without a PAC.
996     def test_tgs_no_pac(self):
997         creds = self._get_creds()
998         tgt = self._get_tgt(creds, remove_pac=True)
999         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1000
1001     def test_renew_no_pac(self):
1002         creds = self._get_creds()
1003         tgt = self._get_tgt(creds, renewable=True, remove_pac=True)
1004         self._renew_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1005
1006     def test_validate_no_pac(self):
1007         creds = self._get_creds()
1008         tgt = self._get_tgt(creds, invalid=True, remove_pac=True)
1009         self._validate_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1010
1011     def test_s4u2self_no_pac(self):
1012         creds = self._get_creds()
1013         tgt = self._get_tgt(creds, remove_pac=True)
1014         self._s4u2self(tgt, creds,
1015                        expected_error=KDC_ERR_TGT_REVOKED,
1016                        expect_edata=False)
1017
1018     def test_user2user_no_pac(self):
1019         creds = self._get_creds()
1020         tgt = self._get_tgt(creds, remove_pac=True)
1021         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1022
1023     def test_fast_no_pac(self):
1024         creds = self._get_creds()
1025         tgt = self._get_tgt(creds, remove_pac=True)
1026         self._fast(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED,
1027                    expected_sname=self.get_krbtgt_sname())
1028
1029     # Test making a request with authdata and without a PAC.
1030     def test_tgs_authdata_no_pac(self):
1031         creds = self._get_creds()
1032         tgt = self._get_tgt(creds, remove_pac=True, allow_empty_authdata=True)
1033         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1034
1035     def test_renew_authdata_no_pac(self):
1036         creds = self._get_creds()
1037         tgt = self._get_tgt(creds, renewable=True, remove_pac=True,
1038                             allow_empty_authdata=True)
1039         self._renew_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1040
1041     def test_validate_authdata_no_pac(self):
1042         creds = self._get_creds()
1043         tgt = self._get_tgt(creds, invalid=True, remove_pac=True,
1044                             allow_empty_authdata=True)
1045         self._validate_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1046
1047     def test_s4u2self_authdata_no_pac(self):
1048         creds = self._get_creds()
1049         tgt = self._get_tgt(creds, remove_pac=True, allow_empty_authdata=True)
1050         self._s4u2self(tgt, creds,
1051                        expected_error=KDC_ERR_TGT_REVOKED,
1052                        expect_edata=False)
1053
1054     def test_user2user_authdata_no_pac(self):
1055         creds = self._get_creds()
1056         tgt = self._get_tgt(creds, remove_pac=True, allow_empty_authdata=True)
1057         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1058
1059     def test_fast_authdata_no_pac(self):
1060         creds = self._get_creds()
1061         tgt = self._get_tgt(creds, remove_pac=True, allow_empty_authdata=True)
1062         self._fast(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED,
1063                    expected_sname=self.get_krbtgt_sname())
1064
1065     # Test changing the SID in the PAC to that of another account.
1066     def test_tgs_sid_mismatch_existing(self):
1067         creds = self._get_creds()
1068         existing_rid = self._get_existing_rid()
1069         tgt = self._get_tgt(creds, new_rid=existing_rid)
1070         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1071
1072     def test_renew_sid_mismatch_existing(self):
1073         creds = self._get_creds()
1074         existing_rid = self._get_existing_rid()
1075         tgt = self._get_tgt(creds, renewable=True, new_rid=existing_rid)
1076         self._renew_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1077
1078     def test_validate_sid_mismatch_existing(self):
1079         creds = self._get_creds()
1080         existing_rid = self._get_existing_rid()
1081         tgt = self._get_tgt(creds, invalid=True, new_rid=existing_rid)
1082         self._validate_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1083
1084     def test_s4u2self_sid_mismatch_existing(self):
1085         creds = self._get_creds()
1086         existing_rid = self._get_existing_rid()
1087         tgt = self._get_tgt(creds, new_rid=existing_rid)
1088         self._s4u2self(tgt, creds,
1089                        expected_error=KDC_ERR_TGT_REVOKED)
1090
1091     def test_user2user_sid_mismatch_existing(self):
1092         creds = self._get_creds()
1093         existing_rid = self._get_existing_rid()
1094         tgt = self._get_tgt(creds, new_rid=existing_rid)
1095         self._user2user(tgt, creds,
1096                         expected_error=KDC_ERR_TGT_REVOKED)
1097
1098     def test_fast_sid_mismatch_existing(self):
1099         creds = self._get_creds()
1100         existing_rid = self._get_existing_rid()
1101         tgt = self._get_tgt(creds, new_rid=existing_rid)
1102         self._fast(tgt, creds,
1103                    expected_error=KDC_ERR_TGT_REVOKED,
1104                    expected_sname=self.get_krbtgt_sname())
1105
1106     def test_requester_sid_mismatch_existing(self):
1107         creds = self._get_creds()
1108         existing_rid = self._get_existing_rid()
1109         tgt = self._get_tgt(creds, new_rid=existing_rid,
1110                             can_modify_logon_info=False)
1111         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1112
1113     def test_logon_info_sid_mismatch_existing(self):
1114         creds = self._get_creds()
1115         existing_rid = self._get_existing_rid()
1116         tgt = self._get_tgt(creds, new_rid=existing_rid,
1117                             can_modify_requester_sid=False)
1118         self._run_tgs(tgt, creds, expected_error=0)
1119
1120     def test_logon_info_only_sid_mismatch_existing(self):
1121         creds = self._get_creds()
1122         existing_rid = self._get_existing_rid()
1123         tgt = self._get_tgt(creds, new_rid=existing_rid,
1124                             remove_requester_sid=True)
1125         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1126
1127     # Test changing the SID in the PAC to a non-existent one.
1128     def test_tgs_sid_mismatch_nonexisting(self):
1129         creds = self._get_creds()
1130         nonexistent_rid = self._get_non_existent_rid()
1131         tgt = self._get_tgt(creds, new_rid=nonexistent_rid)
1132         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1133
1134     def test_renew_sid_mismatch_nonexisting(self):
1135         creds = self._get_creds()
1136         nonexistent_rid = self._get_non_existent_rid()
1137         tgt = self._get_tgt(creds, renewable=True,
1138                             new_rid=nonexistent_rid)
1139         self._renew_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1140
1141     def test_validate_sid_mismatch_nonexisting(self):
1142         creds = self._get_creds()
1143         nonexistent_rid = self._get_non_existent_rid()
1144         tgt = self._get_tgt(creds, invalid=True,
1145                             new_rid=nonexistent_rid)
1146         self._validate_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1147
1148     def test_s4u2self_sid_mismatch_nonexisting(self):
1149         creds = self._get_creds()
1150         nonexistent_rid = self._get_non_existent_rid()
1151         tgt = self._get_tgt(creds, new_rid=nonexistent_rid)
1152         self._s4u2self(tgt, creds,
1153                        expected_error=KDC_ERR_TGT_REVOKED)
1154
1155     def test_user2user_sid_mismatch_nonexisting(self):
1156         creds = self._get_creds()
1157         nonexistent_rid = self._get_non_existent_rid()
1158         tgt = self._get_tgt(creds, new_rid=nonexistent_rid)
1159         self._user2user(tgt, creds,
1160                         expected_error=KDC_ERR_TGT_REVOKED)
1161
1162     def test_fast_sid_mismatch_nonexisting(self):
1163         creds = self._get_creds()
1164         nonexistent_rid = self._get_non_existent_rid()
1165         tgt = self._get_tgt(creds, new_rid=nonexistent_rid)
1166         self._fast(tgt, creds,
1167                    expected_error=KDC_ERR_TGT_REVOKED,
1168                    expected_sname=self.get_krbtgt_sname())
1169
1170     def test_requester_sid_mismatch_nonexisting(self):
1171         creds = self._get_creds()
1172         nonexistent_rid = self._get_non_existent_rid()
1173         tgt = self._get_tgt(creds, new_rid=nonexistent_rid,
1174                             can_modify_logon_info=False)
1175         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1176
1177     def test_logon_info_sid_mismatch_nonexisting(self):
1178         creds = self._get_creds()
1179         nonexistent_rid = self._get_non_existent_rid()
1180         tgt = self._get_tgt(creds, new_rid=nonexistent_rid,
1181                             can_modify_requester_sid=False)
1182         self._run_tgs(tgt, creds, expected_error=0)
1183
1184     def test_logon_info_only_sid_mismatch_nonexisting(self):
1185         creds = self._get_creds()
1186         nonexistent_rid = self._get_non_existent_rid()
1187         tgt = self._get_tgt(creds, new_rid=nonexistent_rid,
1188                             remove_requester_sid=True)
1189         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1190
1191     # Test with an RODC-issued ticket where the client is revealed to the RODC.
1192     def test_tgs_rodc_revealed(self):
1193         creds = self._get_creds(replication_allowed=True,
1194                                 revealed_to_rodc=True)
1195         tgt = self._get_tgt(creds, from_rodc=True)
1196         self._run_tgs(tgt, creds, expected_error=0)
1197
1198     def test_renew_rodc_revealed(self):
1199         creds = self._get_creds(replication_allowed=True,
1200                                 revealed_to_rodc=True)
1201         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1202         self._renew_tgt(tgt, creds, expected_error=0,
1203                         expect_pac_attrs=False,
1204                         expect_requester_sid=True)
1205
1206     def test_validate_rodc_revealed(self):
1207         creds = self._get_creds(replication_allowed=True,
1208                                 revealed_to_rodc=True)
1209         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1210         self._validate_tgt(tgt, creds, expected_error=0,
1211                            expect_pac_attrs=False,
1212                            expect_requester_sid=True)
1213
1214     # This test fails on Windows, which gives KDC_ERR_C_PRINCIPAL_UNKNOWN when
1215     # attempting to use S4U2Self with a TGT from an RODC.
1216     def test_s4u2self_rodc_revealed(self):
1217         creds = self._get_creds(replication_allowed=True,
1218                                 revealed_to_rodc=True)
1219         tgt = self._get_tgt(creds, from_rodc=True)
1220         self._s4u2self(tgt, creds,
1221                        expected_error=KDC_ERR_C_PRINCIPAL_UNKNOWN)
1222
1223     def test_user2user_rodc_revealed(self):
1224         creds = self._get_creds(replication_allowed=True,
1225                                 revealed_to_rodc=True)
1226         tgt = self._get_tgt(creds, from_rodc=True)
1227         self._user2user(tgt, creds, expected_error=0)
1228
1229     # Test with an RODC-issued ticket where the SID in the PAC is changed to
1230     # that of another account.
1231     def test_tgs_rodc_sid_mismatch_existing(self):
1232         creds = self._get_creds(replication_allowed=True,
1233                                 revealed_to_rodc=True)
1234         existing_rid = self._get_existing_rid(replication_allowed=True,
1235                                               revealed_to_rodc=True)
1236         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid)
1237         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1238
1239     def test_renew_rodc_sid_mismatch_existing(self):
1240         creds = self._get_creds(replication_allowed=True,
1241                                 revealed_to_rodc=True)
1242         existing_rid = self._get_existing_rid(replication_allowed=True,
1243                                               revealed_to_rodc=True)
1244         tgt = self._get_tgt(creds, renewable=True, from_rodc=True,
1245                             new_rid=existing_rid)
1246         self._renew_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1247
1248     def test_validate_rodc_sid_mismatch_existing(self):
1249         creds = self._get_creds(replication_allowed=True,
1250                                 revealed_to_rodc=True)
1251         existing_rid = self._get_existing_rid(replication_allowed=True,
1252                                        revealed_to_rodc=True)
1253         tgt = self._get_tgt(creds, invalid=True, from_rodc=True,
1254                             new_rid=existing_rid)
1255         self._validate_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1256
1257     def test_s4u2self_rodc_sid_mismatch_existing(self):
1258         creds = self._get_creds(replication_allowed=True,
1259                                 revealed_to_rodc=True)
1260         existing_rid = self._get_existing_rid(replication_allowed=True,
1261                                               revealed_to_rodc=True)
1262         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid)
1263         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1264
1265     def test_user2user_rodc_sid_mismatch_existing(self):
1266         creds = self._get_creds(replication_allowed=True,
1267                                 revealed_to_rodc=True)
1268         existing_rid = self._get_existing_rid(replication_allowed=True,
1269                                               revealed_to_rodc=True)
1270         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid)
1271         self._user2user(tgt, creds,
1272                         expected_error=KDC_ERR_TGT_REVOKED)
1273
1274     def test_fast_rodc_sid_mismatch_existing(self):
1275         creds = self._get_creds(replication_allowed=True,
1276                                 revealed_to_rodc=True)
1277         existing_rid = self._get_existing_rid(replication_allowed=True,
1278                                               revealed_to_rodc=True)
1279         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid)
1280         self._fast(tgt, creds,
1281                    expected_error=KDC_ERR_TGT_REVOKED,
1282                    expected_sname=self.get_krbtgt_sname())
1283
1284     def test_tgs_rodc_requester_sid_mismatch_existing(self):
1285         creds = self._get_creds(replication_allowed=True,
1286                                 revealed_to_rodc=True)
1287         existing_rid = self._get_existing_rid(replication_allowed=True,
1288                                               revealed_to_rodc=True)
1289         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid,
1290                             can_modify_logon_info=False)
1291         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1292
1293     def test_tgs_rodc_logon_info_sid_mismatch_existing(self):
1294         creds = self._get_creds(replication_allowed=True,
1295                                 revealed_to_rodc=True)
1296         existing_rid = self._get_existing_rid(replication_allowed=True,
1297                                               revealed_to_rodc=True)
1298         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid,
1299                             can_modify_requester_sid=False)
1300         self._run_tgs(tgt, creds, expected_error=0)
1301
1302     def test_tgs_rodc_logon_info_only_sid_mismatch_existing(self):
1303         creds = self._get_creds(replication_allowed=True,
1304                                 revealed_to_rodc=True)
1305         existing_rid = self._get_existing_rid(replication_allowed=True,
1306                                               revealed_to_rodc=True)
1307         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid,
1308                             remove_requester_sid=True)
1309         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1310
1311     # Test with an RODC-issued ticket where the SID in the PAC is changed to a
1312     # non-existent one.
1313     def test_tgs_rodc_sid_mismatch_nonexisting(self):
1314         creds = self._get_creds(replication_allowed=True,
1315                                 revealed_to_rodc=True)
1316         nonexistent_rid = self._get_non_existent_rid()
1317         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid)
1318         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1319
1320     def test_renew_rodc_sid_mismatch_nonexisting(self):
1321         creds = self._get_creds(replication_allowed=True,
1322                                 revealed_to_rodc=True)
1323         nonexistent_rid = self._get_non_existent_rid()
1324         tgt = self._get_tgt(creds, renewable=True, from_rodc=True,
1325                             new_rid=nonexistent_rid)
1326         self._renew_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1327
1328     def test_validate_rodc_sid_mismatch_nonexisting(self):
1329         creds = self._get_creds(replication_allowed=True,
1330                                 revealed_to_rodc=True)
1331         nonexistent_rid = self._get_non_existent_rid()
1332         tgt = self._get_tgt(creds, invalid=True, from_rodc=True,
1333                             new_rid=nonexistent_rid)
1334         self._validate_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1335
1336     def test_s4u2self_rodc_sid_mismatch_nonexisting(self):
1337         creds = self._get_creds(replication_allowed=True,
1338                                 revealed_to_rodc=True)
1339         nonexistent_rid = self._get_non_existent_rid()
1340         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid)
1341         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1342
1343     def test_user2user_rodc_sid_mismatch_nonexisting(self):
1344         creds = self._get_creds(replication_allowed=True,
1345                                 revealed_to_rodc=True)
1346         nonexistent_rid = self._get_non_existent_rid()
1347         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid)
1348         self._user2user(tgt, creds,
1349                         expected_error=KDC_ERR_TGT_REVOKED)
1350
1351     def test_fast_rodc_sid_mismatch_nonexisting(self):
1352         creds = self._get_creds(replication_allowed=True,
1353                                 revealed_to_rodc=True)
1354         nonexistent_rid = self._get_non_existent_rid()
1355         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid)
1356         self._fast(tgt, creds,
1357                    expected_error=KDC_ERR_TGT_REVOKED,
1358                    expected_sname=self.get_krbtgt_sname())
1359
1360     def test_tgs_rodc_requester_sid_mismatch_nonexisting(self):
1361         creds = self._get_creds(replication_allowed=True,
1362                                 revealed_to_rodc=True)
1363         nonexistent_rid = self._get_non_existent_rid()
1364         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid,
1365                             can_modify_logon_info=False)
1366         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1367
1368     def test_tgs_rodc_logon_info_sid_mismatch_nonexisting(self):
1369         creds = self._get_creds(replication_allowed=True,
1370                                 revealed_to_rodc=True)
1371         nonexistent_rid = self._get_non_existent_rid()
1372         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid,
1373                             can_modify_requester_sid=False)
1374         self._run_tgs(tgt, creds, expected_error=0)
1375
1376     def test_tgs_rodc_logon_info_only_sid_mismatch_nonexisting(self):
1377         creds = self._get_creds(replication_allowed=True,
1378                                 revealed_to_rodc=True)
1379         nonexistent_rid = self._get_non_existent_rid()
1380         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid,
1381                             remove_requester_sid=True)
1382         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1383
1384     # Test with an RODC-issued ticket where the client is not revealed to the
1385     # RODC.
1386     def test_tgs_rodc_not_revealed(self):
1387         creds = self._get_creds(replication_allowed=True)
1388         tgt = self._get_tgt(creds, from_rodc=True)
1389         # TODO: error code
1390         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1391
1392     def test_renew_rodc_not_revealed(self):
1393         creds = self._get_creds(replication_allowed=True)
1394         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1395         self._renew_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1396
1397     def test_validate_rodc_not_revealed(self):
1398         creds = self._get_creds(replication_allowed=True)
1399         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1400         self._validate_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1401
1402     def test_s4u2self_rodc_not_revealed(self):
1403         creds = self._get_creds(replication_allowed=True)
1404         tgt = self._get_tgt(creds, from_rodc=True)
1405         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1406
1407     def test_user2user_rodc_not_revealed(self):
1408         creds = self._get_creds(replication_allowed=True)
1409         tgt = self._get_tgt(creds, from_rodc=True)
1410         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1411
1412     # Test with an RODC-issued ticket where the RODC account does not have the
1413     # PARTIAL_SECRETS bit set.
1414     def test_tgs_rodc_no_partial_secrets(self):
1415         creds = self._get_creds(replication_allowed=True,
1416                                 revealed_to_rodc=True)
1417         tgt = self._get_tgt(creds, from_rodc=True)
1418         self._remove_rodc_partial_secrets()
1419         self._run_tgs(tgt, creds, expected_error=KDC_ERR_POLICY)
1420
1421     def test_renew_rodc_no_partial_secrets(self):
1422         creds = self._get_creds(replication_allowed=True,
1423                                 revealed_to_rodc=True)
1424         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1425         self._remove_rodc_partial_secrets()
1426         self._renew_tgt(tgt, creds, expected_error=KDC_ERR_POLICY)
1427
1428     def test_validate_rodc_no_partial_secrets(self):
1429         creds = self._get_creds(replication_allowed=True,
1430                                 revealed_to_rodc=True)
1431         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1432         self._remove_rodc_partial_secrets()
1433         self._validate_tgt(tgt, creds, expected_error=KDC_ERR_POLICY)
1434
1435     def test_s4u2self_rodc_no_partial_secrets(self):
1436         creds = self._get_creds(replication_allowed=True,
1437                                 revealed_to_rodc=True)
1438         tgt = self._get_tgt(creds, from_rodc=True)
1439         self._remove_rodc_partial_secrets()
1440         self._s4u2self(tgt, creds, expected_error=KDC_ERR_POLICY)
1441
1442     def test_user2user_rodc_no_partial_secrets(self):
1443         creds = self._get_creds(replication_allowed=True,
1444                                 revealed_to_rodc=True)
1445         tgt = self._get_tgt(creds, from_rodc=True)
1446         self._remove_rodc_partial_secrets()
1447         self._user2user(tgt, creds, expected_error=KDC_ERR_POLICY)
1448
1449     def test_fast_rodc_no_partial_secrets(self):
1450         creds = self._get_creds(replication_allowed=True,
1451                                 revealed_to_rodc=True)
1452         tgt = self._get_tgt(creds, from_rodc=True)
1453         self._remove_rodc_partial_secrets()
1454         self._fast(tgt, creds, expected_error=KDC_ERR_POLICY,
1455                    expected_sname=self.get_krbtgt_sname())
1456
1457     # Test with an RODC-issued ticket where the RODC account does not have an
1458     # msDS-KrbTgtLink.
1459     def test_tgs_rodc_no_krbtgt_link(self):
1460         creds = self._get_creds(replication_allowed=True,
1461                                 revealed_to_rodc=True)
1462         tgt = self._get_tgt(creds, from_rodc=True)
1463         self._remove_rodc_krbtgt_link()
1464         self._run_tgs(tgt, creds, expected_error=KDC_ERR_POLICY)
1465
1466     def test_renew_rodc_no_krbtgt_link(self):
1467         creds = self._get_creds(replication_allowed=True,
1468                                 revealed_to_rodc=True)
1469         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1470         self._remove_rodc_krbtgt_link()
1471         self._renew_tgt(tgt, creds, expected_error=KDC_ERR_POLICY)
1472
1473     def test_validate_rodc_no_krbtgt_link(self):
1474         creds = self._get_creds(replication_allowed=True,
1475                                 revealed_to_rodc=True)
1476         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1477         self._remove_rodc_krbtgt_link()
1478         self._validate_tgt(tgt, creds, expected_error=KDC_ERR_POLICY)
1479
1480     def test_s4u2self_rodc_no_krbtgt_link(self):
1481         creds = self._get_creds(replication_allowed=True,
1482                                 revealed_to_rodc=True)
1483         tgt = self._get_tgt(creds, from_rodc=True)
1484         self._remove_rodc_krbtgt_link()
1485         self._s4u2self(tgt, creds, expected_error=KDC_ERR_POLICY)
1486
1487     def test_user2user_rodc_no_krbtgt_link(self):
1488         creds = self._get_creds(replication_allowed=True,
1489                                 revealed_to_rodc=True)
1490         tgt = self._get_tgt(creds, from_rodc=True)
1491         self._remove_rodc_krbtgt_link()
1492         self._user2user(tgt, creds, expected_error=KDC_ERR_POLICY)
1493
1494     def test_fast_rodc_no_krbtgt_link(self):
1495         creds = self._get_creds(replication_allowed=True,
1496                                 revealed_to_rodc=True)
1497         tgt = self._get_tgt(creds, from_rodc=True)
1498         self._remove_rodc_krbtgt_link()
1499         self._fast(tgt, creds, expected_error=KDC_ERR_POLICY,
1500                    expected_sname=self.get_krbtgt_sname())
1501
1502     # Test with an RODC-issued ticket where the client is not allowed to
1503     # replicate to the RODC.
1504     def test_tgs_rodc_not_allowed(self):
1505         creds = self._get_creds(revealed_to_rodc=True)
1506         tgt = self._get_tgt(creds, from_rodc=True)
1507         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1508
1509     def test_renew_rodc_not_allowed(self):
1510         creds = self._get_creds(revealed_to_rodc=True)
1511         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1512         self._renew_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1513
1514     def test_validate_rodc_not_allowed(self):
1515         creds = self._get_creds(revealed_to_rodc=True)
1516         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1517         self._validate_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1518
1519     def test_s4u2self_rodc_not_allowed(self):
1520         creds = self._get_creds(revealed_to_rodc=True)
1521         tgt = self._get_tgt(creds, from_rodc=True)
1522         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1523
1524     def test_user2user_rodc_not_allowed(self):
1525         creds = self._get_creds(revealed_to_rodc=True)
1526         tgt = self._get_tgt(creds, from_rodc=True)
1527         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1528
1529     def test_fast_rodc_not_allowed(self):
1530         creds = self._get_creds(revealed_to_rodc=True)
1531         tgt = self._get_tgt(creds, from_rodc=True)
1532         self._fast(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED,
1533                    expected_sname=self.get_krbtgt_sname())
1534
1535     # Test with an RODC-issued ticket where the client is denied from
1536     # replicating to the RODC.
1537     def test_tgs_rodc_denied(self):
1538         creds = self._get_creds(replication_denied=True,
1539                                 revealed_to_rodc=True)
1540         tgt = self._get_tgt(creds, from_rodc=True)
1541         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1542
1543     def test_renew_rodc_denied(self):
1544         creds = self._get_creds(replication_denied=True,
1545                                 revealed_to_rodc=True)
1546         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1547         self._renew_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1548
1549     def test_validate_rodc_denied(self):
1550         creds = self._get_creds(replication_denied=True,
1551                                 revealed_to_rodc=True)
1552         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1553         self._validate_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1554
1555     def test_s4u2self_rodc_denied(self):
1556         creds = self._get_creds(replication_denied=True,
1557                                 revealed_to_rodc=True)
1558         tgt = self._get_tgt(creds, from_rodc=True)
1559         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1560
1561     def test_user2user_rodc_denied(self):
1562         creds = self._get_creds(replication_denied=True,
1563                                 revealed_to_rodc=True)
1564         tgt = self._get_tgt(creds, from_rodc=True)
1565         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1566
1567     def test_fast_rodc_denied(self):
1568         creds = self._get_creds(replication_denied=True,
1569                                 revealed_to_rodc=True)
1570         tgt = self._get_tgt(creds, from_rodc=True)
1571         self._fast(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED,
1572                    expected_sname=self.get_krbtgt_sname())
1573
1574     # Test with an RODC-issued ticket where the client is both allowed and
1575     # denied replicating to the RODC.
1576     def test_tgs_rodc_allowed_denied(self):
1577         creds = self._get_creds(replication_allowed=True,
1578                                 replication_denied=True,
1579                                 revealed_to_rodc=True)
1580         tgt = self._get_tgt(creds, from_rodc=True)
1581         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1582
1583     def test_renew_rodc_allowed_denied(self):
1584         creds = self._get_creds(replication_allowed=True,
1585                                 replication_denied=True,
1586                                 revealed_to_rodc=True)
1587         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1588         self._renew_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1589
1590     def test_validate_rodc_allowed_denied(self):
1591         creds = self._get_creds(replication_allowed=True,
1592                                 replication_denied=True,
1593                                 revealed_to_rodc=True)
1594         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1595         self._validate_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1596
1597     def test_s4u2self_rodc_allowed_denied(self):
1598         creds = self._get_creds(replication_allowed=True,
1599                                 replication_denied=True,
1600                                 revealed_to_rodc=True)
1601         tgt = self._get_tgt(creds, from_rodc=True)
1602         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1603
1604     def test_user2user_rodc_allowed_denied(self):
1605         creds = self._get_creds(replication_allowed=True,
1606                                 replication_denied=True,
1607                                 revealed_to_rodc=True)
1608         tgt = self._get_tgt(creds, from_rodc=True)
1609         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1610
1611     def test_fast_rodc_allowed_denied(self):
1612         creds = self._get_creds(replication_allowed=True,
1613                                 replication_denied=True,
1614                                 revealed_to_rodc=True)
1615         tgt = self._get_tgt(creds, from_rodc=True)
1616         self._fast(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED,
1617                    expected_sname=self.get_krbtgt_sname())
1618
1619     # Test making a TGS request with an RC4-encrypted TGT.
1620     def test_tgs_rc4(self):
1621         creds = self._get_creds()
1622         tgt = self._get_tgt(creds, etype=kcrypto.Enctype.RC4)
1623         self._run_tgs(tgt, creds, expected_error=(KDC_ERR_GENERIC,
1624                                            KDC_ERR_BADKEYVER),
1625                       expect_edata=True,
1626                       # We aren’t particular about whether or not we get an
1627                       # NTSTATUS.
1628                       expect_status=None,
1629                       expected_status=ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES)
1630
1631     def test_renew_rc4(self):
1632         creds = self._get_creds()
1633         tgt = self._get_tgt(creds, renewable=True, etype=kcrypto.Enctype.RC4)
1634         self._renew_tgt(tgt, creds, expected_error=(KDC_ERR_GENERIC,
1635                                                     KDC_ERR_BADKEYVER),
1636                         expect_pac_attrs=True,
1637                         expect_pac_attrs_pac_request=True,
1638                         expect_requester_sid=True)
1639
1640     def test_validate_rc4(self):
1641         creds = self._get_creds()
1642         tgt = self._get_tgt(creds, invalid=True, etype=kcrypto.Enctype.RC4)
1643         self._validate_tgt(tgt, creds, expected_error=(KDC_ERR_GENERIC,
1644                                                        KDC_ERR_BADKEYVER),
1645                            expect_pac_attrs=True,
1646                            expect_pac_attrs_pac_request=True,
1647                            expect_requester_sid=True)
1648
1649     def test_s4u2self_rc4(self):
1650         creds = self._get_creds()
1651         tgt = self._get_tgt(creds, etype=kcrypto.Enctype.RC4)
1652         self._s4u2self(tgt, creds, expected_error=(KDC_ERR_GENERIC,
1653                                                    KDC_ERR_BADKEYVER),
1654                        expect_edata=True,
1655                        # We aren’t particular about whether or not we get an
1656                        # NTSTATUS.
1657                        expect_status=None,
1658                        expected_status=ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES)
1659
1660     def test_user2user_rc4(self):
1661         creds = self._get_creds()
1662         tgt = self._get_tgt(creds, etype=kcrypto.Enctype.RC4)
1663         self._user2user(tgt, creds, expected_error=KDC_ERR_ETYPE_NOSUPP)
1664
1665     def test_fast_rc4(self):
1666         creds = self._get_creds()
1667         tgt = self._get_tgt(creds, etype=kcrypto.Enctype.RC4)
1668         self._fast(tgt, creds, expected_error=KDC_ERR_GENERIC,
1669                    expect_edata=self.expect_padata_outer)
1670
1671     # Test user-to-user with incorrect service principal names.
1672     def test_user2user_matching_sname_host(self):
1673         creds = self._get_creds()
1674         tgt = self._get_tgt(creds)
1675
1676         user_name = creds.get_username()
1677         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1678                                           names=['host', user_name])
1679
1680         self._user2user(tgt, creds, sname=sname,
1681                         expected_error=KDC_ERR_S_PRINCIPAL_UNKNOWN)
1682
1683     def test_user2user_matching_sname_no_host(self):
1684         creds = self._get_creds()
1685         tgt = self._get_tgt(creds)
1686
1687         user_name = creds.get_username()
1688         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1689                                           names=[user_name])
1690
1691         self._user2user(tgt, creds, sname=sname, expected_error=0)
1692
1693     def test_user2user_wrong_sname(self):
1694         creds = self._get_creds()
1695         tgt = self._get_tgt(creds)
1696
1697         other_creds = self._get_mach_creds()
1698         user_name = other_creds.get_username()
1699         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1700                                           names=[user_name])
1701
1702         self._user2user(tgt, creds, sname=sname,
1703                         expected_error=KDC_ERR_BADMATCH)
1704
1705     def test_user2user_other_sname(self):
1706         other_name = self.get_new_username()
1707         spn = f'host/{other_name}'
1708         creds = self.get_cached_creds(
1709             account_type=self.AccountType.COMPUTER,
1710             opts={'spn': spn})
1711         tgt = self._get_tgt(creds)
1712
1713         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1714                                           names=['host', other_name])
1715
1716         self._user2user(tgt, creds, sname=sname, expected_error=0)
1717
1718     def test_user2user_wrong_sname_krbtgt(self):
1719         creds = self._get_creds()
1720         tgt = self._get_tgt(creds)
1721
1722         sname = self.get_krbtgt_sname()
1723
1724         self._user2user(tgt, creds, sname=sname,
1725                         expected_error=KDC_ERR_BADMATCH)
1726
1727     def test_user2user_wrong_srealm(self):
1728         creds = self._get_creds()
1729         tgt = self._get_tgt(creds)
1730
1731         self._user2user(tgt, creds, srealm='OTHER.REALM',
1732                         expected_error=(KDC_ERR_WRONG_REALM,
1733                                         KDC_ERR_S_PRINCIPAL_UNKNOWN))
1734
1735     def test_user2user_tgt_correct_realm(self):
1736         creds = self._get_creds()
1737         tgt = self._get_tgt(creds)
1738
1739         realm = creds.get_realm().encode('utf-8')
1740         tgt = self._modify_tgt(tgt, realm)
1741
1742         self._user2user(tgt, creds,
1743                         expected_error=0)
1744
1745     def test_user2user_tgt_wrong_realm(self):
1746         creds = self._get_creds()
1747         tgt = self._get_tgt(creds)
1748
1749         tgt = self._modify_tgt(tgt, b'OTHER.REALM')
1750
1751         self._user2user(tgt, creds,
1752                         expected_error=0)
1753
1754     def test_user2user_tgt_correct_cname(self):
1755         creds = self._get_creds()
1756         tgt = self._get_tgt(creds)
1757
1758         user_name = creds.get_username()
1759         user_name = user_name.encode('utf-8')
1760         cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1761                                           names=[user_name])
1762
1763         tgt = self._modify_tgt(tgt, cname=cname)
1764
1765         self._user2user(tgt, creds, expected_error=0)
1766
1767     def test_user2user_tgt_other_cname(self):
1768         samdb = self.get_samdb()
1769
1770         other_name = self.get_new_username()
1771         upn = f'{other_name}@{samdb.domain_dns_name()}'
1772
1773         creds = self.get_cached_creds(
1774             account_type=self.AccountType.COMPUTER,
1775             opts={'upn': upn})
1776         tgt = self._get_tgt(creds)
1777
1778         cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1779                                           names=[other_name.encode('utf-8')])
1780
1781         tgt = self._modify_tgt(tgt, cname=cname)
1782
1783         self._user2user(tgt, creds, expected_error=0)
1784
1785     def test_user2user_tgt_cname_host(self):
1786         creds = self._get_creds()
1787         tgt = self._get_tgt(creds)
1788
1789         user_name = creds.get_username()
1790         user_name = user_name.encode('utf-8')
1791         cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1792                                           names=[b'host', user_name])
1793
1794         tgt = self._modify_tgt(tgt, cname=cname)
1795
1796         self._user2user(tgt, creds,
1797                         expected_error=(KDC_ERR_TGT_REVOKED,
1798                                         KDC_ERR_C_PRINCIPAL_UNKNOWN))
1799
1800     def test_user2user_non_existent_sname(self):
1801         creds = self._get_creds()
1802         tgt = self._get_tgt(creds)
1803
1804         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1805                                           names=['host', 'non_existent_user'])
1806
1807         self._user2user(tgt, creds, sname=sname,
1808                         expected_error=KDC_ERR_S_PRINCIPAL_UNKNOWN)
1809
1810     def test_user2user_no_sname(self):
1811         creds = self._get_creds()
1812         tgt = self._get_tgt(creds)
1813
1814         self._user2user(tgt, creds, sname=False,
1815                         expected_error=(KDC_ERR_GENERIC,
1816                                         KDC_ERR_S_PRINCIPAL_UNKNOWN))
1817
1818     def test_tgs_service_ticket(self):
1819         creds = self._get_creds()
1820         tgt = self._get_tgt(creds)
1821
1822         service_creds = self.get_service_creds()
1823         service_ticket = self.get_service_ticket(tgt, service_creds)
1824
1825         self._run_tgs(service_ticket, creds,
1826                       expected_error=(KDC_ERR_NOT_US, KDC_ERR_POLICY))
1827
1828     def test_renew_service_ticket(self):
1829         creds = self._get_creds()
1830         tgt = self._get_tgt(creds)
1831
1832         service_creds = self.get_service_creds()
1833         service_ticket = self.get_service_ticket(tgt, service_creds)
1834
1835         service_ticket = self.modified_ticket(
1836             service_ticket,
1837             modify_fn=self._modify_renewable,
1838             checksum_keys=self.get_krbtgt_checksum_key())
1839
1840         self._renew_tgt(service_ticket, creds,
1841                         expected_error=KDC_ERR_POLICY)
1842
1843     def test_validate_service_ticket(self):
1844         creds = self._get_creds()
1845         tgt = self._get_tgt(creds)
1846
1847         service_creds = self.get_service_creds()
1848         service_ticket = self.get_service_ticket(tgt, service_creds)
1849
1850         service_ticket = self.modified_ticket(
1851             service_ticket,
1852             modify_fn=self._modify_invalid,
1853             checksum_keys=self.get_krbtgt_checksum_key())
1854
1855         self._validate_tgt(service_ticket, creds,
1856                            expected_error=KDC_ERR_POLICY)
1857
1858     def test_s4u2self_service_ticket(self):
1859         creds = self._get_creds()
1860         tgt = self._get_tgt(creds)
1861
1862         service_creds = self.get_service_creds()
1863         service_ticket = self.get_service_ticket(tgt, service_creds)
1864
1865         self._s4u2self(service_ticket, creds,
1866                        expected_error=(KDC_ERR_NOT_US, KDC_ERR_POLICY))
1867
1868     def test_user2user_service_ticket(self):
1869         creds = self._get_creds()
1870         tgt = self._get_tgt(creds)
1871
1872         service_creds = self.get_service_creds()
1873         service_ticket = self.get_service_ticket(tgt, service_creds)
1874
1875         self._user2user(service_ticket, creds,
1876                         expected_error=(KDC_ERR_MODIFIED, KDC_ERR_POLICY))
1877
1878     # Expected to fail against Windows, which does not produce an error.
1879     def test_fast_service_ticket(self):
1880         creds = self._get_creds()
1881         tgt = self._get_tgt(creds)
1882
1883         service_creds = self.get_service_creds()
1884         service_ticket = self.get_service_ticket(tgt, service_creds)
1885
1886         self._fast(service_ticket, creds,
1887                    expected_error=(KDC_ERR_POLICY,
1888                                    KDC_ERR_S_PRINCIPAL_UNKNOWN))
1889
1890     def test_pac_attrs_none(self):
1891         creds = self._get_creds()
1892         self.get_tgt(creds, pac_request=None,
1893                      expect_pac=True,
1894                      expect_pac_attrs=True,
1895                      expect_pac_attrs_pac_request=None)
1896
1897     def test_pac_attrs_false(self):
1898         creds = self._get_creds()
1899         self.get_tgt(creds, pac_request=False,
1900                      expect_pac=True,
1901                      expect_pac_attrs=True,
1902                      expect_pac_attrs_pac_request=False)
1903
1904     def test_pac_attrs_true(self):
1905         creds = self._get_creds()
1906         self.get_tgt(creds, pac_request=True,
1907                      expect_pac=True,
1908                      expect_pac_attrs=True,
1909                      expect_pac_attrs_pac_request=True)
1910
1911     def test_pac_attrs_renew_none(self):
1912         creds = self._get_creds()
1913         tgt = self.get_tgt(creds, pac_request=None,
1914                            expect_pac=True,
1915                            expect_pac_attrs=True,
1916                            expect_pac_attrs_pac_request=None)
1917         tgt = self._modify_tgt(tgt, renewable=True)
1918
1919         self._renew_tgt(tgt, creds, expected_error=0,
1920                         expect_pac=True,
1921                         expect_pac_attrs=True,
1922                         expect_pac_attrs_pac_request=None,
1923                         expect_requester_sid=True)
1924
1925     def test_pac_attrs_renew_false(self):
1926         creds = self._get_creds()
1927         tgt = self.get_tgt(creds, pac_request=False,
1928                            expect_pac=True,
1929                            expect_pac_attrs=True,
1930                            expect_pac_attrs_pac_request=False)
1931         tgt = self._modify_tgt(tgt, renewable=True)
1932
1933         self._renew_tgt(tgt, creds, expected_error=0,
1934                         expect_pac=True,
1935                         expect_pac_attrs=True,
1936                         expect_pac_attrs_pac_request=False,
1937                         expect_requester_sid=True)
1938
1939     def test_pac_attrs_renew_true(self):
1940         creds = self._get_creds()
1941         tgt = self.get_tgt(creds, pac_request=True,
1942                            expect_pac=True,
1943                            expect_pac_attrs=True,
1944                            expect_pac_attrs_pac_request=True)
1945         tgt = self._modify_tgt(tgt, renewable=True)
1946
1947         self._renew_tgt(tgt, creds, expected_error=0,
1948                         expect_pac=True,
1949                         expect_pac_attrs=True,
1950                         expect_pac_attrs_pac_request=True,
1951                         expect_requester_sid=True)
1952
1953     def test_pac_attrs_rodc_renew_none(self):
1954         creds = self._get_creds(replication_allowed=True,
1955                                 revealed_to_rodc=True)
1956         tgt = self.get_tgt(creds, pac_request=None,
1957                            expect_pac=True,
1958                            expect_pac_attrs=True,
1959                            expect_pac_attrs_pac_request=None)
1960         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True)
1961
1962         self._renew_tgt(tgt, creds, expected_error=0,
1963                         expect_pac=True,
1964                         expect_pac_attrs=False,
1965                         expect_requester_sid=True)
1966
1967     def test_pac_attrs_rodc_renew_false(self):
1968         creds = self._get_creds(replication_allowed=True,
1969                                 revealed_to_rodc=True)
1970         tgt = self.get_tgt(creds, pac_request=False,
1971                            expect_pac=True,
1972                            expect_pac_attrs=True,
1973                            expect_pac_attrs_pac_request=False)
1974         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True)
1975
1976         self._renew_tgt(tgt, creds, expected_error=0,
1977                         expect_pac=True,
1978                         expect_pac_attrs=False,
1979                         expect_requester_sid=True)
1980
1981     def test_pac_attrs_rodc_renew_true(self):
1982         creds = self._get_creds(replication_allowed=True,
1983                                 revealed_to_rodc=True)
1984         tgt = self.get_tgt(creds, pac_request=True,
1985                            expect_pac=True,
1986                            expect_pac_attrs=True,
1987                            expect_pac_attrs_pac_request=True)
1988         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True)
1989
1990         self._renew_tgt(tgt, creds, expected_error=0,
1991                         expect_pac=True,
1992                         expect_pac_attrs=False,
1993                         expect_requester_sid=True)
1994
1995     def test_pac_attrs_missing_renew_none(self):
1996         creds = self._get_creds()
1997         tgt = self.get_tgt(creds, pac_request=None,
1998                            expect_pac=True,
1999                            expect_pac_attrs=True,
2000                            expect_pac_attrs_pac_request=None)
2001         tgt = self._modify_tgt(tgt, renewable=True,
2002                                remove_pac_attrs=True)
2003
2004         self._renew_tgt(tgt, creds, expected_error=0,
2005                         expect_pac=True,
2006                         expect_pac_attrs=False,
2007                         expect_requester_sid=True)
2008
2009     def test_pac_attrs_missing_renew_false(self):
2010         creds = self._get_creds()
2011         tgt = self.get_tgt(creds, pac_request=False,
2012                            expect_pac=True,
2013                            expect_pac_attrs=True,
2014                            expect_pac_attrs_pac_request=False)
2015         tgt = self._modify_tgt(tgt, renewable=True,
2016                                remove_pac_attrs=True)
2017
2018         self._renew_tgt(tgt, creds, expected_error=0,
2019                         expect_pac=True,
2020                         expect_pac_attrs=False,
2021                         expect_requester_sid=True)
2022
2023     def test_pac_attrs_missing_renew_true(self):
2024         creds = self._get_creds()
2025         tgt = self.get_tgt(creds, pac_request=True,
2026                            expect_pac=True,
2027                            expect_pac_attrs=True,
2028                            expect_pac_attrs_pac_request=True)
2029         tgt = self._modify_tgt(tgt, renewable=True,
2030                                remove_pac_attrs=True)
2031
2032         self._renew_tgt(tgt, creds, expected_error=0,
2033                         expect_pac=True,
2034                         expect_pac_attrs=False,
2035                         expect_requester_sid=True)
2036
2037     def test_pac_attrs_missing_rodc_renew_none(self):
2038         creds = self._get_creds(replication_allowed=True,
2039                                 revealed_to_rodc=True)
2040         tgt = self.get_tgt(creds, pac_request=None,
2041                            expect_pac=True,
2042                            expect_pac_attrs=True,
2043                            expect_pac_attrs_pac_request=None)
2044         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True,
2045                                remove_pac_attrs=True)
2046
2047         self._renew_tgt(tgt, creds, expected_error=0,
2048                         expect_pac=True,
2049                         expect_pac_attrs=False,
2050                         expect_requester_sid=True)
2051
2052     def test_pac_attrs_missing_rodc_renew_false(self):
2053         creds = self._get_creds(replication_allowed=True,
2054                                 revealed_to_rodc=True)
2055         tgt = self.get_tgt(creds, pac_request=False,
2056                            expect_pac=True,
2057                            expect_pac_attrs=True,
2058                            expect_pac_attrs_pac_request=False)
2059         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True,
2060                                remove_pac_attrs=True)
2061
2062         self._renew_tgt(tgt, creds, expected_error=0,
2063                         expect_pac=True,
2064                         expect_pac_attrs=False,
2065                         expect_requester_sid=True)
2066
2067     def test_pac_attrs_missing_rodc_renew_true(self):
2068         creds = self._get_creds(replication_allowed=True,
2069                                 revealed_to_rodc=True)
2070         tgt = self.get_tgt(creds, pac_request=True,
2071                            expect_pac=True,
2072                            expect_pac_attrs=True,
2073                            expect_pac_attrs_pac_request=True)
2074         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True,
2075                                remove_pac_attrs=True)
2076
2077         self._renew_tgt(tgt, creds, expected_error=0,
2078                         expect_pac=True,
2079                         expect_pac_attrs=False,
2080                         expect_requester_sid=True)
2081
2082     def test_tgs_pac_attrs_none(self):
2083         creds = self._get_creds()
2084         tgt = self.get_tgt(creds, pac_request=None,
2085                            expect_pac=True,
2086                            expect_pac_attrs=True,
2087                            expect_pac_attrs_pac_request=None)
2088
2089         self._run_tgs(tgt, creds, expected_error=0, expect_pac=True,
2090                       expect_pac_attrs=False)
2091
2092     def test_tgs_pac_attrs_false(self):
2093         creds = self._get_creds()
2094         tgt = self.get_tgt(creds, pac_request=False,
2095                            expect_pac=True,
2096                            expect_pac_attrs=True,
2097                            expect_pac_attrs_pac_request=False)
2098
2099         self._run_tgs(tgt, creds, expected_error=0, expect_pac=False,
2100                       expect_pac_attrs=False)
2101
2102     def test_tgs_pac_attrs_true(self):
2103         creds = self._get_creds()
2104         tgt = self.get_tgt(creds, pac_request=True,
2105                            expect_pac=True,
2106                            expect_pac_attrs=True,
2107                            expect_pac_attrs_pac_request=True)
2108
2109         self._run_tgs(tgt, creds, expected_error=0, expect_pac=True,
2110                       expect_pac_attrs=False)
2111
2112     def test_as_requester_sid(self):
2113         creds = self._get_creds()
2114
2115         samdb = self.get_samdb()
2116         sid = self.get_objectSid(samdb, creds.get_dn())
2117
2118         self.get_tgt(creds, pac_request=None,
2119                      expect_pac=True,
2120                      expected_sid=sid,
2121                      expect_requester_sid=True)
2122
2123     def test_tgs_requester_sid(self):
2124         creds = self._get_creds()
2125
2126         samdb = self.get_samdb()
2127         sid = self.get_objectSid(samdb, creds.get_dn())
2128
2129         tgt = self.get_tgt(creds, pac_request=None,
2130                            expect_pac=True,
2131                            expected_sid=sid,
2132                            expect_requester_sid=True)
2133
2134         self._run_tgs(tgt, creds, expected_error=0, expect_pac=True,
2135                       expect_requester_sid=False)
2136
2137     def test_tgs_requester_sid_renew(self):
2138         creds = self._get_creds()
2139
2140         samdb = self.get_samdb()
2141         sid = self.get_objectSid(samdb, creds.get_dn())
2142
2143         tgt = self.get_tgt(creds, pac_request=None,
2144                            expect_pac=True,
2145                            expected_sid=sid,
2146                            expect_requester_sid=True)
2147         tgt = self._modify_tgt(tgt, renewable=True)
2148
2149         self._renew_tgt(tgt, creds, expected_error=0, expect_pac=True,
2150                         expect_pac_attrs=True,
2151                         expect_pac_attrs_pac_request=None,
2152                         expected_sid=sid,
2153                         expect_requester_sid=True)
2154
2155     def test_tgs_requester_sid_rodc_renew(self):
2156         creds = self._get_creds(replication_allowed=True,
2157                                 revealed_to_rodc=True)
2158
2159         samdb = self.get_samdb()
2160         sid = self.get_objectSid(samdb, creds.get_dn())
2161
2162         tgt = self.get_tgt(creds, pac_request=None,
2163                            expect_pac=True,
2164                            expected_sid=sid,
2165                            expect_requester_sid=True)
2166         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True)
2167
2168         self._renew_tgt(tgt, creds, expected_error=0, expect_pac=True,
2169                         expect_pac_attrs=False,
2170                         expected_sid=sid,
2171                         expect_requester_sid=True)
2172
2173     def test_tgs_requester_sid_missing_renew(self):
2174         creds = self._get_creds()
2175
2176         samdb = self.get_samdb()
2177         sid = self.get_objectSid(samdb, creds.get_dn())
2178
2179         tgt = self.get_tgt(creds, pac_request=None,
2180                            expect_pac=True,
2181                            expected_sid=sid,
2182                            expect_requester_sid=True)
2183         tgt = self._modify_tgt(tgt, renewable=True,
2184                                remove_requester_sid=True)
2185
2186         self._renew_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
2187
2188     def test_tgs_requester_sid_missing_rodc_renew(self):
2189         creds = self._get_creds(replication_allowed=True,
2190                                 revealed_to_rodc=True)
2191
2192         samdb = self.get_samdb()
2193         sid = self.get_objectSid(samdb, creds.get_dn())
2194
2195         tgt = self.get_tgt(creds, pac_request=None,
2196                            expect_pac=True,
2197                            expected_sid=sid,
2198                            expect_requester_sid=True)
2199         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True,
2200                                remove_requester_sid=True)
2201
2202         self._renew_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
2203
2204     def test_tgs_requester_sid_validate(self):
2205         creds = self._get_creds()
2206
2207         samdb = self.get_samdb()
2208         sid = self.get_objectSid(samdb, creds.get_dn())
2209
2210         tgt = self.get_tgt(creds, pac_request=None,
2211                            expect_pac=True,
2212                            expected_sid=sid,
2213                            expect_requester_sid=True)
2214         tgt = self._modify_tgt(tgt, invalid=True)
2215
2216         self._validate_tgt(tgt, creds, expected_error=0, expect_pac=True,
2217                            expect_pac_attrs=True,
2218                            expect_pac_attrs_pac_request=None,
2219                            expected_sid=sid,
2220                            expect_requester_sid=True)
2221
2222     def test_tgs_requester_sid_rodc_validate(self):
2223         creds = self._get_creds(replication_allowed=True,
2224                                 revealed_to_rodc=True)
2225
2226         samdb = self.get_samdb()
2227         sid = self.get_objectSid(samdb, creds.get_dn())
2228
2229         tgt = self.get_tgt(creds, pac_request=None,
2230                            expect_pac=True,
2231                            expected_sid=sid,
2232                            expect_requester_sid=True)
2233         tgt = self._modify_tgt(tgt, from_rodc=True, invalid=True)
2234
2235         self._validate_tgt(tgt, creds, expected_error=0, expect_pac=True,
2236                            expect_pac_attrs=False,
2237                            expected_sid=sid,
2238                            expect_requester_sid=True)
2239
2240     def test_tgs_requester_sid_missing_validate(self):
2241         creds = self._get_creds()
2242
2243         samdb = self.get_samdb()
2244         sid = self.get_objectSid(samdb, creds.get_dn())
2245
2246         tgt = self.get_tgt(creds, pac_request=None,
2247                            expect_pac=True,
2248                            expected_sid=sid,
2249                            expect_requester_sid=True)
2250         tgt = self._modify_tgt(tgt, invalid=True,
2251                                remove_requester_sid=True)
2252
2253         self._validate_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
2254
2255     def test_tgs_requester_sid_missing_rodc_validate(self):
2256         creds = self._get_creds(replication_allowed=True,
2257                                 revealed_to_rodc=True)
2258
2259         samdb = self.get_samdb()
2260         sid = self.get_objectSid(samdb, creds.get_dn())
2261
2262         tgt = self.get_tgt(creds, pac_request=None,
2263                            expect_pac=True,
2264                            expected_sid=sid,
2265                            expect_requester_sid=True)
2266         tgt = self._modify_tgt(tgt, from_rodc=True, invalid=True,
2267                                remove_requester_sid=True)
2268
2269         self._validate_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
2270
2271     def test_tgs_pac_request_none(self):
2272         creds = self._get_creds()
2273         tgt = self.get_tgt(creds, pac_request=None)
2274
2275         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=True)
2276
2277         pac = self.get_ticket_pac(ticket)
2278         self.assertIsNotNone(pac)
2279
2280     def test_tgs_pac_request_false(self):
2281         creds = self._get_creds()
2282         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2283
2284         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=False)
2285
2286         pac = self.get_ticket_pac(ticket, expect_pac=False)
2287         self.assertIsNone(pac)
2288
2289     def test_tgs_pac_request_true(self):
2290         creds = self._get_creds()
2291         tgt = self.get_tgt(creds, pac_request=True)
2292
2293         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=True)
2294
2295         pac = self.get_ticket_pac(ticket)
2296         self.assertIsNotNone(pac)
2297
2298     def test_renew_pac_request_none(self):
2299         creds = self._get_creds()
2300         tgt = self.get_tgt(creds, pac_request=None)
2301         tgt = self._modify_tgt(tgt, renewable=True)
2302
2303         tgt = self._renew_tgt(tgt, creds, expected_error=0, expect_pac=None,
2304                               expect_pac_attrs=True,
2305                               expect_pac_attrs_pac_request=None,
2306                               expect_requester_sid=True)
2307
2308         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=True)
2309
2310         pac = self.get_ticket_pac(ticket)
2311         self.assertIsNotNone(pac)
2312
2313     def test_renew_pac_request_false(self):
2314         creds = self._get_creds()
2315         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2316         tgt = self._modify_tgt(tgt, renewable=True)
2317
2318         tgt = self._renew_tgt(tgt, creds, expected_error=0, expect_pac=None,
2319                               expect_pac_attrs=True,
2320                               expect_pac_attrs_pac_request=False,
2321                               expect_requester_sid=True)
2322
2323         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=False)
2324
2325         pac = self.get_ticket_pac(ticket, expect_pac=False)
2326         self.assertIsNone(pac)
2327
2328     def test_renew_pac_request_true(self):
2329         creds = self._get_creds()
2330         tgt = self.get_tgt(creds, pac_request=True)
2331         tgt = self._modify_tgt(tgt, renewable=True)
2332
2333         tgt = self._renew_tgt(tgt, creds, expected_error=0, expect_pac=None,
2334                               expect_pac_attrs=True,
2335                               expect_pac_attrs_pac_request=True,
2336                               expect_requester_sid=True)
2337
2338         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=True)
2339
2340         pac = self.get_ticket_pac(ticket)
2341         self.assertIsNotNone(pac)
2342
2343     def test_rodc_renew_pac_request_none(self):
2344         creds = self._get_creds(replication_allowed=True,
2345                                 revealed_to_rodc=True)
2346         tgt = self.get_tgt(creds, pac_request=None)
2347         tgt = self._modify_tgt(tgt, renewable=True, from_rodc=True)
2348
2349         tgt = self._renew_tgt(tgt, creds, expected_error=0, expect_pac=None,
2350                               expect_pac_attrs=False,
2351                               expect_requester_sid=True)
2352
2353         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=True)
2354
2355         pac = self.get_ticket_pac(ticket)
2356         self.assertIsNotNone(pac)
2357
2358     def test_rodc_renew_pac_request_false(self):
2359         creds = self._get_creds(replication_allowed=True,
2360                                 revealed_to_rodc=True)
2361         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2362         tgt = self._modify_tgt(tgt, renewable=True, from_rodc=True)
2363
2364         tgt = self._renew_tgt(tgt, creds, expected_error=0, expect_pac=None,
2365                               expect_pac_attrs=False,
2366                               expect_requester_sid=True)
2367
2368         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=True)
2369
2370         pac = self.get_ticket_pac(ticket)
2371         self.assertIsNotNone(pac)
2372
2373     def test_rodc_renew_pac_request_true(self):
2374         creds = self._get_creds(replication_allowed=True,
2375                                 revealed_to_rodc=True)
2376         tgt = self.get_tgt(creds, pac_request=True)
2377         tgt = self._modify_tgt(tgt, renewable=True, from_rodc=True)
2378
2379         tgt = self._renew_tgt(tgt, creds, expected_error=0, expect_pac=None,
2380                               expect_pac_attrs=False,
2381                               expect_requester_sid=True)
2382
2383         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=True)
2384
2385         pac = self.get_ticket_pac(ticket)
2386         self.assertIsNotNone(pac)
2387
2388     def test_validate_pac_request_none(self):
2389         creds = self._get_creds()
2390         tgt = self.get_tgt(creds, pac_request=None)
2391         tgt = self._modify_tgt(tgt, invalid=True)
2392
2393         tgt = self._validate_tgt(tgt, creds, expected_error=0, expect_pac=None,
2394                                  expect_pac_attrs=True,
2395                                  expect_pac_attrs_pac_request=None,
2396                                  expect_requester_sid=True)
2397
2398         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=True)
2399
2400         pac = self.get_ticket_pac(ticket)
2401         self.assertIsNotNone(pac)
2402
2403     def test_validate_pac_request_false(self):
2404         creds = self._get_creds()
2405         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2406         tgt = self._modify_tgt(tgt, invalid=True)
2407
2408         tgt = self._validate_tgt(tgt, creds, expected_error=0, expect_pac=None,
2409                                  expect_pac_attrs=True,
2410                                  expect_pac_attrs_pac_request=False,
2411                                  expect_requester_sid=True)
2412
2413         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=False)
2414
2415         pac = self.get_ticket_pac(ticket, expect_pac=False)
2416         self.assertIsNone(pac)
2417
2418     def test_validate_pac_request_true(self):
2419         creds = self._get_creds()
2420         tgt = self.get_tgt(creds, pac_request=True)
2421         tgt = self._modify_tgt(tgt, invalid=True)
2422
2423         tgt = self._validate_tgt(tgt, creds, expected_error=0, expect_pac=None,
2424                                  expect_pac_attrs=True,
2425                                  expect_pac_attrs_pac_request=True,
2426                                  expect_requester_sid=True)
2427
2428         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=True)
2429
2430         pac = self.get_ticket_pac(ticket)
2431         self.assertIsNotNone(pac)
2432
2433     def test_rodc_validate_pac_request_none(self):
2434         creds = self._get_creds(replication_allowed=True,
2435                                 revealed_to_rodc=True)
2436         tgt = self.get_tgt(creds, pac_request=None)
2437         tgt = self._modify_tgt(tgt, invalid=True, from_rodc=True)
2438
2439         tgt = self._validate_tgt(tgt, creds, expected_error=0, expect_pac=None,
2440                                  expect_pac_attrs=False,
2441                                  expect_requester_sid=True)
2442
2443         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=True)
2444
2445         pac = self.get_ticket_pac(ticket)
2446         self.assertIsNotNone(pac)
2447
2448     def test_rodc_validate_pac_request_false(self):
2449         creds = self._get_creds(replication_allowed=True,
2450                                 revealed_to_rodc=True)
2451         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2452         tgt = self._modify_tgt(tgt, invalid=True, from_rodc=True)
2453
2454         tgt = self._validate_tgt(tgt, creds, expected_error=0, expect_pac=None,
2455                                  expect_pac_attrs=False,
2456                                  expect_requester_sid=True)
2457
2458         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=True)
2459
2460         pac = self.get_ticket_pac(ticket)
2461         self.assertIsNotNone(pac)
2462
2463     def test_rodc_validate_pac_request_true(self):
2464         creds = self._get_creds(replication_allowed=True,
2465                                 revealed_to_rodc=True)
2466         tgt = self.get_tgt(creds, pac_request=True)
2467         tgt = self._modify_tgt(tgt, invalid=True, from_rodc=True)
2468
2469         tgt = self._validate_tgt(tgt, creds, expected_error=0, expect_pac=None,
2470                                  expect_pac_attrs=False,
2471                                  expect_requester_sid=True)
2472
2473         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=True)
2474
2475         pac = self.get_ticket_pac(ticket)
2476         self.assertIsNotNone(pac)
2477
2478     def test_s4u2self_pac_request_none(self):
2479         creds = self._get_creds()
2480         tgt = self.get_tgt(creds, pac_request=None)
2481
2482         ticket = self._s4u2self(tgt, creds, expected_error=0, expect_pac=True)
2483
2484         pac = self.get_ticket_pac(ticket)
2485         self.assertIsNotNone(pac)
2486
2487     def test_s4u2self_pac_request_false(self):
2488         creds = self._get_creds()
2489         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2490
2491         ticket = self._s4u2self(tgt, creds, expected_error=0, expect_pac=True)
2492
2493         pac = self.get_ticket_pac(ticket)
2494         self.assertIsNotNone(pac)
2495
2496     def test_s4u2self_pac_request_true(self):
2497         creds = self._get_creds()
2498         tgt = self.get_tgt(creds, pac_request=True)
2499
2500         ticket = self._s4u2self(tgt, creds, expected_error=0, expect_pac=True)
2501
2502         pac = self.get_ticket_pac(ticket)
2503         self.assertIsNotNone(pac)
2504
2505     def test_user2user_pac_request_none(self):
2506         creds = self._get_creds()
2507         tgt = self.get_tgt(creds, pac_request=None)
2508
2509         ticket = self._user2user(tgt, creds, expected_error=0, expect_pac=True)
2510
2511         pac = self.get_ticket_pac(ticket)
2512         self.assertIsNotNone(pac)
2513
2514     def test_user2user_pac_request_false(self):
2515         creds = self._get_creds()
2516         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2517
2518         ticket = self._user2user(tgt, creds, expected_error=0,
2519                                  expect_pac=True)
2520
2521         pac = self.get_ticket_pac(ticket, expect_pac=True)
2522         self.assertIsNotNone(pac)
2523
2524     def test_user2user_pac_request_true(self):
2525         creds = self._get_creds()
2526         tgt = self.get_tgt(creds, pac_request=True)
2527
2528         ticket = self._user2user(tgt, creds, expected_error=0, expect_pac=True)
2529
2530         pac = self.get_ticket_pac(ticket)
2531         self.assertIsNotNone(pac)
2532
2533     def test_user2user_user_pac_request_none(self):
2534         creds = self._get_creds()
2535         tgt = self.get_tgt(creds)
2536
2537         user_creds = self._get_mach_creds()
2538         user_tgt = self.get_tgt(user_creds, pac_request=None)
2539
2540         ticket = self._user2user(tgt, creds, expected_error=0,
2541                                  user_tgt=user_tgt, user_creds=user_creds,
2542                                  expect_pac=True)
2543
2544         pac = self.get_ticket_pac(ticket)
2545         self.assertIsNotNone(pac)
2546
2547     def test_user2user_user_pac_request_false(self):
2548         creds = self._get_creds()
2549         tgt = self.get_tgt(creds)
2550
2551         user_creds = self._get_mach_creds()
2552         user_tgt = self.get_tgt(user_creds, pac_request=False, expect_pac=None)
2553
2554         ticket = self._user2user(tgt, creds, expected_error=0,
2555                                  user_tgt=user_tgt, user_creds=user_creds,
2556                                  expect_pac=False)
2557
2558         pac = self.get_ticket_pac(ticket, expect_pac=False)
2559         self.assertIsNone(pac)
2560
2561     def test_user2user_user_pac_request_true(self):
2562         creds = self._get_creds()
2563         tgt = self.get_tgt(creds)
2564
2565         user_creds = self._get_mach_creds()
2566         user_tgt = self.get_tgt(user_creds, pac_request=True)
2567
2568         ticket = self._user2user(tgt, creds, expected_error=0,
2569                                  user_tgt=user_tgt, user_creds=user_creds,
2570                                  expect_pac=True)
2571
2572         pac = self.get_ticket_pac(ticket)
2573         self.assertIsNotNone(pac)
2574
2575     def test_fast_pac_request_none(self):
2576         creds = self._get_creds()
2577         tgt = self.get_tgt(creds, pac_request=None)
2578
2579         ticket = self._fast(tgt, creds, expected_error=0, expect_pac=True)
2580
2581         pac = self.get_ticket_pac(ticket)
2582         self.assertIsNotNone(pac)
2583
2584     def test_fast_pac_request_false(self):
2585         creds = self._get_creds()
2586         tgt = self.get_tgt(creds, pac_request=False)
2587
2588         ticket = self._fast(tgt, creds, expected_error=0,
2589                             expect_pac=True)
2590
2591         pac = self.get_ticket_pac(ticket, expect_pac=True)
2592         self.assertIsNotNone(pac)
2593
2594     def test_fast_pac_request_true(self):
2595         creds = self._get_creds()
2596         tgt = self.get_tgt(creds, pac_request=True)
2597
2598         ticket = self._fast(tgt, creds, expected_error=0, expect_pac=True)
2599
2600         pac = self.get_ticket_pac(ticket)
2601         self.assertIsNotNone(pac)
2602
2603     def test_tgs_rodc_pac_request_none(self):
2604         creds = self._get_creds(replication_allowed=True,
2605                                 revealed_to_rodc=True)
2606         tgt = self.get_tgt(creds, pac_request=None)
2607         tgt = self._modify_tgt(tgt, from_rodc=True)
2608
2609         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=True)
2610
2611         pac = self.get_ticket_pac(ticket)
2612         self.assertIsNotNone(pac)
2613
2614     def test_tgs_rodc_pac_request_false(self):
2615         creds = self._get_creds(replication_allowed=True,
2616                                 revealed_to_rodc=True)
2617         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2618         tgt = self._modify_tgt(tgt, from_rodc=True)
2619
2620         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=True)
2621
2622         pac = self.get_ticket_pac(ticket)
2623         self.assertIsNotNone(pac)
2624
2625     def test_tgs_rodc_pac_request_true(self):
2626         creds = self._get_creds(replication_allowed=True,
2627                                 revealed_to_rodc=True)
2628         tgt = self.get_tgt(creds, pac_request=True)
2629         tgt = self._modify_tgt(tgt, from_rodc=True)
2630
2631         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=True)
2632
2633         pac = self.get_ticket_pac(ticket)
2634         self.assertIsNotNone(pac)
2635
2636     def test_tgs_rename(self):
2637         creds = self.get_cached_creds(account_type=self.AccountType.USER,
2638                                       use_cache=False)
2639         tgt = self.get_tgt(creds)
2640
2641         # Rename the account.
2642         new_name = self.get_new_username()
2643
2644         samdb = self.get_samdb()
2645         msg = ldb.Message(creds.get_dn())
2646         msg['sAMAccountName'] = ldb.MessageElement(new_name,
2647                                                    ldb.FLAG_MOD_REPLACE,
2648                                                    'sAMAccountName')
2649         samdb.modify(msg)
2650
2651         self._run_tgs(tgt, creds, expected_error=(KDC_ERR_TGT_REVOKED,
2652                                                   KDC_ERR_C_PRINCIPAL_UNKNOWN))
2653
2654     # Test making a TGS request for a ticket expiring post-2038.
2655     def test_tgs_req_future_till(self):
2656         creds = self._get_creds()
2657         tgt = self._get_tgt(creds)
2658
2659         target_creds = self.get_service_creds()
2660         self._tgs_req(
2661             tgt=tgt,
2662             expected_error=0,
2663             creds=creds,
2664             target_creds=target_creds,
2665             till='99990913024805Z')
2666
2667     def _modify_renewable(self, enc_part):
2668         # Set the renewable flag.
2669         enc_part = self.modify_ticket_flag(enc_part, 'renewable', value=True)
2670
2671         # Set the renew-till time to be in the future.
2672         renew_till = self.get_KerberosTime(offset=100 * 60 * 60)
2673         enc_part['renew-till'] = renew_till
2674
2675         return enc_part
2676
2677     def _modify_invalid(self, enc_part):
2678         # Set the invalid flag.
2679         enc_part = self.modify_ticket_flag(enc_part, 'invalid', value=True)
2680
2681         # Set the ticket start time to be in the past.
2682         past_time = self.get_KerberosTime(offset=-100 * 60 * 60)
2683         enc_part['starttime'] = past_time
2684
2685         return enc_part
2686
2687     def _get_tgt(self,
2688                  client_creds,
2689                  renewable=False,
2690                  invalid=False,
2691                  from_rodc=False,
2692                  new_rid=None,
2693                  remove_pac=False,
2694                  allow_empty_authdata=False,
2695                  can_modify_logon_info=True,
2696                  can_modify_requester_sid=True,
2697                  remove_pac_attrs=False,
2698                  remove_requester_sid=False,
2699                  etype=None,
2700                  cksum_etype=None):
2701         self.assertFalse(renewable and invalid)
2702
2703         if remove_pac:
2704             self.assertIsNone(new_rid)
2705
2706         tgt = self.get_tgt(client_creds)
2707
2708         return self._modify_tgt(
2709             tgt=tgt,
2710             renewable=renewable,
2711             invalid=invalid,
2712             from_rodc=from_rodc,
2713             new_rid=new_rid,
2714             remove_pac=remove_pac,
2715             allow_empty_authdata=allow_empty_authdata,
2716             can_modify_logon_info=can_modify_logon_info,
2717             can_modify_requester_sid=can_modify_requester_sid,
2718             remove_pac_attrs=remove_pac_attrs,
2719             remove_requester_sid=remove_requester_sid,
2720             etype=etype,
2721             cksum_etype=cksum_etype)
2722
2723     def _modify_tgt(self,
2724                     tgt,
2725                     renewable=False,
2726                     invalid=False,
2727                     from_rodc=False,
2728                     new_rid=None,
2729                     remove_pac=False,
2730                     allow_empty_authdata=False,
2731                     cname=None,
2732                     crealm=None,
2733                     can_modify_logon_info=True,
2734                     can_modify_requester_sid=True,
2735                     remove_pac_attrs=False,
2736                     remove_requester_sid=False,
2737                     etype=None,
2738                     cksum_etype=None):
2739         if from_rodc:
2740             krbtgt_creds = self.get_mock_rodc_krbtgt_creds()
2741         else:
2742             krbtgt_creds = self.get_krbtgt_creds()
2743
2744         if new_rid is not None or remove_requester_sid or remove_pac_attrs:
2745             def change_sid_fn(pac):
2746                 pac_buffers = pac.buffers
2747                 for pac_buffer in pac_buffers:
2748                     if pac_buffer.type == krb5pac.PAC_TYPE_LOGON_INFO:
2749                         if new_rid is not None and can_modify_logon_info:
2750                             logon_info = pac_buffer.info.info
2751
2752                             logon_info.info3.base.rid = new_rid
2753                     elif pac_buffer.type == krb5pac.PAC_TYPE_REQUESTER_SID:
2754                         if remove_requester_sid:
2755                             pac.num_buffers -= 1
2756                             pac_buffers.remove(pac_buffer)
2757                         elif new_rid is not None and can_modify_requester_sid:
2758                             requester_sid = pac_buffer.info
2759
2760                             samdb = self.get_samdb()
2761                             domain_sid = samdb.get_domain_sid()
2762
2763                             new_sid = f'{domain_sid}-{new_rid}'
2764
2765                             requester_sid.sid = security.dom_sid(new_sid)
2766                     elif pac_buffer.type == krb5pac.PAC_TYPE_ATTRIBUTES_INFO:
2767                         if remove_pac_attrs:
2768                             pac.num_buffers -= 1
2769                             pac_buffers.remove(pac_buffer)
2770
2771                 pac.buffers = pac_buffers
2772
2773                 return pac
2774         else:
2775             change_sid_fn = None
2776
2777         krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds,
2778                                                          etype)
2779
2780         if remove_pac:
2781             checksum_keys = None
2782         else:
2783             if etype == cksum_etype:
2784                 cksum_key = krbtgt_key
2785             else:
2786                 cksum_key = self.TicketDecryptionKey_from_creds(krbtgt_creds,
2787                                                                 cksum_etype)
2788             checksum_keys = {
2789                 krb5pac.PAC_TYPE_KDC_CHECKSUM: cksum_key
2790             }
2791
2792         if renewable:
2793             flags_modify_fn = self._modify_renewable
2794         elif invalid:
2795             flags_modify_fn = self._modify_invalid
2796         else:
2797             flags_modify_fn = None
2798
2799         if cname is not None or crealm is not None:
2800             def modify_fn(enc_part):
2801                 if flags_modify_fn is not None:
2802                     enc_part = flags_modify_fn(enc_part)
2803
2804                 if cname is not None:
2805                     enc_part['cname'] = cname
2806
2807                 if crealm is not None:
2808                     enc_part['crealm'] = crealm
2809
2810                 return enc_part
2811         else:
2812             modify_fn = flags_modify_fn
2813
2814         if cname is not None:
2815             def modify_pac_fn(pac):
2816                 if change_sid_fn is not None:
2817                     pac = change_sid_fn(pac)
2818
2819                 for pac_buffer in pac.buffers:
2820                     if pac_buffer.type == krb5pac.PAC_TYPE_LOGON_NAME:
2821                         logon_info = pac_buffer.info
2822
2823                         logon_info.account_name = (
2824                             cname['name-string'][0].decode('utf-8'))
2825
2826                 return pac
2827         else:
2828             modify_pac_fn = change_sid_fn
2829
2830         return self.modified_ticket(
2831             tgt,
2832             new_ticket_key=krbtgt_key,
2833             modify_fn=modify_fn,
2834             modify_pac_fn=modify_pac_fn,
2835             exclude_pac=remove_pac,
2836             allow_empty_authdata=allow_empty_authdata,
2837             update_pac_checksums=not remove_pac,
2838             checksum_keys=checksum_keys)
2839
2840     def _remove_rodc_partial_secrets(self):
2841         samdb = self.get_samdb()
2842
2843         rodc_ctx = self.get_mock_rodc_ctx()
2844         rodc_dn = ldb.Dn(samdb, rodc_ctx.acct_dn)
2845
2846         def add_rodc_partial_secrets():
2847             msg = ldb.Message()
2848             msg.dn = rodc_dn
2849             msg['userAccountControl'] = ldb.MessageElement(
2850                 str(rodc_ctx.userAccountControl),
2851                 ldb.FLAG_MOD_REPLACE,
2852                 'userAccountControl')
2853             samdb.modify(msg)
2854
2855         self.addCleanup(add_rodc_partial_secrets)
2856
2857         uac = rodc_ctx.userAccountControl & ~dsdb.UF_PARTIAL_SECRETS_ACCOUNT
2858
2859         msg = ldb.Message()
2860         msg.dn = rodc_dn
2861         msg['userAccountControl'] = ldb.MessageElement(
2862             str(uac),
2863             ldb.FLAG_MOD_REPLACE,
2864             'userAccountControl')
2865         samdb.modify(msg)
2866
2867     def _remove_rodc_krbtgt_link(self):
2868         samdb = self.get_samdb()
2869
2870         rodc_ctx = self.get_mock_rodc_ctx()
2871         rodc_dn = ldb.Dn(samdb, rodc_ctx.acct_dn)
2872
2873         def add_rodc_krbtgt_link():
2874             msg = ldb.Message()
2875             msg.dn = rodc_dn
2876             msg['msDS-KrbTgtLink'] = ldb.MessageElement(
2877                 rodc_ctx.new_krbtgt_dn,
2878                 ldb.FLAG_MOD_ADD,
2879                 'msDS-KrbTgtLink')
2880             samdb.modify(msg)
2881
2882         self.addCleanup(add_rodc_krbtgt_link)
2883
2884         msg = ldb.Message()
2885         msg.dn = rodc_dn
2886         msg['msDS-KrbTgtLink'] = ldb.MessageElement(
2887             [],
2888             ldb.FLAG_MOD_DELETE,
2889             'msDS-KrbTgtLink')
2890         samdb.modify(msg)
2891
2892     def _get_creds(self,
2893                    replication_allowed=False,
2894                    replication_denied=False,
2895                    revealed_to_rodc=False):
2896         return self.get_cached_creds(
2897             account_type=self.AccountType.COMPUTER,
2898             opts={
2899                 'allowed_replication_mock': replication_allowed,
2900                 'denied_replication_mock': replication_denied,
2901                 'revealed_to_mock_rodc': revealed_to_rodc,
2902                 'id': 0
2903             })
2904
2905     def _get_existing_rid(self,
2906                           replication_allowed=False,
2907                           replication_denied=False,
2908                           revealed_to_rodc=False):
2909         other_creds = self.get_cached_creds(
2910             account_type=self.AccountType.COMPUTER,
2911             opts={
2912                 'allowed_replication_mock': replication_allowed,
2913                 'denied_replication_mock': replication_denied,
2914                 'revealed_to_mock_rodc': revealed_to_rodc,
2915                 'id': 1
2916             })
2917
2918         samdb = self.get_samdb()
2919
2920         other_dn = other_creds.get_dn()
2921         other_sid = self.get_objectSid(samdb, other_dn)
2922
2923         other_rid = int(other_sid.rsplit('-', 1)[1])
2924
2925         return other_rid
2926
2927     def _get_mach_creds(self):
2928         return self.get_cached_creds(
2929             account_type=self.AccountType.COMPUTER,
2930             opts={
2931                 'allowed_replication_mock': True,
2932                 'denied_replication_mock': False,
2933                 'revealed_to_mock_rodc': True,
2934                 'id': 2
2935             })
2936
2937     def _get_non_existent_rid(self):
2938         return (1 << 30) - 1
2939
2940     def _run_tgs(self, tgt, creds, expected_error, *, expect_pac=True,
2941                  expect_pac_attrs=None, expect_pac_attrs_pac_request=None,
2942                  expect_requester_sid=None, expected_sid=None,
2943                  expect_edata=False, expect_status=None, expected_status=None):
2944         target_creds = self.get_service_creds()
2945         return self._tgs_req(
2946             tgt, expected_error, creds, target_creds,
2947             expect_pac=expect_pac,
2948             expect_pac_attrs=expect_pac_attrs,
2949             expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
2950             expect_requester_sid=expect_requester_sid,
2951             expected_sid=expected_sid,
2952             expect_edata=expect_edata,
2953             expect_status=expect_status,
2954             expected_status=expected_status)
2955
2956     # These tests fail against Windows, which does not implement ticket
2957     # renewal.
2958     def _renew_tgt(self, tgt, creds, expected_error, *, expect_pac=True,
2959                    expect_pac_attrs=None, expect_pac_attrs_pac_request=None,
2960                    expect_requester_sid=None, expected_sid=None):
2961         krbtgt_creds = self.get_krbtgt_creds()
2962         kdc_options = str(krb5_asn1.KDCOptions('renew'))
2963         return self._tgs_req(
2964             tgt, expected_error, creds, krbtgt_creds,
2965             kdc_options=kdc_options,
2966             expect_pac=expect_pac,
2967             expect_pac_attrs=expect_pac_attrs,
2968             expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
2969             expect_requester_sid=expect_requester_sid,
2970             expected_sid=expected_sid)
2971
2972     # These tests fail against Windows, which does not implement ticket
2973     # validation.
2974     def _validate_tgt(self, tgt, creds, expected_error, *, expect_pac=True,
2975                       expect_pac_attrs=None,
2976                       expect_pac_attrs_pac_request=None,
2977                       expect_requester_sid=None,
2978                       expected_sid=None):
2979         krbtgt_creds = self.get_krbtgt_creds()
2980         kdc_options = str(krb5_asn1.KDCOptions('validate'))
2981         return self._tgs_req(
2982             tgt, expected_error, creds, krbtgt_creds,
2983             kdc_options=kdc_options,
2984             expect_pac=expect_pac,
2985             expect_pac_attrs=expect_pac_attrs,
2986             expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
2987             expect_requester_sid=expect_requester_sid,
2988             expected_sid=expected_sid)
2989
2990     def _s4u2self(self, tgt, tgt_creds, expected_error, *, expect_pac=True,
2991                   expect_edata=False, expect_status=None,
2992                   expected_status=None):
2993         user_creds = self._get_mach_creds()
2994
2995         user_name = user_creds.get_username()
2996         user_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
2997                                                names=[user_name])
2998         user_realm = user_creds.get_realm()
2999
3000         def generate_s4u2self_padata(_kdc_exchange_dict,
3001                                      _callback_dict,
3002                                      req_body):
3003             padata = self.PA_S4U2Self_create(
3004                 name=user_cname,
3005                 realm=user_realm,
3006                 tgt_session_key=tgt.session_key,
3007                 ctype=None)
3008
3009             return [padata], req_body
3010
3011         return self._tgs_req(tgt, expected_error, tgt_creds, tgt_creds,
3012                              expected_cname=user_cname,
3013                              generate_padata_fn=generate_s4u2self_padata,
3014                              expect_edata=expect_edata,
3015                              expect_status=expect_status,
3016                              expected_status=expected_status,
3017                              expect_pac=expect_pac)
3018
3019     def _user2user(self, tgt, tgt_creds, expected_error, *,
3020                    sname=None,
3021                    srealm=None, user_tgt=None, user_creds=None,
3022                    expect_pac=True, expected_status=None):
3023         if user_tgt is None:
3024             user_creds = self._get_mach_creds()
3025             user_tgt = self.get_tgt(user_creds)
3026         else:
3027             self.assertIsNotNone(user_creds,
3028                                  'if supplying user_tgt, user_creds should be '
3029                                  'supplied also')
3030
3031         kdc_options = str(krb5_asn1.KDCOptions('enc-tkt-in-skey'))
3032         return self._tgs_req(user_tgt, expected_error, user_creds, tgt_creds,
3033                              kdc_options=kdc_options,
3034                              additional_ticket=tgt,
3035                              sname=sname,
3036                              srealm=srealm,
3037                              expect_pac=expect_pac,
3038                              expected_status=expected_status)
3039
3040     def _fast(self, armor_tgt, armor_tgt_creds, expected_error,
3041               expected_sname=None, expect_pac=True, expect_edata=False):
3042         user_creds = self._get_mach_creds()
3043         user_tgt = self.get_tgt(user_creds)
3044
3045         target_creds = self.get_service_creds()
3046
3047         return self._tgs_req(user_tgt, expected_error,
3048                              user_creds, target_creds,
3049                              armor_tgt=armor_tgt,
3050                              expected_sname=expected_sname,
3051                              expect_pac=expect_pac,
3052                              expect_edata=expect_edata)
3053
3054
3055 if __name__ == "__main__":
3056     global_asn1_print = False
3057     global_hexdump = False
3058     import unittest
3059     unittest.main()