9f9c3823f1cdea569451aee1d612978341518ccf
[rsync.git] / io.c
1 /* -*- c-file-style: "linux" -*-
2  *
3  * Copyright (C) 1996-2001 by Andrew Tridgell
4  * Copyright (C) Paul Mackerras 1996
5  * Copyright (C) 2001, 2002 by Martin Pool <mbp@samba.org>
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 /**
23  * @file io.c
24  *
25  * Socket and pipe I/O utilities used in rsync.
26  *
27  * rsync provides its own multiplexing system, which is used to send
28  * stderr and stdout over a single socket.  We need this because
29  * stdout normally carries the binary data stream, and stderr all our
30  * error messages.
31  *
32  * For historical reasons this is off during the start of the
33  * connection, but it's switched on quite early using
34  * io_start_multiplex_out() and io_start_multiplex_in().
35  **/
36
37 #include "rsync.h"
38
39 /** If no timeout is specified then use a 60 second select timeout */
40 #define SELECT_TIMEOUT 60
41
42 static int io_multiplexing_out;
43 static int io_multiplexing_in;
44 static int multiplex_in_fd = -1;
45 static int multiplex_out_fd = -1;
46 static time_t last_io;
47 static int no_flush;
48
49 extern int bwlimit;
50 extern size_t bwlimit_writemax;
51 extern int verbose;
52 extern int io_timeout;
53 extern int am_server;
54 extern int am_daemon;
55 extern int am_sender;
56 extern int eol_nulls;
57 extern int checksum_seed;
58 extern int protocol_version;
59 extern char *remote_filesfrom_file;
60 extern struct stats stats;
61
62 const char phase_unknown[] = "unknown";
63 int select_timeout = SELECT_TIMEOUT;
64 int batch_fd = -1;
65
66 /**
67  * The connection might be dropped at some point; perhaps because the
68  * remote instance crashed.  Just giving the offset on the stream is
69  * not very helpful.  So instead we try to make io_phase_name point to
70  * something useful.
71  *
72  * For buffered/multiplexed I/O these names will be somewhat
73  * approximate; perhaps for ease of support we would rather make the
74  * buffer always flush when a single application-level I/O finishes.
75  *
76  * @todo Perhaps we want some simple stack functionality, but there's
77  * no need to overdo it.
78  **/
79 const char *io_write_phase = phase_unknown;
80 const char *io_read_phase = phase_unknown;
81
82 /** Ignore EOF errors while reading a module listing if the remote
83     version is 24 or less. */
84 int kludge_around_eof = False;
85
86 int msg_fd_in = -1;
87 int msg_fd_out = -1;
88
89 static int write_batch_monitor_in = -1;
90 static int write_batch_monitor_out = -1;
91
92 static int io_filesfrom_f_in = -1;
93 static int io_filesfrom_f_out = -1;
94 static char io_filesfrom_buf[2048];
95 static char *io_filesfrom_bp;
96 static char io_filesfrom_lastchar;
97 static int io_filesfrom_buflen;
98
99 static void read_loop(int fd, char *buf, size_t len);
100
101 struct redo_list {
102         struct redo_list *next;
103         int num;
104 };
105
106 static struct redo_list *redo_list_head;
107 static struct redo_list *redo_list_tail;
108
109 struct msg_list {
110         struct msg_list *next;
111         char *buf;
112         int len;
113 };
114
115 static struct msg_list *msg_list_head;
116 static struct msg_list *msg_list_tail;
117
118 static void redo_list_add(int num)
119 {
120         struct redo_list *rl;
121
122         if (!(rl = new(struct redo_list)))
123                 exit_cleanup(RERR_MALLOC);
124         rl->next = NULL;
125         rl->num = num;
126         if (redo_list_tail)
127                 redo_list_tail->next = rl;
128         else
129                 redo_list_head = rl;
130         redo_list_tail = rl;
131 }
132
133 static void check_timeout(void)
134 {
135         time_t t;
136
137         if (!io_timeout)
138                 return;
139
140         if (!last_io) {
141                 last_io = time(NULL);
142                 return;
143         }
144
145         t = time(NULL);
146
147         if (last_io && io_timeout && (t-last_io) >= io_timeout) {
148                 if (!am_server && !am_daemon) {
149                         rprintf(FERROR, "io timeout after %d seconds - exiting\n",
150                                 (int)(t-last_io));
151                 }
152                 exit_cleanup(RERR_TIMEOUT);
153         }
154 }
155
156 /** Setup the fd used to receive MSG_* messages.  Only needed when
157  * we're the generator because the sender and receiver both use the
158  * multiplexed I/O setup. */
159 void set_msg_fd_in(int fd)
160 {
161         msg_fd_in = fd;
162 }
163
164 /** Setup the fd used to send our MSG_* messages.  Only needed when
165  * we're the receiver because the generator and the sender both use
166  * the multiplexed I/O setup. */
167 void set_msg_fd_out(int fd)
168 {
169         msg_fd_out = fd;
170         set_nonblocking(msg_fd_out);
171 }
172
173 /* Add a message to the pending MSG_* list. */
174 static void msg_list_add(int code, char *buf, int len)
175 {
176         struct msg_list *ml;
177
178         if (!(ml = new(struct msg_list)))
179                 exit_cleanup(RERR_MALLOC);
180         ml->next = NULL;
181         if (!(ml->buf = new_array(char, len+4)))
182                 exit_cleanup(RERR_MALLOC);
183         SIVAL(ml->buf, 0, ((code+MPLEX_BASE)<<24) | len);
184         memcpy(ml->buf+4, buf, len);
185         ml->len = len+4;
186         if (msg_list_tail)
187                 msg_list_tail->next = ml;
188         else
189                 msg_list_head = ml;
190         msg_list_tail = ml;
191 }
192
193 void send_msg(enum msgcode code, char *buf, int len)
194 {
195         msg_list_add(code, buf, len);
196         msg_list_push(NORMAL_FLUSH);
197 }
198
199 /** Read a message from the MSG_* fd and dispatch it.  This is only
200  * called by the generator. */
201 static void read_msg_fd(void)
202 {
203         char buf[2048];
204         size_t n;
205         int fd = msg_fd_in;
206         int tag, len;
207
208         /* Temporarily disable msg_fd_in.  This is needed to avoid looping back
209          * to this routine from read_timeout() and writefd_unbuffered(). */
210         msg_fd_in = -1;
211
212         read_loop(fd, buf, 4);
213         tag = IVAL(buf, 0);
214
215         len = tag & 0xFFFFFF;
216         tag = (tag >> 24) - MPLEX_BASE;
217
218         switch (tag) {
219         case MSG_DONE:
220                 if (len != 0) {
221                         rprintf(FERROR, "invalid message %d:%d\n", tag, len);
222                         exit_cleanup(RERR_STREAMIO);
223                 }
224                 redo_list_add(-1);
225                 break;
226         case MSG_REDO:
227                 if (len != 4) {
228                         rprintf(FERROR, "invalid message %d:%d\n", tag, len);
229                         exit_cleanup(RERR_STREAMIO);
230                 }
231                 read_loop(fd, buf, 4);
232                 redo_list_add(IVAL(buf,0));
233                 break;
234         case MSG_INFO:
235         case MSG_ERROR:
236         case MSG_LOG:
237                 while (len) {
238                         n = len;
239                         if (n >= sizeof buf)
240                                 n = sizeof buf - 1;
241                         read_loop(fd, buf, n);
242                         rwrite((enum logcode)tag, buf, n);
243                         len -= n;
244                 }
245                 break;
246         default:
247                 rprintf(FERROR, "unknown message %d:%d\n", tag, len);
248                 exit_cleanup(RERR_STREAMIO);
249         }
250
251         msg_fd_in = fd;
252 }
253
254 /* Try to push messages off the list onto the wire.  If we leave with more
255  * to do, return 0.  On error, return -1.  If everything flushed, return 1.
256  * This is only active in the receiver. */
257 int msg_list_push(int flush_it_all)
258 {
259         static int written = 0;
260         struct timeval tv;
261         fd_set fds;
262
263         if (msg_fd_out < 0)
264                 return -1;
265
266         while (msg_list_head) {
267                 struct msg_list *ml = msg_list_head;
268                 int n = write(msg_fd_out, ml->buf + written, ml->len - written);
269                 if (n < 0) {
270                         if (errno == EINTR)
271                                 continue;
272                         if (errno != EWOULDBLOCK && errno != EAGAIN)
273                                 return -1;
274                         if (!flush_it_all)
275                                 return 0;
276                         FD_ZERO(&fds);
277                         FD_SET(msg_fd_out, &fds);
278                         tv.tv_sec = select_timeout;
279                         tv.tv_usec = 0;
280                         if (!select(msg_fd_out+1, NULL, &fds, NULL, &tv))
281                                 check_timeout();
282                 } else if ((written += n) == ml->len) {
283                         free(ml->buf);
284                         msg_list_head = ml->next;
285                         if (!msg_list_head)
286                                 msg_list_tail = NULL;
287                         free(ml);
288                         written = 0;
289                 }
290         }
291         return 1;
292 }
293
294 int get_redo_num(void)
295 {
296         struct redo_list *next;
297         int num;
298
299         while (!redo_list_head)
300                 read_msg_fd();
301
302         num = redo_list_head->num;
303         next = redo_list_head->next;
304         free(redo_list_head);
305         redo_list_head = next;
306         if (!next)
307                 redo_list_tail = NULL;
308
309         return num;
310 }
311
312 /**
313  * When we're the receiver and we have a local --files-from list of names
314  * that needs to be sent over the socket to the sender, we have to do two
315  * things at the same time: send the sender a list of what files we're
316  * processing and read the incoming file+info list from the sender.  We do
317  * this by augmenting the read_timeout() function to copy this data.  It
318  * uses the io_filesfrom_buf to read a block of data from f_in (when it is
319  * ready, since it might be a pipe) and then blast it out f_out (when it
320  * is ready to receive more data).
321  */
322 void io_set_filesfrom_fds(int f_in, int f_out)
323 {
324         io_filesfrom_f_in = f_in;
325         io_filesfrom_f_out = f_out;
326         io_filesfrom_bp = io_filesfrom_buf;
327         io_filesfrom_lastchar = '\0';
328         io_filesfrom_buflen = 0;
329 }
330
331 /**
332  * It's almost always an error to get an EOF when we're trying to read
333  * from the network, because the protocol is self-terminating.
334  *
335  * However, there is one unfortunate cases where it is not, which is
336  * rsync <2.4.6 sending a list of modules on a server, since the list
337  * is terminated by closing the socket. So, for the section of the
338  * program where that is a problem (start_socket_client),
339  * kludge_around_eof is True and we just exit.
340  */
341 static void whine_about_eof(void)
342 {
343         if (kludge_around_eof)
344                 exit_cleanup(0);
345
346         rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
347                 "(%.0f bytes read so far)\n",
348                 (double)stats.total_read);
349
350         exit_cleanup(RERR_STREAMIO);
351 }
352
353
354 static void die_from_readerr(int err)
355 {
356         /* this prevents us trying to write errors on a dead socket */
357         io_multiplexing_close();
358
359         rsyserr(FERROR, err, "read error");
360         exit_cleanup(RERR_STREAMIO);
361 }
362
363
364 /**
365  * Read from a socket with I/O timeout. return the number of bytes
366  * read. If no bytes can be read then exit, never return a number <= 0.
367  *
368  * TODO: If the remote shell connection fails, then current versions
369  * actually report an "unexpected EOF" error here.  Since it's a
370  * fairly common mistake to try to use rsh when ssh is required, we
371  * should trap that: if we fail to read any data at all, we should
372  * give a better explanation.  We can tell whether the connection has
373  * started by looking e.g. at whether the remote version is known yet.
374  */
375 static int read_timeout(int fd, char *buf, size_t len)
376 {
377         int n, ret = 0;
378
379         io_flush(NORMAL_FLUSH);
380
381         while (ret == 0) {
382                 /* until we manage to read *something* */
383                 fd_set r_fds, w_fds;
384                 struct timeval tv;
385                 int maxfd = fd;
386                 int count;
387
388                 FD_ZERO(&r_fds);
389                 FD_ZERO(&w_fds);
390                 FD_SET(fd, &r_fds);
391                 if (msg_fd_in >= 0) {
392                         FD_SET(msg_fd_in, &r_fds);
393                         if (msg_fd_in > maxfd)
394                                 maxfd = msg_fd_in;
395                 } else if (msg_list_head) {
396                         FD_SET(msg_fd_out, &w_fds);
397                         if (msg_fd_out > maxfd)
398                                 maxfd = msg_fd_out;
399                 }
400                 if (io_filesfrom_f_out >= 0) {
401                         int new_fd;
402                         if (io_filesfrom_buflen == 0) {
403                                 if (io_filesfrom_f_in >= 0) {
404                                         FD_SET(io_filesfrom_f_in, &r_fds);
405                                         new_fd = io_filesfrom_f_in;
406                                 } else {
407                                         io_filesfrom_f_out = -1;
408                                         new_fd = -1;
409                                 }
410                         } else {
411                                 FD_SET(io_filesfrom_f_out, &w_fds);
412                                 new_fd = io_filesfrom_f_out;
413                         }
414                         if (new_fd > maxfd)
415                                 maxfd = new_fd;
416                 }
417
418                 tv.tv_sec = select_timeout;
419                 tv.tv_usec = 0;
420
421                 errno = 0;
422
423                 count = select(maxfd + 1, &r_fds, &w_fds, NULL, &tv);
424
425                 if (count <= 0) {
426                         if (errno == EBADF)
427                                 exit_cleanup(RERR_SOCKETIO);
428                         check_timeout();
429                         continue;
430                 }
431
432                 if (msg_fd_in >= 0 && FD_ISSET(msg_fd_in, &r_fds))
433                         read_msg_fd();
434                 else if (msg_list_head && FD_ISSET(msg_fd_out, &w_fds))
435                         msg_list_push(NORMAL_FLUSH);
436
437                 if (io_filesfrom_f_out >= 0) {
438                         if (io_filesfrom_buflen) {
439                                 if (FD_ISSET(io_filesfrom_f_out, &w_fds)) {
440                                         int l = write(io_filesfrom_f_out,
441                                                       io_filesfrom_bp,
442                                                       io_filesfrom_buflen);
443                                         if (l > 0) {
444                                                 if (!(io_filesfrom_buflen -= l))
445                                                         io_filesfrom_bp = io_filesfrom_buf;
446                                                 else
447                                                         io_filesfrom_bp += l;
448                                         } else {
449                                                 /* XXX should we complain? */
450                                                 io_filesfrom_f_out = -1;
451                                         }
452                                 }
453                         } else if (io_filesfrom_f_in >= 0) {
454                                 if (FD_ISSET(io_filesfrom_f_in, &r_fds)) {
455                                         int l = read(io_filesfrom_f_in,
456                                                      io_filesfrom_buf,
457                                                      sizeof io_filesfrom_buf);
458                                         if (l <= 0) {
459                                                 /* Send end-of-file marker */
460                                                 io_filesfrom_buf[0] = '\0';
461                                                 io_filesfrom_buf[1] = '\0';
462                                                 io_filesfrom_buflen = io_filesfrom_lastchar? 2 : 1;
463                                                 io_filesfrom_f_in = -1;
464                                         } else {
465                                                 if (!eol_nulls) {
466                                                         char *s = io_filesfrom_buf + l;
467                                                         /* Transform CR and/or LF into '\0' */
468                                                         while (s-- > io_filesfrom_buf) {
469                                                                 if (*s == '\n' || *s == '\r')
470                                                                         *s = '\0';
471                                                         }
472                                                 }
473                                                 if (!io_filesfrom_lastchar) {
474                                                         /* Last buf ended with a '\0', so don't
475                                                          * let this buf start with one. */
476                                                         while (l && !*io_filesfrom_bp)
477                                                                 io_filesfrom_bp++, l--;
478                                                 }
479                                                 if (!l)
480                                                         io_filesfrom_bp = io_filesfrom_buf;
481                                                 else {
482                                                         char *f = io_filesfrom_bp;
483                                                         char *t = f;
484                                                         char *eob = f + l;
485                                                         /* Eliminate any multi-'\0' runs. */
486                                                         while (f != eob) {
487                                                                 if (!(*t++ = *f++)) {
488                                                                         while (f != eob && !*f)
489                                                                                 f++, l--;
490                                                                 }
491                                                         }
492                                                         io_filesfrom_lastchar = f[-1];
493                                                 }
494                                                 io_filesfrom_buflen = l;
495                                         }
496                                 }
497                         }
498                 }
499
500                 if (!FD_ISSET(fd, &r_fds))
501                         continue;
502
503                 n = read(fd, buf, len);
504
505                 if (n <= 0) {
506                         if (n == 0)
507                                 whine_about_eof(); /* Doesn't return. */
508                         if (errno == EINTR || errno == EWOULDBLOCK
509                             || errno == EAGAIN)
510                                 continue;
511                         die_from_readerr(errno); /* Doesn't return. */
512                 }
513
514                 buf += n;
515                 len -= n;
516                 ret += n;
517                 if (io_timeout)
518                         last_io = time(NULL);
519         }
520
521         return ret;
522 }
523
524 /**
525  * Read a line into the "fname" buffer (which must be at least MAXPATHLEN
526  * characters long).
527  */
528 int read_filesfrom_line(int fd, char *fname)
529 {
530         char ch, *s, *eob = fname + MAXPATHLEN - 1;
531         int cnt;
532         int reading_remotely = remote_filesfrom_file != NULL;
533         int nulls = eol_nulls || reading_remotely;
534
535   start:
536         s = fname;
537         while (1) {
538                 cnt = read(fd, &ch, 1);
539                 if (cnt < 0 && (errno == EWOULDBLOCK
540                   || errno == EINTR || errno == EAGAIN)) {
541                         struct timeval tv;
542                         fd_set fds;
543                         FD_ZERO(&fds);
544                         FD_SET(fd, &fds);
545                         tv.tv_sec = select_timeout;
546                         tv.tv_usec = 0;
547                         if (!select(fd+1, &fds, NULL, NULL, &tv))
548                                 check_timeout();
549                         continue;
550                 }
551                 if (cnt != 1)
552                         break;
553                 if (nulls? !ch : (ch == '\r' || ch == '\n')) {
554                         /* Skip empty lines if reading locally. */
555                         if (!reading_remotely && s == fname)
556                                 continue;
557                         break;
558                 }
559                 if (s < eob)
560                         *s++ = ch;
561         }
562         *s = '\0';
563
564         /* Dump comments. */
565         if (*fname == '#' || *fname == ';')
566                 goto start;
567
568         return s - fname;
569 }
570
571
572 /**
573  * Continue trying to read len bytes - don't return until len has been
574  * read.
575  **/
576 static void read_loop(int fd, char *buf, size_t len)
577 {
578         while (len) {
579                 int n = read_timeout(fd, buf, len);
580
581                 buf += n;
582                 len -= n;
583         }
584 }
585
586
587 /**
588  * Read from the file descriptor handling multiplexing - return number
589  * of bytes read.
590  *
591  * Never returns <= 0.
592  */
593 static int readfd_unbuffered(int fd, char *buf, size_t len)
594 {
595         static size_t remaining;
596         int tag, ret = 0;
597         char line[1024];
598         static char *buffer;
599         static size_t bufferIdx = 0;
600         static size_t bufferSz;
601
602         if (fd != multiplex_in_fd)
603                 return read_timeout(fd, buf, len);
604
605         if (!io_multiplexing_in && remaining == 0) {
606                 if (!buffer) {
607                         bufferSz = 2 * IO_BUFFER_SIZE;
608                         buffer   = new_array(char, bufferSz);
609                         if (!buffer)
610                                 out_of_memory("readfd_unbuffered");
611                 }
612                 remaining = read_timeout(fd, buffer, bufferSz);
613                 bufferIdx = 0;
614         }
615
616         while (ret == 0) {
617                 if (remaining) {
618                         len = MIN(len, remaining);
619                         memcpy(buf, buffer + bufferIdx, len);
620                         bufferIdx += len;
621                         remaining -= len;
622                         ret = len;
623                         break;
624                 }
625
626                 read_loop(fd, line, 4);
627                 tag = IVAL(line, 0);
628
629                 remaining = tag & 0xFFFFFF;
630                 tag = (tag >> 24) - MPLEX_BASE;
631
632                 switch (tag) {
633                 case MSG_DATA:
634                         if (!buffer || remaining > bufferSz) {
635                                 buffer = realloc_array(buffer, char, remaining);
636                                 if (!buffer)
637                                         out_of_memory("readfd_unbuffered");
638                                 bufferSz = remaining;
639                         }
640                         read_loop(fd, buffer, remaining);
641                         bufferIdx = 0;
642                         break;
643                 case MSG_INFO:
644                 case MSG_ERROR:
645                         if (remaining >= sizeof line) {
646                                 rprintf(FERROR, "multiplexing overflow %d:%ld\n\n",
647                                         tag, (long)remaining);
648                                 exit_cleanup(RERR_STREAMIO);
649                         }
650                         read_loop(fd, line, remaining);
651                         rwrite((enum logcode)tag, line, remaining);
652                         remaining = 0;
653                         break;
654                 default:
655                         rprintf(FERROR, "unexpected tag %d\n", tag);
656                         exit_cleanup(RERR_STREAMIO);
657                 }
658         }
659
660         if (remaining == 0)
661                 io_flush(NORMAL_FLUSH);
662
663         return ret;
664 }
665
666
667
668 /**
669  * Do a buffered read from @p fd.  Don't return until all @p n bytes
670  * have been read.  If all @p n can't be read then exit with an
671  * error.
672  **/
673 static void readfd(int fd, char *buffer, size_t N)
674 {
675         int  ret;
676         size_t total = 0;
677
678         while (total < N) {
679                 ret = readfd_unbuffered(fd, buffer + total, N-total);
680                 total += ret;
681         }
682
683         if (fd == write_batch_monitor_in) {
684                 if ((size_t)write(batch_fd, buffer, total) != total)
685                         exit_cleanup(RERR_FILEIO);
686         }
687         
688         stats.total_read += total;
689 }
690
691
692 int32 read_int(int f)
693 {
694         char b[4];
695         int32 ret;
696
697         readfd(f,b,4);
698         ret = IVAL(b,0);
699         if (ret == (int32)0xffffffff)
700                 return -1;
701         return ret;
702 }
703
704 int64 read_longint(int f)
705 {
706         int64 ret;
707         char b[8];
708         ret = read_int(f);
709
710         if ((int32)ret != (int32)0xffffffff)
711                 return ret;
712
713 #ifdef NO_INT64
714         rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
715         exit_cleanup(RERR_UNSUPPORTED);
716 #else
717         readfd(f,b,8);
718         ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
719 #endif
720
721         return ret;
722 }
723
724 void read_buf(int f,char *buf,size_t len)
725 {
726         readfd(f,buf,len);
727 }
728
729 void read_sbuf(int f,char *buf,size_t len)
730 {
731         read_buf(f,buf,len);
732         buf[len] = 0;
733 }
734
735 unsigned char read_byte(int f)
736 {
737         unsigned char c;
738         read_buf(f, (char *)&c, 1);
739         return c;
740 }
741
742
743 /**
744  * Sleep after writing to limit I/O bandwidth usage.
745  *
746  * @todo Rather than sleeping after each write, it might be better to
747  * use some kind of averaging.  The current algorithm seems to always
748  * use a bit less bandwidth than specified, because it doesn't make up
749  * for slow periods.  But arguably this is a feature.  In addition, we
750  * ought to take the time used to write the data into account.
751  *
752  * During some phases of big transfers (file FOO is uptodate) this is
753  * called with a small bytes_written every time.  As the kernel has to
754  * round small waits up to guarantee that we actually wait at least the
755  * requested number of microseconds, this can become grossly inaccurate.
756  * We therefore keep track of the bytes we've written over time and only
757  * sleep when the accumulated delay is at least 1 tenth of a second.
758  **/
759 static void sleep_for_bwlimit(int bytes_written)
760 {
761         static struct timeval prior_tv;
762         static long total_written = 0; 
763         struct timeval tv, start_tv;
764         long elapsed_usec, sleep_usec;
765
766 #define ONE_SEC 1000000L /* # of microseconds in a second */
767
768         if (!bwlimit)
769                 return;
770
771         total_written += bytes_written; 
772
773         gettimeofday(&start_tv, NULL);
774         if (prior_tv.tv_sec) {
775                 elapsed_usec = (start_tv.tv_sec - prior_tv.tv_sec) * ONE_SEC
776                              + (start_tv.tv_usec - prior_tv.tv_usec);
777                 total_written -= elapsed_usec * bwlimit / (ONE_SEC/1024);
778                 if (total_written < 0)
779                         total_written = 0;
780         }
781
782         sleep_usec = total_written * (ONE_SEC/1024) / bwlimit;
783         if (sleep_usec < ONE_SEC / 10) {
784                 prior_tv = start_tv;
785                 return;
786         }
787
788         tv.tv_sec  = sleep_usec / ONE_SEC;
789         tv.tv_usec = sleep_usec % ONE_SEC;
790         select(0, NULL, NULL, NULL, &tv);
791
792         gettimeofday(&prior_tv, NULL);
793         elapsed_usec = (prior_tv.tv_sec - start_tv.tv_sec) * ONE_SEC
794                      + (prior_tv.tv_usec - start_tv.tv_usec);
795         total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024);
796 }
797
798
799 /**
800  * Write len bytes to the file descriptor @p fd.
801  *
802  * This function underlies the multiplexing system.  The body of the
803  * application never calls this function directly.
804  **/
805 static void writefd_unbuffered(int fd,char *buf,size_t len)
806 {
807         size_t n, total = 0;
808         fd_set w_fds, r_fds;
809         int maxfd, count, ret;
810         struct timeval tv;
811
812         if (fd == msg_fd_out) {
813                 rprintf(FERROR, "Internal error: wrong write used in receiver.\n");
814                 exit_cleanup(RERR_PROTOCOL);
815         }
816
817         no_flush++;
818
819         while (total < len) {
820                 FD_ZERO(&w_fds);
821                 FD_SET(fd,&w_fds);
822                 maxfd = fd;
823
824                 if (msg_fd_in >= 0) {
825                         FD_ZERO(&r_fds);
826                         FD_SET(msg_fd_in,&r_fds);
827                         if (msg_fd_in > maxfd)
828                                 maxfd = msg_fd_in;
829                 }
830
831                 tv.tv_sec = select_timeout;
832                 tv.tv_usec = 0;
833
834                 errno = 0;
835                 count = select(maxfd + 1, msg_fd_in >= 0 ? &r_fds : NULL,
836                                &w_fds, NULL, &tv);
837
838                 if (count <= 0) {
839                         if (count < 0 && errno == EBADF)
840                                 exit_cleanup(RERR_SOCKETIO);
841                         check_timeout();
842                         continue;
843                 }
844
845                 if (msg_fd_in >= 0 && FD_ISSET(msg_fd_in, &r_fds))
846                         read_msg_fd();
847
848                 if (!FD_ISSET(fd, &w_fds))
849                         continue;
850
851                 n = len - total;
852                 if (bwlimit && n > bwlimit_writemax)
853                         n = bwlimit_writemax;
854                 ret = write(fd, buf + total, n);
855
856                 if (ret <= 0) {
857                         if (ret < 0) {
858                                 if (errno == EINTR)
859                                         continue;
860                                 if (errno == EWOULDBLOCK || errno == EAGAIN) {
861                                         msleep(1);
862                                         continue;
863                                 }
864                         }
865
866                         /* Don't try to write errors back across the stream. */
867                         io_multiplexing_close();
868                         rsyserr(FERROR, errno,
869                                 "writefd_unbuffered failed to write %ld bytes: phase \"%s\"",
870                                 (long)len, io_write_phase);
871                         exit_cleanup(RERR_STREAMIO);
872                 }
873
874                 sleep_for_bwlimit(ret);
875
876                 total += ret;
877
878                 if (io_timeout)
879                         last_io = time(NULL);
880         }
881
882         no_flush--;
883 }
884
885
886 static char *io_buffer;
887 static int io_buffer_count;
888
889 void io_start_buffering_out(int fd)
890 {
891         if (io_buffer)
892                 return;
893         multiplex_out_fd = fd;
894         io_buffer = new_array(char, IO_BUFFER_SIZE);
895         if (!io_buffer)
896                 out_of_memory("writefd");
897         io_buffer_count = 0;
898 }
899
900 void io_start_buffering_in(int fd)
901 {
902         multiplex_in_fd = fd;
903 }
904
905 /**
906  * Write an message to a multiplexed stream. If this fails then rsync
907  * exits.
908  **/
909 static void mplex_write(int fd, enum msgcode code, char *buf, size_t len)
910 {
911         char buffer[4096];
912         size_t n = len;
913
914         SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
915
916         if (n > sizeof buffer - 4)
917                 n = sizeof buffer - 4;
918
919         memcpy(&buffer[4], buf, n);
920         writefd_unbuffered(fd, buffer, n+4);
921
922         len -= n;
923         buf += n;
924
925         if (len)
926                 writefd_unbuffered(fd, buf, len);
927 }
928
929
930 void io_flush(int flush_it_all)
931 {
932         int fd = multiplex_out_fd;
933
934         msg_list_push(flush_it_all);
935
936         if (!io_buffer_count || no_flush)
937                 return;
938
939         if (io_multiplexing_out)
940                 mplex_write(fd, MSG_DATA, io_buffer, io_buffer_count);
941         else
942                 writefd_unbuffered(fd, io_buffer, io_buffer_count);
943         io_buffer_count = 0;
944 }
945
946
947 void io_end_buffering(void)
948 {
949         io_flush(NORMAL_FLUSH);
950         if (!io_multiplexing_out) {
951                 free(io_buffer);
952                 io_buffer = NULL;
953         }
954 }
955
956 static void writefd(int fd,char *buf,size_t len)
957 {
958         stats.total_written += len;
959
960         if (fd == msg_fd_out) {
961                 rprintf(FERROR, "Internal error: wrong write used in receiver.\n");
962                 exit_cleanup(RERR_PROTOCOL);
963         }
964
965         if (fd == write_batch_monitor_out) {
966                 if ((size_t)write(batch_fd, buf, len) != len)
967                         exit_cleanup(RERR_FILEIO);
968         }
969
970         if (!io_buffer || fd != multiplex_out_fd) {
971                 writefd_unbuffered(fd, buf, len);
972                 return;
973         }
974
975         while (len) {
976                 int n = MIN((int)len, IO_BUFFER_SIZE-io_buffer_count);
977                 if (n > 0) {
978                         memcpy(io_buffer+io_buffer_count, buf, n);
979                         buf += n;
980                         len -= n;
981                         io_buffer_count += n;
982                 }
983
984                 if (io_buffer_count == IO_BUFFER_SIZE)
985                         io_flush(NORMAL_FLUSH);
986         }
987 }
988
989
990 void write_int(int f,int32 x)
991 {
992         char b[4];
993         SIVAL(b,0,x);
994         writefd(f,b,4);
995 }
996
997
998 void write_int_named(int f, int32 x, const char *phase)
999 {
1000         io_write_phase = phase;
1001         write_int(f, x);
1002         io_write_phase = phase_unknown;
1003 }
1004
1005
1006 /*
1007  * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
1008  * 64-bit types on this platform.
1009  */
1010 void write_longint(int f, int64 x)
1011 {
1012         char b[8];
1013
1014         if (x <= 0x7FFFFFFF) {
1015                 write_int(f, (int)x);
1016                 return;
1017         }
1018
1019 #ifdef NO_INT64
1020         rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
1021         exit_cleanup(RERR_UNSUPPORTED);
1022 #else
1023         write_int(f, (int32)0xFFFFFFFF);
1024         SIVAL(b,0,(x&0xFFFFFFFF));
1025         SIVAL(b,4,((x>>32)&0xFFFFFFFF));
1026
1027         writefd(f,b,8);
1028 #endif
1029 }
1030
1031 void write_buf(int f,char *buf,size_t len)
1032 {
1033         writefd(f,buf,len);
1034 }
1035
1036 /** Write a string to the connection */
1037 static void write_sbuf(int f,char *buf)
1038 {
1039         write_buf(f, buf, strlen(buf));
1040 }
1041
1042
1043 void write_byte(int f,unsigned char c)
1044 {
1045         write_buf(f,(char *)&c,1);
1046 }
1047
1048
1049
1050 /**
1051  * Read a line of up to @p maxlen characters into @p buf (not counting
1052  * the trailing null).  Strips the (required) trailing newline and all
1053  * carriage returns.
1054  *
1055  * @return 1 for success; 0 for I/O error or truncation.
1056  **/
1057 int read_line(int f, char *buf, size_t maxlen)
1058 {
1059         while (maxlen) {
1060                 buf[0] = 0;
1061                 read_buf(f, buf, 1);
1062                 if (buf[0] == 0)
1063                         return 0;
1064                 if (buf[0] == '\n')
1065                         break;
1066                 if (buf[0] != '\r') {
1067                         buf++;
1068                         maxlen--;
1069                 }
1070         }
1071         *buf = '\0';
1072         return maxlen > 0;
1073 }
1074
1075
1076 void io_printf(int fd, const char *format, ...)
1077 {
1078         va_list ap;
1079         char buf[1024];
1080         int len;
1081
1082         va_start(ap, format);
1083         len = vsnprintf(buf, sizeof buf, format, ap);
1084         va_end(ap);
1085
1086         if (len < 0)
1087                 exit_cleanup(RERR_STREAMIO);
1088
1089         write_sbuf(fd, buf);
1090 }
1091
1092
1093 /** Setup for multiplexing a MSG_* stream with the data stream. */
1094 void io_start_multiplex_out(int fd)
1095 {
1096         multiplex_out_fd = fd;
1097         io_flush(NORMAL_FLUSH);
1098         io_start_buffering_out(fd);
1099         io_multiplexing_out = 1;
1100 }
1101
1102 /** Setup for multiplexing a MSG_* stream with the data stream. */
1103 void io_start_multiplex_in(int fd)
1104 {
1105         multiplex_in_fd = fd;
1106         io_flush(NORMAL_FLUSH);
1107         io_multiplexing_in = 1;
1108 }
1109
1110 /** Write an message to the multiplexed data stream. */
1111 int io_multiplex_write(enum msgcode code, char *buf, size_t len)
1112 {
1113         if (!io_multiplexing_out)
1114                 return 0;
1115
1116         io_flush(NORMAL_FLUSH);
1117         stats.total_written += (len+4);
1118         mplex_write(multiplex_out_fd, code, buf, len);
1119         return 1;
1120 }
1121
1122 /** Stop output multiplexing. */
1123 void io_multiplexing_close(void)
1124 {
1125         io_multiplexing_out = 0;
1126 }
1127
1128 void start_write_batch(int fd)
1129 {
1130         /* Some communication has already taken place, but we don't
1131          * enable batch writing until here so that we can write a
1132          * canonical record of the communication even though the
1133          * actual communication so far depends on whether a daemon
1134          * is involved. */
1135         write_int(batch_fd, protocol_version);
1136         write_int(batch_fd, checksum_seed);
1137         stats.total_written -= sizeof (int) * 2;
1138
1139         if (am_sender)
1140                 write_batch_monitor_out = fd;
1141         else
1142                 write_batch_monitor_in = fd;
1143 }
1144
1145 void stop_write_batch(void)
1146 {
1147         write_batch_monitor_out = -1;
1148         write_batch_monitor_in = -1;
1149 }