[PATCH 3/3] librbd: v3 copy-on-read for clones, write entire object into child asynchronously

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



  The object has already been saved in the m_entire_object bufferlist.
Send a "copyup" request with m_entire_object; the cls_rbd method copyup will then write the object data to the child.
An AioCompletion is used to protect the copyup aio operation, and it is removed after the copyup request has finished.
Add a function xlist::iterator::get_cur(), i.e. item *get_cur() const { return cur; }, in include/xlist.h
to support removing an item from an xlist.

Signed-off-by: Min Chen <minchen@xxxxxxxxxxxxxxx>
Signed-off-by: Li Wang <liwang@xxxxxxxxxxxxxxx>
Signed-off-by: Yunchuan Wen <yunchuanwen@xxxxxxxxxxxxxxx>
---
 src/include/xlist.h      |    1 +
 src/librbd/AioRequest.cc |   33 ++++++++++++++++++++++
 src/librbd/AioRequest.h  |    1 +
 src/librbd/ImageCtx.cc   |   68 ++++++++++++++++++++++++++++++++++++++++++++++
 src/librbd/ImageCtx.h    |    6 ++++
 src/librbd/internal.cc   |    4 +++
 6 files changed, 113 insertions(+)

diff --git a/src/include/xlist.h b/src/include/xlist.h
index 5384561..3932c40 100644
--- a/src/include/xlist.h
+++ b/src/include/xlist.h
@@ -157,6 +157,7 @@ public:
       return *this;
     }
     bool end() const { return cur == 0; }
+    item *get_cur() const { return cur; } // current item (0 once end() is true); lets callers unlink that item from the list
   };
 
   iterator begin() { return iterator(_front); }
