Re: [RFC PATCH v3 1/4] separate thread for VM migration

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On 08/11/2011 05:32 PM, Umesh Deshpande wrote:
This patch creates a separate thread for the guest migration on the source side.
A migrate_cancel request from the iothread is handled asynchronously. That is,
the iothread submits migrate_cancel to the migration thread and returns, while the
migration thread attends to this request at its next iteration and terminates its
execution.

Looks pretty good!  I hope you agree. :)  Just a few notes inline.

Signed-off-by: Umesh Deshpande<udeshpan@xxxxxxxxxx>
---
  buffered_file.c |   85 ++++++++++++++++++++++++++++++++----------------------
  buffered_file.h |    4 ++
  migration.c     |   49 ++++++++++++++-----------------
  migration.h     |    6 ++++
  4 files changed, 82 insertions(+), 62 deletions(-)

diff --git a/buffered_file.c b/buffered_file.c
index 41b42c3..19932b6 100644
--- a/buffered_file.c
+++ b/buffered_file.c
@@ -16,6 +16,8 @@
  #include "qemu-timer.h"
  #include "qemu-char.h"
  #include "buffered_file.h"
+#include "migration.h"
+#include "qemu-thread.h"

  //#define DEBUG_BUFFERED_FILE

@@ -28,13 +30,14 @@ typedef struct QEMUFileBuffered
      void *opaque;
      QEMUFile *file;
      int has_error;
+    int closed;
      int freeze_output;
      size_t bytes_xfer;
      size_t xfer_limit;
      uint8_t *buffer;
      size_t buffer_size;
      size_t buffer_capacity;
-    QEMUTimer *timer;
+    QemuThread thread;
  } QEMUFileBuffered;

  #ifdef DEBUG_BUFFERED_FILE
@@ -155,14 +158,6 @@ static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, in
          offset = size;
      }

-    if (pos == 0&&  size == 0) {
-        DPRINTF("file is ready\n");
-        if (s->bytes_xfer<= s->xfer_limit) {
-            DPRINTF("notifying client\n");
-            s->put_ready(s->opaque);
-        }
-    }
-
      return offset;
  }

@@ -175,20 +170,20 @@ static int buffered_close(void *opaque)

      while (!s->has_error&&  s->buffer_size) {
          buffered_flush(s);
-        if (s->freeze_output)
+        if (s->freeze_output) {
              s->wait_for_unfreeze(s);
+        }
      }

This is racy; you might end up calling buffered_put_buffer twice from two different threads.

-    ret = s->close(s->opaque);
+    s->closed = 1;

-    qemu_del_timer(s->timer);
-    qemu_free_timer(s->timer);
+    ret = s->close(s->opaque);
      qemu_free(s->buffer);
-    qemu_free(s);

... similarly, here the migration thread might end up using the buffer. Just set s->closed here and wait for thread completion; the migration thread can handle the flushes, free the buffer, etc. Let the migration thread do as much as possible; it will simplify your life.

      return ret;
  }

+
  static int buffered_rate_limit(void *opaque)
  {
      QEMUFileBuffered *s = opaque;
@@ -228,34 +223,55 @@ static int64_t buffered_get_rate_limit(void *opaque)
      return s->xfer_limit;
  }

