CVE-2022-2031 tests/krb5: Add kpasswd_exchange() method
authorJoseph Sutton <josephsutton@catalyst.net.nz>
Tue, 24 May 2022 07:57:57 +0000 (19:57 +1200)
committerJule Anger <janger@samba.org>
Wed, 27 Jul 2022 10:52:36 +0000 (10:52 +0000)
Now we can test the kpasswd service from Python.

BUG: https://bugzilla.samba.org/show_bug.cgi?id=15047
BUG: https://bugzilla.samba.org/show_bug.cgi?id=15049
BUG: https://bugzilla.samba.org/show_bug.cgi?id=15074

Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Andreas Schneider <asn@samba.org>
python/samba/tests/krb5/raw_testcase.py

index b16b78ebf562fed090c5d8d70b7e8c78cced9cfe..b22617c38820260a43e8d454c441fe5b272b86c2 100644 (file)
@@ -26,6 +26,8 @@ import binascii
 import itertools
 import collections
 
+from enum import Enum
+
 from pyasn1.codec.der.decoder import decode as pyasn1_der_decode
 from pyasn1.codec.der.encoder import encode as pyasn1_der_encode
 from pyasn1.codec.native.decoder import decode as pyasn1_native_decode
@@ -33,6 +35,8 @@ from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
 
 from pyasn1.codec.ber.encoder import BitStringEncoder
 
+from pyasn1.error import PyAsn1Error
+
 from samba.credentials import Credentials
 from samba.dcerpc import krb5pac, security
 from samba.gensec import FEATURE_SEAL
