Re: [PATCH v1 1/1] Submit the codes for QEMU disk I/O limits.

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Fri, Jul 22, 2011 at 6:54 PM, Stefan Hajnoczi <stefanha@xxxxxxxxx> wrote:
> On Fri, Jul 22, 2011 at 10:20 AM, Zhi Yong Wu <wuzhy@xxxxxxxxxxxxxxxxxx> wrote:
>> +static void bdrv_block_timer(void *opaque)
>> +{
>> +    BlockDriverState *bs = opaque;
>> +    BlockQueue *queue = bs->block_queue;
>> +    uint64_t intval = 1;
>> +
>> +    while (!QTAILQ_EMPTY(&queue->requests)) {
>> +        BlockIORequest *request;
>> +        int ret;
>> +
>> +        request = QTAILQ_FIRST(&queue->requests);
>> +        QTAILQ_REMOVE(&queue->requests, request, entry);
>> +
>> +        ret = queue->handler(request);
>
> Please remove the function pointer and call qemu_block_queue_handler()
> directly.  This indirection is not needed and makes it harder to
> follow the code.
Should it keep the same style as other queue implementations, such as the
network queue? As you know, the network queue has one queue handler.

>
>> +        if (ret == 0) {
>> +            QTAILQ_INSERT_HEAD(&queue->requests, request, entry);
>> +            break;
>> +        }
>> +
>> +        qemu_free(request);
>> +    }
>> +
>> +    qemu_mod_timer(bs->block_timer, (intval * 1000000000) + qemu_get_clock_ns(vm_clock));
>
> intval is always 1.  The timer should be set to the next earliest deadline.
Please see bdrv_aio_readv/writev:
+    /* throttling disk read I/O */
+    if (bs->io_limits != NULL) {
+       if (bdrv_exceed_io_limits(bs, nb_sectors, false, &wait_time)) {
+            ret = qemu_block_queue_enqueue(bs->block_queue, bs, bdrv_aio_readv,
+                               sector_num, qiov, nb_sectors, cb, opaque);
+           qemu_mod_timer(bs->block_timer, wait_time +
qemu_get_clock_ns(vm_clock));

The timer is already modified when the block request is enqueued.

>
>> +}
>> +
>>  void bdrv_register(BlockDriver *bdrv)
>>  {
>>     if (!bdrv->bdrv_aio_readv) {
>> @@ -476,6 +508,11 @@ static int bdrv_open_common(BlockDriverState *bs, const char *filename,
>>         goto free_and_fail;
>>     }
>>
>> +    /* throttling disk I/O limits */
>> +    if (bs->io_limits != NULL) {
>> +       bs->block_queue = qemu_new_block_queue(qemu_block_queue_handler);
>> +    }
>> +
>>  #ifndef _WIN32
>>     if (bs->is_temporary) {
>>         unlink(filename);
>> @@ -642,6 +679,16 @@ int bdrv_open(BlockDriverState *bs, const char *filename, int flags,
>>             bs->change_cb(bs->change_opaque, CHANGE_MEDIA);
>>     }
>>
>> +    /* throttling disk I/O limits */
>> +    if (bs->io_limits != NULL) {
>
> block_queue is allocated in bdrv_open_common() but these variables are
> initialized in bdrv_open().  Can you move them together, I don't see a
> reason to keep them apart?
OK, this will be done.
>
>> +       bs->io_disps    = qemu_mallocz(sizeof(BlockIODisp));
>> +       bs->block_timer = qemu_new_timer_ns(vm_clock, bdrv_block_timer, bs);
>> +       qemu_mod_timer(bs->block_timer, qemu_get_clock_ns(vm_clock));
>
> Why is the timer being scheduled immediately?  There are no queued requests.
OK, you mean that if no request is in the block queue, the timer should be stopped?

>
>> +
>> +       bs->slice_start[0] = qemu_get_clock_ns(vm_clock);
>> +       bs->slice_start[1] = qemu_get_clock_ns(vm_clock);
>> +    }
>> +
>>     return 0;
>>
>>  unlink_and_fail:
>> @@ -680,6 +727,15 @@ void bdrv_close(BlockDriverState *bs)
>>         if (bs->change_cb)
>>             bs->change_cb(bs->change_opaque, CHANGE_MEDIA);
>>     }
>> +
>> +    /* throttling disk I/O limits */
>> +    if (bs->block_queue) {
>> +       qemu_del_block_queue(bs->block_queue);
>
> 3 space indent, should be 4 spaces.
done
>
>> +    }
>> +
>> +    if (bs->block_timer) {
>
> qemu_free_timer() will only free() the timer memory but it will not
> cancel the timer.  Use qemu_del_timer() first to ensure that the timer
> is not pending:
>
> qemu_del_timer(bs->block_timer);
OK, thanks.

>
>> +        qemu_free_timer(bs->block_timer);
>> +    }
>>  }
>>
>>  void bdrv_close_all(void)
>> @@ -1312,6 +1368,13 @@ void bdrv_get_geometry_hint(BlockDriverState *bs,
>>     *psecs = bs->secs;
>>  }
>>
>> +/* throttling disk io limits */
>> +void bdrv_set_io_limits(BlockDriverState *bs,
>> +                           BlockIOLimit *io_limits)
>> +{
>> +    bs->io_limits            = io_limits;
>
> This function takes ownership of io_limits but never frees it.  I
> suggest not taking ownership and just copying io_limits into
> bs->io_limits so that the caller does not need to malloc() and the
> lifecycle of bs->io_limits is completely under our control.
>
> Easiest would be to turn BlockDriverState.io_limits into:
>
> BlockIOLimit io_limits;
>
> and just copy in bdrv_set_io_limits():
>
> bs->io_limits = *io_limits;
Good point.
>
> bdrv_exceed_io_limits() returns quite quickly if no limits are set, so
> I wouldn't worry about optimizing it out yet.
>
>> +}
>> +
>>  /* Recognize floppy formats */
>>  typedef struct FDFormat {
>>     FDriveType drive;
>> @@ -2111,6 +2174,154 @@ char *bdrv_snapshot_dump(char *buf, int buf_size, QEMUSnapshotInfo *sn)
>>     return buf;
>>  }
>>
>> +bool bdrv_exceed_bps_limits(BlockDriverState *bs, int nb_sectors,
>> +                           bool is_write, uint64_t *wait) {
>> +    int64_t  real_time;
>> +    uint64_t bps_limit   = 0;
>> +    double   bytes_limit, bytes_disp, bytes_res, elapsed_time;
>> +    double   slice_time = 0.1, wait_time;
>> +
>> +    if (bs->io_limits->bps[2]) {
>> +        bps_limit = bs->io_limits->bps[2];
>
> Please define a constant (IO_LIMIT_TOTAL?) instead of using magic number 2.
OK.

>
>> +    } else if (bs->io_limits->bps[is_write]) {
>> +        bps_limit = bs->io_limits->bps[is_write];
>> +    } else {
>> +        if (wait) {
>> +            *wait = 0;
>> +        }
>> +
>> +        return false;
>> +    }
>> +
>> +    real_time = qemu_get_clock_ns(vm_clock);
>> +    if (bs->slice_start[is_write] + 100000000 <= real_time) {
>
> Please define a constant for the 100 ms time slice instead of using
> 100000000 directly.
OK.
>
>> +        bs->slice_start[is_write] = real_time;
>> +
>> +        bs->io_disps->bytes[is_write] = 0;
>> +        if (bs->io_limits->bps[2]) {
>> +            bs->io_disps->bytes[!is_write] = 0;
>> +        }
>
> All previous data should be discarded when a new time slice starts:
> bs->io_disps->bytes[IO_LIMIT_READ] = 0;
> bs->io_disps->bytes[IO_LIMIT_WRITE] = 0;
If only bps_rd is specified, bs->io_disps->bytes[IO_LIMIT_WRITE] will
never be used; I think that it should not be cleared here. Is that right?

>
>> +    }
>
> Please start the new slice in common code.  That way you can clear
> bytes[] and ios[] at the same time and don't need to duplicate this
> code in both bdrv_exceed_bps_limits() and bdrv_exceed_iops_limits().
>
>> +
>> +    elapsed_time  = (real_time - bs->slice_start[is_write]) / 1000000000.0;
>> +    fprintf(stderr, "real_time = %ld, slice_start = %ld, elapsed_time = %g\n", real_time, bs->slice_start[is_write], elapsed_time);
>> +
>> +    bytes_limit        = bps_limit * slice_time;
>> +    bytes_disp  = bs->io_disps->bytes[is_write];
>> +    if (bs->io_limits->bps[2]) {
>> +        bytes_disp += bs->io_disps->bytes[!is_write];
>> +    }
>> +
>> +    bytes_res   = (unsigned) nb_sectors * BDRV_SECTOR_SIZE;
>> +    fprintf(stderr, "bytes_limit = %g bytes_disp = %g, bytes_res = %g, elapsed_time = %g\n", bytes_limit, bytes_disp, bytes_res, elapsed_time);
>> +
>> +    if (bytes_disp + bytes_res <= bytes_limit) {
>> +        if (wait) {
>> +            *wait = 0;
>> +        }
>> +
>> +       fprintf(stderr, "bytes_disp + bytes_res <= bytes_limit\n");
>> +        return false;
>> +    }
>> +
>> +    /* Calc approx time to dispatch */
>> +    wait_time = (bytes_disp + bytes_res - bytes_limit) / bps_limit;
>> +    if (!wait_time) {
>> +        wait_time = 1;
>> +    }
>> +
>> +    wait_time = wait_time + (slice_time - elapsed_time);
>> +    if (wait) {
>> +       *wait = wait_time * 1000000000 + 1;
>> +    }
>> +
>> +    return true;
>> +}
>
> After a slice expires all bytes/iops dispatched data is forgotten,
> even if there are still requests queued.  This means that requests
> issued by the guest immediately after a new 100 ms period will be
> issued but existing queued requests will still be waiting.
>
> And since the queued requests don't get their next chance until later,
> it's possible that they will be requeued because the requests that the
> guest snuck in have brought us to the limit again.
>
> In order to solve this problem, we need to extend the current slice if
> there are still requests pending.  To prevent extensions from growing
> the slice forever (and keeping too much old data around), it should be
> alright to cull data from 2 slices ago.  The simplest way of doing
> that is to subtract the bps/iops limits from the bytes/iops
> dispatched.
>
>> @@ -2129,6 +2341,19 @@ BlockDriverAIOCB *bdrv_aio_readv(BlockDriverState *bs, int64_t sector_num,
>>     if (bdrv_check_request(bs, sector_num, nb_sectors))
>>         return NULL;
>>
>> +    /* throttling disk read I/O */
>> +    if (bs->io_limits != NULL) {
>> +       if (bdrv_exceed_io_limits(bs, nb_sectors, false, &wait_time)) {
>> +            ret = qemu_block_queue_enqueue(bs->block_queue, bs, bdrv_aio_readv,
>> +                               sector_num, qiov, nb_sectors, cb, opaque);
>
> 5 space indent, should be 4.
>
>> +           fprintf(stderr, "bdrv_aio_readv: wait_time = %ld, timer value = %ld\n", wait_time, wait_time + qemu_get_clock_ns(vm_clock));
>> +           qemu_mod_timer(bs->block_timer, wait_time + qemu_get_clock_ns(vm_clock));
>
> Imagine 3 requests that need to be queued: A, B, and C.  Since the
> timer is unconditionally set each time a request is queued, the timer
> will be set to C's wait_time.  However A or B's wait_time might be
> earlier and we will miss that deadline!
>
> We really need a priority queue here.  QEMU's timers solve the same
> problem with a sorted list, which might actually be faster for short
> lists where a fancy data structure has too much overhead.
>
>> +BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
>> +                       BlockDriverState *bs,
>> +                       BlockRequestHandler *handler,
>> +                       int64_t sector_num,
>> +                       QEMUIOVector *qiov,
>> +                       int nb_sectors,
>> +                       BlockDriverCompletionFunc *cb,
>> +                       void *opaque)
>> +{
>> +    BlockIORequest *request;
>> +    BlockDriverAIOCB *acb;
>> +
>> +    request = qemu_malloc(sizeof(BlockIORequest));
>> +    request->bs = bs;
>> +    request->handler = handler;
>> +    request->sector_num = sector_num;
>> +    request->qiov = qiov;
>> +    request->nb_sectors = nb_sectors;
>> +    request->cb = cb;
>> +    request->opaque = opaque;
>> +
>> +    QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
>> +
>> +    acb = qemu_malloc(sizeof(*acb));
>> +    acb->bs = bs;
>> +    acb->cb = cb;
>> +    acb->opaque = opaque;
>
> AIOPool and qemu_aio_get() should be used instead of manually doing
> this.  Otherwise bdrv_aio_cancel() does not work.
>
> In order to free our acb when the I/O request completes we need a cb
> function.  Right now acb is leaking.
OK.
>
>> diff --git a/qemu-config.c b/qemu-config.c
>> index efa892c..ad5f2d9 100644
>> --- a/qemu-config.c
>> +++ b/qemu-config.c
>> @@ -82,6 +82,30 @@ static QemuOptsList qemu_drive_opts = {
>>             .name = "boot",
>>             .type = QEMU_OPT_BOOL,
>>             .help = "make this a boot drive",
>> +        },{
>> +            .name = "iops",
>> +            .type = QEMU_OPT_NUMBER,
>> +            .help = "cap its iops for this drive",
>
> Let's explain what "iops" means:
>
> "limit total I/O operations per second"
>
>> +        },{
>> +            .name = "iops_rd",
>> +            .type = QEMU_OPT_NUMBER,
>> +            .help = "cap its iops_rd for this drive",
>
> "limit read operations per second"
>
>> +        },{
>> +            .name = "iops_wr",
>> +            .type = QEMU_OPT_NUMBER,
>> +            .help = "cap its iops_wr for this drive",
>
> "limit write operations per second"
>
>> +        },{
>> +            .name = "bps",
>> +            .type = QEMU_OPT_NUMBER,
>> +            .help = "cap its bps for this drive",
>
> "limit total bytes per second"
>
>> +        },{
>> +            .name = "bps_rd",
>> +            .type = QEMU_OPT_NUMBER,
>> +            .help = "cap its bps_rd for this drive",
>
> "limit read bytes per second"
>
>> +        },{
>> +            .name = "bps_wr",
>> +            .type = QEMU_OPT_NUMBER,
>> +            .help = "cap its iops_wr for this drive",
>
> "limit write bytes per second"
Done
>
> Stefan
>



-- 
Regards,

Zhi Yong Wu
--
To unsubscribe from this list: send the line "unsubscribe kvm" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[Index of Archives]     [KVM ARM]     [KVM ia64]     [KVM ppc]     [Virtualization Tools]     [Spice Development]     [Libvirt]     [Libvirt Users]     [Linux USB Devel]     [Linux Audio Users]     [Yosemite Questions]     [Linux Kernel]     [Linux SCSI]     [XFree86]
  Powered by Linux