tests/krb5: Add a test for S4U2Self with no authorization data required
[samba.git] / python / samba / tests / krb5 / s4u_tests.py
1 #!/usr/bin/env python3
2 # Unix SMB/CIFS implementation.
3 # Copyright (C) Stefan Metzmacher 2020
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18
19 import sys
20 import os
21 import functools
22
23 sys.path.insert(0, "bin/python")
24 os.environ["PYTHONUNBUFFERED"] = "1"
25
26 from samba import ntstatus
27 from samba.dcerpc import krb5pac, lsa
28
29 from samba.tests import env_get_var_value
30 from samba.tests.krb5.kcrypto import Cksumtype, Enctype
31 from samba.tests.krb5.kdc_base_test import KDCBaseTest
32 from samba.tests.krb5.raw_testcase import (
33     RodcPacEncryptionKey,
34     ZeroedChecksumKey
35 )
36 from samba.tests.krb5.rfc4120_constants import (
37     AES256_CTS_HMAC_SHA1_96,
38     ARCFOUR_HMAC_MD5,
39     KDC_ERR_BADMATCH,
40     KDC_ERR_BADOPTION,
41     KDC_ERR_BAD_INTEGRITY,
42     KDC_ERR_GENERIC,
43     KDC_ERR_INAPP_CKSUM,
44     KDC_ERR_MODIFIED,
45     KDC_ERR_SUMTYPE_NOSUPP,
46     KDC_ERR_TGT_REVOKED,
47     KU_PA_ENC_TIMESTAMP,
48     KU_AS_REP_ENC_PART,
49     KU_TGS_REP_ENC_PART_SUB_KEY,
50     NT_PRINCIPAL
51 )
52 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
53
54 global_asn1_print = False
55 global_hexdump = False
56
57
58 class S4UKerberosTests(KDCBaseTest):
59
60     def setUp(self):
61         super(S4UKerberosTests, self).setUp()
62         self.do_asn1_print = global_asn1_print
63         self.do_hexdump = global_hexdump
64
65     def _test_s4u2self(self, pa_s4u2self_ctype=None):
66         service_creds = self.get_service_creds()
67         service = service_creds.get_username()
68         realm = service_creds.get_realm()
69
70         cname = self.PrincipalName_create(name_type=1, names=[service])
71         sname = self.PrincipalName_create(name_type=2, names=["krbtgt", realm])
72
73         till = self.get_KerberosTime(offset=36000)
74
75         kdc_options = krb5_asn1.KDCOptions('forwardable')
76         padata = None
77
78         etypes = (18, 17, 23)
79
80         req = self.AS_REQ_create(padata=padata,
81                                  kdc_options=str(kdc_options),
82                                  cname=cname,
83                                  realm=realm,
84                                  sname=sname,
85                                  from_time=None,
86                                  till_time=till,
87                                  renew_time=None,
88                                  nonce=0x7fffffff,
89                                  etypes=etypes,
90                                  addresses=None,
91                                  additional_tickets=None)
92         rep = self.send_recv_transaction(req)
93         self.assertIsNotNone(rep)
94
95         self.assertEqual(rep['msg-type'], 30)
96         self.assertEqual(rep['error-code'], 25)
97         rep_padata = self.der_decode(
98             rep['e-data'], asn1Spec=krb5_asn1.METHOD_DATA())
99
100         for pa in rep_padata:
101             if pa['padata-type'] == 19:
102                 etype_info2 = pa['padata-value']
103                 break
104
105         etype_info2 = self.der_decode(
106             etype_info2, asn1Spec=krb5_asn1.ETYPE_INFO2())
107
108         key = self.PasswordKey_from_etype_info2(service_creds, etype_info2[0])
109
110         (patime, pausec) = self.get_KerberosTimeWithUsec()
111         pa_ts = self.PA_ENC_TS_ENC_create(patime, pausec)
112         pa_ts = self.der_encode(pa_ts, asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
113
114         pa_ts = self.EncryptedData_create(key, KU_PA_ENC_TIMESTAMP, pa_ts)
115         pa_ts = self.der_encode(pa_ts, asn1Spec=krb5_asn1.EncryptedData())
116
117         pa_ts = self.PA_DATA_create(2, pa_ts)
118
119         kdc_options = krb5_asn1.KDCOptions('forwardable')
120         padata = [pa_ts]
121
122         req = self.AS_REQ_create(padata=padata,
123                                  kdc_options=str(kdc_options),
124                                  cname=cname,
125                                  realm=realm,
126                                  sname=sname,
127                                  from_time=None,
128                                  till_time=till,
129                                  renew_time=None,
130                                  nonce=0x7fffffff,
131                                  etypes=etypes,
132                                  addresses=None,
133                                  additional_tickets=None)
134         rep = self.send_recv_transaction(req)
135         self.assertIsNotNone(rep)
136
137         msg_type = rep['msg-type']
138         self.assertEqual(msg_type, 11)
139
140         enc_part2 = key.decrypt(KU_AS_REP_ENC_PART, rep['enc-part']['cipher'])
141         # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
142         # application tag 26
143         try:
144             enc_part2 = self.der_decode(
145                 enc_part2, asn1Spec=krb5_asn1.EncASRepPart())
146         except Exception:
147             enc_part2 = self.der_decode(
148                 enc_part2, asn1Spec=krb5_asn1.EncTGSRepPart())
149
150         # S4U2Self Request
151         sname = cname
152
153         for_user_name = env_get_var_value('FOR_USER')
154         uname = self.PrincipalName_create(name_type=1, names=[for_user_name])
155
156         kdc_options = krb5_asn1.KDCOptions('forwardable')
157         till = self.get_KerberosTime(offset=36000)
158         ticket = rep['ticket']
159         ticket_session_key = self.EncryptionKey_import(enc_part2['key'])
160         pa_s4u = self.PA_S4U2Self_create(name=uname, realm=realm,
161                                          tgt_session_key=ticket_session_key,
162                                          ctype=pa_s4u2self_ctype)
163         padata = [pa_s4u]
164
165         subkey = self.RandomKey(ticket_session_key.etype)
166
167         (ctime, cusec) = self.get_KerberosTimeWithUsec()
168
169         req = self.TGS_REQ_create(padata=padata,
170                                   cusec=cusec,
171                                   ctime=ctime,
172                                   ticket=ticket,
173                                   kdc_options=str(kdc_options),
174                                   cname=cname,
175                                   realm=realm,
176                                   sname=sname,
177                                   from_time=None,
178                                   till_time=till,
179                                   renew_time=None,
180                                   nonce=0x7ffffffe,
181                                   etypes=etypes,
182                                   addresses=None,
183                                   EncAuthorizationData=None,
184                                   EncAuthorizationData_key=None,
185                                   additional_tickets=None,
186                                   ticket_session_key=ticket_session_key,
187                                   authenticator_subkey=subkey)
188         rep = self.send_recv_transaction(req)
189         self.assertIsNotNone(rep)
190
191         msg_type = rep['msg-type']
192         if msg_type == 13:
193             enc_part2 = subkey.decrypt(
194                 KU_TGS_REP_ENC_PART_SUB_KEY, rep['enc-part']['cipher'])
195             enc_part2 = self.der_decode(
196                 enc_part2, asn1Spec=krb5_asn1.EncTGSRepPart())
197
198         return msg_type
199
200     # Using the checksum type from the tgt_session_key happens to work
201     # everywhere
202     def test_s4u2self(self):
203         msg_type = self._test_s4u2self()
204         self.assertEqual(msg_type, 13)
205
206     # Per spec, the checksum of PA-FOR-USER is HMAC_MD5, see [MS-SFU] 2.2.1
207     def test_s4u2self_hmac_md5_checksum(self):
208         msg_type = self._test_s4u2self(pa_s4u2self_ctype=Cksumtype.HMAC_MD5)
209         self.assertEqual(msg_type, 13)
210
211     def test_s4u2self_md5_unkeyed_checksum(self):
212         msg_type = self._test_s4u2self(pa_s4u2self_ctype=Cksumtype.MD5)
213         self.assertEqual(msg_type, 30)
214
215     def test_s4u2self_sha1_unkeyed_checksum(self):
216         msg_type = self._test_s4u2self(pa_s4u2self_ctype=Cksumtype.SHA1)
217         self.assertEqual(msg_type, 30)
218
219     def test_s4u2self_crc32_unkeyed_checksum(self):
220         msg_type = self._test_s4u2self(pa_s4u2self_ctype=Cksumtype.CRC32)
221         self.assertEqual(msg_type, 30)
222
223     def _run_s4u2self_test(self, kdc_dict):
224         client_opts = kdc_dict.pop('client_opts', None)
225         client_creds = self.get_cached_creds(
226             account_type=self.AccountType.USER,
227             opts=client_opts)
228
229         service_opts = kdc_dict.pop('service_opts', None)
230         service_creds = self.get_cached_creds(
231             account_type=self.AccountType.COMPUTER,
232             opts=service_opts)
233
234         service_tgt = self.get_tgt(service_creds)
235         modify_service_tgt_fn = kdc_dict.pop('modify_service_tgt_fn', None)
236         if modify_service_tgt_fn is not None:
237             service_tgt = modify_service_tgt_fn(service_tgt)
238
239         client_name = client_creds.get_username()
240         client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
241                                                  names=[client_name])
242
243         samdb = self.get_samdb()
244         client_dn = client_creds.get_dn()
245         sid = self.get_objectSid(samdb, client_dn)
246
247         service_name = kdc_dict.pop('service_name', None)
248         if service_name is None:
249             service_name = service_creds.get_username()[:-1]
250         service_sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
251                                                   names=['host', service_name])
252
253         realm = client_creds.get_realm()
254
255         expected_flags = kdc_dict.pop('expected_flags', None)
256         if expected_flags is not None:
257             expected_flags = krb5_asn1.TicketFlags(expected_flags)
258
259         unexpected_flags = kdc_dict.pop('unexpected_flags', None)
260         if unexpected_flags is not None:
261             unexpected_flags = krb5_asn1.TicketFlags(unexpected_flags)
262
263         expected_error_mode = kdc_dict.pop('expected_error_mode', 0)
264         expected_status = kdc_dict.pop('expected_status', None)
265         if expected_error_mode:
266             check_error_fn = self.generic_check_kdc_error
267             check_rep_fn = None
268         else:
269             check_error_fn = None
270             check_rep_fn = self.generic_check_kdc_rep
271
272             self.assertIsNone(expected_status)
273
274         kdc_options = kdc_dict.pop('kdc_options', '0')
275         kdc_options = krb5_asn1.KDCOptions(kdc_options)
276
277         service_decryption_key = self.TicketDecryptionKey_from_creds(
278             service_creds)
279
280         authenticator_subkey = self.RandomKey(Enctype.AES256)
281
282         etypes = kdc_dict.pop('etypes', (AES256_CTS_HMAC_SHA1_96,
283                                          ARCFOUR_HMAC_MD5))
284
285         expect_edata = kdc_dict.pop('expect_edata', None)
286
287         def generate_s4u2self_padata(_kdc_exchange_dict,
288                                      _callback_dict,
289                                      req_body):
290             pa_s4u = self.PA_S4U2Self_create(
291                 name=client_cname,
292                 realm=realm,
293                 tgt_session_key=service_tgt.session_key,
294                 ctype=None)
295
296             return [pa_s4u], req_body
297
298         kdc_exchange_dict = self.tgs_exchange_dict(
299             expected_crealm=realm,
300             expected_cname=client_cname,
301             expected_srealm=realm,
302             expected_sname=service_sname,
303             expected_account_name=client_name,
304             expected_sid=sid,
305             expected_flags=expected_flags,
306             unexpected_flags=unexpected_flags,
307             ticket_decryption_key=service_decryption_key,
308             expect_ticket_checksum=True,
309             generate_padata_fn=generate_s4u2self_padata,
310             check_error_fn=check_error_fn,
311             check_rep_fn=check_rep_fn,
312             check_kdc_private_fn=self.generic_check_kdc_private,
313             expected_error_mode=expected_error_mode,
314             expected_status=expected_status,
315             tgt=service_tgt,
316             authenticator_subkey=authenticator_subkey,
317             kdc_options=str(kdc_options),
318             expect_claims=False,
319             expect_edata=expect_edata)
320
321         self._generic_kdc_exchange(kdc_exchange_dict,
322                                    cname=None,
323                                    realm=realm,
324                                    sname=service_sname,
325                                    etypes=etypes)
326
327         if not expected_error_mode:
328             # Check that the ticket contains a PAC.
329             ticket = kdc_exchange_dict['rep_ticket_creds']
330
331             pac = self.get_ticket_pac(ticket)
332             self.assertIsNotNone(pac)
333
334         # Ensure we used all the parameters given to us.
335         self.assertEqual({}, kdc_dict)
336
337     # Test performing an S4U2Self operation with a forwardable ticket. The
338     # resulting ticket should have the 'forwardable' flag set.
339     def test_s4u2self_forwardable(self):
340         self._run_s4u2self_test(
341             {
342                 'client_opts': {
343                     'not_delegated': False
344                 },
345                 'kdc_options': 'forwardable',
346                 'modify_service_tgt_fn': functools.partial(
347                     self.set_ticket_forwardable, flag=True),
348                 'expected_flags': 'forwardable'
349             })
350
351     # Test performing an S4U2Self operation with a forwardable ticket that does
352     # not contain a PAC. The request should fail.
353     def test_s4u2self_no_pac(self):
354         def forwardable_no_pac(ticket):
355             ticket = self.set_ticket_forwardable(ticket, flag=True)
356             return self.remove_ticket_pac(ticket)
357
358         self._run_s4u2self_test(
359             {
360                 'expected_error_mode': KDC_ERR_TGT_REVOKED,
361                 'client_opts': {
362                     'not_delegated': False
363                 },
364                 'kdc_options': 'forwardable',
365                 'modify_service_tgt_fn': forwardable_no_pac,
366                 'expected_flags': 'forwardable',
367                 'expect_edata': False
368             })
369
370     # Test performing an S4U2Self operation without requesting a forwardable
371     # ticket. The resulting ticket should not have the 'forwardable' flag set.
372     def test_s4u2self_without_forwardable(self):
373         self._run_s4u2self_test(
374             {
375                 'client_opts': {
376                     'not_delegated': False
377                 },
378                 'modify_service_tgt_fn': functools.partial(
379                     self.set_ticket_forwardable, flag=True),
380                 'unexpected_flags': 'forwardable'
381             })
382
383     # Do an S4U2Self with a non-forwardable TGT. The 'forwardable' flag should
384     # not be set on the ticket.
385     def test_s4u2self_not_forwardable(self):
386         self._run_s4u2self_test(
387             {
388                 'client_opts': {
389                     'not_delegated': False
390                 },
391                 'kdc_options': 'forwardable',
392                 'modify_service_tgt_fn': functools.partial(
393                     self.set_ticket_forwardable, flag=False),
394                 'unexpected_flags': 'forwardable'
395             })
396
397     # Do an S4U2Self with the not_delegated flag set on the client. The
398     # 'forwardable' flag should not be set on the ticket.
399     def test_s4u2self_client_not_delegated(self):
400         self._run_s4u2self_test(
401             {
402                 'client_opts': {
403                     'not_delegated': True
404                 },
405                 'kdc_options': 'forwardable',
406                 'modify_service_tgt_fn': functools.partial(
407                     self.set_ticket_forwardable, flag=True),
408                 'unexpected_flags': 'forwardable'
409             })
410
411     # Do an S4U2Self with a service not trusted to authenticate for delegation,
412     # but having an empty msDS-AllowedToDelegateTo attribute. The 'forwardable'
413     # flag should be set on the ticket.
414     def test_s4u2self_not_trusted_empty_allowed(self):
415         self._run_s4u2self_test(
416             {
417                 'client_opts': {
418                     'not_delegated': False
419                 },
420                 'service_opts': {
421                     'trusted_to_auth_for_delegation': False,
422                     'delegation_to_spn': ()
423                 },
424                 'kdc_options': 'forwardable',
425                 'modify_service_tgt_fn': functools.partial(
426                     self.set_ticket_forwardable, flag=True),
427                 'expected_flags': 'forwardable'
428             })
429
430     # Do an S4U2Self with a service not trusted to authenticate for delegation
431     # and having a non-empty msDS-AllowedToDelegateTo attribute. The
432     # 'forwardable' flag should not be set on the ticket.
433     def test_s4u2self_not_trusted_nonempty_allowed(self):
434         self._run_s4u2self_test(
435             {
436                 'client_opts': {
437                     'not_delegated': False
438                 },
439                 'service_opts': {
440                     'trusted_to_auth_for_delegation': False,
441                     'delegation_to_spn': ('test',)
442                 },
443                 'kdc_options': 'forwardable',
444                 'modify_service_tgt_fn': functools.partial(
445                     self.set_ticket_forwardable, flag=True),
446                 'unexpected_flags': 'forwardable'
447             })
448
449     # Do an S4U2Self with a service trusted to authenticate for delegation and
450     # having an empty msDS-AllowedToDelegateTo attribute. The 'forwardable'
451     # flag should be set on the ticket.
452     def test_s4u2self_trusted_empty_allowed(self):
453         self._run_s4u2self_test(
454             {
455                 'client_opts': {
456                     'not_delegated': False
457                 },
458                 'service_opts': {
459                     'trusted_to_auth_for_delegation': True,
460                     'delegation_to_spn': ()
461                 },
462                 'kdc_options': 'forwardable',
463                 'modify_service_tgt_fn': functools.partial(
464                     self.set_ticket_forwardable, flag=True),
465                 'expected_flags': 'forwardable'
466             })
467
468     # Do an S4U2Self with a service trusted to authenticate for delegation and
469     # having a non-empty msDS-AllowedToDelegateTo attribute. The 'forwardable'
470     # flag should be set on the ticket.
471     def test_s4u2self_trusted_nonempty_allowed(self):
472         self._run_s4u2self_test(
473             {
474                 'client_opts': {
475                     'not_delegated': False
476                 },
477                 'service_opts': {
478                     'trusted_to_auth_for_delegation': True,
479                     'delegation_to_spn': ('test',)
480                 },
481                 'kdc_options': 'forwardable',
482                 'modify_service_tgt_fn': functools.partial(
483                     self.set_ticket_forwardable, flag=True),
484                 'expected_flags': 'forwardable'
485             })
486
487     # Do an S4U2Self with the sname in the request different to that of the
488     # service. We expect an error.
489     def test_s4u2self_wrong_sname(self):
490         other_creds = self.get_cached_creds(
491             account_type=self.AccountType.COMPUTER,
492             opts={
493                 'trusted_to_auth_for_delegation': True,
494                 'id': 0
495             })
496         other_sname = other_creds.get_username()[:-1]
497
498         self._run_s4u2self_test(
499             {
500                 'expected_error_mode': KDC_ERR_BADMATCH,
501                 'expect_edata': False,
502                 'client_opts': {
503                     'not_delegated': False
504                 },
505                 'service_opts': {
506                     'trusted_to_auth_for_delegation': True
507                 },
508                 'service_name': other_sname,
509                 'kdc_options': 'forwardable',
510                 'modify_service_tgt_fn': functools.partial(
511                     self.set_ticket_forwardable, flag=True)
512             })
513
514     # Do an S4U2Self where the service does not require authorization data. The
515     # resulting ticket should still contain a PAC.
516     def test_s4u2self_no_auth_data_required(self):
517         self._run_s4u2self_test(
518             {
519                 'client_opts': {
520                     'not_delegated': False
521                 },
522                 'service_opts': {
523                     'trusted_to_auth_for_delegation': True,
524                     'no_auth_data_required': True
525                 },
526                 'kdc_options': 'forwardable',
527                 'modify_service_tgt_fn': functools.partial(
528                     self.set_ticket_forwardable, flag=True),
529                 'expected_flags': 'forwardable'
530             })
531
532     def _run_delegation_test(self, kdc_dict):
533         client_opts = kdc_dict.pop('client_opts', None)
534         client_creds = self.get_cached_creds(
535             account_type=self.AccountType.USER,
536             opts=client_opts)
537
538         samdb = self.get_samdb()
539         client_dn = client_creds.get_dn()
540         sid = self.get_objectSid(samdb, client_dn)
541
542         service1_opts = kdc_dict.pop('service1_opts', {})
543         service2_opts = kdc_dict.pop('service2_opts', {})
544
545         allow_delegation = kdc_dict.pop('allow_delegation', False)
546         allow_rbcd = kdc_dict.pop('allow_rbcd', False)
547         self.assertFalse(allow_delegation and allow_rbcd)
548
549         if allow_rbcd:
550             service1_creds = self.get_cached_creds(
551                 account_type=self.AccountType.COMPUTER,
552                 opts=service1_opts)
553
554             self.assertNotIn('delegation_from_dn', service2_opts)
555             service2_opts['delegation_from_dn'] = str(service1_creds.get_dn())
556
557             service2_creds = self.get_cached_creds(
558                 account_type=self.AccountType.COMPUTER,
559                 opts=service2_opts)
560         else:
561             service2_creds = self.get_cached_creds(
562                 account_type=self.AccountType.COMPUTER,
563                 opts=service2_opts)
564
565             if allow_delegation:
566                 self.assertNotIn('delegation_to_spn', service1_opts)
567                 service1_opts['delegation_to_spn'] = service2_creds.get_spn()
568
569             service1_creds = self.get_cached_creds(
570                 account_type=self.AccountType.COMPUTER,
571                 opts=service1_opts)
572
573         client_tkt_options = kdc_dict.pop('client_tkt_options', 'forwardable')
574         expected_flags = krb5_asn1.TicketFlags(client_tkt_options)
575
576         client_tgt = self.get_tgt(client_creds,
577                                   kdc_options=client_tkt_options,
578                                   expected_flags=expected_flags)
579         client_service_tkt = self.get_service_ticket(
580             client_tgt,
581             service1_creds,
582             kdc_options=client_tkt_options,
583             expected_flags=expected_flags)
584
585         service1_tgt = self.get_tgt(service1_creds)
586
587         modify_client_tkt_fn = kdc_dict.pop('modify_client_tkt_fn', None)
588         if modify_client_tkt_fn is not None:
589             client_service_tkt = modify_client_tkt_fn(client_service_tkt)
590
591         additional_tickets = [client_service_tkt.ticket]
592
593         modify_service_tgt_fn = kdc_dict.pop('modify_service_tgt_fn', None)
594         if modify_service_tgt_fn is not None:
595             service1_tgt = modify_service_tgt_fn(service1_tgt)
596
597         kdc_options = kdc_dict.pop('kdc_options', None)
598         if kdc_options is None:
599             kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt'))
600
601         client_username = client_creds.get_username()
602         client_realm = client_creds.get_realm()
603         client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
604                                                  names=[client_username])
605
606         service1_name = service1_creds.get_username()[:-1]
607         service1_realm = service1_creds.get_realm()
608
609         service2_name = service2_creds.get_username()[:-1]
610         service2_realm = service2_creds.get_realm()
611         service2_service = 'host'
612         service2_sname = self.PrincipalName_create(
613             name_type=NT_PRINCIPAL, names=[service2_service,
614                                            service2_name])
615         service2_decryption_key = self.TicketDecryptionKey_from_creds(
616             service2_creds)
617         service2_etypes = service2_creds.tgs_supported_enctypes
618
619         expected_error_mode = kdc_dict.pop('expected_error_mode')
620         expected_status = kdc_dict.pop('expected_status', None)
621         if expected_error_mode:
622             check_error_fn = self.generic_check_kdc_error
623             check_rep_fn = None
624         else:
625             check_error_fn = None
626             check_rep_fn = self.generic_check_kdc_rep
627
628             self.assertIsNone(expected_status)
629
630         expect_edata = kdc_dict.pop('expect_edata', None)
631         if expect_edata is not None:
632             self.assertTrue(expected_error_mode)
633
634         pac_options = kdc_dict.pop('pac_options', None)
635
636         authenticator_subkey = self.RandomKey(Enctype.AES256)
637
638         etypes = kdc_dict.pop('etypes', (AES256_CTS_HMAC_SHA1_96,
639                                          ARCFOUR_HMAC_MD5))
640
641         expected_proxy_target = service2_creds.get_spn()
642
643         expected_transited_services = kdc_dict.pop(
644             'expected_transited_services', [])
645
646         transited_service = f'host/{service1_name}@{service1_realm}'
647         expected_transited_services.append(transited_service)
648
649         expect_pac = kdc_dict.pop('expect_pac', True)
650
651         kdc_exchange_dict = self.tgs_exchange_dict(
652             expected_crealm=client_realm,
653             expected_cname=client_cname,
654             expected_srealm=service2_realm,
655             expected_sname=service2_sname,
656             expected_account_name=client_username,
657             expected_sid=sid,
658             expected_supported_etypes=service2_etypes,
659             ticket_decryption_key=service2_decryption_key,
660             check_error_fn=check_error_fn,
661             check_rep_fn=check_rep_fn,
662             check_kdc_private_fn=self.generic_check_kdc_private,
663             expected_error_mode=expected_error_mode,
664             expected_status=expected_status,
665             callback_dict={},
666             tgt=service1_tgt,
667             authenticator_subkey=authenticator_subkey,
668             kdc_options=kdc_options,
669             pac_options=pac_options,
670             expect_edata=expect_edata,
671             expected_proxy_target=expected_proxy_target,
672             expected_transited_services=expected_transited_services,
673             expect_pac=expect_pac)
674
675         self._generic_kdc_exchange(kdc_exchange_dict,
676                                    cname=None,
677                                    realm=service2_realm,
678                                    sname=service2_sname,
679                                    etypes=etypes,
680                                    additional_tickets=additional_tickets)
681
682         if not expected_error_mode:
683             # Check whether the ticket contains a PAC.
684             ticket = kdc_exchange_dict['rep_ticket_creds']
685             pac = self.get_ticket_pac(ticket, expect_pac=expect_pac)
686             if expect_pac:
687                 self.assertIsNotNone(pac)
688             else:
689                 self.assertIsNone(pac)
690
691         # Ensure we used all the parameters given to us.
692         self.assertEqual({}, kdc_dict)
693
694     def test_constrained_delegation(self):
695         # Test constrained delegation.
696         self._run_delegation_test(
697             {
698                 'expected_error_mode': 0,
699                 'allow_delegation': True
700             })
701
702     def test_constrained_delegation_no_auth_data_required(self):
703         # Test constrained delegation.
704         self._run_delegation_test(
705             {
706                 'expected_error_mode': 0,
707                 'allow_delegation': True,
708                 'service2_opts': {
709                     'no_auth_data_required': True
710                 },
711                 'expect_pac': False
712             })
713
714     def test_constrained_delegation_existing_delegation_info(self):
715         # Test constrained delegation with an existing S4U_DELEGATION_INFO
716         # structure in the PAC.
717
718         services = ['service1', 'service2', 'service3']
719
720         self._run_delegation_test(
721             {
722                 'expected_error_mode': 0,
723                 'allow_delegation': True,
724                 'modify_client_tkt_fn': functools.partial(
725                     self.add_delegation_info, services=services),
726                 'expected_transited_services': services
727             })
728
729     def test_constrained_delegation_not_allowed(self):
730         # Test constrained delegation when the delegating service does not
731         # allow it.
732         self._run_delegation_test(
733             {
734                 'expected_error_mode': KDC_ERR_BADOPTION,
735                 'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
736                 'allow_delegation': False
737             })
738
739     def test_constrained_delegation_no_client_pac(self):
740         # Test constrained delegation when the client service ticket does not
741         # contain a PAC.
742         self._run_delegation_test(
743             {
744                 'expected_error_mode': (KDC_ERR_MODIFIED,
745                                         KDC_ERR_TGT_REVOKED),
746                 'allow_delegation': True,
747                 'modify_client_tkt_fn': self.remove_ticket_pac,
748                 'expect_edata': False
749             })
750
751     def test_constrained_delegation_no_service_pac(self):
752         # Test constrained delegation when the service TGT does not contain a
753         # PAC.
754         self._run_delegation_test(
755             {
756                 'expected_error_mode': KDC_ERR_TGT_REVOKED,
757                 'allow_delegation': True,
758                 'modify_service_tgt_fn': self.remove_ticket_pac,
759                 'expect_edata': False
760             })
761
762     def test_constrained_delegation_no_client_pac_no_auth_data_required(self):
763         # Test constrained delegation when the client service ticket does not
764         # contain a PAC.
765         self._run_delegation_test(
766             {
767                 'expected_error_mode': (KDC_ERR_MODIFIED,
768                                         KDC_ERR_BADOPTION),
769                 'allow_delegation': True,
770                 'modify_client_tkt_fn': self.remove_ticket_pac,
771                 'expect_edata': False,
772                 'service2_opts': {
773                     'no_auth_data_required': True
774                 }
775             })
776
777     def test_constrained_delegation_no_service_pac_no_auth_data_required(self):
778         # Test constrained delegation when the service TGT does not contain a
779         # PAC.
780         self._run_delegation_test(
781             {
782                 'expected_error_mode': KDC_ERR_TGT_REVOKED,
783                 'allow_delegation': True,
784                 'modify_service_tgt_fn': self.remove_ticket_pac,
785                 'service2_opts': {
786                     'no_auth_data_required': True
787                 },
788                 'expect_pac': False,
789                 'expect_edata': False
790             })
791
792     def test_constrained_delegation_non_forwardable(self):
793         # Test constrained delegation with a non-forwardable ticket.
794         self._run_delegation_test(
795             {
796                 'expected_error_mode': KDC_ERR_BADOPTION,
797                 'expected_status': ntstatus.NT_STATUS_ACCOUNT_RESTRICTION,
798                 'allow_delegation': True,
799                 'modify_client_tkt_fn': functools.partial(
800                     self.set_ticket_forwardable, flag=False)
801             })
802
803     def test_constrained_delegation_pac_options_rbcd(self):
804         # Test constrained delegation, but with the RBCD bit set in the PAC
805         # options.
806         self._run_delegation_test(
807             {
808                 'expected_error_mode': 0,
809                 'pac_options': '0001',  # supports RBCD
810                 'allow_delegation': True
811             })
812
813     def test_rbcd_no_auth_data_required(self):
814         self._run_delegation_test(
815             {
816                 'expected_error_mode': 0,
817                 'allow_rbcd': True,
818                 'pac_options': '0001',  # supports RBCD
819                 'service2_opts': {
820                     'no_auth_data_required': True
821                 },
822                 'expect_pac': False
823             })
824
825     def test_rbcd_existing_delegation_info(self):
826         # Test constrained delegation with an existing S4U_DELEGATION_INFO
827         # structure in the PAC.
828
829         services = ['service1', 'service2', 'service3']
830
831         self._run_delegation_test(
832             {
833                 'expected_error_mode': 0,
834                 'allow_rbcd': True,
835                 'pac_options': '0001',  # supports RBCD
836                 'modify_client_tkt_fn': functools.partial(
837                     self.add_delegation_info, services=services),
838                 'expected_transited_services': services
839             })
840
841     def test_rbcd_not_allowed(self):
842         # Test resource-based constrained delegation when the target service
843         # does not allow it.
844         self._run_delegation_test(
845             {
846                 'expected_error_mode': KDC_ERR_BADOPTION,
847                 'expected_status': ntstatus.NT_STATUS_NOT_FOUND,
848                 'allow_rbcd': False,
849                 'pac_options': '0001'  # supports RBCD
850             })
851
852     def test_rbcd_no_client_pac_a(self):
853         # Test constrained delegation when the client service ticket does not
854         # contain a PAC, and an empty msDS-AllowedToDelegateTo attribute.
855         self._run_delegation_test(
856             {
857                 'expected_error_mode': KDC_ERR_MODIFIED,
858                 'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
859                 'allow_rbcd': True,
860                 'pac_options': '0001',  # supports RBCD
861                 'modify_client_tkt_fn': self.remove_ticket_pac
862             })
863
864     def test_rbcd_no_client_pac_b(self):
865         # Test constrained delegation when the client service ticket does not
866         # contain a PAC, and a non-empty msDS-AllowedToDelegateTo attribute.
867         self._run_delegation_test(
868             {
869                 'expected_error_mode': KDC_ERR_MODIFIED,
870                 'expected_status': ntstatus.NT_STATUS_NO_MATCH,
871                 'allow_rbcd': True,
872                 'pac_options': '0001',  # supports RBCD
873                 'modify_client_tkt_fn': self.remove_ticket_pac,
874                 'service1_opts': {
875                     'delegation_to_spn': ('host/test')
876                 }
877             })
878
879     def test_rbcd_no_service_pac(self):
880         # Test constrained delegation when the service TGT does not contain a
881         # PAC.
882         self._run_delegation_test(
883             {
884                 'expected_error_mode': KDC_ERR_TGT_REVOKED,
885                 'allow_rbcd': True,
886                 'pac_options': '0001',  # supports RBCD
887                 'modify_service_tgt_fn': self.remove_ticket_pac,
888                 'expect_edata': False
889             })
890
891     def test_rbcd_no_client_pac_no_auth_data_required_a(self):
892         # Test constrained delegation when the client service ticket does not
893         # contain a PAC, and an empty msDS-AllowedToDelegateTo attribute.
894         self._run_delegation_test(
895             {
896                 'expected_error_mode': KDC_ERR_MODIFIED,
897                 'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
898                 'allow_rbcd': True,
899                 'pac_options': '0001',  # supports RBCD
900                 'modify_client_tkt_fn': self.remove_ticket_pac,
901                 'service2_opts': {
902                     'no_auth_data_required': True
903                 }
904             })
905
906     def test_rbcd_no_client_pac_no_auth_data_required_b(self):
907         # Test constrained delegation when the client service ticket does not
908         # contain a PAC, and a non-empty msDS-AllowedToDelegateTo attribute.
909         self._run_delegation_test(
910             {
911                 'expected_error_mode': KDC_ERR_MODIFIED,
912                 'expected_status': ntstatus.NT_STATUS_NO_MATCH,
913                 'allow_rbcd': True,
914                 'pac_options': '0001',  # supports RBCD
915                 'modify_client_tkt_fn': self.remove_ticket_pac,
916                 'service1_opts': {
917                     'delegation_to_spn': ('host/test')
918                 },
919                 'service2_opts': {
920                     'no_auth_data_required': True
921                 }
922             })
923
924     def test_rbcd_no_service_pac_no_auth_data_required(self):
925         # Test constrained delegation when the service TGT does not contain a
926         # PAC.
927         self._run_delegation_test(
928             {
929                 'expected_error_mode': KDC_ERR_TGT_REVOKED,
930                 'allow_rbcd': True,
931                 'pac_options': '0001',  # supports RBCD
932                 'modify_service_tgt_fn': self.remove_ticket_pac,
933                 'service2_opts': {
934                     'no_auth_data_required': True
935                 },
936                 'expect_edata': False
937             })
938
939     def test_rbcd_non_forwardable(self):
940         # Test resource-based constrained delegation with a non-forwardable
941         # ticket.
942         self._run_delegation_test(
943             {
944                 'expected_error_mode': KDC_ERR_BADOPTION,
945                 'expected_status': ntstatus.NT_STATUS_ACCOUNT_RESTRICTION,
946                 'allow_rbcd': True,
947                 'pac_options': '0001',  # supports RBCD
948                 'modify_client_tkt_fn': functools.partial(
949                     self.set_ticket_forwardable, flag=False)
950             })
951
952     def test_rbcd_no_pac_options_a(self):
953         # Test resource-based constrained delegation without the RBCD bit set
954         # in the PAC options, and an empty msDS-AllowedToDelegateTo attribute.
955         self._run_delegation_test(
956             {
957                 'expected_error_mode': KDC_ERR_BADOPTION,
958                 'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
959                 'allow_rbcd': True,
960                 'pac_options': '1'  # does not support RBCD
961             })
962
963     def test_rbcd_no_pac_options_b(self):
964         # Test resource-based constrained delegation without the RBCD bit set
965         # in the PAC options, and a non-empty msDS-AllowedToDelegateTo
966         # attribute.
967         self._run_delegation_test(
968             {
969                 'expected_error_mode': KDC_ERR_BADOPTION,
970                 'expected_status': ntstatus.NT_STATUS_NO_MATCH,
971                 'allow_rbcd': True,
972                 'pac_options': '1',  # does not support RBCD
973                 'service1_opts': {
974                     'delegation_to_spn': ('host/test')
975                 }
976             })
977
978     def test_bronze_bit_constrained_delegation_old_checksum(self):
979         # Attempt to modify the ticket without updating the PAC checksums.
980         self._run_delegation_test(
981             {
982                 'expected_error_mode': (KDC_ERR_MODIFIED,
983                                         KDC_ERR_BAD_INTEGRITY),
984                 'allow_delegation': True,
985                 'client_tkt_options': '0',  # non-forwardable ticket
986                 'modify_client_tkt_fn': functools.partial(
987                     self.set_ticket_forwardable,
988                     flag=True, update_pac_checksums=False),
989                 'expect_edata': False
990             })
991
992     def test_bronze_bit_rbcd_old_checksum(self):
993         # Attempt to modify the ticket without updating the PAC checksums.
994         self._run_delegation_test(
995             {
996                 'expected_error_mode': KDC_ERR_MODIFIED,
997                 'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
998                 'allow_rbcd': True,
999                 'pac_options': '0001',  # supports RBCD
1000                 'client_tkt_options': '0',  # non-forwardable ticket
1001                 'modify_client_tkt_fn': functools.partial(
1002                     self.set_ticket_forwardable,
1003                     flag=True, update_pac_checksums=False)
1004             })
1005
1006     def test_constrained_delegation_missing_client_checksum(self):
1007         # Present a user ticket without the required checksums.
1008         for checksum in self.pac_checksum_types:
1009             with self.subTest(checksum=checksum):
1010                 if checksum == krb5pac.PAC_TYPE_TICKET_CHECKSUM:
1011                     expected_error_mode = (KDC_ERR_MODIFIED,
1012                                            KDC_ERR_BADOPTION)
1013                 else:
1014                     expected_error_mode = KDC_ERR_GENERIC
1015
1016                 self._run_delegation_test(
1017                     {
1018                         'expected_error_mode': expected_error_mode,
1019                         'allow_delegation': True,
1020                         'modify_client_tkt_fn': functools.partial(
1021                             self.remove_pac_checksum, checksum=checksum),
1022                         'expect_edata': False
1023                     })
1024
1025     def test_constrained_delegation_missing_service_checksum(self):
1026         # Present the service's ticket without the required checksums.
1027         for checksum in filter(lambda x: x != krb5pac.PAC_TYPE_TICKET_CHECKSUM,
1028                                self.pac_checksum_types):
1029             with self.subTest(checksum=checksum):
1030                 self._run_delegation_test(
1031                     {
1032                         'expected_error_mode': KDC_ERR_GENERIC,
1033                         'expected_status':
1034                             ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES,
1035                         'allow_delegation': True,
1036                         'modify_service_tgt_fn': functools.partial(
1037                             self.remove_pac_checksum, checksum=checksum)
1038                     })
1039
1040     def test_rbcd_missing_client_checksum(self):
1041         # Present a user ticket without the required checksums.
1042         for checksum in self.pac_checksum_types:
1043             with self.subTest(checksum=checksum):
1044                 if checksum == krb5pac.PAC_TYPE_TICKET_CHECKSUM:
1045                     expected_error_mode = KDC_ERR_MODIFIED
1046                 else:
1047                     expected_error_mode = KDC_ERR_GENERIC
1048
1049                 self._run_delegation_test(
1050                     {
1051                         'expected_error_mode': expected_error_mode,
1052                         'expected_status':
1053                             ntstatus.NT_STATUS_NOT_SUPPORTED,
1054                         'allow_rbcd': True,
1055                         'pac_options': '0001',  # supports RBCD
1056                         'modify_client_tkt_fn': functools.partial(
1057                             self.remove_pac_checksum, checksum=checksum)
1058                     })
1059
1060     def test_rbcd_missing_service_checksum(self):
1061         # Present the service's ticket without the required checksums.
1062         for checksum in filter(lambda x: x != krb5pac.PAC_TYPE_TICKET_CHECKSUM,
1063                                self.pac_checksum_types):
1064             with self.subTest(checksum=checksum):
1065                 self._run_delegation_test(
1066                     {
1067                         'expected_error_mode': KDC_ERR_GENERIC,
1068                         'expected_status':
1069                             ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES,
1070                         'allow_rbcd': True,
1071                         'pac_options': '0001',  # supports RBCD
1072                         'modify_service_tgt_fn': functools.partial(
1073                             self.remove_pac_checksum, checksum=checksum)
1074                     })
1075
1076     def test_constrained_delegation_zeroed_client_checksum(self):
1077         # Present a user ticket with invalid checksums.
1078         for checksum in self.pac_checksum_types:
1079             with self.subTest(checksum=checksum):
1080                 self._run_delegation_test(
1081                     {
1082                         'expected_error_mode': (KDC_ERR_MODIFIED,
1083                                                 KDC_ERR_BAD_INTEGRITY),
1084                         'allow_delegation': True,
1085                         'modify_client_tkt_fn': functools.partial(
1086                             self.zeroed_pac_checksum, checksum=checksum),
1087                         'expect_edata': False
1088                     })
1089
1090     def test_constrained_delegation_zeroed_service_checksum(self):
1091         # Present the service's ticket with invalid checksums.
1092         for checksum in self.pac_checksum_types:
1093             with self.subTest(checksum=checksum):
1094                 if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
1095                     expected_error_mode = (KDC_ERR_MODIFIED,
1096                                            KDC_ERR_BAD_INTEGRITY)
1097                     expected_status = ntstatus.NT_STATUS_WRONG_PASSWORD
1098                 else:
1099                     expected_error_mode = 0
1100                     expected_status = None
1101
1102                 self._run_delegation_test(
1103                     {
1104                         'expected_error_mode': expected_error_mode,
1105                         'expected_status': expected_status,
1106                         'allow_delegation': True,
1107                         'modify_service_tgt_fn': functools.partial(
1108                             self.zeroed_pac_checksum, checksum=checksum)
1109                     })
1110
1111     def test_rbcd_zeroed_client_checksum(self):
1112         # Present a user ticket with invalid checksums.
1113         for checksum in self.pac_checksum_types:
1114             with self.subTest(checksum=checksum):
1115                 self._run_delegation_test(
1116                     {
1117                         'expected_error_mode': KDC_ERR_MODIFIED,
1118                         'expected_status':
1119                             ntstatus.NT_STATUS_NOT_SUPPORTED,
1120                         'allow_rbcd': True,
1121                         'pac_options': '0001',  # supports RBCD
1122                         'modify_client_tkt_fn': functools.partial(
1123                             self.zeroed_pac_checksum, checksum=checksum)
1124                     })
1125
1126     def test_rbcd_zeroed_service_checksum(self):
1127         # Present the service's ticket with invalid checksums.
1128         for checksum in self.pac_checksum_types:
1129             with self.subTest(checksum=checksum):
1130                 if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
1131                     expected_error_mode = KDC_ERR_MODIFIED
1132                     expected_status = ntstatus.NT_STATUS_WRONG_PASSWORD
1133                 else:
1134                     expected_error_mode = 0
1135                     expected_status = None
1136
1137                 self._run_delegation_test(
1138                     {
1139                         'expected_error_mode': expected_error_mode,
1140                         'expected_status': expected_status,
1141                         'allow_rbcd': True,
1142                         'pac_options': '0001',  # supports RBCD
1143                         'modify_service_tgt_fn': functools.partial(
1144                             self.zeroed_pac_checksum, checksum=checksum)
1145                     })
1146
1147     unkeyed_ctypes = {Cksumtype.MD5, Cksumtype.SHA1, Cksumtype.CRC32}
1148
1149     def test_constrained_delegation_unkeyed_client_checksum(self):
1150         # Present a user ticket with invalid checksums.
1151         for checksum in self.pac_checksum_types:
1152             for ctype in self.unkeyed_ctypes:
1153                 with self.subTest(checksum=checksum, ctype=ctype):
1154                     if (checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM
1155                             and ctype == Cksumtype.SHA1):
1156                         expected_error_mode = (KDC_ERR_SUMTYPE_NOSUPP,
1157                                                KDC_ERR_INAPP_CKSUM)
1158                     else:
1159                         expected_error_mode = (KDC_ERR_GENERIC,
1160                                                KDC_ERR_INAPP_CKSUM)
1161
1162                     self._run_delegation_test(
1163                         {
1164                             'expected_error_mode': expected_error_mode,
1165                             'allow_delegation': True,
1166                             'modify_client_tkt_fn': functools.partial(
1167                                 self.unkeyed_pac_checksum,
1168                                 checksum=checksum, ctype=ctype),
1169                             'expect_edata': False
1170                         })
1171
1172     def test_constrained_delegation_unkeyed_service_checksum(self):
1173         # Present the service's ticket with invalid checksums.
1174         for checksum in self.pac_checksum_types:
1175             for ctype in self.unkeyed_ctypes:
1176                 with self.subTest(checksum=checksum, ctype=ctype):
1177                     if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
1178                         if ctype == Cksumtype.SHA1:
1179                             expected_error_mode = (KDC_ERR_SUMTYPE_NOSUPP,
1180                                                    KDC_ERR_INAPP_CKSUM)
1181                             expected_status = ntstatus.NT_STATUS_LOGON_FAILURE
1182                         else:
1183                             expected_error_mode = (KDC_ERR_GENERIC,
1184                                                    KDC_ERR_INAPP_CKSUM)
1185                             expected_status = (
1186                                 ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES)
1187                     else:
1188                         expected_error_mode = 0
1189                         expected_status = None
1190
1191                     self._run_delegation_test(
1192                         {
1193                             'expected_error_mode': expected_error_mode,
1194                             'expected_status': expected_status,
1195                             'allow_delegation': True,
1196                             'modify_service_tgt_fn': functools.partial(
1197                                 self.unkeyed_pac_checksum,
1198                                 checksum=checksum, ctype=ctype)
1199                         })
1200
1201     def test_rbcd_unkeyed_client_checksum(self):
1202         # Present a user ticket with invalid checksums.
1203         for checksum in self.pac_checksum_types:
1204             for ctype in self.unkeyed_ctypes:
1205                 with self.subTest(checksum=checksum, ctype=ctype):
1206                     if (checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM
1207                             and ctype == Cksumtype.SHA1):
1208                         expected_error_mode = KDC_ERR_SUMTYPE_NOSUPP
1209                     else:
1210                         expected_error_mode = KDC_ERR_GENERIC
1211
1212                     self._run_delegation_test(
1213                         {
1214                             'expected_error_mode': expected_error_mode,
1215                             'expected_status':
1216                                 ntstatus.NT_STATUS_NOT_SUPPORTED,
1217                             'allow_rbcd': True,
1218                             'pac_options': '0001',  # supports RBCD
1219                             'modify_client_tkt_fn': functools.partial(
1220                                 self.unkeyed_pac_checksum,
1221                                 checksum=checksum, ctype=ctype)
1222                         })
1223
1224     def test_rbcd_unkeyed_service_checksum(self):
1225         # Present the service's ticket with invalid checksums.
1226         for checksum in self.pac_checksum_types:
1227             for ctype in self.unkeyed_ctypes:
1228                 with self.subTest(checksum=checksum, ctype=ctype):
1229                     if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
1230                         if ctype == Cksumtype.SHA1:
1231                             expected_error_mode = KDC_ERR_SUMTYPE_NOSUPP
1232                             expected_status = ntstatus.NT_STATUS_LOGON_FAILURE
1233                         else:
1234                             expected_error_mode = KDC_ERR_GENERIC
1235                             expected_status = (
1236                                 ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES)
1237                     else:
1238                         expected_error_mode = 0
1239                         expected_status = None
1240
1241                     self._run_delegation_test(
1242                         {
1243                             'expected_error_mode': expected_error_mode,
1244                             'expected_status': expected_status,
1245                             'allow_rbcd': True,
1246                             'pac_options': '0001',  # supports RBCD
1247                             'modify_service_tgt_fn': functools.partial(
1248                                 self.unkeyed_pac_checksum,
1249                                 checksum=checksum, ctype=ctype)
1250                         })
1251
1252     def remove_pac_checksum(self, ticket, checksum):
1253         checksum_keys = self.get_krbtgt_checksum_key()
1254
1255         return self.modified_ticket(ticket,
1256                                     checksum_keys=checksum_keys,
1257                                     include_checksums={checksum: False})
1258
1259     def zeroed_pac_checksum(self, ticket, checksum):
1260         krbtgt_creds = self.get_krbtgt_creds()
1261         krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
1262
1263         server_key = ticket.decryption_key
1264
1265         checksum_keys = {
1266             krb5pac.PAC_TYPE_SRV_CHECKSUM: server_key,
1267             krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key,
1268             krb5pac.PAC_TYPE_TICKET_CHECKSUM: krbtgt_key,
1269         }
1270
1271         if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
1272             zeroed_key = server_key
1273         else:
1274             zeroed_key = krbtgt_key
1275
1276         checksum_keys[checksum] = ZeroedChecksumKey(zeroed_key.key,
1277                                                     zeroed_key.kvno)
1278
1279         return self.modified_ticket(ticket,
1280                                     checksum_keys=checksum_keys,
1281                                     include_checksums={checksum: True})
1282
1283     def unkeyed_pac_checksum(self, ticket, checksum, ctype):
1284         krbtgt_creds = self.get_krbtgt_creds()
1285         krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
1286
1287         server_key = ticket.decryption_key
1288
1289         checksum_keys = {
1290             krb5pac.PAC_TYPE_SRV_CHECKSUM: server_key,
1291             krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key,
1292             krb5pac.PAC_TYPE_TICKET_CHECKSUM: krbtgt_key,
1293         }
1294
1295         # Make a copy of the existing key and change the ctype.
1296         key = checksum_keys[checksum]
1297         new_key = RodcPacEncryptionKey(key.key, key.kvno)
1298         new_key.ctype = ctype
1299         checksum_keys[checksum] = new_key
1300
1301         return self.modified_ticket(ticket,
1302                                     checksum_keys=checksum_keys,
1303                                     include_checksums={checksum: True})
1304
1305     def add_delegation_info(self, ticket, services=None):
1306         def modify_pac_fn(pac):
1307             pac_buffers = pac.buffers
1308             self.assertNotIn(krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION,
1309                              (buffer.type for buffer in pac_buffers))
1310
1311             transited_services = list(map(lsa.String, services))
1312
1313             delegation = krb5pac.PAC_CONSTRAINED_DELEGATION()
1314             delegation.proxy_target = lsa.String('test_proxy_target')
1315             delegation.transited_services = transited_services
1316             delegation.num_transited_services = len(transited_services)
1317
1318             info = krb5pac.PAC_CONSTRAINED_DELEGATION_CTR()
1319             info.info = delegation
1320
1321             pac_buffer = krb5pac.PAC_BUFFER()
1322             pac_buffer.type = krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION
1323             pac_buffer.info = info
1324
1325             pac_buffers.append(pac_buffer)
1326
1327             pac.buffers = pac_buffers
1328             pac.num_buffers += 1
1329
1330             return pac
1331
1332         checksum_keys = self.get_krbtgt_checksum_key()
1333
1334         return self.modified_ticket(ticket,
1335                                     checksum_keys=checksum_keys,
1336                                     modify_pac_fn=modify_pac_fn)
1337
1338     def set_ticket_forwardable(self, ticket, flag, update_pac_checksums=True):
1339         flag = '1' if flag else '0'
1340
1341         def modify_fn(enc_part):
1342             # Reset the forwardable flag
1343             forwardable_pos = (len(tuple(krb5_asn1.TicketFlags('forwardable')))
1344                                - 1)
1345
1346             flags = enc_part['flags']
1347             self.assertLessEqual(forwardable_pos, len(flags))
1348             enc_part['flags'] = (flags[:forwardable_pos] +
1349                                  flag +
1350                                  flags[forwardable_pos+1:])
1351
1352             return enc_part
1353
1354         if update_pac_checksums:
1355             checksum_keys = self.get_krbtgt_checksum_key()
1356         else:
1357             checksum_keys = None
1358
1359         return self.modified_ticket(ticket,
1360                                     modify_fn=modify_fn,
1361                                     checksum_keys=checksum_keys,
1362                                     update_pac_checksums=update_pac_checksums)
1363
1364     def remove_ticket_pac(self, ticket):
1365         return self.modified_ticket(ticket,
1366                                     exclude_pac=True)
1367
1368
1369 if __name__ == "__main__":
1370     global_asn1_print = False
1371     global_hexdump = False
1372     import unittest
1373     unittest.main()