The attempt to reduce latency for ReplicatedBackend

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Dear Cephers,
As we all know, the PrimaryPG never returns success to the client until
all replicas have sent their MSG_OSD_REPOPREPLY messages. This helps Ceph
maintain strong consistency at all times.

I have made some small changes to this golden rule in Ceph in order to
reduce the latency of random writes as much as possible.

In the test patch (please see below), we assume the pool size is 3:
'success' is returned to the client once the PrimaryPG has received any
two of the messages. In this case everything works well, including
recovery, backfill, etc. Meanwhile, the latency of 4K random
writes in my HDD cluster is reduced by almost 50%, from 20ms down
to 10ms.

However, with this patch, balanced reads must be disabled, since stale
data could be read from a replica PG, and the reliability of the data
is compromised. Even so, I think the effect is limited, since
the missing data can be recovered by using PGLOG.

Besides, this modification also makes me a little anxious, because
some bugs were detected and fixed during testing.
Given that, I am not sure whether there are any other bugs
related to this modification. Therefore, I really hope to get your
suggestions so that unexpected bugs can be avoided in advance.

It would be highly appreciated if any of you would share your
experience and comments.
Many thanks in advance.

This patch is based on 14.2.7.

diff --git a/src/common/legacy_config_opts.h b/src/common/legacy_config_opts.h
index 79d9c1f..464fba1 100644
--- a/src/common/legacy_config_opts.h
+++ b/src/common/legacy_config_opts.h
@@ -873,6 +873,8 @@ OPTION(osd_requested_scrub_priority, OPT_U32)
 OPTION(osd_pg_delete_priority, OPT_U32)
 OPTION(osd_pg_delete_cost, OPT_U32) // set default cost equal to 1MB io

+OPTION(osd_pg_allow_majority_write, OPT_BOOL) // set default cost
equal to 1MB io
+
 OPTION(osd_recovery_priority, OPT_U32)
 // set default cost equal to 20MB io
 OPTION(osd_recovery_cost, OPT_U32)
