These tests aim to verify the output of virt-bootstrap by creating tar archives which contain dummy root file system and call the function bootstrap(). The check the extracted root file system. --- tests/__init__.py | 439 +++++++++++++++++++++++++++++++++++++++++++++++++ tests/docker_source.py | 243 ++++++++++++++++++++++++++- tests/file_source.py | 78 +++++++++ 3 files changed, 757 insertions(+), 3 deletions(-) create mode 100644 tests/file_source.py diff --git a/tests/__init__.py b/tests/__init__.py index 1b06616..9bbae53 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -20,8 +20,18 @@ Test suite for virt-bootstrap """ +import copy +import hashlib +import io +import os +import shutil import sys +import tarfile +import tempfile import unittest +import passlib.hosts + +import guestfs try: import mock @@ -37,3 +47,432 @@ from virtBootstrap import progress from virtBootstrap import utils __all__ = ['virt_bootstrap', 'sources', 'progress', 'utils'] + + +DEFAULT_FILE_MODE = 0o755 +SHADOW_FILE_MODE = 0o600 +NOT_ROOT = (os.geteuid() != 0) + +ROOTFS_TREE = { + 'root': { + 'uid': 0, + 'gid': 0, + 'dirs': [ + 'bin', 'etc', 'home', 'lib', 'opt', 'root', 'run', 'sbin', + 'srv', 'tmp', 'usr', 'var' + ], + 'files': [ + 'etc/hosts', + 'etc/fstab', + ( + 'etc/shadow', + SHADOW_FILE_MODE, + 'root:*::0:99999:7:::' + ) + ] + }, + 'user1': { + 'uid': 500, + 'gid': 500, + 'dirs': ['home/user1'], + 'files': [ + ('home/user1/test_file', 0o644, 'test data') + ] + }, + + 'user2': { + 'uid': 1000, + 'gid': 1000, + 'dirs': [ + 'home/user2', + 'home/user2/test_dir' + ], + 'files': [ + 'home/user2/test_dir/test_file' + ] + } +} + + +# pylint: disable=invalid-name,too-many-arguments +class BuildTarFiles(object): + """ + Create dummy tar files used for testing. + """ + + def __init__(self, tar_dir, rootfs_tree=None): + """ + Create dummy tar files + """ + self.tar_dir = tar_dir + self.rootfs_tree = rootfs_tree or copy.deepcopy(ROOTFS_TREE) + + def create_tar_file(self): + """ + Use temporary name to create uncompressed tarball with dummy file + system. Get checksum of the content and rename the file to + "<checksum>.tar". In this way, we can easily generate manifest for + Docker image and provide it to virt-bootstrap. + """ + filepath = tempfile.mkstemp(dir=self.tar_dir)[1] + with tarfile.open(filepath, 'w') as tar: + self.create_user_dirs(tar) + # Get sha256 checksum of the archive + with open(filepath, 'rb') as file_handle: + file_hash = hashlib.sha256(file_handle.read()).hexdigest() + # Rename the archive to <checksum>.tar + new_filepath = os.path.join(self.tar_dir, "%s.tar" % file_hash) + os.rename(filepath, new_filepath) + os.chmod(new_filepath, 0o644) + return new_filepath + + def create_user_dirs(self, tar_handle): + """ + Create root file system tree in tar archive. + """ + tar_members = [ + ['dirs', tarfile.DIRTYPE], + ['files', tarfile.REGTYPE], + ] + + for user in self.rootfs_tree: + for members, tar_type in tar_members: + self.create_tar_members( + tar_handle, + self.rootfs_tree[user][members], + tar_type, + uid=self.rootfs_tree[user]['uid'], + gid=self.rootfs_tree[user]['gid'] + ) + + def create_tar_members(self, tar_handle, members, m_type, uid=0, gid=0): + """ + Add members to tar file. + """ + for member_name in members: + member_data = '' + permissions = DEFAULT_FILE_MODE + if isinstance(member_name, tuple): + if len(member_name) == 3: + member_data = member_name[2] + member_name, permissions = member_name[:2] + data_encoded = member_data.encode('utf-8') + + t_info = tarfile.TarInfo(member_name) + t_info.type = m_type + t_info.mode = permissions + t_info.uid = uid + t_info.gid = gid + t_info.size = len(data_encoded) + + tar_handle.addfile(t_info, io.BytesIO(data_encoded)) + + +class ImageAccessor(unittest.TestCase): + """ + The perpose of this class is to gather methods used to verify content + of extracted root file system. This class can be exteded for different + test cases. + """ + def setUp(self): + """ + Set initial values, create temporary directories and tar archive which + contains dummy root file system. + """ + self.dest_dir = tempfile.mkdtemp('_bootstrap_dest') + self.tar_dir = tempfile.mkdtemp('_bootstrap_tarfiles') + # Set permissions of temporary directories to avoid "Permission denied" + # error from Libvirt. + os.chmod(self.dest_dir, 0o755) + os.chmod(self.tar_dir, 0o755) + self.uid_map = [] + self.gid_map = [] + self.root_password = None + self.checked_members = set() + self.rootfs_tree = copy.deepcopy(ROOTFS_TREE) + # Create tar archive + self.tar_file = BuildTarFiles(self.tar_dir).create_tar_file() + + def tearDown(self): + """ + Clean up. + """ + shutil.rmtree(self.dest_dir) + shutil.rmtree(self.tar_dir) + + def apply_mapping(self): + """ + This method applies UID/GID mapping to all users defined in + self.rootfs_tree. + """ + + for user in self.rootfs_tree: + user_uid = self.rootfs_tree[user]['uid'] + user_gid = self.rootfs_tree[user]['gid'] + + if self.uid_map: + for start, target, count in self.uid_map: + if user_uid >= start and user_uid <= start + count: + diff = user_uid - start + self.rootfs_tree[user]['uid'] = target + diff + + if self.gid_map: + for start, target, count in self.gid_map: + if user_gid >= start and user_gid <= start + count: + diff = user_gid - start + self.rootfs_tree[user]['gid'] = target + diff + + def check_rootfs(self, skip_ownership=False): + """ + Check if the root file system was extracted correctly. + """ + existence_check_func = { + 'files': os.path.isfile, + 'dirs': os.path.isdir + } + for user in self.rootfs_tree: + for m_type in existence_check_func: + self.check_rootfs_members( + user, + m_type, + existence_check_func[m_type], + skip_ownership + ) + + def check_rootfs_members(self, user, members, check_existence, + skip_ownership=False): + """ + Verify permissions, ownership and content of files or + directories in extracted root file system. + + @param user: user name defined in self.rootfs_tree. + @param members: The string 'dirs' or 'files'. + @param check_existence: Function used to check the existence of member. + @param skip_ownership: Boolean whether to skip ownership check. + """ + user_uid = self.rootfs_tree[user]['uid'] + user_gid = self.rootfs_tree[user]['gid'] + + for member_name in self.rootfs_tree[user][members]: + member_data = '' + # Unpack member if tuple. Allow to be specified permissions and + # data for each file. + permissions = DEFAULT_FILE_MODE + if isinstance(member_name, tuple): + if len(member_name) == 3: + member_data = member_name[2] + member_name, permissions = member_name[:2] + + # Skip already checked members. E.g. when multiple layers were + # extracted we want to check only the latest version of file. + if member_name in self.checked_members: + continue + else: + self.checked_members.add(member_name) + + ######################### + # Assertion functions + ######################### + member_path = os.path.join(self.dest_dir, member_name) + self.assertTrue( + check_existence(member_path), + 'Member was not extracted: %s' % member_path + ) + stat = os.stat(member_path) + + self.validate_file_mode(member_path, stat.st_mode, permissions) + + if not skip_ownership: + self.validate_file_ownership( + member_path, stat.st_uid, stat.st_gid, user_uid, user_gid + ) + + if member_data: + with open(member_path, 'r') as content: + file_content = content.read() + + self.assertEqual( + member_data, file_content, + 'Incorrect file content: %s\n' + 'Found: %s\n' + 'Expected: %s' % (member_path, file_content, member_data) + ) + + def validate_shadow_file(self): + """ + Ensure that the extracted /etc/shadow file has correct ownership, + permissions and contains valid hash of the root password. + """ + shadow_path = os.path.join(self.dest_dir, 'etc/shadow') + + self.assertTrue( + os.path.isfile(shadow_path), + 'Does not exist: %s' % shadow_path + ) + stat = os.stat(shadow_path) + + self.validate_file_mode(shadow_path, stat.st_mode, SHADOW_FILE_MODE) + + if not NOT_ROOT: + self.validate_file_ownership( + shadow_path, + stat.st_uid, stat.st_gid, + self.rootfs_tree['root']['uid'], + self.rootfs_tree['root']['gid'] + ) + + with open(shadow_path, 'r') as content: + shadow_content = content.readlines() + if not shadow_content: + raise Exception("File is empty: %s" % shadow_path) + self.validate_shadow_hash(shadow_content) + + def validate_shadow_hash(self, shadow_content): + """ + Validate root password hash of shadow file. + + Note: For simplicity we assume that the first line of /etc/shadow + contains the root entry. + """ + self.assertTrue( + passlib.hosts.linux_context.verify( + self.root_password, + shadow_content[0].split(':')[1] + ), + "Invalid root password hash." + ) + + def validate_file_mode(self, member_name, mode, expected): + """ + Verify permissions of rootfs member. + """ + self.assertEqual( + mode & 0o777, expected, 'Incorrect permissions: %s' % member_name + ) + + def validate_file_ownership(self, member_name, uid, gid, + expected_uid, expected_gid): + """ + Validate UID/GID of rootfs member. + """ + self.assertEqual( + uid, expected_uid, + "Incorrect UID: %s\n" + "Found: %s\n" + "Expected: %s" % (member_name, uid, expected_uid) + ) + self.assertEqual( + gid, expected_gid, + "Incorrect GID: %s\n" + "Found: %s\n" + "Expected: %s" % (member_name, gid, expected_gid,) + ) + + +class Qcow2ImageAccessor(ImageAccessor): + """ + This class gathers methods for verification of root file system content + within extracted qcow2 image. + """ + + def validate_shadow_file_in_image(self, g): + """ + Ensures that /etc/shadow file of disk image has correct permission, + ownership and contains valid hash of the root password. + """ + self.assertTrue( + g.is_file('/etc/shadow'), + "Shadow file does not exist" + ) + + stat = g.stat('/etc/shadow') + + self.validate_file_mode('/etc/shadow', stat['mode'], SHADOW_FILE_MODE) + + self.validate_file_ownership( + '/etc/shadow', + stat['uid'], stat['gid'], + self.rootfs_tree['root']['uid'], + self.rootfs_tree['root']['gid'] + ) + + self.validate_shadow_hash(g.cat('/etc/shadow').split('\n')) + + def check_image_content(self, g, user, members, check_existence): + """ + Verify the existence, permissions and ownership of members in qcow2 + image. + + @param g: guestfs handle + @param user: Name of user defined in self.rootfs_tree + @param members: The string 'dirs' or 'files'. + @param check_existence: Function to confirm existence of member. + """ + permissions = DEFAULT_FILE_MODE + user_uid = self.rootfs_tree[user]['uid'] + user_gid = self.rootfs_tree[user]['gid'] + + for member_name in self.rootfs_tree[user][members]: + # Get specified permissions of file. + if isinstance(member_name, tuple): + member_name, permissions = member_name[:2] + + # Skip already checked files. + if member_name in self.checked_members: + continue + else: + self.checked_members.add(member_name) + + # When using guestfs all names should start with '/' + if not member_name.startswith('/'): + member_name = '/' + member_name + + self.assertTrue( + check_existence(member_name), + "Member was not found: %s" % member_name + ) + stat = g.stat(member_name) + + self.validate_file_mode(member_name, stat['mode'], permissions) + + self.validate_file_ownership( + member_name, stat['uid'], stat['gid'], user_uid, user_gid + ) + + def check_image(self, g): + """ + Check the presence of files and folders in qcow2 image. + """ + for user in self.rootfs_tree: + self.check_image_content(g, user, 'dirs', g.is_dir) + self.check_image_content(g, user, 'files', g.is_file) + + def check_qcow2_image(self, image_path): + """ + Ensures that qcow2 images contain all files. + """ + g = guestfs.GuestFS(python_return_dict=True) + g.add_drive_opts(image_path, readonly=True) + g.launch() + g.mount('/dev/sda', '/') + self.check_image(g) + g.umount('/') + g.shutdown() + + def get_image_path(self, n=0): + """ + Returns the path of stored qcow2 image. + """ + return os.path.join(self.dest_dir, "layer-%d.qcow2" % n) + + def check_qcow2_images(self, image_path): + """ + Ensures that qcow2 images contain all files. + """ + g = guestfs.GuestFS(python_return_dict=True) + g.add_drive_opts(image_path, readonly=True) + g.launch() + g.mount('/dev/sda', '/') + self.check_image(g) + g.umount('/') + g.shutdown() diff --git a/tests/docker_source.py b/tests/docker_source.py index 60404e6..5f9e6fe 100644 --- a/tests/docker_source.py +++ b/tests/docker_source.py @@ -18,15 +18,252 @@ """ Tests which aim is to exercise creation of root file system with DockerSource. + +To avoid fetching network resources we mock out the functions: +- utils.get_image_details(): Returns manifest content +- utils.get_image_dir(): Returns the directory which contains the tar files + +Brief description of this tests: +1. Create dummy tar files named <checksum>.tar used as image layers. +2. Generate manifest content. +3. Mock out get_image_details() and get_image_dir(). +4. Call bootstrap(). +5. Check the result. """ +import copy +import os +import subprocess import unittest +import guestfs from . import mock from . import sources +from . import virt_bootstrap +from . import BuildTarFiles +from . import ImageAccessor +from . import Qcow2ImageAccessor +from . import NOT_ROOT # pylint: disable=invalid-name +class CreateLayers(object): + """ + Create tar files to mimic image layers and generate manifest content. + """ + def __init__(self, initial_tar_file, initial_rootfs_tree, dest_dir): + """ + Create dummy tar files used as image layers. + + The variables: + - layers: Store a lists of paths to created archives. + - layers_rootfs: Store self.rootfs_tree value used to generate tarball. + """ + self.layers = [initial_tar_file] + self.layers_rootfs = [copy.deepcopy(initial_rootfs_tree)] + + tar_builder = BuildTarFiles(dest_dir) + tar_builder.rootfs_tree['root']['dirs'] = [] + tar_builder.rootfs_tree['root']['files'] = [ + ('etc/foo/bar', 0o644, "This should be overwritten") + ] + + self.layers.append(tar_builder.create_tar_file()) + self.layers_rootfs.append(copy.deepcopy(tar_builder.rootfs_tree)) + + tar_builder.rootfs_tree['root']['files'] = [ + ('etc/foo/bar', 0o644, "Content of etc/foo/bar"), + ('bin/foobar', 0o755, "My executable script") + ] + self.layers.append(tar_builder.create_tar_file()) + self.layers_rootfs.append(copy.deepcopy(tar_builder.rootfs_tree)) + + def get_layers_rootfs(self): + """ + Return root file systems used to create layers. + """ + return self.layers_rootfs + + def generate_manifest(self): + """ + Generate Manifest content for layers. + """ + return { + "schemaVersion": 2, + "layers": [ + { + "digest": + "sha256:" + os.path.basename(layer).split('.')[0] + } + for layer in self.layers + ] + } + + +class TestDirDockerSource(ImageAccessor): + """ + Ensures that all layers extracted correctly in destination folder. + """ + def check_result(self, layers_rootfs, dest): + """ + Iterates trough values of layers_rootfs in reverse order (from the last + layer to first) and calls check_extracted_files(). + """ + + def call_bootstrap(self, manifest): + """ + Mock get_image_details() and get_image_dir() and call the function + virt_bootstrap.bootstrap() with root_password value. + """ + with mock.patch.multiple('virtBootstrap.utils', + get_image_details=mock.DEFAULT, + get_image_dir=mock.DEFAULT) as mocked: + + mocked['get_image_details'].return_value = manifest + mocked['get_image_dir'].return_value = self.tar_dir + + virt_bootstrap.bootstrap( + progress_cb=mock.Mock(), + uri='docker://foo', + fmt='dir', + uid_map=self.uid_map, + gid_map=self.gid_map, + dest=self.dest_dir, + root_password=self.root_password + ) + + def test_dir_extract_rootfs(self): + """ + Ensures that all layers were extracted correctly. + """ + layers = CreateLayers(self.tar_file, self.rootfs_tree, self.tar_dir) + self.call_bootstrap(layers.generate_manifest()) + layers_rootfs = layers.get_layers_rootfs() + for rootfs_tree in layers_rootfs[::-1]: + self.rootfs_tree = rootfs_tree + self.check_rootfs(skip_ownership=(os.geteuid != 0)) + + @unittest.skipIf(NOT_ROOT, "Root privileges required") + def test_dir_ownership_mapping(self): + """ + Ensures that the UID/GID mapping was applied correctly to extracted + root file system of all layers. + """ + self.uid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]] + self.gid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]] + layers = CreateLayers(self.tar_file, self.rootfs_tree, self.tar_dir) + self.call_bootstrap(layers.generate_manifest()) + layers_rootfs = layers.get_layers_rootfs() + for rootfs_tree in layers_rootfs[::-1]: + self.rootfs_tree = rootfs_tree + self.apply_mapping() + self.check_rootfs() + + def test_dir_setting_root_password(self): + """ + Ensures that the root password is set correctly. + """ + layers = CreateLayers(self.tar_file, self.rootfs_tree, self.tar_dir) + self.root_password = "My secret root password" + self.call_bootstrap(layers.generate_manifest()) + self.validate_shadow_file() + + +class TestQcow2DockerSource(Qcow2ImageAccessor): + """ + Ensures that the conversion of tar files to qcow2 image with backing chains + works as expected. + """ + def get_image_info(self, image_path): + """ + Wrapper around "qemu-img info" used to information about disk image. + """ + cmd = ['qemu-img', 'info', image_path] + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE) + output, _ignore = proc.communicate() + return output.decode('utf-8').split('\n') + + def call_bootstrap(self): + """ + Generate tar files which mimic container layers and manifest content. + Mock get_image_details() and get_image_dir() and call the function + virt_bootstrap.bootstrap() for qcow2 format. + Return the root file systems used to generate the tar archives. + """ + layers = CreateLayers(self.tar_file, self.rootfs_tree, self.tar_dir) + manifest = layers.generate_manifest() + + with mock.patch.multiple('virtBootstrap.utils', + get_image_details=mock.DEFAULT, + get_image_dir=mock.DEFAULT) as mocked: + + mocked['get_image_details'].return_value = manifest + mocked['get_image_dir'].return_value = self.tar_dir + + virt_bootstrap.bootstrap( + progress_cb=mock.Mock(), + uri='docker://foobar', + dest=self.dest_dir, + fmt='qcow2', + uid_map=self.uid_map, + gid_map=self.gid_map, + root_password=self.root_password + ) + + return layers.get_layers_rootfs() + + def test_qcow2_build_image(self): + """ + Ensures that the root file system is copied correctly to single + partition qcow2 image and layers are converted correctly to qcow2 + images. + """ + layers_rootfs = self.call_bootstrap() + + ################### + # Check base layer + ################### + base_layer_path = self.get_image_path() + img_format = self.get_image_info(base_layer_path)[1] + self.assertEqual(img_format, 'file format: qcow2') + images = [base_layer_path] + ########################### + # Check backing chains + ########################### + for i in range(1, len(layers_rootfs)): + img_path = self.get_image_path(i) + # img_info contains the output of "qemu-img info" + img_info = self.get_image_info(img_path) + self.assertEqual( + img_info[1], + 'file format: qcow2', + 'Invalid qcow2 disk image: %s' % img_path + ) + backing_file = self.get_image_path(i - 1) + self.assertEqual( + img_info[5], + 'backing file: %s' % backing_file, + "Incorrect backing file for: %s\n" + "Expected: %s\n" + "Found: %s" % (img_info, backing_file, img_info[5]) + ) + images.append(img_path) + ############################### + # Check extracted files/folders + ############################### + g = guestfs.GuestFS(python_return_dict=True) + for path in images: + g.add_drive_opts(path, readonly=True) + g.launch() + devices = g.list_filesystems() + for dev, rootfs in zip(sorted(devices), layers_rootfs): + self.rootfs_tree = rootfs + g.mount(dev, '/') + self.check_image(g) + g.umount('/') + g.shutdown() + + class TestDockerSource(unittest.TestCase): """ Unit tests for DockerSource @@ -36,9 +273,9 @@ class TestDockerSource(unittest.TestCase): ################################### def _mock_retrieve_layers_info(self, manifest, kwargs): """ - This method is gather common test pattern used in the following - two test cases which aim to return an instance of the class - DockerSource with some util functions being mocked. + This method is gather common test pattern used in the following cases + which aim is to return an instance of the class DockerSource with + get_image_details() and get_image_dir() being mocked. """ with mock.patch.multiple('virtBootstrap.utils', get_image_details=mock.DEFAULT, diff --git a/tests/file_source.py b/tests/file_source.py new file mode 100644 index 0000000..79bb234 --- /dev/null +++ b/tests/file_source.py @@ -0,0 +1,78 @@ +# -*- coding: utf-8 -*- +# Authors: Radostin Stoyanov <rstoyanov1@xxxxxxxxx> +# +# Copyright (C) 2017 Radostin Stoyanov +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Regression tests which aim to exercise the creation of root file system +with FileSource. +""" + +import unittest + +from . import mock +from . import virt_bootstrap +from . import ImageAccessor +from . import NOT_ROOT + + +# pylint: disable=invalid-name +class TestDirFileSource(ImageAccessor): + """ + Ensures that files from rootfs tarball are extracted correctly. + """ + def call_bootstrap(self): + """ + Execute the bootstrap() method of virt_bootstrap. + """ + virt_bootstrap.bootstrap( + uri=self.tar_file, + dest=self.dest_dir, + fmt='dir', + progress_cb=mock.Mock(), + uid_map=self.uid_map, + gid_map=self.gid_map, + root_password=self.root_password + ) + + def test_dir_extract_rootfs(self): + """ + Extract rootfs from each dummy tarfile. + """ + self.call_bootstrap() + self.check_rootfs(skip_ownership=NOT_ROOT) + + @unittest.skipIf(NOT_ROOT, "Root privileges required") + def test_dir_ownership_mapping(self): + """ + Ensures that UID/GID mapping for extracted root file system are applied + correctly. + """ + self.uid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]] + self.gid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]] + self.call_bootstrap() + self.apply_mapping() + self.check_rootfs() + + def test_dir_setting_root_password(self): + """ + Ensures that the root password is set correctly when FileSource is used + with fmt='dir'. + """ + self.root_password = 'my secret root password' + self.call_bootstrap() + self.validate_shadow_file() -- 2.13.5 _______________________________________________ virt-tools-list mailing list virt-tools-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/virt-tools-list