Merge tag 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/dledford/rdma
[sfrench/cifs-2.6.git] / net / sunrpc / xprtrdma / verbs.c
index 6561d4a35acbe5f7bfeb7198f9f6b58d3bb5048b..be3178e5e2d24c4a46549060f464f3d3b4debd6a 100644 (file)
@@ -51,6 +51,7 @@
 #include <linux/slab.h>
 #include <linux/prefetch.h>
 #include <linux/sunrpc/addr.h>
+#include <linux/sunrpc/svc_rdma.h>
 #include <asm/bitops.h>
 #include <linux/module.h> /* try_module_get()/module_put() */
 
@@ -923,7 +924,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
        }
 
        INIT_LIST_HEAD(&buf->rb_recv_bufs);
-       for (i = 0; i < buf->rb_max_requests; i++) {
+       for (i = 0; i < buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS; i++) {
                struct rpcrdma_rep *rep;
 
                rep = rpcrdma_create_rep(r_xprt);
@@ -1018,6 +1019,7 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
                rep = rpcrdma_buffer_get_rep_locked(buf);
                rpcrdma_destroy_rep(ia, rep);
        }
+       buf->rb_send_count = 0;
 
        spin_lock(&buf->rb_reqslock);
        while (!list_empty(&buf->rb_allreqs)) {
@@ -1032,6 +1034,7 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
                spin_lock(&buf->rb_reqslock);
        }
        spin_unlock(&buf->rb_reqslock);
+       buf->rb_recv_count = 0;
 
        rpcrdma_destroy_mrs(buf);
 }
@@ -1074,8 +1077,27 @@ rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
        spin_unlock(&buf->rb_mwlock);
 }
 
+static struct rpcrdma_rep *
+rpcrdma_buffer_get_rep(struct rpcrdma_buffer *buffers)
+{
+       /* If an RPC previously completed without a reply (say, a
+        * credential problem or a soft timeout occurs) then hold off
+        * on supplying more Receive buffers until the number of new
+        * pending RPCs catches up to the number of posted Receives.
+        */
+       if (unlikely(buffers->rb_send_count < buffers->rb_recv_count))
+               return NULL;
+
+       if (unlikely(list_empty(&buffers->rb_recv_bufs)))
+               return NULL;
+       buffers->rb_recv_count++;
+       return rpcrdma_buffer_get_rep_locked(buffers);
+}
+
 /*
  * Get a set of request/reply buffers.
+ *
+ * Reply buffer (if available) is attached to send buffer upon return.
  */
 struct rpcrdma_req *
 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
@@ -1085,21 +1107,15 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
        spin_lock(&buffers->rb_lock);
        if (list_empty(&buffers->rb_send_bufs))
                goto out_reqbuf;
+       buffers->rb_send_count++;
        req = rpcrdma_buffer_get_req_locked(buffers);
-       if (list_empty(&buffers->rb_recv_bufs))
-               goto out_repbuf;
-       req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
+       req->rl_reply = rpcrdma_buffer_get_rep(buffers);
        spin_unlock(&buffers->rb_lock);
        return req;
 
 out_reqbuf:
        spin_unlock(&buffers->rb_lock);
-       pr_warn("rpcrdma: out of request buffers (%p)\n", buffers);
-       return NULL;
-out_repbuf:
-       list_add(&req->rl_free, &buffers->rb_send_bufs);
-       spin_unlock(&buffers->rb_lock);
-       pr_warn("rpcrdma: out of reply buffers (%p)\n", buffers);
+       pr_warn("RPC:       %s: out of request buffers\n", __func__);
        return NULL;
 }
 
@@ -1117,9 +1133,12 @@ rpcrdma_buffer_put(struct rpcrdma_req *req)
        req->rl_reply = NULL;
 
        spin_lock(&buffers->rb_lock);
+       buffers->rb_send_count--;
        list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
-       if (rep)
+       if (rep) {
+               buffers->rb_recv_count--;
                list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
+       }
        spin_unlock(&buffers->rb_lock);
 }
 
@@ -1133,8 +1152,7 @@ rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
        struct rpcrdma_buffer *buffers = req->rl_buffer;
 
        spin_lock(&buffers->rb_lock);
-       if (!list_empty(&buffers->rb_recv_bufs))
-               req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
+       req->rl_reply = rpcrdma_buffer_get_rep(buffers);
        spin_unlock(&buffers->rb_lock);
 }
 
@@ -1148,6 +1166,7 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
        struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
 
        spin_lock(&buffers->rb_lock);
+       buffers->rb_recv_count--;
        list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
        spin_unlock(&buffers->rb_lock);
 }