Add more abstract form of testing which checks only the final result of creation of qcow2 image. --- src/virtBootstrap/utils.py | 96 +++++++++++++++++ tests/test_build_qcow2_image.py | 231 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 327 insertions(+) create mode 100644 tests/test_build_qcow2_image.py diff --git a/src/virtBootstrap/utils.py b/src/virtBootstrap/utils.py index e05a83f..2899022 100644 --- a/src/virtBootstrap/utils.py +++ b/src/virtBootstrap/utils.py @@ -33,6 +33,7 @@ import tempfile import logging import re +import guestfs import passlib.hosts # pylint: disable=invalid-name @@ -42,6 +43,8 @@ logger = logging.getLogger(__name__) DEFAULT_OUTPUT_FORMAT = 'dir' # Default virtual size of qcow2 image DEF_QCOW2_SIZE = '5G' +DEF_BASE_IMAGE_SIZE = 5 * 1024 * 1024 * 1024 + if os.geteuid() == 0: LIBVIRT_CONN = "lxc:///" DEFAULT_IMG_DIR = "/var/lib/virt-bootstrap/docker_images" @@ -51,6 +54,99 @@ else: DEFAULT_IMG_DIR += "/.local/share/virt-bootstrap/docker_images" +class Build_QCOW2_Image(object): + """ + Create qcow2 image with backing chains from list of tar files. 
+ """ + def __init__(self, **kwargs): + """ + Initialize guestfs + + @param tar_files: List of tar files from which to create rootfs + @param dest: Destination directory where qcow2 images will be stored + @param progress: Instance of the progress module + """ + self.tar_files = kwargs['tar_files'] + if not isinstance(self.tar_files, list): + raise ValueError( + 'tar_files must be list not %s' % type(self.tar_files) + ) + self.progress = kwargs['progress'] + self.fmt = 'qcow2' + self.qcow2_files = [os.path.join(kwargs['dest'], 'layer-%s.qcow2' % i) + for i in range(len(self.tar_files))] + + self.g = guestfs.GuestFS(python_return_dict=True) + self.create_base_qcow2_layer(self.tar_files[0], self.qcow2_files[0]) + if len(self.tar_files) > 1: + self.create_backing_chains() + self.g.shutdown() + + def create_disk(self, qcow2_file, backingfile=None, readonly=False): + """ + Create and add qcow2 disk image. + """ + if backingfile is not None: + size = -1 + backingformat = self.fmt + else: + size = DEF_BASE_IMAGE_SIZE + backingformat = None + + self.g.disk_create(qcow2_file, self.fmt, size, backingfile, + backingformat) + self.g.add_drive_opts(qcow2_file, readonly, self.fmt) + + def tar_in(self, tar_file, dev): + """ + Extract tar file in disk device. + """ + self.g.mount(dev, '/') + # Restore extended attributes, SELinux contexts and POSIX ACLs + # from tar file. + self.g.tar_in(tar_file, '/', get_compression_type(tar_file), + xattrs=True, selinux=True, acls=True) + # Unmount the filesystem once the content has been extracted. + self.g.umount('/') + + def create_base_qcow2_layer(self, tar_file, qcow2_file): + """ + Create and format base qcow2 layer. + + Do this separately when extracting multiple layers to avoid + hot-plugging of devices. 
+ """ + self.progress("Creating base layer", logger=logger) + self.create_disk(qcow2_file) + self.g.launch() + dev = self.g.list_devices()[0] + self.progress("Formating disk image", logger=logger) + self.g.mkfs("ext3", dev) + self.tar_in(tar_file, dev) + self.progress("Extracting content of base layer", logger=logger) + self.g.shutdown() + + def create_backing_chains(self): + """ + Create backing chains for all layers following the first + and tar-in the content. + """ + for i in range(1, len(self.tar_files)): + self.progress("Creating layer %d" % i, logger=logger) + self.create_disk( + self.qcow2_files[i], + backingfile=self.qcow2_files[i - 1] + ) + + self.g.launch() + devices = self.g.list_devices() + # Iterate through tar files of layers and skip the base layer + for i, tar_file in enumerate(self.tar_files[1:]): + self.progress("Extracting content of layer %d" % (i + 1), + logger=logger) + self.tar_in(tar_file, devices[i]) + + def get_compression_type(tar_file): """ Get compression type of tar file. diff --git a/tests/test_build_qcow2_image.py b/tests/test_build_qcow2_image.py new file mode 100644 index 0000000..3ace5a3 --- /dev/null +++ b/tests/test_build_qcow2_image.py @@ -0,0 +1,231 @@ +# Authors: Radostin Stoyanov <rstoyanov1@xxxxxxxxx> +# +# Copyright (C) 2017 Radostin Stoyanov +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ + +""" +Tests for functions defined in virtBootstrap.utils.Build_QCOW2_Image +""" + +import os +import tarfile +import shutil +from tests import unittest +from tests import mock +from tests import virt_bootstrap +import guestfs + +TAR_DIR = os.path.realpath('tests/tarfiles') +IMAGES_DIR = os.path.realpath('tests/images') +ROOTFS_TREE = { + 'root': { + 'uid': 0, + 'gid': 0, + 'dirs': [ + '/bin', + '/boot', + '/dev', + '/etc', + '/home', + '/lib', + '/media', + '/mnt', + '/opt', + '/proc', + '/root', + '/sbin', + '/srv', + '/sys', + '/usr', + '/usr/include', + '/usr/lib', + '/usr/libexec', + '/usr/local', + '/usr/share', + '/var', + '/var/log', + '/var/mail', + '/var/spool', + '/var/tmp' + ], + 'files': [ + ('/etc/shadow', 0o000) + ] + }, + 'user1': { + 'uid': 500, + 'gid': 500, + 'dirs': [ + '/home/user1' + ], + 'files': [ + '/home/user1/test_file' + ] + }, + + 'user2': { + 'uid': 1000, + 'gid': 1000, + 'dirs': [ + '/home/user2', + '/home/user2/test_dir' + ], + 'files': [ + '/home/user2/test_dir/test_file' + ] + } +} + +TAR_FILES = { + 'test1.tar.gz': { + 'compression': 'w:gz' + }, + 'test2.tar': { + 'compression': 'w' + } +} + + +# pylint: disable=invalid-name +# pylint: disable=too-many-arguments +class TestBuild_Image(unittest.TestCase): + """ + Ensures that methods defined in the Build_QCOW2_Image class work + as expected. 
+ """ + + def setUp(self): + """ + Create dummy rootfs tar files + """ + + if not os.path.exists(TAR_DIR): + os.makedirs(TAR_DIR) + + if not os.path.exists(IMAGES_DIR): + os.makedirs(IMAGES_DIR) + + for filename in TAR_FILES: + filepath = os.path.join(TAR_DIR, filename) + + compression = TAR_FILES[filename]['compression'] + with tarfile.open(filepath, compression) as tar: + self.create_user_dirs(tar) + + def tearDown(self): + """ + Remove created tar files and qcow2 images + """ + shutil.rmtree(TAR_DIR) + shutil.rmtree(IMAGES_DIR) + + def create_members(self, tar_handle, names, m_type, uid=0, gid=0, + permissions=0o755): + """ + Add members to the tar file + """ + for name in names: + if isinstance(name, tuple): + name, permissions = name + t_info = tarfile.TarInfo(name) + t_info.type = m_type + t_info.mode = permissions + t_info.uid = uid + t_info.gid = gid + tar_handle.addfile(t_info) + + def create_user_dirs(self, tar_handle): + """ + Create rootfs tree + """ + for user in ROOTFS_TREE: + # Create folders + self.create_members( + tar_handle, + ROOTFS_TREE[user]['dirs'], + tarfile.DIRTYPE, + uid=ROOTFS_TREE[user]['uid'], + gid=ROOTFS_TREE[user]['gid'] + ) + # Create files + self.create_members( + tar_handle, + ROOTFS_TREE[user]['files'], + tarfile.REGTYPE, + uid=ROOTFS_TREE[user]['uid'], + gid=ROOTFS_TREE[user]['gid'] + ) + + def check_members(self, g): + """ + Check if all files and folders exist in the qcow2 image. 
+ """ + for user in ROOTFS_TREE: + permissions = 0o755 + user_uid = ROOTFS_TREE[user]['uid'] + user_gid = ROOTFS_TREE[user]['gid'] + # Check folders + for name in ROOTFS_TREE[user]['dirs']: + if isinstance(name, tuple): + name, permissions = name + self.assertTrue(g.is_dir(name), "Not directory %s" % name) + stat = g.stat(name) + self.assertEqual(stat['mode'] & 0o777, permissions) + self.assertEqual(stat['uid'], user_uid) + self.assertEqual(stat['gid'], user_gid) + + # Check files + for name in ROOTFS_TREE[user]['files']: + if isinstance(name, tuple): + name, permissions = name + self.assertTrue(g.is_file(name), "Not file %s" % name) + stat = g.stat(name) + self.assertEqual(stat['mode'] & 0o777, permissions) + self.assertEqual(stat['uid'], user_uid) + self.assertEqual(stat['gid'], user_gid) + + def check_qcow2_images(self, images): + """ + Ensures that all qcow2 images contain all files. + """ + g = guestfs.GuestFS(python_return_dict=True) + for image_path in images: + g.add_drive_opts(image_path, readonly=1) + g.launch() + devices = g.list_filesystems() + for dev in devices: + g.mount(dev, '/') + self.check_members(g) + g.umount('/') + + g.shutdown() + + def runTest(self): + """ + Create qcow2 image from each dummy tarfile using FileSource. + """ + images = [] + for archive in TAR_FILES: + dest = os.path.join(IMAGES_DIR, archive.split('.')[0]) + images.append(os.path.join(dest, "%s.qcow2" % archive)) + uri = os.path.join(TAR_DIR, archive) + virt_bootstrap.bootstrap( + uri=uri, + dest=dest, + fmt='qcow2', + progress_cb=mock.Mock() + ) + self.check_qcow2_images(images) -- 2.13.4 _______________________________________________ virt-tools-list mailing list virt-tools-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/virt-tools-list