[RFC 3/6] percpu-refcount: Extend managed mode to allow runtime switching

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Provide more flexibility for runtime mode switching of a managed
percpu ref. This is useful when, for example, a managed ref's object
enters its shutdown phase: instead of waiting for the manager thread to
process the ref, the user can directly invoke percpu_ref_kill() on the
ref.

The init modes are the same as in the existing code. Runtime mode
switching allows switching a managed ref back to unmanaged mode, which
in turn allows transitions from managed mode to all reinit modes.

To -->       A   P   P(RI)   M   D  D(RI)  D(RI/M)   EX   REI   RES

  A          y   n     y     y   n    y       y       y     y     y
  P          n   n     n     n   y    n       n       y     n     n
  M          y*  n     y*    y   n    y*      y       y*    y     y
  P(RI)      y   n     y     y   n    y       y       y     y     y
  D(RI)      y   n     y     y   n    y       y       -     y     y
  D(RI/M)    y*  n     y*    y   n    y*      y       -     y     y

Modes:
A - Atomic  P - PerCPU  M - Managed  P(RI) - PerCPU with ReInit
D(RI) - Dead with ReInit  D(RI/M) - Dead with ReInit and Managed

PerCPU Ref Ops:

KLL - Kill  REI - Reinit  RES - Resurrect

(RI) denotes modes which are initialized with PERCPU_REF_ALLOW_REINIT.
The transitions shown above are the allowed transitions; they may be
indirect. For example, a managed ref switches to P(RI) mode when
percpu_ref_switch_to_unmanaged() is called for it, and P(RI) mode can
then be switched directly to A mode using percpu_ref_switch_to_atomic().

Signed-off-by: Neeraj Upadhyay <Neeraj.Upadhyay@xxxxxxx>
---
 include/linux/percpu-refcount.h |   3 +-
 lib/percpu-refcount.c           | 248 +++++++++++---------------------
 2 files changed, 88 insertions(+), 163 deletions(-)

diff --git a/include/linux/percpu-refcount.h b/include/linux/percpu-refcount.h
index e6aea81b3d01..fe967db431a6 100644
--- a/include/linux/percpu-refcount.h
+++ b/include/linux/percpu-refcount.h
@@ -110,7 +110,7 @@ struct percpu_ref_data {
 	struct rcu_head		rcu;
 	struct percpu_ref	*ref;
 	unsigned int		aux_flags;
-	struct llist_node	node;
+	struct list_head	node;
 
 };
 
