Unit tests were used to ensure that functions and methods work as expected. However, these tests are closely related to the implementation and will result in major changes after some refactoring. To reduce the amount of work needed to add new features or changes to the code most of these tests will be replaced with more abstract form of testing introduced in the following commits. --- tests/__init__.py | 44 ++-- tests/docker_source.py | 150 +++++++++++ tests/test_docker_source.py | 607 ------------------------------------------- tests/test_file_source.py | 171 ------------ tests/test_progress.py | 112 -------- tests/test_utils.py | 580 +---------------------------------------- tests/test_virt_bootstrap.py | 464 --------------------------------- 7 files changed, 180 insertions(+), 1948 deletions(-) create mode 100644 tests/docker_source.py delete mode 100644 tests/test_docker_source.py delete mode 100644 tests/test_file_source.py delete mode 100644 tests/test_progress.py delete mode 100644 tests/test_virt_bootstrap.py diff --git a/tests/__init__.py b/tests/__init__.py index e82c6d5..1b06616 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,22 +1,23 @@ -""" - Test suite for virt-bootstrap - - Authors: Radostin Stoyanov <rstoyanov1@xxxxxxxxx> - - Copyright (C) 2017 Radostin Stoyanov +# -*- coding: utf-8 -*- +# Authors: Radostin Stoyanov <rstoyanov1@xxxxxxxxx> +# +# Copyright (C) 2017 Radostin Stoyanov +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Test suite for virt-bootstrap """ import sys @@ -27,13 +28,12 @@ try: except ImportError: import unittest.mock as mock -sys.path += '../src' # noqa: E402 +sys.path.insert(0, '../src') # noqa: E402 -# pylint: disable=import-error +# pylint: disable=import-error, wrong-import-position from virtBootstrap import virt_bootstrap from virtBootstrap import sources from virtBootstrap import progress from virtBootstrap import utils -__all__ = ['unittest', 'mock', - 'virt_bootstrap', 'sources', 'progress', 'utils'] +__all__ = ['virt_bootstrap', 'sources', 'progress', 'utils'] diff --git a/tests/docker_source.py b/tests/docker_source.py new file mode 100644 index 0000000..60404e6 --- /dev/null +++ b/tests/docker_source.py @@ -0,0 +1,150 @@ +# -*- coding: utf-8 -*- +# Authors: Radostin Stoyanov <rstoyanov1@xxxxxxxxx> +# +# Copyright (C) 2017 Radostin Stoyanov +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +Tests which aim is to exercise creation of root file system with DockerSource. +""" + +import unittest + +from . import mock +from . import sources + + +# pylint: disable=invalid-name +class TestDockerSource(unittest.TestCase): + """ + Unit tests for DockerSource + """ + ################################### + # Tests for: retrieve_layers_info() + ################################### + def _mock_retrieve_layers_info(self, manifest, kwargs): + """ + This method is gather common test pattern used in the following + two test cases which aim to return an instance of the class + DockerSource with some util functions being mocked. + """ + with mock.patch.multiple('virtBootstrap.utils', + get_image_details=mock.DEFAULT, + get_image_dir=mock.DEFAULT) as m_utils: + + m_utils['get_image_details'].return_value = manifest + m_utils['get_image_dir'].return_value = '/images_path' + + patch_method = 'virtBootstrap.sources.DockerSource.gen_valid_uri' + with mock.patch(patch_method) as m_uri: + src_instance = sources.DockerSource(**kwargs) + return (src_instance, m_uri, m_utils) + + def test_retrieve_layers_info_pass_arguments_to_get_image_details(self): + """ + Ensures that retrieve_layers_info() calls get_image_details() + with all passed arguments. + """ + src_kwargs = { + 'uri': '', + 'progress': mock.Mock() + } + + manifest = {'schemaVersion': 2, 'layers': []} + (src_instance, + m_uri, m_utils) = self._mock_retrieve_layers_info(manifest, + src_kwargs) + + kwargs = { + 'insecure': src_instance.insecure, + 'username': src_instance.username, + 'password': src_instance.password, + 'raw': True + } + m_utils['get_image_details'].assert_called_once_with(m_uri(), **kwargs) + + def test_retrieve_layers_info_schema_version_1(self): + """ + Ensures that retrieve_layers_info() extracts the layers' information + from manifest with schema version 1 a list with format: + ["digest", "sum_type", "file_path", "size"]. + """ + kwargs = { + 'uri': '', + 'progress': mock.Mock() + } + + manifest = { + 'schemaVersion': 1, + 'fsLayers': [ + {'blobSum': 'sha256:75c416ea'}, + {'blobSum': 'sha256:c6ff40b6'}, + {'blobSum': 'sha256:a7050fc1'} + ] + } + + expected_result = [ + ['sha256', 'a7050fc1', '/images_path/a7050fc1.tar', None], + ['sha256', 'c6ff40b6', '/images_path/c6ff40b6.tar', None], + ['sha256', '75c416ea', '/images_path/75c416ea.tar', None] + ] + + with mock.patch('os.path.getsize') as m_getsize: + m_getsize.return_value = None + src_instance = self._mock_retrieve_layers_info(manifest, kwargs)[0] + self.assertEqual(src_instance.layers, expected_result) + + def test_retrieve_layers_info_schema_version_2(self): + """ + Ensures that retrieve_layers_info() extracts the layers' information + from manifest with schema version 2 a list with format: + ["digest", "sum_type", "file_path", "size"]. + """ + kwargs = { + 'uri': '', + 'progress': mock.Mock() + } + + manifest = { + 'schemaVersion': 2, + "layers": [ + {"size": 47103294, "digest": "sha256:75c416ea"}, + {"size": 814, "digest": "sha256:c6ff40b6"}, + {"size": 513, "digest": "sha256:a7050fc1"} + ] + } + + expected_result = [ + ['sha256', '75c416ea', '/images_path/75c416ea.tar', 47103294], + ['sha256', 'c6ff40b6', '/images_path/c6ff40b6.tar', 814], + ['sha256', 'a7050fc1', '/images_path/a7050fc1.tar', 513] + ] + + src_instance = self._mock_retrieve_layers_info(manifest, kwargs)[0] + self.assertEqual(src_instance.layers, expected_result) + + def test_retrieve_layers_info_raise_error_on_invalid_schema_version(self): + """ + Ensures that retrieve_layers_info() calls get_image_details() + with all passed arguments. + """ + kwargs = { + 'uri': '', + 'progress': mock.Mock() + } + + manifest = {'schemaVersion': 3} + with self.assertRaises(ValueError): + self._mock_retrieve_layers_info(manifest, kwargs) diff --git a/tests/test_docker_source.py b/tests/test_docker_source.py deleted file mode 100644 index 4859e1b..0000000 --- a/tests/test_docker_source.py +++ /dev/null @@ -1,607 +0,0 @@ -# Authors: Radostin Stoyanov <rstoyanov1@xxxxxxxxx> -# -# Copyright (C) 2017 Radostin Stoyanov -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. - -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -""" -Unit tests for methods defined in virtBootstrap.sources.DockerSource -""" - -from tests import unittest -from tests import mock -from tests import sources - -try: - from urlparse import urlparse -except ImportError: - from urllib.parse import urlparse - - -# pylint: disable=invalid-name -# pylint: disable=too-many-public-methods -class TestDockerSource(unittest.TestCase): - """ - Test cases for DockerSource - """ - def _mock_docker_source(self): - """ - This method returns an instance of Mock object - that acts as the specification for the DockerSource. - """ - m_self = mock.Mock(spec=sources.DockerSource) - m_self.progress = mock.Mock() - m_self.no_cache = False - m_self.url = "docker://test" - m_self.images_dir = "/images_path" - m_self.insecure = True - m_self.username = 'user' - m_self.password = 'password' - m_self.layers = [ - ['sha256', '75c416ea', '/images_path/75c416ea.tar', ''], - ['sha256', 'a7050fc1', '/images_path/a7050fc1.tar', ''] - ] - return m_self - - ################################### - # Tests for: __init__() - ################################### - def test_argument_assignment(self): - """ - Ensures that __init__() assigns the arguments' values to instance - variables. - """ - kwargs = {'uri': '', - 'fmt': 'dir', - 'not_secure': False, - 'no_cache': False, - 'progress': mock.Mock(), - 'username': 'username', - 'password': 'password'} - - with mock.patch('virtBootstrap.utils' - '.get_image_dir') as m_get_image_dir: - with mock.patch.multiple('virtBootstrap.sources.DockerSource', - retrieve_layers_info=mock.DEFAULT, - gen_valid_uri=mock.DEFAULT) as mocked: - src_instance = sources.DockerSource(**kwargs) - - test_values = { - src_instance.url: mocked['gen_valid_uri'].return_value, - src_instance.progress: kwargs['progress'].update_progress, - src_instance.username: kwargs['username'], - src_instance.password: kwargs['password'], - src_instance.output_format: kwargs['fmt'], - src_instance.no_cache: kwargs['no_cache'], - src_instance.insecure: kwargs['not_secure'], - src_instance.images_dir: m_get_image_dir() - } - for value in test_values: - self.assertIs(value, test_values[value]) - - def test_source_password_is_required_if_username_specifed(self): - """ - Ensures that __init__() calls getpass() to request password - when username is specified and password is not. - """ - test_password = 'secret' - - kwargs = {arg: '' for arg - in ['uri', 'fmt', 'not_secure', 'password', 'no_cache']} - kwargs['progress'] = mock.Mock() - kwargs['username'] = 'test' - - with mock.patch('virtBootstrap.utils.get_image_dir'): - with mock.patch('getpass.getpass') as m_getpass: - m_getpass.return_value = test_password - with mock.patch.multiple('virtBootstrap.sources.DockerSource', - retrieve_layers_info=mock.DEFAULT, - gen_valid_uri=mock.DEFAULT): - src_instance = sources.DockerSource(**kwargs) - - m_getpass.assert_called_once() - self.assertIs(test_password, src_instance.password) - - ################################### - # Tests for: retrieve_layers_info() - ################################### - def _mock_retrieve_layers_info(self, manifest, kwargs): - """ - This method is gather common test pattern used in the following - two test cases. - """ - with mock.patch.multiple('virtBootstrap.utils', - get_image_details=mock.DEFAULT, - get_image_dir=mock.DEFAULT) as m_utils: - - m_utils['get_image_details'].return_value = manifest - m_utils['get_image_dir'].return_value = '/images_path' - - patch_method = 'virtBootstrap.sources.DockerSource.gen_valid_uri' - with mock.patch(patch_method) as m_uri: - src_instance = sources.DockerSource(**kwargs) - return (src_instance, m_uri, m_utils) - - def test_retrieve_layers_info_pass_arguments_to_get_image_details(self): - """ - Ensures that retrieve_layers_info() calls get_image_details() - with all passed arguments. - """ - src_kwargs = { - 'uri': '', - 'progress': mock.Mock() - } - - manifest = {'schemaVersion': 2, 'layers': []} - (src_instance, - m_uri, m_utils) = self._mock_retrieve_layers_info(manifest, - src_kwargs) - - kwargs = { - 'insecure': src_instance.insecure, - 'username': src_instance.username, - 'password': src_instance.password, - 'raw': True - } - m_utils['get_image_details'].assert_called_once_with(m_uri(), **kwargs) - - def test_retrieve_layers_info_schema_version_1(self): - """ - Ensures that retrieve_layers_info() extracts the layers' information - from manifest with schema version 1 a list with format: - ["digest", "sum_type", "file_path", "size"]. - """ - kwargs = { - 'uri': '', - 'progress': mock.Mock() - } - - manifest = { - 'schemaVersion': 1, - 'fsLayers': [ - {'blobSum': 'sha256:75c416ea'}, - {'blobSum': 'sha256:c6ff40b6'}, - {'blobSum': 'sha256:a7050fc1'} - ] - } - - expected_result = [ - ['sha256', 'a7050fc1', '/images_path/a7050fc1.tar', None], - ['sha256', 'c6ff40b6', '/images_path/c6ff40b6.tar', None], - ['sha256', '75c416ea', '/images_path/75c416ea.tar', None] - ] - - src_instance = self._mock_retrieve_layers_info(manifest, kwargs)[0] - self.assertEqual(src_instance.layers, expected_result) - - def test_retrieve_layers_info_schema_version_2(self): - """ - Ensures that retrieve_layers_info() extracts the layers' information - from manifest with schema version 2 a list with format: - ["digest", "sum_type", "file_path", "size"]. - """ - kwargs = { - 'uri': '', - 'progress': mock.Mock() - } - - manifest = { - 'schemaVersion': 2, - "layers": [ - {"size": 47103294, "digest": "sha256:75c416ea"}, - {"size": 814, "digest": "sha256:c6ff40b6"}, - {"size": 513, "digest": "sha256:a7050fc1"} - ] - } - - expected_result = [ - ['sha256', '75c416ea', '/images_path/75c416ea.tar', 47103294], - ['sha256', 'c6ff40b6', '/images_path/c6ff40b6.tar', 814], - ['sha256', 'a7050fc1', '/images_path/a7050fc1.tar', 513] - ] - - src_instance = self._mock_retrieve_layers_info(manifest, kwargs)[0] - self.assertEqual(src_instance.layers, expected_result) - - def test_retrieve_layers_info_raise_error_on_invalid_schema_version(self): - """ - Ensures that retrieve_layers_info() calls get_image_details() - with all passed arguments. - """ - kwargs = { - 'uri': '', - 'progress': mock.Mock() - } - - manifest = {'schemaVersion': 3} - with self.assertRaises(ValueError): - self._mock_retrieve_layers_info(manifest, kwargs) - - ################################### - # Tests for: gen_valid_uri() - ################################### - def test_gen_valid_uri(self): - """ - Validates the output of gen_valid_uri() for some test cases. - """ - m_self = self._mock_docker_source() - test_values = { - 'docker:///repo': 'docker://repo', - 'docker:/repo': 'docker://repo', - 'docker://repo/': 'docker://repo', - 'docker://repo/image/': 'docker://repo/image', - 'docker:///repo/image/': 'docker://repo/image', - } - for uri in test_values: - uri_obj = urlparse(uri) - result = sources.DockerSource.gen_valid_uri(m_self, uri_obj) - expected = test_values[uri] - self.assertEqual(result, expected) - - ################################### - # Tests for: download_image() - ################################### - def test_download_image(self): - """ - Ensures that download_image() calls read_skopeo_progress() with - expected skopeo copy command and removes tha leftover manifest file. - """ - m_self = self._mock_docker_source() - m_self.read_skopeo_progress = mock.Mock() - manifest_path = "%s/manifest.json" % m_self.images_dir - with mock.patch('os.remove') as m_remove: - sources.DockerSource.download_image(m_self) - - expected_call = ["skopeo", "copy", m_self.url, - "dir:" + m_self.images_dir, - '--src-tls-verify=false', - '--src-creds={}:{}'.format(m_self.username, - m_self.password)] - m_self.read_skopeo_progress.assert_called_once_with(expected_call) - m_remove.assert_called_once_with(manifest_path) - - ################################### - # Tests for: parse_output() - ################################### - def test_parse_output_return_false_on_fail(self): - """ - Ensures that parse_output() returns False when process call - exits with non-zero code. - """ - m_self = mock.Mock(spec=sources.DockerSource) - m_self.layers = [] - m_proc = mock.Mock() - m_proc.returncode = 1 - self.assertFalse(sources.DockerSource.parse_output(m_self, m_proc)) - - def test_parse_output(self): - """ - Ensures that parse_output() recognises processing of different - layers from the skopeo's output. - """ - m_self = self._mock_docker_source() - m_proc = mock.Mock() - m_proc.poll.return_value = None - m_proc.returncode = 0 - test_values = '\n'.join([ - 'Skipping fetch of repeat blob sha256:c6ff40', - 'Copying blob sha256:75c416ea735c4', - '40.00 MB / 44.92 MB [======================>------]', - 'Copying config sha256:d355ed35', - '40.00 MB / 44.92 MB [======================>------]' - ]) - - expected_progress_calls = [ - mock.call("Downloading layer (1/2)"), - mock.call("Downloading layer (2/2)"), - ] - - with mock.patch('select.select') as m_select: - m_select.return_value = [[test_values], [], []] - with mock.patch('virtBootstrap.utils.read_async') as m_read_async: - m_read_async.return_value = test_values - self.assertTrue(sources.DockerSource.parse_output(m_self, - m_proc)) - m_select.assert_called_once_with([m_proc.stdout], [], []) - m_read_async.assert_called_once_with(test_values) - m_self.progress.assert_has_calls(expected_progress_calls) - m_self.update_progress_from_output.assert_called_once() - m_proc.wait.assert_called_once() - - ################################### - # Tests for: update_progress_from_output() - ################################### - def _mock_update_progress_from_output(self, test_values): - """ - This method is gather common test pattern used in the following - two test cases. - """ - m_self = self._mock_docker_source() - test_method = sources.DockerSource.update_progress_from_output - for line in test_values: - test_method(m_self, line.split(), 1, len(test_values)) - - return m_self.progress.call_args_list - - def test_update_progress_from_output(self): - """ - Ensures that update_progress_from_output() recognises the current - downloaded size, the total layer's size and calculates correct - percentage value. - """ - test_values = [ - '500.00 KB / 4.00 MB [======>------]', - '25.00 MB / 24.10 MB [======>------]', - '40.00 MB / 50.00 MB [======>------]', - ] - expected_values = [2, 17.33, 13.33] - - calls = self._mock_update_progress_from_output(test_values) - for call, expected in zip(calls, expected_values): - self.assertAlmostEqual(call[1]['value'], expected, places=1) - - def test_update_progress_from_output_ignore_failures(self): - """ - Ensures that update_progress_from_output() ignores invalid lines - from skopeo's output. - """ - test_values = [ - 'a ', - '1 ' * 5, - '500.00 MB / 0.00 MB [======>------]', - '00.00 MB / 00.00 MB [======>------]', - ] - self._mock_update_progress_from_output(test_values) - - ################################### - # Tests for: read_skopeo_progress() - ################################### - def _mock_read_skopeo_progress(self, test_cmd, parse_output_return): - """ - This method is gather common test pattern used in the following - two test cases. - """ - m_self = mock.Mock(spec=sources.DockerSource) - m_self.parse_output.return_value = parse_output_return - with mock.patch.multiple('virtBootstrap.sources.' - 'docker_source.subprocess', - Popen=mock.DEFAULT, - PIPE=mock.DEFAULT) as mocked: - with mock.patch('virtBootstrap.utils.make_async') as m_make_async: - sources.DockerSource.read_skopeo_progress(m_self, test_cmd) - - return (mocked, m_make_async) - - def test_read_skopeo_progress(self): - """ - Ensures that read_skopeo_progress() calls make_async() with - the stdout pipe of skopeo's process. - """ - test_cmd = 'test' - mocked, m_make_async = self._mock_read_skopeo_progress(test_cmd, True) - - mocked['Popen'].assert_called_once_with(test_cmd, - stdout=mocked['PIPE'], - stderr=mocked['PIPE'], - universal_newlines=True) - m_make_async.assert_called_once_with(mocked['Popen']().stdout) - - def test_read_skopeo_progress_raise_error(self): - """ - Ensures that read_skopeo_progress() raise CalledProcessError - when parse_output() returns false. - """ - with self.assertRaises(sources.docker_source - .subprocess.CalledProcessError): - self._mock_read_skopeo_progress('test', False) - - ################################### - # Tests for: validate_image_layers() - ################################### - def _mock_validate_image_layers(self, - checksum_return, - path_exists_return, - expected_result, - check_calls=False): - """ - This method is gather common test pattern used in the following - three test cases. - """ - m_self = self._mock_docker_source() - - with mock.patch('os.path.exists') as m_path_exists: - with mock.patch('virtBootstrap.utils.checksum') as m_checksum: - m_checksum.return_value = checksum_return - m_path_exists.return_value = path_exists_return - result = sources.DockerSource.validate_image_layers(m_self) - self.assertEqual(result, expected_result) - - if check_calls: - path_exists_expected_calls = [] - checksum_expected_calls = [] - # Generate expected calls - for sum_type, hash_sum, path, _ignore in m_self.layers: - path_exists_expected_calls.append(mock.call(path)) - checksum_expected_calls.append( - mock.call(path, sum_type, hash_sum)) - - m_path_exists.assert_has_calls(path_exists_expected_calls) - m_checksum.assert_has_calls(checksum_expected_calls) - - def test_validate_image_layers_should_return_true(self): - """ - Ensures that validate_image_layers() returns True when: - - checksum() returns True for all layers - - the file path of all layers exist - - all layers are validated - """ - self._mock_validate_image_layers(True, True, True, True) - - def test_validate_image_layers_return_false_if_path_not_exist(self): - """ - Ensures that validate_image_layers() returns False when - checksum() returns False. - """ - self._mock_validate_image_layers(False, True, False) - - def test_validate_image_layers_return_false_if_checksum_fail(self): - """ - Ensures that validate_image_layers() returns False when - the file path of layer does not exist. - """ - self._mock_validate_image_layers(True, False, False) - - ################################### - # Tests for: fetch_layers() - ################################### - def _mock_fetch_layers(self, validate_return): - """ - This method is gather common test pattern used in the following - two test cases. - """ - m_self = mock.Mock(spec=sources.DockerSource) - m_self.validate_image_layers.return_value = validate_return - sources.DockerSource.fetch_layers(m_self) - return m_self - - def test_fetch_layers_should_call_download_image(self): - """ - Ensures that fetch_layers() calls download_image() - when validate_image_layers() returns False. - """ - m_self = self._mock_fetch_layers(False) - m_self.download_image.assert_called_once() - - def test_fetch_layers_should_not_call_download_image(self): - """ - Ensures that fetch_layers() does not call download_image() - when validate_image_layers() returns True. - """ - m_self = self._mock_fetch_layers(True) - m_self.download_image.assert_not_called() - - ################################### - # Tests for: unpack() - ################################### - def _unpack_test_fmt(self, output_format, patch_method=None, - side_effect=None, m_self=None): - """ - This method is gather common test pattern used in the following - two test cases. - """ - m_self = m_self if m_self else self._mock_docker_source() - m_self.output_format = output_format - dest = 'foo' - - if patch_method: - with mock.patch(patch_method) as mocked: - if side_effect: - mocked.side_effect = side_effect - sources.DockerSource.unpack(m_self, dest) - - mocked.assert_called_once_with(m_self.layers, dest, - m_self.progress) - else: - sources.DockerSource.unpack(m_self, dest) - - m_self.fetch_layers.assert_called_once() - - def test_unpack_dir_format(self): - """ - Ensures that unpack() calls untar_layers() when the output format - is set to 'dir'. - """ - self._unpack_test_fmt('dir', 'virtBootstrap.utils.untar_layers') - - def test_unpack_qcow2_format(self): - """ - Ensures that unpack() calls extract_layers_in_qcow2() when the - output format is set to 'qcow2'. - """ - self._unpack_test_fmt('qcow2', - 'virtBootstrap.utils.extract_layers_in_qcow2') - - def unpack_raise_error_test(self, - output_format, - patch_method, - side_effect=None, - msg=None): - """ - This method is gather common test pattern used in the following - four test cases. - """ - with self.assertRaises(Exception) as err: - self._unpack_test_fmt(output_format, patch_method, - side_effect) - if msg: - self.assertEqual(msg, str(err.exception)) - - def test_unpack_raise_error_for_unknown_format(self): - """ - Ensures that unpack() throws an Exception when called with - invalid output format. - """ - msg = 'Unknown format:foo' - self.unpack_raise_error_test('foo', None, None, msg) - - def test_unpack_raise_error_if_untar_fail(self): - """ - Ensures that unpack() throws an Exception when untar_layers() - fails. - """ - msg = 'Caught untar failure' - side_effect = Exception(msg) - patch_method = 'virtBootstrap.utils.untar_layers' - self.unpack_raise_error_test('dir', patch_method, side_effect, msg) - - def test_unpack_raise_error_if_extract_in_qcow2_fail(self): - """ - Ensures that unpack() throws an Exception when - extract_layers_in_qcow2() fails. - """ - msg = 'Caught extract_layers_in_qcow2 failure' - side_effect = Exception(msg) - patch_method = 'virtBootstrap.utils.extract_layers_in_qcow2' - self.unpack_raise_error_test('qcow2', patch_method, side_effect, msg) - - def test_unpack_no_cache_clean_up(self): - """ - Ensures that unpack() removes the folder which stores tar archives - of image layers when no_cache is set to True. - """ - output_formats = ['dir', 'qcow2'] - patch_methods = [ - 'virtBootstrap.utils.untar_layers', - 'virtBootstrap.utils.extract_layers_in_qcow2' - ] - for fmt, patch_mthd in zip(output_formats, patch_methods): - m_self = self._mock_docker_source() - m_self.no_cache = True - with mock.patch('shutil.rmtree') as m_shutil: - self._unpack_test_fmt(fmt, patch_mthd, m_self=m_self) - m_shutil.assert_called_once_with(m_self.images_dir) - - def test_unpack_no_cache_clean_up_on_failure(self): - """ - Ensures that unpack() removes the folder which stores tar archives - of image layers when no_cache is set to True and exception was - raised. - """ - m_self = self._mock_docker_source() - m_self.no_cache = True - with self.assertRaises(Exception): - with mock.patch('shutil.rmtree') as m_rmtree: - self._unpack_test_fmt('foo', None, m_self=m_self) - m_rmtree.assert_called_once_with(m_self.images_dir) diff --git a/tests/test_file_source.py b/tests/test_file_source.py deleted file mode 100644 index 6e89aa2..0000000 --- a/tests/test_file_source.py +++ /dev/null @@ -1,171 +0,0 @@ -# Authors: Radostin Stoyanov <rstoyanov1@xxxxxxxxx> -# -# Copyright (C) 2017 Radostin Stoyanov -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. - -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -""" -Unit tests for methods defined in virtBootstrap.sources.FileSource -""" - -from tests import unittest -from tests import mock -from tests import sources - - -# pylint: disable=invalid-name -class TestFileSource(unittest.TestCase): - """ - Test cases for FileSource - """ - - ################################### - # Tests for: __init__() - ################################### - def test_argument_assignment(self): - """ - Ensures that __init__() assigns the arguments' values to instance - variables. - """ - kwargs = {'uri': mock.Mock(), - 'fmt': 'dir', - 'progress': mock.Mock()} - - src_instance = sources.FileSource(**kwargs) - - test_values = { - src_instance.path: kwargs['uri'].path, - src_instance.output_format: kwargs['fmt'], - src_instance.progress: kwargs['progress'].update_progress - } - for value in test_values: - self.assertIs(value, test_values[value]) - - ################################### - # Tests for: unpack() - ################################### - def test_unpack_invalid_source_raise_exception(self): - """ - Ensures that unpack() throws an Exception when called with - invalid file source. - """ - m_self = mock.Mock(spec=sources.FileSource) - m_self.path = 'foo' - with mock.patch('os.path.isfile') as m_isfile: - m_isfile.return_value = False - with self.assertRaises(Exception) as err: - sources.FileSource.unpack(m_self, 'bar') - self.assertIn('Invalid file source', str(err.exception)) - - def test_unpack_to_dir(self): - """ - Ensures that unpack() calls safe_untar() when the output format - is set to 'dir'. - """ - m_self = mock.Mock(spec=sources.FileSource) - m_self.progress = mock.Mock() - m_self.path = 'foo' - m_self.output_format = 'dir' - dest = 'bar' - - with mock.patch('os.path.isfile') as m_isfile: - m_isfile.return_value = True - with mock.patch('virtBootstrap.utils.safe_untar') as m_untar: - sources.FileSource.unpack(m_self, dest) - - m_untar.assert_called_once_with(m_self.path, dest) - - def test_unpack_to_qcow2(self): - """ - Ensures that unpack() calls create_qcow2() when the output - format is set to 'qcow2'. - """ - m_self = mock.Mock(spec=sources.FileSource) - m_self.progress = mock.Mock() - m_self.path = 'foo' - m_self.output_format = 'qcow2' - dest = 'bar' - qcow2_file_path = 'foobar' - - with mock.patch.multiple('os.path', - isfile=mock.DEFAULT, - realpath=mock.DEFAULT) as mocked: - - mocked['isfile'].return_value = True - mocked['realpath'].return_value = qcow2_file_path - with mock.patch('virtBootstrap.utils.create_qcow2') as m_qcow2: - sources.FileSource.unpack(m_self, dest) - - m_qcow2.assert_called_once_with(m_self.path, qcow2_file_path) - - def _unpack_raise_error_test(self, - output_format, - side_effect=None, - patch_method=None, - msg=None): - """ - This method is gather common test pattern used in the following - three test cases. - """ - m_self = mock.Mock(spec=sources.FileSource) - m_self.progress = mock.Mock() - m_self.path = 'foo' - m_self.output_format = output_format - dest = 'bar' - - with mock.patch.multiple('os.path', - isfile=mock.DEFAULT, - realpath=mock.DEFAULT) as m_path: - m_path['isfile'].return_value = True - with self.assertRaises(Exception) as err: - if patch_method: - with mock.patch(patch_method) as mocked_method: - mocked_method.side_effect = side_effect - sources.FileSource.unpack(m_self, dest) - else: - sources.FileSource.unpack(m_self, dest) - if msg: - self.assertEqual(msg, str(err.exception)) - - def test_unpack_invalid_format_raise_exception(self): - """ - Ensures that unpack() throws an Exception when called with - invalid output format. - """ - self._unpack_raise_error_test('foo', msg='Unknown format:foo') - - def test_unpack_raise_error_if_untar_fail(self): - """ - Ensures that unpack() throws an Exception when safe_untar() - fails. - """ - msg = 'Caught untar failure' - patch_method = 'virtBootstrap.utils.safe_untar' - self._unpack_raise_error_test(output_format='dir', - side_effect=Exception(msg), - patch_method=patch_method, - msg=msg) - - def test_unpack_raise_error_if_extract_in_qcow2_fail(self): - """ - Ensures that unpack() throws an Exception when create_qcow2() - fails. - """ - msg = 'Caught extract_layers_in_qcow2 failure' - patch_method = 'virtBootstrap.utils.create_qcow2' - self._unpack_raise_error_test(output_format='qcow2', - side_effect=Exception(msg), - patch_method=patch_method, - msg=msg) diff --git a/tests/test_progress.py b/tests/test_progress.py deleted file mode 100644 index 1f609d5..0000000 --- a/tests/test_progress.py +++ /dev/null @@ -1,112 +0,0 @@ -# Authors: Radostin Stoyanov <rstoyanov1@xxxxxxxxx> -# -# Copyright (C) 2017 Radostin Stoyanov -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. - -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -""" -Unit tests for methods defined in virtBootstrap.progress -""" - -from tests import unittest -from tests import mock -from tests import progress - - -# pylint: disable=invalid-name -class TestFileSource(unittest.TestCase): - """ - Test cases for Progress module - """ - - ################################### - # Tests for: __init__() - ################################### - def test_progress_init(self): - """ - Ensures that __init__() assigns the collback value to instance - variable and creates dictionary with 'status', 'value' keys. - """ - callback = mock.Mock() - test_instance = progress.Progress(callback) - for key in ['status', 'value']: - self.assertIn(key, test_instance.progress) - self.assertIs(callback, test_instance.callback) - - ################################### - # Tests for: get_progress() - ################################### - def test_get_progress(self): - """ - Ensures that get_progress() returns copy of the progress dictionary - which has the same keys and values. - """ - test_instance = progress.Progress() - test_result = test_instance.get_progress() - self.assertIsNot(test_instance.progress, test_result) - self.assertDictEqual(test_instance.progress, test_result) - - ################################### - # Tests for: update_progress() - ################################### - def test_update_progress_creates_log_record(self): - """ - Ensures that update_progress() creates log record with info level - and pass the status value as message. - """ - test_instance = progress.Progress() - logger = mock.Mock() - status = "Test" - test_instance.update_progress(status=status, logger=logger) - logger.info.assert_called_once_with(status) - - def test_update_progress_update_status_and_value(self): - """ - Ensures that update_progress() creates log record with info level - and pass the status value as message. - """ - test_instance = progress.Progress() - test_instance.progress = {'status': '', 'value': 0} - new_status = 'Test' - new_value = 100 - new_progress = {'status': new_status, 'value': new_value} - test_instance.update_progress(status=new_status, value=new_value) - self.assertDictEqual(test_instance.progress, new_progress) - - def test_update_progress_update_raise_logger_error(self): - """ - Ensures that update_progress() raise ValueError when creating - log record has failed. - """ - msg = 'test' - test_instance = progress.Progress() - logger = mock.Mock() - logger.info.side_effect = Exception(msg) - with self.assertRaises(ValueError) as err: - test_instance.update_progress(logger=logger) - self.assertIn(msg, str(err.exception)) - - def test_update_progress_update_raise_callback_error(self): - """ - Ensures that update_progress() raise ValueError when calling - callback failed. - """ - msg = 'test' - callback = mock.Mock() - callback.side_effect = Exception(msg) - test_instance = progress.Progress(callback) - with self.assertRaises(ValueError) as err: - test_instance.update_progress('foo', 'bar') - self.assertIn(msg, str(err.exception)) diff --git a/tests/test_utils.py b/tests/test_utils.py index 0b6ccc0..c2f55b5 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- # Authors: Radostin Stoyanov <rstoyanov1@xxxxxxxxx> # # Copyright (C) 2017 Radostin Stoyanov @@ -19,126 +20,16 @@ """ Unit tests for functions defined in virtBootstrap.utils """ - -from tests import unittest -from tests import mock -from tests import utils -try: - # pylint: disable=redefined-builtin - from importlib import reload -except ImportError: - pass +import unittest +from . import utils # pylint: disable=invalid-name -# pylint: disable=too-many-public-methods class TestUtils(unittest.TestCase): """ Ensures that functions defined in the utils module of virtBootstrap work as expected. """ - - ################################### - # Tests for: checksum() - ################################### - def test_utils_checksum_return_false_on_invalid_hash(self): - """ - Ensures that checksum() returns False if the actual and expected - hash sum of file are not equal. - """ - with mock.patch.multiple(utils, - open=mock.DEFAULT, - logger=mock.DEFAULT, - hashlib=mock.DEFAULT) as mocked: - path, sum_type, sum_expected = '/foo', 'sha256', 'bar' - mocked['hashlib'].sha256.hexdigest.return_value = False - self.assertFalse(utils.checksum(path, sum_type, sum_expected)) - - def test_utils_checksum_return_false_if_file_could_not_be_opened(self): - """ - Ensures that checksum() returns False if the file to be checked - cannot be open for read. - """ - with mock.patch.multiple(utils, - open=mock.DEFAULT, - logger=mock.DEFAULT, - hashlib=mock.DEFAULT) as mocked: - mocked['open'].side_effect = IOError() - self.assertFalse(utils.checksum('foo', 'sha256', 'bar')) - - def test_utils_checksum_return_true_on_valid_hash(self): - """ - Ensures that checksum() returns True when the actual and expected - hash sum of file are equal. - """ - with mock.patch.multiple(utils, - open=mock.DEFAULT, - logger=mock.DEFAULT, - hashlib=mock.DEFAULT) as mocked: - path, sum_type, sum_expected = '/foo', 'sha256', 'bar' - mocked['hashlib'].sha256.return_value.hexdigest.return_value \ - = sum_expected - self.assertTrue(utils.checksum(path, sum_type, sum_expected)) - - ################################### - # Tests for: execute() - ################################### - def test_utils_execute_logging_on_successful_proc_call(self): - """ - Ensures that execute() creates log record of cmd, stdout and stderr - when the exit code of process is 0. - """ - with mock.patch.multiple(utils, - logger=mock.DEFAULT, - subprocess=mock.DEFAULT) as mocked: - cmd = ['foo'] - output, err = 'test_out', 'test_err' - - mocked['subprocess'].Popen.return_value.returncode = 0 - (mocked['subprocess'].Popen.return_value - .communicate.return_value) = (output.encode(), err.encode()) - - utils.execute(cmd) - mocked['logger'].debug.assert_any_call("Call command:\n%s", cmd[0]) - mocked['logger'].debug.assert_any_call("Stdout:\n%s", output) - mocked['logger'].debug.assert_any_call("Stderr:\n%s", err) - - def test_utils_execute_raise_error_on_unsuccessful_proc_call(self): - """ - Ensures that execute() raise CalledProcessError exception when the - exit code of process is not 0. - """ - with mock.patch('virtBootstrap.utils.subprocess.Popen') as m_popen: - m_popen.return_value.returncode = 1 - m_popen.return_value.communicate.return_value = (b'output', b'err') - with self.assertRaises(utils.subprocess.CalledProcessError): - utils.execute(['foo']) - - ################################### - # Tests for: safe_untar() - ################################### - def test_utils_safe_untar_calls_execute(self): - """ - Ensures that safe_untar() calls execute with virt-sandbox - command to extract source files to destination folder. - Test for users with EUID 0 and 1000. - """ - with mock.patch('virtBootstrap.utils.os.geteuid') as m_geteuid: - for uid in [0, 1000]: - m_geteuid.return_value = uid - reload(utils) - with mock.patch('virtBootstrap.utils.execute') as m_execute: - src, dest = 'foo', 'bar' - utils.safe_untar('foo', 'bar') - cmd = ['virt-sandbox', - '-c', utils.LIBVIRT_CONN, - '-m', 'host-bind:/mnt=' + dest, - '--', - '/bin/tar', 'xf', src, - '-C', '/mnt', - '--exclude', 'dev/*'] - m_execute.assert_called_once_with(cmd) - ################################### # Tests for: bytes_to_size() ################################### @@ -172,241 +63,6 @@ class TestUtils(unittest.TestCase): i += 1 ################################### - # Tests for: log_layer_extract() - ################################### - def test_utils_log_layer_extract(self): - """ - Ensures that log_layer_extract() updates the progress and creates - log record with debug level. - """ - m_progress = mock.Mock() - layer = ['sum_type', 'sum_value', 'layer_file', 'layer_size'] - with mock.patch.multiple(utils, logger=mock.DEFAULT, - bytes_to_size=mock.DEFAULT) as mocked: - utils.log_layer_extract(layer, 'foo', 'bar', m_progress) - mocked['bytes_to_size'].assert_called_once_with('layer_size') - mocked['logger'].debug.assert_called_once() - m_progress.assert_called_once() - - ################################### - # Tests for: get_mime_type() - ################################### - @mock.patch('virtBootstrap.utils.subprocess.Popen') - def test_utils_get_mime_type(self, m_popen): - """ - Ensures that get_mime_type() returns the detected MIME type - of /usr/bin/file. - """ - path = "foo" - mime = "application/x-gzip" - stdout = ('%s: %s' % (path, mime)).encode() - m_popen.return_value.stdout.read.return_value = stdout - self.assertEqual(utils.get_mime_type(path), mime) - m_popen.assert_called_once_with( - ["/usr/bin/file", "--mime-type", path], - stdout=utils.subprocess.PIPE - ) - - ################################### - # Tests for: untar_layers() - ################################### - def test_utils_untar_all_layers_in_order(self): - """ - Ensures that untar_layers() iterates through all passed layers - in order. - """ - layers = ['l1', 'l2', 'l3'] - layers_list = [['', '', layer] for layer in layers] - dest_dir = '/foo' - expected_calls = [mock.call(layer, dest_dir) for layer in layers] - with mock.patch.multiple(utils, - safe_untar=mock.DEFAULT, - log_layer_extract=mock.DEFAULT) as mocked: - utils.untar_layers(layers_list, dest_dir, mock.Mock()) - mocked['safe_untar'].assert_has_calls(expected_calls) - - ################################### - # Tests for: create_qcow2() - ################################### - def _apply_test_to_create_qcow2(self, expected_calls, *args): - """ - This method contains common test pattern used in the next two - test cases. - """ - with mock.patch.multiple(utils, - execute=mock.DEFAULT, - logger=mock.DEFAULT, - get_mime_type=mock.DEFAULT) as mocked: - mocked['get_mime_type'].return_value = 'application/x-gzip' - utils.create_qcow2(*args) - mocked['execute'].assert_has_calls(expected_calls) - - def test_utils_create_qcow2_base_layer(self): - """ - Ensures that create_qcow2() creates base layer when - backing_file = None. - """ - tar_file = 'foo' - layer_file = 'bar' - size = '5G' - backing_file = None - - expected_calls = [ - mock.call(["qemu-img", "create", "-f", "qcow2", layer_file, size]), - - mock.call(['virt-format', - '--format=qcow2', - '--partition=none', - '--filesystem=ext3', - '-a', layer_file]), - - mock.call(['guestfish', - '-a', layer_file, - '-m', '/dev/sda', - 'tar-in', tar_file, '/', 'compress:gzip']) - ] - - self._apply_test_to_create_qcow2(expected_calls, tar_file, layer_file, - backing_file, size) - - def test_utils_create_qcow2_layer_with_backing_chain(self): - """ - Ensures that create_qcow2() creates new layer with backing chains - when backing_file is specified. - """ - tar_file = 'foo' - layer_file = 'bar' - backing_file = 'base' - size = '5G' - - expected_calls = [ - mock.call(['qemu-img', 'create', - '-b', backing_file, - '-f', 'qcow2', - layer_file, size]), - - mock.call(['guestfish', - '-a', layer_file, - '-m', '/dev/sda', - 'tar-in', tar_file, '/', 'compress:gzip']) - ] - - self._apply_test_to_create_qcow2(expected_calls, tar_file, layer_file, - backing_file, size) - - ################################### - # Tests for: extract_layers_in_qcow2() - ################################### - def test_utils_if_all_layers_extracted_in_order_in_qcow2(self): - """ - Ensures that extract_layers_in_qcow2() iterates through all - layers in order. - """ - layers = ['l1', 'l2', 'l3'] - layers_list = [['', '', layer] for layer in layers] - dest_dir = '/foo' - - # Generate expected calls - expected_calls = [] - qcow2_backing_file = None - for index, layer in enumerate(layers): - qcow2_layer_file = dest_dir + "/layer-%s.qcow2" % index - expected_calls.append( - mock.call(layer, qcow2_layer_file, qcow2_backing_file)) - qcow2_backing_file = qcow2_layer_file - - # Mocking out and execute - with mock.patch.multiple(utils, - create_qcow2=mock.DEFAULT, - log_layer_extract=mock.DEFAULT) as mocked: - utils.extract_layers_in_qcow2(layers_list, dest_dir, mock.Mock()) - - # Check actual calls - mocked['create_qcow2'].assert_has_calls(expected_calls) - - ################################### - # Tests for: get_image_dir() - ################################### - def test_utils_getimage_dir(self): - """ - Ensures that get_image_dir() returns path to DEFAULT_IMG_DIR - if the no_cache argument is set to False and create it if - does not exist. - """ - # Perform this test for UID 0 and 1000 - for uid in [0, 1000]: - with mock.patch('os.geteuid') as m_geteuid: - m_geteuid.return_value = uid - reload(utils) - with mock.patch('os.makedirs') as m_makedirs: - with mock.patch('os.path.exists') as m_path_exists: - m_path_exists.return_value = False - self.assertEqual(utils.get_image_dir(False), - utils.DEFAULT_IMG_DIR) - m_makedirs.assert_called_once_with(utils.DEFAULT_IMG_DIR) - - @mock.patch('tempfile.mkdtemp') - def test_utils_getimage_dir_no_cache(self, m_mkdtemp): - """ - Ensures that get_image_dir() returns temporary file path created - by tempfile.mkdtemp. - """ - m_mkdtemp.return_value = 'foo' - self.assertEqual(utils.get_image_dir(True), 'foo') - m_mkdtemp.assert_called_once() - - ################################### - # Tests for: get_image_details() - ################################### - @mock.patch('virtBootstrap.utils.subprocess.Popen') - def test_utils_get_image_details_raise_error_on_fail(self, m_popen): - """ - Ensures that get_image_details() throws ValueError exception - when stderr from skopeo is provided. - """ - src = 'docker://foo' - m_popen.return_value.communicate.return_value = [b'', b'Error'] - with self.assertRaises(ValueError): - utils.get_image_details(src) - - @mock.patch('virtBootstrap.utils.subprocess.Popen') - def test_utils_get_image_details_return_json_obj_on_success(self, m_popen): - """ - Ensures that get_image_details() returns python dictionary which - represents the data provided from stdout of skopeo when stderr - is not present. - """ - src = 'docker://foo' - json_dict = {'foo': 'bar'} - stdout = utils.json.dumps(json_dict).encode() - m_popen.return_value.communicate.return_value = [stdout, ''] - self.assertDictEqual(utils.get_image_details(src), json_dict) - - def test_utils_get_image_details_all_argument_passed(self): - """ - Ensures that get_image_details() pass all argument values to - skopeo inspect. - """ - src = 'docker://foo' - raw, insecure = True, True - username, password = 'user', 'password' - cmd = ['skopeo', 'inspect', src, - '--raw', - '--tls-verify=false', - "--creds=%s:%s" % (username, password)] - - with mock.patch.multiple(utils.subprocess, - Popen=mock.DEFAULT, - PIPE=mock.DEFAULT) as mocked: - mocked['Popen'].return_value.communicate.return_value = [b'{}', - b''] - utils.get_image_details(src, raw, insecure, username, password) - - mocked['Popen'].assert_called_once_with(cmd, - stdout=mocked['PIPE'], - stderr=mocked['PIPE']) - - ################################### # Tests for: is_new_layer_message() ################################### def test_utils_is_new_layer_message(self): @@ -459,10 +115,11 @@ class TestUtils(unittest.TestCase): Ensures that make_async() sets O_NONBLOCK flag on PIPE. """ - pipe = utils.subprocess.Popen( + proc = utils.subprocess.Popen( ["echo"], stdout=utils.subprocess.PIPE - ).stdout + ) + pipe = proc.stdout fd = pipe.fileno() F_GETFL = utils.fcntl.F_GETFL @@ -471,36 +128,8 @@ class TestUtils(unittest.TestCase): self.assertFalse(utils.fcntl.fcntl(fd, F_GETFL) & O_NONBLOCK) utils.make_async(fd) self.assertTrue(utils.fcntl.fcntl(fd, F_GETFL) & O_NONBLOCK) - - ################################### - # Tests for: read_async() - ################################### - def test_utils_read_async_successful_read(self): - """ - Ensures that read_async() calls read() of passed file descriptor. - """ - m_fd = mock.MagicMock() - utils.read_async(m_fd) - m_fd.read.assert_called_once() - - def test_utils_read_async_return_empty_str_on_EAGAIN_error(self): - """ - Ensures that read_async() ignores EAGAIN errors and returns - empty string. - """ - m_fd = mock.MagicMock() - m_fd.read.side_effect = IOError(utils.errno.EAGAIN, '') - self.assertEqual(utils.read_async(m_fd), '') - - def test_utils_read_async_raise_errors(self): - """ - Ensures that read_async() does not ignore IOError which is different - than EAGAIN and throws an exception. - """ - m_fd = mock.MagicMock() - m_fd.read.side_effect = IOError() - with self.assertRaises(IOError): - utils.read_async(m_fd) + proc.wait() + pipe.close() ################################### # Tests for: str2float() @@ -512,196 +141,3 @@ class TestUtils(unittest.TestCase): test_values = {'1': 1.0, 'test': None, '0': 0.0, '1.25': 1.25} for test in test_values: self.assertEqual(utils.str2float(test), test_values[test]) - - ################################### - # Tests for: set_root_password_in_rootfs() - ################################### - def test_utils_set_root_password_in_rootfs_restore_permissions(self): - """ - Ensures that set_root_password_in_rootfs() restore shadow - file permissions after edit. - """ - permissions = 700 - rootfs_path = '/foo' - shadow_file = '%s/etc/shadow' % rootfs_path - - m_open = mock.mock_open(read_data='') - with mock.patch('virtBootstrap.utils.open', m_open, create=True): - with mock.patch('virtBootstrap.utils.os') as m_os: - m_os.stat.return_value = [permissions] - m_os.path.join.return_value = shadow_file - utils.set_root_password_in_rootfs(rootfs_path, 'password') - - expected_calls = [ - mock.call.path.join(rootfs_path, 'etc/shadow'), - mock.call.stat(shadow_file), - mock.call.chmod(shadow_file, 438), - mock.call.chmod(shadow_file, permissions) - ] - m_os.assert_has_calls(expected_calls) - - def test_utils_set_root_password_in_rootfs_restore_permissions_fail(self): - """ - Ensures that set_root_password_in_rootfs() restore shadow file - permissions in case of failure. - """ - permissions = 700 - rootfs_path = '/foo' - shadow_file = '%s/etc/shadow' % rootfs_path - - m_open = mock.mock_open(read_data='') - with mock.patch('virtBootstrap.utils.open', m_open, create=True): - with mock.patch('virtBootstrap.utils.os') as m_os: - m_os.stat.return_value = [permissions] - m_os.path.join.return_value = shadow_file - - with self.assertRaises(Exception): - m_open.side_effect = Exception - utils.set_root_password_in_rootfs(rootfs_path, 'password') - - expected_calls = [ - mock.call.path.join(rootfs_path, 'etc/shadow'), - mock.call.stat(shadow_file), - mock.call.chmod(shadow_file, 438), - mock.call.chmod(shadow_file, permissions) - ] - m_os.assert_has_calls(expected_calls) - - def test_utils_set_root_password_in_rootfs_store_hash(self): - """ - Ensures that set_root_password_in_rootfs() stores the hashed - root password in shadow file. - """ - rootfs_path = '/foo' - password = 'secret' - initial_value = '!locked' - hashed_password = 'hashed_password' - shadow_content = '\n'.join([ - "root:%s::0:99999:7:::", - "bin:*:17004:0:99999:7:::" - "daemon:*:17004:0:99999:7:::", - "adm:*:17004:0:99999:7:::" - ]) - - m_open = mock.mock_open(read_data=shadow_content % initial_value) - with mock.patch('virtBootstrap.utils.open', m_open, create=True): - with mock.patch('virtBootstrap.utils.os'): - with mock.patch('passlib.hosts.linux_context.hash') as m_hash: - m_hash.return_value = hashed_password - utils.set_root_password_in_rootfs(rootfs_path, password) - - m_hash.assert_called_once_with(password) - m_open().write.assert_called_once_with(shadow_content - % hashed_password) - - ################################### - # Tests for: set_root_password_in_image() - ################################### - @mock.patch('virtBootstrap.utils.execute') - def test_utils_set_root_password_in_image(self, m_execute): - """ - Ensures that set_root_password_in_image() calls virt-edit - with correct arguments. - """ - image, password = 'foo', 'password' - password_hash = ('$6$rounds=656000$PaQ/H4c/k8Ix9YOM$' - 'cyD47r9PtAE2LhnkpdbVzsiQbM0/h2S/1Bv' - 'u/sXqUtCg.3Ijp7TQy/8tEVstxMy5k5v4mh' - 'CGFqnVv7S6wd.Ah/') - - expected_call = [ - 'virt-edit', - '-a', image, '/etc/shadow', - '-e', 's,^root:.*?:,root:%s:,' % utils.re.escape(password_hash)] - - hash_function = 'virtBootstrap.utils.passlib.hosts.linux_context.hash' - with mock.patch(hash_function) as m_hash: - m_hash.return_value = password_hash - utils.set_root_password_in_image(image, password) - - m_execute.assert_called_once_with(expected_call) - - ################################### - # Tests for: set_root_password() - ################################### - @mock.patch('virtBootstrap.utils.set_root_password_in_rootfs') - def test_utils_set_root_password_dir(self, m_set_root_password_in_rootfs): - """ - Ensures that set_root_password() calls set_root_password_in_rootfs() - when the format is set to "dir". - """ - fmt, dest, root_password = 'dir', 'dest', 'root_password' - utils.set_root_password(fmt, dest, root_password) - - m_set_root_password_in_rootfs.assert_called_once_with( - dest, root_password - ) - - @mock.patch('virtBootstrap.utils.set_root_password_in_image') - def test_utils_set_root_password_qcow2(self, m_set_root_password_in_image): - """ - Ensures that set_root_password() calls set_root_password_in_image() - when the format is set to "qcow2" with the path to the last - extracted layer. - """ - fmt, dest, root_password = 'qcow2', 'dest', 'root_password' - layers = ['layer-0.qcow2', 'layer-1.qcow2'] - - with mock.patch('os.listdir') as m_listdir: - m_listdir.return_value = layers - utils.set_root_password(fmt, dest, root_password) - - m_set_root_password_in_image.assert_called_once_with( - utils.os.path.join(dest, max(layers)), - root_password - ) - - ################################### - # Tests for: write_progress() - ################################### - def test_utils_write_progress_fill_terminal_width(self): - """ - Ensures that write_progress() outputs a message with length - equal to terminal width and last symbol '\r'. - """ - terminal_width = 120 - prog = {'status': 'status', 'value': 0} - with mock.patch.multiple(utils, - subprocess=mock.DEFAULT, - sys=mock.DEFAULT) as mocked: - - (mocked['subprocess'].Popen.return_value.stdout - .read.return_value) = ("20 %s" % terminal_width).encode() - - utils.write_progress(prog) - - mocked['subprocess'].Popen.assert_called_once_with( - ["stty", "size"], - stdout=mocked['subprocess'].PIPE - ) - output_message = mocked['sys'].stdout.write.call_args[0][0] - mocked['sys'].stdout.write.assert_called_once() - self.assertEqual(len(output_message), terminal_width + 1) - self.assertEqual(output_message[-1], '\r') - - def test_utils_write_progress_use_default_term_width_on_failure(self): - """ - Ensures that write_progress() outputs a message with length equal - to default terminal width (80) when the detecting terminal width - has failed. - """ - default_terminal_width = 80 - prog = {'status': 'status', 'value': 0} - with mock.patch.multiple(utils, - subprocess=mock.DEFAULT, - sys=mock.DEFAULT) as mocked: - mocked['subprocess'].Popen.side_effect = Exception() - utils.write_progress(prog) - - self.assertEqual(len(mocked['sys'].stdout.write.call_args[0][0]), - default_terminal_width + 1) - mocked['sys'].stdout.write.assert_called_once() - - -if __name__ == '__main__': - unittest.main(exit=False) diff --git a/tests/test_virt_bootstrap.py b/tests/test_virt_bootstrap.py deleted file mode 100644 index ff744f7..0000000 --- a/tests/test_virt_bootstrap.py +++ /dev/null @@ -1,464 +0,0 @@ -# Authors: Radostin Stoyanov <rstoyanov1@xxxxxxxxx> -# -# Copyright (C) 2017 Radostin Stoyanov -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. - -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -""" -Unit tests for functions defined in virtBootstrap.virt-bootstrap -""" - -from tests import unittest -from tests import mock -from tests import virt_bootstrap -from tests import sources - - -# pylint: disable=invalid-name -class TestVirtBootstrap(unittest.TestCase): - """ - Test cases for virt_bootstrap module - """ - - ################################### - # Tests for: get_source(source_type) - ################################### - def test_get_invaid_source_type_should_fail(self): - """ - Ensures that get_source() throws an Exception when invalid source - name was specified. - """ - with self.assertRaises(Exception) as source: - virt_bootstrap.get_source('invalid') - self.assertIn('invalid', str(source.exception)) - - def test_get_docker_source(self): - """ - Ensures that get_source() returns DockerSource when source name - "docker" is requested. - """ - self.assertIs(virt_bootstrap.get_source('docker'), - sources.DockerSource) - - def test_get_file_source(self): - """ - Ensures that get_source() returns FileSource when source name - "file" is requested. - """ - self.assertIs(virt_bootstrap.get_source('file'), - sources.FileSource) - - ################################### - # Tests for: mapping_uid_gid() - ################################### - def test_mapping_uid_gid(self): - """ - Ensures that mapping_uid_gid() calls map_id() with - correct parameters. - """ - dest = '/path' - calls = [ - { # Call 1 - 'dest': dest, - 'uid': [[0, 1000, 10]], - 'gid': [[0, 1000, 10]] - }, - { # Call 2 - 'dest': dest, - 'uid': [], - 'gid': [[0, 1000, 10]] - }, - { # Call 3 - 'dest': dest, - 'uid': [[0, 1000, 10]], - 'gid': [] - }, - { # Call 4 - 'dest': dest, - 'uid': [[0, 1000, 10], [500, 500, 10]], - 'gid': [[0, 1000, 10]] - } - ] - - expected_calls = [ - # Expected from call 1 - mock.call(dest, [0, 1000, 10], [0, 1000, 10]), - # Expected from call 2 - mock.call(dest, None, [0, 1000, 10]), - # Expected from call 3 - mock.call(dest, [0, 1000, 10], None), - # Expected from call 4 - mock.call(dest, [0, 1000, 10], [0, 1000, 10]), - mock.call(dest, [500, 500, 10], None) - - ] - with mock.patch('virtBootstrap.virt_bootstrap.map_id') as m_map_id: - for args in calls: - virt_bootstrap.mapping_uid_gid(args['dest'], - args['uid'], - args['gid']) - - m_map_id.assert_has_calls(expected_calls) - - ################################### - # Tests for: map_id() - ################################### - @mock.patch('os.path.realpath') - def test_map_id(self, m_realpath): - """ - Ensures that the UID/GID mapping applies to all files - and directories in root file system. - """ - root_path = '/root' - files = ['foo1', 'foo2'] - m_realpath.return_value = root_path - - map_uid = [0, 1000, 10] - map_gid = [0, 1000, 10] - new_id = 'new_id' - - expected_calls = [ - mock.call('/root', new_id, new_id), - mock.call('/root/foo1', new_id, new_id), - mock.call('/root/foo2', new_id, new_id) - ] - - with mock.patch.multiple('os', - lchown=mock.DEFAULT, - lstat=mock.DEFAULT, - walk=mock.DEFAULT) as mocked: - - mocked['walk'].return_value = [(root_path, [], files)] - mocked['lstat']().st_uid = map_uid[0] - mocked['lstat']().st_gid = map_gid[0] - - get_map_id = 'virtBootstrap.virt_bootstrap.get_map_id' - with mock.patch(get_map_id) as m_get_map_id: - m_get_map_id.return_value = new_id - virt_bootstrap.map_id(root_path, map_uid, map_gid) - - mocked['lchown'].assert_has_calls(expected_calls) - - ################################### - # Tests for: get_mapping_opts() - ################################### - def test_get_mapping_opts(self): - """ - Ensures that get_mapping_opts() returns correct options for - mapping value. - """ - test_values = [ - { - 'mapping': [0, 1000, 10], - 'expected_result': {'first': 0, 'last': 10, 'offset': 1000}, - }, - { - 'mapping': [0, 1000, 10], - 'expected_result': {'first': 0, 'last': 10, 'offset': 1000}, - }, - { - 'mapping': [500, 1500, 1], - 'expected_result': {'first': 500, 'last': 501, 'offset': 1000}, - }, - { - 'mapping': [-1, -1, -1], - 'expected_result': {'first': 0, 'last': 1, 'offset': 0}, - } - ] - - for test in test_values: - res = virt_bootstrap.get_mapping_opts(test['mapping']) - self.assertEqual(test['expected_result'], res) - - ################################### - # Tests for: get_map_id() - ################################### - def test_get_map_id(self): - """ - Ensures that get_map_id() returns correct UID/GID mapping value. - """ - test_values = [ - { - 'old_id': 0, - 'mapping': [0, 1000, 10], - 'expected_result': 1000 - }, - { - 'old_id': 5, - 'mapping': [0, 500, 10], - 'expected_result': 505 - }, - { - 'old_id': 10, - 'mapping': [0, 100, 10], - 'expected_result': -1 - }, - ] - for test in test_values: - opts = virt_bootstrap.get_mapping_opts(test['mapping']) - res = virt_bootstrap.get_map_id(test['old_id'], opts) - self.assertEqual(test['expected_result'], res) - - ################################### - # Tests for: parse_idmap() - ################################### - def test_parse_idmap(self): - """ - Ensures that parse_idmap() returns correct UID/GID mapping value. - """ - test_values = [ - { - 'mapping': ['0:1000:10', '0:100:10'], - 'expected_result': [[0, 1000, 10], [0, 100, 10]], - }, - { - 'mapping': ['0:1000:10'], - 'expected_result': [[0, 1000, 10]], - }, - { - 'mapping': ['500:1500:1'], - 'expected_result': [[500, 1500, 1]], - }, - { - 'mapping': ['-1:-1:-1'], - 'expected_result': [[-1, -1, -1]], - }, - { - 'mapping': [], - 'expected_result': None, - } - ] - for test in test_values: - res = virt_bootstrap.parse_idmap(test['mapping']) - self.assertEqual(test['expected_result'], res) - - def test_parse_idmap_raise_exception_on_invalid_mapping_value(self): - """ - Ensures that parse_idmap() raise ValueError on mapping value. - """ - with self.assertRaises(ValueError): - virt_bootstrap.parse_idmap(['invalid']) - - ################################### - # Tests for: bootstrap() - ################################### - def test_bootsrap_creates_directory_if_does_not_exist(self): - """ - Ensures that bootstrap() creates destination directory if - it does not exists. - """ - src, dest = 'foo', 'bar' - with mock.patch.multiple(virt_bootstrap, - get_source=mock.DEFAULT, - os=mock.DEFAULT) as mocked: - mocked['os'].path.exists.return_value = False - virt_bootstrap.bootstrap(src, dest) - mocked['os'].path.exists.assert_called_once_with(dest) - mocked['os'].makedirs.assert_called_once_with(dest) - - def test_bootstrap_exit_if_dest_is_invalid(self): - """ - Ensures that bootstrap() exits with code 1 when the destination - path exists but it is not directory. - """ - src, dest = 'foo', 'bar' - with mock.patch.multiple(virt_bootstrap, - get_source=mock.DEFAULT, - os=mock.DEFAULT, - logger=mock.DEFAULT, - sys=mock.DEFAULT) as mocked: - mocked['os'].path.exists.return_value = True - mocked['os'].path.isdir.return_value = False - virt_bootstrap.bootstrap(src, dest) - mocked['os'].path.isdir.assert_called_once_with(dest) - mocked['sys'].exit.assert_called_once_with(1) - - def test_bootsrap_exit_if_no_write_access_on_dest(self): - """ - Ensures that bootstrap() exits with code 1 when the current user - has not write permissions on the destination folder. - """ - src, dest = 'foo', 'bar' - with mock.patch.multiple(virt_bootstrap, - get_source=mock.DEFAULT, - os=mock.DEFAULT, - logger=mock.DEFAULT, - sys=mock.DEFAULT) as mocked: - mocked['os'].path.exists.return_value = True - mocked['os'].path.isdir.return_value = True - mocked['os'].access.return_value = False - virt_bootstrap.bootstrap(src, dest) - mocked['os'].access.assert_called_once_with(dest, - mocked['os'].W_OK) - mocked['sys'].exit.assert_called_once_with(1) - - def test_bootstrap_use_file_source_if_none_was_specified(self): - """ - Ensures that bootstrap() calls get_source() with argument - 'file' when source format is not specified. - """ - src, dest = 'foo', 'bar' - with mock.patch.multiple(virt_bootstrap, - get_source=mock.DEFAULT, - os=mock.DEFAULT, - sys=mock.DEFAULT) as mocked: - virt_bootstrap.bootstrap(src, dest) - mocked['get_source'].assert_called_once_with('file') - - def test_bootstrap_successful_call(self): - """ - Ensures that bootstrap() creates source instance and calls the - unpack method with destination path as argument. - """ - src, dest = 'foo', 'bar' - with mock.patch.multiple(virt_bootstrap, - get_source=mock.DEFAULT, - os=mock.DEFAULT, - sys=mock.DEFAULT) as mocked: - mocked['os'].path.exists.return_value = True - mocked['os'].path.isdir.return_value = True - mocked['os'].access.return_value = True - mocked_source = mock.Mock() - mocked_unpack = mock.Mock() - mocked_source.return_value.unpack = mocked_unpack - mocked['get_source'].return_value = mocked_source - virt_bootstrap.bootstrap(src, dest) - # sys.exit should not be called - mocked['sys'].exit.assert_not_called() - mocked_source.assert_called_once() - mocked_unpack.assert_called_once_with(dest) - - def test_bootstrap_all_params_are_passed_to_source_instance(self): - """ - Ensures that bootstrap() is passing all arguments to newly created - source instance. - """ - params_list = ['dest', 'fmt', 'username', 'password', 'root_password', - 'not_secure', 'no_cache', 'progress_cb'] - params = {param: param for param in params_list} - - for kw_param in params_list: - params[kw_param] = kw_param - - with mock.patch.multiple(virt_bootstrap, - get_source=mock.DEFAULT, - os=mock.DEFAULT, - urlparse=mock.DEFAULT, - progress=mock.DEFAULT, - utils=mock.DEFAULT, - sys=mock.DEFAULT) as mocked: - mocked['os'].path.exists.return_value = True - mocked['os'].path.isdir.return_value = True - mocked['os'].access.return_value = True - - mocked['progress'].Progress.return_value = params['progress_cb'] - - mocked_source = mock.Mock() - mocked_unpack = mock.Mock() - mocked_source.return_value.unpack = mocked_unpack - mocked['get_source'].return_value = mocked_source - - mocked_uri = mock.Mock() - mocked['urlparse'].return_value = mocked_uri - params['uri'] = mocked_uri - - virt_bootstrap.bootstrap(**params) - # sys.exit should not be called - mocked['sys'].exit.assert_not_called() - - mocked_source.assert_called_once() - mocked_unpack.assert_called_once_with(params['dest']) - - called_with_args, called_with_kwargs = mocked_source.call_args - self.assertEqual(called_with_args, ()) - - del params['dest'] - del params['root_password'] - params['progress'] = params.pop('progress_cb') - for kwarg in params: - self.assertEqual(called_with_kwargs[kwarg], params[kwarg]) - - def test_if_bootstrap_calls_set_root_password(self): - """ - Ensures that bootstrap() calls set_root_password() when the argument - root_password is specified. - """ - src, fmt, dest, root_password = 'foo', 'fmt', 'bar', 'root_password' - with mock.patch.multiple(virt_bootstrap, - get_source=mock.DEFAULT, - os=mock.DEFAULT, - utils=mock.DEFAULT, - sys=mock.DEFAULT) as mocked: - mocked['os'].path.exists.return_value = True - mocked['os'].path.isdir.return_value = True - mocked['os'].access.return_value = True - - virt_bootstrap.bootstrap(src, dest, - fmt=fmt, - root_password=root_password) - - mocked['utils'].set_root_password.assert_called_once_with( - fmt, dest, root_password) - - def test_if_bootstrap_calls_set_mapping_uid_gid(self): - """ - Ensures that bootstrap() calls mapping_uid_gid() when the argument - uid_map or gid_map is specified. - """ - src, dest, uid_map, gid_map = 'foo', 'bar', 'id', 'id' - expected_calls = [ - mock.call('bar', None, 'id'), - mock.call('bar', 'id', None), - mock.call('bar', 'id', 'id') - ] - - with mock.patch.multiple(virt_bootstrap, - get_source=mock.DEFAULT, - os=mock.DEFAULT, - mapping_uid_gid=mock.DEFAULT, - utils=mock.DEFAULT, - sys=mock.DEFAULT) as mocked: - mocked['os'].path.exists.return_value = True - mocked['os'].path.isdir.return_value = True - mocked['os'].access.return_value = True - - virt_bootstrap.bootstrap(src, dest, gid_map=gid_map) - virt_bootstrap.bootstrap(src, dest, uid_map=uid_map) - virt_bootstrap.bootstrap(src, dest, - uid_map=uid_map, gid_map=gid_map) - mocked['mapping_uid_gid'].assert_has_calls(expected_calls) - - ################################### - # Tests for: set_logging_conf() - ################################### - def test_if_logging_level_format_handler_are_set(self): - """ - Ensures that set_logging_conf() sets log level and adds new stream - handler with formatting. - """ - with mock.patch('virtBootstrap.virt_bootstrap.logging') as m_logging: - mocked_stream_hdlr = mock.Mock() - m_logger = mock.Mock() - m_logging.getLogger.return_value = m_logger - m_logging.StreamHandler.return_value = mocked_stream_hdlr - virt_bootstrap.set_logging_conf() - m_logging.getLogger.assert_called_once_with('virtBootstrap') - mocked_stream_hdlr.setFormatter.assert_called_once() - m_logger.addHandler.assert_called_once_with(mocked_stream_hdlr) - m_logger.setLevel.assert_called_once() - - -if __name__ == '__main__': - unittest.main(exit=False) -- 2.13.5 _______________________________________________ virt-tools-list mailing list virt-tools-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/virt-tools-list