Include lz4 compression support.
authorWayne Davison <wayne@opencoder.net>
Mon, 25 May 2020 20:43:25 +0000 (13:43 -0700)
committerWayne Davison <wayne@opencoder.net>
Mon, 25 May 2020 20:45:56 +0000 (13:45 -0700)
Based on a patch that was emailed to me without a valid return address.

compat.c
configure.ac
token.c

index 59c050453826b931179db24345fc62e3c35f0379..4bef882069a013f8bc751eed316e60aa0c52da05 100644 (file)
--- a/compat.c
+++ b/compat.c
@@ -93,6 +93,10 @@ struct name_num_obj valid_compressions = {
 #ifdef SUPPORT_ZSTD
                /* TODO decide where in the default preference order this should go. */
                { CPRES_ZSTD, "zstd", NULL },
+#endif
+#ifdef SUPPORT_LZ4
+               /* TODO decide where in the default preference order this should go. */
+               { CPRES_LZ4, "lz4", NULL },
 #endif
                { CPRES_NONE, "none", NULL },
                { 0, NULL, NULL }
index c6bd7d69ded2f26cd728c982988891263b78f0b9..2b65ef9dd0acef1d069ff1e7f2a16de7433c38b0 100644 (file)
@@ -381,7 +381,7 @@ AC_CHECK_HEADERS(sys/fcntl.h sys/select.h fcntl.h sys/time.h sys/unistd.h \
     netdb.h malloc.h float.h limits.h iconv.h libcharset.h langinfo.h \
     sys/acl.h acl/libacl.h attr/xattr.h sys/xattr.h sys/extattr.h \
     popt.h popt/popt.h linux/falloc.h netinet/in_systm.h netinet/ip.h \
-    zlib.h xxhash.h openssl/md4.h openssl/md5.h zstd.h)
+    zlib.h xxhash.h openssl/md4.h openssl/md5.h zstd.h lz4.h)
 AC_HEADER_MAJOR_FIXED
 
 AC_MSG_CHECKING([whether to enable use of openssl crypto library])
@@ -420,6 +420,18 @@ else
     AC_MSG_RESULT(no)
 fi
 
