2 # Unix SMB/CIFS implementation.
3 # Copyright (C) Stefan Metzmacher 2020
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 sys.path.insert(0, "bin/python")
24 os.environ["PYTHONUNBUFFERED"] = "1"
26 from samba import ntstatus
27 from samba.dcerpc import krb5pac, lsa
29 from samba.tests import env_get_var_value
30 from samba.tests.krb5.kcrypto import Cksumtype, Enctype
31 from samba.tests.krb5.kdc_base_test import KDCBaseTest
32 from samba.tests.krb5.raw_testcase import (
36 from samba.tests.krb5.rfc4120_constants import (
37 AES256_CTS_HMAC_SHA1_96,
41 KDC_ERR_BAD_INTEGRITY,
45 KDC_ERR_SUMTYPE_NOSUPP,
49 KU_TGS_REP_ENC_PART_SUB_KEY,
52 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
54 global_asn1_print = False
55 global_hexdump = False
58 class S4UKerberosTests(KDCBaseTest):
61 super(S4UKerberosTests, self).setUp()
62 self.do_asn1_print = global_asn1_print
63 self.do_hexdump = global_hexdump
65 def _test_s4u2self(self, pa_s4u2self_ctype=None):
66 service_creds = self.get_service_creds()
67 service = service_creds.get_username()
68 realm = service_creds.get_realm()
70 cname = self.PrincipalName_create(name_type=1, names=[service])
71 sname = self.PrincipalName_create(name_type=2, names=["krbtgt", realm])
73 till = self.get_KerberosTime(offset=36000)
75 kdc_options = krb5_asn1.KDCOptions('forwardable')
80 req = self.AS_REQ_create(padata=padata,
81 kdc_options=str(kdc_options),
91 additional_tickets=None)
92 rep = self.send_recv_transaction(req)
93 self.assertIsNotNone(rep)
95 self.assertEqual(rep['msg-type'], 30)
96 self.assertEqual(rep['error-code'], 25)
97 rep_padata = self.der_decode(
98 rep['e-data'], asn1Spec=krb5_asn1.METHOD_DATA())
100 for pa in rep_padata:
101 if pa['padata-type'] == 19:
102 etype_info2 = pa['padata-value']
105 etype_info2 = self.der_decode(
106 etype_info2, asn1Spec=krb5_asn1.ETYPE_INFO2())
108 key = self.PasswordKey_from_etype_info2(service_creds, etype_info2[0])
110 (patime, pausec) = self.get_KerberosTimeWithUsec()
111 pa_ts = self.PA_ENC_TS_ENC_create(patime, pausec)
112 pa_ts = self.der_encode(pa_ts, asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
114 pa_ts = self.EncryptedData_create(key, KU_PA_ENC_TIMESTAMP, pa_ts)
115 pa_ts = self.der_encode(pa_ts, asn1Spec=krb5_asn1.EncryptedData())
117 pa_ts = self.PA_DATA_create(2, pa_ts)
119 kdc_options = krb5_asn1.KDCOptions('forwardable')
122 req = self.AS_REQ_create(padata=padata,
123 kdc_options=str(kdc_options),
133 additional_tickets=None)
134 rep = self.send_recv_transaction(req)
135 self.assertIsNotNone(rep)
137 msg_type = rep['msg-type']
138 self.assertEqual(msg_type, 11)
140 enc_part2 = key.decrypt(KU_AS_REP_ENC_PART, rep['enc-part']['cipher'])
141 # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
144 enc_part2 = self.der_decode(
145 enc_part2, asn1Spec=krb5_asn1.EncASRepPart())
147 enc_part2 = self.der_decode(
148 enc_part2, asn1Spec=krb5_asn1.EncTGSRepPart())
153 for_user_name = env_get_var_value('FOR_USER')
154 uname = self.PrincipalName_create(name_type=1, names=[for_user_name])
156 kdc_options = krb5_asn1.KDCOptions('forwardable')
157 till = self.get_KerberosTime(offset=36000)
158 ticket = rep['ticket']
159 ticket_session_key = self.EncryptionKey_import(enc_part2['key'])
160 pa_s4u = self.PA_S4U2Self_create(name=uname, realm=realm,
161 tgt_session_key=ticket_session_key,
162 ctype=pa_s4u2self_ctype)
165 subkey = self.RandomKey(ticket_session_key.etype)
167 (ctime, cusec) = self.get_KerberosTimeWithUsec()
169 req = self.TGS_REQ_create(padata=padata,
173 kdc_options=str(kdc_options),
183 EncAuthorizationData=None,
184 EncAuthorizationData_key=None,
185 additional_tickets=None,
186 ticket_session_key=ticket_session_key,
187 authenticator_subkey=subkey)
188 rep = self.send_recv_transaction(req)
189 self.assertIsNotNone(rep)
191 msg_type = rep['msg-type']
193 enc_part2 = subkey.decrypt(
194 KU_TGS_REP_ENC_PART_SUB_KEY, rep['enc-part']['cipher'])
195 enc_part2 = self.der_decode(
196 enc_part2, asn1Spec=krb5_asn1.EncTGSRepPart())
200 # Using the checksum type from the tgt_session_key happens to work
202 def test_s4u2self(self):
203 msg_type = self._test_s4u2self()
204 self.assertEqual(msg_type, 13)
206 # Per spec, the checksum of PA-FOR-USER is HMAC_MD5, see [MS-SFU] 2.2.1
207 def test_s4u2self_hmac_md5_checksum(self):
208 msg_type = self._test_s4u2self(pa_s4u2self_ctype=Cksumtype.HMAC_MD5)
209 self.assertEqual(msg_type, 13)
211 def test_s4u2self_md5_unkeyed_checksum(self):
212 msg_type = self._test_s4u2self(pa_s4u2self_ctype=Cksumtype.MD5)
213 self.assertEqual(msg_type, 30)
215 def test_s4u2self_sha1_unkeyed_checksum(self):
216 msg_type = self._test_s4u2self(pa_s4u2self_ctype=Cksumtype.SHA1)
217 self.assertEqual(msg_type, 30)
219 def test_s4u2self_crc32_unkeyed_checksum(self):
220 msg_type = self._test_s4u2self(pa_s4u2self_ctype=Cksumtype.CRC32)
221 self.assertEqual(msg_type, 30)
223 def _run_s4u2self_test(self, kdc_dict):
224 client_opts = kdc_dict.pop('client_opts', None)
225 client_creds = self.get_cached_creds(
226 account_type=self.AccountType.USER,
229 service_opts = kdc_dict.pop('service_opts', None)
230 service_creds = self.get_cached_creds(
231 account_type=self.AccountType.COMPUTER,
234 service_tgt = self.get_tgt(service_creds)
235 modify_service_tgt_fn = kdc_dict.pop('modify_service_tgt_fn', None)
236 if modify_service_tgt_fn is not None:
237 service_tgt = modify_service_tgt_fn(service_tgt)
239 client_name = client_creds.get_username()
240 client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
243 samdb = self.get_samdb()
244 client_dn = client_creds.get_dn()
245 sid = self.get_objectSid(samdb, client_dn)
247 service_name = kdc_dict.pop('service_name', None)
248 if service_name is None:
249 service_name = service_creds.get_username()[:-1]
250 service_sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
251 names=['host', service_name])
253 realm = client_creds.get_realm()
255 expected_flags = kdc_dict.pop('expected_flags', None)
256 if expected_flags is not None:
257 expected_flags = krb5_asn1.TicketFlags(expected_flags)
259 unexpected_flags = kdc_dict.pop('unexpected_flags', None)
260 if unexpected_flags is not None:
261 unexpected_flags = krb5_asn1.TicketFlags(unexpected_flags)
263 expected_error_mode = kdc_dict.pop('expected_error_mode', 0)
264 expected_status = kdc_dict.pop('expected_status', None)
265 if expected_error_mode:
266 check_error_fn = self.generic_check_kdc_error
269 check_error_fn = None
270 check_rep_fn = self.generic_check_kdc_rep
272 self.assertIsNone(expected_status)
274 kdc_options = kdc_dict.pop('kdc_options', '0')
275 kdc_options = krb5_asn1.KDCOptions(kdc_options)
277 service_decryption_key = self.TicketDecryptionKey_from_creds(
280 authenticator_subkey = self.RandomKey(Enctype.AES256)
282 etypes = kdc_dict.pop('etypes', (AES256_CTS_HMAC_SHA1_96,
285 expect_edata = kdc_dict.pop('expect_edata', None)
287 def generate_s4u2self_padata(_kdc_exchange_dict,
290 pa_s4u = self.PA_S4U2Self_create(
293 tgt_session_key=service_tgt.session_key,
296 return [pa_s4u], req_body
298 kdc_exchange_dict = self.tgs_exchange_dict(
299 expected_crealm=realm,
300 expected_cname=client_cname,
301 expected_srealm=realm,
302 expected_sname=service_sname,
303 expected_account_name=client_name,
305 expected_flags=expected_flags,
306 unexpected_flags=unexpected_flags,
307 ticket_decryption_key=service_decryption_key,
308 expect_ticket_checksum=True,
309 generate_padata_fn=generate_s4u2self_padata,
310 check_error_fn=check_error_fn,
311 check_rep_fn=check_rep_fn,
312 check_kdc_private_fn=self.generic_check_kdc_private,
313 expected_error_mode=expected_error_mode,
314 expected_status=expected_status,
316 authenticator_subkey=authenticator_subkey,
317 kdc_options=str(kdc_options),
319 expect_edata=expect_edata)
321 self._generic_kdc_exchange(kdc_exchange_dict,
327 if not expected_error_mode:
328 # Check that the ticket contains a PAC.
329 ticket = kdc_exchange_dict['rep_ticket_creds']
331 pac = self.get_ticket_pac(ticket)
332 self.assertIsNotNone(pac)
334 # Ensure we used all the parameters given to us.
335 self.assertEqual({}, kdc_dict)
337 # Test performing an S4U2Self operation with a forwardable ticket. The
338 # resulting ticket should have the 'forwardable' flag set.
339 def test_s4u2self_forwardable(self):
340 self._run_s4u2self_test(
343 'not_delegated': False
345 'kdc_options': 'forwardable',
346 'modify_service_tgt_fn': functools.partial(
347 self.set_ticket_forwardable, flag=True),
348 'expected_flags': 'forwardable'
351 # Test performing an S4U2Self operation with a forwardable ticket that does
352 # not contain a PAC. The request should fail.
353 def test_s4u2self_no_pac(self):
354 def forwardable_no_pac(ticket):
355 ticket = self.set_ticket_forwardable(ticket, flag=True)
356 return self.remove_ticket_pac(ticket)
358 self._run_s4u2self_test(
360 'expected_error_mode': KDC_ERR_TGT_REVOKED,
362 'not_delegated': False
364 'kdc_options': 'forwardable',
365 'modify_service_tgt_fn': forwardable_no_pac,
366 'expected_flags': 'forwardable',
367 'expect_edata': False
370 # Test performing an S4U2Self operation without requesting a forwardable
371 # ticket. The resulting ticket should not have the 'forwardable' flag set.
372 def test_s4u2self_without_forwardable(self):
373 self._run_s4u2self_test(
376 'not_delegated': False
378 'modify_service_tgt_fn': functools.partial(
379 self.set_ticket_forwardable, flag=True),
380 'unexpected_flags': 'forwardable'
383 # Do an S4U2Self with a non-forwardable TGT. The 'forwardable' flag should
384 # not be set on the ticket.
385 def test_s4u2self_not_forwardable(self):
386 self._run_s4u2self_test(
389 'not_delegated': False
391 'kdc_options': 'forwardable',
392 'modify_service_tgt_fn': functools.partial(
393 self.set_ticket_forwardable, flag=False),
394 'unexpected_flags': 'forwardable'
397 # Do an S4U2Self with the not_delegated flag set on the client. The
398 # 'forwardable' flag should not be set on the ticket.
399 def test_s4u2self_client_not_delegated(self):
400 self._run_s4u2self_test(
403 'not_delegated': True
405 'kdc_options': 'forwardable',
406 'modify_service_tgt_fn': functools.partial(
407 self.set_ticket_forwardable, flag=True),
408 'unexpected_flags': 'forwardable'
411 # Do an S4U2Self with a service not trusted to authenticate for delegation,
412 # but having an empty msDS-AllowedToDelegateTo attribute. The 'forwardable'
413 # flag should be set on the ticket.
414 def test_s4u2self_not_trusted_empty_allowed(self):
415 self._run_s4u2self_test(
418 'not_delegated': False
421 'trusted_to_auth_for_delegation': False,
422 'delegation_to_spn': ()
424 'kdc_options': 'forwardable',
425 'modify_service_tgt_fn': functools.partial(
426 self.set_ticket_forwardable, flag=True),
427 'expected_flags': 'forwardable'
430 # Do an S4U2Self with a service not trusted to authenticate for delegation
431 # and having a non-empty msDS-AllowedToDelegateTo attribute. The
432 # 'forwardable' flag should not be set on the ticket.
433 def test_s4u2self_not_trusted_nonempty_allowed(self):
434 self._run_s4u2self_test(
437 'not_delegated': False
440 'trusted_to_auth_for_delegation': False,
441 'delegation_to_spn': ('test',)
443 'kdc_options': 'forwardable',
444 'modify_service_tgt_fn': functools.partial(
445 self.set_ticket_forwardable, flag=True),
446 'unexpected_flags': 'forwardable'
449 # Do an S4U2Self with a service trusted to authenticate for delegation and
450 # having an empty msDS-AllowedToDelegateTo attribute. The 'forwardable'
451 # flag should be set on the ticket.
452 def test_s4u2self_trusted_empty_allowed(self):
453 self._run_s4u2self_test(
456 'not_delegated': False
459 'trusted_to_auth_for_delegation': True,
460 'delegation_to_spn': ()
462 'kdc_options': 'forwardable',
463 'modify_service_tgt_fn': functools.partial(
464 self.set_ticket_forwardable, flag=True),
465 'expected_flags': 'forwardable'
468 # Do an S4U2Self with a service trusted to authenticate for delegation and
469 # having a non-empty msDS-AllowedToDelegateTo attribute. The 'forwardable'
470 # flag should be set on the ticket.
471 def test_s4u2self_trusted_nonempty_allowed(self):
472 self._run_s4u2self_test(
475 'not_delegated': False
478 'trusted_to_auth_for_delegation': True,
479 'delegation_to_spn': ('test',)
481 'kdc_options': 'forwardable',
482 'modify_service_tgt_fn': functools.partial(
483 self.set_ticket_forwardable, flag=True),
484 'expected_flags': 'forwardable'
487 # Do an S4U2Self with the sname in the request different to that of the
488 # service. We expect an error.
489 def test_s4u2self_wrong_sname(self):
490 other_creds = self.get_cached_creds(
491 account_type=self.AccountType.COMPUTER,
493 'trusted_to_auth_for_delegation': True,
496 other_sname = other_creds.get_username()[:-1]
498 self._run_s4u2self_test(
500 'expected_error_mode': KDC_ERR_BADMATCH,
501 'expect_edata': False,
503 'not_delegated': False
506 'trusted_to_auth_for_delegation': True
508 'service_name': other_sname,
509 'kdc_options': 'forwardable',
510 'modify_service_tgt_fn': functools.partial(
511 self.set_ticket_forwardable, flag=True)
514 # Do an S4U2Self where the service does not require authorization data. The
515 # resulting ticket should still contain a PAC.
516 def test_s4u2self_no_auth_data_required(self):
517 self._run_s4u2self_test(
520 'not_delegated': False
523 'trusted_to_auth_for_delegation': True,
524 'no_auth_data_required': True
526 'kdc_options': 'forwardable',
527 'modify_service_tgt_fn': functools.partial(
528 self.set_ticket_forwardable, flag=True),
529 'expected_flags': 'forwardable'
532 def _run_delegation_test(self, kdc_dict):
533 client_opts = kdc_dict.pop('client_opts', None)
534 client_creds = self.get_cached_creds(
535 account_type=self.AccountType.USER,
538 samdb = self.get_samdb()
539 client_dn = client_creds.get_dn()
540 sid = self.get_objectSid(samdb, client_dn)
542 service1_opts = kdc_dict.pop('service1_opts', {})
543 service2_opts = kdc_dict.pop('service2_opts', {})
545 allow_delegation = kdc_dict.pop('allow_delegation', False)
546 allow_rbcd = kdc_dict.pop('allow_rbcd', False)
547 self.assertFalse(allow_delegation and allow_rbcd)
550 service1_creds = self.get_cached_creds(
551 account_type=self.AccountType.COMPUTER,
554 self.assertNotIn('delegation_from_dn', service2_opts)
555 service2_opts['delegation_from_dn'] = str(service1_creds.get_dn())
557 service2_creds = self.get_cached_creds(
558 account_type=self.AccountType.COMPUTER,
561 service2_creds = self.get_cached_creds(
562 account_type=self.AccountType.COMPUTER,
566 self.assertNotIn('delegation_to_spn', service1_opts)
567 service1_opts['delegation_to_spn'] = service2_creds.get_spn()
569 service1_creds = self.get_cached_creds(
570 account_type=self.AccountType.COMPUTER,
573 client_tkt_options = kdc_dict.pop('client_tkt_options', 'forwardable')
574 expected_flags = krb5_asn1.TicketFlags(client_tkt_options)
576 client_tgt = self.get_tgt(client_creds,
577 kdc_options=client_tkt_options,
578 expected_flags=expected_flags)
579 client_service_tkt = self.get_service_ticket(
582 kdc_options=client_tkt_options,
583 expected_flags=expected_flags)
585 service1_tgt = self.get_tgt(service1_creds)
587 modify_client_tkt_fn = kdc_dict.pop('modify_client_tkt_fn', None)
588 if modify_client_tkt_fn is not None:
589 client_service_tkt = modify_client_tkt_fn(client_service_tkt)
591 additional_tickets = [client_service_tkt.ticket]
593 modify_service_tgt_fn = kdc_dict.pop('modify_service_tgt_fn', None)
594 if modify_service_tgt_fn is not None:
595 service1_tgt = modify_service_tgt_fn(service1_tgt)
597 kdc_options = kdc_dict.pop('kdc_options', None)
598 if kdc_options is None:
599 kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt'))
601 client_username = client_creds.get_username()
602 client_realm = client_creds.get_realm()
603 client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
604 names=[client_username])
606 service1_name = service1_creds.get_username()[:-1]
607 service1_realm = service1_creds.get_realm()
609 service2_name = service2_creds.get_username()[:-1]
610 service2_realm = service2_creds.get_realm()
611 service2_service = 'host'
612 service2_sname = self.PrincipalName_create(
613 name_type=NT_PRINCIPAL, names=[service2_service,
615 service2_decryption_key = self.TicketDecryptionKey_from_creds(
617 service2_etypes = service2_creds.tgs_supported_enctypes
619 expected_error_mode = kdc_dict.pop('expected_error_mode')
620 expected_status = kdc_dict.pop('expected_status', None)
621 if expected_error_mode:
622 check_error_fn = self.generic_check_kdc_error
625 check_error_fn = None
626 check_rep_fn = self.generic_check_kdc_rep
628 self.assertIsNone(expected_status)
630 expect_edata = kdc_dict.pop('expect_edata', None)
631 if expect_edata is not None:
632 self.assertTrue(expected_error_mode)
634 pac_options = kdc_dict.pop('pac_options', None)
636 authenticator_subkey = self.RandomKey(Enctype.AES256)
638 etypes = kdc_dict.pop('etypes', (AES256_CTS_HMAC_SHA1_96,
641 expected_proxy_target = service2_creds.get_spn()
643 expected_transited_services = kdc_dict.pop(
644 'expected_transited_services', [])
646 transited_service = f'host/{service1_name}@{service1_realm}'
647 expected_transited_services.append(transited_service)
649 expect_pac = kdc_dict.pop('expect_pac', True)
651 kdc_exchange_dict = self.tgs_exchange_dict(
652 expected_crealm=client_realm,
653 expected_cname=client_cname,
654 expected_srealm=service2_realm,
655 expected_sname=service2_sname,
656 expected_account_name=client_username,
658 expected_supported_etypes=service2_etypes,
659 ticket_decryption_key=service2_decryption_key,
660 check_error_fn=check_error_fn,
661 check_rep_fn=check_rep_fn,
662 check_kdc_private_fn=self.generic_check_kdc_private,
663 expected_error_mode=expected_error_mode,
664 expected_status=expected_status,
667 authenticator_subkey=authenticator_subkey,
668 kdc_options=kdc_options,
669 pac_options=pac_options,
670 expect_edata=expect_edata,
671 expected_proxy_target=expected_proxy_target,
672 expected_transited_services=expected_transited_services,
673 expect_pac=expect_pac)
675 self._generic_kdc_exchange(kdc_exchange_dict,
677 realm=service2_realm,
678 sname=service2_sname,
680 additional_tickets=additional_tickets)
682 if not expected_error_mode:
683 # Check whether the ticket contains a PAC.
684 ticket = kdc_exchange_dict['rep_ticket_creds']
685 pac = self.get_ticket_pac(ticket, expect_pac=expect_pac)
687 self.assertIsNotNone(pac)
689 self.assertIsNone(pac)
691 # Ensure we used all the parameters given to us.
692 self.assertEqual({}, kdc_dict)
694 def test_constrained_delegation(self):
695 # Test constrained delegation.
696 self._run_delegation_test(
698 'expected_error_mode': 0,
699 'allow_delegation': True
702 def test_constrained_delegation_no_auth_data_required(self):
703 # Test constrained delegation.
704 self._run_delegation_test(
706 'expected_error_mode': 0,
707 'allow_delegation': True,
709 'no_auth_data_required': True
714 def test_constrained_delegation_existing_delegation_info(self):
715 # Test constrained delegation with an existing S4U_DELEGATION_INFO
716 # structure in the PAC.
718 services = ['service1', 'service2', 'service3']
720 self._run_delegation_test(
722 'expected_error_mode': 0,
723 'allow_delegation': True,
724 'modify_client_tkt_fn': functools.partial(
725 self.add_delegation_info, services=services),
726 'expected_transited_services': services
729 def test_constrained_delegation_not_allowed(self):
730 # Test constrained delegation when the delegating service does not
732 self._run_delegation_test(
734 'expected_error_mode': KDC_ERR_BADOPTION,
735 'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
736 'allow_delegation': False
739 def test_constrained_delegation_no_client_pac(self):
740 # Test constrained delegation when the client service ticket does not
742 self._run_delegation_test(
744 'expected_error_mode': (KDC_ERR_MODIFIED,
745 KDC_ERR_TGT_REVOKED),
746 'allow_delegation': True,
747 'modify_client_tkt_fn': self.remove_ticket_pac,
748 'expect_edata': False
751 def test_constrained_delegation_no_service_pac(self):
752 # Test constrained delegation when the service TGT does not contain a
754 self._run_delegation_test(
756 'expected_error_mode': KDC_ERR_TGT_REVOKED,
757 'allow_delegation': True,
758 'modify_service_tgt_fn': self.remove_ticket_pac,
759 'expect_edata': False
762 def test_constrained_delegation_no_client_pac_no_auth_data_required(self):
763 # Test constrained delegation when the client service ticket does not
765 self._run_delegation_test(
767 'expected_error_mode': (KDC_ERR_MODIFIED,
769 'allow_delegation': True,
770 'modify_client_tkt_fn': self.remove_ticket_pac,
771 'expect_edata': False,
773 'no_auth_data_required': True
777 def test_constrained_delegation_no_service_pac_no_auth_data_required(self):
778 # Test constrained delegation when the service TGT does not contain a
780 self._run_delegation_test(
782 'expected_error_mode': KDC_ERR_TGT_REVOKED,
783 'allow_delegation': True,
784 'modify_service_tgt_fn': self.remove_ticket_pac,
786 'no_auth_data_required': True
789 'expect_edata': False
792 def test_constrained_delegation_non_forwardable(self):
793 # Test constrained delegation with a non-forwardable ticket.
794 self._run_delegation_test(
796 'expected_error_mode': KDC_ERR_BADOPTION,
797 'expected_status': ntstatus.NT_STATUS_ACCOUNT_RESTRICTION,
798 'allow_delegation': True,
799 'modify_client_tkt_fn': functools.partial(
800 self.set_ticket_forwardable, flag=False)
803 def test_constrained_delegation_pac_options_rbcd(self):
804 # Test constrained delegation, but with the RBCD bit set in the PAC
806 self._run_delegation_test(
808 'expected_error_mode': 0,
809 'pac_options': '0001', # supports RBCD
810 'allow_delegation': True
813 def test_rbcd_no_auth_data_required(self):
814 self._run_delegation_test(
816 'expected_error_mode': 0,
818 'pac_options': '0001', # supports RBCD
820 'no_auth_data_required': True
825 def test_rbcd_existing_delegation_info(self):
826 # Test constrained delegation with an existing S4U_DELEGATION_INFO
827 # structure in the PAC.
829 services = ['service1', 'service2', 'service3']
831 self._run_delegation_test(
833 'expected_error_mode': 0,
835 'pac_options': '0001', # supports RBCD
836 'modify_client_tkt_fn': functools.partial(
837 self.add_delegation_info, services=services),
838 'expected_transited_services': services
841 def test_rbcd_not_allowed(self):
842 # Test resource-based constrained delegation when the target service
844 self._run_delegation_test(
846 'expected_error_mode': KDC_ERR_BADOPTION,
847 'expected_status': ntstatus.NT_STATUS_NOT_FOUND,
849 'pac_options': '0001' # supports RBCD
852 def test_rbcd_no_client_pac_a(self):
853 # Test constrained delegation when the client service ticket does not
854 # contain a PAC, and an empty msDS-AllowedToDelegateTo attribute.
855 self._run_delegation_test(
857 'expected_error_mode': KDC_ERR_MODIFIED,
858 'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
860 'pac_options': '0001', # supports RBCD
861 'modify_client_tkt_fn': self.remove_ticket_pac
864 def test_rbcd_no_client_pac_b(self):
865 # Test constrained delegation when the client service ticket does not
866 # contain a PAC, and a non-empty msDS-AllowedToDelegateTo attribute.
867 self._run_delegation_test(
869 'expected_error_mode': KDC_ERR_MODIFIED,
870 'expected_status': ntstatus.NT_STATUS_NO_MATCH,
872 'pac_options': '0001', # supports RBCD
873 'modify_client_tkt_fn': self.remove_ticket_pac,
875 'delegation_to_spn': ('host/test')
879 def test_rbcd_no_service_pac(self):
880 # Test constrained delegation when the service TGT does not contain a
882 self._run_delegation_test(
884 'expected_error_mode': KDC_ERR_TGT_REVOKED,
886 'pac_options': '0001', # supports RBCD
887 'modify_service_tgt_fn': self.remove_ticket_pac,
888 'expect_edata': False
891 def test_rbcd_no_client_pac_no_auth_data_required_a(self):
892 # Test constrained delegation when the client service ticket does not
893 # contain a PAC, and an empty msDS-AllowedToDelegateTo attribute.
894 self._run_delegation_test(
896 'expected_error_mode': KDC_ERR_MODIFIED,
897 'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
899 'pac_options': '0001', # supports RBCD
900 'modify_client_tkt_fn': self.remove_ticket_pac,
902 'no_auth_data_required': True
906 def test_rbcd_no_client_pac_no_auth_data_required_b(self):
907 # Test constrained delegation when the client service ticket does not
908 # contain a PAC, and a non-empty msDS-AllowedToDelegateTo attribute.
909 self._run_delegation_test(
911 'expected_error_mode': KDC_ERR_MODIFIED,
912 'expected_status': ntstatus.NT_STATUS_NO_MATCH,
914 'pac_options': '0001', # supports RBCD
915 'modify_client_tkt_fn': self.remove_ticket_pac,
917 'delegation_to_spn': ('host/test')
920 'no_auth_data_required': True
924 def test_rbcd_no_service_pac_no_auth_data_required(self):
925 # Test constrained delegation when the service TGT does not contain a
927 self._run_delegation_test(
929 'expected_error_mode': KDC_ERR_TGT_REVOKED,
931 'pac_options': '0001', # supports RBCD
932 'modify_service_tgt_fn': self.remove_ticket_pac,
934 'no_auth_data_required': True
936 'expect_edata': False
939 def test_rbcd_non_forwardable(self):
940 # Test resource-based constrained delegation with a non-forwardable
942 self._run_delegation_test(
944 'expected_error_mode': KDC_ERR_BADOPTION,
945 'expected_status': ntstatus.NT_STATUS_ACCOUNT_RESTRICTION,
947 'pac_options': '0001', # supports RBCD
948 'modify_client_tkt_fn': functools.partial(
949 self.set_ticket_forwardable, flag=False)
952 def test_rbcd_no_pac_options_a(self):
953 # Test resource-based constrained delegation without the RBCD bit set
954 # in the PAC options, and an empty msDS-AllowedToDelegateTo attribute.
955 self._run_delegation_test(
957 'expected_error_mode': KDC_ERR_BADOPTION,
958 'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
960 'pac_options': '1' # does not support RBCD
963 def test_rbcd_no_pac_options_b(self):
964 # Test resource-based constrained delegation without the RBCD bit set
965 # in the PAC options, and a non-empty msDS-AllowedToDelegateTo
967 self._run_delegation_test(
969 'expected_error_mode': KDC_ERR_BADOPTION,
970 'expected_status': ntstatus.NT_STATUS_NO_MATCH,
972 'pac_options': '1', # does not support RBCD
974 'delegation_to_spn': ('host/test')
978 def test_bronze_bit_constrained_delegation_old_checksum(self):
979 # Attempt to modify the ticket without updating the PAC checksums.
980 self._run_delegation_test(
982 'expected_error_mode': (KDC_ERR_MODIFIED,
983 KDC_ERR_BAD_INTEGRITY),
984 'allow_delegation': True,
985 'client_tkt_options': '0', # non-forwardable ticket
986 'modify_client_tkt_fn': functools.partial(
987 self.set_ticket_forwardable,
988 flag=True, update_pac_checksums=False),
989 'expect_edata': False
992 def test_bronze_bit_rbcd_old_checksum(self):
993 # Attempt to modify the ticket without updating the PAC checksums.
994 self._run_delegation_test(
996 'expected_error_mode': KDC_ERR_MODIFIED,
997 'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
999 'pac_options': '0001', # supports RBCD
1000 'client_tkt_options': '0', # non-forwardable ticket
1001 'modify_client_tkt_fn': functools.partial(
1002 self.set_ticket_forwardable,
1003 flag=True, update_pac_checksums=False)
1006 def test_constrained_delegation_missing_client_checksum(self):
1007 # Present a user ticket without the required checksums.
1008 for checksum in self.pac_checksum_types:
1009 with self.subTest(checksum=checksum):
1010 if checksum == krb5pac.PAC_TYPE_TICKET_CHECKSUM:
1011 expected_error_mode = (KDC_ERR_MODIFIED,
1014 expected_error_mode = KDC_ERR_GENERIC
1016 self._run_delegation_test(
1018 'expected_error_mode': expected_error_mode,
1019 'allow_delegation': True,
1020 'modify_client_tkt_fn': functools.partial(
1021 self.remove_pac_checksum, checksum=checksum),
1022 'expect_edata': False
1025 def test_constrained_delegation_missing_service_checksum(self):
1026 # Present the service's ticket without the required checksums.
1027 for checksum in filter(lambda x: x != krb5pac.PAC_TYPE_TICKET_CHECKSUM,
1028 self.pac_checksum_types):
1029 with self.subTest(checksum=checksum):
1030 self._run_delegation_test(
1032 'expected_error_mode': KDC_ERR_GENERIC,
1034 ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES,
1035 'allow_delegation': True,
1036 'modify_service_tgt_fn': functools.partial(
1037 self.remove_pac_checksum, checksum=checksum)
1040 def test_rbcd_missing_client_checksum(self):
1041 # Present a user ticket without the required checksums.
1042 for checksum in self.pac_checksum_types:
1043 with self.subTest(checksum=checksum):
1044 if checksum == krb5pac.PAC_TYPE_TICKET_CHECKSUM:
1045 expected_error_mode = KDC_ERR_MODIFIED
1047 expected_error_mode = KDC_ERR_GENERIC
1049 self._run_delegation_test(
1051 'expected_error_mode': expected_error_mode,
1053 ntstatus.NT_STATUS_NOT_SUPPORTED,
1055 'pac_options': '0001', # supports RBCD
1056 'modify_client_tkt_fn': functools.partial(
1057 self.remove_pac_checksum, checksum=checksum)
1060 def test_rbcd_missing_service_checksum(self):
1061 # Present the service's ticket without the required checksums.
1062 for checksum in filter(lambda x: x != krb5pac.PAC_TYPE_TICKET_CHECKSUM,
1063 self.pac_checksum_types):
1064 with self.subTest(checksum=checksum):
1065 self._run_delegation_test(
1067 'expected_error_mode': KDC_ERR_GENERIC,
1069 ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES,
1071 'pac_options': '0001', # supports RBCD
1072 'modify_service_tgt_fn': functools.partial(
1073 self.remove_pac_checksum, checksum=checksum)
1076 def test_constrained_delegation_zeroed_client_checksum(self):
1077 # Present a user ticket with invalid checksums.
1078 for checksum in self.pac_checksum_types:
1079 with self.subTest(checksum=checksum):
1080 self._run_delegation_test(
1082 'expected_error_mode': (KDC_ERR_MODIFIED,
1083 KDC_ERR_BAD_INTEGRITY),
1084 'allow_delegation': True,
1085 'modify_client_tkt_fn': functools.partial(
1086 self.zeroed_pac_checksum, checksum=checksum),
1087 'expect_edata': False
1090 def test_constrained_delegation_zeroed_service_checksum(self):
1091 # Present the service's ticket with invalid checksums.
1092 for checksum in self.pac_checksum_types:
1093 with self.subTest(checksum=checksum):
1094 if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
1095 expected_error_mode = (KDC_ERR_MODIFIED,
1096 KDC_ERR_BAD_INTEGRITY)
1097 expected_status = ntstatus.NT_STATUS_WRONG_PASSWORD
1099 expected_error_mode = 0
1100 expected_status = None
1102 self._run_delegation_test(
1104 'expected_error_mode': expected_error_mode,
1105 'expected_status': expected_status,
1106 'allow_delegation': True,
1107 'modify_service_tgt_fn': functools.partial(
1108 self.zeroed_pac_checksum, checksum=checksum)
1111 def test_rbcd_zeroed_client_checksum(self):
1112 # Present a user ticket with invalid checksums.
1113 for checksum in self.pac_checksum_types:
1114 with self.subTest(checksum=checksum):
1115 self._run_delegation_test(
1117 'expected_error_mode': KDC_ERR_MODIFIED,
1119 ntstatus.NT_STATUS_NOT_SUPPORTED,
1121 'pac_options': '0001', # supports RBCD
1122 'modify_client_tkt_fn': functools.partial(
1123 self.zeroed_pac_checksum, checksum=checksum)
1126 def test_rbcd_zeroed_service_checksum(self):
1127 # Present the service's ticket with invalid checksums.
1128 for checksum in self.pac_checksum_types:
1129 with self.subTest(checksum=checksum):
1130 if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
1131 expected_error_mode = KDC_ERR_MODIFIED
1132 expected_status = ntstatus.NT_STATUS_WRONG_PASSWORD
1134 expected_error_mode = 0
1135 expected_status = None
1137 self._run_delegation_test(
1139 'expected_error_mode': expected_error_mode,
1140 'expected_status': expected_status,
1142 'pac_options': '0001', # supports RBCD
1143 'modify_service_tgt_fn': functools.partial(
1144 self.zeroed_pac_checksum, checksum=checksum)
1147 unkeyed_ctypes = {Cksumtype.MD5, Cksumtype.SHA1, Cksumtype.CRC32}
1149 def test_constrained_delegation_unkeyed_client_checksum(self):
1150 # Present a user ticket with invalid checksums.
1151 for checksum in self.pac_checksum_types:
1152 for ctype in self.unkeyed_ctypes:
1153 with self.subTest(checksum=checksum, ctype=ctype):
1154 if (checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM
1155 and ctype == Cksumtype.SHA1):
1156 expected_error_mode = (KDC_ERR_SUMTYPE_NOSUPP,
1157 KDC_ERR_INAPP_CKSUM)
1159 expected_error_mode = (KDC_ERR_GENERIC,
1160 KDC_ERR_INAPP_CKSUM)
1162 self._run_delegation_test(
1164 'expected_error_mode': expected_error_mode,
1165 'allow_delegation': True,
1166 'modify_client_tkt_fn': functools.partial(
1167 self.unkeyed_pac_checksum,
1168 checksum=checksum, ctype=ctype),
1169 'expect_edata': False
1172 def test_constrained_delegation_unkeyed_service_checksum(self):
1173 # Present the service's ticket with invalid checksums.
1174 for checksum in self.pac_checksum_types:
1175 for ctype in self.unkeyed_ctypes:
1176 with self.subTest(checksum=checksum, ctype=ctype):
1177 if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
1178 if ctype == Cksumtype.SHA1:
1179 expected_error_mode = (KDC_ERR_SUMTYPE_NOSUPP,
1180 KDC_ERR_INAPP_CKSUM)
1181 expected_status = ntstatus.NT_STATUS_LOGON_FAILURE
1183 expected_error_mode = (KDC_ERR_GENERIC,
1184 KDC_ERR_INAPP_CKSUM)
1186 ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES)
1188 expected_error_mode = 0
1189 expected_status = None
1191 self._run_delegation_test(
1193 'expected_error_mode': expected_error_mode,
1194 'expected_status': expected_status,
1195 'allow_delegation': True,
1196 'modify_service_tgt_fn': functools.partial(
1197 self.unkeyed_pac_checksum,
1198 checksum=checksum, ctype=ctype)
1201 def test_rbcd_unkeyed_client_checksum(self):
1202 # Present a user ticket with invalid checksums.
1203 for checksum in self.pac_checksum_types:
1204 for ctype in self.unkeyed_ctypes:
1205 with self.subTest(checksum=checksum, ctype=ctype):
1206 if (checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM
1207 and ctype == Cksumtype.SHA1):
1208 expected_error_mode = KDC_ERR_SUMTYPE_NOSUPP
1210 expected_error_mode = KDC_ERR_GENERIC
1212 self._run_delegation_test(
1214 'expected_error_mode': expected_error_mode,
1216 ntstatus.NT_STATUS_NOT_SUPPORTED,
1218 'pac_options': '0001', # supports RBCD
1219 'modify_client_tkt_fn': functools.partial(
1220 self.unkeyed_pac_checksum,
1221 checksum=checksum, ctype=ctype)
1224 def test_rbcd_unkeyed_service_checksum(self):
1225 # Present the service's ticket with invalid checksums.
1226 for checksum in self.pac_checksum_types:
1227 for ctype in self.unkeyed_ctypes:
1228 with self.subTest(checksum=checksum, ctype=ctype):
1229 if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
1230 if ctype == Cksumtype.SHA1:
1231 expected_error_mode = KDC_ERR_SUMTYPE_NOSUPP
1232 expected_status = ntstatus.NT_STATUS_LOGON_FAILURE
1234 expected_error_mode = KDC_ERR_GENERIC
1236 ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES)
1238 expected_error_mode = 0
1239 expected_status = None
1241 self._run_delegation_test(
1243 'expected_error_mode': expected_error_mode,
1244 'expected_status': expected_status,
1246 'pac_options': '0001', # supports RBCD
1247 'modify_service_tgt_fn': functools.partial(
1248 self.unkeyed_pac_checksum,
1249 checksum=checksum, ctype=ctype)
1252 def remove_pac_checksum(self, ticket, checksum):
1253 checksum_keys = self.get_krbtgt_checksum_key()
1255 return self.modified_ticket(ticket,
1256 checksum_keys=checksum_keys,
1257 include_checksums={checksum: False})
1259 def zeroed_pac_checksum(self, ticket, checksum):
1260 krbtgt_creds = self.get_krbtgt_creds()
1261 krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
1263 server_key = ticket.decryption_key
1266 krb5pac.PAC_TYPE_SRV_CHECKSUM: server_key,
1267 krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key,
1268 krb5pac.PAC_TYPE_TICKET_CHECKSUM: krbtgt_key,
1271 if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
1272 zeroed_key = server_key
1274 zeroed_key = krbtgt_key
1276 checksum_keys[checksum] = ZeroedChecksumKey(zeroed_key.key,
1279 return self.modified_ticket(ticket,
1280 checksum_keys=checksum_keys,
1281 include_checksums={checksum: True})
1283 def unkeyed_pac_checksum(self, ticket, checksum, ctype):
1284 krbtgt_creds = self.get_krbtgt_creds()
1285 krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
1287 server_key = ticket.decryption_key
1290 krb5pac.PAC_TYPE_SRV_CHECKSUM: server_key,
1291 krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key,
1292 krb5pac.PAC_TYPE_TICKET_CHECKSUM: krbtgt_key,
1295 # Make a copy of the existing key and change the ctype.
1296 key = checksum_keys[checksum]
1297 new_key = RodcPacEncryptionKey(key.key, key.kvno)
1298 new_key.ctype = ctype
1299 checksum_keys[checksum] = new_key
1301 return self.modified_ticket(ticket,
1302 checksum_keys=checksum_keys,
1303 include_checksums={checksum: True})
1305 def add_delegation_info(self, ticket, services=None):
1306 def modify_pac_fn(pac):
1307 pac_buffers = pac.buffers
1308 self.assertNotIn(krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION,
1309 (buffer.type for buffer in pac_buffers))
1311 transited_services = list(map(lsa.String, services))
1313 delegation = krb5pac.PAC_CONSTRAINED_DELEGATION()
1314 delegation.proxy_target = lsa.String('test_proxy_target')
1315 delegation.transited_services = transited_services
1316 delegation.num_transited_services = len(transited_services)
1318 info = krb5pac.PAC_CONSTRAINED_DELEGATION_CTR()
1319 info.info = delegation
1321 pac_buffer = krb5pac.PAC_BUFFER()
1322 pac_buffer.type = krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION
1323 pac_buffer.info = info
1325 pac_buffers.append(pac_buffer)
1327 pac.buffers = pac_buffers
1328 pac.num_buffers += 1
1332 checksum_keys = self.get_krbtgt_checksum_key()
1334 return self.modified_ticket(ticket,
1335 checksum_keys=checksum_keys,
1336 modify_pac_fn=modify_pac_fn)
1338 def set_ticket_forwardable(self, ticket, flag, update_pac_checksums=True):
1339 flag = '1' if flag else '0'
1341 def modify_fn(enc_part):
1342 # Reset the forwardable flag
1343 forwardable_pos = (len(tuple(krb5_asn1.TicketFlags('forwardable')))
1346 flags = enc_part['flags']
1347 self.assertLessEqual(forwardable_pos, len(flags))
1348 enc_part['flags'] = (flags[:forwardable_pos] +
1350 flags[forwardable_pos+1:])
1354 if update_pac_checksums:
1355 checksum_keys = self.get_krbtgt_checksum_key()
1357 checksum_keys = None
1359 return self.modified_ticket(ticket,
1360 modify_fn=modify_fn,
1361 checksum_keys=checksum_keys,
1362 update_pac_checksums=update_pac_checksums)
1364 def remove_ticket_pac(self, ticket):
1365 return self.modified_ticket(ticket,
1369 if __name__ == "__main__":
1370 global_asn1_print = False
1371 global_hexdump = False