[RFC PATCH 1/4] hazptr: Add initial implementation of hazard pointers

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Hazard pointers [1] provide a way to dynamically distribute refcounting
and can be used to improve the scalability of refcounting without
significant space cost.

Hazard pointers are similar to RCU: they build the synchronization
between two part, readers and updaters. Readers are refcount users, they
acquire and release refcount. Updaters cleanup objects when there are no
reader referencing them (via call_hazptr()). The difference is instead
of waiting for a grace period, hazard pointers can free an object as
long as there is no one referencing the object. This means for a
particular workload, hazard pointers may have smaller memory footprint
due to fewer pending callbacks.

The synchronization between readers and updaters is built around "hazard
pointer slots": a slot readers can use to store a pointer value.

Reader side protection:

1.	Read the value of a pointer to the target data element.
2.	Store it to a hazard pointer slot.
3.	Enforce full ordering (e.g. smp_mb()).
4.	Re-read the original pointer, reset the slot and retry if the
	value changed.
5.	Otherwise, the continued existence of the target data element
	is guaranteed.

Updater side check:

1.	Unpublish the target data element (e.g. setting the pointer
	value to NULL).
2.	Enforce full ordering.
3.	Read the value from a hazard pointer slot.
4.	If the value doesn't match the target data element, then this
	slot doesn't represent a reference to it.
5.	Otherwise, updater needs to re-check (step 3).

To distribute the accesses of hazptr slots from different contexts,
hazptr_context is introduced. Users need to define/allocate their own
hazptr_context to allocate hazard pointer slots.

For the updater side to confirm no existing reference, it needs to scan
all the possible slots, and to speed up this process, hazptr_context
also contains an rbtree node for each slot so that updater can cache the
reader-scan result in an rbtree. The rbtree nodes are pre-allocated in
this way to prevent "allocate memory to free memory" in extreme cases.

[1]: M. M. Michael, "Hazard pointers: safe memory reclamation for
     lock-free objects," in IEEE Transactions on Parallel and
     Distributed Systems, vol. 15, no. 6, pp. 491-504, June 2004

Co-developed-by: Paul E. McKenney <paulmck@xxxxxxxxxx>
Signed-off-by: Paul E. McKenney <paulmck@xxxxxxxxxx>
Co-developed-by: Neeraj Upadhyay <neeraj.upadhyay@xxxxxxx>
Signed-off-by: Neeraj Upadhyay <neeraj.upadhyay@xxxxxxx>
Signed-off-by: Boqun Feng <boqun.feng@xxxxxxxxx>
---
 include/linux/hazptr.h |  83 ++++++++
 kernel/Makefile        |   1 +
 kernel/hazptr.c        | 463 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 547 insertions(+)
 create mode 100644 include/linux/hazptr.h
 create mode 100644 kernel/hazptr.c

diff --git a/include/linux/hazptr.h b/include/linux/hazptr.h
new file mode 100644
index 000000000000..4548ca8c75eb
--- /dev/null
+++ b/include/linux/hazptr.h
@@ -0,0 +1,83 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef _LINUX_HAZPTR_H
+#define _LINUX_HAZPTR_H
+
+#include <linux/list.h>
+#include <linux/spinlock.h>
+#include <linux/rbtree.h>
+
+/* Hazard pointer slot. */
+typedef void* hazptr_t;
+
+#define HAZPTR_SLOT_PER_CTX 8
+
+struct hazptr_slot_snap {
+	struct rb_node node;
+	hazptr_t slot;
+};
+
+/*
+ * A set of hazard pointer slots for a context.
+ *
+ * The context can be a task, CPU or reader (depends on the use case). It may
+ * be allocated statically or dynamically. It can only be used after calling
+ * init_hazptr_context(), and users are also responsible to call
+ * cleaup_hazptr_context() when it's not used any more.
+ */
+struct hazptr_context {
+	// The lock of the percpu context lists.
+	spinlock_t *lock;
+
+	struct list_head list;
+	struct hazptr_slot_snap snaps[HAZPTR_SLOT_PER_CTX];
+	____cacheline_aligned hazptr_t slots[HAZPTR_SLOT_PER_CTX];
+};
+
+void init_hazptr_context(struct hazptr_context *hzcp);
+void cleanup_hazptr_context(struct hazptr_context *hzcp);
+hazptr_t *hazptr_alloc(struct hazptr_context *hzcp);
+void hazptr_free(struct hazptr_context *hzcp, hazptr_t *hzp);
+
+#define hazptr_tryprotect(hzp, gp, field)	(typeof(gp))__hazptr_tryprotect(hzp, (void **)&(gp), offsetof(typeof(*gp), field))
+#define hazptr_protect(hzp, gp, field) ({				\
+	typeof(gp) ___p;						\
+									\
+	___p = hazptr_tryprotect(hzp, gp, field);			\
+	BUG_ON(!___p);							\
+	___p;								\
+})
+
+static inline void *__hazptr_tryprotect(hazptr_t *hzp,
+					void *const *p,
+					unsigned long head_offset)
+{
+	void *ptr;
+	struct callback_head *head;
+
+	ptr = READ_ONCE(*p);
+
+	if (ptr == NULL)
+		return NULL;
+
+	head = (struct callback_head *)(ptr + head_offset);
+
+	WRITE_ONCE(*hzp, head);
+	smp_mb();
+
+	ptr = READ_ONCE(*p); // read again
+
+	if (ptr + head_offset != head) { // pointer changed
+		WRITE_ONCE(*hzp, NULL);  // reset hazard pointer
+		return NULL;
+	} else
+		return ptr;
+}
+
+static inline void hazptr_clear(hazptr_t *hzp)
+{
+	/* Pairs with smp_load_acquire() in reader scan. */
+	smp_store_release(hzp, NULL);
+}
+
+void call_hazptr(struct callback_head *head, rcu_callback_t func);
+#endif
diff --git a/kernel/Makefile b/kernel/Makefile
index 3c13240dfc9f..7927264b9870 100644
--- a/kernel/Makefile
+++ b/kernel/Makefile
@@ -50,6 +50,7 @@ obj-y += rcu/
 obj-y += livepatch/
 obj-y += dma/
 obj-y += entry/
