Merge tag 'ceph-for-4.17-rc1' of git://github.com/ceph/ceph-client
authorLinus Torvalds <torvalds@linux-foundation.org>
Tue, 10 Apr 2018 19:25:30 +0000 (12:25 -0700)
committerLinus Torvalds <torvalds@linux-foundation.org>
Tue, 10 Apr 2018 19:25:30 +0000 (12:25 -0700)
Pull ceph updates from Ilya Dryomov:
 "The big ticket items are:

   - support for rbd "fancy" striping (myself).

     The striping feature bit is now fully implemented, allowing mapping
     v2 images with non-default striping patterns. This completes
     support for --image-format 2.

   - CephFS quota support (Luis Henriques and Zheng Yan).

     This set is based on the new SnapRealm code in the upcoming v13.y.z
     ("Mimic") release. Quota handling will be rejected on older
     filesystems.

   - memory usage improvements in CephFS (Chengguang Xu).

     Directory specific bits have been split out of ceph_file_info and
     some effort went into improving cap reservation code to avoid OOM
     crashes.

  Also included a bunch of assorted fixes all over the place from
  Chengguang and others"

* tag 'ceph-for-4.17-rc1' of git://github.com/ceph/ceph-client: (67 commits)
  ceph: quota: report root dir quota usage in statfs
  ceph: quota: add counter for snaprealms with quota
  ceph: quota: cache inode pointer in ceph_snap_realm
  ceph: fix root quota realm check
  ceph: don't check quota for snap inode
  ceph: quota: update MDS when max_bytes is approaching
  ceph: quota: support for ceph.quota.max_bytes
  ceph: quota: don't allow cross-quota renames
  ceph: quota: support for ceph.quota.max_files
  ceph: quota: add initial infrastructure to support cephfs quotas
  rbd: remove VLA usage
  rbd: fix spelling mistake: "reregisteration" -> "reregistration"
  ceph: rename function drop_leases() to a more descriptive name
  ceph: fix invalid point dereference for error case in mdsc destroy
  ceph: return proper bool type to caller instead of pointer
  ceph: optimize memory usage
  ceph: optimize mds session register
  libceph, ceph: add __init attribution to init funcitons
  ceph: filter out used flags when printing unused open flags
  ceph: don't wait on writeback when there is no more dirty pages
  ...

35 files changed:
Documentation/filesystems/ceph.txt
drivers/block/rbd.c
fs/ceph/Makefile
fs/ceph/addr.c
fs/ceph/cache.c
fs/ceph/caps.c
fs/ceph/debugfs.c
fs/ceph/dir.c
fs/ceph/file.c
fs/ceph/inode.c
fs/ceph/ioctl.c
fs/ceph/locks.c
fs/ceph/mds_client.c
fs/ceph/mds_client.h
fs/ceph/quota.c [new file with mode: 0644]
fs/ceph/snap.c
fs/ceph/super.c
fs/ceph/super.h
fs/ceph/xattr.c
include/linux/ceph/ceph_features.h
include/linux/ceph/ceph_fs.h
include/linux/ceph/libceph.h
include/linux/ceph/messenger.h
include/linux/ceph/osd_client.h
include/linux/ceph/osdmap.h
include/linux/ceph/striper.h [new file with mode: 0644]
net/ceph/Makefile
net/ceph/ceph_common.c
net/ceph/crypto.c
net/ceph/debugfs.c
net/ceph/messenger.c
net/ceph/mon_client.c
net/ceph/osd_client.c
net/ceph/osdmap.c
net/ceph/striper.c [new file with mode: 0644]

index 0b302a11718a43fd7ed44725390a5a4ceacb2229..d7f011ddc1500cdf8e705430c90ed60deb7a2ecc 100644 (file)
@@ -62,6 +62,18 @@ subdirectories, and a summation of all nested file sizes.  This makes
 the identification of large disk space consumers relatively quick, as
 no 'du' or similar recursive scan of the file system is required.
 
+Finally, Ceph also allows quotas to be set on any directory in the system.
+The quota can restrict the number of bytes or the number of files stored
+beneath that point in the directory hierarchy.  Quotas can be set using
+extended attributes 'ceph.quota.max_files' and 'ceph.quota.max_bytes', eg:
+
+ setfattr -n ceph.quota.max_bytes -v 100000000 /some/dir
+ getfattr -n ceph.quota.max_bytes /some/dir
+
+A limitation of the current quotas implementation is that it relies on the
+cooperation of the client mounting the file system to stop writers when a
+limit is reached.  A modified or adversarial client cannot be prevented
+from writing as much data as it needs.
 
 Mount Syntax
 ============
@@ -137,6 +149,10 @@ Mount Options
   noasyncreaddir
        Do not use the dcache as above for readdir.
 
+  noquotadf
+        Report overall filesystem usage in statfs instead of using the root
+        directory quota.
+
 More Information
 ================
 
index 1e03b04819c8652afba4332adaf7d7120f80755d..07dc5419bd63bec43af2de5da0a485ec99bab7cd 100644 (file)
@@ -32,6 +32,7 @@
 #include <linux/ceph/osd_client.h>
 #include <linux/ceph/mon_client.h>
 #include <linux/ceph/cls_lock_client.h>
+#include <linux/ceph/striper.h>
 #include <linux/ceph/decode.h>
 #include <linux/parser.h>
 #include <linux/bsearch.h>
@@ -200,95 +201,81 @@ struct rbd_client {
 };
 
 struct rbd_img_request;
-typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
-
-#define        BAD_WHICH       U32_MAX         /* Good which or bad which, which? */
-
-struct rbd_obj_request;
-typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
 
 enum obj_request_type {
-       OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
+       OBJ_REQUEST_NODATA = 1,
+       OBJ_REQUEST_BIO,        /* pointer into provided bio (list) */
+       OBJ_REQUEST_BVECS,      /* pointer into provided bio_vec array */
+       OBJ_REQUEST_OWN_BVECS,  /* private bio_vec array, doesn't own pages */
 };
 
 enum obj_operation_type {
+       OBJ_OP_READ = 1,
        OBJ_OP_WRITE,
-       OBJ_OP_READ,
        OBJ_OP_DISCARD,
 };
 
-enum obj_req_flags {
-       OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
-       OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
-       OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
-       OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
+/*
+ * Writes go through the following state machine to deal with
+ * layering:
+ *
+ *                       need copyup
+ * RBD_OBJ_WRITE_GUARD ---------------> RBD_OBJ_WRITE_COPYUP
+ *        |     ^                              |
+ *        v     \------------------------------/
+ *      done
+ *        ^
+ *        |
+ * RBD_OBJ_WRITE_FLAT
+ *
+ * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
+ * there is a parent or not.
+ */
+enum rbd_obj_write_state {
+       RBD_OBJ_WRITE_FLAT = 1,
+       RBD_OBJ_WRITE_GUARD,
+       RBD_OBJ_WRITE_COPYUP,
 };
 
 struct rbd_obj_request {
-       u64                     object_no;
-       u64                     offset;         /* object start byte */
-       u64                     length;         /* bytes from offset */
-       unsigned long           flags;
-
-       /*
-        * An object request associated with an image will have its
-        * img_data flag set; a standalone object request will not.
-        *
-        * A standalone object request will have which == BAD_WHICH
-        * and a null obj_request pointer.
-        *
-        * An object request initiated in support of a layered image
-        * object (to check for its existence before a write) will
-        * have which == BAD_WHICH and a non-null obj_request pointer.
-        *
-        * Finally, an object request for rbd image data will have
-        * which != BAD_WHICH, and will have a non-null img_request
-        * pointer.  The value of which will be in the range
-        * 0..(img_request->obj_request_count-1).
-        */
+       struct ceph_object_extent ex;
        union {
-               struct rbd_obj_request  *obj_request;   /* STAT op */
-               struct {
-                       struct rbd_img_request  *img_request;
-                       u64                     img_offset;
-                       /* links for img_request->obj_requests list */
-                       struct list_head        links;
-               };
+               bool                    tried_parent;   /* for reads */
+               enum rbd_obj_write_state write_state;   /* for writes */
        };
-       u32                     which;          /* posn image request list */
 
-       enum obj_request_type   type;
+       struct rbd_img_request  *img_request;
+       struct ceph_file_extent *img_extents;
+       u32                     num_img_extents;
+
        union {
-               struct bio      *bio_list;
+               struct ceph_bio_iter    bio_pos;
                struct {
-                       struct page     **pages;
-                       u32             page_count;
+                       struct ceph_bvec_iter   bvec_pos;
+                       u32                     bvec_count;
+                       u32                     bvec_idx;
                };
        };
-       struct page             **copyup_pages;
-       u32                     copyup_page_count;
+       struct bio_vec          *copyup_bvecs;
+       u32                     copyup_bvec_count;
 
        struct ceph_osd_request *osd_req;
 
        u64                     xferred;        /* bytes transferred */
        int                     result;
 
-       rbd_obj_callback_t      callback;
-
        struct kref             kref;
 };
 
 enum img_req_flags {
-       IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
-       IMG_REQ_DISCARD,        /* discard: normal = 0, discard request = 1 */
 };
 
 struct rbd_img_request {
        struct rbd_device       *rbd_dev;
-       u64                     offset; /* starting image byte offset */
-       u64                     length; /* byte count from offset */
+       enum obj_operation_type op_type;
+       enum obj_request_type   data_type;
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
@@ -298,26 +285,21 @@ struct rbd_img_request {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
-       struct page             **copyup_pages;
-       u32                     copyup_page_count;
-       spinlock_t              completion_lock;/* protects next_completion */
-       u32                     next_completion;
-       rbd_img_callback_t      callback;
+       spinlock_t              completion_lock;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */
 
+       struct list_head        object_extents; /* obj_req.ex structs */
        u32                     obj_request_count;
-       struct list_head        obj_requests;   /* rbd_obj_request structs */
+       u32                     pending_count;
 
        struct kref             kref;
 };
 
 #define for_each_obj_request(ireq, oreq) \
-       list_for_each_entry(oreq, &(ireq)->obj_requests, links)
-#define for_each_obj_request_from(ireq, oreq) \
-       list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
+       list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
 #define for_each_obj_request_safe(ireq, oreq, n) \
-       list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
+       list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)
 
 enum rbd_watch_state {
        RBD_WATCH_STATE_UNREGISTERED,
@@ -433,8 +415,6 @@ static DEFINE_SPINLOCK(rbd_client_list_lock);
 static struct kmem_cache       *rbd_img_request_cache;
 static struct kmem_cache       *rbd_obj_request_cache;
 
-static struct bio_set          *rbd_bio_clone;
-
 static int rbd_major;
 static DEFINE_IDA(rbd_dev_id_ida);
 
@@ -447,8 +427,6 @@ static bool single_major = true;
 module_param(single_major, bool, S_IRUGO);
 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
 
-static int rbd_img_request_submit(struct rbd_img_request *img_request);
-
 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
@@ -458,7 +436,6 @@ static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
 static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
                                       size_t count);
 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
-static void rbd_spec_put(struct rbd_spec *spec);
 
 static int rbd_dev_id_to_minor(int dev_id)
 {
@@ -577,9 +554,6 @@ void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
 #  define rbd_assert(expr)     ((void) 0)
 #endif /* !RBD_DEBUG */
 
-static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request);
-static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
-static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
 
 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
@@ -856,26 +830,6 @@ static char* obj_op_name(enum obj_operation_type op_type)
        }
 }
 
-/*
- * Get a ceph client with specific addr and configuration, if one does
- * not exist create it.  Either way, ceph_opts is consumed by this
- * function.
- */
-static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
-{
-       struct rbd_client *rbdc;
-
-       mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
-       rbdc = rbd_client_find(ceph_opts);
-       if (rbdc)       /* using an existing client */
-               ceph_destroy_options(ceph_opts);
-       else
-               rbdc = rbd_client_create(ceph_opts);
-       mutex_unlock(&client_mutex);
-
-       return rbdc;
-}
-
 /*
  * Destroy ceph client
  *
@@ -904,6 +858,56 @@ static void rbd_put_client(struct rbd_client *rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
 }
 
+static int wait_for_latest_osdmap(struct ceph_client *client)
+{
+       u64 newest_epoch;
+       int ret;
+
+       ret = ceph_monc_get_version(&client->monc, "osdmap", &newest_epoch);
+       if (ret)
+               return ret;
+
+       if (client->osdc.osdmap->epoch >= newest_epoch)
+               return 0;
+
+       ceph_osdc_maybe_request_map(&client->osdc);
+       return ceph_monc_wait_osdmap(&client->monc, newest_epoch,
+                                    client->options->mount_timeout);
+}
+
+/*
+ * Get a ceph client with specific addr and configuration, if one does
+ * not exist create it.  Either way, ceph_opts is consumed by this
+ * function.
+ */
+static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
+{
+       struct rbd_client *rbdc;
+       int ret;
+
+       mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
+       rbdc = rbd_client_find(ceph_opts);
+       if (rbdc) {
+               ceph_destroy_options(ceph_opts);
+
+               /*
+                * Using an existing client.  Make sure ->pg_pools is up to
+                * date before we look up the pool id in do_rbd_add().
+                */
+               ret = wait_for_latest_osdmap(rbdc->client);
+               if (ret) {
+                       rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
+                       rbd_put_client(rbdc);
+                       rbdc = ERR_PTR(ret);
+               }
+       } else {
+               rbdc = rbd_client_create(ceph_opts);
+       }
+       mutex_unlock(&client_mutex);
+
+       return rbdc;
+}
+
 static bool rbd_image_format_valid(u32 image_format)
 {
        return image_format == 1 || image_format == 2;
@@ -1223,272 +1227,59 @@ static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
        rbd_dev->mapping.features = 0;
 }
 
-static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
-{
-       u64 segment_size = rbd_obj_bytes(&rbd_dev->header);
-
-       return offset & (segment_size - 1);
-}
-
-static u64 rbd_segment_length(struct rbd_device *rbd_dev,
-                               u64 offset, u64 length)
-{
-       u64 segment_size = rbd_obj_bytes(&rbd_dev->header);
-
-       offset &= segment_size - 1;
-
-       rbd_assert(length <= U64_MAX - offset);
-       if (offset + length > segment_size)
-               length = segment_size - offset;
-
-       return length;
-}
-
-/*
- * bio helpers
- */
-
-static void bio_chain_put(struct bio *chain)
-{
-       struct bio *tmp;
-
-       while (chain) {
-               tmp = chain;
-               chain = chain->bi_next;
-               bio_put(tmp);
-       }
-}
-
-/*
- * zeros a bio chain, starting at specific offset
- */
-static void zero_bio_chain(struct bio *chain, int start_ofs)
+static void zero_bvec(struct bio_vec *bv)
 {
-       struct bio_vec bv;
-       struct bvec_iter iter;
-       unsigned long flags;
        void *buf;
-       int pos = 0;
-
-       while (chain) {
-               bio_for_each_segment(bv, chain, iter) {
-                       if (pos + bv.bv_len > start_ofs) {
-                               int remainder = max(start_ofs - pos, 0);
-                               buf = bvec_kmap_irq(&bv, &flags);
-                               memset(buf + remainder, 0,
-                                      bv.bv_len - remainder);
-                               flush_dcache_page(bv.bv_page);
-                               bvec_kunmap_irq(buf, &flags);
-                       }
-                       pos += bv.bv_len;
-               }
+       unsigned long flags;
 
-               chain = chain->bi_next;
-       }
+       buf = bvec_kmap_irq(bv, &flags);
+       memset(buf, 0, bv->bv_len);
+       flush_dcache_page(bv->bv_page);
+       bvec_kunmap_irq(buf, &flags);
 }
 
-/*
- * similar to zero_bio_chain(), zeros data defined by a page array,
- * starting at the given byte offset from the start of the array and
- * continuing up to the given end offset.  The pages array is
- * assumed to be big enough to hold all bytes up to the end.
- */
-static void zero_pages(struct page **pages, u64 offset, u64 end)
+static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
 {
-       struct page **page = &pages[offset >> PAGE_SHIFT];
-
-       rbd_assert(end > offset);
-       rbd_assert(end - offset <= (u64)SIZE_MAX);
-       while (offset < end) {
-               size_t page_offset;
-               size_t length;
-               unsigned long flags;
-               void *kaddr;
-
-               page_offset = offset & ~PAGE_MASK;
-               length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
-               local_irq_save(flags);
-               kaddr = kmap_atomic(*page);
-               memset(kaddr + page_offset, 0, length);
-               flush_dcache_page(*page);
-               kunmap_atomic(kaddr);
-               local_irq_restore(flags);
+       struct ceph_bio_iter it = *bio_pos;
 
-               offset += length;
-               page++;
-       }
+       ceph_bio_iter_advance(&it, off);
+       ceph_bio_iter_advance_step(&it, bytes, ({
+               zero_bvec(&bv);
+       }));
 }
 
-/*
- * Clone a portion of a bio, starting at the given byte offset
- * and continuing for the number of bytes indicated.
- */
-static struct bio *bio_clone_range(struct bio *bio_src,
-                                       unsigned int offset,
-                                       unsigned int len,
-                                       gfp_t gfpmask)
+static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
 {
-       struct bio *bio;
-
-       bio = bio_clone_fast(bio_src, gfpmask, rbd_bio_clone);
-       if (!bio)
-               return NULL;    /* ENOMEM */
+       struct ceph_bvec_iter it = *bvec_pos;
 
-       bio_advance(bio, offset);
-       bio->bi_iter.bi_size = len;
-
-       return bio;
+       ceph_bvec_iter_advance(&it, off);
+       ceph_bvec_iter_advance_step(&it, bytes, ({
+               zero_bvec(&bv);
+       }));
 }
 
 /*
- * Clone a portion of a bio chain, starting at the given byte offset
- * into the first bio in the source chain and continuing for the
- * number of bytes indicated.  The result is another bio chain of
- * exactly the given length, or a null pointer on error.
- *
- * The bio_src and offset parameters are both in-out.  On entry they
- * refer to the first source bio and the offset into that bio where
- * the start of data to be cloned is located.
+ * Zero a range in @obj_req data buffer defined by a bio (list) or
+ * (private) bio_vec array.
  *
- * On return, bio_src is updated to refer to the bio in the source
- * chain that contains first un-cloned byte, and *offset will
- * contain the offset of that byte within that bio.
- */
-static struct bio *bio_chain_clone_range(struct bio **bio_src,
-                                       unsigned int *offset,
-                                       unsigned int len,
-                                       gfp_t gfpmask)
-{
-       struct bio *bi = *bio_src;
-       unsigned int off = *offset;
-       struct bio *chain = NULL;
-       struct bio **end;
-
-       /* Build up a chain of clone bios up to the limit */
-
-       if (!bi || off >= bi->bi_iter.bi_size || !len)
-               return NULL;            /* Nothing to clone */
-
-       end = &chain;
-       while (len) {
-               unsigned int bi_size;
-               struct bio *bio;
-
-               if (!bi) {
-                       rbd_warn(NULL, "bio_chain exhausted with %u left", len);
-                       goto out_err;   /* EINVAL; ran out of bio's */
-               }
-               bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
-               bio = bio_clone_range(bi, off, bi_size, gfpmask);
-               if (!bio)
-                       goto out_err;   /* ENOMEM */
-
-               *end = bio;
-               end = &bio->bi_next;
-
-               off += bi_size;
-               if (off == bi->bi_iter.bi_size) {
-                       bi = bi->bi_next;
-                       off = 0;
-               }
-               len -= bi_size;
-       }
-       *bio_src = bi;
-       *offset = off;
-
-       return chain;
-out_err:
-       bio_chain_put(chain);
-
-       return NULL;
-}
-
-/*
- * The default/initial value for all object request flags is 0.  For
- * each flag, once its value is set to 1 it is never reset to 0
- * again.
+ * @off is relative to the start of the data buffer.
  */
-static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
-{
-       if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
-               struct rbd_device *rbd_dev;
-
-               rbd_dev = obj_request->img_request->rbd_dev;
-               rbd_warn(rbd_dev, "obj_request %p already marked img_data",
-                       obj_request);
-       }
-}
-
-static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
+static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
+                              u32 bytes)
 {
-       smp_mb();
-       return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
-}
-
-static void obj_request_done_set(struct rbd_obj_request *obj_request)
-{
-       if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
-               struct rbd_device *rbd_dev = NULL;
-
-               if (obj_request_img_data_test(obj_request))
-                       rbd_dev = obj_request->img_request->rbd_dev;
-               rbd_warn(rbd_dev, "obj_request %p already marked done",
-                       obj_request);
+       switch (obj_req->img_request->data_type) {
+       case OBJ_REQUEST_BIO:
+               zero_bios(&obj_req->bio_pos, off, bytes);
+               break;
+       case OBJ_REQUEST_BVECS:
+       case OBJ_REQUEST_OWN_BVECS:
+               zero_bvecs(&obj_req->bvec_pos, off, bytes);
+               break;
+       default:
+               rbd_assert(0);
        }
 }
 
-static bool obj_request_done_test(struct rbd_obj_request *obj_request)
-{
-       smp_mb();
-       return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
-}
-
-/*
- * This sets the KNOWN flag after (possibly) setting the EXISTS
- * flag.  The latter is set based on the "exists" value provided.
- *
- * Note that for our purposes once an object exists it never goes
- * away again.  It's possible that the response from two existence
- * checks are separated by the creation of the target object, and
- * the first ("doesn't exist") response arrives *after* the second
- * ("does exist").  In that case we ignore the second one.
- */
-static void obj_request_existence_set(struct rbd_obj_request *obj_request,
-                               bool exists)
-{
-       if (exists)
-               set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
-       set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
-       smp_mb();
-}
-
-static bool obj_request_known_test(struct rbd_obj_request *obj_request)
-{
-       smp_mb();
-       return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
-}
-
-static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
-{
-       smp_mb();
-       return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
-}
-
-static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
-{
-       struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
-
-       return obj_request->img_offset <
-           round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
-}
-
-static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
-{
-       dout("%s: obj %p (was %d)\n", __func__, obj_request,
-               kref_read(&obj_request->kref));
-       kref_get(&obj_request->kref);
-}
-
 static void rbd_obj_request_destroy(struct kref *kref);
 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
 {
@@ -1505,18 +1296,13 @@ static void rbd_img_request_get(struct rbd_img_request *img_request)
        kref_get(&img_request->kref);
 }
 
-static bool img_request_child_test(struct rbd_img_request *img_request);
-static void rbd_parent_request_destroy(struct kref *kref);
 static void rbd_img_request_destroy(struct kref *kref);
 static void rbd_img_request_put(struct rbd_img_request *img_request)
 {
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                kref_read(&img_request->kref));
-       if (img_request_child_test(img_request))
-               kref_put(&img_request->kref, rbd_parent_request_destroy);
-       else
-               kref_put(&img_request->kref, rbd_img_request_destroy);
+       kref_put(&img_request->kref, rbd_img_request_destroy);
 }
 
 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
@@ -1526,139 +1312,37 @@ static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
 
        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
-       obj_request->which = img_request->obj_request_count;
-       rbd_assert(!obj_request_img_data_test(obj_request));
-       obj_request_img_data_set(obj_request);
-       rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
-       list_add_tail(&obj_request->links, &img_request->obj_requests);
-       dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
-               obj_request->which);
+       img_request->pending_count++;
+       dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
 }
 
 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
 {
-       rbd_assert(obj_request->which != BAD_WHICH);
-
-       dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
-               obj_request->which);
-       list_del(&obj_request->links);
+       dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
+       list_del(&obj_request->ex.oe_item);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
-       rbd_assert(obj_request->which == img_request->obj_request_count);
-       obj_request->which = BAD_WHICH;
-       rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
-       obj_request->img_request = NULL;
-       obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
 }
 
-static bool obj_request_type_valid(enum obj_request_type type)
-{
-       switch (type) {
-       case OBJ_REQUEST_NODATA:
-       case OBJ_REQUEST_BIO:
-       case OBJ_REQUEST_PAGES:
-               return true;
-       default:
-               return false;
-       }
-}
-
-static void rbd_img_obj_callback(struct rbd_obj_request *obj_request);
-
 static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
 {
        struct ceph_osd_request *osd_req = obj_request->osd_req;
 
        dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
-            obj_request, obj_request->object_no, obj_request->offset,
-            obj_request->length, osd_req);
-       if (obj_request_img_data_test(obj_request)) {
-               WARN_ON(obj_request->callback != rbd_img_obj_callback);
-               rbd_img_request_get(obj_request->img_request);
-       }
+            obj_request, obj_request->ex.oe_objno, obj_request->ex.oe_off,
+            obj_request->ex.oe_len, osd_req);
        ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
 }
 
-static void rbd_img_request_complete(struct rbd_img_request *img_request)
-{
-
-       dout("%s: img %p\n", __func__, img_request);
-
-       /*
-        * If no error occurred, compute the aggregate transfer
-        * count for the image request.  We could instead use
-        * atomic64_cmpxchg() to update it as each object request
-        * completes; not clear which way is better off hand.
-        */
-       if (!img_request->result) {
-               struct rbd_obj_request *obj_request;
-               u64 xferred = 0;
-
-               for_each_obj_request(img_request, obj_request)
-                       xferred += obj_request->xferred;
-               img_request->xferred = xferred;
-       }
-
-       if (img_request->callback)
-               img_request->callback(img_request);
-       else
-               rbd_img_request_put(img_request);
-}
-
 /*
  * The default/initial value for all image request flags is 0.  Each
  * is conditionally set to 1 at image request initialization time
  * and currently never change thereafter.
  */
-static void img_request_write_set(struct rbd_img_request *img_request)
-{
-       set_bit(IMG_REQ_WRITE, &img_request->flags);
-       smp_mb();
-}
-
-static bool img_request_write_test(struct rbd_img_request *img_request)
-{
-       smp_mb();
-       return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
-}
-
-/*
- * Set the discard flag when the img_request is an discard request
- */
-static void img_request_discard_set(struct rbd_img_request *img_request)
-{
-       set_bit(IMG_REQ_DISCARD, &img_request->flags);
-       smp_mb();
-}
-
-static bool img_request_discard_test(struct rbd_img_request *img_request)
-{
-       smp_mb();
-       return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
-}
-
-static void img_request_child_set(struct rbd_img_request *img_request)
-{
-       set_bit(IMG_REQ_CHILD, &img_request->flags);
-       smp_mb();
-}
-
-static void img_request_child_clear(struct rbd_img_request *img_request)
-{
-       clear_bit(IMG_REQ_CHILD, &img_request->flags);
-       smp_mb();
-}
-
-static bool img_request_child_test(struct rbd_img_request *img_request)
-{
-       smp_mb();
-       return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
-}
-
 static void img_request_layered_set(struct rbd_img_request *img_request)
 {
        set_bit(IMG_REQ_LAYERED, &img_request->flags);
@@ -1677,209 +1361,70 @@ static bool img_request_layered_test(struct rbd_img_request *img_request)
        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
 }
 
-static enum obj_operation_type
-rbd_img_request_op_type(struct rbd_img_request *img_request)
-{
-       if (img_request_write_test(img_request))
-               return OBJ_OP_WRITE;
-       else if (img_request_discard_test(img_request))
-               return OBJ_OP_DISCARD;
-       else
-               return OBJ_OP_READ;
-}
-
-static void
-rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
+static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
 {
-       u64 xferred = obj_request->xferred;
-       u64 length = obj_request->length;
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
 
-       dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
-               obj_request, obj_request->img_request, obj_request->result,
-               xferred, length);
-       /*
-        * ENOENT means a hole in the image.  We zero-fill the entire
-        * length of the request.  A short read also implies zero-fill
-        * to the end of the request.  An error requires the whole
-        * length of the request to be reported finished with an error
-        * to the block layer.  In each case we update the xferred
-        * count to indicate the whole request was satisfied.
-        */
-       rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
-       if (obj_request->result == -ENOENT) {
-               if (obj_request->type == OBJ_REQUEST_BIO)
-                       zero_bio_chain(obj_request->bio_list, 0);
-               else
-                       zero_pages(obj_request->pages, 0, length);
-               obj_request->result = 0;
-       } else if (xferred < length && !obj_request->result) {
-               if (obj_request->type == OBJ_REQUEST_BIO)
-                       zero_bio_chain(obj_request->bio_list, xferred);
-               else
-                       zero_pages(obj_request->pages, xferred, length);
-       }
-       obj_request->xferred = length;
-       obj_request_done_set(obj_request);
+       return !obj_req->ex.oe_off &&
+              obj_req->ex.oe_len == rbd_dev->layout.object_size;
 }
 
-static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
+static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
 {
-       dout("%s: obj %p cb %p\n", __func__, obj_request,
-               obj_request->callback);
-       obj_request->callback(obj_request);
-}
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
 
-static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err)
-{
-       obj_request->result = err;
-       obj_request->xferred = 0;
-       /*
-        * kludge - mirror rbd_obj_request_submit() to match a put in
-        * rbd_img_obj_callback()
-        */
-       if (obj_request_img_data_test(obj_request)) {
-               WARN_ON(obj_request->callback != rbd_img_obj_callback);
-               rbd_img_request_get(obj_request->img_request);
-       }
-       obj_request_done_set(obj_request);
-       rbd_obj_request_complete(obj_request);
+       return obj_req->ex.oe_off + obj_req->ex.oe_len ==
+                                       rbd_dev->layout.object_size;
 }
 
