Currently if several items of work become available in quick succession, that number of threads (if available) will be woken. By the time some of them wake up another thread that was already cache-warm might have come along and completed the work. Anecdotal evidence suggests as many as 15% of wakes find nothing to do once they get to the point of looking. This patch changes svc_pool_wake_idle_thread() to wake the first thread on the queue but NOT remove it. Subsequent calls will wake the same thread. Once that thread starts it will dequeue itself and after dequeueing some work to do, it will wake the next thread if there is more work ready. This results in a more orderly increase in the number of busy threads. As a bonus, this allows us to reduce locking around the idle queue. svc_pool_wake_idle_thread() no longer needs to take a lock (beyond rcu_read_lock()) as it doesn't manipulate the queue, it just looks at the first item. The thread itself can avoid locking by using the new llist_del_first_this() interface. This will safely remove the thread itself if it is the head. If it isn't the head, it will do nothing. If multiple threads call this concurrently only one will succeed. The others will do nothing, so no corruption can result. If a thread wakes up and finds that it cannot dequeue itself that means either - that it wasn't woken because it was the head of the queue. Maybe the freezer woke it. In that case it can go back to sleep (after trying to freeze of course). - some other thread found there was nothing to do very recently, and placed itself on the head of the queue in front of this thread. It must check again after placing itself there, so it can be deemed to be responsible for any pending work, and this thread can go back to sleep until woken. No code ever tests for busy threads any more. Only each thread itself cares if it is busy. So functions to set and test the busy status are no longer needed. Signed-off-by: NeilBrown <neilb@xxxxxxx> --- include/linux/llist.h | 2 ++ include/linux/sunrpc/svc.h | 24 -------------------- lib/llist.c | 27 ++++++++++++++++++++++ net/sunrpc/svc.c | 15 +++++-------- net/sunrpc/svc_xprt.c | 46 ++++++++++++++++++++++---------------- 5 files changed, 62 insertions(+), 52 deletions(-) diff --git a/include/linux/llist.h b/include/linux/llist.h index 85bda2d02d65..d8b1b73f3df0 100644 --- a/include/linux/llist.h +++ b/include/linux/llist.h @@ -248,6 +248,8 @@ static inline struct llist_node *__llist_del_all(struct llist_head *head) } extern struct llist_node *llist_del_first(struct llist_head *head); +extern struct llist_node *llist_del_first_this(struct llist_head *head, + struct llist_node *this); struct llist_node *llist_reverse_order(struct llist_node *head); diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h index 5216f95411e3..dafa362b4fdd 100644 --- a/include/linux/sunrpc/svc.h +++ b/include/linux/sunrpc/svc.h @@ -266,30 +266,6 @@ enum { RQ_DATA, /* request has data */ }; -/** - * svc_thread_set_busy - mark a thread as busy - * @rqstp: the thread which is now busy - * - * By convention a thread is busy if rq_idle.next points to rq_idle. - * This ensures it is not on the idle list. - */ -static inline void svc_thread_set_busy(struct svc_rqst *rqstp) -{ - rqstp->rq_idle.next = &rqstp->rq_idle; -} - -/** - * svc_thread_busy - check if a thread as busy - * @rqstp: the thread which might be busy - * - * By convention a thread is busy if rq_idle.next points to rq_idle. - * This ensures it is not on the idle list. - */ -static inline bool svc_thread_busy(struct svc_rqst *rqstp) -{ - return rqstp->rq_idle.next == &rqstp->rq_idle; -} - #define SVC_NET(rqst) (rqst->rq_xprt ? rqst->rq_xprt->xpt_net : rqst->rq_bc_net) /* diff --git a/lib/llist.c b/lib/llist.c index 6e668fa5a2c6..7e8a13a13586 100644 --- a/lib/llist.c +++ b/lib/llist.c @@ -65,6 +65,33 @@ struct llist_node *llist_del_first(struct llist_head *head) } EXPORT_SYMBOL_GPL(llist_del_first); +/** + * llist_del_first_this - delete given entry of lock-less list if it is first + * @head: the head for your lock-less list + * @this: a list entry. + * + * If head of the list is given entry, delete and return it, else + * return %NULL. + * + * Providing the caller has exclusive access to @this, multiple callers can + * safely call this concurrently with multiple llist_add() callers. + */ +struct llist_node *llist_del_first_this(struct llist_head *head, + struct llist_node *this) +{ + struct llist_node *entry, *next; + + entry = smp_load_acquire(&head->first); + do { + if (entry != this) + return NULL; + next = READ_ONCE(entry->next); + } while (!try_cmpxchg(&head->first, &entry, next)); + + return entry; +} +EXPORT_SYMBOL_GPL(llist_del_first_this); + /** * llist_reverse_order - reverse order of a llist chain * @head: first item of the list to be reversed diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c index addbf28ea50a..3267d740235e 100644 --- a/net/sunrpc/svc.c +++ b/net/sunrpc/svc.c @@ -642,7 +642,6 @@ svc_rqst_alloc(struct svc_serv *serv, struct svc_pool *pool, int node) folio_batch_init(&rqstp->rq_fbatch); - svc_thread_set_busy(rqstp); rqstp->rq_server = serv; rqstp->rq_pool = pool; @@ -704,18 +703,16 @@ void svc_pool_wake_idle_thread(struct svc_pool *pool) struct llist_node *ln; rcu_read_lock(); - spin_lock_bh(&pool->sp_lock); - ln = llist_del_first(&pool->sp_idle_threads); - spin_unlock_bh(&pool->sp_lock); + ln = READ_ONCE(pool->sp_idle_threads.first); if (ln) { rqstp = llist_entry(ln, struct svc_rqst, rq_idle); - svc_thread_set_busy(rqstp); - WRITE_ONCE(rqstp->rq_qtime, ktime_get()); - wake_up_process(rqstp->rq_task); + if (!task_is_running(rqstp->rq_task)) { + wake_up_process(rqstp->rq_task); + trace_svc_wake_up(rqstp->rq_task->pid); + percpu_counter_inc(&pool->sp_threads_woken); + } rcu_read_unlock(); - percpu_counter_inc(&pool->sp_threads_woken); - trace_svc_wake_up(rqstp->rq_task->pid); return; } rcu_read_unlock(); diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c index 81327001e074..397ca12299a2 100644 --- a/net/sunrpc/svc_xprt.c +++ b/net/sunrpc/svc_xprt.c @@ -732,26 +732,21 @@ static void svc_rqst_wait_for_work(struct svc_rqst *rqstp) if (rqst_should_sleep(rqstp)) { set_current_state(TASK_IDLE | TASK_FREEZABLE); llist_add(&rqstp->rq_idle, &pool->sp_idle_threads); + /* Maybe there were no idle threads when some work + * became ready and so nothing was woken. We've just + * become idle so someone can do the work - maybe us. + * So check again for work to do. + */ + if (likely(rqst_should_sleep(rqstp))) + schedule(); - if (unlikely(!rqst_should_sleep(rqstp))) - /* maybe there were no idle threads when some work - * became ready and so nothing was woken. We've just - * become idle so someone can to the work - maybe us. - * But we cannot reliably remove ourselves from the - * idle list - we can only remove the first task which - * might be us, and might not. - * So remove and wake it, then schedule(). If it was - * us, we won't sleep. If it is some other thread, they - * will do the work. + while (llist_del_first_this(&pool->sp_idle_threads, + &rqstp->rq_idle) == NULL) { + /* Cannot remove myself, so some other thread + * must have queued themselves after finding + * no work to do, so they have taken responsibility + * for any outstanding work. */ - svc_pool_wake_idle_thread(pool); - - /* We mustn't continue while on the idle list, and we - * cannot remove outselves reliably. The only "work" - * we can do while on the idle list is to freeze. - * So loop until someone removes us - */ - while (!svc_thread_busy(rqstp)) { schedule(); set_current_state(TASK_IDLE | TASK_FREEZABLE); } @@ -841,6 +836,15 @@ static void svc_handle_xprt(struct svc_rqst *rqstp, struct svc_xprt *xprt) svc_xprt_release(rqstp); } +static void wake_next(struct svc_rqst *rqstp) +{ + if (!rqst_should_sleep(rqstp)) + /* More work pending after I dequeued some, + * wake another worker + */ + svc_pool_wake_idle_thread(rqstp->rq_pool); +} + /** * svc_recv - Receive and process the next request on any transport * @rqstp: an idle RPC service thread @@ -860,13 +864,16 @@ void svc_recv(struct svc_rqst *rqstp) clear_bit(SP_TASK_PENDING, &pool->sp_flags); - if (svc_thread_should_stop(rqstp)) + if (svc_thread_should_stop(rqstp)) { + wake_next(rqstp); return; + } rqstp->rq_xprt = svc_xprt_dequeue(pool); if (rqstp->rq_xprt) { struct svc_xprt *xprt = rqstp->rq_xprt; + wake_next(rqstp); /* Normally we will wait up to 5 seconds for any required * cache information to be provided. When there are no * idle threads, we reduce the wait time. @@ -891,6 +898,7 @@ void svc_recv(struct svc_rqst *rqstp) if (req) { list_del(&req->rq_bc_list); spin_unlock_bh(&serv->sv_cb_lock); + wake_next(rqstp); svc_process_bc(req, rqstp); return; -- 2.40.1