f15637e4e4a8ef796c01ba7e0064218317959ee4
[rsync.git] / token.c
1 /*
2  * Routines used by the file-transfer code.
3  *
4  * Copyright (C) 1996 Andrew Tridgell
5  * Copyright (C) 1996 Paul Mackerras
6  * Copyright (C) 2003-2020 Wayne Davison
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License as published by
10  * the Free Software Foundation; either version 3 of the License, or
11  * (at your option) any later version.
12  *
13  * This program is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  * GNU General Public License for more details.
17  *
18  * You should have received a copy of the GNU General Public License along
19  * with this program; if not, visit the http://fsf.org website.
20  */
21
22 #include "rsync.h"
23 #include "itypes.h"
24 #include <zlib.h>
25 #ifdef SUPPORT_ZSTD
26 #include <zstd.h>
27 #endif
28 #ifdef SUPPORT_LZ4
29 #include <lz4.h>
30 #endif
31
/* Option and protocol state owned by other modules. */
extern char *skip_compress;
extern int do_compression;
extern int do_compression_level;
extern int module_id;
extern int protocol_version;

/* If the zlib in use does not define Z_INSERT_ONLY, fall back to Z_SYNC_FLUSH. */
#ifndef Z_INSERT_ONLY
#define Z_INSERT_ONLY Z_SYNC_FLUSH
#endif

/* Per-file compression levels (chosen by set_compression()). */
static int per_file_default_level; /* Level every file gets before its name is checked. */
static int compression_level;      /* Level chosen for the current file. */
static int skip_compression_level; /* Least possible compressing, for skip-compress matches. */

/*
 * Trie of lower-cased filename suffixes.  Each node holds one letter;
 * "sibling" links alternative letters for the same position (kept in
 * ascending order by add_suffix()) and "child" leads to the next letter.
 * word_end is nonzero when a complete suffix ends at this node.
 */
struct suffix_tree {
        struct suffix_tree *sibling;
        struct suffix_tree *child;
        char letter;
        char word_end;
};

