2 # Unix SMB/CIFS implementation.
3 # Copyright (C) Stefan Metzmacher 2020
4 # Copyright (C) Catalyst.Net Ltd 2023
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 sys.path.insert(0, 'bin/python')
24 os.environ['PYTHONUNBUFFERED'] = '1'
26 from collections import OrderedDict
27 from functools import partial
29 from string import Formatter
33 from samba import dsdb, ntstatus
34 from samba.dcerpc import claims, krb5pac, security
35 from samba.ndr import ndr_pack, ndr_unpack
37 from samba.tests import DynamicTestCase, env_get_var_value
38 from samba.tests.krb5.authn_policy_tests import (
43 from samba.tests.krb5.raw_testcase import RawKerberosTest
44 from samba.tests.krb5.rfc4120_constants import (
51 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
53 SidType = RawKerberosTest.SidType
55 global_asn1_print = False
56 global_hexdump = False
59 # When used as a test outcome, indicates that the test can cause a Windows
60 # server to crash, and is to be run with caution.
61 CRASHES_WINDOWS = object()
64 class ConditionalAceBaseTests(AuthnPolicyBaseTests):
65 # Constants for group SID attributes.
66 default_attrs = security.SE_GROUP_DEFAULT_FLAGS
67 resource_attrs = default_attrs | security.SE_GROUP_RESOURCE
69 aa_asserted_identity = (
70 security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY)
71 service_asserted_identity = security.SID_SERVICE_ASSERTED_IDENTITY
81 self.do_asn1_print = global_asn1_print
82 self.do_hexdump = global_hexdump
85 samdb = self.get_samdb()
88 # Create a machine account with which to perform FAST.
89 cls._mach_creds = self.get_cached_creds(
90 account_type=self.AccountType.COMPUTER)
92 # Create some new groups.
94 group0_name = self.get_new_username()
95 group0_dn = self.create_group(samdb, group0_name)
96 cls._group0_sid = self.get_objectSid(samdb, group0_dn)
98 group1_name = self.get_new_username()
99 group1_dn = self.create_group(samdb, group1_name)
100 cls._group1_sid = self.get_objectSid(samdb, group1_dn)
102 # Create machine accounts with which to perform FAST that belong to
103 # various arrangements of the groups.
105 cls._member_of_both_creds = self.get_cached_creds(
106 account_type=self.AccountType.COMPUTER,
107 opts={'member_of': (group0_dn, group1_dn)})
109 cls._member_of_one_creds = self.get_cached_creds(
110 account_type=self.AccountType.COMPUTER,
111 opts={'member_of': (group1_dn,)})
113 # Create some authentication silos.
114 cls._unenforced_silo = self.create_authn_silo(enforced=False)
115 cls._enforced_silo = self.create_authn_silo(enforced=True)
117 # Create machine accounts with which to perform FAST that belong to
118 # the respective silos.
120 cls._member_of_unenforced_silo = self._get_creds(
121 account_type=self.AccountType.COMPUTER,
122 assigned_silo=self._unenforced_silo,
124 self.add_to_group(str(self._member_of_unenforced_silo.get_dn()),
125 self._unenforced_silo.dn,
126 'msDS-AuthNPolicySiloMembers',
129 cls._member_of_enforced_silo = self._get_creds(
130 account_type=self.AccountType.COMPUTER,
131 assigned_silo=self._enforced_silo,
133 self.add_to_group(str(self._member_of_enforced_silo.get_dn()),
134 self._enforced_silo.dn,
135 'msDS-AuthNPolicySiloMembers',
138 # Create a couple of multi‐valued string claims for testing claim
141 cls.claim0_attr = 'carLicense'
142 cls.claim0_id = self.get_new_username()
143 self.create_claim(cls.claim0_id,
145 attribute=cls.claim0_attr,
148 for_classes=['computer', 'user'],
149 value_type=claims.CLAIM_TYPE_STRING)
151 cls.claim1_attr = 'departmentNumber'
152 cls.claim1_id = self.get_new_username()
153 self.create_claim(cls.claim1_id,
155 attribute=cls.claim1_attr,
158 for_classes=['computer', 'user'],
159 value_type=claims.CLAIM_TYPE_STRING)
163 # For debugging purposes. Prints out the SDDL representation of
164 # authentication policy conditions set by the Windows GUI.
165 def _print_authn_policy_sddl(self, policy_id):
166 policy_dn = self.get_authn_policies_dn()
167 policy_dn.add_child(f'CN={policy_id}')
170 'msDS-ComputerAllowedToAuthenticateTo',
171 'msDS-ServiceAllowedToAuthenticateFrom',
172 'msDS-ServiceAllowedToAuthenticateTo',
173 'msDS-UserAllowedToAuthenticateFrom',
174 'msDS-UserAllowedToAuthenticateTo',
177 samdb = self.get_samdb()
178 res = samdb.search(policy_dn, scope=ldb.SCOPE_BASE, attrs=attrs)
179 self.assertEqual(1, len(res),
180 f'Authentication policy {policy_id} not found')
183 def print_sddl(attr):
184 sd = result.get(attr, idx=0)
188 sec_desc = ndr_unpack(security.descriptor, sd)
189 print(f'{attr}: {sec_desc.as_sddl()}')
194 def sddl_array_from_sids(self, sids):
195 def sddl_from_sid_entry(sid_entry):
196 sid, _, _ = sid_entry
199 return f"{{{', '.join(map(sddl_from_sid_entry, sids))}}}"
201 def allow_if(self, condition):
202 return f'O:SYD:(XA;;CR;;;WD;({condition}))'
205 def escaped_claim_id(claim_id):
206 escapes = '\x00\t\n\x0b\x0c\r !"%&()<=>|'
209 else f'%{ord(c):04x}'
214 class ConditionalAceTests(ConditionalAceBaseTests):
216 def setUpDynamicTestCases(cls):
217 FILTER = env_get_var_value('FILTER', allow_missing=True)
219 # These operators are arranged so that each operator precedes its own
221 op_names = OrderedDict([
222 ('!=', 'does not equal'),
225 ('<=', 'is less than or equals'),
226 ('<', 'is less than'),
228 ('>=', 'exceeds or equals'),
230 ('Not_Any_of', 'matches none of'),
231 ('Any_of', 'matches any of'),
232 ('Not_Contains', 'does not contain'),
233 ('Contains', 'contains'),
234 ('Not_Member_of_Any', 'the user belongs to none of'),
235 ('Not_Device_Member_of_Any', 'the device belongs to none of'), # TODO: no test for this yet
236 ('Device_Member_of_Any', 'the device belongs to any of'), # TODO: no test for this yet
237 ('Not_Device_Member_of', 'the device does not belong to'), # TODO: no test for this yet
238 ('Device_Member_of', 'the device belongs to'),
239 ('Not_Exists', 'there does not exist'),
240 ('Exists', 'there exists'),
241 ('Member_of_Any', 'the user belongs to any of'),
242 ('Not_Member_of', 'the user does not belong to'),
243 ('Member_of', 'the user belongs to'),
247 # This is a safety measure to ensure correct ordering of op_names
248 keys = list(op_names.keys())
249 for i in range(len(keys)):
250 for j in range(i + 1, len(keys)):
251 if keys[i] in keys[j]:
252 raise AssertionError((keys[i], keys[j]))
254 for case in cls.pac_claim_cases:
256 pac_claims, expression, outcome = case
259 pac_claims, expression, claim_map, outcome = case
261 raise AssertionError(
262 f'found {len(case)} items in case, expected 3–4')
264 expression_name = expression
265 for op, op_name in op_names.items():
266 expression_name = expression_name.replace(op, op_name)
268 name = f'{pac_claims}_{expression_name}'
270 if claim_map is not None:
271 name += f'_{claim_map}'
273 name = re.sub(r'\W+', '_', name)
275 name = f'{name[:125]}+{len(name) - 125}‐more'
277 if FILTER and not re.search(FILTER, name):
280 cls.generate_dynamic_test('test_pac_claim_cmp', name,
281 pac_claims, expression, claim_map,
284 for case in cls.claim_against_claim_cases:
285 lhs, op, rhs, outcome = case
286 op_name = op_names[op]
288 name = f'{lhs}_{op_name}_{rhs}'
290 name = re.sub(r'\W+', '_', name)
291 if FILTER and not re.search(FILTER, name):
294 cls.generate_dynamic_test('test_cmp', name,
295 lhs, op, rhs, outcome)
297 for case in cls.claim_against_literal_cases:
298 lhs, op, rhs, outcome = case
299 op_name = op_names[op]
301 name = f'{lhs}_{op_name}_literal_{rhs}'
303 name = re.sub(r'\W+', '_', name)
304 if FILTER and not re.search(FILTER, name):
307 cls.generate_dynamic_test('test_cmp', name,
308 lhs, op, rhs, outcome, True)
310 def test_allowed_from_member_of_each(self):
311 # Create an authentication policy that allows accounts belonging to
313 policy = self.create_authn_policy(
316 f'O:SYD:(XA;;CR;;;WD;(Member_of '
317 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'),
320 # Create a user account with the assigned policy.
321 client_creds = self._get_creds(account_type=self.AccountType.USER,
322 assigned_policy=policy)
324 # Show that we get a policy error if the machine account does not
325 # belong to both groups.
326 armor_tgt = self.get_tgt(self._member_of_one_creds)
327 self._get_tgt(client_creds, armor_tgt=armor_tgt,
328 expected_error=KDC_ERR_POLICY)
330 # Otherwise, authentication should succeed.
331 armor_tgt = self.get_tgt(self._member_of_both_creds)
332 self._get_tgt(client_creds, armor_tgt=armor_tgt,
335 def test_allowed_from_member_of_any(self):
336 # Create an authentication policy that allows accounts belonging to
338 policy = self.create_authn_policy(
341 f'O:SYD:(XA;;CR;;;WD;(Member_of_Any '
342 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'),
345 # Create a user account with the assigned policy.
346 client_creds = self._get_creds(account_type=self.AccountType.USER,
347 assigned_policy=policy)
349 # Show that we get a policy error if the machine account belongs to
351 armor_tgt = self.get_tgt(self._mach_creds)
352 self._get_tgt(client_creds, armor_tgt=armor_tgt,
353 expected_error=KDC_ERR_POLICY)
355 # Otherwise, authentication should succeed.
356 armor_tgt = self.get_tgt(self._member_of_one_creds)
357 self._get_tgt(client_creds, armor_tgt=armor_tgt,
360 def test_allowed_from_not_member_of_each(self):
361 # Create an authentication policy that allows accounts not belonging to
363 policy = self.create_authn_policy(
366 f'O:SYD:(XA;;CR;;;WD;(Not_Member_of '
367 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'),
370 # Create a user account with the assigned policy.
371 client_creds = self._get_creds(account_type=self.AccountType.USER,
372 assigned_policy=policy)
374 # Show that we get a policy error if the machine account belongs to
376 armor_tgt = self.get_tgt(self._member_of_both_creds)
377 self._get_tgt(client_creds, armor_tgt=armor_tgt,
378 expected_error=KDC_ERR_POLICY)
380 # Otherwise, authentication should succeed.
381 armor_tgt = self.get_tgt(self._member_of_one_creds)
382 self._get_tgt(client_creds, armor_tgt=armor_tgt,
385 def test_allowed_from_not_member_of_any(self):
386 # Create an authentication policy that allows accounts belonging to
388 policy = self.create_authn_policy(
391 f'O:SYD:(XA;;CR;;;WD;(Not_Member_of_Any '
392 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'),
395 # Create a user account with the assigned policy.
396 client_creds = self._get_creds(account_type=self.AccountType.USER,
397 assigned_policy=policy)
399 # Show that we get a policy error if the machine account belongs to one
401 armor_tgt = self.get_tgt(self._member_of_one_creds)
402 self._get_tgt(client_creds, armor_tgt=armor_tgt,
403 expected_error=KDC_ERR_POLICY)
405 # Otherwise, authentication should succeed.
406 armor_tgt = self.get_tgt(self._mach_creds)
407 self._get_tgt(client_creds, armor_tgt=armor_tgt,
410 def test_allowed_from_member_of_each_deny(self):
411 # Create an authentication policy that denies accounts belonging to
412 # both groups, and allows other accounts.
413 policy = self.create_authn_policy(
416 f'O:SYD:(XD;;CR;;;WD;(Member_of '
417 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'
421 # Create a user account with the assigned policy.
422 client_creds = self._get_creds(account_type=self.AccountType.USER,
423 assigned_policy=policy)
425 # Show that we get a policy error if the machine account belongs to
427 armor_tgt = self.get_tgt(self._member_of_both_creds)
428 self._get_tgt(client_creds, armor_tgt=armor_tgt,
429 expected_error=KDC_ERR_POLICY)
431 # Otherwise, authentication should succeed.
432 armor_tgt = self.get_tgt(self._member_of_one_creds)
433 self._get_tgt(client_creds, armor_tgt=armor_tgt,
436 def test_allowed_from_member_of_any_deny(self):
437 # Create an authentication policy that denies accounts belonging to
438 # either group, and allows other accounts.
439 policy = self.create_authn_policy(
442 f'O:SYD:(XD;;CR;;;WD;(Member_of_Any '
443 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'
447 # Create a user account with the assigned policy.
448 client_creds = self._get_creds(account_type=self.AccountType.USER,
449 assigned_policy=policy)
451 # Show that we get a policy error if the machine account belongs to
453 armor_tgt = self.get_tgt(self._member_of_one_creds)
454 self._get_tgt(client_creds, armor_tgt=armor_tgt,
455 expected_error=KDC_ERR_POLICY)
457 # Otherwise, authentication should succeed.
458 armor_tgt = self.get_tgt(self._mach_creds)
459 self._get_tgt(client_creds, armor_tgt=armor_tgt,
462 def test_allowed_from_not_member_of_each_deny(self):
463 # Create an authentication policy that denies accounts not belonging to
464 # both groups, and allows other accounts.
465 policy = self.create_authn_policy(
468 f'O:SYD:(XD;;CR;;;WD;(Not_Member_of '
469 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'
473 # Create a user account with the assigned policy.
474 client_creds = self._get_creds(account_type=self.AccountType.USER,
475 assigned_policy=policy)
477 # Show that we get a policy error if the machine account doesn’t belong
479 armor_tgt = self.get_tgt(self._member_of_one_creds)
480 self._get_tgt(client_creds, armor_tgt=armor_tgt,
481 expected_error=KDC_ERR_POLICY)
483 # Otherwise, authentication should succeed.
484 armor_tgt = self.get_tgt(self._member_of_both_creds)
485 self._get_tgt(client_creds, armor_tgt=armor_tgt,
488 def test_allowed_from_not_member_of_any_deny(self):
489 # Create an authentication policy that denies accounts belonging to
490 # neither group, and allows other accounts.
491 policy = self.create_authn_policy(
494 f'O:SYD:(XD;;CR;;;WD;(Not_Member_of_Any '
495 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'
499 # Create a user account with the assigned policy.
500 client_creds = self._get_creds(account_type=self.AccountType.USER,
501 assigned_policy=policy)
503 # Show that we get a policy error if the machine account belongs to
505 armor_tgt = self.get_tgt(self._mach_creds)
506 self._get_tgt(client_creds, armor_tgt=armor_tgt,
507 expected_error=KDC_ERR_POLICY)
509 # Otherwise, authentication should succeed.
510 armor_tgt = self.get_tgt(self._member_of_one_creds)
511 self._get_tgt(client_creds, armor_tgt=armor_tgt,
514 def test_allowed_from_unenforced_silo_equals(self):
515 # Create an authentication policy that allows accounts belonging to the
517 policy = self.create_authn_policy(
520 f'O:SYD:(XA;;CR;;;WD;'
521 f'(@User.ad://ext/AuthenticationSilo == '
522 f'"{self._unenforced_silo}"))'),
525 # Create a user account with the assigned policy.
526 client_creds = self._get_creds(account_type=self.AccountType.USER,
527 assigned_policy=policy)
529 # As the policy is unenforced, the ‘ad://ext/AuthenticationSilo’ claim
530 # will not be present in the TGT, and the ACE will never allow access.
532 armor_tgt = self.get_tgt(self._mach_creds)
533 self._get_tgt(client_creds, armor_tgt=armor_tgt,
534 expected_error=KDC_ERR_POLICY)
536 armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
537 self._get_tgt(client_creds, armor_tgt=armor_tgt,
538 expected_error=KDC_ERR_POLICY)
540 armor_tgt = self.get_tgt(self._member_of_enforced_silo)
541 self._get_tgt(client_creds, armor_tgt=armor_tgt,
542 expected_error=KDC_ERR_POLICY)
544 def test_allowed_from_enforced_silo_equals(self):
545 # Create an authentication policy that allows accounts belonging to the
547 policy = self.create_authn_policy(
550 f'O:SYD:(XA;;CR;;;WD;'
551 f'(@User.ad://ext/AuthenticationSilo == '
552 f'"{self._enforced_silo}"))'),
555 # Create a user account with the assigned policy.
556 client_creds = self._get_creds(account_type=self.AccountType.USER,
557 assigned_policy=policy)
559 # Show that we get a policy error if the machine account does not
560 # belong to the silo.
561 armor_tgt = self.get_tgt(self._mach_creds)
562 self._get_tgt(client_creds, armor_tgt=armor_tgt,
563 expected_error=KDC_ERR_POLICY)
565 armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
566 self._get_tgt(client_creds, armor_tgt=armor_tgt,
567 expected_error=KDC_ERR_POLICY)
569 # Otherwise, authentication should succeed.
570 armor_tgt = self.get_tgt(self._member_of_enforced_silo)
571 self._get_tgt(client_creds, armor_tgt=armor_tgt,
574 def test_allowed_from_unenforced_silo_not_equals(self):
575 # Create an authentication policy that allows accounts not belonging to
576 # the unenforced silo.
577 policy = self.create_authn_policy(
580 f'O:SYD:(XA;;CR;;;WD;'
581 f'(@User.ad://ext/AuthenticationSilo != '
582 f'"{self._unenforced_silo}"))'),
585 # Create a user account with the assigned policy.
586 client_creds = self._get_creds(account_type=self.AccountType.USER,
587 assigned_policy=policy)
589 # Show that authentication fails unless the account belongs to a silo
590 # other than the unenforced silo.
592 armor_tgt = self.get_tgt(self._mach_creds)
593 self._get_tgt(client_creds, armor_tgt=armor_tgt,
594 expected_error=KDC_ERR_POLICY)
596 armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
597 self._get_tgt(client_creds, armor_tgt=armor_tgt,
598 expected_error=KDC_ERR_POLICY)
600 armor_tgt = self.get_tgt(self._member_of_enforced_silo)
601 self._get_tgt(client_creds, armor_tgt=armor_tgt,
604 def test_allowed_from_enforced_silo_not_equals(self):
605 # Create an authentication policy that allows accounts not belonging to
607 policy = self.create_authn_policy(
610 f'O:SYD:(XA;;CR;;;WD;'
611 f'(@User.ad://ext/AuthenticationSilo != '
612 f'"{self._enforced_silo}"))'),
615 # Create a user account with the assigned policy.
616 client_creds = self._get_creds(account_type=self.AccountType.USER,
617 assigned_policy=policy)
619 # Show that authentication always fails, as none of the machine
620 # accounts belong to a silo that is not the enforced one. (The
621 # unenforced silo doesn’t count, as it will never appear in a claim.)
623 armor_tgt = self.get_tgt(self._mach_creds)
624 self._get_tgt(client_creds, armor_tgt=armor_tgt,
625 expected_error=KDC_ERR_POLICY)
627 armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
628 self._get_tgt(client_creds, armor_tgt=armor_tgt,
629 expected_error=KDC_ERR_POLICY)
631 armor_tgt = self.get_tgt(self._member_of_enforced_silo)
632 self._get_tgt(client_creds, armor_tgt=armor_tgt,
633 expected_error=KDC_ERR_POLICY)
635 def test_allowed_from_unenforced_silo_equals_deny(self):
636 # Create an authentication policy that denies accounts belonging to the
637 # unenforced silo, and allows other accounts.
638 policy = self.create_authn_policy(
641 f'O:SYD:(XD;;CR;;;WD;'
642 f'(@User.ad://ext/AuthenticationSilo == '
643 f'"{self._unenforced_silo}"))'
647 # Create a user account with the assigned policy.
648 client_creds = self._get_creds(account_type=self.AccountType.USER,
649 assigned_policy=policy)
651 # Show that authentication fails unless the account belongs to a silo
652 # other than the unenforced silo.
654 armor_tgt = self.get_tgt(self._mach_creds)
655 self._get_tgt(client_creds, armor_tgt=armor_tgt,
656 expected_error=KDC_ERR_POLICY)
658 armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
659 self._get_tgt(client_creds, armor_tgt=armor_tgt,
660 expected_error=KDC_ERR_POLICY)
662 armor_tgt = self.get_tgt(self._member_of_enforced_silo)
663 self._get_tgt(client_creds, armor_tgt=armor_tgt,
666 def test_allowed_from_enforced_silo_equals_deny(self):
667 # Create an authentication policy that denies accounts belonging to the
668 # enforced silo, and allows other accounts.
669 policy = self.create_authn_policy(
672 f'O:SYD:(XD;;CR;;;WD;'
673 f'(@User.ad://ext/AuthenticationSilo == '
674 f'"{self._enforced_silo}"))'
678 # Create a user account with the assigned policy.
679 client_creds = self._get_creds(account_type=self.AccountType.USER,
680 assigned_policy=policy)
682 # Show that authentication always fails, as none of the machine
683 # accounts belong to a silo that is not the enforced one. (The
684 # unenforced silo doesn’t count, as it will never appear in a claim.)
686 armor_tgt = self.get_tgt(self._mach_creds)
687 self._get_tgt(client_creds, armor_tgt=armor_tgt,
688 expected_error=KDC_ERR_POLICY)
690 armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
691 self._get_tgt(client_creds, armor_tgt=armor_tgt,
692 expected_error=KDC_ERR_POLICY)
694 armor_tgt = self.get_tgt(self._member_of_enforced_silo)
695 self._get_tgt(client_creds, armor_tgt=armor_tgt,
696 expected_error=KDC_ERR_POLICY)
698 def test_allowed_from_unenforced_silo_not_equals_deny(self):
699 # Create an authentication policy that denies accounts not belonging to
700 # the unenforced silo, and allows other accounts.
701 policy = self.create_authn_policy(
704 f'O:SYD:(XD;;CR;;;WD;'
705 f'(@User.ad://ext/AuthenticationSilo != '
706 f'"{self._unenforced_silo}"))'
710 # Create a user account with the assigned policy.
711 client_creds = self._get_creds(account_type=self.AccountType.USER,
712 assigned_policy=policy)
714 # Show that authentication always fails, as the unenforced silo will
715 # never appear in a claim.
717 armor_tgt = self.get_tgt(self._mach_creds)
718 self._get_tgt(client_creds, armor_tgt=armor_tgt,
719 expected_error=KDC_ERR_POLICY)
721 armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
722 self._get_tgt(client_creds, armor_tgt=armor_tgt,
723 expected_error=KDC_ERR_POLICY)
725 armor_tgt = self.get_tgt(self._member_of_enforced_silo)
726 self._get_tgt(client_creds, armor_tgt=armor_tgt,
727 expected_error=KDC_ERR_POLICY)
729 def test_allowed_from_enforced_silo_not_equals_deny(self):
730 # Create an authentication policy that denies accounts not belonging to
731 # the enforced silo, and allows other accounts.
732 policy = self.create_authn_policy(
735 f'O:SYD:(XD;;CR;;;WD;'
736 f'(@User.ad://ext/AuthenticationSilo != '
737 f'"{self._enforced_silo}"))'
741 # Create a user account with the assigned policy.
742 client_creds = self._get_creds(account_type=self.AccountType.USER,
743 assigned_policy=policy)
745 # Show that authentication fails unless the account belongs to the
748 armor_tgt = self.get_tgt(self._mach_creds)
749 self._get_tgt(client_creds, armor_tgt=armor_tgt,
750 expected_error=KDC_ERR_POLICY)
752 armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
753 self._get_tgt(client_creds, armor_tgt=armor_tgt,
754 expected_error=KDC_ERR_POLICY)
756 armor_tgt = self.get_tgt(self._member_of_enforced_silo)
757 self._get_tgt(client_creds, armor_tgt=armor_tgt,
760 def test_allowed_from_claim_equals_claim(self):
761 # Create a couple of claim types.
763 claim0_id = self.get_new_username()
764 self.create_claim(claim0_id,
766 attribute='carLicense',
769 for_classes=['computer'],
770 value_type=claims.CLAIM_TYPE_STRING)
772 claim1_id = self.get_new_username()
773 self.create_claim(claim1_id,
778 for_classes=['computer'],
779 value_type=claims.CLAIM_TYPE_STRING)
781 # Create an authentication policy that allows accounts having the two
783 policy = self.create_authn_policy(
786 f'O:SYD:(XA;;CR;;;WD;'
787 f'(@User.{claim0_id} == @User.{claim1_id}))'),
790 # Create a user account with the assigned policy.
791 client_creds = self._get_creds(account_type=self.AccountType.USER,
792 assigned_policy=policy)
794 armor_tgt = self.get_tgt(self._mach_creds)
795 self._get_tgt(client_creds, armor_tgt=armor_tgt,
796 expected_error=KDC_ERR_POLICY)
798 mach_creds = self.get_cached_creds(
799 account_type=self.AccountType.COMPUTER,
801 'additional_details': (
802 ('carLicense', 'foo'),
806 armor_tgt = self.get_tgt(
808 expect_client_claims=True,
809 expected_client_claims={
811 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
812 'type': claims.CLAIM_TYPE_STRING,
816 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
817 'type': claims.CLAIM_TYPE_STRING,
821 self._get_tgt(client_creds, armor_tgt=armor_tgt,
824 def test_allowed_to_client_equals(self):
825 client_claim_attr = 'carLicense'
826 client_claim_value = 'foo bar'
827 client_claim_values = client_claim_value,
829 client_claim_id = self.get_new_username()
830 self.create_claim(client_claim_id,
832 attribute=client_claim_attr,
835 for_classes=['user'],
836 value_type=claims.CLAIM_TYPE_STRING)
838 # Create an authentication policy that allows authorization if the
839 # client has a particular claim value.
840 policy = self.create_authn_policy(
842 computer_allowed_to=(
843 f'O:SYD:(XA;;CR;;;WD;'
844 f'((@User.{client_claim_id} == "{client_claim_value}")))'),
847 # Create a computer account with the assigned policy.
848 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
849 assigned_policy=policy)
851 armor_tgt = self.get_tgt(self._mach_creds)
853 # Create a user account without the claim value.
854 client_creds = self.get_cached_creds(
855 account_type=self.AccountType.USER)
856 tgt = self.get_tgt(client_creds)
857 # Show that obtaining a service ticket is denied.
859 tgt, KDC_ERR_POLICY, client_creds, target_creds,
861 expect_edata=self.expect_padata_outer,
862 # We aren’t particular about whether or not we get an NTSTATUS.
864 expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
867 # Create a user account with the claim value.
868 client_creds = self.get_cached_creds(
869 account_type=self.AccountType.USER,
871 'additional_details': (
872 (client_claim_attr, client_claim_values),
877 expect_client_claims=True,
878 expected_client_claims={
880 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
881 'type': claims.CLAIM_TYPE_STRING,
882 'values': client_claim_values,
885 # Show that obtaining a service ticket is allowed.
886 self._tgs_req(tgt, 0, client_creds, target_creds,
889 def test_allowed_to_device_equals(self):
890 device_claim_attr = 'carLicense'
891 device_claim_value = 'bar'
892 device_claim_values = device_claim_value,
894 device_claim_id = self.get_new_username()
895 self.create_claim(device_claim_id,
897 attribute=device_claim_attr,
900 for_classes=['computer'],
901 value_type=claims.CLAIM_TYPE_STRING)
903 # Create a user account.
904 client_creds = self.get_cached_creds(
905 account_type=self.AccountType.USER)
906 tgt = self.get_tgt(client_creds)
908 # Create an authentication policy that allows authorization if the
909 # device has a particular claim value.
910 policy = self.create_authn_policy(
912 computer_allowed_to=(
913 f'O:SYD:(XA;;CR;;;WD;'
914 f'(@Device.{device_claim_id} == "{device_claim_value}"))'),
917 # Create a computer account with the assigned policy.
918 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
919 assigned_policy=policy)
921 armor_tgt = self.get_tgt(self._mach_creds)
922 # Show that obtaining a service ticket is denied when the claim value
925 tgt, KDC_ERR_POLICY, client_creds, target_creds,
927 expect_edata=self.expect_padata_outer,
928 # We aren’t particular about whether or not we get an NTSTATUS.
930 expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
933 mach_creds = self.get_cached_creds(
934 account_type=self.AccountType.COMPUTER,
936 'additional_details': (
937 (device_claim_attr, device_claim_values),
940 armor_tgt = self.get_tgt(
942 expect_client_claims=True,
943 expected_client_claims={
945 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
946 'type': claims.CLAIM_TYPE_STRING,
947 'values': device_claim_values,
950 # Show that obtaining a service ticket is allowed when the claim value
952 self._tgs_req(tgt, 0, client_creds, target_creds,
955 claim_against_claim_cases = [
956 # If either side is missing, the result is unknown.
957 ((), '==', (), None),
958 ((), '!=', (), None),
959 ('a', '==', (), None),
960 ((), '==', 'b', None),
961 # Straightforward equality and inequality checks work.
962 ('foo', '==', 'foo', True),
963 ('foo', '==', 'bar', False),
964 ('foo', '!=', 'foo', False),
965 ('foo', '!=', 'bar', True),
966 # We can perform less‐than and greater‐than operations.
967 ('cat', '<', 'dog', True),
968 ('cat', '<=', 'dog', True),
969 ('cat', '>', 'dog', False),
970 ('cat', '>=', 'dog', False),
971 ('foo', '<=', 'foo', True),
972 ('foo', '>=', 'foo', True),
973 ('foo', '<', 'foo bar', True),
974 ('foo bar', '>', 'foo', True),
975 # String comparison is case‐sensitive.
976 ('foo bar', '==', 'Foo BAR', True),
977 ('foo bar', '==', 'FOO BAR', True),
978 ('ćàț', '==', 'ĆÀȚ', True),
979 ('ḽ', '==', 'Ḽ', True),
980 ('ⅸ', '==', 'Ⅸ', True),
981 ('ꙭ', '==', 'Ꙭ', True),
982 ('ⱦ', '==', 'Ⱦ', True), # Lowercased variant added in Unicode 5.0.
983 ('ԛԣ', '==', 'ԚԢ', True), # All added in Unicode 5.1.
984 ('foo', '<', 'foo', True),
985 ('ćàș', '<', 'ĆÀȚ', True),
986 ('cat', '<', 'ćàț', True),
987 # This is done by converting to UPPER CASE. Hence, both ‘A’ (U+41) and
988 # ‘a’ (U+61) compare less than ‘_’ (U+5F).
989 ('A', '<', '_', True),
990 ('a', '<', '_', True),
991 # But not all uppercased/lowercased pairs are considered to be equal in
993 ('ß', '<', 'ẞ', True),
994 ('ß', '>', 'SS', True),
995 ('ⳬ', '>', 'Ⳬ', True), # Added in Unicode 5.2.
996 ('ʞ', '<', 'Ʞ', True), # Uppercased variant added in Unicode 6.0.
997 ('ʞ', '<', 'ʟ', True), # U+029E < U+029F < U+A7B0 (upper variant, Ʞ)
998 ('ꞧ', '>', 'Ꞧ', True), # Added in Unicode 6.0.
999 ('ɜ', '<', 'Ɜ', True), # Uppercased variant added in Unicode 7.0.
1001 # Strings are compared as UTF‐16 code units, rather than as Unicode
1002 # codepoints. So while you might expect ‘𐀀’ (U+10000) to compare
1003 # greater than ‘豈’ (U+F900), it is actually considered to be the
1004 # *smaller* of the pair. That is because it is encoded as a sequence of
1005 # two code units, 0xd800 and 0xdc00, which combination compares less
1006 # than the single code unit 0xf900.
1007 ('ퟻ', '<', '𐀀', True),
1008 ('𐀀', '<', '豈', True),
1009 ('ퟻ', '<', '豈', True),
1010 # Composites can be compared.
1011 (('foo', 'bar'), '==', ('foo', 'bar'), True),
1012 (('foo', 'bar'), '==', ('foo', 'baz'), False),
1013 # The individual components don’t have to match in case.
1014 (('foo', 'bar'), '==', ('FOO', 'BAR'), True),
1015 # Nor must they match in order.
1016 (('foo', 'bar'), '==', ('bar', 'foo'), True),
1017 # Composites of different lengths compare unequal.
1018 (('foo', 'bar'), '!=', 'foo', True),
1019 (('foo', 'bar'), '!=', ('foo', 'bar', 'baz'), True),
1020 # But composites don’t have a defined ordering, and aren’t considered
1021 # greater or lesser than one another.
1022 (('foo', 'bar'), '<', ('foo', 'bar'), None),
1023 (('foo', 'bar'), '<=', ('foo', 'bar'), None),
1024 (('foo', 'bar'), '>', ('foo', 'bar', 'baz'), None),
1025 (('foo', 'bar'), '>=', ('foo', 'bar', 'baz'), None),
1026 # We can test for containment.
1027 (('foo', 'bar'), 'Contains', ('FOO'), True),
1028 (('foo', 'bar'), 'Contains', ('foo', 'bar'), True),
1029 (('foo', 'bar'), 'Not_Contains', ('foo', 'bar'), False),
1030 (('foo', 'bar'), 'Contains', ('foo', 'bar', 'baz'), False),
1031 (('foo', 'bar'), 'Not_Contains', ('foo', 'bar', 'baz'), True),
1032 # We can test whether the operands have any elements in common.
1033 ('foo', 'Any_of', 'foo', True),
1034 (('foo', 'bar'), 'Any_of', 'BAR', True),
1035 (('foo', 'bar'), 'Any_of', 'baz', False),
1036 (('foo', 'bar'), 'Not_Any_of', 'baz', True),
1037 (('foo', 'bar'), 'Any_of', ('bar', 'baz'), True),
1038 (('foo', 'bar'), 'Not_Any_of', ('bar', 'baz'), False),
1041 claim_against_literal_cases = [
1042 # String comparisons also work against literals.
1043 ('foo bar', '==', '"foo bar"', True),
1044 # Composites can be compared with literals.
1045 ('bar', '==', '{{"bar"}}', True),
1046 (('apple', 'banana'), '==', '{{"APPLE", "BANANA"}}', True),
1047 (('apple', 'banana'), '==', '{{"BANANA", "APPLE"}}', True),
1048 (('apple', 'banana'), '==', '{{"apple", "banana", "apple"}}', False),
1049 # We can test for containment.
1050 ((), 'Contains', '{{"foo"}}', None),
1051 ((), 'Not_Contains', '{{"foo", "bar"}}', None),
1052 ('bar', 'Contains', '{{"bar"}}', True),
1053 (('foo', 'bar'), 'Contains', '{{"foo", "bar"}}', True),
1054 (('foo', 'bar'), 'Contains', '{{"foo", "bar", "baz"}}', False),
1055 # The right‐hand side of Contains or Not_Contains does not have to be a
1057 ('foo', 'Contains', '"foo"', True),
1058 (('foo', 'bar'), 'Not_Contains', '"foo"', False),
1059 # It’s fine if the right‐hand side contains duplicate elements.
1060 (('foo', 'bar'), 'Contains', '{{"foo", "bar", "bar"}}', True),
1061 # We can test whether the operands have any elements in common.
1062 ('bar', 'Any_of', '{{"bar"}}', True),
1063 (('foo', 'bar'), 'Any_of', '{{"bar", "baz"}}', True),
1064 (('foo', 'bar'), 'Any_of', '{{"baz"}}', False),
1065 # The right‐hand side of Any_of or Not_Any_of must be a composite.
1066 ('foo', 'Any_of', '"foo"', None),
1067 (('foo', 'bar'), 'Not_Any_of', '"baz"', None),
1068 # A string won’t compare equal to a numeric literal.
1069 ('42', '==', '"42"', True),
1070 ('42', '==', '42', None),
1071 # Nor can composites that mismatch in type be compared.
1072 (('123', '456'), '==', '{{"123", "456"}}', True),
1073 (('654', '321'), '==', '{{654, 321}}', None),
1074 (('foo', 'bar'), 'Contains', '{{1, 2, 3}}', None),
1077 def _test_cmp_with_args(self, lhs, op, rhs, outcome, rhs_is_literal=False):
1078 # Construct a conditional ACE expression that evaluates to True if the
1079 # two claim values are equal.
1081 self.assertIsInstance(rhs, str)
1082 rhs = rhs.format(self=self)
1083 expression = f'(@User.{self.claim0_id} {op} {rhs})'
1085 expression = f'(@User.{self.claim0_id} {op} @User.{self.claim1_id})'
1087 # Create an authentication policy that will allow authentication when
1088 # the expression is true, and a second that will deny authentication in
1089 # the same circumstance. By observing the results of authenticating
1090 # against each of these policies in turn, we can determine whether the
1091 # expression evaluates to a True, False, or Unknown value.
1093 allowed_sddl = f'O:SYD:(XA;;CR;;;WD;{expression})'
1094 denied_sddl = f'O:SYD:(XD;;CR;;;WD;{expression})(A;;CR;;;WD)'
1096 allowed_policy = self.create_authn_policy(
1098 user_allowed_from=allowed_sddl)
1099 denied_policy = self.create_authn_policy(
1101 user_allowed_from=denied_sddl)
1103 # Create a user account assigned to each policy.
1104 allowed_creds = self._get_creds(account_type=self.AccountType.USER,
1105 assigned_policy=allowed_policy)
1106 denied_creds = self._get_creds(account_type=self.AccountType.USER,
1107 assigned_policy=denied_policy)
1109 additional_details = ()
1111 additional_details += ((self.claim0_attr, lhs),)
1112 if rhs and not rhs_is_literal:
1113 additional_details += ((self.claim1_attr, rhs),)
1115 # Create a computer account with the provided attribute values.
1116 mach_creds = self.get_cached_creds(
1117 account_type=self.AccountType.COMPUTER,
1118 opts={'additional_details': additional_details})
1120 def expected_values(val):
1121 if isinstance(val, (str, bytes)):
1126 expected_client_claims = {}
1128 expected_client_claims[self.claim0_id] = {
1129 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
1130 'type': claims.CLAIM_TYPE_STRING,
1131 'values': expected_values(lhs),
1133 if rhs and not rhs_is_literal:
1134 expected_client_claims[self.claim1_id] = {
1135 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
1136 'type': claims.CLAIM_TYPE_STRING,
1137 'values': expected_values(rhs),
1140 # Fetch the computer account’s TGT, and ensure it contains the claims.
1141 armor_tgt = self.get_tgt(
1143 expect_client_claims=bool(expected_client_claims) or None,
1144 expected_client_claims=expected_client_claims)
1146 # The first or the second authentication request is expected to succeed
1147 # if the outcome is True or False, respectively. An Unknown outcome,
1148 # represented by None, will result in a policy error in either case.
1149 allowed_error = 0 if outcome is True else KDC_ERR_POLICY
1150 denied_error = 0 if outcome is False else KDC_ERR_POLICY
1152 # Attempt to authenticate and ensure that we observe the expected
1154 self._get_tgt(allowed_creds, armor_tgt=armor_tgt,
1155 expected_error=allowed_error)
1156 self._get_tgt(denied_creds, armor_tgt=armor_tgt,
1157 expected_error=denied_error)
1160 # Test a very simple expression with various claims.
1162 (claims.CLAIMS_SOURCE_TYPE_AD, [
1163 ('{non_empty_string}', claims.CLAIM_TYPE_STRING, ['foo bar']),
1165 ], '{non_empty_string}', True),
1167 (claims.CLAIMS_SOURCE_TYPE_AD, [
1168 ('{zero_uint}', claims.CLAIM_TYPE_UINT64, [0]),
1170 ], '{zero_uint}', False),
1172 (claims.CLAIMS_SOURCE_TYPE_AD, [
1173 ('{nonzero_uint}', claims.CLAIM_TYPE_UINT64, [1]),
1175 ], '{nonzero_uint}', True),
1177 (claims.CLAIMS_SOURCE_TYPE_AD, [
1178 ('{zero_uints}', claims.CLAIM_TYPE_UINT64, [0, 0]),
1180 ], '{zero_uints}', KDC_ERR_GENERIC),
1182 (claims.CLAIMS_SOURCE_TYPE_AD, [
1183 ('{zero_and_one_uint}', claims.CLAIM_TYPE_UINT64, [0, 1]),
1185 ], '{zero_and_one_uint}', True),
1187 (claims.CLAIMS_SOURCE_TYPE_AD, [
1188 ('{one_and_zero_uint}', claims.CLAIM_TYPE_UINT64, [1, 0]),
1190 ], '{one_and_zero_uint}', True),
1192 (claims.CLAIMS_SOURCE_TYPE_AD, [
1193 ('{zero_int}', claims.CLAIM_TYPE_INT64, [0]),
1195 ], '{zero_int}', False),
1197 (claims.CLAIMS_SOURCE_TYPE_AD, [
1198 ('{nonzero_int}', claims.CLAIM_TYPE_INT64, [1]),
1200 ], '{nonzero_int}', True),
1202 (claims.CLAIMS_SOURCE_TYPE_AD, [
1203 ('{zero_ints}', claims.CLAIM_TYPE_INT64, [0, 0]),
1205 ], '{zero_ints}', KDC_ERR_GENERIC),
1207 (claims.CLAIMS_SOURCE_TYPE_AD, [
1208 ('{zero_and_one_int}', claims.CLAIM_TYPE_INT64, [0, 1]),
1210 ], '{zero_and_one_int}', True),
1212 (claims.CLAIMS_SOURCE_TYPE_AD, [
1213 ('{one_and_zero_int}', claims.CLAIM_TYPE_INT64, [1, 0]),
1215 ], '{one_and_zero_int}', True),
1217 (claims.CLAIMS_SOURCE_TYPE_AD, [
1218 ('{false_boolean}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1220 ], '{false_boolean}', False),
1222 (claims.CLAIMS_SOURCE_TYPE_AD, [
1223 ('{true_boolean}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1225 ], '{true_boolean}', True),
1227 (claims.CLAIMS_SOURCE_TYPE_AD, [
1228 ('{false_booleans}', claims.CLAIM_TYPE_BOOLEAN, [0, 0]),
1230 ], '{false_booleans}', KDC_ERR_GENERIC),
1232 (claims.CLAIMS_SOURCE_TYPE_AD, [
1233 ('{false_and_true_boolean}', claims.CLAIM_TYPE_BOOLEAN, [0, 1]),
1235 ], '{false_and_true_boolean}', True),
1237 (claims.CLAIMS_SOURCE_TYPE_AD, [
1238 ('{true_and_false_boolean}', claims.CLAIM_TYPE_BOOLEAN, [1, 0]),
1240 ], '{true_and_false_boolean}', True),
1241 # Test a basic comparison against a literal.
1243 (claims.CLAIMS_SOURCE_TYPE_AD, [
1244 ('{a}', claims.CLAIM_TYPE_STRING, ['foo bar']),
1246 ], '{a} == "foo bar"', True),
1247 # Claims can be compared against one another.
1249 (claims.CLAIMS_SOURCE_TYPE_AD, [
1250 ('{a}', claims.CLAIM_TYPE_STRING, ['foo bar']),
1251 ('{b}', claims.CLAIM_TYPE_STRING, ['FOO BAR']),
1253 ], '{a} == {b}', True),
1255 (claims.CLAIMS_SOURCE_TYPE_AD, [
1256 ('{b}', claims.CLAIM_TYPE_STRING, ['FOO', 'BAR', 'BAZ']),
1257 ('{a}', claims.CLAIM_TYPE_STRING, ['foo', 'bar', 'baz']),
1259 ], '{a} != {b}', False),
1260 # Certificate claims are also valid.
1262 (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
1263 ('{a}', claims.CLAIM_TYPE_STRING, ['foo']),
1265 ], '{a} == "foo"', True),
1266 # Other claim source types are ignored.
1269 ('{a}', claims.CLAIM_TYPE_STRING, ['foo']),
1271 ], '{a} == "foo"', None),
1274 ('{a}', claims.CLAIM_TYPE_STRING, ['foo']),
1276 ], '{a} == "foo"', None),
1277 # If multiple claims have the same ID, the *last* one takes precedence.
1279 (claims.CLAIMS_SOURCE_TYPE_AD, [
1280 ('{a}', claims.CLAIM_TYPE_STRING, ['this is not the value…']),
1281 ('{a}', claims.CLAIM_TYPE_STRING, ['…nor is this…']),
1283 (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
1284 ('{a}', claims.CLAIM_TYPE_STRING, ['…and this isn’t either.']),
1286 (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
1287 ('{a}', claims.CLAIM_TYPE_STRING, ['here’s the actual value!']),
1290 ('{a}', claims.CLAIM_TYPE_STRING, ['this is a red herring.']),
1292 ], '{a} == "here’s the actual value!"', True),
1293 # Claim values can be empty.
1295 (claims.CLAIMS_SOURCE_TYPE_AD, [
1296 ('{empty_claim_string}', claims.CLAIM_TYPE_STRING, []),
1298 ], '{empty_claim_string} != "foo bar"', None),
1300 (claims.CLAIMS_SOURCE_TYPE_AD, [
1301 ('{empty_claim_boolean}', claims.CLAIM_TYPE_BOOLEAN, []),
1303 ], 'Exists {empty_claim_boolean}', None),
1304 # Test unsigned integer equality.
1306 (claims.CLAIMS_SOURCE_TYPE_AD, [
1307 ('{a}', claims.CLAIM_TYPE_UINT64, [42]),
1309 ], '{a} == 42', True),
1311 (claims.CLAIMS_SOURCE_TYPE_AD, [
1312 ('{a}', claims.CLAIM_TYPE_UINT64, [0]),
1314 ], '{a} == 3', False),
1316 (claims.CLAIMS_SOURCE_TYPE_AD, [
1317 ('{a}', claims.CLAIM_TYPE_UINT64, [1, 2, 3]),
1319 ], '{a} == {{1, 2, 3}}', True),
1321 (claims.CLAIMS_SOURCE_TYPE_AD, [
1322 ('{a}', claims.CLAIM_TYPE_UINT64, [4, 5, 6]),
1324 ], '{a} != {{1, 2, 3}}', True),
1325 # Test unsigned integer comparison. Ensure we don’t run into any
1326 # integer overflow issues.
1328 (claims.CLAIMS_SOURCE_TYPE_AD, [
1329 ('{a}', claims.CLAIM_TYPE_UINT64, [1 << 32]),
1331 ], '{a} > 0', True),
1332 # Test signed integer comparisons.
1334 (claims.CLAIMS_SOURCE_TYPE_AD, [
1335 ('{a}', claims.CLAIM_TYPE_INT64, [42]),
1337 ], '{a} == 42', True),
1339 (claims.CLAIMS_SOURCE_TYPE_AD, [
1340 ('{a}', claims.CLAIM_TYPE_INT64, [42 << 32]),
1342 ], f'{{a}} == {42 << 32}', True),
1343 # Test boolean claims. Be careful! Windows will *crash* if you send it
1344 # claims that aren’t real booleans (not 0 or 1). I doubt Microsoft will
1345 # consider this a security issue though.
1347 (claims.CLAIMS_SOURCE_TYPE_AD, [
1348 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [2]),
1349 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [3]),
1351 ], '{a} == {b}', (None, CRASHES_WINDOWS)),
1353 (claims.CLAIMS_SOURCE_TYPE_AD, [
1354 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1355 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1357 ], '{a} == {b}', True),
1359 (claims.CLAIMS_SOURCE_TYPE_AD, [
1360 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1362 ], '{a} == 42', None),
1364 (claims.CLAIMS_SOURCE_TYPE_AD, [
1365 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1366 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1368 ], '{a} && {b}', True),
1370 (claims.CLAIMS_SOURCE_TYPE_AD, [
1371 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1372 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1374 ], '{a} && {b}', False),
1376 (claims.CLAIMS_SOURCE_TYPE_AD, [
1377 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1378 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1380 ], '{a} && {b}', False),
1382 (claims.CLAIMS_SOURCE_TYPE_AD, [
1383 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1384 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1386 ], '{a} || {b}', True),
1388 (claims.CLAIMS_SOURCE_TYPE_AD, [
1389 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1390 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1392 ], '{a} || {b}', True),
1394 (claims.CLAIMS_SOURCE_TYPE_AD, [
1395 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1396 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1398 ], '{a} || {b}', False),
1400 (claims.CLAIMS_SOURCE_TYPE_AD, [
1401 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1405 (claims.CLAIMS_SOURCE_TYPE_AD, [
1406 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1408 ], '!(!(!(!({a}))))', False),
1410 (claims.CLAIMS_SOURCE_TYPE_AD, [
1411 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1413 ], '!({a} && {a})', True),
1415 (claims.CLAIMS_SOURCE_TYPE_AD, [
1416 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1417 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1419 ], '{a} && !({b} || {b})', True),
1421 (claims.CLAIMS_SOURCE_TYPE_AD, [
1422 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1424 ], '!({a}) || !({a})', True),
1426 (claims.CLAIMS_SOURCE_TYPE_AD, [
1427 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1428 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1430 ], '{a} && !({b})', None),
1431 # Expressions containing the ‘not’ operator are occasionally evaluated
1432 # inconsistently, as evidenced here. ‘a || !a’ evaluates to ‘unknown’…
1434 (claims.CLAIMS_SOURCE_TYPE_AD, [
1435 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1437 ], '{a} || !({a})', None),
1438 # …but ‘!a || a’ — the same expression, just with the operands switched
1439 # round — evaluates to ‘true’.
1441 (claims.CLAIMS_SOURCE_TYPE_AD, [
1442 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1444 ], '!({a}) || {a}', True),
1445 # This inconsistency is not observed with other boolean expressions,
1448 (claims.CLAIMS_SOURCE_TYPE_AD, [
1449 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1451 ], '{a} || ({a} || {a})', True),
1453 (claims.CLAIMS_SOURCE_TYPE_AD, [
1454 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1456 ], '({b} || {b}) || {b}', True),
1457 # Test a very large claim. Much larger than this, and
1458 # conditional_ace_encode_binary() will refuse to encode the conditions.
1460 (claims.CLAIMS_SOURCE_TYPE_AD, [
1461 ('{large_claim}', claims.CLAIM_TYPE_STRING, ['z' * 4900]),
1463 ], f'{{large_claim}} == "{"z" * 4900}"', True),
1464 # Test an even larger claim. Windows does not appear to like receiving
1465 # a claim this large.
1467 (claims.CLAIMS_SOURCE_TYPE_AD, [
1468 ('{larger_claim}', claims.CLAIM_TYPE_STRING, ['z' * 100000]),
1470 ], '{larger_claim} > "z"', (True, CRASHES_WINDOWS)),
1471 # Test a great number of claims. Windows does not appear to like
1472 # receiving this many claims.
1474 (claims.CLAIMS_SOURCE_TYPE_AD, [
1475 ('{many_claims}', claims.CLAIM_TYPE_UINT64,
1476 list(range(0, 100000))),
1478 ], '{many_claims} Any_of "99999"', (True, CRASHES_WINDOWS)),
1479 # Test a claim with a very long name. Much larger than this, and
1480 # conditional_ace_encode_binary() will refuse to encode the conditions.
1482 (claims.CLAIMS_SOURCE_TYPE_AD, [
1483 ('{long_name}', claims.CLAIM_TYPE_STRING, ['a']),
1485 ], '{long_name} == "a"', {'long_name': 'z' * 4900}, True),
1486 # Test attribute name escaping.
1488 (claims.CLAIMS_SOURCE_TYPE_AD, [
1489 ('{escaped_claim}', claims.CLAIM_TYPE_STRING, ['claim value']),
1491 ], '{escaped_claim} == "claim value"',
1492 {'escaped_claim': '(:foo:! /&/ :bar:!)'}, True),
1493 # Test a claim whose name consists entirely of dots.
1495 (claims.CLAIMS_SOURCE_TYPE_AD, [
1496 ('{dotty_claim}', claims.CLAIM_TYPE_STRING, ['a']),
1498 ], '{dotty_claim} == "a"', {'dotty_claim': '...'}, True),
1499 # Test a claim whose name consists of the first thousand non‐zero
1500 # Unicode codepoints.
1502 (claims.CLAIMS_SOURCE_TYPE_AD, [
1503 ('{1000_unicode}', claims.CLAIM_TYPE_STRING, ['a']),
1505 ], '{1000_unicode} == "a"',
1506 {'1000_unicode': ''.join(map(chr, range(1, 1001)))}, True),
1507 # Test a claim whose name consists of some higher Unicode codepoints,
1508 # including non‐BMP ones.
1510 (claims.CLAIMS_SOURCE_TYPE_AD, [
1511 ('{higher_unicode}', claims.CLAIM_TYPE_STRING, ['a']),
1513 ], '{higher_unicode} == "a"',
1514 {'higher_unicode': ''.join(map(chr, range(0xfe00, 0x10800)))}, True),
1515 # Duplicate claim values are not allowed…
1517 (claims.CLAIMS_SOURCE_TYPE_AD, [
1518 ('{a}', claims.CLAIM_TYPE_INT64, [42, 42, 42]),
1520 ], '{a} == {a}', KDC_ERR_GENERIC),
1522 (claims.CLAIMS_SOURCE_TYPE_AD, [
1523 ('{a}', claims.CLAIM_TYPE_UINT64, [42, 42]),
1525 ], '{a} == {a}', KDC_ERR_GENERIC),
1527 (claims.CLAIMS_SOURCE_TYPE_AD, [
1528 ('{a}', claims.CLAIM_TYPE_STRING, ['foo', 'foo']),
1530 ], '{a} == {a}', KDC_ERR_GENERIC),
1532 (claims.CLAIMS_SOURCE_TYPE_AD, [
1533 ('{a}', claims.CLAIM_TYPE_STRING, ['FOO', 'foo']),
1535 ], '{a} == {a}', KDC_ERR_GENERIC),
1537 (claims.CLAIMS_SOURCE_TYPE_AD, [
1538 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0, 0]),
1540 ], '{a} == {a}', KDC_ERR_GENERIC),
1541 # …but it’s OK if duplicate values are spread across multiple claim
1544 (claims.CLAIMS_SOURCE_TYPE_AD, [
1545 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo']),
1546 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo']),
1548 (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
1549 ('{dup}', claims.CLAIM_TYPE_UINT64, [42]),
1550 ('{dup}', claims.CLAIM_TYPE_UINT64, [42]),
1552 (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
1553 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo']),
1554 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo']),
1555 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo', 'bar']),
1556 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo', 'bar']),
1558 ], '{dup} == {dup}', True),
1559 # Test invalid claim types. Be careful! Windows will *crash* if you
1560 # send it invalid claim types. I doubt Microsoft will consider this a
1561 # security issue though.
1563 (claims.CLAIMS_SOURCE_TYPE_AD, [
1564 ('{invalid_sid}', 5, []),
1566 ], '{invalid_sid} == {invalid_sid}', (None, CRASHES_WINDOWS)),
1568 (claims.CLAIMS_SOURCE_TYPE_AD, [
1569 ('{invalid_octet_string}', 16, []),
1571 ], '{invalid_octet_string} == {invalid_octet_string}', (None, CRASHES_WINDOWS)),
1572 # Sending an empty string will crash Windows.
1574 (claims.CLAIMS_SOURCE_TYPE_AD, [
1575 ('{empty_string}', claims.CLAIM_TYPE_STRING, ['']),
1577 ], '{empty_string}', (None, CRASHES_WINDOWS)),
1578 # But sending empty arrays is OK.
1580 (claims.CLAIMS_SOURCE_TYPE_AD, [
1581 ('{empty_array}', claims.CLAIM_TYPE_INT64, []),
1582 ('{empty_array}', claims.CLAIM_TYPE_UINT64, []),
1583 ('{empty_array}', claims.CLAIM_TYPE_BOOLEAN, []),
1584 ('{empty_array}', claims.CLAIM_TYPE_STRING, []),
1586 ], '{empty_array}', None),
1589 def _test_pac_claim_cmp_with_args(self,
1594 self.assertIsInstance(expression, str)
1597 outcome, crashes_windows = outcome
1598 self.assertIs(crashes_windows, CRASHES_WINDOWS)
1599 if not self.crash_windows:
1600 self.skipTest('test crashes Windows servers')
1602 self.assertIsNot(outcome, CRASHES_WINDOWS)
1604 if claim_map is None:
1609 def get_claim_id(claim_name):
1610 claim = claim_ids.get(claim_name)
1612 claim = claim_map.pop(claim_name, None)
1614 claim = self.get_new_username()
1616 claim_ids[claim_name] = claim
1620 def formatted_claim_expression(expr):
1621 formatter = Formatter()
1624 for literal_text, field_name, format_spec, conversion in (
1625 formatter.parse(expr)):
1626 self.assertFalse(format_spec,
1627 f'format specifier ({format_spec}) should '
1628 f'not be specified')
1629 self.assertFalse(conversion,
1630 f'conversion ({conversion}) should not be '
1633 result.append(literal_text)
1635 if field_name is not None:
1636 self.assertTrue(field_name,
1637 'a field name should be specified')
1639 claim_id = get_claim_id(field_name)
1640 claim_id = self.escaped_claim_id(claim_id)
1641 result.append(f'@User.{claim_id}')
1643 return ''.join(result)
1645 # Construct the conditional ACE expression.
1646 expression = formatted_claim_expression(expression)
1648 self.assertFalse(claim_map, 'unused claim mapping(s) remain')
1650 # Create an authentication policy that will allow authentication when
1651 # the expression is true, and a second that will deny authentication in
1652 # the same circumstance. By observing the results of authenticating
1653 # against each of these policies in turn, we can determine whether the
1654 # expression evaluates to a True, False, or Unknown value.
1656 allowed_sddl = f'O:SYD:(XA;;CR;;;WD;({expression}))'
1657 denied_sddl = f'O:SYD:(XD;;CR;;;WD;({expression}))(A;;CR;;;WD)'
1659 allowed_policy = self.create_authn_policy(
1661 user_allowed_from=allowed_sddl)
1662 denied_policy = self.create_authn_policy(
1664 user_allowed_from=denied_sddl)
1666 # Create a user account assigned to each policy.
1667 allowed_creds = self._get_creds(account_type=self.AccountType.USER,
1668 assigned_policy=allowed_policy)
1669 denied_creds = self._get_creds(account_type=self.AccountType.USER,
1670 assigned_policy=denied_policy)
1672 # Create a computer account.
1673 mach_creds = self.get_cached_creds(
1674 account_type=self.AccountType.COMPUTER)
1676 def expected_values(val):
1677 if isinstance(val, (str, bytes)):
1682 # Fetch the computer account’s TGT.
1683 armor_tgt = self.get_tgt(mach_creds)
1686 # Replace the claims in the PAC with our own.
1687 armor_tgt = self.modified_ticket(
1689 modify_pac_fn=partial(self.set_pac_claims,
1690 client_claims=pac_claims,
1691 claim_ids=claim_ids),
1692 checksum_keys=self.get_krbtgt_checksum_key())
1694 # The first or the second authentication request is expected to succeed
1695 # if the outcome is True or False, respectively. An Unknown outcome,
1696 # represented by None, will result in a policy error in either case.
1698 allowed_error, denied_error = 0, KDC_ERR_POLICY
1699 elif outcome is False:
1700 allowed_error, denied_error = KDC_ERR_POLICY, 0
1701 elif outcome is None:
1702 allowed_error, denied_error = KDC_ERR_POLICY, KDC_ERR_POLICY
1704 allowed_error, denied_error = outcome, outcome
1706 # Attempt to authenticate and ensure that we observe the expected
1708 self._get_tgt(allowed_creds, armor_tgt=armor_tgt,
1709 expected_error=allowed_error)
1710 self._get_tgt(denied_creds, armor_tgt=armor_tgt,
1711 expected_error=denied_error)
1713 def test_rbcd_without_aa_asserted_identity(self):
1715 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1716 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1719 self._rbcd(f'Member_of SID({self.aa_asserted_identity})',
1720 service_sids=service_sids,
1721 code=KDC_ERR_BADOPTION,
1722 status=ntstatus.NT_STATUS_UNSUCCESSFUL,
1723 edata=self.expect_padata_outer)
1725 self._rbcd(target_policy=f'Member_of SID({self.aa_asserted_identity})',
1726 service_sids=service_sids,
1727 code=KDC_ERR_POLICY,
1728 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
1729 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
1730 reason=AuditReason.ACCESS_DENIED,
1731 edata=self.expect_padata_outer)
1733 def test_rbcd_with_aa_asserted_identity(self):
1735 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1736 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1737 (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1740 expected_groups = service_sids | {
1741 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
1744 self._rbcd(f'Member_of SID({self.aa_asserted_identity})',
1745 service_sids=service_sids,
1746 expected_groups=expected_groups)
1748 self._rbcd(target_policy=f'Member_of SID({self.aa_asserted_identity})',
1749 service_sids=service_sids,
1750 expected_groups=expected_groups)
1752 def test_rbcd_without_service_asserted_identity(self):
1754 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1755 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1758 self._rbcd(f'Member_of SID({self.service_asserted_identity})',
1759 service_sids=service_sids,
1760 code=KDC_ERR_BADOPTION,
1761 status=ntstatus.NT_STATUS_UNSUCCESSFUL,
1762 edata=self.expect_padata_outer)
1764 self._rbcd(target_policy=f'Member_of SID({self.service_asserted_identity})',
1765 service_sids=service_sids,
1766 code=KDC_ERR_POLICY,
1767 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
1768 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
1769 reason=AuditReason.ACCESS_DENIED,
1770 edata=self.expect_padata_outer)
1772 def test_rbcd_with_service_asserted_identity(self):
1774 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1775 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1776 (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1780 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1781 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1782 # The Application Authority Asserted Identity SID has replaced the
1783 # Service Asserted Identity SID.
1784 (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1785 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
1788 self._rbcd(f'Member_of SID({self.service_asserted_identity})',
1789 service_sids=service_sids,
1790 expected_groups=expected_groups)
1792 self._rbcd(target_policy=f'Member_of SID({self.service_asserted_identity})',
1793 service_sids=service_sids,
1794 expected_groups=expected_groups)
1796 def test_rbcd_without_claims_valid(self):
1798 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1799 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1802 self._rbcd(f'Member_of SID({security.SID_CLAIMS_VALID})',
1803 service_sids=service_sids,
1804 code=KDC_ERR_BADOPTION,
1805 status=ntstatus.NT_STATUS_UNSUCCESSFUL,
1806 edata=self.expect_padata_outer)
1808 self._rbcd(target_policy=f'Member_of SID({security.SID_CLAIMS_VALID})',
1809 service_sids=service_sids,
1810 code=KDC_ERR_POLICY,
1811 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
1812 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
1813 reason=AuditReason.ACCESS_DENIED,
1814 edata=self.expect_padata_outer)
1816 def test_rbcd_with_claims_valid(self):
1818 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1819 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1820 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
1823 expected_groups = service_sids | {
1824 (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1827 self._rbcd(f'Member_of SID({security.SID_CLAIMS_VALID})',
1828 service_sids=service_sids,
1829 expected_groups=expected_groups)
1831 self._rbcd(target_policy=f'Member_of SID({security.SID_CLAIMS_VALID})',
1832 service_sids=service_sids,
1833 expected_groups=expected_groups)
1835 def test_rbcd_without_compounded_authentication(self):
1837 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1838 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1841 self._rbcd(f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1842 service_sids=service_sids,
1843 code=KDC_ERR_BADOPTION,
1844 status=ntstatus.NT_STATUS_UNSUCCESSFUL,
1845 edata=self.expect_padata_outer)
1847 self._rbcd(target_policy=f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1848 service_sids=service_sids,
1849 code=KDC_ERR_POLICY,
1850 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
1851 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
1852 reason=AuditReason.ACCESS_DENIED,
1853 edata=self.expect_padata_outer)
1855 def test_rbcd_with_compounded_authentication(self):
1857 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1858 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1859 (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
1863 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1864 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1865 (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1866 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
1869 self._rbcd(f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1870 service_sids=service_sids,
1871 expected_groups=expected_groups)
1873 self._rbcd(target_policy=f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1874 service_sids=service_sids,
1875 expected_groups=expected_groups)
1877 def test_rbcd_client_without_aa_asserted_identity(self):
1879 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1880 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1883 self._rbcd(f'Member_of SID({self.aa_asserted_identity})',
1884 client_sids=client_sids)
1886 self._rbcd(target_policy=f'Member_of SID({self.aa_asserted_identity})',
1887 client_sids=client_sids)
1889 def test_rbcd_client_with_aa_asserted_identity(self):
1891 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1892 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1893 (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1896 self._rbcd(f'Member_of SID({self.aa_asserted_identity})',
1897 client_sids=client_sids,
1898 expected_groups=client_sids)
1900 self._rbcd(target_policy=f'Member_of SID({self.aa_asserted_identity})',
1901 client_sids=client_sids,
1902 expected_groups=client_sids)
1904 def test_rbcd_client_without_service_asserted_identity(self):
1906 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1907 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1910 self._rbcd(f'Member_of SID({self.service_asserted_identity})',
1911 client_sids=client_sids,
1912 code=KDC_ERR_BADOPTION,
1913 status=ntstatus.NT_STATUS_UNSUCCESSFUL,
1914 edata=self.expect_padata_outer)
1916 self._rbcd(target_policy=f'Member_of SID({self.service_asserted_identity})',
1917 client_sids=client_sids,
1918 code=KDC_ERR_POLICY,
1919 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
1920 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
1921 reason=AuditReason.ACCESS_DENIED,
1922 edata=self.expect_padata_outer)
1924 def test_rbcd_client_with_service_asserted_identity(self):
1926 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1927 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1928 (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1931 self._rbcd(f'Not_Member_of SID({self.service_asserted_identity})',
1932 client_sids=client_sids,
1933 expected_groups=client_sids)
1935 self._rbcd(target_policy=f'Not_Member_of SID({self.service_asserted_identity})',
1936 client_sids=client_sids,
1937 expected_groups=client_sids)
1939 def test_rbcd_client_without_claims_valid(self):
1941 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1942 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1945 self._rbcd(f'Member_of SID({security.SID_CLAIMS_VALID})',
1946 client_sids=client_sids)
1948 self._rbcd(target_policy=f'Member_of SID({security.SID_CLAIMS_VALID})',
1949 client_sids=client_sids)
1951 def test_rbcd_client_with_claims_valid(self):
1953 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1954 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1955 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
1958 self._rbcd(f'Member_of SID({security.SID_CLAIMS_VALID})',
1959 client_sids=client_sids,
1960 expected_groups=client_sids)
1962 self._rbcd(target_policy=f'Member_of SID({security.SID_CLAIMS_VALID})',
1963 client_sids=client_sids,
1964 expected_groups=client_sids)
1966 def test_rbcd_client_without_compounded_authentication(self):
1968 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1969 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1972 self._rbcd(f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1973 client_sids=client_sids,
1974 code=KDC_ERR_BADOPTION,
1975 status=ntstatus.NT_STATUS_UNSUCCESSFUL,
1976 edata=self.expect_padata_outer)
1978 self._rbcd(target_policy=f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1979 client_sids=client_sids,
1980 code=KDC_ERR_POLICY,
1981 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
1982 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
1983 reason=AuditReason.ACCESS_DENIED,
1984 edata=self.expect_padata_outer)
1986 def test_rbcd_client_with_compounded_authentication(self):
1988 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1989 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1990 (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
1993 self._rbcd(f'Not_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1994 client_sids=client_sids,
1995 expected_groups=client_sids)
1997 self._rbcd(target_policy=f'Not_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1998 client_sids=client_sids,
1999 expected_groups=client_sids)
2001 def test_rbcd_device_without_aa_asserted_identity(self):
2003 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2004 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2007 self._rbcd(f'Device_Member_of SID({self.aa_asserted_identity})',
2008 device_sids=device_sids,
2009 code=KDC_ERR_BADOPTION,
2010 status=ntstatus.NT_STATUS_UNSUCCESSFUL,
2011 edata=self.expect_padata_outer)
2013 self._rbcd(target_policy=f'Device_Member_of SID({self.aa_asserted_identity})',
2014 device_sids=device_sids,
2015 code=KDC_ERR_POLICY,
2016 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2017 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2018 reason=AuditReason.ACCESS_DENIED,
2019 edata=self.expect_padata_outer)
2021 def test_rbcd_device_without_aa_asserted_identity_not_memberof(self):
2023 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2024 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2027 self._rbcd(f'Not_Device_Member_of SID({self.aa_asserted_identity})',
2028 device_sids=device_sids)
2030 self._rbcd(target_policy=f'Not_Device_Member_of SID({self.aa_asserted_identity})',
2031 device_sids=device_sids)
2033 def test_rbcd_device_with_aa_asserted_identity(self):
2035 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2036 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2037 (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2040 self._rbcd(f'Device_Member_of SID({self.aa_asserted_identity})',
2041 device_sids=device_sids)
2043 self._rbcd(target_policy=f'Device_Member_of SID({self.aa_asserted_identity})',
2044 device_sids=device_sids)
2046 def test_rbcd_device_without_service_asserted_identity(self):
2048 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2049 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2052 self._rbcd(f'Device_Member_of SID({self.service_asserted_identity})',
2053 device_sids=device_sids,
2054 code=KDC_ERR_BADOPTION,
2055 status=ntstatus.NT_STATUS_UNSUCCESSFUL,
2056 edata=self.expect_padata_outer)
2058 self._rbcd(target_policy=f'Device_Member_of SID({self.service_asserted_identity})',
2059 device_sids=device_sids,
2060 code=KDC_ERR_POLICY,
2061 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2062 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2063 reason=AuditReason.ACCESS_DENIED,
2064 edata=self.expect_padata_outer)
2066 def test_rbcd_device_with_service_asserted_identity(self):
2068 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2069 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2070 (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2073 self._rbcd(f'Device_Member_of SID({self.service_asserted_identity})',
2074 device_sids=device_sids)
2076 self._rbcd(target_policy=f'Device_Member_of SID({self.service_asserted_identity})',
2077 device_sids=device_sids)
2079 def test_rbcd_device_without_claims_valid(self):
2081 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2082 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2085 self._rbcd(f'Device_Member_of SID({security.SID_CLAIMS_VALID})',
2086 device_sids=device_sids,
2087 code=KDC_ERR_BADOPTION,
2088 status=ntstatus.NT_STATUS_UNSUCCESSFUL,
2089 edata=self.expect_padata_outer)
2091 self._rbcd(target_policy=f'Device_Member_of SID({security.SID_CLAIMS_VALID})',
2092 device_sids=device_sids,
2093 code=KDC_ERR_POLICY,
2094 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2095 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2096 reason=AuditReason.ACCESS_DENIED,
2097 edata=self.expect_padata_outer)
2099 def test_rbcd_device_with_claims_valid(self):
2101 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2102 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2103 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2106 self._rbcd(f'Device_Member_of SID({security.SID_CLAIMS_VALID})',
2107 device_sids=device_sids)
2109 self._rbcd(target_policy=f'Device_Member_of SID({security.SID_CLAIMS_VALID})',
2110 device_sids=device_sids)
2112 def test_rbcd_device_without_compounded_authentication(self):
2114 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2115 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2118 self._rbcd(f'Device_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
2119 device_sids=device_sids,
2120 code=KDC_ERR_BADOPTION,
2121 status=ntstatus.NT_STATUS_UNSUCCESSFUL,
2122 edata=self.expect_padata_outer)
2124 self._rbcd(target_policy=f'Device_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
2125 device_sids=device_sids,
2126 code=KDC_ERR_POLICY,
2127 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2128 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2129 reason=AuditReason.ACCESS_DENIED,
2130 edata=self.expect_padata_outer)
2132 def test_rbcd_device_with_compounded_authentication(self):
2134 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2135 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2136 (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
2139 self._rbcd(f'Device_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
2140 device_sids=device_sids)
2142 self._rbcd(target_policy=f'Device_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
2143 device_sids=device_sids)
2145 def test_rbcd(self):
2146 self._rbcd('Member_of SID({service_sid})')
2148 def test_rbcd_device_from_rodc(self):
2149 self._rbcd('Member_of SID({service_sid})',
2150 device_from_rodc=True,
2151 code=(0, CRASHES_WINDOWS))
2153 def test_rbcd_service_from_rodc(self):
2154 self._rbcd('Member_of SID({service_sid})',
2155 service_from_rodc=True)
2157 def test_rbcd_device_and_service_from_rodc(self):
2158 self._rbcd('Member_of SID({service_sid})',
2159 service_from_rodc=True,
2160 device_from_rodc=True,
2161 code=(0, CRASHES_WINDOWS))
2163 def test_rbcd_client_from_rodc(self):
2164 self._rbcd('Member_of SID({service_sid})',
2165 client_from_rodc=True)
2167 def test_rbcd_client_and_device_from_rodc(self):
2168 self._rbcd('Member_of SID({service_sid})',
2169 client_from_rodc=True,
2170 device_from_rodc=True,
2171 code=(0, CRASHES_WINDOWS))
2173 def test_rbcd_client_and_service_from_rodc(self):
2174 self._rbcd('Member_of SID({service_sid})',
2175 client_from_rodc=True,
2176 service_from_rodc=True)
2178 def test_rbcd_all_from_rodc(self):
2179 self._rbcd('Member_of SID({service_sid})',
2180 client_from_rodc=True,
2181 service_from_rodc=True,
2182 device_from_rodc=True,
2183 code=(0, CRASHES_WINDOWS))
2186 rbcd_expression=None,
2190 event=AuditEvent.OK,
2191 reason=AuditReason.NONE,
2194 client_from_rodc=False,
2195 service_from_rodc=False,
2196 device_from_rodc=False,
2200 service_claims=None,
2203 expected_groups=None,
2204 expected_device_groups=None,
2205 expected_claims=None):
2207 code, crashes_windows = code
2208 self.assertIs(crashes_windows, CRASHES_WINDOWS)
2209 if not self.crash_windows:
2210 self.skipTest('test crashes Windows servers')
2212 self.assertIsNot(code, CRASHES_WINDOWS)
2214 samdb = self.get_samdb()
2215 functional_level = self.get_domain_functional_level(samdb)
2217 if functional_level < dsdb.DS_DOMAIN_FUNCTION_2008:
2218 self.skipTest('RBCD requires FL2008')
2220 domain_sid_str = samdb.get_domain_sid()
2221 domain_sid = security.dom_sid(domain_sid_str)
2223 client_creds = self.get_cached_creds(
2224 account_type=self.AccountType.USER,
2226 'allowed_replication_mock': client_from_rodc,
2227 'revealed_to_mock_rodc': client_from_rodc,
2229 client_sid = client_creds.get_sid()
2231 client_username = client_creds.get_username()
2232 client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
2233 names=[client_username])
2235 client_tkt_options = 'forwardable'
2236 expected_flags = krb5_asn1.TicketFlags(client_tkt_options)
2238 checksum_key = self.get_krbtgt_checksum_key()
2240 if client_from_rodc or service_from_rodc or device_from_rodc:
2241 rodc_krbtgt_creds = self.get_mock_rodc_krbtgt_creds()
2242 rodc_krbtgt_key = self.TicketDecryptionKey_from_creds(rodc_krbtgt_creds)
2243 rodc_checksum_key = {
2244 krb5pac.PAC_TYPE_KDC_CHECKSUM: rodc_krbtgt_key,
2247 client_tgt = self.get_tgt(client_creds,
2248 kdc_options=client_tkt_options,
2249 expected_flags=expected_flags)
2251 # Create a machine account with which to perform FAST.
2252 mach_creds = self.get_cached_creds(
2253 account_type=self.AccountType.COMPUTER,
2255 'allowed_replication_mock': device_from_rodc,
2256 'revealed_to_mock_rodc': device_from_rodc,
2258 mach_tgt = self.get_tgt(mach_creds)
2259 device_modify_pac_fn = []
2260 if device_sids is not None:
2261 device_modify_pac_fn.append(partial(self.set_pac_sids,
2262 new_sids=device_sids))
2263 if device_claims is not None:
2264 device_modify_pac_fn.append(partial(self.set_pac_claims,
2265 client_claims=device_claims))
2266 mach_tgt = self.modified_ticket(
2268 modify_pac_fn=device_modify_pac_fn,
2269 new_ticket_key=rodc_krbtgt_key if device_from_rodc else None,
2270 checksum_keys=rodc_checksum_key if device_from_rodc else checksum_key)
2272 service_creds = self.get_cached_creds(
2273 account_type=self.AccountType.COMPUTER,
2276 'allowed_replication_mock': service_from_rodc,
2277 'revealed_to_mock_rodc': service_from_rodc,
2279 service_tgt = self.get_tgt(service_creds)
2281 service_modify_pac_fn = []
2282 if service_sids is not None:
2283 service_modify_pac_fn.append(partial(self.set_pac_sids,
2284 new_sids=service_sids))
2285 if service_claims is not None:
2286 service_modify_pac_fn.append(partial(self.set_pac_claims,
2287 client_claims=service_claims))
2288 service_tgt = self.modified_ticket(
2290 modify_pac_fn=service_modify_pac_fn,
2291 new_ticket_key=rodc_krbtgt_key if service_from_rodc else None,
2292 checksum_keys=rodc_checksum_key if service_from_rodc else checksum_key)
2294 if target_policy is None:
2296 assigned_policy = None
2298 sddl = f'O:SYD:(XA;;CR;;;WD;({target_policy.format(service_sid=service_creds.get_sid())}))'
2299 policy = self.create_authn_policy(enforced=True,
2300 computer_allowed_to=sddl)
2301 assigned_policy = str(policy.dn)
2303 if rbcd_expression is not None:
2304 sddl = f'O:SYD:(XA;;CR;;;WD;({rbcd_expression.format(service_sid=service_creds.get_sid())}))'
2306 sddl = 'O:SYD:(A;;CR;;;WD)'
2307 descriptor = security.descriptor.from_sddl(sddl, domain_sid)
2308 descriptor = ndr_pack(descriptor)
2310 # Create a target account with the assigned policy.
2311 target_creds = self.get_cached_creds(
2312 account_type=self.AccountType.COMPUTER,
2314 'assigned_policy': assigned_policy,
2315 'additional_details': (
2316 ('msDS-AllowedToActOnBehalfOfOtherIdentity', descriptor),
2320 client_service_tkt = self.get_service_ticket(
2323 kdc_options=client_tkt_options,
2324 expected_flags=expected_flags)
2325 client_modify_pac_fn = []
2326 if client_sids is not None:
2327 client_modify_pac_fn.append(partial(self.set_pac_sids,
2328 new_sids=client_sids))
2329 if client_claims is not None:
2330 client_modify_pac_fn.append(partial(self.set_pac_claims,
2331 client_claims=client_claims))
2332 client_service_tkt = self.modified_ticket(client_service_tkt,
2333 modify_pac_fn=client_modify_pac_fn,
2334 checksum_keys=rodc_checksum_key if client_from_rodc else checksum_key)
2336 kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt'))
2338 target_decryption_key = self.TicketDecryptionKey_from_creds(
2340 target_etypes = target_creds.tgs_supported_enctypes
2342 service_name = service_creds.get_username()
2343 if service_name[-1] == '$':
2344 service_name = service_name[:-1]
2345 expected_transited_services = [
2346 f'host/{service_name}@{service_creds.get_realm()}'
2349 expected_groups = self.map_sids(expected_groups, None, domain_sid_str)
2350 expected_device_groups = self.map_sids(expected_device_groups, None, domain_sid_str)
2352 # Show that obtaining a service ticket with RBCD is allowed.
2353 self._tgs_req(service_tgt, code, service_creds, target_creds,
2355 kdc_options=kdc_options,
2356 pac_options='1001', # supports claims, RBCD
2357 expected_cname=client_cname,
2358 expected_account_name=client_username,
2359 additional_ticket=client_service_tkt,
2360 decryption_key=target_decryption_key,
2361 expected_sid=client_sid,
2362 expected_groups=expected_groups,
2363 expect_device_info=bool(expected_device_groups) or None,
2364 expected_device_domain_sid=domain_sid_str,
2365 expected_device_groups=expected_device_groups,
2366 expect_client_claims=bool(expected_claims) or None,
2367 expected_client_claims=expected_claims,
2368 expected_supported_etypes=target_etypes,
2369 expected_proxy_target=target_creds.get_spn(),
2370 expected_transited_services=expected_transited_services,
2371 expected_status=status,
2375 effective_client_creds = service_creds
2377 effective_client_creds = client_creds
2379 self.check_tgs_log(effective_client_creds, target_creds,
2381 checked_creds=service_creds,
2386 def test_tgs_without_aa_asserted_identity(self):
2388 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2389 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2392 self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2393 client_sids=client_sids,
2394 code=KDC_ERR_POLICY,
2395 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2396 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2397 reason=AuditReason.ACCESS_DENIED,
2398 edata=self.expect_padata_outer)
2400 def test_tgs_without_aa_asserted_identity_client_from_rodc(self):
2402 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2403 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2406 self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2407 client_from_rodc=True,
2408 client_sids=client_sids,
2409 code=KDC_ERR_POLICY,
2410 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2411 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2412 reason=AuditReason.ACCESS_DENIED,
2413 edata=self.expect_padata_outer)
2415 def test_tgs_without_aa_asserted_identity_device_from_rodc(self):
2417 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2418 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2421 self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2422 device_from_rodc=True,
2423 client_sids=client_sids,
2424 code=(KDC_ERR_POLICY, CRASHES_WINDOWS),
2425 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2426 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2427 reason=AuditReason.ACCESS_DENIED,
2428 edata=self.expect_padata_outer)
2430 def test_tgs_without_aa_asserted_identity_both_from_rodc(self):
2432 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2433 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2436 self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2437 client_from_rodc=True,
2438 device_from_rodc=True,
2439 client_sids=client_sids,
2440 code=(KDC_ERR_POLICY, CRASHES_WINDOWS),
2441 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2442 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2443 reason=AuditReason.ACCESS_DENIED,
2444 edata=self.expect_padata_outer)
2446 def test_tgs_with_aa_asserted_identity(self):
2448 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2449 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2450 (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2453 self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2454 client_sids=client_sids,
2455 expected_groups=client_sids)
2457 def test_tgs_with_aa_asserted_identity_client_from_rodc(self):
2459 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2460 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2461 (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2464 self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2465 client_from_rodc=True,
2466 client_sids=client_sids,
2467 expected_groups=client_sids)
2469 def test_tgs_with_aa_asserted_identity_device_from_rodc(self):
2471 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2472 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2473 (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2476 self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2477 device_from_rodc=True,
2478 client_sids=client_sids,
2479 expected_groups=client_sids,
2480 code=(0, CRASHES_WINDOWS))
2482 def test_tgs_with_aa_asserted_identity_both_from_rodc(self):
2484 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2485 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2486 (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2489 self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2490 client_from_rodc=True,
2491 device_from_rodc=True,
2492 client_sids=client_sids,
2493 expected_groups=client_sids,
2494 code=(0, CRASHES_WINDOWS))
2496 def test_tgs_without_service_asserted_identity(self):
2498 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2499 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2502 self._tgs(f'Member_of SID({self.service_asserted_identity})',
2503 client_sids=client_sids,
2504 code=KDC_ERR_POLICY,
2505 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2506 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2507 reason=AuditReason.ACCESS_DENIED,
2508 edata=self.expect_padata_outer)
2510 def test_tgs_without_service_asserted_identity_client_from_rodc(self):
2512 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2513 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2516 self._tgs(f'Member_of SID({self.service_asserted_identity})',
2517 client_from_rodc=True,
2518 client_sids=client_sids,
2519 code=KDC_ERR_POLICY,
2520 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2521 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2522 reason=AuditReason.ACCESS_DENIED,
2523 edata=self.expect_padata_outer)
2525 def test_tgs_without_service_asserted_identity_device_from_rodc(self):
2527 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2528 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2531 self._tgs(f'Member_of SID({self.service_asserted_identity})',
2532 device_from_rodc=True,
2533 client_sids=client_sids,
2534 code=(KDC_ERR_POLICY, CRASHES_WINDOWS),
2535 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2536 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2537 reason=AuditReason.ACCESS_DENIED,
2538 edata=self.expect_padata_outer)
2540 def test_tgs_without_service_asserted_identity_both_from_rodc(self):
2542 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2543 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2546 self._tgs(f'Member_of SID({self.service_asserted_identity})',
2547 client_from_rodc=True,
2548 device_from_rodc=True,
2549 client_sids=client_sids,
2550 code=(KDC_ERR_POLICY, CRASHES_WINDOWS),
2551 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2552 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2553 reason=AuditReason.ACCESS_DENIED,
2554 edata=self.expect_padata_outer)
2556 def test_tgs_with_service_asserted_identity(self):
2558 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2559 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2560 (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2563 self._tgs(f'Member_of SID({self.service_asserted_identity})',
2564 client_sids=client_sids,
2565 expected_groups=client_sids)
2567 def test_tgs_with_service_asserted_identity_client_from_rodc(self):
2569 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2570 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2571 (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2574 self._tgs(f'Member_of SID({self.service_asserted_identity})',
2575 client_from_rodc=True,
2576 client_sids=client_sids,
2577 expected_groups=client_sids)
2579 def test_tgs_with_service_asserted_identity_device_from_rodc(self):
2581 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2582 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2583 (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2586 self._tgs(f'Member_of SID({self.service_asserted_identity})',
2587 device_from_rodc=True,
2588 client_sids=client_sids,
2589 expected_groups=client_sids,
2590 code=(0, CRASHES_WINDOWS))
2592 def test_tgs_with_service_asserted_identity_both_from_rodc(self):
2594 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2595 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2596 (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2599 self._tgs(f'Member_of SID({self.service_asserted_identity})',
2600 client_from_rodc=True,
2601 device_from_rodc=True,
2602 client_sids=client_sids,
2603 expected_groups=client_sids,
2604 code=(0, CRASHES_WINDOWS))
2606 def test_tgs_without_claims_valid(self):
2608 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2609 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2612 self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
2613 client_sids=client_sids,
2614 code=KDC_ERR_POLICY,
2615 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2616 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2617 reason=AuditReason.ACCESS_DENIED,
2618 edata=self.expect_padata_outer)
2620 def test_tgs_without_claims_valid_client_from_rodc(self):
2622 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2623 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2626 self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
2627 client_from_rodc=True,
2628 client_sids=client_sids,
2629 code=KDC_ERR_POLICY,
2630 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2631 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2632 reason=AuditReason.ACCESS_DENIED,
2633 edata=self.expect_padata_outer)
2635 def test_tgs_without_claims_valid_device_from_rodc(self):
2637 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2638 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2641 self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
2642 device_from_rodc=True,
2643 client_sids=client_sids,
2644 code=(KDC_ERR_POLICY, CRASHES_WINDOWS),
2645 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2646 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2647 reason=AuditReason.ACCESS_DENIED,
2648 edata=self.expect_padata_outer)
2650 def test_tgs_without_claims_valid_both_from_rodc(self):
2652 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2653 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2656 self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
2657 client_from_rodc=True,
2658 device_from_rodc=True,
2659 client_sids=client_sids,
2660 code=(KDC_ERR_POLICY, CRASHES_WINDOWS),
2661 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2662 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2663 reason=AuditReason.ACCESS_DENIED,
2664 edata=self.expect_padata_outer)
2666 def test_tgs_with_claims_valid(self):
2668 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2669 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2670 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2673 self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
2674 client_sids=client_sids,
2675 expected_groups=client_sids)
2677 def test_tgs_with_claims_valid_client_from_rodc(self):
2679 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2680 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2681 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2684 self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
2685 client_from_rodc=True,
2686 client_sids=client_sids,
2687 expected_groups=client_sids)
2689 def test_tgs_with_claims_valid_device_from_rodc(self):
2691 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2692 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2693 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2696 self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
2697 device_from_rodc=True,
2698 client_sids=client_sids,
2699 expected_groups=client_sids,
2700 code=(0, CRASHES_WINDOWS))
2702 def test_tgs_with_claims_valid_both_from_rodc(self):
2704 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2705 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2706 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2709 self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
2710 client_from_rodc=True,
2711 device_from_rodc=True,
2712 client_sids=client_sids,
2713 expected_groups=client_sids,
2714 code=(0, CRASHES_WINDOWS))
2720 event=AuditEvent.OK,
2721 reason=AuditReason.NONE,
2725 client_from_rodc=None,
2726 device_from_rodc=None,
2731 expected_groups=None,
2732 expected_device_groups=None,
2733 expected_claims=None):
2735 code, crashes_windows = code
2736 self.assertIs(crashes_windows, CRASHES_WINDOWS)
2737 if not self.crash_windows:
2738 self.skipTest('test crashes Windows servers')
2740 self.assertIsNot(code, CRASHES_WINDOWS)
2743 self.assertIsNone(device_from_rodc)
2744 self.assertIsNone(device_sids)
2745 self.assertIsNone(device_claims)
2746 self.assertIsNone(expected_device_groups)
2748 if client_from_rodc is None:
2749 client_from_rodc = False
2751 if device_from_rodc is None:
2752 device_from_rodc = False
2754 client_creds = self.get_cached_creds(
2755 account_type=self.AccountType.USER,
2757 'allowed_replication_mock': client_from_rodc,
2758 'revealed_to_mock_rodc': client_from_rodc,
2760 client_sid = client_creds.get_sid()
2762 client_username = client_creds.get_username()
2763 client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
2764 names=[client_username])
2766 client_tkt_options = 'forwardable'
2767 expected_flags = krb5_asn1.TicketFlags(client_tkt_options)
2769 checksum_key = self.get_krbtgt_checksum_key()
2771 if client_from_rodc or device_from_rodc:
2772 rodc_krbtgt_creds = self.get_mock_rodc_krbtgt_creds()
2773 rodc_krbtgt_key = self.TicketDecryptionKey_from_creds(rodc_krbtgt_creds)
2774 rodc_checksum_key = {
2775 krb5pac.PAC_TYPE_KDC_CHECKSUM: rodc_krbtgt_key,
2778 client_tgt = self.get_tgt(client_creds,
2779 kdc_options=client_tkt_options,
2780 expected_flags=expected_flags)
2782 client_modify_pac_fn = []
2783 if client_sids is not None:
2784 client_modify_pac_fn.append(partial(self.set_pac_sids,
2785 new_sids=client_sids))
2786 if client_claims is not None:
2787 client_modify_pac_fn.append(partial(self.set_pac_claims,
2788 client_claims=client_claims))
2789 client_tgt = self.modified_ticket(
2791 modify_pac_fn=client_modify_pac_fn,
2792 new_ticket_key=rodc_krbtgt_key if client_from_rodc else None,
2793 checksum_keys=rodc_checksum_key if client_from_rodc else checksum_key)
2796 # Create a machine account with which to perform FAST.
2797 mach_creds = self.get_cached_creds(
2798 account_type=self.AccountType.COMPUTER,
2800 'allowed_replication_mock': device_from_rodc,
2801 'revealed_to_mock_rodc': device_from_rodc,
2803 mach_tgt = self.get_tgt(mach_creds)
2804 device_modify_pac_fn = []
2805 if device_sids is not None:
2806 device_modify_pac_fn.append(partial(self.set_pac_sids,
2807 new_sids=device_sids))
2808 if device_claims is not None:
2809 device_modify_pac_fn.append(partial(self.set_pac_claims,
2810 client_claims=device_claims))
2811 mach_tgt = self.modified_ticket(
2813 modify_pac_fn=device_modify_pac_fn,
2814 new_ticket_key=rodc_krbtgt_key if device_from_rodc else None,
2815 checksum_keys=rodc_checksum_key if device_from_rodc else checksum_key)
2819 if target_policy is None:
2821 assigned_policy = None
2823 sddl = f'O:SYD:(XA;;CR;;;WD;({target_policy.format(client_sid=client_creds.get_sid())}))'
2824 policy = self.create_authn_policy(enforced=True,
2825 computer_allowed_to=sddl)
2826 assigned_policy = str(policy.dn)
2828 # Create a target account with the assigned policy.
2829 target_creds = self.get_cached_creds(
2830 account_type=self.AccountType.COMPUTER,
2831 opts={'assigned_policy': assigned_policy})
2833 target_decryption_key = self.TicketDecryptionKey_from_creds(
2835 target_etypes = target_creds.tgs_supported_enctypes
2837 samdb = self.get_samdb()
2838 domain_sid_str = samdb.get_domain_sid()
2840 expected_groups = self.map_sids(expected_groups, None, domain_sid_str)
2841 expected_device_groups = self.map_sids(expected_device_groups, None, domain_sid_str)
2843 # Show that obtaining a service ticket is allowed.
2844 self._tgs_req(client_tgt, code, client_creds, target_creds,
2846 expected_cname=client_cname,
2847 expected_account_name=client_username,
2848 decryption_key=target_decryption_key,
2849 expected_sid=client_sid,
2850 expected_groups=expected_groups,
2851 expect_device_info=bool(expected_device_groups) or None,
2852 expected_device_domain_sid=domain_sid_str,
2853 expected_device_groups=expected_device_groups,
2854 expect_client_claims=bool(expected_claims) or None,
2855 expected_client_claims=expected_claims,
2856 expected_supported_etypes=target_etypes,
2857 expected_status=status,
2860 self.check_tgs_log(client_creds, target_creds,
2862 checked_creds=client_creds,
2867 def test_conditional_ace_allowed_from_user_deny(self):
2868 # Create a machine account with which to perform FAST.
2869 mach_creds = self.get_cached_creds(
2870 account_type=self.AccountType.COMPUTER)
2871 mach_tgt = self.get_tgt(mach_creds)
2873 # Create an authentication policy that explicitly denies the machine
2874 # account for a user.
2875 allowed = 'O:SYD:(A;;CR;;;WD)'
2876 denied = f'O:SYD:(XD;;CR;;;{mach_creds.get_sid()};(abc))'
2877 policy = self.create_authn_policy(enforced=True,
2878 user_allowed_from=denied,
2879 service_allowed_from=allowed)
2881 # Create a user account with the assigned policy.
2882 client_creds = self._get_creds(account_type=self.AccountType.USER,
2883 assigned_policy=policy)
2885 # Show that we get a policy error when trying to authenticate.
2886 self._get_tgt(client_creds, armor_tgt=mach_tgt,
2887 expected_error=KDC_ERR_POLICY)
2891 armor_creds=mach_creds,
2892 client_policy=policy,
2893 client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2894 event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
2895 reason=AuditReason.ACCESS_DENIED,
2896 status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
2899 class DeviceRestrictionTests(ConditionalAceBaseTests):
2900 def test_pac_groups_not_present(self):
2901 """Test that authentication fails if the device does not belong to some
2906 ('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs),
2907 ('S-1-9-8-7', SidType.EXTRA_SID, self.default_attrs),
2910 # Create a machine account with which to perform FAST.
2911 mach_creds = self.get_cached_creds(
2912 account_type=self.AccountType.COMPUTER,
2913 opts={'id': 'device'})
2914 mach_tgt = self.get_tgt(mach_creds)
2916 # Create an authentication policy that requires the device to belong to
2918 client_policy_sddl = self.allow_if(
2919 f'Member_of {self.sddl_array_from_sids(required_sids)}')
2920 client_policy = self.create_authn_policy(
2921 enforced=True, user_allowed_from=client_policy_sddl)
2923 # Create a user account with the assigned policy.
2924 client_creds = self._get_creds(account_type=self.AccountType.USER,
2925 assigned_policy=client_policy)
2927 # Show that authentication fails.
2928 self._armored_as_req(client_creds,
2929 self.get_krbtgt_creds(),
2931 expected_error=KDC_ERR_POLICY)
2935 armor_creds=mach_creds,
2936 client_policy=client_policy,
2937 client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2938 event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
2939 reason=AuditReason.ACCESS_DENIED,
2940 status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
2942 def test_pac_groups_present(self):
2943 """Test that authentication succeeds if the device belongs to some
2948 ('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs),
2949 ('S-1-9-8-7', SidType.EXTRA_SID, self.default_attrs),
2952 device_sids = required_sids | {
2953 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2954 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2957 # Create a machine account with which to perform FAST.
2958 mach_creds = self.get_cached_creds(
2959 account_type=self.AccountType.COMPUTER,
2960 opts={'id': 'device'})
2961 mach_tgt = self.get_tgt(mach_creds)
2963 # Add the required groups to the machine account’s TGT.
2964 mach_tgt = self.modified_ticket(
2966 modify_pac_fn=partial(self.set_pac_sids,
2967 new_sids=device_sids),
2968 checksum_keys=self.get_krbtgt_checksum_key())
2970 # Create an authentication policy that requires the device to belong to
2972 client_policy_sddl = self.allow_if(
2973 f'Member_of {self.sddl_array_from_sids(required_sids)}')
2974 client_policy = self.create_authn_policy(
2975 enforced=True, user_allowed_from=client_policy_sddl)
2977 # Create a user account with the assigned policy.
2978 client_creds = self._get_creds(account_type=self.AccountType.USER,
2979 assigned_policy=client_policy)
2981 # FIXME: we need to pass this parameter only because Samba doesn’t
2982 # handle ‘krbtgt@REALM’ principals correctly (see
2983 # https://bugzilla.samba.org/show_bug.cgi?id=15482).
2984 krbtgt_sname = self.get_krbtgt_sname()
2986 # Show that authentication succeeds.
2987 self._armored_as_req(client_creds,
2988 self.get_krbtgt_creds(),
2990 target_sname=krbtgt_sname)
2992 self.check_as_log(client_creds,
2993 armor_creds=mach_creds,
2994 client_policy=client_policy)
2996 def test_pac_resource_groups_present(self):
2997 """Test that authentication succeeds if the device belongs to some
2998 required resource groups.
3002 ('S-1-2-3-4', SidType.RESOURCE_SID, self.resource_attrs),
3003 ('S-1-2-3-5', SidType.RESOURCE_SID, self.resource_attrs),
3004 ('S-1-2-3-6', SidType.RESOURCE_SID, self.resource_attrs),
3007 device_sids = required_sids | {
3008 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3009 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3012 # Create a machine account with which to perform FAST.
3013 mach_creds = self.get_cached_creds(
3014 account_type=self.AccountType.COMPUTER,
3015 opts={'id': 'device'})
3016 mach_tgt = self.get_tgt(mach_creds)
3018 # Add the required groups to the machine account’s TGT.
3019 mach_tgt = self.modified_ticket(
3021 modify_pac_fn=partial(self.set_pac_sids,
3022 new_sids=device_sids),
3023 checksum_keys=self.get_krbtgt_checksum_key())
3025 # Create an authentication policy that requires the device to belong to
3027 client_policy_sddl = self.allow_if(
3028 f'Member_of {self.sddl_array_from_sids(required_sids)}')
3029 client_policy = self.create_authn_policy(
3030 enforced=True, user_allowed_from=client_policy_sddl)
3032 # Create a user account with the assigned policy.
3033 client_creds = self._get_creds(account_type=self.AccountType.USER,
3034 assigned_policy=client_policy)
3036 # Show that authentication fails.
3037 self._armored_as_req(client_creds,
3038 self.get_krbtgt_creds(),
3040 expected_error=KDC_ERR_POLICY)
3044 armor_creds=mach_creds,
3045 client_policy=client_policy,
3046 client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3047 event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
3048 reason=AuditReason.ACCESS_DENIED,
3049 status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
3051 def test_pac_resource_groups_present_to_service_sid_compression(self):
3052 """Test that authentication succeeds if the device belongs to some
3053 required resource groups, and the request is to a service that supports
3058 ('S-1-2-3-4', SidType.RESOURCE_SID, self.resource_attrs),
3059 ('S-1-2-3-5', SidType.RESOURCE_SID, self.resource_attrs),
3060 ('S-1-2-3-6', SidType.RESOURCE_SID, self.resource_attrs),
3063 device_sids = required_sids | {
3064 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3065 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3068 # Create a machine account with which to perform FAST.
3069 mach_creds = self.get_cached_creds(
3070 account_type=self.AccountType.COMPUTER,
3071 opts={'id': 'device'})
3072 mach_tgt = self.get_tgt(mach_creds)
3074 # Add the required groups to the machine account’s TGT.
3075 mach_tgt = self.modified_ticket(
3077 modify_pac_fn=partial(self.set_pac_sids,
3078 new_sids=device_sids),
3079 checksum_keys=self.get_krbtgt_checksum_key())
3081 # Create an authentication policy that requires the device to belong to
3083 client_policy_sddl = self.allow_if(
3084 f'Member_of {self.sddl_array_from_sids(required_sids)}')
3085 client_policy = self.create_authn_policy(
3086 enforced=True, user_allowed_from=client_policy_sddl)
3088 # Create a user account with the assigned policy.
3089 client_creds = self._get_creds(account_type=self.AccountType.USER,
3090 assigned_policy=client_policy)
3092 target_creds = self.get_cached_creds(
3093 account_type=self.AccountType.COMPUTER,
3094 opts={'id': 'target'})
3096 # Show that authentication fails.
3097 self._armored_as_req(client_creds,
3100 expected_error=KDC_ERR_POLICY)
3104 armor_creds=mach_creds,
3105 client_policy=client_policy,
3106 client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3107 event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
3108 reason=AuditReason.ACCESS_DENIED,
3109 status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
3111 def test_pac_resource_groups_present_to_service_no_sid_compression(self):
3112 """Test that authentication succeeds if the device belongs to some
3113 required resource groups, and the request is to a service that does not
3114 support SID compression.
3118 ('S-1-2-3-4', SidType.RESOURCE_SID, self.resource_attrs),
3119 ('S-1-2-3-5', SidType.RESOURCE_SID, self.resource_attrs),
3120 ('S-1-2-3-6', SidType.RESOURCE_SID, self.resource_attrs),
3123 device_sids = required_sids | {
3124 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3125 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3128 # Create a machine account with which to perform FAST.
3129 mach_creds = self.get_cached_creds(
3130 account_type=self.AccountType.COMPUTER,
3131 opts={'id': 'device'})
3132 mach_tgt = self.get_tgt(mach_creds)
3134 # Add the required groups to the machine account’s TGT.
3135 mach_tgt = self.modified_ticket(
3137 modify_pac_fn=partial(self.set_pac_sids,
3138 new_sids=device_sids),
3139 checksum_keys=self.get_krbtgt_checksum_key())
3141 # Create an authentication policy that requires the device to belong to
3143 client_policy_sddl = self.allow_if(
3144 f'Member_of {self.sddl_array_from_sids(required_sids)}')
3145 client_policy = self.create_authn_policy(
3146 enforced=True, user_allowed_from=client_policy_sddl)
3148 # Create a user account with the assigned policy.
3149 client_creds = self._get_creds(account_type=self.AccountType.USER,
3150 assigned_policy=client_policy)
3152 target_creds = self.get_cached_creds(
3153 account_type=self.AccountType.COMPUTER,
3156 'supported_enctypes': (
3157 security.KERB_ENCTYPE_RC4_HMAC_MD5) | (
3158 security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK),
3159 'sid_compression_support': False,
3162 # Show that authentication fails.
3163 self._armored_as_req(client_creds,
3166 expected_error=KDC_ERR_POLICY)
3170 armor_creds=mach_creds,
3171 client_policy=client_policy,
3172 client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3173 event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
3174 reason=AuditReason.ACCESS_DENIED,
3175 status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
3177 def test_pac_well_known_groups_not_present(self):
3178 """Test that authentication fails if the device does not belong to one
3179 or more required well‐known groups.
3183 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
3184 (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
3185 (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
3186 (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
3190 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3191 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3194 # Create a machine account with which to perform FAST.
3195 mach_creds = self.get_cached_creds(
3196 account_type=self.AccountType.COMPUTER,
3197 opts={'id': 'device'})
3198 mach_tgt = self.get_tgt(mach_creds)
3200 # Modify the machine account’s TGT to contain only the SID of the
3201 # machine account’s primary group.
3202 mach_tgt = self.modified_ticket(
3204 modify_pac_fn=partial(self.set_pac_sids,
3205 new_sids=device_sids),
3206 checksum_keys=self.get_krbtgt_checksum_key())
3208 # Create an authentication policy that requires the device to belong to
3210 client_policy_sddl = self.allow_if(
3211 f'Member_of_any {self.sddl_array_from_sids(required_sids)}')
3212 client_policy = self.create_authn_policy(
3213 enforced=True, user_allowed_from=client_policy_sddl)
3215 # Create a user account with the assigned policy.
3216 client_creds = self._get_creds(account_type=self.AccountType.USER,
3217 assigned_policy=client_policy)
3219 # Show that authentication fails.
3220 self._armored_as_req(client_creds,
3221 self.get_krbtgt_creds(),
3223 expected_error=KDC_ERR_POLICY)
3227 armor_creds=mach_creds,
3228 client_policy=client_policy,
3229 client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3230 event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
3231 reason=AuditReason.ACCESS_DENIED,
3232 status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
3234 def test_pac_device_info(self):
3235 """Test the groups of the client and the device after performing a
3236 FAST‐armored AS‐REQ.
3240 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3241 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3244 # Create a machine account with which to perform FAST.
3245 mach_creds = self.get_cached_creds(
3246 account_type=self.AccountType.COMPUTER,
3247 opts={'id': 'device'})
3248 mach_tgt = self.get_tgt(mach_creds)
3250 # Add the required groups to the machine account’s TGT.
3251 mach_tgt = self.modified_ticket(
3253 modify_pac_fn=partial(self.set_pac_sids,
3254 new_sids=device_sids),
3255 checksum_keys=self.get_krbtgt_checksum_key())
3257 # Create a user account.
3258 client_creds = self._get_creds(account_type=self.AccountType.USER)
3260 target_creds = self.get_cached_creds(
3261 account_type=self.AccountType.COMPUTER,
3262 opts={'id': 'target'})
3265 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3266 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3267 # The client’s groups are to include the Asserted Identity and
3268 # Claims Valid SIDs.
3269 (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
3270 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
3273 samdb = self.get_samdb()
3274 domain_sid_str = samdb.get_domain_sid()
3276 expected_sids = self.map_sids(expected_sids, None, domain_sid_str)
3278 # Show that authentication succeeds. Check that the groups in the PAC
3280 self._armored_as_req(client_creds,
3283 expected_groups=expected_sids,
3284 expect_device_info=False,
3285 expected_device_groups=None)
3289 armor_creds=mach_creds)
3291 def test_pac_claims_not_present(self):
3292 """Test that authentication fails if the device does not have a
3296 claim_id = 'the name of the claim'
3297 claim_value = 'the value of the claim'
3299 # Create a machine account with which to perform FAST.
3300 mach_creds = self.get_cached_creds(
3301 account_type=self.AccountType.COMPUTER,
3302 opts={'id': 'device'})
3303 mach_tgt = self.get_tgt(mach_creds)
3305 # Create an authentication policy that requires the device to have a
3307 client_policy_sddl = self.allow_if(
3308 f'@User.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
3309 client_policy = self.create_authn_policy(
3310 enforced=True, user_allowed_from=client_policy_sddl)
3312 # Create a user account with the assigned policy.
3313 client_creds = self._get_creds(account_type=self.AccountType.USER,
3314 assigned_policy=client_policy)
3316 # Show that authentication fails.
3317 self._armored_as_req(client_creds,
3318 self.get_krbtgt_creds(),
3320 expected_error=KDC_ERR_POLICY)
3324 armor_creds=mach_creds,
3325 client_policy=client_policy,
3326 client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3327 event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
3328 reason=AuditReason.ACCESS_DENIED,
3329 status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
3331 def test_pac_claims_present(self):
3332 """Test that authentication succeeds if the device has a required
3336 claim_id = 'the name of the claim'
3337 claim_value = 'the value of the claim'
3340 (claims.CLAIMS_SOURCE_TYPE_AD, [
3341 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
3345 # Create a machine account with which to perform FAST.
3346 mach_creds = self.get_cached_creds(
3347 account_type=self.AccountType.COMPUTER,
3348 opts={'id': 'device'})
3349 mach_tgt = self.get_tgt(mach_creds)
3351 # Add the required claim to the machine account’s TGT.
3352 mach_tgt = self.modified_ticket(
3354 modify_pac_fn=partial(self.set_pac_claims,
3355 client_claims=pac_claims),
3356 checksum_keys=self.get_krbtgt_checksum_key())
3358 # Create an authentication policy that requires the device to have a
3360 client_policy_sddl = self.allow_if(
3361 f'@User.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
3362 client_policy = self.create_authn_policy(
3363 enforced=True, user_allowed_from=client_policy_sddl)
3365 # Create a user account with the assigned policy.
3366 client_creds = self._get_creds(account_type=self.AccountType.USER,
3367 assigned_policy=client_policy)
3369 # FIXME: we need to pass this parameter only because Samba doesn’t
3370 # handle ‘krbtgt@REALM’ principals correctly (see
3371 # https://bugzilla.samba.org/show_bug.cgi?id=15482).
3372 krbtgt_sname = self.get_krbtgt_sname()
3374 # Show that authentication succeeds.
3375 self._armored_as_req(client_creds,
3376 self.get_krbtgt_creds(),
3378 target_sname=krbtgt_sname)
3380 self.check_as_log(client_creds,
3381 armor_creds=mach_creds,
3382 client_policy=client_policy)
3384 def test_pac_claims_invalid(self):
3385 """Test that authentication fails if the device’s required claim is not
3389 claim_id = 'the name of the claim'
3390 claim_value = 'the value of the claim'
3393 (claims.CLAIMS_SOURCE_TYPE_AD, [
3394 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
3398 # The device’s SIDs do not include the Claims Valid SID.
3400 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3401 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3404 # Create a machine account with which to perform FAST.
3405 mach_creds = self.get_cached_creds(
3406 account_type=self.AccountType.COMPUTER,
3407 opts={'id': 'device'})
3408 mach_tgt = self.get_tgt(mach_creds)
3410 # Add the SIDs and the required claim to the machine account’s TGT.
3411 mach_tgt = self.modified_ticket(
3414 partial(self.set_pac_claims, client_claims=pac_claims),
3415 partial(self.set_pac_sids, new_sids=device_sids)],
3416 checksum_keys=self.get_krbtgt_checksum_key())
3418 # Create an authentication policy that requires the device to have a
3420 client_policy_sddl = self.allow_if(
3421 f'@User.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
3422 client_policy = self.create_authn_policy(
3423 enforced=True, user_allowed_from=client_policy_sddl)
3425 # Create a user account with the assigned policy.
3426 client_creds = self._get_creds(account_type=self.AccountType.USER,
3427 assigned_policy=client_policy)
3429 # Show that authentication fails.
3430 self._armored_as_req(client_creds,
3431 self.get_krbtgt_creds(),
3433 expected_error=KDC_ERR_POLICY)
3437 armor_creds=mach_creds,
3438 client_policy=client_policy,
3439 client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3440 event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
3441 reason=AuditReason.ACCESS_DENIED,
3442 status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
3444 def test_device_in_world_group(self):
3445 self._check_device_in_group(security.SID_WORLD)
3447 def test_device_in_network_group(self):
3448 self._check_device_not_in_group(security.SID_NT_NETWORK)
3450 def test_device_in_authenticated_users(self):
3451 self._check_device_in_group(security.SID_NT_AUTHENTICATED_USERS)
3453 def _check_device_in_group(self, group):
3454 self._check_device_membership(group, expect_in_group=True)
3456 def _check_device_not_in_group(self, group):
3457 self._check_device_membership(group, expect_in_group=False)
3459 def _check_device_membership(self, group, *, expect_in_group):
3460 """Test that authentication succeeds or fails when the device is
3461 required to belong to a certain group.
3464 # Create a machine account with which to perform FAST.
3465 mach_creds = self.get_cached_creds(
3466 account_type=self.AccountType.COMPUTER,
3467 opts={'id': 'device'})
3468 mach_tgt = self.get_tgt(mach_creds)
3470 # Create an authentication policy that requires the device to belong to
3472 in_group_sddl = self.allow_if(f'Member_of {{SID({group})}}')
3473 in_group_policy = self.create_authn_policy(
3474 enforced=True, user_allowed_from=in_group_sddl)
3476 # Create a user account with the assigned policy.
3477 client_creds = self._get_creds(account_type=self.AccountType.USER,
3478 assigned_policy=in_group_policy)
3480 krbtgt_creds = self.get_krbtgt_creds()
3482 # FIXME: we need to pass this parameter only because Samba doesn’t
3483 # handle ‘krbtgt@REALM’ principals correctly (see
3484 # https://bugzilla.samba.org/show_bug.cgi?id=15482).
3485 krbtgt_sname = self.get_krbtgt_sname()
3487 # Test whether authentication succeeds or fails.
3488 self._armored_as_req(
3492 target_sname=krbtgt_sname,
3493 expected_error=0 if expect_in_group else KDC_ERR_POLICY)
3495 policy_success_args = {}
3496 policy_failure_args = {
3497 'client_policy_status': ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3498 'event': AuditEvent.KERBEROS_DEVICE_RESTRICTION,
3499 'reason': AuditReason.ACCESS_DENIED,
3500 'status': ntstatus.NT_STATUS_INVALID_WORKSTATION,
3503 self.check_as_log(client_creds,
3504 armor_creds=mach_creds,
3505 client_policy=in_group_policy,
3506 **(policy_success_args if expect_in_group
3507 else policy_failure_args))
3509 # Create an authentication policy that requires the device not to belong
3511 not_in_group_sddl = self.allow_if(f'Not_Member_of {{SID({group})}}')
3512 not_in_group_policy = self.create_authn_policy(
3513 enforced=True, user_allowed_from=not_in_group_sddl)
3515 # Create a user account with the assigned policy.
3516 client_creds = self._get_creds(account_type=self.AccountType.USER,
3517 assigned_policy=not_in_group_policy)
3519 # Test whether authentication succeeds or fails.
3520 self._armored_as_req(
3524 target_sname=krbtgt_sname,
3525 expected_error=KDC_ERR_POLICY if expect_in_group else 0)
3527 self.check_as_log(client_creds,
3528 armor_creds=mach_creds,
3529 client_policy=not_in_group_policy,
3530 **(policy_failure_args if expect_in_group
3531 else policy_success_args))
3534 class TgsReqServicePolicyTests(ConditionalAceBaseTests):
3535 def test_pac_groups_not_present(self):
3536 """Test that authorization succeeds if the client does not belong to
3537 some required groups.
3541 ('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs),
3542 ('S-1-9-8-7', SidType.EXTRA_SID, self.default_attrs),
3545 # Create a machine account with which to perform FAST.
3546 mach_creds = self.get_cached_creds(
3547 account_type=self.AccountType.COMPUTER,
3548 opts={'id': 'device'})
3549 mach_tgt = self.get_tgt(mach_creds)
3551 # Create a user account.
3552 client_creds = self._get_creds(account_type=self.AccountType.USER)
3553 client_tgt = self.get_tgt(client_creds)
3555 # Create an authentication policy that requires the client to belong to
3557 target_policy_sddl = self.allow_if(
3558 f'Member_of {self.sddl_array_from_sids(required_sids)}')
3559 target_policy = self.create_authn_policy(
3560 enforced=True, computer_allowed_to=target_policy_sddl)
3562 # Create a target account with the assigned policy.
3563 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
3564 assigned_policy=target_policy)
3566 # Show that authorization fails.
3568 client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
3570 expect_edata=self.expect_padata_outer,
3571 # We aren’t particular about whether or not we get an NTSTATUS.
3573 expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
3576 client_creds, target_creds,
3577 policy=target_policy,
3578 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3579 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
3580 reason=AuditReason.ACCESS_DENIED)
3582 def test_pac_groups_present(self):
3583 """Test that authorization succeeds if the client belongs to some
3588 ('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs),
3589 ('S-1-9-8-7', SidType.EXTRA_SID, self.default_attrs),
3592 client_sids = required_sids | {
3593 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3594 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3597 # Create a machine account with which to perform FAST.
3598 mach_creds = self.get_cached_creds(
3599 account_type=self.AccountType.COMPUTER,
3600 opts={'id': 'device'})
3601 mach_tgt = self.get_tgt(mach_creds)
3603 # Create a user account.
3604 client_creds = self._get_creds(account_type=self.AccountType.USER)
3605 client_tgt = self.get_tgt(client_creds)
3607 # Add the required groups to the client’s TGT.
3608 client_tgt = self.modified_ticket(
3610 modify_pac_fn=partial(self.set_pac_sids,
3611 new_sids=client_sids),
3612 checksum_keys=self.get_krbtgt_checksum_key())
3614 # Create an authentication policy that requires the client to belong to
3616 target_policy_sddl = self.allow_if(
3617 f'Member_of {self.sddl_array_from_sids(required_sids)}')
3618 target_policy = self.create_authn_policy(
3619 enforced=True, computer_allowed_to=target_policy_sddl)
3621 # Create a target account with the assigned policy.
3622 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
3623 assigned_policy=target_policy)
3625 # Show that authorization succeeds.
3626 self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt)
3628 self.check_tgs_log(client_creds, target_creds,
3629 policy=target_policy)
3631 def test_pac_resource_groups_present_to_service_sid_compression(self):
3632 """Test that authorization succeeds if the client belongs to some
3633 required resource groups, and the request is to a service that supports
3638 ('S-1-2-3-4', SidType.RESOURCE_SID, self.resource_attrs),
3639 ('S-1-2-3-5', SidType.RESOURCE_SID, self.resource_attrs),
3640 ('S-1-2-3-6', SidType.RESOURCE_SID, self.resource_attrs),
3643 client_sids = required_sids | {
3644 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3645 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3648 # Create a machine account with which to perform FAST.
3649 mach_creds = self.get_cached_creds(
3650 account_type=self.AccountType.COMPUTER,
3651 opts={'id': 'device'})
3652 mach_tgt = self.get_tgt(mach_creds)
3654 # Create a user account.
3655 client_creds = self._get_creds(account_type=self.AccountType.USER)
3656 client_tgt = self.get_tgt(client_creds)
3658 # Add the required groups to the client’s TGT.
3659 client_tgt = self.modified_ticket(
3661 modify_pac_fn=partial(self.set_pac_sids,
3662 new_sids=client_sids),
3663 checksum_keys=self.get_krbtgt_checksum_key())
3665 # Create an authentication policy that requires the client to belong to
3667 target_policy_sddl = self.allow_if(
3668 f'Member_of {self.sddl_array_from_sids(required_sids)}')
3669 target_policy = self.create_authn_policy(
3670 enforced=True, computer_allowed_to=target_policy_sddl)
3672 # Create a target account with the assigned policy.
3673 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
3674 assigned_policy=target_policy)
3676 # Show that authorization fails.
3678 client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
3680 expect_edata=self.expect_padata_outer,
3681 # We aren’t particular about whether or not we get an NTSTATUS.
3683 expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
3686 client_creds, target_creds,
3687 policy=target_policy,
3688 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3689 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
3690 reason=AuditReason.ACCESS_DENIED)
3692 def test_pac_resource_groups_present_to_service_no_sid_compression(self):
3693 """Test that authorization succeeds if the client belongs to some
3694 required resource groups, and the request is to a service that does not
3695 support SID compression.
3699 ('S-1-2-3-4', SidType.RESOURCE_SID, self.resource_attrs),
3700 ('S-1-2-3-5', SidType.RESOURCE_SID, self.resource_attrs),
3701 ('S-1-2-3-6', SidType.RESOURCE_SID, self.resource_attrs),
3704 client_sids = required_sids | {
3705 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3706 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3709 # Create a machine account with which to perform FAST.
3710 mach_creds = self.get_cached_creds(
3711 account_type=self.AccountType.COMPUTER,
3712 opts={'id': 'device'})
3713 mach_tgt = self.get_tgt(mach_creds)
3715 # Create a user account.
3716 client_creds = self._get_creds(account_type=self.AccountType.USER)
3717 client_tgt = self.get_tgt(client_creds)
3719 # Add the required groups to the client’s TGT.
3720 client_tgt = self.modified_ticket(
3722 modify_pac_fn=partial(self.set_pac_sids,
3723 new_sids=client_sids),
3724 checksum_keys=self.get_krbtgt_checksum_key())
3726 # Create an authentication policy that requires the client to belong to
3728 target_policy_sddl = self.allow_if(
3729 f'Member_of {self.sddl_array_from_sids(required_sids)}')
3730 target_policy = self.create_authn_policy(
3731 enforced=True, computer_allowed_to=target_policy_sddl)
3733 # Create a target account with the assigned policy.
3734 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
3735 assigned_policy=target_policy,
3736 additional_details={
3737 'msDS-SupportedEncryptionTypes': str((
3738 security.KERB_ENCTYPE_RC4_HMAC_MD5) | (
3739 security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK) | (
3740 security.KERB_ENCTYPE_RESOURCE_SID_COMPRESSION_DISABLED))})
3742 # Show that authorization fails.
3744 client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
3746 expect_edata=self.expect_padata_outer,
3747 # We aren’t particular about whether or not we get an NTSTATUS.
3749 expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
3752 client_creds, target_creds,
3753 policy=target_policy,
3754 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3755 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
3756 reason=AuditReason.ACCESS_DENIED)
3758 def test_pac_well_known_groups_not_present(self):
3759 """Test that authorization fails if the client does not belong to one
3760 or more required well‐known groups.
3764 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
3765 (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
3766 (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
3767 (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
3771 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3772 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3775 # Create a machine account with which to perform FAST.
3776 mach_creds = self.get_cached_creds(
3777 account_type=self.AccountType.COMPUTER,
3778 opts={'id': 'device'})
3779 mach_tgt = self.get_tgt(mach_creds)
3781 # Create a user account.
3782 client_creds = self._get_creds(account_type=self.AccountType.USER)
3783 client_tgt = self.get_tgt(client_creds)
3785 # Modify the client’s TGT to contain only the SID of the client’s
3787 client_tgt = self.modified_ticket(
3789 modify_pac_fn=partial(self.set_pac_sids,
3790 new_sids=client_sids),
3791 checksum_keys=self.get_krbtgt_checksum_key())
3793 # Create an authentication policy that requires the client to belong to
3795 target_policy_sddl = self.allow_if(
3796 f'Member_of_any {self.sddl_array_from_sids(required_sids)}')
3797 target_policy = self.create_authn_policy(
3798 enforced=True, computer_allowed_to=target_policy_sddl)
3800 # Create a target account with the assigned policy.
3801 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
3802 assigned_policy=target_policy)
3804 # Show that authorization fails.
3806 client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
3808 expect_edata=self.expect_padata_outer,
3809 # We aren’t particular about whether or not we get an NTSTATUS.
3811 expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
3814 client_creds, target_creds,
3815 policy=target_policy,
3816 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3817 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
3818 reason=AuditReason.ACCESS_DENIED)
3820 def test_pac_device_info(self):
3821 self._run_pac_device_info_test()
3823 def test_pac_device_info_no_compound_id_support(self):
3824 self._run_pac_device_info_test(compound_id_support=False)
3826 def test_pac_device_info_no_claims_valid(self):
3827 self._run_pac_device_info_test(device_claims_valid=False)
3829 def _run_pac_device_info_test(self, compound_id_support=True, device_claims_valid=True):
3830 """Test the groups of the client and the device after performing a
3831 FAST‐armored TGS‐REQ.
3834 client_claim_id = 'the name of the client’s client claim'
3835 client_claim_value = 'the value of the client’s client claim'
3838 (claims.CLAIMS_SOURCE_TYPE_AD, [
3839 (client_claim_id, claims.CLAIM_TYPE_STRING, [client_claim_value]),
3843 expected_client_claims = {
3845 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
3846 'type': claims.CLAIM_TYPE_STRING,
3847 'values': (client_claim_value,),
3851 device_claim_id = 'the name of the device’s client claim'
3852 device_claim_value = 'the value of the device’s client claim'
3855 (claims.CLAIMS_SOURCE_TYPE_AD, [
3856 (device_claim_id, claims.CLAIM_TYPE_STRING, [device_claim_value]),
3860 if compound_id_support:
3861 expected_device_claims = {
3863 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
3864 'type': claims.CLAIM_TYPE_STRING,
3865 'values': (device_claim_value,),
3869 expected_device_claims = None
3871 # Create a machine account with which to perform FAST.
3872 mach_creds = self.get_cached_creds(
3873 account_type=self.AccountType.COMPUTER,
3874 opts={'id': 'device'})
3875 mach_tgt = self.get_tgt(mach_creds)
3878 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3879 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3880 # This to ensure we have EXTRA_SIDS set already, as
3881 # windows won't set that flag otherwise when adding one
3883 ('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs),
3887 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3888 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3889 ('S-1-2-3-4', SidType.EXTRA_SID, self.resource_attrs),
3890 ('S-1-3-4-5', SidType.EXTRA_SID, self.resource_attrs),
3893 if device_claims_valid:
3894 device_sids.add((security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs))
3896 # Modify the machine account’s TGT to contain only the SID of the
3897 # machine account’s primary group.
3898 mach_tgt = self.modified_ticket(
3901 partial(self.set_pac_sids,
3902 new_sids=device_sids),
3903 partial(self.set_pac_claims, client_claims=device_claims),
3905 checksum_keys=self.get_krbtgt_checksum_key())
3907 # Create a user account.
3908 client_creds = self._get_creds(account_type=self.AccountType.USER)
3909 client_tgt = self.get_tgt(client_creds)
3911 # Modify the client’s TGT to contain only the SID of the client’s
3913 client_tgt = self.modified_ticket(
3916 partial(self.set_pac_sids,
3917 new_sids=client_sids),
3918 partial(self.set_pac_claims, client_claims=client_claims),
3920 checksum_keys=self.get_krbtgt_checksum_key())
3922 # Indicate that Compound Identity is supported.
3923 target_creds, _ = self.get_target(to_krbtgt=False, compound_id=compound_id_support)
3926 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3927 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3928 ('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs),
3929 # The client’s groups are not to include the Asserted Identity and
3930 # Claims Valid SIDs.
3933 if compound_id_support:
3934 expected_sids.add((security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs))
3936 expected_device_sids = {
3937 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3938 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3939 ('S-1-2-3-4', SidType.EXTRA_SID, self.resource_attrs),
3940 ('S-1-3-4-5', SidType.EXTRA_SID, self.resource_attrs),
3943 if device_claims_valid:
3944 expected_device_sids.add(frozenset([(security.SID_CLAIMS_VALID, SidType.RESOURCE_SID, self.default_attrs)]))
3946 expected_device_sids = None
3948 samdb = self.get_samdb()
3949 domain_sid_str = samdb.get_domain_sid()
3951 expected_sids = self.map_sids(expected_sids, None, domain_sid_str)
3952 # The device SIDs will be put into the PAC unmodified.
3953 expected_device_sids = self.map_sids(expected_device_sids, None, domain_sid_str)
3955 # Show that authorization succeeds.
3956 self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt,
3957 expected_groups=expected_sids,
3958 expect_device_info=bool(expected_device_sids) or None,
3959 expected_device_domain_sid=domain_sid_str,
3960 expected_device_groups=expected_device_sids,
3961 expect_client_claims=bool(expected_client_claims) or None,
3962 expected_client_claims=expected_client_claims,
3963 expect_device_claims=bool(expected_device_claims) or None,
3964 expected_device_claims=expected_device_claims)
3966 self.check_tgs_log(client_creds, target_creds)
3968 def test_pac_extra_sids_behaviour(self):
3969 """Test the groups of the client and the device after performing a
3970 FAST‐armored TGS‐REQ.
3973 # Create a machine account with which to perform FAST.
3974 mach_creds = self.get_cached_creds(
3975 account_type=self.AccountType.COMPUTER,
3976 opts={'id': 'device'})
3977 mach_tgt = self.get_tgt(mach_creds)
3980 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3981 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3984 # Create a user account.
3985 client_creds = self._get_creds(account_type=self.AccountType.USER)
3986 client_tgt = self.get_tgt(client_creds)
3988 # Modify the client’s TGT to contain only the SID of the client’s
3990 client_tgt = self.modified_ticket(
3992 modify_pac_fn=partial(self.set_pac_sids,
3993 new_sids=client_sids),
3994 checksum_keys=self.get_krbtgt_checksum_key())
3996 # Indicate that Compound Identity is supported.
3997 target_creds, _ = self.get_target(to_krbtgt=False, compound_id=True)
4000 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4001 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4002 (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs)
4003 # The client’s groups are not to include the Asserted Identity and
4004 # Claims Valid SIDs.
4007 samdb = self.get_samdb()
4008 domain_sid_str = samdb.get_domain_sid()
4010 expected_sids = self.map_sids(expected_sids, None, domain_sid_str)
4012 # Show that authorization succeeds.
4013 self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt,
4014 expected_groups=expected_sids)
4016 self.check_tgs_log(client_creds, target_creds)
4018 def test_pac_claims_not_present(self):
4019 """Test that authentication fails if the device does not have a
4023 claim_id = 'the name of the claim'
4024 claim_value = 'the value of the claim'
4026 # Create a machine account with which to perform FAST.
4027 mach_creds = self.get_cached_creds(
4028 account_type=self.AccountType.COMPUTER,
4029 opts={'id': 'device'})
4030 mach_tgt = self.get_tgt(mach_creds)
4032 # Create an authentication policy that requires the device to have a
4034 target_policy_sddl = self.allow_if(
4035 f'@User.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
4036 target_policy = self.create_authn_policy(
4037 enforced=True, computer_allowed_to=target_policy_sddl)
4039 # Create a user account.
4040 client_creds = self._get_creds(account_type=self.AccountType.USER)
4041 client_tgt = self.get_tgt(client_creds)
4043 # Create a target account with the assigned policy.
4044 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4045 assigned_policy=target_policy)
4047 # Show that authorization fails.
4049 client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
4051 expect_edata=self.expect_padata_outer,
4052 # We aren’t particular about whether or not we get an NTSTATUS.
4054 expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
4059 policy=target_policy,
4060 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
4061 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
4062 reason=AuditReason.ACCESS_DENIED)
4064 def test_pac_claims_present(self):
4065 """Test that authentication succeeds if the user has a required
4069 claim_id = 'the name of the claim'
4070 claim_value = 'the value of the claim'
4073 (claims.CLAIMS_SOURCE_TYPE_AD, [
4074 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
4078 # Create a machine account with which to perform FAST.
4079 mach_creds = self.get_cached_creds(
4080 account_type=self.AccountType.COMPUTER,
4081 opts={'id': 'device'})
4082 mach_tgt = self.get_tgt(mach_creds)
4084 # Create an authentication policy that requires the user to have a
4086 target_policy_sddl = self.allow_if(
4087 f'@User.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
4088 target_policy = self.create_authn_policy(
4089 enforced=True, computer_allowed_to=target_policy_sddl)
4091 # Create a user account.
4092 client_creds = self._get_creds(account_type=self.AccountType.USER)
4093 client_tgt = self.get_tgt(client_creds)
4095 # Add the required claim to the client’s TGT.
4096 client_tgt = self.modified_ticket(
4098 modify_pac_fn=partial(self.set_pac_claims,
4099 client_claims=pac_claims),
4100 checksum_keys=self.get_krbtgt_checksum_key())
4102 # Create a target account with the assigned policy.
4103 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4104 assigned_policy=target_policy)
4106 # Show that authorization succeeds.
4107 self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt)
4109 self.check_tgs_log(client_creds, target_creds,
4110 policy=target_policy)
4112 def test_pac_claims_invalid(self):
4113 """Test that authentication fails if the device’s required claim is not
4117 claim_id = 'the name of the claim'
4118 claim_value = 'the value of the claim'
4121 (claims.CLAIMS_SOURCE_TYPE_AD, [
4122 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
4126 # The device’s SIDs do not include the Claims Valid SID.
4128 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4129 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4132 # Create a machine account with which to perform FAST.
4133 mach_creds = self.get_cached_creds(
4134 account_type=self.AccountType.COMPUTER,
4135 opts={'id': 'device'})
4136 mach_tgt = self.get_tgt(mach_creds)
4138 # Create an authentication policy that requires the device to have a
4140 target_policy_sddl = self.allow_if(
4141 f'@User.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
4142 target_policy = self.create_authn_policy(
4143 enforced=True, computer_allowed_to=target_policy_sddl)
4145 # Create a user account.
4146 client_creds = self._get_creds(account_type=self.AccountType.USER)
4147 client_tgt = self.get_tgt(client_creds)
4149 # Add the SIDs and the required claim to the client’s TGT.
4150 client_tgt = self.modified_ticket(
4153 partial(self.set_pac_claims, client_claims=pac_claims),
4154 partial(self.set_pac_sids, new_sids=device_sids)],
4155 checksum_keys=self.get_krbtgt_checksum_key())
4157 # Create a target account with the assigned policy.
4158 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4159 assigned_policy=target_policy)
4161 # Show that authorization fails.
4163 client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
4165 expect_edata=self.expect_padata_outer,
4166 # We aren’t particular about whether or not we get an NTSTATUS.
4168 expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
4173 policy=target_policy,
4174 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
4175 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
4176 reason=AuditReason.ACCESS_DENIED)
4178 def test_pac_device_claims_not_present(self):
4179 """Test that authorization fails if the device does not have a
4183 claim_id = 'the name of the claim'
4184 claim_value = 'the value of the claim'
4186 # Create a machine account with which to perform FAST.
4187 mach_creds = self.get_cached_creds(
4188 account_type=self.AccountType.COMPUTER,
4189 opts={'id': 'device'})
4190 mach_tgt = self.get_tgt(mach_creds)
4192 # Create an authentication policy that requires the device to have a
4193 # certain device claim.
4194 target_policy_sddl = self.allow_if(
4195 f'@Device.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
4196 target_policy = self.create_authn_policy(
4197 enforced=True, computer_allowed_to=target_policy_sddl)
4199 # Create a user account.
4200 client_creds = self._get_creds(account_type=self.AccountType.USER)
4201 client_tgt = self.get_tgt(client_creds)
4203 # Create a target account with the assigned policy.
4204 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4205 assigned_policy=target_policy)
4207 # Show that authorization fails.
4209 client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
4211 expect_edata=self.expect_padata_outer,
4212 # We aren’t particular about whether or not we get an NTSTATUS.
4214 expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
4219 policy=target_policy,
4220 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
4221 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
4222 reason=AuditReason.ACCESS_DENIED)
4224 def test_pac_device_claims_present(self):
4225 """Test that authorization succeeds if the device has a required claim.
4228 claim_id = 'the name of the claim'
4229 claim_value = 'the value of the claim'
4232 (claims.CLAIMS_SOURCE_TYPE_AD, [
4233 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
4237 # Create a machine account with which to perform FAST.
4238 mach_creds = self.get_cached_creds(
4239 account_type=self.AccountType.COMPUTER,
4240 opts={'id': 'device'})
4241 mach_tgt = self.get_tgt(mach_creds)
4243 # Add the required claim to the machine account’s TGT.
4244 mach_tgt = self.modified_ticket(
4246 modify_pac_fn=partial(self.set_pac_claims,
4247 client_claims=pac_claims),
4248 checksum_keys=self.get_krbtgt_checksum_key())
4250 # Create an authentication policy that requires the device to have a
4251 # certain device claim.
4252 target_policy_sddl = self.allow_if(
4253 f'@Device.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
4254 target_policy = self.create_authn_policy(
4255 enforced=True, computer_allowed_to=target_policy_sddl)
4257 # Create a user account.
4258 client_creds = self._get_creds(account_type=self.AccountType.USER)
4259 client_tgt = self.get_tgt(client_creds)
4261 # Create a target account with the assigned policy.
4262 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4263 assigned_policy=target_policy)
4265 # Show that authorization succeeds.
4266 self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt)
4268 self.check_tgs_log(client_creds, target_creds,
4269 policy=target_policy)
4271 def test_pac_device_claims_invalid(self):
4272 """Test that authorization fails if the device’s required claim is not
4276 claim_id = 'the name of the claim'
4277 claim_value = 'the value of the claim'
4280 (claims.CLAIMS_SOURCE_TYPE_AD, [
4281 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
4285 # The device’s SIDs do not include the Claims Valid SID.
4287 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4288 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4291 # Create a machine account with which to perform FAST.
4292 mach_creds = self.get_cached_creds(
4293 account_type=self.AccountType.COMPUTER,
4294 opts={'id': 'device'})
4295 mach_tgt = self.get_tgt(mach_creds)
4297 # Add the SIDs and the required claim to the machine account’s TGT.
4298 mach_tgt = self.modified_ticket(
4301 partial(self.set_pac_claims, client_claims=pac_claims),
4302 partial(self.set_pac_sids, new_sids=device_sids)],
4303 checksum_keys=self.get_krbtgt_checksum_key())
4305 # Create an authentication policy that requires the device to have a
4307 target_policy_sddl = self.allow_if(
4308 f'@Device.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
4309 target_policy = self.create_authn_policy(
4310 enforced=True, computer_allowed_to=target_policy_sddl)
4312 # Create a user account.
4313 client_creds = self._get_creds(account_type=self.AccountType.USER)
4314 client_tgt = self.get_tgt(client_creds)
4316 # Create a target account with the assigned policy.
4317 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4318 assigned_policy=target_policy)
4320 # Show that authorization fails.
4322 client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
4324 expect_edata=self.expect_padata_outer,
4325 # We aren’t particular about whether or not we get an NTSTATUS.
4327 expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
4332 policy=target_policy,
4333 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
4334 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
4335 reason=AuditReason.ACCESS_DENIED)
4337 def test_pac_device_claims_invalid_no_attrs(self):
4338 """Test that authorization fails if the device’s required claim is not
4342 claim_id = 'the name of the claim'
4343 claim_value = 'the value of the claim'
4346 (claims.CLAIMS_SOURCE_TYPE_AD, [
4347 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
4352 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4353 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4354 # The device’s SIDs include the Claims Valid SID, but it has no
4356 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, 0),
4359 # Create a machine account with which to perform FAST.
4360 mach_creds = self.get_cached_creds(
4361 account_type=self.AccountType.COMPUTER,
4362 opts={'id': 'device'})
4363 mach_tgt = self.get_tgt(mach_creds)
4365 # Add the SIDs and the required claim to the machine account’s TGT.
4366 mach_tgt = self.modified_ticket(
4369 partial(self.set_pac_claims, client_claims=pac_claims),
4370 partial(self.set_pac_sids, new_sids=device_sids)],
4371 checksum_keys=self.get_krbtgt_checksum_key())
4373 # Create an authentication policy that requires the device to have a
4375 target_policy_sddl = self.allow_if(
4376 f'@Device.{self.escaped_claim_id(claim_id)} == "{claim_value}"')
4377 target_policy = self.create_authn_policy(
4378 enforced=True, computer_allowed_to=target_policy_sddl)
4380 # Create a user account.
4381 client_creds = self._get_creds(account_type=self.AccountType.USER)
4382 client_tgt = self.get_tgt(client_creds)
4384 # Create a target account with the assigned policy.
4385 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4386 assigned_policy=target_policy)
4388 # Show that authorization succeeds.
4389 self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt)
4391 self.check_tgs_log(client_creds, target_creds,
4392 policy=target_policy)
4394 def test_simple_as_req_client_and_target_policy(self):
4395 # Create a machine account with which to perform FAST.
4396 mach_creds = self.get_cached_creds(
4397 account_type=self.AccountType.COMPUTER)
4398 mach_tgt = self.get_tgt(mach_creds)
4400 # Create an authentication policy that explicitly allows the machine
4401 # account for a user.
4402 client_policy_sddl = f'O:SYD:(XA;;CR;;;{mach_creds.get_sid()};(Member_of {{SID({mach_creds.get_sid()}), SID({mach_creds.get_sid()})}}))'
4403 client_policy = self.create_authn_policy(enforced=True,
4404 user_allowed_from=client_policy_sddl)
4406 # Create a user account with the assigned policy.
4407 client_creds = self._get_creds(account_type=self.AccountType.USER,
4408 assigned_policy=client_policy)
4410 # Create an authentication policy that applies to a computer and
4411 # explicitly allows the user account to obtain a service ticket.
4412 target_policy_sddl = f'O:SYD:(XA;;CR;;;{client_creds.get_sid()};(Member_of SID({client_creds.get_sid()})))'
4413 target_policy = self.create_authn_policy(enforced=True,
4414 computer_allowed_to=target_policy_sddl)
4416 # Create a computer account with the assigned policy.
4417 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4418 assigned_policy=target_policy)
4421 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4422 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4423 (security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY, SidType.EXTRA_SID, self.default_attrs),
4424 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
4427 # Show that obtaining a service ticket with an AS‐REQ is allowed.
4428 self._armored_as_req(client_creds,
4431 expected_groups=expected_groups)
4433 self.check_as_log(client_creds,
4434 armor_creds=mach_creds,
4435 client_policy=client_policy,
4436 server_policy=target_policy)
4438 def test_device_in_world_group(self):
4439 self._check_device_in_group(security.SID_WORLD)
4441 def test_device_in_network_group(self):
4442 self._check_device_not_in_group(security.SID_NT_NETWORK)
4444 def test_device_in_authenticated_users(self):
4445 self._check_device_in_group(security.SID_NT_AUTHENTICATED_USERS)
4447 def _check_device_in_group(self, group):
4448 self._check_device_membership(group, expect_in_group=True)
4450 def _check_device_not_in_group(self, group):
4451 self._check_device_membership(group, expect_in_group=False)
4453 def _check_device_membership(self, group, *, expect_in_group):
4454 """Test that authentication succeeds or fails when the device is
4455 required to belong to a certain group.
4458 # Create a machine account with which to perform FAST.
4459 mach_creds = self.get_cached_creds(
4460 account_type=self.AccountType.COMPUTER,
4461 opts={'id': 'device'})
4462 mach_tgt = self.get_tgt(mach_creds)
4464 # Create an authentication policy that requires the device to belong to
4466 in_group_sddl = self.allow_if(f'Device_Member_of {{SID({group})}}')
4467 in_group_policy = self.create_authn_policy(
4468 enforced=True, computer_allowed_to=in_group_sddl)
4470 # Create a user account.
4471 client_creds = self._get_creds(account_type=self.AccountType.USER)
4472 client_tgt = self.get_tgt(client_creds)
4474 # Create a target account with the assigned policy.
4475 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4476 assigned_policy=in_group_policy)
4478 tgs_success_args = {}
4479 tgs_failure_args = {
4480 'expect_edata': self.expect_padata_outer,
4481 # We aren’t particular about whether or not we get an NTSTATUS.
4482 'expect_status': None,
4483 'expected_status': ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
4486 # Test whether authorization succeeds or fails.
4487 self._tgs_req(client_tgt,
4488 0 if expect_in_group else KDC_ERR_POLICY,
4492 **(tgs_success_args if expect_in_group
4493 else tgs_failure_args))
4495 policy_success_args = {}
4496 policy_failure_args = {
4497 'status': ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
4498 'event': AuditEvent.KERBEROS_SERVER_RESTRICTION,
4499 'reason': AuditReason.ACCESS_DENIED,
4502 self.check_tgs_log(client_creds, target_creds,
4503 policy=in_group_policy,
4504 **(policy_success_args if expect_in_group
4505 else policy_failure_args))
4507 # Create an authentication policy that requires the device not to belong
4509 not_in_group_sddl = self.allow_if(
4510 f'Not_Device_Member_of {{SID({group})}}')
4511 not_in_group_policy = self.create_authn_policy(
4512 enforced=True, computer_allowed_to=not_in_group_sddl)
4514 # Create a target account with the assigned policy.
4515 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4516 assigned_policy=not_in_group_policy)
4518 # Test whether authorization succeeds or fails.
4519 self._tgs_req(client_tgt,
4520 KDC_ERR_POLICY if expect_in_group else 0,
4524 **(tgs_failure_args if expect_in_group
4525 else tgs_success_args))
4527 self.check_tgs_log(client_creds, target_creds,
4528 policy=not_in_group_policy,
4529 **(policy_failure_args if expect_in_group
4530 else policy_success_args))
4532 def test_simple_as_req_client_policy_only(self):
4533 # Create a machine account with which to perform FAST.
4534 mach_creds = self.get_cached_creds(
4535 account_type=self.AccountType.COMPUTER)
4536 mach_tgt = self.get_tgt(mach_creds)
4538 # Create an authentication policy that explicitly allows the machine
4539 # account for a user.
4540 client_policy_sddl = f'O:SYD:(XA;;CR;;;{mach_creds.get_sid()};(Member_of SID({mach_creds.get_sid()})))'
4541 client_policy = self.create_authn_policy(enforced=True,
4542 user_allowed_from=client_policy_sddl)
4544 # Create a user account with the assigned policy.
4545 client_creds = self._get_creds(account_type=self.AccountType.USER,
4546 assigned_policy=client_policy)
4549 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4550 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4551 (security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY, SidType.EXTRA_SID, self.default_attrs),
4552 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
4555 # FIXME: we need to pass this parameter only because Samba doesn’t
4556 # handle ‘krbtgt@REALM’ principals correctly (see
4557 # https://bugzilla.samba.org/show_bug.cgi?id=15482).
4558 krbtgt_sname = self.get_krbtgt_sname()
4560 # Show that obtaining a service ticket with an AS‐REQ is allowed.
4561 self._armored_as_req(client_creds,
4562 self.get_krbtgt_creds(),
4564 target_sname=krbtgt_sname,
4565 expected_groups=expected_groups)
4567 self.check_as_log(client_creds,
4568 armor_creds=mach_creds,
4569 client_policy=client_policy)
4572 if __name__ == '__main__':
4573 global_asn1_print = False
4574 global_hexdump = False