(Difference from previous version: - Put most of the code in a separate module - Wait for workers to finish freeing their used resources before starting tests that require those resources) This patch adds a control.parallel file that runs several test execution pipelines in parallel. The number of pipelines is set to the number of CPUs reported by /proc/cpuinfo. It can be changed by modifying the control file. The total amount of RAM defaults to 3/4 times what 'free -m' reports. The scheduler's job is to make sure tests run in parallel only when there are sufficient resources to allow it. For example, a test that requires 2 CPUs will not run together with a test that requires 3 CPUs on a 4 CPU machine. The same logic applies to RAM. Note that tests that require more CPUs and/or more RAM than the machine has are allowed to run alone, e.g. a test that requires 3GB of RAM is allowed to run on a machine with only 2GB of RAM, but no tests will run in parallel to it. Currently TAP networking isn't supported by this scheduler because the main MAC address pool must be divided between the pipelines ("workers"). This should be straightforward to do but I haven't had the time to do it yet. scan_results.py can be used to list the test results during and after execution. Signed-off-by: Michael Goldish <mgoldish@xxxxxxxxxx> --- client/tests/kvm/control.parallel | 176 ++++++++++++++++++++++++++++ client/tests/kvm/kvm_scheduler.py | 229 +++++++++++++++++++++++++++++++++++++ 2 files changed, 405 insertions(+), 0 deletions(-) create mode 100644 client/tests/kvm/control.parallel create mode 100644 client/tests/kvm/kvm_scheduler.py diff --git a/client/tests/kvm/control.parallel b/client/tests/kvm/control.parallel new file mode 100644 index 0000000..cf268ea --- /dev/null +++ b/client/tests/kvm/control.parallel @@ -0,0 +1,176 @@ +AUTHOR = """ +uril@xxxxxxxxxx (Uri Lublin) +drusso@xxxxxxxxxx (Dror Russo) +mgoldish@xxxxxxxxxx (Michael Goldish) +dhuff@xxxxxxxxxx (David Huff) +aeromenk@xxxxxxxxxx (Alexey Eromenko) +mburns@xxxxxxxxxx (Mike Burns) +""" +TIME = 'SHORT' +NAME = 'KVM test' +TEST_TYPE = 'client' +TEST_CLASS = 'Virtualization' +TEST_CATEGORY = 'Functional' + +DOC = """ +Executes the KVM test framework on a given host. This module is separated in +minor functions, that execute different tests for doing Quality Assurance on +KVM (both kernelspace and userspace) code. +""" + + +import sys, os, commands, re + +#----------------------------------------------------------------------------- +# set English environment (command output might be localized, need to be safe) +#----------------------------------------------------------------------------- +os.environ['LANG'] = 'en_US.UTF-8' + +#--------------------------------------------------------- +# Enable modules import from current directory (tests/kvm) +#--------------------------------------------------------- +pwd = os.path.join(os.environ['AUTODIR'],'tests/kvm') +sys.path.append(pwd) + +# ------------------------ +# create required symlinks +# ------------------------ +# When dispatching tests from autotest-server the links we need do not exist on +# the host (the client). The following lines create those symlinks. Change +# 'rootdir' here and/or mount appropriate directories in it. +# +# When dispatching tests on local host (client mode) one can either setup kvm +# links, or same as server mode use rootdir and set all appropriate links and +# mount-points there. For example, guest installation tests need to know where +# to find the iso-files. +# +# We create the links only if not already exist, so if one already set up the +# links for client/local run we do not touch the links. +rootdir='/tmp/kvm_autotest_root' +iso=os.path.join(rootdir, 'iso') +images=os.path.join(rootdir, 'images') +qemu=os.path.join(rootdir, 'qemu') +qemu_img=os.path.join(rootdir, 'qemu-img') + + +def link_if_not_exist(ldir, target, link_name): + t = target + l = os.path.join(ldir, link_name) + if not os.path.exists(l): + os.system('ln -s %s %s' % (t, l)) + +# Create links only if not already exist +link_if_not_exist(pwd, '../../', 'autotest') +link_if_not_exist(pwd, iso, 'isos') +link_if_not_exist(pwd, images, 'images') +link_if_not_exist(pwd, qemu, 'qemu') +link_if_not_exist(pwd, qemu_img, 'qemu-img') + +# -------------------------------------------------------- +# Params that will be passed to the KVM install/build test +# -------------------------------------------------------- +params = { + "name": "kvm_install", + "shortname": "kvm_install", + "type": "kvm_install", + "mode": "release", + #"mode": "snapshot", + #"mode": "localtar", + #"mode": "localsrc", + #"mode": "git", + #"mode": "noinstall", + + ## Are we going to load modules built by this test? + ## Defaults to 'yes', so if you are going to provide only userspace code to + ## be built by this test, please set load_modules to 'no', and make sure + ## the kvm and kvm-[vendor] module is already loaded by the time you start + ## it. + #"load_modules": "no", + + ## Install from a kvm release ("mode": "release"). You can optionally + ## specify a release tag. If you omit it, the test will get the latest + ## release tag available. + #"release_tag": '84', + "release_dir": 'http://downloads.sourceforge.net/kvm/', + + ## Install from a kvm snapshot location ("mode": "snapshot"). You can + ## optionally specify a snapshot date. If you omit it, the test will get + ## yesterday's snapshot. + #"snapshot_date": '20090712' + #"snapshot_dir": 'http://foo.org/kvm-snapshots/', + + ## Install from a tarball ("mode": "localtar") + #"tarball": "/tmp/kvm-84.tar.gz", + + ## Install from a local source code dir ("mode": "localsrc") + #"srcdir": "/path/to/source-dir" + + ## Install from git ("mode": "git") + ## If you provide only "git_repo" and "user_git_repo", the build test + ## will assume it will perform all build from the userspace dir, building + ## modules trough make -C kernel LINUX=%s sync. As of today (07-13-2009) + ## we need 3 git repos, "git_repo" (linux sources), "user_git_repo" and + ## "kmod_repo" to build KVM userspace + kernel modules. + #"git_repo": 'git://git.kernel.org/pub/scm/linux/kernel/git/avi/kvm.git', + #"user_git_repo": 'git://git.kernel.org/pub/scm/virt/kvm/qemu-kvm.git', + #"kmod_repo": 'git://git.kernel.org/pub/scm/virt/kvm/kvm-kmod.git' +} + +# If you don't want to execute the build stage, just use 'noinstall' as the +# install type. If you run the tests from autotest-server, make sure that +# /tmp/kvm-autotest-root/qemu is a link to your existing executable. Note that +# if kvm_install is chose to run, it overwrites existing qemu and qemu-img +# links to point to the newly built executables. +r = True +r = job.run_test("kvm", params=params, tag=params.get("shortname")) +if not r: + print 'kvm_installation failed ... exiting' + sys.exit(1) + +# ---------------------------------------------------------- +# Get test set (dictionary list) from the configuration file +# ---------------------------------------------------------- +import kvm_config + +filename = os.path.join(pwd, "kvm_tests.cfg") +cfg = kvm_config.config(filename) + +# If desirable, make changes to the test configuration here. For example: +# cfg.parse_string("only fc8_quick") +# cfg.parse_string("display = sdl") + +filename = os.path.join(pwd, "kvm_address_pools.cfg") +if os.path.exists(filename): + cfg.parse_file(filename) + hostname = os.uname()[1].split(".")[0] + if cfg.filter("^" + hostname): + cfg.parse_string("only ^%s" % hostname) + else: + cfg.parse_string("only ^default_host") + +tests = cfg.get_list() + + +# ------------- +# Run the tests +# ------------- +import kvm_scheduler + +# total_cpus defaults to the number of CPUs reported by /proc/cpuinfo +total_cpus = len(re.findall(r"\bprocessor\s*:", + open("/proc/cpuinfo").read(), re.IGNORECASE)) +# total_mem defaults to 3/4 of the total memory reported by 'free' +total_mem = int(commands.getoutput("free -m").splitlines()[1].split()[1]) * 3/4 +# We probably won't need more workers than CPUs +num_workers = total_cpus + +# Start the scheduler and workers +s = kvm_scheduler.scheduler(tests, num_workers, total_cpus, total_mem, pwd) +job.parallel([s.scheduler], + *[(s.worker, i, job.run_test) for i in range(num_workers)]) + + +# create the html report in result dir +reporter = os.path.join(pwd, 'make_html_report.py') +html_file = os.path.join(job.resultdir,'results.html') +os.system('%s -r %s -f %s -R'%(reporter, job.resultdir, html_file)) diff --git a/client/tests/kvm/kvm_scheduler.py b/client/tests/kvm/kvm_scheduler.py new file mode 100644 index 0000000..93b7df6 --- /dev/null +++ b/client/tests/kvm/kvm_scheduler.py @@ -0,0 +1,229 @@ +import os, select +import kvm_utils, kvm_vm, kvm_subprocess + + +class scheduler: + """ + A scheduler that manages several parallel test execution pipelines on a + single host. + """ + + def __init__(self, tests, num_workers, total_cpus, total_mem, bindir): + """ + Initialize the class. + + @param tests: A list of test dictionaries. + @param num_workers: The number of workers (pipelines). + @param total_cpus: The total number of CPUs to dedicate to tests. + @param total_mem: The total amount of memory to dedicate to tests. + @param bindir: The directory where environment files reside. + """ + self.tests = tests + self.num_workers = num_workers + self.total_cpus = total_cpus + self.total_mem = total_mem + self.bindir = bindir + # Pipes -- s stands for scheduler, w stands for worker + self.s2w = [os.pipe() for i in range(num_workers)] + self.w2s = [os.pipe() for i in range(num_workers)] + self.s2w_r = [os.fdopen(r, "r", 0) for r, w in self.s2w] + self.s2w_w = [os.fdopen(w, "w", 0) for r, w in self.s2w] + self.w2s_r = [os.fdopen(r, "r", 0) for r, w in self.w2s] + self.w2s_w = [os.fdopen(w, "w", 0) for r, w in self.w2s] + # "Personal" worker dicts contain modifications that are applied + # specifically to each worker. For example, each worker must use a + # different environment file and a different MAC address pool. + self.worker_dicts = [{"env": "env%d" % i} for i in range(num_workers)] + + + def worker(self, index, run_test_func): + """ + The worker function. + + Waits for commands from the scheduler and processes them. + + @param index: The index of this worker (in the range 0..num_workers-1). + @param run_test_func: A function to be called to run a test + (e.g. job.run_test). + """ + r = self.s2w_r[index] + w = self.w2s_w[index] + self_dict = self.worker_dicts[index] + + # Inform the scheduler this worker is ready + w.write("ready\n") + + while True: + cmd = r.readline().split() + if not cmd: + continue + + # The scheduler wants this worker to run a test + if cmd[0] == "run": + test_index = int(cmd[1]) + test = self.tests[test_index].copy() + test.update(self_dict) + test_iterations = int(test.get("iterations", 1)) + status = run_test_func("kvm", params=test, + tag=test.get("shortname"), + iterations=test_iterations) + w.write("done %s %s\n" % (test_index, status)) + w.write("ready\n") + + # The scheduler wants this worker to free its used resources + elif cmd[0] == "cleanup": + env_filename = os.path.join(self.bindir, self_dict["env"]) + env = kvm_utils.load_env(env_filename, {}) + for obj in env.values(): + if isinstance(obj, kvm_vm.VM): + obj.destroy() + elif isinstance(obj, kvm_subprocess.kvm_spawn): + obj.close() + kvm_utils.dump_env(env, env_filename) + w.write("cleanup_done\n") + w.write("ready\n") + + # There's no more work for this worker + elif cmd[0] == "terminate": + break + + + def scheduler(self): + """ + The scheduler function. + + Sends commands to workers, telling them to run tests, clean up or + terminate execution. + """ + idle_workers = [] + closing_workers = [] + test_status = ["waiting"] * len(self.tests) + test_worker = [None] * len(self.tests) + used_cpus = [0] * self.num_workers + used_mem = [0] * self.num_workers + + while True: + # Wait for a message from a worker + r, w, x = select.select(self.w2s_r, [], []) + + someone_is_ready = False + + for pipe in r: + worker_index = self.w2s_r.index(pipe) + msg = pipe.readline().split() + if not msg: + continue + + # A worker is ready -- add it to the idle_workers list + if msg[0] == "ready": + idle_workers.append(worker_index) + someone_is_ready = True + + # A worker completed a test + elif msg[0] == "done": + test_index = int(msg[1]) + test = self.tests[test_index] + status = int(eval(msg[2])) + test_status[test_index] = ("fail", "pass")[status] + # If the test failed, mark all dependent tests as "failed" too + if not status: + for i, other_test in enumerate(self.tests): + for dep in other_test.get("depend", []): + if dep in test["name"]: + test_status[i] = "fail" + + # A worker is done shutting down its VMs and other processes + elif msg[0] == "cleanup_done": + used_cpus[worker_index] = 0 + used_mem[worker_index] = 0 + closing_workers.remove(worker_index) + + if not someone_is_ready: + continue + + for worker in idle_workers[:]: + # Find a test for this worker + test_found = False + for i, test in enumerate(self.tests): + # We only want "waiting" tests + if test_status[i] != "waiting": + continue + # Make sure the test isn't assigned to another worker + if test_worker[i] is not None and test_worker[i] != worker: + continue + # Make sure the test's dependencies are satisfied + dependencies_satisfied = True + for dep in test["depend"]: + dependencies = [j for j, t in enumerate(self.tests) + if dep in t["name"]] + bad_status_deps = [j for j in dependencies + if test_status[j] != "pass"] + if bad_status_deps: + dependencies_satisfied = False + break + if not dependencies_satisfied: + continue + # Make sure we have enough resources to run the test + test_used_cpus = int(test.get("used_cpus", 1)) + test_used_mem = int(test.get("used_mem", 128)) + # First make sure the other workers aren't using too many + # CPUs (not including the workers currently shutting down) + uc = (sum(used_cpus) - used_cpus[worker] - + sum(used_cpus[i] for i in closing_workers)) + if uc and uc + test_used_cpus > self.total_cpus: + continue + # ... or too much memory + um = (sum(used_mem) - used_mem[worker] - + sum(used_mem[i] for i in closing_workers)) + if um and um + test_used_mem > self.total_mem: + continue + # If we reached this point it means there are, or will + # soon be, enough resources to run the test + test_found = True + # Now check if the test can be run right now, i.e. if the + # other workers, including the ones currently shutting + # down, aren't using too many CPUs + uc = (sum(used_cpus) - used_cpus[worker]) + if uc and uc + test_used_cpus > self.total_cpus: + continue + # ... or too much memory + um = (sum(used_mem) - used_mem[worker]) + if um and um + test_used_mem > self.total_mem: + continue + # Everything is OK -- run the test + test_status[i] = "running" + test_worker[i] = worker + idle_workers.remove(worker) + # Update used_cpus and used_mem + used_cpus[worker] = test_used_cpus + used_mem[worker] = test_used_mem + # Assign all related tests to this worker + for j, other_test in enumerate(self.tests): + for other_dep in other_test["depend"]: + # All tests that depend on this test + if other_dep in test["name"]: + test_worker[j] = worker + break + # ... and all tests that share a dependency + # with this test + for dep in test["depend"]: + if dep in other_dep or other_dep in dep: + test_worker[j] = worker + break + # Tell the worker to run the test + self.s2w_w[worker].write("run %s\n" % i) + break + + # If there won't be any tests for this worker to run soon, tell + # the worker to free its used resources + if not test_found and (used_cpus[worker] or used_mem[worker]): + self.s2w_w[worker].write("cleanup\n") + idle_workers.remove(worker) + closing_workers.append(worker) + + # If there are no more new tests to run, terminate the workers and + # the scheduler + if len(idle_workers) == self.num_workers: + for worker in idle_workers: + self.s2w_w[worker].write("terminate\n") + break -- 1.5.4.1 -- To unsubscribe from this list: send the line "unsubscribe kvm" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html