Merge tag 'nfsd-4.20' of git://linux-nfs.org/~bfields/linux
author Linus Torvalds <torvalds@linux-foundation.org>
Tue, 30 Oct 2018 20:03:29 +0000 (13:03 -0700)
committer Linus Torvalds <torvalds@linux-foundation.org>
Tue, 30 Oct 2018 20:03:29 +0000 (13:03 -0700)
Pull nfsd updates from Bruce Fields:
 "Olga added support for the NFSv4.2 asynchronous copy protocol. We
  already supported COPY, by copying a limited amount of data and then
  returning a short result, letting the client resend. The asynchronous
  protocol should offer better performance at the expense of some
  complexity.

  The other highlight is Trond's work to convert the duplicate reply
  cache to a red-black tree, and to move it and some other server caches
  to RCU. (Previously these have meant taking global spinlocks on every
  RPC)

  Otherwise, some RDMA work and miscellaneous bugfixes"

* tag 'nfsd-4.20' of git://linux-nfs.org/~bfields/linux: (30 commits)
  lockd: fix access beyond unterminated strings in prints
  nfsd: Fix an Oops in free_session()
  nfsd: correctly decrement odstate refcount in error path
  svcrdma: Increase the default connection credit limit
  svcrdma: Remove try_module_get from backchannel
  svcrdma: Remove ->release_rqst call in bc reply handler
  svcrdma: Reduce max_send_sges
  nfsd: fix fall-through annotations
  knfsd: Improve lookup performance in the duplicate reply cache using an rbtree
  knfsd: Further simplify the cache lookup
  knfsd: Simplify NFS duplicate replay cache
  knfsd: Remove dead code from nfsd_cache_lookup
  SUNRPC: Simplify TCP receive code
  SUNRPC: Replace the cache_detail->hash_lock with a regular spinlock
  SUNRPC: Remove non-RCU protected lookup
  NFS: Fix up a typo in nfs_dns_ent_put
  NFS: Lockless DNS lookups
  knfsd: Lockless lookup of NFSv4 identities.
  SUNRPC: Lockless server RPCSEC_GSS context lookup
  knfsd: Allow lockless lookups of the exports
  ...

29 files changed:
Documentation/filesystems/nfs/rpc-cache.txt
fs/lockd/host.c
fs/nfs/dns_resolve.c
fs/nfsd/cache.h
fs/nfsd/export.c
fs/nfsd/export.h
fs/nfsd/netns.h
fs/nfsd/nfs4callback.c
fs/nfsd/nfs4idmap.c
fs/nfsd/nfs4proc.c
fs/nfsd/nfs4state.c
fs/nfsd/nfs4xdr.c
fs/nfsd/nfscache.c
fs/nfsd/nfsctl.c
fs/nfsd/state.h
fs/nfsd/vfs.c
fs/nfsd/xdr4.h
fs/nfsd/xdr4cb.h
include/linux/sunrpc/cache.h
include/linux/sunrpc/svc_rdma.h
include/linux/sunrpc/svcauth.h
net/sunrpc/auth_gss/svcauth_gss.c
net/sunrpc/cache.c
net/sunrpc/svc_xprt.c
net/sunrpc/svcauth.c
net/sunrpc/svcauth_unix.c
net/sunrpc/svcsock.c
net/sunrpc/xprtrdma/svc_rdma_backchannel.c
net/sunrpc/xprtrdma/svc_rdma_transport.c

index ebcaaee2161684b99f5d6c0f5866bea805f6c405..c4dac829db0f60cab1ea7b7c88eb2c700489bf08 100644 (file)
@@ -84,7 +84,7 @@ Creating a Cache
                A message from user space has arrived to fill out a
                cache entry.  It is in 'buf' of length 'len'.
                cache_parse should parse this, find the item in the
-               cache with sunrpc_cache_lookup, and update the item
+               cache with sunrpc_cache_lookup_rcu, and update the item
                with sunrpc_cache_update.
 
 
@@ -95,7 +95,7 @@ Creating a Cache
 Using a cache
 -------------
 
-To find a value in a cache, call sunrpc_cache_lookup passing a pointer
+To find a value in a cache, call sunrpc_cache_lookup_rcu passing a pointer
 to the cache_head in a sample item with the 'key' fields filled in.
 This will be passed to ->match to identify the target entry.  If no
 entry is found, a new entry will be created, added to the cache, and
@@ -116,7 +116,7 @@ item does become valid, the deferred copy of the request will be
 revisited (->revisit).  It is expected that this method will
 reschedule the request for processing.
 
-The value returned by sunrpc_cache_lookup can also be passed to
+The value returned by sunrpc_cache_lookup_rcu can also be passed to
 sunrpc_cache_update to set the content for the item.  A second item is
 passed which should hold the content.  If the item found by _lookup
 has valid data, then it is discarded and a new item is created.  This
index d35cd6be067592acd7f2bc8ca50f0dd869e817e5..93fb7cf0b92b631358cf36eab60d5947cc0312a7 100644 (file)
@@ -341,7 +341,7 @@ struct nlm_host *nlmsvc_lookup_host(const struct svc_rqst *rqstp,
        };
        struct lockd_net *ln = net_generic(net, lockd_net_id);
 
-       dprintk("lockd: %s(host='%*s', vers=%u, proto=%s)\n", __func__,
+       dprintk("lockd: %s(host='%.*s', vers=%u, proto=%s)\n", __func__,
                        (int)hostname_len, hostname, rqstp->rq_vers,
                        (rqstp->rq_prot == IPPROTO_UDP ? "udp" : "tcp"));
 
index 060c658eab6600ebb0cf2dfa069d70d4e0f152a0..a7d3df85736dfba49c461ed54a17b5db0d3bfa44 100644 (file)
@@ -65,6 +65,7 @@ struct nfs_dns_ent {
 
        struct sockaddr_storage addr;
        size_t addrlen;
+       struct rcu_head rcu_head;
 };
 
 
@@ -101,15 +102,23 @@ static void nfs_dns_ent_init(struct cache_head *cnew,
        }
 }
 
-static void nfs_dns_ent_put(struct kref *ref)
+static void nfs_dns_ent_free_rcu(struct rcu_head *head)
 {
        struct nfs_dns_ent *item;
 
-       item = container_of(ref, struct nfs_dns_ent, h.ref);
+       item = container_of(head, struct nfs_dns_ent, rcu_head);
        kfree(item->hostname);
        kfree(item);
 }
 
