Merge git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
authorLinus Torvalds <torvalds@linux-foundation.org>
Wed, 30 May 2012 18:17:19 +0000 (11:17 -0700)
committerLinus Torvalds <torvalds@linux-foundation.org>
Wed, 30 May 2012 18:17:19 +0000 (11:17 -0700)
Pull ceph updates from Sage Weil:
 "There are some updates and cleanups to the CRUSH placement code, a bug
  fix with incremental maps, several cleanups and fixes from Josh Durgin
  in the RBD block device code, a series of cleanups and bug fixes from
  Alex Elder in the messenger code, and some miscellaneous bounds
  checking and gfp cleanups/fixes."

Fix up trivial conflicts in net/ceph/{messenger.c,osdmap.c} due to the
networking people preferring "unsigned int" over just "unsigned".

* git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (45 commits)
  libceph: fix pg_temp updates
  libceph: avoid unregistering osd request when not registered
  ceph: add auth buf in prepare_write_connect()
  ceph: rename prepare_connect_authorizer()
  ceph: return pointer from prepare_connect_authorizer()
  ceph: use info returned by get_authorizer
  ceph: have get_authorizer methods return pointers
  ceph: ensure auth ops are defined before use
  ceph: messenger: reduce args to create_authorizer
  ceph: define ceph_auth_handshake type
  ceph: messenger: check return from get_authorizer
  ceph: messenger: rework prepare_connect_authorizer()
  ceph: messenger: check prepare_write_connect() result
  ceph: don't set WRITE_PENDING too early
  ceph: drop msgr argument from prepare_write_connect()
  ceph: messenger: send banner in process_connect()
  ceph: messenger: reset connection kvec caller
  libceph: don't reset kvec in prepare_write_banner()
  ceph: ignore preferred_osd field
  ceph: fully initialize new layout
  ...

23 files changed:
Documentation/ABI/testing/sysfs-bus-rbd
drivers/block/rbd.c
fs/ceph/file.c
fs/ceph/ioctl.c
fs/ceph/ioctl.h
fs/ceph/mds_client.c
fs/ceph/mds_client.h
fs/ceph/xattr.c
include/linux/ceph/auth.h
include/linux/ceph/ceph_fs.h
include/linux/ceph/decode.h
include/linux/ceph/messenger.h
include/linux/ceph/osd_client.h
include/linux/ceph/osdmap.h
include/linux/crush/crush.h
include/linux/crush/mapper.h
net/ceph/auth_none.c
net/ceph/auth_x.c
net/ceph/crush/crush.c
net/ceph/crush/mapper.c
net/ceph/messenger.c
net/ceph/osd_client.c
net/ceph/osdmap.c

index dbedafb095e24d3d3a8e2d93b6cbd727268d1754..bcd88eb7ebcd240abcdbe0ef34f4a517f0f404bd 100644 (file)
@@ -65,11 +65,11 @@ snap_*
 Entries under /sys/bus/rbd/devices/<dev-id>/snap_<snap-name>
 -------------------------------------------------------------
 
-id
+snap_id
 
        The rados internal snapshot id assigned for this snapshot
 
-size
+snap_size
 
        The size of the image when this snapshot was taken.
 
index 013c7a549fb6dbc3d5d1afe2e01730e951846507..65665c9c42c62ba5805324df96292e560f16b04a 100644 (file)
@@ -141,7 +141,7 @@ struct rbd_request {
 struct rbd_snap {
        struct  device          dev;
        const char              *name;
-       size_t                  size;
+       u64                     size;
        struct list_head        node;
        u64                     id;
 };
@@ -175,8 +175,7 @@ struct rbd_device {
        /* protects updating the header */
        struct rw_semaphore     header_rwsem;
        char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
-       u32 cur_snap;   /* index+1 of current snapshot within snap context
-                          0 - for the head */
+       u64                     snap_id;        /* current snapshot id */
        int read_only;
 
        struct list_head        node;
@@ -241,7 +240,7 @@ static void rbd_put_dev(struct rbd_device *rbd_dev)
        put_device(&rbd_dev->dev);
 }
 
-static int __rbd_update_snaps(struct rbd_device *rbd_dev);
+static int __rbd_refresh_header(struct rbd_device *rbd_dev);
 
 static int rbd_open(struct block_device *bdev, fmode_t mode)
 {
@@ -450,7 +449,9 @@ static void rbd_client_release(struct kref *kref)
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
 
        dout("rbd_release_client %p\n", rbdc);
+       spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
+       spin_unlock(&rbd_client_list_lock);
 
        ceph_destroy_client(rbdc->client);
        kfree(rbdc->rbd_opts);
@@ -463,9 +464,7 @@ static void rbd_client_release(struct kref *kref)
  */
 static void rbd_put_client(struct rbd_device *rbd_dev)
 {
-       spin_lock(&rbd_client_list_lock);
        kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
-       spin_unlock(&rbd_client_list_lock);
        rbd_dev->rbd_client = NULL;
 }
 
@@ -487,16 +486,18 @@ static void rbd_coll_release(struct kref *kref)
  */
 static int rbd_header_from_disk(struct rbd_image_header *header,
                                 struct rbd_image_header_ondisk *ondisk,
-                                int allocated_snaps,
+                                u32 allocated_snaps,
                                 gfp_t gfp_flags)
 {
-       int i;
-       u32 snap_count;
+       u32 i, snap_count;
 
        if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
                return -ENXIO;
 
        snap_count = le32_to_cpu(ondisk->snap_count);
+       if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
+                        / sizeof (*ondisk))
+               return -EINVAL;
        header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
                                snap_count * sizeof (*ondisk),
                                gfp_flags);
@@ -506,11 +507,11 @@ static int rbd_header_from_disk(struct rbd_image_header *header,
        header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
        if (snap_count) {
                header->snap_names = kmalloc(header->snap_names_len,
-                                            GFP_KERNEL);
+                                            gfp_flags);
                if (!header->snap_names)
                        goto err_snapc;
                header->snap_sizes = kmalloc(snap_count * sizeof(u64),
-                                            GFP_KERNEL);
+                                            gfp_flags);
                if (!header->snap_sizes)
                        goto err_names;
        } else {
@@ -552,21 +553,6 @@ err_snapc:
        return -ENOMEM;
 }
 
-static int snap_index(struct rbd_image_header *header, int snap_num)
-{
-       return header->total_snaps - snap_num;
-}
-
-static u64 cur_snap_id(struct rbd_device *rbd_dev)
-{
-       struct rbd_image_header *header = &rbd_dev->header;
-
-       if (!rbd_dev->cur_snap)
-               return 0;
-
-       return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
-}
-
 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
                        u64 *seq, u64 *size)
 {
@@ -605,7 +591,7 @@ static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
                        snapc->seq = header->snap_seq;
                else
                        snapc->seq = 0;
-               dev->cur_snap = 0;
+               dev->snap_id = CEPH_NOSNAP;
                dev->read_only = 0;
                if (size)
                        *size = header->image_size;
@@ -613,8 +599,7 @@ static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
                ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
                if (ret < 0)
                        goto done;
-
-               dev->cur_snap = header->total_snaps - ret;
+               dev->snap_id = snapc->seq;
                dev->read_only = 1;
        }
 
@@ -935,7 +920,6 @@ static int rbd_do_request(struct request *rq,
        layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
        layout->fl_stripe_count = cpu_to_le32(1);
        layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
-       layout->fl_pg_preferred = cpu_to_le32(-1);
        layout->fl_pg_pool = cpu_to_le32(dev->poolid);
        ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
                                req, ops);