@@ -139,6 +139,7 @@ void percpu_ref_switch_to_atomic(struct percpu_ref *ref,
 void percpu_ref_switch_to_atomic_sync(struct percpu_ref *ref);
 void percpu_ref_switch_to_percpu(struct percpu_ref *ref);
 int percpu_ref_switch_to_managed(struct percpu_ref *ref);
+void percpu_ref_switch_to_unmanaged(struct percpu_ref *ref);
 void percpu_ref_kill_and_confirm(struct percpu_ref *ref,
 				 percpu_ref_func_t *confirm_kill);
 void percpu_ref_resurrect(struct percpu_ref *ref);
diff --git a/lib/percpu-refcount.c b/lib/percpu-refcount.c
index 7d0c85c7ce57..b79e36905aa4 100644
--- a/lib/percpu-refcount.c
+++ b/lib/percpu-refcount.c
@@ -5,7 +5,7 @@
 #include <linux/sched.h>
 #include <linux/wait.h>
 #include <linux/slab.h>
-#include <linux/llist.h>
+#include <linux/list.h>
 #include <linux/moduleparam.h>
 #include <linux/types.h>
 #include <linux/mm.h>
@@ -43,7 +43,12 @@
 
 static DEFINE_SPINLOCK(percpu_ref_switch_lock);
 static DECLARE_WAIT_QUEUE_HEAD(percpu_ref_switch_waitq);
-static LLIST_HEAD(percpu_ref_manage_head);
+static struct list_head percpu_ref_manage_head = LIST_HEAD_INIT(percpu_ref_manage_head);
+/* Spinlock protects node additions/deletions */
+static DEFINE_SPINLOCK(percpu_ref_manage_lock);
+/* Mutex synchronizes node deletions with the node being scanned */
+static DEFINE_MUTEX(percpu_ref_active_switch_mutex);
+static struct list_head *next_percpu_ref_node = &percpu_ref_manage_head;
 
 static unsigned long __percpu *percpu_count_ptr(struct percpu_ref *ref)
 {
@@ -112,7 +117,7 @@ int percpu_ref_init(struct percpu_ref *ref, percpu_ref_func_t *release,
 	data->confirm_switch = NULL;
 	data->ref = ref;
 	ref->data = data;
-	init_llist_node(&data->node);
+	INIT_LIST_HEAD(&data->node);
 
 	if (flags & PERCPU_REF_REL_MANAGED)
 		percpu_ref_switch_to_managed(ref);
@@ -150,9 +155,9 @@ static int __percpu_ref_switch_to_managed(struct percpu_ref *ref)
 	data->force_atomic = false;
 	if (!__ref_is_percpu(ref, &percpu_count))
 		__percpu_ref_switch_mode(ref, NULL);
-	/* Ensure ordering of percpu mode switch and node scan */
-	smp_mb();
-	llist_add(&data->node, &percpu_ref_manage_head);
+	spin_lock(&percpu_ref_manage_lock);
+	list_add(&data->node, &percpu_ref_manage_head);
+	spin_unlock(&percpu_ref_manage_lock);
 
 	return 0;
 
@@ -162,7 +167,7 @@ static int __percpu_ref_switch_to_managed(struct percpu_ref *ref)
 }
 
 /**
- * percpu_ref_switch_to_managed - Switch an unmanaged ref to percpu mode.
+ * percpu_ref_switch_to_managed - Switch an unmanaged ref to percpu managed mode.
  *
  * @ref: percpu_ref to switch to managed mode
  *
@@ -179,6 +184,47 @@ int percpu_ref_switch_to_managed(struct percpu_ref *ref)
 }
 EXPORT_SYMBOL_GPL(percpu_ref_switch_to_managed);
 
+/**
+ * percpu_ref_switch_to_unmanaged - Switch a managed ref to percpu mode.
+ *
+ * @ref: percpu_ref to switch back to unmanaged percpu mode
+ *
+ * Must only be called with elevated refcount.
+ */
+void percpu_ref_switch_to_unmanaged(struct percpu_ref *ref)
+{
+	bool mutex_taken = false;
+	struct list_head *node;
+	unsigned long flags;
+
+	might_sleep();
+
+	WARN_ONCE(!percpu_ref_is_managed(ref), "Percpu ref is not managed");
+
+	node = &ref->data->node;
+	spin_lock(&percpu_ref_manage_lock);
+	if (list_empty(node)) {
+		spin_unlock(&percpu_ref_manage_lock);
+		mutex_taken = true;
+		mutex_lock(&percpu_ref_active_switch_mutex);
+		spin_lock(&percpu_ref_manage_lock);
+	}
+
+	if (next_percpu_ref_node == node)
+		next_percpu_ref_node = next_percpu_ref_node->next;
+	list_del_init(node);
+	spin_unlock(&percpu_ref_manage_lock);
+	if (mutex_taken)
+		mutex_unlock(&percpu_ref_active_switch_mutex);
+
+	/* Drop the pseudo-init reference */
+	percpu_ref_put(ref);
+	spin_lock_irqsave(&percpu_ref_switch_lock, flags);
+	ref->data->aux_flags &= ~__PERCPU_REL_MANAGED;
+	spin_unlock_irqrestore(&percpu_ref_switch_lock, flags);
+}
+EXPORT_SYMBOL_GPL(percpu_ref_switch_to_unmanaged);
+
 static void __percpu_ref_exit(struct percpu_ref *ref)
 {
 	unsigned long __percpu *percpu_count = percpu_count_ptr(ref);
@@ -599,164 +645,35 @@ module_param(max_scan_count, int, 0444);
 
 static void percpu_ref_release_work_fn(struct work_struct *work);
 
-/*
- * Sentinel llist nodes for lockless list traveral and deletions by
- * the pcpu ref release worker, while nodes are added from
- * percpu_ref_init() and percpu_ref_switch_to_managed().
- *
- * Sentinel node marks the head of list traversal for the current
- * iteration of kworker execution.
- */
-struct percpu_ref_sen_node {
-	bool inuse;
-	struct llist_node node;
-};
-
-/*
- * We need two sentinel nodes for lockless list manipulations from release
- * worker - first node will be used in current reclaim iteration. The second
- * node will be used in next iteration. Next iteration marks the first node
- * as free, for use in subsequent iteration.
- */
-#define PERCPU_REF_SEN_NODES_COUNT     2
-
-/* Track last processed percpu ref node */
-static struct llist_node *last_percpu_ref_node;
-
-static struct percpu_ref_sen_node
-	percpu_ref_sen_nodes[PERCPU_REF_SEN_NODES_COUNT];
-
 static DECLARE_DELAYED_WORK(percpu_ref_release_work, percpu_ref_release_work_fn);
 
-static bool percpu_ref_is_sen_node(struct llist_node *node)
-{
-	return &percpu_ref_sen_nodes[0].node <= node &&
-		node <= &percpu_ref_sen_nodes[PERCPU_REF_SEN_NODES_COUNT - 1].node;
-}
-
-static struct llist_node *percpu_ref_get_sen_node(void)
-{
-	int i;
-	struct percpu_ref_sen_node *sn;
-
-	for (i = 0; i < PERCPU_REF_SEN_NODES_COUNT; i++) {
-		sn = &percpu_ref_sen_nodes[i];
-		if (!sn->inuse) {
-			sn->inuse = true;
-			return &sn->node;
-		}
-	}
-
-	return NULL;
-}
-
-static void percpu_ref_put_sen_node(struct llist_node *node)
-{
-	struct percpu_ref_sen_node *sn = container_of(node, struct percpu_ref_sen_node, node);
-
-	sn->inuse = false;
-	init_llist_node(node);
-}
-
-static void percpu_ref_put_all_sen_nodes_except(struct llist_node *node)
-{
-	int i;
-
-	for (i = 0; i < PERCPU_REF_SEN_NODES_COUNT; i++) {
-		if (&percpu_ref_sen_nodes[i].node == node)
-			continue;
-		percpu_ref_sen_nodes[i].inuse = false;
-		init_llist_node(&percpu_ref_sen_nodes[i].node);
-	}
-}
-
 static struct workqueue_struct *percpu_ref_release_wq;
 
 static void percpu_ref_release_work_fn(struct work_struct *work)
 {
-	struct llist_node *pos, *first, *head, *prev, *next;
-	struct llist_node *sen_node;
+	struct list_head *node;
 	struct percpu_ref *ref;
 	int count = 0;
 	bool held;
-	struct llist_node *last_node = READ_ONCE(last_percpu_ref_node);
 
-	first = READ_ONCE(percpu_ref_manage_head.first);
-	if (!first)
+	mutex_lock(&percpu_ref_active_switch_mutex);
+	spin_lock(&percpu_ref_manage_lock);
+	if (list_empty(&percpu_ref_manage_head)) {
+		next_percpu_ref_node = &percpu_ref_manage_head;
+		spin_unlock(&percpu_ref_manage_lock);
+		mutex_unlock(&percpu_ref_active_switch_mutex);
 		goto queue_release_work;
-
-	/*
-	 * Enqueue a dummy node to mark the start of scan. This dummy
-	 * node is used as start point of scan and ensures that
-	 * there is no additional synchronization required with new
-	 * label node additions to the llist. Any new labels will
-	 * be processed in next run of the kworker.
-	 *
-	 *                SCAN START PTR
-	 *                     |
-	 *                     v
-	 * +----------+     +------+    +------+    +------+
-	 * |          |     |      |    |      |    |      |
-	 * |   head   ------> dummy|--->|label |--->| label|--->NULL
-	 * |          |     | node |    |      |    |      |
-	 * +----------+     +------+    +------+    +------+
-	 *
-	 *
-	 * New label addition:
-	 *
-	 *                       SCAN START PTR
-	 *                            |
-	 *                            v
-	 * +----------+  +------+  +------+    +------+    +------+
-	 * |          |  |      |  |      |    |      |    |      |
-	 * |   head   |--> label|--> dummy|--->|label |--->| label|--->NULL
-	 * |          |  |      |  | node |    |      |    |      |
-	 * +----------+  +------+  +------+    +------+    +------+
-	 *
-	 */
-	if (last_node == NULL || last_node->next == NULL) {
-retry_sentinel_get:
-		sen_node = percpu_ref_get_sen_node();
-		/*
-		 * All sentinel nodes are in use? This should not happen, as we
-		 * require only one sentinel for the start of list traversal and
-		 * other sentinel node is freed during the traversal.
-		 */
-		if (WARN_ONCE(!sen_node, "All sentinel nodes are in use")) {
-			/* Use first node as the sentinel node */
-			head = first->next;
-			if (!head) {
-				struct llist_node *ign_node = NULL;
-				/*
-				 * We exhausted sentinel nodes. However, there aren't
-				 * enough nodes in the llist. So, we have leaked
-				 * sentinel nodes. Reclaim sentinels and retry.
-				 */
-				if (percpu_ref_is_sen_node(first))
-					ign_node = first;
-				percpu_ref_put_all_sen_nodes_except(ign_node);
-				goto retry_sentinel_get;
-			}
-			prev = first;
-		} else {
-			llist_add(sen_node, &percpu_ref_manage_head);
-			prev = sen_node;
-			head = prev->next;
-		}
-	} else {
-		prev = last_node;
-		head = prev->next;
 	}
+	if (next_percpu_ref_node == &percpu_ref_manage_head)
+		node = percpu_ref_manage_head.next;
+	else
+		node = next_percpu_ref_node;
+	next_percpu_ref_node = node->next;
+	list_del_init(node);
+	spin_unlock(&percpu_ref_manage_lock);
 
-	llist_for_each_safe(pos, next, head) {
-		/* Free sentinel node which is present in the list */
-		if (percpu_ref_is_sen_node(pos)) {
-			prev->next = pos->next;
-			percpu_ref_put_sen_node(pos);
-			continue;
-		}
-
-		ref = container_of(pos, struct percpu_ref_data, node)->ref;
+	while (!list_is_head(node, &percpu_ref_manage_head)) {
+		ref = container_of(node, struct percpu_ref_data, node)->ref;
 		__percpu_ref_switch_to_atomic_sync_checked(ref, false);
 		/*
 		 * Drop the ref while in RCU read critical section to
@@ -765,24 +682,31 @@ static void percpu_ref_release_work_fn(struct work_struct *work)
 		rcu_read_lock();
 		percpu_ref_put(ref);
 		held = percpu_ref_tryget(ref);
-		if (!held) {
-			prev->next = pos->next;
-			init_llist_node(pos);
+		if (held) {
+			spin_lock(&percpu_ref_manage_lock);
+			list_add(node, &percpu_ref_manage_head);
+			spin_unlock(&percpu_ref_manage_lock);
+			__percpu_ref_switch_to_percpu_checked(ref, false);
+		} else {
 			ref->percpu_count_ptr |= __PERCPU_REF_DEAD;
 		}
 		rcu_read_unlock();
-		if (!held)
-			continue;
-		__percpu_ref_switch_to_percpu_checked(ref, false);
+		mutex_unlock(&percpu_ref_active_switch_mutex);
 		count++;
-		if (count == READ_ONCE(max_scan_count)) {
-			WRITE_ONCE(last_percpu_ref_node, pos);
+		if (count == READ_ONCE(max_scan_count))
 			goto queue_release_work;
+		mutex_lock(&percpu_ref_active_switch_mutex);
+		spin_lock(&percpu_ref_manage_lock);
+		node = next_percpu_ref_node;
+		if (!list_is_head(next_percpu_ref_node, &percpu_ref_manage_head)) {
+			next_percpu_ref_node = next_percpu_ref_node->next;
+			list_del_init(node);
 		}
-		prev = pos;
+		spin_unlock(&percpu_ref_manage_lock);
 	}
 
-	WRITE_ONCE(last_percpu_ref_node, NULL);
+	mutex_unlock(&percpu_ref_active_switch_mutex);
+
 queue_release_work:
 	queue_delayed_work(percpu_ref_release_wq, &percpu_ref_release_work,
 			   scan_interval);
-- 
2.34.1





[Index of Archives]     [Linux ARM Kernel]     [Linux ARM]     [Linux Omap]     [Fedora ARM]     [IETF Annouce]     [Bugtraq]     [Linux OMAP]     [Linux MIPS]     [eCos]     [Asterisk Internet PBX]     [Linux API]

  Powered by Linux