tests/krb5: Add samba.tests.krb5.conditional_ace_tests
[samba.git] / python / samba / tests / krb5 / kdc_tgs_tests.py
1 #!/usr/bin/env python3
2 # Unix SMB/CIFS implementation.
3 # Copyright (C) Stefan Metzmacher 2020
4 # Copyright (C) 2020 Catalyst.Net Ltd
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 #
19
20 import sys
21 import os
22
23 sys.path.insert(0, "bin/python")
24 os.environ["PYTHONUNBUFFERED"] = "1"
25
26 import ldb
27
28 from samba import dsdb, ntstatus
29
30 from samba.dcerpc import krb5pac, security
31
32
33 import samba.tests.krb5.kcrypto as kcrypto
34 from samba.tests.krb5.kdc_base_test import KDCBaseTest
35 from samba.tests.krb5.raw_testcase import Krb5EncryptionKey
36 from samba.tests.krb5.rfc4120_constants import (
37     AES256_CTS_HMAC_SHA1_96,
38     ARCFOUR_HMAC_MD5,
39     KRB_ERROR,
40     KDC_ERR_BADKEYVER,
41     KDC_ERR_BADMATCH,
42     KDC_ERR_ETYPE_NOSUPP,
43     KDC_ERR_GENERIC,
44     KDC_ERR_MODIFIED,
45     KDC_ERR_NOT_US,
46     KDC_ERR_POLICY,
47     KDC_ERR_PREAUTH_REQUIRED,
48     KDC_ERR_C_PRINCIPAL_UNKNOWN,
49     KDC_ERR_S_PRINCIPAL_UNKNOWN,
50     KDC_ERR_TKT_EXPIRED,
51     KDC_ERR_TGT_REVOKED,
52     KRB_ERR_TKT_NYV,
53     KDC_ERR_WRONG_REALM,
54     NT_ENTERPRISE_PRINCIPAL,
55     NT_PRINCIPAL,
56     NT_SRV_INST,
57 )
58 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
59
60 global_asn1_print = False
61 global_hexdump = False
62
63
64 class KdcTgsBaseTests(KDCBaseTest):
65     def _as_req(self,
66                 creds,
67                 expected_error,
68                 target_creds,
69                 etype,
70                 expected_ticket_etype=None):
71         user_name = creds.get_username()
72         cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
73                                           names=user_name.split('/'))
74
75         target_name = target_creds.get_username()
76         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
77                                           names=['host', target_name[:-1]])
78
79         if expected_error:
80             expected_sname = sname
81         else:
82             expected_sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
83                                                        names=[target_name])
84
85         realm = creds.get_realm()
86         salt = creds.get_salt()
87
88         till = self.get_KerberosTime(offset=36000)
89
90         ticket_decryption_key = (
91             self.TicketDecryptionKey_from_creds(target_creds,
92                                                 etype=expected_ticket_etype))
93         expected_etypes = target_creds.tgs_supported_enctypes
94
95         kdc_options = ('forwardable,'
96                        'renewable,'
97                        'canonicalize,'
98                        'renewable-ok')
99         kdc_options = krb5_asn1.KDCOptions(kdc_options)
100
101         if expected_error:
102             initial_error = (KDC_ERR_PREAUTH_REQUIRED, expected_error)
103         else:
104             initial_error = KDC_ERR_PREAUTH_REQUIRED
105
106         rep, kdc_exchange_dict = self._test_as_exchange(
107             creds=creds,
108             cname=cname,
109             realm=realm,
110             sname=sname,
111             till=till,
112             expected_error_mode=initial_error,
113             expected_crealm=realm,
114             expected_cname=cname,
115             expected_srealm=realm,
116             expected_sname=sname,
117             expected_salt=salt,
118             expected_supported_etypes=expected_etypes,
119             etypes=etype,
120             padata=None,
121             kdc_options=kdc_options,
122             preauth_key=None,
123             ticket_decryption_key=ticket_decryption_key)
124         self.assertIsNotNone(rep)
125         self.assertEqual(KRB_ERROR, rep['msg-type'])
126         error_code = rep['error-code']
127         if expected_error:
128             self.assertIn(error_code, initial_error)
129             if error_code == expected_error:
130                 return
131         else:
132             self.assertEqual(initial_error, error_code)
133
134         etype_info2 = kdc_exchange_dict['preauth_etype_info2']
135
136         preauth_key = self.PasswordKey_from_etype_info2(creds,
137                                                         etype_info2[0],
138                                                         creds.get_kvno())
139
140         ts_enc_padata = self.get_enc_timestamp_pa_data_from_key(preauth_key)
141
142         padata = [ts_enc_padata]
143
144         expected_realm = realm.upper()
145
146         rep, kdc_exchange_dict = self._test_as_exchange(
147             creds=creds,
148             cname=cname,
149             realm=realm,
150             sname=sname,
151             till=till,
152             expected_error_mode=expected_error,
153             expected_crealm=expected_realm,
154             expected_cname=cname,
155             expected_srealm=expected_realm,
156             expected_sname=expected_sname,
157             expected_salt=salt,
158             expected_supported_etypes=expected_etypes,
159             etypes=etype,
160             padata=padata,
161             kdc_options=kdc_options,
162             preauth_key=preauth_key,
163             ticket_decryption_key=ticket_decryption_key,
164             expect_edata=False)
165         if expected_error:
166             self.check_error_rep(rep, expected_error)
167             return None
168
169         self.check_as_reply(rep)
170         return kdc_exchange_dict['rep_ticket_creds']
171
172     def _tgs_req(self, tgt, expected_error, creds, target_creds, *,
173                  armor_tgt=None,
174                  kdc_options='0',
175                  pac_options=None,
176                  expected_cname=None,
177                  expected_sname=None,
178                  expected_account_name=None,
179                  expected_flags=None,
180                  additional_ticket=None,
181                  decryption_key=None,
182                  generate_padata_fn=None,
183                  generate_fast_padata_fn=None,
184                  sname=None,
185                  srealm=None,
186                  till=None,
187                  etypes=None,
188                  expected_ticket_etype=None,
189                  expected_supported_etypes=None,
190                  expect_pac=True,
191                  expect_pac_attrs=None,
192                  expect_pac_attrs_pac_request=None,
193                  expect_requester_sid=None,
194                  expect_edata=False,
195                  expected_sid=None,
196                  expected_groups=None,
197                  unexpected_groups=None,
198                  expect_device_info=None,
199                  expected_device_domain_sid=None,
200                  expected_device_groups=None,
201                  expect_client_claims=None,
202                  expected_client_claims=None,
203                  unexpected_client_claims=None,
204                  expect_device_claims=None,
205                  expected_device_claims=None,
206                  expect_status=None,
207                  expected_status=None,
208                  expected_proxy_target=None,
209                  expected_transited_services=None,
210                  check_patypes=True):
211         if srealm is False:
212             srealm = None
213         elif srealm is None:
214             srealm = target_creds.get_realm()
215
216         if sname is False:
217             sname = None
218             if expected_sname is None:
219                 expected_sname = self.get_krbtgt_sname()
220         else:
221             if sname is None:
222                 target_name = target_creds.get_username()
223                 if target_name == 'krbtgt':
224                     sname = self.PrincipalName_create(
225                         name_type=NT_SRV_INST,
226                         names=[target_name, srealm])
227                 else:
228                     if target_name[-1] == '$':
229                         target_name = target_name[:-1]
230                     sname = self.PrincipalName_create(
231                         name_type=NT_PRINCIPAL,
232                         names=['host', target_name])
233
234             if expected_sname is None:
235                 expected_sname = sname
236
237         if additional_ticket is not None:
238             additional_tickets = [additional_ticket.ticket]
239             if decryption_key is None:
240                 decryption_key = additional_ticket.session_key
241         else:
242             additional_tickets = None
243             if decryption_key is None:
244                 decryption_key = self.TicketDecryptionKey_from_creds(
245                     target_creds, etype=expected_ticket_etype)
246
247         subkey = self.RandomKey(tgt.session_key.etype)
248
249         if armor_tgt is not None:
250             armor_subkey = self.RandomKey(subkey.etype)
251             explicit_armor_key = self.generate_armor_key(armor_subkey,
252                                                          armor_tgt.session_key)
253             armor_key = kcrypto.cf2(explicit_armor_key.key,
254                                     subkey.key,
255                                     b'explicitarmor',
256                                     b'tgsarmor')
257             armor_key = Krb5EncryptionKey(armor_key, None)
258
259             generate_fast_fn = self.generate_simple_fast
260             generate_fast_armor_fn = self.generate_ap_req
261
262             if pac_options is None:
263                 pac_options = '1'  # claims support
264         else:
265             armor_subkey = None
266             armor_key = None
267             generate_fast_fn = None
268             generate_fast_armor_fn = None
269
270         if etypes is None:
271             etypes = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
272
273         if expected_error:
274             check_error_fn = self.generic_check_kdc_error
275             check_rep_fn = None
276         else:
277             check_error_fn = None
278             check_rep_fn = self.generic_check_kdc_rep
279
280         if expected_cname is None:
281             expected_cname = tgt.cname
282
283         kdc_exchange_dict = self.tgs_exchange_dict(
284             creds=creds,
285             expected_crealm=tgt.crealm,
286             expected_cname=expected_cname,
287             expected_srealm=srealm,
288             expected_sname=expected_sname,
289             expected_account_name=expected_account_name,
290             expected_flags=expected_flags,
291             ticket_decryption_key=decryption_key,
292             generate_padata_fn=generate_padata_fn,
293             generate_fast_padata_fn=generate_fast_padata_fn,
294             generate_fast_fn=generate_fast_fn,
295             generate_fast_armor_fn=generate_fast_armor_fn,
296             check_error_fn=check_error_fn,
297             check_rep_fn=check_rep_fn,
298             check_kdc_private_fn=self.generic_check_kdc_private,
299             expected_error_mode=expected_error,
300             expect_status=expect_status,
301             expected_status=expected_status,
302             tgt=tgt,
303             armor_key=armor_key,
304             armor_tgt=armor_tgt,
305             armor_subkey=armor_subkey,
306             pac_options=pac_options,
307             authenticator_subkey=subkey,
308             kdc_options=kdc_options,
309             expected_supported_etypes=expected_supported_etypes,
310             expect_edata=expect_edata,
311             expect_pac=expect_pac,
312             expect_pac_attrs=expect_pac_attrs,
313             expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
314             expect_requester_sid=expect_requester_sid,
315             expected_sid=expected_sid,
316             expected_groups=expected_groups,
317             unexpected_groups=unexpected_groups,
318             expect_device_info=expect_device_info,
319             expected_device_domain_sid=expected_device_domain_sid,
320             expected_device_groups=expected_device_groups,
321             expect_client_claims=expect_client_claims,
322             expected_client_claims=expected_client_claims,
323             unexpected_client_claims=unexpected_client_claims,
324             expect_device_claims=expect_device_claims,
325             expected_device_claims=expected_device_claims,
326             expected_proxy_target=expected_proxy_target,
327             expected_transited_services=expected_transited_services,
328             check_patypes=check_patypes)
329
330         rep = self._generic_kdc_exchange(kdc_exchange_dict,
331                                          cname=None,
332                                          realm=srealm,
333                                          sname=sname,
334                                          till_time=till,
335                                          etypes=etypes,
336                                          additional_tickets=additional_tickets)
337         if expected_error:
338             self.check_error_rep(rep, expected_error)
339             return None
340         else:
341             self.check_tgs_reply(rep)
342             return kdc_exchange_dict['rep_ticket_creds']
343
344
345 class KdcTgsTests(KdcTgsBaseTests):
346
347     def setUp(self):
348         super().setUp()
349         self.do_asn1_print = global_asn1_print
350         self.do_hexdump = global_hexdump
351
352     def test_tgs_req_cname_does_not_not_match_authenticator_cname(self):
353         ''' Try and obtain a ticket from the TGS, but supply a cname
354             that differs from that provided to the krbtgt
355         '''
356         # Create the user account
357         samdb = self.get_samdb()
358         user_name = "tsttktusr"
359         (uc, _) = self.create_account(samdb, user_name)
360         realm = uc.get_realm().lower()
361
362         # Do the initial AS-REQ, should get a pre-authentication required
363         # response
364         etype = (AES256_CTS_HMAC_SHA1_96,)
365         cname = self.PrincipalName_create(
366             name_type=NT_PRINCIPAL, names=[user_name])
367         sname = self.PrincipalName_create(
368             name_type=NT_SRV_INST, names=["krbtgt", realm])
369
370         rep = self.as_req(cname, sname, realm, etype)
371         self.check_pre_authentication(rep)
372
373         # Do the next AS-REQ
374         padata = self.get_enc_timestamp_pa_data(uc, rep)
375         key = self.get_as_rep_key(uc, rep)
376         rep = self.as_req(cname, sname, realm, etype, padata=[padata])
377         self.check_as_reply(rep)
378
379         # Request a service ticket, but use a cname that does not match
380         # that in the original AS-REQ
381         enc_part2 = self.get_as_rep_enc_data(key, rep)
382         key = self.EncryptionKey_import(enc_part2['key'])
383         ticket = rep['ticket']
384
385         cname = self.PrincipalName_create(
386             name_type=NT_PRINCIPAL,
387             names=["Administrator"])
388         sname = self.PrincipalName_create(
389             name_type=NT_PRINCIPAL,
390             names=["host", samdb.host_dns_name()])
391
392         (rep, enc_part) = self.tgs_req(cname, sname, realm, ticket, key, etype,
393                                        creds=uc,
394                                        expected_error_mode=KDC_ERR_BADMATCH,
395                                        expect_edata=False)
396
397         self.assertIsNone(
398             enc_part,
399             "rep = {%s}, enc_part = {%s}" % (rep, enc_part))
400         self.assertEqual(KRB_ERROR, rep['msg-type'], "rep = {%s}" % rep)
401         self.assertEqual(
402             KDC_ERR_BADMATCH,
403             rep['error-code'],
404             "rep = {%s}" % rep)
405
406     def test_ldap_service_ticket(self):
407         '''Get a ticket to the ldap service
408         '''
409         # Create the user account
410         samdb = self.get_samdb()
411         user_name = "tsttktusr"
412         (uc, _) = self.create_account(samdb, user_name)
413         realm = uc.get_realm().lower()
414
415         # Do the initial AS-REQ, should get a pre-authentication required
416         # response
417         etype = (AES256_CTS_HMAC_SHA1_96,)
418         cname = self.PrincipalName_create(
419             name_type=NT_PRINCIPAL, names=[user_name])
420         sname = self.PrincipalName_create(
421             name_type=NT_SRV_INST, names=["krbtgt", realm])
422
423         rep = self.as_req(cname, sname, realm, etype)
424         self.check_pre_authentication(rep)
425
426         # Do the next AS-REQ
427         padata = self.get_enc_timestamp_pa_data(uc, rep)
428         key = self.get_as_rep_key(uc, rep)
429         rep = self.as_req(cname, sname, realm, etype, padata=[padata])
430         self.check_as_reply(rep)
431
432         enc_part2 = self.get_as_rep_enc_data(key, rep)
433         key = self.EncryptionKey_import(enc_part2['key'])
434         ticket = rep['ticket']
435
436         # Request a ticket to the ldap service
437         sname = self.PrincipalName_create(
438             name_type=NT_SRV_INST,
439             names=["ldap", samdb.host_dns_name()])
440
441         (rep, _) = self.tgs_req(
442             cname, sname, uc.get_realm(), ticket, key, etype,
443             service_creds=self.get_dc_creds())
444
445         self.check_tgs_reply(rep)
446
447     def test_get_ticket_for_host_service_of_machine_account(self):
448
449         # Create a user and machine account for the test.
450         #
451         samdb = self.get_samdb()
452         user_name = "tsttktusr"
453         (uc, dn) = self.create_account(samdb, user_name)
454         (mc, _) = self.create_account(samdb, "tsttktmac",
455                                       account_type=self.AccountType.COMPUTER)
456         realm = uc.get_realm().lower()
457
458         # Do the initial AS-REQ, should get a pre-authentication required
459         # response
460         etype = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
461         cname = self.PrincipalName_create(
462             name_type=NT_PRINCIPAL, names=[user_name])
463         sname = self.PrincipalName_create(
464             name_type=NT_SRV_INST, names=["krbtgt", realm])
465
466         rep = self.as_req(cname, sname, realm, etype)
467         self.check_pre_authentication(rep)
468
469         # Do the next AS-REQ
470         padata = self.get_enc_timestamp_pa_data(uc, rep)
471         key = self.get_as_rep_key(uc, rep)
472         rep = self.as_req(cname, sname, realm, etype, padata=[padata])
473         self.check_as_reply(rep)
474
475         # Request a ticket to the host service on the machine account
476         ticket = rep['ticket']
477         enc_part2 = self.get_as_rep_enc_data(key, rep)
478         key = self.EncryptionKey_import(enc_part2['key'])
479         cname = self.PrincipalName_create(
480             name_type=NT_PRINCIPAL,
481             names=[user_name])
482         sname = self.PrincipalName_create(
483             name_type=NT_PRINCIPAL,
484             names=[mc.get_username()])
485
486         (rep, enc_part) = self.tgs_req(
487             cname, sname, uc.get_realm(), ticket, key, etype,
488             service_creds=mc)
489         self.check_tgs_reply(rep)
490
491         # Check the contents of the service ticket
492         ticket = rep['ticket']
493         enc_part = self.decode_service_ticket(mc, ticket)
494
495         pac_data = self.get_pac_data(enc_part['authorization-data'])
496         sid = uc.get_sid()
497         upn = "%s@%s" % (uc.get_username(), realm)
498         self.assertEqual(
499             uc.get_username(),
500             str(pac_data.account_name),
501             "rep = {%s},%s" % (rep, pac_data))
502         self.assertEqual(
503             uc.get_username(),
504             pac_data.logon_name,
505             "rep = {%s},%s" % (rep, pac_data))
506         self.assertEqual(
507             uc.get_realm(),
508             pac_data.domain_name,
509             "rep = {%s},%s" % (rep, pac_data))
510         self.assertEqual(
511             upn,
512             pac_data.upn,
513             "rep = {%s},%s" % (rep, pac_data))
514         self.assertEqual(
515             sid,
516             pac_data.account_sid,
517             "rep = {%s},%s" % (rep, pac_data))
518
519     def test_request(self):
520         client_creds = self.get_client_creds()
521         service_creds = self.get_service_creds()
522
523         tgt = self.get_tgt(client_creds)
524
525         pac = self.get_ticket_pac(tgt)
526         self.assertIsNotNone(pac)
527
528         ticket = self._make_tgs_request(client_creds, service_creds, tgt)
529
530         pac = self.get_ticket_pac(ticket)
531         self.assertIsNotNone(pac)
532
533     def test_request_no_pac(self):
534         client_creds = self.get_client_creds()
535         service_creds = self.get_service_creds()
536
537         tgt = self.get_tgt(client_creds, pac_request=False)
538
539         pac = self.get_ticket_pac(tgt)
540         self.assertIsNotNone(pac)
541
542         ticket = self._make_tgs_request(client_creds, service_creds, tgt,
543                                         pac_request=False, expect_pac=False)
544
545         pac = self.get_ticket_pac(ticket, expect_pac=False)
546         self.assertIsNone(pac)
547
548     def test_request_enterprise_canon(self):
549         upn = self.get_new_username()
550         client_creds = self.get_cached_creds(
551             account_type=self.AccountType.USER,
552             opts={'upn': upn})
553         service_creds = self.get_service_creds()
554
555         user_name = client_creds.get_username()
556         realm = client_creds.get_realm()
557         client_account = f'{user_name}@{realm}'
558
559         expected_cname = self.PrincipalName_create(
560             name_type=NT_PRINCIPAL,
561             names=[user_name])
562
563         kdc_options = 'canonicalize'
564
565         tgt = self.get_tgt(client_creds,
566                            client_account=client_account,
567                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
568                            expected_cname=expected_cname,
569                            expected_account_name=user_name,
570                            kdc_options=kdc_options)
571
572         self._make_tgs_request(
573             client_creds, service_creds, tgt,
574             client_account=client_account,
575             client_name_type=NT_ENTERPRISE_PRINCIPAL,
576             expected_cname=expected_cname,
577             expected_account_name=user_name,
578             kdc_options=kdc_options)
579
580     def test_request_enterprise_canon_case(self):
581         upn = self.get_new_username()
582         client_creds = self.get_cached_creds(
583             account_type=self.AccountType.USER,
584             opts={'upn': upn})
585         service_creds = self.get_service_creds()
586
587         user_name = client_creds.get_username()
588         realm = client_creds.get_realm().lower()
589         client_account = f'{user_name}@{realm}'
590
591         expected_cname = self.PrincipalName_create(
592             name_type=NT_PRINCIPAL,
593             names=[user_name])
594
595         kdc_options = 'canonicalize'
596
597         tgt = self.get_tgt(client_creds,
598                            client_account=client_account,
599                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
600                            expected_cname=expected_cname,
601                            expected_account_name=user_name,
602                            kdc_options=kdc_options)
603
604         self._make_tgs_request(
605             client_creds, service_creds, tgt,
606             client_account=client_account,
607             client_name_type=NT_ENTERPRISE_PRINCIPAL,
608             expected_cname=expected_cname,
609             expected_account_name=user_name,
610             kdc_options=kdc_options)
611
612     def test_request_enterprise_canon_mac(self):
613         upn = self.get_new_username()
614         client_creds = self.get_cached_creds(
615             account_type=self.AccountType.COMPUTER,
616             opts={'upn': upn})
617         service_creds = self.get_service_creds()
618
619         user_name = client_creds.get_username()
620         realm = client_creds.get_realm()
621         client_account = f'{user_name}@{realm}'
622
623         expected_cname = self.PrincipalName_create(
624             name_type=NT_PRINCIPAL,
625             names=[user_name])
626
627         kdc_options = 'canonicalize'
628
629         tgt = self.get_tgt(client_creds,
630                            client_account=client_account,
631                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
632                            expected_cname=expected_cname,
633                            expected_account_name=user_name,
634                            kdc_options=kdc_options)
635
636         self._make_tgs_request(
637             client_creds, service_creds, tgt,
638             client_account=client_account,
639             client_name_type=NT_ENTERPRISE_PRINCIPAL,
640             expected_cname=expected_cname,
641             expected_account_name=user_name,
642             kdc_options=kdc_options)
643
644     def test_request_enterprise_canon_case_mac(self):
645         upn = self.get_new_username()
646         client_creds = self.get_cached_creds(
647             account_type=self.AccountType.COMPUTER,
648             opts={'upn': upn})
649         service_creds = self.get_service_creds()
650
651         user_name = client_creds.get_username()
652         realm = client_creds.get_realm().lower()
653         client_account = f'{user_name}@{realm}'
654
655         expected_cname = self.PrincipalName_create(
656             name_type=NT_PRINCIPAL,
657             names=[user_name])
658
659         kdc_options = 'canonicalize'
660
661         tgt = self.get_tgt(client_creds,
662                            client_account=client_account,
663                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
664                            expected_cname=expected_cname,
665                            expected_account_name=user_name,
666                            kdc_options=kdc_options)
667
668         self._make_tgs_request(
669             client_creds, service_creds, tgt,
670             client_account=client_account,
671             client_name_type=NT_ENTERPRISE_PRINCIPAL,
672             expected_cname=expected_cname,
673             expected_account_name=user_name,
674             kdc_options=kdc_options)
675
676     def test_request_enterprise_no_canon(self):
677         upn = self.get_new_username()
678         client_creds = self.get_cached_creds(
679             account_type=self.AccountType.USER,
680             opts={'upn': upn})
681         service_creds = self.get_service_creds()
682
683         user_name = client_creds.get_username()
684         realm = client_creds.get_realm()
685         client_account = f'{user_name}@{realm}'
686
687         kdc_options = '0'
688
689         tgt = self.get_tgt(client_creds,
690                            client_account=client_account,
691                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
692                            expected_account_name=user_name,
693                            kdc_options=kdc_options)
694
695         self._make_tgs_request(
696             client_creds, service_creds, tgt,
697             client_account=client_account,
698             client_name_type=NT_ENTERPRISE_PRINCIPAL,
699             expected_account_name=user_name,
700             kdc_options=kdc_options)
701
702     def test_request_enterprise_no_canon_case(self):
703         upn = self.get_new_username()
704         client_creds = self.get_cached_creds(
705             account_type=self.AccountType.USER,
706             opts={'upn': upn})
707         service_creds = self.get_service_creds()
708
709         user_name = client_creds.get_username()
710         realm = client_creds.get_realm().lower()
711         client_account = f'{user_name}@{realm}'
712
713         kdc_options = '0'
714
715         tgt = self.get_tgt(client_creds,
716                            client_account=client_account,
717                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
718                            expected_account_name=user_name,
719                            kdc_options=kdc_options)
720
721         self._make_tgs_request(
722             client_creds, service_creds, tgt,
723             client_account=client_account,
724             client_name_type=NT_ENTERPRISE_PRINCIPAL,
725             expected_account_name=user_name,
726             kdc_options=kdc_options)
727
728     def test_request_enterprise_no_canon_mac(self):
729         upn = self.get_new_username()
730         client_creds = self.get_cached_creds(
731             account_type=self.AccountType.COMPUTER,
732             opts={'upn': upn})
733         service_creds = self.get_service_creds()
734
735         user_name = client_creds.get_username()
736         realm = client_creds.get_realm()
737         client_account = f'{user_name}@{realm}'
738
739         kdc_options = '0'
740
741         tgt = self.get_tgt(client_creds,
742                            client_account=client_account,
743                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
744                            expected_account_name=user_name,
745                            kdc_options=kdc_options)
746
747         self._make_tgs_request(
748             client_creds, service_creds, tgt,
749             client_account=client_account,
750             client_name_type=NT_ENTERPRISE_PRINCIPAL,
751             expected_account_name=user_name,
752             kdc_options=kdc_options)
753
754     def test_request_enterprise_no_canon_case_mac(self):
755         upn = self.get_new_username()
756         client_creds = self.get_cached_creds(
757             account_type=self.AccountType.COMPUTER,
758             opts={'upn': upn})
759         service_creds = self.get_service_creds()
760
761         user_name = client_creds.get_username()
762         realm = client_creds.get_realm().lower()
763         client_account = f'{user_name}@{realm}'
764
765         kdc_options = '0'
766
767         tgt = self.get_tgt(client_creds,
768                            client_account=client_account,
769                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
770                            expected_account_name=user_name,
771                            kdc_options=kdc_options)
772
773         self._make_tgs_request(
774             client_creds, service_creds, tgt,
775             client_account=client_account,
776             client_name_type=NT_ENTERPRISE_PRINCIPAL,
777             expected_account_name=user_name,
778             kdc_options=kdc_options)
779
780     def test_client_no_auth_data_required(self):
781         client_creds = self.get_cached_creds(
782             account_type=self.AccountType.USER,
783             opts={'no_auth_data_required': True})
784         service_creds = self.get_service_creds()
785
786         tgt = self.get_tgt(client_creds)
787
788         pac = self.get_ticket_pac(tgt)
789         self.assertIsNotNone(pac)
790
791         ticket = self._make_tgs_request(client_creds, service_creds, tgt)
792
793         pac = self.get_ticket_pac(ticket)
794         self.assertIsNotNone(pac)
795
796     def test_no_pac_client_no_auth_data_required(self):
797         client_creds = self.get_cached_creds(
798             account_type=self.AccountType.USER,
799             opts={'no_auth_data_required': True})
800         service_creds = self.get_service_creds()
801
802         tgt = self.get_tgt(client_creds)
803
804         pac = self.get_ticket_pac(tgt)
805         self.assertIsNotNone(pac)
806
807         ticket = self._make_tgs_request(client_creds, service_creds, tgt,
808                                         pac_request=False, expect_pac=True)
809
810         pac = self.get_ticket_pac(ticket)
811         self.assertIsNotNone(pac)
812
813     def test_service_no_auth_data_required(self):
814         client_creds = self.get_client_creds()
815         service_creds = self.get_cached_creds(
816             account_type=self.AccountType.COMPUTER,
817             opts={'no_auth_data_required': True})
818
819         tgt = self.get_tgt(client_creds)
820
821         pac = self.get_ticket_pac(tgt)
822         self.assertIsNotNone(pac)
823
824         ticket = self._make_tgs_request(client_creds, service_creds, tgt,
825                                         expect_pac=False)
826
827         pac = self.get_ticket_pac(ticket, expect_pac=False)
828         self.assertIsNone(pac)
829
830     def test_no_pac_service_no_auth_data_required(self):
831         client_creds = self.get_client_creds()
832         service_creds = self.get_cached_creds(
833             account_type=self.AccountType.COMPUTER,
834             opts={'no_auth_data_required': True})
835
836         tgt = self.get_tgt(client_creds, pac_request=False)
837
838         pac = self.get_ticket_pac(tgt)
839         self.assertIsNotNone(pac)
840
841         ticket = self._make_tgs_request(client_creds, service_creds, tgt,
842                                         pac_request=False, expect_pac=False)
843
844         pac = self.get_ticket_pac(ticket, expect_pac=False)
845         self.assertIsNone(pac)
846
847     def test_remove_pac_service_no_auth_data_required(self):
848         client_creds = self.get_client_creds()
849         service_creds = self.get_cached_creds(
850             account_type=self.AccountType.COMPUTER,
851             opts={'no_auth_data_required': True})
852
853         tgt = self.modified_ticket(self.get_tgt(client_creds),
854                                    exclude_pac=True)
855
856         pac = self.get_ticket_pac(tgt, expect_pac=False)
857         self.assertIsNone(pac)
858
859         self._make_tgs_request(client_creds, service_creds, tgt,
860                                expect_error=True)
861
862     def test_remove_pac_client_no_auth_data_required(self):
863         client_creds = self.get_cached_creds(
864             account_type=self.AccountType.USER,
865             opts={'no_auth_data_required': True})
866         service_creds = self.get_service_creds()
867
868         tgt = self.modified_ticket(self.get_tgt(client_creds),
869                                    exclude_pac=True)
870
871         pac = self.get_ticket_pac(tgt, expect_pac=False)
872         self.assertIsNone(pac)
873
874         self._make_tgs_request(client_creds, service_creds, tgt,
875                                expect_error=True)
876
877     def test_remove_pac(self):
878         client_creds = self.get_client_creds()
879         service_creds = self.get_service_creds()
880
881         tgt = self.modified_ticket(self.get_tgt(client_creds),
882                                    exclude_pac=True)
883
884         pac = self.get_ticket_pac(tgt, expect_pac=False)
885         self.assertIsNone(pac)
886
887         self._make_tgs_request(client_creds, service_creds, tgt,
888                                expect_error=True)
889
890     def test_upn_dns_info_ex_user(self):
891         client_creds = self.get_client_creds()
892         self._run_upn_dns_info_ex_test(client_creds)
893
894     def test_upn_dns_info_ex_mac(self):
895         mach_creds = self.get_mach_creds()
896         self._run_upn_dns_info_ex_test(mach_creds)
897
898     def test_upn_dns_info_ex_upn_user(self):
899         client_creds = self.get_cached_creds(
900             account_type=self.AccountType.USER,
901             opts={'upn': 'upn_dns_info_test_upn0@bar'})
902         self._run_upn_dns_info_ex_test(client_creds)
903
904     def test_upn_dns_info_ex_upn_mac(self):
905         mach_creds = self.get_cached_creds(
906             account_type=self.AccountType.COMPUTER,
907             opts={'upn': 'upn_dns_info_test_upn1@bar'})
908         self._run_upn_dns_info_ex_test(mach_creds)
909
910     def _run_upn_dns_info_ex_test(self, client_creds):
911         service_creds = self.get_service_creds()
912
913         account_name = client_creds.get_username()
914         upn_name = client_creds.get_upn()
915         if upn_name is None:
916             realm = client_creds.get_realm().lower()
917             upn_name = f'{account_name}@{realm}'
918         sid = client_creds.get_sid()
919
920         tgt = self.get_tgt(client_creds,
921                            expected_account_name=account_name,
922                            expected_upn_name=upn_name,
923                            expected_sid=sid)
924
925         self._make_tgs_request(client_creds, service_creds, tgt,
926                                expected_account_name=account_name,
927                                expected_upn_name=upn_name,
928                                expected_sid=sid)
929
930     # Test making a TGS request.
931     def test_tgs_req(self):
932         creds = self._get_creds()
933         tgt = self._get_tgt(creds)
934         self._run_tgs(tgt, creds, expected_error=0)
935
936     def test_renew_req(self):
937         creds = self._get_creds()
938         tgt = self._get_tgt(creds, renewable=True)
939         self._renew_tgt(tgt, creds, expected_error=0,
940                         expect_pac_attrs=True,
941                         expect_pac_attrs_pac_request=True,
942                         expect_requester_sid=True)
943
944     def test_validate_req(self):
945         creds = self._get_creds()
946         tgt = self._get_tgt(creds, invalid=True)
947         self._validate_tgt(tgt, creds, expected_error=0,
948                            expect_pac_attrs=True,
949                            expect_pac_attrs_pac_request=True,
950                            expect_requester_sid=True)
951
952     def test_s4u2self_req(self):
953         creds = self._get_creds()
954         tgt = self._get_tgt(creds)
955         self._s4u2self(tgt, creds, expected_error=0)
956
957     def test_user2user_req(self):
958         creds = self._get_creds()
959         tgt = self._get_tgt(creds)
960         self._user2user(tgt, creds, expected_error=0)
961
962     def test_fast_req(self):
963         creds = self._get_creds()
964         tgt = self._get_tgt(creds)
965         self._fast(tgt, creds, expected_error=0)
966
967     def test_tgs_req_invalid(self):
968         creds = self._get_creds()
969         tgt = self._get_tgt(creds, invalid=True)
970         self._run_tgs(tgt, creds, expected_error=KRB_ERR_TKT_NYV)
971
972     def test_s4u2self_req_invalid(self):
973         creds = self._get_creds()
974         tgt = self._get_tgt(creds, invalid=True)
975         self._s4u2self(tgt, creds, expected_error=KRB_ERR_TKT_NYV)
976
977     def test_user2user_req_invalid(self):
978         creds = self._get_creds()
979         tgt = self._get_tgt(creds, invalid=True)
980         self._user2user(tgt, creds, expected_error=KRB_ERR_TKT_NYV)
981
982     def test_fast_req_invalid(self):
983         creds = self._get_creds()
984         tgt = self._get_tgt(creds, invalid=True)
985         self._fast(tgt, creds, expected_error=KRB_ERR_TKT_NYV,
986                    expected_sname=self.get_krbtgt_sname())
987
988     def test_tgs_req_no_requester_sid(self):
989         creds = self._get_creds()
990         tgt = self._get_tgt(creds, remove_requester_sid=True)
991
992         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
993
994     def test_tgs_req_no_pac_attrs(self):
995         creds = self._get_creds()
996         tgt = self._get_tgt(creds, remove_pac_attrs=True)
997
998         self._run_tgs(tgt, creds, expected_error=0, expect_pac=True,
999                       expect_pac_attrs=False)
1000
1001     def test_tgs_req_from_rodc_no_requester_sid(self):
1002         creds = self._get_creds(replication_allowed=True,
1003                                 revealed_to_rodc=True)
1004         tgt = self._get_tgt(creds, from_rodc=True, remove_requester_sid=True)
1005
1006         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1007
1008     def test_tgs_req_from_rodc_no_pac_attrs(self):
1009         creds = self._get_creds(replication_allowed=True,
1010                                 revealed_to_rodc=True)
1011         tgt = self._get_tgt(creds, from_rodc=True, remove_pac_attrs=True)
1012         self._run_tgs(tgt, creds, expected_error=0, expect_pac=True,
1013                       expect_pac_attrs=False)
1014
1015     # Test making a request without a PAC.
1016     def test_tgs_no_pac(self):
1017         creds = self._get_creds()
1018         tgt = self._get_tgt(creds, remove_pac=True)
1019         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1020
1021     def test_renew_no_pac(self):
1022         creds = self._get_creds()
1023         tgt = self._get_tgt(creds, renewable=True, remove_pac=True)
1024         self._renew_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1025
1026     def test_validate_no_pac(self):
1027         creds = self._get_creds()
1028         tgt = self._get_tgt(creds, invalid=True, remove_pac=True)
1029         self._validate_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1030
1031     def test_s4u2self_no_pac(self):
1032         creds = self._get_creds()
1033         tgt = self._get_tgt(creds, remove_pac=True)
1034         self._s4u2self(tgt, creds,
1035                        expected_error=KDC_ERR_TGT_REVOKED,
1036                        expect_edata=False)
1037
1038     def test_user2user_no_pac(self):
1039         creds = self._get_creds()
1040         tgt = self._get_tgt(creds, remove_pac=True)
1041         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1042
1043     def test_fast_no_pac(self):
1044         creds = self._get_creds()
1045         tgt = self._get_tgt(creds, remove_pac=True)
1046         self._fast(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED,
1047                    expected_sname=self.get_krbtgt_sname())
1048
1049     # Test making a request with authdata and without a PAC.
1050     def test_tgs_authdata_no_pac(self):
1051         creds = self._get_creds()
1052         tgt = self._get_tgt(creds, remove_pac=True, allow_empty_authdata=True)
1053         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1054
1055     def test_renew_authdata_no_pac(self):
1056         creds = self._get_creds()
1057         tgt = self._get_tgt(creds, renewable=True, remove_pac=True,
1058                             allow_empty_authdata=True)
1059         self._renew_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1060
1061     def test_validate_authdata_no_pac(self):
1062         creds = self._get_creds()
1063         tgt = self._get_tgt(creds, invalid=True, remove_pac=True,
1064                             allow_empty_authdata=True)
1065         self._validate_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1066
1067     def test_s4u2self_authdata_no_pac(self):
1068         creds = self._get_creds()
1069         tgt = self._get_tgt(creds, remove_pac=True, allow_empty_authdata=True)
1070         self._s4u2self(tgt, creds,
1071                        expected_error=KDC_ERR_TGT_REVOKED,
1072                        expect_edata=False)
1073
1074     def test_user2user_authdata_no_pac(self):
1075         creds = self._get_creds()
1076         tgt = self._get_tgt(creds, remove_pac=True, allow_empty_authdata=True)
1077         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1078
1079     def test_fast_authdata_no_pac(self):
1080         creds = self._get_creds()
1081         tgt = self._get_tgt(creds, remove_pac=True, allow_empty_authdata=True)
1082         self._fast(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED,
1083                    expected_sname=self.get_krbtgt_sname())
1084
1085     # Test changing the SID in the PAC to that of another account.
1086     def test_tgs_sid_mismatch_existing(self):
1087         creds = self._get_creds()
1088         existing_rid = self._get_existing_rid()
1089         tgt = self._get_tgt(creds, new_rid=existing_rid)
1090         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1091
1092     def test_renew_sid_mismatch_existing(self):
1093         creds = self._get_creds()
1094         existing_rid = self._get_existing_rid()
1095         tgt = self._get_tgt(creds, renewable=True, new_rid=existing_rid)
1096         self._renew_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1097
1098     def test_validate_sid_mismatch_existing(self):
1099         creds = self._get_creds()
1100         existing_rid = self._get_existing_rid()
1101         tgt = self._get_tgt(creds, invalid=True, new_rid=existing_rid)
1102         self._validate_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1103
1104     def test_s4u2self_sid_mismatch_existing(self):
1105         creds = self._get_creds()
1106         existing_rid = self._get_existing_rid()
1107         tgt = self._get_tgt(creds, new_rid=existing_rid)
1108         self._s4u2self(tgt, creds,
1109                        expected_error=KDC_ERR_TGT_REVOKED)
1110
1111     def test_user2user_sid_mismatch_existing(self):
1112         creds = self._get_creds()
1113         existing_rid = self._get_existing_rid()
1114         tgt = self._get_tgt(creds, new_rid=existing_rid)
1115         self._user2user(tgt, creds,
1116                         expected_error=KDC_ERR_TGT_REVOKED)
1117
1118     def test_fast_sid_mismatch_existing(self):
1119         creds = self._get_creds()
1120         existing_rid = self._get_existing_rid()
1121         tgt = self._get_tgt(creds, new_rid=existing_rid)
1122         self._fast(tgt, creds,
1123                    expected_error=KDC_ERR_TGT_REVOKED,
1124                    expected_sname=self.get_krbtgt_sname())
1125
1126     def test_requester_sid_mismatch_existing(self):
1127         creds = self._get_creds()
1128         existing_rid = self._get_existing_rid()
1129         tgt = self._get_tgt(creds, new_rid=existing_rid,
1130                             can_modify_logon_info=False)
1131         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1132
1133     def test_logon_info_sid_mismatch_existing(self):
1134         creds = self._get_creds()
1135         existing_rid = self._get_existing_rid()
1136         tgt = self._get_tgt(creds, new_rid=existing_rid,
1137                             can_modify_requester_sid=False)
1138         self._run_tgs(tgt, creds, expected_error=0)
1139
1140     def test_logon_info_only_sid_mismatch_existing(self):
1141         creds = self._get_creds()
1142         existing_rid = self._get_existing_rid()
1143         tgt = self._get_tgt(creds, new_rid=existing_rid,
1144                             remove_requester_sid=True)
1145         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1146
1147     # Test changing the SID in the PAC to a non-existent one.
1148     def test_tgs_sid_mismatch_nonexisting(self):
1149         creds = self._get_creds()
1150         nonexistent_rid = self._get_non_existent_rid()
1151         tgt = self._get_tgt(creds, new_rid=nonexistent_rid)
1152         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1153
1154     def test_renew_sid_mismatch_nonexisting(self):
1155         creds = self._get_creds()
1156         nonexistent_rid = self._get_non_existent_rid()
1157         tgt = self._get_tgt(creds, renewable=True,
1158                             new_rid=nonexistent_rid)
1159         self._renew_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1160
1161     def test_validate_sid_mismatch_nonexisting(self):
1162         creds = self._get_creds()
1163         nonexistent_rid = self._get_non_existent_rid()
1164         tgt = self._get_tgt(creds, invalid=True,
1165                             new_rid=nonexistent_rid)
1166         self._validate_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1167
1168     def test_s4u2self_sid_mismatch_nonexisting(self):
1169         creds = self._get_creds()
1170         nonexistent_rid = self._get_non_existent_rid()
1171         tgt = self._get_tgt(creds, new_rid=nonexistent_rid)
1172         self._s4u2self(tgt, creds,
1173                        expected_error=KDC_ERR_TGT_REVOKED)
1174
1175     def test_user2user_sid_mismatch_nonexisting(self):
1176         creds = self._get_creds()
1177         nonexistent_rid = self._get_non_existent_rid()
1178         tgt = self._get_tgt(creds, new_rid=nonexistent_rid)
1179         self._user2user(tgt, creds,
1180                         expected_error=KDC_ERR_TGT_REVOKED)
1181
1182     def test_fast_sid_mismatch_nonexisting(self):
1183         creds = self._get_creds()
1184         nonexistent_rid = self._get_non_existent_rid()
1185         tgt = self._get_tgt(creds, new_rid=nonexistent_rid)
1186         self._fast(tgt, creds,
1187                    expected_error=KDC_ERR_TGT_REVOKED,
1188                    expected_sname=self.get_krbtgt_sname())
1189
1190     def test_requester_sid_mismatch_nonexisting(self):
1191         creds = self._get_creds()
1192         nonexistent_rid = self._get_non_existent_rid()
1193         tgt = self._get_tgt(creds, new_rid=nonexistent_rid,
1194                             can_modify_logon_info=False)
1195         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1196
1197     def test_logon_info_sid_mismatch_nonexisting(self):
1198         creds = self._get_creds()
1199         nonexistent_rid = self._get_non_existent_rid()
1200         tgt = self._get_tgt(creds, new_rid=nonexistent_rid,
1201                             can_modify_requester_sid=False)
1202         self._run_tgs(tgt, creds, expected_error=0)
1203
1204     def test_logon_info_only_sid_mismatch_nonexisting(self):
1205         creds = self._get_creds()
1206         nonexistent_rid = self._get_non_existent_rid()
1207         tgt = self._get_tgt(creds, new_rid=nonexistent_rid,
1208                             remove_requester_sid=True)
1209         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1210
1211     # Test with an RODC-issued ticket where the client is revealed to the RODC.
1212     def test_tgs_rodc_revealed(self):
1213         creds = self._get_creds(replication_allowed=True,
1214                                 revealed_to_rodc=True)
1215         tgt = self._get_tgt(creds, from_rodc=True)
1216         self._run_tgs(tgt, creds, expected_error=0)
1217
1218     def test_renew_rodc_revealed(self):
1219         creds = self._get_creds(replication_allowed=True,
1220                                 revealed_to_rodc=True)
1221         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1222         self._renew_tgt(tgt, creds, expected_error=0,
1223                         expect_pac_attrs=False,
1224                         expect_requester_sid=True)
1225
1226     def test_validate_rodc_revealed(self):
1227         creds = self._get_creds(replication_allowed=True,
1228                                 revealed_to_rodc=True)
1229         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1230         self._validate_tgt(tgt, creds, expected_error=0,
1231                            expect_pac_attrs=False,
1232                            expect_requester_sid=True)
1233
1234     # This test fails on Windows, which gives KDC_ERR_C_PRINCIPAL_UNKNOWN when
1235     # attempting to use S4U2Self with a TGT from an RODC.
1236     def test_s4u2self_rodc_revealed(self):
1237         creds = self._get_creds(replication_allowed=True,
1238                                 revealed_to_rodc=True)
1239         tgt = self._get_tgt(creds, from_rodc=True)
1240         self._s4u2self(tgt, creds,
1241                        expected_error=KDC_ERR_C_PRINCIPAL_UNKNOWN)
1242
1243     def test_user2user_rodc_revealed(self):
1244         creds = self._get_creds(replication_allowed=True,
1245                                 revealed_to_rodc=True)
1246         tgt = self._get_tgt(creds, from_rodc=True)
1247         self._user2user(tgt, creds, expected_error=0)
1248
1249     # Test with an RODC-issued ticket where the SID in the PAC is changed to
1250     # that of another account.
1251     def test_tgs_rodc_sid_mismatch_existing(self):
1252         creds = self._get_creds(replication_allowed=True,
1253                                 revealed_to_rodc=True)
1254         existing_rid = self._get_existing_rid(replication_allowed=True,
1255                                               revealed_to_rodc=True)
1256         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid)
1257         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1258
1259     def test_renew_rodc_sid_mismatch_existing(self):
1260         creds = self._get_creds(replication_allowed=True,
1261                                 revealed_to_rodc=True)
1262         existing_rid = self._get_existing_rid(replication_allowed=True,
1263                                               revealed_to_rodc=True)
1264         tgt = self._get_tgt(creds, renewable=True, from_rodc=True,
1265                             new_rid=existing_rid)
1266         self._renew_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1267
1268     def test_validate_rodc_sid_mismatch_existing(self):
1269         creds = self._get_creds(replication_allowed=True,
1270                                 revealed_to_rodc=True)
1271         existing_rid = self._get_existing_rid(replication_allowed=True,
1272                                        revealed_to_rodc=True)
1273         tgt = self._get_tgt(creds, invalid=True, from_rodc=True,
1274                             new_rid=existing_rid)
1275         self._validate_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1276
1277     def test_s4u2self_rodc_sid_mismatch_existing(self):
1278         creds = self._get_creds(replication_allowed=True,
1279                                 revealed_to_rodc=True)
1280         existing_rid = self._get_existing_rid(replication_allowed=True,
1281                                               revealed_to_rodc=True)
1282         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid)
1283         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1284
1285     def test_user2user_rodc_sid_mismatch_existing(self):
1286         creds = self._get_creds(replication_allowed=True,
1287                                 revealed_to_rodc=True)
1288         existing_rid = self._get_existing_rid(replication_allowed=True,
1289                                               revealed_to_rodc=True)
1290         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid)
1291         self._user2user(tgt, creds,
1292                         expected_error=KDC_ERR_TGT_REVOKED)
1293
1294     def test_fast_rodc_sid_mismatch_existing(self):
1295         creds = self._get_creds(replication_allowed=True,
1296                                 revealed_to_rodc=True)
1297         existing_rid = self._get_existing_rid(replication_allowed=True,
1298                                               revealed_to_rodc=True)
1299         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid)
1300         self._fast(tgt, creds,
1301                    expected_error=KDC_ERR_TGT_REVOKED,
1302                    expected_sname=self.get_krbtgt_sname())
1303
1304     def test_tgs_rodc_requester_sid_mismatch_existing(self):
1305         creds = self._get_creds(replication_allowed=True,
1306                                 revealed_to_rodc=True)
1307         existing_rid = self._get_existing_rid(replication_allowed=True,
1308                                               revealed_to_rodc=True)
1309         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid,
1310                             can_modify_logon_info=False)
1311         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1312
1313     def test_tgs_rodc_logon_info_sid_mismatch_existing(self):
1314         creds = self._get_creds(replication_allowed=True,
1315                                 revealed_to_rodc=True)
1316         existing_rid = self._get_existing_rid(replication_allowed=True,
1317                                               revealed_to_rodc=True)
1318         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid,
1319                             can_modify_requester_sid=False)
1320         self._run_tgs(tgt, creds, expected_error=0)
1321
1322     def test_tgs_rodc_logon_info_only_sid_mismatch_existing(self):
1323         creds = self._get_creds(replication_allowed=True,
1324                                 revealed_to_rodc=True)
1325         existing_rid = self._get_existing_rid(replication_allowed=True,
1326                                               revealed_to_rodc=True)
1327         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid,
1328                             remove_requester_sid=True)
1329         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1330
1331     # Test with an RODC-issued ticket where the SID in the PAC is changed to a
1332     # non-existent one.
1333     def test_tgs_rodc_sid_mismatch_nonexisting(self):
1334         creds = self._get_creds(replication_allowed=True,
1335                                 revealed_to_rodc=True)
1336         nonexistent_rid = self._get_non_existent_rid()
1337         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid)
1338         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1339
1340     def test_renew_rodc_sid_mismatch_nonexisting(self):
1341         creds = self._get_creds(replication_allowed=True,
1342                                 revealed_to_rodc=True)
1343         nonexistent_rid = self._get_non_existent_rid()
1344         tgt = self._get_tgt(creds, renewable=True, from_rodc=True,
1345                             new_rid=nonexistent_rid)
1346         self._renew_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1347
1348     def test_validate_rodc_sid_mismatch_nonexisting(self):
1349         creds = self._get_creds(replication_allowed=True,
1350                                 revealed_to_rodc=True)
1351         nonexistent_rid = self._get_non_existent_rid()
1352         tgt = self._get_tgt(creds, invalid=True, from_rodc=True,
1353                             new_rid=nonexistent_rid)
1354         self._validate_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1355
1356     def test_s4u2self_rodc_sid_mismatch_nonexisting(self):
1357         creds = self._get_creds(replication_allowed=True,
1358                                 revealed_to_rodc=True)
1359         nonexistent_rid = self._get_non_existent_rid()
1360         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid)
1361         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1362
1363     def test_user2user_rodc_sid_mismatch_nonexisting(self):
1364         creds = self._get_creds(replication_allowed=True,
1365                                 revealed_to_rodc=True)
1366         nonexistent_rid = self._get_non_existent_rid()
1367         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid)
1368         self._user2user(tgt, creds,
1369                         expected_error=KDC_ERR_TGT_REVOKED)
1370
1371     def test_fast_rodc_sid_mismatch_nonexisting(self):
1372         creds = self._get_creds(replication_allowed=True,
1373                                 revealed_to_rodc=True)
1374         nonexistent_rid = self._get_non_existent_rid()
1375         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid)
1376         self._fast(tgt, creds,
1377                    expected_error=KDC_ERR_TGT_REVOKED,
1378                    expected_sname=self.get_krbtgt_sname())
1379
1380     def test_tgs_rodc_requester_sid_mismatch_nonexisting(self):
1381         creds = self._get_creds(replication_allowed=True,
1382                                 revealed_to_rodc=True)
1383         nonexistent_rid = self._get_non_existent_rid()
1384         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid,
1385                             can_modify_logon_info=False)
1386         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1387
1388     def test_tgs_rodc_logon_info_sid_mismatch_nonexisting(self):
1389         creds = self._get_creds(replication_allowed=True,
1390                                 revealed_to_rodc=True)
1391         nonexistent_rid = self._get_non_existent_rid()
1392         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid,
1393                             can_modify_requester_sid=False)
1394         self._run_tgs(tgt, creds, expected_error=0)
1395
1396     def test_tgs_rodc_logon_info_only_sid_mismatch_nonexisting(self):
1397         creds = self._get_creds(replication_allowed=True,
1398                                 revealed_to_rodc=True)
1399         nonexistent_rid = self._get_non_existent_rid()
1400         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid,
1401                             remove_requester_sid=True)
1402         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1403
1404     # Test with an RODC-issued ticket where the client is not revealed to the
1405     # RODC.
1406     def test_tgs_rodc_not_revealed(self):
1407         creds = self._get_creds(replication_allowed=True)
1408         tgt = self._get_tgt(creds, from_rodc=True)
1409         # TODO: error code
1410         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1411
1412     def test_renew_rodc_not_revealed(self):
1413         creds = self._get_creds(replication_allowed=True)
1414         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1415         self._renew_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1416
1417     def test_validate_rodc_not_revealed(self):
1418         creds = self._get_creds(replication_allowed=True)
1419         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1420         self._validate_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1421
1422     def test_s4u2self_rodc_not_revealed(self):
1423         creds = self._get_creds(replication_allowed=True)
1424         tgt = self._get_tgt(creds, from_rodc=True)
1425         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1426
1427     def test_user2user_rodc_not_revealed(self):
1428         creds = self._get_creds(replication_allowed=True)
1429         tgt = self._get_tgt(creds, from_rodc=True)
1430         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1431
1432     # Test with an RODC-issued ticket where the RODC account does not have the
1433     # PARTIAL_SECRETS bit set.
1434     def test_tgs_rodc_no_partial_secrets(self):
1435         creds = self._get_creds(replication_allowed=True,
1436                                 revealed_to_rodc=True)
1437         tgt = self._get_tgt(creds, from_rodc=True)
1438         self._remove_rodc_partial_secrets()
1439         self._run_tgs(tgt, creds, expected_error=KDC_ERR_POLICY)
1440
1441     def test_renew_rodc_no_partial_secrets(self):
1442         creds = self._get_creds(replication_allowed=True,
1443                                 revealed_to_rodc=True)
1444         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1445         self._remove_rodc_partial_secrets()
1446         self._renew_tgt(tgt, creds, expected_error=KDC_ERR_POLICY)
1447
1448     def test_validate_rodc_no_partial_secrets(self):
1449         creds = self._get_creds(replication_allowed=True,
1450                                 revealed_to_rodc=True)
1451         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1452         self._remove_rodc_partial_secrets()
1453         self._validate_tgt(tgt, creds, expected_error=KDC_ERR_POLICY)
1454
1455     def test_s4u2self_rodc_no_partial_secrets(self):
1456         creds = self._get_creds(replication_allowed=True,
1457                                 revealed_to_rodc=True)
1458         tgt = self._get_tgt(creds, from_rodc=True)
1459         self._remove_rodc_partial_secrets()
1460         self._s4u2self(tgt, creds, expected_error=KDC_ERR_POLICY)
1461
1462     def test_user2user_rodc_no_partial_secrets(self):
1463         creds = self._get_creds(replication_allowed=True,
1464                                 revealed_to_rodc=True)
1465         tgt = self._get_tgt(creds, from_rodc=True)
1466         self._remove_rodc_partial_secrets()
1467         self._user2user(tgt, creds, expected_error=KDC_ERR_POLICY)
1468
1469     def test_fast_rodc_no_partial_secrets(self):
1470         creds = self._get_creds(replication_allowed=True,
1471                                 revealed_to_rodc=True)
1472         tgt = self._get_tgt(creds, from_rodc=True)
1473         self._remove_rodc_partial_secrets()
1474         self._fast(tgt, creds, expected_error=KDC_ERR_POLICY,
1475                    expected_sname=self.get_krbtgt_sname())
1476
1477     # Test with an RODC-issued ticket where the RODC account does not have an
1478     # msDS-KrbTgtLink.
1479     def test_tgs_rodc_no_krbtgt_link(self):
1480         creds = self._get_creds(replication_allowed=True,
1481                                 revealed_to_rodc=True)
1482         tgt = self._get_tgt(creds, from_rodc=True)
1483         self._remove_rodc_krbtgt_link()
1484         self._run_tgs(tgt, creds, expected_error=KDC_ERR_POLICY)
1485
1486     def test_renew_rodc_no_krbtgt_link(self):
1487         creds = self._get_creds(replication_allowed=True,
1488                                 revealed_to_rodc=True)
1489         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1490         self._remove_rodc_krbtgt_link()
1491         self._renew_tgt(tgt, creds, expected_error=KDC_ERR_POLICY)
1492
1493     def test_validate_rodc_no_krbtgt_link(self):
1494         creds = self._get_creds(replication_allowed=True,
1495                                 revealed_to_rodc=True)
1496         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1497         self._remove_rodc_krbtgt_link()
1498         self._validate_tgt(tgt, creds, expected_error=KDC_ERR_POLICY)
1499
1500     def test_s4u2self_rodc_no_krbtgt_link(self):
1501         creds = self._get_creds(replication_allowed=True,
1502                                 revealed_to_rodc=True)
1503         tgt = self._get_tgt(creds, from_rodc=True)
1504         self._remove_rodc_krbtgt_link()
1505         self._s4u2self(tgt, creds, expected_error=KDC_ERR_POLICY)
1506
1507     def test_user2user_rodc_no_krbtgt_link(self):
1508         creds = self._get_creds(replication_allowed=True,
1509                                 revealed_to_rodc=True)
1510         tgt = self._get_tgt(creds, from_rodc=True)
1511         self._remove_rodc_krbtgt_link()
1512         self._user2user(tgt, creds, expected_error=KDC_ERR_POLICY)
1513
1514     def test_fast_rodc_no_krbtgt_link(self):
1515         creds = self._get_creds(replication_allowed=True,
1516                                 revealed_to_rodc=True)
1517         tgt = self._get_tgt(creds, from_rodc=True)
1518         self._remove_rodc_krbtgt_link()
1519         self._fast(tgt, creds, expected_error=KDC_ERR_POLICY,
1520                    expected_sname=self.get_krbtgt_sname())
1521
1522     # Test with an RODC-issued ticket where the client is not allowed to
1523     # replicate to the RODC.
1524     def test_tgs_rodc_not_allowed(self):
1525         creds = self._get_creds(revealed_to_rodc=True)
1526         tgt = self._get_tgt(creds, from_rodc=True)
1527         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1528
1529     def test_renew_rodc_not_allowed(self):
1530         creds = self._get_creds(revealed_to_rodc=True)
1531         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1532         self._renew_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1533
1534     def test_validate_rodc_not_allowed(self):
1535         creds = self._get_creds(revealed_to_rodc=True)
1536         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1537         self._validate_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1538
1539     def test_s4u2self_rodc_not_allowed(self):
1540         creds = self._get_creds(revealed_to_rodc=True)
1541         tgt = self._get_tgt(creds, from_rodc=True)
1542         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1543
1544     def test_user2user_rodc_not_allowed(self):
1545         creds = self._get_creds(revealed_to_rodc=True)
1546         tgt = self._get_tgt(creds, from_rodc=True)
1547         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1548
1549     def test_fast_rodc_not_allowed(self):
1550         creds = self._get_creds(revealed_to_rodc=True)
1551         tgt = self._get_tgt(creds, from_rodc=True)
1552         self._fast(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED,
1553                    expected_sname=self.get_krbtgt_sname())
1554
1555     # Test with an RODC-issued ticket where the client is denied from
1556     # replicating to the RODC.
1557     def test_tgs_rodc_denied(self):
1558         creds = self._get_creds(replication_denied=True,
1559                                 revealed_to_rodc=True)
1560         tgt = self._get_tgt(creds, from_rodc=True)
1561         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1562
1563     def test_renew_rodc_denied(self):
1564         creds = self._get_creds(replication_denied=True,
1565                                 revealed_to_rodc=True)
1566         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1567         self._renew_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1568
1569     def test_validate_rodc_denied(self):
1570         creds = self._get_creds(replication_denied=True,
1571                                 revealed_to_rodc=True)
1572         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1573         self._validate_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1574
1575     def test_s4u2self_rodc_denied(self):
1576         creds = self._get_creds(replication_denied=True,
1577                                 revealed_to_rodc=True)
1578         tgt = self._get_tgt(creds, from_rodc=True)
1579         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1580
1581     def test_user2user_rodc_denied(self):
1582         creds = self._get_creds(replication_denied=True,
1583                                 revealed_to_rodc=True)
1584         tgt = self._get_tgt(creds, from_rodc=True)
1585         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1586
1587     def test_fast_rodc_denied(self):
1588         creds = self._get_creds(replication_denied=True,
1589                                 revealed_to_rodc=True)
1590         tgt = self._get_tgt(creds, from_rodc=True)
1591         self._fast(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED,
1592                    expected_sname=self.get_krbtgt_sname())
1593
1594     # Test with an RODC-issued ticket where the client is both allowed and
1595     # denied replicating to the RODC.
1596     def test_tgs_rodc_allowed_denied(self):
1597         creds = self._get_creds(replication_allowed=True,
1598                                 replication_denied=True,
1599                                 revealed_to_rodc=True)
1600         tgt = self._get_tgt(creds, from_rodc=True)
1601         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1602
1603     def test_renew_rodc_allowed_denied(self):
1604         creds = self._get_creds(replication_allowed=True,
1605                                 replication_denied=True,
1606                                 revealed_to_rodc=True)
1607         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1608         self._renew_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1609
1610     def test_validate_rodc_allowed_denied(self):
1611         creds = self._get_creds(replication_allowed=True,
1612                                 replication_denied=True,
1613                                 revealed_to_rodc=True)
1614         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1615         self._validate_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1616
1617     def test_s4u2self_rodc_allowed_denied(self):
1618         creds = self._get_creds(replication_allowed=True,
1619                                 replication_denied=True,
1620                                 revealed_to_rodc=True)
1621         tgt = self._get_tgt(creds, from_rodc=True)
1622         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1623
1624     def test_user2user_rodc_allowed_denied(self):
1625         creds = self._get_creds(replication_allowed=True,
1626                                 replication_denied=True,
1627                                 revealed_to_rodc=True)
1628         tgt = self._get_tgt(creds, from_rodc=True)
1629         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1630
1631     def test_fast_rodc_allowed_denied(self):
1632         creds = self._get_creds(replication_allowed=True,
1633                                 replication_denied=True,
1634                                 revealed_to_rodc=True)
1635         tgt = self._get_tgt(creds, from_rodc=True)
1636         self._fast(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED,
1637                    expected_sname=self.get_krbtgt_sname())
1638
1639     # Test making a TGS request with an RC4-encrypted TGT.
1640     def test_tgs_rc4(self):
1641         creds = self._get_creds()
1642         tgt = self._get_tgt(creds, etype=kcrypto.Enctype.RC4)
1643         self._run_tgs(tgt, creds, expected_error=(KDC_ERR_GENERIC,
1644                                            KDC_ERR_BADKEYVER),
1645                       expect_edata=True,
1646                       # We aren’t particular about whether or not we get an
1647                       # NTSTATUS.
1648                       expect_status=None,
1649                       expected_status=ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES)
1650
1651     def test_renew_rc4(self):
1652         creds = self._get_creds()
1653         tgt = self._get_tgt(creds, renewable=True, etype=kcrypto.Enctype.RC4)
1654         self._renew_tgt(tgt, creds, expected_error=(KDC_ERR_GENERIC,
1655                                                     KDC_ERR_BADKEYVER),
1656                         expect_pac_attrs=True,
1657                         expect_pac_attrs_pac_request=True,
1658                         expect_requester_sid=True)
1659
1660     def test_validate_rc4(self):
1661         creds = self._get_creds()
1662         tgt = self._get_tgt(creds, invalid=True, etype=kcrypto.Enctype.RC4)
1663         self._validate_tgt(tgt, creds, expected_error=(KDC_ERR_GENERIC,
1664                                                        KDC_ERR_BADKEYVER),
1665                            expect_pac_attrs=True,
1666                            expect_pac_attrs_pac_request=True,
1667                            expect_requester_sid=True)
1668
1669     def test_s4u2self_rc4(self):
1670         creds = self._get_creds()
1671         tgt = self._get_tgt(creds, etype=kcrypto.Enctype.RC4)
1672         self._s4u2self(tgt, creds, expected_error=(KDC_ERR_GENERIC,
1673                                                    KDC_ERR_BADKEYVER),
1674                        expect_edata=True,
1675                        # We aren’t particular about whether or not we get an
1676                        # NTSTATUS.
1677                        expect_status=None,
1678                        expected_status=ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES)
1679
1680     def test_user2user_rc4(self):
1681         creds = self._get_creds()
1682         tgt = self._get_tgt(creds, etype=kcrypto.Enctype.RC4)
1683         self._user2user(tgt, creds, expected_error=KDC_ERR_ETYPE_NOSUPP)
1684
1685     def test_fast_rc4(self):
1686         creds = self._get_creds()
1687         tgt = self._get_tgt(creds, etype=kcrypto.Enctype.RC4)
1688         self._fast(tgt, creds, expected_error=KDC_ERR_GENERIC,
1689                    expect_edata=self.expect_padata_outer)
1690
1691     # Test with a TGT that has the lifetime of a kpasswd ticket (two minutes).
1692     def test_tgs_kpasswd(self):
1693         creds = self._get_creds()
1694         tgt = self.modify_lifetime(self._get_tgt(creds), lifetime=2 * 60)
1695         self._run_tgs(tgt, creds, expected_error=KDC_ERR_TKT_EXPIRED)
1696
1697     def test_renew_kpasswd(self):
1698         creds = self._get_creds()
1699         tgt = self._get_tgt(creds, renewable=True)
1700         tgt = self.modify_lifetime(tgt, lifetime=2 * 60)
1701         self._renew_tgt(tgt, creds, expected_error=KDC_ERR_TKT_EXPIRED)
1702
1703     def test_validate_kpasswd(self):
1704         creds = self._get_creds()
1705         tgt = self._get_tgt(creds, invalid=True)
1706         tgt = self.modify_lifetime(tgt, lifetime=2 * 60)
1707         self._validate_tgt(tgt, creds, expected_error=KDC_ERR_TKT_EXPIRED)
1708
1709     def test_s4u2self_kpasswd(self):
1710         creds = self._get_creds()
1711         tgt = self.modify_lifetime(self._get_tgt(creds), lifetime=2 * 60)
1712         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TKT_EXPIRED)
1713
1714     def test_user2user_kpasswd(self):
1715         creds = self._get_creds()
1716         tgt = self.modify_lifetime(self._get_tgt(creds), lifetime=2 * 60)
1717         self._user2user(tgt, creds, expected_error=KDC_ERR_TKT_EXPIRED)
1718
1719     def test_fast_kpasswd(self):
1720         creds = self._get_creds()
1721         tgt = self.modify_lifetime(self._get_tgt(creds), lifetime=2 * 60)
1722         self._fast(tgt, creds, expected_error=KDC_ERR_TKT_EXPIRED)
1723
1724     # Test user-to-user with incorrect service principal names.
1725     def test_user2user_matching_sname_host(self):
1726         creds = self._get_creds()
1727         tgt = self._get_tgt(creds)
1728
1729         user_name = creds.get_username()
1730         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1731                                           names=['host', user_name])
1732
1733         self._user2user(tgt, creds, sname=sname,
1734                         expected_error=KDC_ERR_S_PRINCIPAL_UNKNOWN)
1735
1736     def test_user2user_matching_sname_no_host(self):
1737         creds = self._get_creds()
1738         tgt = self._get_tgt(creds)
1739
1740         user_name = creds.get_username()
1741         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1742                                           names=[user_name])
1743
1744         self._user2user(tgt, creds, sname=sname, expected_error=0)
1745
1746     def test_user2user_wrong_sname(self):
1747         creds = self._get_creds()
1748         tgt = self._get_tgt(creds)
1749
1750         other_creds = self._get_mach_creds()
1751         user_name = other_creds.get_username()
1752         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1753                                           names=[user_name])
1754
1755         self._user2user(tgt, creds, sname=sname,
1756                         expected_error=KDC_ERR_BADMATCH)
1757
1758     def test_user2user_other_sname(self):
1759         other_name = self.get_new_username()
1760         spn = f'host/{other_name}'
1761         creds = self.get_cached_creds(
1762             account_type=self.AccountType.COMPUTER,
1763             opts={'spn': spn})
1764         tgt = self._get_tgt(creds)
1765
1766         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1767                                           names=['host', other_name])
1768
1769         self._user2user(tgt, creds, sname=sname, expected_error=0)
1770
1771     def test_user2user_wrong_sname_krbtgt(self):
1772         creds = self._get_creds()
1773         tgt = self._get_tgt(creds)
1774
1775         sname = self.get_krbtgt_sname()
1776
1777         self._user2user(tgt, creds, sname=sname,
1778                         expected_error=KDC_ERR_BADMATCH)
1779
1780     def test_user2user_wrong_srealm(self):
1781         creds = self._get_creds()
1782         tgt = self._get_tgt(creds)
1783
1784         self._user2user(tgt, creds, srealm='OTHER.REALM',
1785                         expected_error=(KDC_ERR_WRONG_REALM,
1786                                         KDC_ERR_S_PRINCIPAL_UNKNOWN))
1787
1788     def test_user2user_tgt_correct_realm(self):
1789         creds = self._get_creds()
1790         tgt = self._get_tgt(creds)
1791
1792         realm = creds.get_realm().encode('utf-8')
1793         tgt = self._modify_tgt(tgt, realm)
1794
1795         self._user2user(tgt, creds,
1796                         expected_error=0)
1797
1798     def test_user2user_tgt_wrong_realm(self):
1799         creds = self._get_creds()
1800         tgt = self._get_tgt(creds)
1801
1802         tgt = self._modify_tgt(tgt, b'OTHER.REALM')
1803
1804         self._user2user(tgt, creds,
1805                         expected_error=0)
1806
1807     def test_user2user_tgt_correct_cname(self):
1808         creds = self._get_creds()
1809         tgt = self._get_tgt(creds)
1810
1811         user_name = creds.get_username()
1812         user_name = user_name.encode('utf-8')
1813         cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1814                                           names=[user_name])
1815
1816         tgt = self._modify_tgt(tgt, cname=cname)
1817
1818         self._user2user(tgt, creds, expected_error=0)
1819
1820     def test_user2user_tgt_other_cname(self):
1821         samdb = self.get_samdb()
1822
1823         other_name = self.get_new_username()
1824         upn = f'{other_name}@{samdb.domain_dns_name()}'
1825
1826         creds = self.get_cached_creds(
1827             account_type=self.AccountType.COMPUTER,
1828             opts={'upn': upn})
1829         tgt = self._get_tgt(creds)
1830
1831         cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1832                                           names=[other_name.encode('utf-8')])
1833
1834         tgt = self._modify_tgt(tgt, cname=cname)
1835
1836         self._user2user(tgt, creds, expected_error=0)
1837
1838     def test_user2user_tgt_cname_host(self):
1839         creds = self._get_creds()
1840         tgt = self._get_tgt(creds)
1841
1842         user_name = creds.get_username()
1843         user_name = user_name.encode('utf-8')
1844         cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1845                                           names=[b'host', user_name])
1846
1847         tgt = self._modify_tgt(tgt, cname=cname)
1848
1849         self._user2user(tgt, creds,
1850                         expected_error=(KDC_ERR_TGT_REVOKED,
1851                                         KDC_ERR_C_PRINCIPAL_UNKNOWN))
1852
1853     def test_user2user_non_existent_sname(self):
1854         creds = self._get_creds()
1855         tgt = self._get_tgt(creds)
1856
1857         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1858                                           names=['host', 'non_existent_user'])
1859
1860         self._user2user(tgt, creds, sname=sname,
1861                         expected_error=KDC_ERR_S_PRINCIPAL_UNKNOWN)
1862
1863     def test_user2user_no_sname(self):
1864         creds = self._get_creds()
1865         tgt = self._get_tgt(creds)
1866
1867         self._user2user(tgt, creds, sname=False,
1868                         expected_error=(KDC_ERR_GENERIC,
1869                                         KDC_ERR_S_PRINCIPAL_UNKNOWN))
1870
1871     def test_tgs_service_ticket(self):
1872         creds = self._get_creds()
1873         tgt = self._get_tgt(creds)
1874
1875         service_creds = self.get_service_creds()
1876         service_ticket = self.get_service_ticket(tgt, service_creds)
1877
1878         self._run_tgs(service_ticket, creds,
1879                       expected_error=(KDC_ERR_NOT_US, KDC_ERR_POLICY))
1880
1881     def test_renew_service_ticket(self):
1882         creds = self._get_creds()
1883         tgt = self._get_tgt(creds)
1884
1885         service_creds = self.get_service_creds()
1886         service_ticket = self.get_service_ticket(tgt, service_creds)
1887
1888         service_ticket = self.modified_ticket(
1889             service_ticket,
1890             modify_fn=self._modify_renewable,
1891             checksum_keys=self.get_krbtgt_checksum_key())
1892
1893         self._renew_tgt(service_ticket, creds,
1894                         expected_error=KDC_ERR_POLICY)
1895
1896     def test_validate_service_ticket(self):
1897         creds = self._get_creds()
1898         tgt = self._get_tgt(creds)
1899
1900         service_creds = self.get_service_creds()
1901         service_ticket = self.get_service_ticket(tgt, service_creds)
1902
1903         service_ticket = self.modified_ticket(
1904             service_ticket,
1905             modify_fn=self._modify_invalid,
1906             checksum_keys=self.get_krbtgt_checksum_key())
1907
1908         self._validate_tgt(service_ticket, creds,
1909                            expected_error=KDC_ERR_POLICY)
1910
1911     def test_s4u2self_service_ticket(self):
1912         creds = self._get_creds()
1913         tgt = self._get_tgt(creds)
1914
1915         service_creds = self.get_service_creds()
1916         service_ticket = self.get_service_ticket(tgt, service_creds)
1917
1918         self._s4u2self(service_ticket, creds,
1919                        expected_error=(KDC_ERR_NOT_US, KDC_ERR_POLICY))
1920
1921     def test_user2user_service_ticket(self):
1922         creds = self._get_creds()
1923         tgt = self._get_tgt(creds)
1924
1925         service_creds = self.get_service_creds()
1926         service_ticket = self.get_service_ticket(tgt, service_creds)
1927
1928         self._user2user(service_ticket, creds,
1929                         expected_error=(KDC_ERR_MODIFIED, KDC_ERR_POLICY))
1930
1931     # Expected to fail against Windows, which does not produce an error.
1932     def test_fast_service_ticket(self):
1933         creds = self._get_creds()
1934         tgt = self._get_tgt(creds)
1935
1936         service_creds = self.get_service_creds()
1937         service_ticket = self.get_service_ticket(tgt, service_creds)
1938
1939         self._fast(service_ticket, creds,
1940                    expected_error=(KDC_ERR_POLICY,
1941                                    KDC_ERR_S_PRINCIPAL_UNKNOWN))
1942
1943     def test_pac_attrs_none(self):
1944         creds = self._get_creds()
1945         self.get_tgt(creds, pac_request=None,
1946                      expect_pac=True,
1947                      expect_pac_attrs=True,
1948                      expect_pac_attrs_pac_request=None)
1949
1950     def test_pac_attrs_false(self):
1951         creds = self._get_creds()
1952         self.get_tgt(creds, pac_request=False,
1953                      expect_pac=True,
1954                      expect_pac_attrs=True,
1955                      expect_pac_attrs_pac_request=False)
1956
1957     def test_pac_attrs_true(self):
1958         creds = self._get_creds()
1959         self.get_tgt(creds, pac_request=True,
1960                      expect_pac=True,
1961                      expect_pac_attrs=True,
1962                      expect_pac_attrs_pac_request=True)
1963
1964     def test_pac_attrs_renew_none(self):
1965         creds = self._get_creds()
1966         tgt = self.get_tgt(creds, pac_request=None,
1967                            expect_pac=True,
1968                            expect_pac_attrs=True,
1969                            expect_pac_attrs_pac_request=None)
1970         tgt = self._modify_tgt(tgt, renewable=True)
1971
1972         self._renew_tgt(tgt, creds, expected_error=0,
1973                         expect_pac=True,
1974                         expect_pac_attrs=True,
1975                         expect_pac_attrs_pac_request=None,
1976                         expect_requester_sid=True)
1977
1978     def test_pac_attrs_renew_false(self):
1979         creds = self._get_creds()
1980         tgt = self.get_tgt(creds, pac_request=False,
1981                            expect_pac=True,
1982                            expect_pac_attrs=True,
1983                            expect_pac_attrs_pac_request=False)
1984         tgt = self._modify_tgt(tgt, renewable=True)
1985
1986         self._renew_tgt(tgt, creds, expected_error=0,
1987                         expect_pac=True,
1988                         expect_pac_attrs=True,
1989                         expect_pac_attrs_pac_request=False,
1990                         expect_requester_sid=True)
1991
1992     def test_pac_attrs_renew_true(self):
1993         creds = self._get_creds()
1994         tgt = self.get_tgt(creds, pac_request=True,
1995                            expect_pac=True,
1996                            expect_pac_attrs=True,
1997                            expect_pac_attrs_pac_request=True)
1998         tgt = self._modify_tgt(tgt, renewable=True)
1999
2000         self._renew_tgt(tgt, creds, expected_error=0,
2001                         expect_pac=True,
2002                         expect_pac_attrs=True,
2003                         expect_pac_attrs_pac_request=True,
2004                         expect_requester_sid=True)
2005
2006     def test_pac_attrs_rodc_renew_none(self):
2007         creds = self._get_creds(replication_allowed=True,
2008                                 revealed_to_rodc=True)
2009         tgt = self.get_tgt(creds, pac_request=None,
2010                            expect_pac=True,
2011                            expect_pac_attrs=True,
2012                            expect_pac_attrs_pac_request=None)
2013         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True)
2014
2015         self._renew_tgt(tgt, creds, expected_error=0,
2016                         expect_pac=True,
2017                         expect_pac_attrs=False,
2018                         expect_requester_sid=True)
2019
2020     def test_pac_attrs_rodc_renew_false(self):
2021         creds = self._get_creds(replication_allowed=True,
2022                                 revealed_to_rodc=True)
2023         tgt = self.get_tgt(creds, pac_request=False,
2024                            expect_pac=True,
2025                            expect_pac_attrs=True,
2026                            expect_pac_attrs_pac_request=False)
2027         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True)
2028
2029         self._renew_tgt(tgt, creds, expected_error=0,
2030                         expect_pac=True,
2031                         expect_pac_attrs=False,
2032                         expect_requester_sid=True)
2033
2034     def test_pac_attrs_rodc_renew_true(self):
2035         creds = self._get_creds(replication_allowed=True,
2036                                 revealed_to_rodc=True)
2037         tgt = self.get_tgt(creds, pac_request=True,
2038                            expect_pac=True,
2039                            expect_pac_attrs=True,
2040                            expect_pac_attrs_pac_request=True)
2041         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True)
2042
2043         self._renew_tgt(tgt, creds, expected_error=0,
2044                         expect_pac=True,
2045                         expect_pac_attrs=False,
2046                         expect_requester_sid=True)
2047
2048     def test_pac_attrs_missing_renew_none(self):
2049         creds = self._get_creds()
2050         tgt = self.get_tgt(creds, pac_request=None,
2051                            expect_pac=True,
2052                            expect_pac_attrs=True,
2053                            expect_pac_attrs_pac_request=None)
2054         tgt = self._modify_tgt(tgt, renewable=True,
2055                                remove_pac_attrs=True)
2056
2057         self._renew_tgt(tgt, creds, expected_error=0,
2058                         expect_pac=True,
2059                         expect_pac_attrs=False,
2060                         expect_requester_sid=True)
2061
2062     def test_pac_attrs_missing_renew_false(self):
2063         creds = self._get_creds()
2064         tgt = self.get_tgt(creds, pac_request=False,
2065                            expect_pac=True,
2066                            expect_pac_attrs=True,
2067                            expect_pac_attrs_pac_request=False)
2068         tgt = self._modify_tgt(tgt, renewable=True,
2069                                remove_pac_attrs=True)
2070
2071         self._renew_tgt(tgt, creds, expected_error=0,
2072                         expect_pac=True,
2073                         expect_pac_attrs=False,
2074                         expect_requester_sid=True)
2075
2076     def test_pac_attrs_missing_renew_true(self):
2077         creds = self._get_creds()
2078         tgt = self.get_tgt(creds, pac_request=True,
2079                            expect_pac=True,
2080                            expect_pac_attrs=True,
2081                            expect_pac_attrs_pac_request=True)
2082         tgt = self._modify_tgt(tgt, renewable=True,
2083                                remove_pac_attrs=True)
2084
2085         self._renew_tgt(tgt, creds, expected_error=0,
2086                         expect_pac=True,
2087                         expect_pac_attrs=False,
2088                         expect_requester_sid=True)
2089
2090     def test_pac_attrs_missing_rodc_renew_none(self):
2091         creds = self._get_creds(replication_allowed=True,
2092                                 revealed_to_rodc=True)
2093         tgt = self.get_tgt(creds, pac_request=None,
2094                            expect_pac=True,
2095                            expect_pac_attrs=True,
2096                            expect_pac_attrs_pac_request=None)
2097         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True,
2098                                remove_pac_attrs=True)
2099
2100         self._renew_tgt(tgt, creds, expected_error=0,
2101                         expect_pac=True,
2102                         expect_pac_attrs=False,
2103                         expect_requester_sid=True)
2104
2105     def test_pac_attrs_missing_rodc_renew_false(self):
2106         creds = self._get_creds(replication_allowed=True,
2107                                 revealed_to_rodc=True)
2108         tgt = self.get_tgt(creds, pac_request=False,
2109                            expect_pac=True,
2110                            expect_pac_attrs=True,
2111                            expect_pac_attrs_pac_request=False)
2112         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True,
2113                                remove_pac_attrs=True)
2114
2115         self._renew_tgt(tgt, creds, expected_error=0,
2116                         expect_pac=True,
2117                         expect_pac_attrs=False,
2118                         expect_requester_sid=True)
2119
2120     def test_pac_attrs_missing_rodc_renew_true(self):
2121         creds = self._get_creds(replication_allowed=True,
2122                                 revealed_to_rodc=True)
2123         tgt = self.get_tgt(creds, pac_request=True,
2124                            expect_pac=True,
2125                            expect_pac_attrs=True,
2126                            expect_pac_attrs_pac_request=True)
2127         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True,
2128                                remove_pac_attrs=True)
2129
2130         self._renew_tgt(tgt, creds, expected_error=0,
2131                         expect_pac=True,
2132                         expect_pac_attrs=False,
2133                         expect_requester_sid=True)
2134
2135     def test_tgs_pac_attrs_none(self):
2136         creds = self._get_creds()
2137         tgt = self.get_tgt(creds, pac_request=None,
2138                            expect_pac=True,
2139                            expect_pac_attrs=True,
2140                            expect_pac_attrs_pac_request=None)
2141
2142         self._run_tgs(tgt, creds, expected_error=0, expect_pac=True,
2143                       expect_pac_attrs=False)
2144
2145     def test_tgs_pac_attrs_false(self):
2146         creds = self._get_creds()
2147         tgt = self.get_tgt(creds, pac_request=False,
2148                            expect_pac=True,
2149                            expect_pac_attrs=True,
2150                            expect_pac_attrs_pac_request=False)
2151
2152         self._run_tgs(tgt, creds, expected_error=0, expect_pac=False,
2153                       expect_pac_attrs=False)
2154
2155     def test_tgs_pac_attrs_true(self):
2156         creds = self._get_creds()
2157         tgt = self.get_tgt(creds, pac_request=True,
2158                            expect_pac=True,
2159                            expect_pac_attrs=True,
2160                            expect_pac_attrs_pac_request=True)
2161
2162         self._run_tgs(tgt, creds, expected_error=0, expect_pac=True,
2163                       expect_pac_attrs=False)
2164
2165     def test_as_requester_sid(self):
2166         creds = self._get_creds()
2167
2168         sid = creds.get_sid()
2169
2170         self.get_tgt(creds, pac_request=None,
2171                      expect_pac=True,
2172                      expected_sid=sid,
2173                      expect_requester_sid=True)
2174
2175     def test_tgs_requester_sid(self):
2176         creds = self._get_creds()
2177
2178         sid = creds.get_sid()
2179
2180         tgt = self.get_tgt(creds, pac_request=None,
2181                            expect_pac=True,
2182                            expected_sid=sid,
2183                            expect_requester_sid=True)
2184
2185         self._run_tgs(tgt, creds, expected_error=0, expect_pac=True,
2186                       expect_requester_sid=False)
2187
2188     def test_tgs_requester_sid_renew(self):
2189         creds = self._get_creds()
2190
2191         sid = creds.get_sid()
2192
2193         tgt = self.get_tgt(creds, pac_request=None,
2194                            expect_pac=True,
2195                            expected_sid=sid,
2196                            expect_requester_sid=True)
2197         tgt = self._modify_tgt(tgt, renewable=True)
2198
2199         self._renew_tgt(tgt, creds, expected_error=0, expect_pac=True,
2200                         expect_pac_attrs=True,
2201                         expect_pac_attrs_pac_request=None,
2202                         expected_sid=sid,
2203                         expect_requester_sid=True)
2204
2205     def test_tgs_requester_sid_rodc_renew(self):
2206         creds = self._get_creds(replication_allowed=True,
2207                                 revealed_to_rodc=True)
2208
2209         sid = creds.get_sid()
2210
2211         tgt = self.get_tgt(creds, pac_request=None,
2212                            expect_pac=True,
2213                            expected_sid=sid,
2214                            expect_requester_sid=True)
2215         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True)
2216
2217         self._renew_tgt(tgt, creds, expected_error=0, expect_pac=True,
2218                         expect_pac_attrs=False,
2219                         expected_sid=sid,
2220                         expect_requester_sid=True)
2221
2222     def test_tgs_requester_sid_missing_renew(self):
2223         creds = self._get_creds()
2224
2225         sid = creds.get_sid()
2226
2227         tgt = self.get_tgt(creds, pac_request=None,
2228                            expect_pac=True,
2229                            expected_sid=sid,
2230                            expect_requester_sid=True)
2231         tgt = self._modify_tgt(tgt, renewable=True,
2232                                remove_requester_sid=True)
2233
2234         self._renew_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
2235
2236     def test_tgs_requester_sid_missing_rodc_renew(self):
2237         creds = self._get_creds(replication_allowed=True,
2238                                 revealed_to_rodc=True)
2239
2240         sid = creds.get_sid()
2241
2242         tgt = self.get_tgt(creds, pac_request=None,
2243                            expect_pac=True,
2244                            expected_sid=sid,
2245                            expect_requester_sid=True)
2246         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True,
2247                                remove_requester_sid=True)
2248
2249         self._renew_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
2250
2251     def test_tgs_requester_sid_validate(self):
2252         creds = self._get_creds()
2253
2254         sid = creds.get_sid()
2255
2256         tgt = self.get_tgt(creds, pac_request=None,
2257                            expect_pac=True,
2258                            expected_sid=sid,
2259                            expect_requester_sid=True)
2260         tgt = self._modify_tgt(tgt, invalid=True)
2261
2262         self._validate_tgt(tgt, creds, expected_error=0, expect_pac=True,
2263                            expect_pac_attrs=True,
2264                            expect_pac_attrs_pac_request=None,
2265                            expected_sid=sid,
2266                            expect_requester_sid=True)
2267
2268     def test_tgs_requester_sid_rodc_validate(self):
2269         creds = self._get_creds(replication_allowed=True,
2270                                 revealed_to_rodc=True)
2271
2272         sid = creds.get_sid()
2273
2274         tgt = self.get_tgt(creds, pac_request=None,
2275                            expect_pac=True,
2276                            expected_sid=sid,
2277                            expect_requester_sid=True)
2278         tgt = self._modify_tgt(tgt, from_rodc=True, invalid=True)
2279
2280         self._validate_tgt(tgt, creds, expected_error=0, expect_pac=True,
2281                            expect_pac_attrs=False,
2282                            expected_sid=sid,
2283                            expect_requester_sid=True)
2284
2285     def test_tgs_requester_sid_missing_validate(self):
2286         creds = self._get_creds()
2287
2288         sid = creds.get_sid()
2289
2290         tgt = self.get_tgt(creds, pac_request=None,
2291                            expect_pac=True,
2292                            expected_sid=sid,
2293                            expect_requester_sid=True)
2294         tgt = self._modify_tgt(tgt, invalid=True,
2295                                remove_requester_sid=True)
2296
2297         self._validate_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
2298
2299     def test_tgs_requester_sid_missing_rodc_validate(self):
2300         creds = self._get_creds(replication_allowed=True,
2301                                 revealed_to_rodc=True)
2302
2303         sid = creds.get_sid()
2304
2305         tgt = self.get_tgt(creds, pac_request=None,
2306                            expect_pac=True,
2307                            expected_sid=sid,
2308                            expect_requester_sid=True)
2309         tgt = self._modify_tgt(tgt, from_rodc=True, invalid=True,
2310                                remove_requester_sid=True)
2311
2312         self._validate_tgt(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
2313
2314     def test_tgs_pac_request_none(self):
2315         creds = self._get_creds()
2316         tgt = self.get_tgt(creds, pac_request=None)
2317
2318         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=True)
2319
2320         pac = self.get_ticket_pac(ticket)
2321         self.assertIsNotNone(pac)
2322
2323     def test_tgs_pac_request_false(self):
2324         creds = self._get_creds()
2325         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2326
2327         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=False)
2328
2329         pac = self.get_ticket_pac(ticket, expect_pac=False)
2330         self.assertIsNone(pac)
2331
2332     def test_tgs_pac_request_true(self):
2333         creds = self._get_creds()
2334         tgt = self.get_tgt(creds, pac_request=True)
2335
2336         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=True)
2337
2338         pac = self.get_ticket_pac(ticket)
2339         self.assertIsNotNone(pac)
2340
2341     def test_renew_pac_request_none(self):
2342         creds = self._get_creds()
2343         tgt = self.get_tgt(creds, pac_request=None)
2344         tgt = self._modify_tgt(tgt, renewable=True)
2345
2346         tgt = self._renew_tgt(tgt, creds, expected_error=0, expect_pac=None,
2347                               expect_pac_attrs=True,
2348                               expect_pac_attrs_pac_request=None,
2349                               expect_requester_sid=True)
2350
2351         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=True)
2352
2353         pac = self.get_ticket_pac(ticket)
2354         self.assertIsNotNone(pac)
2355
2356     def test_renew_pac_request_false(self):
2357         creds = self._get_creds()
2358         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2359         tgt = self._modify_tgt(tgt, renewable=True)
2360
2361         tgt = self._renew_tgt(tgt, creds, expected_error=0, expect_pac=None,
2362                               expect_pac_attrs=True,
2363                               expect_pac_attrs_pac_request=False,
2364                               expect_requester_sid=True)
2365
2366         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=False)
2367
2368         pac = self.get_ticket_pac(ticket, expect_pac=False)
2369         self.assertIsNone(pac)
2370
2371     def test_renew_pac_request_true(self):
2372         creds = self._get_creds()
2373         tgt = self.get_tgt(creds, pac_request=True)
2374         tgt = self._modify_tgt(tgt, renewable=True)
2375
2376         tgt = self._renew_tgt(tgt, creds, expected_error=0, expect_pac=None,
2377                               expect_pac_attrs=True,
2378                               expect_pac_attrs_pac_request=True,
2379                               expect_requester_sid=True)
2380
2381         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=True)
2382
2383         pac = self.get_ticket_pac(ticket)
2384         self.assertIsNotNone(pac)
2385
2386     def test_rodc_renew_pac_request_none(self):
2387         creds = self._get_creds(replication_allowed=True,
2388                                 revealed_to_rodc=True)
2389         tgt = self.get_tgt(creds, pac_request=None)
2390         tgt = self._modify_tgt(tgt, renewable=True, from_rodc=True)
2391
2392         tgt = self._renew_tgt(tgt, creds, expected_error=0, expect_pac=None,
2393                               expect_pac_attrs=False,
2394                               expect_requester_sid=True)
2395
2396         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=True)
2397
2398         pac = self.get_ticket_pac(ticket)
2399         self.assertIsNotNone(pac)
2400
2401     def test_rodc_renew_pac_request_false(self):
2402         creds = self._get_creds(replication_allowed=True,
2403                                 revealed_to_rodc=True)
2404         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2405         tgt = self._modify_tgt(tgt, renewable=True, from_rodc=True)
2406
2407         tgt = self._renew_tgt(tgt, creds, expected_error=0, expect_pac=None,
2408                               expect_pac_attrs=False,
2409                               expect_requester_sid=True)
2410
2411         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=True)
2412
2413         pac = self.get_ticket_pac(ticket)
2414         self.assertIsNotNone(pac)
2415
2416     def test_rodc_renew_pac_request_true(self):
2417         creds = self._get_creds(replication_allowed=True,
2418                                 revealed_to_rodc=True)
2419         tgt = self.get_tgt(creds, pac_request=True)
2420         tgt = self._modify_tgt(tgt, renewable=True, from_rodc=True)
2421
2422         tgt = self._renew_tgt(tgt, creds, expected_error=0, expect_pac=None,
2423                               expect_pac_attrs=False,
2424                               expect_requester_sid=True)
2425
2426         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=True)
2427
2428         pac = self.get_ticket_pac(ticket)
2429         self.assertIsNotNone(pac)
2430
2431     def test_validate_pac_request_none(self):
2432         creds = self._get_creds()
2433         tgt = self.get_tgt(creds, pac_request=None)
2434         tgt = self._modify_tgt(tgt, invalid=True)
2435
2436         tgt = self._validate_tgt(tgt, creds, expected_error=0, expect_pac=None,
2437                                  expect_pac_attrs=True,
2438                                  expect_pac_attrs_pac_request=None,
2439                                  expect_requester_sid=True)
2440
2441         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=True)
2442
2443         pac = self.get_ticket_pac(ticket)
2444         self.assertIsNotNone(pac)
2445
2446     def test_validate_pac_request_false(self):
2447         creds = self._get_creds()
2448         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2449         tgt = self._modify_tgt(tgt, invalid=True)
2450
2451         tgt = self._validate_tgt(tgt, creds, expected_error=0, expect_pac=None,
2452                                  expect_pac_attrs=True,
2453                                  expect_pac_attrs_pac_request=False,
2454                                  expect_requester_sid=True)
2455
2456         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=False)
2457
2458         pac = self.get_ticket_pac(ticket, expect_pac=False)
2459         self.assertIsNone(pac)
2460
2461     def test_validate_pac_request_true(self):
2462         creds = self._get_creds()
2463         tgt = self.get_tgt(creds, pac_request=True)
2464         tgt = self._modify_tgt(tgt, invalid=True)
2465
2466         tgt = self._validate_tgt(tgt, creds, expected_error=0, expect_pac=None,
2467                                  expect_pac_attrs=True,
2468                                  expect_pac_attrs_pac_request=True,
2469                                  expect_requester_sid=True)
2470
2471         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=True)
2472
2473         pac = self.get_ticket_pac(ticket)
2474         self.assertIsNotNone(pac)
2475
2476     def test_rodc_validate_pac_request_none(self):
2477         creds = self._get_creds(replication_allowed=True,
2478                                 revealed_to_rodc=True)
2479         tgt = self.get_tgt(creds, pac_request=None)
2480         tgt = self._modify_tgt(tgt, invalid=True, from_rodc=True)
2481
2482         tgt = self._validate_tgt(tgt, creds, expected_error=0, expect_pac=None,
2483                                  expect_pac_attrs=False,
2484                                  expect_requester_sid=True)
2485
2486         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=True)
2487
2488         pac = self.get_ticket_pac(ticket)
2489         self.assertIsNotNone(pac)
2490
2491     def test_rodc_validate_pac_request_false(self):
2492         creds = self._get_creds(replication_allowed=True,
2493                                 revealed_to_rodc=True)
2494         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2495         tgt = self._modify_tgt(tgt, invalid=True, from_rodc=True)
2496
2497         tgt = self._validate_tgt(tgt, creds, expected_error=0, expect_pac=None,
2498                                  expect_pac_attrs=False,
2499                                  expect_requester_sid=True)
2500
2501         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=True)
2502
2503         pac = self.get_ticket_pac(ticket)
2504         self.assertIsNotNone(pac)
2505
2506     def test_rodc_validate_pac_request_true(self):
2507         creds = self._get_creds(replication_allowed=True,
2508                                 revealed_to_rodc=True)
2509         tgt = self.get_tgt(creds, pac_request=True)
2510         tgt = self._modify_tgt(tgt, invalid=True, from_rodc=True)
2511
2512         tgt = self._validate_tgt(tgt, creds, expected_error=0, expect_pac=None,
2513                                  expect_pac_attrs=False,
2514                                  expect_requester_sid=True)
2515
2516         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=True)
2517
2518         pac = self.get_ticket_pac(ticket)
2519         self.assertIsNotNone(pac)
2520
2521     def test_s4u2self_pac_request_none(self):
2522         creds = self._get_creds()
2523         tgt = self.get_tgt(creds, pac_request=None)
2524
2525         ticket = self._s4u2self(tgt, creds, expected_error=0, expect_pac=True)
2526
2527         pac = self.get_ticket_pac(ticket)
2528         self.assertIsNotNone(pac)
2529
2530     def test_s4u2self_pac_request_false(self):
2531         creds = self._get_creds()
2532         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2533
2534         ticket = self._s4u2self(tgt, creds, expected_error=0, expect_pac=True)
2535
2536         pac = self.get_ticket_pac(ticket)
2537         self.assertIsNotNone(pac)
2538
2539     def test_s4u2self_pac_request_true(self):
2540         creds = self._get_creds()
2541         tgt = self.get_tgt(creds, pac_request=True)
2542
2543         ticket = self._s4u2self(tgt, creds, expected_error=0, expect_pac=True)
2544
2545         pac = self.get_ticket_pac(ticket)
2546         self.assertIsNotNone(pac)
2547
2548     def test_user2user_pac_request_none(self):
2549         creds = self._get_creds()
2550         tgt = self.get_tgt(creds, pac_request=None)
2551
2552         ticket = self._user2user(tgt, creds, expected_error=0, expect_pac=True)
2553
2554         pac = self.get_ticket_pac(ticket)
2555         self.assertIsNotNone(pac)
2556
2557     def test_user2user_pac_request_false(self):
2558         creds = self._get_creds()
2559         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2560
2561         ticket = self._user2user(tgt, creds, expected_error=0,
2562                                  expect_pac=True)
2563
2564         pac = self.get_ticket_pac(ticket, expect_pac=True)
2565         self.assertIsNotNone(pac)
2566
2567     def test_user2user_pac_request_true(self):
2568         creds = self._get_creds()
2569         tgt = self.get_tgt(creds, pac_request=True)
2570
2571         ticket = self._user2user(tgt, creds, expected_error=0, expect_pac=True)
2572
2573         pac = self.get_ticket_pac(ticket)
2574         self.assertIsNotNone(pac)
2575
2576     def test_user2user_user_pac_request_none(self):
2577         creds = self._get_creds()
2578         tgt = self.get_tgt(creds)
2579
2580         user_creds = self._get_mach_creds()
2581         user_tgt = self.get_tgt(user_creds, pac_request=None)
2582
2583         ticket = self._user2user(tgt, creds, expected_error=0,
2584                                  user_tgt=user_tgt, user_creds=user_creds,
2585                                  expect_pac=True)
2586
2587         pac = self.get_ticket_pac(ticket)
2588         self.assertIsNotNone(pac)
2589
2590     def test_user2user_user_pac_request_false(self):
2591         creds = self._get_creds()
2592         tgt = self.get_tgt(creds)
2593
2594         user_creds = self._get_mach_creds()
2595         user_tgt = self.get_tgt(user_creds, pac_request=False, expect_pac=None)
2596
2597         ticket = self._user2user(tgt, creds, expected_error=0,
2598                                  user_tgt=user_tgt, user_creds=user_creds,
2599                                  expect_pac=False)
2600
2601         pac = self.get_ticket_pac(ticket, expect_pac=False)
2602         self.assertIsNone(pac)
2603
2604     def test_user2user_user_pac_request_true(self):
2605         creds = self._get_creds()
2606         tgt = self.get_tgt(creds)
2607
2608         user_creds = self._get_mach_creds()
2609         user_tgt = self.get_tgt(user_creds, pac_request=True)
2610
2611         ticket = self._user2user(tgt, creds, expected_error=0,
2612                                  user_tgt=user_tgt, user_creds=user_creds,
2613                                  expect_pac=True)
2614
2615         pac = self.get_ticket_pac(ticket)
2616         self.assertIsNotNone(pac)
2617
2618     def test_fast_pac_request_none(self):
2619         creds = self._get_creds()
2620         tgt = self.get_tgt(creds, pac_request=None)
2621
2622         ticket = self._fast(tgt, creds, expected_error=0, expect_pac=True)
2623
2624         pac = self.get_ticket_pac(ticket)
2625         self.assertIsNotNone(pac)
2626
2627     def test_fast_pac_request_false(self):
2628         creds = self._get_creds()
2629         tgt = self.get_tgt(creds, pac_request=False)
2630
2631         ticket = self._fast(tgt, creds, expected_error=0,
2632                             expect_pac=True)
2633
2634         pac = self.get_ticket_pac(ticket, expect_pac=True)
2635         self.assertIsNotNone(pac)
2636
2637     def test_fast_pac_request_true(self):
2638         creds = self._get_creds()
2639         tgt = self.get_tgt(creds, pac_request=True)
2640
2641         ticket = self._fast(tgt, creds, expected_error=0, expect_pac=True)
2642
2643         pac = self.get_ticket_pac(ticket)
2644         self.assertIsNotNone(pac)
2645
2646     def test_tgs_rodc_pac_request_none(self):
2647         creds = self._get_creds(replication_allowed=True,
2648                                 revealed_to_rodc=True)
2649         tgt = self.get_tgt(creds, pac_request=None)
2650         tgt = self._modify_tgt(tgt, from_rodc=True)
2651
2652         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=True)
2653
2654         pac = self.get_ticket_pac(ticket)
2655         self.assertIsNotNone(pac)
2656
2657     def test_tgs_rodc_pac_request_false(self):
2658         creds = self._get_creds(replication_allowed=True,
2659                                 revealed_to_rodc=True)
2660         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2661         tgt = self._modify_tgt(tgt, from_rodc=True)
2662
2663         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=True)
2664
2665         pac = self.get_ticket_pac(ticket)
2666         self.assertIsNotNone(pac)
2667
2668     def test_tgs_rodc_pac_request_true(self):
2669         creds = self._get_creds(replication_allowed=True,
2670                                 revealed_to_rodc=True)
2671         tgt = self.get_tgt(creds, pac_request=True)
2672         tgt = self._modify_tgt(tgt, from_rodc=True)
2673
2674         ticket = self._run_tgs(tgt, creds, expected_error=0, expect_pac=True)
2675
2676         pac = self.get_ticket_pac(ticket)
2677         self.assertIsNotNone(pac)
2678
2679     def test_tgs_rename(self):
2680         creds = self.get_cached_creds(account_type=self.AccountType.USER,
2681                                       use_cache=False)
2682         tgt = self.get_tgt(creds)
2683
2684         # Rename the account.
2685         new_name = self.get_new_username()
2686
2687         samdb = self.get_samdb()
2688         msg = ldb.Message(creds.get_dn())
2689         msg['sAMAccountName'] = ldb.MessageElement(new_name,
2690                                                    ldb.FLAG_MOD_REPLACE,
2691                                                    'sAMAccountName')
2692         samdb.modify(msg)
2693
2694         self._run_tgs(tgt, creds, expected_error=(KDC_ERR_TGT_REVOKED,
2695                                                   KDC_ERR_C_PRINCIPAL_UNKNOWN))
2696
2697     # Test making a TGS request for a ticket expiring post-2038.
2698     def test_tgs_req_future_till(self):
2699         creds = self._get_creds()
2700         tgt = self._get_tgt(creds)
2701
2702         target_creds = self.get_service_creds()
2703         self._tgs_req(
2704             tgt=tgt,
2705             expected_error=0,
2706             creds=creds,
2707             target_creds=target_creds,
2708             till='99990913024805Z')
2709
2710     def _modify_renewable(self, enc_part):
2711         # Set the renewable flag.
2712         enc_part = self.modify_ticket_flag(enc_part, 'renewable', value=True)
2713
2714         # Set the renew-till time to be in the future.
2715         renew_till = self.get_KerberosTime(offset=100 * 60 * 60)
2716         enc_part['renew-till'] = renew_till
2717
2718         return enc_part
2719
2720     def _modify_invalid(self, enc_part):
2721         # Set the invalid flag.
2722         enc_part = self.modify_ticket_flag(enc_part, 'invalid', value=True)
2723
2724         # Set the ticket start time to be in the past.
2725         past_time = self.get_KerberosTime(offset=-100 * 60 * 60)
2726         enc_part['starttime'] = past_time
2727
2728         return enc_part
2729
2730     def _get_tgt(self,
2731                  client_creds,
2732                  renewable=False,
2733                  invalid=False,
2734                  from_rodc=False,
2735                  new_rid=None,
2736                  remove_pac=False,
2737                  allow_empty_authdata=False,
2738                  can_modify_logon_info=True,
2739                  can_modify_requester_sid=True,
2740                  remove_pac_attrs=False,
2741                  remove_requester_sid=False,
2742                  etype=None,
2743                  cksum_etype=None):
2744         self.assertFalse(renewable and invalid)
2745
2746         if remove_pac:
2747             self.assertIsNone(new_rid)
2748
2749         tgt = self.get_tgt(client_creds)
2750
2751         return self._modify_tgt(
2752             tgt=tgt,
2753             renewable=renewable,
2754             invalid=invalid,
2755             from_rodc=from_rodc,
2756             new_rid=new_rid,
2757             remove_pac=remove_pac,
2758             allow_empty_authdata=allow_empty_authdata,
2759             can_modify_logon_info=can_modify_logon_info,
2760             can_modify_requester_sid=can_modify_requester_sid,
2761             remove_pac_attrs=remove_pac_attrs,
2762             remove_requester_sid=remove_requester_sid,
2763             etype=etype,
2764             cksum_etype=cksum_etype)
2765
2766     def _modify_tgt(self,
2767                     tgt,
2768                     renewable=False,
2769                     invalid=False,
2770                     from_rodc=False,
2771                     new_rid=None,
2772                     remove_pac=False,
2773                     allow_empty_authdata=False,
2774                     cname=None,
2775                     crealm=None,
2776                     can_modify_logon_info=True,
2777                     can_modify_requester_sid=True,
2778                     remove_pac_attrs=False,
2779                     remove_requester_sid=False,
2780                     etype=None,
2781                     cksum_etype=None):
2782         if from_rodc:
2783             krbtgt_creds = self.get_mock_rodc_krbtgt_creds()
2784         else:
2785             krbtgt_creds = self.get_krbtgt_creds()
2786
2787         if new_rid is not None or remove_requester_sid or remove_pac_attrs:
2788             def change_sid_fn(pac):
2789                 pac_buffers = pac.buffers
2790                 for pac_buffer in pac_buffers:
2791                     if pac_buffer.type == krb5pac.PAC_TYPE_LOGON_INFO:
2792                         if new_rid is not None and can_modify_logon_info:
2793                             logon_info = pac_buffer.info.info
2794
2795                             logon_info.info3.base.rid = new_rid
2796                     elif pac_buffer.type == krb5pac.PAC_TYPE_REQUESTER_SID:
2797                         if remove_requester_sid:
2798                             pac.num_buffers -= 1
2799                             pac_buffers.remove(pac_buffer)
2800                         elif new_rid is not None and can_modify_requester_sid:
2801                             requester_sid = pac_buffer.info
2802
2803                             samdb = self.get_samdb()
2804                             domain_sid = samdb.get_domain_sid()
2805
2806                             new_sid = f'{domain_sid}-{new_rid}'
2807
2808                             requester_sid.sid = security.dom_sid(new_sid)
2809                     elif pac_buffer.type == krb5pac.PAC_TYPE_ATTRIBUTES_INFO:
2810                         if remove_pac_attrs:
2811                             pac.num_buffers -= 1
2812                             pac_buffers.remove(pac_buffer)
2813
2814                 pac.buffers = pac_buffers
2815
2816                 return pac
2817         else:
2818             change_sid_fn = None
2819
2820         krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds,
2821                                                          etype)
2822
2823         if remove_pac:
2824             checksum_keys = None
2825         else:
2826             if etype == cksum_etype:
2827                 cksum_key = krbtgt_key
2828             else:
2829                 cksum_key = self.TicketDecryptionKey_from_creds(krbtgt_creds,
2830                                                                 cksum_etype)
2831             checksum_keys = {
2832                 krb5pac.PAC_TYPE_KDC_CHECKSUM: cksum_key
2833             }
2834
2835         if renewable:
2836             flags_modify_fn = self._modify_renewable
2837         elif invalid:
2838             flags_modify_fn = self._modify_invalid
2839         else:
2840             flags_modify_fn = None
2841
2842         if cname is not None or crealm is not None:
2843             def modify_fn(enc_part):
2844                 if flags_modify_fn is not None:
2845                     enc_part = flags_modify_fn(enc_part)
2846
2847                 if cname is not None:
2848                     enc_part['cname'] = cname
2849
2850                 if crealm is not None:
2851                     enc_part['crealm'] = crealm
2852
2853                 return enc_part
2854         else:
2855             modify_fn = flags_modify_fn
2856
2857         if cname is not None:
2858             def modify_pac_fn(pac):
2859                 if change_sid_fn is not None:
2860                     pac = change_sid_fn(pac)
2861
2862                 for pac_buffer in pac.buffers:
2863                     if pac_buffer.type == krb5pac.PAC_TYPE_LOGON_NAME:
2864                         logon_info = pac_buffer.info
2865
2866                         logon_info.account_name = (
2867                             cname['name-string'][0].decode('utf-8'))
2868
2869                 return pac
2870         else:
2871             modify_pac_fn = change_sid_fn
2872
2873         return self.modified_ticket(
2874             tgt,
2875             new_ticket_key=krbtgt_key,
2876             modify_fn=modify_fn,
2877             modify_pac_fn=modify_pac_fn,
2878             exclude_pac=remove_pac,
2879             allow_empty_authdata=allow_empty_authdata,
2880             update_pac_checksums=not remove_pac,
2881             checksum_keys=checksum_keys)
2882
2883     def _remove_rodc_partial_secrets(self):
2884         samdb = self.get_samdb()
2885
2886         rodc_ctx = self.get_mock_rodc_ctx()
2887         rodc_dn = ldb.Dn(samdb, rodc_ctx.acct_dn)
2888
2889         def add_rodc_partial_secrets():
2890             msg = ldb.Message()
2891             msg.dn = rodc_dn
2892             msg['userAccountControl'] = ldb.MessageElement(
2893                 str(rodc_ctx.userAccountControl),
2894                 ldb.FLAG_MOD_REPLACE,
2895                 'userAccountControl')
2896             samdb.modify(msg)
2897
2898         self.addCleanup(add_rodc_partial_secrets)
2899
2900         uac = rodc_ctx.userAccountControl & ~dsdb.UF_PARTIAL_SECRETS_ACCOUNT
2901
2902         msg = ldb.Message()
2903         msg.dn = rodc_dn
2904         msg['userAccountControl'] = ldb.MessageElement(
2905             str(uac),
2906             ldb.FLAG_MOD_REPLACE,
2907             'userAccountControl')
2908         samdb.modify(msg)
2909
2910     def _remove_rodc_krbtgt_link(self):
2911         samdb = self.get_samdb()
2912
2913         rodc_ctx = self.get_mock_rodc_ctx()
2914         rodc_dn = ldb.Dn(samdb, rodc_ctx.acct_dn)
2915
2916         def add_rodc_krbtgt_link():
2917             msg = ldb.Message()
2918             msg.dn = rodc_dn
2919             msg['msDS-KrbTgtLink'] = ldb.MessageElement(
2920                 rodc_ctx.new_krbtgt_dn,
2921                 ldb.FLAG_MOD_ADD,
2922                 'msDS-KrbTgtLink')
2923             samdb.modify(msg)
2924
2925         self.addCleanup(add_rodc_krbtgt_link)
2926
2927         msg = ldb.Message()
2928         msg.dn = rodc_dn
2929         msg['msDS-KrbTgtLink'] = ldb.MessageElement(
2930             [],
2931             ldb.FLAG_MOD_DELETE,
2932             'msDS-KrbTgtLink')
2933         samdb.modify(msg)
2934
2935     def _get_creds(self,
2936                    replication_allowed=False,
2937                    replication_denied=False,
2938                    revealed_to_rodc=False):
2939         return self.get_cached_creds(
2940             account_type=self.AccountType.COMPUTER,
2941             opts={
2942                 'allowed_replication_mock': replication_allowed,
2943                 'denied_replication_mock': replication_denied,
2944                 'revealed_to_mock_rodc': revealed_to_rodc,
2945                 'id': 0
2946             })
2947
2948     def _get_existing_rid(self,
2949                           replication_allowed=False,
2950                           replication_denied=False,
2951                           revealed_to_rodc=False):
2952         other_creds = self.get_cached_creds(
2953             account_type=self.AccountType.COMPUTER,
2954             opts={
2955                 'allowed_replication_mock': replication_allowed,
2956                 'denied_replication_mock': replication_denied,
2957                 'revealed_to_mock_rodc': revealed_to_rodc,
2958                 'id': 1
2959             })
2960
2961         other_sid = other_creds.get_sid()
2962         other_rid = int(other_sid.rsplit('-', 1)[1])
2963
2964         return other_rid
2965
2966     def _get_mach_creds(self):
2967         return self.get_cached_creds(
2968             account_type=self.AccountType.COMPUTER,
2969             opts={
2970                 'allowed_replication_mock': True,
2971                 'denied_replication_mock': False,
2972                 'revealed_to_mock_rodc': True,
2973                 'id': 2
2974             })
2975
2976     def _get_non_existent_rid(self):
2977         return (1 << 30) - 1
2978
2979     def _run_tgs(self, tgt, creds, expected_error, *, expect_pac=True,
2980                  expect_pac_attrs=None, expect_pac_attrs_pac_request=None,
2981                  expect_requester_sid=None, expected_sid=None,
2982                  expect_edata=False, expect_status=None, expected_status=None):
2983         target_creds = self.get_service_creds()
2984         return self._tgs_req(
2985             tgt, expected_error, creds, target_creds,
2986             expect_pac=expect_pac,
2987             expect_pac_attrs=expect_pac_attrs,
2988             expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
2989             expect_requester_sid=expect_requester_sid,
2990             expected_sid=expected_sid,
2991             expect_edata=expect_edata,
2992             expect_status=expect_status,
2993             expected_status=expected_status)
2994
2995     # These tests fail against Windows, which does not implement ticket
2996     # renewal.
2997     def _renew_tgt(self, tgt, creds, expected_error, *, expect_pac=True,
2998                    expect_pac_attrs=None, expect_pac_attrs_pac_request=None,
2999                    expect_requester_sid=None, expected_sid=None):
3000         krbtgt_creds = self.get_krbtgt_creds()
3001         kdc_options = str(krb5_asn1.KDCOptions('renew'))
3002         return self._tgs_req(
3003             tgt, expected_error, creds, krbtgt_creds,
3004             kdc_options=kdc_options,
3005             expect_pac=expect_pac,
3006             expect_pac_attrs=expect_pac_attrs,
3007             expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
3008             expect_requester_sid=expect_requester_sid,
3009             expected_sid=expected_sid)
3010
3011     # These tests fail against Windows, which does not implement ticket
3012     # validation.
3013     def _validate_tgt(self, tgt, creds, expected_error, *, expect_pac=True,
3014                       expect_pac_attrs=None,
3015                       expect_pac_attrs_pac_request=None,
3016                       expect_requester_sid=None,
3017                       expected_sid=None):
3018         krbtgt_creds = self.get_krbtgt_creds()
3019         kdc_options = str(krb5_asn1.KDCOptions('validate'))
3020         return self._tgs_req(
3021             tgt, expected_error, creds, krbtgt_creds,
3022             kdc_options=kdc_options,
3023             expect_pac=expect_pac,
3024             expect_pac_attrs=expect_pac_attrs,
3025             expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
3026             expect_requester_sid=expect_requester_sid,
3027             expected_sid=expected_sid)
3028
3029     def _s4u2self(self, tgt, tgt_creds, expected_error, *, expect_pac=True,
3030                   expect_edata=False, expect_status=None,
3031                   expected_status=None):
3032         user_creds = self._get_mach_creds()
3033
3034         user_name = user_creds.get_username()
3035         user_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
3036                                                names=[user_name])
3037         user_realm = user_creds.get_realm()
3038
3039         def generate_s4u2self_padata(_kdc_exchange_dict,
3040                                      _callback_dict,
3041                                      req_body):
3042             padata = self.PA_S4U2Self_create(
3043                 name=user_cname,
3044                 realm=user_realm,
3045                 tgt_session_key=tgt.session_key,
3046                 ctype=None)
3047
3048             return [padata], req_body
3049
3050         return self._tgs_req(tgt, expected_error, tgt_creds, tgt_creds,
3051                              expected_cname=user_cname,
3052                              generate_padata_fn=generate_s4u2self_padata,
3053                              expect_edata=expect_edata,
3054                              expect_status=expect_status,
3055                              expected_status=expected_status,
3056                              expect_pac=expect_pac)
3057
3058     def _user2user(self, tgt, tgt_creds, expected_error, *,
3059                    sname=None,
3060                    srealm=None, user_tgt=None, user_creds=None,
3061                    expect_pac=True, expected_status=None):
3062         if user_tgt is None:
3063             user_creds = self._get_mach_creds()
3064             user_tgt = self.get_tgt(user_creds)
3065         else:
3066             self.assertIsNotNone(user_creds,
3067                                  'if supplying user_tgt, user_creds should be '
3068                                  'supplied also')
3069
3070         kdc_options = str(krb5_asn1.KDCOptions('enc-tkt-in-skey'))
3071         return self._tgs_req(user_tgt, expected_error, user_creds, tgt_creds,
3072                              kdc_options=kdc_options,
3073                              additional_ticket=tgt,
3074                              sname=sname,
3075                              srealm=srealm,
3076                              expect_pac=expect_pac,
3077                              expected_status=expected_status)
3078
3079     def _fast(self, armor_tgt, armor_tgt_creds, expected_error,
3080               expected_sname=None, expect_pac=True, expect_edata=False):
3081         user_creds = self._get_mach_creds()
3082         user_tgt = self.get_tgt(user_creds)
3083
3084         target_creds = self.get_service_creds()
3085
3086         return self._tgs_req(user_tgt, expected_error,
3087                              user_creds, target_creds,
3088                              armor_tgt=armor_tgt,
3089                              expected_sname=expected_sname,
3090                              expect_pac=expect_pac,
3091                              expect_edata=expect_edata)
3092
3093
3094 if __name__ == "__main__":
3095     global_asn1_print = False
3096     global_hexdump = False
3097     import unittest
3098     unittest.main()