[PATCH 3/3][virt] virt.virt_utils: Add framework for multihost migration

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



The multihost migration framework makes it easy to migrate
a guest under load between multiple hosts. This patch also
replaces the old multihost migration tests with versions
that use the framework.

The multihost migration framework takes care of:
  - preparing environment before migration
  - preparing guest for migration on source and dest host
  - start guest
  - start work on guest
  - migration between hosts
  - check work on destination host
  - close guest
  - postprocess environment after migration

The framework also allows multiple migrations to be started
independently at the same time, with multiple hosts and
different work on the guests.

Signed-off-by: Jiří Župka <jzupka@xxxxxxxxxx>
---
 client/tests/kvm/multi_host.srv                |  112 ++++--
 client/tests/kvm/tests/cpuflags.py             |  203 ++++------
 client/tests/kvm/tests/migration_multi_host.py |  105 +-----
 client/virt/base.cfg.sample                    |    3 +
 client/virt/subtests.cfg.sample                |   16 +-
 client/virt/virt_env_process.py                |   15 +-
 client/virt/virt_utils.py                      |  517 ++++++++++++++++++++++++
 client/virt/virt_vm.py                         |   53 +++-
 8 files changed, 749 insertions(+), 275 deletions(-)

diff --git a/client/tests/kvm/multi_host.srv b/client/tests/kvm/multi_host.srv
index a4bb20f..23c0b24 100644
--- a/client/tests/kvm/multi_host.srv
+++ b/client/tests/kvm/multi_host.srv
@@ -37,14 +37,22 @@ def generate_mac_address():
     return mac
 
 
-def run(pair):
-    logging.info("KVM test running on source host [%s] and destination "
-                 "host [%s]\n", pair[0], pair[1])
-
-    source = hosts.create_host(pair[0])
-    dest = hosts.create_host(pair[1])
-    source_at = autotest_remote.Autotest(source)
-    dest_at = autotest_remote.Autotest(dest)
+def run(machines):
+    logging.info("KVM test running on hosts %s\n", machines)
+    class Machines(object):
+        def __init__(self, host):
+            self.host = host
+            self.at = None
+            self.params = None
+            self.control = None
+
+    _hosts = {}
+    for machine in machines:
+        _hosts[machine] = Machines(hosts.create_host(machine))
+
+    ats = []
+    for host in _hosts.itervalues():
+        host.at = autotest_remote.Autotest(host.host)
 
     cfg_file = os.path.join(KVM_DIR, "multi-host-tests.cfg")
 
@@ -56,7 +64,10 @@ def run(pair):
     parser.parse_file(cfg_file)
     test_dicts = parser.get_dicts()
 
-    source_control_file = dest_control_file = """
+    ips = []
+    for machine in machines:
+        host = _hosts[machine]
+        host.control = """
 testname = "kvm"
 bindir = os.path.join(job.testdir, testname)
 job.install_pkg(testname, 'test', bindir)
