Re: read/write on RADOS using external buffer

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Takuya:
At first glance this looks okay, but we're going to want to review it
carefully given the changes to SimpleMessenger, and check its impact
on performance, etc., to decide if this is something we actually want to
merge, or if we'd like to implement this as a derivative messenger or
something. :)
Could you let us know how you've been testing this? :)
-Greg

On Tue, Oct 19, 2010 at 3:27 PM, Takuya ASADA <syuu@xxxxxxxxxxxx> wrote:
> Hi,
>
> I just implemented a patch to support external buffers in Rados::read() and Rados::write().
>
> diff --git a/src/include/librados.hpp b/src/include/librados.hpp
> index 06fa3b2..bfb0f5b 100644
> --- a/src/include/librados.hpp
> +++ b/src/include/librados.hpp
> @@ -63,8 +63,10 @@ public:
>   int create(pool_t pool, const std::string& oid, bool exclusive);
>
>   int write(pool_t pool, const std::string& oid, off_t off, bufferlist& bl, size_t len);
> +  int write(pool_t pool, const std::string& oid, off_t off, void *buf, size_t len);
>   int write_full(pool_t pool, const std::string& oid, bufferlist& bl);
>   int read(pool_t pool, const std::string& oid, off_t off, bufferlist& bl, size_t len);
> +  int read(pool_t pool, const std::string& oid, off_t off, void *buf, size_t len);
>   int remove(pool_t pool, const std::string& oid);
>   int trunc(pool_t pool, const std::string& oid, size_t size);
>
> @@ -135,4 +137,3 @@ public:
>  }
>
>  #endif
> -
> diff --git a/src/librados.cc b/src/librados.cc
> index 4c8a464..91e6a27 100644
> --- a/src/librados.cc
> +++ b/src/librados.cc
> @@ -72,11 +72,12 @@ class RadosClient : public Dispatcher
>
>   Mutex lock;
>   Cond cond;
> +  static hash_map<tid_t, bufferptr*> buffer_map;
> +  static bufferptr* fetch_buffer_func(tid_t tid);
>
> -
>  public:
>   RadosClient() : messenger(NULL), lock("radosclient") {
> -    messenger = new SimpleMessenger();
> +    messenger = new SimpleMessenger(&RadosClient::fetch_buffer_func);
>   }
>
>   ~RadosClient();
> @@ -132,8 +133,10 @@ public:
>   // io
>   int create(PoolCtx& pool, const object_t& oid, bool exclusive);
>   int write(PoolCtx& pool, const object_t& oid, off_t off, bufferlist& bl, size_t len);
> +  int write(PoolCtx& pool, const object_t& oid, off_t off, void *buf, size_t len);
>   int write_full(PoolCtx& pool, const object_t& oid, bufferlist& bl);
>   int read(PoolCtx& pool, const object_t& oid, off_t off, bufferlist& bl, size_t len);
> +  int read(PoolCtx& pool, const object_t& oid, off_t off, void *buf, size_t len);
>   int remove(PoolCtx& pool, const object_t& oid);
>   int stat(PoolCtx& pool, const object_t& oid, uint64_t *psize, time_t *pmtime);
>   int trunc(PoolCtx& pool, const object_t& oid, size_t size);
> @@ -870,6 +873,15 @@ int RadosClient::write(PoolCtx& pool, const object_t& oid, off_t off, bufferlist
>   return len;
>  }
>
> +int RadosClient::write(PoolCtx& pool, const object_t& oid, off_t off, void *buf, size_t len)
> +{
> +  bufferptr bp = buffer::create_static(len, static_cast<char *>(buf));
> +  bufferlist bl;
> +
> +  bl.push_back(bp);
> +  return write(pool, oid, off, bl, len);
> +}
> +
>  int RadosClient::write_full(PoolCtx& pool, const object_t& oid, bufferlist& bl)
>  {
>   utime_t ut = g_clock.now();
> @@ -1116,6 +1128,46 @@ int RadosClient::read(PoolCtx& pool, const object_t& oid, off_t off, bufferlist&
>   return bl.length();
>  }
>
> +int RadosClient::read(PoolCtx& pool, const object_t& oid, off_t off, void *buf, size_t len)
> +{
> +  SnapContext snapc;
> +
> +  Mutex mylock("RadosClient::read::mylock");
> +  Cond cond;
> +  bool done;
> +  int r;
> +  Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
> +  bufferptr bp = buffer::create_static(len, static_cast<char *>(buf));
> +  bufferlist bl;
> +
> +  bl.push_back(bp);
> +  lock.Lock();
> +  ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool.poolid);
> +  tid_t tid = objecter->get_tid();
> +  buffer_map[tid] = &bp;
> +  objecter->read_with_tid(oid, layout,
> +             off, len, pool.snap_seq, &bl, 0,
> +             onack, tid);
> +  lock.Unlock();
> +
> +  mylock.Lock();
> +  while (!done)
> +    cond.Wait(mylock);
> +  mylock.Unlock();
> +  buffer_map.erase(tid);
> +  dout(10) << "Objecter returned from read r=" << r << dendl;
> +
> +  if (r < 0)
> +    return r;
> +
> +  if (bl.length() < len) {
> +    dout(10) << "Returned length " << bl.length()
> +            << " less than original length "<< len << dendl;
> +  }
> +
> +  return bl.length();
> +}
> +
>  int RadosClient::stat(PoolCtx& pool, const object_t& oid, uint64_t *psize, time_t *pmtime)
>  {
>   SnapContext snapc;
> @@ -1251,6 +1303,13 @@ int RadosClient::getxattrs(PoolCtx& pool, const object_t& oid, map<std::string,
>   return r;
>  }
>
> +hash_map<tid_t, bufferptr*> RadosClient::buffer_map;
> +
> +bufferptr* RadosClient::fetch_buffer_func(tid_t tid)
> +{
> +  return buffer_map[tid];
> +}
> +
>  // ---------------------------------------------
>
>  namespace librados {
> @@ -1401,6 +1460,14 @@ int Rados::write(rados_pool_t pool, const string& o, off_t off, bufferlist& bl,
>   return ((RadosClient *)client)->write(*(RadosClient::PoolCtx *)pool, oid, off, bl, len);
>  }
>
> +int Rados::write(rados_pool_t pool, const string& o, off_t off, void *buf, size_t len)
> +{
> +  if (!client)
> +    return -EINVAL;
> +  object_t oid(o);
> +  return ((RadosClient *)client)->write(*(RadosClient::PoolCtx *)pool, oid, off, buf, len);
> +}
> +
>  int Rados::write_full(rados_pool_t pool, const string& o, bufferlist& bl)
>  {
>   if (!client)
> @@ -1433,6 +1500,14 @@ int Rados::read(rados_pool_t pool, const string& o, off_t off, bufferlist& bl, s
>   return ((RadosClient *)client)->read(*(RadosClient::PoolCtx *)pool, oid, off, bl, len);
>  }
>
> +int Rados::read(rados_pool_t pool, const string& o, off_t off, void *buf, size_t len)
> +{
> +  if (!client)
> +    return -EINVAL;
> +  object_t oid(o);
> +  return ((RadosClient *)client)->read(*(RadosClient::PoolCtx *)pool, oid, off, buf, len);
> +}
> +
>  int Rados::getxattr(rados_pool_t pool, const string& o, const char *name, bufferlist& bl)
>  {
>   if (!client)
> diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc
> index 4632267..75b4576 100644
> --- a/src/msg/SimpleMessenger.cc
> +++ b/src/msg/SimpleMessenger.cc
> @@ -51,7 +51,7 @@ static ostream& _prefix(SimpleMessenger *messenger) {
>  #define closed_socket() //dout(20) << "closed_socket " << --sockopen << dendl;
>  #define opened_socket() //dout(20) << "opened_socket " << ++sockopen << dendl;
>
> -
> +SimpleMessenger::fetch_buffer_callback_t SimpleMessenger::fetch_buffer_callback = 0;
>
>  /********************************************
>  * Accepter
> @@ -1786,36 +1786,47 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
>   data_len = le32_to_cpu(header.data_len);
>   data_off = le32_to_cpu(header.data_off);
>   if (data_len) {
> -    int left = data_len;
> -    if (data_off & ~PAGE_MASK) {
> -      // head
> -      int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK),
> -                    (unsigned)left);
> -      bufferptr bp = buffer::create(head);
> -      if (tcp_read( sd, bp.c_str(), head, messenger->timeout ) < 0)
> +    if (fetch_buffer_callback) {
> +      bufferptr *bpp = fetch_buffer_callback(header.tid);
> +      if (!bpp)
> +       goto allocate_buffer;
> +      if (tcp_read( sd, bpp->c_str(), data_len, messenger->timeout ) < 0)
>        goto out_dethrottle;
> -      data.push_back(bp);
> -      left -= head;
> -      dout(20) << "reader got data head " << head << dendl;
> -    }
> +      data.push_back(*bpp);
> +      dout(20) << "reader got data " << data_len << dendl;
> +    }else{
> +    allocate_buffer:
> +      int left = data_len;
> +      if (data_off & ~PAGE_MASK) {
> +       // head
> +       int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK),
> +                      (unsigned)left);
> +       bufferptr bp = buffer::create(head);
> +       if (tcp_read( sd, bp.c_str(), head, messenger->timeout ) < 0)
> +         goto out_dethrottle;
> +       data.push_back(bp);
> +       left -= head;
> +       dout(20) << "reader got data head " << head << dendl;
> +      }
>
> -    // middle
> -    int middle = left & PAGE_MASK;
> -    if (middle > 0) {
> -      bufferptr bp = buffer::create_page_aligned(middle);
> -      if (tcp_read( sd, bp.c_str(), middle, messenger->timeout ) < 0)
> -       goto out_dethrottle;
> -      data.push_back(bp);
> -      left -= middle;
> -      dout(20) << "reader got data page-aligned middle " << middle << dendl;
> -    }
> +      // middle
> +      int middle = left & PAGE_MASK;
> +      if (middle > 0) {
> +       bufferptr bp = buffer::create_page_aligned(middle);
> +       if (tcp_read( sd, bp.c_str(), middle, messenger->timeout ) < 0)
> +         goto out_dethrottle;
> +       data.push_back(bp);
> +       left -= middle;
> +       dout(20) << "reader got data page-aligned middle " << middle << dendl;
> +      }
>
> -    if (left) {
> -      bufferptr bp = buffer::create(left);
> -      if (tcp_read( sd, bp.c_str(), left, messenger->timeout ) < 0)
> -       goto out_dethrottle;
> -      data.push_back(bp);
> -      dout(20) << "reader got data tail " << left << dendl;
> +      if (left) {
> +       bufferptr bp = buffer::create(left);
> +       if (tcp_read( sd, bp.c_str(), left, messenger->timeout ) < 0)
> +         goto out_dethrottle;
> +       data.push_back(bp);
> +       dout(20) << "reader got data tail " << left << dendl;
> +      }
>     }
>   }
>
> diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h
> index b4a0ef3..72a5a1b 100644
> --- a/src/msg/SimpleMessenger.h
> +++ b/src/msg/SimpleMessenger.h
> @@ -567,6 +567,9 @@ private:
>   SimpleMessenger *messenger; //hack to make dout macro work, will fix
>   int timeout;
>
> +  typedef bufferptr* (*fetch_buffer_callback_t) (tid_t);
> +  static fetch_buffer_callback_t fetch_buffer_callback;
> +
>  public:
>   SimpleMessenger() :
>     Messenger(entity_name_t()),
> @@ -580,6 +583,20 @@ public:
>     // for local dmsg delivery
>     dispatch_queue.local_pipe = new Pipe(this, Pipe::STATE_OPEN);
>   }
> +
> +  SimpleMessenger(fetch_buffer_callback_t callback) :
> +    Messenger(entity_name_t()),
> +    accepter(this),
> +    lock("SimpleMessenger::lock"), started(false), did_bind(false),
> +    dispatch_throttler(g_conf.ms_dispatch_throttle_bytes), need_addr(true),
> +    destination_stopped(true), my_type(-1),
> +    global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0),
> +    reaper_thread(this), reaper_started(false), reaper_stop(false),
> +    dispatch_thread(this), messenger(this) {
> +    fetch_buffer_callback = callback;
> +    // for local dmsg delivery
> +    dispatch_queue.local_pipe = new Pipe(this, Pipe::STATE_OPEN);
> +  }
>   ~SimpleMessenger() {
>     delete dispatch_queue.local_pipe;
>   }
> diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h
> index a34c0a9..fd43795 100644
> --- a/src/osdc/Objecter.h
> +++ b/src/osdc/Objecter.h
> @@ -530,6 +530,21 @@ private:
>     o->outbl = pbl;
>     return op_submit(o);
>   }
> +  tid_t read_with_tid(const object_t& oid, ceph_object_layout ol,
> +            uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags,
> +            Context *onfinish, tid_t tid) {
> +    vector<OSDOp> ops(1);
> +    ops[0].op.op = CEPH_OSD_OP_READ;
> +    ops[0].op.extent.offset = off;
> +    ops[0].op.extent.length = len;
> +    ops[0].op.extent.truncate_size = 0;
> +    ops[0].op.extent.truncate_seq = 0;
> +    Op *o = new Op(oid, ol, ops, flags, onfinish, 0);
> +    o->snapid = snap;
> +    o->outbl = pbl;
> +    o->tid = tid;
> +    return op_submit(o);
> +  }
>   tid_t read_trunc(const object_t& oid, ceph_object_layout ol,
>             uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags,
>             uint64_t trunc_size, __u32 trunc_seq,
> @@ -725,6 +740,8 @@ private:
>
>   void list_objects(ListContext *p, Context *onfinish);
>
> +  tid_t get_tid(void) { return ++last_tid; }
> +
>   // -------------------------
>   // pool ops
>  private:
> --
> To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
> the body of a message to majordomo@xxxxxxxxxxxxxxx
> More majordomo info at  http://vger.kernel.org/majordomo-info.html
>
--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[Index of Archives]     [CEPH Users]     [Ceph Large]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]
  Powered by Linux