xprtrdma: Cache free MRs in each rpcrdma_req
author Chuck Lever <chuck.lever@oracle.com>
Mon, 19 Aug 2019 22:47:10 +0000 (18:47 -0400)
committer Anna Schumaker <Anna.Schumaker@Netapp.com>
Wed, 21 Aug 2019 15:06:24 +0000 (11:06 -0400)
Instead of a globally-contended MR free list, cache MRs in each
rpcrdma_req as they are released. This means acquiring and releasing
an MR will be lock-free in the common case, even outside the
transport send lock.

The original idea of per-rpcrdma_req MR free lists was suggested by
Shirley Ma <shirley.ma@oracle.com> several years ago. I just now
figured out how to make that idea work with on-demand MR allocation.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
include/trace/events/rpcrdma.h
net/sunrpc/xprtrdma/frwr_ops.c
net/sunrpc/xprtrdma/rpc_rdma.c
net/sunrpc/xprtrdma/verbs.c
net/sunrpc/xprtrdma/xprt_rdma.h

index 83c4dfd7feeae4d94b42d4fce841f9234c27eb71..a138306161075703feb10fe811088ddd964c5ee1 100644 (file)
@@ -451,16 +451,50 @@ TRACE_EVENT(xprtrdma_createmrs,
 
        TP_STRUCT__entry(
                __field(const void *, r_xprt)
+               __string(addr, rpcrdma_addrstr(r_xprt))
+               __string(port, rpcrdma_portstr(r_xprt))
                __field(unsigned int, count)
        ),
 
        TP_fast_assign(
                __entry->r_xprt = r_xprt;
                __entry->count = count;
+               __assign_str(addr, rpcrdma_addrstr(r_xprt));
+               __assign_str(port, rpcrdma_portstr(r_xprt));
        ),
 
-       TP_printk("r_xprt=%p: created %u MRs",
-               __entry->r_xprt, __entry->count
+       TP_printk("peer=[%s]:%s r_xprt=%p: created %u MRs",
+               __get_str(addr), __get_str(port), __entry->r_xprt,
+               __entry->count
+       )
+);
+
+TRACE_EVENT(xprtrdma_mr_get,
+       TP_PROTO(
+               const struct rpcrdma_req *req
+       ),
+
+       TP_ARGS(req),
+
+       TP_STRUCT__entry(
+               __field(const void *, req)
+               __field(unsigned int, task_id)
+               __field(unsigned int, client_id)
+               __field(u32, xid)
+       ),
+
+       TP_fast_assign(
+               const struct rpc_rqst *rqst = &req->rl_slot;
+
+               __entry->req = req;
+               __entry->task_id = rqst->rq_task->tk_pid;
+               __entry->client_id = rqst->rq_task->tk_client->cl_clid;
+               __entry->xid = be32_to_cpu(rqst->rq_xid);
+       ),
+
+       TP_printk("task:%u@%u xid=0x%08x req=%p",
+               __entry->task_id, __entry->client_id, __entry->xid,
+               __entry->req
        )
 );
 
index 1f2e3dda74012f6be9bacb3063a7b5e924d18b90..0e740bae2d801414efe351d7ee0dc28f1b583410 100644 (file)
@@ -488,8 +488,8 @@ static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
 
        /* WARNING: Only wr_cqe and status are reliable at this point */
        trace_xprtrdma_wc_li_wake(wc, frwr);
-       complete(&frwr->fr_linv_done);
        __frwr_release_mr(wc, mr);
+       complete(&frwr->fr_linv_done);
 }
 
 /**
@@ -587,11 +587,15 @@ static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc)
        struct rpcrdma_frwr *frwr =
                container_of(cqe, struct rpcrdma_frwr, fr_cqe);
        struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
+       struct rpcrdma_rep *rep = mr->mr_req->rl_reply;
 
        /* WARNING: Only wr_cqe and status are reliable at this point */
        trace_xprtrdma_wc_li_done(wc, frwr);
-       rpcrdma_complete_rqst(frwr->fr_req->rl_reply);
        __frwr_release_mr(wc, mr);