@@ -1168,7 +1152,7 @@ static int rbd_req_read(struct request *rq,
                         int coll_index)
 {
        return rbd_do_op(rq, rbd_dev, NULL,
-                        (snapid ? snapid : CEPH_NOSNAP),
+                        snapid,
                         CEPH_OSD_OP_READ,
                         CEPH_OSD_FLAG_READ,
                         2,
@@ -1187,7 +1171,7 @@ static int rbd_req_sync_read(struct rbd_device *dev,
                          u64 *ver)
 {
        return rbd_req_sync_op(dev, NULL,
-                              (snapid ? snapid : CEPH_NOSNAP),
+                              snapid,
                               CEPH_OSD_OP_READ,
                               CEPH_OSD_FLAG_READ,
                               NULL,
@@ -1238,7 +1222,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
        dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
                notify_id, (int)opcode);
        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-       rc = __rbd_update_snaps(dev);
+       rc = __rbd_refresh_header(dev);
        mutex_unlock(&ctl_mutex);
        if (rc)
                pr_warning(RBD_DRV_NAME "%d got notification but failed to "
@@ -1521,7 +1505,7 @@ static void rbd_rq_fn(struct request_queue *q)
                                              coll, cur_seg);
                        else
                                rbd_req_read(rq, rbd_dev,
-                                            cur_snap_id(rbd_dev),
+                                            rbd_dev->snap_id,
                                             ofs,
                                             op_size, bio,
                                             coll, cur_seg);
@@ -1592,7 +1576,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
 {
        ssize_t rc;
        struct rbd_image_header_ondisk *dh;
-       int snap_count = 0;
+       u32 snap_count = 0;
        u64 ver;
        size_t len;
 
@@ -1656,7 +1640,7 @@ static int rbd_header_add_snap(struct rbd_device *dev,
        struct ceph_mon_client *monc;
 
        /* we should create a snapshot only if we're pointing at the head */
-       if (dev->cur_snap)
+       if (dev->snap_id != CEPH_NOSNAP)
                return -EINVAL;
 
        monc = &dev->rbd_client->client->monc;
@@ -1683,7 +1667,9 @@ static int rbd_header_add_snap(struct rbd_device *dev,
        if (ret < 0)
                return ret;
 
-       dev->header.snapc->seq =  new_snapid;
+       down_write(&dev->header_rwsem);
+       dev->header.snapc->seq = new_snapid;
+       up_write(&dev->header_rwsem);
 
        return 0;
 bad:
@@ -1703,7 +1689,7 @@ static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
 /*
  * only read the first part of the ondisk header, without the snaps info
  */
-static int __rbd_update_snaps(struct rbd_device *rbd_dev)
+static int __rbd_refresh_header(struct rbd_device *rbd_dev)
 {
        int ret;
        struct rbd_image_header h;
@@ -1890,7 +1876,7 @@ static ssize_t rbd_image_refresh(struct device *dev,
 
        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
 
-       rc = __rbd_update_snaps(rbd_dev);
+       rc = __rbd_refresh_header(rbd_dev);
        if (rc < 0)
                ret = rc;
 
@@ -1949,7 +1935,7 @@ static ssize_t rbd_snap_size_show(struct device *dev,
 {
        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
 
-       return sprintf(buf, "%zd\n", snap->size);
+       return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
 }
 
 static ssize_t rbd_snap_id_show(struct device *dev,
@@ -1958,7 +1944,7 @@ static ssize_t rbd_snap_id_show(struct device *dev,
 {
        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
 
-       return sprintf(buf, "%llu\n", (unsigned long long) snap->id);
+       return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
 }
 
 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
@@ -2173,7 +2159,7 @@ static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
                                         rbd_dev->header.obj_version);
                if (ret == -ERANGE) {
                        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-                       rc = __rbd_update_snaps(rbd_dev);
+                       rc = __rbd_refresh_header(rbd_dev);
                        mutex_unlock(&ctl_mutex);
                        if (rc < 0)
                                return rc;
@@ -2558,7 +2544,7 @@ static ssize_t rbd_snap_add(struct device *dev,
        if (ret < 0)
                goto err_unlock;
 
-       ret = __rbd_update_snaps(rbd_dev);
+       ret = __rbd_refresh_header(rbd_dev);
        if (ret < 0)
                goto err_unlock;
 
index ed72428d9c75c80a6744ccd6a996b83c1a20d333..988d4f302e4880281a2b5e04c9f44dd7870202d2 100644 (file)
@@ -54,7 +54,6 @@ prepare_open_request(struct super_block *sb, int flags, int create_mode)
        req->r_fmode = ceph_flags_to_mode(flags);
        req->r_args.open.flags = cpu_to_le32(flags);
        req->r_args.open.mode = cpu_to_le32(create_mode);
-       req->r_args.open.preferred = cpu_to_le32(-1);
 out:
        return req;
 }
index 790914a598dd5d68b8f40b851c2faff2e790e4af..8e3fb69fbe62e60cd3698c07d9fa8e53d6d2b184 100644 (file)
@@ -26,8 +26,7 @@ static long ceph_ioctl_get_layout(struct file *file, void __user *arg)
                l.stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
                l.object_size = ceph_file_layout_object_size(ci->i_layout);
                l.data_pool = le32_to_cpu(ci->i_layout.fl_pg_pool);
-               l.preferred_osd =
-                       (s32)le32_to_cpu(ci->i_layout.fl_pg_preferred);
+               l.preferred_osd = (s32)-1;
                if (copy_to_user(arg, &l, sizeof(l)))
                        return -EFAULT;
        }
@@ -35,6 +34,32 @@ static long ceph_ioctl_get_layout(struct file *file, void __user *arg)
        return err;
 }
 
+static long __validate_layout(struct ceph_mds_client *mdsc,
+                             struct ceph_ioctl_layout *l)
+{
+       int i, err;
+
+       /* validate striping parameters */
+       if ((l->object_size & ~PAGE_MASK) ||
+           (l->stripe_unit & ~PAGE_MASK) ||
+           ((unsigned)l->object_size % (unsigned)l->stripe_unit))
+               return -EINVAL;
+
+       /* make sure it's a valid data pool */
+       mutex_lock(&mdsc->mutex);
+       err = -EINVAL;
+       for (i = 0; i < mdsc->mdsmap->m_num_data_pg_pools; i++)
+               if (mdsc->mdsmap->m_data_pg_pools[i] == l->data_pool) {
+                       err = 0;
+                       break;
+               }
+       mutex_unlock(&mdsc->mutex);
+       if (err)
+               return err;
+
+       return 0;
+}
+
 static long ceph_ioctl_set_layout(struct file *file, void __user *arg)
 {
        struct inode *inode = file->f_dentry->d_inode;
@@ -44,52 +69,40 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg)
        struct ceph_ioctl_layout l;
        struct ceph_inode_info *ci = ceph_inode(file->f_dentry->d_inode);
        struct ceph_ioctl_layout nl;
-       int err, i;
+       int err;
 
        if (copy_from_user(&l, arg, sizeof(l)))
                return -EFAULT;
 
        /* validate changed params against current layout */
        err = ceph_do_getattr(file->f_dentry->d_inode, CEPH_STAT_CAP_LAYOUT);
-       if (!err) {
-               nl.stripe_unit = ceph_file_layout_su(ci->i_layout);
-               nl.stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
-               nl.object_size = ceph_file_layout_object_size(ci->i_layout);
-               nl.data_pool = le32_to_cpu(ci->i_layout.fl_pg_pool);
-               nl.preferred_osd =
-                               (s32)le32_to_cpu(ci->i_layout.fl_pg_preferred);
-       } else
+       if (err)
                return err;
 
+       memset(&nl, 0, sizeof(nl));
        if (l.stripe_count)
                nl.stripe_count = l.stripe_count;
+       else
+               nl.stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
        if (l.stripe_unit)
                nl.stripe_unit = l.stripe_unit;
+       else
+               nl.stripe_unit = ceph_file_layout_su(ci->i_layout);
        if (l.object_size)
                nl.object_size = l.object_size;
+       else
+               nl.object_size = ceph_file_layout_object_size(ci->i_layout);
        if (l.data_pool)
                nl.data_pool = l.data_pool;
-       if (l.preferred_osd)
-               nl.preferred_osd = l.preferred_osd;
+       else
+               nl.data_pool = ceph_file_layout_pg_pool(ci->i_layout);
 
-       if ((nl.object_size & ~PAGE_MASK) ||
-           (nl.stripe_unit & ~PAGE_MASK) ||
-           ((unsigned)nl.object_size % (unsigned)nl.stripe_unit))
-               return -EINVAL;
+       /* this is obsolete, and always -1 */
+       nl.preferred_osd = le64_to_cpu(-1);
 
-       /* make sure it's a valid data pool */
-       if (l.data_pool > 0) {
-               mutex_lock(&mdsc->mutex);
-               err = -EINVAL;
-               for (i = 0; i < mdsc->mdsmap->m_num_data_pg_pools; i++)
-                       if (mdsc->mdsmap->m_data_pg_pools[i] == l.data_pool) {
-                               err = 0;
-                               break;
-                       }
-               mutex_unlock(&mdsc->mutex);
-               if (err)
-                       return err;
-       }
+       err = __validate_layout(mdsc, &nl);
+       if (err)
+               return err;
 
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETLAYOUT,
                                       USE_AUTH_MDS);
@@ -106,8 +119,6 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg)
        req->r_args.setlayout.layout.fl_object_size =
                cpu_to_le32(l.object_size);
        req->r_args.setlayout.layout.fl_pg_pool = cpu_to_le32(l.data_pool);
-       req->r_args.setlayout.layout.fl_pg_preferred =
-               cpu_to_le32(l.preferred_osd);
 
        parent_inode = ceph_get_dentry_parent_inode(file->f_dentry);
        err = ceph_mdsc_do_request(mdsc, parent_inode, req);
@@ -127,33 +138,16 @@ static long ceph_ioctl_set_layout_policy (struct file *file, void __user *arg)
        struct inode *inode = file->f_dentry->d_inode;
        struct ceph_mds_request *req;
        struct ceph_ioctl_layout l;
-       int err, i;
+       int err;
        struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
 
        /* copy and validate */
        if (copy_from_user(&l, arg, sizeof(l)))
                return -EFAULT;
 
-       if ((l.object_size & ~PAGE_MASK) ||
-           (l.stripe_unit & ~PAGE_MASK) ||
-           !l.stripe_unit ||
-           (l.object_size &&
-               (unsigned)l.object_size % (unsigned)l.stripe_unit))
-               return -EINVAL;
-
-       /* make sure it's a valid data pool */
-       if (l.data_pool > 0) {
-               mutex_lock(&mdsc->mutex);
-               err = -EINVAL;
-               for (i = 0; i < mdsc->mdsmap->m_num_data_pg_pools; i++)
-                       if (mdsc->mdsmap->m_data_pg_pools[i] == l.data_pool) {
-                               err = 0;
-                               break;
-                       }
-               mutex_unlock(&mdsc->mutex);
-               if (err)
-                       return err;
-       }
+       err = __validate_layout(mdsc, &l);
+       if (err)
+               return err;
 
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETDIRLAYOUT,
                                       USE_AUTH_MDS);
@@ -171,8 +165,6 @@ static long ceph_ioctl_set_layout_policy (struct file *file, void __user *arg)
                        cpu_to_le32(l.object_size);
        req->r_args.setlayout.layout.fl_pg_pool =
                        cpu_to_le32(l.data_pool);
-       req->r_args.setlayout.layout.fl_pg_preferred =
-                       cpu_to_le32(l.preferred_osd);
 
        err = ceph_mdsc_do_request(mdsc, inode, req);
        ceph_mdsc_put_request(req);
index be4a604873331dc7c547950650a899b514f56d26..c77028afb1e1e6b52315a73d013a26fb5c9c7f5b 100644 (file)
@@ -34,6 +34,8 @@
 struct ceph_ioctl_layout {
        __u64 stripe_unit, stripe_count, object_size;
        __u64 data_pool;
+
+       /* obsolete.  new values ignored, always return -1 */
        __s64 preferred_osd;
 };
 
index 89971e137aab80454fed8a51a105d7df903b3101..200bc87eceb1cc417a1caa1a73e264cde79dda78 100644 (file)
@@ -334,10 +334,10 @@ void ceph_put_mds_session(struct ceph_mds_session *s)
        dout("mdsc put_session %p %d -> %d\n", s,
             atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
        if (atomic_dec_and_test(&s->s_ref)) {
-               if (s->s_authorizer)
+               if (s->s_auth.authorizer)
                     s->s_mdsc->fsc->client->monc.auth->ops->destroy_authorizer(
                             s->s_mdsc->fsc->client->monc.auth,
-                            s->s_authorizer);
+                            s->s_auth.authorizer);
                kfree(s);
        }
 }
@@ -3395,39 +3395,33 @@ out:
 /*
  * authentication
  */
-static int get_authorizer(struct ceph_connection *con,
-                         void **buf, int *len, int *proto,
-                         void **reply_buf, int *reply_len, int force_new)
+
+/*
+ * Note: returned pointer is the address of a structure that's
+ * managed separately.  Caller must *not* attempt to free it.
+ */
+static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
+                                       int *proto, int force_new)
 {
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;
        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
-       int ret = 0;
-
-       if (force_new && s->s_authorizer) {
-               ac->ops->destroy_authorizer(ac, s->s_authorizer);
-               s->s_authorizer = NULL;
-       }
-       if (s->s_authorizer == NULL) {
-               if (ac->ops->create_authorizer) {
-                       ret = ac->ops->create_authorizer(
-                               ac, CEPH_ENTITY_TYPE_MDS,
-                               &s->s_authorizer,
-                               &s->s_authorizer_buf,
-                               &s->s_authorizer_buf_len,
-                               &s->s_authorizer_reply_buf,
-                               &s->s_authorizer_reply_buf_len);
-                       if (ret)
-                               return ret;
-               }
-       }
+       struct ceph_auth_handshake *auth = &s->s_auth;
 
+       if (force_new && auth->authorizer) {
+               if (ac->ops && ac->ops->destroy_authorizer)
+                       ac->ops->destroy_authorizer(ac, auth->authorizer);
+               auth->authorizer = NULL;
+       }
+       if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) {
+               int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
+                                                       auth);
+               if (ret)
+                       return ERR_PTR(ret);
+       }
        *proto = ac->protocol;
-       *buf = s->s_authorizer_buf;
-       *len = s->s_authorizer_buf_len;
-       *reply_buf = s->s_authorizer_reply_buf;
-       *reply_len = s->s_authorizer_reply_buf_len;
-       return 0;
+
+       return auth;
 }
 
 
@@ -3437,7 +3431,7 @@ static int verify_authorizer_reply(struct ceph_connection *con, int len)
        struct ceph_mds_client *mdsc = s->s_mdsc;
        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
 
-       return ac->ops->verify_authorizer_reply(ac, s->s_authorizer, len);
+       return ac->ops->verify_authorizer_reply(ac, s->s_auth.authorizer, len);
 }
 
 static int invalidate_authorizer(struct ceph_connection *con)
index 8c7c04ebb595a1a8bd2e9c1b177890f8ba234b9a..dd26846dd71de4267146b7deb43c7a5d7b9b976f 100644 (file)
@@ -11,6 +11,7 @@
 #include <linux/ceph/types.h>
 #include <linux/ceph/messenger.h>
 #include <linux/ceph/mdsmap.h>
+#include <linux/ceph/auth.h>
 
 /*
  * Some lock dependencies:
@@ -113,9 +114,7 @@ struct ceph_mds_session {
 
        struct ceph_connection s_con;
 
-       struct ceph_authorizer *s_authorizer;
-       void             *s_authorizer_buf, *s_authorizer_reply_buf;
-       size_t            s_authorizer_buf_len, s_authorizer_reply_buf_len;
+       struct ceph_auth_handshake s_auth;
 
        /* protected by s_gen_ttl_lock */
        spinlock_t        s_gen_ttl_lock;
index 35b86331d8a5ce84c311e9eb2730757f80149179..785cb3057c95a436c344da2d76ee24b86cb7954b 100644 (file)
@@ -118,15 +118,6 @@ static size_t ceph_vxattrcb_file_layout(struct ceph_inode_info *ci, char *val,
                (unsigned long long)ceph_file_layout_su(ci->i_layout),
                (unsigned long long)ceph_file_layout_stripe_count(ci->i_layout),
                (unsigned long long)ceph_file_layout_object_size(ci->i_layout));
-
-       if (ceph_file_layout_pg_preferred(ci->i_layout) >= 0) {
-               val += ret;
-               size -= ret;
-               ret += snprintf(val, size, "preferred_osd=%lld\n",
-                           (unsigned long long)ceph_file_layout_pg_preferred(
-                                   ci->i_layout));
-       }
-
        return ret;
 }
 
