Re: [PATCH 05/10] lib: add light-weight queuing mechanism.

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Thu, 31 Aug 2023, Chuck Lever wrote:
> On Wed, Aug 30, 2023 at 12:54:48PM +1000, NeilBrown wrote:
> > lwq is a FIFO single-linked queue that only requires a spinlock
> > for dequeueing, which happens in process context.  Enqueueing is atomic
> > with no spinlock and can happen in any context.
> > 
> > Include a unit test for basic functionality - runs at boot time.  Does
> > not use kunit framework.
> > 
> > Signed-off-by: NeilBrown <neilb@xxxxxxx>
> > ---
> >  include/linux/lwq.h | 120 +++++++++++++++++++++++++++++++++++
> >  lib/Kconfig         |   5 ++
> >  lib/Makefile        |   2 +-
> >  lib/lwq.c           | 149 ++++++++++++++++++++++++++++++++++++++++++++
> >  4 files changed, 275 insertions(+), 1 deletion(-)
> >  create mode 100644 include/linux/lwq.h
> >  create mode 100644 lib/lwq.c
> > 
> > diff --git a/include/linux/lwq.h b/include/linux/lwq.h
> > new file mode 100644
> > index 000000000000..52b9c81b493a
> > --- /dev/null
> > +++ b/include/linux/lwq.h
> > @@ -0,0 +1,120 @@
> > +/* SPDX-License-Identifier: GPL-2.0-only */
> > +
> > +#ifndef LWQ_H
> > +#define LWQ_H
> > +/*
> > + * light-weight single-linked queue built from llist
> > + *
> > + * Entries can be enqueued from any context with no locking.
> > + * Entries can be dequeued from process context with integrated locking.
> > + */
> > +#include <linux/container_of.h>
> > +#include <linux/spinlock.h>
> > +#include <linux/llist.h>
> > +
> > +struct lwq_node {
> > +	struct llist_node node;
> > +};
> > +
> > +struct lwq {
> > +	spinlock_t		lock;
> > +	struct llist_node	*ready;		/* entries to be dequeued */
> > +	struct llist_head	new;		/* entries being enqueued */
> > +};
> > +
> > +/**
> > + * lwq_init - initialise a lwq
> > + * @q:	the lwq object
> > + */
> > +static inline void lwq_init(struct lwq *q)
> > +{
> > +	spin_lock_init(&q->lock);
> > +	q->ready = NULL;
> > +	init_llist_head(&q->new);
> > +}
> > +
> > +/**
> > + * lwq_empty - test if lwq contains any entry
> > + * @q:	the lwq object
> > + *
> > + * This empty test contains an acquire barrier so that if a wakeup
> > + * is sent when lwq_dequeue returns true, it is safe to go to sleep after
> > + * a test on lwq_empty().
> > + */
> > +static inline bool lwq_empty(struct lwq *q)
> > +{
> > +	/* acquire ensures ordering wrt lwq_enqueue() */
> > +	return smp_load_acquire(&q->ready) == NULL && llist_empty(&q->new);
> > +}
> > +
> > +struct llist_node *__lwq_dequeue(struct lwq *q);
> > +/**
> > + * lwq_dequeue - dequeue first (oldest) entry from lwq
> > + * @q:		the queue to dequeue from
> > + * @type:	the type of object to return
> > + * @member:	them member in returned object which is an lwq_node.
> > + *
> > + * Remove a single object from the lwq and return it.  This will take
> > + * a spinlock and so must always be called in the same context, typcially
> > + * process contet.
> > + */
> > +#define lwq_dequeue(q, type, member)					\
> > +	({ struct llist_node *_n = __lwq_dequeue(q);			\
> > +	  _n ? container_of(_n, type, member.node) : NULL; })
> > +
> > +struct llist_node *lwq_dequeue_all(struct lwq *q);
> > +
> > +/**
> > + * lwq_for_each_safe - iterate over detached queue allowing deletion
> > + * @_n:		iterator variable
> > + * @_t1:	temporary struct llist_node **
> > + * @_t2:	temporary struct llist_node *
> > + * @_l:		address of llist_node pointer from lwq_dequeue_all()
> > + * @_member:	member in _n where lwq_node is found.
> > + *
> > + * Iterate over members in a dequeued list.  If the iterator variable
> > + * is set to NULL, the iterator removes that entry from the queue.
> > + */
> > +#define lwq_for_each_safe(_n, _t1, _t2, _l, _member)			\
> > +	for (_t1 = (_l);						\
> > +	     *(_t1) ? (_n = container_of(*(_t1), typeof(*(_n)), _member.node),\
> > +		       _t2 = ((*_t1)->next),				\
> > +		       true)						\
> > +	     : false;							\
> > +	     (_n) ? (_t1 = &(_n)->_member.node.next, 0)			\
> > +	     : ((*(_t1) = (_t2)),  0))
> > +
> > +/**
> > + * lwq_enqueue - add a new item to the end of the queue
> > + * @n	- the lwq_node embedded in the item to be added
> > + * @q	- the lwq to append to.
> > + *
> > + * No locking is needed to append to the queue so this can
> > + * be called from any context.
> > + * Return %true is the list may have previously been empty.
> > + */
> > +static inline bool lwq_enqueue(struct lwq_node *n, struct lwq *q)
> > +{
> > +	/* acquire enqures ordering wrt lwq_dequeue */
> > +	return llist_add(&n->node, &q->new) &&
> > +		smp_load_acquire(&q->ready) == NULL;
> > +}
> > +
> > +/**
> > + * lwq_enqueue_batch - add a list of new items to the end of the queue
> > + * @n	- the lwq_node embedded in the first item to be added
> > + * @q	- the lwq to append to.
> > + *
> > + * No locking is needed to append to the queue so this can
> > + * be called from any context.
> > + * Return %true is the list may have previously been empty.
> > + */
> > +static inline bool lwq_enqueue_batch(struct llist_node *n, struct lwq *q)
> > +{
> > +	struct llist_node *e = n;
> > +
> > +	/* acquire enqures ordering wrt lwq_dequeue */
> > +	return llist_add_batch(llist_reverse_order(n), e, &q->new) &&
> > +		smp_load_acquire(&q->ready) == NULL;
> > +}
> > +#endif /* LWQ_H */
> > diff --git a/lib/Kconfig b/lib/Kconfig
> > index 5c2da561c516..6620bdba4f94 100644
> > --- a/lib/Kconfig
> > +++ b/lib/Kconfig
> > @@ -763,3 +763,8 @@ config ASN1_ENCODER
> >  
> >  config POLYNOMIAL
> >         tristate
> > +
> > +config LWQ_TEST
> > +	bool "RPC: enable boot-time test for lwq queuing"
> > +	help
> > +          Enable boot-time test of lwq functionality.
> > diff --git a/lib/Makefile b/lib/Makefile
> > index 1ffae65bb7ee..4b67c2d6af62 100644
> > --- a/lib/Makefile
> > +++ b/lib/Makefile
> > @@ -45,7 +45,7 @@ obj-y	+= lockref.o
> >  obj-y += bcd.o sort.o parser.o debug_locks.o random32.o \
> >  	 bust_spinlocks.o kasprintf.o bitmap.o scatterlist.o \
> >  	 list_sort.o uuid.o iov_iter.o clz_ctz.o \
> > -	 bsearch.o find_bit.o llist.o memweight.o kfifo.o \
> > +	 bsearch.o find_bit.o llist.o lwq.o memweight.o kfifo.o \
> >  	 percpu-refcount.o rhashtable.o base64.o \
> >  	 once.o refcount.o rcuref.o usercopy.o errseq.o bucket_locks.o \
> >  	 generic-radix-tree.o
> > diff --git a/lib/lwq.c b/lib/lwq.c
> > new file mode 100644
> > index 000000000000..d6be6dda3867
> > --- /dev/null
> > +++ b/lib/lwq.c
> > @@ -0,0 +1,149 @@
> > +// SPDX-License-Identifier: GPL-2.0-only
> > +/*
> > + * Light weight single-linked queue.
> > + *
> > + * Entries are enqueued to the head of an llist, with no blocking.
> > + * This can happen in any context.
> > + *
> > + * Entries are dequeued using a spinlock to protect against
> > + * multiple access.  The llist is staged in reverse order, and refreshed
> > + * from the llist when it exhausts.
> > + */
> > +#include <linux/rcupdate.h>
> > +#include <linux/lwq.h>
> > +
> > +struct llist_node *__lwq_dequeue(struct lwq *q)
> > +{
> > +	struct llist_node *this;
> > +
> > +	if (lwq_empty(q))
> > +		return NULL;
> > +	spin_lock(&q->lock);
> > +	this = q->ready;
> > +	if (!this && !llist_empty(&q->new)) {
> > +		/* ensure queue doesn't appear transiently lwq_empty */
> > +		smp_store_release(&q->ready, (void *)1);
> > +		this = llist_reverse_order(llist_del_all(&q->new));
> > +		if (!this)
> > +			q->ready = NULL;
> > +	}
> > +	if (this)
> > +		q->ready = llist_next(this);
> > +	spin_unlock(&q->lock);
> > +	return this;
> > +}
> > +
> > +/**
> > + * lwq_dequeue_all - dequeue all currently enqueued objects
> > + * @q:	the queue to dequeue from
> > + *
> > + * Remove and return a linked list of llist_nodes of all the objects that were
> > + * in the queue. The first on the list will be the object that was least
> > + * recently enqueued.
> > + */
> > +struct llist_node *lwq_dequeue_all(struct lwq *q)
> > +{
> > +	struct llist_node *r, *t, **ep;
> > +
> > +	if (lwq_empty(q))
> > +		return NULL;
> > +
> > +	spin_lock(&q->lock);
> > +	r = q->ready;
> > +	q->ready = NULL;
> > +	t = llist_del_all(&q->new);
> > +	spin_unlock(&q->lock);
> > +	ep = &r;
> > +	while (*ep)
> > +		ep = &(*ep)->next;
> > +	*ep = llist_reverse_order(t);
> > +	return r;
> > +}
> 
> ERROR: modpost: "lwq_dequeue_all" [net/sunrpc/sunrpc.ko] undefined!
> ERROR: modpost: "__lwq_dequeue" [net/sunrpc/sunrpc.ko] undefined!
> make[3]: *** [/home/cel/src/linux/even-releases/scripts/Makefile.modpost:144: Module.symvers] Error 1
> make[2]: *** [/home/cel/src/linux/even-releases/Makefile:1984: modpost] Error 2
> make[1]: *** [/home/cel/src/linux/even-releases/Makefile:234: __sub-make] Error 2
> make: *** [Makefile:234: __sub-make] Error 2
> 
> You might need an EXPORT_SYMBOL_GPL or two now.

:-)  It seems something else did turn up..

Yes,
+EXPORT_SYMBOL_GPL(__lwq_dequeue);
...
+EXPORT_SYMBOL_GPL(lwq_dequeue_all);

should be enough.

Thanks,
NeilBrown




[Index of Archives]     [Linux Filesystem Development]     [Linux USB Development]     [Linux Media Development]     [Video for Linux]     [Linux NILFS]     [Linux Audio Users]     [Yosemite Info]     [Linux SCSI]

  Powered by Linux