xprtrdma: Replace rpcrdma_receive_wq with a per-xprt workqueue
author: Chuck Lever <chuck.lever@oracle.com>
Wed, 19 Dec 2018 15:58:29 +0000 (10:58 -0500)
committer: Anna Schumaker <Anna.Schumaker@Netapp.com>
Wed, 2 Jan 2019 17:05:16 +0000 (12:05 -0500)
To address a connection-close ordering problem, we need the ability
to drain the RPC completions running on rpcrdma_receive_wq for just
one transport. Give each transport its own RPC completion workqueue,
and drain that workqueue when disconnecting the transport.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
net/sunrpc/xprtrdma/rpc_rdma.c
net/sunrpc/xprtrdma/transport.c
net/sunrpc/xprtrdma/verbs.c
net/sunrpc/xprtrdma/xprt_rdma.h

index dc2397731c5c614d4144176c4a20e54ffa62222b..5738c9f021444e0421e990ae7087036cee993116 100644 (file)
@@ -1356,7 +1356,7 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
        clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
 
        trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
-       queue_work(rpcrdma_receive_wq, &rep->rr_work);
+       queue_work(buf->rb_completion_wq, &rep->rr_work);
        return;
 
 out_badversion:
index ae2a83828953706e9b5cde03b8a3449b9aeb3df1..91c476a8f51c375dc884d8a34388c4a894f1d3a3 100644 (file)
@@ -444,10 +444,14 @@ xprt_rdma_close(struct rpc_xprt *xprt)
        struct rpcrdma_ep *ep = &r_xprt->rx_ep;
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
+       might_sleep();
+
        dprintk("RPC:       %s: closing xprt %p\n", __func__, xprt);
 
+       /* Prevent marshaling and sending of new requests */
+       xprt_clear_connected(xprt);
+
        if (test_and_clear_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags)) {
-               xprt_clear_connected(xprt);
                rpcrdma_ia_remove(ia);
                return;
        }
@@ -858,8 +862,6 @@ void xprt_rdma_cleanup(void)
                dprintk("RPC:       %s: xprt_unregister returned %i\n",
                        __func__, rc);
 
-       rpcrdma_destroy_wq();
-
        rc = xprt_unregister_transport(&xprt_rdma_bc);
        if (rc)
                dprintk("RPC:       %s: xprt_unregister(bc) returned %i\n",
@@ -870,20 +872,13 @@ int xprt_rdma_init(void)
 {
        int rc;
 
-       rc = rpcrdma_alloc_wq();
-       if (rc)
-               return rc;
-
        rc = xprt_register_transport(&xprt_rdma);
-       if (rc) {
-               rpcrdma_destroy_wq();
+       if (rc)
                return rc;
-       }
 
        rc = xprt_register_transport(&xprt_rdma_bc);
        if (rc) {
                xprt_unregister_transport(&xprt_rdma);
-               rpcrdma_destroy_wq();
                return rc;
        }
 
index e4461e7c1b0c46d5902173c039095ba6e8bf0f72..cff3a5df0b9059a98076dcfe21f88d6653273e21 100644 (file)
@@ -80,33 +80,23 @@ static int rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp);
 static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);
 static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
 
-struct workqueue_struct *rpcrdma_receive_wq __read_mostly;
-
-int
-rpcrdma_alloc_wq(void)
+/* Wait for outstanding transport work to finish.
+ */
+static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
 {
-       struct workqueue_struct *recv_wq;
-
-       recv_wq = alloc_workqueue("xprtrdma_receive",
-                                 WQ_MEM_RECLAIM | WQ_HIGHPRI,
-                                 0);
-       if (!recv_wq)
-               return -ENOMEM;
-
-       rpcrdma_receive_wq = recv_wq;
-       return 0;
-}
+       struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+       struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
-void
-rpcrdma_destroy_wq(void)
-{
-       struct workqueue_struct *wq;
+       /* Flush Receives, then wait for deferred Reply work
+        * to complete.
+        */
+       ib_drain_qp(ia->ri_id->qp);
+       drain_workqueue(buf->rb_completion_wq);
 
-       if (rpcrdma_receive_wq) {
-               wq = rpcrdma_receive_wq;
-               rpcrdma_receive_wq = NULL;
-               destroy_workqueue(wq);
-       }
+       /* Deferred Reply processing might have scheduled
+        * local invalidations.
+        */
+       ib_drain_sq(ia->ri_id->qp);
 }
 
 /**
@@ -483,7 +473,7 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
         *   connection is already gone.
         */
        if (ia->ri_id->qp) {
-               ib_drain_qp(ia->ri_id->qp);
+               rpcrdma_xprt_drain(r_xprt);
                rdma_destroy_qp(ia->ri_id);
                ia->ri_id->qp = NULL;
        }
