[PATCH 06/19] quota: make dquot lists per-sb

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



From: Dmitry Monakhov <dmonakhov@xxxxxxxxx>

Currently quota lists are global which is very bad for scalability.
* inuse_list  -> sb->s_dquot->dq_inuse_list
* free_dquots -> sb->s_dquot->dq_free_list
* Add a per-sb lock (sb->s_dquot->dq_list_lock) to protect these lists

The global dq_list_lock is not removed: it is now used only to protect
dquot_hash.

Signed-off-by: Dmitry Monakhov <dmonakhov@xxxxxxxxxx>
---
 fs/quota/dquot.c      |   88 +++++++++++++++++++++++++++++++++++++++---------
 include/linux/quota.h |    4 ++
 2 files changed, 75 insertions(+), 17 deletions(-)

diff --git a/fs/quota/dquot.c b/fs/quota/dquot.c
index 31d6b44..324f124 100644
--- a/fs/quota/dquot.c
+++ b/fs/quota/dquot.c
@@ -90,7 +90,8 @@
  * about latest values take it as well.
  *
  * The spinlock ordering is hence: dq_data_lock > dq_list_lock > i_lock,
- *   dq_list_lock > dq_state_lock
+ *   dq_list_lock > sb->s_dquot->dq_state_lock
+ *   dq_list_lock > sb->s_dquot->dq_list_lock
  *
  * Note that some things (eg. sb pointer, type, id) doesn't change during
  * the life of the dquot structure and so needn't to be protected by a lock
@@ -236,8 +237,6 @@ static void put_quota_format(struct quota_format_type *fmt)
  * mechanism to locate a specific dquot.
  */
 
-static LIST_HEAD(inuse_list);
-static LIST_HEAD(free_dquots);
 static unsigned int dq_hash_bits, dq_hash_mask;
 static struct hlist_head *dquot_hash;
 
