[virt-bootstrap] [PATCH v7 03/26] Add regression tests

These tests aim to verify the output of virt-bootstrap by creating tar
archives which contain dummy root file system and call the function
bootstrap(). The check the extracted root file system.
 tests/__init__.py      | 439 +++++++++++++++++++++++++++++++++++++++++++++++++
 tests/docker_source.py | 243 ++++++++++++++++++++++++++-
 tests/file_source.py   |  78 +++++++++
 3 files changed, 757 insertions(+), 3 deletions(-)
 create mode 100644 tests/file_source.py

diff --git a/tests/__init__.py b/tests/__init__.py
index 1b06616..9bbae53 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -20,8 +20,18 @@
 Test suite for virt-bootstrap
+import copy
+import hashlib
+import io
+import os
+import shutil
 import sys
+import tarfile
+import tempfile
 import unittest
+import passlib.hosts
+import guestfs
     import mock
@@ -37,3 +47,432 @@ from virtBootstrap import progress
 from virtBootstrap import utils
 __all__ = ['virt_bootstrap', 'sources', 'progress', 'utils']
+NOT_ROOT = (os.geteuid() != 0)
+    'root': {
+        'uid': 0,
+        'gid': 0,
+        'dirs': [
+            'bin', 'etc', 'home', 'lib', 'opt', 'root', 'run', 'sbin',
+            'srv', 'tmp', 'usr', 'var'
+        ],
+        'files': [
+            'etc/hosts',
+            'etc/fstab',
+            (
+                'etc/shadow',
+                SHADOW_FILE_MODE,
+                'root:*::0:99999:7:::'
+            )
+        ]
+    },
+    'user1': {
+        'uid': 500,
+        'gid': 500,
+        'dirs': ['home/user1'],
+        'files': [
+            ('home/user1/test_file', 0o644, 'test data')
+        ]
+    },
+    'user2': {
+        'uid': 1000,
+        'gid': 1000,
+        'dirs': [
+            'home/user2',
+            'home/user2/test_dir'
+        ],
+        'files': [
+            'home/user2/test_dir/test_file'
+        ]
+    }
+# pylint: disable=invalid-name,too-many-arguments
+class BuildTarFiles(object):
+    """
+    Create dummy tar files used for testing.
+    """
+    def __init__(self, tar_dir, rootfs_tree=None):
+        """
+        Create dummy tar files
+        """
+        self.tar_dir = tar_dir
+        self.rootfs_tree = rootfs_tree or copy.deepcopy(ROOTFS_TREE)
+    def create_tar_file(self):
+        """
+        Use temporary name to create uncompressed tarball with dummy file
+        system. Get checksum of the content and rename the file to
+        "<checksum>.tar". In this way, we can easily generate manifest for
+        Docker image and provide it to virt-bootstrap.
+        """
+        filepath = tempfile.mkstemp(dir=self.tar_dir)[1]
+        with tarfile.open(filepath, 'w') as tar:
+            self.create_user_dirs(tar)
+        # Get sha256 checksum of the archive
+        with open(filepath, 'rb') as file_handle:
+            file_hash = hashlib.sha256(file_handle.read()).hexdigest()
+        # Rename the archive to <checksum>.tar
+        new_filepath = os.path.join(self.tar_dir, "%s.tar" % file_hash)
+        os.rename(filepath, new_filepath)
+        os.chmod(new_filepath, 0o644)
+        return new_filepath
+    def create_user_dirs(self, tar_handle):
+        """
+        Create root file system tree in tar archive.
+        """
+        tar_members = [
+            ['dirs', tarfile.DIRTYPE],
+            ['files', tarfile.REGTYPE],
+        ]
+        for user in self.rootfs_tree:
+            for members, tar_type in tar_members:
+                self.create_tar_members(
+                    tar_handle,
+                    self.rootfs_tree[user][members],
+                    tar_type,
+                    uid=self.rootfs_tree[user]['uid'],
+                    gid=self.rootfs_tree[user]['gid']
+                )
+    def create_tar_members(self, tar_handle, members, m_type, uid=0, gid=0):
+        """
+        Add members to tar file.
+        """
+        for member_name in members:
+            member_data = ''
+            permissions = DEFAULT_FILE_MODE
+            if isinstance(member_name, tuple):
+                if len(member_name) == 3:
+                    member_data = member_name[2]
+                member_name, permissions = member_name[:2]
+            data_encoded = member_data.encode('utf-8')
+            t_info = tarfile.TarInfo(member_name)
+            t_info.type = m_type
+            t_info.mode = permissions
+            t_info.uid = uid
+            t_info.gid = gid
+            t_info.size = len(data_encoded)
+            tar_handle.addfile(t_info, io.BytesIO(data_encoded))
+class ImageAccessor(unittest.TestCase):
+    """
+    The perpose of this class is to gather methods used to verify content
+    of extracted root file system. This class can be exteded for different
+    test cases.
+    """
+    def setUp(self):
+        """
+        Set initial values, create temporary directories and tar archive which
+        contains dummy root file system.
+        """
+        self.dest_dir = tempfile.mkdtemp('_bootstrap_dest')
+        self.tar_dir = tempfile.mkdtemp('_bootstrap_tarfiles')
+        # Set permissions of temporary directories to avoid "Permission denied"
+        # error from Libvirt.
+        os.chmod(self.dest_dir, 0o755)
+        os.chmod(self.tar_dir, 0o755)
+        self.uid_map = []
+        self.gid_map = []
+        self.root_password = None
+        self.checked_members = set()
+        self.rootfs_tree = copy.deepcopy(ROOTFS_TREE)
+        # Create tar archive
+        self.tar_file = BuildTarFiles(self.tar_dir).create_tar_file()
+    def tearDown(self):
+        """
+        Clean up.
+        """
+        shutil.rmtree(self.dest_dir)
+        shutil.rmtree(self.tar_dir)
+    def apply_mapping(self):
+        """
+        This method applies UID/GID mapping to all users defined in
+        self.rootfs_tree.
+        """
+        for user in self.rootfs_tree:
+            user_uid = self.rootfs_tree[user]['uid']
+            user_gid = self.rootfs_tree[user]['gid']
+            if self.uid_map:
+                for start, target, count in self.uid_map:
+                    if user_uid >= start and user_uid <= start + count:
+                        diff = user_uid - start
+                        self.rootfs_tree[user]['uid'] = target + diff
+            if self.gid_map:
+                for start, target, count in self.gid_map:
+                    if user_gid >= start and user_gid <= start + count:
+                        diff = user_gid - start
+                        self.rootfs_tree[user]['gid'] = target + diff
+    def check_rootfs(self, skip_ownership=False):
+        """
+        Check if the root file system was extracted correctly.
+        """
+        existence_check_func = {
+            'files': os.path.isfile,
+            'dirs': os.path.isdir
+        }
+        for user in self.rootfs_tree:
+            for m_type in existence_check_func:
+                self.check_rootfs_members(
+                    user,
+                    m_type,
+                    existence_check_func[m_type],
+                    skip_ownership
+                )
+    def check_rootfs_members(self, user, members, check_existence,
+                             skip_ownership=False):
+        """
+        Verify permissions, ownership and content of files or
+        directories in extracted root file system.
+        @param user: user name defined in self.rootfs_tree.
+        @param members: The string 'dirs' or 'files'.
+        @param check_existence: Function used to check the existence of member.
+        @param skip_ownership: Boolean whether to skip ownership check.
+        """
+        user_uid = self.rootfs_tree[user]['uid']
+        user_gid = self.rootfs_tree[user]['gid']
+        for member_name in self.rootfs_tree[user][members]:
+            member_data = ''
+            # Unpack member if tuple. Allow to be specified permissions and
+            # data for each file.
+            permissions = DEFAULT_FILE_MODE
+            if isinstance(member_name, tuple):
+                if len(member_name) == 3:
+                    member_data = member_name[2]
+                member_name, permissions = member_name[:2]
+            # Skip already checked members. E.g. when multiple layers were
+            # extracted we want to check only the latest version of file.
+            if member_name in self.checked_members:
+                continue
+            else:
+                self.checked_members.add(member_name)
+            #########################
+            # Assertion functions
+            #########################
+            member_path = os.path.join(self.dest_dir, member_name)
+            self.assertTrue(
+                check_existence(member_path),
+                'Member was not extracted: %s' % member_path
+            )
+            stat = os.stat(member_path)
+            self.validate_file_mode(member_path, stat.st_mode, permissions)
+            if not skip_ownership:
+                self.validate_file_ownership(
+                    member_path, stat.st_uid, stat.st_gid, user_uid, user_gid
+                )
+            if member_data:
+                with open(member_path, 'r') as content:
+                    file_content = content.read()
+                self.assertEqual(
+                    member_data, file_content,
+                    'Incorrect file content: %s\n'
+                    'Found: %s\n'
+                    'Expected: %s' % (member_path, file_content, member_data)
+                )
+    def validate_shadow_file(self):
+        """
+        Ensure that the extracted /etc/shadow file has correct ownership,
+        permissions and contains valid hash of the root password.
+        """
+        shadow_path = os.path.join(self.dest_dir, 'etc/shadow')
+        self.assertTrue(
+            os.path.isfile(shadow_path),
+            'Does not exist: %s' % shadow_path
+        )
+        stat = os.stat(shadow_path)
+        self.validate_file_mode(shadow_path, stat.st_mode, SHADOW_FILE_MODE)
+        if not NOT_ROOT:
+            self.validate_file_ownership(
+                shadow_path,
+                stat.st_uid, stat.st_gid,
+                self.rootfs_tree['root']['uid'],
+                self.rootfs_tree['root']['gid']
+            )
+        with open(shadow_path, 'r') as content:
+            shadow_content = content.readlines()
+        if not shadow_content:
+            raise Exception("File is empty: %s" % shadow_path)
+        self.validate_shadow_hash(shadow_content)
+    def validate_shadow_hash(self, shadow_content):
+        """
+        Validate root password hash of shadow file.
+        Note: For simplicity we assume that the first line of /etc/shadow
+        contains the root entry.
+        """
+        self.assertTrue(
+            passlib.hosts.linux_context.verify(
+                self.root_password,
+                shadow_content[0].split(':')[1]
+            ),
+            "Invalid root password hash."
+        )
+    def validate_file_mode(self, member_name, mode, expected):
+        """
+        Verify permissions of rootfs member.
+        """
+        self.assertEqual(
+            mode & 0o777, expected, 'Incorrect permissions: %s' % member_name
+        )
+    def validate_file_ownership(self, member_name, uid, gid,
+                                expected_uid, expected_gid):
+        """
+        Validate UID/GID of rootfs member.
+        """
+        self.assertEqual(
+            uid, expected_uid,
+            "Incorrect UID: %s\n"
+            "Found: %s\n"
+            "Expected: %s" % (member_name, uid, expected_uid)
+        )
+        self.assertEqual(
+            gid, expected_gid,
+            "Incorrect GID: %s\n"
+            "Found: %s\n"
+            "Expected: %s" % (member_name, gid, expected_gid,)
+        )
+class Qcow2ImageAccessor(ImageAccessor):
+    """
+    This class gathers methods for verification of root file system content
+    within extracted qcow2 image.
+    """
+    def validate_shadow_file_in_image(self, g):
+        """
+        Ensures that /etc/shadow file of disk image has correct permission,
+        ownership and contains valid hash of the root password.
+        """
+        self.assertTrue(
+            g.is_file('/etc/shadow'),
+            "Shadow file does not exist"
+        )
+        stat = g.stat('/etc/shadow')
+        self.validate_file_mode('/etc/shadow', stat['mode'], SHADOW_FILE_MODE)
+        self.validate_file_ownership(
+            '/etc/shadow',
+            stat['uid'], stat['gid'],
+            self.rootfs_tree['root']['uid'],
+            self.rootfs_tree['root']['gid']
+        )
+        self.validate_shadow_hash(g.cat('/etc/shadow').split('\n'))
+    def check_image_content(self, g, user, members, check_existence):
+        """
+        Verify the existence, permissions and ownership of members in qcow2
+        image.
+        @param g: guestfs handle
+        @param user: Name of user defined in self.rootfs_tree
+        @param members: The string 'dirs' or 'files'.
+        @param check_existence: Function to confirm existence of member.
+        """
+        permissions = DEFAULT_FILE_MODE
+        user_uid = self.rootfs_tree[user]['uid']
+        user_gid = self.rootfs_tree[user]['gid']
+        for member_name in self.rootfs_tree[user][members]:
+            # Get specified permissions of file.
+            if isinstance(member_name, tuple):
+                member_name, permissions = member_name[:2]
+            # Skip already checked files.
+            if member_name in self.checked_members:
+                continue
+            else:
+                self.checked_members.add(member_name)
+            # When using guestfs all names should start with '/'
+            if not member_name.startswith('/'):
+                member_name = '/' + member_name
+            self.assertTrue(
+                check_existence(member_name),
+                "Member was not found: %s" % member_name
+            )
+            stat = g.stat(member_name)
+            self.validate_file_mode(member_name, stat['mode'], permissions)
+            self.validate_file_ownership(
+                member_name, stat['uid'], stat['gid'], user_uid, user_gid
+            )
+    def check_image(self, g):
+        """
+        Check the presence of files and folders in qcow2 image.
+        """
+        for user in self.rootfs_tree:
+            self.check_image_content(g, user, 'dirs', g.is_dir)
+            self.check_image_content(g, user, 'files', g.is_file)
+    def check_qcow2_image(self, image_path):
+        """
+        Ensures that qcow2 images contain all files.
+        """
+        g = guestfs.GuestFS(python_return_dict=True)
+        g.add_drive_opts(image_path, readonly=True)
+        g.launch()
+        g.mount('/dev/sda', '/')
+        self.check_image(g)
+        g.umount('/')
+        g.shutdown()
+    def get_image_path(self, n=0):
+        """
+        Returns the path of stored qcow2 image.
+        """
+        return os.path.join(self.dest_dir, "layer-%d.qcow2" % n)
+    def check_qcow2_images(self, image_path):
+        """
+        Ensures that qcow2 images contain all files.
+        """
+        g = guestfs.GuestFS(python_return_dict=True)
+        g.add_drive_opts(image_path, readonly=True)
+        g.launch()
+        g.mount('/dev/sda', '/')
+        self.check_image(g)
+        g.umount('/')
+        g.shutdown()
diff --git a/tests/docker_source.py b/tests/docker_source.py
index 60404e6..5f9e6fe 100644
--- a/tests/docker_source.py
+++ b/tests/docker_source.py
@@ -18,15 +18,252 @@
 Tests which aim is to exercise creation of root file system with DockerSource.
