ceph: ignore wbc->range_{start,end} when write back snapshot data
authorYan, Zheng <zyan@redhat.com>
Fri, 1 Sep 2017 08:53:58 +0000 (16:53 +0800)
committerIlya Dryomov <idryomov@gmail.com>
Wed, 6 Sep 2017 17:56:58 +0000 (19:56 +0200)
writepages() needs to write dirty pages to OSD in strict order of
snapshot context. It must first write dirty pages associated with
the oldest snapshot context. In the write range case, dirty pages
in the specified range can be associated with newer snapc. They
are not writeable until we write all dirty pages associated with
the oldest snapc.

Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
fs/ceph/addr.c

index 4a54f7369f51a597f409dfac55fea03d681c2a25..201e529e8a6cf1db5bb54a8a60d0aea1400a3e5a 100644 (file)
@@ -469,6 +469,7 @@ struct ceph_writeback_ctl
        u64 truncate_size;
        u32 truncate_seq;
        bool size_stable;
+       bool head_snapc;
 };
 
 /*
@@ -504,6 +505,7 @@ get_oldest_context(struct inode *inode, struct ceph_writeback_ctl *ctl,
                        }
                        ctl->truncate_size = capsnap->truncate_size;
                        ctl->truncate_seq = capsnap->truncate_seq;
+                       ctl->head_snapc = false;
                }
 
                if (snapc)
@@ -524,6 +526,7 @@ get_oldest_context(struct inode *inode, struct ceph_writeback_ctl *ctl,
                        ctl->truncate_size = ci->i_truncate_size;
                        ctl->truncate_seq = ci->i_truncate_seq;
                        ctl->size_stable = false;
+                       ctl->head_snapc = true;
                }
        }
        spin_unlock(&ci->i_ceph_lock);
@@ -781,7 +784,7 @@ static int ceph_writepages_start(struct address_space *mapping,
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_vino vino = ceph_vino(inode);
-       pgoff_t index, start_index, end;
+       pgoff_t index, start_index, end = -1;
        struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc;
        struct pagevec pvec;
        int rc = 0;
@@ -810,25 +813,10 @@ static int ceph_writepages_start(struct address_space *mapping,
        pagevec_init(&pvec, 0);
 
        start_index = wbc->range_cyclic ? mapping->writeback_index : 0;
-
-       /* where to start/end? */
-       if (wbc->range_cyclic) {
-               index = start_index;
-               end = -1;
-               should_loop = (index > 0);
-               dout(" cyclic, start at %lu\n", index);
-       } else {
-               index = wbc->range_start >> PAGE_SHIFT;
-               end = wbc->range_end >> PAGE_SHIFT;
-               if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
-                       range_whole = true;
-               should_loop = false;
-               dout(" not cyclic, %lu to %lu\n", index, end);
-       }
+       index = start_index;
 
 retry:
        /* find oldest snap context with dirty data */
-       ceph_put_snap_context(snapc);
        snapc = get_oldest_context(inode, &ceph_wbc, NULL);
        if (!snapc) {
                /* hmm, why does writepages get called when there
@@ -839,13 +827,33 @@ retry:
        dout(" oldest snapc is %p seq %lld (%d snaps)\n",
             snapc, snapc->seq, snapc->num_snaps);
 
-       if (last_snapc && snapc != last_snapc) {
-               /* if we switched to a newer snapc, restart our scan at the
-                * start of the original file range. */
-               dout("  snapc differs from last pass, restarting at %lu\n",
-                    index);
-               index = start;
+       should_loop = false;
+       if (ceph_wbc.head_snapc && snapc != last_snapc) {
+               /* where to start/end? */
+               if (wbc->range_cyclic) {
+                       index = start_index;
+                       end = -1;
+                       if (index > 0)
+                               should_loop = true;
+                       dout(" cyclic, start at %lu\n", index);
+               } else {
+                       index = wbc->range_start >> PAGE_SHIFT;
+                       end = wbc->range_end >> PAGE_SHIFT;
+                       if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
+                               range_whole = true;
+                       dout(" not cyclic, %lu to %lu\n", index, end);
+               }
+       } else if (!ceph_wbc.head_snapc) {
+               /* Do not respect wbc->range_{start,end}. Dirty pages
+                * in that range can be associated with newer snapc.
+                * They are not writeable until we write all dirty pages
+                * associated with 'snapc' get written */
+               if (index > 0 || wbc->sync_mode != WB_SYNC_NONE)
+                       should_loop = true;
+               dout(" non-head snapc, range whole\n");
        }
+
+       ceph_put_snap_context(last_snapc);
        last_snapc = snapc;
 
        stop = false;
@@ -891,7 +899,9 @@ get_more_pages:
                                dout("end of range %p\n", page);
                                /* can't be range_cyclic (1st pass) because
                                 * end == -1 in that case. */
-                               stop = done = true;
+                               stop = true;
+                               if (ceph_wbc.head_snapc)
+                                       done = true;
                                unlock_page(page);
                                break;
                        }
@@ -1136,24 +1146,26 @@ new_request:
                if (pages)
                        goto new_request;
 
-               if (wbc->nr_to_write <= 0)
-                       stop = done = true;
+               /*
+                * We stop writing back only if we are not doing
+                * integrity sync. In case of integrity sync we have to
+                * keep going until we have written all the pages
+                * we tagged for writeback prior to entering this loop.
+                */
+               if (wbc->nr_to_write <= 0 && wbc->sync_mode == WB_SYNC_NONE)
+                       done = stop = true;
 
 release_pvec_pages:
                dout("pagevec_release on %d pages (%p)\n", (int)pvec.nr,
                     pvec.nr ? pvec.pages[0] : NULL);
                pagevec_release(&pvec);
-
-               if (locked_pages && !done)
-                       goto retry;
        }
 
        if (should_loop && !done) {
                /* more to do; loop back to beginning of file */
                dout("writepages looping back to beginning of file\n");
-               should_loop = false;
-               end = start_index - 1;
-
+               end = start_index - 1; /* OK even when start_index == 0 */
+               start_index = 0;
                index = 0;
                goto retry;
        }
@@ -1163,8 +1175,8 @@ release_pvec_pages:
 
 out:
        ceph_osdc_put_request(req);
-       ceph_put_snap_context(snapc);
-       dout("writepages done, rc = %d\n", rc);
+       ceph_put_snap_context(last_snapc);
+       dout("writepages dend - startone, rc = %d\n", rc);
        return rc;
 }