@@ -289,7 +288,7 @@ static struct dquot *find_dquot(unsigned int hashent, struct super_block *sb,
 /* Add a dquot to the tail of the free list */
 static inline void put_dquot_last(struct dquot *dquot)
 {
-	list_add_tail(&dquot->dq_free, &free_dquots);
+	list_add_tail(&dquot->dq_free, &sb_dqopts(dquot)->dq_free_list);
 	dqstats_inc(DQST_FREE_DQUOTS);
 }
 
@@ -305,7 +304,7 @@ static inline void put_inuse(struct dquot *dquot)
 {
 	/* We add to the back of inuse list so we don't have to restart
 	 * when traversing this list and we block */
-	list_add_tail(&dquot->dq_inuse, &inuse_list);
+	list_add_tail(&dquot->dq_inuse, &sb_dqopts(dquot)->dq_inuse_list);
 	dqstats_inc(DQST_ALLOC_DQUOTS);
 }
 
@@ -338,17 +337,20 @@ static inline int mark_dquot_dirty(struct dquot *dquot)
 int dquot_mark_dquot_dirty(struct dquot *dquot)
 {
 	int ret = 1;
+	struct quota_info *dqopt = sb_dqopts(dquot);
 
 	/* If quota is dirty already, we don't have to acquire dq_list_lock */
 	if (test_bit(DQ_MOD_B, &dquot->dq_flags))
 		return 1;
 
 	spin_lock(&dq_list_lock);
+	spin_lock(&dqopt->dq_list_lock);
 	if (!test_and_set_bit(DQ_MOD_B, &dquot->dq_flags)) {
-		list_add(&dquot->dq_dirty, &sb_dqopts(dquot)->
-				info[dquot->dq_type].dqi_dirty_list);
+		list_add(&dquot->dq_dirty,
+			&dqopt->info[dquot->dq_type].dqi_dirty_list);
 		ret = 0;
 	}
+	spin_unlock(&dqopt->dq_list_lock);
 	spin_unlock(&dq_list_lock);
 	return ret;
 }
@@ -442,10 +444,13 @@ int dquot_commit(struct dquot *dquot)
 
 	mutex_lock(&dqopt->dqio_mutex);
 	spin_lock(&dq_list_lock);
+	spin_lock(&dqopt->dq_list_lock);
 	if (!clear_dquot_dirty(dquot)) {
+		spin_unlock(&dqopt->dq_list_lock);
 		spin_unlock(&dq_list_lock);
 		goto out_sem;
 	}
+	spin_unlock(&dqopt->dq_list_lock);
 	spin_unlock(&dq_list_lock);
 	/* Inactive dquot can be only if there was error during read/init
 	 * => we have better not writing it */
@@ -515,10 +520,12 @@ static inline void do_destroy_dquot(struct dquot *dquot)
 static void invalidate_dquots(struct super_block *sb, int type)
 {
 	struct dquot *dquot, *tmp;
+	struct quota_info *dqopt = dqopts(sb);
 
 restart:
 	spin_lock(&dq_list_lock);
-	list_for_each_entry_safe(dquot, tmp, &inuse_list, dq_inuse) {
+	spin_lock(&dqopt->dq_list_lock);
+	list_for_each_entry_safe(dquot, tmp, &dqopt->dq_inuse_list, dq_inuse) {
 		if (dquot->dq_sb != sb)
 			continue;
 		if (dquot->dq_type != type)
@@ -530,6 +537,7 @@ restart:
 			atomic_inc(&dquot->dq_count);
 			prepare_to_wait(&dquot->dq_wait_unused, &wait,
 					TASK_UNINTERRUPTIBLE);
+			spin_unlock(&dqopt->dq_list_lock);
 			spin_unlock(&dq_list_lock);
 			/* Once dqput() wakes us up, we know it's time to free
 			 * the dquot.
@@ -556,6 +564,7 @@ restart:
 		remove_inuse(dquot);
 		do_destroy_dquot(dquot);
 	}
+	spin_unlock(&dqopt->dq_list_lock);
 	spin_unlock(&dq_list_lock);
 }
 
@@ -565,17 +574,21 @@ int dquot_scan_active(struct super_block *sb,
 		      unsigned long priv)
 {
 	struct dquot *dquot, *old_dquot = NULL;
+	struct quota_info *dqopt;
 	int ret = 0;
 
 	mutex_lock(&dqctl(sb)->dqonoff_mutex);
+	dqopt = dqopts(sb);
 	spin_lock(&dq_list_lock);
-	list_for_each_entry(dquot, &inuse_list, dq_inuse) {
+	spin_lock(&dqopt->dq_list_lock);
+	list_for_each_entry(dquot, &dqopt->dq_inuse_list, dq_inuse) {
 		if (!test_bit(DQ_ACTIVE_B, &dquot->dq_flags))
 			continue;
 		if (dquot->dq_sb != sb)
 			continue;
 		/* Now we have active dquot so we can just increase use count */
 		atomic_inc(&dquot->dq_count);
+		spin_unlock(&dqopt->dq_list_lock);
 		spin_unlock(&dq_list_lock);
 		dqstats_inc(DQST_LOOKUPS);
 		dqput(old_dquot);
@@ -584,9 +597,11 @@ int dquot_scan_active(struct super_block *sb,
 		if (ret < 0)
 			goto out;
 		spin_lock(&dq_list_lock);
+		spin_lock(&dqopt->dq_list_lock);
 		/* We are safe to continue now because our dquot could not
 		 * be moved out of the inuse list while we hold the reference */
 	}
+	spin_unlock(&dqopt->dq_list_lock);
 	spin_unlock(&dq_list_lock);
 out:
 	dqput(old_dquot);
@@ -610,6 +625,7 @@ int dquot_quota_sync(struct super_block *sb, int type, int wait)
 		if (!sb_has_quota_active(sb, cnt))
 			continue;
 		spin_lock(&dq_list_lock);
+		spin_lock(&dqopt->dq_list_lock);
 		dirty = &dqopt->info[cnt].dqi_dirty_list;
 		while (!list_empty(dirty)) {
 			dquot = list_first_entry(dirty, struct dquot,
@@ -623,12 +639,15 @@ int dquot_quota_sync(struct super_block *sb, int type, int wait)
  			 * holding reference so we can safely just increase
 			 * use count */
 			atomic_inc(&dquot->dq_count);
+			spin_unlock(&dqopt->dq_list_lock);
 			spin_unlock(&dq_list_lock);
 			dqstats_inc(DQST_LOOKUPS);
 			dqctl(sb)->dq_op->write_dquot(dquot);
 			dqput(dquot);
+			spin_lock(&dqopt->dq_list_lock);
 			spin_lock(&dq_list_lock);
 		}
+		spin_unlock(&dqopt->dq_list_lock);
 		spin_unlock(&dq_list_lock);
 	}
 
@@ -672,23 +691,43 @@ int dquot_quota_sync(struct super_block *sb, int type, int wait)
 EXPORT_SYMBOL(dquot_quota_sync);
 
 /* Free unused dquots from cache */
-static void prune_dqcache(int count)
+static void prune_one_sb_dqcache(struct super_block *sb, void *arg)
 {
 	struct list_head *head;
 	struct dquot *dquot;
+	struct quota_info *dqopt;
+	int count = *(int *)arg;
 
-	head = free_dquots.prev;
-	while (head != &free_dquots && count) {
+	mutex_lock(&dqctl(sb)->dqonoff_mutex);
+	if (!sb_any_quota_loaded(sb)) {
+		mutex_unlock(&dqctl(sb)->dqonoff_mutex);
+		return;
+	}
+	dqopt = dqopts(sb);
+	/* remove_dquot_hash() below needs the global dq_list_lock, which
+	 * nests outside the per-sb list lock */
+	spin_lock(&dq_list_lock);
+	spin_lock(&dqopt->dq_list_lock);
+	head = dqopt->dq_free_list.prev;
+	while (head != &dqopt->dq_free_list && count) {
 		dquot = list_entry(head, struct dquot, dq_free);
 		remove_dquot_hash(dquot);
 		remove_free_dquot(dquot);
 		remove_inuse(dquot);
 		do_destroy_dquot(dquot);
 		count--;
-		head = free_dquots.prev;
+		head = dqopt->dq_free_list.prev;
 	}
+	spin_unlock(&dqopt->dq_list_lock);
+	spin_unlock(&dq_list_lock);
+	/* Hand the unused budget back so later superblocks share it */
+	*(int *)arg = count;
+	mutex_unlock(&dqctl(sb)->dqonoff_mutex);
+}
+static void prune_dqcache(int count)
+{
+	iterate_supers(prune_one_sb_dqcache, &count);
 }
-
 /*
  * This is called from kswapd when we think we need some
  * more memory
@@ -717,6 +749,7 @@ static struct shrinker dqcache_shrinker = {
 void dqput(struct dquot *dquot)
 {
 	int ret;
+	struct quota_info *dqopt;
 
 	if (!dquot)
 		return;
@@ -727,9 +760,11 @@ void dqput(struct dquot *dquot)
 		BUG();
 	}
 #endif
+	dqopt = sb_dqopts(dquot);
 	dqstats_inc(DQST_DROPS);
 we_slept:
 	spin_lock(&dq_list_lock);
+	spin_lock(&dqopt->dq_list_lock);
 	if (atomic_read(&dquot->dq_count) > 1) {
 		/* We have more than one user... nothing to do */
 		atomic_dec(&dquot->dq_count);
@@ -737,11 +772,13 @@ we_slept:
 		if (!sb_has_quota_active(dquot->dq_sb, dquot->dq_type) &&
 		    atomic_read(&dquot->dq_count) == 1)
 			wake_up(&dquot->dq_wait_unused);
+		spin_unlock(&dqopt->dq_list_lock);
 		spin_unlock(&dq_list_lock);
 		return;
 	}
 	/* Need to release dquot? */
 	if (test_bit(DQ_ACTIVE_B, &dquot->dq_flags) && dquot_dirty(dquot)) {
+		spin_unlock(&dqopt->dq_list_lock);
 		spin_unlock(&dq_list_lock);
 		/* Commit dquot before releasing */
 		ret = dqctl(dquot->dq_sb)->dq_op->write_dquot(dquot);
@@ -754,7 +791,9 @@ we_slept:
 			 * infinite loop here
 			 */
 			spin_lock(&dq_list_lock);
+			spin_lock(&dqopt->dq_list_lock);
 			clear_dquot_dirty(dquot);
+			spin_unlock(&dqopt->dq_list_lock);
 			spin_unlock(&dq_list_lock);
 		}
 		goto we_slept;
@@ -762,6 +801,7 @@ we_slept:
 	/* Clear flag in case dquot was inactive (something bad happened) */
 	clear_dquot_dirty(dquot);
 	if (test_bit(DQ_ACTIVE_B, &dquot->dq_flags)) {
+		spin_unlock(&dqopt->dq_list_lock);
 		spin_unlock(&dq_list_lock);
 		dqctl(dquot->dq_sb)->dq_op->release_dquot(dquot);
 		goto we_slept;
@@ -772,6 +812,7 @@ we_slept:
 	BUG_ON(!list_empty(&dquot->dq_free));
 #endif
 	put_dquot_last(dquot);
+	spin_unlock(&dqopt->dq_list_lock);
 	spin_unlock(&dq_list_lock);
 }
 EXPORT_SYMBOL(dqput);
@@ -815,22 +856,26 @@ struct dquot *dqget(struct super_block *sb, unsigned int id, int type)
 {
 	unsigned int hashent = hashfn(sb, id, type);
 	struct dquot *dquot = NULL, *empty = NULL;
+	struct quota_info *dqopt = dqopts(sb);
 
         if (!sb_has_quota_active(sb, type))
 		return NULL;
 we_slept:
 	spin_lock(&dq_list_lock);
-	spin_lock(&dqopts(sb)->dq_state_lock);
+	spin_lock(&dqopt->dq_list_lock);
+	spin_lock(&dqopt->dq_state_lock);
 	if (!sb_has_quota_active(sb, type)) {
-		spin_unlock(&dqopts(sb)->dq_state_lock);
+		spin_unlock(&dqopt->dq_state_lock);
+		spin_unlock(&dqopt->dq_list_lock);
 		spin_unlock(&dq_list_lock);
 		goto out;
 	}
-	spin_unlock(&dqopts(sb)->dq_state_lock);
+	spin_unlock(&dqopt->dq_state_lock);
 
 	dquot = find_dquot(hashent, sb, id, type);
 	if (!dquot) {
 		if (!empty) {
+			spin_unlock(&dqopt->dq_list_lock);
 			spin_unlock(&dq_list_lock);
 			empty = get_empty_dquot(sb, type);
 			if (!empty)
@@ -844,12 +889,14 @@ we_slept:
 		put_inuse(dquot);
 		/* hash it first so it can be found */
 		insert_dquot_hash(dquot);
+		spin_unlock(&dqopt->dq_list_lock);
 		spin_unlock(&dq_list_lock);
 		dqstats_inc(DQST_LOOKUPS);
 	} else {
 		if (!atomic_read(&dquot->dq_count))
 			remove_free_dquot(dquot);
 		atomic_inc(&dquot->dq_count);
+		spin_unlock(&dqopt->dq_list_lock);
 		spin_unlock(&dq_list_lock);
 		dqstats_inc(DQST_CACHE_HITS);
 		dqstats_inc(DQST_LOOKUPS);
@@ -955,6 +1002,7 @@ static int remove_inode_dquot_ref(struct inode *inode, int type,
 				  struct list_head *tofree_head)
 {
 	struct dquot *dquot = inode->i_dquot[type];
+	struct quota_info *dqopt = dqopts(inode->i_sb);
 
 	inode->i_dquot[type] = NULL;
 	if (dquot) {
@@ -966,9 +1014,11 @@ static int remove_inode_dquot_ref(struct inode *inode, int type,
 					    atomic_read(&dquot->dq_count));
 #endif
 			spin_lock(&dq_list_lock);
+			spin_lock(&dqopt->dq_list_lock);
 			/* As dquot must have currently users it can't be on
 			 * the free list... */
 			list_add(&dquot->dq_free, tofree_head);
+			spin_unlock(&dqopt->dq_list_lock);
 			spin_unlock(&dq_list_lock);
 			return 1;
 		}
@@ -1903,6 +1953,10 @@ static int alloc_quota_info(struct quota_ctl_info *dqctl) {
 
 	mutex_init(&dqopt->dqio_mutex);
 	spin_lock_init(&dqopt->dq_state_lock);
+	spin_lock_init(&dqopt->dq_list_lock);
+	INIT_LIST_HEAD(&dqopt->dq_inuse_list);
+	INIT_LIST_HEAD(&dqopt->dq_free_list);
+
 	dqctl->dq_opt = dqopt;
 	return 0;
 }
diff --git a/include/linux/quota.h b/include/linux/quota.h
index 3fca71f..bb63abf 100644
--- a/include/linux/quota.h
+++ b/include/linux/quota.h
@@ -405,6 +405,10 @@ struct quota_info {
 	struct mutex dqio_mutex;		/* lock device while I/O in progress */
 	struct mem_dqinfo info[MAXQUOTAS];	/* Information for each quota type */
 	spinlock_t dq_state_lock;	/* serialize quota state changes*/
+	spinlock_t dq_list_lock;		/* protects the per-sb dquot lists */
+	struct list_head dq_inuse_list;		/* list of in-use dquots */
+	struct list_head dq_free_list;		/* list of free dquots */
+
 	struct inode *files[MAXQUOTAS];	/* inodes of quotafiles */
 	const struct quota_format_ops *fmt_ops[MAXQUOTAS];	/* Operations for each type */
 };
-- 
1.6.5.2

--
To unsubscribe from this list: send the line "unsubscribe linux-fsdevel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[Index of Archives]     [Linux Ext4 Filesystem]     [Union Filesystem]     [Filesystem Testing]     [Ceph Users]     [Ecryptfs]     [AutoFS]     [Kernel Newbies]     [Share Photos]     [Security]     [Netfilter]     [Bugtraq]     [Yosemite News]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux Cachefs]     [Reiser Filesystem]     [Linux RAID]     [Samba]     [Device Mapper]     [CEPH Development]
  Powered by Linux