[RFC PATCH 11/17] COLO ctl: implement colo checkpoint protocol

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



implement colo checkpoint protocol.

Checkpoint synchronzing points.

                  Primary                 Secondary
  NEW             @
                                          Suspend
  SUSPENDED                               @
                  Suspend&Save state
  SEND            @
                  Send state              Receive state
  RECEIVED                                @
                  Flush network           Load state
  LOADED                                  @
                  Resume                  Resume

                  Start Comparing
NOTE:
 1) '@' who sends the message
 2) Every sync-point is synchronized by two sides with only
    one handshake(single direction) for low-latency.
    If more strict synchronization is required, a opposite direction
    sync-point should be added.
 3) Since sync-points are single direction, the remote side may
    go forward a lot when this side just receives the sync-point.

Signed-off-by: Yang Hongyang <yanghy@xxxxxxxxxxxxxx>
---
 migration-colo.c | 268 +++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 262 insertions(+), 6 deletions(-)

diff --git a/migration-colo.c b/migration-colo.c
index 2699e77..a708872 100644
--- a/migration-colo.c
+++ b/migration-colo.c
@@ -24,6 +24,41 @@
  */
 #define CHKPOINT_TIMER 10000
 
+enum {
+    COLO_READY = 0x46,
+
+    /*
+     * Checkpoint synchronzing points.
+     *
+     *                  Primary                 Secondary
+     *  NEW             @
+     *                                          Suspend
+     *  SUSPENDED                               @
+     *                  Suspend&Save state
+     *  SEND            @
+     *                  Send state              Receive state
+     *  RECEIVED                                @
+     *                  Flush network           Load state
+     *  LOADED                                  @
+     *                  Resume                  Resume
+     *
+     *                  Start Comparing
+     * NOTE:
+     * 1) '@' who sends the message
+     * 2) Every sync-point is synchronized by two sides with only
+     *    one handshake(single direction) for low-latency.
+     *    If more strict synchronization is required, a opposite direction
+     *    sync-point should be added.
+     * 3) Since sync-points are single direction, the remote side may
+     *    go forward a lot when this side just receives the sync-point.
+     */
+    COLO_CHECKPOINT_NEW,
+    COLO_CHECKPOINT_SUSPENDED,
+    COLO_CHECKPOINT_SEND,
+    COLO_CHECKPOINT_RECEIVED,
+    COLO_CHECKPOINT_LOADED,
+};
+
 static QEMUBH *colo_bh;
 
 bool colo_supported(void)
@@ -185,30 +220,161 @@ static const QEMUFileOps colo_read_ops = {
     .close = colo_close,
 };
 
+/* colo checkpoint control helper */
+static bool is_master(void);
+static bool is_slave(void);
+
+static void ctl_error_handler(void *opaque, int err)
+{
+    if (is_slave()) {
+        /* TODO: determine whether we need to failover */
+        /* FIXME: we will not failover currently, just kill slave */
+        error_report("error: colo transmission failed!\n");
+        exit(1);
+    } else if (is_master()) {
+        /* Master still alive, do not failover */
+        error_report("error: colo transmission failed!\n");
+        return;
+    } else {
+        error_report("COLO: Unexpected error happend!\n");
+        exit(EXIT_FAILURE);
+    }
+}
+
+static int colo_ctl_put(QEMUFile *f, uint64_t request)
+{
+    int ret = 0;
+
+    qemu_put_be64(f, request);
+    qemu_fflush(f);
+
+    ret = qemu_file_get_error(f);
+    if (ret < 0) {
+        ctl_error_handler(f, ret);
+        return 1;
+    }
+
+    return ret;
+}
+
+static int colo_ctl_get_value(QEMUFile *f, uint64_t *value)
+{
+    int ret = 0;
+    uint64_t temp;
+
+    temp = qemu_get_be64(f);
+
+    ret = qemu_file_get_error(f);
+    if (ret < 0) {
+        ctl_error_handler(f, ret);
+        return 1;
+    }
+
+    *value = temp;
+    return 0;
+}
+
+static int colo_ctl_get(QEMUFile *f, uint64_t require)
+{
+    int ret;
+    uint64_t value;
+
+    ret = colo_ctl_get_value(f, &value);
+    if (ret) {
+        return ret;
+    }
+
+    if (value != require) {
+        error_report("unexpected state received!\n");
+        exit(1);
+    }
+
+    return ret;
+}
+
 /* save */
 
-static __attribute__((unused)) bool is_master(void)
+static bool is_master(void)
 {
     MigrationState *s = migrate_get_current();
     return (s->state == MIG_STATE_COLO);
 }
 
