ceph: support copy_file_range file operation
[sfrench/cifs-2.6.git] / fs / ceph / file.c
index 92ab2043368291e68c5c4b67b2da7e6bfdd25fcf..5557ec6760ea5dddfb8a62083d873c06fce21551 100644 (file)
@@ -1,5 +1,6 @@
 // SPDX-License-Identifier: GPL-2.0
 #include <linux/ceph/ceph_debug.h>
+#include <linux/ceph/striper.h>
 
 #include <linux/module.h>
 #include <linux/sched.h>
@@ -556,91 +557,27 @@ enum {
        READ_INLINE =  3,
 };
 
-/*
- * Read a range of bytes striped over one or more objects.  Iterate over
- * objects we stripe over.  (That's not atomic, but good enough for now.)
- *
- * If we get a short result from the OSD, check against i_size; we need to
- * only return a short read to the caller if we hit EOF.
- */
-static int striped_read(struct inode *inode,
-                       u64 pos, u64 len,
-                       struct page **pages, int num_pages,
-                       int page_align, int *checkeof)
-{
-       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
-       struct ceph_inode_info *ci = ceph_inode(inode);
-       u64 this_len;
-       loff_t i_size;
-       int page_idx;
-       int ret, read = 0;
-       bool hit_stripe, was_short;
-
-       /*
-        * we may need to do multiple reads.  not atomic, unfortunately.
-        */
-more:
-       this_len = len;
-       page_idx = (page_align + read) >> PAGE_SHIFT;
-       ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
-                                 &ci->i_layout, pos, &this_len,
-                                 ci->i_truncate_seq, ci->i_truncate_size,
-                                 pages + page_idx, num_pages - page_idx,
-                                 ((page_align + read) & ~PAGE_MASK));
-       if (ret == -ENOENT)
-               ret = 0;
-       hit_stripe = this_len < len;
-       was_short = ret >= 0 && ret < this_len;
-       dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, len, read,
-            ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : "");
-
-       i_size = i_size_read(inode);
-       if (ret >= 0) {
-               if (was_short && (pos + ret < i_size)) {
-                       int zlen = min(this_len - ret, i_size - pos - ret);
-                       int zoff = page_align + read + ret;
-                       dout(" zero gap %llu to %llu\n",
-                            pos + ret, pos + ret + zlen);
-                       ceph_zero_page_vector_range(zoff, zlen, pages);
-                       ret += zlen;
-               }
-
-               read += ret;
-               pos += ret;
-               len -= ret;
-
-               /* hit stripe and need continue*/
-               if (len && hit_stripe && pos < i_size)
-                       goto more;
-       }
-
-       if (read > 0) {
-               ret = read;
-               /* did we bounce off eof? */
-               if (pos + len > i_size)
-                       *checkeof = CHECK_EOF;
-       }
-
-       dout("striped_read returns %d\n", ret);
-       return ret;
-}
-
 /*
  * Completely synchronous read and write methods.  Direct from __user
  * buffer to osd, or directly to user pages (if O_DIRECT).
  *
- * If the read spans object boundary, just do multiple reads.
+ * If the read spans object boundary, just do multiple reads.  (That's not
+ * atomic, but good enough for now.)
+ *
+ * If we get a short result from the OSD, check against i_size; we need to
+ * only return a short read to the caller if we hit EOF.
  */
 static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
