SUNRPC: Convert xprt receive queue to use an rbtree
authorTrond Myklebust <trond.myklebust@hammerspace.com>
Fri, 7 Sep 2018 12:35:22 +0000 (08:35 -0400)
committerTrond Myklebust <trond.myklebust@hammerspace.com>
Sun, 30 Sep 2018 19:35:16 +0000 (15:35 -0400)
If the server is slow, we can find ourselves with quite a lot of entries
on the receive queue. Converting the search from an O(n) to O(log(n))
can make a significant difference, particularly since we have to hold
a number of locks while searching.

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
include/linux/sunrpc/xprt.h
net/sunrpc/xprt.c

index 823860cce0bc9f513a2570571aa4a9c7e478a262..9be399020dab691ca6124346c53fb3cdd7591e6b 100644 (file)
@@ -85,7 +85,7 @@ struct rpc_rqst {
 
        union {
                struct list_head        rq_list;        /* Slot allocation list */
-               struct list_head        rq_recv;        /* Receive queue */
+               struct rb_node          rq_recv;        /* Receive queue */
        };
 
        struct list_head        rq_xmit;        /* Send queue */
@@ -260,7 +260,7 @@ struct rpc_xprt {
                                                 * backchannel rpc_rqst's */
 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
 
-       struct list_head        recv_queue;     /* Receive queue */
+       struct rb_root          recv_queue;     /* Receive queue */
 
        struct {
                unsigned long           bind_count,     /* total number of binds */
index 11133ba716b9d21bce40bcc857146760cfca5221..480461ad0c86117de2eb163bec02bf4c614df51e 100644 (file)
@@ -753,7 +753,7 @@ static void
 xprt_schedule_autodisconnect(struct rpc_xprt *xprt)
        __must_hold(&xprt->transport_lock)
 {
-       if (list_empty(&xprt->recv_queue) && xprt_has_timer(xprt))
+       if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt))
                mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout);
 }
 
@@ -763,7 +763,7 @@ xprt_init_autodisconnect(struct timer_list *t)
        struct rpc_xprt *xprt = from_timer(xprt, t, timer);
 
        spin_lock(&xprt->transport_lock);
-       if (!list_empty(&xprt->recv_queue))
+       if (!RB_EMPTY_ROOT(&xprt->recv_queue))
                goto out_abort;
        /* Reset xprt->last_used to avoid connect/autodisconnect cycling */
        xprt->last_used = jiffies;
@@ -880,6 +880,75 @@ static void xprt_connect_status(struct rpc_task *task)
        }
 }
 
+enum xprt_xid_rb_cmp {
+       XID_RB_EQUAL,
+       XID_RB_LEFT,
+       XID_RB_RIGHT,
+};
+static enum xprt_xid_rb_cmp
+xprt_xid_cmp(__be32 xid1, __be32 xid2)
+{
+       if (xid1 == xid2)
+               return XID_RB_EQUAL;
+       if ((__force u32)xid1 < (__force u32)xid2)
+               return XID_RB_LEFT;
+       return XID_RB_RIGHT;
+}
+
+static struct rpc_rqst *
+xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid)
+{
+       struct rb_node *n = xprt->recv_queue.rb_node;
+       struct rpc_rqst *req;
+
+       while (n != NULL) {
+               req = rb_entry(n, struct rpc_rqst, rq_recv);
+               switch (xprt_xid_cmp(xid, req->rq_xid)) {
+               case XID_RB_LEFT:
+                       n = n->rb_left;
+                       break;
+               case XID_RB_RIGHT:
+                       n = n->rb_right;
+                       break;
+               case XID_RB_EQUAL:
+                       return req;
+               }
+       }
+       return NULL;
+}
+
+static void
+xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new)
+{
+       struct rb_node **p = &xprt->recv_queue.rb_node;
+       struct rb_node *n = NULL;
+       struct rpc_rqst *req;
+
+       while (*p != NULL) {
+               n = *p;
+               req = rb_entry(n, struct rpc_rqst, rq_recv);
+               switch(xprt_xid_cmp(new->rq_xid, req->rq_xid)) {
+               case XID_RB_LEFT:
+                       p = &n->rb_left;
+                       break;
+               case XID_RB_RIGHT:
+                       p = &n->rb_right;
+                       break;
+               case XID_RB_EQUAL:
+                       WARN_ON_ONCE(new != req);
+                       return;
+               }
+       }
+       rb_link_node(&new->rq_recv, n, p);
+       rb_insert_color(&new->rq_recv, &xprt->recv_queue);
+}
+
+static void
+xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req)
+{
+       rb_erase(&req->rq_recv, &xprt->recv_queue);
+}
+
 /**
  * xprt_lookup_rqst - find an RPC request corresponding to an XID
  * @xprt: transport on which the original request was transmitted
@@ -891,12 +960,12 @@ struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
 {
        struct rpc_rqst *entry;
 
-       list_for_each_entry(entry, &xprt->recv_queue, rq_recv)
-               if (entry->rq_xid == xid) {
-                       trace_xprt_lookup_rqst(xprt, xid, 0);
-                       entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime);
-                       return entry;
-               }
+       entry = xprt_request_rb_find(xprt, xid);
+       if (entry != NULL) {
+               trace_xprt_lookup_rqst(xprt, xid, 0);
+               entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime);
+               return entry;
+       }
 
        dprintk("RPC:       xprt_lookup_rqst did not find xid %08x\n",
                        ntohl(xid));
@@ -981,7 +1050,7 @@ xprt_request_enqueue_receive(struct rpc_task *task)
                        sizeof(req->rq_private_buf));
 
        /* Add request to the receive list */
-       list_add_tail(&req->rq_recv, &xprt->recv_queue);
+       xprt_request_rb_insert(xprt, req);
        set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
        spin_unlock(&xprt->queue_lock);
 
@@ -999,8 +1068,10 @@ xprt_request_enqueue_receive(struct rpc_task *task)
 static void
 xprt_request_dequeue_receive_locked(struct rpc_task *task)
 {
+       struct rpc_rqst *req = task->tk_rqstp;
+
        if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
-               list_del(&task->tk_rqstp->rq_recv);
+               xprt_request_rb_remove(req->rq_xprt, req);
 }
 
 /**
@@ -1711,7 +1782,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net)
        spin_lock_init(&xprt->queue_lock);
 
        INIT_LIST_HEAD(&xprt->free);
-       INIT_LIST_HEAD(&xprt->recv_queue);
+       xprt->recv_queue = RB_ROOT;
        INIT_LIST_HEAD(&xprt->xmit_queue);
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
        spin_lock_init(&xprt->bc_pa_lock);