-static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
+static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
 {
-       struct rbd_img_request *img_request = NULL;
-       struct rbd_device *rbd_dev = NULL;
-       bool layered = false;
-
-       if (obj_request_img_data_test(obj_request)) {
-               img_request = obj_request->img_request;
-               layered = img_request && img_request_layered_test(img_request);
-               rbd_dev = img_request->rbd_dev;
-       }
-
-       dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
-               obj_request, img_request, obj_request->result,
-               obj_request->xferred, obj_request->length);
-       if (layered && obj_request->result == -ENOENT &&
-                       obj_request->img_offset < rbd_dev->parent_overlap)
-               rbd_img_parent_read(obj_request);
-       else if (img_request)
-               rbd_img_obj_request_read_callback(obj_request);
-       else
-               obj_request_done_set(obj_request);
+       return ceph_file_extents_bytes(obj_req->img_extents,
+                                      obj_req->num_img_extents);
 }
 
-static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
+static bool rbd_img_is_write(struct rbd_img_request *img_req)
 {
-       dout("%s: obj %p result %d %llu\n", __func__, obj_request,
-               obj_request->result, obj_request->length);
-       /*
-        * There is no such thing as a successful short write.  Set
-        * it to our originally-requested length.
-        */
-       obj_request->xferred = obj_request->length;
-       obj_request_done_set(obj_request);
-}
-
-static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
-{
-       dout("%s: obj %p result %d %llu\n", __func__, obj_request,
-               obj_request->result, obj_request->length);
-       /*
-        * There is no such thing as a successful short discard.  Set
-        * it to our originally-requested length.
-        */
-       obj_request->xferred = obj_request->length;
-       /* discarding a non-existent object is not a problem */
-       if (obj_request->result == -ENOENT)
-               obj_request->result = 0;
-       obj_request_done_set(obj_request);
+       switch (img_req->op_type) {
+       case OBJ_OP_READ:
+               return false;
+       case OBJ_OP_WRITE:
+       case OBJ_OP_DISCARD:
+               return true;
+       default:
+               rbd_assert(0);
+       }
 }
 
-/*
- * For a simple stat call there's nothing to do.  We'll do more if
- * this is part of a write sequence for a layered image.
- */
-static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
-{
-       dout("%s: obj %p\n", __func__, obj_request);
-       obj_request_done_set(obj_request);
-}
-
-static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
-{
-       dout("%s: obj %p\n", __func__, obj_request);
-
-       if (obj_request_img_data_test(obj_request))
-               rbd_osd_copyup_callback(obj_request);
-       else
-               obj_request_done_set(obj_request);
-}
+static void rbd_obj_handle_request(struct rbd_obj_request *obj_req);
 
 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
 {
-       struct rbd_obj_request *obj_request = osd_req->r_priv;
-       u16 opcode;
+       struct rbd_obj_request *obj_req = osd_req->r_priv;
 
-       dout("%s: osd_req %p\n", __func__, osd_req);
-       rbd_assert(osd_req == obj_request->osd_req);
-       if (obj_request_img_data_test(obj_request)) {
-               rbd_assert(obj_request->img_request);
-               rbd_assert(obj_request->which != BAD_WHICH);
-       } else {
-               rbd_assert(obj_request->which == BAD_WHICH);
-       }
+       dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
+            osd_req->r_result, obj_req);
+       rbd_assert(osd_req == obj_req->osd_req);
 
-       if (osd_req->r_result < 0)
-               obj_request->result = osd_req->r_result;
-
-       /*
-        * We support a 64-bit length, but ultimately it has to be
-        * passed to the block layer, which just supports a 32-bit
-        * length field.
-        */
-       obj_request->xferred = osd_req->r_ops[0].outdata_len;
-       rbd_assert(obj_request->xferred < (u64)UINT_MAX);
-
-       opcode = osd_req->r_ops[0].op;
-       switch (opcode) {
-       case CEPH_OSD_OP_READ:
-               rbd_osd_read_callback(obj_request);
-               break;
-       case CEPH_OSD_OP_SETALLOCHINT:
-               rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE ||
-                          osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL);
-               /* fall through */
-       case CEPH_OSD_OP_WRITE:
-       case CEPH_OSD_OP_WRITEFULL:
-               rbd_osd_write_callback(obj_request);
-               break;
-       case CEPH_OSD_OP_STAT:
-               rbd_osd_stat_callback(obj_request);
-               break;
-       case CEPH_OSD_OP_DELETE:
-       case CEPH_OSD_OP_TRUNCATE:
-       case CEPH_OSD_OP_ZERO:
-               rbd_osd_discard_callback(obj_request);
-               break;
-       case CEPH_OSD_OP_CALL:
-               rbd_osd_call_callback(obj_request);
-               break;
-       default:
-               rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d",
-                        obj_request->object_no, opcode);
-               break;
-       }
+       obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0;
+       if (!obj_req->result && !rbd_img_is_write(obj_req->img_request))
+               obj_req->xferred = osd_req->r_result;
+       else
+               /*
+                * Writes aren't allowed to return a data payload.  In some
+                * guarded write cases (e.g. stat + zero on an empty object)
+                * a stat response makes it through, but we don't care.
+                */
+               obj_req->xferred = 0;
 
-       if (obj_request_done_test(obj_request))
-               rbd_obj_request_complete(obj_request);
+       rbd_obj_handle_request(obj_req);
 }
 
 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
 {
        struct ceph_osd_request *osd_req = obj_request->osd_req;
 
-       rbd_assert(obj_request_img_data_test(obj_request));
+       osd_req->r_flags = CEPH_OSD_FLAG_READ;
        osd_req->r_snapid = obj_request->img_request->snap_id;
 }
 
@@ -1887,32 +1432,33 @@ static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
 {
        struct ceph_osd_request *osd_req = obj_request->osd_req;
 
+       osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
        ktime_get_real_ts(&osd_req->r_mtime);
-       osd_req->r_data_offset = obj_request->offset;
+       osd_req->r_data_offset = obj_request->ex.oe_off;
 }
 
 static struct ceph_osd_request *
-__rbd_osd_req_create(struct rbd_device *rbd_dev,
-                    struct ceph_snap_context *snapc,
-                    int num_ops, unsigned int flags,
-                    struct rbd_obj_request *obj_request)
+rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
 {
+       struct rbd_img_request *img_req = obj_req->img_request;
+       struct rbd_device *rbd_dev = img_req->rbd_dev;
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct ceph_osd_request *req;
        const char *name_format = rbd_dev->image_format == 1 ?
                                      RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
 
-       req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
+       req = ceph_osdc_alloc_request(osdc,
+                       (rbd_img_is_write(img_req) ? img_req->snapc : NULL),
+                       num_ops, false, GFP_NOIO);
        if (!req)
                return NULL;
 
-       req->r_flags = flags;
        req->r_callback = rbd_osd_req_callback;
-       req->r_priv = obj_request;
+       req->r_priv = obj_req;
 
        req->r_base_oloc.pool = rbd_dev->layout.pool_id;
        if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
-                       rbd_dev->header.object_prefix, obj_request->object_no))
+                       rbd_dev->header.object_prefix, obj_req->ex.oe_objno))
                goto err_req;
 
        if (ceph_osdc_alloc_messages(req, GFP_NOIO))
@@ -1925,83 +1471,20 @@ err_req:
        return NULL;
 }
 
-/*
- * Create an osd request.  A read request has one osd op (read).
- * A write request has either one (watch) or two (hint+write) osd ops.
- * (All rbd data writes are prefixed with an allocation hint op, but
- * technically osd watch is a write request, hence this distinction.)
- */
-static struct ceph_osd_request *rbd_osd_req_create(
-                                       struct rbd_device *rbd_dev,
-                                       enum obj_operation_type op_type,
-                                       unsigned int num_ops,
-                                       struct rbd_obj_request *obj_request)
-{
-       struct ceph_snap_context *snapc = NULL;
-
-       if (obj_request_img_data_test(obj_request) &&
-               (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
-               struct rbd_img_request *img_request = obj_request->img_request;
-               if (op_type == OBJ_OP_WRITE) {
-                       rbd_assert(img_request_write_test(img_request));
-               } else {
-                       rbd_assert(img_request_discard_test(img_request));
-               }
-               snapc = img_request->snapc;
-       }
-
-       rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
-
-       return __rbd_osd_req_create(rbd_dev, snapc, num_ops,
-           (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ?
-           CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request);
-}
-
-/*
- * Create a copyup osd request based on the information in the object
- * request supplied.  A copyup request has two or three osd ops, a
- * copyup method call, potentially a hint op, and a write or truncate
- * or zero op.
- */
-static struct ceph_osd_request *
-rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
-{
-       struct rbd_img_request *img_request;
-       int num_osd_ops = 3;
-
-       rbd_assert(obj_request_img_data_test(obj_request));
-       img_request = obj_request->img_request;
-       rbd_assert(img_request);
-       rbd_assert(img_request_write_test(img_request) ||
-                       img_request_discard_test(img_request));
-
-       if (img_request_discard_test(img_request))
-               num_osd_ops = 2;
-
-       return __rbd_osd_req_create(img_request->rbd_dev,
-                                   img_request->snapc, num_osd_ops,
-                                   CEPH_OSD_FLAG_WRITE, obj_request);
-}
-
 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
 {
        ceph_osdc_put_request(osd_req);
 }
 
-static struct rbd_obj_request *
-rbd_obj_request_create(enum obj_request_type type)
+static struct rbd_obj_request *rbd_obj_request_create(void)
 {
        struct rbd_obj_request *obj_request;
 
-       rbd_assert(obj_request_type_valid(type));
-
        obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
        if (!obj_request)
                return NULL;
 
-       obj_request->which = BAD_WHICH;
-       obj_request->type = type;
-       INIT_LIST_HEAD(&obj_request->links);
+       ceph_object_extent_init(&obj_request->ex);
        kref_init(&obj_request->kref);
 
        dout("%s %p\n", __func__, obj_request);
@@ -2011,32 +1494,34 @@ rbd_obj_request_create(enum obj_request_type type)
 static void rbd_obj_request_destroy(struct kref *kref)
 {
        struct rbd_obj_request *obj_request;
+       u32 i;
 
        obj_request = container_of(kref, struct rbd_obj_request, kref);
 
        dout("%s: obj %p\n", __func__, obj_request);
 
-       rbd_assert(obj_request->img_request == NULL);
-       rbd_assert(obj_request->which == BAD_WHICH);
-
        if (obj_request->osd_req)
                rbd_osd_req_destroy(obj_request->osd_req);
 
-       rbd_assert(obj_request_type_valid(obj_request->type));
-       switch (obj_request->type) {
+       switch (obj_request->img_request->data_type) {
        case OBJ_REQUEST_NODATA:
-               break;          /* Nothing to do */
        case OBJ_REQUEST_BIO:
-               if (obj_request->bio_list)
-                       bio_chain_put(obj_request->bio_list);
-               break;
-       case OBJ_REQUEST_PAGES:
-               /* img_data requests don't own their page array */
-               if (obj_request->pages &&
-                   !obj_request_img_data_test(obj_request))
-                       ceph_release_page_vector(obj_request->pages,
-                                               obj_request->page_count);
+       case OBJ_REQUEST_BVECS:
+               break;          /* Nothing to do */
+       case OBJ_REQUEST_OWN_BVECS:
+               kfree(obj_request->bvec_pos.bvecs);
                break;
+       default:
+               rbd_assert(0);
+       }
+
+       kfree(obj_request->img_extents);
+       if (obj_request->copyup_bvecs) {
+               for (i = 0; i < obj_request->copyup_bvec_count; i++) {
+                       if (obj_request->copyup_bvecs[i].bv_page)
+                               __free_page(obj_request->copyup_bvecs[i].bv_page);
+               }
+               kfree(obj_request->copyup_bvecs);
        }
 
        kmem_cache_free(rbd_obj_request_cache, obj_request);
@@ -2111,7 +1596,6 @@ static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
  */
 static struct rbd_img_request *rbd_img_request_create(
                                        struct rbd_device *rbd_dev,
-                                       u64 offset, u64 length,
                                        enum obj_operation_type op_type,
                                        struct ceph_snap_context *snapc)
 {
@@ -2122,27 +1606,21 @@ static struct rbd_img_request *rbd_img_request_create(
                return NULL;
 
        img_request->rbd_dev = rbd_dev;
-       img_request->offset = offset;
-       img_request->length = length;
-       if (op_type == OBJ_OP_DISCARD) {
-               img_request_discard_set(img_request);
-               img_request->snapc = snapc;
-       } else if (op_type == OBJ_OP_WRITE) {
-               img_request_write_set(img_request);
-               img_request->snapc = snapc;
-       } else {
+       img_request->op_type = op_type;
+       if (!rbd_img_is_write(img_request))
                img_request->snap_id = rbd_dev->spec->snap_id;
-       }
+       else
+               img_request->snapc = snapc;
+
        if (rbd_dev_parent_get(rbd_dev))
                img_request_layered_set(img_request);
 
        spin_lock_init(&img_request->completion_lock);
-       INIT_LIST_HEAD(&img_request->obj_requests);
+       INIT_LIST_HEAD(&img_request->object_extents);
        kref_init(&img_request->kref);
 
-       dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
-               obj_op_name(op_type), offset, length, img_request);
-
+       dout("%s: rbd_dev %p %s -> img %p\n", __func__, rbd_dev,
+            obj_op_name(op_type), img_request);
        return img_request;
 }
 
@@ -2165,829 +1643,934 @@ static void rbd_img_request_destroy(struct kref *kref)
                rbd_dev_parent_put(img_request->rbd_dev);
        }
 
-       if (img_request_write_test(img_request) ||
-               img_request_discard_test(img_request))
+       if (rbd_img_is_write(img_request))
                ceph_put_snap_context(img_request->snapc);
 
        kmem_cache_free(rbd_img_request_cache, img_request);
 }
 
-static struct rbd_img_request *rbd_parent_request_create(
-                                       struct rbd_obj_request *obj_request,
-                                       u64 img_offset, u64 length)
+static void prune_extents(struct ceph_file_extent *img_extents,
+                         u32 *num_img_extents, u64 overlap)
 {
-       struct rbd_img_request *parent_request;
-       struct rbd_device *rbd_dev;
+       u32 cnt = *num_img_extents;
 
-       rbd_assert(obj_request->img_request);
-       rbd_dev = obj_request->img_request->rbd_dev;
+       /* drop extents completely beyond the overlap */
+       while (cnt && img_extents[cnt - 1].fe_off >= overlap)
+               cnt--;
 
-       parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
-                                               length, OBJ_OP_READ, NULL);
-       if (!parent_request)
-               return NULL;
+       if (cnt) {
+               struct ceph_file_extent *ex = &img_extents[cnt - 1];
 
-       img_request_child_set(parent_request);
-       rbd_obj_request_get(obj_request);
-       parent_request->obj_request = obj_request;
+               /* trim final overlapping extent */
+               if (ex->fe_off + ex->fe_len > overlap)
+                       ex->fe_len = overlap - ex->fe_off;
+       }
 
-       return parent_request;
+       *num_img_extents = cnt;
 }
 
-static void rbd_parent_request_destroy(struct kref *kref)
+/*
+ * Determine the byte range(s) covered by either just the object extent
+ * or the entire object in the parent image.
+ */
+static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
+                                   bool entire)
 {
-       struct rbd_img_request *parent_request;
-       struct rbd_obj_request *orig_request;
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+       int ret;
 
-       parent_request = container_of(kref, struct rbd_img_request, kref);
-       orig_request = parent_request->obj_request;
+       if (!rbd_dev->parent_overlap)
+               return 0;
 
-       parent_request->obj_request = NULL;
-       rbd_obj_request_put(orig_request);
-       img_request_child_clear(parent_request);
+       ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
+                                 entire ? 0 : obj_req->ex.oe_off,
+                                 entire ? rbd_dev->layout.object_size :
+                                                       obj_req->ex.oe_len,
+                                 &obj_req->img_extents,
+                                 &obj_req->num_img_extents);
+       if (ret)
+               return ret;
 
-       rbd_img_request_destroy(kref);
+       prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
+                     rbd_dev->parent_overlap);
+       return 0;
 }
 
-static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
+static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
 {
-       struct rbd_img_request *img_request;
-       unsigned int xferred;
-       int result;
-       bool more;
-
-       rbd_assert(obj_request_img_data_test(obj_request));
-       img_request = obj_request->img_request;
-
-       rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
-       xferred = (unsigned int)obj_request->xferred;
-       result = obj_request->result;
-       if (result) {
-               struct rbd_device *rbd_dev = img_request->rbd_dev;
-               enum obj_operation_type op_type;
-
-               if (img_request_discard_test(img_request))
-                       op_type = OBJ_OP_DISCARD;
-               else if (img_request_write_test(img_request))
-                       op_type = OBJ_OP_WRITE;
-               else
-                       op_type = OBJ_OP_READ;
-
-               rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
-                       obj_op_name(op_type), obj_request->length,
-                       obj_request->img_offset, obj_request->offset);
-               rbd_warn(rbd_dev, "  result %d xferred %x",
-                       result, xferred);
-               if (!img_request->result)
-                       img_request->result = result;
-               /*
-                * Need to end I/O on the entire obj_request worth of
-                * bytes in case of error.
-                */
-               xferred = obj_request->length;
+       switch (obj_req->img_request->data_type) {
+       case OBJ_REQUEST_BIO:
+               osd_req_op_extent_osd_data_bio(obj_req->osd_req, which,
+                                              &obj_req->bio_pos,
+                                              obj_req->ex.oe_len);
+               break;
+       case OBJ_REQUEST_BVECS:
+       case OBJ_REQUEST_OWN_BVECS:
+               rbd_assert(obj_req->bvec_pos.iter.bi_size ==
+                                                       obj_req->ex.oe_len);
+               rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
+               osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which,
+                                                   &obj_req->bvec_pos);
+               break;
+       default:
+               rbd_assert(0);
        }
+}
 
-       if (img_request_child_test(img_request)) {
-               rbd_assert(img_request->obj_request != NULL);
-               more = obj_request->which < img_request->obj_request_count - 1;
-       } else {
-               blk_status_t status = errno_to_blk_status(result);
+static int rbd_obj_setup_read(struct rbd_obj_request *obj_req)
+{
+       obj_req->osd_req = rbd_osd_req_create(obj_req, 1);
+       if (!obj_req->osd_req)
+               return -ENOMEM;
 
-               rbd_assert(img_request->rq != NULL);
+       osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ,
+                              obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
+       rbd_osd_req_setup_data(obj_req, 0);
 
-               more = blk_update_request(img_request->rq, status, xferred);
-               if (!more)
-                       __blk_mq_end_request(img_request->rq, status);
-       }
+       rbd_osd_req_format_read(obj_req);
+       return 0;
+}
+
+static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
+                               unsigned int which)
+{
+       struct page **pages;
 
-       return more;
+       /*
+        * The response data for a STAT call consists of:
+        *     le64 length;
+        *     struct {
+        *         le32 tv_sec;
+        *         le32 tv_nsec;
+        *     } mtime;
+        */
+       pages = ceph_alloc_page_vector(1, GFP_NOIO);
+       if (IS_ERR(pages))
+               return PTR_ERR(pages);
+
+       osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0);
+       osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages,
+                                    8 + sizeof(struct ceph_timespec),
+                                    0, false, true);
+       return 0;
 }
 
