On Sat, 2017-08-26 at 21:41 +0100, Radostin Stoyanov wrote: > Unit tests were used to ensure that functions and methods work as > expected. However, these tests are closely related to the > implementation and will result in major changes after some refactoring. > To reduce the amount of work needed to add new features or changes to > the code most of these tests will be replaced with more abstract form > of testing introduced in the following commits. > --- > tests/__init__.py | 44 ++-- > tests/docker_source.py | 150 +++++++++++ > tests/test_docker_source.py | 607 ------------------------------------------- > tests/test_file_source.py | 171 ------------ > tests/test_progress.py | 112 -------- > tests/test_utils.py | 580 +---------------------------------------- > tests/test_virt_bootstrap.py | 464 --------------------------------- > 7 files changed, 180 insertions(+), 1948 deletions(-) > create mode 100644 tests/docker_source.py > delete mode 100644 tests/test_docker_source.py > delete mode 100644 tests/test_file_source.py > delete mode 100644 tests/test_progress.py > delete mode 100644 tests/test_virt_bootstrap.py > > diff --git a/tests/__init__.py b/tests/__init__.py > index e82c6d5..1b06616 100644 > --- a/tests/__init__.py > +++ b/tests/__init__.py > @@ -1,22 +1,23 @@ > -""" > - Test suite for virt-bootstrap > - > - Authors: Radostin Stoyanov <rstoyanov1@xxxxxxxxx> > - > - Copyright (C) 2017 Radostin Stoyanov > +# -*- coding: utf-8 -*- > +# Authors: Radostin Stoyanov <rstoyanov1@xxxxxxxxx> > +# > +# Copyright (C) 2017 Radostin Stoyanov > +# > +# This program is free software: you can redistribute it and/or modify > +# it under the terms of the GNU General Public License as published by > +# the Free Software Foundation, either version 3 of the License, or > +# (at your option) any later version. > + > +# This program is distributed in the hope that it will be useful, > +# but WITHOUT ANY WARRANTY; without even the implied warranty of > +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > +# GNU General Public License for more details. > + > +# You should have received a copy of the GNU General Public License > +# along with this program. If not, see <http://www.gnu.org/licenses/>. > > - This program is free software: you can redistribute it and/or modify > - it under the terms of the GNU General Public License as published by > - the Free Software Foundation, either version 3 of the License, or > - (at your option) any later version. > - > - This program is distributed in the hope that it will be useful, > - but WITHOUT ANY WARRANTY; without even the implied warranty of > - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > - GNU General Public License for more details. > - > - You should have received a copy of the GNU General Public License > - along with this program. If not, see <http://www.gnu.org/licenses/>. > +""" > +Test suite for virt-bootstrap > """ > > import sys > @@ -27,13 +28,12 @@ try: > except ImportError: > import unittest.mock as mock > > -sys.path += '../src' # noqa: E402 > +sys.path.insert(0, '../src') # noqa: E402 > > -# pylint: disable=import-error > +# pylint: disable=import-error, wrong-import-position > from virtBootstrap import virt_bootstrap > from virtBootstrap import sources > from virtBootstrap import progress > from virtBootstrap import utils > > -__all__ = ['unittest', 'mock', > - 'virt_bootstrap', 'sources', 'progress', 'utils'] > +__all__ = ['virt_bootstrap', 'sources', 'progress', 'utils'] > diff --git a/tests/docker_source.py b/tests/docker_source.py > new file mode 100644 > index 0000000..60404e6 > --- /dev/null > +++ b/tests/docker_source.py > @@ -0,0 +1,150 @@ > +# -*- coding: utf-8 -*- > +# Authors: Radostin Stoyanov <rstoyanov1@xxxxxxxxx> > +# > +# Copyright (C) 2017 Radostin Stoyanov > +# > +# This program is free software: you can redistribute it and/or modify > +# it under the terms of the GNU General Public License as published by > +# the Free Software Foundation, either version 3 of the License, or > +# (at your option) any later version. > + > +# This program is distributed in the hope that it will be useful, > +# but WITHOUT ANY WARRANTY; without even the implied warranty of > +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > +# GNU General Public License for more details. > + > +# You should have received a copy of the GNU General Public License > +# along with this program. If not, see <http://www.gnu.org/licenses/>. > + > +""" > +Tests which aim is to exercise creation of root file system with DockerSource. > +""" > + > +import unittest > + > +from . import mock > +from . import sources > + > + > +# pylint: disable=invalid-name > +class TestDockerSource(unittest.TestCase): > + """ > + Unit tests for DockerSource > + """ > + ################################### > + # Tests for: retrieve_layers_info() > + ################################### > + def _mock_retrieve_layers_info(self, manifest, kwargs): > + """ > + This method is gather common test pattern used in the following > + two test cases which aim to return an instance of the class > + DockerSource with some util functions being mocked. > + """ > + with mock.patch.multiple('virtBootstrap.utils', > + get_image_details=mock.DEFAULT, > + get_image_dir=mock.DEFAULT) as m_utils: > + > + m_utils['get_image_details'].return_value = manifest > + m_utils['get_image_dir'].return_value = '/images_path' > + > + patch_method = 'virtBootstrap.sources.DockerSource.gen_valid_uri' > + with mock.patch(patch_method) as m_uri: > + src_instance = sources.DockerSource(**kwargs) > + return (src_instance, m_uri, m_utils) > + > + def test_retrieve_layers_info_pass_arguments_to_get_image_details(self): > + """ > + Ensures that retrieve_layers_info() calls get_image_details() > + with all passed arguments. > + """ > + src_kwargs = { > + 'uri': '', > + 'progress': mock.Mock() > + } > + > + manifest = {'schemaVersion': 2, 'layers': []} > + (src_instance, > + m_uri, m_utils) = self._mock_retrieve_layers_info(manifest, > + src_kwargs) > + > + kwargs = { > + 'insecure': src_instance.insecure, > + 'username': src_instance.username, > + 'password': src_instance.password, > + 'raw': True > + } > + m_utils['get_image_details'].assert_called_once_with(m_uri(), **kwargs) > + > + def test_retrieve_layers_info_schema_version_1(self): > + """ > + Ensures that retrieve_layers_info() extracts the layers' information > + from manifest with schema version 1 a list with format: > + ["digest", "sum_type", "file_path", "size"]. > + """ > + kwargs = { > + 'uri': '', > + 'progress': mock.Mock() > + } > + > + manifest = { > + 'schemaVersion': 1, > + 'fsLayers': [ > + {'blobSum': 'sha256:75c416ea'}, > + {'blobSum': 'sha256:c6ff40b6'}, > + {'blobSum': 'sha256:a7050fc1'} > + ] > + } > + > + expected_result = [ > + ['sha256', 'a7050fc1', '/images_path/a7050fc1.tar', None], > + ['sha256', 'c6ff40b6', '/images_path/c6ff40b6.tar', None], > + ['sha256', '75c416ea', '/images_path/75c416ea.tar', None] > + ] > + > + with mock.patch('os.path.getsize') as m_getsize: > + m_getsize.return_value = None > + src_instance = self._mock_retrieve_layers_info(manifest, kwargs)[0] > + self.assertEqual(src_instance.layers, expected_result) > + > + def test_retrieve_layers_info_schema_version_2(self): > + """ > + Ensures that retrieve_layers_info() extracts the layers' information > + from manifest with schema version 2 a list with format: > + ["digest", "sum_type", "file_path", "size"]. > + """ > + kwargs = { > + 'uri': '', > + 'progress': mock.Mock() > + } > + > + manifest = { > + 'schemaVersion': 2, > + "layers": [ > + {"size": 47103294, "digest": "sha256:75c416ea"}, > + {"size": 814, "digest": "sha256:c6ff40b6"}, > + {"size": 513, "digest": "sha256:a7050fc1"} > + ] > + } > + > + expected_result = [ > + ['sha256', '75c416ea', '/images_path/75c416ea.tar', 47103294], > + ['sha256', 'c6ff40b6', '/images_path/c6ff40b6.tar', 814], > + ['sha256', 'a7050fc1', '/images_path/a7050fc1.tar', 513] > + ] > + > + src_instance = self._mock_retrieve_layers_info(manifest, kwargs)[0] > + self.assertEqual(src_instance.layers, expected_result) > + > + def test_retrieve_layers_info_raise_error_on_invalid_schema_version(self): > + """ > + Ensures that retrieve_layers_info() calls get_image_details() > + with all passed arguments. > + """ > + kwargs = { > + 'uri': '', > + 'progress': mock.Mock() > + } > + > + manifest = {'schemaVersion': 3} > + with self.assertRaises(ValueError): > + self._mock_retrieve_layers_info(manifest, kwargs) > diff --git a/tests/test_docker_source.py b/tests/test_docker_source.py > deleted file mode 100644 > index 4859e1b..0000000 > --- a/tests/test_docker_source.py > +++ /dev/null > @@ -1,607 +0,0 @@ > -# Authors: Radostin Stoyanov <rstoyanov1@xxxxxxxxx> > -# > -# Copyright (C) 2017 Radostin Stoyanov > -# > -# This program is free software: you can redistribute it and/or modify > -# it under the terms of the GNU General Public License as published by > -# the Free Software Foundation, either version 3 of the License, or > -# (at your option) any later version. > - > -# This program is distributed in the hope that it will be useful, > -# but WITHOUT ANY WARRANTY; without even the implied warranty of > -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > -# GNU General Public License for more details. > - > -# You should have received a copy of the GNU General Public License > -# along with this program. If not, see <http://www.gnu.org/licenses/>. > - > - > -""" > -Unit tests for methods defined in virtBootstrap.sources.DockerSource > -""" > - > -from tests import unittest > -from tests import mock > -from tests import sources > - > -try: > - from urlparse import urlparse > -except ImportError: > - from urllib.parse import urlparse > - > - > -# pylint: disable=invalid-name > -# pylint: disable=too-many-public-methods > -class TestDockerSource(unittest.TestCase): > - """ > - Test cases for DockerSource > - """ > - def _mock_docker_source(self): > - """ > - This method returns an instance of Mock object > - that acts as the specification for the DockerSource. > - """ > - m_self = mock.Mock(spec=sources.DockerSource) > - m_self.progress = mock.Mock() > - m_self.no_cache = False > - m_self.url = "docker://test" > - m_self.images_dir = "/images_path" > - m_self.insecure = True > - m_self.username = 'user' > - m_self.password = 'password' > - m_self.layers = [ > - ['sha256', '75c416ea', '/images_path/75c416ea.tar', ''], > - ['sha256', 'a7050fc1', '/images_path/a7050fc1.tar', ''] > - ] > - return m_self > - > - ################################### > - # Tests for: __init__() > - ################################### > - def test_argument_assignment(self): > - """ > - Ensures that __init__() assigns the arguments' values to instance > - variables. > - """ > - kwargs = {'uri': '', > - 'fmt': 'dir', > - 'not_secure': False, > - 'no_cache': False, > - 'progress': mock.Mock(), > - 'username': 'username', > - 'password': 'password'} > - > - with mock.patch('virtBootstrap.utils' > - '.get_image_dir') as m_get_image_dir: > - with mock.patch.multiple('virtBootstrap.sources.DockerSource', > - retrieve_layers_info=mock.DEFAULT, > - gen_valid_uri=mock.DEFAULT) as mocked: > - src_instance = sources.DockerSource(**kwargs) > - > - test_values = { > - src_instance.url: mocked['gen_valid_uri'].return_value, > - src_instance.progress: kwargs['progress'].update_progress, > - src_instance.username: kwargs['username'], > - src_instance.password: kwargs['password'], > - src_instance.output_format: kwargs['fmt'], > - src_instance.no_cache: kwargs['no_cache'], > - src_instance.insecure: kwargs['not_secure'], > - src_instance.images_dir: m_get_image_dir() > - } > - for value in test_values: > - self.assertIs(value, test_values[value]) > - > - def test_source_password_is_required_if_username_specifed(self): > - """ > - Ensures that __init__() calls getpass() to request password > - when username is specified and password is not. > - """ > - test_password = 'secret' > - > - kwargs = {arg: '' for arg > - in ['uri', 'fmt', 'not_secure', 'password', 'no_cache']} > - kwargs['progress'] = mock.Mock() > - kwargs['username'] = 'test' > - > - with mock.patch('virtBootstrap.utils.get_image_dir'): > - with mock.patch('getpass.getpass') as m_getpass: > - m_getpass.return_value = test_password > - with mock.patch.multiple('virtBootstrap.sources.DockerSource', > - retrieve_layers_info=mock.DEFAULT, > - gen_valid_uri=mock.DEFAULT): > - src_instance = sources.DockerSource(**kwargs) > - > - m_getpass.assert_called_once() > - self.assertIs(test_password, src_instance.password) > - > - ################################### > - # Tests for: retrieve_layers_info() > - ################################### > - def _mock_retrieve_layers_info(self, manifest, kwargs): > - """ > - This method is gather common test pattern used in the following > - two test cases. > - """ > - with mock.patch.multiple('virtBootstrap.utils', > - get_image_details=mock.DEFAULT, > - get_image_dir=mock.DEFAULT) as m_utils: > - > - m_utils['get_image_details'].return_value = manifest > - m_utils['get_image_dir'].return_value = '/images_path' > - > - patch_method = 'virtBootstrap.sources.DockerSource.gen_valid_uri' > - with mock.patch(patch_method) as m_uri: > - src_instance = sources.DockerSource(**kwargs) > - return (src_instance, m_uri, m_utils) > - > - def test_retrieve_layers_info_pass_arguments_to_get_image_details(self): > - """ > - Ensures that retrieve_layers_info() calls get_image_details() > - with all passed arguments. > - """ > - src_kwargs = { > - 'uri': '', > - 'progress': mock.Mock() > - } > - > - manifest = {'schemaVersion': 2, 'layers': []} > - (src_instance, > - m_uri, m_utils) = self._mock_retrieve_layers_info(manifest, > - src_kwargs) > - > - kwargs = { > - 'insecure': src_instance.insecure, > - 'username': src_instance.username, > - 'password': src_instance.password, > - 'raw': True > - } > - m_utils['get_image_details'].assert_called_once_with(m_uri(), **kwargs) > - > - def test_retrieve_layers_info_schema_version_1(self): > - """ > - Ensures that retrieve_layers_info() extracts the layers' information > - from manifest with schema version 1 a list with format: > - ["digest", "sum_type", "file_path", "size"]. > - """ > - kwargs = { > - 'uri': '', > - 'progress': mock.Mock() > - } > - > - manifest = { > - 'schemaVersion': 1, > - 'fsLayers': [ > - {'blobSum': 'sha256:75c416ea'}, > - {'blobSum': 'sha256:c6ff40b6'}, > - {'blobSum': 'sha256:a7050fc1'} > - ] > - } > - > - expected_result = [ > - ['sha256', 'a7050fc1', '/images_path/a7050fc1.tar', None], > - ['sha256', 'c6ff40b6', '/images_path/c6ff40b6.tar', None], > - ['sha256', '75c416ea', '/images_path/75c416ea.tar', None] > - ] > - > - src_instance = self._mock_retrieve_layers_info(manifest, kwargs)[0] > - self.assertEqual(src_instance.layers, expected_result) > - > - def test_retrieve_layers_info_schema_version_2(self): > - """ > - Ensures that retrieve_layers_info() extracts the layers' information > - from manifest with schema version 2 a list with format: > - ["digest", "sum_type", "file_path", "size"]. > - """ > - kwargs = { > - 'uri': '', > - 'progress': mock.Mock() > - } > - > - manifest = { > - 'schemaVersion': 2, > - "layers": [ > - {"size": 47103294, "digest": "sha256:75c416ea"}, > - {"size": 814, "digest": "sha256:c6ff40b6"}, > - {"size": 513, "digest": "sha256:a7050fc1"} > - ] > - } > - > - expected_result = [ > - ['sha256', '75c416ea', '/images_path/75c416ea.tar', 47103294], > - ['sha256', 'c6ff40b6', '/images_path/c6ff40b6.tar', 814], > - ['sha256', 'a7050fc1', '/images_path/a7050fc1.tar', 513] > - ] > - > - src_instance = self._mock_retrieve_layers_info(manifest, kwargs)[0] > - self.assertEqual(src_instance.layers, expected_result) > - > - def test_retrieve_layers_info_raise_error_on_invalid_schema_version(self): > - """ > - Ensures that retrieve_layers_info() calls get_image_details() > - with all passed arguments. > - """ > - kwargs = { > - 'uri': '', > - 'progress': mock.Mock() > - } > - > - manifest = {'schemaVersion': 3} > - with self.assertRaises(ValueError): > - self._mock_retrieve_layers_info(manifest, kwargs) > - > - ################################### > - # Tests for: gen_valid_uri() > - ################################### > - def test_gen_valid_uri(self): > - """ > - Validates the output of gen_valid_uri() for some test cases. > - """ > - m_self = self._mock_docker_source() > - test_values = { > - 'docker:///repo': 'docker://repo', > - 'docker:/repo': 'docker://repo', > - 'docker://repo/': 'docker://repo', > - 'docker://repo/image/': 'docker://repo/image', > - 'docker:///repo/image/': 'docker://repo/image', > - } > - for uri in test_values: > - uri_obj = urlparse(uri) > - result = sources.DockerSource.gen_valid_uri(m_self, uri_obj) > - expected = test_values[uri] > - self.assertEqual(result, expected) > - > - ################################### > - # Tests for: download_image() > - ################################### > - def test_download_image(self): > - """ > - Ensures that download_image() calls read_skopeo_progress() with > - expected skopeo copy command and removes tha leftover manifest file. > - """ > - m_self = self._mock_docker_source() > - m_self.read_skopeo_progress = mock.Mock() > - manifest_path = "%s/manifest.json" % m_self.images_dir > - with mock.patch('os.remove') as m_remove: > - sources.DockerSource.download_image(m_self) > - > - expected_call = ["skopeo", "copy", m_self.url, > - "dir:" + m_self.images_dir, > - '--src-tls-verify=false', > - '--src-creds={}:{}'.format(m_self.username, > - m_self.password)] > - m_self.read_skopeo_progress.assert_called_once_with(expected_call) > - m_remove.assert_called_once_with(manifest_path) > - > - ################################### > - # Tests for: parse_output() > - ################################### > - def test_parse_output_return_false_on_fail(self): > - """ > - Ensures that parse_output() returns False when process call > - exits with non-zero code. > - """ > - m_self = mock.Mock(spec=sources.DockerSource) > - m_self.layers = [] > - m_proc = mock.Mock() > - m_proc.returncode = 1 > - self.assertFalse(sources.DockerSource.parse_output(m_self, m_proc)) > - > - def test_parse_output(self): > - """ > - Ensures that parse_output() recognises processing of different > - layers from the skopeo's output. > - """ > - m_self = self._mock_docker_source() > - m_proc = mock.Mock() > - m_proc.poll.return_value = None > - m_proc.returncode = 0 > - test_values = '\n'.join([ > - 'Skipping fetch of repeat blob sha256:c6ff40', > - 'Copying blob sha256:75c416ea735c4', > - '40.00 MB / 44.92 MB [======================>------]', > - 'Copying config sha256:d355ed35', > - '40.00 MB / 44.92 MB [======================>------]' > - ]) > - > - expected_progress_calls = [ > - mock.call("Downloading layer (1/2)"), > - mock.call("Downloading layer (2/2)"), > - ] > - > - with mock.patch('select.select') as m_select: > - m_select.return_value = [[test_values], [], []] > - with mock.patch('virtBootstrap.utils.read_async') as m_read_async: > - m_read_async.return_value = test_values > - self.assertTrue(sources.DockerSource.parse_output(m_self, > - m_proc)) > - m_select.assert_called_once_with([m_proc.stdout], [], []) > - m_read_async.assert_called_once_with(test_values) > - m_self.progress.assert_has_calls(expected_progress_calls) > - m_self.update_progress_from_output.assert_called_once() > - m_proc.wait.assert_called_once() > - > - ################################### > - # Tests for: update_progress_from_output() > - ################################### > - def _mock_update_progress_from_output(self, test_values): > - """ > - This method is gather common test pattern used in the following > - two test cases. > - """ > - m_self = self._mock_docker_source() > - test_method = sources.DockerSource.update_progress_from_output > - for line in test_values: > - test_method(m_self, line.split(), 1, len(test_values)) > - > - return m_self.progress.call_args_list > - > - def test_update_progress_from_output(self): > - """ > - Ensures that update_progress_from_output() recognises the current > - downloaded size, the total layer's size and calculates correct > - percentage value. > - """ > - test_values = [ > - '500.00 KB / 4.00 MB [======>------]', > - '25.00 MB / 24.10 MB [======>------]', > - '40.00 MB / 50.00 MB [======>------]', > - ] > - expected_values = [2, 17.33, 13.33] > - > - calls = self._mock_update_progress_from_output(test_values) > - for call, expected in zip(calls, expected_values): > - self.assertAlmostEqual(call[1]['value'], expected, places=1) > - > - def test_update_progress_from_output_ignore_failures(self): > - """ > - Ensures that update_progress_from_output() ignores invalid lines > - from skopeo's output. > - """ > - test_values = [ > - 'a ', > - '1 ' * 5, > - '500.00 MB / 0.00 MB [======>------]', > - '00.00 MB / 00.00 MB [======>------]', > - ] > - self._mock_update_progress_from_output(test_values) > - > - ################################### > - # Tests for: read_skopeo_progress() > - ################################### > - def _mock_read_skopeo_progress(self, test_cmd, parse_output_return): > - """ > - This method is gather common test pattern used in the following > - two test cases. > - """ > - m_self = mock.Mock(spec=sources.DockerSource) > - m_self.parse_output.return_value = parse_output_return > - with mock.patch.multiple('virtBootstrap.sources.' > - 'docker_source.subprocess', > - Popen=mock.DEFAULT, > - PIPE=mock.DEFAULT) as mocked: > - with mock.patch('virtBootstrap.utils.make_async') as m_make_async: > - sources.DockerSource.read_skopeo_progress(m_self, test_cmd) > - > - return (mocked, m_make_async) > - > - def test_read_skopeo_progress(self): > - """ > - Ensures that read_skopeo_progress() calls make_async() with > - the stdout pipe of skopeo's process. > - """ > - test_cmd = 'test' > - mocked, m_make_async = self._mock_read_skopeo_progress(test_cmd, True) > - > - mocked['Popen'].assert_called_once_with(test_cmd, > - stdout=mocked['PIPE'], > - stderr=mocked['PIPE'], > - universal_newlines=True) > - m_make_async.assert_called_once_with(mocked['Popen']().stdout) > - > - def test_read_skopeo_progress_raise_error(self): > - """ > - Ensures that read_skopeo_progress() raise CalledProcessError > - when parse_output() returns false. > - """ > - with self.assertRaises(sources.docker_source > - .subprocess.CalledProcessError): > - self._mock_read_skopeo_progress('test', False) > - > - ################################### > - # Tests for: validate_image_layers() > - ################################### > - def _mock_validate_image_layers(self, > - checksum_return, > - path_exists_return, > - expected_result, > - check_calls=False): > - """ > - This method is gather common test pattern used in the following > - three test cases. > - """ > - m_self = self._mock_docker_source() > - > - with mock.patch('os.path.exists') as m_path_exists: > - with mock.patch('virtBootstrap.utils.checksum') as m_checksum: > - m_checksum.return_value = checksum_return > - m_path_exists.return_value = path_exists_return > - result = sources.DockerSource.validate_image_layers(m_self) > - self.assertEqual(result, expected_result) > - > - if check_calls: > - path_exists_expected_calls = [] > - checksum_expected_calls = [] > - # Generate expected calls > - for sum_type, hash_sum, path, _ignore in m_self.layers: > - path_exists_expected_calls.append(mock.call(path)) > - checksum_expected_calls.append( > - mock.call(path, sum_type, hash_sum)) > - > - m_path_exists.assert_has_calls(path_exists_expected_calls) > - m_checksum.assert_has_calls(checksum_expected_calls) > - > - def test_validate_image_layers_should_return_true(self): > - """ > - Ensures that validate_image_layers() returns True when: > - - checksum() returns True for all layers > - - the file path of all layers exist > - - all layers are validated > - """ > - self._mock_validate_image_layers(True, True, True, True) > - > - def test_validate_image_layers_return_false_if_path_not_exist(self): > - """ > - Ensures that validate_image_layers() returns False when > - checksum() returns False. > - """ > - self._mock_validate_image_layers(False, True, False) > - > - def test_validate_image_layers_return_false_if_checksum_fail(self): > - """ > - Ensures that validate_image_layers() returns False when > - the file path of layer does not exist. > - """ > - self._mock_validate_image_layers(True, False, False) > - > - ################################### > - # Tests for: fetch_layers() > - ################################### > - def _mock_fetch_layers(self, validate_return): > - """ > - This method is gather common test pattern used in the following > - two test cases. > - """ > - m_self = mock.Mock(spec=sources.DockerSource) > - m_self.validate_image_layers.return_value = validate_return > - sources.DockerSource.fetch_layers(m_self) > - return m_self > - > - def test_fetch_layers_should_call_download_image(self): > - """ > - Ensures that fetch_layers() calls download_image() > - when validate_image_layers() returns False. > - """ > - m_self = self._mock_fetch_layers(False) > - m_self.download_image.assert_called_once() > - > - def test_fetch_layers_should_not_call_download_image(self): > - """ > - Ensures that fetch_layers() does not call download_image() > - when validate_image_layers() returns True. > - """ > - m_self = self._mock_fetch_layers(True) > - m_self.download_image.assert_not_called() > - > - ################################### > - # Tests for: unpack() > - ################################### > - def _unpack_test_fmt(self, output_format, patch_method=None, > - side_effect=None, m_self=None): > - """ > - This method is gather common test pattern used in the following > - two test cases. > - """ > - m_self = m_self if m_self else self._mock_docker_source() > - m_self.output_format = output_format > - dest = 'foo' > - > - if patch_method: > - with mock.patch(patch_method) as mocked: > - if side_effect: > - mocked.side_effect = side_effect > - sources.DockerSource.unpack(m_self, dest) > - > - mocked.assert_called_once_with(m_self.layers, dest, > - m_self.progress) > - else: > - sources.DockerSource.unpack(m_self, dest) > - > - m_self.fetch_layers.assert_called_once() > - > - def test_unpack_dir_format(self): > - """ > - Ensures that unpack() calls untar_layers() when the output format > - is set to 'dir'. > - """ > - self._unpack_test_fmt('dir', 'virtBootstrap.utils.untar_layers') > - > - def test_unpack_qcow2_format(self): > - """ > - Ensures that unpack() calls extract_layers_in_qcow2() when the > - output format is set to 'qcow2'. > - """ > - self._unpack_test_fmt('qcow2', > - 'virtBootstrap.utils.extract_layers_in_qcow2') > - > - def unpack_raise_error_test(self, > - output_format, > - patch_method, > - side_effect=None, > - msg=None): > - """ > - This method is gather common test pattern used in the following > - four test cases. > - """ > - with self.assertRaises(Exception) as err: > - self._unpack_test_fmt(output_format, patch_method, > - side_effect) > - if msg: > - self.assertEqual(msg, str(err.exception)) > - > - def test_unpack_raise_error_for_unknown_format(self): > - """ > - Ensures that unpack() throws an Exception when called with > - invalid output format. > - """ > - msg = 'Unknown format:foo' > - self.unpack_raise_error_test('foo', None, None, msg) > - > - def test_unpack_raise_error_if_untar_fail(self): > - """ > - Ensures that unpack() throws an Exception when untar_layers() > - fails. > - """ > - msg = 'Caught untar failure' > - side_effect = Exception(msg) > - patch_method = 'virtBootstrap.utils.untar_layers' > - self.unpack_raise_error_test('dir', patch_method, side_effect, msg) > - > - def test_unpack_raise_error_if_extract_in_qcow2_fail(self): > - """ > - Ensures that unpack() throws an Exception when > - extract_layers_in_qcow2() fails. > - """ > - msg = 'Caught extract_layers_in_qcow2 failure' > - side_effect = Exception(msg) > - patch_method = 'virtBootstrap.utils.extract_layers_in_qcow2' > - self.unpack_raise_error_test('qcow2', patch_method, side_effect, msg) > - > - def test_unpack_no_cache_clean_up(self): > - """ > - Ensures that unpack() removes the folder which stores tar archives > - of image layers when no_cache is set to True. > - """ > - output_formats = ['dir', 'qcow2'] > - patch_methods = [ > - 'virtBootstrap.utils.untar_layers', > - 'virtBootstrap.utils.extract_layers_in_qcow2' > - ] > - for fmt, patch_mthd in zip(output_formats, patch_methods): > - m_self = self._mock_docker_source() > - m_self.no_cache = True > - with mock.patch('shutil.rmtree') as m_shutil: > - self._unpack_test_fmt(fmt, patch_mthd, m_self=m_self) > - m_shutil.assert_called_once_with(m_self.images_dir) > - > - def test_unpack_no_cache_clean_up_on_failure(self): > - """ > - Ensures that unpack() removes the folder which stores tar archives > - of image layers when no_cache is set to True and exception was > - raised. > - """ > - m_self = self._mock_docker_source() > - m_self.no_cache = True > - with self.assertRaises(Exception): > - with mock.patch('shutil.rmtree') as m_rmtree: > - self._unpack_test_fmt('foo', None, m_self=m_self) > - m_rmtree.assert_called_once_with(m_self.images_dir) > diff --git a/tests/test_file_source.py b/tests/test_file_source.py > deleted file mode 100644 > index 6e89aa2..0000000 > --- a/tests/test_file_source.py > +++ /dev/null > @@ -1,171 +0,0 @@ > -# Authors: Radostin Stoyanov <rstoyanov1@xxxxxxxxx> > -# > -# Copyright (C) 2017 Radostin Stoyanov > -# > -# This program is free software: you can redistribute it and/or modify > -# it under the terms of the GNU General Public License as published by > -# the Free Software Foundation, either version 3 of the License, or > -# (at your option) any later version. > - > -# This program is distributed in the hope that it will be useful, > -# but WITHOUT ANY WARRANTY; without even the implied warranty of > -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > -# GNU General Public License for more details. > - > -# You should have received a copy of the GNU General Public License > -# along with this program. If not, see <http://www.gnu.org/licenses/>. > - > - > -""" > -Unit tests for methods defined in virtBootstrap.sources.FileSource > -""" > - > -from tests import unittest > -from tests import mock > -from tests import sources > - > - > -# pylint: disable=invalid-name > -class TestFileSource(unittest.TestCase): > - """ > - Test cases for FileSource > - """ > - > - ################################### > - # Tests for: __init__() > - ################################### > - def test_argument_assignment(self): > - """ > - Ensures that __init__() assigns the arguments' values to instance > - variables. > - """ > - kwargs = {'uri': mock.Mock(), > - 'fmt': 'dir', > - 'progress': mock.Mock()} > - > - src_instance = sources.FileSource(**kwargs) > - > - test_values = { > - src_instance.path: kwargs['uri'].path, > - src_instance.output_format: kwargs['fmt'], > - src_instance.progress: kwargs['progress'].update_progress > - } > - for value in test_values: > - self.assertIs(value, test_values[value]) > - > - ################################### > - # Tests for: unpack() > - ################################### > - def test_unpack_invalid_source_raise_exception(self): > - """ > - Ensures that unpack() throws an Exception when called with > - invalid file source. > - """ > - m_self = mock.Mock(spec=sources.FileSource) > - m_self.path = 'foo' > - with mock.patch('os.path.isfile') as m_isfile: > - m_isfile.return_value = False > - with self.assertRaises(Exception) as err: > - sources.FileSource.unpack(m_self, 'bar') > - self.assertIn('Invalid file source', str(err.exception)) > - > - def test_unpack_to_dir(self): > - """ > - Ensures that unpack() calls safe_untar() when the output format > - is set to 'dir'. > - """ > - m_self = mock.Mock(spec=sources.FileSource) > - m_self.progress = mock.Mock() > - m_self.path = 'foo' > - m_self.output_format = 'dir' > - dest = 'bar' > - > - with mock.patch('os.path.isfile') as m_isfile: > - m_isfile.return_value = True > - with mock.patch('virtBootstrap.utils.safe_untar') as m_untar: > - sources.FileSource.unpack(m_self, dest) > - > - m_untar.assert_called_once_with(m_self.path, dest) > - > - def test_unpack_to_qcow2(self): > - """ > - Ensures that unpack() calls create_qcow2() when the output > - format is set to 'qcow2'. > - """ > - m_self = mock.Mock(spec=sources.FileSource) > - m_self.progress = mock.Mock() > - m_self.path = 'foo' > - m_self.output_format = 'qcow2' > - dest = 'bar' > - qcow2_file_path = 'foobar' > - > - with mock.patch.multiple('os.path', > - isfile=mock.DEFAULT, > - realpath=mock.DEFAULT) as mocked: > - > - mocked['isfile'].return_value = True > - mocked['realpath'].return_value = qcow2_file_path > - with mock.patch('virtBootstrap.utils.create_qcow2') as m_qcow2: > - sources.FileSource.unpack(m_self, dest) > - > - m_qcow2.assert_called_once_with(m_self.path, qcow2_file_path) > - > - def _unpack_raise_error_test(self, > - output_format, > - side_effect=None, > - patch_method=None, > - msg=None): > - """ > - This method is gather common test pattern used in the following > - three test cases. > - """ > - m_self = mock.Mock(spec=sources.FileSource) > - m_self.progress = mock.Mock() > - m_self.path = 'foo' > - m_self.output_format = output_format > - dest = 'bar' > - > - with mock.patch.multiple('os.path', > - isfile=mock.DEFAULT, > - realpath=mock.DEFAULT) as m_path: > - m_path['isfile'].return_value = True > - with self.assertRaises(Exception) as err: > - if patch_method: > - with mock.patch(patch_method) as mocked_method: > - mocked_method.side_effect = side_effect > - sources.FileSource.unpack(m_self, dest) > - else: > - sources.FileSource.unpack(m_self, dest) > - if msg: > - self.assertEqual(msg, str(err.exception)) > - > - def test_unpack_invalid_format_raise_exception(self): > - """ > - Ensures that unpack() throws an Exception when called with > - invalid output format. > - """ > - self._unpack_raise_error_test('foo', msg='Unknown format:foo') > - > - def test_unpack_raise_error_if_untar_fail(self): > - """ > - Ensures that unpack() throws an Exception when safe_untar() > - fails. > - """ > - msg = 'Caught untar failure' > - patch_method = 'virtBootstrap.utils.safe_untar' > - self._unpack_raise_error_test(output_format='dir', > - side_effect=Exception(msg), > - patch_method=patch_method, > - msg=msg) > - > - def test_unpack_raise_error_if_extract_in_qcow2_fail(self): > - """ > - Ensures that unpack() throws an Exception when create_qcow2() > - fails. > - """ > - msg = 'Caught extract_layers_in_qcow2 failure' > - patch_method = 'virtBootstrap.utils.create_qcow2' > - self._unpack_raise_error_test(output_format='qcow2', > - side_effect=Exception(msg), > - patch_method=patch_method, > - msg=msg) > diff --git a/tests/test_progress.py b/tests/test_progress.py > deleted file mode 100644 > index 1f609d5..0000000 > --- a/tests/test_progress.py > +++ /dev/null > @@ -1,112 +0,0 @@ > -# Authors: Radostin Stoyanov <rstoyanov1@xxxxxxxxx> > -# > -# Copyright (C) 2017 Radostin Stoyanov > -# > -# This program is free software: you can redistribute it and/or modify > -# it under the terms of the GNU General Public License as published by > -# the Free Software Foundation, either version 3 of the License, or > -# (at your option) any later version. > - > -# This program is distributed in the hope that it will be useful, > -# but WITHOUT ANY WARRANTY; without even the implied warranty of > -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > -# GNU General Public License for more details. > - > -# You should have received a copy of the GNU General Public License > -# along with this program. If not, see <http://www.gnu.org/licenses/>. > - > - > -""" > -Unit tests for methods defined in virtBootstrap.progress > -""" > - > -from tests import unittest > -from tests import mock > -from tests import progress > - > - > -# pylint: disable=invalid-name > -class TestFileSource(unittest.TestCase): > - """ > - Test cases for Progress module > - """ > - > - ################################### > - # Tests for: __init__() > - ################################### > - def test_progress_init(self): > - """ > - Ensures that __init__() assigns the collback value to instance > - variable and creates dictionary with 'status', 'value' keys. > - """ > - callback = mock.Mock() > - test_instance = progress.Progress(callback) > - for key in ['status', 'value']: > - self.assertIn(key, test_instance.progress) > - self.assertIs(callback, test_instance.callback) > - > - ################################### > - # Tests for: get_progress() > - ################################### > - def test_get_progress(self): > - """ > - Ensures that get_progress() returns copy of the progress dictionary > - which has the same keys and values. > - """ > - test_instance = progress.Progress() > - test_result = test_instance.get_progress() > - self.assertIsNot(test_instance.progress, test_result) > - self.assertDictEqual(test_instance.progress, test_result) > - > - ################################### > - # Tests for: update_progress() > - ################################### > - def test_update_progress_creates_log_record(self): > - """ > - Ensures that update_progress() creates log record with info level > - and pass the status value as message. > - """ > - test_instance = progress.Progress() > - logger = mock.Mock() > - status = "Test" > - test_instance.update_progress(status=status, logger=logger) > - logger.info.assert_called_once_with(status) > - > - def test_update_progress_update_status_and_value(self): > - """ > - Ensures that update_progress() creates log record with info level > - and pass the status value as message. > - """ > - test_instance = progress.Progress() > - test_instance.progress = {'status': '', 'value': 0} > - new_status = 'Test' > - new_value = 100 > - new_progress = {'status': new_status, 'value': new_value} > - test_instance.update_progress(status=new_status, value=new_value) > - self.assertDictEqual(test_instance.progress, new_progress) > - > - def test_update_progress_update_raise_logger_error(self): > - """ > - Ensures that update_progress() raise ValueError when creating > - log record has failed. > - """ > - msg = 'test' > - test_instance = progress.Progress() > - logger = mock.Mock() > - logger.info.side_effect = Exception(msg) > - with self.assertRaises(ValueError) as err: > - test_instance.update_progress(logger=logger) > - self.assertIn(msg, str(err.exception)) > - > - def test_update_progress_update_raise_callback_error(self): > - """ > - Ensures that update_progress() raise ValueError when calling > - callback failed. > - """ > - msg = 'test' > - callback = mock.Mock() > - callback.side_effect = Exception(msg) > - test_instance = progress.Progress(callback) > - with self.assertRaises(ValueError) as err: > - test_instance.update_progress('foo', 'bar') > - self.assertIn(msg, str(err.exception)) > diff --git a/tests/test_utils.py b/tests/test_utils.py > index 0b6ccc0..c2f55b5 100644 > --- a/tests/test_utils.py > +++ b/tests/test_utils.py > @@ -1,3 +1,4 @@ > +# -*- coding: utf-8 -*- > # Authors: Radostin Stoyanov <rstoyanov1@xxxxxxxxx> > # > # Copyright (C) 2017 Radostin Stoyanov > @@ -19,126 +20,16 @@ > """ > Unit tests for functions defined in virtBootstrap.utils > """ > - > -from tests import unittest > -from tests import mock > -from tests import utils > -try: > - # pylint: disable=redefined-builtin > - from importlib import reload > -except ImportError: > - pass > +import unittest > +from . import utils > > > # pylint: disable=invalid-name > -# pylint: disable=too-many-public-methods > class TestUtils(unittest.TestCase): > """ > Ensures that functions defined in the utils module of virtBootstrap > work as expected. > """ > - > - ################################### > - # Tests for: checksum() > - ################################### > - def test_utils_checksum_return_false_on_invalid_hash(self): > - """ > - Ensures that checksum() returns False if the actual and expected > - hash sum of file are not equal. > - """ > - with mock.patch.multiple(utils, > - open=mock.DEFAULT, > - logger=mock.DEFAULT, > - hashlib=mock.DEFAULT) as mocked: > - path, sum_type, sum_expected = '/foo', 'sha256', 'bar' > - mocked['hashlib'].sha256.hexdigest.return_value = False > - self.assertFalse(utils.checksum(path, sum_type, sum_expected)) > - > - def test_utils_checksum_return_false_if_file_could_not_be_opened(self): > - """ > - Ensures that checksum() returns False if the file to be checked > - cannot be open for read. > - """ > - with mock.patch.multiple(utils, > - open=mock.DEFAULT, > - logger=mock.DEFAULT, > - hashlib=mock.DEFAULT) as mocked: > - mocked['open'].side_effect = IOError() > - self.assertFalse(utils.checksum('foo', 'sha256', 'bar')) > - > - def test_utils_checksum_return_true_on_valid_hash(self): > - """ > - Ensures that checksum() returns True when the actual and expected > - hash sum of file are equal. > - """ > - with mock.patch.multiple(utils, > - open=mock.DEFAULT, > - logger=mock.DEFAULT, > - hashlib=mock.DEFAULT) as mocked: > - path, sum_type, sum_expected = '/foo', 'sha256', 'bar' > - mocked['hashlib'].sha256.return_value.hexdigest.return_value \ > - = sum_expected > - self.assertTrue(utils.checksum(path, sum_type, sum_expected)) > - > - ################################### > - # Tests for: execute() > - ################################### > - def test_utils_execute_logging_on_successful_proc_call(self): > - """ > - Ensures that execute() creates log record of cmd, stdout and stderr > - when the exit code of process is 0. > - """ > - with mock.patch.multiple(utils, > - logger=mock.DEFAULT, > - subprocess=mock.DEFAULT) as mocked: > - cmd = ['foo'] > - output, err = 'test_out', 'test_err' > - > - mocked['subprocess'].Popen.return_value.returncode = 0 > - (mocked['subprocess'].Popen.return_value > - .communicate.return_value) = (output.encode(), err.encode()) > - > - utils.execute(cmd) > - mocked['logger'].debug.assert_any_call("Call command:\n%s", cmd[0]) > - mocked['logger'].debug.assert_any_call("Stdout:\n%s", output) > - mocked['logger'].debug.assert_any_call("Stderr:\n%s", err) > - > - def test_utils_execute_raise_error_on_unsuccessful_proc_call(self): > - """ > - Ensures that execute() raise CalledProcessError exception when the > - exit code of process is not 0. > - """ > - with mock.patch('virtBootstrap.utils.subprocess.Popen') as m_popen: > - m_popen.return_value.returncode = 1 > - m_popen.return_value.communicate.return_value = (b'output', b'err') > - with self.assertRaises(utils.subprocess.CalledProcessError): > - utils.execute(['foo']) > - > - ################################### > - # Tests for: safe_untar() > - ################################### > - def test_utils_safe_untar_calls_execute(self): > - """ > - Ensures that safe_untar() calls execute with virt-sandbox > - command to extract source files to destination folder. > - Test for users with EUID 0 and 1000. > - """ > - with mock.patch('virtBootstrap.utils.os.geteuid') as m_geteuid: > - for uid in [0, 1000]: > - m_geteuid.return_value = uid > - reload(utils) > - with mock.patch('virtBootstrap.utils.execute') as m_execute: > - src, dest = 'foo', 'bar' > - utils.safe_untar('foo', 'bar') > - cmd = ['virt-sandbox', > - '-c', utils.LIBVIRT_CONN, > - '-m', 'host-bind:/mnt=' + dest, > - '--', > - '/bin/tar', 'xf', src, > - '-C', '/mnt', > - '--exclude', 'dev/*'] > - m_execute.assert_called_once_with(cmd) > - > ################################### > # Tests for: bytes_to_size() > ################################### > @@ -172,241 +63,6 @@ class TestUtils(unittest.TestCase): > i += 1 > > ################################### > - # Tests for: log_layer_extract() > - ################################### > - def test_utils_log_layer_extract(self): > - """ > - Ensures that log_layer_extract() updates the progress and creates > - log record with debug level. > - """ > - m_progress = mock.Mock() > - layer = ['sum_type', 'sum_value', 'layer_file', 'layer_size'] > - with mock.patch.multiple(utils, logger=mock.DEFAULT, > - bytes_to_size=mock.DEFAULT) as mocked: > - utils.log_layer_extract(layer, 'foo', 'bar', m_progress) > - mocked['bytes_to_size'].assert_called_once_with('layer_size') > - mocked['logger'].debug.assert_called_once() > - m_progress.assert_called_once() > - > - ################################### > - # Tests for: get_mime_type() > - ################################### > - @mock.patch('virtBootstrap.utils.subprocess.Popen') > - def test_utils_get_mime_type(self, m_popen): > - """ > - Ensures that get_mime_type() returns the detected MIME type > - of /usr/bin/file. > - """ > - path = "foo" > - mime = "application/x-gzip" > - stdout = ('%s: %s' % (path, mime)).encode() > - m_popen.return_value.stdout.read.return_value = stdout > - self.assertEqual(utils.get_mime_type(path), mime) > - m_popen.assert_called_once_with( > - ["/usr/bin/file", "--mime-type", path], > - stdout=utils.subprocess.PIPE > - ) > - > - ################################### > - # Tests for: untar_layers() > - ################################### > - def test_utils_untar_all_layers_in_order(self): > - """ > - Ensures that untar_layers() iterates through all passed layers > - in order. > - """ > - layers = ['l1', 'l2', 'l3'] > - layers_list = [['', '', layer] for layer in layers] > - dest_dir = '/foo' > - expected_calls = [mock.call(layer, dest_dir) for layer in layers] > - with mock.patch.multiple(utils, > - safe_untar=mock.DEFAULT, > - log_layer_extract=mock.DEFAULT) as mocked: > - utils.untar_layers(layers_list, dest_dir, mock.Mock()) > - mocked['safe_untar'].assert_has_calls(expected_calls) > - > - ################################### > - # Tests for: create_qcow2() > - ################################### > - def _apply_test_to_create_qcow2(self, expected_calls, *args): > - """ > - This method contains common test pattern used in the next two > - test cases. > - """ > - with mock.patch.multiple(utils, > - execute=mock.DEFAULT, > - logger=mock.DEFAULT, > - get_mime_type=mock.DEFAULT) as mocked: > - mocked['get_mime_type'].return_value = 'application/x-gzip' > - utils.create_qcow2(*args) > - mocked['execute'].assert_has_calls(expected_calls) > - > - def test_utils_create_qcow2_base_layer(self): > - """ > - Ensures that create_qcow2() creates base layer when > - backing_file = None. > - """ > - tar_file = 'foo' > - layer_file = 'bar' > - size = '5G' > - backing_file = None > - > - expected_calls = [ > - mock.call(["qemu-img", "create", "-f", "qcow2", layer_file, size]), > - > - mock.call(['virt-format', > - '--format=qcow2', > - '--partition=none', > - '--filesystem=ext3', > - '-a', layer_file]), > - > - mock.call(['guestfish', > - '-a', layer_file, > - '-m', '/dev/sda', > - 'tar-in', tar_file, '/', 'compress:gzip']) > - ] > - > - self._apply_test_to_create_qcow2(expected_calls, tar_file, layer_file, > - backing_file, size) > - > - def test_utils_create_qcow2_layer_with_backing_chain(self): > - """ > - Ensures that create_qcow2() creates new layer with backing chains > - when backing_file is specified. > - """ > - tar_file = 'foo' > - layer_file = 'bar' > - backing_file = 'base' > - size = '5G' > - > - expected_calls = [ > - mock.call(['qemu-img', 'create', > - '-b', backing_file, > - '-f', 'qcow2', > - layer_file, size]), > - > - mock.call(['guestfish', > - '-a', layer_file, > - '-m', '/dev/sda', > - 'tar-in', tar_file, '/', 'compress:gzip']) > - ] > - > - self._apply_test_to_create_qcow2(expected_calls, tar_file, layer_file, > - backing_file, size) > - > - ################################### > - # Tests for: extract_layers_in_qcow2() > - ################################### > - def test_utils_if_all_layers_extracted_in_order_in_qcow2(self): > - """ > - Ensures that extract_layers_in_qcow2() iterates through all > - layers in order. > - """ > - layers = ['l1', 'l2', 'l3'] > - layers_list = [['', '', layer] for layer in layers] > - dest_dir = '/foo' > - > - # Generate expected calls > - expected_calls = [] > - qcow2_backing_file = None > - for index, layer in enumerate(layers): > - qcow2_layer_file = dest_dir + "/layer-%s.qcow2" % index > - expected_calls.append( > - mock.call(layer, qcow2_layer_file, qcow2_backing_file)) > - qcow2_backing_file = qcow2_layer_file > - > - # Mocking out and execute > - with mock.patch.multiple(utils, > - create_qcow2=mock.DEFAULT, > - log_layer_extract=mock.DEFAULT) as mocked: > - utils.extract_layers_in_qcow2(layers_list, dest_dir, mock.Mock()) > - > - # Check actual calls > - mocked['create_qcow2'].assert_has_calls(expected_calls) > - > - ################################### > - # Tests for: get_image_dir() > - ################################### > - def test_utils_getimage_dir(self): > - """ > - Ensures that get_image_dir() returns path to DEFAULT_IMG_DIR > - if the no_cache argument is set to False and create it if > - does not exist. > - """ > - # Perform this test for UID 0 and 1000 > - for uid in [0, 1000]: > - with mock.patch('os.geteuid') as m_geteuid: > - m_geteuid.return_value = uid > - reload(utils) > - with mock.patch('os.makedirs') as m_makedirs: > - with mock.patch('os.path.exists') as m_path_exists: > - m_path_exists.return_value = False > - self.assertEqual(utils.get_image_dir(False), > - utils.DEFAULT_IMG_DIR) > - m_makedirs.assert_called_once_with(utils.DEFAULT_IMG_DIR) > - > - @mock.patch('tempfile.mkdtemp') > - def test_utils_getimage_dir_no_cache(self, m_mkdtemp): > - """ > - Ensures that get_image_dir() returns temporary file path created > - by tempfile.mkdtemp. > - """ > - m_mkdtemp.return_value = 'foo' > - self.assertEqual(utils.get_image_dir(True), 'foo') > - m_mkdtemp.assert_called_once() > - > - ################################### > - # Tests for: get_image_details() > - ################################### > - @mock.patch('virtBootstrap.utils.subprocess.Popen') > - def test_utils_get_image_details_raise_error_on_fail(self, m_popen): > - """ > - Ensures that get_image_details() throws ValueError exception > - when stderr from skopeo is provided. > - """ > - src = 'docker://foo' > - m_popen.return_value.communicate.return_value = [b'', b'Error'] > - with self.assertRaises(ValueError): > - utils.get_image_details(src) > - > - @mock.patch('virtBootstrap.utils.subprocess.Popen') > - def test_utils_get_image_details_return_json_obj_on_success(self, m_popen): > - """ > - Ensures that get_image_details() returns python dictionary which > - represents the data provided from stdout of skopeo when stderr > - is not present. > - """ > - src = 'docker://foo' > - json_dict = {'foo': 'bar'} > - stdout = utils.json.dumps(json_dict).encode() > - m_popen.return_value.communicate.return_value = [stdout, ''] > - self.assertDictEqual(utils.get_image_details(src), json_dict) > - > - def test_utils_get_image_details_all_argument_passed(self): > - """ > - Ensures that get_image_details() pass all argument values to > - skopeo inspect. > - """ > - src = 'docker://foo' > - raw, insecure = True, True > - username, password = 'user', 'password' > - cmd = ['skopeo', 'inspect', src, > - '--raw', > - '--tls-verify=false', > - "--creds=%s:%s" % (username, password)] > - > - with mock.patch.multiple(utils.subprocess, > - Popen=mock.DEFAULT, > - PIPE=mock.DEFAULT) as mocked: > - mocked['Popen'].return_value.communicate.return_value = [b'{}', > - b''] > - utils.get_image_details(src, raw, insecure, username, password) > - > - mocked['Popen'].assert_called_once_with(cmd, > - stdout=mocked['PIPE'], > - stderr=mocked['PIPE']) > - > - ################################### > # Tests for: is_new_layer_message() > ################################### > def test_utils_is_new_layer_message(self): > @@ -459,10 +115,11 @@ class TestUtils(unittest.TestCase): > Ensures that make_async() sets O_NONBLOCK flag on PIPE. > """ > > - pipe = utils.subprocess.Popen( > + proc = utils.subprocess.Popen( > ["echo"], > stdout=utils.subprocess.PIPE > - ).stdout > + ) > + pipe = proc.stdout > > fd = pipe.fileno() > F_GETFL = utils.fcntl.F_GETFL > @@ -471,36 +128,8 @@ class TestUtils(unittest.TestCase): > self.assertFalse(utils.fcntl.fcntl(fd, F_GETFL) & O_NONBLOCK) > utils.make_async(fd) > self.assertTrue(utils.fcntl.fcntl(fd, F_GETFL) & O_NONBLOCK) > - > - ################################### > - # Tests for: read_async() > - ################################### > - def test_utils_read_async_successful_read(self): > - """ > - Ensures that read_async() calls read() of passed file descriptor. > - """ > - m_fd = mock.MagicMock() > - utils.read_async(m_fd) > - m_fd.read.assert_called_once() > - > - def test_utils_read_async_return_empty_str_on_EAGAIN_error(self): > - """ > - Ensures that read_async() ignores EAGAIN errors and returns > - empty string. > - """ > - m_fd = mock.MagicMock() > - m_fd.read.side_effect = IOError(utils.errno.EAGAIN, '') > - self.assertEqual(utils.read_async(m_fd), '') > - > - def test_utils_read_async_raise_errors(self): > - """ > - Ensures that read_async() does not ignore IOError which is different > - than EAGAIN and throws an exception. > - """ > - m_fd = mock.MagicMock() > - m_fd.read.side_effect = IOError() > - with self.assertRaises(IOError): > - utils.read_async(m_fd) > + proc.wait() > + pipe.close() > > ################################### > # Tests for: str2float() > @@ -512,196 +141,3 @@ class TestUtils(unittest.TestCase): > test_values = {'1': 1.0, 'test': None, '0': 0.0, '1.25': 1.25} > for test in test_values: > self.assertEqual(utils.str2float(test), test_values[test]) > - > - ################################### > - # Tests for: set_root_password_in_rootfs() > - ################################### > - def test_utils_set_root_password_in_rootfs_restore_permissions(self): > - """ > - Ensures that set_root_password_in_rootfs() restore shadow > - file permissions after edit. > - """ > - permissions = 700 > - rootfs_path = '/foo' > - shadow_file = '%s/etc/shadow' % rootfs_path > - > - m_open = mock.mock_open(read_data='') > - with mock.patch('virtBootstrap.utils.open', m_open, create=True): > - with mock.patch('virtBootstrap.utils.os') as m_os: > - m_os.stat.return_value = [permissions] > - m_os.path.join.return_value = shadow_file > - utils.set_root_password_in_rootfs(rootfs_path, 'password') > - > - expected_calls = [ > - mock.call.path.join(rootfs_path, 'etc/shadow'), > - mock.call.stat(shadow_file), > - mock.call.chmod(shadow_file, 438), > - mock.call.chmod(shadow_file, permissions) > - ] > - m_os.assert_has_calls(expected_calls) > - > - def test_utils_set_root_password_in_rootfs_restore_permissions_fail(self): > - """ > - Ensures that set_root_password_in_rootfs() restore shadow file > - permissions in case of failure. > - """ > - permissions = 700 > - rootfs_path = '/foo' > - shadow_file = '%s/etc/shadow' % rootfs_path > - > - m_open = mock.mock_open(read_data='') > - with mock.patch('virtBootstrap.utils.open', m_open, create=True): > - with mock.patch('virtBootstrap.utils.os') as m_os: > - m_os.stat.return_value = [permissions] > - m_os.path.join.return_value = shadow_file > - > - with self.assertRaises(Exception): > - m_open.side_effect = Exception > - utils.set_root_password_in_rootfs(rootfs_path, 'password') > - > - expected_calls = [ > - mock.call.path.join(rootfs_path, 'etc/shadow'), > - mock.call.stat(shadow_file), > - mock.call.chmod(shadow_file, 438), > - mock.call.chmod(shadow_file, permissions) > - ] > - m_os.assert_has_calls(expected_calls) > - > - def test_utils_set_root_password_in_rootfs_store_hash(self): > - """ > - Ensures that set_root_password_in_rootfs() stores the hashed > - root password in shadow file. > - """ > - rootfs_path = '/foo' > - password = 'secret' > - initial_value = '!locked' > - hashed_password = 'hashed_password' > - shadow_content = '\n'.join([ > - "root:%s::0:99999:7:::", > - "bin:*:17004:0:99999:7:::" > - "daemon:*:17004:0:99999:7:::", > - "adm:*:17004:0:99999:7:::" > - ]) > - > - m_open = mock.mock_open(read_data=shadow_content % initial_value) > - with mock.patch('virtBootstrap.utils.open', m_open, create=True): > - with mock.patch('virtBootstrap.utils.os'): > - with mock.patch('passlib.hosts.linux_context.hash') as m_hash: > - m_hash.return_value = hashed_password > - utils.set_root_password_in_rootfs(rootfs_path, password) > - > - m_hash.assert_called_once_with(password) > - m_open().write.assert_called_once_with(shadow_content > - % hashed_password) > - > - ################################### > - # Tests for: set_root_password_in_image() > - ################################### > - @mock.patch('virtBootstrap.utils.execute') > - def test_utils_set_root_password_in_image(self, m_execute): > - """ > - Ensures that set_root_password_in_image() calls virt-edit > - with correct arguments. > - """ > - image, password = 'foo', 'password' > - password_hash = ('$6$rounds=656000$PaQ/H4c/k8Ix9YOM$' > - 'cyD47r9PtAE2LhnkpdbVzsiQbM0/h2S/1Bv' > - 'u/sXqUtCg.3Ijp7TQy/8tEVstxMy5k5v4mh' > - 'CGFqnVv7S6wd.Ah/') > - > - expected_call = [ > - 'virt-edit', > - '-a', image, '/etc/shadow', > - '-e', 's,^root:.*?:,root:%s:,' % utils.re.escape(password_hash)] > - > - hash_function = 'virtBootstrap.utils.passlib.hosts.linux_context.hash' > - with mock.patch(hash_function) as m_hash: > - m_hash.return_value = password_hash > - utils.set_root_password_in_image(image, password) > - > - m_execute.assert_called_once_with(expected_call) > - > - ################################### > - # Tests for: set_root_password() > - ################################### > - @mock.patch('virtBootstrap.utils.set_root_password_in_rootfs') > - def test_utils_set_root_password_dir(self, m_set_root_password_in_rootfs): > - """ > - Ensures that set_root_password() calls set_root_password_in_rootfs() > - when the format is set to "dir". > - """ > - fmt, dest, root_password = 'dir', 'dest', 'root_password' > - utils.set_root_password(fmt, dest, root_password) > - > - m_set_root_password_in_rootfs.assert_called_once_with( > - dest, root_password > - ) > - > - @mock.patch('virtBootstrap.utils.set_root_password_in_image') > - def test_utils_set_root_password_qcow2(self, m_set_root_password_in_image): > - """ > - Ensures that set_root_password() calls set_root_password_in_image() > - when the format is set to "qcow2" with the path to the last > - extracted layer. > - """ > - fmt, dest, root_password = 'qcow2', 'dest', 'root_password' > - layers = ['layer-0.qcow2', 'layer-1.qcow2'] > - > - with mock.patch('os.listdir') as m_listdir: > - m_listdir.return_value = layers > - utils.set_root_password(fmt, dest, root_password) > - > - m_set_root_password_in_image.assert_called_once_with( > - utils.os.path.join(dest, max(layers)), > - root_password > - ) > - > - ################################### > - # Tests for: write_progress() > - ################################### > - def test_utils_write_progress_fill_terminal_width(self): > - """ > - Ensures that write_progress() outputs a message with length > - equal to terminal width and last symbol '\r'. > - """ > - terminal_width = 120 > - prog = {'status': 'status', 'value': 0} > - with mock.patch.multiple(utils, > - subprocess=mock.DEFAULT, > - sys=mock.DEFAULT) as mocked: > - > - (mocked['subprocess'].Popen.return_value.stdout > - .read.return_value) = ("20 %s" % terminal_width).encode() > - > - utils.write_progress(prog) > - > - mocked['subprocess'].Popen.assert_called_once_with( > - ["stty", "size"], > - stdout=mocked['subprocess'].PIPE > - ) > - output_message = mocked['sys'].stdout.write.call_args[0][0] > - mocked['sys'].stdout.write.assert_called_once() > - self.assertEqual(len(output_message), terminal_width + 1) > - self.assertEqual(output_message[-1], '\r') > - > - def test_utils_write_progress_use_default_term_width_on_failure(self): > - """ > - Ensures that write_progress() outputs a message with length equal > - to default terminal width (80) when the detecting terminal width > - has failed. > - """ > - default_terminal_width = 80 > - prog = {'status': 'status', 'value': 0} > - with mock.patch.multiple(utils, > - subprocess=mock.DEFAULT, > - sys=mock.DEFAULT) as mocked: > - mocked['subprocess'].Popen.side_effect = Exception() > - utils.write_progress(prog) > - > - self.assertEqual(len(mocked['sys'].stdout.write.call_args[0][0]), > - default_terminal_width + 1) > - mocked['sys'].stdout.write.assert_called_once() > - > - > -if __name__ == '__main__': > - unittest.main(exit=False) > diff --git a/tests/test_virt_bootstrap.py b/tests/test_virt_bootstrap.py > deleted file mode 100644 > index ff744f7..0000000 > --- a/tests/test_virt_bootstrap.py > +++ /dev/null > @@ -1,464 +0,0 @@ > -# Authors: Radostin Stoyanov <rstoyanov1@xxxxxxxxx> > -# > -# Copyright (C) 2017 Radostin Stoyanov > -# > -# This program is free software: you can redistribute it and/or modify > -# it under the terms of the GNU General Public License as published by > -# the Free Software Foundation, either version 3 of the License, or > -# (at your option) any later version. > - > -# This program is distributed in the hope that it will be useful, > -# but WITHOUT ANY WARRANTY; without even the implied warranty of > -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > -# GNU General Public License for more details. > - > -# You should have received a copy of the GNU General Public License > -# along with this program. If not, see <http://www.gnu.org/licenses/>. > - > - > -""" > -Unit tests for functions defined in virtBootstrap.virt-bootstrap > -""" > - > -from tests import unittest > -from tests import mock > -from tests import virt_bootstrap > -from tests import sources > - > - > -# pylint: disable=invalid-name > -class TestVirtBootstrap(unittest.TestCase): > - """ > - Test cases for virt_bootstrap module > - """ > - > - ################################### > - # Tests for: get_source(source_type) > - ################################### > - def test_get_invaid_source_type_should_fail(self): > - """ > - Ensures that get_source() throws an Exception when invalid source > - name was specified. > - """ > - with self.assertRaises(Exception) as source: > - virt_bootstrap.get_source('invalid') > - self.assertIn('invalid', str(source.exception)) > - > - def test_get_docker_source(self): > - """ > - Ensures that get_source() returns DockerSource when source name > - "docker" is requested. > - """ > - self.assertIs(virt_bootstrap.get_source('docker'), > - sources.DockerSource) > - > - def test_get_file_source(self): > - """ > - Ensures that get_source() returns FileSource when source name > - "file" is requested. > - """ > - self.assertIs(virt_bootstrap.get_source('file'), > - sources.FileSource) > - > - ################################### > - # Tests for: mapping_uid_gid() > - ################################### > - def test_mapping_uid_gid(self): > - """ > - Ensures that mapping_uid_gid() calls map_id() with > - correct parameters. > - """ > - dest = '/path' > - calls = [ > - { # Call 1 > - 'dest': dest, > - 'uid': [[0, 1000, 10]], > - 'gid': [[0, 1000, 10]] > - }, > - { # Call 2 > - 'dest': dest, > - 'uid': [], > - 'gid': [[0, 1000, 10]] > - }, > - { # Call 3 > - 'dest': dest, > - 'uid': [[0, 1000, 10]], > - 'gid': [] > - }, > - { # Call 4 > - 'dest': dest, > - 'uid': [[0, 1000, 10], [500, 500, 10]], > - 'gid': [[0, 1000, 10]] > - } > - ] > - > - expected_calls = [ > - # Expected from call 1 > - mock.call(dest, [0, 1000, 10], [0, 1000, 10]), > - # Expected from call 2 > - mock.call(dest, None, [0, 1000, 10]), > - # Expected from call 3 > - mock.call(dest, [0, 1000, 10], None), > - # Expected from call 4 > - mock.call(dest, [0, 1000, 10], [0, 1000, 10]), > - mock.call(dest, [500, 500, 10], None) > - > - ] > - with mock.patch('virtBootstrap.virt_bootstrap.map_id') as m_map_id: > - for args in calls: > - virt_bootstrap.mapping_uid_gid(args['dest'], > - args['uid'], > - args['gid']) > - > - m_map_id.assert_has_calls(expected_calls) > - > - ################################### > - # Tests for: map_id() > - ################################### > - @mock.patch('os.path.realpath') > - def test_map_id(self, m_realpath): > - """ > - Ensures that the UID/GID mapping applies to all files > - and directories in root file system. > - """ > - root_path = '/root' > - files = ['foo1', 'foo2'] > - m_realpath.return_value = root_path > - > - map_uid = [0, 1000, 10] > - map_gid = [0, 1000, 10] > - new_id = 'new_id' > - > - expected_calls = [ > - mock.call('/root', new_id, new_id), > - mock.call('/root/foo1', new_id, new_id), > - mock.call('/root/foo2', new_id, new_id) > - ] > - > - with mock.patch.multiple('os', > - lchown=mock.DEFAULT, > - lstat=mock.DEFAULT, > - walk=mock.DEFAULT) as mocked: > - > - mocked['walk'].return_value = [(root_path, [], files)] > - mocked['lstat']().st_uid = map_uid[0] > - mocked['lstat']().st_gid = map_gid[0] > - > - get_map_id = 'virtBootstrap.virt_bootstrap.get_map_id' > - with mock.patch(get_map_id) as m_get_map_id: > - m_get_map_id.return_value = new_id > - virt_bootstrap.map_id(root_path, map_uid, map_gid) > - > - mocked['lchown'].assert_has_calls(expected_calls) > - > - ################################### > - # Tests for: get_mapping_opts() > - ################################### > - def test_get_mapping_opts(self): > - """ > - Ensures that get_mapping_opts() returns correct options for > - mapping value. > - """ > - test_values = [ > - { > - 'mapping': [0, 1000, 10], > - 'expected_result': {'first': 0, 'last': 10, 'offset': 1000}, > - }, > - { > - 'mapping': [0, 1000, 10], > - 'expected_result': {'first': 0, 'last': 10, 'offset': 1000}, > - }, > - { > - 'mapping': [500, 1500, 1], > - 'expected_result': {'first': 500, 'last': 501, 'offset': 1000}, > - }, > - { > - 'mapping': [-1, -1, -1], > - 'expected_result': {'first': 0, 'last': 1, 'offset': 0}, > - } > - ] > - > - for test in test_values: > - res = virt_bootstrap.get_mapping_opts(test['mapping']) > - self.assertEqual(test['expected_result'], res) > - > - ################################### > - # Tests for: get_map_id() > - ################################### > - def test_get_map_id(self): > - """ > - Ensures that get_map_id() returns correct UID/GID mapping value. > - """ > - test_values = [ > - { > - 'old_id': 0, > - 'mapping': [0, 1000, 10], > - 'expected_result': 1000 > - }, > - { > - 'old_id': 5, > - 'mapping': [0, 500, 10], > - 'expected_result': 505 > - }, > - { > - 'old_id': 10, > - 'mapping': [0, 100, 10], > - 'expected_result': -1 > - }, > - ] > - for test in test_values: > - opts = virt_bootstrap.get_mapping_opts(test['mapping']) > - res = virt_bootstrap.get_map_id(test['old_id'], opts) > - self.assertEqual(test['expected_result'], res) > - > - ################################### > - # Tests for: parse_idmap() > - ################################### > - def test_parse_idmap(self): > - """ > - Ensures that parse_idmap() returns correct UID/GID mapping value. > - """ > - test_values = [ > - { > - 'mapping': ['0:1000:10', '0:100:10'], > - 'expected_result': [[0, 1000, 10], [0, 100, 10]], > - }, > - { > - 'mapping': ['0:1000:10'], > - 'expected_result': [[0, 1000, 10]], > - }, > - { > - 'mapping': ['500:1500:1'], > - 'expected_result': [[500, 1500, 1]], > - }, > - { > - 'mapping': ['-1:-1:-1'], > - 'expected_result': [[-1, -1, -1]], > - }, > - { > - 'mapping': [], > - 'expected_result': None, > - } > - ] > - for test in test_values: > - res = virt_bootstrap.parse_idmap(test['mapping']) > - self.assertEqual(test['expected_result'], res) > - > - def test_parse_idmap_raise_exception_on_invalid_mapping_value(self): > - """ > - Ensures that parse_idmap() raise ValueError on mapping value. > - """ > - with self.assertRaises(ValueError): > - virt_bootstrap.parse_idmap(['invalid']) > - > - ################################### > - # Tests for: bootstrap() > - ################################### > - def test_bootsrap_creates_directory_if_does_not_exist(self): > - """ > - Ensures that bootstrap() creates destination directory if > - it does not exists. > - """ > - src, dest = 'foo', 'bar' > - with mock.patch.multiple(virt_bootstrap, > - get_source=mock.DEFAULT, > - os=mock.DEFAULT) as mocked: > - mocked['os'].path.exists.return_value = False > - virt_bootstrap.bootstrap(src, dest) > - mocked['os'].path.exists.assert_called_once_with(dest) > - mocked['os'].makedirs.assert_called_once_with(dest) > - > - def test_bootstrap_exit_if_dest_is_invalid(self): > - """ > - Ensures that bootstrap() exits with code 1 when the destination > - path exists but it is not directory. > - """ > - src, dest = 'foo', 'bar' > - with mock.patch.multiple(virt_bootstrap, > - get_source=mock.DEFAULT, > - os=mock.DEFAULT, > - logger=mock.DEFAULT, > - sys=mock.DEFAULT) as mocked: > - mocked['os'].path.exists.return_value = True > - mocked['os'].path.isdir.return_value = False > - virt_bootstrap.bootstrap(src, dest) > - mocked['os'].path.isdir.assert_called_once_with(dest) > - mocked['sys'].exit.assert_called_once_with(1) > - > - def test_bootsrap_exit_if_no_write_access_on_dest(self): > - """ > - Ensures that bootstrap() exits with code 1 when the current user > - has not write permissions on the destination folder. > - """ > - src, dest = 'foo', 'bar' > - with mock.patch.multiple(virt_bootstrap, > - get_source=mock.DEFAULT, > - os=mock.DEFAULT, > - logger=mock.DEFAULT, > - sys=mock.DEFAULT) as mocked: > - mocked['os'].path.exists.return_value = True > - mocked['os'].path.isdir.return_value = True > - mocked['os'].access.return_value = False > - virt_bootstrap.bootstrap(src, dest) > - mocked['os'].access.assert_called_once_with(dest, > - mocked['os'].W_OK) > - mocked['sys'].exit.assert_called_once_with(1) > - > - def test_bootstrap_use_file_source_if_none_was_specified(self): > - """ > - Ensures that bootstrap() calls get_source() with argument > - 'file' when source format is not specified. > - """ > - src, dest = 'foo', 'bar' > - with mock.patch.multiple(virt_bootstrap, > - get_source=mock.DEFAULT, > - os=mock.DEFAULT, > - sys=mock.DEFAULT) as mocked: > - virt_bootstrap.bootstrap(src, dest) > - mocked['get_source'].assert_called_once_with('file') > - > - def test_bootstrap_successful_call(self): > - """ > - Ensures that bootstrap() creates source instance and calls the > - unpack method with destination path as argument. > - """ > - src, dest = 'foo', 'bar' > - with mock.patch.multiple(virt_bootstrap, > - get_source=mock.DEFAULT, > - os=mock.DEFAULT, > - sys=mock.DEFAULT) as mocked: > - mocked['os'].path.exists.return_value = True > - mocked['os'].path.isdir.return_value = True > - mocked['os'].access.return_value = True > - mocked_source = mock.Mock() > - mocked_unpack = mock.Mock() > - mocked_source.return_value.unpack = mocked_unpack > - mocked['get_source'].return_value = mocked_source > - virt_bootstrap.bootstrap(src, dest) > - # sys.exit should not be called > - mocked['sys'].exit.assert_not_called() > - mocked_source.assert_called_once() > - mocked_unpack.assert_called_once_with(dest) > - > - def test_bootstrap_all_params_are_passed_to_source_instance(self): > - """ > - Ensures that bootstrap() is passing all arguments to newly created > - source instance. > - """ > - params_list = ['dest', 'fmt', 'username', 'password', 'root_password', > - 'not_secure', 'no_cache', 'progress_cb'] > - params = {param: param for param in params_list} > - > - for kw_param in params_list: > - params[kw_param] = kw_param > - > - with mock.patch.multiple(virt_bootstrap, > - get_source=mock.DEFAULT, > - os=mock.DEFAULT, > - urlparse=mock.DEFAULT, > - progress=mock.DEFAULT, > - utils=mock.DEFAULT, > - sys=mock.DEFAULT) as mocked: > - mocked['os'].path.exists.return_value = True > - mocked['os'].path.isdir.return_value = True > - mocked['os'].access.return_value = True > - > - mocked['progress'].Progress.return_value = params['progress_cb'] > - > - mocked_source = mock.Mock() > - mocked_unpack = mock.Mock() > - mocked_source.return_value.unpack = mocked_unpack > - mocked['get_source'].return_value = mocked_source > - > - mocked_uri = mock.Mock() > - mocked['urlparse'].return_value = mocked_uri > - params['uri'] = mocked_uri > - > - virt_bootstrap.bootstrap(**params) > - # sys.exit should not be called > - mocked['sys'].exit.assert_not_called() > - > - mocked_source.assert_called_once() > - mocked_unpack.assert_called_once_with(params['dest']) > - > - called_with_args, called_with_kwargs = mocked_source.call_args > - self.assertEqual(called_with_args, ()) > - > - del params['dest'] > - del params['root_password'] > - params['progress'] = params.pop('progress_cb') > - for kwarg in params: > - self.assertEqual(called_with_kwargs[kwarg], params[kwarg]) > - > - def test_if_bootstrap_calls_set_root_password(self): > - """ > - Ensures that bootstrap() calls set_root_password() when the argument > - root_password is specified. > - """ > - src, fmt, dest, root_password = 'foo', 'fmt', 'bar', 'root_password' > - with mock.patch.multiple(virt_bootstrap, > - get_source=mock.DEFAULT, > - os=mock.DEFAULT, > - utils=mock.DEFAULT, > - sys=mock.DEFAULT) as mocked: > - mocked['os'].path.exists.return_value = True > - mocked['os'].path.isdir.return_value = True > - mocked['os'].access.return_value = True > - > - virt_bootstrap.bootstrap(src, dest, > - fmt=fmt, > - root_password=root_password) > - > - mocked['utils'].set_root_password.assert_called_once_with( > - fmt, dest, root_password) > - > - def test_if_bootstrap_calls_set_mapping_uid_gid(self): > - """ > - Ensures that bootstrap() calls mapping_uid_gid() when the argument > - uid_map or gid_map is specified. > - """ > - src, dest, uid_map, gid_map = 'foo', 'bar', 'id', 'id' > - expected_calls = [ > - mock.call('bar', None, 'id'), > - mock.call('bar', 'id', None), > - mock.call('bar', 'id', 'id') > - ] > - > - with mock.patch.multiple(virt_bootstrap, > - get_source=mock.DEFAULT, > - os=mock.DEFAULT, > - mapping_uid_gid=mock.DEFAULT, > - utils=mock.DEFAULT, > - sys=mock.DEFAULT) as mocked: > - mocked['os'].path.exists.return_value = True > - mocked['os'].path.isdir.return_value = True > - mocked['os'].access.return_value = True > - > - virt_bootstrap.bootstrap(src, dest, gid_map=gid_map) > - virt_bootstrap.bootstrap(src, dest, uid_map=uid_map) > - virt_bootstrap.bootstrap(src, dest, > - uid_map=uid_map, gid_map=gid_map) > - mocked['mapping_uid_gid'].assert_has_calls(expected_calls) > - > - ################################### > - # Tests for: set_logging_conf() > - ################################### > - def test_if_logging_level_format_handler_are_set(self): > - """ > - Ensures that set_logging_conf() sets log level and adds new stream > - handler with formatting. > - """ > - with mock.patch('virtBootstrap.virt_bootstrap.logging') as m_logging: > - mocked_stream_hdlr = mock.Mock() > - m_logger = mock.Mock() > - m_logging.getLogger.return_value = m_logger > - m_logging.StreamHandler.return_value = mocked_stream_hdlr > - virt_bootstrap.set_logging_conf() > - m_logging.getLogger.assert_called_once_with('virtBootstrap') > - mocked_stream_hdlr.setFormatter.assert_called_once() > - m_logger.addHandler.assert_called_once_with(mocked_stream_hdlr) > - m_logger.setLevel.assert_called_once() > - > - > -if __name__ == '__main__': > - unittest.main(exit=False) ACK -- Cedric _______________________________________________ virt-tools-list mailing list virt-tools-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/virt-tools-list