Remove the functions set_root_password(), set_root_password_in_image() and replace the unit tests with more abstract form of testing which creates qcow2 images and test if the shadow file has been changed. --- src/virtBootstrap/sources/docker_source.py | 4 +- src/virtBootstrap/sources/file_source.py | 4 +- src/virtBootstrap/utils.py | 72 +++++++++++++++++++----------- src/virtBootstrap/virt_bootstrap.py | 14 +++--- tests/test_build_qcow2_image.py | 57 +++++++++++++++++++++++ tests/test_utils.py | 62 ------------------------- tests/test_virt_bootstrap.py | 12 ++--- 7 files changed, 124 insertions(+), 101 deletions(-) diff --git a/src/virtBootstrap/sources/docker_source.py b/src/virtBootstrap/sources/docker_source.py index 45e6c1d..207d166 100644 --- a/src/virtBootstrap/sources/docker_source.py +++ b/src/virtBootstrap/sources/docker_source.py @@ -65,6 +65,7 @@ class DockerSource(object): self.password = kwargs.get('password', None) self.uid_map = kwargs.get('uid_map', None) self.gid_map = kwargs.get('gid_map', None) + self.root_password = kwargs.get('root_password', None) self.output_format = kwargs.get('fmt', utils.DEFAULT_OUTPUT_FORMAT) self.insecure = kwargs.get('not_secure', False) self.no_cache = kwargs.get('no_cache', False) @@ -279,7 +280,8 @@ class DockerSource(object): dest=dest, progress=self.progress, uid_map=self.uid_map, - gid_map=self.gid_map + gid_map=self.gid_map, + root_password=self.root_password ) else: raise Exception("Unknown format:" + self.output_format) diff --git a/src/virtBootstrap/sources/file_source.py b/src/virtBootstrap/sources/file_source.py index 70b91db..ef03741 100644 --- a/src/virtBootstrap/sources/file_source.py +++ b/src/virtBootstrap/sources/file_source.py @@ -49,6 +49,7 @@ class FileSource(object): self.output_format = kwargs.get('fmt', utils.DEFAULT_OUTPUT_FORMAT) self.uid_map = kwargs.get('uid_map', None) self.gid_map = kwargs.get('gid_map', None) + self.root_password = kwargs.get('root_password', None) self.progress = kwargs['progress'].update_progress def unpack(self, dest): @@ -72,7 +73,8 @@ class FileSource(object): dest=dest, progress=self.progress, uid_map=self.uid_map, - gid_map=self.gid_map + gid_map=self.gid_map, + root_password=self.root_password ) else: raise Exception("Unknown format:" + self.output_format) diff --git a/src/virtBootstrap/utils.py b/src/virtBootstrap/utils.py index 66679c5..0328bbd 100644 --- a/src/virtBootstrap/utils.py +++ b/src/virtBootstrap/utils.py @@ -32,7 +32,6 @@ import sys import tarfile import tempfile import logging -import re import guestfs import passlib.hosts @@ -65,6 +64,7 @@ class Build_QCOW2_Image(object): @param tar_files: List of tar files from which to create rootfs @param uid_map: Mappings for UID of files in rootfs @param gid_map: Mappings for GID of files in rootfs + @param root_password: Root password to be set @param dest: Destination directory where qcow2 images will be stored @param progress: Instance of the progress module @@ -80,16 +80,22 @@ class Build_QCOW2_Image(object): self.progress = kwargs['progress'] self.uid_map = kwargs.get('uid_map', None) self.gid_map = kwargs.get('gid_map', None) + self.root_password = kwargs.get('root_password', None) self.fmt = 'qcow2' + nlayers = len(self.tar_files) + # Add an overlay layer to apply changes in the shadow file + if self.root_password: + nlayers += 1 self.qcow2_files = [os.path.join(kwargs['dest'], 'layer-%s.qcow2' % i) - for i in range(len(self.tar_files))] + for i in range(nlayers)] self.g = guestfs.GuestFS(python_return_dict=True) self.create_base_qcow2_layer(self.tar_files[0], self.qcow2_files[0]) if len(self.tar_files) > 1: self.create_backing_chains() self.g.shutdown() + self.set_root_password() def create_disk(self, qcow2_file, backingfile=None, readonly=False): """ @@ -184,6 +190,45 @@ class Build_QCOW2_Image(object): if new_uid != -1 or new_gid != -1: self.g.lchown(new_uid, new_gid, os.path.join('/', member.name)) + def set_root_password(self): + """ + Set root password in the shadow file of image. + + Mount the last the layer to update the shadow file with the + hash for the root password. + """ + if self.root_password is None: + return + + self.progress("Setting root password", logger=logger) + + self.create_disk( + self.qcow2_files[-1], + backingfile=self.qcow2_files[-2] + ) + + self.g.launch() + self.g.mount(self.g.list_devices()[0], '/') + + if not self.g.is_file('/etc/shadow'): + logger.error('shadow file was not found in this image') + return + + shadow_content = self.g.read_file('/etc/shadow').split('\n') + + if not shadow_content: + logger.error('shadow file was empty') + return + + # Note: 'shadow_content' is of type list, thus pass-by-reference + # is used here. + set_password_in_shadow_content( + shadow_content, + self.root_password + ) + self.g.write('/etc/shadow', '\n'.join(shadow_content)) + self.g.umount('/') + def get_compression_type(tar_file): """ @@ -472,29 +517,6 @@ def set_root_password_in_rootfs(rootfs, password): os.chmod(shadow_file, shadow_file_permissions) -def set_root_password_in_image(image, password): - """ - Set password on the root user within image - """ - password_hash = passlib.hosts.linux_context.hash(password) - execute(['virt-edit', - '-a', image, '/etc/shadow', - '-e', 's,^root:.*?:,root:%s:,' % re.escape(password_hash)]) - - -def set_root_password(fmt, dest, root_password): - """ - Set root password - """ - if fmt == "dir": - set_root_password_in_rootfs(dest, root_password) - elif fmt == "qcow2": - layers = [layer for layer in os.listdir(dest) - if layer.startswith('layer-')] - set_root_password_in_image(os.path.join(dest, max(layers)), - root_password) - - def write_progress(prog): """ Write progress output to console diff --git a/src/virtBootstrap/virt_bootstrap.py b/src/virtBootstrap/virt_bootstrap.py index 6c1e944..cbd9f0c 100755 --- a/src/virtBootstrap/virt_bootstrap.py +++ b/src/virtBootstrap/virt_bootstrap.py @@ -128,15 +128,17 @@ def bootstrap(uri, dest, gid_map=gid_map, not_secure=not_secure, no_cache=no_cache, + root_password=root_password, progress=prog).unpack(dest) - if root_password is not None: - logger.info("Setting password of the root account") - utils.set_root_password(fmt, dest, root_password) + if fmt == "dir": + if root_password is not None: + logger.info("Setting password of the root account") + utils.set_root_password_in_rootfs(dest, root_password) - if fmt == "dir" and (uid_map or gid_map): - logger.info("Mapping UID/GID") - utils.mapping_uid_gid(dest, uid_map, gid_map) + if uid_map or gid_map: + logger.info("Mapping UID/GID") + utils.mapping_uid_gid(dest, uid_map, gid_map) def set_logging_conf(loglevel=None): diff --git a/tests/test_build_qcow2_image.py b/tests/test_build_qcow2_image.py index 5ed0ade..3f160d9 100644 --- a/tests/test_build_qcow2_image.py +++ b/tests/test_build_qcow2_image.py @@ -28,6 +28,11 @@ from tests import mock from tests import virt_bootstrap import guestfs +try: + from StringIO import StringIO +except ImportError: + from io import StringIO + TAR_DIR = os.path.realpath('tests/tarfiles') IMAGES_DIR = os.path.realpath('tests/images') ROOTFS_TREE = { @@ -276,3 +281,55 @@ class TestIDMapping(TestBuild_Image): ) self.set_ownership_mapping(idmap, idmap) self.check_qcow2_images(images) + + +class TestSetRootPassword(TestBuild_Image): + """ + Ensures that setting root password works for qcow2 images. + """ + def create_user_dirs(self, tar_handle): + """ + Create rootfs tree with only /etc/shadow file + """ + data = 'root:*:' + t_info = tarfile.TarInfo('/etc/shadow') + t_info.size = len(data) + t_info.mode = 0o000 + t_info.type = tarfile.REGTYPE + t_info.uid = 0 + t_info.gid = 0 + tar_handle.addfile(t_info, StringIO(data.encode('utf-8'))) + + def check_members(self, g): + """ + Check if all files and folders exist in the qcow2 image. + """ + name = '/etc/shadow' + self.assertTrue(g.is_file(name), "Not file %s" % name) + stat = g.stat(name) + self.assertEqual(stat['mode'] & 0o777, 0o000) + self.assertEqual(stat['uid'], 0) + self.assertEqual(stat['gid'], 0) + content = g.read_file('/etc/shadow') + root_hash = content.split(':')[1] + self.assertNotEqual(root_hash, '*') + self.assertGreater(len(root_hash), 100) + + def runTest(self): + """ + Create qcow2 image from each dummy tarfile using FileSource + and set root password. + """ + images = [] + for filename in TAR_FILES: + dest = os.path.join(IMAGES_DIR, filename.split('.')[0]) + images.append(os.path.join(dest, "layer-1.qcow2")) + uri = os.path.join(TAR_DIR, filename) + virt_bootstrap.bootstrap( + uri=uri, + dest=dest, + fmt='qcow2', + progress_cb=mock.Mock(), + root_password='test' + ) + self.check_qcow2_images(images) diff --git a/tests/test_utils.py b/tests/test_utils.py index 7ce2ba4..56f3460 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -496,68 +496,6 @@ class TestUtils(unittest.TestCase): % hashed_password) ################################### - # Tests for: set_root_password_in_image() - ################################### - @mock.patch('virtBootstrap.utils.execute') - def test_utils_set_root_password_in_image(self, m_execute): - """ - Ensures that set_root_password_in_image() calls virt-edit - with correct arguments. - """ - image, password = 'foo', 'password' - password_hash = ('$6$rounds=656000$PaQ/H4c/k8Ix9YOM$' - 'cyD47r9PtAE2LhnkpdbVzsiQbM0/h2S/1Bv' - 'u/sXqUtCg.3Ijp7TQy/8tEVstxMy5k5v4mh' - 'CGFqnVv7S6wd.Ah/') - - expected_call = [ - 'virt-edit', - '-a', image, '/etc/shadow', - '-e', 's,^root:.*?:,root:%s:,' % utils.re.escape(password_hash)] - - hash_function = 'virtBootstrap.utils.passlib.hosts.linux_context.hash' - with mock.patch(hash_function) as m_hash: - m_hash.return_value = password_hash - utils.set_root_password_in_image(image, password) - - m_execute.assert_called_once_with(expected_call) - - ################################### - # Tests for: set_root_password() - ################################### - @mock.patch('virtBootstrap.utils.set_root_password_in_rootfs') - def test_utils_set_root_password_dir(self, m_set_root_password_in_rootfs): - """ - Ensures that set_root_password() calls set_root_password_in_rootfs() - when the format is set to "dir". - """ - fmt, dest, root_password = 'dir', 'dest', 'root_password' - utils.set_root_password(fmt, dest, root_password) - - m_set_root_password_in_rootfs.assert_called_once_with( - dest, root_password - ) - - @mock.patch('virtBootstrap.utils.set_root_password_in_image') - def test_utils_set_root_password_qcow2(self, m_set_root_password_in_image): - """ - Ensures that set_root_password() calls set_root_password_in_image() - when the format is set to "qcow2" with the path to the last - extracted layer. - """ - fmt, dest, root_password = 'qcow2', 'dest', 'root_password' - layers = ['layer-0.qcow2', 'layer-1.qcow2'] - - with mock.patch('os.listdir') as m_listdir: - m_listdir.return_value = layers - utils.set_root_password(fmt, dest, root_password) - - m_set_root_password_in_image.assert_called_once_with( - utils.os.path.join(dest, max(layers)), - root_password - ) - - ################################### # Tests for: write_progress() ################################### def test_utils_write_progress_fill_terminal_width(self): diff --git a/tests/test_virt_bootstrap.py b/tests/test_virt_bootstrap.py index c0def7e..808dcf8 100644 --- a/tests/test_virt_bootstrap.py +++ b/tests/test_virt_bootstrap.py @@ -239,12 +239,12 @@ class TestVirtBootstrap(unittest.TestCase): for kwarg in params: self.assertEqual(called_with_kwargs[kwarg], params[kwarg]) - def test_if_bootstrap_calls_set_root_password(self): + def test_if_bootstrap_calls_set_root_password_in_rootfs(self): """ - Ensures that bootstrap() calls set_root_password() when the argument - root_password is specified. + Ensures that bootstrap() calls set_root_password_in_rootfs() + when the argument root_password is specified. """ - src, fmt, dest, root_password = 'foo', 'fmt', 'bar', 'root_password' + src, fmt, dest, root_password = 'foo', 'dir', 'bar', 'root_password' with mock.patch.multiple(virt_bootstrap, get_source=mock.DEFAULT, os=mock.DEFAULT, @@ -258,8 +258,8 @@ class TestVirtBootstrap(unittest.TestCase): fmt=fmt, root_password=root_password) - mocked['utils'].set_root_password.assert_called_once_with( - fmt, dest, root_password) + (mocked['utils'].set_root_password_in_rootfs + .assert_called_once_with(dest, root_password)) def test_if_bootstrap_calls_set_mapping_uid_gid(self): """ -- 2.13.4 _______________________________________________ virt-tools-list mailing list virt-tools-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/virt-tools-list