CVE-2021-20251 tests/krb5: Add tests for password lockout race
authorJoseph Sutton <josephsutton@catalyst.net.nz>
Mon, 4 Jul 2022 08:48:48 +0000 (20:48 +1200)
committerAndrew Bartlett <abartlet@samba.org>
Mon, 12 Sep 2022 23:07:37 +0000 (23:07 +0000)
BUG: https://bugzilla.samba.org/show_bug.cgi?id=14611

Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Andreas Schneider <asn@samba.org>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
python/samba/tests/krb5/lockout_tests.py [new file with mode: 0755]
python/samba/tests/krb5/raw_testcase.py
python/samba/tests/krb5/rfc4120_constants.py
python/samba/tests/usage.py
selftest/flapping.d/ldap-pwd-change-race [new file with mode: 0644]
selftest/knownfail_heimdal_kdc
selftest/knownfail_mit_kdc
source4/selftest/tests.py

diff --git a/python/samba/tests/krb5/lockout_tests.py b/python/samba/tests/krb5/lockout_tests.py
new file mode 100755 (executable)
index 0000000..e49e82a
--- /dev/null
@@ -0,0 +1,1088 @@
+#!/usr/bin/env python3
+# Unix SMB/CIFS implementation.
+# Copyright (C) Stefan Metzmacher 2020
+# Copyright (C) Catalyst.Net Ltd
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+
+from concurrent import futures
+from enum import Enum
+from functools import partial
+from multiprocessing import Pipe
+import os
+import sys
+import time
+
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives.ciphers.base import Cipher
+from cryptography.hazmat.primitives.ciphers import algorithms
+
+import ldb
+
+from samba import (
+    NTSTATUSError,
+    dsdb,
+    generate_random_bytes,
+    generate_random_password,
+    ntstatus,
+    unix2nttime,
+    werror,
+)
+from samba.credentials import DONT_USE_KERBEROS, MUST_USE_KERBEROS
+from samba.crypto import (
+    aead_aes_256_cbc_hmac_sha512_blob,
+    des_crypt_blob_16,
+    md4_hash_blob,
+    sha512_pbkdf2,
+)
+from samba.dcerpc import lsa, samr
+from samba.samdb import SamDB
+
+from samba.tests import connect_samdb, env_get_var_value, env_loadparm
+
+from samba.tests.krb5.as_req_tests import AsReqBaseTest
+from samba.tests.krb5 import kcrypto
+from samba.tests.krb5.kdc_base_test import KDCBaseTest
+from samba.tests.krb5.raw_testcase import KerberosCredentials
+import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
+from samba.tests.krb5.rfc4120_constants import (
+    KDC_ERR_CLIENT_REVOKED,
+    KDC_ERR_PREAUTH_FAILED,
+    KRB_AS_REP,
+    KRB_ERROR,
+    NT_PRINCIPAL,
+    NT_SRV_INST,
+)
+
+sys.path.insert(0, 'bin/python')
+os.environ['PYTHONUNBUFFERED'] = '1'
+
+global_asn1_print = False
+global_hexdump = False
+
+
+class ConnectionResult(Enum):
+    LOCKED_OUT = 1
+    WRONG_PASSWORD = 2
+    SUCCESS = 3
+
+
+def connect_kdc(pipe,
+                url,
+                hostname,
+                username,
+                password,
+                domain,
+                realm,
+                workstation,
+                dn,
+                expect_error=True):
+    AsReqBaseTest.setUpClass()
+    as_req_base = AsReqBaseTest()
+    as_req_base.setUp()
+
+    user_creds = KerberosCredentials()
+    user_creds.set_username(username)
+    user_creds.set_password(password)
+    user_creds.set_domain(domain)
+    user_creds.set_realm(realm)
+    user_creds.set_workstation(workstation)
+    user_creds.set_kerberos_state(DONT_USE_KERBEROS)
+
+    user_name = user_creds.get_username()
+    cname = as_req_base.PrincipalName_create(name_type=NT_PRINCIPAL,
+                                             names=user_name.split('/'))
+
+    krbtgt_creds = as_req_base.get_krbtgt_creds()
+    krbtgt_supported_etypes = krbtgt_creds.tgs_supported_enctypes
+    realm = krbtgt_creds.get_realm()
+
+    krbtgt_account = krbtgt_creds.get_username()
+    sname = as_req_base.PrincipalName_create(name_type=NT_SRV_INST,
+                                             names=[krbtgt_account, realm])
+
+    expected_salt = user_creds.get_salt()
+
+    till = as_req_base.get_KerberosTime(offset=36000)
+
+    kdc_options = krb5_asn1.KDCOptions('postdated')
+
+    preauth_key = as_req_base.PasswordKey_from_creds(user_creds,
+                                                     kcrypto.Enctype.AES256)
+
+    ts_enc_padata = as_req_base.get_enc_timestamp_pa_data_from_key(preauth_key)
+    padata = [ts_enc_padata]
+
+    krbtgt_decryption_key = (
+        as_req_base.TicketDecryptionKey_from_creds(krbtgt_creds))
+
+    etypes = as_req_base.get_default_enctypes()
+
+    if expect_error:
+        expected_error_modes = (KDC_ERR_CLIENT_REVOKED,
+                                KDC_ERR_PREAUTH_FAILED)
+    else:
+        expected_error_modes = 0
+
+    # Remove the LDAP connection.
+    del type(as_req_base)._ldb
+
+    # Indicate that we're ready. This ensures we hit the right transaction
+    # lock.
+    pipe.send_bytes(b'0')
+
+    # Wait for the main process to take out a transaction lock.
+    if not pipe.poll(timeout=5):
+        raise AssertionError('main process failed to indicate readiness')
+
+    # Try making a Kerberos AS-REQ to the KDC. This should fail, either due to
+    # the user's account being locked out or due to using the wrong password.
+    as_rep, kdc_exchange_dict = as_req_base._test_as_exchange(
+        cname=cname,
+        realm=realm,
+        sname=sname,
+        till=till,
+        client_as_etypes=etypes,
+        expected_error_mode=expected_error_modes,
+        expected_crealm=realm,
+        expected_cname=cname,
+        expected_srealm=realm,
+        expected_sname=sname,
+        expected_salt=expected_salt,
+        etypes=etypes,
+        padata=padata,
+        kdc_options=kdc_options,
+        expected_supported_etypes=krbtgt_supported_etypes,
+        expected_account_name=user_name,
+        preauth_key=preauth_key,
+        ticket_decryption_key=krbtgt_decryption_key,
+        pac_request=True)
+    as_req_base.assertIsNotNone(as_rep)
+
+    msg_type = as_rep['msg-type']
+    if expect_error and msg_type != KRB_ERROR or (
+            not expect_error and msg_type != KRB_AS_REP):
+        raise AssertionError(f'wrong message type {msg_type}')
+
+    if not expect_error:
+        return ConnectionResult.SUCCESS
+
+    error_code = as_rep['error-code']
+    if error_code == KDC_ERR_CLIENT_REVOKED:
+        return ConnectionResult.LOCKED_OUT
+    elif error_code == KDC_ERR_PREAUTH_FAILED:
+        return ConnectionResult.WRONG_PASSWORD
+    else:
+        raise AssertionError(f'wrong error code {error_code}')
+
+
+def connect_ntlm(pipe,
+                 url,
+                 hostname,
+                 username,
+                 password,
+                 domain,
+                 realm,
+                 workstation,
+                 dn):
+    user_creds = KerberosCredentials()
+    user_creds.set_username(username)
+    user_creds.set_password(password)
+    user_creds.set_domain(domain)
+    user_creds.set_workstation(workstation)
+    user_creds.set_kerberos_state(DONT_USE_KERBEROS)
+
+    # Indicate that we're ready. This ensures we hit the right transaction
+    # lock.
+    pipe.send_bytes(b'0')
+
+    # Wait for the main process to take out a transaction lock.
+    if not pipe.poll(timeout=5):
+        raise AssertionError('main process failed to indicate readiness')
+
+    try:
+        # Try connecting to SamDB. This should fail, either due to our
+        # account being locked out or due to using the wrong password.
+        SamDB(url=url,
+              credentials=user_creds,
+              lp=env_loadparm())
+    except ldb.LdbError as err:
+        num, estr = err.args
+
+        if num != ldb.ERR_INVALID_CREDENTIALS:
+            raise AssertionError(f'connection raised wrong error code '
+                                 f'({err})')
+
+        if f'data {werror.WERR_ACCOUNT_LOCKED_OUT:x},' in estr:
+            return ConnectionResult.LOCKED_OUT
+        elif f'data {werror.WERR_LOGON_FAILURE:x},' in estr:
+            return ConnectionResult.WRONG_PASSWORD
+        else:
+            raise AssertionError(f'connection raised wrong error code '
+                                 f'({estr})')
+    else:
+        return ConnectionResult.SUCCESS
+
+
+def connect_samr(pipe,
+                 url,
+                 hostname,
+                 username,
+                 password,
+                 domain,
+                 realm,
+                 workstation,
+                 dn):
+    # Get the user's NT hash.
+    user_creds = KerberosCredentials()
+    user_creds.set_password(password)
+    nt_hash = user_creds.get_nt_hash()
+
+    # Generate a new UTF-16 password.
+    new_password = generate_random_password(32, 32)
+    new_password = new_password.encode('utf-16le')
+
+    # Generate the MD4 hash of the password.
+    new_password_md4 = md4_hash_blob(new_password)
+
+    # Prefix the password with padding so it is 512 bytes long.
+    new_password_len = len(new_password)
+    remaining_len = 512 - new_password_len
+    new_password = bytes(remaining_len) + new_password
+
+    # Append the 32-bit length of the password..
+    new_password += int.to_bytes(new_password_len,
+                                 length=4,
+                                 byteorder='little')
+
+    # Encrypt the password with RC4 and the existing NT hash.
+    encryptor = Cipher(algorithms.ARC4(nt_hash),
+                       None,
+                       default_backend()).encryptor()
+    new_password = encryptor.update(new_password)
+
+    # Create a key from the MD4 hash of the new password.
+    key = new_password_md4[:14]
+
+    # Encrypt the old NT hash with DES to obtain the verifier.
+    verifier = des_crypt_blob_16(nt_hash, key)
+
+    server = lsa.String()
+    server.string = hostname
+
+    account = lsa.String()
+    account.string = username
+
+    nt_password = samr.CryptPassword()
+    nt_password.data = list(new_password)
+
+    nt_verifier = samr.Password()
+    nt_verifier.hash = list(verifier)
+
+    conn = samr.samr(f'ncacn_np:{hostname}[krb5,seal,smb2]')
+
+    # Indicate that we're ready. This ensures we hit the right transaction
+    # lock.
+    pipe.send_bytes(b'0')
+
+    # Wait for the main process to take out a transaction lock.
+    if not pipe.poll(timeout=5):
+        raise AssertionError('main process failed to indicate readiness')
+
+    try:
+        # Try changing the password. This should fail, either due to our
+        # account being locked out or due to using the wrong password.
+        conn.ChangePasswordUser3(server=server,
+                                 account=account,
+                                 nt_password=nt_password,
+                                 nt_verifier=nt_verifier,
+                                 lm_change=True,
+                                 lm_password=None,
+                                 lm_verifier=None,
+                                 password3=None)
+    except NTSTATUSError as err:
+        num, estr = err.args
+
+        if num == ntstatus.NT_STATUS_ACCOUNT_LOCKED_OUT:
+            return ConnectionResult.LOCKED_OUT
+        elif num == ntstatus.NT_STATUS_WRONG_PASSWORD:
+            return ConnectionResult.WRONG_PASSWORD
+        else:
+            raise AssertionError(f'pwd change raised wrong error code '
+                                 f'({num:08X})')
+    else:
+        return ConnectionResult.SUCCESS
+
+
+def connect_samr_aes(pipe,
+                     url,
+                     hostname,
+                     username,
+                     password,
+                     domain,
+                     realm,
+                     workstation,
+                     dn):
+    # Get the user's NT hash.
+    user_creds = KerberosCredentials()
+    user_creds.set_password(password)
+    nt_hash = user_creds.get_nt_hash()
+
+    # Generate a new UTF-16 password.
+    new_password = generate_random_password(32, 32)
+    new_password = new_password.encode('utf-16le')
+
+    # Prepend the 16-bit length of the password..
+    new_password_len = int.to_bytes(len(new_password),
+                                    length=2,
+                                    byteorder='little')
+    new_password = new_password_len + new_password
+
+    server = lsa.String()
+    server.string = hostname
+
+    account = lsa.String()
+    account.string = username
+
+    # Derive a key from the user's NT hash.
+    iv = generate_random_bytes(16)
+    iterations = 5555
+    cek = sha512_pbkdf2(nt_hash, iv, iterations)
+
+    enc_key_salt = (b'Microsoft SAM encryption key '
+                    b'AEAD-AES-256-CBC-HMAC-SHA512 16\0')
+    mac_key_salt = (b'Microsoft SAM MAC key '
+                    b'AEAD-AES-256-CBC-HMAC-SHA512 16\0')
+
+    # Encrypt the new password.
+    ciphertext, auth_data = aead_aes_256_cbc_hmac_sha512_blob(new_password,
+                                                              cek,
+                                                              enc_key_salt,
+                                                              mac_key_salt,
+                                                              iv)
+
+    # Create the new password structure
+    pwd_buf = samr.EncryptedPasswordAES()
+    pwd_buf.auth_data = list(auth_data)
+    pwd_buf.salt = list(iv)
+    pwd_buf.cipher_len = len(ciphertext)
+    pwd_buf.cipher = list(ciphertext)
+    pwd_buf.PBKDF2Iterations = iterations
+
+    conn = samr.samr(f'ncacn_np:{hostname}[krb5,seal,smb2]')
+
+    # Indicate that we're ready. This ensures we hit the right transaction
+    # lock.
+    pipe.send_bytes(b'0')
+
+    # Wait for the main process to take out a transaction lock.
+    if not pipe.poll(timeout=5):
+        raise AssertionError('main process failed to indicate readiness')
+
+    try:
+        # Try changing the password. This should fail, either due to our
+        # account being locked out or due to using the wrong password.
+        conn.ChangePasswordUser4(server=server,
+                                 account=account,
+                                 password=pwd_buf)
+    except NTSTATUSError as err:
+        num, estr = err.args
+
+        if num == ntstatus.NT_STATUS_ACCOUNT_LOCKED_OUT:
+            return ConnectionResult.LOCKED_OUT
+        elif num == ntstatus.NT_STATUS_WRONG_PASSWORD:
+            return ConnectionResult.WRONG_PASSWORD
+        else:
+            raise AssertionError(f'pwd change raised wrong error code '
+                                 f'({num:08X})')
+    else:
+        return ConnectionResult.SUCCESS
+
+
+def ldap_pwd_change(pipe,
+                    url,
+                    hostname,
+                    username,
+                    password,
+                    domain,
+                    realm,
+                    workstation,
+                    dn):
+    lp = env_loadparm()
+
+    admin_creds = KerberosCredentials()
+    admin_creds.guess(lp)
+    admin_creds.set_username(env_get_var_value('ADMIN_USERNAME'))
+    admin_creds.set_password(env_get_var_value('ADMIN_PASSWORD'))
+    admin_creds.set_kerberos_state(MUST_USE_KERBEROS)
+
+    samdb = SamDB(url=url,
+                  credentials=admin_creds,
+                  lp=lp)
+
+    old_utf16pw = f'"{password}"'.encode('utf-16le')
+
+    new_password = generate_random_password(32, 32)
+    new_utf16pw = f'"{new_password}"'.encode('utf-16le')
+
+    msg = ldb.Message(ldb.Dn(samdb, dn))
+    msg['0'] = ldb.MessageElement(old_utf16pw,
+                                  ldb.FLAG_MOD_DELETE,
+                                  'unicodePwd')
+    msg['1'] = ldb.MessageElement(new_utf16pw,
+                                  ldb.FLAG_MOD_ADD,
+                                  'unicodePwd')
+
+    # Indicate that we're ready. This ensures we hit the right transaction
+    # lock.
+    pipe.send_bytes(b'0')
+
+    # Wait for the main process to take out a transaction lock.
+    if not pipe.poll(timeout=5):
+        raise AssertionError('main process failed to indicate readiness')
+
+    # Try changing the user's password. This should fail, either due to the
+    # user's account being locked out or due to specifying the wrong password.
+    try:
+        samdb.modify(msg)
+    except ldb.LdbError as err:
+        num, estr = err.args
+        if num != ldb.ERR_CONSTRAINT_VIOLATION:
+            raise AssertionError(f'pwd change raised wrong error code ({err})')
+
+        if f'<{werror.WERR_ACCOUNT_LOCKED_OUT:08X}:' in estr:
+            return ConnectionResult.LOCKED_OUT
+        elif f'<{werror.WERR_INVALID_PASSWORD:08X}:' in estr:
+            return ConnectionResult.WRONG_PASSWORD
+        else:
+            raise AssertionError(f'pwd change raised wrong error code '
+                                 f'({estr})')
+    else:
+        return ConnectionResult.SUCCESS
+
+
+class LockoutTests(KDCBaseTest):
+
+    def setUp(self):
+        super().setUp()
+        self.do_asn1_print = global_asn1_print
+        self.do_hexdump = global_hexdump
+
+        samdb = self.get_samdb()
+        base_dn = ldb.Dn(samdb, samdb.domain_dn())
+
+        def modify_attr(attr, value):
+            if value is None:
+                value = []
+                flag = ldb.FLAG_MOD_DELETE
+            else:
+                value = str(value)
+                flag = ldb.FLAG_MOD_REPLACE
+
+                msg = ldb.Message(base_dn)
+                msg[attr] = ldb.MessageElement(
+                    value, flag, attr)
+                samdb.modify(msg)
+
+        res = samdb.search(base_dn,
+                           scope=ldb.SCOPE_BASE,
+                           attrs=['lockoutDuration',
+                                  'lockoutThreshold',
+                                  'msDS-LogonTimeSyncInterval'])
+        self.assertEqual(1, len(res))
+
+        # Reset the lockout duration as it was before.
+        lockout_duration = res[0].get('lockoutDuration', idx=0)
+        self.addCleanup(modify_attr, 'lockoutDuration', lockout_duration)
+
+        # Set the new lockout duration: locked out accounts now stay locked
+        # out.
+        modify_attr('lockoutDuration', 0)
+
+        # Reset the lockout threshold as it was before.
+        lockout_threshold = res[0].get('lockoutThreshold', idx=0)
+        self.addCleanup(modify_attr, 'lockoutThreshold', lockout_threshold)
+
+        # Set the new lockout threshold.
+        self.lockout_threshold = 3
+        modify_attr('lockoutThreshold', self.lockout_threshold)
+
+        # Reset the logon time sync interval as it was before.
+        sync_interval = res[0].get('msDS-LogonTimeSyncInterval', idx=0)
+        self.addCleanup(modify_attr,
+                        'msDS-LogonTimeSyncInterval',
+                        sync_interval)
+
+        # Set the new logon time sync interval. Setting it to 0 eliminates the
+        # need for this attribute to be updated on logon, and thus the
+        # requirement to take out a transaction.
+        modify_attr('msDS-LogonTimeSyncInterval', 0)
+
+        # Get the old 'minPwdAge'.
+        minPwdAge = samdb.get_minPwdAge()
+
+        # Reset the 'minPwdAge' as it was before.
+        self.addCleanup(samdb.set_minPwdAge, minPwdAge)
+
+        # Set it temporarily to '0'.
+        samdb.set_minPwdAge('0')
+
+    def assertLocalSamDB(self, samdb):
+        if samdb.url.startswith('tdb://'):
+            return
+        if samdb.url.startswith('mdb://'):
+            return
+
+        self.fail(f'connection to {samdb.url} is not local!')
+
+    def wait_for_ready(self, pipe, future):
+        if pipe.poll(timeout=5):
+            return
+
+        # We failed to read a response from the pipe, so see if the test raised
+        # an exception with more information.
+        if future.done():
+            exception = future.exception(timeout=0)
+            if exception is not None:
+                raise exception
+
+        self.fail('test failed to indicate readiness')
+
+    def test_lockout_transaction_kdc(self):
+        self.do_lockout_transaction(connect_kdc)
+
+    def test_lockout_transaction_ntlm(self):
+        self.do_lockout_transaction(connect_ntlm)
+
+    def test_lockout_transaction_samr(self):
+        self.do_lockout_transaction(connect_samr)
+
+    def test_lockout_transaction_samr_aes(self):
+        if not self.gnutls_pbkdf2_support:
+            self.skipTest('gnutls_pbkdf2() is not available')
+        self.do_lockout_transaction(connect_samr_aes)
+
+    def test_lockout_transaction_ldap_pw_change(self):
+        self.do_lockout_transaction(ldap_pwd_change)
+
+    # Tests to ensure we can handle the account being renamed. We do not test
+    # renames with SAMR password changes, because in that case the entire
+    # process happens inside a transaction, and the password change method only
+    # receives the account username. By the time it searches for the account,
+    # it will have already been renamed, and so it will always fail to find the
+    # account.
+
+    def test_lockout_transaction_rename_kdc(self):
+        self.do_lockout_transaction(connect_kdc, rename=True)
+
+    def test_lockout_transaction_rename_ntlm(self):
+        self.do_lockout_transaction(connect_ntlm, rename=True)
+
+    def test_lockout_transaction_rename_ldap_pw_change(self):
+        self.do_lockout_transaction(ldap_pwd_change, rename=True)
+
+    def test_lockout_transaction_bad_pwd_kdc(self):
+        self.do_lockout_transaction(connect_kdc, correct_pw=False)
+
+    def test_lockout_transaction_bad_pwd_ntlm(self):
+        self.do_lockout_transaction(connect_ntlm, correct_pw=False)
+
+    def test_lockout_transaction_bad_pwd_samr(self):
+        self.do_lockout_transaction(connect_samr, correct_pw=False)
+
+    def test_lockout_transaction_bad_pwd_samr_aes(self):
+        if not self.gnutls_pbkdf2_support:
+            self.skipTest('gnutls_pbkdf2() is not available')
+        self.do_lockout_transaction(connect_samr_aes, correct_pw=False)
+
+    def test_lockout_transaction_bad_pwd_ldap_pw_change(self):
+        self.do_lockout_transaction(ldap_pwd_change, correct_pw=False)
+
+    def test_bad_pwd_count_transaction_kdc(self):
+        self.do_bad_pwd_count_transaction(connect_kdc)
+
+    def test_bad_pwd_count_transaction_ntlm(self):
+        self.do_bad_pwd_count_transaction(connect_ntlm)
+
+    def test_bad_pwd_count_transaction_samr(self):
+        self.do_bad_pwd_count_transaction(connect_samr)
+
+    def test_bad_pwd_count_transaction_samr_aes(self):
+        if not self.gnutls_pbkdf2_support:
+            self.skipTest('gnutls_pbkdf2() is not available')
+        self.do_bad_pwd_count_transaction(connect_samr_aes)
+
+    def test_bad_pwd_count_transaction_ldap_pw_change(self):
+        self.do_bad_pwd_count_transaction(ldap_pwd_change)
+
+    def test_bad_pwd_count_transaction_rename_kdc(self):
+        self.do_bad_pwd_count_transaction(connect_kdc, rename=True)
+
+    def test_bad_pwd_count_transaction_rename_ntlm(self):
+        self.do_bad_pwd_count_transaction(connect_ntlm, rename=True)
+
+    def test_bad_pwd_count_transaction_rename_ldap_pw_change(self):
+        self.do_bad_pwd_count_transaction(ldap_pwd_change, rename=True)
+
+    def test_lockout_race_kdc(self):
+        self.do_lockout_race(connect_kdc)
+
+    def test_lockout_race_ntlm(self):
+        self.do_lockout_race(connect_ntlm)
+
+    def test_lockout_race_samr(self):
+        self.do_lockout_race(connect_samr)
+
+    def test_lockout_race_samr_aes(self):
+        if not self.gnutls_pbkdf2_support:
+            self.skipTest('gnutls_pbkdf2() is not available')
+        self.do_lockout_race(connect_samr_aes)
+
+    def test_lockout_race_ldap_pw_change(self):
+        self.do_lockout_race(ldap_pwd_change)
+
+    def test_logon_without_transaction_ntlm(self):
+        self.do_logon_without_transaction(connect_ntlm)
+
+    # Tests to ensure that the connection functions work correctly in the happy
+    # path.
+
+    def test_logon_kdc(self):
+        self.do_logon(partial(connect_kdc, expect_error=False))
+
+    def test_logon_ntlm(self):
+        self.do_logon(connect_ntlm)
+
+    def test_logon_samr(self):
+        self.do_logon(connect_samr)
+
+    def test_logon_samr_aes(self):
+        if not self.gnutls_pbkdf2_support:
+            self.skipTest('gnutls_pbkdf2() is not available')
+        self.do_logon(connect_samr_aes)
+
+    def test_logon_ldap_pw_change(self):
+        self.do_logon(ldap_pwd_change)
+
+    # Test that connection without a correct password works.
+    def do_logon(self, connect_fn):
+        # Create the user account for testing.
+        user_creds = self.get_cached_creds(account_type=self.AccountType.USER,
+                                           use_cache=False)
+        user_dn = user_creds.get_dn()
+
+        admin_creds = self.get_admin_creds()
+        lp = self.get_lp()
+
+        # Get a connection to our local SamDB.
+        samdb = connect_samdb(samdb_url=lp.samdb_url(), lp=lp,
+                              credentials=admin_creds)
+        self.assertLocalSamDB(samdb)
+
+        password = user_creds.get_password()
+
+        # Prepare to connect to the server with a valid password.
+        our_pipe, their_pipe = Pipe(duplex=True)
+
+        # Inform the test function that it may proceed.
+        our_pipe.send_bytes(b'0')
+
+        result = connect_fn(pipe=their_pipe,
+                            url=f'ldap://{samdb.host_dns_name()}',
+                            hostname=samdb.host_dns_name(),
+                            username=user_creds.get_username(),
+                            password=password,
+                            domain=user_creds.get_domain(),
+                            realm=user_creds.get_realm(),
+                            workstation=user_creds.get_workstation(),
+                            dn=str(user_dn))
+
+        # The connection should succeed.
+        self.assertEqual(result, ConnectionResult.SUCCESS)
+
+    # Lock out the account while holding a transaction lock, then release the
+    # lock. A logon attempt already in progress should reread the account
+    # details and recognise the account is locked out. The account can
+    # additionally be renamed within the transaction to ensure that, by using
+    # the GUID, rereading the account's details still succeeds.
+    def do_lockout_transaction(self, connect_fn,
+                               rename=False,
+                               correct_pw=True):
+        # Create the user account for testing.
+        user_creds = self.get_cached_creds(account_type=self.AccountType.USER,
+                                           use_cache=False)
+        user_dn = user_creds.get_dn()
+
+        admin_creds = self.get_admin_creds()
+        lp = self.get_lp()
+
+        # Get a connection to our local SamDB.
+        samdb = connect_samdb(samdb_url=lp.samdb_url(), lp=lp,
+                              credentials=admin_creds)
+        self.assertLocalSamDB(samdb)
+
+        password = user_creds.get_password()
+        if not correct_pw:
+            password = password[:-1]
+
+        # Prepare to connect to the server.
+        with futures.ProcessPoolExecutor(max_workers=1) as executor:
+            our_pipe, their_pipe = Pipe(duplex=True)
+            connect_future = executor.submit(
+                connect_fn,
+                pipe=their_pipe,
+                url=f'ldap://{samdb.host_dns_name()}',
+                hostname=samdb.host_dns_name(),
+                username=user_creds.get_username(),
+                password=password,
+                domain=user_creds.get_domain(),
+                realm=user_creds.get_realm(),
+                workstation=user_creds.get_workstation(),
+                dn=str(user_dn))
+
+            # Wait until the test process indicates it's ready.
+            self.wait_for_ready(our_pipe, connect_future)
+
+            # Take out a transaction.
+            samdb.transaction_start()
+            try:
+                # Lock out the account. We must do it using an actual password
+                # check like so, rather than directly with a database
+                # modification, so that the account is also added to the
+                # auxiliary bad password database.
+
+                old_utf16pw = f'"Secret007"'.encode('utf-16le')  # invalid pwd
+                new_utf16pw = f'"Secret008"'.encode('utf-16le')
+
+                msg = ldb.Message(user_dn)
+                msg['0'] = ldb.MessageElement(old_utf16pw,
+                                              ldb.FLAG_MOD_DELETE,
+                                              'unicodePwd')
+                msg['1'] = ldb.MessageElement(new_utf16pw,
+                                              ldb.FLAG_MOD_ADD,
+                                              'unicodePwd')
+
+                for i in range(self.lockout_threshold):
+                    try:
+                        samdb.modify(msg)
+                    except ldb.LdbError as err:
+                        num, estr = err.args
+
+                        # We get an error, but the bad password count should
+                        # still be updated.
+                        self.assertEqual(num, ldb.ERR_OPERATIONS_ERROR)
+                        self.assertEqual('Failed to obtain remote address for '
+                                         'the LDAP client while changing the '
+                                         'password',
+                                         estr)
+                    else:
+                        self.fail('pwd change should have failed')
+
+                # Ensure the account is locked out.
+
+                res = samdb.search(
+                    user_dn, scope=ldb.SCOPE_BASE,
+                    attrs=['msDS-User-Account-Control-Computed'])
+                self.assertEqual(1, len(res))
+
+                uac = int(res[0].get('msDS-User-Account-Control-Computed',
+                                     idx=0))
+                self.assertTrue(uac & dsdb.UF_LOCKOUT)
+
+                # Now the bad password database has been updated, inform the
+                # test process that it may proceed.
+                our_pipe.send_bytes(b'0')
+
+                # Wait one second to ensure the test process hits the
+                # transaction lock.
+                time.sleep(1)
+
+                if rename:
+                    # While we're at it, rename the account to ensure that is
+                    # also safe if a race occurs.
+                    msg = ldb.Message(user_dn)
+                    new_username = self.get_new_username()
+                    msg['sAMAccountName'] = ldb.MessageElement(
+                        new_username,
+                        ldb.FLAG_MOD_REPLACE,
+                        'sAMAccountName')
+                    samdb.modify(msg)
+
+            except Exception:
+                samdb.transaction_cancel()
+                raise
+
+            # Commit the local transaction.
+            samdb.transaction_commit()
+
+            result = connect_future.result(timeout=5)
+            self.assertEqual(result, ConnectionResult.LOCKED_OUT)
+
+    # Update the bad password count while holding a transaction lock, then
+    # release the lock. A logon attempt already in progress should reread the
+    # account details and ensure the bad password count is atomically
+    # updated. The account can additionally be renamed within the transaction
+    # to ensure that, by using the GUID, rereading the account's details still
+    # succeeds.
+    def do_bad_pwd_count_transaction(self, connect_fn, rename=False):
+        # Create the user account for testing.
+        user_creds = self.get_cached_creds(account_type=self.AccountType.USER,
+                                           use_cache=False)
+        user_dn = user_creds.get_dn()
+
+        admin_creds = self.get_admin_creds()
+        lp = self.get_lp()
+
+        # Get a connection to our local SamDB.
+        samdb = connect_samdb(samdb_url=lp.samdb_url(), lp=lp,
+                              credentials=admin_creds)
+        self.assertLocalSamDB(samdb)
+
+        # Prepare to connect to the server with an invalid password.
+        with futures.ProcessPoolExecutor(max_workers=1) as executor:
+            our_pipe, their_pipe = Pipe(duplex=True)
+            connect_future = executor.submit(
+                connect_fn,
+                pipe=their_pipe,
+                url=f'ldap://{samdb.host_dns_name()}',
+                hostname=samdb.host_dns_name(),
+                username=user_creds.get_username(),
+                password=user_creds.get_password()[:-1],  # invalid password
+                domain=user_creds.get_domain(),
+                realm=user_creds.get_realm(),
+                workstation=user_creds.get_workstation(),
+                dn=str(user_dn))
+
+            # Wait until the test process indicates it's ready.
+            self.wait_for_ready(our_pipe, connect_future)
+
+            # Take out a transaction.
+            samdb.transaction_start()
+            try:
+                # Inform the test process that it may proceed.
+                our_pipe.send_bytes(b'0')
+
+                # Wait one second to ensure the test process hits the
+                # transaction lock.
+                time.sleep(1)
+
+                # Set badPwdCount to 1.
+                msg = ldb.Message(user_dn)
+                now = int(time.time())
+                bad_pwd_time = unix2nttime(now)
+                msg['badPwdCount'] = ldb.MessageElement(
+                    '1',
+                    ldb.FLAG_MOD_REPLACE,
+                    'badPwdCount')
+                msg['badPasswordTime'] = ldb.MessageElement(
+                    str(bad_pwd_time),
+                    ldb.FLAG_MOD_REPLACE,
+                    'badPasswordTime')
+                if rename:
+                    # While we're at it, rename the account to ensure that is
+                    # also safe if a race occurs.
+                    new_username = self.get_new_username()
+                    msg['sAMAccountName'] = ldb.MessageElement(
+                        new_username,
+                        ldb.FLAG_MOD_REPLACE,
+                        'sAMAccountName')
+                samdb.modify(msg)
+
+                # Ensure the account is not yet locked out.
+
+                res = samdb.search(
+                    user_dn, scope=ldb.SCOPE_BASE,
+                    attrs=['msDS-User-Account-Control-Computed'])
+                self.assertEqual(1, len(res))
+
+                uac = int(res[0].get('msDS-User-Account-Control-Computed',
+                                     idx=0))
+                self.assertFalse(uac & dsdb.UF_LOCKOUT)
+            except Exception:
+                samdb.transaction_cancel()
+                raise
+
+            # Commit the local transaction.
+            samdb.transaction_commit()
+
+            result = connect_future.result(timeout=5)
+            self.assertEqual(result, ConnectionResult.WRONG_PASSWORD, result)
+
+        # Check that badPwdCount has now increased to 2.
+
+        res = samdb.search(user_dn,
+                           scope=ldb.SCOPE_BASE,
+                           attrs=['badPwdCount'])
+        self.assertEqual(1, len(res))
+
+        bad_pwd_count = int(res[0].get('badPwdCount', idx=0))
+        self.assertEqual(2, bad_pwd_count)
+
+    # Attempt to log in to the account with an incorrect password, using
+    # lockoutThreshold+1 simultaneous attempts. We should get three 'wrong
+    # password' errors and one 'locked out' error, showing that the bad
+    # password count is checked and incremented atomically.
+    def do_lockout_race(self, connect_fn):
+        # Create the user account for testing.
+        user_creds = self.get_cached_creds(account_type=self.AccountType.USER,
+                                           use_cache=False)
+        user_dn = user_creds.get_dn()
+
+        admin_creds = self.get_admin_creds()
+        lp = self.get_lp()
+
+        # Get a connection to our local SamDB.
+        samdb = connect_samdb(samdb_url=lp.samdb_url(), lp=lp,
+                              credentials=admin_creds)
+        self.assertLocalSamDB(samdb)
+
+        # Prepare to connect to the server with an invalid password, using four
+        # simultaneous requests. Only three of those attempts should get
+        # through before the account is locked out.
+        num_attempts = self.lockout_threshold + 1
+        with futures.ProcessPoolExecutor(max_workers=num_attempts) as executor:
+            connect_futures = []
+            our_pipes = []
+            for i in range(num_attempts):
+                our_pipe, their_pipe = Pipe(duplex=True)
+                our_pipes.append(our_pipe)
+
+                connect_future = executor.submit(
+                    connect_fn,
+                    pipe=their_pipe,
+                    url=f'ldap://{samdb.host_dns_name()}',
+                    hostname=samdb.host_dns_name(),
+                    username=user_creds.get_username(),
+                    password=user_creds.get_password()[:-1],  # invalid pw
+                    domain=user_creds.get_domain(),
+                    realm=user_creds.get_realm(),
+                    workstation=user_creds.get_workstation(),
+                    dn=str(user_dn))
+                connect_futures.append(connect_future)
+
+                # Wait until the test process indicates it's ready.
+                self.wait_for_ready(our_pipe, connect_future)
+
+            # Take out a transaction.
+            samdb.transaction_start()
+            try:
+                # Inform the test processes that they may proceed.
+                for our_pipe in our_pipes:
+                    our_pipe.send_bytes(b'0')
+
+                # Wait one second to ensure the test processes hit the
+                # transaction lock.
+                time.sleep(1)
+            except Exception:
+                samdb.transaction_cancel()
+                raise
+
+            # Commit the local transaction.
+            samdb.transaction_commit()
+
+            lockouts = 0
+            wrong_passwords = 0
+            for i, connect_future in enumerate(connect_futures):
+                result = connect_future.result(timeout=5)
+                if result == ConnectionResult.LOCKED_OUT:
+                    lockouts += 1
+                elif result == ConnectionResult.WRONG_PASSWORD:
+                    wrong_passwords += 1
+                else:
+                    self.fail(f'process {i} gave an unexpected result '
+                              f'{result}')
+
+            self.assertEqual(wrong_passwords, self.lockout_threshold)
+            self.assertEqual(lockouts, num_attempts - self.lockout_threshold)
+
+        # Ensure the account is now locked out.
+
+        res = samdb.search(
+            user_dn, scope=ldb.SCOPE_BASE,
+            attrs=['badPwdCount',
+                   'msDS-User-Account-Control-Computed'])
+        self.assertEqual(1, len(res))
+
+        bad_pwd_count = int(res[0].get('badPwdCount', idx=0))
+        self.assertEqual(self.lockout_threshold, bad_pwd_count)
+
+        uac = int(res[0].get('msDS-User-Account-Control-Computed',
+                             idx=0))
+        self.assertTrue(uac & dsdb.UF_LOCKOUT)
+
+    # Test that logon is possible even while we locally hold a transaction
+    # lock. This test only works with NTLM authentication; Kerberos
+    # authentication must take out a transaction to update the logonCount
+    # attribute, and LDAP and SAMR password changes both take out a transaction
+    # to effect the password change. NTLM is the only logon method that does
+    # not require a transaction, and can thus be performed while we're holding
+    # the lock.
+    def do_logon_without_transaction(self, connect_fn):
+        # Create the user account for testing.
+        user_creds = self.get_cached_creds(account_type=self.AccountType.USER,
+                                           use_cache=False)
+        user_dn = user_creds.get_dn()
+
+        admin_creds = self.get_admin_creds()
+        lp = self.get_lp()
+
+        # Get a connection to our local SamDB.
+        samdb = connect_samdb(samdb_url=lp.samdb_url(), lp=lp,
+                              credentials=admin_creds)
+        self.assertLocalSamDB(samdb)
+
+        password = user_creds.get_password()
+
+        # Prepare to connect to the server with a valid password.
+        with futures.ProcessPoolExecutor(max_workers=1) as executor:
+            our_pipe, their_pipe = Pipe(duplex=True)
+            connect_future = executor.submit(
+                connect_fn,
+                pipe=their_pipe,
+                url=f'ldap://{samdb.host_dns_name()}',
+                hostname=samdb.host_dns_name(),
+                username=user_creds.get_username(),
+                password=password,
+                domain=user_creds.get_domain(),
+                realm=user_creds.get_realm(),
+                workstation=user_creds.get_workstation(),
+                dn=str(user_dn))
+
+            # Wait until the test process indicates it's ready.
+            self.wait_for_ready(our_pipe, connect_future)
+
+            # Take out a transaction.
+            samdb.transaction_start()
+            try:
+                # Inform the test process that it may proceed.
+                our_pipe.send_bytes(b'0')
+
+                # The connection should succeed, despite our holding a
+                # transaction.
+                result = connect_future.result(timeout=5)
+                self.assertEqual(result, ConnectionResult.SUCCESS)
+            except Exception:
+                samdb.transaction_cancel()
+                raise
+
+            # Commit the local transaction.
+            samdb.transaction_commit()
+
+
+if __name__ == '__main__':
+    global_asn1_print = False
+    global_hexdump = False
+    import unittest
+    unittest.main()
index 34051cf301eb582fce252931b91700df429da68a..2cffd18b098f777e758387ca63320638c47f0208 100644 (file)
@@ -50,6 +50,7 @@ from samba.tests.krb5.rfc4120_constants import (
     AD_IF_RELEVANT,
     AD_WIN2K_PAC,
     FX_FAST_ARMOR_AP_REQUEST,
+    KDC_ERR_CLIENT_REVOKED,
     KDC_ERR_GENERIC,
     KDC_ERR_POLICY,
     KDC_ERR_PREAUTH_FAILED,
@@ -641,6 +642,13 @@ class RawKerberosTest(TestCaseInTempDir):
             tkt_sig_support = '0'
         cls.tkt_sig_support = bool(int(tkt_sig_support))
 
+        gnutls_pbkdf2_support = samba.tests.env_get_var_value(
+            'GNUTLS_PBKDF2_SUPPORT',
+            allow_missing=True)
+        if gnutls_pbkdf2_support is None:
+            gnutls_pbkdf2_support = '1'
+        cls.gnutls_pbkdf2_support = bool(int(gnutls_pbkdf2_support))
+
         expect_pac = samba.tests.env_get_var_value('EXPECT_PAC',
                                                    allow_missing=True)
         if expect_pac is None:
@@ -3756,7 +3764,7 @@ class RawKerberosTest(TestCaseInTempDir):
                 expected_patypes += (PADATA_ETYPE_INFO2,)
 
             if error_code not in (KDC_ERR_PREAUTH_FAILED, KDC_ERR_SKEW,
-                                  KDC_ERR_POLICY):
+                                  KDC_ERR_POLICY, KDC_ERR_CLIENT_REVOKED):
                 if sent_fast:
                     expected_patypes += (PADATA_ENCRYPTED_CHALLENGE,)
                 else:
