tests/krb5: Test whether the device belongs to some default groups
[samba.git] / python / samba / tests / krb5 / conditional_ace_tests.py
1 #!/usr/bin/env python3
2 # Unix SMB/CIFS implementation.
3 # Copyright (C) Stefan Metzmacher 2020
4 # Copyright (C) Catalyst.Net Ltd 2023
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 #
19
20 import sys
21 import os
22
23 sys.path.insert(0, 'bin/python')
24 os.environ['PYTHONUNBUFFERED'] = '1'
25
26 from collections import OrderedDict
27 from functools import partial
28 import re
29 from string import Formatter
30
31 import ldb
32
33 from samba import dsdb, ntstatus
34 from samba.dcerpc import claims, krb5pac, security
35 from samba.ndr import ndr_pack, ndr_unpack
36
37 from samba.tests import DynamicTestCase, env_get_var_value
38 from samba.tests.krb5.authn_policy_tests import (
39     AuditEvent,
40     AuditReason,
41     AuthnPolicyBaseTests,
42 )
43 from samba.tests.krb5.raw_testcase import RawKerberosTest
44 from samba.tests.krb5.rfc4120_constants import (
45     KDC_ERR_BADOPTION,
46     KDC_ERR_GENERIC,
47     KDC_ERR_MODIFIED,
48     KDC_ERR_POLICY,
49     NT_PRINCIPAL,
50 )
51 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
52
53 SidType = RawKerberosTest.SidType
54
55 global_asn1_print = False
56 global_hexdump = False
57
58
59 # When used as a test outcome, indicates that the test can cause a Windows
60 # server to crash, and is to be run with caution.
61 CRASHES_WINDOWS = object()
62
63
64 class ConditionalAceBaseTests(AuthnPolicyBaseTests):
65     # Constants for group SID attributes.
66     default_attrs = security.SE_GROUP_DEFAULT_FLAGS
67     resource_attrs = default_attrs | security.SE_GROUP_RESOURCE
68
69     aa_asserted_identity = (
70         security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY)
71     service_asserted_identity = security.SID_SERVICE_ASSERTED_IDENTITY
72
73     @classmethod
74     def setUpClass(cls):
75         super().setUpClass()
76
77         cls._setup = False
78
79     def setUp(self):
80         super().setUp()
81         self.do_asn1_print = global_asn1_print
82         self.do_hexdump = global_hexdump
83
84         if not self._setup:
85             samdb = self.get_samdb()
86             cls = type(self)
87
88             # Create a machine account with which to perform FAST.
89             cls._mach_creds = self.get_cached_creds(
90                 account_type=self.AccountType.COMPUTER)
91
92             # Create some new groups.
93
94             group0_name = self.get_new_username()
95             group0_dn = self.create_group(samdb, group0_name)
96             cls._group0_sid = self.get_objectSid(samdb, group0_dn)
97
98             group1_name = self.get_new_username()
99             group1_dn = self.create_group(samdb, group1_name)
100             cls._group1_sid = self.get_objectSid(samdb, group1_dn)
101
102             # Create machine accounts with which to perform FAST that belong to
103             # various arrangements of the groups.
104
105             cls._member_of_both_creds = self.get_cached_creds(
106                 account_type=self.AccountType.COMPUTER,
107                 opts={'member_of': (group0_dn, group1_dn)})
108
109             cls._member_of_one_creds = self.get_cached_creds(
110                 account_type=self.AccountType.COMPUTER,
111                 opts={'member_of': (group1_dn,)})
112
113             # Create some authentication silos.
114             cls._unenforced_silo = self.create_authn_silo(enforced=False)
115             cls._enforced_silo = self.create_authn_silo(enforced=True)
116
117             # Create machine accounts with which to perform FAST that belong to
118             # the respective silos.
119
120             cls._member_of_unenforced_silo = self._get_creds(
121                 account_type=self.AccountType.COMPUTER,
122                 assigned_silo=self._unenforced_silo,
123                 cached=True)
124             self.add_to_group(str(self._member_of_unenforced_silo.get_dn()),
125                               self._unenforced_silo.dn,
126                               'msDS-AuthNPolicySiloMembers',
127                               expect_attr=False)
128
129             cls._member_of_enforced_silo = self._get_creds(
130                 account_type=self.AccountType.COMPUTER,
131                 assigned_silo=self._enforced_silo,
132                 cached=True)
133             self.add_to_group(str(self._member_of_enforced_silo.get_dn()),
134                               self._enforced_silo.dn,
135                               'msDS-AuthNPolicySiloMembers',
136                               expect_attr=False)
137
138             # Create a couple of multi‐valued string claims for testing claim
139             # value comparisons.
140
141             cls.claim0_attr = 'carLicense'
142             cls.claim0_id = self.get_new_username()
143             self.create_claim(cls.claim0_id,
144                               enabled=True,
145                               attribute=cls.claim0_attr,
146                               single_valued=False,
147                               source_type='AD',
148                               for_classes=['computer', 'user'],
149                               value_type=claims.CLAIM_TYPE_STRING)
150
151             cls.claim1_attr = 'departmentNumber'
152             cls.claim1_id = self.get_new_username()
153             self.create_claim(cls.claim1_id,
154                               enabled=True,
155                               attribute=cls.claim1_attr,
156                               single_valued=False,
157                               source_type='AD',
158                               for_classes=['computer', 'user'],
159                               value_type=claims.CLAIM_TYPE_STRING)
160
161             cls._setup = True
162
163     # For debugging purposes. Prints out the SDDL representation of
164     # authentication policy conditions set by the Windows GUI.
165     def _print_authn_policy_sddl(self, policy_id):
166         policy_dn = self.get_authn_policies_dn()
167         policy_dn.add_child(f'CN={policy_id}')
168
169         attrs = [
170             'msDS-ComputerAllowedToAuthenticateTo',
171             'msDS-ServiceAllowedToAuthenticateFrom',
172             'msDS-ServiceAllowedToAuthenticateTo',
173             'msDS-UserAllowedToAuthenticateFrom',
174             'msDS-UserAllowedToAuthenticateTo',
175         ]
176
177         samdb = self.get_samdb()
178         res = samdb.search(policy_dn, scope=ldb.SCOPE_BASE, attrs=attrs)
179         self.assertEqual(1, len(res),
180                          f'Authentication policy {policy_id} not found')
181         result = res[0]
182
183         def print_sddl(attr):
184             sd = result.get(attr, idx=0)
185             if sd is None:
186                 return
187
188             sec_desc = ndr_unpack(security.descriptor, sd)
189             print(f'{attr}: {sec_desc.as_sddl()}')
190
191         for attr in attrs:
192             print_sddl(attr)
193
194     def sddl_array_from_sids(self, sids):
195         def sddl_from_sid_entry(sid_entry):
196             sid, _, _ = sid_entry
197             return f'SID({sid})'
198
199         return f"{{{', '.join(map(sddl_from_sid_entry, sids))}}}"
200
201     def allow_if(self, condition):
202         return f'O:SYD:(XA;;CR;;;WD;({condition}))'
203
204     @staticmethod
205     def escaped_claim_id(claim_id):
206         escapes = '\x00\t\n\x0b\x0c\r !"%&()<=>|'
207         return ''.join(c
208                        if c not in escapes
209                        else f'%{ord(c):04x}'
210                        for c in claim_id)
211
212
213 @DynamicTestCase
214 class ConditionalAceTests(ConditionalAceBaseTests):
215     @classmethod
216     def setUpDynamicTestCases(cls):
217         FILTER = env_get_var_value('FILTER', allow_missing=True)
218
219         # These operators are arranged so that each operator precedes its own
220         # affixes.
221         op_names = OrderedDict([
222             ('!=', 'does not equal'),
223             ('!', 'not'),
224             ('&&', 'and'),
225             ('<=', 'is less than or equals'),
226             ('<', 'is less than'),
227             ('==', 'equals'),
228             ('>=', 'exceeds or equals'),
229             ('>', 'exceeds'),
230             ('Not_Any_of', 'matches none of'),
231             ('Any_of', 'matches any of'),
232             ('Not_Contains', 'does not contain'),
233             ('Contains', 'contains'),
234             ('Not_Member_of_Any', 'the user belongs to none of'),
235             ('Not_Device_Member_of_Any', 'the device belongs to none of'),  # TODO: no test for this yet
236             ('Device_Member_of_Any', 'the device belongs to any of'),  # TODO: no test for this yet
237             ('Not_Device_Member_of', 'the device does not belong to'),  # TODO: no test for this yet
238             ('Device_Member_of', 'the device belongs to'),
239             ('Not_Exists', 'there does not exist'),
240             ('Exists', 'there exists'),
241             ('Member_of_Any', 'the user belongs to any of'),
242             ('Not_Member_of', 'the user does not belong to'),
243             ('Member_of', 'the user belongs to'),
244             ('||', 'or'),
245         ])
246
247         # This is a safety measure to ensure correct ordering of op_names
248         keys = list(op_names.keys())
249         for i in range(len(keys)):
250             for j in range(i + 1, len(keys)):
251                 if keys[i] in keys[j]:
252                     raise AssertionError((keys[i], keys[j]))
253
254         for case in cls.pac_claim_cases:
255             if len(case) == 3:
256                 pac_claims, expression, outcome = case
257                 claim_map = None
258             elif len(case) == 4:
259                 pac_claims, expression, claim_map, outcome = case
260             else:
261                 raise AssertionError(
262                     f'found {len(case)} items in case, expected 3–4')
263
264             expression_name = expression
265             for op, op_name in op_names.items():
266                 expression_name = expression_name.replace(op, op_name)
267
268             name = f'{pac_claims}_{expression_name}'
269
270             if claim_map is not None:
271                 name += f'_{claim_map}'
272
273             name = re.sub(r'\W+', '_', name)
274             if len(name) > 150:
275                 name = f'{name[:125]}+{len(name) - 125}‐more'
276
277             if FILTER and not re.search(FILTER, name):
278                 continue
279
280             cls.generate_dynamic_test('test_pac_claim_cmp', name,
281                                       pac_claims, expression, claim_map,
282                                       outcome)
283
284         for case in cls.claim_against_claim_cases:
285             lhs, op, rhs, outcome = case
286             op_name = op_names[op]
287
288             name = f'{lhs}_{op_name}_{rhs}'
289
290             name = re.sub(r'\W+', '_', name)
291             if FILTER and not re.search(FILTER, name):
292                 continue
293
294             cls.generate_dynamic_test('test_cmp', name,
295                                       lhs, op, rhs, outcome)
296
297         for case in cls.claim_against_literal_cases:
298             lhs, op, rhs, outcome = case
299             op_name = op_names[op]
300
301             name = f'{lhs}_{op_name}_literal_{rhs}'
302
303             name = re.sub(r'\W+', '_', name)
304             if FILTER and not re.search(FILTER, name):
305                 continue
306
307             cls.generate_dynamic_test('test_cmp', name,
308                                       lhs, op, rhs, outcome, True)
309
310     def test_allowed_from_member_of_each(self):
311         # Create an authentication policy that allows accounts belonging to
312         # both groups.
313         policy = self.create_authn_policy(
314             enforced=True,
315             user_allowed_from=(
316                 f'O:SYD:(XA;;CR;;;WD;(Member_of '
317                 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'),
318         )
319
320         # Create a user account with the assigned policy.
321         client_creds = self._get_creds(account_type=self.AccountType.USER,
322                                        assigned_policy=policy)
323
324         # Show that we get a policy error if the machine account does not
325         # belong to both groups.
326         armor_tgt = self.get_tgt(self._member_of_one_creds)
327         self._get_tgt(client_creds, armor_tgt=armor_tgt,
328                       expected_error=KDC_ERR_POLICY)
329
330         # Otherwise, authentication should succeed.
331         armor_tgt = self.get_tgt(self._member_of_both_creds)
332         self._get_tgt(client_creds, armor_tgt=armor_tgt,
333                       expected_error=0)
334
335     def test_allowed_from_member_of_any(self):
336         # Create an authentication policy that allows accounts belonging to
337         # either group.
338         policy = self.create_authn_policy(
339             enforced=True,
340             user_allowed_from=(
341                 f'O:SYD:(XA;;CR;;;WD;(Member_of_Any '
342                 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'),
343         )
344
345         # Create a user account with the assigned policy.
346         client_creds = self._get_creds(account_type=self.AccountType.USER,
347                                        assigned_policy=policy)
348
349         # Show that we get a policy error if the machine account belongs to
350         # neither group.
351         armor_tgt = self.get_tgt(self._mach_creds)
352         self._get_tgt(client_creds, armor_tgt=armor_tgt,
353                       expected_error=KDC_ERR_POLICY)
354
355         # Otherwise, authentication should succeed.
356         armor_tgt = self.get_tgt(self._member_of_one_creds)
357         self._get_tgt(client_creds, armor_tgt=armor_tgt,
358                       expected_error=0)
359
360     def test_allowed_from_not_member_of_each(self):
361         # Create an authentication policy that allows accounts not belonging to
362         # both groups.
363         policy = self.create_authn_policy(
364             enforced=True,
365             user_allowed_from=(
366                 f'O:SYD:(XA;;CR;;;WD;(Not_Member_of '
367                 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'),
368         )
369
370         # Create a user account with the assigned policy.
371         client_creds = self._get_creds(account_type=self.AccountType.USER,
372                                        assigned_policy=policy)
373
374         # Show that we get a policy error if the machine account belongs to
375         # both groups.
376         armor_tgt = self.get_tgt(self._member_of_both_creds)
377         self._get_tgt(client_creds, armor_tgt=armor_tgt,
378                       expected_error=KDC_ERR_POLICY)
379
380         # Otherwise, authentication should succeed.
381         armor_tgt = self.get_tgt(self._member_of_one_creds)
382         self._get_tgt(client_creds, armor_tgt=armor_tgt,
383                       expected_error=0)
384
385     def test_allowed_from_not_member_of_any(self):
386         # Create an authentication policy that allows accounts belonging to
387         # neither group.
388         policy = self.create_authn_policy(
389             enforced=True,
390             user_allowed_from=(
391                 f'O:SYD:(XA;;CR;;;WD;(Not_Member_of_Any '
392                 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'),
393         )
394
395         # Create a user account with the assigned policy.
396         client_creds = self._get_creds(account_type=self.AccountType.USER,
397                                        assigned_policy=policy)
398
399         # Show that we get a policy error if the machine account belongs to one
400         # of the groups.
401         armor_tgt = self.get_tgt(self._member_of_one_creds)
402         self._get_tgt(client_creds, armor_tgt=armor_tgt,
403                       expected_error=KDC_ERR_POLICY)
404
405         # Otherwise, authentication should succeed.
406         armor_tgt = self.get_tgt(self._mach_creds)
407         self._get_tgt(client_creds, armor_tgt=armor_tgt,
408                       expected_error=0)
409
410     def test_allowed_from_member_of_each_deny(self):
411         # Create an authentication policy that denies accounts belonging to
412         # both groups, and allows other accounts.
413         policy = self.create_authn_policy(
414             enforced=True,
415             user_allowed_from=(
416                 f'O:SYD:(XD;;CR;;;WD;(Member_of '
417                 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'
418                 f'(A;;CR;;;WD)'),
419         )
420
421         # Create a user account with the assigned policy.
422         client_creds = self._get_creds(account_type=self.AccountType.USER,
423                                        assigned_policy=policy)
424
425         # Show that we get a policy error if the machine account belongs to
426         # both groups.
427         armor_tgt = self.get_tgt(self._member_of_both_creds)
428         self._get_tgt(client_creds, armor_tgt=armor_tgt,
429                       expected_error=KDC_ERR_POLICY)
430
431         # Otherwise, authentication should succeed.
432         armor_tgt = self.get_tgt(self._member_of_one_creds)
433         self._get_tgt(client_creds, armor_tgt=armor_tgt,
434                       expected_error=0)
435
436     def test_allowed_from_member_of_any_deny(self):
437         # Create an authentication policy that denies accounts belonging to
438         # either group, and allows other accounts.
439         policy = self.create_authn_policy(
440             enforced=True,
441             user_allowed_from=(
442                 f'O:SYD:(XD;;CR;;;WD;(Member_of_Any '
443                 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'
444                 f'(A;;CR;;;WD)'),
445         )
446
447         # Create a user account with the assigned policy.
448         client_creds = self._get_creds(account_type=self.AccountType.USER,
449                                        assigned_policy=policy)
450
451         # Show that we get a policy error if the machine account belongs to
452         # either group.
453         armor_tgt = self.get_tgt(self._member_of_one_creds)
454         self._get_tgt(client_creds, armor_tgt=armor_tgt,
455                       expected_error=KDC_ERR_POLICY)
456
457         # Otherwise, authentication should succeed.
458         armor_tgt = self.get_tgt(self._mach_creds)
459         self._get_tgt(client_creds, armor_tgt=armor_tgt,
460                       expected_error=0)
461
462     def test_allowed_from_not_member_of_each_deny(self):
463         # Create an authentication policy that denies accounts not belonging to
464         # both groups, and allows other accounts.
465         policy = self.create_authn_policy(
466             enforced=True,
467             user_allowed_from=(
468                 f'O:SYD:(XD;;CR;;;WD;(Not_Member_of '
469                 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'
470                 f'(A;;CR;;;WD)'),
471         )
472
473         # Create a user account with the assigned policy.
474         client_creds = self._get_creds(account_type=self.AccountType.USER,
475                                        assigned_policy=policy)
476
477         # Show that we get a policy error if the machine account doesn’t belong
478         # to both groups.
479         armor_tgt = self.get_tgt(self._member_of_one_creds)
480         self._get_tgt(client_creds, armor_tgt=armor_tgt,
481                       expected_error=KDC_ERR_POLICY)
482
483         # Otherwise, authentication should succeed.
484         armor_tgt = self.get_tgt(self._member_of_both_creds)
485         self._get_tgt(client_creds, armor_tgt=armor_tgt,
486                       expected_error=0)
487
488     def test_allowed_from_not_member_of_any_deny(self):
489         # Create an authentication policy that denies accounts belonging to
490         # neither group, and allows other accounts.
491         policy = self.create_authn_policy(
492             enforced=True,
493             user_allowed_from=(
494                 f'O:SYD:(XD;;CR;;;WD;(Not_Member_of_Any '
495                 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'
496                 f'(A;;CR;;;WD)'),
497         )
498
499         # Create a user account with the assigned policy.
500         client_creds = self._get_creds(account_type=self.AccountType.USER,
501                                        assigned_policy=policy)
502
503         # Show that we get a policy error if the machine account belongs to
504         # neither group.
505         armor_tgt = self.get_tgt(self._mach_creds)
506         self._get_tgt(client_creds, armor_tgt=armor_tgt,
507                       expected_error=KDC_ERR_POLICY)
508
509         # Otherwise, authentication should succeed.
510         armor_tgt = self.get_tgt(self._member_of_one_creds)
511         self._get_tgt(client_creds, armor_tgt=armor_tgt,
512                       expected_error=0)
513
514     def test_allowed_from_unenforced_silo_equals(self):
515         # Create an authentication policy that allows accounts belonging to the
516         # unenforced silo.
517         policy = self.create_authn_policy(
518             enforced=True,
519             user_allowed_from=(
520                 f'O:SYD:(XA;;CR;;;WD;'
521                 f'(@User.ad://ext/AuthenticationSilo == '
522                 f'"{self._unenforced_silo}"))'),
523         )
524
525         # Create a user account with the assigned policy.
526         client_creds = self._get_creds(account_type=self.AccountType.USER,
527                                        assigned_policy=policy)
528
529         # As the policy is unenforced, the ‘ad://ext/AuthenticationSilo’ claim
530         # will not be present in the TGT, and the ACE will never allow access.
531
532         armor_tgt = self.get_tgt(self._mach_creds)
533         self._get_tgt(client_creds, armor_tgt=armor_tgt,
534                       expected_error=KDC_ERR_POLICY)
535
536         armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
537         self._get_tgt(client_creds, armor_tgt=armor_tgt,
538                       expected_error=KDC_ERR_POLICY)
539
540         armor_tgt = self.get_tgt(self._member_of_enforced_silo)
541         self._get_tgt(client_creds, armor_tgt=armor_tgt,
542                       expected_error=KDC_ERR_POLICY)
543
544     def test_allowed_from_enforced_silo_equals(self):
545         # Create an authentication policy that allows accounts belonging to the
546         # enforced silo.
547         policy = self.create_authn_policy(
548             enforced=True,
549             user_allowed_from=(
550                 f'O:SYD:(XA;;CR;;;WD;'
551                 f'(@User.ad://ext/AuthenticationSilo == '
552                 f'"{self._enforced_silo}"))'),
553         )
554
555         # Create a user account with the assigned policy.
556         client_creds = self._get_creds(account_type=self.AccountType.USER,
557                                        assigned_policy=policy)
558
559         # Show that we get a policy error if the machine account does not
560         # belong to the silo.
561         armor_tgt = self.get_tgt(self._mach_creds)
562         self._get_tgt(client_creds, armor_tgt=armor_tgt,
563                       expected_error=KDC_ERR_POLICY)
564
565         armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
566         self._get_tgt(client_creds, armor_tgt=armor_tgt,
567                       expected_error=KDC_ERR_POLICY)
568
569         # Otherwise, authentication should succeed.
570         armor_tgt = self.get_tgt(self._member_of_enforced_silo)
571         self._get_tgt(client_creds, armor_tgt=armor_tgt,
572                       expected_error=0)
573
574     def test_allowed_from_unenforced_silo_not_equals(self):
575         # Create an authentication policy that allows accounts not belonging to
576         # the unenforced silo.
577         policy = self.create_authn_policy(
578             enforced=True,
579             user_allowed_from=(
580                 f'O:SYD:(XA;;CR;;;WD;'
581                 f'(@User.ad://ext/AuthenticationSilo != '
582                 f'"{self._unenforced_silo}"))'),
583         )
584
585         # Create a user account with the assigned policy.
586         client_creds = self._get_creds(account_type=self.AccountType.USER,
587                                        assigned_policy=policy)
588
589         # Show that authentication fails unless the account belongs to a silo
590         # other than the unenforced silo.
591
592         armor_tgt = self.get_tgt(self._mach_creds)
593         self._get_tgt(client_creds, armor_tgt=armor_tgt,
594                       expected_error=KDC_ERR_POLICY)
595
596         armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
597         self._get_tgt(client_creds, armor_tgt=armor_tgt,
598                       expected_error=KDC_ERR_POLICY)
599
600         armor_tgt = self.get_tgt(self._member_of_enforced_silo)
601         self._get_tgt(client_creds, armor_tgt=armor_tgt,
602                       expected_error=0)
603
604     def test_allowed_from_enforced_silo_not_equals(self):
605         # Create an authentication policy that allows accounts not belonging to
606         # the enforced silo.
607         policy = self.create_authn_policy(
608             enforced=True,
609             user_allowed_from=(
610                 f'O:SYD:(XA;;CR;;;WD;'
611                 f'(@User.ad://ext/AuthenticationSilo != '
612                 f'"{self._enforced_silo}"))'),
613         )
614
615         # Create a user account with the assigned policy.
616         client_creds = self._get_creds(account_type=self.AccountType.USER,
617                                        assigned_policy=policy)
618
619         # Show that authentication always fails, as none of the machine
620         # accounts belong to a silo that is not the enforced one. (The
621         # unenforced silo doesn’t count, as it will never appear in a claim.)
622
623         armor_tgt = self.get_tgt(self._mach_creds)
624         self._get_tgt(client_creds, armor_tgt=armor_tgt,
625                       expected_error=KDC_ERR_POLICY)
626
627         armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
628         self._get_tgt(client_creds, armor_tgt=armor_tgt,
629                       expected_error=KDC_ERR_POLICY)
630
631         armor_tgt = self.get_tgt(self._member_of_enforced_silo)
632         self._get_tgt(client_creds, armor_tgt=armor_tgt,
633                       expected_error=KDC_ERR_POLICY)
634
635     def test_allowed_from_unenforced_silo_equals_deny(self):
636         # Create an authentication policy that denies accounts belonging to the
637         # unenforced silo, and allows other accounts.
638         policy = self.create_authn_policy(
639             enforced=True,
640             user_allowed_from=(
641                 f'O:SYD:(XD;;CR;;;WD;'
642                 f'(@User.ad://ext/AuthenticationSilo == '
643                 f'"{self._unenforced_silo}"))'
644                 f'(A;;CR;;;WD)'),
645         )
646
647         # Create a user account with the assigned policy.
648         client_creds = self._get_creds(account_type=self.AccountType.USER,
649                                        assigned_policy=policy)
650
651         # Show that authentication fails unless the account belongs to a silo
652         # other than the unenforced silo.
653
654         armor_tgt = self.get_tgt(self._mach_creds)
655         self._get_tgt(client_creds, armor_tgt=armor_tgt,
656                       expected_error=KDC_ERR_POLICY)
657
658         armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
659         self._get_tgt(client_creds, armor_tgt=armor_tgt,
660                       expected_error=KDC_ERR_POLICY)
661
662         armor_tgt = self.get_tgt(self._member_of_enforced_silo)
663         self._get_tgt(client_creds, armor_tgt=armor_tgt,
664                       expected_error=0)
665
666     def test_allowed_from_enforced_silo_equals_deny(self):
667         # Create an authentication policy that denies accounts belonging to the
668         # enforced silo, and allows other accounts.
669         policy = self.create_authn_policy(
670             enforced=True,
671             user_allowed_from=(
672                 f'O:SYD:(XD;;CR;;;WD;'
673                 f'(@User.ad://ext/AuthenticationSilo == '
674                 f'"{self._enforced_silo}"))'
675                 f'(A;;CR;;;WD)'),
676         )
677
678         # Create a user account with the assigned policy.
679         client_creds = self._get_creds(account_type=self.AccountType.USER,
680                                        assigned_policy=policy)
681
682         # Show that authentication always fails, as none of the machine
683         # accounts belong to a silo that is not the enforced one. (The
684         # unenforced silo doesn’t count, as it will never appear in a claim.)
685
686         armor_tgt = self.get_tgt(self._mach_creds)
687         self._get_tgt(client_creds, armor_tgt=armor_tgt,
688                       expected_error=KDC_ERR_POLICY)
689
690         armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
691         self._get_tgt(client_creds, armor_tgt=armor_tgt,
692                       expected_error=KDC_ERR_POLICY)
693
694         armor_tgt = self.get_tgt(self._member_of_enforced_silo)
695         self._get_tgt(client_creds, armor_tgt=armor_tgt,
696                       expected_error=KDC_ERR_POLICY)
697
698     def test_allowed_from_unenforced_silo_not_equals_deny(self):
699         # Create an authentication policy that denies accounts not belonging to
700         # the unenforced silo, and allows other accounts.
701         policy = self.create_authn_policy(
702             enforced=True,
703             user_allowed_from=(
704                 f'O:SYD:(XD;;CR;;;WD;'
705                 f'(@User.ad://ext/AuthenticationSilo != '
706                 f'"{self._unenforced_silo}"))'
707                 f'(A;;CR;;;WD)'),
708         )
709
710         # Create a user account with the assigned policy.
711         client_creds = self._get_creds(account_type=self.AccountType.USER,
712                                        assigned_policy=policy)
713
714         # Show that authentication always fails, as the unenforced silo will
715         # never appear in a claim.
716
717         armor_tgt = self.get_tgt(self._mach_creds)
718         self._get_tgt(client_creds, armor_tgt=armor_tgt,
719                       expected_error=KDC_ERR_POLICY)
720
721         armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
722         self._get_tgt(client_creds, armor_tgt=armor_tgt,
723                       expected_error=KDC_ERR_POLICY)
724
725         armor_tgt = self.get_tgt(self._member_of_enforced_silo)
726         self._get_tgt(client_creds, armor_tgt=armor_tgt,
727                       expected_error=KDC_ERR_POLICY)
728
729     def test_allowed_from_enforced_silo_not_equals_deny(self):
730         # Create an authentication policy that denies accounts not belonging to
731         # the enforced silo, and allows other accounts.
732         policy = self.create_authn_policy(
733             enforced=True,
734             user_allowed_from=(
735                 f'O:SYD:(XD;;CR;;;WD;'
736                 f'(@User.ad://ext/AuthenticationSilo != '
737                 f'"{self._enforced_silo}"))'
738                 f'(A;;CR;;;WD)'),
739         )
740
741         # Create a user account with the assigned policy.
742         client_creds = self._get_creds(account_type=self.AccountType.USER,
743                                        assigned_policy=policy)
744
745         # Show that authentication fails unless the account belongs to the
746         # enforced silo.
747
748         armor_tgt = self.get_tgt(self._mach_creds)
749         self._get_tgt(client_creds, armor_tgt=armor_tgt,
750                       expected_error=KDC_ERR_POLICY)
751
752         armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
753         self._get_tgt(client_creds, armor_tgt=armor_tgt,
754                       expected_error=KDC_ERR_POLICY)
755
756         armor_tgt = self.get_tgt(self._member_of_enforced_silo)
757         self._get_tgt(client_creds, armor_tgt=armor_tgt,
758                       expected_error=0)
759
760     def test_allowed_from_claim_equals_claim(self):
761         # Create a couple of claim types.
762
763         claim0_id = self.get_new_username()
764         self.create_claim(claim0_id,
765                           enabled=True,
766                           attribute='carLicense',
767                           single_valued=True,
768                           source_type='AD',
769                           for_classes=['computer'],
770                           value_type=claims.CLAIM_TYPE_STRING)
771
772         claim1_id = self.get_new_username()
773         self.create_claim(claim1_id,
774                           enabled=True,
775                           attribute='comment',
776                           single_valued=True,
777                           source_type='AD',
778                           for_classes=['computer'],
779                           value_type=claims.CLAIM_TYPE_STRING)
780
781         # Create an authentication policy that allows accounts having the two
782         # claims be equal.
783         policy = self.create_authn_policy(
784             enforced=True,
785             user_allowed_from=(
786                 f'O:SYD:(XA;;CR;;;WD;'
787                 f'(@User.{claim0_id} == @User.{claim1_id}))'),
788         )
789
790         # Create a user account with the assigned policy.
791         client_creds = self._get_creds(account_type=self.AccountType.USER,
792                                        assigned_policy=policy)
793
794         armor_tgt = self.get_tgt(self._mach_creds)
795         self._get_tgt(client_creds, armor_tgt=armor_tgt,
796                       expected_error=KDC_ERR_POLICY)
797
798         mach_creds = self.get_cached_creds(
799             account_type=self.AccountType.COMPUTER,
800             opts={
801                 'additional_details': (
802                     ('carLicense', 'foo'),
803                     ('comment', 'foo'),
804                 ),
805             })
806         armor_tgt = self.get_tgt(
807             mach_creds,
808             expect_client_claims=True,
809             expected_client_claims={
810                 claim0_id: {
811                     'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
812                     'type': claims.CLAIM_TYPE_STRING,
813                     'values': ('foo',),
814                 },
815                 claim1_id: {
816                     'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
817                     'type': claims.CLAIM_TYPE_STRING,
818                     'values': ('foo',),
819                 },
820             })
821         self._get_tgt(client_creds, armor_tgt=armor_tgt,
822                       expected_error=0)
823
824     def test_allowed_to_client_equals(self):
825         client_claim_attr = 'carLicense'
826         client_claim_value = 'foo bar'
827         client_claim_values = client_claim_value,
828
829         client_claim_id = self.get_new_username()
830         self.create_claim(client_claim_id,
831                           enabled=True,
832                           attribute=client_claim_attr,
833                           single_valued=True,
834                           source_type='AD',
835                           for_classes=['user'],
836                           value_type=claims.CLAIM_TYPE_STRING)
837
838         # Create an authentication policy that allows authorization if the
839         # client has a particular claim value.
840         policy = self.create_authn_policy(
841             enforced=True,
842             computer_allowed_to=(
843                 f'O:SYD:(XA;;CR;;;WD;'
844                 f'((@User.{client_claim_id} == "{client_claim_value}")))'),
845         )
846
847         # Create a computer account with the assigned policy.
848         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
849                                        assigned_policy=policy)
850
851         armor_tgt = self.get_tgt(self._mach_creds)
852
853         # Create a user account without the claim value.
854         client_creds = self.get_cached_creds(
855             account_type=self.AccountType.USER)
856         tgt = self.get_tgt(client_creds)
857         # Show that obtaining a service ticket is denied.
858         self._tgs_req(
859             tgt, KDC_ERR_POLICY, client_creds, target_creds,
860             armor_tgt=armor_tgt,
861             expect_edata=self.expect_padata_outer,
862             # We aren’t particular about whether or not we get an NTSTATUS.
863             expect_status=None,
864             expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
865             check_patypes=False)
866
867         # Create a user account with the claim value.
868         client_creds = self.get_cached_creds(
869             account_type=self.AccountType.USER,
870             opts={
871                 'additional_details': (
872                     (client_claim_attr, client_claim_values),
873                 ),
874             })
875         tgt = self.get_tgt(
876             client_creds,
877             expect_client_claims=True,
878             expected_client_claims={
879                 client_claim_id: {
880                     'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
881                     'type': claims.CLAIM_TYPE_STRING,
882                     'values': client_claim_values,
883                 },
884             })
885         # Show that obtaining a service ticket is allowed.
886         self._tgs_req(tgt, 0, client_creds, target_creds,
887                       armor_tgt=armor_tgt)
888
889     def test_allowed_to_device_equals(self):
890         device_claim_attr = 'carLicense'
891         device_claim_value = 'bar'
892         device_claim_values = device_claim_value,
893
894         device_claim_id = self.get_new_username()
895         self.create_claim(device_claim_id,
896                           enabled=True,
897                           attribute=device_claim_attr,
898                           single_valued=True,
899                           source_type='AD',
900                           for_classes=['computer'],
901                           value_type=claims.CLAIM_TYPE_STRING)
902
903         # Create a user account.
904         client_creds = self.get_cached_creds(
905             account_type=self.AccountType.USER)
906         tgt = self.get_tgt(client_creds)
907
908         # Create an authentication policy that allows authorization if the
909         # device has a particular claim value.
910         policy = self.create_authn_policy(
911             enforced=True,
912             computer_allowed_to=(
913                 f'O:SYD:(XA;;CR;;;WD;'
914                 f'(@Device.{device_claim_id} == "{device_claim_value}"))'),
915         )
916
917         # Create a computer account with the assigned policy.
918         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
919                                        assigned_policy=policy)
920
921         armor_tgt = self.get_tgt(self._mach_creds)
922         # Show that obtaining a service ticket is denied when the claim value
923         # is not present.
924         self._tgs_req(
925             tgt, KDC_ERR_POLICY, client_creds, target_creds,
926             armor_tgt=armor_tgt,
927             expect_edata=self.expect_padata_outer,
928             # We aren’t particular about whether or not we get an NTSTATUS.
929             expect_status=None,
930             expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
931             check_patypes=False)
932
933         mach_creds = self.get_cached_creds(
934             account_type=self.AccountType.COMPUTER,
935             opts={
936                 'additional_details': (
937                     (device_claim_attr, device_claim_values),
938                 ),
939             })
940         armor_tgt = self.get_tgt(
941             mach_creds,
942             expect_client_claims=True,
943             expected_client_claims={
944                 device_claim_id: {
945                     'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
946                     'type': claims.CLAIM_TYPE_STRING,
947                     'values': device_claim_values,
948                 },
949             })
950         # Show that obtaining a service ticket is allowed when the claim value
951         # is present.
952         self._tgs_req(tgt, 0, client_creds, target_creds,
953                       armor_tgt=armor_tgt)
954
955     claim_against_claim_cases = [
956         # If either side is missing, the result is unknown.
957         ((), '==', (), None),
958         ((), '!=', (), None),
959         ('a', '==', (), None),
960         ((), '==', 'b', None),
961         # Straightforward equality and inequality checks work.
962         ('foo', '==', 'foo', True),
963         ('foo', '==', 'bar', False),
964         ('foo', '!=', 'foo', False),
965         ('foo', '!=', 'bar', True),
966         # We can perform less‐than and greater‐than operations.
967         ('cat', '<', 'dog', True),
968         ('cat', '<=', 'dog', True),
969         ('cat', '>', 'dog', False),
970         ('cat', '>=', 'dog', False),
971         ('foo', '<=', 'foo', True),
972         ('foo', '>=', 'foo', True),
973         ('foo', '<', 'foo bar', True),
974         ('foo bar', '>', 'foo', True),
975         # String comparison is case‐sensitive.
976         ('foo bar', '==', 'Foo BAR', True),
977         ('foo bar', '==', 'FOO BAR', True),
978         ('ćàț', '==', 'ĆÀȚ', True),
979         ('ḽ', '==', 'Ḽ', True),
980         ('ⅸ', '==', 'Ⅸ', True),
981         ('ꙭ', '==', 'Ꙭ', True),
982         ('ⱦ', '==', 'Ⱦ', True),  # Lowercased variant added in Unicode 5.0.
983         ('ԛԣ', '==', 'ԚԢ', True),  # All added in Unicode 5.1.
984         ('foo', '<', 'foo', True),
985         ('ćàș', '<', 'ĆÀȚ', True),
986         ('cat', '<', 'ćàț', True),
987         # This is done by converting to UPPER CASE. Hence, both ‘A’ (U+41) and
988         # ‘a’ (U+61) compare less than ‘_’ (U+5F).
989         ('A', '<', '_', True),
990         ('a', '<', '_', True),
991         # But not all uppercased/lowercased pairs are considered to be equal in
992         # this way.
993         ('ß', '<', 'ẞ', True),
994         ('ß', '>', 'SS', True),
995         ('ⳬ', '>', 'Ⳬ', True),  # Added in Unicode 5.2.
996         ('ʞ', '<', 'Ʞ', True),  # Uppercased variant added in Unicode 6.0.
997         ('ʞ', '<', 'ʟ', True),  # U+029E < U+029F < U+A7B0 (upper variant, Ʞ)
998         ('ꞧ', '>', 'Ꞧ', True),  # Added in Unicode 6.0.
999         ('ɜ', '<', 'Ɜ', True),  # Uppercased variant added in Unicode 7.0.
1000         #
1001         # Strings are compared as UTF‐16 code units, rather than as Unicode
1002         # codepoints. So while you might expect ‘𐀀’ (U+10000) to compare
1003         # greater than ‘豈’ (U+F900), it is actually considered to be the
1004         # *smaller* of the pair. That is because it is encoded as a sequence of
1005         # two code units, 0xd800 and 0xdc00, which combination compares less
1006         # than the single code unit 0xf900.
1007         ('ퟻ', '<', '𐀀', True),
1008         ('𐀀', '<', '豈', True),
1009         ('ퟻ', '<', '豈', True),
1010         # Composites can be compared.
1011         (('foo', 'bar'), '==', ('foo', 'bar'), True),
1012         (('foo', 'bar'), '==', ('foo', 'baz'), False),
1013         # The individual components don’t have to match in case.
1014         (('foo', 'bar'), '==', ('FOO', 'BAR'), True),
1015         # Nor must they match in order.
1016         (('foo', 'bar'), '==', ('bar', 'foo'), True),
1017         # Composites of different lengths compare unequal.
1018         (('foo', 'bar'), '!=', 'foo', True),
1019         (('foo', 'bar'), '!=', ('foo', 'bar', 'baz'), True),
1020         # But composites don’t have a defined ordering, and aren’t considered
1021         # greater or lesser than one another.
1022         (('foo', 'bar'), '<', ('foo', 'bar'), None),
1023         (('foo', 'bar'), '<=', ('foo', 'bar'), None),
1024         (('foo', 'bar'), '>', ('foo', 'bar', 'baz'), None),
1025         (('foo', 'bar'), '>=', ('foo', 'bar', 'baz'), None),
1026         # We can test for containment.
1027         (('foo', 'bar'), 'Contains', ('FOO'), True),
1028         (('foo', 'bar'), 'Contains', ('foo', 'bar'), True),
1029         (('foo', 'bar'), 'Not_Contains', ('foo', 'bar'), False),
1030         (('foo', 'bar'), 'Contains', ('foo', 'bar', 'baz'), False),
1031         (('foo', 'bar'), 'Not_Contains', ('foo', 'bar', 'baz'), True),
1032         # We can test whether the operands have any elements in common.
1033         ('foo', 'Any_of', 'foo', True),
1034         (('foo', 'bar'), 'Any_of', 'BAR', True),
1035         (('foo', 'bar'), 'Any_of', 'baz', False),
1036         (('foo', 'bar'), 'Not_Any_of', 'baz', True),
1037         (('foo', 'bar'), 'Any_of', ('bar', 'baz'), True),
1038         (('foo', 'bar'), 'Not_Any_of', ('bar', 'baz'), False),
1039     ]
1040
1041     claim_against_literal_cases = [
1042         # String comparisons also work against literals.
1043         ('foo bar', '==', '"foo bar"', True),
1044         # Composites can be compared with literals.
1045         ('bar', '==', '{{"bar"}}', True),
1046         (('apple', 'banana'), '==', '{{"APPLE", "BANANA"}}', True),
1047         (('apple', 'banana'), '==', '{{"BANANA", "APPLE"}}', True),
1048         (('apple', 'banana'), '==', '{{"apple", "banana", "apple"}}', False),
1049         # We can test for containment.
1050         ((), 'Contains', '{{"foo"}}', None),
1051         ((), 'Not_Contains', '{{"foo", "bar"}}', None),
1052         ('bar', 'Contains', '{{"bar"}}', True),
1053         (('foo', 'bar'), 'Contains', '{{"foo", "bar"}}', True),
1054         (('foo', 'bar'), 'Contains', '{{"foo", "bar", "baz"}}', False),
1055         # The right‐hand side of Contains or Not_Contains does not have to be a
1056         # composite.
1057         ('foo', 'Contains', '"foo"', True),
1058         (('foo', 'bar'), 'Not_Contains', '"foo"', False),
1059         # It’s fine if the right‐hand side contains duplicate elements.
1060         (('foo', 'bar'), 'Contains', '{{"foo", "bar", "bar"}}', True),
1061         # We can test whether the operands have any elements in common.
1062         ('bar', 'Any_of', '{{"bar"}}', True),
1063         (('foo', 'bar'), 'Any_of', '{{"bar", "baz"}}', True),
1064         (('foo', 'bar'), 'Any_of', '{{"baz"}}', False),
1065         # The right‐hand side of Any_of or Not_Any_of must be a composite.
1066         ('foo', 'Any_of', '"foo"', None),
1067         (('foo', 'bar'), 'Not_Any_of', '"baz"', None),
1068         # A string won’t compare equal to a numeric literal.
1069         ('42', '==', '"42"', True),
1070         ('42', '==', '42', None),
1071         # Nor can composites that mismatch in type be compared.
1072         (('123', '456'), '==', '{{"123", "456"}}', True),
1073         (('654', '321'), '==', '{{654, 321}}', None),
1074         (('foo', 'bar'), 'Contains', '{{1, 2, 3}}', None),
1075     ]
1076
1077     def _test_cmp_with_args(self, lhs, op, rhs, outcome, rhs_is_literal=False):
1078         # Construct a conditional ACE expression that evaluates to True if the
1079         # two claim values are equal.
1080         if rhs_is_literal:
1081             self.assertIsInstance(rhs, str)
1082             rhs = rhs.format(self=self)
1083             expression = f'(@User.{self.claim0_id} {op} {rhs})'
1084         else:
1085             expression = f'(@User.{self.claim0_id} {op} @User.{self.claim1_id})'
1086
1087         # Create an authentication policy that will allow authentication when
1088         # the expression is true, and a second that will deny authentication in
1089         # the same circumstance. By observing the results of authenticating
1090         # against each of these policies in turn, we can determine whether the
1091         # expression evaluates to a True, False, or Unknown value.
1092
1093         allowed_sddl = f'O:SYD:(XA;;CR;;;WD;{expression})'
1094         denied_sddl = f'O:SYD:(XD;;CR;;;WD;{expression})(A;;CR;;;WD)'
1095
1096         allowed_policy = self.create_authn_policy(
1097             enforced=True,
1098             user_allowed_from=allowed_sddl)
1099         denied_policy = self.create_authn_policy(
1100             enforced=True,
1101             user_allowed_from=denied_sddl)
1102
1103         # Create a user account assigned to each policy.
1104         allowed_creds = self._get_creds(account_type=self.AccountType.USER,
1105                                         assigned_policy=allowed_policy)
1106         denied_creds = self._get_creds(account_type=self.AccountType.USER,
1107                                        assigned_policy=denied_policy)
1108
1109         additional_details = ()
1110         if lhs:
1111             additional_details += ((self.claim0_attr, lhs),)
1112         if rhs and not rhs_is_literal:
1113             additional_details += ((self.claim1_attr, rhs),)
1114
1115         # Create a computer account with the provided attribute values.
1116         mach_creds = self.get_cached_creds(
1117             account_type=self.AccountType.COMPUTER,
1118             opts={'additional_details': additional_details})
1119
1120         def expected_values(val):
1121             if isinstance(val, (str, bytes)):
1122                 return val,
1123
1124             return val
1125
1126         expected_client_claims = {}
1127         if lhs:
1128             expected_client_claims[self.claim0_id] = {
1129                 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
1130                 'type': claims.CLAIM_TYPE_STRING,
1131                 'values': expected_values(lhs),
1132             }
1133         if rhs and not rhs_is_literal:
1134             expected_client_claims[self.claim1_id] = {
1135                 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
1136                 'type': claims.CLAIM_TYPE_STRING,
1137                 'values': expected_values(rhs),
1138             }
1139
1140         # Fetch the computer account’s TGT, and ensure it contains the claims.
1141         armor_tgt = self.get_tgt(
1142             mach_creds,
1143             expect_client_claims=bool(expected_client_claims) or None,
1144             expected_client_claims=expected_client_claims)
1145
1146         # The first or the second authentication request is expected to succeed
1147         # if the outcome is True or False, respectively. An Unknown outcome,
1148         # represented by None, will result in a policy error in either case.
1149         allowed_error = 0 if outcome is True else KDC_ERR_POLICY
1150         denied_error = 0 if outcome is False else KDC_ERR_POLICY
1151
1152         # Attempt to authenticate and ensure that we observe the expected
1153         # results.
1154         self._get_tgt(allowed_creds, armor_tgt=armor_tgt,
1155                       expected_error=allowed_error)
1156         self._get_tgt(denied_creds, armor_tgt=armor_tgt,
1157                       expected_error=denied_error)
1158
1159     pac_claim_cases = [
1160         # Test a very simple expression with various claims.
1161         ([
1162             (claims.CLAIMS_SOURCE_TYPE_AD, [
1163                 ('{non_empty_string}', claims.CLAIM_TYPE_STRING, ['foo bar']),
1164             ]),
1165         ], '{non_empty_string}', True),
1166         ([
1167             (claims.CLAIMS_SOURCE_TYPE_AD, [
1168                 ('{zero_uint}', claims.CLAIM_TYPE_UINT64, [0]),
1169             ]),
1170         ], '{zero_uint}', False),
1171         ([
1172             (claims.CLAIMS_SOURCE_TYPE_AD, [
1173                 ('{nonzero_uint}', claims.CLAIM_TYPE_UINT64, [1]),
1174             ]),
1175         ], '{nonzero_uint}', True),
1176         ([
1177             (claims.CLAIMS_SOURCE_TYPE_AD, [
1178                 ('{zero_uints}', claims.CLAIM_TYPE_UINT64, [0, 0]),
1179             ]),
1180         ], '{zero_uints}', KDC_ERR_GENERIC),
1181         ([
1182             (claims.CLAIMS_SOURCE_TYPE_AD, [
1183                 ('{zero_and_one_uint}', claims.CLAIM_TYPE_UINT64, [0, 1]),
1184             ]),
1185         ], '{zero_and_one_uint}', True),
1186         ([
1187             (claims.CLAIMS_SOURCE_TYPE_AD, [
1188                 ('{one_and_zero_uint}', claims.CLAIM_TYPE_UINT64, [1, 0]),
1189             ]),
1190         ], '{one_and_zero_uint}', True),
1191         ([
1192             (claims.CLAIMS_SOURCE_TYPE_AD, [
1193                 ('{zero_int}', claims.CLAIM_TYPE_INT64, [0]),
1194             ]),
1195         ], '{zero_int}', False),
1196         ([
1197             (claims.CLAIMS_SOURCE_TYPE_AD, [
1198                 ('{nonzero_int}', claims.CLAIM_TYPE_INT64, [1]),
1199             ]),
1200         ], '{nonzero_int}', True),
1201         ([
1202             (claims.CLAIMS_SOURCE_TYPE_AD, [
1203                 ('{zero_ints}', claims.CLAIM_TYPE_INT64, [0, 0]),
1204             ]),
1205         ], '{zero_ints}', KDC_ERR_GENERIC),
1206         ([
1207             (claims.CLAIMS_SOURCE_TYPE_AD, [
1208                 ('{zero_and_one_int}', claims.CLAIM_TYPE_INT64, [0, 1]),
1209             ]),
1210         ], '{zero_and_one_int}', True),
1211         ([
1212             (claims.CLAIMS_SOURCE_TYPE_AD, [
1213                 ('{one_and_zero_int}', claims.CLAIM_TYPE_INT64, [1, 0]),
1214             ]),
1215         ], '{one_and_zero_int}', True),
1216         ([
1217             (claims.CLAIMS_SOURCE_TYPE_AD, [
1218                 ('{false_boolean}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1219             ]),
1220         ], '{false_boolean}', False),
1221         ([
1222             (claims.CLAIMS_SOURCE_TYPE_AD, [
1223                 ('{true_boolean}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1224             ]),
1225         ], '{true_boolean}', True),
1226         ([
1227             (claims.CLAIMS_SOURCE_TYPE_AD, [
1228                 ('{false_booleans}', claims.CLAIM_TYPE_BOOLEAN, [0, 0]),
1229             ]),
1230         ], '{false_booleans}', KDC_ERR_GENERIC),
1231         ([
1232             (claims.CLAIMS_SOURCE_TYPE_AD, [
1233                 ('{false_and_true_boolean}', claims.CLAIM_TYPE_BOOLEAN, [0, 1]),
1234             ]),
1235         ], '{false_and_true_boolean}', True),
1236         ([
1237             (claims.CLAIMS_SOURCE_TYPE_AD, [
1238                 ('{true_and_false_boolean}', claims.CLAIM_TYPE_BOOLEAN, [1, 0]),
1239             ]),
1240         ], '{true_and_false_boolean}', True),
1241         # Test a basic comparison against a literal.
1242         ([
1243             (claims.CLAIMS_SOURCE_TYPE_AD, [
1244                 ('{a}', claims.CLAIM_TYPE_STRING, ['foo bar']),
1245             ]),
1246         ], '{a} == "foo bar"', True),
1247         # Claims can be compared against one another.
1248         ([
1249             (claims.CLAIMS_SOURCE_TYPE_AD, [
1250                 ('{a}', claims.CLAIM_TYPE_STRING, ['foo bar']),
1251                 ('{b}', claims.CLAIM_TYPE_STRING, ['FOO BAR']),
1252             ]),
1253         ], '{a} == {b}', True),
1254         ([
1255             (claims.CLAIMS_SOURCE_TYPE_AD, [
1256                 ('{b}', claims.CLAIM_TYPE_STRING, ['FOO', 'BAR', 'BAZ']),
1257                 ('{a}', claims.CLAIM_TYPE_STRING, ['foo', 'bar', 'baz']),
1258             ]),
1259         ], '{a} != {b}', False),
1260         # Certificate claims are also valid.
1261         ([
1262             (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
1263                 ('{a}', claims.CLAIM_TYPE_STRING, ['foo']),
1264             ]),
1265         ], '{a} == "foo"', True),
1266         # Other claim source types are ignored.
1267         ([
1268             (0, [
1269                 ('{a}', claims.CLAIM_TYPE_STRING, ['foo']),
1270             ]),
1271         ], '{a} == "foo"', None),
1272         ([
1273             (3, [
1274                 ('{a}', claims.CLAIM_TYPE_STRING, ['foo']),
1275             ]),
1276         ], '{a} == "foo"', None),
1277         # If multiple claims have the same ID, the *last* one takes precedence.
1278         ([
1279             (claims.CLAIMS_SOURCE_TYPE_AD, [
1280                 ('{a}', claims.CLAIM_TYPE_STRING, ['this is not the value…']),
1281                 ('{a}', claims.CLAIM_TYPE_STRING, ['…nor is this…']),
1282             ]),
1283             (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
1284                 ('{a}', claims.CLAIM_TYPE_STRING, ['…and this isn’t either.']),
1285             ]),
1286             (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
1287                 ('{a}', claims.CLAIM_TYPE_STRING, ['here’s the actual value!']),
1288             ]),
1289             (3, [
1290                 ('{a}', claims.CLAIM_TYPE_STRING, ['this is a red herring.']),
1291             ]),
1292         ], '{a} == "here’s the actual value!"', True),
1293         # Claim values can be empty.
1294         ([
1295             (claims.CLAIMS_SOURCE_TYPE_AD, [
1296                 ('{empty_claim_string}', claims.CLAIM_TYPE_STRING, []),
1297             ]),
1298         ], '{empty_claim_string} != "foo bar"', None),
1299         ([
1300             (claims.CLAIMS_SOURCE_TYPE_AD, [
1301                 ('{empty_claim_boolean}', claims.CLAIM_TYPE_BOOLEAN, []),
1302             ]),
1303         ], 'Exists {empty_claim_boolean}', None),
1304         # Test unsigned integer equality.
1305         ([
1306             (claims.CLAIMS_SOURCE_TYPE_AD, [
1307                 ('{a}', claims.CLAIM_TYPE_UINT64, [42]),
1308             ]),
1309         ], '{a} == 42', True),
1310         ([
1311             (claims.CLAIMS_SOURCE_TYPE_AD, [
1312                 ('{a}', claims.CLAIM_TYPE_UINT64, [0]),
1313             ]),
1314         ], '{a} == 3', False),
1315         ([
1316             (claims.CLAIMS_SOURCE_TYPE_AD, [
1317                 ('{a}', claims.CLAIM_TYPE_UINT64, [1, 2, 3]),
1318             ]),
1319         ], '{a} == {{1, 2, 3}}', True),
1320         ([
1321             (claims.CLAIMS_SOURCE_TYPE_AD, [
1322                 ('{a}', claims.CLAIM_TYPE_UINT64, [4, 5, 6]),
1323             ]),
1324         ], '{a} != {{1, 2, 3}}', True),
1325         # Test unsigned integer comparison. Ensure we don’t run into any
1326         # integer overflow issues.
1327         ([
1328             (claims.CLAIMS_SOURCE_TYPE_AD, [
1329                 ('{a}', claims.CLAIM_TYPE_UINT64, [1 << 32]),
1330             ]),
1331         ], '{a} > 0', True),
1332         # Test signed integer comparisons.
1333         ([
1334             (claims.CLAIMS_SOURCE_TYPE_AD, [
1335                 ('{a}', claims.CLAIM_TYPE_INT64, [42]),
1336             ]),
1337         ], '{a} == 42', True),
1338         ([
1339             (claims.CLAIMS_SOURCE_TYPE_AD, [
1340                 ('{a}', claims.CLAIM_TYPE_INT64, [42 << 32]),
1341             ]),
1342         ], f'{{a}} == {42 << 32}', True),
1343         # Test boolean claims. Be careful! Windows will *crash* if you send it
1344         # claims that aren’t real booleans (not 0 or 1). I doubt Microsoft will
1345         # consider this a security issue though.
1346         ([
1347             (claims.CLAIMS_SOURCE_TYPE_AD, [
1348                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [2]),
1349                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [3]),
1350             ]),
1351         ], '{a} == {b}', (None, CRASHES_WINDOWS)),
1352         ([
1353             (claims.CLAIMS_SOURCE_TYPE_AD, [
1354                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1355                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1356             ]),
1357         ], '{a} == {b}', True),
1358         ([
1359             (claims.CLAIMS_SOURCE_TYPE_AD, [
1360                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1361             ]),
1362         ], '{a} == 42', None),
1363         ([
1364             (claims.CLAIMS_SOURCE_TYPE_AD, [
1365                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1366                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1367             ]),
1368         ], '{a} && {b}', True),
1369         ([
1370             (claims.CLAIMS_SOURCE_TYPE_AD, [
1371                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1372                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1373             ]),
1374         ], '{a} && {b}', False),
1375         ([
1376             (claims.CLAIMS_SOURCE_TYPE_AD, [
1377                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1378                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1379             ]),
1380         ], '{a} && {b}', False),
1381         ([
1382             (claims.CLAIMS_SOURCE_TYPE_AD, [
1383                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1384                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1385             ]),
1386         ], '{a} || {b}', True),
1387         ([
1388             (claims.CLAIMS_SOURCE_TYPE_AD, [
1389                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1390                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1391             ]),
1392         ], '{a} || {b}', True),
1393         ([
1394             (claims.CLAIMS_SOURCE_TYPE_AD, [
1395                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1396                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1397             ]),
1398         ], '{a} || {b}', False),
1399         ([
1400             (claims.CLAIMS_SOURCE_TYPE_AD, [
1401                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1402             ]),
1403         ], '!({a})', True),
1404         ([
1405             (claims.CLAIMS_SOURCE_TYPE_AD, [
1406                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1407             ]),
1408         ], '!(!(!(!({a}))))', False),
1409         ([
1410             (claims.CLAIMS_SOURCE_TYPE_AD, [
1411                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1412             ]),
1413         ], '!({a} && {a})', True),
1414         ([
1415             (claims.CLAIMS_SOURCE_TYPE_AD, [
1416                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1417                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1418             ]),
1419         ], '{a} && !({b} || {b})', True),
1420         ([
1421             (claims.CLAIMS_SOURCE_TYPE_AD, [
1422                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1423             ]),
1424         ], '!({a}) || !({a})', True),
1425         ([
1426             (claims.CLAIMS_SOURCE_TYPE_AD, [
1427                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1428                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1429             ]),
1430         ], '{a} && !({b})', None),
1431         # Expressions containing the ‘not’ operator are occasionally evaluated
1432         # inconsistently, as evidenced here. ‘a || !a’ evaluates to ‘unknown’…
1433         ([
1434             (claims.CLAIMS_SOURCE_TYPE_AD, [
1435                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1436             ]),
1437         ], '{a} || !({a})', None),
1438         # …but ‘!a || a’ — the same expression, just with the operands switched
1439         # round — evaluates to ‘true’.
1440         ([
1441             (claims.CLAIMS_SOURCE_TYPE_AD, [
1442                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1443             ]),
1444         ], '!({a}) || {a}', True),
1445         # This inconsistency is not observed with other boolean expressions,
1446         # such as ‘a || a’.
1447         ([
1448             (claims.CLAIMS_SOURCE_TYPE_AD, [
1449                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1450             ]),
1451         ], '{a} || ({a} || {a})', True),
1452         ([
1453             (claims.CLAIMS_SOURCE_TYPE_AD, [
1454                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1455             ]),
1456         ], '({b} || {b}) || {b}', True),
1457         # Test a very large claim. Much larger than this, and
1458         # conditional_ace_encode_binary() will refuse to encode the conditions.
1459         ([
1460             (claims.CLAIMS_SOURCE_TYPE_AD, [
1461                 ('{large_claim}', claims.CLAIM_TYPE_STRING, ['z' * 4900]),
1462             ]),
1463         ], f'{{large_claim}} == "{"z" * 4900}"', True),
1464         # Test an even larger claim. Windows does not appear to like receiving
1465         # a claim this large.
1466         ([
1467             (claims.CLAIMS_SOURCE_TYPE_AD, [
1468                 ('{larger_claim}', claims.CLAIM_TYPE_STRING, ['z' * 100000]),
1469             ]),
1470         ], '{larger_claim} > "z"', (True, CRASHES_WINDOWS)),
1471         # Test a great number of claims. Windows does not appear to like
1472         # receiving this many claims.
1473         ([
1474             (claims.CLAIMS_SOURCE_TYPE_AD, [
1475                 ('{many_claims}', claims.CLAIM_TYPE_UINT64,
1476                  list(range(0, 100000))),
1477             ]),
1478         ], '{many_claims} Any_of "99999"', (True, CRASHES_WINDOWS)),
1479         # Test a claim with a very long name. Much larger than this, and
1480         # conditional_ace_encode_binary() will refuse to encode the conditions.
1481         ([
1482             (claims.CLAIMS_SOURCE_TYPE_AD, [
1483                 ('{long_name}', claims.CLAIM_TYPE_STRING, ['a']),
1484             ]),
1485         ], '{long_name} == "a"', {'long_name': 'z' * 4900}, True),
1486         # Test attribute name escaping.
1487         ([
1488             (claims.CLAIMS_SOURCE_TYPE_AD, [
1489                 ('{escaped_claim}', claims.CLAIM_TYPE_STRING, ['claim value']),
1490             ]),
1491         ], '{escaped_claim} == "claim value"',
1492            {'escaped_claim': '(:foo:! /&/ :bar:!)'}, True),
1493         # Test a claim whose name consists entirely of dots.
1494         ([
1495             (claims.CLAIMS_SOURCE_TYPE_AD, [
1496                 ('{dotty_claim}', claims.CLAIM_TYPE_STRING, ['a']),
1497             ]),
1498         ], '{dotty_claim} == "a"', {'dotty_claim': '...'}, True),
1499         # Test a claim whose name consists of the first thousand non‐zero
1500         # Unicode codepoints.
1501         ([
1502             (claims.CLAIMS_SOURCE_TYPE_AD, [
1503                 ('{1000_unicode}', claims.CLAIM_TYPE_STRING, ['a']),
1504             ]),
1505         ], '{1000_unicode} == "a"',
1506            {'1000_unicode': ''.join(map(chr, range(1, 1001)))}, True),
1507         # Test a claim whose name consists of some higher Unicode codepoints,
1508         # including non‐BMP ones.
1509         ([
1510             (claims.CLAIMS_SOURCE_TYPE_AD, [
1511                 ('{higher_unicode}', claims.CLAIM_TYPE_STRING, ['a']),
1512             ]),
1513         ], '{higher_unicode} == "a"',
1514            {'higher_unicode': ''.join(map(chr, range(0xfe00, 0x10800)))}, True),
1515         # Duplicate claim values are not allowed…
1516         ([
1517             (claims.CLAIMS_SOURCE_TYPE_AD, [
1518                 ('{a}', claims.CLAIM_TYPE_INT64, [42, 42, 42]),
1519             ]),
1520         ], '{a} == {a}', KDC_ERR_GENERIC),
1521         ([
1522             (claims.CLAIMS_SOURCE_TYPE_AD, [
1523                 ('{a}', claims.CLAIM_TYPE_UINT64, [42, 42]),
1524             ]),
1525         ], '{a} == {a}', KDC_ERR_GENERIC),
1526         ([
1527             (claims.CLAIMS_SOURCE_TYPE_AD, [
1528                 ('{a}', claims.CLAIM_TYPE_STRING, ['foo', 'foo']),
1529             ]),
1530         ], '{a} == {a}', KDC_ERR_GENERIC),
1531         ([
1532             (claims.CLAIMS_SOURCE_TYPE_AD, [
1533                 ('{a}', claims.CLAIM_TYPE_STRING, ['FOO', 'foo']),
1534             ]),
1535         ], '{a} == {a}', KDC_ERR_GENERIC),
1536         ([
1537             (claims.CLAIMS_SOURCE_TYPE_AD, [
1538                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0, 0]),
1539             ]),
1540         ], '{a} == {a}', KDC_ERR_GENERIC),
1541         # …but it’s OK if duplicate values are spread across multiple claim
1542         # entries.
1543         ([
1544             (claims.CLAIMS_SOURCE_TYPE_AD, [
1545                 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo']),
1546                 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo']),
1547             ]),
1548             (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
1549                 ('{dup}', claims.CLAIM_TYPE_UINT64, [42]),
1550                 ('{dup}', claims.CLAIM_TYPE_UINT64, [42]),
1551             ]),
1552             (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
1553                 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo']),
1554                 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo']),
1555                 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo', 'bar']),
1556                 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo', 'bar']),
1557             ]),
1558         ], '{dup} == {dup}', True),
1559         # Test invalid claim types. Be careful! Windows will *crash* if you
1560         # send it invalid claim types. I doubt Microsoft will consider this a
1561         # security issue though.
1562         ([
1563             (claims.CLAIMS_SOURCE_TYPE_AD, [
1564                 ('{invalid_sid}', 5, []),
1565             ]),
1566         ], '{invalid_sid} == {invalid_sid}', (None, CRASHES_WINDOWS)),
1567         ([
1568             (claims.CLAIMS_SOURCE_TYPE_AD, [
1569                 ('{invalid_octet_string}', 16, []),
1570             ]),
1571         ], '{invalid_octet_string} == {invalid_octet_string}', (None, CRASHES_WINDOWS)),
1572         # Sending an empty string will crash Windows.
1573         ([
1574             (claims.CLAIMS_SOURCE_TYPE_AD, [
1575                 ('{empty_string}', claims.CLAIM_TYPE_STRING, ['']),
1576             ]),
1577         ], '{empty_string}', (None, CRASHES_WINDOWS)),
1578         # But sending empty arrays is OK.
1579         ([
1580             (claims.CLAIMS_SOURCE_TYPE_AD, [
1581                 ('{empty_array}', claims.CLAIM_TYPE_INT64, []),
1582                 ('{empty_array}', claims.CLAIM_TYPE_UINT64, []),
1583                 ('{empty_array}', claims.CLAIM_TYPE_BOOLEAN, []),
1584                 ('{empty_array}', claims.CLAIM_TYPE_STRING, []),
1585             ]),
1586         ], '{empty_array}', None),
1587     ]
1588
1589     def _test_pac_claim_cmp_with_args(self,
1590                                       pac_claims,
1591                                       expression,
1592                                       claim_map,
1593                                       outcome):
1594         self.assertIsInstance(expression, str)
1595
1596         try:
1597             outcome, crashes_windows = outcome
1598             self.assertIs(crashes_windows, CRASHES_WINDOWS)
1599             if not self.crash_windows:
1600                 self.skipTest('test crashes Windows servers')
1601         except TypeError:
1602             self.assertIsNot(outcome, CRASHES_WINDOWS)
1603
1604         if claim_map is None:
1605             claim_map = {}
1606
1607         claim_ids = {}
1608
1609         def get_claim_id(claim_name):
1610             claim = claim_ids.get(claim_name)
1611             if claim is None:
1612                 claim = claim_map.pop(claim_name, None)
1613                 if claim is None:
1614                     claim = self.get_new_username()
1615
1616                 claim_ids[claim_name] = claim
1617
1618             return claim
1619
1620         def formatted_claim_expression(expr):
1621             formatter = Formatter()
1622             result = []
1623
1624             for literal_text, field_name, format_spec, conversion in (
1625                     formatter.parse(expr)):
1626                 self.assertFalse(format_spec,
1627                                  f'format specifier ({format_spec}) should '
1628                                  f'not be specified')
1629                 self.assertFalse(conversion,
1630                                  f'conversion ({conversion}) should not be '
1631                                  'specified')
1632
1633                 result.append(literal_text)
1634
1635                 if field_name is not None:
1636                     self.assertTrue(field_name,
1637                                     'a field name should be specified')
1638
1639                     claim_id = get_claim_id(field_name)
1640                     claim_id = self.escaped_claim_id(claim_id)
1641                     result.append(f'@User.{claim_id}')
1642
1643             return ''.join(result)
1644
1645         # Construct the conditional ACE expression.
1646         expression = formatted_claim_expression(expression)
1647
1648         self.assertFalse(claim_map, 'unused claim mapping(s) remain')
1649
1650         # Create an authentication policy that will allow authentication when
1651         # the expression is true, and a second that will deny authentication in
1652         # the same circumstance. By observing the results of authenticating
1653         # against each of these policies in turn, we can determine whether the
1654         # expression evaluates to a True, False, or Unknown value.
1655
1656         allowed_sddl = f'O:SYD:(XA;;CR;;;WD;({expression}))'
1657         denied_sddl = f'O:SYD:(XD;;CR;;;WD;({expression}))(A;;CR;;;WD)'
1658
1659         allowed_policy = self.create_authn_policy(
1660             enforced=True,
1661             user_allowed_from=allowed_sddl)
1662         denied_policy = self.create_authn_policy(
1663             enforced=True,
1664             user_allowed_from=denied_sddl)
1665
1666         # Create a user account assigned to each policy.
1667         allowed_creds = self._get_creds(account_type=self.AccountType.USER,
1668                                         assigned_policy=allowed_policy)
1669         denied_creds = self._get_creds(account_type=self.AccountType.USER,
1670                                        assigned_policy=denied_policy)
1671
1672         # Create a computer account.
1673         mach_creds = self.get_cached_creds(
1674             account_type=self.AccountType.COMPUTER)
1675
1676         def expected_values(val):
1677             if isinstance(val, (str, bytes)):
1678                 return val,
1679
1680             return val
1681
1682         # Fetch the computer account’s TGT.
1683         armor_tgt = self.get_tgt(mach_creds)
1684
1685         if pac_claims:
1686             # Replace the claims in the PAC with our own.
1687             armor_tgt = self.modified_ticket(
1688                 armor_tgt,
1689                 modify_pac_fn=partial(self.set_pac_claims,
1690                                       client_claims=pac_claims,
1691                                       claim_ids=claim_ids),
1692                 checksum_keys=self.get_krbtgt_checksum_key())
1693
1694         # The first or the second authentication request is expected to succeed
1695         # if the outcome is True or False, respectively. An Unknown outcome,
1696         # represented by None, will result in a policy error in either case.
1697         if outcome is True:
1698             allowed_error, denied_error = 0, KDC_ERR_POLICY
1699         elif outcome is False:
1700             allowed_error, denied_error = KDC_ERR_POLICY, 0
1701         elif outcome is None:
1702             allowed_error, denied_error = KDC_ERR_POLICY, KDC_ERR_POLICY
1703         else:
1704             allowed_error, denied_error = outcome, outcome
1705
1706         # Attempt to authenticate and ensure that we observe the expected
1707         # results.
1708         self._get_tgt(allowed_creds, armor_tgt=armor_tgt,
1709                       expected_error=allowed_error)
1710         self._get_tgt(denied_creds, armor_tgt=armor_tgt,
1711                       expected_error=denied_error)
1712
1713     def test_rbcd_without_aa_asserted_identity(self):
1714         service_sids = {
1715             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1716             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1717         }
1718
1719         self._rbcd(f'Member_of SID({self.aa_asserted_identity})',
1720                    service_sids=service_sids,
1721                    code=KDC_ERR_BADOPTION,
1722                    status=ntstatus.NT_STATUS_UNSUCCESSFUL,
1723                    edata=self.expect_padata_outer)
1724
1725         self._rbcd(target_policy=f'Member_of SID({self.aa_asserted_identity})',
1726                    service_sids=service_sids,
1727                    code=KDC_ERR_POLICY,
1728                    status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
1729                    event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
1730                    reason=AuditReason.ACCESS_DENIED,
1731                    edata=self.expect_padata_outer)
1732
1733     def test_rbcd_with_aa_asserted_identity(self):
1734         service_sids = {
1735             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1736             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1737             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1738         }
1739
1740         expected_groups = service_sids | {
1741             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
1742         }
1743
1744         self._rbcd(f'Member_of SID({self.aa_asserted_identity})',
1745                    service_sids=service_sids,
1746                    expected_groups=expected_groups)
1747
1748         self._rbcd(target_policy=f'Member_of SID({self.aa_asserted_identity})',
1749                    service_sids=service_sids,
1750                    expected_groups=expected_groups)
1751
1752     def test_rbcd_without_service_asserted_identity(self):
1753         service_sids = {
1754             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1755             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1756         }
1757
1758         self._rbcd(f'Member_of SID({self.service_asserted_identity})',
1759                    service_sids=service_sids,
1760                    code=KDC_ERR_BADOPTION,
1761                    status=ntstatus.NT_STATUS_UNSUCCESSFUL,
1762                    edata=self.expect_padata_outer)
1763
1764         self._rbcd(target_policy=f'Member_of SID({self.service_asserted_identity})',
1765                    service_sids=service_sids,
1766                    code=KDC_ERR_POLICY,
1767                    status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
1768                    event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
1769                    reason=AuditReason.ACCESS_DENIED,
1770                    edata=self.expect_padata_outer)
1771
1772     def test_rbcd_with_service_asserted_identity(self):
1773         service_sids = {
1774             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1775             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1776             (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1777         }
1778
1779         expected_groups = {
1780             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1781             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1782             # The Application Authority Asserted Identity SID has replaced the
1783             # Service Asserted Identity SID.
1784             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1785             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
1786         }
1787
1788         self._rbcd(f'Member_of SID({self.service_asserted_identity})',
1789                    service_sids=service_sids,
1790                    expected_groups=expected_groups)
1791
1792         self._rbcd(target_policy=f'Member_of SID({self.service_asserted_identity})',
1793                    service_sids=service_sids,
1794                    expected_groups=expected_groups)
1795
1796     def test_rbcd_without_claims_valid(self):
1797         service_sids = {
1798             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1799             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1800         }
1801
1802         self._rbcd(f'Member_of SID({security.SID_CLAIMS_VALID})',
1803                    service_sids=service_sids,
1804                    code=KDC_ERR_BADOPTION,
1805                    status=ntstatus.NT_STATUS_UNSUCCESSFUL,
1806                    edata=self.expect_padata_outer)
1807
1808         self._rbcd(target_policy=f'Member_of SID({security.SID_CLAIMS_VALID})',
1809                    service_sids=service_sids,
1810                    code=KDC_ERR_POLICY,
1811                    status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
1812                    event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
1813                    reason=AuditReason.ACCESS_DENIED,
1814                    edata=self.expect_padata_outer)
1815
1816     def test_rbcd_with_claims_valid(self):
1817         service_sids = {
1818             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1819             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1820             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
1821         }
1822
1823         expected_groups = service_sids | {
1824             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1825         }
1826
1827         self._rbcd(f'Member_of SID({security.SID_CLAIMS_VALID})',
1828                    service_sids=service_sids,
1829                    expected_groups=expected_groups)
1830
1831         self._rbcd(target_policy=f'Member_of SID({security.SID_CLAIMS_VALID})',
1832                    service_sids=service_sids,
1833                    expected_groups=expected_groups)
1834
1835     def test_rbcd_without_compounded_authentication(self):
1836         service_sids = {
1837             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1838             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1839         }
1840
1841         self._rbcd(f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1842                    service_sids=service_sids,
1843                    code=KDC_ERR_BADOPTION,
1844                    status=ntstatus.NT_STATUS_UNSUCCESSFUL,
1845                    edata=self.expect_padata_outer)
1846
1847         self._rbcd(target_policy=f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1848                    service_sids=service_sids,
1849                    code=KDC_ERR_POLICY,
1850                    status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
1851                    event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
1852                    reason=AuditReason.ACCESS_DENIED,
1853                    edata=self.expect_padata_outer)
1854
1855     def test_rbcd_with_compounded_authentication(self):
1856         service_sids = {
1857             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1858             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1859             (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
1860         }
1861
1862         expected_groups = {
1863             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1864             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1865             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1866             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
1867         }
1868
1869         self._rbcd(f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1870                    service_sids=service_sids,
1871                    expected_groups=expected_groups)
1872
1873         self._rbcd(target_policy=f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1874                    service_sids=service_sids,
1875                    expected_groups=expected_groups)
1876
1877     def test_rbcd_client_without_aa_asserted_identity(self):
1878         client_sids = {
1879             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1880             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1881         }
1882
1883         self._rbcd(f'Member_of SID({self.aa_asserted_identity})',
1884                    client_sids=client_sids)
1885
1886         self._rbcd(target_policy=f'Member_of SID({self.aa_asserted_identity})',
1887                    client_sids=client_sids)
1888
1889     def test_rbcd_client_with_aa_asserted_identity(self):
1890         client_sids = {
1891             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1892             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1893             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1894         }
1895
1896         self._rbcd(f'Member_of SID({self.aa_asserted_identity})',
1897                    client_sids=client_sids,
1898                    expected_groups=client_sids)
1899
1900         self._rbcd(target_policy=f'Member_of SID({self.aa_asserted_identity})',
1901                    client_sids=client_sids,
1902                    expected_groups=client_sids)
1903
1904     def test_rbcd_client_without_service_asserted_identity(self):
1905         client_sids = {
1906             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1907             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1908         }
1909
1910         self._rbcd(f'Member_of SID({self.service_asserted_identity})',
1911                    client_sids=client_sids,
1912                    code=KDC_ERR_BADOPTION,
1913                    status=ntstatus.NT_STATUS_UNSUCCESSFUL,
1914                    edata=self.expect_padata_outer)
1915
1916         self._rbcd(target_policy=f'Member_of SID({self.service_asserted_identity})',
1917                    client_sids=client_sids,
1918                    code=KDC_ERR_POLICY,
1919                    status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
1920                    event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
1921                    reason=AuditReason.ACCESS_DENIED,
1922                    edata=self.expect_padata_outer)
1923
1924     def test_rbcd_client_with_service_asserted_identity(self):
1925         client_sids = {
1926             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1927             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1928             (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1929         }
1930
1931         self._rbcd(f'Not_Member_of SID({self.service_asserted_identity})',
1932                    client_sids=client_sids,
1933                    expected_groups=client_sids)
1934
1935         self._rbcd(target_policy=f'Not_Member_of SID({self.service_asserted_identity})',
1936                    client_sids=client_sids,
1937                    expected_groups=client_sids)
1938
1939     def test_rbcd_client_without_claims_valid(self):
1940         client_sids = {
1941             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1942             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1943         }
1944
1945         self._rbcd(f'Member_of SID({security.SID_CLAIMS_VALID})',
1946                    client_sids=client_sids)
1947
1948         self._rbcd(target_policy=f'Member_of SID({security.SID_CLAIMS_VALID})',
1949                    client_sids=client_sids)
1950
1951     def test_rbcd_client_with_claims_valid(self):
1952         client_sids = {
1953             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1954             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1955             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
1956         }
1957
1958         self._rbcd(f'Member_of SID({security.SID_CLAIMS_VALID})',
1959                    client_sids=client_sids,
1960                    expected_groups=client_sids)
1961
1962         self._rbcd(target_policy=f'Member_of SID({security.SID_CLAIMS_VALID})',
1963                    client_sids=client_sids,
1964                    expected_groups=client_sids)
1965
1966     def test_rbcd_client_without_compounded_authentication(self):
1967         client_sids = {
1968             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1969             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1970         }
1971
1972         self._rbcd(f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1973                    client_sids=client_sids,
1974                    code=KDC_ERR_BADOPTION,
1975                    status=ntstatus.NT_STATUS_UNSUCCESSFUL,
1976                    edata=self.expect_padata_outer)
1977
1978         self._rbcd(target_policy=f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1979                    client_sids=client_sids,
1980                    code=KDC_ERR_POLICY,
1981                    status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
1982                    event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
1983                    reason=AuditReason.ACCESS_DENIED,
1984                    edata=self.expect_padata_outer)
1985
1986     def test_rbcd_client_with_compounded_authentication(self):
1987         client_sids = {
1988             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1989             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1990             (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
1991         }
1992
1993         self._rbcd(f'Not_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1994                    client_sids=client_sids,
1995                    expected_groups=client_sids)
1996
1997         self._rbcd(target_policy=f'Not_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1998                    client_sids=client_sids,
1999                    expected_groups=client_sids)
2000
2001     def test_rbcd_device_without_aa_asserted_identity(self):
2002         device_sids = {
2003             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2004             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2005         }
2006
2007         self._rbcd(f'Device_Member_of SID({self.aa_asserted_identity})',
2008                    device_sids=device_sids,
2009                    code=KDC_ERR_BADOPTION,
2010                    status=ntstatus.NT_STATUS_UNSUCCESSFUL,
2011                    edata=self.expect_padata_outer)
2012
2013         self._rbcd(target_policy=f'Device_Member_of SID({self.aa_asserted_identity})',
2014                    device_sids=device_sids,
2015                    code=KDC_ERR_POLICY,
2016                    status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2017                    event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2018                    reason=AuditReason.ACCESS_DENIED,
2019                    edata=self.expect_padata_outer)
2020
2021     def test_rbcd_device_without_aa_asserted_identity_not_memberof(self):
2022         device_sids = {
2023             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2024             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2025         }
2026
2027         self._rbcd(f'Not_Device_Member_of SID({self.aa_asserted_identity})',
2028                    device_sids=device_sids)
2029
2030         self._rbcd(target_policy=f'Not_Device_Member_of SID({self.aa_asserted_identity})',
2031                    device_sids=device_sids)
2032
2033     def test_rbcd_device_with_aa_asserted_identity(self):
2034         device_sids = {
2035             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2036             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2037             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2038         }
2039
2040         self._rbcd(f'Device_Member_of SID({self.aa_asserted_identity})',
2041                    device_sids=device_sids)
2042
2043         self._rbcd(target_policy=f'Device_Member_of SID({self.aa_asserted_identity})',
2044                    device_sids=device_sids)
2045
2046     def test_rbcd_device_without_service_asserted_identity(self):
2047         device_sids = {
2048             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2049             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2050         }
2051
2052         self._rbcd(f'Device_Member_of SID({self.service_asserted_identity})',
2053                    device_sids=device_sids,
2054                    code=KDC_ERR_BADOPTION,
2055                    status=ntstatus.NT_STATUS_UNSUCCESSFUL,
2056                    edata=self.expect_padata_outer)
2057
2058         self._rbcd(target_policy=f'Device_Member_of SID({self.service_asserted_identity})',
2059                    device_sids=device_sids,
2060                    code=KDC_ERR_POLICY,
2061                    status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2062                    event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2063                    reason=AuditReason.ACCESS_DENIED,
2064                    edata=self.expect_padata_outer)
2065
2066     def test_rbcd_device_with_service_asserted_identity(self):
2067         device_sids = {
2068             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2069             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2070             (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2071         }
2072
2073         self._rbcd(f'Device_Member_of SID({self.service_asserted_identity})',
2074                    device_sids=device_sids)
2075
2076         self._rbcd(target_policy=f'Device_Member_of SID({self.service_asserted_identity})',
2077                    device_sids=device_sids)
2078
2079     def test_rbcd_device_without_claims_valid(self):
2080         device_sids = {
2081             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2082             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2083         }
2084
2085         self._rbcd(f'Device_Member_of SID({security.SID_CLAIMS_VALID})',
2086                    device_sids=device_sids,
2087                    code=KDC_ERR_BADOPTION,
2088                    status=ntstatus.NT_STATUS_UNSUCCESSFUL,
2089                    edata=self.expect_padata_outer)
2090
2091         self._rbcd(target_policy=f'Device_Member_of SID({security.SID_CLAIMS_VALID})',
2092                    device_sids=device_sids,
2093                    code=KDC_ERR_POLICY,
2094                    status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2095                    event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2096                    reason=AuditReason.ACCESS_DENIED,
2097                    edata=self.expect_padata_outer)
2098
2099     def test_rbcd_device_with_claims_valid(self):
2100         device_sids = {
2101             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2102             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2103             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2104         }
2105
2106         self._rbcd(f'Device_Member_of SID({security.SID_CLAIMS_VALID})',
2107                    device_sids=device_sids)
2108
2109         self._rbcd(target_policy=f'Device_Member_of SID({security.SID_CLAIMS_VALID})',
2110                    device_sids=device_sids)
2111
2112     def test_rbcd_device_without_compounded_authentication(self):
2113         device_sids = {
2114             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2115             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2116         }
2117
2118         self._rbcd(f'Device_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
2119                    device_sids=device_sids,
2120                    code=KDC_ERR_BADOPTION,
2121                    status=ntstatus.NT_STATUS_UNSUCCESSFUL,
2122                    edata=self.expect_padata_outer)
2123
2124         self._rbcd(target_policy=f'Device_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
2125                    device_sids=device_sids,
2126                    code=KDC_ERR_POLICY,
2127                    status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2128                    event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2129                    reason=AuditReason.ACCESS_DENIED,
2130                    edata=self.expect_padata_outer)
2131
2132     def test_rbcd_device_with_compounded_authentication(self):
2133         device_sids = {
2134             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2135             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2136             (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
2137         }
2138
2139         self._rbcd(f'Device_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
2140                    device_sids=device_sids)
2141
2142         self._rbcd(target_policy=f'Device_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
2143                    device_sids=device_sids)
2144
2145     def test_rbcd(self):
2146         self._rbcd('Member_of SID({service_sid})')
2147
2148     def test_rbcd_device_from_rodc(self):
2149         self._rbcd('Member_of SID({service_sid})',
2150                    device_from_rodc=True,
2151                    code=(0, CRASHES_WINDOWS))
2152
2153     def test_rbcd_service_from_rodc(self):
2154         self._rbcd('Member_of SID({service_sid})',
2155                    service_from_rodc=True)
2156
2157     def test_rbcd_device_and_service_from_rodc(self):
2158         self._rbcd('Member_of SID({service_sid})',
2159                    service_from_rodc=True,
2160                    device_from_rodc=True,
2161                    code=(0, CRASHES_WINDOWS))
2162
2163     def test_rbcd_client_from_rodc(self):
2164         self._rbcd('Member_of SID({service_sid})',
2165                    client_from_rodc=True)
2166
2167     def test_rbcd_client_and_device_from_rodc(self):
2168         self._rbcd('Member_of SID({service_sid})',
2169                    client_from_rodc=True,
2170                    device_from_rodc=True,
2171                    code=(0, CRASHES_WINDOWS))
2172
2173     def test_rbcd_client_and_service_from_rodc(self):
2174         self._rbcd('Member_of SID({service_sid})',
2175                    client_from_rodc=True,
2176                    service_from_rodc=True)
2177
2178     def test_rbcd_all_from_rodc(self):
2179         self._rbcd('Member_of SID({service_sid})',
2180                    client_from_rodc=True,
2181                    service_from_rodc=True,
2182                    device_from_rodc=True,
2183                    code=(0, CRASHES_WINDOWS))
2184
2185     def _rbcd(self,
2186               rbcd_expression=None,
2187               *,
2188               code=0,
2189               status=None,
2190               event=AuditEvent.OK,
2191               reason=AuditReason.NONE,
2192               edata=False,
2193               target_policy=None,
2194               client_from_rodc=False,
2195               service_from_rodc=False,
2196               device_from_rodc=False,
2197               client_sids=None,
2198               client_claims=None,
2199               service_sids=None,
2200               service_claims=None,
2201               device_sids=None,
2202               device_claims=None,
2203               expected_groups=None,
2204               expected_device_groups=None,
2205               expected_claims=None):
2206         try:
2207             code, crashes_windows = code
2208             self.assertIs(crashes_windows, CRASHES_WINDOWS)
2209             if not self.crash_windows:
2210                 self.skipTest('test crashes Windows servers')
2211         except TypeError:
2212             self.assertIsNot(code, CRASHES_WINDOWS)
2213
2214         samdb = self.get_samdb()
2215         functional_level = self.get_domain_functional_level(samdb)
2216
2217         if functional_level < dsdb.DS_DOMAIN_FUNCTION_2008:
2218             self.skipTest('RBCD requires FL2008')
2219
2220         domain_sid_str = samdb.get_domain_sid()
2221         domain_sid = security.dom_sid(domain_sid_str)
2222
2223         client_creds = self.get_cached_creds(
2224             account_type=self.AccountType.USER,
2225             opts={
2226                 'allowed_replication_mock': client_from_rodc,
2227                 'revealed_to_mock_rodc': client_from_rodc,
2228             })
2229         client_sid = client_creds.get_sid()
2230
2231         client_username = client_creds.get_username()
2232         client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
2233                                                  names=[client_username])
2234
2235         client_tkt_options = 'forwardable'
2236         expected_flags = krb5_asn1.TicketFlags(client_tkt_options)
2237
2238         checksum_key = self.get_krbtgt_checksum_key()
2239
2240         if client_from_rodc or service_from_rodc or device_from_rodc:
2241             rodc_krbtgt_creds = self.get_mock_rodc_krbtgt_creds()
2242             rodc_krbtgt_key = self.TicketDecryptionKey_from_creds(rodc_krbtgt_creds)
2243             rodc_checksum_key = {
2244                 krb5pac.PAC_TYPE_KDC_CHECKSUM: rodc_krbtgt_key,
2245             }
2246
2247         client_tgt = self.get_tgt(client_creds,
2248                                   kdc_options=client_tkt_options,
2249                                   expected_flags=expected_flags)
2250
2251         # Create a machine account with which to perform FAST.
2252         mach_creds = self.get_cached_creds(
2253             account_type=self.AccountType.COMPUTER,
2254             opts={
2255                 'allowed_replication_mock': device_from_rodc,
2256                 'revealed_to_mock_rodc': device_from_rodc,
2257             })
2258         mach_tgt = self.get_tgt(mach_creds)
2259         device_modify_pac_fn = []
2260         if device_sids is not None:
2261             device_modify_pac_fn.append(partial(self.set_pac_sids,
2262                                                 new_sids=device_sids))
2263         if device_claims is not None:
2264             device_modify_pac_fn.append(partial(self.set_pac_claims,
2265                                                 client_claims=device_claims))
2266         mach_tgt = self.modified_ticket(
2267             mach_tgt,
2268             modify_pac_fn=device_modify_pac_fn,
2269             new_ticket_key=rodc_krbtgt_key if device_from_rodc else None,
2270             checksum_keys=rodc_checksum_key if device_from_rodc else checksum_key)
2271
2272         service_creds = self.get_cached_creds(
2273             account_type=self.AccountType.COMPUTER,
2274             opts={
2275                 'id': 1,
2276                 'allowed_replication_mock': service_from_rodc,
2277                 'revealed_to_mock_rodc': service_from_rodc,
2278             })
2279         service_tgt = self.get_tgt(service_creds)
2280
2281         service_modify_pac_fn = []
2282         if service_sids is not None:
2283             service_modify_pac_fn.append(partial(self.set_pac_sids,
2284                                                  new_sids=service_sids))
2285         if service_claims is not None:
2286             service_modify_pac_fn.append(partial(self.set_pac_claims,
2287                                                  client_claims=service_claims))
2288         service_tgt = self.modified_ticket(
2289             service_tgt,
2290             modify_pac_fn=service_modify_pac_fn,
2291             new_ticket_key=rodc_krbtgt_key if service_from_rodc else None,
2292             checksum_keys=rodc_checksum_key if service_from_rodc else checksum_key)
2293
2294         if target_policy is None:
2295             policy = None
2296             assigned_policy = None
2297         else:
2298             sddl = f'O:SYD:(XA;;CR;;;WD;({target_policy.format(service_sid=service_creds.get_sid())}))'
2299             policy = self.create_authn_policy(enforced=True,
2300                                               computer_allowed_to=sddl)
2301             assigned_policy = str(policy.dn)
2302
2303         if rbcd_expression is not None:
2304             sddl = f'O:SYD:(XA;;CR;;;WD;({rbcd_expression.format(service_sid=service_creds.get_sid())}))'
2305         else:
2306             sddl = 'O:SYD:(A;;CR;;;WD)'
2307         descriptor = security.descriptor.from_sddl(sddl, domain_sid)
2308         descriptor = ndr_pack(descriptor)
2309
2310         # Create a target account with the assigned policy.
2311         target_creds = self.get_cached_creds(
2312             account_type=self.AccountType.COMPUTER,
2313             opts={
2314                 'assigned_policy': assigned_policy,
2315                 'additional_details': (
2316                     ('msDS-AllowedToActOnBehalfOfOtherIdentity', descriptor),
2317                 ),
2318             })
2319
2320         client_service_tkt = self.get_service_ticket(
2321             client_tgt,
2322             service_creds,
2323             kdc_options=client_tkt_options,
2324             expected_flags=expected_flags)
2325         client_modify_pac_fn = []
2326         if client_sids is not None:
2327             client_modify_pac_fn.append(partial(self.set_pac_sids,
2328                                                 new_sids=client_sids))
2329         if client_claims is not None:
2330             client_modify_pac_fn.append(partial(self.set_pac_claims,
2331                                                 client_claims=client_claims))
2332         client_service_tkt = self.modified_ticket(client_service_tkt,
2333                                                   modify_pac_fn=client_modify_pac_fn,
2334                                                   checksum_keys=rodc_checksum_key if client_from_rodc else checksum_key)
2335
2336         kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt'))
2337
2338         target_decryption_key = self.TicketDecryptionKey_from_creds(
2339             target_creds)
2340         target_etypes = target_creds.tgs_supported_enctypes
2341
2342         service_name = service_creds.get_username()
2343         if service_name[-1] == '$':
2344             service_name = service_name[:-1]
2345         expected_transited_services = [
2346             f'host/{service_name}@{service_creds.get_realm()}'
2347         ]
2348
2349         expected_groups = self.map_sids(expected_groups, None, domain_sid_str)
2350         expected_device_groups = self.map_sids(expected_device_groups, None, domain_sid_str)
2351
2352         # Show that obtaining a service ticket with RBCD is allowed.
2353         self._tgs_req(service_tgt, code, service_creds, target_creds,
2354                       armor_tgt=mach_tgt,
2355                       kdc_options=kdc_options,
2356                       pac_options='1001',  # supports claims, RBCD
2357                       expected_cname=client_cname,
2358                       expected_account_name=client_username,
2359                       additional_ticket=client_service_tkt,
2360                       decryption_key=target_decryption_key,
2361                       expected_sid=client_sid,
2362                       expected_groups=expected_groups,
2363                       expect_device_info=bool(expected_device_groups) or None,
2364                       expected_device_domain_sid=domain_sid_str,
2365                       expected_device_groups=expected_device_groups,
2366                       expect_client_claims=bool(expected_claims) or None,
2367                       expected_client_claims=expected_claims,
2368                       expected_supported_etypes=target_etypes,
2369                       expected_proxy_target=target_creds.get_spn(),
2370                       expected_transited_services=expected_transited_services,
2371                       expected_status=status,
2372                       expect_edata=edata)
2373
2374         if code:
2375             effective_client_creds = service_creds
2376         else:
2377             effective_client_creds = client_creds
2378
2379         self.check_tgs_log(effective_client_creds, target_creds,
2380                            policy=policy,
2381                            checked_creds=service_creds,
2382                            status=status,
2383                            event=event,
2384                            reason=reason)
2385
2386     def test_tgs_without_aa_asserted_identity(self):
2387         client_sids = {
2388             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2389             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2390         }
2391
2392         self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2393                   client_sids=client_sids,
2394                   code=KDC_ERR_POLICY,
2395                   status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2396                   event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2397                   reason=AuditReason.ACCESS_DENIED,
2398                   edata=self.expect_padata_outer)
2399
2400     def test_tgs_without_aa_asserted_identity_client_from_rodc(self):
2401         client_sids = {
2402             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2403             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2404         }
2405
2406         self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2407                   client_from_rodc=True,
2408                   client_sids=client_sids,
2409                   code=KDC_ERR_POLICY,
2410                   status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2411                   event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2412                   reason=AuditReason.ACCESS_DENIED,
2413                   edata=self.expect_padata_outer)
2414
2415     def test_tgs_without_aa_asserted_identity_device_from_rodc(self):
2416         client_sids = {
2417             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2418             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2419         }
2420
2421         self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2422                   device_from_rodc=True,
2423                   client_sids=client_sids,
2424                   code=(KDC_ERR_POLICY, CRASHES_WINDOWS),
2425                   status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2426                   event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2427                   reason=AuditReason.ACCESS_DENIED,
2428                   edata=self.expect_padata_outer)
2429
2430     def test_tgs_without_aa_asserted_identity_both_from_rodc(self):
2431         client_sids = {
2432             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2433             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2434         }
2435
2436         self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2437                   client_from_rodc=True,
2438                   device_from_rodc=True,
2439                   client_sids=client_sids,
2440                   code=(KDC_ERR_POLICY, CRASHES_WINDOWS),
2441                   status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2442                   event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2443                   reason=AuditReason.ACCESS_DENIED,
2444                   edata=self.expect_padata_outer)
2445
2446     def test_tgs_with_aa_asserted_identity(self):
2447         client_sids = {
2448             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2449             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2450             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2451         }
2452
2453         self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2454                   client_sids=client_sids,
2455                   expected_groups=client_sids)
2456
2457     def test_tgs_with_aa_asserted_identity_client_from_rodc(self):
2458         client_sids = {
2459             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2460             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2461             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2462         }
2463
2464         self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2465                   client_from_rodc=True,
2466                   client_sids=client_sids,
2467                   expected_groups=client_sids)
2468
2469     def test_tgs_with_aa_asserted_identity_device_from_rodc(self):
2470         client_sids = {
2471             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2472             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2473             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2474         }
2475
2476         self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2477                   device_from_rodc=True,
2478                   client_sids=client_sids,
2479                   expected_groups=client_sids,
2480                   code=(0, CRASHES_WINDOWS))
2481
2482     def test_tgs_with_aa_asserted_identity_both_from_rodc(self):
2483         client_sids = {
2484             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2485             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2486             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2487         }
2488
2489         self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2490                   client_from_rodc=True,
2491                   device_from_rodc=True,
2492                   client_sids=client_sids,
2493                   expected_groups=client_sids,
2494                   code=(0, CRASHES_WINDOWS))
2495
2496     def test_tgs_without_service_asserted_identity(self):
2497         client_sids = {
2498             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2499             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2500         }
2501
2502         self._tgs(f'Member_of SID({self.service_asserted_identity})',
2503                   client_sids=client_sids,
2504                   code=KDC_ERR_POLICY,
2505                   status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2506                   event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2507                   reason=AuditReason.ACCESS_DENIED,
2508                   edata=self.expect_padata_outer)
2509
2510     def test_tgs_without_service_asserted_identity_client_from_rodc(self):
2511         client_sids = {
2512             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2513             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2514         }
2515
2516         self._tgs(f'Member_of SID({self.service_asserted_identity})',
2517                   client_from_rodc=True,
2518                   client_sids=client_sids,
2519                   code=KDC_ERR_POLICY,
2520                   status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2521                   event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2522                   reason=AuditReason.ACCESS_DENIED,
2523                   edata=self.expect_padata_outer)
2524
2525     def test_tgs_without_service_asserted_identity_device_from_rodc(self):
2526         client_sids = {
2527             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2528             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2529         }
2530
2531         self._tgs(f'Member_of SID({self.service_asserted_identity})',
2532                   device_from_rodc=True,
2533                   client_sids=client_sids,
2534                   code=(KDC_ERR_POLICY, CRASHES_WINDOWS),
2535                   status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2536                   event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2537                   reason=AuditReason.ACCESS_DENIED,
2538                   edata=self.expect_padata_outer)
2539
2540     def test_tgs_without_service_asserted_identity_both_from_rodc(self):
2541         client_sids = {
2542             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2543             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2544         }
2545
2546         self._tgs(f'Member_of SID({self.service_asserted_identity})',
2547                   client_from_rodc=True,
2548                   device_from_rodc=True,
2549                   client_sids=client_sids,
2550                   code=(KDC_ERR_POLICY, CRASHES_WINDOWS),
2551                   status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2552                   event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2553                   reason=AuditReason.ACCESS_DENIED,
2554                   edata=self.expect_padata_outer)
2555
2556     def test_tgs_with_service_asserted_identity(self):
2557         client_sids = {
2558             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2559             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2560             (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2561         }
2562
2563         self._tgs(f'Member_of SID({self.service_asserted_identity})',
2564                   client_sids=client_sids,
2565                   expected_groups=client_sids)
2566
2567     def test_tgs_with_service_asserted_identity_client_from_rodc(self):
2568         client_sids = {
2569             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2570             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2571             (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2572         }
2573
2574         self._tgs(f'Member_of SID({self.service_asserted_identity})',
2575                   client_from_rodc=True,
2576                   client_sids=client_sids,
2577                   expected_groups=client_sids)
2578
2579     def test_tgs_with_service_asserted_identity_device_from_rodc(self):
2580         client_sids = {
2581             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2582             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2583             (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2584         }
2585
2586         self._tgs(f'Member_of SID({self.service_asserted_identity})',
2587                   device_from_rodc=True,
2588                   client_sids=client_sids,
2589                   expected_groups=client_sids,
2590                   code=(0, CRASHES_WINDOWS))
2591
2592     def test_tgs_with_service_asserted_identity_both_from_rodc(self):
2593         client_sids = {
2594             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2595             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2596             (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2597         }
2598
2599         self._tgs(f'Member_of SID({self.service_asserted_identity})',
2600                   client_from_rodc=True,
2601                   device_from_rodc=True,
2602                   client_sids=client_sids,
2603                   expected_groups=client_sids,
2604                   code=(0, CRASHES_WINDOWS))
2605
2606     def test_tgs_without_claims_valid(self):
2607         client_sids = {
2608             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2609             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2610         }
2611
2612         self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
2613                   client_sids=client_sids,
2614                   code=KDC_ERR_POLICY,
2615                   status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2616                   event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2617                   reason=AuditReason.ACCESS_DENIED,
2618                   edata=self.expect_padata_outer)
2619
2620     def test_tgs_without_claims_valid_client_from_rodc(self):
2621         client_sids = {
2622             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2623             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2624         }
2625
2626         self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
2627                   client_from_rodc=True,
2628                   client_sids=client_sids,
2629                   code=KDC_ERR_POLICY,
2630                   status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2631                   event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2632                   reason=AuditReason.ACCESS_DENIED,
2633                   edata=self.expect_padata_outer)
2634
2635     def test_tgs_without_claims_valid_device_from_rodc(self):
2636         client_sids = {
2637             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2638             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2639         }
2640
2641         self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
2642                   device_from_rodc=True,
2643                   client_sids=client_sids,
2644                   code=(KDC_ERR_POLICY, CRASHES_WINDOWS),
2645                   status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2646                   event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2647                   reason=AuditReason.ACCESS_DENIED,
2648                   edata=self.expect_padata_outer)
2649
2650     def test_tgs_without_claims_valid_both_from_rodc(self):
2651         client_sids = {
2652             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2653             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2654         }
2655
2656         self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
2657                   client_from_rodc=True,
2658                   device_from_rodc=True,
2659                   client_sids=client_sids,
2660                   code=(KDC_ERR_POLICY, CRASHES_WINDOWS),
2661                   status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2662                   event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2663                   reason=AuditReason.ACCESS_DENIED,
2664                   edata=self.expect_padata_outer)
2665
2666     def test_tgs_with_claims_valid(self):
2667         client_sids = {
2668             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2669             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2670             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2671         }
2672
2673         self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
2674                   client_sids=client_sids,
2675                   expected_groups=client_sids)
2676
2677     def test_tgs_with_claims_valid_client_from_rodc(self):
2678         client_sids = {
2679             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2680             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2681             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2682         }
2683
2684         self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
2685                   client_from_rodc=True,
2686                   client_sids=client_sids,
2687                   expected_groups=client_sids)
2688
2689     def test_tgs_with_claims_valid_device_from_rodc(self):
2690         client_sids = {
2691             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2692             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2693             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2694         }
2695
2696         self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
2697                   device_from_rodc=True,
2698                   client_sids=client_sids,
2699                   expected_groups=client_sids,
2700                   code=(0, CRASHES_WINDOWS))
2701
2702     def test_tgs_with_claims_valid_both_from_rodc(self):
2703         client_sids = {
2704             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2705             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2706             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2707         }
2708
2709         self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
2710                   client_from_rodc=True,
2711                   device_from_rodc=True,
2712                   client_sids=client_sids,
2713                   expected_groups=client_sids,
2714                   code=(0, CRASHES_WINDOWS))
2715
2716     def _tgs(self,
2717              target_policy=None,
2718              *,
2719              code=0,
2720              event=AuditEvent.OK,
2721              reason=AuditReason.NONE,
2722              status=None,
2723              edata=False,
2724              use_fast=True,
2725              client_from_rodc=None,
2726              device_from_rodc=None,
2727              client_sids=None,
2728              client_claims=None,
2729              device_sids=None,
2730              device_claims=None,
2731              expected_groups=None,
2732              expected_device_groups=None,
2733              expected_claims=None):
2734         try:
2735             code, crashes_windows = code
2736             self.assertIs(crashes_windows, CRASHES_WINDOWS)
2737             if not self.crash_windows:
2738                 self.skipTest('test crashes Windows servers')
2739         except TypeError:
2740             self.assertIsNot(code, CRASHES_WINDOWS)
2741
2742         if not use_fast:
2743             self.assertIsNone(device_from_rodc)
2744             self.assertIsNone(device_sids)
2745             self.assertIsNone(device_claims)
2746             self.assertIsNone(expected_device_groups)
2747
2748         if client_from_rodc is None:
2749             client_from_rodc = False
2750
2751         if device_from_rodc is None:
2752             device_from_rodc = False
2753
2754         client_creds = self.get_cached_creds(
2755             account_type=self.AccountType.USER,
2756             opts={
2757                 'allowed_replication_mock': client_from_rodc,
2758                 'revealed_to_mock_rodc': client_from_rodc,
2759             })
2760         client_sid = client_creds.get_sid()
2761
2762         client_username = client_creds.get_username()
2763         client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
2764                                                  names=[client_username])
2765
2766         client_tkt_options = 'forwardable'
2767         expected_flags = krb5_asn1.TicketFlags(client_tkt_options)
2768
2769         checksum_key = self.get_krbtgt_checksum_key()
2770
2771         if client_from_rodc or device_from_rodc:
2772             rodc_krbtgt_creds = self.get_mock_rodc_krbtgt_creds()
2773             rodc_krbtgt_key = self.TicketDecryptionKey_from_creds(rodc_krbtgt_creds)
2774             rodc_checksum_key = {
2775                 krb5pac.PAC_TYPE_KDC_CHECKSUM: rodc_krbtgt_key,
2776             }
2777
2778         client_tgt = self.get_tgt(client_creds,
2779                                   kdc_options=client_tkt_options,
2780                                   expected_flags=expected_flags)
2781
2782         client_modify_pac_fn = []
2783         if client_sids is not None:
2784             client_modify_pac_fn.append(partial(self.set_pac_sids,
2785                                                 new_sids=client_sids))
2786         if client_claims is not None:
2787             client_modify_pac_fn.append(partial(self.set_pac_claims,
2788                                                 client_claims=client_claims))
2789         client_tgt = self.modified_ticket(
2790             client_tgt,
2791             modify_pac_fn=client_modify_pac_fn,
2792             new_ticket_key=rodc_krbtgt_key if client_from_rodc else None,
2793             checksum_keys=rodc_checksum_key if client_from_rodc else checksum_key)
2794
2795         if use_fast:
2796             # Create a machine account with which to perform FAST.
2797             mach_creds = self.get_cached_creds(
2798                 account_type=self.AccountType.COMPUTER,
2799                 opts={
2800                     'allowed_replication_mock': device_from_rodc,
2801                     'revealed_to_mock_rodc': device_from_rodc,
2802                 })
2803             mach_tgt = self.get_tgt(mach_creds)
2804             device_modify_pac_fn = []
2805             if device_sids is not None:
2806                 device_modify_pac_fn.append(partial(self.set_pac_sids,
2807                                                     new_sids=device_sids))
2808             if device_claims is not None:
2809                 device_modify_pac_fn.append(partial(self.set_pac_claims,
2810                                                     client_claims=device_claims))
2811             mach_tgt = self.modified_ticket(
2812                 mach_tgt,
2813                 modify_pac_fn=device_modify_pac_fn,
2814                 new_ticket_key=rodc_krbtgt_key if device_from_rodc else None,
2815                 checksum_keys=rodc_checksum_key if device_from_rodc else checksum_key)
2816         else:
2817             mach_tgt = None
2818
2819         if target_policy is None:
2820             policy = None
2821             assigned_policy = None
2822         else:
2823             sddl = f'O:SYD:(XA;;CR;;;WD;({target_policy.format(client_sid=client_creds.get_sid())}))'
2824             policy = self.create_authn_policy(enforced=True,
2825                                               computer_allowed_to=sddl)
2826             assigned_policy = str(policy.dn)
2827
2828         # Create a target account with the assigned policy.
2829         target_creds = self.get_cached_creds(
2830             account_type=self.AccountType.COMPUTER,
2831             opts={'assigned_policy': assigned_policy})
2832
2833         target_decryption_key = self.TicketDecryptionKey_from_creds(
2834             target_creds)
2835         target_etypes = target_creds.tgs_supported_enctypes
2836
2837         samdb = self.get_samdb()
2838         domain_sid_str = samdb.get_domain_sid()
2839
2840         expected_groups = self.map_sids(expected_groups, None, domain_sid_str)
2841         expected_device_groups = self.map_sids(expected_device_groups, None, domain_sid_str)
2842
2843         # Show that obtaining a service ticket is allowed.
2844         self._tgs_req(client_tgt, code, client_creds, target_creds,
2845                       armor_tgt=mach_tgt,
2846                       expected_cname=client_cname,
2847                       expected_account_name=client_username,
2848                       decryption_key=target_decryption_key,
2849                       expected_sid=client_sid,
2850                       expected_groups=expected_groups,
2851                       expect_device_info=bool(expected_device_groups) or None,
2852                       expected_device_domain_sid=domain_sid_str,
2853                       expected_device_groups=expected_device_groups,
2854                       expect_client_claims=bool(expected_claims) or None,
2855                       expected_client_claims=expected_claims,
2856                       expected_supported_etypes=target_etypes,
2857                       expected_status=status,
2858                       expect_edata=edata)
2859
2860         self.check_tgs_log(client_creds, target_creds,
2861                            policy=policy,
2862                            checked_creds=client_creds,
2863                            status=status,
2864                            event=event,
2865                            reason=reason)
2866
2867     def test_conditional_ace_allowed_from_user_deny(self):
2868         # Create a machine account with which to perform FAST.
2869         mach_creds = self.get_cached_creds(
2870             account_type=self.AccountType.COMPUTER)
2871         mach_tgt = self.get_tgt(mach_creds)
2872
2873         # Create an authentication policy that explicitly denies the machine
2874         # account for a user.
2875         allowed = 'O:SYD:(A;;CR;;;WD)'
2876         denied = f'O:SYD:(XD;;CR;;;{mach_creds.get_sid()};(abc))'
2877         policy = self.create_authn_policy(enforced=True,
2878                                           user_allowed_from=denied,
2879                                           service_allowed_from=allowed)
2880
2881         # Create a user account with the assigned policy.
2882         client_creds = self._get_creds(account_type=self.AccountType.USER,
2883                                        assigned_policy=policy)
2884
2885         # Show that we get a policy error when trying to authenticate.
2886         self._get_tgt(client_creds, armor_tgt=mach_tgt,
2887                       expected_error=KDC_ERR_POLICY)
2888
2889         self.check_as_log(
2890             client_creds,
2891             armor_creds=mach_creds,
2892             client_policy=policy,
2893             client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2894             event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
2895             reason=AuditReason.ACCESS_DENIED,
2896             status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
2897
2898
2899 class DeviceRestrictionTests(ConditionalAceBaseTests):
2900     def test_pac_groups_not_present(self):
2901         """Test that authentication fails if the device does not belong to some
2902         required groups.
2903         """
2904
2905         required_sids = {
2906             ('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs),
2907             ('S-1-9-8-7', SidType.EXTRA_SID, self.default_attrs),
2908         }
2909
2910         # Create a machine account with which to perform FAST.
2911         mach_creds = self.get_cached_creds(
2912             account_type=self.AccountType.COMPUTER,
2913             opts={'id': 'device'})
2914         mach_tgt = self.get_tgt(mach_creds)
2915
2916         # Create an authentication policy that requires the device to belong to
2917         # certain groups.
2918         client_policy_sddl = self.allow_if(
2919             f'Member_of {self.sddl_array_from_sids(required_sids)}')
2920         client_policy = self.create_authn_policy(
2921             enforced=True, user_allowed_from=client_policy_sddl)
2922
2923         # Create a user account with the assigned policy.
2924         client_creds = self._get_creds(account_type=self.AccountType.USER,
2925                                        assigned_policy=client_policy)
2926
2927         # Show that authentication fails.
2928         self._armored_as_req(client_creds,
2929                              self.get_krbtgt_creds(),
2930                              mach_tgt,
2931                              expected_error=KDC_ERR_POLICY)
2932
2933         self.check_as_log(
2934             client_creds,
2935             armor_creds=mach_creds,
2936             client_policy=client_policy,
2937             client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2938             event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
2939             reason=AuditReason.ACCESS_DENIED,
2940             status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
2941
2942     def test_pac_groups_present(self):
2943         """Test that authentication succeeds if the device belongs to some
2944         required groups.
2945         """
2946
2947         required_sids = {
2948             ('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs),
2949             ('S-1-9-8-7', SidType.EXTRA_SID, self.default_attrs),
2950         }
2951
2952         device_sids = required_sids | {
2953             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2954             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2955         }
2956
2957         # Create a machine account with which to perform FAST.
2958         mach_creds = self.get_cached_creds(
2959             account_type=self.AccountType.COMPUTER,
2960             opts={'id': 'device'})
2961         mach_tgt = self.get_tgt(mach_creds)
2962
2963         # Add the required groups to the machine account’s TGT.
2964         mach_tgt = self.modified_ticket(
2965             mach_tgt,
2966             modify_pac_fn=partial(self.set_pac_sids,
2967                                   new_sids=device_sids),
2968             checksum_keys=self.get_krbtgt_checksum_key())
2969
2970         # Create an authentication policy that requires the device to belong to
2971         # certain groups.
2972         client_policy_sddl = self.allow_if(
2973             f'Member_of {self.sddl_array_from_sids(required_sids)}')
2974         client_policy = self.create_authn_policy(
2975             enforced=True, user_allowed_from=client_policy_sddl)
2976
2977         # Create a user account with the assigned policy.
2978         client_creds = self._get_creds(account_type=self.AccountType.USER,
2979                                        assigned_policy=client_policy)
2980
2981         # FIXME: we need to pass this parameter only because Samba doesn’t
2982         # handle ‘krbtgt@REALM’ principals correctly (see
2983         # https://bugzilla.samba.org/show_bug.cgi?id=15482).
2984         krbtgt_sname = self.get_krbtgt_sname()
2985
2986         # Show that authentication succeeds.
2987         self._armored_as_req(client_creds,
2988                              self.get_krbtgt_creds(),
2989                              mach_tgt,
2990                              target_sname=krbtgt_sname)
2991
2992         self.check_as_log(client_creds,
2993                           armor_creds=mach_creds,
2994                           client_policy=client_policy)
2995
2996     def test_pac_resource_groups_present(self):
2997         """Test that authentication succeeds if the device belongs to some
2998         required resource groups.
2999         """
3000
3001         required_sids = {
3002             ('S-1-2-3-4', SidType.RESOURCE_SID, self.resource_attrs),
3003             ('S-1-2-3-5', SidType.RESOURCE_SID, self.resource_attrs),
3004             ('S-1-2-3-6', SidType.RESOURCE_SID, self.resource_attrs),
3005         }
3006
3007         device_sids = required_sids | {
3008             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3009             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3010         }
3011
3012         # Create a machine account with which to perform FAST.
3013         mach_creds = self.get_cached_creds(
3014             account_type=self.AccountType.COMPUTER,
3015             opts={'id': 'device'})
3016         mach_tgt = self.get_tgt(mach_creds)
3017
3018         # Add the required groups to the machine account’s TGT.
3019         mach_tgt = self.modified_ticket(
3020             mach_tgt,
3021             modify_pac_fn=partial(self.set_pac_sids,
3022                                   new_sids=device_sids),
3023             checksum_keys=self.get_krbtgt_checksum_key())
3024
3025         # Create an authentication policy that requires the device to belong to
3026         # certain groups.
3027         client_policy_sddl = self.allow_if(
3028             f'Member_of {self.sddl_array_from_sids(required_sids)}')
3029         client_policy = self.create_authn_policy(
3030             enforced=True, user_allowed_from=client_policy_sddl)
3031
3032         # Create a user account with the assigned policy.
3033         client_creds = self._get_creds(account_type=self.AccountType.USER,
3034                                        assigned_policy=client_policy)
3035
3036         # Show that authentication fails.
3037         self._armored_as_req(client_creds,
3038                              self.get_krbtgt_creds(),
3039                              mach_tgt,
3040                              expected_error=KDC_ERR_POLICY)
3041
3042         self.check_as_log(
3043             client_creds,
3044             armor_creds=mach_creds,
3045             client_policy=client_policy,
3046             client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3047             event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
3048             reason=AuditReason.ACCESS_DENIED,
3049             status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
3050
3051     def test_pac_resource_groups_present_to_service_sid_compression(self):
3052         """Test that authentication succeeds if the device belongs to some
3053         required resource groups, and the request is to a service that supports
3054         SID compression.
3055         """
3056
3057         required_sids = {
3058             ('S-1-2-3-4', SidType.RESOURCE_SID, self.resource_attrs),
3059             ('S-1-2-3-5', SidType.RESOURCE_SID, self.resource_attrs),
3060             ('S-1-2-3-6', SidType.RESOURCE_SID, self.resource_attrs),
3061         }
3062
3063         device_sids = required_sids | {
3064             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3065             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3066         }
3067
3068         # Create a machine account with which to perform FAST.
3069         mach_creds = self.get_cached_creds(
3070             account_type=self.AccountType.COMPUTER,
3071             opts={'id': 'device'})
3072         mach_tgt = self.get_tgt(mach_creds)
3073
3074         # Add the required groups to the machine account’s TGT.
3075         mach_tgt = self.modified_ticket(
3076             mach_tgt,
3077             modify_pac_fn=partial(self.set_pac_sids,
3078                                   new_sids=device_sids),
3079             checksum_keys=self.get_krbtgt_checksum_key())
3080
3081         # Create an authentication policy that requires the device to belong to
3082         # certain groups.
3083         client_policy_sddl = self.allow_if(
3084             f'Member_of {self.sddl_array_from_sids(required_sids)}')
3085         client_policy = self.create_authn_policy(
3086             enforced=True, user_allowed_from=client_policy_sddl)
3087
3088         # Create a user account with the assigned policy.
3089         client_creds = self._get_creds(account_type=self.AccountType.USER,
3090                                        assigned_policy=client_policy)
3091
3092         target_creds = self.get_cached_creds(
3093             account_type=self.AccountType.COMPUTER,
3094             opts={'id': 'target'})
3095
3096         # Show that authentication fails.
3097         self._armored_as_req(client_creds,
3098                              target_creds,
3099                              mach_tgt,
3100                              expected_error=KDC_ERR_POLICY)
3101
3102         self.check_as_log(
3103             client_creds,
3104             armor_creds=mach_creds,
3105             client_policy=client_policy,
3106             client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3107             event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
3108             reason=AuditReason.ACCESS_DENIED,
3109             status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
3110
3111     def test_pac_resource_groups_present_to_service_no_sid_compression(self):
3112         """Test that authentication succeeds if the device belongs to some
3113         required resource groups, and the request is to a service that does not
3114         support SID compression.
3115         """
3116
3117         required_sids = {
3118             ('S-1-2-3-4', SidType.RESOURCE_SID, self.resource_attrs),
3119             ('S-1-2-3-5', SidType.RESOURCE_SID, self.resource_attrs),
3120             ('S-1-2-3-6', SidType.RESOURCE_SID, self.resource_attrs),
3121         }
3122
3123         device_sids = required_sids | {
3124             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3125             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3126         }
3127
3128         # Create a machine account with which to perform FAST.
3129         mach_creds = self.get_cached_creds(
3130             account_type=self.AccountType.COMPUTER,
3131             opts={'id': 'device'})
3132         mach_tgt = self.get_tgt(mach_creds)
3133
3134         # Add the required groups to the machine account’s TGT.
3135         mach_tgt = self.modified_ticket(
3136             mach_tgt,
3137             modify_pac_fn=partial(self.set_pac_sids,
3138                                   new_sids=device_sids),
3139             checksum_keys=self.get_krbtgt_checksum_key())
3140
3141         # Create an authentication policy that requires the device to belong to
3142         # certain groups.
3143         client_policy_sddl = self.allow_if(
3144             f'Member_of {self.sddl_array_from_sids(required_sids)}')
3145         client_policy = self.create_authn_policy(
3146             enforced=True, user_allowed_from=client_policy_sddl)
3147
3148         # Create a user account with the assigned policy.
3149         client_creds = self._get_creds(account_type=self.AccountType.USER,
3150                                        assigned_policy=client_policy)
3151
3152         target_creds = self.get_cached_creds(
3153             account_type=self.AccountType.COMPUTER,
3154             opts={
3155                 'id': 'target',
3156                 'supported_enctypes': (
3157                     security.KERB_ENCTYPE_RC4_HMAC_MD5) | (
3158                         security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK),
3159                 'sid_compression_support': False,
3160             })
3161
3162         # Show that authentication fails.
3163         self._armored_as_req(client_creds,
3164                              target_creds,
3165                              mach_tgt,
3166                              expected_error=KDC_ERR_POLICY)
3167
3168         self.check_as_log(
3169             client_creds,
3170             armor_creds=mach_creds,
3171             client_policy=client_policy,
3172             client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3173             event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
3174             reason=AuditReason.ACCESS_DENIED,
3175             status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
3176
3177     def test_pac_well_known_groups_not_present(self):
3178         """Test that authentication fails if the device does not belong to one
3179         or more required well‐known groups.
3180         """
3181
3182         required_sids = {
3183             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
3184             (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
3185             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
3186             (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
3187         }
3188
3189         device_sids = {
3190             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3191             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3192         }
3193
3194         # Create a machine account with which to perform FAST.
3195         mach_creds = self.get_cached_creds(
3196             account_type=self.AccountType.COMPUTER,
3197             opts={'id': 'device'})
3198         mach_tgt = self.get_tgt(mach_creds)
3199
3200         # Modify the machine account’s TGT to contain only the SID of the
3201         # machine account’s primary group.
3202         mach_tgt = self.modified_ticket(
3203             mach_tgt,
3204             modify_pac_fn=partial(self.set_pac_sids,
3205                                   new_sids=device_sids),
3206             checksum_keys=self.get_krbtgt_checksum_key())
3207
3208         # Create an authentication policy that requires the device to belong to
3209         # certain groups.
3210         client_policy_sddl = self.allow_if(
3211             f'Member_of_any {self.sddl_array_from_sids(required_sids)}')
3212         client_policy = self.create_authn_policy(
3213             enforced=True, user_allowed_from=client_policy_sddl)
3214
3215         # Create a user account with the assigned policy.
3216         client_creds = self._get_creds(account_type=self.AccountType.USER,
3217                                        assigned_policy=client_policy)
3218
3219         # Show that authentication fails.
3220         self._armored_as_req(client_creds,
3221                              self.get_krbtgt_creds(),
3222                              mach_tgt,
3223                              expected_error=KDC_ERR_POLICY)
3224
3225         self.check_as_log(
3226             client_creds,
3227             armor_creds=mach_creds,
3228             client_policy=client_policy,
3229             client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3230             event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
3231             reason=AuditReason.ACCESS_DENIED,
3232             status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
3233
3234     def test_pac_device_info(self):
3235         """Test the groups of the client and the device after performing a
3236         FAST‐armored AS‐REQ.
3237         """
3238
3239         device_sids = {
3240             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3241             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3242         }
3243
3244         # Create a machine account with which to perform FAST.
3245         mach_creds = self.get_cached_creds(
3246             account_type=self.AccountType.COMPUTER,
3247             opts={'id': 'device'})
3248         mach_tgt = self.get_tgt(mach_creds)
3249
3250         # Add the required groups to the machine account’s TGT.
3251         mach_tgt = self.modified_ticket(
3252             mach_tgt,
3253             modify_pac_fn=partial(self.set_pac_sids,
3254                                   new_sids=device_sids),
3255             checksum_keys=self.get_krbtgt_checksum_key())
3256
3257         # Create a user account.
3258         client_creds = self._get_creds(account_type=self.AccountType.USER)
3259
3260         target_creds = self.get_cached_creds(
3261             account_type=self.AccountType.COMPUTER,
3262             opts={'id': 'target'})
3263
3264         expected_sids = {
3265             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3266             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3267             # The client’s groups are to include the Asserted Identity and
3268             # Claims Valid SIDs.
3269             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
3270             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
3271         }
3272
3273         samdb = self.get_samdb()
3274         domain_sid_str = samdb.get_domain_sid()
3275
3276         expected_sids = self.map_sids(expected_sids, None, domain_sid_str)
3277
3278         # Show that authentication succeeds. Check that the groups in the PAC
3279         # are as expected.
3280         self._armored_as_req(client_creds,
3281                              target_creds,
3282                              mach_tgt,
3283                              expected_groups=expected_sids,
3284                              expect_device_info=False,
3285                              expected_device_groups=None)
3286
3287         self.check_as_log(
3288             client_creds,
3289             armor_creds=mach_creds)
3290
3291     def test_pac_claims_not_present(self):
3292         """Test that authentication fails if the device does not have a
3293         required claim.
3294         """
3295
3296         claim_id = 'the name of the claim'
3297         claim_value = 'the value of the claim'
3298
3299         # Create a machine account with which to perform FAST.
3300         mach_creds = self.get_cached_creds(
3301             account_type=self.AccountType.COMPUTER,
3302             opts={'id': 'device'})
3303         mach_tgt = self.get_tgt(mach_creds)
3304
3305         # Create an authentication policy that requires the device to have a
3306         # certain claim.
3307         client_policy_sddl = self.allow_if(
3308             f'@User.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
3309         client_policy = self.create_authn_policy(
3310             enforced=True, user_allowed_from=client_policy_sddl)
3311
3312         # Create a user account with the assigned policy.
3313         client_creds = self._get_creds(account_type=self.AccountType.USER,
3314                                        assigned_policy=client_policy)
3315
3316         # Show that authentication fails.
3317         self._armored_as_req(client_creds,
3318                              self.get_krbtgt_creds(),
3319                              mach_tgt,
3320                              expected_error=KDC_ERR_POLICY)
3321
3322         self.check_as_log(
3323             client_creds,
3324             armor_creds=mach_creds,
3325             client_policy=client_policy,
3326             client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3327             event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
3328             reason=AuditReason.ACCESS_DENIED,
3329             status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
3330
3331     def test_pac_claims_present(self):
3332         """Test that authentication succeeds if the device has a required
3333         claim.
3334         """
3335
3336         claim_id = 'the name of the claim'
3337         claim_value = 'the value of the claim'
3338
3339         pac_claims = [
3340             (claims.CLAIMS_SOURCE_TYPE_AD, [
3341                 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
3342             ]),
3343         ]
3344
3345         # Create a machine account with which to perform FAST.
3346         mach_creds = self.get_cached_creds(
3347             account_type=self.AccountType.COMPUTER,
3348             opts={'id': 'device'})
3349         mach_tgt = self.get_tgt(mach_creds)
3350
3351         # Add the required claim to the machine account’s TGT.
3352         mach_tgt = self.modified_ticket(
3353             mach_tgt,
3354             modify_pac_fn=partial(self.set_pac_claims,
3355                                   client_claims=pac_claims),
3356             checksum_keys=self.get_krbtgt_checksum_key())
3357
3358         # Create an authentication policy that requires the device to have a
3359         # certain claim.
3360         client_policy_sddl = self.allow_if(
3361             f'@User.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
3362         client_policy = self.create_authn_policy(
3363             enforced=True, user_allowed_from=client_policy_sddl)
3364
3365         # Create a user account with the assigned policy.
3366         client_creds = self._get_creds(account_type=self.AccountType.USER,
3367                                        assigned_policy=client_policy)
3368
3369         # FIXME: we need to pass this parameter only because Samba doesn’t
3370         # handle ‘krbtgt@REALM’ principals correctly (see
3371         # https://bugzilla.samba.org/show_bug.cgi?id=15482).
3372         krbtgt_sname = self.get_krbtgt_sname()
3373
3374         # Show that authentication succeeds.
3375         self._armored_as_req(client_creds,
3376                              self.get_krbtgt_creds(),
3377                              mach_tgt,
3378                              target_sname=krbtgt_sname)
3379
3380         self.check_as_log(client_creds,
3381                           armor_creds=mach_creds,
3382                           client_policy=client_policy)
3383
3384     def test_pac_claims_invalid(self):
3385         """Test that authentication fails if the device’s required claim is not
3386         valid.
3387         """
3388
3389         claim_id = 'the name of the claim'
3390         claim_value = 'the value of the claim'
3391
3392         pac_claims = [
3393             (claims.CLAIMS_SOURCE_TYPE_AD, [
3394                 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
3395             ]),
3396         ]
3397
3398         # The device’s SIDs do not include the Claims Valid SID.
3399         device_sids = {
3400             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3401             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3402         }
3403
3404         # Create a machine account with which to perform FAST.
3405         mach_creds = self.get_cached_creds(
3406             account_type=self.AccountType.COMPUTER,
3407             opts={'id': 'device'})
3408         mach_tgt = self.get_tgt(mach_creds)
3409
3410         # Add the SIDs and the required claim to the machine account’s TGT.
3411         mach_tgt = self.modified_ticket(
3412             mach_tgt,
3413             modify_pac_fn=[
3414                 partial(self.set_pac_claims, client_claims=pac_claims),
3415                 partial(self.set_pac_sids, new_sids=device_sids)],
3416             checksum_keys=self.get_krbtgt_checksum_key())
3417
3418         # Create an authentication policy that requires the device to have a
3419         # certain claim.
3420         client_policy_sddl = self.allow_if(
3421             f'@User.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
3422         client_policy = self.create_authn_policy(
3423             enforced=True, user_allowed_from=client_policy_sddl)
3424
3425         # Create a user account with the assigned policy.
3426         client_creds = self._get_creds(account_type=self.AccountType.USER,
3427                                        assigned_policy=client_policy)
3428
3429         # Show that authentication fails.
3430         self._armored_as_req(client_creds,
3431                              self.get_krbtgt_creds(),
3432                              mach_tgt,
3433                              expected_error=KDC_ERR_POLICY)
3434
3435         self.check_as_log(
3436             client_creds,
3437             armor_creds=mach_creds,
3438             client_policy=client_policy,
3439             client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3440             event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
3441             reason=AuditReason.ACCESS_DENIED,
3442             status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
3443
3444     def test_device_in_world_group(self):
3445         self._check_device_in_group(security.SID_WORLD)
3446
3447     def test_device_in_network_group(self):
3448         self._check_device_not_in_group(security.SID_NT_NETWORK)
3449
3450     def test_device_in_authenticated_users(self):
3451         self._check_device_in_group(security.SID_NT_AUTHENTICATED_USERS)
3452
3453     def _check_device_in_group(self, group):
3454         self._check_device_membership(group, expect_in_group=True)
3455
3456     def _check_device_not_in_group(self, group):
3457         self._check_device_membership(group, expect_in_group=False)
3458
3459     def _check_device_membership(self, group, *, expect_in_group):
3460         """Test that authentication succeeds or fails when the device is
3461         required to belong to a certain group.
3462         """
3463
3464         # Create a machine account with which to perform FAST.
3465         mach_creds = self.get_cached_creds(
3466             account_type=self.AccountType.COMPUTER,
3467             opts={'id': 'device'})
3468         mach_tgt = self.get_tgt(mach_creds)
3469
3470         # Create an authentication policy that requires the device to belong to
3471         # a certain group.
3472         in_group_sddl = self.allow_if(f'Member_of {{SID({group})}}')
3473         in_group_policy = self.create_authn_policy(
3474             enforced=True, user_allowed_from=in_group_sddl)
3475
3476         # Create a user account with the assigned policy.
3477         client_creds = self._get_creds(account_type=self.AccountType.USER,
3478                                        assigned_policy=in_group_policy)
3479
3480         krbtgt_creds = self.get_krbtgt_creds()
3481
3482         # FIXME: we need to pass this parameter only because Samba doesn’t
3483         # handle ‘krbtgt@REALM’ principals correctly (see
3484         # https://bugzilla.samba.org/show_bug.cgi?id=15482).
3485         krbtgt_sname = self.get_krbtgt_sname()
3486
3487         # Test whether authentication succeeds or fails.
3488         self._armored_as_req(
3489             client_creds,
3490             krbtgt_creds,
3491             mach_tgt,
3492             target_sname=krbtgt_sname,
3493             expected_error=0 if expect_in_group else KDC_ERR_POLICY)
3494
3495         policy_success_args = {}
3496         policy_failure_args = {
3497             'client_policy_status': ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3498             'event': AuditEvent.KERBEROS_DEVICE_RESTRICTION,
3499             'reason': AuditReason.ACCESS_DENIED,
3500             'status': ntstatus.NT_STATUS_INVALID_WORKSTATION,
3501         }
3502
3503         self.check_as_log(client_creds,
3504                           armor_creds=mach_creds,
3505                           client_policy=in_group_policy,
3506                           **(policy_success_args if expect_in_group
3507                              else policy_failure_args))
3508
3509         # Create an authentication policy that requires the device not to belong
3510         # to the group.
3511         not_in_group_sddl = self.allow_if(f'Not_Member_of {{SID({group})}}')
3512         not_in_group_policy = self.create_authn_policy(
3513             enforced=True, user_allowed_from=not_in_group_sddl)
3514
3515         # Create a user account with the assigned policy.
3516         client_creds = self._get_creds(account_type=self.AccountType.USER,
3517                                        assigned_policy=not_in_group_policy)
3518
3519         # Test whether authentication succeeds or fails.
3520         self._armored_as_req(
3521             client_creds,
3522             krbtgt_creds,
3523             mach_tgt,
3524             target_sname=krbtgt_sname,
3525             expected_error=KDC_ERR_POLICY if expect_in_group else 0)
3526
3527         self.check_as_log(client_creds,
3528                           armor_creds=mach_creds,
3529                           client_policy=not_in_group_policy,
3530                           **(policy_failure_args if expect_in_group
3531                              else policy_success_args))
3532
3533
3534 class TgsReqServicePolicyTests(ConditionalAceBaseTests):
3535     def test_pac_groups_not_present(self):
3536         """Test that authorization succeeds if the client does not belong to
3537         some required groups.
3538         """
3539
3540         required_sids = {
3541             ('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs),
3542             ('S-1-9-8-7', SidType.EXTRA_SID, self.default_attrs),
3543         }
3544
3545         # Create a machine account with which to perform FAST.
3546         mach_creds = self.get_cached_creds(
3547             account_type=self.AccountType.COMPUTER,
3548             opts={'id': 'device'})
3549         mach_tgt = self.get_tgt(mach_creds)
3550
3551         # Create a user account.
3552         client_creds = self._get_creds(account_type=self.AccountType.USER)
3553         client_tgt = self.get_tgt(client_creds)
3554
3555         # Create an authentication policy that requires the client to belong to
3556         # certain groups.
3557         target_policy_sddl = self.allow_if(
3558             f'Member_of {self.sddl_array_from_sids(required_sids)}')
3559         target_policy = self.create_authn_policy(
3560             enforced=True, computer_allowed_to=target_policy_sddl)
3561
3562         # Create a target account with the assigned policy.
3563         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
3564                                        assigned_policy=target_policy)
3565
3566         # Show that authorization fails.
3567         self._tgs_req(
3568             client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
3569             armor_tgt=mach_tgt,
3570             expect_edata=self.expect_padata_outer,
3571             # We aren’t particular about whether or not we get an NTSTATUS.
3572             expect_status=None,
3573             expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
3574
3575         self.check_tgs_log(
3576             client_creds, target_creds,
3577             policy=target_policy,
3578             status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3579             event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
3580             reason=AuditReason.ACCESS_DENIED)
3581
3582     def test_pac_groups_present(self):
3583         """Test that authorization succeeds if the client belongs to some
3584         required groups.
3585         """
3586
3587         required_sids = {
3588             ('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs),
3589             ('S-1-9-8-7', SidType.EXTRA_SID, self.default_attrs),
3590         }
3591
3592         client_sids = required_sids | {
3593             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3594             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3595         }
3596
3597         # Create a machine account with which to perform FAST.
3598         mach_creds = self.get_cached_creds(
3599             account_type=self.AccountType.COMPUTER,
3600             opts={'id': 'device'})
3601         mach_tgt = self.get_tgt(mach_creds)
3602
3603         # Create a user account.
3604         client_creds = self._get_creds(account_type=self.AccountType.USER)
3605         client_tgt = self.get_tgt(client_creds)
3606
3607         # Add the required groups to the client’s TGT.
3608         client_tgt = self.modified_ticket(
3609             client_tgt,
3610             modify_pac_fn=partial(self.set_pac_sids,
3611                                   new_sids=client_sids),
3612             checksum_keys=self.get_krbtgt_checksum_key())
3613
3614         # Create an authentication policy that requires the client to belong to
3615         # certain groups.
3616         target_policy_sddl = self.allow_if(
3617             f'Member_of {self.sddl_array_from_sids(required_sids)}')
3618         target_policy = self.create_authn_policy(
3619             enforced=True, computer_allowed_to=target_policy_sddl)
3620
3621         # Create a target account with the assigned policy.
3622         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
3623                                        assigned_policy=target_policy)
3624
3625         # Show that authorization succeeds.
3626         self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt)
3627
3628         self.check_tgs_log(client_creds, target_creds,
3629                            policy=target_policy)
3630
3631     def test_pac_resource_groups_present_to_service_sid_compression(self):
3632         """Test that authorization succeeds if the client belongs to some
3633         required resource groups, and the request is to a service that supports
3634         SID compression.
3635         """
3636
3637         required_sids = {
3638             ('S-1-2-3-4', SidType.RESOURCE_SID, self.resource_attrs),
3639             ('S-1-2-3-5', SidType.RESOURCE_SID, self.resource_attrs),
3640             ('S-1-2-3-6', SidType.RESOURCE_SID, self.resource_attrs),
3641         }
3642
3643         client_sids = required_sids | {
3644             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3645             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3646         }
3647
3648         # Create a machine account with which to perform FAST.
3649         mach_creds = self.get_cached_creds(
3650             account_type=self.AccountType.COMPUTER,
3651             opts={'id': 'device'})
3652         mach_tgt = self.get_tgt(mach_creds)
3653
3654         # Create a user account.
3655         client_creds = self._get_creds(account_type=self.AccountType.USER)
3656         client_tgt = self.get_tgt(client_creds)
3657
3658         # Add the required groups to the client’s TGT.
3659         client_tgt = self.modified_ticket(
3660             client_tgt,
3661             modify_pac_fn=partial(self.set_pac_sids,
3662                                   new_sids=client_sids),
3663             checksum_keys=self.get_krbtgt_checksum_key())
3664
3665         # Create an authentication policy that requires the client to belong to
3666         # certain groups.
3667         target_policy_sddl = self.allow_if(
3668             f'Member_of {self.sddl_array_from_sids(required_sids)}')
3669         target_policy = self.create_authn_policy(
3670             enforced=True, computer_allowed_to=target_policy_sddl)
3671
3672         # Create a target account with the assigned policy.
3673         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
3674                                        assigned_policy=target_policy)
3675
3676         # Show that authorization fails.
3677         self._tgs_req(
3678             client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
3679             armor_tgt=mach_tgt,
3680             expect_edata=self.expect_padata_outer,
3681             # We aren’t particular about whether or not we get an NTSTATUS.
3682             expect_status=None,
3683             expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
3684
3685         self.check_tgs_log(
3686             client_creds, target_creds,
3687             policy=target_policy,
3688             status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3689             event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
3690             reason=AuditReason.ACCESS_DENIED)
3691
3692     def test_pac_resource_groups_present_to_service_no_sid_compression(self):
3693         """Test that authorization succeeds if the client belongs to some
3694         required resource groups, and the request is to a service that does not
3695         support SID compression.
3696         """
3697
3698         required_sids = {
3699             ('S-1-2-3-4', SidType.RESOURCE_SID, self.resource_attrs),
3700             ('S-1-2-3-5', SidType.RESOURCE_SID, self.resource_attrs),
3701             ('S-1-2-3-6', SidType.RESOURCE_SID, self.resource_attrs),
3702         }
3703
3704         client_sids = required_sids | {
3705             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3706             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3707         }
3708
3709         # Create a machine account with which to perform FAST.
3710         mach_creds = self.get_cached_creds(
3711             account_type=self.AccountType.COMPUTER,
3712             opts={'id': 'device'})
3713         mach_tgt = self.get_tgt(mach_creds)
3714
3715         # Create a user account.
3716         client_creds = self._get_creds(account_type=self.AccountType.USER)
3717         client_tgt = self.get_tgt(client_creds)
3718
3719         # Add the required groups to the client’s TGT.
3720         client_tgt = self.modified_ticket(
3721             client_tgt,
3722             modify_pac_fn=partial(self.set_pac_sids,
3723                                   new_sids=client_sids),
3724             checksum_keys=self.get_krbtgt_checksum_key())
3725
3726         # Create an authentication policy that requires the client to belong to
3727         # certain groups.
3728         target_policy_sddl = self.allow_if(
3729             f'Member_of {self.sddl_array_from_sids(required_sids)}')
3730         target_policy = self.create_authn_policy(
3731             enforced=True, computer_allowed_to=target_policy_sddl)
3732
3733         # Create a target account with the assigned policy.
3734         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
3735                                        assigned_policy=target_policy,
3736                                        additional_details={
3737                                            'msDS-SupportedEncryptionTypes': str((
3738                                                security.KERB_ENCTYPE_RC4_HMAC_MD5) | (
3739                                                    security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK) | (
3740                                                        security.KERB_ENCTYPE_RESOURCE_SID_COMPRESSION_DISABLED))})
3741
3742         # Show that authorization fails.
3743         self._tgs_req(
3744             client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
3745             armor_tgt=mach_tgt,
3746             expect_edata=self.expect_padata_outer,
3747             # We aren’t particular about whether or not we get an NTSTATUS.
3748             expect_status=None,
3749             expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
3750
3751         self.check_tgs_log(
3752             client_creds, target_creds,
3753             policy=target_policy,
3754             status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3755             event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
3756             reason=AuditReason.ACCESS_DENIED)
3757
3758     def test_pac_well_known_groups_not_present(self):
3759         """Test that authorization fails if the client does not belong to one
3760         or more required well‐known groups.
3761         """
3762
3763         required_sids = {
3764             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
3765             (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
3766             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
3767             (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
3768         }
3769
3770         client_sids = {
3771             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3772             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3773         }
3774
3775         # Create a machine account with which to perform FAST.
3776         mach_creds = self.get_cached_creds(
3777             account_type=self.AccountType.COMPUTER,
3778             opts={'id': 'device'})
3779         mach_tgt = self.get_tgt(mach_creds)
3780
3781         # Create a user account.
3782         client_creds = self._get_creds(account_type=self.AccountType.USER)
3783         client_tgt = self.get_tgt(client_creds)
3784
3785         # Modify the client’s TGT to contain only the SID of the client’s
3786         # primary group.
3787         client_tgt = self.modified_ticket(
3788             client_tgt,
3789             modify_pac_fn=partial(self.set_pac_sids,
3790                                   new_sids=client_sids),
3791             checksum_keys=self.get_krbtgt_checksum_key())
3792
3793         # Create an authentication policy that requires the client to belong to
3794         # certain groups.
3795         target_policy_sddl = self.allow_if(
3796             f'Member_of_any {self.sddl_array_from_sids(required_sids)}')
3797         target_policy = self.create_authn_policy(
3798             enforced=True, computer_allowed_to=target_policy_sddl)
3799
3800         # Create a target account with the assigned policy.
3801         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
3802                                        assigned_policy=target_policy)
3803
3804         # Show that authorization fails.
3805         self._tgs_req(
3806             client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
3807             armor_tgt=mach_tgt,
3808             expect_edata=self.expect_padata_outer,
3809             # We aren’t particular about whether or not we get an NTSTATUS.
3810             expect_status=None,
3811             expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
3812
3813         self.check_tgs_log(
3814             client_creds, target_creds,
3815             policy=target_policy,
3816             status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3817             event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
3818             reason=AuditReason.ACCESS_DENIED)
3819
3820     def test_pac_device_info(self):
3821         self._run_pac_device_info_test()
3822
3823     def test_pac_device_info_no_compound_id_support(self):
3824         self._run_pac_device_info_test(compound_id_support=False)
3825
3826     def test_pac_device_info_no_claims_valid(self):
3827         self._run_pac_device_info_test(device_claims_valid=False)
3828
3829     def _run_pac_device_info_test(self, compound_id_support=True, device_claims_valid=True):
3830         """Test the groups of the client and the device after performing a
3831         FAST‐armored TGS‐REQ.
3832         """
3833
3834         client_claim_id = 'the name of the client’s client claim'
3835         client_claim_value = 'the value of the client’s client claim'
3836
3837         client_claims = [
3838             (claims.CLAIMS_SOURCE_TYPE_AD, [
3839                 (client_claim_id, claims.CLAIM_TYPE_STRING, [client_claim_value]),
3840             ]),
3841         ]
3842
3843         expected_client_claims = {
3844             client_claim_id: {
3845                 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
3846                 'type': claims.CLAIM_TYPE_STRING,
3847                 'values': (client_claim_value,),
3848             },
3849         }
3850
3851         device_claim_id = 'the name of the device’s client claim'
3852         device_claim_value = 'the value of the device’s client claim'
3853
3854         device_claims = [
3855             (claims.CLAIMS_SOURCE_TYPE_AD, [
3856                 (device_claim_id, claims.CLAIM_TYPE_STRING, [device_claim_value]),
3857             ]),
3858         ]
3859
3860         if compound_id_support:
3861             expected_device_claims = {
3862                 device_claim_id: {
3863                     'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
3864                     'type': claims.CLAIM_TYPE_STRING,
3865                     'values': (device_claim_value,),
3866                 },
3867             }
3868         else:
3869             expected_device_claims = None
3870
3871         # Create a machine account with which to perform FAST.
3872         mach_creds = self.get_cached_creds(
3873             account_type=self.AccountType.COMPUTER,
3874             opts={'id': 'device'})
3875         mach_tgt = self.get_tgt(mach_creds)
3876
3877         client_sids = {
3878             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3879             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3880             # This to ensure we have EXTRA_SIDS set already, as
3881             # windows won't set that flag otherwise when adding one
3882             # more
3883             ('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs),
3884         }
3885
3886         device_sids = {
3887             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3888             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3889             ('S-1-2-3-4', SidType.EXTRA_SID, self.resource_attrs),
3890             ('S-1-3-4-5', SidType.EXTRA_SID, self.resource_attrs),
3891         }
3892
3893         if device_claims_valid:
3894             device_sids.add((security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs))
3895
3896         # Modify the machine account’s TGT to contain only the SID of the
3897         # machine account’s primary group.
3898         mach_tgt = self.modified_ticket(
3899             mach_tgt,
3900             modify_pac_fn=[
3901                 partial(self.set_pac_sids,
3902                         new_sids=device_sids),
3903                 partial(self.set_pac_claims, client_claims=device_claims),
3904             ],
3905             checksum_keys=self.get_krbtgt_checksum_key())
3906
3907         # Create a user account.
3908         client_creds = self._get_creds(account_type=self.AccountType.USER)
3909         client_tgt = self.get_tgt(client_creds)
3910
3911         # Modify the client’s TGT to contain only the SID of the client’s
3912         # primary group.
3913         client_tgt = self.modified_ticket(
3914             client_tgt,
3915             modify_pac_fn=[
3916                 partial(self.set_pac_sids,
3917                         new_sids=client_sids),
3918                 partial(self.set_pac_claims, client_claims=client_claims),
3919             ],
3920             checksum_keys=self.get_krbtgt_checksum_key())
3921
3922         # Indicate that Compound Identity is supported.
3923         target_creds, _ = self.get_target(to_krbtgt=False, compound_id=compound_id_support)
3924
3925         expected_sids = {
3926             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3927             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3928             ('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs),
3929             # The client’s groups are not to include the Asserted Identity and
3930             # Claims Valid SIDs.
3931         }
3932
3933         if compound_id_support:
3934             expected_sids.add((security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs))
3935
3936             expected_device_sids = {
3937                 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3938                 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3939                 ('S-1-2-3-4', SidType.EXTRA_SID, self.resource_attrs),
3940                 ('S-1-3-4-5', SidType.EXTRA_SID, self.resource_attrs),
3941             }
3942
3943             if device_claims_valid:
3944                 expected_device_sids.add(frozenset([(security.SID_CLAIMS_VALID, SidType.RESOURCE_SID, self.default_attrs)]))
3945         else:
3946             expected_device_sids = None
3947
3948         samdb = self.get_samdb()
3949         domain_sid_str = samdb.get_domain_sid()
3950
3951         expected_sids = self.map_sids(expected_sids, None, domain_sid_str)
3952         # The device SIDs will be put into the PAC unmodified.
3953         expected_device_sids = self.map_sids(expected_device_sids, None, domain_sid_str)
3954
3955         # Show that authorization succeeds.
3956         self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt,
3957                       expected_groups=expected_sids,
3958                       expect_device_info=bool(expected_device_sids) or None,
3959                       expected_device_domain_sid=domain_sid_str,
3960                       expected_device_groups=expected_device_sids,
3961                       expect_client_claims=bool(expected_client_claims) or None,
3962                       expected_client_claims=expected_client_claims,
3963                       expect_device_claims=bool(expected_device_claims) or None,
3964                       expected_device_claims=expected_device_claims)
3965
3966         self.check_tgs_log(client_creds, target_creds)
3967
3968     def test_pac_extra_sids_behaviour(self):
3969         """Test the groups of the client and the device after performing a
3970         FAST‐armored TGS‐REQ.
3971         """
3972
3973         # Create a machine account with which to perform FAST.
3974         mach_creds = self.get_cached_creds(
3975             account_type=self.AccountType.COMPUTER,
3976             opts={'id': 'device'})
3977         mach_tgt = self.get_tgt(mach_creds)
3978
3979         client_sids = {
3980             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3981             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3982         }
3983
3984         # Create a user account.
3985         client_creds = self._get_creds(account_type=self.AccountType.USER)
3986         client_tgt = self.get_tgt(client_creds)
3987
3988         # Modify the client’s TGT to contain only the SID of the client’s
3989         # primary group.
3990         client_tgt = self.modified_ticket(
3991             client_tgt,
3992             modify_pac_fn=partial(self.set_pac_sids,
3993                                   new_sids=client_sids),
3994             checksum_keys=self.get_krbtgt_checksum_key())
3995
3996         # Indicate that Compound Identity is supported.
3997         target_creds, _ = self.get_target(to_krbtgt=False, compound_id=True)
3998
3999         expected_sids = {
4000             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4001             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4002             (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs)
4003             # The client’s groups are not to include the Asserted Identity and
4004             # Claims Valid SIDs.
4005         }
4006
4007         samdb = self.get_samdb()
4008         domain_sid_str = samdb.get_domain_sid()
4009
4010         expected_sids = self.map_sids(expected_sids, None, domain_sid_str)
4011
4012         # Show that authorization succeeds.
4013         self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt,
4014                       expected_groups=expected_sids)
4015
4016         self.check_tgs_log(client_creds, target_creds)
4017
4018     def test_pac_claims_not_present(self):
4019         """Test that authentication fails if the device does not have a
4020         required claim.
4021         """
4022
4023         claim_id = 'the name of the claim'
4024         claim_value = 'the value of the claim'
4025
4026         # Create a machine account with which to perform FAST.
4027         mach_creds = self.get_cached_creds(
4028             account_type=self.AccountType.COMPUTER,
4029             opts={'id': 'device'})
4030         mach_tgt = self.get_tgt(mach_creds)
4031
4032         # Create an authentication policy that requires the device to have a
4033         # certain claim.
4034         target_policy_sddl = self.allow_if(
4035             f'@User.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
4036         target_policy = self.create_authn_policy(
4037             enforced=True, computer_allowed_to=target_policy_sddl)
4038
4039         # Create a user account.
4040         client_creds = self._get_creds(account_type=self.AccountType.USER)
4041         client_tgt = self.get_tgt(client_creds)
4042
4043         # Create a target account with the assigned policy.
4044         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4045                                        assigned_policy=target_policy)
4046
4047         # Show that authorization fails.
4048         self._tgs_req(
4049             client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
4050             armor_tgt=mach_tgt,
4051             expect_edata=self.expect_padata_outer,
4052             # We aren’t particular about whether or not we get an NTSTATUS.
4053             expect_status=None,
4054             expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
4055
4056         self.check_tgs_log(
4057             client_creds,
4058             target_creds,
4059             policy=target_policy,
4060             status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
4061             event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
4062             reason=AuditReason.ACCESS_DENIED)
4063
4064     def test_pac_claims_present(self):
4065         """Test that authentication succeeds if the user has a required
4066         claim.
4067         """
4068
4069         claim_id = 'the name of the claim'
4070         claim_value = 'the value of the claim'
4071
4072         pac_claims = [
4073             (claims.CLAIMS_SOURCE_TYPE_AD, [
4074                 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
4075             ]),
4076         ]
4077
4078         # Create a machine account with which to perform FAST.
4079         mach_creds = self.get_cached_creds(
4080             account_type=self.AccountType.COMPUTER,
4081             opts={'id': 'device'})
4082         mach_tgt = self.get_tgt(mach_creds)
4083
4084         # Create an authentication policy that requires the user to have a
4085         # certain claim.
4086         target_policy_sddl = self.allow_if(
4087             f'@User.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
4088         target_policy = self.create_authn_policy(
4089             enforced=True, computer_allowed_to=target_policy_sddl)
4090
4091         # Create a user account.
4092         client_creds = self._get_creds(account_type=self.AccountType.USER)
4093         client_tgt = self.get_tgt(client_creds)
4094
4095         # Add the required claim to the client’s TGT.
4096         client_tgt = self.modified_ticket(
4097             client_tgt,
4098             modify_pac_fn=partial(self.set_pac_claims,
4099                                   client_claims=pac_claims),
4100             checksum_keys=self.get_krbtgt_checksum_key())
4101
4102         # Create a target account with the assigned policy.
4103         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4104                                        assigned_policy=target_policy)
4105
4106         # Show that authorization succeeds.
4107         self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt)
4108
4109         self.check_tgs_log(client_creds, target_creds,
4110                            policy=target_policy)
4111
4112     def test_pac_claims_invalid(self):
4113         """Test that authentication fails if the device’s required claim is not
4114         valid.
4115         """
4116
4117         claim_id = 'the name of the claim'
4118         claim_value = 'the value of the claim'
4119
4120         pac_claims = [
4121             (claims.CLAIMS_SOURCE_TYPE_AD, [
4122                 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
4123             ]),
4124         ]
4125
4126         # The device’s SIDs do not include the Claims Valid SID.
4127         device_sids = {
4128             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4129             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4130         }
4131
4132         # Create a machine account with which to perform FAST.
4133         mach_creds = self.get_cached_creds(
4134             account_type=self.AccountType.COMPUTER,
4135             opts={'id': 'device'})
4136         mach_tgt = self.get_tgt(mach_creds)
4137
4138         # Create an authentication policy that requires the device to have a
4139         # certain claim.
4140         target_policy_sddl = self.allow_if(
4141             f'@User.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
4142         target_policy = self.create_authn_policy(
4143             enforced=True, computer_allowed_to=target_policy_sddl)
4144
4145         # Create a user account.
4146         client_creds = self._get_creds(account_type=self.AccountType.USER)
4147         client_tgt = self.get_tgt(client_creds)
4148
4149         # Add the SIDs and the required claim to the client’s TGT.
4150         client_tgt = self.modified_ticket(
4151             client_tgt,
4152             modify_pac_fn=[
4153                 partial(self.set_pac_claims, client_claims=pac_claims),
4154                 partial(self.set_pac_sids, new_sids=device_sids)],
4155             checksum_keys=self.get_krbtgt_checksum_key())
4156
4157         # Create a target account with the assigned policy.
4158         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4159                                        assigned_policy=target_policy)
4160
4161         # Show that authorization fails.
4162         self._tgs_req(
4163             client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
4164             armor_tgt=mach_tgt,
4165             expect_edata=self.expect_padata_outer,
4166             # We aren’t particular about whether or not we get an NTSTATUS.
4167             expect_status=None,
4168             expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
4169
4170         self.check_tgs_log(
4171             client_creds,
4172             target_creds,
4173             policy=target_policy,
4174             status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
4175             event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
4176             reason=AuditReason.ACCESS_DENIED)
4177
4178     def test_pac_device_claims_not_present(self):
4179         """Test that authorization fails if the device does not have a
4180         required claim.
4181         """
4182
4183         claim_id = 'the name of the claim'
4184         claim_value = 'the value of the claim'
4185
4186         # Create a machine account with which to perform FAST.
4187         mach_creds = self.get_cached_creds(
4188             account_type=self.AccountType.COMPUTER,
4189             opts={'id': 'device'})
4190         mach_tgt = self.get_tgt(mach_creds)
4191
4192         # Create an authentication policy that requires the device to have a
4193         # certain device claim.
4194         target_policy_sddl = self.allow_if(
4195             f'@Device.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
4196         target_policy = self.create_authn_policy(
4197             enforced=True, computer_allowed_to=target_policy_sddl)
4198
4199         # Create a user account.
4200         client_creds = self._get_creds(account_type=self.AccountType.USER)
4201         client_tgt = self.get_tgt(client_creds)
4202
4203         # Create a target account with the assigned policy.
4204         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4205                                        assigned_policy=target_policy)
4206
4207         # Show that authorization fails.
4208         self._tgs_req(
4209             client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
4210             armor_tgt=mach_tgt,
4211             expect_edata=self.expect_padata_outer,
4212             # We aren’t particular about whether or not we get an NTSTATUS.
4213             expect_status=None,
4214             expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
4215
4216         self.check_tgs_log(
4217             client_creds,
4218             target_creds,
4219             policy=target_policy,
4220             status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
4221             event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
4222             reason=AuditReason.ACCESS_DENIED)
4223
4224     def test_pac_device_claims_present(self):
4225         """Test that authorization succeeds if the device has a required claim.
4226         """
4227
4228         claim_id = 'the name of the claim'
4229         claim_value = 'the value of the claim'
4230
4231         pac_claims = [
4232             (claims.CLAIMS_SOURCE_TYPE_AD, [
4233                 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
4234             ]),
4235         ]
4236
4237         # Create a machine account with which to perform FAST.
4238         mach_creds = self.get_cached_creds(
4239             account_type=self.AccountType.COMPUTER,
4240             opts={'id': 'device'})
4241         mach_tgt = self.get_tgt(mach_creds)
4242
4243         # Add the required claim to the machine account’s TGT.
4244         mach_tgt = self.modified_ticket(
4245             mach_tgt,
4246             modify_pac_fn=partial(self.set_pac_claims,
4247                                   client_claims=pac_claims),
4248             checksum_keys=self.get_krbtgt_checksum_key())
4249
4250         # Create an authentication policy that requires the device to have a
4251         # certain device claim.
4252         target_policy_sddl = self.allow_if(
4253             f'@Device.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
4254         target_policy = self.create_authn_policy(
4255             enforced=True, computer_allowed_to=target_policy_sddl)
4256
4257         # Create a user account.
4258         client_creds = self._get_creds(account_type=self.AccountType.USER)
4259         client_tgt = self.get_tgt(client_creds)
4260
4261         # Create a target account with the assigned policy.
4262         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4263                                        assigned_policy=target_policy)
4264
4265         # Show that authorization succeeds.
4266         self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt)
4267
4268         self.check_tgs_log(client_creds, target_creds,
4269                            policy=target_policy)
4270
4271     def test_pac_device_claims_invalid(self):
4272         """Test that authorization fails if the device’s required claim is not
4273         valid.
4274         """
4275
4276         claim_id = 'the name of the claim'
4277         claim_value = 'the value of the claim'
4278
4279         pac_claims = [
4280             (claims.CLAIMS_SOURCE_TYPE_AD, [
4281                 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
4282             ]),
4283         ]
4284
4285         # The device’s SIDs do not include the Claims Valid SID.
4286         device_sids = {
4287             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4288             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4289         }
4290
4291         # Create a machine account with which to perform FAST.
4292         mach_creds = self.get_cached_creds(
4293             account_type=self.AccountType.COMPUTER,
4294             opts={'id': 'device'})
4295         mach_tgt = self.get_tgt(mach_creds)
4296
4297         # Add the SIDs and the required claim to the machine account’s TGT.
4298         mach_tgt = self.modified_ticket(
4299             mach_tgt,
4300             modify_pac_fn=[
4301                 partial(self.set_pac_claims, client_claims=pac_claims),
4302                 partial(self.set_pac_sids, new_sids=device_sids)],
4303             checksum_keys=self.get_krbtgt_checksum_key())
4304
4305         # Create an authentication policy that requires the device to have a
4306         # certain claim.
4307         target_policy_sddl = self.allow_if(
4308             f'@Device.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
4309         target_policy = self.create_authn_policy(
4310             enforced=True, computer_allowed_to=target_policy_sddl)
4311
4312         # Create a user account.
4313         client_creds = self._get_creds(account_type=self.AccountType.USER)
4314         client_tgt = self.get_tgt(client_creds)
4315
4316         # Create a target account with the assigned policy.
4317         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4318                                        assigned_policy=target_policy)
4319
4320         # Show that authorization fails.
4321         self._tgs_req(
4322             client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
4323             armor_tgt=mach_tgt,
4324             expect_edata=self.expect_padata_outer,
4325             # We aren’t particular about whether or not we get an NTSTATUS.
4326             expect_status=None,
4327             expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
4328
4329         self.check_tgs_log(
4330             client_creds,
4331             target_creds,
4332             policy=target_policy,
4333             status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
4334             event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
4335             reason=AuditReason.ACCESS_DENIED)
4336
4337     def test_pac_device_claims_invalid_no_attrs(self):
4338         """Test that authorization fails if the device’s required claim is not
4339         valid.
4340         """
4341
4342         claim_id = 'the name of the claim'
4343         claim_value = 'the value of the claim'
4344
4345         pac_claims = [
4346             (claims.CLAIMS_SOURCE_TYPE_AD, [
4347                 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
4348             ]),
4349         ]
4350
4351         device_sids = {
4352             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4353             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4354             # The device’s SIDs include the Claims Valid SID, but it has no
4355             # attributes.
4356             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, 0),
4357         }
4358
4359         # Create a machine account with which to perform FAST.
4360         mach_creds = self.get_cached_creds(
4361             account_type=self.AccountType.COMPUTER,
4362             opts={'id': 'device'})
4363         mach_tgt = self.get_tgt(mach_creds)
4364
4365         # Add the SIDs and the required claim to the machine account’s TGT.
4366         mach_tgt = self.modified_ticket(
4367             mach_tgt,
4368             modify_pac_fn=[
4369                 partial(self.set_pac_claims, client_claims=pac_claims),
4370                 partial(self.set_pac_sids, new_sids=device_sids)],
4371             checksum_keys=self.get_krbtgt_checksum_key())
4372
4373         # Create an authentication policy that requires the device to have a
4374         # certain claim.
4375         target_policy_sddl = self.allow_if(
4376             f'@Device.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
4377         target_policy = self.create_authn_policy(
4378             enforced=True, computer_allowed_to=target_policy_sddl)
4379
4380         # Create a user account.
4381         client_creds = self._get_creds(account_type=self.AccountType.USER)
4382         client_tgt = self.get_tgt(client_creds)
4383
4384         # Create a target account with the assigned policy.
4385         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4386                                        assigned_policy=target_policy)
4387
4388         # Show that authorization succeeds.
4389         self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt)
4390
4391         self.check_tgs_log(client_creds, target_creds,
4392                            policy=target_policy)
4393
4394     def test_simple_as_req_client_and_target_policy(self):
4395         # Create a machine account with which to perform FAST.
4396         mach_creds = self.get_cached_creds(
4397             account_type=self.AccountType.COMPUTER)
4398         mach_tgt = self.get_tgt(mach_creds)
4399
4400         # Create an authentication policy that explicitly allows the machine
4401         # account for a user.
4402         client_policy_sddl = f'O:SYD:(XA;;CR;;;{mach_creds.get_sid()};(Member_of {{SID({mach_creds.get_sid()}), SID({mach_creds.get_sid()})}}))'
4403         client_policy = self.create_authn_policy(enforced=True,
4404                                                  user_allowed_from=client_policy_sddl)
4405
4406         # Create a user account with the assigned policy.
4407         client_creds = self._get_creds(account_type=self.AccountType.USER,
4408                                        assigned_policy=client_policy)
4409
4410         # Create an authentication policy that applies to a computer and
4411         # explicitly allows the user account to obtain a service ticket.
4412         target_policy_sddl = f'O:SYD:(XA;;CR;;;{client_creds.get_sid()};(Member_of SID({client_creds.get_sid()})))'
4413         target_policy = self.create_authn_policy(enforced=True,
4414                                                  computer_allowed_to=target_policy_sddl)
4415
4416         # Create a computer account with the assigned policy.
4417         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4418                                        assigned_policy=target_policy)
4419
4420         expected_groups = {
4421             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4422             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4423             (security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY, SidType.EXTRA_SID, self.default_attrs),
4424             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
4425         }
4426
4427         # Show that obtaining a service ticket with an AS‐REQ is allowed.
4428         self._armored_as_req(client_creds,
4429                           target_creds,
4430                           mach_tgt,
4431                           expected_groups=expected_groups)
4432
4433         self.check_as_log(client_creds,
4434                           armor_creds=mach_creds,
4435                           client_policy=client_policy,
4436                           server_policy=target_policy)
4437
4438     def test_device_in_world_group(self):
4439         self._check_device_in_group(security.SID_WORLD)
4440
4441     def test_device_in_network_group(self):
4442         self._check_device_not_in_group(security.SID_NT_NETWORK)
4443
4444     def test_device_in_authenticated_users(self):
4445         self._check_device_in_group(security.SID_NT_AUTHENTICATED_USERS)
4446
4447     def _check_device_in_group(self, group):
4448         self._check_device_membership(group, expect_in_group=True)
4449
4450     def _check_device_not_in_group(self, group):
4451         self._check_device_membership(group, expect_in_group=False)
4452
4453     def _check_device_membership(self, group, *, expect_in_group):
4454         """Test that authentication succeeds or fails when the device is
4455         required to belong to a certain group.
4456         """
4457
4458         # Create a machine account with which to perform FAST.
4459         mach_creds = self.get_cached_creds(
4460             account_type=self.AccountType.COMPUTER,
4461             opts={'id': 'device'})
4462         mach_tgt = self.get_tgt(mach_creds)
4463
4464         # Create an authentication policy that requires the device to belong to
4465         # a certain group.
4466         in_group_sddl = self.allow_if(f'Device_Member_of {{SID({group})}}')
4467         in_group_policy = self.create_authn_policy(
4468             enforced=True, computer_allowed_to=in_group_sddl)
4469
4470         # Create a user account.
4471         client_creds = self._get_creds(account_type=self.AccountType.USER)
4472         client_tgt = self.get_tgt(client_creds)
4473
4474         # Create a target account with the assigned policy.
4475         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4476                                        assigned_policy=in_group_policy)
4477
4478         tgs_success_args = {}
4479         tgs_failure_args = {
4480             'expect_edata': self.expect_padata_outer,
4481             # We aren’t particular about whether or not we get an NTSTATUS.
4482             'expect_status': None,
4483             'expected_status': ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
4484         }
4485
4486         # Test whether authorization succeeds or fails.
4487         self._tgs_req(client_tgt,
4488                       0 if expect_in_group else KDC_ERR_POLICY,
4489                       client_creds,
4490                       target_creds,
4491                       armor_tgt=mach_tgt,
4492                       **(tgs_success_args if expect_in_group
4493                       else tgs_failure_args))
4494
4495         policy_success_args = {}
4496         policy_failure_args = {
4497             'status': ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
4498             'event': AuditEvent.KERBEROS_SERVER_RESTRICTION,
4499             'reason': AuditReason.ACCESS_DENIED,
4500         }
4501
4502         self.check_tgs_log(client_creds, target_creds,
4503                            policy=in_group_policy,
4504                            **(policy_success_args if expect_in_group
4505                            else policy_failure_args))
4506
4507         # Create an authentication policy that requires the device not to belong
4508         # to the group.
4509         not_in_group_sddl = self.allow_if(
4510             f'Not_Device_Member_of {{SID({group})}}')
4511         not_in_group_policy = self.create_authn_policy(
4512             enforced=True, computer_allowed_to=not_in_group_sddl)
4513
4514         # Create a target account with the assigned policy.
4515         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4516                                        assigned_policy=not_in_group_policy)
4517
4518         # Test whether authorization succeeds or fails.
4519         self._tgs_req(client_tgt,
4520                       KDC_ERR_POLICY if expect_in_group else 0,
4521                       client_creds,
4522                       target_creds,
4523                       armor_tgt=mach_tgt,
4524                       **(tgs_failure_args if expect_in_group
4525                       else tgs_success_args))
4526
4527         self.check_tgs_log(client_creds, target_creds,
4528                            policy=not_in_group_policy,
4529                            **(policy_failure_args if expect_in_group
4530                               else policy_success_args))
4531
4532     def test_simple_as_req_client_policy_only(self):
4533         # Create a machine account with which to perform FAST.
4534         mach_creds = self.get_cached_creds(
4535             account_type=self.AccountType.COMPUTER)
4536         mach_tgt = self.get_tgt(mach_creds)
4537
4538         # Create an authentication policy that explicitly allows the machine
4539         # account for a user.
4540         client_policy_sddl = f'O:SYD:(XA;;CR;;;{mach_creds.get_sid()};(Member_of SID({mach_creds.get_sid()})))'
4541         client_policy = self.create_authn_policy(enforced=True,
4542                                                  user_allowed_from=client_policy_sddl)
4543
4544         # Create a user account with the assigned policy.
4545         client_creds = self._get_creds(account_type=self.AccountType.USER,
4546                                        assigned_policy=client_policy)
4547
4548         expected_groups = {
4549             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4550             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4551             (security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY, SidType.EXTRA_SID, self.default_attrs),
4552             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
4553         }
4554
4555         # FIXME: we need to pass this parameter only because Samba doesn’t
4556         # handle ‘krbtgt@REALM’ principals correctly (see
4557         # https://bugzilla.samba.org/show_bug.cgi?id=15482).
4558         krbtgt_sname = self.get_krbtgt_sname()
4559
4560         # Show that obtaining a service ticket with an AS‐REQ is allowed.
4561         self._armored_as_req(client_creds,
4562                           self.get_krbtgt_creds(),
4563                           mach_tgt,
4564                           target_sname=krbtgt_sname,
4565                           expected_groups=expected_groups)
4566
4567         self.check_as_log(client_creds,
4568                           armor_creds=mach_creds,
4569                           client_policy=client_policy)
4570
4571
4572 if __name__ == '__main__':
4573     global_asn1_print = False
4574     global_hexdump = False
4575     import unittest
4576     unittest.main()