Re: read/write on RADOS using external buffer

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Hi Takuya,

Thanks for looking at this!

Unfortunately, I think you have taken the wrong approach here. There
isn't any need for thread-local data in the Objecter. It's not threads
that the Objecter cares about, it's clients. Also, I don't think a
change like this needs to involve SimpleMessenger and all that.

Take a look at rados_read in librados.cc. This function implements the
C API, which can already handle using external buffers. It may be that
all you need to do is add functions in the C++ header that call
rados_read and rados_write!

I actually think that rados_read could be made slightly more
efficient. We could probably add a bufferlist constructor that starts
with an already-allocated buffer, to avoid the call to bl.copy. I
haven't thought too hard about how to do this, but in principle it
seems reasonable.

regards,
Colin McCabe


On Tue, Oct 19, 2010 at 3:27 PM, Takuya ASADA <syuu@xxxxxxxxxxxx> wrote:
> Hi,
>
> I just implemented a patch to support external buffers in Rados::read() and Rados::write().
>
> diff --git a/src/include/librados.hpp b/src/include/librados.hpp
> index 06fa3b2..bfb0f5b 100644
> --- a/src/include/librados.hpp
> +++ b/src/include/librados.hpp
> @@ -63,8 +63,10 @@ public:
>   int create(pool_t pool, const std::string& oid, bool exclusive);
>
>   int write(pool_t pool, const std::string& oid, off_t off, bufferlist& bl, size_t len);
> +  int write(pool_t pool, const std::string& oid, off_t off, void *buf, size_t len);
>   int write_full(pool_t pool, const std::string& oid, bufferlist& bl);
>   int read(pool_t pool, const std::string& oid, off_t off, bufferlist& bl, size_t len);
> +  int read(pool_t pool, const std::string& oid, off_t off, void *buf, size_t len);
>   int remove(pool_t pool, const std::string& oid);
>   int trunc(pool_t pool, const std::string& oid, size_t size);
>
> @@ -135,4 +137,3 @@ public:
>  }
>
>  #endif
> -
> diff --git a/src/librados.cc b/src/librados.cc
> index 4c8a464..91e6a27 100644
> --- a/src/librados.cc
> +++ b/src/librados.cc
> @@ -72,11 +72,12 @@ class RadosClient : public Dispatcher
>
>   Mutex lock;
>   Cond cond;
> +  static hash_map<tid_t, bufferptr*> buffer_map;
> +  static bufferptr* fetch_buffer_func(tid_t tid);
>
> -
>  public:
>   RadosClient() : messenger(NULL), lock("radosclient") {
> -    messenger = new SimpleMessenger();
> +    messenger = new SimpleMessenger(&RadosClient::fetch_buffer_func);
>   }
>
>   ~RadosClient();
> @@ -132,8 +133,10 @@ public:
>   // io
>   int create(PoolCtx& pool, const object_t& oid, bool exclusive);
>   int write(PoolCtx& pool, const object_t& oid, off_t off, bufferlist& bl, size_t len);
> +  int write(PoolCtx& pool, const object_t& oid, off_t off, void *buf, size_t len);
>   int write_full(PoolCtx& pool, const object_t& oid, bufferlist& bl);
>   int read(PoolCtx& pool, const object_t& oid, off_t off, bufferlist& bl, size_t len);
> +  int read(PoolCtx& pool, const object_t& oid, off_t off, void *buf, size_t len);
>   int remove(PoolCtx& pool, const object_t& oid);
>   int stat(PoolCtx& pool, const object_t& oid, uint64_t *psize, time_t *pmtime);
>   int trunc(PoolCtx& pool, const object_t& oid, size_t size);
> @@ -870,6 +873,15 @@ int RadosClient::write(PoolCtx& pool, const object_t& oid, off_t off, bufferlist
>   return len;
>  }
>
> +int RadosClient::write(PoolCtx& pool, const object_t& oid, off_t off, void *buf, size_t len)
> +{
> +  bufferptr bp = buffer::create_static(len, static_cast<char *>(buf));
> +  bufferlist bl;
> +
> +  bl.push_back(bp);
> +  return write(pool, oid, off, bl, len);
> +}
> +
>  int RadosClient::write_full(PoolCtx& pool, const object_t& oid, bufferlist& bl)
>  {
>   utime_t ut = g_clock.now();
> @@ -1116,6 +1128,46 @@ int RadosClient::read(PoolCtx& pool, const object_t& oid, off_t off, bufferlist&
>   return bl.length();
>  }
>
> +int RadosClient::read(PoolCtx& pool, const object_t& oid, off_t off, void *buf, size_t len)
> +{
> +  SnapContext snapc;
> +
> +  Mutex mylock("RadosClient::read::mylock");
> +  Cond cond;
> +  bool done;
> +  int r;
> +  Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
> +  bufferptr bp = buffer::create_static(len, static_cast<char *>(buf));
> +  bufferlist bl;
> +
> +  bl.push_back(bp);
> +  lock.Lock();
> +  ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool.poolid);
> +  tid_t tid = objecter->get_tid();
> +  buffer_map[tid] = &bp;
> +  objecter->read_with_tid(oid, layout,
> +             off, len, pool.snap_seq, &bl, 0,
> +             onack, tid);
> +  lock.Unlock();
> +
> +  mylock.Lock();
> +  while (!done)
> +    cond.Wait(mylock);
> +  mylock.Unlock();
> +  buffer_map.erase(tid);
> +  dout(10) << "Objecter returned from read r=" << r << dendl;
> +
> +  if (r < 0)
> +    return r;
> +
> +  if (bl.length() < len) {
> +    dout(10) << "Returned length " << bl.length()
> +            << " less than original length "<< len << dendl;
> +  }
> +
> +  return bl.length();
> +}
> +
>  int RadosClient::stat(PoolCtx& pool, const object_t& oid, uint64_t *psize, time_t *pmtime)
>  {
>   SnapContext snapc;
> @@ -1251,6 +1303,13 @@ int RadosClient::getxattrs(PoolCtx& pool, const object_t& oid, map<std::string,
>   return r;
>  }
>
> +hash_map<tid_t, bufferptr*> RadosClient::buffer_map;
> +
> +bufferptr* RadosClient::fetch_buffer_func(tid_t tid)
> +{
> +  return buffer_map[tid];
> +}
> +
>  // ---------------------------------------------
>
>  namespace librados {
> @@ -1401,6 +1460,14 @@ int Rados::write(rados_pool_t pool, const string& o, off_t off, bufferlist& bl,
>   return ((RadosClient *)client)->write(*(RadosClient::PoolCtx *)pool, oid, off, bl, len);
>  }
>
> +int Rados::write(rados_pool_t pool, const string& o, off_t off, void *buf, size_t len)
> +{
> +  if (!client)
> +    return -EINVAL;
> +  object_t oid(o);
> +  return ((RadosClient *)client)->write(*(RadosClient::PoolCtx *)pool, oid, off, buf, len);
> +}
> +
>  int Rados::write_full(rados_pool_t pool, const string& o, bufferlist& bl)
>  {
>   if (!client)
> @@ -1433,6 +1500,14 @@ int Rados::read(rados_pool_t pool, const string& o, off_t off, bufferlist& bl, s
>   return ((RadosClient *)client)->read(*(RadosClient::PoolCtx *)pool, oid, off, bl, len);
>  }
>
> +int Rados::read(rados_pool_t pool, const string& o, off_t off, void *buf, size_t len)
> +{
> +  if (!client)
> +    return -EINVAL;
> +  object_t oid(o);
> +  return ((RadosClient *)client)->read(*(RadosClient::PoolCtx *)pool, oid, off, buf, len);
> +}
> +
>  int Rados::getxattr(rados_pool_t pool, const string& o, const char *name, bufferlist& bl)
>  {
>   if (!client)
> diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc
> index 4632267..75b4576 100644
> --- a/src/msg/SimpleMessenger.cc
> +++ b/src/msg/SimpleMessenger.cc
> @@ -51,7 +51,7 @@ static ostream& _prefix(SimpleMessenger *messenger) {
>  #define closed_socket() //dout(20) << "closed_socket " << --sockopen << dendl;
>  #define opened_socket() //dout(20) << "opened_socket " << ++sockopen << dendl;
>
> -
> +SimpleMessenger::fetch_buffer_callback_t SimpleMessenger::fetch_buffer_callback = 0;
>
>  /********************************************
>  * Accepter
> @@ -1786,36 +1786,47 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
>   data_len = le32_to_cpu(header.data_len);
>   data_off = le32_to_cpu(header.data_off);
>   if (data_len) {
> -    int left = data_len;
> -    if (data_off & ~PAGE_MASK) {
> -      // head
> -      int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK),
> -                    (unsigned)left);
> -      bufferptr bp = buffer::create(head);
> -      if (tcp_read( sd, bp.c_str(), head, messenger->timeout ) < 0)
> +    if (fetch_buffer_callback) {
> +      bufferptr *bpp = fetch_buffer_callback(header.tid);
> +      if (!bpp)
> +       goto allocate_buffer;
> +      if (tcp_read( sd, bpp->c_str(), data_len, messenger->timeout ) < 0)
>        goto out_dethrottle;
> -      data.push_back(bp);
> -      left -= head;
> -      dout(20) << "reader got data head " << head << dendl;
> -    }
> +      data.push_back(*bpp);
> +      dout(20) << "reader got data " << data_len << dendl;
> +    }else{
> +    allocate_buffer:
> +      int left = data_len;
> +      if (data_off & ~PAGE_MASK) {
> +       // head
> +       int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK),
> +                      (unsigned)left);
> +       bufferptr bp = buffer::create(head);
> +       if (tcp_read( sd, bp.c_str(), head, messenger->timeout ) < 0)
> +         goto out_dethrottle;
> +       data.push_back(bp);
> +       left -= head;
> +       dout(20) << "reader got data head " << head << dendl;
> +      }
>
> -    // middle
> -    int middle = left & PAGE_MASK;
> -    if (middle > 0) {
> -      bufferptr bp = buffer::create_page_aligned(middle);
> -      if (tcp_read( sd, bp.c_str(), middle, messenger->timeout ) < 0)
> -       goto out_dethrottle;
> -      data.push_back(bp);
> -      left -= middle;
> -      dout(20) << "reader got data page-aligned middle " << middle << dendl;
> -    }
> +      // middle
> +      int middle = left & PAGE_MASK;
> +      if (middle > 0) {
> +       bufferptr bp = buffer::create_page_aligned(middle);
> +       if (tcp_read( sd, bp.c_str(), middle, messenger->timeout ) < 0)
> +         goto out_dethrottle;
> +       data.push_back(bp);
> +       left -= middle;
> +       dout(20) << "reader got data page-aligned middle " << middle << dendl;
> +      }
>
> -    if (left) {
> -      bufferptr bp = buffer::create(left);
> -      if (tcp_read( sd, bp.c_str(), left, messenger->timeout ) < 0)
> -       goto out_dethrottle;
> -      data.push_back(bp);
> -      dout(20) << "reader got data tail " << left << dendl;
> +      if (left) {
> +       bufferptr bp = buffer::create(left);
> +       if (tcp_read( sd, bp.c_str(), left, messenger->timeout ) < 0)
> +         goto out_dethrottle;
> +       data.push_back(bp);
> +       dout(20) << "reader got data tail " << left << dendl;
> +      }
>     }
>   }
>
> diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h
> index b4a0ef3..72a5a1b 100644
> --- a/src/msg/SimpleMessenger.h
> +++ b/src/msg/SimpleMessenger.h
> @@ -567,6 +567,9 @@ private:
>   SimpleMessenger *messenger; //hack to make dout macro work, will fix
>   int timeout;
>
> +  typedef bufferptr* (*fetch_buffer_callback_t) (tid_t);
> +  static fetch_buffer_callback_t fetch_buffer_callback;
> +
>  public:
>   SimpleMessenger() :
>     Messenger(entity_name_t()),
> @@ -580,6 +583,20 @@ public:
>     // for local dmsg delivery
>     dispatch_queue.local_pipe = new Pipe(this, Pipe::STATE_OPEN);
>   }
> +
> +  SimpleMessenger(fetch_buffer_callback_t callback) :
> +    Messenger(entity_name_t()),
> +    accepter(this),
> +    lock("SimpleMessenger::lock"), started(false), did_bind(false),
> +    dispatch_throttler(g_conf.ms_dispatch_throttle_bytes), need_addr(true),
> +    destination_stopped(true), my_type(-1),
> +    global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0),
> +    reaper_thread(this), reaper_started(false), reaper_stop(false),
> +    dispatch_thread(this), messenger(this) {
> +    fetch_buffer_callback = callback;
> +    // for local dmsg delivery
> +    dispatch_queue.local_pipe = new Pipe(this, Pipe::STATE_OPEN);
> +  }
>   ~SimpleMessenger() {
>     delete dispatch_queue.local_pipe;
>   }
> diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h
> index a34c0a9..fd43795 100644
> --- a/src/osdc/Objecter.h
> +++ b/src/osdc/Objecter.h
> @@ -530,6 +530,21 @@ private:
>     o->outbl = pbl;
>     return op_submit(o);
>   }
> +  tid_t read_with_tid(const object_t& oid, ceph_object_layout ol,
> +            uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags,
> +            Context *onfinish, tid_t tid) {
> +    vector<OSDOp> ops(1);
> +    ops[0].op.op = CEPH_OSD_OP_READ;
> +    ops[0].op.extent.offset = off;
> +    ops[0].op.extent.length = len;
> +    ops[0].op.extent.truncate_size = 0;
> +    ops[0].op.extent.truncate_seq = 0;
> +    Op *o = new Op(oid, ol, ops, flags, onfinish, 0);
> +    o->snapid = snap;
> +    o->outbl = pbl;
> +    o->tid = tid;
> +    return op_submit(o);
> +  }
>   tid_t read_trunc(const object_t& oid, ceph_object_layout ol,
>             uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags,
>             uint64_t trunc_size, __u32 trunc_seq,
> @@ -725,6 +740,8 @@ private:
>
>   void list_objects(ListContext *p, Context *onfinish);
>
> +  tid_t get_tid(void) { return ++last_tid; }
> +
>   // -------------------------
>   // pool ops
>  private:
> --
> To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
> the body of a message to majordomo@xxxxxxxxxxxxxxx
> More majordomo info at  http://vger.kernel.org/majordomo-info.html
>
--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[Index of Archives]     [CEPH Users]     [Ceph Large]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]
  Powered by Linux