On Tue, 11 Jul 2023, Chuck Lever wrote: > On Mon, Jul 10, 2023 at 10:41:38AM +1000, NeilBrown wrote: > > > > The patch shows an alternate approach to the recent patches which improve > > the latency for waking an idle thread when work is ready. > > > > The current approach involves searching a linked list for an idle thread > > to wake. The recent patches instead use a bitmap search to find the > > idle thread. With this patch no search is needed - an idle thread is > > directly available without searching. > > > > The idle threads are kept in an "llist" - there is no longer a list of > > all threads. > > > > The llist ADT does not allow concurrent delete_first operations, so to > > wake an idle thread we simply wake it and do not remove it from the > > list. > > When the thread is scheduled it will remove itself - which is safe - and > > will wake the next thread if there is more work to do (and if there is > > another thread). > > > > The "remove itself" requires an addition to the llist api. > > "llist_del_first_this()" removes a given item if it is the first. > > Multiple callers can call this concurrently as long as they each give a > > different "this", so each thread can safely try to remove itself. It > > must be prepared for failure. > > > > Reducing the thread count currently requires finding any thread, idle or > > not, and calling kthread_stop(). This is no longer possible as we don't > > have a list of all threads (though I guess we could keep the list if we > > wanted to...). Instead the pool is marked NEED_VICTIM and the next > > thread to go idle will become the VICTIM and duly exit - signalling > > this by clearing VICTIM_REMAINS. We replace the kthread_should_stop() call > > with a new svc_should_stop() which checks and sets victim flags. > > > > nfsd threads can currently be told to exit with a signal. It might be > > time to deprecate/remove this feature. However this patch does support > > it. > > > > If the signalled thread is not at the head of the idle list it cannot > > remove itself. In this case it sets RQ_CLEAN_ME and SP_CLEANUP and the > > next thread to wake up will use llist_del_all_this() to remove all > > threads from the idle list. It then finds and removes any RQ_CLEAN_ME > > threads and puts the rest back on the list. > > > > There is quite a bit of churn here so it will need careful testing. > > In fact - it doesn't handle nfsv4 callback handling threads properly as > > they don't wait the same way that other threads wait... I'll need to > > think about that but I don't have time just now. > > > > For now it is primarily an RFC. I haven't given a lot of thought to > > trace points. > > > > To apply it you will need > > > > SUNRPC: Deduplicate thread wake-up code > > SUNRPC: Report when no service thread is available. > > SUNRPC: Split the svc_xprt_dequeue tracepoint > > SUNRPC: Clean up svc_set_num_threads > > SUNRPC: Replace dprintk() call site in __svc_create() > > > > from a recent post by Chuck. > > Hi, thanks for letting us see your pencil sketches. :-) > > Later today, I'll push a topic branch to my kernel.org repo that we > can use as a base for continuing this work. > > Some initial remarks below, recognizing that this patch is still incomplete.
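Before the inline comments, it might help to make the life-cycle described in the cover letter a little more concrete. The following is only a sketch of one park/unpark cycle - it is not the patch code, it borrows the names introduced below, and it ignores signals, freezing, SP_CLEANUP handling and the memory-ordering details:

	/* Illustrative sketch only: one park/unpark cycle for a thread. */
	static void svc_idle_cycle_sketch(struct svc_rqst *rqstp, long timeout)
	{
		struct svc_pool *pool = rqstp->rq_pool;

		/* Park: publish ourselves at the head of the pool's idle
		 * llist.  A waker only calls wake_up_process() on the head
		 * entry; it never dequeues it.
		 */
		set_current_state(TASK_INTERRUPTIBLE);
		llist_add(&rqstp->rq_idle, &pool->sp_idle_threads);

		for (;;) {
			schedule_timeout(timeout);
			/* Only the head of the list may dequeue itself.  If
			 * other threads went idle after us we are no longer
			 * the head, so sleep again until they have left.
			 */
			if (llist_del_first_this(&pool->sp_idle_threads,
						 &rqstp->rq_idle))
				break;
			set_current_state(TASK_INTERRUPTIBLE);
		}

		/* Dequeue and process work here.  If more work remains,
		 * wake whichever thread is now the head of sp_idle_threads.
		 */
		if (!list_empty(&pool->sp_sockets))
			svc_pool_wake_idle_thread(rqstp->rq_server, pool);
	}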
> > > > Signed-off-by: NeilBrown <neilb@xxxxxxx> > > --- > > fs/lockd/svc.c | 4 +- > > fs/lockd/svclock.c | 4 +- > > fs/nfs/callback.c | 5 +- > > fs/nfsd/nfssvc.c | 3 +- > > include/linux/llist.h | 4 + > > include/linux/lockd/lockd.h | 2 +- > > include/linux/sunrpc/svc.h | 55 +++++++++----- > > include/trace/events/sunrpc.h | 7 +- > > lib/llist.c | 51 +++++++++++++ > > net/sunrpc/svc.c | 139 ++++++++++++++++++++++++---------- > > net/sunrpc/svc_xprt.c | 61 ++++++++------- > > 11 files changed, 239 insertions(+), 96 deletions(-) > > > > diff --git a/fs/lockd/svc.c b/fs/lockd/svc.c > > index 22d3ff3818f5..df295771bd40 100644 > > --- a/fs/lockd/svc.c > > +++ b/fs/lockd/svc.c > > @@ -147,7 +147,7 @@ lockd(void *vrqstp) > > * The main request loop. We don't terminate until the last > > * NFS mount or NFS daemon has gone away. > > */ > > - while (!kthread_should_stop()) { > > + while (!svc_should_stop(rqstp)) { > > long timeout = MAX_SCHEDULE_TIMEOUT; > > RPC_IFDEBUG(char buf[RPC_MAX_ADDRBUFLEN]); > > > > @@ -160,7 +160,7 @@ lockd(void *vrqstp) > > continue; > > } > > > > - timeout = nlmsvc_retry_blocked(); > > + timeout = nlmsvc_retry_blocked(rqstp); > > > > /* > > * Find a socket with data available and call its > > diff --git a/fs/lockd/svclock.c b/fs/lockd/svclock.c > > index c43ccdf28ed9..54b679fcbcab 100644 > > --- a/fs/lockd/svclock.c > > +++ b/fs/lockd/svclock.c > > @@ -1009,13 +1009,13 @@ retry_deferred_block(struct nlm_block *block) > > * be retransmitted. > > */ > > unsigned long > > -nlmsvc_retry_blocked(void) > > +nlmsvc_retry_blocked(struct svc_rqst *rqstp) > > { > > unsigned long timeout = MAX_SCHEDULE_TIMEOUT; > > struct nlm_block *block; > > > > spin_lock(&nlm_blocked_lock); > > - while (!list_empty(&nlm_blocked) && !kthread_should_stop()) { > > + while (!list_empty(&nlm_blocked) && !svc_should_stop(rqstp)) { > > block = list_entry(nlm_blocked.next, struct nlm_block, b_list); > > > > if (block->b_when == NLM_NEVER) > > diff --git a/fs/nfs/callback.c b/fs/nfs/callback.c > > index 456af7d230cf..646425f1dc36 100644 > > --- a/fs/nfs/callback.c > > +++ b/fs/nfs/callback.c > > @@ -111,7 +111,7 @@ nfs41_callback_svc(void *vrqstp) > > > > set_freezable(); > > > > - while (!kthread_freezable_should_stop(NULL)) { > > + while (!svc_should_stop(rqstp)) { > > > > if (signal_pending(current)) > > flush_signals(current); > > @@ -130,10 +130,11 @@ nfs41_callback_svc(void *vrqstp) > > error); > > } else { > > spin_unlock_bh(&serv->sv_cb_lock); > > - if (!kthread_should_stop()) > > + if (!svc_should_stop(rqstp)) > > schedule(); > > finish_wait(&serv->sv_cb_waitq, &wq); > > } > > + try_to_freeze(); > > } > > > > svc_exit_thread(rqstp); > > diff --git a/fs/nfsd/nfssvc.c b/fs/nfsd/nfssvc.c > > index 9c7b1ef5be40..7cfa7f2e9bf7 100644 > > --- a/fs/nfsd/nfssvc.c > > +++ b/fs/nfsd/nfssvc.c > > @@ -62,8 +62,7 @@ static __be32 nfsd_init_request(struct svc_rqst *, > > * If (out side the lock) nn->nfsd_serv is non-NULL, then it must point to a > > * properly initialised 'struct svc_serv' with ->sv_nrthreads > 0 (unless > > * nn->keep_active is set). That number of nfsd threads must > > - * exist and each must be listed in ->sp_all_threads in some entry of > > - * ->sv_pools[]. > > + * exist. > > * > > * Each active thread holds a counted reference on nn->nfsd_serv, as does > > * the nn->keep_active flag and various transient calls to svc_get(). 
> > diff --git a/include/linux/llist.h b/include/linux/llist.h > > index 85bda2d02d65..5a22499844c8 100644 > > --- a/include/linux/llist.h > > +++ b/include/linux/llist.h > > @@ -248,6 +248,10 @@ static inline struct llist_node *__llist_del_all(struct llist_head *head) > > } > > > > extern struct llist_node *llist_del_first(struct llist_head *head); > > +extern struct llist_node *llist_del_first_this(struct llist_head *head, > > + struct llist_node *this); > > +extern struct llist_node *llist_del_all_this(struct llist_head *head, > > + struct llist_node *this); > > > > struct llist_node *llist_reverse_order(struct llist_node *head); > > > > diff --git a/include/linux/lockd/lockd.h b/include/linux/lockd/lockd.h > > index f42594a9efe0..c48020e7ee08 100644 > > --- a/include/linux/lockd/lockd.h > > +++ b/include/linux/lockd/lockd.h > > @@ -280,7 +280,7 @@ __be32 nlmsvc_testlock(struct svc_rqst *, struct nlm_file *, > > struct nlm_host *, struct nlm_lock *, > > struct nlm_lock *, struct nlm_cookie *); > > __be32 nlmsvc_cancel_blocked(struct net *net, struct nlm_file *, struct nlm_lock *); > > -unsigned long nlmsvc_retry_blocked(void); > > +unsigned long nlmsvc_retry_blocked(struct svc_rqst *rqstp); > > void nlmsvc_traverse_blocks(struct nlm_host *, struct nlm_file *, > > nlm_host_match_fn_t match); > > void nlmsvc_grant_reply(struct nlm_cookie *, __be32); > > diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h > > index 366f2b6b689c..cb2497b977c1 100644 > > --- a/include/linux/sunrpc/svc.h > > +++ b/include/linux/sunrpc/svc.h > > @@ -31,11 +31,11 @@ > > * node traffic on multi-node NUMA NFS servers. > > */ > > struct svc_pool { > > - unsigned int sp_id; /* pool id; also node id on NUMA */ > > - spinlock_t sp_lock; /* protects all fields */ > > + unsigned int sp_id; /* pool id; also node id on NUMA */ > > + spinlock_t sp_lock; /* protects all sp_socketsn sp_nrthreads*/ > > struct list_head sp_sockets; /* pending sockets */ > > unsigned int sp_nrthreads; /* # of threads in pool */ > > - struct list_head sp_all_threads; /* all server threads */ > > + struct llist_head sp_idle_threads;/* idle server threads */ > > > > /* statistics on pool operation */ > > struct percpu_counter sp_sockets_queued; > > @@ -43,12 +43,17 @@ struct svc_pool { > > struct percpu_counter sp_threads_timedout; > > struct percpu_counter sp_threads_starved; > > > > -#define SP_TASK_PENDING (0) /* still work to do even if no > > - * xprt is queued. */ > > -#define SP_CONGESTED (1) > > unsigned long sp_flags; > > } ____cacheline_aligned_in_smp; > > > > +enum svc_sp_flags { > > Let's make this an anonymous enum. Ditto below. The name was to compensate for moving the definitions away from the declaration for sp_flags. It means that if I search for "sp_flags" I'll easily find this list of bits. I guess a comment could achieve the same end, but I prefer code to be self-documenting where possible. > > > > + SP_TASK_PENDING, /* still work to do even if no xprt is queued */ > > + SP_CONGESTED, > > + SP_NEED_VICTIM, /* One thread needs to agree to exit */ > > + SP_VICTIM_REMAINS, /* One thread needs to actually exit */ > > + SP_CLEANUP, /* A thread has set RQ_CLEAN_ME */ > > +}; > > + > > Converting the bit flags to an enum seems like an unrelated clean- > up. It isn't necessary in order to implement the new scheduler. > Let's extract this into a separate patch that can be applied first. > > Also, I'm not clear on the justification for this clean up. 
That > should be explained in the patch description of the split-out > clean-up patch(es). > > > > /* > > * RPC service. > > * > > @@ -195,7 +200,7 @@ extern u32 svc_max_payload(const struct svc_rqst *rqstp); > > * processed. > > */ > > struct svc_rqst { > > - struct list_head rq_all; /* all threads list */ > > + struct llist_node rq_idle; /* On pool's idle list */ > > struct rcu_head rq_rcu_head; /* for RCU deferred kfree */ > > struct svc_xprt * rq_xprt; /* transport ptr */ > > > > @@ -233,16 +238,6 @@ struct svc_rqst { > > u32 rq_proc; /* procedure number */ > > u32 rq_prot; /* IP protocol */ > > int rq_cachetype; /* catering to nfsd */ > > -#define RQ_SECURE (0) /* secure port */ > > -#define RQ_LOCAL (1) /* local request */ > > -#define RQ_USEDEFERRAL (2) /* use deferral */ > > -#define RQ_DROPME (3) /* drop current reply */ > > -#define RQ_SPLICE_OK (4) /* turned off in gss privacy > > - * to prevent encrypting page > > - * cache pages */ > > -#define RQ_VICTIM (5) /* about to be shut down */ > > -#define RQ_BUSY (6) /* request is busy */ > > -#define RQ_DATA (7) /* request has data */ > > unsigned long rq_flags; /* flags field */ > > ktime_t rq_qtime; /* enqueue time */ > > > > @@ -274,6 +269,20 @@ struct svc_rqst { > > void ** rq_lease_breaker; /* The v4 client breaking a lease */ > > }; > > > > +enum svc_rq_flags { > > + RQ_SECURE, /* secure port */ > > + RQ_LOCAL, /* local request */ > > + RQ_USEDEFERRAL, /* use deferral */ > > + RQ_DROPME, /* drop current reply */ > > + RQ_SPLICE_OK, /* turned off in gss privacy > > + * to prevent encrypting page > > + * cache pages */ > > + RQ_VICTIM, /* agreed to shut down */ > > + RQ_DATA, /* request has data */ > > + RQ_CLEAN_ME, /* Thread needs to exit but > > + * is on the idle list */ > > +}; > > + > > Likewise here. And let's keep the flag clean-ups in separate patches. > > > > #define SVC_NET(rqst) (rqst->rq_xprt ? rqst->rq_xprt->xpt_net : rqst->rq_bc_net) > > > > /* > > @@ -309,6 +318,15 @@ static inline struct sockaddr *svc_daddr(const struct svc_rqst *rqst) > > return (struct sockaddr *) &rqst->rq_daddr; > > } > > > > This needs a kdoc comment and a more conventional name. How about > svc_thread_should_stop() ? > > Actually it seems like a better abstraction all around if the upper > layers don't have to care that they are running in a kthread -- so > maybe replacing kthread_should_stop() is a good clean-up to apply > in advance. 
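svc_thread_should_stop() works for me. Perhaps something like the following - the same logic as the svc_should_stop() quoted just below, only renamed and given a kdoc comment (the wording is just a suggestion):

	/**
	 * svc_thread_should_stop - check if this thread should stop processing
	 * @rqstp: the thread's own svc_rqst
	 *
	 * If the pool is looking for a victim, this thread volunteers by
	 * setting RQ_VICTIM.  Once it has volunteered it must keep reporting
	 * %true until it has actually exited, so the flag is tested on every
	 * call.
	 *
	 * Return: %true if the thread should clean up and exit.
	 */
	static inline bool svc_thread_should_stop(struct svc_rqst *rqstp)
	{
		if (test_and_clear_bit(SP_NEED_VICTIM, &rqstp->rq_pool->sp_flags))
			set_bit(RQ_VICTIM, &rqstp->rq_flags);

		return test_bit(RQ_VICTIM, &rqstp->rq_flags);
	}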
> > > > +static inline bool svc_should_stop(struct svc_rqst *rqstp) > > +{ > > + if (test_and_clear_bit(SP_NEED_VICTIM, &rqstp->rq_pool->sp_flags)) { > > + set_bit(RQ_VICTIM, &rqstp->rq_flags); > > + return true; > > + } > > + return test_bit(RQ_VICTIM, &rqstp->rq_flags); > > +} > > + > > struct svc_deferred_req { > > u32 prot; /* protocol (UDP or TCP) */ > > struct svc_xprt *xprt; > > @@ -416,6 +434,7 @@ bool svc_rqst_replace_page(struct svc_rqst *rqstp, > > void svc_rqst_release_pages(struct svc_rqst *rqstp); > > void svc_rqst_free(struct svc_rqst *); > > void svc_exit_thread(struct svc_rqst *); > > +bool svc_dequeue_rqst(struct svc_rqst *rqstp); > > struct svc_serv * svc_create_pooled(struct svc_program *, unsigned int, > > int (*threadfn)(void *data)); > > int svc_set_num_threads(struct svc_serv *, struct svc_pool *, int); > > @@ -428,7 +447,7 @@ int svc_register(const struct svc_serv *, struct net *, const int, > > > > void svc_wake_up(struct svc_serv *); > > void svc_reserve(struct svc_rqst *rqstp, int space); > > -struct svc_rqst *svc_pool_wake_idle_thread(struct svc_serv *serv, > > +bool svc_pool_wake_idle_thread(struct svc_serv *serv, > > struct svc_pool *pool); > > struct svc_pool *svc_pool_for_cpu(struct svc_serv *serv); > > char * svc_print_addr(struct svc_rqst *, char *, size_t); > > diff --git a/include/trace/events/sunrpc.h b/include/trace/events/sunrpc.h > > index f6fd48961074..f63289d1491d 100644 > > --- a/include/trace/events/sunrpc.h > > +++ b/include/trace/events/sunrpc.h > > @@ -1601,7 +1601,7 @@ DEFINE_SVCXDRBUF_EVENT(sendto); > > svc_rqst_flag(DROPME) \ > > svc_rqst_flag(SPLICE_OK) \ > > svc_rqst_flag(VICTIM) \ > > - svc_rqst_flag(BUSY) \ > > + svc_rqst_flag(CLEAN_ME) \ > > svc_rqst_flag_end(DATA) > > > > #undef svc_rqst_flag > > @@ -1965,7 +1965,10 @@ TRACE_EVENT(svc_xprt_enqueue, > > #define show_svc_pool_flags(x) \ > > __print_flags(x, "|", \ > > { BIT(SP_TASK_PENDING), "TASK_PENDING" }, \ > > - { BIT(SP_CONGESTED), "CONGESTED" }) > > + { BIT(SP_CONGESTED), "CONGESTED" }, \ > > + { BIT(SP_NEED_VICTIM), "NEED_VICTIM" }, \ > > + { BIT(SP_VICTIM_REMAINS), "VICTIM_REMAINS" }, \ > > + { BIT(SP_CLEANUP), "CLEANUP" }) > > > > DECLARE_EVENT_CLASS(svc_pool_scheduler_class, > > TP_PROTO( > > diff --git a/lib/llist.c b/lib/llist.c > > index 6e668fa5a2c6..660be07795ac 100644 > > --- a/lib/llist.c > > +++ b/lib/llist.c > > @@ -65,6 +65,57 @@ struct llist_node *llist_del_first(struct llist_head *head) > > } > > EXPORT_SYMBOL_GPL(llist_del_first); > > > > +/** > > + * llist_del_first_this - delete given entry of lock-less list if it is first > > + * @head: the head for your lock-less list > > + * @this: a list entry. > > + * > > + * If head of the list is given entry, delete and return it, else > > + * return %NULL. > > + * > > + * Providing the caller has exclusive access to @this, multiple callers can > > + * safely call this concurrently with multiple llist_add() callers. 
> > + */ > > +struct llist_node *llist_del_first_this(struct llist_head *head, > > + struct llist_node *this) > > +{ > > + struct llist_node *entry, *next; > > + > > + entry = smp_load_acquire(&head->first); > > + do { > > + if (entry != this) > > + return NULL; > > + next = READ_ONCE(entry->next); > > + } while (!try_cmpxchg(&head->first, &entry, next)); > > + > > + return entry; > > +} > > +EXPORT_SYMBOL_GPL(llist_del_first_this); > > + > > +/** > > + * llist_del_all_this - delete all entries from lock-less list if first is the given element > > + * @head: the head of lock-less list to delete all entries > > + * @this: the expected first element. > > + * > > + * If the first element of the list is @this, delete all elements and > > + * return them, else return %NULL. Providing the caller has exclusive access > > + * to @this, multiple concurrent callers can call this or list_del_first_this() > > + * simultaneuously with multiple callers of llist_add(). > > + */ > > +struct llist_node *llist_del_all_this(struct llist_head *head, > > + struct llist_node *this) > > +{ > > + struct llist_node *entry; > > + > > + entry = smp_load_acquire(&head->first); > > + do { > > + if (entry != this) > > + return NULL; > > + } while (!try_cmpxchg(&head->first, &entry, NULL)); > > + > > + return entry; > > +} > > + > > I was going to say that we should copy the maintainer of > lib/llist.c on this patch set, but I'm a little surprised to see > no maintainer listed for it. I'm not sure how to get proper > review for the new API and mechanism. I would certainly copy the author if this ends up going anywhere. > > Sidebar: Are there any self-tests or kunit tests for llist? > > > > /** > > * llist_reverse_order - reverse order of a llist chain > > * @head: first item of the list to be reversed > > diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c > > index ffb7200e8257..55339cbbbc6e 100644 > > --- a/net/sunrpc/svc.c > > +++ b/net/sunrpc/svc.c > > @@ -507,7 +507,7 @@ __svc_create(struct svc_program *prog, unsigned int bufsize, int npools, > > > > pool->sp_id = i; > > INIT_LIST_HEAD(&pool->sp_sockets); > > - INIT_LIST_HEAD(&pool->sp_all_threads); > > + init_llist_head(&pool->sp_idle_threads); > > spin_lock_init(&pool->sp_lock); > > > > percpu_counter_init(&pool->sp_sockets_queued, 0, GFP_KERNEL); > > @@ -652,9 +652,9 @@ svc_rqst_alloc(struct svc_serv *serv, struct svc_pool *pool, int node) > > > > pagevec_init(&rqstp->rq_pvec); > > > > - __set_bit(RQ_BUSY, &rqstp->rq_flags); > > rqstp->rq_server = serv; > > rqstp->rq_pool = pool; > > + rqstp->rq_idle.next = &rqstp->rq_idle; > > Is there really no initializer helper for this? This convention for ".next == &node" meaning it isn't on the list is a convention that I created. So there is no pre-existing help. Yes: I should add one. > > > > rqstp->rq_scratch_page = alloc_pages_node(node, GFP_KERNEL, 0); > > if (!rqstp->rq_scratch_page) > > @@ -694,7 +694,6 @@ svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node) > > > > spin_lock_bh(&pool->sp_lock); > > pool->sp_nrthreads++; > > - list_add_rcu(&rqstp->rq_all, &pool->sp_all_threads); > > spin_unlock_bh(&pool->sp_lock); > > return rqstp; > > } > > @@ -704,32 +703,34 @@ svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node) > > * @serv: RPC service > > * @pool: service thread pool > > * > > - * Returns an idle service thread (now marked BUSY), or NULL > > - * if no service threads are available. 
Finding an idle service > > - * thread and marking it BUSY is atomic with respect to other > > - * calls to svc_pool_wake_idle_thread(). > > + * If there are any idle threads in the pool, wake one up and return > > + * %true, else return %false. The thread will become non-idle once > > + * the scheduler schedules it, at which point is might wake another > > + * thread if there seems to be enough work to justify that. > > So I'm wondering how another call to svc_pool_wake_idle_thread() > that happens concurrently will not find and wake the same thread? It will find and wake the same thread. When that thread is scheduled it will de-queue any work that is needed. Then if there is more it will wake another. > > > > */ > > -struct svc_rqst *svc_pool_wake_idle_thread(struct svc_serv *serv, > > - struct svc_pool *pool) > > +bool svc_pool_wake_idle_thread(struct svc_serv *serv, > > + struct svc_pool *pool) > > { > > struct svc_rqst *rqstp; > > + struct llist_node *ln; > > > > rcu_read_lock(); > > - list_for_each_entry_rcu(rqstp, &pool->sp_all_threads, rq_all) { > > - if (test_and_set_bit(RQ_BUSY, &rqstp->rq_flags)) > > - continue; > > - > > - rcu_read_unlock(); > > + ln = READ_ONCE(pool->sp_idle_threads.first); > > + if (ln) { > > + rqstp = llist_entry(ln, struct svc_rqst, rq_idle); > > WRITE_ONCE(rqstp->rq_qtime, ktime_get()); > > - wake_up_process(rqstp->rq_task); > > - percpu_counter_inc(&pool->sp_threads_woken); > > - return rqstp; > > + if (!task_is_running(rqstp->rq_task)) { > > + wake_up_process(rqstp->rq_task); > > + percpu_counter_inc(&pool->sp_threads_woken); > > + } > > + rcu_read_unlock(); > > + return true; > > } > > rcu_read_unlock(); > > > > trace_svc_pool_starved(serv, pool); > > percpu_counter_inc(&pool->sp_threads_starved); > > - return NULL; > > + return false; > > } > > > > static struct svc_pool * > > @@ -738,19 +739,22 @@ svc_pool_next(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state) > > return pool ?
pool : &serv->sv_pools[(*state)++ % serv->sv_nrpools]; > > } > > > > -static struct task_struct * > > +static struct svc_pool * > > svc_pool_victim(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state) > > { > > - unsigned int i; > > - struct task_struct *task = NULL; > > > > if (pool != NULL) { > > spin_lock_bh(&pool->sp_lock); > > + if (pool->sp_nrthreads > 0) > > + goto found_pool; > > + spin_unlock_bh(&pool->sp_lock); > > + return NULL; > > } else { > > + unsigned int i; > > for (i = 0; i < serv->sv_nrpools; i++) { > > pool = &serv->sv_pools[--(*state) % serv->sv_nrpools]; > > spin_lock_bh(&pool->sp_lock); > > - if (!list_empty(&pool->sp_all_threads)) > > + if (pool->sp_nrthreads > 0) > > goto found_pool; > > spin_unlock_bh(&pool->sp_lock); > > } > > @@ -758,16 +762,10 @@ svc_pool_victim(struct svc_serv *serv, struct svc_pool *pool, unsigned int *stat > > } > > > > found_pool: > > - if (!list_empty(&pool->sp_all_threads)) { > > - struct svc_rqst *rqstp; > > - > > - rqstp = list_entry(pool->sp_all_threads.next, struct svc_rqst, rq_all); > > - set_bit(RQ_VICTIM, &rqstp->rq_flags); > > - list_del_rcu(&rqstp->rq_all); > > - task = rqstp->rq_task; > > - } > > + set_bit(SP_VICTIM_REMAINS, &pool->sp_flags); > > + set_bit(SP_NEED_VICTIM, &pool->sp_flags); > > spin_unlock_bh(&pool->sp_lock); > > - return task; > > + return pool; > > } > > > > static int > > @@ -808,18 +806,16 @@ svc_start_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs) > > static int > > svc_stop_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs) > > { > > - struct svc_rqst *rqstp; > > - struct task_struct *task; > > unsigned int state = serv->sv_nrthreads-1; > > + struct svc_pool *vpool; > > > > do { > > - task = svc_pool_victim(serv, pool, &state); > > - if (task == NULL) > > + vpool = svc_pool_victim(serv, pool, &state); > > + if (vpool == NULL) > > break; > > - rqstp = kthread_data(task); > > - /* Did we lose a race to svo_function threadfn? */ > > - if (kthread_stop(task) == -EINTR) > > - svc_exit_thread(rqstp); > > + svc_pool_wake_idle_thread(serv, vpool); > > + wait_on_bit(&vpool->sp_flags, SP_VICTIM_REMAINS, > > + TASK_UNINTERRUPTIBLE); > > nrservs++; > > } while (nrservs < 0); > > return 0; > > @@ -931,16 +927,75 @@ svc_rqst_free(struct svc_rqst *rqstp) > > } > > EXPORT_SYMBOL_GPL(svc_rqst_free); > > > > Can you add a kdoc comment for svc_dequeue_rqst()? > > There is a lot of complexity here that I'm not grokking on first > read. Either it needs more comments or simplification (or both). > > It's not winning me over, I have to say ;-) > > > > +bool svc_dequeue_rqst(struct svc_rqst *rqstp) > > +{ > > + struct svc_pool *pool = rqstp->rq_pool; > > + struct llist_node *le, *last; > > + > > +retry: > > + if (pool->sp_idle_threads.first != &rqstp->rq_idle) > > Would be better if there was a helper for this test. 
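Agreed. Together with the initializer helper I said I'd add, I'm thinking of something along these lines (the names are invented here, this is just a sketch):

	/* Mark @rqstp as not linked on any idle list - the ".next points at
	 * itself" convention used in this patch.
	 */
	static inline void svc_thread_init_idle(struct svc_rqst *rqstp)
	{
		rqstp->rq_idle.next = &rqstp->rq_idle;
	}

	/* True if @rqstp is currently linked on its pool's idle list. */
	static inline bool svc_thread_is_idle(struct svc_rqst *rqstp)
	{
		return rqstp->rq_idle.next != &rqstp->rq_idle;
	}

	/* True if @rqstp is the head of the idle list - the only position
	 * from which a thread may dequeue itself.
	 */
	static inline bool svc_thread_is_idle_head(struct svc_rqst *rqstp)
	{
		return READ_ONCE(rqstp->rq_pool->sp_idle_threads.first) ==
			&rqstp->rq_idle;
	}

svc_dequeue_rqst() and svc_exit_thread() could then use these instead of open-coding the tests.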
> > > > + /* Not at head of queue, so cannot wake up */ > > + return false; > > + if (!test_and_clear_bit(SP_CLEANUP, &pool->sp_flags)) { > > + le = llist_del_first_this(&pool->sp_idle_threads, > > + &rqstp->rq_idle); > > + if (le) > > + le->next = le; > > + return !!le; > > + } > > + /* Need to deal will RQ_CLEAN_ME thread */ > > + le = llist_del_all_this(&pool->sp_idle_threads, > > + &rqstp->rq_idle); > > + if (!le) { > > + /* lost a race, someone else need to clean up */ > > + set_bit(SP_CLEANUP, &pool->sp_flags); > > + svc_pool_wake_idle_thread(rqstp->rq_server, > > + pool); > > + goto retry; > > + } > > + if (!le->next) > > + return true; > > + last = le; > > + while (last->next) { > > + rqstp = list_entry(last->next, struct svc_rqst, rq_idle); > > + if (!test_bit(RQ_CLEAN_ME, &rqstp->rq_flags)) { > > + last = last->next; > > + continue; > > + } > > + last->next = last->next->next; > > + rqstp->rq_idle.next = &rqstp->rq_idle; > > + wake_up_process(rqstp->rq_task); > > + } > > + if (last != le) > > + llist_add_batch(le->next, last, &pool->sp_idle_threads); > > + le->next = le; > > + return true; > > +} > > + > > void > > svc_exit_thread(struct svc_rqst *rqstp) > > { > > struct svc_serv *serv = rqstp->rq_server; > > struct svc_pool *pool = rqstp->rq_pool; > > > > + while (rqstp->rq_idle.next != &rqstp->rq_idle) { > > Helper, maybe? > > > > + /* Still on the idle list. */ > > + if (llist_del_first_this(&pool->sp_idle_threads, > > + &rqstp->rq_idle)) { > > + /* Safely removed */ > > + rqstp->rq_idle.next = &rqstp->rq_idle; > > + } else { > > + set_current_state(TASK_UNINTERRUPTIBLE); > > + set_bit(RQ_CLEAN_ME, &rqstp->rq_flags); > > + set_bit(SP_CLEANUP, &pool->sp_flags); > > + svc_pool_wake_idle_thread(serv, pool); > > + if (!svc_dequeue_rqst(rqstp)) > > + schedule(); > > + __set_current_state(TASK_RUNNING); > > + } > > + } > > spin_lock_bh(&pool->sp_lock); > > pool->sp_nrthreads--; > > - if (!test_and_set_bit(RQ_VICTIM, &rqstp->rq_flags)) > > - list_del_rcu(&rqstp->rq_all); > > spin_unlock_bh(&pool->sp_lock); > > > > spin_lock_bh(&serv->sv_lock); > > @@ -948,6 +1003,8 @@ svc_exit_thread(struct svc_rqst *rqstp) > > spin_unlock_bh(&serv->sv_lock); > > svc_sock_update_bufs(serv); > > > > + if (test_bit(RQ_VICTIM, &rqstp->rq_flags)) > > + clear_and_wake_up_bit(SP_VICTIM_REMAINS, &pool->sp_flags); > > svc_rqst_free(rqstp); > > > > svc_put(serv); > > diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c > > index c4521bce1f27..d51587cd8d99 100644 > > --- a/net/sunrpc/svc_xprt.c > > +++ b/net/sunrpc/svc_xprt.c > > @@ -446,7 +446,6 @@ static bool svc_xprt_ready(struct svc_xprt *xprt) > > */ > > void svc_xprt_enqueue(struct svc_xprt *xprt) > > { > > - struct svc_rqst *rqstp; > > struct svc_pool *pool; > > > > if (!svc_xprt_ready(xprt)) > > @@ -467,20 +466,19 @@ void svc_xprt_enqueue(struct svc_xprt *xprt) > > list_add_tail(&xprt->xpt_ready, &pool->sp_sockets); > > spin_unlock_bh(&pool->sp_lock); > > > > - rqstp = svc_pool_wake_idle_thread(xprt->xpt_server, pool); > > - if (!rqstp) { > > + if (!svc_pool_wake_idle_thread(xprt->xpt_server, pool)) { > > set_bit(SP_CONGESTED, &pool->sp_flags); > > return; > > } > > > > - trace_svc_xprt_enqueue(xprt, rqstp); > > + // trace_svc_xprt_enqueue(xprt, rqstp); > > } > > EXPORT_SYMBOL_GPL(svc_xprt_enqueue); > > > > /* > > * Dequeue the first transport, if there is one. 
> > */ > > -static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool) > > +static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool, bool *more) > > { > > struct svc_xprt *xprt = NULL; > > > > @@ -493,6 +491,7 @@ static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool) > > struct svc_xprt, xpt_ready); > > list_del_init(&xprt->xpt_ready); > > svc_xprt_get(xprt); > > + *more = !list_empty(&pool->sp_sockets); > > } > > spin_unlock_bh(&pool->sp_lock); > > out: > > @@ -577,15 +576,13 @@ static void svc_xprt_release(struct svc_rqst *rqstp) > > void svc_wake_up(struct svc_serv *serv) > > { > > struct svc_pool *pool = &serv->sv_pools[0]; > > - struct svc_rqst *rqstp; > > > > - rqstp = svc_pool_wake_idle_thread(serv, pool); > > - if (!rqstp) { > > + if (!svc_pool_wake_idle_thread(serv, pool)) { > > set_bit(SP_TASK_PENDING, &pool->sp_flags); > > return; > > } > > > > - trace_svc_wake_up(rqstp); > > + // trace_svc_wake_up(rqstp); > > } > > EXPORT_SYMBOL_GPL(svc_wake_up); > > > > @@ -676,7 +673,7 @@ static int svc_alloc_arg(struct svc_rqst *rqstp) > > continue; > > > > set_current_state(TASK_INTERRUPTIBLE); > > - if (signalled() || kthread_should_stop()) { > > + if (signalled() || svc_should_stop(rqstp)) { > > set_current_state(TASK_RUNNING); > > return -EINTR; > > } > > @@ -706,7 +703,10 @@ rqst_should_sleep(struct svc_rqst *rqstp) > > struct svc_pool *pool = rqstp->rq_pool; > > > > /* did someone call svc_wake_up? */ > > - if (test_and_clear_bit(SP_TASK_PENDING, &pool->sp_flags)) > > + if (test_bit(SP_TASK_PENDING, &pool->sp_flags)) > > + return false; > > + if (test_bit(SP_CLEANUP, &pool->sp_flags)) > > + /* a signalled thread needs to be released */ > > return false; > > > > /* was a socket queued? */ > > @@ -714,7 +714,7 @@ rqst_should_sleep(struct svc_rqst *rqstp) > > return false; > > > > /* are we shutting down? */ > > - if (signalled() || kthread_should_stop()) > > + if (signalled() || svc_should_stop(rqstp)) > > return false; > > > > /* are we freezing? */ > > @@ -728,11 +728,9 @@ static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout) > > { > > struct svc_pool *pool = rqstp->rq_pool; > > long time_left = 0; > > + bool more = false; > > > > - /* rq_xprt should be clear on entry */ > > - WARN_ON_ONCE(rqstp->rq_xprt); > > - > > - rqstp->rq_xprt = svc_xprt_dequeue(pool); > > + rqstp->rq_xprt = svc_xprt_dequeue(pool, &more); > > if (rqstp->rq_xprt) { > > trace_svc_pool_polled(pool, rqstp); > > goto out_found; > > @@ -743,11 +741,10 @@ static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout) > > * to bring down the daemons ... 
> > */ > > set_current_state(TASK_INTERRUPTIBLE); > > - smp_mb__before_atomic(); > > - clear_bit(SP_CONGESTED, &pool->sp_flags); > > - clear_bit(RQ_BUSY, &rqstp->rq_flags); > > - smp_mb__after_atomic(); > > + clear_bit_unlock(SP_CONGESTED, &pool->sp_flags); > > > > + llist_add(&rqstp->rq_idle, &pool->sp_idle_threads); > > +again: > > if (likely(rqst_should_sleep(rqstp))) > > time_left = schedule_timeout(timeout); > > else > > @@ -755,9 +752,20 @@ static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout) > > > > try_to_freeze(); > > > > - set_bit(RQ_BUSY, &rqstp->rq_flags); > > - smp_mb__after_atomic(); > > - rqstp->rq_xprt = svc_xprt_dequeue(pool); > > + if (!svc_dequeue_rqst(rqstp)) { > > + if (signalled()) > > + /* Can only return while on idle list if signalled */ > > + return ERR_PTR(-EINTR); > > + /* Still on the idle list */ > > + goto again; > > + } > > + > > + clear_bit(SP_TASK_PENDING, &pool->sp_flags); > > + > > + if (svc_should_stop(rqstp)) > > + return ERR_PTR(-EINTR); > > + > > + rqstp->rq_xprt = svc_xprt_dequeue(pool, &more); > > if (rqstp->rq_xprt) { > > trace_svc_pool_awoken(pool, rqstp); > > goto out_found; > > @@ -766,10 +774,11 @@ static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout) > > if (!time_left) > > percpu_counter_inc(&pool->sp_threads_timedout); > > > > - if (signalled() || kthread_should_stop()) > > - return ERR_PTR(-EINTR); > > return ERR_PTR(-EAGAIN); > > out_found: > > + if (more) > > + svc_pool_wake_idle_thread(rqstp->rq_server, pool); > > + > > I'm thinking that dealing with more work should be implemented as a > separate optimization (ie, a subsequent patch). Ideally: yes. However the two changes are inter-related. Using a lockless queue lends itself particularly well to only waking one thread at a time - the thread at the head. To preserve the current practice of waking a thread every time a work-item is queued we would need a lock to remove the thread from the queue. There would be a small constant amount of work to do under that lock, so the cost would be minimal and I could do that. But then I would just remove the lock again when changing to wake threads sequentially. Still - might be worth doing it that way for independent testing and performance comparison. > > > > /* Normally we will wait up to 5 seconds for any required > > * cache information to be provided. > > */ > > @@ -866,7 +875,7 @@ int svc_recv(struct svc_rqst *rqstp, long timeout) > > try_to_freeze(); > > cond_resched(); > > err = -EINTR; > > - if (signalled() || kthread_should_stop()) > > + if (signalled() || svc_should_stop(rqstp)) > > goto out; > > > > xprt = svc_get_next_xprt(rqstp, timeout); > > -- > > 2.40.1 > > > Thanks for the review. I generally agree with anything that I didn't reply to - and much that I did. What do you think of removing the ability to stop an nfsd thread by sending it a signal? Note that this doesn't apply to lockd or to nfsv4 callback threads. And nfs-utils never relies on this. I'm keen. It would make this patch a lot simpler. Thanks, NeilBrown