a91b3db63c575f9fbf44d00aa3c15fab06483ee3
[rsync.git] / io.c
1 /* 
2    Copyright (C) Andrew Tridgell 1996
3    Copyright (C) Paul Mackerras 1996
4    
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License as published by
7    the Free Software Foundation; either version 2 of the License, or
8    (at your option) any later version.
9    
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU General Public License for more details.
14    
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 */
19
20 /*
21   Utilities used in rsync 
22
23   tridge, June 1996
24   */
25 #include "rsync.h"
26
27 static int64 total_written;
28 static int64 total_read;
29
30 static int io_multiplexing_out;
31 static int io_multiplexing_in;
32 static int multiplex_in_fd;
33 static int multiplex_out_fd;
34 static time_t last_io;
35
36 extern int verbose;
37 extern int sparse_files;
38 extern int io_timeout;
39
40 int64 write_total(void)
41 {
42         return total_written;
43 }
44
45 int64 read_total(void)
46 {
47         return total_read;
48 }
49
50 static int buffer_f_in = -1;
51
52 void setup_nonblocking(int f_in,int f_out)
53 {
54         set_blocking(f_out,0);
55         buffer_f_in = f_in;
56 }
57
58 static void check_timeout(void)
59 {
60         time_t t;
61         
62         if (!io_timeout) return;
63
64         if (!last_io) {
65                 last_io = time(NULL);
66                 return;
67         }
68
69         t = time(NULL);
70
71         if (last_io && io_timeout && (t-last_io)>io_timeout) {
72                 rprintf(FERROR,"read timeout after %d second - exiting\n", 
73                         (int)(t-last_io));
74                 exit_cleanup(1);
75         }
76 }
77
78
79 static char *read_buffer;
80 static char *read_buffer_p;
81 static int read_buffer_len;
82 static int read_buffer_size;
83
84
85 /* continue trying to read len bytes - don't return until len
86    has been read */
87 static void read_loop(int fd, char *buf, int len)
88 {
89         while (len) {
90                 int n = read(fd, buf, len);
91                 if (n > 0) {
92                         buf += n;
93                         len -= n;
94                 }
95                 if (n == 0) {
96                         rprintf(FERROR,"EOF in read_loop\n");
97                         exit_cleanup(1);
98                 }
99                 if (n == -1) {
100                         fd_set fds;
101                         struct timeval tv;
102
103                         if (errno != EAGAIN && errno != EWOULDBLOCK) {
104                                 rprintf(FERROR,"io error: %s\n", 
105                                         strerror(errno));
106                                 exit_cleanup(1);
107                         }
108
109                         FD_ZERO(&fds);
110                         FD_SET(fd, &fds);
111                         tv.tv_sec = io_timeout;
112                         tv.tv_usec = 0;
113
114                         if (select(fd+1, &fds, NULL, NULL, 
115                                    io_timeout?&tv:NULL) != 1) {
116                                 check_timeout();
117                         }
118                 }
119         }
120 }
121
122 static int read_unbuffered(int fd, char *buf, int len)
123 {
124         static int remaining;
125         char ibuf[4];
126         int tag, ret=0;
127         char line[1024];
128
129         if (!io_multiplexing_in || fd != multiplex_in_fd) 
130                 return read(fd, buf, len);
131
132         while (ret == 0) {
133                 if (remaining) {
134                         len = MIN(len, remaining);
135                         read_loop(fd, buf, len);
136                         remaining -= len;
137                         ret = len;
138                         continue;
139                 }
140
141                 read_loop(fd, ibuf, 4);
142                 tag = IVAL(ibuf, 0);
143
144                 remaining = tag & 0xFFFFFF;
145                 tag = tag >> 24;
146
147                 if (tag == MPLEX_BASE) continue;
148
149                 tag -= MPLEX_BASE;
150
151                 if (tag != FERROR && tag != FINFO) {
152                         rprintf(FERROR,"unexpected tag %d\n", tag);
153                         exit_cleanup(1);
154                 }
155
156                 if (remaining > sizeof(line)-1) {
157                         rprintf(FERROR,"multiplexing overflow %d\n\n", 
158                                 remaining);
159                         exit_cleanup(1);
160                 }
161
162                 read_loop(fd, line, remaining);
163                 line[remaining] = 0;
164
165                 rprintf(tag,"%s", line);
166                 remaining = 0;
167         }
168
169         return ret;
170 }
171
172
173 /* This function was added to overcome a deadlock problem when using
174  * ssh.  It looks like we can't allow our receive queue to get full or
175  * ssh will clag up. Uggh.  */
176 static void read_check(int f)
177 {
178         int n;
179
180         if (f == -1) return;
181
182         if (read_buffer_len == 0) {
183                 read_buffer_p = read_buffer;
184         }
185
186         if ((n=num_waiting(f)) <= 0)
187                 return;
188
189         /* things could deteriorate if we read in really small chunks */
190         if (n < 10) n = 1024;
191
192         if (n > MAX_READ_BUFFER/4)
193                 n = MAX_READ_BUFFER/4;
194
195         if (read_buffer_p != read_buffer) {
196                 memmove(read_buffer,read_buffer_p,read_buffer_len);
197                 read_buffer_p = read_buffer;
198         }
199
200         if (n > (read_buffer_size - read_buffer_len)) {
201                 read_buffer_size += n;
202                 if (!read_buffer)
203                         read_buffer = (char *)malloc(read_buffer_size);
204                 else
205                         read_buffer = (char *)realloc(read_buffer,read_buffer_size);
206                 if (!read_buffer) out_of_memory("read check");      
207                 read_buffer_p = read_buffer;      
208         }
209
210         n = read_unbuffered(f,read_buffer+read_buffer_len,n);
211         if (n > 0) {
212                 read_buffer_len += n;
213         }
214 }
215
216 static int readfd(int fd,char *buffer,int N)
217 {
218         int  ret;
219         int total=0;  
220         struct timeval tv;
221         
222         if (read_buffer_len < N)
223                 read_check(buffer_f_in);
224         
225         while (total < N) {
226                 if (read_buffer_len > 0 && buffer_f_in == fd) {
227                         ret = MIN(read_buffer_len,N-total);
228                         memcpy(buffer+total,read_buffer_p,ret);
229                         read_buffer_p += ret;
230                         read_buffer_len -= ret;
231                         total += ret;
232                         continue;
233                 } 
234
235                 io_flush();
236
237                 while ((ret = read_unbuffered(fd,buffer + total,N-total)) == -1) {
238                         fd_set fds;
239
240                         if (errno != EAGAIN && errno != EWOULDBLOCK)
241                                 return -1;
242                         FD_ZERO(&fds);
243                         FD_SET(fd, &fds);
244                         tv.tv_sec = io_timeout;
245                         tv.tv_usec = 0;
246
247                         if (select(fd+1, &fds, NULL, NULL, 
248                                    io_timeout?&tv:NULL) != 1) {
249                                 check_timeout();
250                         }
251                 }
252
253                 if (ret <= 0)
254                         return total;
255                 total += ret;
256         }
257
258         if (io_timeout)
259                 last_io = time(NULL);
260         return total;
261 }
262
263
264 int32 read_int(int f)
265 {
266   int ret;
267   char b[4];
268   if ((ret=readfd(f,b,4)) != 4) {
269     if (verbose > 1) 
270       rprintf(FERROR,"(%d) read_int: Error reading %d bytes : %s\n",
271               getpid(),4,ret==-1?strerror(errno):"EOF");
272     exit_cleanup(1);
273   }
274   total_read += 4;
275   return IVAL(b,0);
276 }
277
278 int64 read_longint(int f)
279 {
280         extern int remote_version;
281         int64 ret;
282         char b[8];
283         ret = read_int(f);
284
285         if ((int32)ret != (int32)0xffffffff) return ret;
286
287 #ifdef NO_INT64
288         rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
289         exit_cleanup(1);
290 #else
291         if (remote_version >= 16) {
292                 if ((ret=readfd(f,b,8)) != 8) {
293                         if (verbose > 1) 
294                                 rprintf(FERROR,"(%d) read_longint: Error reading %d bytes : %s\n",
295                                         getpid(),8,ret==-1?strerror(errno):"EOF");
296                         exit_cleanup(1);
297                 }
298                 total_read += 8;
299                 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
300         }
301 #endif
302
303         return ret;
304 }
305
306 void read_buf(int f,char *buf,int len)
307 {
308   int ret;
309   if ((ret=readfd(f,buf,len)) != len) {
310     if (verbose > 1) 
311       rprintf(FERROR,"(%d) read_buf: Error reading %d bytes : %s\n",
312               getpid(),len,ret==-1?strerror(errno):"EOF");
313     exit_cleanup(1);
314   }
315   total_read += len;
316 }
317
318 void read_sbuf(int f,char *buf,int len)
319 {
320         read_buf(f,buf,len);
321         buf[len] = 0;
322 }
323
324 unsigned char read_byte(int f)
325 {
326   unsigned char c;
327   read_buf(f,(char *)&c,1);
328   return c;
329 }
330
331
332 static char last_byte;
333 static int last_sparse;
334
335 int sparse_end(int f)
336 {
337         if (last_sparse) {
338                 do_lseek(f,-1,SEEK_CUR);
339                 return (write(f,&last_byte,1) == 1 ? 0 : -1);
340         }
341         last_sparse = 0;
342         return 0;
343 }
344
345
346 static int write_sparse(int f,char *buf,int len)
347 {
348         int l1=0,l2=0;
349         int ret;
350
351         for (l1=0;l1<len && buf[l1]==0;l1++) ;
352         for (l2=0;l2<(len-l1) && buf[len-(l2+1)]==0;l2++) ;
353
354         last_byte = buf[len-1];
355
356         if (l1 == len || l2 > 0)
357                 last_sparse=1;
358
359         if (l1 > 0)
360                 do_lseek(f,l1,SEEK_CUR);  
361
362         if (l1 == len) 
363                 return len;
364
365         if ((ret=write(f,buf+l1,len-(l1+l2))) != len-(l1+l2)) {
366                 if (ret == -1 || ret == 0) return ret;
367                 return (l1+ret);
368         }
369
370         if (l2 > 0)
371                 do_lseek(f,l2,SEEK_CUR);
372         
373         return len;
374 }
375
376
377
378 int write_file(int f,char *buf,int len)
379 {
380         int ret = 0;
381
382         if (!sparse_files) 
383                 return write(f,buf,len);
384
385         while (len>0) {
386                 int len1 = MIN(len, SPARSE_WRITE_SIZE);
387                 int r1 = write_sparse(f, buf, len1);
388                 if (r1 <= 0) {
389                         if (ret > 0) return ret;
390                         return r1;
391                 }
392                 len -= r1;
393                 buf += r1;
394                 ret += r1;
395         }
396         return ret;
397 }
398
399
400 static int writefd_unbuffered(int fd,char *buf,int len)
401 {
402         int total = 0;
403         fd_set w_fds, r_fds;
404         int fd_count, count, got_select=0;
405         struct timeval tv;
406
407         while (total < len) {
408                 int ret = write(fd,buf+total,len-total);
409
410                 if (ret == 0) return total;
411
412                 if (ret == -1 && !(errno == EWOULDBLOCK || errno == EAGAIN)) 
413                         return -1;
414
415                 if (ret == -1 && got_select) {
416                         /* hmmm, we got a write select on the fd and
417                            then failed to write.  Why doesn't that
418                            mean that the fd is dead? It doesn't on
419                            some systems it seems (eg. IRIX) */
420                         u_sleep(1000);
421                 }
422
423                 got_select = 0;
424
425
426                 if (ret != -1) {
427                         total += ret;
428                         continue;
429                 }
430
431                 if (read_buffer_len < MAX_READ_BUFFER && buffer_f_in != -1)
432                         read_check(buffer_f_in);
433
434                 fd_count = fd+1;
435                 FD_ZERO(&w_fds);
436                 FD_ZERO(&r_fds);
437                 FD_SET(fd,&w_fds);
438                 if (buffer_f_in != -1) {
439                         FD_SET(buffer_f_in,&r_fds);
440                         if (buffer_f_in > fd) 
441                                 fd_count = buffer_f_in+1;
442                 }
443
444                 tv.tv_sec = BLOCKING_TIMEOUT;
445                 tv.tv_usec = 0;
446                 count = select(fd_count,buffer_f_in == -1? NULL: &r_fds,
447                                &w_fds,NULL,&tv);
448                 
449                 if (count == -1 && errno != EINTR) {
450                         if (verbose > 1) 
451                                 rprintf(FERROR,"select error: %s\n", strerror(errno));
452                         exit_cleanup(1);
453                 }
454                 
455                 if (count == 0) {
456                         check_timeout();
457                         continue;
458                 }
459                 
460                 if (FD_ISSET(fd, &w_fds)) {
461                         got_select = 1;
462                 }
463         }
464
465         if (io_timeout)
466                 last_io = time(NULL);
467         
468         return total;
469 }
470
471
472 static char *io_buffer;
473 static int io_buffer_count;
474
475 void io_start_buffering(int fd)
476 {
477         if (io_buffer) return;
478         multiplex_out_fd = fd;
479         io_buffer = (char *)malloc(IO_BUFFER_SIZE+4);
480         if (!io_buffer) out_of_memory("writefd");
481         io_buffer_count = 0;
482
483         /* leave room for the multiplex header in case it's needed */
484         io_buffer += 4;
485 }
486
487 void io_flush(void)
488 {
489         int fd = multiplex_out_fd;
490         if (!io_buffer_count) return;
491
492         if (io_multiplexing_out) {
493                 SIVAL(io_buffer-4, 0, (MPLEX_BASE<<24) + io_buffer_count);
494                 if (writefd_unbuffered(fd, io_buffer-4, io_buffer_count+4) !=
495                     io_buffer_count+4) {
496                         rprintf(FERROR,"write failed\n");
497                         exit_cleanup(1);
498                 }
499         } else {
500                 if (writefd_unbuffered(fd, io_buffer, io_buffer_count) != 
501                     io_buffer_count) {
502                         rprintf(FERROR,"write failed\n");
503                         exit_cleanup(1);
504                 }
505         }
506         io_buffer_count = 0;
507 }
508
509 void io_end_buffering(int fd)
510 {
511         io_flush();
512         if (!io_multiplexing_out) {
513                 free(io_buffer-4);
514                 io_buffer = NULL;
515         }
516 }
517
518 static int writefd(int fd,char *buf,int len1)
519 {
520         int len = len1;
521
522         if (!io_buffer) return writefd_unbuffered(fd, buf, len);
523
524         while (len) {
525                 int n = MIN(len, IO_BUFFER_SIZE-io_buffer_count);
526                 if (n > 0) {
527                         memcpy(io_buffer+io_buffer_count, buf, n);
528                         buf += n;
529                         len -= n;
530                         io_buffer_count += n;
531                 }
532                 
533                 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
534         }
535
536         return len1;
537 }
538
539
540 void write_int(int f,int32 x)
541 {
542         int ret;
543         char b[4];
544         SIVAL(b,0,x);
545         if ((ret=writefd(f,b,4)) != 4) {
546                 rprintf(FERROR,"write_int failed : %s\n",
547                         ret==-1?strerror(errno):"EOF");
548                 exit_cleanup(1);
549         }
550         total_written += 4;
551 }
552
553 void write_longint(int f, int64 x)
554 {
555         extern int remote_version;
556         char b[8];
557         int ret;
558
559         if (remote_version < 16 || x <= 0x7FFFFFFF) {
560                 write_int(f, (int)x);
561                 return;
562         }
563
564         write_int(f, -1);
565         SIVAL(b,0,(x&0xFFFFFFFF));
566         SIVAL(b,4,((x>>32)&0xFFFFFFFF));
567
568         if ((ret=writefd(f,b,8)) != 8) {
569                 rprintf(FERROR,"write_longint failed : %s\n",
570                         ret==-1?strerror(errno):"EOF");
571                 exit_cleanup(1);
572         }
573         total_written += 8;
574 }
575
576 void write_buf(int f,char *buf,int len)
577 {
578         int ret;
579         if ((ret=writefd(f,buf,len)) != len) {
580                 rprintf(FERROR,"write_buf failed : %s\n",
581                         ret==-1?strerror(errno):"EOF");
582                 exit_cleanup(1);
583         }
584         total_written += len;
585 }
586
587 /* write a string to the connection */
588 void write_sbuf(int f,char *buf)
589 {
590         write_buf(f, buf, strlen(buf));
591 }
592
593
594 void write_byte(int f,unsigned char c)
595 {
596         write_buf(f,(char *)&c,1);
597 }
598
599 void write_flush(int f)
600 {
601 }
602
603
604 int read_line(int f, char *buf, int maxlen)
605 {
606         while (maxlen) {
607                 read_buf(f, buf, 1);
608                 if (buf[0] == '\n') {
609                         buf[0] = 0;
610                         break;
611                 }
612                 if (buf[0] != '\r') {
613                         buf++;
614                         maxlen--;
615                 }
616         }
617         if (maxlen == 0) {
618                 *buf = 0;
619                 return 0;
620         }
621         return 1;
622 }
623
624
625 void io_printf(int fd, const char *format, ...)
626 {
627         va_list ap;  
628         char buf[1024];
629         int len;
630         
631         va_start(ap, format);
632         len = vslprintf(buf, sizeof(buf)-1, format, ap);
633         va_end(ap);
634
635         if (len < 0) exit_cleanup(1);
636
637         write_sbuf(fd, buf);
638 }
639
640
641 /* setup for multiplexing an error stream with the data stream */
642 void io_start_multiplex_out(int fd)
643 {
644         multiplex_out_fd = fd;
645         io_flush();
646         io_start_buffering(fd);
647         io_multiplexing_out = 1;
648 }
649
650 /* setup for multiplexing an error stream with the data stream */
651 void io_start_multiplex_in(int fd)
652 {
653         multiplex_in_fd = fd;
654         io_flush();
655         if (read_buffer_len) {
656                 fprintf(stderr,"ERROR: data in read buffer at mplx start\n");
657                 exit_cleanup(1);
658         }
659
660         io_multiplexing_in = 1;
661 }
662
663 /* write an message to the error stream */
664 int io_multiplex_write(int f, char *buf, int len)
665 {
666         if (!io_multiplexing_out) return 0;
667
668         io_flush();
669
670         SIVAL(io_buffer-4, 0, ((MPLEX_BASE + f)<<24) + len);
671         memcpy(io_buffer, buf, len);
672
673         writefd_unbuffered(multiplex_out_fd, io_buffer-4, len+4);
674         return 1;
675 }
676
677 void io_close_input(int fd)
678 {
679         buffer_f_in = -1;
680 }