static struct suffix_tree *suftree; /* Root of the suffix trie. */
static char *match_list;            /* NUL-separated wildcard patterns, ended by an empty string. */
54
55 void init_compression_level(void)
56 {
57         int min_level, max_level, def_level, off_level;
58
59         switch (do_compression) {
60         case CPRES_NONE:
61                 return;
62         case CPRES_ZLIB:
63         case CPRES_ZLIBX:
64                 min_level = 1;
65                 max_level = Z_BEST_COMPRESSION;
66                 def_level = 6; /* Z_DEFAULT_COMPRESSION is -1, so set it to the real default */
67                 off_level = skip_compression_level = Z_NO_COMPRESSION;
68                 if (do_compression_level == Z_DEFAULT_COMPRESSION)
69                         do_compression_level = def_level;
70                 break;
71 #ifdef SUPPORT_ZSTD
72         case CPRES_ZSTD:
73                 min_level = skip_compression_level = ZSTD_minCLevel();
74                 max_level = ZSTD_maxCLevel();
75                 def_level = ZSTD_CLEVEL_DEFAULT;
76                 off_level = CLVL_NOT_SPECIFIED;
77                 if (do_compression_level == 0)
78                         do_compression_level = def_level;
79                 break;
80 #endif
81 #ifdef SUPPORT_LZ4
82         case CPRES_LZ4:
83                 min_level = skip_compression_level = 0;
84                 max_level = 0;
85                 def_level = 0;
86                 off_level = CLVL_NOT_SPECIFIED;
87                 break;
88 #endif
89         default: /* paranoia to prevent missing case values */
90                 assert(0);
91         }
92
93         if (do_compression_level == CLVL_NOT_SPECIFIED)
94                 do_compression_level = def_level;
95         else if (do_compression_level == off_level) {
96                 do_compression = CPRES_NONE;
97                 return;
98         }
99
100         /* We don't bother with any errors or warnings -- just make sure that the values are valid. */
101         if (do_compression_level < min_level)
102                 do_compression_level = min_level;
103         else if (do_compression_level > max_level)
104                 do_compression_level = max_level;
105 }
106
107 static void add_suffix(struct suffix_tree **prior, char ltr, const char *str)
108 {
109         struct suffix_tree *node, *newnode;
110
111         if (ltr == '[') {
112                 const char *after = strchr(str, ']');
113                 /* Treat "[foo" and "[]" as having a literal '['. */
114                 if (after && after++ != str+1) {
115                         while ((ltr = *str++) != ']')
116                                 add_suffix(prior, ltr, after);
117                         return;
118                 }
119         }
120
121         for (node = *prior; node; prior = &node->sibling, node = node->sibling) {
122                 if (node->letter == ltr) {
123                         if (*str)
124                                 add_suffix(&node->child, *str, str+1);
125                         else
126                                 node->word_end = 1;
127                         return;
128                 }
129                 if (node->letter > ltr)
130                         break;
131         }
132         newnode = new(struct suffix_tree);
133         newnode->sibling = node;
134         newnode->child = NULL;
135         newnode->letter = ltr;
136         *prior = newnode;
137         if (*str) {
138                 add_suffix(&newnode->child, *str, str+1);
139                 newnode->word_end = 0;
140         } else
141                 newnode->word_end = 1;
142 }
143
144 static void add_nocompress_suffixes(const char *str)
145 {
146         char *buf, *t;
147         const char *f = str;
148
149         buf = new_array(char, strlen(f) + 1);
150
151         while (*f) {
152                 if (*f == '/') {
153                         f++;
154                         continue;
155                 }
156
157                 t = buf;
158                 do {
159                         if (isUpper(f))
160                                 *t++ = toLower(f);
161                         else
162                                 *t++ = *f;
163                 } while (*++f != '/' && *f);
164                 *t++ = '\0';
165
166                 add_suffix(&suftree, *buf, buf+1);
167         }
168
169         free(buf);
170 }
171
172 static void init_set_compression(void)
173 {
174         const char *f;
175         char *t, *start;
176
177         if (skip_compress)
178                 add_nocompress_suffixes(skip_compress);
179
180         /* A non-daemon transfer skips the default suffix list if the
181          * user specified --skip-compress. */
182         if (skip_compress && module_id < 0)
183                 f = "";
184         else
185                 f = lp_dont_compress(module_id);
186
187         match_list = t = new_array(char, strlen(f) + 2);
188
189         per_file_default_level = do_compression_level;
190
191         while (*f) {
192                 if (*f == ' ') {
193                         f++;
194                         continue;
195                 }
196
197                 start = t;
198                 do {
199                         if (isUpper(f))
200                                 *t++ = toLower(f);
201                         else
202                                 *t++ = *f;
203                 } while (*++f != ' ' && *f);
204                 *t++ = '\0';
205
206                 if (t - start == 1+1 && *start == '*') {
207                         /* Optimize a match-string of "*". */
208                         *match_list = '\0';
209                         suftree = NULL;
210                         per_file_default_level = skip_compression_level;
211                         break;
212                 }
213
214                 /* Move *.foo items into the stuffix tree. */
215                 if (*start == '*' && start[1] == '.' && start[2]
216                  && !strpbrk(start+2, ".?*")) {
217                         add_suffix(&suftree, start[2], start+3);
218                         t = start;
219                 }
220         }
221         *t++ = '\0';
222 }
223
224 /* determine the compression level based on a wildcard filename list */
225 void set_compression(const char *fname)
226 {
227         const struct suffix_tree *node;
228         const char *s;
229         char ltr;
230
231         if (!do_compression)
232                 return;
233
234         if (!match_list)
235                 init_set_compression();
236
237         compression_level = per_file_default_level;
238
239         if (!*match_list && !suftree)
240                 return;
241
242         if ((s = strrchr(fname, '/')) != NULL)
243                 fname = s + 1;
244
245         for (s = match_list; *s; s += strlen(s) + 1) {
246                 if (iwildmatch(s, fname)) {
247                         compression_level = skip_compression_level;
248                         return;
249                 }
250         }
251
252         if (!(node = suftree) || !(s = strrchr(fname, '.'))
253          || s == fname || !(ltr = *++s))
254                 return;
255
256         while (1) {
257                 if (isUpper(&ltr))
258                         ltr = toLower(&ltr);
259                 while (node->letter != ltr) {
260                         if (node->letter > ltr)
261                                 return;
262                         if (!(node = node->sibling))
263                                 return;
264                 }
265                 if ((ltr = *++s) == '\0') {
266                         if (node->word_end)
267                                 compression_level = skip_compression_level;
268                         return;
269                 }
270                 if (!(node = node->child))
271                         return;
272         }
273 }
274
275 /* non-compressing recv token */
276 static int32 simple_recv_token(int f, char **data)
277 {
278         static int32 residue;
279         static char *buf;
280         int32 n;
281
282         if (!buf)
283                 buf = new_array(char, CHUNK_SIZE);
284
285         if (residue == 0) {
286                 int32 i = read_int(f);
287                 if (i <= 0)
288                         return i;
289                 residue = i;
290         }
291
292         *data = buf;
293         n = MIN(CHUNK_SIZE,residue);
294         residue -= n;
295         read_buf(f,buf,n);
296         return n;
297 }
298
299 /* non-compressing send token */
300 static void simple_send_token(int f, int32 token, struct map_struct *buf, OFF_T offset, int32 n)
301 {
302         if (n > 0) {
303                 int32 len = 0;
304                 while (len < n) {
305                         int32 n1 = MIN(CHUNK_SIZE, n-len);
306                         write_int(f, n1);
307                         write_buf(f, map_ptr(buf, offset+len, n1), n1);
308                         len += n1;
309                 }
310         }
311         /* a -2 token means to send data only and no token */
312         if (token != -2)
313                 write_int(f, -(token+1));
314 }
315
/*
 * Flag bytes in the compressed token stream are encoded as follows.  Both
 * the zlib and the zstd token code in this file use them.
 *
 * The 16-bit run counts are written low byte first.  A run count is the
 * number of consecutive tokens that follow the first token of the run.
 */
#define END_FLAG        0       /* that's all folks: end of the current file's data */
#define TOKEN_LONG      0x20    /* followed by 32-bit token number */
#define TOKENRUN_LONG   0x21    /* ditto, then a 16-bit run count */
#define DEFLATED_DATA   0x40    /* + 6-bit high len, then low len byte, then the data */
#define TOKEN_REL       0x80    /* + 6-bit token number relative to the previous run's last token */
#define TOKENRUN_REL    0xc0    /* ditto, then a 16-bit run count */