+obj-y += hazptr.o
 obj-$(CONFIG_MODULES) += module/
 
 obj-$(CONFIG_KCMP) += kcmp.o
diff --git a/kernel/hazptr.c b/kernel/hazptr.c
new file mode 100644
index 000000000000..f22ccc2a4a62
--- /dev/null
+++ b/kernel/hazptr.c
@@ -0,0 +1,463 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+
+#include <linux/spinlock.h>
+#include <linux/cleanup.h>
+#include <linux/hazptr.h>
+#include <linux/percpu.h>
+#include <linux/workqueue.h>
+
+#define HAZPTR_UNUSED (1ul)
+
+/* Per-CPU data for hazard pointer management. */
+struct hazptr_percpu {
+	spinlock_t ctx_lock;		/* hazptr context list lock */
+	struct list_head ctx_list;	/* hazptr context list */
+	spinlock_t callback_lock;	/* Per-CPU callback queue lock */
+	struct callback_head *queued;	/* Per-CPU callback queue */
+	struct callback_head **tail;
+};
+
+/*
+ * Per-CPU data contains context lists and callbacks, which are maintained in a
+ * per-CPU maner to reduce potential contention. This means a global scan (for
+ * readers or callbacks) has to take each CPU's lock, but it's less problematic
+ * because that's a slowpath.
+ */
+DEFINE_PER_CPU(struct hazptr_percpu, hzpcpu);
+
+/* A RBTree that stores the reader scan result of all hazptr slot */
+struct hazptr_reader_tree {
+	spinlock_t lock;
+	struct rb_root root;
+};
+
+static void init_hazptr_reader_tree(struct hazptr_reader_tree *tree)
+{
+	spin_lock_init(&tree->lock);
+	tree->root = RB_ROOT;
+}
+
+static bool is_null_or_unused(hazptr_t slot)
+{
+	return slot == NULL || ((unsigned long)slot) == HAZPTR_UNUSED;
+}
+
+static int hazptr_node_cmp(const void *key, const struct rb_node *curr)
+{
+	unsigned long slot = (unsigned long)key;
+	struct hazptr_slot_snap *curr_snap = container_of(curr, struct hazptr_slot_snap, node);
+	unsigned long curr_slot = (unsigned long)(curr_snap->slot);
+
+	if (slot < curr_slot)
+		return -1;
+	else if (slot > curr_slot)
+		return 1;
+	else
+		return 0;
+}
+
+static bool hazptr_node_less(struct rb_node *new, const struct rb_node *curr)
+{
+	struct hazptr_slot_snap *new_snap = container_of(new, struct hazptr_slot_snap, node);
+
+	return hazptr_node_cmp((void *)new_snap->slot, curr) < 0;
+}
+
+/* Add the snapshot into a search tree. tree->lock must be held. */
+static inline void reader_add_locked(struct hazptr_reader_tree *tree,
+				     struct hazptr_slot_snap *snap)
+{
+	lockdep_assert_held(&tree->lock);
+	BUG_ON(is_null_or_unused(snap->slot));
+
+	rb_add(&snap->node, &tree->root, hazptr_node_less);
+}
+
+static inline void reader_add(struct hazptr_reader_tree *tree,
+			      struct hazptr_slot_snap *snap)
+{
+	guard(spinlock_irqsave)(&tree->lock);
+
+	reader_add_locked(tree, snap);
+}
+
+/* Delete the snapshot from a search tree. tree->lock must be held. */
+static inline void reader_del_locked(struct hazptr_reader_tree *tree,
+				     struct hazptr_slot_snap *snap)
+{
+	lockdep_assert_held(&tree->lock);
+
+	rb_erase(&snap->node, &tree->root);
+}
+
+static inline void reader_del(struct hazptr_reader_tree *tree,
+			      struct hazptr_slot_snap *snap)
+{
+	guard(spinlock_irqsave)(&tree->lock);
+
+	reader_del_locked(tree, snap);
+}
+
+/* Find whether a read exists. tree->lock must be held. */
+static inline bool reader_exist_locked(struct hazptr_reader_tree *tree,
+				       unsigned long slot)
+{
+	lockdep_assert_held(&tree->lock);
+
+	return !!rb_find((void *)slot, &tree->root, hazptr_node_cmp);
+}
+
+static inline bool reader_exist(struct hazptr_reader_tree *tree,
+				unsigned long slot)
+{
+	guard(spinlock_irqsave)(&tree->lock);
+
+	return reader_exist_locked(tree, slot);
+}
+
+/*
+ * Scan the readers from one hazptr context and update the global readers tree.
+ *
+ * Must be called with hzcp->lock held.
+ */
+static void hazptr_context_snap_readers_locked(struct hazptr_reader_tree *tree,
+					       struct hazptr_context *hzcp)
+{
+	lockdep_assert_held(hzcp->lock);
+
+	for (int i = 0; i < HAZPTR_SLOT_PER_CTX; i++) {
+		/*
+		 * Pairs with smp_store_release() in hazptr_{clear,free}().
+		 *
+		 * Ensure
+		 *
+		 * <reader>		<updater>
+		 *
+		 * [access protected pointers]
+		 * hazptr_clear();
+		 *   smp_store_release()
+		 *			// in reader scan.
+		 *			smp_load_acquire(); // is null or unused.
+		 *			[run callbacks] // all accesses from
+		 *					// reader must be
+		 *					// observed.
+		 */
+		hazptr_t val = smp_load_acquire(&hzcp->slots[i]);
+
+		if (!is_null_or_unused(val)) {
+			struct hazptr_slot_snap *snap = &hzcp->snaps[i];
+
+			// Already in the tree, need to remove first.
+			if (!is_null_or_unused(snap->slot)) {
+				reader_del(tree, snap);
+			}
+			snap->slot = val;
+			reader_add(tree, snap);
+		}
+	}
+}
+
+/*
+ * Initialize a hazptr context.
+ *
+ * Must be called before using the context for hazptr allocation.
+ */
+void init_hazptr_context(struct hazptr_context *hzcp)
+{
+	struct hazptr_percpu *pcpu = this_cpu_ptr(&hzpcpu);
+
+	for (int i = 0; i < HAZPTR_SLOT_PER_CTX; i++) {
+		hzcp->slots[i] = (hazptr_t)HAZPTR_UNUSED;
+		hzcp->snaps[i].slot = (hazptr_t)HAZPTR_UNUSED;
+	}
+
+	guard(spinlock_irqsave)(&pcpu->ctx_lock);
+	list_add(&hzcp->list, &pcpu->ctx_list);
+	hzcp->lock = &pcpu->ctx_lock;
+}
+
+struct hazptr_struct {
+	struct work_struct work;
+	bool scheduled;
+
+	// list of callbacks, we move percpu queued callbacks into the global
+	// queued list in workqueue function.
+	spinlock_t callback_lock;
+	struct callback_head *queued;
+	struct callback_head **tail;
+
+	struct hazptr_reader_tree readers;
+};
+
+static struct hazptr_struct hazptr_struct;
+
+/*
+ * Clean up hazptr context.
+ *
+ * Must call before freeing the context. This function also removes any
+ * reference held by the hazard pointer slots in the context, even
+ * hazptr_clear() or hazptr_free() is not called previously.
+ */
+void cleanup_hazptr_context(struct hazptr_context *hzcp)
+{
+	if (hzcp->lock) {
+		scoped_guard(spinlock_irqsave, hzcp->lock) {
+			list_del(&hzcp->list);
+			hzcp->lock = NULL;
+		}
+
+		for (int i = 0; i < HAZPTR_SLOT_PER_CTX; i++) {
+			struct hazptr_slot_snap *snap = &hzcp->snaps[i];
+
+			if (!is_null_or_unused(snap->slot))
+				reader_del(&hazptr_struct.readers, snap);
+		}
+	}
+}
+
+/*
+ * Allocate a hazptr slot from a hazptr_context.
+ *
+ * Return: NULL means fail to allocate, otherwise the address of the allocated
+ * slot.
+ */
+hazptr_t *hazptr_alloc(struct hazptr_context *hzcp)
+{
+	unsigned long unused;
+
+	for (int i = 0; i < HAZPTR_SLOT_PER_CTX; i++) {
+		if (((unsigned long)READ_ONCE(hzcp->slots[i])) == HAZPTR_UNUSED) {
+			unused = HAZPTR_UNUSED;
+
+			/*
+			 * rwm-sequence is relied on here.
+			 *
+			 * This is needed since in case of a previous reader:
+			 *
+			 * <reader 1>		<reader 2>		<updater>
+			 * [access protected pointers]
+			 * hazptr_free():
+			 *   smp_store_release(); // hzptr == UNUSED
+			 *			hazptr_alloc():
+			 *			  try_cmpxchg_relaxed(); // hzptr == NULL
+			 *
+			 *						// in reader scan
+			 *						smp_load_acquire(); // is null
+			 *						[run callbacks]
+			 *
+			 * Because of the rwm-sequence of release operations,
+			 * when callbacks are run, accesses from reader 1 must
+			 * be already observed by the updater.
+			 */
+			if (try_cmpxchg_relaxed(&hzcp->slots[i], &unused, NULL)) {
+				return (hazptr_t *)&hzcp->slots[i];
+			}
+		}
+	}
+
+	return NULL;
+}
+
+/* Free a hazptr slot. */
+void hazptr_free(struct hazptr_context *hzcp, hazptr_t *hzptr)
+{
+	WARN_ON(((unsigned long)*hzptr) == HAZPTR_UNUSED);
+
+	/* Pairs with smp_load_acquire() in reader scan */
+	smp_store_release(hzptr, (void *)HAZPTR_UNUSED);
+}
+
+/* Scan all possible readers, and update the global reader tree. */
+static void check_readers(struct hazptr_struct *hzst)
+{
+	int cpu;
+
+	for_each_possible_cpu(cpu) {
+		struct hazptr_percpu *pcpu = per_cpu_ptr(&hzpcpu, cpu);
+		struct hazptr_context *ctx;
+
+		guard(spinlock_irqsave)(&pcpu->ctx_lock);
+		list_for_each_entry(ctx, &pcpu->ctx_list, list)
+			hazptr_context_snap_readers_locked(&hzst->readers, ctx);
+	}
+}
+
+/*
+ * Start the background work for hazptr callback handling if not started.
+ *
+ * Must be called with hazptr_struct lock held.
+ */
+static void kick_hazptr_work(void)
+{
+	if (hazptr_struct.scheduled)
+		return;
+
+	queue_work(system_wq, &hazptr_struct.work);
+	hazptr_struct.scheduled = true;
+}
+
+/*
+ * Check which callbacks are ready to be called.
+ *
+ * Return: a callback list that no reader is referencing the corresponding
+ * objects.
+ */
+static struct callback_head *do_hazptr(struct hazptr_struct *hzst)
+{
+	struct callback_head *tmp, **curr;
+	struct callback_head *todo = NULL, **todo_tail = &todo;
+
+	// Currently, the lock is unnecessary, but maybe we will have multiple
+	// work_structs sharing the same callback list in the future;
+	guard(spinlock_irqsave)(&hzst->callback_lock);
+
+	curr = &hzst->queued;
+
+	while ((tmp = *curr)) {
+		// No reader, we can free the object.
+		if (!reader_exist(&hzst->readers, (unsigned long)tmp)) {
+			// Add tmp into todo.
+			*todo_tail = tmp;
+			todo_tail = &tmp->next;
+
+			// Delete tmp from ->queued and move to the next entry.
+			*curr = tmp->next;
+		} else
+			curr = &tmp->next;
+	}
+
+	hzst->tail = curr;
+
+	// Keep checking if callback list is not empty.
+	if (hzst->queued)
+		kick_hazptr_work();
+
+	*todo_tail = NULL;
+
+	return todo;
+}
+
+static void hazptr_work_func(struct work_struct *work)
+{
+	struct hazptr_struct *hzst = container_of(work, struct hazptr_struct, work);
+	struct callback_head *todo;
+
+	// Advance callbacks from hzpcpu to hzst
+	scoped_guard(spinlock_irqsave, &hzst->callback_lock) {
+		int cpu;
+
+		hzst->scheduled = false;
+		for_each_possible_cpu(cpu) {
+			struct hazptr_percpu *pcpu = per_cpu_ptr(&hzpcpu, cpu);
+
+			guard(spinlock)(&pcpu->callback_lock);
+
+			if (pcpu->queued) {
+				*(hzst->tail) = pcpu->queued;
+				hzst->tail = pcpu->tail;
+				pcpu->queued = NULL;
+				pcpu->tail = &pcpu->queued;
+			}
+		}
+	}
+
+	// Pairs with the smp_mb() on the reader side. This guarantees that if
+	// the hazptr work picked up the callback from an updater and the
+	// updater set the global pointer to NULL before enqueue the callback,
+	// the hazptr work must observe a reader that protects the hazptr before
+	// the updater.
+	//
+	// <reader>		<updater>		<hazptr work>
+	// ptr = READ_ONCE(gp);
+	// WRITE_ONCE(*hazptr, ptr);
+	// smp_mb(); // => ->strong-fence
+	//			tofree = gp;
+	// ptr = READ_ONCE(gp); // re-read, gp is not NULL
+	//			// => ->fre
+	//			WRITE_ONCE(gp, NULL);
+	//			call_hazptr(gp, ...):
+	//			  lock(->callback_lock);
+	//			  [queued the callback]
+	//			  unlock(->callback_lock);// => ->po-unlock-lock-po
+	//						lock(->callback_lock);
+	//						[move from hzpcpu to hzst]
+	//
+	//						smp_mb(); => ->strong-fence
+	//						ptr = READ_ONCE(*hazptr);
+	//						// ptr == gp, otherwise => ->fre
+	//
+	// If ptr != gp, it means there exists a circle with the following
+	// memory ordering relationships:
+	//	->strong-fence ->fre ->po-unlock-lock-po ->strong-fence ->fre
+	// => (due to the definition of prop)
+	//	->strong-fence ->prop ->strong-fence ->hb ->prop
+	// => (rotate the circle)
+	//	->prop ->strong-fence ->prop ->strong-fence ->hb
+	// => (due to the definition of pb)
+	//	->pb ->pb
+	// but pb is acyclic according to LKMM, so this cannot happen.
+	smp_mb();
+	check_readers(hzst);
+
+	todo = do_hazptr(hzst);
+
+	while (todo) {
+		struct callback_head *next = todo->next;
+		void (*func)(struct callback_head *) = todo->func;
+
+		func(todo);
+
+		todo = next;
+	}
+}
+
+/*
+ * Put @head into the cleanup callback queue.
+ *
+ * @func(@head) will be called after no one is referencing the object. Caller
+ * must ensure the object has been unpublished before calling this.
+ */
+void call_hazptr(struct callback_head *head, rcu_callback_t func)
+{
+	struct hazptr_percpu *pcpu = this_cpu_ptr(&hzpcpu);
+
+	head->func = func;
+	head->next = NULL;
+
+	scoped_guard(spinlock_irqsave, &pcpu->callback_lock) {
+		*(pcpu->tail) = head;
+		pcpu->tail = &head->next;
+	}
+
+	guard(spinlock_irqsave)(&hazptr_struct.callback_lock);
+	kick_hazptr_work();
+}
+
+static int init_hazptr_struct(void)
+{
+	int cpu;
+
+	INIT_WORK(&hazptr_struct.work, hazptr_work_func);
+
+	spin_lock_init(&hazptr_struct.callback_lock);
+	hazptr_struct.queued = NULL;
+	hazptr_struct.tail = &hazptr_struct.queued;
+
+	for_each_possible_cpu(cpu) {
+		struct hazptr_percpu *pcpu = per_cpu_ptr(&hzpcpu, cpu);
+
+		spin_lock_init(&pcpu->ctx_lock);
+		INIT_LIST_HEAD(&pcpu->ctx_list);
+
+		spin_lock_init(&pcpu->callback_lock);
+		pcpu->queued = NULL;
+		pcpu->tail = &pcpu->queued;
+
+	}
+
+	init_hazptr_reader_tree(&hazptr_struct.readers);
+
+	return 0;
+}
+
+early_initcall(init_hazptr_struct);
-- 
2.45.2





[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux