[PATCH 19/19] quota: redesign dquot reference counting

Currently each find_get_dquot() goes through dq_mutex regardless of the
dq_count value. This simply kills system performance.

With the help of a few small modifications we can avoid taking the
mutex on the fast path.

XXX: cmpxchg is not the fastest operation in the world. If it becomes a
problem we can easily switch from the atomic operations to a counter
protected by dq_lock.
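
For the record, below is a minimal user-space sketch of the
add-if-greater pattern the fast path relies on. It uses the GCC
__atomic builtins as a stand-in for the kernel's atomic_t API; the
demo and its names are illustrative only, not part of the patch:

#include <stdio.h>

/* Add @a to @v only while @v is greater than @u; mirrors the cmpxchg
 * loop in atomic_add_if_greater() below. */
static int add_if_greater(int *v, int a, int u)
{
	int c = __atomic_load_n(v, __ATOMIC_RELAXED);

	while (c > u) {
		/* On failure c is refreshed with the current value of *v */
		if (__atomic_compare_exchange_n(v, &c, c + a, 0,
						__ATOMIC_SEQ_CST,
						__ATOMIC_SEQ_CST))
			return 1;	/* swapped: *v was greater than u */
	}
	return 0;	/* caller must fall back to the locked slow path */
}

int main(void)
{
	int cnt = 3;	/* three references held */

	/* dqput()-style fast path: drop a reference without any lock */
	if (add_if_greater(&cnt, -1, 2))
		printf("fast path taken, count is now %d\n", cnt);
	else
		printf("slow path, take dq_list_lock\n");
	return 0;
}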

Signed-off-by: Dmitry Monakhov <dmonakhov@xxxxxxxxxx>
---
 fs/quota/dquot.c      |  117 +++++++++++++++++++++++++++++++++++-------------
 include/linux/quota.h |    3 +-
 2 files changed, 87 insertions(+), 33 deletions(-)

diff --git a/fs/quota/dquot.c b/fs/quota/dquot.c
index 5c8ad82..a5577fd 100644
--- a/fs/quota/dquot.c
+++ b/fs/quota/dquot.c
@@ -233,6 +233,32 @@ static void put_quota_format(struct quota_format_type *fmt)
 struct dqstats dqstats;
 EXPORT_SYMBOL(dqstats);
 
+/**
+ * atomic_add_if_greater - add to a counter if it is greater than a value
+ * @v: pointer of type atomic_t
+ * @a: the amount to add to @v...
+ * @u: ...if @v is greater than @u.
+ *
+ * Atomically adds @a to @v, so long as @v is greater than @u.
+ * Returns non-zero if @v was greater than @u, and zero otherwise.
+ * Note: cmpxchg is not the fastest operation in the world, but it is
+ * still better than taking a spin_lock in our case.
+ */
+static inline int atomic_add_if_greater(atomic_t *v, int a, int u)
+{
+	int c, old;
+	c = atomic_read(v);
+	for (;;) {
+		if (unlikely(c <= (u)))
+			break;
+		old = atomic_cmpxchg((v), c, c + (a));
+		if (likely(old == c))
+			break;
+		c = old;
+	}
+	return c > (u);
+}
+
 static qsize_t inode_get_rsv_space(struct inode *inode);
 static qsize_t __inode_get_rsv_space(struct inode *inode);
 static void __dquot_initialize(struct inode *inode, int type);
@@ -313,13 +339,6 @@ static inline void remove_inuse(struct dquot *dquot)
 /*
  * End of list functions needing dq_list_lock
  */
-
-static void wait_on_dquot(struct dquot *dquot)
-{
-	mutex_lock(&dquot->dq_mutex);
-	mutex_unlock(&dquot->dq_mutex);
-}
-
 static inline int dquot_dirty(struct dquot *dquot)
 {
 	return test_bit(DQ_MOD_B, &dquot->dq_flags);
@@ -738,12 +757,16 @@ static void prune_one_sb_dqcache(struct super_block *sb, void *arg)
 	head = dqopt->dq_free_list.prev;
 	while (head != &dqopt->dq_free_list && count) {
 		dquot = list_entry(head, struct dquot, dq_free);
+		head = head->prev;
+		/* If someone is waiting for this dquot to become
+		 * active, skip it. */
+		if (test_bit(DQ_WAIT_B, &dquot->dq_flags))
+			continue;
 		remove_dquot_hash(dquot);
 		remove_free_dquot(dquot);
 		remove_inuse(dquot);
 		do_destroy_dquot(dquot);
 		count--;
-		head = dqopt->dq_free_list.prev;
 	}
 	spin_unlock(&dqopt->dq_list_lock);
 	mutex_unlock(&dqctl(sb)->dqonoff_mutex);
@@ -782,6 +805,9 @@ void dqput(struct dquot *dquot)
 
 	if (!dquot)
 		return;
+	dqopt = sb_dqopts(dquot);
+	dqstats_inc(DQST_DROPS);
+we_slept:
 #ifdef CONFIG_QUOTA_DEBUG
 	if (!atomic_read(&dquot->dq_count)) {
 		quota_error(dquot->dq_sb, "trying to free free dquot of %s %d",
@@ -789,16 +815,18 @@ void dqput(struct dquot *dquot)
 		BUG();
 	}
 #endif
-	dqopt = sb_dqopts(dquot);
-	dqstats_inc(DQST_DROPS);
-we_slept:
+
+	/* If count is greater than 2 we can drop our reference locklessly */
+	if (atomic_add_if_greater(&dquot->dq_count, -1, 2)) {
+		/* Fast path, nothing more to do here */
+		return;
+	}
 	spin_lock(&dqopt->dq_list_lock);
-	if (atomic_read(&dquot->dq_count) > 1) {
+	if (atomic_add_unless(&dquot->dq_count, -1, 1)) {
 		/* We have more than one user... nothing to do */
-		atomic_dec(&dquot->dq_count);
 		/* Releasing dquot during quotaoff phase? */
-		if (!sb_has_quota_active(dquot->dq_sb, dquot->dq_type) &&
-		    atomic_read(&dquot->dq_count) == 1)
+		if (!sb_has_quota_active(dquot->dq_sb, dquot->dq_type) &&
+		    atomic_read(&dquot->dq_count) == 1)
 			wake_up(&dquot->dq_wait_unused);
 		spin_unlock(&dqopt->dq_list_lock);
 		return;
@@ -872,6 +900,34 @@ inline void dqget(struct dquot *dquot)
 	atomic_inc(&dquot->dq_count);
 }
 
+static int dqget_stable(struct dquot *dquot)
+{
+	if (atomic_add_if_greater(&dquot->dq_count, 1, 1))
+		/* Reference was successfully incremented */
+		return 1;
+	/*
+	 * The dquot is in an unstable state. In order to serialize with
+	 * dquot_release() we have to wait on ->dq_mutex, but the object
+	 * may belong to the free list and may be pruned from the cache
+	 * at any moment after we drop the list_lock.
+	 * To protect the dquot from that we set the WAIT bit.
+	 */
+	set_bit(DQ_WAIT_B, &dquot->dq_flags);
+	spin_unlock(&sb_dqopts(dquot)->dq_list_lock);
+	/*
+	 * Increment the count under the mutex to serialize with
+	 * dquot_release(). After that we are safe from release attempts.
+	 */
+	mutex_lock(&dquot->dq_mutex);
+	dqget(dquot);
+	mutex_unlock(&dquot->dq_mutex);
+
+	spin_lock(&sb_dqopts(dquot)->dq_list_lock);
+	remove_free_dquot(dquot);
+	clear_bit(DQ_WAIT_B, &dquot->dq_flags);
+	return 0;
+}
+
 /*
  * Get reference to dquot
  *
@@ -896,7 +952,12 @@ we_slept:
 	rcu_read_unlock();
 
 	dquot = find_dquot(sb, id, type);
-	if (!dquot) {
+	if (dquot) {
+		dqget_stable(dquot);
+		spin_unlock(&dqopt->dq_list_lock);
+		dqstats_inc(DQST_CACHE_HITS);
+		goto found;
+	} else {
 		if (!empty) {
 			spin_unlock(&dqopt->dq_list_lock);
 			empty = get_empty_dquot(sb, type);
@@ -912,24 +973,16 @@ we_slept:
 		/* hash it first so it can be found */
 		insert_dquot_hash(dquot);
 		spin_unlock(&dqopt->dq_list_lock);
-		dqstats_inc(DQST_LOOKUPS);
-	} else {
-		if (!atomic_read(&dquot->dq_count))
-			remove_free_dquot(dquot);
-		dqget(dquot);
-		spin_unlock(&dqopt->dq_list_lock);
-		dqstats_inc(DQST_CACHE_HITS);
-		dqstats_inc(DQST_LOOKUPS);
 	}
-	/* Wait for dq_mutex - after this we know that either dquot_release() is
-	 * already finished or it will be canceled due to dq_count > 1 test */
-	wait_on_dquot(dquot);
+found:
+	dqstats_inc(DQST_LOOKUPS);
 	/* Read the dquot / allocate space in quota file */
-	if (!test_bit(DQ_ACTIVE_B, &dquot->dq_flags) &&
-	    dqctl(sb)->dq_op->acquire_dquot(dquot) < 0) {
-		dqput(dquot);
-		dquot = NULL;
-		goto out;
+	if (!test_bit(DQ_ACTIVE_B, &dquot->dq_flags)) {
+		if (dqctl(sb)->dq_op->acquire_dquot(dquot) < 0) {
+			dqput(dquot);
+			dquot = NULL;
+			goto out;
+		}
 	}
 #ifdef CONFIG_QUOTA_DEBUG
 	BUG_ON(!dquot->dq_sb);	/* Has somebody invalidated entry under us? */
diff --git a/include/linux/quota.h b/include/linux/quota.h
index 834ed1b..2260fa3 100644
--- a/include/linux/quota.h
+++ b/include/linux/quota.h
@@ -276,7 +276,8 @@ static inline void dqstats_dec(unsigned int type)
 #define DQ_FAKE_B	3	/* no limits only usage */
 #define DQ_READ_B	4	/* dquot was read into memory */
 #define DQ_ACTIVE_B	5	/* dquot is active (dquot_release not called) */
-#define DQ_LASTSET_B	6	/* Following 6 bits (see QIF_) are reserved\
+#define DQ_WAIT_B	6	/* Do not prune this dquot from free list */
+#define DQ_LASTSET_B	7	/* Following 6 bits (see QIF_) are reserved\
 				 * for the mask of entries set via SETQUOTA\
 				 * quotactl. They are set under dq_data_lock\
 				 * and the quota format handling dquot can\
-- 
1.6.5.2
