libceph: support the RADOS copy-from operation
[sfrench/cifs-2.6.git] / net / ceph / osd_client.c
index 60934bd8796c53c5ec96477e1c29fb9e8234e804..d23a9f81f3d784123f0f97aaba1b529081350fae 100644 (file)
@@ -126,6 +126,9 @@ static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
        osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
 }
 
+/*
+ * Consumes @pages if @own_pages is true.
+ */
 static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
                        struct page **pages, u64 length, u32 alignment,
                        bool pages_from_pool, bool own_pages)
@@ -138,6 +141,9 @@ static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
        osd_data->own_pages = own_pages;
 }
 
+/*
+ * Consumes a ref on @pagelist.
+ */
 static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
                        struct ceph_pagelist *pagelist)
 {
@@ -362,6 +368,8 @@ static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
                num_pages = calc_pages_for((u64)osd_data->alignment,
                                                (u64)osd_data->length);
                ceph_release_page_vector(osd_data->pages, num_pages);
+       } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
+               ceph_pagelist_release(osd_data->pagelist);
        }
        ceph_osd_data_init(osd_data);
 }
@@ -402,6 +410,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
        case CEPH_OSD_OP_LIST_WATCHERS:
                ceph_osd_data_release(&op->list_watchers.response_data);
                break;
+       case CEPH_OSD_OP_COPY_FROM:
+               ceph_osd_data_release(&op->copy_from.osd_data);
+               break;
        default:
                break;
        }
@@ -606,12 +617,15 @@ static int ceph_oloc_encoding_size(const struct ceph_object_locator *oloc)
        return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
 }
 
-int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
+static int __ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp,
+                                     int num_request_data_items,
+                                     int num_reply_data_items)
 {
        struct ceph_osd_client *osdc = req->r_osdc;
        struct ceph_msg *msg;
        int msg_size;
 
+       WARN_ON(req->r_request || req->r_reply);
        WARN_ON(ceph_oid_empty(&req->r_base_oid));
        WARN_ON(ceph_oloc_empty(&req->r_base_oloc));
 
@@ -633,9 +647,11 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
        msg_size += 4 + 8; /* retry_attempt, features */
 
        if (req->r_mempool)
-               msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
+               msg = ceph_msgpool_get(&osdc->msgpool_op, msg_size,
+                                      num_request_data_items);
        else
-               msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp, true);
+               msg = ceph_msg_new2(CEPH_MSG_OSD_OP, msg_size,
+                                   num_request_data_items, gfp, true);
        if (!msg)
                return -ENOMEM;
 
@@ -648,9 +664,11 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
        msg_size += req->r_num_ops * sizeof(struct ceph_osd_op);
 
        if (req->r_mempool)
-               msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
+               msg = ceph_msgpool_get(&osdc->msgpool_op_reply, msg_size,
+                                      num_reply_data_items);
        else
-               msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, gfp, true);
+               msg = ceph_msg_new2(CEPH_MSG_OSD_OPREPLY, msg_size,
+                                   num_reply_data_items, gfp, true);
        if (!msg)
                return -ENOMEM;
 
@@ -658,7 +676,6 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
 
        return 0;
 }
-EXPORT_SYMBOL(ceph_osdc_alloc_messages);
 
 static bool osd_req_opcode_valid(u16 opcode)
 {
@@ -671,6 +688,65 @@ __CEPH_FORALL_OSD_OPS(GENERATE_CASE)
        }
 }
 
