This patch creates a separate thread for guest migration on the source side. A migrate_cancel request from the iothread is handled asynchronously: the iothread submits migrate_cancel to the migration thread and returns, and the migration thread acts on this request at its next iteration by terminating its execution. Signed-off-by: Umesh Deshpande <udeshpan@xxxxxxxxxx> --- buffered_file.c | 93 ++++++++++++++++++++++++++++++++++-------------------- migration.c | 77 +++++++++++++++++++++++---------------------- migration.h | 1 + savevm.c | 5 --- 4 files changed, 99 insertions(+), 77 deletions(-) diff --git a/buffered_file.c b/buffered_file.c index 41b42c3..bdcdf42 100644 --- a/buffered_file.c +++ b/buffered_file.c @@ -16,6 +16,8 @@ #include "qemu-timer.h" #include "qemu-char.h" #include "buffered_file.h" +#include "migration.h" +#include "qemu-thread.h" //#define DEBUG_BUFFERED_FILE @@ -28,13 +30,14 @@ typedef struct QEMUFileBuffered void *opaque; QEMUFile *file; int has_error; + int closed; int freeze_output; size_t bytes_xfer; size_t xfer_limit; uint8_t *buffer; size_t buffer_size; size_t buffer_capacity; - QEMUTimer *timer; + QemuThread thread; } QEMUFileBuffered; #ifdef DEBUG_BUFFERED_FILE @@ -155,14 +158,6 @@ static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, in offset = size; } - if (pos == 0 && size == 0) { - DPRINTF("file is ready\n"); - if (s->bytes_xfer <= s->xfer_limit) { - DPRINTF("notifying client\n"); - s->put_ready(s->opaque); - } - } - return offset; } @@ -175,20 +170,20 @@ static int buffered_close(void *opaque) while (!s->has_error && s->buffer_size) { buffered_flush(s); - if (s->freeze_output) + if (s->freeze_output) { s->wait_for_unfreeze(s); + } } - ret = s->close(s->opaque); + s->closed = 1; - qemu_del_timer(s->timer); - qemu_free_timer(s->timer); + ret = s->close(s->opaque); qemu_free(s->buffer); - qemu_free(s); return ret; } + static int buffered_rate_limit(void *opaque) { QEMUFileBuffered *s = opaque; @@ 
-228,34 +223,63 @@ static int64_t buffered_get_rate_limit(void *opaque) return s->xfer_limit; } -static void buffered_rate_tick(void *opaque) +static void *migrate_vm(void *opaque) { QEMUFileBuffered *s = opaque; + int64_t current_time, expire_time = qemu_get_clock_ms(rt_clock) + 100; + struct timeval tv = { .tv_sec = 0, .tv_usec = 100000}; - if (s->has_error) { - buffered_close(s); - return; - } + qemu_mutex_lock_iothread(); - qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100); + while (!s->closed) { + if (s->freeze_output) { + qemu_mutex_lock_ramlist(); + qemu_mutex_unlock_iothread(); + s->wait_for_unfreeze(s); + qemu_mutex_lock_iothread(); + qemu_mutex_unlock_ramlist(); + s->freeze_output = 0; + continue; + } - if (s->freeze_output) - return; + if (s->has_error) { + break; + } + + current_time = qemu_get_clock_ms(rt_clock); + if (!s->closed && (expire_time > current_time)) { + tv.tv_usec = 1000 * (expire_time - current_time); + qemu_mutex_lock_ramlist(); + qemu_mutex_unlock_iothread(); + select(0, NULL, NULL, NULL, &tv); + qemu_mutex_lock_iothread(); + qemu_mutex_unlock_ramlist(); + continue; + } - s->bytes_xfer = 0; + s->bytes_xfer = 0; + buffered_flush(s); - buffered_flush(s); + expire_time = qemu_get_clock_ms(rt_clock) + 100; + s->put_ready(s->opaque); + } - /* Add some checks around this */ - s->put_ready(s->opaque); + if (s->has_error) { + buffered_close(s); + } + qemu_free(s); + + qemu_mutex_unlock_iothread(); + + return NULL; } QEMUFile *qemu_fopen_ops_buffered(void *opaque, - size_t bytes_per_sec, - BufferedPutFunc *put_buffer, - BufferedPutReadyFunc *put_ready, - BufferedWaitForUnfreezeFunc *wait_for_unfreeze, - BufferedCloseFunc *close) + size_t bytes_per_sec, + BufferedPutFunc *put_buffer, + BufferedPutReadyFunc *put_ready, + BufferedWaitForUnfreezeFunc *wait_for_unfreeze, + BufferedCloseFunc *close) { QEMUFileBuffered *s; @@ -267,15 +291,14 @@ QEMUFile *qemu_fopen_ops_buffered(void *opaque, s->put_ready = put_ready; s->wait_for_unfreeze = 
wait_for_unfreeze; s->close = close; + s->closed = 0; s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL, buffered_close, buffered_rate_limit, buffered_set_rate_limit, - buffered_get_rate_limit); - - s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s); + buffered_get_rate_limit); - qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100); + qemu_thread_create(&s->thread, migrate_vm, s); return s->file; } diff --git a/migration.c b/migration.c index af3a1f2..b6ba690 100644 --- a/migration.c +++ b/migration.c @@ -34,6 +34,8 @@ /* Migration speed throttling */ static int64_t max_throttle = (32 << 20); +static int change_speed; + static MigrationState *current_migration; static NotifierList migration_state_notifiers = @@ -141,18 +143,13 @@ int do_migrate_cancel(Monitor *mon, const QDict *qdict, QObject **ret_data) int do_migrate_set_speed(Monitor *mon, const QDict *qdict, QObject **ret_data) { int64_t d; - FdMigrationState *s; d = qdict_get_int(qdict, "value"); if (d < 0) { d = 0; } max_throttle = d; - - s = migrate_to_fms(current_migration); - if (s && s->file) { - qemu_file_set_rate_limit(s->file, max_throttle); - } + change_speed = 1; return 0; } @@ -284,8 +281,6 @@ int migrate_fd_cleanup(FdMigrationState *s) { int ret = 0; - qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL); - if (s->file) { DPRINTF("closing file\n"); if (qemu_fclose(s->file) != 0) { @@ -307,14 +302,6 @@ int migrate_fd_cleanup(FdMigrationState *s) return ret; } -void migrate_fd_put_notify(void *opaque) -{ - FdMigrationState *s = opaque; - - qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL); - qemu_file_put_notify(s->file); -} - ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size) { FdMigrationState *s = opaque; @@ -327,9 +314,7 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size) if (ret == -1) ret = -(s->get_error(s)); - if (ret == -EAGAIN) { - qemu_set_fd_handler2(s->fd, NULL, NULL, migrate_fd_put_notify, s); - } else if (ret < 0) 
{ + if (ret < 0 && ret != -EAGAIN) { if (s->mon) { monitor_resume(s->mon); } @@ -340,36 +325,59 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size) return ret; } -void migrate_fd_connect(FdMigrationState *s) +static void migrate_fd_set_speed(void) { - int ret; + FdMigrationState *s = migrate_to_fms(current_migration); + if (s && s->file && change_speed) { + qemu_file_set_rate_limit(s->file, max_throttle); + change_speed = 0; + } +} + +static void migrate_fd_terminate(FdMigrationState *s) +{ + notifier_list_notify(&migration_state_notifiers); + qemu_savevm_state_cancel(s->mon, s->file); + migrate_fd_cleanup(s); +} + +void migrate_fd_connect(FdMigrationState *s) +{ + s->begin = 1; s->file = qemu_fopen_ops_buffered(s, s->bandwidth_limit, migrate_fd_put_buffer, migrate_fd_put_ready, migrate_fd_wait_for_unfreeze, migrate_fd_close); - - DPRINTF("beginning savevm\n"); - ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk, - s->mig_state.shared); - if (ret < 0) { - DPRINTF("failed, %d\n", ret); - migrate_fd_error(s); - return; - } - - migrate_fd_put_ready(s); } void migrate_fd_put_ready(void *opaque) { FdMigrationState *s = opaque; + int ret; if (s->state != MIG_STATE_ACTIVE) { DPRINTF("put_ready returning because of non-active state\n"); + if (s->state == MIG_STATE_CANCELLED) { + migrate_fd_terminate(s); + } return; + } else { + migrate_fd_set_speed(); + } + + if (s->begin) { + DPRINTF("beginning savevm\n"); + ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk, + s->mig_state.shared); + if (ret < 0) { + DPRINTF("failed, %d\n", ret); + migrate_fd_error(s); + return; + } + s->begin = 0; } DPRINTF("iterate\n"); @@ -415,10 +423,6 @@ void migrate_fd_cancel(MigrationState *mig_state) DPRINTF("cancelling migration\n"); s->state = MIG_STATE_CANCELLED; - notifier_list_notify(&migration_state_notifiers); - qemu_savevm_state_cancel(s->mon, s->file); - - migrate_fd_cleanup(s); } void migrate_fd_release(MigrationState *mig_state) @@ 
-458,7 +462,6 @@ int migrate_fd_close(void *opaque) { FdMigrationState *s = opaque; - qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL); return s->close(s); } diff --git a/migration.h b/migration.h index 050c56c..4f50df8 100644 --- a/migration.h +++ b/migration.h @@ -45,6 +45,7 @@ struct FdMigrationState int fd; Monitor *mon; int state; + int begin; int (*get_error)(struct FdMigrationState*); int (*close)(struct FdMigrationState*); int (*write)(struct FdMigrationState*, const void *, size_t); diff --git a/savevm.c b/savevm.c index 8139bc7..f54f555 100644 --- a/savevm.c +++ b/savevm.c @@ -481,11 +481,6 @@ int qemu_fclose(QEMUFile *f) return ret; } -void qemu_file_put_notify(QEMUFile *f) -{ - f->put_buffer(f->opaque, NULL, 0, 0); -} - void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, int size) { int l; -- 1.7.4.1 -- To unsubscribe from this list: send the line "unsubscribe kvm" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html