ACK and pushed.

Thanks,
hongming

On 02/15/2015 04:12 PM, jiahu wrote:
#!/usr/bin/env python
# test securityLabel() and securityLabelList() API for libvirt
#
# Covers the two virDomain APIs securityLabel and securityLabelList
# (repos/domain/securitylabel.py).

import libvirt

from libvirt import libvirtError
from src import sharedmod
from utils import utils

required_params = ('guestname',)
optional_params = {}


def check_qemu_conf(logger):
    """
    Check the security_driver setting in /etc/libvirt/qemu.conf.

    Returns True when no uncommented "security_driver" line is found
    (grep exits non-zero) or when the first such line mentions
    "selinux"; otherwise logs an error and returns False.
    """
    GREP = "grep \"^security_driver\" /etc/libvirt/qemu.conf"
    status, output = utils.exec_cmd(GREP, shell=True)
    if status:
        # grep matched nothing: the option is not set explicitly.
        return True
    if output and "selinux" in output[0]:
        return True
    logger.error("Not a default setting in qemu.conf")
    return False


def get_security_policy(logger):
    """
    Get the selinux mode of the host OS from `getenforce`.

    Returns True only when the host reports "Enforcing". "Permissive",
    "Disabled", an unrecognised answer and a failed command all yield
    False (the last two are also logged as errors).
    """
    SELINUX = "getenforce"
    status, output = utils.exec_cmd(SELINUX, shell=True)
    if status:
        logger.error("\"" + SELINUX + "\"" + "error")
        logger.error(output)
        return False

    mode = output[0] if output else ""
    if mode == "Enforcing":
        return True
    if mode not in ("Permissive", "Disabled"):
        # Previously this path fell through to an unbound local variable.
        logger.error("Can not find any results")
    return False


def get_pid(name, logger):
    """
    Get the process id of the specified domain.

    Looks for a process whose command line contains " -name <name>" and
    returns the first pid printed by the pipeline (as a string).
    Returns False when the command fails or no process matches.
    """
    PID = ("ps aux |grep -v grep | grep \" -name %s\" "
           "|awk '{print $2}'")
    cmd = PID % name
    status, output = utils.exec_cmd(cmd, shell=True)
    if status:
        logger.error("\"" + cmd + "\"" + "error")
        logger.error(output)
        return False
    if not output or not output[0].strip():
        # The pipeline's status comes from awk, so "no match" still
        # reports success; detect it from the empty output instead.
        logger.error("No process found for domain %s" % name)
        return False
    return output[0]


def get_pid_context(domain, logger):
    """
    Return the context of the domain's pid.

    On success returns a tuple (pid, line), where line is the first
    output line of `ls -nZd /proc/<pid>`. Returns False when the pid
    cannot be determined or the listing fails.
    """
    pid = get_pid(domain, logger)
    if not pid:
        # Do not run ls against "/proc/False".
        return False

    CONTEXT = "ls -nZd /proc/%s"
    cmd = CONTEXT % pid
    status, output = utils.exec_cmd(cmd, shell=True)
    if status or not output:
        logger.error("\"" + cmd + "\"" + "error")
        logger.error(output)
        return False
    return pid, output[0]


def check_selinux_label(api, domain, logger):
    """
    Check values in selinux mode.

    api is a [label, enforcing] pair as returned by the libvirt API.
    The check passes when the label appears in the listing of the
    domain's /proc entry and the enforcing flag equals the host's
    selinux enforcing state.
    """
    result = get_pid_context(domain, logger)
    if not result:
        logger.error("Can not get the context of domain %s" % domain)
        return False
    pid, context = result
    logger.debug("The context of %d is %s" % (int(pid), context))

    if api[0] not in context:
        logger.debug("Fail: '%s'" % api[0])
        return False
    if api[1] != get_security_policy(logger):
        logger.debug("Fail: '%s'" % api[1])
        return False
    logger.debug("PASS: '%s'" % (api,))
    return True


def check_DAC_label(api, domain, logger):
    """
    Check values in DAC mode.

    api is a [label, enforcing] pair whose label looks like
    "+<uid>:+<gid>". The check passes when those ids equal the second
    and third whitespace-separated fields of `ls -nZd /proc/<pid>` and
    the enforcing flag is false.
    """
    result = get_pid_context(domain, logger)
    if not result:
        logger.error("Can not get the context of domain %s" % domain)
        return False
    pid, context = result
    logger.debug("The context of %d is %s" % (int(pid), context))

    # "+107:+107" -> ['107', '107']
    expected_owner = api[0].strip().replace("+", "").split(':')
    # Fields 1 and 2 of the listing are compared as uid and gid.
    actual_owner = context.split()[1:3]

    if expected_owner != actual_owner:
        logger.debug("Fail: '%s'" % api[0])
        return False
    # enforcing is always false in DAC mode
    if api[1] != False:
        logger.debug("Fail: '%s'" % api[1])
        return False
    logger.debug("PASS: '%s'" % (api,))
    return True


def _verify_label(check, label, domain_name, logger):
    """Run one label check and log the PASS/FAIL verdict for it."""
    if check(label, domain_name, logger):
        logger.info("PASS, %s" % (label,))
        return True
    logger.error("FAIL, %s" % (label,))
    return False


def securitylabel(params):
    """
    Test the securityLabel and securityLabelList APIs of class virDomain.

    params must provide 'logger' and 'guestname'. The guest has to be
    running. Returns 0 when every label check passes and 1 on any
    failure or libvirt error.
    """
    logger = params['logger']
    domain_name = params['guestname']
    if not check_qemu_conf(logger):
        return 1

    try:
        conn = sharedmod.libvirtobj['conn']

        # A single lookup is enough; a missing domain normally surfaces
        # as libvirtError and is handled below.
        dom = conn.lookupByName(domain_name)
        if not dom:
            logger.error("Domain %s is not exist" % domain_name)
            return 1
        if not dom.isActive():
            logger.error("Domain %s is not running" % domain_name)
            return 1

        first_label_api = dom.securityLabel()
        logger.info("The first lable is %s" % (first_label_api,))
        if not _verify_label(check_selinux_label, first_label_api,
                             domain_name, logger):
            return 1

        all_label_api = dom.securityLabelList()
        logger.info("The all lable is %s" % (all_label_api,))
        if len(all_label_api) < 2:
            # Both the selinux entry [0] and the DAC entry [1] are needed.
            logger.error("Expected selinux and DAC labels, got %s"
                         % (all_label_api,))
            return 1

        if not _verify_label(check_selinux_label, all_label_api[0],
                             domain_name, logger):
            return 1
        if not _verify_label(check_DAC_label, all_label_api[1],
                             domain_name, logger):
            return 1

    except libvirtError as e:
        # str(e) works on both Python 2 and 3; e.message is Python 2 only.
        logger.error("API error message: %s" % str(e))
        return 1

    return 0
-- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list