def parse_args() -> Namespace:
    """
    Parse and normalize the command line arguments.

    Returns:
        The parsed arguments, with these adjustments applied:

        * `check_names` is a sorted list of unique check names; when no
          `--check` option was given it contains every registered check.
        * `threads` is forced to 1 when `--profile` was given, as promised by
          the `--profile` help text.

    Exits with status 2 (through `ArgumentParser.error()`) on bad usage,
    including a `--jobs` value smaller than 1.
    """

    # One line per check: its name followed by its docstring collapsed onto a
    # single line.
    available_checks = "\n".join(
        f" {name} {' '.join((CHECKS[name].__doc__ or '').split())}"
        for name in sorted(CHECKS)
    )

    # Dedent the template *before* substituting the list of checks. The check
    # lines carry their own (smaller) indentation, so dedenting after the
    # substitution would shrink the common margin as soon as more than one
    # check is registered and leave the rest of the epilog indented.
    epilog = textwrap.dedent(
        """
        available checks:
        {available_checks}

        exit codes:
          0  No problems found.
          1  Internal failure.
          2  Bad usage.
          3  Problems found in one or more translation units.
        """
    ).format(available_checks=available_checks)

    parser = ArgumentParser(
        allow_abbrev=False,
        formatter_class=RawDescriptionHelpFormatter,
        description=textwrap.dedent(
            """
            Checks are best-effort, but should never report false positives.

            This only considers translation units enabled under the given QEMU
            build configuration. Note that a single .c file may give rise to
            several translation units.

            You should build QEMU before running this, since some translation
            units depend on files that are generated during the build. If you
            don't, you'll get errors, but should never get false negatives.
            """
        ),
        epilog=epilog,
    )

    parser.add_argument(
        "build_dir",
        type=Path,
        help="Path to the build directory.",
    )

    parser.add_argument(
        "translation_units",
        type=Path,
        nargs="*",
        help=(
            "Analyze only translation units whose root source file matches or"
            " is under one of the given paths."
        ),
    )

    # add regular options

    parser.add_argument(
        "-c",
        "--check",
        metavar="CHECK",
        dest="check_names",
        choices=sorted(CHECKS),
        action="append",
        help=(
            "Enable the given check. Can be given multiple times. If not given,"
            " all checks are enabled."
        ),
    )

    default_threads = os.cpu_count() or 1

    parser.add_argument(
        "-j",
        "--jobs",
        dest="threads",
        type=int,
        default=default_threads,
        help=(
            f"Number of threads to employ. Defaults to {default_threads} on"
            f" this machine."
        ),
    )

    # add development options

    dev_options = parser.add_argument_group("development options")

    dev_options.add_argument(
        "--profile",
        metavar="SORT_KEY",
        help=(
            "Profile execution. Forces single-threaded execution. The argument"
            " specifies how to sort the results; see"
            " https://docs.python.org/3/library/profile.html#pstats.Stats.sort_stats"
        ),
    )

    dev_options.add_argument(
        "--skip-checks",
        action="store_true",
        help="Do everything except actually running the checks.",
    )

    # parse arguments

    args = parser.parse_args()

    # A worker pool cannot be created with fewer than one process; report
    # that as bad usage instead of failing later with a traceback.
    if args.threads < 1:
        parser.error("argument -j/--jobs: must be at least 1")

    # The profiler only observes the current process, so profiling is only
    # meaningful without a worker pool.
    if args.profile:
        args.threads = 1

    args.check_names = sorted(set(args.check_names or CHECKS))

    return args
