Merge tag 'ceph-for-5.1-rc1' of git://github.com/ceph/ceph-client
author: Linus Torvalds <torvalds@linux-foundation.org>
Tue, 12 Mar 2019 21:58:35 +0000 (14:58 -0700)
committer: Linus Torvalds <torvalds@linux-foundation.org>
Tue, 12 Mar 2019 21:58:35 +0000 (14:58 -0700)
Pull ceph updates from Ilya Dryomov:
 "The highlights are:

   - rbd will now ignore discards that aren't aligned and big enough to
     actually free up some space (myself). This is controlled by the new
     alloc_size map option and can be disabled if needed.

   - support for rbd deep-flatten feature (myself). Deep-flatten allows
     "rbd flatten" to fully disconnect the clone image and its snapshots
     from the parent and make the parent snapshot removable.

   - a new round of cap handling improvements (Zheng Yan). The kernel
     client should now be much more prompt about releasing its caps and
     it is possible to put a limit on the number of caps held.

   - support for getting ceph.dir.pin extended attribute (Zheng Yan)"

* tag 'ceph-for-5.1-rc1' of git://github.com/ceph/ceph-client: (26 commits)
  Documentation: modern versions of ceph are not backed by btrfs
  rbd: advertise support for RBD_FEATURE_DEEP_FLATTEN
  rbd: whole-object write and zeroout should copyup when snapshots exist
  rbd: copyup with an empty snapshot context (aka deep-copyup)
  rbd: introduce rbd_obj_issue_copyup_ops()
  rbd: stop copying num_osd_ops in rbd_obj_issue_copyup()
  rbd: factor out __rbd_osd_req_create()
  rbd: clear ->xferred on error from rbd_obj_issue_copyup()
  rbd: remove experimental designation from kernel layering
  ceph: add mount option to limit caps count
  ceph: periodically trim stale dentries
  ceph: delete stale dentry when last reference is dropped
  ceph: remove dentry_lru file from debugfs
  ceph: touch existing cap when handling reply
  ceph: pass inclusive lend parameter to filemap_write_and_wait_range()
  rbd: round off and ignore discards that are too small
  rbd: handle DISCARD and WRITE_ZEROES separately
  rbd: get rid of obj_req->obj_request_count
  libceph: use struct_size() for kmalloc() in crush_decode()
  ceph: send cap releases more aggressively
  ...

1  2 
drivers/block/rbd.c

diff --combined drivers/block/rbd.c
index 74088d8dbaf357b9ecf46b02051dabf96e2d0a10,8dbfc5e54ae3af971c4028209b9e16156f6e2137..4ba967d65cf963c6f2a2084a62f2dbad1935c3bf
@@@ -115,12 -115,14 +115,14 @@@ static int atomic_dec_return_safe(atomi
  #define RBD_FEATURE_LAYERING          (1ULL<<0)
  #define RBD_FEATURE_STRIPINGV2                (1ULL<<1)
  #define RBD_FEATURE_EXCLUSIVE_LOCK    (1ULL<<2)
+ #define RBD_FEATURE_DEEP_FLATTEN      (1ULL<<5)
  #define RBD_FEATURE_DATA_POOL         (1ULL<<7)
  #define RBD_FEATURE_OPERATIONS                (1ULL<<8)
  
  #define RBD_FEATURES_ALL      (RBD_FEATURE_LAYERING |         \
                                 RBD_FEATURE_STRIPINGV2 |       \
                                 RBD_FEATURE_EXCLUSIVE_LOCK |   \
+                                RBD_FEATURE_DEEP_FLATTEN |     \
                                 RBD_FEATURE_DATA_POOL |        \
                                 RBD_FEATURE_OPERATIONS)
  
@@@ -214,28 -216,40 +216,40 @@@ enum obj_operation_type 
        OBJ_OP_READ = 1,
        OBJ_OP_WRITE,
        OBJ_OP_DISCARD,