diff --git a/src/common/options.cc b/src/common/options.cc
index f0ba13a..26242eb 100644
--- a/src/common/options.cc
+++ b/src/common/options.cc
@@ -4082,6 +4082,10 @@ std::vector<Option> get_global_options() {
     .set_default(1<<20)
     .set_description(""),

+    Option("osd_pg_allow_majority_write", Option::TYPE_BOOL,
Option::LEVEL_ADVANCED)
+    .set_default(false)
+    .set_description(""),
+
     Option("osd_scrub_priority", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
     .set_default(5)
     .set_description("Priority for scrub operations in work queue"),
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index cd52976..ad293ec 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -3936,12 +3936,23 @@ void PG::write_if_dirty(ObjectStore::Transaction& t)

 void PG::add_log_entry(const pg_log_entry_t& e, bool applied)
 {
+  // test if this assertion can pass for ever!
+  dout(10) << "add_log_entry e.version" << e.version << dendl;
+  dout(10) << "add_log_entry e.user_version" << e.user_version << dendl;
+  dout(10) << "add_log_entry info.last_user_version" <<
info.last_user_version << dendl;
+  dout(10) << "add_log_entry info.last_complete" <<
info.last_complete << dendl;
+  dout(10) << "add_log_entry info.last_update" << info.last_update << dendl;
+
   // raise last_complete only if we were previously up to date
   if (info.last_complete == info.last_update)
+  {
+    ceph_assert(e.version.version == info.last_complete.version + 1);
     info.last_complete = e.version;
-
+  }
+
   // raise last_update.
   ceph_assert(e.version > info.last_update);
+  ceph_assert(e.version.version == info.last_update.version + 1);
   info.last_update = e.version;

   // raise user_version, if it increased (it may have not get bumped
diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc
index f8d67af..1aca6e8 100644
--- a/src/osd/ReplicatedBackend.cc
+++ b/src/osd/ReplicatedBackend.cc
@@ -509,9 +509,11 @@ void ReplicatedBackend::submit_transaction(
 void ReplicatedBackend::op_commit(
   InProgressOpRef& op)
 {
-  if (op->on_commit == nullptr) {
-    // aborted
-    return;
+  if (!cct->_conf->osd_pg_allow_majority_write) {
+    if (op->on_commit == nullptr) {
+      // aborted
+      return;
+    }
   }

   FUNCTRACE(cct);
@@ -523,17 +525,26 @@ void ReplicatedBackend::op_commit(
   }

   op->waiting_for_commit.erase(get_parent()->whoami_shard());
-
-  if (op->waiting_for_commit.empty()) {
-    op->on_commit->complete(0);
-    op->on_commit = 0;
-    in_progress_ops.erase(op->tid);
+  if (cct->_conf->osd_pg_allow_majority_write) {
+    if (op->waiting_for_commit.size() == 1 && op->on_commit) {
+      op->on_commit->complete(0);
+      op->on_commit = 0;
+    }
+    if (op->waiting_for_commit.empty()) {
+      in_progress_ops.erase(op->tid);
+    }
+  } else {
+    if (op->waiting_for_commit.empty()) {
+      op->on_commit->complete(0);
+      op->on_commit = 0;
+      in_progress_ops.erase(op->tid);
+    }
   }
 }

 void ReplicatedBackend::do_repop_reply(OpRequestRef op)
 {
-  static_cast<MOSDRepOpReply*>(op->get_nonconst_req())->finish_decode();
+  static_cast<MOSDRepOpReply *>(op->get_nonconst_req())->finish_decode();
   const MOSDRepOpReply *r = static_cast<const MOSDRepOpReply *>(op->get_req());
   ceph_assert(r->get_header().type == MSG_OSD_REPOPREPLY);

@@ -552,14 +563,14 @@ void ReplicatedBackend::do_repop_reply(OpRequestRef op)

     if (m)
       dout(7) << __func__ << ": tid " << ip_op.tid << " op " //<< *m
-       << " ack_type " << (int)r->ack_type
-       << " from " << from
-       << dendl;
+              << " ack_type " << (int)r->ack_type
+              << " from " << from
+              << dendl;
     else
       dout(7) << __func__ << ": tid " << ip_op.tid << " (no op) "
-       << " ack_type " << (int)r->ack_type
-       << " from " << from
-       << dendl;
+              << " ack_type " << (int)r->ack_type
+              << " from " << from
+              << dendl;

     // oh, good.

@@ -567,8 +578,8 @@ void ReplicatedBackend::do_repop_reply(OpRequestRef op)
       ceph_assert(ip_op.waiting_for_commit.count(from));
       ip_op.waiting_for_commit.erase(from);
       if (ip_op.op) {
- ip_op.op->mark_event("sub_op_commit_rec");
- ip_op.op->pg_trace.event("sub_op_commit_rec");
+        ip_op.op->mark_event("sub_op_commit_rec");
+        ip_op.op->pg_trace.event("sub_op_commit_rec");
       }
     } else {
       // legacy peer; ignore
@@ -578,12 +589,28 @@ void ReplicatedBackend::do_repop_reply(OpRequestRef op)
       from,
       r->get_last_complete_ondisk());

-    if (ip_op.waiting_for_commit.empty() &&
+    if (cct->_conf->osd_pg_allow_majority_write) {
+      if (ip_op.waiting_for_commit.size() == 1 &&
         ip_op.on_commit) {
-      ip_op.on_commit->complete(0);
-      ip_op.on_commit = 0;
-      in_progress_ops.erase(iter);
+        ip_op.on_commit->complete(0);
+        ip_op.on_commit = 0;
+      }
+      if (ip_op.waiting_for_commit.empty()) {
+        in_progress_ops.erase(iter);
+      }
+    } else {
+      if (ip_op.waiting_for_commit.empty() && ip_op.on_commit) {
+        ip_op.on_commit->complete(0);
+        ip_op.on_commit = 0;
+        in_progress_ops.erase(iter);
+      }
     }
+
+    dout(10) << __func__ << ": tid " << ip_op.tid << " op " //<< *m
+              << " ack_type " << (int)r->ack_type
+              << " from " << from
+              << " in_progress_ops size " << in_progress_ops.size()
+              << dendl;
   }
 }



[Index of Archives]     [CEPH Users]     [Ceph Large]     [Ceph Dev]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]

  Powered by Linux