reason=AuditReason.ACCESS_DENIED,
status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
+ def test_device_in_world_group(self):
+ self._check_device_in_group(security.SID_WORLD)
+
+ def test_device_in_network_group(self):
+ self._check_device_not_in_group(security.SID_NT_NETWORK)
+
+ def test_device_in_authenticated_users(self):
+ self._check_device_in_group(security.SID_NT_AUTHENTICATED_USERS)
+
+ def _check_device_in_group(self, group):
+ self._check_device_membership(group, expect_in_group=True)
+
+ def _check_device_not_in_group(self, group):
+ self._check_device_membership(group, expect_in_group=False)
+
+ def _check_device_membership(self, group, *, expect_in_group):
+ """Test that authentication succeeds or fails when the device is
+ required to belong to a certain group.
+ """
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts={'id': 'device'})
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create an authentication policy that requires the device to belong to
+ # a certain group.
+ in_group_sddl = self.allow_if(f'Member_of {{SID({group})}}')
+ in_group_policy = self.create_authn_policy(
+ enforced=True, user_allowed_from=in_group_sddl)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=in_group_policy)
+
+ krbtgt_creds = self.get_krbtgt_creds()
+
+ # FIXME: we need to pass this parameter only because Samba doesn’t
+ # handle ‘krbtgt@REALM’ principals correctly (see
+ # https://bugzilla.samba.org/show_bug.cgi?id=15482).
+ krbtgt_sname = self.get_krbtgt_sname()
+
+ # Test whether authentication succeeds or fails.
+ self._armored_as_req(
+ client_creds,
+ krbtgt_creds,
+ mach_tgt,
+ target_sname=krbtgt_sname,
+ expected_error=0 if expect_in_group else KDC_ERR_POLICY)
+
+ policy_success_args = {}
+ policy_failure_args = {
+ 'client_policy_status': ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
+ 'event': AuditEvent.KERBEROS_DEVICE_RESTRICTION,
+ 'reason': AuditReason.ACCESS_DENIED,
+ 'status': ntstatus.NT_STATUS_INVALID_WORKSTATION,
+ }
+
+ self.check_as_log(client_creds,
+ armor_creds=mach_creds,
+ client_policy=in_group_policy,
+ **(policy_success_args if expect_in_group
+ else policy_failure_args))
+
+ # Create an authentication policy that requires the device not to belong
+ # to the group.
+ not_in_group_sddl = self.allow_if(f'Not_Member_of {{SID({group})}}')
+ not_in_group_policy = self.create_authn_policy(
+ enforced=True, user_allowed_from=not_in_group_sddl)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=not_in_group_policy)
+
+ # Test whether authentication succeeds or fails.
+ self._armored_as_req(
+ client_creds,
+ krbtgt_creds,
+ mach_tgt,
+ target_sname=krbtgt_sname,
+ expected_error=KDC_ERR_POLICY if expect_in_group else 0)
+
+ self.check_as_log(client_creds,
+ armor_creds=mach_creds,
+ client_policy=not_in_group_policy,
+ **(policy_failure_args if expect_in_group
+ else policy_success_args))
+
class TgsReqServicePolicyTests(ConditionalAceBaseTests):
def test_pac_groups_not_present(self):
client_policy=client_policy,
server_policy=target_policy)
+ def test_device_in_world_group(self):
+ self._check_device_in_group(security.SID_WORLD)
+
+ def test_device_in_network_group(self):
+ self._check_device_not_in_group(security.SID_NT_NETWORK)
+
+ def test_device_in_authenticated_users(self):
+ self._check_device_in_group(security.SID_NT_AUTHENTICATED_USERS)
+
+ def _check_device_in_group(self, group):
+ self._check_device_membership(group, expect_in_group=True)
+
+ def _check_device_not_in_group(self, group):
+ self._check_device_membership(group, expect_in_group=False)
+
+ def _check_device_membership(self, group, *, expect_in_group):
+ """Test that authentication succeeds or fails when the device is
+ required to belong to a certain group.
+ """
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts={'id': 'device'})
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create an authentication policy that requires the device to belong to
+ # a certain group.
+ in_group_sddl = self.allow_if(f'Device_Member_of {{SID({group})}}')
+ in_group_policy = self.create_authn_policy(
+ enforced=True, computer_allowed_to=in_group_sddl)
+
+ # Create a user account.
+ client_creds = self._get_creds(account_type=self.AccountType.USER)
+ client_tgt = self.get_tgt(client_creds)
+
+ # Create a target account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=in_group_policy)
+
+ tgs_success_args = {}
+ tgs_failure_args = {
+ 'expect_edata': self.expect_padata_outer,
+ # We aren’t particular about whether or not we get an NTSTATUS.
+ 'expect_status': None,
+ 'expected_status': ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
+ }
+
+ # Test whether authorization succeeds or fails.
+ self._tgs_req(client_tgt,
+ 0 if expect_in_group else KDC_ERR_POLICY,
+ client_creds,
+ target_creds,
+ armor_tgt=mach_tgt,
+ **(tgs_success_args if expect_in_group
+ else tgs_failure_args))
+
+ policy_success_args = {}
+ policy_failure_args = {
+ 'status': ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
+ 'event': AuditEvent.KERBEROS_SERVER_RESTRICTION,
+ 'reason': AuditReason.ACCESS_DENIED,
+ }
+
+ self.check_tgs_log(client_creds, target_creds,
+ policy=in_group_policy,
+ **(policy_success_args if expect_in_group
+ else policy_failure_args))
+
+ # Create an authentication policy that requires the device not to belong
+ # to the group.
+ not_in_group_sddl = self.allow_if(
+ f'Not_Device_Member_of {{SID({group})}}')
+ not_in_group_policy = self.create_authn_policy(
+ enforced=True, computer_allowed_to=not_in_group_sddl)
+
+ # Create a target account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=not_in_group_policy)
+
+ # Test whether authorization succeeds or fails.
+ self._tgs_req(client_tgt,
+ KDC_ERR_POLICY if expect_in_group else 0,
+ client_creds,
+ target_creds,
+ armor_tgt=mach_tgt,
+ **(tgs_failure_args if expect_in_group
+ else tgs_success_args))
+
+ self.check_tgs_log(client_creds, target_creds,
+ policy=not_in_group_policy,
+ **(policy_failure_args if expect_in_group
+ else policy_success_args))
+
def test_simple_as_req_client_policy_only(self):
# Create a machine account with which to perform FAST.
mach_creds = self.get_cached_creds(