+To avoid fetching network resources we mock out the functions:
+- utils.get_image_details(): Returns manifest content
+- utils.get_image_dir(): Returns the directory which contains the tar files
+Brief description of this tests:
+1. Create dummy tar files named <checksum>.tar used as image layers.
+2. Generate manifest content.
+3. Mock out get_image_details() and get_image_dir().
+4. Call bootstrap().
+5. Check the result.
+import copy
+import os
+import subprocess
 import unittest
+import guestfs
 from . import mock
 from . import sources
+from . import virt_bootstrap
+from . import BuildTarFiles
+from . import ImageAccessor
+from . import Qcow2ImageAccessor
+from . import NOT_ROOT
 # pylint: disable=invalid-name
+class CreateLayers(object):
+    """
+    Create tar files to mimic image layers and generate manifest content.
+    """
+    def __init__(self, initial_tar_file, initial_rootfs_tree, dest_dir):
+        """
+        Create dummy tar files used as image layers.
+        The variables:
+        - layers: Store a lists of paths to created archives.
+        - layers_rootfs: Store self.rootfs_tree value used to generate tarball.
+        """
+        self.layers = [initial_tar_file]
+        self.layers_rootfs = [copy.deepcopy(initial_rootfs_tree)]
+        tar_builder = BuildTarFiles(dest_dir)
+        tar_builder.rootfs_tree['root']['dirs'] = []
+        tar_builder.rootfs_tree['root']['files'] = [
+            ('etc/foo/bar', 0o644, "This should be overwritten")
+        ]
+        self.layers.append(tar_builder.create_tar_file())
+        self.layers_rootfs.append(copy.deepcopy(tar_builder.rootfs_tree))
+        tar_builder.rootfs_tree['root']['files'] = [
+            ('etc/foo/bar', 0o644, "Content of etc/foo/bar"),
+            ('bin/foobar', 0o755, "My executable script")
+        ]
+        self.layers.append(tar_builder.create_tar_file())
+        self.layers_rootfs.append(copy.deepcopy(tar_builder.rootfs_tree))
+    def get_layers_rootfs(self):
+        """
+        Return root file systems used to create layers.
+        """
+        return self.layers_rootfs
+    def generate_manifest(self):
+        """
+        Generate Manifest content for layers.
+        """
+        return {
+            "schemaVersion": 2,
+            "layers": [
+                {
+                    "digest":
+                    "sha256:" + os.path.basename(layer).split('.')[0]
+                }
+                for layer in self.layers
+            ]
+        }
+class TestDirDockerSource(ImageAccessor):
+    """
+    Ensures that all layers extracted correctly in destination folder.
+    """
+    def check_result(self, layers_rootfs, dest):
+        """
+        Iterates trough values of layers_rootfs in reverse order (from the last
+        layer to first) and calls check_extracted_files().
+        """
+    def call_bootstrap(self, manifest):
+        """
+        Mock get_image_details() and get_image_dir() and call the function
+        virt_bootstrap.bootstrap() with root_password value.
+        """
+        with mock.patch.multiple('virtBootstrap.utils',
+                                 get_image_details=mock.DEFAULT,
+                                 get_image_dir=mock.DEFAULT) as mocked:
+            mocked['get_image_details'].return_value = manifest
+            mocked['get_image_dir'].return_value = self.tar_dir
+            virt_bootstrap.bootstrap(
+                progress_cb=mock.Mock(),
+                uri='docker://foo',
+                fmt='dir',
+                uid_map=self.uid_map,
+                gid_map=self.gid_map,
+                dest=self.dest_dir,
+                root_password=self.root_password
+            )
+    def test_dir_extract_rootfs(self):
+        """
+        Ensures that all layers were extracted correctly.
+        """
+        layers = CreateLayers(self.tar_file, self.rootfs_tree, self.tar_dir)
+        self.call_bootstrap(layers.generate_manifest())
+        layers_rootfs = layers.get_layers_rootfs()
+        for rootfs_tree in layers_rootfs[::-1]:
+            self.rootfs_tree = rootfs_tree
+            self.check_rootfs(skip_ownership=(os.geteuid != 0))
+    @unittest.skipIf(NOT_ROOT, "Root privileges required")
+    def test_dir_ownership_mapping(self):
+        """
+        Ensures that the UID/GID mapping was applied correctly to extracted
+        root file system of all layers.
+        """
+        self.uid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
+        self.gid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
+        layers = CreateLayers(self.tar_file, self.rootfs_tree, self.tar_dir)
+        self.call_bootstrap(layers.generate_manifest())
+        layers_rootfs = layers.get_layers_rootfs()
+        for rootfs_tree in layers_rootfs[::-1]:
+            self.rootfs_tree = rootfs_tree
+            self.apply_mapping()
+            self.check_rootfs()
+    def test_dir_setting_root_password(self):
+        """
+        Ensures that the root password is set correctly.
+        """
+        layers = CreateLayers(self.tar_file, self.rootfs_tree, self.tar_dir)
+        self.root_password = "My secret root password"
+        self.call_bootstrap(layers.generate_manifest())
+        self.validate_shadow_file()
+class TestQcow2DockerSource(Qcow2ImageAccessor):
+    """
+    Ensures that the conversion of tar files to qcow2 image with backing chains
+    works as expected.
+    """
+    def get_image_info(self, image_path):
+        """
+        Wrapper around "qemu-img info" used to information about disk image.
+        """
+        cmd = ['qemu-img', 'info', image_path]
+        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
+        output, _ignore = proc.communicate()
+        return output.decode('utf-8').split('\n')
+    def call_bootstrap(self):
+        """
+        Generate tar files which mimic container layers and manifest content.
+        Mock get_image_details() and get_image_dir() and call the function
+        virt_bootstrap.bootstrap() for qcow2 format.
+        Return the root file systems used to generate the tar archives.
+        """
+        layers = CreateLayers(self.tar_file, self.rootfs_tree, self.tar_dir)
+        manifest = layers.generate_manifest()
+        with mock.patch.multiple('virtBootstrap.utils',
+                                 get_image_details=mock.DEFAULT,
+                                 get_image_dir=mock.DEFAULT) as mocked:
+            mocked['get_image_details'].return_value = manifest
+            mocked['get_image_dir'].return_value = self.tar_dir
+            virt_bootstrap.bootstrap(
+                progress_cb=mock.Mock(),
+                uri='docker://foobar',
+                dest=self.dest_dir,
+                fmt='qcow2',
+                uid_map=self.uid_map,
+                gid_map=self.gid_map,
+                root_password=self.root_password
+            )
+        return layers.get_layers_rootfs()
+    def test_qcow2_build_image(self):
+        """
+        Ensures that the root file system is copied correctly to single
+        partition qcow2 image and layers are converted correctly to qcow2
+        images.
+        """
+        layers_rootfs = self.call_bootstrap()
+        ###################
+        # Check base layer
+        ###################
+        base_layer_path = self.get_image_path()
+        img_format = self.get_image_info(base_layer_path)[1]
+        self.assertEqual(img_format, 'file format: qcow2')
+        images = [base_layer_path]
+        ###########################
+        # Check backing chains
+        ###########################
+        for i in range(1, len(layers_rootfs)):
+            img_path = self.get_image_path(i)
+            # img_info contains the output of "qemu-img info"
+            img_info = self.get_image_info(img_path)
+            self.assertEqual(
+                img_info[1],
+                'file format: qcow2',
+                'Invalid qcow2 disk image: %s' % img_path
+            )
+            backing_file = self.get_image_path(i - 1)
+            self.assertEqual(
+                img_info[5],
+                'backing file: %s' % backing_file,
+                "Incorrect backing file for: %s\n"
+                "Expected: %s\n"
+                "Found: %s" % (img_info, backing_file, img_info[5])
+            )
+            images.append(img_path)
+        ###############################
+        # Check extracted files/folders
+        ###############################
+        g = guestfs.GuestFS(python_return_dict=True)
+        for path in images:
+            g.add_drive_opts(path, readonly=True)
+        g.launch()
+        devices = g.list_filesystems()
+        for dev, rootfs in zip(sorted(devices), layers_rootfs):
+            self.rootfs_tree = rootfs
+            g.mount(dev, '/')
+            self.check_image(g)
+            g.umount('/')
+        g.shutdown()
 class TestDockerSource(unittest.TestCase):
     Unit tests for DockerSource