+static void nfs_dns_ent_put(struct kref *ref)
+{
+       struct nfs_dns_ent *item;
+
+       item = container_of(ref, struct nfs_dns_ent, h.ref);
+       call_rcu(&item->rcu_head, nfs_dns_ent_free_rcu);
+}
+
 static struct cache_head *nfs_dns_ent_alloc(void)
 {
        struct nfs_dns_ent *item = kmalloc(sizeof(*item), GFP_KERNEL);
@@ -195,7 +204,7 @@ static struct nfs_dns_ent *nfs_dns_lookup(struct cache_detail *cd,
 {
        struct cache_head *ch;
 
-       ch = sunrpc_cache_lookup(cd,
+       ch = sunrpc_cache_lookup_rcu(cd,
                        &key->h,
                        nfs_dns_hash(key));
        if (!ch)
index b7559c6f2b9767609f0832f0fb5e0fb80d51fce0..4a98537efb0fd3bfb8b6936624f36cbfa1f6d1d7 100644 (file)
  * is much larger than a sockaddr_in6.
  */
 struct svc_cacherep {
-       struct list_head        c_lru;
+       struct {
+               /* Keep often-read xid, csum in the same cache line: */
+               __be32                  k_xid;
+               __wsum                  k_csum;
+               u32                     k_proc;
+               u32                     k_prot;
+               u32                     k_vers;
+               unsigned int            k_len;
+               struct sockaddr_in6     k_addr;
+       } c_key;
 
+       struct rb_node          c_node;
+       struct list_head        c_lru;
        unsigned char           c_state,        /* unused, inprog, done */
                                c_type,         /* status, buffer */
                                c_secure : 1;   /* req came from port < 1024 */
-       struct sockaddr_in6     c_addr;
-       __be32                  c_xid;
-       u32                     c_prot;
-       u32                     c_proc;
-       u32                     c_vers;
-       unsigned int            c_len;
-       __wsum                  c_csum;
        unsigned long           c_timestamp;
        union {
                struct kvec     u_vec;
index a1143f7c220153c0809cb8dd43da895a685fe98b..802993d8912f79f6a70ab1661ff13ea41815bbbf 100644 (file)
@@ -46,7 +46,7 @@ static void expkey_put(struct kref *ref)
            !test_bit(CACHE_NEGATIVE, &key->h.flags))
                path_put(&key->ek_path);
        auth_domain_put(key->ek_client);
-       kfree(key);
+       kfree_rcu(key, ek_rcu);
 }
 
 static void expkey_request(struct cache_detail *cd,
@@ -265,7 +265,7 @@ svc_expkey_lookup(struct cache_detail *cd, struct svc_expkey *item)
        struct cache_head *ch;
        int hash = svc_expkey_hash(item);
 
-       ch = sunrpc_cache_lookup(cd, &item->h, hash);
+       ch = sunrpc_cache_lookup_rcu(cd, &item->h, hash);
        if (ch)
                return container_of(ch, struct svc_expkey, h);
        else
@@ -314,7 +314,7 @@ static void svc_export_put(struct kref *ref)
        auth_domain_put(exp->ex_client);
        nfsd4_fslocs_free(&exp->ex_fslocs);
        kfree(exp->ex_uuid);
-       kfree(exp);
+       kfree_rcu(exp, ex_rcu);
 }
 
 static void svc_export_request(struct cache_detail *cd,
@@ -780,7 +780,7 @@ svc_export_lookup(struct svc_export *exp)
        struct cache_head *ch;
        int hash = svc_export_hash(exp);
 
-       ch = sunrpc_cache_lookup(exp->cd, &exp->h, hash);
+       ch = sunrpc_cache_lookup_rcu(exp->cd, &exp->h, hash);
        if (ch)
                return container_of(ch, struct svc_export, h);
        else
@@ -1216,9 +1216,9 @@ static int e_show(struct seq_file *m, void *p)
 }
 
 const struct seq_operations nfs_exports_op = {
-       .start  = cache_seq_start,
-       .next   = cache_seq_next,
-       .stop   = cache_seq_stop,
+       .start  = cache_seq_start_rcu,
+       .next   = cache_seq_next_rcu,
+       .stop   = cache_seq_stop_rcu,
        .show   = e_show,
 };
 
index c8b74126ddaa86c4de9ab9233f3ce3552d7e5edf..e7daa1f246f0866beee438abe96533108a978062 100644 (file)
@@ -61,6 +61,7 @@ struct svc_export {
        u32                     ex_layout_types;
        struct nfsd4_deviceid_map *ex_devid_map;
        struct cache_detail     *cd;
+       struct rcu_head         ex_rcu;
 };
 
 /* an "export key" (expkey) maps a filehandle fragment to an
@@ -75,6 +76,7 @@ struct svc_expkey {
        u32                     ek_fsid[6];
 
        struct path             ek_path;
+       struct rcu_head         ek_rcu;
 };
 
 #define EX_ISSYNC(exp)         (!((exp)->ex_flags & NFSEXP_ASYNC))
index 426f550056974d9d58b2868239fce3a74d0fc383..32cb8c027483e943d930117ba037c572597a7ee4 100644 (file)
@@ -123,6 +123,14 @@ struct nfsd_net {
 
        wait_queue_head_t ntf_wq;
        atomic_t ntf_refcnt;
+
+       /*
+        * clientid and stateid data for construction of net unique COPY
+        * stateids.
+        */
+       u32             s2s_cp_cl_id;
+       struct idr      s2s_cp_stateids;
+       spinlock_t      s2s_cp_lock;
 };
 
 /* Simple check to find out if a given net was properly initialized */
index 601bf33c26a0bbdc036bb7b2a195fc07ccfc7fe8..25987bcdf96f81f1611db1dc9ced9d44ff1809c6 100644 (file)
@@ -39,6 +39,7 @@
 #include "state.h"
 #include "netns.h"
 #include "xdr4cb.h"
+#include "xdr4.h"
 
 #define NFSDDBG_FACILITY                NFSDDBG_PROC
 
@@ -105,6 +106,7 @@ enum nfs_cb_opnum4 {
        OP_CB_WANTS_CANCELLED           = 12,
        OP_CB_NOTIFY_LOCK               = 13,
        OP_CB_NOTIFY_DEVICEID           = 14,
+       OP_CB_OFFLOAD                   = 15,
        OP_CB_ILLEGAL                   = 10044
 };
 
@@ -682,6 +684,101 @@ static int nfs4_xdr_dec_cb_notify_lock(struct rpc_rqst *rqstp,
        return decode_cb_op_status(xdr, OP_CB_NOTIFY_LOCK, &cb->cb_status);
 }
 
+/*
+ * struct write_response4 {
+ *     stateid4        wr_callback_id<1>;
+ *     length4         wr_count;
+ *     stable_how4     wr_committed;
+ *     verifier4       wr_writeverf;
+ * };
+ * union offload_info4 switch (nfsstat4 coa_status) {
+ *     case NFS4_OK:
+ *             write_response4 coa_resok4;
+ *     default:
+ *     length4         coa_bytes_copied;
+ * };
+ * struct CB_OFFLOAD4args {
+ *     nfs_fh4         coa_fh;
+ *     stateid4        coa_stateid;
+ *     offload_info4   coa_offload_info;
+ * };
+ */
+static void encode_offload_info4(struct xdr_stream *xdr,
+                                __be32 nfserr,
+                                const struct nfsd4_copy *cp)
+{
+       __be32 *p;
+
+       p = xdr_reserve_space(xdr, 4);
+       *p++ = nfserr;
+       if (!nfserr) {
+               p = xdr_reserve_space(xdr, 4 + 8 + 4 + NFS4_VERIFIER_SIZE);
+               p = xdr_encode_empty_array(p);
+               p = xdr_encode_hyper(p, cp->cp_res.wr_bytes_written);
+               *p++ = cpu_to_be32(cp->cp_res.wr_stable_how);
+               p = xdr_encode_opaque_fixed(p, cp->cp_res.wr_verifier.data,
+                                           NFS4_VERIFIER_SIZE);
+       } else {
+               p = xdr_reserve_space(xdr, 8);
+               /* We always return success if bytes were written */
+               p = xdr_encode_hyper(p, 0);
+       }
+}
+
+static void encode_cb_offload4args(struct xdr_stream *xdr,
+                                  __be32 nfserr,
+                                  const struct knfsd_fh *fh,
+                                  const struct nfsd4_copy *cp,
+                                  struct nfs4_cb_compound_hdr *hdr)
+{
+       __be32 *p;
+
+       p = xdr_reserve_space(xdr, 4);
+       *p++ = cpu_to_be32(OP_CB_OFFLOAD);
+       encode_nfs_fh4(xdr, fh);
+       encode_stateid4(xdr, &cp->cp_res.cb_stateid);
+       encode_offload_info4(xdr, nfserr, cp);
+
+       hdr->nops++;
+}
+
+static void nfs4_xdr_enc_cb_offload(struct rpc_rqst *req,
+                                   struct xdr_stream *xdr,
+                                   const void *data)
+{
+       const struct nfsd4_callback *cb = data;
+       const struct nfsd4_copy *cp =
+               container_of(cb, struct nfsd4_copy, cp_cb);
+       struct nfs4_cb_compound_hdr hdr = {
+               .ident = 0,
+               .minorversion = cb->cb_clp->cl_minorversion,
+       };
+
+       encode_cb_compound4args(xdr, &hdr);
+       encode_cb_sequence4args(xdr, cb, &hdr);
+       encode_cb_offload4args(xdr, cp->nfserr, &cp->fh, cp, &hdr);
+       encode_cb_nops(&hdr);
+}
+
+static int nfs4_xdr_dec_cb_offload(struct rpc_rqst *rqstp,
+                                  struct xdr_stream *xdr,
+                                  void *data)
+{
+       struct nfsd4_callback *cb = data;
+       struct nfs4_cb_compound_hdr hdr;
+       int status;
+
+       status = decode_cb_compound4res(xdr, &hdr);
+       if (unlikely(status))
+               return status;
+
+       if (cb) {
+               status = decode_cb_sequence4res(xdr, cb);
+               if (unlikely(status || cb->cb_seq_status))
+                       return status;
+       }
+       return decode_cb_op_status(xdr, OP_CB_OFFLOAD, &cb->cb_status);
+}
 /*
  * RPC procedure tables
  */
@@ -703,6 +800,7 @@ static const struct rpc_procinfo nfs4_cb_procedures[] = {
        PROC(CB_LAYOUT, COMPOUND,       cb_layout,      cb_layout),
 #endif
        PROC(CB_NOTIFY_LOCK,    COMPOUND,       cb_notify_lock, cb_notify_lock),
+       PROC(CB_OFFLOAD,        COMPOUND,       cb_offload,     cb_offload),
 };
 
 static unsigned int nfs4_cb_counts[ARRAY_SIZE(nfs4_cb_procedures)];
index a5bb76593ce72c280ddbd8e5957beddea49ec413..bf137fec33ff916558ca7c95e2662a46c5880163 100644 (file)
@@ -65,6 +65,7 @@ struct ent {
        u32               id;
        char              name[IDMAP_NAMESZ];
        char              authname[IDMAP_NAMESZ];
+       struct rcu_head   rcu_head;
 };
 
 /* Common entry handling */
@@ -89,7 +90,7 @@ static void
 ent_put(struct kref *ref)
 {
        struct ent *map = container_of(ref, struct ent, h.ref);
-       kfree(map);
+       kfree_rcu(map, rcu_head);
 }
 
 static struct cache_head *
@@ -264,8 +265,8 @@ out:
 static struct ent *
 idtoname_lookup(struct cache_detail *cd, struct ent *item)
 {
-       struct cache_head *ch = sunrpc_cache_lookup(cd, &item->h,
-                                                   idtoname_hash(item));
+       struct cache_head *ch = sunrpc_cache_lookup_rcu(cd, &item->h,
+                                                       idtoname_hash(item));
        if (ch)
                return container_of(ch, struct ent, h);
        else
@@ -422,8 +423,8 @@ out:
 static struct ent *
 nametoid_lookup(struct cache_detail *cd, struct ent *item)
 {
-       struct cache_head *ch = sunrpc_cache_lookup(cd, &item->h,
-                                                   nametoid_hash(item));
+       struct cache_head *ch = sunrpc_cache_lookup_rcu(cd, &item->h,
+                                                       nametoid_hash(item));
        if (ch)
                return container_of(ch, struct ent, h);
        else
index b7bc6e1a85ac3d1472b7867ccd75b5819126f2c8..edff074d38c75c19a06a6ae5c634ba1fd1688d98 100644 (file)
@@ -36,6 +36,7 @@
 #include <linux/file.h>
 #include <linux/falloc.h>
 #include <linux/slab.h>
+#include <linux/kthread.h>
 
 #include "idmap.h"
 #include "cache.h"
@@ -1089,36 +1090,254 @@ out:
        return status;
 }
 
+void nfs4_put_copy(struct nfsd4_copy *copy)
+{
+       if (!refcount_dec_and_test(&copy->refcount))
+               return;
+       kfree(copy);
+}
+
+static bool
+check_and_set_stop_copy(struct nfsd4_copy *copy)
+{
+       bool value;
+
+       spin_lock(&copy->cp_clp->async_lock);
+       value = copy->stopped;
+       if (!copy->stopped)
+               copy->stopped = true;
+       spin_unlock(&copy->cp_clp->async_lock);
+       return value;
+}
+
+static void nfsd4_stop_copy(struct nfsd4_copy *copy)
+{
+       /* only 1 thread should stop the copy */
+       if (!check_and_set_stop_copy(copy))
+               kthread_stop(copy->copy_task);
+       nfs4_put_copy(copy);
+}
+
+static struct nfsd4_copy *nfsd4_get_copy(struct nfs4_client *clp)
+{
+       struct nfsd4_copy *copy = NULL;
+
+       spin_lock(&clp->async_lock);
+       if (!list_empty(&clp->async_copies)) {
+               copy = list_first_entry(&clp->async_copies, struct nfsd4_copy,
+                                       copies);
+               refcount_inc(&copy->refcount);
+       }
+       spin_unlock(&clp->async_lock);
+       return copy;
+}
+
+void nfsd4_shutdown_copy(struct nfs4_client *clp)
+{
+       struct nfsd4_copy *copy;
+
+       while ((copy = nfsd4_get_copy(clp)) != NULL)
+               nfsd4_stop_copy(copy);
+}
+
+static void nfsd4_cb_offload_release(struct nfsd4_callback *cb)
+{
+       struct nfsd4_copy *copy = container_of(cb, struct nfsd4_copy, cp_cb);
+
+       nfs4_put_copy(copy);
+}
+
+static int nfsd4_cb_offload_done(struct nfsd4_callback *cb,
+                                struct rpc_task *task)
+{
+       return 1;
+}
+
+static const struct nfsd4_callback_ops nfsd4_cb_offload_ops = {
+       .release = nfsd4_cb_offload_release,
+       .done = nfsd4_cb_offload_done
+};
+
+static void nfsd4_init_copy_res(struct nfsd4_copy *copy, bool sync)
+{
+       copy->cp_res.wr_stable_how = NFS_UNSTABLE;
+       copy->cp_synchronous = sync;
+       gen_boot_verifier(&copy->cp_res.wr_verifier, copy->cp_clp->net);
+}
+
+static ssize_t _nfsd_copy_file_range(struct nfsd4_copy *copy)
+{
+       ssize_t bytes_copied = 0;
+       size_t bytes_total = copy->cp_count;
+       u64 src_pos = copy->cp_src_pos;
+       u64 dst_pos = copy->cp_dst_pos;
+
+       do {
+               if (kthread_should_stop())
+                       break;
+               bytes_copied = nfsd_copy_file_range(copy->file_src, src_pos,
+                               copy->file_dst, dst_pos, bytes_total);
+               if (bytes_copied <= 0)
+                       break;
+               bytes_total -= bytes_copied;
+               copy->cp_res.wr_bytes_written += bytes_copied;
+               src_pos += bytes_copied;
+               dst_pos += bytes_copied;
+       } while (bytes_total > 0 && !copy->cp_synchronous);
+       return bytes_copied;
+}
+
+static __be32 nfsd4_do_copy(struct nfsd4_copy *copy, bool sync)
+{
+       __be32 status;
+       ssize_t bytes;
+
+       bytes = _nfsd_copy_file_range(copy);
+       /* for async copy, we ignore the error, client can always retry
+        * to get the error
+        */
+       if (bytes < 0 && !copy->cp_res.wr_bytes_written)
+               status = nfserrno(bytes);
+       else {
+               nfsd4_init_copy_res(copy, sync);
+               status = nfs_ok;
+       }
+
+       fput(copy->file_src);
+       fput(copy->file_dst);
+       return status;
+}
+
+static void dup_copy_fields(struct nfsd4_copy *src, struct nfsd4_copy *dst)
+{
+       dst->cp_src_pos = src->cp_src_pos;
+       dst->cp_dst_pos = src->cp_dst_pos;
+       dst->cp_count = src->cp_count;
+       dst->cp_synchronous = src->cp_synchronous;
+       memcpy(&dst->cp_res, &src->cp_res, sizeof(src->cp_res));
+       memcpy(&dst->fh, &src->fh, sizeof(src->fh));
+       dst->cp_clp = src->cp_clp;
+       dst->file_dst = get_file(src->file_dst);
+       dst->file_src = get_file(src->file_src);
+       memcpy(&dst->cp_stateid, &src->cp_stateid, sizeof(src->cp_stateid));
+}
+
+static void cleanup_async_copy(struct nfsd4_copy *copy)
+{
+       nfs4_free_cp_state(copy);
+       fput(copy->file_dst);
+       fput(copy->file_src);
+       spin_lock(&copy->cp_clp->async_lock);
+       list_del(&copy->copies);
+       spin_unlock(&copy->cp_clp->async_lock);
+       nfs4_put_copy(copy);
+}
+
+static int nfsd4_do_async_copy(void *data)
+{
+       struct nfsd4_copy *copy = (struct nfsd4_copy *)data;
+       struct nfsd4_copy *cb_copy;
+
+       copy->nfserr = nfsd4_do_copy(copy, 0);
+       cb_copy = kzalloc(sizeof(struct nfsd4_copy), GFP_KERNEL);
+       if (!cb_copy)
+               goto out;
+       memcpy(&cb_copy->cp_res, &copy->cp_res, sizeof(copy->cp_res));
+       cb_copy->cp_clp = copy->cp_clp;
+       cb_copy->nfserr = copy->nfserr;
+       memcpy(&cb_copy->fh, &copy->fh, sizeof(copy->fh));
+       nfsd4_init_cb(&cb_copy->cp_cb, cb_copy->cp_clp,
+                       &nfsd4_cb_offload_ops, NFSPROC4_CLNT_CB_OFFLOAD);
+       nfsd4_run_cb(&cb_copy->cp_cb);
+out:
+       cleanup_async_copy(copy);
+       return 0;
+}
+
 static __be32
 nfsd4_copy(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
                union nfsd4_op_u *u)
 {
        struct nfsd4_copy *copy = &u->copy;
-       struct file *src, *dst;
        __be32 status;
-       ssize_t bytes;
+       struct nfsd4_copy *async_copy = NULL;
 
-       status = nfsd4_verify_copy(rqstp, cstate, &copy->cp_src_stateid, &src,
-                                  &copy->cp_dst_stateid, &dst);
+       status = nfsd4_verify_copy(rqstp, cstate, &copy->cp_src_stateid,
+                                  &copy->file_src, &copy->cp_dst_stateid,
+                                  &copy->file_dst);
        if (status)
                goto out;
 
-       bytes = nfsd_copy_file_range(src, copy->cp_src_pos,
-                       dst, copy->cp_dst_pos, copy->cp_count);
+       copy->cp_clp = cstate->clp;
+       memcpy(&copy->fh, &cstate->current_fh.fh_handle,
+               sizeof(struct knfsd_fh));
+       if (!copy->cp_synchronous) {
+               struct nfsd_net *nn = net_generic(SVC_NET(rqstp), nfsd_net_id);
 
-       if (bytes < 0)
-               status = nfserrno(bytes);
-       else {
-               copy->cp_res.wr_bytes_written = bytes;
-               copy->cp_res.wr_stable_how = NFS_UNSTABLE;
-               copy->cp_synchronous = 1;
-               gen_boot_verifier(&copy->cp_res.wr_verifier, SVC_NET(rqstp));
+               status = nfserrno(-ENOMEM);
+               async_copy = kzalloc(sizeof(struct nfsd4_copy), GFP_KERNEL);
+               if (!async_copy)
+                       goto out;
+               if (!nfs4_init_cp_state(nn, copy)) {
+                       kfree(async_copy);
+                       goto out;
+               }
+               refcount_set(&async_copy->refcount, 1);
+               memcpy(&copy->cp_res.cb_stateid, &copy->cp_stateid,
+                       sizeof(copy->cp_stateid));
+               dup_copy_fields(copy, async_copy);
+               async_copy->copy_task = kthread_create(nfsd4_do_async_copy,
+                               async_copy, "%s", "copy thread");
+               if (IS_ERR(async_copy->copy_task))
+                       goto out_err;
+               spin_lock(&async_copy->cp_clp->async_lock);
+               list_add(&async_copy->copies,
+                               &async_copy->cp_clp->async_copies);
+               spin_unlock(&async_copy->cp_clp->async_lock);
+               wake_up_process(async_copy->copy_task);
                status = nfs_ok;
+       } else
+               status = nfsd4_do_copy(copy, 1);
+out:
+       return status;
+out_err:
+       cleanup_async_copy(async_copy);
+       goto out;
+}
+
+struct nfsd4_copy *
+find_async_copy(struct nfs4_client *clp, stateid_t *stateid)
+{
+       struct nfsd4_copy *copy;
+
+       spin_lock(&clp->async_lock);
+       list_for_each_entry(copy, &clp->async_copies, copies) {
+               if (memcmp(&copy->cp_stateid, stateid, NFS4_STATEID_SIZE))
+                       continue;
+               refcount_inc(&copy->refcount);
+               spin_unlock(&clp->async_lock);
+               return copy;
        }
+       spin_unlock(&clp->async_lock);
+       return NULL;
+}
+
+static __be32
+nfsd4_offload_cancel(struct svc_rqst *rqstp,
+                    struct nfsd4_compound_state *cstate,
+                    union nfsd4_op_u *u)
+{
+       struct nfsd4_offload_status *os = &u->offload_status;
+       __be32 status = 0;
+       struct nfsd4_copy *copy;
+       struct nfs4_client *clp = cstate->clp;
+
+       copy = find_async_copy(clp, &os->stateid);
+       if (copy)
+               nfsd4_stop_copy(copy);
+       else
+               status = nfserr_bad_stateid;
 
-       fput(src);
-       fput(dst);
-out:
        return status;
 }
 
@@ -1144,6 +1363,25 @@ nfsd4_fallocate(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
        fput(file);
        return status;
 }
+static __be32
+nfsd4_offload_status(struct svc_rqst *rqstp,
+                    struct nfsd4_compound_state *cstate,
+                    union nfsd4_op_u *u)
+{
+       struct nfsd4_offload_status *os = &u->offload_status;
+       __be32 status = 0;
+       struct nfsd4_copy *copy;
+       struct nfs4_client *clp = cstate->clp;
+
+       copy = find_async_copy(clp, &os->stateid);
+       if (copy) {
+               os->count = copy->cp_res.wr_bytes_written;
+               nfs4_put_copy(copy);
+       } else
+               status = nfserr_bad_stateid;
+
+       return status;
+}
 
 static __be32
 nfsd4_allocate(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
@@ -2047,6 +2285,14 @@ static inline u32 nfsd4_copy_rsize(struct svc_rqst *rqstp, struct nfsd4_op *op)
                1 /* cr_synchronous */) * sizeof(__be32);
 }
 
+static inline u32 nfsd4_offload_status_rsize(struct svc_rqst *rqstp,
+                                            struct nfsd4_op *op)
+{
+       return (op_encode_hdr_size +
+               2 /* osr_count */ +
+               1 /* osr_complete<1> optional 0 for now */) * sizeof(__be32);
+}
+
 #ifdef CONFIG_NFSD_PNFS
 static inline u32 nfsd4_getdeviceinfo_rsize(struct svc_rqst *rqstp, struct nfsd4_op *op)
 {
@@ -2460,6 +2706,17 @@ static const struct nfsd4_operation nfsd4_ops[] = {
                .op_name = "OP_SEEK",
                .op_rsize_bop = nfsd4_seek_rsize,
        },
+       [OP_OFFLOAD_STATUS] = {
+               .op_func = nfsd4_offload_status,
+               .op_name = "OP_OFFLOAD_STATUS",
+               .op_rsize_bop = nfsd4_offload_status_rsize,
+       },
+       [OP_OFFLOAD_CANCEL] = {
+               .op_func = nfsd4_offload_cancel,
+               .op_flags = OP_MODIFIES_SOMETHING,
+               .op_name = "OP_OFFLOAD_CANCEL",
+               .op_rsize_bop = nfsd4_only_status_rsize,
+       },
 };
 
 /**
index b0ca0efd287510417387fe916ea9668c587ddec1..f093fbe471338b06a2d544ebe00983cb065968fa 100644 (file)
@@ -713,6 +713,36 @@ out_free:
        return NULL;
 }
 
+/*
+ * Create a unique stateid_t to represent each COPY.
+ */
+int nfs4_init_cp_state(struct nfsd_net *nn, struct nfsd4_copy *copy)
+{
+       int new_id;
+
+       idr_preload(GFP_KERNEL);
+       spin_lock(&nn->s2s_cp_lock);
+       new_id = idr_alloc_cyclic(&nn->s2s_cp_stateids, copy, 0, 0, GFP_NOWAIT);
+       spin_unlock(&nn->s2s_cp_lock);
+       idr_preload_end();
+       if (new_id < 0)
+               return 0;
+       copy->cp_stateid.si_opaque.so_id = new_id;
+       copy->cp_stateid.si_opaque.so_clid.cl_boot = nn->boot_time;
+       copy->cp_stateid.si_opaque.so_clid.cl_id = nn->s2s_cp_cl_id;
+       return 1;
+}
+
+void nfs4_free_cp_state(struct nfsd4_copy *copy)
+{
+       struct nfsd_net *nn;
+
+       nn = net_generic(copy->cp_clp->net, nfsd_net_id);
+       spin_lock(&nn->s2s_cp_lock);
+       idr_remove(&nn->s2s_cp_stateids, copy->cp_stateid.si_opaque.so_id);
+       spin_unlock(&nn->s2s_cp_lock);
+}
+
 static struct nfs4_ol_stateid * nfs4_alloc_open_stateid(struct nfs4_client *clp)
 {
        struct nfs4_stid *stid;
@@ -1827,6 +1857,8 @@ static struct nfs4_client *alloc_client(struct xdr_netobj name)
 #ifdef CONFIG_NFSD_PNFS
        INIT_LIST_HEAD(&clp->cl_lo_states);
 #endif
+       INIT_LIST_HEAD(&clp->async_copies);
+       spin_lock_init(&clp->async_lock);
        spin_lock_init(&clp->cl_lock);
        rpc_init_wait_queue(&clp->cl_cb_waitq, "Backchannel slot table");
        return clp;
@@ -1942,6 +1974,7 @@ __destroy_client(struct nfs4_client *clp)
                }
        }
        nfsd4_return_all_client_layouts(clp);
+       nfsd4_shutdown_copy(clp);
        nfsd4_shutdown_callback(clp);
        if (clp->cl_cb_conn.cb_xprt)
                svc_xprt_put(clp->cl_cb_conn.cb_xprt);
@@ -2475,7 +2508,8 @@ static bool client_has_state(struct nfs4_client *clp)
                || !list_empty(&clp->cl_lo_states)
 #endif
                || !list_empty(&clp->cl_delegations)
-               || !list_empty(&clp->cl_sessions);
+               || !list_empty(&clp->cl_sessions)
+               || !list_empty(&clp->async_copies);
 }
 
 __be32
@@ -4364,7 +4398,7 @@ nfs4_set_delegation(struct nfs4_client *clp, struct svc_fh *fh,
 
        fl = nfs4_alloc_init_lease(dp, NFS4_OPEN_DELEGATE_READ);
        if (!fl)
-               goto out_stid;
+               goto out_clnt_odstate;
 
        status = vfs_setlease(fp->fi_deleg_file, fl->fl_type, &fl, NULL);
        if (fl)
@@ -4389,7 +4423,6 @@ out_unlock:
        vfs_setlease(fp->fi_deleg_file, F_UNLCK, NULL, (void **)&dp);
 out_clnt_odstate:
        put_clnt_odstate(dp->dl_clnt_odstate);
-out_stid:
        nfs4_put_stid(&dp->dl_stid);
 out_delegees:
        put_deleg_file(fp);
@@ -7161,6 +7194,8 @@ static int nfs4_state_create_net(struct net *net)
        INIT_LIST_HEAD(&nn->close_lru);
        INIT_LIST_HEAD(&nn->del_recall_lru);
        spin_lock_init(&nn->client_lock);
+       spin_lock_init(&nn->s2s_cp_lock);
+       idr_init(&nn->s2s_cp_stateids);
 
        spin_lock_init(&nn->blocked_locks_lock);
        INIT_LIST_HEAD(&nn->blocked_locks_lru);
index 418fa9c78186b6b6f5d36ced347d1255ce2505fe..3de42a7290939eac692cc57b66d4e515576641ec 100644 (file)
@@ -1767,6 +1767,13 @@ nfsd4_decode_copy(struct nfsd4_compoundargs *argp, struct nfsd4_copy *copy)
        DECODE_TAIL;
 }
 
+static __be32
+nfsd4_decode_offload_status(struct nfsd4_compoundargs *argp,
+                           struct nfsd4_offload_status *os)
+{
+       return nfsd4_decode_stateid(argp, &os->stateid);
+}
+
 static __be32
 nfsd4_decode_seek(struct nfsd4_compoundargs *argp, struct nfsd4_seek *seek)
 {
@@ -1873,8 +1880,8 @@ static const nfsd4_dec nfsd4_dec_ops[] = {
        [OP_IO_ADVISE]          = (nfsd4_dec)nfsd4_decode_notsupp,
        [OP_LAYOUTERROR]        = (nfsd4_dec)nfsd4_decode_notsupp,
        [OP_LAYOUTSTATS]        = (nfsd4_dec)nfsd4_decode_notsupp,
-       [OP_OFFLOAD_CANCEL]     = (nfsd4_dec)nfsd4_decode_notsupp,
-       [OP_OFFLOAD_STATUS]     = (nfsd4_dec)nfsd4_decode_notsupp,
+       [OP_OFFLOAD_CANCEL]     = (nfsd4_dec)nfsd4_decode_offload_status,
+       [OP_OFFLOAD_STATUS]     = (nfsd4_dec)nfsd4_decode_offload_status,
        [OP_READ_PLUS]          = (nfsd4_dec)nfsd4_decode_notsupp,
        [OP_SEEK]               = (nfsd4_dec)nfsd4_decode_seek,
        [OP_WRITE_SAME]         = (nfsd4_dec)nfsd4_decode_notsupp,
@@ -4224,15 +4231,27 @@ nfsd4_encode_layoutreturn(struct nfsd4_compoundres *resp, __be32 nfserr,
 #endif /* CONFIG_NFSD_PNFS */
 
 static __be32
-nfsd42_encode_write_res(struct nfsd4_compoundres *resp, struct nfsd42_write_res *write)
+nfsd42_encode_write_res(struct nfsd4_compoundres *resp,
+               struct nfsd42_write_res *write, bool sync)
 {
        __be32 *p;
+       p = xdr_reserve_space(&resp->xdr, 4);
+       if (!p)
+               return nfserr_resource;
 
-       p = xdr_reserve_space(&resp->xdr, 4 + 8 + 4 + NFS4_VERIFIER_SIZE);
+       if (sync)
+               *p++ = cpu_to_be32(0);
+       else {
+               __be32 nfserr;
+               *p++ = cpu_to_be32(1);
+               nfserr = nfsd4_encode_stateid(&resp->xdr, &write->cb_stateid);
+               if (nfserr)
+                       return nfserr;
+       }
+       p = xdr_reserve_space(&resp->xdr, 8 + 4 + NFS4_VERIFIER_SIZE);
        if (!p)
                return nfserr_resource;
 
-       *p++ = cpu_to_be32(0);
        p = xdr_encode_hyper(p, write->wr_bytes_written);
        *p++ = cpu_to_be32(write->wr_stable_how);
        p = xdr_encode_opaque_fixed(p, write->wr_verifier.data,
@@ -4246,7 +4265,8 @@ nfsd4_encode_copy(struct nfsd4_compoundres *resp, __be32 nfserr,
 {
        __be32 *p;
 
-       nfserr = nfsd42_encode_write_res(resp, &copy->cp_res);
+       nfserr = nfsd42_encode_write_res(resp, &copy->cp_res,
+                       copy->cp_synchronous);
        if (nfserr)
                return nfserr;
 
@@ -4256,6 +4276,22 @@ nfsd4_encode_copy(struct nfsd4_compoundres *resp, __be32 nfserr,
        return 0;
 }
 
+static __be32
+nfsd4_encode_offload_status(struct nfsd4_compoundres *resp, __be32 nfserr,
+                           struct nfsd4_offload_status *os)
+{
+       struct xdr_stream *xdr = &resp->xdr;
+       __be32 *p;
+
+       p = xdr_reserve_space(xdr, 8 + 4);
+       if (!p)
+               return nfserr_resource;
+       p = xdr_encode_hyper(p, os->count);
+       *p++ = cpu_to_be32(0);
+
+       return nfserr;
+}
+
 static __be32
 nfsd4_encode_seek(struct nfsd4_compoundres *resp, __be32 nfserr,
                  struct nfsd4_seek *seek)
@@ -4359,7 +4395,7 @@ static const nfsd4_enc nfsd4_enc_ops[] = {
        [OP_LAYOUTERROR]        = (nfsd4_enc)nfsd4_encode_noop,
        [OP_LAYOUTSTATS]        = (nfsd4_enc)nfsd4_encode_noop,
        [OP_OFFLOAD_CANCEL]     = (nfsd4_enc)nfsd4_encode_noop,
-       [OP_OFFLOAD_STATUS]     = (nfsd4_enc)nfsd4_encode_noop,
+       [OP_OFFLOAD_STATUS]     = (nfsd4_enc)nfsd4_encode_offload_status,
        [OP_READ_PLUS]          = (nfsd4_enc)nfsd4_encode_noop,
        [OP_SEEK]               = (nfsd4_enc)nfsd4_encode_seek,
        [OP_WRITE_SAME]         = (nfsd4_enc)nfsd4_encode_noop,
index dbdeb9d6af0392e3017b1ed73feebc993397c269..e2fe0e9ce0df08973e400db9754c842005ace49a 100644 (file)
@@ -30,6 +30,7 @@
 #define TARGET_BUCKET_SIZE     64
 
 struct nfsd_drc_bucket {
+       struct rb_root rb_head;
        struct list_head lru_head;
        spinlock_t cache_lock;
 };
@@ -121,7 +122,7 @@ nfsd_cache_hash(__be32 xid)
 }
 
 static struct svc_cacherep *
-nfsd_reply_cache_alloc(void)
+nfsd_reply_cache_alloc(struct svc_rqst *rqstp, __wsum csum)
 {
        struct svc_cacherep     *rp;
 
@@ -129,21 +130,35 @@ nfsd_reply_cache_alloc(void)
        if (rp) {
                rp->c_state = RC_UNUSED;
                rp->c_type = RC_NOCACHE;
+               RB_CLEAR_NODE(&rp->c_node);
                INIT_LIST_HEAD(&rp->c_lru);
+
+               memset(&rp->c_key, 0, sizeof(rp->c_key));
+               rp->c_key.k_xid = rqstp->rq_xid;
+               rp->c_key.k_proc = rqstp->rq_proc;
+               rpc_copy_addr((struct sockaddr *)&rp->c_key.k_addr, svc_addr(rqstp));
+               rpc_set_port((struct sockaddr *)&rp->c_key.k_addr, rpc_get_port(svc_addr(rqstp)));
+               rp->c_key.k_prot = rqstp->rq_prot;
+               rp->c_key.k_vers = rqstp->rq_vers;
+               rp->c_key.k_len = rqstp->rq_arg.len;
+               rp->c_key.k_csum = csum;
        }
        return rp;
 }
 
 static void
-nfsd_reply_cache_free_locked(struct svc_cacherep *rp)
+nfsd_reply_cache_free_locked(struct nfsd_drc_bucket *b, struct svc_cacherep *rp)
 {
        if (rp->c_type == RC_REPLBUFF && rp->c_replvec.iov_base) {
                drc_mem_usage -= rp->c_replvec.iov_len;
                kfree(rp->c_replvec.iov_base);
        }
-       list_del(&rp->c_lru);
-       atomic_dec(&num_drc_entries);
-       drc_mem_usage -= sizeof(*rp);
+       if (rp->c_state != RC_UNUSED) {
+               rb_erase(&rp->c_node, &b->rb_head);
+               list_del(&rp->c_lru);
+               atomic_dec(&num_drc_entries);
+               drc_mem_usage -= sizeof(*rp);
+       }
        kmem_cache_free(drc_slab, rp);
 }
 
@@ -151,7 +166,7 @@ static void
 nfsd_reply_cache_free(struct nfsd_drc_bucket *b, struct svc_cacherep *rp)
 {
        spin_lock(&b->cache_lock);
-       nfsd_reply_cache_free_locked(rp);
+       nfsd_reply_cache_free_locked(b, rp);
        spin_unlock(&b->cache_lock);
 }
 
@@ -207,7 +222,7 @@ void nfsd_reply_cache_shutdown(void)
                struct list_head *head = &drc_hashtbl[i].lru_head;
                while (!list_empty(head)) {
                        rp = list_first_entry(head, struct svc_cacherep, c_lru);
-                       nfsd_reply_cache_free_locked(rp);
+                       nfsd_reply_cache_free_locked(&drc_hashtbl[i], rp);
                }
        }
 
@@ -246,7 +261,7 @@ prune_bucket(struct nfsd_drc_bucket *b)
                if (atomic_read(&num_drc_entries) <= max_drc_entries &&
                    time_before(jiffies, rp->c_timestamp + RC_EXPIRE))
                        break;
-               nfsd_reply_cache_free_locked(rp);
+               nfsd_reply_cache_free_locked(b, rp);
                freed++;
        }
        return freed;
@@ -318,51 +333,48 @@ nfsd_cache_csum(struct svc_rqst *rqstp)
        return csum;
 }
 
-static bool
-nfsd_cache_match(struct svc_rqst *rqstp, __wsum csum, struct svc_cacherep *rp)
+static int
+nfsd_cache_key_cmp(const struct svc_cacherep *key, const struct svc_cacherep *rp)
 {
-       /* Check RPC XID first */
-       if (rqstp->rq_xid != rp->c_xid)
-               return false;
-       /* compare checksum of NFS data */
-       if (csum != rp->c_csum) {
+       if (key->c_key.k_xid == rp->c_key.k_xid &&
+           key->c_key.k_csum != rp->c_key.k_csum)
                ++payload_misses;
-               return false;
-       }
 
-       /* Other discriminators */
-       if (rqstp->rq_proc != rp->c_proc ||
-           rqstp->rq_prot != rp->c_prot ||
-           rqstp->rq_vers != rp->c_vers ||
-           rqstp->rq_arg.len != rp->c_len ||
-           !rpc_cmp_addr(svc_addr(rqstp), (struct sockaddr *)&rp->c_addr) ||
-           rpc_get_port(svc_addr(rqstp)) != rpc_get_port((struct sockaddr *)&rp->c_addr))
-               return false;
-
-       return true;
+       return memcmp(&key->c_key, &rp->c_key, sizeof(key->c_key));
 }
 
 /*
  * Search the request hash for an entry that matches the given rqstp.
  * Must be called with cache_lock held. Returns the found entry or
- * NULL on failure.
+ * inserts an empty key on failure.
  */
 static struct svc_cacherep *
-nfsd_cache_search(struct nfsd_drc_bucket *b, struct svc_rqst *rqstp,
-               __wsum csum)
+nfsd_cache_insert(struct nfsd_drc_bucket *b, struct svc_cacherep *key)
 {
-       struct svc_cacherep     *rp, *ret = NULL;
-       struct list_head        *rh = &b->lru_head;
+       struct svc_cacherep     *rp, *ret = key;
+       struct rb_node          **p = &b->rb_head.rb_node,
+                               *parent = NULL;
        unsigned int            entries = 0;
+       int cmp;
 
-       list_for_each_entry(rp, rh, c_lru) {
+       while (*p != NULL) {
                ++entries;
-               if (nfsd_cache_match(rqstp, csum, rp)) {
+               parent = *p;
+               rp = rb_entry(parent, struct svc_cacherep, c_node);
+
+               cmp = nfsd_cache_key_cmp(key, rp);
+               if (cmp < 0)
+                       p = &parent->rb_left;
+               else if (cmp > 0)
+                       p = &parent->rb_right;
+               else {
                        ret = rp;
-                       break;
+                       goto out;
                }
        }
-
+       rb_link_node(&key->c_node, parent, p);
+       rb_insert_color(&key->c_node, &b->rb_head);
+out:
        /* tally hash chain length stats */
        if (entries > longest_chain) {
                longest_chain = entries;
@@ -374,6 +386,7 @@ nfsd_cache_search(struct nfsd_drc_bucket *b, struct svc_rqst *rqstp,
                                atomic_read(&num_drc_entries));
        }
 
+       lru_put_end(b, ret);
        return ret;
 }
 
@@ -389,9 +402,6 @@ nfsd_cache_lookup(struct svc_rqst *rqstp)
 {
        struct svc_cacherep     *rp, *found;
        __be32                  xid = rqstp->rq_xid;
-       u32                     proto =  rqstp->rq_prot,
-                               vers = rqstp->rq_vers,
-                               proc = rqstp->rq_proc;
        __wsum                  csum;
        u32 hash = nfsd_cache_hash(xid);
        struct nfsd_drc_bucket *b = &drc_hashtbl[hash];
@@ -410,60 +420,38 @@ nfsd_cache_lookup(struct svc_rqst *rqstp)
         * Since the common case is a cache miss followed by an insert,
         * preallocate an entry.
         */
-       rp = nfsd_reply_cache_alloc();
-       spin_lock(&b->cache_lock);
-       if (likely(rp)) {
-               atomic_inc(&num_drc_entries);
-               drc_mem_usage += sizeof(*rp);
+       rp = nfsd_reply_cache_alloc(rqstp, csum);
+       if (!rp) {
+               dprintk("nfsd: unable to allocate DRC entry!\n");
+               return rtn;
        }
 
-       /* go ahead and prune the cache */
-       prune_bucket(b);
-
-       found = nfsd_cache_search(b, rqstp, csum);
-       if (found) {
-               if (likely(rp))
-                       nfsd_reply_cache_free_locked(rp);
+       spin_lock(&b->cache_lock);
+       found = nfsd_cache_insert(b, rp);
+       if (found != rp) {
+               nfsd_reply_cache_free_locked(NULL, rp);
                rp = found;
                goto found_entry;
        }
 
-       if (!rp) {
-               dprintk("nfsd: unable to allocate DRC entry!\n");
-               goto out;
-       }
-
        nfsdstats.rcmisses++;
        rqstp->rq_cacherep = rp;
        rp->c_state = RC_INPROG;
-       rp->c_xid = xid;
-       rp->c_proc = proc;
-       rpc_copy_addr((struct sockaddr *)&rp->c_addr, svc_addr(rqstp));
-       rpc_set_port((struct sockaddr *)&rp->c_addr, rpc_get_port(svc_addr(rqstp)));
-       rp->c_prot = proto;
-       rp->c_vers = vers;
-       rp->c_len = rqstp->rq_arg.len;
-       rp->c_csum = csum;
 
-       lru_put_end(b, rp);
+       atomic_inc(&num_drc_entries);
+       drc_mem_usage += sizeof(*rp);
 
-       /* release any buffer */
-       if (rp->c_type == RC_REPLBUFF) {
-               drc_mem_usage -= rp->c_replvec.iov_len;
-               kfree(rp->c_replvec.iov_base);
-               rp->c_replvec.iov_base = NULL;
-       }
-       rp->c_type = RC_NOCACHE;
+       /* go ahead and prune the cache */
+       prune_bucket(b);
  out:
        spin_unlock(&b->cache_lock);
        return rtn;
 
 found_entry:
-       nfsdstats.rchits++;
        /* We found a matching entry which is either in progress or done. */
-       lru_put_end(b, rp);
-
+       nfsdstats.rchits++;
        rtn = RC_DROPIT;
+
        /* Request being processed */
        if (rp->c_state == RC_INPROG)
                goto out;
@@ -489,7 +477,7 @@ found_entry:
                break;
        default:
                printk(KERN_WARNING "nfsd: bad repcache type %d\n", rp->c_type);
-               nfsd_reply_cache_free_locked(rp);
+               nfsd_reply_cache_free_locked(b, rp);
        }
 
        goto out;
@@ -524,7 +512,7 @@ nfsd_cache_update(struct svc_rqst *rqstp, int cachetype, __be32 *statp)
        if (!rp)
                return;
 
-       hash = nfsd_cache_hash(rp->c_xid);
+       hash = nfsd_cache_hash(rp->c_key.k_xid);
        b = &drc_hashtbl[hash];
 
        len = resv->iov_len - ((char*)statp - (char*)resv->iov_base);
index 7fb9f7c667b11077adc4afacb20d5c75319c5841..6384c9b9489883d0e08cc83e328799782d45dc7c 100644 (file)
@@ -1242,6 +1242,7 @@ static __net_init int nfsd_init_net(struct net *net)
        nn->somebody_reclaimed = false;
        nn->clverifier_counter = prandom_u32();
        nn->clientid_counter = prandom_u32();
+       nn->s2s_cp_cl_id = nn->clientid_counter++;
 
        atomic_set(&nn->ntf_refcnt, 0);
        init_waitqueue_head(&nn->ntf_wq);
index 0b15dac7e609716ce032d4fa6873eefe8e2d4690..6aacb325b6a0f3d1f8cd1fd881d96fccc9499d95 100644 (file)
@@ -355,6 +355,8 @@ struct nfs4_client {
        struct rpc_wait_queue   cl_cb_waitq;    /* backchannel callers may */
                                                /* wait here for slots */
        struct net              *net;
+       struct list_head        async_copies;   /* list of async copies */
+       spinlock_t              async_lock;     /* lock for async copies */
 };
 
 /* struct nfs4_client_reset
@@ -573,6 +575,7 @@ enum nfsd4_cb_op {
        NFSPROC4_CLNT_CB_NULL = 0,
        NFSPROC4_CLNT_CB_RECALL,
        NFSPROC4_CLNT_CB_LAYOUT,
+       NFSPROC4_CLNT_CB_OFFLOAD,
        NFSPROC4_CLNT_CB_SEQUENCE,
        NFSPROC4_CLNT_CB_NOTIFY_LOCK,
 };
@@ -599,6 +602,7 @@ struct nfsd4_blocked_lock {
 
 struct nfsd4_compound_state;
 struct nfsd_net;
+struct nfsd4_copy;
 
 extern __be32 nfs4_preprocess_stateid_op(struct svc_rqst *rqstp,
                struct nfsd4_compound_state *cstate, struct svc_fh *fhp,
@@ -608,6 +612,8 @@ __be32 nfsd4_lookup_stateid(struct nfsd4_compound_state *cstate,
                     struct nfs4_stid **s, struct nfsd_net *nn);
 struct nfs4_stid *nfs4_alloc_stid(struct nfs4_client *cl, struct kmem_cache *slab,
                                  void (*sc_free)(struct nfs4_stid *));
+int nfs4_init_cp_state(struct nfsd_net *nn, struct nfsd4_copy *copy);
+void nfs4_free_cp_state(struct nfsd4_copy *copy);
 void nfs4_unhash_stid(struct nfs4_stid *s);
 void nfs4_put_stid(struct nfs4_stid *s);
 void nfs4_inc_and_copy_stateid(stateid_t *dst, struct nfs4_stid *stid);
@@ -626,6 +632,7 @@ extern void nfsd4_run_cb(struct nfsd4_callback *cb);
 extern int nfsd4_create_callback_queue(void);
 extern void nfsd4_destroy_callback_queue(void);
 extern void nfsd4_shutdown_callback(struct nfs4_client *);
+extern void nfsd4_shutdown_copy(struct nfs4_client *clp);
 extern void nfsd4_prepare_cb_recall(struct nfs4_delegation *dp);
 extern struct nfs4_client_reclaim *nfs4_client_to_reclaim(const char *name,
                                                        struct nfsd_net *nn);
@@ -633,6 +640,9 @@ extern bool nfs4_has_reclaimed_state(const char *name, struct nfsd_net *nn);
 
 struct nfs4_file *find_file(struct knfsd_fh *fh);
 void put_nfs4_file(struct nfs4_file *fi);
+extern void nfs4_put_copy(struct nfsd4_copy *copy);
+extern struct nfsd4_copy *
+find_async_copy(struct nfs4_client *clp, stateid_t *staetid);
 static inline void get_nfs4_file(struct nfs4_file *fi)
 {
        refcount_inc(&fi->fi_ref);
index b53e76391e52539d11daee791bc47cc07b2ae773..2751976704e9388239fbb3742001e261ca09bdfe 100644 (file)
@@ -1276,7 +1276,6 @@ nfsd_create(struct svc_rqst *rqstp, struct svc_fh *fhp,
                int type, dev_t rdev, struct svc_fh *resfhp)
 {
        struct dentry   *dentry, *dchild = NULL;
-       struct inode    *dirp;
        __be32          err;
        int             host_err;
 
@@ -1288,7 +1287,6 @@ nfsd_create(struct svc_rqst *rqstp, struct svc_fh *fhp,
                return err;
 
        dentry = fhp->fh_dentry;
-       dirp = d_inode(dentry);
 
        host_err = fh_want_write(fhp);
        if (host_err)
@@ -1409,6 +1407,7 @@ do_nfsd_create(struct svc_rqst *rqstp, struct svc_fh *fhp,
                                        *created = 1;
                                break;
                        }
+                       /* fall through */
                case NFS4_CREATE_EXCLUSIVE4_1:
                        if (   d_inode(dchild)->i_mtime.tv_sec == v_mtime
                            && d_inode(dchild)->i_atime.tv_sec == v_atime
@@ -1417,7 +1416,7 @@ do_nfsd_create(struct svc_rqst *rqstp, struct svc_fh *fhp,
                                        *created = 1;
                                goto set_attr;
                        }
-                        /* fallthru */
+                       /* fall through */
                case NFS3_CREATE_GUARDED:
                        err = nfserr_exist;
                }
index 17c453a7999c42d5b55a4a7ce05312c655e1c161..feeb6d4bdffda38463bec8f75c6124820a5697ac 100644 (file)
@@ -511,6 +511,7 @@ struct nfsd42_write_res {
        u64                     wr_bytes_written;
        u32                     wr_stable_how;
        nfs4_verifier           wr_verifier;
+       stateid_t               cb_stateid;
 };
 
 struct nfsd4_copy {
@@ -526,6 +527,23 @@ struct nfsd4_copy {
 
        /* response */
        struct nfsd42_write_res cp_res;
+
+       /* for cb_offload */
+       struct nfsd4_callback   cp_cb;
+       __be32                  nfserr;
+       struct knfsd_fh         fh;
+
+       struct nfs4_client      *cp_clp;
+
+       struct file             *file_src;
+       struct file             *file_dst;
+
+       stateid_t               cp_stateid;
+
+       struct list_head        copies;
+       struct task_struct      *copy_task;
+       refcount_t              refcount;
+       bool                    stopped;
 };
 
 struct nfsd4_seek {
@@ -539,6 +557,15 @@ struct nfsd4_seek {
        loff_t          seek_pos;
 };
 
+struct nfsd4_offload_status {
+       /* request */
+       stateid_t       stateid;
+
+       /* response */
+       u64             count;
+       u32             status;
+};
+
 struct nfsd4_op {
        int                                     opnum;
        const struct nfsd4_operation *          opdesc;
@@ -597,6 +624,7 @@ struct nfsd4_op {
                struct nfsd4_fallocate          deallocate;
                struct nfsd4_clone              clone;
                struct nfsd4_copy               copy;
+               struct nfsd4_offload_status     offload_status;
                struct nfsd4_seek               seek;
        } u;
        struct nfs4_replay *                    replay;
index 517239af03027c176eeb9dd73e2c08a0ed9d43bd..547cf07cf4e08c3fbfd4da2a2cb48859e403fdf9 100644 (file)
 #define NFS4_dec_cb_notify_lock_sz     (cb_compound_dec_hdr_sz  +      \
                                        cb_sequence_dec_sz +            \
                                        op_dec_sz)
+#define enc_cb_offload_info_sz         (1 + 1 + 2 + 1 +                \
+                                       XDR_QUADLEN(NFS4_VERIFIER_SIZE))
+#define NFS4_enc_cb_offload_sz         (cb_compound_enc_hdr_sz +       \
+                                       cb_sequence_enc_sz +            \
+                                       enc_nfs4_fh_sz +                \
+                                       enc_stateid_sz +                \
+                                       enc_cb_offload_info_sz)
+#define NFS4_dec_cb_offload_sz         (cb_compound_dec_hdr_sz  +      \
+                                       cb_sequence_dec_sz +            \
+                                       op_dec_sz)
index 40d2822f0e2f1d1a6aa1a84e2ec820a8f9df392e..5a3e95017fc60968da060b318f97dde648f020b4 100644 (file)
@@ -67,7 +67,7 @@ struct cache_detail {
        struct module *         owner;
        int                     hash_size;
        struct hlist_head *     hash_table;
-       rwlock_t                hash_lock;
+       spinlock_t              hash_lock;
 
        char                    *name;
        void                    (*cache_put)(struct kref *);
@@ -168,8 +168,8 @@ extern const struct file_operations content_file_operations_pipefs;
 extern const struct file_operations cache_flush_operations_pipefs;
 
 extern struct cache_head *
-sunrpc_cache_lookup(struct cache_detail *detail,
-                   struct cache_head *key, int hash);
+sunrpc_cache_lookup_rcu(struct cache_detail *detail,
+                       struct cache_head *key, int hash);
 extern struct cache_head *
 sunrpc_cache_update(struct cache_detail *detail,
                    struct cache_head *new, struct cache_head *old, int hash);
@@ -186,6 +186,12 @@ static inline struct cache_head  *cache_get(struct cache_head *h)
        return h;
 }
 
+static inline struct cache_head  *cache_get_rcu(struct cache_head *h)
+{
+       if (kref_get_unless_zero(&h->ref))
+               return h;
+       return NULL;
+}
 
 static inline void cache_put(struct cache_head *h, struct cache_detail *cd)
 {
@@ -224,9 +230,9 @@ extern void sunrpc_cache_unregister_pipefs(struct cache_detail *);
 extern void sunrpc_cache_unhash(struct cache_detail *, struct cache_head *);
 
 /* Must store cache_detail in seq_file->private if using next three functions */
-extern void *cache_seq_start(struct seq_file *file, loff_t *pos);
-extern void *cache_seq_next(struct seq_file *file, void *p, loff_t *pos);
-extern void cache_seq_stop(struct seq_file *file, void *p);
+extern void *cache_seq_start_rcu(struct seq_file *file, loff_t *pos);
+extern void *cache_seq_next_rcu(struct seq_file *file, void *p, loff_t *pos);
+extern void cache_seq_stop_rcu(struct seq_file *file, void *p);
 
 extern void qword_add(char **bpp, int *lp, char *str);
 extern void qword_addhex(char **bpp, int *lp, char *buf, int blen);
index fd78f78df5c662b61430b93858bb5295fec45480..e6e26918504c1ee6644b52604bd25c223303a3ce 100644 (file)
@@ -113,13 +113,14 @@ struct svcxprt_rdma {
 /* sc_flags */
 #define RDMAXPRT_CONN_PENDING  3
 
-#define RPCRDMA_LISTEN_BACKLOG  10
-#define RPCRDMA_MAX_REQUESTS    32
-
-/* Typical ULP usage of BC requests is NFSv4.1 backchannel. Our
- * current NFSv4.1 implementation supports one backchannel slot.
+/*
+ * Default connection parameters
  */
-#define RPCRDMA_MAX_BC_REQUESTS        2
+enum {
+       RPCRDMA_LISTEN_BACKLOG  = 10,
+       RPCRDMA_MAX_REQUESTS    = 64,
+       RPCRDMA_MAX_BC_REQUESTS = 2,
+};
 
 #define RPCSVC_MAXPAYLOAD_RDMA RPCSVC_MAXPAYLOAD
 
index 04e404a0788222be04f12f84724a5595f52acfe4..3e53a6e2ada746e1ba7b3eaf2a34b39ad3f46194 100644 (file)
@@ -82,6 +82,7 @@ struct auth_domain {
        struct hlist_node       hash;
        char                    *name;
        struct auth_ops         *flavour;
+       struct rcu_head         rcu_head;
 };
 
 /*
index 860f2a1bbb675d71f4de3bd70acd1284974ffe3a..1ece4bc3eb8d86470f41255c05e57ece1ba90f0e 100644 (file)
@@ -76,6 +76,7 @@ struct rsi {
        struct xdr_netobj       in_handle, in_token;
        struct xdr_netobj       out_handle, out_token;
        int                     major_status, minor_status;
+       struct rcu_head         rcu_head;
 };
 
 static struct rsi *rsi_update(struct cache_detail *cd, struct rsi *new, struct rsi *old);
@@ -89,13 +90,21 @@ static void rsi_free(struct rsi *rsii)
        kfree(rsii->out_token.data);
 }
 
-static void rsi_put(struct kref *ref)
+static void rsi_free_rcu(struct rcu_head *head)
 {
-       struct rsi *rsii = container_of(ref, struct rsi, h.ref);
+       struct rsi *rsii = container_of(head, struct rsi, rcu_head);
+
        rsi_free(rsii);
        kfree(rsii);
 }
 
+static void rsi_put(struct kref *ref)
+{
+       struct rsi *rsii = container_of(ref, struct rsi, h.ref);
+
+       call_rcu(&rsii->rcu_head, rsi_free_rcu);
+}
+
 static inline int rsi_hash(struct rsi *item)
 {
        return hash_mem(item->in_handle.data, item->in_handle.len, RSI_HASHBITS)
@@ -282,7 +291,7 @@ static struct rsi *rsi_lookup(struct cache_detail *cd, struct rsi *item)
        struct cache_head *ch;
        int hash = rsi_hash(item);
 
-       ch = sunrpc_cache_lookup(cd, &item->h, hash);
+       ch = sunrpc_cache_lookup_rcu(cd, &item->h, hash);
        if (ch)
                return container_of(ch, struct rsi, h);
        else
@@ -330,6 +339,7 @@ struct rsc {
        struct svc_cred         cred;
        struct gss_svc_seq_data seqdata;
        struct gss_ctx          *mechctx;
+       struct rcu_head         rcu_head;
 };
 
 static struct rsc *rsc_update(struct cache_detail *cd, struct rsc *new, struct rsc *old);
@@ -343,12 +353,22 @@ static void rsc_free(struct rsc *rsci)
        free_svc_cred(&rsci->cred);
 }
 
+static void rsc_free_rcu(struct rcu_head *head)
+{
+       struct rsc *rsci = container_of(head, struct rsc, rcu_head);
+
+       kfree(rsci->handle.data);
+       kfree(rsci);
+}
+
 static void rsc_put(struct kref *ref)
 {
        struct rsc *rsci = container_of(ref, struct rsc, h.ref);
 
-       rsc_free(rsci);
-       kfree(rsci);
+       if (rsci->mechctx)
+               gss_delete_sec_context(&rsci->mechctx);
+       free_svc_cred(&rsci->cred);
+       call_rcu(&rsci->rcu_head, rsc_free_rcu);
 }
 
 static inline int
@@ -542,7 +562,7 @@ static struct rsc *rsc_lookup(struct cache_detail *cd, struct rsc *item)
        struct cache_head *ch;
        int hash = rsc_hash(item);
 
-       ch = sunrpc_cache_lookup(cd, &item->h, hash);
+       ch = sunrpc_cache_lookup_rcu(cd, &item->h, hash);
        if (ch)
                return container_of(ch, struct rsc, h);
        else
@@ -1764,14 +1784,21 @@ out_err:
 }
 
 static void
-svcauth_gss_domain_release(struct auth_domain *dom)
+svcauth_gss_domain_release_rcu(struct rcu_head *head)
 {
+       struct auth_domain *dom = container_of(head, struct auth_domain, rcu_head);
        struct gss_domain *gd = container_of(dom, struct gss_domain, h);
 
        kfree(dom->name);
        kfree(gd);
 }
 
+static void
+svcauth_gss_domain_release(struct auth_domain *dom)
+{
+       call_rcu(&dom->rcu_head, svcauth_gss_domain_release_rcu);
+}
+
 static struct auth_ops svcauthops_gss = {
        .name           = "rpcsec_gss",
        .owner          = THIS_MODULE,
index 109fbe591e7bf35de11e7d5fee20de519e872aa0..f96345b1180ee9cf41013008ac2f052a29496818 100644 (file)
@@ -54,28 +54,33 @@ static void cache_init(struct cache_head *h, struct cache_detail *detail)
        h->last_refresh = now;
 }
 
-struct cache_head *sunrpc_cache_lookup(struct cache_detail *detail,
-                                      struct cache_head *key, int hash)
+static struct cache_head *sunrpc_cache_find_rcu(struct cache_detail *detail,
+                                               struct cache_head *key,
+                                               int hash)
 {
-       struct cache_head *new = NULL, *freeme = NULL, *tmp = NULL;
-       struct hlist_head *head;
-
-       head = &detail->hash_table[hash];
-
-       read_lock(&detail->hash_lock);
+       struct hlist_head *head = &detail->hash_table[hash];
+       struct cache_head *tmp;
 
-       hlist_for_each_entry(tmp, head, cache_list) {
+       rcu_read_lock();
+       hlist_for_each_entry_rcu(tmp, head, cache_list) {
                if (detail->match(tmp, key)) {
                        if (cache_is_expired(detail, tmp))
-                               /* This entry is expired, we will discard it. */
-                               break;
-                       cache_get(tmp);
-                       read_unlock(&detail->hash_lock);
+                               continue;
+                       tmp = cache_get_rcu(tmp);
+                       rcu_read_unlock();
                        return tmp;
                }
        }
-       read_unlock(&detail->hash_lock);
-       /* Didn't find anything, insert an empty entry */
+       rcu_read_unlock();
+       return NULL;
+}
+
+static struct cache_head *sunrpc_cache_add_entry(struct cache_detail *detail,
+                                                struct cache_head *key,
+                                                int hash)
+{
+       struct cache_head *new, *tmp, *freeme = NULL;
+       struct hlist_head *head = &detail->hash_table[hash];
 
        new = detail->alloc();
        if (!new)
@@ -87,35 +92,46 @@ struct cache_head *sunrpc_cache_lookup(struct cache_detail *detail,
        cache_init(new, detail);
        detail->init(new, key);
 
-       write_lock(&detail->hash_lock);
+       spin_lock(&detail->hash_lock);
 
        /* check if entry appeared while we slept */
-       hlist_for_each_entry(tmp, head, cache_list) {
+       hlist_for_each_entry_rcu(tmp, head, cache_list) {
                if (detail->match(tmp, key)) {
                        if (cache_is_expired(detail, tmp)) {
-                               hlist_del_init(&tmp->cache_list);
+                               hlist_del_init_rcu(&tmp->cache_list);
                                detail->entries --;
                                freeme = tmp;
                                break;
                        }
                        cache_get(tmp);
-                       write_unlock(&detail->hash_lock);
+                       spin_unlock(&detail->hash_lock);
                        cache_put(new, detail);
                        return tmp;
                }
        }
 
-       hlist_add_head(&new->cache_list, head);
+       hlist_add_head_rcu(&new->cache_list, head);
        detail->entries++;
        cache_get(new);
-       write_unlock(&detail->hash_lock);
+       spin_unlock(&detail->hash_lock);
 
        if (freeme)
                cache_put(freeme, detail);
        return new;
 }
-EXPORT_SYMBOL_GPL(sunrpc_cache_lookup);
 
+struct cache_head *sunrpc_cache_lookup_rcu(struct cache_detail *detail,
+                                          struct cache_head *key, int hash)
+{
+       struct cache_head *ret;
+
+       ret = sunrpc_cache_find_rcu(detail, key, hash);
+       if (ret)
+               return ret;
+       /* Didn't find anything, insert an empty entry */
+       return sunrpc_cache_add_entry(detail, key, hash);
+}
+EXPORT_SYMBOL_GPL(sunrpc_cache_lookup_rcu);
 
 static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch);
 
@@ -151,18 +167,18 @@ struct cache_head *sunrpc_cache_update(struct cache_detail *detail,
        struct cache_head *tmp;
 
        if (!test_bit(CACHE_VALID, &old->flags)) {
-               write_lock(&detail->hash_lock);
+               spin_lock(&detail->hash_lock);
                if (!test_bit(CACHE_VALID, &old->flags)) {
                        if (test_bit(CACHE_NEGATIVE, &new->flags))
                                set_bit(CACHE_NEGATIVE, &old->flags);
                        else
                                detail->update(old, new);
                        cache_fresh_locked(old, new->expiry_time, detail);
-                       write_unlock(&detail->hash_lock);
+                       spin_unlock(&detail->hash_lock);
                        cache_fresh_unlocked(old, detail);
                        return old;
                }
-               write_unlock(&detail->hash_lock);
+               spin_unlock(&detail->hash_lock);
        }
        /* We need to insert a new entry */
        tmp = detail->alloc();
@@ -173,7 +189,7 @@ struct cache_head *sunrpc_cache_update(struct cache_detail *detail,
        cache_init(tmp, detail);
        detail->init(tmp, old);
 
-       write_lock(&detail->hash_lock);
+       spin_lock(&detail->hash_lock);
        if (test_bit(CACHE_NEGATIVE, &new->flags))
                set_bit(CACHE_NEGATIVE, &tmp->flags);
        else
@@ -183,7 +199,7 @@ struct cache_head *sunrpc_cache_update(struct cache_detail *detail,
        cache_get(tmp);
        cache_fresh_locked(tmp, new->expiry_time, detail);
        cache_fresh_locked(old, 0, detail);
-       write_unlock(&detail->hash_lock);
+       spin_unlock(&detail->hash_lock);
        cache_fresh_unlocked(tmp, detail);
        cache_fresh_unlocked(old, detail);
        cache_put(old, detail);
@@ -223,7 +239,7 @@ static int try_to_negate_entry(struct cache_detail *detail, struct cache_head *h
 {
        int rv;
 
-       write_lock(&detail->hash_lock);
+       spin_lock(&detail->hash_lock);
        rv = cache_is_valid(h);
        if (rv == -EAGAIN) {
                set_bit(CACHE_NEGATIVE, &h->flags);
@@ -231,7 +247,7 @@ static int try_to_negate_entry(struct cache_detail *detail, struct cache_head *h
                                   detail);
                rv = -ENOENT;
        }
-       write_unlock(&detail->hash_lock);
+       spin_unlock(&detail->hash_lock);
        cache_fresh_unlocked(h, detail);
        return rv;
 }
@@ -341,7 +357,7 @@ static struct delayed_work cache_cleaner;
 
 void sunrpc_init_cache_detail(struct cache_detail *cd)
 {
-       rwlock_init(&cd->hash_lock);
+       spin_lock_init(&cd->hash_lock);
        INIT_LIST_HEAD(&cd->queue);
        spin_lock(&cache_list_lock);
        cd->nextcheck = 0;
@@ -361,11 +377,11 @@ void sunrpc_destroy_cache_detail(struct cache_detail *cd)
 {
        cache_purge(cd);
        spin_lock(&cache_list_lock);
-       write_lock(&cd->hash_lock);
+       spin_lock(&cd->hash_lock);
        if (current_detail == cd)
                current_detail = NULL;
        list_del_init(&cd->others);
-       write_unlock(&cd->hash_lock);
+       spin_unlock(&cd->hash_lock);
        spin_unlock(&cache_list_lock);
        if (list_empty(&cache_list)) {
                /* module must be being unloaded so its safe to kill the worker */
@@ -422,7 +438,7 @@ static int cache_clean(void)
                struct hlist_head *head;
                struct hlist_node *tmp;
 
-               write_lock(&current_detail->hash_lock);
+               spin_lock(&current_detail->hash_lock);
 
                /* Ok, now to clean this strand */
 
@@ -433,13 +449,13 @@ static int cache_clean(void)
                        if (!cache_is_expired(current_detail, ch))
                                continue;
 
-                       hlist_del_init(&ch->cache_list);
+                       hlist_del_init_rcu(&ch->cache_list);
                        current_detail->entries--;
                        rv = 1;
                        break;
                }
 
-               write_unlock(&current_detail->hash_lock);
+               spin_unlock(&current_detail->hash_lock);
                d = current_detail;
                if (!ch)
                        current_index ++;
@@ -494,9 +510,9 @@ void cache_purge(struct cache_detail *detail)
        struct hlist_node *tmp = NULL;
        int i = 0;
 
-       write_lock(&detail->hash_lock);
+       spin_lock(&detail->hash_lock);
        if (!detail->entries) {
-               write_unlock(&detail->hash_lock);
+               spin_unlock(&detail->hash_lock);
                return;
        }
 
@@ -504,17 +520,17 @@ void cache_purge(struct cache_detail *detail)
        for (i = 0; i < detail->hash_size; i++) {
                head = &detail->hash_table[i];
                hlist_for_each_entry_safe(ch, tmp, head, cache_list) {
-                       hlist_del_init(&ch->cache_list);
+                       hlist_del_init_rcu(&ch->cache_list);
                        detail->entries--;
 
                        set_bit(CACHE_CLEANED, &ch->flags);
-                       write_unlock(&detail->hash_lock);
+                       spin_unlock(&detail->hash_lock);
                        cache_fresh_unlocked(ch, detail);
                        cache_put(ch, detail);
-                       write_lock(&detail->hash_lock);
+                       spin_lock(&detail->hash_lock);
                }
        }
-       write_unlock(&detail->hash_lock);
+       spin_unlock(&detail->hash_lock);
 }
 EXPORT_SYMBOL_GPL(cache_purge);
 
@@ -1289,21 +1305,19 @@ EXPORT_SYMBOL_GPL(qword_get);
  * get a header, then pass each real item in the cache
  */
 
-void *cache_seq_start(struct seq_file *m, loff_t *pos)
-       __acquires(cd->hash_lock)
+static void *__cache_seq_start(struct seq_file *m, loff_t *pos)
 {
        loff_t n = *pos;
        unsigned int hash, entry;
        struct cache_head *ch;
        struct cache_detail *cd = m->private;
 
-       read_lock(&cd->hash_lock);
        if (!n--)
                return SEQ_START_TOKEN;
        hash = n >> 32;
        entry = n & ((1LL<<32) - 1);
 
-       hlist_for_each_entry(ch, &cd->hash_table[hash], cache_list)
+       hlist_for_each_entry_rcu(ch, &cd->hash_table[hash], cache_list)
                if (!entry--)
                        return ch;
        n &= ~((1LL<<32) - 1);
@@ -1315,12 +1329,12 @@ void *cache_seq_start(struct seq_file *m, loff_t *pos)
        if (hash >= cd->hash_size)
                return NULL;
        *pos = n+1;
-       return hlist_entry_safe(cd->hash_table[hash].first,
+       return hlist_entry_safe(rcu_dereference_raw(
+                               hlist_first_rcu(&cd->hash_table[hash])),
                                struct cache_head, cache_list);
 }
-EXPORT_SYMBOL_GPL(cache_seq_start);
 
-void *cache_seq_next(struct seq_file *m, void *p, loff_t *pos)
+static void *cache_seq_next(struct seq_file *m, void *p, loff_t *pos)
 {
        struct cache_head *ch = p;
        int hash = (*pos >> 32);
@@ -1333,7 +1347,8 @@ void *cache_seq_next(struct seq_file *m, void *p, loff_t *pos)
                *pos += 1LL<<32;
        } else {
                ++*pos;
-               return hlist_entry_safe(ch->cache_list.next,
+               return hlist_entry_safe(rcu_dereference_raw(
+                                       hlist_next_rcu(&ch->cache_list)),
                                        struct cache_head, cache_list);
        }
        *pos &= ~((1LL<<32) - 1);
@@ -1345,18 +1360,32 @@ void *cache_seq_next(struct seq_file *m, void *p, loff_t *pos)
        if (hash >= cd->hash_size)
                return NULL;
        ++*pos;
-       return hlist_entry_safe(cd->hash_table[hash].first,
+       return hlist_entry_safe(rcu_dereference_raw(
+                               hlist_first_rcu(&cd->hash_table[hash])),
                                struct cache_head, cache_list);
 }
 EXPORT_SYMBOL_GPL(cache_seq_next);
 
-void cache_seq_stop(struct seq_file *m, void *p)
-       __releases(cd->hash_lock)
+void *cache_seq_start_rcu(struct seq_file *m, loff_t *pos)
+       __acquires(RCU)
 {
-       struct cache_detail *cd = m->private;
-       read_unlock(&cd->hash_lock);
+       rcu_read_lock();
+       return __cache_seq_start(m, pos);
+}
+EXPORT_SYMBOL_GPL(cache_seq_start_rcu);
+
+void *cache_seq_next_rcu(struct seq_file *file, void *p, loff_t *pos)
+{
+       return cache_seq_next(file, p, pos);
+}
+EXPORT_SYMBOL_GPL(cache_seq_next_rcu);
+
+void cache_seq_stop_rcu(struct seq_file *m, void *p)
+       __releases(RCU)
+{
+       rcu_read_unlock();
 }
-EXPORT_SYMBOL_GPL(cache_seq_stop);
+EXPORT_SYMBOL_GPL(cache_seq_stop_rcu);
 
 static int c_show(struct seq_file *m, void *p)
 {
@@ -1384,9 +1413,9 @@ static int c_show(struct seq_file *m, void *p)
 }
 
 static const struct seq_operations cache_content_op = {
-       .start  = cache_seq_start,
-       .next   = cache_seq_next,
-       .stop   = cache_seq_stop,
+       .start  = cache_seq_start_rcu,
+       .next   = cache_seq_next_rcu,
+       .stop   = cache_seq_stop_rcu,
        .show   = c_show,
 };
 
@@ -1844,13 +1873,13 @@ EXPORT_SYMBOL_GPL(sunrpc_cache_unregister_pipefs);
 
 void sunrpc_cache_unhash(struct cache_detail *cd, struct cache_head *h)
 {
-       write_lock(&cd->hash_lock);
+       spin_lock(&cd->hash_lock);
        if (!hlist_unhashed(&h->cache_list)){
-               hlist_del_init(&h->cache_list);
+               hlist_del_init_rcu(&h->cache_list);
                cd->entries--;
-               write_unlock(&cd->hash_lock);
+               spin_unlock(&cd->hash_lock);
                cache_put(h, cd);
        } else
-               write_unlock(&cd->hash_lock);
+               spin_unlock(&cd->hash_lock);
 }
 EXPORT_SYMBOL_GPL(sunrpc_cache_unhash);
index 87533fbb96cfa89b2fdbc9bfa4ba240d881903d4..51d36230b6e3e350da9452877b87f7b14b70dd63 100644 (file)
@@ -987,7 +987,7 @@ static void call_xpt_users(struct svc_xprt *xprt)
        spin_lock(&xprt->xpt_lock);
        while (!list_empty(&xprt->xpt_users)) {
                u = list_first_entry(&xprt->xpt_users, struct svc_xpt_user, list);
-               list_del(&u->list);
+               list_del_init(&u->list);
                u->callback(u);
        }
        spin_unlock(&xprt->xpt_lock);
index bb8db3cb8032ee0a4714cf3b49aeb83cb73037bd..775b8c94265bc329e3a36bf8b3010e53f054c1e2 100644 (file)
 extern struct auth_ops svcauth_null;
 extern struct auth_ops svcauth_unix;
 
-static DEFINE_SPINLOCK(authtab_lock);
-static struct auth_ops *authtab[RPC_AUTH_MAXFLAVOR] = {
-       [0] = &svcauth_null,
-       [1] = &svcauth_unix,
+static struct auth_ops __rcu *authtab[RPC_AUTH_MAXFLAVOR] = {
+       [RPC_AUTH_NULL] = (struct auth_ops __force __rcu *)&svcauth_null,
+       [RPC_AUTH_UNIX] = (struct auth_ops __force __rcu *)&svcauth_unix,
 };
 
+static struct auth_ops *
+svc_get_auth_ops(rpc_authflavor_t flavor)
+{
+       struct auth_ops         *aops;
+
+       if (flavor >= RPC_AUTH_MAXFLAVOR)
+               return NULL;
+       rcu_read_lock();
+       aops = rcu_dereference(authtab[flavor]);
+       if (aops != NULL && !try_module_get(aops->owner))
+               aops = NULL;
+       rcu_read_unlock();
+       return aops;
+}
+
+static void
+svc_put_auth_ops(struct auth_ops *aops)
+{
+       module_put(aops->owner);
+}
+
 int
 svc_authenticate(struct svc_rqst *rqstp, __be32 *authp)
 {
@@ -45,14 +65,11 @@ svc_authenticate(struct svc_rqst *rqstp, __be32 *authp)
 
        dprintk("svc: svc_authenticate (%d)\n", flavor);
 
-       spin_lock(&authtab_lock);
-       if (flavor >= RPC_AUTH_MAXFLAVOR || !(aops = authtab[flavor]) ||
-           !try_module_get(aops->owner)) {
-               spin_unlock(&authtab_lock);
+       aops = svc_get_auth_ops(flavor);
+       if (aops == NULL) {
                *authp = rpc_autherr_badcred;
                return SVC_DENIED;
        }
-       spin_unlock(&authtab_lock);
 
        rqstp->rq_auth_slack = 0;
        init_svc_cred(&rqstp->rq_cred);
@@ -82,7 +99,7 @@ int svc_authorise(struct svc_rqst *rqstp)
 
        if (aops) {
                rv = aops->release(rqstp);
-               module_put(aops->owner);
+               svc_put_auth_ops(aops);
        }
        return rv;
 }
@@ -90,13 +107,14 @@ int svc_authorise(struct svc_rqst *rqstp)
 int
 svc_auth_register(rpc_authflavor_t flavor, struct auth_ops *aops)
 {
+       struct auth_ops *old;
        int rv = -EINVAL;
-       spin_lock(&authtab_lock);
-       if (flavor < RPC_AUTH_MAXFLAVOR && authtab[flavor] == NULL) {
-               authtab[flavor] = aops;
-               rv = 0;
+
+       if (flavor < RPC_AUTH_MAXFLAVOR) {
+               old = cmpxchg((struct auth_ops ** __force)&authtab[flavor], NULL, aops);
+               if (old == NULL || old == aops)
+                       rv = 0;
        }
-       spin_unlock(&authtab_lock);
        return rv;
 }
 EXPORT_SYMBOL_GPL(svc_auth_register);
@@ -104,10 +122,8 @@ EXPORT_SYMBOL_GPL(svc_auth_register);
 void
 svc_auth_unregister(rpc_authflavor_t flavor)
 {
-       spin_lock(&authtab_lock);
        if (flavor < RPC_AUTH_MAXFLAVOR)
-               authtab[flavor] = NULL;
-       spin_unlock(&authtab_lock);
+               rcu_assign_pointer(authtab[flavor], NULL);
 }
 EXPORT_SYMBOL_GPL(svc_auth_unregister);
 
@@ -127,10 +143,11 @@ static struct hlist_head  auth_domain_table[DN_HASHMAX];
 static DEFINE_SPINLOCK(auth_domain_lock);
 
 static void auth_domain_release(struct kref *kref)
+       __releases(&auth_domain_lock)
 {
        struct auth_domain *dom = container_of(kref, struct auth_domain, ref);
 
-       hlist_del(&dom->hash);
+       hlist_del_rcu(&dom->hash);
        dom->flavour->domain_release(dom);
        spin_unlock(&auth_domain_lock);
 }
@@ -159,7 +176,7 @@ auth_domain_lookup(char *name, struct auth_domain *new)
                }
        }
        if (new)
-               hlist_add_head(&new->hash, head);
+               hlist_add_head_rcu(&new->hash, head);
        spin_unlock(&auth_domain_lock);
        return new;
 }
@@ -167,6 +184,21 @@ EXPORT_SYMBOL_GPL(auth_domain_lookup);
 
 struct auth_domain *auth_domain_find(char *name)
 {
-       return auth_domain_lookup(name, NULL);
+       struct auth_domain *hp;
+       struct hlist_head *head;
+
+       head = &auth_domain_table[hash_str(name, DN_HASHBITS)];
+
+       rcu_read_lock();
+       hlist_for_each_entry_rcu(hp, head, hash) {
+               if (strcmp(hp->name, name)==0) {
+                       if (!kref_get_unless_zero(&hp->ref))
+                               hp = NULL;
+                       rcu_read_unlock();
+                       return hp;
+               }
+       }
+       rcu_read_unlock();
+       return NULL;
 }
 EXPORT_SYMBOL_GPL(auth_domain_find);
index af7f28fb8102e4313f5ced6aa585e30f3911ca6c..fb9041b92f72233841cf1d173aa1b1cafe91e623 100644 (file)
@@ -37,20 +37,26 @@ struct unix_domain {
 extern struct auth_ops svcauth_null;
 extern struct auth_ops svcauth_unix;
 
-static void svcauth_unix_domain_release(struct auth_domain *dom)
+static void svcauth_unix_domain_release_rcu(struct rcu_head *head)
 {
+       struct auth_domain *dom = container_of(head, struct auth_domain, rcu_head);
        struct unix_domain *ud = container_of(dom, struct unix_domain, h);
 
        kfree(dom->name);
        kfree(ud);
 }
 
+static void svcauth_unix_domain_release(struct auth_domain *dom)
+{
+       call_rcu(&dom->rcu_head, svcauth_unix_domain_release_rcu);
+}
+
 struct auth_domain *unix_domain_find(char *name)
 {
        struct auth_domain *rv;
        struct unix_domain *new = NULL;
 
-       rv = auth_domain_lookup(name, NULL);
+       rv = auth_domain_find(name);
        while(1) {
                if (rv) {
                        if (new && rv != &new->h)
@@ -91,6 +97,7 @@ struct ip_map {
        char                    m_class[8]; /* e.g. "nfsd" */
        struct in6_addr         m_addr;
        struct unix_domain      *m_client;
+       struct rcu_head         m_rcu;
 };
 
 static void ip_map_put(struct kref *kref)
@@ -101,7 +108,7 @@ static void ip_map_put(struct kref *kref)
        if (test_bit(CACHE_VALID, &item->flags) &&
            !test_bit(CACHE_NEGATIVE, &item->flags))
                auth_domain_put(&im->m_client->h);
-       kfree(im);
+       kfree_rcu(im, m_rcu);
 }
 
 static inline int hash_ip6(const struct in6_addr *ip)
@@ -280,9 +287,9 @@ static struct ip_map *__ip_map_lookup(struct cache_detail *cd, char *class,
 
        strcpy(ip.m_class, class);
        ip.m_addr = *addr;
-       ch = sunrpc_cache_lookup(cd, &ip.h,
-                                hash_str(class, IP_HASHBITS) ^
-                                hash_ip6(addr));
+       ch = sunrpc_cache_lookup_rcu(cd, &ip.h,
+                                    hash_str(class, IP_HASHBITS) ^
+                                    hash_ip6(addr));
 
        if (ch)
                return container_of(ch, struct ip_map, h);
@@ -412,6 +419,7 @@ struct unix_gid {
        struct cache_head       h;
        kuid_t                  uid;
        struct group_info       *gi;
+       struct rcu_head         rcu;
 };
 
 static int unix_gid_hash(kuid_t uid)
@@ -426,7 +434,7 @@ static void unix_gid_put(struct kref *kref)
        if (test_bit(CACHE_VALID, &item->flags) &&
            !test_bit(CACHE_NEGATIVE, &item->flags))
                put_group_info(ug->gi);
-       kfree(ug);
+       kfree_rcu(ug, rcu);
 }
 
 static int unix_gid_match(struct cache_head *corig, struct cache_head *cnew)
@@ -619,7 +627,7 @@ static struct unix_gid *unix_gid_lookup(struct cache_detail *cd, kuid_t uid)
        struct cache_head *ch;
 
        ug.uid = uid;
-       ch = sunrpc_cache_lookup(cd, &ug.h, unix_gid_hash(uid));
+       ch = sunrpc_cache_lookup_rcu(cd, &ug.h, unix_gid_hash(uid));
        if (ch)
                return container_of(ch, struct unix_gid, h);
        else
index db8bb6b3a2b0faf387868b80539c33ae9c5626dd..3b525accaa6857bc76cda9a8b9f131791fcd1f26 100644 (file)
@@ -325,59 +325,34 @@ static int svc_one_sock_name(struct svc_sock *svsk, char *buf, int remaining)
 /*
  * Generic recvfrom routine.
  */
-static int svc_recvfrom(struct svc_rqst *rqstp, struct kvec *iov, int nr,
-                       int buflen)
+static ssize_t svc_recvfrom(struct svc_rqst *rqstp, struct kvec *iov,
+                           unsigned int nr, size_t buflen, unsigned int base)
 {
        struct svc_sock *svsk =
                container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
-       struct msghdr msg = {
-               .msg_flags      = MSG_DONTWAIT,
-       };
-       int len;
+       struct msghdr msg = { NULL };
+       ssize_t len;
 
        rqstp->rq_xprt_hlen = 0;
 
        clear_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
        iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, iov, nr, buflen);
-       len = sock_recvmsg(svsk->sk_sock, &msg, msg.msg_flags);
+       if (base != 0) {
+               iov_iter_advance(&msg.msg_iter, base);
+               buflen -= base;
+       }
+       len = sock_recvmsg(svsk->sk_sock, &msg, MSG_DONTWAIT);
        /* If we read a full record, then assume there may be more
         * data to read (stream based sockets only!)
         */
        if (len == buflen)
                set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
 
-       dprintk("svc: socket %p recvfrom(%p, %zu) = %d\n",
+       dprintk("svc: socket %p recvfrom(%p, %zu) = %zd\n",
                svsk, iov[0].iov_base, iov[0].iov_len, len);
        return len;
 }
 
-static int svc_partial_recvfrom(struct svc_rqst *rqstp,
-                               struct kvec *iov, int nr,
-                               int buflen, unsigned int base)
-{
-       size_t save_iovlen;
-       void *save_iovbase;
-       unsigned int i;
-       int ret;
-
-       if (base == 0)
-               return svc_recvfrom(rqstp, iov, nr, buflen);
-
-       for (i = 0; i < nr; i++) {
-               if (iov[i].iov_len > base)
-                       break;
-               base -= iov[i].iov_len;
-       }
-       save_iovlen = iov[i].iov_len;
-       save_iovbase = iov[i].iov_base;
-       iov[i].iov_len -= base;
-       iov[i].iov_base += base;
-       ret = svc_recvfrom(rqstp, &iov[i], nr - i, buflen);
-       iov[i].iov_len = save_iovlen;
-       iov[i].iov_base = save_iovbase;
-       return ret;
-}
-
 /*
  * Set socket snd and rcv buffer lengths
  */
@@ -962,7 +937,8 @@ static int svc_tcp_recv_record(struct svc_sock *svsk, struct svc_rqst *rqstp)
                want = sizeof(rpc_fraghdr) - svsk->sk_tcplen;
                iov.iov_base = ((char *) &svsk->sk_reclen) + svsk->sk_tcplen;
                iov.iov_len  = want;
-               if ((len = svc_recvfrom(rqstp, &iov, 1, want)) < 0)
+               len = svc_recvfrom(rqstp, &iov, 1, want, 0);
+               if (len < 0)
                        goto error;
                svsk->sk_tcplen += len;
 
@@ -1088,14 +1064,13 @@ static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
 
        vec = rqstp->rq_vec;
 
-       pnum = copy_pages_to_kvecs(&vec[0], &rqstp->rq_pages[0],
-                                               svsk->sk_datalen + want);
+       pnum = copy_pages_to_kvecs(&vec[0], &rqstp->rq_pages[0], base + want);
 
        rqstp->rq_respages = &rqstp->rq_pages[pnum];
        rqstp->rq_next_page = rqstp->rq_respages + 1;
 
        /* Now receive data */
-       len = svc_partial_recvfrom(rqstp, vec, pnum, want, base);
+       len = svc_recvfrom(rqstp, vec, pnum, base + want, base);
        if (len >= 0) {
                svsk->sk_tcplen += len;
                svsk->sk_datalen += len;
index d3a1a237cee6e4f49f6af104c8cb0c68c7463d65..f3c147d70286e8fd6ea080e7b8619cb40a8ae3df 100644 (file)
@@ -5,8 +5,6 @@
  * Support for backward direction RPCs on RPC/RDMA (server-side).
  */
 
-#include <linux/module.h>
-
 #include <linux/sunrpc/svc_rdma.h>
 
 #include "xprt_rdma.h"
@@ -32,7 +30,6 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct kvec *dst, *src = &rcvbuf->head[0];
        struct rpc_rqst *req;
-       unsigned long cwnd;
        u32 credits;
        size_t len;
        __be32 xid;
@@ -66,6 +63,8 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
        if (dst->iov_len < len)
                goto out_unlock;
        memcpy(dst->iov_base, p, len);
+       xprt_pin_rqst(req);
+       spin_unlock(&xprt->queue_lock);
 
        credits = be32_to_cpup(rdma_resp + 2);
        if (credits == 0)
@@ -74,15 +73,13 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
                credits = r_xprt->rx_buf.rb_bc_max_requests;
 
        spin_lock_bh(&xprt->transport_lock);
-       cwnd = xprt->cwnd;
        xprt->cwnd = credits << RPC_CWNDSHIFT;
-       if (xprt->cwnd > cwnd)
-               xprt_release_rqst_cong(req->rq_task);
        spin_unlock_bh(&xprt->transport_lock);
 
-
+       spin_lock(&xprt->queue_lock);
        ret = 0;
        xprt_complete_rqst(req->rq_task, rcvbuf->len);
+       xprt_unpin_rqst(req);
        rcvbuf->len = 0;
 
 out_unlock:
@@ -251,7 +248,6 @@ xprt_rdma_bc_put(struct rpc_xprt *xprt)
        dprintk("svcrdma: %s: xprt %p\n", __func__, xprt);
 
        xprt_free(xprt);
-       module_put(THIS_MODULE);
 }
 
 static const struct rpc_xprt_ops xprt_rdma_bc_procs = {
@@ -323,20 +319,9 @@ xprt_setup_rdma_bc(struct xprt_create *args)
        args->bc_xprt->xpt_bc_xprt = xprt;
        xprt->bc_xprt = args->bc_xprt;
 
-       if (!try_module_get(THIS_MODULE))
-               goto out_fail;
-
        /* Final put for backchannel xprt is in __svc_rdma_free */
        xprt_get(xprt);
        return xprt;
-
-out_fail:
-       xprt_rdma_free_addresses(xprt);
-       args->bc_xprt->xpt_bc_xprt = NULL;
-       args->bc_xprt->xpt_bc_xps = NULL;
-       xprt_put(xprt);
-       xprt_free(xprt);
-       return ERR_PTR(-EINVAL);
 }
 
 struct xprt_class xprt_rdma_bc = {
index 2848cafd4a17744d2fe9a515126611d9a891a189..2f7ec8912f49417f2fc65afc7e6d067e74559de9 100644 (file)
@@ -475,10 +475,12 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 
        /* Qualify the transport resource defaults with the
         * capabilities of this particular device */
-       newxprt->sc_max_send_sges = dev->attrs.max_send_sge;
-       /* transport hdr, head iovec, one page list entry, tail iovec */
-       if (newxprt->sc_max_send_sges < 4) {
-               pr_err("svcrdma: too few Send SGEs available (%d)\n",
+       /* Transport header, head iovec, tail iovec */
+       newxprt->sc_max_send_sges = 3;
+       /* Add one SGE per page list entry */
+       newxprt->sc_max_send_sges += svcrdma_max_req_size / PAGE_SIZE;
+       if (newxprt->sc_max_send_sges > dev->attrs.max_send_sge) {
+               pr_err("svcrdma: too few Send SGEs available (%d needed)\n",
                       newxprt->sc_max_send_sges);
                goto errout;
        }