Re: [PATCH v2 37/41] postcopy: implement outgoing part of postcopy live migration

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Isaku Yamahata <yamahata@xxxxxxxxxxxxx> wrote:
> This patch implements postcopy live migration for outgoing part
>
> Signed-off-by: Isaku Yamahata <yamahata@xxxxxxxxxxxxx>
> ---
> Changes v1 -> v2:
> - fix parameter to qemu_fdopen()
> - handle QEMU_UMEM_REQ_EOC properly
>   when PO_STATE_ALL_PAGES_SENT, QEMU_UMEM_REQ_EOC request was ignored.
>   handle properly it.
> - flush on-demand page unconditionally
> - improve postcopy_outgoing_ram_save_live and postcopy_outgoing_begin()
> - use qemu_fopen_fd
> - use memory api instead of obsolete api
> - segv in postcopy_outgoing_check_all_ram_sent()
> - catch up qapi change
> ---
>  arch_init.c               |   19 ++-
>  migration-exec.c          |    4 +
>  migration-fd.c            |   17 ++
>  migration-postcopy-stub.c |   22 +++
>  migration-postcopy.c      |  450 +++++++++++++++++++++++++++++++++++++++++++++
>  migration-tcp.c           |   25 ++-
>  migration-unix.c          |   26 ++-
>  migration.c               |   32 +++-
>  migration.h               |   12 ++
>  savevm.c                  |   22 ++-
>  sysemu.h                  |    2 +-
>  11 files changed, 614 insertions(+), 17 deletions(-)
>
> diff --git a/arch_init.c b/arch_init.c
> index 22d9691..3599e5c 100644
> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -154,6 +154,13 @@ static int is_dup_page(uint8_t *page)
>      return 1;
>  }
>  
> +static bool outgoing_postcopy = false;
> +
> +void ram_save_set_params(const MigrationParams *params, void *opaque)
> +{
> +    outgoing_postcopy = params->postcopy;
> +}
> +
>  static RAMBlock *last_block_sent = NULL;
>  static uint64_t bytes_transferred;
>  
> @@ -343,6 +350,15 @@ int ram_save_live(QEMUFile *f, int stage, void *opaque)
>      uint64_t expected_time = 0;
>      int ret;
>  
> +    if (stage == 1) {
> +        bytes_transferred = 0;
> +        last_block_sent = NULL;
> +        ram_save_set_last_block(NULL, 0);
> +    }
> +    if (outgoing_postcopy) {
> +        return postcopy_outgoing_ram_save_live(f, stage, opaque);
> +    }
> +
>      if (stage < 0) {
>          memory_global_dirty_log_stop();
>          return 0;
> @@ -351,9 +367,6 @@ int ram_save_live(QEMUFile *f, int stage, void *opaque)
>      memory_global_sync_dirty_bitmap(get_system_memory());
>  
>      if (stage == 1) {
> -        bytes_transferred = 0;
> -        last_block_sent = NULL;
> -        ram_save_set_last_block(NULL, 0);
>          sort_ram_list();
>  
>          /* Make sure all dirty bits are set */
> diff --git a/migration-exec.c b/migration-exec.c
> index 7f08b3b..a90da5c 100644
> --- a/migration-exec.c
> +++ b/migration-exec.c
> @@ -64,6 +64,10 @@ int exec_start_outgoing_migration(MigrationState *s, const char *command)
>  {
>      FILE *f;
>  
> +    if (s->params.postcopy) {
> +        return -ENOSYS;
> +    }
> +
>      f = popen(command, "w");
>      if (f == NULL) {
>          DPRINTF("Unable to popen exec target\n");
> diff --git a/migration-fd.c b/migration-fd.c
> index 42b8162..83b5f18 100644
> --- a/migration-fd.c
> +++ b/migration-fd.c
> @@ -90,6 +90,23 @@ int fd_start_outgoing_migration(MigrationState *s, const char *fdname)
>      s->write = fd_write;
>      s->close = fd_close;
>  
> +    if (s->params.postcopy) {
> +        int flags = fcntl(s->fd, F_GETFL);
> +        if ((flags & O_ACCMODE) != O_RDWR) {
> +            goto err_after_open;
> +        }
> +
> +        s->fd_read = dup(s->fd);
> +        if (s->fd_read == -1) {
> +            goto err_after_open;
> +        }
> +        s->file_read = qemu_fopen_fd(s->fd_read);
> +        if (s->file_read == NULL) {
> +            close(s->fd_read);
> +            goto err_after_open;
> +        }
> +    }
> +
>      migrate_fd_connect(s);
>      return 0;
>  
> diff --git a/migration-postcopy-stub.c b/migration-postcopy-stub.c
> index f9ebcbe..9c64827 100644
> --- a/migration-postcopy-stub.c
> +++ b/migration-postcopy-stub.c
> @@ -24,6 +24,28 @@
>  #include "sysemu.h"
>  #include "migration.h"
>  
> +int postcopy_outgoing_create_read_socket(MigrationState *s)
> +{
> +    return -ENOSYS;
> +}
> +
> +int postcopy_outgoing_ram_save_live(Monitor *mon,
> +                                    QEMUFile *f, int stage, void *opaque)
> +{
> +    return -ENOSYS;
> +}
> +
> +void *postcopy_outgoing_begin(MigrationState *ms)
> +{
> +    return NULL;
> +}
> +
> +int postcopy_outgoing_ram_save_background(Monitor *mon, QEMUFile *f,
> +                                          void *postcopy)
> +{
> +    return -ENOSYS;
> +}
> +
>  int postcopy_incoming_init(const char *incoming, bool incoming_postcopy)
>  {
>      return -ENOSYS;
> diff --git a/migration-postcopy.c b/migration-postcopy.c
> index 5913e05..eb37094 100644
> --- a/migration-postcopy.c
> +++ b/migration-postcopy.c
> @@ -177,6 +177,456 @@ static void postcopy_incoming_send_req(QEMUFile *f,
>      }
>  }
>  
> +static int postcopy_outgoing_recv_req_idstr(QEMUFile *f,
> +                                            struct qemu_umem_req *req,
> +                                            size_t *offset)
> +{
> +    int ret;
> +
> +    req->len = qemu_peek_byte(f, *offset);
> +    *offset += 1;
> +    if (req->len == 0) {
> +        return -EAGAIN;
> +    }
> +    req->idstr = g_malloc((int)req->len + 1);
> +    ret = qemu_peek_buffer(f, (uint8_t*)req->idstr, req->len, *offset);
> +    *offset += ret;
> +    if (ret != req->len) {
> +        g_free(req->idstr);
> +        req->idstr = NULL;
> +        return -EAGAIN;
> +    }
> +    req->idstr[req->len] = 0;
> +    return 0;
> +}
> +
> +static int postcopy_outgoing_recv_req_pgoffs(QEMUFile *f,
> +                                             struct qemu_umem_req *req,
> +                                             size_t *offset)
> +{
> +    int ret;
> +    uint32_t be32;
> +    uint32_t i;
> +
> +    ret = qemu_peek_buffer(f, (uint8_t*)&be32, sizeof(be32), *offset);
> +    *offset += sizeof(be32);
> +    if (ret != sizeof(be32)) {
> +        return -EAGAIN;
> +    }
> +
> +    req->nr = be32_to_cpu(be32);
> +    req->pgoffs = g_new(uint64_t, req->nr);
> +    for (i = 0; i < req->nr; i++) {
> +        uint64_t be64;
> +        ret = qemu_peek_buffer(f, (uint8_t*)&be64, sizeof(be64), *offset);
> +        *offset += sizeof(be64);
> +        if (ret != sizeof(be64)) {
> +            g_free(req->pgoffs);
> +            req->pgoffs = NULL;
> +            return -EAGAIN;
> +        }
> +        req->pgoffs[i] = be64_to_cpu(be64);
> +    }
> +    return 0;
> +}
> +
> +static int postcopy_outgoing_recv_req(QEMUFile *f, struct qemu_umem_req *req)
> +{
> +    int size;
> +    int ret;
> +    size_t offset = 0;
> +
> +    size = qemu_peek_buffer(f, (uint8_t*)&req->cmd, 1, offset);
> +    if (size <= 0) {
> +        return -EAGAIN;
> +    }
> +    offset += 1;
> +
> +    switch (req->cmd) {
> +    case QEMU_UMEM_REQ_INIT:
> +    case QEMU_UMEM_REQ_EOC:
> +        /* nothing */
> +        break;
> +    case QEMU_UMEM_REQ_ON_DEMAND:
> +    case QEMU_UMEM_REQ_BACKGROUND:
> +    case QEMU_UMEM_REQ_REMOVE:
> +        ret = postcopy_outgoing_recv_req_idstr(f, req, &offset);
> +        if (ret < 0) {
> +            return ret;
> +        }
> +        ret = postcopy_outgoing_recv_req_pgoffs(f, req, &offset);
> +        if (ret < 0) {
> +            return ret;
> +        }
> +        break;
> +    case QEMU_UMEM_REQ_ON_DEMAND_CONT:
> +    case QEMU_UMEM_REQ_BACKGROUND_CONT:
> +        ret = postcopy_outgoing_recv_req_pgoffs(f, req, &offset);
> +        if (ret < 0) {
> +            return ret;
> +        }
> +        break;
> +    default:
> +        abort();
> +        break;
> +    }
> +    qemu_file_skip(f, offset);
> +    DPRINTF("cmd %d\n", req->cmd);
> +    return 0;
> +}
> +
> +static void postcopy_outgoing_free_req(struct qemu_umem_req *req)
> +{
> +    g_free(req->idstr);
> +    g_free(req->pgoffs);
> +}
> +
> +/***************************************************************************
> + * outgoing part
> + */
> +
> +#define QEMU_SAVE_LIVE_STAGE_START      0x01    /* = QEMU_VM_SECTION_START */
> +#define QEMU_SAVE_LIVE_STAGE_PART       0x02    /* = QEMU_VM_SECTION_PART */
> +#define QEMU_SAVE_LIVE_STAGE_END        0x03    /* = QEMU_VM_SECTION_END */
> +
> +enum POState {
> +    PO_STATE_ERROR_RECEIVE,
> +    PO_STATE_ACTIVE,
> +    PO_STATE_EOC_RECEIVED,
> +    PO_STATE_ALL_PAGES_SENT,
> +    PO_STATE_COMPLETED,
> +};
> +typedef enum POState POState;
> +
> +struct PostcopyOutgoingState {
> +    POState state;
> +    QEMUFile *mig_read;
> +    int fd_read;
> +    RAMBlock *last_block_read;
> +
> +    QEMUFile *mig_buffered_write;
> +    MigrationState *ms;
> +
> +    /* For nobg mode. Check if all pages are sent */
> +    RAMBlock *block;
> +    ram_addr_t offset;
> +};
> +typedef struct PostcopyOutgoingState PostcopyOutgoingState;
> +
> +int postcopy_outgoing_create_read_socket(MigrationState *s)
> +{
> +    if (!s->params.postcopy) {
> +        return 0;
> +    }
> +
> +    s->fd_read = dup(s->fd);
> +    if (s->fd_read == -1) {
> +        int ret = -errno;
> +        perror("dup");
> +        return ret;
> +    }
> +    s->file_read = qemu_fopen_socket(s->fd_read);
> +    if (s->file_read == NULL) {
> +        return -EINVAL;
> +    }
> +    return 0;
> +}
> +
> +int postcopy_outgoing_ram_save_live(QEMUFile *f, int stage, void *opaque)
> +{
> +    int ret = 0;
> +    DPRINTF("stage %d\n", stage);
> +    switch (stage) {
> +    case QEMU_SAVE_LIVE_STAGE_START:
> +        sort_ram_list();
> +        ram_save_live_mem_size(f);
> +        break;
> +    case QEMU_SAVE_LIVE_STAGE_PART:
> +        ret = 1;
> +        break;
> +    case QEMU_SAVE_LIVE_STAGE_END:
> +        break;
> +    default:
> +        abort();
> +    }
> +    qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
> +    return ret;
> +}
> +
> +/*
> + * return value
> + *   0: continue postcopy mode
> + * > 0: completed postcopy mode.
> + * < 0: error
> + */
> +static int postcopy_outgoing_handle_req(PostcopyOutgoingState *s,
> +                                        const struct qemu_umem_req *req,
> +                                        bool *written)
> +{
> +    int i;
> +    RAMBlock *block;
> +
> +    DPRINTF("cmd %d state %d\n", req->cmd, s->state);
> +    switch(req->cmd) {
> +    case QEMU_UMEM_REQ_INIT:
> +        /* nothing */
> +        break;
> +    case QEMU_UMEM_REQ_EOC:
> +        /* tell to finish migration. */
> +        if (s->state == PO_STATE_ALL_PAGES_SENT) {
> +            s->state = PO_STATE_COMPLETED;
> +            DPRINTF("-> PO_STATE_COMPLETED\n");
> +        } else {
> +            s->state = PO_STATE_EOC_RECEIVED;
> +            DPRINTF("-> PO_STATE_EOC_RECEIVED\n");
> +        }
> +        return 1;
> +    case QEMU_UMEM_REQ_ON_DEMAND:
> +    case QEMU_UMEM_REQ_BACKGROUND:
> +        DPRINTF("idstr: %s\n", req->idstr);
> +        block = ram_find_block(req->idstr, strlen(req->idstr));
> +        if (block == NULL) {
> +            return -EINVAL;
> +        }
> +        s->last_block_read = block;
> +        /* fall through */
> +    case QEMU_UMEM_REQ_ON_DEMAND_CONT:
> +    case QEMU_UMEM_REQ_BACKGROUND_CONT:
> +        DPRINTF("nr %d\n", req->nr);
> +        if (s->mig_buffered_write == NULL) {
> +            assert(s->state == PO_STATE_ALL_PAGES_SENT);
> +            break;
> +        }
> +        for (i = 0; i < req->nr; i++) {
> +            DPRINTF("offs[%d] 0x%"PRIx64"\n", i, req->pgoffs[i]);
> +            int ret = ram_save_page(s->mig_buffered_write, s->last_block_read,
> +                                    req->pgoffs[i] << TARGET_PAGE_BITS);
> +            if (ret > 0) {
> +                *written = true;
> +            }
> +        }
> +        break;
> +    case QEMU_UMEM_REQ_REMOVE:
> +        block = ram_find_block(req->idstr, strlen(req->idstr));
> +        if (block == NULL) {
> +            return -EINVAL;
> +        }
> +        for (i = 0; i < req->nr; i++) {
> +            ram_addr_t offset = req->pgoffs[i] << TARGET_PAGE_BITS;
> +            memory_region_reset_dirty(block->mr, offset, TARGET_PAGE_SIZE,
> +                                      MIGRATION_DIRTY_FLAG);
> +        }
> +        break;
> +    default:
> +        return -EINVAL;
> +    }
> +    return 0;
> +}
> +
> +static void postcopy_outgoing_close_mig_read(PostcopyOutgoingState *s)
> +{
> +    if (s->mig_read != NULL) {
> +        qemu_set_fd_handler(s->fd_read, NULL, NULL, NULL);
> +        qemu_fclose(s->mig_read);
> +        s->mig_read = NULL;
> +        fd_close(&s->fd_read);
> +
> +        s->ms->file_read = NULL;
> +        s->ms->fd_read = -1;
> +    }
> +}
> +
> +static void postcopy_outgoing_completed(PostcopyOutgoingState *s)
> +{
> +    postcopy_outgoing_close_mig_read(s);
> +    s->ms->postcopy = NULL;
> +    g_free(s);
> +}
> +
> +static void postcopy_outgoing_recv_handler(void *opaque)
> +{
> +    PostcopyOutgoingState *s = opaque;
> +    bool written = false;
> +    int ret = 0;
> +
> +    assert(s->state == PO_STATE_ACTIVE ||
> +           s->state == PO_STATE_ALL_PAGES_SENT);
> +
> +    do {
> +        struct qemu_umem_req req = {.idstr = NULL,
> +                                    .pgoffs = NULL};
> +
> +        ret = postcopy_outgoing_recv_req(s->mig_read, &req);
> +        if (ret < 0) {
> +            if (ret == -EAGAIN) {
> +                ret = 0;
> +            }
> +            break;
> +        }
> +
> +        /* Even when s->state == PO_STATE_ALL_PAGES_SENT,
> +           some request can be received like QEMU_UMEM_REQ_EOC */
> +        ret = postcopy_outgoing_handle_req(s, &req, &written);
> +        postcopy_outgoing_free_req(&req);
> +    } while (ret == 0);
> +
> +    /*
> +     * flush buffered_file.
> +     * Although mig_write is rate-limited buffered file, those written pages
> +     * are requested on demand by the destination. So forcibly push
> +     * those pages ignoring rate limiting
> +     */
> +    if (written) {
> +        qemu_buffered_file_drain(s->mig_buffered_write);
> +    }
> +
> +    if (ret < 0) {
> +        switch (s->state) {
> +        case PO_STATE_ACTIVE:
> +            s->state = PO_STATE_ERROR_RECEIVE;
> +            DPRINTF("-> PO_STATE_ERROR_RECEIVE\n");
> +            break;
> +        case PO_STATE_ALL_PAGES_SENT:
> +            s->state = PO_STATE_COMPLETED;
> +            DPRINTF("-> PO_STATE_ALL_PAGES_SENT\n");
> +            break;
> +        default:
> +            abort();
> +        }
> +    }
> +    if (s->state == PO_STATE_ERROR_RECEIVE || s->state == PO_STATE_COMPLETED) {
> +        postcopy_outgoing_close_mig_read(s);
> +    }
> +    if (s->state == PO_STATE_COMPLETED) {
> +        DPRINTF("PO_STATE_COMPLETED\n");
> +        MigrationState *ms = s->ms;
> +        postcopy_outgoing_completed(s);
> +        migrate_fd_completed(ms);
> +    }
> +}
> +
> +void *postcopy_outgoing_begin(MigrationState *ms)
> +{
> +    PostcopyOutgoingState *s = g_new(PostcopyOutgoingState, 1);
> +    DPRINTF("outgoing begin\n");
> +    qemu_fflush(ms->file);
> +
> +    s->ms = ms;
> +    s->state = PO_STATE_ACTIVE;
> +    s->fd_read = ms->fd_read;
> +    s->mig_read = ms->file_read;
> +    s->mig_buffered_write = ms->file;
> +    s->block = NULL;
> +    s->offset = 0;
> +
> +    /* Make sure all dirty bits are set */
> +    cpu_physical_memory_set_dirty_tracking(0);
> +    ram_save_memory_set_dirty();
> +
> +    qemu_set_fd_handler(s->fd_read,
> +                        &postcopy_outgoing_recv_handler, NULL, s);
> +    return s;
> +}

Why does it return void *?  It could just return PostcopyOutgoingState *
and be done with it.



> @@ -111,11 +119,19 @@ int unix_start_outgoing_migration(MigrationState *s, const char *path)
>  
>      if (ret < 0) {
>          DPRINTF("connect failed\n");
> -        migrate_fd_error(s);
> -        return ret;
> +        goto error_out;
> +    }
> +
> +    ret = postcopy_outgoing_create_read_socket(s);
> +    if (ret < 0) {
> +        goto error_out;
>      }
>      migrate_fd_connect(s);
>      return 0;
> +
> +error_out:
> +    migrate_fd_error(s);
> +    return ret;
>  }
>  

I wonder if we can refactor this into a single place?  We are repeating
it for each migration type.

> diff --git a/sysemu.h b/sysemu.h
> index 3857cf0..6ee4cd8 100644
> --- a/sysemu.h
> +++ b/sysemu.h
> @@ -79,7 +79,7 @@ void qemu_announce_self(void);
>  bool qemu_savevm_state_blocked(Error **errp);
>  int qemu_savevm_state_begin(QEMUFile *f, const MigrationParams *params);
>  int qemu_savevm_state_iterate(QEMUFile *f);
> -int qemu_savevm_state_complete(QEMUFile *f);
> +int qemu_savevm_state_complete(QEMUFile *f, bool postcopy);
>  void qemu_savevm_state_cancel(QEMUFile *f);
>  int qemu_loadvm_state(QEMUFile *f);

A question: could we just pass the params structure?  Otherwise, if we
need another argument for another extension, we would have to add yet
another argument.

I still need to take a more detailed look to see if there is an easy
way to integrate postcopy; there is still quite a bit of code repeated.

Later, Juan.
--
To unsubscribe from this list: send the line "unsubscribe kvm" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[Index of Archives]     [KVM ARM]     [KVM ia64]     [KVM ppc]     [Virtualization Tools]     [Spice Development]     [Libvirt]     [Libvirt Users]     [Linux USB Devel]     [Linux Audio Users]     [Yosemite Questions]     [Linux Kernel]     [Linux SCSI]     [XFree86]
  Powered by Linux