+static void get_num_data_items(struct ceph_osd_request *req,
+                              int *num_request_data_items,
+                              int *num_reply_data_items)
+{
+       struct ceph_osd_req_op *op;
+
+       *num_request_data_items = 0;
+       *num_reply_data_items = 0;
+
+       for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
+               switch (op->op) {
+               /* request */
+               case CEPH_OSD_OP_WRITE:
+               case CEPH_OSD_OP_WRITEFULL:
+               case CEPH_OSD_OP_SETXATTR:
+               case CEPH_OSD_OP_CMPXATTR:
+               case CEPH_OSD_OP_NOTIFY_ACK:
+               case CEPH_OSD_OP_COPY_FROM:
+                       *num_request_data_items += 1;
+                       break;
+
+               /* reply */
+               case CEPH_OSD_OP_STAT:
+               case CEPH_OSD_OP_READ:
+               case CEPH_OSD_OP_LIST_WATCHERS:
+                       *num_reply_data_items += 1;
+                       break;
+
+               /* both */
+               case CEPH_OSD_OP_NOTIFY:
+                       *num_request_data_items += 1;
+                       *num_reply_data_items += 1;
+                       break;
+               case CEPH_OSD_OP_CALL:
+                       *num_request_data_items += 2;
+                       *num_reply_data_items += 1;
+                       break;
+
+               default:
+                       WARN_ON(!osd_req_opcode_valid(op->op));
+                       break;
+               }
+       }
+}
+
+/*
+ * oid, oloc and OSD op opcode(s) must be filled in before this function
+ * is called.
+ */
+int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
+{
+       int num_request_data_items, num_reply_data_items;
+
+       get_num_data_items(req, &num_request_data_items, &num_reply_data_items);
+       return __ceph_osdc_alloc_messages(req, gfp, num_request_data_items,
+                                         num_reply_data_items);
+}
+EXPORT_SYMBOL(ceph_osdc_alloc_messages);
+
 /*
  * This is an osd op init function for opcodes that have no data or
  * other information associated with them.  It also serves as a
@@ -767,22 +843,19 @@ void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
 EXPORT_SYMBOL(osd_req_op_extent_dup_last);
 
 int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
-                       u16 opcode, const char *class, const char *method)
+                       const char *class, const char *method)
 {
-       struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
-                                                     opcode, 0);
+       struct ceph_osd_req_op *op;
        struct ceph_pagelist *pagelist;
        size_t payload_len = 0;
        size_t size;
 
-       BUG_ON(opcode != CEPH_OSD_OP_CALL);
+       op = _osd_req_op_init(osd_req, which, CEPH_OSD_OP_CALL, 0);
 
-       pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
+       pagelist = ceph_pagelist_alloc(GFP_NOFS);
        if (!pagelist)
                return -ENOMEM;
 
-       ceph_pagelist_init(pagelist);
-
        op->cls.class_name = class;
        size = strlen(class);
        BUG_ON(size > (size_t) U8_MAX);
@@ -815,12 +888,10 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
 
        BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);
 
-       pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
+       pagelist = ceph_pagelist_alloc(GFP_NOFS);
        if (!pagelist)
                return -ENOMEM;
 
-       ceph_pagelist_init(pagelist);
-
        payload_len = strlen(name);
        op->xattr.name_len = payload_len;
        ceph_pagelist_append(pagelist, name, payload_len);
@@ -900,12 +971,6 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
 static u32 osd_req_encode_op(struct ceph_osd_op *dst,
                             const struct ceph_osd_req_op *src)
 {
-       if (WARN_ON(!osd_req_opcode_valid(src->op))) {
-               pr_err("unrecognized osd opcode %d\n", src->op);
-
-               return 0;
-       }
-
        switch (src->op) {
        case CEPH_OSD_OP_STAT:
                break;
@@ -955,6 +1020,14 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
        case CEPH_OSD_OP_CREATE:
        case CEPH_OSD_OP_DELETE:
                break;
+       case CEPH_OSD_OP_COPY_FROM:
+               dst->copy_from.snapid = cpu_to_le64(src->copy_from.snapid);
+               dst->copy_from.src_version =
+                       cpu_to_le64(src->copy_from.src_version);
+               dst->copy_from.flags = src->copy_from.flags;
+               dst->copy_from.src_fadvise_flags =
+                       cpu_to_le32(src->copy_from.src_fadvise_flags);
+               break;
        default:
                pr_err("unsupported osd opcode %s\n",
                        ceph_osd_op_name(src->op));
@@ -1038,7 +1111,15 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        if (flags & CEPH_OSD_FLAG_WRITE)
                req->r_data_offset = off;
 
-       r = ceph_osdc_alloc_messages(req, GFP_NOFS);
+       if (num_ops > 1)
+               /*
+                * This is a special case for ceph_writepages_start(), but it
+                * also covers ceph_uninline_data().  If more multi-op request
+                * use cases emerge, we will need a separate helper.
+                */
+               r = __ceph_osdc_alloc_messages(req, GFP_NOFS, num_ops, 0);
+       else
+               r = ceph_osdc_alloc_messages(req, GFP_NOFS);
        if (r)
                goto fail;
 
