Made the new msg2sndr handling even better.
authorWayne Davison <wayned@samba.org>
Thu, 16 Mar 2006 09:57:32 +0000 (09:57 +0000)
committerWayne Davison <wayned@samba.org>
Thu, 16 Mar 2006 09:57:32 +0000 (09:57 +0000)
io.c

diff --git a/io.c b/io.c
index 458ed7fbd20c1d88b05f2be0983c88a0791197f2..ea3c81bd2104b6c4c3de55913631398645cc023f 100644 (file)
--- a/io.c
+++ b/io.c
@@ -1063,6 +1063,7 @@ static void writefd_unbuffered(int fd,char *buf,size_t len)
        size_t n, total = 0;
        fd_set w_fds, r_fds;
        int maxfd, count, cnt, using_r_fds;
+       int defer_save = defer_forwarding_messages;
        struct timeval tv;
 
        no_flush++;
@@ -1101,18 +1102,6 @@ static void writefd_unbuffered(int fd,char *buf,size_t len)
                if (!FD_ISSET(fd, &w_fds))
                        continue;
 
-               if (msg2sndr.head && !defer_forwarding_messages) {
-                       struct msg_list_item *m = msg2sndr.head;
-                       int code = *((uchar*)m->buf+3) - MPLEX_BASE;
-                       if (!(msg2sndr.head = m->next))
-                               msg2sndr.tail = NULL;
-                       defer_forwarding_messages = 1;
-                       io_multiplex_write(code, m->buf+4, m->len-4);
-                       defer_forwarding_messages = 0;
-                       free(m);
-                       continue;
-               }
-
                n = len - total;
                if (bwlimit_writemax && n > bwlimit_writemax)
                        n = bwlimit_writemax;
@@ -1154,11 +1143,28 @@ static void writefd_unbuffered(int fd,char *buf,size_t len)
                        sleep_for_bwlimit(cnt);
                }
        }
-       defer_forwarding_messages = 0;
 
+       defer_forwarding_messages = defer_save;
        no_flush--;
 }
 
+static void msg2sndr_flush(void)
+{
+       if (defer_forwarding_messages)
+               return;
+
+       while (msg2sndr.head) {
+               struct msg_list_item *m = msg2sndr.head;
+               if (!(msg2sndr.head = m->next))
+                       msg2sndr.tail = NULL;
+               stats.total_written += m->len;
+               defer_forwarding_messages = 1;
+               writefd_unbuffered(sock_f_out, m->buf, m->len);
+               defer_forwarding_messages = 0;
+               free(m);
+       }
+}
+
 /**
  * Write an message to a multiplexed stream. If this fails then rsync
  * exits.
@@ -1180,13 +1186,18 @@ static void mplex_write(enum msgcode code, char *buf, size_t len)
        len -= n;
        buf += n;
 
-       if (len)
+       if (len) {
+               defer_forwarding_messages = 1;
                writefd_unbuffered(sock_f_out, buf, len);
+               defer_forwarding_messages = 0;
+               msg2sndr_flush();
+       }
 }
 
 void io_flush(int flush_it_all)
 {
        msg2genr_flush(flush_it_all);
+       msg2sndr_flush();
 
        if (!iobuf_out_cnt || no_flush)
                return;