Merge tag 'ceph-for-5.1-rc1' of git://github.com/ceph/ceph-client
[sfrench/cifs-2.6.git] / drivers / block / rbd.c
index 1e92b61d0bd51cd481f06a112de4ed3f0f6718fd..4ba967d65cf963c6f2a2084a62f2dbad1935c3bf 100644 (file)
@@ -115,12 +115,14 @@ static int atomic_dec_return_safe(atomic_t *v)
 #define RBD_FEATURE_LAYERING           (1ULL<<0)
 #define RBD_FEATURE_STRIPINGV2         (1ULL<<1)
 #define RBD_FEATURE_EXCLUSIVE_LOCK     (1ULL<<2)
+#define RBD_FEATURE_DEEP_FLATTEN       (1ULL<<5)
 #define RBD_FEATURE_DATA_POOL          (1ULL<<7)
 #define RBD_FEATURE_OPERATIONS         (1ULL<<8)
 
 #define RBD_FEATURES_ALL       (RBD_FEATURE_LAYERING |         \
                                 RBD_FEATURE_STRIPINGV2 |       \
                                 RBD_FEATURE_EXCLUSIVE_LOCK |   \
+                                RBD_FEATURE_DEEP_FLATTEN |     \
                                 RBD_FEATURE_DATA_POOL |        \
                                 RBD_FEATURE_OPERATIONS)
 
@@ -214,28 +216,40 @@ enum obj_operation_type {
        OBJ_OP_READ = 1,
        OBJ_OP_WRITE,
        OBJ_OP_DISCARD,
+       OBJ_OP_ZEROOUT,
 };
 
 /*
  * Writes go through the following state machine to deal with
  * layering:
  *
- *                       need copyup
- * RBD_OBJ_WRITE_GUARD ---------------> RBD_OBJ_WRITE_COPYUP
- *        |     ^                              |
- *        v     \------------------------------/
- *      done
- *        ^
- *        |
- * RBD_OBJ_WRITE_FLAT
+ *            . . . . . RBD_OBJ_WRITE_GUARD. . . . . . . . . . . . . .
+ *            .                 |                                    .
+ *            .                 v                                    .
+ *            .    RBD_OBJ_WRITE_READ_FROM_PARENT. . .               .
+ *            .                 |                    .               .
+ *            .                 v                    v (deep-copyup  .
+ *    (image  .   RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC   .  not needed)  .
+ * flattened) v                 |                    .               .
+ *            .                 v                    .               .
+ *            . . . .RBD_OBJ_WRITE_COPYUP_OPS. . . . .      (copyup  .
+ *                              |                        not needed) v
+ *                              v                                    .
+ *                            done . . . . . . . . . . . . . . . . . .
+ *                              ^
+ *                              |
+ *                     RBD_OBJ_WRITE_FLAT
  *
  * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
- * there is a parent or not.
+ * assert_exists guard is needed or not (in some cases it's not needed
+ * even if there is a parent).
  */
 enum rbd_obj_write_state {
        RBD_OBJ_WRITE_FLAT = 1,
        RBD_OBJ_WRITE_GUARD,
-       RBD_OBJ_WRITE_COPYUP,
+       RBD_OBJ_WRITE_READ_FROM_PARENT,
+       RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC,
+       RBD_OBJ_WRITE_COPYUP_OPS,
 };
 
 struct rbd_obj_request {
@@ -291,7 +305,6 @@ struct rbd_img_request {
        int                     result; /* first nonzero obj_request result */
 
        struct list_head        object_extents; /* obj_req.ex structs */
-       u32                     obj_request_count;
        u32                     pending_count;
 
        struct kref             kref;
@@ -421,6 +434,10 @@ static DEFINE_IDA(rbd_dev_id_ida);
 
 static struct workqueue_struct *rbd_wq;
 
+static struct ceph_snap_context rbd_empty_snapc = {
+       .nref = REFCOUNT_INIT(1),
+};
+
 /*
  * single-major requires >= 0.75 version of userspace rbd utility.
  */
@@ -428,14 +445,13 @@ static bool single_major = true;
 module_param(single_major, bool, 0444);
 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
 
-static ssize_t rbd_add(struct bus_type *bus, const char *buf,
-                      size_t count);
-static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
-                         size_t count);
-static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
-                                   size_t count);
-static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
-                                      size_t count);
+static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count);
+static ssize_t remove_store(struct bus_type *bus, const char *buf,
+                           size_t count);
+static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
+                                     size_t count);
+static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
+                                        size_t count);
 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
 
 static int rbd_dev_id_to_minor(int dev_id)