-                             int *checkeof)
+                             int *retry_op)
 {
        struct file *file = iocb->ki_filp;
        struct inode *inode = file_inode(file);
-       struct page **pages;
-       u64 off = iocb->ki_pos;
-       int num_pages;
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+       struct ceph_osd_client *osdc = &fsc->client->osdc;
        ssize_t ret;
-       size_t len = iov_iter_count(to);
+       u64 off = iocb->ki_pos;
+       u64 len = iov_iter_count(to);
 
        dout("sync_read on file %p %llu~%u %s\n", file, off, (unsigned)len,
             (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
@@ -653,61 +590,118 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
         * but it will at least behave sensibly when they are
         * in sequence.
         */
-       ret = filemap_write_and_wait_range(inode->i_mapping, off,
-                                               off + len);
+       ret = filemap_write_and_wait_range(inode->i_mapping, off, off + len);
        if (ret < 0)
                return ret;
 
-       if (unlikely(to->type & ITER_PIPE)) {
+       ret = 0;
+       while ((len = iov_iter_count(to)) > 0) {
+               struct ceph_osd_request *req;
+               struct page **pages;
+               int num_pages;
                size_t page_off;
-               ret = iov_iter_get_pages_alloc(to, &pages, len,
-                                              &page_off);
-               if (ret <= 0)
-                       return -ENOMEM;
-               num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE);
+               u64 i_size;
+               bool more;
+
+               req = ceph_osdc_new_request(osdc, &ci->i_layout,
+                                       ci->i_vino, off, &len, 0, 1,
+                                       CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
+                                       NULL, ci->i_truncate_seq,
+                                       ci->i_truncate_size, false);
+               if (IS_ERR(req)) {
+                       ret = PTR_ERR(req);
+                       break;
+               }
+
+               more = len < iov_iter_count(to);
 
-               ret = striped_read(inode, off, ret, pages, num_pages,
-                                  page_off, checkeof);
-               if (ret > 0) {
-                       iov_iter_advance(to, ret);
-                       off += ret;
+               if (unlikely(to->type & ITER_PIPE)) {
+                       ret = iov_iter_get_pages_alloc(to, &pages, len,
+                                                      &page_off);
+                       if (ret <= 0) {
+                               ceph_osdc_put_request(req);
+                               ret = -ENOMEM;
+                               break;
+                       }
+                       num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE);
+                       if (ret < len) {
+                               len = ret;
+                               osd_req_op_extent_update(req, 0, len);
+                               more = false;
+                       }
                } else {
-                       iov_iter_advance(to, 0);
+                       num_pages = calc_pages_for(off, len);
+                       page_off = off & ~PAGE_MASK;
+                       pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
+                       if (IS_ERR(pages)) {
+                               ceph_osdc_put_request(req);
+                               ret = PTR_ERR(pages);
+                               break;
+                       }
                }
-               ceph_put_page_vector(pages, num_pages, false);
-       } else {
-               num_pages = calc_pages_for(off, len);
-               pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
-               if (IS_ERR(pages))
-                       return PTR_ERR(pages);
-
-               ret = striped_read(inode, off, len, pages, num_pages,
-                                  (off & ~PAGE_MASK), checkeof);
-               if (ret > 0) {
-                       int l, k = 0;
-                       size_t left = ret;
-
-                       while (left) {
-                               size_t page_off = off & ~PAGE_MASK;
-                               size_t copy = min_t(size_t, left,
-                                                   PAGE_SIZE - page_off);
-                               l = copy_page_to_iter(pages[k++], page_off,
-                                                     copy, to);
-                               off += l;
-                               left -= l;
-                               if (l < copy)
+
+               osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_off,
+                                                false, false);
+               ret = ceph_osdc_start_request(osdc, req, false);
+               if (!ret)
+                       ret = ceph_osdc_wait_request(osdc, req);
+               ceph_osdc_put_request(req);
+
+               i_size = i_size_read(inode);
+               dout("sync_read %llu~%llu got %zd i_size %llu%s\n",
+                    off, len, ret, i_size, (more ? " MORE" : ""));
+
+               if (ret == -ENOENT)
+                       ret = 0;
+               if (ret >= 0 && ret < len && (off + ret < i_size)) {
+                       int zlen = min(len - ret, i_size - off - ret);
+                       int zoff = page_off + ret;
+                       dout("sync_read zero gap %llu~%llu\n",
+                             off + ret, off + ret + zlen);
+                       ceph_zero_page_vector_range(zoff, zlen, pages);
+                       ret += zlen;
+               }
+
+               if (unlikely(to->type & ITER_PIPE)) {
+                       if (ret > 0) {
+                               iov_iter_advance(to, ret);
+                               off += ret;
+                       } else {
+                               iov_iter_advance(to, 0);
+                       }
+                       ceph_put_page_vector(pages, num_pages, false);
+               } else {
+                       int idx = 0;
+                       size_t left = ret > 0 ? ret : 0;
+                       while (left > 0) {
+                               size_t len, copied;
+                               page_off = off & ~PAGE_MASK;
+                               len = min_t(size_t, left, PAGE_SIZE - page_off);
+                               copied = copy_page_to_iter(pages[idx++],
+                                                          page_off, len, to);
+                               off += copied;
+                               left -= copied;
+                               if (copied < len) {
+                                       ret = -EFAULT;
                                        break;
+                               }
                        }
+                       ceph_release_page_vector(pages, num_pages);
                }
-               ceph_release_page_vector(pages, num_pages);
+
+               if (ret <= 0 || off >= i_size || !more)
+                       break;
        }
 
        if (off > iocb->ki_pos) {
+               if (ret >= 0 &&
+                   iov_iter_count(to) > 0 && off >= i_size_read(inode))
+                       *retry_op = CHECK_EOF;
                ret = off - iocb->ki_pos;
                iocb->ki_pos = off;
        }
 
-       dout("sync_read result %zd\n", ret);
+       dout("sync_read result %zd retry_op %d\n", ret, *retry_op);
        return ret;
 }
 
@@ -865,7 +859,7 @@ static void ceph_aio_retry_work(struct work_struct *work)
        }
        spin_unlock(&ci->i_ceph_lock);
 
-       req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 2,
+       req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 1,
                        false, GFP_NOFS);
        if (!req) {
                ret = -ENOMEM;
@@ -877,6 +871,11 @@ static void ceph_aio_retry_work(struct work_struct *work)
        ceph_oloc_copy(&req->r_base_oloc, &orig_req->r_base_oloc);
        ceph_oid_copy(&req->r_base_oid, &orig_req->r_base_oid);
 
+       req->r_ops[0] = orig_req->r_ops[0];
+
+       req->r_mtime = aio_req->mtime;
+       req->r_data_offset = req->r_ops[0].extent.offset;
+
        ret = ceph_osdc_alloc_messages(req, GFP_NOFS);
        if (ret) {
                ceph_osdc_put_request(req);
@@ -884,11 +883,6 @@ static void ceph_aio_retry_work(struct work_struct *work)
                goto out;
        }
 
-       req->r_ops[0] = orig_req->r_ops[0];
-
-       req->r_mtime = aio_req->mtime;
-       req->r_data_offset = req->r_ops[0].extent.offset;
-
        ceph_osdc_put_request(orig_req);
 
        req->r_callback = ceph_aio_complete_req;
@@ -1735,7 +1729,6 @@ static long ceph_fallocate(struct file *file, int mode,
        struct ceph_file_info *fi = file->private_data;
        struct inode *inode = file_inode(file);
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_cap_flush *prealloc_cf;
        int want, got = 0;
        int dirty;
@@ -1743,10 +1736,7 @@ static long ceph_fallocate(struct file *file, int mode,
        loff_t endoff = 0;
        loff_t size;
 
-       if ((offset + length) > max(i_size_read(inode), fsc->max_file_size))
-               return -EFBIG;
-
-       if (mode & ~(FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE))
+       if (mode != (FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE))
                return -EOPNOTSUPP;
 
        if (!S_ISREG(inode->i_mode))
@@ -1763,18 +1753,6 @@ static long ceph_fallocate(struct file *file, int mode,
                goto unlock;
        }
 
-       if (!(mode & (FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE)) &&
-           ceph_quota_is_max_bytes_exceeded(inode, offset + length)) {
-               ret = -EDQUOT;
-               goto unlock;
-       }
-
-       if (ceph_osdmap_flag(&fsc->client->osdc, CEPH_OSDMAP_FULL) &&
-           !(mode & FALLOC_FL_PUNCH_HOLE)) {
-               ret = -ENOSPC;
-               goto unlock;
-       }
-
        if (ci->i_inline_version != CEPH_INLINE_NONE) {
                ret = ceph_uninline_data(file, NULL);
                if (ret < 0)
@@ -1782,12 +1760,12 @@ static long ceph_fallocate(struct file *file, int mode,
        }
 
        size = i_size_read(inode);
-       if (!(mode & FALLOC_FL_KEEP_SIZE)) {
-               endoff = offset + length;
-               ret = inode_newsize_ok(inode, endoff);
-               if (ret)
-                       goto unlock;
-       }
+
+       /* Are we punching a hole beyond EOF? */
+       if (offset >= size)
+               goto unlock;
+       if ((offset + length) > size)
+               length = size - offset;
 
        if (fi->fmode & CEPH_FILE_MODE_LAZY)
                want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
@@ -1798,16 +1776,8 @@ static long ceph_fallocate(struct file *file, int mode,
        if (ret < 0)
                goto unlock;
 
-       if (mode & FALLOC_FL_PUNCH_HOLE) {
-               if (offset < size)
-                       ceph_zero_pagecache_range(inode, offset, length);
-               ret = ceph_zero_objects(inode, offset, length);
-       } else if (endoff > size) {
-               truncate_pagecache_range(inode, size, -1);
-               if (ceph_inode_set_size(inode, endoff))
-                       ceph_check_caps(ceph_inode(inode),
-                               CHECK_CAPS_AUTHONLY, NULL);
-       }
+       ceph_zero_pagecache_range(inode, offset, length);
+       ret = ceph_zero_objects(inode, offset, length);
 
        if (!ret) {
                spin_lock(&ci->i_ceph_lock);
@@ -1817,9 +1787,6 @@ static long ceph_fallocate(struct file *file, int mode,
                spin_unlock(&ci->i_ceph_lock);
                if (dirty)
                        __mark_inode_dirty(inode, dirty);
-               if ((endoff > size) &&
-                   ceph_quota_is_max_bytes_approaching(inode, endoff))
-                       ceph_check_caps(ci, CHECK_CAPS_NODELAY, NULL);
        }
 
        ceph_put_cap_refs(ci, got);
@@ -1829,6 +1796,297 @@ unlock:
        return ret;
 }
 
+/*
+ * This function tries to get FILE_WR capabilities for dst_ci and FILE_RD for
+ * src_ci.  Two attempts are made to obtain both caps, and an error is return if
+ * this fails; zero is returned on success.
+ */
+static int get_rd_wr_caps(struct ceph_inode_info *src_ci,
+                         loff_t src_endoff, int *src_got,
+                         struct ceph_inode_info *dst_ci,
+                         loff_t dst_endoff, int *dst_got)
+{
+       int ret = 0;
+       bool retrying = false;
+
+retry_caps:
+       ret = ceph_get_caps(dst_ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER,
+                           dst_endoff, dst_got, NULL);
+       if (ret < 0)
+               return ret;
+
+       /*
+        * Since we're already holding the FILE_WR capability for the dst file,
+        * we would risk a deadlock by using ceph_get_caps.  Thus, we'll do some
+        * retry dance instead to try to get both capabilities.
+        */
+       ret = ceph_try_get_caps(src_ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED,
+                               false, src_got);
+       if (ret <= 0) {
+               /* Start by dropping dst_ci caps and getting src_ci caps */
+               ceph_put_cap_refs(dst_ci, *dst_got);
+               if (retrying) {
+                       if (!ret)
+                               /* ceph_try_get_caps masks EAGAIN */
+                               ret = -EAGAIN;
+                       return ret;
+               }
+               ret = ceph_get_caps(src_ci, CEPH_CAP_FILE_RD,
+                                   CEPH_CAP_FILE_SHARED, src_endoff,
+                                   src_got, NULL);
+               if (ret < 0)
+                       return ret;
+               /*... drop src_ci caps too, and retry */
+               ceph_put_cap_refs(src_ci, *src_got);
+               retrying = true;
+               goto retry_caps;
+       }
+       return ret;
+}
+
+static void put_rd_wr_caps(struct ceph_inode_info *src_ci, int src_got,
+                          struct ceph_inode_info *dst_ci, int dst_got)
+{
+       ceph_put_cap_refs(src_ci, src_got);
+       ceph_put_cap_refs(dst_ci, dst_got);
+}
+
+/*
+ * This function does several size-related checks, returning an error if:
+ *  - source file is smaller than off+len
+ *  - destination file size is not OK (inode_newsize_ok())
+ *  - max bytes quotas is exceeded
+ */
+static int is_file_size_ok(struct inode *src_inode, struct inode *dst_inode,
+                          loff_t src_off, loff_t dst_off, size_t len)
+{
+       loff_t size, endoff;
+
+       size = i_size_read(src_inode);
+       /*
+        * Don't copy beyond source file EOF.  Instead of simply setting length
+        * to (size - src_off), just drop to VFS default implementation, as the
+        * local i_size may be stale due to other clients writing to the source
+        * inode.
+        */
+       if (src_off + len > size) {
+               dout("Copy beyond EOF (%llu + %zu > %llu)\n",
+                    src_off, len, size);
+               return -EOPNOTSUPP;
+       }
+       size = i_size_read(dst_inode);
+
+       endoff = dst_off + len;
+       if (inode_newsize_ok(dst_inode, endoff))
+               return -EOPNOTSUPP;
+
+       if (ceph_quota_is_max_bytes_exceeded(dst_inode, endoff))
+               return -EDQUOT;
+
+       return 0;
+}
+
+static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off,
+                                   struct file *dst_file, loff_t dst_off,
+                                   size_t len, unsigned int flags)
+{
+       struct inode *src_inode = file_inode(src_file);
+       struct inode *dst_inode = file_inode(dst_file);
+       struct ceph_inode_info *src_ci = ceph_inode(src_inode);
+       struct ceph_inode_info *dst_ci = ceph_inode(dst_inode);
+       struct ceph_cap_flush *prealloc_cf;
+       struct ceph_object_locator src_oloc, dst_oloc;
+       struct ceph_object_id src_oid, dst_oid;
+       loff_t endoff = 0, size;
+       ssize_t ret = -EIO;
+       u64 src_objnum, dst_objnum, src_objoff, dst_objoff;
+       u32 src_objlen, dst_objlen, object_size;
+       int src_got = 0, dst_got = 0, err, dirty;
+       bool do_final_copy = false;
+
+       if (src_inode == dst_inode)
+               return -EINVAL;
+       if (ceph_snap(dst_inode) != CEPH_NOSNAP)
+               return -EROFS;
+
+       /*
+        * Some of the checks below will return -EOPNOTSUPP, which will force a
+        * fallback to the default VFS copy_file_range implementation.  This is
+        * desirable in several cases (for ex, the 'len' is smaller than the
+        * size of the objects, or in cases where that would be more
+        * efficient).
+        */
+
+       if ((src_ci->i_layout.stripe_unit != dst_ci->i_layout.stripe_unit) ||
+           (src_ci->i_layout.stripe_count != dst_ci->i_layout.stripe_count) ||
+           (src_ci->i_layout.object_size != dst_ci->i_layout.object_size))
+               return -EOPNOTSUPP;
+
+       if (len < src_ci->i_layout.object_size)
+               return -EOPNOTSUPP; /* no remote copy will be done */
+
+       prealloc_cf = ceph_alloc_cap_flush();
+       if (!prealloc_cf)
+               return -ENOMEM;
+
+       /* Start by sync'ing the source file */
+       ret = file_write_and_wait_range(src_file, src_off, (src_off + len));
+       if (ret < 0)
+               goto out;
+
+       /*
+        * We need FILE_WR caps for dst_ci and FILE_RD for src_ci as other
+        * clients may have dirty data in their caches.  And OSDs know nothing
+        * about caps, so they can't safely do the remote object copies.
+        */
+       err = get_rd_wr_caps(src_ci, (src_off + len), &src_got,
+                            dst_ci, (dst_off + len), &dst_got);
+       if (err < 0) {
+               dout("get_rd_wr_caps returned %d\n", err);
+               ret = -EOPNOTSUPP;
+               goto out;
+       }
+
+       ret = is_file_size_ok(src_inode, dst_inode, src_off, dst_off, len);
+       if (ret < 0)
+               goto out_caps;
+
+       size = i_size_read(dst_inode);
+       endoff = dst_off + len;
+
+       /* Drop dst file cached pages */
+       ret = invalidate_inode_pages2_range(dst_inode->i_mapping,
+                                           dst_off >> PAGE_SHIFT,
+                                           endoff >> PAGE_SHIFT);
+       if (ret < 0) {
+               dout("Failed to invalidate inode pages (%zd)\n", ret);
+               ret = 0; /* XXX */
+       }
+       src_oloc.pool = src_ci->i_layout.pool_id;
+       src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns);
+       dst_oloc.pool = dst_ci->i_layout.pool_id;
+       dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns);
+
+       ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
+                                     src_ci->i_layout.object_size,
+                                     &src_objnum, &src_objoff, &src_objlen);
+       ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
+                                     dst_ci->i_layout.object_size,
+                                     &dst_objnum, &dst_objoff, &dst_objlen);
+       /* object-level offsets need to the same */
+       if (src_objoff != dst_objoff) {
+               ret = -EOPNOTSUPP;
+               goto out_caps;
+       }
+
+       /*
+        * Do a manual copy if the object offset isn't object aligned.
+        * 'src_objlen' contains the bytes left until the end of the object,
+        * starting at the src_off
+        */
+       if (src_objoff) {
+               /*
+                * we need to temporarily drop all caps as we'll be calling
+                * {read,write}_iter, which will get caps again.
+                */
+               put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
+               ret = do_splice_direct(src_file, &src_off, dst_file,
+                                      &dst_off, src_objlen, flags);
+               if (ret < 0) {
+                       dout("do_splice_direct returned %d\n", err);
+                       goto out;
+               }
+               len -= ret;
+               err = get_rd_wr_caps(src_ci, (src_off + len),
+                                    &src_got, dst_ci,
+                                    (dst_off + len), &dst_got);
+               if (err < 0)
+                       goto out;
+               err = is_file_size_ok(src_inode, dst_inode,
+                                     src_off, dst_off, len);
+               if (err < 0)
+                       goto out_caps;
+       }
+       object_size = src_ci->i_layout.object_size;
+       while (len >= object_size) {
+               ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
+                                             object_size, &src_objnum,
+                                             &src_objoff, &src_objlen);
+               ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
+                                             object_size, &dst_objnum,
+                                             &dst_objoff, &dst_objlen);
+               ceph_oid_init(&src_oid);
+               ceph_oid_printf(&src_oid, "%llx.%08llx",
+                               src_ci->i_vino.ino, src_objnum);
+               ceph_oid_init(&dst_oid);
+               ceph_oid_printf(&dst_oid, "%llx.%08llx",
+                               dst_ci->i_vino.ino, dst_objnum);
+               /* Do an object remote copy */
+               err = ceph_osdc_copy_from(
+                       &ceph_inode_to_client(src_inode)->client->osdc,
+                       src_ci->i_vino.snap, 0,
+                       &src_oid, &src_oloc,
+                       CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
+                       CEPH_OSD_OP_FLAG_FADVISE_NOCACHE,
+                       &dst_oid, &dst_oloc,
+                       CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
+                       CEPH_OSD_OP_FLAG_FADVISE_DONTNEED, 0);
+               if (err) {
+                       dout("ceph_osdc_copy_from returned %d\n", err);
+                       if (!ret)
+                               ret = err;
+                       goto out_caps;
+               }
+               len -= object_size;
+               src_off += object_size;
+               dst_off += object_size;
+               ret += object_size;
+       }
+
+       if (len)
+               /* We still need one final local copy */
+               do_final_copy = true;
+
+       file_update_time(dst_file);
+       if (endoff > size) {
+               int caps_flags = 0;
+
+               /* Let the MDS know about dst file size change */
+               if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff))
+                       caps_flags |= CHECK_CAPS_NODELAY;
+               if (ceph_inode_set_size(dst_inode, endoff))
+                       caps_flags |= CHECK_CAPS_AUTHONLY;
+               if (caps_flags)
+                       ceph_check_caps(dst_ci, caps_flags, NULL);
+       }
+       /* Mark Fw dirty */
+       spin_lock(&dst_ci->i_ceph_lock);
+       dst_ci->i_inline_version = CEPH_INLINE_NONE;
+       dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf);
+       spin_unlock(&dst_ci->i_ceph_lock);
+       if (dirty)
+               __mark_inode_dirty(dst_inode, dirty);
+
+out_caps:
+       put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
+
+       if (do_final_copy) {
+               err = do_splice_direct(src_file, &src_off, dst_file,
+                                      &dst_off, len, flags);
+               if (err < 0) {
+                       dout("do_splice_direct returned %d\n", err);
+                       goto out;
+               }
+               len -= err;
+               ret += err;
+       }
+
+out:
+       ceph_free_cap_flush(prealloc_cf);
+
+       return ret;
+}
+
 const struct file_operations ceph_file_fops = {
        .open = ceph_open,
        .release = ceph_release,
@@ -1844,5 +2102,5 @@ const struct file_operations ceph_file_fops = {
        .unlocked_ioctl = ceph_ioctl,
        .compat_ioctl   = ceph_ioctl,
        .fallocate      = ceph_fallocate,
+       .copy_file_range = ceph_copy_file_range,
 };
-