@@ -1845,48 +1926,55 @@ static bool should_plug_request(struct ceph_osd_request *req)
        return true;
 }
 
-static void setup_request_data(struct ceph_osd_request *req,
-                              struct ceph_msg *msg)
+/*
+ * Keep get_num_data_items() in sync with this function.
+ */
+static void setup_request_data(struct ceph_osd_request *req)
 {
-       u32 data_len = 0;
-       int i;
+       struct ceph_msg *request_msg = req->r_request;
+       struct ceph_msg *reply_msg = req->r_reply;
+       struct ceph_osd_req_op *op;
 
-       if (!list_empty(&msg->data))
+       if (req->r_request->num_data_items || req->r_reply->num_data_items)
                return;
 
-       WARN_ON(msg->data_length);
-       for (i = 0; i < req->r_num_ops; i++) {
-               struct ceph_osd_req_op *op = &req->r_ops[i];
-
+       WARN_ON(request_msg->data_length || reply_msg->data_length);
+       for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
                switch (op->op) {
                /* request */
                case CEPH_OSD_OP_WRITE:
                case CEPH_OSD_OP_WRITEFULL:
                        WARN_ON(op->indata_len != op->extent.length);
-                       ceph_osdc_msg_data_add(msg, &op->extent.osd_data);
+                       ceph_osdc_msg_data_add(request_msg,
+                                              &op->extent.osd_data);
                        break;
                case CEPH_OSD_OP_SETXATTR:
                case CEPH_OSD_OP_CMPXATTR:
                        WARN_ON(op->indata_len != op->xattr.name_len +
                                                  op->xattr.value_len);
-                       ceph_osdc_msg_data_add(msg, &op->xattr.osd_data);
+                       ceph_osdc_msg_data_add(request_msg,
+                                              &op->xattr.osd_data);
                        break;
                case CEPH_OSD_OP_NOTIFY_ACK:
-                       ceph_osdc_msg_data_add(msg,
+                       ceph_osdc_msg_data_add(request_msg,
                                               &op->notify_ack.request_data);
                        break;
+               case CEPH_OSD_OP_COPY_FROM:
+                       ceph_osdc_msg_data_add(request_msg,
+                                              &op->copy_from.osd_data);
+                       break;
 
                /* reply */
                case CEPH_OSD_OP_STAT:
-                       ceph_osdc_msg_data_add(req->r_reply,
+                       ceph_osdc_msg_data_add(reply_msg,
                                               &op->raw_data_in);
                        break;
                case CEPH_OSD_OP_READ:
-                       ceph_osdc_msg_data_add(req->r_reply,
+                       ceph_osdc_msg_data_add(reply_msg,
                                               &op->extent.osd_data);
                        break;
                case CEPH_OSD_OP_LIST_WATCHERS:
-                       ceph_osdc_msg_data_add(req->r_reply,
+                       ceph_osdc_msg_data_add(reply_msg,
                                               &op->list_watchers.response_data);
                        break;
 
@@ -1895,25 +1983,23 @@ static void setup_request_data(struct ceph_osd_request *req,
                        WARN_ON(op->indata_len != op->cls.class_len +
                                                  op->cls.method_len +
                                                  op->cls.indata_len);
-                       ceph_osdc_msg_data_add(msg, &op->cls.request_info);
+                       ceph_osdc_msg_data_add(request_msg,
+                                              &op->cls.request_info);
                        /* optional, can be NONE */
-                       ceph_osdc_msg_data_add(msg, &op->cls.request_data);
+                       ceph_osdc_msg_data_add(request_msg,
+                                              &op->cls.request_data);
                        /* optional, can be NONE */
-                       ceph_osdc_msg_data_add(req->r_reply,
+                       ceph_osdc_msg_data_add(reply_msg,
                                               &op->cls.response_data);
                        break;
                case CEPH_OSD_OP_NOTIFY:
-                       ceph_osdc_msg_data_add(msg,
+                       ceph_osdc_msg_data_add(request_msg,
                                               &op->notify.request_data);
-                       ceph_osdc_msg_data_add(req->r_reply,
+                       ceph_osdc_msg_data_add(reply_msg,
                                               &op->notify.response_data);
                        break;
                }
-
-               data_len += op->indata_len;
        }
-
-       WARN_ON(data_len != msg->data_length);
 }
 
 static void encode_pgid(void **p, const struct ceph_pg *pgid)
@@ -1961,7 +2047,7 @@ static void encode_request_partial(struct ceph_osd_request *req,
                        req->r_data_offset || req->r_snapc);
        }
 
-       setup_request_data(req, msg);
+       setup_request_data(req);
 
        encode_spgid(&p, &req->r_t.spgid); /* actual spg */
        ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */
@@ -3001,11 +3087,21 @@ static void linger_submit(struct ceph_osd_linger_request *lreq)
        struct ceph_osd_client *osdc = lreq->osdc;
        struct ceph_osd *osd;
 
+       down_write(&osdc->lock);
+       linger_register(lreq);
+       if (lreq->is_watch) {
+               lreq->reg_req->r_ops[0].watch.cookie = lreq->linger_id;
+               lreq->ping_req->r_ops[0].watch.cookie = lreq->linger_id;
+       } else {
+               lreq->reg_req->r_ops[0].notify.cookie = lreq->linger_id;
+       }
+
        calc_target(osdc, &lreq->t, NULL, false);
        osd = lookup_create_osd(osdc, lreq->t.osd, true);
        link_linger(osd, lreq);
 
        send_linger(lreq);
+       up_write(&osdc->lock);
 }
 
 static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq)
@@ -4318,9 +4414,7 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
                             lreq->notify_id, notify_id);
                } else if (!completion_done(&lreq->notify_finish_wait)) {
                        struct ceph_msg_data *data =
-                           list_first_entry_or_null(&msg->data,
-                                                    struct ceph_msg_data,
-                                                    links);
+                           msg->num_data_items ? &msg->data[0] : NULL;
 
                        if (data) {
                                if (lreq->preply_pages) {
@@ -4476,6 +4570,23 @@ alloc_linger_request(struct ceph_osd_linger_request *lreq)
 
        ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
        ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
+       return req;
+}
+
+static struct ceph_osd_request *
+alloc_watch_request(struct ceph_osd_linger_request *lreq, u8 watch_opcode)
+{
+       struct ceph_osd_request *req;
+
+       req = alloc_linger_request(lreq);
+       if (!req)
+               return NULL;
+
+       /*
+        * Pass 0 for cookie because we don't know it yet, it will be
+        * filled in by linger_submit().
+        */
+       osd_req_op_watch_init(req, 0, 0, watch_opcode);
 
        if (ceph_osdc_alloc_messages(req, GFP_NOIO)) {
                ceph_osdc_put_request(req);
@@ -4514,27 +4625,19 @@ ceph_osdc_watch(struct ceph_osd_client *osdc,
        lreq->t.flags = CEPH_OSD_FLAG_WRITE;
        ktime_get_real_ts64(&lreq->mtime);
 
-       lreq->reg_req = alloc_linger_request(lreq);
+       lreq->reg_req = alloc_watch_request(lreq, CEPH_OSD_WATCH_OP_WATCH);
        if (!lreq->reg_req) {
                ret = -ENOMEM;
                goto err_put_lreq;
        }
 
-       lreq->ping_req = alloc_linger_request(lreq);
+       lreq->ping_req = alloc_watch_request(lreq, CEPH_OSD_WATCH_OP_PING);
        if (!lreq->ping_req) {
                ret = -ENOMEM;
                goto err_put_lreq;
        }
 
-       down_write(&osdc->lock);
-       linger_register(lreq); /* before osd_req_op_* */
-       osd_req_op_watch_init(lreq->reg_req, 0, lreq->linger_id,
-                             CEPH_OSD_WATCH_OP_WATCH);
-       osd_req_op_watch_init(lreq->ping_req, 0, lreq->linger_id,
-                             CEPH_OSD_WATCH_OP_PING);
        linger_submit(lreq);
-       up_write(&osdc->lock);
-
        ret = linger_reg_commit_wait(lreq);
        if (ret) {
                linger_cancel(lreq);
@@ -4599,11 +4702,10 @@ static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which,
 
        op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
 
-       pl = kmalloc(sizeof(*pl), GFP_NOIO);
+       pl = ceph_pagelist_alloc(GFP_NOIO);
        if (!pl)
                return -ENOMEM;
 
-       ceph_pagelist_init(pl);
        ret = ceph_pagelist_encode_64(pl, notify_id);
        ret |= ceph_pagelist_encode_64(pl, cookie);
        if (payload) {
@@ -4641,12 +4743,12 @@ int ceph_osdc_notify_ack(struct ceph_osd_client *osdc,
        ceph_oloc_copy(&req->r_base_oloc, oloc);
        req->r_flags = CEPH_OSD_FLAG_READ;
 
-       ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+       ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
+                                        payload_len);
        if (ret)
                goto out_put_req;
 
-       ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
-                                        payload_len);
+       ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
        if (ret)
                goto out_put_req;
 
@@ -4670,11 +4772,10 @@ static int osd_req_op_notify_init(struct ceph_osd_request *req, int which,
        op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
        op->notify.cookie = cookie;
 
-       pl = kmalloc(sizeof(*pl), GFP_NOIO);
+       pl = ceph_pagelist_alloc(GFP_NOIO);
        if (!pl)
                return -ENOMEM;
 
-       ceph_pagelist_init(pl);
        ret = ceph_pagelist_encode_32(pl, 1); /* prot_ver */
        ret |= ceph_pagelist_encode_32(pl, timeout);
        ret |= ceph_pagelist_encode_32(pl, payload_len);
@@ -4733,29 +4834,30 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc,
                goto out_put_lreq;
        }
 
+       /*
+        * Pass 0 for cookie because we don't know it yet, it will be
+        * filled in by linger_submit().
+        */
+       ret = osd_req_op_notify_init(lreq->reg_req, 0, 0, 1, timeout,
+                                    payload, payload_len);
+       if (ret)
+               goto out_put_lreq;
+
        /* for notify_id */
        pages = ceph_alloc_page_vector(1, GFP_NOIO);
        if (IS_ERR(pages)) {
                ret = PTR_ERR(pages);
                goto out_put_lreq;
        }
-
-       down_write(&osdc->lock);
-       linger_register(lreq); /* before osd_req_op_* */
-       ret = osd_req_op_notify_init(lreq->reg_req, 0, lreq->linger_id, 1,
-                                    timeout, payload, payload_len);
-       if (ret) {
-               linger_unregister(lreq);
-               up_write(&osdc->lock);
-               ceph_release_page_vector(pages, 1);
-               goto out_put_lreq;
-       }
        ceph_osd_data_pages_init(osd_req_op_data(lreq->reg_req, 0, notify,
                                                 response_data),
                                 pages, PAGE_SIZE, 0, false, true);
-       linger_submit(lreq);
-       up_write(&osdc->lock);
 
+       ret = ceph_osdc_alloc_messages(lreq->reg_req, GFP_NOIO);
+       if (ret)
+               goto out_put_lreq;
+
+       linger_submit(lreq);
        ret = linger_reg_commit_wait(lreq);
        if (!ret)
                ret = linger_notify_finish_wait(lreq);
@@ -4881,10 +4983,6 @@ int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
        ceph_oloc_copy(&req->r_base_oloc, oloc);
        req->r_flags = CEPH_OSD_FLAG_READ;
 
-       ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
-       if (ret)
-               goto out_put_req;
-
        pages = ceph_alloc_page_vector(1, GFP_NOIO);
        if (IS_ERR(pages)) {
                ret = PTR_ERR(pages);
@@ -4896,6 +4994,10 @@ int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
                                                 response_data),
                                 pages, PAGE_SIZE, 0, false, true);
 
+       ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+       if (ret)
+               goto out_put_req;
+
        ceph_osdc_start_request(osdc, req, false);
        ret = ceph_osdc_wait_request(osdc, req);
        if (ret >= 0) {
@@ -4958,11 +5060,7 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
        ceph_oloc_copy(&req->r_base_oloc, oloc);
        req->r_flags = flags;
 
-       ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
-       if (ret)
-               goto out_put_req;
-
-       ret = osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method);
+       ret = osd_req_op_cls_init(req, 0, class, method);
        if (ret)
                goto out_put_req;
 
@@ -4973,6 +5071,10 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
                osd_req_op_cls_response_data_pages(req, 0, &resp_page,
                                                   *resp_len, 0, false, false);
 
+       ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+       if (ret)
+               goto out_put_req;
+
        ceph_osdc_start_request(osdc, req, false);
        ret = ceph_osdc_wait_request(osdc, req);
        if (ret >= 0) {
@@ -5021,11 +5123,12 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
                goto out_map;
 
        err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
-                               PAGE_SIZE, 10, true, "osd_op");
+                               PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10, "osd_op");
        if (err < 0)
                goto out_mempool;
        err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
-                               PAGE_SIZE, 10, true, "osd_op_reply");
+                               PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10,
+                               "osd_op_reply");
        if (err < 0)
                goto out_msgpool;
 
@@ -5168,6 +5271,80 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
 }
 EXPORT_SYMBOL(ceph_osdc_writepages);
 
+static int osd_req_op_copy_from_init(struct ceph_osd_request *req,
+                                    u64 src_snapid, u64 src_version,
+                                    struct ceph_object_id *src_oid,
+                                    struct ceph_object_locator *src_oloc,
+                                    u32 src_fadvise_flags,
+                                    u32 dst_fadvise_flags,
+                                    u8 copy_from_flags)
+{
+       struct ceph_osd_req_op *op;
+       struct page **pages;
+       void *p, *end;
+
+       pages = ceph_alloc_page_vector(1, GFP_KERNEL);
+       if (IS_ERR(pages))
+               return PTR_ERR(pages);
+
+       op = _osd_req_op_init(req, 0, CEPH_OSD_OP_COPY_FROM, dst_fadvise_flags);
+       op->copy_from.snapid = src_snapid;
+       op->copy_from.src_version = src_version;
+       op->copy_from.flags = copy_from_flags;
+       op->copy_from.src_fadvise_flags = src_fadvise_flags;
+
+       p = page_address(pages[0]);
+       end = p + PAGE_SIZE;
+       ceph_encode_string(&p, end, src_oid->name, src_oid->name_len);
+       encode_oloc(&p, end, src_oloc);
+       op->indata_len = PAGE_SIZE - (end - p);
+
+       ceph_osd_data_pages_init(&op->copy_from.osd_data, pages,
+                                op->indata_len, 0, false, true);
+       return 0;
+}
+
+int ceph_osdc_copy_from(struct ceph_osd_client *osdc,
+                       u64 src_snapid, u64 src_version,
+                       struct ceph_object_id *src_oid,
+                       struct ceph_object_locator *src_oloc,
+                       u32 src_fadvise_flags,
+                       struct ceph_object_id *dst_oid,
+                       struct ceph_object_locator *dst_oloc,
+                       u32 dst_fadvise_flags,
+                       u8 copy_from_flags)
+{
+       struct ceph_osd_request *req;
+       int ret;
+
+       req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
+       if (!req)
+               return -ENOMEM;
+
+       req->r_flags = CEPH_OSD_FLAG_WRITE;
+
+       ceph_oloc_copy(&req->r_t.base_oloc, dst_oloc);
+       ceph_oid_copy(&req->r_t.base_oid, dst_oid);
+
+       ret = osd_req_op_copy_from_init(req, src_snapid, src_version, src_oid,
+                                       src_oloc, src_fadvise_flags,
+                                       dst_fadvise_flags, copy_from_flags);
+       if (ret)
+               goto out;
+
+       ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
+       if (ret)
+               goto out;
+
+       ceph_osdc_start_request(osdc, req, false);
+       ret = ceph_osdc_wait_request(osdc, req);
+
+out:
+       ceph_osdc_put_request(req);
+       return ret;
+}
+EXPORT_SYMBOL(ceph_osdc_copy_from);
+
 int __init ceph_osdc_setup(void)
 {
        size_t size = sizeof(struct ceph_osd_request) +
@@ -5295,7 +5472,7 @@ static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr)
        u32 front_len = le32_to_cpu(hdr->front_len);
        u32 data_len = le32_to_cpu(hdr->data_len);
 
-       m = ceph_msg_new(type, front_len, GFP_NOIO, false);
+       m = ceph_msg_new2(type, front_len, 1, GFP_NOIO, false);
        if (!m)
                return NULL;