@@ -64,21 +75,29 @@ job.install_pkg(testname, 'test', bindir)
 kvm_test_dir = os.path.join(os.environ['AUTODIR'],'tests', 'kvm')
 sys.path.append(kvm_test_dir)
 """
-    import sys
+        ips.append(host.host.ip)
 
     for params in test_dicts:
-        params['srchost'] = source.ip
-        params['dsthost'] = dest.ip
+        params['hosts'] = ips
+
+        params['not_preprocess'] = "yes"
+        for vm in params.get("vms").split():
+            for nic in params.get('nics',"").split():
+                params['nic_mac_%s_%s' % (nic, vm)] = generate_mac_address()
 
-        for nic in params.get('nics',"").split():
-            params['nic_mac_%s' % nic] = generate_mac_address()
+        params['master_images_clone'] = "image1"
+        params['kill_vm'] = "yes"
 
-        source_params = params.copy()
-        source_params['role'] = "source"
+        s_host = _hosts[machines[0]]
+        s_host.params = params.copy()
+        s_host.params['clone_master'] = "yes"
+        s_host.params['hostid'] = machines[0]
 
-        dest_params = params.copy()
-        dest_params['role'] = "destination"
-        dest_params['migration_mode'] = "tcp"
+        for machine in machines[1:]:
+            host = _hosts[machine]
+            host.params = params.copy()
+            host.params['clone_master'] = "no"
+            host.params['hostid'] = machine
 
         # Report the parameters we've received
         print "Test parameters:"
@@ -87,27 +106,34 @@ sys.path.append(kvm_test_dir)
         for key in keys:
             logging.debug("    %s = %s", key, params[key])
 
-        source_control_file += ("job.run_test('kvm', tag='%s', params=%s)" %
-                                (source_params['shortname'], source_params))
-        dest_control_file += ("job.run_test('kvm', tag='%s', params=%s)" %
-                              (dest_params['shortname'], dest_params))
-
-        logging.info('Source control file:\n%s', source_control_file)
-        logging.info('Destination control file:\n%s', dest_control_file)
-        dest_command = subcommand(dest_at.run,
-                                  [dest_control_file, dest.hostname])
-
-        source_command = subcommand(source_at.run,
-                                    [source_control_file, source.hostname])
-
-        parallel([dest_command, source_command])
-
-# Grab the pairs (and failures)
-(pairs, failures) = utils.form_ntuples_from_machines(machines, 2)
-
-# Log the failures
-for failure in failures:
-    job.record("FAIL", failure[0], "kvm", failure[1])
-
-# Now run through each pair and run
-job.parallel_simple(run, pairs, log=False)
+        for machine in machines:
+            host = _hosts[machine]
+            host.control += ("job.run_test('kvm', tag='%s', params=%s)" %
+                             (host.params['shortname'], host.params))
+
+        logging.info('Master control file:\n%s', _hosts[machines[0]].control)
+        for machine in machines[1:]:
+            host = _hosts[machine]
+            logging.info('Slave control file:\n%s', host.control)
+
+        commands = []
+        for machine in machines:
+            host = _hosts[machine]
+            commands.append(subcommand(host.at.run,
+                                       [host.control, host.host.hostname]))
+
+        parallel(commands)
+
+if 'all' in args:
+    # Run test with all machines at once.
+    run(machines)
+else:
+    # Grab the pairs (and failures)
+    (pairs, failures) = utils.form_ntuples_from_machines(machines, 2)
+
+    # Log the failures
+    for failure in failures:
+        job.record("FAIL", failure[0], "kvm", failure[1])
+
+    # Now run through each pair and run
+    job.parallel_simple(run, pairs, log=False)
diff --git a/client/tests/kvm/tests/cpuflags.py b/client/tests/kvm/tests/cpuflags.py
index 2e09f58..54e4ef9 100644
--- a/client/tests/kvm/tests/cpuflags.py
+++ b/client/tests/kvm/tests/cpuflags.py
@@ -656,13 +656,6 @@ def run_cpuflags(test, params, env):
             """
             Test migration between multiple hosts.
             """
-            def guest_active(vm):
-                o = vm.monitor.info("status")
-                if isinstance(o, str):
-                    return "status: running" in o
-                else:
-                    return o.get("status") == "running"
-
             cpu_model, extra_flags = parse_cpu_model()
 
             flags = HgFlags(cpu_model, extra_flags)
@@ -680,136 +673,80 @@ def run_cpuflags(test, params, env):
                 cpuf_model += ",-" + fdel
 
             install_path = "/tmp"
-            login_timeout = int(params.get("login_timeout", 360))
-            role = params.get("role")
-            srchost = params.get("srchost")
-            dsthost = params.get("dsthost")
-            # Port used to communicate info between source and destination
-            comm_port = int(params.get("comm_port", 12324))
-            comm_timeout = float(params.get("comm_timeout", "10"))
-            regain_ip_cmd = params.get("regain_ip_cmd", "dhclient")
-
-            if role == 'source':
-                (self.vm, session) = start_guest_with_cpuflags(cpuf_model,
-                                                               smp)
-
-                install_cpuflags_test_on_vm(self.vm, install_path)
-
-                Flags = check_cpuflags_work(self.vm, install_path,
-                                            flags.all_possible_guest_flags)
-                logging.info("Woking CPU flags: %s", str(Flags[0]))
-                logging.info("Not working CPU flags: %s", str(Flags[1]))
-                logging.warning("Flags works even if not deffined on"
-                                " guest cpu flags: %s",
-                                str(Flags[0] - flags.guest_flags))
-                logging.warning("Not tested CPU flags: %s", str(Flags[2]))
-
-                session.sendline("nohup dd if=/dev/[svh]da of=/tmp/"
-                                "stressblock bs=10MB count=100 &")
 
-                cmd = ("nohup %s/cpuflags-test --stress  %s%s &" %
-                      (os.path.join(install_path, "test_cpu_flags"), smp,
-                      virt_utils.kvm_flags_to_stresstests(Flags[0] &
-                                                    flags.guest_flags)))
-                logging.debug("Guest_flags: %s", str(flags.guest_flags))
-                logging.debug("Working_flags: %s", str(Flags[0]))
-                logging.debug("Start stress on guest: %s", cmd)
-                session.sendline(cmd)
-
-                # Listen on a port to get the migration port received from
-                # dest machine
-                s_socket = socket.socket(socket.AF_INET,
-                                         socket.SOCK_STREAM)
-                s_socket.bind(('', comm_port))
-                s_socket.listen(1)
-                s_socket.settimeout(comm_timeout)
-
-                # Wait 30 seconds for source and dest to reach this point
-                test.job.barrier(srchost,
-                                 'socket_started', 120).rendezvous(srchost,
-                                                                  dsthost)
-
-                c_socket = s_socket.accept()[0]
-
-                mig_port = int(c_socket.recv(6))
-                logging.info("Received from destination the"
-                             " migration port %s" % mig_port)
-                c_socket.close()
-
-                #Wait for start cpuflags-test stress.
-                time.sleep(10)
-                logging.info("Start migrating now...")
-                self.vm.monitor.migrate_set_speed(mig_speed)
-                self.vm.migrate(dest_host=dsthost, remote_port=mig_port)
-
-                # Wait up to 30 seconds for dest to reach this point
-                test.job.barrier(srchost,
-                                 'mig_finished', 30).rendezvous(srchost,
-                                                                dsthost)
-
-            elif role == 'destination':
-                # Wait up to login_timeout + 30 seconds for the source to
-                # reach this point
-                (self.vm, _) = start_guest_with_cpuflags(cpuf_model,
-                                                         smp,
-                                                         True,
-                                                         False)
-
-                test.job.barrier(dsthost, 'socket_started',
-                                 login_timeout + 120).rendezvous(srchost,
-                                                                dsthost)
-
-                c_socket = socket.socket(socket.AF_INET,
-                                         socket.SOCK_STREAM)
-                c_socket.settimeout(comm_timeout)
-                c_socket.connect((srchost, comm_port))
-
-                logging.info("Communicating to source migration"
-                             " port %s" % self.vm.migration_port)
-                c_socket.send("%d" % self.vm.migration_port)
-                c_socket.close()
-
-                # Wait up to mig_timeout + 30 seconds for the source to
-                # reach this point: migration finished
-                test.job.barrier(dsthost, 'mig_finished',
-                                 mig_timeout + 30).rendezvous(srchost,
-                                                              dsthost)
-
-                if not guest_active(self.vm):
-                    raise error.TestFail("Guest not active after"
-                                         " migration")
-
-                logging.info("Migrated guest appears to be running")
-
-                # Log into the guest again
-                logging.info("Logging into migrated guest after"
-                             " migration...")
-                session_serial = self.vm.wait_for_serial_login(
-                                                    timeout=login_timeout)
-                session_serial.cmd(regain_ip_cmd)
-
-                self.vm.verify_illegal_instructonn()
-
-                session = self.vm.wait_for_login(timeout=login_timeout)
+            class testMultihostMigration(virt_utils.MultihostMigration):
+                def __init__(self, test, params, env):
+                    super(testMultihostMigration, self).__init__(test,
+                                                                 params,
+                                                                 env)
 
-                try:
-                    session.cmd('killall cpuflags-test')
-                except aexpect.ShellCmdError:
-                    raise error.TestFail("The cpuflags-test program should"
-                                         " be active after migration and"
-                                         " it's not.")
+                def migration_scenario(self):
+                    srchost = self.params.get("hosts")[0]
+                    dsthost = self.params.get("hosts")[1]
 
-                Flags = check_cpuflags_work(self.vm, install_path,
-                                            flags.all_possible_guest_flags)
-                logging.info("Woking CPU flags: %s", str(Flags[0]))
-                logging.info("Not working CPU flags: %s", str(Flags[1]))
-                logging.warning("Flags works even if not deffined on"
-                                " guest cpu flags: %s",
-                                str(Flags[0] - flags.guest_flags))
-                logging.warning("Not tested CPU flags: %s", str(Flags[2]))
+                    def worker(mig_data):
+                        vm = env.get_vm("vm1")
+                        session = vm.wait_for_login(timeout=self.login_timeout)
 
-            else:
-                raise error.TestError('Invalid role specified')
+                        install_cpuflags_test_on_vm(vm, install_path)
+
+                        Flags = check_cpuflags_work(vm, install_path,
+                                            flags.all_possible_guest_flags)
+                        logging.info("Woking CPU flags: %s", str(Flags[0]))
+                        logging.info("Not working CPU flags: %s",
+                                     str(Flags[1]))
+                        logging.warning("Flags works even if not deffined on"
+                                        " guest cpu flags: %s",
+                                        str(Flags[0] - flags.guest_flags))
+                        logging.warning("Not tested CPU flags: %s",
+                                        str(Flags[2]))
+                        session.sendline("nohup dd if=/dev/[svh]da of=/tmp/"
+                                         "stressblock bs=10MB count=100 &")
+
+                        cmd = ("nohup %s/cpuflags-test --stress  %s%s &" %
+                               (os.path.join(install_path, "test_cpu_flags"),
+                                smp,
+                                virt_utils.kvm_flags_to_stresstests(Flags[0] &
+                                                        flags.guest_flags)))
+                        logging.debug("Guest_flags: %s",
+                                      str(flags.guest_flags))
+                        logging.debug("Working_flags: %s", str(Flags[0]))
+                        logging.debug("Start stress on guest: %s", cmd)
+                        session.sendline(cmd)
+
+                    def check_worker(mig_data):
+                        vm = env.get_vm("vm1")
+
+                        vm.verify_illegal_instruction()
+
+                        session = vm.wait_for_login(timeout=self.login_timeout)
+
+                        try:
+                            session.cmd('killall cpuflags-test')
+                        except aexpect.ShellCmdError:
+                            raise error.TestFail("The cpuflags-test program"
+                                                 " should be active after"
+                                                 " migration and it's not.")
+
+                        Flags = check_cpuflags_work(vm, install_path,
+                                                flags.all_possible_guest_flags)
+                        logging.info("Woking CPU flags: %s",
+                                     str(Flags[0]))
+                        logging.info("Not working CPU flags: %s",
+                                     str(Flags[1]))
+                        logging.warning("Flags works even if not deffined on"
+                                        " guest cpu flags: %s",
+                                        str(Flags[0] - flags.guest_flags))
+                        logging.warning("Not tested CPU flags: %s",
+                                        str(Flags[2]))
+
+                    self.migrate_wait(["vm1"], srchost, dsthost,
+                                      worker, check_worker)
+
+            params_b = params.copy()
+            params_b["cpu_model"] = cpu_model
+            mig = testMultihostMigration(test, params_b, env)
+            mig.run()
 
     test_type = params.get("test_type")
     if (test_type in locals()):
diff --git a/client/tests/kvm/tests/migration_multi_host.py b/client/tests/kvm/tests/migration_multi_host.py
index 30e3ecc..fe6e29a 100644
--- a/client/tests/kvm/tests/migration_multi_host.py
+++ b/client/tests/kvm/tests/migration_multi_host.py
@@ -1,107 +1,26 @@
-import logging, socket
-from autotest_lib.client.common_lib import error
+from autotest_lib.client.virt import virt_utils
 
 
 def run_migration_multi_host(test, params, env):
     """
     KVM multi-host migration test:
 
