On Mon, 2017-07-24 at 09:14 +0100, Radostin Stoyanov wrote: > --- > tests/test_utils.py | 702 ++++++++++++++++++++++++++++++++++++++++++++++++++++ > 1 file changed, 702 insertions(+) > create mode 100644 tests/test_utils.py > > diff --git a/tests/test_utils.py b/tests/test_utils.py > new file mode 100644 > index 0000000..f1b31bd > --- /dev/null > +++ b/tests/test_utils.py > @@ -0,0 +1,702 @@ > +# Authors: > +# Cedric Bosdonnat <cbosdonnat@xxxxxxxx> > +# Radostin Stoyanov <rstoyanov1@xxxxxxxxx> > +# > +# Copyright (C) 2017 SUSE, Inc. > +# Copyright (C) 2017 Radostin Stoyanov > +# Again > +# This program is free software: you can redistribute it and/or modify > +# it under the terms of the GNU General Public License as published by > +# the Free Software Foundation, either version 3 of the License, or > +# (at your option) any later version. > + > +# This program is distributed in the hope that it will be useful, > +# but WITHOUT ANY WARRANTY; without even the implied warranty of > +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > +# GNU General Public License for more details. > + > +# You should have received a copy of the GNU General Public License > +# along with this program. If not, see <http://www.gnu.org/licenses/>. > + > + > +""" > +Unit tests for functions defined in virtBootstrap.utils > +""" > + > +from tests import unittest > +from tests import mock > +from tests import utils > +try: > + # pylint: disable=redefined-builtin > + from importlib import reload > +except ImportError: > + pass > + > + > +# pylint: disable=invalid-name > +# pylint: disable=too-many-public-methods > +class TestUtils(unittest.TestCase): > + """ > + Ensures that functions defined in the utils module of virtBootstrap > + work as expected. > + """ > + > + ################################### > + # Tests for: checksum() > + ################################### > + def test_utils_checksum_return_false_on_invalid_hash(self): > + """ > + Ensures that checksum() returns False if the actual and expected > + hash sum of file are not equal. > + """ > + with mock.patch.multiple(utils, > + open=mock.DEFAULT, > + logger=mock.DEFAULT, > + hashlib=mock.DEFAULT) as mocked: > + path, sum_type, sum_expected = '/foo', 'sha256', 'bar' > + mocked['hashlib'].sha256.hexdigest.return_value = False > + self.assertFalse(utils.checksum(path, sum_type, sum_expected)) > + > + def test_utils_checksum_return_false_if_file_could_not_be_opened(self): > + """ > + Ensures that checksum() returns False if the file to be checked > + cannot be open for read. > + """ > + with mock.patch.multiple(utils, > + open=mock.DEFAULT, > + logger=mock.DEFAULT, > + hashlib=mock.DEFAULT) as mocked: > + mocked['open'].side_effect = IOError() > + self.assertFalse(utils.checksum('foo', 'sha256', 'bar')) > + > + def test_utils_checksum_return_true_on_valid_hash(self): > + """ > + Ensures that checksum() returns True when the actual and expected > + hash sum of file are equal. > + """ > + with mock.patch.multiple(utils, > + open=mock.DEFAULT, > + logger=mock.DEFAULT, > + hashlib=mock.DEFAULT) as mocked: > + path, sum_type, sum_expected = '/foo', 'sha256', 'bar' > + mocked['hashlib'].sha256.return_value.hexdigest.return_value \ > + = sum_expected > + self.assertTrue(utils.checksum(path, sum_type, sum_expected)) > + > + ################################### > + # Tests for: execute() > + ################################### > + def test_utils_execute_logging_on_successful_proc_call(self): > + """ > + Ensures that execute() creates log record of cmd, stdout and stderr > + when the exit code of process is 0. > + """ > + with mock.patch.multiple(utils, > + logger=mock.DEFAULT, > + Popen=mock.DEFAULT) as mocked: > + cmd = ['foo'] > + output, err = 'test_out', 'test_err' > + > + mocked['Popen'].return_value.returncode = 0 > + (mocked['Popen'].return_value > + .communicate.return_value) = (output.encode(), err.encode()) > + > + utils.execute(cmd) > + mocked['logger'].debug.assert_any_call("Call command:\n%s", cmd[0]) > + mocked['logger'].debug.assert_any_call("Stdout:\n%s", output) > + mocked['logger'].debug.assert_any_call("Stderr:\n%s", err) > + > + def test_utils_execute_raise_error_on_unsuccessful_proc_call(self): > + """ > + Ensures that execute() raise CalledProcessError exception when the > + exit code of process is not 0. > + """ > + with mock.patch('virtBootstrap.utils.Popen') as m_popen: > + m_popen.return_value.returncode = 1 > + m_popen.return_value.communicate.return_value = (b'output', b'err') > + with self.assertRaises(utils.CalledProcessError): > + utils.execute(['foo']) > + > + ################################### > + # Tests for: safe_untar() > + ################################### > + def test_utils_safe_untar_calls_execute(self): > + """ > + Ensures that safe_untar() calls execute with virt-sandbox > + command to extract source files to destination folder. > + Test for users with EUID 0 and 1000. > + """ > + with mock.patch('virtBootstrap.utils.os.geteuid') as m_geteuid: > + for uid in [0, 1000]: > + m_geteuid.return_value = uid > + reload(utils) > + with mock.patch('virtBootstrap.utils.execute') as m_execute: > + src, dest = 'foo', 'bar' > + utils.safe_untar('foo', 'bar') > + cmd = ['virt-sandbox', > + '-c', utils.LIBVIRT_CONN, > + '-m', 'host-bind:/mnt=' + dest, > + '--', > + '/bin/tar', 'xf', src, > + '-C', '/mnt', > + '--exclude', 'dev/*'] > + m_execute.assert_called_once_with(cmd) > + > + ################################### > + # Tests for: bytes_to_size() > + ################################### > + def test_utils_bytes_to_size(self): > + """ > + Validates the output of bytes_to_size() for some test cases. > + """ > + test_values = { > + 0: '0', 1: '1', 512: '512', 1000: '0.98 KiB', 1024: '1 KiB', > + 4096: '4 KiB', 5120: '5 KiB', 10 ** 10: '9.31 GiB' > + } > + for value in test_values: > + self.assertEqual(utils.bytes_to_size(value), test_values[value]) > + > + ################################### > + # Tests for: size_to_bytes() > + ################################### > + def test_utils_size_to_bytes(self): > + """ > + Validates the output of size_to_bytes() for some test cases. > + """ > + test_values = [1, '0'] > + test_formats = ['TB', 'GB', 'MB', 'KB', 'B'] > + expected_output = [1099511627776, 1073741824, 1048576, 1024, 1, > + 0, 0, 0, 0, 0] > + i = 0 > + for value in test_values: > + for fmt in test_formats: > + self.assertEqual(utils.size_to_bytes(value, fmt), > + expected_output[i]) > + i += 1 > + > + ################################### > + # Tests for: log_layer_extract() > + ################################### > + def test_utils_log_layer_extract(self): > + """ > + Ensures that log_layer_extract() updates the progress and creates > + log record with debug level. > + """ > + m_progress = mock.Mock() > + layer = ['sum_type', 'sum_value', 'layer_file', 'layer_size'] > + with mock.patch.multiple(utils, logger=mock.DEFAULT, > + bytes_to_size=mock.DEFAULT) as mocked: > + utils.log_layer_extract(layer, 'foo', 'bar', m_progress) > + mocked['bytes_to_size'].assert_called_once_with('layer_size') > + mocked['logger'].debug.assert_called_once() > + m_progress.assert_called_once() > + > + ################################### > + # Tests for: get_mime_type() > + ################################### > + @mock.patch('virtBootstrap.utils.Popen') > + def test_utils_get_mime_type(self, m_popen): > + """ > + Ensures that get_mime_type() returns the detected MIME type > + of /usr/bin/file. > + """ > + path = "foo" > + mime = "application/x-gzip" > + stdout = ('%s: %s' % (path, mime)).encode() > + m_popen.return_value.stdout.read.return_value = stdout > + self.assertEqual(utils.get_mime_type(path), mime) > + m_popen.assert_called_once_with(["/usr/bin/file", "--mime-type", path], > + stdout=utils.PIPE) > + > + ################################### > + # Tests for: untar_layers() > + ################################### > + def test_utils_untar_all_layers_in_order(self): > + """ > + Ensures that untar_layers() iterates through all passed layers > + in order. > + """ > + layers = ['l1', 'l2', 'l3'] > + layers_list = [['', '', layer] for layer in layers] > + dest_dir = '/foo' > + expected_calls = [mock.call(layer, dest_dir) for layer in layers] > + with mock.patch.multiple(utils, > + safe_untar=mock.DEFAULT, > + log_layer_extract=mock.DEFAULT) as mocked: > + utils.untar_layers(layers_list, dest_dir, mock.Mock()) > + mocked['safe_untar'].assert_has_calls(expected_calls) > + > + ################################### > + # Tests for: create_qcow2() > + ################################### > + def _apply_test_to_create_qcow2(self, expected_calls, *args): > + """ > + This method contains common test pattern used in the next two > + test cases. > + """ > + with mock.patch.multiple(utils, > + execute=mock.DEFAULT, > + logger=mock.DEFAULT, > + get_mime_type=mock.DEFAULT) as mocked: > + mocked['get_mime_type'].return_value = 'application/x-gzip' > + utils.create_qcow2(*args) > + mocked['execute'].assert_has_calls(expected_calls) > + > + def test_utils_create_qcow2_base_layer(self): > + """ > + Ensures that create_qcow2() creates base layer when > + backing_file = None. > + """ > + tar_file = 'foo' > + layer_file = 'bar' > + size = '5G' > + backing_file = None > + > + expected_calls = [ > + mock.call(["qemu-img", "create", "-f", "qcow2", layer_file, size]), > + > + mock.call(['virt-format', > + '--format=qcow2', > + '--partition=none', > + '--filesystem=ext3', > + '-a', layer_file]), > + > + mock.call(['guestfish', > + '-a', layer_file, > + '-m', '/dev/sda', > + 'tar-in', tar_file, '/', 'compress:gzip']) > + ] > + > + self._apply_test_to_create_qcow2(expected_calls, tar_file, layer_file, > + backing_file, size) > + > + def test_utils_create_qcow2_layer_with_backing_chain(self): > + """ > + Ensures that create_qcow2() creates new layer with backing chains > + when backing_file is specified. > + """ > + tar_file = 'foo' > + layer_file = 'bar' > + backing_file = 'base' > + size = '5G' > + > + expected_calls = [ > + mock.call(['qemu-img', 'create', > + '-b', backing_file, > + '-f', 'qcow2', > + layer_file, size]), > + > + mock.call(['guestfish', > + '-a', layer_file, > + '-m', '/dev/sda', > + 'tar-in', tar_file, '/', 'compress:gzip']) > + ] > + > + self._apply_test_to_create_qcow2(expected_calls, tar_file, layer_file, > + backing_file, size) > + > + ################################### > + # Tests for: extract_layers_in_qcow2() > + ################################### > + def test_utils_if_all_layers_extracted_in_order_in_qcow2(self): > + """ > + Ensures that extract_layers_in_qcow2() iterates through all > + layers in order. > + """ > + layers = ['l1', 'l2', 'l3'] > + layers_list = [['', '', layer] for layer in layers] > + dest_dir = '/foo' > + > + # Generate expected calls > + expected_calls = [] > + qcow2_backing_file = None > + for index, layer in enumerate(layers): > + qcow2_layer_file = dest_dir + "/layer-%s.qcow2" % index > + expected_calls.append( > + mock.call(layer, qcow2_layer_file, qcow2_backing_file)) > + qcow2_backing_file = qcow2_layer_file > + > + # Mocking out and execute > + with mock.patch.multiple(utils, > + create_qcow2=mock.DEFAULT, > + log_layer_extract=mock.DEFAULT) as mocked: > + utils.extract_layers_in_qcow2(layers_list, dest_dir, mock.Mock()) > + > + # Check actual calls > + mocked['create_qcow2'].assert_has_calls(expected_calls) > + > + ################################### > + # Tests for: get_image_dir() > + ################################### > + def test_utils_getimage_dir(self): > + """ > + Ensures that get_image_dir() returns path to DEFAULT_IMG_DIR > + if the no_cache argument is set to False and create it if > + does not exist. > + """ > + # Perform this test for UID 0 and 1000 > + for uid in [0, 1000]: > + with mock.patch('os.geteuid') as m_geteuid: > + m_geteuid.return_value = uid > + reload(utils) > + with mock.patch('os.makedirs') as m_makedirs: > + with mock.patch('os.path.exists') as m_path_exists: > + m_path_exists.return_value = False > + self.assertEqual(utils.get_image_dir(False), > + utils.DEFAULT_IMG_DIR) > + m_makedirs.assert_called_once_with(utils.DEFAULT_IMG_DIR) > + > + @mock.patch('tempfile.mkdtemp') > + def test_utils_getimage_dir_no_cache(self, m_mkdtemp): > + """ > + Ensures that get_image_dir() returns temporary file path created > + by tempfile.mkdtemp. > + """ > + m_mkdtemp.return_value = 'foo' > + self.assertEqual(utils.get_image_dir(True), 'foo') > + m_mkdtemp.assert_called_once() > + > + ################################### > + # Tests for: get_image_details() > + ################################### > + @mock.patch('virtBootstrap.utils.Popen') > + def test_utils_get_image_details_raise_error_on_fail(self, m_popen): > + """ > + Ensures that get_image_details() throws ValueError exception > + when stderr from skopeo is provided. > + """ > + src = 'docker://foo' > + m_popen.return_value.communicate.return_value = [b'', b'Error'] > + with self.assertRaises(ValueError): > + utils.get_image_details(src) > + > + @mock.patch('virtBootstrap.utils.Popen') > + def test_utils_get_image_details_return_json_obj_on_success(self, m_popen): > + """ > + Ensures that get_image_details() returns python dictionary which > + represents the data provided from stdout of skopeo when stderr > + is not present. > + """ > + src = 'docker://foo' > + json_dict = {'foo': 'bar'} > + stdout = utils.json.dumps(json_dict).encode() > + m_popen.return_value.communicate.return_value = [stdout, ''] > + self.assertDictEqual(utils.get_image_details(src), json_dict) > + > + def test_utils_get_image_details_all_argument_passed(self): > + """ > + Ensures that get_image_details() pass all argument values to > + skopeo inspect. > + """ > + src = 'docker://foo' > + raw, insecure = True, True > + username, password = 'user', 'password' > + cmd = ['skopeo', 'inspect', src, > + '--raw', > + '--tls-verify=false', > + "--creds=%s:%s" % (username, password)] > + > + with mock.patch.multiple(utils, > + Popen=mock.DEFAULT, > + PIPE=mock.DEFAULT) as mocked: > + mocked['Popen'].return_value.communicate.return_value = [b'{}', > + b''] > + utils.get_image_details(src, raw, insecure, username, password) > + > + mocked['Popen'].assert_called_once_with(cmd, > + stdout=mocked['PIPE'], > + stderr=mocked['PIPE']) > + > + ################################### > + # Tests for: is_new_layer_message() > + ################################### > + def test_utils_is_new_layer_message(self): > + """ > + Ensures that is_new_layer_message() returns True when message > + from the skopeo's stdout indicates processing of new layer > + and False otherwise. > + """ > + > + valid_msgs = [ > + "Copying blob sha256:be232718519c940b04bc57", > + "Skipping fetch of repeat blob sha256:75c416ea735c42a4a0b2" > + ] > + > + invalid_msgs = [ > + 'Copying config sha256', 'test', '' > + ] > + > + for msg in valid_msgs: > + self.assertTrue(utils.is_new_layer_message(msg)) > + for msg in invalid_msgs: > + self.assertFalse(utils.is_new_layer_message(msg)) > + > + ################################### > + # Tests for: is_layer_config_message() > + ################################### > + def test_utils_is_layer_config_message(self): > + """ > + Ensures that is_layer_config_message() returns True when message > + from the skopeo's stdout indicates processing of manifest file > + of container image and False otherwise. > + """ > + invalid_msgs = [ > + "Copying blob sha256:be232718519c940b04bc57", > + "Skipping fetch of repeat blob sha256:75c416ea735c42a4a0b2", > + '' > + ] > + > + valid_msg = 'Copying config sha256:d355ed3537e94e76389fd78b7724' > + > + self.assertTrue(utils.is_layer_config_message(valid_msg)) > + for msg in invalid_msgs: > + self.assertFalse(utils.is_layer_config_message(msg)) > + > + ################################### > + # Tests for: make_async() > + ################################### > + def test_utils_make_async(self): > + """ > + Ensures that make_async() sets O_NONBLOCK flag on PIPE. > + """ > + > + pipe = utils.Popen(["echo"], stdout=utils.PIPE).stdout > + fd = pipe.fileno() > + F_GETFL = utils.fcntl.F_GETFL > + O_NONBLOCK = utils.os.O_NONBLOCK > + > + self.assertFalse(utils.fcntl.fcntl(fd, F_GETFL) & O_NONBLOCK) > + utils.make_async(fd) > + self.assertTrue(utils.fcntl.fcntl(fd, F_GETFL) & O_NONBLOCK) > + > + ################################### > + # Tests for: read_async() > + ################################### > + def test_utils_read_async_successful_read(self): > + """ > + Ensures that read_async() calls read() of passed file descriptor. > + """ > + m_fd = mock.MagicMock() > + utils.read_async(m_fd) > + m_fd.read.assert_called_once() > + > + def test_utils_read_async_return_empty_str_on_EAGAIN_error(self): > + """ > + Ensures that read_async() ignores EAGAIN errors and returns > + empty string. > + """ > + m_fd = mock.MagicMock() > + m_fd.read.side_effect = IOError(utils.errno.EAGAIN, '') > + self.assertEqual(utils.read_async(m_fd), '') > + > + def test_utils_read_async_raise_errors(self): > + """ > + Ensures that read_async() does not ignore IOError which is different > + than EAGAIN and throws an exception. > + """ > + m_fd = mock.MagicMock() > + m_fd.read.side_effect = IOError() > + with self.assertRaises(IOError): > + utils.read_async(m_fd) > + > + ################################### > + # Tests for: str2float() > + ################################### > + def test_utils_str2float(self): > + """ > + Validates the output of str2float() for some test cases. > + """ > + test_values = {'1': 1.0, 'test': None, '0': 0.0, '1.25': 1.25} > + for test in test_values: > + self.assertEqual(utils.str2float(test), test_values[test]) > + > + ################################### > + # Tests for: set_root_password_in_rootfs() > + ################################### > + def test_utils_set_root_password_in_rootfs_restore_permissions(self): > + """ > + Ensures that set_root_password_in_rootfs() restore shadow > + file permissions after edit. > + """ > + permissions = 700 > + rootfs_path = '/foo' > + shadow_file = '%s/etc/shadow' % rootfs_path > + > + m_open = mock.mock_open(read_data='') > + with mock.patch('virtBootstrap.utils.open', m_open, create=True): > + with mock.patch('virtBootstrap.utils.os') as m_os: > + m_os.stat.return_value = [permissions] > + m_os.path.join.return_value = shadow_file > + utils.set_root_password_in_rootfs(rootfs_path, 'password') > + > + expected_calls = [ > + mock.call.path.join(rootfs_path, 'etc/shadow'), > + mock.call.stat(shadow_file), > + mock.call.chmod(shadow_file, 438), > + mock.call.chmod(shadow_file, permissions) > + ] > + m_os.assert_has_calls(expected_calls) > + > + def test_utils_set_root_password_in_rootfs_restore_permissions_fail(self): > + """ > + Ensures that set_root_password_in_rootfs() restore shadow file > + permissions in case of failure. > + """ > + permissions = 700 > + rootfs_path = '/foo' > + shadow_file = '%s/etc/shadow' % rootfs_path > + > + m_open = mock.mock_open(read_data='') > + with mock.patch('virtBootstrap.utils.open', m_open, create=True): > + with mock.patch('virtBootstrap.utils.os') as m_os: > + m_os.stat.return_value = [permissions] > + m_os.path.join.return_value = shadow_file > + > + with self.assertRaises(Exception): > + m_open.side_effect = Exception > + utils.set_root_password_in_rootfs(rootfs_path, 'password') > + > + expected_calls = [ > + mock.call.path.join(rootfs_path, 'etc/shadow'), > + mock.call.stat(shadow_file), > + mock.call.chmod(shadow_file, 438), > + mock.call.chmod(shadow_file, permissions) > + ] > + m_os.assert_has_calls(expected_calls) > + > + def test_utils_set_root_password_in_rootfs_store_hash(self): > + """ > + Ensures that set_root_password_in_rootfs() stores the hashed > + root password in shadow file. > + """ > + rootfs_path = '/foo' > + password = 'secret' > + initial_value = '!locked' > + hashed_password = 'hashed_password' > + shadow_content = '\n'.join([ > + "root:%s::0:99999:7:::", > + "bin:*:17004:0:99999:7:::" > + "daemon:*:17004:0:99999:7:::", > + "adm:*:17004:0:99999:7:::" > + ]) > + > + m_open = mock.mock_open(read_data=shadow_content % initial_value) > + with mock.patch('virtBootstrap.utils.open', m_open, create=True): > + with mock.patch('virtBootstrap.utils.os'): > + with mock.patch('passlib.hosts.linux_context.hash') as m_hash: > + m_hash.return_value = hashed_password > + utils.set_root_password_in_rootfs(rootfs_path, password) > + > + m_hash.assert_called_once_with(password) > + m_open().write.assert_called_once_with(shadow_content > + % hashed_password) > + > + ################################### > + # Tests for: set_root_password_in_image() > + ################################### > + @mock.patch('virtBootstrap.utils.execute') > + def test_utils_set_root_password_in_image(self, m_execute): > + """ > + Ensures that set_root_password_in_image() calls virt-edit > + with correct arguments. > + """ > + image, password = 'foo', 'password' > + password_hash = ('$6$rounds=656000$PaQ/H4c/k8Ix9YOM$' > + 'cyD47r9PtAE2LhnkpdbVzsiQbM0/h2S/1Bv' > + 'u/sXqUtCg.3Ijp7TQy/8tEVstxMy5k5v4mh' > + 'CGFqnVv7S6wd.Ah/') > + > + expected_call = [ > + 'virt-edit', > + '-a', image, '/etc/shadow', > + '-e', 's,^root:.*?:,root:%s:,' % utils.re.escape(password_hash)] > + > + hash_function = 'virtBootstrap.utils.passlib.hosts.linux_context.hash' > + with mock.patch(hash_function) as m_hash: > + m_hash.return_value = password_hash > + utils.set_root_password_in_image(image, password) > + > + m_execute.assert_called_once_with(expected_call) > + > + ################################### > + # Tests for: set_root_password() > + ################################### > + @mock.patch('virtBootstrap.utils.set_root_password_in_rootfs') > + def test_utils_set_root_password_dir(self, m_set_root_password_in_rootfs): > + """ > + Ensures that set_root_password() calls set_root_password_in_rootfs() > + when the format is set to "dir". > + """ > + fmt, dest, root_password = 'dir', 'dest', 'root_password' > + utils.set_root_password(fmt, dest, root_password) > + > + m_set_root_password_in_rootfs.assert_called_once_with( > + dest, root_password > + ) > + > + @mock.patch('virtBootstrap.utils.set_root_password_in_image') > + def test_utils_set_root_password_dir(self, m_set_root_password_in_image): > + """ > + Ensures that set_root_password() calls set_root_password_in_image() > + when the format is set to "qcow2" with the path to the last > + extracted layer. > + """ > + fmt, dest, root_password = 'qcow2', 'dest', 'root_password' > + layers = ['layer-0.qcow2', 'layer-1.qcow2'] > + > + with mock.patch('os.listdir') as m_listdir: > + m_listdir.return_value = layers > + utils.set_root_password(fmt, dest, root_password) > + > + m_set_root_password_in_image.assert_called_once_with( > + utils.os.path.join(dest, max(layers)), > + root_password > + ) > + > + ################################### > + # Tests for: write_progress() > + ################################### > + def test_utils_write_progress_fill_terminal_width(self): > + """ > + Ensures that write_progress() outputs a message with length > + equal to terminal width and last symbol '\r'. > + """ > + terminal_width = 120 > + prog = {'status': 'status', 'value': 0} > + with mock.patch.multiple(utils, > + Popen=mock.DEFAULT, > + PIPE=mock.DEFAULT, > + sys=mock.DEFAULT) as mocked: > + > + (mocked['Popen'].return_value.stdout > + .read.return_value) = ("20 %s" % terminal_width).encode() > + > + utils.write_progress(prog) > + > + mocked['Popen'].assert_called_once_with(["stty", "size"], > + stdout=mocked['PIPE']) > + output_message = mocked['sys'].stdout.write.call_args[0][0] > + mocked['sys'].stdout.write.assert_called_once() > + self.assertEqual(len(output_message), terminal_width + 1) > + self.assertEqual(output_message[-1], '\r') > + > + def test_utils_write_progress_use_default_term_width_on_failure(self): > + """ > + Ensures that write_progress() outputs a message with length equal > + to default terminal width (80) when the detecting terminal width > + has failed. > + """ > + default_terminal_width = 80 > + prog = {'status': 'status', 'value': 0} > + with mock.patch.multiple(utils, Popen=mock.DEFAULT, > + sys=mock.DEFAULT) as mocked: > + mocked['Popen'].side_effect = Exception() > + utils.write_progress(prog) > + > + self.assertEqual(len(mocked['sys'].stdout.write.call_args[0][0]), > + default_terminal_width + 1) > + mocked['sys'].stdout.write.assert_called_once() > + > + > +if __name__ == '__main__': > + unittest.main(exit=False) ACK -- Cedric _______________________________________________ virt-tools-list mailing list virt-tools-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/virt-tools-list