-static void buffered_rate_tick(void *opaque)
+static void *migrate_vm(void *opaque)
  {
      QEMUFileBuffered *s = opaque;
+    int64_t current_time, expire_time = qemu_get_clock_ms(rt_clock) + 100;
+    struct timeval tv = { .tv_sec = 0, .tv_usec = 100000};

-    if (s->has_error) {
-        buffered_close(s);
-        return;
-    }
+    qemu_mutex_lock_iothread();

-    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
+    while (!s->closed) {

... This can in fact be

    while (!s->closed || s->buffer_size)

and that alone will subsume the loop in buffered_close, no?

+        if (s->freeze_output) {
+            s->wait_for_unfreeze(s);
+            s->freeze_output = 0;
+            continue;
+        }

-    if (s->freeze_output)
-        return;
+        if (s->has_error) {
+            break;
+        }
+
+        current_time = qemu_get_clock_ms(rt_clock);
+        if (!s->closed&&  (expire_time>  current_time)) {
+            tv.tv_usec = 1000 * (expire_time - current_time);
+            select(0, NULL, NULL, NULL,&tv);
+            continue;
+        }

-    s->bytes_xfer = 0;
+        s->bytes_xfer = 0;
+        buffered_flush(s);

-    buffered_flush(s);
+        expire_time = qemu_get_clock_ms(rt_clock) + 100;
+        s->put_ready(s->opaque);
+    }

-    /* Add some checks around this */
-    s->put_ready(s->opaque);
+    if (s->has_error) {
+        buffered_close(s);
+    }
+    qemu_free(s);
+
+    qemu_mutex_unlock_iothread();
+
+    return NULL;
  }

  QEMUFile *qemu_fopen_ops_buffered(void *opaque,
-                                  size_t bytes_per_sec,
-                                  BufferedPutFunc *put_buffer,
-                                  BufferedPutReadyFunc *put_ready,
-                                  BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
-                                  BufferedCloseFunc *close)
+        size_t bytes_per_sec,
+        BufferedPutFunc *put_buffer,
+        BufferedPutReadyFunc *put_ready,
+        BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
+        BufferedCloseFunc *close)
  {
      QEMUFileBuffered *s;

@@ -267,15 +283,14 @@ QEMUFile *qemu_fopen_ops_buffered(void *opaque,
      s->put_ready = put_ready;
      s->wait_for_unfreeze = wait_for_unfreeze;
      s->close = close;
+    s->closed = 0;

      s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL,
                               buffered_close, buffered_rate_limit,
                               buffered_set_rate_limit,
-			     buffered_get_rate_limit);
-
-    s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s);
+                             buffered_get_rate_limit);

-    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
+    qemu_thread_create(&s->thread, migrate_vm, s);

      return s->file;
  }
diff --git a/buffered_file.h b/buffered_file.h
index 98d358b..477bf7c 100644
--- a/buffered_file.h
+++ b/buffered_file.h
@@ -17,9 +17,13 @@
  #include "hw/hw.h"

  typedef ssize_t (BufferedPutFunc)(void *opaque, const void *data, size_t size);
+typedef void (BufferedBeginFunc)(void *opaque);

Unused typedef.

  typedef void (BufferedPutReadyFunc)(void *opaque);
  typedef void (BufferedWaitForUnfreezeFunc)(void *opaque);
  typedef int (BufferedCloseFunc)(void *opaque);
+typedef void (BufferedWaitForCancelFunc)(void *opaque);
+
+void wait_for_cancel(void *opaque);

