Currently the quota lists are global, which is bad for scalability.
Make them per super block:

* inuse_list  -> sb->s_dquot->dq_inuse_list
* free_dquots -> sb->s_dquot->dq_free_list
* add a per-sb lock to protect the quota lists

The global dq_list_lock is not removed; it is now used only to protect
the dquot hash.

Signed-off-by: Dmitry Monakhov <dmonakhov@xxxxxxxxxx>
---
(A short user-space sketch of the resulting lock nesting is appended
after the patch.)

 fs/quota/dquot.c      |   88 ++++++++++++++++++++++++++++++++++++++---------
 include/linux/quota.h |    3 ++
 2 files changed, 74 insertions(+), 17 deletions(-)

diff --git a/fs/quota/dquot.c b/fs/quota/dquot.c
index f719a6f..d7ec471 100644
--- a/fs/quota/dquot.c
+++ b/fs/quota/dquot.c
@@ -87,8 +87,8 @@
  * i_blocks and i_bytes updates itself are guarded by i_lock acquired directly
  * in inode_add_bytes() and inode_sub_bytes().
  *
- * The spinlock ordering is hence: dq_data_lock > dq_list_lock > i_lock.
- *
+ * The spinlock ordering is hence: dq_data_lock > dq_list_lock > i_lock,
+ * dq_list_lock > sb->s_dquot->dq_list_lock.
  * Note that some things (eg. sb pointer, type, id) doesn't change during
  * the life of the dquot structure and so needn't to be protected by a lock
  *
@@ -233,8 +233,6 @@ static void put_quota_format(struct quota_format_type *fmt)
  * mechanism to locate a specific dquot.
  */
 
-static LIST_HEAD(inuse_list);
-static LIST_HEAD(free_dquots);
 static unsigned int dq_hash_bits, dq_hash_mask;
 static struct hlist_head *dquot_hash;
 
@@ -286,7 +284,7 @@ static struct dquot *find_dquot(unsigned int hashent, struct super_block *sb,
 /* Add a dquot to the tail of the free list */
 static inline void put_dquot_last(struct dquot *dquot)
 {
-	list_add_tail(&dquot->dq_free, &free_dquots);
+	list_add_tail(&dquot->dq_free, &sb_dqopts(dquot)->dq_free_list);
 	dqstats_inc(DQST_FREE_DQUOTS);
 }
 
@@ -302,7 +300,7 @@ static inline void put_inuse(struct dquot *dquot)
 {
 	/* We add to the back of inuse list so we don't have to restart
 	 * when traversing this list and we block */
-	list_add_tail(&dquot->dq_inuse, &inuse_list);
+	list_add_tail(&dquot->dq_inuse, &sb_dqopts(dquot)->dq_inuse_list);
 	dqstats_inc(DQST_ALLOC_DQUOTS);
 }
 
@@ -335,17 +333,20 @@ static inline int mark_dquot_dirty(struct dquot *dquot)
 int dquot_mark_dquot_dirty(struct dquot *dquot)
 {
 	int ret = 1;
+	struct quota_info *dqopt = sb_dqopts(dquot);
 
 	/* If quota is dirty already, we don't have to acquire dq_list_lock */
 	if (test_bit(DQ_MOD_B, &dquot->dq_flags))
 		return 1;
 
 	spin_lock(&dq_list_lock);
+	spin_lock(&dqopt->dq_list_lock);
 	if (!test_and_set_bit(DQ_MOD_B, &dquot->dq_flags)) {
-		list_add(&dquot->dq_dirty, &sb_dqopts(dquot)->
-				info[dquot->dq_type].dqi_dirty_list);
+		list_add(&dquot->dq_dirty,
+			 &dqopt->info[dquot->dq_type].dqi_dirty_list);
 		ret = 0;
 	}
+	spin_unlock(&dqopt->dq_list_lock);
 	spin_unlock(&dq_list_lock);
 	return ret;
 }
@@ -439,10 +440,13 @@ int dquot_commit(struct dquot *dquot)
 
 	mutex_lock(&dqopt->dqio_mutex);
 	spin_lock(&dq_list_lock);
+	spin_lock(&dqopt->dq_list_lock);
 	if (!clear_dquot_dirty(dquot)) {
+		spin_unlock(&dqopt->dq_list_lock);
 		spin_unlock(&dq_list_lock);
 		goto out_sem;
 	}
+	spin_unlock(&dqopt->dq_list_lock);
 	spin_unlock(&dq_list_lock);
 	/* Inactive dquot can be only if there was error during read/init
 	 * => we have better not writing it */
@@ -512,10 +516,12 @@ static inline void do_destroy_dquot(struct dquot *dquot)
 static void invalidate_dquots(struct super_block *sb, int type)
 {
 	struct dquot *dquot, *tmp;
+	struct quota_info *dqopt = dqopts(sb);
 
 restart:
 	spin_lock(&dq_list_lock);
-	list_for_each_entry_safe(dquot, tmp, &inuse_list, dq_inuse) {
+	spin_lock(&dqopt->dq_list_lock);
+	list_for_each_entry_safe(dquot, tmp,
+				 &dqopt->dq_inuse_list, dq_inuse) {
 		if (dquot->dq_sb != sb)
 			continue;
 		if (dquot->dq_type != type)
@@ -527,6 +533,7 @@ restart:
 			atomic_inc(&dquot->dq_count);
 			prepare_to_wait(&dquot->dq_wait_unused, &wait,
 					TASK_UNINTERRUPTIBLE);
+			spin_unlock(&dqopt->dq_list_lock);
 			spin_unlock(&dq_list_lock);
 			/* Once dqput() wakes us up, we know it's time to free
 			 * the dquot.
@@ -553,6 +560,7 @@ restart:
 		remove_inuse(dquot);
 		do_destroy_dquot(dquot);
 	}
+	spin_unlock(&dqopt->dq_list_lock);
 	spin_unlock(&dq_list_lock);
 }
 
@@ -562,17 +570,21 @@ int dquot_scan_active(struct super_block *sb,
 		      unsigned long priv)
 {
 	struct dquot *dquot, *old_dquot = NULL;
+	struct quota_info *dqopt;
 	int ret = 0;
 
 	mutex_lock(&dqctl(sb)->dqonoff_mutex);
+	dqopt = dqopts(sb);
 	spin_lock(&dq_list_lock);
-	list_for_each_entry(dquot, &inuse_list, dq_inuse) {
+	spin_lock(&dqopt->dq_list_lock);
+	list_for_each_entry(dquot, &dqopt->dq_inuse_list, dq_inuse) {
 		if (!test_bit(DQ_ACTIVE_B, &dquot->dq_flags))
 			continue;
 		if (dquot->dq_sb != sb)
 			continue;
 		/* Now we have active dquot so we can just increase use count */
 		atomic_inc(&dquot->dq_count);
+		spin_unlock(&dqopt->dq_list_lock);
 		spin_unlock(&dq_list_lock);
 		dqstats_inc(DQST_LOOKUPS);
 		dqput(old_dquot);
@@ -581,9 +593,11 @@ int dquot_scan_active(struct super_block *sb,
 		if (ret < 0)
 			goto out;
 		spin_lock(&dq_list_lock);
+		spin_lock(&dqopt->dq_list_lock);
 		/* We are safe to continue now because our dquot could not
 		 * be moved out of the inuse list while we hold the reference */
 	}
+	spin_unlock(&dqopt->dq_list_lock);
 	spin_unlock(&dq_list_lock);
 out:
 	dqput(old_dquot);
@@ -607,6 +621,7 @@ int dquot_quota_sync(struct super_block *sb, int type, int wait)
 		if (!sb_has_quota_active(sb, cnt))
 			continue;
 		spin_lock(&dq_list_lock);
+		spin_lock(&dqopt->dq_list_lock);
 		dirty = &dqopt->info[cnt].dqi_dirty_list;
 		while (!list_empty(dirty)) {
 			dquot = list_first_entry(dirty, struct dquot,
@@ -620,12 +635,15 @@ int dquot_quota_sync(struct super_block *sb, int type, int wait)
 			 * holding reference so we can safely just increase
 			 * use count */
 			atomic_inc(&dquot->dq_count);
+			spin_unlock(&dqopt->dq_list_lock);
 			spin_unlock(&dq_list_lock);
 			dqstats_inc(DQST_LOOKUPS);
 			dqctl(sb)->dq_op->write_dquot(dquot);
 			dqput(dquot);
 			spin_lock(&dq_list_lock);
+			spin_lock(&dqopt->dq_list_lock);
 		}
+		spin_unlock(&dqopt->dq_list_lock);
 		spin_unlock(&dq_list_lock);
 	}
 
@@ -669,23 +687,36 @@ int dquot_quota_sync(struct super_block *sb, int type, int wait)
 EXPORT_SYMBOL(dquot_quota_sync);
 
 /* Free unused dquots from cache */
-static void prune_dqcache(int count)
+static void prune_one_sb_dqcache(struct super_block *sb, void *arg)
 {
 	struct list_head *head;
 	struct dquot *dquot;
+	struct quota_info *dqopt = dqopts(sb);
+	int count = *(int *)arg;
 
-	head = free_dquots.prev;
-	while (head != &free_dquots && count) {
+	mutex_lock(&dqctl(sb)->dqonoff_mutex);
+	if (!sb_any_quota_loaded(sb)) {
+		mutex_unlock(&dqctl(sb)->dqonoff_mutex);
+		return;
+	}
+	spin_lock(&dqopt->dq_list_lock);
+	head = dqopt->dq_free_list.prev;
+	while (head != &dqopt->dq_free_list && count) {
 		dquot = list_entry(head, struct dquot, dq_free);
 		remove_dquot_hash(dquot);
 		remove_free_dquot(dquot);
 		remove_inuse(dquot);
 		do_destroy_dquot(dquot);
 		count--;
-		head = free_dquots.prev;
+		head = dqopt->dq_free_list.prev;
 	}
+	spin_unlock(&dqopt->dq_list_lock);
+	mutex_unlock(&dqctl(sb)->dqonoff_mutex);
+}
+
+static void prune_dqcache(int count)
+{
+	iterate_supers(prune_one_sb_dqcache, &count);
 }
-
 /*
  * This is called from kswapd when we think we need some
  * more memory
@@ -714,6 +745,7 @@ static struct shrinker dqcache_shrinker = {
 void dqput(struct dquot *dquot)
 {
 	int ret;
+	struct quota_info *dqopt;
 
 	if (!dquot)
 		return;
@@ -724,9 +756,11 @@ void dqput(struct dquot *dquot)
 		BUG();
 	}
 #endif
+	dqopt = sb_dqopts(dquot);
 	dqstats_inc(DQST_DROPS);
 we_slept:
 	spin_lock(&dq_list_lock);
+	spin_lock(&dqopt->dq_list_lock);
 	if (atomic_read(&dquot->dq_count) > 1) {
 		/* We have more than one user... nothing to do */
 		atomic_dec(&dquot->dq_count);
@@ -734,11 +768,13 @@ we_slept:
 		if (!sb_has_quota_active(dquot->dq_sb, dquot->dq_type) &&
 		    atomic_read(&dquot->dq_count) == 1)
 			wake_up(&dquot->dq_wait_unused);
+		spin_unlock(&dqopt->dq_list_lock);
 		spin_unlock(&dq_list_lock);
 		return;
 	}
 	/* Need to release dquot? */
 	if (test_bit(DQ_ACTIVE_B, &dquot->dq_flags) && dquot_dirty(dquot)) {
+		spin_unlock(&dqopt->dq_list_lock);
 		spin_unlock(&dq_list_lock);
 		/* Commit dquot before releasing */
 		ret = dqctl(dquot->dq_sb)->dq_op->write_dquot(dquot);
@@ -751,7 +787,9 @@ we_slept:
 			 * infinite loop here */
 			spin_lock(&dq_list_lock);
+			spin_lock(&dqopt->dq_list_lock);
 			clear_dquot_dirty(dquot);
+			spin_unlock(&dqopt->dq_list_lock);
 			spin_unlock(&dq_list_lock);
 		}
 		goto we_slept;
@@ -759,6 +797,7 @@ we_slept:
 	/* Clear flag in case dquot was inactive (something bad happened) */
 	clear_dquot_dirty(dquot);
 	if (test_bit(DQ_ACTIVE_B, &dquot->dq_flags)) {
+		spin_unlock(&dqopt->dq_list_lock);
 		spin_unlock(&dq_list_lock);
 		dqctl(dquot->dq_sb)->dq_op->release_dquot(dquot);
 		goto we_slept;
@@ -769,6 +808,7 @@ we_slept:
 	BUG_ON(!list_empty(&dquot->dq_free));
 #endif
 	put_dquot_last(dquot);
+	spin_unlock(&dqopt->dq_list_lock);
 	spin_unlock(&dq_list_lock);
 }
 EXPORT_SYMBOL(dqput);
@@ -812,6 +852,7 @@ struct dquot *dqget(struct super_block *sb, unsigned int id, int type)
 {
 	unsigned int hashent = hashfn(sb, id, type);
 	struct dquot *dquot = NULL, *empty = NULL;
+	struct quota_info *dqopt;
 	int idx;
 
 	rcu_read_lock();
@@ -819,17 +860,21 @@ struct dquot *dqget(struct super_block *sb, unsigned int id, int type)
 		rcu_read_unlock();
 		return NULL;
 	}
-	idx = srcu_read_lock(&dqopts(sb)->dq_srcu);
+	dqopt = dqopts(sb);
+	idx = srcu_read_lock(&dqopt->dq_srcu);
 	rcu_read_unlock();
 we_slept:
 	spin_lock(&dq_list_lock);
+	spin_lock(&dqopt->dq_list_lock);
 	if (!sb_has_quota_active(sb, type)) {
+		spin_unlock(&dqopt->dq_list_lock);
 		spin_unlock(&dq_list_lock);
 		goto out;
 	}
 	dquot = find_dquot(hashent, sb, id, type);
 	if (!dquot) {
 		if (!empty) {
+			spin_unlock(&dqopt->dq_list_lock);
 			spin_unlock(&dq_list_lock);
 			empty = get_empty_dquot(sb, type);
 			if (!empty)
@@ -843,12 +888,14 @@ we_slept:
 		put_inuse(dquot);
 		/* hash it first so it can be found */
 		insert_dquot_hash(dquot);
+		spin_unlock(&dqopt->dq_list_lock);
 		spin_unlock(&dq_list_lock);
 		dqstats_inc(DQST_LOOKUPS);
 	} else {
 		if (!atomic_read(&dquot->dq_count))
 			remove_free_dquot(dquot);
 		atomic_inc(&dquot->dq_count);
+		spin_unlock(&dqopt->dq_list_lock);
 		spin_unlock(&dq_list_lock);
 		dqstats_inc(DQST_CACHE_HITS);
 		dqstats_inc(DQST_LOOKUPS);
@@ -867,7 +914,7 @@ we_slept:
 	BUG_ON(!dquot->dq_sb);	/* Has somebody invalidated entry under us? */
 #endif
 out:
-	srcu_read_unlock(&dqopts(sb)->dq_srcu, idx);
+	srcu_read_unlock(&dqopt->dq_srcu, idx);
 	if (empty)
 		do_destroy_dquot(empty);
@@ -955,6 +1002,7 @@ static int remove_inode_dquot_ref(struct inode *inode, int type,
 				  struct list_head *tofree_head)
 {
 	struct dquot *dquot = inode->i_dquot[type];
+	struct quota_info *dqopt = dqopts(inode->i_sb);
 
 	inode->i_dquot[type] = NULL;
 	if (dquot) {
@@ -966,9 +1014,11 @@ static int remove_inode_dquot_ref(struct inode *inode, int type,
 				    atomic_read(&dquot->dq_count));
 #endif
 		spin_lock(&dq_list_lock);
+		spin_lock(&dqopt->dq_list_lock);
 		/* As dquot must have currently users it can't be on
 		 * the free list... */
 		list_add(&dquot->dq_free, tofree_head);
+		spin_unlock(&dqopt->dq_list_lock);
 		spin_unlock(&dq_list_lock);
 		return 1;
 	}
@@ -1964,6 +2014,10 @@ static int alloc_quota_info(struct quota_ctl_info *dqctl) {
 	}
 	mutex_init(&dqopt->dqio_mutex);
 	init_rwsem(&dqopt->dqptr_sem);
+	spin_lock_init(&dqopt->dq_list_lock);
+	INIT_LIST_HEAD(&dqopt->dq_inuse_list);
+	INIT_LIST_HEAD(&dqopt->dq_free_list);
+
 	dqctl->dq_opt = dqopt;
 	return 0;
 }
diff --git a/include/linux/quota.h b/include/linux/quota.h
index 7170730..4ca03aa 100644
--- a/include/linux/quota.h
+++ b/include/linux/quota.h
@@ -406,6 +406,9 @@ struct quota_ctl_info {
 struct quota_info {
 	struct mutex dqio_mutex;		/* lock device while I/O in progress */
 	struct mem_dqinfo info[MAXQUOTAS];	/* Information for each quota type */
+	spinlock_t dq_list_lock;		/* protects dq_inuse_list and dq_free_list */
+	struct list_head dq_inuse_list;		/* list of in-use dquots */
+	struct list_head dq_free_list;		/* list of free dquots */
 	struct inode *files[MAXQUOTAS];		/* inodes of quotafiles */
 	const struct quota_format_ops *fmt_ops[MAXQUOTAS];	/* Operations for each type */
 	struct srcu_struct dq_srcu;		/* use count read lock */
-- 
1.6.5.2
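
---
For reference, here is a minimal user-space sketch of the lock nesting the
patch establishes. The field names (dq_list_lock, dq_inuse_list,
dq_free_list) follow the patch; the pthread mutexes standing in for kernel
spinlocks, the tiny list helpers, and the driver in main() are illustrative
stand-ins, not kernel code. In the patch itself the callers (dqput(),
dqget(), ...) hold both locks around larger critical sections; this sketch
only shows the acquisition order.

#include <pthread.h>
#include <stdio.h>

/* Minimal doubly-linked list, modeled on include/linux/list.h. */
struct list_head { struct list_head *prev, *next; };

static void INIT_LIST_HEAD(struct list_head *h) { h->prev = h->next = h; }

static void list_add_tail(struct list_head *n, struct list_head *h)
{
	n->prev = h->prev;
	n->next = h;
	h->prev->next = n;
	h->prev = n;
}

/* Still-global lock: after the patch it only guards the dquot hash. */
static pthread_mutex_t dq_list_lock = PTHREAD_MUTEX_INITIALIZER;

/* Per-super-block quota state, as in struct quota_info after the patch. */
struct quota_info {
	pthread_mutex_t dq_list_lock;	/* protects the two lists below */
	struct list_head dq_inuse_list;
	struct list_head dq_free_list;
};

struct dquot {
	struct quota_info *dq_opt;	/* stand-in for sb_dqopts(dquot) */
	struct list_head dq_free;
};

/*
 * Lock ordering from the patch: the global dq_list_lock is always taken
 * first, the per-sb dq_list_lock second.
 */
static void dquot_put_on_free_list(struct dquot *dquot)
{
	struct quota_info *dqopt = dquot->dq_opt;

	pthread_mutex_lock(&dq_list_lock);		/* outer, global */
	pthread_mutex_lock(&dqopt->dq_list_lock);	/* inner, per-sb */
	list_add_tail(&dquot->dq_free, &dqopt->dq_free_list);
	pthread_mutex_unlock(&dqopt->dq_list_lock);
	pthread_mutex_unlock(&dq_list_lock);
}

int main(void)
{
	struct quota_info dqopt;
	struct dquot dq = { .dq_opt = &dqopt };

	pthread_mutex_init(&dqopt.dq_list_lock, NULL);
	INIT_LIST_HEAD(&dqopt.dq_inuse_list);
	INIT_LIST_HEAD(&dqopt.dq_free_list);

	dquot_put_on_free_list(&dq);
	printf("dquot on free list: %d\n",
	       dqopt.dq_free_list.next == &dq.dq_free);
	return 0;
}

Because every per-sb lock nests strictly inside the still-global
dq_list_lock, no path ever holds the list locks of two different super
blocks at once, so the split adds no ordering constraints beyond the one
documented in the comment at the top of fs/quota/dquot.c.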