index aa13392a7efbf2234add8465540869b75930a25b..d4080f309b5699d5fbaa5dbe96d38a1977be7693 100644 (file)
 struct ceph_auth_client;
 struct ceph_authorizer;
 
+struct ceph_auth_handshake {
+       struct ceph_authorizer *authorizer;
+       void *authorizer_buf;
+       size_t authorizer_buf_len;
+       void *authorizer_reply_buf;
+       size_t authorizer_reply_buf_len;
+};
+
 struct ceph_auth_client_ops {
        const char *name;
 
@@ -43,9 +51,7 @@ struct ceph_auth_client_ops {
         * the response to authenticate the service.
         */
        int (*create_authorizer)(struct ceph_auth_client *ac, int peer_type,
-                                struct ceph_authorizer **a,
-                                void **buf, size_t *len,
-                                void **reply_buf, size_t *reply_len);
+                                struct ceph_auth_handshake *auth);
        int (*verify_authorizer_reply)(struct ceph_auth_client *ac,
                                       struct ceph_authorizer *a, size_t len);
        void (*destroy_authorizer)(struct ceph_auth_client *ac,
index b8c60694b2b0977d4ecdb982d8bcaea5f0ef2673..e81ab30d4896329e29d47bea33d92e5634da6a89 100644 (file)
@@ -65,7 +65,7 @@ struct ceph_file_layout {
        __le32 fl_object_stripe_unit;  /* UNUSED.  for per-object parity, if any */
 
        /* object -> pg layout */
-       __le32 fl_pg_preferred; /* preferred primary for pg (-1 for none) */
+       __le32 fl_unused;       /* unused; used to be preferred primary (-1) */
        __le32 fl_pg_pool;      /* namespace, crush ruleset, rep level */
 } __attribute__ ((packed));
 
@@ -384,7 +384,7 @@ union ceph_mds_request_args {
                __le32 stripe_count;         /* ... */
                __le32 object_size;
                __le32 file_replication;
-               __le32 preferred;
+               __le32 unused;               /* used to be preferred osd */
        } __attribute__ ((packed)) open;
        struct {
                __le32 flags;
index 220ae21e819b1fb2623d19d8cf4f619862f11c42..d8615dee5808d3f55c93a38c6fdb66113f09a691 100644 (file)
@@ -46,9 +46,14 @@ static inline void ceph_decode_copy(void **p, void *pv, size_t n)
 /*
  * bounds check input.
  */
+static inline int ceph_has_room(void **p, void *end, size_t n)
+{
+       return end >= *p && n <= end - *p;
+}
+
 #define ceph_decode_need(p, end, n, bad)               \
        do {                                            \
-               if (unlikely(*(p) + (n) > (end)))       \
+               if (!likely(ceph_has_room(p, end, n)))  \
                        goto bad;                       \
        } while (0)
 
@@ -167,7 +172,7 @@ static inline void ceph_encode_string(void **p, void *end,
 
 #define ceph_encode_need(p, end, n, bad)               \
        do {                                            \
-               if (unlikely(*(p) + (n) > (end)))       \
+               if (!likely(ceph_has_room(p, end, n)))  \
                        goto bad;                       \
        } while (0)
 
index 3bff047f6b0f19d1037e4e7157fc785dd213f4cc..2521a95fa6d98597d1fafe4d4cff23ce9dc0f069 100644 (file)
@@ -25,9 +25,9 @@ struct ceph_connection_operations {
        void (*dispatch) (struct ceph_connection *con, struct ceph_msg *m);
 
        /* authorize an outgoing connection */
-       int (*get_authorizer) (struct ceph_connection *con,
-                              void **buf, int *len, int *proto,
-                              void **reply_buf, int *reply_len, int force_new);
+       struct ceph_auth_handshake *(*get_authorizer) (
+                               struct ceph_connection *con,
+                              int *proto, int force_new);
        int (*verify_authorizer_reply) (struct ceph_connection *con, int len);
        int (*invalidate_authorizer)(struct ceph_connection *con);
 
index 7c05ac202d90650069d4713ac2e13b3be9b86024..cedfb1a8434a11a0ba0b32348a0687e0ff9e7836 100644 (file)
@@ -6,9 +6,10 @@
 #include <linux/mempool.h>
 #include <linux/rbtree.h>
 
-#include "types.h"
-#include "osdmap.h"
-#include "messenger.h"
+#include <linux/ceph/types.h>
+#include <linux/ceph/osdmap.h>
+#include <linux/ceph/messenger.h>
+#include <linux/ceph/auth.h>
 
 /* 
  * Maximum object name size 
@@ -40,9 +41,7 @@ struct ceph_osd {
        struct list_head o_requests;
        struct list_head o_linger_requests;
        struct list_head o_osd_lru;
-       struct ceph_authorizer *o_authorizer;
-       void *o_authorizer_buf, *o_authorizer_reply_buf;
-       size_t o_authorizer_buf_len, o_authorizer_reply_buf_len;
+       struct ceph_auth_handshake o_auth;
        unsigned long lru_ttl;
        int o_marked_for_keepalive;
        struct list_head o_keepalive_item;
index ba4c205cbb016a141495e2e872ee6a3cfd57253a..311ef8d6aa9efc41b89ea1e6af9b07f538a84440 100644 (file)
@@ -65,8 +65,6 @@ struct ceph_osdmap {
 #define ceph_file_layout_cas_hash(l) ((__s32)le32_to_cpu((l).fl_cas_hash))
 #define ceph_file_layout_object_su(l) \
        ((__s32)le32_to_cpu((l).fl_object_stripe_unit))
-#define ceph_file_layout_pg_preferred(l) \
-       ((__s32)le32_to_cpu((l).fl_pg_preferred))
 #define ceph_file_layout_pg_pool(l) \
        ((__s32)le32_to_cpu((l).fl_pg_pool))
 
index 97e435b191f411380bbc546530c725093095231c..7c4750811b966e7d865484c2cf7020199c628164 100644 (file)
@@ -151,16 +151,6 @@ struct crush_map {
        struct crush_bucket **buckets;
        struct crush_rule **rules;
 
-       /*
-        * Parent pointers to identify the parent bucket a device or
-        * bucket in the hierarchy.  If an item appears more than
-        * once, this is the _last_ time it appeared (where buckets
-        * are processed in bucket id order, from -1 on down to
-        * -max_buckets.
-        */
-       __u32 *bucket_parents;
-       __u32 *device_parents;
-
        __s32 max_buckets;
        __u32 max_rules;
        __s32 max_devices;
@@ -168,8 +158,7 @@ struct crush_map {
 
 
 /* crush.c */
-extern int crush_get_bucket_item_weight(struct crush_bucket *b, int pos);
-extern void crush_calc_parents(struct crush_map *map);
+extern int crush_get_bucket_item_weight(const struct crush_bucket *b, int pos);
 extern void crush_destroy_bucket_uniform(struct crush_bucket_uniform *b);
 extern void crush_destroy_bucket_list(struct crush_bucket_list *b);
 extern void crush_destroy_bucket_tree(struct crush_bucket_tree *b);
@@ -177,4 +166,9 @@ extern void crush_destroy_bucket_straw(struct crush_bucket_straw *b);
 extern void crush_destroy_bucket(struct crush_bucket *b);
 extern void crush_destroy(struct crush_map *map);
 
+static inline int crush_calc_tree_node(int i)
+{
+       return ((i+1) << 1)-1;
+}
+
 #endif
index c46b99c18bb0ca772f87c567f4ab46cec54932db..71d79f44a7d0753faeb61a3072efdbacc1aa2371 100644 (file)
 
 #include "crush.h"
 
-extern int crush_find_rule(struct crush_map *map, int pool, int type, int size);
-extern int crush_do_rule(struct crush_map *map,
+extern int crush_find_rule(const struct crush_map *map, int ruleset, int type, int size);
+extern int crush_do_rule(const struct crush_map *map,
                         int ruleno,
                         int x, int *result, int result_max,
-                        int forcefeed,    /* -1 for none */
-                        __u32 *weights);
+                        const __u32 *weights);
 
 #endif
index 214c2bb43d6252056a7a77dcd2d8eb5877c719d1..925ca583c09c8eae2fbaebbd73603e9194132f97 100644 (file)
@@ -59,9 +59,7 @@ static int handle_reply(struct ceph_auth_client *ac, int result,
  */
 static int ceph_auth_none_create_authorizer(
        struct ceph_auth_client *ac, int peer_type,
-       struct ceph_authorizer **a,
-       void **buf, size_t *len,
-       void **reply_buf, size_t *reply_len)
+       struct ceph_auth_handshake *auth)
 {
        struct ceph_auth_none_info *ai = ac->private;
        struct ceph_none_authorizer *au = &ai->au;
@@ -82,11 +80,12 @@ static int ceph_auth_none_create_authorizer(
                dout("built authorizer len %d\n", au->buf_len);
        }
 
-       *a = (struct ceph_authorizer *)au;
-       *buf = au->buf;
-       *len = au->buf_len;
-       *reply_buf = au->reply_buf;
-       *reply_len = sizeof(au->reply_buf);
+       auth->authorizer = (struct ceph_authorizer *) au;
+       auth->authorizer_buf = au->buf;
+       auth->authorizer_buf_len = au->buf_len;
+       auth->authorizer_reply_buf = au->reply_buf;
+       auth->authorizer_reply_buf_len = sizeof (au->reply_buf);
+
        return 0;
 
 bad2:
index 1587dc6010c6276fd7c6e997bb7af6fe08d426be..a16bf14eb027cd4e765320f98b0fb378b5a5e7c4 100644 (file)
@@ -526,9 +526,7 @@ static int ceph_x_handle_reply(struct ceph_auth_client *ac, int result,
 
 static int ceph_x_create_authorizer(
        struct ceph_auth_client *ac, int peer_type,
-       struct ceph_authorizer **a,
-       void **buf, size_t *len,
-       void **reply_buf, size_t *reply_len)
+       struct ceph_auth_handshake *auth)
 {
        struct ceph_x_authorizer *au;
        struct ceph_x_ticket_handler *th;
@@ -548,11 +546,12 @@ static int ceph_x_create_authorizer(
                return ret;
        }
 
-       *a = (struct ceph_authorizer *)au;
-       *buf = au->buf->vec.iov_base;
-       *len = au->buf->vec.iov_len;
-       *reply_buf = au->reply_buf;
-       *reply_len = sizeof(au->reply_buf);
+       auth->authorizer = (struct ceph_authorizer *) au;
+       auth->authorizer_buf = au->buf->vec.iov_base;
+       auth->authorizer_buf_len = au->buf->vec.iov_len;
+       auth->authorizer_reply_buf = au->reply_buf;
+       auth->authorizer_reply_buf_len = sizeof (au->reply_buf);
+
        return 0;
 }
 
index d6ebb13a18a4bc787eb249ad3e4b62ec1c174f97..089613234f032610c05f25a239c1d2053e768b45 100644 (file)
@@ -26,9 +26,9 @@ const char *crush_bucket_alg_name(int alg)
  * @b: bucket pointer
  * @p: item index in bucket
  */
-int crush_get_bucket_item_weight(struct crush_bucket *b, int p)
+int crush_get_bucket_item_weight(const struct crush_bucket *b, int p)
 {
-       if (p >= b->size)
+       if ((__u32)p >= b->size)
                return 0;
 
        switch (b->alg) {
@@ -37,38 +37,13 @@ int crush_get_bucket_item_weight(struct crush_bucket *b, int p)
        case CRUSH_BUCKET_LIST:
                return ((struct crush_bucket_list *)b)->item_weights[p];
        case CRUSH_BUCKET_TREE:
-               if (p & 1)
-                       return ((struct crush_bucket_tree *)b)->node_weights[p];
-               return 0;
+               return ((struct crush_bucket_tree *)b)->node_weights[crush_calc_tree_node(p)];
        case CRUSH_BUCKET_STRAW:
                return ((struct crush_bucket_straw *)b)->item_weights[p];
        }
        return 0;
 }
 
-/**
- * crush_calc_parents - Calculate parent vectors for the given crush map.
- * @map: crush_map pointer
- */
-void crush_calc_parents(struct crush_map *map)
-{
-       int i, b, c;
-
-       for (b = 0; b < map->max_buckets; b++) {
-               if (map->buckets[b] == NULL)
-                       continue;
-               for (i = 0; i < map->buckets[b]->size; i++) {
-                       c = map->buckets[b]->items[i];
-                       BUG_ON(c >= map->max_devices ||
-                              c < -map->max_buckets);
-                       if (c >= 0)
-                               map->device_parents[c] = map->buckets[b]->id;
-                       else
-                               map->bucket_parents[-1-c] = map->buckets[b]->id;
-               }
-       }
-}
-
 void crush_destroy_bucket_uniform(struct crush_bucket_uniform *b)
 {
        kfree(b->h.perm);
@@ -87,6 +62,8 @@ void crush_destroy_bucket_list(struct crush_bucket_list *b)
 
 void crush_destroy_bucket_tree(struct crush_bucket_tree *b)
 {
+       kfree(b->h.perm);
+       kfree(b->h.items);
        kfree(b->node_weights);
        kfree(b);
 }
@@ -124,10 +101,9 @@ void crush_destroy_bucket(struct crush_bucket *b)
  */
 void crush_destroy(struct crush_map *map)
 {
-       int b;
-
        /* buckets */
        if (map->buckets) {
+               __s32 b;
                for (b = 0; b < map->max_buckets; b++) {
                        if (map->buckets[b] == NULL)
                                continue;
@@ -138,13 +114,12 @@ void crush_destroy(struct crush_map *map)
 
        /* rules */
        if (map->rules) {
+               __u32 b;
                for (b = 0; b < map->max_rules; b++)
                        kfree(map->rules[b]);
                kfree(map->rules);
        }
 
-       kfree(map->bucket_parents);
-       kfree(map->device_parents);
        kfree(map);
 }
 
index 363f8f7e6c3caa15fa03d0bcea1967b731f2ae1d..d7edc24333b84d5aab17da2d983878ff5044b2bb 100644 (file)
@@ -33,9 +33,9 @@
  * @type: storage ruleset type (user defined)
  * @size: output set size
  */
-int crush_find_rule(struct crush_map *map, int ruleset, int type, int size)
+int crush_find_rule(const struct crush_map *map, int ruleset, int type, int size)
 {
-       int i;
+       __u32 i;
 
        for (i = 0; i < map->max_rules; i++) {
                if (map->rules[i] &&
@@ -73,7 +73,7 @@ static int bucket_perm_choose(struct crush_bucket *bucket,
        unsigned int i, s;
 
        /* start a new permutation if @x has changed */
-       if (bucket->perm_x != x || bucket->perm_n == 0) {
+       if (bucket->perm_x != (__u32)x || bucket->perm_n == 0) {
                dprintk("bucket %d new x=%d\n", bucket->id, x);
                bucket->perm_x = x;
 
@@ -153,8 +153,8 @@ static int bucket_list_choose(struct crush_bucket_list *bucket,
                        return bucket->h.items[i];
        }
 
-       BUG_ON(1);
-       return 0;
+       dprintk("bad list sums for bucket %d\n", bucket->h.id);
+       return bucket->h.items[0];
 }
 
 
@@ -220,7 +220,7 @@ static int bucket_tree_choose(struct crush_bucket_tree *bucket,
 static int bucket_straw_choose(struct crush_bucket_straw *bucket,
                               int x, int r)
 {
-       int i;
+       __u32 i;
        int high = 0;
        __u64 high_draw = 0;
        __u64 draw;
@@ -240,6 +240,7 @@ static int bucket_straw_choose(struct crush_bucket_straw *bucket,
 static int crush_bucket_choose(struct crush_bucket *in, int x, int r)
 {
        dprintk(" crush_bucket_choose %d x=%d r=%d\n", in->id, x, r);
+       BUG_ON(in->size == 0);
        switch (in->alg) {
        case CRUSH_BUCKET_UNIFORM:
                return bucket_uniform_choose((struct crush_bucket_uniform *)in,
@@ -254,7 +255,7 @@ static int crush_bucket_choose(struct crush_bucket *in, int x, int r)
                return bucket_straw_choose((struct crush_bucket_straw *)in,
                                           x, r);
        default:
-               BUG_ON(1);
+               dprintk("unknown bucket %d alg %d\n", in->id, in->alg);
                return in->items[0];
        }
 }
@@ -263,7 +264,7 @@ static int crush_bucket_choose(struct crush_bucket *in, int x, int r)
  * true if device is marked "out" (failed, fully offloaded)
  * of the cluster
  */
-static int is_out(struct crush_map *map, __u32 *weight, int item, int x)
+static int is_out(const struct crush_map *map, const __u32 *weight, int item, int x)
 {
        if (weight[item] >= 0x10000)
                return 0;
@@ -288,16 +289,16 @@ static int is_out(struct crush_map *map, __u32 *weight, int item, int x)
  * @recurse_to_leaf: true if we want one device under each item of given type
  * @out2: second output vector for leaf items (if @recurse_to_leaf)
  */
-static int crush_choose(struct crush_map *map,
+static int crush_choose(const struct crush_map *map,
                        struct crush_bucket *bucket,
-                       __u32 *weight,
+                       const __u32 *weight,
                        int x, int numrep, int type,
                        int *out, int outpos,
                        int firstn, int recurse_to_leaf,
                        int *out2)
 {
        int rep;
-       int ftotal, flocal;
+       unsigned int ftotal, flocal;
        int retry_descent, retry_bucket, skip_rep;
        struct crush_bucket *in = bucket;
        int r;
@@ -305,7 +306,7 @@ static int crush_choose(struct crush_map *map,
        int item = 0;
        int itemtype;
        int collide, reject;
-       const int orig_tries = 5; /* attempts before we fall back to search */
+       const unsigned int orig_tries = 5; /* attempts before we fall back to search */
 
        dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "",
                bucket->id, x, outpos, numrep);
@@ -326,7 +327,7 @@ static int crush_choose(struct crush_map *map,
                                r = rep;
                                if (in->alg == CRUSH_BUCKET_UNIFORM) {
                                        /* be careful */
-                                       if (firstn || numrep >= in->size)
+                                       if (firstn || (__u32)numrep >= in->size)
                                                /* r' = r + f_total */
                                                r += ftotal;
                                        else if (in->size % numrep == 0)
@@ -355,7 +356,11 @@ static int crush_choose(struct crush_map *map,
                                        item = bucket_perm_choose(in, x, r);
                                else
                                        item = crush_bucket_choose(in, x, r);
-                               BUG_ON(item >= map->max_devices);
+                               if (item >= map->max_devices) {
+                                       dprintk("   bad item %d\n", item);
+                                       skip_rep = 1;
+                                       break;
+                               }
 
                                /* desired type? */
                                if (item < 0)
@@ -366,8 +371,12 @@ static int crush_choose(struct crush_map *map,
 
                                /* keep going? */
                                if (itemtype != type) {
-                                       BUG_ON(item >= 0 ||
-                                              (-1-item) >= map->max_buckets);
+                                       if (item >= 0 ||
+                                           (-1-item) >= map->max_buckets) {
+                                               dprintk("   bad item type %d\n", type);
+                                               skip_rep = 1;
+                                               break;
+                                       }
                                        in = map->buckets[-1-item];
                                        retry_bucket = 1;
                                        continue;
@@ -416,7 +425,7 @@ reject:
                                        if (collide && flocal < 3)
                                                /* retry locally a few times */
                                                retry_bucket = 1;
-                                       else if (flocal < in->size + orig_tries)
+                                       else if (flocal <= in->size + orig_tries)
                                                /* exhaustive bucket search */
                                                retry_bucket = 1;
                                        else if (ftotal < 20)
@@ -426,7 +435,7 @@ reject:
                                                /* else give up */
                                                skip_rep = 1;
                                        dprintk("  reject %d  collide %d  "
-                                               "ftotal %d  flocal %d\n",
+                                               "ftotal %u  flocal %u\n",
                                                reject, collide, ftotal,
                                                flocal);
                                }
@@ -455,15 +464,12 @@ reject:
  * @x: hash input
  * @result: pointer to result vector
  * @result_max: maximum result size
- * @force: force initial replica choice; -1 for none
  */
-int crush_do_rule(struct crush_map *map,
+int crush_do_rule(const struct crush_map *map,
                  int ruleno, int x, int *result, int result_max,
-                 int force, __u32 *weight)
+                 const __u32 *weight)
 {
        int result_len;
-       int force_context[CRUSH_MAX_DEPTH];
-       int force_pos = -1;
        int a[CRUSH_MAX_SET];
        int b[CRUSH_MAX_SET];
        int c[CRUSH_MAX_SET];
@@ -474,66 +480,44 @@ int crush_do_rule(struct crush_map *map,
        int osize;
        int *tmp;
        struct crush_rule *rule;
-       int step;
+       __u32 step;
        int i, j;
        int numrep;
        int firstn;
 
-       BUG_ON(ruleno >= map->max_rules);
+       if ((__u32)ruleno >= map->max_rules) {
+               dprintk(" bad ruleno %d\n", ruleno);
+               return 0;
+       }
 
        rule = map->rules[ruleno];
        result_len = 0;
        w = a;
        o = b;
 
-       /*
-        * determine hierarchical context of force, if any.  note
-        * that this may or may not correspond to the specific types
-        * referenced by the crush rule.
-        */
-       if (force >= 0 &&
-           force < map->max_devices &&
-           map->device_parents[force] != 0 &&
-           !is_out(map, weight, force, x)) {
-               while (1) {
-                       force_context[++force_pos] = force;
-                       if (force >= 0)
-                               force = map->device_parents[force];
-                       else
-                               force = map->bucket_parents[-1-force];
-                       if (force == 0)
-                               break;
-               }
-       }
-
        for (step = 0; step < rule->len; step++) {
+               struct crush_rule_step *curstep = &rule->steps[step];
+
                firstn = 0;
-               switch (rule->steps[step].op) {
+               switch (curstep->op) {
                case CRUSH_RULE_TAKE:
-                       w[0] = rule->steps[step].arg1;
-
-                       /* find position in force_context/hierarchy */
-                       while (force_pos >= 0 &&
-                              force_context[force_pos] != w[0])
-                               force_pos--;
-                       /* and move past it */
-                       if (force_pos >= 0)
-                               force_pos--;
-
+                       w[0] = curstep->arg1;
                        wsize = 1;
                        break;
 
                case CRUSH_RULE_CHOOSE_LEAF_FIRSTN:
                case CRUSH_RULE_CHOOSE_FIRSTN:
                        firstn = 1;
+                       /* fall through */
                case CRUSH_RULE_CHOOSE_LEAF_INDEP:
                case CRUSH_RULE_CHOOSE_INDEP:
-                       BUG_ON(wsize == 0);
+                       if (wsize == 0)
+                               break;
 
                        recurse_to_leaf =
-                               rule->steps[step].op ==
+                               curstep->op ==
                                 CRUSH_RULE_CHOOSE_LEAF_FIRSTN ||
-                               rule->steps[step].op ==
+                               curstep->op ==
                                CRUSH_RULE_CHOOSE_LEAF_INDEP;
 
                        /* reset output */
@@ -545,32 +529,18 @@ int crush_do_rule(struct crush_map *map,
                                 * basically, numrep <= 0 means relative to
                                 * the provided result_max
                                 */
-                               numrep = rule->steps[step].arg1;
+                               numrep = curstep->arg1;
                                if (numrep <= 0) {
                                        numrep += result_max;
                                        if (numrep <= 0)
                                                continue;
                                }
                                j = 0;
-                               if (osize == 0 && force_pos >= 0) {
-                                       /* skip any intermediate types */
-                                       while (force_pos &&
-                                              force_context[force_pos] < 0 &&
-                                              rule->steps[step].arg2 !=
-                                              map->buckets[-1 -
-                                              force_context[force_pos]]->type)
-                                               force_pos--;
-                                       o[osize] = force_context[force_pos];
-                                       if (recurse_to_leaf)
-                                               c[osize] = force_context[0];
-                                       j++;
-                                       force_pos--;
-                               }
                                osize += crush_choose(map,
                                                      map->buckets[-1-w[i]],
                                                      weight,
                                                      x, numrep,
-                                                     rule->steps[step].arg2,
+                                                     curstep->arg2,
                                                      o+osize, j,
                                                      firstn,
                                                      recurse_to_leaf, c+osize);
@@ -597,7 +567,9 @@ int crush_do_rule(struct crush_map *map,
                        break;
 
                default:
-                       BUG_ON(1);
+                       dprintk(" unknown op %d at step %d\n",
+                               curstep->op, step);
+                       break;
                }
        }
        return result_len;
index 36fa6bf684981688ff95c22788acb8db721271de..524f4e4f598b845a7242c0243efb1a4e6a843955 100644 (file)
@@ -653,54 +653,57 @@ static void prepare_write_keepalive(struct ceph_connection *con)
  * Connection negotiation.
  */
 
-static int prepare_connect_authorizer(struct ceph_connection *con)
+static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection *con,
+                                               int *auth_proto)
 {
-       void *auth_buf;
-       int auth_len = 0;
-       int auth_protocol = 0;
+       struct ceph_auth_handshake *auth;
+
+       if (!con->ops->get_authorizer) {
+               con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
+               con->out_connect.authorizer_len = 0;
+
+               return NULL;
+       }
+
+       /* Can't hold the mutex while getting authorizer */
 
        mutex_unlock(&con->mutex);
-       if (con->ops->get_authorizer)
-               con->ops->get_authorizer(con, &auth_buf, &auth_len,
-                                        &auth_protocol, &con->auth_reply_buf,
-                                        &con->auth_reply_buf_len,
-                                        con->auth_retry);
+
+       auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry);
+
        mutex_lock(&con->mutex);
 
-       if (test_bit(CLOSED, &con->state) ||
-           test_bit(OPENING, &con->state))
-               return -EAGAIN;
+       if (IS_ERR(auth))
+               return auth;
+       if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->state))
+               return ERR_PTR(-EAGAIN);
 
-       con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);
-       con->out_connect.authorizer_len = cpu_to_le32(auth_len);
+       con->auth_reply_buf = auth->authorizer_reply_buf;
+       con->auth_reply_buf_len = auth->authorizer_reply_buf_len;
 
-       if (auth_len)
-               ceph_con_out_kvec_add(con, auth_len, auth_buf);
 
-       return 0;
+       return auth;
 }
 
 /*
  * We connected to a peer and are saying hello.
  */
-static void prepare_write_banner(struct ceph_messenger *msgr,
-                                struct ceph_connection *con)
+static void prepare_write_banner(struct ceph_connection *con)
 {
-       ceph_con_out_kvec_reset(con);
        ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
-       ceph_con_out_kvec_add(con, sizeof (msgr->my_enc_addr),
-                                       &msgr->my_enc_addr);
+       ceph_con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
+                                       &con->msgr->my_enc_addr);
 
        con->out_more = 0;
        set_bit(WRITE_PENDING, &con->state);
 }
 
-static int prepare_write_connect(struct ceph_messenger *msgr,
-                                struct ceph_connection *con,
-                                int include_banner)
+static int prepare_write_connect(struct ceph_connection *con)
 {
        unsigned int global_seq = get_global_seq(con->msgr, 0);
        int proto;
+       int auth_proto;
+       struct ceph_auth_handshake *auth;
 
        switch (con->peer_name.type) {
        case CEPH_ENTITY_TYPE_MON:
@@ -719,23 +722,32 @@ static int prepare_write_connect(struct ceph_messenger *msgr,
        dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
             con->connect_seq, global_seq, proto);
 
-       con->out_connect.features = cpu_to_le64(msgr->supported_features);
+       con->out_connect.features = cpu_to_le64(con->msgr->supported_features);
        con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
        con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
        con->out_connect.global_seq = cpu_to_le32(global_seq);
        con->out_connect.protocol_version = cpu_to_le32(proto);
        con->out_connect.flags = 0;
 
-       if (include_banner)
-               prepare_write_banner(msgr, con);
-       else
-               ceph_con_out_kvec_reset(con);
-       ceph_con_out_kvec_add(con, sizeof (con->out_connect), &con->out_connect);
+       auth_proto = CEPH_AUTH_UNKNOWN;
+       auth = get_connect_authorizer(con, &auth_proto);
+       if (IS_ERR(auth))
+               return PTR_ERR(auth);
+
+       con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto);
+       con->out_connect.authorizer_len = auth ?
+               cpu_to_le32(auth->authorizer_buf_len) : 0;
+
+       ceph_con_out_kvec_add(con, sizeof (con->out_connect),
+                                       &con->out_connect);
+       if (auth && auth->authorizer_buf_len)
+               ceph_con_out_kvec_add(con, auth->authorizer_buf_len,
+                                       auth->authorizer_buf);
 
        con->out_more = 0;
        set_bit(WRITE_PENDING, &con->state);
 
-       return prepare_connect_authorizer(con);
+       return 0;
 }
 
 /*
@@ -992,11 +1004,10 @@ static int prepare_read_message(struct ceph_connection *con)
 
 
 static int read_partial(struct ceph_connection *con,
-                       int *to, int size, void *object)
+                       int end, int size, void *object)
 {
-       *to += size;
-       while (con->in_base_pos < *to) {
-               int left = *to - con->in_base_pos;
+       while (con->in_base_pos < end) {
+               int left = end - con->in_base_pos;
                int have = size - left;
                int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
                if (ret <= 0)
@@ -1012,37 +1023,52 @@ static int read_partial(struct ceph_connection *con,
  */
 static int read_partial_banner(struct ceph_connection *con)
 {
-       int ret, to = 0;
+       int size;
+       int end;
+       int ret;
 
        dout("read_partial_banner %p at %d\n", con, con->in_base_pos);
 
        /* peer's banner */
-       ret = read_partial(con, &to, strlen(CEPH_BANNER), con->in_banner);
+       size = strlen(CEPH_BANNER);
+       end = size;
+       ret = read_partial(con, end, size, con->in_banner);
        if (ret <= 0)
                goto out;
-       ret = read_partial(con, &to, sizeof(con->actual_peer_addr),
-                          &con->actual_peer_addr);
+
+       size = sizeof (con->actual_peer_addr);
+       end += size;
+       ret = read_partial(con, end, size, &con->actual_peer_addr);
        if (ret <= 0)
                goto out;
-       ret = read_partial(con, &to, sizeof(con->peer_addr_for_me),
-                          &con->peer_addr_for_me);
+
+       size = sizeof (con->peer_addr_for_me);
+       end += size;
+       ret = read_partial(con, end, size, &con->peer_addr_for_me);
        if (ret <= 0)
                goto out;
+
 out:
        return ret;
 }
 
 static int read_partial_connect(struct ceph_connection *con)
 {
-       int ret, to = 0;
+       int size;
+       int end;
+       int ret;
 
        dout("read_partial_connect %p at %d\n", con, con->in_base_pos);
 
-       ret = read_partial(con, &to, sizeof(con->in_reply), &con->in_reply);
+       size = sizeof (con->in_reply);
+       end = size;
+       ret = read_partial(con, end, size, &con->in_reply);
        if (ret <= 0)
                goto out;
-       ret = read_partial(con, &to, le32_to_cpu(con->in_reply.authorizer_len),
-                          con->auth_reply_buf);
+
+       size = le32_to_cpu(con->in_reply.authorizer_len);
+       end += size;
+       ret = read_partial(con, end, size, con->auth_reply_buf);
        if (ret <= 0)
                goto out;
 
@@ -1377,7 +1403,8 @@ static int process_connect(struct ceph_connection *con)
                        return -1;
                }
                con->auth_retry = 1;
-               ret = prepare_write_connect(con->msgr, con, 0);
+               ceph_con_out_kvec_reset(con);
+               ret = prepare_write_connect(con);
                if (ret < 0)
                        return ret;
                prepare_read_connect(con);
@@ -1397,7 +1424,10 @@ static int process_connect(struct ceph_connection *con)
                       ENTITY_NAME(con->peer_name),
                       ceph_pr_addr(&con->peer_addr.in_addr));
                reset_connection(con);
-               prepare_write_connect(con->msgr, con, 0);
+               ceph_con_out_kvec_reset(con);
+               ret = prepare_write_connect(con);
+               if (ret < 0)
+                       return ret;
                prepare_read_connect(con);
 
                /* Tell ceph about it. */
@@ -1420,7 +1450,10 @@ static int process_connect(struct ceph_connection *con)
                     le32_to_cpu(con->out_connect.connect_seq),
                     le32_to_cpu(con->in_connect.connect_seq));
                con->connect_seq = le32_to_cpu(con->in_connect.connect_seq);
-               prepare_write_connect(con->msgr, con, 0);
+               ceph_con_out_kvec_reset(con);
+               ret = prepare_write_connect(con);
+               if (ret < 0)
+                       return ret;
                prepare_read_connect(con);
                break;
 
@@ -1434,7 +1467,10 @@ static int process_connect(struct ceph_connection *con)
                     le32_to_cpu(con->in_connect.global_seq));
                get_global_seq(con->msgr,
                               le32_to_cpu(con->in_connect.global_seq));
-               prepare_write_connect(con->msgr, con, 0);
+               ceph_con_out_kvec_reset(con);
+               ret = prepare_write_connect(con);
+               if (ret < 0)
+                       return ret;
                prepare_read_connect(con);
                break;
 
@@ -1491,10 +1527,10 @@ static int process_connect(struct ceph_connection *con)
  */
 static int read_partial_ack(struct ceph_connection *con)
 {
-       int to = 0;
+       int size = sizeof (con->in_temp_ack);
+       int end = size;
 
-       return read_partial(con, &to, sizeof(con->in_temp_ack),
-                           &con->in_temp_ack);
+       return read_partial(con, end, size, &con->in_temp_ack);
 }
 
 
@@ -1627,8 +1663,9 @@ static int read_partial_message_bio(struct ceph_connection *con,
 static int read_partial_message(struct ceph_connection *con)
 {
        struct ceph_msg *m = con->in_msg;
+       int size;
+       int end;
        int ret;
-       int to, left;
        unsigned int front_len, middle_len, data_len;
        bool do_datacrc = !con->msgr->nocrc;
        int skip;
@@ -1638,15 +1675,11 @@ static int read_partial_message(struct ceph_connection *con)
        dout("read_partial_message con %p msg %p\n", con, m);
 
        /* header */
-       while (con->in_base_pos < sizeof(con->in_hdr)) {
-               left = sizeof(con->in_hdr) - con->in_base_pos;
-               ret = ceph_tcp_recvmsg(con->sock,
-                                      (char *)&con->in_hdr + con->in_base_pos,
-                                      left);
-               if (ret <= 0)
-                       return ret;
-               con->in_base_pos += ret;
-       }
+       size = sizeof (con->in_hdr);
+       end = size;
+       ret = read_partial(con, end, size, &con->in_hdr);
+       if (ret <= 0)
+               return ret;
 
        crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc));
        if (cpu_to_le32(crc) != con->in_hdr.crc) {
@@ -1759,16 +1792,12 @@ static int read_partial_message(struct ceph_connection *con)
        }
 
        /* footer */
-       to = sizeof(m->hdr) + sizeof(m->footer);
-       while (con->in_base_pos < to) {
-               left = to - con->in_base_pos;
-               ret = ceph_tcp_recvmsg(con->sock, (char *)&m->footer +
-                                      (con->in_base_pos - sizeof(m->hdr)),
-                                      left);
-               if (ret <= 0)
-                       return ret;
-               con->in_base_pos += ret;
-       }
+       size = sizeof (m->footer);
+       end += size;
+       ret = read_partial(con, end, size, &m->footer);
+       if (ret <= 0)
+               return ret;
+
        dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
             m, front_len, m->footer.front_crc, middle_len,
             m->footer.middle_crc, data_len, m->footer.data_crc);
@@ -1835,7 +1864,6 @@ static void process_message(struct ceph_connection *con)
  */
 static int try_write(struct ceph_connection *con)
 {
-       struct ceph_messenger *msgr = con->msgr;
        int ret = 1;
 
        dout("try_write start %p state %lu nref %d\n", con, con->state,
@@ -1846,7 +1874,11 @@ more:
 
        /* open the socket first? */
        if (con->sock == NULL) {
-               prepare_write_connect(msgr, con, 1);
+               ceph_con_out_kvec_reset(con);
+               prepare_write_banner(con);
+               ret = prepare_write_connect(con);
+               if (ret < 0)
+                       goto out;
                prepare_read_banner(con);
                set_bit(CONNECTING, &con->state);
                clear_bit(NEGOTIATING, &con->state);
index 1b0ef3c4d393c5221d30c15eb24b935ee292e3fc..1ffebed5ce0f9a629ad2733349b8e33c326850d5 100644 (file)
@@ -278,7 +278,7 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
 {
        dst->op = cpu_to_le16(src->op);
 
-       switch (dst->op) {
+       switch (src->op) {
        case CEPH_OSD_OP_READ:
        case CEPH_OSD_OP_WRITE:
                dst->extent.offset =
@@ -664,11 +664,11 @@ static void put_osd(struct ceph_osd *osd)
 {
        dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
             atomic_read(&osd->o_ref) - 1);
-       if (atomic_dec_and_test(&osd->o_ref)) {
+       if (atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) {
                struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;
 
-               if (osd->o_authorizer)
-                       ac->ops->destroy_authorizer(ac, osd->o_authorizer);
+               if (ac->ops && ac->ops->destroy_authorizer)
+                       ac->ops->destroy_authorizer(ac, osd->o_auth.authorizer);
                kfree(osd);
        }
 }
@@ -841,6 +841,12 @@ static void register_request(struct ceph_osd_client *osdc,
 static void __unregister_request(struct ceph_osd_client *osdc,
                                 struct ceph_osd_request *req)
 {
+       if (RB_EMPTY_NODE(&req->r_node)) {
+               dout("__unregister_request %p tid %lld not registered\n",
+                       req, req->r_tid);
+               return;
+       }
+
        dout("__unregister_request %p tid %lld\n", req, req->r_tid);
        rb_erase(&req->r_node, &osdc->requests);
        osdc->num_requests--;
@@ -2108,37 +2114,32 @@ static void put_osd_con(struct ceph_connection *con)
 /*
  * authentication
  */
-static int get_authorizer(struct ceph_connection *con,
-                         void **buf, int *len, int *proto,
-                         void **reply_buf, int *reply_len, int force_new)
+/*
+ * Note: returned pointer is the address of a structure that's
+ * managed separately.  Caller must *not* attempt to free it.
+ */
+static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
+                                       int *proto, int force_new)
 {
        struct ceph_osd *o = con->private;
        struct ceph_osd_client *osdc = o->o_osdc;
        struct ceph_auth_client *ac = osdc->client->monc.auth;
-       int ret = 0;
+       struct ceph_auth_handshake *auth = &o->o_auth;
 
-       if (force_new && o->o_authorizer) {
-               ac->ops->destroy_authorizer(ac, o->o_authorizer);
-               o->o_authorizer = NULL;
-       }
-       if (o->o_authorizer == NULL) {
-               ret = ac->ops->create_authorizer(
-                       ac, CEPH_ENTITY_TYPE_OSD,
-                       &o->o_authorizer,
-                       &o->o_authorizer_buf,
-                       &o->o_authorizer_buf_len,
-                       &o->o_authorizer_reply_buf,
-                       &o->o_authorizer_reply_buf_len);
+       if (force_new && auth->authorizer) {
+               if (ac->ops && ac->ops->destroy_authorizer)
+                       ac->ops->destroy_authorizer(ac, auth->authorizer);
+               auth->authorizer = NULL;
+       }
+       if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) {
+               int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
+                                                       auth);
                if (ret)
-                       return ret;
+                       return ERR_PTR(ret);
        }
-
        *proto = ac->protocol;
-       *buf = o->o_authorizer_buf;
-       *len = o->o_authorizer_buf_len;
-       *reply_buf = o->o_authorizer_reply_buf;
-       *reply_len = o->o_authorizer_reply_buf_len;
-       return 0;
+
+       return auth;
 }
 
 
@@ -2148,7 +2149,11 @@ static int verify_authorizer_reply(struct ceph_connection *con, int len)
        struct ceph_osd_client *osdc = o->o_osdc;
        struct ceph_auth_client *ac = osdc->client->monc.auth;
 
-       return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len);
+       /*
+        * XXX If ac->ops or ac->ops->verify_authorizer_reply is null,
+        * XXX which do we do:  succeed or fail?
+        */
+       return ac->ops->verify_authorizer_reply(ac, o->o_auth.authorizer, len);
 }
 
 static int invalidate_authorizer(struct ceph_connection *con)
@@ -2157,7 +2162,7 @@ static int invalidate_authorizer(struct ceph_connection *con)
        struct ceph_osd_client *osdc = o->o_osdc;
        struct ceph_auth_client *ac = osdc->client->monc.auth;
 
-       if (ac->ops->invalidate_authorizer)
+       if (ac->ops && ac->ops->invalidate_authorizer)
                ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
 
        return ceph_monc_validate_auth(&osdc->client->monc);
index 56e561a690044ee88b7fe22d4ed51c09910c8e99..81e3b84a77efdecb6c44603e7784a083fe94b980 100644 (file)
@@ -161,13 +161,6 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
        c->max_rules = ceph_decode_32(p);
        c->max_devices = ceph_decode_32(p);
 
-       c->device_parents = kcalloc(c->max_devices, sizeof(u32), GFP_NOFS);
-       if (c->device_parents == NULL)
-               goto badmem;
-       c->bucket_parents = kcalloc(c->max_buckets, sizeof(u32), GFP_NOFS);
-       if (c->bucket_parents == NULL)
-               goto badmem;
-
        c->buckets = kcalloc(c->max_buckets, sizeof(*c->buckets), GFP_NOFS);
        if (c->buckets == NULL)
                goto badmem;
@@ -890,8 +883,12 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
                pglen = ceph_decode_32(p);
 
                if (pglen) {
-                       /* insert */
                        ceph_decode_need(p, end, pglen*sizeof(u32), bad);
+
+                       /* removing existing (if any) */
+                       (void) __remove_pg_mapping(&map->pg_temp, pgid);
+
+                       /* insert */
                        pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS);
                        if (!pg) {
                                err = -ENOMEM;
@@ -1000,7 +997,6 @@ int ceph_calc_object_layout(struct ceph_object_layout *ol,
 {
        unsigned int num, num_mask;
        struct ceph_pg pgid;
-       s32 preferred = (s32)le32_to_cpu(fl->fl_pg_preferred);
        int poolid = le32_to_cpu(fl->fl_pg_pool);
        struct ceph_pg_pool_info *pool;
        unsigned int ps;
@@ -1011,23 +1007,13 @@ int ceph_calc_object_layout(struct ceph_object_layout *ol,
        if (!pool)
                return -EIO;
        ps = ceph_str_hash(pool->v.object_hash, oid, strlen(oid));
-       if (preferred >= 0) {
-               ps += preferred;
-               num = le32_to_cpu(pool->v.lpg_num);
-               num_mask = pool->lpg_num_mask;
-       } else {
-               num = le32_to_cpu(pool->v.pg_num);
-               num_mask = pool->pg_num_mask;
-       }
+       num = le32_to_cpu(pool->v.pg_num);
+       num_mask = pool->pg_num_mask;
 
        pgid.ps = cpu_to_le16(ps);
-       pgid.preferred = cpu_to_le16(preferred);
+       pgid.preferred = cpu_to_le16(-1);
        pgid.pool = fl->fl_pg_pool;
-       if (preferred >= 0)
-               dout("calc_object_layout '%s' pgid %d.%xp%d\n", oid, poolid, ps,
-                    (int)preferred);
-       else
-               dout("calc_object_layout '%s' pgid %d.%x\n", oid, poolid, ps);
+       dout("calc_object_layout '%s' pgid %d.%x\n", oid, poolid, ps);
 
        ol->ol_pgid = pgid;
        ol->ol_stripe_unit = fl->fl_object_stripe_unit;
@@ -1045,24 +1031,18 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
        struct ceph_pg_mapping *pg;
        struct ceph_pg_pool_info *pool;
        int ruleno;
-       unsigned int poolid, ps, pps, t;
-       int preferred;
+       unsigned int poolid, ps, pps, t, r;
 
        poolid = le32_to_cpu(pgid.pool);
        ps = le16_to_cpu(pgid.ps);
-       preferred = (s16)le16_to_cpu(pgid.preferred);
 
        pool = __lookup_pg_pool(&osdmap->pg_pools, poolid);
        if (!pool)
                return NULL;
 
        /* pg_temp? */
-       if (preferred >= 0)
-               t = ceph_stable_mod(ps, le32_to_cpu(pool->v.lpg_num),
-                                   pool->lpgp_num_mask);
-       else
-               t = ceph_stable_mod(ps, le32_to_cpu(pool->v.pg_num),
-                                   pool->pgp_num_mask);
+       t = ceph_stable_mod(ps, le32_to_cpu(pool->v.pg_num),
+                           pool->pgp_num_mask);
        pgid.ps = cpu_to_le16(t);
        pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid);
        if (pg) {
@@ -1080,23 +1060,20 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
                return NULL;
        }
 
-       /* don't forcefeed bad device ids to crush */
-       if (preferred >= osdmap->max_osd ||
-           preferred >= osdmap->crush->max_devices)
-               preferred = -1;
-
-       if (preferred >= 0)
-               pps = ceph_stable_mod(ps,
-                                     le32_to_cpu(pool->v.lpgp_num),
-                                     pool->lpgp_num_mask);
-       else
-               pps = ceph_stable_mod(ps,
-                                     le32_to_cpu(pool->v.pgp_num),
-                                     pool->pgp_num_mask);
+       pps = ceph_stable_mod(ps,
+                             le32_to_cpu(pool->v.pgp_num),
+                             pool->pgp_num_mask);
        pps += poolid;
-       *num = crush_do_rule(osdmap->crush, ruleno, pps, osds,
-                            min_t(int, pool->v.size, *num),
-                            preferred, osdmap->osd_weight);
+       r = crush_do_rule(osdmap->crush, ruleno, pps, osds,
+                         min_t(int, pool->v.size, *num),
+                         osdmap->osd_weight);
+       if (r < 0) {
+               pr_err("error %d from crush rule: pool %d ruleset %d type %d"
+                      " size %d\n", r, poolid, pool->v.crush_ruleset,
+                      pool->v.type, pool->v.size);
+               return NULL;
+       }
+       *num = r;
        return osds;
 }