Merge git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
author: Linus Torvalds <torvalds@linux-foundation.org>
Wed, 30 May 2012 18:17:19 +0000 (11:17 -0700)
committer: Linus Torvalds <torvalds@linux-foundation.org>
Wed, 30 May 2012 18:17:19 +0000 (11:17 -0700)
Pull ceph updates from Sage Weil:
 "There are some updates and cleanups to the CRUSH placement code, a bug
  fix with incremental maps, several cleanups and fixes from Josh Durgin
  in the RBD block device code, a series of cleanups and bug fixes from
  Alex Elder in the messenger code, and some miscellaneous bounds
  checking and gfp cleanups/fixes."

Fix up trivial conflicts in net/ceph/{messenger.c,osdmap.c} due to the
networking people preferring "unsigned int" over just "unsigned".

* git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (45 commits)
  libceph: fix pg_temp updates
  libceph: avoid unregistering osd request when not registered
  ceph: add auth buf in prepare_write_connect()
  ceph: rename prepare_connect_authorizer()
  ceph: return pointer from prepare_connect_authorizer()
  ceph: use info returned by get_authorizer
  ceph: have get_authorizer methods return pointers
  ceph: ensure auth ops are defined before use
  ceph: messenger: reduce args to create_authorizer
  ceph: define ceph_auth_handshake type
  ceph: messenger: check return from get_authorizer
  ceph: messenger: rework prepare_connect_authorizer()
  ceph: messenger: check prepare_write_connect() result
  ceph: don't set WRITE_PENDING too early
  ceph: drop msgr argument from prepare_write_connect()
  ceph: messenger: send banner in process_connect()
  ceph: messenger: reset connection kvec caller
  libceph: don't reset kvec in prepare_write_banner()
  ceph: ignore preferred_osd field
  ceph: fully initialize new layout
  ...

1  2 
include/linux/ceph/decode.h
net/ceph/crush/mapper.c
net/ceph/messenger.c
net/ceph/osd_client.c
net/ceph/osdmap.c

index 220ae21e819b1fb2623d19d8cf4f619862f11c42,ecf324eb2c9a63b2a5a83e7f727555a9eb499440..d8615dee5808d3f55c93a38c6fdb66113f09a691
@@@ -1,9 -1,8 +1,9 @@@
  #ifndef __CEPH_DECODE_H
  #define __CEPH_DECODE_H
  
 -#include <asm/unaligned.h>
 +#include <linux/bug.h>
  #include <linux/time.h>
 +#include <asm/unaligned.h>
  
  #include "types.h"
  