@@ -825,8 +815,10 @@ out_noupdate:
        return rc;
 }
 
-/*
- * rpcrdma_ep_disconnect
+/**
+ * rpcrdma_ep_disconnect - Disconnect underlying transport
+ * @ep: endpoint to disconnect
+ * @ia: associated interface adapter
  *
  * This is separate from destroy to facilitate the ability
  * to reconnect without recreating the endpoint.
@@ -837,19 +829,20 @@ out_noupdate:
 void
 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 {
+       struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
+                                                  rx_ep);
        int rc;
 
+       /* returns without wait if ID is not connected */
        rc = rdma_disconnect(ia->ri_id);
        if (!rc)
-               /* returns without wait if not connected */
                wait_event_interruptible(ep->rep_connect_wait,
                                                        ep->rep_connected != 1);
        else
                ep->rep_connected = rc;
-       trace_xprtrdma_disconnect(container_of(ep, struct rpcrdma_xprt,
-                                              rx_ep), rc);
+       trace_xprtrdma_disconnect(r_xprt, rc);
 
-       ib_drain_qp(ia->ri_id->qp);
+       rpcrdma_xprt_drain(r_xprt);
 }
 
 /* Fixed-size circular FIFO queue. This implementation is wait-free and
@@ -1183,6 +1176,13 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
        if (rc)
                goto out;
 
+       buf->rb_completion_wq = alloc_workqueue("rpcrdma-%s",
+                                               WQ_MEM_RECLAIM | WQ_HIGHPRI,
+                                               0,
+                       r_xprt->rx_xprt.address_strings[RPC_DISPLAY_ADDR]);
+       if (!buf->rb_completion_wq)
+               goto out;
+
        return 0;
 out:
        rpcrdma_buffer_destroy(buf);
@@ -1241,6 +1241,11 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 {
        cancel_delayed_work_sync(&buf->rb_refresh_worker);
 
+       if (buf->rb_completion_wq) {
+               destroy_workqueue(buf->rb_completion_wq);
+               buf->rb_completion_wq = NULL;
+       }
+
        rpcrdma_sendctxs_destroy(buf);
 
        while (!list_empty(&buf->rb_recv_bufs)) {
index 788124cd9258e6d989e090e1639abda6ef796d1c..3f198cde41e3097e93eeae641aabfd7602675615 100644 (file)
@@ -412,6 +412,7 @@ struct rpcrdma_buffer {
 
        u32                     rb_bc_max_requests;
 
+       struct workqueue_struct *rb_completion_wq;
        struct delayed_work     rb_refresh_worker;
 };
 #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
@@ -547,8 +548,6 @@ void rpcrdma_ia_close(struct rpcrdma_ia *);
 bool frwr_is_supported(struct rpcrdma_ia *);
 bool fmr_is_supported(struct rpcrdma_ia *);
 
-extern struct workqueue_struct *rpcrdma_receive_wq;
-
 /*
  * Endpoint calls - xprtrdma/verbs.c
  */
@@ -603,9 +602,6 @@ rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
        return __rpcrdma_dma_map_regbuf(ia, rb);
 }
 
-int rpcrdma_alloc_wq(void);
-void rpcrdma_destroy_wq(void);
-
 /*
  * Wrappers for chunk registration, shared by read/write chunk code.
  */