osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
}
+/*
+ * Consumes @pages if @own_pages is true.
+ */
static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
struct page **pages, u64 length, u32 alignment,
bool pages_from_pool, bool own_pages)
osd_data->own_pages = own_pages;
}
+/*
+ * Consumes a ref on @pagelist.
+ */
static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
struct ceph_pagelist *pagelist)
{
num_pages = calc_pages_for((u64)osd_data->alignment,
(u64)osd_data->length);
ceph_release_page_vector(osd_data->pages, num_pages);
+ } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
+ ceph_pagelist_release(osd_data->pagelist);
}
ceph_osd_data_init(osd_data);
}
case CEPH_OSD_OP_LIST_WATCHERS:
ceph_osd_data_release(&op->list_watchers.response_data);
break;
+ case CEPH_OSD_OP_COPY_FROM:
+ ceph_osd_data_release(&op->copy_from.osd_data);
+ break;
default:
break;
}
return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
}
-int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
+static int __ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp,
+ int num_request_data_items,
+ int num_reply_data_items)
{
struct ceph_osd_client *osdc = req->r_osdc;
struct ceph_msg *msg;
int msg_size;
+ WARN_ON(req->r_request || req->r_reply);
WARN_ON(ceph_oid_empty(&req->r_base_oid));
WARN_ON(ceph_oloc_empty(&req->r_base_oloc));
msg_size += 4 + 8; /* retry_attempt, features */
if (req->r_mempool)
- msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
+ msg = ceph_msgpool_get(&osdc->msgpool_op, msg_size,
+ num_request_data_items);
else
- msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp, true);
+ msg = ceph_msg_new2(CEPH_MSG_OSD_OP, msg_size,
+ num_request_data_items, gfp, true);
if (!msg)
return -ENOMEM;
msg_size += req->r_num_ops * sizeof(struct ceph_osd_op);
if (req->r_mempool)
- msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
+ msg = ceph_msgpool_get(&osdc->msgpool_op_reply, msg_size,
+ num_reply_data_items);
else
- msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, gfp, true);
+ msg = ceph_msg_new2(CEPH_MSG_OSD_OPREPLY, msg_size,
+ num_reply_data_items, gfp, true);
if (!msg)
return -ENOMEM;
return 0;
}
-EXPORT_SYMBOL(ceph_osdc_alloc_messages);
static bool osd_req_opcode_valid(u16 opcode)
{
}
}
+static void get_num_data_items(struct ceph_osd_request *req,
+ int *num_request_data_items,
+ int *num_reply_data_items)
+{
+ struct ceph_osd_req_op *op;
+
+ *num_request_data_items = 0;
+ *num_reply_data_items = 0;
+
+ for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
+ switch (op->op) {
+ /* request */
+ case CEPH_OSD_OP_WRITE:
+ case CEPH_OSD_OP_WRITEFULL:
+ case CEPH_OSD_OP_SETXATTR:
+ case CEPH_OSD_OP_CMPXATTR:
+ case CEPH_OSD_OP_NOTIFY_ACK:
+ case CEPH_OSD_OP_COPY_FROM:
+ *num_request_data_items += 1;
+ break;
+
+ /* reply */
+ case CEPH_OSD_OP_STAT:
+ case CEPH_OSD_OP_READ:
+ case CEPH_OSD_OP_LIST_WATCHERS:
+ *num_reply_data_items += 1;
+ break;
+
+ /* both */
+ case CEPH_OSD_OP_NOTIFY:
+ *num_request_data_items += 1;
+ *num_reply_data_items += 1;
+ break;
+ case CEPH_OSD_OP_CALL:
+ *num_request_data_items += 2;
+ *num_reply_data_items += 1;
+ break;
+
+ default:
+ WARN_ON(!osd_req_opcode_valid(op->op));
+ break;
+ }
+ }
+}
+
+/*
+ * oid, oloc and OSD op opcode(s) must be filled in before this function
+ * is called.
+ */
+int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
+{
+ int num_request_data_items, num_reply_data_items;
+
+ get_num_data_items(req, &num_request_data_items, &num_reply_data_items);
+ return __ceph_osdc_alloc_messages(req, gfp, num_request_data_items,
+ num_reply_data_items);
+}
+EXPORT_SYMBOL(ceph_osdc_alloc_messages);
+
/*
* This is an osd op init function for opcodes that have no data or
* other information associated with them. It also serves as a
EXPORT_SYMBOL(osd_req_op_extent_dup_last);
int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
- u16 opcode, const char *class, const char *method)
+ const char *class, const char *method)
{
- struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
- opcode, 0);
+ struct ceph_osd_req_op *op;
struct ceph_pagelist *pagelist;
size_t payload_len = 0;
size_t size;
- BUG_ON(opcode != CEPH_OSD_OP_CALL);
+ op = _osd_req_op_init(osd_req, which, CEPH_OSD_OP_CALL, 0);
- pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
+ pagelist = ceph_pagelist_alloc(GFP_NOFS);
if (!pagelist)
return -ENOMEM;
- ceph_pagelist_init(pagelist);
-
op->cls.class_name = class;
size = strlen(class);
BUG_ON(size > (size_t) U8_MAX);
BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);
- pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
+ pagelist = ceph_pagelist_alloc(GFP_NOFS);
if (!pagelist)
return -ENOMEM;
- ceph_pagelist_init(pagelist);
-
payload_len = strlen(name);
op->xattr.name_len = payload_len;
ceph_pagelist_append(pagelist, name, payload_len);
static u32 osd_req_encode_op(struct ceph_osd_op *dst,
const struct ceph_osd_req_op *src)
{
- if (WARN_ON(!osd_req_opcode_valid(src->op))) {
- pr_err("unrecognized osd opcode %d\n", src->op);
-
- return 0;
- }
-
switch (src->op) {
case CEPH_OSD_OP_STAT:
break;
case CEPH_OSD_OP_CREATE:
case CEPH_OSD_OP_DELETE:
break;
+ case CEPH_OSD_OP_COPY_FROM:
+ dst->copy_from.snapid = cpu_to_le64(src->copy_from.snapid);
+ dst->copy_from.src_version =
+ cpu_to_le64(src->copy_from.src_version);
+ dst->copy_from.flags = src->copy_from.flags;
+ dst->copy_from.src_fadvise_flags =
+ cpu_to_le32(src->copy_from.src_fadvise_flags);
+ break;
default:
pr_err("unsupported osd opcode %s\n",
ceph_osd_op_name(src->op));
if (flags & CEPH_OSD_FLAG_WRITE)
req->r_data_offset = off;
- r = ceph_osdc_alloc_messages(req, GFP_NOFS);
+ if (num_ops > 1)
+ /*
+ * This is a special case for ceph_writepages_start(), but it
+ * also covers ceph_uninline_data(). If more multi-op request
+ * use cases emerge, we will need a separate helper.
+ */
+ r = __ceph_osdc_alloc_messages(req, GFP_NOFS, num_ops, 0);
+ else
+ r = ceph_osdc_alloc_messages(req, GFP_NOFS);
if (r)
goto fail;
return true;
}
-static void setup_request_data(struct ceph_osd_request *req,
- struct ceph_msg *msg)
+/*
+ * Keep get_num_data_items() in sync with this function.
+ */
+static void setup_request_data(struct ceph_osd_request *req)
{
- u32 data_len = 0;
- int i;
+ struct ceph_msg *request_msg = req->r_request;
+ struct ceph_msg *reply_msg = req->r_reply;
+ struct ceph_osd_req_op *op;
- if (!list_empty(&msg->data))
+ if (req->r_request->num_data_items || req->r_reply->num_data_items)
return;
- WARN_ON(msg->data_length);
- for (i = 0; i < req->r_num_ops; i++) {
- struct ceph_osd_req_op *op = &req->r_ops[i];
-
+ WARN_ON(request_msg->data_length || reply_msg->data_length);
+ for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
switch (op->op) {
/* request */
case CEPH_OSD_OP_WRITE:
case CEPH_OSD_OP_WRITEFULL:
WARN_ON(op->indata_len != op->extent.length);
- ceph_osdc_msg_data_add(msg, &op->extent.osd_data);
+ ceph_osdc_msg_data_add(request_msg,
+ &op->extent.osd_data);
break;
case CEPH_OSD_OP_SETXATTR:
case CEPH_OSD_OP_CMPXATTR:
WARN_ON(op->indata_len != op->xattr.name_len +
op->xattr.value_len);
- ceph_osdc_msg_data_add(msg, &op->xattr.osd_data);
+ ceph_osdc_msg_data_add(request_msg,
+ &op->xattr.osd_data);
break;
case CEPH_OSD_OP_NOTIFY_ACK:
- ceph_osdc_msg_data_add(msg,
+ ceph_osdc_msg_data_add(request_msg,
&op->notify_ack.request_data);
break;
+ case CEPH_OSD_OP_COPY_FROM:
+ ceph_osdc_msg_data_add(request_msg,
+ &op->copy_from.osd_data);
+ break;
/* reply */
case CEPH_OSD_OP_STAT:
- ceph_osdc_msg_data_add(req->r_reply,
+ ceph_osdc_msg_data_add(reply_msg,
&op->raw_data_in);
break;
case CEPH_OSD_OP_READ:
- ceph_osdc_msg_data_add(req->r_reply,
+ ceph_osdc_msg_data_add(reply_msg,
&op->extent.osd_data);
break;
case CEPH_OSD_OP_LIST_WATCHERS:
- ceph_osdc_msg_data_add(req->r_reply,
+ ceph_osdc_msg_data_add(reply_msg,
&op->list_watchers.response_data);
break;
WARN_ON(op->indata_len != op->cls.class_len +
op->cls.method_len +
op->cls.indata_len);
- ceph_osdc_msg_data_add(msg, &op->cls.request_info);
+ ceph_osdc_msg_data_add(request_msg,
+ &op->cls.request_info);
/* optional, can be NONE */
- ceph_osdc_msg_data_add(msg, &op->cls.request_data);
+ ceph_osdc_msg_data_add(request_msg,
+ &op->cls.request_data);
/* optional, can be NONE */
- ceph_osdc_msg_data_add(req->r_reply,
+ ceph_osdc_msg_data_add(reply_msg,
&op->cls.response_data);
break;
case CEPH_OSD_OP_NOTIFY:
- ceph_osdc_msg_data_add(msg,
+ ceph_osdc_msg_data_add(request_msg,
&op->notify.request_data);
- ceph_osdc_msg_data_add(req->r_reply,
+ ceph_osdc_msg_data_add(reply_msg,
&op->notify.response_data);
break;
}
-
- data_len += op->indata_len;
}
-
- WARN_ON(data_len != msg->data_length);
}
static void encode_pgid(void **p, const struct ceph_pg *pgid)
req->r_data_offset || req->r_snapc);
}
- setup_request_data(req, msg);
+ setup_request_data(req);
encode_spgid(&p, &req->r_t.spgid); /* actual spg */
ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */
struct ceph_osd_client *osdc = lreq->osdc;
struct ceph_osd *osd;
+ down_write(&osdc->lock);
+ linger_register(lreq);
+ if (lreq->is_watch) {
+ lreq->reg_req->r_ops[0].watch.cookie = lreq->linger_id;
+ lreq->ping_req->r_ops[0].watch.cookie = lreq->linger_id;
+ } else {
+ lreq->reg_req->r_ops[0].notify.cookie = lreq->linger_id;
+ }
+
calc_target(osdc, &lreq->t, NULL, false);
osd = lookup_create_osd(osdc, lreq->t.osd, true);
link_linger(osd, lreq);
send_linger(lreq);
+ up_write(&osdc->lock);
}
static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq)
lreq->notify_id, notify_id);
} else if (!completion_done(&lreq->notify_finish_wait)) {
struct ceph_msg_data *data =
- list_first_entry_or_null(&msg->data,
- struct ceph_msg_data,
- links);
+ msg->num_data_items ? &msg->data[0] : NULL;
if (data) {
if (lreq->preply_pages) {
ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
+ return req;
+}
+
+static struct ceph_osd_request *
+alloc_watch_request(struct ceph_osd_linger_request *lreq, u8 watch_opcode)
+{
+ struct ceph_osd_request *req;
+
+ req = alloc_linger_request(lreq);
+ if (!req)
+ return NULL;
+
+ /*
+ * Pass 0 for cookie because we don't know it yet, it will be
+ * filled in by linger_submit().
+ */
+ osd_req_op_watch_init(req, 0, 0, watch_opcode);
if (ceph_osdc_alloc_messages(req, GFP_NOIO)) {
ceph_osdc_put_request(req);
lreq->t.flags = CEPH_OSD_FLAG_WRITE;
ktime_get_real_ts64(&lreq->mtime);
- lreq->reg_req = alloc_linger_request(lreq);
+ lreq->reg_req = alloc_watch_request(lreq, CEPH_OSD_WATCH_OP_WATCH);
if (!lreq->reg_req) {
ret = -ENOMEM;
goto err_put_lreq;
}
- lreq->ping_req = alloc_linger_request(lreq);
+ lreq->ping_req = alloc_watch_request(lreq, CEPH_OSD_WATCH_OP_PING);
if (!lreq->ping_req) {
ret = -ENOMEM;
goto err_put_lreq;
}
- down_write(&osdc->lock);
- linger_register(lreq); /* before osd_req_op_* */
- osd_req_op_watch_init(lreq->reg_req, 0, lreq->linger_id,
- CEPH_OSD_WATCH_OP_WATCH);
- osd_req_op_watch_init(lreq->ping_req, 0, lreq->linger_id,
- CEPH_OSD_WATCH_OP_PING);
linger_submit(lreq);
- up_write(&osdc->lock);
-
ret = linger_reg_commit_wait(lreq);
if (ret) {
linger_cancel(lreq);
op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
- pl = kmalloc(sizeof(*pl), GFP_NOIO);
+ pl = ceph_pagelist_alloc(GFP_NOIO);
if (!pl)
return -ENOMEM;
- ceph_pagelist_init(pl);
ret = ceph_pagelist_encode_64(pl, notify_id);
ret |= ceph_pagelist_encode_64(pl, cookie);
if (payload) {
ceph_oloc_copy(&req->r_base_oloc, oloc);
req->r_flags = CEPH_OSD_FLAG_READ;
- ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+ ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
+ payload_len);
if (ret)
goto out_put_req;
- ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
- payload_len);
+ ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
if (ret)
goto out_put_req;
op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
op->notify.cookie = cookie;
- pl = kmalloc(sizeof(*pl), GFP_NOIO);
+ pl = ceph_pagelist_alloc(GFP_NOIO);
if (!pl)
return -ENOMEM;
- ceph_pagelist_init(pl);
ret = ceph_pagelist_encode_32(pl, 1); /* prot_ver */
ret |= ceph_pagelist_encode_32(pl, timeout);
ret |= ceph_pagelist_encode_32(pl, payload_len);
goto out_put_lreq;
}
+ /*
+ * Pass 0 for cookie because we don't know it yet, it will be
+ * filled in by linger_submit().
+ */
+ ret = osd_req_op_notify_init(lreq->reg_req, 0, 0, 1, timeout,
+ payload, payload_len);
+ if (ret)
+ goto out_put_lreq;
+
/* for notify_id */
pages = ceph_alloc_page_vector(1, GFP_NOIO);
if (IS_ERR(pages)) {
ret = PTR_ERR(pages);
goto out_put_lreq;
}
-
- down_write(&osdc->lock);
- linger_register(lreq); /* before osd_req_op_* */
- ret = osd_req_op_notify_init(lreq->reg_req, 0, lreq->linger_id, 1,
- timeout, payload, payload_len);
- if (ret) {
- linger_unregister(lreq);
- up_write(&osdc->lock);
- ceph_release_page_vector(pages, 1);
- goto out_put_lreq;
- }
ceph_osd_data_pages_init(osd_req_op_data(lreq->reg_req, 0, notify,
response_data),
pages, PAGE_SIZE, 0, false, true);
- linger_submit(lreq);
- up_write(&osdc->lock);
+ ret = ceph_osdc_alloc_messages(lreq->reg_req, GFP_NOIO);
+ if (ret)
+ goto out_put_lreq;
+
+ linger_submit(lreq);
ret = linger_reg_commit_wait(lreq);
if (!ret)
ret = linger_notify_finish_wait(lreq);
ceph_oloc_copy(&req->r_base_oloc, oloc);
req->r_flags = CEPH_OSD_FLAG_READ;
- ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
- if (ret)
- goto out_put_req;
-
pages = ceph_alloc_page_vector(1, GFP_NOIO);
if (IS_ERR(pages)) {
ret = PTR_ERR(pages);
response_data),
pages, PAGE_SIZE, 0, false, true);
+ ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+ if (ret)
+ goto out_put_req;
+
ceph_osdc_start_request(osdc, req, false);
ret = ceph_osdc_wait_request(osdc, req);
if (ret >= 0) {
ceph_oloc_copy(&req->r_base_oloc, oloc);
req->r_flags = flags;
- ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
- if (ret)
- goto out_put_req;
-
- ret = osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method);
+ ret = osd_req_op_cls_init(req, 0, class, method);
if (ret)
goto out_put_req;
osd_req_op_cls_response_data_pages(req, 0, &resp_page,
*resp_len, 0, false, false);
+ ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+ if (ret)
+ goto out_put_req;
+
ceph_osdc_start_request(osdc, req, false);
ret = ceph_osdc_wait_request(osdc, req);
if (ret >= 0) {
goto out_map;
err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
- PAGE_SIZE, 10, true, "osd_op");
+ PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10, "osd_op");
if (err < 0)
goto out_mempool;
err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
- PAGE_SIZE, 10, true, "osd_op_reply");
+ PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10,
+ "osd_op_reply");
if (err < 0)
goto out_msgpool;
}
EXPORT_SYMBOL(ceph_osdc_writepages);
+static int osd_req_op_copy_from_init(struct ceph_osd_request *req,
+ u64 src_snapid, u64 src_version,
+ struct ceph_object_id *src_oid,
+ struct ceph_object_locator *src_oloc,
+ u32 src_fadvise_flags,
+ u32 dst_fadvise_flags,
+ u8 copy_from_flags)
+{
+ struct ceph_osd_req_op *op;
+ struct page **pages;
+ void *p, *end;
+
+ pages = ceph_alloc_page_vector(1, GFP_KERNEL);
+ if (IS_ERR(pages))
+ return PTR_ERR(pages);
+
+ op = _osd_req_op_init(req, 0, CEPH_OSD_OP_COPY_FROM, dst_fadvise_flags);
+ op->copy_from.snapid = src_snapid;
+ op->copy_from.src_version = src_version;
+ op->copy_from.flags = copy_from_flags;
+ op->copy_from.src_fadvise_flags = src_fadvise_flags;
+
+ p = page_address(pages[0]);
+ end = p + PAGE_SIZE;
+ ceph_encode_string(&p, end, src_oid->name, src_oid->name_len);
+ encode_oloc(&p, end, src_oloc);
+ op->indata_len = PAGE_SIZE - (end - p);
+
+ ceph_osd_data_pages_init(&op->copy_from.osd_data, pages,
+ op->indata_len, 0, false, true);
+ return 0;
+}
+
+int ceph_osdc_copy_from(struct ceph_osd_client *osdc,
+ u64 src_snapid, u64 src_version,
+ struct ceph_object_id *src_oid,
+ struct ceph_object_locator *src_oloc,
+ u32 src_fadvise_flags,
+ struct ceph_object_id *dst_oid,
+ struct ceph_object_locator *dst_oloc,
+ u32 dst_fadvise_flags,
+ u8 copy_from_flags)
+{
+ struct ceph_osd_request *req;
+ int ret;
+
+ req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
+ if (!req)
+ return -ENOMEM;
+
+ req->r_flags = CEPH_OSD_FLAG_WRITE;
+
+ ceph_oloc_copy(&req->r_t.base_oloc, dst_oloc);
+ ceph_oid_copy(&req->r_t.base_oid, dst_oid);
+
+ ret = osd_req_op_copy_from_init(req, src_snapid, src_version, src_oid,
+ src_oloc, src_fadvise_flags,
+ dst_fadvise_flags, copy_from_flags);
+ if (ret)
+ goto out;
+
+ ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
+ if (ret)
+ goto out;
+
+ ceph_osdc_start_request(osdc, req, false);
+ ret = ceph_osdc_wait_request(osdc, req);
+
+out:
+ ceph_osdc_put_request(req);
+ return ret;
+}
+EXPORT_SYMBOL(ceph_osdc_copy_from);
+
int __init ceph_osdc_setup(void)
{
size_t size = sizeof(struct ceph_osd_request) +
u32 front_len = le32_to_cpu(hdr->front_len);
u32 data_len = le32_to_cpu(hdr->data_len);
- m = ceph_msg_new(type, front_len, GFP_NOIO, false);
+ m = ceph_msg_new2(type, front_len, 1, GFP_NOIO, false);
if (!m)
return NULL;