From: Dmitry Monakhov <dmonakhov@xxxxxxxxx> Currently quota lists are global, which is very bad for scalability. * inuse_list -> sb->s_dquot->dq_inuse_list * free_dquots -> sb->s_dquot->dq_free_list * Add a per-sb lock to protect the quota lists Do not remove dq_list_lock; it is now used only for protecting quota_hash Signed-off-by: Dmitry Monakhov <dmonakhov@xxxxxxxxxx> --- fs/quota/dquot.c | 88 +++++++++++++++++++++++++++++++++++++++--------- include/linux/quota.h | 4 ++ 2 files changed, 75 insertions(+), 17 deletions(-) diff --git a/fs/quota/dquot.c b/fs/quota/dquot.c index 31d6b44..324f124 100644 --- a/fs/quota/dquot.c +++ b/fs/quota/dquot.c @@ -90,7 +90,8 @@ * about latest values take it as well. * * The spinlock ordering is hence: dq_data_lock > dq_list_lock > i_lock, - * dq_list_lock > dq_state_lock + * dq_list_lock > sb->s_dquot->dq_state_lock + * dq_list_lock > sb->s_dquot->dq_list_lock * * Note that some things (eg. sb pointer, type, id) doesn't change during * the life of the dquot structure and so needn't to be protected by a lock @@ -236,8 +237,6 @@ static void put_quota_format(struct quota_format_type *fmt) * mechanism to locate a specific dquot. 
*/ -static LIST_HEAD(inuse_list); -static LIST_HEAD(free_dquots); static unsigned int dq_hash_bits, dq_hash_mask; static struct hlist_head *dquot_hash; @@ -289,7 +288,7 @@ static struct dquot *find_dquot(unsigned int hashent, struct super_block *sb, /* Add a dquot to the tail of the free list */ static inline void put_dquot_last(struct dquot *dquot) { - list_add_tail(&dquot->dq_free, &free_dquots); + list_add_tail(&dquot->dq_free, &sb_dqopts(dquot)->dq_free_list); dqstats_inc(DQST_FREE_DQUOTS); } @@ -305,7 +304,7 @@ static inline void put_inuse(struct dquot *dquot) { /* We add to the back of inuse list so we don't have to restart * when traversing this list and we block */ - list_add_tail(&dquot->dq_inuse, &inuse_list); + list_add_tail(&dquot->dq_inuse, &sb_dqopts(dquot)->dq_inuse_list); dqstats_inc(DQST_ALLOC_DQUOTS); } @@ -338,17 +337,20 @@ static inline int mark_dquot_dirty(struct dquot *dquot) int dquot_mark_dquot_dirty(struct dquot *dquot) { int ret = 1; + struct quota_info *dqopt = sb_dqopts(dquot); /* If quota is dirty already, we don't have to acquire dq_list_lock */ if (test_bit(DQ_MOD_B, &dquot->dq_flags)) return 1; spin_lock(&dq_list_lock); + spin_lock(&dqopt->dq_list_lock); if (!test_and_set_bit(DQ_MOD_B, &dquot->dq_flags)) { - list_add(&dquot->dq_dirty, &sb_dqopts(dquot)-> - info[dquot->dq_type].dqi_dirty_list); + list_add(&dquot->dq_dirty, + &dqopt->info[dquot->dq_type].dqi_dirty_list); ret = 0; } + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); return ret; } @@ -442,10 +444,13 @@ int dquot_commit(struct dquot *dquot) mutex_lock(&dqopt->dqio_mutex); spin_lock(&dq_list_lock); + spin_lock(&dqopt->dq_list_lock); if (!clear_dquot_dirty(dquot)) { + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); goto out_sem; } + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); /* Inactive dquot can be only if there was error during read/init * => we have better not writing it */ @@ -515,10 +520,12 @@ static inline void 
do_destroy_dquot(struct dquot *dquot) static void invalidate_dquots(struct super_block *sb, int type) { struct dquot *dquot, *tmp; + struct quota_info *dqopt = dqopts(sb); restart: spin_lock(&dq_list_lock); - list_for_each_entry_safe(dquot, tmp, &inuse_list, dq_inuse) { + spin_lock(&dqopt->dq_list_lock); + list_for_each_entry_safe(dquot, tmp, &dqopt->dq_inuse_list, dq_inuse) { if (dquot->dq_sb != sb) continue; if (dquot->dq_type != type) @@ -530,6 +537,7 @@ restart: atomic_inc(&dquot->dq_count); prepare_to_wait(&dquot->dq_wait_unused, &wait, TASK_UNINTERRUPTIBLE); + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); /* Once dqput() wakes us up, we know it's time to free * the dquot. @@ -556,6 +564,7 @@ restart: remove_inuse(dquot); do_destroy_dquot(dquot); } + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); } @@ -565,17 +574,21 @@ int dquot_scan_active(struct super_block *sb, unsigned long priv) { struct dquot *dquot, *old_dquot = NULL; + struct quota_info *dqopt; int ret = 0; mutex_lock(&dqctl(sb)->dqonoff_mutex); + dqopt = dqopts(sb); spin_lock(&dq_list_lock); - list_for_each_entry(dquot, &inuse_list, dq_inuse) { + spin_lock(&dqopt->dq_list_lock); + list_for_each_entry(dquot, &dqopt->dq_inuse_list, dq_inuse) { if (!test_bit(DQ_ACTIVE_B, &dquot->dq_flags)) continue; if (dquot->dq_sb != sb) continue; /* Now we have active dquot so we can just increase use count */ atomic_inc(&dquot->dq_count); + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); dqstats_inc(DQST_LOOKUPS); dqput(old_dquot); @@ -584,9 +597,11 @@ int dquot_scan_active(struct super_block *sb, if (ret < 0) goto out; spin_lock(&dq_list_lock); + spin_lock(&dqopt->dq_list_lock); /* We are safe to continue now because our dquot could not * be moved out of the inuse list while we hold the reference */ } + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); out: dqput(old_dquot); @@ -610,6 +625,7 @@ int dquot_quota_sync(struct super_block *sb, int type, int 
wait) if (!sb_has_quota_active(sb, cnt)) continue; spin_lock(&dq_list_lock); + spin_lock(&dqopt->dq_list_lock); dirty = &dqopt->info[cnt].dqi_dirty_list; while (!list_empty(dirty)) { dquot = list_first_entry(dirty, struct dquot, @@ -623,12 +639,15 @@ int dquot_quota_sync(struct super_block *sb, int type, int wait) * holding reference so we can safely just increase * use count */ atomic_inc(&dquot->dq_count); + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); dqstats_inc(DQST_LOOKUPS); dqctl(sb)->dq_op->write_dquot(dquot); dqput(dquot); + spin_lock(&dqopt->dq_list_lock); spin_lock(&dq_list_lock); } + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); } @@ -672,23 +691,36 @@ int dquot_quota_sync(struct super_block *sb, int type, int wait) EXPORT_SYMBOL(dquot_quota_sync); /* Free unused dquots from cache */ -static void prune_dqcache(int count) +static void prune_one_sb_dqcache(struct super_block *sb, void *arg) { struct list_head *head; struct dquot *dquot; + struct quota_info *dqopt = dqopts(sb); + int count = *(int*) arg; - head = free_dquots.prev; - while (head != &free_dquots && count) { + mutex_lock(&dqctl(sb)->dqonoff_mutex); + if (!sb_any_quota_loaded(sb)) { + mutex_unlock(&dqctl(sb)->dqonoff_mutex); + return; + } + spin_lock(&dqopt->dq_list_lock); + head = dqopt->dq_free_list.prev; + while (head != &dqopt->dq_free_list && count) { dquot = list_entry(head, struct dquot, dq_free); remove_dquot_hash(dquot); remove_free_dquot(dquot); remove_inuse(dquot); do_destroy_dquot(dquot); count--; - head = free_dquots.prev; + head = dqopt->dq_free_list.prev; } + spin_unlock(&dqopt->dq_list_lock); + mutex_unlock(&dqctl(sb)->dqonoff_mutex); +} +static void prune_dqcache(int count) +{ + iterate_supers(prune_one_sb_dqcache, &count); } - /* * This is called from kswapd when we think we need some * more memory @@ -717,6 +749,7 @@ static struct shrinker dqcache_shrinker = { void dqput(struct dquot *dquot) { int ret; + struct quota_info *dqopt; if 
(!dquot) return; @@ -727,9 +760,11 @@ void dqput(struct dquot *dquot) BUG(); } #endif + dqopt = sb_dqopts(dquot); dqstats_inc(DQST_DROPS); we_slept: spin_lock(&dq_list_lock); + spin_lock(&dqopt->dq_list_lock); if (atomic_read(&dquot->dq_count) > 1) { /* We have more than one user... nothing to do */ atomic_dec(&dquot->dq_count); @@ -737,11 +772,13 @@ we_slept: if (!sb_has_quota_active(dquot->dq_sb, dquot->dq_type) && atomic_read(&dquot->dq_count) == 1) wake_up(&dquot->dq_wait_unused); + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); return; } /* Need to release dquot? */ if (test_bit(DQ_ACTIVE_B, &dquot->dq_flags) && dquot_dirty(dquot)) { + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); /* Commit dquot before releasing */ ret = dqctl(dquot->dq_sb)->dq_op->write_dquot(dquot); @@ -754,7 +791,9 @@ we_slept: * infinite loop here */ spin_lock(&dq_list_lock); + spin_lock(&dqopt->dq_list_lock); clear_dquot_dirty(dquot); + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); } goto we_slept; @@ -762,6 +801,7 @@ we_slept: /* Clear flag in case dquot was inactive (something bad happened) */ clear_dquot_dirty(dquot); if (test_bit(DQ_ACTIVE_B, &dquot->dq_flags)) { + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); dqctl(dquot->dq_sb)->dq_op->release_dquot(dquot); goto we_slept; @@ -772,6 +812,7 @@ we_slept: BUG_ON(!list_empty(&dquot->dq_free)); #endif put_dquot_last(dquot); + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); } EXPORT_SYMBOL(dqput); @@ -815,22 +856,26 @@ struct dquot *dqget(struct super_block *sb, unsigned int id, int type) { unsigned int hashent = hashfn(sb, id, type); struct dquot *dquot = NULL, *empty = NULL; + struct quota_info *dqopt = dqopts(sb); if (!sb_has_quota_active(sb, type)) return NULL; we_slept: spin_lock(&dq_list_lock); - spin_lock(&dqopts(sb)->dq_state_lock); + spin_lock(&dqopt->dq_list_lock); + spin_lock(&dqopt->dq_state_lock); if (!sb_has_quota_active(sb, type)) { - 
spin_unlock(&dqopts(sb)->dq_state_lock); + spin_unlock(&dqopt->dq_state_lock); + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); goto out; } - spin_unlock(&dqopts(sb)->dq_state_lock); + spin_unlock(&dqopt->dq_state_lock); dquot = find_dquot(hashent, sb, id, type); if (!dquot) { if (!empty) { + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); empty = get_empty_dquot(sb, type); if (!empty) @@ -844,12 +889,14 @@ we_slept: put_inuse(dquot); /* hash it first so it can be found */ insert_dquot_hash(dquot); + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); dqstats_inc(DQST_LOOKUPS); } else { if (!atomic_read(&dquot->dq_count)) remove_free_dquot(dquot); atomic_inc(&dquot->dq_count); + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); dqstats_inc(DQST_CACHE_HITS); dqstats_inc(DQST_LOOKUPS); @@ -955,6 +1002,7 @@ static int remove_inode_dquot_ref(struct inode *inode, int type, struct list_head *tofree_head) { struct dquot *dquot = inode->i_dquot[type]; + struct quota_info *dqopt = dqopts(inode->i_sb); inode->i_dquot[type] = NULL; if (dquot) { @@ -966,9 +1014,11 @@ static int remove_inode_dquot_ref(struct inode *inode, int type, atomic_read(&dquot->dq_count)); #endif spin_lock(&dq_list_lock); + spin_lock(&dqopt->dq_list_lock); /* As dquot must have currently users it can't be on * the free list... 
*/ list_add(&dquot->dq_free, tofree_head); + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); return 1; } @@ -1903,6 +1953,10 @@ static int alloc_quota_info(struct quota_ctl_info *dqctl) { mutex_init(&dqopt->dqio_mutex); spin_lock_init(&dqopt->dq_state_lock); + spin_lock_init(&dqopt->dq_list_lock); + INIT_LIST_HEAD(&dqopt->dq_inuse_list); + INIT_LIST_HEAD(&dqopt->dq_free_list); + dqctl->dq_opt = dqopt; return 0; } diff --git a/include/linux/quota.h b/include/linux/quota.h index 3fca71f..bb63abf 100644 --- a/include/linux/quota.h +++ b/include/linux/quota.h @@ -405,6 +405,10 @@ struct quota_info { struct mutex dqio_mutex; /* lock device while I/O in progress */ struct mem_dqinfo info[MAXQUOTAS]; /* Information for each quota type */ spinlock_t dq_state_lock; /* serialize quota state changes*/ + spinlock_t dq_list_lock; /* protect lists */ + struct list_head dq_inuse_list; /* list of inused dquotas */ + struct list_head dq_free_list; /* list of free dquotas */ + struct inode *files[MAXQUOTAS]; /* inodes of quotafiles */ const struct quota_format_ops *fmt_ops[MAXQUOTAS]; /* Operations for each type */ }; -- 1.6.5.2 -- To unsubscribe from this list: send the line "unsubscribe linux-fsdevel" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html