On Wed, 10 Dec 2014 09:17:32 -0600 (CST) Christoph Lameter <cl@xxxxxxxxx> wrote: > On Wed, 10 Dec 2014, Jesper Dangaard Brouer wrote: > > > Patch1: alf_queue (Lock-Free queue) > > For some reason that key patch is not in my linux-mm archives nor in my > inbox. That is very strange! I did notice that it was somehow delayed in showing up on gmane.org (http://thread.gmane.org/gmane.linux.network/342347/focus=126148) and didn't show up on netdev either... Inserting it below (hoping my email client does not screw it up): [PATCH RFC] lib: adding an Array-based Lock-Free (ALF) queue From: Jesper Dangaard Brouer <brouer@xxxxxxxxxx> This Array-based Lock-Free (ALF) queue, is a very fast bounded Producer-Consumer queue, supporting bulking. The MPMC (Multi-Producer/Multi-Consumer) variant uses a locked cmpxchg, but the cost can be amorized by utilizing bulk enqueue/dequeue. Results on x86_64 CPU E5-2695, for variants: MPMC = Multi-Producer-Multi-Consumer SPSC = Single-Producer-Single-Consumer (none-bulking): per element cost MPMC and SPSC MPMC -- SPSC simple : 9.519 ns -- 1.282 ns multi(step:128): 12.905 ns -- 2.240 ns The majority of the cost is associated with the locked cmpxchg in the MPMC variant. Bulking helps amortize this cost: (bulking) cost per element comparing MPMC -> SPSC: MPMC -- SPSC bulk2 : 5.849 ns -- 1.748 ns bulk3 : 4.102 ns -- 1.531 ns bulk4 : 3.281 ns -- 1.383 ns bulk6 : 2.530 ns -- 1.238 ns bulk8 : 2.125 ns -- 1.196 ns bulk16: 1.552 ns -- 1.109 ns Joint work with Hannes Frederic Sowa. Signed-off-by: Jesper Dangaard Brouer <brouer@xxxxxxxxxx> Signed-off-by: Hannes Frederic Sowa <hannes@xxxxxxxxxxxxxxxxxxx> ---- Correctness of memory barries on different arch's need to be evaluated. The API might need some adjustments/discussions regarding: 1) the semantics of enq/deq when doing bulking, choosing what to do when e.g. a bulk enqueue cannot fit fully. 2) some better way to detect miss-use of API, e.g. using single-enqueue variant function call on a multi-enqueue variant data structure. Now no detection happens. --- include/linux/alf_queue.h | 303 +++++++++++++++++++++++++++++++++++++++++++++ lib/Kconfig | 13 ++ lib/Makefile | 2 lib/alf_queue.c | 47 +++++++ 4 files changed, 365 insertions(+), 0 deletions(-) create mode 100644 include/linux/alf_queue.h create mode 100644 lib/alf_queue.c diff --git a/include/linux/alf_queue.h b/include/linux/alf_queue.h new file mode 100644 index 0000000..fb1a774 --- /dev/null +++ b/include/linux/alf_queue.h @@ -0,0 +1,303 @@ +#ifndef _LINUX_ALF_QUEUE_H +#define _LINUX_ALF_QUEUE_H +/* linux/alf_queue.h + * + * ALF: Array-based Lock-Free queue + * + * Queue properties + * - Array based for cache-line optimization + * - Bounded by the array size + * - FIFO Producer/Consumer queue, no queue traversal supported + * - Very fast + * - Designed as a queue for pointers to objects + * - Bulk enqueue and dequeue support + * - Supports combinations of Multi and Single Producer/Consumer + * + * Copyright (C) 2014, Red Hat, Inc., + * by Jesper Dangaard Brouer and Hannes Frederic Sowa + * for licensing details see kernel-base/COPYING + */ +#include <linux/compiler.h> +#include <linux/kernel.h> + +struct alf_actor { + u32 head; + u32 tail; +}; + +struct alf_queue { + u32 size; + u32 mask; + u32 flags; + struct alf_actor producer ____cacheline_aligned_in_smp; + struct alf_actor consumer ____cacheline_aligned_in_smp; + void *ring[0] ____cacheline_aligned_in_smp; +}; + +struct alf_queue *alf_queue_alloc(u32 size, gfp_t gfp); +void alf_queue_free(struct alf_queue *q); + +/* Helpers for LOAD and STORE of elements, have been split-out because: + * 1. They can be reused for both "Single" and "Multi" variants + * 2. Allow us to experiment with (pipeline) optimizations in this area. + */ +static inline void +__helper_alf_enqueue_store(u32 p_head, struct alf_queue *q, + void **ptr, const u32 n) +{ + int i, index = p_head; + + for (i = 0; i < n; i++, index++) + q->ring[index & q->mask] = ptr[i]; +} + +static inline void +__helper_alf_dequeue_load(u32 c_head, struct alf_queue *q, + void **ptr, const u32 elems) +{ + int i, index = c_head; + + for (i = 0; i < elems; i++, index++) + ptr[i] = q->ring[index & q->mask]; +} + +/* Main Multi-Producer ENQUEUE + * + * Even-though current API have a "fixed" semantics of aborting if it + * cannot enqueue the full bulk size. Users of this API should check + * on the returned number of enqueue elements match, to verify enqueue + * was successful. This allow us to introduce a "variable" enqueue + * scheme later. + * + * Not preemption safe. Multiple CPUs can enqueue elements, but the + * same CPU is not allowed to be preempted and access the same + * queue. Due to how the tail is updated, this can result in a soft + * lock-up. (Same goes for alf_mc_dequeue). + */ +static inline int +alf_mp_enqueue(const u32 n; + struct alf_queue *q, void *ptr[n], const u32 n) +{ + u32 p_head, p_next, c_tail, space; + + /* Reserve part of the array for enqueue STORE/WRITE */ + do { + p_head = ACCESS_ONCE(q->producer.head); + c_tail = ACCESS_ONCE(q->consumer.tail); + + space = q->size + c_tail - p_head; + if (n > space) + return 0; + + p_next = p_head + n; + } + while (unlikely(cmpxchg(&q->producer.head, p_head, p_next) != p_head)); + + /* STORE the elems into the queue array */ + __helper_alf_enqueue_store(p_head, q, ptr, n); + smp_wmb(); /* Write-Memory-Barrier matching dequeue LOADs */ + + /* Wait for other concurrent preceding enqueues not yet done, + * this part make us none-wait-free and could be problematic + * in case of congestion with many CPUs + */ + while (unlikely(ACCESS_ONCE(q->producer.tail) != p_head)) + cpu_relax(); + /* Mark this enq done and avail for consumption */ + ACCESS_ONCE(q->producer.tail) = p_next; + + return n; +} + +/* Main Multi-Consumer DEQUEUE */ +static inline int +alf_mc_dequeue(const u32 n; + struct alf_queue *q, void *ptr[n], const u32 n) +{ + u32 c_head, c_next, p_tail, elems; + + /* Reserve part of the array for dequeue LOAD/READ */ + do { + c_head = ACCESS_ONCE(q->consumer.head); + p_tail = ACCESS_ONCE(q->producer.tail); + + elems = p_tail - c_head; + + if (elems == 0) + return 0; + else + elems = min(elems, n); + + c_next = c_head + elems; + } + while (unlikely(cmpxchg(&q->consumer.head, c_head, c_next) != c_head)); + + /* LOAD the elems from the queue array. + * We don't need a smb_rmb() Read-Memory-Barrier here because + * the above cmpxchg is an implied full Memory-Barrier. + */ + __helper_alf_dequeue_load(c_head, q, ptr, elems); + + /* Archs with weak Memory Ordering need a memory barrier here. + * As the STORE to q->consumer.tail, must happen after the + * dequeue LOADs. Dequeue LOADs have a dependent STORE into + * ptr, thus a smp_wmb() is enough. Paired with enqueue + * implicit full-MB in cmpxchg. + */ + smp_wmb(); + + /* Wait for other concurrent preceding dequeues not yet done */ + while (unlikely(ACCESS_ONCE(q->consumer.tail) != c_head)) + cpu_relax(); + /* Mark this deq done and avail for producers */ + ACCESS_ONCE(q->consumer.tail) = c_next; + + return elems; +} + +/* #define ASSERT_DEBUG_SPSC 1 */ +#ifndef ASSERT_DEBUG_SPSC +#define ASSERT(x) do { } while (0) +#else +#define ASSERT(x) \ + do { \ + if (unlikely(!(x))) { \ + pr_crit("Assertion failed %s:%d: \"%s\"\n", \ + __FILE__, __LINE__, #x); \ + BUG(); \ + } \ + } while (0) +#endif + +/* Main SINGLE Producer ENQUEUE + * caller MUST make sure preemption is disabled + */ +static inline int +alf_sp_enqueue(const u32 n; + struct alf_queue *q, void *ptr[n], const u32 n) +{ + u32 p_head, p_next, c_tail, space; + + /* Reserve part of the array for enqueue STORE/WRITE */ + p_head = q->producer.head; + smp_rmb(); /* for consumer.tail write, making sure deq loads are done */ + c_tail = ACCESS_ONCE(q->consumer.tail); + + space = q->size + c_tail - p_head; + if (n > space) + return 0; + + p_next = p_head + n; + ASSERT(ACCESS_ONCE(q->producer.head) == p_head); + q->producer.head = p_next; + + /* STORE the elems into the queue array */ + __helper_alf_enqueue_store(p_head, q, ptr, n); + smp_wmb(); /* Write-Memory-Barrier matching dequeue LOADs */ + + /* Assert no other CPU (or same CPU via preemption) changed queue */ + ASSERT(ACCESS_ONCE(q->producer.tail) == p_head); + + /* Mark this enq done and avail for consumption */ + ACCESS_ONCE(q->producer.tail) = p_next; + + return n; +} + +/* Main SINGLE Consumer DEQUEUE + * caller MUST make sure preemption is disabled + */ +static inline int +alf_sc_dequeue(const u32 n; + struct alf_queue *q, void *ptr[n], const u32 n) +{ + u32 c_head, c_next, p_tail, elems; + + /* Reserve part of the array for dequeue LOAD/READ */ + c_head = q->consumer.head; + p_tail = ACCESS_ONCE(q->producer.tail); + + elems = p_tail - c_head; + + if (elems == 0) + return 0; + else + elems = min(elems, n); + + c_next = c_head + elems; + ASSERT(ACCESS_ONCE(q->consumer.head) == c_head); + q->consumer.head = c_next; + + smp_rmb(); /* Read-Memory-Barrier matching enq STOREs */ + __helper_alf_dequeue_load(c_head, q, ptr, elems); + + /* Archs with weak Memory Ordering need a memory barrier here. + * As the STORE to q->consumer.tail, must happen after the + * dequeue LOADs. Dequeue LOADs have a dependent STORE into + * ptr, thus a smp_wmb() is enough. + */ + smp_wmb(); + + /* Assert no other CPU (or same CPU via preemption) changed queue */ + ASSERT(ACCESS_ONCE(q->consumer.tail) == c_head); + + /* Mark this deq done and avail for producers */ + ACCESS_ONCE(q->consumer.tail) = c_next; + + return elems; +} + +static inline bool +alf_queue_empty(struct alf_queue *q) +{ + u32 c_tail = ACCESS_ONCE(q->consumer.tail); + u32 p_tail = ACCESS_ONCE(q->producer.tail); + + /* The empty (and initial state) is when consumer have reached + * up with producer. + * + * DOUBLE-CHECK: Should we use producer.head, as this indicate + * a producer is in-progress(?) + */ + return c_tail == p_tail; +} + +static inline int +alf_queue_count(struct alf_queue *q) +{ + u32 c_head = ACCESS_ONCE(q->consumer.head); + u32 p_tail = ACCESS_ONCE(q->producer.tail); + u32 elems; + + /* Due to u32 arithmetic the values are implicitly + * masked/modulo 32-bit, thus saving one mask operation + */ + elems = p_tail - c_head; + /* Thus, same as: + * elems = (p_tail - c_head) & q->mask; + */ + return elems; +} + +static inline int +alf_queue_avail_space(struct alf_queue *q) +{ + u32 p_head = ACCESS_ONCE(q->producer.head); + u32 c_tail = ACCESS_ONCE(q->consumer.tail); + u32 space; + + /* The max avail space is q->size and + * the empty state is when (consumer == producer) + */ + + /* Due to u32 arithmetic the values are implicitly + * masked/modulo 32-bit, thus saving one mask operation + */ + space = q->size + c_tail - p_head; + /* Thus, same as: + * space = (q->size + c_tail - p_head) & q->mask; + */ + return space; +} + +#endif /* _LINUX_ALF_QUEUE_H */ diff --git a/lib/Kconfig b/lib/Kconfig index 54cf309..3c0cd58 100644 --- a/lib/Kconfig +++ b/lib/Kconfig @@ -439,6 +439,19 @@ config NLATTR bool # +# ALF queue +# +config ALF_QUEUE + bool "ALF: Array-based Lock-Free (Producer-Consumer) queue" + default y + help + This Array-based Lock-Free (ALF) queue, is a very fast + bounded Producer-Consumer queue, supporting bulking. The + MPMC (Multi-Producer/Multi-Consumer) variant uses a locked + cmpxchg, but the cost can be amorized by utilizing bulk + enqueue/dequeue. + +# # Generic 64-bit atomic support is selected if needed # config GENERIC_ATOMIC64 diff --git a/lib/Makefile b/lib/Makefile index 0211d2b..cd3a2d0 100644 --- a/lib/Makefile +++ b/lib/Makefile @@ -119,6 +119,8 @@ obj-$(CONFIG_DYNAMIC_DEBUG) += dynamic_debug.o obj-$(CONFIG_NLATTR) += nlattr.o +obj-$(CONFIG_ALF_QUEUE) += alf_queue.o + obj-$(CONFIG_LRU_CACHE) += lru_cache.o obj-$(CONFIG_DMA_API_DEBUG) += dma-debug.o diff --git a/lib/alf_queue.c b/lib/alf_queue.c new file mode 100644 index 0000000..d6c9b69 --- /dev/null +++ b/lib/alf_queue.c @@ -0,0 +1,47 @@ +/* + * lib/alf_queue.c + * + * ALF: Array-based Lock-Free queue + * - Main implementation in: include/linux/alf_queue.h + * + * Copyright (C) 2014, Red Hat, Inc., + * by Jesper Dangaard Brouer and Hannes Frederic Sowa + * for licensing details see kernel-base/COPYING + */ +#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt + +#include <linux/module.h> +#include <linux/slab.h> /* kzalloc */ +#include <linux/alf_queue.h> +#include <linux/log2.h> + +struct alf_queue *alf_queue_alloc(u32 size, gfp_t gfp) +{ + struct alf_queue *q; + size_t mem_size; + + if (!(is_power_of_2(size)) || size > 65536) + return ERR_PTR(-EINVAL); + + /* The ring array is allocated together with the queue struct */ + mem_size = size * sizeof(void *) + sizeof(struct alf_queue); + q = kzalloc(mem_size, gfp); + if (!q) + return ERR_PTR(-ENOMEM); + + q->size = size; + q->mask = size - 1; + + return q; +} +EXPORT_SYMBOL_GPL(alf_queue_alloc); + +void alf_queue_free(struct alf_queue *q) +{ + kfree(q); +} +EXPORT_SYMBOL_GPL(alf_queue_free); + +MODULE_DESCRIPTION("ALF: Array-based Lock-Free queue"); +MODULE_AUTHOR("Jesper Dangaard Brouer <netoptimizer@xxxxxxxxxx>"); +MODULE_LICENSE("GPL"); -- To unsubscribe, send a message with 'unsubscribe linux-mm' in the body to majordomo@xxxxxxxxx. For more info on Linux MM, see: http://www.linux-mm.org/ . Don't email: <a href=mailto:"dont@xxxxxxxxx"> email@xxxxxxxxx </a>