On Fri, Jul 22, 2011 at 6:54 PM, Stefan Hajnoczi <stefanha@xxxxxxxxx> wrote: > On Fri, Jul 22, 2011 at 10:20 AM, Zhi Yong Wu <wuzhy@xxxxxxxxxxxxxxxxxx> wrote: >> +static void bdrv_block_timer(void *opaque) >> +{ >> + BlockDriverState *bs = opaque; >> + BlockQueue *queue = bs->block_queue; >> + uint64_t intval = 1; >> + >> + while (!QTAILQ_EMPTY(&queue->requests)) { >> + BlockIORequest *request; >> + int ret; >> + >> + request = QTAILQ_FIRST(&queue->requests); >> + QTAILQ_REMOVE(&queue->requests, request, entry); >> + >> + ret = queue->handler(request); > > Please remove the function pointer and call qemu_block_queue_handler() > directly. This indirection is not needed and makes it harder to > follow the code. Should it keep the same style as other queue implementations such as the network queue? As you know, the network queue has one queue handler. > >> + if (ret == 0) { >> + QTAILQ_INSERT_HEAD(&queue->requests, request, entry); >> + break; >> + } >> + >> + qemu_free(request); >> + } >> + >> + qemu_mod_timer(bs->block_timer, (intval * 1000000000) + qemu_get_clock_ns(vm_clock)); > > intval is always 1. The timer should be set to the next earliest deadline. Please see bdrv_aio_readv/writev: + /* throttling disk read I/O */ + if (bs->io_limits != NULL) { + if (bdrv_exceed_io_limits(bs, nb_sectors, false, &wait_time)) { + ret = qemu_block_queue_enqueue(bs->block_queue, bs, bdrv_aio_readv, + sector_num, qiov, nb_sectors, cb, opaque); + qemu_mod_timer(bs->block_timer, wait_time + qemu_get_clock_ns(vm_clock)); The timer has already been modified when the block request is enqueued. 
> >> +} >> + >> void bdrv_register(BlockDriver *bdrv) >> { >> if (!bdrv->bdrv_aio_readv) { >> @@ -476,6 +508,11 @@ static int bdrv_open_common(BlockDriverState *bs, const char *filename, >> goto free_and_fail; >> } >> >> + /* throttling disk I/O limits */ >> + if (bs->io_limits != NULL) { >> + bs->block_queue = qemu_new_block_queue(qemu_block_queue_handler); >> + } >> + >> #ifndef _WIN32 >> if (bs->is_temporary) { >> unlink(filename); >> @@ -642,6 +679,16 @@ int bdrv_open(BlockDriverState *bs, const char *filename, int flags, >> bs->change_cb(bs->change_opaque, CHANGE_MEDIA); >> } >> >> + /* throttling disk I/O limits */ >> + if (bs->io_limits != NULL) { > > block_queue is allocated in bdrv_open_common() but these variables are > initialized in bdrv_open(). Can you move them together, I don't see a > reason to keep them apart? OK, they will be done. > >> + bs->io_disps = qemu_mallocz(sizeof(BlockIODisp)); >> + bs->block_timer = qemu_new_timer_ns(vm_clock, bdrv_block_timer, bs); >> + qemu_mod_timer(bs->block_timer, qemu_get_clock_ns(vm_clock)); > > Why is the timer being scheduled immediately? There are no queued requests. OK, you mean that if no request is in the block queue, the timer should be stopped? > >> + >> + bs->slice_start[0] = qemu_get_clock_ns(vm_clock); >> + bs->slice_start[1] = qemu_get_clock_ns(vm_clock); >> + } >> + >> return 0; >> >> unlink_and_fail: >> @@ -680,6 +727,15 @@ void bdrv_close(BlockDriverState *bs) >> if (bs->change_cb) >> bs->change_cb(bs->change_opaque, CHANGE_MEDIA); >> } >> + >> + /* throttling disk I/O limits */ >> + if (bs->block_queue) { >> + qemu_del_block_queue(bs->block_queue); > > 3 space indent, should be 4 spaces. Done. > >> + } >> + >> + if (bs->block_timer) { > > qemu_free_timer() will only free() the timer memory but it will not > cancel the timer. Use qemu_del_timer() first to ensure that the timer > is not pending: > > qemu_del_timer(bs->block_timer); OK, thanks. 
> >> + qemu_free_timer(bs->block_timer); >> + } >> } >> >> void bdrv_close_all(void) >> @@ -1312,6 +1368,13 @@ void bdrv_get_geometry_hint(BlockDriverState *bs, >> *psecs = bs->secs; >> } >> >> +/* throttling disk io limits */ >> +void bdrv_set_io_limits(BlockDriverState *bs, >> + BlockIOLimit *io_limits) >> +{ >> + bs->io_limits = io_limits; > > This function takes ownership of io_limits but never frees it. I > suggest not taking ownership and just copying io_limits into > bs->io_limits so that the caller does not need to malloc() and the > lifecycle of bs->io_limits is completely under our control. > > Easiest would be to turn BlockDriverState.io_limits into: > > BlockIOLimit io_limits; > > and just copy in bdrv_set_io_limits(): > > bs->io_limits = *io_limits; Good point. > > bdrv_exceed_io_limits() returns quite quickly if no limits are set, so > I wouldn't worry about optimizing it out yet. > >> +} >> + >> /* Recognize floppy formats */ >> typedef struct FDFormat { >> FDriveType drive; >> @@ -2111,6 +2174,154 @@ char *bdrv_snapshot_dump(char *buf, int buf_size, QEMUSnapshotInfo *sn) >> return buf; >> } >> >> +bool bdrv_exceed_bps_limits(BlockDriverState *bs, int nb_sectors, >> + bool is_write, uint64_t *wait) { >> + int64_t real_time; >> + uint64_t bps_limit = 0; >> + double bytes_limit, bytes_disp, bytes_res, elapsed_time; >> + double slice_time = 0.1, wait_time; >> + >> + if (bs->io_limits->bps[2]) { >> + bps_limit = bs->io_limits->bps[2]; > > Please define a constant (IO_LIMIT_TOTAL?) instead of using magic number 2. OK. > >> + } else if (bs->io_limits->bps[is_write]) { >> + bps_limit = bs->io_limits->bps[is_write]; >> + } else { >> + if (wait) { >> + *wait = 0; >> + } >> + >> + return false; >> + } >> + >> + real_time = qemu_get_clock_ns(vm_clock); >> + if (bs->slice_start[is_write] + 100000000 <= real_time) { > > Please define a constant for the 100 ms time slice instead of using > 100000000 directly. OK. 
> >> + bs->slice_start[is_write] = real_time; >> + >> + bs->io_disps->bytes[is_write] = 0; >> + if (bs->io_limits->bps[2]) { >> + bs->io_disps->bytes[!is_write] = 0; >> + } > > All previous data should be discarded when a new time slice starts: > bs->io_disps->bytes[IO_LIMIT_READ] = 0; > bs->io_disps->bytes[IO_LIMIT_WRITE] = 0; If only bps_rd is specified, bs->io_disps->bytes[IO_LIMIT_WRITE] will never be used; I think that it should not be cleared here. Right? > >> + } > > Please start the new slice in common code. That way you can clear > bytes[] and ios[] at the same time and don't need to duplicate this > code in both bdrv_exceed_bps_limits() and bdrv_exceed_iops_limits(). > >> + >> + elapsed_time = (real_time - bs->slice_start[is_write]) / 1000000000.0; >> + fprintf(stderr, "real_time = %ld, slice_start = %ld, elapsed_time = %g\n", real_time, bs->slice_start[is_write], elapsed_time); >> + >> + bytes_limit = bps_limit * slice_time; >> + bytes_disp = bs->io_disps->bytes[is_write]; >> + if (bs->io_limits->bps[2]) { >> + bytes_disp += bs->io_disps->bytes[!is_write]; >> + } >> + >> + bytes_res = (unsigned) nb_sectors * BDRV_SECTOR_SIZE; >> + fprintf(stderr, "bytes_limit = %g bytes_disp = %g, bytes_res = %g, elapsed_time = %g\n", bytes_limit, bytes_disp, bytes_res, elapsed_time); >> + >> + if (bytes_disp + bytes_res <= bytes_limit) { >> + if (wait) { >> + *wait = 0; >> + } >> + >> + fprintf(stderr, "bytes_disp + bytes_res <= bytes_limit\n"); >> + return false; >> + } >> + >> + /* Calc approx time to dispatch */ >> + wait_time = (bytes_disp + bytes_res - bytes_limit) / bps_limit; >> + if (!wait_time) { >> + wait_time = 1; >> + } >> + >> + wait_time = wait_time + (slice_time - elapsed_time); >> + if (wait) { >> + *wait = wait_time * 1000000000 + 1; >> + } >> + >> + return true; >> +} > > After a slice expires all bytes/iops dispatched data is forgotten, > even if there are still requests queued. 
This means that requests > issued by the guest immediately after a new 100 ms period will be > issued but existing queued requests will still be waiting. > > And since the queued requests don't get their next chance until later, > it's possible that they will be requeued because the requests that the > guest snuck in have brought us to the limit again. > > In order to solve this problem, we need to extend the current slice if > there are still requests pending. To prevent extensions from growing > the slice forever (and keeping too much old data around), it should be > alright to cull data from 2 slices ago. The simplest way of doing > that is to subtract the bps/iops limits from the bytes/iops > dispatched. > >> @@ -2129,6 +2341,19 @@ BlockDriverAIOCB *bdrv_aio_readv(BlockDriverState *bs, int64_t sector_num, >> if (bdrv_check_request(bs, sector_num, nb_sectors)) >> return NULL; >> >> + /* throttling disk read I/O */ >> + if (bs->io_limits != NULL) { >> + if (bdrv_exceed_io_limits(bs, nb_sectors, false, &wait_time)) { >> + ret = qemu_block_queue_enqueue(bs->block_queue, bs, bdrv_aio_readv, >> + sector_num, qiov, nb_sectors, cb, opaque); > > 5 space indent, should be 4. > >> + fprintf(stderr, "bdrv_aio_readv: wait_time = %ld, timer value = %ld\n", wait_time, wait_time + qemu_get_clock_ns(vm_clock)); >> + qemu_mod_timer(bs->block_timer, wait_time + qemu_get_clock_ns(vm_clock)); > > Imagine 3 requests that need to be queued: A, B, and C. Since the > timer is unconditionally set each time a request is queued, the timer > will be set to C's wait_time. However A or B's wait_time might be > earlier and we will miss that deadline! > > We really need a priority queue here. QEMU's timers solve the same > problem with a sorted list, which might actually be faster for short > lists where a fancy data structure has too much overhead. 
> >> +BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue, >> + BlockDriverState *bs, >> + BlockRequestHandler *handler, >> + int64_t sector_num, >> + QEMUIOVector *qiov, >> + int nb_sectors, >> + BlockDriverCompletionFunc *cb, >> + void *opaque) >> +{ >> + BlockIORequest *request; >> + BlockDriverAIOCB *acb; >> + >> + request = qemu_malloc(sizeof(BlockIORequest)); >> + request->bs = bs; >> + request->handler = handler; >> + request->sector_num = sector_num; >> + request->qiov = qiov; >> + request->nb_sectors = nb_sectors; >> + request->cb = cb; >> + request->opaque = opaque; >> + >> + QTAILQ_INSERT_TAIL(&queue->requests, request, entry); >> + >> + acb = qemu_malloc(sizeof(*acb)); >> + acb->bs = bs; >> + acb->cb = cb; >> + acb->opaque = opaque; > > AIOPool and qemu_aio_get() should be used instead of manually doing > this. Otherwise bdrv_aio_cancel() does not work. > > In order to free our acb when the I/O request completes we need a cb > function. Right now acb is leaking. OK. 
> >> diff --git a/qemu-config.c b/qemu-config.c >> index efa892c..ad5f2d9 100644 >> --- a/qemu-config.c >> +++ b/qemu-config.c >> @@ -82,6 +82,30 @@ static QemuOptsList qemu_drive_opts = { >> .name = "boot", >> .type = QEMU_OPT_BOOL, >> .help = "make this a boot drive", >> + },{ >> + .name = "iops", >> + .type = QEMU_OPT_NUMBER, >> + .help = "cap its iops for this drive", > > Let's explain what "iops" means: > > "limit total I/O operations per second" > >> + },{ >> + .name = "iops_rd", >> + .type = QEMU_OPT_NUMBER, >> + .help = "cap its iops_rd for this drive", > > "limit read operations per second" > >> + },{ >> + .name = "iops_wr", >> + .type = QEMU_OPT_NUMBER, >> + .help = "cap its iops_wr for this drive", > > "limit write operations per second" > >> + },{ >> + .name = "bps", >> + .type = QEMU_OPT_NUMBER, >> + .help = "cap its bps for this drive", > > "limit total bytes per second" > >> + },{ >> + .name = "bps_rd", >> + .type = QEMU_OPT_NUMBER, >> + .help = "cap its bps_rd for this drive", > > "limit read bytes per second" > >> + },{ >> + .name = "bps_wr", >> + .type = QEMU_OPT_NUMBER, >> + .help = "cap its iops_wr for this drive", > > "limit write bytes per second" Done > > Stefan > -- Regards, Zhi Yong Wu -- To unsubscribe from this list: send the line "unsubscribe kvm" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html