Re: bucket index sharding - IO throttle

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Hi Yehuda,
Could you help review the latest patch, which adds the throttle mechanism you suggested? Thanks!

Thanks,
Guang
On Aug 4, 2014, at 3:20 PM, Guang Yang <yguang11@xxxxxxxxxxx> wrote:

> Hi Yehuda,
> Here is the new pull request - https://github.com/ceph/ceph/pull/2187
> 
> Thanks,
> Guang
> On Jul 31, 2014, at 10:40 PM, Guang Yang <yguang11@xxxxxxxxxxx> wrote:
> 
>> Thanks Yehuda. I will do that (sorry I was occupied by some other stuff recently but I will try my best to provide a patch as soon as possible).
>> 
>> Thanks,
>> Guang
>> 
>> 在 2014年7月31日,上午1:00,Yehuda Sadeh <yehuda@xxxxxxxxxxx> 写道:
>> 
>>> Can you send this code through a github pull request (or at least as a
>>> patch)? It'll be easier to review and comment.
>>> 
>>> Thanks,
>>> Yehuda
>>> 
>>> On Wed, Jul 30, 2014 at 7:58 AM, Guang Yang <yguang11@xxxxxxxxxxx> wrote:
>>>> +ceph-devel.
>>>> 
>>>> Thanks,
>>>> Guang
>>>> 
>>>> On Jul 29, 2014, at 10:20 PM, Guang Yang <yguang11@xxxxxxxxxxx> wrote:
>>>> 
>>>>> Hi Yehuda,
>>>>> Per your review comment about IO throttling for bucket index operations, I prototyped the code below (the details still need polishing). Could you take a look and tell me whether this is the right way to go?
>>>>> 
>>>>> Another problem I came across is that ClsBucketIndexOpCtx::handle_completion was not called for the bucket index init op (below). Is there anything obvious that I missed here?
>>>>> 
>>>>> Thanks,
>>>>> Guang
>>>>> 
>>>>> 
>>>>> // Base class that tracks a batch of asynchronous bucket index requests.
>>>>> //
>>>>> // A subclass issues one request per do_next() call and decides, through
>>>>> // is_completed(), when the whole batch is finished. Whoever observes a
>>>>> // request finishing calls complete(), which records the result and wakes
>>>>> // the thread blocked in wait_completion().
>>>>> class ClsBucketIndexAioThrottler {
>>>>> protected:
>>>>>   int completed;   // number of requests reported through complete()
>>>>>   int ret_code;    // most recent negative result passed to complete(), else 0
>>>>>   IoCtx& io_ctx;   // IO context available to subclasses for issuing requests
>>>>>   Mutex lock;      // guards completed, ret_code and subclass progress state
>>>>>   struct LockCond {
>>>>>     Mutex lock;
>>>>>     Cond cond;
>>>>>     LockCond() : lock("LockCond"), cond() {}
>>>>>   } lock_cond;     // mutex/condition pair wait_completion() sleeps on
>>>>> public:
>>>>>   ClsBucketIndexAioThrottler(IoCtx& _io_ctx)
>>>>>     : completed(0), ret_code(0), io_ctx(_io_ctx),
>>>>>       lock("ClsBucketIndexAioThrottler"), lock_cond() {}
>>>>> 
>>>>>   virtual ~ClsBucketIndexAioThrottler() {}
>>>>> 
>>>>>   // Issue the next pending request, if any.
>>>>>   virtual void do_next() = 0;
>>>>> 
>>>>>   // Return true once no further completion is expected for this batch.
>>>>>   virtual bool is_completed () = 0;
>>>>> 
>>>>>   // Record the result of one finished request and wake the waiter.
>>>>>   //
>>>>>   // The state update and the signal both happen while lock_cond.lock is
>>>>>   // held. wait_completion() evaluates is_completed() under that same mutex,
>>>>>   // so it cannot observe the final count and return while this call still
>>>>>   // has to touch lock_cond. Lock order is lock_cond.lock, then lock.
>>>>>   void complete(int ret) {
>>>>>     Mutex::Locker notify(lock_cond.lock);
>>>>>     {
>>>>>       Mutex::Locker l(lock);
>>>>>       if (ret < 0)
>>>>>         ret_code = ret;
>>>>>       ++completed;
>>>>>     }
>>>>>     lock_cond.cond.Signal();
>>>>>   }
>>>>> 
>>>>>   // Return the recorded error code (0 if no request has failed).
>>>>>   int get_ret_code () {
>>>>>     Mutex::Locker l(lock);
>>>>>     return ret_code;
>>>>>   }
>>>>> 
>>>>>   // Block until is_completed() reports true, then return the recorded
>>>>>   // error code.
>>>>>   //
>>>>>   // Assumes Cond::Wait() follows pthread_cond_wait() semantics and returns
>>>>>   // with the mutex re-acquired, so the mutex must NOT be locked again after
>>>>>   // the wait (doing so would lock it twice from the same thread).
>>>>>   virtual int wait_completion() {
>>>>>     Mutex::Locker waiter(lock_cond.lock);
>>>>>     while (!is_completed()) {
>>>>>       lock_cond.cond.Wait(lock_cond.lock);
>>>>>     }
>>>>>     // Read the result under `lock` rather than touching ret_code directly.
>>>>>     return get_ret_code();
>>>>>   }
>>>>> };
>>>>> 
>>>>> // Throttler that walks a fixed list of bucket index object names.
>>>>> //
>>>>> // iter_pos marks the first object that has not been issued yet; everything
>>>>> // before it counts as "sent". Subclasses advance iter_pos from do_next()
>>>>> // while holding `lock`.
>>>>> class ClsBucketIndexListAioThrottler : public ClsBucketIndexAioThrottler {
>>>>> protected:
>>>>>   vector<string> bucket_objects;        // private copy of the object names
>>>>>   vector<string>::iterator iter_pos;    // next object to issue
>>>>> public:
>>>>>   // Take the list by const reference: it is copied exactly once, into
>>>>>   // bucket_objects. iter_pos is initialised after bucket_objects because
>>>>>   // that is the member declaration order, so begin() refers to the copy.
>>>>>   ClsBucketIndexListAioThrottler(IoCtx& _io_ctx, const vector<string>& _bucket_objs)
>>>>>     : ClsBucketIndexAioThrottler(_io_ctx), bucket_objects(_bucket_objs),
>>>>>       iter_pos(bucket_objects.begin()) {}
>>>>> 
>>>>>   // The batch is finished when every issued request has completed AND
>>>>>   // either the whole list has been issued (success) or an error has been
>>>>>   // recorded (failure).
>>>>>   virtual bool is_completed() {
>>>>>     Mutex::Locker l(lock);
>>>>>     // Random-access iterators: the number of issued objects is a plain
>>>>>     // subtraction, no need to walk the vector.
>>>>>     const vector<string>::difference_type sent = iter_pos - bucket_objects.begin();
>>>>>     const bool all_issued = (iter_pos == bucket_objects.end());
>>>>>     const bool failed = (ret_code < 0);
>>>>>     return sent == completed && (all_issued || failed);
>>>>>   }
>>>>> };
>>>>> 
>>>>> // Completion context for a single bucket index operation.
>>>>> //
>>>>> // handle_completion() decodes the reply into *data (when provided), stores
>>>>> // the final result in *ret_code (when provided), lets the throttler issue
>>>>> // the next request and then reports this request as finished.
>>>>> template<typename T>
>>>>> class ClsBucketIndexOpCtx : public ObjectOperationCompletion {
>>>>> private:
>>>>>   // Destination for the decoded reply; may be NULL when no payload is expected
>>>>>   T* data;
>>>>>   // Return code of the operation; may be NULL when the caller does not need it
>>>>>   int* ret_code;
>>>>> 
>>>>>   // The Aio completion object associated with this Op, it should
>>>>>   // be released from within the completion handler
>>>>>   librados::AioCompletion* completion;
>>>>>   // Throttler to notify; may be NULL
>>>>>   ClsBucketIndexAioThrottler* throttler;
>>>>> public:
>>>>>   ClsBucketIndexOpCtx(T* _data, int* _ret_code, librados::AioCompletion* _completion,
>>>>>         ClsBucketIndexAioThrottler* _throttler)
>>>>>     : data(_data), ret_code(_ret_code), completion(_completion), throttler(_throttler) {}
>>>>>   ~ClsBucketIndexOpCtx() {}
>>>>> 
>>>>>   // The completion callback, fill the response data
>>>>>   //
>>>>>   // r     - result of the operation (negative on error)
>>>>>   // outbl - reply payload, decoded into *data on success
>>>>>   void handle_completion(int r, bufferlist& outbl) {
>>>>>     if (r >= 0 && data) {
>>>>>       try {
>>>>>         bufferlist::iterator iter = outbl.begin();
>>>>>         ::decode((*data), iter);
>>>>>       } catch (buffer::error& err) {
>>>>>         // A reply that cannot be decoded is reported as an I/O error.
>>>>>         r = -EIO;
>>>>>       }
>>>>>     }
>>>>> 
>>>>>     // Publish the result before the waiter can be woken up.
>>>>>     if (ret_code)
>>>>>       *ret_code = r;
>>>>> 
>>>>>     if (throttler) {
>>>>>       // Only keep the pipeline going after a success; a failed request
>>>>>       // must not trigger further requests.
>>>>>       if (r >= 0)
>>>>>         throttler->do_next();
>>>>>       // complete() has to come after do_next(): once this request is
>>>>>       // counted as completed the waiter may consider the batch finished,
>>>>>       // so the throttler must not be used afterwards.
>>>>>       throttler->complete(r);
>>>>>     }
>>>>> 
>>>>>     if (completion) {
>>>>>       completion->release();
>>>>>     }
>>>>>   }
>>>>> };
>>>>> 
>>>>> 
>>>>> // Throttler that issues one "bucket_init_index" call per bucket index object.
>>>>> class ClsBucketIndexInitAioThrottler : public ClsBucketIndexListAioThrottler {
>>>>> public:
>>>>>   ClsBucketIndexInitAioThrottler(IoCtx& _io_ctx, const vector<string>& _bucket_objs) :
>>>>>     ClsBucketIndexListAioThrottler(_io_ctx, _bucket_objs) {}
>>>>> 
>>>>>   // Submit the init operation for the next object in the list.
>>>>>   // Does nothing once the list is exhausted or an error has been recorded.
>>>>>   virtual void do_next() {
>>>>>     string oid;
>>>>>     {
>>>>>       Mutex::Locker l(lock);
>>>>>       // Stop issuing new requests after the first recorded failure.
>>>>>       if (ret_code < 0 || iter_pos == bucket_objects.end())
>>>>>         return;
>>>>>       oid = *iter_pos;
>>>>>       ++iter_pos;
>>>>>     }
>>>>>     AioCompletion* c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
>>>>>     // Dummy input buffer: the init call is given no payload here
>>>>>     bufferlist in;
>>>>>     librados::ObjectWriteOperation op;
>>>>>     op.create(true);
>>>>>     // NOTE(review): the author reported that handle_completion() was not
>>>>>     // invoked for this write operation; confirm the librados version in use
>>>>>     // dispatches the completion context for write ops, otherwise
>>>>>     // wait_completion() never returns.
>>>>>     op.exec("rgw", "bucket_init_index", in, new ClsBucketIndexOpCtx<int>(NULL, NULL, c, this));
>>>>>     int r = io_ctx.aio_operate(oid, c, &op, NULL);
>>>>>     if (r < 0) {
>>>>>       // The object was already counted as sent above, so a request that
>>>>>       // could not be submitted must still be reported or the waiter would
>>>>>       // block forever.
>>>>>       // NOTE(review): assumes no callback fires for a request whose
>>>>>       // submission failed, so the completion is released here - confirm.
>>>>>       c->release();
>>>>>       complete(r);
>>>>>     }
>>>>>   }
>>>>> };
>>>>> 
>>>>> 
>>>>> // Initialise every bucket index object in bucket_objs, keeping at most
>>>>> // max_aio requests in flight.
>>>>> //
>>>>> // io_ctx      - IO context the index objects live in
>>>>> // bucket_objs - names of the bucket index objects to initialise
>>>>> // max_aio     - maximum number of concurrent requests (0 is treated as 1)
>>>>> //
>>>>> // Returns 0 on success, or the negative error code reported by the
>>>>> // throttler when a request failed.
>>>>> int cls_rgw_bucket_index_init_op(librados::IoCtx &io_ctx,
>>>>>     const vector<string>& bucket_objs, uint32_t max_aio)
>>>>> {
>>>>>   if (bucket_objs.empty())
>>>>>     return 0;
>>>>> 
>>>>>   // With a window of zero nothing would ever be issued and the wait below
>>>>>   // could never finish.
>>>>>   if (max_aio == 0)
>>>>>     max_aio = 1;
>>>>> 
>>>>>   // Automatic storage: the throttler is destroyed on return instead of
>>>>>   // being leaked. It has to outlive every completion callback, which
>>>>>   // wait_completion() is relied upon to guarantee.
>>>>>   ClsBucketIndexInitAioThrottler throttler(io_ctx, bucket_objs);
>>>>> 
>>>>>   // Prime the pipeline with the initial window; each completion then
>>>>>   // triggers the next request.
>>>>>   for (size_t issued = 0; issued < bucket_objs.size() && issued < max_aio; ++issued) {
>>>>>     throttler.do_next();
>>>>>   }
>>>>> 
>>>>>   // Propagate the outcome instead of unconditionally reporting success.
>>>>>   return throttler.wait_completion();
>>>>> }
>>>>> 
>>>>> 
>>>> 
>>> --
>>> To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
>>> the body of a message to majordomo@xxxxxxxxxxxxxxx
>>> More majordomo info at  http://vger.kernel.org/majordomo-info.html
>> 
>> --
>> To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
>> the body of a message to majordomo@xxxxxxxxxxxxxxx
>> More majordomo info at  http://vger.kernel.org/majordomo-info.html
>> 
> 

--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html




[Index of Archives]     [CEPH Users]     [Ceph Large]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]
  Powered by Linux