[RFC PATCH 3/6] common/Throttle: throttle in FIFO order

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Under heavy write load from many clients, many reader threads will
be waiting in the policy throttler, all on a single condition variable.
When a wakeup is signalled, any one of those threads may receive the
signal, regardless of how long it has been waiting.  This increases the
variance in message-processing latency, and in extreme cases can
significantly delay a message.

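This patch causes threads to exit a throttler in the same order
they entered.  Each waiter now queues its own condition variable on
a list, wakeups are always directed at the head of that list, and a
waiter proceeds only once it is at the head and the throttle has room.
New arrivals (including get_or_fail callers) never bypass threads that
are already queued.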

Signed-off-by: Jim Schutt <jaschut@xxxxxxxxxx>
---
 src/common/Throttle.h |   42 ++++++++++++++++++++++++++++--------------
 1 files changed, 28 insertions(+), 14 deletions(-)

diff --git a/src/common/Throttle.h b/src/common/Throttle.h
index 10560bf..ca72060 100644
--- a/src/common/Throttle.h
+++ b/src/common/Throttle.h
@@ -3,23 +3,31 @@
 
 #include "Mutex.h"
 #include "Cond.h"
+#include <list>
 
 class Throttle {
-  int64_t count, max, waiting;
+  int64_t count, max;
   uint64_t sseq, wseq;
   Mutex lock;
-  Cond cond;
+  list<Cond*> cond;
   
 public:
-  Throttle(int64_t m = 0) : count(0), max(m), waiting(0), sseq(0), wseq(0),
+  Throttle(int64_t m = 0) : count(0), max(m), sseq(0), wseq(0),
 			  lock("Throttle::lock") {
     assert(m >= 0);
   }
+  ~Throttle() {
+    while (!cond.empty()) {
+      Cond *cv = cond.front();
+      delete cv;
+      cond.pop_front();
+    }
+  }
 
 private:
   void _reset_max(int64_t m) {
-    if (m < max)
-      cond.SignalOne();
+    if (m < max && !cond.empty())
+      cond.front()->SignalOne();
     max = m;
   }
   bool _should_wait(int64_t c) {
@@ -28,19 +36,24 @@ private:
       ((c < max && count + c > max) ||   // normally stay under max
        (c >= max && count > max));       // except for large c
   }
+
   bool _wait(int64_t c) {
     bool waited = false;
-    if (_should_wait(c)) {
-      waiting += c;
+    if (_should_wait(c) || !cond.empty()) { // always wait behind other waiters.
+      Cond *cv = new Cond;
+      cond.push_back(cv);
       do {
+        if (cv != cond.front())
+          cond.front()->SignalOne();  // wake up the oldest.
 	waited = true;
-	cond.Wait(lock);
-      } while (_should_wait(c));
-      waiting -= c;
+        cv->Wait(lock);
+      } while (_should_wait(c) || cv != cond.front());
+      delete cv;
+      cond.pop_front();
 
       // wake up the next guy
-      if (waiting)
-	cond.SignalOne();
+      if (!cond.empty())
+        cond.front()->SignalOne();
     }
     return waited;
   }
@@ -101,7 +114,7 @@ public:
   bool get_or_fail(int64_t c = 1) {
     assert (c >= 0);
     Mutex::Locker l(lock);
-    if (_should_wait(c)) return false;
+    if (_should_wait(c) || !cond.empty()) return false;
     count += c;
     return true;
   }
@@ -110,7 +123,8 @@ public:
     assert(c >= 0);
     Mutex::Locker l(lock);
     if (c) {
-      cond.SignalOne();
+      if (!cond.empty())
+        cond.front()->SignalOne();
       count -= c;
       assert(count >= 0); //if count goes negative, we failed somewhere!
     }
-- 
1.7.1


--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[Index of Archives]     [CEPH Users]     [Ceph Large]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]
  Powered by Linux