Re: read/write on RADOS using external buffer

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Hi,

I just implemented the patch to support external buffer on Rados::read() and Rados::write().

diff --git a/src/include/librados.hpp b/src/include/librados.hpp
index 06fa3b2..bfb0f5b 100644
--- a/src/include/librados.hpp
+++ b/src/include/librados.hpp
@@ -63,8 +63,10 @@ public:
   int create(pool_t pool, const std::string& oid, bool exclusive);
 
   int write(pool_t pool, const std::string& oid, off_t off, bufferlist& bl, size_t len);
+  int write(pool_t pool, const std::string& oid, off_t off, void *buf, size_t len);
   int write_full(pool_t pool, const std::string& oid, bufferlist& bl);
   int read(pool_t pool, const std::string& oid, off_t off, bufferlist& bl, size_t len);
+  int read(pool_t pool, const std::string& oid, off_t off, void *buf, size_t len);
   int remove(pool_t pool, const std::string& oid);
   int trunc(pool_t pool, const std::string& oid, size_t size);
 
@@ -135,4 +137,3 @@ public:
 }
 
 #endif
-
diff --git a/src/librados.cc b/src/librados.cc
index 4c8a464..91e6a27 100644
--- a/src/librados.cc
+++ b/src/librados.cc
@@ -72,11 +72,12 @@ class RadosClient : public Dispatcher
 
   Mutex lock;
   Cond cond;
+  static hash_map<tid_t, bufferptr*> buffer_map;
+  static bufferptr* fetch_buffer_func(tid_t tid);
 
- 
 public:
   RadosClient() : messenger(NULL), lock("radosclient") {
-    messenger = new SimpleMessenger();
+    messenger = new SimpleMessenger(&RadosClient::fetch_buffer_func);
   }
 
   ~RadosClient();
@@ -132,8 +133,10 @@ public:
   // io
   int create(PoolCtx& pool, const object_t& oid, bool exclusive);
   int write(PoolCtx& pool, const object_t& oid, off_t off, bufferlist& bl, size_t len);
+  int write(PoolCtx& pool, const object_t& oid, off_t off, void *buf, size_t len);
   int write_full(PoolCtx& pool, const object_t& oid, bufferlist& bl);
   int read(PoolCtx& pool, const object_t& oid, off_t off, bufferlist& bl, size_t len);
+  int read(PoolCtx& pool, const object_t& oid, off_t off, void *buf, size_t len);
   int remove(PoolCtx& pool, const object_t& oid);
   int stat(PoolCtx& pool, const object_t& oid, uint64_t *psize, time_t *pmtime);
   int trunc(PoolCtx& pool, const object_t& oid, size_t size);
@@ -870,6 +873,15 @@ int RadosClient::write(PoolCtx& pool, const object_t& oid, off_t off, bufferlist
   return len;
 }
 