def get_translation_units(args: Namespace) -> Sequence["TranslationUnit"]:
    """
    Return the list of translation units to be analyzed.

    Every entry of the build directory's compilation database becomes a
    candidate. Candidates located under a git submodule are dropped, and, if
    `args.translation_units` is non-empty, only candidates whose source file
    matches or lies under one of those paths are kept.

    Args:
        args: Parsed command line arguments. Reads `build_dir`,
            `translation_units`, `check_names` and `skip_checks`.

    Returns:
        A non-empty list of `TranslationUnit`s. Their `check_names` is empty
        when `args.skip_checks` is set.

    Exits through `fatal()` if the build directory's Makefile, the
    repository's `.gitmodules` file, or one of the given paths does not
    exist, or if no translation unit is selected.
    """

    system_include_paths = get_clang_system_include_paths()
    compile_commands = load_compilation_database(args.build_dir)

    # locate the repository root

    # NOTE(review): this takes the directory holding the *resolved* Makefile
    # as the repository root, which presumably relies on the build
    # directory's Makefile being a symlink into the source tree -- confirm.
    makefile = args.build_dir / "Makefile"

    try:
        repo_root = makefile.resolve(strict=True).parent
    except FileNotFoundError:
        fatal(f"{makefile} does not exist")

    # collect the directories of all git submodules

    module_file = repo_root / ".gitmodules"

    # ConfigParser.read() silently skips missing files, so check explicitly
    # (and not with `assert`, which is stripped when running with -O).
    if not module_file.exists():
        fatal(f"{module_file} does not exist")

    modules = ConfigParser()
    modules.read(module_file)

    disallowed_prefixes = tuple(
        # ensure path is slash-terminated
        os.path.join(repo_root, section["path"], "")
        for section in modules.values()
        if "path" in section
    )

    # collect the paths given on the command line

    allowed_prefixes = ()

    for path in args.translation_units:
        try:
            resolved_path = path.resolve(strict=True)
        except FileNotFoundError:
            fatal(f"{path} does not exist")
        # ensure path is slash-terminated (even if it is a file)
        allowed_prefixes += (os.path.join(resolved_path, ""),)

    # get all translation units, ignoring those from git submodules and those
    # not selected by the command line arguments

    tr_unit_list = []

    for cmd in compile_commands:

        absolute_path = str(Path(cmd["directory"], cmd["file"]).resolve())

        # str.startswith() with an empty tuple is always False, so nothing
        # is rejected here when there are no submodules
        if absolute_path.startswith(disallowed_prefixes):
            continue

        # the trailing slash lets a path given for a single file match that
        # exact file
        if allowed_prefixes and not (absolute_path + "/").startswith(
            allowed_prefixes
        ):
            continue

        tr_unit_list.append(
            TranslationUnit(
                absolute_path=absolute_path,
                build_working_dir=cmd["directory"],
                build_command=cmd["command"],
                system_include_paths=system_include_paths,
                check_names=args.check_names,
            )
        )

    # ensure that at least one translation unit is selected

    if not tr_unit_list:
        fatal("No translation units to analyze")

    # disable all checks if --skip-checks was given

    if args.skip_checks:
        for tr_unit in tr_unit_list:
            tr_unit.check_names = []

    return tr_unit_list
def get_check_context(tr_unit: TranslationUnit) -> CheckContext:
    """
    Parse a translation unit with libclang and build its `CheckContext`.

    The original build command is reused with these adjustments: unknown
    warning options are tolerated, every `-Werror` is dropped, clang's system
    include paths are appended, and the source file is passed by absolute
    path.

    Args:
        tr_unit: The translation unit to load.

    Returns:
        A `CheckContext` for the parsed translation unit. Error and fatal
        diagnostics emitted while parsing are printed, and already mark the
        context as having problems.

    Raises:
        RuntimeError: If libclang fails to load the translation unit.
    """

    # relative to script's original working directory
    rel_path = os.path.relpath(tr_unit.absolute_path)

    # build the command line handed to libclang

    command = shlex.split(tr_unit.build_command)

    adjusted_command = [
        # keep the original compilation command name
        command[0],
        # ignore unknown GCC warning options
        "-Wno-unknown-warning-option",
        # keep all other arguments but the last, which is the file name;
        # drop *every* -Werror, since clang can warn about things that GCC
        # doesn't and those warnings must not become errors
        *(arg for arg in command[1:-1] if arg != "-Werror"),
        # add clang system include paths
        *(
            arg
            for path in tr_unit.system_include_paths
            for arg in ("-isystem", path)
        ),
        # replace relative path to get absolute location information
        tr_unit.absolute_path,
    ]

    # load translation unit

    # We change directory for options like -I to work, but have to change back
    # to have correct relative paths in messages.
    with cwd(tr_unit.build_working_dir):

        try:
            tu = clang.cindex.TranslationUnit.from_source(
                filename=None, args=adjusted_command
            )
        except clang.cindex.TranslationUnitLoadError as e:
            raise RuntimeError(f"Failed to load {rel_path}") from e

    if sys.stdout.isatty():

        def printer(s: str) -> None:
            # add padding to fully overwrite progress message
            print(s.ljust(50))

    else:
        printer = print

    check_context = CheckContext(
        translation_unit=tu,
        translation_unit_path=tr_unit.absolute_path,
        _rel_path=rel_path,
        _build_working_dir=Path(tr_unit.build_working_dir),
        _problems_found=False,
        _printer=printer,
    )

    # check for error/fatal diagnostics

    for diag in tu.diagnostics:
        if diag.severity >= clang.cindex.Diagnostic.Error:
            check_context._problems_found = True
            location = check_context.format_location(diag)
            check_context._printer(
                f"\033[0;33m{location}: {diag.spelling}; this may lead to false"
                f" positives and negatives\033[0m"
            )

    return check_context
