Merge tag 'nfs-for-4.14-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs
Author:     Linus Torvalds <torvalds@linux-foundation.org>
AuthorDate: Tue, 12 Sep 2017 05:01:44 +0000 (22:01 -0700)
Commit:     Linus Torvalds <torvalds@linux-foundation.org>
CommitDate: Tue, 12 Sep 2017 05:01:44 +0000 (22:01 -0700)
Pull NFS client updates from Trond Myklebust:
 "Hightlights include:

  Stable bugfixes:
   - Fix mirror allocation in the writeback code to avoid a use after
     free
   - Fix the O_DSYNC writes to use the correct byte range
   - Fix 2 use after free issues in the I/O code

  Features:
   - Writeback fixes to split up the inode->i_lock in order to reduce
     contention (a sketch of the new locking scheme follows this
     message)
   - RPC client receive fixes to reduce the amount of time the
     xprt->transport_lock is held when receiving data from a socket into
     an XDR buffer.
   - Similar fixes to reduce contention between call-side users of the
     rdma rb_lock and its use in rpcrdma_reply_handler.
   - Re-arrange rdma stats to reduce false cacheline sharing.
   - Various rdma cleanups and optimisations.
   - Refactor and clean up the NFSv4.1 exchange_id code.
   - Const-ify all instances of struct rpc_xprt_ops

  Bugfixes:
   - Fix the NFSv2 'sec=' mount option.
   - NFSv4.1: don't use machine credentials for CLOSE when using
     'sec=sys'
   - Fix the NFSv3 GRANT callback when the port changes on the server.
   - Fix livelock issues with COMMIT
   - NFSv4: Use correct inode in _nfs4_opendata_to_nfs4_state() when
     doing an NFSv4.1 open by filehandle"
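
  The i_lock splitting mentioned above boils down to two replacements:
  a lock-free counter for outstanding page requests, and a dedicated
  per-inode mutex for the commit lists. A minimal sketch of the new
  scheme, using only the field and helper names that appear in the
  diffs below (the real struct nfs_inode carries many more members):

      /* Per-inode writeback state after this series (sketch) */
      struct nfs_inode {
              atomic_long_t nrequests;     /* outstanding page requests */
              struct mutex commit_mutex;   /* guards the commit lists */
              /* ... */
      };

      /* Hot path: no spinlock needed to account a new request */
      atomic_long_inc(&NFS_I(inode)->nrequests);

      /* Commit-list walks take the mutex instead of inode->i_lock,
       * and so now run in sleepable context */
      mutex_lock(&NFS_I(inode)->commit_mutex);
      nfs_scan_commit_list(&cinfo->mds->list, list, cinfo, 0);
      mutex_unlock(&NFS_I(inode)->commit_mutex);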

* tag 'nfs-for-4.14-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs: (69 commits)
  NFS: Count the bytes of skipped subrequests in nfs_lock_and_join_requests()
  NFS: Don't hold the group lock when calling nfs_release_request()
  NFS: Remove pnfs_generic_transfer_commit_list()
  NFS: nfs_lock_and_join_requests and nfs_scan_commit_list can deadlock
  NFS: Fix 2 use after free issues in the I/O code
  NFS: Sync the correct byte range during synchronous writes
  lockd: Delete an error message for a failed memory allocation in reclaimer()
  NFS: remove jiffies field from access cache
  NFS: flush data when locking a file to ensure cache coherence for mmap.
  SUNRPC: remove some dead code.
  NFS: don't expect errors from mempool_alloc().
  xprtrdma: Use xprt_pin_rqst in rpcrdma_reply_handler
  xprtrdma: Re-arrange struct rx_stats
  NFS: Fix NFSv2 security settings
  NFSv4.1: don't use machine credentials for CLOSE when using 'sec=sys'
  SUNRPC: ECONNREFUSED should cause a rebind.
  NFS: Remove unused parameter gfp_flags from nfs_pageio_init()
  NFSv4: Fix up mirror allocation
  SUNRPC: Add a separate spinlock to protect the RPC request receive list
  SUNRPC: Cleanup xs_tcp_read_common()
  ...

36 files changed:
fs/lockd/clntlock.c
fs/nfs/callback_proc.c
fs/nfs/delegation.c
fs/nfs/dir.c
fs/nfs/direct.c
fs/nfs/file.c
fs/nfs/inode.c
fs/nfs/internal.h
fs/nfs/nfs4_fs.h
fs/nfs/nfs4proc.c
fs/nfs/pagelist.c
fs/nfs/pnfs.c
fs/nfs/pnfs.h
fs/nfs/pnfs_nfs.c
fs/nfs/read.c
fs/nfs/super.c
fs/nfs/write.c
include/linux/nfs_fs.h
include/linux/nfs_page.h
include/linux/nfs_xdr.h
include/linux/sunrpc/sched.h
include/linux/sunrpc/xdr.h
include/linux/sunrpc/xprt.h
net/sunrpc/backchannel_rqst.c
net/sunrpc/clnt.c
net/sunrpc/svcsock.c
net/sunrpc/xprt.c
net/sunrpc/xprtrdma/backchannel.c
net/sunrpc/xprtrdma/fmr_ops.c
net/sunrpc/xprtrdma/frwr_ops.c
net/sunrpc/xprtrdma/rpc_rdma.c
net/sunrpc/xprtrdma/svc_rdma_backchannel.c
net/sunrpc/xprtrdma/transport.c
net/sunrpc/xprtrdma/verbs.c
net/sunrpc/xprtrdma/xprt_rdma.h
net/sunrpc/xprtsock.c

diff --git a/fs/lockd/clntlock.c b/fs/lockd/clntlock.c
index 27d577dbe51a48f21950a63758e76783fed67eca..96c1d14c18f12fd6d61e74344c6d28d3e6cf2730 100644
@@ -235,12 +235,8 @@ reclaimer(void *ptr)
        struct net *net = host->net;
 
        req = kmalloc(sizeof(*req), GFP_KERNEL);
-       if (!req) {
-               printk(KERN_ERR "lockd: reclaimer unable to alloc memory."
-                               " Locks for %s won't be reclaimed!\n",
-                               host->h_name);
+       if (!req)
                return 0;
-       }
 
        allow_signal(SIGKILL);
 
diff --git a/fs/nfs/callback_proc.c b/fs/nfs/callback_proc.c
index 5427cdf04c5a1611934848c78455b299412dd185..14358de173fb9d851433977a0e70b8dcff9f0ff3 100644
@@ -51,7 +51,7 @@ __be32 nfs4_callback_getattr(void *argp, void *resp,
                goto out_iput;
        res->size = i_size_read(inode);
        res->change_attr = delegation->change_attr;
-       if (nfsi->nrequests != 0)
+       if (nfs_have_writebacks(inode))
                res->change_attr++;
        res->ctime = inode->i_ctime;
        res->mtime = inode->i_mtime;
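
nfs_have_writebacks() above replaces open-coded tests of
nfsi->nrequests. Given the atomic_long conversion visible in the
delegation.c and inode.c hunks below, its definition is presumably
equivalent to this sketch (the real helper lives in
include/linux/nfs_fs.h, whose hunk falls outside this excerpt):

    static inline bool nfs_have_writebacks(struct inode *inode)
    {
            return atomic_long_read(&NFS_I(inode)->nrequests) != 0;
    }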
diff --git a/fs/nfs/delegation.c b/fs/nfs/delegation.c
index d7df5e67b0c1560ff3ea93cccd6a4b21e066cd88..606dd3871f66b0881c2760af3bbc3839748f9bc2 100644
@@ -1089,7 +1089,7 @@ bool nfs4_delegation_flush_on_close(const struct inode *inode)
        delegation = rcu_dereference(nfsi->delegation);
        if (delegation == NULL || !(delegation->type & FMODE_WRITE))
                goto out;
-       if (nfsi->nrequests < delegation->pagemod_limit)
+       if (atomic_long_read(&nfsi->nrequests) < delegation->pagemod_limit)
                ret = false;
 out:
        rcu_read_unlock();
diff --git a/fs/nfs/dir.c b/fs/nfs/dir.c
index 3522b1249019ce261db090af01baf9525302f10d..5ceaeb1f6fb69d8ce15e932ff278a70837a48a30 100644
@@ -2260,7 +2260,6 @@ static int nfs_access_get_cached(struct inode *inode, struct rpc_cred *cred, str
                spin_lock(&inode->i_lock);
                retry = false;
        }
-       res->jiffies = cache->jiffies;
        res->cred = cache->cred;
        res->mask = cache->mask;
        list_move_tail(&cache->lru, &nfsi->access_cache_entry_lru);
@@ -2296,7 +2295,6 @@ static int nfs_access_get_cached_rcu(struct inode *inode, struct rpc_cred *cred,
                goto out;
        if (nfs_check_cache_invalid(inode, NFS_INO_INVALID_ACCESS))
                goto out;
-       res->jiffies = cache->jiffies;
        res->cred = cache->cred;
        res->mask = cache->mask;
        err = 0;
@@ -2344,7 +2342,6 @@ void nfs_access_add_cache(struct inode *inode, struct nfs_access_entry *set)
        if (cache == NULL)
                return;
        RB_CLEAR_NODE(&cache->rb_node);
-       cache->jiffies = set->jiffies;
        cache->cred = get_rpccred(set->cred);
        cache->mask = set->mask;
 
@@ -2432,7 +2429,6 @@ static int nfs_do_access(struct inode *inode, struct rpc_cred *cred, int mask)
        cache.mask = NFS_MAY_LOOKUP | NFS_MAY_EXECUTE
                     | NFS_MAY_WRITE | NFS_MAY_READ;
        cache.cred = cred;
-       cache.jiffies = jiffies;
        status = NFS_PROTO(inode)->access(inode, &cache);
        if (status != 0) {
                if (status == -ESTALE) {
diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
index 6fb9fad2d1e6cf6909cfe6dfb2e482bff969e7df..d2972d5374695050cdc3598f6488510af87b3379 100644
@@ -616,13 +616,13 @@ nfs_direct_write_scan_commit_list(struct inode *inode,
                                  struct list_head *list,
                                  struct nfs_commit_info *cinfo)
 {
-       spin_lock(&cinfo->inode->i_lock);
+       mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
 #ifdef CONFIG_NFS_V4_1
        if (cinfo->ds != NULL && cinfo->ds->nwritten != 0)
                NFS_SERVER(inode)->pnfs_curr_ld->recover_commit_reqs(list, cinfo);
 #endif
        nfs_scan_commit_list(&cinfo->mds->list, list, cinfo, 0);
-       spin_unlock(&cinfo->inode->i_lock);
+       mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
 }
 
 static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
diff --git a/fs/nfs/file.c b/fs/nfs/file.c
index af330c31f62752f22c6fe1dc6de5c57f0e02a3d3..a385d1c3f1465a619ad71adea85a01ac4c1ecd13 100644
@@ -631,11 +631,11 @@ ssize_t nfs_file_write(struct kiocb *iocb, struct iov_iter *from)
        if (result <= 0)
                goto out;
 
-       result = generic_write_sync(iocb, result);
-       if (result < 0)
-               goto out;
        written = result;
        iocb->ki_pos += written;
+       result = generic_write_sync(iocb, written);
+       if (result < 0)
+               goto out;
 
        /* Return error values */
        if (nfs_need_check_write(file, inode)) {
@@ -744,15 +744,18 @@ do_setlk(struct file *filp, int cmd, struct file_lock *fl, int is_local)
                goto out;
 
        /*
-        * Revalidate the cache if the server has time stamps granular
-        * enough to detect subsecond changes.  Otherwise, clear the
-        * cache to prevent missing any changes.
+        * Invalidate cache to prevent missing any changes.  If
+        * the file is mapped, clear the page cache as well so
+        * those mappings will be loaded.
         *
         * This makes locking act as a cache coherency point.
         */
        nfs_sync_mapping(filp->f_mapping);
-       if (!NFS_PROTO(inode)->have_delegation(inode, FMODE_READ))
+       if (!NFS_PROTO(inode)->have_delegation(inode, FMODE_READ)) {
                nfs_zap_caches(inode);
+               if (mapping_mapped(filp->f_mapping))
+                       nfs_revalidate_mapping(inode, filp->f_mapping);
+       }
 out:
        return status;
 }
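
From userspace, the do_setlk() change means taking a POSIX lock is now
a cache-coherency point for mmap()ed files too: unless the client
holds a read delegation, NFS flushes dirty pages and, if the file is
mapped anywhere, revalidates the page cache when the lock is taken. A
hypothetical demonstration (not part of the patch; assumes the file
is at least one page long):

    #include <fcntl.h>
    #include <stdio.h>
    #include <sys/mman.h>
    #include <unistd.h>

    int main(int argc, char **argv)
    {
            struct flock fl = { .l_type = F_RDLCK, .l_whence = SEEK_SET,
                                .l_start = 0, .l_len = 0 /* whole file */ };
            char *p;
            int fd;

            if (argc < 2 || (fd = open(argv[1], O_RDONLY)) < 0)
                    return 1;
            if (fcntl(fd, F_SETLKW, &fl) < 0)   /* coherency point */
                    return 1;
            p = mmap(NULL, 4096, PROT_READ, MAP_SHARED, fd, 0);
            if (p != MAP_FAILED)    /* sees pre-lock server-side writes */
                    printf("first byte under lock: 0x%02x\n",
                           (unsigned char)p[0]);
            return 0;
    }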
diff --git a/fs/nfs/inode.c b/fs/nfs/inode.c
index 109279d6d91bd61421ef572bbcaa1dfe9fa0e9c8..134d9f560240f3ba1183a9ab4173a81ff21f4b63 100644
@@ -1285,7 +1285,6 @@ static bool nfs_file_has_buffered_writers(struct nfs_inode *nfsi)
 
 static unsigned long nfs_wcc_update_inode(struct inode *inode, struct nfs_fattr *fattr)
 {
-       struct nfs_inode *nfsi = NFS_I(inode);
        unsigned long ret = 0;
 
        if ((fattr->valid & NFS_ATTR_FATTR_PRECHANGE)
@@ -1315,7 +1314,7 @@ static unsigned long nfs_wcc_update_inode(struct inode *inode, struct nfs_fattr
        if ((fattr->valid & NFS_ATTR_FATTR_PRESIZE)
                        && (fattr->valid & NFS_ATTR_FATTR_SIZE)
                        && i_size_read(inode) == nfs_size_to_loff_t(fattr->pre_size)
-                       && nfsi->nrequests == 0) {
+                       && !nfs_have_writebacks(inode)) {
                i_size_write(inode, nfs_size_to_loff_t(fattr->size));
                ret |= NFS_INO_INVALID_ATTR;
        }
@@ -1823,7 +1822,7 @@ static int nfs_update_inode(struct inode *inode, struct nfs_fattr *fattr)
                if (new_isize != cur_isize) {
                        /* Do we perhaps have any outstanding writes, or has
                         * the file grown beyond our last write? */
-                       if (nfsi->nrequests == 0 || new_isize > cur_isize) {
+                       if (!nfs_have_writebacks(inode) || new_isize > cur_isize) {
                                i_size_write(inode, new_isize);
                                if (!have_writers)
                                        invalid |= NFS_INO_INVALID_ATTR|NFS_INO_INVALID_DATA;
@@ -2012,10 +2011,11 @@ static void init_once(void *foo)
        INIT_LIST_HEAD(&nfsi->access_cache_entry_lru);
        INIT_LIST_HEAD(&nfsi->access_cache_inode_lru);
        INIT_LIST_HEAD(&nfsi->commit_info.list);
-       nfsi->nrequests = 0;
-       nfsi->commit_info.ncommit = 0;
+       atomic_long_set(&nfsi->nrequests, 0);
+       atomic_long_set(&nfsi->commit_info.ncommit, 0);
        atomic_set(&nfsi->commit_info.rpcs_out, 0);
        init_rwsem(&nfsi->rmdir_sem);
+       mutex_init(&nfsi->commit_mutex);
        nfs4_init_once(nfsi);
 }
 
diff --git a/fs/nfs/internal.h b/fs/nfs/internal.h
index dc456416d2becc960678625da5712a9992595bc4..68cc22083639bb8c90ec8829a713c27a55eef75f 100644
@@ -251,7 +251,6 @@ int nfs_iocounter_wait(struct nfs_lock_context *l_ctx);
 extern const struct nfs_pageio_ops nfs_pgio_rw_ops;
 struct nfs_pgio_header *nfs_pgio_header_alloc(const struct nfs_rw_ops *);
 void nfs_pgio_header_free(struct nfs_pgio_header *);
-void nfs_pgio_data_destroy(struct nfs_pgio_header *);
 int nfs_generic_pgio(struct nfs_pageio_descriptor *, struct nfs_pgio_header *);
 int nfs_initiate_pgio(struct rpc_clnt *clnt, struct nfs_pgio_header *hdr,
                      struct rpc_cred *cred, const struct nfs_rpc_ops *rpc_ops,
diff --git a/fs/nfs/nfs4_fs.h b/fs/nfs/nfs4_fs.h
index 40bd05f05e743f01228d09b7d9d23d8fbe7bb213..ac4f10b7f6c1beafc5027ad02a53b33e091f32d0 100644
@@ -303,6 +303,17 @@ _nfs4_state_protect(struct nfs_client *clp, unsigned long sp4_mode,
        struct rpc_cred *newcred = NULL;
        rpc_authflavor_t flavor;
 
+       if (sp4_mode == NFS_SP4_MACH_CRED_CLEANUP ||
+           sp4_mode == NFS_SP4_MACH_CRED_PNFS_CLEANUP) {
+               /* Using machine creds for cleanup operations
+                * is only relevent if the client credentials
+                * is only relevant if the client credentials
+                * RPC_AUTH_UNIX.  If file was only exported to
+                * sec=sys, the PUTFH would fail anyway.
+                */
+               if ((*clntp)->cl_auth->au_flavor == RPC_AUTH_UNIX)
+                       return false;
+       }
        if (test_bit(sp4_mode, &clp->cl_sp4_flags)) {
                spin_lock(&clp->cl_lock);
                if (clp->cl_machine_cred != NULL)
diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index d901326423401c3e7d442f62bfa80d03d281ed02..6c61e2b996351cde05b1c922674cd3a2670913ea 100644
@@ -1659,12 +1659,52 @@ update:
        return state;
 }
 
+static struct inode *
+nfs4_opendata_get_inode(struct nfs4_opendata *data)
+{
+       struct inode *inode;
+
+       switch (data->o_arg.claim) {
+       case NFS4_OPEN_CLAIM_NULL:
+       case NFS4_OPEN_CLAIM_DELEGATE_CUR:
+       case NFS4_OPEN_CLAIM_DELEGATE_PREV:
+               if (!(data->f_attr.valid & NFS_ATTR_FATTR))
+                       return ERR_PTR(-EAGAIN);
+               inode = nfs_fhget(data->dir->d_sb, &data->o_res.fh,
+                               &data->f_attr, data->f_label);
+               break;
+       default:
+               inode = d_inode(data->dentry);
+               ihold(inode);
+               nfs_refresh_inode(inode, &data->f_attr);
+       }
+       return inode;
+}
+
 static struct nfs4_state *
-_nfs4_opendata_to_nfs4_state(struct nfs4_opendata *data)
+nfs4_opendata_find_nfs4_state(struct nfs4_opendata *data)
 {
+       struct nfs4_state *state;
        struct inode *inode;
-       struct nfs4_state *state = NULL;
-       int ret;
+
+       inode = nfs4_opendata_get_inode(data);
+       if (IS_ERR(inode))
+               return ERR_CAST(inode);
+       if (data->state != NULL && data->state->inode == inode) {
+               state = data->state;
+               atomic_inc(&state->count);
+       } else
+               state = nfs4_get_open_state(inode, data->owner);
+       iput(inode);
+       if (state == NULL)
+               state = ERR_PTR(-ENOMEM);
+       return state;
+}
+
+static struct nfs4_state *
+_nfs4_opendata_to_nfs4_state(struct nfs4_opendata *data)
+{
+       struct nfs4_state *state;
 
        if (!data->rpc_done) {
                state = nfs4_try_open_cached(data);
@@ -1672,29 +1712,17 @@ _nfs4_opendata_to_nfs4_state(struct nfs4_opendata *data)
                goto out;
        }
 
-       ret = -EAGAIN;
-       if (!(data->f_attr.valid & NFS_ATTR_FATTR))
-               goto err;
-       inode = nfs_fhget(data->dir->d_sb, &data->o_res.fh, &data->f_attr, data->f_label);
-       ret = PTR_ERR(inode);
-       if (IS_ERR(inode))
-               goto err;
-       ret = -ENOMEM;
-       state = nfs4_get_open_state(inode, data->owner);
-       if (state == NULL)
-               goto err_put_inode;
+       state = nfs4_opendata_find_nfs4_state(data);
+       if (IS_ERR(state))
+               goto out;
+
        if (data->o_res.delegation_type != 0)
                nfs4_opendata_check_deleg(data, state);
        update_open_stateid(state, &data->o_res.stateid, NULL,
                        data->o_arg.fmode);
-       iput(inode);
 out:
        nfs_release_seqid(data->o_arg.seqid);
        return state;
-err_put_inode:
-       iput(inode);
-err:
-       return ERR_PTR(ret);
 }
 
 static struct nfs4_state *
@@ -2071,7 +2099,6 @@ static void nfs4_open_prepare(struct rpc_task *task, void *calldata)
                data->o_arg.open_bitmap = &nfs4_open_noattr_bitmap[0];
        case NFS4_OPEN_CLAIM_FH:
                task->tk_msg.rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_OPEN_NOATTR];
-               nfs_copy_fh(&data->o_res.fh, data->o_arg.fh);
        }
        data->timestamp = jiffies;
        if (nfs4_setup_sequence(data->o_arg.server->nfs_client,
@@ -2258,7 +2285,6 @@ static int nfs4_opendata_access(struct rpc_cred *cred,
                mask = NFS4_ACCESS_READ;
 
        cache.cred = cred;
-       cache.jiffies = jiffies;
        nfs_access_set_mask(&cache, opendata->o_res.access_result);
        nfs_access_add_cache(state->inode, &cache);
 
@@ -7318,7 +7344,9 @@ static int nfs4_sp4_select_mode(struct nfs_client *clp,
                      1 << (OP_DESTROY_SESSION - 32) |
                      1 << (OP_DESTROY_CLIENTID - 32)
        };
+       unsigned long flags = 0;
        unsigned int i;
+       int ret = 0;
 
        if (sp->how == SP4_MACH_CRED) {
                /* Print state protect result */
@@ -7334,7 +7362,8 @@ static int nfs4_sp4_select_mode(struct nfs_client *clp,
                for (i = 0; i < NFS4_OP_MAP_NUM_WORDS; i++) {
                        if (sp->enforce.u.words[i] & ~supported_enforce[i]) {
                                dfprintk(MOUNT, "sp4_mach_cred: disabled\n");
-                               return -EINVAL;
+                               ret = -EINVAL;
+                               goto out;
                        }
                }
 
@@ -7353,10 +7382,11 @@ static int nfs4_sp4_select_mode(struct nfs_client *clp,
                    test_bit(OP_DESTROY_CLIENTID, sp->enforce.u.longs)) {
                        dfprintk(MOUNT, "sp4_mach_cred:\n");
                        dfprintk(MOUNT, "  minimal mode enabled\n");
-                       set_bit(NFS_SP4_MACH_CRED_MINIMAL, &clp->cl_sp4_flags);
+                       __set_bit(NFS_SP4_MACH_CRED_MINIMAL, &flags);
                } else {
                        dfprintk(MOUNT, "sp4_mach_cred: disabled\n");
-                       return -EINVAL;
+                       ret = -EINVAL;
+                       goto out;
                }
 
                if (test_bit(OP_CLOSE, sp->allow.u.longs) &&
@@ -7364,110 +7394,46 @@ static int nfs4_sp4_select_mode(struct nfs_client *clp,
                    test_bit(OP_DELEGRETURN, sp->allow.u.longs) &&
                    test_bit(OP_LOCKU, sp->allow.u.longs)) {
                        dfprintk(MOUNT, "  cleanup mode enabled\n");
-                       set_bit(NFS_SP4_MACH_CRED_CLEANUP, &clp->cl_sp4_flags);
+                       __set_bit(NFS_SP4_MACH_CRED_CLEANUP, &flags);
                }
 
                if (test_bit(OP_LAYOUTRETURN, sp->allow.u.longs)) {
                        dfprintk(MOUNT, "  pnfs cleanup mode enabled\n");
-                       set_bit(NFS_SP4_MACH_CRED_PNFS_CLEANUP,
-                               &clp->cl_sp4_flags);
+                       __set_bit(NFS_SP4_MACH_CRED_PNFS_CLEANUP, &flags);
                }
 
                if (test_bit(OP_SECINFO, sp->allow.u.longs) &&
                    test_bit(OP_SECINFO_NO_NAME, sp->allow.u.longs)) {
                        dfprintk(MOUNT, "  secinfo mode enabled\n");
-                       set_bit(NFS_SP4_MACH_CRED_SECINFO, &clp->cl_sp4_flags);
+                       __set_bit(NFS_SP4_MACH_CRED_SECINFO, &flags);
                }
 
                if (test_bit(OP_TEST_STATEID, sp->allow.u.longs) &&
                    test_bit(OP_FREE_STATEID, sp->allow.u.longs)) {
                        dfprintk(MOUNT, "  stateid mode enabled\n");
-                       set_bit(NFS_SP4_MACH_CRED_STATEID, &clp->cl_sp4_flags);
+                       __set_bit(NFS_SP4_MACH_CRED_STATEID, &flags);
                }
 
                if (test_bit(OP_WRITE, sp->allow.u.longs)) {
                        dfprintk(MOUNT, "  write mode enabled\n");
-                       set_bit(NFS_SP4_MACH_CRED_WRITE, &clp->cl_sp4_flags);
+                       __set_bit(NFS_SP4_MACH_CRED_WRITE, &flags);
                }
 
                if (test_bit(OP_COMMIT, sp->allow.u.longs)) {
                        dfprintk(MOUNT, "  commit mode enabled\n");
-                       set_bit(NFS_SP4_MACH_CRED_COMMIT, &clp->cl_sp4_flags);
+                       __set_bit(NFS_SP4_MACH_CRED_COMMIT, &flags);
                }
        }
-
+out:
+       clp->cl_sp4_flags = flags;
        return 0;
 }
 
 struct nfs41_exchange_id_data {
        struct nfs41_exchange_id_res res;
        struct nfs41_exchange_id_args args;
-       struct rpc_xprt *xprt;
-       int rpc_status;
 };
 
-static void nfs4_exchange_id_done(struct rpc_task *task, void *data)
-{
-       struct nfs41_exchange_id_data *cdata =
-                                       (struct nfs41_exchange_id_data *)data;
-       struct nfs_client *clp = cdata->args.client;
-       int status = task->tk_status;
-
-       trace_nfs4_exchange_id(clp, status);
-
-       if (status == 0)
-               status = nfs4_check_cl_exchange_flags(cdata->res.flags);
-
-       if (cdata->xprt && status == 0) {
-               status = nfs4_detect_session_trunking(clp, &cdata->res,
-                                                     cdata->xprt);
-               goto out;
-       }
-
-       if (status  == 0)
-               status = nfs4_sp4_select_mode(clp, &cdata->res.state_protect);
-
-       if (status == 0) {
-               clp->cl_clientid = cdata->res.clientid;
-               clp->cl_exchange_flags = cdata->res.flags;
-               clp->cl_seqid = cdata->res.seqid;
-               /* Client ID is not confirmed */
-               if (!(cdata->res.flags & EXCHGID4_FLAG_CONFIRMED_R))
-                       clear_bit(NFS4_SESSION_ESTABLISHED,
-                                 &clp->cl_session->session_state);
-
-               kfree(clp->cl_serverowner);
-               clp->cl_serverowner = cdata->res.server_owner;
-               cdata->res.server_owner = NULL;
-
-               /* use the most recent implementation id */
-               kfree(clp->cl_implid);
-               clp->cl_implid = cdata->res.impl_id;
-               cdata->res.impl_id = NULL;
-
-               if (clp->cl_serverscope != NULL &&
-                   !nfs41_same_server_scope(clp->cl_serverscope,
-                                       cdata->res.server_scope)) {
-                       dprintk("%s: server_scope mismatch detected\n",
-                               __func__);
-                       set_bit(NFS4CLNT_SERVER_SCOPE_MISMATCH, &clp->cl_state);
-                       kfree(clp->cl_serverscope);
-                       clp->cl_serverscope = NULL;
-               }
-
-               if (clp->cl_serverscope == NULL) {
-                       clp->cl_serverscope = cdata->res.server_scope;
-                       cdata->res.server_scope = NULL;
-               }
-               /* Save the EXCHANGE_ID verifier session trunk tests */
-               memcpy(clp->cl_confirm.data, cdata->args.verifier.data,
-                      sizeof(clp->cl_confirm.data));
-       }
-out:
-       cdata->rpc_status = status;
-       return;
-}
-
 static void nfs4_exchange_id_release(void *data)
 {
        struct nfs41_exchange_id_data *cdata =
@@ -7481,7 +7447,6 @@ static void nfs4_exchange_id_release(void *data)
 }
 
 static const struct rpc_call_ops nfs4_exchange_id_call_ops = {
-       .rpc_call_done = nfs4_exchange_id_done,
        .rpc_release = nfs4_exchange_id_release,
 };
 
@@ -7490,7 +7455,8 @@ static const struct rpc_call_ops nfs4_exchange_id_call_ops = {
  *
  * Wrapper for EXCHANGE_ID operation.
  */
-static int _nfs4_proc_exchange_id(struct nfs_client *clp, struct rpc_cred *cred,
+static struct rpc_task *
+nfs4_run_exchange_id(struct nfs_client *clp, struct rpc_cred *cred,
                        u32 sp4_how, struct rpc_xprt *xprt)
 {
        struct rpc_message msg = {
@@ -7504,17 +7470,15 @@ static int _nfs4_proc_exchange_id(struct nfs_client *clp, struct rpc_cred *cred,
                .flags = RPC_TASK_TIMEOUT,
        };
        struct nfs41_exchange_id_data *calldata;
-       struct rpc_task *task;
        int status;
 
        if (!atomic_inc_not_zero(&clp->cl_count))
-               return -EIO;
+               return ERR_PTR(-EIO);
 
+       status = -ENOMEM;
        calldata = kzalloc(sizeof(*calldata), GFP_NOFS);
-       if (!calldata) {
-               nfs_put_client(clp);
-               return -ENOMEM;
-       }
+       if (!calldata)
+               goto out;
 
        nfs4_init_boot_verifier(clp, &calldata->args.verifier);
 
@@ -7553,34 +7517,22 @@ static int _nfs4_proc_exchange_id(struct nfs_client *clp, struct rpc_cred *cred,
                goto out_impl_id;
        }
        if (xprt) {
-               calldata->xprt = xprt;
                task_setup_data.rpc_xprt = xprt;
                task_setup_data.flags |= RPC_TASK_SOFTCONN;
                memcpy(calldata->args.verifier.data, clp->cl_confirm.data,
                                sizeof(calldata->args.verifier.data));
        }
        calldata->args.client = clp;
-#ifdef CONFIG_NFS_V4_1_MIGRATION
        calldata->args.flags = EXCHGID4_FLAG_SUPP_MOVED_REFER |
-       EXCHGID4_FLAG_BIND_PRINC_STATEID |
-       EXCHGID4_FLAG_SUPP_MOVED_MIGR,
-#else
-       calldata->args.flags = EXCHGID4_FLAG_SUPP_MOVED_REFER |
-       EXCHGID4_FLAG_BIND_PRINC_STATEID,
+       EXCHGID4_FLAG_BIND_PRINC_STATEID;
+#ifdef CONFIG_NFS_V4_1_MIGRATION
+       calldata->args.flags |= EXCHGID4_FLAG_SUPP_MOVED_MIGR;
 #endif
        msg.rpc_argp = &calldata->args;
        msg.rpc_resp = &calldata->res;
        task_setup_data.callback_data = calldata;
 
-       task = rpc_run_task(&task_setup_data);
-       if (IS_ERR(task))
-               return PTR_ERR(task);
-
-       status = calldata->rpc_status;
-
-       rpc_put_task(task);
-out:
-       return status;
+       return rpc_run_task(&task_setup_data);
 
 out_impl_id:
        kfree(calldata->res.impl_id);
@@ -7590,8 +7542,69 @@ out_server_owner:
        kfree(calldata->res.server_owner);
 out_calldata:
        kfree(calldata);
+out:
        nfs_put_client(clp);
-       goto out;
+       return ERR_PTR(status);
+}
+
+/*
+ * _nfs4_proc_exchange_id()
+ *
+ * Wrapper for EXCHANGE_ID operation.
+ */
+static int _nfs4_proc_exchange_id(struct nfs_client *clp, struct rpc_cred *cred,
+                       u32 sp4_how)
+{
+       struct rpc_task *task;
+       struct nfs41_exchange_id_args *argp;
+       struct nfs41_exchange_id_res *resp;
+       int status;
+
+       task = nfs4_run_exchange_id(clp, cred, sp4_how, NULL);
+       if (IS_ERR(task))
+               return PTR_ERR(task);
+
+       argp = task->tk_msg.rpc_argp;
+       resp = task->tk_msg.rpc_resp;
+       status = task->tk_status;
+       if (status  != 0)
+               goto out;
+
+       status = nfs4_check_cl_exchange_flags(resp->flags);
+       if (status  != 0)
+               goto out;
+
+       status = nfs4_sp4_select_mode(clp, &resp->state_protect);
+       if (status != 0)
+               goto out;
+
+       clp->cl_clientid = resp->clientid;
+       clp->cl_exchange_flags = resp->flags;
+       clp->cl_seqid = resp->seqid;
+       /* Client ID is not confirmed */
+       if (!(resp->flags & EXCHGID4_FLAG_CONFIRMED_R))
+               clear_bit(NFS4_SESSION_ESTABLISHED,
+                         &clp->cl_session->session_state);
+
+       if (clp->cl_serverscope != NULL &&
+           !nfs41_same_server_scope(clp->cl_serverscope,
+                               resp->server_scope)) {
+               dprintk("%s: server_scope mismatch detected\n",
+                       __func__);
+               set_bit(NFS4CLNT_SERVER_SCOPE_MISMATCH, &clp->cl_state);
+       }
+
+       swap(clp->cl_serverowner, resp->server_owner);
+       swap(clp->cl_serverscope, resp->server_scope);
+       swap(clp->cl_implid, resp->impl_id);
+
+       /* Save the EXCHANGE_ID verifier session trunk tests */
+       memcpy(clp->cl_confirm.data, argp->verifier.data,
+              sizeof(clp->cl_confirm.data));
+out:
+       trace_nfs4_exchange_id(clp, status);
+       rpc_put_task(task);
+       return status;
 }
 
 /*
@@ -7614,13 +7627,13 @@ int nfs4_proc_exchange_id(struct nfs_client *clp, struct rpc_cred *cred)
        /* try SP4_MACH_CRED if krb5i/p */
        if (authflavor == RPC_AUTH_GSS_KRB5I ||
            authflavor == RPC_AUTH_GSS_KRB5P) {
-               status = _nfs4_proc_exchange_id(clp, cred, SP4_MACH_CRED, NULL);
+               status = _nfs4_proc_exchange_id(clp, cred, SP4_MACH_CRED);
                if (!status)
                        return 0;
        }
 
        /* try SP4_NONE */
-       return _nfs4_proc_exchange_id(clp, cred, SP4_NONE, NULL);
+       return _nfs4_proc_exchange_id(clp, cred, SP4_NONE);
 }
 
 /**
@@ -7642,6 +7655,9 @@ int nfs4_test_session_trunk(struct rpc_clnt *clnt, struct rpc_xprt *xprt,
                            void *data)
 {
        struct nfs4_add_xprt_data *adata = (struct nfs4_add_xprt_data *)data;
+       struct rpc_task *task;
+       int status;
+
        u32 sp4_how;
 
        dprintk("--> %s try %s\n", __func__,
@@ -7650,7 +7666,17 @@ int nfs4_test_session_trunk(struct rpc_clnt *clnt, struct rpc_xprt *xprt,
        sp4_how = (adata->clp->cl_sp4_flags == 0 ? SP4_NONE : SP4_MACH_CRED);
 
        /* Test connection for session trunking. Async exchange_id call */
-       return  _nfs4_proc_exchange_id(adata->clp, adata->cred, sp4_how, xprt);
+       task = nfs4_run_exchange_id(adata->clp, adata->cred, sp4_how, xprt);
+       if (IS_ERR(task))
+               return PTR_ERR(task);
+
+       status = task->tk_status;
+       if (status == 0)
+               status = nfs4_detect_session_trunking(adata->clp,
+                               task->tk_msg.rpc_resp, xprt);
+
+       rpc_put_task(task);
+       return status;
 }
 EXPORT_SYMBOL_GPL(nfs4_test_session_trunk);
 
diff --git a/fs/nfs/pagelist.c b/fs/nfs/pagelist.c
index de9066a92c0d27cb9e397233fb5b92116c8dc127..bec120ec1967c4f649543852f87b5c3c3e02d8fb 100644
@@ -134,19 +134,14 @@ EXPORT_SYMBOL_GPL(nfs_async_iocounter_wait);
 /*
  * nfs_page_group_lock - lock the head of the page group
  * @req - request in group that is to be locked
- * @nonblock - if true don't block waiting for lock
  *
- * this lock must be held if modifying the page group list
+ * this lock must be held when traversing or modifying the page
+ * group list
  *
- * return 0 on success, < 0 on error: -EDELAY if nonblocking or the
- * result from wait_on_bit_lock
- *
- * NOTE: calling with nonblock=false should always have set the
- *       lock bit (see fs/buffer.c and other uses of wait_on_bit_lock
- *       with TASK_UNINTERRUPTIBLE), so there is no need to check the result.
+ * return 0 on success, < 0 on error
  */
 int
-nfs_page_group_lock(struct nfs_page *req, bool nonblock)
+nfs_page_group_lock(struct nfs_page *req)
 {
        struct nfs_page *head = req->wb_head;
 
@@ -155,35 +150,10 @@ nfs_page_group_lock(struct nfs_page *req, bool nonblock)
        if (!test_and_set_bit(PG_HEADLOCK, &head->wb_flags))
                return 0;
 
-       if (!nonblock) {
-               set_bit(PG_CONTENDED1, &head->wb_flags);
-               smp_mb__after_atomic();
-               return wait_on_bit_lock(&head->wb_flags, PG_HEADLOCK,
-                               TASK_UNINTERRUPTIBLE);
-       }
-
-       return -EAGAIN;
-}
-
-/*
- * nfs_page_group_lock_wait - wait for the lock to clear, but don't grab it
- * @req - a request in the group
- *
- * This is a blocking call to wait for the group lock to be cleared.
- */
-void
-nfs_page_group_lock_wait(struct nfs_page *req)
-{
-       struct nfs_page *head = req->wb_head;
-
-       WARN_ON_ONCE(head != head->wb_head);
-
-       if (!test_bit(PG_HEADLOCK, &head->wb_flags))
-               return;
        set_bit(PG_CONTENDED1, &head->wb_flags);
        smp_mb__after_atomic();
-       wait_on_bit(&head->wb_flags, PG_HEADLOCK,
-               TASK_UNINTERRUPTIBLE);
+       return wait_on_bit_lock(&head->wb_flags, PG_HEADLOCK,
+                               TASK_UNINTERRUPTIBLE);
 }
 
 /*
@@ -246,7 +216,7 @@ bool nfs_page_group_sync_on_bit(struct nfs_page *req, unsigned int bit)
 {
        bool ret;
 
-       nfs_page_group_lock(req, false);
+       nfs_page_group_lock(req);
        ret = nfs_page_group_sync_on_bit_locked(req, bit);
        nfs_page_group_unlock(req);
 
@@ -288,9 +258,7 @@ nfs_page_group_init(struct nfs_page *req, struct nfs_page *prev)
                        inode = page_file_mapping(req->wb_page)->host;
                        set_bit(PG_INODE_REF, &req->wb_flags);
                        kref_get(&req->wb_kref);
-                       spin_lock(&inode->i_lock);
-                       NFS_I(inode)->nrequests++;
-                       spin_unlock(&inode->i_lock);
+                       atomic_long_inc(&NFS_I(inode)->nrequests);
                }
        }
 }
@@ -306,14 +274,11 @@ static void
 nfs_page_group_destroy(struct kref *kref)
 {
        struct nfs_page *req = container_of(kref, struct nfs_page, wb_kref);
+       struct nfs_page *head = req->wb_head;
        struct nfs_page *tmp, *next;
 
-       /* subrequests must release the ref on the head request */
-       if (req->wb_head != req)
-               nfs_release_request(req->wb_head);
-
        if (!nfs_page_group_sync_on_bit(req, PG_TEARDOWN))
-               return;
+               goto out;
 
        tmp = req;
        do {
@@ -324,6 +289,10 @@ nfs_page_group_destroy(struct kref *kref)
                nfs_free_request(tmp);
                tmp = next;
        } while (tmp != req);
+out:
+       /* subrequests must release the ref on the head request */
+       if (head != req)
+               nfs_release_request(head);
 }
 
 /**
@@ -465,6 +434,7 @@ void nfs_release_request(struct nfs_page *req)
 {
        kref_put(&req->wb_kref, nfs_page_group_destroy);
 }
+EXPORT_SYMBOL_GPL(nfs_release_request);
 
 /**
  * nfs_wait_on_request - Wait for a request to complete.
@@ -483,6 +453,7 @@ nfs_wait_on_request(struct nfs_page *req)
        return wait_on_bit_io(&req->wb_flags, PG_BUSY,
                              TASK_UNINTERRUPTIBLE);
 }
+EXPORT_SYMBOL_GPL(nfs_wait_on_request);
 
 /*
  * nfs_generic_pg_test - determine if requests can be coalesced
@@ -530,16 +501,6 @@ struct nfs_pgio_header *nfs_pgio_header_alloc(const struct nfs_rw_ops *ops)
 }
 EXPORT_SYMBOL_GPL(nfs_pgio_header_alloc);
 
-/*
- * nfs_pgio_header_free - Free a read or write header
- * @hdr: The header to free
- */
-void nfs_pgio_header_free(struct nfs_pgio_header *hdr)
-{
-       hdr->rw_ops->rw_free_header(hdr);
-}
-EXPORT_SYMBOL_GPL(nfs_pgio_header_free);
-
 /**
  * nfs_pgio_data_destroy - make @hdr suitable for reuse
  *
@@ -548,14 +509,24 @@ EXPORT_SYMBOL_GPL(nfs_pgio_header_free);
  *
  * @hdr: A header that has had nfs_generic_pgio called
  */
-void nfs_pgio_data_destroy(struct nfs_pgio_header *hdr)
+static void nfs_pgio_data_destroy(struct nfs_pgio_header *hdr)
 {
        if (hdr->args.context)
                put_nfs_open_context(hdr->args.context);
        if (hdr->page_array.pagevec != hdr->page_array.page_array)
                kfree(hdr->page_array.pagevec);
 }
-EXPORT_SYMBOL_GPL(nfs_pgio_data_destroy);
+
+/*
+ * nfs_pgio_header_free - Free a read or write header
+ * @hdr: The header to free
+ */
+void nfs_pgio_header_free(struct nfs_pgio_header *hdr)
+{
+       nfs_pgio_data_destroy(hdr);
+       hdr->rw_ops->rw_free_header(hdr);
+}
+EXPORT_SYMBOL_GPL(nfs_pgio_header_free);
 
 /**
  * nfs_pgio_rpcsetup - Set up arguments for a pageio call
@@ -669,7 +640,6 @@ EXPORT_SYMBOL_GPL(nfs_initiate_pgio);
 static void nfs_pgio_error(struct nfs_pgio_header *hdr)
 {
        set_bit(NFS_IOHDR_REDO, &hdr->flags);
-       nfs_pgio_data_destroy(hdr);
        hdr->completion_ops->completion(hdr);
 }
 
@@ -680,7 +650,6 @@ static void nfs_pgio_error(struct nfs_pgio_header *hdr)
 static void nfs_pgio_release(void *calldata)
 {
        struct nfs_pgio_header *hdr = calldata;
-       nfs_pgio_data_destroy(hdr);
        hdr->completion_ops->completion(hdr);
 }
 
@@ -711,12 +680,8 @@ void nfs_pageio_init(struct nfs_pageio_descriptor *desc,
                     const struct nfs_pgio_completion_ops *compl_ops,
                     const struct nfs_rw_ops *rw_ops,
                     size_t bsize,
-                    int io_flags,
-                    gfp_t gfp_flags)
+                    int io_flags)
 {
-       struct nfs_pgio_mirror *new;
-       int i;
-
        desc->pg_moreio = 0;
        desc->pg_inode = inode;
        desc->pg_ops = pg_ops;
@@ -732,23 +697,10 @@ void nfs_pageio_init(struct nfs_pageio_descriptor *desc,
        desc->pg_mirror_count = 1;
        desc->pg_mirror_idx = 0;
 
-       if (pg_ops->pg_get_mirror_count) {
-               /* until we have a request, we don't have an lseg and no
-                * idea how many mirrors there will be */
-               new = kcalloc(NFS_PAGEIO_DESCRIPTOR_MIRROR_MAX,
-                             sizeof(struct nfs_pgio_mirror), gfp_flags);
-               desc->pg_mirrors_dynamic = new;
-               desc->pg_mirrors = new;
-
-               for (i = 0; i < NFS_PAGEIO_DESCRIPTOR_MIRROR_MAX; i++)
-                       nfs_pageio_mirror_init(&desc->pg_mirrors[i], bsize);
-       } else {
-               desc->pg_mirrors_dynamic = NULL;
-               desc->pg_mirrors = desc->pg_mirrors_static;
-               nfs_pageio_mirror_init(&desc->pg_mirrors[0], bsize);
-       }
+       desc->pg_mirrors_dynamic = NULL;
+       desc->pg_mirrors = desc->pg_mirrors_static;
+       nfs_pageio_mirror_init(&desc->pg_mirrors[0], bsize);
 }
-EXPORT_SYMBOL_GPL(nfs_pageio_init);
 
 /**
  * nfs_pgio_result - Basic pageio error handling
@@ -865,32 +817,52 @@ static int nfs_generic_pg_pgios(struct nfs_pageio_descriptor *desc)
        return ret;
 }
 
+static struct nfs_pgio_mirror *
+nfs_pageio_alloc_mirrors(struct nfs_pageio_descriptor *desc,
+               unsigned int mirror_count)
+{
+       struct nfs_pgio_mirror *ret;
+       unsigned int i;
+
+       kfree(desc->pg_mirrors_dynamic);
+       desc->pg_mirrors_dynamic = NULL;
+       if (mirror_count == 1)
+               return desc->pg_mirrors_static;
+       ret = kmalloc_array(mirror_count, sizeof(*ret), GFP_NOFS);
+       if (ret != NULL) {
+               for (i = 0; i < mirror_count; i++)
+                       nfs_pageio_mirror_init(&ret[i], desc->pg_bsize);
+               desc->pg_mirrors_dynamic = ret;
+       }
+       return ret;
+}
+
 /*
  * nfs_pageio_setup_mirroring - determine if mirroring is to be used
  *                             by calling the pg_get_mirror_count op
  */
-static int nfs_pageio_setup_mirroring(struct nfs_pageio_descriptor *pgio,
+static void nfs_pageio_setup_mirroring(struct nfs_pageio_descriptor *pgio,
                                       struct nfs_page *req)
 {
-       int mirror_count = 1;
-
-       if (!pgio->pg_ops->pg_get_mirror_count)
-               return 0;
+       unsigned int mirror_count = 1;
 
-       mirror_count = pgio->pg_ops->pg_get_mirror_count(pgio, req);
-
-       if (pgio->pg_error < 0)
-               return pgio->pg_error;
-
-       if (!mirror_count || mirror_count > NFS_PAGEIO_DESCRIPTOR_MIRROR_MAX)
-               return -EINVAL;
+       if (pgio->pg_ops->pg_get_mirror_count)
+               mirror_count = pgio->pg_ops->pg_get_mirror_count(pgio, req);
+       if (mirror_count == pgio->pg_mirror_count || pgio->pg_error < 0)
+               return;
 
-       if (WARN_ON_ONCE(!pgio->pg_mirrors_dynamic))
-               return -EINVAL;
+       if (!mirror_count || mirror_count > NFS_PAGEIO_DESCRIPTOR_MIRROR_MAX) {
+               pgio->pg_error = -EINVAL;
+               return;
+       }
 
+       pgio->pg_mirrors = nfs_pageio_alloc_mirrors(pgio, mirror_count);
+       if (pgio->pg_mirrors == NULL) {
+               pgio->pg_error = -ENOMEM;
+               pgio->pg_mirrors = pgio->pg_mirrors_static;
+               mirror_count = 1;
+       }
        pgio->pg_mirror_count = mirror_count;
-
-       return 0;
 }
 
 /*
@@ -1036,7 +1008,7 @@ static int __nfs_pageio_add_request(struct nfs_pageio_descriptor *desc,
        unsigned int bytes_left = 0;
        unsigned int offset, pgbase;
 
-       nfs_page_group_lock(req, false);
+       nfs_page_group_lock(req);
 
        subreq = req;
        bytes_left = subreq->wb_bytes;
@@ -1058,7 +1030,7 @@ static int __nfs_pageio_add_request(struct nfs_pageio_descriptor *desc,
                        if (mirror->pg_recoalesce)
                                return 0;
                        /* retry add_request for this subreq */
-                       nfs_page_group_lock(req, false);
+                       nfs_page_group_lock(req);
                        continue;
                }
 
@@ -1155,7 +1127,7 @@ int nfs_pageio_add_request(struct nfs_pageio_descriptor *desc,
 
        for (midx = 0; midx < desc->pg_mirror_count; midx++) {
                if (midx) {
-                       nfs_page_group_lock(req, false);
+                       nfs_page_group_lock(req);
 
                        /* find the last request */
                        for (lastreq = req->wb_head;
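
With the nonblocking variant and nfs_page_group_lock_wait() gone,
nfs_page_group_lock() above is unconditionally blocking, so callers
no longer need an -EAGAIN retry path. The calling pattern reduces to
this sketch:

    nfs_page_group_lock(req);   /* may sleep on PG_HEADLOCK */
    /* ... traverse or modify the page group via req->wb_head ... */
    nfs_page_group_unlock(req);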
diff --git a/fs/nfs/pnfs.c b/fs/nfs/pnfs.c
index c383d0913b54c90fb96020a62faaf989a5d946b3..7879ed8ceb76b52f1fd9b55ea90462d36554ae49 100644
@@ -529,47 +529,6 @@ pnfs_put_lseg(struct pnfs_layout_segment *lseg)
 }
 EXPORT_SYMBOL_GPL(pnfs_put_lseg);
 
-static void pnfs_free_lseg_async_work(struct work_struct *work)
-{
-       struct pnfs_layout_segment *lseg;
-       struct pnfs_layout_hdr *lo;
-
-       lseg = container_of(work, struct pnfs_layout_segment, pls_work);
-       lo = lseg->pls_layout;
-
-       pnfs_free_lseg(lseg);
-       pnfs_put_layout_hdr(lo);
-}
-
-static void pnfs_free_lseg_async(struct pnfs_layout_segment *lseg)
-{
-       INIT_WORK(&lseg->pls_work, pnfs_free_lseg_async_work);
-       schedule_work(&lseg->pls_work);
-}
-
-void
-pnfs_put_lseg_locked(struct pnfs_layout_segment *lseg)
-{
-       if (!lseg)
-               return;
-
-       assert_spin_locked(&lseg->pls_layout->plh_inode->i_lock);
-
-       dprintk("%s: lseg %p ref %d valid %d\n", __func__, lseg,
-               atomic_read(&lseg->pls_refcount),
-               test_bit(NFS_LSEG_VALID, &lseg->pls_flags));
-       if (atomic_dec_and_test(&lseg->pls_refcount)) {
-               struct pnfs_layout_hdr *lo = lseg->pls_layout;
-               if (test_bit(NFS_LSEG_VALID, &lseg->pls_flags))
-                       return;
-               pnfs_layout_remove_lseg(lo, lseg);
-               if (!pnfs_cache_lseg_for_layoutreturn(lo, lseg)) {
-                       pnfs_get_layout_hdr(lo);
-                       pnfs_free_lseg_async(lseg);
-               }
-       }
-}
-
 /*
  * is l2 fully contained in l1?
  *   start1                             end1
@@ -2274,7 +2233,6 @@ pnfs_write_through_mds(struct nfs_pageio_descriptor *desc,
                nfs_pageio_reset_write_mds(desc);
                mirror->pg_recoalesce = 1;
        }
-       nfs_pgio_data_destroy(hdr);
        hdr->release(hdr);
 }
 
@@ -2398,7 +2356,6 @@ pnfs_read_through_mds(struct nfs_pageio_descriptor *desc,
                nfs_pageio_reset_read_mds(desc);
                mirror->pg_recoalesce = 1;
        }
-       nfs_pgio_data_destroy(hdr);
        hdr->release(hdr);
 }
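
pnfs_put_lseg_locked() and the pnfs_free_lseg_async() bounce through
a workqueue existed only because layout segments used to be released
while holding inode->i_lock, where sleeping is forbidden. With the
commit paths now serialized by the sleepable commit_mutex (see the
pnfs_nfs.c hunks below), the plain pnfs_put_lseg() can be used
directly, e.g. this sketch:

    mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
    /* ... scan the commit lists, detach the lseg to drop ... */
    mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
    pnfs_put_lseg(freeme);      /* no deferred free needed */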
 
diff --git a/fs/nfs/pnfs.h b/fs/nfs/pnfs.h
index 99731e3e332f3ec32eec26cca47556193dcf68fe..87f144f14d1e0c0c6c3232cc54c6e6b4167d5c24 100644
@@ -67,7 +67,6 @@ struct pnfs_layout_segment {
        u32 pls_seq;
        unsigned long pls_flags;
        struct pnfs_layout_hdr *pls_layout;
-       struct work_struct pls_work;
 };
 
 enum pnfs_try_status {
@@ -230,7 +229,6 @@ extern int nfs4_proc_layoutreturn(struct nfs4_layoutreturn *lrp, bool sync);
 /* pnfs.c */
 void pnfs_get_layout_hdr(struct pnfs_layout_hdr *lo);
 void pnfs_put_lseg(struct pnfs_layout_segment *lseg);
-void pnfs_put_lseg_locked(struct pnfs_layout_segment *lseg);
 
 void set_pnfs_layoutdriver(struct nfs_server *, const struct nfs_fh *, struct nfs_fsinfo *);
 void unset_pnfs_layoutdriver(struct nfs_server *);
diff --git a/fs/nfs/pnfs_nfs.c b/fs/nfs/pnfs_nfs.c
index 25f28fa64c575129130d916d566674da792376ae..60da59be83b6128241bf3be4a93e153e0329421a 100644
@@ -83,33 +83,10 @@ pnfs_generic_clear_request_commit(struct nfs_page *req,
        }
 out:
        nfs_request_remove_commit_list(req, cinfo);
-       pnfs_put_lseg_locked(freeme);
+       pnfs_put_lseg(freeme);
 }
 EXPORT_SYMBOL_GPL(pnfs_generic_clear_request_commit);
 
-static int
-pnfs_generic_transfer_commit_list(struct list_head *src, struct list_head *dst,
-                                 struct nfs_commit_info *cinfo, int max)
-{
-       struct nfs_page *req, *tmp;
-       int ret = 0;
-
-       list_for_each_entry_safe(req, tmp, src, wb_list) {
-               if (!nfs_lock_request(req))
-                       continue;
-               kref_get(&req->wb_kref);
-               if (cond_resched_lock(&cinfo->inode->i_lock))
-                       list_safe_reset_next(req, tmp, wb_list);
-               nfs_request_remove_commit_list(req, cinfo);
-               clear_bit(PG_COMMIT_TO_DS, &req->wb_flags);
-               nfs_list_add_request(req, dst);
-               ret++;
-               if ((ret == max) && !cinfo->dreq)
-                       break;
-       }
-       return ret;
-}
-
 static int
 pnfs_generic_scan_ds_commit_list(struct pnfs_commit_bucket *bucket,
                                 struct nfs_commit_info *cinfo,
@@ -119,15 +96,15 @@ pnfs_generic_scan_ds_commit_list(struct pnfs_commit_bucket *bucket,
        struct list_head *dst = &bucket->committing;
        int ret;
 
-       lockdep_assert_held(&cinfo->inode->i_lock);
-       ret = pnfs_generic_transfer_commit_list(src, dst, cinfo, max);
+       lockdep_assert_held(&NFS_I(cinfo->inode)->commit_mutex);
+       ret = nfs_scan_commit_list(src, dst, cinfo, max);
        if (ret) {
                cinfo->ds->nwritten -= ret;
                cinfo->ds->ncommitting += ret;
                if (bucket->clseg == NULL)
                        bucket->clseg = pnfs_get_lseg(bucket->wlseg);
                if (list_empty(src)) {
-                       pnfs_put_lseg_locked(bucket->wlseg);
+                       pnfs_put_lseg(bucket->wlseg);
                        bucket->wlseg = NULL;
                }
        }
@@ -142,7 +119,7 @@ int pnfs_generic_scan_commit_lists(struct nfs_commit_info *cinfo,
 {
        int i, rv = 0, cnt;
 
-       lockdep_assert_held(&cinfo->inode->i_lock);
+       lockdep_assert_held(&NFS_I(cinfo->inode)->commit_mutex);
        for (i = 0; i < cinfo->ds->nbuckets && max != 0; i++) {
                cnt = pnfs_generic_scan_ds_commit_list(&cinfo->ds->buckets[i],
                                                       cinfo, max);
@@ -162,11 +139,10 @@ void pnfs_generic_recover_commit_reqs(struct list_head *dst,
        int nwritten;
        int i;
 
-       lockdep_assert_held(&cinfo->inode->i_lock);
+       lockdep_assert_held(&NFS_I(cinfo->inode)->commit_mutex);
 restart:
        for (i = 0, b = cinfo->ds->buckets; i < cinfo->ds->nbuckets; i++, b++) {
-               nwritten = pnfs_generic_transfer_commit_list(&b->written,
-                               dst, cinfo, 0);
+               nwritten = nfs_scan_commit_list(&b->written, dst, cinfo, 0);
                if (!nwritten)
                        continue;
                cinfo->ds->nwritten -= nwritten;
@@ -953,12 +929,12 @@ pnfs_layout_mark_request_commit(struct nfs_page *req,
        struct list_head *list;
        struct pnfs_commit_bucket *buckets;
 
-       spin_lock(&cinfo->inode->i_lock);
+       mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
        buckets = cinfo->ds->buckets;
        list = &buckets[ds_commit_idx].written;
        if (list_empty(list)) {
                if (!pnfs_is_valid_lseg(lseg)) {
-                       spin_unlock(&cinfo->inode->i_lock);
+                       mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
                        cinfo->completion_ops->resched_write(cinfo, req);
                        return;
                }
@@ -975,7 +951,7 @@ pnfs_layout_mark_request_commit(struct nfs_page *req,
        cinfo->ds->nwritten++;
 
        nfs_request_add_commit_list_locked(req, list, cinfo);
-       spin_unlock(&cinfo->inode->i_lock);
+       mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
        nfs_mark_page_unstable(req->wb_page, cinfo);
 }
 EXPORT_SYMBOL_GPL(pnfs_layout_mark_request_commit);
diff --git a/fs/nfs/read.c b/fs/nfs/read.c
index a8421d9dab6a125c4eb6b7ad9f074d8ce91a5a99..0d42573d423d6d7940f1d2dbaca45ab3dd7b65f7 100644
@@ -68,7 +68,7 @@ void nfs_pageio_init_read(struct nfs_pageio_descriptor *pgio,
                pg_ops = server->pnfs_curr_ld->pg_read_ops;
 #endif
        nfs_pageio_init(pgio, inode, pg_ops, compl_ops, &nfs_rw_read_ops,
-                       server->rsize, 0, GFP_KERNEL);
+                       server->rsize, 0);
 }
 EXPORT_SYMBOL_GPL(nfs_pageio_init_read);
 
diff --git a/fs/nfs/super.c b/fs/nfs/super.c
index d828ef88e7db63e83b74e34ef918bee0fbe2b362..6b179af59b92987a2eaeb6aa6651b7bbbfc0f590 100644
@@ -1691,8 +1691,8 @@ static int nfs_verify_authflavors(struct nfs_parsed_mount_data *args,
                        rpc_authflavor_t *server_authlist, unsigned int count)
 {
        rpc_authflavor_t flavor = RPC_AUTH_MAXFLAVOR;
+       bool found_auth_null = false;
        unsigned int i;
-       int use_auth_null = false;
 
        /*
         * If the sec= mount option is used, the specified flavor or AUTH_NULL
@@ -1701,6 +1701,10 @@ static int nfs_verify_authflavors(struct nfs_parsed_mount_data *args,
         * AUTH_NULL has a special meaning when it's in the server list - it
         * means that the server will ignore the rpc creds, so any flavor
         * can be used but still use the sec= that was specified.
+        *
+        * Note also that the MNT procedure in MNTv1 does not return a list
+        * of supported security flavors. In this case, nfs_mount() fabricates
+        * a security flavor list containing just AUTH_NULL.
         */
        for (i = 0; i < count; i++) {
                flavor = server_authlist[i];
@@ -1709,11 +1713,11 @@ static int nfs_verify_authflavors(struct nfs_parsed_mount_data *args,
                        goto out;
 
                if (flavor == RPC_AUTH_NULL)
-                       use_auth_null = true;
+                       found_auth_null = true;
        }
 
-       if (use_auth_null) {
-               flavor = RPC_AUTH_NULL;
+       if (found_auth_null) {
+               flavor = args->auth_info.flavors[0];
                goto out;
        }
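
Concretely, the nfs_verify_authflavors() fix changes the MNTv1 sec=
case as follows (hypothetical values for illustration):

    mount -o sec=sys,vers=2 server:/export /mnt
      args->auth_info.flavors[0] == RPC_AUTH_UNIX  (what sec= asked for)
      server_authlist == { RPC_AUTH_NULL }  (fabricated for MNTv1)

    old result: flavor = RPC_AUTH_NULL  (sec=sys silently dropped)
    new result: flavor = RPC_AUTH_UNIX  (the requested flavor wins)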
 
diff --git a/fs/nfs/write.c b/fs/nfs/write.c
index b1af5dee5e0a87fdbd370bfdd9277d331aa4c721..f68083db63c8a7735b9eeff4606dc9a71bac55e3 100644
@@ -102,10 +102,8 @@ static struct nfs_pgio_header *nfs_writehdr_alloc(void)
 {
        struct nfs_pgio_header *p = mempool_alloc(nfs_wdata_mempool, GFP_NOIO);
 
-       if (p) {
-               memset(p, 0, sizeof(*p));
-               p->rw_mode = FMODE_WRITE;
-       }
+       memset(p, 0, sizeof(*p));
+       p->rw_mode = FMODE_WRITE;
        return p;
 }
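
The NULL check dropped from nfs_writehdr_alloc() above was dead code:
per the mempool API's documented guarantee (not something introduced
by this series), mempool_alloc() with a gfp mask that allows direct
reclaim, such as GFP_NOIO, never returns NULL; it sleeps until an
element is returned to the pool. Hence the sketch below holds:

    /* never fails: GFP_NOIO allows reclaim, so mempool_alloc()
     * waits for a free element rather than returning NULL */
    struct nfs_pgio_header *p = mempool_alloc(nfs_wdata_mempool, GFP_NOIO);
    /* p is guaranteed non-NULL here */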
 
@@ -154,6 +152,14 @@ static void nfs_context_set_write_error(struct nfs_open_context *ctx, int error)
        set_bit(NFS_CONTEXT_ERROR_WRITE, &ctx->flags);
 }
 
+static struct nfs_page *
+nfs_page_private_request(struct page *page)
+{
+       if (!PagePrivate(page))
+               return NULL;
+       return (struct nfs_page *)page_private(page);
+}
+
 /*
  * nfs_page_find_head_request_locked - find head request associated with @page
  *
@@ -162,21 +168,41 @@ static void nfs_context_set_write_error(struct nfs_open_context *ctx, int error)
  * returns matching head request with reference held, or NULL if not found.
  */
 static struct nfs_page *
-nfs_page_find_head_request_locked(struct nfs_inode *nfsi, struct page *page)
+nfs_page_find_private_request(struct page *page)
 {
-       struct nfs_page *req = NULL;
-
-       if (PagePrivate(page))
-               req = (struct nfs_page *)page_private(page);
-       else if (unlikely(PageSwapCache(page)))
-               req = nfs_page_search_commits_for_head_request_locked(nfsi,
-                       page);
+       struct address_space *mapping = page_file_mapping(page);
+       struct nfs_page *req;
 
+       if (!PagePrivate(page))
+               return NULL;
+       spin_lock(&mapping->private_lock);
+       req = nfs_page_private_request(page);
        if (req) {
                WARN_ON_ONCE(req->wb_head != req);
                kref_get(&req->wb_kref);
        }
+       spin_unlock(&mapping->private_lock);
+       return req;
+}
 
+static struct nfs_page *
+nfs_page_find_swap_request(struct page *page)
+{
+       struct inode *inode = page_file_mapping(page)->host;
+       struct nfs_inode *nfsi = NFS_I(inode);
+       struct nfs_page *req = NULL;
+       if (!PageSwapCache(page))
+               return NULL;
+       mutex_lock(&nfsi->commit_mutex);
+       if (PageSwapCache(page)) {
+               req = nfs_page_search_commits_for_head_request_locked(nfsi,
+                       page);
+               if (req) {
+                       WARN_ON_ONCE(req->wb_head != req);
+                       kref_get(&req->wb_kref);
+               }
+       }
+       mutex_unlock(&nfsi->commit_mutex);
        return req;
 }
 
@@ -187,12 +213,11 @@ nfs_page_find_head_request_locked(struct nfs_inode *nfsi, struct page *page)
  */
 static struct nfs_page *nfs_page_find_head_request(struct page *page)
 {
-       struct inode *inode = page_file_mapping(page)->host;
-       struct nfs_page *req = NULL;
+       struct nfs_page *req;
 
-       spin_lock(&inode->i_lock);
-       req = nfs_page_find_head_request_locked(NFS_I(inode), page);
-       spin_unlock(&inode->i_lock);
+       req = nfs_page_find_private_request(page);
+       if (!req)
+               req = nfs_page_find_swap_request(page);
        return req;
 }
 
@@ -241,9 +266,6 @@ nfs_page_group_search_locked(struct nfs_page *head, unsigned int page_offset)
 {
        struct nfs_page *req;
 
-       WARN_ON_ONCE(head != head->wb_head);
-       WARN_ON_ONCE(!test_bit(PG_HEADLOCK, &head->wb_head->wb_flags));
-
        req = head;
        do {
                if (page_offset >= req->wb_pgbase &&
@@ -269,20 +291,17 @@ static bool nfs_page_group_covers_page(struct nfs_page *req)
        unsigned int pos = 0;
        unsigned int len = nfs_page_length(req->wb_page);
 
-       nfs_page_group_lock(req, false);
+       nfs_page_group_lock(req);
 
-       do {
+       for (;;) {
                tmp = nfs_page_group_search_locked(req->wb_head, pos);
-               if (tmp) {
-                       /* no way this should happen */
-                       WARN_ON_ONCE(tmp->wb_pgbase != pos);
-                       pos += tmp->wb_bytes - (pos - tmp->wb_pgbase);
-               }
-       } while (tmp && pos < len);
+               if (!tmp)
+                       break;
+               pos = tmp->wb_pgbase + tmp->wb_bytes;
+       }
 
        nfs_page_group_unlock(req);
-       WARN_ON_ONCE(pos > len);
-       return pos == len;
+       return pos >= len;
 }
 
 /* We can set the PG_uptodate flag if we see that a write request
@@ -333,8 +352,11 @@ static void nfs_end_page_writeback(struct nfs_page *req)
 {
        struct inode *inode = page_file_mapping(req->wb_page)->host;
        struct nfs_server *nfss = NFS_SERVER(inode);
+       bool is_done;
 
-       if (!nfs_page_group_sync_on_bit(req, PG_WB_END))
+       is_done = nfs_page_group_sync_on_bit(req, PG_WB_END);
+       nfs_unlock_request(req);
+       if (!is_done)
                return;
 
        end_page_writeback(req->wb_page);
@@ -342,22 +364,6 @@ static void nfs_end_page_writeback(struct nfs_page *req)
                clear_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC);
 }
 
-
-/* nfs_page_group_clear_bits
- *   @req - an nfs request
- * clears all page group related bits from @req
- */
-static void
-nfs_page_group_clear_bits(struct nfs_page *req)
-{
-       clear_bit(PG_TEARDOWN, &req->wb_flags);
-       clear_bit(PG_UNLOCKPAGE, &req->wb_flags);
-       clear_bit(PG_UPTODATE, &req->wb_flags);
-       clear_bit(PG_WB_END, &req->wb_flags);
-       clear_bit(PG_REMOVE, &req->wb_flags);
-}
-
-
 /*
  * nfs_unroll_locks_and_wait -  unlock all newly locked reqs and wait on @req
  *
@@ -366,43 +372,24 @@ nfs_page_group_clear_bits(struct nfs_page *req)
  * @inode - inode associated with request page group, must be holding inode lock
  * @head  - head request of page group, must be holding head lock
  * @req   - request that couldn't lock and needs to wait on the req bit lock
- * @nonblock - if true, don't actually wait
  *
- * NOTE: this must be called holding page_group bit lock and inode spin lock
- *       and BOTH will be released before returning.
+ * NOTE: this must be called holding page_group bit lock
+ *       which will be released before returning.
  *
  * returns 0 on success, < 0 on error.
  */
-static int
-nfs_unroll_locks_and_wait(struct inode *inode, struct nfs_page *head,
-                         struct nfs_page *req, bool nonblock)
-       __releases(&inode->i_lock)
+static void
+nfs_unroll_locks(struct inode *inode, struct nfs_page *head,
+                         struct nfs_page *req)
 {
        struct nfs_page *tmp;
-       int ret;
 
        /* relinquish all the locks successfully grabbed this run */
-       for (tmp = head ; tmp != req; tmp = tmp->wb_this_page)
-               nfs_unlock_request(tmp);
-
-       WARN_ON_ONCE(test_bit(PG_TEARDOWN, &req->wb_flags));
-
-       /* grab a ref on the request that will be waited on */
-       kref_get(&req->wb_kref);
-
-       nfs_page_group_unlock(head);
-       spin_unlock(&inode->i_lock);
-
-       /* release ref from nfs_page_find_head_request_locked */
-       nfs_release_request(head);
-
-       if (!nonblock)
-               ret = nfs_wait_on_request(req);
-       else
-               ret = -EAGAIN;
-       nfs_release_request(req);
-
-       return ret;
+       for (tmp = head->wb_this_page; tmp != req; tmp = tmp->wb_this_page) {
+               if (!kref_read(&tmp->wb_kref))
+                       continue;
+               nfs_unlock_and_release_request(tmp);
+       }
 }
 
 /*
@@ -417,7 +404,8 @@ nfs_unroll_locks_and_wait(struct inode *inode, struct nfs_page *head,
  */
 static void
 nfs_destroy_unlinked_subrequests(struct nfs_page *destroy_list,
-                                struct nfs_page *old_head)
+                                struct nfs_page *old_head,
+                                struct inode *inode)
 {
        while (destroy_list) {
                struct nfs_page *subreq = destroy_list;
@@ -428,33 +416,28 @@ nfs_destroy_unlinked_subrequests(struct nfs_page *destroy_list,
                WARN_ON_ONCE(old_head != subreq->wb_head);
 
                /* make sure old group is not used */
-               subreq->wb_head = subreq;
                subreq->wb_this_page = subreq;
 
-               /* subreq is now totally disconnected from page group or any
-                * write / commit lists. last chance to wake any waiters */
-               nfs_unlock_request(subreq);
+               clear_bit(PG_REMOVE, &subreq->wb_flags);
 
-               if (!test_bit(PG_TEARDOWN, &subreq->wb_flags)) {
-                       /* release ref on old head request */
-                       nfs_release_request(old_head);
+               /* Note: races with nfs_page_group_destroy() */
+               if (!kref_read(&subreq->wb_kref)) {
+                       /* Check if we raced with nfs_page_group_destroy() */
+                       if (test_and_clear_bit(PG_TEARDOWN, &subreq->wb_flags))
+                               nfs_free_request(subreq);
+                       continue;
+               }
 
-                       nfs_page_group_clear_bits(subreq);
+               subreq->wb_head = subreq;
 
-                       /* release the PG_INODE_REF reference */
-                       if (test_and_clear_bit(PG_INODE_REF, &subreq->wb_flags))
-                               nfs_release_request(subreq);
-                       else
-                               WARN_ON_ONCE(1);
-               } else {
-                       WARN_ON_ONCE(test_bit(PG_CLEAN, &subreq->wb_flags));
-                       /* zombie requests have already released the last
-                        * reference and were waiting on the rest of the
-                        * group to complete. Since it's no longer part of a
-                        * group, simply free the request */
-                       nfs_page_group_clear_bits(subreq);
-                       nfs_free_request(subreq);
+               if (test_and_clear_bit(PG_INODE_REF, &subreq->wb_flags)) {
+                       nfs_release_request(subreq);
+                       atomic_long_dec(&NFS_I(inode)->nrequests);
                }
+
+               /* subreq is now totally disconnected from page group or any
+                * write / commit lists. last chance to wake any waiters */
+               nfs_unlock_and_release_request(subreq);
        }
 }
 
@@ -464,7 +447,6 @@ nfs_destroy_unlinked_subrequests(struct nfs_page *destroy_list,
  *                              operations for this page.
  *
  * @page - the page used to lookup the "page group" of nfs_page structures
- * @nonblock - if true, don't block waiting for request locks
  *
  * This function joins all sub requests to the head request by first
  * locking all requests in the group, cancelling any pending operations
@@ -478,7 +460,7 @@ nfs_destroy_unlinked_subrequests(struct nfs_page *destroy_list,
  * error was encountered.
  */
 static struct nfs_page *
-nfs_lock_and_join_requests(struct page *page, bool nonblock)
+nfs_lock_and_join_requests(struct page *page)
 {
        struct inode *inode = page_file_mapping(page)->host;
        struct nfs_page *head, *subreq;
@@ -487,43 +469,63 @@ nfs_lock_and_join_requests(struct page *page, bool nonblock)
        int ret;
 
 try_again:
-       total_bytes = 0;
-
-       WARN_ON_ONCE(destroy_list);
-
-       spin_lock(&inode->i_lock);
-
        /*
         * A reference is taken only on the head request which acts as a
         * reference to the whole page group - the group will not be destroyed
         * until the head reference is released.
         */
-       head = nfs_page_find_head_request_locked(NFS_I(inode), page);
-
-       if (!head) {
-               spin_unlock(&inode->i_lock);
+       head = nfs_page_find_head_request(page);
+       if (!head)
                return NULL;
-       }
 
-       /* holding inode lock, so always make a non-blocking call to try the
-        * page group lock */
-       ret = nfs_page_group_lock(head, true);
-       if (ret < 0) {
-               spin_unlock(&inode->i_lock);
+       /* lock the page head first in order to avoid an ABBA deadlock */
+       if (!nfs_lock_request(head)) {
+               ret = nfs_wait_on_request(head);
+               nfs_release_request(head);
+               if (ret < 0)
+                       return ERR_PTR(ret);
+               goto try_again;
+       }
 
-               if (!nonblock && ret == -EAGAIN) {
-                       nfs_page_group_lock_wait(head);
-                       nfs_release_request(head);
-                       goto try_again;
-               }
+       /* Ensure that nobody removed the request before we locked it */
+       if (head != nfs_page_private_request(page) && !PageSwapCache(page)) {
+               nfs_unlock_and_release_request(head);
+               goto try_again;
+       }
 
-               nfs_release_request(head);
+       ret = nfs_page_group_lock(head);
+       if (ret < 0) {
+               nfs_unlock_and_release_request(head);
                return ERR_PTR(ret);
        }
 
        /* lock each request in the page group */
-       subreq = head;
-       do {
+       total_bytes = head->wb_bytes;
+       for (subreq = head->wb_this_page; subreq != head;
+                       subreq = subreq->wb_this_page) {
+
+               if (!kref_get_unless_zero(&subreq->wb_kref)) {
+                       if (subreq->wb_offset == head->wb_offset + total_bytes)
+                               total_bytes += subreq->wb_bytes;
+                       continue;
+               }
+
+               while (!nfs_lock_request(subreq)) {
+                       /*
+                        * Unlock page to allow nfs_page_group_sync_on_bit()
+                        * to succeed
+                        */
+                       nfs_page_group_unlock(head);
+                       ret = nfs_wait_on_request(subreq);
+                       if (!ret)
+                               ret = nfs_page_group_lock(head);
+                       if (ret < 0) {
+                               nfs_unroll_locks(inode, head, subreq);
+                               nfs_release_request(subreq);
+                               nfs_unlock_and_release_request(head);
+                               return ERR_PTR(ret);
+                       }
+               }
                /*
                 * Subrequests are always contiguous, non overlapping
                 * and in order - but may be repeated (mirrored writes).
@@ -535,24 +537,12 @@ try_again:
                            ((subreq->wb_offset + subreq->wb_bytes) >
                             (head->wb_offset + total_bytes)))) {
                        nfs_page_group_unlock(head);
-                       spin_unlock(&inode->i_lock);
+                       nfs_unroll_locks(inode, head, subreq);
+                       nfs_unlock_and_release_request(subreq);
+                       nfs_unlock_and_release_request(head);
                        return ERR_PTR(-EIO);
                }
-
-               if (!nfs_lock_request(subreq)) {
-                       /* releases page group bit lock and
-                        * inode spin lock and all references */
-                       ret = nfs_unroll_locks_and_wait(inode, head,
-                               subreq, nonblock);
-
-                       if (ret == 0)
-                               goto try_again;
-
-                       return ERR_PTR(ret);
-               }
-
-               subreq = subreq->wb_this_page;
-       } while (subreq != head);
+       }
 
        /* Now that all requests are locked, make sure they aren't on any list.
         * Commit list removal accounting is done after locks are dropped */
@@ -573,34 +563,30 @@ try_again:
                head->wb_bytes = total_bytes;
        }
 
-       /*
-        * prepare head request to be added to new pgio descriptor
-        */
-       nfs_page_group_clear_bits(head);
-
-       /*
-        * some part of the group was still on the inode list - otherwise
-        * the group wouldn't be involved in async write.
-        * grab a reference for the head request, iff it needs one.
-        */
-       if (!test_and_set_bit(PG_INODE_REF, &head->wb_flags))
+       /* Postpone destruction of this request */
+       if (test_and_clear_bit(PG_REMOVE, &head->wb_flags)) {
+               set_bit(PG_INODE_REF, &head->wb_flags);
                kref_get(&head->wb_kref);
+               atomic_long_inc(&NFS_I(inode)->nrequests);
+       }
 
        nfs_page_group_unlock(head);
 
-       /* drop lock to clean uprequests on destroy list */
-       spin_unlock(&inode->i_lock);
+       nfs_destroy_unlinked_subrequests(destroy_list, head, inode);
 
-       nfs_destroy_unlinked_subrequests(destroy_list, head);
+       /* Did we lose a race with nfs_inode_remove_request()? */
+       if (!(PagePrivate(page) || PageSwapCache(page))) {
+               nfs_unlock_and_release_request(head);
+               return NULL;
+       }
 
-       /* still holds ref on head from nfs_page_find_head_request_locked
+       /* still holds ref on head from nfs_page_find_head_request
         * and still has lock on head from lock loop */
        return head;
 }
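
The subrequest loop above leans on kref_get_unless_zero() so that sub-requests whose last reference has already been dropped are skipped rather than resurrected. A user-space sketch of that get-unless-zero idiom in C11 atomics (names here are illustrative, not kernel API):

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

struct obj {
	atomic_long refcount;	/* stands in for struct kref */
};

/* Take a reference only if the count is still non-zero; a zero count
 * means release is already in progress and the object must not be used. */
static bool obj_get_unless_zero(struct obj *o)
{
	long v = atomic_load(&o->refcount);

	while (v != 0) {
		if (atomic_compare_exchange_weak(&o->refcount, &v, v + 1))
			return true;
		/* on failure the CAS reloaded v; retry */
	}
	return false;
}

int main(void)
{
	struct obj live, dying;

	atomic_init(&live.refcount, 1);
	atomic_init(&dying.refcount, 0);
	printf("live: %d, dying: %d\n",
	       obj_get_unless_zero(&live), obj_get_unless_zero(&dying));
	return 0;
}
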
 
 static void nfs_write_error_remove_page(struct nfs_page *req)
 {
-       nfs_unlock_request(req);
        nfs_end_page_writeback(req);
        generic_error_remove_page(page_file_mapping(req->wb_page),
                                  req->wb_page);
@@ -624,12 +610,12 @@ nfs_error_is_fatal_on_server(int err)
  * May return an error if the user signalled nfs_wait_on_request().
  */
 static int nfs_page_async_flush(struct nfs_pageio_descriptor *pgio,
-                               struct page *page, bool nonblock)
+                               struct page *page)
 {
        struct nfs_page *req;
        int ret = 0;
 
-       req = nfs_lock_and_join_requests(page, nonblock);
+       req = nfs_lock_and_join_requests(page);
        if (!req)
                goto out;
        ret = PTR_ERR(req);
@@ -672,7 +658,7 @@ static int nfs_do_writepage(struct page *page, struct writeback_control *wbc,
        int ret;
 
        nfs_pageio_cond_complete(pgio, page_index(page));
-       ret = nfs_page_async_flush(pgio, page, wbc->sync_mode == WB_SYNC_NONE);
+       ret = nfs_page_async_flush(pgio, page);
        if (ret == -EAGAIN) {
                redirty_page_for_writepage(wbc, page);
                ret = 0;
@@ -759,6 +745,7 @@ out_err:
  */
 static void nfs_inode_add_request(struct inode *inode, struct nfs_page *req)
 {
+       struct address_space *mapping = page_file_mapping(req->wb_page);
        struct nfs_inode *nfsi = NFS_I(inode);
 
        WARN_ON_ONCE(req->wb_this_page != req);
@@ -766,27 +753,30 @@ static void nfs_inode_add_request(struct inode *inode, struct nfs_page *req)
        /* Lock the request! */
        nfs_lock_request(req);
 
-       spin_lock(&inode->i_lock);
-       if (!nfsi->nrequests &&
-           NFS_PROTO(inode)->have_delegation(inode, FMODE_WRITE))
-               inode->i_version++;
        /*
         * Swap-space should not get truncated. Hence no need to plug the race
         * with invalidate/truncate.
         */
+       spin_lock(&mapping->private_lock);
+       if (!nfs_have_writebacks(inode) &&
+           NFS_PROTO(inode)->have_delegation(inode, FMODE_WRITE)) {
+               spin_lock(&inode->i_lock);
+               inode->i_version++;
+               spin_unlock(&inode->i_lock);
+       }
        if (likely(!PageSwapCache(req->wb_page))) {
                set_bit(PG_MAPPED, &req->wb_flags);
                SetPagePrivate(req->wb_page);
                set_page_private(req->wb_page, (unsigned long)req);
        }
-       nfsi->nrequests++;
+       spin_unlock(&mapping->private_lock);
+       atomic_long_inc(&nfsi->nrequests);
        /* this a head request for a page group - mark it as having an
         * extra reference so sub groups can follow suit.
         * This flag also informs pgio layer when to bump nrequests when
         * adding subrequests. */
        WARN_ON(test_and_set_bit(PG_INODE_REF, &req->wb_flags));
        kref_get(&req->wb_kref);
-       spin_unlock(&inode->i_lock);
 }
 
 /*
@@ -794,25 +784,22 @@ static void nfs_inode_add_request(struct inode *inode, struct nfs_page *req)
  */
 static void nfs_inode_remove_request(struct nfs_page *req)
 {
-       struct inode *inode = d_inode(req->wb_context->dentry);
+       struct address_space *mapping = page_file_mapping(req->wb_page);
+       struct inode *inode = mapping->host;
        struct nfs_inode *nfsi = NFS_I(inode);
        struct nfs_page *head;
 
+       atomic_long_dec(&nfsi->nrequests);
        if (nfs_page_group_sync_on_bit(req, PG_REMOVE)) {
                head = req->wb_head;
 
-               spin_lock(&inode->i_lock);
+               spin_lock(&mapping->private_lock);
                if (likely(head->wb_page && !PageSwapCache(head->wb_page))) {
                        set_page_private(head->wb_page, 0);
                        ClearPagePrivate(head->wb_page);
                        clear_bit(PG_MAPPED, &head->wb_flags);
                }
-               nfsi->nrequests--;
-               spin_unlock(&inode->i_lock);
-       } else {
-               spin_lock(&inode->i_lock);
-               nfsi->nrequests--;
-               spin_unlock(&inode->i_lock);
+               spin_unlock(&mapping->private_lock);
        }
 
        if (test_and_clear_bit(PG_INODE_REF, &req->wb_flags))
@@ -868,7 +855,8 @@ nfs_page_search_commits_for_head_request_locked(struct nfs_inode *nfsi,
  * number of outstanding requests requiring a commit as well as
  * the MM page stats.
  *
- * The caller must hold cinfo->inode->i_lock, and the nfs_page lock.
+ * The caller must hold NFS_I(cinfo->inode)->commit_mutex, and the
+ * nfs_page lock.
  */
 void
 nfs_request_add_commit_list_locked(struct nfs_page *req, struct list_head *dst,
@@ -876,7 +864,7 @@ nfs_request_add_commit_list_locked(struct nfs_page *req, struct list_head *dst,
 {
        set_bit(PG_CLEAN, &req->wb_flags);
        nfs_list_add_request(req, dst);
-       cinfo->mds->ncommit++;
+       atomic_long_inc(&cinfo->mds->ncommit);
 }
 EXPORT_SYMBOL_GPL(nfs_request_add_commit_list_locked);
 
@@ -896,9 +884,9 @@ EXPORT_SYMBOL_GPL(nfs_request_add_commit_list_locked);
 void
 nfs_request_add_commit_list(struct nfs_page *req, struct nfs_commit_info *cinfo)
 {
-       spin_lock(&cinfo->inode->i_lock);
+       mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
        nfs_request_add_commit_list_locked(req, &cinfo->mds->list, cinfo);
-       spin_unlock(&cinfo->inode->i_lock);
+       mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
        if (req->wb_page)
                nfs_mark_page_unstable(req->wb_page, cinfo);
 }
@@ -922,7 +910,7 @@ nfs_request_remove_commit_list(struct nfs_page *req,
        if (!test_and_clear_bit(PG_CLEAN, &(req)->wb_flags))
                return;
        nfs_list_remove_request(req);
-       cinfo->mds->ncommit--;
+       atomic_long_dec(&cinfo->mds->ncommit);
 }
 EXPORT_SYMBOL_GPL(nfs_request_remove_commit_list);
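
Because ncommit is now an atomic_long_t, readers can sample it without taking the new commit_mutex; the mutex only guards the list itself. A small user-space sketch of that split, with a pthread mutex standing in for commit_mutex (toy list, illustrative names):

#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

static pthread_mutex_t commit_mutex = PTHREAD_MUTEX_INITIALIZER;
static atomic_long ncommit = 0;		/* counts entries on the list */
static int commit_list[64];		/* toy stand-in for the request list */

static void add_commit(int req)
{
	pthread_mutex_lock(&commit_mutex);
	commit_list[atomic_load(&ncommit)] = req; /* list change: mutex held */
	atomic_fetch_add(&ncommit, 1);		  /* counter: atomic */
	pthread_mutex_unlock(&commit_mutex);
}

/* Statistics-style readers need no lock at all. */
static long reqs_to_commit(void)
{
	return atomic_load(&ncommit);
}

int main(void)
{
	add_commit(42);
	printf("%ld request(s) to commit\n", reqs_to_commit());
	return 0;
}
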
 
@@ -967,7 +955,7 @@ nfs_clear_page_commit(struct page *page)
                    WB_RECLAIMABLE);
 }
 
-/* Called holding inode (/cinfo) lock */
+/* Called holding the request lock on @req */
 static void
 nfs_clear_request_commit(struct nfs_page *req)
 {
@@ -976,9 +964,11 @@ nfs_clear_request_commit(struct nfs_page *req)
                struct nfs_commit_info cinfo;
 
                nfs_init_cinfo_from_inode(&cinfo, inode);
+               mutex_lock(&NFS_I(inode)->commit_mutex);
                if (!pnfs_clear_request_commit(req, &cinfo)) {
                        nfs_request_remove_commit_list(req, &cinfo);
                }
+               mutex_unlock(&NFS_I(inode)->commit_mutex);
                nfs_clear_page_commit(req->wb_page);
        }
 }
@@ -1023,7 +1013,6 @@ static void nfs_write_completion(struct nfs_pgio_header *hdr)
 remove_req:
                nfs_inode_remove_request(req);
 next:
-               nfs_unlock_request(req);
                nfs_end_page_writeback(req);
                nfs_release_request(req);
        }
@@ -1035,10 +1024,10 @@ out:
 unsigned long
 nfs_reqs_to_commit(struct nfs_commit_info *cinfo)
 {
-       return cinfo->mds->ncommit;
+       return atomic_long_read(&cinfo->mds->ncommit);
 }
 
-/* cinfo->inode->i_lock held by caller */
+/* NFS_I(cinfo->inode)->commit_mutex held by caller */
 int
 nfs_scan_commit_list(struct list_head *src, struct list_head *dst,
                     struct nfs_commit_info *cinfo, int max)
@@ -1046,20 +1035,37 @@ nfs_scan_commit_list(struct list_head *src, struct list_head *dst,
        struct nfs_page *req, *tmp;
        int ret = 0;
 
+restart:
        list_for_each_entry_safe(req, tmp, src, wb_list) {
-               if (!nfs_lock_request(req))
-                       continue;
                kref_get(&req->wb_kref);
-               if (cond_resched_lock(&cinfo->inode->i_lock))
-                       list_safe_reset_next(req, tmp, wb_list);
+               if (!nfs_lock_request(req)) {
+                       int status;
+
+                       /* Prevent deadlock with nfs_lock_and_join_requests */
+                       if (!list_empty(dst)) {
+                               nfs_release_request(req);
+                               continue;
+                       }
+                       /* Ensure we make progress to prevent livelock */
+                       mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
+                       status = nfs_wait_on_request(req);
+                       nfs_release_request(req);
+                       mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
+                       if (status < 0)
+                               break;
+                       goto restart;
+               }
                nfs_request_remove_commit_list(req, cinfo);
+               clear_bit(PG_COMMIT_TO_DS, &req->wb_flags);
                nfs_list_add_request(req, dst);
                ret++;
                if ((ret == max) && !cinfo->dreq)
                        break;
+               cond_resched();
        }
        return ret;
 }
+EXPORT_SYMBOL_GPL(nfs_scan_commit_list);
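
Note the shape of the rewritten scan: it only sleeps on a locked request when nothing has been moved to @dst yet; once there is progress it just skips locked entries, which avoids both the deadlock against nfs_lock_and_join_requests() and the old livelock. A user-space sketch of that drop-wait-restart pattern (per-entry pthread mutexes stand in for the nfs_page lock; the kernel additionally holds a kref on the entry across the unlock):

#include <pthread.h>
#include <stdio.h>

struct entry {
	pthread_mutex_t lock;	/* stands in for the nfs_page lock */
	struct entry *next;
	int scanned;
};

static pthread_mutex_t big_lock = PTHREAD_MUTEX_INITIALIZER; /* commit_mutex */

static int scan(struct entry *head, int max)
{
	struct entry *e;
	int ret = 0;

	pthread_mutex_lock(&big_lock);
restart:
	for (e = head; e; e = e->next) {
		if (e->scanned)
			continue;
		if (pthread_mutex_trylock(&e->lock) != 0) {
			if (ret)	/* already made progress: just skip */
				continue;
			/* No progress yet: drop the big lock, wait for the
			 * entry to be unlocked, then rescan from the top. */
			pthread_mutex_unlock(&big_lock);
			pthread_mutex_lock(&e->lock);
			pthread_mutex_unlock(&e->lock);
			pthread_mutex_lock(&big_lock);
			goto restart;
		}
		e->scanned = 1;		/* "moved to dst" */
		pthread_mutex_unlock(&e->lock);
		if (++ret == max)
			break;
	}
	pthread_mutex_unlock(&big_lock);
	return ret;
}

int main(void)
{
	struct entry b = { PTHREAD_MUTEX_INITIALIZER, NULL, 0 };
	struct entry a = { PTHREAD_MUTEX_INITIALIZER, &b, 0 };

	printf("scanned %d entries\n", scan(&a, 10));
	return 0;
}
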
 
 /*
  * nfs_scan_commit - Scan an inode for commit requests
@@ -1076,15 +1082,17 @@ nfs_scan_commit(struct inode *inode, struct list_head *dst,
 {
        int ret = 0;
 
-       spin_lock(&cinfo->inode->i_lock);
-       if (cinfo->mds->ncommit > 0) {
+       if (!atomic_long_read(&cinfo->mds->ncommit))
+               return 0;
+       mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
+       if (atomic_long_read(&cinfo->mds->ncommit) > 0) {
                const int max = INT_MAX;
 
                ret = nfs_scan_commit_list(&cinfo->mds->list, dst,
                                           cinfo, max);
                ret += pnfs_scan_commit_lists(inode, cinfo, max - ret);
        }
-       spin_unlock(&cinfo->inode->i_lock);
+       mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
        return ret;
 }
 
@@ -1105,43 +1113,21 @@ static struct nfs_page *nfs_try_to_update_request(struct inode *inode,
        unsigned int end;
        int error;
 
-       if (!PagePrivate(page))
-               return NULL;
-
        end = offset + bytes;
-       spin_lock(&inode->i_lock);
-
-       for (;;) {
-               req = nfs_page_find_head_request_locked(NFS_I(inode), page);
-               if (req == NULL)
-                       goto out_unlock;
 
-               /* should be handled by nfs_flush_incompatible */
-               WARN_ON_ONCE(req->wb_head != req);
-               WARN_ON_ONCE(req->wb_this_page != req);
-
-               rqend = req->wb_offset + req->wb_bytes;
-               /*
-                * Tell the caller to flush out the request if
-                * the offsets are non-contiguous.
-                * Note: nfs_flush_incompatible() will already
-                * have flushed out requests having wrong owners.
-                */
-               if (offset > rqend
-                   || end < req->wb_offset)
-                       goto out_flushme;
-
-               if (nfs_lock_request(req))
-                       break;
+       req = nfs_lock_and_join_requests(page);
+       if (IS_ERR_OR_NULL(req))
+               return req;
 
-               /* The request is locked, so wait and then retry */
-               spin_unlock(&inode->i_lock);
-               error = nfs_wait_on_request(req);
-               nfs_release_request(req);
-               if (error != 0)
-                       goto out_err;
-               spin_lock(&inode->i_lock);
-       }
+       rqend = req->wb_offset + req->wb_bytes;
+       /*
+        * Tell the caller to flush out the request if
+        * the offsets are non-contiguous.
+        * Note: nfs_flush_incompatible() will already
+        * have flushed out requests having wrong owners.
+        */
+       if (offset > rqend || end < req->wb_offset)
+               goto out_flushme;
 
        /* Okay, the request matches. Update the region */
        if (offset < req->wb_offset) {
@@ -1152,17 +1138,17 @@ static struct nfs_page *nfs_try_to_update_request(struct inode *inode,
                req->wb_bytes = end - req->wb_offset;
        else
                req->wb_bytes = rqend - req->wb_offset;
-out_unlock:
-       if (req)
-               nfs_clear_request_commit(req);
-       spin_unlock(&inode->i_lock);
        return req;
 out_flushme:
-       spin_unlock(&inode->i_lock);
-       nfs_release_request(req);
+       /*
+        * Note: we mark the request dirty here because
+        * nfs_lock_and_join_requests() cannot preserve
+        * commit flags, so we have to replay the write.
+        */
+       nfs_mark_request_dirty(req);
+       nfs_unlock_and_release_request(req);
        error = nfs_wb_page(inode, page);
-out_err:
-       return ERR_PTR(error);
+       return (error < 0) ? ERR_PTR(error) : NULL;
 }
 
 /*
@@ -1227,8 +1213,6 @@ int nfs_flush_incompatible(struct file *file, struct page *page)
                l_ctx = req->wb_lock_context;
                do_flush = req->wb_page != page ||
                        !nfs_match_open_context(req->wb_context, ctx);
-               /* for now, flush if more than 1 request in page_group */
-               do_flush |= req->wb_this_page != req;
                if (l_ctx && flctx &&
                    !(list_empty_careful(&flctx->flc_posix) &&
                      list_empty_careful(&flctx->flc_flock))) {
@@ -1412,7 +1396,6 @@ static void nfs_redirty_request(struct nfs_page *req)
 {
        nfs_mark_request_dirty(req);
        set_bit(NFS_CONTEXT_RESEND_WRITES, &req->wb_context->flags);
-       nfs_unlock_request(req);
        nfs_end_page_writeback(req);
        nfs_release_request(req);
 }
@@ -1452,7 +1435,7 @@ void nfs_pageio_init_write(struct nfs_pageio_descriptor *pgio,
                pg_ops = server->pnfs_curr_ld->pg_write_ops;
 #endif
        nfs_pageio_init(pgio, inode, pg_ops, compl_ops, &nfs_rw_write_ops,
-                       server->wsize, ioflags, GFP_NOIO);
+                       server->wsize, ioflags);
 }
 EXPORT_SYMBOL_GPL(nfs_pageio_init_write);
 
@@ -1934,7 +1917,7 @@ int nfs_write_inode(struct inode *inode, struct writeback_control *wbc)
        int ret = 0;
 
        /* no commits means nothing needs to be done */
-       if (!nfsi->commit_info.ncommit)
+       if (!atomic_long_read(&nfsi->commit_info.ncommit))
                return ret;
 
        if (wbc->sync_mode == WB_SYNC_NONE) {
@@ -2015,7 +1998,7 @@ int nfs_wb_page_cancel(struct inode *inode, struct page *page)
 
        /* blocking call to cancel all requests and join to a single (head)
         * request */
-       req = nfs_lock_and_join_requests(page, false);
+       req = nfs_lock_and_join_requests(page);
 
        if (IS_ERR(req)) {
                ret = PTR_ERR(req);
index 5cc91d6381a35ce73d64fbf40743a21ef2876d68..a0282ceaa48b7a2ac71cf3c5942169096289c7a6 100644 (file)
@@ -49,7 +49,6 @@
 struct nfs_access_entry {
        struct rb_node          rb_node;
        struct list_head        lru;
-       unsigned long           jiffies;
        struct rpc_cred *       cred;
        __u32                   mask;
        struct rcu_head         rcu_head;
@@ -154,7 +153,7 @@ struct nfs_inode {
         */
        __be32                  cookieverf[2];
 
-       unsigned long           nrequests;
+       atomic_long_t           nrequests;
        struct nfs_mds_commit_info commit_info;
 
        /* Open contexts for shared mmap writes */
@@ -163,6 +162,7 @@ struct nfs_inode {
        /* Readers: in-flight sillydelete RPC calls */
        /* Writers: rmdir */
        struct rw_semaphore     rmdir_sem;
+       struct mutex            commit_mutex;
 
 #if IS_ENABLED(CONFIG_NFS_V4)
        struct nfs4_cached_acl  *nfs4_acl;
@@ -510,7 +510,7 @@ extern void nfs_commit_free(struct nfs_commit_data *data);
 static inline int
 nfs_have_writebacks(struct inode *inode)
 {
-       return NFS_I(inode)->nrequests != 0;
+       return atomic_long_read(&NFS_I(inode)->nrequests) != 0;
 }
 
 /*
index d67b67ae6c8bfba9ccfdbdbdd1d132e203f22b55..d117120c9b6e06ed5f3f02651cc18be9575b7c76 100644 (file)
@@ -125,8 +125,7 @@ extern      void nfs_pageio_init(struct nfs_pageio_descriptor *desc,
                             const struct nfs_pgio_completion_ops *compl_ops,
                             const struct nfs_rw_ops *rw_ops,
                             size_t bsize,
-                            int how,
-                            gfp_t gfp_flags);
+                            int how);
 extern int nfs_pageio_add_request(struct nfs_pageio_descriptor *,
                                   struct nfs_page *);
 extern  int nfs_pageio_resend(struct nfs_pageio_descriptor *,
@@ -139,8 +138,7 @@ extern size_t nfs_generic_pg_test(struct nfs_pageio_descriptor *desc,
 extern  int nfs_wait_on_request(struct nfs_page *);
 extern void nfs_unlock_request(struct nfs_page *req);
 extern void nfs_unlock_and_release_request(struct nfs_page *);
-extern int nfs_page_group_lock(struct nfs_page *, bool);
-extern void nfs_page_group_lock_wait(struct nfs_page *);
+extern int nfs_page_group_lock(struct nfs_page *);
 extern void nfs_page_group_unlock(struct nfs_page *);
 extern bool nfs_page_group_sync_on_bit(struct nfs_page *, unsigned int);
 extern bool nfs_async_iocounter_wait(struct rpc_task *, struct nfs_lock_context *);
index 62cbcb842f99c2cbda7121bcb5a4ebc5e818a3bc..164d5359d4ab0099772cfc21f5c257f2a9d6211d 100644 (file)
@@ -1476,7 +1476,7 @@ struct nfs_pgio_header {
 
 struct nfs_mds_commit_info {
        atomic_t rpcs_out;
-       unsigned long           ncommit;
+       atomic_long_t           ncommit;
        struct list_head        list;
 };
 
index 50a99a117da7fc860bf2ab7c389e3214656a2a83..c1768f9d993bf25346dcd67215897278172e7b05 100644 (file)
@@ -139,6 +139,8 @@ struct rpc_task_setup {
 #define RPC_TASK_RUNNING       0
 #define RPC_TASK_QUEUED                1
 #define RPC_TASK_ACTIVE                2
+#define RPC_TASK_MSG_RECV      3
+#define RPC_TASK_MSG_RECV_WAIT 4
 
 #define RPC_IS_RUNNING(t)      test_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
 #define rpc_set_running(t)     set_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
index 261b48a2701d2ac01ada6bff67d4059a2633a817..86b59e3525a516d4eb0b7c80b9324ed50597c642 100644 (file)
@@ -239,6 +239,19 @@ extern unsigned int xdr_read_pages(struct xdr_stream *xdr, unsigned int len);
 extern void xdr_enter_page(struct xdr_stream *xdr, unsigned int len);
 extern int xdr_process_buf(struct xdr_buf *buf, unsigned int offset, unsigned int len, int (*actor)(struct scatterlist *, void *), void *data);
 
+/**
+ * xdr_stream_remaining - Return the number of bytes remaining in the stream
+ * @xdr: pointer to struct xdr_stream
+ *
+ * Return value:
+ *   Number of bytes remaining in @xdr before xdr->end
+ */
+static inline size_t
+xdr_stream_remaining(const struct xdr_stream *xdr)
+{
+       return xdr->nwords << 2;
+}
+
 ssize_t xdr_stream_decode_string_dup(struct xdr_stream *xdr, char **str,
                size_t maxlen, gfp_t gfp_flags);
 /**
index eab1c749e192b5ba37b050d5fb5592e825d61682..5a7bff41f6b70b3e60908a98f3049ec92ff5a390 100644 (file)
@@ -174,7 +174,7 @@ enum xprt_transports {
 
 struct rpc_xprt {
        struct kref             kref;           /* Reference count */
-       struct rpc_xprt_ops *   ops;            /* transport methods */
+       const struct rpc_xprt_ops *ops;         /* transport methods */
 
        const struct rpc_timeout *timeout;      /* timeout parms */
        struct sockaddr_storage addr;           /* server address */
@@ -232,6 +232,7 @@ struct rpc_xprt {
         */
        spinlock_t              transport_lock; /* lock transport info */
        spinlock_t              reserve_lock;   /* lock slot table */
+       spinlock_t              recv_lock;      /* lock receive list */
        u32                     xid;            /* Next XID value to use */
        struct rpc_task *       snd_task;       /* Task blocked in send */
        struct svc_xprt         *bc_xprt;       /* NFSv4.1 backchannel */
@@ -372,6 +373,8 @@ void                        xprt_write_space(struct rpc_xprt *xprt);
 void                   xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result);
 struct rpc_rqst *      xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid);
 void                   xprt_complete_rqst(struct rpc_task *task, int copied);
+void                   xprt_pin_rqst(struct rpc_rqst *req);
+void                   xprt_unpin_rqst(struct rpc_rqst *req);
 void                   xprt_release_rqst_cong(struct rpc_task *task);
 void                   xprt_disconnect_done(struct rpc_xprt *xprt);
 void                   xprt_force_disconnect(struct rpc_xprt *xprt);
index ac701c28f44f376195f27507ba1c61698d0f9b65..c2c68a15b59de07c9837599d959174b55ae353ba 100644 (file)
@@ -171,10 +171,10 @@ int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
        /*
         * Add the temporary list to the backchannel preallocation list
         */
-       spin_lock_bh(&xprt->bc_pa_lock);
+       spin_lock(&xprt->bc_pa_lock);
        list_splice(&tmp_list, &xprt->bc_pa_list);
        xprt_inc_alloc_count(xprt, min_reqs);
-       spin_unlock_bh(&xprt->bc_pa_lock);
+       spin_unlock(&xprt->bc_pa_lock);
 
        dprintk("RPC:       setup backchannel transport done\n");
        return 0;
index 2e49d1f892b7911ba051a4a433afac9c4266a8b3..2ad827db270422ed7f2c84aae374c234f445e6ab 100644 (file)
@@ -1903,6 +1903,14 @@ call_connect_status(struct rpc_task *task)
        task->tk_status = 0;
        switch (status) {
        case -ECONNREFUSED:
+               /* A positive refusal suggests a rebind is needed. */
+               if (RPC_IS_SOFTCONN(task))
+                       break;
+               if (clnt->cl_autobind) {
+                       rpc_force_rebind(clnt);
+                       task->tk_action = call_bind;
+                       return;
+               }
        case -ECONNRESET:
        case -ECONNABORTED:
        case -ENETUNREACH:
@@ -2139,10 +2147,6 @@ call_status(struct rpc_task *task)
                rpc_delay(task, 3*HZ);
        case -ETIMEDOUT:
                task->tk_action = call_timeout;
-               if (!(task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT)
-                   && task->tk_client->cl_discrtry)
-                       xprt_conditional_disconnect(req->rq_xprt,
-                                       req->rq_connect_cookie);
                break;
        case -ECONNREFUSED:
        case -ECONNRESET:
index 399fab5d19365e59a0c4526edab2c4ccb42545df..ff8e06cd067e975eb87b55a5ff2485fd90299a0a 100644 (file)
@@ -1013,7 +1013,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp)
 
        if (!bc_xprt)
                return -EAGAIN;
-       spin_lock_bh(&bc_xprt->transport_lock);
+       spin_lock(&bc_xprt->recv_lock);
        req = xprt_lookup_rqst(bc_xprt, xid);
        if (!req)
                goto unlock_notfound;
@@ -1031,7 +1031,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp)
        memcpy(dst->iov_base, src->iov_base, src->iov_len);
        xprt_complete_rqst(req->rq_task, rqstp->rq_arg.len);
        rqstp->rq_arg.len = 0;
-       spin_unlock_bh(&bc_xprt->transport_lock);
+       spin_unlock(&bc_xprt->recv_lock);
        return 0;
 unlock_notfound:
        printk(KERN_NOTICE
@@ -1040,7 +1040,7 @@ unlock_notfound:
                __func__, ntohl(calldir),
                bc_xprt, ntohl(xid));
 unlock_eagain:
-       spin_unlock_bh(&bc_xprt->transport_lock);
+       spin_unlock(&bc_xprt->recv_lock);
        return -EAGAIN;
 }
 
index 4654a99342697e729d1c4e61242a421b72d98cf7..e741ec2b4d8e6ea5d08a8e942e3021e5a20e4c6a 100644 (file)
@@ -844,6 +844,50 @@ struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
 }
 EXPORT_SYMBOL_GPL(xprt_lookup_rqst);
 
+/**
+ * xprt_pin_rqst - Pin a request on the transport receive list
+ * @req: Request to pin
+ *
+ * Caller must ensure this is atomic with the call to xprt_lookup_rqst()
+ * so should be holding the xprt->recv_lock.
+ */
+void xprt_pin_rqst(struct rpc_rqst *req)
+{
+       set_bit(RPC_TASK_MSG_RECV, &req->rq_task->tk_runstate);
+}
+EXPORT_SYMBOL_GPL(xprt_pin_rqst);
+
+/**
+ * xprt_unpin_rqst - Unpin a request on the transport receive list
+ * @req: Request to unpin
+ *
+ * Caller should be holding the xprt->recv_lock.
+ */
+void xprt_unpin_rqst(struct rpc_rqst *req)
+{
+       struct rpc_task *task = req->rq_task;
+
+       clear_bit(RPC_TASK_MSG_RECV, &task->tk_runstate);
+       if (test_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate))
+               wake_up_bit(&task->tk_runstate, RPC_TASK_MSG_RECV);
+}
+EXPORT_SYMBOL_GPL(xprt_unpin_rqst);
+
+static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
+__must_hold(&req->rq_xprt->recv_lock)
+{
+       struct rpc_task *task = req->rq_task;
+
+       if (task && test_bit(RPC_TASK_MSG_RECV, &task->tk_runstate)) {
+               spin_unlock(&req->rq_xprt->recv_lock);
+               set_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate);
+               wait_on_bit(&task->tk_runstate, RPC_TASK_MSG_RECV,
+                               TASK_UNINTERRUPTIBLE);
+               clear_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate);
+               spin_lock(&req->rq_xprt->recv_lock);
+       }
+}
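
The two new task bits implement a pin: the receive path may copy data into a pinned request with recv_lock dropped, and xprt_release() must wait for the pin to go away before tearing the request down. A user-space sketch of the same handshake, with a mutex/condvar standing in for recv_lock and wait_on_bit()/wake_up_bit() (illustrative names):

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

struct rqst {
	bool pinned;		/* RPC_TASK_MSG_RECV */
	bool release_waiting;	/* RPC_TASK_MSG_RECV_WAIT */
	pthread_mutex_t lock;	/* xprt->recv_lock */
	pthread_cond_t unpinned;
};

static void rqst_pin(struct rqst *r)		/* caller holds r->lock */
{
	r->pinned = true;
}

static void rqst_unpin(struct rqst *r)
{
	pthread_mutex_lock(&r->lock);
	r->pinned = false;
	if (r->release_waiting)
		pthread_cond_broadcast(&r->unpinned);
	pthread_mutex_unlock(&r->lock);
}

static void rqst_wait_unpinned(struct rqst *r)	/* caller holds r->lock */
{
	while (r->pinned) {
		r->release_waiting = true;
		/* drops r->lock while sleeping, retakes it on wakeup */
		pthread_cond_wait(&r->unpinned, &r->lock);
	}
	r->release_waiting = false;
}

int main(void)
{
	struct rqst r = { false, false, PTHREAD_MUTEX_INITIALIZER,
			  PTHREAD_COND_INITIALIZER };

	pthread_mutex_lock(&r.lock);
	rqst_pin(&r);		/* receive path: found the request */
	pthread_mutex_unlock(&r.lock);

	rqst_unpin(&r);		/* receive path: done copying */

	pthread_mutex_lock(&r.lock);
	rqst_wait_unpinned(&r);	/* xprt_release(): safe to tear down */
	pthread_mutex_unlock(&r.lock);
	printf("released\n");
	return 0;
}
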
+
 static void xprt_update_rtt(struct rpc_task *task)
 {
        struct rpc_rqst *req = task->tk_rqstp;
@@ -966,13 +1010,13 @@ void xprt_transmit(struct rpc_task *task)
                        /*
                         * Add to the list only if we're expecting a reply
                         */
-                       spin_lock_bh(&xprt->transport_lock);
                        /* Update the softirq receive buffer */
                        memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
                                        sizeof(req->rq_private_buf));
                        /* Add request to the receive list */
+                       spin_lock(&xprt->recv_lock);
                        list_add_tail(&req->rq_list, &xprt->recv);
-                       spin_unlock_bh(&xprt->transport_lock);
+                       spin_unlock(&xprt->recv_lock);
                        xprt_reset_majortimeo(req);
                        /* Turn off autodisconnect */
                        del_singleshot_timer_sync(&xprt->timer);
@@ -1287,12 +1331,16 @@ void xprt_release(struct rpc_task *task)
                task->tk_ops->rpc_count_stats(task, task->tk_calldata);
        else if (task->tk_client)
                rpc_count_iostats(task, task->tk_client->cl_metrics);
+       spin_lock(&xprt->recv_lock);
+       if (!list_empty(&req->rq_list)) {
+               list_del(&req->rq_list);
+               xprt_wait_on_pinned_rqst(req);
+       }
+       spin_unlock(&xprt->recv_lock);
        spin_lock_bh(&xprt->transport_lock);
        xprt->ops->release_xprt(xprt, task);
        if (xprt->ops->release_request)
                xprt->ops->release_request(task);
-       if (!list_empty(&req->rq_list))
-               list_del(&req->rq_list);
        xprt->last_used = jiffies;
        xprt_schedule_autodisconnect(xprt);
        spin_unlock_bh(&xprt->transport_lock);
@@ -1318,6 +1366,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net)
 
        spin_lock_init(&xprt->transport_lock);
        spin_lock_init(&xprt->reserve_lock);
+       spin_lock_init(&xprt->recv_lock);
 
        INIT_LIST_HEAD(&xprt->free);
        INIT_LIST_HEAD(&xprt->recv);
index 03f6b5840764dcc2c486015edd8dc7de4cd84b26..d31d0ac5ada9a6a08fe6760a3b5e2cb4eaa552e9 100644 (file)
@@ -49,6 +49,7 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
        if (IS_ERR(rb))
                goto out_fail;
        req->rl_rdmabuf = rb;
+       xdr_buf_init(&req->rl_hdrbuf, rb->rg_base, rdmab_length(rb));
 
        size = r_xprt->rx_data.inline_rsize;
        rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, GFP_KERNEL);
@@ -202,20 +203,24 @@ size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
  */
 int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
 {
-       struct rpc_xprt *xprt = rqst->rq_xprt;
-       struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+       struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
-       struct rpcrdma_msg *headerp;
-
-       headerp = rdmab_to_msg(req->rl_rdmabuf);
-       headerp->rm_xid = rqst->rq_xid;
-       headerp->rm_vers = rpcrdma_version;
-       headerp->rm_credit =
-                       cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
-       headerp->rm_type = rdma_msg;
-       headerp->rm_body.rm_chunks[0] = xdr_zero;
-       headerp->rm_body.rm_chunks[1] = xdr_zero;
-       headerp->rm_body.rm_chunks[2] = xdr_zero;
+       __be32 *p;
+
+       rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
+       xdr_init_encode(&req->rl_stream, &req->rl_hdrbuf,
+                       req->rl_rdmabuf->rg_base);
+
+       p = xdr_reserve_space(&req->rl_stream, 28);
+       if (unlikely(!p))
+               return -EIO;
+       *p++ = rqst->rq_xid;
+       *p++ = rpcrdma_version;
+       *p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
+       *p++ = rdma_msg;
+       *p++ = xdr_zero;
+       *p++ = xdr_zero;
+       *p = xdr_zero;
 
        if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, RPCRDMA_HDRLEN_MIN,
                                       &rqst->rq_snd_buf, rpcrdma_noch))
@@ -271,9 +276,6 @@ void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
  * @xprt: transport receiving the call
  * @rep: receive buffer containing the call
  *
- * Called in the RPC reply handler, which runs in a tasklet.
- * Be quick about it.
- *
  * Operational assumptions:
  *    o Backchannel credits are ignored, just as the NFS server
  *      forechannel currently does
@@ -284,7 +286,6 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
                             struct rpcrdma_rep *rep)
 {
        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
-       struct rpcrdma_msg *headerp;
        struct svc_serv *bc_serv;
        struct rpcrdma_req *req;
        struct rpc_rqst *rqst;
@@ -292,24 +293,15 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
        size_t size;
        __be32 *p;
 
-       headerp = rdmab_to_msg(rep->rr_rdmabuf);
+       p = xdr_inline_decode(&rep->rr_stream, 0);
+       size = xdr_stream_remaining(&rep->rr_stream);
+
 #ifdef RPCRDMA_BACKCHANNEL_DEBUG
        pr_info("RPC:       %s: callback XID %08x, length=%u\n",
-               __func__, be32_to_cpu(headerp->rm_xid), rep->rr_len);
-       pr_info("RPC:       %s: %*ph\n", __func__, rep->rr_len, headerp);
+               __func__, be32_to_cpup(p), size);
+       pr_info("RPC:       %s: %*ph\n", __func__, size, p);
 #endif
 
-       /* Sanity check:
-        * Need at least enough bytes for RPC/RDMA header, as code
-        * here references the header fields by array offset. Also,
-        * backward calls are always inline, so ensure there
-        * are some bytes beyond the RPC/RDMA header.
-        */
-       if (rep->rr_len < RPCRDMA_HDRLEN_MIN + 24)
-               goto out_short;
-       p = (__be32 *)((unsigned char *)headerp + RPCRDMA_HDRLEN_MIN);
-       size = rep->rr_len - RPCRDMA_HDRLEN_MIN;
-
        /* Grab a free bc rqst */
        spin_lock(&xprt->bc_pa_lock);
        if (list_empty(&xprt->bc_pa_list)) {
@@ -325,7 +317,7 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
        /* Prepare rqst */
        rqst->rq_reply_bytes_recvd = 0;
        rqst->rq_bytes_sent = 0;
-       rqst->rq_xid = headerp->rm_xid;
+       rqst->rq_xid = *p;
 
        rqst->rq_private_buf.len = size;
        set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
@@ -337,9 +329,9 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
        buf->len = size;
 
        /* The receive buffer has to be hooked to the rpcrdma_req
-        * so that it can be reposted after the server is done
-        * parsing it but just before sending the backward
-        * direction reply.
+        * so that it is not released while the req is pointing
+        * to its buffer, and so that it can be reposted after
+        * the Upper Layer is done decoding it.
         */
        req = rpcr_to_rdmar(rqst);
        dprintk("RPC:       %s: attaching rep %p to req %p\n",
@@ -367,13 +359,4 @@ out_overflow:
         * when the connection is re-established.
         */
        return;
-
-out_short:
-       pr_warn("RPC/RDMA short backward direction call\n");
-
-       if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
-               xprt_disconnect_done(xprt);
-       else
-               pr_warn("RPC:       %s: reposting rep %p\n",
-                       __func__, rep);
 }
index d3f84bb1d44352b55b757dfe438f1cc830828767..6c7151341194635919e31b812223732c4436ba40 100644 (file)
@@ -177,7 +177,7 @@ fmr_op_maxpages(struct rpcrdma_xprt *r_xprt)
 /* Use the ib_map_phys_fmr() verb to register a memory region
  * for remote access via RDMA READ or RDMA WRITE.
  */
-static int
+static struct rpcrdma_mr_seg *
 fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
           int nsegs, bool writing, struct rpcrdma_mw **out)
 {
@@ -188,7 +188,7 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 
        mw = rpcrdma_get_mw(r_xprt);
        if (!mw)
-               return -ENOBUFS;
+               return ERR_PTR(-ENOBUFS);
 
        pageoff = offset_in_page(seg1->mr_offset);
        seg1->mr_offset -= pageoff;     /* start of page */
@@ -232,13 +232,13 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
        mw->mw_offset = dma_pages[0] + pageoff;
 
        *out = mw;
-       return mw->mw_nents;
+       return seg;
 
 out_dmamap_err:
        pr_err("rpcrdma: failed to DMA map sg %p sg_nents %d\n",
               mw->mw_sg, i);
        rpcrdma_put_mw(r_xprt, mw);
-       return -EIO;
+       return ERR_PTR(-EIO);
 
 out_maperr:
        pr_err("rpcrdma: ib_map_phys_fmr %u@0x%llx+%i (%d) status %i\n",
@@ -247,7 +247,7 @@ out_maperr:
        ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
                        mw->mw_sg, mw->mw_nents, mw->mw_dir);
        rpcrdma_put_mw(r_xprt, mw);
-       return -EIO;
+       return ERR_PTR(-EIO);
 }
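
The ->ro_map methods now return the next segment to map on success and an ERR_PTR-encoded errno on failure, rather than a signed segment count. A user-space sketch of that pointer-encoded-error convention (the ERR_PTR()/IS_ERR() helpers are redefined locally here for illustration):

#include <errno.h>
#include <stdio.h>

#define MAX_ERRNO	4095

static inline void *ERR_PTR(long error)
{
	return (void *)error;
}

static inline long PTR_ERR(const void *ptr)
{
	return (long)ptr;
}

static inline int IS_ERR(const void *ptr)
{
	/* errnos live in the last page of the address space */
	return (unsigned long)ptr >= (unsigned long)-MAX_ERRNO;
}

struct seg { int dummy; };

static struct seg *map_one(struct seg *seg, int have_resources)
{
	if (!have_resources)
		return ERR_PTR(-ENOBUFS);
	return seg + 1;		/* next segment to map */
}

int main(void)
{
	struct seg segs[2];
	struct seg *next = map_one(segs, 0);

	if (IS_ERR(next))
		printf("map failed: %ld\n", PTR_ERR(next));
	return 0;
}
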
 
 /* Invalidate all memory regions that were registered for "req".
index 6aea36a38bfdcbf95bd617089221060138e5ecaa..5a936a6a31a3245cc6ab0f6e9804d268bc198261 100644 (file)
@@ -344,7 +344,7 @@ frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
 /* Post a REG_MR Work Request to register a memory region
  * for remote access via RDMA READ or RDMA WRITE.
  */
-static int
+static struct rpcrdma_mr_seg *
 frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
            int nsegs, bool writing, struct rpcrdma_mw **out)
 {
@@ -364,7 +364,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
                        rpcrdma_defer_mr_recovery(mw);
                mw = rpcrdma_get_mw(r_xprt);
                if (!mw)
-                       return -ENOBUFS;
+                       return ERR_PTR(-ENOBUFS);
        } while (mw->frmr.fr_state != FRMR_IS_INVALID);
        frmr = &mw->frmr;
        frmr->fr_state = FRMR_IS_VALID;
@@ -429,25 +429,25 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
        mw->mw_offset = mr->iova;
 
        *out = mw;
-       return mw->mw_nents;
+       return seg;
 
 out_dmamap_err:
        pr_err("rpcrdma: failed to DMA map sg %p sg_nents %d\n",
               mw->mw_sg, i);
        frmr->fr_state = FRMR_IS_INVALID;
        rpcrdma_put_mw(r_xprt, mw);
-       return -EIO;
+       return ERR_PTR(-EIO);
 
 out_mapmr_err:
        pr_err("rpcrdma: failed to map mr %p (%d/%d)\n",
               frmr->fr_mr, n, mw->mw_nents);
        rpcrdma_defer_mr_recovery(mw);
-       return -EIO;
+       return ERR_PTR(-EIO);
 
 out_senderr:
        pr_err("rpcrdma: FRMR registration ib_post_send returned %i\n", rc);
        rpcrdma_defer_mr_recovery(mw);
-       return -ENOTCONN;
+       return ERR_PTR(-ENOTCONN);
 }
 
 /* Invalidate all memory regions that were registered for "req".
index ca4d6e4528f32ffa4f567f1b8c05872288d64650..f1889f4d48030f3a0a07eef6fa524bbc12580216 100644 (file)
@@ -169,40 +169,41 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
        return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
 }
 
-/* Split "vec" on page boundaries into segments. FMR registers pages,
- * not a byte range. Other modes coalesce these segments into a single
- * MR when they can.
+/* Split @vec on page boundaries into SGEs. FMR registers pages, not
+ * a byte range. Other modes coalesce these SGEs into a single MR
+ * when they can.
+ *
+ * Returns pointer to next available SGE, and bumps the total number
+ * of SGEs consumed.
  */
-static int
-rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, int n)
+static struct rpcrdma_mr_seg *
+rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
+                    unsigned int *n)
 {
-       size_t page_offset;
-       u32 remaining;
+       u32 remaining, page_offset;
        char *base;
 
        base = vec->iov_base;
        page_offset = offset_in_page(base);
        remaining = vec->iov_len;
-       while (remaining && n < RPCRDMA_MAX_SEGS) {
-               seg[n].mr_page = NULL;
-               seg[n].mr_offset = base;
-               seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
-               remaining -= seg[n].mr_len;
-               base += seg[n].mr_len;
-               ++n;
+       while (remaining) {
+               seg->mr_page = NULL;
+               seg->mr_offset = base;
+               seg->mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
+               remaining -= seg->mr_len;
+               base += seg->mr_len;
+               ++seg;
+               ++(*n);
                page_offset = 0;
        }
-       return n;
+       return seg;
 }
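
The conversion arithmetic itself is unchanged: only the first SGE may start mid-page; every later one starts page-aligned. A runnable user-space sketch of the split (PAGE_SIZE and all names are illustrative):

#include <stdalign.h>
#include <stdio.h>

#define PAGE_SIZE 4096u

struct sge { char *base; unsigned int len; };

static struct sge *
convert_buf(char *base, unsigned int remaining, struct sge *sge,
	    unsigned int *n)
{
	unsigned int page_offset = (unsigned long)base & (PAGE_SIZE - 1);

	while (remaining) {
		sge->base = base;
		sge->len = PAGE_SIZE - page_offset;
		if (sge->len > remaining)
			sge->len = remaining;	/* min_t(u32, ...) */
		remaining -= sge->len;
		base += sge->len;
		++sge;
		++(*n);
		page_offset = 0;	/* later SGEs are page-aligned */
	}
	return sge;			/* next available SGE */
}

int main(void)
{
	static alignas(PAGE_SIZE) char buf[3 * PAGE_SIZE];
	struct sge sges[8];
	unsigned int i, n = 0;

	convert_buf(buf + 100, 2 * PAGE_SIZE, sges, &n);
	for (i = 0; i < n; i++)
		printf("sge %u: %u bytes\n", i, sges[i].len);
	return 0;
}
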
 
-/*
- * Chunk assembly from upper layer xdr_buf.
- *
- * Prepare the passed-in xdr_buf into representation as RPC/RDMA chunk
- * elements. Segments are then coalesced when registered, if possible
- * within the selected memreg mode.
+/* Convert @xdrbuf into SGEs no larger than a page each. As they
+ * are registered, these SGEs are then coalesced into RDMA segments
+ * when the selected memreg mode supports it.
  *
- * Returns positive number of segments converted, or a negative errno.
+ * Returns positive number of SGEs consumed, or a negative errno.
  */
 
 static int
@@ -210,47 +211,41 @@ rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
                     unsigned int pos, enum rpcrdma_chunktype type,
                     struct rpcrdma_mr_seg *seg)
 {
-       int len, n, p, page_base;
+       unsigned long page_base;
+       unsigned int len, n;
        struct page **ppages;
 
        n = 0;
-       if (pos == 0) {
-               n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n);
-               if (n == RPCRDMA_MAX_SEGS)
-                       goto out_overflow;
-       }
+       if (pos == 0)
+               seg = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, &n);
 
        len = xdrbuf->page_len;
        ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
        page_base = offset_in_page(xdrbuf->page_base);
-       p = 0;
-       while (len && n < RPCRDMA_MAX_SEGS) {
-               if (!ppages[p]) {
-                       /* alloc the pagelist for receiving buffer */
-                       ppages[p] = alloc_page(GFP_ATOMIC);
-                       if (!ppages[p])
+       while (len) {
+               if (unlikely(!*ppages)) {
+                       /* XXX: Certain upper layer operations do
+                        *      not provide receive buffer pages.
+                        */
+                       *ppages = alloc_page(GFP_ATOMIC);
+                       if (!*ppages)
                                return -EAGAIN;
                }
-               seg[n].mr_page = ppages[p];
-               seg[n].mr_offset = (void *)(unsigned long) page_base;
-               seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len);
-               if (seg[n].mr_len > PAGE_SIZE)
-                       goto out_overflow;
-               len -= seg[n].mr_len;
+               seg->mr_page = *ppages;
+               seg->mr_offset = (char *)page_base;
+               seg->mr_len = min_t(u32, PAGE_SIZE - page_base, len);
+               len -= seg->mr_len;
+               ++ppages;
+               ++seg;
                ++n;
-               ++p;
-               page_base = 0;  /* page offset only applies to first page */
+               page_base = 0;
        }
 
-       /* Message overflows the seg array */
-       if (len && n == RPCRDMA_MAX_SEGS)
-               goto out_overflow;
-
        /* When encoding a Read chunk, the tail iovec contains an
         * XDR pad and may be omitted.
         */
        if (type == rpcrdma_readch && r_xprt->rx_ia.ri_implicit_roundup)
-               return n;
+               goto out;
 
        /* When encoding a Write chunk, some servers need to see an
         * extra segment for non-XDR-aligned Write chunks. The upper
@@ -258,30 +253,81 @@ rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
         * for this purpose.
         */
        if (type == rpcrdma_writech && r_xprt->rx_ia.ri_implicit_roundup)
-               return n;
+               goto out;
 
-       if (xdrbuf->tail[0].iov_len) {
-               n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n);
-               if (n == RPCRDMA_MAX_SEGS)
-                       goto out_overflow;
-       }
+       if (xdrbuf->tail[0].iov_len)
+               seg = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, &n);
 
+out:
+       if (unlikely(n > RPCRDMA_MAX_SEGS))
+               return -EIO;
        return n;
+}
 
-out_overflow:
-       pr_err("rpcrdma: segment array overflow\n");
-       return -EIO;
+static inline int
+encode_item_present(struct xdr_stream *xdr)
+{
+       __be32 *p;
+
+       p = xdr_reserve_space(xdr, sizeof(*p));
+       if (unlikely(!p))
+               return -EMSGSIZE;
+
+       *p = xdr_one;
+       return 0;
 }
 
-static inline __be32 *
+static inline int
+encode_item_not_present(struct xdr_stream *xdr)
+{
+       __be32 *p;
+
+       p = xdr_reserve_space(xdr, sizeof(*p));
+       if (unlikely(!p))
+               return -EMSGSIZE;
+
+       *p = xdr_zero;
+       return 0;
+}
+
+static void
 xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mw *mw)
 {
        *iptr++ = cpu_to_be32(mw->mw_handle);
        *iptr++ = cpu_to_be32(mw->mw_length);
-       return xdr_encode_hyper(iptr, mw->mw_offset);
+       xdr_encode_hyper(iptr, mw->mw_offset);
 }
 
-/* XDR-encode the Read list. Supports encoding a list of read
+static int
+encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mw *mw)
+{
+       __be32 *p;
+
+       p = xdr_reserve_space(xdr, 4 * sizeof(*p));
+       if (unlikely(!p))
+               return -EMSGSIZE;
+
+       xdr_encode_rdma_segment(p, mw);
+       return 0;
+}
+
+static int
+encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mw *mw,
+                   u32 position)
+{
+       __be32 *p;
+
+       p = xdr_reserve_space(xdr, 6 * sizeof(*p));
+       if (unlikely(!p))
+               return -EMSGSIZE;
+
+       *p++ = xdr_one;                 /* Item present */
+       *p++ = cpu_to_be32(position);
+       xdr_encode_rdma_segment(p, mw);
+       return 0;
+}
+
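
All of the encode_* helpers above share one reserve-then-fill shape: reserve a fixed number of XDR words, fail with -EMSGSIZE if the stream is short, then store big-endian words into the reserved slot. A user-space sketch of that pattern for one HLOO segment (the cursor type is illustrative, not the kernel's xdr_stream):

#include <arpa/inet.h>	/* htonl() */
#include <errno.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

struct cursor { uint32_t *p, *end; };

/* Like xdr_reserve_space(): hand back room for nwords words, or NULL. */
static uint32_t *reserve(struct cursor *x, unsigned int nwords)
{
	uint32_t *p = x->p;

	if ((size_t)(x->end - x->p) < nwords)
		return NULL;
	x->p += nwords;
	return p;
}

/* Encode one segment: Handle32 Length32 Offset64 (HLOO). */
static int encode_segment(struct cursor *x, uint32_t handle,
			  uint32_t length, uint64_t offset)
{
	uint32_t *p = reserve(x, 4);

	if (!p)
		return -EMSGSIZE;
	*p++ = htonl(handle);
	*p++ = htonl(length);
	*p++ = htonl(offset >> 32);		/* XDR hyper = 2 words */
	*p = htonl(offset & 0xffffffffu);
	return 0;
}

int main(void)
{
	uint32_t buf[4];
	struct cursor x = { buf, buf + 4 };
	int one = encode_segment(&x, 7, 4096, 0x100000ULL);
	int two = encode_segment(&x, 8, 4096, 0x200000ULL);	/* no room */

	printf("first: %d, second: %d\n", one, two);
	return 0;
}
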
+/* Register and XDR encode the Read list. Supports encoding a list of read
  * segments that belong to a single read chunk.
  *
  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
@@ -290,23 +336,20 @@ xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mw *mw)
  *   N elements, position P (same P for all chunks of same arg!):
  *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
  *
- * Returns a pointer to the XDR word in the RDMA header following
- * the end of the Read list, or an error pointer.
+ * Returns zero on success, or a negative errno if a failure occurred.
+ * @xdr is advanced to the next position in the stream.
+ *
+ * Only a single @pos value is currently supported.
  */
-static __be32 *
-rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
-                        struct rpcrdma_req *req, struct rpc_rqst *rqst,
-                        __be32 *iptr, enum rpcrdma_chunktype rtype)
+static noinline int
+rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+                        struct rpc_rqst *rqst, enum rpcrdma_chunktype rtype)
 {
+       struct xdr_stream *xdr = &req->rl_stream;
        struct rpcrdma_mr_seg *seg;
        struct rpcrdma_mw *mw;
        unsigned int pos;
-       int n, nsegs;
-
-       if (rtype == rpcrdma_noch) {
-               *iptr++ = xdr_zero;     /* item not present */
-               return iptr;
-       }
+       int nsegs;
 
        pos = rqst->rq_snd_buf.head[0].iov_len;
        if (rtype == rpcrdma_areadch)
@@ -315,40 +358,33 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
        nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_snd_buf, pos,
                                     rtype, seg);
        if (nsegs < 0)
-               return ERR_PTR(nsegs);
+               return nsegs;
 
        do {
-               n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
-                                                false, &mw);
-               if (n < 0)
-                       return ERR_PTR(n);
+               seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
+                                                  false, &mw);
+               if (IS_ERR(seg))
+                       return PTR_ERR(seg);
                rpcrdma_push_mw(mw, &req->rl_registered);
 
-               *iptr++ = xdr_one;      /* item present */
-
-               /* All read segments in this chunk
-                * have the same "position".
-                */
-               *iptr++ = cpu_to_be32(pos);
-               iptr = xdr_encode_rdma_segment(iptr, mw);
+               if (encode_read_segment(xdr, mw, pos) < 0)
+                       return -EMSGSIZE;
 
                dprintk("RPC: %5u %s: pos %u %u@0x%016llx:0x%08x (%s)\n",
                        rqst->rq_task->tk_pid, __func__, pos,
                        mw->mw_length, (unsigned long long)mw->mw_offset,
-                       mw->mw_handle, n < nsegs ? "more" : "last");
+                       mw->mw_handle, mw->mw_nents < nsegs ? "more" : "last");
 
                r_xprt->rx_stats.read_chunk_count++;
-               seg += n;
-               nsegs -= n;
+               nsegs -= mw->mw_nents;
        } while (nsegs);
 
-       /* Finish Read list */
-       *iptr++ = xdr_zero;     /* Next item not present */
-       return iptr;
+       return 0;
 }
 
-/* XDR-encode the Write list. Supports encoding a list containing
- * one array of plain segments that belong to a single write chunk.
+/* Register and XDR encode the Write list. Supports encoding a list
+ * containing one array of plain segments that belong to a single
+ * write chunk.
  *
  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
  *
@@ -356,66 +392,65 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
  *   N elements:
  *    1 - N - HLOO - HLOO - ... - HLOO - 0
  *
- * Returns a pointer to the XDR word in the RDMA header following
- * the end of the Write list, or an error pointer.
+ * Returns zero on success, or a negative errno if a failure occurred.
+ * @xdr is advanced to the next position in the stream.
+ *
+ * Only a single Write chunk is currently supported.
  */
-static __be32 *
+static noinline int
 rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
-                         struct rpc_rqst *rqst, __be32 *iptr,
-                         enum rpcrdma_chunktype wtype)
+                         struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
 {
+       struct xdr_stream *xdr = &req->rl_stream;
        struct rpcrdma_mr_seg *seg;
        struct rpcrdma_mw *mw;
-       int n, nsegs, nchunks;
+       int nsegs, nchunks;
        __be32 *segcount;
 
-       if (wtype != rpcrdma_writech) {
-               *iptr++ = xdr_zero;     /* no Write list present */
-               return iptr;
-       }
-
        seg = req->rl_segments;
        nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
                                     rqst->rq_rcv_buf.head[0].iov_len,
                                     wtype, seg);
        if (nsegs < 0)
-               return ERR_PTR(nsegs);
+               return nsegs;
 
-       *iptr++ = xdr_one;      /* Write list present */
-       segcount = iptr++;      /* save location of segment count */
+       if (encode_item_present(xdr) < 0)
+               return -EMSGSIZE;
+       segcount = xdr_reserve_space(xdr, sizeof(*segcount));
+       if (unlikely(!segcount))
+               return -EMSGSIZE;
+       /* Actual value encoded below */
 
        nchunks = 0;
        do {
-               n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
-                                                true, &mw);
-               if (n < 0)
-                       return ERR_PTR(n);
+               seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
+                                                  true, &mw);
+               if (IS_ERR(seg))
+                       return PTR_ERR(seg);
                rpcrdma_push_mw(mw, &req->rl_registered);
 
-               iptr = xdr_encode_rdma_segment(iptr, mw);
+               if (encode_rdma_segment(xdr, mw) < 0)
+                       return -EMSGSIZE;
 
                dprintk("RPC: %5u %s: %u@0x016%llx:0x%08x (%s)\n",
                        rqst->rq_task->tk_pid, __func__,
                        mw->mw_length, (unsigned long long)mw->mw_offset,
-                       mw->mw_handle, n < nsegs ? "more" : "last");
+                       mw->mw_handle, mw->mw_nents < nsegs ? "more" : "last");
 
                r_xprt->rx_stats.write_chunk_count++;
                r_xprt->rx_stats.total_rdma_request += seg->mr_len;
                nchunks++;
-               seg   += n;
-               nsegs -= n;
+               nsegs -= mw->mw_nents;
        } while (nsegs);
 
        /* Update count of segments in this Write chunk */
        *segcount = cpu_to_be32(nchunks);
 
-       /* Finish Write list */
-       *iptr++ = xdr_zero;     /* Next item not present */
-       return iptr;
+       return 0;
 }
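
One detail worth calling out: the number of chunks is not known until registration runs, because each ro_map call can coalesce several rpcrdma_mr_seg entries into one HLOO, so the code reserves the count word first and backfills it after the loop. A sketch of that reserve-then-backfill idiom over a flat word array (dummy HLOO values, and one segment per chunk for simplicity):

#include <stdint.h>
#include <arpa/inet.h>

static uint32_t *encode_counted_array_sketch(uint32_t *p, int nsegs)
{
	uint32_t *segcount;
	int nchunks = 0;

	*p++ = htonl(1);		/* list item present */
	segcount = p++;			/* reserve; actual value encoded below */

	while (nsegs--) {
		*p++ = htonl(0xdeadbeef);	/* H (dummy rkey) */
		*p++ = htonl(4096);		/* L (dummy length) */
		*p++ = htonl(0);		/* OO (dummy 64-bit offset) */
		*p++ = htonl(0);
		nchunks++;
	}

	*segcount = htonl(nchunks);	/* backfill the reserved word */
	return p;
}

The kernel version does the same thing with xdr_reserve_space(); the saved segcount pointer stays usable here because the whole transport header lives in one contiguous rl_rdmabuf.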
 
-/* XDR-encode the Reply chunk. Supports encoding an array of plain
- * segments that belong to a single write (reply) chunk.
+/* Register and XDR encode the Reply chunk. Supports encoding an array
+ * of plain segments that belong to a single write (reply) chunk.
  *
  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
  *
@@ -423,58 +458,57 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
  *   N elements:
  *    1 - N - HLOO - HLOO - ... - HLOO
  *
- * Returns a pointer to the XDR word in the RDMA header following
- * the end of the Reply chunk, or an error pointer.
+ * Returns zero on success, or a negative errno if a failure occurred.
+ * @xdr is advanced to the next position in the stream.
  */
-static __be32 *
-rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
-                          struct rpcrdma_req *req, struct rpc_rqst *rqst,
-                          __be32 *iptr, enum rpcrdma_chunktype wtype)
+static noinline int
+rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+                          struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
 {
+       struct xdr_stream *xdr = &req->rl_stream;
        struct rpcrdma_mr_seg *seg;
        struct rpcrdma_mw *mw;
-       int n, nsegs, nchunks;
+       int nsegs, nchunks;
        __be32 *segcount;
 
-       if (wtype != rpcrdma_replych) {
-               *iptr++ = xdr_zero;     /* no Reply chunk present */
-               return iptr;
-       }
-
        seg = req->rl_segments;
        nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
        if (nsegs < 0)
-               return ERR_PTR(nsegs);
+               return nsegs;
 
-       *iptr++ = xdr_one;      /* Reply chunk present */
-       segcount = iptr++;      /* save location of segment count */
+       if (encode_item_present(xdr) < 0)
+               return -EMSGSIZE;
+       segcount = xdr_reserve_space(xdr, sizeof(*segcount));
+       if (unlikely(!segcount))
+               return -EMSGSIZE;
+       /* Actual value encoded below */
 
        nchunks = 0;
        do {
-               n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
-                                                true, &mw);
-               if (n < 0)
-                       return ERR_PTR(n);
+               seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
+                                                  true, &mw);
+               if (IS_ERR(seg))
+                       return PTR_ERR(seg);
                rpcrdma_push_mw(mw, &req->rl_registered);
 
-               iptr = xdr_encode_rdma_segment(iptr, mw);
+               if (encode_rdma_segment(xdr, mw) < 0)
+                       return -EMSGSIZE;
 
                dprintk("RPC: %5u %s: %u@0x%016llx:0x%08x (%s)\n",
                        rqst->rq_task->tk_pid, __func__,
                        mw->mw_length, (unsigned long long)mw->mw_offset,
-                       mw->mw_handle, n < nsegs ? "more" : "last");
+                       mw->mw_handle, mw->mw_nents < nsegs ? "more" : "last");
 
                r_xprt->rx_stats.reply_chunk_count++;
                r_xprt->rx_stats.total_rdma_request += seg->mr_len;
                nchunks++;
-               seg   += n;
-               nsegs -= n;
+               nsegs -= mw->mw_nents;
        } while (nsegs);
 
        /* Update count of segments in the Reply chunk */
        *segcount = cpu_to_be32(nchunks);
 
-       return iptr;
+       return 0;
 }
 
 /* Prepare the RPC-over-RDMA header SGE.
@@ -651,37 +685,52 @@ rpcrdma_unmap_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
        req->rl_mapped_sges = 0;
 }
 
-/*
- * Marshal a request: the primary job of this routine is to choose
- * the transfer modes. See comments below.
+/**
+ * rpcrdma_marshal_req - Marshal and send one RPC request
+ * @r_xprt: controlling transport
+ * @rqst: RPC request to be marshaled
+ *
+ * For the RPC in "rqst", this function:
+ *  - Chooses the transfer mode (e.g., RDMA_MSG or RDMA_NOMSG)
+ *  - Registers Read, Write, and Reply chunks
+ *  - Constructs the transport header
+ *  - Posts a Send WR to send the transport header and request
  *
- * Returns zero on success, otherwise a negative errno.
+ * Returns:
+ *     %0 if the RPC was sent successfully,
+ *     %-ENOTCONN if the connection was lost,
+ *     %-EAGAIN if not enough pages are available for the on-demand reply buffer,
+ *     %-ENOBUFS if no MRs are available to register chunks,
+ *     %-EMSGSIZE if the transport header is too small,
+ *     %-EIO if a permanent problem occurred while marshaling.
  */
-
 int
-rpcrdma_marshal_req(struct rpc_rqst *rqst)
+rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
 {
-       struct rpc_xprt *xprt = rqst->rq_xprt;
-       struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+       struct xdr_stream *xdr = &req->rl_stream;
        enum rpcrdma_chunktype rtype, wtype;
-       struct rpcrdma_msg *headerp;
        bool ddp_allowed;
-       ssize_t hdrlen;
-       size_t rpclen;
-       __be32 *iptr;
+       __be32 *p;
+       int ret;
 
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
        if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
                return rpcrdma_bc_marshal_reply(rqst);
 #endif
 
-       headerp = rdmab_to_msg(req->rl_rdmabuf);
-       /* don't byte-swap XID, it's already done in request */
-       headerp->rm_xid = rqst->rq_xid;
-       headerp->rm_vers = rpcrdma_version;
-       headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
-       headerp->rm_type = rdma_msg;
+       rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
+       xdr_init_encode(xdr, &req->rl_hdrbuf,
+                       req->rl_rdmabuf->rg_base);
+
+       /* Fixed header fields */
+       ret = -EMSGSIZE;
+       p = xdr_reserve_space(xdr, 4 * sizeof(*p));
+       if (!p)
+               goto out_err;
+       *p++ = rqst->rq_xid;
+       *p++ = rpcrdma_version;
+       *p++ = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
 
        /* When the ULP employs a GSS flavor that guarantees integrity
         * or privacy, direct data placement of individual data items
@@ -721,22 +770,17 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
         * by themselves are larger than the inline threshold.
         */
        if (rpcrdma_args_inline(r_xprt, rqst)) {
+               *p++ = rdma_msg;
                rtype = rpcrdma_noch;
-               rpclen = rqst->rq_snd_buf.len;
        } else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
+               *p++ = rdma_msg;
                rtype = rpcrdma_readch;
-               rpclen = rqst->rq_snd_buf.head[0].iov_len +
-                        rqst->rq_snd_buf.tail[0].iov_len;
        } else {
                r_xprt->rx_stats.nomsg_call_count++;
-               headerp->rm_type = htonl(RDMA_NOMSG);
+               *p++ = rdma_nomsg;
                rtype = rpcrdma_areadch;
-               rpclen = 0;
        }
 
-       req->rl_xid = rqst->rq_xid;
-       rpcrdma_insert_req(&r_xprt->rx_buf, req);
-
        /* This implementation supports the following combinations
         * of chunk lists in one RPC-over-RDMA Call message:
         *
@@ -759,79 +803,50 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
         * send a Call message with a Position Zero Read chunk and a
         * regular Read chunk at the same time.
         */
-       iptr = headerp->rm_body.rm_chunks;
-       iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype);
-       if (IS_ERR(iptr))
+       if (rtype != rpcrdma_noch) {
+               ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
+               if (ret)
+                       goto out_err;
+       }
+       ret = encode_item_not_present(xdr);
+       if (ret)
                goto out_err;
-       iptr = rpcrdma_encode_write_list(r_xprt, req, rqst, iptr, wtype);
-       if (IS_ERR(iptr))
+
+       if (wtype == rpcrdma_writech) {
+               ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
+               if (ret)
+                       goto out_err;
+       }
+       ret = encode_item_not_present(xdr);
+       if (ret)
                goto out_err;
-       iptr = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, iptr, wtype);
-       if (IS_ERR(iptr))
+
+       if (wtype != rpcrdma_replych)
+               ret = encode_item_not_present(xdr);
+       else
+               ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
+       if (ret)
                goto out_err;
-       hdrlen = (unsigned char *)iptr - (unsigned char *)headerp;
 
-       dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n",
+       dprintk("RPC: %5u %s: %s/%s: hdrlen %u\n",
                rqst->rq_task->tk_pid, __func__,
                transfertypes[rtype], transfertypes[wtype],
-               hdrlen, rpclen);
+               xdr_stream_pos(xdr));
 
-       if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, hdrlen,
+       if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req,
+                                      xdr_stream_pos(xdr),
                                       &rqst->rq_snd_buf, rtype)) {
-               iptr = ERR_PTR(-EIO);
+               ret = -EIO;
                goto out_err;
        }
        return 0;
 
 out_err:
-       if (PTR_ERR(iptr) != -ENOBUFS) {
-               pr_err("rpcrdma: rpcrdma_marshal_req failed, status %ld\n",
-                      PTR_ERR(iptr));
+       if (ret != -ENOBUFS) {
+               pr_err("rpcrdma: header marshaling failed (%d)\n", ret);
                r_xprt->rx_stats.failed_marshal_count++;
        }
-       return PTR_ERR(iptr);
-}
-
-/*
- * Chase down a received write or reply chunklist to get length
- * RDMA'd by server. See map at rpcrdma_create_chunks()! :-)
- */
-static int
-rpcrdma_count_chunks(struct rpcrdma_rep *rep, int wrchunk, __be32 **iptrp)
-{
-       unsigned int i, total_len;
-       struct rpcrdma_write_chunk *cur_wchunk;
-       char *base = (char *)rdmab_to_msg(rep->rr_rdmabuf);
-
-       i = be32_to_cpu(**iptrp);
-       cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1);
-       total_len = 0;
-       while (i--) {
-               struct rpcrdma_segment *seg = &cur_wchunk->wc_target;
-               ifdebug(FACILITY) {
-                       u64 off;
-                       xdr_decode_hyper((__be32 *)&seg->rs_offset, &off);
-                       dprintk("RPC:       %s: chunk %d@0x%016llx:0x%08x\n",
-                               __func__,
-                               be32_to_cpu(seg->rs_length),
-                               (unsigned long long)off,
-                               be32_to_cpu(seg->rs_handle));
-               }
-               total_len += be32_to_cpu(seg->rs_length);
-               ++cur_wchunk;
-       }
-       /* check and adjust for properly terminated write chunk */
-       if (wrchunk) {
-               __be32 *w = (__be32 *) cur_wchunk;
-               if (*w++ != xdr_zero)
-                       return -1;
-               cur_wchunk = (struct rpcrdma_write_chunk *) w;
-       }
-       if ((char *)cur_wchunk > base + rep->rr_len)
-               return -1;
-
-       *iptrp = (__be32 *) cur_wchunk;
-       return total_len;
+       return ret;
 }
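
For the chunk lists, note that a Version One header always carries exactly three list positions, in order: Read list, Write list, Reply chunk. An unused position is a single xdr_zero word, so the smallest possible Call header is the four fixed words followed by three zeros. A sketch of that minimal chunk-free header with plain big-endian stores (rdma_msg is XDR enum value 0; the XID is stored as-is because it is already in wire order):

#include <stdint.h>
#include <arpa/inet.h>

/* Seven words: XID, version, credits, proc, then three empty lists. */
static void build_noch_header_sketch(uint32_t *p, uint32_t xid_be,
				     uint32_t credits)
{
	*p++ = xid_be;		/* already big-endian, not swapped */
	*p++ = htonl(1);	/* RPC-over-RDMA Version One */
	*p++ = htonl(credits);	/* credit grant */
	*p++ = htonl(0);	/* rdma_msg */
	*p++ = 0;		/* no Read list */
	*p++ = 0;		/* no Write list */
	*p++ = 0;		/* no Reply chunk */
}

The caller-side contract matches the kernel-doc above: xprt_rdma_send_request() treats any negative return from rpcrdma_marshal_req() as a failed marshal (see the transport.c hunk further down).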
 
 /**
@@ -949,37 +964,254 @@ rpcrdma_mark_remote_invalidation(struct list_head *mws,
                }
 }
 
-#if defined(CONFIG_SUNRPC_BACKCHANNEL)
 /* By convention, backchannel calls arrive via rdma_msg type
  * messages, and never populate the chunk lists. This makes
  * the RPC/RDMA header small and fixed in size, so it is
  * straightforward to check the RPC header's direction field.
  */
 static bool
-rpcrdma_is_bcall(struct rpcrdma_msg *headerp)
+rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
+                __be32 xid, __be32 proc)
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
 {
-       __be32 *p = (__be32 *)headerp;
+       struct xdr_stream *xdr = &rep->rr_stream;
+       __be32 *p;
 
-       if (headerp->rm_type != rdma_msg)
+       if (proc != rdma_msg)
                return false;
-       if (headerp->rm_body.rm_chunks[0] != xdr_zero)
+
+       /* Peek at stream contents without advancing. */
+       p = xdr_inline_decode(xdr, 0);
+
+       /* Chunk lists */
+       if (*p++ != xdr_zero)
                return false;
-       if (headerp->rm_body.rm_chunks[1] != xdr_zero)
+       if (*p++ != xdr_zero)
                return false;
-       if (headerp->rm_body.rm_chunks[2] != xdr_zero)
+       if (*p++ != xdr_zero)
                return false;
 
-       /* sanity */
-       if (p[7] != headerp->rm_xid)
+       /* RPC header */
+       if (*p++ != xid)
                return false;
-       /* call direction */
-       if (p[8] != cpu_to_be32(RPC_CALL))
+       if (*p != cpu_to_be32(RPC_CALL))
                return false;
 
+       /* Now that we are sure this is a backchannel call,
+        * advance to the RPC header.
+        */
+       p = xdr_inline_decode(xdr, 3 * sizeof(*p));
+       if (unlikely(!p))
+               goto out_short;
+
+       rpcrdma_bc_receive_call(r_xprt, rep);
+       return true;
+
+out_short:
+       pr_warn("RPC/RDMA short backward direction call\n");
+       if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
+               xprt_disconnect_done(&r_xprt->rx_xprt);
        return true;
 }
+#else  /* CONFIG_SUNRPC_BACKCHANNEL */
+{
+       return false;
+}
 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
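
The interesting move here is xdr_inline_decode(xdr, 0): it returns the current decode position without consuming anything, so the function can inspect five words and bail out cheaply in the common (non-backchannel) case, advancing past the three chunk-list words only once the call is positively identified. A sketch of that peek-then-commit pattern over a bare cursor (struct cursor_sketch and its 5-word minimum are stand-ins for the xdr_stream bookkeeping):

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <arpa/inet.h>

struct cursor_sketch {
	const uint32_t *p;
	size_t words_left;
};

static bool is_bcall_sketch(struct cursor_sketch *cur, uint32_t xid_be)
{
	const uint32_t *p = cur->p;	/* peek: cursor not advanced */

	if (cur->words_left < 5)
		return false;
	if (p[0] || p[1] || p[2])	/* all three chunk lists empty */
		return false;
	if (p[3] != xid_be)		/* RPC XID echoes the transport XID */
		return false;
	if (p[4] != htonl(0))		/* RPC_CALL is XDR enum 0 */
		return false;

	cur->p += 3;			/* commit: consume the chunk lists */
	cur->words_left -= 3;
	return true;
}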
 
+static int decode_rdma_segment(struct xdr_stream *xdr, u32 *length)
+{
+       __be32 *p;
+
+       p = xdr_inline_decode(xdr, 4 * sizeof(*p));
+       if (unlikely(!p))
+               return -EIO;
+
+       ifdebug(FACILITY) {
+               u64 offset;
+               u32 handle;
+
+               handle = be32_to_cpup(p++);
+               *length = be32_to_cpup(p++);
+               xdr_decode_hyper(p, &offset);
+               dprintk("RPC:       %s:   segment %u@0x%016llx:0x%08x\n",
+                       __func__, *length, (unsigned long long)offset,
+                       handle);
+       } else {
+               *length = be32_to_cpup(p + 1);
+       }
+
+       return 0;
+}
+
+static int decode_write_chunk(struct xdr_stream *xdr, u32 *length)
+{
+       u32 segcount, seglength;
+       __be32 *p;
+
+       p = xdr_inline_decode(xdr, sizeof(*p));
+       if (unlikely(!p))
+               return -EIO;
+
+       *length = 0;
+       segcount = be32_to_cpup(p);
+       while (segcount--) {
+               if (decode_rdma_segment(xdr, &seglength))
+                       return -EIO;
+               *length += seglength;
+       }
+
+       dprintk("RPC:       %s: segcount=%u, %u bytes\n",
+               __func__, be32_to_cpup(p), *length);
+       return 0;
+}
+
+/* In RPC-over-RDMA Version One replies, a Read list is never
+ * expected. This decoder is a stub that returns an error if
+ * a Read list is present.
+ */
+static int decode_read_list(struct xdr_stream *xdr)
+{
+       __be32 *p;
+
+       p = xdr_inline_decode(xdr, sizeof(*p));
+       if (unlikely(!p))
+               return -EIO;
+       if (unlikely(*p != xdr_zero))
+               return -EIO;
+       return 0;
+}
+
+/* Supports only one Write chunk in the Write list
+ */
+static int decode_write_list(struct xdr_stream *xdr, u32 *length)
+{
+       u32 chunklen;
+       bool first;
+       __be32 *p;
+
+       *length = 0;
+       first = true;
+       do {
+               p = xdr_inline_decode(xdr, sizeof(*p));
+               if (unlikely(!p))
+                       return -EIO;
+               if (*p == xdr_zero)
+                       break;
+               if (!first)
+                       return -EIO;
+
+               if (decode_write_chunk(xdr, &chunklen))
+                       return -EIO;
+               *length += chunklen;
+               first = false;
+       } while (true);
+       return 0;
+}
+
+static int decode_reply_chunk(struct xdr_stream *xdr, u32 *length)
+{
+       __be32 *p;
+
+       p = xdr_inline_decode(xdr, sizeof(*p));
+       if (unlikely(!p))
+               return -EIO;
+
+       *length = 0;
+       if (*p != xdr_zero)
+               if (decode_write_chunk(xdr, length))
+                       return -EIO;
+       return 0;
+}
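
Taken together, decode_read_list(), decode_write_list(), and decode_reply_chunk() walk the same optional-item XDR shape: a one-word discriminator (xdr_zero means absent), and, for a present write chunk, a segment count followed by that many HLOO quads. A bounds-checked sketch of decoding one such optional chunk from a flat word array (plain pointers stand in for xdr_inline_decode(); all errors collapse to NULL):

#include <stdint.h>
#include <arpa/inet.h>

static const uint32_t *decode_optional_chunk_sketch(const uint32_t *p,
						    const uint32_t *end,
						    uint32_t *length)
{
	uint32_t segcount;

	*length = 0;
	if (p >= end)
		return NULL;
	if (*p++ == 0)			/* xdr_zero: item not present */
		return p;

	if (p >= end)
		return NULL;
	segcount = ntohl(*p++);
	while (segcount--) {
		if (end - p < 4)	/* H, L, and a two-word O */
			return NULL;
		*length += ntohl(p[1]);	/* L: accumulate segment length */
		p += 4;
	}
	return p;
}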
+
+static int
+rpcrdma_decode_msg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
+                  struct rpc_rqst *rqst)
+{
+       struct xdr_stream *xdr = &rep->rr_stream;
+       u32 writelist, replychunk, rpclen;
+       char *base;
+
+       /* Decode the chunk lists */
+       if (decode_read_list(xdr))
+               return -EIO;
+       if (decode_write_list(xdr, &writelist))
+               return -EIO;
+       if (decode_reply_chunk(xdr, &replychunk))
+               return -EIO;
+
+       /* RDMA_MSG sanity checks */
+       if (unlikely(replychunk))
+               return -EIO;
+
+       /* Build the RPC reply's Payload stream in rqst->rq_rcv_buf */
+       base = (char *)xdr_inline_decode(xdr, 0);
+       rpclen = xdr_stream_remaining(xdr);
+       r_xprt->rx_stats.fixup_copy_count +=
+               rpcrdma_inline_fixup(rqst, base, rpclen, writelist & 3);
+
+       r_xprt->rx_stats.total_rdma_reply += writelist;
+       return rpclen + xdr_align_size(writelist);
+}
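
The arithmetic at the end of rpcrdma_decode_msg() reflects XDR's 4-byte alignment rule: data items are padded up to a quad boundary on the wire, so the write-chunk byte count is rounded up before being added to the reply length, and its low two bits (the remainder modulo the boundary) are what rpcrdma_inline_fixup() receives as the pad. A tiny self-checking sketch of the round-up, on the assumption that xdr_align_size() behaves like the usual align-to-4 helper:

#include <assert.h>
#include <stdint.h>

static inline uint32_t xdr_align_size_sketch(uint32_t n)
{
	return (n + 3) & ~(uint32_t)3;	/* round up to a multiple of 4 */
}

int main(void)
{
	assert(xdr_align_size_sketch(0) == 0);
	assert(xdr_align_size_sketch(1) == 4);
	assert(xdr_align_size_sketch(4) == 4);
	assert(xdr_align_size_sketch(1023) == 1024);
	assert((1023 & 3) == 3);	/* remainder the fixup code sees */
	return 0;
}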
+
+static noinline int
+rpcrdma_decode_nomsg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
+{
+       struct xdr_stream *xdr = &rep->rr_stream;
+       u32 writelist, replychunk;
+
+       /* Decode the chunk lists */
+       if (decode_read_list(xdr))
+               return -EIO;
+       if (decode_write_list(xdr, &writelist))
+               return -EIO;
+       if (decode_reply_chunk(xdr, &replychunk))
+               return -EIO;
+
+       /* RDMA_NOMSG sanity checks */
+       if (unlikely(writelist))
+               return -EIO;
+       if (unlikely(!replychunk))
+               return -EIO;
+
+       /* Reply chunk buffer already is the reply vector */
+       r_xprt->rx_stats.total_rdma_reply += replychunk;
+       return replychunk;
+}
+
+static noinline int
+rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
+                    struct rpc_rqst *rqst)
+{
+       struct xdr_stream *xdr = &rep->rr_stream;
+       __be32 *p;
+
+       p = xdr_inline_decode(xdr, sizeof(*p));
+       if (unlikely(!p))
+               return -EIO;
+
+       switch (*p) {
+       case err_vers:
+               p = xdr_inline_decode(xdr, 2 * sizeof(*p));
+               if (!p)
+                       break;
+               dprintk("RPC: %5u: %s: server reports version error (%u-%u)\n",
+                       rqst->rq_task->tk_pid, __func__,
+                       be32_to_cpup(p), be32_to_cpup(p + 1));
+               break;
+       case err_chunk:
+               dprintk("RPC: %5u: %s: server reports header decoding error\n",
+                       rqst->rq_task->tk_pid, __func__);
+               break;
+       default:
+               dprintk("RPC: %5u: %s: server reports unrecognized error %d\n",
+                       rqst->rq_task->tk_pid, __func__, be32_to_cpup(p));
+       }
+
+       r_xprt->rx_stats.bad_reply_count++;
+       return -EREMOTEIO;
+}
+
 /* Process received RPC/RDMA messages.
  *
  * Errors must result in the RPC task either being awakened, or
@@ -991,51 +1223,48 @@ rpcrdma_reply_handler(struct work_struct *work)
        struct rpcrdma_rep *rep =
                        container_of(work, struct rpcrdma_rep, rr_work);
        struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
-       struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
-       struct rpcrdma_msg *headerp;
+       struct xdr_stream *xdr = &rep->rr_stream;
        struct rpcrdma_req *req;
        struct rpc_rqst *rqst;
-       __be32 *iptr;
-       int rdmalen, status, rmerr;
+       __be32 *p, xid, vers, proc;
        unsigned long cwnd;
-       struct list_head mws;
+       int status;
 
        dprintk("RPC:       %s: incoming rep %p\n", __func__, rep);
 
-       if (rep->rr_len == RPCRDMA_BAD_LEN)
+       if (rep->rr_hdrbuf.head[0].iov_len == 0)
                goto out_badstatus;
-       if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
+
+       xdr_init_decode(xdr, &rep->rr_hdrbuf,
+                       rep->rr_hdrbuf.head[0].iov_base);
+
+       /* Fixed transport header fields */
+       p = xdr_inline_decode(xdr, 4 * sizeof(*p));
+       if (unlikely(!p))
                goto out_shortreply;
+       xid = *p++;
+       vers = *p++;
+       p++;    /* credits */
+       proc = *p++;
 
-       headerp = rdmab_to_msg(rep->rr_rdmabuf);
-#if defined(CONFIG_SUNRPC_BACKCHANNEL)
-       if (rpcrdma_is_bcall(headerp))
-               goto out_bcall;
-#endif
+       if (rpcrdma_is_bcall(r_xprt, rep, xid, proc))
+               return;
 
        /* Match incoming rpcrdma_rep to an rpcrdma_req to
         * get context for handling any incoming chunks.
         */
-       spin_lock(&buf->rb_lock);
-       req = rpcrdma_lookup_req_locked(&r_xprt->rx_buf,
-                                       headerp->rm_xid);
-       if (!req)
-               goto out_nomatch;
-       if (req->rl_reply)
-               goto out_duplicate;
-
-       list_replace_init(&req->rl_registered, &mws);
-       rpcrdma_mark_remote_invalidation(&mws, rep);
-
-       /* Avoid races with signals and duplicate replies
-        * by marking this req as matched.
-        */
+       spin_lock(&xprt->recv_lock);
+       rqst = xprt_lookup_rqst(xprt, xid);
+       if (!rqst)
+               goto out_norqst;
+       xprt_pin_rqst(rqst);
+       spin_unlock(&xprt->recv_lock);
+       req = rpcr_to_rdmar(rqst);
        req->rl_reply = rep;
-       spin_unlock(&buf->rb_lock);
 
        dprintk("RPC:       %s: reply %p completes request %p (xid 0x%08x)\n",
-               __func__, rep, req, be32_to_cpu(headerp->rm_xid));
+               __func__, rep, req, be32_to_cpu(xid));
 
        /* Invalidate and unmap the data payloads before waking the
         * waiting application. This guarantees the memory regions
@@ -1044,99 +1273,42 @@ rpcrdma_reply_handler(struct work_struct *work)
         * waking the next RPC waits until this RPC has relinquished
         * all its Send Queue entries.
         */
-       if (!list_empty(&mws))
-               r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, &mws);
+       if (!list_empty(&req->rl_registered)) {
+               rpcrdma_mark_remote_invalidation(&req->rl_registered, rep);
+               r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
+                                                   &req->rl_registered);
+       }
 
-       /* Perform XID lookup, reconstruction of the RPC reply, and
-        * RPC completion while holding the transport lock to ensure
-        * the rep, rqst, and rq_task pointers remain stable.
-        */
-       spin_lock_bh(&xprt->transport_lock);
-       rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
-       if (!rqst)
-               goto out_norqst;
        xprt->reestablish_timeout = 0;
-       if (headerp->rm_vers != rpcrdma_version)
+       if (vers != rpcrdma_version)
                goto out_badversion;
 
-       /* check for expected message types */
-       /* The order of some of these tests is important. */
-       switch (headerp->rm_type) {
+       switch (proc) {
        case rdma_msg:
-               /* never expect read chunks */
-               /* never expect reply chunks (two ways to check) */
-               if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
-                   (headerp->rm_body.rm_chunks[1] == xdr_zero &&
-                    headerp->rm_body.rm_chunks[2] != xdr_zero))
-                       goto badheader;
-               if (headerp->rm_body.rm_chunks[1] != xdr_zero) {
-                       /* count any expected write chunks in read reply */
-                       /* start at write chunk array count */
-                       iptr = &headerp->rm_body.rm_chunks[2];
-                       rdmalen = rpcrdma_count_chunks(rep, 1, &iptr);
-                       /* check for validity, and no reply chunk after */
-                       if (rdmalen < 0 || *iptr++ != xdr_zero)
-                               goto badheader;
-                       rep->rr_len -=
-                           ((unsigned char *)iptr - (unsigned char *)headerp);
-                       status = rep->rr_len + rdmalen;
-                       r_xprt->rx_stats.total_rdma_reply += rdmalen;
-                       /* special case - last chunk may omit padding */
-                       if (rdmalen &= 3) {
-                               rdmalen = 4 - rdmalen;
-                               status += rdmalen;
-                       }
-               } else {
-                       /* else ordinary inline */
-                       rdmalen = 0;
-                       iptr = (__be32 *)((unsigned char *)headerp +
-                                                       RPCRDMA_HDRLEN_MIN);
-                       rep->rr_len -= RPCRDMA_HDRLEN_MIN;
-                       status = rep->rr_len;
-               }
-
-               r_xprt->rx_stats.fixup_copy_count +=
-                       rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len,
-                                            rdmalen);
+               status = rpcrdma_decode_msg(r_xprt, rep, rqst);
                break;
-
        case rdma_nomsg:
-               /* never expect read or write chunks, always reply chunks */
-               if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
-                   headerp->rm_body.rm_chunks[1] != xdr_zero ||
-                   headerp->rm_body.rm_chunks[2] != xdr_one)
-                       goto badheader;
-               iptr = (__be32 *)((unsigned char *)headerp +
-                                                       RPCRDMA_HDRLEN_MIN);
-               rdmalen = rpcrdma_count_chunks(rep, 0, &iptr);
-               if (rdmalen < 0)
-                       goto badheader;
-               r_xprt->rx_stats.total_rdma_reply += rdmalen;
-               /* Reply chunk buffer already is the reply vector - no fixup. */
-               status = rdmalen;
+               status = rpcrdma_decode_nomsg(r_xprt, rep);
                break;
-
        case rdma_error:
-               goto out_rdmaerr;
-
-badheader:
+               status = rpcrdma_decode_error(r_xprt, rep, rqst);
+               break;
        default:
-               dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n",
-                       rqst->rq_task->tk_pid, __func__,
-                       be32_to_cpu(headerp->rm_type));
                status = -EIO;
-               r_xprt->rx_stats.bad_reply_count++;
-               break;
        }
+       if (status < 0)
+               goto out_badheader;
 
 out:
+       spin_lock(&xprt->recv_lock);
        cwnd = xprt->cwnd;
        xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
        if (xprt->cwnd > cwnd)
                xprt_release_rqst_cong(rqst->rq_task);
 
        xprt_complete_rqst(rqst->rq_task, status);
-       spin_unlock_bh(&xprt->transport_lock);
+       xprt_unpin_rqst(rqst);
+       spin_unlock(&xprt->recv_lock);
        dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
                __func__, xprt, rqst, status);
        return;
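
The shape of the rewritten handler is the point of the recv_lock series: take the new lock just long enough to look up and pin the rqst, drop it for the slow work (DMA sync, MR invalidation, XDR decode), then retake it to complete the RPC and unpin. A condensed sketch of that pattern, mirroring the calls in the hunk above (kernel context; the pin keeps the rqst from being released while the lock is dropped):

#include <linux/spinlock.h>
#include <linux/sunrpc/xprt.h>

static void reply_pattern_sketch(struct rpc_xprt *xprt, __be32 xid)
{
	struct rpc_rqst *rqst;

	spin_lock(&xprt->recv_lock);
	rqst = xprt_lookup_rqst(xprt, xid);	/* receive list is stable */
	if (!rqst) {
		spin_unlock(&xprt->recv_lock);
		return;				/* RPC already terminated */
	}
	xprt_pin_rqst(rqst);			/* hold rqst across unlock */
	spin_unlock(&xprt->recv_lock);

	/* slow path runs unlocked: unmap, invalidate, decode ... */

	spin_lock(&xprt->recv_lock);
	xprt_complete_rqst(rqst->rq_task, 0);	/* 0 stands in for the length */
	xprt_unpin_rqst(rqst);
	spin_unlock(&xprt->recv_lock);
}

The same lookup/pin/unlock/relock dance appears in the xprtsock.c hunks below for the local, UDP, and TCP receive paths.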
@@ -1149,72 +1321,38 @@ out_badstatus:
        }
        return;
 
-#if defined(CONFIG_SUNRPC_BACKCHANNEL)
-out_bcall:
-       rpcrdma_bc_receive_call(r_xprt, rep);
-       return;
-#endif
-
 /* If the incoming reply terminated a pending RPC, the next
  * RPC call will post a replacement receive buffer as it is
  * being marshaled.
  */
 out_badversion:
        dprintk("RPC:       %s: invalid version %d\n",
-               __func__, be32_to_cpu(headerp->rm_vers));
+               __func__, be32_to_cpu(vers));
        status = -EIO;
        r_xprt->rx_stats.bad_reply_count++;
        goto out;
 
-out_rdmaerr:
-       rmerr = be32_to_cpu(headerp->rm_body.rm_error.rm_err);
-       switch (rmerr) {
-       case ERR_VERS:
-               pr_err("%s: server reports header version error (%u-%u)\n",
-                      __func__,
-                      be32_to_cpu(headerp->rm_body.rm_error.rm_vers_low),
-                      be32_to_cpu(headerp->rm_body.rm_error.rm_vers_high));
-               break;
-       case ERR_CHUNK:
-               pr_err("%s: server reports header decoding error\n",
-                      __func__);
-               break;
-       default:
-               pr_err("%s: server reports unknown error %d\n",
-                      __func__, rmerr);
-       }
-       status = -EREMOTEIO;
+out_badheader:
+       dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n",
+               rqst->rq_task->tk_pid, __func__, be32_to_cpu(proc));
        r_xprt->rx_stats.bad_reply_count++;
+       status = -EIO;
        goto out;
 
-/* The req was still available, but by the time the transport_lock
+/* The req was still available, but by the time the recv_lock
  * was acquired, the rqst and task had been released. Thus the RPC
  * has already been terminated.
  */
 out_norqst:
-       spin_unlock_bh(&xprt->transport_lock);
-       rpcrdma_buffer_put(req);
-       dprintk("RPC:       %s: race, no rqst left for req %p\n",
-               __func__, req);
-       return;
+       spin_unlock(&xprt->recv_lock);
+       dprintk("RPC:       %s: no match for incoming xid 0x%08x\n",
+               __func__, be32_to_cpu(xid));
+       goto repost;
 
 out_shortreply:
        dprintk("RPC:       %s: short/invalid reply\n", __func__);
        goto repost;
 
-out_nomatch:
-       spin_unlock(&buf->rb_lock);
-       dprintk("RPC:       %s: no match for incoming xid 0x%08x len %d\n",
-               __func__, be32_to_cpu(headerp->rm_xid),
-               rep->rr_len);
-       goto repost;
-
-out_duplicate:
-       spin_unlock(&buf->rb_lock);
-       dprintk("RPC:       %s: "
-               "duplicate reply %p to RPC request %p: xid 0x%08x\n",
-               __func__, rep, req, be32_to_cpu(headerp->rm_xid));
-
 /* If no pending RPC transaction was matched, post a replacement
  * receive buffer before returning.
  */
index c676ed0efb5af2cceb6ed59e238a0f7a0109f916..ec37ad83b068a5067ca6e76585ab817335c51768 100644 (file)
@@ -52,7 +52,7 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
        if (src->iov_len < 24)
                goto out_shortreply;
 
-       spin_lock_bh(&xprt->transport_lock);
+       spin_lock(&xprt->recv_lock);
        req = xprt_lookup_rqst(xprt, xid);
        if (!req)
                goto out_notfound;
@@ -69,17 +69,20 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
        else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
                credits = r_xprt->rx_buf.rb_bc_max_requests;
 
+       spin_lock_bh(&xprt->transport_lock);
        cwnd = xprt->cwnd;
        xprt->cwnd = credits << RPC_CWNDSHIFT;
        if (xprt->cwnd > cwnd)
                xprt_release_rqst_cong(req->rq_task);
+       spin_unlock_bh(&xprt->transport_lock);
 
        ret = 0;
        xprt_complete_rqst(req->rq_task, rcvbuf->len);
        rcvbuf->len = 0;
 
 out_unlock:
-       spin_unlock_bh(&xprt->transport_lock);
+       spin_unlock(&xprt->recv_lock);
 out:
        return ret;
 
@@ -266,7 +269,7 @@ xprt_rdma_bc_put(struct rpc_xprt *xprt)
        module_put(THIS_MODULE);
 }
 
-static struct rpc_xprt_ops xprt_rdma_bc_procs = {
+static const struct rpc_xprt_ops xprt_rdma_bc_procs = {
        .reserve_xprt           = xprt_reserve_xprt_cong,
        .release_xprt           = xprt_release_xprt_cong,
        .alloc_slot             = xprt_alloc_slot,
index d1c458e5ec4de25b81f30fa5919cb0b9dfc5ffdf..c84e2b644e133ee13c94082b171acbe063bf3f97 100644 (file)
@@ -149,7 +149,7 @@ static struct ctl_table sunrpc_table[] = {
 
 #endif
 
-static struct rpc_xprt_ops xprt_rdma_procs;    /*forward reference */
+static const struct rpc_xprt_ops xprt_rdma_procs;
 
 static void
 xprt_rdma_format_addresses4(struct rpc_xprt *xprt, struct sockaddr *sap)
@@ -559,6 +559,7 @@ rpcrdma_get_rdmabuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 
        r_xprt->rx_stats.hardway_register_count += size;
        req->rl_rdmabuf = rb;
+       xdr_buf_init(&req->rl_hdrbuf, rb->rg_base, rdmab_length(rb));
        return true;
 }
 
@@ -684,7 +685,6 @@ xprt_rdma_free(struct rpc_task *task)
 
        dprintk("RPC:       %s: called on 0x%p\n", __func__, req->rl_reply);
 
-       rpcrdma_remove_req(&r_xprt->rx_buf, req);
        if (!list_empty(&req->rl_registered))
                ia->ri_ops->ro_unmap_safe(r_xprt, req, !RPC_IS_ASYNC(task));
        rpcrdma_unmap_sges(ia, req);
@@ -730,7 +730,7 @@ xprt_rdma_send_request(struct rpc_task *task)
        if (unlikely(!list_empty(&req->rl_registered)))
                r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false);
 
-       rc = rpcrdma_marshal_req(rqst);
+       rc = rpcrdma_marshal_req(r_xprt, rqst);
        if (rc < 0)
                goto failed_marshal;
 
@@ -811,7 +811,7 @@ xprt_rdma_disable_swap(struct rpc_xprt *xprt)
  * Plumbing for rpc transport switch and kernel module
  */
 
-static struct rpc_xprt_ops xprt_rdma_procs = {
+static const struct rpc_xprt_ops xprt_rdma_procs = {
        .reserve_xprt           = xprt_reserve_xprt_cong,
        .release_xprt           = xprt_release_xprt_cong, /* sunrpc/xprt.c */
        .alloc_slot             = xprt_alloc_slot,
index e4171f2abe37d966b82b9a858300843027e7224c..11a1fbf7e59e08e9172f4562652dc55d53240075 100644 (file)
@@ -139,14 +139,11 @@ rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
 static void
 rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
 {
-       struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
        struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
+       __be32 *p = rep->rr_rdmabuf->rg_base;
        u32 credits;
 
-       if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
-               return;
-
-       credits = be32_to_cpu(rmsgp->rm_credit);
+       credits = be32_to_cpup(p + 2);
        if (credits == 0)
                credits = 1;    /* don't deadlock */
        else if (credits > buffer->rb_max_requests)
@@ -173,21 +170,19 @@ rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
                goto out_fail;
 
        /* status == SUCCESS means all fields in wc are trustworthy */
-       if (wc->opcode != IB_WC_RECV)
-               return;
-
        dprintk("RPC:       %s: rep %p opcode 'recv', length %u: success\n",
                __func__, rep, wc->byte_len);
 
-       rep->rr_len = wc->byte_len;
+       rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
        rep->rr_wc_flags = wc->wc_flags;
        rep->rr_inv_rkey = wc->ex.invalidate_rkey;
 
        ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf),
                                   rdmab_addr(rep->rr_rdmabuf),
-                                  rep->rr_len, DMA_FROM_DEVICE);
+                                  wc->byte_len, DMA_FROM_DEVICE);
 
-       rpcrdma_update_granted_credits(rep);
+       if (wc->byte_len >= RPCRDMA_HDRLEN_ERR)
+               rpcrdma_update_granted_credits(rep);
 
 out_schedule:
        queue_work(rpcrdma_receive_wq, &rep->rr_work);
@@ -198,7 +193,7 @@ out_fail:
                pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
                       ib_wc_status_msg(wc->status),
                       wc->status, wc->vendor_err);
-       rep->rr_len = RPCRDMA_BAD_LEN;
+       rpcrdma_set_xdrlen(&rep->rr_hdrbuf, 0);
        goto out_schedule;
 }
 
@@ -974,6 +969,8 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
                rc = PTR_ERR(rep->rr_rdmabuf);
                goto out_free;
        }
+       xdr_buf_init(&rep->rr_hdrbuf, rep->rr_rdmabuf->rg_base,
+                    rdmab_length(rep->rr_rdmabuf));
 
        rep->rr_cqe.done = rpcrdma_wc_receive;
        rep->rr_rxprt = r_xprt;
@@ -1004,7 +1001,6 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
        spin_lock_init(&buf->rb_recovery_lock);
        INIT_LIST_HEAD(&buf->rb_mws);
        INIT_LIST_HEAD(&buf->rb_all);
-       INIT_LIST_HEAD(&buf->rb_pending);
        INIT_LIST_HEAD(&buf->rb_stale_mrs);
        INIT_DELAYED_WORK(&buf->rb_refresh_worker,
                          rpcrdma_mr_refresh_worker);
index b282d3f8cdd8004c6f0a6391434b6b94775c29f9..e26a97d2f922fad2cc14aa11ea0cd4287d1c26b5 100644 (file)
@@ -218,18 +218,17 @@ enum {
 
 struct rpcrdma_rep {
        struct ib_cqe           rr_cqe;
-       unsigned int            rr_len;
        int                     rr_wc_flags;
        u32                     rr_inv_rkey;
+       struct rpcrdma_regbuf   *rr_rdmabuf;
        struct rpcrdma_xprt     *rr_rxprt;
        struct work_struct      rr_work;
+       struct xdr_buf          rr_hdrbuf;
+       struct xdr_stream       rr_stream;
        struct list_head        rr_list;
        struct ib_recv_wr       rr_recv_wr;
-       struct rpcrdma_regbuf   *rr_rdmabuf;
 };
 
-#define RPCRDMA_BAD_LEN                (~0U)
-
 /*
  * struct rpcrdma_mw - external memory region metadata
  *
@@ -341,11 +340,12 @@ enum {
 struct rpcrdma_buffer;
 struct rpcrdma_req {
        struct list_head        rl_list;
-       __be32                  rl_xid;
        unsigned int            rl_mapped_sges;
        unsigned int            rl_connect_cookie;
        struct rpcrdma_buffer   *rl_buffer;
        struct rpcrdma_rep      *rl_reply;
+       struct xdr_stream       rl_stream;
+       struct xdr_buf          rl_hdrbuf;
        struct ib_send_wr       rl_send_wr;
        struct ib_sge           rl_send_sge[RPCRDMA_MAX_SEND_SGES];
        struct rpcrdma_regbuf   *rl_rdmabuf;    /* xprt header */
@@ -403,7 +403,6 @@ struct rpcrdma_buffer {
        int                     rb_send_count, rb_recv_count;
        struct list_head        rb_send_bufs;
        struct list_head        rb_recv_bufs;
-       struct list_head        rb_pending;
        u32                     rb_max_requests;
        atomic_t                rb_credits;     /* most recent credit grant */
 
@@ -440,24 +439,27 @@ struct rpcrdma_create_data_internal {
  * Statistics for RPCRDMA
  */
 struct rpcrdma_stats {
+       /* accessed when sending a call */
        unsigned long           read_chunk_count;
        unsigned long           write_chunk_count;
        unsigned long           reply_chunk_count;
-
        unsigned long long      total_rdma_request;
-       unsigned long long      total_rdma_reply;
 
+       /* rarely accessed error counters */
        unsigned long long      pullup_copy_count;
-       unsigned long long      fixup_copy_count;
        unsigned long           hardway_register_count;
        unsigned long           failed_marshal_count;
        unsigned long           bad_reply_count;
-       unsigned long           nomsg_call_count;
-       unsigned long           bcall_count;
        unsigned long           mrs_recovered;
        unsigned long           mrs_orphaned;
        unsigned long           mrs_allocated;
+
+       /* accessed when receiving a reply */
+       unsigned long long      total_rdma_reply;
+       unsigned long long      fixup_copy_count;
        unsigned long           local_inv_needed;
+       unsigned long           nomsg_call_count;
+       unsigned long           bcall_count;
 };
 
 /*
@@ -465,7 +467,8 @@ struct rpcrdma_stats {
  */
 struct rpcrdma_xprt;
 struct rpcrdma_memreg_ops {
-       int             (*ro_map)(struct rpcrdma_xprt *,
+       struct rpcrdma_mr_seg *
+                       (*ro_map)(struct rpcrdma_xprt *,
                                  struct rpcrdma_mr_seg *, int, bool,
                                  struct rpcrdma_mw **);
        void            (*ro_unmap_sync)(struct rpcrdma_xprt *,
@@ -552,34 +555,6 @@ void rpcrdma_destroy_req(struct rpcrdma_req *);
 int rpcrdma_buffer_create(struct rpcrdma_xprt *);
 void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
 
-static inline void
-rpcrdma_insert_req(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
-{
-       spin_lock(&buffers->rb_lock);
-       if (list_empty(&req->rl_list))
-               list_add_tail(&req->rl_list, &buffers->rb_pending);
-       spin_unlock(&buffers->rb_lock);
-}
-
-static inline struct rpcrdma_req *
-rpcrdma_lookup_req_locked(struct rpcrdma_buffer *buffers, __be32 xid)
-{
-       struct rpcrdma_req *pos;
-
-       list_for_each_entry(pos, &buffers->rb_pending, rl_list)
-               if (pos->rl_xid == xid)
-                       return pos;
-       return NULL;
-}
-
-static inline void
-rpcrdma_remove_req(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
-{
-       spin_lock(&buffers->rb_lock);
-       list_del(&req->rl_list);
-       spin_unlock(&buffers->rb_lock);
-}
-
 struct rpcrdma_mw *rpcrdma_get_mw(struct rpcrdma_xprt *);
 void rpcrdma_put_mw(struct rpcrdma_xprt *, struct rpcrdma_mw *);
 struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
@@ -638,10 +613,16 @@ enum rpcrdma_chunktype {
 bool rpcrdma_prepare_send_sges(struct rpcrdma_ia *, struct rpcrdma_req *,
                               u32, struct xdr_buf *, enum rpcrdma_chunktype);
 void rpcrdma_unmap_sges(struct rpcrdma_ia *, struct rpcrdma_req *);
-int rpcrdma_marshal_req(struct rpc_rqst *);
+int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst);
 void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
 void rpcrdma_reply_handler(struct work_struct *work);
 
+static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len)
+{
+       xdr->head[0].iov_len = len;
+       xdr->len = len;
+}
+
 /* RPC/RDMA module init - xprtrdma/transport.c
  */
 extern unsigned int xprt_rdma_max_inline_read;
index 4f154d3887483e9cf1a6ab151c487e1d6113c1a8..9b5de31aa42939cbc3f6aa4a0a94d7a28f711e84 100644 (file)
@@ -969,10 +969,12 @@ static void xs_local_data_read_skb(struct rpc_xprt *xprt,
                return;
 
        /* Look up and lock the request corresponding to the given XID */
-       spin_lock_bh(&xprt->transport_lock);
+       spin_lock(&xprt->recv_lock);
        rovr = xprt_lookup_rqst(xprt, *xp);
        if (!rovr)
                goto out_unlock;
+       xprt_pin_rqst(rovr);
+       spin_unlock(&xprt->recv_lock);
        task = rovr->rq_task;
 
        copied = rovr->rq_private_buf.buflen;
@@ -981,13 +983,16 @@ static void xs_local_data_read_skb(struct rpc_xprt *xprt,
 
        if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) {
                dprintk("RPC:       sk_buff copy failed\n");
-               goto out_unlock;
+               spin_lock(&xprt->recv_lock);
+               goto out_unpin;
        }
 
+       spin_lock(&xprt->recv_lock);
        xprt_complete_rqst(task, copied);
-
+out_unpin:
+       xprt_unpin_rqst(rovr);
  out_unlock:
-       spin_unlock_bh(&xprt->transport_lock);
+       spin_unlock(&xprt->recv_lock);
 }
 
 static void xs_local_data_receive(struct sock_xprt *transport)
@@ -1050,10 +1055,12 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
                return;
 
        /* Look up and lock the request corresponding to the given XID */
-       spin_lock_bh(&xprt->transport_lock);
+       spin_lock(&xprt->recv_lock);
        rovr = xprt_lookup_rqst(xprt, *xp);
        if (!rovr)
                goto out_unlock;
+       xprt_pin_rqst(rovr);
+       spin_unlock(&xprt->recv_lock);
        task = rovr->rq_task;
 
        if ((copied = rovr->rq_private_buf.buflen) > repsize)
@@ -1062,16 +1069,21 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
        /* Suck it into the iovec, verify checksum if not done by hw. */
        if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
                __UDPX_INC_STATS(sk, UDP_MIB_INERRORS);
-               goto out_unlock;
+               spin_lock(&xprt->recv_lock);
+               goto out_unpin;
        }
 
        __UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS);
 
+       spin_lock_bh(&xprt->transport_lock);
        xprt_adjust_cwnd(xprt, task, copied);
+       spin_unlock_bh(&xprt->transport_lock);
+       spin_lock(&xprt->recv_lock);
        xprt_complete_rqst(task, copied);
-
+out_unpin:
+       xprt_unpin_rqst(rovr);
  out_unlock:
-       spin_unlock_bh(&xprt->transport_lock);
+       spin_unlock(&xprt->recv_lock);
 }
 
 static void xs_udp_data_receive(struct sock_xprt *transport)
@@ -1277,25 +1289,12 @@ static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
        }
 
        len = desc->count;
-       if (len > transport->tcp_reclen - transport->tcp_offset) {
-               struct xdr_skb_reader my_desc;
-
-               len = transport->tcp_reclen - transport->tcp_offset;
-               memcpy(&my_desc, desc, sizeof(my_desc));
-               my_desc.count = len;
-               r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
-                                         &my_desc, xdr_skb_read_bits);
-               desc->count -= r;
-               desc->offset += r;
-       } else
-               r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
+       if (len > transport->tcp_reclen - transport->tcp_offset)
+               desc->count = transport->tcp_reclen - transport->tcp_offset;
+       r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
                                          desc, xdr_skb_read_bits);
 
-       if (r > 0) {
-               transport->tcp_copied += r;
-               transport->tcp_offset += r;
-       }
-       if (r != len) {
+       if (desc->count) {
                /* Error when copying to the receive buffer,
                 * usually because we weren't able to allocate
                 * additional buffer pages. All we can do now
@@ -1315,6 +1314,10 @@ static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
                return;
        }
 
+       transport->tcp_copied += r;
+       transport->tcp_offset += r;
+       desc->count = len - r;
+
        dprintk("RPC:       XID %08x read %zd bytes\n",
                        ntohl(transport->tcp_xid), r);
        dprintk("RPC:       xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
@@ -1343,21 +1346,24 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
        dprintk("RPC:       read reply XID %08x\n", ntohl(transport->tcp_xid));
 
        /* Find and lock the request corresponding to this xid */
-       spin_lock_bh(&xprt->transport_lock);
+       spin_lock(&xprt->recv_lock);
        req = xprt_lookup_rqst(xprt, transport->tcp_xid);
        if (!req) {
                dprintk("RPC:       XID %08x request not found!\n",
                                ntohl(transport->tcp_xid));
-               spin_unlock_bh(&xprt->transport_lock);
+               spin_unlock(&xprt->recv_lock);
                return -1;
        }
+       xprt_pin_rqst(req);
+       spin_unlock(&xprt->recv_lock);
 
        xs_tcp_read_common(xprt, desc, req);
 
+       spin_lock(&xprt->recv_lock);
        if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
                xprt_complete_rqst(req->rq_task, transport->tcp_copied);
-
-       spin_unlock_bh(&xprt->transport_lock);
+       xprt_unpin_rqst(req);
+       spin_unlock(&xprt->recv_lock);
        return 0;
 }
 
@@ -1376,11 +1382,9 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt,
                                container_of(xprt, struct sock_xprt, xprt);
        struct rpc_rqst *req;
 
-       /* Look up and lock the request corresponding to the given XID */
-       spin_lock_bh(&xprt->transport_lock);
+       /* Look up the request corresponding to the given XID */
        req = xprt_lookup_bc_request(xprt, transport->tcp_xid);
        if (req == NULL) {
-               spin_unlock_bh(&xprt->transport_lock);
                printk(KERN_WARNING "Callback slot table overflowed\n");
                xprt_force_disconnect(xprt);
                return -1;
@@ -1391,7 +1395,6 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt,
 
        if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
                xprt_complete_bc_request(req, transport->tcp_copied);
-       spin_unlock_bh(&xprt->transport_lock);
 
        return 0;
 }
@@ -1516,6 +1519,7 @@ static void xs_tcp_data_receive(struct sock_xprt *transport)
                .arg.data = xprt,
        };
        unsigned long total = 0;
+       int loop;
        int read = 0;
 
        mutex_lock(&transport->recv_mutex);
@@ -1524,20 +1528,20 @@ static void xs_tcp_data_receive(struct sock_xprt *transport)
                goto out;
 
        /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
-       for (;;) {
+       for (loop = 0; loop < 64; loop++) {
                lock_sock(sk);
                read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
                if (read <= 0) {
                        clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
                        release_sock(sk);
-                       if (!test_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
-                               break;
-               } else {
-                       release_sock(sk);
-                       total += read;
+                       break;
                }
+               release_sock(sk);
+               total += read;
                rd_desc.count = 65536;
        }
+       if (test_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
+               queue_work(xprtiod_workqueue, &transport->recv_worker);
 out:
        mutex_unlock(&transport->recv_mutex);
        trace_xs_tcp_data_ready(xprt, read, total);
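
Bounding the receive loop at 64 passes and requeuing the worker when data is still pending trades a little latency for fairness: one busy socket can no longer monopolize a workqueue thread. A sketch of the bounded-batch-then-requeue pattern (drain_one_sketch() and requeue_sketch() are made-up stand-ins for tcp_read_sock() and queue_work()):

#include <stdbool.h>

#define RECV_BATCH 64

extern bool drain_one_sketch(void *sock);	/* true while data remains */
extern void requeue_sketch(void *work);		/* reschedule this worker */

static void recv_worker_sketch(void *sock, void *work)
{
	int loop;

	for (loop = 0; loop < RECV_BATCH; loop++)
		if (!drain_one_sketch(sock))
			return;			/* drained: nothing left */

	requeue_sketch(work);			/* still busy: yield, requeue */
}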
@@ -2724,7 +2728,7 @@ static void bc_destroy(struct rpc_xprt *xprt)
        module_put(THIS_MODULE);
 }
 
-static struct rpc_xprt_ops xs_local_ops = {
+static const struct rpc_xprt_ops xs_local_ops = {
        .reserve_xprt           = xprt_reserve_xprt,
        .release_xprt           = xs_tcp_release_xprt,
        .alloc_slot             = xprt_alloc_slot,
@@ -2742,7 +2746,7 @@ static struct rpc_xprt_ops xs_local_ops = {
        .disable_swap           = xs_disable_swap,
 };
 
-static struct rpc_xprt_ops xs_udp_ops = {
+static const struct rpc_xprt_ops xs_udp_ops = {
        .set_buffer_size        = xs_udp_set_buffer_size,
        .reserve_xprt           = xprt_reserve_xprt_cong,
        .release_xprt           = xprt_release_xprt_cong,
@@ -2764,7 +2768,7 @@ static struct rpc_xprt_ops xs_udp_ops = {
        .inject_disconnect      = xs_inject_disconnect,
 };
 
-static struct rpc_xprt_ops xs_tcp_ops = {
+static const struct rpc_xprt_ops xs_tcp_ops = {
        .reserve_xprt           = xprt_reserve_xprt,
        .release_xprt           = xs_tcp_release_xprt,
        .alloc_slot             = xprt_lock_and_alloc_slot,
@@ -2795,7 +2799,7 @@ static struct rpc_xprt_ops xs_tcp_ops = {
  * The rpc_xprt_ops for the server backchannel
  */
 
-static struct rpc_xprt_ops bc_tcp_ops = {
+static const struct rpc_xprt_ops bc_tcp_ops = {
        .reserve_xprt           = xprt_reserve_xprt,
        .release_xprt           = xprt_release_xprt,
        .alloc_slot             = xprt_alloc_slot,