-static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
+static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
+                                 unsigned int which)
 {
-       struct rbd_img_request *img_request;
-       u32 which = obj_request->which;
-       bool more = true;
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+       u16 opcode;
 
-       rbd_assert(obj_request_img_data_test(obj_request));
-       img_request = obj_request->img_request;
+       osd_req_op_alloc_hint_init(obj_req->osd_req, which++,
+                                  rbd_dev->layout.object_size,
+                                  rbd_dev->layout.object_size);
 
-       dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
-       rbd_assert(img_request != NULL);
-       rbd_assert(img_request->obj_request_count > 0);
-       rbd_assert(which != BAD_WHICH);
-       rbd_assert(which < img_request->obj_request_count);
+       if (rbd_obj_is_entire(obj_req))
+               opcode = CEPH_OSD_OP_WRITEFULL;
+       else
+               opcode = CEPH_OSD_OP_WRITE;
 
-       spin_lock_irq(&img_request->completion_lock);
-       if (which != img_request->next_completion)
-               goto out;
+       osd_req_op_extent_init(obj_req->osd_req, which, opcode,
+                              obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
+       rbd_osd_req_setup_data(obj_req, which++);
 
-       for_each_obj_request_from(img_request, obj_request) {
-               rbd_assert(more);
-               rbd_assert(which < img_request->obj_request_count);
+       rbd_assert(which == obj_req->osd_req->r_num_ops);
+       rbd_osd_req_format_write(obj_req);
+}
 
-               if (!obj_request_done_test(obj_request))
-                       break;
-               more = rbd_img_obj_end_request(obj_request);
-               which++;
+static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
+{
+       unsigned int num_osd_ops, which = 0;
+       int ret;
+
+       /* reverse map the entire object onto the parent */
+       ret = rbd_obj_calc_img_extents(obj_req, true);
+       if (ret)
+               return ret;
+
+       if (obj_req->num_img_extents) {
+               obj_req->write_state = RBD_OBJ_WRITE_GUARD;
+               num_osd_ops = 3; /* stat + setallochint + write/writefull */
+       } else {
+               obj_req->write_state = RBD_OBJ_WRITE_FLAT;
+               num_osd_ops = 2; /* setallochint + write/writefull */
        }
 
-       rbd_assert(more ^ (which == img_request->obj_request_count));
-       img_request->next_completion = which;
-out:
-       spin_unlock_irq(&img_request->completion_lock);
-       rbd_img_request_put(img_request);
+       obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
+       if (!obj_req->osd_req)
+               return -ENOMEM;
 
-       if (!more)
-               rbd_img_request_complete(img_request);
+       if (obj_req->num_img_extents) {
+               ret = __rbd_obj_setup_stat(obj_req, which++);
+               if (ret)
+                       return ret;
+       }
+
+       __rbd_obj_setup_write(obj_req, which);
+       return 0;
 }
 
-/*
- * Add individual osd ops to the given ceph_osd_request and prepare
- * them for submission. num_ops is the current number of
- * osd operations already to the object request.
- */
-static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
-                               struct ceph_osd_request *osd_request,
-                               enum obj_operation_type op_type,
-                               unsigned int num_ops)
-{
-       struct rbd_img_request *img_request = obj_request->img_request;
-       struct rbd_device *rbd_dev = img_request->rbd_dev;
-       u64 object_size = rbd_obj_bytes(&rbd_dev->header);
-       u64 offset = obj_request->offset;
-       u64 length = obj_request->length;
-       u64 img_end;
+static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req,
+                                   unsigned int which)
+{
        u16 opcode;
 
-       if (op_type == OBJ_OP_DISCARD) {
-               if (!offset && length == object_size &&
-                   (!img_request_layered_test(img_request) ||
-                    !obj_request_overlaps_parent(obj_request))) {
-                       opcode = CEPH_OSD_OP_DELETE;
-               } else if ((offset + length == object_size)) {
+       if (rbd_obj_is_entire(obj_req)) {
+               if (obj_req->num_img_extents) {
+                       osd_req_op_init(obj_req->osd_req, which++,
+                                       CEPH_OSD_OP_CREATE, 0);
                        opcode = CEPH_OSD_OP_TRUNCATE;
                } else {
-                       down_read(&rbd_dev->header_rwsem);
-                       img_end = rbd_dev->header.image_size;
-                       up_read(&rbd_dev->header_rwsem);
-
-                       if (obj_request->img_offset + length == img_end)
-                               opcode = CEPH_OSD_OP_TRUNCATE;
-                       else
-                               opcode = CEPH_OSD_OP_ZERO;
+                       osd_req_op_init(obj_req->osd_req, which++,
+                                       CEPH_OSD_OP_DELETE, 0);
+                       opcode = 0;
                }
-       } else if (op_type == OBJ_OP_WRITE) {
-               if (!offset && length == object_size)
-                       opcode = CEPH_OSD_OP_WRITEFULL;
-               else
-                       opcode = CEPH_OSD_OP_WRITE;
-               osd_req_op_alloc_hint_init(osd_request, num_ops,
-                                       object_size, object_size);
-               num_ops++;
+       } else if (rbd_obj_is_tail(obj_req)) {
+               opcode = CEPH_OSD_OP_TRUNCATE;
        } else {
-               opcode = CEPH_OSD_OP_READ;
+               opcode = CEPH_OSD_OP_ZERO;
        }
 
-       if (opcode == CEPH_OSD_OP_DELETE)
-               osd_req_op_init(osd_request, num_ops, opcode, 0);
-       else
-               osd_req_op_extent_init(osd_request, num_ops, opcode,
-                                      offset, length, 0, 0);
-
-       if (obj_request->type == OBJ_REQUEST_BIO)
-               osd_req_op_extent_osd_data_bio(osd_request, num_ops,
-                                       obj_request->bio_list, length);
-       else if (obj_request->type == OBJ_REQUEST_PAGES)
-               osd_req_op_extent_osd_data_pages(osd_request, num_ops,
-                                       obj_request->pages, length,
-                                       offset & ~PAGE_MASK, false, false);
-
-       /* Discards are also writes */
-       if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
-               rbd_osd_req_format_write(obj_request);
-       else
-               rbd_osd_req_format_read(obj_request);
+       if (opcode)
+               osd_req_op_extent_init(obj_req->osd_req, which++, opcode,
+                                      obj_req->ex.oe_off, obj_req->ex.oe_len,
+                                      0, 0);
+
+       rbd_assert(which == obj_req->osd_req->r_num_ops);
+       rbd_osd_req_format_write(obj_req);
 }
 
-/*
- * Split up an image request into one or more object requests, each
- * to a different object.  The "type" parameter indicates whether
- * "data_desc" is the pointer to the head of a list of bio
- * structures, or the base of a page array.  In either case this
- * function assumes data_desc describes memory sufficient to hold
- * all data described by the image request.
- */
-static int rbd_img_request_fill(struct rbd_img_request *img_request,
-                                       enum obj_request_type type,
-                                       void *data_desc)
+static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
 {
-       struct rbd_device *rbd_dev = img_request->rbd_dev;
-       struct rbd_obj_request *obj_request = NULL;
-       struct rbd_obj_request *next_obj_request;
-       struct bio *bio_list = NULL;
-       unsigned int bio_offset = 0;
-       struct page **pages = NULL;
-       enum obj_operation_type op_type;
-       u64 img_offset;
-       u64 resid;
-
-       dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
-               (int)type, data_desc);
+       unsigned int num_osd_ops, which = 0;
+       int ret;
 
-       img_offset = img_request->offset;
-       resid = img_request->length;
-       rbd_assert(resid > 0);
-       op_type = rbd_img_request_op_type(img_request);
+       /* reverse map the entire object onto the parent */
+       ret = rbd_obj_calc_img_extents(obj_req, true);
+       if (ret)
+               return ret;
 
-       if (type == OBJ_REQUEST_BIO) {
-               bio_list = data_desc;
-               rbd_assert(img_offset ==
-                          bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
-       } else if (type == OBJ_REQUEST_PAGES) {
-               pages = data_desc;
+       if (rbd_obj_is_entire(obj_req)) {
+               obj_req->write_state = RBD_OBJ_WRITE_FLAT;
+               if (obj_req->num_img_extents)
+                       num_osd_ops = 2; /* create + truncate */
+               else
+                       num_osd_ops = 1; /* delete */
+       } else {
+               if (obj_req->num_img_extents) {
+                       obj_req->write_state = RBD_OBJ_WRITE_GUARD;
+                       num_osd_ops = 2; /* stat + truncate/zero */
+               } else {
+                       obj_req->write_state = RBD_OBJ_WRITE_FLAT;
+                       num_osd_ops = 1; /* truncate/zero */
+               }
        }
 
-       while (resid) {
-               struct ceph_osd_request *osd_req;
-               u64 object_no = img_offset >> rbd_dev->header.obj_order;
-               u64 offset = rbd_segment_offset(rbd_dev, img_offset);
-               u64 length = rbd_segment_length(rbd_dev, img_offset, resid);
-
-               obj_request = rbd_obj_request_create(type);
-               if (!obj_request)
-                       goto out_unwind;
-
-               obj_request->object_no = object_no;
-               obj_request->offset = offset;
-               obj_request->length = length;
-
-               /*
-                * set obj_request->img_request before creating the
-                * osd_request so that it gets the right snapc
-                */
-               rbd_img_obj_request_add(img_request, obj_request);
-
-               if (type == OBJ_REQUEST_BIO) {
-                       unsigned int clone_size;
-
-                       rbd_assert(length <= (u64)UINT_MAX);
-                       clone_size = (unsigned int)length;
-                       obj_request->bio_list =
-                                       bio_chain_clone_range(&bio_list,
-                                                               &bio_offset,
-                                                               clone_size,
-                                                               GFP_NOIO);
-                       if (!obj_request->bio_list)
-                               goto out_unwind;
-               } else if (type == OBJ_REQUEST_PAGES) {
-                       unsigned int page_count;
-
-                       obj_request->pages = pages;
-                       page_count = (u32)calc_pages_for(offset, length);
-                       obj_request->page_count = page_count;
-                       if ((offset + length) & ~PAGE_MASK)
-                               page_count--;   /* more on last page */
-                       pages += page_count;
-               }
+       obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
+       if (!obj_req->osd_req)
+               return -ENOMEM;
 
-               osd_req = rbd_osd_req_create(rbd_dev, op_type,
-                                       (op_type == OBJ_OP_WRITE) ? 2 : 1,
-                                       obj_request);
-               if (!osd_req)
-                       goto out_unwind;
+       if (!rbd_obj_is_entire(obj_req) && obj_req->num_img_extents) {
+               ret = __rbd_obj_setup_stat(obj_req, which++);
+               if (ret)
+                       return ret;
+       }
 
-               obj_request->osd_req = osd_req;
-               obj_request->callback = rbd_img_obj_callback;
-               obj_request->img_offset = img_offset;
+       __rbd_obj_setup_discard(obj_req, which);
+       return 0;
+}
 
-               rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
+/*
+ * For each object request in @img_req, allocate an OSD request, add
+ * individual OSD ops and prepare them for submission.  The number of
+ * OSD ops depends on op_type and the overlap point (if any).
+ */
+static int __rbd_img_fill_request(struct rbd_img_request *img_req)
+{
+       struct rbd_obj_request *obj_req;
+       int ret;
 
-               img_offset += length;
-               resid -= length;
+       for_each_obj_request(img_req, obj_req) {
+               switch (img_req->op_type) {
+               case OBJ_OP_READ:
+                       ret = rbd_obj_setup_read(obj_req);
+                       break;
+               case OBJ_OP_WRITE:
+                       ret = rbd_obj_setup_write(obj_req);
+                       break;
+               case OBJ_OP_DISCARD:
+                       ret = rbd_obj_setup_discard(obj_req);
+                       break;
+               default:
+                       rbd_assert(0);
+               }
+               if (ret)
+                       return ret;
        }
 
        return 0;
+}
 
-out_unwind:
-       for_each_obj_request_safe(img_request, obj_request, next_obj_request)
-               rbd_img_obj_request_del(img_request, obj_request);
+union rbd_img_fill_iter {
+       struct ceph_bio_iter    bio_iter;
+       struct ceph_bvec_iter   bvec_iter;
+};
 
-       return -ENOMEM;
-}
+struct rbd_img_fill_ctx {
+       enum obj_request_type   pos_type;
+       union rbd_img_fill_iter *pos;
+       union rbd_img_fill_iter iter;
+       ceph_object_extent_fn_t set_pos_fn;
+       ceph_object_extent_fn_t count_fn;
+       ceph_object_extent_fn_t copy_fn;
+};
 
-static void
-rbd_osd_copyup_callback(struct rbd_obj_request *obj_request)
+static struct ceph_object_extent *alloc_object_extent(void *arg)
 {
-       struct rbd_img_request *img_request;
-       struct rbd_device *rbd_dev;
-       struct page **pages;
-       u32 page_count;
+       struct rbd_img_request *img_req = arg;
+       struct rbd_obj_request *obj_req;
 
-       dout("%s: obj %p\n", __func__, obj_request);
+       obj_req = rbd_obj_request_create();
+       if (!obj_req)
+               return NULL;
 
-       rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
-               obj_request->type == OBJ_REQUEST_NODATA);
-       rbd_assert(obj_request_img_data_test(obj_request));
-       img_request = obj_request->img_request;
-       rbd_assert(img_request);
+       rbd_img_obj_request_add(img_req, obj_req);
+       return &obj_req->ex;
+}
 
-       rbd_dev = img_request->rbd_dev;
-       rbd_assert(rbd_dev);
+/*
+ * While su != os && sc == 1 is technically not fancy (it's the same
+ * layout as su == os && sc == 1), we can't use the nocopy path for it
+ * because ->set_pos_fn() should be called only once per object.
+ * ceph_file_to_extents() invokes action_fn once per stripe unit, so
+ * treat su != os && sc == 1 as fancy.
+ */
+static bool rbd_layout_is_fancy(struct ceph_file_layout *l)
+{
+       return l->stripe_unit != l->object_size;
+}
 
-       pages = obj_request->copyup_pages;
-       rbd_assert(pages != NULL);
-       obj_request->copyup_pages = NULL;
-       page_count = obj_request->copyup_page_count;
-       rbd_assert(page_count);
-       obj_request->copyup_page_count = 0;
-       ceph_release_page_vector(pages, page_count);
+static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req,
+                                      struct ceph_file_extent *img_extents,
+                                      u32 num_img_extents,
+                                      struct rbd_img_fill_ctx *fctx)
+{
+       u32 i;
+       int ret;
+
+       img_req->data_type = fctx->pos_type;
 
        /*
-        * We want the transfer count to reflect the size of the
-        * original write request.  There is no such thing as a
-        * successful short write, so if the request was successful
-        * we can just set it to the originally-requested length.
+        * Create object requests and set each object request's starting
+        * position in the provided bio (list) or bio_vec array.
         */
-       if (!obj_request->result)
-               obj_request->xferred = obj_request->length;
+       fctx->iter = *fctx->pos;
+       for (i = 0; i < num_img_extents; i++) {
+               ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
+                                          img_extents[i].fe_off,
+                                          img_extents[i].fe_len,
+                                          &img_req->object_extents,
+                                          alloc_object_extent, img_req,
+                                          fctx->set_pos_fn, &fctx->iter);
+               if (ret)
+                       return ret;
+       }
 
-       obj_request_done_set(obj_request);
+       return __rbd_img_fill_request(img_req);
 }
 
-static void
-rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
+/*
+ * Map a list of image extents to a list of object extents, create the
+ * corresponding object requests (normally each to a different object,
+ * but not always) and add them to @img_req.  For each object request,
+ * set up its data descriptor to point to the corresponding chunk(s) of
+ * @fctx->pos data buffer.
+ *
+ * Because ceph_file_to_extents() will merge adjacent object extents
+ * together, each object request's data descriptor may point to multiple
+ * different chunks of @fctx->pos data buffer.
+ *
+ * @fctx->pos data buffer is assumed to be large enough.
+ */
+static int rbd_img_fill_request(struct rbd_img_request *img_req,
+                               struct ceph_file_extent *img_extents,
+                               u32 num_img_extents,
+                               struct rbd_img_fill_ctx *fctx)
 {
-       struct rbd_obj_request *orig_request;
-       struct ceph_osd_request *osd_req;
-       struct rbd_device *rbd_dev;
-       struct page **pages;
-       enum obj_operation_type op_type;
-       u32 page_count;
-       int img_result;
-       u64 parent_length;
-
-       rbd_assert(img_request_child_test(img_request));
-
-       /* First get what we need from the image request */
-
-       pages = img_request->copyup_pages;
-       rbd_assert(pages != NULL);
-       img_request->copyup_pages = NULL;
-       page_count = img_request->copyup_page_count;
-       rbd_assert(page_count);
-       img_request->copyup_page_count = 0;
-
-       orig_request = img_request->obj_request;
-       rbd_assert(orig_request != NULL);
-       rbd_assert(obj_request_type_valid(orig_request->type));
-       img_result = img_request->result;
-       parent_length = img_request->length;
-       rbd_assert(img_result || parent_length == img_request->xferred);
-       rbd_img_request_put(img_request);
+       struct rbd_device *rbd_dev = img_req->rbd_dev;
+       struct rbd_obj_request *obj_req;
+       u32 i;
+       int ret;
+
+       if (fctx->pos_type == OBJ_REQUEST_NODATA ||
+           !rbd_layout_is_fancy(&rbd_dev->layout))
+               return rbd_img_fill_request_nocopy(img_req, img_extents,
+                                                  num_img_extents, fctx);
 
-       rbd_assert(orig_request->img_request);
-       rbd_dev = orig_request->img_request->rbd_dev;
-       rbd_assert(rbd_dev);
+       img_req->data_type = OBJ_REQUEST_OWN_BVECS;
 
        /*
-        * If the overlap has become 0 (most likely because the
-        * image has been flattened) we need to free the pages
-        * and re-submit the original write request.
+        * Create object requests and determine ->bvec_count for each object
+        * request.  Note that ->bvec_count sum over all object requests may
+        * be greater than the number of bio_vecs in the provided bio (list)
+        * or bio_vec array because when mapped, those bio_vecs can straddle
+        * stripe unit boundaries.
         */
-       if (!rbd_dev->parent_overlap) {
-               ceph_release_page_vector(pages, page_count);
-               rbd_obj_request_submit(orig_request);
-               return;
+       fctx->iter = *fctx->pos;
+       for (i = 0; i < num_img_extents; i++) {
+               ret = ceph_file_to_extents(&rbd_dev->layout,
+                                          img_extents[i].fe_off,
+                                          img_extents[i].fe_len,
+                                          &img_req->object_extents,
+                                          alloc_object_extent, img_req,
+                                          fctx->count_fn, &fctx->iter);
+               if (ret)
+                       return ret;
        }
 
-       if (img_result)
-               goto out_err;
+       for_each_obj_request(img_req, obj_req) {
+               obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count,
+                                             sizeof(*obj_req->bvec_pos.bvecs),
+                                             GFP_NOIO);
+               if (!obj_req->bvec_pos.bvecs)
+                       return -ENOMEM;
+       }
 
        /*
-        * The original osd request is of no use to use any more.
-        * We need a new one that can hold the three ops in a copyup
-        * request.  Allocate the new copyup osd request for the
-        * original request, and release the old one.
+        * Fill in each object request's private bio_vec array, splitting and
+        * rearranging the provided bio_vecs in stripe unit chunks as needed.
         */
-       img_result = -ENOMEM;
-       osd_req = rbd_osd_req_create_copyup(orig_request);
-       if (!osd_req)
-               goto out_err;
-       rbd_osd_req_destroy(orig_request->osd_req);
-       orig_request->osd_req = osd_req;
-       orig_request->copyup_pages = pages;
-       orig_request->copyup_page_count = page_count;
+       fctx->iter = *fctx->pos;
+       for (i = 0; i < num_img_extents; i++) {
+               ret = ceph_iterate_extents(&rbd_dev->layout,
+                                          img_extents[i].fe_off,
+                                          img_extents[i].fe_len,
+                                          &img_req->object_extents,
+                                          fctx->copy_fn, &fctx->iter);
+               if (ret)
+                       return ret;
+       }
 
-       /* Initialize the copyup op */
+       return __rbd_img_fill_request(img_req);
+}
+
+static int rbd_img_fill_nodata(struct rbd_img_request *img_req,
+                              u64 off, u64 len)
+{
+       struct ceph_file_extent ex = { off, len };
+       union rbd_img_fill_iter dummy;
+       struct rbd_img_fill_ctx fctx = {
+               .pos_type = OBJ_REQUEST_NODATA,
+               .pos = &dummy,
+       };
+
+       return rbd_img_fill_request(img_req, &ex, 1, &fctx);
+}
+
+static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
+{
+       struct rbd_obj_request *obj_req =
+           container_of(ex, struct rbd_obj_request, ex);
+       struct ceph_bio_iter *it = arg;
 
-       osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
-       osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
-                                               false, false);
+       dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
+       obj_req->bio_pos = *it;
+       ceph_bio_iter_advance(it, bytes);
+}
 
-       /* Add the other op(s) */
+static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
+{
+       struct rbd_obj_request *obj_req =
+           container_of(ex, struct rbd_obj_request, ex);
+       struct ceph_bio_iter *it = arg;
 
-       op_type = rbd_img_request_op_type(orig_request->img_request);
-       rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);
+       dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
+       ceph_bio_iter_advance_step(it, bytes, ({
+               obj_req->bvec_count++;
+       }));
 
-       /* All set, send it off. */
+}
 
-       rbd_obj_request_submit(orig_request);
-       return;
+static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
+{
+       struct rbd_obj_request *obj_req =
+           container_of(ex, struct rbd_obj_request, ex);
+       struct ceph_bio_iter *it = arg;
 
-out_err:
-       ceph_release_page_vector(pages, page_count);
-       rbd_obj_request_error(orig_request, img_result);
+       dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
+       ceph_bio_iter_advance_step(it, bytes, ({
+               obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
+               obj_req->bvec_pos.iter.bi_size += bv.bv_len;
+       }));
 }
 
-/*
- * Read from the parent image the range of data that covers the
- * entire target of the given object request.  This is used for
- * satisfying a layered image write request when the target of an
- * object request from the image request does not exist.
- *
- * A page array big enough to hold the returned data is allocated
- * and supplied to rbd_img_request_fill() as the "data descriptor."
- * When the read completes, this page array will be transferred to
- * the original object request for the copyup operation.
- *
- * If an error occurs, it is recorded as the result of the original
- * object request in rbd_img_obj_exists_callback().
- */
-static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
-{
-       struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
-       struct rbd_img_request *parent_request = NULL;
-       u64 img_offset;
-       u64 length;
-       struct page **pages = NULL;
-       u32 page_count;
-       int result;
+static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req,
+                                  struct ceph_file_extent *img_extents,
+                                  u32 num_img_extents,
+                                  struct ceph_bio_iter *bio_pos)
+{
+       struct rbd_img_fill_ctx fctx = {
+               .pos_type = OBJ_REQUEST_BIO,
+               .pos = (union rbd_img_fill_iter *)bio_pos,
+               .set_pos_fn = set_bio_pos,
+               .count_fn = count_bio_bvecs,
+               .copy_fn = copy_bio_bvecs,
+       };
 
-       rbd_assert(rbd_dev->parent != NULL);
+       return rbd_img_fill_request(img_req, img_extents, num_img_extents,
+                                   &fctx);
+}
 
-       /*
-        * Determine the byte range covered by the object in the
-        * child image to which the original request was to be sent.
-        */
-       img_offset = obj_request->img_offset - obj_request->offset;
-       length = rbd_obj_bytes(&rbd_dev->header);
+static int rbd_img_fill_from_bio(struct rbd_img_request *img_req,
+                                u64 off, u64 len, struct bio *bio)
+{
+       struct ceph_file_extent ex = { off, len };
+       struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter };
 
-       /*
-        * There is no defined parent data beyond the parent
-        * overlap, so limit what we read at that boundary if
-        * necessary.
-        */
-       if (img_offset + length > rbd_dev->parent_overlap) {
-               rbd_assert(img_offset < rbd_dev->parent_overlap);
-               length = rbd_dev->parent_overlap - img_offset;
-       }
+       return __rbd_img_fill_from_bio(img_req, &ex, 1, &it);
+}
 
-       /*
-        * Allocate a page array big enough to receive the data read
-        * from the parent.
-        */
-       page_count = (u32)calc_pages_for(0, length);
-       pages = ceph_alloc_page_vector(page_count, GFP_NOIO);
-       if (IS_ERR(pages)) {
-               result = PTR_ERR(pages);
-               pages = NULL;
-               goto out_err;
-       }
+static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
+{
+       struct rbd_obj_request *obj_req =
+           container_of(ex, struct rbd_obj_request, ex);
+       struct ceph_bvec_iter *it = arg;
 
-       result = -ENOMEM;
-       parent_request = rbd_parent_request_create(obj_request,
-                                               img_offset, length);
-       if (!parent_request)
-               goto out_err;
+       obj_req->bvec_pos = *it;
+       ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes);
+       ceph_bvec_iter_advance(it, bytes);
+}
 
-       result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
-       if (result)
-               goto out_err;
+static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
+{
+       struct rbd_obj_request *obj_req =
+           container_of(ex, struct rbd_obj_request, ex);
+       struct ceph_bvec_iter *it = arg;
 
-       parent_request->copyup_pages = pages;
-       parent_request->copyup_page_count = page_count;
-       parent_request->callback = rbd_img_obj_parent_read_full_callback;
+       ceph_bvec_iter_advance_step(it, bytes, ({
+               obj_req->bvec_count++;
+       }));
+}
 
-       result = rbd_img_request_submit(parent_request);
-       if (!result)
-               return 0;
+static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
+{
+       struct rbd_obj_request *obj_req =
+           container_of(ex, struct rbd_obj_request, ex);
+       struct ceph_bvec_iter *it = arg;
 
-       parent_request->copyup_pages = NULL;
-       parent_request->copyup_page_count = 0;
-out_err:
-       if (pages)
-               ceph_release_page_vector(pages, page_count);
-       if (parent_request)
-               rbd_img_request_put(parent_request);
-       return result;
+       ceph_bvec_iter_advance_step(it, bytes, ({
+               obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
+               obj_req->bvec_pos.iter.bi_size += bv.bv_len;
+       }));
 }
 
-static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
+static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
+                                    struct ceph_file_extent *img_extents,
+                                    u32 num_img_extents,
+                                    struct ceph_bvec_iter *bvec_pos)
 {
-       struct rbd_obj_request *orig_request;
-       struct rbd_device *rbd_dev;
-       int result;
+       struct rbd_img_fill_ctx fctx = {
+               .pos_type = OBJ_REQUEST_BVECS,
+               .pos = (union rbd_img_fill_iter *)bvec_pos,
+               .set_pos_fn = set_bvec_pos,
+               .count_fn = count_bvecs,
+               .copy_fn = copy_bvecs,
+       };
 
-       rbd_assert(!obj_request_img_data_test(obj_request));
+       return rbd_img_fill_request(img_req, img_extents, num_img_extents,
+                                   &fctx);
+}
 
-       /*
-        * All we need from the object request is the original
-        * request and the result of the STAT op.  Grab those, then
-        * we're done with the request.
-        */
-       orig_request = obj_request->obj_request;
-       obj_request->obj_request = NULL;
-       rbd_obj_request_put(orig_request);
-       rbd_assert(orig_request);
-       rbd_assert(orig_request->img_request);
-
-       result = obj_request->result;
-       obj_request->result = 0;
-
-       dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
-               obj_request, orig_request, result,
-               obj_request->xferred, obj_request->length);
-       rbd_obj_request_put(obj_request);
+static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
+                                  struct ceph_file_extent *img_extents,
+                                  u32 num_img_extents,
+                                  struct bio_vec *bvecs)
+{
+       struct ceph_bvec_iter it = {
+               .bvecs = bvecs,
+               .iter = { .bi_size = ceph_file_extents_bytes(img_extents,
+                                                            num_img_extents) },
+       };
 
-       /*
-        * If the overlap has become 0 (most likely because the
-        * image has been flattened) we need to re-submit the
-        * original request.
-        */
-       rbd_dev = orig_request->img_request->rbd_dev;
-       if (!rbd_dev->parent_overlap) {
-               rbd_obj_request_submit(orig_request);
-               return;
-       }
+       return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents,
+                                        &it);
+}
 
-       /*
-        * Our only purpose here is to determine whether the object
-        * exists, and we don't want to treat the non-existence as
-        * an error.  If something else comes back, transfer the
-        * error to the original request and complete it now.
-        */
-       if (!result) {
-               obj_request_existence_set(orig_request, true);
-       } else if (result == -ENOENT) {
-               obj_request_existence_set(orig_request, false);
-       } else {
-               goto fail_orig_request;
-       }
+static void rbd_img_request_submit(struct rbd_img_request *img_request)
+{
+       struct rbd_obj_request *obj_request;
 
-       /*
-        * Resubmit the original request now that we have recorded
-        * whether the target object exists.
-        */
-       result = rbd_img_obj_request_submit(orig_request);
-       if (result)
-               goto fail_orig_request;
+       dout("%s: img %p\n", __func__, img_request);
 
-       return;
+       rbd_img_request_get(img_request);
+       for_each_obj_request(img_request, obj_request)
+               rbd_obj_request_submit(obj_request);
 
-fail_orig_request:
-       rbd_obj_request_error(orig_request, result);
+       rbd_img_request_put(img_request);
 }
 
-static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
+static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
 {
-       struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
-       struct rbd_obj_request *stat_request;
-       struct page **pages;
-       u32 page_count;
-       size_t size;
+       struct rbd_img_request *img_req = obj_req->img_request;
+       struct rbd_img_request *child_img_req;
        int ret;
 
-       stat_request = rbd_obj_request_create(OBJ_REQUEST_PAGES);
-       if (!stat_request)
+       child_img_req = rbd_img_request_create(img_req->rbd_dev->parent,
+                                              OBJ_OP_READ, NULL);
+       if (!child_img_req)
                return -ENOMEM;
 
-       stat_request->object_no = obj_request->object_no;
+       __set_bit(IMG_REQ_CHILD, &child_img_req->flags);
+       child_img_req->obj_request = obj_req;
 
-       stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
-                                                  stat_request);
-       if (!stat_request->osd_req) {
-               ret = -ENOMEM;
-               goto fail_stat_request;
+       if (!rbd_img_is_write(img_req)) {
+               switch (img_req->data_type) {
+               case OBJ_REQUEST_BIO:
+                       ret = __rbd_img_fill_from_bio(child_img_req,
+                                                     obj_req->img_extents,
+                                                     obj_req->num_img_extents,
+                                                     &obj_req->bio_pos);
+                       break;
+               case OBJ_REQUEST_BVECS:
+               case OBJ_REQUEST_OWN_BVECS:
+                       ret = __rbd_img_fill_from_bvecs(child_img_req,
+                                                     obj_req->img_extents,
+                                                     obj_req->num_img_extents,
+                                                     &obj_req->bvec_pos);
+                       break;
+               default:
+                       rbd_assert(0);
+               }
+       } else {
+               ret = rbd_img_fill_from_bvecs(child_img_req,
+                                             obj_req->img_extents,
+                                             obj_req->num_img_extents,
+                                             obj_req->copyup_bvecs);
+       }
+       if (ret) {
+               rbd_img_request_put(child_img_req);
+               return ret;
+       }
+
+       rbd_img_request_submit(child_img_req);
+       return 0;
+}
+
+static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req)
+{
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+       int ret;
+
+       if (obj_req->result == -ENOENT &&
+           rbd_dev->parent_overlap && !obj_req->tried_parent) {
+               /* reverse map this object extent onto the parent */
+               ret = rbd_obj_calc_img_extents(obj_req, false);
+               if (ret) {
+                       obj_req->result = ret;
+                       return true;
+               }
+
+               if (obj_req->num_img_extents) {
+                       obj_req->tried_parent = true;
+                       ret = rbd_obj_read_from_parent(obj_req);
+                       if (ret) {
+                               obj_req->result = ret;
+                               return true;
+                       }
+                       return false;
+               }
        }
 
        /*
-        * The response data for a STAT call consists of:
-        *     le64 length;
-        *     struct {
-        *         le32 tv_sec;
-        *         le32 tv_nsec;
-        *     } mtime;
+        * -ENOENT means a hole in the image -- zero-fill the entire
+        * length of the request.  A short read also implies zero-fill
+        * to the end of the request.  In both cases we update xferred
+        * count to indicate the whole request was satisfied.
         */
-       size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
-       page_count = (u32)calc_pages_for(0, size);
-       pages = ceph_alloc_page_vector(page_count, GFP_NOIO);
-       if (IS_ERR(pages)) {
-               ret = PTR_ERR(pages);
-               goto fail_stat_request;
+       if (obj_req->result == -ENOENT ||
+           (!obj_req->result && obj_req->xferred < obj_req->ex.oe_len)) {
+               rbd_assert(!obj_req->xferred || !obj_req->result);
+               rbd_obj_zero_range(obj_req, obj_req->xferred,
+                                  obj_req->ex.oe_len - obj_req->xferred);
+               obj_req->result = 0;
+               obj_req->xferred = obj_req->ex.oe_len;
        }
 
-       osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
-       osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
-                                    false, false);
-
-       rbd_obj_request_get(obj_request);
-       stat_request->obj_request = obj_request;
-       stat_request->pages = pages;
-       stat_request->page_count = page_count;
-       stat_request->callback = rbd_img_obj_exists_callback;
+       return true;
+}
 
-       rbd_obj_request_submit(stat_request);
-       return 0;
+/*
+ * copyup_bvecs pages are never highmem pages
+ */
+static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
+{
+       struct ceph_bvec_iter it = {
+               .bvecs = bvecs,
+               .iter = { .bi_size = bytes },
+       };
 
-fail_stat_request:
-       rbd_obj_request_put(stat_request);
-       return ret;
+       ceph_bvec_iter_advance_step(&it, bytes, ({
+               if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0,
+                              bv.bv_len))
+                       return false;
+       }));
+       return true;
 }
 
-static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
+static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
 {
-       struct rbd_img_request *img_request = obj_request->img_request;
-       struct rbd_device *rbd_dev = img_request->rbd_dev;
-
-       /* Reads */
-       if (!img_request_write_test(img_request) &&
-           !img_request_discard_test(img_request))
-               return true;
+       unsigned int num_osd_ops = obj_req->osd_req->r_num_ops;
 
-       /* Non-layered writes */
-       if (!img_request_layered_test(img_request))
-               return true;
+       dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
+       rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT);
+       rbd_osd_req_destroy(obj_req->osd_req);
 
        /*
-        * Layered writes outside of the parent overlap range don't
-        * share any data with the parent.
+        * Create a copyup request with the same number of OSD ops as
+        * the original request.  The original request was stat + op(s),
+        * the new copyup request will be copyup + the same op(s).
         */
-       if (!obj_request_overlaps_parent(obj_request))
-               return true;
+       obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
+       if (!obj_req->osd_req)
+               return -ENOMEM;
 
        /*
-        * Entire-object layered writes - we will overwrite whatever
-        * parent data there is anyway.
+        * Only send non-zero copyup data to save some I/O and network
+        * bandwidth -- zero copyup data is equivalent to the object not
+        * existing.
         */
-       if (!obj_request->offset &&
-           obj_request->length == rbd_obj_bytes(&rbd_dev->header))
-               return true;
+       if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) {
+               dout("%s obj_req %p detected zeroes\n", __func__, obj_req);
+               bytes = 0;
+       }
 
-       /*
-        * If the object is known to already exist, its parent data has
-        * already been copied.
-        */
-       if (obj_request_known_test(obj_request) &&
-           obj_request_exists_test(obj_request))
-               return true;
+       osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd",
+                           "copyup");
+       osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0,
+                                         obj_req->copyup_bvecs, bytes);
+
+       switch (obj_req->img_request->op_type) {
+       case OBJ_OP_WRITE:
+               __rbd_obj_setup_write(obj_req, 1);
+               break;
+       case OBJ_OP_DISCARD:
+               rbd_assert(!rbd_obj_is_entire(obj_req));
+               __rbd_obj_setup_discard(obj_req, 1);
+               break;
+       default:
+               rbd_assert(0);
+       }
 
-       return false;
+       rbd_obj_request_submit(obj_req);
+       return 0;
 }
 
-static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
+static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
 {
-       rbd_assert(obj_request_img_data_test(obj_request));
-       rbd_assert(obj_request_type_valid(obj_request->type));
-       rbd_assert(obj_request->img_request);
+       u32 i;
 
-       if (img_obj_request_simple(obj_request)) {
-               rbd_obj_request_submit(obj_request);
-               return 0;
-       }
+       rbd_assert(!obj_req->copyup_bvecs);
+       obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
+       obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
+                                       sizeof(*obj_req->copyup_bvecs),
+                                       GFP_NOIO);
+       if (!obj_req->copyup_bvecs)
+               return -ENOMEM;
 
-       /*
-        * It's a layered write.  The target object might exist but
-        * we may not know that yet.  If we know it doesn't exist,
-        * start by reading the data for the full target object from
-        * the parent so we can use it for a copyup to the target.
-        */
-       if (obj_request_known_test(obj_request))
-               return rbd_img_obj_parent_read_full(obj_request);
+       for (i = 0; i < obj_req->copyup_bvec_count; i++) {
+               unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);
+
+               obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO);
+               if (!obj_req->copyup_bvecs[i].bv_page)
+                       return -ENOMEM;
 
-       /* We don't know whether the target exists.  Go find out. */
+               obj_req->copyup_bvecs[i].bv_offset = 0;
+               obj_req->copyup_bvecs[i].bv_len = len;
+               obj_overlap -= len;
+       }
 
-       return rbd_img_obj_exists_submit(obj_request);
+       rbd_assert(!obj_overlap);
+       return 0;
 }
 
-static int rbd_img_request_submit(struct rbd_img_request *img_request)
+static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req)
 {
-       struct rbd_obj_request *obj_request;
-       struct rbd_obj_request *next_obj_request;
-       int ret = 0;
-
-       dout("%s: img %p\n", __func__, img_request);
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+       int ret;
 
-       rbd_img_request_get(img_request);
-       for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
-               ret = rbd_img_obj_request_submit(obj_request);
-               if (ret)
-                       goto out_put_ireq;
+       rbd_assert(obj_req->num_img_extents);
+       prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
+                     rbd_dev->parent_overlap);
+       if (!obj_req->num_img_extents) {
+               /*
+                * The overlap has become 0 (most likely because the
+                * image has been flattened).  Use rbd_obj_issue_copyup()
+                * to re-submit the original write request -- the copyup
+                * operation itself will be a no-op, since someone must
+                * have populated the child object while we weren't
+                * looking.  Move to WRITE_FLAT state as we'll be done
+                * with the operation once the null copyup completes.
+                */
+               obj_req->write_state = RBD_OBJ_WRITE_FLAT;
+               return rbd_obj_issue_copyup(obj_req, 0);
        }
 
-out_put_ireq:
-       rbd_img_request_put(img_request);
-       return ret;
+       ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
+       if (ret)
+               return ret;
+
+       obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
+       return rbd_obj_read_from_parent(obj_req);
 }
 