#define MAX_DATA_COUNT  16383   /* fit 14 bit count into 2 bytes with flags */

/* zlib.h says that if we want to be able to compress something in a single
 * call, avail_out must be at least 0.1% larger than avail_in plus 12 bytes.
 * We'll add in 0.1%+16, just to be safe (and we'll avoid floating point,
 * to ensure that this is a compile-time value). */
#define AVAIL_OUT_SIZE(avail_in_size) ((avail_in_size)*1001/1000+16)

/*
 * For coding runs of tokens.  send_deflated_token() and send_zstd_token()
 * share these.  last_token has three kinds of value:
 *   -1  no token has been sent yet for the current file (it goes back to
 *       -1 after the end-of-file token);
 *   -2  the last call sent data only;
 *   otherwise, the last token number passed in.
 */
static int32 last_token = -1;
static int32 run_start;      /* first token of the run being accumulated */
static int32 last_run_end;   /* last token of the most recently written run */

/* Deflation state */
static z_stream tx_strm;

/* Output buffer: compressed packets are built at obuf+2 behind a 2-byte
 * DEFLATED_DATA header; also used as scratch output space. */
static char *obuf;

/* We want obuf to be able to hold both MAX_DATA_COUNT+2 bytes as well as
 * AVAIL_OUT_SIZE(CHUNK_SIZE) bytes, so make sure that it's large enough. */
#if MAX_DATA_COUNT+2 > AVAIL_OUT_SIZE(CHUNK_SIZE)
#define OBUF_SIZE       (MAX_DATA_COUNT+2)
#else
#define OBUF_SIZE       AVAIL_OUT_SIZE(CHUNK_SIZE)
#endif
350
351 /* Send a deflated token */
352 static void
353 send_deflated_token(int f, int32 token, struct map_struct *buf, OFF_T offset, int32 nb, int32 toklen)
354 {
355         static int init_done, flush_pending;
356         int32 n, r;
357
358         if (last_token == -1) {
359                 /* initialization */
360                 if (!init_done) {
361                         tx_strm.next_in = NULL;
362                         tx_strm.zalloc = NULL;
363                         tx_strm.zfree = NULL;
364                         if (deflateInit2(&tx_strm, compression_level,
365                                          Z_DEFLATED, -15, 8,
366                                          Z_DEFAULT_STRATEGY) != Z_OK) {
367                                 rprintf(FERROR, "compression init failed\n");
368                                 exit_cleanup(RERR_PROTOCOL);
369                         }
370                         obuf = new_array(char, OBUF_SIZE);
371                         init_done = 1;
372                 } else
373                         deflateReset(&tx_strm);
374                 last_run_end = 0;
375                 run_start = token;
376                 flush_pending = 0;
377         } else if (last_token == -2) {
378                 run_start = token;
379         } else if (nb != 0 || token != last_token + 1 || token >= run_start + 65536) {
380                 /* output previous run */
381                 r = run_start - last_run_end;
382                 n = last_token - run_start;
383                 if (r >= 0 && r <= 63) {
384                         write_byte(f, (n==0? TOKEN_REL: TOKENRUN_REL) + r);
385                 } else {
386                         write_byte(f, (n==0? TOKEN_LONG: TOKENRUN_LONG));
387                         write_int(f, run_start);
388                 }
389                 if (n != 0) {
390                         write_byte(f, n);
391                         write_byte(f, n >> 8);
392                 }
393                 last_run_end = last_token;
394                 run_start = token;
395         }
396
397         last_token = token;
398
399         if (nb != 0 || flush_pending) {
400                 /* deflate the data starting at offset */
401                 int flush = Z_NO_FLUSH;
402                 tx_strm.avail_in = 0;
403                 tx_strm.avail_out = 0;
404                 do {
405                         if (tx_strm.avail_in == 0 && nb != 0) {
406                                 /* give it some more input */
407                                 n = MIN(nb, CHUNK_SIZE);
408                                 tx_strm.next_in = (Bytef *)
409                                         map_ptr(buf, offset, n);
410                                 tx_strm.avail_in = n;
411                                 nb -= n;
412                                 offset += n;
413                         }
414                         if (tx_strm.avail_out == 0) {
415                                 tx_strm.next_out = (Bytef *)(obuf + 2);
416                                 tx_strm.avail_out = MAX_DATA_COUNT;
417                                 if (flush != Z_NO_FLUSH) {
418                                         /*
419                                          * We left the last 4 bytes in the
420                                          * buffer, in case they are the
421                                          * last 4.  Move them to the front.
422                                          */
423                                         memcpy(tx_strm.next_out, obuf+MAX_DATA_COUNT-2, 4);
424                                         tx_strm.next_out += 4;
425                                         tx_strm.avail_out -= 4;
426                                 }
427                         }
428                         if (nb == 0 && token != -2)
429                                 flush = Z_SYNC_FLUSH;
430                         r = deflate(&tx_strm, flush);
431                         if (r != Z_OK) {
432                                 rprintf(FERROR, "deflate returned %d\n", r);
433                                 exit_cleanup(RERR_STREAMIO);
434                         }
435                         if (nb == 0 || tx_strm.avail_out == 0) {
436                                 n = MAX_DATA_COUNT - tx_strm.avail_out;
437                                 if (flush != Z_NO_FLUSH) {
438                                         /*
439                                          * We have to trim off the last 4
440                                          * bytes of output when flushing
441                                          * (they are just 0, 0, ff, ff).
442                                          */
443                                         n -= 4;
444                                 }
445                                 if (n > 0) {
446                                         obuf[0] = DEFLATED_DATA + (n >> 8);
447                                         obuf[1] = n;
448                                         write_buf(f, obuf, n+2);
449                                 }
450                         }
451                 } while (nb != 0 || tx_strm.avail_out == 0);
452                 flush_pending = token == -2;
453         }
454
455         if (token == -1) {
456                 /* end of file - clean up */
457                 write_byte(f, END_FLAG);
458         } else if (token != -2 && do_compression == CPRES_ZLIB) {
459                 /* Add the data in the current block to the compressor's
460                  * history and hash table. */
461                 do {
462                         /* Break up long sections in the same way that
463                          * see_deflate_token() does. */
464                         int32 n1 = toklen > 0xffff ? 0xffff : toklen;
465                         toklen -= n1;
466                         tx_strm.next_in = (Bytef *)map_ptr(buf, offset, n1);
467                         tx_strm.avail_in = n1;
468                         if (protocol_version >= 31) /* Newer protocols avoid a data-duplicating bug */
469                                 offset += n1;
470                         tx_strm.next_out = (Bytef *) obuf;
471                         tx_strm.avail_out = AVAIL_OUT_SIZE(CHUNK_SIZE);
472                         r = deflate(&tx_strm, Z_INSERT_ONLY);
473                         if (r != Z_OK || tx_strm.avail_in != 0) {
474                                 rprintf(FERROR, "deflate on token returned %d (%d bytes left)\n",
475                                         r, tx_strm.avail_in);
476                                 exit_cleanup(RERR_STREAMIO);
477                         }
478                 } while (toklen > 0);
479         }
480 }
481
/*
 * Tells us what the receiver is in the middle of doing.  Used by
 * recv_deflated_token() and recv_zstd_token().  r_init must stay first,
 * because it is the zero value this static starts with.
 *   r_init      - set up or reset the decompressor before the next file
 *   r_idle      - waiting for the next flag byte
 *   r_running   - returning the remaining tokens of a run, one per call
 *   r_inflating - decompressing a DEFLATED_DATA packet
 *   r_inflated  - the packet's input is used up; drain any pending output
 */
