These tests aim to verify the output of virt-bootstrap in more abstract manner by creating tar files calling bootstrap() with them to check the result when output format is set to "dir" or "qcow2". --- tests/__init__.py | 436 ++++++++++++++++++++++++++++++++++++++++ tests/docker_source.py | 535 +++++++++++++++++++++++++++++++++++++++++++++++++ tests/file_source.py | 184 +++++++++++++++++ tests/test_utils.py | 143 +++++++++++++ 4 files changed, 1298 insertions(+) create mode 100644 tests/__init__.py create mode 100644 tests/docker_source.py create mode 100644 tests/file_source.py create mode 100644 tests/test_utils.py diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..7a53c38 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,436 @@ +# -*- coding: utf-8 -*- +# Authors: Radostin Stoyanov <rstoyanov1@xxxxxxxxx> +# +# Copyright (C) 2017 Radostin Stoyanov +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +Regression tests for virt-bootstrap +""" + +import hashlib +import io +import os +import shutil +import sys +import tarfile +import unittest +import passlib.hosts + +try: + import mock +except ImportError: + import unittest.mock as mock + +sys.path.insert(0, '../src') # noqa: E402 + +# pylint: disable=import-error, wrong-import-position +from virtBootstrap import virt_bootstrap +from virtBootstrap import sources +from virtBootstrap import progress +from virtBootstrap import utils + +__all__ = ['virt_bootstrap', 'sources', 'progress', 'utils'] + + +# pylint: disable=invalid-name,too-many-instance-attributes,too-many-arguments +class BuildTarFiles(unittest.TestCase): + """ + Create dummy tar files used for testing. + """ + + def setUp(self): + """ + Create dummy rootfs tar files + """ + self.tar_dir = os.path.realpath('tests/tarfiles') + self.dest_dir = os.path.realpath('tests/filesystems') + self.default_permissions = 0o755 + self.shadow_file_permissions = 0o600 + self.shadow_file_content = "root:*:" + self.tmp_folders = [self.tar_dir, self.dest_dir] + self.tar_file = None + self.uid_map = [] + self.gid_map = [] + self.root_password = '' + self.checked_members = set() + self.rootfs_tree = { + 'root': { + 'uid': 0, + 'gid': 0, + 'dirs': [ + 'bin', + 'etc', + 'home', + 'lib', + 'opt', + 'root', + 'run', + 'sbin', + 'srv', + 'tmp', + 'usr', + 'var', + ], + 'files': [ + 'etc/hosts', + ( + 'etc/shadow', + self.shadow_file_permissions, + self.shadow_file_content + ) + ] + }, + 'user1': { + 'uid': 500, + 'gid': 500, + 'dirs': ['home/user1'], + 'files': [ + ('home/user1/test_file', 0o644, 'test data') + ] + }, + + 'user2': { + 'uid': 1000, + 'gid': 1000, + 'dirs': [ + 'home/user2', + 'home/user2/test_dir' + ], + 'files': [ + 'home/user2/test_dir/test_file' + ] + } + } + + # Create test folders + for test_dir in self.tmp_folders: + if not os.path.exists(test_dir): + os.makedirs(test_dir) + + self.create_tar_file() + + def create_tar_file(self): + """ + Use temporary name to create uncomressed tarball with dummy file + system. Then get checksum of the content and rename the tarfile to + <checksum>.tar In this way we can easily generate Manifest of Docker + registry and pass it to virt-bootstrap. + """ + filepath = os.path.join(self.tar_dir, 'tmp_file.tar') + with tarfile.open(filepath, 'w') as tar: + self.create_user_dirs(tar) + # Get sha256 checksum of the archive + with open(filepath, 'rb') as file_handle: + file_hash = hashlib.sha256(file_handle.read()).hexdigest() + # Rename the archive to <checksum>.tar + new_filepath = os.path.join(self.tar_dir, "%s.tar" % file_hash) + os.rename(filepath, new_filepath) + self.tar_file = new_filepath + + def tearDown(self): + """ + Clean up. + """ + for test_dir in self.tmp_folders: + shutil.rmtree(test_dir) + + def create_tar_members(self, tar_handle, members, m_type, uid=0, gid=0, + permissions=None): + """ + Add members to tar file. + """ + if permissions is None: + permissions = self.default_permissions + + for name in members: + data = '' + if isinstance(name, tuple): + name, permissions, data = name + data_encoded = data.encode('utf-8') + + t_info = tarfile.TarInfo(name) + t_info.type = m_type + t_info.mode = permissions + t_info.uid = uid + t_info.gid = gid + t_info.size = len(data_encoded) + + tar_handle.addfile(t_info, io.BytesIO(data_encoded)) + + def create_user_dirs(self, tar_handle): + """ + Create root file system tree in tar archive. + """ + tar_members = [ + ['dirs', tarfile.DIRTYPE], + ['files', tarfile.REGTYPE], + ] + + for user in self.rootfs_tree: + for members, tar_type in tar_members: + self.create_tar_members( + tar_handle, + self.rootfs_tree[user][members], + tar_type, + uid=self.rootfs_tree[user]['uid'], + gid=self.rootfs_tree[user]['gid'] + ) + + def apply_mapping(self): + """ + This method applies UID/GID mapping to all users defined in + self.rootfs_tree. + """ + + for user in self.rootfs_tree: + user_uid = self.rootfs_tree[user]['uid'] + user_gid = self.rootfs_tree[user]['gid'] + + if self.uid_map: + for start, tartget, rng in self.uid_map: + if user_uid >= start and user_uid <= start + rng: + diff = user_uid - start + self.rootfs_tree[user]['uid'] = tartget + diff + + if self.gid_map: + for start, tartget, rng in self.gid_map: + if user_gid >= start and user_gid <= start + rng: + diff = user_gid - start + self.rootfs_tree[user]['gid'] = tartget + diff + + def check_rootfs(self, skip_ownership=False): + """ + Check if the root file system was extracted correctly. + """ + for user in self.rootfs_tree: + self.check_extracted_members( + user, 'files', os.path.isfile, skip_ownership + ) + self.check_extracted_members( + user, 'dirs', os.path.isdir, skip_ownership + ) + + def check_extracted_members(self, user, members, check_existance, + skip_ownership=False): + """ + Check permissions, ownership and content of + extracted files or directories. + + @param user: user name defined in self.rootfs_tree + @param members: The string 'dirs' or 'files' + @param check_existance: Function used to confirm the existnace of + member. (E.g. os.path.isdir or os.path.isfile) + @param skip_ownership: Whther to skip verification of ownership. Useful + when members are extracted with unprivileged user. + """ + permissions = self.default_permissions + user_uid = self.rootfs_tree[user]['uid'] + user_gid = self.rootfs_tree[user]['gid'] + + for member_name in self.rootfs_tree[user][members]: + # If unpack member if it is tuple. Allow us to specify permissions + # and data per file. + member_data = '' + if isinstance(member_name, tuple): + if len(member_name) == 3: + member_data = member_name[2] + member_name, permissions = member_name[:2] + + # Skip already checked members. E.g. when multiple layers were + # extracted we want to check only the latest version of file. + if member_name in self.checked_members: + continue + else: + self.checked_members.add(member_name) + + ######################### + # Assertion functions + ######################### + member_path = os.path.join(self.dest_dir, member_name) + self.assertTrue( + check_existance(member_path), + 'Member was not extracted: %s' % member_path + ) + stat = os.stat(member_path) + self.assertEqual( + stat.st_mode & 0o777, permissions, + 'Incorrect permissions: %s' % member_path + ) + if not skip_ownership: + self.assertEqual( + stat.st_uid, user_uid, + 'Incorrect UID: %s' % member_path + ) + self.assertEqual( + stat.st_gid, user_gid, + 'Incorrect GID: %s' % member_path + ) + + if member_data: + with open(member_path, 'r') as content: + file_content = content.read() + self.assertEqual( + member_data, file_content, + 'Incorrect file content: %s\n' + 'Found: %s\n' + 'Expected: %s' % (member_path, file_content, member_data) + ) + + def validate_shadow_file(self, shadow_path, + skip_hash=False, skip_ownership=False): + """ + Ensure that extracted /etc/shadow file has correct ownership, + permissions and hashed root password. + """ + self.assertTrue( + os.path.isfile(shadow_path), + 'Does not exist: %s' % shadow_path + ) + stat = os.stat(shadow_path) + self.assertEqual( + stat.st_mode & 0o777, + self.shadow_file_permissions, + 'Shadow file has incorrect permissions: %s' % shadow_path + ) + if not skip_ownership: + self.assertEqual( + stat.st_uid, + self.rootfs_tree['root']['uid'], + 'Shadow file has incorrect UID: %s' % shadow_path + ) + self.assertEqual( + stat.st_gid, + self.rootfs_tree['root']['gid'], + 'Shadow file has incorrect GID: %s' % shadow_path + ) + + if not skip_hash: + # Note: For simplicity we assume that the first line of the file + # contains the root entry. + with open(shadow_path, 'r') as content: + shadow_content = content.readlines() + + if not shadow_content: + raise Exception("File is empty: %s" % shadow_path) + + self.assertTrue( + passlib.hosts.linux_context.verify( + self.root_password, + shadow_content[0].split(':')[1] + ), + "Root password hash is invalid." + ) + + def validate_shadow_file_in_image(self, g): + """ + Validate permission, ownership and root password hash for /etc/shadow + file stored in qcow2 image. + """ + self.assertTrue( + g.is_file('/etc/shadow'), + "Shadow file does not exist" + ) + + stat = g.stat('/etc/shadow') + self.assertEqual( + stat['mode'] & 0o777, + self.shadow_file_permissions, + 'Shadow file has incorrect permissions' + ) + self.assertEqual( + stat['uid'], + self.rootfs_tree['root']['uid'], + 'Shadow file has incorrect UID' + ) + self.assertEqual( + stat['gid'], + self.rootfs_tree['root']['gid'], + 'Shadow file has incorrect GID' + ) + + # Note: For simplicity we assume that the first line of the file + # contains the root entry. + shadow_content = g.cat('/etc/shadow').split('\n') + self.assertTrue( + passlib.hosts.linux_context.verify( + self.root_password, + shadow_content[0].split(':')[1] + ), + "Root password hash is invalid." + ) + + def check_image_content(self, g, user, members, check_existance): + """ + Verify the existance, permissions, ownership of members in qcow2 image. + + @param g: guestfs handle + @param user: User defined in self.rootfs_tree + @param members: The string 'dirs' or 'files' + @param check_existance: Function used to confirm the existnace of + member. (E.g. os.path.isdir or os.path.isfile) + """ + permissions = self.default_permissions + user_uid = self.rootfs_tree[user]['uid'] + user_gid = self.rootfs_tree[user]['gid'] + + for member_name in self.rootfs_tree[user][members]: + # Get specified permissions of file. + if isinstance(member_name, tuple): + member_name, permissions = member_name[:2] + + # Skip already checked files. + if member_name in self.checked_members: + continue + else: + self.checked_members.add(member_name) + + # When using guestfs all names should start with '/' + if not member_name.startswith('/'): + member_name = '/' + member_name + + self.assertTrue( + check_existance(member_name), + "Member was not found: %s" % member_name + ) + stat = g.stat(member_name) + self.assertEqual( + stat['mode'] & 0o777, permissions, + 'Incorrect permissions: %s' % member_name + ) + self.assertEqual( + stat['uid'], + user_uid, + "Incorrect UID: %s\n" + "Found: %s\n" + "Expected: %s" % (member_name, stat['uid'], user_uid) + ) + self.assertEqual( + stat['gid'], + user_gid, + "Incorrect GID: %s\n" + "Found: %s\n" + "Expected: %s" % (member_name, stat['gid'], user_gid) + ) + + def check_image(self, g): + """ + Check the presence of files and folders in qcow2 image. + """ + for user in self.rootfs_tree: + # Check folders + self.check_image_content(g, user, 'dirs', g.is_dir) + # Check files + self.check_image_content(g, user, 'files', g.is_file) diff --git a/tests/docker_source.py b/tests/docker_source.py new file mode 100644 index 0000000..cf2b2d2 --- /dev/null +++ b/tests/docker_source.py @@ -0,0 +1,535 @@ +# -*- coding: utf-8 -*- +# Authors: Radostin Stoyanov <rstoyanov1@xxxxxxxxx> +# +# Copyright (C) 2017 Radostin Stoyanov +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Regression tests which aim to excercise creation of root file system +with DockerSource. + +To avoid fetching network resources we mock out the functions: +- utils.get_image_details(): Returns manifest content +- utils.get_image_dir(): Returns the directory which contains the tar files + +Brief description of this tests: +1. Create 3 dummy tar files named <checksum>.tar used as image layers. +2. Generate manifest content. +3. Mock out get_image_details() and get_image_dir(). +4. Call bootstrap(). +5. Check the result. +""" + +import copy +import os +import subprocess +import unittest +import guestfs + +from . import mock +from . import sources +from . import virt_bootstrap +from . import BuildTarFiles + + +# pylint: disable=invalid-name +class CreateLayers(BuildTarFiles): + """ + Create tarfiles to used as image layers and generate manifest for them. + """ + def check_result(self, layers_rootfs, dest): + """ + This method is used to check extracted root file system. + """ + pass + + def call_bootstrap(self, manifest, dest): + """ + This method is used to call vurtBootstrap.bootstrap() + """ + pass + + def generate_manifest(self, layers, layers_rootfs): + """ + Generate Manifest content and then call self.call_bootstrap() and + self.check_result() + """ + manifest = { + "schemaVersion": 2, + "layers": [ + { + "digest": + "sha256:" + os.path.basename(layer).split('.')[0] + } + for layer in layers + ] + } + self.call_bootstrap(manifest, self.dest_dir) + self.check_result(layers_rootfs, self.dest_dir) + + def main(self): + """ + Create dummy tar files used to simulate layers of image + then call generate_manifest(). + + The variable: + - "layers" store a lists of paths to created archives. + + - "layers_rootfs" store the value of self.rootfs_tree used to generate + tarball. + """ + # Store base layer info + layers = [self.tar_file] + layers_rootfs = [copy.deepcopy(self.rootfs_tree)] + + # Create Layer 1 + self.rootfs_tree['root']['files'] = [ + ('etc/foo/bar', 0o644, "This should be overwritten") + ] + self.create_tar_file() + # Store layer 1 info + layers.append(self.tar_file) + layers_rootfs.append(copy.deepcopy(self.rootfs_tree)) + + # Create Layer 2 + self.rootfs_tree['root']['files'] = [ + ('etc/foo/bar', 0o644, "Content of etc/foo/bar"), + ('bin/foobar', 0o755, "My executable script") + ] + self.create_tar_file() + # Store layer 2 info + layers.append(self.tar_file) + layers_rootfs.append(copy.deepcopy(self.rootfs_tree)) + + self.generate_manifest(layers, layers_rootfs) + + +class DirExtractRootFS(CreateLayers): + """ + Ensures that all layers extracted correctly in destination folder. + """ + def check_result(self, layers_rootfs, dest): + """ + Iterates trough values of layers_rootfs in reverse order (from the last + layer to first) and calls check_extracted_files(). + """ + for rootfs_tree in layers_rootfs[::-1]: + self.rootfs_tree = rootfs_tree + self.check_rootfs(skip_ownership=(os.geteuid != 0)) + + def call_bootstrap(self, manifest, dest): + """ + Mock get_image_details() and get_image_dir() then call the function + virt-bootstra.bootstrap(). + """ + with mock.patch.multiple('virtBootstrap.utils', + get_image_details=mock.DEFAULT, + get_image_dir=mock.DEFAULT) as mocked: + + mocked['get_image_details'].return_value = manifest + mocked['get_image_dir'].return_value = self.tar_dir + + virt_bootstrap.bootstrap( + uri='docker://foobar', + dest=dest, + fmt='dir', + progress_cb=mock.Mock() + ) + + def runTest(self): + """ + Execute this test. + """ + self.main() + + +@unittest.skipIf(os.geteuid() != 0, "Root privileges required") +class DirOwnershipMapping(CreateLayers): + """ + Ensures that the mapping of ownership works as expected. + """ + + def check_result(self, layers_rootfs, dest): + """ + Iterate trough all values of layers_rootfs in reverse order apply the + mapping self.rootfs_tree used to create the test archive and verify + extracted files. + """ + for rootfs_tree in layers_rootfs[::-1]: + self.rootfs_tree = rootfs_tree + self.apply_mapping() + self.check_rootfs() + + def call_bootstrap(self, manifest, dest): + """ + Mock get_image_details() and get_image_dir() and call the function + virt_bootstrap.bootstrap() with UID/GID mapping values. + """ + self.uid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]] + self.gid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]] + with mock.patch.multiple('virtBootstrap.utils', + get_image_details=mock.DEFAULT, + get_image_dir=mock.DEFAULT) as mocked: + mocked['get_image_details'].return_value = manifest + mocked['get_image_dir'].return_value = self.tar_dir + virt_bootstrap.bootstrap( + progress_cb=mock.Mock(), + uri='docker://foo', + dest=dest, + fmt='dir', + uid_map=self.uid_map, + gid_map=self.gid_map + ) + + def runTest(self): + """ + Execute this test. + """ + self.main() + + +@unittest.skipIf(os.geteuid() != 0, "Root privileges required") +class DirSettingRootPassword(CreateLayers): + """ + Ensures that the root password is set correctly. + """ + def check_result(self, layers_rootfs, dest): + """ + Validate shadow file + """ + self.validate_shadow_file(os.path.join(dest, 'etc/shadow')) + + def call_bootstrap(self, manifest, dest): + """ + Mock get_image_details() and get_image_dir() and call the function + virt_bootstrap.bootstrap() with root_password value. + """ + self.root_password = "My secret root password" + with mock.patch.multiple('virtBootstrap.utils', + get_image_details=mock.DEFAULT, + get_image_dir=mock.DEFAULT) as mocked: + + mocked['get_image_details'].return_value = manifest + mocked['get_image_dir'].return_value = self.tar_dir + + virt_bootstrap.bootstrap( + progress_cb=mock.Mock(), + uri='docker://foo', + dest=dest, + fmt='dir', + root_password=self.root_password + ) + + def runTest(self): + """ + Execute this test. + """ + self.main() + + +class Qcow2BuildImage(CreateLayers): + """ + Ensures that the convertion of tar files to qcow2 image with backing chains + works as expected. + """ + def get_image_info(self, image_path): + """ + Returns information about the disk image using "qemu-img". + """ + cmd = ['qemu-img', 'info', image_path] + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE) + output, _ignore = proc.communicate() + return output.decode('utf-8').split('\n') + + def check_result(self, layers_rootfs, dest): + """ + Verify that layers were converted correctly to qcow2 images. + """ + ################### + # Check base layer + ################### + base_layer_path = os.path.join(dest, "layer-0.qcow2") + base_info = self.get_image_info(base_layer_path) + self.assertEqual(base_info[1], 'file format: qcow2') + images = [base_layer_path] + ########################### + # Check backing chains + ########################### + for i in range(1, len(layers_rootfs)): + img_path = os.path.join(dest, "layer-%d.qcow2" % i) + img_info = self.get_image_info(img_path) + self.assertEqual( + img_info[1], + 'file format: qcow2', + 'Invalid qcow2 disk image: %s' % img_path + ) + backing_file = os.path.join(dest, "layer-%d.qcow2" % (i - 1)) + self.assertEqual( + img_info[5], + 'backing file: %s' % backing_file, + "Incorrect backing file for: %s\n" + "Expected: %s\n" + "Found: %s" % (img_info, backing_file, img_info[5]) + ) + images.append(img_path) + ############################### + # Check extracted files/folders + ############################### + g = guestfs.GuestFS(python_return_dict=True) + for path in images: + g.add_drive_opts(path, readonly=True) + g.launch() + devices = g.list_filesystems() + for dev, rootfs in zip(sorted(devices), layers_rootfs): + self.rootfs_tree = rootfs + g.mount(dev, '/') + self.check_image(g) + g.umount('/') + g.shutdown() + + def call_bootstrap(self, manifest, dest): + """ + Mock get_image_details() and get_image_dir() and call the function + virt_bootstrap.bootstrap() for qcow2 format. + """ + with mock.patch.multiple('virtBootstrap.utils', + get_image_details=mock.DEFAULT, + get_image_dir=mock.DEFAULT) as mocked: + + mocked['get_image_details'].return_value = manifest + mocked['get_image_dir'].return_value = self.tar_dir + + virt_bootstrap.bootstrap( + uri='docker://foobar', + dest=dest, + fmt='qcow2', + progress_cb=mock.Mock() + ) + + def runTest(self): + """ + Execute this test. + """ + self.main() + + +@unittest.skip("Need fix for this test to pass") +class Qcow2OwnershipMapping(Qcow2BuildImage): + """ + Ensures that UID/GID mapping works correctly for qcow2 conversion. + """ + def check_result(self, layers_rootfs, dest): + """ + Iterate through values of layers_rootfs in reverse order and apply the + mapping values to self.rootfs_tree. Then verify the ownership of + files/folders created in the last backing chain of qcow2 image. + """ + g = guestfs.GuestFS(python_return_dict=True) + image_path = os.path.join(dest, "layer-%d.qcow2" % len(layers_rootfs)) + g.add_drive_opts(image_path, readonly=True) + + g.launch() + for rootfs in layers_rootfs[::-1]: + self.rootfs_tree = rootfs + self.apply_mapping() + g.mount('/dev/sda', '/') + self.check_image(g) + g.umount('/') + g.shutdown() + + def call_bootstrap(self, manifest, dest): + """ + Mock the functions get_image_details() and get_image_dir() then call + virtbootstrap.bootstrap() with UID/GID mapping values and fmt="qcow2". + """ + self.uid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]] + self.gid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]] + with mock.patch.multiple('virtBootstrap.utils', + get_image_details=mock.DEFAULT, + get_image_dir=mock.DEFAULT) as mocked: + mocked['get_image_details'].return_value = manifest + mocked['get_image_dir'].return_value = self.tar_dir + virt_bootstrap.bootstrap( + uri='docker://foobar', + progress_cb=mock.Mock(), + fmt='qcow2', + dest=dest, + uid_map=self.uid_map, + gid_map=self.gid_map + ) + + +@unittest.skip("Need fix for this test to pass") +class Qcow2SettingRootPassword(Qcow2BuildImage): + """ + Ensures that the root password is set correctly in the last backing chain + of image qcow2 image. + """ + def check_result(self, layers_rootfs, dest): + """ + Load last backing chain and validate shadow file. + """ + g = guestfs.GuestFS(python_return_dict=True) + g.add_drive_opts( + os.path.join(dest, "layer-%d.qcow2" % len(layers_rootfs)), + readonly=True + ) + g.launch() + g.mount('/dev/sda', '/') + self.validate_shadow_file_in_image(g) + g.umount('/') + g.shutdown() + + def call_bootstrap(self, manifest, dest): + """ + Mock the functions get_image_details() and get_image_dir() then + call virt_bootstrap.bootstrap() with root password value and qcow2 + output format. + """ + self.root_password = "My secret password" + with mock.patch.multiple('virtBootstrap.utils', + get_image_details=mock.DEFAULT, + get_image_dir=mock.DEFAULT) as mocked: + mocked['get_image_details'].return_value = manifest + mocked['get_image_dir'].return_value = self.tar_dir + virt_bootstrap.bootstrap( + uri='docker://foobar', + dest=dest, + fmt='qcow2', + progress_cb=mock.Mock(), + root_password=self.root_password + ) + + +class TestDockerSource(unittest.TestCase): + """ + Unit tests for DockerSource + """ + ################################### + # Tests for: retrieve_layers_info() + ################################### + def _mock_retrieve_layers_info(self, manifest, kwargs): + """ + This method is gather common test pattern used in the following + two test cases which aim to return an instance of the class + DockerSource with some util functions being mocked. + """ + with mock.patch.multiple('virtBootstrap.utils', + get_image_details=mock.DEFAULT, + get_image_dir=mock.DEFAULT) as m_utils: + + m_utils['get_image_details'].return_value = manifest + m_utils['get_image_dir'].return_value = '/images_path' + + patch_method = 'virtBootstrap.sources.DockerSource.gen_valid_uri' + with mock.patch(patch_method) as m_uri: + src_instance = sources.DockerSource(**kwargs) + return (src_instance, m_uri, m_utils) + + def test_retrieve_layers_info_pass_arguments_to_get_image_details(self): + """ + Ensures that retrieve_layers_info() calls get_image_details() + with all passed arguments. + """ + src_kwargs = { + 'uri': '', + 'progress': mock.Mock() + } + + manifest = {'schemaVersion': 2, 'layers': []} + (src_instance, + m_uri, m_utils) = self._mock_retrieve_layers_info(manifest, + src_kwargs) + + kwargs = { + 'insecure': src_instance.insecure, + 'username': src_instance.username, + 'password': src_instance.password, + 'raw': True + } + m_utils['get_image_details'].assert_called_once_with(m_uri(), **kwargs) + + def test_retrieve_layers_info_schema_version_1(self): + """ + Ensures that retrieve_layers_info() extracts the layers' information + from manifest with schema version 1 a list with format: + ["digest", "sum_type", "file_path", "size"]. + """ + kwargs = { + 'uri': '', + 'progress': mock.Mock() + } + + manifest = { + 'schemaVersion': 1, + 'fsLayers': [ + {'blobSum': 'sha256:75c416ea'}, + {'blobSum': 'sha256:c6ff40b6'}, + {'blobSum': 'sha256:a7050fc1'} + ] + } + + expected_result = [ + ['sha256', 'a7050fc1', '/images_path/a7050fc1.tar', None], + ['sha256', 'c6ff40b6', '/images_path/c6ff40b6.tar', None], + ['sha256', '75c416ea', '/images_path/75c416ea.tar', None] + ] + + src_instance = self._mock_retrieve_layers_info(manifest, kwargs)[0] + self.assertEqual(src_instance.layers, expected_result) + + def test_retrieve_layers_info_schema_version_2(self): + """ + Ensures that retrieve_layers_info() extracts the layers' information + from manifest with schema version 2 a list with format: + ["digest", "sum_type", "file_path", "size"]. + """ + kwargs = { + 'uri': '', + 'progress': mock.Mock() + } + + manifest = { + 'schemaVersion': 2, + "layers": [ + {"size": 47103294, "digest": "sha256:75c416ea"}, + {"size": 814, "digest": "sha256:c6ff40b6"}, + {"size": 513, "digest": "sha256:a7050fc1"} + ] + } + + expected_result = [ + ['sha256', '75c416ea', '/images_path/75c416ea.tar', 47103294], + ['sha256', 'c6ff40b6', '/images_path/c6ff40b6.tar', 814], + ['sha256', 'a7050fc1', '/images_path/a7050fc1.tar', 513] + ] + + src_instance = self._mock_retrieve_layers_info(manifest, kwargs)[0] + self.assertEqual(src_instance.layers, expected_result) + + def test_retrieve_layers_info_raise_error_on_invalid_schema_version(self): + """ + Ensures that retrieve_layers_info() calls get_image_details() + with all passed arguments. + """ + kwargs = { + 'uri': '', + 'progress': mock.Mock() + } + + manifest = {'schemaVersion': 3} + with self.assertRaises(ValueError): + self._mock_retrieve_layers_info(manifest, kwargs) diff --git a/tests/file_source.py b/tests/file_source.py new file mode 100644 index 0000000..dd7fd00 --- /dev/null +++ b/tests/file_source.py @@ -0,0 +1,184 @@ +# -*- coding: utf-8 -*- +# Authors: Radostin Stoyanov <rstoyanov1@xxxxxxxxx> +# +# Copyright (C) 2017 Radostin Stoyanov +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Regression tests which aim to excercise the creation of root file system +with FileSource. +""" + +import os +import unittest +import guestfs + +from . import virt_bootstrap +from . import BuildTarFiles +from . import mock + + +# pylint: disable=invalid-name +class Qcow2BuildImage(BuildTarFiles): + """ + Ensures that building qcow2 image from tarball with root file system + works as expected. + """ + + def check_qcow2_images(self, image_path): + """ + Ensures that qcow2 images contain all files. + """ + g = guestfs.GuestFS(python_return_dict=True) + g.add_drive_opts(image_path, readonly=True) + g.launch() + g.mount('/dev/sda', '/') + self.check_image(g) + g.umount('/') + g.shutdown() + + def get_image_path(self): + """ + Returns the path where the qcow2 image will be stored. + """ + return os.path.join( + self.dest_dir, + "%s.qcow2" % os.path.basename(self.tar_file) + ) + + def runTest(self): + """ + Create qcow2 image from each dummy tarfile. + """ + virt_bootstrap.bootstrap( + uri=self.tar_file, + dest=self.dest_dir, + fmt='qcow2', + progress_cb=mock.Mock() + ) + self.check_qcow2_images(self.get_image_path()) + + +@unittest.skip("Not implemented") +class Qcow2OwnershipMapping(Qcow2BuildImage): + """ + Ensures that UID/GID mapping works correctly for qcow2 conversion. + """ + def runTest(self): + """ + Create qcow2 image from each dummy tarfile. + """ + self.uid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]] + self.gid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]] + virt_bootstrap.bootstrap( + progress_cb=mock.Mock(), + uri=self.tar_file, + dest=self.dest_dir, + fmt='qcow2', + uid_map=self.uid_map, + gid_map=self.gid_map + ) + self.apply_mapping() + self.check_qcow2_images(self.get_image_path()) + + +@unittest.skip("Not implemented") +class Qcow2SettingRootPassword(Qcow2BuildImage): + """ + Ensures that the root password is set correctly in the backing file + of image qcow2 image. + """ + def runTest(self): + """ + Create qcow2 image from each dummy tarfile. + """ + self.root_password = "My secret password" + virt_bootstrap.bootstrap( + progress_cb=mock.Mock(), + uri=self.tar_file, + dest=self.dest_dir, + fmt='qcow2', + root_password=self.root_password + ) + self.check_image = self.validate_shadow_file_in_image + self.check_qcow2_images(self.get_image_path()) + + +@unittest.skipIf(os.geteuid() != 0, "Root privileges required") +class DirExtractRootFS(BuildTarFiles): + """ + Ensures that files from rootfs tarball are extracted correctly. + """ + def runTest(self): + """ + Extract rootfs from each dummy tarfile. + """ + dest = self.dest_dir + virt_bootstrap.bootstrap( + uri=self.tar_file, + dest=dest, + fmt='dir', + progress_cb=mock.Mock() + ) + self.check_rootfs() + + +@unittest.skipIf(os.geteuid() != 0, "Root privileges required") +class DirOwnershipMapping(DirExtractRootFS): + """ + Ensures that UID/GID mapping for extracted root file system are applied + correctly. + """ + def runTest(self): + """ + Extract the dummy tarfiles using FileSource and apply UID/GID mappings. + """ + self.uid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]] + self.gid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]] + # Create qcow2 images + dest = self.dest_dir + virt_bootstrap.bootstrap( + uri=self.tar_file, + dest=dest, + fmt='dir', + progress_cb=mock.Mock(), + uid_map=self.uid_map, + gid_map=self.gid_map + ) + self.apply_mapping() + self.check_rootfs() + + +@unittest.skipIf(os.geteuid() != 0, "Root privileges required") +class DirSettingRootPassword(DirExtractRootFS): + """ + Ensures that the root password is set correctly when FileSource is used + with fmt='dir'. + """ + + def runTest(self): + """ + Extract rootfs from each dummy tarfile and set root password. + """ + self.root_password = 'my secret root password' + virt_bootstrap.bootstrap( + uri=self.tar_file, + dest=self.dest_dir, + fmt='dir', + progress_cb=mock.Mock(), + root_password=self.root_password + ) + self.validate_shadow_file(os.path.join(self.dest_dir, 'etc/shadow')) diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..c2f55b5 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,143 @@ +# -*- coding: utf-8 -*- +# Authors: Radostin Stoyanov <rstoyanov1@xxxxxxxxx> +# +# Copyright (C) 2017 Radostin Stoyanov +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Unit tests for functions defined in virtBootstrap.utils +""" +import unittest +from . import utils + + +# pylint: disable=invalid-name +class TestUtils(unittest.TestCase): + """ + Ensures that functions defined in the utils module of virtBootstrap + work as expected. + """ + ################################### + # Tests for: bytes_to_size() + ################################### + def test_utils_bytes_to_size(self): + """ + Validates the output of bytes_to_size() for some test cases. + """ + test_values = { + 0: '0', 1: '1', 512: '512', 1000: '0.98 KiB', 1024: '1 KiB', + 4096: '4 KiB', 5120: '5 KiB', 10 ** 10: '9.31 GiB' + } + for value in test_values: + self.assertEqual(utils.bytes_to_size(value), test_values[value]) + + ################################### + # Tests for: size_to_bytes() + ################################### + def test_utils_size_to_bytes(self): + """ + Validates the output of size_to_bytes() for some test cases. + """ + test_values = [1, '0'] + test_formats = ['TB', 'GB', 'MB', 'KB', 'B'] + expected_output = [1099511627776, 1073741824, 1048576, 1024, 1, + 0, 0, 0, 0, 0] + i = 0 + for value in test_values: + for fmt in test_formats: + self.assertEqual(utils.size_to_bytes(value, fmt), + expected_output[i]) + i += 1 + + ################################### + # Tests for: is_new_layer_message() + ################################### + def test_utils_is_new_layer_message(self): + """ + Ensures that is_new_layer_message() returns True when message + from the skopeo's stdout indicates processing of new layer + and False otherwise. + """ + + valid_msgs = [ + "Copying blob sha256:be232718519c940b04bc57", + "Skipping fetch of repeat blob sha256:75c416ea735c42a4a0b2" + ] + + invalid_msgs = [ + 'Copying config sha256', 'test', '' + ] + + for msg in valid_msgs: + self.assertTrue(utils.is_new_layer_message(msg)) + for msg in invalid_msgs: + self.assertFalse(utils.is_new_layer_message(msg)) + + ################################### + # Tests for: is_layer_config_message() + ################################### + def test_utils_is_layer_config_message(self): + """ + Ensures that is_layer_config_message() returns True when message + from the skopeo's stdout indicates processing of manifest file + of container image and False otherwise. + """ + invalid_msgs = [ + "Copying blob sha256:be232718519c940b04bc57", + "Skipping fetch of repeat blob sha256:75c416ea735c42a4a0b2", + '' + ] + + valid_msg = 'Copying config sha256:d355ed3537e94e76389fd78b7724' + + self.assertTrue(utils.is_layer_config_message(valid_msg)) + for msg in invalid_msgs: + self.assertFalse(utils.is_layer_config_message(msg)) + + ################################### + # Tests for: make_async() + ################################### + def test_utils_make_async(self): + """ + Ensures that make_async() sets O_NONBLOCK flag on PIPE. + """ + + proc = utils.subprocess.Popen( + ["echo"], + stdout=utils.subprocess.PIPE + ) + pipe = proc.stdout + + fd = pipe.fileno() + F_GETFL = utils.fcntl.F_GETFL + O_NONBLOCK = utils.os.O_NONBLOCK + + self.assertFalse(utils.fcntl.fcntl(fd, F_GETFL) & O_NONBLOCK) + utils.make_async(fd) + self.assertTrue(utils.fcntl.fcntl(fd, F_GETFL) & O_NONBLOCK) + proc.wait() + pipe.close() + + ################################### + # Tests for: str2float() + ################################### + def test_utils_str2float(self): + """ + Validates the output of str2float() for some test cases. + """ + test_values = {'1': 1.0, 'test': None, '0': 0.0, '1.25': 1.25} + for test in test_values: + self.assertEqual(utils.str2float(test), test_values[test]) -- 2.13.5 _______________________________________________ virt-tools-list mailing list virt-tools-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/virt-tools-list