realm,
workstation,
dn,
- expect_error=True):
+ expect_error=True,
+ expect_status=None):
AsReqBaseTest.setUpClass()
as_req_base = AsReqBaseTest()
as_req_base.setUp()
etypes = as_req_base.get_default_enctypes(user_creds)
+ # Remove the LDAP connection.
+ del type(as_req_base)._ldb
+
if expect_error:
expected_error_modes = (KDC_ERR_CLIENT_REVOKED,
KDC_ERR_PREAUTH_FAILED)
+
+ # Wrap generic_check_kdc_error() to expect an NTSTATUS code when the
+ # account is locked out.
+ def check_error_fn(kdc_exchange_dict,
+ callback_dict,
+ rep):
+ error_code = rep.get('error-code')
+ if error_code == KDC_ERR_CLIENT_REVOKED:
+ # The account was locked out.
+ if expect_status:
+ # Expect to get a LOCKED_OUT NTSTATUS code.
+ kdc_exchange_dict['expect_edata'] = True
+ kdc_exchange_dict['expect_status'] = True
+ kdc_exchange_dict['expected_status'] = (
+ ntstatus.NT_STATUS_ACCOUNT_LOCKED_OUT)
+
+ elif error_code == KDC_ERR_PREAUTH_FAILED:
+ # Just a wrong password: the account wasn’t locked out. Don’t
+ # expect an NTSTATUS code.
+ kdc_exchange_dict['expect_status'] = False
+
+ # Continue with the generic error-checking logic.
+ return as_req_base.generic_check_kdc_error(
+ kdc_exchange_dict,
+ callback_dict,
+ rep)
+
+ check_rep_fn = None
else:
expected_error_modes = 0
- # Remove the LDAP connection.
- del type(as_req_base)._ldb
+ check_error_fn = None
+ check_rep_fn = as_req_base.generic_check_kdc_rep
- # Indicate that we're ready. This ensures we hit the right transaction
- # lock.
- pipe.send_bytes(b'0')
+ def _generate_padata_copy(_kdc_exchange_dict,
+ _callback_dict,
+ req_body):
+ return padata, req_body
- # Wait for the main process to take out a transaction lock.
- if not pipe.poll(timeout=5):
- raise AssertionError('main process failed to indicate readiness')
-
- # Try making a Kerberos AS-REQ to the KDC. This should fail, either due to
- # the user's account being locked out or due to using the wrong password.
- as_rep, kdc_exchange_dict = as_req_base._test_as_exchange(
+ kdc_exchange_dict = as_req_base.as_exchange_dict(
creds=user_creds,
- cname=cname,
- realm=realm,
- sname=sname,
- till=till,
- expected_error_mode=expected_error_modes,
expected_crealm=realm,
expected_cname=cname,
expected_srealm=realm,
expected_sname=sname,
- expected_salt=expected_salt,
- etypes=etypes,
- padata=padata,
- kdc_options=kdc_options,
- expected_supported_etypes=krbtgt_supported_etypes,
expected_account_name=user_name,
- preauth_key=preauth_key,
+ expected_supported_etypes=krbtgt_supported_etypes,
ticket_decryption_key=krbtgt_decryption_key,
+ generate_padata_fn=_generate_padata_copy,
+ check_error_fn=check_error_fn,
+ check_rep_fn=check_rep_fn,
+ check_kdc_private_fn=as_req_base.generic_check_kdc_private,
+ expected_error_mode=expected_error_modes,
+ expected_salt=expected_salt,
+ preauth_key=preauth_key,
+ kdc_options=str(kdc_options),
pac_request=True)
+
+ # Indicate that we're ready. This ensures we hit the right transaction
+ # lock.
+ pipe.send_bytes(b'0')
+
+ # Wait for the main process to take out a transaction lock.
+ if not pipe.poll(timeout=5):
+ raise AssertionError('main process failed to indicate readiness')
+
+ # Try making a Kerberos AS-REQ to the KDC. This might fail, either due to
+ # the user's account being locked out or due to using the wrong password.
+ as_rep = as_req_base._generic_kdc_exchange(kdc_exchange_dict,
+ cname=cname,
+ realm=realm,
+ sname=sname,
+ till_time=till,
+ etypes=etypes)
+
as_req_base.assertIsNotNone(as_rep)
msg_type = as_rep['msg-type']
def test_lockout_transaction_kdc(self):
self.do_lockout_transaction(connect_kdc)
+ def test_lockout_transaction_kdc_ntstatus(self):
+ self.do_lockout_transaction(partial(connect_kdc, expect_status=True))
+
def test_lockout_transaction_ntlm(self):
self.do_lockout_transaction(connect_ntlm)
def test_lockout_transaction_rename_kdc(self):
self.do_lockout_transaction(connect_kdc, rename=True)
+ def test_lockout_transaction_rename_kdc_ntstatus(self):
+ self.do_lockout_transaction(partial(connect_kdc, expect_status=True),
+ rename=True)
+
def test_lockout_transaction_rename_ntlm(self):
self.do_lockout_transaction(connect_ntlm, rename=True)
def test_lockout_transaction_bad_pwd_kdc(self):
self.do_lockout_transaction(connect_kdc, correct_pw=False)
+ def test_lockout_transaction_bad_pwd_kdc_ntstatus(self):
+ self.do_lockout_transaction(partial(connect_kdc, expect_status=True),
+ correct_pw=False)
+
def test_lockout_transaction_bad_pwd_ntlm(self):
self.do_lockout_transaction(connect_ntlm, correct_pw=False)
def test_lockout_race_kdc(self):
self.do_lockout_race(connect_kdc)
+ def test_lockout_race_kdc_ntstatus(self):
+ self.do_lockout_race(partial(connect_kdc, expect_status=True))
+
def test_lockout_race_ntlm(self):
self.do_lockout_race(connect_ntlm)