auth: Shorten long SID flags combinations
[samba.git] / python / samba / tests / krb5 / s4u_tests.py
1 #!/usr/bin/env python3
2 # Unix SMB/CIFS implementation.
3 # Copyright (C) Stefan Metzmacher 2020
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18
19 import sys
20 import os
21
22 sys.path.insert(0, "bin/python")
23 os.environ["PYTHONUNBUFFERED"] = "1"
24
25 import functools
26
27 from samba import dsdb, ntstatus
28 from samba.dcerpc import krb5pac, lsa, security
29
30 from samba.tests import env_get_var_value
31 from samba.tests.krb5.kcrypto import Cksumtype, Enctype
32 from samba.tests.krb5.kdc_base_test import KDCBaseTest
33 from samba.tests.krb5.raw_testcase import (
34     RawKerberosTest,
35     RodcPacEncryptionKey,
36     ZeroedChecksumKey
37 )
38 from samba.tests.krb5.rfc4120_constants import (
39     AES256_CTS_HMAC_SHA1_96,
40     ARCFOUR_HMAC_MD5,
41     KDC_ERR_BADMATCH,
42     KDC_ERR_BADOPTION,
43     KDC_ERR_BAD_INTEGRITY,
44     KDC_ERR_GENERIC,
45     KDC_ERR_INAPP_CKSUM,
46     KDC_ERR_MODIFIED,
47     KDC_ERR_SUMTYPE_NOSUPP,
48     KDC_ERR_TGT_REVOKED,
49     KU_PA_ENC_TIMESTAMP,
50     KU_AS_REP_ENC_PART,
51     KU_TGS_REP_ENC_PART_SUB_KEY,
52     NT_PRINCIPAL
53 )
54 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
55
56 SidType = RawKerberosTest.SidType
57
58 global_asn1_print = False
59 global_hexdump = False
60
61
62 class S4UKerberosTests(KDCBaseTest):
63
64     default_attrs = security.SE_GROUP_DEFAULT_FLAGS
65
66     def setUp(self):
67         super(S4UKerberosTests, self).setUp()
68         self.do_asn1_print = global_asn1_print
69         self.do_hexdump = global_hexdump
70
71     def _test_s4u2self(self, pa_s4u2self_ctype=None):
72         service_creds = self.get_service_creds()
73         service = service_creds.get_username()
74         realm = service_creds.get_realm()
75
76         cname = self.PrincipalName_create(name_type=1, names=[service])
77         sname = self.PrincipalName_create(name_type=2, names=["krbtgt", realm])
78
79         till = self.get_KerberosTime(offset=36000)
80
81         kdc_options = krb5_asn1.KDCOptions('forwardable')
82         padata = None
83
84         etypes = (18, 17, 23)
85
86         req = self.AS_REQ_create(padata=padata,
87                                  kdc_options=str(kdc_options),
88                                  cname=cname,
89                                  realm=realm,
90                                  sname=sname,
91                                  from_time=None,
92                                  till_time=till,
93                                  renew_time=None,
94                                  nonce=0x7fffffff,
95                                  etypes=etypes,
96                                  addresses=None,
97                                  additional_tickets=None)
98         rep = self.send_recv_transaction(req)
99         self.assertIsNotNone(rep)
100
101         self.assertEqual(rep['msg-type'], 30)
102         self.assertEqual(rep['error-code'], 25)
103         rep_padata = self.der_decode(
104             rep['e-data'], asn1Spec=krb5_asn1.METHOD_DATA())
105
106         for pa in rep_padata:
107             if pa['padata-type'] == 19:
108                 etype_info2 = pa['padata-value']
109                 break
110
111         etype_info2 = self.der_decode(
112             etype_info2, asn1Spec=krb5_asn1.ETYPE_INFO2())
113
114         key = self.PasswordKey_from_etype_info2(service_creds, etype_info2[0])
115
116         (patime, pausec) = self.get_KerberosTimeWithUsec()
117         pa_ts = self.PA_ENC_TS_ENC_create(patime, pausec)
118         pa_ts = self.der_encode(pa_ts, asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
119
120         pa_ts = self.EncryptedData_create(key, KU_PA_ENC_TIMESTAMP, pa_ts)
121         pa_ts = self.der_encode(pa_ts, asn1Spec=krb5_asn1.EncryptedData())
122
123         pa_ts = self.PA_DATA_create(2, pa_ts)
124
125         kdc_options = krb5_asn1.KDCOptions('forwardable')
126         padata = [pa_ts]
127
128         req = self.AS_REQ_create(padata=padata,
129                                  kdc_options=str(kdc_options),
130                                  cname=cname,
131                                  realm=realm,
132                                  sname=sname,
133                                  from_time=None,
134                                  till_time=till,
135                                  renew_time=None,
136                                  nonce=0x7fffffff,
137                                  etypes=etypes,
138                                  addresses=None,
139                                  additional_tickets=None)
140         rep = self.send_recv_transaction(req)
141         self.assertIsNotNone(rep)
142
143         msg_type = rep['msg-type']
144         self.assertEqual(msg_type, 11)
145
146         enc_part2 = key.decrypt(KU_AS_REP_ENC_PART, rep['enc-part']['cipher'])
147         # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
148         # application tag 26
149         try:
150             enc_part2 = self.der_decode(
151                 enc_part2, asn1Spec=krb5_asn1.EncASRepPart())
152         except Exception:
153             enc_part2 = self.der_decode(
154                 enc_part2, asn1Spec=krb5_asn1.EncTGSRepPart())
155
156         # S4U2Self Request
157         sname = cname
158
159         for_user_name = env_get_var_value('FOR_USER')
160         uname = self.PrincipalName_create(name_type=1, names=[for_user_name])
161
162         kdc_options = krb5_asn1.KDCOptions('forwardable')
163         till = self.get_KerberosTime(offset=36000)
164         ticket = rep['ticket']
165         ticket_session_key = self.EncryptionKey_import(enc_part2['key'])
166         pa_s4u = self.PA_S4U2Self_create(name=uname, realm=realm,
167                                          tgt_session_key=ticket_session_key,
168                                          ctype=pa_s4u2self_ctype)
169         padata = [pa_s4u]
170
171         subkey = self.RandomKey(ticket_session_key.etype)
172
173         (ctime, cusec) = self.get_KerberosTimeWithUsec()
174
175         req = self.TGS_REQ_create(padata=padata,
176                                   cusec=cusec,
177                                   ctime=ctime,
178                                   ticket=ticket,
179                                   kdc_options=str(kdc_options),
180                                   cname=cname,
181                                   realm=realm,
182                                   sname=sname,
183                                   from_time=None,
184                                   till_time=till,
185                                   renew_time=None,
186                                   nonce=0x7ffffffe,
187                                   etypes=etypes,
188                                   addresses=None,
189                                   EncAuthorizationData=None,
190                                   EncAuthorizationData_key=None,
191                                   additional_tickets=None,
192                                   ticket_session_key=ticket_session_key,
193                                   authenticator_subkey=subkey)
194         rep = self.send_recv_transaction(req)
195         self.assertIsNotNone(rep)
196
197         msg_type = rep['msg-type']
198         if msg_type == 13:
199             enc_part2 = subkey.decrypt(
200                 KU_TGS_REP_ENC_PART_SUB_KEY, rep['enc-part']['cipher'])
201             enc_part2 = self.der_decode(
202                 enc_part2, asn1Spec=krb5_asn1.EncTGSRepPart())
203
204         return msg_type
205
206     # Using the checksum type from the tgt_session_key happens to work
207     # everywhere
208     def test_s4u2self(self):
209         msg_type = self._test_s4u2self()
210         self.assertEqual(msg_type, 13)
211
212     # Per spec, the checksum of PA-FOR-USER is HMAC_MD5, see [MS-SFU] 2.2.1
213     def test_s4u2self_hmac_md5_checksum(self):
214         msg_type = self._test_s4u2self(pa_s4u2self_ctype=Cksumtype.HMAC_MD5)
215         self.assertEqual(msg_type, 13)
216
217     def test_s4u2self_md5_unkeyed_checksum(self):
218         msg_type = self._test_s4u2self(pa_s4u2self_ctype=Cksumtype.MD5)
219         self.assertEqual(msg_type, 30)
220
221     def test_s4u2self_sha1_unkeyed_checksum(self):
222         msg_type = self._test_s4u2self(pa_s4u2self_ctype=Cksumtype.SHA1)
223         self.assertEqual(msg_type, 30)
224
225     def test_s4u2self_crc32_unkeyed_checksum(self):
226         msg_type = self._test_s4u2self(pa_s4u2self_ctype=Cksumtype.CRC32)
227         self.assertEqual(msg_type, 30)
228
229     def _run_s4u2self_test(self, kdc_dict):
230         client_opts = kdc_dict.pop('client_opts', None)
231         client_creds = self.get_cached_creds(
232             account_type=self.AccountType.USER,
233             opts=client_opts)
234
235         service_opts = kdc_dict.pop('service_opts', None)
236         service_creds = self.get_cached_creds(
237             account_type=self.AccountType.COMPUTER,
238             opts=service_opts)
239
240         service_tgt = self.get_tgt(service_creds)
241         modify_service_tgt_fn = kdc_dict.pop('modify_service_tgt_fn', None)
242         if modify_service_tgt_fn is not None:
243             service_tgt = modify_service_tgt_fn(service_tgt)
244
245         client_name = client_creds.get_username()
246         client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
247                                                  names=[client_name])
248
249         samdb = self.get_samdb()
250         client_dn = client_creds.get_dn()
251         sid = self.get_objectSid(samdb, client_dn)
252
253         service_name = kdc_dict.pop('service_name', None)
254         if service_name is None:
255             service_name = service_creds.get_username()[:-1]
256         service_sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
257                                                   names=['host', service_name])
258
259         realm = client_creds.get_realm()
260
261         expected_flags = kdc_dict.pop('expected_flags', None)
262         if expected_flags is not None:
263             expected_flags = krb5_asn1.TicketFlags(expected_flags)
264
265         unexpected_flags = kdc_dict.pop('unexpected_flags', None)
266         if unexpected_flags is not None:
267             unexpected_flags = krb5_asn1.TicketFlags(unexpected_flags)
268
269         expected_error_mode = kdc_dict.pop('expected_error_mode', 0)
270         expected_status = kdc_dict.pop('expected_status', None)
271         if expected_error_mode:
272             check_error_fn = self.generic_check_kdc_error
273             check_rep_fn = None
274         else:
275             check_error_fn = None
276             check_rep_fn = self.generic_check_kdc_rep
277
278             self.assertIsNone(expected_status)
279
280         kdc_options = kdc_dict.pop('kdc_options', '0')
281         kdc_options = krb5_asn1.KDCOptions(kdc_options)
282
283         service_decryption_key = self.TicketDecryptionKey_from_creds(
284             service_creds)
285
286         authenticator_subkey = self.RandomKey(Enctype.AES256)
287
288         etypes = kdc_dict.pop('etypes', (AES256_CTS_HMAC_SHA1_96,
289                                          ARCFOUR_HMAC_MD5))
290
291         expect_edata = kdc_dict.pop('expect_edata', None)
292         expected_groups = kdc_dict.pop('expected_groups', None)
293         unexpected_groups = kdc_dict.pop('unexpected_groups', None)
294
295         def generate_s4u2self_padata(_kdc_exchange_dict,
296                                      _callback_dict,
297                                      req_body):
298             pa_s4u = self.PA_S4U2Self_create(
299                 name=client_cname,
300                 realm=realm,
301                 tgt_session_key=service_tgt.session_key,
302                 ctype=None)
303
304             return [pa_s4u], req_body
305
306         kdc_exchange_dict = self.tgs_exchange_dict(
307             expected_crealm=realm,
308             expected_cname=client_cname,
309             expected_srealm=realm,
310             expected_sname=service_sname,
311             expected_account_name=client_name,
312             expected_groups=expected_groups,
313             unexpected_groups=unexpected_groups,
314             expected_sid=sid,
315             expected_flags=expected_flags,
316             unexpected_flags=unexpected_flags,
317             ticket_decryption_key=service_decryption_key,
318             expect_ticket_checksum=True,
319             generate_padata_fn=generate_s4u2self_padata,
320             check_error_fn=check_error_fn,
321             check_rep_fn=check_rep_fn,
322             check_kdc_private_fn=self.generic_check_kdc_private,
323             expected_error_mode=expected_error_mode,
324             expected_status=expected_status,
325             tgt=service_tgt,
326             authenticator_subkey=authenticator_subkey,
327             kdc_options=str(kdc_options),
328             expect_client_claims=False,
329             expect_edata=expect_edata)
330
331         self._generic_kdc_exchange(kdc_exchange_dict,
332                                    cname=None,
333                                    realm=realm,
334                                    sname=service_sname,
335                                    etypes=etypes)
336
337         if not expected_error_mode:
338             # Check that the ticket contains a PAC.
339             ticket = kdc_exchange_dict['rep_ticket_creds']
340
341             pac = self.get_ticket_pac(ticket)
342             self.assertIsNotNone(pac)
343
344         # Ensure we used all the parameters given to us.
345         self.assertEqual({}, kdc_dict)
346
347     # Test performing an S4U2Self operation with a forwardable ticket. The
348     # resulting ticket should have the 'forwardable' flag set.
349     def test_s4u2self_forwardable(self):
350         self._run_s4u2self_test(
351             {
352                 'client_opts': {
353                     'not_delegated': False
354                 },
355                 'kdc_options': 'forwardable',
356                 'modify_service_tgt_fn': functools.partial(
357                     self.set_ticket_forwardable, flag=True),
358                 'expected_flags': 'forwardable'
359             })
360
361     # Test performing an S4U2Self operation with a forwardable ticket that does
362     # not contain a PAC. The request should fail.
363     def test_s4u2self_no_pac(self):
364         def forwardable_no_pac(ticket):
365             ticket = self.set_ticket_forwardable(ticket, flag=True)
366             return self.remove_ticket_pac(ticket)
367
368         self._run_s4u2self_test(
369             {
370                 'expected_error_mode': KDC_ERR_TGT_REVOKED,
371                 'client_opts': {
372                     'not_delegated': False
373                 },
374                 'kdc_options': 'forwardable',
375                 'modify_service_tgt_fn': forwardable_no_pac,
376                 'expected_flags': 'forwardable',
377                 'expect_edata': False
378             })
379
380     # Test performing an S4U2Self operation without requesting a forwardable
381     # ticket. The resulting ticket should not have the 'forwardable' flag set.
382     def test_s4u2self_without_forwardable(self):
383         self._run_s4u2self_test(
384             {
385                 'client_opts': {
386                     'not_delegated': False
387                 },
388                 'modify_service_tgt_fn': functools.partial(
389                     self.set_ticket_forwardable, flag=True),
390                 'unexpected_flags': 'forwardable'
391             })
392
393     # Do an S4U2Self with a non-forwardable TGT. The 'forwardable' flag should
394     # not be set on the ticket.
395     def test_s4u2self_not_forwardable(self):
396         self._run_s4u2self_test(
397             {
398                 'client_opts': {
399                     'not_delegated': False
400                 },
401                 'kdc_options': 'forwardable',
402                 'modify_service_tgt_fn': functools.partial(
403                     self.set_ticket_forwardable, flag=False),
404                 'unexpected_flags': 'forwardable'
405             })
406
407     # Do an S4U2Self with the not_delegated flag set on the client. The
408     # 'forwardable' flag should not be set on the ticket.
409     def test_s4u2self_client_not_delegated(self):
410         self._run_s4u2self_test(
411             {
412                 'client_opts': {
413                     'not_delegated': True
414                 },
415                 'kdc_options': 'forwardable',
416                 'modify_service_tgt_fn': functools.partial(
417                     self.set_ticket_forwardable, flag=True),
418                 'unexpected_flags': 'forwardable'
419             })
420
421     # Do an S4U2Self with a service not trusted to authenticate for delegation,
422     # but having an empty msDS-AllowedToDelegateTo attribute. The 'forwardable'
423     # flag should be set on the ticket.
424     def test_s4u2self_not_trusted_empty_allowed(self):
425         self._run_s4u2self_test(
426             {
427                 'client_opts': {
428                     'not_delegated': False
429                 },
430                 'service_opts': {
431                     'trusted_to_auth_for_delegation': False,
432                     'delegation_to_spn': ()
433                 },
434                 'kdc_options': 'forwardable',
435                 'modify_service_tgt_fn': functools.partial(
436                     self.set_ticket_forwardable, flag=True),
437                 'expected_flags': 'forwardable'
438             })
439
440     # Do an S4U2Self with a service not trusted to authenticate for delegation
441     # and having a non-empty msDS-AllowedToDelegateTo attribute. The
442     # 'forwardable' flag should not be set on the ticket.
443     def test_s4u2self_not_trusted_nonempty_allowed(self):
444         self._run_s4u2self_test(
445             {
446                 'client_opts': {
447                     'not_delegated': False
448                 },
449                 'service_opts': {
450                     'trusted_to_auth_for_delegation': False,
451                     'delegation_to_spn': ('test',)
452                 },
453                 'kdc_options': 'forwardable',
454                 'modify_service_tgt_fn': functools.partial(
455                     self.set_ticket_forwardable, flag=True),
456                 'unexpected_flags': 'forwardable'
457             })
458
459     # Do an S4U2Self with a service trusted to authenticate for delegation and
460     # having an empty msDS-AllowedToDelegateTo attribute. The 'forwardable'
461     # flag should be set on the ticket.
462     def test_s4u2self_trusted_empty_allowed(self):
463         self._run_s4u2self_test(
464             {
465                 'client_opts': {
466                     'not_delegated': False
467                 },
468                 'service_opts': {
469                     'trusted_to_auth_for_delegation': True,
470                     'delegation_to_spn': ()
471                 },
472                 'kdc_options': 'forwardable',
473                 'modify_service_tgt_fn': functools.partial(
474                     self.set_ticket_forwardable, flag=True),
475                 'expected_flags': 'forwardable'
476             })
477
478     # Do an S4U2Self with a service trusted to authenticate for delegation and
479     # having a non-empty msDS-AllowedToDelegateTo attribute. The 'forwardable'
480     # flag should be set on the ticket.
481     def test_s4u2self_trusted_nonempty_allowed(self):
482         self._run_s4u2self_test(
483             {
484                 'client_opts': {
485                     'not_delegated': False
486                 },
487                 'service_opts': {
488                     'trusted_to_auth_for_delegation': True,
489                     'delegation_to_spn': ('test',)
490                 },
491                 'kdc_options': 'forwardable',
492                 'modify_service_tgt_fn': functools.partial(
493                     self.set_ticket_forwardable, flag=True),
494                 'expected_flags': 'forwardable'
495             })
496
497     # Do an S4U2Self with the sname in the request different to that of the
498     # service. We expect an error.
499     def test_s4u2self_wrong_sname(self):
500         other_creds = self.get_cached_creds(
501             account_type=self.AccountType.COMPUTER,
502             opts={
503                 'trusted_to_auth_for_delegation': True,
504                 'id': 0
505             })
506         other_sname = other_creds.get_username()[:-1]
507
508         self._run_s4u2self_test(
509             {
510                 'expected_error_mode': KDC_ERR_BADMATCH,
511                 'expect_edata': False,
512                 'client_opts': {
513                     'not_delegated': False
514                 },
515                 'service_opts': {
516                     'trusted_to_auth_for_delegation': True
517                 },
518                 'service_name': other_sname,
519                 'kdc_options': 'forwardable',
520                 'modify_service_tgt_fn': functools.partial(
521                     self.set_ticket_forwardable, flag=True)
522             })
523
524     # Do an S4U2Self where the service does not require authorization data. The
525     # resulting ticket should still contain a PAC.
526     def test_s4u2self_no_auth_data_required(self):
527         self._run_s4u2self_test(
528             {
529                 'client_opts': {
530                     'not_delegated': False
531                 },
532                 'service_opts': {
533                     'trusted_to_auth_for_delegation': True,
534                     'no_auth_data_required': True
535                 },
536                 'kdc_options': 'forwardable',
537                 'modify_service_tgt_fn': functools.partial(
538                     self.set_ticket_forwardable, flag=True),
539                 'expected_flags': 'forwardable'
540             })
541
542     # Do an S4U2Self an check that the service asserted identity is part of
543     # the sids.
544     def test_s4u2self_asserted_identity(self):
545         self._run_s4u2self_test(
546             {
547                 'client_opts': {
548                     'not_delegated': False
549                 },
550                 'expected_groups': {
551                     (security.SID_SERVICE_ASSERTED_IDENTITY,
552                      SidType.EXTRA_SID,
553                      self.default_attrs),
554                     ...
555                 },
556                 'unexpected_groups': {
557                     security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY,
558                 },
559             })
560
561     def _run_delegation_test(self, kdc_dict):
562         s4u2self = kdc_dict.pop('s4u2self', False)
563
564         client_opts = kdc_dict.pop('client_opts', None)
565         client_creds = self.get_cached_creds(
566             account_type=self.AccountType.USER,
567             opts=client_opts)
568
569         samdb = self.get_samdb()
570         client_dn = client_creds.get_dn()
571         sid = self.get_objectSid(samdb, client_dn)
572
573         service1_opts = kdc_dict.pop('service1_opts', {})
574         service2_opts = kdc_dict.pop('service2_opts', {})
575
576         allow_delegation = kdc_dict.pop('allow_delegation', False)
577         allow_rbcd = kdc_dict.pop('allow_rbcd', False)
578         self.assertFalse(allow_delegation and allow_rbcd)
579
580         if allow_rbcd:
581             service1_creds = self.get_cached_creds(
582                 account_type=self.AccountType.COMPUTER,
583                 opts=service1_opts)
584
585             self.assertNotIn('delegation_from_dn', service2_opts)
586             service2_opts['delegation_from_dn'] = str(service1_creds.get_dn())
587
588             service2_creds = self.get_cached_creds(
589                 account_type=self.AccountType.COMPUTER,
590                 opts=service2_opts)
591         else:
592             service2_creds = self.get_cached_creds(
593                 account_type=self.AccountType.COMPUTER,
594                 opts=service2_opts)
595
596             if allow_delegation:
597                 self.assertNotIn('delegation_to_spn', service1_opts)
598                 service1_opts['delegation_to_spn'] = service2_creds.get_spn()
599
600             service1_creds = self.get_cached_creds(
601                 account_type=self.AccountType.COMPUTER,
602                 opts=service1_opts)
603
604         service1_tgt = self.get_tgt(service1_creds)
605
606         client_username = client_creds.get_username()
607         client_realm = client_creds.get_realm()
608         client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
609                                                  names=[client_username])
610
611         service1_name = service1_creds.get_username()[:-1]
612         service1_realm = service1_creds.get_realm()
613         service1_service = 'host'
614         service1_sname = self.PrincipalName_create(
615             name_type=NT_PRINCIPAL, names=[service1_service,
616                                            service1_name])
617         service1_decryption_key = self.TicketDecryptionKey_from_creds(
618             service1_creds)
619
620         expect_pac = kdc_dict.pop('expect_pac', True)
621
622         expected_groups = kdc_dict.pop('expected_groups', None)
623         unexpected_groups = kdc_dict.pop('unexpected_groups', None)
624
625         client_tkt_options = kdc_dict.pop('client_tkt_options', 'forwardable')
626         expected_flags = krb5_asn1.TicketFlags(client_tkt_options)
627
628         etypes = kdc_dict.pop('etypes', (AES256_CTS_HMAC_SHA1_96,
629                                          ARCFOUR_HMAC_MD5))
630
631         if s4u2self:
632             def generate_s4u2self_padata(_kdc_exchange_dict,
633                                          _callback_dict,
634                                          req_body):
635                 pa_s4u = self.PA_S4U2Self_create(
636                     name=client_cname,
637                     realm=client_realm,
638                     tgt_session_key=service1_tgt.session_key,
639                     ctype=None)
640
641                 return [pa_s4u], req_body
642
643             s4u2self_expected_flags = krb5_asn1.TicketFlags('forwardable')
644             s4u2self_unexpected_flags = krb5_asn1.TicketFlags('0')
645
646             s4u2self_kdc_options = krb5_asn1.KDCOptions('forwardable')
647
648             s4u2self_authenticator_subkey = self.RandomKey(Enctype.AES256)
649             s4u2self_kdc_exchange_dict = self.tgs_exchange_dict(
650                 expected_crealm=client_realm,
651                 expected_cname=client_cname,
652                 expected_srealm=service1_realm,
653                 expected_sname=service1_sname,
654                 expected_account_name=client_username,
655                 expected_groups=expected_groups,
656                 unexpected_groups=unexpected_groups,
657                 expected_sid=sid,
658                 expected_flags=s4u2self_expected_flags,
659                 unexpected_flags=s4u2self_unexpected_flags,
660                 ticket_decryption_key=service1_decryption_key,
661                 generate_padata_fn=generate_s4u2self_padata,
662                 check_rep_fn=self.generic_check_kdc_rep,
663                 check_kdc_private_fn=self.generic_check_kdc_private,
664                 tgt=service1_tgt,
665                 authenticator_subkey=s4u2self_authenticator_subkey,
666                 kdc_options=str(s4u2self_kdc_options),
667                 expect_client_claims=False,
668                 expect_edata=False)
669
670             self._generic_kdc_exchange(s4u2self_kdc_exchange_dict,
671                                        cname=None,
672                                        realm=service1_realm,
673                                        sname=service1_sname,
674                                        etypes=etypes)
675
676             client_service_tkt = s4u2self_kdc_exchange_dict['rep_ticket_creds']
677         else:
678             client_tgt = self.get_tgt(client_creds,
679                                       kdc_options=client_tkt_options,
680                                       expected_flags=expected_flags)
681             client_service_tkt = self.get_service_ticket(
682                 client_tgt,
683                 service1_creds,
684                 kdc_options=client_tkt_options,
685                 expected_flags=expected_flags)
686
687         modify_client_tkt_fn = kdc_dict.pop('modify_client_tkt_fn', None)
688         if modify_client_tkt_fn is not None:
689             client_service_tkt = modify_client_tkt_fn(client_service_tkt)
690
691         additional_tickets = [client_service_tkt.ticket]
692
693         modify_service_tgt_fn = kdc_dict.pop('modify_service_tgt_fn', None)
694         if modify_service_tgt_fn is not None:
695             service1_tgt = modify_service_tgt_fn(service1_tgt)
696
697         kdc_options = kdc_dict.pop('kdc_options', None)
698         if kdc_options is None:
699             kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt'))
700
701         service2_name = service2_creds.get_username()[:-1]
702         service2_realm = service2_creds.get_realm()
703         service2_service = 'host'
704         service2_sname = self.PrincipalName_create(
705             name_type=NT_PRINCIPAL, names=[service2_service,
706                                            service2_name])
707         service2_decryption_key = self.TicketDecryptionKey_from_creds(
708             service2_creds)
709         service2_etypes = service2_creds.tgs_supported_enctypes
710
711         expected_error_mode = kdc_dict.pop('expected_error_mode')
712         expected_status = kdc_dict.pop('expected_status', None)
713         if expected_error_mode:
714             check_error_fn = self.generic_check_kdc_error
715             check_rep_fn = None
716         else:
717             check_error_fn = None
718             check_rep_fn = self.generic_check_kdc_rep
719
720             self.assertIsNone(expected_status)
721
722         expect_edata = kdc_dict.pop('expect_edata', None)
723         if expect_edata is not None:
724             self.assertTrue(expected_error_mode)
725
726         pac_options = kdc_dict.pop('pac_options', None)
727
728         authenticator_subkey = self.RandomKey(Enctype.AES256)
729
730         expected_proxy_target = service2_creds.get_spn()
731
732         expected_transited_services = kdc_dict.pop(
733             'expected_transited_services', [])
734
735         transited_service = f'host/{service1_name}@{service1_realm}'
736         expected_transited_services.append(transited_service)
737
738         kdc_exchange_dict = self.tgs_exchange_dict(
739             expected_crealm=client_realm,
740             expected_cname=client_cname,
741             expected_srealm=service2_realm,
742             expected_sname=service2_sname,
743             expected_account_name=client_username,
744             expected_groups=expected_groups,
745             unexpected_groups=unexpected_groups,
746             expected_sid=sid,
747             expected_supported_etypes=service2_etypes,
748             ticket_decryption_key=service2_decryption_key,
749             check_error_fn=check_error_fn,
750             check_rep_fn=check_rep_fn,
751             check_kdc_private_fn=self.generic_check_kdc_private,
752             expected_error_mode=expected_error_mode,
753             expected_status=expected_status,
754             callback_dict={},
755             tgt=service1_tgt,
756             authenticator_subkey=authenticator_subkey,
757             kdc_options=kdc_options,
758             pac_options=pac_options,
759             expect_edata=expect_edata,
760             expected_proxy_target=expected_proxy_target,
761             expected_transited_services=expected_transited_services,
762             expect_pac=expect_pac)
763
764         self._generic_kdc_exchange(kdc_exchange_dict,
765                                    cname=None,
766                                    realm=service2_realm,
767                                    sname=service2_sname,
768                                    etypes=etypes,
769                                    additional_tickets=additional_tickets)
770
771         if not expected_error_mode:
772             # Check whether the ticket contains a PAC.
773             ticket = kdc_exchange_dict['rep_ticket_creds']
774             pac = self.get_ticket_pac(ticket, expect_pac=expect_pac)
775             if expect_pac:
776                 self.assertIsNotNone(pac)
777             else:
778                 self.assertIsNone(pac)
779
780         # Ensure we used all the parameters given to us.
781         self.assertEqual({}, kdc_dict)
782
783     def skip_unless_fl2008(self):
784         samdb = self.get_samdb()
785         functional_level = self.get_domain_functional_level(samdb)
786
787         if functional_level < dsdb.DS_DOMAIN_FUNCTION_2008:
788             self.skipTest('RBCD requires FL2008')
789
790     def test_constrained_delegation(self):
791         # Test constrained delegation.
792         self._run_delegation_test(
793             {
794                 'expected_error_mode': 0,
795                 'allow_delegation': True
796             })
797
798     def test_constrained_delegation_authentication_asserted_identity(self):
799         # Test constrained delegation and check asserted identity is the
800         # authenticaten authority. Note that we should always find this
801         # SID for all the requests. Just S4U2Self will have a different SID.
802         self._run_delegation_test(
803             {
804                 'expected_error_mode': 0,
805                 'allow_delegation': True,
806                 'expected_groups': {
807                     (security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY,
808                      SidType.EXTRA_SID,
809                      self.default_attrs),
810                     ...
811                 },
812                 'unexpected_groups': {
813                     security.SID_SERVICE_ASSERTED_IDENTITY,
814                 },
815             })
816
817     def test_constrained_delegation_service_asserted_identity(self):
818         # Test constrained delegation and check asserted identity is the
819         # service sid is there. This is a S4U2Proxy + S4U2Self test.
820         self._run_delegation_test(
821             {
822                 'expected_error_mode': 0,
823                 'allow_delegation': True,
824                 's4u2self': True,
825                 'service1_opts': {
826                     'trusted_to_auth_for_delegation': True,
827                 },
828                 'expected_groups': {
829                     (security.SID_SERVICE_ASSERTED_IDENTITY,
830                      SidType.EXTRA_SID,
831                      self.default_attrs),
832                     ...
833                 },
834                 'unexpected_groups': {
835                     security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY,
836                 },
837             })
838
839     def test_constrained_delegation_no_auth_data_required(self):
840         # Test constrained delegation.
841         self._run_delegation_test(
842             {
843                 'expected_error_mode': 0,
844                 'allow_delegation': True,
845                 'service2_opts': {
846                     'no_auth_data_required': True
847                 },
848                 'expect_pac': False
849             })
850
851     def test_constrained_delegation_existing_delegation_info(self):
852         # Test constrained delegation with an existing S4U_DELEGATION_INFO
853         # structure in the PAC.
854
855         services = ['service1', 'service2', 'service3']
856
857         self._run_delegation_test(
858             {
859                 'expected_error_mode': 0,
860                 'allow_delegation': True,
861                 'modify_client_tkt_fn': functools.partial(
862                     self.add_delegation_info, services=services),
863                 'expected_transited_services': services
864             })
865
866     def test_constrained_delegation_not_allowed(self):
867         # Test constrained delegation when the delegating service does not
868         # allow it.
869         self._run_delegation_test(
870             {
871                 'expected_error_mode': KDC_ERR_BADOPTION,
872                 'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
873                 'allow_delegation': False
874             })
875
876     def test_constrained_delegation_no_client_pac(self):
877         # Test constrained delegation when the client service ticket does not
878         # contain a PAC.
879         self._run_delegation_test(
880             {
881                 'expected_error_mode': (KDC_ERR_MODIFIED,
882                                         KDC_ERR_TGT_REVOKED),
883                 'allow_delegation': True,
884                 'modify_client_tkt_fn': self.remove_ticket_pac,
885                 'expect_edata': False
886             })
887
888     def test_constrained_delegation_no_service_pac(self):
889         # Test constrained delegation when the service TGT does not contain a
890         # PAC.
891         self._run_delegation_test(
892             {
893                 'expected_error_mode': KDC_ERR_TGT_REVOKED,
894                 'allow_delegation': True,
895                 'modify_service_tgt_fn': self.remove_ticket_pac,
896                 'expect_edata': False
897             })
898
899     def test_constrained_delegation_no_client_pac_no_auth_data_required(self):
900         # Test constrained delegation when the client service ticket does not
901         # contain a PAC.
902         self._run_delegation_test(
903             {
904                 'expected_error_mode': (KDC_ERR_MODIFIED,
905                                         KDC_ERR_BADOPTION),
906                 'allow_delegation': True,
907                 'modify_client_tkt_fn': self.remove_ticket_pac,
908                 'expect_edata': False,
909                 'service2_opts': {
910                     'no_auth_data_required': True
911                 }
912             })
913
914     def test_constrained_delegation_no_service_pac_no_auth_data_required(self):
915         # Test constrained delegation when the service TGT does not contain a
916         # PAC.
917         self._run_delegation_test(
918             {
919                 'expected_error_mode': KDC_ERR_TGT_REVOKED,
920                 'allow_delegation': True,
921                 'modify_service_tgt_fn': self.remove_ticket_pac,
922                 'service2_opts': {
923                     'no_auth_data_required': True
924                 },
925                 'expect_pac': False,
926                 'expect_edata': False
927             })
928
929     def test_constrained_delegation_non_forwardable(self):
930         # Test constrained delegation with a non-forwardable ticket.
931         self._run_delegation_test(
932             {
933                 'expected_error_mode': KDC_ERR_BADOPTION,
934                 'expected_status': ntstatus.NT_STATUS_ACCOUNT_RESTRICTION,
935                 'allow_delegation': True,
936                 'modify_client_tkt_fn': functools.partial(
937                     self.set_ticket_forwardable, flag=False)
938             })
939
940     def test_constrained_delegation_pac_options_rbcd(self):
941         # Test constrained delegation, but with the RBCD bit set in the PAC
942         # options.
943         self._run_delegation_test(
944             {
945                 'expected_error_mode': 0,
946                 'pac_options': '0001',  # supports RBCD
947                 'allow_delegation': True
948             })
949
950     def test_rbcd_no_auth_data_required(self):
951         self.skip_unless_fl2008()
952
953         self._run_delegation_test(
954             {
955                 'expected_error_mode': 0,
956                 'allow_rbcd': True,
957                 'pac_options': '0001',  # supports RBCD
958                 'service2_opts': {
959                     'no_auth_data_required': True
960                 },
961                 'expect_pac': False
962             })
963
964     def test_rbcd_existing_delegation_info(self):
965         self.skip_unless_fl2008()
966
967         # Test constrained delegation with an existing S4U_DELEGATION_INFO
968         # structure in the PAC.
969
970         services = ['service1', 'service2', 'service3']
971
972         self._run_delegation_test(
973             {
974                 'expected_error_mode': 0,
975                 'allow_rbcd': True,
976                 'pac_options': '0001',  # supports RBCD
977                 'modify_client_tkt_fn': functools.partial(
978                     self.add_delegation_info, services=services),
979                 'expected_transited_services': services
980             })
981
982     def test_rbcd_not_allowed(self):
983         # Test resource-based constrained delegation when the target service
984         # does not allow it.
985         self._run_delegation_test(
986             {
987                 'expected_error_mode': KDC_ERR_BADOPTION,
988                 'expected_status': ntstatus.NT_STATUS_NOT_FOUND,
989                 'allow_rbcd': False,
990                 'pac_options': '0001'  # supports RBCD
991             })
992
993     def test_rbcd_no_client_pac_a(self):
994         self.skip_unless_fl2008()
995
996         # Test constrained delegation when the client service ticket does not
997         # contain a PAC, and an empty msDS-AllowedToDelegateTo attribute.
998         self._run_delegation_test(
999             {
1000                 'expected_error_mode': KDC_ERR_MODIFIED,
1001                 'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
1002                 'allow_rbcd': True,
1003                 'pac_options': '0001',  # supports RBCD
1004                 'modify_client_tkt_fn': self.remove_ticket_pac
1005             })
1006
1007     def test_rbcd_no_client_pac_b(self):
1008         self.skip_unless_fl2008()
1009
1010         # Test constrained delegation when the client service ticket does not
1011         # contain a PAC, and a non-empty msDS-AllowedToDelegateTo attribute.
1012         self._run_delegation_test(
1013             {
1014                 'expected_error_mode': KDC_ERR_MODIFIED,
1015                 'expected_status': ntstatus.NT_STATUS_NO_MATCH,
1016                 'allow_rbcd': True,
1017                 'pac_options': '0001',  # supports RBCD
1018                 'modify_client_tkt_fn': self.remove_ticket_pac,
1019                 'service1_opts': {
1020                     'delegation_to_spn': ('host/test')
1021                 }
1022             })
1023
1024     def test_rbcd_no_service_pac(self):
1025         self.skip_unless_fl2008()
1026
1027         # Test constrained delegation when the service TGT does not contain a
1028         # PAC.
1029         self._run_delegation_test(
1030             {
1031                 'expected_error_mode': KDC_ERR_TGT_REVOKED,
1032                 'allow_rbcd': True,
1033                 'pac_options': '0001',  # supports RBCD
1034                 'modify_service_tgt_fn': self.remove_ticket_pac,
1035                 'expect_edata': False
1036             })
1037
1038     def test_rbcd_no_client_pac_no_auth_data_required_a(self):
1039         self.skip_unless_fl2008()
1040
1041         # Test constrained delegation when the client service ticket does not
1042         # contain a PAC, and an empty msDS-AllowedToDelegateTo attribute.
1043         self._run_delegation_test(
1044             {
1045                 'expected_error_mode': KDC_ERR_MODIFIED,
1046                 'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
1047                 'allow_rbcd': True,
1048                 'pac_options': '0001',  # supports RBCD
1049                 'modify_client_tkt_fn': self.remove_ticket_pac,
1050                 'service2_opts': {
1051                     'no_auth_data_required': True
1052                 }
1053             })
1054
1055     def test_rbcd_no_client_pac_no_auth_data_required_b(self):
1056         self.skip_unless_fl2008()
1057
1058         # Test constrained delegation when the client service ticket does not
1059         # contain a PAC, and a non-empty msDS-AllowedToDelegateTo attribute.
1060         self._run_delegation_test(
1061             {
1062                 'expected_error_mode': KDC_ERR_MODIFIED,
1063                 'expected_status': ntstatus.NT_STATUS_NO_MATCH,
1064                 'allow_rbcd': True,
1065                 'pac_options': '0001',  # supports RBCD
1066                 'modify_client_tkt_fn': self.remove_ticket_pac,
1067                 'service1_opts': {
1068                     'delegation_to_spn': ('host/test')
1069                 },
1070                 'service2_opts': {
1071                     'no_auth_data_required': True
1072                 }
1073             })
1074
1075     def test_rbcd_no_service_pac_no_auth_data_required(self):
1076         self.skip_unless_fl2008()
1077
1078         # Test constrained delegation when the service TGT does not contain a
1079         # PAC.
1080         self._run_delegation_test(
1081             {
1082                 'expected_error_mode': KDC_ERR_TGT_REVOKED,
1083                 'allow_rbcd': True,
1084                 'pac_options': '0001',  # supports RBCD
1085                 'modify_service_tgt_fn': self.remove_ticket_pac,
1086                 'service2_opts': {
1087                     'no_auth_data_required': True
1088                 },
1089                 'expect_edata': False
1090             })
1091
1092     def test_rbcd_non_forwardable(self):
1093         self.skip_unless_fl2008()
1094
1095         # Test resource-based constrained delegation with a non-forwardable
1096         # ticket.
1097         self._run_delegation_test(
1098             {
1099                 'expected_error_mode': KDC_ERR_BADOPTION,
1100                 'expected_status': ntstatus.NT_STATUS_ACCOUNT_RESTRICTION,
1101                 'allow_rbcd': True,
1102                 'pac_options': '0001',  # supports RBCD
1103                 'modify_client_tkt_fn': functools.partial(
1104                     self.set_ticket_forwardable, flag=False)
1105             })
1106
1107     def test_rbcd_no_pac_options_a(self):
1108         self.skip_unless_fl2008()
1109
1110         # Test resource-based constrained delegation without the RBCD bit set
1111         # in the PAC options, and an empty msDS-AllowedToDelegateTo attribute.
1112         self._run_delegation_test(
1113             {
1114                 'expected_error_mode': KDC_ERR_BADOPTION,
1115                 'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
1116                 'allow_rbcd': True,
1117                 'pac_options': '1'  # does not support RBCD
1118             })
1119
1120     def test_rbcd_no_pac_options_b(self):
1121         self.skip_unless_fl2008()
1122
1123         # Test resource-based constrained delegation without the RBCD bit set
1124         # in the PAC options, and a non-empty msDS-AllowedToDelegateTo
1125         # attribute.
1126         self._run_delegation_test(
1127             {
1128                 'expected_error_mode': KDC_ERR_BADOPTION,
1129                 'expected_status': ntstatus.NT_STATUS_NO_MATCH,
1130                 'allow_rbcd': True,
1131                 'pac_options': '1',  # does not support RBCD
1132                 'service1_opts': {
1133                     'delegation_to_spn': ('host/test')
1134                 }
1135             })
1136
1137     def test_bronze_bit_constrained_delegation_old_checksum(self):
1138         # Attempt to modify the ticket without updating the PAC checksums.
1139         self._run_delegation_test(
1140             {
1141                 'expected_error_mode': (KDC_ERR_MODIFIED,
1142                                         KDC_ERR_BAD_INTEGRITY),
1143                 'allow_delegation': True,
1144                 'client_tkt_options': '0',  # non-forwardable ticket
1145                 'modify_client_tkt_fn': functools.partial(
1146                     self.set_ticket_forwardable,
1147                     flag=True, update_pac_checksums=False),
1148                 'expect_edata': False
1149             })
1150
1151     def test_bronze_bit_rbcd_old_checksum(self):
1152         self.skip_unless_fl2008()
1153
1154         # Attempt to modify the ticket without updating the PAC checksums.
1155         self._run_delegation_test(
1156             {
1157                 'expected_error_mode': (KDC_ERR_MODIFIED,
1158                                         KDC_ERR_BAD_INTEGRITY),
1159                 'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
1160                 'allow_rbcd': True,
1161                 'pac_options': '0001',  # supports RBCD
1162                 'client_tkt_options': '0',  # non-forwardable ticket
1163                 'modify_client_tkt_fn': functools.partial(
1164                     self.set_ticket_forwardable,
1165                     flag=True, update_pac_checksums=False)
1166             })
1167
1168     def test_constrained_delegation_missing_client_checksum(self):
1169         # Present a user ticket without the required checksums.
1170         for checksum in self.pac_checksum_types:
1171             with self.subTest(checksum=checksum):
1172                 if checksum == krb5pac.PAC_TYPE_TICKET_CHECKSUM:
1173                     expected_error_mode = (KDC_ERR_MODIFIED,
1174                                            KDC_ERR_BADOPTION)
1175                 else:
1176                     expected_error_mode = KDC_ERR_GENERIC
1177
1178                 self._run_delegation_test(
1179                     {
1180                         'expected_error_mode': expected_error_mode,
1181                         'allow_delegation': True,
1182                         'modify_client_tkt_fn': functools.partial(
1183                             self.remove_pac_checksum, checksum=checksum),
1184                         'expect_edata': False
1185                     })
1186
1187     def test_constrained_delegation_missing_service_checksum(self):
1188         # Present the service's ticket without the required checksums.
1189         for checksum in (krb5pac.PAC_TYPE_SRV_CHECKSUM,
1190                          krb5pac.PAC_TYPE_KDC_CHECKSUM):
1191             with self.subTest(checksum=checksum):
1192                 self._run_delegation_test(
1193                     {
1194                         'expected_error_mode': KDC_ERR_GENERIC,
1195                         'expected_status':
1196                             ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES,
1197                         'allow_delegation': True,
1198                         'modify_service_tgt_fn': functools.partial(
1199                             self.remove_pac_checksum, checksum=checksum)
1200                     })
1201
1202     def test_rbcd_missing_client_checksum(self):
1203         self.skip_unless_fl2008()
1204
1205         # Present a user ticket without the required checksums.
1206         for checksum in self.pac_checksum_types:
1207             with self.subTest(checksum=checksum):
1208                 if checksum == krb5pac.PAC_TYPE_TICKET_CHECKSUM:
1209                     expected_error_mode = KDC_ERR_MODIFIED
1210                 else:
1211                     expected_error_mode = KDC_ERR_GENERIC
1212
1213                 self._run_delegation_test(
1214                     {
1215                         'expected_error_mode': expected_error_mode,
1216                         'expected_status':
1217                             ntstatus.NT_STATUS_NOT_SUPPORTED,
1218                         'allow_rbcd': True,
1219                         'pac_options': '0001',  # supports RBCD
1220                         'modify_client_tkt_fn': functools.partial(
1221                             self.remove_pac_checksum, checksum=checksum)
1222                     })
1223
1224     def test_rbcd_missing_service_checksum(self):
1225         self.skip_unless_fl2008()
1226
1227         # Present the service's ticket without the required checksums.
1228         for checksum in (krb5pac.PAC_TYPE_SRV_CHECKSUM,
1229                          krb5pac.PAC_TYPE_KDC_CHECKSUM):
1230             with self.subTest(checksum=checksum):
1231                 self._run_delegation_test(
1232                     {
1233                         'expected_error_mode': KDC_ERR_GENERIC,
1234                         'expected_status':
1235                             ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES,
1236                         'allow_rbcd': True,
1237                         'pac_options': '0001',  # supports RBCD
1238                         'modify_service_tgt_fn': functools.partial(
1239                             self.remove_pac_checksum, checksum=checksum)
1240                     })
1241
1242     def test_constrained_delegation_zeroed_client_checksum(self):
1243         # Present a user ticket with invalid checksums.
1244         for checksum in self.pac_checksum_types:
1245             with self.subTest(checksum=checksum):
1246                 self._run_delegation_test(
1247                     {
1248                         'expected_error_mode': (KDC_ERR_MODIFIED,
1249                                                 KDC_ERR_BAD_INTEGRITY),
1250                         'allow_delegation': True,
1251                         'modify_client_tkt_fn': functools.partial(
1252                             self.zeroed_pac_checksum, checksum=checksum),
1253                         'expect_edata': False
1254                     })
1255
1256     def test_constrained_delegation_zeroed_service_checksum(self):
1257         # Present the service's ticket with invalid checksums.
1258         for checksum in self.pac_checksum_types:
1259             with self.subTest(checksum=checksum):
1260                 if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
1261                     expected_error_mode = (KDC_ERR_MODIFIED,
1262                                            KDC_ERR_BAD_INTEGRITY)
1263                     expected_status = ntstatus.NT_STATUS_WRONG_PASSWORD
1264                 else:
1265                     expected_error_mode = 0
1266                     expected_status = None
1267
1268                 self._run_delegation_test(
1269                     {
1270                         'expected_error_mode': expected_error_mode,
1271                         'expected_status': expected_status,
1272                         'allow_delegation': True,
1273                         'modify_service_tgt_fn': functools.partial(
1274                             self.zeroed_pac_checksum, checksum=checksum)
1275                     })
1276
1277     def test_rbcd_zeroed_client_checksum(self):
1278         self.skip_unless_fl2008()
1279
1280         # Present a user ticket with invalid checksums.
1281         for checksum in self.pac_checksum_types:
1282             with self.subTest(checksum=checksum):
1283                 self._run_delegation_test(
1284                     {
1285                         'expected_error_mode': KDC_ERR_MODIFIED,
1286                         'expected_status':
1287                             ntstatus.NT_STATUS_NOT_SUPPORTED,
1288                         'allow_rbcd': True,
1289                         'pac_options': '0001',  # supports RBCD
1290                         'modify_client_tkt_fn': functools.partial(
1291                             self.zeroed_pac_checksum, checksum=checksum)
1292                     })
1293
1294     def test_rbcd_zeroed_service_checksum(self):
1295         self.skip_unless_fl2008()
1296
1297         # Present the service's ticket with invalid checksums.
1298         for checksum in self.pac_checksum_types:
1299             with self.subTest(checksum=checksum):
1300                 if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
1301                     expected_error_mode = KDC_ERR_MODIFIED
1302                     expected_status = ntstatus.NT_STATUS_WRONG_PASSWORD
1303                 else:
1304                     expected_error_mode = 0
1305                     expected_status = None
1306
1307                 self._run_delegation_test(
1308                     {
1309                         'expected_error_mode': expected_error_mode,
1310                         'expected_status': expected_status,
1311                         'allow_rbcd': True,
1312                         'pac_options': '0001',  # supports RBCD
1313                         'modify_service_tgt_fn': functools.partial(
1314                             self.zeroed_pac_checksum, checksum=checksum)
1315                     })
1316
1317     unkeyed_ctypes = {Cksumtype.MD5, Cksumtype.SHA1, Cksumtype.CRC32}
1318
1319     def test_constrained_delegation_unkeyed_client_checksum(self):
1320         # Present a user ticket with invalid checksums.
1321         for checksum in self.pac_checksum_types:
1322             for ctype in self.unkeyed_ctypes:
1323                 with self.subTest(checksum=checksum, ctype=ctype):
1324                     if (checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM
1325                             and ctype == Cksumtype.SHA1):
1326                         expected_error_mode = (KDC_ERR_SUMTYPE_NOSUPP,
1327                                                KDC_ERR_INAPP_CKSUM)
1328                     else:
1329                         expected_error_mode = (KDC_ERR_GENERIC,
1330                                                KDC_ERR_INAPP_CKSUM)
1331
1332                     self._run_delegation_test(
1333                         {
1334                             'expected_error_mode': expected_error_mode,
1335                             'allow_delegation': True,
1336                             'modify_client_tkt_fn': functools.partial(
1337                                 self.unkeyed_pac_checksum,
1338                                 checksum=checksum, ctype=ctype),
1339                             'expect_edata': False
1340                         })
1341
1342     def test_constrained_delegation_unkeyed_service_checksum(self):
1343         # Present the service's ticket with invalid checksums.
1344         for checksum in self.pac_checksum_types:
1345             for ctype in self.unkeyed_ctypes:
1346                 with self.subTest(checksum=checksum, ctype=ctype):
1347                     if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
1348                         if ctype == Cksumtype.SHA1:
1349                             expected_error_mode = (KDC_ERR_SUMTYPE_NOSUPP,
1350                                                    KDC_ERR_INAPP_CKSUM)
1351                             expected_status = ntstatus.NT_STATUS_LOGON_FAILURE
1352                         else:
1353                             expected_error_mode = (KDC_ERR_GENERIC,
1354                                                    KDC_ERR_INAPP_CKSUM)
1355                             expected_status = (
1356                                 ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES)
1357                     else:
1358                         expected_error_mode = 0
1359                         expected_status = None
1360
1361                     self._run_delegation_test(
1362                         {
1363                             'expected_error_mode': expected_error_mode,
1364                             'expected_status': expected_status,
1365                             'allow_delegation': True,
1366                             'modify_service_tgt_fn': functools.partial(
1367                                 self.unkeyed_pac_checksum,
1368                                 checksum=checksum, ctype=ctype)
1369                         })
1370
1371     def test_rbcd_unkeyed_client_checksum(self):
1372         self.skip_unless_fl2008()
1373
1374         # Present a user ticket with invalid checksums.
1375         for checksum in self.pac_checksum_types:
1376             for ctype in self.unkeyed_ctypes:
1377                 with self.subTest(checksum=checksum, ctype=ctype):
1378                     if (checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM
1379                             and ctype == Cksumtype.SHA1):
1380                         expected_error_mode = KDC_ERR_SUMTYPE_NOSUPP
1381                     else:
1382                         expected_error_mode = KDC_ERR_GENERIC
1383
1384                     self._run_delegation_test(
1385                         {
1386                             'expected_error_mode': expected_error_mode,
1387                             'expected_status':
1388                                 ntstatus.NT_STATUS_NOT_SUPPORTED,
1389                             'allow_rbcd': True,
1390                             'pac_options': '0001',  # supports RBCD
1391                             'modify_client_tkt_fn': functools.partial(
1392                                 self.unkeyed_pac_checksum,
1393                                 checksum=checksum, ctype=ctype)
1394                         })
1395
1396     def test_rbcd_unkeyed_service_checksum(self):
1397         self.skip_unless_fl2008()
1398
1399         # Present the service's ticket with invalid checksums.
1400         for checksum in self.pac_checksum_types:
1401             for ctype in self.unkeyed_ctypes:
1402                 with self.subTest(checksum=checksum, ctype=ctype):
1403                     if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
1404                         if ctype == Cksumtype.SHA1:
1405                             expected_error_mode = KDC_ERR_SUMTYPE_NOSUPP
1406                             expected_status = ntstatus.NT_STATUS_LOGON_FAILURE
1407                         else:
1408                             expected_error_mode = KDC_ERR_GENERIC
1409                             expected_status = (
1410                                 ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES)
1411                     else:
1412                         expected_error_mode = 0
1413                         expected_status = None
1414
1415                     self._run_delegation_test(
1416                         {
1417                             'expected_error_mode': expected_error_mode,
1418                             'expected_status': expected_status,
1419                             'allow_rbcd': True,
1420                             'pac_options': '0001',  # supports RBCD
1421                             'modify_service_tgt_fn': functools.partial(
1422                                 self.unkeyed_pac_checksum,
1423                                 checksum=checksum, ctype=ctype)
1424                         })
1425
1426     def test_constrained_delegation_rc4_client_checksum(self):
1427         # Present a user ticket with RC4 checksums.
1428         samdb = self.get_samdb()
1429         functional_level = self.get_domain_functional_level(samdb)
1430
1431         if functional_level >= dsdb.DS_DOMAIN_FUNCTION_2008:
1432             expected_error_mode = (KDC_ERR_GENERIC,
1433                                    KDC_ERR_INAPP_CKSUM)
1434             expect_edata = False
1435         else:
1436             expected_error_mode = 0
1437             expect_edata = None
1438
1439         self._run_delegation_test(
1440             {
1441                 'expected_error_mode': expected_error_mode,
1442                 'allow_delegation': True,
1443                 'modify_client_tkt_fn': self.rc4_pac_checksums,
1444                 'expect_edata': expect_edata,
1445             })
1446
1447     def test_rbcd_rc4_client_checksum(self):
1448         self.skip_unless_fl2008()
1449
1450         # Present a user ticket with RC4 checksums.
1451         expected_error_mode = (KDC_ERR_GENERIC,
1452                                KDC_ERR_BADOPTION)
1453
1454         self._run_delegation_test(
1455             {
1456                 'expected_error_mode': expected_error_mode,
1457                 'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
1458                 'allow_rbcd': True,
1459                 'pac_options': '0001',  # supports RBCD
1460                 'modify_client_tkt_fn': self.rc4_pac_checksums,
1461             })
1462
1463     def remove_pac_checksum(self, ticket, checksum):
1464         checksum_keys = self.get_krbtgt_checksum_key()
1465
1466         return self.modified_ticket(ticket,
1467                                     checksum_keys=checksum_keys,
1468                                     include_checksums={checksum: False})
1469
1470     def zeroed_pac_checksum(self, ticket, checksum):
1471         krbtgt_creds = self.get_krbtgt_creds()
1472         krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
1473
1474         server_key = ticket.decryption_key
1475
1476         checksum_keys = {
1477             krb5pac.PAC_TYPE_SRV_CHECKSUM: server_key,
1478             krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key,
1479             krb5pac.PAC_TYPE_TICKET_CHECKSUM: krbtgt_key,
1480         }
1481
1482         if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
1483             zeroed_key = server_key
1484         else:
1485             zeroed_key = krbtgt_key
1486
1487         checksum_keys[checksum] = ZeroedChecksumKey(zeroed_key.key,
1488                                                     zeroed_key.kvno)
1489
1490         return self.modified_ticket(ticket,
1491                                     checksum_keys=checksum_keys,
1492                                     include_checksums={checksum: True})
1493
1494     def unkeyed_pac_checksum(self, ticket, checksum, ctype):
1495         krbtgt_creds = self.get_krbtgt_creds()
1496         krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
1497
1498         server_key = ticket.decryption_key
1499
1500         checksum_keys = {
1501             krb5pac.PAC_TYPE_SRV_CHECKSUM: server_key,
1502             krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key,
1503             krb5pac.PAC_TYPE_TICKET_CHECKSUM: krbtgt_key,
1504             krb5pac.PAC_TYPE_FULL_CHECKSUM: krbtgt_key,
1505         }
1506
1507         # Make a copy of the existing key and change the ctype.
1508         key = checksum_keys[checksum]
1509         new_key = RodcPacEncryptionKey(key.key, key.kvno)
1510         new_key.ctype = ctype
1511         checksum_keys[checksum] = new_key
1512
1513         return self.modified_ticket(ticket,
1514                                     checksum_keys=checksum_keys,
1515                                     include_checksums={checksum: True})
1516
1517     def rc4_pac_checksums(self, ticket):
1518         krbtgt_creds = self.get_krbtgt_creds()
1519         rc4_krbtgt_key = self.TicketDecryptionKey_from_creds(
1520             krbtgt_creds, etype=Enctype.RC4)
1521
1522         server_key = ticket.decryption_key
1523
1524         checksum_keys = {
1525             krb5pac.PAC_TYPE_SRV_CHECKSUM: server_key,
1526             krb5pac.PAC_TYPE_KDC_CHECKSUM: rc4_krbtgt_key,
1527             krb5pac.PAC_TYPE_TICKET_CHECKSUM: rc4_krbtgt_key,
1528             krb5pac.PAC_TYPE_FULL_CHECKSUM: rc4_krbtgt_key,
1529         }
1530
1531         include_checksums = {
1532             krb5pac.PAC_TYPE_SRV_CHECKSUM: True,
1533             krb5pac.PAC_TYPE_KDC_CHECKSUM: True,
1534             krb5pac.PAC_TYPE_TICKET_CHECKSUM: True,
1535             krb5pac.PAC_TYPE_FULL_CHECKSUM: True,
1536         }
1537
1538         return self.modified_ticket(ticket,
1539                                     checksum_keys=checksum_keys,
1540                                     include_checksums=include_checksums)
1541
1542     def add_delegation_info(self, ticket, services=None):
1543         def modify_pac_fn(pac):
1544             pac_buffers = pac.buffers
1545             self.assertNotIn(krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION,
1546                              (buffer.type for buffer in pac_buffers))
1547
1548             transited_services = list(map(lsa.String, services))
1549
1550             delegation = krb5pac.PAC_CONSTRAINED_DELEGATION()
1551             delegation.proxy_target = lsa.String('test_proxy_target')
1552             delegation.transited_services = transited_services
1553             delegation.num_transited_services = len(transited_services)
1554
1555             info = krb5pac.PAC_CONSTRAINED_DELEGATION_CTR()
1556             info.info = delegation
1557
1558             pac_buffer = krb5pac.PAC_BUFFER()
1559             pac_buffer.type = krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION
1560             pac_buffer.info = info
1561
1562             pac_buffers.append(pac_buffer)
1563
1564             pac.buffers = pac_buffers
1565             pac.num_buffers += 1
1566
1567             return pac
1568
1569         checksum_keys = self.get_krbtgt_checksum_key()
1570
1571         return self.modified_ticket(ticket,
1572                                     checksum_keys=checksum_keys,
1573                                     modify_pac_fn=modify_pac_fn)
1574
1575     def set_ticket_forwardable(self, ticket, flag, update_pac_checksums=True):
1576         modify_fn = functools.partial(self.modify_ticket_flag,
1577                                       flag='forwardable',
1578                                       value=flag)
1579
1580         if update_pac_checksums:
1581             checksum_keys = self.get_krbtgt_checksum_key()
1582         else:
1583             checksum_keys = None
1584
1585         return self.modified_ticket(ticket,
1586                                     modify_fn=modify_fn,
1587                                     checksum_keys=checksum_keys,
1588                                     update_pac_checksums=update_pac_checksums)
1589
1590     def remove_ticket_pac(self, ticket):
1591         return self.modified_ticket(ticket,
1592                                     exclude_pac=True)
1593
1594
1595 if __name__ == "__main__":
1596     global_asn1_print = False
1597     global_hexdump = False
1598     import unittest
1599     unittest.main()