@@ -464,16 +480,16 @@ static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
        return is_lock_owner;
 }
 
-static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf)
+static ssize_t supported_features_show(struct bus_type *bus, char *buf)
 {
        return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
 }
 
-static BUS_ATTR(add, 0200, NULL, rbd_add);
-static BUS_ATTR(remove, 0200, NULL, rbd_remove);
-static BUS_ATTR(add_single_major, 0200, NULL, rbd_add_single_major);
-static BUS_ATTR(remove_single_major, 0200, NULL, rbd_remove_single_major);
-static BUS_ATTR(supported_features, 0444, rbd_supported_features_show, NULL);
+static BUS_ATTR_WO(add);
+static BUS_ATTR_WO(remove);
+static BUS_ATTR_WO(add_single_major);
+static BUS_ATTR_WO(remove_single_major);
+static BUS_ATTR_RO(supported_features);
 
 static struct attribute *rbd_bus_attrs[] = {
        &bus_attr_add.attr,
@@ -733,6 +749,7 @@ static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
  */
 enum {
        Opt_queue_depth,
+       Opt_alloc_size,
        Opt_lock_timeout,
        Opt_last_int,
        /* int args above */
@@ -749,6 +766,7 @@ enum {
 
 static match_table_t rbd_opts_tokens = {
        {Opt_queue_depth, "queue_depth=%d"},
+       {Opt_alloc_size, "alloc_size=%d"},
        {Opt_lock_timeout, "lock_timeout=%d"},
        /* int args above */
        {Opt_pool_ns, "_pool_ns=%s"},
@@ -765,6 +783,7 @@ static match_table_t rbd_opts_tokens = {
 
 struct rbd_options {
        int     queue_depth;
+       int     alloc_size;
        unsigned long   lock_timeout;
        bool    read_only;
        bool    lock_on_read;
@@ -773,6 +792,7 @@ struct rbd_options {
 };
 
 #define RBD_QUEUE_DEPTH_DEFAULT        BLKDEV_MAX_RQ
+#define RBD_ALLOC_SIZE_DEFAULT (64 * 1024)
 #define RBD_LOCK_TIMEOUT_DEFAULT 0  /* no timeout */
 #define RBD_READ_ONLY_DEFAULT  false
 #define RBD_LOCK_ON_READ_DEFAULT false
@@ -812,6 +832,17 @@ static int parse_rbd_opts_token(char *c, void *private)
                }
                pctx->opts->queue_depth = intval;
                break;
+       case Opt_alloc_size:
+               if (intval < 1) {
+                       pr_err("alloc_size out of range\n");
+                       return -EINVAL;
+               }
+               if (!is_power_of_2(intval)) {
+                       pr_err("alloc_size must be a power of 2\n");
+                       return -EINVAL;
+               }
+               pctx->opts->alloc_size = intval;
+               break;
        case Opt_lock_timeout:
                /* 0 is "wait forever" (i.e. infinite timeout) */
                if (intval < 0 || intval > INT_MAX / 1000) {
@@ -858,6 +889,8 @@ static char* obj_op_name(enum obj_operation_type op_type)
                return "write";
        case OBJ_OP_DISCARD:
                return "discard";
+       case OBJ_OP_ZEROOUT:
+               return "zeroout";
        default:
                return "???";
        }
@@ -1345,7 +1378,6 @@ static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
 
        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
-       img_request->obj_request_count++;
        img_request->pending_count++;
        dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
 }
@@ -1355,8 +1387,6 @@ static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
 {
        dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
        list_del(&obj_request->ex.oe_item);
-       rbd_assert(img_request->obj_request_count > 0);
-       img_request->obj_request_count--;
        rbd_assert(obj_request->img_request == img_request);
        rbd_obj_request_put(obj_request);
 }
@@ -1410,6 +1440,19 @@ static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
                                        rbd_dev->layout.object_size;
 }
 
+/*
+ * Must be called after rbd_obj_calc_img_extents().
+ */
+static bool rbd_obj_copyup_enabled(struct rbd_obj_request *obj_req)
+{
+       if (!obj_req->num_img_extents ||
+           (rbd_obj_is_entire(obj_req) &&
+            !obj_req->img_request->snapc->num_snaps))
+               return false;
+
+       return true;
+}
+
 static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
 {
        return ceph_file_extents_bytes(obj_req->img_extents,
@@ -1423,6 +1466,7 @@ static bool rbd_img_is_write(struct rbd_img_request *img_req)
                return false;
        case OBJ_OP_WRITE:
        case OBJ_OP_DISCARD:
+       case OBJ_OP_ZEROOUT:
                return true;
        default:
                BUG();
@@ -1471,18 +1515,16 @@ static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
 }
 
 static struct ceph_osd_request *
-rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
+__rbd_osd_req_create(struct rbd_obj_request *obj_req,
+                    struct ceph_snap_context *snapc, unsigned int num_ops)
 {
-       struct rbd_img_request *img_req = obj_req->img_request;
-       struct rbd_device *rbd_dev = img_req->rbd_dev;
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct ceph_osd_request *req;
        const char *name_format = rbd_dev->image_format == 1 ?
                                      RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
 
-       req = ceph_osdc_alloc_request(osdc,
-                       (rbd_img_is_write(img_req) ? img_req->snapc : NULL),
-                       num_ops, false, GFP_NOIO);
+       req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
        if (!req)
                return NULL;
 
@@ -1507,6 +1549,13 @@ err_req:
        return NULL;
 }
 
+static struct ceph_osd_request *
+rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
+{
+       return __rbd_osd_req_create(obj_req, obj_req->img_request->snapc,
+                                   num_ops);
+}
+
 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
 {
        ceph_osdc_put_request(osd_req);
@@ -1672,7 +1721,6 @@ static void rbd_img_request_destroy(struct kref *kref)
 
        for_each_obj_request_safe(img_request, obj_request, next_obj_request)
                rbd_img_obj_request_del(img_request, obj_request);
-       rbd_assert(img_request->obj_request_count == 0);
 
        if (img_request_layered_test(img_request)) {
                img_request_layered_clear(img_request);
@@ -1755,7 +1803,7 @@ static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
 
 static int rbd_obj_setup_read(struct rbd_obj_request *obj_req)
 {
-       obj_req->osd_req = rbd_osd_req_create(obj_req, 1);
+       obj_req->osd_req = __rbd_osd_req_create(obj_req, NULL, 1);
        if (!obj_req->osd_req)
                return -ENOMEM;
 
@@ -1791,6 +1839,11 @@ static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
        return 0;
 }
 
+static int count_write_ops(struct rbd_obj_request *obj_req)
+{
+       return 2; /* setallochint + write/writefull */
+}
+
 static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
                                  unsigned int which)
 {
@@ -1817,6 +1870,7 @@ static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
 static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
 {
        unsigned int num_osd_ops, which = 0;
+       bool need_guard;
        int ret;
 
        /* reverse map the entire object onto the parent */
@@ -1824,47 +1878,112 @@ static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
        if (ret)
                return ret;
 
-       if (obj_req->num_img_extents) {
-               obj_req->write_state = RBD_OBJ_WRITE_GUARD;
-               num_osd_ops = 3; /* stat + setallochint + write/writefull */
-       } else {
-               obj_req->write_state = RBD_OBJ_WRITE_FLAT;
-               num_osd_ops = 2; /* setallochint + write/writefull */
-       }
+       need_guard = rbd_obj_copyup_enabled(obj_req);
+       num_osd_ops = need_guard + count_write_ops(obj_req);
 
        obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
        if (!obj_req->osd_req)
                return -ENOMEM;
 
-       if (obj_req->num_img_extents) {
+       if (need_guard) {
                ret = __rbd_obj_setup_stat(obj_req, which++);
                if (ret)
                        return ret;
+
+               obj_req->write_state = RBD_OBJ_WRITE_GUARD;
+       } else {
+               obj_req->write_state = RBD_OBJ_WRITE_FLAT;
        }
 
        __rbd_obj_setup_write(obj_req, which);
        return 0;
 }
 
-static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req,
+static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
+{
+       return rbd_obj_is_tail(obj_req) ? CEPH_OSD_OP_TRUNCATE :
+                                         CEPH_OSD_OP_ZERO;
+}
+
+static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
+{
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+       u64 off = obj_req->ex.oe_off;
+       u64 next_off = obj_req->ex.oe_off + obj_req->ex.oe_len;
+       int ret;
+
+       /*
+        * Align the range to alloc_size boundary and punt on discards
+        * that are too small to free up any space.
+        *
+        * alloc_size == object_size && is_tail() is a special case for
+        * filestore with filestore_punch_hole = false, needed to allow
+        * truncate (in addition to delete).
+        */
+       if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
+           !rbd_obj_is_tail(obj_req)) {
+               off = round_up(off, rbd_dev->opts->alloc_size);
+               next_off = round_down(next_off, rbd_dev->opts->alloc_size);
+               if (off >= next_off)
+                       return 1;
+       }
+
+       /* reverse map the entire object onto the parent */
+       ret = rbd_obj_calc_img_extents(obj_req, true);
+       if (ret)
+               return ret;
+
+       obj_req->osd_req = rbd_osd_req_create(obj_req, 1);
+       if (!obj_req->osd_req)
+               return -ENOMEM;
+
+       if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
+               osd_req_op_init(obj_req->osd_req, 0, CEPH_OSD_OP_DELETE, 0);
+       } else {
+               dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
+                    obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
+                    off, next_off - off);
+               osd_req_op_extent_init(obj_req->osd_req, 0,
+                                      truncate_or_zero_opcode(obj_req),
+                                      off, next_off - off, 0, 0);
+       }
+
+       obj_req->write_state = RBD_OBJ_WRITE_FLAT;
+       rbd_osd_req_format_write(obj_req);
+       return 0;
+}
+
+static int count_zeroout_ops(struct rbd_obj_request *obj_req)
+{
+       int num_osd_ops;
+
+       if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents &&
+           !rbd_obj_copyup_enabled(obj_req))
+               num_osd_ops = 2; /* create + truncate */
+       else
+               num_osd_ops = 1; /* delete/truncate/zero */
+
+       return num_osd_ops;
+}
+
+static void __rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req,
                                    unsigned int which)
 {
        u16 opcode;
 
        if (rbd_obj_is_entire(obj_req)) {
                if (obj_req->num_img_extents) {
-                       osd_req_op_init(obj_req->osd_req, which++,
-                                       CEPH_OSD_OP_CREATE, 0);
+                       if (!rbd_obj_copyup_enabled(obj_req))
+                               osd_req_op_init(obj_req->osd_req, which++,
+                                               CEPH_OSD_OP_CREATE, 0);
                        opcode = CEPH_OSD_OP_TRUNCATE;
                } else {
                        osd_req_op_init(obj_req->osd_req, which++,
                                        CEPH_OSD_OP_DELETE, 0);
                        opcode = 0;
                }
-       } else if (rbd_obj_is_tail(obj_req)) {
-               opcode = CEPH_OSD_OP_TRUNCATE;
        } else {
-               opcode = CEPH_OSD_OP_ZERO;
+               opcode = truncate_or_zero_opcode(obj_req);
        }
 
        if (opcode)
@@ -1876,9 +1995,10 @@ static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req,
        rbd_osd_req_format_write(obj_req);
 }
 
-static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
+static int rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req)
 {
        unsigned int num_osd_ops, which = 0;
+       bool need_guard;
        int ret;
 
        /* reverse map the entire object onto the parent */
@@ -1886,33 +2006,24 @@ static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
        if (ret)
                return ret;
 
-       if (rbd_obj_is_entire(obj_req)) {
-               obj_req->write_state = RBD_OBJ_WRITE_FLAT;
-               if (obj_req->num_img_extents)
-                       num_osd_ops = 2; /* create + truncate */
-               else
-                       num_osd_ops = 1; /* delete */
-       } else {
-               if (obj_req->num_img_extents) {
-                       obj_req->write_state = RBD_OBJ_WRITE_GUARD;
-                       num_osd_ops = 2; /* stat + truncate/zero */
-               } else {
-                       obj_req->write_state = RBD_OBJ_WRITE_FLAT;
-                       num_osd_ops = 1; /* truncate/zero */
-               }
-       }
+       need_guard = rbd_obj_copyup_enabled(obj_req);
+       num_osd_ops = need_guard + count_zeroout_ops(obj_req);
 
        obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
        if (!obj_req->osd_req)
                return -ENOMEM;
 
-       if (!rbd_obj_is_entire(obj_req) && obj_req->num_img_extents) {
+       if (need_guard) {
                ret = __rbd_obj_setup_stat(obj_req, which++);
                if (ret)
                        return ret;
+
+               obj_req->write_state = RBD_OBJ_WRITE_GUARD;
+       } else {
+               obj_req->write_state = RBD_OBJ_WRITE_FLAT;
        }
 
-       __rbd_obj_setup_discard(obj_req, which);
+       __rbd_obj_setup_zeroout(obj_req, which);
        return 0;
 }
 
@@ -1923,10 +2034,10 @@ static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
  */
 static int __rbd_img_fill_request(struct rbd_img_request *img_req)
 {
-       struct rbd_obj_request *obj_req;
+       struct rbd_obj_request *obj_req, *next_obj_req;
        int ret;
 
-       for_each_obj_request(img_req, obj_req) {
+       for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
                switch (img_req->op_type) {
                case OBJ_OP_READ:
                        ret = rbd_obj_setup_read(obj_req);
@@ -1937,11 +2048,20 @@ static int __rbd_img_fill_request(struct rbd_img_request *img_req)
                case OBJ_OP_DISCARD:
                        ret = rbd_obj_setup_discard(obj_req);
                        break;
+               case OBJ_OP_ZEROOUT:
+                       ret = rbd_obj_setup_zeroout(obj_req);
+                       break;
                default:
                        rbd_assert(0);
                }
-               if (ret)
+               if (ret < 0)
                        return ret;
+               if (ret > 0) {
+                       img_req->xferred += obj_req->ex.oe_len;
+                       img_req->pending_count--;
+                       rbd_img_obj_request_del(img_req, obj_req);
+                       continue;
+               }
 
                ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
                if (ret)
@@ -2357,21 +2477,19 @@ static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
        return true;
 }
 
-static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
+#define MODS_ONLY      U32_MAX
+
+static int rbd_obj_issue_copyup_empty_snapc(struct rbd_obj_request *obj_req,
+                                           u32 bytes)
 {
-       unsigned int num_osd_ops = obj_req->osd_req->r_num_ops;
        int ret;
 
        dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
        rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT);
+       rbd_assert(bytes > 0 && bytes != MODS_ONLY);
        rbd_osd_req_destroy(obj_req->osd_req);
 
-       /*
-        * Create a copyup request with the same number of OSD ops as
-        * the original request.  The original request was stat + op(s),
-        * the new copyup request will be copyup + the same op(s).
-        */
-       obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
+       obj_req->osd_req = __rbd_osd_req_create(obj_req, &rbd_empty_snapc, 1);
        if (!obj_req->osd_req)
                return -ENOMEM;
 
@@ -2379,27 +2497,65 @@ static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
        if (ret)
                return ret;
 
-       /*
-        * Only send non-zero copyup data to save some I/O and network
-        * bandwidth -- zero copyup data is equivalent to the object not
-        * existing.
-        */
-       if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) {
-               dout("%s obj_req %p detected zeroes\n", __func__, obj_req);
-               bytes = 0;
-       }
        osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0,
                                          obj_req->copyup_bvecs,
                                          obj_req->copyup_bvec_count,
                                          bytes);
+       rbd_osd_req_format_write(obj_req);
 
-       switch (obj_req->img_request->op_type) {
+       ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
+       if (ret)
+               return ret;
+
+       rbd_obj_request_submit(obj_req);
+       return 0;
+}
+
+static int rbd_obj_issue_copyup_ops(struct rbd_obj_request *obj_req, u32 bytes)
+{
+       struct rbd_img_request *img_req = obj_req->img_request;
+       unsigned int num_osd_ops = (bytes != MODS_ONLY);
+       unsigned int which = 0;
+       int ret;
+
+       dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
+       rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT ||
+                  obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_CALL);
+       rbd_osd_req_destroy(obj_req->osd_req);
+
+       switch (img_req->op_type) {
        case OBJ_OP_WRITE:
-               __rbd_obj_setup_write(obj_req, 1);
+               num_osd_ops += count_write_ops(obj_req);
                break;
-       case OBJ_OP_DISCARD:
-               rbd_assert(!rbd_obj_is_entire(obj_req));
-               __rbd_obj_setup_discard(obj_req, 1);
+       case OBJ_OP_ZEROOUT:
+               num_osd_ops += count_zeroout_ops(obj_req);
+               break;
+       default:
+               rbd_assert(0);
+       }
+
+       obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
+       if (!obj_req->osd_req)
+               return -ENOMEM;
+
+       if (bytes != MODS_ONLY) {
+               ret = osd_req_op_cls_init(obj_req->osd_req, which, "rbd",
+                                         "copyup");
+               if (ret)
+                       return ret;
+
+               osd_req_op_cls_request_data_bvecs(obj_req->osd_req, which++,
+                                                 obj_req->copyup_bvecs,
+                                                 obj_req->copyup_bvec_count,
+                                                 bytes);
+       }
+
+       switch (img_req->op_type) {
+       case OBJ_OP_WRITE:
+               __rbd_obj_setup_write(obj_req, which);
+               break;
+       case OBJ_OP_ZEROOUT:
+               __rbd_obj_setup_zeroout(obj_req, which);
                break;
        default:
                rbd_assert(0);
@@ -2413,6 +2569,33 @@ static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
        return 0;
 }
 
+static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
+{
+       /*
+        * Only send non-zero copyup data to save some I/O and network
+        * bandwidth -- zero copyup data is equivalent to the object not
+        * existing.
+        */
+       if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) {
+               dout("%s obj_req %p detected zeroes\n", __func__, obj_req);
+               bytes = 0;
+       }
+
+       if (obj_req->img_request->snapc->num_snaps && bytes > 0) {
+               /*
+                * Send a copyup request with an empty snapshot context to
+                * deep-copyup the object through all existing snapshots.
+                * A second request with the current snapshot context will be
+                * sent for the actual modification.
+                */
+               obj_req->write_state = RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC;
+               return rbd_obj_issue_copyup_empty_snapc(obj_req, bytes);
+       }
+
+       obj_req->write_state = RBD_OBJ_WRITE_COPYUP_OPS;
+       return rbd_obj_issue_copyup_ops(obj_req, bytes);
+}
+
 static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
 {
        u32 i;
@@ -2452,22 +2635,19 @@ static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req)
        if (!obj_req->num_img_extents) {
                /*
                 * The overlap has become 0 (most likely because the
-                * image has been flattened).  Use rbd_obj_issue_copyup()
-                * to re-submit the original write request -- the copyup
-                * operation itself will be a no-op, since someone must
-                * have populated the child object while we weren't
-                * looking.  Move to WRITE_FLAT state as we'll be done
-                * with the operation once the null copyup completes.
+                * image has been flattened).  Re-submit the original write
+                * request -- pass MODS_ONLY since the copyup isn't needed
+                * anymore.
                 */
-               obj_req->write_state = RBD_OBJ_WRITE_FLAT;
-               return rbd_obj_issue_copyup(obj_req, 0);
+               obj_req->write_state = RBD_OBJ_WRITE_COPYUP_OPS;
+               return rbd_obj_issue_copyup_ops(obj_req, MODS_ONLY);
        }
 
        ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
        if (ret)
                return ret;
 
