Re: bucket index sharding - IO throttle

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Can you send this code through a github pull request (or at least as a
patch)? It'lll be easier to review and comment.

Thanks,
Yehuda

On Wed, Jul 30, 2014 at 7:58 AM, Guang Yang <yguang11@xxxxxxxxxxx> wrote:
> +ceph-devel.
>
> Thanks,
> Guang
>
> On Jul 29, 2014, at 10:20 PM, Guang Yang <yguang11@xxxxxxxxxxx> wrote:
>
>> Hi Yehuda,
>> Per you review comment in terms of IO throttling for bucket index operation, I prototyped the below code (details still need to polish), can you take a look if that is right way to go?
>>
>> Another problem I came across is that ClsBucketIndexOpCtx::handle_compeltion was not called for the bucket index init op (below), is there anything I missed obviously here?
>>
>> Thanks,
>> Guang
>>
>>
>> class ClsBucketIndexAioThrottler {
>> protected:
>>  int completed;
>>  int ret_code;
>>  IoCtx& io_ctx;
>>  Mutex lock;
>>  struct LockCond {
>>    Mutex lock;
>>    Cond cond;
>>    LockCond() : lock("LockCond"), cond() {}
>>  } lock_cond;
>> public:
>>  ClsBucketIndexAioThrottler(IoCtx& _io_ctx)
>>    : completed(0), ret_code(0), io_ctx(_io_ctx),
>>    lock("ClsBucketIndexAioThrottler"), lock_cond() {}
>>
>>  virtual ~ClsBucketIndexAioThrottler() {}
>>  virtual void do_next() = 0;
>>  virtual bool is_completed () = 0;
>>
>>  void complete(int ret) {
>>    {
>>      Mutex::Locker l(lock);
>>      if (ret < 0)
>>        ret_code = ret;
>>      ++completed;
>>    }
>>
>>    lock_cond.lock.Lock();
>>    lock_cond.cond.Signal();
>>    lock_cond.lock.Unlock();
>>  }
>>
>>  int get_ret_code () {
>>    Mutex::Locker l(lock);
>>    return ret_code;
>>  }
>>
>>  virtual int wait_completion() {
>>    lock_cond.lock.Lock();
>>    while (1) {
>>      if (is_completed()) {
>>        lock_cond.lock.Unlock();
>>        return ret_code;
>>      }
>>      lock_cond.cond.Wait(lock_cond.lock);
>>      lock_cond.lock.Lock();
>>    }
>>  }
>> };
>>
>> class ClsBucketIndexListAioThrottler : public ClsBucketIndexAioThrottler {
>> protected:
>>  vector<string> bucket_objects;
>>  vector<string>::iterator iter_pos;
>> public:
>>  ClsBucketIndexListAioThrottler(IoCtx& _io_ctx, const vector<string> _bucket_objs)
>>    : ClsBucketIndexAioThrottler(_io_ctx), bucket_objects(_bucket_objs),
>>    iter_pos(bucket_objects.begin()) {}
>>
>>  virtual bool is_completed() {
>>    Mutex::Locker l(lock);
>>    int sent = 0;
>>    vector<string>::iterator iter = bucket_objects.begin();
>>    for (; iter != iter_pos; ++iter) ++sent;
>>
>>    return (sent == completed &&
>>        (iter_pos == bucket_objects.end() /*Success*/ || ret_code < 0 /*Failure*/));
>>  }
>> };
>>
>> template<typename T>
>> class ClsBucketIndexOpCtx : public ObjectOperationCompletion {
>> private:
>>  T* data;
>>  // Return code of the operation
>>  int* ret_code;
>>
>>  // The Aio completion object associated with this Op, it should
>>  // be release from within the completion handler
>>  librados::AioCompletion* completion;
>>  ClsBucketIndexAioThrottler* throttler;
>> public:
>>  ClsBucketIndexOpCtx(T* _data, int* _ret_code, librados::AioCompletion* _completion,
>>          ClsBucketIndexAioThrottler* _throttler)
>>    : data(_data), ret_code(_ret_code), completion(_completion), throttler(_throttler) {}
>>  ~ClsBucketIndexOpCtx() {}
>>
>>  // The completion callback, fill the response data
>>  void handle_completion(int r, bufferlist& outbl) {
>>    if (r >= 0) {
>>      if (data) {
>>        try {
>>          bufferlist::iterator iter = outbl.begin();
>>          ::decode((*data), iter);
>>        } catch (buffer::error& err) {
>>          r = -EIO;
>>        }
>>      }
>>      // Do the next request
>>    }
>>    throttler->do_next();
>>    throttler->complete(r);
>>    if (completion) {
>>      completion->release();
>>    }
>>  }
>> };
>>
>>
>> class ClsBucketIndexInitAioThrottler : public ClsBucketIndexListAioThrottler {
>> public:
>>  ClsBucketIndexInitAioThrottler(IoCtx& _io_ctx, const vector<string> _bucket_objs) :
>>    ClsBucketIndexListAioThrottler(_io_ctx, _bucket_objs) {}
>>
>>  virtual void do_next() {
>>    string oid;
>>    {
>>      Mutex::Locker l(lock);
>>      if (iter_pos == bucket_objects.end())
>>        return;
>>      oid = *(iter_pos++);
>>    }
>>    AioCompletion* c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
>>    // Dummy
>>    bufferlist in;
>>    librados::ObjectWriteOperation op;
>>    op.create(true);
>>    op.exec("rgw", "bucket_init_index", in, new ClsBucketIndexOpCtx<int>(NULL, NULL, c, this));
>>    io_ctx.aio_operate(oid, c, &op, NULL);
>>  }
>> };
>>
>>
>> int cls_rgw_bucket_index_init_op(librados::IoCtx &io_ctx,
>>        const vector<string>& bucket_objs, uint32_t max_aio)
>> {
>>   vector<string>::const_iterator iter = bucket_objs.begin();
>>   bufferlist in;
>>   ClsBucketIndexAioThrottler* throttler = new ClsBucketIndexInitAioThrottler(io_ctx, bucket_objs);
>>   for (; iter != bucket_objs.end() && max_aio-- > 0; ++iter) {
>>       throttler->do_next();
>>   }
>>   throttler->wait_completion();
>>   return 0;
>> }
>>
>>
>
--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html




[Index of Archives]     [CEPH Users]     [Ceph Large]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]
  Powered by Linux