-    Migration execution progress:
-
-    source host                       dest host
-    ----------------------------------------------------------------------------
-    log into guest
-    ----------------------------------------------------------------------------
-    start socket server
-
-    wait 30 secs -------------------- wait login_timeout+30 secs ---------------
-
-    accept connection                 connect to socket server,send mig_port
-    ----------------------------------------------------------------------------
-    start migration
-
-    wait 30 secs -------------------- wait mig_timeout+30 secs -----------------
-
-    try to log into migrated guest    check VM's status via monitor cmd
-    ----------------------------------------------------------------------------
+    Migration execution progress is described in documentation
+    for migrate method in class MultihostMigration.
 
     @param test: kvm test object.
     @param params: Dictionary with test parameters.
     @param env: Dictionary with the test environment.
     """
-    def guest_active(vm):
-        o = vm.monitor.info("status")
-        if isinstance(o, str):
-            return "status: running" in o
-        else:
-            return o.get("status") == "running"
-
-    vm = env.get_vm(params["main_vm"])
-    vm.verify_alive()
-    login_timeout = int(params.get("login_timeout", 360))
-    role = params.get("role")
-    srchost = params.get("srchost")
-    dsthost = params.get("dsthost")
-    mig_timeout = int(params.get("mig_timeout"))
-    # Port used to communicate info between source and destination
-    comm_port = int(params.get("comm_port", 12324))
-    regain_ip_cmd = params.get("regain_ip_cmd", "dhclient")
-    if role == 'source':
-        session = vm.wait_for_login(timeout=login_timeout)
-
-        # Listen on a port to get the migration port received from
-        # dest machine
-        s_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        s_socket.bind(('', comm_port))
-        s_socket.listen(1)
-
-        # Wait 30 seconds for source and dest to reach this point
-        test.job.barrier(srchost, 'socket_started', 30).rendezvous(srchost,
-                                                                   dsthost)
-
-        c_socket = s_socket.accept()[0]
-        mig_port = int(c_socket.recv(6))
-        logging.info("Received from destination the migration port %s",
-                     mig_port)
-        c_socket.close()
-
-        logging.info("Start migrating now...")
-        vm.migrate(dest_host=dsthost, remote_port=mig_port)
-
-        # Wait up to 30 seconds for dest to reach this point
-        test.job.barrier(srchost, 'mig_finished', 30).rendezvous(srchost,
-                                                                 dsthost)
-
-    elif role == 'destination':
-        # Wait up to login_timeout + 30 seconds for the source to
-        # reach this point
-        test.job.barrier(dsthost, 'socket_started',
-                         login_timeout + 30).rendezvous(srchost,
-                                                        dsthost)
-
-        c_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        c_socket.connect((srchost, comm_port))
-        logging.info("Communicating to source migration port %s",
-                     vm.migration_port)
-        c_socket.send("%d" % vm.migration_port)
-        c_socket.close()
-
-        # Wait up to mig_timeout + 30 seconds for the source to
-        # reach this point: migration finished
-        test.job.barrier(dsthost, 'mig_finished',
-                         mig_timeout + 30).rendezvous(srchost,
-                                                      dsthost)
-
-        if not guest_active(vm):
-            raise error.TestFail("Guest not active after migration")
+    class TestMultihostMigration(virt_utils.MultihostMigration):
+        def __init__(self, test, params, env):
+            super(TestMultihostMigration, self).__init__(test, params, env)
 
-        logging.info("Migrated guest appears to be running")
+        def migration_scenario(self):
+            srchost = self.params.get("hosts")[0]
+            dsthost = self.params.get("hosts")[1]
 
-        # Log into the guest again
-        logging.info("Logging into migrated guest after migration...")
-        session_serial = vm.wait_for_serial_login(timeout=login_timeout)
-        session_serial.cmd(regain_ip_cmd)
-        session = vm.wait_for_login(timeout=login_timeout)
+            self.migrate_wait(["vm1"], srchost, dsthost)
 
-    else:
-        raise error.TestError('Invalid role specified')
+    mig = TestMultihostMigration(test, params, env)
+    mig.run()
diff --git a/client/virt/base.cfg.sample b/client/virt/base.cfg.sample
index 5e0cea8..0ab9092 100644
--- a/client/virt/base.cfg.sample
+++ b/client/virt/base.cfg.sample
@@ -241,3 +241,6 @@ netdev_peer_re = "\s{2,}(.*?): .*?\\\s(.*?):"
 # netdev_peer_re = "\s{2,}(.*?): .*?\s{2,}(.*?):"
 # for RHEL6 host, the regex should be:
 # netdev_peer_re = "\s{2,}(.*?):.*?peer=(.*?)\n"
+
+image_clone_commnad = 'cp --reflink %s %s'
+image_remove_commnad = 'rm -rf %s'
diff --git a/client/virt/subtests.cfg.sample b/client/virt/subtests.cfg.sample
index a5eac01..9fc60ed 100644
--- a/client/virt/subtests.cfg.sample
+++ b/client/virt/subtests.cfg.sample
@@ -527,6 +527,8 @@ variants:
 
     - migrate_multi_host: install setup image_copy unattended_install.cdrom
         type = migration_multi_host
+        vms = "vm1"
+        start_vm = no
         migration_test_command = help
         migration_bg_command = "cd /tmp; nohup tcpdump -q -i any -t ip host localhost"
         migration_bg_check_command = pgrep tcpdump
@@ -534,9 +536,10 @@ variants:
         kill_vm_on_error = yes
         iterations = 2
         used_mem = 1024
-        mig_timeout = 3600
+        mig_timeout = 4800
+        disk_prepare_timeout = 360
         comm_port = 13234
-        regain_ip_cmd = dhclient
+        regain_ip_cmd = killall dhclient; sleep 10; dhclient;
 
     - boot_savevm: install setup image_copy unattended_install.cdrom
         type = boot_savevm
@@ -1695,13 +1698,18 @@ variants:
     - cpuflags_multi_host:
         type = cpuflags
         test_type = test_multi_host_migration
-        #Disable all unnecessary vms.
-        vms = ""
+        vms = "vm1"
+        start_vm = no
         #Try to start guest with all flags which are supported by host.
         all_host_supported_flags = "no"
         cpu_model = "core2duo:sse3"
         guest_spec_flags = "fxsr_opt hypervisor ds pdpe1gb osxsave svm"
         host_spec_flags = "pbe tm ds_cpl monitor acpi dtes64 ht tm2 xtpr est pdcm smx"
+        mig_timeout = 4800
+        kill_vm_on_error = yes
+        disk_prepare_timeout = 360
+        comm_port = 13234
+        regain_ip_cmd = killall dhclient; sleep 10; dhclient;
 
     - cpu_hotplug_test:
         type = cpu_hotplug
diff --git a/client/virt/virt_env_process.py b/client/virt/virt_env_process.py
index d5ef166..48ebec9 100644
--- a/client/virt/virt_env_process.py
+++ b/client/virt/virt_env_process.py
@@ -320,8 +320,21 @@ def preprocess(test, params, env):
                         int(params.get("pre_command_timeout", "600")),
                         params.get("pre_command_noncritical") == "yes")
 
+    #Clone master image from vms.
+    if params.get("master_images_clone"):
+        for vm_name in params.get("vms").split():
+            vm = env.get_vm(vm_name)
+            if vm:
+                vm.destroy(free_mac_addresses=False)
+                env.unregister_vm(vm_name)
+
+            vm_params = params.object_params(vm_name)
+            for image in vm_params.get("master_images_clone").split():
+                virt_vm.clone_image(params, vm_name, image, test.bindir)
+
     # Preprocess all VMs and images
-    process(test, params, env, preprocess_image, preprocess_vm)
+    if params.get("not_preprocess","no") == "no":
+        process(test, params, env, preprocess_image, preprocess_vm)
 
     # Start the screendump thread
     if params.get("take_regular_screendumps") == "yes":
diff --git a/client/virt/virt_utils.py b/client/virt/virt_utils.py
index 4936fa4..07b2f06 100644
--- a/client/virt/virt_utils.py
+++ b/client/virt/virt_utils.py
@@ -10,6 +10,8 @@ import struct, shutil, glob
 from autotest_lib.client.bin import utils, os_dep
 from autotest_lib.client.common_lib import error, logging_config
 from autotest_lib.client.common_lib import logging_manager, git
+from autotest_lib.client.virt import virt_env_process, virt_vm
+from autotest_lib.client.common_lib.syncdata import SyncData, SyncListenServer
 import rss_client, aexpect
 import platform
 
@@ -242,6 +244,34 @@ class Env(UserDict.IterableUserDict):
         del self["vm__%s" % name]
 
 
+    def register_syncserver(self, port, server):
+        """
+        Register a Sync Server in this Env object.
+
+        @param port: Sync Server port.
+        @param server: Sync Server object.
+        """
+        self["sync__%s" % port] = server
+
+
+    def unregister_syncserver(self, port):
+        """
+        Remove a given Sync Server.
+
+        @param port: Sync Server port.
+        """
+        del self["sync__%s" % port]
+
+
+    def get_syncserver(self, port):
+        """
+        Return a Sync Server object by its port.
+
+        @param port: Sync Server port.
+        """
+        return self.get("sync__%s" % port)
+
+
     def register_installer(self, installer):
         """
         Register a installer that was just run