-static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
+static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req)
 {
-       struct rbd_obj_request *obj_request;
-       struct rbd_device *rbd_dev;
-       u64 obj_end;
-       u64 img_xferred;
-       int img_result;
+       int ret;
 
-       rbd_assert(img_request_child_test(img_request));
+again:
+       switch (obj_req->write_state) {
+       case RBD_OBJ_WRITE_GUARD:
+               rbd_assert(!obj_req->xferred);
+               if (obj_req->result == -ENOENT) {
+                       /*
+                        * The target object doesn't exist.  Read the data for
+                        * the entire target object up to the overlap point (if
+                        * any) from the parent, so we can use it for a copyup.
+                        */
+                       ret = rbd_obj_handle_write_guard(obj_req);
+                       if (ret) {
+                               obj_req->result = ret;
+                               return true;
+                       }
+                       return false;
+               }
+               /* fall through */
+       case RBD_OBJ_WRITE_FLAT:
+               if (!obj_req->result)
+                       /*
+                        * There is no such thing as a successful short
+                        * write -- indicate the whole request was satisfied.
+                        */
+                       obj_req->xferred = obj_req->ex.oe_len;
+               return true;
+       case RBD_OBJ_WRITE_COPYUP:
+               obj_req->write_state = RBD_OBJ_WRITE_GUARD;
+               if (obj_req->result)
+                       goto again;
 
-       /* First get what we need from the image request and release it */
+               rbd_assert(obj_req->xferred);
+               ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred);
+               if (ret) {
+                       obj_req->result = ret;
+                       return true;
+               }
+               return false;
+       default:
+               rbd_assert(0);
+       }
+}
 
-       obj_request = img_request->obj_request;
-       img_xferred = img_request->xferred;
-       img_result = img_request->result;
-       rbd_img_request_put(img_request);
+/*
+ * Returns true if @obj_req is completed, or false otherwise.
+ */
+static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req)
+{
+       switch (obj_req->img_request->op_type) {
+       case OBJ_OP_READ:
+               return rbd_obj_handle_read(obj_req);
+       case OBJ_OP_WRITE:
+               return rbd_obj_handle_write(obj_req);
+       case OBJ_OP_DISCARD:
+               if (rbd_obj_handle_write(obj_req)) {
+                       /*
+                        * Hide -ENOENT from delete/truncate/zero -- discarding
+                        * a non-existent object is not a problem.
+                        */
+                       if (obj_req->result == -ENOENT) {
+                               obj_req->result = 0;
+                               obj_req->xferred = obj_req->ex.oe_len;
+                       }
+                       return true;
+               }
+               return false;
+       default:
+               rbd_assert(0);
+       }
+}
 
-       /*
-        * If the overlap has become 0 (most likely because the
-        * image has been flattened) we need to re-submit the
-        * original request.
-        */
-       rbd_assert(obj_request);
-       rbd_assert(obj_request->img_request);
-       rbd_dev = obj_request->img_request->rbd_dev;
-       if (!rbd_dev->parent_overlap) {
-               rbd_obj_request_submit(obj_request);
+static void rbd_obj_end_request(struct rbd_obj_request *obj_req)
+{
+       struct rbd_img_request *img_req = obj_req->img_request;
+
+       rbd_assert((!obj_req->result &&
+                   obj_req->xferred == obj_req->ex.oe_len) ||
+                  (obj_req->result < 0 && !obj_req->xferred));
+       if (!obj_req->result) {
+               img_req->xferred += obj_req->xferred;
                return;
        }
 
-       obj_request->result = img_result;
-       if (obj_request->result)
-               goto out;
+       rbd_warn(img_req->rbd_dev,
+                "%s at objno %llu %llu~%llu result %d xferred %llu",
+                obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
+                obj_req->ex.oe_off, obj_req->ex.oe_len, obj_req->result,
+                obj_req->xferred);
+       if (!img_req->result) {
+               img_req->result = obj_req->result;
+               img_req->xferred = 0;
+       }
+}
 
-       /*
-        * We need to zero anything beyond the parent overlap
-        * boundary.  Since rbd_img_obj_request_read_callback()
-        * will zero anything beyond the end of a short read, an
-        * easy way to do this is to pretend the data from the
-        * parent came up short--ending at the overlap boundary.
-        */
-       rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
-       obj_end = obj_request->img_offset + obj_request->length;
-       if (obj_end > rbd_dev->parent_overlap) {
-               u64 xferred = 0;
+static void rbd_img_end_child_request(struct rbd_img_request *img_req)
+{
+       struct rbd_obj_request *obj_req = img_req->obj_request;
 
-               if (obj_request->img_offset < rbd_dev->parent_overlap)
-                       xferred = rbd_dev->parent_overlap -
-                                       obj_request->img_offset;
+       rbd_assert(test_bit(IMG_REQ_CHILD, &img_req->flags));
+       rbd_assert((!img_req->result &&
+                   img_req->xferred == rbd_obj_img_extents_bytes(obj_req)) ||
+                  (img_req->result < 0 && !img_req->xferred));
 
-               obj_request->xferred = min(img_xferred, xferred);
-       } else {
-               obj_request->xferred = img_xferred;
-       }
-out:
-       rbd_img_obj_request_read_callback(obj_request);
-       rbd_obj_request_complete(obj_request);
+       obj_req->result = img_req->result;
+       obj_req->xferred = img_req->xferred;
+       rbd_img_request_put(img_req);
 }
 
-static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
+static void rbd_img_end_request(struct rbd_img_request *img_req)
 {
-       struct rbd_img_request *img_request;
-       int result;
+       rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
+       rbd_assert((!img_req->result &&
+                   img_req->xferred == blk_rq_bytes(img_req->rq)) ||
+                  (img_req->result < 0 && !img_req->xferred));
 
-       rbd_assert(obj_request_img_data_test(obj_request));
-       rbd_assert(obj_request->img_request != NULL);
-       rbd_assert(obj_request->result == (s32) -ENOENT);
-       rbd_assert(obj_request_type_valid(obj_request->type));
+       blk_mq_end_request(img_req->rq,
+                          errno_to_blk_status(img_req->result));
+       rbd_img_request_put(img_req);
+}
 
-       /* rbd_read_finish(obj_request, obj_request->length); */
-       img_request = rbd_parent_request_create(obj_request,
-                                               obj_request->img_offset,
-                                               obj_request->length);
-       result = -ENOMEM;
-       if (!img_request)
-               goto out_err;
+static void rbd_obj_handle_request(struct rbd_obj_request *obj_req)
+{
+       struct rbd_img_request *img_req;
 
-       if (obj_request->type == OBJ_REQUEST_BIO)
-               result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
-                                               obj_request->bio_list);
-       else
-               result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
-                                               obj_request->pages);
-       if (result)
-               goto out_err;
+again:
+       if (!__rbd_obj_handle_request(obj_req))
+               return;
 
-       img_request->callback = rbd_img_parent_read_callback;
-       result = rbd_img_request_submit(img_request);
-       if (result)
-               goto out_err;
+       img_req = obj_req->img_request;
+       spin_lock(&img_req->completion_lock);
+       rbd_obj_end_request(obj_req);
+       rbd_assert(img_req->pending_count);
+       if (--img_req->pending_count) {
+               spin_unlock(&img_req->completion_lock);
+               return;
+       }
 
-       return;
-out_err:
-       if (img_request)
-               rbd_img_request_put(img_request);
-       obj_request->result = result;
-       obj_request->xferred = 0;
-       obj_request_done_set(obj_request);
+       spin_unlock(&img_req->completion_lock);
+       if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
+               obj_req = img_req->obj_request;
+               rbd_img_end_child_request(img_req);
+               goto again;
+       }
+       rbd_img_end_request(img_req);
 }
 
 static const struct rbd_client_id rbd_empty_cid;
@@ -3091,8 +2674,8 @@ static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
 {
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct rbd_client_id cid = rbd_get_cid(rbd_dev);
-       int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
-       char buf[buf_size];
+       char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN];
+       int buf_size = sizeof(buf);
        void *p = buf;
 
        dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
@@ -3610,8 +3193,8 @@ static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
                                     u64 notify_id, u64 cookie, s32 *result)
 {
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
-       int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
-       char buf[buf_size];
+       char buf[4 + CEPH_ENCODING_START_BLK_LEN];
+       int buf_size = sizeof(buf);
        int ret;
 
        if (result) {
@@ -3887,7 +3470,7 @@ static void rbd_reregister_watch(struct work_struct *work)
 
        ret = rbd_dev_refresh(rbd_dev);
        if (ret)
-               rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
+               rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
 }
 
 /*
@@ -4070,8 +3653,7 @@ static void rbd_queue_workfn(struct work_struct *work)
                }
        }
 
-       img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
-                                            snapc);
+       img_request = rbd_img_request_create(rbd_dev, op_type, snapc);
        if (!img_request) {
                result = -ENOMEM;
                goto err_unlock;
@@ -4080,18 +3662,14 @@ static void rbd_queue_workfn(struct work_struct *work)
        snapc = NULL; /* img_request consumes a ref */
 
        if (op_type == OBJ_OP_DISCARD)
-               result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
-                                             NULL);
+               result = rbd_img_fill_nodata(img_request, offset, length);
        else
-               result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
-                                             rq->bio);
-       if (result)
-               goto err_img_request;
-
-       result = rbd_img_request_submit(img_request);
+               result = rbd_img_fill_from_bio(img_request, offset, length,
+                                              rq->bio);
        if (result)
                goto err_img_request;
 
+       rbd_img_request_submit(img_request);
        if (must_be_locked)
                up_read(&rbd_dev->lock_rwsem);
        return;
@@ -4369,7 +3947,7 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
        blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
        q->limits.max_sectors = queue_max_hw_sectors(q);
        blk_queue_max_segments(q, USHRT_MAX);
-       blk_queue_max_segment_size(q, segment_size);
+       blk_queue_max_segment_size(q, UINT_MAX);
        blk_queue_io_min(q, segment_size);
        blk_queue_io_opt(q, segment_size);
 
@@ -5057,9 +4635,6 @@ static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
        } __attribute__ ((packed)) striping_info_buf = { 0 };
        size_t size = sizeof (striping_info_buf);
        void *p;
-       u64 obj_size;
-       u64 stripe_unit;
-       u64 stripe_count;
        int ret;
 
        ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
@@ -5071,31 +4646,9 @@ static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
        if (ret < size)
                return -ERANGE;
 
-       /*
-        * We don't actually support the "fancy striping" feature
-        * (STRIPINGV2) yet, but if the striping sizes are the
-        * defaults the behavior is the same as before.  So find
-        * out, and only fail if the image has non-default values.
-        */
-       ret = -EINVAL;
-       obj_size = rbd_obj_bytes(&rbd_dev->header);
        p = &striping_info_buf;
-       stripe_unit = ceph_decode_64(&p);
-       if (stripe_unit != obj_size) {
-               rbd_warn(rbd_dev, "unsupported stripe unit "
-                               "(got %llu want %llu)",
-                               stripe_unit, obj_size);
-               return -EINVAL;
-       }
-       stripe_count = ceph_decode_64(&p);
-       if (stripe_count != 1) {
-               rbd_warn(rbd_dev, "unsupported stripe count "
-                               "(got %llu want 1)", stripe_count);
-               return -EINVAL;
-       }
-       rbd_dev->header.stripe_unit = stripe_unit;
-       rbd_dev->header.stripe_count = stripe_count;
-
+       rbd_dev->header.stripe_unit = ceph_decode_64(&p);
+       rbd_dev->header.stripe_count = ceph_decode_64(&p);
        return 0;
 }
 
@@ -5653,39 +5206,6 @@ out_err:
        return ret;
 }
 
-/*
- * Return pool id (>= 0) or a negative error code.
- */
-static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
-{
-       struct ceph_options *opts = rbdc->client->options;
-       u64 newest_epoch;
-       int tries = 0;
-       int ret;
-
-again:
-       ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
-       if (ret == -ENOENT && tries++ < 1) {
-               ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap",
-                                           &newest_epoch);
-               if (ret < 0)
-                       return ret;
-
-               if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
-                       ceph_osdc_maybe_request_map(&rbdc->client->osdc);
-                       (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
-                                                    newest_epoch,
-                                                    opts->mount_timeout);
-                       goto again;
-               } else {
-                       /* the osdmap we have is new enough */
-                       return -ENOENT;
-               }
-       }
-
-       return ret;
-}
-
 static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
 {
        down_write(&rbd_dev->lock_rwsem);
@@ -6114,7 +5634,7 @@ static ssize_t do_rbd_add(struct bus_type *bus,
        }
 
        /* pick the pool */
-       rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
+       rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name);
        if (rc < 0) {
                if (rc == -ENOENT)
                        pr_info("pool %s does not exist\n", spec->pool_name);
@@ -6366,16 +5886,8 @@ static int rbd_slab_init(void)
        if (!rbd_obj_request_cache)
                goto out_err;
 
-       rbd_assert(!rbd_bio_clone);
-       rbd_bio_clone = bioset_create(BIO_POOL_SIZE, 0, 0);
-       if (!rbd_bio_clone)
-               goto out_err_clone;
-
        return 0;
 
-out_err_clone:
-       kmem_cache_destroy(rbd_obj_request_cache);
-       rbd_obj_request_cache = NULL;
 out_err:
        kmem_cache_destroy(rbd_img_request_cache);
        rbd_img_request_cache = NULL;
@@ -6391,10 +5903,6 @@ static void rbd_slab_exit(void)
        rbd_assert(rbd_img_request_cache);
        kmem_cache_destroy(rbd_img_request_cache);
        rbd_img_request_cache = NULL;
-
-       rbd_assert(rbd_bio_clone);
-       bioset_free(rbd_bio_clone);
-       rbd_bio_clone = NULL;
 }
 
 static int __init rbd_init(void)
index 174f5709e5086257bdfd623de7fbb07011c30634..a699e320393f2afba3785d2469da36b2c32350fd 100644 (file)
@@ -6,7 +6,7 @@
 obj-$(CONFIG_CEPH_FS) += ceph.o
 
 ceph-y := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \
-       export.o caps.o snap.o xattr.o \
+       export.o caps.o snap.o xattr.o quota.o \
        mds_client.o mdsmap.o strings.o ceph_frag.o \
        debugfs.o
 
index b4336b42ce3bb1fabec247c01f51d86c2c28490f..5f7ad3d0df2ea69121acded120e26d05515fd79a 100644 (file)
@@ -15,6 +15,7 @@
 #include "mds_client.h"
 #include "cache.h"
 #include <linux/ceph/osd_client.h>
+#include <linux/ceph/striper.h>
 
 /*
  * Ceph address space ops.
@@ -438,7 +439,7 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
 {
        struct inode *inode = file_inode(file);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
-       struct ceph_file_info *ci = file->private_data;
+       struct ceph_file_info *fi = file->private_data;
        struct ceph_rw_context *rw_ctx;
        int rc = 0;
        int max = 0;
@@ -452,7 +453,7 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
        if (rc == 0)
                goto out;
 
-       rw_ctx = ceph_find_rw_context(ci);
+       rw_ctx = ceph_find_rw_context(fi);
        max = fsc->mount_options->rsize >> PAGE_SHIFT;
        dout("readpages %p file %p ctx %p nr_pages %d max %d\n",
             inode, file, rw_ctx, nr_pages, max);
@@ -800,7 +801,7 @@ static int ceph_writepages_start(struct address_space *mapping,
        struct ceph_osd_request *req = NULL;
        struct ceph_writeback_ctl ceph_wbc;
        bool should_loop, range_whole = false;
-       bool stop, done = false;
+       bool done = false;
 
        dout("writepages_start %p (mode=%s)\n", inode,
             wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
@@ -856,7 +857,7 @@ retry:
                 * in that range can be associated with newer snapc.
                 * They are not writeable until we write all dirty pages
                 * associated with 'snapc' get written */
-               if (index > 0 || wbc->sync_mode != WB_SYNC_NONE)
+               if (index > 0)
                        should_loop = true;
                dout(" non-head snapc, range whole\n");
        }
@@ -864,8 +865,7 @@ retry:
        ceph_put_snap_context(last_snapc);
        last_snapc = snapc;
 
