* Michael Goldish <mgoldish@xxxxxxxxxx> [2009-05-17 09:50]:
> Hi all,
>
> We've recently implemented a very simple form of parallel test
> execution into KVM-Autotest and we'd like some feedback on it. This
> suggestion allows the user to manually assign tests to hosts/queues.
> It also takes care of assigning different MAC address ranges to
> hosts/queues. By 'queues' I mean parallel execution pipelines. Each
> host has one or more queues. The number of queues is defined by the
> user, and should reflect the capabilities of the host.
>
> This implementation involves only minor modifications to the code
> itself; most of the work is done in a new config file kvm_hosts.cfg,
> which has the exact same format as kvm_tests.cfg. The new file
> provides the framework with information about hosts/queues. The new
> file is parsed after kvm_tests.cfg. The test sets (such as
> 'nightly' and 'weekly'), previously defined at the end of
> kvm_tests.cfg, should now be defined last, after kvm_hosts.cfg.
> Test sets no longer select only the tests to execute, but also
> where each test should be executed (i.e. on what host/queue).
>
> The final result of parsing the config files is a list of tests, each
> with its own 'hostname' and 'queue' parameters. Each host executes
> only the tests whose 'hostname' parameter matches the current host,
> and puts tests with different 'queue' values in parallel pipelines
> of execution.
>
> Ideally, the Autotest server should take care of assigning tests to
> hosts automatically, but there are still a few technical difficulties
> to be resolved before we can implement that. We're considering the
> current suggestion as a temporary solution until a better one is
> found.
>
> Basically, the advantages are:
> - allows the user full control over what tests run, and where/how
>   they run
> - takes care of assigning MAC address ranges to different
>   hosts/queues (required for TAP networking)
> - can be used from the server or with the client, which makes it
>   relevant also for users who don't have an Autotest server installed
> - involves only minor code changes (except for the config files)
> - is pretty much the simplest possible solution (and simple is good)
>
> Drawbacks:
> - requires some initial work to be done by the user -- the user has
>   to define exactly where each test should run
> - test sets need to be modified when tests or hosts are
>   added/removed, to include/exclude them

I took a slightly different approach. The kvm_tests.cfg file already
provides a dependency relationship between different tests. I modified
the main loop in the control file to walk the entire list of jobs and
pull out any jobs that don't have any unmet dependencies (i.e., install
tests on the first pass). It then runs N jobs in parallel from that list
until the list is exhausted, and stores the results. Then it loops over
the remaining list of jobs again, finding the jobs that can now be run.

On a larger multi-core system, one might set the number of parallel jobs
equal to the number of cores.

I think this works well with using autoserv to farm out different
kvm_tests.cfg files to different machines.

Attaching my stale patch just for comment. It needs to be updated since
I sat on this for a while.

There were a number of issues:

- kvm_log is a shared resource; I fixed it up so parallel jobs can both
  call it
- vnc, redir and other network resources are shared, so in the
  kvm_tests.cfg file each job needs a parallel offset
- the kvm_tests.cfg file needs to define additional vm and nic objects,
  one for each parallel thread
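
To make the loop described above a bit more concrete, here is a rough
standalone sketch of the idea in plain Python. It is not the code from
the patch: the function name schedule_in_waves and the sample test list
are made up for illustration, it assumes each test dict carries a 'name'
and a 'depend' list of exact test names (the control file matches
dependencies by substring), and it ignores failed dependencies entirely.

```python
def schedule_in_waves(tests, threads=1):
    """Yield lists of test names that could be run together.

    tests   -- list of dicts with 'name' and 'depend' keys (assumed shape)
    threads -- maximum number of jobs to run in parallel
    """
    done = set()
    pending = list(tests)
    while pending:
        # A test is ready once every dependency has already been run.
        ready = [t for t in pending
                 if all(dep in done for dep in t["depend"])]
        if not ready:
            names = [t["name"] for t in pending]
            raise ValueError("unsatisfiable dependencies: %s" % names)
        # Run the ready tests in groups of at most `threads` jobs.
        for start in range(0, len(ready), threads):
            group = ready[start:start + threads]
            yield [t["name"] for t in group]
        # Only after the whole wave has finished do we look for more work.
        done.update(t["name"] for t in ready)
        pending = [t for t in pending if t["name"] not in done]


if __name__ == "__main__":
    # Hypothetical test list, only for demonstration.
    example_tests = [
        {"name": "install", "depend": []},
        {"name": "boot", "depend": ["install"]},
        {"name": "reboot", "depend": ["boot"]},
        {"name": "install2", "depend": []},
        {"name": "migrate", "depend": ["boot"]},
    ]
    for group in schedule_in_waves(example_tests, threads=2):
        print(group)
```

With threads=1 every group has a single entry, so the order of execution
degenerates to the serial case.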

Advantages:

- works a lot like the single-threaded model does, and if threads=1
  then it runs the same path
- config files don't change significantly, just some additional VM
  objects at the top and some offset values
- transparent to an autoserv setup; autoserv would just need to specify
  the kvm_tests.cfg file for each host

Disadvantages:

- the main loop waits for each group of parallel jobs to complete
  before starting any more. If somehow an install is mixed with a
  reboot test, we'll wait around before starting more jobs
- probably a few more here, but I don't have them off the top of my
  head

-- 
Ryan Harper
Software Engineer; Linux Technology Center
IBM Corp., Austin, Tx
ryanh@xxxxxxxxxx

diff --git a/client/bin/job.py b/client/bin/job.py
index 6ed5b36..d772488 100755
--- a/client/bin/job.py
+++ b/client/bin/job.py
@@ -751,21 +751,27 @@ class base_job(object):
         """Run tasks in parallel"""
 
         pids = []
+        pid_to_task = {}
+        return_vals = {}
         old_log_filename = self.log_filename
         for i, task in enumerate(tasklist):
             assert isinstance(task, (tuple, list))
             self.log_filename = old_log_filename + (".%d" % i)
             task_func = lambda: task[0](*task[1:])
-            pids.append(parallel.fork_start(self.resultdir, task_func))
+            pid = parallel.fork_start(self.resultdir, task_func)
+            pids.append(pid)
+            pid_to_task[pid] = i
 
         old_log_path = os.path.join(self.resultdir, old_log_filename)
         old_log = open(old_log_path, "a")
         exceptions = []
         for i, pid in enumerate(pids):
             # wait for the task to finish
+            task = pid_to_task[pid]
             try:
-                parallel.fork_waitfor(self.resultdir, pid)
+                return_vals[task] = parallel.fork_waitfor(self.resultdir, pid)
             except Exception, e:
+                return_vals[task] = 1
                 exceptions.append(e)
             # copy the logs from the subtask into the main log
             new_log_path = old_log_path + (".%d" % i)
@@ -784,6 +790,9 @@ class base_job(object):
             msg = "%d task(s) failed in job.parallel" % len(exceptions)
             raise error.JobError(msg)
 
+        # send back array True|False on success or failure
+        return map(lambda x: x == 0, return_vals.values())
+
 
     def quit(self):
         # XXX: should have a better name.
diff --git a/client/bin/parallel.py b/client/bin/parallel.py
index e8749db..e381c89 100644
--- a/client/bin/parallel.py
+++ b/client/bin/parallel.py
@@ -45,3 +45,4 @@ def fork_waitfor(tmp, pid):
 
     if (status != 0):
         raise error.TestError("test failed rc=%d" % (status))
+    return status
diff --git a/client/tests/kvm_runtest_2/control b/client/tests/kvm_runtest_2/control
index cb27821..1251e94 100644
--- a/client/tests/kvm_runtest_2/control
+++ b/client/tests/kvm_runtest_2/control
@@ -1,4 +1,4 @@
-import sys, os
+import sys, os, copy
 
 # enable modules import from current directory (tests/kvm_runtest_2)
 pwd = os.path.join(os.environ['AUTODIR'],'tests/kvm_runtest_2')
@@ -82,24 +82,80 @@ list = kvm_config.config(filename).get_list()
 # -------------
 # Run the tests
 # -------------
+threads = 1
 status_dict = {}
-
-for dict in list:
-    if dict.get("skip") == "yes":
-        continue
-    dependencies_satisfied = True
-    for dep in dict.get("depend"):
-        for test_name in status_dict.keys():
-            if not dep in test_name:
-                continue
-            if not status_dict[test_name]:
+current_list = copy.deepcopy(list)
+
+while len(current_list) > 0:
+    parallel_jobs = []
+    for dict in current_list:
+        name = dict['name']
+        if dict.get("skip") == "yes":
+            continue
+        dependencies_satisfied = True
+        # it's ok to ignore deps that we won't run, but not ok to ignore
+        # deps that have failed
+        for dep in dict.get("depend"):
+            # if we're waiting for the job to be run, deps aren't met
+            if dep in map(lambda x: x['name'], parallel_jobs):
                 dependencies_satisfied = False
                 break
-    if dependencies_satisfied:
-        current_status = job.run_test("kvm_runtest_2", params=dict, tag=dict.get("shortname"))
-    else:
-        current_status = False
-    status_dict[dict.get("name")] = current_status
+            for test_name in status_dict.keys():
+                if not dep in test_name:
+                    continue
+                if not status_dict[test_name]:
+                    dependencies_satisfied = False
+                    break
+
+        # add the job to parallel list if we can run it *now*
+        if dependencies_satisfied == True:
+            parallel_jobs.append(dict)
+            current_list = filter(lambda x: x['name'] != name, current_list)
+
+
+    print '**************WARK************* %d jobs to run in parallel' %(len(parallel_jobs))
+    # now, we can run the current parallel jobs in sets of N tests where
+    # N is the number of parallel jobs allowed.
+    for g in range(0,len(parallel_jobs), threads):
+        group = parallel_jobs[g:g+threads]
+        names = map(lambda x: x['name'], group)
+
+        # HACK - my python fu is not great enough to get around exec/eval to
+        # dynamically define functions which are needed for job.parallel
+        j = []
+        k = []
+        for i,dict in enumerate(group):
+            # reset vm and main_vm list into each dict
+            dict['vms'] = "vm%s" % i
+            dict['main_vm'] = "vm%s" % i
+            # FIXME: should be # of threads * index
+            dict['parallel_offset'] = 2*i
+            print dict
+            # create unique dict name and make a copy of dict
+            pdict_name = "dict_vm%s"%(i)
+            print pdict_name
+            exec "%s = copy.deepcopy(dict)" %(pdict_name)
+            tag = dict.get("shortname")
+            sdtag = 'parallel_%s' % i
+            defstr = 'def j%s():\n job.run_test("kvm_runtest_2", params=%s, tag="%s", subdir_tag="%s")' % (i,pdict_name, tag,sdtag)
+            # define dummy function for this job
+            exec defstr
+            eval("j.append(j%s)" % i)
+            k.append(dict.get("shortname"))
+
+        print "***************** WARK ************* parallel: %s"%(j)
+        print "***************** WARK ************* parallel: %s"%(map(lambda x: [x], k))
+
+        # need to wrap each function in the array with [], then use * to unpack the list
+        # as variable arg list to job.parallel
+        current_status = job.parallel(*map(lambda x: [x], j))
+        #current_status = job.parallel([j[0]], [j[1]])
+        print "status: %s"%(current_status)
+
+        # update status of each job from parallel run
+        for i,dict in enumerate(group):
+            status_dict[dict.get("name")] = current_status[i]
+
 
 # create the html report in result dir
 reporter = os.path.join(pwd, 'make_html_report.py')
diff --git a/client/tests/kvm_runtest_2/kvm_log.py b/client/tests/kvm_runtest_2/kvm_log.py
index ff1dfe9..06dc47d 100644
--- a/client/tests/kvm_runtest_2/kvm_log.py
+++ b/client/tests/kvm_runtest_2/kvm_log.py
@@ -5,10 +5,9 @@ version = "20081205"
 
 import time
 import sys
+import os
 
-prefix_stack = [""]
-log_level = 4
-
+pids = {}
 
 def name_of_caller():
     return sys._getframe(2).f_code.co_name
@@ -28,24 +27,50 @@ def format_stack(min_depth, max_depth, stop_func):
     list.reverse()
     return "|".join(list)
 
-def set_prefix(prefix):
-    global prefix_stack
-    prefix_stack.append(" %s" % prefix)
+def get_prefix(pid):
+    global pids
+    if (pids.has_key(pid) and not pids[pid].has_key('prefix')) or not pids.has_key(pid):
+        pids[pid] = {}
+        default_prefix = " [%s]" % pid
+        pids[pid]['prefix'] = [default_prefix]
+    return pids[pid]['prefix']
+
+def set_prefix(pid, prefix):
+    global pids
+    p = get_prefix(pid)
+    p.append(" %s" % prefix)
+    pids[pid]['prefix'] = p
+
+def restore_prefix(pid):
+    global pids
+    if pid in pids.keys():
+        pids[pid]['prefix'].pop()
 
-def restore_prefix():
-    global prefix_stack
-    prefix_stack.pop()
+def get_level(pid):
+    global pids
+    if (pids.has_key(pid) and not pids[pid].has_key('loglevel')) or not pids.has_key(pid):
+        pids[pid] = {}
+        pids[pid]['loglevel'] = 4
 
-def set_level(level):
-    global log_level
-    log_level = level
+    return pids[pid]['loglevel']
+
+def set_level(pid, level):
+    global pids
+    l = get_level(pid)
+    l = level
+    pids[pid]['loglevel'] = l
 
 def log(level, message, prefix=True):
+    global pids
+    mypid = os.getpid()
+    prefix = get_prefix(mypid)
+    loglevel = get_level(mypid)
+
     if message.endswith("\n"):
         message = message[:-1]
-    if level <= log_level:
+    if level <= loglevel:
         if prefix:
-            print "%s%s: %s" % (time.strftime("%Y%m%d-%H%M%S"), prefix_stack[-1], message)
+            print "%s%s: %s" % (time.strftime("%Y%m%d-%H%M%S"), prefix[-1], message)
         else:
             print message
 
diff --git a/client/tests/kvm_runtest_2/kvm_runtest_2.py b/client/tests/kvm_runtest_2/kvm_runtest_2.py
index c53877f..c6504ba 100644
--- a/client/tests/kvm_runtest_2/kvm_runtest_2.py
+++ b/client/tests/kvm_runtest_2/kvm_runtest_2.py
@@ -53,7 +53,10 @@ class kvm_runtest_2(test.test):
         resource.setrlimit(resource.RLIMIT_CORE, (-1, -1))
 
         # Set the logging prefix
-        kvm_log.set_prefix(params.get("shortname"))
+        tag = params.get("shortname")
+        mypid = os.getpid()
+        print '[%s] ************************** WARK ******************* i am tag=%s\n' %(mypid, tag)
+        kvm_log.set_prefix(mypid, tag)
 
         # Report the parameters we've received and write them as keyvals
         kvm_log.debug("Test parameters:")
@@ -65,6 +68,7 @@ class kvm_runtest_2(test.test):
 
         # Open the environment file
         env_filename = os.path.join(self.bindir, "env")
+        kvm_log.debug("using env file: %s" %(env_filename))
         env = shelve.open(env_filename, writeback=True)
         kvm_log.debug("Contents of environment: %s" % str(env))
 
diff --git a/client/tests/kvm_runtest_2/kvm_vm.py b/client/tests/kvm_runtest_2/kvm_vm.py
index fa68ce4..90ca21a 100644
--- a/client/tests/kvm_runtest_2/kvm_vm.py
+++ b/client/tests/kvm_runtest_2/kvm_vm.py
@@ -271,6 +271,9 @@ class VM:
                 kvm_log.error("Actual MD5 sum differs from expected one")
                 return False
 
+        # store parallel offset
+        self.port_offset = int(params.get("parallel_offset", 0))
+
         # Find available monitor filename
         while True:
             # The monitor filename should be unique
@@ -281,7 +284,7 @@ class VM:
 
         # Handle port redirections
         redir_names = kvm_utils.get_sub_dict_names(params, "redirs")
-        host_ports = kvm_utils.find_free_ports(5000, 6000, len(redir_names))
+        host_ports = kvm_utils.find_free_ports(self.port_offset+5000, 6000, len(redir_names))
         self.redirs = {}
         for i in range(len(redir_names)):
             redir_params = kvm_utils.get_sub_dict(params, redir_names[i])
@@ -290,7 +293,7 @@ class VM:
 
         # Find available VNC port, if needed
         if params.get("display") == "vnc":
-            self.vnc_port = kvm_utils.find_free_port(5900, 6000)
+            self.vnc_port = kvm_utils.find_free_port(self.port_offset+5900, 6000)
 
         # Make qemu command
         qemu_command = self.make_qemu_command()
@@ -298,7 +301,7 @@ class VM:
         # Is this VM supposed to accept incoming migrations?
         if for_migration:
             # Find available migration port
-            self.migration_port = kvm_utils.find_free_port(5200, 6000)
+            self.migration_port = kvm_utils.find_free_port(self.port_offset+5200, 6000)
 
             # Add -incoming option to the qemu command
             qemu_command += " -incoming tcp:0:%d" % self.migration_port
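
A note on the HACK comment in the control file changes: the exec/eval
step can probably be dropped by binding each job's arguments with
functools.partial from the standard library. The sketch below is
untested against Autotest itself. run_test here is a hypothetical
stand-in for job.run_test, build_tasks is a made-up helper name, and the
subdir_tag keyword simply mirrors the call in the patch.

```python
import copy
from functools import partial


def run_test(test_name, params, tag, subdir_tag):
    # Hypothetical stand-in for job.run_test; it only reports its inputs.
    return (test_name, params["main_vm"], tag, subdir_tag)


def build_tasks(group, run=run_test):
    """Return one zero-argument callable per test dict in `group`."""
    tasks = []
    for i, params in enumerate(group):
        # Each job gets its own private copy of the parameters.
        p = copy.deepcopy(params)
        p["vms"] = "vm%d" % i
        p["main_vm"] = "vm%d" % i
        p["parallel_offset"] = 2 * i
        # partial() freezes the arguments now, so no exec/eval is needed
        # and each callable keeps its own dict.
        tasks.append(partial(run, "kvm_runtest_2", params=p,
                             tag=p.get("shortname"),
                             subdir_tag="parallel_%d" % i))
    return tasks


if __name__ == "__main__":
    group = [{"shortname": "install"}, {"shortname": "boot"}]
    for task in build_tasks(group):
        print(task())
```

Since job.parallel expects each task as a list whose first element is
the callable, the result could presumably be handed over as
job.parallel(*[[t] for t in tasks]), the same shape the patch builds
with map().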