+       OBJ_OP_ZEROOUT,
  };
  
  /*
   * Writes go through the following state machine to deal with
   * layering:
   *
-  *                       need copyup
-  * RBD_OBJ_WRITE_GUARD ---------------> RBD_OBJ_WRITE_COPYUP
-  *        |     ^                              |
-  *        v     \------------------------------/
-  *      done
-  *        ^
-  *        |
-  * RBD_OBJ_WRITE_FLAT
+  *            . . . . . RBD_OBJ_WRITE_GUARD. . . . . . . . . . . . . .
+  *            .                 |                                    .
+  *            .                 v                                    .
+  *            .    RBD_OBJ_WRITE_READ_FROM_PARENT. . .               .
+  *            .                 |                    .               .
+  *            .                 v                    v (deep-copyup  .
+  *    (image  .   RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC   .  not needed)  .
+  * flattened) v                 |                    .               .
+  *            .                 v                    .               .
+  *            . . . .RBD_OBJ_WRITE_COPYUP_OPS. . . . .      (copyup  .
+  *                              |                        not needed) v
+  *                              v                                    .
+  *                            done . . . . . . . . . . . . . . . . . .
+  *                              ^
+  *                              |
+  *                     RBD_OBJ_WRITE_FLAT
   *
   * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
-  * there is a parent or not.
+  * assert_exists guard is needed or not (in some cases it's not needed
+  * even if there is a parent).
   */
  enum rbd_obj_write_state {
        RBD_OBJ_WRITE_FLAT = 1,
        RBD_OBJ_WRITE_GUARD,
-       RBD_OBJ_WRITE_COPYUP,
+       RBD_OBJ_WRITE_READ_FROM_PARENT,
+       RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC,
+       RBD_OBJ_WRITE_COPYUP_OPS,
  };
  
  struct rbd_obj_request {
@@@ -291,7 -305,6 +305,6 @@@ struct rbd_img_request 
        int                     result; /* first nonzero obj_request result */
  
        struct list_head        object_extents; /* obj_req.ex structs */
-       u32                     obj_request_count;
        u32                     pending_count;
  
        struct kref             kref;
@@@ -421,6 -434,10 +434,10 @@@ static DEFINE_IDA(rbd_dev_id_ida)
  
  static struct workqueue_struct *rbd_wq;
  
+ static struct ceph_snap_context rbd_empty_snapc = {
+       .nref = REFCOUNT_INIT(1),
+ };
  /*
   * single-major requires >= 0.75 version of userspace rbd utility.
   */
@@@ -428,13 -445,14 +445,13 @@@ static bool single_major = true
  module_param(single_major, bool, 0444);
  MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
  
 -static ssize_t rbd_add(struct bus_type *bus, const char *buf,
 -                     size_t count);
 -static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
 -                        size_t count);
 -static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
 -                                  size_t count);
 -static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
 -                                     size_t count);
 +static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count);
 +static ssize_t remove_store(struct bus_type *bus, const char *buf,
 +                          size_t count);
 +static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
 +                                    size_t count);
 +static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
 +                                       size_t count);
  static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
  
  static int rbd_dev_id_to_minor(int dev_id)
