0b351ae253bdc3511d84d8572bacf4c1b0c07dcf
[samba.git] / python / samba / tests / krb5 / conditional_ace_tests.py
1 #!/usr/bin/env python3
2 # Unix SMB/CIFS implementation.
3 # Copyright (C) Stefan Metzmacher 2020
4 # Copyright (C) Catalyst.Net Ltd 2023
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 #
19
20 import sys
21 import os
22
23 sys.path.insert(0, 'bin/python')
24 os.environ['PYTHONUNBUFFERED'] = '1'
25
26 from collections import OrderedDict
27 from functools import partial
28 import re
29 from string import Formatter
30
31 import ldb
32
33 from samba import dsdb, ntstatus
34 from samba.dcerpc import claims, krb5pac, security
35 from samba.ndr import ndr_pack, ndr_unpack
36
37 from samba.tests import DynamicTestCase, env_get_var_value
38 from samba.tests.krb5.authn_policy_tests import (
39     AuditEvent,
40     AuditReason,
41     AuthnPolicyBaseTests,
42 )
43 from samba.tests.krb5.raw_testcase import RawKerberosTest
44 from samba.tests.krb5.rfc4120_constants import (
45     KDC_ERR_BADOPTION,
46     KDC_ERR_GENERIC,
47     KDC_ERR_MODIFIED,
48     KDC_ERR_POLICY,
49     NT_PRINCIPAL,
50 )
51 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
52
53 SidType = RawKerberosTest.SidType
54
55 global_asn1_print = False
56 global_hexdump = False
57
58
59 # When used as a test outcome, indicates that the test can cause a Windows
60 # server to crash, and is to be run with caution.
61 CRASHES_WINDOWS = object()
62
63
64 class ConditionalAceBaseTests(AuthnPolicyBaseTests):
65     # Constants for group SID attributes.
66     default_attrs = security.SE_GROUP_DEFAULT_FLAGS
67     resource_attrs = default_attrs | security.SE_GROUP_RESOURCE
68
69     aa_asserted_identity = (
70         security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY)
71     service_asserted_identity = security.SID_SERVICE_ASSERTED_IDENTITY
72
73     @classmethod
74     def setUpClass(cls):
75         super().setUpClass()
76
77         cls._setup = False
78
79     def setUp(self):
80         super().setUp()
81         self.do_asn1_print = global_asn1_print
82         self.do_hexdump = global_hexdump
83
84         if not self._setup:
85             samdb = self.get_samdb()
86             cls = type(self)
87
88             # Create a machine account with which to perform FAST.
89             cls._mach_creds = self.get_cached_creds(
90                 account_type=self.AccountType.COMPUTER)
91
92             # Create some new groups.
93
94             group0_name = self.get_new_username()
95             group0_dn = self.create_group(samdb, group0_name)
96             cls._group0_sid = self.get_objectSid(samdb, group0_dn)
97
98             group1_name = self.get_new_username()
99             group1_dn = self.create_group(samdb, group1_name)
100             cls._group1_sid = self.get_objectSid(samdb, group1_dn)
101
102             # Create machine accounts with which to perform FAST that belong to
103             # various arrangements of the groups.
104
105             cls._member_of_both_creds = self.get_cached_creds(
106                 account_type=self.AccountType.COMPUTER,
107                 opts={'member_of': (group0_dn, group1_dn)})
108
109             cls._member_of_one_creds = self.get_cached_creds(
110                 account_type=self.AccountType.COMPUTER,
111                 opts={'member_of': (group1_dn,)})
112
113             # Create some authentication silos.
114             cls._unenforced_silo = self.create_authn_silo(enforced=False)
115             cls._enforced_silo = self.create_authn_silo(enforced=True)
116
117             # Create machine accounts with which to perform FAST that belong to
118             # the respective silos.
119
120             cls._member_of_unenforced_silo = self._get_creds(
121                 account_type=self.AccountType.COMPUTER,
122                 assigned_silo=self._unenforced_silo,
123                 cached=True)
124             self.add_to_group(str(self._member_of_unenforced_silo.get_dn()),
125                               self._unenforced_silo.dn,
126                               'msDS-AuthNPolicySiloMembers',
127                               expect_attr=False)
128
129             cls._member_of_enforced_silo = self._get_creds(
130                 account_type=self.AccountType.COMPUTER,
131                 assigned_silo=self._enforced_silo,
132                 cached=True)
133             self.add_to_group(str(self._member_of_enforced_silo.get_dn()),
134                               self._enforced_silo.dn,
135                               'msDS-AuthNPolicySiloMembers',
136                               expect_attr=False)
137
138             # Create a couple of multi‐valued string claims for testing claim
139             # value comparisons.
140
141             cls.claim0_attr = 'carLicense'
142             cls.claim0_id = self.get_new_username()
143             self.create_claim(cls.claim0_id,
144                               enabled=True,
145                               attribute=cls.claim0_attr,
146                               single_valued=False,
147                               source_type='AD',
148                               for_classes=['computer', 'user'],
149                               value_type=claims.CLAIM_TYPE_STRING)
150
151             cls.claim1_attr = 'departmentNumber'
152             cls.claim1_id = self.get_new_username()
153             self.create_claim(cls.claim1_id,
154                               enabled=True,
155                               attribute=cls.claim1_attr,
156                               single_valued=False,
157                               source_type='AD',
158                               for_classes=['computer', 'user'],
159                               value_type=claims.CLAIM_TYPE_STRING)
160
161             cls._setup = True
162
163     # For debugging purposes. Prints out the SDDL representation of
164     # authentication policy conditions set by the Windows GUI.
165     def _print_authn_policy_sddl(self, policy_id):
166         policy_dn = self.get_authn_policies_dn()
167         policy_dn.add_child(f'CN={policy_id}')
168
169         attrs = [
170             'msDS-ComputerAllowedToAuthenticateTo',
171             'msDS-ServiceAllowedToAuthenticateFrom',
172             'msDS-ServiceAllowedToAuthenticateTo',
173             'msDS-UserAllowedToAuthenticateFrom',
174             'msDS-UserAllowedToAuthenticateTo',
175         ]
176
177         samdb = self.get_samdb()
178         res = samdb.search(policy_dn, scope=ldb.SCOPE_BASE, attrs=attrs)
179         self.assertEqual(1, len(res),
180                          f'Authentication policy {policy_id} not found')
181         result = res[0]
182
183         def print_sddl(attr):
184             sd = result.get(attr, idx=0)
185             if sd is None:
186                 return
187
188             sec_desc = ndr_unpack(security.descriptor, sd)
189             print(f'{attr}: {sec_desc.as_sddl()}')
190
191         for attr in attrs:
192             print_sddl(attr)
193
194     def sddl_array_from_sids(self, sids):
195         def sddl_from_sid_entry(sid_entry):
196             sid, _, _ = sid_entry
197             return f'SID({sid})'
198
199         return f"{{{', '.join(map(sddl_from_sid_entry, sids))}}}"
200
201     def allow_if(self, condition):
202         return f'O:SYD:(XA;;CR;;;WD;({condition}))'
203
204     @staticmethod
205     def escaped_claim_id(claim_id):
206         escapes = '\x00\t\n\x0b\x0c\r !"%&()<=>|'
207         return ''.join(c
208                        if c not in escapes
209                        else f'%{ord(c):04x}'
210                        for c in claim_id)
211
212
213 @DynamicTestCase
214 class ConditionalAceTests(ConditionalAceBaseTests):
215     @classmethod
216     def setUpDynamicTestCases(cls):
217         FILTER = env_get_var_value('FILTER', allow_missing=True)
218
219         # These operators are arranged so that each operator precedes its own
220         # affixes.
221         op_names = OrderedDict([
222             ('!=', 'does not equal'),
223             ('!', 'not'),
224             ('&&', 'and'),
225             ('<=', 'is less than or equals'),
226             ('<', 'is less than'),
227             ('==', 'equals'),
228             ('>=', 'exceeds or equals'),
229             ('>', 'exceeds'),
230             ('Not_Any_of', 'matches none of'),
231             ('Any_of', 'matches any of'),
232             ('Not_Contains', 'does not contain'),
233             ('Contains', 'contains'),
234             ('Not_Member_of_Any', 'the user belongs to none of'),
235             ('Not_Device_Member_of_Any', 'the device belongs to none of'),  # TODO: no test for this yet
236             ('Device_Member_of_Any', 'the device belongs to any of'),  # TODO: no test for this yet
237             ('Not_Device_Member_of', 'the device does not belong to'),  # TODO: no test for this yet
238             ('Device_Member_of', 'the device belongs to'),
239             ('Not_Exists', 'there does not exist'),
240             ('Exists', 'there exists'),
241             ('Member_of_Any', 'the user belongs to any of'),
242             ('Not_Member_of', 'the user does not belong to'),
243             ('Member_of', 'the user belongs to'),
244             ('||', 'or'),
245         ])
246
247         # This is a safety measure to ensure correct ordering of op_names
248         keys = list(op_names.keys())
249         for i in range(len(keys)):
250             for j in range(i + 1, len(keys)):
251                 if keys[i] in keys[j]:
252                     raise AssertionError((keys[i], keys[j]))
253
254         for case in cls.pac_claim_cases:
255             if len(case) == 3:
256                 pac_claims, expression, outcome = case
257                 claim_map = None
258             elif len(case) == 4:
259                 pac_claims, expression, claim_map, outcome = case
260             else:
261                 raise AssertionError(
262                     f'found {len(case)} items in case, expected 3–4')
263
264             expression_name = expression
265             for op, op_name in op_names.items():
266                 expression_name = expression_name.replace(op, op_name)
267
268             name = f'{pac_claims}_{expression_name}'
269
270             if claim_map is not None:
271                 name += f'_{claim_map}'
272
273             name = re.sub(r'\W+', '_', name)
274             if len(name) > 150:
275                 name = f'{name[:125]}+{len(name) - 125}‐more'
276
277             if FILTER and not re.search(FILTER, name):
278                 continue
279
280             cls.generate_dynamic_test('test_pac_claim_cmp', name,
281                                       pac_claims, expression, claim_map,
282                                       outcome)
283
284         for case in cls.claim_against_claim_cases:
285             lhs, op, rhs, outcome = case
286             op_name = op_names[op]
287
288             name = f'{lhs}_{op_name}_{rhs}'
289
290             name = re.sub(r'\W+', '_', name)
291             if FILTER and not re.search(FILTER, name):
292                 continue
293
294             cls.generate_dynamic_test('test_cmp', name,
295                                       lhs, op, rhs, outcome)
296
297         for case in cls.claim_against_literal_cases:
298             lhs, op, rhs, outcome = case
299             op_name = op_names[op]
300
301             name = f'{lhs}_{op_name}_literal_{rhs}'
302
303             name = re.sub(r'\W+', '_', name)
304             if FILTER and not re.search(FILTER, name):
305                 continue
306
307             cls.generate_dynamic_test('test_cmp', name,
308                                       lhs, op, rhs, outcome, True)
309
310     def test_allowed_from_member_of_each(self):
311         # Create an authentication policy that allows accounts belonging to
312         # both groups.
313         policy = self.create_authn_policy(
314             enforced=True,
315             user_allowed_from=(
316                 f'O:SYD:(XA;;CR;;;WD;(Member_of '
317                 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'),
318         )
319
320         # Create a user account with the assigned policy.
321         client_creds = self._get_creds(account_type=self.AccountType.USER,
322                                        assigned_policy=policy)
323
324         # Show that we get a policy error if the machine account does not
325         # belong to both groups.
326         armor_tgt = self.get_tgt(self._member_of_one_creds)
327         self._get_tgt(client_creds, armor_tgt=armor_tgt,
328                       expected_error=KDC_ERR_POLICY)
329
330         # Otherwise, authentication should succeed.
331         armor_tgt = self.get_tgt(self._member_of_both_creds)
332         self._get_tgt(client_creds, armor_tgt=armor_tgt,
333                       expected_error=0)
334
335     def test_allowed_from_member_of_any(self):
336         # Create an authentication policy that allows accounts belonging to
337         # either group.
338         policy = self.create_authn_policy(
339             enforced=True,
340             user_allowed_from=(
341                 f'O:SYD:(XA;;CR;;;WD;(Member_of_Any '
342                 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'),
343         )
344
345         # Create a user account with the assigned policy.
346         client_creds = self._get_creds(account_type=self.AccountType.USER,
347                                        assigned_policy=policy)
348
349         # Show that we get a policy error if the machine account belongs to
350         # neither group.
351         armor_tgt = self.get_tgt(self._mach_creds)
352         self._get_tgt(client_creds, armor_tgt=armor_tgt,
353                       expected_error=KDC_ERR_POLICY)
354
355         # Otherwise, authentication should succeed.
356         armor_tgt = self.get_tgt(self._member_of_one_creds)
357         self._get_tgt(client_creds, armor_tgt=armor_tgt,
358                       expected_error=0)
359
360     def test_allowed_from_not_member_of_each(self):
361         # Create an authentication policy that allows accounts not belonging to
362         # both groups.
363         policy = self.create_authn_policy(
364             enforced=True,
365             user_allowed_from=(
366                 f'O:SYD:(XA;;CR;;;WD;(Not_Member_of '
367                 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'),
368         )
369
370         # Create a user account with the assigned policy.
371         client_creds = self._get_creds(account_type=self.AccountType.USER,
372                                        assigned_policy=policy)
373
374         # Show that we get a policy error if the machine account belongs to
375         # both groups.
376         armor_tgt = self.get_tgt(self._member_of_both_creds)
377         self._get_tgt(client_creds, armor_tgt=armor_tgt,
378                       expected_error=KDC_ERR_POLICY)
379
380         # Otherwise, authentication should succeed.
381         armor_tgt = self.get_tgt(self._member_of_one_creds)
382         self._get_tgt(client_creds, armor_tgt=armor_tgt,
383                       expected_error=0)
384
385     def test_allowed_from_not_member_of_any(self):
386         # Create an authentication policy that allows accounts belonging to
387         # neither group.
388         policy = self.create_authn_policy(
389             enforced=True,
390             user_allowed_from=(
391                 f'O:SYD:(XA;;CR;;;WD;(Not_Member_of_Any '
392                 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'),
393         )
394
395         # Create a user account with the assigned policy.
396         client_creds = self._get_creds(account_type=self.AccountType.USER,
397                                        assigned_policy=policy)
398
399         # Show that we get a policy error if the machine account belongs to one
400         # of the groups.
401         armor_tgt = self.get_tgt(self._member_of_one_creds)
402         self._get_tgt(client_creds, armor_tgt=armor_tgt,
403                       expected_error=KDC_ERR_POLICY)
404
405         # Otherwise, authentication should succeed.
406         armor_tgt = self.get_tgt(self._mach_creds)
407         self._get_tgt(client_creds, armor_tgt=armor_tgt,
408                       expected_error=0)
409
410     def test_allowed_from_member_of_each_deny(self):
411         # Create an authentication policy that denies accounts belonging to
412         # both groups, and allows other accounts.
413         policy = self.create_authn_policy(
414             enforced=True,
415             user_allowed_from=(
416                 f'O:SYD:(XD;;CR;;;WD;(Member_of '
417                 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'
418                 f'(A;;CR;;;WD)'),
419         )
420
421         # Create a user account with the assigned policy.
422         client_creds = self._get_creds(account_type=self.AccountType.USER,
423                                        assigned_policy=policy)
424
425         # Show that we get a policy error if the machine account belongs to
426         # both groups.
427         armor_tgt = self.get_tgt(self._member_of_both_creds)
428         self._get_tgt(client_creds, armor_tgt=armor_tgt,
429                       expected_error=KDC_ERR_POLICY)
430
431         # Otherwise, authentication should succeed.
432         armor_tgt = self.get_tgt(self._member_of_one_creds)
433         self._get_tgt(client_creds, armor_tgt=armor_tgt,
434                       expected_error=0)
435
436     def test_allowed_from_member_of_any_deny(self):
437         # Create an authentication policy that denies accounts belonging to
438         # either group, and allows other accounts.
439         policy = self.create_authn_policy(
440             enforced=True,
441             user_allowed_from=(
442                 f'O:SYD:(XD;;CR;;;WD;(Member_of_Any '
443                 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'
444                 f'(A;;CR;;;WD)'),
445         )
446
447         # Create a user account with the assigned policy.
448         client_creds = self._get_creds(account_type=self.AccountType.USER,
449                                        assigned_policy=policy)
450
451         # Show that we get a policy error if the machine account belongs to
452         # either group.
453         armor_tgt = self.get_tgt(self._member_of_one_creds)
454         self._get_tgt(client_creds, armor_tgt=armor_tgt,
455                       expected_error=KDC_ERR_POLICY)
456
457         # Otherwise, authentication should succeed.
458         armor_tgt = self.get_tgt(self._mach_creds)
459         self._get_tgt(client_creds, armor_tgt=armor_tgt,
460                       expected_error=0)
461
462     def test_allowed_from_not_member_of_each_deny(self):
463         # Create an authentication policy that denies accounts not belonging to
464         # both groups, and allows other accounts.
465         policy = self.create_authn_policy(
466             enforced=True,
467             user_allowed_from=(
468                 f'O:SYD:(XD;;CR;;;WD;(Not_Member_of '
469                 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'
470                 f'(A;;CR;;;WD)'),
471         )
472
473         # Create a user account with the assigned policy.
474         client_creds = self._get_creds(account_type=self.AccountType.USER,
475                                        assigned_policy=policy)
476
477         # Show that we get a policy error if the machine account doesn’t belong
478         # to both groups.
479         armor_tgt = self.get_tgt(self._member_of_one_creds)
480         self._get_tgt(client_creds, armor_tgt=armor_tgt,
481                       expected_error=KDC_ERR_POLICY)
482
483         # Otherwise, authentication should succeed.
484         armor_tgt = self.get_tgt(self._member_of_both_creds)
485         self._get_tgt(client_creds, armor_tgt=armor_tgt,
486                       expected_error=0)
487
488     def test_allowed_from_not_member_of_any_deny(self):
489         # Create an authentication policy that denies accounts belonging to
490         # neither group, and allows other accounts.
491         policy = self.create_authn_policy(
492             enforced=True,
493             user_allowed_from=(
494                 f'O:SYD:(XD;;CR;;;WD;(Not_Member_of_Any '
495                 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'
496                 f'(A;;CR;;;WD)'),
497         )
498
499         # Create a user account with the assigned policy.
500         client_creds = self._get_creds(account_type=self.AccountType.USER,
501                                        assigned_policy=policy)
502
503         # Show that we get a policy error if the machine account belongs to
504         # neither group.
505         armor_tgt = self.get_tgt(self._mach_creds)
506         self._get_tgt(client_creds, armor_tgt=armor_tgt,
507                       expected_error=KDC_ERR_POLICY)
508
509         # Otherwise, authentication should succeed.
510         armor_tgt = self.get_tgt(self._member_of_one_creds)
511         self._get_tgt(client_creds, armor_tgt=armor_tgt,
512                       expected_error=0)
513
514     def test_allowed_from_unenforced_silo_equals(self):
515         # Create an authentication policy that allows accounts belonging to the
516         # unenforced silo.
517         policy = self.create_authn_policy(
518             enforced=True,
519             user_allowed_from=(
520                 f'O:SYD:(XA;;CR;;;WD;'
521                 f'(@User.ad://ext/AuthenticationSilo == '
522                 f'"{self._unenforced_silo}"))'),
523         )
524
525         # Create a user account with the assigned policy.
526         client_creds = self._get_creds(account_type=self.AccountType.USER,
527                                        assigned_policy=policy)
528
529         # As the policy is unenforced, the ‘ad://ext/AuthenticationSilo’ claim
530         # will not be present in the TGT, and the ACE will never allow access.
531
532         armor_tgt = self.get_tgt(self._mach_creds)
533         self._get_tgt(client_creds, armor_tgt=armor_tgt,
534                       expected_error=KDC_ERR_POLICY)
535
536         armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
537         self._get_tgt(client_creds, armor_tgt=armor_tgt,
538                       expected_error=KDC_ERR_POLICY)
539
540         armor_tgt = self.get_tgt(self._member_of_enforced_silo)
541         self._get_tgt(client_creds, armor_tgt=armor_tgt,
542                       expected_error=KDC_ERR_POLICY)
543
544     def test_allowed_from_enforced_silo_equals(self):
545         # Create an authentication policy that allows accounts belonging to the
546         # enforced silo.
547         policy = self.create_authn_policy(
548             enforced=True,
549             user_allowed_from=(
550                 f'O:SYD:(XA;;CR;;;WD;'
551                 f'(@User.ad://ext/AuthenticationSilo == '
552                 f'"{self._enforced_silo}"))'),
553         )
554
555         # Create a user account with the assigned policy.
556         client_creds = self._get_creds(account_type=self.AccountType.USER,
557                                        assigned_policy=policy)
558
559         # Show that we get a policy error if the machine account does not
560         # belong to the silo.
561         armor_tgt = self.get_tgt(self._mach_creds)
562         self._get_tgt(client_creds, armor_tgt=armor_tgt,
563                       expected_error=KDC_ERR_POLICY)
564
565         armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
566         self._get_tgt(client_creds, armor_tgt=armor_tgt,
567                       expected_error=KDC_ERR_POLICY)
568
569         # Otherwise, authentication should succeed.
570         armor_tgt = self.get_tgt(self._member_of_enforced_silo)
571         self._get_tgt(client_creds, armor_tgt=armor_tgt,
572                       expected_error=0)
573
574     def test_allowed_from_unenforced_silo_not_equals(self):
575         # Create an authentication policy that allows accounts not belonging to
576         # the unenforced silo.
577         policy = self.create_authn_policy(
578             enforced=True,
579             user_allowed_from=(
580                 f'O:SYD:(XA;;CR;;;WD;'
581                 f'(@User.ad://ext/AuthenticationSilo != '
582                 f'"{self._unenforced_silo}"))'),
583         )
584
585         # Create a user account with the assigned policy.
586         client_creds = self._get_creds(account_type=self.AccountType.USER,
587                                        assigned_policy=policy)
588
589         # Show that authentication fails unless the account belongs to a silo
590         # other than the unenforced silo.
591
592         armor_tgt = self.get_tgt(self._mach_creds)
593         self._get_tgt(client_creds, armor_tgt=armor_tgt,
594                       expected_error=KDC_ERR_POLICY)
595
596         armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
597         self._get_tgt(client_creds, armor_tgt=armor_tgt,
598                       expected_error=KDC_ERR_POLICY)
599
600         armor_tgt = self.get_tgt(self._member_of_enforced_silo)
601         self._get_tgt(client_creds, armor_tgt=armor_tgt,
602                       expected_error=0)
603
604     def test_allowed_from_enforced_silo_not_equals(self):
605         # Create an authentication policy that allows accounts not belonging to
606         # the enforced silo.
607         policy = self.create_authn_policy(
608             enforced=True,
609             user_allowed_from=(
610                 f'O:SYD:(XA;;CR;;;WD;'
611                 f'(@User.ad://ext/AuthenticationSilo != '
612                 f'"{self._enforced_silo}"))'),
613         )
614
615         # Create a user account with the assigned policy.
616         client_creds = self._get_creds(account_type=self.AccountType.USER,
617                                        assigned_policy=policy)
618
619         # Show that authentication always fails, as none of the machine
620         # accounts belong to a silo that is not the enforced one. (The
621         # unenforced silo doesn’t count, as it will never appear in a claim.)
622
623         armor_tgt = self.get_tgt(self._mach_creds)
624         self._get_tgt(client_creds, armor_tgt=armor_tgt,
625                       expected_error=KDC_ERR_POLICY)
626
627         armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
628         self._get_tgt(client_creds, armor_tgt=armor_tgt,
629                       expected_error=KDC_ERR_POLICY)
630
631         armor_tgt = self.get_tgt(self._member_of_enforced_silo)
632         self._get_tgt(client_creds, armor_tgt=armor_tgt,
633                       expected_error=KDC_ERR_POLICY)
634
635     def test_allowed_from_unenforced_silo_equals_deny(self):
636         # Create an authentication policy that denies accounts belonging to the
637         # unenforced silo, and allows other accounts.
638         policy = self.create_authn_policy(
639             enforced=True,
640             user_allowed_from=(
641                 f'O:SYD:(XD;;CR;;;WD;'
642                 f'(@User.ad://ext/AuthenticationSilo == '
643                 f'"{self._unenforced_silo}"))'
644                 f'(A;;CR;;;WD)'),
645         )
646
647         # Create a user account with the assigned policy.
648         client_creds = self._get_creds(account_type=self.AccountType.USER,
649                                        assigned_policy=policy)
650
651         # Show that authentication fails unless the account belongs to a silo
652         # other than the unenforced silo.
653
654         armor_tgt = self.get_tgt(self._mach_creds)
655         self._get_tgt(client_creds, armor_tgt=armor_tgt,
656                       expected_error=KDC_ERR_POLICY)
657
658         armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
659         self._get_tgt(client_creds, armor_tgt=armor_tgt,
660                       expected_error=KDC_ERR_POLICY)
661
662         armor_tgt = self.get_tgt(self._member_of_enforced_silo)
663         self._get_tgt(client_creds, armor_tgt=armor_tgt,
664                       expected_error=0)
665
666     def test_allowed_from_enforced_silo_equals_deny(self):
667         # Create an authentication policy that denies accounts belonging to the
668         # enforced silo, and allows other accounts.
669         policy = self.create_authn_policy(
670             enforced=True,
671             user_allowed_from=(
672                 f'O:SYD:(XD;;CR;;;WD;'
673                 f'(@User.ad://ext/AuthenticationSilo == '
674                 f'"{self._enforced_silo}"))'
675                 f'(A;;CR;;;WD)'),
676         )
677
678         # Create a user account with the assigned policy.
679         client_creds = self._get_creds(account_type=self.AccountType.USER,
680                                        assigned_policy=policy)
681
682         # Show that authentication always fails, as none of the machine
683         # accounts belong to a silo that is not the enforced one. (The
684         # unenforced silo doesn’t count, as it will never appear in a claim.)
685
686         armor_tgt = self.get_tgt(self._mach_creds)
687         self._get_tgt(client_creds, armor_tgt=armor_tgt,
688                       expected_error=KDC_ERR_POLICY)
689
690         armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
691         self._get_tgt(client_creds, armor_tgt=armor_tgt,
692                       expected_error=KDC_ERR_POLICY)
693
694         armor_tgt = self.get_tgt(self._member_of_enforced_silo)
695         self._get_tgt(client_creds, armor_tgt=armor_tgt,
696                       expected_error=KDC_ERR_POLICY)
697
698     def test_allowed_from_unenforced_silo_not_equals_deny(self):
699         # Create an authentication policy that denies accounts not belonging to
700         # the unenforced silo, and allows other accounts.
701         policy = self.create_authn_policy(
702             enforced=True,
703             user_allowed_from=(
704                 f'O:SYD:(XD;;CR;;;WD;'
705                 f'(@User.ad://ext/AuthenticationSilo != '
706                 f'"{self._unenforced_silo}"))'
707                 f'(A;;CR;;;WD)'),
708         )
709
710         # Create a user account with the assigned policy.
711         client_creds = self._get_creds(account_type=self.AccountType.USER,
712                                        assigned_policy=policy)
713
714         # Show that authentication always fails, as the unenforced silo will
715         # never appear in a claim.
716
717         armor_tgt = self.get_tgt(self._mach_creds)
718         self._get_tgt(client_creds, armor_tgt=armor_tgt,
719                       expected_error=KDC_ERR_POLICY)
720
721         armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
722         self._get_tgt(client_creds, armor_tgt=armor_tgt,
723                       expected_error=KDC_ERR_POLICY)
724
725         armor_tgt = self.get_tgt(self._member_of_enforced_silo)
726         self._get_tgt(client_creds, armor_tgt=armor_tgt,
727                       expected_error=KDC_ERR_POLICY)
728
729     def test_allowed_from_enforced_silo_not_equals_deny(self):
730         # Create an authentication policy that denies accounts not belonging to
731         # the enforced silo, and allows other accounts.
732         policy = self.create_authn_policy(
733             enforced=True,
734             user_allowed_from=(
735                 f'O:SYD:(XD;;CR;;;WD;'
736                 f'(@User.ad://ext/AuthenticationSilo != '
737                 f'"{self._enforced_silo}"))'
738                 f'(A;;CR;;;WD)'),
739         )
740
741         # Create a user account with the assigned policy.
742         client_creds = self._get_creds(account_type=self.AccountType.USER,
743                                        assigned_policy=policy)
744
745         # Show that authentication fails unless the account belongs to the
746         # enforced silo.
747
748         armor_tgt = self.get_tgt(self._mach_creds)
749         self._get_tgt(client_creds, armor_tgt=armor_tgt,
750                       expected_error=KDC_ERR_POLICY)
751
752         armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
753         self._get_tgt(client_creds, armor_tgt=armor_tgt,
754                       expected_error=KDC_ERR_POLICY)
755
756         armor_tgt = self.get_tgt(self._member_of_enforced_silo)
757         self._get_tgt(client_creds, armor_tgt=armor_tgt,
758                       expected_error=0)
759
760     def test_allowed_from_claim_equals_claim(self):
761         # Create a couple of claims.
762
763         claim0_id = self.get_new_username()
764         self.create_claim(claim0_id,
765                           enabled=True,
766                           attribute='carLicense',
767                           single_valued=True,
768                           source_type='AD',
769                           for_classes=['computer'],
770                           value_type=claims.CLAIM_TYPE_STRING)
771
772         claim1_id = self.get_new_username()
773         self.create_claim(claim1_id,
774                           enabled=True,
775                           attribute='comment',
776                           single_valued=True,
777                           source_type='AD',
778                           for_classes=['computer'],
779                           value_type=claims.CLAIM_TYPE_STRING)
780
781         # Create an authentication policy that allows accounts having the two
782         # claims be equal.
783         policy = self.create_authn_policy(
784             enforced=True,
785             user_allowed_from=(
786                 f'O:SYD:(XA;;CR;;;WD;'
787                 f'(@User.{claim0_id} == @User.{claim1_id}))'),
788         )
789
790         # Create a user account with the assigned policy.
791         client_creds = self._get_creds(account_type=self.AccountType.USER,
792                                        assigned_policy=policy)
793
794         armor_tgt = self.get_tgt(self._mach_creds)
795         self._get_tgt(client_creds, armor_tgt=armor_tgt,
796                       expected_error=KDC_ERR_POLICY)
797
798         mach_creds = self.get_cached_creds(
799             account_type=self.AccountType.COMPUTER,
800             opts={
801                 'additional_details': (
802                     ('carLicense', 'foo'),
803                     ('comment', 'foo'),
804                 ),
805             })
806         armor_tgt = self.get_tgt(
807             mach_creds,
808             expect_client_claims=True,
809             expected_client_claims={
810                 claim0_id: {
811                     'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
812                     'type': claims.CLAIM_TYPE_STRING,
813                     'values': ('foo',),
814                 },
815                 claim1_id: {
816                     'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
817                     'type': claims.CLAIM_TYPE_STRING,
818                     'values': ('foo',),
819                 },
820             })
821         self._get_tgt(client_creds, armor_tgt=armor_tgt,
822                       expected_error=0)
823
824     def test_allowed_to_client_equals(self):
825         client_claim_attr = 'carLicense'
826         client_claim_value = 'foo bar'
827         client_claim_values = client_claim_value,
828
829         client_claim_id = self.get_new_username()
830         self.create_claim(client_claim_id,
831                           enabled=True,
832                           attribute=client_claim_attr,
833                           single_valued=True,
834                           source_type='AD',
835                           for_classes=['user'],
836                           value_type=claims.CLAIM_TYPE_STRING)
837
838         # Create an authentication policy that allows authorization if the
839         # client has a particular claim value.
840         policy = self.create_authn_policy(
841             enforced=True,
842             computer_allowed_to=(
843                 f'O:SYD:(XA;;CR;;;WD;'
844                 f'((@User.{client_claim_id} == "{client_claim_value}")))'),
845         )
846
847         # Create a computer account with the assigned policy.
848         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
849                                        assigned_policy=policy)
850
851         armor_tgt = self.get_tgt(self._mach_creds)
852
853         # Create a user account without the claim value.
854         client_creds = self.get_cached_creds(
855             account_type=self.AccountType.USER)
856         tgt = self.get_tgt(client_creds)
857         # Show that obtaining a service ticket is denied.
858         self._tgs_req(
859             tgt, KDC_ERR_POLICY, client_creds, target_creds,
860             armor_tgt=armor_tgt,
861             expect_edata=self.expect_padata_outer,
862             # We aren’t particular about whether or not we get an NTSTATUS.
863             expect_status=None,
864             expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
865             check_patypes=False)
866
867         # Create a user account with the claim value.
868         client_creds = self.get_cached_creds(
869             account_type=self.AccountType.USER,
870             opts={
871                 'additional_details': (
872                     (client_claim_attr, client_claim_values),
873                 ),
874             })
875         tgt = self.get_tgt(
876             client_creds,
877             expect_client_claims=True,
878             expected_client_claims={
879                 client_claim_id: {
880                     'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
881                     'type': claims.CLAIM_TYPE_STRING,
882                     'values': client_claim_values,
883                 },
884             })
885         # Show that obtaining a service ticket is allowed.
886         self._tgs_req(tgt, 0, client_creds, target_creds,
887                       armor_tgt=armor_tgt)
888
889     def test_allowed_to_device_equals(self):
890         device_claim_attr = 'carLicense'
891         device_claim_value = 'bar'
892         device_claim_values = device_claim_value,
893
894         device_claim_id = self.get_new_username()
895         self.create_claim(device_claim_id,
896                           enabled=True,
897                           attribute=device_claim_attr,
898                           single_valued=True,
899                           source_type='AD',
900                           for_classes=['computer'],
901                           value_type=claims.CLAIM_TYPE_STRING)
902
903         # Create a user account.
904         client_creds = self.get_cached_creds(
905             account_type=self.AccountType.USER)
906         tgt = self.get_tgt(client_creds)
907
908         # Create an authentication policy that allows authorization if the
909         # device has a particular claim value.
910         policy = self.create_authn_policy(
911             enforced=True,
912             computer_allowed_to=(
913                 f'O:SYD:(XA;;CR;;;WD;'
914                 f'(@Device.{device_claim_id} == "{device_claim_value}"))'),
915         )
916
917         # Create a computer account with the assigned policy.
918         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
919                                        assigned_policy=policy)
920
921         armor_tgt = self.get_tgt(self._mach_creds)
922         # Show that obtaining a service ticket is denied when the claim value
923         # is not present.
924         self._tgs_req(
925             tgt, KDC_ERR_POLICY, client_creds, target_creds,
926             armor_tgt=armor_tgt,
927             expect_edata=self.expect_padata_outer,
928             # We aren’t particular about whether or not we get an NTSTATUS.
929             expect_status=None,
930             expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
931             check_patypes=False)
932
933         mach_creds = self.get_cached_creds(
934             account_type=self.AccountType.COMPUTER,
935             opts={
936                 'additional_details': (
937                     (device_claim_attr, device_claim_values),
938                 ),
939             })
940         armor_tgt = self.get_tgt(
941             mach_creds,
942             expect_client_claims=True,
943             expected_client_claims={
944                 device_claim_id: {
945                     'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
946                     'type': claims.CLAIM_TYPE_STRING,
947                     'values': device_claim_values,
948                 },
949             })
950         # Show that obtaining a service ticket is allowed when the claim value
951         # is present.
952         self._tgs_req(tgt, 0, client_creds, target_creds,
953                       armor_tgt=armor_tgt)
954
955     claim_against_claim_cases = [
956         # If either side is missing, the result is unknown.
957         ((), '==', (), None),
958         ((), '!=', (), None),
959         ('a', '==', (), None),
960         ((), '==', 'b', None),
961         # Straightforward equality and inequality checks work.
962         ('foo', '==', 'foo', True),
963         ('foo', '==', 'bar', False),
964         ('foo', '!=', 'foo', False),
965         ('foo', '!=', 'bar', True),
966         # We can perform less‐than and greater‐than operations.
967         ('cat', '<', 'dog', True),
968         ('cat', '<=', 'dog', True),
969         ('cat', '>', 'dog', False),
970         ('cat', '>=', 'dog', False),
971         ('foo', '<=', 'foo', True),
972         ('foo', '>=', 'foo', True),
973         ('foo', '<', 'foo bar', True),
974         ('foo bar', '>', 'foo', True),
975         # String comparison is case‐sensitive.
976         ('foo bar', '==', 'Foo BAR', True),
977         ('foo bar', '==', 'FOO BAR', True),
978         ('ćàț', '==', 'ĆÀȚ', True),
979         ('ḽ', '==', 'Ḽ', True),
980         ('ⅸ', '==', 'Ⅸ', True),
981         ('ꙭ', '==', 'Ꙭ', True),
982         ('ⱦ', '==', 'Ⱦ', True),  # Lowercased variant added in Unicode 5.0.
983         ('ԛԣ', '==', 'ԚԢ', True),  # All added in Unicode 5.1.
984         ('foo', '<', 'foo', True),
985         ('ćàș', '<', 'ĆÀȚ', True),
986         ('cat', '<', 'ćàț', True),
987         # This is done by converting to UPPER CASE. Hence, both ‘A’ (U+41) and
988         # ‘a’ (U+61) compare less than ‘_’ (U+5F).
989         ('A', '<', '_', True),
990         ('a', '<', '_', True),
991         # But not all uppercased/lowercased pairs are considered to be equal in
992         # this way.
993         ('ß', '<', 'ẞ', True),
994         ('ß', '>', 'SS', True),
995         ('ⳬ', '>', 'Ⳬ', True),  # Added in Unicode 5.2.
996         ('ʞ', '<', 'Ʞ', True),  # Uppercased variant added in Unicode 6.0.
997         ('ʞ', '<', 'ʟ', True),  # U+029E < U+029F < U+A7B0 (upper variant, Ʞ)
998         ('ꞧ', '>', 'Ꞧ', True),  # Added in Unicode 6.0.
999         ('ɜ', '<', 'Ɜ', True),  # Uppercased variant added in Unicode 7.0.
1000         #
1001         # Strings are compared as UTF‐16 code units, rather than as Unicode
1002         # codepoints. So while you might expect ‘𐀀’ (U+10000) to compare
1003         # greater than ‘豈’ (U+F900), it is actually considered to be the
1004         # *smaller* of the pair. That is because it is encoded as a sequence of
1005         # two code units, 0xd800 and 0xdc00, which combination compares less
1006         # than the single code unit 0xf900.
1007         ('ퟻ', '<', '𐀀', True),
1008         ('𐀀', '<', '豈', True),
1009         ('ퟻ', '<', '豈', True),
1010         # Composites can be compared.
1011         (('foo', 'bar'), '==', ('foo', 'bar'), True),
1012         (('foo', 'bar'), '==', ('foo', 'baz'), False),
1013         # The individual components don’t have to match in case.
1014         (('foo', 'bar'), '==', ('FOO', 'BAR'), True),
1015         # Nor must they match in order.
1016         (('foo', 'bar'), '==', ('bar', 'foo'), True),
1017         # Composites of different lengths compare unequal.
1018         (('foo', 'bar'), '!=', 'foo', True),
1019         (('foo', 'bar'), '!=', ('foo', 'bar', 'baz'), True),
1020         # But composites don’t have a defined ordering, and aren’t considered
1021         # greater or lesser than one another.
1022         (('foo', 'bar'), '<', ('foo', 'bar'), None),
1023         (('foo', 'bar'), '<=', ('foo', 'bar'), None),
1024         (('foo', 'bar'), '>', ('foo', 'bar', 'baz'), None),
1025         (('foo', 'bar'), '>=', ('foo', 'bar', 'baz'), None),
1026         # We can test for containment.
1027         (('foo', 'bar'), 'Contains', ('FOO'), True),
1028         (('foo', 'bar'), 'Contains', ('foo', 'bar'), True),
1029         (('foo', 'bar'), 'Not_Contains', ('foo', 'bar'), False),
1030         (('foo', 'bar'), 'Contains', ('foo', 'bar', 'baz'), False),
1031         (('foo', 'bar'), 'Not_Contains', ('foo', 'bar', 'baz'), True),
1032         # We can test whether the operands have any elements in common.
1033         ('foo', 'Any_of', 'foo', True),
1034         (('foo', 'bar'), 'Any_of', 'BAR', True),
1035         (('foo', 'bar'), 'Any_of', 'baz', False),
1036         (('foo', 'bar'), 'Not_Any_of', 'baz', True),
1037         (('foo', 'bar'), 'Any_of', ('bar', 'baz'), True),
1038         (('foo', 'bar'), 'Not_Any_of', ('bar', 'baz'), False),
1039     ]
1040
1041     claim_against_literal_cases = [
1042         # String comparisons also work against literals.
1043         ('foo bar', '==', '"foo bar"', True),
1044         # Composites can be compared with literals.
1045         ('bar', '==', '{{"bar"}}', True),
1046         (('apple', 'banana'), '==', '{{"APPLE", "BANANA"}}', True),
1047         (('apple', 'banana'), '==', '{{"BANANA", "APPLE"}}', True),
1048         (('apple', 'banana'), '==', '{{"apple", "banana", "apple"}}', False),
1049         # We can test for containment.
1050         ((), 'Contains', '{{"foo"}}', None),
1051         ((), 'Not_Contains', '{{"foo", "bar"}}', None),
1052         ('bar', 'Contains', '{{"bar"}}', True),
1053         (('foo', 'bar'), 'Contains', '{{"foo", "bar"}}', True),
1054         (('foo', 'bar'), 'Contains', '{{"foo", "bar", "baz"}}', False),
1055         # The right‐hand side of Contains or Not_Contains does not have to be a
1056         # composite.
1057         ('foo', 'Contains', '"foo"', True),
1058         (('foo', 'bar'), 'Not_Contains', '"foo"', False),
1059         # It’s fine if the right‐hand side contains duplicate elements.
1060         (('foo', 'bar'), 'Contains', '{{"foo", "bar", "bar"}}', True),
1061         # We can test whether the operands have any elements in common.
1062         ('bar', 'Any_of', '{{"bar"}}', True),
1063         (('foo', 'bar'), 'Any_of', '{{"bar", "baz"}}', True),
1064         (('foo', 'bar'), 'Any_of', '{{"baz"}}', False),
1065         # The right‐hand side of Any_of or Not_Any_of must be a composite.
1066         ('foo', 'Any_of', '"foo"', None),
1067         (('foo', 'bar'), 'Not_Any_of', '"baz"', None),
1068         # A string won’t compare equal to a numeric literal.
1069         ('42', '==', '"42"', True),
1070         ('42', '==', '42', None),
1071         # Nor can composites that mismatch in type be compared.
1072         (('123', '456'), '==', '{{"123", "456"}}', True),
1073         (('654', '321'), '==', '{{654, 321}}', None),
1074         (('foo', 'bar'), 'Contains', '{{1, 2, 3}}', None),
1075     ]
1076
1077     ##########################################################################################
1078
1079     def _test_cmp_with_args(self, lhs, op, rhs, outcome, rhs_is_literal=False):
1080         # Construct a conditional ACE expression that evaluates to True if the
1081         # two claim values are equal.
1082         if rhs_is_literal:
1083             self.assertIsInstance(rhs, str)
1084             rhs = rhs.format(self=self)
1085             expression = f'(@User.{self.claim0_id} {op} {rhs})'
1086         else:
1087             expression = f'(@User.{self.claim0_id} {op} @User.{self.claim1_id})'
1088
1089         # Create an authentication policy that will allow authentication when
1090         # the expression is true, and a second that will deny authentication in
1091         # the same circumstance. By observing the results of authenticating
1092         # against each of these policies in turn, we can determine whether the
1093         # expression evaluates to a True, False, or Unknown value.
1094
1095         allowed_sddl = f'O:SYD:(XA;;CR;;;WD;{expression})'
1096         denied_sddl = f'O:SYD:(XD;;CR;;;WD;{expression})(A;;CR;;;WD)'
1097
1098         allowed_policy = self.create_authn_policy(
1099             enforced=True,
1100             user_allowed_from=allowed_sddl)
1101         denied_policy = self.create_authn_policy(
1102             enforced=True,
1103             user_allowed_from=denied_sddl)
1104
1105         # Create a user account assigned to each policy.
1106         allowed_creds = self._get_creds(account_type=self.AccountType.USER,
1107                                         assigned_policy=allowed_policy)
1108         denied_creds = self._get_creds(account_type=self.AccountType.USER,
1109                                        assigned_policy=denied_policy)
1110
1111         additional_details = ()
1112         if lhs:
1113             additional_details += ((self.claim0_attr, lhs),)
1114         if rhs and not rhs_is_literal:
1115             additional_details += ((self.claim1_attr, rhs),)
1116
1117         # Create a computer account with the provided attribute values.
1118         mach_creds = self.get_cached_creds(
1119             account_type=self.AccountType.COMPUTER,
1120             opts={'additional_details': additional_details})
1121
1122         def expected_values(val):
1123             if isinstance(val, (str, bytes)):
1124                 return val,
1125
1126             return val
1127
1128         expected_client_claims = {}
1129         if lhs:
1130             expected_client_claims[self.claim0_id] = {
1131                 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
1132                 'type': claims.CLAIM_TYPE_STRING,
1133                 'values': expected_values(lhs),
1134             }
1135         if rhs and not rhs_is_literal:
1136             expected_client_claims[self.claim1_id] = {
1137                 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
1138                 'type': claims.CLAIM_TYPE_STRING,
1139                 'values': expected_values(rhs),
1140             }
1141
1142         # Fetch the computer account’s TGT, and ensure it contains the claims.
1143         armor_tgt = self.get_tgt(
1144             mach_creds,
1145             expect_client_claims=bool(expected_client_claims) or None,
1146             expected_client_claims=expected_client_claims)
1147
1148         # The first or the second authentication request is expected to succeed
1149         # if the outcome is True or False, respectively. An Unknown outcome,
1150         # represented by None, will result in a policy error in either case.
1151         allowed_error = 0 if outcome is True else KDC_ERR_POLICY
1152         denied_error = 0 if outcome is False else KDC_ERR_POLICY
1153
1154         # Attempt to authenticate and ensure that we observe the expected
1155         # results.
1156         self._get_tgt(allowed_creds, armor_tgt=armor_tgt,
1157                       expected_error=allowed_error)
1158         self._get_tgt(denied_creds, armor_tgt=armor_tgt,
1159                       expected_error=denied_error)
1160
1161     pac_claim_cases = [
1162         # Test a very simple expression with various claims.
1163         ([
1164             (claims.CLAIMS_SOURCE_TYPE_AD, [
1165                 ('{non_empty_string}', claims.CLAIM_TYPE_STRING, ['foo bar']),
1166             ]),
1167         ], '{non_empty_string}', True),
1168         ([
1169             (claims.CLAIMS_SOURCE_TYPE_AD, [
1170                 ('{zero_uint}', claims.CLAIM_TYPE_UINT64, [0]),
1171             ]),
1172         ], '{zero_uint}', False),
1173         ([
1174             (claims.CLAIMS_SOURCE_TYPE_AD, [
1175                 ('{nonzero_uint}', claims.CLAIM_TYPE_UINT64, [1]),
1176             ]),
1177         ], '{nonzero_uint}', True),
1178         ([
1179             (claims.CLAIMS_SOURCE_TYPE_AD, [
1180                 ('{zero_uints}', claims.CLAIM_TYPE_UINT64, [0, 0]),
1181             ]),
1182         ], '{zero_uints}', KDC_ERR_GENERIC),
1183         ([
1184             (claims.CLAIMS_SOURCE_TYPE_AD, [
1185                 ('{zero_and_one_uint}', claims.CLAIM_TYPE_UINT64, [0, 1]),
1186             ]),
1187         ], '{zero_and_one_uint}', True),
1188         ([
1189             (claims.CLAIMS_SOURCE_TYPE_AD, [
1190                 ('{one_and_zero_uint}', claims.CLAIM_TYPE_UINT64, [1, 0]),
1191             ]),
1192         ], '{one_and_zero_uint}', True),
1193         ([
1194             (claims.CLAIMS_SOURCE_TYPE_AD, [
1195                 ('{zero_int}', claims.CLAIM_TYPE_INT64, [0]),
1196             ]),
1197         ], '{zero_int}', False),
1198         ([
1199             (claims.CLAIMS_SOURCE_TYPE_AD, [
1200                 ('{nonzero_int}', claims.CLAIM_TYPE_INT64, [1]),
1201             ]),
1202         ], '{nonzero_int}', True),
1203         ([
1204             (claims.CLAIMS_SOURCE_TYPE_AD, [
1205                 ('{zero_ints}', claims.CLAIM_TYPE_INT64, [0, 0]),
1206             ]),
1207         ], '{zero_ints}', KDC_ERR_GENERIC),
1208         ([
1209             (claims.CLAIMS_SOURCE_TYPE_AD, [
1210                 ('{zero_and_one_int}', claims.CLAIM_TYPE_INT64, [0, 1]),
1211             ]),
1212         ], '{zero_and_one_int}', True),
1213         ([
1214             (claims.CLAIMS_SOURCE_TYPE_AD, [
1215                 ('{one_and_zero_int}', claims.CLAIM_TYPE_INT64, [1, 0]),
1216             ]),
1217         ], '{one_and_zero_int}', True),
1218         ([
1219             (claims.CLAIMS_SOURCE_TYPE_AD, [
1220                 ('{false_boolean}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1221             ]),
1222         ], '{false_boolean}', False),
1223         ([
1224             (claims.CLAIMS_SOURCE_TYPE_AD, [
1225                 ('{true_boolean}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1226             ]),
1227         ], '{true_boolean}', True),
1228         ([
1229             (claims.CLAIMS_SOURCE_TYPE_AD, [
1230                 ('{false_booleans}', claims.CLAIM_TYPE_BOOLEAN, [0, 0]),
1231             ]),
1232         ], '{false_booleans}', KDC_ERR_GENERIC),
1233         ([
1234             (claims.CLAIMS_SOURCE_TYPE_AD, [
1235                 ('{false_and_true_boolean}', claims.CLAIM_TYPE_BOOLEAN, [0, 1]),
1236             ]),
1237         ], '{false_and_true_boolean}', True),
1238         ([
1239             (claims.CLAIMS_SOURCE_TYPE_AD, [
1240                 ('{true_and_false_boolean}', claims.CLAIM_TYPE_BOOLEAN, [1, 0]),
1241             ]),
1242         ], '{true_and_false_boolean}', True),
1243         # Test a basic comparison against a literal.
1244         ([
1245             (claims.CLAIMS_SOURCE_TYPE_AD, [
1246                 ('{a}', claims.CLAIM_TYPE_STRING, ['foo bar']),
1247             ]),
1248         ], '{a} == "foo bar"', True),
1249         # Claims can be compared against one another.
1250         ([
1251             (claims.CLAIMS_SOURCE_TYPE_AD, [
1252                 ('{a}', claims.CLAIM_TYPE_STRING, ['foo bar']),
1253                 ('{b}', claims.CLAIM_TYPE_STRING, ['FOO BAR']),
1254             ]),
1255         ], '{a} == {b}', True),
1256         ([
1257             (claims.CLAIMS_SOURCE_TYPE_AD, [
1258                 ('{b}', claims.CLAIM_TYPE_STRING, ['FOO', 'BAR', 'BAZ']),
1259                 ('{a}', claims.CLAIM_TYPE_STRING, ['foo', 'bar', 'baz']),
1260             ]),
1261         ], '{a} != {b}', False),
1262         # Certificate claims are also valid.
1263         ([
1264             (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
1265                 ('{a}', claims.CLAIM_TYPE_STRING, ['foo']),
1266             ]),
1267         ], '{a} == "foo"', True),
1268         # Other claim source types are ignored.
1269         ([
1270             (0, [
1271                 ('{a}', claims.CLAIM_TYPE_STRING, ['foo']),
1272             ]),
1273         ], '{a} == "foo"', None),
1274         ([
1275             (3, [
1276                 ('{a}', claims.CLAIM_TYPE_STRING, ['foo']),
1277             ]),
1278         ], '{a} == "foo"', None),
1279         # If multiple claims have the same ID, the *last* one takes precedence.
1280         ([
1281             (claims.CLAIMS_SOURCE_TYPE_AD, [
1282                 ('{a}', claims.CLAIM_TYPE_STRING, ['this is not the value…']),
1283                 ('{a}', claims.CLAIM_TYPE_STRING, ['…nor is this…']),
1284             ]),
1285             (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
1286                 ('{a}', claims.CLAIM_TYPE_STRING, ['…and this isn’t either.']),
1287             ]),
1288             (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
1289                 ('{a}', claims.CLAIM_TYPE_STRING, ['here’s the actual value!']),
1290             ]),
1291             (3, [
1292                 ('{a}', claims.CLAIM_TYPE_STRING, ['this is a red herring.']),
1293             ]),
1294         ], '{a} == "here’s the actual value!"', True),
1295         # Claim values can be empty.
1296         ([
1297             (claims.CLAIMS_SOURCE_TYPE_AD, [
1298                 ('{empty_claim_string}', claims.CLAIM_TYPE_STRING, []),
1299             ]),
1300         ], '{empty_claim_string} != "foo bar"', None),
1301         ([
1302             (claims.CLAIMS_SOURCE_TYPE_AD, [
1303                 ('{empty_claim_boolean}', claims.CLAIM_TYPE_BOOLEAN, []),
1304             ]),
1305         ], 'Exists {empty_claim_boolean}', None),
1306         # Test unsigned integer equality.
1307         ([
1308             (claims.CLAIMS_SOURCE_TYPE_AD, [
1309                 ('{a}', claims.CLAIM_TYPE_UINT64, [42]),
1310             ]),
1311         ], '{a} == 42', True),
1312         ([
1313             (claims.CLAIMS_SOURCE_TYPE_AD, [
1314                 ('{a}', claims.CLAIM_TYPE_UINT64, [0]),
1315             ]),
1316         ], '{a} == 3', False),
1317         ([
1318             (claims.CLAIMS_SOURCE_TYPE_AD, [
1319                 ('{a}', claims.CLAIM_TYPE_UINT64, [1, 2, 3]),
1320             ]),
1321         ], '{a} == {{1, 2, 3}}', True),
1322         ([
1323             (claims.CLAIMS_SOURCE_TYPE_AD, [
1324                 ('{a}', claims.CLAIM_TYPE_UINT64, [4, 5, 6]),
1325             ]),
1326         ], '{a} != {{1, 2, 3}}', True),
1327         # Test unsigned integer comparison. Ensure we don’t run into any
1328         # integer overflow issues.
1329         ([
1330             (claims.CLAIMS_SOURCE_TYPE_AD, [
1331                 ('{a}', claims.CLAIM_TYPE_UINT64, [1 << 32]),
1332             ]),
1333         ], '{a} > 0', True),
1334         # Test signed integer comparisons.
1335         ([
1336             (claims.CLAIMS_SOURCE_TYPE_AD, [
1337                 ('{a}', claims.CLAIM_TYPE_INT64, [42]),
1338             ]),
1339         ], '{a} == 42', True),
1340         ([
1341             (claims.CLAIMS_SOURCE_TYPE_AD, [
1342                 ('{a}', claims.CLAIM_TYPE_INT64, [42 << 32]),
1343             ]),
1344         ], f'{{a}} == {42 << 32}', True),
1345         # Test boolean claims. Be careful! Windows will *crash* if you send it
1346         # claims that aren’t real booleans (not 0 or 1). I doubt Microsoft will
1347         # consider this a security issue though.
1348         ([
1349             (claims.CLAIMS_SOURCE_TYPE_AD, [
1350                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [2]),
1351                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [3]),
1352             ]),
1353         ], '{a} == {b}', CRASHES_WINDOWS),
1354         ([
1355             (claims.CLAIMS_SOURCE_TYPE_AD, [
1356                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1357                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1358             ]),
1359         ], '{a} == {b}', True),
1360         ([
1361             (claims.CLAIMS_SOURCE_TYPE_AD, [
1362                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1363             ]),
1364         ], '{a} == 42', None),
1365         ([
1366             (claims.CLAIMS_SOURCE_TYPE_AD, [
1367                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1368                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1369             ]),
1370         ], '{a} && {b}', True),
1371         ([
1372             (claims.CLAIMS_SOURCE_TYPE_AD, [
1373                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1374                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1375             ]),
1376         ], '{a} && {b}', False),
1377         ([
1378             (claims.CLAIMS_SOURCE_TYPE_AD, [
1379                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1380                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1381             ]),
1382         ], '{a} && {b}', False),
1383         ([
1384             (claims.CLAIMS_SOURCE_TYPE_AD, [
1385                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1386                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1387             ]),
1388         ], '{a} || {b}', True),
1389         ([
1390             (claims.CLAIMS_SOURCE_TYPE_AD, [
1391                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1392                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1393             ]),
1394         ], '{a} || {b}', True),
1395         ([
1396             (claims.CLAIMS_SOURCE_TYPE_AD, [
1397                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1398                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1399             ]),
1400         ], '{a} || {b}', False),
1401         ([
1402             (claims.CLAIMS_SOURCE_TYPE_AD, [
1403                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1404             ]),
1405         ], '!({a})', True),
1406         ([
1407             (claims.CLAIMS_SOURCE_TYPE_AD, [
1408                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1409             ]),
1410         ], '!(!(!(!({a}))))', False),
1411         ([
1412             (claims.CLAIMS_SOURCE_TYPE_AD, [
1413                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1414             ]),
1415         ], '!({a} && {a})', True),
1416         ([
1417             (claims.CLAIMS_SOURCE_TYPE_AD, [
1418                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1419                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1420             ]),
1421         ], '{a} && !({b} || {b})', True),
1422         ([
1423             (claims.CLAIMS_SOURCE_TYPE_AD, [
1424                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1425             ]),
1426         ], '!({a}) || !({a})', True),
1427         ([
1428             (claims.CLAIMS_SOURCE_TYPE_AD, [
1429                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1430                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1431             ]),
1432         ], '{a} && !({b})', None),
1433         # Expressions containing the ‘not’ operator are occasionally evaluated
1434         # inconsistently, as evidenced here. ‘a || !a’ evaluates to ‘unknown’…
1435         ([
1436             (claims.CLAIMS_SOURCE_TYPE_AD, [
1437                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1438             ]),
1439         ], '{a} || !({a})', None),
1440         # …but ‘!a || a’ — the same expression, just with the operands switched
1441         # round — evaluates to ‘true’.
1442         ([
1443             (claims.CLAIMS_SOURCE_TYPE_AD, [
1444                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1445             ]),
1446         ], '!({a}) || {a}', True),
1447         # This inconsistency is not observed with other boolean expressions,
1448         # such as ‘a || a’.
1449         ([
1450             (claims.CLAIMS_SOURCE_TYPE_AD, [
1451                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1452             ]),
1453         ], '{a} || ({a} || {a})', True),
1454         ([
1455             (claims.CLAIMS_SOURCE_TYPE_AD, [
1456                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1457             ]),
1458         ], '({b} || {b}) || {b}', True),
1459         # Test a very large claim. Much larger than this, and
1460         # conditional_ace_encode_binary() will refuse to encode the conditions.
1461         ([
1462             (claims.CLAIMS_SOURCE_TYPE_AD, [
1463                 ('{large_claim}', claims.CLAIM_TYPE_STRING, ['z' * 4900]),
1464             ]),
1465         ], f'{{large_claim}} == "{"z" * 4900}"', True),
1466         # Test an even larger claim. Windows does not appear to like receiving
1467         # a claim this large.
1468         ([
1469             (claims.CLAIMS_SOURCE_TYPE_AD, [
1470                 ('{larger_claim}', claims.CLAIM_TYPE_STRING, ['z' * 100000]),
1471             ]),
1472         ], '{larger_claim} > "z"', CRASHES_WINDOWS),
1473         # Test a great number of claims. Windows does not appear to like
1474         # receiving this many claims.
1475         ([
1476             (claims.CLAIMS_SOURCE_TYPE_AD, [
1477                 ('{many_claims}', claims.CLAIM_TYPE_UINT64,
1478                  list(range(0, 100000))),
1479             ]),
1480         ], '{many_claims} Any_of "99999"', CRASHES_WINDOWS),
1481         # Test a claim with a very long name. Much larger than this, and
1482         # conditional_ace_encode_binary() will refuse to encode the conditions.
1483         ([
1484             (claims.CLAIMS_SOURCE_TYPE_AD, [
1485                 ('{long_name}', claims.CLAIM_TYPE_STRING, ['a']),
1486             ]),
1487         ], '{long_name} == "a"', {'long_name': 'z' * 4900}, True),
1488         # Test attribute name escaping.
1489         ([
1490             (claims.CLAIMS_SOURCE_TYPE_AD, [
1491                 ('{escaped_claim}', claims.CLAIM_TYPE_STRING, ['claim value']),
1492             ]),
1493         ], '{escaped_claim} == "claim value"',
1494            {'escaped_claim': '(:foo:! /&/ :bar:!)'}, True),
1495         # Test a claim whose name consists entirely of dots.
1496         ([
1497             (claims.CLAIMS_SOURCE_TYPE_AD, [
1498                 ('{dotty_claim}', claims.CLAIM_TYPE_STRING, ['a']),
1499             ]),
1500         ], '{dotty_claim} == "a"', {'dotty_claim': '...'}, True),
1501         # Test a claim whose name consists of the first thousand non‐zero
1502         # Unicode codepoints.
1503         ([
1504             (claims.CLAIMS_SOURCE_TYPE_AD, [
1505                 ('{1000_unicode}', claims.CLAIM_TYPE_STRING, ['a']),
1506             ]),
1507         ], '{1000_unicode} == "a"',
1508            {'1000_unicode': ''.join(map(chr, range(1, 1001)))}, True),
1509         # Test a claim whose name consists of some higher Unicode codepoints,
1510         # including non‐BMP ones.
1511         ([
1512             (claims.CLAIMS_SOURCE_TYPE_AD, [
1513                 ('{higher_unicode}', claims.CLAIM_TYPE_STRING, ['a']),
1514             ]),
1515         ], '{higher_unicode} == "a"',
1516            {'higher_unicode': ''.join(map(chr, range(0xfe00, 0x10800)))}, True),
1517         # Duplicate claim values are not allowed…
1518         ([
1519             (claims.CLAIMS_SOURCE_TYPE_AD, [
1520                 ('{a}', claims.CLAIM_TYPE_INT64, [42, 42, 42]),
1521             ]),
1522         ], '{a} == {a}', KDC_ERR_GENERIC),
1523         ([
1524             (claims.CLAIMS_SOURCE_TYPE_AD, [
1525                 ('{a}', claims.CLAIM_TYPE_UINT64, [42, 42]),
1526             ]),
1527         ], '{a} == {a}', KDC_ERR_GENERIC),
1528         ([
1529             (claims.CLAIMS_SOURCE_TYPE_AD, [
1530                 ('{a}', claims.CLAIM_TYPE_STRING, ['foo', 'foo']),
1531             ]),
1532         ], '{a} == {a}', KDC_ERR_GENERIC),
1533         ([
1534             (claims.CLAIMS_SOURCE_TYPE_AD, [
1535                 ('{a}', claims.CLAIM_TYPE_STRING, ['FOO', 'foo']),
1536             ]),
1537         ], '{a} == {a}', KDC_ERR_GENERIC),
1538         ([
1539             (claims.CLAIMS_SOURCE_TYPE_AD, [
1540                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0, 0]),
1541             ]),
1542         ], '{a} == {a}', KDC_ERR_GENERIC),
1543         # …but it’s OK if duplicate values are spread across multiple claim
1544         # entries.
1545         ([
1546             (claims.CLAIMS_SOURCE_TYPE_AD, [
1547                 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo']),
1548                 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo']),
1549             ]),
1550             (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
1551                 ('{dup}', claims.CLAIM_TYPE_UINT64, [42]),
1552                 ('{dup}', claims.CLAIM_TYPE_UINT64, [42]),
1553             ]),
1554             (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
1555                 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo']),
1556                 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo']),
1557                 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo', 'bar']),
1558                 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo', 'bar']),
1559             ]),
1560         ], '{dup} == {dup}', True),
1561         # Test invalid claim types. Be careful! Windows will *crash* if you
1562         # send it invalid claim types. I doubt Microsoft will consider this a
1563         # security issue though.
1564         ([
1565             (claims.CLAIMS_SOURCE_TYPE_AD, [
1566                 ('{invalid_sid}', 5, []),
1567             ]),
1568         ], '{invalid_sid} == {invalid_sid}', CRASHES_WINDOWS),
1569         ([
1570             (claims.CLAIMS_SOURCE_TYPE_AD, [
1571                 ('{invalid_octet_string}', 16, []),
1572             ]),
1573         ], '{invalid_octet_string} == {invalid_octet_string}', CRASHES_WINDOWS),
1574         # Sending an empty string will crash Windows.
1575         ([
1576             (claims.CLAIMS_SOURCE_TYPE_AD, [
1577                 ('{empty_string}', claims.CLAIM_TYPE_STRING, ['']),
1578             ]),
1579         ], '{empty_string}', CRASHES_WINDOWS),
1580         # But sending empty arrays is OK.
1581         ([
1582             (claims.CLAIMS_SOURCE_TYPE_AD, [
1583                 ('{empty_array}', claims.CLAIM_TYPE_INT64, []),
1584                 ('{empty_array}', claims.CLAIM_TYPE_UINT64, []),
1585                 ('{empty_array}', claims.CLAIM_TYPE_BOOLEAN, []),
1586                 ('{empty_array}', claims.CLAIM_TYPE_STRING, []),
1587             ]),
1588         ], '{empty_array}', None),
1589     ]
1590
1591     def _test_pac_claim_cmp_with_args(self,
1592                                       pac_claims,
1593                                       expression,
1594                                       claim_map,
1595                                       outcome):
1596         self.assertIsInstance(expression, str)
1597
1598         if outcome is CRASHES_WINDOWS and not self.crash_windows:
1599             self.skipTest('test crashes Windows servers')
1600
1601         if claim_map is None:
1602             claim_map = {}
1603
1604         claim_ids = {}
1605
1606         def get_claim_id(claim_name):
1607             claim = claim_ids.get(claim_name)
1608             if claim is None:
1609                 claim = claim_map.pop(claim_name, None)
1610                 if claim is None:
1611                     claim = self.get_new_username()
1612
1613                 claim_ids[claim_name] = claim
1614
1615             return claim
1616
1617         def formatted_claim_expression(expr):
1618             formatter = Formatter()
1619             result = []
1620
1621             for literal_text, field_name, format_spec, conversion in (
1622                     formatter.parse(expr)):
1623                 self.assertFalse(format_spec,
1624                                  f'format specifier ({format_spec}) should '
1625                                  f'not be specified')
1626                 self.assertFalse(conversion,
1627                                  f'conversion ({conversion}) should not be '
1628                                  'specified')
1629
1630                 result.append(literal_text)
1631
1632                 if field_name is not None:
1633                     self.assertTrue(field_name,
1634                                     'a field name should be specified')
1635
1636                     claim_id = get_claim_id(field_name)
1637                     claim_id = self.escaped_claim_id(claim_id)
1638                     result.append(f'@User.{claim_id}')
1639
1640             return ''.join(result)
1641
1642         # Construct the conditional ACE expression.
1643         expression = formatted_claim_expression(expression)
1644
1645         self.assertFalse(claim_map, 'unused claim mapping(s) remain')
1646
1647         # Create an authentication policy that will allow authentication when
1648         # the expression is true, and a second that will deny authentication in
1649         # the same circumstance. By observing the results of authenticating
1650         # against each of these policies in turn, we can determine whether the
1651         # expression evaluates to a True, False, or Unknown value.
1652
1653         allowed_sddl = f'O:SYD:(XA;;CR;;;WD;({expression}))'
1654         denied_sddl = f'O:SYD:(XD;;CR;;;WD;({expression}))(A;;CR;;;WD)'
1655
1656         allowed_policy = self.create_authn_policy(
1657             enforced=True,
1658             user_allowed_from=allowed_sddl)
1659         denied_policy = self.create_authn_policy(
1660             enforced=True,
1661             user_allowed_from=denied_sddl)
1662
1663         # Create a user account assigned to each policy.
1664         allowed_creds = self._get_creds(account_type=self.AccountType.USER,
1665                                         assigned_policy=allowed_policy)
1666         denied_creds = self._get_creds(account_type=self.AccountType.USER,
1667                                        assigned_policy=denied_policy)
1668
1669         # Create a computer account.
1670         mach_creds = self.get_cached_creds(
1671             account_type=self.AccountType.COMPUTER)
1672
1673         def expected_values(val):
1674             if isinstance(val, (str, bytes)):
1675                 return val,
1676
1677             return val
1678
1679         # Fetch the computer account’s TGT.
1680         armor_tgt = self.get_tgt(mach_creds)
1681
1682         if pac_claims:
1683             # Replace the claims in the PAC with our own.
1684             armor_tgt = self.modified_ticket(
1685                 armor_tgt,
1686                 modify_pac_fn=partial(self.set_pac_claims,
1687                                       client_claims=pac_claims,
1688                                       claim_ids=claim_ids),
1689                 checksum_keys=self.get_krbtgt_checksum_key())
1690
1691         # The first or the second authentication request is expected to succeed
1692         # if the outcome is True or False, respectively. An Unknown outcome,
1693         # represented by None, will result in a policy error in either case.
1694         if outcome is True:
1695             allowed_error, denied_error = 0, KDC_ERR_POLICY
1696         elif outcome is False:
1697             allowed_error, denied_error = KDC_ERR_POLICY, 0
1698         elif outcome is None:
1699             allowed_error, denied_error = KDC_ERR_POLICY, KDC_ERR_POLICY
1700         else:
1701             allowed_error, denied_error = outcome, outcome
1702
1703         # Attempt to authenticate and ensure that we observe the expected
1704         # results.
1705         self._get_tgt(allowed_creds, armor_tgt=armor_tgt,
1706                       expected_error=allowed_error)
1707         self._get_tgt(denied_creds, armor_tgt=armor_tgt,
1708                       expected_error=denied_error)
1709
1710     def test_rbcd_without_aa_asserted_identity(self):
1711         service_sids = {
1712             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1713             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1714         }
1715
1716         self._rbcd(f'Member_of SID({self.aa_asserted_identity})',
1717                    service_sids=service_sids,
1718                    code=KDC_ERR_BADOPTION,
1719                    status=ntstatus.NT_STATUS_UNSUCCESSFUL,
1720                    edata=self.expect_padata_outer)
1721
1722         self._rbcd(target_policy=f'Member_of SID({self.aa_asserted_identity})',
1723                    service_sids=service_sids,
1724                    code=KDC_ERR_POLICY,
1725                    status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
1726                    event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
1727                    reason=AuditReason.ACCESS_DENIED,
1728                    edata=self.expect_padata_outer)
1729
1730     def test_rbcd_with_aa_asserted_identity(self):
1731         service_sids = {
1732             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1733             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1734             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1735         }
1736
1737         expected_groups = service_sids | {
1738             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
1739         }
1740
1741         self._rbcd(f'Member_of SID({self.aa_asserted_identity})',
1742                    service_sids=service_sids,
1743                    expected_groups=expected_groups)
1744
1745         self._rbcd(target_policy=f'Member_of SID({self.aa_asserted_identity})',
1746                    service_sids=service_sids,
1747                    expected_groups=expected_groups)
1748
1749     def test_rbcd_without_service_asserted_identity(self):
1750         service_sids = {
1751             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1752             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1753         }
1754
1755         self._rbcd(f'Member_of SID({self.service_asserted_identity})',
1756                    service_sids=service_sids,
1757                    code=KDC_ERR_BADOPTION,
1758                    status=ntstatus.NT_STATUS_UNSUCCESSFUL,
1759                    edata=self.expect_padata_outer)
1760
1761         self._rbcd(target_policy=f'Member_of SID({self.service_asserted_identity})',
1762                    service_sids=service_sids,
1763                    code=KDC_ERR_POLICY,
1764                    status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
1765                    event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
1766                    reason=AuditReason.ACCESS_DENIED,
1767                    edata=self.expect_padata_outer)
1768
1769     def test_rbcd_with_service_asserted_identity(self):
1770         service_sids = {
1771             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1772             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1773             (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1774         }
1775
1776         expected_groups = {
1777             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1778             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1779             # The Application Authority Asserted Identity SID has replaced the
1780             # Service Asserted Identity SID.
1781             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1782             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
1783         }
1784
1785         self._rbcd(f'Member_of SID({self.service_asserted_identity})',
1786                    service_sids=service_sids,
1787                    expected_groups=expected_groups)
1788
1789         self._rbcd(target_policy=f'Member_of SID({self.service_asserted_identity})',
1790                    service_sids=service_sids,
1791                    expected_groups=expected_groups)
1792
1793     def test_rbcd_without_claims_valid(self):
1794         service_sids = {
1795             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1796             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1797         }
1798
1799         self._rbcd(f'Member_of SID({security.SID_CLAIMS_VALID})',
1800                    service_sids=service_sids,
1801                    code=KDC_ERR_BADOPTION,
1802                    status=ntstatus.NT_STATUS_UNSUCCESSFUL,
1803                    edata=self.expect_padata_outer)
1804
1805         self._rbcd(target_policy=f'Member_of SID({security.SID_CLAIMS_VALID})',
1806                    service_sids=service_sids,
1807                    code=KDC_ERR_POLICY,
1808                    status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
1809                    event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
1810                    reason=AuditReason.ACCESS_DENIED,
1811                    edata=self.expect_padata_outer)
1812
1813     def test_rbcd_with_claims_valid(self):
1814         service_sids = {
1815             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1816             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1817             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
1818         }
1819
1820         expected_groups = service_sids | {
1821             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1822         }
1823
1824         self._rbcd(f'Member_of SID({security.SID_CLAIMS_VALID})',
1825                    service_sids=service_sids,
1826                    expected_groups=expected_groups)
1827
1828         self._rbcd(target_policy=f'Member_of SID({security.SID_CLAIMS_VALID})',
1829                    service_sids=service_sids,
1830                    expected_groups=expected_groups)
1831
1832     def test_rbcd_without_compounded_authentication(self):
1833         service_sids = {
1834             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1835             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1836         }
1837
1838         self._rbcd(f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1839                    service_sids=service_sids,
1840                    code=KDC_ERR_BADOPTION,
1841                    status=ntstatus.NT_STATUS_UNSUCCESSFUL,
1842                    edata=self.expect_padata_outer)
1843
1844         self._rbcd(target_policy=f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1845                    service_sids=service_sids,
1846                    code=KDC_ERR_POLICY,
1847                    status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
1848                    event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
1849                    reason=AuditReason.ACCESS_DENIED,
1850                    edata=self.expect_padata_outer)
1851
1852     def test_rbcd_with_compounded_authentication(self):
1853         service_sids = {
1854             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1855             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1856             (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
1857         }
1858
1859         expected_groups = {
1860             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1861             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1862             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1863             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
1864         }
1865
1866         self._rbcd(f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1867                    service_sids=service_sids,
1868                    expected_groups=expected_groups)
1869
1870         self._rbcd(target_policy=f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1871                    service_sids=service_sids,
1872                    expected_groups=expected_groups)
1873
1874     def test_rbcd_client_without_aa_asserted_identity(self):
1875         client_sids = {
1876             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1877             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1878         }
1879
1880         self._rbcd(f'Member_of SID({self.aa_asserted_identity})',
1881                    client_sids=client_sids)
1882
1883         self._rbcd(target_policy=f'Member_of SID({self.aa_asserted_identity})',
1884                    client_sids=client_sids)
1885
1886     def test_rbcd_client_with_aa_asserted_identity(self):
1887         client_sids = {
1888             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1889             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1890             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1891         }
1892
1893         self._rbcd(f'Member_of SID({self.aa_asserted_identity})',
1894                    client_sids=client_sids,
1895                    expected_groups=client_sids)
1896
1897         self._rbcd(target_policy=f'Member_of SID({self.aa_asserted_identity})',
1898                    client_sids=client_sids,
1899                    expected_groups=client_sids)
1900
1901     def test_rbcd_client_without_service_asserted_identity(self):
1902         client_sids = {
1903             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1904             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1905         }
1906
1907         self._rbcd(f'Member_of SID({self.service_asserted_identity})',
1908                    client_sids=client_sids,
1909                    code=KDC_ERR_BADOPTION,
1910                    status=ntstatus.NT_STATUS_UNSUCCESSFUL,
1911                    edata=self.expect_padata_outer)
1912
1913         self._rbcd(target_policy=f'Member_of SID({self.service_asserted_identity})',
1914                    client_sids=client_sids,
1915                    code=KDC_ERR_POLICY,
1916                    status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
1917                    event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
1918                    reason=AuditReason.ACCESS_DENIED,
1919                    edata=self.expect_padata_outer)
1920
1921     def test_rbcd_client_with_service_asserted_identity(self):
1922         client_sids = {
1923             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1924             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1925             (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1926         }
1927
1928         self._rbcd(f'Not_Member_of SID({self.service_asserted_identity})',
1929                    client_sids=client_sids,
1930                    expected_groups=client_sids)
1931
1932         self._rbcd(target_policy=f'Not_Member_of SID({self.service_asserted_identity})',
1933                    client_sids=client_sids,
1934                    expected_groups=client_sids)
1935
1936     def test_rbcd_client_without_claims_valid(self):
1937         client_sids = {
1938             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1939             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1940         }
1941
1942         self._rbcd(f'Member_of SID({security.SID_CLAIMS_VALID})',
1943                    client_sids=client_sids)
1944
1945         self._rbcd(target_policy=f'Member_of SID({security.SID_CLAIMS_VALID})',
1946                    client_sids=client_sids)
1947
1948     def test_rbcd_client_with_claims_valid(self):
1949         client_sids = {
1950             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1951             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1952             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
1953         }
1954
1955         self._rbcd(f'Member_of SID({security.SID_CLAIMS_VALID})',
1956                    client_sids=client_sids,
1957                    expected_groups=client_sids)
1958
1959         self._rbcd(target_policy=f'Member_of SID({security.SID_CLAIMS_VALID})',
1960                    client_sids=client_sids,
1961                    expected_groups=client_sids)
1962
1963     def test_rbcd_client_without_compounded_authentication(self):
1964         client_sids = {
1965             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1966             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1967         }
1968
1969         self._rbcd(f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1970                    client_sids=client_sids,
1971                    code=KDC_ERR_BADOPTION,
1972                    status=ntstatus.NT_STATUS_UNSUCCESSFUL,
1973                    edata=self.expect_padata_outer)
1974
1975         self._rbcd(target_policy=f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1976                    client_sids=client_sids,
1977                    code=KDC_ERR_POLICY,
1978                    status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
1979                    event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
1980                    reason=AuditReason.ACCESS_DENIED,
1981                    edata=self.expect_padata_outer)
1982
1983     def test_rbcd_client_with_compounded_authentication(self):
1984         client_sids = {
1985             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1986             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1987             (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
1988         }
1989
1990         self._rbcd(f'Not_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1991                    client_sids=client_sids,
1992                    expected_groups=client_sids)
1993
1994         self._rbcd(target_policy=f'Not_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1995                    client_sids=client_sids,
1996                    expected_groups=client_sids)
1997
1998     def test_rbcd_device_without_aa_asserted_identity(self):
1999         device_sids = {
2000             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2001             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2002         }
2003
2004         self._rbcd(f'Device_Member_of SID({self.aa_asserted_identity})',
2005                    device_sids=device_sids,
2006                    code=KDC_ERR_BADOPTION,
2007                    status=ntstatus.NT_STATUS_UNSUCCESSFUL,
2008                    edata=self.expect_padata_outer)
2009
2010         self._rbcd(target_policy=f'Device_Member_of SID({self.aa_asserted_identity})',
2011                    device_sids=device_sids,
2012                    code=KDC_ERR_POLICY,
2013                    status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2014                    event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2015                    reason=AuditReason.ACCESS_DENIED,
2016                    edata=self.expect_padata_outer)
2017
2018     def test_rbcd_device_without_aa_asserted_identity_not_memberof(self):
2019         device_sids = {
2020             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2021             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2022         }
2023
2024         self._rbcd(f'Not_Device_Member_of SID({self.aa_asserted_identity})',
2025                    device_sids=device_sids)
2026
2027         self._rbcd(target_policy=f'Not_Device_Member_of SID({self.aa_asserted_identity})',
2028                    device_sids=device_sids)
2029
2030     def test_rbcd_device_with_aa_asserted_identity(self):
2031         device_sids = {
2032             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2033             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2034             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2035         }
2036
2037         self._rbcd(f'Device_Member_of SID({self.aa_asserted_identity})',
2038                    device_sids=device_sids)
2039
2040         self._rbcd(target_policy=f'Device_Member_of SID({self.aa_asserted_identity})',
2041                    device_sids=device_sids)
2042
2043     def test_rbcd_device_without_service_asserted_identity(self):
2044         device_sids = {
2045             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2046             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2047         }
2048
2049         self._rbcd(f'Device_Member_of SID({self.service_asserted_identity})',
2050                    device_sids=device_sids,
2051                    code=KDC_ERR_BADOPTION,
2052                    status=ntstatus.NT_STATUS_UNSUCCESSFUL,
2053                    edata=self.expect_padata_outer)
2054
2055         self._rbcd(target_policy=f'Device_Member_of SID({self.service_asserted_identity})',
2056                    device_sids=device_sids,
2057                    code=KDC_ERR_POLICY,
2058                    status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2059                    event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2060                    reason=AuditReason.ACCESS_DENIED,
2061                    edata=self.expect_padata_outer)
2062
2063     def test_rbcd_device_with_service_asserted_identity(self):
2064         device_sids = {
2065             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2066             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2067             (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2068         }
2069
2070         self._rbcd(f'Device_Member_of SID({self.service_asserted_identity})',
2071                    device_sids=device_sids)
2072
2073         self._rbcd(target_policy=f'Device_Member_of SID({self.service_asserted_identity})',
2074                    device_sids=device_sids)
2075
2076     def test_rbcd_device_without_claims_valid(self):
2077         device_sids = {
2078             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2079             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2080         }
2081
2082         self._rbcd(f'Device_Member_of SID({security.SID_CLAIMS_VALID})',
2083                    device_sids=device_sids,
2084                    code=KDC_ERR_BADOPTION,
2085                    status=ntstatus.NT_STATUS_UNSUCCESSFUL,
2086                    edata=self.expect_padata_outer)
2087
2088         self._rbcd(target_policy=f'Device_Member_of SID({security.SID_CLAIMS_VALID})',
2089                    device_sids=device_sids,
2090                    code=KDC_ERR_POLICY,
2091                    status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2092                    event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2093                    reason=AuditReason.ACCESS_DENIED,
2094                    edata=self.expect_padata_outer)
2095
2096     def test_rbcd_device_with_claims_valid(self):
2097         device_sids = {
2098             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2099             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2100             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2101         }
2102
2103         self._rbcd(f'Device_Member_of SID({security.SID_CLAIMS_VALID})',
2104                    device_sids=device_sids)
2105
2106         self._rbcd(target_policy=f'Device_Member_of SID({security.SID_CLAIMS_VALID})',
2107                    device_sids=device_sids)
2108
2109     def test_rbcd_device_without_compounded_auth(self):
2110         device_sids = {
2111             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2112             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2113         }
2114
2115         self._rbcd(f'Device_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
2116                    device_sids=device_sids,
2117                    code=KDC_ERR_BADOPTION,
2118                    status=ntstatus.NT_STATUS_UNSUCCESSFUL,
2119                    edata=self.expect_padata_outer)
2120
2121         self._rbcd(target_policy=f'Device_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
2122                    device_sids=device_sids,
2123                    code=KDC_ERR_POLICY,
2124                    status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2125                    event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2126                    reason=AuditReason.ACCESS_DENIED,
2127                    edata=self.expect_padata_outer)
2128
2129     def test_rbcd_device_with_compounded_auth(self):
2130         device_sids = {
2131             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2132             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2133             (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
2134         }
2135
2136         self._rbcd(f'Device_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
2137                    device_sids=device_sids)
2138
2139         self._rbcd(target_policy=f'Device_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
2140                    device_sids=device_sids)
2141
2142     def test_rbcd(self):
2143         self._rbcd('Member_of SID({service_sid})')
2144
2145     def test_rbcd_device_from_rodc(self):
2146         self._rbcd('Member_of SID({service_sid})',
2147                    device_from_rodc=True,
2148                    code=CRASHES_WINDOWS)
2149
2150     def test_rbcd_service_from_rodc(self):
2151         self._rbcd('Member_of SID({service_sid})',
2152                    service_from_rodc=True,
2153                    code=KDC_ERR_BADOPTION,
2154                    edata=self.expect_padata_outer)
2155
2156     def test_rbcd_device_and_service_from_rodc(self):
2157         self._rbcd('Member_of SID({service_sid})',
2158                    service_from_rodc=True,
2159                    device_from_rodc=True,
2160                    code=CRASHES_WINDOWS)
2161
2162     def test_rbcd_client_from_rodc(self):
2163         self._rbcd('Member_of SID({service_sid})',
2164                    client_from_rodc=True,
2165                    code=KDC_ERR_MODIFIED,
2166                    edata=self.expect_padata_outer)
2167
2168     def test_rbcd_client_and_device_from_rodc(self):
2169         self._rbcd('Member_of SID({service_sid})',
2170                    client_from_rodc=True,
2171                    device_from_rodc=True,
2172                    code=CRASHES_WINDOWS)
2173
2174     def test_rbcd_client_and_service_from_rodc(self):
2175         self._rbcd('Member_of SID({service_sid})',
2176                    client_from_rodc=True,
2177                    service_from_rodc=True,
2178                    code=KDC_ERR_BADOPTION,
2179                    edata=self.expect_padata_outer)
2180
2181     def test_rbcd_all_from_rodc(self):
2182         self._rbcd('Member_of SID({service_sid})',
2183                    client_from_rodc=True,
2184                    service_from_rodc=True,
2185                    device_from_rodc=True,
2186                    code=CRASHES_WINDOWS)
2187
2188     def _rbcd(self,
2189               rbcd_expression=None,
2190               *,
2191               code=0,
2192               status=None,
2193               event=AuditEvent.OK,
2194               reason=AuditReason.NONE,
2195               edata=False,
2196               target_policy=None,
2197               client_from_rodc=False,
2198               service_from_rodc=False,
2199               device_from_rodc=False,
2200               client_sids=None,
2201               client_claims=None,
2202               service_sids=None,
2203               service_claims=None,
2204               device_sids=None,
2205               device_claims=None,
2206               expected_groups=None,
2207               expected_device_groups=None,
2208               expected_claims=None):
2209         if code is CRASHES_WINDOWS and not self.crash_windows:
2210             self.skipTest('test crashes Windows servers')
2211
2212         samdb = self.get_samdb()
2213         functional_level = self.get_domain_functional_level(samdb)
2214
2215         if functional_level < dsdb.DS_DOMAIN_FUNCTION_2008:
2216             self.skipTest('RBCD requires FL2008')
2217
2218         domain_sid_str = samdb.get_domain_sid()
2219         domain_sid = security.dom_sid(domain_sid_str)
2220
2221         client_creds = self.get_cached_creds(
2222             account_type=self.AccountType.USER,
2223             opts={
2224                 'allowed_replication_mock': client_from_rodc,
2225                 'revealed_to_mock_rodc': client_from_rodc,
2226             })
2227         client_sid = client_creds.get_sid()
2228
2229         client_username = client_creds.get_username()
2230         client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
2231                                                  names=[client_username])
2232
2233         client_tkt_options = 'forwardable'
2234         expected_flags = krb5_asn1.TicketFlags(client_tkt_options)
2235
2236         checksum_key = self.get_krbtgt_checksum_key()
2237
2238         if client_from_rodc or service_from_rodc or device_from_rodc:
2239             rodc_krbtgt_creds = self.get_mock_rodc_krbtgt_creds()
2240             rodc_krbtgt_key = self.TicketDecryptionKey_from_creds(rodc_krbtgt_creds)
2241             rodc_checksum_key = {
2242                 krb5pac.PAC_TYPE_KDC_CHECKSUM: rodc_krbtgt_key,
2243             }
2244
2245         client_tgt = self.get_tgt(client_creds,
2246                                   kdc_options=client_tkt_options,
2247                                   expected_flags=expected_flags)
2248
2249         # Create a machine account with which to perform FAST.
2250         mach_creds = self.get_cached_creds(
2251             account_type=self.AccountType.COMPUTER,
2252             opts={
2253                 'allowed_replication_mock': device_from_rodc,
2254                 'revealed_to_mock_rodc': device_from_rodc,
2255             })
2256         mach_tgt = self.get_tgt(mach_creds)
2257         device_modify_pac_fn = []
2258         if device_sids is not None:
2259             device_modify_pac_fn.append(partial(self.set_pac_sids,
2260                                                 new_sids=device_sids))
2261         if device_claims is not None:
2262             device_modify_pac_fn.append(partial(self.set_pac_claims,
2263                                                 client_claims=device_claims))
2264         mach_tgt = self.modified_ticket(
2265             mach_tgt,
2266             modify_pac_fn=device_modify_pac_fn,
2267             new_ticket_key=rodc_krbtgt_key if device_from_rodc else None,
2268             checksum_keys=rodc_checksum_key if device_from_rodc else checksum_key)
2269
2270         service_creds = self.get_cached_creds(
2271             account_type=self.AccountType.COMPUTER,
2272             opts={
2273                 'id': 1,
2274                 'allowed_replication_mock': service_from_rodc,
2275                 'revealed_to_mock_rodc': service_from_rodc,
2276             })
2277         service_tgt = self.get_tgt(service_creds)
2278
2279         service_modify_pac_fn = []
2280         if service_sids is not None:
2281             service_modify_pac_fn.append(partial(self.set_pac_sids,
2282                                                  new_sids=service_sids))
2283         if service_claims is not None:
2284             service_modify_pac_fn.append(partial(self.set_pac_claims,
2285                                                  client_claims=service_claims))
2286         service_tgt = self.modified_ticket(
2287             service_tgt,
2288             modify_pac_fn=service_modify_pac_fn,
2289             new_ticket_key=rodc_krbtgt_key if service_from_rodc else None,
2290             checksum_keys=rodc_checksum_key if service_from_rodc else checksum_key)
2291
2292         if target_policy is None:
2293             policy = None
2294             assigned_policy = None
2295         else:
2296             sddl = f'O:SYD:(XA;;CR;;;WD;({target_policy.format(service_sid=service_creds.get_sid())}))'
2297             policy = self.create_authn_policy(enforced=True,
2298                                               computer_allowed_to=sddl)
2299             assigned_policy = str(policy.dn)
2300
2301         if rbcd_expression is not None:
2302             sddl = f'O:SYD:(XA;;CR;;;WD;({rbcd_expression.format(service_sid=service_creds.get_sid())}))'
2303         else:
2304             sddl = 'O:SYD:(A;;CR;;;WD)'
2305         descriptor = security.descriptor.from_sddl(sddl, domain_sid)
2306         descriptor = ndr_pack(descriptor)
2307
2308         # Create a target account with the assigned policy.
2309         target_creds = self.get_cached_creds(
2310             account_type=self.AccountType.COMPUTER,
2311             opts={
2312                 'assigned_policy': assigned_policy,
2313                 'additional_details': (
2314                     ('msDS-AllowedToActOnBehalfOfOtherIdentity', descriptor),
2315                 ),
2316             })
2317
2318         client_service_tkt = self.get_service_ticket(
2319             client_tgt,
2320             service_creds,
2321             kdc_options=client_tkt_options,
2322             expected_flags=expected_flags)
2323         client_modify_pac_fn = []
2324         if client_sids is not None:
2325             client_modify_pac_fn.append(partial(self.set_pac_sids,
2326                                                 new_sids=client_sids))
2327         if client_claims is not None:
2328             client_modify_pac_fn.append(partial(self.set_pac_claims,
2329                                                 client_claims=client_claims))
2330         client_service_tkt = self.modified_ticket(client_service_tkt,
2331                                                   modify_pac_fn=client_modify_pac_fn,
2332                                                   checksum_keys=rodc_checksum_key if client_from_rodc else checksum_key)
2333
2334         kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt'))
2335
2336         target_decryption_key = self.TicketDecryptionKey_from_creds(
2337             target_creds)
2338         target_etypes = target_creds.tgs_supported_enctypes
2339
2340         service_name = service_creds.get_username()
2341         if service_name[-1] == '$':
2342             service_name = service_name[:-1]
2343         expected_transited_services = [
2344             f'host/{service_name}@{service_creds.get_realm()}'
2345         ]
2346
2347         expected_groups = self.map_sids(expected_groups, None, domain_sid_str)
2348         expected_device_groups = self.map_sids(expected_device_groups, None, domain_sid_str)
2349
2350         # Show that obtaining a service ticket with RBCD is allowed.
2351         self._tgs_req(service_tgt, code, service_creds, target_creds,
2352                       armor_tgt=mach_tgt,
2353                       kdc_options=kdc_options,
2354                       pac_options='1001',  # supports claims, RBCD
2355                       expected_cname=client_cname,
2356                       expected_account_name=client_username,
2357                       additional_ticket=client_service_tkt,
2358                       decryption_key=target_decryption_key,
2359                       expected_sid=client_sid,
2360                       expected_groups=expected_groups,
2361                       expect_device_info=bool(expected_device_groups) or None,
2362                       expected_device_domain_sid=domain_sid_str,
2363                       expected_device_groups=expected_device_groups,
2364                       expect_client_claims=bool(expected_claims) or None,
2365                       expected_client_claims=expected_claims,
2366                       expected_supported_etypes=target_etypes,
2367                       expected_proxy_target=target_creds.get_spn(),
2368                       expected_transited_services=expected_transited_services,
2369                       expected_status=status,
2370                       expect_edata=edata)
2371
2372         if code:
2373             effective_client_creds = service_creds
2374         else:
2375             effective_client_creds = client_creds
2376
2377         self.check_tgs_log(effective_client_creds, target_creds,
2378                            policy=policy,
2379                            checked_creds=service_creds,
2380                            status=status,
2381                            event=event,
2382                            reason=reason)
2383
2384     def test_tgs_without_aa_asserted_identity(self):
2385         client_sids = {
2386             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2387             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2388         }
2389
2390         self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2391                   client_sids=client_sids,
2392                   expected_groups=client_sids,
2393                   code=KDC_ERR_POLICY,
2394                   status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2395                   event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2396                   reason=AuditReason.ACCESS_DENIED,
2397                   edata=self.expect_padata_outer)
2398
2399     def test_tgs_without_aa_asserted_identity_client_from_rodc(self):
2400         client_sids = {
2401             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2402             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2403         }
2404
2405         self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2406                   client_from_rodc=True,
2407                   client_sids=client_sids,
2408                   expected_groups=client_sids,
2409                   code=KDC_ERR_POLICY,
2410                   edata=self.expect_padata_outer)
2411
2412     def test_tgs_without_aa_asserted_identity_device_from_rodc(self):
2413         client_sids = {
2414             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2415             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2416         }
2417
2418         self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2419                   device_from_rodc=True,
2420                   client_sids=client_sids,
2421                   expected_groups=client_sids,
2422                   code=CRASHES_WINDOWS)
2423
2424     def test_tgs_without_aa_asserted_identity_both_from_rodc(self):
2425         client_sids = {
2426             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2427             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2428         }
2429
2430         self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2431                   client_from_rodc=True,
2432                   device_from_rodc=True,
2433                   client_sids=client_sids,
2434                   expected_groups=client_sids,
2435                   code=CRASHES_WINDOWS)
2436
2437     def test_tgs_with_aa_asserted_identity(self):
2438         client_sids = {
2439             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2440             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2441             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2442         }
2443
2444         self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2445                   client_sids=client_sids,
2446                   expected_groups=client_sids)
2447
2448     def test_tgs_with_aa_asserted_identity_client_from_rodc(self):
2449         client_sids = {
2450             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2451             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2452             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2453         }
2454
2455         self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2456                   client_from_rodc=True,
2457                   client_sids=client_sids,
2458                   expected_groups=client_sids,
2459                   code=KDC_ERR_POLICY,
2460                   edata=self.expect_padata_outer)
2461
2462     def test_tgs_with_aa_asserted_identity_device_from_rodc(self):
2463         client_sids = {
2464             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2465             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2466             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2467         }
2468
2469         self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2470                   device_from_rodc=True,
2471                   client_sids=client_sids,
2472                   expected_groups=client_sids,
2473                   code=CRASHES_WINDOWS)
2474
2475     def test_tgs_with_aa_asserted_identity_both_from_rodc(self):
2476         client_sids = {
2477             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2478             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2479             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2480         }
2481
2482         self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2483                   client_from_rodc=True,
2484                   device_from_rodc=True,
2485                   client_sids=client_sids,
2486                   expected_groups=client_sids,
2487                   code=CRASHES_WINDOWS)
2488
2489     def test_tgs_without_service_asserted_identity(self):
2490         client_sids = {
2491             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2492             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2493         }
2494
2495         self._tgs(f'Member_of SID({self.service_asserted_identity})',
2496                   client_sids=client_sids,
2497                   expected_groups=client_sids,
2498                   code=KDC_ERR_POLICY,
2499                   status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2500                   event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2501                   reason=AuditReason.ACCESS_DENIED,
2502                   edata=self.expect_padata_outer)
2503
2504     def test_tgs_without_service_asserted_identity_client_from_rodc(self):
2505         client_sids = {
2506             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2507             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2508         }
2509
2510         self._tgs(f'Member_of SID({self.service_asserted_identity})',
2511                   client_from_rodc=True,
2512                   client_sids=client_sids,
2513                   expected_groups=client_sids,
2514                   code=KDC_ERR_POLICY,
2515                   edata=self.expect_padata_outer)
2516
2517     def test_tgs_without_service_asserted_identity_device_from_rodc(self):
2518         client_sids = {
2519             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2520             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2521         }
2522
2523         self._tgs(f'Member_of SID({self.service_asserted_identity})',
2524                   device_from_rodc=True,
2525                   client_sids=client_sids,
2526                   expected_groups=client_sids,
2527                   code=CRASHES_WINDOWS)
2528
2529     def test_tgs_without_service_asserted_identity_both_from_rodc(self):
2530         client_sids = {
2531             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2532             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2533         }
2534
2535         self._tgs(f'Member_of SID({self.service_asserted_identity})',
2536                   client_from_rodc=True,
2537                   device_from_rodc=True,
2538                   client_sids=client_sids,
2539                   expected_groups=client_sids,
2540                   code=CRASHES_WINDOWS)
2541
2542     def test_tgs_with_service_asserted_identity(self):
2543         client_sids = {
2544             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2545             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2546             (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2547         }
2548
2549         self._tgs(f'Member_of SID({self.service_asserted_identity})',
2550                   client_sids=client_sids,
2551                   expected_groups=client_sids)
2552
2553     def test_tgs_with_service_asserted_identity_client_from_rodc(self):
2554         client_sids = {
2555             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2556             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2557             (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2558         }
2559
2560         self._tgs(f'Member_of SID({self.service_asserted_identity})',
2561                   client_from_rodc=True,
2562                   client_sids=client_sids,
2563                   expected_groups=client_sids,
2564                   code=KDC_ERR_POLICY,
2565                   edata=self.expect_padata_outer)
2566
2567     def test_tgs_with_service_asserted_identity_device_from_rodc(self):
2568         client_sids = {
2569             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2570             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2571             (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2572         }
2573
2574         self._tgs(f'Member_of SID({self.service_asserted_identity})',
2575                   device_from_rodc=True,
2576                   client_sids=client_sids,
2577                   expected_groups=client_sids,
2578                   code=CRASHES_WINDOWS)
2579
2580     def test_tgs_with_service_asserted_identity_both_from_rodc(self):
2581         client_sids = {
2582             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2583             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2584             (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2585         }
2586
2587         self._tgs(f'Member_of SID({self.service_asserted_identity})',
2588                   client_from_rodc=True,
2589                   device_from_rodc=True,
2590                   client_sids=client_sids,
2591                   expected_groups=client_sids,
2592                   code=CRASHES_WINDOWS)
2593
2594     def test_tgs_without_claims_valid(self):
2595         client_sids = {
2596             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2597             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2598         }
2599
2600         self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
2601                   client_sids=client_sids,
2602                   expected_groups=client_sids,
2603                   code=KDC_ERR_POLICY,
2604                   status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2605                   event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2606                   reason=AuditReason.ACCESS_DENIED,
2607                   edata=self.expect_padata_outer)
2608
2609     def test_tgs_without_claims_valid_client_from_rodc(self):
2610         client_sids = {
2611             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2612             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2613         }
2614
2615         self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
2616                   client_from_rodc=True,
2617                   client_sids=client_sids,
2618                   expected_groups=client_sids,
2619                   code=KDC_ERR_POLICY,
2620                   edata=self.expect_padata_outer)
2621
2622     def test_tgs_without_claims_valid_device_from_rodc(self):
2623         client_sids = {
2624             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2625             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2626         }
2627
2628         self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
2629                   device_from_rodc=True,
2630                   client_sids=client_sids,
2631                   expected_groups=client_sids,
2632                   code=CRASHES_WINDOWS)
2633
2634     def test_tgs_without_claims_valid_both_from_rodc(self):
2635         client_sids = {
2636             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2637             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2638         }
2639
2640         self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
2641                   client_from_rodc=True,
2642                   device_from_rodc=True,
2643                   client_sids=client_sids,
2644                   expected_groups=client_sids,
2645                   code=CRASHES_WINDOWS)
2646
2647     def test_tgs_with_claims_valid(self):
2648         client_sids = {
2649             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2650             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2651             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2652         }
2653
2654         self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
2655                   client_sids=client_sids,
2656                   expected_groups=client_sids)
2657
2658     def test_tgs_with_claims_valid_client_from_rodc(self):
2659         client_sids = {
2660             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2661             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2662             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2663         }
2664
2665         self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
2666                   client_from_rodc=True,
2667                   client_sids=client_sids,
2668                   expected_groups=client_sids,
2669                   code=KDC_ERR_POLICY,
2670                   edata=self.expect_padata_outer)
2671
2672     def test_tgs_with_claims_valid_device_from_rodc(self):
2673         client_sids = {
2674             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2675             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2676             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2677         }
2678
2679         self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
2680                   device_from_rodc=True,
2681                   client_sids=client_sids,
2682                   expected_groups=client_sids,
2683                   code=CRASHES_WINDOWS)
2684
2685     def test_tgs_with_claims_valid_both_from_rodc(self):
2686         client_sids = {
2687             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2688             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2689             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2690         }
2691
2692         self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
2693                   client_from_rodc=True,
2694                   device_from_rodc=True,
2695                   client_sids=client_sids,
2696                   expected_groups=client_sids,
2697                   code=CRASHES_WINDOWS)
2698
2699     def _tgs(self,
2700              target_policy=None,
2701              *,
2702              code=0,
2703              event=AuditEvent.OK,
2704              reason=AuditReason.NONE,
2705              status=None,
2706              edata=False,
2707              client_from_rodc=False,
2708              device_from_rodc=False,
2709              client_sids=None,
2710              client_claims=None,
2711              device_sids=None,
2712              device_claims=None,
2713              expected_groups=None,
2714              expected_device_groups=None,
2715              expected_claims=None):
2716         if code is CRASHES_WINDOWS and not self.crash_windows:
2717             self.skipTest('test crashes Windows servers')
2718
2719         samdb = self.get_samdb()
2720         functional_level = self.get_domain_functional_level(samdb)
2721
2722         if functional_level < dsdb.DS_DOMAIN_FUNCTION_2008:
2723             self.skipTest('RBCD requires FL2008')
2724
2725         domain_sid_str = samdb.get_domain_sid()
2726
2727         client_creds = self.get_cached_creds(
2728             account_type=self.AccountType.USER,
2729             opts={
2730                 'allowed_replication_mock': client_from_rodc,
2731                 'revealed_to_mock_rodc': client_from_rodc,
2732             })
2733         client_sid = client_creds.get_sid()
2734
2735         client_username = client_creds.get_username()
2736         client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
2737                                                  names=[client_username])
2738
2739         client_tkt_options = 'forwardable'
2740         expected_flags = krb5_asn1.TicketFlags(client_tkt_options)
2741
2742         checksum_key = self.get_krbtgt_checksum_key()
2743
2744         if client_from_rodc or device_from_rodc:
2745             rodc_krbtgt_creds = self.get_mock_rodc_krbtgt_creds()
2746             rodc_krbtgt_key = self.TicketDecryptionKey_from_creds(rodc_krbtgt_creds)
2747             rodc_checksum_key = {
2748                 krb5pac.PAC_TYPE_KDC_CHECKSUM: rodc_krbtgt_key,
2749             }
2750
2751         client_tgt = self.get_tgt(client_creds,
2752                                   kdc_options=client_tkt_options,
2753                                   expected_flags=expected_flags)
2754
2755         client_modify_pac_fn = []
2756         if client_sids is not None:
2757             client_modify_pac_fn.append(partial(self.set_pac_sids,
2758                                                 new_sids=client_sids))
2759         if client_claims is not None:
2760             client_modify_pac_fn.append(partial(self.set_pac_claims,
2761                                                 client_claims=client_claims))
2762         client_tgt = self.modified_ticket(
2763             client_tgt,
2764             modify_pac_fn=client_modify_pac_fn,
2765             new_ticket_key=rodc_krbtgt_key if client_from_rodc else None,
2766             checksum_keys=rodc_checksum_key if client_from_rodc else checksum_key)
2767
2768         # Create a machine account with which to perform FAST.
2769         mach_creds = self.get_cached_creds(
2770             account_type=self.AccountType.COMPUTER,
2771             opts={
2772                 'allowed_replication_mock': device_from_rodc,
2773                 'revealed_to_mock_rodc': device_from_rodc,
2774             })
2775         mach_tgt = self.get_tgt(mach_creds)
2776         device_modify_pac_fn = []
2777         if device_sids is not None:
2778             device_modify_pac_fn.append(partial(self.set_pac_sids,
2779                                                 new_sids=device_sids))
2780         if device_claims is not None:
2781             device_modify_pac_fn.append(partial(self.set_pac_claims,
2782                                                 client_claims=device_claims))
2783         mach_tgt = self.modified_ticket(
2784             mach_tgt,
2785             modify_pac_fn=device_modify_pac_fn,
2786             new_ticket_key=rodc_krbtgt_key if device_from_rodc else None,
2787             checksum_keys=rodc_checksum_key if device_from_rodc else checksum_key)
2788
2789         if target_policy is None:
2790             policy = None
2791             assigned_policy = None
2792         else:
2793             sddl = f'O:SYD:(XA;;CR;;;WD;({target_policy.format(client_sid=client_creds.get_sid())}))'
2794             policy = self.create_authn_policy(enforced=True,
2795                                               computer_allowed_to=sddl)
2796             assigned_policy = str(policy.dn)
2797
2798         # Create a target account with the assigned policy.
2799         target_creds = self.get_cached_creds(
2800             account_type=self.AccountType.COMPUTER,
2801             opts={'assigned_policy': assigned_policy})
2802
2803         target_decryption_key = self.TicketDecryptionKey_from_creds(
2804             target_creds)
2805         target_etypes = target_creds.tgs_supported_enctypes
2806
2807         expected_groups = self.map_sids(expected_groups, None, domain_sid_str)
2808         expected_device_groups = self.map_sids(expected_device_groups, None, domain_sid_str)
2809
2810         # Show that obtaining a service ticket is allowed.
2811         self._tgs_req(client_tgt, code, client_creds, target_creds,
2812                       armor_tgt=mach_tgt,
2813                       expected_cname=client_cname,
2814                       expected_account_name=client_username,
2815                       decryption_key=target_decryption_key,
2816                       expected_sid=client_sid,
2817                       expected_groups=expected_groups,
2818                       expect_device_info=bool(expected_device_groups) or None,
2819                       expected_device_domain_sid=domain_sid_str,
2820                       expected_device_groups=expected_device_groups,
2821                       expect_client_claims=bool(expected_claims) or None,
2822                       expected_client_claims=expected_claims,
2823                       expected_supported_etypes=target_etypes,
2824                       expected_status=status,
2825                       expect_edata=edata)
2826
2827         self.check_tgs_log(client_creds, target_creds,
2828                            policy=policy,
2829                            checked_creds=client_creds,
2830                            status=status,
2831                            event=event,
2832                            reason=reason)
2833
2834     def test_conditional_ace_allowed_from_user_deny(self):
2835         # Create a machine account with which to perform FAST.
2836         mach_creds = self.get_cached_creds(
2837             account_type=self.AccountType.COMPUTER)
2838         mach_tgt = self.get_tgt(mach_creds)
2839
2840         # Create an authentication policy that explicitly denies the machine
2841         # account for a user.
2842         allowed = 'O:SYD:(A;;CR;;;WD)'
2843         denied = f'O:SYD:(XD;;CR;;;{mach_creds.get_sid()};(abc))'
2844         policy = self.create_authn_policy(enforced=True,
2845                                           user_allowed_from=denied,
2846                                           service_allowed_from=allowed)
2847
2848         # Create a user account with the assigned policy.
2849         client_creds = self._get_creds(account_type=self.AccountType.USER,
2850                                        assigned_policy=policy)
2851
2852         # Show that we get a policy error when trying to authenticate.
2853         self._get_tgt(client_creds, armor_tgt=mach_tgt,
2854                       expected_error=KDC_ERR_POLICY)
2855
2856         self.check_as_log(
2857             client_creds,
2858             armor_creds=mach_creds,
2859             client_policy=policy,
2860             client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2861             event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
2862             reason=AuditReason.ACCESS_DENIED,
2863             status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
2864
2865
2866 class TgsReqServicePolicyTests(ConditionalAceBaseTests):
2867     def test_pac_groups_not_present(self):
2868         """Test that authorization succeeds if the client does not belong to
2869         some required groups.
2870         """
2871
2872         required_sids = {
2873             ('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs),
2874             ('S-1-9-8-7', SidType.EXTRA_SID, self.default_attrs),
2875         }
2876
2877         # Create a machine account with which to perform FAST.
2878         mach_creds = self.get_cached_creds(
2879             account_type=self.AccountType.COMPUTER,
2880             opts={'id': 'device'})
2881         mach_tgt = self.get_tgt(mach_creds)
2882
2883         # Create a user account.
2884         client_creds = self._get_creds(account_type=self.AccountType.USER)
2885         client_tgt = self.get_tgt(client_creds)
2886
2887         # Create an authentication policy that requires the client to belong to
2888         # certain groups.
2889         target_policy_sddl = self.allow_if(
2890             f'Member_of {self.sddl_array_from_sids(required_sids)}')
2891         target_policy = self.create_authn_policy(
2892             enforced=True, computer_allowed_to=target_policy_sddl)
2893
2894         # Create a target account with the assigned policy.
2895         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
2896                                        assigned_policy=target_policy)
2897
2898         # Show that authorization fails.
2899         self._tgs_req(
2900             client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
2901             armor_tgt=mach_tgt,
2902             expect_edata=self.expect_padata_outer,
2903             # We aren’t particular about whether or not we get an NTSTATUS.
2904             expect_status=None,
2905             expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
2906
2907         self.check_tgs_log(
2908             client_creds, target_creds,
2909             policy=target_policy,
2910             status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2911             event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2912             reason=AuditReason.ACCESS_DENIED)
2913
2914     def test_pac_groups_present(self):
2915         """Test that authorization succeeds if the client belongs to some
2916         required groups.
2917         """
2918
2919         required_sids = {
2920             ('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs),
2921             ('S-1-9-8-7', SidType.EXTRA_SID, self.default_attrs),
2922         }
2923
2924         client_sids = required_sids | {
2925             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2926             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2927         }
2928
2929         # Create a machine account with which to perform FAST.
2930         mach_creds = self.get_cached_creds(
2931             account_type=self.AccountType.COMPUTER,
2932             opts={'id': 'device'})
2933         mach_tgt = self.get_tgt(mach_creds)
2934
2935         # Create a user account.
2936         client_creds = self._get_creds(account_type=self.AccountType.USER)
2937         client_tgt = self.get_tgt(client_creds)
2938
2939         # Add the required groups to the client’s TGT.
2940         client_tgt = self.modified_ticket(
2941             client_tgt,
2942             modify_pac_fn=partial(self.set_pac_sids,
2943                                   new_sids=client_sids),
2944             checksum_keys=self.get_krbtgt_checksum_key())
2945
2946         # Create an authentication policy that requires the client to belong to
2947         # certain groups.
2948         target_policy_sddl = self.allow_if(
2949             f'Member_of {self.sddl_array_from_sids(required_sids)}')
2950         target_policy = self.create_authn_policy(
2951             enforced=True, computer_allowed_to=target_policy_sddl)
2952
2953         # Create a target account with the assigned policy.
2954         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
2955                                        assigned_policy=target_policy)
2956
2957         # Show that authorization succeeds.
2958         self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt)
2959
2960         self.check_tgs_log(client_creds, target_creds,
2961                            policy=target_policy)
2962
2963     def test_pac_resource_groups_present_to_service_sid_compression(self):
2964         """Test that authorization succeeds if the client belongs to some
2965         required resource groups, and the request is to a service that supports
2966         SID compression.
2967         """
2968
2969         required_sids = {
2970             ('S-1-2-3-4', SidType.RESOURCE_SID, self.resource_attrs),
2971             ('S-1-2-3-5', SidType.RESOURCE_SID, self.resource_attrs),
2972             ('S-1-2-3-6', SidType.RESOURCE_SID, self.resource_attrs),
2973         }
2974
2975         client_sids = required_sids | {
2976             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2977             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2978         }
2979
2980         # Create a machine account with which to perform FAST.
2981         mach_creds = self.get_cached_creds(
2982             account_type=self.AccountType.COMPUTER,
2983             opts={'id': 'device'})
2984         mach_tgt = self.get_tgt(mach_creds)
2985
2986         # Create a user account.
2987         client_creds = self._get_creds(account_type=self.AccountType.USER)
2988         client_tgt = self.get_tgt(client_creds)
2989
2990         # Add the required groups to the client’s TGT.
2991         client_tgt = self.modified_ticket(
2992             client_tgt,
2993             modify_pac_fn=partial(self.set_pac_sids,
2994                                   new_sids=client_sids),
2995             checksum_keys=self.get_krbtgt_checksum_key())
2996
2997         # Create an authentication policy that requires the client to belong to
2998         # certain groups.
2999         target_policy_sddl = self.allow_if(
3000             f'Member_of {self.sddl_array_from_sids(required_sids)}')
3001         target_policy = self.create_authn_policy(
3002             enforced=True, computer_allowed_to=target_policy_sddl)
3003
3004         # Create a target account with the assigned policy.
3005         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
3006                                        assigned_policy=target_policy)
3007
3008         # Show that authorization fails.
3009         self._tgs_req(
3010             client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
3011             armor_tgt=mach_tgt,
3012             expect_edata=self.expect_padata_outer,
3013             # We aren’t particular about whether or not we get an NTSTATUS.
3014             expect_status=None,
3015             expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
3016
3017         self.check_tgs_log(
3018             client_creds, target_creds,
3019             policy=target_policy,
3020             status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3021             event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
3022             reason=AuditReason.ACCESS_DENIED)
3023
3024     def test_pac_resource_groups_present_to_service_no_sid_compression(self):
3025         """Test that authorization succeeds if the client belongs to some
3026         required resource groups, and the request is to a service that does not
3027         support SID compression.
3028         """
3029
3030         required_sids = {
3031             ('S-1-2-3-4', SidType.RESOURCE_SID, self.resource_attrs),
3032             ('S-1-2-3-5', SidType.RESOURCE_SID, self.resource_attrs),
3033             ('S-1-2-3-6', SidType.RESOURCE_SID, self.resource_attrs),
3034         }
3035
3036         client_sids = required_sids | {
3037             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3038             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3039         }
3040
3041         # Create a machine account with which to perform FAST.
3042         mach_creds = self.get_cached_creds(
3043             account_type=self.AccountType.COMPUTER,
3044             opts={'id': 'device'})
3045         mach_tgt = self.get_tgt(mach_creds)
3046
3047         # Create a user account.
3048         client_creds = self._get_creds(account_type=self.AccountType.USER)
3049         client_tgt = self.get_tgt(client_creds)
3050
3051         # Add the required groups to the client’s TGT.
3052         client_tgt = self.modified_ticket(
3053             client_tgt,
3054             modify_pac_fn=partial(self.set_pac_sids,
3055                                   new_sids=client_sids),
3056             checksum_keys=self.get_krbtgt_checksum_key())
3057
3058         # Create an authentication policy that requires the client to belong to
3059         # certain groups.
3060         target_policy_sddl = self.allow_if(
3061             f'Member_of {self.sddl_array_from_sids(required_sids)}')
3062         target_policy = self.create_authn_policy(
3063             enforced=True, computer_allowed_to=target_policy_sddl)
3064
3065         # Create a target account with the assigned policy.
3066         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
3067                                        assigned_policy=target_policy,
3068                                        additional_details={
3069                                            'msDS-SupportedEncryptionTypes': str((
3070                                                security.KERB_ENCTYPE_RC4_HMAC_MD5) | (
3071                                                    security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK) | (
3072                                                        security.KERB_ENCTYPE_RESOURCE_SID_COMPRESSION_DISABLED))})
3073
3074         # Show that authorization fails.
3075         self._tgs_req(
3076             client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
3077             armor_tgt=mach_tgt,
3078             expect_edata=self.expect_padata_outer,
3079             # We aren’t particular about whether or not we get an NTSTATUS.
3080             expect_status=None,
3081             expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
3082
3083         self.check_tgs_log(
3084             client_creds, target_creds,
3085             policy=target_policy,
3086             status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3087             event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
3088             reason=AuditReason.ACCESS_DENIED)
3089
3090     def test_pac_well_known_groups_not_present(self):
3091         """Test that authorization fails if the client does not belong to one
3092         or more required well‐known groups.
3093         """
3094
3095         required_sids = {
3096             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
3097             (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
3098             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
3099             (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
3100         }
3101
3102         client_sids = {
3103             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3104             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3105         }
3106
3107         # Create a machine account with which to perform FAST.
3108         mach_creds = self.get_cached_creds(
3109             account_type=self.AccountType.COMPUTER,
3110             opts={'id': 'device'})
3111         mach_tgt = self.get_tgt(mach_creds)
3112
3113         # Create a user account.
3114         client_creds = self._get_creds(account_type=self.AccountType.USER)
3115         client_tgt = self.get_tgt(client_creds)
3116
3117         # Modify the client’s TGT to contain only the SID of the client’s
3118         # primary group.
3119         client_tgt = self.modified_ticket(
3120             client_tgt,
3121             modify_pac_fn=partial(self.set_pac_sids,
3122                                   new_sids=client_sids),
3123             checksum_keys=self.get_krbtgt_checksum_key())
3124
3125         # Create an authentication policy that requires the client to belong to
3126         # certain groups.
3127         target_policy_sddl = self.allow_if(
3128             f'Member_of_any {self.sddl_array_from_sids(required_sids)}')
3129         target_policy = self.create_authn_policy(
3130             enforced=True, computer_allowed_to=target_policy_sddl)
3131
3132         # Create a target account with the assigned policy.
3133         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
3134                                        assigned_policy=target_policy)
3135
3136         # Show that authorization fails.
3137         self._tgs_req(
3138             client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
3139             armor_tgt=mach_tgt,
3140             expect_edata=self.expect_padata_outer,
3141             # We aren’t particular about whether or not we get an NTSTATUS.
3142             expect_status=None,
3143             expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
3144
3145         self.check_tgs_log(
3146             client_creds, target_creds,
3147             policy=target_policy,
3148             status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3149             event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
3150             reason=AuditReason.ACCESS_DENIED)
3151
3152     def test_pac_device_info(self):
3153         self._run_pac_device_info_test()
3154
3155     def test_pac_device_info_no_compound_id_support(self):
3156         self._run_pac_device_info_test(compound_id_support=False)
3157
3158     def test_pac_device_info_no_claims_valid(self):
3159         self._run_pac_device_info_test(device_claims_valid=False)
3160
3161     def _run_pac_device_info_test(self, compound_id_support=True, device_claims_valid=True):
3162         """Test the groups of the client and the device after performing a
3163         FAST‐armored TGS‐REQ.
3164         """
3165
3166         client_claim_id = 'the name of the client’s client claim'
3167         client_claim_value = 'the value of the client’s client claim'
3168
3169         client_claims = [
3170             (claims.CLAIMS_SOURCE_TYPE_AD, [
3171                 (client_claim_id, claims.CLAIM_TYPE_STRING, [client_claim_value]),
3172             ]),
3173         ]
3174
3175         expected_client_claims = {
3176             client_claim_id: {
3177                 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
3178                 'type': claims.CLAIM_TYPE_STRING,
3179                 'values': (client_claim_value,),
3180             },
3181         }
3182
3183         device_claim_id = 'the name of the device’s client claim'
3184         device_claim_value = 'the value of the device’s client claim'
3185
3186         device_claims = [
3187             (claims.CLAIMS_SOURCE_TYPE_AD, [
3188                 (device_claim_id, claims.CLAIM_TYPE_STRING, [device_claim_value]),
3189             ]),
3190         ]
3191
3192         if compound_id_support:
3193             expected_device_claims = {
3194                 device_claim_id: {
3195                     'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
3196                     'type': claims.CLAIM_TYPE_STRING,
3197                     'values': (device_claim_value,),
3198                 },
3199             }
3200         else:
3201             expected_device_claims = None
3202
3203         # Create a machine account with which to perform FAST.
3204         mach_creds = self.get_cached_creds(
3205             account_type=self.AccountType.COMPUTER,
3206             opts={'id': 'device'})
3207         mach_tgt = self.get_tgt(mach_creds)
3208
3209         client_sids = {
3210             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3211             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3212             # This to ensure we have EXTRA_SIDS set already, as
3213             # windows won't set that flag otherwise when adding one
3214             # more
3215             ('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs),
3216         }
3217
3218         device_sids = {
3219             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3220             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3221             ('S-1-2-3-4', SidType.EXTRA_SID, self.resource_attrs),
3222             ('S-1-3-4-5', SidType.EXTRA_SID, self.resource_attrs),
3223         }
3224
3225         if device_claims_valid:
3226             device_sids.add((security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs))
3227
3228         # Modify the machine account’s TGT to contain only the SID of the
3229         # machine account’s primary group.
3230         mach_tgt = self.modified_ticket(
3231             mach_tgt,
3232             modify_pac_fn=[
3233                 partial(self.set_pac_sids,
3234                         new_sids=device_sids),
3235                 partial(self.set_pac_claims, client_claims=device_claims),
3236             ],
3237             checksum_keys=self.get_krbtgt_checksum_key())
3238
3239         # Create a user account.
3240         client_creds = self._get_creds(account_type=self.AccountType.USER)
3241         client_tgt = self.get_tgt(client_creds)
3242
3243         # Modify the client’s TGT to contain only the SID of the client’s
3244         # primary group.
3245         client_tgt = self.modified_ticket(
3246             client_tgt,
3247             modify_pac_fn=[
3248                 partial(self.set_pac_sids,
3249                         new_sids=client_sids),
3250                 partial(self.set_pac_claims, client_claims=client_claims),
3251             ],
3252             checksum_keys=self.get_krbtgt_checksum_key())
3253
3254         # Indicate that Compound Identity is supported.
3255         target_creds, _ = self.get_target(to_krbtgt=False, compound_id=compound_id_support)
3256
3257         expected_sids = {
3258             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3259             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3260             ('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs),
3261             # The client’s groups are not to include the Asserted Identity and
3262             # Claims Valid SIDs.
3263         }
3264
3265         if compound_id_support:
3266             expected_sids.add((security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs))
3267
3268             expected_device_sids = {
3269                 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3270                 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3271                 ('S-1-2-3-4', SidType.EXTRA_SID, self.resource_attrs),
3272                 ('S-1-3-4-5', SidType.EXTRA_SID, self.resource_attrs),
3273             }
3274
3275             if device_claims_valid:
3276                 expected_device_sids.add(frozenset([(security.SID_CLAIMS_VALID, SidType.RESOURCE_SID, self.default_attrs)]))
3277         else:
3278             expected_device_sids = None
3279
3280         samdb = self.get_samdb()
3281         domain_sid_str = samdb.get_domain_sid()
3282
3283         expected_sids = self.map_sids(expected_sids, None, domain_sid_str)
3284         # The device SIDs will be put into the PAC unmodified.
3285         expected_device_sids = self.map_sids(expected_device_sids, None, domain_sid_str)
3286
3287         # Show that authorization succeeds.
3288         self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt,
3289                       expected_groups=expected_sids,
3290                       expect_device_info=bool(expected_device_sids) or None,
3291                       expected_device_domain_sid=domain_sid_str,
3292                       expected_device_groups=expected_device_sids,
3293                       expect_client_claims=bool(expected_client_claims) or None,
3294                       expected_client_claims=expected_client_claims,
3295                       expect_device_claims=bool(expected_device_claims) or None,
3296                       expected_device_claims=expected_device_claims)
3297
3298         self.check_tgs_log(client_creds, target_creds)
3299
3300     def test_pac_extra_sids_behaviour(self):
3301         """Test the groups of the client and the device after performing a
3302         FAST‐armored TGS‐REQ.
3303         """
3304
3305         # Create a machine account with which to perform FAST.
3306         mach_creds = self.get_cached_creds(
3307             account_type=self.AccountType.COMPUTER,
3308             opts={'id': 'device'})
3309         mach_tgt = self.get_tgt(mach_creds)
3310
3311         client_sids = {
3312             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3313             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3314         }
3315
3316         # Create a user account.
3317         client_creds = self._get_creds(account_type=self.AccountType.USER)
3318         client_tgt = self.get_tgt(client_creds)
3319
3320         # Modify the client’s TGT to contain only the SID of the client’s
3321         # primary group.
3322         client_tgt = self.modified_ticket(
3323             client_tgt,
3324             modify_pac_fn=partial(self.set_pac_sids,
3325                                   new_sids=client_sids),
3326             checksum_keys=self.get_krbtgt_checksum_key())
3327
3328         # Indicate that Compound Identity is supported.
3329         target_creds, _ = self.get_target(to_krbtgt=False, compound_id=True)
3330
3331         expected_sids = {
3332             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3333             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3334             (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs)
3335             # The client’s groups are not to include the Asserted Identity and
3336             # Claims Valid SIDs.
3337         }
3338
3339         samdb = self.get_samdb()
3340         domain_sid_str = samdb.get_domain_sid()
3341
3342         expected_sids = self.map_sids(expected_sids, None, domain_sid_str)
3343
3344         # Show that authorization succeeds.
3345         self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt,
3346                       expected_groups=expected_sids)
3347
3348         self.check_tgs_log(client_creds, target_creds)
3349
3350     def test_pac_claims_not_present(self):
3351         """Test that authentication fails if the device does not have a
3352         required claim.
3353         """
3354
3355         claim_id = 'the name of the claim'
3356         claim_value = 'the value of the claim'
3357
3358         # Create a machine account with which to perform FAST.
3359         mach_creds = self.get_cached_creds(
3360             account_type=self.AccountType.COMPUTER,
3361             opts={'id': 'device'})
3362         mach_tgt = self.get_tgt(mach_creds)
3363
3364         # Create an authentication policy that requires the device to have a
3365         # certain claim.
3366         target_policy_sddl = self.allow_if(
3367             f'@User.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
3368         target_policy = self.create_authn_policy(
3369             enforced=True, computer_allowed_to=target_policy_sddl)
3370
3371         # Create a user account.
3372         client_creds = self._get_creds(account_type=self.AccountType.USER)
3373         client_tgt = self.get_tgt(client_creds)
3374
3375         # Create a target account with the assigned policy.
3376         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
3377                                        assigned_policy=target_policy)
3378
3379         # Show that authorization fails.
3380         self._tgs_req(
3381             client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
3382             armor_tgt=mach_tgt,
3383             expect_edata=self.expect_padata_outer,
3384             # We aren’t particular about whether or not we get an NTSTATUS.
3385             expect_status=None,
3386             expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
3387
3388         self.check_tgs_log(
3389             client_creds,
3390             target_creds,
3391             policy=target_policy,
3392             status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3393             event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
3394             reason=AuditReason.ACCESS_DENIED)
3395
3396     def test_pac_claims_present(self):
3397         """Test that authentication succeeds if the user has a required
3398         claim.
3399         """
3400
3401         claim_id = 'the name of the claim'
3402         claim_value = 'the value of the claim'
3403
3404         pac_claims = [
3405             (claims.CLAIMS_SOURCE_TYPE_AD, [
3406                 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
3407             ]),
3408         ]
3409
3410         # Create a machine account with which to perform FAST.
3411         mach_creds = self.get_cached_creds(
3412             account_type=self.AccountType.COMPUTER,
3413             opts={'id': 'device'})
3414         mach_tgt = self.get_tgt(mach_creds)
3415
3416         # Create an authentication policy that requires the user to have a
3417         # certain claim.
3418         target_policy_sddl = self.allow_if(
3419             f'@User.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
3420         target_policy = self.create_authn_policy(
3421             enforced=True, computer_allowed_to=target_policy_sddl)
3422
3423         # Create a user account.
3424         client_creds = self._get_creds(account_type=self.AccountType.USER)
3425         client_tgt = self.get_tgt(client_creds)
3426
3427         # Add the required claim to the client’s TGT.
3428         client_tgt = self.modified_ticket(
3429             client_tgt,
3430             modify_pac_fn=partial(self.set_pac_claims,
3431                                   client_claims=pac_claims),
3432             checksum_keys=self.get_krbtgt_checksum_key())
3433
3434         # Create a target account with the assigned policy.
3435         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
3436                                        assigned_policy=target_policy)
3437
3438         # Show that authorization succeeds.
3439         self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt)
3440
3441         self.check_tgs_log(client_creds, target_creds,
3442                            policy=target_policy)
3443
3444     def test_pac_claims_invalid(self):
3445         """Test that authentication fails if the device’s required claim is not
3446         valid.
3447         """
3448
3449         claim_id = 'the name of the claim'
3450         claim_value = 'the value of the claim'
3451
3452         pac_claims = [
3453             (claims.CLAIMS_SOURCE_TYPE_AD, [
3454                 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
3455             ]),
3456         ]
3457
3458         # The device’s SIDs do not include the Claims Valid SID.
3459         device_sids = {
3460             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3461             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3462         }
3463
3464         # Create a machine account with which to perform FAST.
3465         mach_creds = self.get_cached_creds(
3466             account_type=self.AccountType.COMPUTER,
3467             opts={'id': 'device'})
3468         mach_tgt = self.get_tgt(mach_creds)
3469
3470         # Create an authentication policy that requires the device to have a
3471         # certain claim.
3472         target_policy_sddl = self.allow_if(
3473             f'@User.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
3474         target_policy = self.create_authn_policy(
3475             enforced=True, computer_allowed_to=target_policy_sddl)
3476
3477         # Create a user account.
3478         client_creds = self._get_creds(account_type=self.AccountType.USER)
3479         client_tgt = self.get_tgt(client_creds)
3480
3481         # Add the SIDs and the required claim to the client’s TGT.
3482         client_tgt = self.modified_ticket(
3483             client_tgt,
3484             modify_pac_fn=[
3485                 partial(self.set_pac_claims, client_claims=pac_claims),
3486                 partial(self.set_pac_sids, new_sids=device_sids)],
3487             checksum_keys=self.get_krbtgt_checksum_key())
3488
3489         # Create a target account with the assigned policy.
3490         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
3491                                        assigned_policy=target_policy)
3492
3493         # Show that authorization fails.
3494         self._tgs_req(
3495             client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
3496             armor_tgt=mach_tgt,
3497             expect_edata=self.expect_padata_outer,
3498             # We aren’t particular about whether or not we get an NTSTATUS.
3499             expect_status=None,
3500             expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
3501
3502         self.check_tgs_log(
3503             client_creds,
3504             target_creds,
3505             policy=target_policy,
3506             status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3507             event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
3508             reason=AuditReason.ACCESS_DENIED)
3509
3510     def test_pac_device_claims_not_present(self):
3511         """Test that authorization fails if the device does not have a
3512         required claim.
3513         """
3514
3515         claim_id = 'the name of the claim'
3516         claim_value = 'the value of the claim'
3517
3518         # Create a machine account with which to perform FAST.
3519         mach_creds = self.get_cached_creds(
3520             account_type=self.AccountType.COMPUTER,
3521             opts={'id': 'device'})
3522         mach_tgt = self.get_tgt(mach_creds)
3523
3524         # Create an authentication policy that requires the device to have a
3525         # certain device claim.
3526         target_policy_sddl = self.allow_if(
3527             f'@Device.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
3528         target_policy = self.create_authn_policy(
3529             enforced=True, computer_allowed_to=target_policy_sddl)
3530
3531         # Create a user account.
3532         client_creds = self._get_creds(account_type=self.AccountType.USER)
3533         client_tgt = self.get_tgt(client_creds)
3534
3535         # Create a target account with the assigned policy.
3536         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
3537                                        assigned_policy=target_policy)
3538
3539         # Show that authorization fails.
3540         self._tgs_req(
3541             client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
3542             armor_tgt=mach_tgt,
3543             expect_edata=self.expect_padata_outer,
3544             # We aren’t particular about whether or not we get an NTSTATUS.
3545             expect_status=None,
3546             expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
3547
3548         self.check_tgs_log(
3549             client_creds,
3550             target_creds,
3551             policy=target_policy,
3552             status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3553             event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
3554             reason=AuditReason.ACCESS_DENIED)
3555
3556     def test_pac_device_claims_present(self):
3557         """Test that authorization succeeds if the device has a required claim.
3558         """
3559
3560         claim_id = 'the name of the claim'
3561         claim_value = 'the value of the claim'
3562
3563         pac_claims = [
3564             (claims.CLAIMS_SOURCE_TYPE_AD, [
3565                 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
3566             ]),
3567         ]
3568
3569         # Create a machine account with which to perform FAST.
3570         mach_creds = self.get_cached_creds(
3571             account_type=self.AccountType.COMPUTER,
3572             opts={'id': 'device'})
3573         mach_tgt = self.get_tgt(mach_creds)
3574
3575         # Add the required claim to the machine account’s TGT.
3576         mach_tgt = self.modified_ticket(
3577             mach_tgt,
3578             modify_pac_fn=partial(self.set_pac_claims,
3579                                   client_claims=pac_claims),
3580             checksum_keys=self.get_krbtgt_checksum_key())
3581
3582         # Create an authentication policy that requires the device to have a
3583         # certain device claim.
3584         target_policy_sddl = self.allow_if(
3585             f'@Device.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
3586         target_policy = self.create_authn_policy(
3587             enforced=True, computer_allowed_to=target_policy_sddl)
3588
3589         # Create a user account.
3590         client_creds = self._get_creds(account_type=self.AccountType.USER)
3591         client_tgt = self.get_tgt(client_creds)
3592
3593         # Create a target account with the assigned policy.
3594         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
3595                                        assigned_policy=target_policy)
3596
3597         # Show that authorization succeeds.
3598         self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt)
3599
3600         self.check_tgs_log(client_creds, target_creds,
3601                            policy=target_policy)
3602
3603     def test_pac_device_claims_invalid(self):
3604         """Test that authorization fails if the device’s required claim is not
3605         valid.
3606         """
3607
3608         claim_id = 'the name of the claim'
3609         claim_value = 'the value of the claim'
3610
3611         pac_claims = [
3612             (claims.CLAIMS_SOURCE_TYPE_AD, [
3613                 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
3614             ]),
3615         ]
3616
3617         # The device’s SIDs do not include the Claims Valid SID.
3618         device_sids = {
3619             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3620             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3621         }
3622
3623         # Create a machine account with which to perform FAST.
3624         mach_creds = self.get_cached_creds(
3625             account_type=self.AccountType.COMPUTER,
3626             opts={'id': 'device'})
3627         mach_tgt = self.get_tgt(mach_creds)
3628
3629         # Add the SIDs and the required claim to the machine account’s TGT.
3630         mach_tgt = self.modified_ticket(
3631             mach_tgt,
3632             modify_pac_fn=[
3633                 partial(self.set_pac_claims, client_claims=pac_claims),
3634                 partial(self.set_pac_sids, new_sids=device_sids)],
3635             checksum_keys=self.get_krbtgt_checksum_key())
3636
3637         # Create an authentication policy that requires the device to have a
3638         # certain claim.
3639         target_policy_sddl = self.allow_if(
3640             f'@Device.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
3641         target_policy = self.create_authn_policy(
3642             enforced=True, computer_allowed_to=target_policy_sddl)
3643
3644         # Create a user account.
3645         client_creds = self._get_creds(account_type=self.AccountType.USER)
3646         client_tgt = self.get_tgt(client_creds)
3647
3648         # Create a target account with the assigned policy.
3649         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
3650                                        assigned_policy=target_policy)
3651
3652         # Show that authorization fails.
3653         self._tgs_req(
3654             client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
3655             armor_tgt=mach_tgt,
3656             expect_edata=self.expect_padata_outer,
3657             # We aren’t particular about whether or not we get an NTSTATUS.
3658             expect_status=None,
3659             expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
3660
3661         self.check_tgs_log(
3662             client_creds,
3663             target_creds,
3664             policy=target_policy,
3665             status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3666             event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
3667             reason=AuditReason.ACCESS_DENIED)
3668
3669     def test_pac_device_claims_invalid_no_attrs(self):
3670         """Test that authorization fails if the device’s required claim is not
3671         valid.
3672         """
3673
3674         claim_id = 'the name of the claim'
3675         claim_value = 'the value of the claim'
3676
3677         pac_claims = [
3678             (claims.CLAIMS_SOURCE_TYPE_AD, [
3679                 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
3680             ]),
3681         ]
3682
3683         device_sids = {
3684             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3685             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3686             # The device’s SIDs include the Claims Valid SID, but it has no
3687             # attributes.
3688             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, 0),
3689         }
3690
3691         # Create a machine account with which to perform FAST.
3692         mach_creds = self.get_cached_creds(
3693             account_type=self.AccountType.COMPUTER,
3694             opts={'id': 'device'})
3695         mach_tgt = self.get_tgt(mach_creds)
3696
3697         # Add the SIDs and the required claim to the machine account’s TGT.
3698         mach_tgt = self.modified_ticket(
3699             mach_tgt,
3700             modify_pac_fn=[
3701                 partial(self.set_pac_claims, client_claims=pac_claims),
3702                 partial(self.set_pac_sids, new_sids=device_sids)],
3703             checksum_keys=self.get_krbtgt_checksum_key())
3704
3705         # Create an authentication policy that requires the device to have a
3706         # certain claim.
3707         target_policy_sddl = self.allow_if(
3708             f'@Device.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
3709         target_policy = self.create_authn_policy(
3710             enforced=True, computer_allowed_to=target_policy_sddl)
3711
3712         # Create a user account.
3713         client_creds = self._get_creds(account_type=self.AccountType.USER)
3714         client_tgt = self.get_tgt(client_creds)
3715
3716         # Create a target account with the assigned policy.
3717         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
3718                                        assigned_policy=target_policy)
3719
3720         # Show that authorization succeeds.
3721         self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt)
3722
3723         self.check_tgs_log(client_creds, target_creds,
3724                            policy=target_policy)
3725
3726
3727 if __name__ == '__main__':
3728     global_asn1_print = False
3729     global_hexdump = False
3730     import unittest
3731     unittest.main()