Re: [RFC PATCH 1/1] hpref: Hazard Pointers with Reference Counter

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Hi Mathieu,

interesting idea. Conceptually it looks good.

There's another approach of using hazard pointer to optimize shared reference counting (to make it lock-free in common cases).

https://github.com/cmuparlay/concurrent_deferred_rc

It doesn't go as far as what you're doing, but they also use the hazard pointer to protect the refcount (like you do in the promote function).

I haven't read your code in detail but it seems to me you have an ABA bug: as I explained elsewhere, you could read the same pointer after ABA but you don't synchronize with the newer store that gave you node2, leaving you to speculatively read stale values through *ctx->hp. (I am assuming here that ctx->hp is essentially an out parameter used to let the caller know which node got protected).

Have fun,
  jonas

+/*
+ * hpref_hp_get: Obtain a reference to a stable object, protected either
+ *               by hazard pointer (fast-path) or using reference
+ *               counter as fall-back.
+ */
+static inline
+bool hpref_hp_get(struct hpref_node **node_p, struct hpref_ctx *ctx)
+{
+	int cpu = rseq_current_cpu_raw();
+	struct hpref_percpu_slots *cpu_slots = rseq_percpu_ptr(hpref_percpu_slots, cpu);
+	struct hpref_slot *slot = &cpu_slots->slots[cpu_slots->current_slot];
+	bool use_refcount = false;
+	struct hpref_node *node, *node2;
+	unsigned int next_slot;
+
+retry:
+	node = uatomic_load(node_p, CMM_RELAXED);
+	if (!node)
+		return false;
+	/* Use rseq to try setting current slot hp. Store B. */
+	if (rseq_load_cbne_store__ptr(RSEQ_MO_RELAXED, RSEQ_PERCPU_CPU_ID,
+				(intptr_t *) &slot->node, (intptr_t) NULL,
+				(intptr_t) node, cpu)) {
+		slot = &cpu_slots->slots[HPREF_EMERGENCY_SLOT];
+		use_refcount = true;
+		/*
+		 * This may busy-wait for another reader using the
+		 * emergency slot to transition to refcount.
+		 */
+		caa_cpu_relax();
+		goto retry;
+	}
+	/* Memory ordering: Store B before Load A. */
+	cmm_smp_mb();
+	node2 = uatomic_load(node_p, CMM_RELAXED);	/* Load A */
+	if (node != node2) {
+		uatomic_store(&slot->node, NULL, CMM_RELAXED);
+		if (!node2)
+			return false;
+		goto retry;
+	}
+	ctx->type = HPREF_TYPE_HP;
+	ctx->hp = node;
                   ^---- here

+	ctx->slot = slot;
+	if (use_refcount) {
+		hpref_promote_hp_to_ref(ctx);
+		return true;
+	}
+	/*
+	 * Increment current slot (racy increment is OK because it is
+	 * just a position hint). Skip the emergency slot.
+	 */
+	next_slot = uatomic_load(&cpu_slots->current_slot, CMM_RELAXED) + 1;
+	if (next_slot >= HPREF_EMERGENCY_SLOT)
+		next_slot = 0;
+	uatomic_store(&cpu_slots->current_slot, next_slot, CMM_RELAXED);
+	return true;
+}
+
+static inline
+void hpref_put(struct hpref_ctx *ctx)
+{
+	if (ctx->type == HPREF_TYPE_REF) {
+		urcu_ref_put(&ctx->hp->refcount, hpref_release);
+	} else {
+		/* Release HP. */
+		uatomic_store(&ctx->slot->node, NULL, CMM_RELEASE);
+	}
+	ctx->hp = NULL;
+}
+
+/*
+ * hpref_set_pointer: Store pointer @node to @ptr, with RCU publication
+ *                    guarantees.
+ */
+static inline
+void hpref_set_pointer(struct hpref_node **ptr, struct hpref_node *node)
+{
+	if (__builtin_constant_p(node) && node == NULL)
+		uatomic_store(ptr, NULL, CMM_RELAXED);
+	else
+		uatomic_store(ptr, node, CMM_RELEASE);
+}
+
+/*
+ * Return the content of the hpref context hazard pointer field.
+ */
+static inline
+struct hpref_node *hpref_ctx_pointer(struct hpref_ctx *ctx)
+{
+	return ctx->hp;
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _URCU_HPREF_H */
diff --git a/src/Makefile.am b/src/Makefile.am
index b555c818..7312c9f7 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -19,7 +19,8 @@ RCULFHASH = rculfhash.c rculfhash-mm-order.c rculfhash-mm-chunk.c \
  lib_LTLIBRARIES = liburcu-common.la \
  		liburcu.la liburcu-qsbr.la \
  		liburcu-mb.la liburcu-bp.la \
-		liburcu-memb.la liburcu-cds.la
+		liburcu-memb.la liburcu-cds.la \
+		liburcu-hpref.la
#
  # liburcu-common contains wait-free queues (needed by call_rcu) as well
@@ -50,6 +51,9 @@ liburcu_cds_la_SOURCES = rculfqueue.c rculfstack.c lfstack.c \
  	workqueue.c workqueue.h $(RCULFHASH) $(COMPAT)
  liburcu_cds_la_LIBADD = liburcu-common.la
+liburcu_hpref_la_SOURCES = hpref.c
+liburcu_hpref_la_LIBADD = -lrseq
+
  pkgconfigdir = $(libdir)/pkgconfig
  pkgconfig_DATA = liburcu-cds.pc liburcu.pc liburcu-bp.pc liburcu-qsbr.pc \
  	liburcu-mb.pc liburcu-memb.pc
diff --git a/src/hpref.c b/src/hpref.c
new file mode 100644
index 00000000..f63530f5
--- /dev/null
+++ b/src/hpref.c
@@ -0,0 +1,78 @@
+// SPDX-FileCopyrightText: 2024 Mathieu Desnoyers <mathieu.desnoyers@xxxxxxxxxxxx>
+//
+// SPDX-License-Identifier: LGPL-2.1-or-later
+
+/*
+ * HPREF: Hazard pointers with reference counters
+ */
+
+#define _LGPL_SOURCE
+#include <urcu/hpref.h>
+#include <rseq/mempool.h>	/* Per-CPU memory */
+
+static struct rseq_mempool *mempool;
+struct hpref_percpu_slots *hpref_percpu_slots;
+
+void hpref_release(struct urcu_ref *ref)
+{
+	struct hpref_node *node = caa_container_of(ref, struct hpref_node, refcount);
+
+	node->release(node);
+}
+
+/*
+ * hpref_synchronize: Wait for any reader possessing a hazard pointer to
+ *                    @node to clear its hazard pointer slot.
+ */
+void hpref_synchronize(struct hpref_node *node)
+{
+	int nr_cpus = rseq_get_max_nr_cpus(), cpu;
+
+	/* Memory ordering: Store A before Load B. */
+	cmm_smp_mb();
+	/* Scan all CPUs slots. */
+	for (cpu = 0; cpu < nr_cpus; cpu++) {
+		struct hpref_percpu_slots *cpu_slots = rseq_percpu_ptr(hpref_percpu_slots, cpu);
+		struct hpref_slot *slot;
+		unsigned int i;
+
+		for (i = 0; i < HPREF_NR_PERCPU_SLOTS; i++) {
+			slot = &cpu_slots->slots[i];
+			/* Busy-wait if node is found. */
+			while (uatomic_load(&slot->node, CMM_ACQUIRE) == node)	/* Load B */
+				caa_cpu_relax();
+		}
+	}
+}
+
+/*
+ * hpref_synchronize_put: Wait for any reader possessing a hazard
+ *                        pointer to clear its slot and put reference
+ *                        count.
+ */
+void hpref_synchronize_put(struct hpref_node *node)
+{
+	if (!node)
+		return;
+	hpref_synchronize(node);
+	urcu_ref_put(&node->refcount, hpref_release);
+}
+
+static __attribute__((constructor))
+void hpref_init(void)
+{
+	mempool = rseq_mempool_create("hpref", sizeof(struct hpref_percpu_slots), NULL);
+	if (!mempool)
+		abort();
+	hpref_percpu_slots = rseq_mempool_percpu_zmalloc(mempool);
+	if (!hpref_percpu_slots)
+		abort();
+}
+
+static __attribute__((destructor))
+void hpref_exit(void)
+{
+	rseq_mempool_percpu_free(hpref_percpu_slots);
+	if (rseq_mempool_destroy(mempool))
+		abort();
+}





[Index of Archives]     [Linux ARM Kernel]     [Linux ARM]     [Linux Omap]     [Fedora ARM]     [IETF Annouce]     [Bugtraq]     [Linux OMAP]     [Linux MIPS]     [eCos]     [Asterisk Internet PBX]     [Linux API]

  Powered by Linux