@@ -3701,3 +3731,490 @@ class NumaNode(object):
         logging.info("Numa Node record dict:")
         for i in self.cpus:
             logging.info("    %s: %s" % (i, self.dict[i]))
+
+
+def generate_mac_address_simple():
+    r = random.SystemRandom()
+    mac = "9a:%02x:%02x:%02x:%02x:%02x" % (r.randint(0x00, 0xff),
+                                           r.randint(0x00, 0xff),
+                                           r.randint(0x00, 0xff),
+                                           r.randint(0x00, 0xff),
+                                           r.randint(0x00, 0xff))
+    return mac
+
+
+def guest_active(vm):
+    o = vm.monitor.info("status")
+    if isinstance(o, str):
+        return "status: running" in o
+    else:
+        return o.get("status") == "running"
+
+
+def preprocess_images(bindir, params, env):
+    # Clone the master image for each vm.
+    for vm_name in params.get("vms").split():
+        vm = env.get_vm(vm_name)
+        if vm:
+            vm.destroy(free_mac_addresses=False)
+        vm_params = params.object_params(vm_name)
+        for image in vm_params.get("master_images_clone").split():
+            virt_vm.clone_image(params, vm_name, image, bindir)
+
+
+def postprocess_images(bindir, params):
+    for vm in params.get("vms").split():
+        vm_params = params.object_params(vm)
+        for image in vm_params.get("master_images_clone").split():
+            virt_vm.rm_image(params, vm, image, bindir)
+
+
+class MigrationData(object):
+    def __init__(self, params, srchost, dsthost, vms_name):
+        """
+        Class that contains data needed for one migration.
+        """
+        self.params = params
+
+        self.source = False
+        if params.get("hostid") == srchost:
+            self.source = True
+
+        self.destination = False
+        if params.get("hostid") == dsthost:
+            self.destination = True
+
+        self.src = srchost
+        self.dst = dsthost
+        self.hosts = [srchost, dsthost]
+        self.mig_id = {'src': srchost, 'dst': dsthost, "vms": vms_name}
+        self.vms_name = vms_name
+        self.vms = []
+        self.vm_ports = None
+
+
+    def is_src(self):
+        """
+        @return: True if host is source.
+        """
+        return self.source
+
+
+    def is_dst(self):
+        """
+        @return: True if host is destination.
+        """
+        return self.destination
+
+
+class MultihostMigration(object):
+    """
+    Class that provides a framework for multi-host migration.
+
+    Migration can be run both synchronously and asynchronously.
+    To specify what is going to happen during the multi-host
+    migration, it is necessary to reimplement the method
+    migration_scenario. It is possible to start multiple migrations
+    in separate threads, since self.migrate is thread safe.
+
+    Only one test using multihost migration framework should be
+    started on one machine otherwise it is necessary to solve the
+    problem with listen server port.
+
+    Multihost migration starts SyncListenServer through which
+    all messages are transferred, since the multiple hosts can
+    be in different states.
+
+    Class SyncData is used to transfer data over network or
+    synchronize the migration process. Synchronization sessions
+    are recognized by session_id.
+
+    It is important to note that, in order to have multi-host
+    migration, one needs shared guest image storage. The simplest
+    case is when the guest images are on an NFS server.
+
+    Example:
+        class TestMultihostMigration(virt_utils.MultihostMigration):
+            def __init__(self, test, params, env):
+                super(TestMultihostMigration, self).__init__(test, params, env)
+
+            def migration_scenario(self):
+                srchost = self.params.get("hosts")[0]
+                dsthost = self.params.get("hosts")[1]
+
+                def worker(mig_data):
+                    vm = env.get_vm("vm1")
+                    session = vm.wait_for_login(timeout=self.login_timeout)
+                    session.sendline("nohup dd if=/dev/zero of=/dev/null &")
+                    session.cmd("killall -0 dd")
+
+                def check_worker(mig_data):
+                    vm = env.get_vm("vm1")
+                    session = vm.wait_for_login(timeout=self.login_timeout)
+                    session.cmd("killall -9 dd")
+
+                # Almost synchronized migration, waiting to end it.
+                # Work is started only on first VM.
+                self.migrate_wait(["vm1", "vm2"], srchost, dsthost,
+                                  worker, check_worker)
+
+                # Migration started in different threads.
+                # It allows to start multiple migrations simultaneously.
+                mig1 = self.migrate(["vm1"], srchost, dsthost,
+                                    worker, check_worker)
+                mig2 = self.migrate(["vm2"], srchost, dsthost)
+                mig2.join()
+                mig1.join()
+
+    mig = TestMultihostMigration(test, params, env)
+    mig.run()
+    """
+    def __init__(self, test, params, env, preprocess_env=True):
+        self.test = test
+        self.params = params
+        self.env = env
+        self.hosts = params.get("hosts")
+        self.hostid = params.get('hostid', "")
+        self.comm_port = int(params.get("comm_port", 13234))
+        vms_count = len(params["vms"].split())
+
+        self.login_timeout = int(params.get("login_timeout", 360))
+        self.disk_prepare_timeout = int(params.get("disk_prepare_timeout",
+                                              160 * vms_count))
+        self.finish_timeout = int(params.get("finish_timeout",
+                                              120 * vms_count))
+
+        self.new_params = None
+
+        if params.get("clone_master") == "yes":
+            self.clone_master = True
+        else:
+            self.clone_master = False
+
+        self.mig_timeout = int(params.get("mig_timeout"))
+        # Command run in the guest after migration to renew its IP address.
+        self.regain_ip_cmd = params.get("regain_ip_cmd", "dhclient")
+
+        self.vm_lock = threading.Lock()
+
+        self.sync_server = None
+        if self.clone_master:
+            self.sync_server = SyncListenServer(test.job.resultdir)
+
+        if preprocess_env:
+            self.preprocess_env()
+            self._hosts_barrier(self.hosts, self.hosts, 'disk_prepared',
+                                 self.disk_prepare_timeout)
+
+
+    def migration_scenario(self):
+        """
+        Multi Host migration_scenario is started from method run where the
+        exceptions are checked. It is not necessary to take care of
+        cleaning up after test crash or finish.
+        """
+        raise NotImplementedError
+
+
+    def migrate_vms_src(self, mig_data):
+        """
+        Migrate vms source.
+
+        @param mig_data: Data for migration.
+
+        To change the way machines are migrated it is necessary
+        to reimplement this method.
+        """
+        def mig_wrapper(vm, dsthost, vm_ports):
+            vm.migrate(dest_host=dsthost, remote_port=vm_ports[vm.name])
+
+        logging.info("Start migrating now...")
+        multi_mig = []
+        for vm in mig_data.vms:
+            multi_mig.append((mig_wrapper, (vm, mig_data.dst,
+                                            mig_data.vm_ports)))
+        parallel(multi_mig)
+
+
+    def migrate_vms_dest(self, mig_data):
+        """
+        Migrate vms destination. This function is started on dest host during
+        migration.
+
+        @param mig_data: Data for migration.
+        """
+        pass
+
+
+    def __del__(self):
+        if self.sync_server:
+            self.sync_server.close()
+
+
+    def master_id(self):
+        return self.hosts[0]
+
+
+    def _hosts_barrier(self, hosts, session_id, tag, timeout):
+        logging.debug("Barrier timeout: %d tags: %s" % (timeout, tag))
+        tags = SyncData(self.master_id(), self.hostid, hosts,
+                        "%s,%s,barrier" % (str(session_id), tag),
+                        self.sync_server).sync(tag, timeout)
+        logging.debug("Barrier tag %s" % (tags))
+
+
+    def preprocess_env(self):
+        """
+        Prepare env to start vms.
+        """
+        preprocess_images(self.test.bindir, self.params, self.env)
+
+
+    def _check_vms_source(self, mig_data):
+        for vm in mig_data.vms:
+            vm.wait_for_login(timeout=self.login_timeout)
+
+        sync = SyncData(self.master_id(), self.hostid, mig_data.hosts,
+                        mig_data.mig_id, self.sync_server)
+        mig_data.vm_ports = sync.sync(timeout=120)[mig_data.dst]
+        logging.info("Received from destination the migration port %s",
+                     str(mig_data.vm_ports))
+
+
+    def _check_vms_dest(self, mig_data):
+        mig_data.vm_ports = {}
+        for vm in mig_data.vms:
+            logging.info("Communicating to source migration port %s",
+                         vm.migration_port)
+            mig_data.vm_ports[vm.name] = vm.migration_port
+
+        SyncData(self.master_id(), self.hostid,
+                 mig_data.hosts, mig_data.mig_id,
+                 self.sync_server).sync(mig_data.vm_ports, timeout=120)
+
+
+    def _prepare_params(self, mig_data):
+        """
+        Prepare separate params for vm migration.
+
+        @param mig_data: Data for migration; mig_data.vms_name lists the vms.
+        """
+        new_params = mig_data.params.copy()
+        new_params["vms"] = " ".join(mig_data.vms_name)
+        return new_params
+
+
+    def _check_vms(self, mig_data):
+        """
+        Check if vms are started correctly.
+
+        @param mig_data: Data for migration; mig_data.vms is filled from env
+                         and the migration ports are synced between hosts.
+        """
+        logging.info("Try check vms %s" % (mig_data.vms_name))
+        for vm in mig_data.vms_name:
+            if not self.env.get_vm(vm) in mig_data.vms:
+                mig_data.vms.append(self.env.get_vm(vm))
+        for vm in mig_data.vms:
+            logging.info("Check vm %s on host %s" % (vm.name, self.hostid))
+            vm.verify_alive()
+
+        if mig_data.is_src():
+            self._check_vms_source(mig_data)
+        else:
+            self._check_vms_dest(mig_data)
+
+
+    def prepare_for_migration(self, mig_data, migration_mode):
+        """
+        Prepare this host's vms for migration (source or destination side).
+
+        @param mig_data: Class with data necessary for migration.
+        @param migration_mode: Migration mode used to prepare the vms.
+        """
+        new_params = self._prepare_params(mig_data)
+
+        new_params['migration_mode'] = migration_mode
+        new_params['start_vm'] = 'yes'
+        self.vm_lock.acquire()
+        virt_env_process.process(self.test, new_params, self.env,
+                                 virt_env_process.preprocess_image,
+                                 virt_env_process.preprocess_vm)
+        self.vm_lock.release()
+
+        self._check_vms(mig_data)
+
+
+    def migrate_vms(self, mig_data):
+        """
+        Migrate vms.
+        """
+        if mig_data.is_src():
+            self.migrate_vms_src(mig_data)
+        else:
+            self.migrate_vms_dest(mig_data)
+
+
+    def check_vms(self, mig_data):
+        """
+        Check vms after migrate.
+
+        @param mig_data: object with migration data.
+        """
+        for vm in mig_data.vms:
+            if not guest_active(vm):
+                raise error.TestFail("Guest not active after migration")
+
+        logging.info("Migrated guest appears to be running")
+
+        logging.info("Logging into migrated guest after migration...")
+        for vm in mig_data.vms:
+            session_serial = vm.wait_for_serial_login(timeout=
+                                                      self.login_timeout)
+            # Sometimes the system prints messages on the serial console
+            # and the IP renew command blocks the test. Because of this
+            # a "sleep" must be added to the IP renew command.
+            session_serial.cmd(self.regain_ip_cmd)
+            vm.wait_for_login(timeout=self.login_timeout)
+
+
+    def postprocess_env(self):
+        """
+        Delete cloned images.
+        """
+        postprocess_images(self.test.bindir, self.params)
+
+
+    def migrate(self, vms_name, srchost, dsthost, start_work=None,
+                check_work=None):
+        """
+        Migrate machine from srchost to dsthost. It executes start_work on
+        source machine before migration and executes check_work on dsthost
+        after migration.
+
+        Migration execution progress:
+
+        source host                   |   dest host
+        --------------------------------------------------------
+           prepare guest on both sides of migration
+            - start machine and check if machine works
+            - synchronize transfer data needed for migration
+        --------------------------------------------------------
+        start work on source guests   |   wait for migration
+        --------------------------------------------------------
+                     migrate guest to dest host.
+              wait on finish migration synchronization
+        --------------------------------------------------------
+                                      |   check work on vms
+        --------------------------------------------------------
+                    wait for sync on finish migration
+
+        @param vms_name: List of vms.
+        @param srchost: src host id.
+        @param dsthost: dst host id.
+        @param start_work: Function started before migration.
+        @param check_work: Function started after migration.
+        """
+        def migrate_wrap(vms_name, srchost, dsthost, start_work=None,
+                check_work=None):
+            logging.info("Starting migrate vms %s from host %s to %s" %
+                         (vms_name, srchost, dsthost))
+            error = None
+            mig_data = MigrationData(self.params, srchost, dsthost, vms_name)
+            try:
+                try:
+                    if mig_data.is_src():
+                        self.prepare_for_migration(mig_data, None)
+                    elif self.hostid == dsthost:
+                        self.prepare_for_migration(mig_data, "tcp")
+                    else:
+                        return
+
+                    if mig_data.is_src():
+                        if start_work:
+                            start_work(mig_data)
+
+                    self.migrate_vms(mig_data)
+
+                    timeout = 30
+                    if not mig_data.is_src():
+                        timeout = self.mig_timeout
+                    self._hosts_barrier(mig_data.hosts, mig_data.mig_id,
+                                        'mig_finished', timeout)
+
+                    if mig_data.is_dst():
+                        self.check_vms(mig_data)
+                        if check_work:
+                            check_work(mig_data)
+
+                except:
+                    error = True
+                    raise
+            finally:
+                if not error:
+                    self._hosts_barrier(self.hosts,
+                                        mig_data.mig_id,
+                                        'test_finihed',
+                                        self.finish_timeout)
+
+        def wait_wrap(vms_name, srchost, dsthost):
+            mig_data = MigrationData(self.params, srchost, dsthost, vms_name)
+            timeout = (self.login_timeout + self.mig_timeout +
+                       self.finish_timeout)
+
+            self._hosts_barrier(self.hosts, mig_data.mig_id,
+                                'test_finihed', timeout)
+
+        if (self.hostid in [srchost, dsthost]):
+            mig_thread = utils.InterruptedThread(migrate_wrap, (vms_name,
+                                                                srchost,
+                                                                dsthost,
+                                                                start_work,
+                                                                check_work))
+        else:
+            mig_thread = utils.InterruptedThread(wait_wrap, (vms_name,
+                                                             srchost,
+                                                             dsthost))
+        mig_thread.start()
+        return mig_thread
+
+
+    def migrate_wait(self, vms_name, srchost, dsthost, start_work=None,
+                     check_work=None):
+        """
+        Migrate machine from srchost to dsthost and wait for finish.
+        It executes start_work on source machine before migration and executes
+        check_work on dsthost after migration.
+
+        @param vms_name: List of vms.
+        @param srchost: src host id.
+        @param dsthost: dst host id.
+        @param start_work: Function which is started before migration.
+        @param check_work: Function which is started after the
+                           migration is done.
+        """
+        self.migrate(vms_name, srchost, dsthost, start_work, check_work).join()
+
+
+    def cleanup(self):
+        """
+        Cleanup env after test.
+        """
+        if self.clone_master:
+            self.sync_server.close()
+            self.postprocess_env()
+
+
+    def run(self):
+        """
+        Start multihost migration scenario.
+        After the scenario finishes or crashes, it postprocesses the
+        machines and cleans up the env.
+        """
+        try:
+            self.migration_scenario()
+
+            self._hosts_barrier(self.hosts, self.hosts, 'all_test_finihed',
+                                self.finish_timeout)
+        finally:
+            self.cleanup()
diff --git a/client/virt/virt_vm.py b/client/virt/virt_vm.py
index 06db7a6..9df6396 100644
--- a/client/virt/virt_vm.py
+++ b/client/virt/virt_vm.py
@@ -323,6 +323,57 @@ def get_image_filename(params, root_dir):
     return image_filename
 
 