+
+       /* Ensure @rep is generated before __frwr_release_mr */
+       smp_rmb();
+       rpcrdma_complete_rqst(rep);
 }
 
 /**
@@ -624,7 +628,6 @@ void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 
                frwr = &mr->frwr;
                frwr->fr_cqe.done = frwr_wc_localinv;
-               frwr->fr_req = req;
                last = &frwr->fr_invwr;
                last->next = NULL;
                last->wr_cqe = &frwr->fr_cqe;
index 34772cb192867028dcb688f518d34cd364720398..ffeb4dfebd464773a7e392396f34db128cffbbb7 100644 (file)
@@ -348,9 +348,14 @@ static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt,
                                                 int nsegs, bool writing,
                                                 struct rpcrdma_mr **mr)
 {
-       *mr = rpcrdma_mr_get(r_xprt);
-       if (!*mr)
-               goto out_getmr_err;
+       *mr = rpcrdma_mr_pop(&req->rl_free_mrs);
+       if (!*mr) {
+               *mr = rpcrdma_mr_get(r_xprt);
+               if (!*mr)
+                       goto out_getmr_err;
+               trace_xprtrdma_mr_get(req);
+               (*mr)->mr_req = req;
+       }
 
        rpcrdma_mr_push(*mr, &req->rl_registered);
        return frwr_map(r_xprt, seg, nsegs, writing, req->rl_slot.rq_xid, *mr);
index cb6df58488bbebd0eb4543fc101f6555744af5cd..69753ec73c369e09f81add1c690fedc4f1c86741 100644 (file)
@@ -77,6 +77,7 @@
 static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
 static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
 static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
+static void rpcrdma_mr_free(struct rpcrdma_mr *mr);
 static struct rpcrdma_regbuf *
 rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
                     gfp_t flags);
@@ -1022,6 +1023,7 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
        if (!req->rl_recvbuf)
                goto out4;
 
+       INIT_LIST_HEAD(&req->rl_free_mrs);
        INIT_LIST_HEAD(&req->rl_registered);
        spin_lock(&buffer->rb_lock);
        list_add(&req->rl_all, &buffer->rb_allreqs);
@@ -1130,11 +1132,13 @@ static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
  * This function assumes that the caller prevents concurrent device
  * unload and transport tear-down.
  */
-void
-rpcrdma_req_destroy(struct rpcrdma_req *req)
+void rpcrdma_req_destroy(struct rpcrdma_req *req)
 {
        list_del(&req->rl_all);
 
+       while (!list_empty(&req->rl_free_mrs))
+               rpcrdma_mr_free(rpcrdma_mr_pop(&req->rl_free_mrs));
+
        rpcrdma_regbuf_free(req->rl_recvbuf);
        rpcrdma_regbuf_free(req->rl_sendbuf);
        rpcrdma_regbuf_free(req->rl_rdmabuf);
@@ -1228,7 +1232,6 @@ rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
 void rpcrdma_mr_put(struct rpcrdma_mr *mr)
 {
        struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
-       struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 
        if (mr->mr_dir != DMA_NONE) {
                trace_xprtrdma_mr_unmap(mr);
@@ -1237,6 +1240,15 @@ void rpcrdma_mr_put(struct rpcrdma_mr *mr)
                mr->mr_dir = DMA_NONE;
        }
 
+       rpcrdma_mr_push(mr, &mr->mr_req->rl_free_mrs);
+}
+
+static void rpcrdma_mr_free(struct rpcrdma_mr *mr)
+{
+       struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
+       struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+
+       mr->mr_req = NULL;
        spin_lock(&buf->rb_mrlock);
        rpcrdma_mr_push(mr, &buf->rb_mrs);
        spin_unlock(&buf->rb_mrlock);
index 9573587ca602d7e7aee8f769d1d3b7b124cf093b..c375b0e434ac7e1bfa2d10fb54fde8f19208b1af 100644 (file)
@@ -234,20 +234,20 @@ struct rpcrdma_sendctx {
  * An external memory region is any buffer or page that is registered
  * on the fly (ie, not pre-registered).
  */
-struct rpcrdma_req;
 struct rpcrdma_frwr {
        struct ib_mr                    *fr_mr;
        struct ib_cqe                   fr_cqe;
        struct completion               fr_linv_done;
-       struct rpcrdma_req              *fr_req;
        union {
                struct ib_reg_wr        fr_regwr;
                struct ib_send_wr       fr_invwr;
        };
 };
 
+struct rpcrdma_req;
 struct rpcrdma_mr {
        struct list_head        mr_list;
+       struct rpcrdma_req      *mr_req;
        struct scatterlist      *mr_sg;
        int                     mr_nents;
        enum dma_data_direction mr_dir;
@@ -325,7 +325,8 @@ struct rpcrdma_req {
        struct list_head        rl_all;
        struct kref             rl_kref;
 
-       struct list_head        rl_registered;  /* registered segments */
+       struct list_head        rl_free_mrs;
+       struct list_head        rl_registered;
        struct rpcrdma_mr_seg   rl_segments[RPCRDMA_MAX_SEGS];
 };