+int RadosClient::write(PoolCtx& pool, const object_t& oid, off_t off, void *buf, size_t len)
+{
+  bufferptr bp = buffer::create_static(len, static_cast<char *>(buf));
+  bufferlist bl;
+
+  bl.push_back(bp);
+  return write(pool, oid, off, bl, len);
+}
+
 int RadosClient::write_full(PoolCtx& pool, const object_t& oid, bufferlist& bl)
 {
   utime_t ut = g_clock.now();
@@ -1116,6 +1128,46 @@ int RadosClient::read(PoolCtx& pool, const object_t& oid, off_t off, bufferlist&
   return bl.length();
 }
 
+int RadosClient::read(PoolCtx& pool, const object_t& oid, off_t off, void *buf, size_t len)
+{
+  SnapContext snapc;
+
+  Mutex mylock("RadosClient::read::mylock");
+  Cond cond;
+  bool done;
+  int r;
+  Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+  bufferptr bp = buffer::create_static(len, static_cast<char *>(buf));
+  bufferlist bl;
+
+  bl.push_back(bp);
+  lock.Lock();
+  ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool.poolid);
+  tid_t tid = objecter->get_tid();
+  buffer_map[tid] = &bp;
+  objecter->read_with_tid(oid, layout,
+	      off, len, pool.snap_seq, &bl, 0,
+	      onack, tid);
+  lock.Unlock();
+
+  mylock.Lock();
+  while (!done)
+    cond.Wait(mylock);
+  mylock.Unlock();
+  buffer_map.erase(tid);
+  dout(10) << "Objecter returned from read r=" << r << dendl;
+
+  if (r < 0)
+    return r;
+
+  if (bl.length() < len) {
+    dout(10) << "Returned length " << bl.length()
+	     << " less than original length "<< len << dendl;
+  }
+
+  return bl.length();
+}
+
 int RadosClient::stat(PoolCtx& pool, const object_t& oid, uint64_t *psize, time_t *pmtime)
 {
   SnapContext snapc;
@@ -1251,6 +1303,13 @@ int RadosClient::getxattrs(PoolCtx& pool, const object_t& oid, map<std::string,
   return r;
 }
 
+hash_map<tid_t, bufferptr*> RadosClient::buffer_map;
+
+bufferptr* RadosClient::fetch_buffer_func(tid_t tid)
+{
+  return buffer_map[tid];
+}
+
 // ---------------------------------------------
 
 namespace librados {
@@ -1401,6 +1460,14 @@ int Rados::write(rados_pool_t pool, const string& o, off_t off, bufferlist& bl,
   return ((RadosClient *)client)->write(*(RadosClient::PoolCtx *)pool, oid, off, bl, len);
 }
 
+int Rados::write(rados_pool_t pool, const string& o, off_t off, void *buf, size_t len)
+{
+  if (!client)
+    return -EINVAL;
+  object_t oid(o);
+  return ((RadosClient *)client)->write(*(RadosClient::PoolCtx *)pool, oid, off, buf, len);
+}
+
 int Rados::write_full(rados_pool_t pool, const string& o, bufferlist& bl)
 {
   if (!client)
@@ -1433,6 +1500,14 @@ int Rados::read(rados_pool_t pool, const string& o, off_t off, bufferlist& bl, s
   return ((RadosClient *)client)->read(*(RadosClient::PoolCtx *)pool, oid, off, bl, len);
 }
 
+int Rados::read(rados_pool_t pool, const string& o, off_t off, void *buf, size_t len)
+{
+  if (!client)
+    return -EINVAL;
+  object_t oid(o);
+  return ((RadosClient *)client)->read(*(RadosClient::PoolCtx *)pool, oid, off, buf, len);
+}
+
 int Rados::getxattr(rados_pool_t pool, const string& o, const char *name, bufferlist& bl)
 {
   if (!client)
diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc
index 4632267..75b4576 100644
--- a/src/msg/SimpleMessenger.cc
+++ b/src/msg/SimpleMessenger.cc
@@ -51,7 +51,7 @@ static ostream& _prefix(SimpleMessenger *messenger) {
 #define closed_socket() //dout(20) << "closed_socket " << --sockopen << dendl;
 #define opened_socket() //dout(20) << "opened_socket " << ++sockopen << dendl;
 
-
+SimpleMessenger::fetch_buffer_callback_t SimpleMessenger::fetch_buffer_callback = 0;
 
 /********************************************
  * Accepter
@@ -1786,36 +1786,47 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
   data_len = le32_to_cpu(header.data_len);
   data_off = le32_to_cpu(header.data_off);
   if (data_len) {
-    int left = data_len;
-    if (data_off & ~PAGE_MASK) {
-      // head
-      int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK),
-		     (unsigned)left);
-      bufferptr bp = buffer::create(head);
-      if (tcp_read( sd, bp.c_str(), head, messenger->timeout ) < 0)
+    if (fetch_buffer_callback) {
+      bufferptr *bpp = fetch_buffer_callback(header.tid);
+      if (!bpp)
+	goto allocate_buffer;
+      if (tcp_read( sd, bpp->c_str(), data_len, messenger->timeout ) < 0)
 	goto out_dethrottle;
-      data.push_back(bp);
-      left -= head;
-      dout(20) << "reader got data head " << head << dendl;
-    }
+      data.push_back(*bpp);
+      dout(20) << "reader got data " << data_len << dendl;
+    }else{
+    allocate_buffer:
+      int left = data_len;
+      if (data_off & ~PAGE_MASK) {
+	// head
+	int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK),
+		       (unsigned)left);
+	bufferptr bp = buffer::create(head);
+	if (tcp_read( sd, bp.c_str(), head, messenger->timeout ) < 0)
+	  goto out_dethrottle;
+	data.push_back(bp);
+	left -= head;
+	dout(20) << "reader got data head " << head << dendl;
+      }
 
-    // middle
-    int middle = left & PAGE_MASK;
-    if (middle > 0) {
-      bufferptr bp = buffer::create_page_aligned(middle);
-      if (tcp_read( sd, bp.c_str(), middle, messenger->timeout ) < 0)
-	goto out_dethrottle;
-      data.push_back(bp);
-      left -= middle;
-      dout(20) << "reader got data page-aligned middle " << middle << dendl;
-    }
+      // middle
+      int middle = left & PAGE_MASK;
+      if (middle > 0) {
+	bufferptr bp = buffer::create_page_aligned(middle);
+	if (tcp_read( sd, bp.c_str(), middle, messenger->timeout ) < 0)
+	  goto out_dethrottle;
+	data.push_back(bp);
+	left -= middle;
+	dout(20) << "reader got data page-aligned middle " << middle << dendl;
+      }
 
-    if (left) {
-      bufferptr bp = buffer::create(left);
-      if (tcp_read( sd, bp.c_str(), left, messenger->timeout ) < 0)
-	goto out_dethrottle;
-      data.push_back(bp);
-      dout(20) << "reader got data tail " << left << dendl;
+      if (left) {
+	bufferptr bp = buffer::create(left);
+	if (tcp_read( sd, bp.c_str(), left, messenger->timeout ) < 0)
+	  goto out_dethrottle;
+	data.push_back(bp);
+	dout(20) << "reader got data tail " << left << dendl;
+      }
     }
   }
 
diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h
index b4a0ef3..72a5a1b 100644
--- a/src/msg/SimpleMessenger.h
+++ b/src/msg/SimpleMessenger.h
@@ -567,6 +567,9 @@ private:
   SimpleMessenger *messenger; //hack to make dout macro work, will fix
   int timeout;
 
+  typedef bufferptr* (*fetch_buffer_callback_t) (tid_t);
+  static fetch_buffer_callback_t fetch_buffer_callback;
+
 public:
   SimpleMessenger() :
     Messenger(entity_name_t()),
@@ -580,6 +583,20 @@ public:
     // for local dmsg delivery
     dispatch_queue.local_pipe = new Pipe(this, Pipe::STATE_OPEN);
   }
+
+  SimpleMessenger(fetch_buffer_callback_t callback) :
+    Messenger(entity_name_t()),
+    accepter(this),
+    lock("SimpleMessenger::lock"), started(false), did_bind(false),
+    dispatch_throttler(g_conf.ms_dispatch_throttle_bytes), need_addr(true),
+    destination_stopped(true), my_type(-1),
+    global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0),
+    reaper_thread(this), reaper_started(false), reaper_stop(false), 
+    dispatch_thread(this), messenger(this) {
+    fetch_buffer_callback = callback;
+    // for local dmsg delivery
+    dispatch_queue.local_pipe = new Pipe(this, Pipe::STATE_OPEN);
+  }
   ~SimpleMessenger() {
     delete dispatch_queue.local_pipe;
   }
diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h
index a34c0a9..fd43795 100644
--- a/src/osdc/Objecter.h
+++ b/src/osdc/Objecter.h
@@ -530,6 +530,21 @@ private:
     o->outbl = pbl;
     return op_submit(o);
   }
+  tid_t read_with_tid(const object_t& oid, ceph_object_layout ol, 
+	     uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags,
+	     Context *onfinish, tid_t tid) {
+    vector<OSDOp> ops(1);
+    ops[0].op.op = CEPH_OSD_OP_READ;
+    ops[0].op.extent.offset = off;
+    ops[0].op.extent.length = len;
+    ops[0].op.extent.truncate_size = 0;
+    ops[0].op.extent.truncate_seq = 0;
+    Op *o = new Op(oid, ol, ops, flags, onfinish, 0);
+    o->snapid = snap;
+    o->outbl = pbl;
+    o->tid = tid;
+    return op_submit(o);
+  }
   tid_t read_trunc(const object_t& oid, ceph_object_layout ol, 
 	     uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags,
 	     uint64_t trunc_size, __u32 trunc_seq,
@@ -725,6 +740,8 @@ private:
 
   void list_objects(ListContext *p, Context *onfinish);
 
+  tid_t get_tid(void) { return ++last_tid; }
+
   // -------------------------
   // pool ops
 private:
--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[Index of Archives]     [CEPH Users]     [Ceph Large]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]

  Powered by Linux