Currently, while an ACL is not cached yet, the initial caller of get_acl() is responsible for reading the ACL and updating the cache; any concurrent readers only read the ACL without updating the cache. This works reasonably well for local filesystems where reading ACLs is fast once the underlying disk blocks are cached, but not so well for filesystems where reading an ACL requires, for example, network communication. To address that, make subsequent concurrent readers wait for the initial reader instead. Any blocked readers are woken up as soon as an ACL is available (or the initial read has failed). (Filesystems can still force the VFS not to cache ACLs by calling forget_cached_acl() in ->get_acl. Doing so wakes up any concurrent readers that are waiting for the initial reader, allowing them to proceed. As of now, only Lustre is doing that.) Signed-off-by: Andreas Gruenbacher <agruenba@xxxxxxxxxx> Cc: Oleg Drokin <oleg.drokin@xxxxxxxxx> Cc: Andreas Dilger <andreas.dilger@xxxxxxxxx> --- fs/posix_acl.c | 37 +++++++++++++++++++++++++++++++++---- include/linux/wait.h | 1 + kernel/sched/wait.c | 9 +++++++++ 3 files changed, 43 insertions(+), 4 deletions(-) diff --git a/fs/posix_acl.c b/fs/posix_acl.c index 0be840e..c02c7ca 100644 --- a/fs/posix_acl.c +++ b/fs/posix_acl.c @@ -58,6 +58,11 @@ struct posix_acl *get_cached_acl_rcu(struct inode *inode, int type) } EXPORT_SYMBOL(get_cached_acl_rcu); +static bool is_uncached_acl_sentinel(struct posix_acl *acl) +{ + return is_uncached_acl(acl) && acl != ACL_NOT_CACHED; +} + void set_cached_acl(struct inode *inode, int type, struct posix_acl *acl) { struct posix_acl **p = acl_by_type(inode, type); @@ -66,6 +71,8 @@ void set_cached_acl(struct inode *inode, int type, struct posix_acl *acl) old = xchg(p, posix_acl_dup(acl)); if (!is_uncached_acl(old)) posix_acl_release(old); + else if (is_uncached_acl_sentinel(old)) + wake_up_all(generic_waitqueue(p)); } EXPORT_SYMBOL(set_cached_acl); @@ -76,6 +83,8 @@ static void __forget_cached_acl(struct posix_acl **p) old 
= xchg(p, ACL_NOT_CACHED); if (!is_uncached_acl(old)) posix_acl_release(old); + else if (is_uncached_acl_sentinel(old)) + wake_up_all(generic_waitqueue(p)); } void forget_cached_acl(struct inode *inode, int type) @@ -110,7 +119,9 @@ static void __complete_get_acl(struct posix_acl **p, struct posix_acl *acl) struct posix_acl *sentinel = uncached_acl_sentinel(current); posix_acl_dup(acl); - if (cmpxchg(p, sentinel, acl) != sentinel) + if (cmpxchg(p, sentinel, acl) == sentinel) + wake_up_all(generic_waitqueue(p)); + else posix_acl_release(acl); } @@ -129,7 +140,8 @@ static void __abort_get_acl(struct posix_acl **p) { struct posix_acl *sentinel = uncached_acl_sentinel(current); - cmpxchg(p, sentinel, ACL_NOT_CACHED); + if (cmpxchg(p, sentinel, ACL_NOT_CACHED) == sentinel) + wake_up_all(generic_waitqueue(p)); } void abort_get_acl(struct inode *inode, int type) @@ -165,6 +177,7 @@ struct posix_acl *get_acl(struct inode *inode, int type) struct posix_acl **p; struct posix_acl *acl; +repeat: acl = get_cached_acl(inode, type); if (!is_uncached_acl(acl)) return acl; @@ -182,12 +195,28 @@ struct posix_acl *get_acl(struct inode *inode, int type) } p = acl_by_type(inode, type); - __prepare_get_acl(p); + if (!__prepare_get_acl(p)) { + wait_queue_head_t *wq = generic_waitqueue(p); + DEFINE_WAIT(wait); + + for(;;) { + prepare_to_wait(wq, &wait, TASK_INTERRUPTIBLE); + smp_mb(); + if (!is_uncached_acl_sentinel(*p)) + break; + io_schedule(); + } + finish_wait(wq, &wait); + if (signal_pending(current)) + return ERR_PTR(-ERESTARTSYS); + goto repeat; + } /* * Normally, the ACL returned by ->get_acl will be cached. * A filesystem can prevent that by calling - * forget_cached_acl(inode, type) in ->get_acl. + * forget_cached_acl(inode, type) in ->get_acl, preferably + * early in ->get_acl to avoid serializing concurrent readers. 
*/ acl = inode->i_op->get_acl(inode, type); diff --git a/include/linux/wait.h b/include/linux/wait.h index c3ff74d..adfeb3c 100644 --- a/include/linux/wait.h +++ b/include/linux/wait.h @@ -212,6 +212,7 @@ int out_of_line_wait_on_bit_timeout(void *, int, wait_bit_action_f *, unsigned, int out_of_line_wait_on_bit_lock(void *, int, wait_bit_action_f *, unsigned); int out_of_line_wait_on_atomic_t(atomic_t *, int (*)(atomic_t *), unsigned); wait_queue_head_t *bit_waitqueue(void *, int); +wait_queue_head_t *generic_waitqueue(void *); #define wake_up(x) __wake_up(x, TASK_NORMAL, 1, NULL) #define wake_up_nr(x, nr) __wake_up(x, TASK_NORMAL, nr, NULL) diff --git a/kernel/sched/wait.c b/kernel/sched/wait.c index f15d6b6..baf2700 100644 --- a/kernel/sched/wait.c +++ b/kernel/sched/wait.c @@ -493,6 +493,15 @@ wait_queue_head_t *bit_waitqueue(void *word, int bit) } EXPORT_SYMBOL(bit_waitqueue); +wait_queue_head_t *generic_waitqueue(void *ptr) +{ + const struct zone *zone = page_zone(virt_to_page(ptr)); + unsigned long val = (unsigned long)ptr; + + return &zone->wait_table[hash_long(val, zone->wait_table_bits)]; +} +EXPORT_SYMBOL(generic_waitqueue); + /* * Manipulate the atomic_t address to produce a better bit waitqueue table hash * index (we're keying off bit -1, but that would produce a horrible hash -- 2.7.4 -- To unsubscribe from this list: send the line "unsubscribe linux-fsdevel" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html