Hi Takuya, Thanks for looking at this! Unfortunately, I think you have taken the wrong approach here. There isn't any need for thread-local data in the Objecter. It's not threads that the Objecter cares about; it's clients. Also, I don't think a change like this needs to involve SimpleMessenger and all that. Take a look at rados_read in librados.cc. This function implements the C API, which can already handle using external buffers. It may be that all you need to do is add functions in the C++ header that call rados_read and rados_write! I actually think that rados_read could be made slightly more efficient. We could probably add a bufferlist constructor that starts with an already allocated buffer, to avoid the call to bl.copy. I haven't thought too hard about how to do this, but in principle it seems reasonable. Regards, Colin McCabe On Tue, Oct 19, 2010 at 3:27 PM, Takuya ASADA <syuu@xxxxxxxxxxxx> wrote: > Hi, > > I just implemented the patch to support external buffer on Rados::read() and Rados::write(). 
> > diff --git a/src/include/librados.hpp b/src/include/librados.hpp > index 06fa3b2..bfb0f5b 100644 > --- a/src/include/librados.hpp > +++ b/src/include/librados.hpp > @@ -63,8 +63,10 @@ public: > int create(pool_t pool, const std::string& oid, bool exclusive); > > int write(pool_t pool, const std::string& oid, off_t off, bufferlist& bl, size_t len); > + int write(pool_t pool, const std::string& oid, off_t off, void *buf, size_t len); > int write_full(pool_t pool, const std::string& oid, bufferlist& bl); > int read(pool_t pool, const std::string& oid, off_t off, bufferlist& bl, size_t len); > + int read(pool_t pool, const std::string& oid, off_t off, void *buf, size_t len); > int remove(pool_t pool, const std::string& oid); > int trunc(pool_t pool, const std::string& oid, size_t size); > > @@ -135,4 +137,3 @@ public: > } > > #endif > - > diff --git a/src/librados.cc b/src/librados.cc > index 4c8a464..91e6a27 100644 > --- a/src/librados.cc > +++ b/src/librados.cc > @@ -72,11 +72,12 @@ class RadosClient : public Dispatcher > > Mutex lock; > Cond cond; > + static hash_map<tid_t, bufferptr*> buffer_map; > + static bufferptr* fetch_buffer_func(tid_t tid); > > - > public: > RadosClient() : messenger(NULL), lock("radosclient") { > - messenger = new SimpleMessenger(); > + messenger = new SimpleMessenger(&RadosClient::fetch_buffer_func); > } > > ~RadosClient(); > @@ -132,8 +133,10 @@ public: > // io > int create(PoolCtx& pool, const object_t& oid, bool exclusive); > int write(PoolCtx& pool, const object_t& oid, off_t off, bufferlist& bl, size_t len); > + int write(PoolCtx& pool, const object_t& oid, off_t off, void *buf, size_t len); > int write_full(PoolCtx& pool, const object_t& oid, bufferlist& bl); > int read(PoolCtx& pool, const object_t& oid, off_t off, bufferlist& bl, size_t len); > + int read(PoolCtx& pool, const object_t& oid, off_t off, void *buf, size_t len); > int remove(PoolCtx& pool, const object_t& oid); > int stat(PoolCtx& pool, const object_t& oid, 
uint64_t *psize, time_t *pmtime); > int trunc(PoolCtx& pool, const object_t& oid, size_t size); > @@ -870,6 +873,15 @@ int RadosClient::write(PoolCtx& pool, const object_t& oid, off_t off, bufferlist > return len; > } > > +int RadosClient::write(PoolCtx& pool, const object_t& oid, off_t off, void *buf, size_t len) > +{ > + bufferptr bp = buffer::create_static(len, static_cast<char *>(buf)); > + bufferlist bl; > + > + bl.push_back(bp); > + return write(pool, oid, off, bl, len); > +} > + > int RadosClient::write_full(PoolCtx& pool, const object_t& oid, bufferlist& bl) > { > utime_t ut = g_clock.now(); > @@ -1116,6 +1128,46 @@ int RadosClient::read(PoolCtx& pool, const object_t& oid, off_t off, bufferlist& > return bl.length(); > } > > +int RadosClient::read(PoolCtx& pool, const object_t& oid, off_t off, void *buf, size_t len) > +{ > + SnapContext snapc; > + > + Mutex mylock("RadosClient::read::mylock"); > + Cond cond; > + bool done; > + int r; > + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); > + bufferptr bp = buffer::create_static(len, static_cast<char *>(buf)); > + bufferlist bl; > + > + bl.push_back(bp); > + lock.Lock(); > + ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool.poolid); > + tid_t tid = objecter->get_tid(); > + buffer_map[tid] = &bp; > + objecter->read_with_tid(oid, layout, > + off, len, pool.snap_seq, &bl, 0, > + onack, tid); > + lock.Unlock(); > + > + mylock.Lock(); > + while (!done) > + cond.Wait(mylock); > + mylock.Unlock(); > + buffer_map.erase(tid); > + dout(10) << "Objecter returned from read r=" << r << dendl; > + > + if (r < 0) > + return r; > + > + if (bl.length() < len) { > + dout(10) << "Returned length " << bl.length() > + << " less than original length "<< len << dendl; > + } > + > + return bl.length(); > +} > + > int RadosClient::stat(PoolCtx& pool, const object_t& oid, uint64_t *psize, time_t *pmtime) > { > SnapContext snapc; > @@ -1251,6 +1303,13 @@ int RadosClient::getxattrs(PoolCtx& pool, const 
object_t& oid, map<std::string, > return r; > } > > +hash_map<tid_t, bufferptr*> RadosClient::buffer_map; > + > +bufferptr* RadosClient::fetch_buffer_func(tid_t tid) > +{ > + return buffer_map[tid]; > +} > + > // --------------------------------------------- > > namespace librados { > @@ -1401,6 +1460,14 @@ int Rados::write(rados_pool_t pool, const string& o, off_t off, bufferlist& bl, > return ((RadosClient *)client)->write(*(RadosClient::PoolCtx *)pool, oid, off, bl, len); > } > > +int Rados::write(rados_pool_t pool, const string& o, off_t off, void *buf, size_t len) > +{ > + if (!client) > + return -EINVAL; > + object_t oid(o); > + return ((RadosClient *)client)->write(*(RadosClient::PoolCtx *)pool, oid, off, buf, len); > +} > + > int Rados::write_full(rados_pool_t pool, const string& o, bufferlist& bl) > { > if (!client) > @@ -1433,6 +1500,14 @@ int Rados::read(rados_pool_t pool, const string& o, off_t off, bufferlist& bl, s > return ((RadosClient *)client)->read(*(RadosClient::PoolCtx *)pool, oid, off, bl, len); > } > > +int Rados::read(rados_pool_t pool, const string& o, off_t off, void *buf, size_t len) > +{ > + if (!client) > + return -EINVAL; > + object_t oid(o); > + return ((RadosClient *)client)->read(*(RadosClient::PoolCtx *)pool, oid, off, buf, len); > +} > + > int Rados::getxattr(rados_pool_t pool, const string& o, const char *name, bufferlist& bl) > { > if (!client) > diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc > index 4632267..75b4576 100644 > --- a/src/msg/SimpleMessenger.cc > +++ b/src/msg/SimpleMessenger.cc > @@ -51,7 +51,7 @@ static ostream& _prefix(SimpleMessenger *messenger) { > #define closed_socket() //dout(20) << "closed_socket " << --sockopen << dendl; > #define opened_socket() //dout(20) << "opened_socket " << ++sockopen << dendl; > > - > +SimpleMessenger::fetch_buffer_callback_t SimpleMessenger::fetch_buffer_callback = 0; > > /******************************************** > * Accepter > @@ -1786,36 +1786,47 @@ int 
SimpleMessenger::Pipe::read_message(Message **pm) > data_len = le32_to_cpu(header.data_len); > data_off = le32_to_cpu(header.data_off); > if (data_len) { > - int left = data_len; > - if (data_off & ~PAGE_MASK) { > - // head > - int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK), > - (unsigned)left); > - bufferptr bp = buffer::create(head); > - if (tcp_read( sd, bp.c_str(), head, messenger->timeout ) < 0) > + if (fetch_buffer_callback) { > + bufferptr *bpp = fetch_buffer_callback(header.tid); > + if (!bpp) > + goto allocate_buffer; > + if (tcp_read( sd, bpp->c_str(), data_len, messenger->timeout ) < 0) > goto out_dethrottle; > - data.push_back(bp); > - left -= head; > - dout(20) << "reader got data head " << head << dendl; > - } > + data.push_back(*bpp); > + dout(20) << "reader got data " << data_len << dendl; > + }else{ > + allocate_buffer: > + int left = data_len; > + if (data_off & ~PAGE_MASK) { > + // head > + int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK), > + (unsigned)left); > + bufferptr bp = buffer::create(head); > + if (tcp_read( sd, bp.c_str(), head, messenger->timeout ) < 0) > + goto out_dethrottle; > + data.push_back(bp); > + left -= head; > + dout(20) << "reader got data head " << head << dendl; > + } > > - // middle > - int middle = left & PAGE_MASK; > - if (middle > 0) { > - bufferptr bp = buffer::create_page_aligned(middle); > - if (tcp_read( sd, bp.c_str(), middle, messenger->timeout ) < 0) > - goto out_dethrottle; > - data.push_back(bp); > - left -= middle; > - dout(20) << "reader got data page-aligned middle " << middle << dendl; > - } > + // middle > + int middle = left & PAGE_MASK; > + if (middle > 0) { > + bufferptr bp = buffer::create_page_aligned(middle); > + if (tcp_read( sd, bp.c_str(), middle, messenger->timeout ) < 0) > + goto out_dethrottle; > + data.push_back(bp); > + left -= middle; > + dout(20) << "reader got data page-aligned middle " << middle << dendl; > + } > > - if (left) { > - bufferptr bp = buffer::create(left); > - if 
(tcp_read( sd, bp.c_str(), left, messenger->timeout ) < 0) > - goto out_dethrottle; > - data.push_back(bp); > - dout(20) << "reader got data tail " << left << dendl; > + if (left) { > + bufferptr bp = buffer::create(left); > + if (tcp_read( sd, bp.c_str(), left, messenger->timeout ) < 0) > + goto out_dethrottle; > + data.push_back(bp); > + dout(20) << "reader got data tail " << left << dendl; > + } > } > } > > diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h > index b4a0ef3..72a5a1b 100644 > --- a/src/msg/SimpleMessenger.h > +++ b/src/msg/SimpleMessenger.h > @@ -567,6 +567,9 @@ private: > SimpleMessenger *messenger; //hack to make dout macro work, will fix > int timeout; > > + typedef bufferptr* (*fetch_buffer_callback_t) (tid_t); > + static fetch_buffer_callback_t fetch_buffer_callback; > + > public: > SimpleMessenger() : > Messenger(entity_name_t()), > @@ -580,6 +583,20 @@ public: > // for local dmsg delivery > dispatch_queue.local_pipe = new Pipe(this, Pipe::STATE_OPEN); > } > + > + SimpleMessenger(fetch_buffer_callback_t callback) : > + Messenger(entity_name_t()), > + accepter(this), > + lock("SimpleMessenger::lock"), started(false), did_bind(false), > + dispatch_throttler(g_conf.ms_dispatch_throttle_bytes), need_addr(true), > + destination_stopped(true), my_type(-1), > + global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0), > + reaper_thread(this), reaper_started(false), reaper_stop(false), > + dispatch_thread(this), messenger(this) { > + fetch_buffer_callback = callback; > + // for local dmsg delivery > + dispatch_queue.local_pipe = new Pipe(this, Pipe::STATE_OPEN); > + } > ~SimpleMessenger() { > delete dispatch_queue.local_pipe; > } > diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h > index a34c0a9..fd43795 100644 > --- a/src/osdc/Objecter.h > +++ b/src/osdc/Objecter.h > @@ -530,6 +530,21 @@ private: > o->outbl = pbl; > return op_submit(o); > } > + tid_t read_with_tid(const object_t& oid, ceph_object_layout ol, > + 
uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags, > + Context *onfinish, tid_t tid) { > + vector<OSDOp> ops(1); > + ops[0].op.op = CEPH_OSD_OP_READ; > + ops[0].op.extent.offset = off; > + ops[0].op.extent.length = len; > + ops[0].op.extent.truncate_size = 0; > + ops[0].op.extent.truncate_seq = 0; > + Op *o = new Op(oid, ol, ops, flags, onfinish, 0); > + o->snapid = snap; > + o->outbl = pbl; > + o->tid = tid; > + return op_submit(o); > + } > tid_t read_trunc(const object_t& oid, ceph_object_layout ol, > uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags, > uint64_t trunc_size, __u32 trunc_seq, > @@ -725,6 +740,8 @@ private: > > void list_objects(ListContext *p, Context *onfinish); > > + tid_t get_tid(void) { return ++last_tid; } > + > // ------------------------- > // pool ops > private: > -- > To unsubscribe from this list: send the line "unsubscribe ceph-devel" in > the body of a message to majordomo@xxxxxxxxxxxxxxx > More majordomo info at http://vger.kernel.org/majordomo-info.html > -- To unsubscribe from this list: send the line "unsubscribe ceph-devel" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html