Re: [PATCH/RFC] sunrpc: constant-time code to wake idle thread

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Mon, Jul 10, 2023 at 10:41:38AM +1000, NeilBrown wrote:
> 
> The patch show an alternate approach to the recent patches which improve
> the latency for waking an idle thread when work is ready.
> 
> The current approach involves searching a linked list for an idle thread
> to wake.  The recent patches instead use a bitmap search to find the
> idle thread.  With this patch no search is needed - and idle thread is
> directly available without searching.
> 
> The idle threads are kept in an "llist" - there is no longer a list of
> all threads.
> 
> The llist ADT does not allow concurrent delete_first operations, so to
> wake an idle thread we simply wake it and do not remove it from the
> list.
> When the thread is scheduled it will remove itself - which is safe - and
> will take the next thread if there is more work to do (and if there is
> another thread).
> 
> The "remove itself" requires and addition to the llist api.
> "llist_del_first_this()" removes a given item if is it the first.
> Multiple callers can call this concurrently as along as they each give a
> different "this", so each thread can safely try to remove itself.  It
> must be prepared for failure.
> 
> Reducing the thread count currently requires finding any thing, idle or
> not, and calling kthread_stop().  This no longer possible as we don't
> have a list of all threads (though I guess we could keep the list if we
> wanted to...).  Instead the pool is marked NEED_VICTIM and the next
> thread to go idle will become the VICTIM and duly exit - signalling
> this be clearing VICTIM_REMAINS.  We replace kthread_should_stop() call
> with a new svc_should_stop() which checks and sets victim flags.
> 
> nfsd threads can currently be told to exit with a signal.  It might be
> time to deprecate/remove this feature.  However this patch does support
> it.
> 
> If the signalled thread is not at the head of the idle list it cannot
> remove itself.  In this case it sets RQ_CLEAN_ME and SP_CLEANUP and the
> next thread to wake up will use llist_del_all_this() to remove all
> threads from the idle list.  It then finds and removes any RQ_CLEAN_ME
> threads and puts the rest back on the list.
> 
> There is quite a bit of churn here so it will need careful testing.
> In fact - it doesn't handle nfsv4 callback handling threads properly as
> they don't wait the same way that other threads wait...  I'll need to
> think about that but I don't have time just now.
> 
> For now it is primarily an RFC.  I haven't given a lot of thought to
> trace points.
> 
> It apply it you will need
> 
>  SUNRPC: Deduplicate thread wake-up code
>  SUNRPC: Report when no service thread is available.
>  SUNRPC: Split the svc_xprt_dequeue tracepoint
>  SUNRPC: Clean up svc_set_num_threads
>  SUNRPC: Replace dprintk() call site in __svc_create()
> 
> from recent post by Chuck.
> 
> Signed-off-by: NeilBrown <neilb@xxxxxxx>
> ---
>  fs/lockd/svc.c                |   4 +-
>  fs/lockd/svclock.c            |   4 +-
>  fs/nfs/callback.c             |   5 +-
>  fs/nfsd/nfssvc.c              |   3 +-
>  include/linux/llist.h         |   4 +
>  include/linux/lockd/lockd.h   |   2 +-
>  include/linux/sunrpc/svc.h    |  55 +++++++++-----
>  include/trace/events/sunrpc.h |   7 +-
>  lib/llist.c                   |  51 +++++++++++++
>  net/sunrpc/svc.c              | 139 ++++++++++++++++++++++++----------
>  net/sunrpc/svc_xprt.c         |  61 ++++++++-------
>  11 files changed, 239 insertions(+), 96 deletions(-)
> 
> diff --git a/fs/lockd/svc.c b/fs/lockd/svc.c
> index 22d3ff3818f5..df295771bd40 100644
> --- a/fs/lockd/svc.c
> +++ b/fs/lockd/svc.c
> @@ -147,7 +147,7 @@ lockd(void *vrqstp)
>  	 * The main request loop. We don't terminate until the last
>  	 * NFS mount or NFS daemon has gone away.
>  	 */
> -	while (!kthread_should_stop()) {
> +	while (!svc_should_stop(rqstp)) {
>  		long timeout = MAX_SCHEDULE_TIMEOUT;
>  		RPC_IFDEBUG(char buf[RPC_MAX_ADDRBUFLEN]);
>  
> @@ -160,7 +160,7 @@ lockd(void *vrqstp)
>  			continue;
>  		}
>  
> -		timeout = nlmsvc_retry_blocked();
> +		timeout = nlmsvc_retry_blocked(rqstp);
>  
>  		/*
>  		 * Find a socket with data available and call its
> diff --git a/fs/lockd/svclock.c b/fs/lockd/svclock.c
> index c43ccdf28ed9..54b679fcbcab 100644
> --- a/fs/lockd/svclock.c
> +++ b/fs/lockd/svclock.c
> @@ -1009,13 +1009,13 @@ retry_deferred_block(struct nlm_block *block)
>   * be retransmitted.
>   */
>  unsigned long
> -nlmsvc_retry_blocked(void)
> +nlmsvc_retry_blocked(struct svc_rqst *rqstp)
>  {
>  	unsigned long	timeout = MAX_SCHEDULE_TIMEOUT;
>  	struct nlm_block *block;
>  
>  	spin_lock(&nlm_blocked_lock);
> -	while (!list_empty(&nlm_blocked) && !kthread_should_stop()) {
> +	while (!list_empty(&nlm_blocked) && !svc_should_stop(rqstp)) {
>  		block = list_entry(nlm_blocked.next, struct nlm_block, b_list);
>  
>  		if (block->b_when == NLM_NEVER)
> diff --git a/fs/nfs/callback.c b/fs/nfs/callback.c
> index 456af7d230cf..646425f1dc36 100644
> --- a/fs/nfs/callback.c
> +++ b/fs/nfs/callback.c
> @@ -111,7 +111,7 @@ nfs41_callback_svc(void *vrqstp)
>  
>  	set_freezable();
>  
> -	while (!kthread_freezable_should_stop(NULL)) {
> +	while (!svc_should_stop(rqstp)) {
>  
>  		if (signal_pending(current))
>  			flush_signals(current);
> @@ -130,10 +130,11 @@ nfs41_callback_svc(void *vrqstp)
>  				error);
>  		} else {
>  			spin_unlock_bh(&serv->sv_cb_lock);
> -			if (!kthread_should_stop())
> +			if (!svc_should_stop(rqstp))
>  				schedule();
>  			finish_wait(&serv->sv_cb_waitq, &wq);
>  		}
> +		try_to_freeze();
>  	}
>  
>  	svc_exit_thread(rqstp);
> diff --git a/fs/nfsd/nfssvc.c b/fs/nfsd/nfssvc.c
> index 9c7b1ef5be40..7cfa7f2e9bf7 100644
> --- a/fs/nfsd/nfssvc.c
> +++ b/fs/nfsd/nfssvc.c
> @@ -62,8 +62,7 @@ static __be32			nfsd_init_request(struct svc_rqst *,
>   * If (out side the lock) nn->nfsd_serv is non-NULL, then it must point to a
>   * properly initialised 'struct svc_serv' with ->sv_nrthreads > 0 (unless
>   * nn->keep_active is set).  That number of nfsd threads must
> - * exist and each must be listed in ->sp_all_threads in some entry of
> - * ->sv_pools[].
> + * exist.
>   *
>   * Each active thread holds a counted reference on nn->nfsd_serv, as does
>   * the nn->keep_active flag and various transient calls to svc_get().
> diff --git a/include/linux/llist.h b/include/linux/llist.h
> index 85bda2d02d65..5a22499844c8 100644
> --- a/include/linux/llist.h
> +++ b/include/linux/llist.h
> @@ -248,6 +248,10 @@ static inline struct llist_node *__llist_del_all(struct llist_head *head)
>  }
>  
>  extern struct llist_node *llist_del_first(struct llist_head *head);
> +extern struct llist_node *llist_del_first_this(struct llist_head *head,
> +					       struct llist_node *this);
> +extern struct llist_node *llist_del_all_this(struct llist_head *head,
> +					     struct llist_node *this);
>  
>  struct llist_node *llist_reverse_order(struct llist_node *head);
>  
> diff --git a/include/linux/lockd/lockd.h b/include/linux/lockd/lockd.h
> index f42594a9efe0..c48020e7ee08 100644
> --- a/include/linux/lockd/lockd.h
> +++ b/include/linux/lockd/lockd.h
> @@ -280,7 +280,7 @@ __be32		  nlmsvc_testlock(struct svc_rqst *, struct nlm_file *,
>  			struct nlm_host *, struct nlm_lock *,
>  			struct nlm_lock *, struct nlm_cookie *);
>  __be32		  nlmsvc_cancel_blocked(struct net *net, struct nlm_file *, struct nlm_lock *);
> -unsigned long	  nlmsvc_retry_blocked(void);
> +unsigned long	  nlmsvc_retry_blocked(struct svc_rqst *rqstp);
>  void		  nlmsvc_traverse_blocks(struct nlm_host *, struct nlm_file *,
>  					nlm_host_match_fn_t match);
>  void		  nlmsvc_grant_reply(struct nlm_cookie *, __be32);
> diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h
> index 366f2b6b689c..cb2497b977c1 100644
> --- a/include/linux/sunrpc/svc.h
> +++ b/include/linux/sunrpc/svc.h
> @@ -31,11 +31,11 @@
>   * node traffic on multi-node NUMA NFS servers.
>   */
>  struct svc_pool {
> -	unsigned int		sp_id;	    	/* pool id; also node id on NUMA */
> -	spinlock_t		sp_lock;	/* protects all fields */
> +	unsigned int		sp_id;		/* pool id; also node id on NUMA */
> +	spinlock_t		sp_lock;	/* protects all sp_socketsn sp_nrthreads*/
>  	struct list_head	sp_sockets;	/* pending sockets */
>  	unsigned int		sp_nrthreads;	/* # of threads in pool */
> -	struct list_head	sp_all_threads;	/* all server threads */
> +	struct llist_head	sp_idle_threads;/* idle server threads */

Actually... Lorenzo reminded me that we need to retain a mechanism
that can iterate through all the threads in a pool. The xarray
replaces the "all_threads" list in this regard.


>  	/* statistics on pool operation */
>  	struct percpu_counter	sp_sockets_queued;
> @@ -43,12 +43,17 @@ struct svc_pool {
>  	struct percpu_counter	sp_threads_timedout;
>  	struct percpu_counter	sp_threads_starved;
>  
> -#define	SP_TASK_PENDING		(0)		/* still work to do even if no
> -						 * xprt is queued. */
> -#define SP_CONGESTED		(1)
>  	unsigned long		sp_flags;
>  } ____cacheline_aligned_in_smp;
>  
> +enum svc_sp_flags {
> +	SP_TASK_PENDING,	/* still work to do even if no xprt is queued */
> +	SP_CONGESTED,
> +	SP_NEED_VICTIM,		/* One thread needs to agree to exit */
> +	SP_VICTIM_REMAINS,	/* One thread needs to actually exit */
> +	SP_CLEANUP,		/* A thread has set RQ_CLEAN_ME */
> +};
> +
>  /*
>   * RPC service.
>   *
> @@ -195,7 +200,7 @@ extern u32 svc_max_payload(const struct svc_rqst *rqstp);
>   * processed.
>   */
>  struct svc_rqst {
> -	struct list_head	rq_all;		/* all threads list */
> +	struct llist_node	rq_idle;	/* On pool's idle list */
>  	struct rcu_head		rq_rcu_head;	/* for RCU deferred kfree */
>  	struct svc_xprt *	rq_xprt;	/* transport ptr */
>  
> @@ -233,16 +238,6 @@ struct svc_rqst {
>  	u32			rq_proc;	/* procedure number */
>  	u32			rq_prot;	/* IP protocol */
>  	int			rq_cachetype;	/* catering to nfsd */
> -#define	RQ_SECURE	(0)			/* secure port */
> -#define	RQ_LOCAL	(1)			/* local request */
> -#define	RQ_USEDEFERRAL	(2)			/* use deferral */
> -#define	RQ_DROPME	(3)			/* drop current reply */
> -#define	RQ_SPLICE_OK	(4)			/* turned off in gss privacy
> -						 * to prevent encrypting page
> -						 * cache pages */
> -#define	RQ_VICTIM	(5)			/* about to be shut down */
> -#define	RQ_BUSY		(6)			/* request is busy */
> -#define	RQ_DATA		(7)			/* request has data */
>  	unsigned long		rq_flags;	/* flags field */
>  	ktime_t			rq_qtime;	/* enqueue time */
>  
> @@ -274,6 +269,20 @@ struct svc_rqst {
>  	void **			rq_lease_breaker; /* The v4 client breaking a lease */
>  };
>  
> +enum svc_rq_flags {
> +	RQ_SECURE,			/* secure port */
> +	RQ_LOCAL,			/* local request */
> +	RQ_USEDEFERRAL,			/* use deferral */
> +	RQ_DROPME,			/* drop current reply */
> +	RQ_SPLICE_OK,			/* turned off in gss privacy
> +					 * to prevent encrypting page
> +					 * cache pages */
> +	RQ_VICTIM,			/* agreed to shut down */
> +	RQ_DATA,			/* request has data */
> +	RQ_CLEAN_ME,			/* Thread needs to exit but
> +					 * is on the idle list */
> +};
> +
>  #define SVC_NET(rqst) (rqst->rq_xprt ? rqst->rq_xprt->xpt_net : rqst->rq_bc_net)
>  
>  /*
> @@ -309,6 +318,15 @@ static inline struct sockaddr *svc_daddr(const struct svc_rqst *rqst)
>  	return (struct sockaddr *) &rqst->rq_daddr;
>  }
>  
> +static inline bool svc_should_stop(struct svc_rqst *rqstp)
> +{
> +	if (test_and_clear_bit(SP_NEED_VICTIM, &rqstp->rq_pool->sp_flags)) {
> +		set_bit(RQ_VICTIM, &rqstp->rq_flags);
> +		return true;
> +	}
> +	return test_bit(RQ_VICTIM, &rqstp->rq_flags);
> +}
> +
>  struct svc_deferred_req {
>  	u32			prot;	/* protocol (UDP or TCP) */
>  	struct svc_xprt		*xprt;
> @@ -416,6 +434,7 @@ bool		   svc_rqst_replace_page(struct svc_rqst *rqstp,
>  void		   svc_rqst_release_pages(struct svc_rqst *rqstp);
>  void		   svc_rqst_free(struct svc_rqst *);
>  void		   svc_exit_thread(struct svc_rqst *);
> +bool		   svc_dequeue_rqst(struct svc_rqst *rqstp);
>  struct svc_serv *  svc_create_pooled(struct svc_program *, unsigned int,
>  				     int (*threadfn)(void *data));
>  int		   svc_set_num_threads(struct svc_serv *, struct svc_pool *, int);
> @@ -428,7 +447,7 @@ int		   svc_register(const struct svc_serv *, struct net *, const int,
>  
>  void		   svc_wake_up(struct svc_serv *);
>  void		   svc_reserve(struct svc_rqst *rqstp, int space);
> -struct svc_rqst	  *svc_pool_wake_idle_thread(struct svc_serv *serv,
> +bool		   svc_pool_wake_idle_thread(struct svc_serv *serv,
>  					     struct svc_pool *pool);
>  struct svc_pool   *svc_pool_for_cpu(struct svc_serv *serv);
>  char *		   svc_print_addr(struct svc_rqst *, char *, size_t);
> diff --git a/include/trace/events/sunrpc.h b/include/trace/events/sunrpc.h
> index f6fd48961074..f63289d1491d 100644
> --- a/include/trace/events/sunrpc.h
> +++ b/include/trace/events/sunrpc.h
> @@ -1601,7 +1601,7 @@ DEFINE_SVCXDRBUF_EVENT(sendto);
>  	svc_rqst_flag(DROPME)						\
>  	svc_rqst_flag(SPLICE_OK)					\
>  	svc_rqst_flag(VICTIM)						\
> -	svc_rqst_flag(BUSY)						\
> +	svc_rqst_flag(CLEAN_ME)						\
>  	svc_rqst_flag_end(DATA)
>  
>  #undef svc_rqst_flag
> @@ -1965,7 +1965,10 @@ TRACE_EVENT(svc_xprt_enqueue,
>  #define show_svc_pool_flags(x)						\
>  	__print_flags(x, "|",						\
>  		{ BIT(SP_TASK_PENDING),		"TASK_PENDING" },	\
> -		{ BIT(SP_CONGESTED),		"CONGESTED" })
> +		{ BIT(SP_CONGESTED),		"CONGESTED" },		\
> +		{ BIT(SP_NEED_VICTIM),		"NEED_VICTIM" },	\
> +		{ BIT(SP_VICTIM_REMAINS),	"VICTIM_REMAINS" },	\
> +		{ BIT(SP_CLEANUP),		"CLEANUP" })
>  
>  DECLARE_EVENT_CLASS(svc_pool_scheduler_class,
>  	TP_PROTO(
> diff --git a/lib/llist.c b/lib/llist.c
> index 6e668fa5a2c6..660be07795ac 100644
> --- a/lib/llist.c
> +++ b/lib/llist.c
> @@ -65,6 +65,57 @@ struct llist_node *llist_del_first(struct llist_head *head)
>  }
>  EXPORT_SYMBOL_GPL(llist_del_first);
>  
> +/**
> + * llist_del_first_this - delete given entry of lock-less list if it is first
> + * @head:	the head for your lock-less list
> + * @this:	a list entry.
> + *
> + * If head of the list is given entry, delete and return it, else
> + * return %NULL.
> + *
> + * Providing the caller has exclusive access to @this, multiple callers can
> + * safely call this concurrently with multiple llist_add() callers.
> + */
> +struct llist_node *llist_del_first_this(struct llist_head *head,
> +					struct llist_node *this)
> +{
> +	struct llist_node *entry, *next;
> +
> +	entry = smp_load_acquire(&head->first);
> +	do {
> +		if (entry != this)
> +			return NULL;
> +		next = READ_ONCE(entry->next);
> +	} while (!try_cmpxchg(&head->first, &entry, next));
> +
> +	return entry;
> +}
> +EXPORT_SYMBOL_GPL(llist_del_first_this);
> +
> +/**
> + * llist_del_all_this - delete all entries from lock-less list if first is the given element
> + * @head:	the head of lock-less list to delete all entries
> + * @this:	the expected first element.
> + *
> + * If the first element of the list is @this, delete all elements and
> + * return them, else return %NULL.  Providing the caller has exclusive access
> + * to @this, multiple concurrent callers can call this or list_del_first_this()
> + * simultaneuously with multiple callers of llist_add().
> + */
> +struct llist_node *llist_del_all_this(struct llist_head *head,
> +				      struct llist_node *this)
> +{
> +	struct llist_node *entry;
> +
> +	entry = smp_load_acquire(&head->first);
> +	do {
> +		if (entry != this)
> +			return NULL;
> +	} while (!try_cmpxchg(&head->first, &entry, NULL));
> +
> +	return entry;
> +}
> +
>  /**
>   * llist_reverse_order - reverse order of a llist chain
>   * @head:	first item of the list to be reversed
> diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
> index ffb7200e8257..55339cbbbc6e 100644
> --- a/net/sunrpc/svc.c
> +++ b/net/sunrpc/svc.c
> @@ -507,7 +507,7 @@ __svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
>  
>  		pool->sp_id = i;
>  		INIT_LIST_HEAD(&pool->sp_sockets);
> -		INIT_LIST_HEAD(&pool->sp_all_threads);
> +		init_llist_head(&pool->sp_idle_threads);
>  		spin_lock_init(&pool->sp_lock);
>  
>  		percpu_counter_init(&pool->sp_sockets_queued, 0, GFP_KERNEL);
> @@ -652,9 +652,9 @@ svc_rqst_alloc(struct svc_serv *serv, struct svc_pool *pool, int node)
>  
>  	pagevec_init(&rqstp->rq_pvec);
>  
> -	__set_bit(RQ_BUSY, &rqstp->rq_flags);
>  	rqstp->rq_server = serv;
>  	rqstp->rq_pool = pool;
> +	rqstp->rq_idle.next = &rqstp->rq_idle;
>  
>  	rqstp->rq_scratch_page = alloc_pages_node(node, GFP_KERNEL, 0);
>  	if (!rqstp->rq_scratch_page)
> @@ -694,7 +694,6 @@ svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node)
>  
>  	spin_lock_bh(&pool->sp_lock);
>  	pool->sp_nrthreads++;
> -	list_add_rcu(&rqstp->rq_all, &pool->sp_all_threads);
>  	spin_unlock_bh(&pool->sp_lock);
>  	return rqstp;
>  }
> @@ -704,32 +703,34 @@ svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node)
>   * @serv: RPC service
>   * @pool: service thread pool
>   *
> - * Returns an idle service thread (now marked BUSY), or NULL
> - * if no service threads are available. Finding an idle service
> - * thread and marking it BUSY is atomic with respect to other
> - * calls to svc_pool_wake_idle_thread().
> + * If there are any idle threads in the pool, wake one up and return
> + * %true, else return %false.  The thread will become non-idle once
> + * the scheduler schedules it, at which point is might wake another
> + * thread if there seems to be enough work to justify that.
>   */
> -struct svc_rqst *svc_pool_wake_idle_thread(struct svc_serv *serv,
> -					   struct svc_pool *pool)
> +bool svc_pool_wake_idle_thread(struct svc_serv *serv,
> +			       struct svc_pool *pool)
>  {
>  	struct svc_rqst	*rqstp;
> +	struct llist_node *ln;
>  
>  	rcu_read_lock();
> -	list_for_each_entry_rcu(rqstp, &pool->sp_all_threads, rq_all) {
> -		if (test_and_set_bit(RQ_BUSY, &rqstp->rq_flags))
> -			continue;
> -
> -		rcu_read_unlock();
> +	ln = READ_ONCE(pool->sp_idle_threads.first);
> +	if (ln) {
> +		rqstp = llist_entry(ln, struct svc_rqst, rq_idle);
>  		WRITE_ONCE(rqstp->rq_qtime, ktime_get());
> -		wake_up_process(rqstp->rq_task);
> -		percpu_counter_inc(&pool->sp_threads_woken);
> -		return rqstp;
> +		if (!task_is_running(rqstp->rq_task)) {
> +			wake_up_process(rqstp->rq_task);
> +			percpu_counter_inc(&pool->sp_threads_woken);
> +		}
> +		rcu_read_unlock();
> +		return true;
>  	}
>  	rcu_read_unlock();
>  
>  	trace_svc_pool_starved(serv, pool);
>  	percpu_counter_inc(&pool->sp_threads_starved);
> -	return NULL;
> +	return false;
>  }
>  
>  static struct svc_pool *
> @@ -738,19 +739,22 @@ svc_pool_next(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
>  	return pool ? pool : &serv->sv_pools[(*state)++ % serv->sv_nrpools];
>  }
>  
> -static struct task_struct *
> +static struct svc_pool *
>  svc_pool_victim(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
>  {
> -	unsigned int i;
> -	struct task_struct *task = NULL;
>  
>  	if (pool != NULL) {
>  		spin_lock_bh(&pool->sp_lock);
> +		if (pool->sp_nrthreads > 0)
> +			goto found_pool;
> +		spin_unlock_bh(&pool->sp_lock);
> +		return NULL;
>  	} else {
> +		unsigned int i;
>  		for (i = 0; i < serv->sv_nrpools; i++) {
>  			pool = &serv->sv_pools[--(*state) % serv->sv_nrpools];
>  			spin_lock_bh(&pool->sp_lock);
> -			if (!list_empty(&pool->sp_all_threads))
> +			if (pool->sp_nrthreads > 0)
>  				goto found_pool;
>  			spin_unlock_bh(&pool->sp_lock);
>  		}
> @@ -758,16 +762,10 @@ svc_pool_victim(struct svc_serv *serv, struct svc_pool *pool, unsigned int *stat
>  	}
>  
>  found_pool:
> -	if (!list_empty(&pool->sp_all_threads)) {
> -		struct svc_rqst *rqstp;
> -
> -		rqstp = list_entry(pool->sp_all_threads.next, struct svc_rqst, rq_all);
> -		set_bit(RQ_VICTIM, &rqstp->rq_flags);
> -		list_del_rcu(&rqstp->rq_all);
> -		task = rqstp->rq_task;
> -	}
> +	set_bit(SP_VICTIM_REMAINS, &pool->sp_flags);
> +	set_bit(SP_NEED_VICTIM, &pool->sp_flags);
>  	spin_unlock_bh(&pool->sp_lock);
> -	return task;
> +	return pool;
>  }
>  
>  static int
> @@ -808,18 +806,16 @@ svc_start_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
>  static int
>  svc_stop_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
>  {
> -	struct svc_rqst	*rqstp;
> -	struct task_struct *task;
>  	unsigned int state = serv->sv_nrthreads-1;
> +	struct svc_pool *vpool;
>  
>  	do {
> -		task = svc_pool_victim(serv, pool, &state);
> -		if (task == NULL)
> +		vpool = svc_pool_victim(serv, pool, &state);
> +		if (vpool == NULL)
>  			break;
> -		rqstp = kthread_data(task);
> -		/* Did we lose a race to svo_function threadfn? */
> -		if (kthread_stop(task) == -EINTR)
> -			svc_exit_thread(rqstp);
> +		svc_pool_wake_idle_thread(serv, vpool);
> +		wait_on_bit(&vpool->sp_flags, SP_VICTIM_REMAINS,
> +			    TASK_UNINTERRUPTIBLE);
>  		nrservs++;
>  	} while (nrservs < 0);
>  	return 0;
> @@ -931,16 +927,75 @@ svc_rqst_free(struct svc_rqst *rqstp)
>  }
>  EXPORT_SYMBOL_GPL(svc_rqst_free);
>  
> +bool svc_dequeue_rqst(struct svc_rqst *rqstp)
> +{
> +	struct svc_pool *pool = rqstp->rq_pool;
> +	struct llist_node *le, *last;
> +
> +retry:
> +	if (pool->sp_idle_threads.first != &rqstp->rq_idle)
> +		/* Not at head of queue, so cannot wake up */
> +		return false;
> +	if (!test_and_clear_bit(SP_CLEANUP, &pool->sp_flags)) {
> +		le = llist_del_first_this(&pool->sp_idle_threads,
> +					  &rqstp->rq_idle);
> +		if (le)
> +			le->next = le;
> +		return !!le;
> +	}
> +	/* Need to deal will RQ_CLEAN_ME thread */
> +	le = llist_del_all_this(&pool->sp_idle_threads,
> +				&rqstp->rq_idle);
> +	if (!le) {
> +		/* lost a race, someone else need to clean up */
> +		set_bit(SP_CLEANUP, &pool->sp_flags);
> +		svc_pool_wake_idle_thread(rqstp->rq_server,
> +					  pool);
> +		goto retry;
> +	}
> +	if (!le->next)
> +		return true;
> +	last = le;
> +	while (last->next) {
> +		rqstp = list_entry(last->next, struct svc_rqst, rq_idle);
> +		if (!test_bit(RQ_CLEAN_ME, &rqstp->rq_flags)) {
> +			last = last->next;
> +			continue;
> +		}
> +		last->next = last->next->next;
> +		rqstp->rq_idle.next = &rqstp->rq_idle;
> +		wake_up_process(rqstp->rq_task);
> +	}
> +	if (last != le)
> +		llist_add_batch(le->next, last, &pool->sp_idle_threads);
> +	le->next = le;
> +	return true;
> +}
> +
>  void
>  svc_exit_thread(struct svc_rqst *rqstp)
>  {
>  	struct svc_serv	*serv = rqstp->rq_server;
>  	struct svc_pool	*pool = rqstp->rq_pool;
>  
> +	while (rqstp->rq_idle.next != &rqstp->rq_idle) {
> +		/* Still on the idle list. */
> +		if (llist_del_first_this(&pool->sp_idle_threads,
> +					 &rqstp->rq_idle)) {
> +			/* Safely removed */
> +			rqstp->rq_idle.next = &rqstp->rq_idle;
> +		} else {
> +			set_current_state(TASK_UNINTERRUPTIBLE);
> +			set_bit(RQ_CLEAN_ME, &rqstp->rq_flags);
> +			set_bit(SP_CLEANUP, &pool->sp_flags);
> +			svc_pool_wake_idle_thread(serv, pool);
> +			if (!svc_dequeue_rqst(rqstp))
> +				schedule();
> +			__set_current_state(TASK_RUNNING);
> +		}
> +	}
>  	spin_lock_bh(&pool->sp_lock);
>  	pool->sp_nrthreads--;
> -	if (!test_and_set_bit(RQ_VICTIM, &rqstp->rq_flags))
> -		list_del_rcu(&rqstp->rq_all);
>  	spin_unlock_bh(&pool->sp_lock);
>  
>  	spin_lock_bh(&serv->sv_lock);
> @@ -948,6 +1003,8 @@ svc_exit_thread(struct svc_rqst *rqstp)
>  	spin_unlock_bh(&serv->sv_lock);
>  	svc_sock_update_bufs(serv);
>  
> +	if (test_bit(RQ_VICTIM, &rqstp->rq_flags))
> +		clear_and_wake_up_bit(SP_VICTIM_REMAINS, &pool->sp_flags);
>  	svc_rqst_free(rqstp);
>  
>  	svc_put(serv);
> diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
> index c4521bce1f27..d51587cd8d99 100644
> --- a/net/sunrpc/svc_xprt.c
> +++ b/net/sunrpc/svc_xprt.c
> @@ -446,7 +446,6 @@ static bool svc_xprt_ready(struct svc_xprt *xprt)
>   */
>  void svc_xprt_enqueue(struct svc_xprt *xprt)
>  {
> -	struct svc_rqst *rqstp;
>  	struct svc_pool *pool;
>  
>  	if (!svc_xprt_ready(xprt))
> @@ -467,20 +466,19 @@ void svc_xprt_enqueue(struct svc_xprt *xprt)
>  	list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
>  	spin_unlock_bh(&pool->sp_lock);
>  
> -	rqstp = svc_pool_wake_idle_thread(xprt->xpt_server, pool);
> -	if (!rqstp) {
> +	if (!svc_pool_wake_idle_thread(xprt->xpt_server, pool)) {
>  		set_bit(SP_CONGESTED, &pool->sp_flags);
>  		return;
>  	}
>  
> -	trace_svc_xprt_enqueue(xprt, rqstp);
> +	// trace_svc_xprt_enqueue(xprt, rqstp);
>  }
>  EXPORT_SYMBOL_GPL(svc_xprt_enqueue);
>  
>  /*
>   * Dequeue the first transport, if there is one.
>   */
> -static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool)
> +static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool, bool *more)
>  {
>  	struct svc_xprt	*xprt = NULL;
>  
> @@ -493,6 +491,7 @@ static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool)
>  					struct svc_xprt, xpt_ready);
>  		list_del_init(&xprt->xpt_ready);
>  		svc_xprt_get(xprt);
> +		*more = !list_empty(&pool->sp_sockets);
>  	}
>  	spin_unlock_bh(&pool->sp_lock);
>  out:
> @@ -577,15 +576,13 @@ static void svc_xprt_release(struct svc_rqst *rqstp)
>  void svc_wake_up(struct svc_serv *serv)
>  {
>  	struct svc_pool *pool = &serv->sv_pools[0];
> -	struct svc_rqst *rqstp;
>  
> -	rqstp = svc_pool_wake_idle_thread(serv, pool);
> -	if (!rqstp) {
> +	if (!svc_pool_wake_idle_thread(serv, pool)) {
>  		set_bit(SP_TASK_PENDING, &pool->sp_flags);
>  		return;
>  	}
>  
> -	trace_svc_wake_up(rqstp);
> +	// trace_svc_wake_up(rqstp);
>  }
>  EXPORT_SYMBOL_GPL(svc_wake_up);
>  
> @@ -676,7 +673,7 @@ static int svc_alloc_arg(struct svc_rqst *rqstp)
>  			continue;
>  
>  		set_current_state(TASK_INTERRUPTIBLE);
> -		if (signalled() || kthread_should_stop()) {
> +		if (signalled() || svc_should_stop(rqstp)) {
>  			set_current_state(TASK_RUNNING);
>  			return -EINTR;
>  		}
> @@ -706,7 +703,10 @@ rqst_should_sleep(struct svc_rqst *rqstp)
>  	struct svc_pool		*pool = rqstp->rq_pool;
>  
>  	/* did someone call svc_wake_up? */
> -	if (test_and_clear_bit(SP_TASK_PENDING, &pool->sp_flags))
> +	if (test_bit(SP_TASK_PENDING, &pool->sp_flags))
> +		return false;
> +	if (test_bit(SP_CLEANUP, &pool->sp_flags))
> +		/* a signalled thread needs to be released */
>  		return false;
>  
>  	/* was a socket queued? */
> @@ -714,7 +714,7 @@ rqst_should_sleep(struct svc_rqst *rqstp)
>  		return false;
>  
>  	/* are we shutting down? */
> -	if (signalled() || kthread_should_stop())
> +	if (signalled() || svc_should_stop(rqstp))
>  		return false;
>  
>  	/* are we freezing? */
> @@ -728,11 +728,9 @@ static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
>  {
>  	struct svc_pool		*pool = rqstp->rq_pool;
>  	long			time_left = 0;
> +	bool			more = false;
>  
> -	/* rq_xprt should be clear on entry */
> -	WARN_ON_ONCE(rqstp->rq_xprt);
> -
> -	rqstp->rq_xprt = svc_xprt_dequeue(pool);
> +	rqstp->rq_xprt = svc_xprt_dequeue(pool, &more);
>  	if (rqstp->rq_xprt) {
>  		trace_svc_pool_polled(pool, rqstp);
>  		goto out_found;
> @@ -743,11 +741,10 @@ static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
>  	 * to bring down the daemons ...
>  	 */
>  	set_current_state(TASK_INTERRUPTIBLE);
> -	smp_mb__before_atomic();
> -	clear_bit(SP_CONGESTED, &pool->sp_flags);
> -	clear_bit(RQ_BUSY, &rqstp->rq_flags);
> -	smp_mb__after_atomic();
> +	clear_bit_unlock(SP_CONGESTED, &pool->sp_flags);
>  
> +	llist_add(&rqstp->rq_idle, &pool->sp_idle_threads);
> +again:
>  	if (likely(rqst_should_sleep(rqstp)))
>  		time_left = schedule_timeout(timeout);
>  	else
> @@ -755,9 +752,20 @@ static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
>  
>  	try_to_freeze();
>  
> -	set_bit(RQ_BUSY, &rqstp->rq_flags);
> -	smp_mb__after_atomic();
> -	rqstp->rq_xprt = svc_xprt_dequeue(pool);
> +	if (!svc_dequeue_rqst(rqstp)) {
> +		if (signalled())
> +			/* Can only return while on idle list if signalled */
> +			return ERR_PTR(-EINTR);
> +		/* Still on the idle list */
> +		goto again;
> +	}
> +
> +	clear_bit(SP_TASK_PENDING, &pool->sp_flags);
> +
> +	if (svc_should_stop(rqstp))
> +		return ERR_PTR(-EINTR);
> +
> +	rqstp->rq_xprt = svc_xprt_dequeue(pool, &more);
>  	if (rqstp->rq_xprt) {
>  		trace_svc_pool_awoken(pool, rqstp);
>  		goto out_found;
> @@ -766,10 +774,11 @@ static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
>  	if (!time_left)
>  		percpu_counter_inc(&pool->sp_threads_timedout);
>  
> -	if (signalled() || kthread_should_stop())
> -		return ERR_PTR(-EINTR);
>  	return ERR_PTR(-EAGAIN);
>  out_found:
> +	if (more)
> +		svc_pool_wake_idle_thread(rqstp->rq_server, pool);
> +
>  	/* Normally we will wait up to 5 seconds for any required
>  	 * cache information to be provided.
>  	 */
> @@ -866,7 +875,7 @@ int svc_recv(struct svc_rqst *rqstp, long timeout)
>  	try_to_freeze();
>  	cond_resched();
>  	err = -EINTR;
> -	if (signalled() || kthread_should_stop())
> +	if (signalled() || svc_should_stop(rqstp))
>  		goto out;
>  
>  	xprt = svc_get_next_xprt(rqstp, timeout);
> -- 
> 2.40.1
> 



[Index of Archives]     [Linux Filesystem Development]     [Linux USB Development]     [Linux Media Development]     [Video for Linux]     [Linux NILFS]     [Linux Audio Users]     [Yosemite Info]     [Linux SCSI]

  Powered by Linux