don't pprint the IO timeout message if we are a server or daemon (can
[rsync.git] / io.c
1 /* 
2    Copyright (C) Andrew Tridgell 1996
3    Copyright (C) Paul Mackerras 1996
4    
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License as published by
7    the Free Software Foundation; either version 2 of the License, or
8    (at your option) any later version.
9    
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU General Public License for more details.
14    
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 */
19
20 /*
21   socket and pipe IO utilities used in rsync 
22
23   tridge, June 1996
24   */
25 #include "rsync.h"
26
27 /* if no timeout is specified then use a 60 second select timeout */
28 #define SELECT_TIMEOUT 60
29
30 static int io_multiplexing_out;
31 static int io_multiplexing_in;
32 static int multiplex_in_fd;
33 static int multiplex_out_fd;
34 static time_t last_io;
35 static int eof_error=1;
36 extern int verbose;
37 extern int io_timeout;
38 extern struct stats stats;
39
40 static int buffer_f_in = -1;
41 static int io_error_fd = -1;
42
43 static void read_loop(int fd, char *buf, int len);
44
45 void setup_readbuffer(int f_in)
46 {
47         buffer_f_in = f_in;
48 }
49
50 static void check_timeout(void)
51 {
52         extern int am_server, am_daemon;
53         time_t t;
54         
55         if (!io_timeout) return;
56
57         if (!last_io) {
58                 last_io = time(NULL);
59                 return;
60         }
61
62         t = time(NULL);
63
64         if (last_io && io_timeout && (t-last_io) >= io_timeout) {
65                 if (!am_server && !am_daemon) {
66                         rprintf(FERROR,"io timeout after %d second - exiting\n", 
67                                 (int)(t-last_io));
68                 }
69                 exit_cleanup(RERR_TIMEOUT);
70         }
71 }
72
73 /* setup the fd used to propogate errors */
74 void io_set_error_fd(int fd)
75 {
76         io_error_fd = fd;
77 }
78
79 /* read some data from the error fd and write it to the write log code */
80 static void read_error_fd(void)
81 {
82         char buf[200];
83         int n;
84         int fd = io_error_fd;
85         int tag, len;
86
87         io_error_fd = -1;
88
89         read_loop(fd, buf, 4);
90         tag = IVAL(buf, 0);
91
92         len = tag & 0xFFFFFF;
93         tag = tag >> 24;
94         tag -= MPLEX_BASE;
95
96         while (len) {
97                 n = len;
98                 if (n > (sizeof(buf)-1)) n = sizeof(buf)-1;
99                 read_loop(fd, buf, n);
100                 rwrite((enum logcode)tag, buf, n);
101                 len -= n;
102         }
103
104         io_error_fd = fd;
105 }
106
107
108 static int no_flush;
109
110 /* read from a socket with IO timeout. return the number of
111    bytes read. If no bytes can be read then exit, never return
112    a number <= 0 */
113 static int read_timeout(int fd, char *buf, int len)
114 {
115         int n, ret=0;
116
117         io_flush();
118
119         while (ret == 0) {
120                 fd_set fds;
121                 struct timeval tv;
122                 int fd_count = fd+1;
123
124                 FD_ZERO(&fds);
125                 FD_SET(fd, &fds);
126                 if (io_error_fd != -1) {
127                         FD_SET(io_error_fd, &fds);
128                         if (io_error_fd > fd) fd_count = io_error_fd+1;
129                 }
130
131                 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
132                 tv.tv_usec = 0;
133
134                 errno = 0;
135
136                 if (select(fd_count, &fds, NULL, NULL, &tv) < 1) {
137                         if (errno == EBADF) {
138                                 exit_cleanup(RERR_SOCKETIO);
139                         }
140                         check_timeout();
141                         continue;
142                 }
143
144                 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
145                         read_error_fd();
146                 }
147
148                 if (!FD_ISSET(fd, &fds)) continue;
149
150                 n = read(fd, buf, len);
151
152                 if (n > 0) {
153                         buf += n;
154                         len -= n;
155                         ret += n;
156                         if (io_timeout)
157                                 last_io = time(NULL);
158                         continue;
159                 }
160
161                 if (n == -1 && errno == EINTR) {
162                         continue;
163                 }
164
165                 if (n == -1 && 
166                     (errno == EWOULDBLOCK || errno == EAGAIN)) {
167                         continue;
168                 }
169
170
171                 if (n == 0) {
172                         if (eof_error) {
173                                 rprintf(FERROR,"unexpected EOF in read_timeout\n");
174                         }
175                         exit_cleanup(RERR_STREAMIO);
176                 }
177
178                 /* this prevents us trying to write errors on a dead socket */
179                 io_multiplexing_close();
180
181                 rprintf(FERROR,"read error: %s\n", strerror(errno));
182                 exit_cleanup(RERR_STREAMIO);
183         }
184
185         return ret;
186 }
187
188 /* continue trying to read len bytes - don't return until len
189    has been read */
190 static void read_loop(int fd, char *buf, int len)
191 {
192         while (len) {
193                 int n = read_timeout(fd, buf, len);
194
195                 buf += n;
196                 len -= n;
197         }
198 }
199
200 /* read from the file descriptor handling multiplexing - 
201    return number of bytes read
202    never return <= 0 */
203 static int read_unbuffered(int fd, char *buf, int len)
204 {
205         static int remaining;
206         int tag, ret=0;
207         char line[1024];
208
209         if (!io_multiplexing_in || fd != multiplex_in_fd) 
210                 return read_timeout(fd, buf, len);
211
212         while (ret == 0) {
213                 if (remaining) {
214                         len = MIN(len, remaining);
215                         read_loop(fd, buf, len);
216                         remaining -= len;
217                         ret = len;
218                         continue;
219                 }
220
221                 read_loop(fd, line, 4);
222                 tag = IVAL(line, 0);
223
224                 remaining = tag & 0xFFFFFF;
225                 tag = tag >> 24;
226
227                 if (tag == MPLEX_BASE) continue;
228
229                 tag -= MPLEX_BASE;
230
231                 if (tag != FERROR && tag != FINFO) {
232                         rprintf(FERROR,"unexpected tag %d\n", tag);
233                         exit_cleanup(RERR_STREAMIO);
234                 }
235
236                 if (remaining > sizeof(line)-1) {
237                         rprintf(FERROR,"multiplexing overflow %d\n\n", 
238                                 remaining);
239                         exit_cleanup(RERR_STREAMIO);
240                 }
241
242                 read_loop(fd, line, remaining);
243                 line[remaining] = 0;
244
245                 rprintf((enum logcode)tag,"%s", line);
246                 remaining = 0;
247         }
248
249         return ret;
250 }
251
252
253 /* do a buffered read from fd. don't return until all N bytes
254    have been read. If all N can't be read then exit with an error */
255 static void readfd(int fd,char *buffer,int N)
256 {
257         int  ret;
258         int total=0;  
259         
260         while (total < N) {
261                 io_flush();
262
263                 ret = read_unbuffered(fd,buffer + total,N-total);
264                 total += ret;
265         }
266
267         stats.total_read += total;
268 }
269
270
271 int32 read_int(int f)
272 {
273         char b[4];
274         int32 ret;
275
276         readfd(f,b,4);
277         ret = IVAL(b,0);
278         if (ret == (int32)0xffffffff) return -1;
279         return ret;
280 }
281
282 int64 read_longint(int f)
283 {
284         extern int remote_version;
285         int64 ret;
286         char b[8];
287         ret = read_int(f);
288
289         if ((int32)ret != (int32)0xffffffff) {
290                 return ret;
291         }
292
293 #ifdef NO_INT64
294         rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
295         exit_cleanup(RERR_UNSUPPORTED);
296 #else
297         if (remote_version >= 16) {
298                 readfd(f,b,8);
299                 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
300         }
301 #endif
302
303         return ret;
304 }
305
306 void read_buf(int f,char *buf,int len)
307 {
308         readfd(f,buf,len);
309 }
310
311 void read_sbuf(int f,char *buf,int len)
312 {
313         read_buf(f,buf,len);
314         buf[len] = 0;
315 }
316
317 unsigned char read_byte(int f)
318 {
319         unsigned char c;
320         read_buf(f,(char *)&c,1);
321         return c;
322 }
323
324
325
326 /* write len bytes to fd, possibly reading from buffer_f_in if set
327    in order to unclog the pipe. don't return until all len
328    bytes have been written */
329 static void writefd_unbuffered(int fd,char *buf,int len)
330 {
331         int total = 0;
332         fd_set w_fds, r_fds;
333         int fd_count, count;
334         struct timeval tv;
335
336         no_flush++;
337
338         while (total < len) {
339                 FD_ZERO(&w_fds);
340                 FD_ZERO(&r_fds);
341                 FD_SET(fd,&w_fds);
342                 fd_count = fd;
343
344                 if (io_error_fd != -1) {
345                         FD_SET(io_error_fd,&r_fds);
346                         if (io_error_fd > fd_count) 
347                                 fd_count = io_error_fd;
348                 }
349
350                 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
351                 tv.tv_usec = 0;
352
353                 errno = 0;
354
355                 count = select(fd_count+1,
356                                io_error_fd != -1?&r_fds:NULL,
357                                &w_fds,NULL,
358                                &tv);
359
360                 if (count <= 0) {
361                         if (errno == EBADF) {
362                                 exit_cleanup(RERR_SOCKETIO);
363                         }
364                         check_timeout();
365                         continue;
366                 }
367
368                 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
369                         read_error_fd();
370                 }
371
372                 if (FD_ISSET(fd, &w_fds)) {
373                         int ret, n = len-total;
374                         
375                         ret = write(fd,buf+total,n);
376
377                         if (ret == -1 && errno == EINTR) {
378                                 continue;
379                         }
380
381                         if (ret == -1 && 
382                             (errno == EWOULDBLOCK || errno == EAGAIN)) {
383                                 continue;
384                         }
385
386                         if (ret <= 0) {
387                                 rprintf(FERROR,"erroring writing %d bytes - exiting\n", len);
388                                 exit_cleanup(RERR_STREAMIO);
389                         }
390
391                         total += ret;
392
393                         if (io_timeout)
394                                 last_io = time(NULL);
395                 }
396         }
397
398         no_flush--;
399 }
400
401
402 static char *io_buffer;
403 static int io_buffer_count;
404
405 void io_start_buffering(int fd)
406 {
407         if (io_buffer) return;
408         multiplex_out_fd = fd;
409         io_buffer = (char *)malloc(IO_BUFFER_SIZE);
410         if (!io_buffer) out_of_memory("writefd");
411         io_buffer_count = 0;
412 }
413
414 /* write an message to a multiplexed stream. If this fails then rsync
415    exits */
416 static void mplex_write(int fd, enum logcode code, char *buf, int len)
417 {
418         char buffer[4096];
419         int n = len;
420
421         SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
422
423         if (n > (sizeof(buffer)-4)) {
424                 n = sizeof(buffer)-4;
425         }
426
427         memcpy(&buffer[4], buf, n);
428         writefd_unbuffered(fd, buffer, n+4);
429
430         len -= n;
431         buf += n;
432
433         if (len) {
434                 writefd_unbuffered(fd, buf, len);
435         }
436 }
437
438
439 void io_flush(void)
440 {
441         int fd = multiplex_out_fd;
442         if (!io_buffer_count || no_flush) return;
443
444         if (io_multiplexing_out) {
445                 mplex_write(fd, FNONE, io_buffer, io_buffer_count);
446         } else {
447                 writefd_unbuffered(fd, io_buffer, io_buffer_count);
448         }
449         io_buffer_count = 0;
450 }
451
452 void io_end_buffering(int fd)
453 {
454         io_flush();
455         if (!io_multiplexing_out) {
456                 free(io_buffer);
457                 io_buffer = NULL;
458         }
459 }
460
461 static void writefd(int fd,char *buf,int len)
462 {
463         stats.total_written += len;
464
465         if (!io_buffer || fd != multiplex_out_fd) {
466                 writefd_unbuffered(fd, buf, len);
467                 return;
468         }
469
470         while (len) {
471                 int n = MIN(len, IO_BUFFER_SIZE-io_buffer_count);
472                 if (n > 0) {
473                         memcpy(io_buffer+io_buffer_count, buf, n);
474                         buf += n;
475                         len -= n;
476                         io_buffer_count += n;
477                 }
478                 
479                 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
480         }
481 }
482
483
484 void write_int(int f,int32 x)
485 {
486         char b[4];
487         SIVAL(b,0,x);
488         writefd(f,b,4);
489 }
490
491 void write_longint(int f, int64 x)
492 {
493         extern int remote_version;
494         char b[8];
495
496         if (remote_version < 16 || x <= 0x7FFFFFFF) {
497                 write_int(f, (int)x);
498                 return;
499         }
500
501         write_int(f, (int32)0xFFFFFFFF);
502         SIVAL(b,0,(x&0xFFFFFFFF));
503         SIVAL(b,4,((x>>32)&0xFFFFFFFF));
504
505         writefd(f,b,8);
506 }
507
508 void write_buf(int f,char *buf,int len)
509 {
510         writefd(f,buf,len);
511 }
512
513 /* write a string to the connection */
514 static void write_sbuf(int f,char *buf)
515 {
516         write_buf(f, buf, strlen(buf));
517 }
518
519
520 void write_byte(int f,unsigned char c)
521 {
522         write_buf(f,(char *)&c,1);
523 }
524
525 int read_line(int f, char *buf, int maxlen)
526 {
527         eof_error = 0;
528
529         while (maxlen) {
530                 buf[0] = 0;
531                 read_buf(f, buf, 1);
532                 if (buf[0] == 0) return 0;
533                 if (buf[0] == '\n') {
534                         buf[0] = 0;
535                         break;
536                 }
537                 if (buf[0] != '\r') {
538                         buf++;
539                         maxlen--;
540                 }
541         }
542         if (maxlen == 0) {
543                 *buf = 0;
544                 return 0;
545         }
546
547         eof_error = 1;
548
549         return 1;
550 }
551
552
553 void io_printf(int fd, const char *format, ...)
554 {
555         va_list ap;  
556         char buf[1024];
557         int len;
558         
559         va_start(ap, format);
560         len = vslprintf(buf, sizeof(buf), format, ap);
561         va_end(ap);
562
563         if (len < 0) exit_cleanup(RERR_STREAMIO);
564
565         write_sbuf(fd, buf);
566 }
567
568
569 /* setup for multiplexing an error stream with the data stream */
570 void io_start_multiplex_out(int fd)
571 {
572         multiplex_out_fd = fd;
573         io_flush();
574         io_start_buffering(fd);
575         io_multiplexing_out = 1;
576 }
577
578 /* setup for multiplexing an error stream with the data stream */
579 void io_start_multiplex_in(int fd)
580 {
581         multiplex_in_fd = fd;
582         io_flush();
583         io_multiplexing_in = 1;
584 }
585
586 /* write an message to the multiplexed error stream */
587 int io_multiplex_write(enum logcode code, char *buf, int len)
588 {
589         if (!io_multiplexing_out) return 0;
590
591         io_flush();
592         stats.total_written += (len+4);
593         mplex_write(multiplex_out_fd, code, buf, len);
594         return 1;
595 }
596
597 /* write a message to the special error fd */
598 int io_error_write(int f, enum logcode code, char *buf, int len)
599 {
600         if (f == -1) return 0;
601         mplex_write(f, code, buf, len);
602         return 1;
603 }
604
605 /* stop output multiplexing */
606 void io_multiplexing_close(void)
607 {
608         io_multiplexing_out = 0;
609 }
610
611 void io_close_input(int fd)
612 {
613         buffer_f_in = -1;
614 }
615