2 Copyright (C) Andrew Tridgell 1996
3 Copyright (C) Paul Mackerras 1996
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21 socket and pipe IO utilities used in rsync
25 Midstrength stream cypher privacy added by Martin Pool, October 2000.
28 #include "lib/arcfour.h"
31 /* if no timeout is specified then use a 60 second select timeout */
32 #define SELECT_TIMEOUT 60
36 static int io_multiplexing_out;
37 static int io_multiplexing_in;
38 static int multiplex_in_fd;
39 static int multiplex_out_fd;
40 static time_t last_io;
41 static int eof_error=1;
43 extern int io_timeout;
44 extern struct stats stats;
46 extern ArcfourContext arcfour_enc_ctx, arcfour_dec_ctx;
48 static int io_error_fd = -1;
50 static void read_loop(int fd, char *buf, int len);
52 static void check_timeout(void)
54 extern int am_server, am_daemon;
57 if (!io_timeout) return;
66 if (last_io && io_timeout && (t-last_io) >= io_timeout) {
67 if (!am_server && !am_daemon) {
68 rprintf(FERROR,"io timeout after %d second - exiting\n",
71 exit_cleanup(RERR_TIMEOUT);
75 /* setup the fd used to propogate errors */
76 void io_set_error_fd(int fd)
81 /* read some data from the error fd and write it to the write log code */
82 static void read_error_fd(void)
91 read_loop(fd, buf, 4);
100 if (n > (sizeof(buf)-1)) n = sizeof(buf)-1;
101 read_loop(fd, buf, n);
102 rwrite((enum logcode)tag, buf, n);
113 * This is the most fundamental socket read function -- the only one that
114 * actually calls the kernel.
116 * It reads from a socket with IO timeout. return the number of bytes
117 * read. If no bytes can be read then exit, never return a number <= 0
119 * If arcfour_enabled is set, it decrypts data while reading using the
120 * global arcfour state.
122 static int read_timeout(int fd, char *buf, int len)
135 if (io_error_fd != -1) {
136 FD_SET(io_error_fd, &fds);
137 if (io_error_fd > fd) fd_count = io_error_fd+1;
140 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
145 if (select(fd_count, &fds, NULL, NULL, &tv) < 1) {
146 if (errno == EBADF) {
147 exit_cleanup(RERR_SOCKETIO);
153 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
157 if (!FD_ISSET(fd, &fds)) continue;
159 n = read(fd, buf, len);
162 /* arcfour can decrypt in place. */
163 if (arcfour_enabled) {
164 rprintf(FERROR, "decrypt %d bytes..", n);
165 arcfour_decrypt(&arcfour_dec_ctx, buf, buf, n);
166 rprintf(FERROR, "done\n");
172 last_io = time(NULL);
176 if (n == -1 && errno == EINTR) {
181 (errno == EWOULDBLOCK || errno == EAGAIN)) {
188 rprintf(FERROR,"unexpected EOF in read_timeout\n");
190 exit_cleanup(RERR_STREAMIO);
193 /* this prevents us trying to write errors on a dead socket */
194 io_multiplexing_close();
196 rprintf(FERROR,"read error: %s\n", strerror(errno));
197 exit_cleanup(RERR_STREAMIO);
203 /* continue trying to read len bytes - don't return until len
205 static void read_loop(int fd, char *buf, int len)
208 int n = read_timeout(fd, buf, len);
215 /* read from the file descriptor handling multiplexing -
216 return number of bytes read
218 static int read_unbuffered(int fd, char *buf, int len)
220 static int remaining;
224 if (!io_multiplexing_in || fd != multiplex_in_fd)
225 return read_timeout(fd, buf, len);
229 len = MIN(len, remaining);
230 read_loop(fd, buf, len);
236 read_loop(fd, line, 4);
239 remaining = tag & 0xFFFFFF;
242 if (tag == MPLEX_BASE) continue;
246 if (tag != FERROR && tag != FINFO) {
247 rprintf(FERROR,"unexpected tag %d\n", tag);
248 exit_cleanup(RERR_STREAMIO);
251 if (remaining > sizeof(line)-1) {
252 rprintf(FERROR,"multiplexing overflow %d\n\n",
254 exit_cleanup(RERR_STREAMIO);
257 read_loop(fd, line, remaining);
260 rprintf((enum logcode)tag,"%s", line);
268 /* do a buffered read from fd. don't return until all N bytes
269 have been read. If all N can't be read then exit with an error */
270 static void readfd(int fd,char *buffer,int N)
278 ret = read_unbuffered(fd,buffer + total,N-total);
282 stats.total_read += total;
286 int32 read_int(int f)
293 if (ret == (int32)0xffffffff) return -1;
297 int64 read_longint(int f)
299 extern int remote_version;
304 if ((int32)ret != (int32)0xffffffff) {
309 rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
310 exit_cleanup(RERR_UNSUPPORTED);
312 if (remote_version >= 16) {
314 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
321 void read_buf(int f,char *buf,int len)
326 void read_sbuf(int f,char *buf,int len)
332 unsigned char read_byte(int f)
335 read_buf(f,(char *)&c,1);
341 * Write len bytes to fd.
343 * If arcfour_enabled is true, encrypt all data as it passes onto the
344 * wire using the global arcfour state.
346 static void writefd_unbuffered(int fd, char const *buf, int len)
355 while (total < len) {
361 if (io_error_fd != -1) {
362 FD_SET(io_error_fd,&r_fds);
363 if (io_error_fd > fd_count)
364 fd_count = io_error_fd;
367 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
372 count = select(fd_count+1,
373 io_error_fd != -1?&r_fds:NULL,
378 if (errno == EBADF) {
379 exit_cleanup(RERR_SOCKETIO);
385 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
389 if (FD_ISSET(fd, &w_fds)) {
390 int ret, n = len-total;
392 ret = write(fd,buf+total,n);
394 if (ret == -1 && errno == EINTR) {
399 (errno == EWOULDBLOCK || errno == EAGAIN)) {
405 rprintf(FERROR,"erroring writing %d bytes - exiting\n", len);
406 exit_cleanup(RERR_STREAMIO);
409 /* Sleep after writing to limit I/O bandwidth */
413 tv.tv_usec = ret * 1000 / bwlimit;
414 while (tv.tv_usec > 1000000)
417 tv.tv_usec -= 1000000;
419 select(0, NULL, NULL, NULL, &tv);
425 last_io = time(NULL);
433 /* build up a temporary buffer of encrypted data */
434 static void writefd_encrypt(int fd, char const *buf, int len)
436 static char *arcbuf = NULL;
437 static int buf_len = 0;
439 if (arcfour_enabled) {
441 if (len > buf_len || !arcbuf) {
444 arcbuf = malloc(len);
448 rprintf(FERROR, "encrypt %d bytes ..", len);
450 arcfour_encrypt(&arcfour_dec_ctx, arcbuf, buf, len);
451 // writefd_unbuffered(fd, arcbuf, len);
452 writefd_unbuffered(fd, buf, len);
453 rprintf(FERROR, "done\n");
455 writefd_unbuffered(fd, buf, len);
460 static char *io_buffer;
461 static int io_buffer_count;
463 void io_start_buffering(int fd)
465 if (io_buffer) return;
466 multiplex_out_fd = fd;
467 io_buffer = (char *)malloc(IO_BUFFER_SIZE);
468 if (!io_buffer) out_of_memory("writefd");
472 /* write an message to a multiplexed stream. If this fails then rsync
474 static void mplex_write(int fd, enum logcode code, char *buf, int len)
479 SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
481 if (n > (sizeof(buffer)-4)) {
482 n = sizeof(buffer)-4;
485 memcpy(&buffer[4], buf, n);
486 writefd_encrypt(fd, buffer, n+4);
492 writefd_encrypt(fd, buf, len);
499 int fd = multiplex_out_fd;
500 if (!io_buffer_count || no_flush) return;
502 if (io_multiplexing_out) {
503 mplex_write(fd, FNONE, io_buffer, io_buffer_count);
505 writefd_encrypt(fd, io_buffer, io_buffer_count);
510 void io_end_buffering(int fd)
513 if (!io_multiplexing_out) {
519 /* some OSes have a bug where an exit causes the pending writes on
520 a socket to be flushed. Do an explicit shutdown to try to prevent this */
521 void io_shutdown(void)
523 if (multiplex_out_fd != -1) close(multiplex_out_fd);
524 if (io_error_fd != -1) close(io_error_fd);
525 multiplex_out_fd = -1;
530 static void writefd(int fd,char *buf,int len)
532 stats.total_written += len;
534 if (!io_buffer || fd != multiplex_out_fd) {
535 writefd_encrypt(fd, buf, len);
540 int n = MIN(len, IO_BUFFER_SIZE-io_buffer_count);
542 memcpy(io_buffer+io_buffer_count, buf, n);
545 io_buffer_count += n;
548 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
553 void write_int(int f,int32 x)
560 void write_longint(int f, int64 x)
562 extern int remote_version;
565 if (remote_version < 16 || x <= 0x7FFFFFFF) {
566 write_int(f, (int)x);
570 write_int(f, (int32)0xFFFFFFFF);
571 SIVAL(b,0,(x&0xFFFFFFFF));
572 SIVAL(b,4,((x>>32)&0xFFFFFFFF));
577 void write_buf(int f,char *buf,int len)
582 /* write a string to the connection */
583 static void write_sbuf(int f,char *buf)
585 write_buf(f, buf, strlen(buf));
589 void write_byte(int f,unsigned char c)
591 write_buf(f,(char *)&c,1);
594 int read_line(int f, char *buf, int maxlen)
601 if (buf[0] == 0) return 0;
602 if (buf[0] == '\n') {
606 if (buf[0] != '\r') {
622 void io_printf(int fd, const char *format, ...)
628 va_start(ap, format);
629 len = vslprintf(buf, sizeof(buf), format, ap);
632 if (len < 0) exit_cleanup(RERR_STREAMIO);
638 /* setup for multiplexing an error stream with the data stream */
639 void io_start_multiplex_out(int fd)
641 multiplex_out_fd = fd;
643 io_start_buffering(fd);
644 io_multiplexing_out = 1;
647 /* setup for multiplexing an error stream with the data stream */
648 void io_start_multiplex_in(int fd)
650 multiplex_in_fd = fd;
652 io_multiplexing_in = 1;
655 /* write an message to the multiplexed error stream */
656 int io_multiplex_write(enum logcode code, char *buf, int len)
658 if (!io_multiplexing_out) return 0;
661 stats.total_written += (len+4);
662 mplex_write(multiplex_out_fd, code, buf, len);
666 /* write a message to the special error fd */
667 int io_error_write(int f, enum logcode code, char *buf, int len)
669 if (f == -1) return 0;
670 mplex_write(f, code, buf, len);
674 /* stop output multiplexing */
675 void io_multiplexing_close(void)
677 io_multiplexing_out = 0;