ceph: add destination file data sync before doing any remote copy
[sfrench/cifs-2.6.git] / fs / ceph / file.c
index 0fa6b6b6ccbcf5bbc56340d9505aa0b45806aa30..189df668b6a0cf0dc2d000184d170ce58d248076 100644 (file)
@@ -1,5 +1,6 @@
 // SPDX-License-Identifier: GPL-2.0
 #include <linux/ceph/ceph_debug.h>
+#include <linux/ceph/striper.h>
 
 #include <linux/module.h>
 #include <linux/sched.h>
@@ -614,7 +615,7 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
 
                more = len < iov_iter_count(to);
 
-               if (unlikely(to->type & ITER_PIPE)) {
+               if (unlikely(iov_iter_is_pipe(to))) {
                        ret = iov_iter_get_pages_alloc(to, &pages, len,
                                                       &page_off);
                        if (ret <= 0) {
@@ -661,7 +662,7 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
                        ret += zlen;
                }
 
-               if (unlikely(to->type & ITER_PIPE)) {
+               if (unlikely(iov_iter_is_pipe(to))) {
                        if (ret > 0) {
                                iov_iter_advance(to, ret);
                                off += ret;
@@ -814,7 +815,7 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
                                aio_req->total_len = rc + zlen;
                        }
 
-                       iov_iter_bvec(&i, ITER_BVEC, osd_data->bvec_pos.bvecs,
+                       iov_iter_bvec(&i, READ, osd_data->bvec_pos.bvecs,
                                      osd_data->num_bvecs,
                                      osd_data->bvec_pos.iter.bi_size);
                        iov_iter_advance(&i, rc);
@@ -1037,8 +1038,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
                                int zlen = min_t(size_t, len - ret,
                                                 size - pos - ret);
 
-                               iov_iter_bvec(&i, ITER_BVEC, bvecs, num_pages,
-                                             len);
+                               iov_iter_bvec(&i, READ, bvecs, num_pages, len);
                                iov_iter_advance(&i, ret);
                                iov_iter_zero(zlen, &i);
                                ret += zlen;
@@ -1795,6 +1795,307 @@ unlock:
        return ret;
 }
 
+/*
+ * This function tries to get FILE_WR capabilities for dst_ci and FILE_RD for
+ * src_ci.  Two attempts are made to obtain both caps, and an error is return if
+ * this fails; zero is returned on success.
+ */
+static int get_rd_wr_caps(struct ceph_inode_info *src_ci,
+                         loff_t src_endoff, int *src_got,
+                         struct ceph_inode_info *dst_ci,
+                         loff_t dst_endoff, int *dst_got)
+{
+       int ret = 0;
+       bool retrying = false;
+
+retry_caps:
+       ret = ceph_get_caps(dst_ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER,
+                           dst_endoff, dst_got, NULL);
+       if (ret < 0)
+               return ret;
+
+       /*
+        * Since we're already holding the FILE_WR capability for the dst file,
+        * we would risk a deadlock by using ceph_get_caps.  Thus, we'll do some
+        * retry dance instead to try to get both capabilities.
+        */
+       ret = ceph_try_get_caps(src_ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED,
+                               false, src_got);
+       if (ret <= 0) {
+               /* Start by dropping dst_ci caps and getting src_ci caps */
+               ceph_put_cap_refs(dst_ci, *dst_got);
+               if (retrying) {
+                       if (!ret)
+                               /* ceph_try_get_caps masks EAGAIN */
+                               ret = -EAGAIN;
+                       return ret;
+               }
+               ret = ceph_get_caps(src_ci, CEPH_CAP_FILE_RD,
+                                   CEPH_CAP_FILE_SHARED, src_endoff,
+                                   src_got, NULL);
+               if (ret < 0)
+                       return ret;
+               /*... drop src_ci caps too, and retry */
+               ceph_put_cap_refs(src_ci, *src_got);
+               retrying = true;
+               goto retry_caps;
+       }
+       return ret;
+}
+
+static void put_rd_wr_caps(struct ceph_inode_info *src_ci, int src_got,
+                          struct ceph_inode_info *dst_ci, int dst_got)
+{
+       ceph_put_cap_refs(src_ci, src_got);
+       ceph_put_cap_refs(dst_ci, dst_got);
+}
+
+/*
+ * This function does several size-related checks, returning an error if:
+ *  - source file is smaller than off+len
+ *  - destination file size is not OK (inode_newsize_ok())
+ *  - max bytes quotas is exceeded
+ */
+static int is_file_size_ok(struct inode *src_inode, struct inode *dst_inode,
+                          loff_t src_off, loff_t dst_off, size_t len)
+{
+       loff_t size, endoff;
+
+       size = i_size_read(src_inode);
+       /*
+        * Don't copy beyond source file EOF.  Instead of simply setting length
+        * to (size - src_off), just drop to VFS default implementation, as the
+        * local i_size may be stale due to other clients writing to the source
+        * inode.
+        */
+       if (src_off + len > size) {
+               dout("Copy beyond EOF (%llu + %zu > %llu)\n",
+                    src_off, len, size);
+               return -EOPNOTSUPP;
+       }
+       size = i_size_read(dst_inode);
+
+       endoff = dst_off + len;
+       if (inode_newsize_ok(dst_inode, endoff))
+               return -EOPNOTSUPP;
+
+       if (ceph_quota_is_max_bytes_exceeded(dst_inode, endoff))
+               return -EDQUOT;
+
+       return 0;
+}
+
+static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off,
+                                   struct file *dst_file, loff_t dst_off,
+                                   size_t len, unsigned int flags)
+{
+       struct inode *src_inode = file_inode(src_file);
+       struct inode *dst_inode = file_inode(dst_file);
+       struct ceph_inode_info *src_ci = ceph_inode(src_inode);
+       struct ceph_inode_info *dst_ci = ceph_inode(dst_inode);
+       struct ceph_cap_flush *prealloc_cf;
+       struct ceph_object_locator src_oloc, dst_oloc;
+       struct ceph_object_id src_oid, dst_oid;
+       loff_t endoff = 0, size;
+       ssize_t ret = -EIO;
+       u64 src_objnum, dst_objnum, src_objoff, dst_objoff;
+       u32 src_objlen, dst_objlen, object_size;
+       int src_got = 0, dst_got = 0, err, dirty;
+       bool do_final_copy = false;
+
+       if (src_inode == dst_inode)
+               return -EINVAL;
+       if (ceph_snap(dst_inode) != CEPH_NOSNAP)
+               return -EROFS;
+
+       /*
+        * Some of the checks below will return -EOPNOTSUPP, which will force a
+        * fallback to the default VFS copy_file_range implementation.  This is
+        * desirable in several cases (for ex, the 'len' is smaller than the
+        * size of the objects, or in cases where that would be more
+        * efficient).
+        */
+
+       if (ceph_test_mount_opt(ceph_inode_to_client(src_inode), NOCOPYFROM))
+               return -EOPNOTSUPP;
+
+       if ((src_ci->i_layout.stripe_unit != dst_ci->i_layout.stripe_unit) ||
+           (src_ci->i_layout.stripe_count != dst_ci->i_layout.stripe_count) ||
+           (src_ci->i_layout.object_size != dst_ci->i_layout.object_size))
+               return -EOPNOTSUPP;
+
+       if (len < src_ci->i_layout.object_size)
+               return -EOPNOTSUPP; /* no remote copy will be done */
+
+       prealloc_cf = ceph_alloc_cap_flush();
+       if (!prealloc_cf)
+               return -ENOMEM;
+
+       /* Start by sync'ing the source and destination files */
+       ret = file_write_and_wait_range(src_file, src_off, (src_off + len));
+       if (ret < 0) {
+               dout("failed to write src file (%zd)\n", ret);
+               goto out;
+       }
+       ret = file_write_and_wait_range(dst_file, dst_off, (dst_off + len));
+       if (ret < 0) {
+               dout("failed to write dst file (%zd)\n", ret);
+               goto out;
+       }
+
+       /*
+        * We need FILE_WR caps for dst_ci and FILE_RD for src_ci as other
+        * clients may have dirty data in their caches.  And OSDs know nothing
+        * about caps, so they can't safely do the remote object copies.
+        */
+       err = get_rd_wr_caps(src_ci, (src_off + len), &src_got,
+                            dst_ci, (dst_off + len), &dst_got);
+       if (err < 0) {
+               dout("get_rd_wr_caps returned %d\n", err);
+               ret = -EOPNOTSUPP;
+               goto out;
+       }
+
+       ret = is_file_size_ok(src_inode, dst_inode, src_off, dst_off, len);
+       if (ret < 0)
+               goto out_caps;
+
+       size = i_size_read(dst_inode);
+       endoff = dst_off + len;
+
+       /* Drop dst file cached pages */
+       ret = invalidate_inode_pages2_range(dst_inode->i_mapping,
+                                           dst_off >> PAGE_SHIFT,
+                                           endoff >> PAGE_SHIFT);
+       if (ret < 0) {
+               dout("Failed to invalidate inode pages (%zd)\n", ret);
+               ret = 0; /* XXX */
+       }
+       src_oloc.pool = src_ci->i_layout.pool_id;
+       src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns);
+       dst_oloc.pool = dst_ci->i_layout.pool_id;
+       dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns);
+
+       ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
+                                     src_ci->i_layout.object_size,
+                                     &src_objnum, &src_objoff, &src_objlen);
+       ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
+                                     dst_ci->i_layout.object_size,
+                                     &dst_objnum, &dst_objoff, &dst_objlen);
+       /* object-level offsets need to the same */
+       if (src_objoff != dst_objoff) {
+               ret = -EOPNOTSUPP;
+               goto out_caps;
+       }
+
+       /*
+        * Do a manual copy if the object offset isn't object aligned.
+        * 'src_objlen' contains the bytes left until the end of the object,
+        * starting at the src_off
+        */
+       if (src_objoff) {
+               /*
+                * we need to temporarily drop all caps as we'll be calling
+                * {read,write}_iter, which will get caps again.
+                */
+               put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
+               ret = do_splice_direct(src_file, &src_off, dst_file,
+                                      &dst_off, src_objlen, flags);
+               if (ret < 0) {
+                       dout("do_splice_direct returned %d\n", err);
+                       goto out;
+               }
+               len -= ret;
+               err = get_rd_wr_caps(src_ci, (src_off + len),
+                                    &src_got, dst_ci,
+                                    (dst_off + len), &dst_got);
+               if (err < 0)
+                       goto out;
+               err = is_file_size_ok(src_inode, dst_inode,
+                                     src_off, dst_off, len);
+               if (err < 0)
+                       goto out_caps;
+       }
+       object_size = src_ci->i_layout.object_size;
+       while (len >= object_size) {
+               ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
+                                             object_size, &src_objnum,
+                                             &src_objoff, &src_objlen);
+               ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
+                                             object_size, &dst_objnum,
+                                             &dst_objoff, &dst_objlen);
+               ceph_oid_init(&src_oid);
+               ceph_oid_printf(&src_oid, "%llx.%08llx",
+                               src_ci->i_vino.ino, src_objnum);
+               ceph_oid_init(&dst_oid);
+               ceph_oid_printf(&dst_oid, "%llx.%08llx",
+                               dst_ci->i_vino.ino, dst_objnum);
+               /* Do an object remote copy */
+               err = ceph_osdc_copy_from(
+                       &ceph_inode_to_client(src_inode)->client->osdc,
+                       src_ci->i_vino.snap, 0,
+                       &src_oid, &src_oloc,
+                       CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
+                       CEPH_OSD_OP_FLAG_FADVISE_NOCACHE,
+                       &dst_oid, &dst_oloc,
+                       CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
+                       CEPH_OSD_OP_FLAG_FADVISE_DONTNEED, 0);
+               if (err) {
+                       dout("ceph_osdc_copy_from returned %d\n", err);
+                       if (!ret)
+                               ret = err;
+                       goto out_caps;
+               }
+               len -= object_size;
+               src_off += object_size;
+               dst_off += object_size;
+               ret += object_size;
+       }
+
+       if (len)
+               /* We still need one final local copy */
+               do_final_copy = true;
+
+       file_update_time(dst_file);
+       if (endoff > size) {
+               int caps_flags = 0;
+
+               /* Let the MDS know about dst file size change */
+               if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff))
+                       caps_flags |= CHECK_CAPS_NODELAY;
+               if (ceph_inode_set_size(dst_inode, endoff))
+                       caps_flags |= CHECK_CAPS_AUTHONLY;
+               if (caps_flags)
+                       ceph_check_caps(dst_ci, caps_flags, NULL);
+       }
+       /* Mark Fw dirty */
+       spin_lock(&dst_ci->i_ceph_lock);
+       dst_ci->i_inline_version = CEPH_INLINE_NONE;
+       dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf);
+       spin_unlock(&dst_ci->i_ceph_lock);
+       if (dirty)
+               __mark_inode_dirty(dst_inode, dirty);
+
+out_caps:
+       put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
+
+       if (do_final_copy) {
+               err = do_splice_direct(src_file, &src_off, dst_file,
+                                      &dst_off, len, flags);
+               if (err < 0) {
+                       dout("do_splice_direct returned %d\n", err);
+                       goto out;
+               }
+               len -= err;
+               ret += err;
+       }
+
+out:
+       ceph_free_cap_flush(prealloc_cf);
+
+       return ret;
+}
+
 const struct file_operations ceph_file_fops = {
        .open = ceph_open,
        .release = ceph_release,
@@ -1810,5 +2111,5 @@ const struct file_operations ceph_file_fops = {
        .unlocked_ioctl = ceph_ioctl,
        .compat_ioctl   = ceph_ioctl,
        .fallocate      = ceph_fallocate,
+       .copy_file_range = ceph_copy_file_range,
 };
-