On Mon, Jul 10, 2023 at 10:41:38AM +1000, NeilBrown wrote:
>
> The patch shows an alternate approach to the recent patches which
> improve the latency for waking an idle thread when work is ready.
>
> The current approach involves searching a linked list for an idle
> thread to wake. The recent patches instead use a bitmap search to
> find the idle thread. With this patch no search is needed - an idle
> thread is directly available without searching.
>
> The idle threads are kept in an "llist" - there is no longer a list
> of all threads.
>
> The llist ADT does not allow concurrent delete_first operations, so
> to wake an idle thread we simply wake it and do not remove it from
> the list.
> When the thread is scheduled it will remove itself - which is safe -
> and will wake the next thread if there is more work to do (and if
> there is another thread).
>
> The "remove itself" requires an addition to the llist API.
> "llist_del_first_this()" removes a given item if it is the first.
> Multiple callers can call this concurrently as long as they each
> give a different "this", so each thread can safely try to remove
> itself. Each caller must be prepared for failure.
>
> Reducing the thread count currently requires finding any thread,
> idle or not, and calling kthread_stop(). This is no longer possible
> as we don't have a list of all threads (though I guess we could keep
> the list if we wanted to...). Instead the pool is marked NEED_VICTIM
> and the next thread to go idle will become the VICTIM and duly exit -
> signalling this by clearing VICTIM_REMAINS. We replace the
> kthread_should_stop() call with a new svc_should_stop() which checks
> and sets victim flags.
>
> nfsd threads can currently be told to exit with a signal. It might
> be time to deprecate/remove this feature. However this patch does
> support it.
>
> If the signalled thread is not at the head of the idle list it
> cannot remove itself. In this case it sets RQ_CLEAN_ME and
> SP_CLEANUP and the next thread to wake up will use
> llist_del_all_this() to remove all threads from the idle list. It
> then finds and removes any RQ_CLEAN_ME threads and puts the rest
> back on the list.
>
> There is quite a bit of churn here so it will need careful testing.
> In fact - it doesn't handle nfsv4 callback handling threads properly
> as they don't wait the same way that other threads wait... I'll need
> to think about that but I don't have time just now.
>
> For now it is primarily an RFC. I haven't given a lot of thought to
> trace points.
>
> To apply it you will need
>
> SUNRPC: Deduplicate thread wake-up code
> SUNRPC: Report when no service thread is available.
> SUNRPC: Split the svc_xprt_dequeue tracepoint
> SUNRPC: Clean up svc_set_num_threads
> SUNRPC: Replace dprintk() call site in __svc_create()
>
> from a recent post by Chuck.

Hi, thanks for letting us see your pencil sketches. :-)

Later today, I'll push a topic branch to my kernel.org repo that we
can use as a base for continuing this work.

Some initial remarks below, recognizing that this patch is still
incomplete.
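To orient other readers, here is the thread lifecycle I understand
this patch to propose, condensed into a sketch (identifiers come from
the hunks below; locking, tracing, and the RQ_CLEAN_ME path are
omitted):

	/* a thread going idle parks itself on the pool's llist */
	llist_add(&rqstp->rq_idle, &pool->sp_idle_threads);
	schedule_timeout(timeout);

	/* a waker peeks at the head and wakes it WITHOUT dequeuing,
	 * so wakers never race on a delete_first */
	ln = READ_ONCE(pool->sp_idle_threads.first);
	if (ln) {
		rqstp = llist_entry(ln, struct svc_rqst, rq_idle);
		wake_up_process(rqstp->rq_task);
	}

	/* once running, the thread dequeues itself; this is safe
	 * because every caller passes a distinct "this" */
	if (!llist_del_first_this(&pool->sp_idle_threads, &rqstp->rq_idle))
		; /* not at the head: stay parked or take the cleanup path */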
> Signed-off-by: NeilBrown <neilb@xxxxxxx> > --- > fs/lockd/svc.c | 4 +- > fs/lockd/svclock.c | 4 +- > fs/nfs/callback.c | 5 +- > fs/nfsd/nfssvc.c | 3 +- > include/linux/llist.h | 4 + > include/linux/lockd/lockd.h | 2 +- > include/linux/sunrpc/svc.h | 55 +++++++++----- > include/trace/events/sunrpc.h | 7 +- > lib/llist.c | 51 +++++++++++++ > net/sunrpc/svc.c | 139 ++++++++++++++++++++++++---------- > net/sunrpc/svc_xprt.c | 61 ++++++++------- > 11 files changed, 239 insertions(+), 96 deletions(-) > > diff --git a/fs/lockd/svc.c b/fs/lockd/svc.c > index 22d3ff3818f5..df295771bd40 100644 > --- a/fs/lockd/svc.c > +++ b/fs/lockd/svc.c > @@ -147,7 +147,7 @@ lockd(void *vrqstp) > * The main request loop. We don't terminate until the last > * NFS mount or NFS daemon has gone away. > */ > - while (!kthread_should_stop()) { > + while (!svc_should_stop(rqstp)) { > long timeout = MAX_SCHEDULE_TIMEOUT; > RPC_IFDEBUG(char buf[RPC_MAX_ADDRBUFLEN]); > > @@ -160,7 +160,7 @@ lockd(void *vrqstp) > continue; > } > > - timeout = nlmsvc_retry_blocked(); > + timeout = nlmsvc_retry_blocked(rqstp); > > /* > * Find a socket with data available and call its > diff --git a/fs/lockd/svclock.c b/fs/lockd/svclock.c > index c43ccdf28ed9..54b679fcbcab 100644 > --- a/fs/lockd/svclock.c > +++ b/fs/lockd/svclock.c > @@ -1009,13 +1009,13 @@ retry_deferred_block(struct nlm_block *block) > * be retransmitted. > */ > unsigned long > -nlmsvc_retry_blocked(void) > +nlmsvc_retry_blocked(struct svc_rqst *rqstp) > { > unsigned long timeout = MAX_SCHEDULE_TIMEOUT; > struct nlm_block *block; > > spin_lock(&nlm_blocked_lock); > - while (!list_empty(&nlm_blocked) && !kthread_should_stop()) { > + while (!list_empty(&nlm_blocked) && !svc_should_stop(rqstp)) { > block = list_entry(nlm_blocked.next, struct nlm_block, b_list); > > if (block->b_when == NLM_NEVER) > diff --git a/fs/nfs/callback.c b/fs/nfs/callback.c > index 456af7d230cf..646425f1dc36 100644 > --- a/fs/nfs/callback.c > +++ b/fs/nfs/callback.c > @@ -111,7 +111,7 @@ nfs41_callback_svc(void *vrqstp) > > set_freezable(); > > - while (!kthread_freezable_should_stop(NULL)) { > + while (!svc_should_stop(rqstp)) { > > if (signal_pending(current)) > flush_signals(current); > @@ -130,10 +130,11 @@ nfs41_callback_svc(void *vrqstp) > error); > } else { > spin_unlock_bh(&serv->sv_cb_lock); > - if (!kthread_should_stop()) > + if (!svc_should_stop(rqstp)) > schedule(); > finish_wait(&serv->sv_cb_waitq, &wq); > } > + try_to_freeze(); > } > > svc_exit_thread(rqstp); > diff --git a/fs/nfsd/nfssvc.c b/fs/nfsd/nfssvc.c > index 9c7b1ef5be40..7cfa7f2e9bf7 100644 > --- a/fs/nfsd/nfssvc.c > +++ b/fs/nfsd/nfssvc.c > @@ -62,8 +62,7 @@ static __be32 nfsd_init_request(struct svc_rqst *, > * If (out side the lock) nn->nfsd_serv is non-NULL, then it must point to a > * properly initialised 'struct svc_serv' with ->sv_nrthreads > 0 (unless > * nn->keep_active is set). That number of nfsd threads must > - * exist and each must be listed in ->sp_all_threads in some entry of > - * ->sv_pools[]. > + * exist. > * > * Each active thread holds a counted reference on nn->nfsd_serv, as does > * the nn->keep_active flag and various transient calls to svc_get(). 
> diff --git a/include/linux/llist.h b/include/linux/llist.h > index 85bda2d02d65..5a22499844c8 100644 > --- a/include/linux/llist.h > +++ b/include/linux/llist.h > @@ -248,6 +248,10 @@ static inline struct llist_node *__llist_del_all(struct llist_head *head) > } > > extern struct llist_node *llist_del_first(struct llist_head *head); > +extern struct llist_node *llist_del_first_this(struct llist_head *head, > + struct llist_node *this); > +extern struct llist_node *llist_del_all_this(struct llist_head *head, > + struct llist_node *this); > > struct llist_node *llist_reverse_order(struct llist_node *head); > > diff --git a/include/linux/lockd/lockd.h b/include/linux/lockd/lockd.h > index f42594a9efe0..c48020e7ee08 100644 > --- a/include/linux/lockd/lockd.h > +++ b/include/linux/lockd/lockd.h > @@ -280,7 +280,7 @@ __be32 nlmsvc_testlock(struct svc_rqst *, struct nlm_file *, > struct nlm_host *, struct nlm_lock *, > struct nlm_lock *, struct nlm_cookie *); > __be32 nlmsvc_cancel_blocked(struct net *net, struct nlm_file *, struct nlm_lock *); > -unsigned long nlmsvc_retry_blocked(void); > +unsigned long nlmsvc_retry_blocked(struct svc_rqst *rqstp); > void nlmsvc_traverse_blocks(struct nlm_host *, struct nlm_file *, > nlm_host_match_fn_t match); > void nlmsvc_grant_reply(struct nlm_cookie *, __be32); > diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h > index 366f2b6b689c..cb2497b977c1 100644 > --- a/include/linux/sunrpc/svc.h > +++ b/include/linux/sunrpc/svc.h > @@ -31,11 +31,11 @@ > * node traffic on multi-node NUMA NFS servers. > */ > struct svc_pool { > - unsigned int sp_id; /* pool id; also node id on NUMA */ > - spinlock_t sp_lock; /* protects all fields */ > + unsigned int sp_id; /* pool id; also node id on NUMA */ > + spinlock_t sp_lock; /* protects sp_sockets and sp_nrthreads */ > struct list_head sp_sockets; /* pending sockets */ > unsigned int sp_nrthreads; /* # of threads in pool */ > - struct list_head sp_all_threads; /* all server threads */ > + struct llist_head sp_idle_threads;/* idle server threads */ > > /* statistics on pool operation */ > struct percpu_counter sp_sockets_queued; > @@ -43,12 +43,17 @@ struct svc_pool { > struct percpu_counter sp_threads_timedout; > struct percpu_counter sp_threads_starved; > > -#define SP_TASK_PENDING (0) /* still work to do even if no > - * xprt is queued. */ > -#define SP_CONGESTED (1) > unsigned long sp_flags; > } ____cacheline_aligned_in_smp; > > +enum svc_sp_flags { Let's make this an anonymous enum. Ditto below. > + SP_TASK_PENDING, /* still work to do even if no xprt is queued */ > + SP_CONGESTED, > + SP_NEED_VICTIM, /* One thread needs to agree to exit */ > + SP_VICTIM_REMAINS, /* One thread needs to actually exit */ > + SP_CLEANUP, /* A thread has set RQ_CLEAN_ME */ > +}; > + Converting the bit flags to an enum seems like an unrelated clean-up. It isn't necessary in order to implement the new scheduler. Let's extract this into a separate patch that can be applied first. Also, I'm not clear on the justification for this clean-up. That should be explained in the patch description of the split-out clean-up patch(es). > /* > * RPC service. > * > @@ -195,7 +200,7 @@ extern u32 svc_max_payload(const struct svc_rqst *rqstp); > * processed.
> */ > struct svc_rqst { > - struct list_head rq_all; /* all threads list */ > + struct llist_node rq_idle; /* On pool's idle list */ > struct rcu_head rq_rcu_head; /* for RCU deferred kfree */ > struct svc_xprt * rq_xprt; /* transport ptr */ > > @@ -233,16 +238,6 @@ struct svc_rqst { > u32 rq_proc; /* procedure number */ > u32 rq_prot; /* IP protocol */ > int rq_cachetype; /* catering to nfsd */ > -#define RQ_SECURE (0) /* secure port */ > -#define RQ_LOCAL (1) /* local request */ > -#define RQ_USEDEFERRAL (2) /* use deferral */ > -#define RQ_DROPME (3) /* drop current reply */ > -#define RQ_SPLICE_OK (4) /* turned off in gss privacy > - * to prevent encrypting page > - * cache pages */ > -#define RQ_VICTIM (5) /* about to be shut down */ > -#define RQ_BUSY (6) /* request is busy */ > -#define RQ_DATA (7) /* request has data */ > unsigned long rq_flags; /* flags field */ > ktime_t rq_qtime; /* enqueue time */ > > @@ -274,6 +269,20 @@ struct svc_rqst { > void ** rq_lease_breaker; /* The v4 client breaking a lease */ > }; > > +enum svc_rq_flags { > + RQ_SECURE, /* secure port */ > + RQ_LOCAL, /* local request */ > + RQ_USEDEFERRAL, /* use deferral */ > + RQ_DROPME, /* drop current reply */ > + RQ_SPLICE_OK, /* turned off in gss privacy > + * to prevent encrypting page > + * cache pages */ > + RQ_VICTIM, /* agreed to shut down */ > + RQ_DATA, /* request has data */ > + RQ_CLEAN_ME, /* Thread needs to exit but > + * is on the idle list */ > +}; > + Likewise here. And let's keep the flag clean-ups in separate patches. > #define SVC_NET(rqst) (rqst->rq_xprt ? rqst->rq_xprt->xpt_net : rqst->rq_bc_net) > > /* > @@ -309,6 +318,15 @@ static inline struct sockaddr *svc_daddr(const struct svc_rqst *rqst) > return (struct sockaddr *) &rqst->rq_daddr; > } > This needs a kdoc comment and a more conventional name. How about svc_thread_should_stop() ? Actually it seems like a better abstraction all around if the upper layers don't have to care that they are running in a kthread -- so maybe replacing kthread_should_stop() is a good clean-up to apply in advance. 
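To be concrete, something like this is what I have in mind (only a
sketch: the name is from my suggestion, the body is lifted from the
hunk quoted below, and the kdoc wording is a strawman):

	/**
	 * svc_thread_should_stop - check whether this thread should stop
	 * @rqstp: the thread that might need to stop
	 *
	 * To stop an svc thread, the pool is marked SP_NEED_VICTIM and
	 * SP_VICTIM_REMAINS. The first thread that observes
	 * SP_NEED_VICTIM clears it and becomes the victim; the victim
	 * acknowledges by clearing SP_VICTIM_REMAINS as it exits.
	 *
	 * Return: %true if the thread should exit.
	 */
	static inline bool svc_thread_should_stop(struct svc_rqst *rqstp)
	{
		if (test_and_clear_bit(SP_NEED_VICTIM,
				       &rqstp->rq_pool->sp_flags))
			set_bit(RQ_VICTIM, &rqstp->rq_flags);
		return test_bit(RQ_VICTIM, &rqstp->rq_flags);
	}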
> +static inline bool svc_should_stop(struct svc_rqst *rqstp) > +{ > + if (test_and_clear_bit(SP_NEED_VICTIM, &rqstp->rq_pool->sp_flags)) { > + set_bit(RQ_VICTIM, &rqstp->rq_flags); > + return true; > + } > + return test_bit(RQ_VICTIM, &rqstp->rq_flags); > +} > + > struct svc_deferred_req { > u32 prot; /* protocol (UDP or TCP) */ > struct svc_xprt *xprt; > @@ -416,6 +434,7 @@ bool svc_rqst_replace_page(struct svc_rqst *rqstp, > void svc_rqst_release_pages(struct svc_rqst *rqstp); > void svc_rqst_free(struct svc_rqst *); > void svc_exit_thread(struct svc_rqst *); > +bool svc_dequeue_rqst(struct svc_rqst *rqstp); > struct svc_serv * svc_create_pooled(struct svc_program *, unsigned int, > int (*threadfn)(void *data)); > int svc_set_num_threads(struct svc_serv *, struct svc_pool *, int); > @@ -428,7 +447,7 @@ int svc_register(const struct svc_serv *, struct net *, const int, > > void svc_wake_up(struct svc_serv *); > void svc_reserve(struct svc_rqst *rqstp, int space); > -struct svc_rqst *svc_pool_wake_idle_thread(struct svc_serv *serv, > +bool svc_pool_wake_idle_thread(struct svc_serv *serv, > struct svc_pool *pool); > struct svc_pool *svc_pool_for_cpu(struct svc_serv *serv); > char * svc_print_addr(struct svc_rqst *, char *, size_t); > diff --git a/include/trace/events/sunrpc.h b/include/trace/events/sunrpc.h > index f6fd48961074..f63289d1491d 100644 > --- a/include/trace/events/sunrpc.h > +++ b/include/trace/events/sunrpc.h > @@ -1601,7 +1601,7 @@ DEFINE_SVCXDRBUF_EVENT(sendto); > svc_rqst_flag(DROPME) \ > svc_rqst_flag(SPLICE_OK) \ > svc_rqst_flag(VICTIM) \ > - svc_rqst_flag(BUSY) \ > + svc_rqst_flag(CLEAN_ME) \ > svc_rqst_flag_end(DATA) > > #undef svc_rqst_flag > @@ -1965,7 +1965,10 @@ TRACE_EVENT(svc_xprt_enqueue, > #define show_svc_pool_flags(x) \ > __print_flags(x, "|", \ > { BIT(SP_TASK_PENDING), "TASK_PENDING" }, \ > - { BIT(SP_CONGESTED), "CONGESTED" }) > + { BIT(SP_CONGESTED), "CONGESTED" }, \ > + { BIT(SP_NEED_VICTIM), "NEED_VICTIM" }, \ > + { BIT(SP_VICTIM_REMAINS), "VICTIM_REMAINS" }, \ > + { BIT(SP_CLEANUP), "CLEANUP" }) > > DECLARE_EVENT_CLASS(svc_pool_scheduler_class, > TP_PROTO( > diff --git a/lib/llist.c b/lib/llist.c > index 6e668fa5a2c6..660be07795ac 100644 > --- a/lib/llist.c > +++ b/lib/llist.c > @@ -65,6 +65,57 @@ struct llist_node *llist_del_first(struct llist_head *head) > } > EXPORT_SYMBOL_GPL(llist_del_first); > > +/** > + * llist_del_first_this - delete given entry of lock-less list if it is first > + * @head: the head for your lock-less list > + * @this: a list entry. > + * > + * If head of the list is given entry, delete and return it, else > + * return %NULL. > + * > + * Providing the caller has exclusive access to @this, multiple callers can > + * safely call this concurrently with multiple llist_add() callers. > + */ > +struct llist_node *llist_del_first_this(struct llist_head *head, > + struct llist_node *this) > +{ > + struct llist_node *entry, *next; > + > + entry = smp_load_acquire(&head->first); > + do { > + if (entry != this) > + return NULL; > + next = READ_ONCE(entry->next); > + } while (!try_cmpxchg(&head->first, &entry, next)); > + > + return entry; > +} > +EXPORT_SYMBOL_GPL(llist_del_first_this); > + > +/** > + * llist_del_all_this - delete all entries from lock-less list if first is the given element > + * @head: the head of lock-less list to delete all entries > + * @this: the expected first element. > + * > + * If the first element of the list is @this, delete all elements and > + * return them, else return %NULL. 
Providing the caller has exclusive access > + * to @this, multiple concurrent callers can call this or llist_del_first_this() > + * simultaneously with multiple callers of llist_add(). > + */ > +struct llist_node *llist_del_all_this(struct llist_head *head, > + struct llist_node *this) > +{ > + struct llist_node *entry; > + > + entry = smp_load_acquire(&head->first); > + do { > + if (entry != this) > + return NULL; > + } while (!try_cmpxchg(&head->first, &entry, NULL)); > + > + return entry; > +} > + I was going to say that we should copy the maintainer of lib/llist.c on this patch set, but I'm a little surprised to see no maintainer listed for it. I'm not sure how to get proper review for the new API and mechanism. Sidebar: Are there any self-tests or kunit tests for llist? > /** > * llist_reverse_order - reverse order of a llist chain > * @head: first item of the list to be reversed > diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c > index ffb7200e8257..55339cbbbc6e 100644 > --- a/net/sunrpc/svc.c > +++ b/net/sunrpc/svc.c > @@ -507,7 +507,7 @@ __svc_create(struct svc_program *prog, unsigned int bufsize, int npools, > > pool->sp_id = i; > INIT_LIST_HEAD(&pool->sp_sockets); > - INIT_LIST_HEAD(&pool->sp_all_threads); > + init_llist_head(&pool->sp_idle_threads); > spin_lock_init(&pool->sp_lock); > > percpu_counter_init(&pool->sp_sockets_queued, 0, GFP_KERNEL); > @@ -652,9 +652,9 @@ svc_rqst_alloc(struct svc_serv *serv, struct svc_pool *pool, int node) > > pagevec_init(&rqstp->rq_pvec); > > - __set_bit(RQ_BUSY, &rqstp->rq_flags); > rqstp->rq_server = serv; > rqstp->rq_pool = pool; > + rqstp->rq_idle.next = &rqstp->rq_idle; Is there really no initializer helper for this? (See the sketch at the end of this reply.) > rqstp->rq_scratch_page = alloc_pages_node(node, GFP_KERNEL, 0); > if (!rqstp->rq_scratch_page) > @@ -694,7 +694,6 @@ svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node) > > spin_lock_bh(&pool->sp_lock); > pool->sp_nrthreads++; > - list_add_rcu(&rqstp->rq_all, &pool->sp_all_threads); > spin_unlock_bh(&pool->sp_lock); > return rqstp; > } > @@ -704,32 +703,34 @@ svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node) > * @serv: RPC service > * @pool: service thread pool > * > - * Returns an idle service thread (now marked BUSY), or NULL > - * if no service threads are available. Finding an idle service > - * thread and marking it BUSY is atomic with respect to other > - * calls to svc_pool_wake_idle_thread(). > + * If there are any idle threads in the pool, wake one up and return > + * %true, else return %false. The thread will become non-idle once > + * the scheduler schedules it, at which point it might wake another > + * thread if there seems to be enough work to justify that. So I'm wondering how another call to svc_pool_wake_idle_thread() that happens concurrently will not find and wake the same thread?
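Separately, coming back to my kunit sidebar above: if this API moves
forward, even a trivial suite would help reviewers. A sketch of what
that could look like (hypothetical, since lib/ has no llist test
module today as far as I can tell):

	#include <kunit/test.h>
	#include <linux/llist.h>
	#include <linux/module.h>

	static void llist_del_first_this_test(struct kunit *test)
	{
		struct llist_head head;
		struct llist_node a, b;

		init_llist_head(&head);
		llist_add(&a, &head);
		llist_add(&b, &head);	/* list is now b -> a */

		/* @a is not at the head: must fail, list unchanged */
		KUNIT_EXPECT_NULL(test, llist_del_first_this(&head, &a));

		/* @b is at the head: must succeed */
		KUNIT_EXPECT_PTR_EQ(test,
				    llist_del_first_this(&head, &b), &b);
		KUNIT_EXPECT_PTR_EQ(test, llist_del_first(&head), &a);
	}

	static struct kunit_case llist_test_cases[] = {
		KUNIT_CASE(llist_del_first_this_test),
		{}
	};

	static struct kunit_suite llist_test_suite = {
		.name = "llist",
		.test_cases = llist_test_cases,
	};
	kunit_test_suite(llist_test_suite);

	MODULE_LICENSE("GPL");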
> */ > -struct svc_rqst *svc_pool_wake_idle_thread(struct svc_serv *serv, > - struct svc_pool *pool) > +bool svc_pool_wake_idle_thread(struct svc_serv *serv, > + struct svc_pool *pool) > { > struct svc_rqst *rqstp; > + struct llist_node *ln; > > rcu_read_lock(); > - list_for_each_entry_rcu(rqstp, &pool->sp_all_threads, rq_all) { > - if (test_and_set_bit(RQ_BUSY, &rqstp->rq_flags)) > - continue; > - > - rcu_read_unlock(); > + ln = READ_ONCE(pool->sp_idle_threads.first); > + if (ln) { > + rqstp = llist_entry(ln, struct svc_rqst, rq_idle); > WRITE_ONCE(rqstp->rq_qtime, ktime_get()); > - wake_up_process(rqstp->rq_task); > - percpu_counter_inc(&pool->sp_threads_woken); > - return rqstp; > + if (!task_is_running(rqstp->rq_task)) { > + wake_up_process(rqstp->rq_task); > + percpu_counter_inc(&pool->sp_threads_woken); > + } > + rcu_read_unlock(); > + return true; > } > rcu_read_unlock(); > > trace_svc_pool_starved(serv, pool); > percpu_counter_inc(&pool->sp_threads_starved); > - return NULL; > + return false; > } > > static struct svc_pool * > @@ -738,19 +739,22 @@ svc_pool_next(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state) > return pool ? pool : &serv->sv_pools[(*state)++ % serv->sv_nrpools]; > } > > -static struct task_struct * > +static struct svc_pool * > svc_pool_victim(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state) > { > - unsigned int i; > - struct task_struct *task = NULL; > > if (pool != NULL) { > spin_lock_bh(&pool->sp_lock); > + if (pool->sp_nrthreads > 0) > + goto found_pool; > + spin_unlock_bh(&pool->sp_lock); > + return NULL; > } else { > + unsigned int i; > for (i = 0; i < serv->sv_nrpools; i++) { > pool = &serv->sv_pools[--(*state) % serv->sv_nrpools]; > spin_lock_bh(&pool->sp_lock); > - if (!list_empty(&pool->sp_all_threads)) > + if (pool->sp_nrthreads > 0) > goto found_pool; > spin_unlock_bh(&pool->sp_lock); > } > @@ -758,16 +762,10 @@ svc_pool_victim(struct svc_serv *serv, struct svc_pool *pool, unsigned int *stat > } > > found_pool: > - if (!list_empty(&pool->sp_all_threads)) { > - struct svc_rqst *rqstp; > - > - rqstp = list_entry(pool->sp_all_threads.next, struct svc_rqst, rq_all); > - set_bit(RQ_VICTIM, &rqstp->rq_flags); > - list_del_rcu(&rqstp->rq_all); > - task = rqstp->rq_task; > - } > + set_bit(SP_VICTIM_REMAINS, &pool->sp_flags); > + set_bit(SP_NEED_VICTIM, &pool->sp_flags); > spin_unlock_bh(&pool->sp_lock); > - return task; > + return pool; > } > > static int > @@ -808,18 +806,16 @@ svc_start_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs) > static int > svc_stop_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs) > { > - struct svc_rqst *rqstp; > - struct task_struct *task; > unsigned int state = serv->sv_nrthreads-1; > + struct svc_pool *vpool; > > do { > - task = svc_pool_victim(serv, pool, &state); > - if (task == NULL) > + vpool = svc_pool_victim(serv, pool, &state); > + if (vpool == NULL) > break; > - rqstp = kthread_data(task); > - /* Did we lose a race to svo_function threadfn? */ > - if (kthread_stop(task) == -EINTR) > - svc_exit_thread(rqstp); > + svc_pool_wake_idle_thread(serv, vpool); > + wait_on_bit(&vpool->sp_flags, SP_VICTIM_REMAINS, > + TASK_UNINTERRUPTIBLE); > nrservs++; > } while (nrservs < 0); > return 0; > @@ -931,16 +927,75 @@ svc_rqst_free(struct svc_rqst *rqstp) > } > EXPORT_SYMBOL_GPL(svc_rqst_free); > Can you add a kdoc comment for svc_dequeue_rqst()? There is a lot of complexity here that I'm not grokking on first read. 
Either it needs more comments or simplification (or both). It's not winning me over, I have to say ;-) > +bool svc_dequeue_rqst(struct svc_rqst *rqstp) > +{ > + struct svc_pool *pool = rqstp->rq_pool; > + struct llist_node *le, *last; > + > +retry: > + if (pool->sp_idle_threads.first != &rqstp->rq_idle) Would be better if there was a helper for this test (see the sketch at the end of this reply). > + /* Not at head of queue, so cannot wake up */ > + return false; > + if (!test_and_clear_bit(SP_CLEANUP, &pool->sp_flags)) { > + le = llist_del_first_this(&pool->sp_idle_threads, > + &rqstp->rq_idle); > + if (le) > + le->next = le; > + return !!le; > + } > + /* Need to deal with RQ_CLEAN_ME thread */ > + le = llist_del_all_this(&pool->sp_idle_threads, > + &rqstp->rq_idle); > + if (!le) { > + /* lost a race, someone else needs to clean up */ > + set_bit(SP_CLEANUP, &pool->sp_flags); > + svc_pool_wake_idle_thread(rqstp->rq_server, > + pool); > + goto retry; > + } > + if (!le->next) > + return true; > + last = le; > + while (last->next) { > + rqstp = list_entry(last->next, struct svc_rqst, rq_idle); > + if (!test_bit(RQ_CLEAN_ME, &rqstp->rq_flags)) { > + last = last->next; > + continue; > + } > + last->next = last->next->next; > + rqstp->rq_idle.next = &rqstp->rq_idle; > + wake_up_process(rqstp->rq_task); > + } > + if (last != le) > + llist_add_batch(le->next, last, &pool->sp_idle_threads); > + le->next = le; > + return true; > +} > + > void > svc_exit_thread(struct svc_rqst *rqstp) > { > struct svc_serv *serv = rqstp->rq_server; > struct svc_pool *pool = rqstp->rq_pool; > > + while (rqstp->rq_idle.next != &rqstp->rq_idle) { Helper, maybe? > + /* Still on the idle list. */ > + if (llist_del_first_this(&pool->sp_idle_threads, > + &rqstp->rq_idle)) { > + /* Safely removed */ > + rqstp->rq_idle.next = &rqstp->rq_idle; > + } else { > + set_current_state(TASK_UNINTERRUPTIBLE); > + set_bit(RQ_CLEAN_ME, &rqstp->rq_flags); > + set_bit(SP_CLEANUP, &pool->sp_flags); > + svc_pool_wake_idle_thread(serv, pool); > + if (!svc_dequeue_rqst(rqstp)) > + schedule(); > + __set_current_state(TASK_RUNNING); > + } > + } > spin_lock_bh(&pool->sp_lock); > pool->sp_nrthreads--; > - if (!test_and_set_bit(RQ_VICTIM, &rqstp->rq_flags)) > - list_del_rcu(&rqstp->rq_all); > spin_unlock_bh(&pool->sp_lock); > > spin_lock_bh(&serv->sv_lock); > @@ -948,6 +1003,8 @@ svc_exit_thread(struct svc_rqst *rqstp) > spin_unlock_bh(&serv->sv_lock); > svc_sock_update_bufs(serv); > > + if (test_bit(RQ_VICTIM, &rqstp->rq_flags)) > + clear_and_wake_up_bit(SP_VICTIM_REMAINS, &pool->sp_flags); > svc_rqst_free(rqstp); > > svc_put(serv); > diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c > index c4521bce1f27..d51587cd8d99 100644 > --- a/net/sunrpc/svc_xprt.c > +++ b/net/sunrpc/svc_xprt.c > @@ -446,7 +446,6 @@ static bool svc_xprt_ready(struct svc_xprt *xprt) > */ > void svc_xprt_enqueue(struct svc_xprt *xprt) > { > - struct svc_rqst *rqstp; > struct svc_pool *pool; > > if (!svc_xprt_ready(xprt)) > @@ -467,20 +466,19 @@ void svc_xprt_enqueue(struct svc_xprt *xprt) > list_add_tail(&xprt->xpt_ready, &pool->sp_sockets); > spin_unlock_bh(&pool->sp_lock); > > - rqstp = svc_pool_wake_idle_thread(xprt->xpt_server, pool); > - if (!rqstp) { > + if (!svc_pool_wake_idle_thread(xprt->xpt_server, pool)) { > set_bit(SP_CONGESTED, &pool->sp_flags); > return; > } > > - trace_svc_xprt_enqueue(xprt, rqstp); > + // trace_svc_xprt_enqueue(xprt, rqstp); > } > EXPORT_SYMBOL_GPL(svc_xprt_enqueue); > > /* > * Dequeue the first transport, if there is one.
> */ > -static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool) > +static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool, bool *more) > { > struct svc_xprt *xprt = NULL; > > @@ -493,6 +491,7 @@ static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool) > struct svc_xprt, xpt_ready); > list_del_init(&xprt->xpt_ready); > svc_xprt_get(xprt); > + *more = !list_empty(&pool->sp_sockets); > } > spin_unlock_bh(&pool->sp_lock); > out: > @@ -577,15 +576,13 @@ static void svc_xprt_release(struct svc_rqst *rqstp) > void svc_wake_up(struct svc_serv *serv) > { > struct svc_pool *pool = &serv->sv_pools[0]; > - struct svc_rqst *rqstp; > > - rqstp = svc_pool_wake_idle_thread(serv, pool); > - if (!rqstp) { > + if (!svc_pool_wake_idle_thread(serv, pool)) { > set_bit(SP_TASK_PENDING, &pool->sp_flags); > return; > } > > - trace_svc_wake_up(rqstp); > + // trace_svc_wake_up(rqstp); > } > EXPORT_SYMBOL_GPL(svc_wake_up); > > @@ -676,7 +673,7 @@ static int svc_alloc_arg(struct svc_rqst *rqstp) > continue; > > set_current_state(TASK_INTERRUPTIBLE); > - if (signalled() || kthread_should_stop()) { > + if (signalled() || svc_should_stop(rqstp)) { > set_current_state(TASK_RUNNING); > return -EINTR; > } > @@ -706,7 +703,10 @@ rqst_should_sleep(struct svc_rqst *rqstp) > struct svc_pool *pool = rqstp->rq_pool; > > /* did someone call svc_wake_up? */ > - if (test_and_clear_bit(SP_TASK_PENDING, &pool->sp_flags)) > + if (test_bit(SP_TASK_PENDING, &pool->sp_flags)) > + return false; > + if (test_bit(SP_CLEANUP, &pool->sp_flags)) > + /* a signalled thread needs to be released */ > return false; > > /* was a socket queued? */ > @@ -714,7 +714,7 @@ rqst_should_sleep(struct svc_rqst *rqstp) > return false; > > /* are we shutting down? */ > - if (signalled() || kthread_should_stop()) > + if (signalled() || svc_should_stop(rqstp)) > return false; > > /* are we freezing? */ > @@ -728,11 +728,9 @@ static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout) > { > struct svc_pool *pool = rqstp->rq_pool; > long time_left = 0; > + bool more = false; > > - /* rq_xprt should be clear on entry */ > - WARN_ON_ONCE(rqstp->rq_xprt); > - > - rqstp->rq_xprt = svc_xprt_dequeue(pool); > + rqstp->rq_xprt = svc_xprt_dequeue(pool, &more); > if (rqstp->rq_xprt) { > trace_svc_pool_polled(pool, rqstp); > goto out_found; > @@ -743,11 +741,10 @@ static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout) > * to bring down the daemons ... 
> */ > set_current_state(TASK_INTERRUPTIBLE); > - smp_mb__before_atomic(); > - clear_bit(SP_CONGESTED, &pool->sp_flags); > - clear_bit(RQ_BUSY, &rqstp->rq_flags); > - smp_mb__after_atomic(); > + clear_bit_unlock(SP_CONGESTED, &pool->sp_flags); > > + llist_add(&rqstp->rq_idle, &pool->sp_idle_threads); > +again: > if (likely(rqst_should_sleep(rqstp))) > time_left = schedule_timeout(timeout); > else > @@ -755,9 +752,20 @@ static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout) > > try_to_freeze(); > > - set_bit(RQ_BUSY, &rqstp->rq_flags); > - smp_mb__after_atomic(); > - rqstp->rq_xprt = svc_xprt_dequeue(pool); > + if (!svc_dequeue_rqst(rqstp)) { > + if (signalled()) > + /* Can only return while on idle list if signalled */ > + return ERR_PTR(-EINTR); > + /* Still on the idle list */ > + goto again; > + } > + > + clear_bit(SP_TASK_PENDING, &pool->sp_flags); > + > + if (svc_should_stop(rqstp)) > + return ERR_PTR(-EINTR); > + > + rqstp->rq_xprt = svc_xprt_dequeue(pool, &more); > if (rqstp->rq_xprt) { > trace_svc_pool_awoken(pool, rqstp); > goto out_found; > @@ -766,10 +774,11 @@ static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout) > if (!time_left) > percpu_counter_inc(&pool->sp_threads_timedout); > > - if (signalled() || kthread_should_stop()) > - return ERR_PTR(-EINTR); > return ERR_PTR(-EAGAIN); > out_found: > + if (more) > + svc_pool_wake_idle_thread(rqstp->rq_server, pool); > + I'm thinking that dealing with more work should be implemented as a separate optimization (ie, a subsequent patch). > /* Normally we will wait up to 5 seconds for any required > * cache information to be provided. > */ > @@ -866,7 +875,7 @@ int svc_recv(struct svc_rqst *rqstp, long timeout) > try_to_freeze(); > cond_resched(); > err = -EINTR; > - if (signalled() || kthread_should_stop()) > + if (signalled() || svc_should_stop(rqstp)) > goto out; > > xprt = svc_get_next_xprt(rqstp, timeout); > -- > 2.40.1 >
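Lastly, circling back to the "initializer helper" question and the
two "helper, maybe?" remarks above: something along these lines is
what I was picturing (names are invented; just a sketch of the
convention that a node pointing to itself means "not on any list"):

	static inline void svc_thread_init_idle(struct svc_rqst *rqstp)
	{
		/* a node that points to itself is on no list */
		rqstp->rq_idle.next = &rqstp->rq_idle;
	}

	static inline bool svc_thread_is_queued(struct svc_rqst *rqstp)
	{
		return rqstp->rq_idle.next != &rqstp->rq_idle;
	}

	static inline bool svc_thread_is_idle_head(struct svc_pool *pool,
						   struct svc_rqst *rqstp)
	{
		return READ_ONCE(pool->sp_idle_threads.first) ==
			&rqstp->rq_idle;
	}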