@@@ -463,16 -481,16 +480,16 @@@ static bool rbd_is_lock_owner(struct rb
        return is_lock_owner;
  }
  
 -static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf)
 +static ssize_t supported_features_show(struct bus_type *bus, char *buf)
  {
        return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
  }
  
 -static BUS_ATTR(add, 0200, NULL, rbd_add);
 -static BUS_ATTR(remove, 0200, NULL, rbd_remove);
 -static BUS_ATTR(add_single_major, 0200, NULL, rbd_add_single_major);
 -static BUS_ATTR(remove_single_major, 0200, NULL, rbd_remove_single_major);
 -static BUS_ATTR(supported_features, 0444, rbd_supported_features_show, NULL);
 +static BUS_ATTR_WO(add);
 +static BUS_ATTR_WO(remove);
 +static BUS_ATTR_WO(add_single_major);
 +static BUS_ATTR_WO(remove_single_major);
 +static BUS_ATTR_RO(supported_features);
  
  static struct attribute *rbd_bus_attrs[] = {
        &bus_attr_add.attr,
@@@ -732,6 -750,7 +749,7 @@@ static struct rbd_client *rbd_client_fi
   */
  enum {
        Opt_queue_depth,
+       Opt_alloc_size,
        Opt_lock_timeout,
        Opt_last_int,
        /* int args above */
  
  static match_table_t rbd_opts_tokens = {
        {Opt_queue_depth, "queue_depth=%d"},
+       {Opt_alloc_size, "alloc_size=%d"},
        {Opt_lock_timeout, "lock_timeout=%d"},
        /* int args above */
        {Opt_pool_ns, "_pool_ns=%s"},
  
  struct rbd_options {
        int     queue_depth;
+       int     alloc_size;
        unsigned long   lock_timeout;
        bool    read_only;
        bool    lock_on_read;
  };
  
  #define RBD_QUEUE_DEPTH_DEFAULT       BLKDEV_MAX_RQ
+ #define RBD_ALLOC_SIZE_DEFAULT        (64 * 1024)
  #define RBD_LOCK_TIMEOUT_DEFAULT 0  /* no timeout */
  #define RBD_READ_ONLY_DEFAULT false
  #define RBD_LOCK_ON_READ_DEFAULT false
@@@ -811,6 -833,17 +832,17 @@@ static int parse_rbd_opts_token(char *c
                }
                pctx->opts->queue_depth = intval;
                break;
+       case Opt_alloc_size:
+               if (intval < 1) {
+                       pr_err("alloc_size out of range\n");
+                       return -EINVAL;
+               }
+               if (!is_power_of_2(intval)) {
+                       pr_err("alloc_size must be a power of 2\n");
+                       return -EINVAL;
+               }
+               pctx->opts->alloc_size = intval;
+               break;
        case Opt_lock_timeout:
                /* 0 is "wait forever" (i.e. infinite timeout) */
                if (intval < 0 || intval > INT_MAX / 1000) {
@@@ -857,6 -890,8 +889,8 @@@ static char* obj_op_name(enum obj_opera
                return "write";
        case OBJ_OP_DISCARD:
                return "discard";
+       case OBJ_OP_ZEROOUT:
+               return "zeroout";
        default:
                return "???";
        }
@@@ -1344,7 -1379,6 +1378,6 @@@ static inline void rbd_img_obj_request_
  
        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
-       img_request->obj_request_count++;
        img_request->pending_count++;
        dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
  }
@@@ -1354,8 -1388,6 +1387,6 @@@ static inline void rbd_img_obj_request_
  {
        dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
        list_del(&obj_request->ex.oe_item);
-       rbd_assert(img_request->obj_request_count > 0);
-       img_request->obj_request_count--;
        rbd_assert(obj_request->img_request == img_request);
        rbd_obj_request_put(obj_request);
  }
@@@ -1409,6 -1441,19 +1440,19 @@@ static bool rbd_obj_is_tail(struct rbd_
                                        rbd_dev->layout.object_size;
  }
  
+ /*
+  * Must be called after rbd_obj_calc_img_extents().
+  */
+ static bool rbd_obj_copyup_enabled(struct rbd_obj_request *obj_req)
+ {
+       if (!obj_req->num_img_extents ||
+           (rbd_obj_is_entire(obj_req) &&
+            !obj_req->img_request->snapc->num_snaps))
+               return false;
+       return true;
+ }
  static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
  {
        return ceph_file_extents_bytes(obj_req->img_extents,
@@@ -1422,6 -1467,7 +1466,7 @@@ static bool rbd_img_is_write(struct rbd
                return false;
        case OBJ_OP_WRITE:
        case OBJ_OP_DISCARD:
+       case OBJ_OP_ZEROOUT:
                return true;
        default:
                BUG();
@@@ -1470,18 -1516,16 +1515,16 @@@ static void rbd_osd_req_format_write(st
  }
  
  static struct ceph_osd_request *
- rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
+ __rbd_osd_req_create(struct rbd_obj_request *obj_req,
+                    struct ceph_snap_context *snapc, unsigned int num_ops)
  {
-       struct rbd_img_request *img_req = obj_req->img_request;
-       struct rbd_device *rbd_dev = img_req->rbd_dev;
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct ceph_osd_request *req;
        const char *name_format = rbd_dev->image_format == 1 ?
                                      RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
  
-       req = ceph_osdc_alloc_request(osdc,
-                       (rbd_img_is_write(img_req) ? img_req->snapc : NULL),
-                       num_ops, false, GFP_NOIO);
+       req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
        if (!req)
                return NULL;
  
@@@ -1506,6 -1550,13 +1549,13 @@@ err_req
        return NULL;
  }
  
+ static struct ceph_osd_request *
+ rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
+ {
+       return __rbd_osd_req_create(obj_req, obj_req->img_request->snapc,
+                                   num_ops);
+ }
  static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
  {
        ceph_osdc_put_request(osd_req);
@@@ -1671,7 -1722,6 +1721,6 @@@ static void rbd_img_request_destroy(str
  
        for_each_obj_request_safe(img_request, obj_request, next_obj_request)
                rbd_img_obj_request_del(img_request, obj_request);
-       rbd_assert(img_request->obj_request_count == 0);
  
        if (img_request_layered_test(img_request)) {
                img_request_layered_clear(img_request);
@@@ -1754,7 -1804,7 +1803,7 @@@ static void rbd_osd_req_setup_data(stru
  
  static int rbd_obj_setup_read(struct rbd_obj_request *obj_req)
  {
-       obj_req->osd_req = rbd_osd_req_create(obj_req, 1);
+       obj_req->osd_req = __rbd_osd_req_create(obj_req, NULL, 1);
        if (!obj_req->osd_req)
                return -ENOMEM;
  
@@@ -1790,6 -1840,11 +1839,11 @@@ static int __rbd_obj_setup_stat(struct 
        return 0;
  }
  
+ static int count_write_ops(struct rbd_obj_request *obj_req)
+ {
+       return 2; /* setallochint + write/writefull */
+ }
  static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
                                  unsigned int which)
  {
  static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
  {
        unsigned int num_osd_ops, which = 0;
+       bool need_guard;
        int ret;
  
        /* reverse map the entire object onto the parent */
        if (ret)
                return ret;
  
-       if (obj_req->num_img_extents) {
-               obj_req->write_state = RBD_OBJ_WRITE_GUARD;
-               num_osd_ops = 3; /* stat + setallochint + write/writefull */
-       } else {
-               obj_req->write_state = RBD_OBJ_WRITE_FLAT;
-               num_osd_ops = 2; /* setallochint + write/writefull */
-       }
+       need_guard = rbd_obj_copyup_enabled(obj_req);
+       num_osd_ops = need_guard + count_write_ops(obj_req);
  
        obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
        if (!obj_req->osd_req)
                return -ENOMEM;
  
-       if (obj_req->num_img_extents) {
+       if (need_guard) {
                ret = __rbd_obj_setup_stat(obj_req, which++);
                if (ret)
                        return ret;
+               obj_req->write_state = RBD_OBJ_WRITE_GUARD;
+       } else {
+               obj_req->write_state = RBD_OBJ_WRITE_FLAT;
        }
  
        __rbd_obj_setup_write(obj_req, which);
        return 0;
  }
  
- static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req,
+ static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
+ {
+       return rbd_obj_is_tail(obj_req) ? CEPH_OSD_OP_TRUNCATE :
+                                         CEPH_OSD_OP_ZERO;
+ }
+ static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
+ {
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+       u64 off = obj_req->ex.oe_off;
+       u64 next_off = obj_req->ex.oe_off + obj_req->ex.oe_len;
+       int ret;
+       /*
+        * Align the range to alloc_size boundary and punt on discards
+        * that are too small to free up any space.
+        *
+        * alloc_size == object_size && is_tail() is a special case for
+        * filestore with filestore_punch_hole = false, needed to allow
+        * truncate (in addition to delete).
+        */
+       if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
+           !rbd_obj_is_tail(obj_req)) {
+               off = round_up(off, rbd_dev->opts->alloc_size);
+               next_off = round_down(next_off, rbd_dev->opts->alloc_size);
+               if (off >= next_off)
+                       return 1;
+       }
+       /* reverse map the entire object onto the parent */
+       ret = rbd_obj_calc_img_extents(obj_req, true);
+       if (ret)
+               return ret;
+       obj_req->osd_req = rbd_osd_req_create(obj_req, 1);
+       if (!obj_req->osd_req)
+               return -ENOMEM;
+       if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
+               osd_req_op_init(obj_req->osd_req, 0, CEPH_OSD_OP_DELETE, 0);
+       } else {
+               dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
+                    obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
+                    off, next_off - off);
+               osd_req_op_extent_init(obj_req->osd_req, 0,
+                                      truncate_or_zero_opcode(obj_req),
+                                      off, next_off - off, 0, 0);
+       }
+       obj_req->write_state = RBD_OBJ_WRITE_FLAT;
+       rbd_osd_req_format_write(obj_req);
+       return 0;
+ }
+ static int count_zeroout_ops(struct rbd_obj_request *obj_req)
+ {
+       int num_osd_ops;
+       if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents &&
+           !rbd_obj_copyup_enabled(obj_req))
+               num_osd_ops = 2; /* create + truncate */
+       else
+               num_osd_ops = 1; /* delete/truncate/zero */
+       return num_osd_ops;
+ }
+ static void __rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req,
                                    unsigned int which)
  {
        u16 opcode;
  
        if (rbd_obj_is_entire(obj_req)) {
                if (obj_req->num_img_extents) {
-                       osd_req_op_init(obj_req->osd_req, which++,
-                                       CEPH_OSD_OP_CREATE, 0);
+                       if (!rbd_obj_copyup_enabled(obj_req))
+                               osd_req_op_init(obj_req->osd_req, which++,
+                                               CEPH_OSD_OP_CREATE, 0);
                        opcode = CEPH_OSD_OP_TRUNCATE;
                } else {
                        osd_req_op_init(obj_req->osd_req, which++,
                                        CEPH_OSD_OP_DELETE, 0);
                        opcode = 0;
                }
-       } else if (rbd_obj_is_tail(obj_req)) {
-               opcode = CEPH_OSD_OP_TRUNCATE;
        } else {
-               opcode = CEPH_OSD_OP_ZERO;
+               opcode = truncate_or_zero_opcode(obj_req);
        }
  
        if (opcode)
        rbd_osd_req_format_write(obj_req);
  }
  
- static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
+ static int rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req)
  {
        unsigned int num_osd_ops, which = 0;
+       bool need_guard;
        int ret;
  
        /* reverse map the entire object onto the parent */
        if (ret)
                return ret;
  
-       if (rbd_obj_is_entire(obj_req)) {
-               obj_req->write_state = RBD_OBJ_WRITE_FLAT;
-               if (obj_req->num_img_extents)
-                       num_osd_ops = 2; /* create + truncate */
-               else
-                       num_osd_ops = 1; /* delete */
-       } else {
-               if (obj_req->num_img_extents) {
-                       obj_req->write_state = RBD_OBJ_WRITE_GUARD;
-                       num_osd_ops = 2; /* stat + truncate/zero */
-               } else {
-                       obj_req->write_state = RBD_OBJ_WRITE_FLAT;
-                       num_osd_ops = 1; /* truncate/zero */
-               }
-       }
+       need_guard = rbd_obj_copyup_enabled(obj_req);
+       num_osd_ops = need_guard + count_zeroout_ops(obj_req);
  
        obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
        if (!obj_req->osd_req)
                return -ENOMEM;
  
-       if (!rbd_obj_is_entire(obj_req) && obj_req->num_img_extents) {
+       if (need_guard) {
                ret = __rbd_obj_setup_stat(obj_req, which++);
                if (ret)
                        return ret;
+               obj_req->write_state = RBD_OBJ_WRITE_GUARD;
+       } else {
+               obj_req->write_state = RBD_OBJ_WRITE_FLAT;
        }
  
-       __rbd_obj_setup_discard(obj_req, which);
+       __rbd_obj_setup_zeroout(obj_req, which);
        return 0;
  }
  
   */
  static int __rbd_img_fill_request(struct rbd_img_request *img_req)
  {
-       struct rbd_obj_request *obj_req;
+       struct rbd_obj_request *obj_req, *next_obj_req;
        int ret;
  
-       for_each_obj_request(img_req, obj_req) {
+       for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
                switch (img_req->op_type) {
                case OBJ_OP_READ:
                        ret = rbd_obj_setup_read(obj_req);
                case OBJ_OP_DISCARD:
                        ret = rbd_obj_setup_discard(obj_req);
                        break;
+               case OBJ_OP_ZEROOUT:
+                       ret = rbd_obj_setup_zeroout(obj_req);
+                       break;
                default:
                        rbd_assert(0);
                }
-               if (ret)
+               if (ret < 0)
                        return ret;
+               if (ret > 0) {
+                       img_req->xferred += obj_req->ex.oe_len;
+                       img_req->pending_count--;
+                       rbd_img_obj_request_del(img_req, obj_req);
+                       continue;
+               }
  
                ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
                if (ret)
@@@ -2356,21 -2478,19 +2477,19 @@@ static bool is_zero_bvecs(struct bio_ve
        return true;
  }
  
- static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
+ #define MODS_ONLY     U32_MAX
+ static int rbd_obj_issue_copyup_empty_snapc(struct rbd_obj_request *obj_req,
+                                           u32 bytes)
  {
-       unsigned int num_osd_ops = obj_req->osd_req->r_num_ops;
        int ret;
  
        dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
        rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT);
+       rbd_assert(bytes > 0 && bytes != MODS_ONLY);
        rbd_osd_req_destroy(obj_req->osd_req);
  
-       /*
-        * Create a copyup request with the same number of OSD ops as
-        * the original request.  The original request was stat + op(s),
-        * the new copyup request will be copyup + the same op(s).
-        */
-       obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
+       obj_req->osd_req = __rbd_osd_req_create(obj_req, &rbd_empty_snapc, 1);
        if (!obj_req->osd_req)
                return -ENOMEM;
  
        if (ret)
                return ret;
  
-       /*
-        * Only send non-zero copyup data to save some I/O and network
-        * bandwidth -- zero copyup data is equivalent to the object not
-        * existing.
-        */
-       if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) {
-               dout("%s obj_req %p detected zeroes\n", __func__, obj_req);
-               bytes = 0;
-       }
        osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0,
                                          obj_req->copyup_bvecs,
                                          obj_req->copyup_bvec_count,
                                          bytes);
+       rbd_osd_req_format_write(obj_req);
  
-       switch (obj_req->img_request->op_type) {
+       ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
+       if (ret)
+               return ret;
+       rbd_obj_request_submit(obj_req);
+       return 0;
+ }
+ static int rbd_obj_issue_copyup_ops(struct rbd_obj_request *obj_req, u32 bytes)
+ {
+       struct rbd_img_request *img_req = obj_req->img_request;
+       unsigned int num_osd_ops = (bytes != MODS_ONLY);
+       unsigned int which = 0;
+       int ret;
+       dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
+       rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT ||
+                  obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_CALL);
+       rbd_osd_req_destroy(obj_req->osd_req);
+       switch (img_req->op_type) {
        case OBJ_OP_WRITE:
-               __rbd_obj_setup_write(obj_req, 1);
+               num_osd_ops += count_write_ops(obj_req);
                break;
-       case OBJ_OP_DISCARD:
-               rbd_assert(!rbd_obj_is_entire(obj_req));
-               __rbd_obj_setup_discard(obj_req, 1);
+       case OBJ_OP_ZEROOUT:
+               num_osd_ops += count_zeroout_ops(obj_req);
+               break;
+       default:
+               rbd_assert(0);
+       }
+       obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
+       if (!obj_req->osd_req)
+               return -ENOMEM;
+       if (bytes != MODS_ONLY) {
+               ret = osd_req_op_cls_init(obj_req->osd_req, which, "rbd",
+                                         "copyup");
+               if (ret)
+                       return ret;
+               osd_req_op_cls_request_data_bvecs(obj_req->osd_req, which++,
+                                                 obj_req->copyup_bvecs,
+                                                 obj_req->copyup_bvec_count,
+                                                 bytes);
+       }
+       switch (img_req->op_type) {
+       case OBJ_OP_WRITE:
+               __rbd_obj_setup_write(obj_req, which);
+               break;
+       case OBJ_OP_ZEROOUT:
+               __rbd_obj_setup_zeroout(obj_req, which);
                break;
        default:
                rbd_assert(0);
        return 0;
  }
  
+ static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
+ {
+       /*
+        * Only send non-zero copyup data to save some I/O and network
+        * bandwidth -- zero copyup data is equivalent to the object not
+        * existing.
+        */
+       if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) {
+               dout("%s obj_req %p detected zeroes\n", __func__, obj_req);
+               bytes = 0;
+       }
+       if (obj_req->img_request->snapc->num_snaps && bytes > 0) {
+               /*
+                * Send a copyup request with an empty snapshot context to
+                * deep-copyup the object through all existing snapshots.
+                * A second request with the current snapshot context will be
+                * sent for the actual modification.
+                */
+               obj_req->write_state = RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC;
+               return rbd_obj_issue_copyup_empty_snapc(obj_req, bytes);
+       }
+       obj_req->write_state = RBD_OBJ_WRITE_COPYUP_OPS;
+       return rbd_obj_issue_copyup_ops(obj_req, bytes);
+ }
  static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
  {
        u32 i;
@@@ -2451,22 -2636,19 +2635,19 @@@ static int rbd_obj_handle_write_guard(s
        if (!obj_req->num_img_extents) {
                /*
                 * The overlap has become 0 (most likely because the
-                * image has been flattened).  Use rbd_obj_issue_copyup()
-                * to re-submit the original write request -- the copyup
-                * operation itself will be a no-op, since someone must
-                * have populated the child object while we weren't
-                * looking.  Move to WRITE_FLAT state as we'll be done
-                * with the operation once the null copyup completes.
+                * image has been flattened).  Re-submit the original write
+                * request -- pass MODS_ONLY since the copyup isn't needed
+                * anymore.
                 */
-               obj_req->write_state = RBD_OBJ_WRITE_FLAT;
-               return rbd_obj_issue_copyup(obj_req, 0);
+               obj_req->write_state = RBD_OBJ_WRITE_COPYUP_OPS;
+               return rbd_obj_issue_copyup_ops(obj_req, MODS_ONLY);
        }
  
        ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
        if (ret)
                return ret;
  
-       obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
+       obj_req->write_state = RBD_OBJ_WRITE_READ_FROM_PARENT;
        return rbd_obj_read_from_parent(obj_req);
  }
  
@@@ -2474,7 -2656,6 +2655,6 @@@ static bool rbd_obj_handle_write(struc
  {
        int ret;
  
- again:
        switch (obj_req->write_state) {
        case RBD_OBJ_WRITE_GUARD:
                rbd_assert(!obj_req->xferred);
                }
                /* fall through */
        case RBD_OBJ_WRITE_FLAT:
+       case RBD_OBJ_WRITE_COPYUP_OPS:
                if (!obj_req->result)
                        /*
                         * There is no such thing as a successful short
                         */
                        obj_req->xferred = obj_req->ex.oe_len;
                return true;
-       case RBD_OBJ_WRITE_COPYUP:
-               obj_req->write_state = RBD_OBJ_WRITE_GUARD;
+       case RBD_OBJ_WRITE_READ_FROM_PARENT:
                if (obj_req->result)
-                       goto again;
+                       return true;
  
                rbd_assert(obj_req->xferred);
                ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred);
+               if (ret) {
+                       obj_req->result = ret;
+                       obj_req->xferred = 0;
+                       return true;
+               }
+               return false;
+       case RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC:
+               if (obj_req->result)
+                       return true;
+               obj_req->write_state = RBD_OBJ_WRITE_COPYUP_OPS;
+               ret = rbd_obj_issue_copyup_ops(obj_req, MODS_ONLY);
                if (ret) {
                        obj_req->result = ret;
                        return true;
@@@ -2528,6 -2721,7 +2720,7 @@@ static bool __rbd_obj_handle_request(st
        case OBJ_OP_WRITE:
                return rbd_obj_handle_write(obj_req);
        case OBJ_OP_DISCARD:
+       case OBJ_OP_ZEROOUT:
                if (rbd_obj_handle_write(obj_req)) {
                        /*
                         * Hide -ENOENT from delete/truncate/zero -- discarding
@@@ -3640,9 -3834,11 +3833,11 @@@ static void rbd_queue_workfn(struct wor
  
        switch (req_op(rq)) {
        case REQ_OP_DISCARD:
-       case REQ_OP_WRITE_ZEROES:
                op_type = OBJ_OP_DISCARD;
                break;
+       case REQ_OP_WRITE_ZEROES:
+               op_type = OBJ_OP_ZEROOUT;
+               break;
        case REQ_OP_WRITE:
                op_type = OBJ_OP_WRITE;
                break;
        img_request->rq = rq;
        snapc = NULL; /* img_request consumes a ref */
  
-       if (op_type == OBJ_OP_DISCARD)
+       if (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_ZEROOUT)
                result = rbd_img_fill_nodata(img_request, offset, length);
        else
                result = rbd_img_fill_from_bio(img_request, offset, length,
                                               rq->bio);
-       if (result)
+       if (result || !img_request->pending_count)
                goto err_img_request;
  
        rbd_img_request_submit(img_request);
@@@ -3987,7 -4183,7 +4182,7 @@@ static int rbd_init_disk(struct rbd_dev
        rbd_dev->tag_set.ops = &rbd_mq_ops;
        rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
        rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
 -      rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
 +      rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE;
        rbd_dev->tag_set.nr_hw_queues = 1;
        rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
  
@@@ -5388,6 -5584,7 +5583,7 @@@ static int rbd_add_parse_args(const cha
  
        pctx.opts->read_only = RBD_READ_ONLY_DEFAULT;
        pctx.opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
+       pctx.opts->alloc_size = RBD_ALLOC_SIZE_DEFAULT;
        pctx.opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT;
        pctx.opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
        pctx.opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
@@@ -5795,14 -5992,6 +5991,6 @@@ static int rbd_dev_image_probe(struct r
                ret = rbd_dev_v2_parent_info(rbd_dev);
                if (ret)
                        goto err_out_probe;
-               /*
-                * Need to warn users if this image is the one being
-                * mapped and has a parent.
-                */
-               if (!depth && rbd_dev->parent_spec)
-                       rbd_warn(rbd_dev,
-                                "WARNING: kernel layering is EXPERIMENTAL!");
        }
  
        ret = rbd_dev_probe_parent(rbd_dev, depth);
@@@ -5885,6 -6074,12 +6073,12 @@@ static ssize_t do_rbd_add(struct bus_ty
        if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
                rbd_dev->opts->read_only = true;
  
+       if (rbd_dev->opts->alloc_size > rbd_dev->layout.object_size) {
+               rbd_warn(rbd_dev, "alloc_size adjusted to %u",
+                        rbd_dev->layout.object_size);
+               rbd_dev->opts->alloc_size = rbd_dev->layout.object_size;
+       }
        rc = rbd_dev_device_setup(rbd_dev);
        if (rc)
                goto err_out_image_probe;
@@@ -5933,7 -6128,9 +6127,7 @@@ err_out_args
        goto out;
  }
  
 -static ssize_t rbd_add(struct bus_type *bus,
 -                     const char *buf,
 -                     size_t count)
 +static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count)
  {
        if (single_major)
                return -EINVAL;
        return do_rbd_add(bus, buf, count);
  }
  
 -static ssize_t rbd_add_single_major(struct bus_type *bus,
 -                                  const char *buf,
 -                                  size_t count)
 +static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
 +                                    size_t count)
  {
        return do_rbd_add(bus, buf, count);
  }
@@@ -6045,7 -6243,9 +6239,7 @@@ static ssize_t do_rbd_remove(struct bus
        return count;
  }
  
 -static ssize_t rbd_remove(struct bus_type *bus,
 -                        const char *buf,
 -                        size_t count)
 +static ssize_t remove_store(struct bus_type *bus, const char *buf, size_t count)
  {
        if (single_major)
                return -EINVAL;
        return do_rbd_remove(bus, buf, count);
  }
  
 -static ssize_t rbd_remove_single_major(struct bus_type *bus,
 -                                     const char *buf,
 -                                     size_t count)
 +static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
 +                                       size_t count)
  {
        return do_rbd_remove(bus, buf, count);
  }