Re: bucket index sharding - IO throttle

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Thanks Yehuda. I will do that (sorry I was occupied by some other stuff recently but I will try my best to provide a patch as soon as possible).

Thanks,
Guang

在 2014年7月31日,上午1:00,Yehuda Sadeh <yehuda@xxxxxxxxxxx> 写道:

> Can you send this code through a github pull request (or at least as a
> patch)? It'lll be easier to review and comment.
> 
> Thanks,
> Yehuda
> 
> On Wed, Jul 30, 2014 at 7:58 AM, Guang Yang <yguang11@xxxxxxxxxxx> wrote:
>> +ceph-devel.
>> 
>> Thanks,
>> Guang
>> 
>> On Jul 29, 2014, at 10:20 PM, Guang Yang <yguang11@xxxxxxxxxxx> wrote:
>> 
>>> Hi Yehuda,
>>> Per you review comment in terms of IO throttling for bucket index operation, I prototyped the below code (details still need to polish), can you take a look if that is right way to go?
>>> 
>>> Another problem I came across is that ClsBucketIndexOpCtx::handle_compeltion was not called for the bucket index init op (below), is there anything I missed obviously here?
>>> 
>>> Thanks,
>>> Guang
>>> 
>>> 
>>> class ClsBucketIndexAioThrottler {
>>> protected:
>>> int completed;
>>> int ret_code;
>>> IoCtx& io_ctx;
>>> Mutex lock;
>>> struct LockCond {
>>>   Mutex lock;
>>>   Cond cond;
>>>   LockCond() : lock("LockCond"), cond() {}
>>> } lock_cond;
>>> public:
>>> ClsBucketIndexAioThrottler(IoCtx& _io_ctx)
>>>   : completed(0), ret_code(0), io_ctx(_io_ctx),
>>>   lock("ClsBucketIndexAioThrottler"), lock_cond() {}
>>> 
>>> virtual ~ClsBucketIndexAioThrottler() {}
>>> virtual void do_next() = 0;
>>> virtual bool is_completed () = 0;
>>> 
>>> void complete(int ret) {
>>>   {
>>>     Mutex::Locker l(lock);
>>>     if (ret < 0)
>>>       ret_code = ret;
>>>     ++completed;
>>>   }
>>> 
>>>   lock_cond.lock.Lock();
>>>   lock_cond.cond.Signal();
>>>   lock_cond.lock.Unlock();
>>> }
>>> 
>>> int get_ret_code () {
>>>   Mutex::Locker l(lock);
>>>   return ret_code;
>>> }
>>> 
>>> virtual int wait_completion() {
>>>   lock_cond.lock.Lock();
>>>   while (1) {
>>>     if (is_completed()) {
>>>       lock_cond.lock.Unlock();
>>>       return ret_code;
>>>     }
>>>     lock_cond.cond.Wait(lock_cond.lock);
>>>     lock_cond.lock.Lock();
>>>   }
>>> }
>>> };
>>> 
>>> class ClsBucketIndexListAioThrottler : public ClsBucketIndexAioThrottler {
>>> protected:
>>> vector<string> bucket_objects;
>>> vector<string>::iterator iter_pos;
>>> public:
>>> ClsBucketIndexListAioThrottler(IoCtx& _io_ctx, const vector<string> _bucket_objs)
>>>   : ClsBucketIndexAioThrottler(_io_ctx), bucket_objects(_bucket_objs),
>>>   iter_pos(bucket_objects.begin()) {}
>>> 
>>> virtual bool is_completed() {
>>>   Mutex::Locker l(lock);
>>>   int sent = 0;
>>>   vector<string>::iterator iter = bucket_objects.begin();
>>>   for (; iter != iter_pos; ++iter) ++sent;
>>> 
>>>   return (sent == completed &&
>>>       (iter_pos == bucket_objects.end() /*Success*/ || ret_code < 0 /*Failure*/));
>>> }
>>> };
>>> 
>>> template<typename T>
>>> class ClsBucketIndexOpCtx : public ObjectOperationCompletion {
>>> private:
>>> T* data;
>>> // Return code of the operation
>>> int* ret_code;
>>> 
>>> // The Aio completion object associated with this Op, it should
>>> // be release from within the completion handler
>>> librados::AioCompletion* completion;
>>> ClsBucketIndexAioThrottler* throttler;
>>> public:
>>> ClsBucketIndexOpCtx(T* _data, int* _ret_code, librados::AioCompletion* _completion,
>>>         ClsBucketIndexAioThrottler* _throttler)
>>>   : data(_data), ret_code(_ret_code), completion(_completion), throttler(_throttler) {}
>>> ~ClsBucketIndexOpCtx() {}
>>> 
>>> // The completion callback, fill the response data
>>> void handle_completion(int r, bufferlist& outbl) {
>>>   if (r >= 0) {
>>>     if (data) {
>>>       try {
>>>         bufferlist::iterator iter = outbl.begin();
>>>         ::decode((*data), iter);
>>>       } catch (buffer::error& err) {
>>>         r = -EIO;
>>>       }
>>>     }
>>>     // Do the next request
>>>   }
>>>   throttler->do_next();
>>>   throttler->complete(r);
>>>   if (completion) {
>>>     completion->release();
>>>   }
>>> }
>>> };
>>> 
>>> 
>>> class ClsBucketIndexInitAioThrottler : public ClsBucketIndexListAioThrottler {
>>> public:
>>> ClsBucketIndexInitAioThrottler(IoCtx& _io_ctx, const vector<string> _bucket_objs) :
>>>   ClsBucketIndexListAioThrottler(_io_ctx, _bucket_objs) {}
>>> 
>>> virtual void do_next() {
>>>   string oid;
>>>   {
>>>     Mutex::Locker l(lock);
>>>     if (iter_pos == bucket_objects.end())
>>>       return;
>>>     oid = *(iter_pos++);
>>>   }
>>>   AioCompletion* c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
>>>   // Dummy
>>>   bufferlist in;
>>>   librados::ObjectWriteOperation op;
>>>   op.create(true);
>>>   op.exec("rgw", "bucket_init_index", in, new ClsBucketIndexOpCtx<int>(NULL, NULL, c, this));
>>>   io_ctx.aio_operate(oid, c, &op, NULL);
>>> }
>>> };
>>> 
>>> 
>>> int cls_rgw_bucket_index_init_op(librados::IoCtx &io_ctx,
>>>       const vector<string>& bucket_objs, uint32_t max_aio)
>>> {
>>>  vector<string>::const_iterator iter = bucket_objs.begin();
>>>  bufferlist in;
>>>  ClsBucketIndexAioThrottler* throttler = new ClsBucketIndexInitAioThrottler(io_ctx, bucket_objs);
>>>  for (; iter != bucket_objs.end() && max_aio-- > 0; ++iter) {
>>>      throttler->do_next();
>>>  }
>>>  throttler->wait_completion();
>>>  return 0;
>>> }
>>> 
>>> 
>> 
> --
> To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
> the body of a message to majordomo@xxxxxxxxxxxxxxx
> More majordomo info at  http://vger.kernel.org/majordomo-info.html

--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html




[Index of Archives]     [CEPH Users]     [Ceph Large]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]
  Powered by Linux