2 # Unix SMB/CIFS implementation.
3 # Copyright (C) Stefan Metzmacher 2020
4 # Copyright (C) Catalyst.Net Ltd 2023
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 sys.path.insert(0, 'bin/python')
24 os.environ['PYTHONUNBUFFERED'] = '1'
26 from collections import OrderedDict
27 from functools import partial
29 from string import Formatter
33 from samba import dsdb, ntstatus
34 from samba.dcerpc import claims, krb5pac, security
35 from samba.ndr import ndr_pack, ndr_unpack
37 from samba.tests import DynamicTestCase, env_get_var_value
38 from samba.tests.krb5.authn_policy_tests import (
43 from samba.tests.krb5.raw_testcase import RawKerberosTest
44 from samba.tests.krb5.rfc4120_constants import (
51 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
53 SidType = RawKerberosTest.SidType
55 global_asn1_print = False
56 global_hexdump = False
59 # When used as a test outcome, indicates that the test can cause a Windows
60 # server to crash, and is to be run with caution.
61 CRASHES_WINDOWS = object()
64 class ConditionalAceBaseTests(AuthnPolicyBaseTests):
65 # Constants for group SID attributes.
66 default_attrs = security.SE_GROUP_DEFAULT_FLAGS
67 resource_attrs = default_attrs | security.SE_GROUP_RESOURCE
69 aa_asserted_identity = (
70 security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY)
71 service_asserted_identity = security.SID_SERVICE_ASSERTED_IDENTITY
81 self.do_asn1_print = global_asn1_print
82 self.do_hexdump = global_hexdump
85 samdb = self.get_samdb()
88 # Create a machine account with which to perform FAST.
89 cls._mach_creds = self.get_cached_creds(
90 account_type=self.AccountType.COMPUTER)
92 # Create some new groups.
94 group0_name = self.get_new_username()
95 group0_dn = self.create_group(samdb, group0_name)
96 cls._group0_sid = self.get_objectSid(samdb, group0_dn)
98 group1_name = self.get_new_username()
99 group1_dn = self.create_group(samdb, group1_name)
100 cls._group1_sid = self.get_objectSid(samdb, group1_dn)
102 # Create machine accounts with which to perform FAST that belong to
103 # various arrangements of the groups.
105 cls._member_of_both_creds = self.get_cached_creds(
106 account_type=self.AccountType.COMPUTER,
107 opts={'member_of': (group0_dn, group1_dn)})
109 cls._member_of_one_creds = self.get_cached_creds(
110 account_type=self.AccountType.COMPUTER,
111 opts={'member_of': (group1_dn,)})
113 # Create some authentication silos.
114 cls._unenforced_silo = self.create_authn_silo(enforced=False)
115 cls._enforced_silo = self.create_authn_silo(enforced=True)
117 # Create machine accounts with which to perform FAST that belong to
118 # the respective silos.
120 cls._member_of_unenforced_silo = self._get_creds(
121 account_type=self.AccountType.COMPUTER,
122 assigned_silo=self._unenforced_silo,
124 self.add_to_group(str(self._member_of_unenforced_silo.get_dn()),
125 self._unenforced_silo.dn,
126 'msDS-AuthNPolicySiloMembers',
129 cls._member_of_enforced_silo = self._get_creds(
130 account_type=self.AccountType.COMPUTER,
131 assigned_silo=self._enforced_silo,
133 self.add_to_group(str(self._member_of_enforced_silo.get_dn()),
134 self._enforced_silo.dn,
135 'msDS-AuthNPolicySiloMembers',
138 # Create a couple of multi‐valued string claims for testing claim comparisons.
141 cls.claim0_attr = 'carLicense'
142 cls.claim0_id = self.get_new_username()
143 self.create_claim(cls.claim0_id,
145 attribute=cls.claim0_attr,
148 for_classes=['computer', 'user'],
149 value_type=claims.CLAIM_TYPE_STRING)
151 cls.claim1_attr = 'departmentNumber'
152 cls.claim1_id = self.get_new_username()
153 self.create_claim(cls.claim1_id,
155 attribute=cls.claim1_attr,
158 for_classes=['computer', 'user'],
159 value_type=claims.CLAIM_TYPE_STRING)
163 # For debugging purposes. Prints out the SDDL representation of
164 # authentication policy conditions set by the Windows GUI.
165 def _print_authn_policy_sddl(self, policy_id):
166 policy_dn = self.get_authn_policies_dn()
167 policy_dn.add_child(f'CN={policy_id}')
170 'msDS-ComputerAllowedToAuthenticateTo',
171 'msDS-ServiceAllowedToAuthenticateFrom',
172 'msDS-ServiceAllowedToAuthenticateTo',
173 'msDS-UserAllowedToAuthenticateFrom',
174 'msDS-UserAllowedToAuthenticateTo',
177 samdb = self.get_samdb()
178 res = samdb.search(policy_dn, scope=ldb.SCOPE_BASE, attrs=attrs)
179 self.assertEqual(1, len(res),
180 f'Authentication policy {policy_id} not found')
183 def print_sddl(attr):
184 sd = result.get(attr, idx=0)
188 sec_desc = ndr_unpack(security.descriptor, sd)
189 print(f'{attr}: {sec_desc.as_sddl()}')
194 def sddl_array_from_sids(self, sids):
195 def sddl_from_sid_entry(sid_entry):
196 sid, _, _ = sid_entry
199 return f"{{{', '.join(map(sddl_from_sid_entry, sids))}}}"
def allow_if(self, condition):
    """Build an SDDL security descriptor that grants access conditionally.

    The descriptor consists of a single conditional (callback) allow ACE
    whose condition is the supplied expression, wrapped in parentheses.

    :param condition: conditional ACE expression text, without the
        enclosing parentheses.
    :return: the SDDL string for the security descriptor.
    """
    # O:SY    -- owner is Local System
    # D:      -- start of the DACL
    # XA      -- callback access-allowed (conditional) ACE
    # CR      -- control access right
    # WD      -- trustee is Everyone
    sddl = f'O:SYD:(XA;;CR;;;WD;({condition}))'
    return sddl
205 def escaped_claim_id(claim_id):
206 escapes = '\x00\t\n\x0b\x0c\r !"%&()<=>|'
209 else f'%{ord(c):04x}'
214 class ConditionalAceTests(ConditionalAceBaseTests):
216 def setUpDynamicTestCases(cls):
217 FILTER = env_get_var_value('FILTER', allow_missing=True)
219 # These operators are arranged so that each operator precedes its own substrings.
221 op_names = OrderedDict([
222 ('!=', 'does not equal'),
225 ('<=', 'is less than or equals'),
226 ('<', 'is less than'),
228 ('>=', 'exceeds or equals'),
230 ('Not_Any_of', 'matches none of'),
231 ('Any_of', 'matches any of'),
232 ('Not_Contains', 'does not contain'),
233 ('Contains', 'contains'),
234 ('Not_Member_of_Any', 'the user belongs to none of'),
235 ('Not_Device_Member_of_Any', 'the device belongs to none of'), # TODO: no test for this yet
236 ('Device_Member_of_Any', 'the device belongs to any of'), # TODO: no test for this yet
237 ('Not_Device_Member_of', 'the device does not belong to'), # TODO: no test for this yet
238 ('Device_Member_of', 'the device belongs to'),
239 ('Not_Exists', 'there does not exist'),
240 ('Exists', 'there exists'),
241 ('Member_of_Any', 'the user belongs to any of'),
242 ('Not_Member_of', 'the user does not belong to'),
243 ('Member_of', 'the user belongs to'),
247 # This is a safety measure to ensure correct ordering of op_names
248 keys = list(op_names.keys())
249 for i in range(len(keys)):
250 for j in range(i + 1, len(keys)):
251 if keys[i] in keys[j]:
252 raise AssertionError((keys[i], keys[j]))
254 for case in cls.pac_claim_cases:
256 pac_claims, expression, outcome = case
259 pac_claims, expression, claim_map, outcome = case
261 raise AssertionError(
262 f'found {len(case)} items in case, expected 3–4')
264 expression_name = expression
265 for op, op_name in op_names.items():
266 expression_name = expression_name.replace(op, op_name)
268 name = f'{pac_claims}_{expression_name}'
270 if claim_map is not None:
271 name += f'_{claim_map}'
273 name = re.sub(r'\W+', '_', name)
275 name = f'{name[:125]}+{len(name) - 125}‐more'
277 if FILTER and not re.search(FILTER, name):
280 cls.generate_dynamic_test('test_pac_claim_cmp', name,
281 pac_claims, expression, claim_map,
284 for case in cls.claim_against_claim_cases:
285 lhs, op, rhs, outcome = case
286 op_name = op_names[op]
288 name = f'{lhs}_{op_name}_{rhs}'
290 name = re.sub(r'\W+', '_', name)
291 if FILTER and not re.search(FILTER, name):
294 cls.generate_dynamic_test('test_cmp', name,
295 lhs, op, rhs, outcome)
297 for case in cls.claim_against_literal_cases:
298 lhs, op, rhs, outcome = case
299 op_name = op_names[op]
301 name = f'{lhs}_{op_name}_literal_{rhs}'
303 name = re.sub(r'\W+', '_', name)
304 if FILTER and not re.search(FILTER, name):
307 cls.generate_dynamic_test('test_cmp', name,
308 lhs, op, rhs, outcome, True)
310 def test_allowed_from_member_of_each(self):
311 # Create an authentication policy that allows accounts belonging to both groups.
313 policy = self.create_authn_policy(
316 f'O:SYD:(XA;;CR;;;WD;(Member_of '
317 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'),
320 # Create a user account with the assigned policy.
321 client_creds = self._get_creds(account_type=self.AccountType.USER,
322 assigned_policy=policy)
324 # Show that we get a policy error if the machine account does not
325 # belong to both groups.
326 armor_tgt = self.get_tgt(self._member_of_one_creds)
327 self._get_tgt(client_creds, armor_tgt=armor_tgt,
328 expected_error=KDC_ERR_POLICY)
330 # Otherwise, authentication should succeed.
331 armor_tgt = self.get_tgt(self._member_of_both_creds)
332 self._get_tgt(client_creds, armor_tgt=armor_tgt,
335 def test_allowed_from_member_of_any(self):
336 # Create an authentication policy that allows accounts belonging to either group.
338 policy = self.create_authn_policy(
341 f'O:SYD:(XA;;CR;;;WD;(Member_of_Any '
342 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'),
345 # Create a user account with the assigned policy.
346 client_creds = self._get_creds(account_type=self.AccountType.USER,
347 assigned_policy=policy)
349 # Show that we get a policy error if the machine account belongs to neither group.
351 armor_tgt = self.get_tgt(self._mach_creds)
352 self._get_tgt(client_creds, armor_tgt=armor_tgt,
353 expected_error=KDC_ERR_POLICY)
355 # Otherwise, authentication should succeed.
356 armor_tgt = self.get_tgt(self._member_of_one_creds)
357 self._get_tgt(client_creds, armor_tgt=armor_tgt,
360 def test_allowed_from_not_member_of_each(self):
361 # Create an authentication policy that allows accounts not belonging to both groups.
363 policy = self.create_authn_policy(
366 f'O:SYD:(XA;;CR;;;WD;(Not_Member_of '
367 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'),
370 # Create a user account with the assigned policy.
371 client_creds = self._get_creds(account_type=self.AccountType.USER,
372 assigned_policy=policy)
374 # Show that we get a policy error if the machine account belongs to both groups.
376 armor_tgt = self.get_tgt(self._member_of_both_creds)
377 self._get_tgt(client_creds, armor_tgt=armor_tgt,
378 expected_error=KDC_ERR_POLICY)
380 # Otherwise, authentication should succeed.
381 armor_tgt = self.get_tgt(self._member_of_one_creds)
382 self._get_tgt(client_creds, armor_tgt=armor_tgt,
385 def test_allowed_from_not_member_of_any(self):
386 # Create an authentication policy that allows accounts belonging to neither group.
388 policy = self.create_authn_policy(
391 f'O:SYD:(XA;;CR;;;WD;(Not_Member_of_Any '
392 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'),
395 # Create a user account with the assigned policy.
396 client_creds = self._get_creds(account_type=self.AccountType.USER,
397 assigned_policy=policy)
399 # Show that we get a policy error if the machine account belongs to one of the groups.
401 armor_tgt = self.get_tgt(self._member_of_one_creds)
402 self._get_tgt(client_creds, armor_tgt=armor_tgt,
403 expected_error=KDC_ERR_POLICY)
405 # Otherwise, authentication should succeed.
406 armor_tgt = self.get_tgt(self._mach_creds)
407 self._get_tgt(client_creds, armor_tgt=armor_tgt,
410 def test_allowed_from_member_of_each_deny(self):
411 # Create an authentication policy that denies accounts belonging to
412 # both groups, and allows other accounts.
413 policy = self.create_authn_policy(
416 f'O:SYD:(XD;;CR;;;WD;(Member_of '
417 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'
421 # Create a user account with the assigned policy.
422 client_creds = self._get_creds(account_type=self.AccountType.USER,
423 assigned_policy=policy)
425 # Show that we get a policy error if the machine account belongs to both groups.
427 armor_tgt = self.get_tgt(self._member_of_both_creds)
428 self._get_tgt(client_creds, armor_tgt=armor_tgt,
429 expected_error=KDC_ERR_POLICY)
431 # Otherwise, authentication should succeed.
432 armor_tgt = self.get_tgt(self._member_of_one_creds)
433 self._get_tgt(client_creds, armor_tgt=armor_tgt,
436 def test_allowed_from_member_of_any_deny(self):
437 # Create an authentication policy that denies accounts belonging to
438 # either group, and allows other accounts.
439 policy = self.create_authn_policy(
442 f'O:SYD:(XD;;CR;;;WD;(Member_of_Any '
443 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'
447 # Create a user account with the assigned policy.
448 client_creds = self._get_creds(account_type=self.AccountType.USER,
449 assigned_policy=policy)
451 # Show that we get a policy error if the machine account belongs to one of the groups.
453 armor_tgt = self.get_tgt(self._member_of_one_creds)
454 self._get_tgt(client_creds, armor_tgt=armor_tgt,
455 expected_error=KDC_ERR_POLICY)
457 # Otherwise, authentication should succeed.
458 armor_tgt = self.get_tgt(self._mach_creds)
459 self._get_tgt(client_creds, armor_tgt=armor_tgt,
462 def test_allowed_from_not_member_of_each_deny(self):
463 # Create an authentication policy that denies accounts not belonging to
464 # both groups, and allows other accounts.
465 policy = self.create_authn_policy(
468 f'O:SYD:(XD;;CR;;;WD;(Not_Member_of '
469 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'
473 # Create a user account with the assigned policy.
474 client_creds = self._get_creds(account_type=self.AccountType.USER,
475 assigned_policy=policy)
477 # Show that we get a policy error if the machine account doesn’t belong to both groups.
479 armor_tgt = self.get_tgt(self._member_of_one_creds)
480 self._get_tgt(client_creds, armor_tgt=armor_tgt,
481 expected_error=KDC_ERR_POLICY)
483 # Otherwise, authentication should succeed.
484 armor_tgt = self.get_tgt(self._member_of_both_creds)
485 self._get_tgt(client_creds, armor_tgt=armor_tgt,
488 def test_allowed_from_not_member_of_any_deny(self):
489 # Create an authentication policy that denies accounts belonging to
490 # neither group, and allows other accounts.
491 policy = self.create_authn_policy(
494 f'O:SYD:(XD;;CR;;;WD;(Not_Member_of_Any '
495 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'
499 # Create a user account with the assigned policy.
500 client_creds = self._get_creds(account_type=self.AccountType.USER,
501 assigned_policy=policy)
503 # Show that we get a policy error if the machine account belongs to neither group.
505 armor_tgt = self.get_tgt(self._mach_creds)
506 self._get_tgt(client_creds, armor_tgt=armor_tgt,
507 expected_error=KDC_ERR_POLICY)
509 # Otherwise, authentication should succeed.
510 armor_tgt = self.get_tgt(self._member_of_one_creds)
511 self._get_tgt(client_creds, armor_tgt=armor_tgt,
514 def test_allowed_from_unenforced_silo_equals(self):
515 # Create an authentication policy that allows accounts belonging to the unenforced silo.
517 policy = self.create_authn_policy(
520 f'O:SYD:(XA;;CR;;;WD;'
521 f'(@User.ad://ext/AuthenticationSilo == '
522 f'"{self._unenforced_silo}"))'),
525 # Create a user account with the assigned policy.
526 client_creds = self._get_creds(account_type=self.AccountType.USER,
527 assigned_policy=policy)
529 # As the policy is unenforced, the ‘ad://ext/AuthenticationSilo’ claim
530 # will not be present in the TGT, and the ACE will never allow access.
532 armor_tgt = self.get_tgt(self._mach_creds)
533 self._get_tgt(client_creds, armor_tgt=armor_tgt,
534 expected_error=KDC_ERR_POLICY)
536 armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
537 self._get_tgt(client_creds, armor_tgt=armor_tgt,
538 expected_error=KDC_ERR_POLICY)
540 armor_tgt = self.get_tgt(self._member_of_enforced_silo)
541 self._get_tgt(client_creds, armor_tgt=armor_tgt,
542 expected_error=KDC_ERR_POLICY)
544 def test_allowed_from_enforced_silo_equals(self):
545 # Create an authentication policy that allows accounts belonging to the enforced silo.
547 policy = self.create_authn_policy(
550 f'O:SYD:(XA;;CR;;;WD;'
551 f'(@User.ad://ext/AuthenticationSilo == '
552 f'"{self._enforced_silo}"))'),
555 # Create a user account with the assigned policy.
556 client_creds = self._get_creds(account_type=self.AccountType.USER,
557 assigned_policy=policy)
559 # Show that we get a policy error if the machine account does not
560 # belong to the silo.
561 armor_tgt = self.get_tgt(self._mach_creds)
562 self._get_tgt(client_creds, armor_tgt=armor_tgt,
563 expected_error=KDC_ERR_POLICY)
565 armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
566 self._get_tgt(client_creds, armor_tgt=armor_tgt,
567 expected_error=KDC_ERR_POLICY)
569 # Otherwise, authentication should succeed.
570 armor_tgt = self.get_tgt(self._member_of_enforced_silo)
571 self._get_tgt(client_creds, armor_tgt=armor_tgt,
574 def test_allowed_from_unenforced_silo_not_equals(self):
575 # Create an authentication policy that allows accounts not belonging to
576 # the unenforced silo.
577 policy = self.create_authn_policy(
580 f'O:SYD:(XA;;CR;;;WD;'
581 f'(@User.ad://ext/AuthenticationSilo != '
582 f'"{self._unenforced_silo}"))'),
585 # Create a user account with the assigned policy.
586 client_creds = self._get_creds(account_type=self.AccountType.USER,
587 assigned_policy=policy)
589 # Show that authentication fails unless the account belongs to a silo
590 # other than the unenforced silo.
592 armor_tgt = self.get_tgt(self._mach_creds)
593 self._get_tgt(client_creds, armor_tgt=armor_tgt,
594 expected_error=KDC_ERR_POLICY)
596 armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
597 self._get_tgt(client_creds, armor_tgt=armor_tgt,
598 expected_error=KDC_ERR_POLICY)
600 armor_tgt = self.get_tgt(self._member_of_enforced_silo)
601 self._get_tgt(client_creds, armor_tgt=armor_tgt,
604 def test_allowed_from_enforced_silo_not_equals(self):
605 # Create an authentication policy that allows accounts not belonging to the enforced silo.
607 policy = self.create_authn_policy(
610 f'O:SYD:(XA;;CR;;;WD;'
611 f'(@User.ad://ext/AuthenticationSilo != '
612 f'"{self._enforced_silo}"))'),
615 # Create a user account with the assigned policy.
616 client_creds = self._get_creds(account_type=self.AccountType.USER,
617 assigned_policy=policy)
619 # Show that authentication always fails, as none of the machine
620 # accounts belong to a silo that is not the enforced one. (The
621 # unenforced silo doesn’t count, as it will never appear in a claim.)
623 armor_tgt = self.get_tgt(self._mach_creds)
624 self._get_tgt(client_creds, armor_tgt=armor_tgt,
625 expected_error=KDC_ERR_POLICY)
627 armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
628 self._get_tgt(client_creds, armor_tgt=armor_tgt,
629 expected_error=KDC_ERR_POLICY)
631 armor_tgt = self.get_tgt(self._member_of_enforced_silo)
632 self._get_tgt(client_creds, armor_tgt=armor_tgt,
633 expected_error=KDC_ERR_POLICY)
635 def test_allowed_from_unenforced_silo_equals_deny(self):
636 # Create an authentication policy that denies accounts belonging to the
637 # unenforced silo, and allows other accounts.
638 policy = self.create_authn_policy(
641 f'O:SYD:(XD;;CR;;;WD;'
642 f'(@User.ad://ext/AuthenticationSilo == '
643 f'"{self._unenforced_silo}"))'
647 # Create a user account with the assigned policy.
648 client_creds = self._get_creds(account_type=self.AccountType.USER,
649 assigned_policy=policy)
651 # Show that authentication fails unless the account belongs to a silo
652 # other than the unenforced silo.
654 armor_tgt = self.get_tgt(self._mach_creds)
655 self._get_tgt(client_creds, armor_tgt=armor_tgt,
656 expected_error=KDC_ERR_POLICY)
658 armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
659 self._get_tgt(client_creds, armor_tgt=armor_tgt,
660 expected_error=KDC_ERR_POLICY)
662 armor_tgt = self.get_tgt(self._member_of_enforced_silo)
663 self._get_tgt(client_creds, armor_tgt=armor_tgt,
666 def test_allowed_from_enforced_silo_equals_deny(self):
667 # Create an authentication policy that denies accounts belonging to the
668 # enforced silo, and allows other accounts.
669 policy = self.create_authn_policy(
672 f'O:SYD:(XD;;CR;;;WD;'
673 f'(@User.ad://ext/AuthenticationSilo == '
674 f'"{self._enforced_silo}"))'
678 # Create a user account with the assigned policy.
679 client_creds = self._get_creds(account_type=self.AccountType.USER,
680 assigned_policy=policy)
682 # Show that authentication always fails, as none of the machine
683 # accounts belong to a silo that is not the enforced one. (The
684 # unenforced silo doesn’t count, as it will never appear in a claim.)
686 armor_tgt = self.get_tgt(self._mach_creds)
687 self._get_tgt(client_creds, armor_tgt=armor_tgt,
688 expected_error=KDC_ERR_POLICY)
690 armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
691 self._get_tgt(client_creds, armor_tgt=armor_tgt,
692 expected_error=KDC_ERR_POLICY)
694 armor_tgt = self.get_tgt(self._member_of_enforced_silo)
695 self._get_tgt(client_creds, armor_tgt=armor_tgt,
696 expected_error=KDC_ERR_POLICY)
698 def test_allowed_from_unenforced_silo_not_equals_deny(self):
699 # Create an authentication policy that denies accounts not belonging to
700 # the unenforced silo, and allows other accounts.
701 policy = self.create_authn_policy(
704 f'O:SYD:(XD;;CR;;;WD;'
705 f'(@User.ad://ext/AuthenticationSilo != '
706 f'"{self._unenforced_silo}"))'
710 # Create a user account with the assigned policy.
711 client_creds = self._get_creds(account_type=self.AccountType.USER,
712 assigned_policy=policy)
714 # Show that authentication always fails, as the unenforced silo will
715 # never appear in a claim.
717 armor_tgt = self.get_tgt(self._mach_creds)
718 self._get_tgt(client_creds, armor_tgt=armor_tgt,
719 expected_error=KDC_ERR_POLICY)
721 armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
722 self._get_tgt(client_creds, armor_tgt=armor_tgt,
723 expected_error=KDC_ERR_POLICY)
725 armor_tgt = self.get_tgt(self._member_of_enforced_silo)
726 self._get_tgt(client_creds, armor_tgt=armor_tgt,
727 expected_error=KDC_ERR_POLICY)
729 def test_allowed_from_enforced_silo_not_equals_deny(self):
730 # Create an authentication policy that denies accounts not belonging to
731 # the enforced silo, and allows other accounts.
732 policy = self.create_authn_policy(
735 f'O:SYD:(XD;;CR;;;WD;'
736 f'(@User.ad://ext/AuthenticationSilo != '
737 f'"{self._enforced_silo}"))'
741 # Create a user account with the assigned policy.
742 client_creds = self._get_creds(account_type=self.AccountType.USER,
743 assigned_policy=policy)
745 # Show that authentication fails unless the account belongs to the enforced silo.
748 armor_tgt = self.get_tgt(self._mach_creds)
749 self._get_tgt(client_creds, armor_tgt=armor_tgt,
750 expected_error=KDC_ERR_POLICY)
752 armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
753 self._get_tgt(client_creds, armor_tgt=armor_tgt,
754 expected_error=KDC_ERR_POLICY)
756 armor_tgt = self.get_tgt(self._member_of_enforced_silo)
757 self._get_tgt(client_creds, armor_tgt=armor_tgt,
760 def test_allowed_from_claim_equals_claim(self):
761 # Create a couple of claims.
763 claim0_id = self.get_new_username()
764 self.create_claim(claim0_id,
766 attribute='carLicense',
769 for_classes=['computer'],
770 value_type=claims.CLAIM_TYPE_STRING)
772 claim1_id = self.get_new_username()
773 self.create_claim(claim1_id,
778 for_classes=['computer'],
779 value_type=claims.CLAIM_TYPE_STRING)
781 # Create an authentication policy that allows accounts having the two claims be equal.
783 policy = self.create_authn_policy(
786 f'O:SYD:(XA;;CR;;;WD;'
787 f'(@User.{claim0_id} == @User.{claim1_id}))'),
790 # Create a user account with the assigned policy.
791 client_creds = self._get_creds(account_type=self.AccountType.USER,
792 assigned_policy=policy)
794 armor_tgt = self.get_tgt(self._mach_creds)
795 self._get_tgt(client_creds, armor_tgt=armor_tgt,
796 expected_error=KDC_ERR_POLICY)
798 mach_creds = self.get_cached_creds(
799 account_type=self.AccountType.COMPUTER,
801 'additional_details': (
802 ('carLicense', 'foo'),
806 armor_tgt = self.get_tgt(
808 expect_client_claims=True,
809 expected_client_claims={
811 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
812 'type': claims.CLAIM_TYPE_STRING,
816 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
817 'type': claims.CLAIM_TYPE_STRING,
821 self._get_tgt(client_creds, armor_tgt=armor_tgt,
824 def test_allowed_to_client_equals(self):
825 client_claim_attr = 'carLicense'
826 client_claim_value = 'foo bar'
827 client_claim_values = client_claim_value,
829 client_claim_id = self.get_new_username()
830 self.create_claim(client_claim_id,
832 attribute=client_claim_attr,
835 for_classes=['user'],
836 value_type=claims.CLAIM_TYPE_STRING)
838 # Create an authentication policy that allows authorization if the
839 # client has a particular claim value.
840 policy = self.create_authn_policy(
842 computer_allowed_to=(
843 f'O:SYD:(XA;;CR;;;WD;'
844 f'((@User.{client_claim_id} == "{client_claim_value}")))'),
847 # Create a computer account with the assigned policy.
848 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
849 assigned_policy=policy)
851 armor_tgt = self.get_tgt(self._mach_creds)
853 # Create a user account without the claim value.
854 client_creds = self.get_cached_creds(
855 account_type=self.AccountType.USER)
856 tgt = self.get_tgt(client_creds)
857 # Show that obtaining a service ticket is denied.
859 tgt, KDC_ERR_POLICY, client_creds, target_creds,
861 expect_edata=self.expect_padata_outer,
862 # We aren’t particular about whether or not we get an NTSTATUS.
864 expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
867 # Create a user account with the claim value.
868 client_creds = self.get_cached_creds(
869 account_type=self.AccountType.USER,
871 'additional_details': (
872 (client_claim_attr, client_claim_values),
877 expect_client_claims=True,
878 expected_client_claims={
880 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
881 'type': claims.CLAIM_TYPE_STRING,
882 'values': client_claim_values,
885 # Show that obtaining a service ticket is allowed.
886 self._tgs_req(tgt, 0, client_creds, target_creds,
889 def test_allowed_to_device_equals(self):
890 device_claim_attr = 'carLicense'
891 device_claim_value = 'bar'
892 device_claim_values = device_claim_value,
894 device_claim_id = self.get_new_username()
895 self.create_claim(device_claim_id,
897 attribute=device_claim_attr,
900 for_classes=['computer'],
901 value_type=claims.CLAIM_TYPE_STRING)
903 # Create a user account.
904 client_creds = self.get_cached_creds(
905 account_type=self.AccountType.USER)
906 tgt = self.get_tgt(client_creds)
908 # Create an authentication policy that allows authorization if the
909 # device has a particular claim value.
910 policy = self.create_authn_policy(
912 computer_allowed_to=(
913 f'O:SYD:(XA;;CR;;;WD;'
914 f'(@Device.{device_claim_id} == "{device_claim_value}"))'),
917 # Create a computer account with the assigned policy.
918 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
919 assigned_policy=policy)
921 armor_tgt = self.get_tgt(self._mach_creds)
922 # Show that obtaining a service ticket is denied when the claim value is not present.
925 tgt, KDC_ERR_POLICY, client_creds, target_creds,
927 expect_edata=self.expect_padata_outer,
928 # We aren’t particular about whether or not we get an NTSTATUS.
930 expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
933 mach_creds = self.get_cached_creds(
934 account_type=self.AccountType.COMPUTER,
936 'additional_details': (
937 (device_claim_attr, device_claim_values),
940 armor_tgt = self.get_tgt(
942 expect_client_claims=True,
943 expected_client_claims={
945 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
946 'type': claims.CLAIM_TYPE_STRING,
947 'values': device_claim_values,
950 # Show that obtaining a service ticket is allowed when the claim value is present.
952 self._tgs_req(tgt, 0, client_creds, target_creds,
955 claim_against_claim_cases = [
956 # If either side is missing, the result is unknown.
957 ((), '==', (), None),
958 ((), '!=', (), None),
959 ('a', '==', (), None),
960 ((), '==', 'b', None),
961 # Straightforward equality and inequality checks work.
962 ('foo', '==', 'foo', True),
963 ('foo', '==', 'bar', False),
964 ('foo', '!=', 'foo', False),
965 ('foo', '!=', 'bar', True),
966 # We can perform less‐than and greater‐than operations.
967 ('cat', '<', 'dog', True),
968 ('cat', '<=', 'dog', True),
969 ('cat', '>', 'dog', False),
970 ('cat', '>=', 'dog', False),
971 ('foo', '<=', 'foo', True),
972 ('foo', '>=', 'foo', True),
973 ('foo', '<', 'foo bar', True),
974 ('foo bar', '>', 'foo', True),
975 # String comparison is case‐insensitive.
976 ('foo bar', '==', 'Foo BAR', True),
977 ('foo bar', '==', 'FOO BAR', True),
978 ('ćàț', '==', 'ĆÀȚ', True),
979 ('ḽ', '==', 'Ḽ', True),
980 ('ⅸ', '==', 'Ⅸ', True),
981 ('ꙭ', '==', 'Ꙭ', True),
982 ('ⱦ', '==', 'Ⱦ', True), # Lowercased variant added in Unicode 5.0.
983 ('ԛԣ', '==', 'ԚԢ', True), # All added in Unicode 5.1.
984 ('foo', '<', 'foo', True),
985 ('ćàș', '<', 'ĆÀȚ', True),
986 ('cat', '<', 'ćàț', True),
987 # This is done by converting to UPPER CASE. Hence, both ‘A’ (U+41) and
988 # ‘a’ (U+61) compare less than ‘_’ (U+5F).
989 ('A', '<', '_', True),
990 ('a', '<', '_', True),
991 # But not all uppercased/lowercased pairs are considered to be equal in this way.
993 ('ß', '<', 'ẞ', True),
994 ('ß', '>', 'SS', True),
995 ('ⳬ', '>', 'Ⳬ', True), # Added in Unicode 5.2.
996 ('ʞ', '<', 'Ʞ', True), # Uppercased variant added in Unicode 6.0.
997 ('ʞ', '<', 'ʟ', True), # U+029E < U+029F < U+A7B0 (upper variant, Ʞ)
998 ('ꞧ', '>', 'Ꞧ', True), # Added in Unicode 6.0.
999 ('ɜ', '<', 'Ɜ', True), # Uppercased variant added in Unicode 7.0.
1001 # Strings are compared as UTF‐16 code units, rather than as Unicode
1002 # codepoints. So while you might expect ‘𐀀’ (U+10000) to compare
1003 # greater than ‘豈’ (U+F900), it is actually considered to be the
1004 # *smaller* of the pair. That is because it is encoded as a sequence of
1005 # two code units, 0xd800 and 0xdc00, which combination compares less
1006 # than the single code unit 0xf900.
1007 ('ퟻ', '<', '𐀀', True),
1008 ('𐀀', '<', '豈', True),
1009 ('ퟻ', '<', '豈', True),
1010 # Composites can be compared.
1011 (('foo', 'bar'), '==', ('foo', 'bar'), True),
1012 (('foo', 'bar'), '==', ('foo', 'baz'), False),
1013 # The individual components don’t have to match in case.
1014 (('foo', 'bar'), '==', ('FOO', 'BAR'), True),
1015 # Nor must they match in order.
1016 (('foo', 'bar'), '==', ('bar', 'foo'), True),
1017 # Composites of different lengths compare unequal.
1018 (('foo', 'bar'), '!=', 'foo', True),
1019 (('foo', 'bar'), '!=', ('foo', 'bar', 'baz'), True),
1020 # But composites don’t have a defined ordering, and aren’t considered
1021 # greater or lesser than one another.
1022 (('foo', 'bar'), '<', ('foo', 'bar'), None),
1023 (('foo', 'bar'), '<=', ('foo', 'bar'), None),
1024 (('foo', 'bar'), '>', ('foo', 'bar', 'baz'), None),
1025 (('foo', 'bar'), '>=', ('foo', 'bar', 'baz'), None),
1026 # We can test for containment.
1027 (('foo', 'bar'), 'Contains', ('FOO'), True),
1028 (('foo', 'bar'), 'Contains', ('foo', 'bar'), True),
1029 (('foo', 'bar'), 'Not_Contains', ('foo', 'bar'), False),
1030 (('foo', 'bar'), 'Contains', ('foo', 'bar', 'baz'), False),
1031 (('foo', 'bar'), 'Not_Contains', ('foo', 'bar', 'baz'), True),
1032 # We can test whether the operands have any elements in common.
1033 ('foo', 'Any_of', 'foo', True),
1034 (('foo', 'bar'), 'Any_of', 'BAR', True),
1035 (('foo', 'bar'), 'Any_of', 'baz', False),
1036 (('foo', 'bar'), 'Not_Any_of', 'baz', True),
1037 (('foo', 'bar'), 'Any_of', ('bar', 'baz'), True),
1038 (('foo', 'bar'), 'Not_Any_of', ('bar', 'baz'), False),
1041 claim_against_literal_cases = [
# Each case is a (claim value(s), operator, literal text, outcome) tuple.
# NOTE(review): the doubled braces in the composite literals suggest that the
# literal text is passed through str.format() before use, as
# _test_cmp_with_args() does for a literal right-hand side -- confirm against
# the code that consumes this table. An outcome of None presumably stands for
# an Unknown result, as it does in _test_cmp_with_args() -- confirm likewise.
1042 # String comparisons also work against literals.
1043 ('foo bar', '==', '"foo bar"', True),
1044 # Composites can be compared with literals.
1045 ('bar', '==', '{{"bar"}}', True),
1046 (('apple', 'banana'), '==', '{{"APPLE", "BANANA"}}', True),
1047 (('apple', 'banana'), '==', '{{"BANANA", "APPLE"}}', True),
1048 (('apple', 'banana'), '==', '{{"apple", "banana", "apple"}}', False),
1049 # We can test for containment.
1050 ((), 'Contains', '{{"foo"}}', None),
1051 ((), 'Not_Contains', '{{"foo", "bar"}}', None),
1052 ('bar', 'Contains', '{{"bar"}}', True),
1053 (('foo', 'bar'), 'Contains', '{{"foo", "bar"}}', True),
1054 (('foo', 'bar'), 'Contains', '{{"foo", "bar", "baz"}}', False),
1055 # The right‐hand side of Contains or Not_Contains does not have to be a
# composite.
1057 ('foo', 'Contains', '"foo"', True),
1058 (('foo', 'bar'), 'Not_Contains', '"foo"', False),
1059 # It’s fine if the right‐hand side contains duplicate elements.
1060 (('foo', 'bar'), 'Contains', '{{"foo", "bar", "bar"}}', True),
1061 # We can test whether the operands have any elements in common.
1062 ('bar', 'Any_of', '{{"bar"}}', True),
1063 (('foo', 'bar'), 'Any_of', '{{"bar", "baz"}}', True),
1064 (('foo', 'bar'), 'Any_of', '{{"baz"}}', False),
1065 # The right‐hand side of Any_of or Not_Any_of must be a composite.
1066 ('foo', 'Any_of', '"foo"', None),
1067 (('foo', 'bar'), 'Not_Any_of', '"baz"', None),
1068 # A string won’t compare equal to a numeric literal.
1069 ('42', '==', '"42"', True),
1070 ('42', '==', '42', None),
1071 # Nor can composites that mismatch in type be compared.
1072 (('123', '456'), '==', '{{"123", "456"}}', True),
1073 (('654', '321'), '==', '{{654, 321}}', None),
1074 (('foo', 'bar'), 'Contains', '{{1, 2, 3}}', None),
1077 ##########################################################################################
1079 def _test_cmp_with_args(self, lhs, op, rhs, outcome, rhs_is_literal=False):
# Check that applying the operator 'op' to a claim, with either a second
# claim or a literal on the right-hand side, yields the expected outcome.
#
# lhs is stored in the claim0_attr attribute of a computer account and is
# expected to surface as the string claim claim0_id. rhs is either literal
# expression text (it must then be a str, and is passed through
# str.format(self=self)) or the value for claim1_attr / claim1_id.
# NOTE(review): which of the two expression forms below is used is presumably
# decided by rhs_is_literal -- confirm.
#
# outcome is True, False or None (Unknown); see the comment further down for
# how each maps onto the two authentication attempts.
1080 # Construct a conditional ACE expression that applies ‘op’ to the first
1081 # claim and to either the literal or the second claim.
1083 self.assertIsInstance(rhs, str)
1084 rhs = rhs.format(self=self)
1085 expression = f'(@User.{self.claim0_id} {op} {rhs})'
1087 expression = f'(@User.{self.claim0_id} {op} @User.{self.claim1_id})'
1089 # Create an authentication policy that will allow authentication when
1090 # the expression is true, and a second that will deny authentication in
1091 # the same circumstance. By observing the results of authenticating
1092 # against each of these policies in turn, we can determine whether the
1093 # expression evaluates to a True, False, or Unknown value.
# In SDDL, XA and XD are the conditional (callback) allow and deny ACE types;
# the unconditional (A;;CR;;;WD) ACE that follows the deny ACE grants access
# whenever the deny ACE does not apply.
1095 allowed_sddl = f'O:SYD:(XA;;CR;;;WD;{expression})'
1096 denied_sddl = f'O:SYD:(XD;;CR;;;WD;{expression})(A;;CR;;;WD)'
1098 allowed_policy = self.create_authn_policy(
1100 user_allowed_from=allowed_sddl)
1101 denied_policy = self.create_authn_policy(
1103 user_allowed_from=denied_sddl)
1105 # Create a user account assigned to each policy.
1106 allowed_creds = self._get_creds(account_type=self.AccountType.USER,
1107 assigned_policy=allowed_policy)
1108 denied_creds = self._get_creds(account_type=self.AccountType.USER,
1109 assigned_policy=denied_policy)
# Collect (attribute, value) pairs to set on the computer account. A literal
# right-hand side is not stored in any attribute.
1111 additional_details = ()
1113 additional_details += ((self.claim0_attr, lhs),)
1114 if rhs and not rhs_is_literal:
1115 additional_details += ((self.claim1_attr, rhs),)
1117 # Create a computer account with the provided attribute values.
1118 mach_creds = self.get_cached_creds(
1119 account_type=self.AccountType.COMPUTER,
1120 opts={'additional_details': additional_details})
# Helper used for the 'values' entries below; it distinguishes a single
# str/bytes value from other inputs.
1122 def expected_values(val):
1123 if isinstance(val, (str, bytes)):
# Claims we expect to find in the computer account's TGT, keyed by claim ID.
1128 expected_client_claims = {}
1130 expected_client_claims[self.claim0_id] = {
1131 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
1132 'type': claims.CLAIM_TYPE_STRING,
1133 'values': expected_values(lhs),
1135 if rhs and not rhs_is_literal:
1136 expected_client_claims[self.claim1_id] = {
1137 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
1138 'type': claims.CLAIM_TYPE_STRING,
1139 'values': expected_values(rhs),
1142 # Fetch the computer account’s TGT, and ensure it contains the claims.
# expect_client_claims is True when at least one claim is expected, and None
# (rather than False) when none is.
1143 armor_tgt = self.get_tgt(
1145 expect_client_claims=bool(expected_client_claims) or None,
1146 expected_client_claims=expected_client_claims)
1148 # The first or the second authentication request is expected to succeed
1149 # if the outcome is True or False, respectively. An Unknown outcome,
1150 # represented by None, will result in a policy error in either case.
1151 allowed_error = 0 if outcome is True else KDC_ERR_POLICY
1152 denied_error = 0 if outcome is False else KDC_ERR_POLICY
1154 # Attempt to authenticate and ensure that we observe the expected
# results.
1156 self._get_tgt(allowed_creds, armor_tgt=armor_tgt,
1157 expected_error=allowed_error)
1158 self._get_tgt(denied_creds, armor_tgt=armor_tgt,
1159 expected_error=denied_error)
1162 # Test a very simple expression with various claims.
1164 (claims.CLAIMS_SOURCE_TYPE_AD, [
1165 ('{non_empty_string}', claims.CLAIM_TYPE_STRING, ['foo bar']),
1167 ], '{non_empty_string}', True),
1169 (claims.CLAIMS_SOURCE_TYPE_AD, [
1170 ('{zero_uint}', claims.CLAIM_TYPE_UINT64, [0]),
1172 ], '{zero_uint}', False),
1174 (claims.CLAIMS_SOURCE_TYPE_AD, [
1175 ('{nonzero_uint}', claims.CLAIM_TYPE_UINT64, [1]),
1177 ], '{nonzero_uint}', True),
1179 (claims.CLAIMS_SOURCE_TYPE_AD, [
1180 ('{zero_uints}', claims.CLAIM_TYPE_UINT64, [0, 0]),
1182 ], '{zero_uints}', KDC_ERR_GENERIC),
1184 (claims.CLAIMS_SOURCE_TYPE_AD, [
1185 ('{zero_and_one_uint}', claims.CLAIM_TYPE_UINT64, [0, 1]),
1187 ], '{zero_and_one_uint}', True),
1189 (claims.CLAIMS_SOURCE_TYPE_AD, [
1190 ('{one_and_zero_uint}', claims.CLAIM_TYPE_UINT64, [1, 0]),
1192 ], '{one_and_zero_uint}', True),
1194 (claims.CLAIMS_SOURCE_TYPE_AD, [
1195 ('{zero_int}', claims.CLAIM_TYPE_INT64, [0]),
1197 ], '{zero_int}', False),
1199 (claims.CLAIMS_SOURCE_TYPE_AD, [
1200 ('{nonzero_int}', claims.CLAIM_TYPE_INT64, [1]),
1202 ], '{nonzero_int}', True),
1204 (claims.CLAIMS_SOURCE_TYPE_AD, [
1205 ('{zero_ints}', claims.CLAIM_TYPE_INT64, [0, 0]),
1207 ], '{zero_ints}', KDC_ERR_GENERIC),
1209 (claims.CLAIMS_SOURCE_TYPE_AD, [
1210 ('{zero_and_one_int}', claims.CLAIM_TYPE_INT64, [0, 1]),
1212 ], '{zero_and_one_int}', True),
1214 (claims.CLAIMS_SOURCE_TYPE_AD, [
1215 ('{one_and_zero_int}', claims.CLAIM_TYPE_INT64, [1, 0]),
1217 ], '{one_and_zero_int}', True),
1219 (claims.CLAIMS_SOURCE_TYPE_AD, [
1220 ('{false_boolean}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1222 ], '{false_boolean}', False),
1224 (claims.CLAIMS_SOURCE_TYPE_AD, [
1225 ('{true_boolean}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1227 ], '{true_boolean}', True),
1229 (claims.CLAIMS_SOURCE_TYPE_AD, [
1230 ('{false_booleans}', claims.CLAIM_TYPE_BOOLEAN, [0, 0]),
1232 ], '{false_booleans}', KDC_ERR_GENERIC),
1234 (claims.CLAIMS_SOURCE_TYPE_AD, [
1235 ('{false_and_true_boolean}', claims.CLAIM_TYPE_BOOLEAN, [0, 1]),
1237 ], '{false_and_true_boolean}', True),
1239 (claims.CLAIMS_SOURCE_TYPE_AD, [
1240 ('{true_and_false_boolean}', claims.CLAIM_TYPE_BOOLEAN, [1, 0]),
1242 ], '{true_and_false_boolean}', True),
1243 # Test a basic comparison against a literal.
1245 (claims.CLAIMS_SOURCE_TYPE_AD, [
1246 ('{a}', claims.CLAIM_TYPE_STRING, ['foo bar']),
1248 ], '{a} == "foo bar"', True),
1249 # Claims can be compared against one another.
1251 (claims.CLAIMS_SOURCE_TYPE_AD, [
1252 ('{a}', claims.CLAIM_TYPE_STRING, ['foo bar']),
1253 ('{b}', claims.CLAIM_TYPE_STRING, ['FOO BAR']),
1255 ], '{a} == {b}', True),
1257 (claims.CLAIMS_SOURCE_TYPE_AD, [
1258 ('{b}', claims.CLAIM_TYPE_STRING, ['FOO', 'BAR', 'BAZ']),
1259 ('{a}', claims.CLAIM_TYPE_STRING, ['foo', 'bar', 'baz']),
1261 ], '{a} != {b}', False),
1262 # Certificate claims are also valid.
1264 (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
1265 ('{a}', claims.CLAIM_TYPE_STRING, ['foo']),
1267 ], '{a} == "foo"', True),
1268 # Other claim source types are ignored.
1271 ('{a}', claims.CLAIM_TYPE_STRING, ['foo']),
1273 ], '{a} == "foo"', None),
1276 ('{a}', claims.CLAIM_TYPE_STRING, ['foo']),
1278 ], '{a} == "foo"', None),
1279 # If multiple claims have the same ID, the *last* one takes precedence.
1281 (claims.CLAIMS_SOURCE_TYPE_AD, [
1282 ('{a}', claims.CLAIM_TYPE_STRING, ['this is not the value…']),
1283 ('{a}', claims.CLAIM_TYPE_STRING, ['…nor is this…']),
1285 (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
1286 ('{a}', claims.CLAIM_TYPE_STRING, ['…and this isn’t either.']),
1288 (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
1289 ('{a}', claims.CLAIM_TYPE_STRING, ['here’s the actual value!']),
1292 ('{a}', claims.CLAIM_TYPE_STRING, ['this is a red herring.']),
1294 ], '{a} == "here’s the actual value!"', True),
1295 # Claim values can be empty.
1297 (claims.CLAIMS_SOURCE_TYPE_AD, [
1298 ('{empty_claim_string}', claims.CLAIM_TYPE_STRING, []),
1300 ], '{empty_claim_string} != "foo bar"', None),
1302 (claims.CLAIMS_SOURCE_TYPE_AD, [
1303 ('{empty_claim_boolean}', claims.CLAIM_TYPE_BOOLEAN, []),
1305 ], 'Exists {empty_claim_boolean}', None),
1306 # Test unsigned integer equality.
1308 (claims.CLAIMS_SOURCE_TYPE_AD, [
1309 ('{a}', claims.CLAIM_TYPE_UINT64, [42]),
1311 ], '{a} == 42', True),
1313 (claims.CLAIMS_SOURCE_TYPE_AD, [
1314 ('{a}', claims.CLAIM_TYPE_UINT64, [0]),
1316 ], '{a} == 3', False),
1318 (claims.CLAIMS_SOURCE_TYPE_AD, [
1319 ('{a}', claims.CLAIM_TYPE_UINT64, [1, 2, 3]),
1321 ], '{a} == {{1, 2, 3}}', True),
1323 (claims.CLAIMS_SOURCE_TYPE_AD, [
1324 ('{a}', claims.CLAIM_TYPE_UINT64, [4, 5, 6]),
1326 ], '{a} != {{1, 2, 3}}', True),
1327 # Test unsigned integer comparison. Ensure we don’t run into any
1328 # integer overflow issues.
1330 (claims.CLAIMS_SOURCE_TYPE_AD, [
1331 ('{a}', claims.CLAIM_TYPE_UINT64, [1 << 32]),
1333 ], '{a} > 0', True),
1334 # Test signed integer comparisons.
1336 (claims.CLAIMS_SOURCE_TYPE_AD, [
1337 ('{a}', claims.CLAIM_TYPE_INT64, [42]),
1339 ], '{a} == 42', True),
1341 (claims.CLAIMS_SOURCE_TYPE_AD, [
1342 ('{a}', claims.CLAIM_TYPE_INT64, [42 << 32]),
1344 ], f'{{a}} == {42 << 32}', True),
1345 # Test boolean claims. Be careful! Windows will *crash* if you send it
1346 # claims that aren’t real booleans (not 0 or 1). I doubt Microsoft will
1347 # consider this a security issue though.
1349 (claims.CLAIMS_SOURCE_TYPE_AD, [
1350 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [2]),
1351 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [3]),
1353 ], '{a} == {b}', CRASHES_WINDOWS),
1355 (claims.CLAIMS_SOURCE_TYPE_AD, [
1356 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1357 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1359 ], '{a} == {b}', True),
1361 (claims.CLAIMS_SOURCE_TYPE_AD, [
1362 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1364 ], '{a} == 42', None),
1366 (claims.CLAIMS_SOURCE_TYPE_AD, [
1367 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1368 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1370 ], '{a} && {b}', True),
1372 (claims.CLAIMS_SOURCE_TYPE_AD, [
1373 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1374 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1376 ], '{a} && {b}', False),
1378 (claims.CLAIMS_SOURCE_TYPE_AD, [
1379 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1380 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1382 ], '{a} && {b}', False),
1384 (claims.CLAIMS_SOURCE_TYPE_AD, [
1385 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1386 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1388 ], '{a} || {b}', True),
1390 (claims.CLAIMS_SOURCE_TYPE_AD, [
1391 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1392 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1394 ], '{a} || {b}', True),
1396 (claims.CLAIMS_SOURCE_TYPE_AD, [
1397 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1398 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1400 ], '{a} || {b}', False),
1402 (claims.CLAIMS_SOURCE_TYPE_AD, [
1403 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1407 (claims.CLAIMS_SOURCE_TYPE_AD, [
1408 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1410 ], '!(!(!(!({a}))))', False),
1412 (claims.CLAIMS_SOURCE_TYPE_AD, [
1413 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1415 ], '!({a} && {a})', True),
1417 (claims.CLAIMS_SOURCE_TYPE_AD, [
1418 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1419 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1421 ], '{a} && !({b} || {b})', True),
1423 (claims.CLAIMS_SOURCE_TYPE_AD, [
1424 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1426 ], '!({a}) || !({a})', True),
1428 (claims.CLAIMS_SOURCE_TYPE_AD, [
1429 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1430 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1432 ], '{a} && !({b})', None),
1433 # Expressions containing the ‘not’ operator are occasionally evaluated
1434 # inconsistently, as evidenced here. ‘a || !a’ evaluates to ‘unknown’…
1436 (claims.CLAIMS_SOURCE_TYPE_AD, [
1437 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1439 ], '{a} || !({a})', None),
1440 # …but ‘!a || a’ — the same expression, just with the operands switched
1441 # round — evaluates to ‘true’.
1443 (claims.CLAIMS_SOURCE_TYPE_AD, [
1444 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1446 ], '!({a}) || {a}', True),
1447 # This inconsistency is not observed with other boolean expressions, as the two cases below show.
1450 (claims.CLAIMS_SOURCE_TYPE_AD, [
1451 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1453 ], '{a} || ({a} || {a})', True),
1455 (claims.CLAIMS_SOURCE_TYPE_AD, [
1456 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1458 ], '({b} || {b}) || {b}', True),
1459 # Test a very large claim. Much larger than this, and
1460 # conditional_ace_encode_binary() will refuse to encode the conditions.
1462 (claims.CLAIMS_SOURCE_TYPE_AD, [
1463 ('{large_claim}', claims.CLAIM_TYPE_STRING, ['z' * 4900]),
1465 ], f'{{large_claim}} == "{"z" * 4900}"', True),
1466 # Test an even larger claim. Windows does not appear to like receiving
1467 # a claim this large.
1469 (claims.CLAIMS_SOURCE_TYPE_AD, [
1470 ('{larger_claim}', claims.CLAIM_TYPE_STRING, ['z' * 100000]),
1472 ], '{larger_claim} > "z"', CRASHES_WINDOWS),
1473 # Test a great number of claims. Windows does not appear to like
1474 # receiving this many claims.
1476 (claims.CLAIMS_SOURCE_TYPE_AD, [
1477 ('{many_claims}', claims.CLAIM_TYPE_UINT64,
1478 list(range(0, 100000))),
1480 ], '{many_claims} Any_of "99999"', CRASHES_WINDOWS),
1481 # Test a claim with a very long name. Much larger than this, and
1482 # conditional_ace_encode_binary() will refuse to encode the conditions.
1484 (claims.CLAIMS_SOURCE_TYPE_AD, [
1485 ('{long_name}', claims.CLAIM_TYPE_STRING, ['a']),
1487 ], '{long_name} == "a"', {'long_name': 'z' * 4900}, True),
1488 # Test attribute name escaping.
1490 (claims.CLAIMS_SOURCE_TYPE_AD, [
1491 ('{escaped_claim}', claims.CLAIM_TYPE_STRING, ['claim value']),
1493 ], '{escaped_claim} == "claim value"',
1494 {'escaped_claim': '(:foo:! /&/ :bar:!)'}, True),
1495 # Test a claim whose name consists entirely of dots.
1497 (claims.CLAIMS_SOURCE_TYPE_AD, [
1498 ('{dotty_claim}', claims.CLAIM_TYPE_STRING, ['a']),
1500 ], '{dotty_claim} == "a"', {'dotty_claim': '...'}, True),
1501 # Test a claim whose name consists of the first thousand non‐zero
1502 # Unicode codepoints.
1504 (claims.CLAIMS_SOURCE_TYPE_AD, [
1505 ('{1000_unicode}', claims.CLAIM_TYPE_STRING, ['a']),
1507 ], '{1000_unicode} == "a"',
1508 {'1000_unicode': ''.join(map(chr, range(1, 1001)))}, True),
1509 # Test a claim whose name consists of some higher Unicode codepoints,
1510 # including non‐BMP ones.
1512 (claims.CLAIMS_SOURCE_TYPE_AD, [
1513 ('{higher_unicode}', claims.CLAIM_TYPE_STRING, ['a']),
1515 ], '{higher_unicode} == "a"',
1516 {'higher_unicode': ''.join(map(chr, range(0xfe00, 0x10800)))}, True),
1517 # Duplicate claim values are not allowed…
1519 (claims.CLAIMS_SOURCE_TYPE_AD, [
1520 ('{a}', claims.CLAIM_TYPE_INT64, [42, 42, 42]),
1522 ], '{a} == {a}', KDC_ERR_GENERIC),
1524 (claims.CLAIMS_SOURCE_TYPE_AD, [
1525 ('{a}', claims.CLAIM_TYPE_UINT64, [42, 42]),
1527 ], '{a} == {a}', KDC_ERR_GENERIC),
1529 (claims.CLAIMS_SOURCE_TYPE_AD, [
1530 ('{a}', claims.CLAIM_TYPE_STRING, ['foo', 'foo']),
1532 ], '{a} == {a}', KDC_ERR_GENERIC),
1534 (claims.CLAIMS_SOURCE_TYPE_AD, [
1535 ('{a}', claims.CLAIM_TYPE_STRING, ['FOO', 'foo']),
1537 ], '{a} == {a}', KDC_ERR_GENERIC),
1539 (claims.CLAIMS_SOURCE_TYPE_AD, [
1540 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0, 0]),
1542 ], '{a} == {a}', KDC_ERR_GENERIC),
1543 # …but it’s OK if duplicate values are spread across multiple claim entries.
1546 (claims.CLAIMS_SOURCE_TYPE_AD, [
1547 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo']),
1548 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo']),
1550 (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
1551 ('{dup}', claims.CLAIM_TYPE_UINT64, [42]),
1552 ('{dup}', claims.CLAIM_TYPE_UINT64, [42]),
1554 (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
1555 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo']),
1556 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo']),
1557 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo', 'bar']),
1558 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo', 'bar']),
1560 ], '{dup} == {dup}', True),
1561 # Test invalid claim types. Be careful! Windows will *crash* if you
1562 # send it invalid claim types. I doubt Microsoft will consider this a
1563 # security issue though.
1565 (claims.CLAIMS_SOURCE_TYPE_AD, [
1566 ('{invalid_sid}', 5, []),
1568 ], '{invalid_sid} == {invalid_sid}', CRASHES_WINDOWS),
1570 (claims.CLAIMS_SOURCE_TYPE_AD, [
1571 ('{invalid_octet_string}', 16, []),
1573 ], '{invalid_octet_string} == {invalid_octet_string}', CRASHES_WINDOWS),
1574 # Sending an empty string will crash Windows.
1576 (claims.CLAIMS_SOURCE_TYPE_AD, [
1577 ('{empty_string}', claims.CLAIM_TYPE_STRING, ['']),
1579 ], '{empty_string}', CRASHES_WINDOWS),
1580 # But sending empty arrays is OK.
1582 (claims.CLAIMS_SOURCE_TYPE_AD, [
1583 ('{empty_array}', claims.CLAIM_TYPE_INT64, []),
1584 ('{empty_array}', claims.CLAIM_TYPE_UINT64, []),
1585 ('{empty_array}', claims.CLAIM_TYPE_BOOLEAN, []),
1586 ('{empty_array}', claims.CLAIM_TYPE_STRING, []),
1588 ], '{empty_array}', None),
# Check that a conditional ACE expression, evaluated against claims that are
# written directly into the PAC of the armor ticket, yields the expected
# outcome.
#
# expression must be a str. Each replacement field ‘{name}’ in it is rewritten
# by formatted_claim_expression() to ‘@User.<escaped claim ID>’; format
# specifiers and conversions are rejected. Claim IDs are looked up in
# claim_map, falling back to get_new_username(), and every name -> ID
# assignment is remembered in claim_ids, which is handed to set_pac_claims()
# together with pac_claims.
#
# outcome is True, False, None (Unknown), or some other value that is used as
# the expected error of both authentication attempts. An outcome of
# CRASHES_WINDOWS skips the test unless self.crash_windows is set.
1591 def _test_pac_claim_cmp_with_args(self,
1596 self.assertIsInstance(expression, str)
1598 if outcome is CRASHES_WINDOWS and not self.crash_windows:
1599 self.skipTest('test crashes Windows servers')
1601 if claim_map is None:
# Return the claim ID to use for a placeholder name, reusing the ID already
# recorded in claim_ids if there is one.
# NOTE(review): pop() removes the entry from the mapping supplied by the
# caller, which is what lets the ‘unused claim mapping(s) remain’ assertion
# below work, but it also mutates the caller’s dict -- confirm that is
# intended.
1606 def get_claim_id(claim_name):
1607 claim = claim_ids.get(claim_name)
1609 claim = claim_map.pop(claim_name, None)
1611 claim = self.get_new_username()
1613 claim_ids[claim_name] = claim
# Rewrite the ‘{name}’ placeholders of expr into ‘@User.<claim ID>’ references.
1617 def formatted_claim_expression(expr):
1618 formatter = Formatter()
# Formatter.parse() yields (literal_text, field_name, format_spec, conversion)
# tuples; field_name is None for a trailing run of literal text.
1621 for literal_text, field_name, format_spec, conversion in (
1622 formatter.parse(expr)):
1623 self.assertFalse(format_spec,
1624 f'format specifier ({format_spec}) should '
1625 f'not be specified')
1626 self.assertFalse(conversion,
1627 f'conversion ({conversion}) should not be '
1630 result.append(literal_text)
1632 if field_name is not None:
# An empty field name (‘{}’) is not accepted.
1633 self.assertTrue(field_name,
1634 'a field name should be specified')
1636 claim_id = get_claim_id(field_name)
1637 claim_id = self.escaped_claim_id(claim_id)
1638 result.append(f'@User.{claim_id}')
1640 return ''.join(result)
1642 # Construct the conditional ACE expression.
1643 expression = formatted_claim_expression(expression)
# Every entry of claim_map should have been consumed by a placeholder.
1645 self.assertFalse(claim_map, 'unused claim mapping(s) remain')
1647 # Create an authentication policy that will allow authentication when
1648 # the expression is true, and a second that will deny authentication in
1649 # the same circumstance. By observing the results of authenticating
1650 # against each of these policies in turn, we can determine whether the
1651 # expression evaluates to a True, False, or Unknown value.
1653 allowed_sddl = f'O:SYD:(XA;;CR;;;WD;({expression}))'
1654 denied_sddl = f'O:SYD:(XD;;CR;;;WD;({expression}))(A;;CR;;;WD)'
1656 allowed_policy = self.create_authn_policy(
1658 user_allowed_from=allowed_sddl)
1659 denied_policy = self.create_authn_policy(
1661 user_allowed_from=denied_sddl)
1663 # Create a user account assigned to each policy.
1664 allowed_creds = self._get_creds(account_type=self.AccountType.USER,
1665 assigned_policy=allowed_policy)
1666 denied_creds = self._get_creds(account_type=self.AccountType.USER,
1667 assigned_policy=denied_policy)
1669 # Create a computer account.
1670 mach_creds = self.get_cached_creds(
1671 account_type=self.AccountType.COMPUTER)
# NOTE(review): no call to expected_values() is visible in this method; it
# looks like a leftover and may be a candidate for removal -- confirm.
1673 def expected_values(val):
1674 if isinstance(val, (str, bytes)):
1679 # Fetch the computer account’s TGT.
1680 armor_tgt = self.get_tgt(mach_creds)
1683 # Replace the claims in the PAC with our own.
1684 armor_tgt = self.modified_ticket(
1686 modify_pac_fn=partial(self.set_pac_claims,
1687 client_claims=pac_claims,
1688 claim_ids=claim_ids),
1689 checksum_keys=self.get_krbtgt_checksum_key())
1691 # The first or the second authentication request is expected to succeed
1692 # if the outcome is True or False, respectively. An Unknown outcome,
1693 # represented by None, will result in a policy error in either case.
1695 allowed_error, denied_error = 0, KDC_ERR_POLICY
1696 elif outcome is False:
1697 allowed_error, denied_error = KDC_ERR_POLICY, 0
1698 elif outcome is None:
1699 allowed_error, denied_error = KDC_ERR_POLICY, KDC_ERR_POLICY
# Any other outcome is itself used as the expected error of both requests.
1701 allowed_error, denied_error = outcome, outcome
1703 # Attempt to authenticate and ensure that we observe the expected
# results.
1705 self._get_tgt(allowed_creds, armor_tgt=armor_tgt,
1706 expected_error=allowed_error)
1707 self._get_tgt(denied_creds, armor_tgt=armor_tgt,
1708 expected_error=denied_error)
def test_rbcd_without_aa_asserted_identity(self):
    # The service SIDs hold only Domain Users, so a condition requiring the
    # Authentication Authority Asserted Identity SID is expected to fail,
    # both as the positional expression and as the target policy.
    condition = f'Member_of SID({self.aa_asserted_identity})'
    sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    # Positional expression: a 'bad option' error is expected.
    self._rbcd(
        condition,
        service_sids=sids,
        code=KDC_ERR_BADOPTION,
        status=ntstatus.NT_STATUS_UNSUCCESSFUL,
        edata=self.expect_padata_outer,
    )

    # Target policy: a policy error is expected, along with the
    # corresponding audit event and reason.
    self._rbcd(
        target_policy=condition,
        service_sids=sids,
        code=KDC_ERR_POLICY,
        status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
        event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
        reason=AuditReason.ACCESS_DENIED,
        edata=self.expect_padata_outer,
    )
def test_rbcd_with_aa_asserted_identity(self):
    # The service SIDs include the Authentication Authority Asserted
    # Identity SID, so the condition is expected to be satisfied in both
    # positions.
    condition = f'Member_of SID({self.aa_asserted_identity})'
    sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
    }

    # The expected groups are the service SIDs plus the Claims Valid SID.
    groups = {
        *sids,
        (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
    }

    self._rbcd(condition, service_sids=sids, expected_groups=groups)

    self._rbcd(target_policy=condition,
               service_sids=sids,
               expected_groups=groups)
def test_rbcd_without_service_asserted_identity(self):
    # The service SIDs hold only Domain Users, so a condition requiring the
    # Service Asserted Identity SID is expected to fail in both positions.
    condition = f'Member_of SID({self.service_asserted_identity})'
    sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    # Positional expression: a 'bad option' error is expected.
    self._rbcd(
        condition,
        service_sids=sids,
        code=KDC_ERR_BADOPTION,
        status=ntstatus.NT_STATUS_UNSUCCESSFUL,
        edata=self.expect_padata_outer,
    )

    # Target policy: a policy error with audit details is expected.
    self._rbcd(
        target_policy=condition,
        service_sids=sids,
        code=KDC_ERR_POLICY,
        status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
        event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
        reason=AuditReason.ACCESS_DENIED,
        edata=self.expect_padata_outer,
    )
def test_rbcd_with_service_asserted_identity(self):
    # The service SIDs include the Service Asserted Identity SID, so the
    # condition is expected to be satisfied in both positions.
    condition = f'Member_of SID({self.service_asserted_identity})'
    sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
    }

    # In the expected groups, the Authentication Authority Asserted Identity
    # SID appears in place of the Service Asserted Identity SID, and the
    # Claims Valid SID is present as well.
    groups = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
        (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
    }

    self._rbcd(condition, service_sids=sids, expected_groups=groups)

    self._rbcd(target_policy=condition,
               service_sids=sids,
               expected_groups=groups)
def test_rbcd_without_claims_valid(self):
    # The service SIDs hold only Domain Users, so a condition requiring the
    # Claims Valid SID is expected to fail in both positions.
    condition = f'Member_of SID({security.SID_CLAIMS_VALID})'
    sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    # Positional expression: a 'bad option' error is expected.
    self._rbcd(
        condition,
        service_sids=sids,
        code=KDC_ERR_BADOPTION,
        status=ntstatus.NT_STATUS_UNSUCCESSFUL,
        edata=self.expect_padata_outer,
    )

    # Target policy: a policy error with audit details is expected.
    self._rbcd(
        target_policy=condition,
        service_sids=sids,
        code=KDC_ERR_POLICY,
        status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
        event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
        reason=AuditReason.ACCESS_DENIED,
        edata=self.expect_padata_outer,
    )
def test_rbcd_with_claims_valid(self):
    # The service SIDs include the Claims Valid SID, so the condition is
    # expected to be satisfied in both positions.
    condition = f'Member_of SID({security.SID_CLAIMS_VALID})'
    sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
    }

    # The expected groups are the service SIDs plus the Authentication
    # Authority Asserted Identity SID.
    groups = {
        *sids,
        (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
    }

    self._rbcd(condition, service_sids=sids, expected_groups=groups)

    self._rbcd(target_policy=condition,
               service_sids=sids,
               expected_groups=groups)
def test_rbcd_without_compounded_authentication(self):
    # The service SIDs hold only Domain Users, so a condition requiring the
    # Compounded Authentication SID is expected to fail in both positions.
    condition = f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})'
    sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    # Positional expression: a 'bad option' error is expected.
    self._rbcd(
        condition,
        service_sids=sids,
        code=KDC_ERR_BADOPTION,
        status=ntstatus.NT_STATUS_UNSUCCESSFUL,
        edata=self.expect_padata_outer,
    )

    # Target policy: a policy error with audit details is expected.
    self._rbcd(
        target_policy=condition,
        service_sids=sids,
        code=KDC_ERR_POLICY,
        status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
        event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
        reason=AuditReason.ACCESS_DENIED,
        edata=self.expect_padata_outer,
    )
def test_rbcd_with_compounded_authentication(self):
    # The service SIDs include the Compounded Authentication SID, so the
    # condition is expected to be satisfied in both positions.
    condition = f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})'
    sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
    }

    # The expected groups do not carry the Compounded Authentication SID;
    # they hold the Authentication Authority Asserted Identity and Claims
    # Valid SIDs instead.
    groups = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
        (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
    }

    self._rbcd(condition, service_sids=sids, expected_groups=groups)

    self._rbcd(target_policy=condition,
               service_sids=sids,
               expected_groups=groups)
def test_rbcd_client_without_aa_asserted_identity(self):
    # The client SIDs lack the Authentication Authority Asserted Identity
    # SID; no error is specified for either call, so the condition is
    # expected to be satisfied regardless.
    condition = f'Member_of SID({self.aa_asserted_identity})'
    sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    self._rbcd(condition, client_sids=sids)

    self._rbcd(target_policy=condition, client_sids=sids)
def test_rbcd_client_with_aa_asserted_identity(self):
    # The client SIDs include the Authentication Authority Asserted
    # Identity SID; the condition is expected to be satisfied and the client
    # SIDs are expected back unchanged as the groups.
    condition = f'Member_of SID({self.aa_asserted_identity})'
    sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
    }

    self._rbcd(condition, client_sids=sids, expected_groups=sids)

    self._rbcd(target_policy=condition,
               client_sids=sids,
               expected_groups=sids)
def test_rbcd_client_without_service_asserted_identity(self):
    # The client SIDs hold only Domain Users; a condition requiring the
    # Service Asserted Identity SID is expected to fail in both positions.
    condition = f'Member_of SID({self.service_asserted_identity})'
    sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    # Positional expression: a 'bad option' error is expected.
    self._rbcd(
        condition,
        client_sids=sids,
        code=KDC_ERR_BADOPTION,
        status=ntstatus.NT_STATUS_UNSUCCESSFUL,
        edata=self.expect_padata_outer,
    )

    # Target policy: a policy error with audit details is expected.
    self._rbcd(
        target_policy=condition,
        client_sids=sids,
        code=KDC_ERR_POLICY,
        status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
        event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
        reason=AuditReason.ACCESS_DENIED,
        edata=self.expect_padata_outer,
    )
def test_rbcd_client_with_service_asserted_identity(self):
    # Even though the client SIDs include the Service Asserted Identity SID,
    # the negated ('Not_Member_of') condition is expected to be satisfied,
    # and the client SIDs are expected back unchanged as the groups.
    condition = f'Not_Member_of SID({self.service_asserted_identity})'
    sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
    }

    self._rbcd(condition, client_sids=sids, expected_groups=sids)

    self._rbcd(target_policy=condition,
               client_sids=sids,
               expected_groups=sids)
def test_rbcd_client_without_claims_valid(self):
    # The client SIDs lack the Claims Valid SID; no error is specified for
    # either call, so the condition is expected to be satisfied regardless.
    condition = f'Member_of SID({security.SID_CLAIMS_VALID})'
    sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    self._rbcd(condition, client_sids=sids)

    self._rbcd(target_policy=condition, client_sids=sids)
def test_rbcd_client_with_claims_valid(self):
    # The client SIDs include the Claims Valid SID; the condition is
    # expected to be satisfied and the client SIDs are expected back
    # unchanged as the groups.
    condition = f'Member_of SID({security.SID_CLAIMS_VALID})'
    sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
    }

    self._rbcd(condition, client_sids=sids, expected_groups=sids)

    self._rbcd(target_policy=condition,
               client_sids=sids,
               expected_groups=sids)
def test_rbcd_client_without_compounded_authentication(self):
    # The client SIDs hold only Domain Users; a condition requiring the
    # Compounded Authentication SID is expected to fail in both positions.
    condition = f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})'
    sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    # Positional expression: a 'bad option' error is expected.
    self._rbcd(
        condition,
        client_sids=sids,
        code=KDC_ERR_BADOPTION,
        status=ntstatus.NT_STATUS_UNSUCCESSFUL,
        edata=self.expect_padata_outer,
    )

    # Target policy: a policy error with audit details is expected.
    self._rbcd(
        target_policy=condition,
        client_sids=sids,
        code=KDC_ERR_POLICY,
        status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
        event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
        reason=AuditReason.ACCESS_DENIED,
        edata=self.expect_padata_outer,
    )
def test_rbcd_client_with_compounded_authentication(self):
    # Even though the client SIDs include the Compounded Authentication SID,
    # the negated ('Not_Member_of') condition is expected to be satisfied,
    # and the client SIDs are expected back unchanged as the groups.
    condition = f'Not_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})'
    sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
    }

    self._rbcd(condition, client_sids=sids, expected_groups=sids)

    self._rbcd(target_policy=condition,
               client_sids=sids,
               expected_groups=sids)
def test_rbcd_device_without_aa_asserted_identity(self):
    # The device SIDs hold only Domain Users; a 'Device_Member_of' condition
    # on the Authentication Authority Asserted Identity SID is expected to
    # fail in both positions.
    condition = f'Device_Member_of SID({self.aa_asserted_identity})'
    sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    # Positional expression: a 'bad option' error is expected.
    self._rbcd(
        condition,
        device_sids=sids,
        code=KDC_ERR_BADOPTION,
        status=ntstatus.NT_STATUS_UNSUCCESSFUL,
        edata=self.expect_padata_outer,
    )

    # Target policy: a policy error with audit details is expected.
    self._rbcd(
        target_policy=condition,
        device_sids=sids,
        code=KDC_ERR_POLICY,
        status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
        event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
        reason=AuditReason.ACCESS_DENIED,
        edata=self.expect_padata_outer,
    )
def test_rbcd_device_without_aa_asserted_identity_not_memberof(self):
    # The device SIDs hold only Domain Users; the negated
    # ('Not_Device_Member_of') condition on the Authentication Authority
    # Asserted Identity SID is expected to be satisfied in both positions.
    condition = f'Not_Device_Member_of SID({self.aa_asserted_identity})'
    sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    self._rbcd(condition, device_sids=sids)

    self._rbcd(target_policy=condition, device_sids=sids)
def test_rbcd_device_with_aa_asserted_identity(self):
    """Require the device to hold the Authentication Authority Asserted
    Identity SID when the FAST machine ticket's PAC lists it.

    The condition is tried as the RBCD ACL and as the target's
    authentication policy; no error code is passed in either case.
    """
    device_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
    }
    condition = f'Device_Member_of SID({self.aa_asserted_identity})'

    self._rbcd(condition,
               device_sids=device_sids)

    self._rbcd(target_policy=condition,
               device_sids=device_sids)
def test_rbcd_device_without_service_asserted_identity(self):
    """Require the device to hold the Service Asserted Identity SID when
    the FAST machine ticket's PAC does not list it.

    As an RBCD ACL the condition is expected to yield KDC_ERR_BADOPTION; as
    the target's authentication policy it is expected to yield
    KDC_ERR_POLICY and an ACCESS_DENIED audit event.
    """
    device_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }
    condition = f'Device_Member_of SID({self.service_asserted_identity})'

    self._rbcd(condition,
               device_sids=device_sids,
               code=KDC_ERR_BADOPTION,
               status=ntstatus.NT_STATUS_UNSUCCESSFUL,
               edata=self.expect_padata_outer)

    self._rbcd(target_policy=condition,
               device_sids=device_sids,
               code=KDC_ERR_POLICY,
               status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
               event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
               reason=AuditReason.ACCESS_DENIED,
               edata=self.expect_padata_outer)
def test_rbcd_device_with_service_asserted_identity(self):
    """Require the device to hold the Service Asserted Identity SID when
    the FAST machine ticket's PAC lists it.

    The condition is tried as the RBCD ACL and as the target's
    authentication policy; no error code is passed in either case.
    """
    device_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
    }
    condition = f'Device_Member_of SID({self.service_asserted_identity})'

    self._rbcd(condition,
               device_sids=device_sids)

    self._rbcd(target_policy=condition,
               device_sids=device_sids)
def test_rbcd_device_without_claims_valid(self):
    """Require the device to hold the Claims Valid SID when the FAST
    machine ticket's PAC does not list it.

    As an RBCD ACL the condition is expected to yield KDC_ERR_BADOPTION; as
    the target's authentication policy it is expected to yield
    KDC_ERR_POLICY and an ACCESS_DENIED audit event.
    """
    device_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }
    condition = f'Device_Member_of SID({security.SID_CLAIMS_VALID})'

    self._rbcd(condition,
               device_sids=device_sids,
               code=KDC_ERR_BADOPTION,
               status=ntstatus.NT_STATUS_UNSUCCESSFUL,
               edata=self.expect_padata_outer)

    self._rbcd(target_policy=condition,
               device_sids=device_sids,
               code=KDC_ERR_POLICY,
               status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
               event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
               reason=AuditReason.ACCESS_DENIED,
               edata=self.expect_padata_outer)
def test_rbcd_device_with_claims_valid(self):
    """Require the device to hold the Claims Valid SID when the FAST
    machine ticket's PAC lists it.

    The condition is tried as the RBCD ACL and as the target's
    authentication policy; no error code is passed in either case.
    """
    device_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
    }
    condition = f'Device_Member_of SID({security.SID_CLAIMS_VALID})'

    self._rbcd(condition,
               device_sids=device_sids)

    self._rbcd(target_policy=condition,
               device_sids=device_sids)
def test_rbcd_device_without_compounded_auth(self):
    """Require the device to hold the Compounded Authentication SID when
    the FAST machine ticket's PAC does not list it.

    As an RBCD ACL the condition is expected to yield KDC_ERR_BADOPTION; as
    the target's authentication policy it is expected to yield
    KDC_ERR_POLICY and an ACCESS_DENIED audit event.
    """
    device_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }
    condition = f'Device_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})'

    self._rbcd(condition,
               device_sids=device_sids,
               code=KDC_ERR_BADOPTION,
               status=ntstatus.NT_STATUS_UNSUCCESSFUL,
               edata=self.expect_padata_outer)

    self._rbcd(target_policy=condition,
               device_sids=device_sids,
               code=KDC_ERR_POLICY,
               status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
               event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
               reason=AuditReason.ACCESS_DENIED,
               edata=self.expect_padata_outer)
def test_rbcd_device_with_compounded_auth(self):
    """Require the device to hold the Compounded Authentication SID when
    the FAST machine ticket's PAC lists it.

    The condition is tried as the RBCD ACL and as the target's
    authentication policy; no error code is passed in either case.
    """
    device_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
    }
    condition = f'Device_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})'

    self._rbcd(condition,
               device_sids=device_sids)

    self._rbcd(target_policy=condition,
               device_sids=device_sids)
def test_rbcd(self):
    """Run the basic RBCD check with an ACL naming the delegating service.

    The helper substitutes the service account's SID for {service_sid}.
    """
    rbcd_acl = 'Member_of SID({service_sid})'
    self._rbcd(rbcd_acl)
def test_rbcd_device_from_rodc(self):
    """Run the basic RBCD check with the FAST machine ticket re-issued
    under the mock RODC's krbtgt key.

    Marked CRASHES_WINDOWS, so the helper skips it unless crash_windows is
    set.
    """
    rbcd_acl = 'Member_of SID({service_sid})'
    self._rbcd(rbcd_acl,
               code=CRASHES_WINDOWS,
               device_from_rodc=True)
def test_rbcd_service_from_rodc(self):
    """Run the basic RBCD check with the delegating service's TGT re-issued
    under the mock RODC's krbtgt key; KDC_ERR_BADOPTION is expected.
    """
    rbcd_acl = 'Member_of SID({service_sid})'
    self._rbcd(rbcd_acl,
               code=KDC_ERR_BADOPTION,
               edata=self.expect_padata_outer,
               service_from_rodc=True)
def test_rbcd_device_and_service_from_rodc(self):
    """Run the basic RBCD check with both the FAST machine ticket and the
    delegating service's TGT re-issued under the mock RODC's krbtgt key.

    Marked CRASHES_WINDOWS, so the helper skips it unless crash_windows is
    set.
    """
    rbcd_acl = 'Member_of SID({service_sid})'
    self._rbcd(rbcd_acl,
               code=CRASHES_WINDOWS,
               device_from_rodc=True,
               service_from_rodc=True)
def test_rbcd_client_from_rodc(self):
    """Run the basic RBCD check with the client's service ticket
    checksummed using the mock RODC's krbtgt key; KDC_ERR_MODIFIED is
    expected.
    """
    rbcd_acl = 'Member_of SID({service_sid})'
    self._rbcd(rbcd_acl,
               code=KDC_ERR_MODIFIED,
               edata=self.expect_padata_outer,
               client_from_rodc=True)
def test_rbcd_client_and_device_from_rodc(self):
    """Run the basic RBCD check with both the client's service ticket and
    the FAST machine ticket tied to the mock RODC.

    Marked CRASHES_WINDOWS, so the helper skips it unless crash_windows is
    set.
    """
    rbcd_acl = 'Member_of SID({service_sid})'
    self._rbcd(rbcd_acl,
               code=CRASHES_WINDOWS,
               client_from_rodc=True,
               device_from_rodc=True)
def test_rbcd_client_and_service_from_rodc(self):
    """Run the basic RBCD check with both the client's service ticket and
    the delegating service's TGT tied to the mock RODC; KDC_ERR_BADOPTION
    is expected.
    """
    rbcd_acl = 'Member_of SID({service_sid})'
    self._rbcd(rbcd_acl,
               code=KDC_ERR_BADOPTION,
               edata=self.expect_padata_outer,
               client_from_rodc=True,
               service_from_rodc=True)
def test_rbcd_all_from_rodc(self):
    """Run the basic RBCD check with the client, service and device tickets
    all tied to the mock RODC.

    Marked CRASHES_WINDOWS, so the helper skips it unless crash_windows is
    set.
    """
    rbcd_acl = 'Member_of SID({service_sid})'
    self._rbcd(rbcd_acl,
               code=CRASHES_WINDOWS,
               client_from_rodc=True,
               service_from_rodc=True,
               device_from_rodc=True)
def _rbcd(self,
          rbcd_expression=None,
          *,
          code=0,
          status=None,
          event=AuditEvent.OK,
          reason=AuditReason.NONE,
          edata=False,
          target_policy=None,
          client_from_rodc=False,
          service_from_rodc=False,
          device_from_rodc=False,
          client_sids=None,
          client_claims=None,
          service_sids=None,
          service_claims=None,
          device_sids=None,
          device_claims=None,
          expected_groups=None,
          expected_device_groups=None,
          expected_claims=None):
    """Issue a constrained-delegation TGS request and check the outcome.

    A service account presents its own TGT together with a client's
    service ticket (KDC option 'cname-in-addl-tkt'), armored with a machine
    account's TGT, and asks for a ticket to a freshly configured target.

    :param rbcd_expression: conditional-ACE expression placed in the
        target's msDS-AllowedToActOnBehalfOfOtherIdentity descriptor;
        '{service_sid}' is replaced by the service account's SID.  When
        None, an unconditional allow ACE is used instead.
    :param code: expected KDC error code, or CRASHES_WINDOWS to mark the
        test as skippable unless self.crash_windows is set.
    :param status: expected NTSTATUS, also passed to the log check.
    :param event: expected audit event for the log check.
    :param reason: expected audit reason for the log check.
    :param edata: passed to the request as the e-data expectation.
    :param target_policy: conditional-ACE expression for an enforced
        authentication policy (computer_allowed_to) assigned to the target;
        '{service_sid}' is substituted as for rbcd_expression.
    :param client_from_rodc: create the client as revealed to the mock RODC
        and checksum its service ticket with the mock RODC's key.
    :param service_from_rodc: as above, for the service account's TGT,
        which is also re-encrypted with the mock RODC's krbtgt key.
    :param device_from_rodc: as above, for the armor (machine) TGT.
    :param client_sids: replacement SIDs for the client ticket's PAC.
    :param client_claims: replacement claims for the client ticket's PAC.
    :param service_sids: replacement SIDs for the service TGT's PAC.
    :param service_claims: replacement claims for the service TGT's PAC.
    :param device_sids: replacement SIDs for the armor TGT's PAC.
    :param device_claims: replacement claims for the armor TGT's PAC.
    :param expected_groups: SIDs expected in the issued ticket.
    :param expected_device_groups: device SIDs expected in the issued
        ticket; device info is only asserted when this is non-empty.
    :param expected_claims: client claims expected in the issued ticket;
        only asserted when non-empty.
    """
    # NOTE(review): the signature's keyword-only ordering and the defaults
    # of code/status/edata are inferred from how the body and the callers
    # use them -- confirm against the canonical file.
    if code is CRASHES_WINDOWS and not self.crash_windows:
        self.skipTest('test crashes Windows servers')

    samdb = self.get_samdb()
    functional_level = self.get_domain_functional_level(samdb)

    if functional_level < dsdb.DS_DOMAIN_FUNCTION_2008:
        self.skipTest('RBCD requires FL2008')

    domain_sid_str = samdb.get_domain_sid()
    domain_sid = security.dom_sid(domain_sid_str)

    client_creds = self.get_cached_creds(
        account_type=self.AccountType.USER,
        opts={
            'allowed_replication_mock': client_from_rodc,
            'revealed_to_mock_rodc': client_from_rodc,
        })
    client_sid = client_creds.get_sid()

    client_username = client_creds.get_username()
    client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
                                             names=[client_username])

    client_tkt_options = 'forwardable'
    expected_flags = krb5_asn1.TicketFlags(client_tkt_options)

    checksum_key = self.get_krbtgt_checksum_key()

    # The RODC keys are only fetched (and only referenced below) when at
    # least one of the principals is meant to come from the mock RODC.
    if client_from_rodc or service_from_rodc or device_from_rodc:
        rodc_krbtgt_creds = self.get_mock_rodc_krbtgt_creds()
        rodc_krbtgt_key = self.TicketDecryptionKey_from_creds(rodc_krbtgt_creds)
        rodc_checksum_key = {
            krb5pac.PAC_TYPE_KDC_CHECKSUM: rodc_krbtgt_key,
        }

    client_tgt = self.get_tgt(client_creds,
                              kdc_options=client_tkt_options,
                              expected_flags=expected_flags)

    # Create a machine account with which to perform FAST.
    mach_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={
            'allowed_replication_mock': device_from_rodc,
            'revealed_to_mock_rodc': device_from_rodc,
        })
    mach_tgt = self.get_tgt(mach_creds)
    device_modify_pac_fn = []
    if device_sids is not None:
        device_modify_pac_fn.append(partial(self.set_pac_sids,
                                            new_sids=device_sids))
    if device_claims is not None:
        device_modify_pac_fn.append(partial(self.set_pac_claims,
                                            client_claims=device_claims))
    mach_tgt = self.modified_ticket(
        mach_tgt,
        modify_pac_fn=device_modify_pac_fn,
        new_ticket_key=rodc_krbtgt_key if device_from_rodc else None,
        checksum_keys=rodc_checksum_key if device_from_rodc else checksum_key)

    service_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={
            # NOTE(review): presumably keeps this cached account distinct
            # from the armor account above; exact value to be confirmed.
            'id': 1,
            'allowed_replication_mock': service_from_rodc,
            'revealed_to_mock_rodc': service_from_rodc,
        })
    service_tgt = self.get_tgt(service_creds)

    service_modify_pac_fn = []
    if service_sids is not None:
        service_modify_pac_fn.append(partial(self.set_pac_sids,
                                             new_sids=service_sids))
    if service_claims is not None:
        service_modify_pac_fn.append(partial(self.set_pac_claims,
                                             client_claims=service_claims))
    service_tgt = self.modified_ticket(
        service_tgt,
        modify_pac_fn=service_modify_pac_fn,
        new_ticket_key=rodc_krbtgt_key if service_from_rodc else None,
        checksum_keys=rodc_checksum_key if service_from_rodc else checksum_key)

    if target_policy is None:
        policy = None
        assigned_policy = None
    else:
        sddl = f'O:SYD:(XA;;CR;;;WD;({target_policy.format(service_sid=service_creds.get_sid())}))'
        policy = self.create_authn_policy(enforced=True,
                                          computer_allowed_to=sddl)
        assigned_policy = str(policy.dn)

    # Build the RBCD ACL: conditional when an expression was supplied,
    # otherwise an unconditional allow.
    if rbcd_expression is not None:
        sddl = f'O:SYD:(XA;;CR;;;WD;({rbcd_expression.format(service_sid=service_creds.get_sid())}))'
    else:
        sddl = 'O:SYD:(A;;CR;;;WD)'
    descriptor = security.descriptor.from_sddl(sddl, domain_sid)
    descriptor = ndr_pack(descriptor)

    # Create a target account with the assigned policy.
    target_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={
            'assigned_policy': assigned_policy,
            'additional_details': (
                ('msDS-AllowedToActOnBehalfOfOtherIdentity', descriptor),
            ),
        })

    client_service_tkt = self.get_service_ticket(
        client_tgt,
        service_creds,
        kdc_options=client_tkt_options,
        expected_flags=expected_flags)
    client_modify_pac_fn = []
    if client_sids is not None:
        client_modify_pac_fn.append(partial(self.set_pac_sids,
                                            new_sids=client_sids))
    if client_claims is not None:
        client_modify_pac_fn.append(partial(self.set_pac_claims,
                                            client_claims=client_claims))
    client_service_tkt = self.modified_ticket(client_service_tkt,
                                              modify_pac_fn=client_modify_pac_fn,
                                              checksum_keys=rodc_checksum_key if client_from_rodc else checksum_key)

    kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt'))

    target_decryption_key = self.TicketDecryptionKey_from_creds(
        target_creds)
    target_etypes = target_creds.tgs_supported_enctypes

    # The transited-services entry uses the account name without its
    # trailing '$'.
    service_name = service_creds.get_username()
    if service_name[-1] == '$':
        service_name = service_name[:-1]
    expected_transited_services = [
        f'host/{service_name}@{service_creds.get_realm()}'
    ]

    expected_groups = self.map_sids(expected_groups, None, domain_sid_str)
    expected_device_groups = self.map_sids(expected_device_groups, None, domain_sid_str)

    # Show that obtaining a service ticket with RBCD is allowed.
    self._tgs_req(service_tgt, code, service_creds, target_creds,
                  armor_tgt=mach_tgt,
                  kdc_options=kdc_options,
                  pac_options='1001',  # supports claims, RBCD
                  expected_cname=client_cname,
                  expected_account_name=client_username,
                  additional_ticket=client_service_tkt,
                  decryption_key=target_decryption_key,
                  expected_sid=client_sid,
                  expected_groups=expected_groups,
                  expect_device_info=bool(expected_device_groups) or None,
                  expected_device_domain_sid=domain_sid_str,
                  expected_device_groups=expected_device_groups,
                  expect_client_claims=bool(expected_claims) or None,
                  expected_client_claims=expected_claims,
                  expected_supported_etypes=target_etypes,
                  expected_proxy_target=target_creds.get_spn(),
                  expected_transited_services=expected_transited_services,
                  expected_status=status,
                  expect_edata=edata)

    # On failure the log entry is checked against the service account; on
    # success it is checked against the delegated client.
    if code:
        effective_client_creds = service_creds
    else:
        effective_client_creds = client_creds

    self.check_tgs_log(effective_client_creds, target_creds,
                       policy=policy,
                       checked_creds=service_creds,
                       status=status,
                       event=event,
                       reason=reason)
def test_tgs_without_aa_asserted_identity(self):
    """A target policy requiring the Authentication Authority Asserted
    Identity SID is expected to refuse, with KDC_ERR_POLICY, a client
    whose TGT PAC does not list that SID.
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    self._tgs(f'Member_of SID({self.aa_asserted_identity})',
              client_sids=client_sids,
              expected_groups=client_sids,
              code=KDC_ERR_POLICY,
              status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
              event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
              reason=AuditReason.ACCESS_DENIED,
              edata=self.expect_padata_outer)
def test_tgs_without_aa_asserted_identity_client_from_rodc(self):
    """Policy requires the Authentication Authority Asserted Identity SID;
    the client TGT lacks it and is re-issued under the mock RODC's key.
    KDC_ERR_POLICY is expected.
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    self._tgs(f'Member_of SID({self.aa_asserted_identity})',
              client_from_rodc=True,
              client_sids=client_sids,
              expected_groups=client_sids,
              code=KDC_ERR_POLICY,
              edata=self.expect_padata_outer)
def test_tgs_without_aa_asserted_identity_device_from_rodc(self):
    """Policy requires the Authentication Authority Asserted Identity SID;
    the client TGT lacks it and the armor TGT is re-issued under the mock
    RODC's key.  Marked CRASHES_WINDOWS (skipped unless crash_windows).
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    self._tgs(f'Member_of SID({self.aa_asserted_identity})',
              device_from_rodc=True,
              client_sids=client_sids,
              expected_groups=client_sids,
              code=CRASHES_WINDOWS)
def test_tgs_without_aa_asserted_identity_both_from_rodc(self):
    """Policy requires the Authentication Authority Asserted Identity SID;
    the client TGT lacks it and both the client and armor TGTs are
    re-issued under the mock RODC's key.  Marked CRASHES_WINDOWS (skipped
    unless crash_windows).
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    self._tgs(f'Member_of SID({self.aa_asserted_identity})',
              client_from_rodc=True,
              device_from_rodc=True,
              client_sids=client_sids,
              expected_groups=client_sids,
              code=CRASHES_WINDOWS)
def test_tgs_with_aa_asserted_identity(self):
    """A target policy requiring the Authentication Authority Asserted
    Identity SID is tried with a client whose TGT PAC lists that SID; no
    error code is passed and the client's SIDs are expected in the ticket.
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
    }

    self._tgs(f'Member_of SID({self.aa_asserted_identity})',
              client_sids=client_sids,
              expected_groups=client_sids)
def test_tgs_with_aa_asserted_identity_client_from_rodc(self):
    """Policy requires the Authentication Authority Asserted Identity SID;
    the client TGT lists it but is re-issued under the mock RODC's key.
    KDC_ERR_POLICY is expected.
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
    }

    self._tgs(f'Member_of SID({self.aa_asserted_identity})',
              client_from_rodc=True,
              client_sids=client_sids,
              expected_groups=client_sids,
              code=KDC_ERR_POLICY,
              edata=self.expect_padata_outer)
def test_tgs_with_aa_asserted_identity_device_from_rodc(self):
    """Policy requires the Authentication Authority Asserted Identity SID;
    the client TGT lists it and the armor TGT is re-issued under the mock
    RODC's key.  Marked CRASHES_WINDOWS (skipped unless crash_windows).
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
    }

    self._tgs(f'Member_of SID({self.aa_asserted_identity})',
              device_from_rodc=True,
              client_sids=client_sids,
              expected_groups=client_sids,
              code=CRASHES_WINDOWS)
def test_tgs_with_aa_asserted_identity_both_from_rodc(self):
    """Policy requires the Authentication Authority Asserted Identity SID;
    the client TGT lists it and both the client and armor TGTs are
    re-issued under the mock RODC's key.  Marked CRASHES_WINDOWS (skipped
    unless crash_windows).
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
    }

    self._tgs(f'Member_of SID({self.aa_asserted_identity})',
              client_from_rodc=True,
              device_from_rodc=True,
              client_sids=client_sids,
              expected_groups=client_sids,
              code=CRASHES_WINDOWS)
def test_tgs_without_service_asserted_identity(self):
    """A target policy requiring the Service Asserted Identity SID is
    expected to refuse, with KDC_ERR_POLICY, a client whose TGT PAC does
    not list that SID.
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    self._tgs(f'Member_of SID({self.service_asserted_identity})',
              client_sids=client_sids,
              expected_groups=client_sids,
              code=KDC_ERR_POLICY,
              status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
              event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
              reason=AuditReason.ACCESS_DENIED,
              edata=self.expect_padata_outer)
def test_tgs_without_service_asserted_identity_client_from_rodc(self):
    """Policy requires the Service Asserted Identity SID; the client TGT
    lacks it and is re-issued under the mock RODC's key.  KDC_ERR_POLICY
    is expected.
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    self._tgs(f'Member_of SID({self.service_asserted_identity})',
              client_from_rodc=True,
              client_sids=client_sids,
              expected_groups=client_sids,
              code=KDC_ERR_POLICY,
              edata=self.expect_padata_outer)
def test_tgs_without_service_asserted_identity_device_from_rodc(self):
    """Policy requires the Service Asserted Identity SID; the client TGT
    lacks it and the armor TGT is re-issued under the mock RODC's key.
    Marked CRASHES_WINDOWS (skipped unless crash_windows).
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    self._tgs(f'Member_of SID({self.service_asserted_identity})',
              device_from_rodc=True,
              client_sids=client_sids,
              expected_groups=client_sids,
              code=CRASHES_WINDOWS)
def test_tgs_without_service_asserted_identity_both_from_rodc(self):
    """Policy requires the Service Asserted Identity SID; the client TGT
    lacks it and both the client and armor TGTs are re-issued under the
    mock RODC's key.  Marked CRASHES_WINDOWS (skipped unless
    crash_windows).
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    self._tgs(f'Member_of SID({self.service_asserted_identity})',
              client_from_rodc=True,
              device_from_rodc=True,
              client_sids=client_sids,
              expected_groups=client_sids,
              code=CRASHES_WINDOWS)
def test_tgs_with_service_asserted_identity(self):
    """A target policy requiring the Service Asserted Identity SID is tried
    with a client whose TGT PAC lists that SID; no error code is passed and
    the client's SIDs are expected in the ticket.
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
    }

    self._tgs(f'Member_of SID({self.service_asserted_identity})',
              client_sids=client_sids,
              expected_groups=client_sids)
def test_tgs_with_service_asserted_identity_client_from_rodc(self):
    """Policy requires the Service Asserted Identity SID; the client TGT
    lists it but is re-issued under the mock RODC's key.  KDC_ERR_POLICY is
    expected.
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
    }

    self._tgs(f'Member_of SID({self.service_asserted_identity})',
              client_from_rodc=True,
              client_sids=client_sids,
              expected_groups=client_sids,
              code=KDC_ERR_POLICY,
              edata=self.expect_padata_outer)
def test_tgs_with_service_asserted_identity_device_from_rodc(self):
    """Policy requires the Service Asserted Identity SID; the client TGT
    lists it and the armor TGT is re-issued under the mock RODC's key.
    Marked CRASHES_WINDOWS (skipped unless crash_windows).
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
    }

    self._tgs(f'Member_of SID({self.service_asserted_identity})',
              device_from_rodc=True,
              client_sids=client_sids,
              expected_groups=client_sids,
              code=CRASHES_WINDOWS)
def test_tgs_with_service_asserted_identity_both_from_rodc(self):
    """Policy requires the Service Asserted Identity SID; the client TGT
    lists it and both the client and armor TGTs are re-issued under the
    mock RODC's key.  Marked CRASHES_WINDOWS (skipped unless
    crash_windows).
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
    }

    self._tgs(f'Member_of SID({self.service_asserted_identity})',
              client_from_rodc=True,
              device_from_rodc=True,
              client_sids=client_sids,
              expected_groups=client_sids,
              code=CRASHES_WINDOWS)
def test_tgs_without_claims_valid(self):
    """A target policy requiring the Claims Valid SID is expected to
    refuse, with KDC_ERR_POLICY, a client whose TGT PAC does not list that
    SID.
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
              client_sids=client_sids,
              expected_groups=client_sids,
              code=KDC_ERR_POLICY,
              status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
              event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
              reason=AuditReason.ACCESS_DENIED,
              edata=self.expect_padata_outer)
def test_tgs_without_claims_valid_client_from_rodc(self):
    """Policy requires the Claims Valid SID; the client TGT lacks it and is
    re-issued under the mock RODC's key.  KDC_ERR_POLICY is expected.
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
              client_from_rodc=True,
              client_sids=client_sids,
              expected_groups=client_sids,
              code=KDC_ERR_POLICY,
              edata=self.expect_padata_outer)
def test_tgs_without_claims_valid_device_from_rodc(self):
    """Policy requires the Claims Valid SID; the client TGT lacks it and
    the armor TGT is re-issued under the mock RODC's key.  Marked
    CRASHES_WINDOWS (skipped unless crash_windows).
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
              device_from_rodc=True,
              client_sids=client_sids,
              expected_groups=client_sids,
              code=CRASHES_WINDOWS)
def test_tgs_without_claims_valid_both_from_rodc(self):
    """Policy requires the Claims Valid SID; the client TGT lacks it and
    both the client and armor TGTs are re-issued under the mock RODC's key.
    Marked CRASHES_WINDOWS (skipped unless crash_windows).
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
              client_from_rodc=True,
              device_from_rodc=True,
              client_sids=client_sids,
              expected_groups=client_sids,
              code=CRASHES_WINDOWS)
def test_tgs_with_claims_valid(self):
    """A target policy requiring the Claims Valid SID is tried with a
    client whose TGT PAC lists that SID; no error code is passed and the
    client's SIDs are expected in the ticket.
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
    }

    self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
              client_sids=client_sids,
              expected_groups=client_sids)
def test_tgs_with_claims_valid_client_from_rodc(self):
    """Policy requires the Claims Valid SID; the client TGT lists it but is
    re-issued under the mock RODC's key.  KDC_ERR_POLICY is expected.
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
    }

    self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
              client_from_rodc=True,
              client_sids=client_sids,
              expected_groups=client_sids,
              code=KDC_ERR_POLICY,
              edata=self.expect_padata_outer)
def test_tgs_with_claims_valid_device_from_rodc(self):
    """Policy requires the Claims Valid SID; the client TGT lists it and
    the armor TGT is re-issued under the mock RODC's key.  Marked
    CRASHES_WINDOWS (skipped unless crash_windows).
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
    }

    self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
              device_from_rodc=True,
              client_sids=client_sids,
              expected_groups=client_sids,
              code=CRASHES_WINDOWS)
def test_tgs_with_claims_valid_both_from_rodc(self):
    """Policy requires the Claims Valid SID; the client TGT lists it and
    both the client and armor TGTs are re-issued under the mock RODC's key.
    Marked CRASHES_WINDOWS (skipped unless crash_windows).
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
    }

    self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
              client_from_rodc=True,
              device_from_rodc=True,
              client_sids=client_sids,
              expected_groups=client_sids,
              code=CRASHES_WINDOWS)
def _tgs(self,
         target_policy=None,
         *,
         code=0,
         event=AuditEvent.OK,
         reason=AuditReason.NONE,
         status=None,
         edata=False,
         client_from_rodc=False,
         device_from_rodc=False,
         client_sids=None,
         client_claims=None,
         device_sids=None,
         device_claims=None,
         expected_groups=None,
         expected_device_groups=None,
         expected_claims=None):
    """Issue an armored TGS request to a policy-protected target and check
    the outcome.

    :param target_policy: conditional-ACE expression for an enforced
        authentication policy (computer_allowed_to) assigned to the target;
        '{client_sid}' is replaced by the client's SID.  When None, the
        target has no policy.
    :param code: expected KDC error code, or CRASHES_WINDOWS to mark the
        test as skippable unless self.crash_windows is set.
    :param event: expected audit event for the log check.
    :param reason: expected audit reason for the log check.
    :param status: expected NTSTATUS, also passed to the log check.
    :param edata: passed to the request as the e-data expectation.
    :param client_from_rodc: create the client as revealed to the mock RODC
        and re-issue its TGT under the mock RODC's krbtgt key.
    :param device_from_rodc: as above, for the armor (machine) TGT.
    :param client_sids: replacement SIDs for the client TGT's PAC.
    :param client_claims: replacement claims for the client TGT's PAC.
    :param device_sids: replacement SIDs for the armor TGT's PAC.
    :param device_claims: replacement claims for the armor TGT's PAC.
    :param expected_groups: SIDs expected in the issued ticket.
    :param expected_device_groups: device SIDs expected in the issued
        ticket; device info is only asserted when this is non-empty.
    :param expected_claims: client claims expected in the issued ticket;
        only asserted when non-empty.
    """
    # NOTE(review): the signature's keyword-only ordering and the defaults
    # of code/status/edata are inferred from how the body and the callers
    # use them -- confirm against the canonical file.
    if code is CRASHES_WINDOWS and not self.crash_windows:
        self.skipTest('test crashes Windows servers')

    samdb = self.get_samdb()
    functional_level = self.get_domain_functional_level(samdb)

    # NOTE(review): the message mentions RBCD although this helper issues a
    # plain TGS request -- confirm the intended wording.
    if functional_level < dsdb.DS_DOMAIN_FUNCTION_2008:
        self.skipTest('RBCD requires FL2008')

    domain_sid_str = samdb.get_domain_sid()

    client_creds = self.get_cached_creds(
        account_type=self.AccountType.USER,
        opts={
            'allowed_replication_mock': client_from_rodc,
            'revealed_to_mock_rodc': client_from_rodc,
        })
    client_sid = client_creds.get_sid()

    client_username = client_creds.get_username()
    client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
                                             names=[client_username])

    client_tkt_options = 'forwardable'
    expected_flags = krb5_asn1.TicketFlags(client_tkt_options)

    checksum_key = self.get_krbtgt_checksum_key()

    # The RODC keys are only fetched (and only referenced below) when at
    # least one of the principals is meant to come from the mock RODC.
    if client_from_rodc or device_from_rodc:
        rodc_krbtgt_creds = self.get_mock_rodc_krbtgt_creds()
        rodc_krbtgt_key = self.TicketDecryptionKey_from_creds(rodc_krbtgt_creds)
        rodc_checksum_key = {
            krb5pac.PAC_TYPE_KDC_CHECKSUM: rodc_krbtgt_key,
        }

    client_tgt = self.get_tgt(client_creds,
                              kdc_options=client_tkt_options,
                              expected_flags=expected_flags)

    client_modify_pac_fn = []
    if client_sids is not None:
        client_modify_pac_fn.append(partial(self.set_pac_sids,
                                            new_sids=client_sids))
    if client_claims is not None:
        client_modify_pac_fn.append(partial(self.set_pac_claims,
                                            client_claims=client_claims))
    client_tgt = self.modified_ticket(
        client_tgt,
        modify_pac_fn=client_modify_pac_fn,
        new_ticket_key=rodc_krbtgt_key if client_from_rodc else None,
        checksum_keys=rodc_checksum_key if client_from_rodc else checksum_key)

    # Create a machine account with which to perform FAST.
    mach_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={
            'allowed_replication_mock': device_from_rodc,
            'revealed_to_mock_rodc': device_from_rodc,
        })
    mach_tgt = self.get_tgt(mach_creds)
    device_modify_pac_fn = []
    if device_sids is not None:
        device_modify_pac_fn.append(partial(self.set_pac_sids,
                                            new_sids=device_sids))
    if device_claims is not None:
        device_modify_pac_fn.append(partial(self.set_pac_claims,
                                            client_claims=device_claims))
    mach_tgt = self.modified_ticket(
        mach_tgt,
        modify_pac_fn=device_modify_pac_fn,
        new_ticket_key=rodc_krbtgt_key if device_from_rodc else None,
        checksum_keys=rodc_checksum_key if device_from_rodc else checksum_key)

    if target_policy is None:
        policy = None
        assigned_policy = None
    else:
        sddl = f'O:SYD:(XA;;CR;;;WD;({target_policy.format(client_sid=client_creds.get_sid())}))'
        policy = self.create_authn_policy(enforced=True,
                                          computer_allowed_to=sddl)
        assigned_policy = str(policy.dn)

    # Create a target account with the assigned policy.
    target_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={'assigned_policy': assigned_policy})

    target_decryption_key = self.TicketDecryptionKey_from_creds(
        target_creds)
    target_etypes = target_creds.tgs_supported_enctypes

    expected_groups = self.map_sids(expected_groups, None, domain_sid_str)
    expected_device_groups = self.map_sids(expected_device_groups, None, domain_sid_str)

    # Show that obtaining a service ticket is allowed.
    self._tgs_req(client_tgt, code, client_creds, target_creds,
                  armor_tgt=mach_tgt,
                  expected_cname=client_cname,
                  expected_account_name=client_username,
                  decryption_key=target_decryption_key,
                  expected_sid=client_sid,
                  expected_groups=expected_groups,
                  expect_device_info=bool(expected_device_groups) or None,
                  expected_device_domain_sid=domain_sid_str,
                  expected_device_groups=expected_device_groups,
                  expect_client_claims=bool(expected_claims) or None,
                  expected_client_claims=expected_claims,
                  expected_supported_etypes=target_etypes,
                  expected_status=status,
                  expect_edata=edata)

    self.check_tgs_log(client_creds, target_creds,
                       policy=policy,
                       checked_creds=client_creds,
                       status=status,
                       event=event,
                       reason=reason)
def test_conditional_ace_allowed_from_user_deny(self):
    """Check that a user whose policy carries a conditional deny ACE for
    the armoring machine gets KDC_ERR_POLICY when requesting a TGT.
    """
    # Create a machine account with which to perform FAST.
    mach_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER)
    mach_tgt = self.get_tgt(mach_creds)

    # Create an authentication policy that explicitly denies the machine
    # account for a user.
    allowed = 'O:SYD:(A;;CR;;;WD)'
    denied = f'O:SYD:(XD;;CR;;;{mach_creds.get_sid()};(abc))'
    policy = self.create_authn_policy(enforced=True,
                                      user_allowed_from=denied,
                                      service_allowed_from=allowed)

    # Create a user account with the assigned policy.
    client_creds = self._get_creds(account_type=self.AccountType.USER,
                                   assigned_policy=policy)

    # Show that we get a policy error when trying to authenticate.
    self._get_tgt(client_creds, armor_tgt=mach_tgt,
                  expected_error=KDC_ERR_POLICY)

    # NOTE(review): the call opener and first positional argument were
    # reconstructed from the keyword arguments below -- confirm.
    self.check_as_log(
        client_creds,
        armor_creds=mach_creds,
        client_policy=policy,
        client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
        event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
        reason=AuditReason.ACCESS_DENIED,
        status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
2866 class TgsReqServicePolicyTests(ConditionalAceBaseTests):
def test_pac_groups_not_present(self):
    """Test that authorization fails if the client does not belong to
    some required groups.
    """

    required_sids = {
        ('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs),
        ('S-1-9-8-7', SidType.EXTRA_SID, self.default_attrs),
    }

    # Create a machine account with which to perform FAST.
    mach_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={'id': 'device'})
    mach_tgt = self.get_tgt(mach_creds)

    # Create a user account.  Its TGT is used unmodified, so the required
    # SIDs are never added to it.
    client_creds = self._get_creds(account_type=self.AccountType.USER)
    client_tgt = self.get_tgt(client_creds)

    # Create an authentication policy that requires the client to belong to
    # certain groups.
    target_policy_sddl = self.allow_if(
        f'Member_of {self.sddl_array_from_sids(required_sids)}')
    target_policy = self.create_authn_policy(
        enforced=True, computer_allowed_to=target_policy_sddl)

    # Create a target account with the assigned policy.
    target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
                                   assigned_policy=target_policy)

    # Show that authorization fails.
    self._tgs_req(
        client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
        armor_tgt=mach_tgt,
        expect_edata=self.expect_padata_outer,
        # We aren’t particular about whether or not we get an NTSTATUS.
        expect_status=None,
        expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)

    self.check_tgs_log(
        client_creds, target_creds,
        policy=target_policy,
        status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
        event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
        reason=AuditReason.ACCESS_DENIED)
def test_pac_groups_present(self):
    """Test that authorization succeeds if the client belongs to some
    required groups.
    """

    required_sids = {
        ('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs),
        ('S-1-9-8-7', SidType.EXTRA_SID, self.default_attrs),
    }

    client_sids = required_sids | {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    # Create a machine account with which to perform FAST.
    mach_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={'id': 'device'})
    mach_tgt = self.get_tgt(mach_creds)

    # Create a user account.
    client_creds = self._get_creds(account_type=self.AccountType.USER)
    client_tgt = self.get_tgt(client_creds)

    # Add the required groups to the client’s TGT.
    client_tgt = self.modified_ticket(
        client_tgt,
        modify_pac_fn=partial(self.set_pac_sids,
                              new_sids=client_sids),
        checksum_keys=self.get_krbtgt_checksum_key())

    # Create an authentication policy that requires the client to belong to
    # certain groups.
    target_policy_sddl = self.allow_if(
        f'Member_of {self.sddl_array_from_sids(required_sids)}')
    target_policy = self.create_authn_policy(
        enforced=True, computer_allowed_to=target_policy_sddl)

    # Create a target account with the assigned policy.
    target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
                                   assigned_policy=target_policy)

    # Show that authorization succeeds.
    self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt)

    self.check_tgs_log(client_creds, target_creds,
                       policy=target_policy)
def test_pac_resource_groups_present_to_service_sid_compression(self):
    """Test that authorization fails even if the client's TGT lists some
    required resource groups, when the request is to a service that
    supports SID compression.
    """

    required_sids = {
        ('S-1-2-3-4', SidType.RESOURCE_SID, self.resource_attrs),
        ('S-1-2-3-5', SidType.RESOURCE_SID, self.resource_attrs),
        ('S-1-2-3-6', SidType.RESOURCE_SID, self.resource_attrs),
    }

    client_sids = required_sids | {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    # Create a machine account with which to perform FAST.
    mach_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={'id': 'device'})
    mach_tgt = self.get_tgt(mach_creds)

    # Create a user account.
    client_creds = self._get_creds(account_type=self.AccountType.USER)
    client_tgt = self.get_tgt(client_creds)

    # Add the required groups to the client’s TGT.
    client_tgt = self.modified_ticket(
        client_tgt,
        modify_pac_fn=partial(self.set_pac_sids,
                              new_sids=client_sids),
        checksum_keys=self.get_krbtgt_checksum_key())

    # Create an authentication policy that requires the client to belong to
    # certain groups.
    target_policy_sddl = self.allow_if(
        f'Member_of {self.sddl_array_from_sids(required_sids)}')
    target_policy = self.create_authn_policy(
        enforced=True, computer_allowed_to=target_policy_sddl)

    # Create a target account with the assigned policy.
    target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
                                   assigned_policy=target_policy)

    # Show that authorization fails.
    self._tgs_req(
        client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
        armor_tgt=mach_tgt,
        expect_edata=self.expect_padata_outer,
        # We aren’t particular about whether or not we get an NTSTATUS.
        expect_status=None,
        expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)

    self.check_tgs_log(
        client_creds, target_creds,
        policy=target_policy,
        status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
        event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
        reason=AuditReason.ACCESS_DENIED)
3024 def test_pac_resource_groups_present_to_service_no_sid_compression(self):
# Scenario: same set-up as the SID-compression variant of this test (three
# RESOURCE_SID groups in the client’s TGT, policy requiring those groups),
# but the target account is created with explicit supported encryption
# types that include the RESOURCE_SID_COMPRESSION_DISABLED bit.
# NOTE(review): the docstring says authorization "succeeds", but the checks
# at the end of this test expect KDC_ERR_POLICY and an ACCESS_DENIED audit
# entry. Confirm the intent and align the docstring with the assertions.
3025 """Test that authorization succeeds if the client belongs to some
3026 required resource groups, and the request is to a service that does not
3027 support SID compression.
3031 ('S-1-2-3-4', SidType.RESOURCE_SID, self.resource_attrs),
3032 ('S-1-2-3-5', SidType.RESOURCE_SID, self.resource_attrs),
3033 ('S-1-2-3-6', SidType.RESOURCE_SID, self.resource_attrs),
# The client’s PAC holds the required groups plus Domain Users, which appears
# both as a base group and as the primary group.
3036 client_sids = required_sids | {
3037 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3038 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3041 # Create a machine account with which to perform FAST.
3042 mach_creds = self.get_cached_creds(
3043 account_type=self.AccountType.COMPUTER,
3044 opts={'id': 'device'})
3045 mach_tgt = self.get_tgt(mach_creds)
3047 # Create a user account.
3048 client_creds = self._get_creds(account_type=self.AccountType.USER)
3049 client_tgt = self.get_tgt(client_creds)
3051 # Add the required groups to the client’s TGT.
3052 client_tgt = self.modified_ticket(
3054 modify_pac_fn=partial(self.set_pac_sids,
3055 new_sids=client_sids),
3056 checksum_keys=self.get_krbtgt_checksum_key())
3058 # Create an authentication policy that requires the client to belong to
3060 target_policy_sddl = self.allow_if(
3061 f'Member_of {self.sddl_array_from_sids(required_sids)}')
3062 target_policy = self.create_authn_policy(
3063 enforced=True, computer_allowed_to=target_policy_sddl)
3065 # Create a target account with the assigned policy.
# msDS-SupportedEncryptionTypes is set explicitly (as a decimal string) so
# that the RESOURCE_SID_COMPRESSION_DISABLED bit is present alongside the
# RC4 and AES256-SK bits.
3066 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
3067 assigned_policy=target_policy,
3068 additional_details={
3069 'msDS-SupportedEncryptionTypes': str((
3070 security.KERB_ENCTYPE_RC4_HMAC_MD5) | (
3071 security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK) | (
3072 security.KERB_ENCTYPE_RESOURCE_SID_COMPRESSION_DISABLED))})
3074 # Show that authorization fails.
3076 client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
3078 expect_edata=self.expect_padata_outer,
3079 # We aren’t particular about whether or not we get an NTSTATUS.
3081 expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
# The audit expectation names the policy, the firewall-failure status, and
# the KERBEROS_SERVER_RESTRICTION event with reason ACCESS_DENIED.
3084 client_creds, target_creds,
3085 policy=target_policy,
3086 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3087 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
3088 reason=AuditReason.ACCESS_DENIED)
3090 def test_pac_well_known_groups_not_present(self):
# Scenario: the policy lists four well-known SIDs (Claims Valid, Compounded
# Authentication, and the Authentication Authority / Service Asserted
# Identity SIDs). The SIDs written into the client’s TGT are presumably just
# the Domain Users entries listed after them (verify: the assignment line is
# not adjacent), and the request is expected to be refused with
# KDC_ERR_POLICY.
3091 """Test that authorization fails if the client does not belong to one
3092 or more required well‐known groups.
3096 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
3097 (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
3098 (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
3099 (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
3103 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3104 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3107 # Create a machine account with which to perform FAST.
3108 mach_creds = self.get_cached_creds(
3109 account_type=self.AccountType.COMPUTER,
3110 opts={'id': 'device'})
3111 mach_tgt = self.get_tgt(mach_creds)
3113 # Create a user account.
3114 client_creds = self._get_creds(account_type=self.AccountType.USER)
3115 client_tgt = self.get_tgt(client_creds)
3117 # Modify the client’s TGT to contain only the SID of the client’s
3119 client_tgt = self.modified_ticket(
3121 modify_pac_fn=partial(self.set_pac_sids,
3122 new_sids=client_sids),
3123 checksum_keys=self.get_krbtgt_checksum_key())
3125 # Create an authentication policy that requires the client to belong to
# Member_of_any is used here, where the resource-group tests use Member_of.
# Presumably any single listed SID would satisfy it — verify against the
# conditional ACE operator semantics.
3127 target_policy_sddl = self.allow_if(
3128 f'Member_of_any {self.sddl_array_from_sids(required_sids)}')
3129 target_policy = self.create_authn_policy(
3130 enforced=True, computer_allowed_to=target_policy_sddl)
3132 # Create a target account with the assigned policy.
3133 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
3134 assigned_policy=target_policy)
3136 # Show that authorization fails.
3138 client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
3140 expect_edata=self.expect_padata_outer,
3141 # We aren’t particular about whether or not we get an NTSTATUS.
3143 expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
# The audit expectation names the policy, the firewall-failure status, and
# the KERBEROS_SERVER_RESTRICTION event with reason ACCESS_DENIED.
3146 client_creds, target_creds,
3147 policy=target_policy,
3148 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3149 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
3150 reason=AuditReason.ACCESS_DENIED)
3152 def test_pac_device_info(self):
# Runs the shared device-info scenario with its default arguments
# (compound_id_support=True, device_claims_valid=True).
3153 self._run_pac_device_info_test()
3155 def test_pac_device_info_no_compound_id_support(self):
# Runs the shared device-info scenario with compound identity support
# switched off; device_claims_valid keeps its default of True.
3156 self._run_pac_device_info_test(compound_id_support=False)
3158 def test_pac_device_info_no_claims_valid(self):
# Runs the shared device-info scenario without adding the Claims Valid SID
# to the device’s SIDs; compound_id_support keeps its default of True.
3159 self._run_pac_device_info_test(device_claims_valid=False)
3161 def _run_pac_device_info_test(self, compound_id_support=True, device_claims_valid=True):
# Shared driver for the test_pac_device_info* tests.
#
# compound_id_support: forwarded to get_target() as compound_id. When true,
#   SID_COMPOUNDED_AUTHENTICATION is added to the expected client groups and
#   device claims and device groups are expected in the result.
# device_claims_valid: when true, SID_CLAIMS_VALID is added to the SIDs
#   written into the machine TGT and to the expected device groups.
#
# NOTE(review): expected_device_claims and expected_device_sids are also
# assigned None further down — presumably on the branch taken when compound
# identity is not supported; confirm which condition guards each assignment.
3162 """Test the groups of the client and the device after performing a
3163 FAST‐armored TGS‐REQ.
3166 client_claim_id = 'the name of the client’s client claim'
3167 client_claim_value = 'the value of the client’s client claim'
3170 (claims.CLAIMS_SOURCE_TYPE_AD, [
3171 (client_claim_id, claims.CLAIM_TYPE_STRING, [client_claim_value]),
3175 expected_client_claims = {
3177 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
3178 'type': claims.CLAIM_TYPE_STRING,
3179 'values': (client_claim_value,),
3183 device_claim_id = 'the name of the device’s client claim'
3184 device_claim_value = 'the value of the device’s client claim'
3187 (claims.CLAIMS_SOURCE_TYPE_AD, [
3188 (device_claim_id, claims.CLAIM_TYPE_STRING, [device_claim_value]),
3192 if compound_id_support:
3193 expected_device_claims = {
3195 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
3196 'type': claims.CLAIM_TYPE_STRING,
3197 'values': (device_claim_value,),
3201 expected_device_claims = None
3203 # Create a machine account with which to perform FAST.
3204 mach_creds = self.get_cached_creds(
3205 account_type=self.AccountType.COMPUTER,
3206 opts={'id': 'device'})
3207 mach_tgt = self.get_tgt(mach_creds)
3210 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3211 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3212 # This to ensure we have EXTRA_SIDS set already, as
3213 # windows won't set that flag otherwise when adding one
3215 ('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs),
3219 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3220 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3221 ('S-1-2-3-4', SidType.EXTRA_SID, self.resource_attrs),
3222 ('S-1-3-4-5', SidType.EXTRA_SID, self.resource_attrs),
3225 if device_claims_valid:
3226 device_sids.add((security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs))
3228 # Modify the machine account’s TGT to contain only the SID of the
3229 # machine account’s primary group.
# NOTE(review): the comment above understates what happens. The call applies
# device_sids (extended with SID_CLAIMS_VALID when device_claims_valid is
# set) and also sets the claims to device_claims via set_pac_claims.
3230 mach_tgt = self.modified_ticket(
3233 partial(self.set_pac_sids,
3234 new_sids=device_sids),
3235 partial(self.set_pac_claims, client_claims=device_claims),
3237 checksum_keys=self.get_krbtgt_checksum_key())
3239 # Create a user account.
3240 client_creds = self._get_creds(account_type=self.AccountType.USER)
3241 client_tgt = self.get_tgt(client_creds)
3243 # Modify the client’s TGT to contain only the SID of the client’s
# NOTE(review): as with the machine TGT, this call sets both the SIDs
# (client_sids) and the claims (client_claims), not just a primary group SID.
3245 client_tgt = self.modified_ticket(
3248 partial(self.set_pac_sids,
3249 new_sids=client_sids),
3250 partial(self.set_pac_claims, client_claims=client_claims),
3252 checksum_keys=self.get_krbtgt_checksum_key())
3254 # Indicate that Compound Identity is supported.
# Support is only indicated when compound_id_support is true; the flag is
# passed straight through as compound_id.
3255 target_creds, _ = self.get_target(to_krbtgt=False, compound_id=compound_id_support)
3258 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3259 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3260 ('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs),
3261 # The client’s groups are not to include the Asserted Identity and
3262 # Claims Valid SIDs.
3265 if compound_id_support:
3266 expected_sids.add((security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs))
3268 expected_device_sids = {
3269 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3270 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3271 ('S-1-2-3-4', SidType.EXTRA_SID, self.resource_attrs),
3272 ('S-1-3-4-5', SidType.EXTRA_SID, self.resource_attrs),
# Claims Valid is expected as a RESOURCE_SID wrapped in a frozenset, unlike
# the plain tuples above, even though it was written into the TGT as an
# EXTRA_SID. Presumably the frozenset is how a resource-group set is
# represented for map_sids — TODO confirm.
3275 if device_claims_valid:
3276 expected_device_sids.add(frozenset([(security.SID_CLAIMS_VALID, SidType.RESOURCE_SID, self.default_attrs)]))
3278 expected_device_sids = None
3280 samdb = self.get_samdb()
3281 domain_sid_str = samdb.get_domain_sid()
3283 expected_sids = self.map_sids(expected_sids, None, domain_sid_str)
3284 # The device SIDs will be put into the PAC unmodified.
3285 expected_device_sids = self.map_sids(expected_device_sids, None, domain_sid_str)
3287 # Show that authorization succeeds.
# Each `bool(x) or None` below evaluates to True when the expectation is
# non-empty and to None (never False) otherwise. Presumably None leaves that
# item unchecked — confirm against _tgs_req.
3288 self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt,
3289 expected_groups=expected_sids,
3290 expect_device_info=bool(expected_device_sids) or None,
3291 expected_device_domain_sid=domain_sid_str,
3292 expected_device_groups=expected_device_sids,
3293 expect_client_claims=bool(expected_client_claims) or None,
3294 expected_client_claims=expected_client_claims,
3295 expect_device_claims=bool(expected_device_claims) or None,
3296 expected_device_claims=expected_device_claims)
3298 self.check_tgs_log(client_creds, target_creds)
3300 def test_pac_extra_sids_behaviour(self):
# Scenario: the client’s TGT is rewritten with client_sids and the armored
# request goes to a target obtained with compound_id=True. The expected
# groups are Domain Users (base and primary) plus
# SID_COMPOUNDED_AUTHENTICATION as an extra SID.
# NOTE(review): the docstring repeats that of _run_pac_device_info_test, but
# the only expectation passed to _tgs_req here is expected_groups for the
# client; nothing about the device is checked. Consider rewording.
3301 """Test the groups of the client and the device after performing a
3302 FAST‐armored TGS‐REQ.
3305 # Create a machine account with which to perform FAST.
3306 mach_creds = self.get_cached_creds(
3307 account_type=self.AccountType.COMPUTER,
3308 opts={'id': 'device'})
3309 mach_tgt = self.get_tgt(mach_creds)
3312 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3313 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3316 # Create a user account.
3317 client_creds = self._get_creds(account_type=self.AccountType.USER)
3318 client_tgt = self.get_tgt(client_creds)
3320 # Modify the client’s TGT to contain only the SID of the client’s
3322 client_tgt = self.modified_ticket(
3324 modify_pac_fn=partial(self.set_pac_sids,
3325 new_sids=client_sids),
3326 checksum_keys=self.get_krbtgt_checksum_key())
3328 # Indicate that Compound Identity is supported.
3329 target_creds, _ = self.get_target(to_krbtgt=False, compound_id=True)
3332 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3333 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3334 (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs)
3335 # The client’s groups are not to include the Asserted Identity and
3336 # Claims Valid SIDs.
3339 samdb = self.get_samdb()
3340 domain_sid_str = samdb.get_domain_sid()
3342 expected_sids = self.map_sids(expected_sids, None, domain_sid_str)
3344 # Show that authorization succeeds.
3345 self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt,
3346 expected_groups=expected_sids)
3348 self.check_tgs_log(client_creds, target_creds)
3350 def test_pac_claims_not_present(self):
# Scenario: the policy compares a @User claim against a fixed value. No
# claim is added to either TGT in the visible code, and the request is
# expected to fail with KDC_ERR_POLICY.
# NOTE(review): the docstring and the policy comment below speak of the
# device, but the SDDL expression tests @User. Confirm the wording.
3351 """Test that authentication fails if the device does not have a
3355 claim_id = 'the name of the claim'
3356 claim_value = 'the value of the claim'
3358 # Create a machine account with which to perform FAST.
3359 mach_creds = self.get_cached_creds(
3360 account_type=self.AccountType.COMPUTER,
3361 opts={'id': 'device'})
3362 mach_tgt = self.get_tgt(mach_creds)
3364 # Create an authentication policy that requires the device to have a
3366 target_policy_sddl = self.allow_if(
3367 f'@User.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
3368 target_policy = self.create_authn_policy(
3369 enforced=True, computer_allowed_to=target_policy_sddl)
3371 # Create a user account.
3372 client_creds = self._get_creds(account_type=self.AccountType.USER)
3373 client_tgt = self.get_tgt(client_creds)
3375 # Create a target account with the assigned policy.
3376 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
3377 assigned_policy=target_policy)
3379 # Show that authorization fails.
3381 client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
3383 expect_edata=self.expect_padata_outer,
3384 # We aren’t particular about whether or not we get an NTSTATUS.
3386 expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
# The audit expectation names the policy, the firewall-failure status, and
# the KERBEROS_SERVER_RESTRICTION event with reason ACCESS_DENIED.
3391 policy=target_policy,
3392 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3393 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
3394 reason=AuditReason.ACCESS_DENIED)
3396 def test_pac_claims_present(self):
# Scenario: a string claim is written into the client’s TGT via
# set_pac_claims, and the policy requires @User.<claim> to equal the same
# value. The armored request passes 0 in the position where the failing
# tests pass KDC_ERR_POLICY, and the TGS log check is given the policy.
3397 """Test that authentication succeeds if the user has a required
3401 claim_id = 'the name of the claim'
3402 claim_value = 'the value of the claim'
3405 (claims.CLAIMS_SOURCE_TYPE_AD, [
3406 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
3410 # Create a machine account with which to perform FAST.
3411 mach_creds = self.get_cached_creds(
3412 account_type=self.AccountType.COMPUTER,
3413 opts={'id': 'device'})
3414 mach_tgt = self.get_tgt(mach_creds)
3416 # Create an authentication policy that requires the user to have a
# The claim name is passed through escaped_claim_id before being embedded in
# the SDDL expression; the value is embedded as a quoted string.
3418 target_policy_sddl = self.allow_if(
3419 f'@User.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
3420 target_policy = self.create_authn_policy(
3421 enforced=True, computer_allowed_to=target_policy_sddl)
3423 # Create a user account.
3424 client_creds = self._get_creds(account_type=self.AccountType.USER)
3425 client_tgt = self.get_tgt(client_creds)
3427 # Add the required claim to the client’s TGT.
3428 client_tgt = self.modified_ticket(
3430 modify_pac_fn=partial(self.set_pac_claims,
3431 client_claims=pac_claims),
3432 checksum_keys=self.get_krbtgt_checksum_key())
3434 # Create a target account with the assigned policy.
3435 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
3436 assigned_policy=target_policy)
3438 # Show that authorization succeeds.
3439 self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt)
3441 self.check_tgs_log(client_creds, target_creds,
3442 policy=target_policy)
3444 def test_pac_claims_invalid(self):
# Scenario: the client’s TGT is given the required claim, but its SIDs are
# replaced with device_sids, which per the comment below omit the Claims
# Valid SID. The @User policy check is expected to fail with
# KDC_ERR_POLICY.
# NOTE(review): the docstring, the "device’s SIDs" comment and the name
# device_sids all refer to the device, yet the SIDs and the claim are
# applied to client_tgt and the policy tests @User. This looks like
# leftover naming from the device variant of the test — confirm.
3445 """Test that authentication fails if the device’s required claim is not
3449 claim_id = 'the name of the claim'
3450 claim_value = 'the value of the claim'
3453 (claims.CLAIMS_SOURCE_TYPE_AD, [
3454 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
3458 # The device’s SIDs do not include the Claims Valid SID.
3460 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3461 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3464 # Create a machine account with which to perform FAST.
3465 mach_creds = self.get_cached_creds(
3466 account_type=self.AccountType.COMPUTER,
3467 opts={'id': 'device'})
3468 mach_tgt = self.get_tgt(mach_creds)
3470 # Create an authentication policy that requires the device to have a
3472 target_policy_sddl = self.allow_if(
3473 f'@User.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
3474 target_policy = self.create_authn_policy(
3475 enforced=True, computer_allowed_to=target_policy_sddl)
3477 # Create a user account.
3478 client_creds = self._get_creds(account_type=self.AccountType.USER)
3479 client_tgt = self.get_tgt(client_creds)
3481 # Add the SIDs and the required claim to the client’s TGT.
3482 client_tgt = self.modified_ticket(
3485 partial(self.set_pac_claims, client_claims=pac_claims),
3486 partial(self.set_pac_sids, new_sids=device_sids)],
3487 checksum_keys=self.get_krbtgt_checksum_key())
3489 # Create a target account with the assigned policy.
3490 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
3491 assigned_policy=target_policy)
3493 # Show that authorization fails.
3495 client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
3497 expect_edata=self.expect_padata_outer,
3498 # We aren’t particular about whether or not we get an NTSTATUS.
3500 expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
# The audit expectation names the policy, the firewall-failure status, and
# the KERBEROS_SERVER_RESTRICTION event with reason ACCESS_DENIED.
3505 policy=target_policy,
3506 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3507 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
3508 reason=AuditReason.ACCESS_DENIED)
3510 def test_pac_device_claims_not_present(self):
# Scenario: the policy requires a @Device claim. The TGT of the machine
# account created for FAST is used as obtained from get_tgt (no claim is
# added to it in the visible code), and the request is expected to fail
# with KDC_ERR_POLICY.
3511 """Test that authorization fails if the device does not have a
3515 claim_id = 'the name of the claim'
3516 claim_value = 'the value of the claim'
3518 # Create a machine account with which to perform FAST.
3519 mach_creds = self.get_cached_creds(
3520 account_type=self.AccountType.COMPUTER,
3521 opts={'id': 'device'})
3522 mach_tgt = self.get_tgt(mach_creds)
3524 # Create an authentication policy that requires the device to have a
3525 # certain device claim.
3526 target_policy_sddl = self.allow_if(
3527 f'@Device.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
3528 target_policy = self.create_authn_policy(
3529 enforced=True, computer_allowed_to=target_policy_sddl)
3531 # Create a user account.
3532 client_creds = self._get_creds(account_type=self.AccountType.USER)
3533 client_tgt = self.get_tgt(client_creds)
3535 # Create a target account with the assigned policy.
3536 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
3537 assigned_policy=target_policy)
3539 # Show that authorization fails.
3541 client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
3543 expect_edata=self.expect_padata_outer,
3544 # We aren’t particular about whether or not we get an NTSTATUS.
3546 expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
# The audit expectation names the policy, the firewall-failure status, and
# the KERBEROS_SERVER_RESTRICTION event with reason ACCESS_DENIED.
3551 policy=target_policy,
3552 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3553 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
3554 reason=AuditReason.ACCESS_DENIED)
3556 def test_pac_device_claims_present(self):
# Scenario: the required claim is written into the machine account’s TGT,
# the policy requires @Device.<claim> to equal the same value, and the
# request armored with that TGT passes 0 in the position where the failing
# tests pass KDC_ERR_POLICY.
3557 """Test that authorization succeeds if the device has a required claim.
3560 claim_id = 'the name of the claim'
3561 claim_value = 'the value of the claim'
3564 (claims.CLAIMS_SOURCE_TYPE_AD, [
3565 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
3569 # Create a machine account with which to perform FAST.
3570 mach_creds = self.get_cached_creds(
3571 account_type=self.AccountType.COMPUTER,
3572 opts={'id': 'device'})
3573 mach_tgt = self.get_tgt(mach_creds)
3575 # Add the required claim to the machine account’s TGT.
# The claim goes in through the client_claims parameter of set_pac_claims;
# the policy below then refers to it as a @Device claim.
3576 mach_tgt = self.modified_ticket(
3578 modify_pac_fn=partial(self.set_pac_claims,
3579 client_claims=pac_claims),
3580 checksum_keys=self.get_krbtgt_checksum_key())
3582 # Create an authentication policy that requires the device to have a
3583 # certain device claim.
3584 target_policy_sddl = self.allow_if(
3585 f'@Device.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
3586 target_policy = self.create_authn_policy(
3587 enforced=True, computer_allowed_to=target_policy_sddl)
3589 # Create a user account.
3590 client_creds = self._get_creds(account_type=self.AccountType.USER)
3591 client_tgt = self.get_tgt(client_creds)
3593 # Create a target account with the assigned policy.
3594 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
3595 assigned_policy=target_policy)
3597 # Show that authorization succeeds.
3598 self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt)
3600 self.check_tgs_log(client_creds, target_creds,
3601 policy=target_policy)
3603 def test_pac_device_claims_invalid(self):
# Scenario: the machine account’s TGT carries the required claim, but its
# SIDs are replaced by device_sids, which per the comment below omit the
# Claims Valid SID. The @Device policy check is expected to fail with
# KDC_ERR_POLICY.
3604 """Test that authorization fails if the device’s required claim is not
3608 claim_id = 'the name of the claim'
3609 claim_value = 'the value of the claim'
3612 (claims.CLAIMS_SOURCE_TYPE_AD, [
3613 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
3617 # The device’s SIDs do not include the Claims Valid SID.
3619 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3620 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3623 # Create a machine account with which to perform FAST.
3624 mach_creds = self.get_cached_creds(
3625 account_type=self.AccountType.COMPUTER,
3626 opts={'id': 'device'})
3627 mach_tgt = self.get_tgt(mach_creds)
3629 # Add the SIDs and the required claim to the machine account’s TGT.
# Two PAC modifiers are supplied: the claims are set first, then the SIDs.
3630 mach_tgt = self.modified_ticket(
3633 partial(self.set_pac_claims, client_claims=pac_claims),
3634 partial(self.set_pac_sids, new_sids=device_sids)],
3635 checksum_keys=self.get_krbtgt_checksum_key())
3637 # Create an authentication policy that requires the device to have a
3639 target_policy_sddl = self.allow_if(
3640 f'@Device.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
3641 target_policy = self.create_authn_policy(
3642 enforced=True, computer_allowed_to=target_policy_sddl)
3644 # Create a user account.
3645 client_creds = self._get_creds(account_type=self.AccountType.USER)
3646 client_tgt = self.get_tgt(client_creds)
3648 # Create a target account with the assigned policy.
3649 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
3650 assigned_policy=target_policy)
3652 # Show that authorization fails.
3654 client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
3656 expect_edata=self.expect_padata_outer,
3657 # We aren’t particular about whether or not we get an NTSTATUS.
3659 expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
# The audit expectation names the policy, the firewall-failure status, and
# the KERBEROS_SERVER_RESTRICTION event with reason ACCESS_DENIED.
3664 policy=target_policy,
3665 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3666 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
3667 reason=AuditReason.ACCESS_DENIED)
3669 def test_pac_device_claims_invalid_no_attrs(self):
# Scenario: like test_pac_device_claims_invalid, except that the SIDs
# written into the machine account’s TGT do include SID_CLAIMS_VALID, with
# an attributes value of 0 instead of the default group flags.
# NOTE(review): the docstring says authorization fails, but the visible
# request passes 0 in the position where the failing tests pass
# KDC_ERR_POLICY, sits under a "succeeds" comment, and the log check
# carries no failure status. Confirm and align the docstring.
3670 """Test that authorization fails if the device’s required claim is not
3674 claim_id = 'the name of the claim'
3675 claim_value = 'the value of the claim'
3678 (claims.CLAIMS_SOURCE_TYPE_AD, [
3679 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
3684 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3685 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3686 # The device’s SIDs include the Claims Valid SID, but it has no
3688 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, 0),
3691 # Create a machine account with which to perform FAST.
3692 mach_creds = self.get_cached_creds(
3693 account_type=self.AccountType.COMPUTER,
3694 opts={'id': 'device'})
3695 mach_tgt = self.get_tgt(mach_creds)
3697 # Add the SIDs and the required claim to the machine account’s TGT.
# Two PAC modifiers are supplied: the claims are set first, then the SIDs.
3698 mach_tgt = self.modified_ticket(
3701 partial(self.set_pac_claims, client_claims=pac_claims),
3702 partial(self.set_pac_sids, new_sids=device_sids)],
3703 checksum_keys=self.get_krbtgt_checksum_key())
3705 # Create an authentication policy that requires the device to have a
3707 target_policy_sddl = self.allow_if(
3708 f'@Device.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
3709 target_policy = self.create_authn_policy(
3710 enforced=True, computer_allowed_to=target_policy_sddl)
3712 # Create a user account.
3713 client_creds = self._get_creds(account_type=self.AccountType.USER)
3714 client_tgt = self.get_tgt(client_creds)
3716 # Create a target account with the assigned policy.
3717 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
3718 assigned_policy=target_policy)
3720 # Show that authorization succeeds.
3721 self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt)
3723 self.check_tgs_log(client_creds, target_creds,
3724 policy=target_policy)
3727 if __name__ == '__main__':
# When the file is run as a script, both module-level debug switches
# (ASN.1 printing and hexdump) are set to False before anything else.
3728 global_asn1_print = False
3729 global_hexdump = False