[PATCH v9 5/5] lib/dlock-list: Scale dlock_lists_empty()

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



From: Davidlohr Bueso <dave@xxxxxxxxxxxx>

Instead of the current O(N) implementation, at the cost
of adding an atomic counter, we can convert
dlock_lists_empty() into an atomic_read(). The counter
only accounts for empty to non-empty transitions, and
vice versa; it is therefore modified only twice for each
of the lists during the lifetime of the dlock (while used).

In addition, to be able to unaccount a list_del(), we
add a dlist pointer to each head (rather than to each
node), thus minimizing the overall memory footprint.

Signed-off-by: Davidlohr Bueso <dbueso@xxxxxxx>
Acked-by: Waiman Long <longman@xxxxxxxxxx>
---
 include/linux/dlock-list.h |  8 ++++++
 lib/dlock-list.c           | 67 +++++++++++++++++++++++++++++++++++++++-------
 2 files changed, 65 insertions(+), 10 deletions(-)

diff --git a/include/linux/dlock-list.h b/include/linux/dlock-list.h
index 327cb9e..ac1a2e3 100644
--- a/include/linux/dlock-list.h
+++ b/include/linux/dlock-list.h
@@ -32,10 +32,18 @@
 struct dlock_list_head {
 	struct list_head list;
 	spinlock_t lock;
+	struct dlock_list_heads *dlist;
 } ____cacheline_aligned_in_smp;
 
+/*
+ * This is the main dlist data structure, with the array of heads
+ * and a counter that atomically tracks if any of the lists are
+ * being used. That is, empty to non-empty (and vice versa)
+ * head->list transitions.
+ */
 struct dlock_list_heads {
 	struct dlock_list_head *heads;
+	atomic_t used_lists;
 };
 
 /*
diff --git a/lib/dlock-list.c b/lib/dlock-list.c
index e286094..04da20d 100644
--- a/lib/dlock-list.c
+++ b/lib/dlock-list.c
@@ -122,8 +122,11 @@ int __alloc_dlock_list_heads(struct dlock_list_heads *dlist,
 
 		INIT_LIST_HEAD(&head->list);
 		head->lock = __SPIN_LOCK_UNLOCKED(&head->lock);
+		head->dlist = dlist;
 		lockdep_set_class(&head->lock, key);
 	}
+
+	atomic_set(&dlist->used_lists, 0);
 	return 0;
 }
 EXPORT_SYMBOL(__alloc_dlock_list_heads);
@@ -139,29 +142,36 @@ void free_dlock_list_heads(struct dlock_list_heads *dlist)
 {
 	kfree(dlist->heads);
 	dlist->heads = NULL;
+	atomic_set(&dlist->used_lists, 0);
 }
 EXPORT_SYMBOL(free_dlock_list_heads);
 
 /**
  * dlock_lists_empty - Check if all the dlock lists are empty
  * @dlist: Pointer to the dlock_list_heads structure
- * Return: true if list is empty, false otherwise.
  *
- * This can be a pretty expensive function call. If this function is required
- * in a performance critical path, we may have to maintain a global count
- * of the list entries in the global dlock_list_heads structure instead.
+ * Return: true if all dlock lists are empty, false otherwise.
  */
 bool dlock_lists_empty(struct dlock_list_heads *dlist)
 {
-	int idx;
-
 	/* Shouldn't be called before nr_dlock_lists is initialized */
 	WARN_ON_ONCE(!nr_dlock_lists);
 
-	for (idx = 0; idx < nr_dlock_lists; idx++)
-		if (!list_empty(&dlist->heads[idx].list))
-			return false;
-	return true;
+	/*
+	 * Serialize dlist->used_lists such that a 0->1 transition is not
+	 * missed by another thread checking if any of the dlock lists are
+	 * used.
+	 *
+	 * CPU0				    CPU1
+	 * dlock_list_add()                 dlock_lists_empty()
+	 *   [S] atomic_inc(used_lists);
+	 *       smp_mb__after_atomic();
+	 *					  smp_mb__before_atomic();
+	 *				      [L] atomic_read(used_lists)
+	 *       list_add()
+	 */
+	smp_mb__before_atomic();
+	return !atomic_read(&dlist->used_lists);
 }
 EXPORT_SYMBOL(dlock_lists_empty);
 
@@ -177,11 +187,39 @@ void dlock_lists_add(struct dlock_list_node *node,
 		     struct dlock_list_heads *dlist)
 {
 	struct dlock_list_head *head = &dlist->heads[this_cpu_read(cpu2idx)];
+	bool list_empty_before_lock = false;
+
+	/*
+	 * Optimistically bump the used_lists counter _before_ taking
+	 * the head->lock such that we don't miss a thread adding itself
+	 * to a list while spinning for the lock.
+	 *
+	 * Then, after taking the lock, recheck if the empty to non-empty
+	 * transition changed and (un)account for ourselves, accordingly.
+	 * Note that all these scenarios are corner cases, and not the
+	 * common scenario, where the lists are actually populated most
+	 * of the time.
+	 */
+	if (unlikely(list_empty_careful(&head->list))) {
+		list_empty_before_lock = true;
+		atomic_inc(&dlist->used_lists);
+		smp_mb__after_atomic();
+	}
 
 	/*
 	 * There is no need to disable preemption
 	 */
 	spin_lock(&head->lock);
+
+	if (unlikely(!list_empty_before_lock && list_empty(&head->list))) {
+		atomic_inc(&dlist->used_lists);
+		smp_mb__after_atomic();
+	}
+	if (unlikely(list_empty_before_lock && !list_empty(&head->list))) {
+		atomic_dec(&dlist->used_lists);
+		smp_mb__after_atomic();
+	}
+
 	WRITE_ONCE(node->head, head);
 	list_add(&node->list, &head->list);
 	spin_unlock(&head->lock);
@@ -212,6 +250,15 @@ void dlock_lists_del(struct dlock_list_node *node)
 		spin_lock(&head->lock);
 		if (likely(head == READ_ONCE(node->head))) {
 			list_del_init(&node->list);
+
+			if (unlikely(list_empty(&head->list))) {
+				struct dlock_list_heads *dlist;
+				dlist = node->head->dlist;
+
+				atomic_dec(&dlist->used_lists);
+				smp_mb__after_atomic();
+			}
+
 			WRITE_ONCE(node->head, NULL);
 			retry = false;
 		} else {
-- 
1.8.3.1




[Index of Archives]     [Linux Ext4 Filesystem]     [Union Filesystem]     [Filesystem Testing]     [Ceph Users]     [Ecryptfs]     [AutoFS]     [Kernel Newbies]     [Share Photos]     [Security]     [Netfilter]     [Bugtraq]     [Yosemite News]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux Cachefs]     [Reiser Filesystem]     [Linux RAID]     [Samba]     [Device Mapper]     [CEPH Development]

  Powered by Linux