From: Tomas Glozar <tglozar@xxxxxxxxxx> Add a sysinfo module for collecting the tuned state. Three properties are collected: - whether tuned is present - what the active tuned profile is - whether tuned profile verification passes In case of a failed verification, the tuned log is also collected. Signed-off-by: Tomas Glozar <tglozar@xxxxxxxxxx> --- rteval/sysinfo/__init__.py | 5 +- rteval/sysinfo/tuned.py | 180 +++++++++++++++++++++++++++++++++++++ 2 files changed, 184 insertions(+), 1 deletion(-) create mode 100644 rteval/sysinfo/tuned.py diff --git a/rteval/sysinfo/__init__.py b/rteval/sysinfo/__init__.py index 09af52e..4b7b03c 100644 --- a/rteval/sysinfo/__init__.py +++ b/rteval/sysinfo/__init__.py @@ -15,10 +15,11 @@ from rteval.sysinfo.memory import MemoryInfo from rteval.sysinfo.osinfo import OSInfo from rteval.sysinfo.newnet import NetworkInfo from rteval.sysinfo.cmdline import cmdlineInfo +from rteval.sysinfo.tuned import TunedInfo from rteval.sysinfo import dmi class SystemInfo(KernelInfo, SystemServices, dmi.DMIinfo, CPUtopology, - MemoryInfo, OSInfo, NetworkInfo, cmdlineInfo): + MemoryInfo, OSInfo, NetworkInfo, cmdlineInfo, TunedInfo): def __init__(self, config, logger=None): self.__logger = logger KernelInfo.__init__(self, logger=logger) @@ -28,6 +29,7 @@ class SystemInfo(KernelInfo, SystemServices, dmi.DMIinfo, CPUtopology, OSInfo.__init__(self, logger=logger) cmdlineInfo.__init__(self, logger=logger) NetworkInfo.__init__(self, logger=logger) + TunedInfo.__init__(self, logger=logger) # Parse initial DMI decoding errors self.ProcessWarnings() @@ -49,6 +51,7 @@ class SystemInfo(KernelInfo, SystemServices, dmi.DMIinfo, CPUtopology, report_n.addChild(MemoryInfo.MakeReport(self)) report_n.addChild(dmi.DMIinfo.MakeReport(self)) report_n.addChild(cmdlineInfo.MakeReport(self)) + report_n.addChild(TunedInfo.MakeReport(self)) return report_n diff --git a/rteval/sysinfo/tuned.py b/rteval/sysinfo/tuned.py new file mode 100644 index 0000000..e41bd4d --- /dev/null +++ b/rteval/sysinfo/tuned.py @@ -0,0 +1,180 @@ +# -*- coding: utf-8 -*- +# SPDX-License-Identifier: GPL-2.0-or-later +# +# Copyright 2024 Tomas Glozar <tglozar@xxxxxxxxxx> +# +import libxml2 +import shutil +import subprocess +import sys + +from rteval.Log import Log + +TUNED_ADM = "tuned-adm" +TUNED_LOG_PATH = "/var/log/tuned/tuned.log" +TUNED_VERIFY_START_LINE = "INFO tuned.daemon.daemon: verifying " \ + "profile(s): realtime" + + +def tuned_present(): + """ + Checks if tuned is present on the system + :return: True if tuned is present, False otherwise + """ + return shutil.which(TUNED_ADM) is not None + + +def tuned_active_profile(): + """ + Gets tuned active profile. + :return: Tuned profile (as a string) or "unknown" + """ + try: + result = subprocess.check_output([TUNED_ADM, "active"]) + except (OSError, subprocess.CalledProcessError): + return "unknown" + result = result.decode("utf-8") + split_result = result.split(": ") + if len(split_result) < 2: + return "unknown" + return split_result[1].strip() + + +def tuned_verify(): + """ + Verifies if tuned profile is applied properly + :return: "success", "failure" or "unknown" + """ + try: + result = subprocess.run([TUNED_ADM, "verify"], + stdout=subprocess.PIPE).stdout + except (OSError, subprocess.CalledProcessError): + return "unknown" + result = result.decode("utf-8") + if result.startswith("Verification succeeded"): + return "success" + elif result.startswith("Verification failed"): + return "failure" + return "unknown" + + +def tuned_get_log(): + """ + Read entries related to last profile verification from tuned log + :return: List of strings containing the entires, or None if no + verification is found in the log + """ + try: + with open(TUNED_LOG_PATH, "r") as file: + lines = file.readlines() + # Find start of last verification + start = None + for i in reversed(range(len(lines))): + if TUNED_VERIFY_START_LINE in lines[i]: + start = i + break + if start is None: + return None + return lines[start:] + except OSError: + return None + + +class TunedInfo: + def __init__(self, logger=None): + self.__logger = logger + self.__present = False + + def __log(self, logtype, msg): + if self.__logger: + self.__logger.log(logtype, msg) + + def tuned_state_get(self): + """ + Gets the state of tuned on the machine. + :return: A dictionary describing the tuned state + """ + result = { + "present": tuned_present() + } + if not result["present"]: + self.__log(Log.DEBUG, "tuned-adm not found; skipping tuned " + "sysinfo collection") + return result + result["active_profile"] = tuned_active_profile() + if result["active_profile"] == "unknown": + self.__log(Log.DEBUG, "could not retrieve tuned active profile") + return result + result["verified"] = tuned_verify() + if result["verified"] == "unknown": + self.__log(Log.DEBUG, "could not verify tuned state") + if result["verified"] == "failure": + # Include log to see cause to failure + result["verification_log"] = tuned_get_log() + + return result + + def MakeReport(self): + tuned = self.tuned_state_get() + + rep_n = libxml2.newNode("Tuned") + rep_n.newProp("present", str(int(tuned["present"]))) + for key, value in tuned.items(): + if key == "present": + continue + child = libxml2.newNode(key) + if key == "verification_log": + if value is None: + self.__log(Log.WARN, "could not get verification log") + continue + for line in value: + # <date> <time> <log-level> <message> + line = line.split(" ", 3) + if len(line) != 4: + continue + line_child = libxml2.newNode("entry") + line_child.newProp("date", line[0]) + line_child.newProp("time", line[1]) + line_child.newProp("level", line[2]) + line_child.setContent(line[3].strip()) + child.addChild(line_child) + else: + child.setContent(value) + rep_n.addChild(child) + + return rep_n + + +def unit_test(rootdir): + try: + # Helper function tests + result = tuned_present() + print("tuned present:", result) + assert isinstance(result, bool), "__tuned_present() should return bool" + result = tuned_active_profile() + print("tuned active profile:", result) + assert isinstance(result, str), "__tuned_active_profile() should " \ + "return string" + result = tuned_verify() + print("tuned verification state:", result) + assert isinstance(result, str), "__tuned_verify() should return string" + result = tuned_get_log() + assert isinstance(result, list) or result is None, \ + "__tuned_get_log() should return list or None" + + # Class tests + tuned = TunedInfo() + result = tuned.tuned_state_get() + print(result) + assert isinstance(result, dict), "TunedInfo.tuned_state_get() " \ + "should return dict" + tuned.MakeReport() + + return 0 + except Exception as err: + print(f"** EXCEPTION: {str(err)}") + return 1 + + +if __name__ == '__main__': + sys.exit(unit_test(None)) -- 2.45.1