@@ -52,6 +56,7 @@ from samba.tests.krb5.rfc4120_constants import (
     KDC_ERR_SKEW,
     KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS,
     KERB_ERR_TYPE_EXTENDED,
+    KRB_AP_REP,
     KRB_AP_REQ,
     KRB_AS_REP,
     KRB_AS_REQ,
@@ -61,6 +66,7 @@ from samba.tests.krb5.rfc4120_constants import (
     KRB_TGS_REQ,
     KU_AP_REQ_AUTH,
     KU_AS_REP_ENC_PART,
+    KU_AP_REQ_ENC_PART,
     KU_AS_REQ,
     KU_ENC_CHALLENGE_KDC,
     KU_FAST_ENC,
@@ -76,6 +82,7 @@ from samba.tests.krb5.rfc4120_constants import (
     KU_TGS_REQ_AUTH_DAT_SESSION,
     KU_TGS_REQ_AUTH_DAT_SUBKEY,
     KU_TICKET,
+    NT_PRINCIPAL,
     NT_SRV_INST,
     NT_WELLKNOWN,
     PADATA_ENCRYPTED_CHALLENGE,
@@ -525,6 +532,10 @@ class KerberosTicketCreds:
 class RawKerberosTest(TestCaseInTempDir):
     """A raw Kerberos Test case."""
 
+    class KpasswdMode(Enum):
+        SET = object()
+        CHANGE = object()
+
     pac_checksum_types = {krb5pac.PAC_TYPE_SRV_CHECKSUM,
                           krb5pac.PAC_TYPE_KDC_CHECKSUM,
                           krb5pac.PAC_TYPE_TICKET_CHECKSUM}
@@ -1931,6 +1942,224 @@ class RawKerberosTest(TestCaseInTempDir):
 
         return msg
 
+    def get_enc_part(self, obj, key, usage):
+        self.assertElementEqual(obj, 'pvno', 5)
+
+        enc_part = obj['enc-part']
+        self.assertElementEqual(enc_part, 'etype', key.etype)
+        self.assertElementKVNO(enc_part, 'kvno', key.kvno)
+
+        enc_part = key.decrypt(usage, enc_part['cipher'])
+
+        return enc_part
+
+    def kpasswd_exchange(self,
+                         ticket,
+                         new_password,
+                         expected_code,
+                         expected_msg,
+                         mode,
+                         target_princ=None,
+                         target_realm=None,
+                         ap_options=None,
+                         send_seq_number=True):
+        if mode is self.KpasswdMode.SET:
+            version = 0xff80
+            user_data = self.ChangePasswdDataMS_create(new_password,
+                                                       target_princ,
+                                                       target_realm)
+        elif mode is self.KpasswdMode.CHANGE:
+            self.assertIsNone(target_princ,
+                              'target_princ only valid for pw set')
+            self.assertIsNone(target_realm,
+                              'target_realm only valid for pw set')
+
+            version = 1
+            user_data = new_password.encode('utf-8')
+        else:
+            self.fail(f'invalid mode {mode}')
+
+        subkey = self.RandomKey(kcrypto.Enctype.AES256)
+
+        if ap_options is None:
+            ap_options = '0'
+        ap_options = str(krb5_asn1.APOptions(ap_options))
+
+        kdc_exchange_dict = {
+            'tgt': ticket,
+            'authenticator_subkey': subkey,
+            'auth_data': None,
+            'ap_options': ap_options,
+        }
+
+        if send_seq_number:
+            seq_number = random.randint(0, 0xfffffffe)
+        else:
+            seq_number = None
+
+        ap_req = self.generate_ap_req(kdc_exchange_dict,
+                                      None,
+                                      req_body=None,
+                                      armor=False,
+                                      usage=KU_AP_REQ_AUTH,
+                                      seq_number=seq_number)
+
+        self.connect(self.host, port=464)
+        self.assertIsNotNone(self.s)
+
+        family = self.s.family
+
+        if family == socket.AF_INET:
+            addr_type = 2  # IPv4
+        elif family == socket.AF_INET6:
+            addr_type = 24  # IPv6
+        else:
+            self.fail(f'unknown family {family}')
+
+        def create_address(ip):
+            return {
+                'addr-type': addr_type,
+                'address': socket.inet_pton(family, ip),
+            }
+
+        local_ip = self.s.getsockname()[0]
+        local_address = create_address(local_ip)
+
+        # remote_ip = self.s.getpeername()[0]
+        # remote_address = create_address(remote_ip)
+
+        # TODO: due to a bug (?), MIT Kerberos will not accept the request
+        # unless r-address is set to our _local_ address. Heimdal, on the other
+        # hand, requires the r-address is set to the remote address (as
+        # expected). To avoid problems, avoid sending r-address for now.
+        remote_address = None
+
+        msg = self.kpasswd_create(subkey,
+                                  user_data,
+                                  version,
+                                  seq_number,
+                                  ap_req,
+                                  local_address,
+                                  remote_address)
+
+        self.send_msg(msg)
+        rep_pdu = self.recv_pdu_raw()
+
+        self._disconnect('transaction done')
+
+        self.assertIsNotNone(rep_pdu)
+
+        header = rep_pdu[:6]
+        reply = rep_pdu[6:]
+
+        reply_len = (header[0] << 8) | header[1]
+        reply_version = (header[2] << 8) | header[3]
+        ap_rep_len = (header[4] << 8) | header[5]
+
+        self.assertEqual(reply_len, len(rep_pdu))
+        self.assertEqual(1, reply_version)  # KRB5_KPASSWD_VERS_CHANGEPW
+        self.assertLess(ap_rep_len, reply_len)
+
+        self.assertNotEqual(0x7e, rep_pdu[1])
+        self.assertNotEqual(0x5e, rep_pdu[1])
+
+        if ap_rep_len:
+            # We received an AP-REQ and KRB-PRIV as a response. This may or may
+            # not indicate an error, depending on the status code.
+            ap_rep = reply[:ap_rep_len]
+            krb_priv = reply[ap_rep_len:]
+
+            key = ticket.session_key
+
+            ap_rep = self.der_decode(ap_rep, asn1Spec=krb5_asn1.AP_REP())
+            self.assertElementEqual(ap_rep, 'msg-type', KRB_AP_REP)
+            enc_part = self.get_enc_part(ap_rep, key, KU_AP_REQ_ENC_PART)
+            enc_part = self.der_decode(
+                enc_part, asn1Spec=krb5_asn1.EncAPRepPart())
+
+            self.assertElementPresent(enc_part, 'ctime')
+            self.assertElementPresent(enc_part, 'cusec')
+            # self.assertElementMissing(enc_part, 'subkey') # TODO
+            # self.assertElementPresent(enc_part, 'seq-number') # TODO
+
+            try:
+                krb_priv = self.der_decode(krb_priv, asn1Spec=krb5_asn1.KRB_PRIV())
+            except PyAsn1Error:
+                self.fail()
+
+            self.assertElementEqual(krb_priv, 'msg-type', KRB_PRIV)
+            priv_enc_part = self.get_enc_part(krb_priv, subkey, KU_KRB_PRIV)
+            priv_enc_part = self.der_decode(
+                priv_enc_part, asn1Spec=krb5_asn1.EncKrbPrivPart())
+
+            self.assertElementMissing(priv_enc_part, 'timestamp')
+            self.assertElementMissing(priv_enc_part, 'usec')
+            # self.assertElementPresent(priv_enc_part, 'seq-number') # TODO
+            # self.assertElementEqual(priv_enc_part, 's-address', remote_address) # TODO
+            # self.assertElementMissing(priv_enc_part, 'r-address') # TODO
+
+            result_data = priv_enc_part['user-data']
+        else:
+            # We received a KRB-ERROR as a response, indicating an error.
+            krb_error = self.der_decode(reply, asn1Spec=krb5_asn1.KRB_ERROR())
+
+            sname = self.PrincipalName_create(
+                name_type=NT_PRINCIPAL,
+                names=['kadmin', 'changepw'])
+            realm = self.get_krbtgt_creds().get_realm().upper()
+
+            self.assertElementEqual(krb_error, 'pvno', 5)
+            self.assertElementEqual(krb_error, 'msg-type', KRB_ERROR)
+            self.assertElementMissing(krb_error, 'ctime')
+            self.assertElementMissing(krb_error, 'usec')
+            self.assertElementPresent(krb_error, 'stime')
+            self.assertElementPresent(krb_error, 'susec')
+
+            error_code = krb_error['error-code']
+            if isinstance(expected_code, int):
+                self.assertEqual(error_code, expected_code)
+            else:
+                self.assertIn(error_code, expected_code)
+
+            self.assertElementMissing(krb_error, 'crealm')
+            self.assertElementMissing(krb_error, 'cname')
+            self.assertElementEqual(krb_error, 'realm', realm.encode('utf-8'))
+            self.assertElementEqualPrincipal(krb_error, 'sname', sname)
+            self.assertElementMissing(krb_error, 'e-text')
+
+            result_data = krb_error['e-data']
+
+        status = result_data[:2]
+        message = result_data[2:]
+
+        status_code = (status[0] << 8) | status[1]
+        if isinstance(expected_code, int):
+            self.assertEqual(status_code, expected_code)
+        else:
+            self.assertIn(status_code, expected_code)
+
+        if not message:
+            self.assertEqual(0, status_code,
+                             'got an error result, but no message')
+            return
+
+        # Check the first character of the message.
+        if message[0]:
+            if isinstance(expected_msg, bytes):
+                self.assertEqual(message, expected_msg)
+            else:
+                self.assertIn(message, expected_msg)
+        else:
+            # We got AD password policy information.
+            self.assertEqual(30, len(message))
+
+            (empty_bytes,
+             min_length,
+             history_length,
+             properties,
+             expire_time,
+             min_age) = struct.unpack('>HIIIQQ', message)
+
     def _generic_kdc_exchange(self,
                               kdc_exchange_dict,  # required
                               cname=None,  # optional
@@ -2041,7 +2270,7 @@ class RawKerberosTest(TestCaseInTempDir):
             self.assertIsNotNone(generate_fast_fn)
             fast_ap_req = generate_fast_armor_fn(kdc_exchange_dict,
                                                  callback_dict,
-                                                 req_body,
+                                                 None,
                                                  armor=True)
 
             fast_armor_type = kdc_exchange_dict['fast_armor_type']
@@ -3438,31 +3667,39 @@ class RawKerberosTest(TestCaseInTempDir):
                         kdc_exchange_dict,
                         _callback_dict,
                         req_body,
-                        armor):
+                        armor,
+                        usage=None,
+                        seq_number=None):
+        req_body_checksum = None
+
         if armor:
+            self.assertIsNone(req_body)
+
             tgt = kdc_exchange_dict['armor_tgt']
             authenticator_subkey = kdc_exchange_dict['armor_subkey']
-
-            req_body_checksum = None
         else:
             tgt = kdc_exchange_dict['tgt']
             authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
-            body_checksum_type = kdc_exchange_dict['body_checksum_type']
 
-            req_body_blob = self.der_encode(req_body,
-                                            asn1Spec=krb5_asn1.KDC_REQ_BODY())
+            if req_body is not None:
+                body_checksum_type = kdc_exchange_dict['body_checksum_type']
+
+                req_body_blob = self.der_encode(
+                    req_body, asn1Spec=krb5_asn1.KDC_REQ_BODY())
 
-            req_body_checksum = self.Checksum_create(tgt.session_key,
-                                                     KU_TGS_REQ_AUTH_CKSUM,
-                                                     req_body_blob,
-                                                     ctype=body_checksum_type)
+                req_body_checksum = self.Checksum_create(
+                    tgt.session_key,
+                    KU_TGS_REQ_AUTH_CKSUM,
+                    req_body_blob,
+                    ctype=body_checksum_type)
 
         auth_data = kdc_exchange_dict['auth_data']
 
         subkey_obj = None
         if authenticator_subkey is not None:
             subkey_obj = authenticator_subkey.export_obj()
-        seq_number = random.randint(0, 0xfffffffe)
+        if seq_number is None:
+            seq_number = random.randint(0, 0xfffffffe)
         (ctime, cusec) = self.get_KerberosTimeWithUsec()
         authenticator_obj = self.Authenticator_create(
             crealm=tgt.crealm,
@@ -3477,7 +3714,8 @@ class RawKerberosTest(TestCaseInTempDir):
             authenticator_obj,
             asn1Spec=krb5_asn1.Authenticator())
 
-        usage = KU_AP_REQ_AUTH if armor else KU_TGS_REQ_AUTH
+        if usage is None:
+            usage = KU_AP_REQ_AUTH if armor else KU_TGS_REQ_AUTH
         authenticator = self.EncryptedData_create(tgt.session_key,
                                                   usage,
                                                   authenticator_blob)