A simple array based FIFO of pointers. Intended for net stack which commonly has a single consumer/producer. Signed-off-by: Michael S. Tsirkin <mst@xxxxxxxxxx> --- include/linux/ptr_ring.h | 267 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 267 insertions(+) create mode 100644 include/linux/ptr_ring.h diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h new file mode 100644 index 0000000..b40af9a --- /dev/null +++ b/include/linux/ptr_ring.h @@ -0,0 +1,267 @@ +/* + * Definitions for the 'struct ptr_ring' datastructure. + * + * Author: + * Michael S. Tsirkin <mst@xxxxxxxxxx> + * + * Copyright (C) 2016 Red Hat, Inc. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This is a limited-size FIFO maintaining pointers in FIFO order, with + * one CPU producing entries and another consuming entries from a FIFO. + * + * This implementation tries to minimize cache-contention when there is a + * single producer and a single consumer CPU. + */ + +#ifndef _LINUX_PTR_RING_H +#define _LINUX_PTR_RING_H 1 + +#ifdef __KERNEL__ +#include <linux/spinlock.h> +#include <linux/cache.h> +#include <linux/types.h> +#include <linux/compiler.h> +#include <linux/cache.h> +#include <linux/slab.h> +#include <asm/errno.h> +#endif + +struct ptr_ring { + int producer ____cacheline_aligned_in_smp; + spinlock_t producer_lock; + int consumer ____cacheline_aligned_in_smp; + spinlock_t consumer_lock; + /* Shared consumer/producer data */ + /* Read-only by both the producer and the consumer */ + int size ____cacheline_aligned_in_smp; /* max entries in queue */ + void **queue; +}; + +/* Note: callers invoking this in a loop must use a compiler barrier, + * for example cpu_relax(). + * Callers don't need to take producer lock - if they don't + * the next call to __ptr_ring_produce may fail. + */ +static inline bool __ptr_ring_full(struct ptr_ring *r) +{ + return r->queue[r->producer]; +} + +static inline bool ptr_ring_full(struct ptr_ring *r) +{ + barrier(); + return __ptr_ring_full(r); +} + +/* Note: callers invoking this in a loop must use a compiler barrier, + * for example cpu_relax(). + */ +static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr) +{ + if (__ptr_ring_full(r)) + return -ENOSPC; + + r->queue[r->producer++] = ptr; + if (unlikely(r->producer >= r->size)) + r->producer = 0; + return 0; +} + +static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr) +{ + int ret; + + spin_lock(&r->producer_lock); + ret = __ptr_ring_produce(r, ptr); + spin_unlock(&r->producer_lock); + + return ret; +} + +static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr) +{ + int ret; + + spin_lock_irq(&r->producer_lock); + ret = __ptr_ring_produce(r, ptr); + spin_unlock_irq(&r->producer_lock); + + return ret; +} + +static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr) +{ + unsigned long flags; + int ret; + + spin_lock_irqsave(&r->producer_lock, flags); + ret = __ptr_ring_produce(r, ptr); + spin_unlock_irqrestore(&r->producer_lock, flags); + + return ret; +} + +static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr) +{ + int ret; + + spin_lock_bh(&r->producer_lock); + ret = __ptr_ring_produce(r, ptr); + spin_unlock_bh(&r->producer_lock); + + return ret; +} + +/* Note: callers invoking this in a loop must use a compiler barrier, + * for example cpu_relax(). Callers must take consumer_lock + * if they dereference the pointer - see e.g. PTR_RING_PEEK_FIELD. + * There's no need for a lock if pointer is merely tested - see e.g. + * ptr_ring_empty. + */ +static inline void *__ptr_ring_peek(struct ptr_ring *r) +{ + return r->queue[r->consumer]; +} + +static inline bool ptr_ring_empty(struct ptr_ring *r) +{ + barrier(); + return !__ptr_ring_peek(r); +} + +/* Must only be called after __ptr_ring_peek returned !NULL */ +static inline void __ptr_ring_discard_one(struct ptr_ring *r) +{ + r->queue[r->consumer++] = NULL; + if (unlikely(r->consumer >= r->size)) + r->consumer = 0; +} + +static inline void *__ptr_ring_consume(struct ptr_ring *r) +{ + void *ptr; + + ptr = __ptr_ring_peek(r); + if (ptr) + __ptr_ring_discard_one(r); + + return ptr; +} + +static inline void *ptr_ring_consume(struct ptr_ring *r) +{ + void *ptr; + + spin_lock(&r->consumer_lock); + ptr = __ptr_ring_consume(r); + spin_unlock(&r->consumer_lock); + + return ptr; +} + +static inline void *ptr_ring_consume_irq(struct ptr_ring *r) +{ + void *ptr; + + spin_lock_irq(&r->consumer_lock); + ptr = __ptr_ring_consume(r); + spin_unlock_irq(&r->consumer_lock); + + return ptr; +} + +static inline void *ptr_ring_consume_any(struct ptr_ring *r) +{ + unsigned long flags; + void *ptr; + + spin_lock_irqsave(&r->consumer_lock, flags); + ptr = __ptr_ring_consume(r); + spin_unlock_irqrestore(&r->consumer_lock, flags); + + return ptr; +} + +static inline void *ptr_ring_consume_bh(struct ptr_ring *r) +{ + void *ptr; + + spin_lock(&r->consumer_lock); + ptr = __ptr_ring_consume(r); + spin_unlock(&r->consumer_lock); + + return ptr; +} + +/* Cast to structure type and peek at a field without discarding from FIFO. + * Callers must take consumer_lock. + */ +#define __PTR_RING_PEEK_FIELD(r, type, field, dflt) ({ \ + type *__PTR_RING_PEEK_FIELD_p = __ptr_ring_peek(r); \ + \ + __PTR_RING_PEEK_FIELD_p ? __PTR_RING_PEEK_FIELD_p->field : (dflt); \ +}) + +#define PTR_RING_PEEK_FIELD(r, type, field, dflt) ({ \ + typeof(((type *)0)->field) __PTR_RING_PEEK_FIELD_v; \ + \ + spin_lock(&(r)->consumer_lock); \ + __PTR_RING_PEEK_FIELD_v = __PTR_RING_PEEK_FIELD(r, type, field, dflt); \ + spin_unlock(&(r)->consumer_lock); \ + __PTR_RING_PEEK_FIELD_v; \ +}) + +#define PTR_RING_PEEK_FIELD_IRQ(r, type, field, dflt) ({ \ + typeof(((type *)0)->field) __PTR_RING_PEEK_FIELD_v; \ + \ + spin_lock_irq(&(r)->consumer_lock); \ + __PTR_RING_PEEK_FIELD_v = __PTR_RING_PEEK_FIELD(r, type, field, dflt); \ + spin_unlock_irq(&(r)->consumer_lock); \ + __PTR_RING_PEEK_FIELD_v; \ +}) + +#define PTR_RING_PEEK_FIELD_BH(r, type, field, dflt) ({ \ + typeof(((type *)0)->field) __PTR_RING_PEEK_FIELD_v; \ + \ + spin_lock_bh(&(r)->consumer_lock); \ + __PTR_RING_PEEK_FIELD_v = __PTR_RING_PEEK_FIELD(r, type, field, dflt); \ + spin_unlock_bh(&(r)->consumer_lock); \ + __PTR_RING_PEEK_FIELD_v; \ +}) + +#define PTR_RING_PEEK_FIELD_ANY(r, type, field, dflt) ({ \ + typeof(((type *)0)->field) __PTR_RING_PEEK_FIELD_v; \ + unsigned long __PTR_RING_PEEK_FIELD_f;\ + \ + spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_FIELD_f); \ + __PTR_RING_PEEK_FIELD_v = __PTR_RING_PEEK_FIELD(r, type, field, dflt); \ + spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_FIELD_f); \ + __PTR_RING_PEEK_FIELD_v; \ +}) + +static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp) +{ + r->queue = kzalloc(ALIGN(size * sizeof *(r->queue), SMP_CACHE_BYTES), + gfp); + if (!r->queue) + return -ENOMEM; + + r->size = size; + r->producer = r->consumer = 0; + spin_lock_init(&r->producer_lock); + spin_lock_init(&r->consumer_lock); + + return 0; +} + +static inline void ptr_ring_cleanup(struct ptr_ring *r) +{ + kfree(r->queue); +} + +#endif /* _LINUX_PTR_RING_H */ -- MST -- To unsubscribe from this list: send the line "unsubscribe kvm" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html