* Yang Hongyang (yanghy@xxxxxxxxxxxxxx) wrote: > implement colo checkpoint protocol. > > Checkpoint synchronizing points. > > Primary Secondary > NEW @ > Suspend > SUSPENDED @ > Suspend&Save state > SEND @ > Send state Receive state > RECEIVED @ > Flush network Load state > LOADED @ > Resume Resume > > Start Comparing > NOTE: > 1) '@' who sends the message > 2) Every sync-point is synchronized by two sides with only > one handshake (single direction) for low-latency. > If more strict synchronization is required, an opposite direction > sync-point should be added. > 3) Since sync-points are single direction, the remote side may > go forward a lot when this side just receives the sync-point. > > Signed-off-by: Yang Hongyang <yanghy@xxxxxxxxxxxxxx> > --- > migration-colo.c | 268 +++++++++++++++++++++++++++++++++++++++++++++++++++++-- > 1 file changed, 262 insertions(+), 6 deletions(-) > > diff --git a/migration-colo.c b/migration-colo.c > index 2699e77..a708872 100644 > --- a/migration-colo.c > +++ b/migration-colo.c > @@ -24,6 +24,41 @@ > */ > #define CHKPOINT_TIMER 10000 > > +enum { > + COLO_READY = 0x46, > + > + /* > + * Checkpoint synchronizing points. > + * > + * Primary Secondary > + * NEW @ > + * Suspend > + * SUSPENDED @ > + * Suspend&Save state > + * SEND @ > + * Send state Receive state > + * RECEIVED @ > + * Flush network Load state > + * LOADED @ > + * Resume Resume > + * > + * Start Comparing > + * NOTE: > + * 1) '@' who sends the message > + * 2) Every sync-point is synchronized by two sides with only > + * one handshake (single direction) for low-latency. > + * If more strict synchronization is required, an opposite direction > + * sync-point should be added. > + * 3) Since sync-points are single direction, the remote side may > + * go forward a lot when this side just receives the sync-point. 
> + */ > + COLO_CHECKPOINT_NEW, > + COLO_CHECKPOINT_SUSPENDED, > + COLO_CHECKPOINT_SEND, > + COLO_CHECKPOINT_RECEIVED, > + COLO_CHECKPOINT_LOADED, > +}; > + > static QEMUBH *colo_bh; > > bool colo_supported(void) > @@ -185,30 +220,161 @@ static const QEMUFileOps colo_read_ops = { > .close = colo_close, > }; > > +/* colo checkpoint control helper */ > +static bool is_master(void); > +static bool is_slave(void); > + > +static void ctl_error_handler(void *opaque, int err) > +{ > + if (is_slave()) { > + /* TODO: determine whether we need to failover */ > + /* FIXME: we will not failover currently, just kill slave */ > + error_report("error: colo transmission failed!\n"); > + exit(1); > + } else if (is_master()) { > + /* Master still alive, do not failover */ > + error_report("error: colo transmission failed!\n"); > + return; > + } else { > + error_report("COLO: Unexpected error happend!\n"); > + exit(EXIT_FAILURE); > + } > +} > + > +static int colo_ctl_put(QEMUFile *f, uint64_t request) > +{ > + int ret = 0; > + > + qemu_put_be64(f, request); > + qemu_fflush(f); > + > + ret = qemu_file_get_error(f); > + if (ret < 0) { > + ctl_error_handler(f, ret); > + return 1; > + } > + > + return ret; > +} > + > +static int colo_ctl_get_value(QEMUFile *f, uint64_t *value) > +{ > + int ret = 0; > + uint64_t temp; > + > + temp = qemu_get_be64(f); > + > + ret = qemu_file_get_error(f); > + if (ret < 0) { > + ctl_error_handler(f, ret); > + return 1; > + } > + > + *value = temp; > + return 0; > +} > + > +static int colo_ctl_get(QEMUFile *f, uint64_t require) > +{ > + int ret; > + uint64_t value; > + > + ret = colo_ctl_get_value(f, &value); > + if (ret) { > + return ret; > + } > + > + if (value != require) { > + error_report("unexpected state received!\n"); I find it useful to print the expected/received state to be able to figure out what went wrong. 
> + exit(1); > + } > + > + return ret; > +} > + > /* save */ > > -static __attribute__((unused)) bool is_master(void) > +static bool is_master(void) > { > MigrationState *s = migrate_get_current(); > return (s->state == MIG_STATE_COLO); > } > > +static int do_colo_transaction(MigrationState *s, QEMUFile *control, > + QEMUFile *trans) > +{ > + int ret; > + > + ret = colo_ctl_put(s->file, COLO_CHECKPOINT_NEW); > + if (ret) { > + goto out; > + } > + > + ret = colo_ctl_get(control, COLO_CHECKPOINT_SUSPENDED); What happens at this point if the slave just doesn't respond? (i.e. the socket doesn't drop - you just don't get the byte). > + if (ret) { > + goto out; > + } > + > + /* TODO: suspend and save vm state to colo buffer */ > + > + ret = colo_ctl_put(s->file, COLO_CHECKPOINT_SEND); > + if (ret) { > + goto out; > + } > + > + /* TODO: send vmstate to slave */ > + > + ret = colo_ctl_get(control, COLO_CHECKPOINT_RECEIVED); > + if (ret) { > + goto out; > + } > + > + /* TODO: Flush network etc. */ > + > + ret = colo_ctl_get(control, COLO_CHECKPOINT_LOADED); > + if (ret) { > + goto out; > + } > + > + /* TODO: resume master */ > + > +out: > + return ret; > +} > + > static void *colo_thread(void *opaque) > { > MigrationState *s = opaque; > int dev_hotplug = qdev_hotplug, wait_cp = 0; > int64_t start_time = qemu_clock_get_ms(QEMU_CLOCK_HOST); > int64_t current_time; > + QEMUFile *colo_control = NULL, *colo_trans = NULL; > + int ret; > > if (colo_compare_init() < 0) { > error_report("Init colo compare error\n"); > goto out; > } > > + colo_control = qemu_fopen_socket(qemu_get_fd(s->file), "rb"); > + if (!colo_control) { > + error_report("open colo_control failed\n"); > + goto out; > + } In my postcopy world I'm trying to abstract this type of thing into a 'return path' so that the QEMUFile can implement it however it wants and you don't need to assume it's a socket. But I'm still fighting some of those details. 
Dave > + > qdev_hotplug = 0; > > colo_buffer_init(); > > + /* > + * Wait for slave finish loading vm states and enter COLO > + * restore. > + */ > + ret = colo_ctl_get(colo_control, COLO_READY); > + if (ret) { > + goto out; > + } > + > while (s->state == MIG_STATE_COLO) { > /* wait for a colo checkpoint */ > wait_cp = colo_compare(); > @@ -230,13 +396,33 @@ static void *colo_thread(void *opaque) > > /* start a colo checkpoint */ > > - /*TODO: COLO save */ > + /* open colo buffer for write */ > + colo_trans = qemu_fopen_ops(&colo_buffer, &colo_write_ops); > + if (!colo_trans) { > + error_report("open colo buffer failed\n"); > + goto out; > + } > > + if (do_colo_transaction(s, colo_control, colo_trans)) { > + goto out; > + } > + > + qemu_fclose(colo_trans); > + colo_trans = NULL; > start_time = qemu_clock_get_ms(QEMU_CLOCK_HOST); > } > > out: > + if (colo_trans) { > + qemu_fclose(colo_trans); > + } > + > colo_buffer_destroy(); > + > + if (colo_control) { > + qemu_fclose(colo_control); > + } > + > colo_compare_destroy(); > > if (s->state != MIG_STATE_ERROR) { > @@ -281,7 +467,7 @@ void colo_init_checkpointer(MigrationState *s) > > static Coroutine *colo; > > -static __attribute__((unused)) bool is_slave(void) > +static bool is_slave(void) > { > return colo != NULL; > } > @@ -293,13 +479,32 @@ static __attribute__((unused)) bool is_slave(void) > */ > static int slave_wait_new_checkpoint(QEMUFile *f) > { > - /* TODO: wait checkpoint start command from master */ > - return 1; > + int fd = qemu_get_fd(f); > + int ret; > + uint64_t cmd; > + > + yield_until_fd_readable(fd); > + > + ret = colo_ctl_get_value(f, &cmd); > + if (ret) { > + return 1; > + } > + > + if (cmd == COLO_CHECKPOINT_NEW) { > + return 0; > + } else { > + /* Unexpected data received */ > + ctl_error_handler(f, ret); > + return 1; > + } > } > > void colo_process_incoming_checkpoints(QEMUFile *f) > { > + int fd = qemu_get_fd(f); > int dev_hotplug = qdev_hotplug; > + QEMUFile *ctl = NULL; > + int ret; > > if 
(!restore_use_colo()) { > return; > @@ -310,18 +515,69 @@ void colo_process_incoming_checkpoints(QEMUFile *f) > colo = qemu_coroutine_self(); > assert(colo != NULL); > > + ctl = qemu_fopen_socket(fd, "wb"); > + if (!ctl) { > + error_report("can't open incoming channel\n"); > + goto out; > + } > + > colo_buffer_init(); > > + ret = colo_ctl_put(ctl, COLO_READY); > + if (ret) { > + goto out; > + } > + > + /* TODO: in COLO mode, slave is running, so start the vm */ > + > while (true) { > if (slave_wait_new_checkpoint(f)) { > break; > } > > - /* TODO: COLO restore */ > + /* start colo checkpoint */ > + > + /* TODO: suspend guest */ > + > + ret = colo_ctl_put(ctl, COLO_CHECKPOINT_SUSPENDED); > + if (ret) { > + goto out; > + } > + > + /* TODO: open colo buffer for read */ > + > + ret = colo_ctl_get(f, COLO_CHECKPOINT_SEND); > + if (ret) { > + goto out; > + } > + > + /* TODO: read migration data into colo buffer */ > + > + ret = colo_ctl_put(ctl, COLO_CHECKPOINT_RECEIVED); > + if (ret) { > + goto out; > + } > + > + /* TODO: load vm state */ > + > + ret = colo_ctl_put(ctl, COLO_CHECKPOINT_LOADED); > + if (ret) { > + goto out; > + } > + > + /* TODO: resume guest */ > + > + /* TODO: close colo buffer */ > } > > +out: > colo_buffer_destroy(); > colo = NULL; > + > + if (ctl) { > + qemu_fclose(ctl); > + } > + > restore_exit_colo(); > > qdev_hotplug = dev_hotplug; > -- > 1.9.1 > -- Dr. David Alan Gilbert / dgilbert@xxxxxxxxxx / Manchester, UK -- To unsubscribe from this list: send the line "unsubscribe kvm" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html