BufferedWaitForCancelFunc should go in patch 2; wait_for_cancel is unused.

  QEMUFile *qemu_fopen_ops_buffered(void *opaque, size_t xfer_limit,
                                    BufferedPutFunc *put_buffer,
diff --git a/migration.c b/migration.c
index af3a1f2..d8a0abb 100644
--- a/migration.c
+++ b/migration.c
@@ -284,8 +284,6 @@ int migrate_fd_cleanup(FdMigrationState *s)
  {
      int ret = 0;

-    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
-
      if (s->file) {
          DPRINTF("closing file\n");
          if (qemu_fclose(s->file) != 0) {
@@ -307,14 +305,6 @@ int migrate_fd_cleanup(FdMigrationState *s)
      return ret;
  }

-void migrate_fd_put_notify(void *opaque)
-{
-    FdMigrationState *s = opaque;
-
-    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
-    qemu_file_put_notify(s->file);
-}
-

qemu_file_put_notify is also unused now.

  ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
  {
      FdMigrationState *s = opaque;
@@ -327,9 +317,7 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
      if (ret == -1)
          ret = -(s->get_error(s));

-    if (ret == -EAGAIN) {
-        qemu_set_fd_handler2(s->fd, NULL, NULL, migrate_fd_put_notify, s);
-    } else if (ret<  0) {
+    if (ret<  0&&  ret != -EAGAIN) {
          if (s->mon) {
              monitor_resume(s->mon);
          }
@@ -342,36 +330,40 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)

  void migrate_fd_connect(FdMigrationState *s)
  {
-    int ret;
-
+    s->begin = 1;
      s->file = qemu_fopen_ops_buffered(s,
                                        s->bandwidth_limit,
                                        migrate_fd_put_buffer,
                                        migrate_fd_put_ready,
                                        migrate_fd_wait_for_unfreeze,
                                        migrate_fd_close);
-
-    DPRINTF("beginning savevm\n");
-    ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk,
-                                  s->mig_state.shared);
-    if (ret<  0) {
-        DPRINTF("failed, %d\n", ret);
-        migrate_fd_error(s);
-        return;
-    }
-
-    migrate_fd_put_ready(s);
  }

  void migrate_fd_put_ready(void *opaque)
  {
      FdMigrationState *s = opaque;
+    int ret;

      if (s->state != MIG_STATE_ACTIVE) {
          DPRINTF("put_ready returning because of non-active state\n");
+        if (s->state == MIG_STATE_CANCELLED) {
+            migrate_fd_terminate(s);
+        }
          return;
      }

+    if (s->begin) {
+        DPRINTF("beginning savevm\n");
+        ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk,
+                s->mig_state.shared);
+        if (ret<  0) {
+            DPRINTF("failed, %d\n", ret);
+            migrate_fd_error(s);
+            return;
+        }
+        s->begin = 0;
+    }
+
      DPRINTF("iterate\n");
      if (qemu_savevm_state_iterate(s->mon, s->file) == 1) {
          int state;
@@ -415,6 +407,10 @@ void migrate_fd_cancel(MigrationState *mig_state)
      DPRINTF("cancelling migration\n");

      s->state = MIG_STATE_CANCELLED;
+}
+
+void migrate_fd_terminate(FdMigrationState *s)
+{
      notifier_list_notify(&migration_state_notifiers);
      qemu_savevm_state_cancel(s->mon, s->file);

@@ -458,7 +454,6 @@ int migrate_fd_close(void *opaque)
  {
      FdMigrationState *s = opaque;

-    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
      return s->close(s);
  }

diff --git a/migration.h b/migration.h
index 050c56c..887f84c 100644
--- a/migration.h
+++ b/migration.h
@@ -45,9 +45,11 @@ struct FdMigrationState
      int fd;
      Monitor *mon;
      int state;
+    int begin;
      int (*get_error)(struct FdMigrationState*);
      int (*close)(struct FdMigrationState*);
      int (*write)(struct FdMigrationState*, const void *, size_t);
+    void (*callback)(void *);
      void *opaque;
  };

@@ -118,12 +120,16 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size);

  void migrate_fd_connect(FdMigrationState *s);

+void migrate_fd_begin(void *opaque);
+
  void migrate_fd_put_ready(void *opaque);

  int migrate_fd_get_status(MigrationState *mig_state);

  void migrate_fd_cancel(MigrationState *mig_state);

+void migrate_fd_terminate(FdMigrationState *s);
+
  void migrate_fd_release(MigrationState *mig_state);

  void migrate_fd_wait_for_unfreeze(void *opaque);

--
To unsubscribe from this list: send the line "unsubscribe kvm" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[Index of Archives]     [KVM ARM]     [KVM ia64]     [KVM ppc]     [Virtualization Tools]     [Spice Development]     [Libvirt]     [Libvirt Users]     [Linux USB Devel]     [Linux Audio Users]     [Yosemite Questions]     [Linux Kernel]     [Linux SCSI]     [XFree86]
  Powered by Linux