[PATCH RT 4/6] implement reader limit on read write locks

This patch allows limiting the number of readers a lock may have.
The limit defaults to "no limit". The read/write locks now keep
track of not only the number of times a lock is held for read, but
also the number of tasks that hold it as a reader. That is, if two
tasks hold the same read/write lock and one of them holds it twice,
the lock's read count is 3 while the owner count is 2.
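
For illustration only, here is a toy user-space model of the two
counters (this is not the kernel code; the field names merely mirror
struct rw_mutex):

  #include <stdio.h>

  /*
   * Toy model of the per-lock bookkeeping:
   *   count  - total read holds, recursive holds included
   *   owners - distinct tasks currently holding the lock for read
   */
  struct toy_rwm {
          int count;
          int owners;
  };

  static void reader_lock(struct toy_rwm *rwm, int first_hold_for_task)
  {
          rwm->count++;                   /* every read acquisition */
          if (first_hold_for_task)
                  rwm->owners++;          /* only a task's first hold */
  }

  int main(void)
  {
          struct toy_rwm rwm = { 0, 0 };

          reader_lock(&rwm, 1);           /* task A takes the lock */
          reader_lock(&rwm, 1);           /* task B takes the lock */
          reader_lock(&rwm, 0);           /* task A takes it again */

          /* prints "count=3 owners=2", matching the example above */
          printf("count=%d owners=%d\n", rwm.count, rwm.owners);
          return 0;
  }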

The limit of readers is controlled by

  /proc/sys/kernel/rwlock_reader_limit

If this is set to zero or a negative value, then there is no limit.
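
For example, to cap each lock at four reading tasks (the value 4 is
arbitrary, chosen here only for illustration):

  echo 4 > /proc/sys/kernel/rwlock_reader_limit

Writing zero (the default) removes the limit again.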

Signed-off-by: Steven Rostedt <srostedt@xxxxxxxxxx>
---
 include/linux/rt_lock.h |    1 
 kernel/rtmutex.c        |   89 +++++++++++++++++++++++++++++++++++-------------
 kernel/sysctl.c         |   14 +++++++
 3 files changed, 80 insertions(+), 24 deletions(-)

Index: linux-2.6.24.4-rt4/include/linux/rt_lock.h
===================================================================
--- linux-2.6.24.4-rt4.orig/include/linux/rt_lock.h	2008-03-25 22:54:24.000000000 -0400
+++ linux-2.6.24.4-rt4/include/linux/rt_lock.h	2008-03-25 23:00:46.000000000 -0400
@@ -64,6 +64,7 @@ struct rw_mutex {
 	struct task_struct	*owner;
 	struct rt_mutex		mutex;
 	atomic_t		count;	/* number of times held for read */
+	atomic_t		owners; /* number of owners as readers */
 };
 
 /*
Index: linux-2.6.24.4-rt4/kernel/rtmutex.c
===================================================================
--- linux-2.6.24.4-rt4.orig/kernel/rtmutex.c	2008-03-25 22:55:46.000000000 -0400
+++ linux-2.6.24.4-rt4/kernel/rtmutex.c	2008-03-25 23:06:32.000000000 -0400
@@ -927,6 +927,8 @@ __rt_spin_lock_init(spinlock_t *lock, ch
 }
 EXPORT_SYMBOL(__rt_spin_lock_init);
 
+int rt_rwlock_limit;
+
 static inline int rt_release_bkl(struct rt_mutex *lock, unsigned long flags);
 static inline void rt_reacquire_bkl(int saved_lock_depth);
 
@@ -1000,6 +1002,10 @@ static int try_to_take_rw_read(struct rw
 		goto taken;
 	}
 
+	/* Check for rwlock limits */
+	if (rt_rwlock_limit && atomic_read(&rwm->owners) >= rt_rwlock_limit)
+		return 0;
+
 	if (mtxowner && mtxowner != RT_RW_READER) {
 		if (!try_to_steal_lock(mutex)) {
 			/*
@@ -1044,6 +1050,7 @@ static int try_to_take_rw_read(struct rw
 	rt_rwlock_set_owner(rwm, RT_RW_READER, 0);
  taken:
 	if (incr) {
+		atomic_inc(&rwm->owners);
 		reader_count = current->reader_lock_count++;
 		if (likely(reader_count < MAX_RWLOCK_DEPTH)) {
 			current->owned_read_locks[reader_count].lock = rwm;
@@ -1221,6 +1228,7 @@ rt_read_fastlock(struct rw_mutex *rwm,
 			goto retry;
 		}
 
+		atomic_inc(&rwm->owners);
 		reader_count = current->reader_lock_count++;
 		if (likely(reader_count < MAX_RWLOCK_DEPTH)) {
 			current->owned_read_locks[reader_count].lock = rwm;
@@ -1280,6 +1288,7 @@ retry:
 			goto retry;
 		}
 
+		atomic_inc(&rwm->owners);
 		reader_count = current->reader_lock_count++;
 		if (likely(reader_count < MAX_RWLOCK_DEPTH)) {
 			current->owned_read_locks[reader_count].lock = rwm;
@@ -1471,6 +1480,7 @@ rt_read_slowunlock(struct rw_mutex *rwm,
 	struct rt_mutex *mutex = &rwm->mutex;
 	struct rt_mutex_waiter *waiter;
 	unsigned long flags;
+	unsigned int reader_count;
 	int savestate = !mtx;
 	int i;
 
@@ -1493,6 +1503,7 @@ rt_read_slowunlock(struct rw_mutex *rwm,
 			if (!current->owned_read_locks[i].count) {
 				current->reader_lock_count--;
 				WARN_ON_ONCE(i != current->reader_lock_count);
+				atomic_dec(&rwm->owners);
 			}
 			break;
 		}
@@ -1500,20 +1511,34 @@ rt_read_slowunlock(struct rw_mutex *rwm,
 	WARN_ON_ONCE(i < 0);
 
 	/*
-	 * If there are more readers, let the last one do any wakeups.
-	 * Also check to make sure the owner wasn't cleared when two
-	 * readers released the lock at the same time, and the count
-	 * went to zero before grabbing the wait_lock.
+	 * If the last two (or more) readers unlocked at the same
+	 * time, the owner may have been cleared when the count went
+	 * to zero. If that happened, the rwm owner will be neither
+	 * current nor RT_RW_READER, which means another reader has
+	 * already reset the lock and there is nothing left to do.
 	 */
-	if (atomic_read(&rwm->count) ||
-	    (rt_rwlock_owner(rwm) != current &&
-	     rt_rwlock_owner(rwm) != RT_RW_READER)) {
-		spin_unlock_irqrestore(&mutex->wait_lock, flags);
-		return;
-	}
+	if ((rt_rwlock_owner(rwm) != current &&
+	     rt_rwlock_owner(rwm) != RT_RW_READER))
+		goto out;
+
+	/*
+	 * If there are more readers and we are under the limit
+	 * let the last reader do the wakeups.
+	 */
+	reader_count = atomic_read(&rwm->count);
+	if (reader_count &&
+	    (!rt_rwlock_limit || atomic_read(&rwm->owners) >= rt_rwlock_limit))
+		goto out;
 
 	/* If no one is blocked, then clear all ownership */
 	if (!rt_mutex_has_waiters(mutex)) {
+		/*
+		 * If count is not zero, we are under the limit and
+		 * no other readers are waiting; nothing to do here.
+		 */
+		if (reader_count)
+			goto out;
+
 		/* We could still have a pending reader waiting */
 		if (rt_mutex_owner_pending(mutex)) {
 			/* set the rwm back to pending */
@@ -1525,24 +1550,32 @@ rt_read_slowunlock(struct rw_mutex *rwm,
 		goto out;
 	}
 
-	/* We are the last reader with pending waiters. */
+	/*
+	 * If the next waiter is a reader, it can be there for one
+	 * of two reasons: either we hit the reader limit, or there
+	 * is a pending writer.
+	 * We still only wake up one reader at a time (even if we
+	 * could wake up more), because we don't know whether a
+	 * writer is pending.
+	 */
 	waiter = rt_mutex_top_waiter(mutex);
-	if (waiter->write_lock)
+	if (waiter->write_lock) {
+		/* only wake up if there are no readers */
+		if (reader_count)
+			goto out;
 		rwm->owner = RT_RW_PENDING_WRITE;
-	else
+	} else {
+		/*
+		 * The reader limit may also have decreased since we
+		 * took the lock. If we are still at or above the new
+		 * limit, we cannot wake up another reader yet.
+		 */
+		if (rt_rwlock_limit &&
+		    unlikely(atomic_read(&rwm->owners) >= rt_rwlock_limit))
+			goto out;
 		rwm->owner = RT_RW_PENDING_READ;
+	}
 
-	/*
-	 * It is possible to have a reader waiting. We still only
-	 * wake one up in that case. A way we can have a reader waiting
-	 * is because a writer woke up, a higher prio reader came
-	 * and stole the lock from the writer. But the writer now
-	 * is no longer waiting on the lock and needs to retake
-	 * the lock. We simply wake up the reader and let the
-	 * reader have the lock. If the writer comes by, it
-	 * will steal the lock from the reader. This is the
-	 * only time we can have a reader pending on a lock.
-	 */
 	wakeup_next_waiter(mutex, savestate);
 
  out:
@@ -1558,15 +1591,22 @@ rt_read_fastunlock(struct rw_mutex *rwm,
 		   int mtx)
 {
 	WARN_ON(!atomic_read(&rwm->count));
+	WARN_ON(!atomic_read(&rwm->owners));
 	WARN_ON(!rwm->owner);
 	atomic_dec(&rwm->count);
 	if (likely(rt_rwlock_cmpxchg(rwm, current, NULL))) {
 		int reader_count = --current->reader_lock_count;
+		int owners;
 		rt_mutex_deadlock_account_unlock(current);
 		if (unlikely(reader_count < 0)) {
 			    reader_count = 0;
 			    WARN_ON_ONCE(1);
 		}
+		owners = atomic_dec_return(&rwm->owners);
+		if (unlikely(owners < 0)) {
+			atomic_set(&rwm->owners, 0);
+			WARN_ON_ONCE(1);
+		}
 		WARN_ON_ONCE(current->owned_read_locks[reader_count].lock != rwm);
 	} else
 		slowfn(rwm, mtx);
@@ -1717,6 +1757,7 @@ void rt_mutex_rwsem_init(struct rw_mutex
 
 	rwm->owner = NULL;
 	atomic_set(&rwm->count, 0);
+	atomic_set(&rwm->owners, 0);
 
 	__rt_mutex_init(mutex, name);
 }
Index: linux-2.6.24.4-rt4/kernel/sysctl.c
===================================================================
--- linux-2.6.24.4-rt4.orig/kernel/sysctl.c	2008-03-25 16:41:48.000000000 -0400
+++ linux-2.6.24.4-rt4/kernel/sysctl.c	2008-03-25 23:00:12.000000000 -0400
@@ -150,6 +150,10 @@ static int parse_table(int __user *, int
 		void __user *, size_t, struct ctl_table *);
 #endif
 
+#ifdef CONFIG_PREEMPT_RT
+extern int rt_rwlock_limit;
+#endif
+
 
 #ifdef CONFIG_PROC_SYSCTL
 static int proc_do_cad_pid(struct ctl_table *table, int write, struct file *filp,
@@ -399,6 +403,16 @@ static struct ctl_table kern_table[] = {
 		.proc_handler	= &proc_dointvec,
 	},
 #endif
+#ifdef CONFIG_PREEMPT_RT
+	{
+		.ctl_name	= CTL_UNNUMBERED,
+		.procname	= "rwlock_reader_limit",
+		.data		= &rt_rwlock_limit,
+		.maxlen		= sizeof(int),
+		.mode		= 0644,
+		.proc_handler	= &proc_dointvec,
+	},
+#endif
 	{
 		.ctl_name	= KERN_PANIC,
 		.procname	= "panic",
