- switched on multiplexing for all connections, not just daemon
[rsync.git] / io.c
1 /* 
2    Copyright (C) Andrew Tridgell 1996
3    Copyright (C) Paul Mackerras 1996
4    
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License as published by
7    the Free Software Foundation; either version 2 of the License, or
8    (at your option) any later version.
9    
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU General Public License for more details.
14    
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 */
19
20 /*
21   socket and pipe IO utilities used in rsync 
22
23   tridge, June 1996
24   */
25 #include "rsync.h"
26
27 /* if no timeout is specified then use a 60 second select timeout */
28 #define SELECT_TIMEOUT 60
29
30 static int io_multiplexing_out;
31 static int io_multiplexing_in;
32 static int multiplex_in_fd;
33 static int multiplex_out_fd;
34 static time_t last_io;
35 static int eof_error=1;
36 extern int verbose;
37 extern int io_timeout;
38 extern struct stats stats;
39
40 static int buffer_f_in = -1;
41 static int io_error_fd = -1;
42
43 static void read_loop(int fd, char *buf, int len);
44
45 void setup_readbuffer(int f_in)
46 {
47         buffer_f_in = f_in;
48 }
49
50 static void check_timeout(void)
51 {
52         time_t t;
53         
54         if (!io_timeout) return;
55
56         if (!last_io) {
57                 last_io = time(NULL);
58                 return;
59         }
60
61         t = time(NULL);
62
63         if (last_io && io_timeout && (t-last_io) >= io_timeout) {
64                 rprintf(FERROR,"io timeout after %d second - exiting\n", 
65                         (int)(t-last_io));
66                 exit_cleanup(RERR_TIMEOUT);
67         }
68 }
69
70 /* setup the fd used to propogate errors */
71 void io_set_error_fd(int fd)
72 {
73         io_error_fd = fd;
74 }
75
76 /* read some data from the error fd and write it to the write log code */
77 static void read_error_fd(void)
78 {
79         char buf[200];
80         int n;
81         int fd = io_error_fd;
82         int tag, len;
83
84         io_error_fd = -1;
85
86         read_loop(fd, buf, 4);
87         tag = IVAL(buf, 0);
88
89         len = tag & 0xFFFFFF;
90         tag = tag >> 24;
91         tag -= MPLEX_BASE;
92
93         while (len) {
94                 n = len;
95                 if (n > (sizeof(buf)-1)) n = sizeof(buf)-1;
96                 read_loop(fd, buf, n);
97                 rwrite((enum logcode)tag, buf, n);
98                 len -= n;
99         }
100
101         io_error_fd = fd;
102 }
103
104
105 static int no_flush;
106
107 /* read from a socket with IO timeout. return the number of
108    bytes read. If no bytes can be read then exit, never return
109    a number <= 0 */
110 static int read_timeout(int fd, char *buf, int len)
111 {
112         int n, ret=0;
113
114         io_flush();
115
116         while (ret == 0) {
117                 fd_set fds;
118                 struct timeval tv;
119                 int fd_count = fd+1;
120
121                 FD_ZERO(&fds);
122                 FD_SET(fd, &fds);
123                 if (io_error_fd != -1) {
124                         FD_SET(io_error_fd, &fds);
125                         if (io_error_fd > fd) fd_count = io_error_fd+1;
126                 }
127
128                 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
129                 tv.tv_usec = 0;
130
131                 errno = 0;
132
133                 if (select(fd_count, &fds, NULL, NULL, &tv) < 1) {
134                         if (errno == EBADF) {
135                                 exit_cleanup(RERR_SOCKETIO);
136                         }
137                         check_timeout();
138                         continue;
139                 }
140
141                 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
142                         read_error_fd();
143                 }
144
145                 if (!FD_ISSET(fd, &fds)) continue;
146
147                 n = read(fd, buf, len);
148
149                 if (n > 0) {
150                         buf += n;
151                         len -= n;
152                         ret += n;
153                         if (io_timeout)
154                                 last_io = time(NULL);
155                         continue;
156                 }
157
158                 if (n == -1 && errno == EINTR) {
159                         continue;
160                 }
161
162
163                 if (n == 0) {
164                         if (eof_error) {
165                                 rprintf(FERROR,"unexpected EOF in read_timeout\n");
166                         }
167                         exit_cleanup(RERR_STREAMIO);
168                 }
169
170                 /* this prevents us trying to write errors on a dead socket */
171                 io_multiplexing_close();
172
173                 rprintf(FERROR,"read error: %s\n", strerror(errno));
174                 exit_cleanup(RERR_STREAMIO);
175         }
176
177         return ret;
178 }
179
180 /* continue trying to read len bytes - don't return until len
181    has been read */
182 static void read_loop(int fd, char *buf, int len)
183 {
184         while (len) {
185                 int n = read_timeout(fd, buf, len);
186
187                 buf += n;
188                 len -= n;
189         }
190 }
191
192 /* read from the file descriptor handling multiplexing - 
193    return number of bytes read
194    never return <= 0 */
195 static int read_unbuffered(int fd, char *buf, int len)
196 {
197         static int remaining;
198         int tag, ret=0;
199         char line[1024];
200
201         if (!io_multiplexing_in || fd != multiplex_in_fd) 
202                 return read_timeout(fd, buf, len);
203
204         while (ret == 0) {
205                 if (remaining) {
206                         len = MIN(len, remaining);
207                         read_loop(fd, buf, len);
208                         remaining -= len;
209                         ret = len;
210                         continue;
211                 }
212
213                 read_loop(fd, line, 4);
214                 tag = IVAL(line, 0);
215
216                 remaining = tag & 0xFFFFFF;
217                 tag = tag >> 24;
218
219                 if (tag == MPLEX_BASE) continue;
220
221                 tag -= MPLEX_BASE;
222
223                 if (tag != FERROR && tag != FINFO) {
224                         rprintf(FERROR,"unexpected tag %d\n", tag);
225                         exit_cleanup(RERR_STREAMIO);
226                 }
227
228                 if (remaining > sizeof(line)-1) {
229                         rprintf(FERROR,"multiplexing overflow %d\n\n", 
230                                 remaining);
231                         exit_cleanup(RERR_STREAMIO);
232                 }
233
234                 read_loop(fd, line, remaining);
235                 line[remaining] = 0;
236
237                 rprintf((enum logcode)tag,"%s", line);
238                 remaining = 0;
239         }
240
241         return ret;
242 }
243
244
245 /* do a buffered read from fd. don't return until all N bytes
246    have been read. If all N can't be read then exit with an error */
247 static void readfd(int fd,char *buffer,int N)
248 {
249         int  ret;
250         int total=0;  
251         
252         while (total < N) {
253                 io_flush();
254
255                 ret = read_unbuffered(fd,buffer + total,N-total);
256                 total += ret;
257         }
258
259         stats.total_read += total;
260 }
261
262
263 int32 read_int(int f)
264 {
265         char b[4];
266         int32 ret;
267
268         readfd(f,b,4);
269         ret = IVAL(b,0);
270         if (ret == (int32)0xffffffff) return -1;
271         return ret;
272 }
273
274 int64 read_longint(int f)
275 {
276         extern int remote_version;
277         int64 ret;
278         char b[8];
279         ret = read_int(f);
280
281         if ((int32)ret != (int32)0xffffffff) {
282                 return ret;
283         }
284
285 #ifdef NO_INT64
286         rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
287         exit_cleanup(RERR_UNSUPPORTED);
288 #else
289         if (remote_version >= 16) {
290                 readfd(f,b,8);
291                 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
292         }
293 #endif
294
295         return ret;
296 }
297
298 void read_buf(int f,char *buf,int len)
299 {
300         readfd(f,buf,len);
301 }
302
303 void read_sbuf(int f,char *buf,int len)
304 {
305         read_buf(f,buf,len);
306         buf[len] = 0;
307 }
308
309 unsigned char read_byte(int f)
310 {
311         unsigned char c;
312         read_buf(f,(char *)&c,1);
313         return c;
314 }
315
316
317
318 /* write len bytes to fd, possibly reading from buffer_f_in if set
319    in order to unclog the pipe. don't return until all len
320    bytes have been written */
321 static void writefd_unbuffered(int fd,char *buf,int len)
322 {
323         int total = 0;
324         fd_set w_fds, r_fds;
325         int fd_count, count;
326         struct timeval tv;
327
328         no_flush++;
329
330         while (total < len) {
331                 FD_ZERO(&w_fds);
332                 FD_ZERO(&r_fds);
333                 FD_SET(fd,&w_fds);
334                 fd_count = fd;
335
336                 if (io_error_fd != -1) {
337                         FD_SET(io_error_fd,&r_fds);
338                         if (io_error_fd > fd_count) 
339                                 fd_count = io_error_fd;
340                 }
341
342                 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
343                 tv.tv_usec = 0;
344
345                 errno = 0;
346
347                 count = select(fd_count+1,
348                                io_error_fd != -1?&r_fds:NULL,
349                                &w_fds,NULL,
350                                &tv);
351
352                 if (count <= 0) {
353                         if (errno == EBADF) {
354                                 exit_cleanup(RERR_SOCKETIO);
355                         }
356                         check_timeout();
357                         continue;
358                 }
359
360                 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
361                         read_error_fd();
362                 }
363
364                 if (FD_ISSET(fd, &w_fds)) {
365                         int ret, n = len-total;
366                         
367                         if (n > PIPE_BUF) n = PIPE_BUF;
368
369                         ret = write(fd,buf+total,n?n:1);
370
371                         if (ret == -1 && errno == EINTR) {
372                                 continue;
373                         }
374
375                         if (ret <= 0) {
376                                 rprintf(FERROR,"erroring writing %d bytes - exiting\n", len);
377                                 exit_cleanup(RERR_STREAMIO);
378                         }
379
380                         total += ret;
381
382                         if (io_timeout)
383                                 last_io = time(NULL);
384                 }
385         }
386
387         no_flush--;
388 }
389
390
391 static char *io_buffer;
392 static int io_buffer_count;
393
394 void io_start_buffering(int fd)
395 {
396         if (io_buffer) return;
397         multiplex_out_fd = fd;
398         io_buffer = (char *)malloc(IO_BUFFER_SIZE);
399         if (!io_buffer) out_of_memory("writefd");
400         io_buffer_count = 0;
401 }
402
403 /* write an message to a multiplexed stream. If this fails then rsync
404    exits */
405 static void mplex_write(int fd, enum logcode code, char *buf, int len)
406 {
407         char buffer[4096];
408         int n = len;
409
410         SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
411
412         if (n > (sizeof(buf)-4)) {
413                 n = sizeof(buf)-4;
414         }
415
416         memcpy(&buffer[4], buf, n);
417         writefd_unbuffered(fd, buffer, n+4);
418
419         len -= n;
420         buf += n;
421
422         writefd_unbuffered(fd, buf, len);
423 }
424
425
426 void io_flush(void)
427 {
428         int fd = multiplex_out_fd;
429         if (!io_buffer_count || no_flush) return;
430
431         if (io_multiplexing_out) {
432                 mplex_write(fd, 0, io_buffer, io_buffer_count);
433         } else {
434                 writefd_unbuffered(fd, io_buffer, io_buffer_count);
435         }
436         io_buffer_count = 0;
437 }
438
439 void io_end_buffering(int fd)
440 {
441         io_flush();
442         if (!io_multiplexing_out) {
443                 free(io_buffer);
444                 io_buffer = NULL;
445         }
446 }
447
448 static void writefd(int fd,char *buf,int len)
449 {
450         stats.total_written += len;
451
452         if (!io_buffer || fd != multiplex_out_fd) {
453                 writefd_unbuffered(fd, buf, len);
454                 return;
455         }
456
457         while (len) {
458                 int n = MIN(len, IO_BUFFER_SIZE-io_buffer_count);
459                 if (n > 0) {
460                         memcpy(io_buffer+io_buffer_count, buf, n);
461                         buf += n;
462                         len -= n;
463                         io_buffer_count += n;
464                 }
465                 
466                 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
467         }
468 }
469
470
471 void write_int(int f,int32 x)
472 {
473         char b[4];
474         SIVAL(b,0,x);
475         writefd(f,b,4);
476 }
477
478 void write_longint(int f, int64 x)
479 {
480         extern int remote_version;
481         char b[8];
482
483         if (remote_version < 16 || x <= 0x7FFFFFFF) {
484                 write_int(f, (int)x);
485                 return;
486         }
487
488         write_int(f, (int32)0xFFFFFFFF);
489         SIVAL(b,0,(x&0xFFFFFFFF));
490         SIVAL(b,4,((x>>32)&0xFFFFFFFF));
491
492         writefd(f,b,8);
493 }
494
495 void write_buf(int f,char *buf,int len)
496 {
497         writefd(f,buf,len);
498 }
499
500 /* write a string to the connection */
501 static void write_sbuf(int f,char *buf)
502 {
503         write_buf(f, buf, strlen(buf));
504 }
505
506
507 void write_byte(int f,unsigned char c)
508 {
509         write_buf(f,(char *)&c,1);
510 }
511
512 int read_line(int f, char *buf, int maxlen)
513 {
514         eof_error = 0;
515
516         while (maxlen) {
517                 buf[0] = 0;
518                 read_buf(f, buf, 1);
519                 if (buf[0] == 0) return 0;
520                 if (buf[0] == '\n') {
521                         buf[0] = 0;
522                         break;
523                 }
524                 if (buf[0] != '\r') {
525                         buf++;
526                         maxlen--;
527                 }
528         }
529         if (maxlen == 0) {
530                 *buf = 0;
531                 return 0;
532         }
533
534         eof_error = 1;
535
536         return 1;
537 }
538
539
540 void io_printf(int fd, const char *format, ...)
541 {
542         va_list ap;  
543         char buf[1024];
544         int len;
545         
546         va_start(ap, format);
547         len = vslprintf(buf, sizeof(buf), format, ap);
548         va_end(ap);
549
550         if (len < 0) exit_cleanup(RERR_STREAMIO);
551
552         write_sbuf(fd, buf);
553 }
554
555
556 /* setup for multiplexing an error stream with the data stream */
557 void io_start_multiplex_out(int fd)
558 {
559         multiplex_out_fd = fd;
560         io_flush();
561         io_start_buffering(fd);
562         io_multiplexing_out = 1;
563 }
564
565 /* setup for multiplexing an error stream with the data stream */
566 void io_start_multiplex_in(int fd)
567 {
568         multiplex_in_fd = fd;
569         io_flush();
570         io_multiplexing_in = 1;
571 }
572
573 /* write an message to the multiplexed error stream */
574 int io_multiplex_write(enum logcode code, char *buf, int len)
575 {
576         if (!io_multiplexing_out) return 0;
577
578         io_flush();
579         stats.total_written += (len+4);
580         mplex_write(multiplex_out_fd, code, buf, len);
581         return 1;
582 }
583
584 /* write a message to the special error fd */
585 int io_error_write(int f, enum logcode code, char *buf, int len)
586 {
587         if (f == -1) return 0;
588         mplex_write(f, code, buf, len);
589         return 1;
590 }
591
592 /* stop output multiplexing */
593 void io_multiplexing_close(void)
594 {
595         io_multiplexing_out = 0;
596 }
597
598 void io_close_input(int fd)
599 {
600         buffer_f_in = -1;
601 }