Re: [RFC PATCH 1/4] hazptr: Add initial implementation of hazard pointers

On Sep 17, 2024, at 22:33, Boqun Feng <boqun.feng@xxxxxxxxx> wrote:
> 
> Hazard pointers [1] provide a way to dynamically distribute refcounting
> and can be used to improve the scalability of refcounting without
> significant space cost.
> 
> Hazard pointers are similar to RCU: they build synchronization
> between two parts, readers and updaters. Readers are refcount users:
> they acquire and release refcounts. Updaters clean up objects when no
> readers are referencing them (via call_hazptr()). The difference is
> that instead of waiting for a grace period, hazard pointers can free
> an object as soon as no one is referencing it. This means that for a
> particular workload, hazard pointers may have a smaller memory
> footprint due to fewer pending callbacks.
> 
> The synchronization between readers and updaters is built around "hazard
> pointer slots": a slot readers can use to store a pointer value.
> 
> Reader side protection:
> 
> 1. Read the value of a pointer to the target data element.
> 2. Store it to a hazard pointer slot.
> 3. Enforce full ordering (e.g. smp_mb()).
> 4. Re-read the original pointer, reset the slot and retry if the
> value changed.
> 5. Otherwise, the continued existence of the target data element
> is guaranteed.
> 
> Updater side check:
> 
> 1. Unpublish the target data element (e.g. setting the pointer
> value to NULL).
> 2. Enforce full ordering.
> 3. Read the value from a hazard pointer slot.
> 4. If the value doesn't match the target data element, then this
> slot doesn't represent a reference to it.
> 5. Otherwise, the updater needs to re-check (go back to step 3).
> 
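To make sure I am reading the API right: a reader would look roughly like
the sketch below. struct foo, gp, my_ctx and do_something() are made-up
names for illustration, and init_hazptr_context(&my_ctx) is assumed to
have been called already:

	struct foo {
		int data;
		struct callback_head rh;	/* passed to call_hazptr() on removal */
	};

	struct foo *gp;				/* published pointer, managed by updaters */
	static struct hazptr_context my_ctx;	/* init_hazptr_context(&my_ctx) done elsewhere */

	static void reader(void)
	{
		hazptr_t *hzp = hazptr_alloc(&my_ctx);
		struct foo *p;

		if (!hzp)
			return;		/* all HAZPTR_SLOT_PER_CTX slots in use */

		p = hazptr_tryprotect(hzp, gp, rh);
		if (p) {
			/* *p cannot be freed until the hazptr_clear() below. */
			do_something(p->data);
			hazptr_clear(hzp);
		}
		hazptr_free(&my_ctx, hzp);
	}
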
> To distribute accesses to hazptr slots across different contexts,
> hazptr_context is introduced. Users need to define/allocate their own
> hazptr_context to allocate hazard pointer slots from.
> 
> For the updater side to confirm there are no existing references, it
> needs to scan all the possible slots. To speed up this process,
> hazptr_context also contains an rbtree node for each slot so that the
> updater can cache the reader-scan results in an rbtree. The rbtree
> nodes are pre-allocated in this way to prevent "allocate memory to
> free memory" situations in extreme cases.
> 
> [1]: M. M. Michael, "Hazard pointers: safe memory reclamation for
>     lock-free objects," in IEEE Transactions on Parallel and
>     Distributed Systems, vol. 15, no. 6, pp. 491-504, June 2004
> 
> Co-developed-by: Paul E. McKenney <paulmck@xxxxxxxxxx>
> Signed-off-by: Paul E. McKenney <paulmck@xxxxxxxxxx>
> Co-developed-by: Neeraj Upadhyay <neeraj.upadhyay@xxxxxxx>
> Signed-off-by: Neeraj Upadhyay <neeraj.upadhyay@xxxxxxx>
> Signed-off-by: Boqun Feng <boqun.feng@xxxxxxxxx>
> ---
> include/linux/hazptr.h |  83 ++++++++
> kernel/Makefile        |   1 +
> kernel/hazptr.c        | 463 +++++++++++++++++++++++++++++++++++++++++
> 3 files changed, 547 insertions(+)
> create mode 100644 include/linux/hazptr.h
> create mode 100644 kernel/hazptr.c
> 
> diff --git a/include/linux/hazptr.h b/include/linux/hazptr.h
> new file mode 100644
> index 000000000000..4548ca8c75eb
> --- /dev/null
> +++ b/include/linux/hazptr.h
> @@ -0,0 +1,83 @@
> +/* SPDX-License-Identifier: GPL-2.0 */
> +#ifndef _LINUX_HAZPTR_H
> +#define _LINUX_HAZPTR_H
> +
> +#include <linux/list.h>
> +#include <linux/spinlock.h>
> +#include <linux/rbtree.h>
> +
> +/* Hazard pointer slot. */
> +typedef void* hazptr_t;
> +
> +#define HAZPTR_SLOT_PER_CTX 8
> +
> +struct hazptr_slot_snap {
> + struct rb_node node;
> + hazptr_t slot;
> +};
> +
> +/*
> + * A set of hazard pointer slots for a context.
> + *
> + * The context can be a task, CPU or reader (depending on the use case). It
> + * may be allocated statically or dynamically. It can only be used after
> + * calling init_hazptr_context(), and users are also responsible for calling
> + * cleanup_hazptr_context() when it is no longer used.
> + */
> +struct hazptr_context {
> + // The lock of the percpu context lists.
> + spinlock_t *lock;
> +
> + struct list_head list;
> + struct hazptr_slot_snap snaps[HAZPTR_SLOT_PER_CTX];
> + ____cacheline_aligned hazptr_t slots[HAZPTR_SLOT_PER_CTX];
> +};
> +
> +void init_hazptr_context(struct hazptr_context *hzcp);
> +void cleanup_hazptr_context(struct hazptr_context *hzcp);
> +hazptr_t *hazptr_alloc(struct hazptr_context *hzcp);
> +void hazptr_free(struct hazptr_context *hzcp, hazptr_t *hzp);
> +
> +#define hazptr_tryprotect(hzp, gp, field) (typeof(gp))__hazptr_tryprotect(hzp, (void **)&(gp), offsetof(typeof(*gp), field))
> +#define hazptr_protect(hzp, gp, field) ({ \
> + typeof(gp) ___p; \
> + \
> + ___p = hazptr_tryprotect(hzp, gp, field); \
> + BUG_ON(!___p); \

hazptr_tryprotect() can return NULL (either because gp was NULL or because
the pointer changed between the two reads), so this BUG_ON() can fire on a
benign race. Do you need a retry loop here instead? (See the sketch after
the macro below.)

> + ___p; \
> +})
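
To expand on my question above: if the failure we care about is the
pointer changing between the two reads (rather than gp being NULL), an
untested variant like the one below would retry instead of hitting the
BUG_ON(), at the cost of returning NULL once gp really is unpublished:

	#define hazptr_protect(hzp, gp, field) ({			\
		typeof(gp) ___p;					\
									\
		do {							\
			___p = hazptr_tryprotect(hzp, gp, field);	\
		} while (!___p && READ_ONCE(gp));			\
		___p;							\
	})

Callers would then need to handle the NULL case themselves.
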
> +
> +static inline void *__hazptr_tryprotect(hazptr_t *hzp,
> + void *const *p,
> + unsigned long head_offset)
> +{
> + void *ptr;
> + struct callback_head *head;
> +
> + ptr = READ_ONCE(*p);
> +
> + if (ptr == NULL)
> + return NULL;
> +
> + head = (struct callback_head *)(ptr + head_offset);
> +
> + WRITE_ONCE(*hzp, head);
> + smp_mb();
> +
> + ptr = READ_ONCE(*p); // read again
> +
> + if (ptr + head_offset != head) { // pointer changed
> + WRITE_ONCE(*hzp, NULL);  // reset hazard pointer
> + return NULL;
> + } else
> + return ptr;
> +}
> +
> +static inline void hazptr_clear(hazptr_t *hzp)
> +{
> + /* Pairs with smp_load_acquire() in reader scan. */
> + smp_store_release(hzp, NULL);
> +}
> +
> +void call_hazptr(struct callback_head *head, rcu_callback_t func);
> +#endif
> diff --git a/kernel/Makefile b/kernel/Makefile
> index 3c13240dfc9f..7927264b9870 100644
> --- a/kernel/Makefile
> +++ b/kernel/Makefile
> @@ -50,6 +50,7 @@ obj-y += rcu/
> obj-y += livepatch/
> obj-y += dma/
> obj-y += entry/
> +obj-y += hazptr.o
> obj-$(CONFIG_MODULES) += module/
> 
> obj-$(CONFIG_KCMP) += kcmp.o
> diff --git a/kernel/hazptr.c b/kernel/hazptr.c
> new file mode 100644
> index 000000000000..f22ccc2a4a62
> --- /dev/null
> +++ b/kernel/hazptr.c
> @@ -0,0 +1,463 @@
> +/* SPDX-License-Identifier: GPL-2.0 */
> +
> +#include <linux/spinlock.h>
> +#include <linux/cleanup.h>
> +#include <linux/hazptr.h>
> +#include <linux/percpu.h>
> +#include <linux/workqueue.h>
> +
> +#define HAZPTR_UNUSED (1ul)
> +
> +/* Per-CPU data for hazard pointer management. */
> +struct hazptr_percpu {
> + spinlock_t ctx_lock; /* hazptr context list lock */
> + struct list_head ctx_list; /* hazptr context list */
> + spinlock_t callback_lock; /* Per-CPU callback queue lock */
> + struct callback_head *queued; /* Per-CPU callback queue */
> + struct callback_head **tail;
> +};
> +
> +/*
> + * Per-CPU data contains context lists and callbacks, which are maintained in a
> + * per-CPU manner to reduce potential contention. This means a global scan (for
> + * readers or callbacks) has to take each CPU's lock, but it's less problematic
> + * because that's a slowpath.
> + */
> +DEFINE_PER_CPU(struct hazptr_percpu, hzpcpu);
> +
> +/* An rbtree that stores the reader-scan results of all hazptr slots. */
> +struct hazptr_reader_tree {
> + spinlock_t lock;
> + struct rb_root root;
> +};
> +
> +static void init_hazptr_reader_tree(struct hazptr_reader_tree *tree)
> +{
> + spin_lock_init(&tree->lock);
> + tree->root = RB_ROOT;
> +}
> +
> +static bool is_null_or_unused(hazptr_t slot)
> +{
> + return slot == NULL || ((unsigned long)slot) == HAZPTR_UNUSED;
> +}
> +
> +static int hazptr_node_cmp(const void *key, const struct rb_node *curr)
> +{
> + unsigned long slot = (unsigned long)key;
> + struct hazptr_slot_snap *curr_snap = container_of(curr, struct hazptr_slot_snap, node);
> + unsigned long curr_slot = (unsigned long)(curr_snap->slot);
> +
> + if (slot < curr_slot)
> + return -1;
> + else if (slot > curr_slot)
> + return 1;
> + else
> + return 0;
> +}
> +
> +static bool hazptr_node_less(struct rb_node *new, const struct rb_node *curr)
> +{
> + struct hazptr_slot_snap *new_snap = container_of(new, struct hazptr_slot_snap, node);
> +
> + return hazptr_node_cmp((void *)new_snap->slot, curr) < 0;
> +}
> +
> +/* Add the snapshot into a search tree. tree->lock must be held. */
> +static inline void reader_add_locked(struct hazptr_reader_tree *tree,
> +     struct hazptr_slot_snap *snap)
> +{
> + lockdep_assert_held(&tree->lock);
> + BUG_ON(is_null_or_unused(snap->slot));
> +
> + rb_add(&snap->node, &tree->root, hazptr_node_less);
> +}
> +
> +static inline void reader_add(struct hazptr_reader_tree *tree,
> +      struct hazptr_slot_snap *snap)
> +{
> + guard(spinlock_irqsave)(&tree->lock);
> +
> + reader_add_locked(tree, snap);
> +}
> +
> +/* Delete the snapshot from a search tree. tree->lock must be held. */
> +static inline void reader_del_locked(struct hazptr_reader_tree *tree,
> +     struct hazptr_slot_snap *snap)
> +{
> + lockdep_assert_held(&tree->lock);
> +
> + rb_erase(&snap->node, &tree->root);
> +}
> +
> +static inline void reader_del(struct hazptr_reader_tree *tree,
> +      struct hazptr_slot_snap *snap)
> +{
> + guard(spinlock_irqsave)(&tree->lock);
> +
> + reader_del_locked(tree, snap);
> +}
> +
> +/* Find whether a reader exists. tree->lock must be held. */
> +static inline bool reader_exist_locked(struct hazptr_reader_tree *tree,
> +       unsigned long slot)
> +{
> + lockdep_assert_held(&tree->lock);
> +
> + return !!rb_find((void *)slot, &tree->root, hazptr_node_cmp);
> +}
> +
> +static inline bool reader_exist(struct hazptr_reader_tree *tree,
> + unsigned long slot)
> +{
> + guard(spinlock_irqsave)(&tree->lock);
> +
> + return reader_exist_locked(tree, slot);
> +}
> +
> +/*
> + * Scan the readers from one hazptr context and update the global readers tree.
> + *
> + * Must be called with hzcp->lock held.
> + */
> +static void hazptr_context_snap_readers_locked(struct hazptr_reader_tree *tree,
> +       struct hazptr_context *hzcp)
> +{
> + lockdep_assert_held(hzcp->lock);
> +
> + for (int i = 0; i < HAZPTR_SLOT_PER_CTX; i++) {
> + /*
> + * Pairs with smp_store_release() in hazptr_{clear,free}().
> + *
> + * Ensure
> + *
> + * <reader> <updater>
> + *
> + * [access protected pointers]
> + * hazptr_clear();
> + *   smp_store_release()
> + * // in reader scan.
> + * smp_load_acquire(); // is null or unused.
> + * [run callbacks] // all accesses from
> + * // reader must be
> + * // observed.
> + */
> + hazptr_t val = smp_load_acquire(&hzcp->slots[i]);
> +
> + if (!is_null_or_unused(val)) {
> + struct hazptr_slot_snap *snap = &hzcp->snaps[i];
> +
> + // Already in the tree, need to remove first.
> + if (!is_null_or_unused(snap->slot)) {
> + reader_del(tree, snap);
> + }
> + snap->slot = val;
> + reader_add(tree, snap);
> + }
> + }
> +}
> +
> +/*
> + * Initialize a hazptr context.
> + *
> + * Must be called before using the context for hazptr allocation.
> + */
> +void init_hazptr_context(struct hazptr_context *hzcp)
> +{
> + struct hazptr_percpu *pcpu = this_cpu_ptr(&hzpcpu);
> +
> + for (int i = 0; i < HAZPTR_SLOT_PER_CTX; i++) {
> + hzcp->slots[i] = (hazptr_t)HAZPTR_UNUSED;
> + hzcp->snaps[i].slot = (hazptr_t)HAZPTR_UNUSED;
> + }
> +
> + guard(spinlock_irqsave)(&pcpu->ctx_lock);
> + list_add(&hzcp->list, &pcpu->ctx_list);
> + hzcp->lock = &pcpu->ctx_lock;
> +}
> +
> +struct hazptr_struct {
> + struct work_struct work;
> + bool scheduled;
> +
> + // List of callbacks: per-CPU queued callbacks are moved into this
> + // global queue in the workqueue function.
> + spinlock_t callback_lock;
> + struct callback_head *queued;
> + struct callback_head **tail;
> +
> + struct hazptr_reader_tree readers;
> +};
> +
> +static struct hazptr_struct hazptr_struct;
> +
> +/*
> + * Clean up hazptr context.
> + *
> + * Must be called before freeing the context. This function also removes any
> + * reference held by the hazard pointer slots in the context, even if
> + * hazptr_clear() or hazptr_free() was not called previously.
> + */
> +void cleanup_hazptr_context(struct hazptr_context *hzcp)
> +{
> + if (hzcp->lock) {
> + scoped_guard(spinlock_irqsave, hzcp->lock) {
> + list_del(&hzcp->list);
> + hzcp->lock = NULL;
> + }
> +
> + for (int i = 0; i < HAZPTR_SLOT_PER_CTX; i++) {
> + struct hazptr_slot_snap *snap = &hzcp->snaps[i];
> +
> + if (!is_null_or_unused(snap->slot))
> + reader_del(&hazptr_struct.readers, snap);
> + }
> + }
> +}
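
Just to confirm the intended lifecycle for a dynamically allocated
context; a sketch of what I have in mind, with error handling trimmed:

	struct hazptr_context *ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);

	if (ctx) {
		init_hazptr_context(ctx);

		/* hazptr_alloc()/hazptr_free() plus protect/clear go here. */

		cleanup_hazptr_context(ctx);	/* also drops any slots still set */
		kfree(ctx);
	}
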
> +
> +/*
> + * Allocate a hazptr slot from a hazptr_context.
> + *
> + * Return: NULL if allocation fails, otherwise the address of the allocated
> + * slot.
> + */
> +hazptr_t *hazptr_alloc(struct hazptr_context *hzcp)
> +{
> + unsigned long unused;
> +
> + for (int i = 0; i < HAZPTR_SLOT_PER_CTX; i++) {
> + if (((unsigned long)READ_ONCE(hzcp->slots[i])) == HAZPTR_UNUSED) {
> + unused = HAZPTR_UNUSED;
> +
> + /*
> + * rmw-sequence is relied on here.
> + *
> + * This is needed for the case of a previous reader:
> + *
> + * <reader 1> <reader 2> <updater>
> + * [access protected pointers]
> + * hazptr_free():
> + *   smp_store_release(); // hzptr == UNUSED
> + * hazptr_alloc():
> + *  try_cmpxchg_relaxed(); // hzptr == NULL
> + *
> + * // in reader scan
> + * smp_load_acquire(); // is null
> + * [run callbacks]
> + *
> + * Because of the rmw-sequence of release operations,
> + * when callbacks are run, accesses from reader 1 must
> + * already be observed by the updater.
> + */
> + if (try_cmpxchg_relaxed(&hzcp->slots[i], &unused, NULL)) {
> + return (hazptr_t *)&hzcp->slots[i];
> + }
> + }
> + }
> +
> + return NULL;
> +}
> +
> +/* Free a hazptr slot. */
> +void hazptr_free(struct hazptr_context *hzcp, hazptr_t *hzptr)
> +{
> + WARN_ON(((unsigned long)*hzptr) == HAZPTR_UNUSED);
> +
> + /* Pairs with smp_load_acquire() in reader scan */
> + smp_store_release(hzptr, (void *)HAZPTR_UNUSED);
> +}
> +
> +/* Scan all possible readers, and update the global reader tree. */
> +static void check_readers(struct hazptr_struct *hzst)
> +{
> + int cpu;
> +
> + for_each_possible_cpu(cpu) {
> + struct hazptr_percpu *pcpu = per_cpu_ptr(&hzpcpu, cpu);
> + struct hazptr_context *ctx;
> +
> + guard(spinlock_irqsave)(&pcpu->ctx_lock);
> + list_for_each_entry(ctx, &pcpu->ctx_list, list)
> + hazptr_context_snap_readers_locked(&hzst->readers, ctx);
> + }
> +}
> +
> +/*
> + * Start the background work for hazptr callback handling if not started.
> + *
> + * Must be called with hazptr_struct lock held.
> + */
> +static void kick_hazptr_work(void)
> +{
> + if (hazptr_struct.scheduled)
> + return;
> +
> + queue_work(system_wq, &hazptr_struct.work);
> + hazptr_struct.scheduled = true;
> +}
> +
> +/*
> + * Check which callbacks are ready to be called.
> + *
> + * Return: a list of callbacks whose corresponding objects are not referenced
> + * by any reader.
> + */
> +static struct callback_head *do_hazptr(struct hazptr_struct *hzst)
> +{
> + struct callback_head *tmp, **curr;
> + struct callback_head *todo = NULL, **todo_tail = &todo;
> +
> + // Currently, the lock is unnecessary, but maybe we will have multiple
> + // work_structs sharing the same callback list in the future.
> + guard(spinlock_irqsave)(&hzst->callback_lock);
> +
> + curr = &hzst->queued;
> +
> + while ((tmp = *curr)) {
> + // No reader, we can free the object.
> + if (!reader_exist(&hzst->readers, (unsigned long)tmp)) {
> + // Add tmp into todo.
> + *todo_tail = tmp;
> + todo_tail = &tmp->next;
> +
> + // Delete tmp from ->queued and move to the next entry.
> + *curr = tmp->next;
> + } else
> + curr = &tmp->next;
> + }
> +
> + hzst->tail = curr;
> +
> + // Keep checking if callback list is not empty.
> + if (hzst->queued)
> + kick_hazptr_work();
> +
> + *todo_tail = NULL;
> +
> + return todo;
> +}
> +
> +static void hazptr_work_func(struct work_struct *work)
> +{
> + struct hazptr_struct *hzst = container_of(work, struct hazptr_struct, work);
> + struct callback_head *todo;
> +
> + // Advance callbacks from hzpcpu to hzst
> + scoped_guard(spinlock_irqsave, &hzst->callback_lock) {
> + int cpu;
> +
> + hzst->scheduled = false;
> + for_each_possible_cpu(cpu) {
> + struct hazptr_percpu *pcpu = per_cpu_ptr(&hzpcpu, cpu);
> +
> + guard(spinlock)(&pcpu->callback_lock);
> +
> + if (pcpu->queued) {
> + *(hzst->tail) = pcpu->queued;
> + hzst->tail = pcpu->tail;
> + pcpu->queued = NULL;
> + pcpu->tail = &pcpu->queued;
> + }
> + }
> + }
> +
> + // Pairs with the smp_mb() on the reader side. This guarantees that if
> + // the hazptr work picked up the callback from an updater, and the
> + // updater set the global pointer to NULL before enqueueing the callback,
> + // then the hazptr work must observe any reader that protected the hazptr
> + // before the updater's unpublish.
> + //
> + // <reader> <updater> <hazptr work>
> + // ptr = READ_ONCE(gp);
> + // WRITE_ONCE(*hazptr, ptr);
> + // smp_mb(); // => ->strong-fence
> + // tofree = gp;
> + // ptr = READ_ONCE(gp); // re-read, gp is not NULL
> + // // => ->fre
> + // WRITE_ONCE(gp, NULL);
> + // call_hazptr(gp, ...):
> + //  lock(->callback_lock);
> + //  [queued the callback]
> + //  unlock(->callback_lock);// => ->po-unlock-lock-po
> + // lock(->callback_lock);
> + // [move from hzpcpu to hzst]
> + //
> + // smp_mb(); => ->strong-fence
> + // ptr = READ_ONCE(*hazptr);
> + // // ptr == gp, otherwise => ->fre
> + //
> + // If ptr != gp, it means there exists a circle with the following
> + // memory ordering relationships:
> + // ->strong-fence ->fre ->po-unlock-lock-po ->strong-fence ->fre
> + // => (due to the definition of prop)
> + // ->strong-fence ->prop ->strong-fence ->hb ->prop
> + // => (rotate the circle)
> + // ->prop ->strong-fence ->prop ->strong-fence ->hb
> + // => (due to the definition of pb)
> + // ->pb ->pb
> + // but pb is acyclic according to LKMM, so this cannot happen.
> + smp_mb();
> + check_readers(hzst);
> +
> + todo = do_hazptr(hzst);
> +
> + while (todo) {
> + struct callback_head *next = todo->next;
> + void (*func)(struct callback_head *) = todo->func;
> +
> + func(todo);
> +
> + todo = next;
> + }
> +}
> +
> +/*
> + * Put @head into the cleanup callback queue.
> + *
> + * @func(@head) will be called once no reader is referencing the object. The
> + * caller must ensure the object has been unpublished before calling this.
> + */
> +void call_hazptr(struct callback_head *head, rcu_callback_t func)
> +{
> + struct hazptr_percpu *pcpu = this_cpu_ptr(&hzpcpu);
> +
> + head->func = func;
> + head->next = NULL;
> +
> + scoped_guard(spinlock_irqsave, &pcpu->callback_lock) {
> + *(pcpu->tail) = head;
> + pcpu->tail = &head->next;
> + }
> +
> + guard(spinlock_irqsave)(&hazptr_struct.callback_lock);
> + kick_hazptr_work();
> +}
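
And on the updater side, reusing the made-up struct foo/gp from my
earlier sketch and assuming updaters are otherwise serialized, removal
would look something like:

	/* Hypothetical callback: free the object once no slot references it. */
	static void free_foo(struct callback_head *head)
	{
		kfree(container_of(head, struct foo, rh));
	}

	static void remove_foo(void)
	{
		struct foo *old = gp;

		if (!old)
			return;

		WRITE_ONCE(gp, NULL);			/* unpublish first */
		call_hazptr(&old->rh, free_foo);	/* runs once no slot holds &old->rh */
	}
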
> +
> +static int init_hazptr_struct(void)
> +{
> + int cpu;
> +
> + INIT_WORK(&hazptr_struct.work, hazptr_work_func);
> +
> + spin_lock_init(&hazptr_struct.callback_lock);
> + hazptr_struct.queued = NULL;
> + hazptr_struct.tail = &hazptr_struct.queued;
> +
> + for_each_possible_cpu(cpu) {
> + struct hazptr_percpu *pcpu = per_cpu_ptr(&hzpcpu, cpu);
> +
> + spin_lock_init(&pcpu->ctx_lock);
> + INIT_LIST_HEAD(&pcpu->ctx_list);
> +
> + spin_lock_init(&pcpu->callback_lock);
> + pcpu->queued = NULL;
> + pcpu->tail = &pcpu->queued;
> +
> + }
> +
> + init_hazptr_reader_tree(&hazptr_struct.readers);
> +
> + return 0;
> +}
> +
> +early_initcall(init_hazptr_struct);
> -- 
> 2.45.2
> 
> 