If the server is slow, we can find ourselves with quite a lot of entries on the receive queue. Converting the search from O(n) to O(log(n)) can make a significant difference, particularly since we have to hold a number of locks while searching. Signed-off-by: Trond Myklebust <trond.myklebust@xxxxxxxxxxxxxxx> --- include/linux/sunrpc/xprt.h | 4 +- net/sunrpc/xprt.c | 93 ++++++++++++++++++++++++++++++++----- 2 files changed, 84 insertions(+), 13 deletions(-) diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h index 823860cce0bc..9be399020dab 100644 --- a/include/linux/sunrpc/xprt.h +++ b/include/linux/sunrpc/xprt.h @@ -85,7 +85,7 @@ struct rpc_rqst { union { struct list_head rq_list; /* Slot allocation list */ - struct list_head rq_recv; /* Receive queue */ + struct rb_node rq_recv; /* Receive queue */ }; struct list_head rq_xmit; /* Send queue */ @@ -260,7 +260,7 @@ struct rpc_xprt { * backchannel rpc_rqst's */ #endif /* CONFIG_SUNRPC_BACKCHANNEL */ - struct list_head recv_queue; /* Receive queue */ + struct rb_root recv_queue; /* Receive queue */ struct { unsigned long bind_count, /* total number of binds */ diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index a1cb28a4adad..051638d5b39c 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -753,7 +753,7 @@ static void xprt_schedule_autodisconnect(struct rpc_xprt *xprt) __must_hold(&xprt->transport_lock) { - if (list_empty(&xprt->recv_queue) && xprt_has_timer(xprt)) + if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt)) mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout); } @@ -763,7 +763,7 @@ xprt_init_autodisconnect(struct timer_list *t) struct rpc_xprt *xprt = from_timer(xprt, t, timer); spin_lock(&xprt->transport_lock); - if (!list_empty(&xprt->recv_queue)) + if (!RB_EMPTY_ROOT(&xprt->recv_queue)) goto out_abort; /* Reset xprt->last_used to avoid connect/autodisconnect cycling */ xprt->last_used = jiffies; @@ -880,6 +880,75 @@ static void
xprt_connect_status(struct rpc_task *task) } } +enum xprt_xid_rb_cmp { + XID_RB_EQUAL, + XID_RB_LEFT, + XID_RB_RIGHT, +}; +static enum xprt_xid_rb_cmp +xprt_xid_cmp(__be32 xid1, __be32 xid2) +{ + if (xid1 == xid2) + return XID_RB_EQUAL; + if ((__force u32)xid1 < (__force u32)xid2) + return XID_RB_LEFT; + return XID_RB_RIGHT; +} + +static struct rpc_rqst * +xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid) +{ + struct rb_node *n = xprt->recv_queue.rb_node; + struct rpc_rqst *req; + + while (n != NULL) { + req = rb_entry(n, struct rpc_rqst, rq_recv); + switch (xprt_xid_cmp(xid, req->rq_xid)) { + case XID_RB_LEFT: + n = n->rb_left; + break; + case XID_RB_RIGHT: + n = n->rb_right; + break; + case XID_RB_EQUAL: + return req; + } + } + return NULL; +} + +static void +xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new) +{ + struct rb_node **p = &xprt->recv_queue.rb_node; + struct rb_node *n = NULL; + struct rpc_rqst *req; + + while (*p != NULL) { + n = *p; + req = rb_entry(n, struct rpc_rqst, rq_recv); + switch(xprt_xid_cmp(new->rq_xid, req->rq_xid)) { + case XID_RB_LEFT: + p = &n->rb_left; + break; + case XID_RB_RIGHT: + p = &n->rb_right; + break; + case XID_RB_EQUAL: + WARN_ON_ONCE(new != req); + return; + } + } + rb_link_node(&new->rq_recv, n, p); + rb_insert_color(&new->rq_recv, &xprt->recv_queue); +} + +static void +xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req) +{ + rb_erase(&req->rq_recv, &xprt->recv_queue); +} + /** * xprt_lookup_rqst - find an RPC request corresponding to an XID * @xprt: transport on which the original request was transmitted @@ -891,12 +960,12 @@ struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid) { struct rpc_rqst *entry; - list_for_each_entry(entry, &xprt->recv_queue, rq_recv) - if (entry->rq_xid == xid) { - trace_xprt_lookup_rqst(xprt, xid, 0); - entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime); - return entry; - } + entry = xprt_request_rb_find(xprt, xid); + if (entry != 
NULL) { + trace_xprt_lookup_rqst(xprt, xid, 0); + entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime); + return entry; + } dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n", ntohl(xid)); @@ -980,7 +1049,7 @@ xprt_request_enqueue_receive(struct rpc_task *task) sizeof(req->rq_private_buf)); /* Add request to the receive list */ - list_add_tail(&req->rq_recv, &xprt->recv_queue); + xprt_request_rb_insert(xprt, req); set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate); spin_unlock(&xprt->queue_lock); @@ -998,8 +1067,10 @@ xprt_request_enqueue_receive(struct rpc_task *task) static void xprt_request_dequeue_receive_locked(struct rpc_task *task) { + struct rpc_rqst *req = task->tk_rqstp; + if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) - list_del(&task->tk_rqstp->rq_recv); + xprt_request_rb_remove(req->rq_xprt, req); } /** @@ -1710,7 +1781,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net) spin_lock_init(&xprt->queue_lock); INIT_LIST_HEAD(&xprt->free); - INIT_LIST_HEAD(&xprt->recv_queue); + xprt->recv_queue = RB_ROOT; INIT_LIST_HEAD(&xprt->xmit_queue); #if defined(CONFIG_SUNRPC_BACKCHANNEL) spin_lock_init(&xprt->bc_pa_lock); -- 2.17.1