def visit(root: Cursor, visitor: Callable[[Cursor], VisitorResult]) -> bool:
    """
    A simple wrapper around `clang_visitChildren()`.

    The `visitor` callback is called for each visited node, with that node as
    its argument. `root` is NOT visited.

    Unlike a standard `Cursor`, the callback argument will have a `parent` field
    that points to its parent in the AST. The `parent` will also have its own
    `parent` field, and so on, unless it is `root`, in which case its `parent`
    field is `None`. We add this because libclang's `lexical_parent` field is
    almost always `None` for some reason.

    Args:
        root: The cursor whose descendants are traversed. Its `parent` field
            is set to `None` as a side effect.
        visitor: Called with each visited node; its return value decides how
            the traversal proceeds (see `VisitorResult`).

    Returns:
        `False` if the visitation was aborted by the callback returning
        `VisitorResult.BREAK`, `True` otherwise.

    Raises:
        BaseException: Any exception raised by `visitor` (or by the parent
            bookkeeping) aborts the traversal and is re-raised here.
    """

    # Remember the translation unit so it can be attached to every node.
    tu = root._tu
    root.parent = None

    # Stores the path from `root` to the node being visited. We need this to set
    # `node.parent`.
    path: List[Cursor] = [root]

    # Holds the exception raised inside the callback, if any. A list is used
    # so that the nested callback can record it by mutation.
    exception: List[BaseException] = []

    @CFUNCTYPE(c_int, Cursor, Cursor, py_object)
    def actual_visitor(node: Cursor, parent: Cursor, client_data: None) -> int:

        try:

            # The `node` and `parent` `Cursor` objects are NOT reused in between
            # invocations of this visitor callback, so we can't assume that
            # `parent.parent` is set.

            # Unwind the path until its last element is this node's parent;
            # that element is the object that already carries a `parent` field.
            while path[-1] != parent:
                path.pop()

            node.parent = path[-1]
            path.append(node)

            # several clang.cindex methods need Cursor._tu to be set
            node._tu = tu

            return visitor(node).value

        except BaseException as e:

            # Exceptions can't cross into C. Stash it, abort the visitation, and
            # reraise it.

            exception.append(e)
            return VisitorResult.BREAK.value

    result = conf.lib.clang_visitChildren(root, actual_visitor, None)

    # Re-raise on the Python side what the callback could not propagate.
    if exception:
        raise exception[0]

    # A result of zero is treated as "traversal was not cut short".
    return result == 0