+static int do_colo_transaction(MigrationState *s, QEMUFile *control,
+                               QEMUFile *trans)
+{
+    int ret;
+
+    ret = colo_ctl_put(s->file, COLO_CHECKPOINT_NEW);
+    if (ret) {
+        goto out;
+    }
+
+    ret = colo_ctl_get(control, COLO_CHECKPOINT_SUSPENDED);
+    if (ret) {
+        goto out;
+    }
+
+    /* TODO: suspend and save vm state to colo buffer */
+
+    ret = colo_ctl_put(s->file, COLO_CHECKPOINT_SEND);
+    if (ret) {
+        goto out;
+    }
+
+    /* TODO: send vmstate to slave */
+
+    ret = colo_ctl_get(control, COLO_CHECKPOINT_RECEIVED);
+    if (ret) {
+        goto out;
+    }
+
+    /* TODO: Flush network etc. */
+
+    ret = colo_ctl_get(control, COLO_CHECKPOINT_LOADED);
+    if (ret) {
+        goto out;
+    }
+
+    /* TODO: resume master */
+
+out:
+    return ret;
+}
+
 static void *colo_thread(void *opaque)
 {
     MigrationState *s = opaque;
     int dev_hotplug = qdev_hotplug, wait_cp = 0;
     int64_t start_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
     int64_t current_time;
+    QEMUFile *colo_control = NULL, *colo_trans = NULL;
+    int ret;
 
     if (colo_compare_init() < 0) {
         error_report("Init colo compare error\n");
         goto out;
     }
 
+    colo_control = qemu_fopen_socket(qemu_get_fd(s->file), "rb");
+    if (!colo_control) {
+        error_report("open colo_control failed\n");
+        goto out;
+    }
+
     qdev_hotplug = 0;
 
     colo_buffer_init();
 
+    /*
+     * Wait for slave finish loading vm states and enter COLO
+     * restore.
+     */
+    ret = colo_ctl_get(colo_control, COLO_READY);
+    if (ret) {
+        goto out;
+    }
+
     while (s->state == MIG_STATE_COLO) {
         /* wait for a colo checkpoint */
         wait_cp = colo_compare();
@@ -230,13 +396,33 @@ static void *colo_thread(void *opaque)
 
         /* start a colo checkpoint */
 
-        /*TODO: COLO save */
+        /* open colo buffer for write */
+        colo_trans = qemu_fopen_ops(&colo_buffer, &colo_write_ops);
+        if (!colo_trans) {
+            error_report("open colo buffer failed\n");
+            goto out;
+        }
 
+        if (do_colo_transaction(s, colo_control, colo_trans)) {
+            goto out;
+        }
+
+        qemu_fclose(colo_trans);
+        colo_trans = NULL;
         start_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
     }
 
 out:
+    if (colo_trans) {
+        qemu_fclose(colo_trans);
+    }
+
     colo_buffer_destroy();
+
+    if (colo_control) {
+        qemu_fclose(colo_control);
+    }
+
     colo_compare_destroy();
 
     if (s->state != MIG_STATE_ERROR) {
@@ -281,7 +467,7 @@ void colo_init_checkpointer(MigrationState *s)
 
 static Coroutine *colo;
 
-static __attribute__((unused)) bool is_slave(void)
+static bool is_slave(void)
 {
     return colo != NULL;
 }
@@ -293,13 +479,32 @@ static __attribute__((unused)) bool is_slave(void)
  */
 static int slave_wait_new_checkpoint(QEMUFile *f)
 {
-    /* TODO: wait checkpoint start command from master */
-    return 1;
+    int fd = qemu_get_fd(f);
+    int ret;
+    uint64_t cmd;
+
+    yield_until_fd_readable(fd);
+
+    ret = colo_ctl_get_value(f, &cmd);
+    if (ret) {
+        return 1;
+    }
+
+    if (cmd == COLO_CHECKPOINT_NEW) {
+        return 0;
+    } else {
+        /* Unexpected data received */
+        ctl_error_handler(f, ret);
+        return 1;
+    }
 }
 
 void colo_process_incoming_checkpoints(QEMUFile *f)
 {
+    int fd = qemu_get_fd(f);
     int dev_hotplug = qdev_hotplug;
+    QEMUFile *ctl = NULL;
+    int ret;
 
     if (!restore_use_colo()) {
         return;
@@ -310,18 +515,69 @@ void colo_process_incoming_checkpoints(QEMUFile *f)
     colo = qemu_coroutine_self();
     assert(colo != NULL);
 
+    ctl = qemu_fopen_socket(fd, "wb");
+    if (!ctl) {
+        error_report("can't open incoming channel\n");
+        goto out;
+    }
+
     colo_buffer_init();
 
+    ret = colo_ctl_put(ctl, COLO_READY);
+    if (ret) {
+        goto out;
+    }
+
+    /* TODO: in COLO mode, slave is runing, so start the vm */
+
     while (true) {
         if (slave_wait_new_checkpoint(f)) {
             break;
         }
 
-        /* TODO: COLO restore */
+        /* start colo checkpoint */
+
+        /* TODO: suspend guest */
+
+        ret = colo_ctl_put(ctl, COLO_CHECKPOINT_SUSPENDED);
+        if (ret) {
+            goto out;
+        }
+
+        /* TODO: open colo buffer for read */
+
+        ret = colo_ctl_get(f, COLO_CHECKPOINT_SEND);
+        if (ret) {
+            goto out;
+        }
+
+        /* TODO: read migration data into colo buffer */
+
+        ret = colo_ctl_put(ctl, COLO_CHECKPOINT_RECEIVED);
+        if (ret) {
+            goto out;
+        }
+
+        /* TODO: load vm state */
+
+        ret = colo_ctl_put(ctl, COLO_CHECKPOINT_LOADED);
+        if (ret) {
+            goto out;
+        }
+
+        /* TODO: resume guest */
+
+        /* TODO: close colo buffer */
     }
 
+out:
     colo_buffer_destroy();
     colo = NULL;
+
+    if (ctl) {
+        qemu_fclose(ctl);
+    }
+
     restore_exit_colo();
 
     qdev_hotplug = dev_hotplug;
-- 
1.9.1

--
To unsubscribe from this list: send the line "unsubscribe kvm" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html




[Index of Archives]     [KVM ARM]     [KVM ia64]     [KVM ppc]     [Virtualization Tools]     [Spice Development]     [Libvirt]     [Libvirt Users]     [Linux USB Devel]     [Linux Audio Users]     [Yosemite Questions]     [Linux Kernel]     [Linux SCSI]     [XFree86]
  Powered by Linux