-       obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
+       obj_req->write_state = RBD_OBJ_WRITE_READ_FROM_PARENT;
        return rbd_obj_read_from_parent(obj_req);
 }
 
@@ -2475,7 +2655,6 @@ static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req)
 {
        int ret;
 
-again:
        switch (obj_req->write_state) {
        case RBD_OBJ_WRITE_GUARD:
                rbd_assert(!obj_req->xferred);
@@ -2494,6 +2673,7 @@ again:
                }
                /* fall through */
        case RBD_OBJ_WRITE_FLAT:
+       case RBD_OBJ_WRITE_COPYUP_OPS:
                if (!obj_req->result)
                        /*
                         * There is no such thing as a successful short
@@ -2501,13 +2681,24 @@ again:
                         */
                        obj_req->xferred = obj_req->ex.oe_len;
                return true;
-       case RBD_OBJ_WRITE_COPYUP:
-               obj_req->write_state = RBD_OBJ_WRITE_GUARD;
+       case RBD_OBJ_WRITE_READ_FROM_PARENT:
                if (obj_req->result)
-                       goto again;
+                       return true;
 
                rbd_assert(obj_req->xferred);
                ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred);
+               if (ret) {
+                       obj_req->result = ret;
+                       obj_req->xferred = 0;
+                       return true;
+               }
+               return false;
+       case RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC:
+               if (obj_req->result)
+                       return true;
+
+               obj_req->write_state = RBD_OBJ_WRITE_COPYUP_OPS;
+               ret = rbd_obj_issue_copyup_ops(obj_req, MODS_ONLY);
                if (ret) {
                        obj_req->result = ret;
                        return true;
@@ -2529,6 +2720,7 @@ static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req)
        case OBJ_OP_WRITE:
                return rbd_obj_handle_write(obj_req);
        case OBJ_OP_DISCARD:
+       case OBJ_OP_ZEROOUT:
                if (rbd_obj_handle_write(obj_req)) {
                        /*
                         * Hide -ENOENT from delete/truncate/zero -- discarding
@@ -3641,9 +3833,11 @@ static void rbd_queue_workfn(struct work_struct *work)
 
        switch (req_op(rq)) {
        case REQ_OP_DISCARD:
-       case REQ_OP_WRITE_ZEROES:
                op_type = OBJ_OP_DISCARD;
                break;
+       case REQ_OP_WRITE_ZEROES:
+               op_type = OBJ_OP_ZEROOUT;
+               break;
        case REQ_OP_WRITE:
                op_type = OBJ_OP_WRITE;
                break;
@@ -3723,12 +3917,12 @@ static void rbd_queue_workfn(struct work_struct *work)
        img_request->rq = rq;
        snapc = NULL; /* img_request consumes a ref */
 
-       if (op_type == OBJ_OP_DISCARD)
+       if (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_ZEROOUT)
                result = rbd_img_fill_nodata(img_request, offset, length);
        else
                result = rbd_img_fill_from_bio(img_request, offset, length,
                                               rq->bio);
-       if (result)
+       if (result || !img_request->pending_count)
                goto err_img_request;
 
        rbd_img_request_submit(img_request);
@@ -3988,7 +4182,7 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
        rbd_dev->tag_set.ops = &rbd_mq_ops;
        rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
        rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
-       rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
+       rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE;
        rbd_dev->tag_set.nr_hw_queues = 1;
        rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
 
@@ -5389,6 +5583,7 @@ static int rbd_add_parse_args(const char *buf,
 
        pctx.opts->read_only = RBD_READ_ONLY_DEFAULT;
        pctx.opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
+       pctx.opts->alloc_size = RBD_ALLOC_SIZE_DEFAULT;
        pctx.opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT;
        pctx.opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
        pctx.opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
@@ -5796,14 +5991,6 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
                ret = rbd_dev_v2_parent_info(rbd_dev);
                if (ret)
                        goto err_out_probe;
-
-               /*
-                * Need to warn users if this image is the one being
-                * mapped and has a parent.
-                */
-               if (!depth && rbd_dev->parent_spec)
-                       rbd_warn(rbd_dev,
-                                "WARNING: kernel layering is EXPERIMENTAL!");
        }
 
        ret = rbd_dev_probe_parent(rbd_dev, depth);
@@ -5886,6 +6073,12 @@ static ssize_t do_rbd_add(struct bus_type *bus,
        if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
                rbd_dev->opts->read_only = true;
 
+       if (rbd_dev->opts->alloc_size > rbd_dev->layout.object_size) {
+               rbd_warn(rbd_dev, "alloc_size adjusted to %u",
+                        rbd_dev->layout.object_size);
+               rbd_dev->opts->alloc_size = rbd_dev->layout.object_size;
+       }
+
        rc = rbd_dev_device_setup(rbd_dev);
        if (rc)
                goto err_out_image_probe;
@@ -5934,9 +6127,7 @@ err_out_args:
        goto out;
 }
 
-static ssize_t rbd_add(struct bus_type *bus,
-                      const char *buf,
-                      size_t count)
+static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count)
 {
        if (single_major)
                return -EINVAL;
@@ -5944,9 +6135,8 @@ static ssize_t rbd_add(struct bus_type *bus,
        return do_rbd_add(bus, buf, count);
 }
 
-static ssize_t rbd_add_single_major(struct bus_type *bus,
-                                   const char *buf,
-                                   size_t count)
+static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
+                                     size_t count)
 {
        return do_rbd_add(bus, buf, count);
 }
@@ -6049,9 +6239,7 @@ static ssize_t do_rbd_remove(struct bus_type *bus,
        return count;
 }
 
-static ssize_t rbd_remove(struct bus_type *bus,
-                         const char *buf,
-                         size_t count)
+static ssize_t remove_store(struct bus_type *bus, const char *buf, size_t count)
 {
        if (single_major)
                return -EINVAL;
@@ -6059,9 +6247,8 @@ static ssize_t rbd_remove(struct bus_type *bus,
        return do_rbd_remove(bus, buf, count);
 }
 
-static ssize_t rbd_remove_single_major(struct bus_type *bus,
-                                      const char *buf,
-                                      size_t count)
+static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
+                                        size_t count)
 {
        return do_rbd_remove(bus, buf, count);
 }