On Thu, 31 Aug 2023, Chuck Lever wrote:
> On Wed, Aug 30, 2023 at 12:54:48PM +1000, NeilBrown wrote:
> > lwq is a FIFO single-linked queue that only requires a spinlock
> > for dequeueing, which happens in process context.  Enqueueing is atomic
> > with no spinlock and can happen in any context.
> > 
> > Include a unit test for basic functionality - runs at boot time.  Does
> > not use kunit framework.
> > 
> > Signed-off-by: NeilBrown <neilb@xxxxxxx>
> > ---
> >  include/linux/lwq.h | 120 +++++++++++++++++++++++++++++++++++
> >  lib/Kconfig         |   5 ++
> >  lib/Makefile        |   2 +-
> >  lib/lwq.c           | 149 ++++++++++++++++++++++++++++++++++++++++++++
> >  4 files changed, 275 insertions(+), 1 deletion(-)
> >  create mode 100644 include/linux/lwq.h
> >  create mode 100644 lib/lwq.c
> > 
> > diff --git a/include/linux/lwq.h b/include/linux/lwq.h
> > new file mode 100644
> > index 000000000000..52b9c81b493a
> > --- /dev/null
> > +++ b/include/linux/lwq.h
> > @@ -0,0 +1,120 @@
> > +/* SPDX-License-Identifier: GPL-2.0-only */
> > +
> > +#ifndef LWQ_H
> > +#define LWQ_H
> > +/*
> > + * light-weight single-linked queue built from llist
> > + *
> > + * Entries can be enqueued from any context with no locking.
> > + * Entries can be dequeued from process context with integrated locking.
> > + */
> > +#include <linux/container_of.h>
> > +#include <linux/spinlock.h>
> > +#include <linux/llist.h>
> > +
> > +struct lwq_node {
> > +	struct llist_node node;
> > +};
> > +
> > +struct lwq {
> > +	spinlock_t		lock;
> > +	struct llist_node	*ready;	/* entries to be dequeued */
> > +	struct llist_head	new;	/* entries being enqueued */
> > +};
> > +
> > +/**
> > + * lwq_init - initialise a lwq
> > + * @q:	the lwq object
> > + */
> > +static inline void lwq_init(struct lwq *q)
> > +{
> > +	spin_lock_init(&q->lock);
> > +	q->ready = NULL;
> > +	init_llist_head(&q->new);
> > +}
> > +
> > +/**
> > + * lwq_empty - test if lwq contains any entry
> > + * @q:	the lwq object
> > + *
> > + * This empty test contains an acquire barrier so that if a wakeup
> > + * is sent when lwq_enqueue returns true, it is safe to go to sleep after
> > + * a test on lwq_empty().
> > + */
> > +static inline bool lwq_empty(struct lwq *q)
> > +{
> > +	/* acquire ensures ordering wrt lwq_enqueue() */
> > +	return smp_load_acquire(&q->ready) == NULL && llist_empty(&q->new);
> > +}
> > +
> > +struct llist_node *__lwq_dequeue(struct lwq *q);
> > +/**
> > + * lwq_dequeue - dequeue first (oldest) entry from lwq
> > + * @q:	the queue to dequeue from
> > + * @type:	the type of object to return
> > + * @member:	the member in the returned object which is an lwq_node.
> > + *
> > + * Remove a single object from the lwq and return it.  This will take
> > + * a spinlock and so must always be called in the same context, typically
> > + * process context.
> > + */
> > +#define lwq_dequeue(q, type, member)					\
> > +	({ struct llist_node *_n = __lwq_dequeue(q);			\
> > +	   _n ? container_of(_n, type, member.node) : NULL; })
> > +
> > +struct llist_node *lwq_dequeue_all(struct lwq *q);
> > +
> > +/**
> > + * lwq_for_each_safe - iterate over detached queue allowing deletion
> > + * @_n:	iterator variable
> > + * @_t1:	temporary struct llist_node **
> > + * @_t2:	temporary struct llist_node *
> > + * @_l:	address of llist_node pointer from lwq_dequeue_all()
> > + * @_member:	member in _n where lwq_node is found.
> > + *
> > + * Iterate over members in a dequeued list.  If the iterator variable
> > + * is set to NULL, the iterator removes that entry from the queue.
> > + */
> > +#define lwq_for_each_safe(_n, _t1, _t2, _l, _member)		\
> > +	for (_t1 = (_l);					\
> > +	     *(_t1) ? (_n = container_of(*(_t1), typeof(*(_n)), _member.node),\
> > +		       _t2 = ((*_t1)->next),			\
> > +		       true)					\
> > +		    : false;					\
> > +	     (_n) ? (_t1 = &(_n)->_member.node.next, 0)		\
> > +		  : ((*(_t1) = (_t2)), 0))
> > +
> > +/**
> > + * lwq_enqueue - add a new item to the end of the queue
> > + * @n:	the lwq_node embedded in the item to be added
> > + * @q:	the lwq to append to.
> > + *
> > + * No locking is needed to append to the queue so this can
> > + * be called from any context.
> > + * Return %true if the list may have previously been empty.
> > + */
> > +static inline bool lwq_enqueue(struct lwq_node *n, struct lwq *q)
> > +{
> > +	/* acquire ensures ordering wrt lwq_dequeue */
> > +	return llist_add(&n->node, &q->new) &&
> > +		smp_load_acquire(&q->ready) == NULL;
> > +}
> > +
> > +/**
> > + * lwq_enqueue_batch - add a list of new items to the end of the queue
> > + * @n:	the lwq_node embedded in the first item to be added
> > + * @q:	the lwq to append to.
> > + *
> > + * No locking is needed to append to the queue so this can
> > + * be called from any context.
> > + * Return %true if the list may have previously been empty.
> > + */
> > +static inline bool lwq_enqueue_batch(struct llist_node *n, struct lwq *q)
> > +{
> > +	struct llist_node *e = n;
> > +
> > +	/* acquire ensures ordering wrt lwq_dequeue */
> > +	return llist_add_batch(llist_reverse_order(n), e, &q->new) &&
> > +		smp_load_acquire(&q->ready) == NULL;
> > +}
> > +#endif /* LWQ_H */
> > diff --git a/lib/Kconfig b/lib/Kconfig
> > index 5c2da561c516..6620bdba4f94 100644
> > --- a/lib/Kconfig
> > +++ b/lib/Kconfig
> > @@ -763,3 +763,8 @@ config ASN1_ENCODER
> >  
> >  config POLYNOMIAL
> >  	tristate
> > +
> > +config LWQ_TEST
> > +	bool "RPC: enable boot-time test for lwq queuing"
> 
> Since LWQ is no longer RPC specific, you can drop the "RPC: " from
> the option's short description.

Thanks.  I changed "RPC" to "lib" locally.
If I need to resend, that will be included, or you could just make the
change if nothing else turns up.

NeilBrown
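---

A minimal sketch of how a producer/consumer pair might use the API in
the patch above.  The names struct foo_req, foo_setup(), foo_submit(),
foo_worker() and foo_wait are invented for the illustration; only the
lwq_* calls and standard kernel primitives are real.  The producer may
run in any context and only issues a wakeup when lwq_enqueue() reports
the queue may have been empty; the consumer runs in process context and
can safely sleep after testing lwq_empty() thanks to its acquire
barrier.

#include <linux/kthread.h>
#include <linux/lwq.h>
#include <linux/slab.h>
#include <linux/wait.h>

struct foo_req {
	struct lwq_node	link;	/* embedded queue linkage */
	int		data;
};

static struct lwq foo_queue;
static DECLARE_WAIT_QUEUE_HEAD(foo_wait);

static void foo_setup(void)
{
	lwq_init(&foo_queue);	/* must run before any enqueue/dequeue */
}

/* Producer: takes no lock, so safe in any context, even hard irq. */
static void foo_submit(struct foo_req *req)
{
	/* lwq_enqueue() returns true if the queue may previously have
	 * been empty, i.e. only then is a wakeup needed.
	 */
	if (lwq_enqueue(&req->link, &foo_queue))
		wake_up(&foo_wait);
}

/* Consumer, run as a kthread: process context only, because
 * lwq_dequeue() takes the internal spinlock.
 */
static int foo_worker(void *arg)
{
	while (!kthread_should_stop()) {
		struct foo_req *req;

		req = lwq_dequeue(&foo_queue, struct foo_req, link);
		if (!req) {
			/* lwq_empty() has acquire semantics, so a sleep
			 * guarded by it cannot miss a wakeup from
			 * foo_submit().
			 */
			wait_event(foo_wait, !lwq_empty(&foo_queue) ||
					     kthread_should_stop());
			continue;
		}
		/* ... handle req->data ... */
		kfree(req);
	}
	return 0;
}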
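A second sketch, built on the same hypothetical struct foo_req, showing
lwq_dequeue_all() together with lwq_for_each_safe(): the whole queue is
detached in FIFO order, entries matching a key are unlinked (by setting
the iterator variable to NULL, as the kernel-doc above describes) and
freed, and the survivors are put back with lwq_enqueue_batch().  Note
two caveats that follow from the code above: llist_add_batch()
dereferences the batch tail, so it must not be called with an empty
list, and entries enqueued while the list was detached will be dequeued
ahead of the re-queued survivors (the survivors do keep their relative
order).

/* Cancel all queued foo_req entries whose data matches @key. */
static void foo_cancel(struct lwq *q, int key)
{
	struct llist_node *list = lwq_dequeue_all(q);	/* oldest first */
	struct llist_node **t1, *t2;
	struct foo_req *req;

	lwq_for_each_safe(req, t1, t2, &list, link) {
		if (req->data == key) {
			/* t2 already holds ->next, so freeing here is safe */
			kfree(req);
			req = NULL;	/* ask the iterator to unlink this entry */
		}
	}
	/* Re-queue whatever survived; skip the call if nothing did. */
	if (list)
		lwq_enqueue_batch(list, q);
}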