On 08/11/2011 05:32 PM, Umesh Deshpande wrote:
This patch creates a separate thread for the guest migration on the source side. migrate_cancel request from the iothread is handled asynchronously. That is, iothread submits migrate_cancel to the migration thread and returns, while the migration thread attends this request at the next iteration to terminate its execution.
Looks pretty good! I hope you agree. :) Just one note inside.
Signed-off-by: Umesh Deshpande<udeshpan@xxxxxxxxxx> --- buffered_file.c | 85 ++++++++++++++++++++++++++++++++---------------------- buffered_file.h | 4 ++ migration.c | 49 ++++++++++++++----------------- migration.h | 6 ++++ 4 files changed, 82 insertions(+), 62 deletions(-) diff --git a/buffered_file.c b/buffered_file.c index 41b42c3..19932b6 100644 --- a/buffered_file.c +++ b/buffered_file.c @@ -16,6 +16,8 @@ #include "qemu-timer.h" #include "qemu-char.h" #include "buffered_file.h" +#include "migration.h" +#include "qemu-thread.h" //#define DEBUG_BUFFERED_FILE @@ -28,13 +30,14 @@ typedef struct QEMUFileBuffered void *opaque; QEMUFile *file; int has_error; + int closed; int freeze_output; size_t bytes_xfer; size_t xfer_limit; uint8_t *buffer; size_t buffer_size; size_t buffer_capacity; - QEMUTimer *timer; + QemuThread thread; } QEMUFileBuffered; #ifdef DEBUG_BUFFERED_FILE @@ -155,14 +158,6 @@ static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, in offset = size; } - if (pos == 0&& size == 0) { - DPRINTF("file is ready\n"); - if (s->bytes_xfer<= s->xfer_limit) { - DPRINTF("notifying client\n"); - s->put_ready(s->opaque); - } - } - return offset; } @@ -175,20 +170,20 @@ static int buffered_close(void *opaque) while (!s->has_error&& s->buffer_size) { buffered_flush(s); - if (s->freeze_output) + if (s->freeze_output) { s->wait_for_unfreeze(s); + } }
This is racy; you might end up calling buffered_put_buffer twice from two different threads.
- ret = s->close(s->opaque); + s->closed = 1; - qemu_del_timer(s->timer); - qemu_free_timer(s->timer); + ret = s->close(s->opaque); qemu_free(s->buffer); - qemu_free(s);
... similarly, here the migration thread might end up using the buffer. Just set s->closed here and wait for thread completion; the migration thread can handle the flushes free the buffer etc. Let the migration thread do as much as possible, it will simplify your life.
return ret; } + static int buffered_rate_limit(void *opaque) { QEMUFileBuffered *s = opaque; @@ -228,34 +223,55 @@ static int64_t buffered_get_rate_limit(void *opaque) return s->xfer_limit; } -static void buffered_rate_tick(void *opaque) +static void *migrate_vm(void *opaque) { QEMUFileBuffered *s = opaque; + int64_t current_time, expire_time = qemu_get_clock_ms(rt_clock) + 100; + struct timeval tv = { .tv_sec = 0, .tv_usec = 100000}; - if (s->has_error) { - buffered_close(s); - return; - } + qemu_mutex_lock_iothread(); - qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100); + while (!s->closed) {
... This can be in fact while (!s->closed || s->buffered_size) and that alone will subsume the loop in buffered_close, no?
+ if (s->freeze_output) { + s->wait_for_unfreeze(s); + s->freeze_output = 0; + continue; + } - if (s->freeze_output) - return; + if (s->has_error) { + break; + } + + current_time = qemu_get_clock_ms(rt_clock); + if (!s->closed&& (expire_time> current_time)) { + tv.tv_usec = 1000 * (expire_time - current_time); + select(0, NULL, NULL, NULL,&tv); + continue; + } - s->bytes_xfer = 0; + s->bytes_xfer = 0; + buffered_flush(s); - buffered_flush(s); + expire_time = qemu_get_clock_ms(rt_clock) + 100; + s->put_ready(s->opaque); + } - /* Add some checks around this */ - s->put_ready(s->opaque); + if (s->has_error) { + buffered_close(s); + } + qemu_free(s); + + qemu_mutex_unlock_iothread(); + + return NULL; } QEMUFile *qemu_fopen_ops_buffered(void *opaque, - size_t bytes_per_sec, - BufferedPutFunc *put_buffer, - BufferedPutReadyFunc *put_ready, - BufferedWaitForUnfreezeFunc *wait_for_unfreeze, - BufferedCloseFunc *close) + size_t bytes_per_sec, + BufferedPutFunc *put_buffer, + BufferedPutReadyFunc *put_ready, + BufferedWaitForUnfreezeFunc *wait_for_unfreeze, + BufferedCloseFunc *close) { QEMUFileBuffered *s; @@ -267,15 +283,14 @@ QEMUFile *qemu_fopen_ops_buffered(void *opaque, s->put_ready = put_ready; s->wait_for_unfreeze = wait_for_unfreeze; s->close = close; + s->closed = 0; s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL, buffered_close, buffered_rate_limit, buffered_set_rate_limit, - buffered_get_rate_limit); - - s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s); + buffered_get_rate_limit); - qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100); + qemu_thread_create(&s->thread, migrate_vm, s); return s->file; } diff --git a/buffered_file.h b/buffered_file.h index 98d358b..477bf7c 100644 --- a/buffered_file.h +++ b/buffered_file.h @@ -17,9 +17,13 @@ #include "hw/hw.h" typedef ssize_t (BufferedPutFunc)(void *opaque, const void *data, size_t size); +typedef void (BufferedBeginFunc)(void *opaque);
Unused typedef.
typedef void (BufferedPutReadyFunc)(void *opaque); typedef void (BufferedWaitForUnfreezeFunc)(void *opaque); typedef int (BufferedCloseFunc)(void *opaque); +typedef void (BufferedWaitForCancelFunc)(void *opaque); + +void wait_for_cancel(void *opaque);
BufferedWaitForCancelFunc should go in patch 2; wait_for_cancel is unused.
QEMUFile *qemu_fopen_ops_buffered(void *opaque, size_t xfer_limit, BufferedPutFunc *put_buffer, diff --git a/migration.c b/migration.c index af3a1f2..d8a0abb 100644 --- a/migration.c +++ b/migration.c @@ -284,8 +284,6 @@ int migrate_fd_cleanup(FdMigrationState *s) { int ret = 0; - qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL); - if (s->file) { DPRINTF("closing file\n"); if (qemu_fclose(s->file) != 0) { @@ -307,14 +305,6 @@ int migrate_fd_cleanup(FdMigrationState *s) return ret; } -void migrate_fd_put_notify(void *opaque) -{ - FdMigrationState *s = opaque; - - qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL); - qemu_file_put_notify(s->file); -} -
qemu_file_put_notify is also unused now.
ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size) { FdMigrationState *s = opaque; @@ -327,9 +317,7 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size) if (ret == -1) ret = -(s->get_error(s)); - if (ret == -EAGAIN) { - qemu_set_fd_handler2(s->fd, NULL, NULL, migrate_fd_put_notify, s); - } else if (ret< 0) { + if (ret< 0&& ret != -EAGAIN) { if (s->mon) { monitor_resume(s->mon); } @@ -342,36 +330,40 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size) void migrate_fd_connect(FdMigrationState *s) { - int ret; - + s->begin = 1; s->file = qemu_fopen_ops_buffered(s, s->bandwidth_limit, migrate_fd_put_buffer, migrate_fd_put_ready, migrate_fd_wait_for_unfreeze, migrate_fd_close); - - DPRINTF("beginning savevm\n"); - ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk, - s->mig_state.shared); - if (ret< 0) { - DPRINTF("failed, %d\n", ret); - migrate_fd_error(s); - return; - } - - migrate_fd_put_ready(s); } void migrate_fd_put_ready(void *opaque) { FdMigrationState *s = opaque; + int ret; if (s->state != MIG_STATE_ACTIVE) { DPRINTF("put_ready returning because of non-active state\n"); + if (s->state == MIG_STATE_CANCELLED) { + migrate_fd_terminate(s); + } return; } + if (s->begin) { + DPRINTF("beginning savevm\n"); + ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk, + s->mig_state.shared); + if (ret< 0) { + DPRINTF("failed, %d\n", ret); + migrate_fd_error(s); + return; + } + s->begin = 0; + } + DPRINTF("iterate\n"); if (qemu_savevm_state_iterate(s->mon, s->file) == 1) { int state; @@ -415,6 +407,10 @@ void migrate_fd_cancel(MigrationState *mig_state) DPRINTF("cancelling migration\n"); s->state = MIG_STATE_CANCELLED; +} + +void migrate_fd_terminate(FdMigrationState *s) +{ notifier_list_notify(&migration_state_notifiers); qemu_savevm_state_cancel(s->mon, s->file); @@ -458,7 +454,6 @@ int migrate_fd_close(void *opaque) { FdMigrationState *s = opaque; - qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL); return s->close(s); } diff --git a/migration.h b/migration.h index 050c56c..887f84c 100644 --- a/migration.h +++ b/migration.h @@ -45,9 +45,11 @@ struct FdMigrationState int fd; Monitor *mon; int state; + int begin; int (*get_error)(struct FdMigrationState*); int (*close)(struct FdMigrationState*); int (*write)(struct FdMigrationState*, const void *, size_t); + void (*callback)(void *); void *opaque; }; @@ -118,12 +120,16 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size); void migrate_fd_connect(FdMigrationState *s); +void migrate_fd_begin(void *opaque); + void migrate_fd_put_ready(void *opaque); int migrate_fd_get_status(MigrationState *mig_state); void migrate_fd_cancel(MigrationState *mig_state); +void migrate_fd_terminate(FdMigrationState *s); + void migrate_fd_release(MigrationState *mig_state); void migrate_fd_wait_for_unfreeze(void *opaque);
-- To unsubscribe from this list: send the line "unsubscribe kvm" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html