[PATCH 4/4] posix_acl: get_acl: Synchronize readers

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Currently, while an ACL is not cached yet, the initial caller to get_acl
is responsible for reading the ACL and updating the cache; any
concurrent readers only read the ACL without updating the cache.  This
works reasonably well for local filesystems where reading ACLs is fast
once the underlying disk blocks are cached, but not so well for
filesystems where reading an ACL requires network communication, for
example.

To address that, make subsequent concurrent readers wait for the initial
reader instead.  Any blocked readers are woken up as soon as an ACL is
available (or the initial read has failed).

(Filesystems can still force the VFS not to cache ACLs by calling
forget_cached_acl() in ->get_acl.  This will allow any concurrent
readers waiting for the initial reader to proceed.  As of now, only
Lustre is doing that.)

Signed-off-by: Andreas Gruenbacher <agruenba@xxxxxxxxxx>
Cc: Oleg Drokin <oleg.drokin@xxxxxxxxx>
Cc: Andreas Dilger <andreas.dilger@xxxxxxxxx>
---
 fs/posix_acl.c       | 37 +++++++++++++++++++++++++++++++++----
 include/linux/wait.h |  1 +
 kernel/sched/wait.c  |  9 +++++++++
 3 files changed, 43 insertions(+), 4 deletions(-)

diff --git a/fs/posix_acl.c b/fs/posix_acl.c
index 0be840e..c02c7ca 100644
--- a/fs/posix_acl.c
+++ b/fs/posix_acl.c
@@ -58,6 +58,11 @@ struct posix_acl *get_cached_acl_rcu(struct inode *inode, int type)
 }
 EXPORT_SYMBOL(get_cached_acl_rcu);
 
+static bool is_uncached_acl_sentinel(struct posix_acl *acl)
+{
+	return is_uncached_acl(acl) && acl != ACL_NOT_CACHED;
+}
+
 void set_cached_acl(struct inode *inode, int type, struct posix_acl *acl)
 {
 	struct posix_acl **p = acl_by_type(inode, type);
@@ -66,6 +71,8 @@ void set_cached_acl(struct inode *inode, int type, struct posix_acl *acl)
 	old = xchg(p, posix_acl_dup(acl));
 	if (!is_uncached_acl(old))
 		posix_acl_release(old);
+	else if (is_uncached_acl_sentinel(old))
+		wake_up_all(generic_waitqueue(p));
 }
 EXPORT_SYMBOL(set_cached_acl);
 
@@ -76,6 +83,8 @@ static void __forget_cached_acl(struct posix_acl **p)
 	old = xchg(p, ACL_NOT_CACHED);
 	if (!is_uncached_acl(old))
 		posix_acl_release(old);
+	else if (is_uncached_acl_sentinel(old))
+		wake_up_all(generic_waitqueue(p));
 }
 
 void forget_cached_acl(struct inode *inode, int type)
@@ -110,7 +119,9 @@ static void __complete_get_acl(struct posix_acl **p, struct posix_acl *acl)
 	struct posix_acl *sentinel = uncached_acl_sentinel(current);
 
 	posix_acl_dup(acl);
-	if (cmpxchg(p, sentinel, acl) != sentinel)
+	if (cmpxchg(p, sentinel, acl) == sentinel)
+		wake_up_all(generic_waitqueue(p));
+	else
 		posix_acl_release(acl);
 }
 
@@ -129,7 +140,8 @@ static void __abort_get_acl(struct posix_acl **p)
 {
 	struct posix_acl *sentinel = uncached_acl_sentinel(current);
 
-	cmpxchg(p, sentinel, ACL_NOT_CACHED);
+	if (cmpxchg(p, sentinel, ACL_NOT_CACHED) == sentinel)
+		wake_up_all(generic_waitqueue(p));
 }
 
 void abort_get_acl(struct inode *inode, int type)
@@ -165,6 +177,7 @@ struct posix_acl *get_acl(struct inode *inode, int type)
 	struct posix_acl **p;
 	struct posix_acl *acl;
 
+repeat:
 	acl = get_cached_acl(inode, type);
 	if (!is_uncached_acl(acl))
 		return acl;
@@ -182,12 +195,28 @@ struct posix_acl *get_acl(struct inode *inode, int type)
 	}
 
 	p = acl_by_type(inode, type);
-	__prepare_get_acl(p);
+	if (!__prepare_get_acl(p)) {
+		wait_queue_head_t *wq = generic_waitqueue(p);
+		DEFINE_WAIT(wait);
+
+		for (;;) {
+			prepare_to_wait(wq, &wait, TASK_INTERRUPTIBLE);
+			smp_mb();
+			if (!is_uncached_acl_sentinel(*p))
+				break;
+			io_schedule();
+		}
+		finish_wait(wq, &wait);
+		if (signal_pending(current))
+			return ERR_PTR(-ERESTARTSYS);
+		goto repeat;
+	}
 
 	/*
 	 * Normally, the ACL returned by ->get_acl will be cached.
 	 * A filesystem can prevent that by calling
-	 * forget_cached_acl(inode, type) in ->get_acl.
+	 * forget_cached_acl(inode, type) in ->get_acl, preferably
+	 * early in ->get_acl to avoid serializing concurrent readers.
 	 */
 	acl = inode->i_op->get_acl(inode, type);
 
diff --git a/include/linux/wait.h b/include/linux/wait.h
index c3ff74d..adfeb3c 100644
--- a/include/linux/wait.h
+++ b/include/linux/wait.h
@@ -212,6 +212,7 @@ int out_of_line_wait_on_bit_timeout(void *, int, wait_bit_action_f *, unsigned,
 int out_of_line_wait_on_bit_lock(void *, int, wait_bit_action_f *, unsigned);
 int out_of_line_wait_on_atomic_t(atomic_t *, int (*)(atomic_t *), unsigned);
 wait_queue_head_t *bit_waitqueue(void *, int);
+wait_queue_head_t *generic_waitqueue(void *);
 
 #define wake_up(x)			__wake_up(x, TASK_NORMAL, 1, NULL)
 #define wake_up_nr(x, nr)		__wake_up(x, TASK_NORMAL, nr, NULL)
diff --git a/kernel/sched/wait.c b/kernel/sched/wait.c
index f15d6b6..baf2700 100644
--- a/kernel/sched/wait.c
+++ b/kernel/sched/wait.c
@@ -493,6 +493,15 @@ wait_queue_head_t *bit_waitqueue(void *word, int bit)
 }
 EXPORT_SYMBOL(bit_waitqueue);
 
+wait_queue_head_t *generic_waitqueue(void *ptr)
+{
+	const struct zone *zone = page_zone(virt_to_page(ptr));
+	unsigned long val = (unsigned long)ptr;
+
+	return &zone->wait_table[hash_long(val, zone->wait_table_bits)];
+}
+EXPORT_SYMBOL(generic_waitqueue);
+
 /*
  * Manipulate the atomic_t address to produce a better bit waitqueue table hash
  * index (we're keying off bit -1, but that would produce a horrible hash
-- 
2.7.4

--
To unsubscribe from this list: send the line "unsubscribe linux-fsdevel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html



[Index of Archives]     [Linux Ext4 Filesystem]     [Union Filesystem]     [Filesystem Testing]     [Ceph Users]     [Ecryptfs]     [AutoFS]     [Kernel Newbies]     [Share Photos]     [Security]     [Netfilter]     [Bugtraq]     [Yosemite News]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux Cachefs]     [Reiser Filesystem]     [Linux RAID]     [Samba]     [Device Mapper]     [CEPH Development]
  Powered by Linux