SUNRPC: RPC transport queue must be low latency
author Trond Myklebust <trond.myklebust@primarydata.com>
Fri, 27 May 2016 14:39:50 +0000 (10:39 -0400)
committer Trond Myklebust <trond.myklebust@primarydata.com>
Mon, 13 Jun 2016 16:35:51 +0000 (12:35 -0400)
rpciod can easily become congested due to the long list of queued rpc_tasks.
Having the transport receive work wait in line behind those tasks can
therefore be a bottleneck.

Address the problem by separating the workqueues into:
- rpciod: manages rpc_tasks
- xprtiod: manages transport-related work

A condensed sketch of the two-queue pattern follows the list of changed
files below.

Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
include/linux/sunrpc/sched.h
net/sunrpc/sched.c
net/sunrpc/xprt.c
net/sunrpc/xprtsock.c

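For orientation, here is a condensed, standalone sketch of the pattern the
patch adopts: one ordinary WQ_MEM_RECLAIM workqueue for the bulk rpc_task
work, plus a dedicated WQ_HIGHPRI workqueue for latency-sensitive transport
work, with an unwind path on allocation failure. The demo_* names and the
trivial work item are hypothetical illustrations, not part of the patch.

#include <linux/cache.h>
#include <linux/workqueue.h>

/* Hypothetical stand-ins for rpciod_workqueue and xprtiod_workqueue. */
static struct workqueue_struct *demo_taskq __read_mostly;
static struct workqueue_struct *demo_xprtq __read_mostly;

static void demo_recv_fn(struct work_struct *work)
{
	/* Latency-sensitive receive processing would run here. */
}
static DECLARE_WORK(demo_recv_work, demo_recv_fn);

static int demo_start(void)
{
	struct workqueue_struct *wq;

	/* Ordinary queue for the (potentially long) list of tasks;
	 * WQ_MEM_RECLAIM guarantees forward progress under memory
	 * pressure. */
	wq = alloc_workqueue("demo_taskq", WQ_MEM_RECLAIM, 0);
	if (!wq)
		return 0;
	demo_taskq = wq;

	/* Dedicated highpri queue so receive work never waits behind
	 * the task queue. */
	wq = alloc_workqueue("demo_xprtq", WQ_MEM_RECLAIM | WQ_HIGHPRI, 0);
	if (!wq) {
		destroy_workqueue(demo_taskq);
		demo_taskq = NULL;
		return 0;
	}
	demo_xprtq = wq;

	/* Transport work is dispatched to the dedicated queue. */
	queue_work(demo_xprtq, &demo_recv_work);
	return 1;
}

Note the deliberate flag shuffle in the real patch: WQ_HIGHPRI moves off the
task queue and onto the transport queue, since only the receive path needs
the latency guarantee, while both queues keep WQ_MEM_RECLAIM so RPC work can
still make progress under memory pressure.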
diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index 05a1809c44d99e59813576a1e2a6daf242e2a4be..ef780b3b5e317d9e878bc99f471e4b9c620f768a 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -247,6 +247,7 @@ void                rpc_show_tasks(struct net *);
 int            rpc_init_mempool(void);
 void           rpc_destroy_mempool(void);
 extern struct workqueue_struct *rpciod_workqueue;
+extern struct workqueue_struct *xprtiod_workqueue;
 void           rpc_prepare_task(struct rpc_task *task);
 
 static inline int rpc_wait_for_completion_task(struct rpc_task *task)
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index fcfd48d263f64f1f52ef317a9a8974a8a457196e..a9f786247ffba4d5a01b521fdc0561c65162a540 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -54,7 +54,8 @@ static struct rpc_wait_queue delay_queue;
 /*
  * rpciod-related stuff
  */
-struct workqueue_struct *rpciod_workqueue;
+struct workqueue_struct *rpciod_workqueue __read_mostly;
+struct workqueue_struct *xprtiod_workqueue __read_mostly;
 
 /*
  * Disable the timer for a given RPC task. Should be called with
@@ -1071,10 +1072,22 @@ static int rpciod_start(void)
         * Create the rpciod thread and wait for it to start.
         */
        dprintk("RPC:       creating workqueue rpciod\n");
-       /* Note: highpri because network receive is latency sensitive */
-       wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM | WQ_HIGHPRI, 0);
+       wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 0);
+       if (!wq)
+               goto out_failed;
        rpciod_workqueue = wq;
-       return rpciod_workqueue != NULL;
+       /* Note: highpri because network receive is latency sensitive */
+       wq = alloc_workqueue("xprtiod", WQ_MEM_RECLAIM | WQ_HIGHPRI, 0);
+       if (!wq)
+               goto free_rpciod;
+       xprtiod_workqueue = wq;
+       return 1;
+free_rpciod:
+       wq = rpciod_workqueue;
+       rpciod_workqueue = NULL;
+       destroy_workqueue(wq);
+out_failed:
+       return 0;
 }
 
 static void rpciod_stop(void)
@@ -1088,6 +1101,9 @@ static void rpciod_stop(void)
        wq = rpciod_workqueue;
        rpciod_workqueue = NULL;
        destroy_workqueue(wq);
+       wq = xprtiod_workqueue;
+       xprtiod_workqueue = NULL;
+       destroy_workqueue(wq);
 }
 
 void
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 216a1385718a27e9f86516720d28d61b9aaac609..71df082b84a90c11b9a687f740f5fd19018a0cc9 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -220,7 +220,7 @@ static void xprt_clear_locked(struct rpc_xprt *xprt)
                clear_bit(XPRT_LOCKED, &xprt->state);
                smp_mb__after_atomic();
        } else
-               queue_work(rpciod_workqueue, &xprt->task_cleanup);
+               queue_work(xprtiod_workqueue, &xprt->task_cleanup);
 }
 
 /*
@@ -645,7 +645,7 @@ void xprt_force_disconnect(struct rpc_xprt *xprt)
        set_bit(XPRT_CLOSE_WAIT, &xprt->state);
        /* Try to schedule an autoclose RPC call */
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
-               queue_work(rpciod_workqueue, &xprt->task_cleanup);
+               queue_work(xprtiod_workqueue, &xprt->task_cleanup);
        xprt_wake_pending_tasks(xprt, -EAGAIN);
        spin_unlock_bh(&xprt->transport_lock);
 }
@@ -672,7 +672,7 @@ void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
        set_bit(XPRT_CLOSE_WAIT, &xprt->state);
        /* Try to schedule an autoclose RPC call */
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
-               queue_work(rpciod_workqueue, &xprt->task_cleanup);
+               queue_work(xprtiod_workqueue, &xprt->task_cleanup);
        xprt_wake_pending_tasks(xprt, -EAGAIN);
 out:
        spin_unlock_bh(&xprt->transport_lock);
@@ -689,7 +689,7 @@ xprt_init_autodisconnect(unsigned long data)
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                goto out_abort;
        spin_unlock(&xprt->transport_lock);
-       queue_work(rpciod_workqueue, &xprt->task_cleanup);
+       queue_work(xprtiod_workqueue, &xprt->task_cleanup);
        return;
 out_abort:
        spin_unlock(&xprt->transport_lock);
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 62b4f5a2a331b8058cc36f0d5edcea4cebf207f8..646170d0cb86839359f9ece30543d225321f1f68 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -1095,7 +1095,7 @@ static void xs_data_ready(struct sock *sk)
                if (xprt->reestablish_timeout)
                        xprt->reestablish_timeout = 0;
                if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
-                       queue_work(rpciod_workqueue, &transport->recv_worker);
+                       queue_work(xprtiod_workqueue, &transport->recv_worker);
        }
        read_unlock_bh(&sk->sk_callback_lock);
 }
@@ -2378,7 +2378,7 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task)
                /* Start by resetting any existing state */
                xs_reset_transport(transport);
 
-               queue_delayed_work(rpciod_workqueue,
+               queue_delayed_work(xprtiod_workqueue,
                                   &transport->connect_worker,
                                   xprt->reestablish_timeout);
                xprt->reestablish_timeout <<= 1;
@@ -2388,7 +2388,7 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task)
                        xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
        } else {
                dprintk("RPC:       xs_connect scheduled xprt %p\n", xprt);
-               queue_delayed_work(rpciod_workqueue,
+               queue_delayed_work(xprtiod_workqueue,
                                   &transport->connect_worker, 0);
        }
 }