diff --git a/src/librbd/AioRequest.cc b/src/librbd/AioRequest.cc
index 767fa75..ee0a98e 100644
--- a/src/librbd/AioRequest.cc
+++ b/src/librbd/AioRequest.cc
@@ -71,6 +71,38 @@ namespace librbd {
 
   /** read **/
 
+  // copy-on-read: after the entire object has been read, write it into the child
+  ssize_t AioRead::write_cor() // returns the result of md_ctx.aio_operate()
+  {
+    ldout(m_ictx->cct, 20) << "write_cor" << dendl;
+    int ret = 0;
+
+    m_ictx->snap_lock.get_read(); // copy snapc while holding snap_lock for read
+    ::SnapContext snapc = m_ictx->snapc;
+    m_ictx->snap_lock.put_read();
+
+    librados::ObjectWriteOperation copyup_cor;
+    copyup_cor.exec("rbd", "copyup", m_entire_object); // invoke class "rbd", method "copyup", with the object data as input
+
+    std::vector<librados::snap_t> m_snaps; // a local despite the m_ prefix: snap ids in the form passed to aio_operate()
+    for (std::vector<snapid_t>::const_iterator it = snapc.snaps.begin();
+                it != snapc.snaps.end(); ++it) {
+      m_snaps.push_back(it->val);
+    }
+
+    librados::AioCompletion *cor_completion =
+        librados::Rados::aio_create_completion(m_ictx, librbd::cor_completion_callback, NULL); // the callback casts its arg back to ImageCtx*
+
+    xlist<librados::AioCompletion *>::item *comp =
+       new xlist<librados::AioCompletion *>::item(cor_completion);
+
+    m_ictx->add_cor_completion(comp); // queue the item on ImageCtx::cor_completions before issuing the write
+    // asynchronously write the object
+    ret = m_ictx->md_ctx.aio_operate(m_oid, cor_completion, &copyup_cor, snapc.seq.val, m_snaps);
+
+    return ret; // NOTE(review): if aio_operate() failed, comp is still queued in cor_completions -- confirm who removes it
+  }
+
   bool AioRead::should_complete(int r)
   {
     ldout(m_ictx->cct, 20) << "should_complete " << this << " " << m_oid << " " << m_object_off << "~" << m_object_len
@@ -128,6 +160,7 @@ namespace librbd {
 	m_ictx->prune_parent_extents(image_extents, image_overlap);
 	// copy the read range to m_read_data
 	m_read_data.substr_of(m_entire_object, m_object_off, m_object_len);
+	write_cor();
       }
     }
 
diff --git a/src/librbd/AioRequest.h b/src/librbd/AioRequest.h
index 42301a5..4e40024 100644
--- a/src/librbd/AioRequest.h
+++ b/src/librbd/AioRequest.h
@@ -75,6 +75,7 @@ namespace librbd {
 	m_tried_parent(false), m_sparse(sparse) {
     }
     virtual ~AioRead() {}
+    ssize_t write_cor();
     virtual bool should_complete(int r);
     virtual int send();
 
diff --git a/src/librbd/ImageCtx.cc b/src/librbd/ImageCtx.cc
index 6477e8d..f74eafb 100644
--- a/src/librbd/ImageCtx.cc
+++ b/src/librbd/ImageCtx.cc
@@ -45,6 +45,7 @@ namespace librbd {
       snap_lock("librbd::ImageCtx::snap_lock"),
       parent_lock("librbd::ImageCtx::parent_lock"),
       refresh_lock("librbd::ImageCtx::refresh_lock"),
+      cor_lock("librbd::ImageCtx::cor_lock"),
       extra_read_flags(0),
       old_format(true),
       order(0), size(0), features(0),
@@ -96,6 +97,7 @@ namespace librbd {
       object_set->return_enoent = true;
       object_cacher->start();
     }
+    cor_completions = new xlist<librados::AioCompletion*>();
   }
 
   ImageCtx::~ImageCtx() {
@@ -112,6 +114,10 @@ namespace librbd {
       delete object_set;
       object_set = NULL;
     }
+    if (cor_completions) {
+      delete cor_completions;
+      cor_completions = NULL;
+    }
     delete[] format_string;
   }
 
@@ -648,4 +654,66 @@ namespace librbd {
 		   << " from image extents " << objectx << dendl;
     return len;
  }
+
+  void ImageCtx::add_cor_completion(xlist<librados::AioCompletion*>::item *comp) // append comp to cor_completions
+  {
+    if(!comp) // nothing to queue
+      return;
+
+    cor_lock.Lock(); // the list mutation happens under cor_lock
+    cor_completions->push_back(comp);
+    cor_lock.Unlock();
+
+    ldout(cct, 10) << "add_cor_completion:: size = "<< cor_completions->size() << dendl; // NOTE(review): size() is read after cor_lock was released
+  }
+
+  void ImageCtx::wait_last_completions() // wait for each queued copy-on-read completion, then drop its item from the list
+  {
+    ldout(cct, 10) << "wait_last_completions:: cor_completions = " << cor_completions  << " size = " << cor_completions->size()  << dendl;
+    xlist<librados::AioCompletion*>::iterator itr;
+    xlist<librados::AioCompletion*>::item *ptr;
+
+    while (!cor_completions->empty()){ // NOTE(review): empty() is checked without holding cor_lock
+      cor_lock.Lock();
+      librados::AioCompletion *comp = cor_completions->front();
+      comp->wait_for_complete(); // blocks with cor_lock held; NOTE(review): cor_completion_callback also takes cor_lock -- confirm this cannot stall
+      itr = cor_completions->begin(); // iterator positioned at the front entry
+      ptr = itr.get_cur(); // keep the item pointer so it can be freed after unlinking
+      cor_completions->pop_front();
+      delete ptr; // frees only the xlist item; NOTE(review): comp itself is not released here -- confirm ownership
+      ptr = NULL;
+      cor_lock.Unlock();
+    }
+    ldout(cct, 10) << "wait_last_completions:: after clear cor_completions = " << cor_completions  << " size = " << cor_completions->size() << dendl;
+  }
+
+  void cor_completion_callback(librados::completion_t aio_completion_impl, void *arg) // registered by AioRead::write_cor()
+  {
+    librbd::ImageCtx * ictx = (librbd::ImageCtx *)arg; // arg is treated as the owning ImageCtx
+
+    ictx->cor_lock.Lock();
+    xlist<librados::AioCompletion*> *completions = ictx->cor_completions; // only this pointer read is done under cor_lock
+    ictx->cor_lock.Unlock();
+
+    ldout(ictx->cct, 10) << "cor_completion_callback:: cor_completions = " << completions << " size = "<< completions->size() << dendl; // NOTE(review): dereferences completions before the NULL check below
+    if (!completions) 
+      return;
+
+    // find the list entry whose AioCompletion matches this callback's handle, and remove it
+    for (xlist<librados::AioCompletion*>::iterator itr = completions->begin(); !(itr.end()); ++itr) { // NOTE(review): the list is walked without holding cor_lock
+       if (aio_completion_impl == (*itr)->pc){
+         xlist<librados::AioCompletion*>::item *ptr = itr.get_cur();
+
+         ictx->cor_lock.Lock();
+         completions->remove(ptr);
+         ictx->cor_lock.Unlock();
+
+         delete ptr; // frees the xlist<librados::AioCompletion*>::item, not the AioCompletion it refers to
+         ptr = NULL;
+         break;
+       }
+    }
+    ldout(ictx->cct, 10) << "cor_completion_callback:: after remove item, size = " << completions->size() << dendl;
+  }
+
 }
diff --git a/src/librbd/ImageCtx.h b/src/librbd/ImageCtx.h
index 026a3e0..e1d08c9 100644
--- a/src/librbd/ImageCtx.h
+++ b/src/librbd/ImageCtx.h
@@ -68,6 +68,7 @@ namespace librbd {
     RWLock snap_lock; // protects snapshot-related member variables:
     RWLock parent_lock; // protects parent_md and parent
     Mutex refresh_lock; // protects refresh_seq and last_refresh
+    Mutex cor_lock; //protects cor_completions for copy-on-read
 
     unsigned extra_read_flags;
 
@@ -89,6 +90,8 @@ namespace librbd {
     LibrbdWriteback *writeback_handler;
     ObjectCacher::ObjectSet *object_set;
 
+    xlist<librados::AioCompletion*> *cor_completions; //copy-on-read AioCompletions
+
     /**
      * Either image_name or image_id must be set.
      * If id is not known, pass the empty std::string,
@@ -148,7 +151,10 @@ namespace librbd {
     uint64_t prune_parent_extents(vector<pair<uint64_t,uint64_t> >& objectx,
 				  uint64_t overlap);
 
+    void add_cor_completion(xlist<librados::AioCompletion*>::item *comp);
+    void wait_last_completions();//wait for uncompleted asynchronous write which is still in xlist
   };
+  void cor_completion_callback(librados::completion_t aio_completion_impl, void *arg);
 }
 
 #endif
diff --git a/src/librbd/internal.cc b/src/librbd/internal.cc
index 127be38..d676b77 100644
--- a/src/librbd/internal.cc
+++ b/src/librbd/internal.cc
@@ -2101,6 +2101,10 @@ reprotect_and_return_err:
   void close_image(ImageCtx *ictx)
   {
     ldout(ictx->cct, 20) << "close_image " << ictx << dendl;
+
+    if (ictx->cor_completions)
+      ictx->wait_last_completions();//copy-on-read: wait for unfinished AioCompletion requests
+
     if (ictx->object_cacher)
       ictx->shutdown_cache(); // implicitly flushes
     else
-- 
1.7.10.4

--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html




[Index of Archives]     [CEPH Users]     [Ceph Large]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]
  Powered by Linux