@@ -36,9 +273,9 @@ class TestDockerSource(unittest.TestCase):
     def _mock_retrieve_layers_info(self, manifest, kwargs):
-        This method is gather common test pattern used in the following
-        two test cases which aim to return an instance of the class
-        DockerSource with some util functions being mocked.
+        This method is gather common test pattern used in the following cases
+        which aim is to return an instance of the class DockerSource with
+        get_image_details() and get_image_dir() being mocked.
         with mock.patch.multiple('virtBootstrap.utils',
diff --git a/tests/file_source.py b/tests/file_source.py
new file mode 100644
index 0000000..79bb234
--- /dev/null
+++ b/tests/file_source.py
@@ -0,0 +1,78 @@
+# -*- coding: utf-8 -*-
+# Authors: Radostin Stoyanov <rstoyanov1@xxxxxxxxx>
+# Copyright (C) 2017 Radostin Stoyanov
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+Regression tests which aim to exercise the creation of root file system
+with FileSource.
+import unittest
+from . import mock
+from . import virt_bootstrap
+from . import ImageAccessor
+from . import NOT_ROOT
+# pylint: disable=invalid-name
+class TestDirFileSource(ImageAccessor):
+    """
+    Ensures that files from rootfs tarball are extracted correctly.
+    """
+    def call_bootstrap(self):
+        """
+        Execute the bootstrap() method of virt_bootstrap.
+        """
+        virt_bootstrap.bootstrap(
+            uri=self.tar_file,
+            dest=self.dest_dir,
+            fmt='dir',
+            progress_cb=mock.Mock(),
+            uid_map=self.uid_map,
+            gid_map=self.gid_map,
+            root_password=self.root_password
+        )
+    def test_dir_extract_rootfs(self):
+        """
+        Extract rootfs from each dummy tarfile.
+        """
+        self.call_bootstrap()
+        self.check_rootfs(skip_ownership=NOT_ROOT)
+    @unittest.skipIf(NOT_ROOT, "Root privileges required")
+    def test_dir_ownership_mapping(self):
+        """
+        Ensures that UID/GID mapping for extracted root file system are applied
+        correctly.
+        """
+        self.uid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
+        self.gid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
+        self.call_bootstrap()
+        self.apply_mapping()
+        self.check_rootfs()
+    def test_dir_setting_root_password(self):
+        """
+        Ensures that the root password is set correctly when FileSource is used
+        with fmt='dir'.
+        """
+        self.root_password = 'my secret root password'
+        self.call_bootstrap()
+        self.validate_shadow_file()

