1 /* -*- c-file-style: "linux" -*-
3 Copyright (C) 1996-2001 by Andrew Tridgell
4 Copyright (C) Paul Mackerras 1996
5 Copyright (C) 2001 by Martin Pool <mbp@samba.org>
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 2 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 socket and pipe IO utilities used in rsync
29 /* if no timeout is specified then use a 60 second select timeout */
30 #define SELECT_TIMEOUT 60
32 static int io_multiplexing_out;
33 static int io_multiplexing_in;
34 static int multiplex_in_fd;
35 static int multiplex_out_fd;
36 static time_t last_io;
41 extern int io_timeout;
42 extern struct stats stats;
45 /** Ignore EOF errors while reading a module listing if the remote
46 version is 24 or less. */
47 int kludge_around_eof = False;
50 static int io_error_fd = -1;
52 static void read_loop(int fd, char *buf, size_t len);
54 static void check_timeout(void)
56 extern int am_server, am_daemon;
61 if (!io_timeout) return;
70 if (last_io && io_timeout && (t-last_io) >= io_timeout) {
71 if (!am_server && !am_daemon) {
72 rprintf(FERROR,"io timeout after %d seconds - exiting\n",
75 exit_cleanup(RERR_TIMEOUT);
79 /* set up the fd used to propagate errors */
80 void io_set_error_fd(int fd)
85 /* read some data from the error fd and write it to the write log code */
86 static void read_error_fd(void)
93 /* io_error_fd is temporarily disabled -- is this meant to
94 * prevent indefinite recursion? */
97 read_loop(fd, buf, 4);
100 len = tag & 0xFFFFFF;
106 if (n > (sizeof(buf)-1))
108 read_loop(fd, buf, n);
109 rwrite((enum logcode)tag, buf, n);
117 static void whine_about_eof (void)
120 It's almost always an error to get an EOF when we're trying
121 to read from the network, because the protocol is
124 However, there is one unfortunate case where it is not,
125 which is rsync <2.4.6 sending a list of modules on a
126 server, since the list is terminated by closing the socket.
127 So, for the section of the program where that is a problem
128 (start_socket_client), kludge_around_eof is True and we
132 if (kludge_around_eof)
136 "%s: connection unexpectedly closed "
137 "(%.0f bytes read so far)\n",
138 RSYNC_NAME, (double)stats.total_read);
140 exit_cleanup (RERR_STREAMIO);
145 static void die_from_readerr (int err)
147 /* this prevents us trying to write errors on a dead socket */
148 io_multiplexing_close();
150 rprintf(FERROR, "%s: read error: %s\n",
151 RSYNC_NAME, strerror (err));
152 exit_cleanup(RERR_STREAMIO);
157 * Read from a socket with IO timeout. Return the number of bytes
158 * read. If no bytes can be read then exit; never return a number <= 0.
160 * TODO: If the remote shell connection fails, then current versions
161 * actually report an "unexpected EOF" error here. Since it's a
162 * fairly common mistake to try to use rsh when ssh is required, we
163 * should trap that: if we fail to read any data at all, we should
164 * give a better explanation. We can tell whether the connection has
165 * started by looking e.g. at whether the remote version is known yet.
167 static int read_timeout (int fd, char *buf, size_t len)
174 /* until we manage to read *something* */
182 if (io_error_fd != -1) {
183 FD_SET(io_error_fd, &fds);
184 if (io_error_fd > fd) fd_count = io_error_fd+1;
187 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
192 count = select(fd_count, &fds, NULL, NULL, &tv);
199 if (errno == EBADF) {
200 exit_cleanup(RERR_SOCKETIO);
205 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
209 if (!FD_ISSET(fd, &fds)) continue;
211 n = read(fd, buf, len);
218 last_io = time(NULL);
222 return -1; /* doesn't return */
223 } else if (n == -1) {
224 if (errno == EINTR || errno == EWOULDBLOCK ||
228 die_from_readerr (errno);
238 /*! Continue trying to read len bytes - don't return until len has
240 static void read_loop (int fd, char *buf, size_t len)
243 int n = read_timeout(fd, buf, len);
252 * Read from the file descriptor handling multiplexing - return number
255 * Never returns <= 0.
257 static int read_unbuffered(int fd, char *buf, size_t len)
259 static size_t remaining;
263 if (!io_multiplexing_in || fd != multiplex_in_fd)
264 return read_timeout(fd, buf, len);
268 len = MIN(len, remaining);
269 read_loop(fd, buf, len);
275 read_loop(fd, line, 4);
278 remaining = tag & 0xFFFFFF;
281 if (tag == MPLEX_BASE)
286 if (tag != FERROR && tag != FINFO) {
287 rprintf(FERROR, "unexpected tag %d\n", tag);
288 exit_cleanup(RERR_STREAMIO);
291 if (remaining > sizeof(line) - 1) {
292 rprintf(FERROR, "multiplexing overflow %d\n\n",
294 exit_cleanup(RERR_STREAMIO);
297 read_loop(fd, line, remaining);
300 rprintf((enum logcode) tag, "%s", line);
309 /* do a buffered read from fd. don't return until all N bytes
310 have been read. If all N can't be read then exit with an error */
311 static void readfd (int fd, char *buffer, size_t N)
319 ret = read_unbuffered (fd, buffer + total, N-total);
323 stats.total_read += total;
327 int32 read_int(int f)
334 if (ret == (int32)0xffffffff) return -1;
338 int64 read_longint(int f)
340 extern int remote_version;
345 if ((int32)ret != (int32)0xffffffff) {
350 rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
351 exit_cleanup(RERR_UNSUPPORTED);
353 if (remote_version >= 16) {
355 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
362 void read_buf(int f,char *buf,size_t len)
367 void read_sbuf(int f,char *buf,size_t len)
369 read_buf (f,buf,len);
373 unsigned char read_byte(int f)
376 read_buf (f, (char *)&c, 1);
380 /* write len bytes to fd */
381 static void writefd_unbuffered(int fd,char *buf,size_t len)
392 while (total < len) {
398 if (io_error_fd != -1) {
399 FD_SET(io_error_fd,&r_fds);
400 if (io_error_fd > fd_count)
401 fd_count = io_error_fd;
404 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
409 count = select(fd_count+1,
410 io_error_fd != -1?&r_fds:NULL,
419 if (errno == EBADF) {
420 exit_cleanup(RERR_SOCKETIO);
425 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
429 if (FD_ISSET(fd, &w_fds)) {
431 size_t n = len-total;
432 ret = write(fd,buf+total,n);
434 if (ret == -1 && errno == EINTR) {
439 (errno == EWOULDBLOCK || errno == EAGAIN)) {
446 "error writing %d unbuffered bytes"
447 " - exiting: %s\n", len,
449 exit_cleanup(RERR_STREAMIO);
452 /* Sleep after writing to limit I/O bandwidth */
456 tv.tv_usec = ret * 1000 / bwlimit;
457 while (tv.tv_usec > 1000000)
460 tv.tv_usec -= 1000000;
462 select(0, NULL, NULL, NULL, &tv);
468 last_io = time(NULL);
476 static char *io_buffer;
477 static int io_buffer_count;
479 void io_start_buffering(int fd)
481 if (io_buffer) return;
482 multiplex_out_fd = fd;
483 io_buffer = (char *)malloc(IO_BUFFER_SIZE);
484 if (!io_buffer) out_of_memory("writefd");
488 /* write a message to a multiplexed stream. If this fails then rsync
490 static void mplex_write(int fd, enum logcode code, char *buf, size_t len)
495 SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
497 if (n > (sizeof(buffer)-4)) {
498 n = sizeof(buffer)-4;
501 memcpy(&buffer[4], buf, n);
502 writefd_unbuffered(fd, buffer, n+4);
508 writefd_unbuffered(fd, buf, len);
515 int fd = multiplex_out_fd;
519 if (!io_buffer_count || no_flush) return;
521 if (io_multiplexing_out) {
522 mplex_write(fd, FNONE, io_buffer, io_buffer_count);
524 writefd_unbuffered(fd, io_buffer, io_buffer_count);
530 /* XXX: fd is ignored, which seems a little strange. */
531 void io_end_buffering(int fd)
534 if (!io_multiplexing_out) {
540 static void writefd(int fd,char *buf,size_t len)
542 stats.total_written += len;
546 if (!io_buffer || fd != multiplex_out_fd) {
547 writefd_unbuffered(fd, buf, len);
552 int n = MIN(len, IO_BUFFER_SIZE-io_buffer_count);
554 memcpy(io_buffer+io_buffer_count, buf, n);
557 io_buffer_count += n;
560 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
565 void write_int(int f,int32 x)
574 * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
575 * 64-bit types on this platform.
577 void write_longint(int f, int64 x)
579 extern int remote_version;
582 if (remote_version < 16 || x <= 0x7FFFFFFF) {
583 write_int(f, (int)x);
587 write_int(f, (int32)0xFFFFFFFF);
588 SIVAL(b,0,(x&0xFFFFFFFF));
589 SIVAL(b,4,((x>>32)&0xFFFFFFFF));
594 void write_buf(int f,char *buf,size_t len)
599 /* write a string to the connection */
600 static void write_sbuf(int f,char *buf)
602 write_buf(f, buf, strlen(buf));
606 void write_byte(int f,unsigned char c)
608 write_buf(f,(char *)&c,1);
613 int read_line(int f, char *buf, size_t maxlen)
618 if (buf[0] == 0) return 0;
619 if (buf[0] == '\n') {
623 if (buf[0] != '\r') {
637 void io_printf(int fd, const char *format, ...)
643 va_start(ap, format);
644 len = vsnprintf(buf, sizeof(buf), format, ap);
647 if (len < 0) exit_cleanup(RERR_STREAMIO);
653 /* set up for multiplexing an error stream with the outgoing data stream */
654 void io_start_multiplex_out(int fd)
656 multiplex_out_fd = fd;
658 io_start_buffering(fd);
659 io_multiplexing_out = 1;
662 /* set up for reading an error stream multiplexed with the incoming data stream */
663 void io_start_multiplex_in(int fd)
665 multiplex_in_fd = fd;
667 io_multiplexing_in = 1;
670 /* write a message to the multiplexed error stream */
671 int io_multiplex_write(enum logcode code, char *buf, size_t len)
673 if (!io_multiplexing_out) return 0;
676 stats.total_written += (len+4);
677 mplex_write(multiplex_out_fd, code, buf, len);
681 /* stop output multiplexing */
682 void io_multiplexing_close(void)
684 io_multiplexing_out = 0;