+def clone_image(params, vm_name, image_name, root_dir):
+    """
+    Clone master image to vm specific file.
+
+    @param params: Dictionary containing the test parameters.
+    @param vm_name: Vm name.
+    @param image_name: Master image name.
+    @param root_dir: Base directory for relative filenames.
+    """
+    if not params.get("image_name_%s_%s" % (image_name, vm_name)):
+        m_image_name = params.get("image_name", "image")
+        vm_image_name = "%s_%s" % (m_image_name, vm_name)
+        if params.get("clone_master", "yes") == "yes":
+            image_params = params.object_params(image_name)
+            image_params["image_name"] = vm_image_name
+
+            m_image_fn = get_image_filename(params, root_dir)
+            image_fn = get_image_filename(image_params, root_dir)
+
+            logging.info("Clone master image for vms.")
+            utils.run(params.get("image_clone_commnad") % (m_image_fn,
+                                                           image_fn))
+
+        params["image_name_%s_%s" % (image_name, vm_name)] = vm_image_name
+
+
+def rm_image(params, vm_name, image_name, root_dir):
+    """
+    Remove vm specific file.
+
+    @param params: Dictionary containing the test parameters.
+    @param vm_name: Vm name.
+    @param image_name: Master image name.
+    @param root_dir: Base directory for relative filenames.
+    """
+    if params.get("image_name_%s_%s" % (image_name, vm_name)):
+        m_image_name = params.get("image_name", "image")
+        vm_image_name = "%s_%s" % (m_image_name, vm_name)
+        if params.get("clone_master", "yes") == "yes":
+            image_params = params.object_params(image_name)
+            image_params["image_name"] = vm_image_name
+
+            image_fn = get_image_filename(image_params, root_dir)
+
+            logging.debug("Removing vm specific image file %s", image_fn)
+            if os.path.exists(image_fn):
+                utils.run(params.get("image_remove_commnad") % (image_fn))
+            else:
+                logging.debug("Image file %s not found", image_fn)
+
+
 def create_image(params, root_dir):
     """
     Create an image using qemu_image.
@@ -685,7 +736,7 @@ class BaseVM(object):
                 raise VMDeadKernelCrashError(match.group(0))
 
 
-    def verify_illegal_instructonn(self):
+    def verify_illegal_instruction(self):
         """
         Find illegal instruction code on VM serial console output.
 
-- 
1.7.7.6

--
To unsubscribe from this list: send the line "unsubscribe kvm" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[Index of Archives]     [KVM ARM]     [KVM ia64]     [KVM ppc]     [Virtualization Tools]     [Spice Development]     [Libvirt]     [Libvirt Users]     [Linux USB Devel]     [Linux Audio Users]     [Yosemite Questions]     [Linux Kernel]     [Linux SCSI]     [XFree86]
  Powered by Linux