From: Michael Goldish <mgoldish@xxxxxxxxxx> This patch adds a control.parallel file that runs several test execution pipelines in parallel. The number of pipelines is set to the number of CPUs reported by /proc/cpuinfo. It can be changed by modifying the control file. The total amount of RAM defaults to 3/4 of what 'free -m' reports. The scheduler's job is to make sure tests run in parallel only when there are sufficient resources to allow it. For example, a test that requires 2 CPUs will not run together with a test that requires 3 CPUs on a 4 CPU machine. The same logic applies to RAM. Note that tests that require more CPUs and/or more RAM than the machine has are allowed to run alone, e.g. a test that requires 3GB of RAM is allowed to run on a machine with only 2GB of RAM, but no tests will run in parallel with it. Currently TAP networking isn't supported by this scheduler because the main MAC address pool must be divided between the pipelines ("workers"). This should be straightforward to do but I haven't had the time to do it yet. scan_results.py can be used to list the test results during and after execution. 
v4: * Updated the install part to be in sync with the current control file * Blended this patch with the one that adds scheduler parameters * Instead of custom code to figure out the number of CPUs, used an autotest utils function Signed-off-by: Michael Goldish <mgoldish@xxxxxxxxxx> --- client/tests/kvm/control.parallel | 204 +++++++++++++++++++++++++++++ client/tests/kvm/kvm_scheduler.py | 229 +++++++++++++++++++++++++++++++++ client/tests/kvm/kvm_tests.cfg.sample | 18 +++- 3 files changed, 449 insertions(+), 2 deletions(-) create mode 100644 client/tests/kvm/control.parallel create mode 100644 client/tests/kvm/kvm_scheduler.py diff --git a/client/tests/kvm/control.parallel b/client/tests/kvm/control.parallel new file mode 100644 index 0000000..5c1f20d --- /dev/null +++ b/client/tests/kvm/control.parallel @@ -0,0 +1,204 @@ +AUTHOR = """ +uril@xxxxxxxxxx (Uri Lublin) +drusso@xxxxxxxxxx (Dror Russo) +mgoldish@xxxxxxxxxx (Michael Goldish) +dhuff@xxxxxxxxxx (David Huff) +aeromenk@xxxxxxxxxx (Alexey Eromenko) +mburns@xxxxxxxxxx (Mike Burns) +""" +TIME = 'SHORT' +NAME = 'KVM test' +TEST_TYPE = 'client' +TEST_CLASS = 'Virtualization' +TEST_CATEGORY = 'Functional' + +DOC = """ +Executes the KVM test framework on a given host. This module is separated in +minor functions, that execute different tests for doing Quality Assurance on +KVM (both kernelspace and userspace) code. 
+""" + + +import sys, os, commands, re + +#----------------------------------------------------------------------------- +# set English environment (command output might be localized, need to be safe) +#----------------------------------------------------------------------------- +os.environ['LANG'] = 'en_US.UTF-8' + +#--------------------------------------------------------- +# Enable modules import from current directory (tests/kvm) +#--------------------------------------------------------- +pwd = os.path.join(os.environ['AUTODIR'],'tests/kvm') +sys.path.append(pwd) + +# ------------------------ +# create required symlinks +# ------------------------ +# When dispatching tests from autotest-server the links we need do not exist on +# the host (the client). The following lines create those symlinks. Change +# 'rootdir' here and/or mount appropriate directories in it. +# +# When dispatching tests on local host (client mode) one can either setup kvm +# links, or same as server mode use rootdir and set all appropriate links and +# mount-points there. For example, guest installation tests need to know where +# to find the iso-files. +# +# We create the links only if not already exist, so if one already set up the +# links for client/local run we do not touch the links. 
+rootdir='/tmp/kvm_autotest_root' +iso=os.path.join(rootdir, 'iso') +images=os.path.join(rootdir, 'images') +qemu=os.path.join(rootdir, 'qemu') +qemu_img=os.path.join(rootdir, 'qemu-img') + + +def link_if_not_exist(ldir, target, link_name): + t = target + l = os.path.join(ldir, link_name) + if not os.path.exists(l): + os.system('ln -s %s %s' % (t, l)) + +# Create links only if not already exist +link_if_not_exist(pwd, '../../', 'autotest') +link_if_not_exist(pwd, iso, 'isos') +link_if_not_exist(pwd, images, 'images') +link_if_not_exist(pwd, qemu, 'qemu') +link_if_not_exist(pwd, qemu_img, 'qemu-img') + +# -------------------------------------------------------- +# Params that will be passed to the KVM install/build test +# -------------------------------------------------------- +params = { + "name": "build", + "shortname": "build", + "type": "build", + "mode": "release", + #"mode": "snapshot", + #"mode": "localtar", + #"mode": "localsrc", + #"mode": "git", + #"mode": "noinstall", + #"mode": "koji", + + ## Are we going to load modules built by this test? + ## Defaults to 'yes', so if you are going to provide only userspace code to + ## be built by this test, please set load_modules to 'no', and make sure + ## the kvm and kvm-[vendor] module is already loaded by the time you start + ## it. + "load_modules": "no", + + ## Install from a kvm release ("mode": "release"). You can optionally + ## specify a release tag. If you omit it, the test will get the latest + ## release tag available. + #"release_tag": '84', + "release_dir": 'http://downloads.sourceforge.net/project/kvm/', + # This is the place that contains the sourceforge project list of files + "release_listing": 'http://sourceforge.net/projects/kvm/files/', + + ## Install from a kvm snapshot location ("mode": "snapshot"). You can + ## optionally specify a snapshot date. If you omit it, the test will get + ## yesterday's snapshot. 
+ #"snapshot_date": '20090712', + #"snapshot_dir": 'http://foo.org/kvm-snapshots/', + + ## Install from a tarball ("mode": "localtar") + #"tarball": "/tmp/kvm-84.tar.gz", + + ## Install from a local source code dir ("mode": "localsrc") + #"srcdir": "/path/to/source-dir", + + ## Install from koji build server ("mode": "koji") + ## Koji is the Fedora Project buildserver. It is possible to install + ## packages right from Koji if you provide a release tag or a build. + ## Tag (if available) + #"koji_tag": 'dist-f11', + ## Build (if available, is going to override tag). + #"koji_build": 'qemu-0.10-16.fc11', + ## Command to interact with the build server + #"koji_cmd": '/usr/bin/koji', + ## The name of the source package that's being built + #"src_pkg": 'qemu', + ## Name of the rpms we need installed + #"pkg_list": ['qemu-kvm', 'qemu-kvm-tools', 'qemu-system-x86', 'qemu-common', 'qemu-img'], + ## Paths of the qemu relevant executables that should be checked + #"qemu_bin_paths": ['/usr/bin/qemu-kvm', '/usr/bin/qemu-img'], + + ## Install from git ("mode": "git") + ## If you provide only "git_repo" and "user_git_repo", the build test + ## will assume it will perform all build from the userspace dir, building + ## modules through make -C kernel LINUX=%s sync. As of today (07-13-2009) + ## we need 3 git repos, "git_repo" (linux sources), "user_git_repo" and + ## "kmod_repo" to build KVM userspace + kernel modules. 
+ #"git_repo": 'git://git.kernel.org/pub/scm/linux/kernel/git/avi/kvm.git', + #"kernel_branch": 'kernel_branch_name', + #"kernel_lbranch": 'kernel_lbranch_name', + #"kernel_tag": 'kernel_tag_name', + #"user_git_repo": 'git://git.kernel.org/pub/scm/virt/kvm/qemu-kvm.git', + #"user_branch": 'user_branch_name', + #"user_lbranch": 'user_lbranch_name', + #"user_tag": 'user_tag_name', + #"kmod_repo": 'git://git.kernel.org/pub/scm/virt/kvm/kvm-kmod.git', + #"kmod_branch": 'kmod_branch_name', + #"kmod_lbranch": 'kmod_lbranch_name', + #"kmod_tag": 'kmod_tag_name', +} + +# If you don't want to execute the build stage, just use 'noinstall' as the +# install type. If you run the tests from autotest-server, make sure that +# /tmp/kvm_autotest_root/qemu is a link to your existing executable. Note that +# if kvm_install is chosen to run, it overwrites existing qemu and qemu-img +# links to point to the newly built executables. +r = True +r = job.run_test("kvm", params=params, tag=params.get("shortname")) +if not r: + print 'kvm_installation failed ... exiting' + sys.exit(1) + +# ---------------------------------------------------------- +# Get test set (dictionary list) from the configuration file +# ---------------------------------------------------------- +import kvm_config + +filename = os.path.join(pwd, "kvm_tests.cfg") +cfg = kvm_config.config(filename) + +# If desirable, make changes to the test configuration here. 
For example: +# cfg.parse_string("only fc8_quick") +# cfg.parse_string("display = sdl") + +filename = os.path.join(pwd, "kvm_address_pools.cfg") +if os.path.exists(filename): + cfg.parse_file(filename) + hostname = os.uname()[1].split(".")[0] + if cfg.filter("^" + hostname): + cfg.parse_string("only ^%s" % hostname) + else: + cfg.parse_string("only ^default_host") + +tests = cfg.get_list() + + +# ------------- +# Run the tests +# ------------- +import kvm_scheduler +from autotest_lib.client.bin import utils + +# total_cpus defaults to the number of CPUs reported by /proc/cpuinfo +total_cpus = utils.count_cpus() +# total_mem defaults to 3/4 of the total memory reported by 'free' +total_mem = int(commands.getoutput("free -m").splitlines()[1].split()[1]) * 3/4 +# We probably won't need more workers than CPUs +num_workers = total_cpus + +# Start the scheduler and workers +s = kvm_scheduler.scheduler(tests, num_workers, total_cpus, total_mem, pwd) +job.parallel([s.scheduler], + *[(s.worker, i, job.run_test) for i in range(num_workers)]) + + +# create the html report in result dir +reporter = os.path.join(pwd, 'make_html_report.py') +html_file = os.path.join(job.resultdir,'results.html') +os.system('%s -r %s -f %s -R'%(reporter, job.resultdir, html_file)) diff --git a/client/tests/kvm/kvm_scheduler.py b/client/tests/kvm/kvm_scheduler.py new file mode 100644 index 0000000..93b7df6 --- /dev/null +++ b/client/tests/kvm/kvm_scheduler.py @@ -0,0 +1,229 @@ +import os, select +import kvm_utils, kvm_vm, kvm_subprocess + + +class scheduler: + """ + A scheduler that manages several parallel test execution pipelines on a + single host. + """ + + def __init__(self, tests, num_workers, total_cpus, total_mem, bindir): + """ + Initialize the class. + + @param tests: A list of test dictionaries. + @param num_workers: The number of workers (pipelines). + @param total_cpus: The total number of CPUs to dedicate to tests. + @param total_mem: The total amount of memory to dedicate to tests. 
+ @param bindir: The directory where environment files reside. + """ + self.tests = tests + self.num_workers = num_workers + self.total_cpus = total_cpus + self.total_mem = total_mem + self.bindir = bindir + # Pipes -- s stands for scheduler, w stands for worker + self.s2w = [os.pipe() for i in range(num_workers)] + self.w2s = [os.pipe() for i in range(num_workers)] + self.s2w_r = [os.fdopen(r, "r", 0) for r, w in self.s2w] + self.s2w_w = [os.fdopen(w, "w", 0) for r, w in self.s2w] + self.w2s_r = [os.fdopen(r, "r", 0) for r, w in self.w2s] + self.w2s_w = [os.fdopen(w, "w", 0) for r, w in self.w2s] + # "Personal" worker dicts contain modifications that are applied + # specifically to each worker. For example, each worker must use a + # different environment file and a different MAC address pool. + self.worker_dicts = [{"env": "env%d" % i} for i in range(num_workers)] + + + def worker(self, index, run_test_func): + """ + The worker function. + + Waits for commands from the scheduler and processes them. + + @param index: The index of this worker (in the range 0..num_workers-1). + @param run_test_func: A function to be called to run a test + (e.g. job.run_test). 
+ """ + r = self.s2w_r[index] + w = self.w2s_w[index] + self_dict = self.worker_dicts[index] + + # Inform the scheduler this worker is ready + w.write("ready\n") + + while True: + cmd = r.readline().split() + if not cmd: + continue + + # The scheduler wants this worker to run a test + if cmd[0] == "run": + test_index = int(cmd[1]) + test = self.tests[test_index].copy() + test.update(self_dict) + test_iterations = int(test.get("iterations", 1)) + status = run_test_func("kvm", params=test, + tag=test.get("shortname"), + iterations=test_iterations) + w.write("done %s %s\n" % (test_index, status)) + w.write("ready\n") + + # The scheduler wants this worker to free its used resources + elif cmd[0] == "cleanup": + env_filename = os.path.join(self.bindir, self_dict["env"]) + env = kvm_utils.load_env(env_filename, {}) + for obj in env.values(): + if isinstance(obj, kvm_vm.VM): + obj.destroy() + elif isinstance(obj, kvm_subprocess.kvm_spawn): + obj.close() + kvm_utils.dump_env(env, env_filename) + w.write("cleanup_done\n") + w.write("ready\n") + + # There's no more work for this worker + elif cmd[0] == "terminate": + break + + + def scheduler(self): + """ + The scheduler function. + + Sends commands to workers, telling them to run tests, clean up or + terminate execution. 
+ """ + idle_workers = [] + closing_workers = [] + test_status = ["waiting"] * len(self.tests) + test_worker = [None] * len(self.tests) + used_cpus = [0] * self.num_workers + used_mem = [0] * self.num_workers + + while True: + # Wait for a message from a worker + r, w, x = select.select(self.w2s_r, [], []) + + someone_is_ready = False + + for pipe in r: + worker_index = self.w2s_r.index(pipe) + msg = pipe.readline().split() + if not msg: + continue + + # A worker is ready -- add it to the idle_workers list + if msg[0] == "ready": + idle_workers.append(worker_index) + someone_is_ready = True + + # A worker completed a test + elif msg[0] == "done": + test_index = int(msg[1]) + test = self.tests[test_index] + status = int(eval(msg[2])) + test_status[test_index] = ("fail", "pass")[status] + # If the test failed, mark all dependent tests as "failed" too + if not status: + for i, other_test in enumerate(self.tests): + for dep in other_test.get("depend", []): + if dep in test["name"]: + test_status[i] = "fail" + + # A worker is done shutting down its VMs and other processes + elif msg[0] == "cleanup_done": + used_cpus[worker_index] = 0 + used_mem[worker_index] = 0 + closing_workers.remove(worker_index) + + if not someone_is_ready: + continue + + for worker in idle_workers[:]: + # Find a test for this worker + test_found = False + for i, test in enumerate(self.tests): + # We only want "waiting" tests + if test_status[i] != "waiting": + continue + # Make sure the test isn't assigned to another worker + if test_worker[i] is not None and test_worker[i] != worker: + continue + # Make sure the test's dependencies are satisfied + dependencies_satisfied = True + for dep in test["depend"]: + dependencies = [j for j, t in enumerate(self.tests) + if dep in t["name"]] + bad_status_deps = [j for j in dependencies + if test_status[j] != "pass"] + if bad_status_deps: + dependencies_satisfied = False + break + if not dependencies_satisfied: + continue + # Make sure we have enough 
resources to run the test + test_used_cpus = int(test.get("used_cpus", 1)) + test_used_mem = int(test.get("used_mem", 128)) + # First make sure the other workers aren't using too many + # CPUs (not including the workers currently shutting down) + uc = (sum(used_cpus) - used_cpus[worker] - + sum(used_cpus[i] for i in closing_workers)) + if uc and uc + test_used_cpus > self.total_cpus: + continue + # ... or too much memory + um = (sum(used_mem) - used_mem[worker] - + sum(used_mem[i] for i in closing_workers)) + if um and um + test_used_mem > self.total_mem: + continue + # If we reached this point it means there are, or will + # soon be, enough resources to run the test + test_found = True + # Now check if the test can be run right now, i.e. if the + # other workers, including the ones currently shutting + # down, aren't using too many CPUs + uc = (sum(used_cpus) - used_cpus[worker]) + if uc and uc + test_used_cpus > self.total_cpus: + continue + # ... or too much memory + um = (sum(used_mem) - used_mem[worker]) + if um and um + test_used_mem > self.total_mem: + continue + # Everything is OK -- run the test + test_status[i] = "running" + test_worker[i] = worker + idle_workers.remove(worker) + # Update used_cpus and used_mem + used_cpus[worker] = test_used_cpus + used_mem[worker] = test_used_mem + # Assign all related tests to this worker + for j, other_test in enumerate(self.tests): + for other_dep in other_test["depend"]: + # All tests that depend on this test + if other_dep in test["name"]: + test_worker[j] = worker + break + # ... 
and all tests that share a dependency + # with this test + for dep in test["depend"]: + if dep in other_dep or other_dep in dep: + test_worker[j] = worker + break + # Tell the worker to run the test + self.s2w_w[worker].write("run %s\n" % i) + break + + # If there won't be any tests for this worker to run soon, tell + # the worker to free its used resources + if not test_found and (used_cpus[worker] or used_mem[worker]): + self.s2w_w[worker].write("cleanup\n") + idle_workers.remove(worker) + closing_workers.append(worker) + + # If there are no more new tests to run, terminate the workers and + # the scheduler + if len(idle_workers) == self.num_workers: + for worker in idle_workers: + self.s2w_w[worker].write("terminate\n") + break diff --git a/client/tests/kvm/kvm_tests.cfg.sample b/client/tests/kvm/kvm_tests.cfg.sample index ceda7b3..61949ed 100644 --- a/client/tests/kvm/kvm_tests.cfg.sample +++ b/client/tests/kvm/kvm_tests.cfg.sample @@ -22,6 +22,10 @@ image_size = 10G shell_port = 22 display = vnc +# Default scheduler params +used_cpus = 1 +used_mem = 512 + # Port redirections redirs = remote_shell guest_port_remote_shell = 22 @@ -66,6 +70,7 @@ variants: migration_test_command = help kill_vm_on_error = yes iterations = 2 + used_mem = 1024 - autotest: install setup type = autotest @@ -112,6 +117,9 @@ variants: drift_threshold = 50 # Fail if the drift after the rest period is higher than 10% drift_threshold_after_rest = 10 + # For now, make sure this test is executed alone + used_cpus = 100 + - stress_boot: install setup type = stress_boot @@ -122,6 +130,9 @@ variants: kill_vm_vm1 = no kill_vm_gracefully = no extra_params += " -snapshot" + used_cpus = 5 + used_mem = 2560 + - autoit: install setup type = autoit @@ -132,7 +143,7 @@ variants: - notepad: autoit_script = autoit/notepad1.au3 - - nic_hotplug: + - nic_hotplug: install setup type = pci_hotplug pci_type = nic modprobe_acpiphp = no @@ -151,7 +162,7 @@ variants: pci_model = e1000 match_string = "Gigabit 
Ethernet Controller" - - block_hotplug: + - block_hotplug: install setup type = pci_hotplug pci_type = block reference_cmd = lspci @@ -641,6 +652,9 @@ variants: - @up: - smp2: extra_params += " -smp 2" + used_cpus = 2 + stress_boot: used_cpus = 10 + timedrift: used_cpus = 100 variants: -- 1.6.2.5 -- To unsubscribe from this list: send the line "unsubscribe kvm" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html