static enum { r_init, r_idle, r_running, r_inflating, r_inflated } recv_state;

/* for inflating stuff */
static z_stream rx_strm;
static char *cbuf; /* compressed input packet (MAX_DATA_COUNT bytes) */
static char *dbuf; /* decompressed output */

/* for decoding runs of tokens */
static int32 rx_token; /* most recently returned token number */
static int32 rx_run;   /* tokens still to return in the current run */
493
/*
 * Receive a deflated token and inflate it.
 *
 * Returns one of:
 *   > 0  that many decompressed literal bytes are at *data (dbuf);
 *   0    END_FLAG was read: this file's data is complete, and recv_state
 *        goes back to r_init for the next file;
 *   < 0  a matched token, encoded as -1 - token number.
 *
 * Decoding state is kept in recv_state, rx_token and rx_run between calls.
 * As a result, one packet may be returned over several calls, and each
 * token of a run comes back from a separate call.
 */
static int32 recv_deflated_token(int f, char **data)
{
        static int init_done;
        /* A flag byte read ahead of time, plus 0x10000 so that even
         * END_FLAG (0) shows up as nonzero. */
        static int32 saved_flag;
        int32 n, flag;
        int r;

        for (;;) {
                switch (recv_state) {
                case r_init:
                        /* Window bits -15 match the sender's deflateInit2() call. */
                        if (!init_done) {
                                rx_strm.next_out = NULL;
                                rx_strm.zalloc = NULL;
                                rx_strm.zfree = NULL;
                                if (inflateInit2(&rx_strm, -15) != Z_OK) {
                                        rprintf(FERROR, "inflate init failed\n");
                                        exit_cleanup(RERR_PROTOCOL);
                                }
                                cbuf = new_array(char, MAX_DATA_COUNT);
                                dbuf = new_array(char, AVAIL_OUT_SIZE(CHUNK_SIZE));
                                init_done = 1;
                        } else {
                                inflateReset(&rx_strm);
                        }
                        recv_state = r_idle;
                        rx_token = 0;
                        break;

                case r_idle:
                case r_inflated:
                        /* Use a flag byte saved by an earlier call, else read one. */
                        if (saved_flag) {
                                flag = saved_flag & 0xff;
                                saved_flag = 0;
                        } else
                                flag = read_byte(f);
                        if ((flag & 0xC0) == DEFLATED_DATA) {
                                /* 14-bit length: low 6 flag bits, then one more byte. */
                                n = ((flag & 0x3f) << 8) + read_byte(f);
                                read_buf(f, cbuf, n);
                                rx_strm.next_in = (Bytef *)cbuf;
                                rx_strm.avail_in = n;
                                recv_state = r_inflating;
                                break;
                        }
                        if (recv_state == r_inflated) {
                                /* check previous inflated stuff ended correctly */
                                rx_strm.avail_in = 0;
                                rx_strm.next_out = (Bytef *)dbuf;
                                rx_strm.avail_out = AVAIL_OUT_SIZE(CHUNK_SIZE);
                                r = inflate(&rx_strm, Z_SYNC_FLUSH);
                                n = AVAIL_OUT_SIZE(CHUNK_SIZE) - rx_strm.avail_out;
                                /*
                                 * Z_BUF_ERROR just means no progress was
                                 * made, i.e. the decompressor didn't have
                                 * any pending output for us.
                                 */
                                if (r != Z_OK && r != Z_BUF_ERROR) {
                                        rprintf(FERROR, "inflate flush returned %d (%d bytes)\n",
                                                r, n);
                                        exit_cleanup(RERR_STREAMIO);
                                }
                                if (n != 0 && r != Z_BUF_ERROR) {
                                        /* have to return some more data and
                                           save the flag for later. */
                                        saved_flag = flag + 0x10000;
                                        *data = dbuf;
                                        return n;
                                }
                                /*
                                 * At this point the decompressor should
                                 * be expecting to see the 0, 0, ff, ff bytes.
                                 * The sender trims them off every flush, so
                                 * we feed them in ourselves.
                                 */
                                if (!inflateSyncPoint(&rx_strm)) {
                                        rprintf(FERROR, "decompressor lost sync!\n");
                                        exit_cleanup(RERR_STREAMIO);
                                }
                                rx_strm.avail_in = 4;
                                rx_strm.next_in = (Bytef *)cbuf;
                                cbuf[0] = cbuf[1] = 0;
                                cbuf[2] = cbuf[3] = 0xff;
                                inflate(&rx_strm, Z_SYNC_FLUSH);
                                recv_state = r_idle;
                        }
                        if (flag == END_FLAG) {
                                /* that's all folks */
                                recv_state = r_init;
                                return 0;
                        }

                        /* here we have a token of some kind */
                        if (flag & TOKEN_REL) {
                                /* Shifting by 6 leaves bit 0 set only for
                                 * TOKENRUN_REL (0xc0 >> 6 == 3).  That is
                                 * the same "run follows" bit as in
                                 * TOKENRUN_LONG (0x21). */
                                rx_token += flag & 0x3f;
                                flag >>= 6;
                        } else
                                rx_token = read_int(f);
                        if (flag & 1) {
                                /* 16-bit run count, low byte first. */
                                rx_run = read_byte(f);
                                rx_run += read_byte(f) << 8;
                                recv_state = r_running;
                        }
                        return -1 - rx_token;

                case r_inflating:
                        /* Inflate more of the current packet into dbuf. */
                        rx_strm.next_out = (Bytef *)dbuf;
                        rx_strm.avail_out = AVAIL_OUT_SIZE(CHUNK_SIZE);
                        r = inflate(&rx_strm, Z_NO_FLUSH);
                        n = AVAIL_OUT_SIZE(CHUNK_SIZE) - rx_strm.avail_out;
                        if (r != Z_OK) {
                                rprintf(FERROR, "inflate returned %d (%d bytes)\n", r, n);
                                exit_cleanup(RERR_STREAMIO);
                        }
                        if (rx_strm.avail_in == 0)
                                recv_state = r_inflated;
                        if (n != 0) {
                                *data = dbuf;
                                return n;
                        }
                        break;

                case r_running:
                        /* Hand out the next token of the current run. */
                        ++rx_token;
                        if (--rx_run == 0)
                                recv_state = r_idle;
                        return -1 - rx_token;
                }
        }
}
621
/*
 * put the data corresponding to a token that we've just returned
 * from recv_deflated_token into the decompressor's history buffer.
 *
 * The len bytes at buf are wrapped in fake stored-block headers, each
 * covering at most 0xffff bytes, and then run through inflate().  The
 * output written to dbuf is not used.  The 0xffff split matches the
 * chunking the sender uses when it feeds matched blocks to its compressor
 * in send_deflated_token().
 */
