Move the implementation of UID/GID file ownership mapping for the root file system into the utils module. This allows it to be reused for the ownership mapping of files in qcow2 images. Update the unit tests as well. --- src/virtBootstrap/utils.py | 72 ++++++++++++++++ src/virtBootstrap/virt_bootstrap.py | 74 +--------------- tests/test_utils.py | 149 ++++++++++++++++++++++++++++++++ tests/test_virt_bootstrap.py | 168 ++---------------------------------- 4 files changed, 230 insertions(+), 233 deletions(-) diff --git a/src/virtBootstrap/utils.py b/src/virtBootstrap/utils.py index e755eab..b82c37e 100644 --- a/src/virtBootstrap/utils.py +++ b/src/virtBootstrap/utils.py @@ -466,3 +466,75 @@ def write_progress(prog): # Write message to console sys.stdout.write(msg) sys.stdout.flush() + + +# The implementation for remapping ownership of all files inside a +# container's rootfs is inspired by the tool uidmapshift: +# +# Original author: Serge Hallyn <serge.hallyn@xxxxxxxxxx> +# Original license: GPLv2 +# http://bazaar.launchpad.net/%7Eserge-hallyn/+junk/nsexec/view/head:/uidmapshift.c + +def get_map_id(old_id, opts): + """ + Calculate new map_id. + """ + if old_id >= opts['first'] and old_id < opts['last']: + return old_id + opts['offset'] + return -1 + + +def get_mapping_opts(mapping): + """ + Get range options from UID/GID mapping + """ + start = mapping[0] if mapping[0] > -1 else 0 + target = mapping[1] if mapping[1] > -1 else 0 + count = mapping[2] if mapping[2] > -1 else 1 + + opts = { + 'first': start, + 'last': start + count, + 'offset': target - start + } + return opts + + +def map_id(path, map_uid, map_gid): + """ + Remapping ownership of all files inside a container's rootfs. 
+ + map_gid and map_uid: Contain integers in a list with format: + [<start>, <target>, <count>] + """ + if map_uid: + uid_opts = get_mapping_opts(map_uid) + if map_gid: + gid_opts = get_mapping_opts(map_gid) + + for root, _ignore, files in os.walk(os.path.realpath(path)): + for name in [root] + files: + file_path = os.path.join(root, name) + + stat_info = os.lstat(file_path) + old_uid = stat_info.st_uid + old_gid = stat_info.st_gid + + new_uid = get_map_id(old_uid, uid_opts) if map_uid else -1 + new_gid = get_map_id(old_gid, gid_opts) if map_gid else -1 + os.lchown(file_path, new_uid, new_gid) + + +def mapping_uid_gid(dest, uid_map, gid_map): + """ + Mapping ownership for each uid_map and gid_map. + """ + len_diff = len(uid_map) - len(gid_map) + + if len_diff < 0: + uid_map += [None] * abs(len_diff) + elif len_diff > 0: + gid_map += [None] * len_diff + + for uid, gid in zip(uid_map, gid_map): + map_id(dest, uid, gid) diff --git a/src/virtBootstrap/virt_bootstrap.py b/src/virtBootstrap/virt_bootstrap.py index ddc5456..0bc2e2b 100755 --- a/src/virtBootstrap/virt_bootstrap.py +++ b/src/virtBootstrap/virt_bootstrap.py @@ -69,22 +69,6 @@ def get_source(source_type): raise Exception("Invalid image URL scheme: '%s'" % source_type) -# The implementation for remapping ownership of all files inside a -# container's rootfs is inspired by the tool uidmapshift: -# -# Original author: Serge Hallyn <serge.hallyn@xxxxxxxxxx> -# Original license: GPLv2 -# http://bazaar.launchpad.net/%7Eserge-hallyn/+junk/nsexec/view/head:/uidmapshift.c - -def get_map_id(old_id, opts): - """ - Calculate new map_id. - """ - if old_id >= opts['first'] and old_id < opts['last']: - return old_id + opts['offset'] - return -1 - - def parse_idmap(idmap): """ Parse user input to 'start', 'target' and 'count' values. 
@@ -107,62 +91,6 @@ def parse_idmap(idmap): raise ValueError("Invalid UID/GID mapping value: %s" % idmap) -def get_mapping_opts(mapping): - """ - Get range options from UID/GID mapping - """ - start = mapping[0] if mapping[0] > -1 else 0 - target = mapping[1] if mapping[1] > -1 else 0 - count = mapping[2] if mapping[2] > -1 else 1 - - opts = { - 'first': start, - 'last': start + count, - 'offset': target - start - } - return opts - - -def map_id(path, map_uid, map_gid): - """ - Remapping ownership of all files inside a container's rootfs. - - map_gid and map_uid: Contain integers in a list with format: - [<start>, <target>, <count>] - """ - if map_uid: - uid_opts = get_mapping_opts(map_uid) - if map_gid: - gid_opts = get_mapping_opts(map_gid) - - for root, _ignore, files in os.walk(os.path.realpath(path)): - for name in [root] + files: - file_path = os.path.join(root, name) - - stat_info = os.lstat(file_path) - old_uid = stat_info.st_uid - old_gid = stat_info.st_gid - - new_uid = get_map_id(old_uid, uid_opts) if map_uid else -1 - new_gid = get_map_id(old_gid, gid_opts) if map_gid else -1 - os.lchown(file_path, new_uid, new_gid) - - -def mapping_uid_gid(dest, uid_map, gid_map): - """ - Mapping ownership for each uid_map and gid_map. 
- """ - len_diff = len(uid_map) - len(gid_map) - - if len_diff < 0: - uid_map += [None] * abs(len_diff) - elif len_diff > 0: - gid_map += [None] * len_diff - - for uid, gid in zip(uid_map, gid_map): - map_id(dest, uid, gid) - - # pylint: disable=too-many-arguments def bootstrap(uri, dest, fmt=utils.DEFAULT_OUTPUT_FORMAT, @@ -206,7 +134,7 @@ def bootstrap(uri, dest, if fmt == "dir" and uid_map or gid_map: logger.info("Mapping UID/GID") - mapping_uid_gid(dest, uid_map, gid_map) + utils.mapping_uid_gid(dest, uid_map, gid_map) def set_logging_conf(loglevel=None): diff --git a/tests/test_utils.py b/tests/test_utils.py index e45a2c9..fbfa943 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -603,6 +603,155 @@ class TestUtils(unittest.TestCase): default_terminal_width + 1) mocked['sys'].stdout.write.assert_called_once() + ################################### + # Tests for: mapping_uid_gid() + ################################### + def test_mapping_uid_gid(self): + """ + Ensures that mapping_uid_gid() calls map_id() with + correct parameters. 
+ """ + dest = '/path' + calls = [ + { # Call 1 + 'dest': dest, + 'uid': [[0, 1000, 10]], + 'gid': [[0, 1000, 10]] + }, + { # Call 2 + 'dest': dest, + 'uid': [], + 'gid': [[0, 1000, 10]] + }, + { # Call 3 + 'dest': dest, + 'uid': [[0, 1000, 10]], + 'gid': [] + }, + { # Call 4 + 'dest': dest, + 'uid': [[0, 1000, 10], [500, 500, 10]], + 'gid': [[0, 1000, 10]] + } + ] + + expected_calls = [ + # Expected from call 1 + mock.call(dest, [0, 1000, 10], [0, 1000, 10]), + # Expected from call 2 + mock.call(dest, None, [0, 1000, 10]), + # Expected from call 3 + mock.call(dest, [0, 1000, 10], None), + # Expected from call 4 + mock.call(dest, [0, 1000, 10], [0, 1000, 10]), + mock.call(dest, [500, 500, 10], None) + + ] + with mock.patch('virtBootstrap.utils.map_id') as m_map_id: + for args in calls: + utils.mapping_uid_gid(args['dest'], args['uid'], args['gid']) + + m_map_id.assert_has_calls(expected_calls) + + ################################### + # Tests for: map_id() + ################################### + @mock.patch('os.path.realpath') + def test_map_id(self, m_realpath): + """ + Ensures that the UID/GID mapping applies to all files + and directories in root file system. 
+ """ + root_path = '/root' + files = ['foo1', 'foo2'] + m_realpath.return_value = root_path + + map_uid = [0, 1000, 10] + map_gid = [0, 1000, 10] + new_id = 'new_id' + + expected_calls = [ + mock.call('/root', new_id, new_id), + mock.call('/root/foo1', new_id, new_id), + mock.call('/root/foo2', new_id, new_id) + ] + + with mock.patch.multiple('os', + lchown=mock.DEFAULT, + lstat=mock.DEFAULT, + walk=mock.DEFAULT) as mocked: + + mocked['walk'].return_value = [(root_path, [], files)] + mocked['lstat']().st_uid = map_uid[0] + mocked['lstat']().st_gid = map_gid[0] + + get_map_id = 'virtBootstrap.utils.get_map_id' + with mock.patch(get_map_id) as m_get_map_id: + m_get_map_id.return_value = new_id + utils.map_id(root_path, map_uid, map_gid) + + mocked['lchown'].assert_has_calls(expected_calls) + + ################################### + # Tests for: get_mapping_opts() + ################################### + def test_get_mapping_opts(self): + """ + Ensures that get_mapping_opts() returns correct options for + mapping value. + """ + test_values = [ + { + 'mapping': [0, 1000, 10], + 'expected_result': {'first': 0, 'last': 10, 'offset': 1000}, + }, + { + 'mapping': [0, 1000, 10], + 'expected_result': {'first': 0, 'last': 10, 'offset': 1000}, + }, + { + 'mapping': [500, 1500, 1], + 'expected_result': {'first': 500, 'last': 501, 'offset': 1000}, + }, + { + 'mapping': [-1, -1, -1], + 'expected_result': {'first': 0, 'last': 1, 'offset': 0}, + } + ] + + for test in test_values: + res = utils.get_mapping_opts(test['mapping']) + self.assertEqual(test['expected_result'], res) + + ################################### + # Tests for: get_map_id() + ################################### + def test_get_map_id(self): + """ + Ensures that get_map_id() returns correct UID/GID mapping value. 
+ """ + test_values = [ + { + 'old_id': 0, + 'mapping': [0, 1000, 10], + 'expected_result': 1000 + }, + { + 'old_id': 5, + 'mapping': [0, 500, 10], + 'expected_result': 505 + }, + { + 'old_id': 10, + 'mapping': [0, 100, 10], + 'expected_result': -1 + }, + ] + for test in test_values: + opts = utils.get_mapping_opts(test['mapping']) + res = utils.get_map_id(test['old_id'], opts) + self.assertEqual(test['expected_result'], res) + if __name__ == '__main__': unittest.main(exit=False) diff --git a/tests/test_virt_bootstrap.py b/tests/test_virt_bootstrap.py index ff744f7..c0def7e 100644 --- a/tests/test_virt_bootstrap.py +++ b/tests/test_virt_bootstrap.py @@ -61,157 +61,6 @@ class TestVirtBootstrap(unittest.TestCase): sources.FileSource) ################################### - # Tests for: mapping_uid_gid() - ################################### - def test_mapping_uid_gid(self): - """ - Ensures that mapping_uid_gid() calls map_id() with - correct parameters. - """ - dest = '/path' - calls = [ - { # Call 1 - 'dest': dest, - 'uid': [[0, 1000, 10]], - 'gid': [[0, 1000, 10]] - }, - { # Call 2 - 'dest': dest, - 'uid': [], - 'gid': [[0, 1000, 10]] - }, - { # Call 3 - 'dest': dest, - 'uid': [[0, 1000, 10]], - 'gid': [] - }, - { # Call 4 - 'dest': dest, - 'uid': [[0, 1000, 10], [500, 500, 10]], - 'gid': [[0, 1000, 10]] - } - ] - - expected_calls = [ - # Expected from call 1 - mock.call(dest, [0, 1000, 10], [0, 1000, 10]), - # Expected from call 2 - mock.call(dest, None, [0, 1000, 10]), - # Expected from call 3 - mock.call(dest, [0, 1000, 10], None), - # Expected from call 4 - mock.call(dest, [0, 1000, 10], [0, 1000, 10]), - mock.call(dest, [500, 500, 10], None) - - ] - with mock.patch('virtBootstrap.virt_bootstrap.map_id') as m_map_id: - for args in calls: - virt_bootstrap.mapping_uid_gid(args['dest'], - args['uid'], - args['gid']) - - m_map_id.assert_has_calls(expected_calls) - - ################################### - # Tests for: map_id() - ################################### - 
@mock.patch('os.path.realpath') - def test_map_id(self, m_realpath): - """ - Ensures that the UID/GID mapping applies to all files - and directories in root file system. - """ - root_path = '/root' - files = ['foo1', 'foo2'] - m_realpath.return_value = root_path - - map_uid = [0, 1000, 10] - map_gid = [0, 1000, 10] - new_id = 'new_id' - - expected_calls = [ - mock.call('/root', new_id, new_id), - mock.call('/root/foo1', new_id, new_id), - mock.call('/root/foo2', new_id, new_id) - ] - - with mock.patch.multiple('os', - lchown=mock.DEFAULT, - lstat=mock.DEFAULT, - walk=mock.DEFAULT) as mocked: - - mocked['walk'].return_value = [(root_path, [], files)] - mocked['lstat']().st_uid = map_uid[0] - mocked['lstat']().st_gid = map_gid[0] - - get_map_id = 'virtBootstrap.virt_bootstrap.get_map_id' - with mock.patch(get_map_id) as m_get_map_id: - m_get_map_id.return_value = new_id - virt_bootstrap.map_id(root_path, map_uid, map_gid) - - mocked['lchown'].assert_has_calls(expected_calls) - - ################################### - # Tests for: get_mapping_opts() - ################################### - def test_get_mapping_opts(self): - """ - Ensures that get_mapping_opts() returns correct options for - mapping value. - """ - test_values = [ - { - 'mapping': [0, 1000, 10], - 'expected_result': {'first': 0, 'last': 10, 'offset': 1000}, - }, - { - 'mapping': [0, 1000, 10], - 'expected_result': {'first': 0, 'last': 10, 'offset': 1000}, - }, - { - 'mapping': [500, 1500, 1], - 'expected_result': {'first': 500, 'last': 501, 'offset': 1000}, - }, - { - 'mapping': [-1, -1, -1], - 'expected_result': {'first': 0, 'last': 1, 'offset': 0}, - } - ] - - for test in test_values: - res = virt_bootstrap.get_mapping_opts(test['mapping']) - self.assertEqual(test['expected_result'], res) - - ################################### - # Tests for: get_map_id() - ################################### - def test_get_map_id(self): - """ - Ensures that get_map_id() returns correct UID/GID mapping value. 
- """ - test_values = [ - { - 'old_id': 0, - 'mapping': [0, 1000, 10], - 'expected_result': 1000 - }, - { - 'old_id': 5, - 'mapping': [0, 500, 10], - 'expected_result': 505 - }, - { - 'old_id': 10, - 'mapping': [0, 100, 10], - 'expected_result': -1 - }, - ] - for test in test_values: - opts = virt_bootstrap.get_mapping_opts(test['mapping']) - res = virt_bootstrap.get_map_id(test['old_id'], opts) - self.assertEqual(test['expected_result'], res) - - ################################### # Tests for: parse_idmap() ################################### def test_parse_idmap(self): @@ -415,9 +264,9 @@ class TestVirtBootstrap(unittest.TestCase): def test_if_bootstrap_calls_set_mapping_uid_gid(self): """ Ensures that bootstrap() calls mapping_uid_gid() when the argument - uid_map or gid_map is specified. + uid_map or gid_map is specified and format='dir'. """ - src, dest, uid_map, gid_map = 'foo', 'bar', 'id', 'id' + src, dest, fmt, uid_map, gid_map = 'foo', 'bar', 'dir', 'id', 'id' expected_calls = [ mock.call('bar', None, 'id'), mock.call('bar', 'id', None), @@ -427,18 +276,17 @@ class TestVirtBootstrap(unittest.TestCase): with mock.patch.multiple(virt_bootstrap, get_source=mock.DEFAULT, os=mock.DEFAULT, - mapping_uid_gid=mock.DEFAULT, - utils=mock.DEFAULT, - sys=mock.DEFAULT) as mocked: + utils=mock.DEFAULT) as mocked: mocked['os'].path.exists.return_value = True mocked['os'].path.isdir.return_value = True mocked['os'].access.return_value = True - virt_bootstrap.bootstrap(src, dest, gid_map=gid_map) - virt_bootstrap.bootstrap(src, dest, uid_map=uid_map) - virt_bootstrap.bootstrap(src, dest, + virt_bootstrap.bootstrap(src, dest, fmt=fmt, gid_map=gid_map) + virt_bootstrap.bootstrap(src, dest, fmt=fmt, uid_map=uid_map) + virt_bootstrap.bootstrap(src, dest, fmt=fmt, uid_map=uid_map, gid_map=gid_map) - mocked['mapping_uid_gid'].assert_has_calls(expected_calls) + + mocked['utils'].mapping_uid_gid.assert_has_calls(expected_calls) ################################### # Tests for: 
set_logging_conf() -- 2.13.3 _______________________________________________ virt-tools-list mailing list virt-tools-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/virt-tools-list