index 7d20093f97dce615798e4c7307825d536877d8b9..16527f1359392147ca0d2f81cca11ba359f212cb 100644 (file)
@@ -88,6 +88,7 @@ KDC_ERR_POLICY = 12
 KDC_ERR_BADOPTION = 13
 KDC_ERR_ETYPE_NOSUPP = 14
 KDC_ERR_SUMTYPE_NOSUPP = 15
+KDC_ERR_CLIENT_REVOKED = 18
 KDC_ERR_TGT_REVOKED = 20
 KDC_ERR_PREAUTH_FAILED = 24
 KDC_ERR_PREAUTH_REQUIRED = 25
index a7d51eab2b56c23ad9c7a7935a4f83372f19e1e0..ffb52c06f867b67071e820eef8423b8c10d3483b 100644 (file)
@@ -114,6 +114,7 @@ EXCLUDE_USAGE = {
     'python/samba/tests/krb5/nt_hash_tests.py',
     'python/samba/tests/krb5/kpasswd_tests.py',
     'python/samba/tests/krb5/claims_tests.py',
+    'python/samba/tests/krb5/lockout_tests.py',
 }
 
 EXCLUDE_HELP = {
diff --git a/selftest/flapping.d/ldap-pwd-change-race b/selftest/flapping.d/ldap-pwd-change-race
new file mode 100644 (file)
index 0000000..54ed56c
--- /dev/null
@@ -0,0 +1,5 @@
+# This test currently depends on a race. The password_hash dsdb module
+# relinquishes and immediately reacquires a transaction lock, and another
+# process may be able to acquire it during the short period of time in which it
+# is not held.
+^samba.tests.krb5.lockout_tests.samba.tests.krb5.lockout_tests.LockoutTests.test_lockout_race_ldap_pw_change.ad_dc:local
index 99f687e32126abe1b466176fc7925a1d386ee5b6..13bdb9691a70004857cd732476c85b0afd1e5168 100644 (file)
 ^samba.tests.krb5.claims_tests.samba.tests.krb5.claims_tests.ClaimsTests.test_tgs_claims_remove_claims.ad_dc
 ^samba.tests.krb5.claims_tests.samba.tests.krb5.claims_tests.ClaimsTests.test_tgs_claims_remove_claims_to_krbtgt.ad_dc
 ^samba.tests.krb5.claims_tests.samba.tests.krb5.claims_tests.ClaimsTests.test_tgs_claims_to_krbtgt.ad_dc
+#
+# Lockout tests
+#
+^samba.tests.krb5.lockout_tests.samba.tests.krb5.lockout_tests.LockoutTests.test_bad_pwd_count_transaction_kdc.ad_dc:local
+^samba.tests.krb5.lockout_tests.samba.tests.krb5.lockout_tests.LockoutTests.test_bad_pwd_count_transaction_ntlm.ad_dc:local
+^samba.tests.krb5.lockout_tests.samba.tests.krb5.lockout_tests.LockoutTests.test_bad_pwd_count_transaction_rename_kdc.ad_dc:local
+^samba.tests.krb5.lockout_tests.samba.tests.krb5.lockout_tests.LockoutTests.test_bad_pwd_count_transaction_rename_ntlm.ad_dc:local
+^samba.tests.krb5.lockout_tests.samba.tests.krb5.lockout_tests.LockoutTests.test_bad_pwd_count_transaction_samr.ad_dc:local
+^samba.tests.krb5.lockout_tests.samba.tests.krb5.lockout_tests.LockoutTests.test_lockout_race_kdc.ad_dc:local
+^samba.tests.krb5.lockout_tests.samba.tests.krb5.lockout_tests.LockoutTests.test_lockout_race_ntlm.ad_dc:local
index e336ae4ea94289e263deaa132fc8b8133fc66a37..a905af24892c87212919ecccf3fe5b847a79fddb 100644 (file)
@@ -528,3 +528,17 @@ samba.tests.krb5.as_canonicalization_tests.samba.tests.krb5.as_canonicalization_
 ^samba.tests.krb5.claims_tests.samba.tests.krb5.claims_tests.ClaimsTests.test_tgs_claims_remove_claims.ad_dc
 ^samba.tests.krb5.claims_tests.samba.tests.krb5.claims_tests.ClaimsTests.test_tgs_claims_remove_claims_to_krbtgt.ad_dc
 ^samba.tests.krb5.claims_tests.samba.tests.krb5.claims_tests.ClaimsTests.test_tgs_claims_to_krbtgt.ad_dc
+#
+# Lockout tests
+#
+^samba.tests.krb5.lockout_tests.samba.tests.krb5.lockout_tests.LockoutTests.test_bad_pwd_count_transaction_kdc.ad_dc:local
+^samba.tests.krb5.lockout_tests.samba.tests.krb5.lockout_tests.LockoutTests.test_bad_pwd_count_transaction_ntlm.ad_dc:local
+^samba.tests.krb5.lockout_tests.samba.tests.krb5.lockout_tests.LockoutTests.test_bad_pwd_count_transaction_rename_kdc.ad_dc:local
+^samba.tests.krb5.lockout_tests.samba.tests.krb5.lockout_tests.LockoutTests.test_bad_pwd_count_transaction_rename_ntlm.ad_dc:local
+^samba.tests.krb5.lockout_tests.samba.tests.krb5.lockout_tests.LockoutTests.test_bad_pwd_count_transaction_samr.ad_dc:local
+^samba.tests.krb5.lockout_tests.samba.tests.krb5.lockout_tests.LockoutTests.test_lockout_race_kdc.ad_dc:local
+^samba.tests.krb5.lockout_tests.samba.tests.krb5.lockout_tests.LockoutTests.test_lockout_race_ntlm.ad_dc:local
+^samba.tests.krb5.lockout_tests.samba.tests.krb5.lockout_tests.LockoutTests.test_lockout_transaction_bad_pwd_kdc.ad_dc:local
+^samba.tests.krb5.lockout_tests.samba.tests.krb5.lockout_tests.LockoutTests.test_lockout_transaction_kdc.ad_dc:local
+^samba.tests.krb5.lockout_tests.samba.tests.krb5.lockout_tests.LockoutTests.test_lockout_transaction_rename_kdc.ad_dc:local
+^samba.tests.krb5.lockout_tests.samba.tests.krb5.lockout_tests.LockoutTests.test_logon_kdc.ad_dc:local
index eeb8f75534c617ff45624adcc5f33b488bda49bc..e63e9b40bf814ae7dff96fc56c2c9c1341b3167b 100755 (executable)
@@ -1002,6 +1002,8 @@ if ('SAMBA4_USES_HEIMDAL' in config_hash or
 else:
     tkt_sig_support = 0
 
+gnutls_pbkdf2_support = int('HAVE_GNUTLS_PBKDF2' in config_hash)
+
 if 'HAVE_MIT_KRB5_1_20' in config_hash:
     kadmin_is_tgs = 1
 else:
@@ -1022,6 +1024,7 @@ krb5_environ = {
     'CLAIMS_SUPPORT': claims_support,
     'COMPOUND_ID_SUPPORT': compound_id_support,
     'TKT_SIG_SUPPORT': tkt_sig_support,
+    'GNUTLS_PBKDF2_SUPPORT': gnutls_pbkdf2_support,
     'EXPECT_PAC': expect_pac,
     'EXPECT_EXTRA_PAC_BUFFERS': extra_pac_buffers,
     'CHECK_CNAME': check_cname,
@@ -1731,6 +1734,10 @@ planoldpythontestsuite(
     'ad_dc',
     'samba.tests.krb5.claims_tests',
     environ=krb5_environ)
+planoldpythontestsuite(
+    'ad_dc:local',
+    'samba.tests.krb5.lockout_tests',
+    environ=krb5_environ)
 
 for env in [
         'vampire_dc',