Multihost migration framework makes multi host migration guest with load easy. This patch also replaces old tests for multihost migration with version which using the framework. Multihost miration framework take care about: - preparing environment before migration - preparing guest for migration on source and dest host - start guest - start work on guest - migration between hosts - check work on destination host - close guest - postprocess environment after migraiton The framework also allow start multiple migraiton independently in some time with multiple hosts and different work on guests. Signed-off-by: Jiří Župka <jzupka@xxxxxxxxxx> --- client/tests/kvm/multi_host.srv | 94 +++-- client/tests/kvm/tests/cpuflags.py | 203 ++++------- client/tests/kvm/tests/migration_multi_host.py | 105 +----- client/virt/base.cfg.sample | 3 + client/virt/subtests.cfg.sample | 16 +- client/virt/virt_env_process.py | 15 +- client/virt/virt_utils.py | 506 ++++++++++++++++++++++++ client/virt/virt_vm.py | 51 +++ 8 files changed, 726 insertions(+), 267 deletions(-) diff --git a/client/tests/kvm/multi_host.srv b/client/tests/kvm/multi_host.srv index a4bb20f..c661253 100644 --- a/client/tests/kvm/multi_host.srv +++ b/client/tests/kvm/multi_host.srv @@ -37,14 +37,22 @@ def generate_mac_address(): return mac -def run(pair): - logging.info("KVM test running on source host [%s] and destination " - "host [%s]\n", pair[0], pair[1]) - - source = hosts.create_host(pair[0]) - dest = hosts.create_host(pair[1]) - source_at = autotest_remote.Autotest(source) - dest_at = autotest_remote.Autotest(dest) +def run(machines): + logging.info("KVM test running on hosts %s\n", machines) + class Machines(object): + def __init__(self, host): + self.host = host + self.at = None + self.params = None + self.control = None + + _hosts = {} + for machine in machines: + _hosts[machine] = Machines(hosts.create_host(machine)) + + ats = [] + for host in _hosts.itervalues(): + host.at = autotest_remote.Autotest(host.host) cfg_file = os.path.join(KVM_DIR, "multi-host-tests.cfg") @@ -56,7 +64,9 @@ def run(pair): parser.parse_file(cfg_file) test_dicts = parser.get_dicts() - source_control_file = dest_control_file = """ + ips = [] + for host in _hosts.itervalues(): + host.control = """ testname = "kvm" bindir = os.path.join(job.testdir, testname) job.install_pkg(testname, 'test', bindir) @@ -64,21 +74,29 @@ job.install_pkg(testname, 'test', bindir) kvm_test_dir = os.path.join(os.environ['AUTODIR'],'tests', 'kvm') sys.path.append(kvm_test_dir) """ + ips.append(host.host.ip) import sys for params in test_dicts: - params['srchost'] = source.ip - params['dsthost'] = dest.ip + params['hosts'] = ips + + params['not_preprocess'] = "yes" + for vm in params.get("vms").split(): + for nic in params.get('nics',"").split(): + params['nic_mac_%s_%s' % (nic, vm)] = generate_mac_address() - for nic in params.get('nics',"").split(): - params['nic_mac_%s' % nic] = generate_mac_address() + params['mater_images_clone'] = "image1" + params['kill_vm'] = "yes" - source_params = params.copy() - source_params['role'] = "source" + s_host = _hosts[machines[0]] + s_host.params = params.copy() + s_host.params['clone_master'] = "yes" + s_host.params['hostid'] = machines[0] - dest_params = params.copy() - dest_params['role'] = "destination" - dest_params['migration_mode'] = "tcp" + for machine, host in _hosts.items()[1:]: + host.params = params.copy() + host.params['clone_master'] = "no" + host.params['hostid'] = machine # Report the parameters we've received print "Test parameters:" @@ -87,27 +105,31 @@ sys.path.append(kvm_test_dir) for key in keys: logging.debug(" %s = %s", key, params[key]) - source_control_file += ("job.run_test('kvm', tag='%s', params=%s)" % - (source_params['shortname'], source_params)) - dest_control_file += ("job.run_test('kvm', tag='%s', params=%s)" % - (dest_params['shortname'], dest_params)) + for host in _hosts.itervalues(): + host.control += ("job.run_test('kvm', tag='%s', params=%s)" % + (host.params['shortname'], host.params)) - logging.info('Source control file:\n%s', source_control_file) - logging.info('Destination control file:\n%s', dest_control_file) - dest_command = subcommand(dest_at.run, - [dest_control_file, dest.hostname]) + logging.info('Master control file:\n%s', _hosts[machines[0]].control) + for host in _hosts.values()[1:]: + logging.info('Slave control file:\n%s', host.control) - source_command = subcommand(source_at.run, - [source_control_file, source.hostname]) + commands = [] + for host in _hosts.itervalues(): + commands.append(subcommand(host.at.run, + [host.control, host.host.hostname])) - parallel([dest_command, source_command]) + parallel(commands) -# Grab the pairs (and failures) -(pairs, failures) = utils.form_ntuples_from_machines(machines, 2) +if 'all' in args: + # Run test with all machines at once. + run(machines) +else: + # Grab the pairs (and failures) + (pairs, failures) = utils.form_ntuples_from_machines(machines, 2) -# Log the failures -for failure in failures: - job.record("FAIL", failure[0], "kvm", failure[1]) + # Log the failures + for failure in failures: + job.record("FAIL", failure[0], "kvm", failure[1]) -# Now run through each pair and run -job.parallel_simple(run, pairs, log=False) + # Now run through each pair and run + job.parallel_simple(run, pairs, log=False) diff --git a/client/tests/kvm/tests/cpuflags.py b/client/tests/kvm/tests/cpuflags.py index 2e09f58..114ffa3 100644 --- a/client/tests/kvm/tests/cpuflags.py +++ b/client/tests/kvm/tests/cpuflags.py @@ -656,13 +656,6 @@ def run_cpuflags(test, params, env): """ Test migration between multiple hosts. """ - def guest_active(vm): - o = vm.monitor.info("status") - if isinstance(o, str): - return "status: running" in o - else: - return o.get("status") == "running" - cpu_model, extra_flags = parse_cpu_model() flags = HgFlags(cpu_model, extra_flags) @@ -680,136 +673,80 @@ def run_cpuflags(test, params, env): cpuf_model += ",-" + fdel install_path = "/tmp" - login_timeout = int(params.get("login_timeout", 360)) - role = params.get("role") - srchost = params.get("srchost") - dsthost = params.get("dsthost") - # Port used to communicate info between source and destination - comm_port = int(params.get("comm_port", 12324)) - comm_timeout = float(params.get("comm_timeout", "10")) - regain_ip_cmd = params.get("regain_ip_cmd", "dhclient") - - if role == 'source': - (self.vm, session) = start_guest_with_cpuflags(cpuf_model, - smp) - - install_cpuflags_test_on_vm(self.vm, install_path) - - Flags = check_cpuflags_work(self.vm, install_path, - flags.all_possible_guest_flags) - logging.info("Woking CPU flags: %s", str(Flags[0])) - logging.info("Not working CPU flags: %s", str(Flags[1])) - logging.warning("Flags works even if not deffined on" - " guest cpu flags: %s", - str(Flags[0] - flags.guest_flags)) - logging.warning("Not tested CPU flags: %s", str(Flags[2])) - - session.sendline("nohup dd if=/dev/[svh]da of=/tmp/" - "stressblock bs=10MB count=100 &") - cmd = ("nohup %s/cpuflags-test --stress %s%s &" % - (os.path.join(install_path, "test_cpu_flags"), smp, - virt_utils.kvm_flags_to_stresstests(Flags[0] & - flags.guest_flags))) - logging.debug("Guest_flags: %s", str(flags.guest_flags)) - logging.debug("Working_flags: %s", str(Flags[0])) - logging.debug("Start stress on guest: %s", cmd) - session.sendline(cmd) - - # Listen on a port to get the migration port received from - # dest machine - s_socket = socket.socket(socket.AF_INET, - socket.SOCK_STREAM) - s_socket.bind(('', comm_port)) - s_socket.listen(1) - s_socket.settimeout(comm_timeout) - - # Wait 30 seconds for source and dest to reach this point - test.job.barrier(srchost, - 'socket_started', 120).rendezvous(srchost, - dsthost) - - c_socket = s_socket.accept()[0] - - mig_port = int(c_socket.recv(6)) - logging.info("Received from destination the" - " migration port %s" % mig_port) - c_socket.close() - - #Wait for start cpuflags-test stress. - time.sleep(10) - logging.info("Start migrating now...") - self.vm.monitor.migrate_set_speed(mig_speed) - self.vm.migrate(dest_host=dsthost, remote_port=mig_port) - - # Wait up to 30 seconds for dest to reach this point - test.job.barrier(srchost, - 'mig_finished', 30).rendezvous(srchost, - dsthost) - - elif role == 'destination': - # Wait up to login_timeout + 30 seconds for the source to - # reach this point - (self.vm, _) = start_guest_with_cpuflags(cpuf_model, - smp, - True, - False) - - test.job.barrier(dsthost, 'socket_started', - login_timeout + 120).rendezvous(srchost, - dsthost) - - c_socket = socket.socket(socket.AF_INET, - socket.SOCK_STREAM) - c_socket.settimeout(comm_timeout) - c_socket.connect((srchost, comm_port)) - - logging.info("Communicating to source migration" - " port %s" % self.vm.migration_port) - c_socket.send("%d" % self.vm.migration_port) - c_socket.close() - - # Wait up to mig_timeout + 30 seconds for the source to - # reach this point: migration finished - test.job.barrier(dsthost, 'mig_finished', - mig_timeout + 30).rendezvous(srchost, - dsthost) - - if not guest_active(self.vm): - raise error.TestFail("Guest not active after" - " migration") - - logging.info("Migrated guest appears to be running") - - # Log into the guest again - logging.info("Logging into migrated guest after" - " migration...") - session_serial = self.vm.wait_for_serial_login( - timeout=login_timeout) - session_serial.cmd(regain_ip_cmd) - - self.vm.verify_illegal_instructonn() - - session = self.vm.wait_for_login(timeout=login_timeout) + class testMultihostMigration(virt_utils.MultihostMigration): + def __init__(self, test, params, env): + super(testMultihostMigration, self).__init__(test, + params, + env) - try: - session.cmd('killall cpuflags-test') - except aexpect.ShellCmdError: - raise error.TestFail("The cpuflags-test program should" - " be active after migration and" - " it's not.") + def migration_scenario(self): + srchost = self.params.get("hosts")[0] + dsthost = self.params.get("hosts")[1] - Flags = check_cpuflags_work(self.vm, install_path, - flags.all_possible_guest_flags) - logging.info("Woking CPU flags: %s", str(Flags[0])) - logging.info("Not working CPU flags: %s", str(Flags[1])) - logging.warning("Flags works even if not deffined on" - " guest cpu flags: %s", - str(Flags[0] - flags.guest_flags)) - logging.warning("Not tested CPU flags: %s", str(Flags[2])) + def worker(mig_data): + vm = env.get_vm("vm1") + session = vm.wait_for_login(timeout=self.login_timeout) - else: - raise error.TestError('Invalid role specified') + install_cpuflags_test_on_vm(vm, install_path) + + Flags = check_cpuflags_work(vm, install_path, + flags.all_possible_guest_flags) + logging.info("Woking CPU flags: %s", str(Flags[0])) + logging.info("Not working CPU flags: %s", + str(Flags[1])) + logging.warning("Flags works even if not deffined on" + " guest cpu flags: %s", + str(Flags[0] - flags.guest_flags)) + logging.warning("Not tested CPU flags: %s", + str(Flags[2])) + session.sendline("nohup dd if=/dev/[svh]da of=/tmp/" + "stressblock bs=10MB count=100 &") + + cmd = ("nohup %s/cpuflags-test --stress %s%s &" % + (os.path.join(install_path, "test_cpu_flags"), + smp, + virt_utils.kvm_flags_to_stresstests(Flags[0] & + flags.guest_flags))) + logging.debug("Guest_flags: %s", + str(flags.guest_flags)) + logging.debug("Working_flags: %s", str(Flags[0])) + logging.debug("Start stress on guest: %s", cmd) + session.sendline(cmd) + + def check_worker(mig_data): + vm = env.get_vm("vm1") + + vm.verify_illegal_instructonn() + + session = vm.wait_for_login(timeout=self.login_timeout) + + try: + session.cmd('killall cpuflags-test') + except aexpect.ShellCmdError: + raise error.TestFail("The cpuflags-test program" + " should be active after" + " migration and it's not.") + + Flags = check_cpuflags_work(vm, install_path, + flags.all_possible_guest_flags) + logging.info("Woking CPU flags: %s", + str(Flags[0])) + logging.info("Not working CPU flags: %s", + str(Flags[1])) + logging.warning("Flags works even if not deffined on" + " guest cpu flags: %s", + str(Flags[0] - flags.guest_flags)) + logging.warning("Not tested CPU flags: %s", + str(Flags[2])) + + self.migrate_wait(["vm1"], srchost, dsthost, + worker, check_worker) + + params_b = params.copy() + params_b["cpu_model"] = cpu_model + mig = testMultihostMigration(test, params_b, env) + mig.run() test_type = params.get("test_type") if (test_type in locals()): diff --git a/client/tests/kvm/tests/migration_multi_host.py b/client/tests/kvm/tests/migration_multi_host.py index 30e3ecc..0f6fe2e 100644 --- a/client/tests/kvm/tests/migration_multi_host.py +++ b/client/tests/kvm/tests/migration_multi_host.py @@ -1,107 +1,26 @@ -import logging, socket -from autotest_lib.client.common_lib import error +from autotest_lib.client.virt import virt_utils def run_migration_multi_host(test, params, env): """ KVM multi-host migration test: - Migration execution progress: - - source host dest host - ---------------------------------------------------------------------------- - log into guest - ---------------------------------------------------------------------------- - start socket server - - wait 30 secs -------------------- wait login_timeout+30 secs --------------- - - accept connection connect to socket server,send mig_port - ---------------------------------------------------------------------------- - start migration - - wait 30 secs -------------------- wait mig_timeout+30 secs ----------------- - - try to log into migrated guest check VM's status via monitor cmd - ---------------------------------------------------------------------------- + Migration execution progress is described in documentation + for migrate method in class MultihostMigration. @param test: kvm test object. @param params: Dictionary with test parameters. @param env: Dictionary with the test environment. """ - def guest_active(vm): - o = vm.monitor.info("status") - if isinstance(o, str): - return "status: running" in o - else: - return o.get("status") == "running" - - vm = env.get_vm(params["main_vm"]) - vm.verify_alive() - login_timeout = int(params.get("login_timeout", 360)) - role = params.get("role") - srchost = params.get("srchost") - dsthost = params.get("dsthost") - mig_timeout = int(params.get("mig_timeout")) - # Port used to communicate info between source and destination - comm_port = int(params.get("comm_port", 12324)) - regain_ip_cmd = params.get("regain_ip_cmd", "dhclient") - if role == 'source': - session = vm.wait_for_login(timeout=login_timeout) - - # Listen on a port to get the migration port received from - # dest machine - s_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s_socket.bind(('', comm_port)) - s_socket.listen(1) - - # Wait 30 seconds for source and dest to reach this point - test.job.barrier(srchost, 'socket_started', 30).rendezvous(srchost, - dsthost) - - c_socket = s_socket.accept()[0] - mig_port = int(c_socket.recv(6)) - logging.info("Received from destination the migration port %s", - mig_port) - c_socket.close() - - logging.info("Start migrating now...") - vm.migrate(dest_host=dsthost, remote_port=mig_port) - - # Wait up to 30 seconds for dest to reach this point - test.job.barrier(srchost, 'mig_finished', 30).rendezvous(srchost, - dsthost) - - elif role == 'destination': - # Wait up to login_timeout + 30 seconds for the source to - # reach this point - test.job.barrier(dsthost, 'socket_started', - login_timeout + 30).rendezvous(srchost, - dsthost) - - c_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - c_socket.connect((srchost, comm_port)) - logging.info("Communicating to source migration port %s", - vm.migration_port) - c_socket.send("%d" % vm.migration_port) - c_socket.close() - - # Wait up to mig_timeout + 30 seconds for the source to - # reach this point: migration finished - test.job.barrier(dsthost, 'mig_finished', - mig_timeout + 30).rendezvous(srchost, - dsthost) - - if not guest_active(vm): - raise error.TestFail("Guest not active after migration") + class testMultihostMigration(virt_utils.MultihostMigration): + def __init__(self, test, params, env): + super(testMultihostMigration, self).__init__(test, params, env) - logging.info("Migrated guest appears to be running") + def migration_scenario(self): + srchost = self.params.get("hosts")[0] + dsthost = self.params.get("hosts")[1] - # Log into the guest again - logging.info("Logging into migrated guest after migration...") - session_serial = vm.wait_for_serial_login(timeout=login_timeout) - session_serial.cmd(regain_ip_cmd) - session = vm.wait_for_login(timeout=login_timeout) + self.migrate_wait(["vm1"], srchost, dsthost) - else: - raise error.TestError('Invalid role specified') + mig = testMultihostMigration(test, params, env) + mig.run() diff --git a/client/virt/base.cfg.sample b/client/virt/base.cfg.sample index 1ecbf36..93b9159 100644 --- a/client/virt/base.cfg.sample +++ b/client/virt/base.cfg.sample @@ -196,3 +196,6 @@ netdev_peer_re = "\s{2,}(.*?): .*?\\\s(.*?):" # netdev_peer_re = "\s{2,}(.*?): .*?\s{2,}(.*?):" # for RHEL6 host, the regex should be: # netdev_peer_re = "\s{2,}(.*?):.*?peer=(.*?)\n" + +image_clone_commnad = 'cp --reflink %s %s' +image_remove_commnad = 'rm -rf %s' diff --git a/client/virt/subtests.cfg.sample b/client/virt/subtests.cfg.sample index 7026334..762be35 100644 --- a/client/virt/subtests.cfg.sample +++ b/client/virt/subtests.cfg.sample @@ -527,6 +527,8 @@ variants: - migrate_multi_host: install setup image_copy unattended_install.cdrom type = migration_multi_host + vms = "vm1" + start_vm = no migration_test_command = help migration_bg_command = "cd /tmp; nohup tcpdump -q -i any -t ip host localhost" migration_bg_check_command = pgrep tcpdump @@ -534,9 +536,10 @@ variants: kill_vm_on_error = yes iterations = 2 used_mem = 1024 - mig_timeout = 3600 + mig_timeout = 4800 + disk_prepare_timeout = 360 comm_port = 13234 - regain_ip_cmd = dhclient + regain_ip_cmd = killall dhclient; sleep 10; dhclient; - boot_savevm: install setup image_copy unattended_install.cdrom type = boot_savevm @@ -1694,13 +1697,18 @@ variants: - cpuflags_multi_host: type = cpuflags test_type = test_multi_host_migration - #Disable all unnecessary vms. - vms = "" + vms = "vm1" + start_vm = no #Try to start guest with all flags which are supported by host. all_host_supported_flags = "no" cpu_model = "core2duo:sse3" guest_spec_flags = "fxsr_opt hypervisor ds pdpe1gb osxsave svm" host_spec_flags = "pbe tm ds_cpl monitor acpi dtes64 ht tm2 xtpr est pdcm smx" + mig_timeout = 4800 + kill_vm_on_error = yes + disk_prepare_timeout = 360 + comm_port = 13234 + regain_ip_cmd = killall dhclient; sleep 10; dhclient; - cpu_hotplug_test: type = cpu_hotplug diff --git a/client/virt/virt_env_process.py b/client/virt/virt_env_process.py index d5ef166..75e9fdf 100644 --- a/client/virt/virt_env_process.py +++ b/client/virt/virt_env_process.py @@ -320,8 +320,21 @@ def preprocess(test, params, env): int(params.get("pre_command_timeout", "600")), params.get("pre_command_noncritical") == "yes") + #Clone master image form vms. + if params.get("mater_images_clone"): + for vm_name in params.get("vms").split(): + vm = env.get_vm(vm_name) + if vm: + vm.destroy(free_mac_addresses=False) + env.unregister_vm(vm_name) + + vm_params = params.object_params(vm_name) + for image in vm_params.get("mater_images_clone").split(): + virt_vm.clone_image(params, vm_name, image, test.bindir) + # Preprocess all VMs and images - process(test, params, env, preprocess_image, preprocess_vm) + if params.get("not_preprocess","no") == "no": + process(test, params, env, preprocess_image, preprocess_vm) # Start the screendump thread if params.get("take_regular_screendumps") == "yes": diff --git a/client/virt/virt_utils.py b/client/virt/virt_utils.py index 7a35f83..bd3a2e1 100644 --- a/client/virt/virt_utils.py +++ b/client/virt/virt_utils.py @@ -10,6 +10,8 @@ import struct, shutil, glob from autotest_lib.client.bin import utils, os_dep from autotest_lib.client.common_lib import error, logging_config from autotest_lib.client.common_lib import logging_manager, git +from autotest_lib.client.virt import virt_env_process, virt_vm +from autotest_lib.client.common_lib.syncdata import SyncData, SyncListenServer import rss_client, aexpect import platform @@ -242,6 +244,34 @@ class Env(UserDict.IterableUserDict): del self["vm__%s" % name] + def register_syncserver(self, port, server): + """ + Register a Synchronization in this Env object. + + @param port: Sync Server port. + @param server: Sync Server object. + """ + self["sync__%s" % port] = server + + + def unregister_syncserver(self, port): + """ + Remove a given Server. + + @param port: Sync Server port. + """ + del self["sync__%s" % port] + + + def get_syncserver(self, port): + """ + Return a SyncServer object by its port. + + @param port: Sync Server port. + """ + return self.get("sync__%s" % port) + + def register_installer(self, installer): """ Register a installer that was just run @@ -3651,3 +3681,479 @@ class NumaNode(object): logging.info("Numa Node record dict:") for i in self.cpus: logging.info(" %s: %s" % (i, self.dict[i])) + + +def generate_mac_address_simple(): + r = random.SystemRandom() + mac = "9a:%02x:%02x:%02x:%02x:%02x" % (r.randint(0x00, 0xff), + r.randint(0x00, 0xff), + r.randint(0x00, 0xff), + r.randint(0x00, 0xff), + r.randint(0x00, 0xff)) + return mac + + +def guest_active(vm): + o = vm.monitor.info("status") + if isinstance(o, str): + return "status: running" in o + else: + return o.get("status") == "running" + + +def preprocess_images(bindir, params, env): + #Clone master image form vms. + for vm_name in params.get("vms").split(): + vm = env.get_vm(vm_name) + if vm: + vm.destroy(free_mac_addresses=False) + vm_params = params.object_params(vm_name) + for image in vm_params.get("mater_images_clone").split(): + virt_vm.clone_image(params, vm_name, image, bindir) + + +def postprocess_images(bindir, params): + for vm in params.get("vms").split(): + vm_params = params.object_params(vm) + for image in vm_params.get("mater_images_clone").split(): + virt_vm.rm_image(params, vm, image, bindir) + + +class MultihostMigration(object): + """ + Class provide framework for multihost migration. Migration could be run + synchronously and asynchronously. For make recept for multihost migration + is necessary reimplement method migration_scenario. There is possible + start multiple migration in separate threads. self.migrate is thread save. + Only one test using multihost migration framework should be started on + one machine otherway there is necessary solve problem with listen server + port. + Multihost migration framework starts SyncListenServer through which + all message are transfered. Because there is necessary communicate between + multiple hosts which could be in diferent states. + Class SyncData is used for transfer data over network or syncronization + migration process. Synchronization session are recognized by session_id. + There si necessary have shared guest image storage space for multihost + migration. + Example: + class testMultihostMigration(virt_utils.MultihostMigration): + def __init__(self, test, params, env): + super(testMultihostMigration, self).__init__(test, params, env) + + def migration_scenario(self): + srchost = self.params.get("hosts")[0] + dsthost = self.params.get("hosts")[1] + + def worker(mig_data): + vm = env.get_vm("vm1") + session = vm.wait_for_login(timeout=self.login_timeout) + session.sendline("nohup dd if=/dev/zero of=/dev/null &") + session.cmd("killall -0 dd") + + def check_worker(mig_data): + vm = env.get_vm("vm1") + session = vm.wait_for_login(timeout=self.login_timeout) + session.cmd("killall -9 dd") + + #Almost synchroned migration with waiting to end fo migration. + #Work is started only on first VM. + self.migrate_wait(["vm1", "vm2"], srchost, dsthost, + worker, check_worker) + + #Migration started in different threads. + #It allow start multiple migration simultaneously. + mig1 = self.migrate(["vm1"], srchost, dsthost, + worker, check_worker) + mig2 = self.migrate(["vm2"], srchost, dsthost) + mig2.join() + mig1.join() + + mig = testMultihostMigration(test, params, env) + mig.run() + """ + class MigData(object): + def __init__(self, params, srchost, dsthost, vms_name): + """ + Class contain data need for one migration. + """ + self.params = params + + self.source = False + if params.get("hostid") == srchost: + self.source = True + + self.destination = False + if params.get("hostid") == dsthost: + self.destination = True + + self.src = srchost + self.dst = dsthost + self.hosts = [srchost, dsthost] + self.mig_id = {'src': srchost, 'dst': dsthost, "vms": vms_name} + self.vms_name = vms_name + self.vms = [] + self.vm_ports = None + + + def is_src(self): + """ + @return: True if host is source. + """ + return self.source + + + def is_dst(self): + """ + @return: True if host is destination. + """ + return self.destination + + + def __init__(self, test, params, env, preprocess_env=True): + self.test = test + self.params = params + self.env = env + self.hosts = params.get("hosts") + self.hostid = params.get('hostid', "") + self.comm_port = int(params.get("comm_port", 13234)) + vms_count = len(params["vms"].split()) + + self.login_timeout = int(params.get("login_timeout", 360)) + self.disk_prepare_timeout = int(params.get("disk_prepare_timeout", + 160 * vms_count)) + self.finish_timeout = int(params.get("finish_timeout", + 120 * vms_count)) + + self.new_params = None + + if params.get("clone_master") == "yes": + self.clone_master = True + else: + self.clone_master = False + + self.mig_timeout = int(params.get("mig_timeout")) + # Port used to communicate info between source and destination + self.regain_ip_cmd = params.get("regain_ip_cmd", "dhclient") + + self.vm_lock = threading.Lock() + + self.sync_server = None + if self.clone_master: + self.sync_server = SyncListenServer(test.job.resultdir) + + if preprocess_env: + self.preprocess_env() + self._hosts_barrier(self.hosts, self.hosts, 'disk_prepared', + self.disk_prepare_timeout) + + + def migration_scenario(self): + """ + Multi Host migration_scenario is started from methos run where are + exceptions checked. There is not necessary take care about + cleaning after test crash or finish. + """ + raise NotImplementedError + + + def migrate_vms_src(self, mig_data): + """ + Migrate vms source. + + @param mig_Data: Data for migration. + + For change way how machine migrates is necessary + re implement this method. + """ + def mig_warpper(vm, dsthost, vm_ports): + vm.migrate(dest_host=dsthost, remote_port=vm_ports[vm.name]) + + logging.info("Start migrating now...") + multi_mig = [] + for vm in mig_data.vms: + multi_mig.append((mig_warpper, (vm, mig_data.dst, + mig_data.vm_ports))) + parallel(multi_mig) + + + def migrate_vms_dest(self, mig_data): + """ + Migrate vms destination. This function is started on dest host during + migration. + + @param mig_Data: Data for migration. + """ + pass + + + def __del__(self): + if self.sync_server: + self.sync_server.close() + + + def master_id(self): + return self.hosts[0] + + + def _hosts_barrier(self, hosts, session_id, tag, timeout): + logging.debug("Barrier timeout: %d tags: %s" % (timeout, tag)) + tags = SyncData(self.master_id(), self.hostid, hosts, + "%s,%s,barrier" % (str(session_id), tag), + self.sync_server).sync(tag, timeout) + logging.debug("Barrier tag %s" % (tags)) + + + def preprocess_env(self): + """ + Prepare env for start vms. + """ + preprocess_images(self.test.bindir, self.params, self.env) + + + def _check_vms_source(self, mig_data): + for vm in mig_data.vms: + vm.wait_for_login(timeout=self.login_timeout) + + sync = SyncData(self.master_id(), self.hostid, mig_data.hosts, + mig_data.mig_id, self.sync_server) + mig_data.vm_ports = sync.sync(timeout=120)[mig_data.dst] + logging.info("Received from destination the migration port %s", + str(mig_data.vm_ports)) + + + def _check_vms_dest(self, mig_data): + mig_data.vm_ports = {} + for vm in mig_data.vms: + logging.info("Communicating to source migration port %s", + vm.migration_port) + mig_data.vm_ports[vm.name] = vm.migration_port + + SyncData(self.master_id(), self.hostid, + mig_data.hosts, mig_data.mig_id, + self.sync_server).sync(mig_data.vm_ports, timeout=120) + + + def _prepare_params(self, mig_data): + """ + Prepare separate params for vm for migration. + + @param vms_name: List of vms. + """ + new_params = mig_data.params.copy() + new_params["vms"] = " ".join(mig_data.vms_name) + return new_params + + + def _check_vms(self, mig_data): + """ + Check if vms are started correctly. + + @param vms: list of vms. + @param source: Must be True if is source machine. + """ + logging.info("Try check vms %s" % (mig_data.vms_name)) + for vm in mig_data.vms_name: + if not self.env.get_vm(vm) in mig_data.vms: + mig_data.vms.append(self.env.get_vm(vm)) + for vm in mig_data.vms: + logging.info("Check vm %s on host %s" % (vm.name, self.hostid)) + vm.verify_alive() + + if mig_data.is_src(): + self._check_vms_source(mig_data) + else: + self._check_vms_dest(mig_data) + + + def prepare_for_migration(self, mig_data, migration_mode): + """ + Prepare destination of migration for migration. + + @param mig_data: Class with data which is necessary for migration. + @param migration_mode: Migration mode for prepare machine. + """ + new_params = self._prepare_params(mig_data) + + new_params['migration_mode'] = migration_mode + new_params['start_vm'] = 'yes' + with self.vm_lock: + virt_env_process.process(self.test, new_params, self.env, + virt_env_process.preprocess_image, + virt_env_process.preprocess_vm) + + self._check_vms(mig_data) + + + def migrate_vms(self, mig_data): + """ + Migrate vms. + """ + if mig_data.is_src(): + self.migrate_vms_src(mig_data) + else: + self.migrate_vms_dest(mig_data) + + + def check_vms(self, mig_data): + """ + Check vms after migrate. + + @param mig_data: object with migration data. + """ + for vm in mig_data.vms: + if not guest_active(vm): + raise error.TestFail("Guest not active after migration") + + logging.info("Migrated guest appears to be running") + + logging.info("Logging into migrated guest after migration...") + for vm in mig_data.vms: + session_serial = vm.wait_for_serial_login(timeout= + self.login_timeout) + #There is sometime happen that system sends some message on + #serial console and IP renew command block test. Because + #there must be added "sleep" in IP renew command. + session_serial.cmd(self.regain_ip_cmd) + vm.wait_for_login(timeout=self.login_timeout) + + + def postprocess_env(self): + """ + Kill vms and delete cloned images. + """ + postprocess_images(self.test.bindir, self.params) + + + def migrate(self, vms_name, srchost, dsthost, start_work=None, + check_work=None): + """ + Migrate machine from srchost to dsthost. It executes start_work on + source machine before migration and executes check_work on dsthost + after migration. + + Migration execution progress: + + source host | dest host + -------------------------------------------------------- + prepare guest on both side of migration + - start machine and check if machine work + - synchronize transfer data necessary for migraiton + -------------------------------------------------------- + start work on source guests | wait for migration + -------------------------------------------------------- + migrate guest to dest host. + wait on finish migration synchronization + -------------------------------------------------------- + | check work on vms + -------------------------------------------------------- + wait for sync on finish migration + + @param vms_name: List of vms. + @param srchost: src host id. + @param dsthost: dst host id. + @param start_work: Function which is started before migration. + @param check_work: Function which is started after + done of migration. + """ + def migrate_wrap(vms_name, srchost, dsthost, start_work=None, + check_work=None): + logging.info("Starting migrate vms %s from host %s to %s" % + (vms_name, srchost, dsthost)) + error = None + mig_data = self.MigData(self.params, srchost, dsthost, vms_name) + try: + if mig_data.is_src(): + self.prepare_for_migration(mig_data, None) + elif self.hostid == dsthost: + self.prepare_for_migration(mig_data, "tcp") + else: + return + + if mig_data.is_src(): + if start_work: + start_work(mig_data) + + self.migrate_vms(mig_data) + + timeout = 30 + if not mig_data.is_src(): + timeout = self.mig_timeout + self._hosts_barrier(mig_data.hosts, mig_data.mig_id, + 'mig_finished', timeout) + + if mig_data.is_dst(): + self.check_vms(mig_data) + if check_work: + check_work(mig_data) + + except: + error = True + raise + finally: + if not error: + self._hosts_barrier(self.hosts, + mig_data.mig_id, + 'test_finihed', + self.finish_timeout) + + def wait_wrap(vms_name, srchost, dsthost): + mig_data = self.MigData(self.params, srchost, dsthost, vms_name) + timeout = (self.login_timeout + self.mig_timeout + + self.finish_timeout) + + self._hosts_barrier(self.hosts, mig_data.mig_id, + 'test_finihed', timeout) + + if (self.hostid in [srchost, dsthost]): + mig_thread = utils.InterruptedThread(migrate_wrap, (vms_name, + srchost, + dsthost, + start_work, + check_work)) + else: + mig_thread = utils.InterruptedThread(wait_wrap, (vms_name, + srchost, + dsthost)) + mig_thread.start() + return mig_thread + + + def migrate_wait(self, vms_name, srchost, dsthost, start_work=None, + check_work=None): + """ + Migrate machine from srchost to dsthost and wait for finish. + It executes start_work on source machine before migration and executes + check_work on dsthost after migration. + + @param vms_name: List of vms. + @param srchost: src host id. + @param dsthost: dst host id. + @param start_work: Function which is started before migration. + @param check_work: Function which is started after + done of migration. + """ + self.migrate(vms_name, srchost, dsthost, start_work, check_work).join() + + + def cleanup(self): + """ + Cleanup env after test. + """ + if self.clone_master: + self.sync_server.close() + self.postprocess_env() + + + def run(self): + """ + Start multihost migration scenario. + After scenario is finished or if scenario crashed it calls postprocess + machines and cleanup env. + """ + try: + self.migration_scenario() + + self._hosts_barrier(self.hosts, self.hosts, 'all_test_finihed', + self.finish_timeout) + finally: + self.cleanup() diff --git a/client/virt/virt_vm.py b/client/virt/virt_vm.py index 06db7a6..b177458 100644 --- a/client/virt/virt_vm.py +++ b/client/virt/virt_vm.py @@ -323,6 +323,57 @@ def get_image_filename(params, root_dir): return image_filename +def clone_image(params, vm_name, image_name, root_dir): + """ + Clone master image to vm specific file. + + @param params: Dictionary containing the test parameters. + @param vm_name: Vm name. + @param image_name: Master image name. + @param root_dir: Base directory for relative filenames. + """ + if not params.get("image_name_%s_%s" % (image_name, vm_name)): + m_image_name = params.get("image_name", "image") + vm_image_name = "%s_%s" % (m_image_name, vm_name) + if params.get("clone_master", "yes") == "yes": + image_params = params.object_params(image_name) + image_params["image_name"] = vm_image_name + + m_image_fn = get_image_filename(params, root_dir) + image_fn = get_image_filename(image_params, root_dir) + + logging.info("Clone master image for vms.") + utils.run(params.get("image_clone_commnad") % (m_image_fn, + image_fn)) + + params["image_name_%s_%s" % (image_name, vm_name)] = vm_image_name + + +def rm_image(params, vm_name, image_name, root_dir): + """ + Remove vm specific file. + + @param params: Dictionary containing the test parameters. + @param vm_name: Vm name. + @param image_name: Master image name. + @param root_dir: Base directory for relative filenames. + """ + if params.get("image_name_%s_%s" % (image_name, vm_name)): + m_image_name = params.get("image_name", "image") + vm_image_name = "%s_%s" % (m_image_name, vm_name) + if params.get("clone_master", "yes") == "yes": + image_params = params.object_params(image_name) + image_params["image_name"] = vm_image_name + + image_fn = get_image_filename(image_params, root_dir) + + logging.debug("Removing vm specific image file %s", image_fn) + if os.path.exists(image_fn): + utils.run(params.get("image_remove_commnad") % (image_fn)) + else: + logging.debug("Image file %s not found", image_fn) + + def create_image(params, root_dir): """ Create an image using qemu_image. -- 1.7.7.6 -- To unsubscribe from this list: send the line "unsubscribe kvm" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html