Currently quota lists are global which is very bad for scalability. * inuse_list -> sb->s_dquot->dq_inuse_list * free_dquots -> sb->s_dquot->dq_free_list * Add a per-sb lock to protect the quota lists Do not remove dq_list_lock; it is now used only for protecting quota_hash. Signed-off-by: Dmitry Monakhov <dmonakhov@xxxxxxxxx> --- fs/quota/dquot.c | 80 ++++++++++++++++++++++++++++++++++++++----------- fs/super.c | 3 ++ include/linux/quota.h | 4 ++ 3 files changed, 69 insertions(+), 18 deletions(-) diff --git a/fs/quota/dquot.c b/fs/quota/dquot.c index 5e0b099..f2092d1 100644 --- a/fs/quota/dquot.c +++ b/fs/quota/dquot.c @@ -90,7 +90,8 @@ * about latest values take it as well. * * The spinlock ordering is hence: dq_data_lock > dq_list_lock > i_lock, - * dq_list_lock > dq_state_lock + * dq_list_lock > sb->s_dquot->dq_state_lock + * dq_list_lock > sb->s_dquot->dq_list_lock * * Note that some things (eg. sb pointer, type, id) doesn't change during * the life of the dquot structure and so needn't to be protected by a lock @@ -236,8 +237,6 @@ static void put_quota_format(struct quota_format_type *fmt) * mechanism to locate a specific dquot. 
*/ -static LIST_HEAD(inuse_list); -static LIST_HEAD(free_dquots); static unsigned int dq_hash_bits, dq_hash_mask; static struct hlist_head *dquot_hash; @@ -289,7 +288,7 @@ static struct dquot *find_dquot(unsigned int hashent, struct super_block *sb, /* Add a dquot to the tail of the free list */ static inline void put_dquot_last(struct dquot *dquot) { - list_add_tail(&dquot->dq_free, &free_dquots); + list_add_tail(&dquot->dq_free, &dq_opt(dquot)->dq_free_list); dqstats_inc(DQST_FREE_DQUOTS); } @@ -305,7 +304,7 @@ static inline void put_inuse(struct dquot *dquot) { /* We add to the back of inuse list so we don't have to restart * when traversing this list and we block */ - list_add_tail(&dquot->dq_inuse, &inuse_list); + list_add_tail(&dquot->dq_inuse, &dq_opt(dquot)->dq_inuse_list); dqstats_inc(DQST_ALLOC_DQUOTS); } @@ -338,17 +337,20 @@ static inline int mark_dquot_dirty(struct dquot *dquot) int dquot_mark_dquot_dirty(struct dquot *dquot) { int ret = 1; + struct quota_info *dqopt = dq_opt(dquot); /* If quota is dirty already, we don't have to acquire dq_list_lock */ if (test_bit(DQ_MOD_B, &dquot->dq_flags)) return 1; spin_lock(&dq_list_lock); + spin_lock(&dqopt->dq_list_lock); if (!test_and_set_bit(DQ_MOD_B, &dquot->dq_flags)) { - list_add(&dquot->dq_dirty, &dq_opt(dquot)-> - info[dquot->dq_type].dqi_dirty_list); + list_add(&dquot->dq_dirty, + &dqopt->info[dquot->dq_type].dqi_dirty_list); ret = 0; } + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); return ret; } @@ -442,10 +444,13 @@ int dquot_commit(struct dquot *dquot) mutex_lock(&dqopt->dqio_mutex); spin_lock(&dq_list_lock); + spin_lock(&dqopt->dq_list_lock); if (!clear_dquot_dirty(dquot)) { + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); goto out_sem; } + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); /* Inactive dquot can be only if there was error during read/init * => we have better not writing it */ @@ -515,10 +520,12 @@ static inline void 
do_destroy_dquot(struct dquot *dquot) static void invalidate_dquots(struct super_block *sb, int type) { struct dquot *dquot, *tmp; + struct quota_info *dqopt = sb_dqopt(sb); restart: spin_lock(&dq_list_lock); - list_for_each_entry_safe(dquot, tmp, &inuse_list, dq_inuse) { + spin_lock(&dqopt->dq_list_lock); + list_for_each_entry_safe(dquot, tmp, &dqopt->dq_inuse_list, dq_inuse) { if (dquot->dq_sb != sb) continue; if (dquot->dq_type != type) @@ -530,6 +537,7 @@ restart: atomic_inc(&dquot->dq_count); prepare_to_wait(&dquot->dq_wait_unused, &wait, TASK_UNINTERRUPTIBLE); + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); /* Once dqput() wakes us up, we know it's time to free * the dquot. @@ -556,6 +564,7 @@ restart: remove_inuse(dquot); do_destroy_dquot(dquot); } + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); } @@ -565,17 +574,20 @@ int dquot_scan_active(struct super_block *sb, unsigned long priv) { struct dquot *dquot, *old_dquot = NULL; + struct quota_info *dqopt = sb_dqopt(sb); int ret = 0; - mutex_lock(&sb_dqopt(sb)->dqonoff_mutex); + mutex_lock(&dqopt->dqonoff_mutex); spin_lock(&dq_list_lock); - list_for_each_entry(dquot, &inuse_list, dq_inuse) { + spin_lock(&dqopt->dq_list_lock); + list_for_each_entry(dquot, &dqopt->dq_inuse_list, dq_inuse) { if (!test_bit(DQ_ACTIVE_B, &dquot->dq_flags)) continue; if (dquot->dq_sb != sb) continue; /* Now we have active dquot so we can just increase use count */ atomic_inc(&dquot->dq_count); + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); dqstats_inc(DQST_LOOKUPS); dqput(old_dquot); @@ -584,13 +596,15 @@ int dquot_scan_active(struct super_block *sb, if (ret < 0) goto out; spin_lock(&dq_list_lock); + spin_lock(&dqopt->dq_list_lock); /* We are safe to continue now because our dquot could not * be moved out of the inuse list while we hold the reference */ } + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); out: dqput(old_dquot); - 
mutex_unlock(&sb_dqopt(sb)->dqonoff_mutex); + mutex_unlock(&dqopt->dqonoff_mutex); return ret; } EXPORT_SYMBOL(dquot_scan_active); @@ -609,6 +623,7 @@ int dquot_quota_sync(struct super_block *sb, int type, int wait) if (!sb_has_quota_active(sb, cnt)) continue; spin_lock(&dq_list_lock); + spin_lock(&dqopt->dq_list_lock); dirty = &dqopt->info[cnt].dqi_dirty_list; while (!list_empty(dirty)) { dquot = list_first_entry(dirty, struct dquot, @@ -622,12 +637,16 @@ int dquot_quota_sync(struct super_block *sb, int type, int wait) * holding reference so we can safely just increase * use count */ atomic_inc(&dquot->dq_count); + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); dqstats_inc(DQST_LOOKUPS); sb->dq_op->write_dquot(dquot); dqput(dquot); spin_lock(&dq_list_lock); + spin_lock(&dqopt->dq_list_lock); + } + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); } @@ -670,23 +689,30 @@ int dquot_quota_sync(struct super_block *sb, int type, int wait) EXPORT_SYMBOL(dquot_quota_sync); /* Free unused dquots from cache */ -static void prune_dqcache(int count) +static void prune_one_sb_dqcache(struct super_block *sb, void *arg) { struct list_head *head; struct dquot *dquot; + struct quota_info *dqopt = sb_dqopt(sb); + int count = *(int*) arg; - head = free_dquots.prev; - while (head != &free_dquots && count) { + spin_lock(&dqopt->dq_list_lock); + head = dqopt->dq_free_list.prev; + while (head != &dqopt->dq_free_list && count) { dquot = list_entry(head, struct dquot, dq_free); remove_dquot_hash(dquot); remove_free_dquot(dquot); remove_inuse(dquot); do_destroy_dquot(dquot); count--; - head = free_dquots.prev; + head = dqopt->dq_free_list.prev; } + spin_unlock(&dqopt->dq_list_lock); +} +static void prune_dqcache(int count) +{ + iterate_supers(prune_one_sb_dqcache, &count); } - /* * This is called from kswapd when we think we need some * more memory @@ -715,6 +741,7 @@ static struct shrinker dqcache_shrinker = { void dqput(struct dquot *dquot) { int ret; + 
struct quota_info *dqopt; if (!dquot) return; @@ -725,9 +752,11 @@ void dqput(struct dquot *dquot) BUG(); } #endif + dqopt = dq_opt(dquot); dqstats_inc(DQST_DROPS); we_slept: spin_lock(&dq_list_lock); + spin_lock(&dqopt->dq_list_lock); if (atomic_read(&dquot->dq_count) > 1) { /* We have more than one user... nothing to do */ atomic_dec(&dquot->dq_count); @@ -735,11 +764,13 @@ we_slept: if (!sb_has_quota_active(dquot->dq_sb, dquot->dq_type) && atomic_read(&dquot->dq_count) == 1) wake_up(&dquot->dq_wait_unused); + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); return; } /* Need to release dquot? */ if (test_bit(DQ_ACTIVE_B, &dquot->dq_flags) && dquot_dirty(dquot)) { + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); /* Commit dquot before releasing */ ret = dquot->dq_sb->dq_op->write_dquot(dquot); @@ -752,7 +783,9 @@ we_slept: * infinite loop here */ spin_lock(&dq_list_lock); + spin_lock(&dqopt->dq_list_lock); clear_dquot_dirty(dquot); + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); } goto we_slept; @@ -760,6 +793,7 @@ we_slept: /* Clear flag in case dquot was inactive (something bad happened) */ clear_dquot_dirty(dquot); if (test_bit(DQ_ACTIVE_B, &dquot->dq_flags)) { + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); dquot->dq_sb->dq_op->release_dquot(dquot); goto we_slept; @@ -770,6 +804,7 @@ we_slept: BUG_ON(!list_empty(&dquot->dq_free)); #endif put_dquot_last(dquot); + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); } EXPORT_SYMBOL(dqput); @@ -813,14 +848,17 @@ struct dquot *dqget(struct super_block *sb, unsigned int id, int type) { unsigned int hashent = hashfn(sb, id, type); struct dquot *dquot = NULL, *empty = NULL; + struct quota_info *dqopt = sb_dqopt(sb); if (!sb_has_quota_active(sb, type)) return NULL; we_slept: spin_lock(&dq_list_lock); - spin_lock(&sb_dqopt(sb)->dq_state_lock); + spin_lock(&dqopt->dq_list_lock); + spin_lock(&dqopt->dq_state_lock); if (!sb_has_quota_active(sb, 
type)) { - spin_unlock(&sb_dqopt(sb)->dq_state_lock); + spin_unlock(&dqopt->dq_state_lock); + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); goto out; } @@ -829,6 +867,7 @@ we_slept: dquot = find_dquot(hashent, sb, id, type); if (!dquot) { if (!empty) { + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); empty = get_empty_dquot(sb, type); if (!empty) @@ -842,12 +881,14 @@ we_slept: put_inuse(dquot); /* hash it first so it can be found */ insert_dquot_hash(dquot); + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); dqstats_inc(DQST_LOOKUPS); } else { if (!atomic_read(&dquot->dq_count)) remove_free_dquot(dquot); atomic_inc(&dquot->dq_count); + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); dqstats_inc(DQST_CACHE_HITS); dqstats_inc(DQST_LOOKUPS); @@ -953,6 +994,7 @@ static int remove_inode_dquot_ref(struct inode *inode, int type, struct list_head *tofree_head) { struct dquot *dquot = inode->i_dquot[type]; + struct quota_info *dqopt = sb_dqopt(inode->i_sb); inode->i_dquot[type] = NULL; if (dquot) { @@ -964,9 +1006,11 @@ static int remove_inode_dquot_ref(struct inode *inode, int type, atomic_read(&dquot->dq_count)); #endif spin_lock(&dq_list_lock); + spin_lock(&dqopt->dq_list_lock); /* As dquot must have currently users it can't be on * the free list... 
 */ list_add(&dquot->dq_free, tofree_head); + spin_unlock(&dqopt->dq_list_lock); spin_unlock(&dq_list_lock); return 1; } diff --git a/fs/super.c b/fs/super.c index b54cb8b..852866b 100644 --- a/fs/super.c +++ b/fs/super.c @@ -107,6 +107,9 @@ static struct super_block *alloc_super(struct file_system_type *type) mutex_init(&s->s_dquot.dqonoff_mutex); spin_lock_init(&s->s_dquot.dq_state_lock); init_rwsem(&s->s_dquot.dqptr_sem); + spin_lock_init(&s->s_dquot.dq_list_lock); + INIT_LIST_HEAD(&s->s_dquot.dq_inuse_list); + INIT_LIST_HEAD(&s->s_dquot.dq_free_list); init_waitqueue_head(&s->s_wait_unfrozen); s->s_maxbytes = MAX_NON_LFS; s->s_op = &default_op; diff --git a/include/linux/quota.h b/include/linux/quota.h index e39b01c..134c18d 100644 --- a/include/linux/quota.h +++ b/include/linux/quota.h @@ -399,6 +399,10 @@ struct quota_info { struct mutex dqonoff_mutex; /* Serialize quotaon & quotaoff */ struct rw_semaphore dqptr_sem; /* serialize ops using quota_info struct, pointers from inode to dquots */ spinlock_t dq_state_lock; /* serialize quota state changes*/ + spinlock_t dq_list_lock; /* protect lists */ + struct list_head dq_inuse_list; /* list of in-use dquots */ + struct list_head dq_free_list; /* list of free dquots */ + struct inode *files[MAXQUOTAS]; /* inodes of quotafiles */ struct mem_dqinfo info[MAXQUOTAS]; /* Information for each quota type */ const struct quota_format_ops *ops[MAXQUOTAS]; /* Operations for each type */ -- 1.6.6.1 -- To unsubscribe from this list: send the line "unsubscribe linux-fsdevel" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html