@@@ -46,9 -45,14 +46,14 @@@ static inline void ceph_decode_copy(voi
  /*
   * bounds check input.
   */
+ static inline int ceph_has_room(void **p, void *end, size_t n)
+ {
+       return end >= *p && n <= end - *p;
+ }
  #define ceph_decode_need(p, end, n, bad)              \
        do {                                            \
-               if (unlikely(*(p) + (n) > (end)))       \
+               if (!likely(ceph_has_room(p, end, n)))  \
                        goto bad;                       \
        } while (0)
  
@@@ -167,7 -171,7 +172,7 @@@ static inline void ceph_encode_string(v
  
  #define ceph_encode_need(p, end, n, bad)              \
        do {                                            \
-               if (unlikely(*(p) + (n) > (end)))       \
+               if (!likely(ceph_has_room(p, end, n)))  \
                        goto bad;                       \
        } while (0)
  
diff --combined net/ceph/crush/mapper.c
index 363f8f7e6c3caa15fa03d0bcea1967b731f2ae1d,11cf352201ba0aebc2ee41921e0b2f62afe8adbe..d7edc24333b84d5aab17da2d983878ff5044b2bb
@@@ -20,7 -20,6 +20,7 @@@
  
  #include <linux/crush/crush.h>
  #include <linux/crush/hash.h>
 +#include <linux/crush/mapper.h>
  
  /*
   * Implement the core CRUSH mapping algorithm.
@@@ -33,9 -32,9 +33,9 @@@
   * @type: storage ruleset type (user defined)
   * @size: output set size
   */
- int crush_find_rule(struct crush_map *map, int ruleset, int type, int size)
+ int crush_find_rule(const struct crush_map *map, int ruleset, int type, int size)
  {
-       int i;
+       __u32 i;
  
        for (i = 0; i < map->max_rules; i++) {
                if (map->rules[i] &&
  static int bucket_perm_choose(struct crush_bucket *bucket,
                              int x, int r)
  {
 -      unsigned pr = r % bucket->size;
 -      unsigned i, s;
 +      unsigned int pr = r % bucket->size;
 +      unsigned int i, s;
  
        /* start a new permutation if @x has changed */
-       if (bucket->perm_x != x || bucket->perm_n == 0) {
+       if (bucket->perm_x != (__u32)x || bucket->perm_n == 0) {
                dprintk("bucket %d new x=%d\n", bucket->id, x);
                bucket->perm_x = x;
  
        for (i = 0; i < bucket->perm_n; i++)
                dprintk(" perm_choose have %d: %d\n", i, bucket->perm[i]);
        while (bucket->perm_n <= pr) {
 -              unsigned p = bucket->perm_n;
 +              unsigned int p = bucket->perm_n;
                /* no point in swapping the final entry */
                if (p < bucket->size - 1) {
                        i = crush_hash32_3(bucket->hash, x, bucket->id, p) %
                                (bucket->size - p);
                        if (i) {
 -                              unsigned t = bucket->perm[p + i];
 +                              unsigned int t = bucket->perm[p + i];
                                bucket->perm[p + i] = bucket->perm[p];
                                bucket->perm[p] = t;
                        }
@@@ -153,8 -152,8 +153,8 @@@ static int bucket_list_choose(struct cr
                        return bucket->h.items[i];
        }
  
-       BUG_ON(1);
-       return 0;
+       dprintk("bad list sums for bucket %d\n", bucket->h.id);
+       return bucket->h.items[0];
  }
  
  
@@@ -220,7 -219,7 +220,7 @@@ static int bucket_tree_choose(struct cr
  static int bucket_straw_choose(struct crush_bucket_straw *bucket,
                               int x, int r)
  {
-       int i;
+       __u32 i;
        int high = 0;
        __u64 high_draw = 0;
        __u64 draw;
  static int crush_bucket_choose(struct crush_bucket *in, int x, int r)
  {
        dprintk(" crush_bucket_choose %d x=%d r=%d\n", in->id, x, r);
+       BUG_ON(in->size == 0);
        switch (in->alg) {
        case CRUSH_BUCKET_UNIFORM:
                return bucket_uniform_choose((struct crush_bucket_uniform *)in,
                return bucket_straw_choose((struct crush_bucket_straw *)in,
                                           x, r);
        default:
-               BUG_ON(1);
+               dprintk("unknown bucket %d alg %d\n", in->id, in->alg);
                return in->items[0];
        }
  }
   * true if device is marked "out" (failed, fully offloaded)
   * of the cluster
   */
- static int is_out(struct crush_map *map, __u32 *weight, int item, int x)
+ static int is_out(const struct crush_map *map, const __u32 *weight, int item, int x)
  {
        if (weight[item] >= 0x10000)
                return 0;
   * @recurse_to_leaf: true if we want one device under each item of given type
   * @out2: second output vector for leaf items (if @recurse_to_leaf)
   */
- static int crush_choose(struct crush_map *map,
+ static int crush_choose(const struct crush_map *map,
                        struct crush_bucket *bucket,
-                       __u32 *weight,
+                       const __u32 *weight,
                        int x, int numrep, int type,
                        int *out, int outpos,
                        int firstn, int recurse_to_leaf,
                        int *out2)
  {
        int rep;
-       int ftotal, flocal;
+       unsigned int ftotal, flocal;
        int retry_descent, retry_bucket, skip_rep;
        struct crush_bucket *in = bucket;
        int r;
        int item = 0;
        int itemtype;
        int collide, reject;
-       const int orig_tries = 5; /* attempts before we fall back to search */
+       const unsigned int orig_tries = 5; /* attempts before we fall back to search */
  
        dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "",
                bucket->id, x, outpos, numrep);
                                r = rep;
                                if (in->alg == CRUSH_BUCKET_UNIFORM) {
                                        /* be careful */
-                                       if (firstn || numrep >= in->size)
+                                       if (firstn || (__u32)numrep >= in->size)
                                                /* r' = r + f_total */
                                                r += ftotal;
                                        else if (in->size % numrep == 0)
                                        item = bucket_perm_choose(in, x, r);
                                else
                                        item = crush_bucket_choose(in, x, r);
-                               BUG_ON(item >= map->max_devices);
+                               if (item >= map->max_devices) {
+                                       dprintk("   bad item %d\n", item);
+                                       skip_rep = 1;
+                                       break;
+                               }
  
                                /* desired type? */
                                if (item < 0)
  
                                /* keep going? */
                                if (itemtype != type) {
-                                       BUG_ON(item >= 0 ||
-                                              (-1-item) >= map->max_buckets);
+                                       if (item >= 0 ||
+                                           (-1-item) >= map->max_buckets) {
+                                               dprintk("   bad item type %d\n", type);
+                                               skip_rep = 1;
+                                               break;
+                                       }
                                        in = map->buckets[-1-item];
                                        retry_bucket = 1;
                                        continue;
@@@ -416,7 -424,7 +425,7 @@@ reject
                                        if (collide && flocal < 3)
                                                /* retry locally a few times */
                                                retry_bucket = 1;
-                                       else if (flocal < in->size + orig_tries)
+                                       else if (flocal <= in->size + orig_tries)
                                                /* exhaustive bucket search */
                                                retry_bucket = 1;
                                        else if (ftotal < 20)
                                                /* else give up */
                                                skip_rep = 1;
                                        dprintk("  reject %d  collide %d  "
-                                               "ftotal %d  flocal %d\n",
+                                               "ftotal %u  flocal %u\n",
                                                reject, collide, ftotal,
                                                flocal);
                                }
   * @x: hash input
   * @result: pointer to result vector
   * @result_max: maximum result size
-  * @force: force initial replica choice; -1 for none
   */
- int crush_do_rule(struct crush_map *map,
+ int crush_do_rule(const struct crush_map *map,
                  int ruleno, int x, int *result, int result_max,
-                 int force, __u32 *weight)
+                 const __u32 *weight)
  {
        int result_len;
-       int force_context[CRUSH_MAX_DEPTH];
-       int force_pos = -1;
        int a[CRUSH_MAX_SET];
        int b[CRUSH_MAX_SET];
        int c[CRUSH_MAX_SET];
        int osize;
        int *tmp;
        struct crush_rule *rule;
-       int step;
+       __u32 step;
        int i, j;
        int numrep;
        int firstn;
  
-       BUG_ON(ruleno >= map->max_rules);
+       if ((__u32)ruleno >= map->max_rules) {
+               dprintk(" bad ruleno %d\n", ruleno);
+               return 0;
+       }
  
        rule = map->rules[ruleno];
        result_len = 0;
        w = a;
        o = b;
  
-       /*
-        * determine hierarchical context of force, if any.  note
-        * that this may or may not correspond to the specific types
-        * referenced by the crush rule.
-        */
-       if (force >= 0 &&
-           force < map->max_devices &&
-           map->device_parents[force] != 0 &&
-           !is_out(map, weight, force, x)) {
-               while (1) {
-                       force_context[++force_pos] = force;
-                       if (force >= 0)
-                               force = map->device_parents[force];
-                       else
-                               force = map->bucket_parents[-1-force];
-                       if (force == 0)
-                               break;
-               }
-       }
        for (step = 0; step < rule->len; step++) {
+               struct crush_rule_step *curstep = &rule->steps[step];
                firstn = 0;
-               switch (rule->steps[step].op) {
+               switch (curstep->op) {
                case CRUSH_RULE_TAKE:
-                       w[0] = rule->steps[step].arg1;
-                       /* find position in force_context/hierarchy */
-                       while (force_pos >= 0 &&
-                              force_context[force_pos] != w[0])
-                               force_pos--;
-                       /* and move past it */
-                       if (force_pos >= 0)
-                               force_pos--;
+                       w[0] = curstep->arg1;
                        wsize = 1;
                        break;
  
                case CRUSH_RULE_CHOOSE_LEAF_FIRSTN:
                case CRUSH_RULE_CHOOSE_FIRSTN:
                        firstn = 1;
+                       /* fall through */
                case CRUSH_RULE_CHOOSE_LEAF_INDEP:
                case CRUSH_RULE_CHOOSE_INDEP:
-                       BUG_ON(wsize == 0);
+                       if (wsize == 0)
+                               break;
  
                        recurse_to_leaf =
-                               rule->steps[step].op ==
+                               curstep->op ==
                                 CRUSH_RULE_CHOOSE_LEAF_FIRSTN ||
-                               rule->steps[step].op ==
+                               curstep->op ==
                                CRUSH_RULE_CHOOSE_LEAF_INDEP;
  
                        /* reset output */
                                 * basically, numrep <= 0 means relative to
                                 * the provided result_max
                                 */
-                               numrep = rule->steps[step].arg1;
+                               numrep = curstep->arg1;
                                if (numrep <= 0) {
                                        numrep += result_max;
                                        if (numrep <= 0)
                                                continue;
                                }
                                j = 0;
-                               if (osize == 0 && force_pos >= 0) {
-                                       /* skip any intermediate types */
-                                       while (force_pos &&
-                                              force_context[force_pos] < 0 &&
-                                              rule->steps[step].arg2 !=
-                                              map->buckets[-1 -
-                                              force_context[force_pos]]->type)
-                                               force_pos--;
-                                       o[osize] = force_context[force_pos];
-                                       if (recurse_to_leaf)
-                                               c[osize] = force_context[0];
-                                       j++;
-                                       force_pos--;
-                               }
                                osize += crush_choose(map,
                                                      map->buckets[-1-w[i]],
                                                      weight,
                                                      x, numrep,
-                                                     rule->steps[step].arg2,
+                                                     curstep->arg2,
                                                      o+osize, j,
                                                      firstn,
                                                      recurse_to_leaf, c+osize);
                        break;
  
                default:
-                       BUG_ON(1);
+                       dprintk(" unknown op %d at step %d\n",
+                               curstep->op, step);
+                       break;
                }
        }
        return result_len;
diff --combined net/ceph/messenger.c
index 36fa6bf684981688ff95c22788acb8db721271de,1a80907282cc261137bf11f4d4bb8f4e8ede7419..524f4e4f598b845a7242c0243efb1a4e6a843955
@@@ -653,54 -653,57 +653,57 @@@ static void prepare_write_keepalive(str
   * Connection negotiation.
   */
  
- static int prepare_connect_authorizer(struct ceph_connection *con)
+ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection *con,
+                                               int *auth_proto)
  {
-       void *auth_buf;
-       int auth_len = 0;
-       int auth_protocol = 0;
+       struct ceph_auth_handshake *auth;
+       if (!con->ops->get_authorizer) {
+               con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
+               con->out_connect.authorizer_len = 0;
+               return NULL;
+       }
+       /* Can't hold the mutex while getting authorizer */
  
        mutex_unlock(&con->mutex);
-       if (con->ops->get_authorizer)
-               con->ops->get_authorizer(con, &auth_buf, &auth_len,
-                                        &auth_protocol, &con->auth_reply_buf,
-                                        &con->auth_reply_buf_len,
-                                        con->auth_retry);
+       auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry);
        mutex_lock(&con->mutex);
  
-       if (test_bit(CLOSED, &con->state) ||
-           test_bit(OPENING, &con->state))
-               return -EAGAIN;
+       if (IS_ERR(auth))
+               return auth;
+       if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->state))
+               return ERR_PTR(-EAGAIN);
  
-       con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);
-       con->out_connect.authorizer_len = cpu_to_le32(auth_len);
+       con->auth_reply_buf = auth->authorizer_reply_buf;
+       con->auth_reply_buf_len = auth->authorizer_reply_buf_len;
  
-       if (auth_len)
-               ceph_con_out_kvec_add(con, auth_len, auth_buf);
  
-       return 0;
+       return auth;
  }
  
  /*
   * We connected to a peer and are saying hello.
   */
- static void prepare_write_banner(struct ceph_messenger *msgr,
-                                struct ceph_connection *con)
+ static void prepare_write_banner(struct ceph_connection *con)
  {
-       ceph_con_out_kvec_reset(con);
        ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
-       ceph_con_out_kvec_add(con, sizeof (msgr->my_enc_addr),
-                                       &msgr->my_enc_addr);
+       ceph_con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
+                                       &con->msgr->my_enc_addr);
  
        con->out_more = 0;
        set_bit(WRITE_PENDING, &con->state);
  }
  
- static int prepare_write_connect(struct ceph_messenger *msgr,
-                                struct ceph_connection *con,
-                                int include_banner)
+ static int prepare_write_connect(struct ceph_connection *con)
  {
 -      unsigned global_seq = get_global_seq(con->msgr, 0);
 +      unsigned int global_seq = get_global_seq(con->msgr, 0);
        int proto;
+       int auth_proto;
+       struct ceph_auth_handshake *auth;
  
        switch (con->peer_name.type) {
        case CEPH_ENTITY_TYPE_MON:
        dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
             con->connect_seq, global_seq, proto);
  
-       con->out_connect.features = cpu_to_le64(msgr->supported_features);
+       con->out_connect.features = cpu_to_le64(con->msgr->supported_features);
        con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
        con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
        con->out_connect.global_seq = cpu_to_le32(global_seq);
        con->out_connect.protocol_version = cpu_to_le32(proto);
        con->out_connect.flags = 0;
  
-       if (include_banner)
-               prepare_write_banner(msgr, con);
-       else
-               ceph_con_out_kvec_reset(con);
-       ceph_con_out_kvec_add(con, sizeof (con->out_connect), &con->out_connect);
+       auth_proto = CEPH_AUTH_UNKNOWN;
+       auth = get_connect_authorizer(con, &auth_proto);
+       if (IS_ERR(auth))
+               return PTR_ERR(auth);
+       con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto);
+       con->out_connect.authorizer_len = auth ?
+               cpu_to_le32(auth->authorizer_buf_len) : 0;
+       ceph_con_out_kvec_add(con, sizeof (con->out_connect),
+                                       &con->out_connect);
+       if (auth && auth->authorizer_buf_len)
+               ceph_con_out_kvec_add(con, auth->authorizer_buf_len,
+                                       auth->authorizer_buf);
  
        con->out_more = 0;
        set_bit(WRITE_PENDING, &con->state);
  
-       return prepare_connect_authorizer(con);
+       return 0;
  }
  
  /*
@@@ -816,7 -828,7 +828,7 @@@ static void iter_bio_next(struct bio **
  static int write_partial_msg_pages(struct ceph_connection *con)
  {
        struct ceph_msg *msg = con->out_msg;
 -      unsigned data_len = le32_to_cpu(msg->hdr.data_len);
 +      unsigned int data_len = le32_to_cpu(msg->hdr.data_len);
        size_t len;
        bool do_datacrc = !con->msgr->nocrc;
        int ret;
@@@ -992,11 -1004,10 +1004,10 @@@ static int prepare_read_message(struct 
  
  
  static int read_partial(struct ceph_connection *con,
-                       int *to, int size, void *object)
+                       int end, int size, void *object)
  {
-       *to += size;
-       while (con->in_base_pos < *to) {
-               int left = *to - con->in_base_pos;
+       while (con->in_base_pos < end) {
+               int left = end - con->in_base_pos;
                int have = size - left;
                int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
                if (ret <= 0)
   */
  static int read_partial_banner(struct ceph_connection *con)
  {
-       int ret, to = 0;
+       int size;
+       int end;
+       int ret;
  
        dout("read_partial_banner %p at %d\n", con, con->in_base_pos);
  
        /* peer's banner */
-       ret = read_partial(con, &to, strlen(CEPH_BANNER), con->in_banner);
+       size = strlen(CEPH_BANNER);
+       end = size;
+       ret = read_partial(con, end, size, con->in_banner);
        if (ret <= 0)
                goto out;
-       ret = read_partial(con, &to, sizeof(con->actual_peer_addr),
-                          &con->actual_peer_addr);
+       size = sizeof (con->actual_peer_addr);
+       end += size;
+       ret = read_partial(con, end, size, &con->actual_peer_addr);
        if (ret <= 0)
                goto out;
-       ret = read_partial(con, &to, sizeof(con->peer_addr_for_me),
-                          &con->peer_addr_for_me);
+       size = sizeof (con->peer_addr_for_me);
+       end += size;
+       ret = read_partial(con, end, size, &con->peer_addr_for_me);
        if (ret <= 0)
                goto out;
  out:
        return ret;
  }
  
  static int read_partial_connect(struct ceph_connection *con)
  {
-       int ret, to = 0;
+       int size;
+       int end;
+       int ret;
  
        dout("read_partial_connect %p at %d\n", con, con->in_base_pos);
  
-       ret = read_partial(con, &to, sizeof(con->in_reply), &con->in_reply);
+       size = sizeof (con->in_reply);
+       end = size;
+       ret = read_partial(con, end, size, &con->in_reply);
        if (ret <= 0)
                goto out;
-       ret = read_partial(con, &to, le32_to_cpu(con->in_reply.authorizer_len),
-                          con->auth_reply_buf);
+       size = le32_to_cpu(con->in_reply.authorizer_len);
+       end += size;
+       ret = read_partial(con, end, size, con->auth_reply_buf);
        if (ret <= 0)
                goto out;
  
@@@ -1377,7 -1403,8 +1403,8 @@@ static int process_connect(struct ceph_
                        return -1;
                }
                con->auth_retry = 1;
-               ret = prepare_write_connect(con->msgr, con, 0);
+               ceph_con_out_kvec_reset(con);
+               ret = prepare_write_connect(con);
                if (ret < 0)
                        return ret;
                prepare_read_connect(con);
                       ENTITY_NAME(con->peer_name),
                       ceph_pr_addr(&con->peer_addr.in_addr));
                reset_connection(con);
-               prepare_write_connect(con->msgr, con, 0);
+               ceph_con_out_kvec_reset(con);
+               ret = prepare_write_connect(con);
+               if (ret < 0)
+                       return ret;
                prepare_read_connect(con);
  
                /* Tell ceph about it. */
                     le32_to_cpu(con->out_connect.connect_seq),
                     le32_to_cpu(con->in_connect.connect_seq));
                con->connect_seq = le32_to_cpu(con->in_connect.connect_seq);
-               prepare_write_connect(con->msgr, con, 0);
+               ceph_con_out_kvec_reset(con);
+               ret = prepare_write_connect(con);
+               if (ret < 0)
+                       return ret;
                prepare_read_connect(con);
                break;
  
                     le32_to_cpu(con->in_connect.global_seq));
                get_global_seq(con->msgr,
                               le32_to_cpu(con->in_connect.global_seq));
-               prepare_write_connect(con->msgr, con, 0);
+               ceph_con_out_kvec_reset(con);
+               ret = prepare_write_connect(con);
+               if (ret < 0)
+                       return ret;
                prepare_read_connect(con);
                break;
  
   */
  static int read_partial_ack(struct ceph_connection *con)
  {
-       int to = 0;
+       int size = sizeof (con->in_temp_ack);
+       int end = size;
  
-       return read_partial(con, &to, sizeof(con->in_temp_ack),
-                           &con->in_temp_ack);
+       return read_partial(con, end, size, &con->in_temp_ack);
  }
  
  
@@@ -1554,7 -1590,7 +1590,7 @@@ static struct ceph_msg *ceph_alloc_msg(
  
  static int read_partial_message_pages(struct ceph_connection *con,
                                      struct page **pages,
 -                                    unsigned data_len, bool do_datacrc)
 +                                    unsigned int data_len, bool do_datacrc)
  {
        void *p;
        int ret;
  #ifdef CONFIG_BLOCK
  static int read_partial_message_bio(struct ceph_connection *con,
                                    struct bio **bio_iter, int *bio_seg,
 -                                  unsigned data_len, bool do_datacrc)
 +                                  unsigned int data_len, bool do_datacrc)
  {
        struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
        void *p;
  static int read_partial_message(struct ceph_connection *con)
  {
        struct ceph_msg *m = con->in_msg;
+       int size;
+       int end;
        int ret;
-       int to, left;
 -      unsigned front_len, middle_len, data_len;
 +      unsigned int front_len, middle_len, data_len;
        bool do_datacrc = !con->msgr->nocrc;
        int skip;
        u64 seq;
        dout("read_partial_message con %p msg %p\n", con, m);
  
        /* header */
-       while (con->in_base_pos < sizeof(con->in_hdr)) {
-               left = sizeof(con->in_hdr) - con->in_base_pos;
-               ret = ceph_tcp_recvmsg(con->sock,
-                                      (char *)&con->in_hdr + con->in_base_pos,
-                                      left);
-               if (ret <= 0)
-                       return ret;
-               con->in_base_pos += ret;
-       }
+       size = sizeof (con->in_hdr);
+       end = size;
+       ret = read_partial(con, end, size, &con->in_hdr);
+       if (ret <= 0)
+               return ret;
  
        crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc));
        if (cpu_to_le32(crc) != con->in_hdr.crc) {
        }
  
        /* footer */
-       to = sizeof(m->hdr) + sizeof(m->footer);
-       while (con->in_base_pos < to) {
-               left = to - con->in_base_pos;
-               ret = ceph_tcp_recvmsg(con->sock, (char *)&m->footer +
-                                      (con->in_base_pos - sizeof(m->hdr)),
-                                      left);
-               if (ret <= 0)
-                       return ret;
-               con->in_base_pos += ret;
-       }
+       size = sizeof (m->footer);
+       end += size;
+       ret = read_partial(con, end, size, &m->footer);
+       if (ret <= 0)
+               return ret;
        dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
             m, front_len, m->footer.front_crc, middle_len,
             m->footer.middle_crc, data_len, m->footer.data_crc);
@@@ -1835,7 -1864,6 +1864,6 @@@ static void process_message(struct ceph
   */
  static int try_write(struct ceph_connection *con)
  {
-       struct ceph_messenger *msgr = con->msgr;
        int ret = 1;
  
        dout("try_write start %p state %lu nref %d\n", con, con->state,
@@@ -1846,7 -1874,11 +1874,11 @@@ more
  
        /* open the socket first? */
        if (con->sock == NULL) {
-               prepare_write_connect(msgr, con, 1);
+               ceph_con_out_kvec_reset(con);
+               prepare_write_banner(con);
+               ret = prepare_write_connect(con);
+               if (ret < 0)
+                       goto out;
                prepare_read_banner(con);
                set_bit(CONNECTING, &con->state);
                clear_bit(NEGOTIATING, &con->state);
@@@ -2345,9 -2377,9 +2377,9 @@@ void ceph_con_revoke_message(struct cep
  {
        mutex_lock(&con->mutex);
        if (con->in_msg && con->in_msg == msg) {
 -              unsigned front_len = le32_to_cpu(con->in_hdr.front_len);
 -              unsigned middle_len = le32_to_cpu(con->in_hdr.middle_len);
 -              unsigned data_len = le32_to_cpu(con->in_hdr.data_len);
 +              unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
 +              unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len);
 +              unsigned int data_len = le32_to_cpu(con->in_hdr.data_len);
  
                /* skip rest of message */
                dout("con_revoke_pages %p msg %p revoked\n", con, msg);
diff --combined net/ceph/osd_client.c
index 1b0ef3c4d393c5221d30c15eb24b935ee292e3fc,b098e7b591f0f2f9e8e647be37073e2e3b3e64cf..1ffebed5ce0f9a629ad2733349b8e33c326850d5
@@@ -278,7 -278,7 +278,7 @@@ static void osd_req_encode_op(struct ce
  {
        dst->op = cpu_to_le16(src->op);
  
-       switch (dst->op) {
+       switch (src->op) {
        case CEPH_OSD_OP_READ:
        case CEPH_OSD_OP_WRITE:
                dst->extent.offset =
@@@ -664,11 -664,11 +664,11 @@@ static void put_osd(struct ceph_osd *os
  {
        dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
             atomic_read(&osd->o_ref) - 1);
-       if (atomic_dec_and_test(&osd->o_ref)) {
+       if (atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) {
                struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;
  
-               if (osd->o_authorizer)
-                       ac->ops->destroy_authorizer(ac, osd->o_authorizer);
+               if (ac->ops && ac->ops->destroy_authorizer)
+                       ac->ops->destroy_authorizer(ac, osd->o_auth.authorizer);
                kfree(osd);
        }
  }
@@@ -841,6 -841,12 +841,12 @@@ static void register_request(struct cep
  static void __unregister_request(struct ceph_osd_client *osdc,
                                 struct ceph_osd_request *req)
  {
+       if (RB_EMPTY_NODE(&req->r_node)) {
+               dout("__unregister_request %p tid %lld not registered\n",
+                       req, req->r_tid);
+               return;
+       }
        dout("__unregister_request %p tid %lld\n", req, req->r_tid);
        rb_erase(&req->r_node, &osdc->requests);
        osdc->num_requests--;
@@@ -1214,7 -1220,7 +1220,7 @@@ static void handle_reply(struct ceph_os
        }
  
        if (!req->r_got_reply) {
 -              unsigned bytes;
 +              unsigned int bytes;
  
                req->r_result = le32_to_cpu(rhead->result);
                bytes = le32_to_cpu(msg->hdr.data_len);
@@@ -2108,37 -2114,32 +2114,32 @@@ static void put_osd_con(struct ceph_con
  /*
   * authentication
   */
- static int get_authorizer(struct ceph_connection *con,
-                         void **buf, int *len, int *proto,
-                         void **reply_buf, int *reply_len, int force_new)
+ /*
+  * Note: returned pointer is the address of a structure that's
+  * managed separately.  Caller must *not* attempt to free it.
+  */
+ static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
+                                       int *proto, int force_new)
  {
        struct ceph_osd *o = con->private;
        struct ceph_osd_client *osdc = o->o_osdc;
        struct ceph_auth_client *ac = osdc->client->monc.auth;
-       int ret = 0;
+       struct ceph_auth_handshake *auth = &o->o_auth;
  
-       if (force_new && o->o_authorizer) {
-               ac->ops->destroy_authorizer(ac, o->o_authorizer);
-               o->o_authorizer = NULL;
-       }
-       if (o->o_authorizer == NULL) {
-               ret = ac->ops->create_authorizer(
-                       ac, CEPH_ENTITY_TYPE_OSD,
-                       &o->o_authorizer,
-                       &o->o_authorizer_buf,
-                       &o->o_authorizer_buf_len,
-                       &o->o_authorizer_reply_buf,
-                       &o->o_authorizer_reply_buf_len);
+       if (force_new && auth->authorizer) {
+               if (ac->ops && ac->ops->destroy_authorizer)
+                       ac->ops->destroy_authorizer(ac, auth->authorizer);
+               auth->authorizer = NULL;
+       }
+       if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) {
+               int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
+                                                       auth);
                if (ret)
-                       return ret;
+                       return ERR_PTR(ret);
        }
        *proto = ac->protocol;
-       *buf = o->o_authorizer_buf;
-       *len = o->o_authorizer_buf_len;
-       *reply_buf = o->o_authorizer_reply_buf;
-       *reply_len = o->o_authorizer_reply_buf_len;
-       return 0;
+       return auth;
  }
  
  
@@@ -2148,7 -2149,11 +2149,11 @@@ static int verify_authorizer_reply(stru
        struct ceph_osd_client *osdc = o->o_osdc;
        struct ceph_auth_client *ac = osdc->client->monc.auth;
  
-       return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len);
+       /*
+        * XXX If ac->ops or ac->ops->verify_authorizer_reply is null,
+        * XXX which do we do:  succeed or fail?
+        */
+       return ac->ops->verify_authorizer_reply(ac, o->o_auth.authorizer, len);
  }
  
  static int invalidate_authorizer(struct ceph_connection *con)
        struct ceph_osd_client *osdc = o->o_osdc;
        struct ceph_auth_client *ac = osdc->client->monc.auth;
  
-       if (ac->ops->invalidate_authorizer)
+       if (ac->ops && ac->ops->invalidate_authorizer)
                ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
  
        return ceph_monc_validate_auth(&osdc->client->monc);
diff --combined net/ceph/osdmap.c
index 56e561a690044ee88b7fe22d4ed51c09910c8e99,1892c523c43c5308dd75fa17901a6c3de6cf70cf..81e3b84a77efdecb6c44603e7784a083fe94b980
@@@ -38,7 -38,7 +38,7 @@@ done
  
  /* maps */
  
 -static int calc_bits_of(unsigned t)
 +static int calc_bits_of(unsigned int t)
  {
        int b = 0;
        while (t) {
@@@ -154,20 -154,13 +154,13 @@@ static struct crush_map *crush_decode(v
        magic = ceph_decode_32(p);
        if (magic != CRUSH_MAGIC) {
                pr_err("crush_decode magic %x != current %x\n",
 -                     (unsigned)magic, (unsigned)CRUSH_MAGIC);
 +                     (unsigned int)magic, (unsigned int)CRUSH_MAGIC);
                goto bad;
        }
        c->max_buckets = ceph_decode_32(p);
        c->max_rules = ceph_decode_32(p);
        c->max_devices = ceph_decode_32(p);
  
-       c->device_parents = kcalloc(c->max_devices, sizeof(u32), GFP_NOFS);
-       if (c->device_parents == NULL)
-               goto badmem;
-       c->bucket_parents = kcalloc(c->max_buckets, sizeof(u32), GFP_NOFS);
-       if (c->bucket_parents == NULL)
-               goto badmem;
        c->buckets = kcalloc(c->max_buckets, sizeof(*c->buckets), GFP_NOFS);
        if (c->buckets == NULL)
                goto badmem;
@@@ -460,7 -453,7 +453,7 @@@ static void __remove_pg_pool(struct rb_
  
  static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
  {
 -      unsigned n, m;
 +      unsigned int n, m;
  
        ceph_decode_copy(p, &pi->v, sizeof(pi->v));
        calc_pg_masks(pi);
@@@ -890,8 -883,12 +883,12 @@@ struct ceph_osdmap *osdmap_apply_increm
                pglen = ceph_decode_32(p);
  
                if (pglen) {
-                       /* insert */
                        ceph_decode_need(p, end, pglen*sizeof(u32), bad);
+                       /* removing existing (if any) */
+                       (void) __remove_pg_mapping(&map->pg_temp, pgid);
+                       /* insert */
                        pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS);
                        if (!pg) {
                                err = -ENOMEM;
@@@ -970,7 -967,7 +967,7 @@@ void ceph_calc_file_object_mapping(stru
        objsetno = stripeno / su_per_object;
  
        *ono = objsetno * sc + stripepos;
 -      dout("objset %u * sc %u = ono %u\n", objsetno, sc, (unsigned)*ono);
 +      dout("objset %u * sc %u = ono %u\n", objsetno, sc, (unsigned int)*ono);
  
        /* *oxoff = *off % layout->fl_stripe_unit;  # offset in su */
        t = off;
@@@ -998,12 -995,11 +995,11 @@@ int ceph_calc_object_layout(struct ceph
                            struct ceph_file_layout *fl,
                            struct ceph_osdmap *osdmap)
  {
 -      unsigned num, num_mask;
 +      unsigned int num, num_mask;
        struct ceph_pg pgid;
-       s32 preferred = (s32)le32_to_cpu(fl->fl_pg_preferred);
        int poolid = le32_to_cpu(fl->fl_pg_pool);
        struct ceph_pg_pool_info *pool;
 -      unsigned ps;
 +      unsigned int ps;
  
        BUG_ON(!osdmap);
  
        if (!pool)
                return -EIO;
        ps = ceph_str_hash(pool->v.object_hash, oid, strlen(oid));
-       if (preferred >= 0) {
-               ps += preferred;
-               num = le32_to_cpu(pool->v.lpg_num);
-               num_mask = pool->lpg_num_mask;
-       } else {
-               num = le32_to_cpu(pool->v.pg_num);
-               num_mask = pool->pg_num_mask;
-       }
+       num = le32_to_cpu(pool->v.pg_num);
+       num_mask = pool->pg_num_mask;
  
        pgid.ps = cpu_to_le16(ps);
-       pgid.preferred = cpu_to_le16(preferred);
+       pgid.preferred = cpu_to_le16(-1);
        pgid.pool = fl->fl_pg_pool;
-       if (preferred >= 0)
-               dout("calc_object_layout '%s' pgid %d.%xp%d\n", oid, poolid, ps,
-                    (int)preferred);
-       else
-               dout("calc_object_layout '%s' pgid %d.%x\n", oid, poolid, ps);
+       dout("calc_object_layout '%s' pgid %d.%x\n", oid, poolid, ps);
  
        ol->ol_pgid = pgid;
        ol->ol_stripe_unit = fl->fl_object_stripe_unit;
@@@ -1045,24 -1031,18 +1031,18 @@@ static int *calc_pg_raw(struct ceph_osd
        struct ceph_pg_mapping *pg;
        struct ceph_pg_pool_info *pool;
        int ruleno;
-       unsigned int poolid, ps, pps, t;
-       int preferred;
 -      unsigned poolid, ps, pps, t, r;
++      unsigned int poolid, ps, pps, t, r;
  
        poolid = le32_to_cpu(pgid.pool);
        ps = le16_to_cpu(pgid.ps);
-       preferred = (s16)le16_to_cpu(pgid.preferred);
  
        pool = __lookup_pg_pool(&osdmap->pg_pools, poolid);
        if (!pool)
                return NULL;
  
        /* pg_temp? */
-       if (preferred >= 0)
-               t = ceph_stable_mod(ps, le32_to_cpu(pool->v.lpg_num),
-                                   pool->lpgp_num_mask);
-       else
-               t = ceph_stable_mod(ps, le32_to_cpu(pool->v.pg_num),
-                                   pool->pgp_num_mask);
+       t = ceph_stable_mod(ps, le32_to_cpu(pool->v.pg_num),
+                           pool->pgp_num_mask);
        pgid.ps = cpu_to_le16(t);
        pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid);
        if (pg) {
                return NULL;
        }
  
-       /* don't forcefeed bad device ids to crush */
-       if (preferred >= osdmap->max_osd ||
-           preferred >= osdmap->crush->max_devices)
-               preferred = -1;
-       if (preferred >= 0)
-               pps = ceph_stable_mod(ps,
-                                     le32_to_cpu(pool->v.lpgp_num),
-                                     pool->lpgp_num_mask);
-       else
-               pps = ceph_stable_mod(ps,
-                                     le32_to_cpu(pool->v.pgp_num),
-                                     pool->pgp_num_mask);
+       pps = ceph_stable_mod(ps,
+                             le32_to_cpu(pool->v.pgp_num),
+                             pool->pgp_num_mask);
        pps += poolid;
-       *num = crush_do_rule(osdmap->crush, ruleno, pps, osds,
-                            min_t(int, pool->v.size, *num),
-                            preferred, osdmap->osd_weight);
+       r = crush_do_rule(osdmap->crush, ruleno, pps, osds,
+                         min_t(int, pool->v.size, *num),
+                         osdmap->osd_weight);
+       if (r < 0) {
+               pr_err("error %d from crush rule: pool %d ruleset %d type %d"
+                      " size %d\n", r, poolid, pool->v.crush_ruleset,
+                      pool->v.type, pool->v.size);
+               return NULL;
+       }
+       *num = r;
        return osds;
  }