CVE-2022-2031 tests/krb5: Add methods to create ASN1 kpasswd structures
[samba.git] / python / samba / tests / krb5 / raw_testcase.py
index 8d2b84c9d7f748e4ecbc49127721c8f2572a48b8..2a67e6d991cb536493dedab2d54a3c2bc31d95f7 100644 (file)
@@ -56,6 +56,7 @@ from samba.tests.krb5.rfc4120_constants import (
     KRB_AS_REP,
     KRB_AS_REQ,
     KRB_ERROR,
+    KRB_PRIV,
     KRB_TGS_REP,
     KRB_TGS_REQ,
     KU_AP_REQ_AUTH,
@@ -66,6 +67,7 @@ from samba.tests.krb5.rfc4120_constants import (
     KU_FAST_FINISHED,
     KU_FAST_REP,
     KU_FAST_REQ_CHKSUM,
+    KU_KRB_PRIV,
     KU_NON_KERB_CKSUM_SALT,
     KU_TGS_REP_ENC_PART_SESSION,
     KU_TGS_REP_ENC_PART_SUB_KEY,
@@ -1825,6 +1827,99 @@ class RawKerberosTest(TestCaseInTempDir):
             PA_S4U2Self_obj, asn1Spec=krb5_asn1.PA_S4U2Self())
         return self.PA_DATA_create(PADATA_FOR_USER, pa_s4u2self)
 
+    def ChangePasswdDataMS_create(self,
+                                  new_password,
+                                  target_princ=None,
+                                  target_realm=None):
+        ChangePasswdDataMS_obj = {
+            'newpasswd': new_password,
+        }
+        if target_princ is not None:
+            ChangePasswdDataMS_obj['targname'] = target_princ
+        if target_realm is not None:
+            ChangePasswdDataMS_obj['targrealm'] = target_realm
+
+        change_password_data = self.der_encode(
+            ChangePasswdDataMS_obj, asn1Spec=krb5_asn1.ChangePasswdDataMS())
+
+        return change_password_data
+
+    def KRB_PRIV_create(self,
+                        subkey,
+                        user_data,
+                        s_address,
+                        timestamp=None,
+                        usec=None,
+                        seq_number=None,
+                        r_address=None):
+        EncKrbPrivPart_obj = {
+            'user-data': user_data,
+            's-address': s_address,
+        }
+        if timestamp is not None:
+            EncKrbPrivPart_obj['timestamp'] = timestamp
+        if usec is not None:
+            EncKrbPrivPart_obj['usec'] = usec
+        if seq_number is not None:
+            EncKrbPrivPart_obj['seq-number'] = seq_number
+        if r_address is not None:
+            EncKrbPrivPart_obj['r-address'] = r_address
+
+        enc_krb_priv_part = self.der_encode(
+            EncKrbPrivPart_obj, asn1Spec=krb5_asn1.EncKrbPrivPart())
+
+        enc_data = self.EncryptedData_create(subkey,
+                                             KU_KRB_PRIV,
+                                             enc_krb_priv_part)
+
+        KRB_PRIV_obj = {
+            'pvno': 5,
+            'msg-type': KRB_PRIV,
+            'enc-part': enc_data,
+        }
+
+        krb_priv = self.der_encode(
+            KRB_PRIV_obj, asn1Spec=krb5_asn1.KRB_PRIV())
+
+        return krb_priv
+
+    def kpasswd_create(self,
+                       subkey,
+                       user_data,
+                       version,
+                       seq_number,
+                       ap_req,
+                       local_address,
+                       remote_address):
+        self.assertIsNotNone(self.s, 'call self.connect() first')
+
+        timestamp, usec = self.get_KerberosTimeWithUsec()
+
+        krb_priv = self.KRB_PRIV_create(subkey,
+                                        user_data,
+                                        s_address=local_address,
+                                        timestamp=timestamp,
+                                        usec=usec,
+                                        seq_number=seq_number,
+                                        r_address=remote_address)
+
+        size = 6 + len(ap_req) + len(krb_priv)
+        self.assertLess(size, 0x10000)
+
+        msg = bytearray()
+        msg.append(size >> 8)
+        msg.append(size & 0xff)
+        msg.append(version >> 8)
+        msg.append(version & 0xff)
+        msg.append(len(ap_req) >> 8)
+        msg.append(len(ap_req) & 0xff)
+        # Note: for sets, there could be a little-endian four-byte length here.
+
+        msg.extend(ap_req)
+        msg.extend(krb_priv)
+
+        return msg
+
     def _generic_kdc_exchange(self,
                               kdc_exchange_dict,  # required
                               cname=None,  # optional