+#ifdef HAVE_LZ4
+ raw = kafka_get_bytes(subtree, tvb, pinfo, offset);
+ offset += 4;
+ if (raw) {
+ LZ4F_decompressionContext_t lz4_ctxt;
+ LZ4F_frameInfo_t lz4_info;
+ LZ4F_errorCode_t ret;
+ LZ4F_decompressOptions_t dec_opts = {0};
+ size_t src_offset, src_size, dst_size;
+ guchar *decompressed_buffer = NULL;
+
+ /* Prepare compressed data buffer */
+ guint compressed_size = tvb_reported_length(raw);
+ guint8 *data = (guint8*)tvb_memdup(wmem_packet_scope(), raw, 0, compressed_size);
+ /* Override header checksum to workaround buggy Kafka implementations */
+ if (compressed_size > 7) {
+ guint hdr_end = 6;
+ if (data[4] & 0x08) {
+ hdr_end += 8;
+ }
+ if (hdr_end < compressed_size) {
+ data[hdr_end] = (XXH32(&data[4], hdr_end - 4, 0) >> 8) & 0xff;
+ }
+ }
+
+ /* Show raw compressed data */
+ proto_tree_add_item(subtree, hf_kafka_message_value_compressed, tvb, offset, compressed_size, ENC_NA);
+
+ /* Allocate output buffer */
+ ret = LZ4F_createDecompressionContext(&lz4_ctxt, LZ4F_VERSION);
+ if (LZ4F_isError(ret)) {
+ goto fail;
+ }
+ src_offset = compressed_size;
+ ret = LZ4F_getFrameInfo(lz4_ctxt, &lz4_info, data, &src_offset);
+ if (LZ4F_isError(ret)) {
+ LZ4F_freeDecompressionContext(lz4_ctxt);
+ goto fail;
+ }
+ switch (lz4_info.blockSizeID) {
+ case LZ4F_max64KB:
+ dst_size = 1 << 16;
+ break;
+ case LZ4F_max256KB:
+ dst_size = 1 << 18;
+ break;
+ case LZ4F_max1MB:
+ dst_size = 1 << 20;
+ break;
+ case LZ4F_max4MB:
+ dst_size = 1 << 22;
+ break;
+ default:
+ LZ4F_freeDecompressionContext(lz4_ctxt);
+ goto fail;
+ }
+ if (lz4_info.contentSize && lz4_info.contentSize < dst_size) {
+ dst_size = (size_t)lz4_info.contentSize;
+ }
+ decompressed_buffer = (guchar*)wmem_alloc(pinfo->pool, dst_size);
+
+ /* Attempt the decompression. */
+ src_size = compressed_size - src_offset;
+ ret = LZ4F_decompress(lz4_ctxt, decompressed_buffer, &dst_size,
+ &data[src_offset], &src_size, &dec_opts);
+ LZ4F_freeDecompressionContext(lz4_ctxt);
+ if (ret == 0) {
+ size_t uncompressed_size = dst_size;
+
+ show_compression_reduction(tvb, subtree, compressed_size, (guint)uncompressed_size);
+
+ /* Add as separate data tab */
+ payload = tvb_new_child_real_data(tvb, decompressed_buffer,
+ (guint32)uncompressed_size, (guint32)uncompressed_size);
+ add_new_data_source(pinfo, payload, "Uncompressed Message");
+
+ /* Dissect as a message set */
+ dissect_kafka_message_set(payload, pinfo, subtree, 0, FALSE, codec);
+
+ /* Add to summary */
+ col_append_fstr(pinfo->cinfo, COL_INFO, " [LZ4-compressed message set]");
+ proto_item_append_text(message_ti, " (LZ4-compressed message set)");
+ } else {
+ fail:
+ /* Error */
+ decrypt_item = proto_tree_add_item(subtree, hf_kafka_message_value, raw, 0, -1, ENC_NA);
+ expert_add_info(pinfo, decrypt_item, &ei_kafka_message_decompress);
+ }
+ offset += compressed_size;
+ }
+ break;
+#endif
+