Kafka: add LZ4 decompression
author    Pascal Quantin <pascal.quantin@gmail.com>
Tue, 9 May 2017 20:21:04 +0000 (21:21 +0100)
committer Michael Mann <mmann78@netscape.net>
Sat, 13 May 2017 11:54:31 +0000 (11:54 +0000)
Change-Id: Idf2f63782c8751778ad88f46a7f65fe7d5d49f3b
Reviewed-on: https://code.wireshark.org/review/21577
Reviewed-by: Pascal Quantin <pascal.quantin@gmail.com>
Petri-Dish: Pascal Quantin <pascal.quantin@gmail.com>
Tested-by: Petri Dish Buildbot <buildbot-no-reply@wireshark.org>
Reviewed-by: Michael Mann <mmann78@netscape.net>
epan/dissectors/packet-kafka.c

index bcaeceb3443e3f077f567bdfd0b32fe2f2ff6dd8..9079ee552bfad7e807707d5dfc43cd86f42e120d 100644
@@ -33,6 +33,9 @@
 #ifdef HAVE_SNAPPY
 #include <snappy-c.h>
 #endif
+#ifdef HAVE_LZ4
+#include <lz4frame.h>
+#endif
 #include "packet-tcp.h"
 
 void proto_register_kafka(void);
@@ -351,6 +354,121 @@ dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, i
 
 /* HELPERS */
 
+#ifdef HAVE_LZ4
+/* Local copy of the XXH32() algorithm, as found in https://github.com/lz4/lz4/blob/v1.7.5/lib/xxhash.c,
+   since some packagers do not ship xxhash.h with liblz4 */
+typedef struct {
+    guint32 total_len_32;
+    guint32 large_len;
+    guint32 v1;
+    guint32 v2;
+    guint32 v3;
+    guint32 v4;
+    guint32 mem32[4];   /* buffer defined as U32 for alignment */
+    guint32 memsize;
+    guint32 reserved;   /* never read nor written; will be removed in a future version */
+} XXH32_state_t;
+
+typedef enum {
+    XXH_bigEndian=0,
+    XXH_littleEndian=1
+} XXH_endianess;
+
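+/* Runtime endianness detection: the first byte of an int holding 1 is
+   non-zero only on a little-endian host. */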
+static const int g_one = 1;
+#define XXH_CPU_LITTLE_ENDIAN   (*(const char*)(&g_one))
+
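+/* Mixing primes from the xxHash reference implementation. */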
+static const guint32 PRIME32_1 = 2654435761U;
+static const guint32 PRIME32_2 = 2246822519U;
+static const guint32 PRIME32_3 = 3266489917U;
+static const guint32 PRIME32_4 =  668265263U;
+static const guint32 PRIME32_5 =  374761393U;
+
+#define XXH_rotl32(x,r) ((x << r) | (x >> (32 - r)))
+
+static guint32 XXH_read32(const void* memPtr)
+{
+    guint32 val;
+    memcpy(&val, memPtr, sizeof(val));
+    return val;
+}
+
+static guint32 XXH_swap32(guint32 x)
+{
+    return  ((x << 24) & 0xff000000 ) |
+            ((x <<  8) & 0x00ff0000 ) |
+            ((x >>  8) & 0x0000ff00 ) |
+            ((x >> 24) & 0x000000ff );
+}
+
+#define XXH_readLE32(ptr, endian) (endian==XXH_littleEndian ? XXH_read32(ptr) : XXH_swap32(XXH_read32(ptr)))
+
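+/* One xxHash mixing round over a single 4-byte lane. */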
+static guint32 XXH32_round(guint32 seed, guint32 input)
+{
+    seed += input * PRIME32_2;
+    seed  = XXH_rotl32(seed, 13);
+    seed *= PRIME32_1;
+    return seed;
+}
+
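+/* Core XXH32 computation: 16-byte stripes over four accumulators, then a
+   4-byte tail, a byte-wise tail, and a final avalanche. */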
+static guint32 XXH32_endian(const void* input, size_t len, guint32 seed, XXH_endianess endian)
+{
+    const guint8* p = (const guint8*)input;
+    const guint8* bEnd = p + len;
+    guint32 h32;
+#define XXH_get32bits(p) XXH_readLE32(p, endian)
+
+    if (len>=16) {
+        const guint8* const limit = bEnd - 16;
+        guint32 v1 = seed + PRIME32_1 + PRIME32_2;
+        guint32 v2 = seed + PRIME32_2;
+        guint32 v3 = seed + 0;
+        guint32 v4 = seed - PRIME32_1;
+
+        do {
+            v1 = XXH32_round(v1, XXH_get32bits(p)); p+=4;
+            v2 = XXH32_round(v2, XXH_get32bits(p)); p+=4;
+            v3 = XXH32_round(v3, XXH_get32bits(p)); p+=4;
+            v4 = XXH32_round(v4, XXH_get32bits(p)); p+=4;
+        } while (p<=limit);
+
+        h32 = XXH_rotl32(v1, 1) + XXH_rotl32(v2, 7) + XXH_rotl32(v3, 12) + XXH_rotl32(v4, 18);
+    } else {
+        h32  = seed + PRIME32_5;
+    }
+
+    h32 += (guint32) len;
+
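+    /* Consume any remaining 4-byte words, then trailing bytes one at a time. */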
+    while (p+4<=bEnd) {
+        h32 += XXH_get32bits(p) * PRIME32_3;
+        h32  = XXH_rotl32(h32, 17) * PRIME32_4;
+        p+=4;
+    }
+
+    while (p<bEnd) {
+        h32 += (*p) * PRIME32_5;
+        h32 = XXH_rotl32(h32, 11) * PRIME32_1;
+        p++;
+    }
+
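+    /* Final avalanche: mix the accumulated bits so every input bit affects the result. */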
+    h32 ^= h32 >> 15;
+    h32 *= PRIME32_2;
+    h32 ^= h32 >> 13;
+    h32 *= PRIME32_3;
+    h32 ^= h32 >> 16;
+
+    return h32;
+}
+
+static guint32 XXH32(const void* input, size_t len, guint32 seed)
+{
+    XXH_endianess endian_detected = (XXH_endianess)XXH_CPU_LITTLE_ENDIAN;
+    if (endian_detected==XXH_littleEndian)
+        return XXH32_endian(input, len, seed, XXH_littleEndian);
+    else
+        return XXH32_endian(input, len, seed, XXH_bigEndian);
+}
+#endif
+
 static const char *
 kafka_error_to_str(kafka_error_t error)
 {
@@ -710,7 +828,99 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s
             break;
 #endif
         case KAFKA_MESSAGE_CODEC_LZ4:
-            /* TODO: */
+#ifdef HAVE_LZ4
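+            /* Read the length-prefixed message value, patch the LZ4 frame header
+               checksum, decompress the whole frame in one call, and dissect the
+               result as a nested message set. */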
+            raw = kafka_get_bytes(subtree, tvb, pinfo, offset);
+            offset += 4;
+            if (raw) {
+                LZ4F_decompressionContext_t lz4_ctxt;
+                LZ4F_frameInfo_t lz4_info;
+                LZ4F_errorCode_t ret;
+                LZ4F_decompressOptions_t dec_opts = {0};
+                size_t src_offset, src_size, dst_size;
+                guchar *decompressed_buffer = NULL;
+
+                /* Prepare compressed data buffer */
+                guint compressed_size = tvb_reported_length(raw);
+                guint8 *data = (guint8*)tvb_memdup(wmem_packet_scope(), raw, 0, compressed_size);
+                /* Override the header checksum to work around buggy Kafka implementations */
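+                /* LZ4 frame layout: 4-byte magic, FLG byte, BD byte, an optional
+                   8-byte content size (FLG bit 0x08), then a 1-byte header checksum
+                   (the second byte of XXH32 over the descriptor, starting at FLG). */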
+                if (compressed_size > 7) {
+                    guint hdr_end = 6;
+                    if (data[4] & 0x08) {
+                        hdr_end += 8;
+                    }
+                    if (hdr_end < compressed_size) {
+                        data[hdr_end] = (XXH32(&data[4], hdr_end - 4, 0) >> 8) & 0xff;
+                    }
+                }
+
+                /* Show raw compressed data */
+                proto_tree_add_item(subtree, hf_kafka_message_value_compressed, tvb, offset, compressed_size, ENC_NA);
+
+                /* Create a decompression context and read the frame header */
+                ret = LZ4F_createDecompressionContext(&lz4_ctxt, LZ4F_VERSION);
+                if (LZ4F_isError(ret)) {
+                    goto fail;
+                }
+                src_offset = compressed_size;
+                ret = LZ4F_getFrameInfo(lz4_ctxt, &lz4_info, data, &src_offset);
+                if (LZ4F_isError(ret)) {
+                    LZ4F_freeDecompressionContext(lz4_ctxt);
+                    goto fail;
+                }
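+                /* Size the output buffer from the frame's declared maximum block
+                   size, shrunk to the declared content size when that is smaller. */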
+                switch (lz4_info.blockSizeID) {
+                case LZ4F_max64KB:
+                    dst_size = 1 << 16;
+                    break;
+                case LZ4F_max256KB:
+                    dst_size = 1 << 18;
+                    break;
+                case LZ4F_max1MB:
+                    dst_size = 1 << 20;
+                    break;
+                case LZ4F_max4MB:
+                    dst_size = 1 << 22;
+                    break;
+                default:
+                    LZ4F_freeDecompressionContext(lz4_ctxt);
+                    goto fail;
+                }
+                if (lz4_info.contentSize && lz4_info.contentSize < dst_size) {
+                    dst_size = (size_t)lz4_info.contentSize;
+                }
+                decompressed_buffer = (guchar*)wmem_alloc(pinfo->pool, dst_size);
+
+                /* Attempt the decompression. */
+                src_size = compressed_size - src_offset;
+                ret = LZ4F_decompress(lz4_ctxt, decompressed_buffer, &dst_size,
+                                      &data[src_offset], &src_size, &dec_opts);
+                LZ4F_freeDecompressionContext(lz4_ctxt);
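+                /* LZ4F_decompress() returns 0 once the frame is fully decoded;
+                   any other value (a size hint or an error code) is treated as
+                   a failure here. */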
+                if (ret == 0) {
+                    size_t uncompressed_size = dst_size;
+
+                    show_compression_reduction(tvb, subtree, compressed_size, (guint)uncompressed_size);
+
+                    /* Add as separate data tab */
+                    payload = tvb_new_child_real_data(tvb, decompressed_buffer,
+                                                      (guint32)uncompressed_size, (guint32)uncompressed_size);
+                    add_new_data_source(pinfo, payload, "Uncompressed Message");
+
+                    /* Dissect as a message set */
+                    dissect_kafka_message_set(payload, pinfo, subtree, 0, FALSE, codec);
+
+                    /* Add to summary */
+                    col_append_str(pinfo->cinfo, COL_INFO, " [LZ4-compressed message set]");
+                    proto_item_append_text(message_ti, " (LZ4-compressed message set)");
+                } else {
+                fail:
+                    /* Decompression failed: show the raw value and flag it with an expert info */
+                    decrypt_item = proto_tree_add_item(subtree, hf_kafka_message_value, raw, 0, -1, ENC_NA);
+                    expert_add_info(pinfo, decrypt_item, &ei_kafka_message_decompress);
+                }
+                offset += compressed_size;
+            }
+            break;
+#endif
+
         case KAFKA_MESSAGE_CODEC_NONE:
         default:
             offset = dissect_kafka_bytes(subtree, hf_kafka_message_value, tvb, pinfo, offset, NULL, &bytes_length);