+AC_MSG_CHECKING([whether to enable LZ4 compression])
+AC_ARG_ENABLE([lz4],
+        AC_HELP_STRING([--disable-lz4], [disable LZ4 compression]))
+AH_TEMPLATE([SUPPORT_LZ4],
+[Undefine if you do not want LZ4 compression.  By default this is defined.])
+if test x"$enable_lz4" != x"no" && test x"$ac_cv_header_lz4_h" = x"yes"; then
+    AC_MSG_RESULT(yes)
+    AC_SEARCH_LIBS(LZ4_compress, lz4, [AC_DEFINE(SUPPORT_LZ4)])
+else
+    AC_MSG_RESULT(no)
+fi
+
 AC_CACHE_CHECK([if makedev takes 3 args],rsync_cv_MAKEDEV_TAKES_3_ARGS,[
 AC_RUN_IFELSE([AC_LANG_SOURCE([[
 #include <sys/types.h>
diff --git a/token.c b/token.c
index 7dde4e9ea27d770faf85a0883fd8a62950d63e6f..f92b9b5b4e649c1a92006ecc94eb641ec34142fc 100644 (file)
--- a/token.c
+++ b/token.c
@@ -25,6 +25,9 @@
 #ifdef SUPPORT_ZSTD
 #include <zstd.h>
 #endif
+#ifdef SUPPORT_LZ4
+#include <lz4.h>
+#endif
 
 extern int do_compression;
 extern int protocol_version;
@@ -68,6 +71,14 @@ void init_compression_level(void)
                def_level = 3;
                off_level = CLVL_NOT_SPECIFIED;
                break;
+#endif
+#ifdef SUPPORT_LZ4
+       case CPRES_LZ4:
+               min_level = 0;
+               max_level = 0;
+               def_level = 0;
+               off_level = CLVL_NOT_SPECIFIED;
+               break;
 #endif
        default: /* paranoia to prevent missing case values */
                exit_cleanup(RERR_UNSUPPORTED);
@@ -879,7 +890,210 @@ static int32 recv_zstd_token(int f, char **data)
        }
        } while (1);
 }
-#endif
+#endif /* SUPPORT_ZSTD */
+
+#ifdef SUPPORT_LZ4
+static void
+send_compressed_token(int f, int32 token, struct map_struct *buf, OFF_T offset, int32 nb)
+{
+       static int init_done, flush_pending;
+       int size = MAX(LZ4_compressBound(CHUNK_SIZE), MAX_DATA_COUNT+2);
+       int32 n, r;
+
+       if (last_token == -1) {
+               if (!init_done) {
+                       if ((obuf = new_array(char, size)) == NULL)
+                               out_of_memory("send_compressed_token");
+                       init_done = 1;
+               }
+               last_run_end = 0;
+               run_start = token;
+               flush_pending = 0;
+       } else if (last_token == -2) {
+               run_start = token;
+       } else if (nb != 0 || token != last_token + 1
+                  || token >= run_start + 65536) {
+               /* output previous run */
+               r = run_start - last_run_end;
+               n = last_token - run_start;
+               if (r >= 0 && r <= 63) {
+                       write_byte(f, (n==0? TOKEN_REL: TOKENRUN_REL) + r);
+               } else {
+                       write_byte(f, (n==0? TOKEN_LONG: TOKENRUN_LONG));
+                       write_int(f, run_start);
+               }
+               if (n != 0) {
+                       write_byte(f, n);
+                       write_byte(f, n >> 8);
+               }
+               last_run_end = last_token;
+               run_start = token;
+       }
+
+       last_token = token;
+
+       if (nb != 0 || flush_pending) {
+               int available_in, available_out = 0;
+               const char *next_in;
+
+               do {
+                       char *ptr = obuf;
+                       char *next_out = obuf + 2;
+
+                       if (available_out == 0) {
+                               available_in = MIN(nb, MAX_DATA_COUNT);
+                               next_in = map_ptr(buf, offset, available_in);
+                       } else
+                               available_in /= 2;
+
+                       available_out = LZ4_compress(next_in, next_out, available_in);
+                       if (!available_out) {
+                               rprintf(FERROR, "compress returned %d\n", available_out);
+                               exit_cleanup(RERR_STREAMIO);
+                       }
+                       if (available_out <= MAX_DATA_COUNT) {
+                               ptr[0] = DEFLATED_DATA + (available_out >> 8);
+                               ptr[1] = available_out;
+
+                               write_buf(f, ptr, available_out + 2);
+
+                               available_out = 0;
+                               nb -= available_in;
+                               offset += available_in;
+                       }
+               } while (nb != 0);
+               flush_pending = token == -2;
+       }
+       if (token == -1)
+               /* end of file - clean up */
+               write_byte(f, END_FLAG);
+}
+
+static int32 recv_compressed_token(int f, char **data)
+{
+       static int32 saved_flag;
+       static int init_done;
+       int32 n, flag;
+       int size = MAX(LZ4_compressBound(CHUNK_SIZE), MAX_DATA_COUNT+2);
+       static const char *next_in;
+       static int avail_in;
+       int avail_out;
+
+       for (;;) {
+               switch (recv_state) {
+               case r_init:
+                       if (!init_done) {
+                               if (!(cbuf = new_array(char, MAX_DATA_COUNT))
+                                   || !(dbuf = new_array(char, size)))
+                                       out_of_memory("recv_compressed_token");
+                               init_done = 1;
+                       }
+                       recv_state = r_idle;
+                       rx_token = 0;
+                       break;
+               case r_idle:
+               case r_inflated:
+                       if (saved_flag) {
+                               flag = saved_flag & 0xff;
+                               saved_flag = 0;
+                       } else
+                               flag = read_byte(f);
+                       if ((flag & 0xC0) == DEFLATED_DATA) {
+                               n = ((flag & 0x3f) << 8) + read_byte(f);
+                               read_buf(f, cbuf, n);
+                               next_in = (char *)cbuf;
+                               avail_in = n;
+                               recv_state = r_inflating;
+                               break;
+                       }
+
+                       if (recv_state == r_inflated)
+                               recv_state = r_idle;
+
+                       if (flag == END_FLAG) {
+                               /* that's all folks */
+                               recv_state = r_init;
+                               return 0;
+                       }
+
+                       /* here we have a token of some kind */
+                       if (flag & TOKEN_REL) {
+                               rx_token += flag & 0x3f;
+                               flag >>= 6;
+                       } else
+                               rx_token = read_int(f);
+                       if (flag & 1) {
+                               rx_run = read_byte(f);
+                               rx_run += read_byte(f) << 8;
+                               recv_state = r_running;
+                       }
+                       return -1 - rx_token;
+
+               case r_inflating:
+                       avail_out = LZ4_decompress_safe(next_in, dbuf, avail_in, size);
+                       if (avail_out < 0) {
+                               rprintf(FERROR, "uncompress failed: %d\n", avail_out);
+                               exit_cleanup(RERR_STREAMIO);
+                       }
+                       recv_state = r_inflated;
+                       *data = dbuf;
+                       return avail_out;
+
+               case r_running:
+                       ++rx_token;
+                       if (--rx_run == 0)
+                               recv_state = r_idle;
+                       return -1 - rx_token;
+               }
+       }
+
+}
+
+# if 0
+static void see_uncompressed_token(char *buf, int32 len)
+{
+       static const char *next_in;
+       static int avail_in;
+       int avail_out;
+
+       int32 blklen;
+       char hdr[5];
+
+       avail_in = 0;
+       blklen = 0;
+       hdr[0] = 0;
+       do {
+               if (avail_in == 0 && len != 0) {
+                       if (blklen == 0) {
+                               /* Give it a fake stored-block header. */
+                               next_in = hdr;
+                               avail_in = 5;
+                               blklen = len;
+                               if (blklen > 0xffff)
+                                       blklen = 0xffff;
+                               hdr[1] = blklen;
+                               hdr[2] = blklen >> 8;
+                               hdr[3] = ~hdr[1];
+                               hdr[4] = ~hdr[2];
+                       } else {
+                               next_in = (char *)buf;
+                               avail_in = blklen;
+                               if (protocol_version >= 31) /* Newer protocols avoid a data-duplicating bug */
+                                       buf += blklen;
+                               len -= blklen;
+                               blklen = 0;
+                       }
+               }
+               avail_out = LZ4_decompress_safe(next_in, dbuf, avail_in, LZ4_compressBound(CHUNK_SIZE));
+               if (avail_out < 0) {
+                       rprintf(FERROR, "uncompress failed: %d\n", avail_out);
+                       exit_cleanup(RERR_STREAMIO);
+               }
+
+       } while (len);
+}
+# endif /* 0 */
+#endif /* SUPPORT_LZ4 */
 
 /**
  * Transmit a verbatim buffer of length @p n followed by a token.
@@ -894,6 +1108,10 @@ void send_token(int f, int32 token, struct map_struct *buf, OFF_T offset,
 #ifdef SUPPORT_ZSTD
        else if (do_compression == CPRES_ZSTD)
                send_zstd_token(f, token, buf, offset, n);
+#endif
+#ifdef SUPPORT_LZ4
+       else if (do_compression == CPRES_LZ4)
+               send_compressed_token(f, token, buf, offset, n);
 #endif
        else
                send_deflated_token(f, token, buf, offset, n, toklen);
@@ -914,6 +1132,10 @@ int32 recv_token(int f, char **data)
 #ifdef SUPPORT_ZSTD
        else if (do_compression == CPRES_ZSTD)
                tok = recv_zstd_token(f, data);
+#endif
+#ifdef SUPPORT_LZ4
+       else if (do_compression == CPRES_LZ4)
+               tok = recv_compressed_token(f, data);
 #endif
        else /* CPRES_ZLIB & CPRES_ZLIBX */
                tok = recv_deflated_token(f, data);
@@ -927,4 +1149,10 @@ void see_token(char *data, int32 toklen)
 {
        if (do_compression == CPRES_ZLIB)
                see_deflate_token(data, toklen);
+#ifdef SUPPORT_LZ4
+# if 0
+       else if (do_compression == CPRES_LZ4)
+               see_uncompressed_token(data, toklen);
+# endif
+#endif
 }