-       stop = false;
-       while (!stop && index <= end) {
+       while (!done && index <= end) {
                int num_ops = 0, op_idx;
                unsigned i, pvec_pages, max_pages, locked_pages = 0;
                struct page **pages = NULL, **data_pages;
@@ -898,16 +898,30 @@ get_more_pages:
                                unlock_page(page);
                                continue;
                        }
-                       if (strip_unit_end && (page->index > strip_unit_end)) {
-                               dout("end of strip unit %p\n", page);
+                       /* only if matching snap context */
+                       pgsnapc = page_snap_context(page);
+                       if (pgsnapc != snapc) {
+                               dout("page snapc %p %lld != oldest %p %lld\n",
+                                    pgsnapc, pgsnapc->seq, snapc, snapc->seq);
+                               if (!should_loop &&
+                                   !ceph_wbc.head_snapc &&
+                                   wbc->sync_mode != WB_SYNC_NONE)
+                                       should_loop = true;
                                unlock_page(page);
-                               break;
+                               continue;
                        }
                        if (page_offset(page) >= ceph_wbc.i_size) {
                                dout("%p page eof %llu\n",
                                     page, ceph_wbc.i_size);
-                               /* not done if range_cyclic */
-                               stop = true;
+                               if (ceph_wbc.size_stable ||
+                                   page_offset(page) >= i_size_read(inode))
+                                       mapping->a_ops->invalidatepage(page,
+                                                               0, PAGE_SIZE);
+                               unlock_page(page);
+                               continue;
+                       }
+                       if (strip_unit_end && (page->index > strip_unit_end)) {
+                               dout("end of strip unit %p\n", page);
                                unlock_page(page);
                                break;
                        }
@@ -921,15 +935,6 @@ get_more_pages:
                                wait_on_page_writeback(page);
                        }
 
-                       /* only if matching snap context */
-                       pgsnapc = page_snap_context(page);
-                       if (pgsnapc != snapc) {
-                               dout("page snapc %p %lld != oldest %p %lld\n",
-                                    pgsnapc, pgsnapc->seq, snapc, snapc->seq);
-                               unlock_page(page);
-                               continue;
-                       }
-
                        if (!clear_page_dirty_for_io(page)) {
                                dout("%p !clear_page_dirty_for_io\n", page);
                                unlock_page(page);
@@ -945,19 +950,15 @@ get_more_pages:
                        if (locked_pages == 0) {
                                u64 objnum;
                                u64 objoff;
+                               u32 xlen;
 
                                /* prepare async write request */
                                offset = (u64)page_offset(page);
-                               len = wsize;
-
-                               rc = ceph_calc_file_object_mapping(&ci->i_layout,
-                                                               offset, len,
-                                                               &objnum, &objoff,
-                                                               &len);
-                               if (rc < 0) {
-                                       unlock_page(page);
-                                       break;
-                               }
+                               ceph_calc_file_object_mapping(&ci->i_layout,
+                                                             offset, wsize,
+                                                             &objnum, &objoff,
+                                                             &xlen);
+                               len = xlen;
 
                                num_ops = 1;
                                strip_unit_end = page->index +
@@ -1146,7 +1147,7 @@ new_request:
                 * we tagged for writeback prior to entering this loop.
                 */
                if (wbc->nr_to_write <= 0 && wbc->sync_mode == WB_SYNC_NONE)
-                       done = stop = true;
+                       done = true;
 
 release_pvec_pages:
                dout("pagevec_release on %d pages (%p)\n", (int)pvec.nr,
index 33a211b364ed9e230d56a1fd30a5deb311516118..bb524c880b1eadf2915a0a551eb01730871dee9a 100644 (file)
@@ -51,7 +51,7 @@ static const struct fscache_cookie_def ceph_fscache_fsid_object_def = {
        .type           = FSCACHE_COOKIE_TYPE_INDEX,
 };
 
-int ceph_fscache_register(void)
+int __init ceph_fscache_register(void)
 {
        return fscache_register_netfs(&ceph_cache_netfs);
 }
@@ -135,7 +135,7 @@ static enum fscache_checkaux ceph_fscache_inode_check_aux(
        if (memcmp(data, &aux, sizeof(aux)) != 0)
                return FSCACHE_CHECKAUX_OBSOLETE;
 
-       dout("ceph inode 0x%p cached okay", ci);
+       dout("ceph inode 0x%p cached okay\n", ci);
        return FSCACHE_CHECKAUX_OKAY;
 }
 
index 0e5bd3e3344e7983e6bdf38dba1adfba17500eba..23dbfae1615685dde002b8fa4e77ed54f6a01c0e 100644 (file)
@@ -184,36 +184,54 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc,
                                         mdsc->caps_avail_count);
        spin_unlock(&mdsc->caps_list_lock);
 
-       for (i = have; i < need; i++) {
-retry:
+       for (i = have; i < need; ) {
                cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
-               if (!cap) {
-                       if (!trimmed) {
-                               for (j = 0; j < mdsc->max_sessions; j++) {
-                                       s = __ceph_lookup_mds_session(mdsc, j);
-                                       if (!s)
-                                               continue;
-                                       mutex_unlock(&mdsc->mutex);
+               if (cap) {
+                       list_add(&cap->caps_item, &newcaps);
+                       alloc++;
+                       i++;
+                       continue;
+               }
 
-                                       mutex_lock(&s->s_mutex);
-                                       max_caps = s->s_nr_caps - (need - i);
-                                       ceph_trim_caps(mdsc, s, max_caps);
-                                       mutex_unlock(&s->s_mutex);
+               if (!trimmed) {
+                       for (j = 0; j < mdsc->max_sessions; j++) {
+                               s = __ceph_lookup_mds_session(mdsc, j);
+                               if (!s)
+                                       continue;
+                               mutex_unlock(&mdsc->mutex);
 
-                                       ceph_put_mds_session(s);
-                                       mutex_lock(&mdsc->mutex);
-                               }
-                               trimmed = true;
-                               goto retry;
-                       } else {
-                               pr_warn("reserve caps ctx=%p ENOMEM "
-                                       "need=%d got=%d\n",
-                                       ctx, need, have + alloc);
-                               goto out_nomem;
+                               mutex_lock(&s->s_mutex);
+                               max_caps = s->s_nr_caps - (need - i);
+                               ceph_trim_caps(mdsc, s, max_caps);
+                               mutex_unlock(&s->s_mutex);
+
+                               ceph_put_mds_session(s);
+                               mutex_lock(&mdsc->mutex);
                        }
+                       trimmed = true;
+
+                       spin_lock(&mdsc->caps_list_lock);
+                       if (mdsc->caps_avail_count) {
+                               int more_have;
+                               if (mdsc->caps_avail_count >= need - i)
+                                       more_have = need - i;
+                               else
+                                       more_have = mdsc->caps_avail_count;
+
+                               i += more_have;
+                               have += more_have;
+                               mdsc->caps_avail_count -= more_have;
+                               mdsc->caps_reserve_count += more_have;
+
+                       }
+                       spin_unlock(&mdsc->caps_list_lock);
+
+                       continue;
                }
-               list_add(&cap->caps_item, &newcaps);
-               alloc++;
+
+               pr_warn("reserve caps ctx=%p ENOMEM need=%d got=%d\n",
+                       ctx, need, have + alloc);
+               goto out_nomem;
        }
        BUG_ON(have + alloc != need);
 
@@ -234,16 +252,28 @@ retry:
        return 0;
 
 out_nomem:
+
+       spin_lock(&mdsc->caps_list_lock);
+       mdsc->caps_avail_count += have;
+       mdsc->caps_reserve_count -= have;
+
        while (!list_empty(&newcaps)) {
                cap = list_first_entry(&newcaps,
                                struct ceph_cap, caps_item);
                list_del(&cap->caps_item);
-               kmem_cache_free(ceph_cap_cachep, cap);
+
+               /* Keep some preallocated caps around (ceph_min_count), to
+                * avoid lots of free/alloc churn. */
+               if (mdsc->caps_avail_count >=
+                   mdsc->caps_reserve_count + mdsc->caps_min_count) {
+                       kmem_cache_free(ceph_cap_cachep, cap);
+               } else {
+                       mdsc->caps_avail_count++;
+                       mdsc->caps_total_count++;
+                       list_add(&cap->caps_item, &mdsc->caps_list);
+               }
        }
 
-       spin_lock(&mdsc->caps_list_lock);
-       mdsc->caps_avail_count += have;
-       mdsc->caps_reserve_count -= have;
        BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
                                         mdsc->caps_reserve_count +
                                         mdsc->caps_avail_count);
@@ -254,12 +284,26 @@ out_nomem:
 int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
                        struct ceph_cap_reservation *ctx)
 {
+       int i;
+       struct ceph_cap *cap;
+
        dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count);
        if (ctx->count) {
                spin_lock(&mdsc->caps_list_lock);
                BUG_ON(mdsc->caps_reserve_count < ctx->count);
                mdsc->caps_reserve_count -= ctx->count;
-               mdsc->caps_avail_count += ctx->count;
+               if (mdsc->caps_avail_count >=
+                   mdsc->caps_reserve_count + mdsc->caps_min_count) {
+                       mdsc->caps_total_count -= ctx->count;
+                       for (i = 0; i < ctx->count; i++) {
+                               cap = list_first_entry(&mdsc->caps_list,
+                                       struct ceph_cap, caps_item);
+                               list_del(&cap->caps_item);
+                               kmem_cache_free(ceph_cap_cachep, cap);
+                       }
+               } else {
+                       mdsc->caps_avail_count += ctx->count;
+               }
                ctx->count = 0;
                dout("unreserve caps %d = %d used + %d resv + %d avail\n",
                     mdsc->caps_total_count, mdsc->caps_use_count,
@@ -285,7 +329,23 @@ struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
                        mdsc->caps_use_count++;
                        mdsc->caps_total_count++;
                        spin_unlock(&mdsc->caps_list_lock);
+               } else {
+                       spin_lock(&mdsc->caps_list_lock);
+                       if (mdsc->caps_avail_count) {
+                               BUG_ON(list_empty(&mdsc->caps_list));
+
+                               mdsc->caps_avail_count--;
+                               mdsc->caps_use_count++;
+                               cap = list_first_entry(&mdsc->caps_list,
+                                               struct ceph_cap, caps_item);
+                               list_del(&cap->caps_item);
+
+                               BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+                                      mdsc->caps_reserve_count + mdsc->caps_avail_count);
+                       }
+                       spin_unlock(&mdsc->caps_list_lock);
                }
+
                return cap;
        }
 
@@ -341,6 +401,8 @@ void ceph_reservation_status(struct ceph_fs_client *fsc,
 {
        struct ceph_mds_client *mdsc = fsc->mdsc;
 
+       spin_lock(&mdsc->caps_list_lock);
+
        if (total)
                *total = mdsc->caps_total_count;
        if (avail)
@@ -351,6 +413,8 @@ void ceph_reservation_status(struct ceph_fs_client *fsc,
                *reserved = mdsc->caps_reserve_count;
        if (min)
                *min = mdsc->caps_min_count;
+
+       spin_unlock(&mdsc->caps_list_lock);
 }
 
 /*
@@ -639,9 +703,11 @@ void ceph_add_cap(struct inode *inode,
                        }
 
                        spin_lock(&realm->inodes_with_caps_lock);
-                       ci->i_snap_realm = realm;
                        list_add(&ci->i_snap_realm_item,
                                 &realm->inodes_with_caps);
+                       ci->i_snap_realm = realm;
+                       if (realm->ino == ci->i_vino.ino)
+                               realm->inode = inode;
                        spin_unlock(&realm->inodes_with_caps_lock);
 
                        if (oldrealm)
index 644def8137547e103a67e653b9fe3832e0737eb8..abdf98deeec40244693cd731a2d6434d4ebffa09 100644 (file)
@@ -260,7 +260,7 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
                goto out;
 
        fsc->debugfs_mdsmap = debugfs_create_file("mdsmap",
-                                       0600,
+                                       0400,
                                        fsc->client->debugfs_dir,
                                        fsc,
                                        &mdsmap_show_fops);
@@ -268,7 +268,7 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
                goto out;
 
        fsc->debugfs_mds_sessions = debugfs_create_file("mds_sessions",
-                                       0600,
+                                       0400,
                                        fsc->client->debugfs_dir,
                                        fsc,
                                        &mds_sessions_show_fops);
@@ -276,7 +276,7 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
                goto out;
 
        fsc->debugfs_mdsc = debugfs_create_file("mdsc",
-                                               0600,
+                                               0400,
                                                fsc->client->debugfs_dir,
                                                fsc,
                                                &mdsc_show_fops);
@@ -292,7 +292,7 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
                goto out;
 
        fsc->debugfs_dentry_lru = debugfs_create_file("dentry_lru",
-                                       0600,
+                                       0400,
                                        fsc->client->debugfs_dir,
                                        fsc,
                                        &dentry_lru_show_fops);
index 2bdd561c4c68d38ef92b78eb4829afb88186b188..1a78dd6f8bf27655b03161de2617f7fc57f8f1b7 100644 (file)
@@ -101,18 +101,18 @@ static int fpos_cmp(loff_t l, loff_t r)
  * regardless of what dir changes take place on the
  * server.
  */
-static int note_last_dentry(struct ceph_file_info *fi, const char *name,
+static int note_last_dentry(struct ceph_dir_file_info *dfi, const char *name,
                            int len, unsigned next_offset)
 {
        char *buf = kmalloc(len+1, GFP_KERNEL);
        if (!buf)
                return -ENOMEM;
-       kfree(fi->last_name);
-       fi->last_name = buf;
-       memcpy(fi->last_name, name, len);
-       fi->last_name[len] = 0;
-       fi->next_offset = next_offset;
-       dout("note_last_dentry '%s'\n", fi->last_name);
+       kfree(dfi->last_name);
+       dfi->last_name = buf;
+       memcpy(dfi->last_name, name, len);
+       dfi->last_name[len] = 0;
+       dfi->next_offset = next_offset;
+       dout("note_last_dentry '%s'\n", dfi->last_name);
        return 0;
 }
 
@@ -174,7 +174,7 @@ __dcache_find_get_entry(struct dentry *parent, u64 idx,
 static int __dcache_readdir(struct file *file,  struct dir_context *ctx,
                            int shared_gen)
 {
-       struct ceph_file_info *fi = file->private_data;
+       struct ceph_dir_file_info *dfi = file->private_data;
        struct dentry *parent = file->f_path.dentry;
        struct inode *dir = d_inode(parent);
        struct dentry *dentry, *last = NULL;
@@ -221,7 +221,7 @@ static int __dcache_readdir(struct file *file,  struct dir_context *ctx,
                bool emit_dentry = false;
                dentry = __dcache_find_get_entry(parent, idx++, &cache_ctl);
                if (!dentry) {
-                       fi->flags |= CEPH_F_ATEND;
+                       dfi->file_info.flags |= CEPH_F_ATEND;
                        err = 0;
                        break;
                }
@@ -272,33 +272,33 @@ out:
        if (last) {
                int ret;
                di = ceph_dentry(last);
-               ret = note_last_dentry(fi, last->d_name.name, last->d_name.len,
+               ret = note_last_dentry(dfi, last->d_name.name, last->d_name.len,
                                       fpos_off(di->offset) + 1);
                if (ret < 0)
                        err = ret;
                dput(last);
                /* last_name no longer match cache index */
-               if (fi->readdir_cache_idx >= 0) {
-                       fi->readdir_cache_idx = -1;
-                       fi->dir_release_count = 0;
+               if (dfi->readdir_cache_idx >= 0) {
+                       dfi->readdir_cache_idx = -1;
+                       dfi->dir_release_count = 0;
                }
        }
        return err;
 }
 
-static bool need_send_readdir(struct ceph_file_info *fi, loff_t pos)
+static bool need_send_readdir(struct ceph_dir_file_info *dfi, loff_t pos)
 {
-       if (!fi->last_readdir)
+       if (!dfi->last_readdir)
                return true;
        if (is_hash_order(pos))
-               return !ceph_frag_contains_value(fi->frag, fpos_hash(pos));
+               return !ceph_frag_contains_value(dfi->frag, fpos_hash(pos));
        else
-               return fi->frag != fpos_frag(pos);
+               return dfi->frag != fpos_frag(pos);
 }
 
 static int ceph_readdir(struct file *file, struct dir_context *ctx)
 {
-       struct ceph_file_info *fi = file->private_data;
+       struct ceph_dir_file_info *dfi = file->private_data;
        struct inode *inode = file_inode(file);
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
@@ -309,7 +309,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
        struct ceph_mds_reply_info_parsed *rinfo;
 
        dout("readdir %p file %p pos %llx\n", inode, file, ctx->pos);
-       if (fi->flags & CEPH_F_ATEND)
+       if (dfi->file_info.flags & CEPH_F_ATEND)
                return 0;
 
        /* always start with . and .. */
@@ -350,15 +350,15 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
        /* proceed with a normal readdir */
 more:
        /* do we have the correct frag content buffered? */
-       if (need_send_readdir(fi, ctx->pos)) {
+       if (need_send_readdir(dfi, ctx->pos)) {
                struct ceph_mds_request *req;
                int op = ceph_snap(inode) == CEPH_SNAPDIR ?
                        CEPH_MDS_OP_LSSNAP : CEPH_MDS_OP_READDIR;
 
                /* discard old result, if any */
-               if (fi->last_readdir) {
-                       ceph_mdsc_put_request(fi->last_readdir);
-                       fi->last_readdir = NULL;
+               if (dfi->last_readdir) {
+                       ceph_mdsc_put_request(dfi->last_readdir);
+                       dfi->last_readdir = NULL;
                }
 
                if (is_hash_order(ctx->pos)) {
@@ -372,7 +372,7 @@ more:
                }
 
                dout("readdir fetching %llx.%llx frag %x offset '%s'\n",
-                    ceph_vinop(inode), frag, fi->last_name);
+                    ceph_vinop(inode), frag, dfi->last_name);
                req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
                if (IS_ERR(req))
                        return PTR_ERR(req);
@@ -388,8 +388,8 @@ more:
                        __set_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
                        req->r_inode_drop = CEPH_CAP_FILE_EXCL;
                }
-               if (fi->last_name) {
-                       req->r_path2 = kstrdup(fi->last_name, GFP_KERNEL);
+               if (dfi->last_name) {
+                       req->r_path2 = kstrdup(dfi->last_name, GFP_KERNEL);
                        if (!req->r_path2) {
                                ceph_mdsc_put_request(req);
                                return -ENOMEM;
@@ -399,10 +399,10 @@ more:
                                cpu_to_le32(fpos_hash(ctx->pos));
                }
 
-               req->r_dir_release_cnt = fi->dir_release_count;
-               req->r_dir_ordered_cnt = fi->dir_ordered_count;
-               req->r_readdir_cache_idx = fi->readdir_cache_idx;
-               req->r_readdir_offset = fi->next_offset;
+               req->r_dir_release_cnt = dfi->dir_release_count;
+               req->r_dir_ordered_cnt = dfi->dir_ordered_count;
+               req->r_readdir_cache_idx = dfi->readdir_cache_idx;
+               req->r_readdir_offset = dfi->next_offset;
                req->r_args.readdir.frag = cpu_to_le32(frag);
                req->r_args.readdir.flags =
                                cpu_to_le16(CEPH_READDIR_REPLY_BITFLAGS);
@@ -426,35 +426,35 @@ more:
                if (le32_to_cpu(rinfo->dir_dir->frag) != frag) {
                        frag = le32_to_cpu(rinfo->dir_dir->frag);
                        if (!rinfo->hash_order) {
-                               fi->next_offset = req->r_readdir_offset;
+                               dfi->next_offset = req->r_readdir_offset;
                                /* adjust ctx->pos to beginning of frag */
                                ctx->pos = ceph_make_fpos(frag,
-                                                         fi->next_offset,
+                                                         dfi->next_offset,
                                                          false);
                        }
                }
 
-               fi->frag = frag;
-               fi->last_readdir = req;
+               dfi->frag = frag;
+               dfi->last_readdir = req;
 
                if (test_bit(CEPH_MDS_R_DID_PREPOPULATE, &req->r_req_flags)) {
-                       fi->readdir_cache_idx = req->r_readdir_cache_idx;
-                       if (fi->readdir_cache_idx < 0) {
+                       dfi->readdir_cache_idx = req->r_readdir_cache_idx;
+                       if (dfi->readdir_cache_idx < 0) {
                                /* preclude from marking dir ordered */
-                               fi->dir_ordered_count = 0;
+                               dfi->dir_ordered_count = 0;
                        } else if (ceph_frag_is_leftmost(frag) &&
-                                  fi->next_offset == 2) {
+                                  dfi->next_offset == 2) {
                                /* note dir version at start of readdir so
                                 * we can tell if any dentries get dropped */
-                               fi->dir_release_count = req->r_dir_release_cnt;
-                               fi->dir_ordered_count = req->r_dir_ordered_cnt;
+                               dfi->dir_release_count = req->r_dir_release_cnt;
+                               dfi->dir_ordered_count = req->r_dir_ordered_cnt;
                        }
                } else {
-                       dout("readdir !did_prepopulate");
+                       dout("readdir !did_prepopulate\n");
                        /* disable readdir cache */
-                       fi->readdir_cache_idx = -1;
+                       dfi->readdir_cache_idx = -1;
                        /* preclude from marking dir complete */
-                       fi->dir_release_count = 0;
+                       dfi->dir_release_count = 0;
                }
 
                /* note next offset and last dentry name */
@@ -463,19 +463,19 @@ more:
                                        rinfo->dir_entries + (rinfo->dir_nr-1);
                        unsigned next_offset = req->r_reply_info.dir_end ?
                                        2 : (fpos_off(rde->offset) + 1);
-                       err = note_last_dentry(fi, rde->name, rde->name_len,
+                       err = note_last_dentry(dfi, rde->name, rde->name_len,
                                               next_offset);
                        if (err)
                                return err;
                } else if (req->r_reply_info.dir_end) {
-                       fi->next_offset = 2;
+                       dfi->next_offset = 2;
                        /* keep last name */
                }
        }
 
-       rinfo = &fi->last_readdir->r_reply_info;
+       rinfo = &dfi->last_readdir->r_reply_info;
        dout("readdir frag %x num %d pos %llx chunk first %llx\n",
-            fi->frag, rinfo->dir_nr, ctx->pos,
+            dfi->frag, rinfo->dir_nr, ctx->pos,
             rinfo->dir_nr ? rinfo->dir_entries[0].offset : 0LL);
 
        i = 0;
@@ -519,52 +519,55 @@ more:
                ctx->pos++;
        }
 
-       ceph_mdsc_put_request(fi->last_readdir);
-       fi->last_readdir = NULL;
+       ceph_mdsc_put_request(dfi->last_readdir);
+       dfi->last_readdir = NULL;
 
-       if (fi->next_offset > 2) {
-               frag = fi->frag;
+       if (dfi->next_offset > 2) {
+               frag = dfi->frag;
                goto more;
        }
 
        /* more frags? */
-       if (!ceph_frag_is_rightmost(fi->frag)) {
-               frag = ceph_frag_next(fi->frag);
+       if (!ceph_frag_is_rightmost(dfi->frag)) {
+               frag = ceph_frag_next(dfi->frag);
                if (is_hash_order(ctx->pos)) {
                        loff_t new_pos = ceph_make_fpos(ceph_frag_value(frag),
-                                                       fi->next_offset, true);
+                                                       dfi->next_offset, true);
                        if (new_pos > ctx->pos)
                                ctx->pos = new_pos;
                        /* keep last_name */
                } else {
-                       ctx->pos = ceph_make_fpos(frag, fi->next_offset, false);
-                       kfree(fi->last_name);
-                       fi->last_name = NULL;
+                       ctx->pos = ceph_make_fpos(frag, dfi->next_offset,
+                                                       false);
+                       kfree(dfi->last_name);
+                       dfi->last_name = NULL;
                }
                dout("readdir next frag is %x\n", frag);
                goto more;
        }
-       fi->flags |= CEPH_F_ATEND;
+       dfi->file_info.flags |= CEPH_F_ATEND;
 
        /*
         * if dir_release_count still matches the dir, no dentries
         * were released during the whole readdir, and we should have
         * the complete dir contents in our cache.
         */
-       if (atomic64_read(&ci->i_release_count) == fi->dir_release_count) {
+       if (atomic64_read(&ci->i_release_count) ==
+                                       dfi->dir_release_count) {
                spin_lock(&ci->i_ceph_lock);
-               if (fi->dir_ordered_count == atomic64_read(&ci->i_ordered_count)) {
+               if (dfi->dir_ordered_count ==
+                               atomic64_read(&ci->i_ordered_count)) {
                        dout(" marking %p complete and ordered\n", inode);
                        /* use i_size to track number of entries in
                         * readdir cache */
-                       BUG_ON(fi->readdir_cache_idx < 0);
-                       i_size_write(inode, fi->readdir_cache_idx *
+                       BUG_ON(dfi->readdir_cache_idx < 0);
+                       i_size_write(inode, dfi->readdir_cache_idx *
                                     sizeof(struct dentry*));
                } else {
                        dout(" marking %p complete\n", inode);
                }
-               __ceph_dir_set_complete(ci, fi->dir_release_count,
-                                       fi->dir_ordered_count);
+               __ceph_dir_set_complete(ci, dfi->dir_release_count,
+                                       dfi->dir_ordered_count);
                spin_unlock(&ci->i_ceph_lock);
        }
 
@@ -572,25 +575,25 @@ more:
        return 0;
 }
 
-static void reset_readdir(struct ceph_file_info *fi)
+static void reset_readdir(struct ceph_dir_file_info *dfi)
 {
-       if (fi->last_readdir) {
-               ceph_mdsc_put_request(fi->last_readdir);
-               fi->last_readdir = NULL;
+       if (dfi->last_readdir) {
+               ceph_mdsc_put_request(dfi->last_readdir);
+               dfi->last_readdir = NULL;
        }
-       kfree(fi->last_name);
-       fi->last_name = NULL;
-       fi->dir_release_count = 0;
-       fi->readdir_cache_idx = -1;
-       fi->next_offset = 2;  /* compensate for . and .. */
-       fi->flags &= ~CEPH_F_ATEND;
+       kfree(dfi->last_name);
+       dfi->last_name = NULL;
+       dfi->dir_release_count = 0;
+       dfi->readdir_cache_idx = -1;
+       dfi->next_offset = 2;  /* compensate for . and .. */
+       dfi->file_info.flags &= ~CEPH_F_ATEND;
 }
 
 /*
  * discard buffered readdir content on seekdir(0), or seek to new frag,
  * or seek prior to current chunk
  */
-static bool need_reset_readdir(struct ceph_file_info *fi, loff_t new_pos)
+static bool need_reset_readdir(struct ceph_dir_file_info *dfi, loff_t new_pos)
 {
        struct ceph_mds_reply_info_parsed *rinfo;
        loff_t chunk_offset;
@@ -599,10 +602,10 @@ static bool need_reset_readdir(struct ceph_file_info *fi, loff_t new_pos)
        if (is_hash_order(new_pos)) {
                /* no need to reset last_name for a forward seek when
                 * dentries are sotred in hash order */
-       } else if (fi->frag != fpos_frag(new_pos)) {
+       } else if (dfi->frag != fpos_frag(new_pos)) {
                return true;
        }
-       rinfo = fi->last_readdir ? &fi->last_readdir->r_reply_info : NULL;
+       rinfo = dfi->last_readdir ? &dfi->last_readdir->r_reply_info : NULL;
        if (!rinfo || !rinfo->dir_nr)
                return true;
        chunk_offset = rinfo->dir_entries[0].offset;
@@ -612,7 +615,7 @@ static bool need_reset_readdir(struct ceph_file_info *fi, loff_t new_pos)
 
 static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
 {
-       struct ceph_file_info *fi = file->private_data;
+       struct ceph_dir_file_info *dfi = file->private_data;
        struct inode *inode = file->f_mapping->host;
        loff_t retval;
 
@@ -630,20 +633,20 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
        }
 
        if (offset >= 0) {
-               if (need_reset_readdir(fi, offset)) {
+               if (need_reset_readdir(dfi, offset)) {
                        dout("dir_llseek dropping %p content\n", file);
-                       reset_readdir(fi);
+                       reset_readdir(dfi);
                } else if (is_hash_order(offset) && offset > file->f_pos) {
                        /* for hash offset, we don't know if a forward seek
                         * is within same frag */
-                       fi->dir_release_count = 0;
-                       fi->readdir_cache_idx = -1;
+                       dfi->dir_release_count = 0;
+                       dfi->readdir_cache_idx = -1;
                }
 
                if (offset != file->f_pos) {
                        file->f_pos = offset;
                        file->f_version = 0;
-                       fi->flags &= ~CEPH_F_ATEND;
+                       dfi->file_info.flags &= ~CEPH_F_ATEND;
                }
                retval = offset;
        }
@@ -824,6 +827,9 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
        if (ceph_snap(dir) != CEPH_NOSNAP)
                return -EROFS;
 
+       if (ceph_quota_is_max_files_exceeded(dir))
+               return -EDQUOT;
+
        err = ceph_pre_init_acls(dir, &mode, &acls);
        if (err < 0)
                return err;
@@ -877,6 +883,9 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
        if (ceph_snap(dir) != CEPH_NOSNAP)
                return -EROFS;
 
+       if (ceph_quota_is_max_files_exceeded(dir))
+               return -EDQUOT;
+
        dout("symlink in dir %p dentry %p to '%s'\n", dir, dentry, dest);
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SYMLINK, USE_AUTH_MDS);
        if (IS_ERR(req)) {
@@ -926,6 +935,12 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
                goto out;
        }
 
+       if (op == CEPH_MDS_OP_MKDIR &&
+           ceph_quota_is_max_files_exceeded(dir)) {
+               err = -EDQUOT;
+               goto out;
+       }
+
        mode |= S_IFDIR;
        err = ceph_pre_init_acls(dir, &mode, &acls);
        if (err < 0)
@@ -1065,6 +1080,11 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
                else
                        return -EROFS;
        }
+       /* don't allow cross-quota renames */
+       if ((old_dir != new_dir) &&
+           (!ceph_quota_is_same_realm(old_dir, new_dir)))
+               return -EXDEV;
+
        dout("rename dir %p dentry %p to dir %p dentry %p\n",
             old_dir, old_dentry, new_dir, new_dentry);
        req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
@@ -1351,7 +1371,7 @@ static void ceph_d_prune(struct dentry *dentry)
 static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size,
                             loff_t *ppos)
 {
-       struct ceph_file_info *cf = file->private_data;
+       struct ceph_dir_file_info *dfi = file->private_data;
        struct inode *inode = file_inode(file);
        struct ceph_inode_info *ci = ceph_inode(inode);
        int left;
@@ -1360,12 +1380,12 @@ static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size,
        if (!ceph_test_mount_opt(ceph_sb_to_client(inode->i_sb), DIRSTAT))
                return -EISDIR;
 
-       if (!cf->dir_info) {
-               cf->dir_info = kmalloc(bufsize, GFP_KERNEL);
-               if (!cf->dir_info)
+       if (!dfi->dir_info) {
+               dfi->dir_info = kmalloc(bufsize, GFP_KERNEL);
+               if (!dfi->dir_info)
                        return -ENOMEM;
-               cf->dir_info_len =
-                       snprintf(cf->dir_info, bufsize,
+               dfi->dir_info_len =
+                       snprintf(dfi->dir_info, bufsize,
                                "entries:   %20lld\n"
                                " files:    %20lld\n"
                                " subdirs:  %20lld\n"
@@ -1385,10 +1405,10 @@ static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size,
                                (long)ci->i_rctime.tv_nsec);
        }
 
-       if (*ppos >= cf->dir_info_len)
+       if (*ppos >= dfi->dir_info_len)
                return 0;
-       size = min_t(unsigned, size, cf->dir_info_len-*ppos);
-       left = copy_to_user(buf, cf->dir_info + *ppos, size);
+       size = min_t(unsigned, size, dfi->dir_info_len-*ppos);
+       left = copy_to_user(buf, dfi->dir_info + *ppos, size);
        if (left == size)
                return -EFAULT;
        *ppos += (size - left);
index b67eec3532a125c0c8ca0c97b7b0f22c650f7966..f85040d73e3dcaa0214196ca0c242eadd0db475d 100644 (file)
@@ -30,6 +30,8 @@ static __le32 ceph_flags_sys2wire(u32 flags)
                break;
        }
 
+       flags &= ~O_ACCMODE;
+
 #define ceph_sys2wire(a) if (flags & a) { wire_flags |= CEPH_##a; flags &= ~a; }
 
        ceph_sys2wire(O_CREAT);
@@ -41,7 +43,7 @@ static __le32 ceph_flags_sys2wire(u32 flags)
 #undef ceph_sys2wire
 
        if (flags)
-               dout("unused open flags: %x", flags);
+               dout("unused open flags: %x\n", flags);
 
        return cpu_to_le32(wire_flags);
 }
@@ -159,13 +161,50 @@ out:
        return req;
 }
 
+static int ceph_init_file_info(struct inode *inode, struct file *file,
+                                       int fmode, bool isdir)
+{
+       struct ceph_file_info *fi;
+
+       dout("%s %p %p 0%o (%s)\n", __func__, inode, file,
+                       inode->i_mode, isdir ? "dir" : "regular");
+       BUG_ON(inode->i_fop->release != ceph_release);
+
+       if (isdir) {
+               struct ceph_dir_file_info *dfi =
+                       kmem_cache_zalloc(ceph_dir_file_cachep, GFP_KERNEL);
+               if (!dfi) {
+                       ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
+                       return -ENOMEM;
+               }
+
+               file->private_data = dfi;
+               fi = &dfi->file_info;
+               dfi->next_offset = 2;
+               dfi->readdir_cache_idx = -1;
+       } else {
+               fi = kmem_cache_zalloc(ceph_file_cachep, GFP_KERNEL);
+               if (!fi) {
+                       ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
+                       return -ENOMEM;
+               }
+
+               file->private_data = fi;
+       }
+
+       fi->fmode = fmode;
+       spin_lock_init(&fi->rw_contexts_lock);
+       INIT_LIST_HEAD(&fi->rw_contexts);
+
+       return 0;
+}
+
 /*
  * initialize private struct file data.
  * if we fail, clean up by dropping fmode reference on the ceph_inode
  */
 static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
 {
-       struct ceph_file_info *cf;
        int ret = 0;
 
        switch (inode->i_mode & S_IFMT) {
@@ -173,22 +212,10 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
                ceph_fscache_register_inode_cookie(inode);
                ceph_fscache_file_set_cookie(inode, file);
        case S_IFDIR:
-               dout("init_file %p %p 0%o (regular)\n", inode, file,
-                    inode->i_mode);
-               cf = kmem_cache_zalloc(ceph_file_cachep, GFP_KERNEL);
-               if (!cf) {
-                       ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
-                       return -ENOMEM;
-               }
-               cf->fmode = fmode;
-
-               spin_lock_init(&cf->rw_contexts_lock);
-               INIT_LIST_HEAD(&cf->rw_contexts);
-
-               cf->next_offset = 2;
-               cf->readdir_cache_idx = -1;
-               file->private_data = cf;
-               BUG_ON(inode->i_fop->release != ceph_release);
+               ret = ceph_init_file_info(inode, file, fmode,
+                                               S_ISDIR(inode->i_mode));
+               if (ret)
+                       return ret;
                break;
 
        case S_IFLNK:
@@ -278,11 +305,11 @@ int ceph_open(struct inode *inode, struct file *file)
        struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
        struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
-       struct ceph_file_info *cf = file->private_data;
+       struct ceph_file_info *fi = file->private_data;
        int err;
        int flags, fmode, wanted;
 
-       if (cf) {
+       if (fi) {
                dout("open file %p is already opened\n", file);
                return 0;
        }
@@ -375,7 +402,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
        struct ceph_mds_request *req;
        struct dentry *dn;
        struct ceph_acls_info acls = {};
-       int mask;
+       int mask;
        int err;
 
        dout("atomic_open %p dentry %p '%pd' %s flags %d mode 0%o\n",
@@ -386,6 +413,8 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
                return -ENAMETOOLONG;
 
        if (flags & O_CREAT) {
+               if (ceph_quota_is_max_files_exceeded(dir))
+                       return -EDQUOT;
                err = ceph_pre_init_acls(dir, &mode, &acls);
                if (err < 0)
                        return err;
@@ -460,16 +489,27 @@ out_acl:
 int ceph_release(struct inode *inode, struct file *file)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_file_info *cf = file->private_data;
 
-       dout("release inode %p file %p\n", inode, file);
-       ceph_put_fmode(ci, cf->fmode);
-       if (cf->last_readdir)
-               ceph_mdsc_put_request(cf->last_readdir);
-       kfree(cf->last_name);
-       kfree(cf->dir_info);
-       WARN_ON(!list_empty(&cf->rw_contexts));
-       kmem_cache_free(ceph_file_cachep, cf);
+       if (S_ISDIR(inode->i_mode)) {
+               struct ceph_dir_file_info *dfi = file->private_data;
+               dout("release inode %p dir file %p\n", inode, file);
+               WARN_ON(!list_empty(&dfi->file_info.rw_contexts));
+
+               ceph_put_fmode(ci, dfi->file_info.fmode);
+
+               if (dfi->last_readdir)
+                       ceph_mdsc_put_request(dfi->last_readdir);
+               kfree(dfi->last_name);
+               kfree(dfi->dir_info);
+               kmem_cache_free(ceph_dir_file_cachep, dfi);
+       } else {
+               struct ceph_file_info *fi = file->private_data;
+               dout("release inode %p regular file %p\n", inode, file);
+               WARN_ON(!list_empty(&fi->rw_contexts));
+
+               ceph_put_fmode(ci, fi->fmode);
+               kmem_cache_free(ceph_file_cachep, fi);
+       }
 
        /* wake up anyone waiting for caps on this inode */
        wake_up_all(&ci->i_cap_wq);
@@ -1338,6 +1378,11 @@ retry_snap:
 
        pos = iocb->ki_pos;
        count = iov_iter_count(from);
+       if (ceph_quota_is_max_bytes_exceeded(inode, pos + count)) {
+               err = -EDQUOT;
+               goto out;
+       }
+
        err = file_remove_privs(file);
        if (err)
                goto out;
@@ -1419,6 +1464,7 @@ retry_snap:
 
        if (written >= 0) {
                int dirty;
+
                spin_lock(&ci->i_ceph_lock);
                ci->i_inline_version = CEPH_INLINE_NONE;
                dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
@@ -1426,6 +1472,8 @@ retry_snap:
                spin_unlock(&ci->i_ceph_lock);
                if (dirty)
                        __mark_inode_dirty(inode, dirty);
+               if (ceph_quota_is_max_bytes_approaching(inode, iocb->ki_pos))
+                       ceph_check_caps(ci, CHECK_CAPS_NODELAY, NULL);
        }
 
        dout("aio_write %p %llx.%llx %llu~%u  dropping cap refs on %s\n",
@@ -1668,6 +1716,12 @@ static long ceph_fallocate(struct file *file, int mode,
                goto unlock;
        }
 
+       if (!(mode & (FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE)) &&
+           ceph_quota_is_max_bytes_exceeded(inode, offset + length)) {
+               ret = -EDQUOT;
+               goto unlock;
+       }
+
        if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) &&
            !(mode & FALLOC_FL_PUNCH_HOLE)) {
                ret = -ENOSPC;
@@ -1716,6 +1770,9 @@ static long ceph_fallocate(struct file *file, int mode,
                spin_unlock(&ci->i_ceph_lock);
                if (dirty)
                        __mark_inode_dirty(inode, dirty);
+               if ((endoff > size) &&
+                   ceph_quota_is_max_bytes_approaching(inode, endoff))
+                       ceph_check_caps(ci, CHECK_CAPS_NODELAY, NULL);
        }
 
        ceph_put_cap_refs(ci, got);
index c6ec5aa461002796325e84796199dd0ed2dd0a67..8bf60250309e359ffd6a34f89c75991b3a1c3c48 100644 (file)
@@ -441,6 +441,9 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
        atomic64_set(&ci->i_complete_seq[1], 0);
        ci->i_symlink = NULL;
 
+       ci->i_max_bytes = 0;
+       ci->i_max_files = 0;
+
        memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout));
        RCU_INIT_POINTER(ci->i_layout.pool_ns, NULL);
 
@@ -536,6 +539,9 @@ void ceph_destroy_inode(struct inode *inode)
 
        ceph_queue_caps_release(inode);
 
+       if (__ceph_has_any_quota(ci))
+               ceph_adjust_quota_realms_count(inode, false);
+
        /*
         * we may still have a snap_realm reference if there are stray
         * caps in i_snap_caps.
@@ -548,6 +554,9 @@ void ceph_destroy_inode(struct inode *inode)
                dout(" dropping residual ref to snap realm %p\n", realm);
                spin_lock(&realm->inodes_with_caps_lock);
                list_del_init(&ci->i_snap_realm_item);
+               ci->i_snap_realm = NULL;
+               if (realm->ino == ci->i_vino.ino)
+                       realm->inode = NULL;
                spin_unlock(&realm->inodes_with_caps_lock);
                ceph_put_snap_realm(mdsc, realm);
        }
@@ -790,6 +799,8 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
        inode->i_rdev = le32_to_cpu(info->rdev);
        inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;
 
+       __ceph_update_quota(ci, iinfo->max_bytes, iinfo->max_files);
+
        if ((new_version || (new_issued & CEPH_CAP_AUTH_SHARED)) &&
            (issued & CEPH_CAP_AUTH_EXCL) == 0) {
                inode->i_mode = le32_to_cpu(info->mode);
@@ -1867,20 +1878,9 @@ retry:
         * possibly truncate them.. so write AND block!
         */
        if (ci->i_wrbuffer_ref_head < ci->i_wrbuffer_ref) {
-               struct ceph_cap_snap *capsnap;
-               to = ci->i_truncate_size;
-               list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
-                       // MDS should have revoked Frw caps
-                       WARN_ON_ONCE(capsnap->writing);
-                       if (capsnap->dirty_pages && capsnap->size > to)
-                               to = capsnap->size;
-               }
                spin_unlock(&ci->i_ceph_lock);
                dout("__do_pending_vmtruncate %p flushing snaps first\n",
                     inode);
-
-               truncate_pagecache(inode, to);
-
                filemap_write_and_wait_range(&inode->i_data, 0,
                                             inode->i_sb->s_maxbytes);
                goto retry;
@@ -2152,6 +2152,10 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
        if (err != 0)
                return err;
 
+       if ((attr->ia_valid & ATTR_SIZE) &&
+           ceph_quota_is_max_bytes_exceeded(inode, attr->ia_size))
+               return -EDQUOT;
+
        err = __ceph_setattr(inode, attr);
 
        if (err >= 0 && (attr->ia_valid & ATTR_MODE))
index 851aa69ec8f0dae6ec4837b2dc3807c4420cc751..c90f03beb15d0b70ef0455015cf2343746227574 100644 (file)
@@ -5,7 +5,7 @@
 #include "super.h"
 #include "mds_client.h"
 #include "ioctl.h"
-
+#include <linux/ceph/striper.h>
 
 /*
  * ioctls
@@ -185,7 +185,7 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
                &ceph_sb_to_client(inode->i_sb)->client->osdc;
        struct ceph_object_locator oloc;
        CEPH_DEFINE_OID_ONSTACK(oid);
-       u64 len = 1, olen;
+       u32 xlen;
        u64 tmp;
        struct ceph_pg pgid;
        int r;
@@ -195,13 +195,8 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
                return -EFAULT;
 
        down_read(&osdc->lock);
-       r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, len,
-                                         &dl.object_no, &dl.object_offset,
-                                         &olen);
-       if (r < 0) {
-               up_read(&osdc->lock);
-               return -EIO;
-       }
+       ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, 1,
+                                     &dl.object_no, &dl.object_offset, &xlen);
        dl.file_offset -= dl.object_offset;
        dl.object_size = ci->i_layout.object_size;
        dl.block_size = ci->i_layout.stripe_unit;
index 9e66f69ee8a5ecc9e8455465f232bf70529e59a8..9dae2ec7e1fa89705f649b1c3349520b8bb23543 100644 (file)
@@ -95,7 +95,7 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct inode *inode,
        owner = secure_addr(fl->fl_owner);
 
        dout("ceph_lock_message: rule: %d, op: %d, owner: %llx, pid: %llu, "
-            "start: %llu, length: %llu, wait: %d, type: %d", (int)lock_type,
+            "start: %llu, length: %llu, wait: %d, type: %d\n", (int)lock_type,
             (int)operation, owner, (u64)fl->fl_pid, fl->fl_start, length,
             wait, fl->fl_type);
 
@@ -132,7 +132,7 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct inode *inode,
        }
        ceph_mdsc_put_request(req);
        dout("ceph_lock_message: rule: %d, op: %d, pid: %llu, start: %llu, "
-            "length: %llu, wait: %d, type: %d, err code %d", (int)lock_type,
+            "length: %llu, wait: %d, type: %d, err code %d\n", (int)lock_type,
             (int)operation, (u64)fl->fl_pid, fl->fl_start,
             length, wait, fl->fl_type, err);
        return err;
@@ -226,7 +226,7 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
        if (__mandatory_lock(file->f_mapping->host) && fl->fl_type != F_UNLCK)
                return -ENOLCK;
 
-       dout("ceph_lock, fl_owner: %p", fl->fl_owner);
+       dout("ceph_lock, fl_owner: %p\n", fl->fl_owner);
 
        /* set wait bit as appropriate, then make command as Ceph expects it*/
        if (IS_GETLK(cmd))
@@ -264,7 +264,7 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
        err = ceph_lock_message(CEPH_LOCK_FCNTL, op, inode, lock_cmd, wait, fl);
        if (!err) {
                if (op == CEPH_MDS_OP_SETFILELOCK) {
-                       dout("mds locked, locking locally");
+                       dout("mds locked, locking locally\n");
                        err = posix_lock_file(file, fl, NULL);
                        if (err) {
                                /* undo! This should only happen if
@@ -272,7 +272,7 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
                                 * deadlock. */
                                ceph_lock_message(CEPH_LOCK_FCNTL, op, inode,
                                                  CEPH_LOCK_UNLOCK, 0, fl);
-                               dout("got %d on posix_lock_file, undid lock",
+                               dout("got %d on posix_lock_file, undid lock\n",
                                     err);
                        }
                }
@@ -294,7 +294,7 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
        if (fl->fl_type & LOCK_MAND)
                return -EOPNOTSUPP;
 
-       dout("ceph_flock, fl_file: %p", fl->fl_file);
+       dout("ceph_flock, fl_file: %p\n", fl->fl_file);
 
        spin_lock(&ci->i_ceph_lock);
        if (ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) {
@@ -329,7 +329,7 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
                        ceph_lock_message(CEPH_LOCK_FLOCK,
                                          CEPH_MDS_OP_SETFILELOCK,
                                          inode, CEPH_LOCK_UNLOCK, 0, fl);
-                       dout("got %d on locks_lock_file_wait, undid lock", err);
+                       dout("got %d on locks_lock_file_wait, undid lock\n", err);
                }
        }
        return err;
@@ -356,7 +356,7 @@ void ceph_count_locks(struct inode *inode, int *fcntl_count, int *flock_count)
                        ++(*flock_count);
                spin_unlock(&ctx->flc_lock);
        }
-       dout("counted %d flock locks and %d fcntl locks",
+       dout("counted %d flock locks and %d fcntl locks\n",
             *flock_count, *fcntl_count);
 }
 
@@ -384,7 +384,7 @@ static int lock_to_ceph_filelock(struct file_lock *lock,
                cephlock->type = CEPH_LOCK_UNLOCK;
                break;
        default:
-               dout("Have unknown lock type %d", lock->fl_type);
+               dout("Have unknown lock type %d\n", lock->fl_type);
                err = -EINVAL;
        }
 
@@ -407,7 +407,7 @@ int ceph_encode_locks_to_buffer(struct inode *inode,
        int seen_flock = 0;
        int l = 0;
 
-       dout("encoding %d flock and %d fcntl locks", num_flock_locks,
+       dout("encoding %d flock and %d fcntl locks\n", num_flock_locks,
             num_fcntl_locks);
 
        if (!ctx)
index 2e8f90f96540291ab412703133d2a4278edbac71..5ece2e6ad1548e0893ee9734132a64bb66ffc89e 100644 (file)
@@ -100,6 +100,26 @@ static int parse_reply_info_in(void **p, void *end,
        } else
                info->inline_version = CEPH_INLINE_NONE;
 
+       if (features & CEPH_FEATURE_MDS_QUOTA) {
+               u8 struct_v, struct_compat;
+               u32 struct_len;
+
+               /*
+                * both struct_v and struct_compat are expected to be >= 1
+                */
+               ceph_decode_8_safe(p, end, struct_v, bad);
+               ceph_decode_8_safe(p, end, struct_compat, bad);
+               if (!struct_v || !struct_compat)
+                       goto bad;
+               ceph_decode_32_safe(p, end, struct_len, bad);
+               ceph_decode_need(p, end, struct_len, bad);
+               ceph_decode_64_safe(p, end, info->max_bytes, bad);
+               ceph_decode_64_safe(p, end, info->max_files, bad);
+       } else {
+               info->max_bytes = 0;
+               info->max_files = 0;
+       }
+
        info->pool_ns_len = 0;
        info->pool_ns_data = NULL;
        if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
@@ -384,7 +404,7 @@ static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
                     refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
                return s;
        } else {
-               dout("mdsc get_session %p 0 -- FAIL", s);
+               dout("mdsc get_session %p 0 -- FAIL\n", s);
                return NULL;
        }
 }
@@ -419,9 +439,10 @@ struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
 
 static bool __have_session(struct ceph_mds_client *mdsc, int mds)
 {
-       if (mds >= mdsc->max_sessions)
+       if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
                return false;
-       return mdsc->sessions[mds];
+       else
+               return true;
 }
 
 static int __verify_registered_session(struct ceph_mds_client *mdsc,
@@ -448,6 +469,25 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
        s = kzalloc(sizeof(*s), GFP_NOFS);
        if (!s)
                return ERR_PTR(-ENOMEM);
+
+       if (mds >= mdsc->max_sessions) {
+               int newmax = 1 << get_count_order(mds + 1);
+               struct ceph_mds_session **sa;
+
+               dout("%s: realloc to %d\n", __func__, newmax);
+               sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
+               if (!sa)
+                       goto fail_realloc;
+               if (mdsc->sessions) {
+                       memcpy(sa, mdsc->sessions,
+                              mdsc->max_sessions * sizeof(void *));
+                       kfree(mdsc->sessions);
+               }
+               mdsc->sessions = sa;
+               mdsc->max_sessions = newmax;
+       }
+
+       dout("%s: mds%d\n", __func__, mds);
        s->s_mdsc = mdsc;
        s->s_mds = mds;
        s->s_state = CEPH_MDS_SESSION_NEW;
@@ -476,23 +516,6 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
        INIT_LIST_HEAD(&s->s_cap_releases);
        INIT_LIST_HEAD(&s->s_cap_flushing);
 
-       dout("register_session mds%d\n", mds);
-       if (mds >= mdsc->max_sessions) {
-               int newmax = 1 << get_count_order(mds+1);
-               struct ceph_mds_session **sa;
-
-               dout("register_session realloc to %d\n", newmax);
-               sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
-               if (!sa)
-                       goto fail_realloc;
-               if (mdsc->sessions) {
-                       memcpy(sa, mdsc->sessions,
-                              mdsc->max_sessions * sizeof(void *));
-                       kfree(mdsc->sessions);
-               }
-               mdsc->sessions = sa;
-               mdsc->max_sessions = newmax;
-       }
        mdsc->sessions[mds] = s;
        atomic_inc(&mdsc->num_sessions);
        refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
@@ -2531,10 +2554,10 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
         * Otherwise we just have to return an ESTALE
         */
        if (result == -ESTALE) {
-               dout("got ESTALE on request %llu", req->r_tid);
+               dout("got ESTALE on request %llu\n", req->r_tid);
                req->r_resend_mds = -1;
                if (req->r_direct_mode != USE_AUTH_MDS) {
-                       dout("not using auth, setting for that now");
+                       dout("not using auth, setting for that now\n");
                        req->r_direct_mode = USE_AUTH_MDS;
                        __do_request(mdsc, req);
                        mutex_unlock(&mdsc->mutex);
@@ -2542,13 +2565,13 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
                } else  {
                        int mds = __choose_mds(mdsc, req);
                        if (mds >= 0 && mds != req->r_session->s_mds) {
-                               dout("but auth changed, so resending");
+                               dout("but auth changed, so resending\n");
                                __do_request(mdsc, req);
                                mutex_unlock(&mdsc->mutex);
                                goto out;
                        }
                }
-               dout("have to return ESTALE on request %llu", req->r_tid);
+               dout("have to return ESTALE on request %llu\n", req->r_tid);
        }
 
 
@@ -3470,13 +3493,12 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
 }
 
 /*
- * drop all leases (and dentry refs) in preparation for umount
+ * lock unlock sessions, to wait ongoing session activities
  */
-static void drop_leases(struct ceph_mds_client *mdsc)
+static void lock_unlock_sessions(struct ceph_mds_client *mdsc)
 {
        int i;
 
-       dout("drop_leases\n");
        mutex_lock(&mdsc->mutex);
        for (i = 0; i < mdsc->max_sessions; i++) {
                struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
@@ -3572,7 +3594,6 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
        if (!mdsc)
                return -ENOMEM;
        mdsc->fsc = fsc;
-       fsc->mdsc = mdsc;
        mutex_init(&mdsc->mutex);
        mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
        if (!mdsc->mdsmap) {
@@ -3580,6 +3601,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
                return -ENOMEM;
        }
 
+       fsc->mdsc = mdsc;
        init_completion(&mdsc->safe_umount_waiters);
        init_waitqueue_head(&mdsc->session_close_wq);
        INIT_LIST_HEAD(&mdsc->waiting_for_map);
@@ -3587,6 +3609,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
        atomic_set(&mdsc->num_sessions, 0);
        mdsc->max_sessions = 0;
        mdsc->stopping = 0;
+       atomic64_set(&mdsc->quotarealms_count, 0);
        mdsc->last_snap_seq = 0;
        init_rwsem(&mdsc->snap_rwsem);
        mdsc->snap_realms = RB_ROOT;
@@ -3660,7 +3683,7 @@ void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
        dout("pre_umount\n");
        mdsc->stopping = 1;
 
-       drop_leases(mdsc);
+       lock_unlock_sessions(mdsc);
        ceph_flush_dirty_caps(mdsc);
        wait_requests(mdsc);
 
@@ -3858,6 +3881,9 @@ void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
        struct ceph_mds_client *mdsc = fsc->mdsc;
        dout("mdsc_destroy %p\n", mdsc);
 
+       if (!mdsc)
+               return;
+
        /* flush out any connection work with references to us */
        ceph_msgr_flush();
 
@@ -4077,6 +4103,9 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
        case CEPH_MSG_CLIENT_LEASE:
                handle_lease(mdsc, s, msg);
                break;
+       case CEPH_MSG_CLIENT_QUOTA:
+               ceph_handle_quota(mdsc, s, msg);
+               break;
 
        default:
                pr_err("received unknown message type %d %s\n", type,
index 71e3b783ee6fae5a64ae0a4a3eb1471c9fd9ed44..2ec3b5b350671a1fa1e574ae67d6a67567762dd8 100644 (file)
@@ -49,6 +49,8 @@ struct ceph_mds_reply_info_in {
        char *inline_data;
        u32 pool_ns_len;
        char *pool_ns_data;
+       u64 max_bytes;
+       u64 max_files;
 };
 
 struct ceph_mds_reply_dir_entry {
@@ -312,6 +314,8 @@ struct ceph_mds_client {
        int                     max_sessions;  /* len of s_mds_sessions */
        int                     stopping;      /* true if shutting down */
 
+       atomic64_t              quotarealms_count; /* # realms with quota */
+
        /*
         * snap_rwsem will cover cap linkage into snaprealms, and
         * realm snap contexts.  (later, we can do per-realm snap
diff --git a/fs/ceph/quota.c b/fs/ceph/quota.c
new file mode 100644 (file)
index 0000000..242bfa5
--- /dev/null
@@ -0,0 +1,361 @@
+// SPDX-License-Identifier: GPL-2.0
+/*
+ * quota.c - CephFS quota
+ *
+ * Copyright (C) 2017-2018 SUSE
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <linux/statfs.h>
+
+#include "super.h"
+#include "mds_client.h"
+
+void ceph_adjust_quota_realms_count(struct inode *inode, bool inc)
+{
+       struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
+       if (inc)
+               atomic64_inc(&mdsc->quotarealms_count);
+       else
+               atomic64_dec(&mdsc->quotarealms_count);
+}
+
+static inline bool ceph_has_realms_with_quotas(struct inode *inode)
+{
+       struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
+       return atomic64_read(&mdsc->quotarealms_count) > 0;
+}
+
+void ceph_handle_quota(struct ceph_mds_client *mdsc,
+                      struct ceph_mds_session *session,
+                      struct ceph_msg *msg)
+{
+       struct super_block *sb = mdsc->fsc->sb;
+       struct ceph_mds_quota *h = msg->front.iov_base;
+       struct ceph_vino vino;
+       struct inode *inode;
+       struct ceph_inode_info *ci;
+
+       if (msg->front.iov_len != sizeof(*h)) {
+               pr_err("%s corrupt message mds%d len %d\n", __func__,
+                      session->s_mds, (int)msg->front.iov_len);
+               ceph_msg_dump(msg);
+               return;
+       }
+
+       /* increment msg sequence number */
+       mutex_lock(&session->s_mutex);
+       session->s_seq++;
+       mutex_unlock(&session->s_mutex);
+
+       /* lookup inode */
+       vino.ino = le64_to_cpu(h->ino);
+       vino.snap = CEPH_NOSNAP;
+       inode = ceph_find_inode(sb, vino);
+       if (!inode) {
+               pr_warn("Failed to find inode %llu\n", vino.ino);
+               return;
+       }
+       ci = ceph_inode(inode);
+
+       spin_lock(&ci->i_ceph_lock);
+       ci->i_rbytes = le64_to_cpu(h->rbytes);
+       ci->i_rfiles = le64_to_cpu(h->rfiles);
+       ci->i_rsubdirs = le64_to_cpu(h->rsubdirs);
+       __ceph_update_quota(ci, le64_to_cpu(h->max_bytes),
+                           le64_to_cpu(h->max_files));
+       spin_unlock(&ci->i_ceph_lock);
+
+       iput(inode);
+}
+
+/*
+ * This function walks through the snaprealm for an inode and returns the
+ * ceph_snap_realm for the first snaprealm that has quotas set (either max_files
+ * or max_bytes).  If the root is reached, return the root ceph_snap_realm
+ * instead.
+ *
+ * Note that the caller is responsible for calling ceph_put_snap_realm() on the
+ * returned realm.
+ */
+static struct ceph_snap_realm *get_quota_realm(struct ceph_mds_client *mdsc,
+                                              struct inode *inode)
+{
+       struct ceph_inode_info *ci = NULL;
+       struct ceph_snap_realm *realm, *next;
+       struct inode *in;
+       bool has_quota;
+
+       if (ceph_snap(inode) != CEPH_NOSNAP)
+               return NULL;
+
+       realm = ceph_inode(inode)->i_snap_realm;
+       if (realm)
+               ceph_get_snap_realm(mdsc, realm);
+       else
+               pr_err_ratelimited("get_quota_realm: ino (%llx.%llx) "
+                                  "null i_snap_realm\n", ceph_vinop(inode));
+       while (realm) {
+               spin_lock(&realm->inodes_with_caps_lock);
+               in = realm->inode ? igrab(realm->inode) : NULL;
+               spin_unlock(&realm->inodes_with_caps_lock);
+               if (!in)
+                       break;
+
+               ci = ceph_inode(in);
+               has_quota = __ceph_has_any_quota(ci);
+               iput(in);
+
+               next = realm->parent;
+               if (has_quota || !next)
+                      return realm;
+
+               ceph_get_snap_realm(mdsc, next);
+               ceph_put_snap_realm(mdsc, realm);
+               realm = next;
+       }
+       if (realm)
+               ceph_put_snap_realm(mdsc, realm);
+
+       return NULL;
+}
+
+bool ceph_quota_is_same_realm(struct inode *old, struct inode *new)
+{
+       struct ceph_mds_client *mdsc = ceph_inode_to_client(old)->mdsc;
+       struct ceph_snap_realm *old_realm, *new_realm;
+       bool is_same;
+
+       down_read(&mdsc->snap_rwsem);
+       old_realm = get_quota_realm(mdsc, old);
+       new_realm = get_quota_realm(mdsc, new);
+       is_same = (old_realm == new_realm);
+       up_read(&mdsc->snap_rwsem);
+
+       if (old_realm)
+               ceph_put_snap_realm(mdsc, old_realm);
+       if (new_realm)
+               ceph_put_snap_realm(mdsc, new_realm);
+
+       return is_same;
+}
+
+enum quota_check_op {
+       QUOTA_CHECK_MAX_FILES_OP,       /* check quota max_files limit */
+       QUOTA_CHECK_MAX_BYTES_OP,       /* check quota max_files limit */
+       QUOTA_CHECK_MAX_BYTES_APPROACHING_OP    /* check if quota max_files
+                                                  limit is approaching */
+};
+
+/*
+ * check_quota_exceeded() will walk up the snaprealm hierarchy and, for each
+ * realm, it will execute quota check operation defined by the 'op' parameter.
+ * The snaprealm walk is interrupted if the quota check detects that the quota
+ * is exceeded or if the root inode is reached.
+ */
+static bool check_quota_exceeded(struct inode *inode, enum quota_check_op op,
+                                loff_t delta)
+{
+       struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
+       struct ceph_inode_info *ci;
+       struct ceph_snap_realm *realm, *next;
+       struct inode *in;
+       u64 max, rvalue;
+       bool exceeded = false;
+
+       if (ceph_snap(inode) != CEPH_NOSNAP)
+               return false;
+
+       down_read(&mdsc->snap_rwsem);
+       realm = ceph_inode(inode)->i_snap_realm;
+       if (realm)
+               ceph_get_snap_realm(mdsc, realm);
+       else
+               pr_err_ratelimited("check_quota_exceeded: ino (%llx.%llx) "
+                                  "null i_snap_realm\n", ceph_vinop(inode));
+       while (realm) {
+               spin_lock(&realm->inodes_with_caps_lock);
+               in = realm->inode ? igrab(realm->inode) : NULL;
+               spin_unlock(&realm->inodes_with_caps_lock);
+               if (!in)
+                       break;
+
+               ci = ceph_inode(in);
+               spin_lock(&ci->i_ceph_lock);
+               if (op == QUOTA_CHECK_MAX_FILES_OP) {
+                       max = ci->i_max_files;
+                       rvalue = ci->i_rfiles + ci->i_rsubdirs;
+               } else {
+                       max = ci->i_max_bytes;
+                       rvalue = ci->i_rbytes;
+               }
+               spin_unlock(&ci->i_ceph_lock);
+               switch (op) {
+               case QUOTA_CHECK_MAX_FILES_OP:
+                       exceeded = (max && (rvalue >= max));
+                       break;
+               case QUOTA_CHECK_MAX_BYTES_OP:
+                       exceeded = (max && (rvalue + delta > max));
+                       break;
+               case QUOTA_CHECK_MAX_BYTES_APPROACHING_OP:
+                       if (max) {
+                               if (rvalue >= max)
+                                       exceeded = true;
+                               else {
+                                       /*
+                                        * when we're writing more that 1/16th
+                                        * of the available space
+                                        */
+                                       exceeded =
+                                               (((max - rvalue) >> 4) < delta);
+                               }
+                       }
+                       break;
+               default:
+                       /* Shouldn't happen */
+                       pr_warn("Invalid quota check op (%d)\n", op);
+                       exceeded = true; /* Just break the loop */
+               }
+               iput(in);
+
+               next = realm->parent;
+               if (exceeded || !next)
+                       break;
+               ceph_get_snap_realm(mdsc, next);
+               ceph_put_snap_realm(mdsc, realm);
+               realm = next;
+       }
+       ceph_put_snap_realm(mdsc, realm);
+       up_read(&mdsc->snap_rwsem);
+
+       return exceeded;
+}
+
+/*
+ * ceph_quota_is_max_files_exceeded - check if we can create a new file
+ * @inode:     directory where a new file is being created
+ *
+ * This functions returns true is max_files quota allows a new file to be
+ * created.  It is necessary to walk through the snaprealm hierarchy (until the
+ * FS root) to check all realms with quotas set.
+ */
+bool ceph_quota_is_max_files_exceeded(struct inode *inode)
+{
+       if (!ceph_has_realms_with_quotas(inode))
+               return false;
+
+       WARN_ON(!S_ISDIR(inode->i_mode));
+
+       return check_quota_exceeded(inode, QUOTA_CHECK_MAX_FILES_OP, 0);
+}
+
+/*
+ * ceph_quota_is_max_bytes_exceeded - check if we can write to a file
+ * @inode:     inode being written
+ * @newsize:   new size if write succeeds
+ *
+ * This functions returns true is max_bytes quota allows a file size to reach
+ * @newsize; it returns false otherwise.
+ */
+bool ceph_quota_is_max_bytes_exceeded(struct inode *inode, loff_t newsize)
+{
+       loff_t size = i_size_read(inode);
+
+       if (!ceph_has_realms_with_quotas(inode))
+               return false;
+
+       /* return immediately if we're decreasing file size */
+       if (newsize <= size)
+               return false;
+
+       return check_quota_exceeded(inode, QUOTA_CHECK_MAX_BYTES_OP, (newsize - size));
+}
+
+/*
+ * ceph_quota_is_max_bytes_approaching - check if we're reaching max_bytes
+ * @inode:     inode being written
+ * @newsize:   new size if write succeeds
+ *
+ * This function returns true if the new file size @newsize will be consuming
+ * more than 1/16th of the available quota space; it returns false otherwise.
+ */
+bool ceph_quota_is_max_bytes_approaching(struct inode *inode, loff_t newsize)
+{
+       loff_t size = ceph_inode(inode)->i_reported_size;
+
+       if (!ceph_has_realms_with_quotas(inode))
+               return false;
+
+       /* return immediately if we're decreasing file size */
+       if (newsize <= size)
+               return false;
+
+       return check_quota_exceeded(inode, QUOTA_CHECK_MAX_BYTES_APPROACHING_OP,
+                                   (newsize - size));
+}
+
+/*
+ * ceph_quota_update_statfs - if root has quota update statfs with quota status
+ * @fsc:       filesystem client instance
+ * @buf:       statfs to update
+ *
+ * If the mounted filesystem root has max_bytes quota set, update the filesystem
+ * statistics with the quota status.
+ *
+ * This function returns true if the stats have been updated, false otherwise.
+ */
+bool ceph_quota_update_statfs(struct ceph_fs_client *fsc, struct kstatfs *buf)
+{
+       struct ceph_mds_client *mdsc = fsc->mdsc;
+       struct ceph_inode_info *ci;
+       struct ceph_snap_realm *realm;
+       struct inode *in;
+       u64 total = 0, used, free;
+       bool is_updated = false;
+
+       down_read(&mdsc->snap_rwsem);
+       realm = get_quota_realm(mdsc, d_inode(fsc->sb->s_root));
+       up_read(&mdsc->snap_rwsem);
+       if (!realm)
+               return false;
+
+       spin_lock(&realm->inodes_with_caps_lock);
+       in = realm->inode ? igrab(realm->inode) : NULL;
+       spin_unlock(&realm->inodes_with_caps_lock);
+       if (in) {
+               ci = ceph_inode(in);
+               spin_lock(&ci->i_ceph_lock);
+               if (ci->i_max_bytes) {
+                       total = ci->i_max_bytes >> CEPH_BLOCK_SHIFT;
+                       used = ci->i_rbytes >> CEPH_BLOCK_SHIFT;
+                       /* It is possible for a quota to be exceeded.
+                        * Report 'zero' in that case
+                        */
+                       free = total > used ? total - used : 0;
+               }
+               spin_unlock(&ci->i_ceph_lock);
+               if (total) {
+                       buf->f_blocks = total;
+                       buf->f_bfree = free;
+                       buf->f_bavail = free;
+                       is_updated = true;
+               }
+               iput(in);
+       }
+       ceph_put_snap_realm(mdsc, realm);
+
+       return is_updated;
+}
+
index 07cf95e6413d775d01c383f3d3d7e0e8e4879910..041c27ea8de155a0002bdb5af25eb2fc5f8e6efa 100644 (file)
@@ -931,6 +931,8 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
                        list_add(&ci->i_snap_realm_item,
                                 &realm->inodes_with_caps);
                        ci->i_snap_realm = realm;
+                       if (realm->ino == ci->i_vino.ino)
+                                realm->inode = inode;
                        spin_unlock(&realm->inodes_with_caps_lock);
 
                        spin_unlock(&ci->i_ceph_lock);
index fb2bc9c15a2378ceb5712269ab93e677c360f5d0..b33082e6878f1ca1ba8820716dafb1924d77855d 100644 (file)
@@ -76,9 +76,18 @@ static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf)
         */
        buf->f_bsize = 1 << CEPH_BLOCK_SHIFT;
        buf->f_frsize = 1 << CEPH_BLOCK_SHIFT;
-       buf->f_blocks = le64_to_cpu(st.kb) >> (CEPH_BLOCK_SHIFT-10);
-       buf->f_bfree = le64_to_cpu(st.kb_avail) >> (CEPH_BLOCK_SHIFT-10);
-       buf->f_bavail = le64_to_cpu(st.kb_avail) >> (CEPH_BLOCK_SHIFT-10);
+
+       /*
+        * By default use root quota for stats; fallback to overall filesystem
+        * usage if using 'noquotadf' mount option or if the root dir doesn't
+        * have max_bytes quota set.
+        */
+       if (ceph_test_mount_opt(fsc, NOQUOTADF) ||
+           !ceph_quota_update_statfs(fsc, buf)) {
+               buf->f_blocks = le64_to_cpu(st.kb) >> (CEPH_BLOCK_SHIFT-10);
+               buf->f_bfree = le64_to_cpu(st.kb_avail) >> (CEPH_BLOCK_SHIFT-10);
+               buf->f_bavail = le64_to_cpu(st.kb_avail) >> (CEPH_BLOCK_SHIFT-10);
+       }
 
        buf->f_files = le64_to_cpu(st.num_objects);
        buf->f_ffree = -1;
@@ -151,6 +160,8 @@ enum {
        Opt_acl,
 #endif
        Opt_noacl,
+       Opt_quotadf,
+       Opt_noquotadf,
 };
 
 static match_table_t fsopt_tokens = {
@@ -187,6 +198,8 @@ static match_table_t fsopt_tokens = {
        {Opt_acl, "acl"},
 #endif
        {Opt_noacl, "noacl"},
+       {Opt_quotadf, "quotadf"},
+       {Opt_noquotadf, "noquotadf"},
        {-1, NULL}
 };
 
@@ -314,13 +327,16 @@ static int parse_fsopt_token(char *c, void *private)
                break;
        case Opt_fscache:
                fsopt->flags |= CEPH_MOUNT_OPT_FSCACHE;
+               kfree(fsopt->fscache_uniq);
+               fsopt->fscache_uniq = NULL;
                break;
        case Opt_nofscache:
                fsopt->flags &= ~CEPH_MOUNT_OPT_FSCACHE;
+               kfree(fsopt->fscache_uniq);
+               fsopt->fscache_uniq = NULL;
                break;
        case Opt_poolperm:
                fsopt->flags &= ~CEPH_MOUNT_OPT_NOPOOLPERM;
-               printk ("pool perm");
                break;
        case Opt_nopoolperm:
                fsopt->flags |= CEPH_MOUNT_OPT_NOPOOLPERM;
@@ -331,6 +347,12 @@ static int parse_fsopt_token(char *c, void *private)
        case Opt_norequire_active_mds:
                fsopt->flags |= CEPH_MOUNT_OPT_MOUNTWAIT;
                break;
+       case Opt_quotadf:
+               fsopt->flags &= ~CEPH_MOUNT_OPT_NOQUOTADF;
+               break;
+       case Opt_noquotadf:
+               fsopt->flags |= CEPH_MOUNT_OPT_NOQUOTADF;
+               break;
 #ifdef CONFIG_CEPH_FS_POSIX_ACL
        case Opt_acl:
                fsopt->sb_flags |= SB_POSIXACL;
@@ -513,13 +535,12 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
        if ((fsopt->flags & CEPH_MOUNT_OPT_DCACHE) == 0)
                seq_puts(m, ",nodcache");
        if (fsopt->flags & CEPH_MOUNT_OPT_FSCACHE) {
-               if (fsopt->fscache_uniq)
-                       seq_printf(m, ",fsc=%s", fsopt->fscache_uniq);
-               else
-                       seq_puts(m, ",fsc");
+               seq_show_option(m, "fsc", fsopt->fscache_uniq);
        }
        if (fsopt->flags & CEPH_MOUNT_OPT_NOPOOLPERM)
                seq_puts(m, ",nopoolperm");
+       if (fsopt->flags & CEPH_MOUNT_OPT_NOQUOTADF)
+               seq_puts(m, ",noquotadf");
 
 #ifdef CONFIG_CEPH_FS_POSIX_ACL
        if (fsopt->sb_flags & SB_POSIXACL)
@@ -529,7 +550,7 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
 #endif
 
        if (fsopt->mds_namespace)
-               seq_printf(m, ",mds_namespace=%s", fsopt->mds_namespace);
+               seq_show_option(m, "mds_namespace", fsopt->mds_namespace);
        if (fsopt->wsize)
                seq_printf(m, ",wsize=%d", fsopt->wsize);
        if (fsopt->rsize != CEPH_MAX_READ_SIZE)
@@ -679,6 +700,7 @@ struct kmem_cache *ceph_cap_cachep;
 struct kmem_cache *ceph_cap_flush_cachep;
 struct kmem_cache *ceph_dentry_cachep;
 struct kmem_cache *ceph_file_cachep;
+struct kmem_cache *ceph_dir_file_cachep;
 
 static void ceph_inode_init_once(void *foo)
 {
@@ -698,8 +720,7 @@ static int __init init_caches(void)
        if (!ceph_inode_cachep)
                return -ENOMEM;
 
-       ceph_cap_cachep = KMEM_CACHE(ceph_cap,
-                                    SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD);
+       ceph_cap_cachep = KMEM_CACHE(ceph_cap, SLAB_MEM_SPREAD);
        if (!ceph_cap_cachep)
                goto bad_cap;
        ceph_cap_flush_cachep = KMEM_CACHE(ceph_cap_flush,
@@ -716,6 +737,10 @@ static int __init init_caches(void)
        if (!ceph_file_cachep)
                goto bad_file;
 
+       ceph_dir_file_cachep = KMEM_CACHE(ceph_dir_file_info, SLAB_MEM_SPREAD);
+       if (!ceph_dir_file_cachep)
+               goto bad_dir_file;
+
        error = ceph_fscache_register();
        if (error)
                goto bad_fscache;
@@ -723,6 +748,8 @@ static int __init init_caches(void)
        return 0;
 
 bad_fscache:
+       kmem_cache_destroy(ceph_dir_file_cachep);
+bad_dir_file:
        kmem_cache_destroy(ceph_file_cachep);
 bad_file:
        kmem_cache_destroy(ceph_dentry_cachep);
@@ -748,6 +775,7 @@ static void destroy_caches(void)
        kmem_cache_destroy(ceph_cap_flush_cachep);
        kmem_cache_destroy(ceph_dentry_cachep);
        kmem_cache_destroy(ceph_file_cachep);
+       kmem_cache_destroy(ceph_dir_file_cachep);
 
        ceph_fscache_unregister();
 }
index 1c2086e0fec27a60577c7a676b00fac67fb11102..a7077a0c989fb33cde837a41e6b4d8b9951d2c43 100644 (file)
@@ -39,6 +39,7 @@
 #define CEPH_MOUNT_OPT_FSCACHE         (1<<10) /* use fscache */
 #define CEPH_MOUNT_OPT_NOPOOLPERM      (1<<11) /* no pool permission check */
 #define CEPH_MOUNT_OPT_MOUNTWAIT       (1<<12) /* mount waits if no mds is up */
+#define CEPH_MOUNT_OPT_NOQUOTADF       (1<<13) /* no root dir quota in statfs */
 
 #define CEPH_MOUNT_OPT_DEFAULT    CEPH_MOUNT_OPT_DCACHE
 
@@ -310,6 +311,9 @@ struct ceph_inode_info {
        u64 i_rbytes, i_rfiles, i_rsubdirs;
        u64 i_files, i_subdirs;
 
+       /* quotas */
+       u64 i_max_bytes, i_max_files;
+
        struct rb_root i_fragtree;
        int i_fragtree_nsplits;
        struct mutex i_fragtree_mutex;
@@ -671,6 +675,10 @@ struct ceph_file_info {
 
        spinlock_t rw_contexts_lock;
        struct list_head rw_contexts;
+};
+
+struct ceph_dir_file_info {
+       struct ceph_file_info file_info;
 
        /* readdir: position within the dir */
        u32 frag;
@@ -748,6 +756,7 @@ struct ceph_readdir_cache_control {
  */
 struct ceph_snap_realm {
        u64 ino;
+       struct inode *inode;
        atomic_t nref;
        struct rb_node node;
 
@@ -1066,4 +1075,37 @@ extern int ceph_locks_to_pagelist(struct ceph_filelock *flocks,
 extern int ceph_fs_debugfs_init(struct ceph_fs_client *client);
 extern void ceph_fs_debugfs_cleanup(struct ceph_fs_client *client);
 
+/* quota.c */
+static inline bool __ceph_has_any_quota(struct ceph_inode_info *ci)
+{
+       return ci->i_max_files || ci->i_max_bytes;
+}
+
+extern void ceph_adjust_quota_realms_count(struct inode *inode, bool inc);
+
+static inline void __ceph_update_quota(struct ceph_inode_info *ci,
+                                      u64 max_bytes, u64 max_files)
+{
+       bool had_quota, has_quota;
+       had_quota = __ceph_has_any_quota(ci);
+       ci->i_max_bytes = max_bytes;
+       ci->i_max_files = max_files;
+       has_quota = __ceph_has_any_quota(ci);
+
+       if (had_quota != has_quota)
+               ceph_adjust_quota_realms_count(&ci->vfs_inode, has_quota);
+}
+
+extern void ceph_handle_quota(struct ceph_mds_client *mdsc,
+                             struct ceph_mds_session *session,
+                             struct ceph_msg *msg);
+extern bool ceph_quota_is_max_files_exceeded(struct inode *inode);
+extern bool ceph_quota_is_same_realm(struct inode *old, struct inode *new);
+extern bool ceph_quota_is_max_bytes_exceeded(struct inode *inode,
+                                            loff_t newlen);
+extern bool ceph_quota_is_max_bytes_approaching(struct inode *inode,
+                                               loff_t newlen);
+extern bool ceph_quota_update_statfs(struct ceph_fs_client *fsc,
+                                    struct kstatfs *buf);
+
 #endif /* _FS_CEPH_SUPER_H */
index e1c4e0b12b4cd1309af51bad916120c7f2273fc4..7e72348639e4bcbf523ec4bec3a589fbcd643c7c 100644 (file)
@@ -224,6 +224,31 @@ static size_t ceph_vxattrcb_dir_rctime(struct ceph_inode_info *ci, char *val,
                        (long)ci->i_rctime.tv_nsec);
 }
 
+/* quotas */
+
+static bool ceph_vxattrcb_quota_exists(struct ceph_inode_info *ci)
+{
+       return (ci->i_max_files || ci->i_max_bytes);
+}
+
+static size_t ceph_vxattrcb_quota(struct ceph_inode_info *ci, char *val,
+                                 size_t size)
+{
+       return snprintf(val, size, "max_bytes=%llu max_files=%llu",
+                       ci->i_max_bytes, ci->i_max_files);
+}
+
+static size_t ceph_vxattrcb_quota_max_bytes(struct ceph_inode_info *ci,
+                                           char *val, size_t size)
+{
+       return snprintf(val, size, "%llu", ci->i_max_bytes);
+}
+
+static size_t ceph_vxattrcb_quota_max_files(struct ceph_inode_info *ci,
+                                           char *val, size_t size)
+{
+       return snprintf(val, size, "%llu", ci->i_max_files);
+}
 
 #define CEPH_XATTR_NAME(_type, _name)  XATTR_CEPH_PREFIX #_type "." #_name
 #define CEPH_XATTR_NAME2(_type, _name, _name2) \
@@ -247,6 +272,15 @@ static size_t ceph_vxattrcb_dir_rctime(struct ceph_inode_info *ci, char *val,
                .hidden = true,                 \
                .exists_cb = ceph_vxattrcb_layout_exists,       \
        }
+#define XATTR_QUOTA_FIELD(_type, _name)                                        \
+       {                                                               \
+               .name = CEPH_XATTR_NAME(_type, _name),                  \
+               .name_size = sizeof(CEPH_XATTR_NAME(_type, _name)),     \
+               .getxattr_cb = ceph_vxattrcb_ ## _type ## _ ## _name,   \
+               .readonly = false,                                      \
+               .hidden = true,                                         \
+               .exists_cb = ceph_vxattrcb_quota_exists,                \
+       }
 
 static struct ceph_vxattr ceph_dir_vxattrs[] = {
        {
@@ -270,6 +304,16 @@ static struct ceph_vxattr ceph_dir_vxattrs[] = {
        XATTR_NAME_CEPH(dir, rsubdirs),
        XATTR_NAME_CEPH(dir, rbytes),
        XATTR_NAME_CEPH(dir, rctime),
+       {
+               .name = "ceph.quota",
+               .name_size = sizeof("ceph.quota"),
+               .getxattr_cb = ceph_vxattrcb_quota,
+               .readonly = false,
+               .hidden = true,
+               .exists_cb = ceph_vxattrcb_quota_exists,
+       },
+       XATTR_QUOTA_FIELD(quota, max_bytes),
+       XATTR_QUOTA_FIELD(quota, max_files),
        { .name = NULL, 0 }     /* Required table terminator */
 };
 static size_t ceph_dir_vxattrs_name_size;      /* total size of all names */
index 59042d5ac52026a5f43416e6a92a2b70166e63b3..3901927cf6a0618122798e8b16491099d35e2cda 100644 (file)
@@ -204,6 +204,7 @@ DEFINE_CEPH_FEATURE_DEPRECATED(63, 1, RESERVED_BROKEN, LUMINOUS) // client-facin
         CEPH_FEATURE_OSD_PRIMARY_AFFINITY |    \
         CEPH_FEATURE_MSGR_KEEPALIVE2 |         \
         CEPH_FEATURE_OSD_POOLRESEND |          \
+        CEPH_FEATURE_MDS_QUOTA |               \
         CEPH_FEATURE_CRUSH_V4 |                \
         CEPH_FEATURE_NEW_OSDOP_ENCODING |      \
         CEPH_FEATURE_SERVER_JEWEL |            \
index 88dd51381aaf9d72e18f277dee6173a214bcb8c8..7ecfc88314d835605d72189e7458f362bb6a1632 100644 (file)
@@ -134,6 +134,7 @@ struct ceph_dir_layout {
 #define CEPH_MSG_CLIENT_LEASE           0x311
 #define CEPH_MSG_CLIENT_SNAP            0x312
 #define CEPH_MSG_CLIENT_CAPRELEASE      0x313
+#define CEPH_MSG_CLIENT_QUOTA           0x314
 
 /* pool ops */
 #define CEPH_MSG_POOLOP_REPLY           48
@@ -807,4 +808,20 @@ struct ceph_mds_snap_realm {
 } __attribute__ ((packed));
 /* followed by my snap list, then prior parent snap list */
 
+/*
+ * quotas
+ */
+struct ceph_mds_quota {
+       __le64 ino;             /* ino */
+       struct ceph_timespec rctime;
+       __le64 rbytes;          /* dir stats */
+       __le64 rfiles;
+       __le64 rsubdirs;
+       __u8 struct_v;          /* compat */
+       __u8 struct_compat;
+       __le32 struct_len;
+       __le64 max_bytes;       /* quota max. bytes */
+       __le64 max_files;       /* quota max. files */
+} __attribute__ ((packed));
+
 #endif
index c2ec44cf5098af4d247cc22d124625eef32fb471..49c93b9308d70543c865f437bd3390ef6e7be73e 100644 (file)
@@ -262,6 +262,7 @@ extern struct kmem_cache *ceph_cap_cachep;
 extern struct kmem_cache *ceph_cap_flush_cachep;
 extern struct kmem_cache *ceph_dentry_cachep;
 extern struct kmem_cache *ceph_file_cachep;
+extern struct kmem_cache *ceph_dir_file_cachep;
 
 /* ceph_common.c */
 extern bool libceph_compatible(void *data);
index ead9d85f1c1114412c4f83688e377e7f9f0c6294..c7dfcb8a1fb2fcb5e253ab0202955a0752d5b844 100644 (file)
@@ -76,6 +76,7 @@ enum ceph_msg_data_type {
 #ifdef CONFIG_BLOCK
        CEPH_MSG_DATA_BIO,      /* data source/destination is a bio list */
 #endif /* CONFIG_BLOCK */
+       CEPH_MSG_DATA_BVECS,    /* data source/destination is a bio_vec array */
 };
 
 static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
@@ -87,22 +88,106 @@ static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
 #ifdef CONFIG_BLOCK
        case CEPH_MSG_DATA_BIO:
 #endif /* CONFIG_BLOCK */
+       case CEPH_MSG_DATA_BVECS:
                return true;
        default:
                return false;
        }
 }
 
+#ifdef CONFIG_BLOCK
+
+struct ceph_bio_iter {
+       struct bio *bio;
+       struct bvec_iter iter;
+};
+
+#define __ceph_bio_iter_advance_step(it, n, STEP) do {                       \
+       unsigned int __n = (n), __cur_n;                                      \
+                                                                             \
+       while (__n) {                                                         \
+               BUG_ON(!(it)->iter.bi_size);                                  \
+               __cur_n = min((it)->iter.bi_size, __n);                       \
+               (void)(STEP);                                                 \
+               bio_advance_iter((it)->bio, &(it)->iter, __cur_n);            \
+               if (!(it)->iter.bi_size && (it)->bio->bi_next) {              \
+                       dout("__ceph_bio_iter_advance_step next bio\n");      \
+                       (it)->bio = (it)->bio->bi_next;                       \
+                       (it)->iter = (it)->bio->bi_iter;                      \
+               }                                                             \
+               __n -= __cur_n;                                               \
+       }                                                                     \
+} while (0)
+
+/*
+ * Advance @it by @n bytes.
+ */
+#define ceph_bio_iter_advance(it, n)                                         \
+       __ceph_bio_iter_advance_step(it, n, 0)
+
+/*
+ * Advance @it by @n bytes, executing BVEC_STEP for each bio_vec.
+ */
+#define ceph_bio_iter_advance_step(it, n, BVEC_STEP)                         \
+       __ceph_bio_iter_advance_step(it, n, ({                                \
+               struct bio_vec bv;                                            \
+               struct bvec_iter __cur_iter;                                  \
+                                                                             \
+               __cur_iter = (it)->iter;                                      \
+               __cur_iter.bi_size = __cur_n;                                 \
+               __bio_for_each_segment(bv, (it)->bio, __cur_iter, __cur_iter) \
+                       (void)(BVEC_STEP);                                    \
+       }))
+
+#endif /* CONFIG_BLOCK */
+
+struct ceph_bvec_iter {
+       struct bio_vec *bvecs;
+       struct bvec_iter iter;
+};
+
+#define __ceph_bvec_iter_advance_step(it, n, STEP) do {                              \
+       BUG_ON((n) > (it)->iter.bi_size);                                     \
+       (void)(STEP);                                                         \
+       bvec_iter_advance((it)->bvecs, &(it)->iter, (n));                     \
+} while (0)
+
+/*
+ * Advance @it by @n bytes.
+ */
+#define ceph_bvec_iter_advance(it, n)                                        \
+       __ceph_bvec_iter_advance_step(it, n, 0)
+
+/*
+ * Advance @it by @n bytes, executing BVEC_STEP for each bio_vec.
+ */
+#define ceph_bvec_iter_advance_step(it, n, BVEC_STEP)                        \
+       __ceph_bvec_iter_advance_step(it, n, ({                               \
+               struct bio_vec bv;                                            \
+               struct bvec_iter __cur_iter;                                  \
+                                                                             \
+               __cur_iter = (it)->iter;                                      \
+               __cur_iter.bi_size = (n);                                     \
+               for_each_bvec(bv, (it)->bvecs, __cur_iter, __cur_iter)        \
+                       (void)(BVEC_STEP);                                    \
+       }))
+
+#define ceph_bvec_iter_shorten(it, n) do {                                   \
+       BUG_ON((n) > (it)->iter.bi_size);                                     \
+       (it)->iter.bi_size = (n);                                             \
+} while (0)
+
 struct ceph_msg_data {
        struct list_head                links;  /* ceph_msg->data */
        enum ceph_msg_data_type         type;
        union {
 #ifdef CONFIG_BLOCK
                struct {
-                       struct bio      *bio;
-                       size_t          bio_length;
+                       struct ceph_bio_iter    bio_pos;
+                       u32                     bio_length;
                };
 #endif /* CONFIG_BLOCK */
+               struct ceph_bvec_iter   bvec_pos;
                struct {
                        struct page     **pages;        /* NOT OWNER. */
                        size_t          length;         /* total # bytes */
@@ -122,11 +207,9 @@ struct ceph_msg_data_cursor {
        bool                    need_crc;       /* crc update needed */
        union {
 #ifdef CONFIG_BLOCK
-               struct {                                /* bio */
-                       struct bio      *bio;           /* bio from list */
-                       struct bvec_iter bvec_iter;
-               };
+               struct ceph_bio_iter    bio_iter;
 #endif /* CONFIG_BLOCK */
+               struct bvec_iter        bvec_iter;
                struct {                                /* pages */
                        unsigned int    page_offset;    /* offset in page */
                        unsigned short  page_index;     /* index in array */
@@ -290,9 +373,11 @@ extern void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
 extern void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
                                struct ceph_pagelist *pagelist);
 #ifdef CONFIG_BLOCK
-extern void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
-                               size_t length);
+void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
+                          u32 length);
 #endif /* CONFIG_BLOCK */
+void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
+                            struct ceph_bvec_iter *bvec_pos);
 
 extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
                                     bool can_fail);
index 52fb37d1c2a5fa2fcd5d54c71dfe231bb926f4be..528ccc943cee0a5da4bc95e5d200a8591debe836 100644 (file)
@@ -57,6 +57,7 @@ enum ceph_osd_data_type {
 #ifdef CONFIG_BLOCK
        CEPH_OSD_DATA_TYPE_BIO,
 #endif /* CONFIG_BLOCK */
+       CEPH_OSD_DATA_TYPE_BVECS,
 };
 
 struct ceph_osd_data {
@@ -72,10 +73,11 @@ struct ceph_osd_data {
                struct ceph_pagelist    *pagelist;
 #ifdef CONFIG_BLOCK
                struct {
-                       struct bio      *bio;           /* list of bios */
-                       size_t          bio_length;     /* total in list */
+                       struct ceph_bio_iter    bio_pos;
+                       u32                     bio_length;
                };
 #endif /* CONFIG_BLOCK */
+               struct ceph_bvec_iter   bvec_pos;
        };
 };
 
@@ -405,10 +407,14 @@ extern void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *,
                                        unsigned int which,
                                        struct ceph_pagelist *pagelist);
 #ifdef CONFIG_BLOCK
-extern void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *,
-                                       unsigned int which,
-                                       struct bio *bio, size_t bio_length);
+void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
+                                   unsigned int which,
+                                   struct ceph_bio_iter *bio_pos,
+                                   u32 bio_length);
 #endif /* CONFIG_BLOCK */
+void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req,
+                                        unsigned int which,
+                                        struct ceph_bvec_iter *bvec_pos);
 
 extern void osd_req_op_cls_request_data_pagelist(struct ceph_osd_request *,
                                        unsigned int which,
@@ -418,6 +424,9 @@ extern void osd_req_op_cls_request_data_pages(struct ceph_osd_request *,
                                        struct page **pages, u64 length,
                                        u32 alignment, bool pages_from_pool,
                                        bool own_pages);
+void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req,
+                                      unsigned int which,
+                                      struct bio_vec *bvecs, u32 bytes);
 extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *,
                                        unsigned int which,
                                        struct page **pages, u64 length,
index d41fad99c0fa7ece3c05fa1478f162c82da7e2ff..e71fb222c7c3640e46426dc5a0de96d6814d96df 100644 (file)
@@ -5,7 +5,6 @@
 #include <linux/rbtree.h>
 #include <linux/ceph/types.h>
 #include <linux/ceph/decode.h>
-#include <linux/ceph/ceph_fs.h>
 #include <linux/crush/crush.h>
 
 /*
@@ -280,11 +279,6 @@ bool ceph_osds_changed(const struct ceph_osds *old_acting,
                       const struct ceph_osds *new_acting,
                       bool any_change);
 
-/* calculate mapping of a file extent to an object */
-extern int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
-                                        u64 off, u64 len,
-                                        u64 *bno, u64 *oxoff, u64 *oxlen);
-
 int __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
                                const struct ceph_object_id *oid,
                                const struct ceph_object_locator *oloc,
diff --git a/include/linux/ceph/striper.h b/include/linux/ceph/striper.h
new file mode 100644 (file)
index 0000000..cbd0d24
--- /dev/null
@@ -0,0 +1,69 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef _LINUX_CEPH_STRIPER_H
+#define _LINUX_CEPH_STRIPER_H
+
+#include <linux/list.h>
+#include <linux/types.h>
+
+struct ceph_file_layout;
+
+void ceph_calc_file_object_mapping(struct ceph_file_layout *l,
+                                  u64 off, u64 len,
+                                  u64 *objno, u64 *objoff, u32 *xlen);
+
+struct ceph_object_extent {
+       struct list_head oe_item;
+       u64 oe_objno;
+       u64 oe_off;
+       u64 oe_len;
+};
+
+static inline void ceph_object_extent_init(struct ceph_object_extent *ex)
+{
+       INIT_LIST_HEAD(&ex->oe_item);
+}
+
+/*
+ * Called for each mapped stripe unit.
+ *
+ * @bytes: number of bytes mapped, i.e. the minimum of the full length
+ *         requested (file extent length) or the remainder of the stripe
+ *         unit within an object
+ */
+typedef void (*ceph_object_extent_fn_t)(struct ceph_object_extent *ex,
+                                       u32 bytes, void *arg);
+
+int ceph_file_to_extents(struct ceph_file_layout *l, u64 off, u64 len,
+                        struct list_head *object_extents,
+                        struct ceph_object_extent *alloc_fn(void *arg),
+                        void *alloc_arg,
+                        ceph_object_extent_fn_t action_fn,
+                        void *action_arg);
+int ceph_iterate_extents(struct ceph_file_layout *l, u64 off, u64 len,
+                        struct list_head *object_extents,
+                        ceph_object_extent_fn_t action_fn,
+                        void *action_arg);
+
+struct ceph_file_extent {
+       u64 fe_off;
+       u64 fe_len;
+};
+
+static inline u64 ceph_file_extents_bytes(struct ceph_file_extent *file_extents,
+                                         u32 num_file_extents)
+{
+       u64 bytes = 0;
+       u32 i;
+
+       for (i = 0; i < num_file_extents; i++)
+               bytes += file_extents[i].fe_len;
+
+       return bytes;
+}
+
+int ceph_extent_to_file(struct ceph_file_layout *l,
+                       u64 objno, u64 objoff, u64 objlen,
+                       struct ceph_file_extent **file_extents,
+                       u32 *num_file_extents);
+
+#endif
index b4bded4b53960faa19099dbb5c7fd70a31209d83..12bf49772d24b93c90c49ebb87e8770f41bb3918 100644 (file)
@@ -8,6 +8,7 @@ libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \
        mon_client.o \
        cls_lock_client.o \
        osd_client.o osdmap.o crush/crush.o crush/mapper.o crush/hash.o \
+       striper.o \
        debugfs.o \
        auth.o auth_none.o \
        crypto.o armor.o \
index 4adf07826f4a33086bbc78ce6e6142311fac8d8b..584fdbef2088cf7eaa6ad8fea103dc6aa42b6ce9 100644 (file)
@@ -72,6 +72,7 @@ const char *ceph_msg_type_name(int type)
        case CEPH_MSG_MON_GET_VERSION: return "mon_get_version";
        case CEPH_MSG_MON_GET_VERSION_REPLY: return "mon_get_version_reply";
        case CEPH_MSG_MDS_MAP: return "mds_map";
+       case CEPH_MSG_FS_MAP_USER: return "fs_map_user";
        case CEPH_MSG_CLIENT_SESSION: return "client_session";
        case CEPH_MSG_CLIENT_RECONNECT: return "client_reconnect";
        case CEPH_MSG_CLIENT_REQUEST: return "client_request";
@@ -79,8 +80,13 @@ const char *ceph_msg_type_name(int type)
        case CEPH_MSG_CLIENT_REPLY: return "client_reply";
        case CEPH_MSG_CLIENT_CAPS: return "client_caps";
        case CEPH_MSG_CLIENT_CAPRELEASE: return "client_cap_release";
+       case CEPH_MSG_CLIENT_QUOTA: return "client_quota";
        case CEPH_MSG_CLIENT_SNAP: return "client_snap";
        case CEPH_MSG_CLIENT_LEASE: return "client_lease";
+       case CEPH_MSG_POOLOP_REPLY: return "poolop_reply";
+       case CEPH_MSG_POOLOP: return "poolop";
+       case CEPH_MSG_MON_COMMAND: return "mon_command";
+       case CEPH_MSG_MON_COMMAND_ACK: return "mon_command_ack";
        case CEPH_MSG_OSD_MAP: return "osd_map";
        case CEPH_MSG_OSD_OP: return "osd_op";
        case CEPH_MSG_OSD_OPREPLY: return "osd_opreply";
@@ -217,7 +223,7 @@ static int parse_fsid(const char *str, struct ceph_fsid *fsid)
 
        if (i == 16)
                err = 0;
-       dout("parse_fsid ret %d got fsid %pU", err, fsid);
+       dout("parse_fsid ret %d got fsid %pU\n", err, fsid);
        return err;
 }
 
index bf9d079cbafd6e89d56ef5f10d81727bd5cbd42b..02172c408ff28d81a23da4cd7aea12025293611d 100644 (file)
@@ -347,10 +347,12 @@ struct key_type key_type_ceph = {
        .destroy        = ceph_key_destroy,
 };
 
-int ceph_crypto_init(void) {
+int __init ceph_crypto_init(void)
+{
        return register_key_type(&key_type_ceph);
 }
 
-void ceph_crypto_shutdown(void) {
+void ceph_crypto_shutdown(void)
+{
        unregister_key_type(&key_type_ceph);
 }
index 1eef6806aa1a01e1901f7f57d759a2a52c6cd1df..02952605d121871200d437d13a832241b3beb5a4 100644 (file)
@@ -389,7 +389,7 @@ CEPH_DEFINE_SHOW_FUNC(monc_show)
 CEPH_DEFINE_SHOW_FUNC(osdc_show)
 CEPH_DEFINE_SHOW_FUNC(client_options_show)
 
-int ceph_debugfs_init(void)
+int __init ceph_debugfs_init(void)
 {
        ceph_debugfs_dir = debugfs_create_dir("ceph", NULL);
        if (!ceph_debugfs_dir)
@@ -418,7 +418,7 @@ int ceph_debugfs_client_init(struct ceph_client *client)
                goto out;
 
        client->monc.debugfs_file = debugfs_create_file("monc",
-                                                     0600,
+                                                     0400,
                                                      client->debugfs_dir,
                                                      client,
                                                      &monc_show_fops);
@@ -426,7 +426,7 @@ int ceph_debugfs_client_init(struct ceph_client *client)
                goto out;
 
        client->osdc.debugfs_file = debugfs_create_file("osdc",
-                                                     0600,
+                                                     0400,
                                                      client->debugfs_dir,
                                                      client,
                                                      &osdc_show_fops);
@@ -434,7 +434,7 @@ int ceph_debugfs_client_init(struct ceph_client *client)
                goto out;
 
        client->debugfs_monmap = debugfs_create_file("monmap",
-                                       0600,
+                                       0400,
                                        client->debugfs_dir,
                                        client,
                                        &monmap_show_fops);
@@ -442,7 +442,7 @@ int ceph_debugfs_client_init(struct ceph_client *client)
                goto out;
 
        client->debugfs_osdmap = debugfs_create_file("osdmap",
-                                       0600,
+                                       0400,
                                        client->debugfs_dir,
                                        client,
                                        &osdmap_show_fops);
@@ -450,7 +450,7 @@ int ceph_debugfs_client_init(struct ceph_client *client)
                goto out;
 
        client->debugfs_options = debugfs_create_file("client_options",
-                                       0600,
+                                       0400,
                                        client->debugfs_dir,
                                        client,
                                        &client_options_show_fops);
@@ -477,7 +477,7 @@ void ceph_debugfs_client_cleanup(struct ceph_client *client)
 
 #else  /* CONFIG_DEBUG_FS */
 
-int ceph_debugfs_init(void)
+int __init ceph_debugfs_init(void)
 {
        return 0;
 }
@@ -496,6 +496,3 @@ void ceph_debugfs_client_cleanup(struct ceph_client *client)
 }
 
 #endif  /* CONFIG_DEBUG_FS */
-
-EXPORT_SYMBOL(ceph_debugfs_init);
-EXPORT_SYMBOL(ceph_debugfs_cleanup);
index 8a4d3758030b73d3b9ca24b09f91b1e65be0844e..fcb40c12b1f838e24b2f2a1a58b632522acab8b4 100644 (file)
@@ -277,7 +277,7 @@ static void _ceph_msgr_exit(void)
        ceph_msgr_slab_exit();
 }
 
-int ceph_msgr_init(void)
+int __init ceph_msgr_init(void)
 {
        if (ceph_msgr_slab_init())
                return -ENOMEM;
@@ -299,7 +299,6 @@ int ceph_msgr_init(void)
 
        return -ENOMEM;
 }
-EXPORT_SYMBOL(ceph_msgr_init);
 
 void ceph_msgr_exit(void)
 {
@@ -307,7 +306,6 @@ void ceph_msgr_exit(void)
 
        _ceph_msgr_exit();
 }
-EXPORT_SYMBOL(ceph_msgr_exit);
 
 void ceph_msgr_flush(void)
 {
@@ -839,93 +837,112 @@ static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
                                        size_t length)
 {
        struct ceph_msg_data *data = cursor->data;
-       struct bio *bio;
+       struct ceph_bio_iter *it = &cursor->bio_iter;
 
-       BUG_ON(data->type != CEPH_MSG_DATA_BIO);
+       cursor->resid = min_t(size_t, length, data->bio_length);
+       *it = data->bio_pos;
+       if (cursor->resid < it->iter.bi_size)
+               it->iter.bi_size = cursor->resid;
 
-       bio = data->bio;
-       BUG_ON(!bio);
-
-       cursor->resid = min(length, data->bio_length);
-       cursor->bio = bio;
-       cursor->bvec_iter = bio->bi_iter;
-       cursor->last_piece =
-               cursor->resid <= bio_iter_len(bio, cursor->bvec_iter);
+       BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
+       cursor->last_piece = cursor->resid == bio_iter_len(it->bio, it->iter);
 }
 
 static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor,
                                                size_t *page_offset,
                                                size_t *length)
 {
-       struct ceph_msg_data *data = cursor->data;
-       struct bio *bio;
-       struct bio_vec bio_vec;
-
-       BUG_ON(data->type != CEPH_MSG_DATA_BIO);
-
-       bio = cursor->bio;
-       BUG_ON(!bio);
-
-       bio_vec = bio_iter_iovec(bio, cursor->bvec_iter);
-
-       *page_offset = (size_t) bio_vec.bv_offset;
-       BUG_ON(*page_offset >= PAGE_SIZE);
-       if (cursor->last_piece) /* pagelist offset is always 0 */
-               *length = cursor->resid;
-       else
-               *length = (size_t) bio_vec.bv_len;
-       BUG_ON(*length > cursor->resid);
-       BUG_ON(*page_offset + *length > PAGE_SIZE);
+       struct bio_vec bv = bio_iter_iovec(cursor->bio_iter.bio,
+                                          cursor->bio_iter.iter);
 
-       return bio_vec.bv_page;
+       *page_offset = bv.bv_offset;
+       *length = bv.bv_len;
+       return bv.bv_page;
 }
 
 static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
                                        size_t bytes)
 {
-       struct bio *bio;
-       struct bio_vec bio_vec;
+       struct ceph_bio_iter *it = &cursor->bio_iter;
 
-       BUG_ON(cursor->data->type != CEPH_MSG_DATA_BIO);
+       BUG_ON(bytes > cursor->resid);
+       BUG_ON(bytes > bio_iter_len(it->bio, it->iter));
+       cursor->resid -= bytes;
+       bio_advance_iter(it->bio, &it->iter, bytes);
 
-       bio = cursor->bio;
-       BUG_ON(!bio);
+       if (!cursor->resid) {
+               BUG_ON(!cursor->last_piece);
+               return false;   /* no more data */
+       }
 
-       bio_vec = bio_iter_iovec(bio, cursor->bvec_iter);
+       if (!bytes || (it->iter.bi_size && it->iter.bi_bvec_done))
+               return false;   /* more bytes to process in this segment */
 
-       /* Advance the cursor offset */
+       if (!it->iter.bi_size) {
+               it->bio = it->bio->bi_next;
+               it->iter = it->bio->bi_iter;
+               if (cursor->resid < it->iter.bi_size)
+                       it->iter.bi_size = cursor->resid;
+       }
 
-       BUG_ON(cursor->resid < bytes);
-       cursor->resid -= bytes;
+       BUG_ON(cursor->last_piece);
+       BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
+       cursor->last_piece = cursor->resid == bio_iter_len(it->bio, it->iter);
+       return true;
+}
+#endif /* CONFIG_BLOCK */
 
-       bio_advance_iter(bio, &cursor->bvec_iter, bytes);
+static void ceph_msg_data_bvecs_cursor_init(struct ceph_msg_data_cursor *cursor,
+                                       size_t length)
+{
+       struct ceph_msg_data *data = cursor->data;
+       struct bio_vec *bvecs = data->bvec_pos.bvecs;
 
-       if (bytes < bio_vec.bv_len)
-               return false;   /* more bytes to process in this segment */
+       cursor->resid = min_t(size_t, length, data->bvec_pos.iter.bi_size);
+       cursor->bvec_iter = data->bvec_pos.iter;
+       cursor->bvec_iter.bi_size = cursor->resid;
 
-       /* Move on to the next segment, and possibly the next bio */
+       BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter));
+       cursor->last_piece =
+           cursor->resid == bvec_iter_len(bvecs, cursor->bvec_iter);
+}
 
-       if (!cursor->bvec_iter.bi_size) {
-               bio = bio->bi_next;
-               cursor->bio = bio;
-               if (bio)
-                       cursor->bvec_iter = bio->bi_iter;
-               else
-                       memset(&cursor->bvec_iter, 0,
-                              sizeof(cursor->bvec_iter));
-       }
+static struct page *ceph_msg_data_bvecs_next(struct ceph_msg_data_cursor *cursor,
+                                               size_t *page_offset,
+                                               size_t *length)
+{
+       struct bio_vec bv = bvec_iter_bvec(cursor->data->bvec_pos.bvecs,
+                                          cursor->bvec_iter);
+
+       *page_offset = bv.bv_offset;
+       *length = bv.bv_len;
+       return bv.bv_page;
+}
+
+static bool ceph_msg_data_bvecs_advance(struct ceph_msg_data_cursor *cursor,
+                                       size_t bytes)
+{
+       struct bio_vec *bvecs = cursor->data->bvec_pos.bvecs;
+
+       BUG_ON(bytes > cursor->resid);
+       BUG_ON(bytes > bvec_iter_len(bvecs, cursor->bvec_iter));
+       cursor->resid -= bytes;
+       bvec_iter_advance(bvecs, &cursor->bvec_iter, bytes);
 
-       if (!cursor->last_piece) {
-               BUG_ON(!cursor->resid);
-               BUG_ON(!bio);
-               /* A short read is OK, so use <= rather than == */
-               if (cursor->resid <= bio_iter_len(bio, cursor->bvec_iter))
-                       cursor->last_piece = true;
+       if (!cursor->resid) {
+               BUG_ON(!cursor->last_piece);
+               return false;   /* no more data */
        }
 
+       if (!bytes || cursor->bvec_iter.bi_bvec_done)
+               return false;   /* more bytes to process in this segment */
+
+       BUG_ON(cursor->last_piece);
+       BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter));
+       cursor->last_piece =
+           cursor->resid == bvec_iter_len(bvecs, cursor->bvec_iter);
        return true;
 }
-#endif /* CONFIG_BLOCK */
 
 /*
  * For a page array, a piece comes from the first page in the array
@@ -1110,6 +1127,9 @@ static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
                ceph_msg_data_bio_cursor_init(cursor, length);
                break;
 #endif /* CONFIG_BLOCK */
+       case CEPH_MSG_DATA_BVECS:
+               ceph_msg_data_bvecs_cursor_init(cursor, length);
+               break;
        case CEPH_MSG_DATA_NONE:
        default:
                /* BUG(); */
@@ -1158,14 +1178,19 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
                page = ceph_msg_data_bio_next(cursor, page_offset, length);
                break;
 #endif /* CONFIG_BLOCK */
+       case CEPH_MSG_DATA_BVECS:
+               page = ceph_msg_data_bvecs_next(cursor, page_offset, length);
+               break;
        case CEPH_MSG_DATA_NONE:
        default:
                page = NULL;
                break;
        }
+
        BUG_ON(!page);
        BUG_ON(*page_offset + *length > PAGE_SIZE);
        BUG_ON(!*length);
+       BUG_ON(*length > cursor->resid);
        if (last_piece)
                *last_piece = cursor->last_piece;
 
@@ -1194,6 +1219,9 @@ static void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
                new_piece = ceph_msg_data_bio_advance(cursor, bytes);
                break;
 #endif /* CONFIG_BLOCK */
+       case CEPH_MSG_DATA_BVECS:
+               new_piece = ceph_msg_data_bvecs_advance(cursor, bytes);
+               break;
        case CEPH_MSG_DATA_NONE:
        default:
                BUG();
@@ -1575,13 +1603,18 @@ static int write_partial_message_data(struct ceph_connection *con)
         * been revoked, so use the zero page.
         */
        crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
-       while (cursor->resid) {
+       while (cursor->total_resid) {
                struct page *page;
                size_t page_offset;
                size_t length;
                bool last_piece;
                int ret;
 
+               if (!cursor->resid) {
+                       ceph_msg_data_advance(cursor, 0);
+                       continue;
+               }
+
                page = ceph_msg_data_next(cursor, &page_offset, &length,
                                          &last_piece);
                ret = ceph_tcp_sendpage(con->sock, page, page_offset,
@@ -2297,7 +2330,12 @@ static int read_partial_msg_data(struct ceph_connection *con)
 
        if (do_datacrc)
                crc = con->in_data_crc;
-       while (cursor->resid) {
+       while (cursor->total_resid) {
+               if (!cursor->resid) {
+                       ceph_msg_data_advance(cursor, 0);
+                       continue;
+               }
+
                page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
                ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
                if (ret <= 0) {
@@ -3262,16 +3300,14 @@ void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
 EXPORT_SYMBOL(ceph_msg_data_add_pagelist);
 
 #ifdef CONFIG_BLOCK
-void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
-               size_t length)
+void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
+                          u32 length)
 {
        struct ceph_msg_data *data;
 
-       BUG_ON(!bio);
-
        data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
        BUG_ON(!data);
-       data->bio = bio;
+       data->bio_pos = *bio_pos;
        data->bio_length = length;
 
        list_add_tail(&data->links, &msg->data);
@@ -3280,6 +3316,20 @@ void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
 EXPORT_SYMBOL(ceph_msg_data_add_bio);
 #endif /* CONFIG_BLOCK */
 
+void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
+                            struct ceph_bvec_iter *bvec_pos)
+{
+       struct ceph_msg_data *data;
+
+       data = ceph_msg_data_create(CEPH_MSG_DATA_BVECS);
+       BUG_ON(!data);
+       data->bvec_pos = *bvec_pos;
+
+       list_add_tail(&data->links, &msg->data);
+       msg->data_length += bvec_pos->iter.bi_size;
+}
+EXPORT_SYMBOL(ceph_msg_data_add_bvecs);
+
 /*
  * construct a new message with given type, size
  * the new msg has a ref count of 1.
index 1547107f48544e9690319e4fe9415968448a9f87..b3dac24412d34cbe2e3616371db9fe9838ce550f 100644 (file)
@@ -60,7 +60,7 @@ struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
        num_mon = ceph_decode_32(&p);
        ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad);
 
-       if (num_mon >= CEPH_MAX_MON)
+       if (num_mon > CEPH_MAX_MON)
                goto bad;
        m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS);
        if (m == NULL)
index 2814dba5902d7a862a0ffb42b488af473a43fb59..ea2a6c9fb7cef01b54eb86a1a7c580625feb4b1e 100644 (file)
@@ -20,6 +20,7 @@
 #include <linux/ceph/decode.h>
 #include <linux/ceph/auth.h>
 #include <linux/ceph/pagelist.h>
+#include <linux/ceph/striper.h>
 
 #define OSD_OPREPLY_FRONT_LEN  512
 
@@ -103,13 +104,12 @@ static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
                        u64 *objnum, u64 *objoff, u64 *objlen)
 {
        u64 orig_len = *plen;
-       int r;
+       u32 xlen;
 
        /* object extent? */
-       r = ceph_calc_file_object_mapping(layout, off, orig_len, objnum,
-                                         objoff, objlen);
-       if (r < 0)
-               return r;
+       ceph_calc_file_object_mapping(layout, off, orig_len, objnum,
+                                         objoff, &xlen);
+       *objlen = xlen;
        if (*objlen < orig_len) {
                *plen = *objlen;
                dout(" skipping last %llu, final file extent %llu~%llu\n",
@@ -117,7 +117,6 @@ static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
        }
 
        dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen);
-
        return 0;
 }
 
@@ -148,14 +147,22 @@ static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
 
 #ifdef CONFIG_BLOCK
 static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
-                       struct bio *bio, size_t bio_length)
+                                  struct ceph_bio_iter *bio_pos,
+                                  u32 bio_length)
 {
        osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
-       osd_data->bio = bio;
+       osd_data->bio_pos = *bio_pos;
        osd_data->bio_length = bio_length;
 }
 #endif /* CONFIG_BLOCK */
 
+static void ceph_osd_data_bvecs_init(struct ceph_osd_data *osd_data,
+                                    struct ceph_bvec_iter *bvec_pos)
+{
+       osd_data->type = CEPH_OSD_DATA_TYPE_BVECS;
+       osd_data->bvec_pos = *bvec_pos;
+}
+
 #define osd_req_op_data(oreq, whch, typ, fld)                          \
 ({                                                                     \
        struct ceph_osd_request *__oreq = (oreq);                       \
@@ -218,16 +225,29 @@ EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);
 
 #ifdef CONFIG_BLOCK
 void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
-                       unsigned int which, struct bio *bio, size_t bio_length)
+                                   unsigned int which,
+                                   struct ceph_bio_iter *bio_pos,
+                                   u32 bio_length)
 {
        struct ceph_osd_data *osd_data;
 
        osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
-       ceph_osd_data_bio_init(osd_data, bio, bio_length);
+       ceph_osd_data_bio_init(osd_data, bio_pos, bio_length);
 }
 EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
 #endif /* CONFIG_BLOCK */
 
+void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req,
+                                        unsigned int which,
+                                        struct ceph_bvec_iter *bvec_pos)
+{
+       struct ceph_osd_data *osd_data;
+
+       osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
+       ceph_osd_data_bvecs_init(osd_data, bvec_pos);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvec_pos);
+
 static void osd_req_op_cls_request_info_pagelist(
                        struct ceph_osd_request *osd_req,
                        unsigned int which, struct ceph_pagelist *pagelist)
@@ -265,6 +285,23 @@ void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
 }
 EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);
 
+void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req,
+                                      unsigned int which,
+                                      struct bio_vec *bvecs, u32 bytes)
+{
+       struct ceph_osd_data *osd_data;
+       struct ceph_bvec_iter it = {
+               .bvecs = bvecs,
+               .iter = { .bi_size = bytes },
+       };
+
+       osd_data = osd_req_op_data(osd_req, which, cls, request_data);
+       ceph_osd_data_bvecs_init(osd_data, &it);
+       osd_req->r_ops[which].cls.indata_len += bytes;
+       osd_req->r_ops[which].indata_len += bytes;
+}
+EXPORT_SYMBOL(osd_req_op_cls_request_data_bvecs);
+
 void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
                        unsigned int which, struct page **pages, u64 length,
                        u32 alignment, bool pages_from_pool, bool own_pages)
@@ -290,6 +327,8 @@ static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
        case CEPH_OSD_DATA_TYPE_BIO:
                return (u64)osd_data->bio_length;
 #endif /* CONFIG_BLOCK */
+       case CEPH_OSD_DATA_TYPE_BVECS:
+               return osd_data->bvec_pos.iter.bi_size;
        default:
                WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
                return 0;
@@ -828,8 +867,10 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
                ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
 #ifdef CONFIG_BLOCK
        } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
-               ceph_msg_data_add_bio(msg, osd_data->bio, length);
+               ceph_msg_data_add_bio(msg, &osd_data->bio_pos, length);
 #endif
+       } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BVECS) {
+               ceph_msg_data_add_bvecs(msg, &osd_data->bvec_pos);
        } else {
                BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
        }
@@ -5065,7 +5106,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
 }
 EXPORT_SYMBOL(ceph_osdc_writepages);
 
-int ceph_osdc_setup(void)
+int __init ceph_osdc_setup(void)
 {
        size_t size = sizeof(struct ceph_osd_request) +
            CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op);
@@ -5076,7 +5117,6 @@ int ceph_osdc_setup(void)
 
        return ceph_osd_request_cache ? 0 : -ENOMEM;
 }
-EXPORT_SYMBOL(ceph_osdc_setup);
 
 void ceph_osdc_cleanup(void)
 {
@@ -5084,7 +5124,6 @@ void ceph_osdc_cleanup(void)
        kmem_cache_destroy(ceph_osd_request_cache);
        ceph_osd_request_cache = NULL;
 }
-EXPORT_SYMBOL(ceph_osdc_cleanup);
 
 /*
  * handle incoming message
index 0da27c66349a7a378bc1697de00611618f034f1f..9645ffd6acfb24b1432ad69f655d1caa09741fcf 100644 (file)
@@ -4,7 +4,6 @@
 
 #include <linux/module.h>
 #include <linux/slab.h>
-#include <asm/div64.h>
 
 #include <linux/ceph/libceph.h>
 #include <linux/ceph/osdmap.h>
@@ -2140,76 +2139,6 @@ bool ceph_osds_changed(const struct ceph_osds *old_acting,
        return false;
 }
 
-/*
- * calculate file layout from given offset, length.
- * fill in correct oid, logical length, and object extent
- * offset, length.
- *
- * for now, we write only a single su, until we can
- * pass a stride back to the caller.
- */
-int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
-                                  u64 off, u64 len,
-                                  u64 *ono,
-                                  u64 *oxoff, u64 *oxlen)
-{
-       u32 osize = layout->object_size;
-       u32 su = layout->stripe_unit;
-       u32 sc = layout->stripe_count;
-       u32 bl, stripeno, stripepos, objsetno;
-       u32 su_per_object;
-       u64 t, su_offset;
-
-       dout("mapping %llu~%llu  osize %u fl_su %u\n", off, len,
-            osize, su);
-       if (su == 0 || sc == 0)
-               goto invalid;
-       su_per_object = osize / su;
-       if (su_per_object == 0)
-               goto invalid;
-       dout("osize %u / su %u = su_per_object %u\n", osize, su,
-            su_per_object);
-
-       if ((su & ~PAGE_MASK) != 0)
-               goto invalid;
-
-       /* bl = *off / su; */
-       t = off;
-       do_div(t, su);
-       bl = t;
-       dout("off %llu / su %u = bl %u\n", off, su, bl);
-
-       stripeno = bl / sc;
-       stripepos = bl % sc;
-       objsetno = stripeno / su_per_object;
-
-       *ono = objsetno * sc + stripepos;
-       dout("objset %u * sc %u = ono %u\n", objsetno, sc, (unsigned int)*ono);
-
-       /* *oxoff = *off % layout->fl_stripe_unit;  # offset in su */
-       t = off;
-       su_offset = do_div(t, su);
-       *oxoff = su_offset + (stripeno % su_per_object) * su;
-
-       /*
-        * Calculate the length of the extent being written to the selected
-        * object. This is the minimum of the full length requested (len) or
-        * the remainder of the current stripe being written to.
-        */
-       *oxlen = min_t(u64, len, su - su_offset);
-
-       dout(" obj extent %llu~%llu\n", *oxoff, *oxlen);
-       return 0;
-
-invalid:
-       dout(" invalid layout\n");
-       *ono = 0;
-       *oxoff = 0;
-       *oxlen = 0;
-       return -EINVAL;
-}
-EXPORT_SYMBOL(ceph_calc_file_object_mapping);
-
 /*
  * Map an object into a PG.
  *
diff --git a/net/ceph/striper.c b/net/ceph/striper.c
new file mode 100644 (file)
index 0000000..c36462d
--- /dev/null
@@ -0,0 +1,261 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+
+#include <linux/ceph/ceph_debug.h>
+
+#include <linux/math64.h>
+#include <linux/slab.h>
+
+#include <linux/ceph/striper.h>
+#include <linux/ceph/types.h>
+
+/*
+ * Map a file extent to a stripe unit within an object.
+ * Fill in objno, offset into object, and object extent length (i.e. the
+ * number of bytes mapped, less than or equal to @l->stripe_unit).
+ *
+ * Example for stripe_count = 3, stripes_per_object = 4:
+ *
+ * blockno   |  0  3  6  9 |  1  4  7 10 |  2  5  8 11 | 12 15 18 21 | 13 16 19
+ * stripeno  |  0  1  2  3 |  0  1  2  3 |  0  1  2  3 |  4  5  6  7 |  4  5  6
+ * stripepos |      0      |      1      |      2      |      0      |      1
+ * objno     |      0      |      1      |      2      |      3      |      4
+ * objsetno  |                    0                    |                    1
+ */
+void ceph_calc_file_object_mapping(struct ceph_file_layout *l,
+                                  u64 off, u64 len,
+                                  u64 *objno, u64 *objoff, u32 *xlen)
+{
+       u32 stripes_per_object = l->object_size / l->stripe_unit;
+       u64 blockno;    /* which su in the file (i.e. globally) */
+       u32 blockoff;   /* offset into su */
+       u64 stripeno;   /* which stripe */
+       u32 stripepos;  /* which su in the stripe,
+                          which object in the object set */
+       u64 objsetno;   /* which object set */
+       u32 objsetpos;  /* which stripe in the object set */
+
+       blockno = div_u64_rem(off, l->stripe_unit, &blockoff);
+       stripeno = div_u64_rem(blockno, l->stripe_count, &stripepos);
+       objsetno = div_u64_rem(stripeno, stripes_per_object, &objsetpos);
+
+       *objno = objsetno * l->stripe_count + stripepos;
+       *objoff = objsetpos * l->stripe_unit + blockoff;
+       *xlen = min_t(u64, len, l->stripe_unit - blockoff);
+}
+EXPORT_SYMBOL(ceph_calc_file_object_mapping);
+
+/*
+ * Return the last extent with given objno (@object_extents is sorted
+ * by objno).  If not found, return NULL and set @add_pos so that the
+ * new extent can be added with list_add(add_pos, new_ex).
+ */
+static struct ceph_object_extent *
+lookup_last(struct list_head *object_extents, u64 objno,
+           struct list_head **add_pos)
+{
+       struct list_head *pos;
+
+       list_for_each_prev(pos, object_extents) {
+               struct ceph_object_extent *ex =
+                   list_entry(pos, typeof(*ex), oe_item);
+
+               if (ex->oe_objno == objno)
+                       return ex;
+
+               if (ex->oe_objno < objno)
+                       break;
+       }
+
+       *add_pos = pos;
+       return NULL;
+}
+
+static struct ceph_object_extent *
+lookup_containing(struct list_head *object_extents, u64 objno,
+                 u64 objoff, u32 xlen)
+{
+       struct ceph_object_extent *ex;
+
+       list_for_each_entry(ex, object_extents, oe_item) {
+               if (ex->oe_objno == objno &&
+                   ex->oe_off <= objoff &&
+                   ex->oe_off + ex->oe_len >= objoff + xlen) /* paranoia */
+                       return ex;
+
+               if (ex->oe_objno > objno)
+                       break;
+       }
+
+       return NULL;
+}
+
+/*
+ * Map a file extent to a sorted list of object extents.
+ *
+ * We want only one (or as few as possible) object extents per object.
+ * Adjacent object extents will be merged together, each returned object
+ * extent may reverse map to multiple different file extents.
+ *
+ * Call @alloc_fn for each new object extent and @action_fn for each
+ * mapped stripe unit, whether it was merged into an already allocated
+ * object extent or started a new object extent.
+ *
+ * Newly allocated object extents are added to @object_extents.
+ * To keep @object_extents sorted, successive calls to this function
+ * must map successive file extents (i.e. the list of file extents that
+ * are mapped using the same @object_extents must be sorted).
+ *
+ * The caller is responsible for @object_extents.
+ */
+int ceph_file_to_extents(struct ceph_file_layout *l, u64 off, u64 len,
+                        struct list_head *object_extents,
+                        struct ceph_object_extent *alloc_fn(void *arg),
+                        void *alloc_arg,
+                        ceph_object_extent_fn_t action_fn,
+                        void *action_arg)
+{
+       struct ceph_object_extent *last_ex, *ex;
+
+       while (len) {
+               struct list_head *add_pos = NULL;
+               u64 objno, objoff;
+               u32 xlen;
+
+               ceph_calc_file_object_mapping(l, off, len, &objno, &objoff,
+                                             &xlen);
+
+               last_ex = lookup_last(object_extents, objno, &add_pos);
+               if (!last_ex || last_ex->oe_off + last_ex->oe_len != objoff) {
+                       ex = alloc_fn(alloc_arg);
+                       if (!ex)
+                               return -ENOMEM;
+
+                       ex->oe_objno = objno;
+                       ex->oe_off = objoff;
+                       ex->oe_len = xlen;
+                       if (action_fn)
+                               action_fn(ex, xlen, action_arg);
+
+                       if (!last_ex)
+                               list_add(&ex->oe_item, add_pos);
+                       else
+                               list_add(&ex->oe_item, &last_ex->oe_item);
+               } else {
+                       last_ex->oe_len += xlen;
+                       if (action_fn)
+                               action_fn(last_ex, xlen, action_arg);
+               }
+
+               off += xlen;
+               len -= xlen;
+       }
+
+       for (last_ex = list_first_entry(object_extents, typeof(*ex), oe_item),
+            ex = list_next_entry(last_ex, oe_item);
+            &ex->oe_item != object_extents;
+            last_ex = ex, ex = list_next_entry(ex, oe_item)) {
+               if (last_ex->oe_objno > ex->oe_objno ||
+                   (last_ex->oe_objno == ex->oe_objno &&
+                    last_ex->oe_off + last_ex->oe_len >= ex->oe_off)) {
+                       WARN(1, "%s: object_extents list not sorted!\n",
+                            __func__);
+                       return -EINVAL;
+               }
+       }
+
+       return 0;
+}
+EXPORT_SYMBOL(ceph_file_to_extents);
+
+/*
+ * A stripped down, non-allocating version of ceph_file_to_extents(),
+ * for when @object_extents is already populated.
+ */
+int ceph_iterate_extents(struct ceph_file_layout *l, u64 off, u64 len,
+                        struct list_head *object_extents,
+                        ceph_object_extent_fn_t action_fn,
+                        void *action_arg)
+{
+       while (len) {
+               struct ceph_object_extent *ex;
+               u64 objno, objoff;
+               u32 xlen;
+
+               ceph_calc_file_object_mapping(l, off, len, &objno, &objoff,
+                                             &xlen);
+
+               ex = lookup_containing(object_extents, objno, objoff, xlen);
+               if (!ex) {
+                       WARN(1, "%s: objno %llu %llu~%u not found!\n",
+                            __func__, objno, objoff, xlen);
+                       return -EINVAL;
+               }
+
+               action_fn(ex, xlen, action_arg);
+
+               off += xlen;
+               len -= xlen;
+       }
+
+       return 0;
+}
+EXPORT_SYMBOL(ceph_iterate_extents);
+
+/*
+ * Reverse map an object extent to a sorted list of file extents.
+ *
+ * On success, the caller is responsible for:
+ *
+ *     kfree(file_extents)
+ */
+int ceph_extent_to_file(struct ceph_file_layout *l,
+                       u64 objno, u64 objoff, u64 objlen,
+                       struct ceph_file_extent **file_extents,
+                       u32 *num_file_extents)
+{
+       u32 stripes_per_object = l->object_size / l->stripe_unit;
+       u64 blockno;    /* which su */
+       u32 blockoff;   /* offset into su */
+       u64 stripeno;   /* which stripe */
+       u32 stripepos;  /* which su in the stripe,
+                          which object in the object set */
+       u64 objsetno;   /* which object set */
+       u32 i = 0;
+
+       if (!objlen) {
+               *file_extents = NULL;
+               *num_file_extents = 0;
+               return 0;
+       }
+
+       *num_file_extents = DIV_ROUND_UP_ULL(objoff + objlen, l->stripe_unit) -
+                                    DIV_ROUND_DOWN_ULL(objoff, l->stripe_unit);
+       *file_extents = kmalloc_array(*num_file_extents, sizeof(**file_extents),
+                                     GFP_NOIO);
+       if (!*file_extents)
+               return -ENOMEM;
+
+       div_u64_rem(objoff, l->stripe_unit, &blockoff);
+       while (objlen) {
+               u64 off, len;
+
+               objsetno = div_u64_rem(objno, l->stripe_count, &stripepos);
+               stripeno = div_u64(objoff, l->stripe_unit) +
+                                               objsetno * stripes_per_object;
+               blockno = stripeno * l->stripe_count + stripepos;
+               off = blockno * l->stripe_unit + blockoff;
+               len = min_t(u64, objlen, l->stripe_unit - blockoff);
+
+               (*file_extents)[i].fe_off = off;
+               (*file_extents)[i].fe_len = len;
+
+               blockoff = 0;
+               objoff += len;
+               objlen -= len;
+               i++;
+       }
+
+       BUG_ON(i != *num_file_extents);
+       return 0;
+}
+EXPORT_SYMBOL(ceph_extent_to_file);