On Fri, Jul 22, 2011 at 6:54 PM, Stefan Hajnoczi <stefanha@xxxxxxxxx> wrote: > On Fri, Jul 22, 2011 at 10:20 AM, Zhi Yong Wu <wuzhy@xxxxxxxxxxxxxxxxxx> wrote: >> +static void bdrv_block_timer(void *opaque) >> +{ >> + BlockDriverState *bs = opaque; >> + BlockQueue *queue = bs->block_queue; >> + uint64_t intval = 1; >> + >> + while (!QTAILQ_EMPTY(&queue->requests)) { >> + BlockIORequest *request; >> + int ret; >> + >> + request = QTAILQ_FIRST(&queue->requests); >> + QTAILQ_REMOVE(&queue->requests, request, entry); >> + >> + ret = queue->handler(request); > > Please remove the function pointer and call qemu_block_queue_handler() > directly. This indirection is not needed and makes it harder to > follow the code. > >> + if (ret == 0) { >> + QTAILQ_INSERT_HEAD(&queue->requests, request, entry); >> + break; >> + } >> + >> + qemu_free(request); >> + } >> + >> + qemu_mod_timer(bs->block_timer, (intval * 1000000000) + qemu_get_clock_ns(vm_clock)); > > intval is always 1. The timer should be set to the next earliest deadline. > >> +} >> + >> void bdrv_register(BlockDriver *bdrv) >> { >> if (!bdrv->bdrv_aio_readv) { >> @@ -476,6 +508,11 @@ static int bdrv_open_common(BlockDriverState *bs, const char *filename, >> goto free_and_fail; >> } >> >> + /* throttling disk I/O limits */ >> + if (bs->io_limits != NULL) { >> + bs->block_queue = qemu_new_block_queue(qemu_block_queue_handler); >> + } >> + >> #ifndef _WIN32 >> if (bs->is_temporary) { >> unlink(filename); >> @@ -642,6 +679,16 @@ int bdrv_open(BlockDriverState *bs, const char *filename, int flags, >> bs->change_cb(bs->change_opaque, CHANGE_MEDIA); >> } >> >> + /* throttling disk I/O limits */ >> + if (bs->io_limits != NULL) { > > block_queue is allocated in bdrv_open_common() but these variables are > initialized in bdrv_open(). Can you move them together, I don't see a > reason to keep them apart? 
> >> + bs->io_disps = qemu_mallocz(sizeof(BlockIODisp)); >> + bs->block_timer = qemu_new_timer_ns(vm_clock, bdrv_block_timer, bs); >> + qemu_mod_timer(bs->block_timer, qemu_get_clock_ns(vm_clock)); > > Why is the timer being scheduled immediately? There are no queued requests. > >> + >> + bs->slice_start[0] = qemu_get_clock_ns(vm_clock); >> + bs->slice_start[1] = qemu_get_clock_ns(vm_clock); >> + } >> + >> return 0; >> >> unlink_and_fail: >> @@ -680,6 +727,15 @@ void bdrv_close(BlockDriverState *bs) >> if (bs->change_cb) >> bs->change_cb(bs->change_opaque, CHANGE_MEDIA); >> } >> + >> + /* throttling disk I/O limits */ >> + if (bs->block_queue) { >> + qemu_del_block_queue(bs->block_queue); > > 3 space indent, should be 4 spaces. > >> + } >> + >> + if (bs->block_timer) { > > qemu_free_timer() will only free() the timer memory but it will not > cancel the timer. Use qemu_del_timer() first to ensure that the timer > is not pending: > > qemu_del_timer(bs->block_timer); > >> + qemu_free_timer(bs->block_timer); >> + } >> } >> >> void bdrv_close_all(void) >> @@ -1312,6 +1368,13 @@ void bdrv_get_geometry_hint(BlockDriverState *bs, >> *psecs = bs->secs; >> } >> >> +/* throttling disk io limits */ >> +void bdrv_set_io_limits(BlockDriverState *bs, >> + BlockIOLimit *io_limits) >> +{ >> + bs->io_limits = io_limits; > > This function takes ownership of io_limits but never frees it. I > suggest not taking ownership and just copying io_limits into > bs->io_limits so that the caller does not need to malloc() and the > lifecycle of bs->io_limits is completely under our control. > > Easiest would be to turn BlockDriverState.io_limits into: > > BlockIOLimit io_limits; > > and just copy in bdrv_set_io_limits(): > > bs->io_limits = *io_limits; > > bdrv_exceed_io_limits() returns quite quickly if no limits are set, so > I wouldn't worry about optimizing it out yet. 
> >> +} >> + >> /* Recognize floppy formats */ >> typedef struct FDFormat { >> FDriveType drive; >> @@ -2111,6 +2174,154 @@ char *bdrv_snapshot_dump(char *buf, int buf_size, QEMUSnapshotInfo *sn) >> return buf; >> } >> >> +bool bdrv_exceed_bps_limits(BlockDriverState *bs, int nb_sectors, >> + bool is_write, uint64_t *wait) { >> + int64_t real_time; >> + uint64_t bps_limit = 0; >> + double bytes_limit, bytes_disp, bytes_res, elapsed_time; >> + double slice_time = 0.1, wait_time; >> + >> + if (bs->io_limits->bps[2]) { >> + bps_limit = bs->io_limits->bps[2]; > > Please define a constant (IO_LIMIT_TOTAL?) instead of using magic number 2. > >> + } else if (bs->io_limits->bps[is_write]) { >> + bps_limit = bs->io_limits->bps[is_write]; >> + } else { >> + if (wait) { >> + *wait = 0; >> + } >> + >> + return false; >> + } >> + >> + real_time = qemu_get_clock_ns(vm_clock); >> + if (bs->slice_start[is_write] + 100000000 <= real_time) { > > Please define a constant for the 100 ms time slice instead of using > 100000000 directly. > >> + bs->slice_start[is_write] = real_time; >> + >> + bs->io_disps->bytes[is_write] = 0; >> + if (bs->io_limits->bps[2]) { >> + bs->io_disps->bytes[!is_write] = 0; >> + } > > All previous data should be discarded when a new time slice starts: > bs->io_disps->bytes[IO_LIMIT_READ] = 0; > bs->io_disps->bytes[IO_LIMIT_WRITE] = 0; > >> + } > > Please start the new slice in common code. That way you can clear > bytes[] and ios[] at the same time and don't need to duplicate this > code in both bdrv_exceed_bps_limits() and bdrv_exceed_iops_limits(). 
> >> + >> + elapsed_time = (real_time - bs->slice_start[is_write]) / 1000000000.0; >> + fprintf(stderr, "real_time = %ld, slice_start = %ld, elapsed_time = %g\n", real_time, bs->slice_start[is_write], elapsed_time); >> + >> + bytes_limit = bps_limit * slice_time; >> + bytes_disp = bs->io_disps->bytes[is_write]; >> + if (bs->io_limits->bps[2]) { >> + bytes_disp += bs->io_disps->bytes[!is_write]; >> + } >> + >> + bytes_res = (unsigned) nb_sectors * BDRV_SECTOR_SIZE; >> + fprintf(stderr, "bytes_limit = %g bytes_disp = %g, bytes_res = %g, elapsed_time = %g\n", bytes_limit, bytes_disp, bytes_res, elapsed_time); >> + >> + if (bytes_disp + bytes_res <= bytes_limit) { >> + if (wait) { >> + *wait = 0; >> + } >> + >> + fprintf(stderr, "bytes_disp + bytes_res <= bytes_limit\n"); >> + return false; >> + } >> + >> + /* Calc approx time to dispatch */ >> + wait_time = (bytes_disp + bytes_res - bytes_limit) / bps_limit; >> + if (!wait_time) { >> + wait_time = 1; >> + } >> + >> + wait_time = wait_time + (slice_time - elapsed_time); >> + if (wait) { >> + *wait = wait_time * 1000000000 + 1; >> + } >> + >> + return true; >> +} > > After a slice expires all bytes/iops dispatched data is forgotten, > even if there are still requests queued. This means that requests > issued by the guest immediately after a new 100 ms period will be > issued but existing queued requests will still be waiting. > > And since the queued requests don't get their next chance until later, > it's possible that they will be requeued because the requests that the > guest snuck in have brought us to the limit again. > > In order to solve this problem, we need to extend the current slice if Do you mean extending the current slice, as the in-kernel block throttling algorithm does? Our algorithm does not seem to do that currently. > there are still requests pending. To prevent extensions from growing > the slice forever (and keeping too much old data around), it should be > alright to cull data from 2 slices ago. 
The simplest way of doing > that is to subtract the bps/iops limits from the bytes/iops > dispatched. Do you mean that current_slice_time should never exceed 2 * slice_time? > >> @@ -2129,6 +2341,19 @@ BlockDriverAIOCB *bdrv_aio_readv(BlockDriverState *bs, int64_t sector_num, >> if (bdrv_check_request(bs, sector_num, nb_sectors)) >> return NULL; >> >> + /* throttling disk read I/O */ >> + if (bs->io_limits != NULL) { >> + if (bdrv_exceed_io_limits(bs, nb_sectors, false, &wait_time)) { >> + ret = qemu_block_queue_enqueue(bs->block_queue, bs, bdrv_aio_readv, >> + sector_num, qiov, nb_sectors, cb, opaque); > > 5 space indent, should be 4. > >> + fprintf(stderr, "bdrv_aio_readv: wait_time = %ld, timer value = %ld\n", wait_time, wait_time + qemu_get_clock_ns(vm_clock)); >> + qemu_mod_timer(bs->block_timer, wait_time + qemu_get_clock_ns(vm_clock)); > > Imagine 3 requests that need to be queued: A, B, and C. Since the > timer is unconditionally set each time a request is queued, the timer > will be set to C's wait_time. However A or B's wait_time might be > earlier and we will miss that deadline! Yes, exactly; this issue does exist. > > We really need a priority queue here. QEMU's timers solve the same > problem with a sorted list, which might actually be faster for short > lists where a fancy data structure has too much overhead. Do you mean that the block requests should be handled in FIFO order? If the block queue is not empty, should the incoming request be enqueued first? Is that right? 
> >> +BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue, >> + BlockDriverState *bs, >> + BlockRequestHandler *handler, >> + int64_t sector_num, >> + QEMUIOVector *qiov, >> + int nb_sectors, >> + BlockDriverCompletionFunc *cb, >> + void *opaque) >> +{ >> + BlockIORequest *request; >> + BlockDriverAIOCB *acb; >> + >> + request = qemu_malloc(sizeof(BlockIORequest)); >> + request->bs = bs; >> + request->handler = handler; >> + request->sector_num = sector_num; >> + request->qiov = qiov; >> + request->nb_sectors = nb_sectors; >> + request->cb = cb; >> + request->opaque = opaque; >> + >> + QTAILQ_INSERT_TAIL(&queue->requests, request, entry); >> + >> + acb = qemu_malloc(sizeof(*acb)); >> + acb->bs = bs; >> + acb->cb = cb; >> + acb->opaque = opaque; > > AIOPool and qemu_aio_get() should be used instead of manually doing > this. Otherwise bdrv_aio_cancel() does not work. > > In order to free our acb when the I/O request completes we need a cb > function. Right now acb is leaking. Yeah, >> + acb->cb = cb; Doesn't this assignment already accomplish what you expect? 
> >> diff --git a/qemu-config.c b/qemu-config.c >> index efa892c..ad5f2d9 100644 >> --- a/qemu-config.c >> +++ b/qemu-config.c >> @@ -82,6 +82,30 @@ static QemuOptsList qemu_drive_opts = { >> .name = "boot", >> .type = QEMU_OPT_BOOL, >> .help = "make this a boot drive", >> + },{ >> + .name = "iops", >> + .type = QEMU_OPT_NUMBER, >> + .help = "cap its iops for this drive", > > Let's explain what "iops" means: > > "limit total I/O operations per second" > >> + },{ >> + .name = "iops_rd", >> + .type = QEMU_OPT_NUMBER, >> + .help = "cap its iops_rd for this drive", > > "limit read operations per second" > >> + },{ >> + .name = "iops_wr", >> + .type = QEMU_OPT_NUMBER, >> + .help = "cap its iops_wr for this drive", > > "limit write operations per second" > >> + },{ >> + .name = "bps", >> + .type = QEMU_OPT_NUMBER, >> + .help = "cap its bps for this drive", > > "limit total bytes per second" > >> + },{ >> + .name = "bps_rd", >> + .type = QEMU_OPT_NUMBER, >> + .help = "cap its bps_rd for this drive", > > "limit read bytes per second" > >> + },{ >> + .name = "bps_wr", >> + .type = QEMU_OPT_NUMBER, >> + .help = "cap its iops_wr for this drive", > > "limit write bytes per second" > > Stefan > -- Regards, Zhi Yong Wu -- To unsubscribe from this list: send the line "unsubscribe kvm" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html