Re: [RFC PATCH 11/17] COLO ctl: implement colo checkpoint protocol

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



* Yang Hongyang (yanghy@xxxxxxxxxxxxxx) wrote:
> implement colo checkpoint protocol.
> 
> Checkpoint synchronzing points.
> 
>                   Primary                 Secondary
>   NEW             @
>                                           Suspend
>   SUSPENDED                               @
>                   Suspend&Save state
>   SEND            @
>                   Send state              Receive state
>   RECEIVED                                @
>                   Flush network           Load state
>   LOADED                                  @
>                   Resume                  Resume
> 
>                   Start Comparing
> NOTE:
>  1) '@' who sends the message
>  2) Every sync-point is synchronized by two sides with only
>     one handshake(single direction) for low-latency.
>     If more strict synchronization is required, a opposite direction
>     sync-point should be added.
>  3) Since sync-points are single direction, the remote side may
>     go forward a lot when this side just receives the sync-point.
> 
> Signed-off-by: Yang Hongyang <yanghy@xxxxxxxxxxxxxx>
> ---
>  migration-colo.c | 268 +++++++++++++++++++++++++++++++++++++++++++++++++++++--
>  1 file changed, 262 insertions(+), 6 deletions(-)
> 
> diff --git a/migration-colo.c b/migration-colo.c
> index 2699e77..a708872 100644
> --- a/migration-colo.c
> +++ b/migration-colo.c
> @@ -24,6 +24,41 @@
>   */
>  #define CHKPOINT_TIMER 10000
>  
> +enum {
> +    COLO_READY = 0x46,
> +
> +    /*
> +     * Checkpoint synchronzing points.
> +     *
> +     *                  Primary                 Secondary
> +     *  NEW             @
> +     *                                          Suspend
> +     *  SUSPENDED                               @
> +     *                  Suspend&Save state
> +     *  SEND            @
> +     *                  Send state              Receive state
> +     *  RECEIVED                                @
> +     *                  Flush network           Load state
> +     *  LOADED                                  @
> +     *                  Resume                  Resume
> +     *
> +     *                  Start Comparing
> +     * NOTE:
> +     * 1) '@' who sends the message
> +     * 2) Every sync-point is synchronized by two sides with only
> +     *    one handshake(single direction) for low-latency.
> +     *    If more strict synchronization is required, a opposite direction
> +     *    sync-point should be added.
> +     * 3) Since sync-points are single direction, the remote side may
> +     *    go forward a lot when this side just receives the sync-point.
> +     */
> +    COLO_CHECKPOINT_NEW,
> +    COLO_CHECKPOINT_SUSPENDED,
> +    COLO_CHECKPOINT_SEND,
> +    COLO_CHECKPOINT_RECEIVED,
> +    COLO_CHECKPOINT_LOADED,
> +};
> +
>  static QEMUBH *colo_bh;
>  
>  bool colo_supported(void)
> @@ -185,30 +220,161 @@ static const QEMUFileOps colo_read_ops = {
>      .close = colo_close,
>  };
>  
> +/* colo checkpoint control helper */
> +static bool is_master(void);
> +static bool is_slave(void);
> +
> +static void ctl_error_handler(void *opaque, int err)
> +{
> +    if (is_slave()) {
> +        /* TODO: determine whether we need to failover */
> +        /* FIXME: we will not failover currently, just kill slave */
> +        error_report("error: colo transmission failed!\n");
> +        exit(1);
> +    } else if (is_master()) {
> +        /* Master still alive, do not failover */
> +        error_report("error: colo transmission failed!\n");
> +        return;
> +    } else {
> +        error_report("COLO: Unexpected error happend!\n");
> +        exit(EXIT_FAILURE);
> +    }
> +}
> +
> +static int colo_ctl_put(QEMUFile *f, uint64_t request)
> +{
> +    int ret = 0;
> +
> +    qemu_put_be64(f, request);
> +    qemu_fflush(f);
> +
> +    ret = qemu_file_get_error(f);
> +    if (ret < 0) {
> +        ctl_error_handler(f, ret);
> +        return 1;
> +    }
> +
> +    return ret;
> +}
> +
> +static int colo_ctl_get_value(QEMUFile *f, uint64_t *value)
> +{
> +    int ret = 0;
> +    uint64_t temp;
> +
> +    temp = qemu_get_be64(f);
> +
> +    ret = qemu_file_get_error(f);
> +    if (ret < 0) {
> +        ctl_error_handler(f, ret);
> +        return 1;
> +    }
> +
> +    *value = temp;
> +    return 0;
> +}
> +
> +static int colo_ctl_get(QEMUFile *f, uint64_t require)
> +{
> +    int ret;
> +    uint64_t value;
> +
> +    ret = colo_ctl_get_value(f, &value);
> +    if (ret) {
> +        return ret;
> +    }
> +
> +    if (value != require) {
> +        error_report("unexpected state received!\n");

I find it useful to print the expected/received state to
be able to figure out what went wrong.

> +        exit(1);
> +    }
> +
> +    return ret;
> +}
> +
>  /* save */
>  
> -static __attribute__((unused)) bool is_master(void)
> +static bool is_master(void)
>  {
>      MigrationState *s = migrate_get_current();
>      return (s->state == MIG_STATE_COLO);
>  }
>  
> +static int do_colo_transaction(MigrationState *s, QEMUFile *control,
> +                               QEMUFile *trans)
> +{
> +    int ret;
> +
> +    ret = colo_ctl_put(s->file, COLO_CHECKPOINT_NEW);
> +    if (ret) {
> +        goto out;
> +    }
> +
> +    ret = colo_ctl_get(control, COLO_CHECKPOINT_SUSPENDED);

What happens at this point if the slave just doesn't respond?
(i.e. the socket doesn't drop - you just don't get the byte).

> +    if (ret) {
> +        goto out;
> +    }
> +
> +    /* TODO: suspend and save vm state to colo buffer */
> +
> +    ret = colo_ctl_put(s->file, COLO_CHECKPOINT_SEND);
> +    if (ret) {
> +        goto out;
> +    }
> +
> +    /* TODO: send vmstate to slave */
> +
> +    ret = colo_ctl_get(control, COLO_CHECKPOINT_RECEIVED);
> +    if (ret) {
> +        goto out;
> +    }
> +
> +    /* TODO: Flush network etc. */
> +
> +    ret = colo_ctl_get(control, COLO_CHECKPOINT_LOADED);
> +    if (ret) {
> +        goto out;
> +    }
> +
> +    /* TODO: resume master */
> +
> +out:
> +    return ret;
> +}
> +
>  static void *colo_thread(void *opaque)
>  {
>      MigrationState *s = opaque;
>      int dev_hotplug = qdev_hotplug, wait_cp = 0;
>      int64_t start_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
>      int64_t current_time;
> +    QEMUFile *colo_control = NULL, *colo_trans = NULL;
> +    int ret;
>  
>      if (colo_compare_init() < 0) {
>          error_report("Init colo compare error\n");
>          goto out;
>      }
>  
> +    colo_control = qemu_fopen_socket(qemu_get_fd(s->file), "rb");
> +    if (!colo_control) {
> +        error_report("open colo_control failed\n");
> +        goto out;
> +    }

In my postcopy world I'm trying to abstract this type of thing into a 'return path'
so that the QEMUFile can implement it however it wants and you don't
need to assume it's a socket.  But I'm still fighting some of those details.

Dave

> +
>      qdev_hotplug = 0;
>  
>      colo_buffer_init();
>  
> +    /*
> +     * Wait for slave finish loading vm states and enter COLO
> +     * restore.
> +     */
> +    ret = colo_ctl_get(colo_control, COLO_READY);
> +    if (ret) {
> +        goto out;
> +    }
> +
>      while (s->state == MIG_STATE_COLO) {
>          /* wait for a colo checkpoint */
>          wait_cp = colo_compare();
> @@ -230,13 +396,33 @@ static void *colo_thread(void *opaque)
>  
>          /* start a colo checkpoint */
>  
> -        /*TODO: COLO save */
> +        /* open colo buffer for write */
> +        colo_trans = qemu_fopen_ops(&colo_buffer, &colo_write_ops);
> +        if (!colo_trans) {
> +            error_report("open colo buffer failed\n");
> +            goto out;
> +        }
>  
> +        if (do_colo_transaction(s, colo_control, colo_trans)) {
> +            goto out;
> +        }
> +
> +        qemu_fclose(colo_trans);
> +        colo_trans = NULL;
>          start_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
>      }
>  
>  out:
> +    if (colo_trans) {
> +        qemu_fclose(colo_trans);
> +    }
> +
>      colo_buffer_destroy();
> +
> +    if (colo_control) {
> +        qemu_fclose(colo_control);
> +    }
> +
>      colo_compare_destroy();
>  
>      if (s->state != MIG_STATE_ERROR) {
> @@ -281,7 +467,7 @@ void colo_init_checkpointer(MigrationState *s)
>  
>  static Coroutine *colo;
>  
> -static __attribute__((unused)) bool is_slave(void)
> +static bool is_slave(void)
>  {
>      return colo != NULL;
>  }
> @@ -293,13 +479,32 @@ static __attribute__((unused)) bool is_slave(void)
>   */
>  static int slave_wait_new_checkpoint(QEMUFile *f)
>  {
> -    /* TODO: wait checkpoint start command from master */
> -    return 1;
> +    int fd = qemu_get_fd(f);
> +    int ret;
> +    uint64_t cmd;
> +
> +    yield_until_fd_readable(fd);
> +
> +    ret = colo_ctl_get_value(f, &cmd);
> +    if (ret) {
> +        return 1;
> +    }
> +
> +    if (cmd == COLO_CHECKPOINT_NEW) {
> +        return 0;
> +    } else {
> +        /* Unexpected data received */
> +        ctl_error_handler(f, ret);
> +        return 1;
> +    }
>  }
>  
>  void colo_process_incoming_checkpoints(QEMUFile *f)
>  {
> +    int fd = qemu_get_fd(f);
>      int dev_hotplug = qdev_hotplug;
> +    QEMUFile *ctl = NULL;
> +    int ret;
>  
>      if (!restore_use_colo()) {
>          return;
> @@ -310,18 +515,69 @@ void colo_process_incoming_checkpoints(QEMUFile *f)
>      colo = qemu_coroutine_self();
>      assert(colo != NULL);
>  
> +    ctl = qemu_fopen_socket(fd, "wb");
> +    if (!ctl) {
> +        error_report("can't open incoming channel\n");
> +        goto out;
> +    }
> +
>      colo_buffer_init();
>  
> +    ret = colo_ctl_put(ctl, COLO_READY);
> +    if (ret) {
> +        goto out;
> +    }
> +
> +    /* TODO: in COLO mode, slave is runing, so start the vm */
> +
>      while (true) {
>          if (slave_wait_new_checkpoint(f)) {
>              break;
>          }
>  
> -        /* TODO: COLO restore */
> +        /* start colo checkpoint */
> +
> +        /* TODO: suspend guest */
> +
> +        ret = colo_ctl_put(ctl, COLO_CHECKPOINT_SUSPENDED);
> +        if (ret) {
> +            goto out;
> +        }
> +
> +        /* TODO: open colo buffer for read */
> +
> +        ret = colo_ctl_get(f, COLO_CHECKPOINT_SEND);
> +        if (ret) {
> +            goto out;
> +        }
> +
> +        /* TODO: read migration data into colo buffer */
> +
> +        ret = colo_ctl_put(ctl, COLO_CHECKPOINT_RECEIVED);
> +        if (ret) {
> +            goto out;
> +        }
> +
> +        /* TODO: load vm state */
> +
> +        ret = colo_ctl_put(ctl, COLO_CHECKPOINT_LOADED);
> +        if (ret) {
> +            goto out;
> +        }
> +
> +        /* TODO: resume guest */
> +
> +        /* TODO: close colo buffer */
>      }
>  
> +out:
>      colo_buffer_destroy();
>      colo = NULL;
> +
> +    if (ctl) {
> +        qemu_fclose(ctl);
> +    }
> +
>      restore_exit_colo();
>  
>      qdev_hotplug = dev_hotplug;
> -- 
> 1.9.1
> 
--
Dr. David Alan Gilbert / dgilbert@xxxxxxxxxx / Manchester, UK
--
To unsubscribe from this list: send the line "unsubscribe kvm" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html




[Index of Archives]     [KVM ARM]     [KVM ia64]     [KVM ppc]     [Virtualization Tools]     [Spice Development]     [Libvirt]     [Libvirt Users]     [Linux USB Devel]     [Linux Audio Users]     [Yosemite Questions]     [Linux Kernel]     [Linux SCSI]     [XFree86]
  Powered by Linux