tests/krb5: Add method to replace client or device claims in a PAC
authorJoseph Sutton <josephsutton@catalyst.net.nz>
Thu, 28 Sep 2023 03:13:08 +0000 (16:13 +1300)
committerAndrew Bartlett <abartlet@samba.org>
Thu, 28 Sep 2023 03:33:38 +0000 (03:33 +0000)
Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
python/samba/tests/krb5/kdc_base_test.py

index 73ed4c42bed9254d9fa35a3f0a30331ca15d634a..e55d35fa389ad358523d3cf801a18b9b7d4e288a 100644 (file)
@@ -1825,6 +1825,108 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
 
         return pac
 
+    def set_pac_claims(self, pac, *, client_claims=None, device_claims=None, claim_ids=None):
+        if claim_ids is None:
+            claim_ids = {}
+
+        if client_claims is not None:
+            self.assertIsNone(device_claims,
+                              'don’t specify both client and device claims')
+            pac_claims = client_claims
+            pac_buffer_type = krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO
+        else:
+            self.assertIsNotNone(device_claims,
+                                 'please specify client or device claims')
+            pac_claims = device_claims
+            pac_buffer_type = krb5pac.PAC_TYPE_DEVICE_CLAIMS_INFO
+
+        claim_value_types = {
+            claims.CLAIM_TYPE_INT64: claims.CLAIM_INT64,
+            claims.CLAIM_TYPE_UINT64: claims.CLAIM_UINT64,
+            claims.CLAIM_TYPE_STRING: claims.CLAIM_STRING,
+            claims.CLAIM_TYPE_BOOLEAN: claims.CLAIM_UINT64,
+        }
+
+        claims_arrays = []
+
+        for pac_claim_array in pac_claims:
+            pac_claim_source_type, pac_claim_entries = (
+                pac_claim_array)
+
+            claim_entries = []
+
+            for pac_claim_entry in pac_claim_entries:
+                pac_claim_id, pac_claim_type, pac_claim_values = (
+                    pac_claim_entry)
+
+                claim_values_type = claim_value_types.get(
+                    pac_claim_type, claims.CLAIM_STRING)
+
+                claim_values_enum = claim_values_type()
+                claim_values_enum.values = pac_claim_values
+                claim_values_enum.value_count = len(
+                    pac_claim_values)
+
+                claim_entry = claims.CLAIM_ENTRY()
+                try:
+                    claim_entry.id = pac_claim_id.format_map(
+                        claim_ids)
+                except KeyError as err:
+                    raise RuntimeError(
+                        f'unknown claim name(s) '
+                        f'in ‘{pac_claim_id}’'
+                    ) from err
+                claim_entry.type = pac_claim_type
+                claim_entry.values = claim_values_enum
+
+                claim_entries.append(claim_entry)
+
+            claims_array = claims.CLAIMS_ARRAY()
+            claims_array.claims_source_type = pac_claim_source_type
+            claims_array.claim_entries = claim_entries
+            claims_array.claims_count = len(claim_entries)
+
+            claims_arrays.append(claims_array)
+
+        claims_set = claims.CLAIMS_SET()
+        claims_set.claims_arrays = claims_arrays
+        claims_set.claims_array_count = len(claims_arrays)
+
+        claims_ctr = claims.CLAIMS_SET_CTR()
+        claims_ctr.claims = claims_set
+
+        claims_ndr = claims.CLAIMS_SET_NDR()
+        claims_ndr.claims = claims_ctr
+
+        metadata = claims.CLAIMS_SET_METADATA()
+        metadata.claims_set = claims_ndr
+        metadata.compression_format = (
+            claims.CLAIMS_COMPRESSION_FORMAT_XPRESS_HUFF)
+
+        metadata_ctr = claims.CLAIMS_SET_METADATA_CTR()
+        metadata_ctr.metadata = metadata
+
+        metadata_ndr = claims.CLAIMS_SET_METADATA_NDR()
+        metadata_ndr.claims = metadata_ctr
+
+        pac_buffers = pac.buffers
+        for pac_buffer in pac_buffers:
+            if pac_buffer.type == pac_buffer_type:
+                break
+        else:
+            pac_buffer = krb5pac.PAC_BUFFER()
+            pac_buffer.type = pac_buffer_type
+            pac_buffer.info = krb5pac.DATA_BLOB_REM()
+
+            pac_buffers.append(pac_buffer)
+
+        pac_buffer.info.remaining = ndr_pack(metadata_ndr)
+
+        pac.buffers = pac_buffers
+        pac.num_buffers = len(pac_buffers)
+
+        return pac
+
     def get_cached_creds(self, *,
                          account_type,
                          opts=None,