static void see_deflate_token(char *buf, int32 len)
{
        int r;
        int32 blklen;       /* bytes still owed to the current fake block; 0 = need a header */
        unsigned char hdr[5];

        rx_strm.avail_in = 0;
        blklen = 0;
        hdr[0] = 0;
        do {
                /* Alternate: feed a 5-byte header, then that block's data. */
                if (rx_strm.avail_in == 0 && len != 0) {
                        if (blklen == 0) {
                                /* Give it a fake stored-block header:
                                 * LEN (little-endian), then its one's
                                 * complement. */
                                rx_strm.next_in = (Bytef *)hdr;
                                rx_strm.avail_in = 5;
                                blklen = len;
                                if (blklen > 0xffff)
                                        blklen = 0xffff;
                                hdr[1] = blklen;
                                hdr[2] = blklen >> 8;
                                hdr[3] = ~hdr[1];
                                hdr[4] = ~hdr[2];
                        } else {
                                rx_strm.next_in = (Bytef *)buf;
                                rx_strm.avail_in = blklen;
                                /* Older protocols re-feed the start of buf for
                                 * every block, mirroring the sender. */
                                if (protocol_version >= 31) /* Newer protocols avoid a data-duplicating bug */
                                        buf += blklen;
                                len -= blklen;
                                blklen = 0;
                        }
                }
                rx_strm.next_out = (Bytef *)dbuf;
                rx_strm.avail_out = AVAIL_OUT_SIZE(CHUNK_SIZE);
                r = inflate(&rx_strm, Z_SYNC_FLUSH);
                if (r != Z_OK && r != Z_BUF_ERROR) {
                        rprintf(FERROR, "inflate (token) returned %d\n", r);
                        exit_cleanup(RERR_STREAMIO);
                }
                /* Keep going while input remains or dbuf was filled completely. */
        } while (len || rx_strm.avail_out == 0);
}
666
667 #ifdef SUPPORT_ZSTD
668
/* Streaming buffers used by send_zstd_token() and recv_zstd_token(). */
static ZSTD_inBuffer zstd_in_buff;
static ZSTD_outBuffer zstd_out_buff;
/* Compression context, created on the first send_zstd_token() call. */
static ZSTD_CCtx *zstd_cctx;
672
673 static void send_zstd_token(int f, int32 token, struct map_struct *buf, OFF_T offset, int32 nb)
674 {
675         static int comp_init_done, flush_pending;
676         ZSTD_EndDirective flush = ZSTD_e_continue;
677         int32 n, r;
678
679         /* initialization */
680         if (!comp_init_done) {
681                 zstd_cctx = ZSTD_createCCtx();
682                 if (!zstd_cctx) {
683                         rprintf(FERROR, "compression init failed\n");
684                         exit_cleanup(RERR_PROTOCOL);
685                 }
686
687                 obuf = new_array(char, OBUF_SIZE);
688
689                 ZSTD_CCtx_setParameter(zstd_cctx, ZSTD_c_compressionLevel, do_compression_level);
690                 zstd_out_buff.dst = obuf + 2;
691
692                 comp_init_done = 1;
693         }
694
695         if (last_token == -1) {
696                 last_run_end = 0;
697                 run_start = token;
698                 flush_pending = 0;
699         } else if (last_token == -2) {
700                 run_start = token;
701         } else if (nb != 0 || token != last_token + 1 || token >= run_start + 65536) {
702                 /* output previous run */
703                 r = run_start - last_run_end;
704                 n = last_token - run_start;
705
706                 if (r >= 0 && r <= 63) {
707                         write_byte(f, (n==0? TOKEN_REL: TOKENRUN_REL) + r);
708                 } else {
709                         write_byte(f, (n==0? TOKEN_LONG: TOKENRUN_LONG));
710                         write_int(f, run_start);
711                 }
712                 if (n != 0) {
713                         write_byte(f, n);
714                         write_byte(f, n >> 8);
715                 }
716                 last_run_end = last_token;
717                 run_start = token;
718         }
719
720         last_token = token;
721
722         if (nb || flush_pending) {
723
724                 zstd_in_buff.src = map_ptr(buf, offset, nb);
725                 zstd_in_buff.size = nb;
726                 zstd_in_buff.pos = 0;
727
728                 do {
729                         if (zstd_out_buff.size == 0) {
730                                 zstd_out_buff.size = MAX_DATA_COUNT;
731                                 zstd_out_buff.pos = 0;
732                         }
733
734                         /* File ended, flush */
735                         if (token != -2)
736                                 flush = ZSTD_e_flush;
737
738                         r = ZSTD_compressStream2(zstd_cctx, &zstd_out_buff, &zstd_in_buff, flush);
739                         if (ZSTD_isError(r)) {
740                                 rprintf(FERROR, "ZSTD_compressStream returned %d\n", r);
741                                 exit_cleanup(RERR_STREAMIO);
742                         }
743
744                         /*
745                          * Nothing is sent if the buffer isn't full so avoid smaller
746                          * transfers. If a file is finished then we flush the internal
747                          * state and send a smaller buffer so that the remote side can
748                          * finish the file.
749                          */
750                         if (zstd_out_buff.pos == zstd_out_buff.size || flush == ZSTD_e_flush) {
751                                 n = zstd_out_buff.pos;
752
753                                 obuf[0] = DEFLATED_DATA + (n >> 8);
754                                 obuf[1] = n;
755                                 write_buf(f, obuf, n+2);
756
757                                 zstd_out_buff.size = 0;
758                         }
759                         /*
760                          * Loop while the input buffer isn't full consumed or the
761                          * internal state isn't fully flushed.
762                          */
763                 } while (zstd_in_buff.pos < zstd_in_buff.size || r > 0);
764                 flush_pending = token == -2;
765         }
766
767         if (token == -1) {
768                 /* end of file - clean up */
769                 write_byte(f, END_FLAG);
770         }
771 }
772
773 static ZSTD_DCtx *zstd_dctx;
774
775 static int32 recv_zstd_token(int f, char **data)
776 {
777         static int decomp_init_done;
778         static int out_buffer_size;
779         int32 n, flag;
780         int r;
781
782         if (!decomp_init_done) {
783                 zstd_dctx = ZSTD_createDCtx();
784                 if (!zstd_dctx) {
785                         rprintf(FERROR, "ZSTD_createDStream failed\n");
786                         exit_cleanup(RERR_PROTOCOL);
787                 }
788
789                 /* Output buffer fits two decompressed blocks */
790                 out_buffer_size = ZSTD_DStreamOutSize() * 2;
791                 cbuf = new_array(char, MAX_DATA_COUNT);
792                 dbuf = new_array(char, out_buffer_size);
793
794                 zstd_in_buff.src = cbuf;
795                 zstd_out_buff.dst = dbuf;
796
797                 decomp_init_done = 1;
798         }
799
800         for (;;) {
801                 switch (recv_state) {
802                 case r_init:
803                         recv_state = r_idle;
804                         rx_token = 0;
805                         break;
806
807                 case r_idle:
808                         flag = read_byte(f);
809                         if ((flag & 0xC0) == DEFLATED_DATA) {
810                                 n = ((flag & 0x3f) << 8) + read_byte(f);
811                                 read_buf(f, cbuf, n);
812
813                                 zstd_in_buff.size = n;
814                                 zstd_in_buff.pos = 0;
815
816                                 recv_state = r_inflating;
817                                 break;
818                         }
819
820                         if (flag == END_FLAG) {
821                                 /* that's all folks */
822                                 recv_state = r_init;
823                                 return 0;
824                         }
825                         /* here we have a token of some kind */
826                         if (flag & TOKEN_REL) {
827                                 rx_token += flag & 0x3f;
828                                 flag >>= 6;
829                         } else
830                                 rx_token = read_int(f);
831                         if (flag & 1) {
832                                 rx_run = read_byte(f);
833                                 rx_run += read_byte(f) << 8;
834                                 recv_state = r_running;
835                         }
836                         return -1 - rx_token;
837
838                 case r_inflated: /* zstd doesn't get into this state */
839                         break;
840
841                 case r_inflating:
842                         zstd_out_buff.size = out_buffer_size;
843                         zstd_out_buff.pos = 0;
844
845                         r = ZSTD_decompressStream(zstd_dctx, &zstd_out_buff, &zstd_in_buff);
846                         n = zstd_out_buff.pos;
847                         if (ZSTD_isError(r)) {
848                                 rprintf(FERROR, "ZSTD decomp returned %d (%d bytes)\n", r, n);
849                                 exit_cleanup(RERR_STREAMIO);
850                         }
851
852                         /*
853                          * If the input buffer is fully consumed and the output
854                          * buffer is not full then next step is to read more
855                          * data.
856                          */
857                         if (zstd_in_buff.size == zstd_in_buff.pos && n < out_buffer_size)
858                                 recv_state = r_idle;
859
860                         if (n != 0) {
861                                 *data = dbuf;
862                                 return n;
863                         }
864                         break;
865
866                 case r_running:
867                         ++rx_token;
868                         if (--rx_run == 0)
869                                 recv_state = r_idle;
870                         return -1 - rx_token;
871                 }
872         }
873 }
874 #endif /* SUPPORT_ZSTD */
875
876 #ifdef SUPPORT_LZ4
877 static void
878 send_compressed_token(int f, int32 token, struct map_struct *buf, OFF_T offset, int32 nb)
879 {
880         static int init_done, flush_pending;
881         int size = MAX(LZ4_compressBound(CHUNK_SIZE), MAX_DATA_COUNT+2);
882         int32 n, r;
883
884         if (last_token == -1) {
885                 if (!init_done) {
886                         obuf = new_array(char, size);
887                         init_done = 1;
888                 }
889                 last_run_end = 0;
890                 run_start = token;
891                 flush_pending = 0;
892         } else if (last_token == -2) {
893                 run_start = token;
894         } else if (nb != 0 || token != last_token + 1 || token >= run_start + 65536) {
895                 /* output previous run */
896                 r = run_start - last_run_end;
897                 n = last_token - run_start;
898                 if (r >= 0 && r <= 63) {
899                         write_byte(f, (n==0? TOKEN_REL: TOKENRUN_REL) + r);
900                 } else {
901                         write_byte(f, (n==0? TOKEN_LONG: TOKENRUN_LONG));
902                         write_int(f, run_start);
903                 }
904                 if (n != 0) {
905                         write_byte(f, n);
906                         write_byte(f, n >> 8);
907                 }
908                 last_run_end = last_token;
909                 run_start = token;
910         }
911
912         last_token = token;
913
914         if (nb != 0 || flush_pending) {
915                 int available_in, available_out = 0;
916                 const char *next_in;
917
918                 do {
919                         char *next_out = obuf + 2;
920
921                         if (available_out == 0) {
922                                 available_in = MIN(nb, MAX_DATA_COUNT);
923                                 next_in = map_ptr(buf, offset, available_in);
924                         } else
925                                 available_in /= 2;
926
927                         available_out = LZ4_compress_default(next_in, next_out, available_in, size - 2);
928                         if (!available_out) {
929                                 rprintf(FERROR, "compress returned %d\n", available_out);
930                                 exit_cleanup(RERR_STREAMIO);
931                         }
932                         if (available_out <= MAX_DATA_COUNT) {
933                                 obuf[0] = DEFLATED_DATA + (available_out >> 8);
934                                 obuf[1] = available_out;
935
936                                 write_buf(f, obuf, available_out + 2);
937
938                                 available_out = 0;
939                                 nb -= available_in;
940                                 offset += available_in;
941                         }
942                 } while (nb != 0);
943                 flush_pending = token == -2;
944         }
945         if (token == -1) {
946                 /* end of file - clean up */
947                 write_byte(f, END_FLAG);
948         }
949 }
950
951 static int32 recv_compressed_token(int f, char **data)
952 {
953         static int init_done;
954         int32 n, flag;
955         int size = MAX(LZ4_compressBound(CHUNK_SIZE), MAX_DATA_COUNT+2);
956         static const char *next_in;
957         static int avail_in;
958         int avail_out;
959
960         for (;;) {
961                 switch (recv_state) {
962                 case r_init:
963                         if (!init_done) {
964                                 cbuf = new_array(char, MAX_DATA_COUNT);
965                                 dbuf = new_array(char, size);
966                                 init_done = 1;
967                         }
968                         recv_state = r_idle;
969                         rx_token = 0;
970                         break;
971
972                 case r_idle:
973                         flag = read_byte(f);
974                         if ((flag & 0xC0) == DEFLATED_DATA) {
975                                 n = ((flag & 0x3f) << 8) + read_byte(f);
976                                 read_buf(f, cbuf, n);
977                                 next_in = (char *)cbuf;
978                                 avail_in = n;
979                                 recv_state = r_inflating;
980                                 break;
981                         }
982
983                         if (flag == END_FLAG) {
984                                 /* that's all folks */
985                                 recv_state = r_init;
986                                 return 0;
987                         }
988
989                         /* here we have a token of some kind */
990                         if (flag & TOKEN_REL) {
991                                 rx_token += flag & 0x3f;
992                                 flag >>= 6;
993                         } else
994                                 rx_token = read_int(f);
995                         if (flag & 1) {
996                                 rx_run = read_byte(f);
997                                 rx_run += read_byte(f) << 8;
998                                 recv_state = r_running;
999                         }
1000                         return -1 - rx_token;
1001
1002                 case r_inflating:
1003                         avail_out = LZ4_decompress_safe(next_in, dbuf, avail_in, size);
1004                         if (avail_out < 0) {
1005                                 rprintf(FERROR, "uncompress failed: %d\n", avail_out);
1006                                 exit_cleanup(RERR_STREAMIO);
1007                         }
1008                         recv_state = r_idle;
1009                         *data = dbuf;
1010                         return avail_out;
1011
1012                 case r_inflated: /* lz4 doesn't get into this state */
1013                         break;
1014
1015                 case r_running:
1016                         ++rx_token;
1017                         if (--rx_run == 0)
1018                                 recv_state = r_idle;
1019                         return -1 - rx_token;
1020                 }
1021         }
1022 }
1023 #endif /* SUPPORT_LZ4 */
1024
1025 /**
1026  * Transmit a verbatim buffer of length @p n followed by a token.
1027  * If token == -1 then we have reached EOF
1028  * If n == 0 then don't send a buffer
1029  */
1030 void send_token(int f, int32 token, struct map_struct *buf, OFF_T offset,
1031                 int32 n, int32 toklen)
1032 {
1033         switch (do_compression) {
1034         case CPRES_NONE:
1035                 simple_send_token(f, token, buf, offset, n);
1036                 break;
1037         case CPRES_ZLIB:
1038         case CPRES_ZLIBX:
1039                 send_deflated_token(f, token, buf, offset, n, toklen);
1040                 break;
1041 #ifdef SUPPORT_ZSTD
1042         case CPRES_ZSTD:
1043                 send_zstd_token(f, token, buf, offset, n);
1044                 break;
1045 #endif
1046 #ifdef SUPPORT_LZ4
1047         case CPRES_LZ4:
1048                 send_compressed_token(f, token, buf, offset, n);
1049                 break;
1050 #endif
1051         default:
1052                 assert(0);
1053         }
1054 }
1055
1056 /*
1057  * receive a token or buffer from the other end. If the return value is >0 then
1058  * it is a data buffer of that length, and *data will point at the data.
1059  * if the return value is -i then it represents token i-1
1060  * if the return value is 0 then the end has been reached
1061  */
1062 int32 recv_token(int f, char **data)
1063 {
1064         switch (do_compression) {
1065         case CPRES_NONE:
1066                 return simple_recv_token(f,data);
1067         case CPRES_ZLIB:
1068         case CPRES_ZLIBX:
1069                 return recv_deflated_token(f, data);
1070 #ifdef SUPPORT_ZSTD
1071         case CPRES_ZSTD:
1072                 return recv_zstd_token(f, data);
1073 #endif
1074 #ifdef SUPPORT_LZ4
1075         case CPRES_LZ4:
1076                 return recv_compressed_token(f, data);
1077 #endif
1078         default:
1079                 assert(0);
1080         }
1081 }
1082
1083 /*
1084  * look at the data corresponding to a token, if necessary
1085  */
1086 void see_token(char *data, int32 toklen)
1087 {
1088         switch (do_compression) {
1089         case CPRES_NONE:
1090                 break;
1091         case CPRES_ZLIB:
1092                 see_deflate_token(data, toklen);
1093                 break;
1094         case CPRES_ZLIBX:
1095                 break;
1096 #ifdef SUPPORT_LZ4
1097         case CPRES_LZ4:
1098                 /*see_uncompressed_token(data, toklen);*/
1099                 break;
1100 #endif
1101 #ifdef SUPPORT_LZ4
1102         case CPRES_ZSTD:
1103                 break;
1104 #endif
1105         default:
1106                 assert(0);
1107         }
1108 }