Hi Yehuda, Here is the new pull request - https://github.com/ceph/ceph/pull/2187 Thanks, Guang On Jul 31, 2014, at 10:40 PM, Guang Yang <yguang11@xxxxxxxxxxx> wrote: > Thanks Yehuda. I will do that (sorry I was occupied by some other stuff recently but I will try my best to provide a patch as soon as possible). > > Thanks, > Guang > > 在 2014年7月31日,上午1:00,Yehuda Sadeh <yehuda@xxxxxxxxxxx> 写道: > >> Can you send this code through a github pull request (or at least as a >> patch)? It'lll be easier to review and comment. >> >> Thanks, >> Yehuda >> >> On Wed, Jul 30, 2014 at 7:58 AM, Guang Yang <yguang11@xxxxxxxxxxx> wrote: >>> +ceph-devel. >>> >>> Thanks, >>> Guang >>> >>> On Jul 29, 2014, at 10:20 PM, Guang Yang <yguang11@xxxxxxxxxxx> wrote: >>> >>>> Hi Yehuda, >>>> Per you review comment in terms of IO throttling for bucket index operation, I prototyped the below code (details still need to polish), can you take a look if that is right way to go? >>>> >>>> Another problem I came across is that ClsBucketIndexOpCtx::handle_compeltion was not called for the bucket index init op (below), is there anything I missed obviously here? >>>> >>>> Thanks, >>>> Guang >>>> >>>> >>>> class ClsBucketIndexAioThrottler { >>>> protected: >>>> int completed; >>>> int ret_code; >>>> IoCtx& io_ctx; >>>> Mutex lock; >>>> struct LockCond { >>>> Mutex lock; >>>> Cond cond; >>>> LockCond() : lock("LockCond"), cond() {} >>>> } lock_cond; >>>> public: >>>> ClsBucketIndexAioThrottler(IoCtx& _io_ctx) >>>> : completed(0), ret_code(0), io_ctx(_io_ctx), >>>> lock("ClsBucketIndexAioThrottler"), lock_cond() {} >>>> >>>> virtual ~ClsBucketIndexAioThrottler() {} >>>> virtual void do_next() = 0; >>>> virtual bool is_completed () = 0; >>>> >>>> void complete(int ret) { >>>> { >>>> Mutex::Locker l(lock); >>>> if (ret < 0) >>>> ret_code = ret; >>>> ++completed; >>>> } >>>> >>>> lock_cond.lock.Lock(); >>>> lock_cond.cond.Signal(); >>>> lock_cond.lock.Unlock(); >>>> } >>>> >>>> int get_ret_code () { >>>> Mutex::Locker l(lock); >>>> return ret_code; >>>> } >>>> >>>> virtual int wait_completion() { >>>> lock_cond.lock.Lock(); >>>> while (1) { >>>> if (is_completed()) { >>>> lock_cond.lock.Unlock(); >>>> return ret_code; >>>> } >>>> lock_cond.cond.Wait(lock_cond.lock); >>>> lock_cond.lock.Lock(); >>>> } >>>> } >>>> }; >>>> >>>> class ClsBucketIndexListAioThrottler : public ClsBucketIndexAioThrottler { >>>> protected: >>>> vector<string> bucket_objects; >>>> vector<string>::iterator iter_pos; >>>> public: >>>> ClsBucketIndexListAioThrottler(IoCtx& _io_ctx, const vector<string> _bucket_objs) >>>> : ClsBucketIndexAioThrottler(_io_ctx), bucket_objects(_bucket_objs), >>>> iter_pos(bucket_objects.begin()) {} >>>> >>>> virtual bool is_completed() { >>>> Mutex::Locker l(lock); >>>> int sent = 0; >>>> vector<string>::iterator iter = bucket_objects.begin(); >>>> for (; iter != iter_pos; ++iter) ++sent; >>>> >>>> return (sent == completed && >>>> (iter_pos == bucket_objects.end() /*Success*/ || ret_code < 0 /*Failure*/)); >>>> } >>>> }; >>>> >>>> template<typename T> >>>> class ClsBucketIndexOpCtx : public ObjectOperationCompletion { >>>> private: >>>> T* data; >>>> // Return code of the operation >>>> int* ret_code; >>>> >>>> // The Aio completion object associated with this Op, it should >>>> // be release from within the completion handler >>>> librados::AioCompletion* completion; >>>> ClsBucketIndexAioThrottler* throttler; >>>> public: >>>> ClsBucketIndexOpCtx(T* _data, int* _ret_code, librados::AioCompletion* _completion, >>>> ClsBucketIndexAioThrottler* _throttler) >>>> : data(_data), ret_code(_ret_code), completion(_completion), throttler(_throttler) {} >>>> ~ClsBucketIndexOpCtx() {} >>>> >>>> // The completion callback, fill the response data >>>> void handle_completion(int r, bufferlist& outbl) { >>>> if (r >= 0) { >>>> if (data) { >>>> try { >>>> bufferlist::iterator iter = outbl.begin(); >>>> ::decode((*data), iter); >>>> } catch (buffer::error& err) { >>>> r = -EIO; >>>> } >>>> } >>>> // Do the next request >>>> } >>>> throttler->do_next(); >>>> throttler->complete(r); >>>> if (completion) { >>>> completion->release(); >>>> } >>>> } >>>> }; >>>> >>>> >>>> class ClsBucketIndexInitAioThrottler : public ClsBucketIndexListAioThrottler { >>>> public: >>>> ClsBucketIndexInitAioThrottler(IoCtx& _io_ctx, const vector<string> _bucket_objs) : >>>> ClsBucketIndexListAioThrottler(_io_ctx, _bucket_objs) {} >>>> >>>> virtual void do_next() { >>>> string oid; >>>> { >>>> Mutex::Locker l(lock); >>>> if (iter_pos == bucket_objects.end()) >>>> return; >>>> oid = *(iter_pos++); >>>> } >>>> AioCompletion* c = librados::Rados::aio_create_completion(NULL, NULL, NULL); >>>> // Dummy >>>> bufferlist in; >>>> librados::ObjectWriteOperation op; >>>> op.create(true); >>>> op.exec("rgw", "bucket_init_index", in, new ClsBucketIndexOpCtx<int>(NULL, NULL, c, this)); >>>> io_ctx.aio_operate(oid, c, &op, NULL); >>>> } >>>> }; >>>> >>>> >>>> int cls_rgw_bucket_index_init_op(librados::IoCtx &io_ctx, >>>> const vector<string>& bucket_objs, uint32_t max_aio) >>>> { >>>> vector<string>::const_iterator iter = bucket_objs.begin(); >>>> bufferlist in; >>>> ClsBucketIndexAioThrottler* throttler = new ClsBucketIndexInitAioThrottler(io_ctx, bucket_objs); >>>> for (; iter != bucket_objs.end() && max_aio-- > 0; ++iter) { >>>> throttler->do_next(); >>>> } >>>> throttler->wait_completion(); >>>> return 0; >>>> } >>>> >>>> >>> >> -- >> To unsubscribe from this list: send the line "unsubscribe ceph-devel" in >> the body of a message to majordomo@xxxxxxxxxxxxxxx >> More majordomo info at http://vger.kernel.org/majordomo-info.html > > -- > To unsubscribe from this list: send the line "unsubscribe ceph-devel" in > the body of a message to majordomo@xxxxxxxxxxxxxxx > More majordomo info at http://vger.kernel.org/majordomo-info.html > -- To unsubscribe from this list: send the line "unsubscribe ceph-devel" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html