struct rw_semaphore lock_rwsem;
enum rbd_lock_state lock_state;
+ char lock_cookie[32];
struct rbd_client_id owner_cid;
struct work_struct acquired_lock_work;
struct work_struct released_lock_work;
return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}
- static bool rbd_is_lock_supported(struct rbd_device *rbd_dev)
- {
- return (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
- rbd_dev->spec->snap_id == CEPH_NOSNAP &&
- !rbd_dev->mapping.read_only;
- }
-
static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
kref_init(&rbdc->kref);
INIT_LIST_HEAD(&rbdc->node);
- rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
+ rbdc->client = ceph_create_client(ceph_opts, rbdc);
if (IS_ERR(rbdc->client))
goto out_rbdc;
ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
Opt_read_only,
Opt_read_write,
Opt_lock_on_read,
+ Opt_exclusive,
Opt_err
};
{Opt_read_write, "read_write"},
{Opt_read_write, "rw"}, /* Alternate spelling */
{Opt_lock_on_read, "lock_on_read"},
+ {Opt_exclusive, "exclusive"},
{Opt_err, NULL}
};
int queue_depth;
bool read_only;
bool lock_on_read;
+ bool exclusive;
};
#define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
#define RBD_READ_ONLY_DEFAULT false
#define RBD_LOCK_ON_READ_DEFAULT false
+ #define RBD_EXCLUSIVE_DEFAULT false
static int parse_rbd_opts_token(char *c, void *private)
{
case Opt_lock_on_read:
rbd_opts->lock_on_read = true;
break;
+ case Opt_exclusive:
+ rbd_opts->exclusive = true;
+ break;
default:
/* libceph prints "bad option" msg */
return -EINVAL;
{
struct ceph_osd_request *osd_req = obj_request->osd_req;
- osd_req->r_mtime = CURRENT_TIME;
+ ktime_get_real_ts(&osd_req->r_mtime);
osd_req->r_data_offset = obj_request->offset;
}
char cookie[32];
int ret;
- WARN_ON(__rbd_is_lock_owner(rbd_dev));
+ WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
+ rbd_dev->lock_cookie[0] != '\0');
format_lock_cookie(rbd_dev, cookie);
ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
return ret;
rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
+ strcpy(rbd_dev->lock_cookie, cookie);
rbd_set_owner_cid(rbd_dev, &cid);
queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
return 0;
/*
+ * Release the exclusive lock with ceph_cls_unlock(), using the cookie
+ * saved in rbd_dev->lock_cookie. A failure other than -ENOENT is only
+ * logged: the local lock state, cookie and owner cid are reset and
+ * released_lock_work is queued regardless of the result.
+ *
* lock_rwsem must be held for write
*/
- static int rbd_unlock(struct rbd_device *rbd_dev)
+ static void rbd_unlock(struct rbd_device *rbd_dev)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
- char cookie[32];
int ret;
- WARN_ON(!__rbd_is_lock_owner(rbd_dev));
-
- rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
+ WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
+ rbd_dev->lock_cookie[0] == '\0');
- format_lock_cookie(rbd_dev, cookie);
ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
- RBD_LOCK_NAME, cookie);
- if (ret && ret != -ENOENT) {
- rbd_warn(rbd_dev, "cls_unlock failed: %d", ret);
- return ret;
- }
+ RBD_LOCK_NAME, rbd_dev->lock_cookie);
+ if (ret && ret != -ENOENT)
+ rbd_warn(rbd_dev, "failed to unlock: %d", ret);
+ /* treat errors as the image is unlocked */
+ rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
+ rbd_dev->lock_cookie[0] = '\0';
rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
- return 0;
}
static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
ret = rbd_request_lock(rbd_dev);
if (ret == -ETIMEDOUT) {
goto again; /* treat this as a dead client */
+ } else if (ret == -EROFS) {
+ rbd_warn(rbd_dev, "peer will not release lock");
+ /*
+ * If this is rbd_add_acquire_lock(), we want to fail
+ * immediately -- reuse BLACKLISTED flag. Otherwise we
+ * want to block.
+ */
+ if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
+ set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
+ /* wake "rbd map --exclusive" process */
+ wake_requests(rbd_dev, false);
+ }
} else if (ret < 0) {
rbd_warn(rbd_dev, "error requesting lock: %d", ret);
mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
return false;
- if (!rbd_unlock(rbd_dev))
- /*
- * Give others a chance to grab the lock - we would re-acquire
- * almost immediately if we got new IO during ceph_osdc_sync()
- * otherwise. We need to ack our own notifications, so this
- * lock_dwork will be requeued from rbd_wait_state_locked()
- * after wake_requests() in rbd_handle_released_lock().
- */
- cancel_delayed_work(&rbd_dev->lock_dwork);
-
+ rbd_unlock(rbd_dev);
+ /*
+ * Give others a chance to grab the lock - we would re-acquire
+ * almost immediately if we got new IO during ceph_osdc_sync()
+ * otherwise. We need to ack our own notifications, so this
+ * lock_dwork will be requeued from rbd_wait_state_locked()
+ * after wake_requests() in rbd_handle_released_lock().
+ */
+ cancel_delayed_work(&rbd_dev->lock_dwork);
return true;
}
up_read(&rbd_dev->lock_rwsem);
}
- static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
- void **p)
+ /*
+ * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
+ * ResponseMessage is needed.
+ */
+ static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
+ void **p)
{
struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
struct rbd_client_id cid = { 0 };
- bool need_to_send;
+ int result = 1;
if (struct_v >= 2) {
cid.gid = ceph_decode_64(p);
dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
cid.handle);
if (rbd_cid_equal(&cid, &my_cid))
- return false;
+ return result;
down_read(&rbd_dev->lock_rwsem);
- need_to_send = __rbd_is_lock_owner(rbd_dev);
- if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
- if (!rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) {
- dout("%s rbd_dev %p queueing unlock_work\n", __func__,
- rbd_dev);
- queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work);
+ if (__rbd_is_lock_owner(rbd_dev)) {
+ if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
+ rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
+ goto out_unlock;
+
+ /*
+ * encode ResponseMessage(0) so the peer can detect
+ * a missing owner
+ */
+ result = 0;
+
+ if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
+ if (!rbd_dev->opts->exclusive) {
+ dout("%s rbd_dev %p queueing unlock_work\n",
+ __func__, rbd_dev);
+ queue_work(rbd_dev->task_wq,
+ &rbd_dev->unlock_work);
+ } else {
+ /* refuse to release the lock */
+ result = -EROFS;
+ }
}
}
+
+ out_unlock:
up_read(&rbd_dev->lock_rwsem);
- return need_to_send;
+ return result;
}
static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
break;
case RBD_NOTIFY_OP_REQUEST_LOCK:
- if (rbd_handle_request_lock(rbd_dev, struct_v, &p))
- /*
- * send ResponseMessage(0) back so the client
- * can detect a missing owner
- */
+ ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
+ if (ret <= 0)
rbd_acknowledge_notify_result(rbd_dev, notify_id,
- cookie, 0);
+ cookie, ret);
else
rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
break;
ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
}
+ /*
+  * Swap the cookie of the lock we already hold for a freshly formatted
+  * one. If the OSD cannot do that, fall back to releasing the lock and
+  * scheduling a new acquire.
+  *
+  * lock_rwsem must be held for write
+  */
+ static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
+ {
+ 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+ 	char new_cookie[32];
+ 	int err;
+
+ 	WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
+
+ 	format_lock_cookie(rbd_dev, new_cookie);
+ 	err = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
+ 				  &rbd_dev->header_oloc, RBD_LOCK_NAME,
+ 				  CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
+ 				  RBD_LOCK_TAG, new_cookie);
+ 	if (!err) {
+ 		/* the OSD accepted the update, remember the new cookie */
+ 		strcpy(rbd_dev->lock_cookie, new_cookie);
+ 		return;
+ 	}
+
+ 	/* -EOPNOTSUPP is the expected answer from older OSDs, stay quiet */
+ 	if (err != -EOPNOTSUPP)
+ 		rbd_warn(rbd_dev, "failed to update lock cookie: %d",
+ 			 err);
+
+ 	/*
+ 	 * Lock cookie cannot be updated on older OSDs, so do
+ 	 * a manual release and queue an acquire.
+ 	 */
+ 	if (rbd_release_lock(rbd_dev))
+ 		queue_delayed_work(rbd_dev->task_wq,
+ 				   &rbd_dev->lock_dwork, 0);
+ }
+
static void rbd_reregister_watch(struct work_struct *work)
{
struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
struct rbd_device, watch_dwork);
- bool was_lock_owner = false;
- bool need_to_wake = false;
int ret;
dout("%s rbd_dev %p\n", __func__, rbd_dev);
- down_write(&rbd_dev->lock_rwsem);
- if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
- was_lock_owner = rbd_release_lock(rbd_dev);
-
mutex_lock(&rbd_dev->watch_mutex);
if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
mutex_unlock(&rbd_dev->watch_mutex);
- goto out;
+ return;
}
ret = __rbd_register_watch(rbd_dev);
rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
if (ret == -EBLACKLISTED || ret == -ENOENT) {
set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
- need_to_wake = true;
+ wake_requests(rbd_dev, true);
} else {
queue_delayed_work(rbd_dev->task_wq,
&rbd_dev->watch_dwork,
RBD_RETRY_DELAY);
}
mutex_unlock(&rbd_dev->watch_mutex);
- goto out;
+ return;
}
- need_to_wake = true;
rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
mutex_unlock(&rbd_dev->watch_mutex);
+ down_write(&rbd_dev->lock_rwsem);
+ if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
+ rbd_reacquire_lock(rbd_dev);
+ up_write(&rbd_dev->lock_rwsem);
+
ret = rbd_dev_refresh(rbd_dev);
if (ret)
rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
-
- if (was_lock_owner) {
- ret = rbd_try_lock(rbd_dev);
- if (ret)
- rbd_warn(rbd_dev, "reregisteration lock failed: %d",
- ret);
- }
-
- out:
- up_write(&rbd_dev->lock_rwsem);
- if (need_to_wake)
- wake_requests(rbd_dev, true);
}
/*
if (op_type != OBJ_OP_READ) {
snapc = rbd_dev->header.snapc;
ceph_get_snap_context(snapc);
- must_be_locked = rbd_is_lock_supported(rbd_dev);
- } else {
- must_be_locked = rbd_dev->opts->lock_on_read &&
- rbd_is_lock_supported(rbd_dev);
}
up_read(&rbd_dev->header_rwsem);
goto err_rq;
}
+ must_be_locked =
+ (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
+ (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
if (must_be_locked) {
down_read(&rbd_dev->lock_rwsem);
if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
- !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
+ !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
+ if (rbd_dev->opts->exclusive) {
+ rbd_warn(rbd_dev, "exclusive lock required");
+ result = -EROFS;
+ goto err_unlock;
+ }
rbd_wait_state_locked(rbd_dev);
-
- WARN_ON((rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) ^
- !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
+ }
if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
result = -EBLACKLISTED;
goto err_unlock;
+ /*
+ * Release the block-layer objects behind rbd_dev->disk: clean up the
+ * request queue, free the blk-mq tag set, drop the gendisk reference
+ * and clear rbd_dev->disk. rbd_dev->disk is dereferenced
+ * unconditionally and del_gendisk() is not called here (both were
+ * handled by the code being removed).
+ */
static void rbd_free_disk(struct rbd_device *rbd_dev)
{
- struct gendisk *disk = rbd_dev->disk;
-
- if (!disk)
- return;
-
+ blk_cleanup_queue(rbd_dev->disk->queue);
+ blk_mq_free_tag_set(&rbd_dev->tag_set);
+ put_disk(rbd_dev->disk);
rbd_dev->disk = NULL;
- if (disk->flags & GENHD_FL_UP) {
- del_gendisk(disk);
- if (disk->queue)
- blk_cleanup_queue(disk->queue);
- blk_mq_free_tag_set(&rbd_dev->tag_set);
- }
- put_disk(disk);
}
static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
return ret;
}
-static int rbd_init_request(void *data, struct request *rq,
- unsigned int hctx_idx, unsigned int request_idx,
- unsigned int numa_node)
+static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
+ unsigned int hctx_idx, unsigned int numa_node)
{
struct work_struct *work = blk_mq_rq_to_pdu(rq);
return 0;
}
-static struct blk_mq_ops rbd_mq_ops = {
+/* blk-mq callbacks: rbd_queue_rq() for dispatch, rbd_init_request() per request */
+static const struct blk_mq_ops rbd_mq_ops = {
.queue_rq = rbd_queue_rq,
.init_request = rbd_init_request,
};
q->limits.discard_granularity = segment_size;
q->limits.discard_alignment = segment_size;
blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
- q->limits.discard_zeroes_data = 1;
if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
+ /*
+ * disk_release() expects a queue ref from add_disk() and will
+ * put it. Hold an extra ref until add_disk() is called.
+ */
+ WARN_ON(!blk_get_queue(q));
disk->queue = q;
-
q->queuedata = rbd_dev;
rbd_dev->disk = disk;
rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
+ rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
copts = ceph_parse_options(options, mon_addrs,
mon_addrs + mon_addrs_size - 1,
return ret;
}
+ /*
+  * Give up the exclusive lock if this device currently owns it.
+  * Takes lock_rwsem for write around the ownership check and unlock.
+  */
+ static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
+ {
+ 	struct rw_semaphore *sem = &rbd_dev->lock_rwsem;
+
+ 	down_write(sem);
+ 	if (__rbd_is_lock_owner(rbd_dev))
+ 		rbd_unlock(rbd_dev);
+ 	up_write(sem);
+ }
+
+ /*
+  * Wait for the exclusive lock on behalf of "rbd map --exclusive".
+  *
+  * Returns 0 when the wait finishes without RBD_DEV_FLAG_BLACKLISTED
+  * being set, -EINVAL if the image lacks the exclusive-lock feature,
+  * or -EROFS if the BLACKLISTED flag is set after waiting.
+  */
+ static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
+ {
+ 	if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
+ 		rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
+ 		return -EINVAL;
+ 	}
+
+ 	/* FIXME: "rbd map --exclusive" should be interruptible */
+ 	down_read(&rbd_dev->lock_rwsem);
+ 	rbd_wait_state_locked(rbd_dev);
+ 	up_read(&rbd_dev->lock_rwsem);
+
+ 	/* the flag is only checked after lock_rwsem has been dropped */
+ 	if (!test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
+ 		return 0;
+
+ 	rbd_warn(rbd_dev, "failed to acquire exclusive lock");
+ 	return -EROFS;
+ }
+
/*
* An rbd format 2 image has a unique identifier, distinct from the
* name given to it by the user. Internally, that identifier is
return ret;
}
+ /*
+  * Undo the device-level setup: clear the EXISTS flag, clear the
+  * mapping, free the disk and, unless single_major is in effect,
+  * unregister the block major owned by this device.
+  */
+ static void rbd_dev_device_release(struct rbd_device *rbd_dev)
+ {
+ 	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
+ 	rbd_dev_mapping_clear(rbd_dev);
+ 	rbd_free_disk(rbd_dev);
+
+ 	/* with single_major there is no per-device major to give back */
+ 	if (single_major)
+ 		return;
+
+ 	unregister_blkdev(rbd_dev->major, rbd_dev->name);
+ }
+
/*
* rbd_dev->header_rwsem must be locked for write and will be unlocked
* upon return.
set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
- dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
- ret = device_add(&rbd_dev->dev);
+ ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
if (ret)
goto err_out_mapping;
- /* Everything's ready. Announce the disk to the world. */
-
set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
up_write(&rbd_dev->header_rwsem);
-
- spin_lock(&rbd_dev_list_lock);
- list_add_tail(&rbd_dev->node, &rbd_dev_list);
- spin_unlock(&rbd_dev_list_lock);
-
- add_disk(rbd_dev->disk);
- pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
- (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
- rbd_dev->header.features);
-
- return ret;
+ return 0;
err_out_mapping:
rbd_dev_mapping_clear(rbd_dev);
static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
rbd_dev_unprobe(rbd_dev);
+ if (rbd_dev->opts)
+ rbd_unregister_watch(rbd_dev);
rbd_dev->image_format = 0;
kfree(rbd_dev->spec->image_id);
rbd_dev->spec->image_id = NULL;
-
- rbd_dev_destroy(rbd_dev);
}
/*
rbd_dev->mapping.read_only = read_only;
rc = rbd_dev_device_setup(rbd_dev);
- if (rc) {
- /*
- * rbd_unregister_watch() can't be moved into
- * rbd_dev_image_release() without refactoring, see
- * commit 1f3ef78861ac.
- */
- rbd_unregister_watch(rbd_dev);
- rbd_dev_image_release(rbd_dev);
- goto out;
+ if (rc)
+ goto err_out_image_probe;
+
+ if (rbd_dev->opts->exclusive) {
+ rc = rbd_add_acquire_lock(rbd_dev);
+ if (rc)
+ goto err_out_device_setup;
}
+ /* Everything's ready. Announce the disk to the world. */
+
+ rc = device_add(&rbd_dev->dev);
+ if (rc)
+ goto err_out_image_lock;
+
+ add_disk(rbd_dev->disk);
+ /* see rbd_init_disk() */
+ blk_put_queue(rbd_dev->disk->queue);
+
+ spin_lock(&rbd_dev_list_lock);
+ list_add_tail(&rbd_dev->node, &rbd_dev_list);
+ spin_unlock(&rbd_dev_list_lock);
+
+ pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
+ (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
+ rbd_dev->header.features);
rc = count;
out:
module_put(THIS_MODULE);
return rc;
+ err_out_image_lock:
+ rbd_dev_image_unlock(rbd_dev);
+ err_out_device_setup:
+ rbd_dev_device_release(rbd_dev);
+ err_out_image_probe:
+ rbd_dev_image_release(rbd_dev);
err_out_rbd_dev:
rbd_dev_destroy(rbd_dev);
err_out_client:
return do_rbd_add(bus, buf, count);
}
- static void rbd_dev_device_release(struct rbd_device *rbd_dev)
- {
- rbd_free_disk(rbd_dev);
-
- spin_lock(&rbd_dev_list_lock);
- list_del_init(&rbd_dev->node);
- spin_unlock(&rbd_dev_list_lock);
-
- clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
- device_del(&rbd_dev->dev);
- rbd_dev_mapping_clear(rbd_dev);
- if (!single_major)
- unregister_blkdev(rbd_dev->major, rbd_dev->name);
- }
-
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
while (rbd_dev->parent) {
}
rbd_assert(second);
rbd_dev_image_release(second);
+ rbd_dev_destroy(second);
first->parent = NULL;
first->parent_overlap = 0;
blk_set_queue_dying(rbd_dev->disk->queue);
}
- down_write(&rbd_dev->lock_rwsem);
- if (__rbd_is_lock_owner(rbd_dev))
- rbd_unlock(rbd_dev);
- up_write(&rbd_dev->lock_rwsem);
- rbd_unregister_watch(rbd_dev);
+ del_gendisk(rbd_dev->disk);
+ spin_lock(&rbd_dev_list_lock);
+ list_del_init(&rbd_dev->node);
+ spin_unlock(&rbd_dev_list_lock);
+ device_del(&rbd_dev->dev);
- /*
- * Don't free anything from rbd_dev->disk until after all
- * notifies are completely processed. Otherwise
- * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
- * in a potential use after free of rbd_dev->disk or rbd_dev.
- */
+ rbd_dev_image_unlock(rbd_dev);
rbd_dev_device_release(rbd_dev);
rbd_dev_image_release(rbd_dev);
-
+ rbd_dev_destroy(rbd_dev);
return count;
}
writeback_stat = atomic_long_inc_return(&fsc->writeback_count);
if (writeback_stat >
CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
- set_bdi_congested(&fsc->backing_dev_info, BLK_RW_ASYNC);
+ set_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC);
set_page_writeback(page);
err = ceph_osdc_writepages(osdc, ceph_vino(inode),
bool remove_page;
dout("writepages_finish %p rc %d\n", inode, rc);
- if (rc < 0)
+ if (rc < 0) {
mapping_set_error(mapping, rc);
+ ceph_set_error_write(ci);
+ } else {
+ ceph_clear_error_write(ci);
+ }
/*
* We lost the cache cap, need to truncate the page before
if (atomic_long_dec_return(&fsc->writeback_count) <
CONGESTION_OFF_THRESH(
fsc->mount_options->congestion_kb))
- clear_bdi_congested(&fsc->backing_dev_info,
+ clear_bdi_congested(inode_to_bdi(inode),
BLK_RW_ASYNC);
- if (rc < 0)
- SetPageError(page);
-
ceph_put_snap_context(page_snap_context(page));
page->private = 0;
ClearPagePrivate(page);
if (atomic_long_inc_return(&fsc->writeback_count) >
CONGESTION_ON_THRESH(
fsc->mount_options->congestion_kb)) {
- set_bdi_congested(&fsc->backing_dev_info,
+ set_bdi_congested(inode_to_bdi(inode),
BLK_RW_ASYNC);
}
err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false);
wr_req->r_mtime = ci->vfs_inode.i_mtime;
+ wr_req->r_abort_on_full = true;
err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false);
if (!err)
{
int i;
struct ceph_fs_client *fsc = s->private;
+ struct ceph_mdsmap *mdsmap;
if (fsc->mdsc == NULL || fsc->mdsc->mdsmap == NULL)
return 0;
- seq_printf(s, "epoch %d\n", fsc->mdsc->mdsmap->m_epoch);
- seq_printf(s, "root %d\n", fsc->mdsc->mdsmap->m_root);
- seq_printf(s, "session_timeout %d\n",
- fsc->mdsc->mdsmap->m_session_timeout);
- seq_printf(s, "session_autoclose %d\n",
- fsc->mdsc->mdsmap->m_session_autoclose);
- for (i = 0; i < fsc->mdsc->mdsmap->m_max_mds; i++) {
- struct ceph_entity_addr *addr =
- &fsc->mdsc->mdsmap->m_info[i].addr;
- int state = fsc->mdsc->mdsmap->m_info[i].state;
-
+ mdsmap = fsc->mdsc->mdsmap;
+ seq_printf(s, "epoch %d\n", mdsmap->m_epoch);
+ seq_printf(s, "root %d\n", mdsmap->m_root);
+ seq_printf(s, "max_mds %d\n", mdsmap->m_max_mds);
+ seq_printf(s, "session_timeout %d\n", mdsmap->m_session_timeout);
+ seq_printf(s, "session_autoclose %d\n", mdsmap->m_session_autoclose);
+ for (i = 0; i < mdsmap->m_num_mds; i++) {
+ struct ceph_entity_addr *addr = &mdsmap->m_info[i].addr;
+ int state = mdsmap->m_info[i].state;
seq_printf(s, "\tmds%d\t%s\t(%s)\n", i,
ceph_pr_addr(&addr->in_addr),
ceph_mds_state_name(state));
goto out;
snprintf(name, sizeof(name), "../../bdi/%s",
- dev_name(fsc->backing_dev_info.dev));
+ dev_name(fsc->sb->s_bdi->dev));
fsc->debugfs_bdi =
debugfs_create_symlink("bdi",
fsc->client->debugfs_dir,
#include "mds_client.h"
#include "cache.h"
+ /*
+  * Translate open flags from the local kernel's encoding into the
+  * CEPH_O_* values that are sent on the wire.
+  *
+  * The access mode plus O_CREAT, O_EXCL, O_TRUNC, O_DIRECTORY and
+  * O_NOFOLLOW are translated. Any other bit is not forwarded and is
+  * only reported through dout().
+  *
+  * Returns the wire flags as a little-endian 32-bit value.
+  */
+ static __le32 ceph_flags_sys2wire(u32 flags)
+ {
+ 	u32 wire_flags = 0;
+
+ 	switch (flags & O_ACCMODE) {
+ 	case O_RDONLY:
+ 		wire_flags |= CEPH_O_RDONLY;
+ 		break;
+ 	case O_WRONLY:
+ 		wire_flags |= CEPH_O_WRONLY;
+ 		break;
+ 	case O_RDWR:
+ 		wire_flags |= CEPH_O_RDWR;
+ 		break;
+ 	}
+
+ 	/*
+ 	 * The access mode has been handled above; drop it so that
+ 	 * O_WRONLY/O_RDWR are not reported as unused flags below.
+ 	 */
+ 	flags &= ~O_ACCMODE;
+
+ #define ceph_sys2wire(a)				\
+ 	do {						\
+ 		if (flags & (a)) {			\
+ 			wire_flags |= CEPH_##a;		\
+ 			flags &= ~(a);			\
+ 		}					\
+ 	} while (0)
+
+ 	ceph_sys2wire(O_CREAT);
+ 	ceph_sys2wire(O_EXCL);
+ 	ceph_sys2wire(O_TRUNC);
+ 	ceph_sys2wire(O_DIRECTORY);
+ 	ceph_sys2wire(O_NOFOLLOW);
+
+ #undef ceph_sys2wire
+
+ 	/* whatever is left has no wire equivalent handled here */
+ 	if (flags)
+ 		dout("unused open flags: %x\n", flags);
+
+ 	return cpu_to_le32(wire_flags);
+ }
+
/*
* Ceph file operations
*
align = (unsigned long)(it->iov->iov_base + it->iov_offset) &
(PAGE_SIZE - 1);
npages = calc_pages_for(align, nbytes);
- pages = kmalloc(sizeof(*pages) * npages, GFP_KERNEL);
- if (!pages) {
- pages = vmalloc(sizeof(*pages) * npages);
- if (!pages)
- return ERR_PTR(-ENOMEM);
- }
+ pages = kvmalloc(sizeof(*pages) * npages, GFP_KERNEL);
+ if (!pages)
+ return ERR_PTR(-ENOMEM);
for (idx = 0; idx < npages; ) {
size_t start;
if (IS_ERR(req))
goto out;
req->r_fmode = ceph_flags_to_mode(flags);
- req->r_args.open.flags = cpu_to_le32(flags);
+ req->r_args.open.flags = ceph_flags_sys2wire(flags);
req->r_args.open.mode = cpu_to_le32(create_mode);
out:
return req;
spin_lock(&ci->i_ceph_lock);
wanted = __ceph_caps_file_wanted(ci);
if (__ceph_is_any_real_caps(ci) &&
- (!(wanted & CEPH_CAP_ANY_WR) == 0 || ci->i_auth_cap)) {
+ (!(wanted & CEPH_CAP_ANY_WR) || ci->i_auth_cap)) {
int issued = __ceph_caps_issued(ci, NULL);
spin_unlock(&ci->i_ceph_lock);
dout("renew caps %p want %s issued %s updating mds_wanted\n",
req->r_callback = ceph_aio_complete_req;
req->r_inode = inode;
req->r_priv = aio_req;
+ req->r_abort_on_full = true;
ret = ceph_osdc_start_request(req->r_osdc, req, false);
out:
out:
ceph_osdc_put_request(req);
- if (ret == 0) {
- pos += len;
- written += len;
-
- if (pos > i_size_read(inode)) {
- check_caps = ceph_inode_set_size(inode, pos);
- if (check_caps)
- ceph_check_caps(ceph_inode(inode),
- CHECK_CAPS_AUTHONLY,
- NULL);
- }
- } else
+ if (ret != 0) {
+ ceph_set_error_write(ci);
break;
+ }
+
+ ceph_clear_error_write(ci);
+ pos += len;
+ written += len;
+ if (pos > i_size_read(inode)) {
+ check_caps = ceph_inode_set_size(inode, pos);
+ if (check_caps)
+ ceph_check_caps(ceph_inode(inode),
+ CHECK_CAPS_AUTHONLY,
+ NULL);
+ }
+
}
if (ret != -EOLDSNAPC && written > 0) {
}
retry_snap:
+ /* FIXME: not complete since it doesn't account for being at quota */
if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL)) {
err = -ENOSPC;
goto out;
inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
- (iocb->ki_flags & IOCB_DIRECT) || (fi->flags & CEPH_F_SYNC)) {
+ (iocb->ki_flags & IOCB_DIRECT) || (fi->flags & CEPH_F_SYNC) ||
+ (ci->i_ceph_flags & CEPH_I_ERROR_WRITE)) {
struct ceph_snap_context *snapc;
struct iov_iter data;
inode_unlock(inode);
info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
+ info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
}
if (num == 0)
goto done;
static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
{
- if (atomic_inc_not_zero(&s->s_ref)) {
+ if (refcount_inc_not_zero(&s->s_ref)) {
dout("mdsc get_session %p %d -> %d\n", s,
- atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
+ refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
return s;
} else {
dout("mdsc get_session %p 0 -- FAIL", s);
void ceph_put_mds_session(struct ceph_mds_session *s)
{
dout("mdsc put_session %p %d -> %d\n", s,
- atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
- if (atomic_dec_and_test(&s->s_ref)) {
+ refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
+ if (refcount_dec_and_test(&s->s_ref)) {
if (s->s_auth.authorizer)
ceph_auth_destroy_authorizer(s->s_auth.authorizer);
kfree(s);
return NULL;
session = mdsc->sessions[mds];
dout("lookup_mds_session %p %d\n", session,
- atomic_read(&session->s_ref));
+ refcount_read(&session->s_ref));
get_session(session);
return session;
}
{
struct ceph_mds_session *s;
- if (mds >= mdsc->mdsmap->m_max_mds)
+ if (mds >= mdsc->mdsmap->m_num_mds)
return ERR_PTR(-EINVAL);
s = kzalloc(sizeof(*s), GFP_NOFS);
INIT_LIST_HEAD(&s->s_caps);
s->s_nr_caps = 0;
s->s_trim_caps = 0;
- atomic_set(&s->s_ref, 1);
+ refcount_set(&s->s_ref, 1);
INIT_LIST_HEAD(&s->s_waiting);
INIT_LIST_HEAD(&s->s_unsafe);
s->s_num_cap_releases = 0;
}
mdsc->sessions[mds] = s;
atomic_inc(&mdsc->num_sessions);
- atomic_inc(&s->s_ref); /* one ref to sessions[], one to caller */
+ refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */
ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
struct ceph_mds_session *ts;
int i, mds = session->s_mds;
- if (mds >= mdsc->mdsmap->m_max_mds)
+ if (mds >= mdsc->mdsmap->m_num_mds)
return;
mi = &mdsc->mdsmap->m_info[mds];
struct ceph_msg *msg = NULL;
struct ceph_mds_cap_release *head;
struct ceph_mds_cap_item *item;
+ struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
struct ceph_cap *cap;
LIST_HEAD(tmp_list);
int num_cap_releases;
+ __le32 barrier, *cap_barrier;
+
+ down_read(&osdc->lock);
+ barrier = cpu_to_le32(osdc->epoch_barrier);
+ up_read(&osdc->lock);
spin_lock(&session->s_cap_lock);
again:
head = msg->front.iov_base;
head->num = cpu_to_le32(0);
msg->front.iov_len = sizeof(*head);
+
+ msg->hdr.version = cpu_to_le16(2);
+ msg->hdr.compat_version = cpu_to_le16(1);
}
+
cap = list_first_entry(&tmp_list, struct ceph_cap,
session_caps);
list_del(&cap->session_caps);
ceph_put_cap(mdsc, cap);
if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
+ // Append cap_barrier field
+ cap_barrier = msg->front.iov_base + msg->front.iov_len;
+ *cap_barrier = barrier;
+ msg->front.iov_len += sizeof(*cap_barrier);
+
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
ceph_con_send(&session->s_con, msg);
spin_unlock(&session->s_cap_lock);
if (msg) {
+ // Append cap_barrier field
+ cap_barrier = msg->front.iov_base + msg->front.iov_len;
+ *cap_barrier = barrier;
+ msg->front.iov_len += sizeof(*cap_barrier);
+
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
ceph_con_send(&session->s_con, msg);
ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
{
struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
+ struct timespec ts;
if (!req)
return ERR_PTR(-ENOMEM);
init_completion(&req->r_safe_completion);
INIT_LIST_HEAD(&req->r_unsafe_item);
- req->r_stamp = current_fs_time(mdsc->fsc->sb);
+ ktime_get_real_ts(&ts);
+ req->r_stamp = timespec_trunc(ts, mdsc->fsc->sb->s_time_gran);
req->r_op = op;
req->r_direct_mode = mode;
if (req->r_pagelist) {
struct ceph_pagelist *pagelist = req->r_pagelist;
- atomic_inc(&pagelist->refcnt);
+ refcount_inc(&pagelist->refcnt);
ceph_msg_data_add_pagelist(msg, pagelist);
msg->hdr.data_len = cpu_to_le32(pagelist->length);
} else {
seq = le64_to_cpu(h->seq);
mutex_lock(&mdsc->mutex);
- if (op == CEPH_SESSION_CLOSE)
+ if (op == CEPH_SESSION_CLOSE) {
+ get_session(session);
__unregister_session(mdsc, session);
+ }
/* FIXME: this ttl calculation is generous */
session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
mutex_unlock(&mdsc->mutex);
kick_requests(mdsc, mds);
mutex_unlock(&mdsc->mutex);
}
+ if (op == CEPH_SESSION_CLOSE)
+ ceph_put_mds_session(session);
return;
bad:
dout("check_new_map new %u old %u\n",
newmap->m_epoch, oldmap->m_epoch);
- for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
+ for (i = 0; i < oldmap->m_num_mds && i < mdsc->max_sessions; i++) {
if (mdsc->sessions[i] == NULL)
continue;
s = mdsc->sessions[i];
ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
ceph_session_state_name(s->s_state));
- if (i >= newmap->m_max_mds ||
+ if (i >= newmap->m_num_mds ||
memcmp(ceph_mdsmap_get_addr(oldmap, i),
ceph_mdsmap_get_addr(newmap, i),
sizeof(struct ceph_entity_addr))) {
if (s->s_state == CEPH_MDS_SESSION_OPENING) {
/* the session never opened, just close it
* out now */
+ get_session(s);
+ __unregister_session(mdsc, s);
__wake_requests(mdsc, &s->s_waiting);
+ ceph_put_mds_session(s);
+ } else if (i >= newmap->m_num_mds) {
+ /* force close session for stopped mds */
+ get_session(s);
__unregister_session(mdsc, s);
+ __wake_requests(mdsc, &s->s_waiting);
+ kick_requests(mdsc, i);
+ mutex_unlock(&mdsc->mutex);
+
+ mutex_lock(&s->s_mutex);
+ cleanup_session_requests(mdsc, s);
+ remove_session_caps(s);
+ mutex_unlock(&s->s_mutex);
+
+ ceph_put_mds_session(s);
+
+ mutex_lock(&mdsc->mutex);
} else {
/* just close it */
mutex_unlock(&mdsc->mutex);
}
}
- for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
+ for (i = 0; i < newmap->m_num_mds && i < mdsc->max_sessions; i++) {
s = mdsc->sessions[i];
if (!s)
continue;
struct ceph_mds_session *s = con->private;
if (get_session(s)) {
- dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
+ dout("mdsc con_get %p ok (%d)\n", s, refcount_read(&s->s_ref));
return con;
}
dout("mdsc con_get %p FAIL\n", s);
{
struct ceph_mds_session *s = con->private;
- dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1);
+ dout("mdsc con_put %p (%d)\n", s, refcount_read(&s->s_ref) - 1);
ceph_put_mds_session(s);
}
struct ceph_options *opt)
{
struct ceph_fs_client *fsc;
- const u64 supported_features =
- CEPH_FEATURE_FLOCK | CEPH_FEATURE_DIRLAYOUTHASH |
- CEPH_FEATURE_MDSENC | CEPH_FEATURE_MDS_INLINE_DATA;
- const u64 required_features = 0;
int page_count;
size_t size;
int err = -ENOMEM;
if (!fsc)
return ERR_PTR(-ENOMEM);
- fsc->client = ceph_create_client(opt, fsc, supported_features,
- required_features);
+ fsc->client = ceph_create_client(opt, fsc);
if (IS_ERR(fsc->client)) {
err = PTR_ERR(fsc->client);
goto fail;
atomic_long_set(&fsc->writeback_count, 0);
- err = bdi_init(&fsc->backing_dev_info);
- if (err < 0)
- goto fail_client;
-
err = -ENOMEM;
/*
* The number of concurrent works can be high but they don't need
*/
fsc->wb_wq = alloc_workqueue("ceph-writeback", 0, 1);
if (fsc->wb_wq == NULL)
- goto fail_bdi;
+ goto fail_client;
fsc->pg_inv_wq = alloc_workqueue("ceph-pg-invalid", 0, 1);
if (fsc->pg_inv_wq == NULL)
goto fail_wb_wq;
destroy_workqueue(fsc->pg_inv_wq);
fail_wb_wq:
destroy_workqueue(fsc->wb_wq);
-fail_bdi:
- bdi_destroy(&fsc->backing_dev_info);
fail_client:
ceph_destroy_client(fsc->client);
fail:
destroy_workqueue(fsc->pg_inv_wq);
destroy_workqueue(fsc->trunc_wq);
- bdi_destroy(&fsc->backing_dev_info);
-
mempool_destroy(fsc->wb_pagevec_pool);
destroy_mount_options(fsc->mount_options);
*/
static atomic_long_t bdi_seq = ATOMIC_LONG_INIT(0);
-static int ceph_register_bdi(struct super_block *sb,
- struct ceph_fs_client *fsc)
+/*
+ * Set up sb->s_bdi through super_setup_bdi_name() under a "ceph-%ld"
+ * name numbered from bdi_seq, then size ra_pages from the rasize mount
+ * option and io_pages from the rsize mount option.
+ *
+ * Returns 0 on success or the error from super_setup_bdi_name().
+ */
+static int ceph_setup_bdi(struct super_block *sb, struct ceph_fs_client *fsc)
{
int err;
+ err = super_setup_bdi_name(sb, "ceph-%ld",
+ atomic_long_inc_return(&bdi_seq));
+ if (err)
+ return err;
+
/* set ra_pages based on rasize mount option? */
if (fsc->mount_options->rasize >= PAGE_SIZE)
- fsc->backing_dev_info.ra_pages =
+ sb->s_bdi->ra_pages =
(fsc->mount_options->rasize + PAGE_SIZE - 1)
>> PAGE_SHIFT;
else
- fsc->backing_dev_info.ra_pages =
- VM_MAX_READAHEAD * 1024 / PAGE_SIZE;
+ sb->s_bdi->ra_pages = VM_MAX_READAHEAD * 1024 / PAGE_SIZE;
if (fsc->mount_options->rsize > fsc->mount_options->rasize &&
fsc->mount_options->rsize >= PAGE_SIZE)
- fsc->backing_dev_info.io_pages =
+ sb->s_bdi->io_pages =
(fsc->mount_options->rsize + PAGE_SIZE - 1)
>> PAGE_SHIFT;
else if (fsc->mount_options->rsize == 0)
- fsc->backing_dev_info.io_pages = ULONG_MAX;
+ sb->s_bdi->io_pages = ULONG_MAX;
- err = bdi_register(&fsc->backing_dev_info, NULL, "ceph-%ld",
- atomic_long_inc_return(&bdi_seq));
- if (!err)
- sb->s_bdi = &fsc->backing_dev_info;
- return err;
+ return 0;
}
static struct dentry *ceph_mount(struct file_system_type *fs_type,
dout("get_sb got existing client %p\n", fsc);
} else {
dout("get_sb using new client %p\n", fsc);
- err = ceph_register_bdi(sb, fsc);
+ err = ceph_setup_bdi(sb, fsc);
if (err < 0) {
res = ERR_PTR(err);
goto out_splat;
#include <linux/writeback.h>
#include <linux/slab.h>
#include <linux/posix_acl.h>
+ #include <linux/refcount.h>
#include <linux/ceph/libceph.h>
struct workqueue_struct *trunc_wq;
atomic_long_t writeback_count;
- struct backing_dev_info backing_dev_info;
-
#ifdef CONFIG_DEBUG_FS
struct dentry *debugfs_dentry_lru, *debugfs_caps;
struct dentry *debugfs_congestion_kb;
* data before flushing the snapped state (tracked here) back to the MDS.
*/
struct ceph_cap_snap {
- atomic_t nref;
+ refcount_t nref;
struct list_head ci_item;
struct ceph_cap_flush cap_flush;
static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap)
{
- if (atomic_dec_and_test(&capsnap->nref)) {
+ if (refcount_dec_and_test(&capsnap->nref)) {
if (capsnap->xattr_blob)
ceph_buffer_put(capsnap->xattr_blob);
kfree(capsnap);
#define CEPH_I_CAP_DROPPED (1 << 8) /* caps were forcibly dropped */
#define CEPH_I_KICK_FLUSH (1 << 9) /* kick flushing caps */
#define CEPH_I_FLUSH_SNAPS (1 << 10) /* need flush snapss */
+ #define CEPH_I_ERROR_WRITE (1 << 11) /* have seen write errors */
+
+ /*
+ * We set the ERROR_WRITE bit when we start seeing write errors on an inode
+ * and then clear it when they start succeeding. Note that we do a lockless
+ * check first, and only take the lock if it looks like it needs to be changed.
+ * The write submission code just takes this as a hint, so we're not too
+ * worried if a few slip through in either direction.
+ */
+ static inline void ceph_set_error_write(struct ceph_inode_info *ci)
+ {
+ 	/* Lockless peek: the bit is already set, so there is nothing to do. */
+ 	if (READ_ONCE(ci->i_ceph_flags) & CEPH_I_ERROR_WRITE)
+ 		return;
+
+ 	/* The flag word itself is only modified under i_ceph_lock. */
+ 	spin_lock(&ci->i_ceph_lock);
+ 	ci->i_ceph_flags |= CEPH_I_ERROR_WRITE;
+ 	spin_unlock(&ci->i_ceph_lock);
+ }
+
+ /*
+  * Drop the CEPH_I_ERROR_WRITE bit from @ci.  The flag word is peeked at
+  * without the lock first; i_ceph_lock is taken only when the bit is set.
+  */
+ static inline void ceph_clear_error_write(struct ceph_inode_info *ci)
+ {
+ 	/* Lockless peek: the bit is already clear, so there is nothing to do. */
+ 	if (!(READ_ONCE(ci->i_ceph_flags) & CEPH_I_ERROR_WRITE))
+ 		return;
+
+ 	spin_lock(&ci->i_ceph_lock);
+ 	ci->i_ceph_flags &= ~CEPH_I_ERROR_WRITE;
+ 	spin_unlock(&ci->i_ceph_lock);
+ }
static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
long long release_count,
}
EXPORT_SYMBOL(libceph_compatible);
+ /*
+  * Getter for the "supported_features" module parameter: formats
+  * CEPH_FEATURES_SUPPORTED_DEFAULT as a hex string into @buffer and returns
+  * sprintf()'s result.  @kp is unused.
+  */
+ static int param_get_supported_features(char *buffer,
+ 					const struct kernel_param *kp)
+ {
+ 	return sprintf(buffer, "0x%llx", CEPH_FEATURES_SUPPORTED_DEFAULT);
+ }
+ /* Only .get is provided; no setter is registered for this parameter. */
+ static const struct kernel_param_ops param_ops_supported_features = {
+ 	.get = param_get_supported_features,
+ };
+ /* Registered with S_IRUGO, i.e. readable by everyone and not writable. */
+ module_param_cb(supported_features, &param_ops_supported_features, NULL,
+ 		S_IRUGO);
+
/*
* find filename portion of a path (/foo/bar/baz -> baz)
*/
return ptr;
}
- return __vmalloc(size, flags | __GFP_HIGHMEM, PAGE_KERNEL);
+ return __vmalloc(size, flags, PAGE_KERNEL);
}
/*
* create a fresh client instance
*/
- struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
- u64 supported_features,
- u64 required_features)
+ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private)
{
struct ceph_client *client;
struct ceph_entity_addr *myaddr = NULL;
init_waitqueue_head(&client->auth_wq);
client->auth_err = 0;
- if (!ceph_test_opt(client, NOMSGAUTH))
- required_features |= CEPH_FEATURE_MSG_AUTH;
-
client->extra_mon_dispatch = NULL;
- client->supported_features = CEPH_FEATURES_SUPPORTED_DEFAULT |
- supported_features;
- client->required_features = CEPH_FEATURES_REQUIRED_DEFAULT |
- required_features;
+ client->supported_features = CEPH_FEATURES_SUPPORTED_DEFAULT;
+ client->required_features = CEPH_FEATURES_REQUIRED_DEFAULT;
+
+ if (!ceph_test_opt(client, NOMSGAUTH))
+ client->required_features |= CEPH_FEATURE_MSG_AUTH;
/* msgr */
if (ceph_test_opt(client, MYIP))
truncate_size, truncate_seq);
}
+ req->r_abort_on_full = true;
req->r_flags = flags;
req->r_base_oloc.pool = layout->pool_id;
req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
*/
static void osd_init(struct ceph_osd *osd)
{
- atomic_set(&osd->o_ref, 1);
+ refcount_set(&osd->o_ref, 1);
RB_CLEAR_NODE(&osd->o_node);
osd->o_requests = RB_ROOT;
osd->o_linger_requests = RB_ROOT;
static struct ceph_osd *get_osd(struct ceph_osd *osd)
{
- if (atomic_inc_not_zero(&osd->o_ref)) {
- dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
- atomic_read(&osd->o_ref));
+ if (refcount_inc_not_zero(&osd->o_ref)) {
+ dout("get_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref)-1,
+ refcount_read(&osd->o_ref));
return osd;
} else {
dout("get_osd %p FAIL\n", osd);
static void put_osd(struct ceph_osd *osd)
{
- dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
- atomic_read(&osd->o_ref) - 1);
- if (atomic_dec_and_test(&osd->o_ref)) {
+ dout("put_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref),
+ refcount_read(&osd->o_ref) - 1);
+ if (refcount_dec_and_test(&osd->o_ref)) {
osd_cleanup(osd);
kfree(osd);
}
__pool_full(pi);
WARN_ON(pi->id != t->base_oloc.pool);
- return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
- (t->flags & CEPH_OSD_FLAG_WRITE && pausewr);
+ return ((t->flags & CEPH_OSD_FLAG_READ) && pauserd) ||
+ ((t->flags & CEPH_OSD_FLAG_WRITE) && pausewr) ||
+ (osdc->osdmap->epoch < osdc->epoch_barrier);
}
enum calc_target_result {
ceph_encode_32(&p, req->r_flags);
ceph_encode_timespec(p, &req->r_mtime);
p += sizeof(struct ceph_timespec);
- /* aka reassert_version */
- memcpy(p, &req->r_replay_version, sizeof(req->r_replay_version));
- p += sizeof(req->r_replay_version);
+
+ /* reassert_version */
+ memset(p, 0, sizeof(struct ceph_eversion));
+ p += sizeof(struct ceph_eversion);
/* oloc */
ceph_start_encoding(&p, 5, 4,
ceph_monc_renew_subs(&osdc->client->monc);
}
+ static void complete_request(struct ceph_osd_request *req, int err);
static void send_map_check(struct ceph_osd_request *req);
static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
enum calc_target_result ct_res;
bool need_send = false;
bool promoted = false;
+ bool need_abort = false;
WARN_ON(req->r_tid);
dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
goto promote;
}
- if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
- ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
+ if (osdc->osdmap->epoch < osdc->epoch_barrier) {
+ dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch,
+ osdc->epoch_barrier);
+ req->r_t.paused = true;
+ maybe_request_map(osdc);
+ } else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
+ ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
dout("req %p pausewr\n", req);
req->r_t.paused = true;
maybe_request_map(osdc);
pr_warn_ratelimited("FULL or reached pool quota\n");
req->r_t.paused = true;
maybe_request_map(osdc);
+ if (req->r_abort_on_full)
+ need_abort = true;
} else if (!osd_homeless(osd)) {
need_send = true;
} else {
link_request(osd, req);
if (need_send)
send_request(req);
+ else if (need_abort)
+ complete_request(req, -ENOSPC);
mutex_unlock(&osd->lock);
if (ct_res == CALC_TARGET_POOL_DNE)
complete_request(req, err);
}
+ /*
+  * Move osdc->epoch_barrier forward to @eb.  A value that is not strictly
+  * newer than the current barrier is ignored.  When the osdmap we hold is
+  * still older than the new barrier, maybe_request_map() is called.
+  */
+ static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
+ {
+ 	if (unlikely(eb <= osdc->epoch_barrier))
+ 		return;
+
+ 	dout("updating epoch_barrier from %u to %u\n",
+ 	     osdc->epoch_barrier, eb);
+ 	osdc->epoch_barrier = eb;
+
+ 	/* Our map has not reached the barrier yet, so ask for a newer one */
+ 	if (osdc->osdmap->epoch < eb)
+ 		maybe_request_map(osdc);
+ }
+
+ /*
+  * Exported entry point for advancing the epoch barrier to @eb.
+  *
+  * The barrier is first compared under the read side of osdc->lock.  Only
+  * when @eb is ahead of it is the lock dropped and retaken for write before
+  * update_epoch_barrier() is called.
+  */
+ void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
+ {
+ 	bool behind;
+
+ 	down_read(&osdc->lock);
+ 	behind = eb > osdc->epoch_barrier;
+ 	up_read(&osdc->lock);
+
+ 	/* Common case: the barrier is already at or past @eb */
+ 	if (likely(!behind))
+ 		return;
+
+ 	down_write(&osdc->lock);
+ 	update_epoch_barrier(osdc, eb);
+ 	up_write(&osdc->lock);
+ }
+ EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier);
+
+ /*
+  * Complete with -ENOSPC every request that is marked r_abort_on_full and is
+  * affected by a full condition (CEPH_OSDMAP_FULL set, or its target pool is
+  * full).  If any request marked r_abort_on_full is found, the epoch barrier
+  * is first raised to the epoch of the osdmap we currently hold.
+  */
+ static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc)
+ {
+ 	struct rb_node *osd_node, *req_node, *next;
+
+ 	dout("enter abort_on_full\n");
+
+ 	/* Neither the cluster nor any pool is full, so nothing is stalled */
+ 	if (!(ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
+ 	      have_pool_full(osdc)))
+ 		goto out;
+
+ 	/* Pass 1: is there any request marked abort-on-full at all? */
+ 	for (osd_node = rb_first(&osdc->osds); osd_node;
+ 	     osd_node = rb_next(osd_node)) {
+ 		struct ceph_osd *osd =
+ 			rb_entry(osd_node, struct ceph_osd, o_node);
+
+ 		for (req_node = rb_first(&osd->o_requests); req_node;
+ 		     req_node = rb_next(req_node)) {
+ 			struct ceph_osd_request *req =
+ 				rb_entry(req_node, struct ceph_osd_request,
+ 					 r_node);
+
+ 			if (req->r_abort_on_full)
+ 				goto abort;
+ 		}
+ 	}
+
+ 	/* No candidate found, so leave the barrier alone */
+ 	goto out;
+
+ abort:
+ 	/*
+ 	 * There is at least one request to abort: bring the barrier up to
+ 	 * the current map epoch if it is still behind it.
+ 	 */
+ 	update_epoch_barrier(osdc, osdc->osdmap->epoch);
+
+ 	/* Pass 2: abort the candidates that really sit behind a full condition */
+ 	for (osd_node = rb_first(&osdc->osds); osd_node;
+ 	     osd_node = rb_next(osd_node)) {
+ 		struct ceph_osd *osd =
+ 			rb_entry(osd_node, struct ceph_osd, o_node);
+
+ 		for (req_node = rb_first(&osd->o_requests); req_node;
+ 		     req_node = next) {
+ 			struct ceph_osd_request *req =
+ 				rb_entry(req_node, struct ceph_osd_request,
+ 					 r_node);
+
+ 			/*
+ 			 * Fetch the successor before touching req.
+ 			 * NOTE(review): presumably abort_request() can unlink
+ 			 * req from o_requests - confirm.
+ 			 */
+ 			next = rb_next(req_node);
+
+ 			if (!req->r_abort_on_full)
+ 				continue;
+
+ 			if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
+ 			    pool_full(osdc, req->r_t.target_oloc.pool))
+ 				abort_request(req, -ENOSPC);
+ 		}
+ 	}
+ out:
+ 	dout("return abort_on_full barrier=%u\n", osdc->epoch_barrier);
+ }
+
static void check_pool_dne(struct ceph_osd_request *req)
{
struct ceph_osd_client *osdc = req->r_osdc;
pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
have_pool_full(osdc);
- if (was_pauserd || was_pausewr || pauserd || pausewr)
+ if (was_pauserd || was_pausewr || pauserd || pausewr ||
+ osdc->osdmap->epoch < osdc->epoch_barrier)
maybe_request_map(osdc);
kick_requests(osdc, &need_resend, &need_resend_linger);
+ ceph_osdc_abort_on_full(osdc);
ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
osdc->osdmap->epoch);
up_write(&osdc->lock);
ceph_oid_copy(&lreq->t.base_oid, oid);
ceph_oloc_copy(&lreq->t.base_oloc, oloc);
lreq->t.flags = CEPH_OSD_FLAG_WRITE;
- lreq->mtime = CURRENT_TIME;
+ ktime_get_real_ts(&lreq->mtime);
lreq->reg_req = alloc_linger_request(lreq);
if (!lreq->reg_req) {
ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
req->r_flags = CEPH_OSD_FLAG_WRITE;
- req->r_mtime = CURRENT_TIME;
+ ktime_get_real_ts(&req->r_mtime);
osd_req_op_watch_init(req, 0, lreq->linger_id,
CEPH_OSD_WATCH_OP_UNWATCH);
close_osd(osd);
}
up_write(&osdc->lock);
- WARN_ON(atomic_read(&osdc->homeless_osd.o_ref) != 1);
+ WARN_ON(refcount_read(&osdc->homeless_osd.o_ref) != 1);
osd_cleanup(&osdc->homeless_osd);
WARN_ON(!list_empty(&osdc->osd_lru));