@dataclass
class CheckContext:
    """State handed to every check while it analyzes one translation unit.

    Checks read `translation_unit` and `translation_unit_path`, and call
    `report()` to flag problems. The underscore-prefixed fields are
    bookkeeping used by the methods below and by the driver script.
    """

    # the parsed translation unit
    translation_unit: TranslationUnit
    translation_unit_path: str  # exactly as reported by libclang

    # path shown in messages for locations that have no file
    _rel_path: str
    # directory against which file names in locations are resolved
    _build_working_dir: Path
    # set to True by report()
    _problems_found: bool

    # function through which reports are emitted
    _printer: Callable[[str], None]

    def format_location(self, obj: Any) -> str:
        """Format the location of `obj`, which must have a `location`
        field/property that is a `SourceLocation`."""
        return self._format_location(obj.location)

    def _format_location(self, loc: SourceLocation) -> str:
        """Format `loc` as `path:line:column`, with the path made relative to
        the current working directory. Locations without a file are
        formatted as the translation unit's relative path alone."""

        if loc.file is None:
            return self._rel_path

        resolved = (self._build_working_dir / loc.file.name).resolve()
        shown_path = os.path.relpath(resolved)

        return f"{shown_path}:{loc.line}:{loc.column}"

    def report(self, node: Cursor, message: str) -> None:
        """Record that a problem was found and print `message` prefixed with
        the location of `node`."""
        self._problems_found = True
        where = self.format_location(node)
        self._printer(f"{where}: {message}")

    def print_node(self, node: Cursor) -> None:
        """Print a one-line description of `node` to standard output. This
        can be handy when developing checks."""

        # The line is assembled piece by piece and printed in one go.
        pieces = [f"{self.format_location(node)}: kind = {node.kind.name}"]

        spelling = node.spelling
        if spelling:
            pieces.append(f", spelling = {spelling!r}")

        node_type = node.type
        if node_type is not None:
            pieces.append(f", type = {node_type.get_canonical().spelling!r}")

        target = node.referenced
        if target is not None:
            pieces.append(f", referenced = {target.spelling!r}")

        start = self._format_location(node.extent.start)
        end = self._format_location(node.extent.end)
        pieces.append(f", extent = {start}--{end}")

        print("".join(pieces))

    def print_tree(
        self,
        node: Cursor,
        *,
        max_depth: Optional[int] = None,
        indentation_level: int = 0,
    ) -> None:
        """Print `node` and its descendants to standard output, one node per
        line, indented by depth. `max_depth` limits how many levels below
        `node` are printed; `None` means no limit. This can be handy when
        developing checks."""

        # a negative depth means the limit was already exceeded
        if max_depth is not None and max_depth < 0:
            return

        print(" " * indentation_level, end="")
        self.print_node(node)

        remaining_depth = None if max_depth is None else max_depth - 1

        for child in node.get_children():
            self.print_tree(
                child,
                max_depth=remaining_depth,
                indentation_level=indentation_level + 1,
            )
def function_occurrence_might_use_return_value(node: Cursor) -> bool:
    """
    Tell whether a reference to a function might use its return value.

    `node` is an expression referencing a function, and it and its ancestors
    must have their `parent` field set. The answer is conservative: `False`
    is returned only when the reference is itself a statement's direct
    operand, or when it is a call whose result is discarded.
    """

    parent = get_parent_if_unexposed_expr(node.parent)

    # the bare function name is used as a statement operand
    if parent.kind.is_statement():
        return False

    is_call_to_function = (
        parent.kind == CursorKind.CALL_EXPR
        and parent.referenced == node.referenced
    )

    if not is_call_to_function:
        # might be doing something with a pointer to the function
        return True

    grandparent = get_parent_if_unexposed_expr(parent.parent)
    context_kind = grandparent.kind

    # the call is an operand of some larger expression
    if not context_kind.is_statement():
        return True

    # statements that consume the value of their operand
    if context_kind in (
        CursorKind.IF_STMT,
        CursorKind.SWITCH_STMT,
        CursorKind.WHILE_STMT,
        CursorKind.DO_STMT,
        CursorKind.RETURN_STMT,
    ):
        return True

    # any other statement but a for loop discards the value
    if context_kind != CursorKind.FOR_STMT:
        return False

    # the last child is the loop body; the others are the parts of the loop
    # header that are present
    *header_parts, _body = grandparent.get_children()
    num_parts = len(header_parts)

    if num_parts == 0:
        return False

    if num_parts in (1, 2):
        return True  # call may be in condition part of for loop

    if num_parts == 3:
        # Comparison doesn't work properly with `Cursor`s originating
        # from nested visitations, so we compare the extent instead.
        return parent.extent == header_parts[1].extent

    assert False

    return False