eeb3a564bcb4cbd7de682cf15f24b9d239867942
[rsync.git] / io.c
1 /* -*- c-file-style: "linux" -*-
2    
3    Copyright (C) 1996-2001 by Andrew Tridgell 
4    Copyright (C) Paul Mackerras 1996
5    Copyright (C) 2001 by Martin Pool <mbp@samba.org>
6    
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 2 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, write to the Free Software
19    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 */
21
22 /**
23  *
24  * @file io.c
25  *
26  * Socket and pipe IO utilities used in rsync.
27  *
28  * rsync provides its own multiplexing system, which is used to send
29  * stderr and stdout over a single socket.  We need this because
30  * stdout normally carries the binary data stream, and stderr all our
31  * error messages.
32  *
33  * For historical reasons this is off during the start of the
34  * connection, but it's switched on quite early using
35  * io_start_multiplex_out() and io_start_multiplex_in().
36  **/
37
38 #include "rsync.h"
39
40 /* if no timeout is specified then use a 60 second select timeout */
41 #define SELECT_TIMEOUT 60
42
43 static int io_multiplexing_out;
44 static int io_multiplexing_in;
45 static int multiplex_in_fd;
46 static int multiplex_out_fd;
47 static time_t last_io;
48 static int no_flush;
49
50 extern int bwlimit;
51 extern int verbose;
52 extern int io_timeout;
53 extern struct stats stats;
54
55
56 /** Ignore EOF errors while reading a module listing if the remote
57     version is 24 or less. */
58 int kludge_around_eof = False;
59
60
61 static int io_error_fd = -1;
62
63 static void read_loop(int fd, char *buf, size_t len);
64
65 static void check_timeout(void)
66 {
67         extern int am_server, am_daemon;
68         time_t t;
69
70         err_list_push();
71         
72         if (!io_timeout) return;
73
74         if (!last_io) {
75                 last_io = time(NULL);
76                 return;
77         }
78
79         t = time(NULL);
80
81         if (last_io && io_timeout && (t-last_io) >= io_timeout) {
82                 if (!am_server && !am_daemon) {
83                         rprintf(FERROR,"io timeout after %d seconds - exiting\n", 
84                                 (int)(t-last_io));
85                 }
86                 exit_cleanup(RERR_TIMEOUT);
87         }
88 }
89
90 /* setup the fd used to propogate errors */
91 void io_set_error_fd(int fd)
92 {
93         io_error_fd = fd;
94 }
95
96 /* read some data from the error fd and write it to the write log code */
97 static void read_error_fd(void)
98 {
99         char buf[200];
100         size_t n;
101         int fd = io_error_fd;
102         int tag, len;
103
104         /* io_error_fd is temporarily disabled -- is this meant to
105          * prevent indefinite recursion? */
106         io_error_fd = -1;
107
108         read_loop(fd, buf, 4);
109         tag = IVAL(buf, 0);
110
111         len = tag & 0xFFFFFF;
112         tag = tag >> 24;
113         tag -= MPLEX_BASE;
114
115         while (len) {
116                 n = len;
117                 if (n > (sizeof(buf)-1))
118                         n = sizeof(buf)-1;
119                 read_loop(fd, buf, n);
120                 rwrite((enum logcode)tag, buf, n);
121                 len -= n;
122         }
123
124         io_error_fd = fd;
125 }
126
127
128 static void whine_about_eof (void)
129 {
130         /**
131            It's almost always an error to get an EOF when we're trying
132            to read from the network, because the protocol is
133            self-terminating.
134            
135            However, there is one unfortunate cases where it is not,
136            which is rsync <2.4.6 sending a list of modules on a
137            server, since the list is terminated by closing the socket.
138            So, for the section of the program where that is a problem
139            (start_socket_client), kludge_around_eof is True and we
140            just exit.
141         */
142
143         if (kludge_around_eof)
144                 exit_cleanup (0);
145         else {
146                 rprintf (FERROR,
147                          "%s: connection unexpectedly closed "
148                          "(%.0f bytes read so far)\n",
149                          RSYNC_NAME, (double)stats.total_read);
150         
151                 exit_cleanup (RERR_STREAMIO);
152         }
153 }
154
155
156 static void die_from_readerr (int err)
157 {
158         /* this prevents us trying to write errors on a dead socket */
159         io_multiplexing_close();
160                                 
161         rprintf(FERROR, "%s: read error: %s\n",
162                 RSYNC_NAME, strerror (err));
163         exit_cleanup(RERR_STREAMIO);
164 }
165
166
167 /*!
168  * Read from a socket with IO timeout. return the number of bytes
169  * read. If no bytes can be read then exit, never return a number <= 0.
170  *
171  * TODO: If the remote shell connection fails, then current versions
172  * actually report an "unexpected EOF" error here.  Since it's a
173  * fairly common mistake to try to use rsh when ssh is required, we
174  * should trap that: if we fail to read any data at all, we should
175  * give a better explanation.  We can tell whether the connection has
176  * started by looking e.g. at whether the remote version is known yet.
177  */
178 static int read_timeout (int fd, char *buf, size_t len)
179 {
180         int n, ret=0;
181
182         io_flush();
183
184         while (ret == 0) {
185                 /* until we manage to read *something* */
186                 fd_set fds;
187                 struct timeval tv;
188                 int fd_count = fd+1;
189                 int count;
190
191                 FD_ZERO(&fds);
192                 FD_SET(fd, &fds);
193                 if (io_error_fd != -1) {
194                         FD_SET(io_error_fd, &fds);
195                         if (io_error_fd > fd) fd_count = io_error_fd+1;
196                 }
197
198                 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
199                 tv.tv_usec = 0;
200
201                 errno = 0;
202
203                 count = select(fd_count, &fds, NULL, NULL, &tv);
204
205                 if (count == 0) {
206                         check_timeout();
207                 }
208
209                 if (count <= 0) {
210                         if (errno == EBADF) {
211                                 exit_cleanup(RERR_SOCKETIO);
212                         }
213                         continue;
214                 }
215
216                 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
217                         read_error_fd();
218                 }
219
220                 if (!FD_ISSET(fd, &fds)) continue;
221
222                 n = read(fd, buf, len);
223
224                 if (n > 0) {
225                         buf += n;
226                         len -= n;
227                         ret += n;
228                         if (io_timeout)
229                                 last_io = time(NULL);
230                         continue;
231                 } else if (n == 0) {
232                         whine_about_eof ();
233                         return -1; /* doesn't return */
234                 } else if (n == -1) {
235                         if (errno == EINTR || errno == EWOULDBLOCK ||
236                             errno == EAGAIN) 
237                                 continue;
238                         else
239                                 die_from_readerr (errno);
240                 }
241         }
242
243         return ret;
244 }
245
246
247
248
249 /*! Continue trying to read len bytes - don't return until len has
250   been read.   */
251 static void read_loop (int fd, char *buf, size_t len)
252 {
253         while (len) {
254                 int n = read_timeout(fd, buf, len);
255
256                 buf += n;
257                 len -= n;
258         }
259 }
260
261
262 /**
263  * Read from the file descriptor handling multiplexing - return number
264  * of bytes read.
265  * 
266  * Never returns <= 0. 
267  */
268 static int read_unbuffered(int fd, char *buf, size_t len)
269 {
270         static size_t remaining;
271         int tag, ret = 0;
272         char line[1024];
273
274         if (!io_multiplexing_in || fd != multiplex_in_fd)
275                 return read_timeout(fd, buf, len);
276
277         while (ret == 0) {
278                 if (remaining) {
279                         len = MIN(len, remaining);
280                         read_loop(fd, buf, len);
281                         remaining -= len;
282                         ret = len;
283                         continue;
284                 }
285
286                 read_loop(fd, line, 4);
287                 tag = IVAL(line, 0);
288
289                 remaining = tag & 0xFFFFFF;
290                 tag = tag >> 24;
291
292                 if (tag == MPLEX_BASE)
293                         continue;
294
295                 tag -= MPLEX_BASE;
296
297                 if (tag != FERROR && tag != FINFO) {
298                         rprintf(FERROR, "unexpected tag %d\n", tag);
299                         exit_cleanup(RERR_STREAMIO);
300                 }
301
302                 if (remaining > sizeof(line) - 1) {
303                         rprintf(FERROR, "multiplexing overflow %d\n\n",
304                                 remaining);
305                         exit_cleanup(RERR_STREAMIO);
306                 }
307
308                 read_loop(fd, line, remaining);
309                 line[remaining] = 0;
310
311                 rprintf((enum logcode) tag, "%s", line);
312                 remaining = 0;
313         }
314
315         return ret;
316 }
317
318
319
320 /* do a buffered read from fd. don't return until all N bytes
321    have been read. If all N can't be read then exit with an error */
322 static void readfd (int fd, char *buffer, size_t N)
323 {
324         int  ret;
325         size_t total=0;  
326         
327         while (total < N) {
328                 io_flush();
329
330                 ret = read_unbuffered (fd, buffer + total, N-total);
331                 total += ret;
332         }
333
334         stats.total_read += total;
335 }
336
337
338 int32 read_int(int f)
339 {
340         char b[4];
341         int32 ret;
342
343         readfd(f,b,4);
344         ret = IVAL(b,0);
345         if (ret == (int32)0xffffffff) return -1;
346         return ret;
347 }
348
349 int64 read_longint(int f)
350 {
351         extern int remote_version;
352         int64 ret;
353         char b[8];
354         ret = read_int(f);
355
356         if ((int32)ret != (int32)0xffffffff) {
357                 return ret;
358         }
359
360 #ifdef NO_INT64
361         rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
362         exit_cleanup(RERR_UNSUPPORTED);
363 #else
364         if (remote_version >= 16) {
365                 readfd(f,b,8);
366                 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
367         }
368 #endif
369
370         return ret;
371 }
372
373 void read_buf(int f,char *buf,size_t len)
374 {
375         readfd(f,buf,len);
376 }
377
378 void read_sbuf(int f,char *buf,size_t len)
379 {
380         read_buf (f,buf,len);
381         buf[len] = 0;
382 }
383
384 unsigned char read_byte(int f)
385 {
386         unsigned char c;
387         read_buf (f, (char *)&c, 1);
388         return c;
389 }
390
391 /* Write len bytes to fd.  This underlies the multiplexing system,
392  * which is always called by application code.  */
393 static void writefd_unbuffered(int fd,char *buf,size_t len)
394 {
395         size_t total = 0;
396         fd_set w_fds, r_fds;
397         int fd_count, count;
398         struct timeval tv;
399
400         err_list_push();
401
402         no_flush++;
403
404         while (total < len) {
405                 FD_ZERO(&w_fds);
406                 FD_ZERO(&r_fds);
407                 FD_SET(fd,&w_fds);
408                 fd_count = fd;
409
410                 if (io_error_fd != -1) {
411                         FD_SET(io_error_fd,&r_fds);
412                         if (io_error_fd > fd_count) 
413                                 fd_count = io_error_fd;
414                 }
415
416                 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
417                 tv.tv_usec = 0;
418
419                 errno = 0;
420
421                 count = select(fd_count+1,
422                                io_error_fd != -1?&r_fds:NULL,
423                                &w_fds,NULL,
424                                &tv);
425
426                 if (count == 0) {
427                         check_timeout();
428                 }
429
430                 if (count <= 0) {
431                         if (errno == EBADF) {
432                                 exit_cleanup(RERR_SOCKETIO);
433                         }
434                         continue;
435                 }
436
437                 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
438                         read_error_fd();
439                 }
440
441                 if (FD_ISSET(fd, &w_fds)) {
442                         int ret;
443                         size_t n = len-total;
444                         ret = write(fd,buf+total,n);
445
446                         if (ret == -1 && errno == EINTR) {
447                                 continue;
448                         }
449
450                         if (ret == -1 && 
451                             (errno == EWOULDBLOCK || errno == EAGAIN)) {
452                                 msleep(1);
453                                 continue;
454                         }
455
456                         if (ret <= 0) {
457                                 /* Don't try to write errors back
458                                  * across the stream */
459                                 io_multiplexing_close();
460                                 rprintf(FERROR,
461                                         "error writing %d unbuffered bytes"
462                                         " - exiting: %s\n", len,
463                                         strerror(errno));
464                                 exit_cleanup(RERR_STREAMIO);
465                         }
466
467                         /* Sleep after writing to limit I/O bandwidth */
468                         if (bwlimit)
469                         {
470                             tv.tv_sec = 0;
471                             tv.tv_usec = ret * 1000 / bwlimit;
472                             while (tv.tv_usec > 1000000)
473                             {
474                                 tv.tv_sec++;
475                                 tv.tv_usec -= 1000000;
476                             }
477                             select(0, NULL, NULL, NULL, &tv);
478                         }
479  
480                         total += ret;
481
482                         if (io_timeout)
483                                 last_io = time(NULL);
484                 }
485         }
486
487         no_flush--;
488 }
489
490
491 static char *io_buffer;
492 static int io_buffer_count;
493
494 void io_start_buffering(int fd)
495 {
496         if (io_buffer) return;
497         multiplex_out_fd = fd;
498         io_buffer = (char *)malloc(IO_BUFFER_SIZE);
499         if (!io_buffer) out_of_memory("writefd");
500         io_buffer_count = 0;
501 }
502
503 /* write an message to a multiplexed stream. If this fails then rsync
504    exits */
505 static void mplex_write(int fd, enum logcode code, char *buf, size_t len)
506 {
507         char buffer[4096];
508         size_t n = len;
509
510         SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
511
512         if (n > (sizeof(buffer)-4)) {
513                 n = sizeof(buffer)-4;
514         }
515
516         memcpy(&buffer[4], buf, n);
517         writefd_unbuffered(fd, buffer, n+4);
518
519         len -= n;
520         buf += n;
521
522         if (len) {
523                 writefd_unbuffered(fd, buf, len);
524         }
525 }
526
527
528 void io_flush(void)
529 {
530         int fd = multiplex_out_fd;
531
532         err_list_push();
533
534         if (!io_buffer_count || no_flush) return;
535
536         if (io_multiplexing_out) {
537                 mplex_write(fd, FNONE, io_buffer, io_buffer_count);
538         } else {
539                 writefd_unbuffered(fd, io_buffer, io_buffer_count);
540         }
541         io_buffer_count = 0;
542 }
543
544
545 void io_end_buffering(void)
546 {
547         io_flush();
548         if (!io_multiplexing_out) {
549                 free(io_buffer);
550                 io_buffer = NULL;
551         }
552 }
553
554 static void writefd(int fd,char *buf,size_t len)
555 {
556         stats.total_written += len;
557
558         err_list_push();
559
560         if (!io_buffer || fd != multiplex_out_fd) {
561                 writefd_unbuffered(fd, buf, len);
562                 return;
563         }
564
565         while (len) {
566                 int n = MIN((int) len, IO_BUFFER_SIZE-io_buffer_count);
567                 if (n > 0) {
568                         memcpy(io_buffer+io_buffer_count, buf, n);
569                         buf += n;
570                         len -= n;
571                         io_buffer_count += n;
572                 }
573                 
574                 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
575         }
576 }
577
578
579 void write_int(int f,int32 x)
580 {
581         char b[4];
582         SIVAL(b,0,x);
583         writefd(f,b,4);
584 }
585
586
587 /*
588  * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
589  * 64-bit types on this platform.
590  */
591 void write_longint(int f, int64 x)
592 {
593         extern int remote_version;
594         char b[8];
595
596         if (remote_version < 16 || x <= 0x7FFFFFFF) {
597                 write_int(f, (int)x);
598                 return;
599         }
600
601         write_int(f, (int32)0xFFFFFFFF);
602         SIVAL(b,0,(x&0xFFFFFFFF));
603         SIVAL(b,4,((x>>32)&0xFFFFFFFF));
604
605         writefd(f,b,8);
606 }
607
608 void write_buf(int f,char *buf,size_t len)
609 {
610         writefd(f,buf,len);
611 }
612
613 /* write a string to the connection */
614 static void write_sbuf(int f,char *buf)
615 {
616         write_buf(f, buf, strlen(buf));
617 }
618
619
620 void write_byte(int f,unsigned char c)
621 {
622         write_buf(f,(char *)&c,1);
623 }
624
625
626
627 int read_line(int f, char *buf, size_t maxlen)
628 {
629         while (maxlen) {
630                 buf[0] = 0;
631                 read_buf(f, buf, 1);
632                 if (buf[0] == 0) return 0;
633                 if (buf[0] == '\n') {
634                         buf[0] = 0;
635                         break;
636                 }
637                 if (buf[0] != '\r') {
638                         buf++;
639                         maxlen--;
640                 }
641         }
642         if (maxlen == 0) {
643                 *buf = 0;
644                 return 0;
645         }
646
647         return 1;
648 }
649
650
651 void io_printf(int fd, const char *format, ...)
652 {
653         va_list ap;  
654         char buf[1024];
655         int len;
656         
657         va_start(ap, format);
658         len = vsnprintf(buf, sizeof(buf), format, ap);
659         va_end(ap);
660
661         if (len < 0) exit_cleanup(RERR_STREAMIO);
662
663         write_sbuf(fd, buf);
664 }
665
666
667 /* setup for multiplexing an error stream with the data stream */
668 void io_start_multiplex_out(int fd)
669 {
670         multiplex_out_fd = fd;
671         io_flush();
672         io_start_buffering(fd);
673         io_multiplexing_out = 1;
674 }
675
676 /* setup for multiplexing an error stream with the data stream */
677 void io_start_multiplex_in(int fd)
678 {
679         multiplex_in_fd = fd;
680         io_flush();
681         io_multiplexing_in = 1;
682 }
683
684 /* write an message to the multiplexed error stream */
685 int io_multiplex_write(enum logcode code, char *buf, size_t len)
686 {
687         if (!io_multiplexing_out) return 0;
688
689         io_flush();
690         stats.total_written += (len+4);
691         mplex_write(multiplex_out_fd, code, buf, len);
692         return 1;
693 }
694
695 /* stop output multiplexing */
696 void io_multiplexing_close(void)
697 {
698         io_multiplexing_out = 0;
699 }
700