Merge tag 'ceph-for-4.12-rc1' of git://github.com/ceph/ceph-client
author     Linus Torvalds <torvalds@linux-foundation.org>
           Wed, 10 May 2017 15:42:33 +0000 (08:42 -0700)
committer  Linus Torvalds <torvalds@linux-foundation.org>
           Wed, 10 May 2017 15:42:33 +0000 (08:42 -0700)
Pull ceph updates from Ilya Dryomov:
 "The two main items are support for disabling automatic rbd exclusive
  lock transfers from myself and the long awaited -ENOSPC handling
  series from Jeff.

  The former will allow rbd users to take advantage of exclusive lock's
  built-in blacklist/break-lock functionality while staying in control
  of who owns the lock. With the latter in place, we will abort
  filesystem writes on -ENOSPC instead of having them block
  indefinitely.

  Beyond that, we've got the usual pile of filesystem fixes from Zheng,
  some refcount_t conversion patches from Elena, and a patch for an
  ancient open() flags handling bug from Alexander"
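
A usage note on the first item: the diff below adds an "exclusive"
token to rbd_opts_tokens, so the option is passed at map time, e.g.
"rbd map -o exclusive pool/image" (the exact CLI spelling is a hedged
guess; the kernel-side token is what the diff adds). With it, the
client refuses to hand the exclusive lock to peers, and the map itself
fails with -EROFS if the lock cannot be acquired up front.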

* tag 'ceph-for-4.12-rc1' of git://github.com/ceph/ceph-client: (31 commits)
  ceph: fix memory leak in __ceph_setxattr()
  ceph: fix file open flags on ppc64
  ceph: choose readdir frag based on previous readdir reply
  rbd: exclusive map option
  rbd: return ResponseMessage result from rbd_handle_request_lock()
  rbd: kill rbd_is_lock_supported()
  rbd: support updating the lock cookie without releasing the lock
  rbd: store lock cookie
  rbd: ignore unlock errors
  rbd: fix error handling around rbd_init_disk()
  rbd: move rbd_unregister_watch() call into rbd_dev_image_release()
  rbd: move rbd_dev_destroy() call out of rbd_dev_image_release()
  ceph: when seeing write errors on an inode, switch to sync writes
  Revert "ceph: SetPageError() for writeback pages if writepages fails"
  ceph: handle epoch barriers in cap messages
  libceph: add an epoch_barrier field to struct ceph_osd_client
  libceph: abort already submitted but abortable requests when map or pool goes full
  libceph: allow requests to return immediately on full conditions if caller wishes
  libceph: remove req->r_replay_version
  ceph: make seeky readdir more efficient
  ...

drivers/block/rbd.c
fs/ceph/addr.c
fs/ceph/debugfs.c
fs/ceph/file.c
fs/ceph/mds_client.c
fs/ceph/super.c
fs/ceph/super.h
net/ceph/ceph_common.c
net/ceph/osd_client.c

diff --combined drivers/block/rbd.c
index 26812c1ed0cf0a80c6cc0c4a97fa2a15bd0d8e23,3402ff7414c559b27817ae1f50fac0081192b991..454bf9c34882f33d673ccbaf0c8afa4f3ee18ad4
@@@ -387,6 -387,7 +387,7 @@@ struct rbd_device 
  
        struct rw_semaphore     lock_rwsem;
        enum rbd_lock_state     lock_state;
+       char                    lock_cookie[32];
        struct rbd_client_id    owner_cid;
        struct work_struct      acquired_lock_work;
        struct work_struct      released_lock_work;
@@@ -477,13 -478,6 +478,6 @@@ static int minor_to_rbd_dev_id(int mino
        return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
  }
  
- static bool rbd_is_lock_supported(struct rbd_device *rbd_dev)
- {
-       return (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
-              rbd_dev->spec->snap_id == CEPH_NOSNAP &&
-              !rbd_dev->mapping.read_only;
- }
- 
  static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
  {
        return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
@@@ -731,7 -725,7 +725,7 @@@ static struct rbd_client *rbd_client_cr
        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);
  
-       rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
+       rbdc->client = ceph_create_client(ceph_opts, rbdc);
        if (IS_ERR(rbdc->client))
                goto out_rbdc;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
@@@ -804,6 -798,7 +798,7 @@@ enum 
        Opt_read_only,
        Opt_read_write,
        Opt_lock_on_read,
+       Opt_exclusive,
        Opt_err
  };
  
@@@ -816,6 -811,7 +811,7 @@@ static match_table_t rbd_opts_tokens = 
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        {Opt_lock_on_read, "lock_on_read"},
+       {Opt_exclusive, "exclusive"},
        {Opt_err, NULL}
  };
  
@@@ -823,11 -819,13 +819,13 @@@ struct rbd_options 
        int     queue_depth;
        bool    read_only;
        bool    lock_on_read;
+       bool    exclusive;
  };
  
  #define RBD_QUEUE_DEPTH_DEFAULT       BLKDEV_MAX_RQ
  #define RBD_READ_ONLY_DEFAULT false
  #define RBD_LOCK_ON_READ_DEFAULT false
+ #define RBD_EXCLUSIVE_DEFAULT false
  
  static int parse_rbd_opts_token(char *c, void *private)
  {
        case Opt_lock_on_read:
                rbd_opts->lock_on_read = true;
                break;
+       case Opt_exclusive:
+               rbd_opts->exclusive = true;
+               break;
        default:
                /* libceph prints "bad option" msg */
                return -EINVAL;
@@@ -1922,7 -1923,7 +1923,7 @@@ static void rbd_osd_req_format_write(st
  {
        struct ceph_osd_request *osd_req = obj_request->osd_req;
  
 -      osd_req->r_mtime = CURRENT_TIME;
 +      ktime_get_real_ts(&osd_req->r_mtime);
        osd_req->r_data_offset = obj_request->offset;
  }
  
@@@ -3079,7 -3080,8 +3080,8 @@@ static int rbd_lock(struct rbd_device *
        char cookie[32];
        int ret;
  
-       WARN_ON(__rbd_is_lock_owner(rbd_dev));
+       WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
+               rbd_dev->lock_cookie[0] != '\0');
  
        format_lock_cookie(rbd_dev, cookie);
        ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
                return ret;
  
        rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
+       strcpy(rbd_dev->lock_cookie, cookie);
        rbd_set_owner_cid(rbd_dev, &cid);
        queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
        return 0;
  /*
   * lock_rwsem must be held for write
   */
- static int rbd_unlock(struct rbd_device *rbd_dev)
+ static void rbd_unlock(struct rbd_device *rbd_dev)
  {
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
-       char cookie[32];
        int ret;
  
-       WARN_ON(!__rbd_is_lock_owner(rbd_dev));
-       rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
+       WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
+               rbd_dev->lock_cookie[0] == '\0');
  
-       format_lock_cookie(rbd_dev, cookie);
        ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
-                             RBD_LOCK_NAME, cookie);
-       if (ret && ret != -ENOENT) {
-               rbd_warn(rbd_dev, "cls_unlock failed: %d", ret);
-               return ret;
-       }
+                             RBD_LOCK_NAME, rbd_dev->lock_cookie);
+       if (ret && ret != -ENOENT)
+               rbd_warn(rbd_dev, "failed to unlock: %d", ret);
  
+       /* treat errors as the image is unlocked */
+       rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
+       rbd_dev->lock_cookie[0] = '\0';
        rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
        queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
-       return 0;
  }
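
The cookie change above reduces to one invariant: generate the lock
cookie once at lock time, remember it, and pass the remembered string
to unlock (and, in rbd_reacquire_lock() further down, to set_cookie).
A minimal userspace C model of that invariant, with illustrative names
rather than kernel API:

    #include <stdio.h>
    #include <string.h>

    struct dev_model {
            char lock_cookie[32];   /* "" means unlocked */
    };

    static void model_lock(struct dev_model *d)
    {
            /* generated once at lock time, then remembered */
            snprintf(d->lock_cookie, sizeof(d->lock_cookie),
                     "auto %d", 12345);
    }

    static void model_unlock(struct dev_model *d)
    {
            /* unlock with the stored cookie instead of regenerating
             * it; errors are treated as "unlocked", as the diff notes */
            printf("unlock with cookie \"%s\"\n", d->lock_cookie);
            d->lock_cookie[0] = '\0';
    }

    int main(void)
    {
            struct dev_model d = { .lock_cookie = "" };
            model_lock(&d);
            model_unlock(&d);
            return 0;
    }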
  
  static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
@@@ -3447,6 -3447,18 +3447,18 @@@ again
        ret = rbd_request_lock(rbd_dev);
        if (ret == -ETIMEDOUT) {
                goto again; /* treat this as a dead client */
+       } else if (ret == -EROFS) {
+               rbd_warn(rbd_dev, "peer will not release lock");
+               /*
+                * If this is rbd_add_acquire_lock(), we want to fail
+                * immediately -- reuse BLACKLISTED flag.  Otherwise we
+                * want to block.
+                */
+               if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
+                       set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
+                       /* wake "rbd map --exclusive" process */
+                       wake_requests(rbd_dev, false);
+               }
        } else if (ret < 0) {
                rbd_warn(rbd_dev, "error requesting lock: %d", ret);
                mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
@@@ -3490,16 -3502,15 +3502,15 @@@ static bool rbd_release_lock(struct rbd
        if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
                return false;
  
-       if (!rbd_unlock(rbd_dev))
-               /*
-                * Give others a chance to grab the lock - we would re-acquire
-                * almost immediately if we got new IO during ceph_osdc_sync()
-                * otherwise.  We need to ack our own notifications, so this
-                * lock_dwork will be requeued from rbd_wait_state_locked()
-                * after wake_requests() in rbd_handle_released_lock().
-                */
-               cancel_delayed_work(&rbd_dev->lock_dwork);
+       rbd_unlock(rbd_dev);
+       /*
+        * Give others a chance to grab the lock - we would re-acquire
+        * almost immediately if we got new IO during ceph_osdc_sync()
+        * otherwise.  We need to ack our own notifications, so this
+        * lock_dwork will be requeued from rbd_wait_state_locked()
+        * after wake_requests() in rbd_handle_released_lock().
+        */
+       cancel_delayed_work(&rbd_dev->lock_dwork);
        return true;
  }
  
@@@ -3580,12 -3591,16 +3591,16 @@@ static void rbd_handle_released_lock(st
        up_read(&rbd_dev->lock_rwsem);
  }
  
- static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
-                                   void **p)
+ /*
+  * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
+  * ResponseMessage is needed.
+  */
+ static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
+                                  void **p)
  {
        struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
        struct rbd_client_id cid = { 0 };
-       bool need_to_send;
+       int result = 1;
  
        if (struct_v >= 2) {
                cid.gid = ceph_decode_64(p);
        dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
             cid.handle);
        if (rbd_cid_equal(&cid, &my_cid))
-               return false;
+               return result;
  
        down_read(&rbd_dev->lock_rwsem);
-       need_to_send = __rbd_is_lock_owner(rbd_dev);
-       if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
-               if (!rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) {
-                       dout("%s rbd_dev %p queueing unlock_work\n", __func__,
-                            rbd_dev);
-                       queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work);
+       if (__rbd_is_lock_owner(rbd_dev)) {
+               if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
+                   rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
+                       goto out_unlock;
+               /*
+                * encode ResponseMessage(0) so the peer can detect
+                * a missing owner
+                */
+               result = 0;
+               if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
+                       if (!rbd_dev->opts->exclusive) {
+                               dout("%s rbd_dev %p queueing unlock_work\n",
+                                    __func__, rbd_dev);
+                               queue_work(rbd_dev->task_wq,
+                                          &rbd_dev->unlock_work);
+                       } else {
+                               /* refuse to release the lock */
+                               result = -EROFS;
+                       }
                }
        }
+ out_unlock:
        up_read(&rbd_dev->lock_rwsem);
-       return need_to_send;
+       return result;
  }
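
The new return convention is easy to misread, so spelled out: a return
of 1 means "no ResponseMessage needed, just acknowledge the notify",
while <= 0 is a result to be encoded in a ResponseMessage (0 when the
owner will release, -EROFS when an exclusively mapped owner refuses).
A small self-contained C model of the caller's dispatch (illustrative;
the real dispatch is the rbd_watch_cb() hunk below):

    #include <errno.h>
    #include <stdio.h>

    static int handle_request_lock_model(int is_owner, int exclusive)
    {
            if (!is_owner)
                    return 1;       /* not the owner: plain ack */
            if (exclusive)
                    return -EROFS;  /* refuse to release the lock */
            return 0;               /* releasing: peer sees result 0 */
    }

    int main(void)
    {
            int cases[3][2] = { {0, 0}, {1, 0}, {1, 1} };
            for (int i = 0; i < 3; i++) {
                    int ret = handle_request_lock_model(cases[i][0],
                                                        cases[i][1]);
                    if (ret <= 0)
                            printf("ResponseMessage(%d)\n", ret);
                    else
                            printf("plain ack\n");
            }
            return 0;
    }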
  
  static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
@@@ -3690,13 -3722,10 +3722,10 @@@ static void rbd_watch_cb(void *arg, u6
                rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
                break;
        case RBD_NOTIFY_OP_REQUEST_LOCK:
-               if (rbd_handle_request_lock(rbd_dev, struct_v, &p))
-                       /*
-                        * send ResponseMessage(0) back so the client
-                        * can detect a missing owner
-                        */
+               ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
+               if (ret <= 0)
                        rbd_acknowledge_notify_result(rbd_dev, notify_id,
-                                                     cookie, 0);
+                                                     cookie, ret);
                else
                        rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
                break;
@@@ -3821,24 -3850,51 +3850,51 @@@ static void rbd_unregister_watch(struc
        ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
  }
  
+ /*
+  * lock_rwsem must be held for write
+  */
+ static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
+ {
+       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+       char cookie[32];
+       int ret;
+ 
+       WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
+ 
+       format_lock_cookie(rbd_dev, cookie);
+       ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
+                                 &rbd_dev->header_oloc, RBD_LOCK_NAME,
+                                 CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
+                                 RBD_LOCK_TAG, cookie);
+       if (ret) {
+               if (ret != -EOPNOTSUPP)
+                       rbd_warn(rbd_dev, "failed to update lock cookie: %d",
+                                ret);
+               /*
+                * Lock cookie cannot be updated on older OSDs, so do
+                * a manual release and queue an acquire.
+                */
+               if (rbd_release_lock(rbd_dev))
+                       queue_delayed_work(rbd_dev->task_wq,
+                                          &rbd_dev->lock_dwork, 0);
+       } else {
+               strcpy(rbd_dev->lock_cookie, cookie);
+       }
+ }
+ 
  static void rbd_reregister_watch(struct work_struct *work)
  {
        struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
                                            struct rbd_device, watch_dwork);
-       bool was_lock_owner = false;
-       bool need_to_wake = false;
        int ret;
  
        dout("%s rbd_dev %p\n", __func__, rbd_dev);
  
-       down_write(&rbd_dev->lock_rwsem);
-       if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
-               was_lock_owner = rbd_release_lock(rbd_dev);
        mutex_lock(&rbd_dev->watch_mutex);
        if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
                mutex_unlock(&rbd_dev->watch_mutex);
-               goto out;
+               return;
        }
  
        ret = __rbd_register_watch(rbd_dev);
                rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
                if (ret == -EBLACKLISTED || ret == -ENOENT) {
                        set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
-                       need_to_wake = true;
+                       wake_requests(rbd_dev, true);
                } else {
                        queue_delayed_work(rbd_dev->task_wq,
                                           &rbd_dev->watch_dwork,
                                           RBD_RETRY_DELAY);
                }
                mutex_unlock(&rbd_dev->watch_mutex);
-               goto out;
+               return;
        }
  
-       need_to_wake = true;
        rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
        rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
        mutex_unlock(&rbd_dev->watch_mutex);
  
+       down_write(&rbd_dev->lock_rwsem);
+       if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
+               rbd_reacquire_lock(rbd_dev);
+       up_write(&rbd_dev->lock_rwsem);
        ret = rbd_dev_refresh(rbd_dev);
        if (ret)
                rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
-       if (was_lock_owner) {
-               ret = rbd_try_lock(rbd_dev);
-               if (ret)
-                       rbd_warn(rbd_dev, "reregisteration lock failed: %d",
-                                ret);
-       }
- out:
-       up_write(&rbd_dev->lock_rwsem);
-       if (need_to_wake)
-               wake_requests(rbd_dev, true);
  }
  
  /*
@@@ -4034,10 -4082,6 +4082,6 @@@ static void rbd_queue_workfn(struct wor
        if (op_type != OBJ_OP_READ) {
                snapc = rbd_dev->header.snapc;
                ceph_get_snap_context(snapc);
-               must_be_locked = rbd_is_lock_supported(rbd_dev);
-       } else {
-               must_be_locked = rbd_dev->opts->lock_on_read &&
-                                       rbd_is_lock_supported(rbd_dev);
        }
        up_read(&rbd_dev->header_rwsem);
  
                goto err_rq;
        }
  
+       must_be_locked =
+           (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
+           (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
        if (must_be_locked) {
                down_read(&rbd_dev->lock_rwsem);
                if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
-                   !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
+                   !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
+                       if (rbd_dev->opts->exclusive) {
+                               rbd_warn(rbd_dev, "exclusive lock required");
+                               result = -EROFS;
+                               goto err_unlock;
+                       }
                        rbd_wait_state_locked(rbd_dev);
-               WARN_ON((rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) ^
-                       !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
+               }
                if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
                        result = -EBLACKLISTED;
                        goto err_unlock;
@@@ -4114,19 -4164,10 +4164,10 @@@ static int rbd_queue_rq(struct blk_mq_h
  
  static void rbd_free_disk(struct rbd_device *rbd_dev)
  {
-       struct gendisk *disk = rbd_dev->disk;
-       if (!disk)
-               return;
+       blk_cleanup_queue(rbd_dev->disk->queue);
+       blk_mq_free_tag_set(&rbd_dev->tag_set);
+       put_disk(rbd_dev->disk);
        rbd_dev->disk = NULL;
-       if (disk->flags & GENHD_FL_UP) {
-               del_gendisk(disk);
-               if (disk->queue)
-                       blk_cleanup_queue(disk->queue);
-               blk_mq_free_tag_set(&rbd_dev->tag_set);
-       }
-       put_disk(disk);
  }
  
  static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
@@@ -4307,8 -4348,9 +4348,8 @@@ out
        return ret;
  }
  
 -static int rbd_init_request(void *data, struct request *rq,
 -              unsigned int hctx_idx, unsigned int request_idx,
 -              unsigned int numa_node)
 +static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
 +              unsigned int hctx_idx, unsigned int numa_node)
  {
        struct work_struct *work = blk_mq_rq_to_pdu(rq);
  
        return 0;
  }
  
 -static struct blk_mq_ops rbd_mq_ops = {
 +static const struct blk_mq_ops rbd_mq_ops = {
        .queue_rq       = rbd_queue_rq,
        .init_request   = rbd_init_request,
  };
@@@ -4379,12 -4421,17 +4420,16 @@@ static int rbd_init_disk(struct rbd_dev
        q->limits.discard_granularity = segment_size;
        q->limits.discard_alignment = segment_size;
        blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
 -      q->limits.discard_zeroes_data = 1;
  
        if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
                q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
  
+       /*
+        * disk_release() expects a queue ref from add_disk() and will
+        * put it.  Hold an extra ref until add_disk() is called.
+        */
+       WARN_ON(!blk_get_queue(q));
        disk->queue = q;
        q->queuedata = rbd_dev;
  
        rbd_dev->disk = disk;
@@@ -5624,6 -5671,7 +5669,7 @@@ static int rbd_add_parse_args(const cha
        rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
        rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
        rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
+       rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
  
        copts = ceph_parse_options(options, mon_addrs,
                                        mon_addrs + mon_addrs_size - 1,
@@@ -5682,6 -5730,33 +5728,33 @@@ again
        return ret;
  }
  
+ static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
+ {
+       down_write(&rbd_dev->lock_rwsem);
+       if (__rbd_is_lock_owner(rbd_dev))
+               rbd_unlock(rbd_dev);
+       up_write(&rbd_dev->lock_rwsem);
+ }
+ 
+ static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
+ {
+       if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
+               rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
+               return -EINVAL;
+       }
+ 
+       /* FIXME: "rbd map --exclusive" should be interruptible */
+       down_read(&rbd_dev->lock_rwsem);
+       rbd_wait_state_locked(rbd_dev);
+       up_read(&rbd_dev->lock_rwsem);
+       if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
+               rbd_warn(rbd_dev, "failed to acquire exclusive lock");
+               return -EROFS;
+       }
+       return 0;
+ }
+ 
  /*
   * An rbd format 2 image has a unique identifier, distinct from the
   * name given to it by the user.  Internally, that identifier is
@@@ -5873,6 -5948,15 +5946,15 @@@ out_err
        return ret;
  }
  
+ static void rbd_dev_device_release(struct rbd_device *rbd_dev)
+ {
+       clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
+       rbd_dev_mapping_clear(rbd_dev);
+       rbd_free_disk(rbd_dev);
+       if (!single_major)
+               unregister_blkdev(rbd_dev->major, rbd_dev->name);
+ }
+ 
  /*
   * rbd_dev->header_rwsem must be locked for write and will be unlocked
   * upon return.
@@@ -5908,26 -5992,13 +5990,13 @@@ static int rbd_dev_device_setup(struct 
        set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
        set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
  
-       dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
-       ret = device_add(&rbd_dev->dev);
+       ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
        if (ret)
                goto err_out_mapping;
  
-       /* Everything's ready.  Announce the disk to the world. */
        set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        up_write(&rbd_dev->header_rwsem);
-       spin_lock(&rbd_dev_list_lock);
-       list_add_tail(&rbd_dev->node, &rbd_dev_list);
-       spin_unlock(&rbd_dev_list_lock);
-       add_disk(rbd_dev->disk);
-       pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
-               (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
-               rbd_dev->header.features);
-       return ret;
+       return 0;
  
  err_out_mapping:
        rbd_dev_mapping_clear(rbd_dev);
@@@ -5962,11 -6033,11 +6031,11 @@@ static int rbd_dev_header_name(struct r
  static void rbd_dev_image_release(struct rbd_device *rbd_dev)
  {
        rbd_dev_unprobe(rbd_dev);
+       if (rbd_dev->opts)
+               rbd_unregister_watch(rbd_dev);
        rbd_dev->image_format = 0;
        kfree(rbd_dev->spec->image_id);
        rbd_dev->spec->image_id = NULL;
-       rbd_dev_destroy(rbd_dev);
  }
  
  /*
@@@ -6126,22 -6197,43 +6195,43 @@@ static ssize_t do_rbd_add(struct bus_ty
        rbd_dev->mapping.read_only = read_only;
  
        rc = rbd_dev_device_setup(rbd_dev);
-       if (rc) {
-               /*
-                * rbd_unregister_watch() can't be moved into
-                * rbd_dev_image_release() without refactoring, see
-                * commit 1f3ef78861ac.
-                */
-               rbd_unregister_watch(rbd_dev);
-               rbd_dev_image_release(rbd_dev);
-               goto out;
+       if (rc)
+               goto err_out_image_probe;
+       if (rbd_dev->opts->exclusive) {
+               rc = rbd_add_acquire_lock(rbd_dev);
+               if (rc)
+                       goto err_out_device_setup;
        }
  
+       /* Everything's ready.  Announce the disk to the world. */
+       rc = device_add(&rbd_dev->dev);
+       if (rc)
+               goto err_out_image_lock;
+       add_disk(rbd_dev->disk);
+       /* see rbd_init_disk() */
+       blk_put_queue(rbd_dev->disk->queue);
+       spin_lock(&rbd_dev_list_lock);
+       list_add_tail(&rbd_dev->node, &rbd_dev_list);
+       spin_unlock(&rbd_dev_list_lock);
+       pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
+               (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
+               rbd_dev->header.features);
        rc = count;
  out:
        module_put(THIS_MODULE);
        return rc;
  
+ err_out_image_lock:
+       rbd_dev_image_unlock(rbd_dev);
+ err_out_device_setup:
+       rbd_dev_device_release(rbd_dev);
+ err_out_image_probe:
+       rbd_dev_image_release(rbd_dev);
  err_out_rbd_dev:
        rbd_dev_destroy(rbd_dev);
  err_out_client:
@@@ -6169,21 -6261,6 +6259,6 @@@ static ssize_t rbd_add_single_major(str
        return do_rbd_add(bus, buf, count);
  }
  
- static void rbd_dev_device_release(struct rbd_device *rbd_dev)
- {
-       rbd_free_disk(rbd_dev);
-       spin_lock(&rbd_dev_list_lock);
-       list_del_init(&rbd_dev->node);
-       spin_unlock(&rbd_dev_list_lock);
-       clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
-       device_del(&rbd_dev->dev);
-       rbd_dev_mapping_clear(rbd_dev);
-       if (!single_major)
-               unregister_blkdev(rbd_dev->major, rbd_dev->name);
- }
- 
  static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
  {
        while (rbd_dev->parent) {
                }
                rbd_assert(second);
                rbd_dev_image_release(second);
+               rbd_dev_destroy(second);
                first->parent = NULL;
                first->parent_overlap = 0;
  
@@@ -6269,21 -6347,16 +6345,16 @@@ static ssize_t do_rbd_remove(struct bus
                blk_set_queue_dying(rbd_dev->disk->queue);
        }
  
-       down_write(&rbd_dev->lock_rwsem);
-       if (__rbd_is_lock_owner(rbd_dev))
-               rbd_unlock(rbd_dev);
-       up_write(&rbd_dev->lock_rwsem);
-       rbd_unregister_watch(rbd_dev);
+       del_gendisk(rbd_dev->disk);
+       spin_lock(&rbd_dev_list_lock);
+       list_del_init(&rbd_dev->node);
+       spin_unlock(&rbd_dev_list_lock);
+       device_del(&rbd_dev->dev);
  
-       /*
-        * Don't free anything from rbd_dev->disk until after all
-        * notifies are completely processed. Otherwise
-        * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
-        * in a potential use after free of rbd_dev->disk or rbd_dev.
-        */
+       rbd_dev_image_unlock(rbd_dev);
        rbd_dev_device_release(rbd_dev);
        rbd_dev_image_release(rbd_dev);
+       rbd_dev_destroy(rbd_dev);
        return count;
  }
  
diff --combined fs/ceph/addr.c
index 9ecb2fd348cb3c01727a903c8ae151582179991e,e253102b43cd37b9dcad4b1c1c9724ac12e29f1a..1e71e6ca5ddfb09466bee7d8721d064a4a846176
@@@ -578,7 -578,7 +578,7 @@@ static int writepage_nounlock(struct pa
        writeback_stat = atomic_long_inc_return(&fsc->writeback_count);
        if (writeback_stat >
            CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
 -              set_bdi_congested(&fsc->backing_dev_info, BLK_RW_ASYNC);
 +              set_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC);
  
        set_page_writeback(page);
        err = ceph_osdc_writepages(osdc, ceph_vino(inode),
@@@ -670,8 -670,12 +670,12 @@@ static void writepages_finish(struct ce
        bool remove_page;
  
        dout("writepages_finish %p rc %d\n", inode, rc);
-       if (rc < 0)
+       if (rc < 0) {
                mapping_set_error(mapping, rc);
+               ceph_set_error_write(ci);
+       } else {
+               ceph_clear_error_write(ci);
+       }
  
        /*
         * We lost the cache cap, need to truncate the page before
                        if (atomic_long_dec_return(&fsc->writeback_count) <
                             CONGESTION_OFF_THRESH(
                                        fsc->mount_options->congestion_kb))
 -                              clear_bdi_congested(&fsc->backing_dev_info,
 +                              clear_bdi_congested(inode_to_bdi(inode),
                                                    BLK_RW_ASYNC);
  
-                       if (rc < 0)
-                               SetPageError(page);
                        ceph_put_snap_context(page_snap_context(page));
                        page->private = 0;
                        ClearPagePrivate(page);
@@@ -979,7 -980,7 +980,7 @@@ get_more_pages
                        if (atomic_long_inc_return(&fsc->writeback_count) >
                            CONGESTION_ON_THRESH(
                                    fsc->mount_options->congestion_kb)) {
 -                              set_bdi_congested(&fsc->backing_dev_info,
 +                              set_bdi_congested(inode_to_bdi(inode),
                                                  BLK_RW_ASYNC);
                        }
  
@@@ -1892,6 -1893,7 +1893,7 @@@ static int __ceph_pool_perm_get(struct 
        err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false);
  
        wr_req->r_mtime = ci->vfs_inode.i_mtime;
+       wr_req->r_abort_on_full = true;
        err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false);
  
        if (!err)
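
The r_abort_on_full flag set on the write probe above is the hook for
the pull's -ENOSPC behavior: a request so marked is completed with an
error when the osdmap says the pool is full, instead of being queued
until space appears. A hedged model of the decision (illustrative,
not the libceph internals):

    #include <errno.h>
    #include <stdbool.h>
    #include <stdio.h>

    struct req_model {
            bool abort_on_full;
    };

    /* 0 means send (or block); negative means complete the request
     * immediately with that error */
    static int submit_model(const struct req_model *req, bool pool_full)
    {
            if (pool_full && req->abort_on_full)
                    return -ENOSPC; /* abort instead of blocking */
            return 0;
    }

    int main(void)
    {
            struct req_model wr = { .abort_on_full = true };
            printf("full pool -> %d\n", submit_model(&wr, true));
            printf("ok pool   -> %d\n", submit_model(&wr, false));
            return 0;
    }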
diff --combined fs/ceph/debugfs.c
index 3ef11bc8d728d6129818428ba192830186e23804,4107a887ca34a6de803ba1e3523d332810d629e4..4e2d112c982f4b12852ba7b8bad58327783bc3db
@@@ -22,20 -22,19 +22,19 @@@ static int mdsmap_show(struct seq_file 
  {
        int i;
        struct ceph_fs_client *fsc = s->private;
+       struct ceph_mdsmap *mdsmap;
  
        if (fsc->mdsc == NULL || fsc->mdsc->mdsmap == NULL)
                return 0;
-       seq_printf(s, "epoch %d\n", fsc->mdsc->mdsmap->m_epoch);
-       seq_printf(s, "root %d\n", fsc->mdsc->mdsmap->m_root);
-       seq_printf(s, "session_timeout %d\n",
-                      fsc->mdsc->mdsmap->m_session_timeout);
-       seq_printf(s, "session_autoclose %d\n",
-                      fsc->mdsc->mdsmap->m_session_autoclose);
-       for (i = 0; i < fsc->mdsc->mdsmap->m_max_mds; i++) {
-               struct ceph_entity_addr *addr =
-                       &fsc->mdsc->mdsmap->m_info[i].addr;
-               int state = fsc->mdsc->mdsmap->m_info[i].state;
+       mdsmap = fsc->mdsc->mdsmap;
+       seq_printf(s, "epoch %d\n", mdsmap->m_epoch);
+       seq_printf(s, "root %d\n", mdsmap->m_root);
+       seq_printf(s, "max_mds %d\n", mdsmap->m_max_mds);
+       seq_printf(s, "session_timeout %d\n", mdsmap->m_session_timeout);
+       seq_printf(s, "session_autoclose %d\n", mdsmap->m_session_autoclose);
+       for (i = 0; i < mdsmap->m_num_mds; i++) {
+               struct ceph_entity_addr *addr = &mdsmap->m_info[i].addr;
+               int state = mdsmap->m_info[i].state;
                seq_printf(s, "\tmds%d\t%s\t(%s)\n", i,
                               ceph_pr_addr(&addr->in_addr),
                               ceph_mds_state_name(state));
@@@ -251,7 -250,7 +250,7 @@@ int ceph_fs_debugfs_init(struct ceph_fs
                goto out;
  
        snprintf(name, sizeof(name), "../../bdi/%s",
 -               dev_name(fsc->backing_dev_info.dev));
 +               dev_name(fsc->sb->s_bdi->dev));
        fsc->debugfs_bdi =
                debugfs_create_symlink("bdi",
                                       fsc->client->debugfs_dir,
diff --combined fs/ceph/file.c
index 18c045e2ead6d822400291ae6353cd4b92803755,9d2eeed9e3232b11e65cb3831a7a76c47c148809..3fdde0b283c9b86f7fa6aaa9585593699be29fca
  #include "mds_client.h"
  #include "cache.h"
  
+ static __le32 ceph_flags_sys2wire(u32 flags)
+ {
+       u32 wire_flags = 0;
+ 
+       switch (flags & O_ACCMODE) {
+       case O_RDONLY:
+               wire_flags |= CEPH_O_RDONLY;
+               break;
+       case O_WRONLY:
+               wire_flags |= CEPH_O_WRONLY;
+               break;
+       case O_RDWR:
+               wire_flags |= CEPH_O_RDWR;
+               break;
+       }
+ 
+ #define ceph_sys2wire(a) if (flags & a) { wire_flags |= CEPH_##a; flags &= ~a; }
+       ceph_sys2wire(O_CREAT);
+       ceph_sys2wire(O_EXCL);
+       ceph_sys2wire(O_TRUNC);
+       ceph_sys2wire(O_DIRECTORY);
+       ceph_sys2wire(O_NOFOLLOW);
+ #undef ceph_sys2wire
+ 
+       if (flags)
+               dout("unused open flags: %x", flags);
+       return cpu_to_le32(wire_flags);
+ }
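
The ppc64 bug being fixed here is that open(2) flag values are
architecture-specific, so raw host flags cannot go on the wire
unchanged; the CEPH_O_* constants are architecture-independent. A
quick way to see the divergence is to build and run this on two
architectures and compare the output:

    #define _GNU_SOURCE
    #include <fcntl.h>
    #include <stdio.h>

    int main(void)
    {
            /* e.g. powerpc's native values differ from asm-generic's */
            printf("O_CREAT=%#o O_EXCL=%#o O_TRUNC=%#o\n",
                   O_CREAT, O_EXCL, O_TRUNC);
            printf("O_DIRECTORY=%#o O_NOFOLLOW=%#o\n",
                   O_DIRECTORY, O_NOFOLLOW);
            return 0;
    }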
  /*
   * Ceph file operations
   *
@@@ -74,9 -106,12 +106,9 @@@ dio_get_pages_alloc(const struct iov_it
        align = (unsigned long)(it->iov->iov_base + it->iov_offset) &
                (PAGE_SIZE - 1);
        npages = calc_pages_for(align, nbytes);
 -      pages = kmalloc(sizeof(*pages) * npages, GFP_KERNEL);
 -      if (!pages) {
 -              pages = vmalloc(sizeof(*pages) * npages);
 -              if (!pages)
 -                      return ERR_PTR(-ENOMEM);
 -      }
 +      pages = kvmalloc(sizeof(*pages) * npages, GFP_KERNEL);
 +      if (!pages)
 +              return ERR_PTR(-ENOMEM);
  
        for (idx = 0; idx < npages; ) {
                size_t start;
@@@ -120,7 -155,7 +152,7 @@@ prepare_open_request(struct super_bloc
        if (IS_ERR(req))
                goto out;
        req->r_fmode = ceph_flags_to_mode(flags);
-       req->r_args.open.flags = cpu_to_le32(flags);
+       req->r_args.open.flags = ceph_flags_sys2wire(flags);
        req->r_args.open.mode = cpu_to_le32(create_mode);
  out:
        return req;
@@@ -189,7 -224,7 +221,7 @@@ int ceph_renew_caps(struct inode *inode
        spin_lock(&ci->i_ceph_lock);
        wanted = __ceph_caps_file_wanted(ci);
        if (__ceph_is_any_real_caps(ci) &&
-           (!(wanted & CEPH_CAP_ANY_WR) == 0 || ci->i_auth_cap)) {
+           (!(wanted & CEPH_CAP_ANY_WR) || ci->i_auth_cap)) {
                int issued = __ceph_caps_issued(ci, NULL);
                spin_unlock(&ci->i_ceph_lock);
                dout("renew caps %p want %s issued %s updating mds_wanted\n",
@@@ -778,6 -813,7 +810,7 @@@ static void ceph_aio_retry_work(struct 
        req->r_callback = ceph_aio_complete_req;
        req->r_inode = inode;
        req->r_priv = aio_req;
+       req->r_abort_on_full = true;
  
        ret = ceph_osdc_start_request(req->r_osdc, req, false);
  out:
@@@ -1085,19 -1121,22 +1118,22 @@@ ceph_sync_write(struct kiocb *iocb, str
  
  out:
                ceph_osdc_put_request(req);
-               if (ret == 0) {
-                       pos += len;
-                       written += len;
-                       if (pos > i_size_read(inode)) {
-                               check_caps = ceph_inode_set_size(inode, pos);
-                               if (check_caps)
-                                       ceph_check_caps(ceph_inode(inode),
-                                                       CHECK_CAPS_AUTHONLY,
-                                                       NULL);
-                       }
-               } else
+               if (ret != 0) {
+                       ceph_set_error_write(ci);
                        break;
+               }
+               ceph_clear_error_write(ci);
+               pos += len;
+               written += len;
+               if (pos > i_size_read(inode)) {
+                       check_caps = ceph_inode_set_size(inode, pos);
+                       if (check_caps)
+                               ceph_check_caps(ceph_inode(inode),
+                                               CHECK_CAPS_AUTHONLY,
+                                               NULL);
+               }
        }
  
        if (ret != -EOLDSNAPC && written > 0) {
@@@ -1303,6 -1342,7 +1339,7 @@@ static ssize_t ceph_write_iter(struct k
        }
  
  retry_snap:
+       /* FIXME: not complete since it doesn't account for being at quota */
        if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL)) {
                err = -ENOSPC;
                goto out;
             inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
  
        if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
-           (iocb->ki_flags & IOCB_DIRECT) || (fi->flags & CEPH_F_SYNC)) {
+           (iocb->ki_flags & IOCB_DIRECT) || (fi->flags & CEPH_F_SYNC) ||
+           (ci->i_ceph_flags & CEPH_I_ERROR_WRITE)) {
                struct ceph_snap_context *snapc;
                struct iov_iter data;
                inode_unlock(inode);
diff --combined fs/ceph/mds_client.c
index 1d3fa90d40b92a660bce5e5d5a73a0f0823fb2c7,f7bfc22eb39c6577cbf97a960f4cb033bc518557..f38e56fa97129d646285fa2b433e8130c7b85312
@@@ -189,6 -189,7 +189,7 @@@ static int parse_reply_info_dir(void **
                info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
                info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
                info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
+               info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
        }
        if (num == 0)
                goto done;
@@@ -378,9 -379,9 +379,9 @@@ const char *ceph_session_state_name(in
  
  static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
  {
-       if (atomic_inc_not_zero(&s->s_ref)) {
+       if (refcount_inc_not_zero(&s->s_ref)) {
                dout("mdsc get_session %p %d -> %d\n", s,
-                    atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
+                    refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
                return s;
        } else {
                dout("mdsc get_session %p 0 -- FAIL", s);
  void ceph_put_mds_session(struct ceph_mds_session *s)
  {
        dout("mdsc put_session %p %d -> %d\n", s,
-            atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
-       if (atomic_dec_and_test(&s->s_ref)) {
+            refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
+       if (refcount_dec_and_test(&s->s_ref)) {
                if (s->s_auth.authorizer)
                        ceph_auth_destroy_authorizer(s->s_auth.authorizer);
                kfree(s);
@@@ -411,7 -412,7 +412,7 @@@ struct ceph_mds_session *__ceph_lookup_
                return NULL;
        session = mdsc->sessions[mds];
        dout("lookup_mds_session %p %d\n", session,
-            atomic_read(&session->s_ref));
+            refcount_read(&session->s_ref));
        get_session(session);
        return session;
  }
@@@ -441,7 -442,7 +442,7 @@@ static struct ceph_mds_session *registe
  {
        struct ceph_mds_session *s;
  
-       if (mds >= mdsc->mdsmap->m_max_mds)
+       if (mds >= mdsc->mdsmap->m_num_mds)
                return ERR_PTR(-EINVAL);
  
        s = kzalloc(sizeof(*s), GFP_NOFS);
        INIT_LIST_HEAD(&s->s_caps);
        s->s_nr_caps = 0;
        s->s_trim_caps = 0;
-       atomic_set(&s->s_ref, 1);
+       refcount_set(&s->s_ref, 1);
        INIT_LIST_HEAD(&s->s_waiting);
        INIT_LIST_HEAD(&s->s_unsafe);
        s->s_num_cap_releases = 0;
        }
        mdsc->sessions[mds] = s;
        atomic_inc(&mdsc->num_sessions);
-       atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
+       refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
  
        ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
                      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
@@@ -1004,7 -1005,7 +1005,7 @@@ static void __open_export_target_sessio
        struct ceph_mds_session *ts;
        int i, mds = session->s_mds;
  
-       if (mds >= mdsc->mdsmap->m_max_mds)
+       if (mds >= mdsc->mdsmap->m_num_mds)
                return;
  
        mi = &mdsc->mdsmap->m_info[mds];
@@@ -1551,9 -1552,15 +1552,15 @@@ void ceph_send_cap_releases(struct ceph
        struct ceph_msg *msg = NULL;
        struct ceph_mds_cap_release *head;
        struct ceph_mds_cap_item *item;
+       struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
        struct ceph_cap *cap;
        LIST_HEAD(tmp_list);
        int num_cap_releases;
+       __le32  barrier, *cap_barrier;
+ 
+       down_read(&osdc->lock);
+       barrier = cpu_to_le32(osdc->epoch_barrier);
+       up_read(&osdc->lock);
  
        spin_lock(&session->s_cap_lock);
  again:
                        head = msg->front.iov_base;
                        head->num = cpu_to_le32(0);
                        msg->front.iov_len = sizeof(*head);
+                       msg->hdr.version = cpu_to_le16(2);
+                       msg->hdr.compat_version = cpu_to_le16(1);
                }
                cap = list_first_entry(&tmp_list, struct ceph_cap,
                                        session_caps);
                list_del(&cap->session_caps);
                ceph_put_cap(mdsc, cap);
  
                if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
+                       // Append cap_barrier field
+                       cap_barrier = msg->front.iov_base + msg->front.iov_len;
+                       *cap_barrier = barrier;
+                       msg->front.iov_len += sizeof(*cap_barrier);
                        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
                        dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
                        ceph_con_send(&session->s_con, msg);
        spin_unlock(&session->s_cap_lock);
  
        if (msg) {
+               // Append cap_barrier field
+               cap_barrier = msg->front.iov_base + msg->front.iov_len;
+               *cap_barrier = barrier;
+               msg->front.iov_len += sizeof(*cap_barrier);
                msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
                dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
                ceph_con_send(&session->s_con, msg);
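
Appending the barrier is a plain fixed-width append to the message
front, bumping front.iov_len before front_len is finalized. A tiny
standalone C model of that append (illustrative):

    #include <stdint.h>
    #include <stdio.h>
    #include <string.h>

    /* append a (host-order, for the model) u32 barrier to a payload
     * and return the new length */
    static size_t append_barrier(uint8_t *buf, size_t len, uint32_t b)
    {
            memcpy(buf + len, &b, sizeof(b));
            return len + sizeof(b);
    }

    int main(void)
    {
            uint8_t msg[64] = { 0 };
            size_t len = 8;         /* pretend header-sized payload */
            len = append_barrier(msg, len, 42);
            printf("new front len = %zu\n", len);
            return 0;
    }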
@@@ -1666,7 -1687,6 +1687,7 @@@ struct ceph_mds_request 
  ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
  {
        struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
 +      struct timespec ts;
  
        if (!req)
                return ERR_PTR(-ENOMEM);
        init_completion(&req->r_safe_completion);
        INIT_LIST_HEAD(&req->r_unsafe_item);
  
 -      req->r_stamp = current_fs_time(mdsc->fsc->sb);
 +      ktime_get_real_ts(&ts);
 +      req->r_stamp = timespec_trunc(ts, mdsc->fsc->sb->s_time_gran);
  
        req->r_op = op;
        req->r_direct_mode = mode;
@@@ -1993,7 -2012,7 +2014,7 @@@ static struct ceph_msg *create_request_
  
        if (req->r_pagelist) {
                struct ceph_pagelist *pagelist = req->r_pagelist;
-               atomic_inc(&pagelist->refcnt);
+               refcount_inc(&pagelist->refcnt);
                ceph_msg_data_add_pagelist(msg, pagelist);
                msg->hdr.data_len = cpu_to_le32(pagelist->length);
        } else {
@@@ -2640,8 -2659,10 +2661,10 @@@ static void handle_session(struct ceph_
        seq = le64_to_cpu(h->seq);
  
        mutex_lock(&mdsc->mutex);
-       if (op == CEPH_SESSION_CLOSE)
+       if (op == CEPH_SESSION_CLOSE) {
+               get_session(session);
                __unregister_session(mdsc, session);
+       }
        /* FIXME: this ttl calculation is generous */
        session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
        mutex_unlock(&mdsc->mutex);
                        kick_requests(mdsc, mds);
                mutex_unlock(&mdsc->mutex);
        }
+       if (op == CEPH_SESSION_CLOSE)
+               ceph_put_mds_session(session);
        return;
  
  bad:
@@@ -3109,7 -3132,7 +3134,7 @@@ static void check_new_map(struct ceph_m
        dout("check_new_map new %u old %u\n",
             newmap->m_epoch, oldmap->m_epoch);
  
-       for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
+       for (i = 0; i < oldmap->m_num_mds && i < mdsc->max_sessions; i++) {
                if (mdsc->sessions[i] == NULL)
                        continue;
                s = mdsc->sessions[i];
                     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
                     ceph_session_state_name(s->s_state));
  
-               if (i >= newmap->m_max_mds ||
+               if (i >= newmap->m_num_mds ||
                    memcmp(ceph_mdsmap_get_addr(oldmap, i),
                           ceph_mdsmap_get_addr(newmap, i),
                           sizeof(struct ceph_entity_addr))) {
                        if (s->s_state == CEPH_MDS_SESSION_OPENING) {
                                /* the session never opened, just close it
                                 * out now */
+                               get_session(s);
+                               __unregister_session(mdsc, s);
                                __wake_requests(mdsc, &s->s_waiting);
+                               ceph_put_mds_session(s);
+                       } else if (i >= newmap->m_num_mds) {
+                               /* force close session for stopped mds */
+                               get_session(s);
                                __unregister_session(mdsc, s);
+                               __wake_requests(mdsc, &s->s_waiting);
+                               kick_requests(mdsc, i);
+                               mutex_unlock(&mdsc->mutex);
+                               mutex_lock(&s->s_mutex);
+                               cleanup_session_requests(mdsc, s);
+                               remove_session_caps(s);
+                               mutex_unlock(&s->s_mutex);
+                               ceph_put_mds_session(s);
+                               mutex_lock(&mdsc->mutex);
                        } else {
                                /* just close it */
                                mutex_unlock(&mdsc->mutex);
                }
        }
  
-       for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
+       for (i = 0; i < newmap->m_num_mds && i < mdsc->max_sessions; i++) {
                s = mdsc->sessions[i];
                if (!s)
                        continue;
@@@ -3883,7 -3924,7 +3926,7 @@@ static struct ceph_connection *con_get(
        struct ceph_mds_session *s = con->private;
  
        if (get_session(s)) {
-               dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
+               dout("mdsc con_get %p ok (%d)\n", s, refcount_read(&s->s_ref));
                return con;
        }
        dout("mdsc con_get %p FAIL\n", s);
@@@ -3894,7 -3935,7 +3937,7 @@@ static void con_put(struct ceph_connect
  {
        struct ceph_mds_session *s = con->private;
  
-       dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1);
+       dout("mdsc con_put %p (%d)\n", s, refcount_read(&s->s_ref) - 1);
        ceph_put_mds_session(s);
  }
  
diff --combined fs/ceph/super.c
index a8c81b2052ca9052f67cd217785f1c77a510d788,8b9f645006da3f7fcfeb9308e0bf8b833484c3b1..8d7918ce694a9c0d980641ab66b7be6d0aaa33a5
@@@ -544,10 -544,6 +544,6 @@@ static struct ceph_fs_client *create_fs
                                        struct ceph_options *opt)
  {
        struct ceph_fs_client *fsc;
-       const u64 supported_features =
-               CEPH_FEATURE_FLOCK | CEPH_FEATURE_DIRLAYOUTHASH |
-               CEPH_FEATURE_MDSENC | CEPH_FEATURE_MDS_INLINE_DATA;
-       const u64 required_features = 0;
        int page_count;
        size_t size;
        int err = -ENOMEM;
        if (!fsc)
                return ERR_PTR(-ENOMEM);
  
-       fsc->client = ceph_create_client(opt, fsc, supported_features,
-                                        required_features);
+       fsc->client = ceph_create_client(opt, fsc);
        if (IS_ERR(fsc->client)) {
                err = PTR_ERR(fsc->client);
                goto fail;
  
        atomic_long_set(&fsc->writeback_count, 0);
  
 -      err = bdi_init(&fsc->backing_dev_info);
 -      if (err < 0)
 -              goto fail_client;
 -
        err = -ENOMEM;
        /*
         * The number of concurrent works can be high but they don't need
         */
        fsc->wb_wq = alloc_workqueue("ceph-writeback", 0, 1);
        if (fsc->wb_wq == NULL)
 -              goto fail_bdi;
 +              goto fail_client;
        fsc->pg_inv_wq = alloc_workqueue("ceph-pg-invalid", 0, 1);
        if (fsc->pg_inv_wq == NULL)
                goto fail_wb_wq;
@@@ -620,6 -619,8 +615,6 @@@ fail_pg_inv_wq
        destroy_workqueue(fsc->pg_inv_wq);
  fail_wb_wq:
        destroy_workqueue(fsc->wb_wq);
 -fail_bdi:
 -      bdi_destroy(&fsc->backing_dev_info);
  fail_client:
        ceph_destroy_client(fsc->client);
  fail:
@@@ -637,6 -638,8 +632,6 @@@ static void destroy_fs_client(struct ce
        destroy_workqueue(fsc->pg_inv_wq);
        destroy_workqueue(fsc->trunc_wq);
  
 -      bdi_destroy(&fsc->backing_dev_info);
 -
        mempool_destroy(fsc->wb_pagevec_pool);
  
        destroy_mount_options(fsc->mount_options);
@@@ -929,32 -932,33 +924,32 @@@ static int ceph_compare_super(struct su
   */
  static atomic_long_t bdi_seq = ATOMIC_LONG_INIT(0);
  
 -static int ceph_register_bdi(struct super_block *sb,
 -                           struct ceph_fs_client *fsc)
 +static int ceph_setup_bdi(struct super_block *sb, struct ceph_fs_client *fsc)
  {
        int err;
  
 +      err = super_setup_bdi_name(sb, "ceph-%ld",
 +                                 atomic_long_inc_return(&bdi_seq));
 +      if (err)
 +              return err;
 +
        /* set ra_pages based on rasize mount option? */
        if (fsc->mount_options->rasize >= PAGE_SIZE)
 -              fsc->backing_dev_info.ra_pages =
 +              sb->s_bdi->ra_pages =
                        (fsc->mount_options->rasize + PAGE_SIZE - 1)
                        >> PAGE_SHIFT;
        else
 -              fsc->backing_dev_info.ra_pages =
 -                      VM_MAX_READAHEAD * 1024 / PAGE_SIZE;
 +              sb->s_bdi->ra_pages = VM_MAX_READAHEAD * 1024 / PAGE_SIZE;
  
        if (fsc->mount_options->rsize > fsc->mount_options->rasize &&
            fsc->mount_options->rsize >= PAGE_SIZE)
 -              fsc->backing_dev_info.io_pages =
 +              sb->s_bdi->io_pages =
                        (fsc->mount_options->rsize + PAGE_SIZE - 1)
                        >> PAGE_SHIFT;
        else if (fsc->mount_options->rsize == 0)
 -              fsc->backing_dev_info.io_pages = ULONG_MAX;
 +              sb->s_bdi->io_pages = ULONG_MAX;
  
 -      err = bdi_register(&fsc->backing_dev_info, NULL, "ceph-%ld",
 -                         atomic_long_inc_return(&bdi_seq));
 -      if (!err)
 -              sb->s_bdi = &fsc->backing_dev_info;
 -      return err;
 +      return 0;
  }
  
  static struct dentry *ceph_mount(struct file_system_type *fs_type,
                dout("get_sb got existing client %p\n", fsc);
        } else {
                dout("get_sb using new client %p\n", fsc);
 -              err = ceph_register_bdi(sb, fsc);
 +              err = ceph_setup_bdi(sb, fsc);
                if (err < 0) {
                        res = ERR_PTR(err);
                        goto out_splat;
diff --combined fs/ceph/super.h
index 176186b124575225537afe7049686041d3148c26,7334ee86b9e81c4d6dbf57fc9e458f6e94020cb9..a973acd8beaff67b5b0e67b1217c9bb7419c9982
@@@ -14,6 -14,7 +14,7 @@@
  #include <linux/writeback.h>
  #include <linux/slab.h>
  #include <linux/posix_acl.h>
+ #include <linux/refcount.h>
  
  #include <linux/ceph/libceph.h>
  
@@@ -92,6 -93,8 +93,6 @@@ struct ceph_fs_client 
        struct workqueue_struct *trunc_wq;
        atomic_long_t writeback_count;
  
 -      struct backing_dev_info backing_dev_info;
 -
  #ifdef CONFIG_DEBUG_FS
        struct dentry *debugfs_dentry_lru, *debugfs_caps;
        struct dentry *debugfs_congestion_kb;
@@@ -160,7 -163,7 +161,7 @@@ struct ceph_cap_flush 
   * data before flushing the snapped state (tracked here) back to the MDS.
   */
  struct ceph_cap_snap {
-       atomic_t nref;
+       refcount_t nref;
        struct list_head ci_item;
  
        struct ceph_cap_flush cap_flush;
  
  static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap)
  {
-       if (atomic_dec_and_test(&capsnap->nref)) {
+       if (refcount_dec_and_test(&capsnap->nref)) {
                if (capsnap->xattr_blob)
                        ceph_buffer_put(capsnap->xattr_blob);
                kfree(capsnap);
@@@ -471,6 -474,32 +472,32 @@@ static inline struct inode *ceph_find_i
  #define CEPH_I_CAP_DROPPED    (1 << 8)  /* caps were forcibly dropped */
  #define CEPH_I_KICK_FLUSH     (1 << 9)  /* kick flushing caps */
  #define CEPH_I_FLUSH_SNAPS    (1 << 10) /* need flush snapss */
+ #define CEPH_I_ERROR_WRITE    (1 << 11) /* have seen write errors */
+ 
+ /*
+  * We set the ERROR_WRITE bit when we start seeing write errors on an inode
+  * and then clear it when they start succeeding. Note that we do a lockless
+  * check first, and only take the lock if it looks like it needs to be changed.
+  * The write submission code just takes this as a hint, so we're not too
+  * worried if a few slip through in either direction.
+  */
+ static inline void ceph_set_error_write(struct ceph_inode_info *ci)
+ {
+       if (!(READ_ONCE(ci->i_ceph_flags) & CEPH_I_ERROR_WRITE)) {
+               spin_lock(&ci->i_ceph_lock);
+               ci->i_ceph_flags |= CEPH_I_ERROR_WRITE;
+               spin_unlock(&ci->i_ceph_lock);
+       }
+ }
+ 
+ static inline void ceph_clear_error_write(struct ceph_inode_info *ci)
+ {
+       if (READ_ONCE(ci->i_ceph_flags) & CEPH_I_ERROR_WRITE) {
+               spin_lock(&ci->i_ceph_lock);
+               ci->i_ceph_flags &= ~CEPH_I_ERROR_WRITE;
+               spin_unlock(&ci->i_ceph_lock);
+       }
+ }
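
These helpers use an unlocked READ_ONCE() test to skip the spinlock in
the common case, taking the lock only when the bit actually needs to
change. A userspace analogue of the same check-then-lock hint pattern
with C11 atomics (illustrative):

    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdio.h>

    #define ERROR_WRITE (1 << 11)

    static atomic_int flags;        /* analogue of ci->i_ceph_flags */
    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

    static void set_error_write(void)
    {
            /* cheap unlocked read first; a stray miss either way is
             * fine because the flag is only a hint */
            if (!(atomic_load_explicit(&flags,
                                       memory_order_relaxed) & ERROR_WRITE)) {
                    pthread_mutex_lock(&lock);
                    atomic_fetch_or(&flags, ERROR_WRITE);
                    pthread_mutex_unlock(&lock);
            }
    }

    int main(void)
    {
            set_error_write();
            printf("flags=%#x\n", atomic_load(&flags));
            return 0;
    }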
  
  static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
                                           long long release_count,
diff --combined net/ceph/ceph_common.c
index 4eb773ccce110aad98051636ed28c1b61121df04,6eeeb67bed84f75e15d9c78001a99b66972c682b..4fd02831beed20fb3d98475c7da2a0b4f2818513
@@@ -45,6 -45,17 +45,17 @@@ bool libceph_compatible(void *data
  }
  EXPORT_SYMBOL(libceph_compatible);
  
+ static int param_get_supported_features(char *buffer,
+                                       const struct kernel_param *kp)
+ {
+       return sprintf(buffer, "0x%llx", CEPH_FEATURES_SUPPORTED_DEFAULT);
+ }
+ static const struct kernel_param_ops param_ops_supported_features = {
+       .get = param_get_supported_features,
+ };
+ module_param_cb(supported_features, &param_ops_supported_features, NULL,
+               S_IRUGO);
+ 
  /*
   * find filename portion of a path (/foo/bar/baz -> baz)
   */
@@@ -187,7 -198,7 +198,7 @@@ void *ceph_kvmalloc(size_t size, gfp_t 
                        return ptr;
        }
  
 -      return __vmalloc(size, flags | __GFP_HIGHMEM, PAGE_KERNEL);
 +      return __vmalloc(size, flags, PAGE_KERNEL);
  }
  
  
@@@ -596,9 -607,7 +607,7 @@@ EXPORT_SYMBOL(ceph_client_gid)
  /*
   * create a fresh client instance
   */
- struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
-                                      u64 supported_features,
-                                      u64 required_features)
+ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private)
  {
        struct ceph_client *client;
        struct ceph_entity_addr *myaddr = NULL;
        init_waitqueue_head(&client->auth_wq);
        client->auth_err = 0;
  
-       if (!ceph_test_opt(client, NOMSGAUTH))
-               required_features |= CEPH_FEATURE_MSG_AUTH;
        client->extra_mon_dispatch = NULL;
-       client->supported_features = CEPH_FEATURES_SUPPORTED_DEFAULT |
-               supported_features;
-       client->required_features = CEPH_FEATURES_REQUIRED_DEFAULT |
-               required_features;
+       client->supported_features = CEPH_FEATURES_SUPPORTED_DEFAULT;
+       client->required_features = CEPH_FEATURES_REQUIRED_DEFAULT;
+       if (!ceph_test_opt(client, NOMSGAUTH))
+               client->required_features |= CEPH_FEATURE_MSG_AUTH;
  
        /* msgr */
        if (ceph_test_opt(client, MYIP))
diff --combined net/ceph/osd_client.c
index 242d7c0d92f8c34084ec5f3201c7529c55c681a9,9b8f57fd5ee18a69cbc1a46ae11ac9b7717fa453..924f07c36ddbc2864c7428723766d0a64729c7c4
@@@ -961,6 -961,7 +961,7 @@@ struct ceph_osd_request *ceph_osdc_new_
                                       truncate_size, truncate_seq);
        }
  
+       req->r_abort_on_full = true;
        req->r_flags = flags;
        req->r_base_oloc.pool = layout->pool_id;
        req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
@@@ -1005,7 -1006,7 +1006,7 @@@ static bool osd_registered(struct ceph_
   */
  static void osd_init(struct ceph_osd *osd)
  {
-       atomic_set(&osd->o_ref, 1);
+       refcount_set(&osd->o_ref, 1);
        RB_CLEAR_NODE(&osd->o_node);
        osd->o_requests = RB_ROOT;
        osd->o_linger_requests = RB_ROOT;
@@@ -1050,9 -1051,9 +1051,9 @@@ static struct ceph_osd *create_osd(stru
  
  static struct ceph_osd *get_osd(struct ceph_osd *osd)
  {
-       if (atomic_inc_not_zero(&osd->o_ref)) {
-               dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
-                    atomic_read(&osd->o_ref));
+       if (refcount_inc_not_zero(&osd->o_ref)) {
+               dout("get_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref)-1,
+                    refcount_read(&osd->o_ref));
                return osd;
        } else {
                dout("get_osd %p FAIL\n", osd);
  
  static void put_osd(struct ceph_osd *osd)
  {
-       dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
-            atomic_read(&osd->o_ref) - 1);
-       if (atomic_dec_and_test(&osd->o_ref)) {
+       dout("put_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref),
+            refcount_read(&osd->o_ref) - 1);
+       if (refcount_dec_and_test(&osd->o_ref)) {
                osd_cleanup(osd);
                kfree(osd);
        }
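
The o_ref conversion is mechanical: atomic_set, atomic_inc_not_zero, atomic_dec_and_test and atomic_read map one-to-one onto their refcount_* counterparts. What it buys is that refcount_t saturates instead of wrapping and WARNs on increment-from-zero and on underflow. A minimal sketch of the resulting get/put shape; my_obj is a hypothetical type:

    #include <linux/refcount.h>
    #include <linux/slab.h>

    struct my_obj {
            refcount_t ref;
    };

    static struct my_obj *my_obj_get(struct my_obj *obj)
    {
            /* fails (returns NULL) once the count has dropped to zero */
            return refcount_inc_not_zero(&obj->ref) ? obj : NULL;
    }

    static void my_obj_put(struct my_obj *obj)
    {
            if (refcount_dec_and_test(&obj->ref))   /* true on the final put */
                    kfree(obj);
    }
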
@@@ -1297,8 -1298,9 +1298,9 @@@ static bool target_should_be_paused(str
                       __pool_full(pi);
  
        WARN_ON(pi->id != t->base_oloc.pool);
-       return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
-              (t->flags & CEPH_OSD_FLAG_WRITE && pausewr);
+       return ((t->flags & CEPH_OSD_FLAG_READ) && pauserd) ||
+              ((t->flags & CEPH_OSD_FLAG_WRITE) && pausewr) ||
+              (osdc->osdmap->epoch < osdc->epoch_barrier);
  }
  
  enum calc_target_result {
@@@ -1503,9 -1505,10 +1505,10 @@@ static void encode_request(struct ceph_
        ceph_encode_32(&p, req->r_flags);
        ceph_encode_timespec(p, &req->r_mtime);
        p += sizeof(struct ceph_timespec);
-       /* aka reassert_version */
-       memcpy(p, &req->r_replay_version, sizeof(req->r_replay_version));
-       p += sizeof(req->r_replay_version);
+       /* reassert_version */
+       memset(p, 0, sizeof(struct ceph_eversion));
+       p += sizeof(struct ceph_eversion);
  
        /* oloc */
        ceph_start_encoding(&p, 5, 4,
@@@ -1626,6 -1629,7 +1629,7 @@@ static void maybe_request_map(struct ce
                ceph_monc_renew_subs(&osdc->client->monc);
  }
  
+ static void complete_request(struct ceph_osd_request *req, int err);
  static void send_map_check(struct ceph_osd_request *req);
  
  static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
        enum calc_target_result ct_res;
        bool need_send = false;
        bool promoted = false;
+       bool need_abort = false;
  
        WARN_ON(req->r_tid);
        dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
@@@ -1650,8 -1655,13 +1655,13 @@@ again
                goto promote;
        }
  
-       if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
-           ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
+       if (osdc->osdmap->epoch < osdc->epoch_barrier) {
+               dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch,
+                    osdc->epoch_barrier);
+               req->r_t.paused = true;
+               maybe_request_map(osdc);
+       } else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
+                  ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
                dout("req %p pausewr\n", req);
                req->r_t.paused = true;
                maybe_request_map(osdc);
                pr_warn_ratelimited("FULL or reached pool quota\n");
                req->r_t.paused = true;
                maybe_request_map(osdc);
+               if (req->r_abort_on_full)
+                       need_abort = true;
        } else if (!osd_homeless(osd)) {
                need_send = true;
        } else {
        link_request(osd, req);
        if (need_send)
                send_request(req);
+       else if (need_abort)
+               complete_request(req, -ENOSPC);
        mutex_unlock(&osd->lock);
  
        if (ct_res == CALC_TARGET_POOL_DNE)
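
Taken together, __submit_request() now has four outcomes: pause while the osdmap is behind the epoch barrier, pause on PAUSEWR for writes, pause on a full cluster or pool (or complete the request with -ENOSPC right away when r_abort_on_full is set), or send. ceph_osdc_new_request() sets r_abort_on_full for fs-originated requests, per the first hunk in this file. A hedged sketch of the fail-fast path from a submitter's point of view, setup and teardown elided:

    req->r_abort_on_full = true;    /* -ENOSPC instead of blocking on full */
    ceph_osdc_start_request(osdc, req, false);
    ret = ceph_osdc_wait_request(osdc, req);
    if (ret == -ENOSPC)
            return ret;     /* cluster or pool full; surface it to the caller */
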
@@@ -1799,6 -1813,97 +1813,97 @@@ static void abort_request(struct ceph_o
        complete_request(req, err);
  }
  
+ static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
+ {
+       if (likely(eb > osdc->epoch_barrier)) {
+               dout("updating epoch_barrier from %u to %u\n",
+                               osdc->epoch_barrier, eb);
+               osdc->epoch_barrier = eb;
+               /* Request map if we're not at the barrier yet */
+               if (eb > osdc->osdmap->epoch)
+                       maybe_request_map(osdc);
+       }
+ }
+
+ void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
+ {
+       down_read(&osdc->lock);
+       if (unlikely(eb > osdc->epoch_barrier)) {
+               up_read(&osdc->lock);
+               down_write(&osdc->lock);
+               update_epoch_barrier(osdc, eb);
+               up_write(&osdc->lock);
+       } else {
+               up_read(&osdc->lock);
+       }
+ }
+ EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier);
+
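
ceph_osdc_update_epoch_barrier() is an optimistic lock upgrade: the common case (barrier already current) takes osdc->lock only for read, and since an rwsem cannot be upgraded atomically, the eb > osdc->epoch_barrier test is repeated inside update_epoch_barrier() once the write lock is held, covering the window in which another thread may have raised the barrier first. The same idiom in isolation; maybe_raise() and its arguments are hypothetical:

    #include <linux/rwsem.h>

    /* Raise *value to at least new_val; cheap when no update is needed. */
    static void maybe_raise(struct rw_semaphore *sem, u32 *value, u32 new_val)
    {
            down_read(sem);
            if (new_val > *value) {
                    up_read(sem);
                    down_write(sem);
                    if (new_val > *value)   /* re-test: the lock was dropped */
                            *value = new_val;
                    up_write(sem);
            } else {
                    up_read(sem);
            }
    }
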
+ /*
+  * Drop all pending requests that are stalled waiting on a full condition to
+  * clear, and complete them with ENOSPC as the return code. Set the
+  * osdc->epoch_barrier to the latest map epoch that we've seen if any were
+  * cancelled.
+  */
+ static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc)
+ {
+       struct rb_node *n;
+       bool victims = false;
+
+       dout("enter abort_on_full\n");
+
+       if (!ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) && !have_pool_full(osdc))
+               goto out;
+
+       /* Scan list and see if there is anything to abort */
+       for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
+               struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
+               struct rb_node *m;
+
+               m = rb_first(&osd->o_requests);
+               while (m) {
+                       struct ceph_osd_request *req = rb_entry(m,
+                                       struct ceph_osd_request, r_node);
+                       m = rb_next(m);
+                       if (req->r_abort_on_full) {
+                               victims = true;
+                               break;
+                       }
+               }
+               if (victims)
+                       break;
+       }
+
+       if (!victims)
+               goto out;
+
+       /*
+        * Update the barrier to current epoch if it's behind that point,
+        * since we know we have some calls to be aborted in the tree.
+        */
+       update_epoch_barrier(osdc, osdc->osdmap->epoch);
+
+       for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
+               struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
+               struct rb_node *m;
+
+               m = rb_first(&osd->o_requests);
+               while (m) {
+                       struct ceph_osd_request *req = rb_entry(m,
+                                       struct ceph_osd_request, r_node);
+                       m = rb_next(m);
+                       if (req->r_abort_on_full &&
+                           (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
+                            pool_full(osdc, req->r_t.target_oloc.pool)))
+                               abort_request(req, -ENOSPC);
+               }
+       }
+
+ out:
+       dout("return abort_on_full barrier=%u\n", osdc->epoch_barrier);
+ }
+
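
One detail of the abort scan worth noting: in both passes rb_next() is taken before the current request can go away, and in the second pass that matters because abort_request() completes the request, which unlinks it from the osd's o_requests tree and would otherwise leave the cursor dangling. A generic sketch of erase-safe rbtree traversal; my_item and its doomed field are hypothetical:

    #include <linux/rbtree.h>

    struct my_item {
            struct rb_node node;
            bool doomed;
    };

    static void prune(struct rb_root *root)
    {
            struct rb_node *n = rb_first(root);

            while (n) {
                    struct my_item *it = rb_entry(n, struct my_item, node);

                    n = rb_next(n); /* advance before 'it' can be erased */
                    if (it->doomed)
                            rb_erase(&it->node, root);
            }
    }
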
  static void check_pool_dne(struct ceph_osd_request *req)
  {
        struct ceph_osd_client *osdc = req->r_osdc;
@@@ -3252,11 -3357,13 +3357,13 @@@ done
        pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
                  ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
                  have_pool_full(osdc);
-       if (was_pauserd || was_pausewr || pauserd || pausewr)
+       if (was_pauserd || was_pausewr || pauserd || pausewr ||
+           osdc->osdmap->epoch < osdc->epoch_barrier)
                maybe_request_map(osdc);
  
        kick_requests(osdc, &need_resend, &need_resend_linger);
  
+       ceph_osdc_abort_on_full(osdc);
        ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
                          osdc->osdmap->epoch);
        up_write(&osdc->lock);
@@@ -3574,7 -3681,7 +3681,7 @@@ ceph_osdc_watch(struct ceph_osd_client 
        ceph_oid_copy(&lreq->t.base_oid, oid);
        ceph_oloc_copy(&lreq->t.base_oloc, oloc);
        lreq->t.flags = CEPH_OSD_FLAG_WRITE;
 -      lreq->mtime = CURRENT_TIME;
 +      ktime_get_real_ts(&lreq->mtime);
  
        lreq->reg_req = alloc_linger_request(lreq);
        if (!lreq->reg_req) {
@@@ -3632,7 -3739,7 +3739,7 @@@ int ceph_osdc_unwatch(struct ceph_osd_c
        ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
        ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
        req->r_flags = CEPH_OSD_FLAG_WRITE;
 -      req->r_mtime = CURRENT_TIME;
 +      ktime_get_real_ts(&req->r_mtime);
        osd_req_op_watch_init(req, 0, lreq->linger_id,
                              CEPH_OSD_WATCH_OP_UNWATCH);
  
@@@ -4126,7 -4233,7 +4233,7 @@@ void ceph_osdc_stop(struct ceph_osd_cli
                close_osd(osd);
        }
        up_write(&osdc->lock);
-       WARN_ON(atomic_read(&osdc->homeless_osd.o_ref) != 1);
+       WARN_ON(refcount_read(&osdc->homeless_osd.o_ref) != 1);
        osd_cleanup(&osdc->homeless_osd);
  
        WARN_ON(!list_empty(&osdc->osd_lru));