On Thu, 2017-08-03 at 14:13 +0100, Radostin Stoyanov wrote: > Move the implementation of UID/GID file ownership mapping for root file > system in the utils module. > > This could be used for the ownership mapping for files in qcow2 images. > > Update the unit tests as well. > --- > src/virtBootstrap/utils.py | 72 ++++++++++++++++ > src/virtBootstrap/virt_bootstrap.py | 74 +--------------- > tests/test_utils.py | 149 ++++++++++++++++++++++++++++++++ > tests/test_virt_bootstrap.py | 168 ++---------------------------------- > 4 files changed, 230 insertions(+), 233 deletions(-) > > diff --git a/src/virtBootstrap/utils.py b/src/virtBootstrap/utils.py > index 965dcad..071bb3e 100644 > --- a/src/virtBootstrap/utils.py > +++ b/src/virtBootstrap/utils.py > @@ -471,3 +471,75 @@ def write_progress(prog): > # Write message to console > sys.stdout.write(msg) > sys.stdout.flush() > + > + > +# The implementation for remapping ownership of all files inside a > +# container's rootfs is inspired by the tool uidmapshift: > +# > +# Original author: Serge Hallyn <serge.hallyn@xxxxxxxxxx> > +# Original license: GPLv2 > +# http://bazaar.launchpad.net/%7Eserge-hallyn/+junk/nsexec/view/head:/uidmapshift.c > + > +def get_map_id(old_id, opts): > + """ > + Calculate new map_id. > + """ > + if old_id >= opts['first'] and old_id < opts['last']: > + return old_id + opts['offset'] > + return -1 > + > + > +def get_mapping_opts(mapping): > + """ > + Get range options from UID/GID mapping > + """ > + start = mapping[0] if mapping[0] > -1 else 0 > + target = mapping[1] if mapping[1] > -1 else 0 > + count = mapping[2] if mapping[2] > -1 else 1 > + > + opts = { > + 'first': start, > + 'last': start + count, > + 'offset': target - start > + } > + return opts > + > + > +def map_id(path, map_uid, map_gid): > + """ > + Remapping ownership of all files inside a container's rootfs. 
> + > + map_gid and map_uid: Contain integers in a list with format: > + [<start>, <target>, <count>] > + """ > + if map_uid: > + uid_opts = get_mapping_opts(map_uid) > + if map_gid: > + gid_opts = get_mapping_opts(map_gid) > + > + for root, _ignore, files in os.walk(os.path.realpath(path)): > + for name in [root] + files: > + file_path = os.path.join(root, name) > + > + stat_info = os.lstat(file_path) > + old_uid = stat_info.st_uid > + old_gid = stat_info.st_gid > + > + new_uid = get_map_id(old_uid, uid_opts) if map_uid else -1 > + new_gid = get_map_id(old_gid, gid_opts) if map_gid else -1 > + os.lchown(file_path, new_uid, new_gid) > + > + > +def mapping_uid_gid(dest, uid_map, gid_map): > + """ > + Mapping ownership for each uid_map and gid_map. > + """ > + len_diff = len(uid_map) - len(gid_map) > + > + if len_diff < 0: > + uid_map += [None] * abs(len_diff) > + elif len_diff > 0: > + gid_map += [None] * len_diff > + > + for uid, gid in zip(uid_map, gid_map): > + map_id(dest, uid, gid) > diff --git a/src/virtBootstrap/virt_bootstrap.py b/src/virtBootstrap/virt_bootstrap.py > index ddc5456..0bc2e2b 100755 > --- a/src/virtBootstrap/virt_bootstrap.py > +++ b/src/virtBootstrap/virt_bootstrap.py > @@ -69,22 +69,6 @@ def get_source(source_type): > raise Exception("Invalid image URL scheme: '%s'" % source_type) > > > -# The implementation for remapping ownership of all files inside a > -# container's rootfs is inspired by the tool uidmapshift: > -# > -# Original author: Serge Hallyn <serge.hallyn@xxxxxxxxxx> > -# Original license: GPLv2 > -# http://bazaar.launchpad.net/%7Eserge-hallyn/+junk/nsexec/view/head:/uidmapshift.c > - > -def get_map_id(old_id, opts): > - """ > - Calculate new map_id. > - """ > - if old_id >= opts['first'] and old_id < opts['last']: > - return old_id + opts['offset'] > - return -1 > - > - > def parse_idmap(idmap): > """ > Parse user input to 'start', 'target' and 'count' values. 
> @@ -107,62 +91,6 @@ def parse_idmap(idmap): > raise ValueError("Invalid UID/GID mapping value: %s" % idmap) > > > -def get_mapping_opts(mapping): > - """ > - Get range options from UID/GID mapping > - """ > - start = mapping[0] if mapping[0] > -1 else 0 > - target = mapping[1] if mapping[1] > -1 else 0 > - count = mapping[2] if mapping[2] > -1 else 1 > - > - opts = { > - 'first': start, > - 'last': start + count, > - 'offset': target - start > - } > - return opts > - > - > -def map_id(path, map_uid, map_gid): > - """ > - Remapping ownership of all files inside a container's rootfs. > - > - map_gid and map_uid: Contain integers in a list with format: > - [<start>, <target>, <count>] > - """ > - if map_uid: > - uid_opts = get_mapping_opts(map_uid) > - if map_gid: > - gid_opts = get_mapping_opts(map_gid) > - > - for root, _ignore, files in os.walk(os.path.realpath(path)): > - for name in [root] + files: > - file_path = os.path.join(root, name) > - > - stat_info = os.lstat(file_path) > - old_uid = stat_info.st_uid > - old_gid = stat_info.st_gid > - > - new_uid = get_map_id(old_uid, uid_opts) if map_uid else -1 > - new_gid = get_map_id(old_gid, gid_opts) if map_gid else -1 > - os.lchown(file_path, new_uid, new_gid) > - > - > -def mapping_uid_gid(dest, uid_map, gid_map): > - """ > - Mapping ownership for each uid_map and gid_map. 
> - """ > - len_diff = len(uid_map) - len(gid_map) > - > - if len_diff < 0: > - uid_map += [None] * abs(len_diff) > - elif len_diff > 0: > - gid_map += [None] * len_diff > - > - for uid, gid in zip(uid_map, gid_map): > - map_id(dest, uid, gid) > - > - > # pylint: disable=too-many-arguments > def bootstrap(uri, dest, > fmt=utils.DEFAULT_OUTPUT_FORMAT, > @@ -206,7 +134,7 @@ def bootstrap(uri, dest, > > if fmt == "dir" and uid_map or gid_map: > logger.info("Mapping UID/GID") > - mapping_uid_gid(dest, uid_map, gid_map) > + utils.mapping_uid_gid(dest, uid_map, gid_map) > > > def set_logging_conf(loglevel=None): > diff --git a/tests/test_utils.py b/tests/test_utils.py > index e45a2c9..7ce2ba4 100644 > --- a/tests/test_utils.py > +++ b/tests/test_utils.py > @@ -603,6 +603,155 @@ class TestUtils(unittest.TestCase): > default_terminal_width + 1) > mocked['sys'].stdout.write.assert_called_once() > > + ################################### > + # Tests for: mapping_uid_gid() > + ################################### > + def test_mapping_uid_gid(self): > + """ > + Ensures that mapping_uid_gid() calls map_id() with > + correct parameters. 
> + """ > + dest = '/path' > + calls = [ > + { # Call 1 > + 'dest': dest, > + 'uid': [[0, 1000, 10]], > + 'gid': [[0, 1000, 10]] > + }, > + { # Call 2 > + 'dest': dest, > + 'uid': [], > + 'gid': [[0, 1000, 10]] > + }, > + { # Call 3 > + 'dest': dest, > + 'uid': [[0, 1000, 10]], > + 'gid': [] > + }, > + { # Call 4 > + 'dest': dest, > + 'uid': [[0, 1000, 10], [500, 500, 10]], > + 'gid': [[0, 1000, 10]] > + } > + ] > + > + expected_calls = [ > + # Expected from call 1 > + mock.call(dest, [0, 1000, 10], [0, 1000, 10]), > + # Expected from call 2 > + mock.call(dest, None, [0, 1000, 10]), > + # Expected from call 3 > + mock.call(dest, [0, 1000, 10], None), > + # Expected from call 4 > + mock.call(dest, [0, 1000, 10], [0, 1000, 10]), > + mock.call(dest, [500, 500, 10], None) > + ] > + > + with mock.patch('virtBootstrap.utils.map_id') as m_map_id: > + for args in calls: > + utils.mapping_uid_gid(args['dest'], args['uid'], args['gid']) > + > + m_map_id.assert_has_calls(expected_calls) > + > + ################################### > + # Tests for: map_id() > + ################################### > + @mock.patch('os.path.realpath') > + def test_map_id(self, m_realpath): > + """ > + Ensures that the UID/GID mapping applies to all files > + and directories in root file system. 
> + """ > + root_path = '/root' > + files = ['foo1', 'foo2'] > + m_realpath.return_value = root_path > + > + map_uid = [0, 1000, 10] > + map_gid = [0, 1000, 10] > + new_id = 'new_id' > + > + expected_calls = [ > + mock.call('/root', new_id, new_id), > + mock.call('/root/foo1', new_id, new_id), > + mock.call('/root/foo2', new_id, new_id) > + ] > + > + with mock.patch.multiple('os', > + lchown=mock.DEFAULT, > + lstat=mock.DEFAULT, > + walk=mock.DEFAULT) as mocked: > + > + mocked['walk'].return_value = [(root_path, [], files)] > + mocked['lstat']().st_uid = map_uid[0] > + mocked['lstat']().st_gid = map_gid[0] > + > + get_map_id = 'virtBootstrap.utils.get_map_id' > + with mock.patch(get_map_id) as m_get_map_id: > + m_get_map_id.return_value = new_id > + utils.map_id(root_path, map_uid, map_gid) > + > + mocked['lchown'].assert_has_calls(expected_calls) > + > + ################################### > + # Tests for: get_mapping_opts() > + ################################### > + def test_get_mapping_opts(self): > + """ > + Ensures that get_mapping_opts() returns correct options for > + mapping value. > + """ > + test_values = [ > + { > + 'mapping': [0, 1000, 10], > + 'expected_result': {'first': 0, 'last': 10, 'offset': 1000}, > + }, > + { > + 'mapping': [0, 1000, 10], > + 'expected_result': {'first': 0, 'last': 10, 'offset': 1000}, > + }, > + { > + 'mapping': [500, 1500, 1], > + 'expected_result': {'first': 500, 'last': 501, 'offset': 1000}, > + }, > + { > + 'mapping': [-1, -1, -1], > + 'expected_result': {'first': 0, 'last': 1, 'offset': 0}, > + } > + ] > + > + for test in test_values: > + res = utils.get_mapping_opts(test['mapping']) > + self.assertEqual(test['expected_result'], res) > + > + ################################### > + # Tests for: get_map_id() > + ################################### > + def test_get_map_id(self): > + """ > + Ensures that get_map_id() returns correct UID/GID mapping value. 
> + """ > + test_values = [ > + { > + 'old_id': 0, > + 'mapping': [0, 1000, 10], > + 'expected_result': 1000 > + }, > + { > + 'old_id': 5, > + 'mapping': [0, 500, 10], > + 'expected_result': 505 > + }, > + { > + 'old_id': 10, > + 'mapping': [0, 100, 10], > + 'expected_result': -1 > + }, > + ] > + for test in test_values: > + opts = utils.get_mapping_opts(test['mapping']) > + res = utils.get_map_id(test['old_id'], opts) > + self.assertEqual(test['expected_result'], res) > + > > if __name__ == '__main__': > unittest.main(exit=False) > diff --git a/tests/test_virt_bootstrap.py b/tests/test_virt_bootstrap.py > index ff744f7..c0def7e 100644 > --- a/tests/test_virt_bootstrap.py > +++ b/tests/test_virt_bootstrap.py > @@ -61,157 +61,6 @@ class TestVirtBootstrap(unittest.TestCase): > sources.FileSource) > > ################################### > - # Tests for: mapping_uid_gid() > - ################################### > - def test_mapping_uid_gid(self): > - """ > - Ensures that mapping_uid_gid() calls map_id() with > - correct parameters. 
> - """ > - dest = '/path' > - calls = [ > - { # Call 1 > - 'dest': dest, > - 'uid': [[0, 1000, 10]], > - 'gid': [[0, 1000, 10]] > - }, > - { # Call 2 > - 'dest': dest, > - 'uid': [], > - 'gid': [[0, 1000, 10]] > - }, > - { # Call 3 > - 'dest': dest, > - 'uid': [[0, 1000, 10]], > - 'gid': [] > - }, > - { # Call 4 > - 'dest': dest, > - 'uid': [[0, 1000, 10], [500, 500, 10]], > - 'gid': [[0, 1000, 10]] > - } > - ] > - > - expected_calls = [ > - # Expected from call 1 > - mock.call(dest, [0, 1000, 10], [0, 1000, 10]), > - # Expected from call 2 > - mock.call(dest, None, [0, 1000, 10]), > - # Expected from call 3 > - mock.call(dest, [0, 1000, 10], None), > - # Expected from call 4 > - mock.call(dest, [0, 1000, 10], [0, 1000, 10]), > - mock.call(dest, [500, 500, 10], None) > - > - ] > - with mock.patch('virtBootstrap.virt_bootstrap.map_id') as m_map_id: > - for args in calls: > - virt_bootstrap.mapping_uid_gid(args['dest'], > - args['uid'], > - args['gid']) > - > - m_map_id.assert_has_calls(expected_calls) > - > - ################################### > - # Tests for: map_id() > - ################################### > - @mock.patch('os.path.realpath') > - def test_map_id(self, m_realpath): > - """ > - Ensures that the UID/GID mapping applies to all files > - and directories in root file system. 
> - """ > - root_path = '/root' > - files = ['foo1', 'foo2'] > - m_realpath.return_value = root_path > - > - map_uid = [0, 1000, 10] > - map_gid = [0, 1000, 10] > - new_id = 'new_id' > - > - expected_calls = [ > - mock.call('/root', new_id, new_id), > - mock.call('/root/foo1', new_id, new_id), > - mock.call('/root/foo2', new_id, new_id) > - ] > - > - with mock.patch.multiple('os', > - lchown=mock.DEFAULT, > - lstat=mock.DEFAULT, > - walk=mock.DEFAULT) as mocked: > - > - mocked['walk'].return_value = [(root_path, [], files)] > - mocked['lstat']().st_uid = map_uid[0] > - mocked['lstat']().st_gid = map_gid[0] > - > - get_map_id = 'virtBootstrap.virt_bootstrap.get_map_id' > - with mock.patch(get_map_id) as m_get_map_id: > - m_get_map_id.return_value = new_id > - virt_bootstrap.map_id(root_path, map_uid, map_gid) > - > - mocked['lchown'].assert_has_calls(expected_calls) > - > - ################################### > - # Tests for: get_mapping_opts() > - ################################### > - def test_get_mapping_opts(self): > - """ > - Ensures that get_mapping_opts() returns correct options for > - mapping value. > - """ > - test_values = [ > - { > - 'mapping': [0, 1000, 10], > - 'expected_result': {'first': 0, 'last': 10, 'offset': 1000}, > - }, > - { > - 'mapping': [0, 1000, 10], > - 'expected_result': {'first': 0, 'last': 10, 'offset': 1000}, > - }, > - { > - 'mapping': [500, 1500, 1], > - 'expected_result': {'first': 500, 'last': 501, 'offset': 1000}, > - }, > - { > - 'mapping': [-1, -1, -1], > - 'expected_result': {'first': 0, 'last': 1, 'offset': 0}, > - } > - ] > - > - for test in test_values: > - res = virt_bootstrap.get_mapping_opts(test['mapping']) > - self.assertEqual(test['expected_result'], res) > - > - ################################### > - # Tests for: get_map_id() > - ################################### > - def test_get_map_id(self): > - """ > - Ensures that get_map_id() returns correct UID/GID mapping value. 
> - """ > - test_values = [ > - { > - 'old_id': 0, > - 'mapping': [0, 1000, 10], > - 'expected_result': 1000 > - }, > - { > - 'old_id': 5, > - 'mapping': [0, 500, 10], > - 'expected_result': 505 > - }, > - { > - 'old_id': 10, > - 'mapping': [0, 100, 10], > - 'expected_result': -1 > - }, > - ] > - for test in test_values: > - opts = virt_bootstrap.get_mapping_opts(test['mapping']) > - res = virt_bootstrap.get_map_id(test['old_id'], opts) > - self.assertEqual(test['expected_result'], res) > - > - ################################### > # Tests for: parse_idmap() > ################################### > def test_parse_idmap(self): > @@ -415,9 +264,9 @@ class TestVirtBootstrap(unittest.TestCase): > def test_if_bootstrap_calls_set_mapping_uid_gid(self): > """ > Ensures that bootstrap() calls mapping_uid_gid() when the argument > - uid_map or gid_map is specified. > + uid_map or gid_map is specified and format='dir'. > """ > - src, dest, uid_map, gid_map = 'foo', 'bar', 'id', 'id' > + src, dest, fmt, uid_map, gid_map = 'foo', 'bar', 'dir', 'id', 'id' > expected_calls = [ > mock.call('bar', None, 'id'), > mock.call('bar', 'id', None), > @@ -427,18 +276,17 @@ class TestVirtBootstrap(unittest.TestCase): > with mock.patch.multiple(virt_bootstrap, > get_source=mock.DEFAULT, > os=mock.DEFAULT, > - mapping_uid_gid=mock.DEFAULT, > - utils=mock.DEFAULT, > - sys=mock.DEFAULT) as mocked: > + utils=mock.DEFAULT) as mocked: > mocked['os'].path.exists.return_value = True > mocked['os'].path.isdir.return_value = True > mocked['os'].access.return_value = True > > - virt_bootstrap.bootstrap(src, dest, gid_map=gid_map) > - virt_bootstrap.bootstrap(src, dest, uid_map=uid_map) > - virt_bootstrap.bootstrap(src, dest, > + virt_bootstrap.bootstrap(src, dest, fmt=fmt, gid_map=gid_map) > + virt_bootstrap.bootstrap(src, dest, fmt=fmt, uid_map=uid_map) > + virt_bootstrap.bootstrap(src, dest, fmt=fmt, > uid_map=uid_map, gid_map=gid_map) > - mocked['mapping_uid_gid'].assert_has_calls(expected_calls) > 
+ > > + mocked['utils'].mapping_uid_gid.assert_has_calls(expected_calls) > > ################################### > # Tests for: set_logging_conf()

ACK

--
Cedric

_______________________________________________
virt-tools-list mailing list
virt-tools-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/virt-tools-list