Long story short, when the batcave upgrade happened on Friday we found out that rbac_playbook doesn't work on el7 due to an issue with nss-altfiles. I figured out how to sidestep the issue by changing the approach that rbac_playbook takes. It used to get all the groups for the user running the script and check for an intersection between those groups and the required groups for the playbook being run. The new version looks at the groups required for the playbook being run, gathers all the users in those groups and verifies that the user running rbac_playbook is in that list before proceeding. I've included the changes below for security review before updating anything on batcave01 Tim diff --git a/ansible_utils.spec b/ansible_utils.spec index 2e26a70..f097020 100644 --- a/ansible_utils.spec +++ b/ansible_utils.spec @@ -20,12 +20,14 @@ Requires: PyYAML BuildRequires: python2-devel BuildRequires: python-setuptools BuildRequires: python-kitchen +BuildRequires: python-mock # python-dingus isn't built for el7, can't run tests %if 0%{?rhel} > 6 BuildRequires: pytest BuildRequires: PyYAML BuildRequires: python-dingus +BuildRequires: python-mock %endif # the cli uses argparse, which is not part of the standard libaray in python2.6 diff --git a/ansible_utils/rbac_playbook.py b/ansible_utils/rbac_playbook.py index 42be23f..c6a952c 100755 --- a/ansible_utils/rbac_playbook.py +++ b/ansible_utils/rbac_playbook.py @@ -140,7 +140,7 @@ def generate_message(result, username, playbook_name, args, checksum): :return: rendered string summarizing rbac activity """ subject = "[rbac-playbook] {0} {1} ran {2}".format(result, - username.pw_name, + username, playbook_name) body = ['Details:'] body.extend(['{0}: {1}'.format(key, value) for key, value in args.iteritems()]) @@ -188,10 +188,13 @@ def run_sudo_command(playbook_file, playbook_args): full_filename = os.path.abspath(os.path.join(config['config']['PLAYBOOKS_DIR'], playbook_file)) + python_args = ['cd', config['config']['ANSIBLE_DIR'], ';', + '/usr/bin/python2', config['config']['ANSIBLE_PLAYBOOK'], + full_filename] + python_args.extend(playbook_args) executable = '/usr/bin/sudo' - args = ['-i', '/usr/bin/python2', - config['config']['ANSIBLE_PLAYBOOK'], full_filename] - args.extend(playbook_args) + args = ['-i', '/bin/bash', '-i', '-c', + ' '.join(python_args)] # Note: (1) The function used here has to pass the environment to # ansible-playbook so that it can connect to the ssh-agent correctly @@ -206,36 +209,56 @@ def run_sudo_command(playbook_file, playbook_args): print "EXECV: %s %s" % (executable, ' '.join(args)) os.execv(executable, args) +def _get_username(): + """Retrieve the username for the user which started execution of rbac_playbook""" -def can_run(acl, groups, playbook_file): + user = os.getlogin() + username = pwd.getpwnam(user) + return username.pw_name + +def _get_group_members(groupname): + """Find the members of a specific group + + :param groupname: name of group to find members of + :return: list of usernames for members of the given group, empty list if the group does not exist""" + + group_data = None + try: + group_data = grp.getgrnam(groupname) + except KeyError: + return [] + + return group_data.gr_mem + +def _get_playbook_authorized_users(grouplist): + """Retrieve a set of all users who are members of one or more groups + + :param grouplist: list of one or more group names + :return: set of usernames corresponding to the union of members for each group in the grouplist""" + + userlist = [] + for groupname in grouplist: + userlist.extend(_get_group_members(groupname)) + + return set(userlist) + +def can_run(acl, username, playbook_file): """ determines whether the current user is allowed to run a playbook :param acl: dictionary of acl information - :param groups: groups of which the user is a member + :param username: username of user running the utility :param playbook_file: playbook file that is being run :return: True if the user is authorized, False if unauthorized """ - # exact match quick route + + authorized_groups = acl[playbook_file] + if playbook_file in acl: - pb_groups = frozenset(acl[playbook_file]) - if groups.intersection(pb_groups): + pb_authorized = _get_playbook_authorized_users(authorized_groups) + if username in pb_authorized: return True return False - -def get_groups(): - """ retrieve the groups of which the current user is currently a member - :return: (username,groups) where groups is a set of groups which the current user is a member - """ - username = os.getlogin() - user = pwd.getpwnam(username) - groups = set(g.gr_name for g in grp.getgrall() if username in g.gr_mem) - groups.add(grp.getgrgid(user.pw_gid).gr_name) - groups = frozenset(groups) - - return user, groups - - def generate_args(options): """ Generate ansible-playbook compatible args representing the information passed into rbac @@ -272,11 +295,11 @@ def rbac_playbook(playbook_name, options): :param playbook_name: name of playbook file to run :param options: dictionary of options """ - username, groups = get_groups() + username = _get_username() checksum = get_checksum(playbook_name) # raise exception if not allowed - if not can_run(config['acls'], groups, playbook_name): + if not can_run(config['acls'], username, playbook_name): notify(generate_message('FAILURE', username, playbook_name, options, checksum)) msg ="user {0} is not authorized to run {1}".format(username.pw_name, playbook_name) raise RbacAuthException(msg) diff --git a/tests/test_rbac_playbook.py b/tests/test_rbac_playbook.py index 765997d..0b76825 100644 --- a/tests/test_rbac_playbook.py +++ b/tests/test_rbac_playbook.py @@ -1,6 +1,8 @@ import copy +import grp from dingus import Dingus +from mock import MagicMock, Mock from ansible_utils import rbac_playbook as rbac @@ -93,7 +95,7 @@ class TestGeneratePlaybookArgs(object): test_args = rbac.generate_args(ref_options) assert test_args == ['-l', ':'.join(ref_limit)] - def test_simple_limit(self): + def test_simple_tag(self): ref_tags = ['tag', 'tagg', 'tog', 'togg'] ref_options = {'tags': ref_tags} @@ -123,25 +125,101 @@ class TestGeneratePlaybookArgs(object): '--start-at-task="some task"'] -class TestDetermineCanRun(object): +class TestCanRun(object): def setup_method(self, method): self.ref_acl = {'group/unicorns.yml': ['sysadmin-unicorn', 'sysadmin-pony'], 'pony.yml': ['sysadmin-pony'] } - def test_allow_one_of_multiple(self, monkeypatch): - ref_groups = set(['sysadmin-unicorn']) - test_can_run = rbac.can_run(self.ref_acl, ref_groups, 'group/unicorns.yml') + def test_allow(self, monkeypatch): + ref_user = 'twilightsparkle' + ref_authorized = ['twilightsparkle', 'fluttershy'] + + stub_authorized = MagicMock(return_value=ref_authorized) + monkeypatch.setattr(rbac, '_get_playbook_authorized_users', stub_authorized) + + test_can_run = rbac.can_run(self.ref_acl, ref_user, 'group/unicorns.yml') assert test_can_run - def test_allow_one_of_multiple(self, monkeypatch): - ref_groups = set(['sysadmin-kittycat']) - test_can_run = rbac.can_run(self.ref_acl, ref_groups, 'group/unicorns.yml') + def test_disallow(self, monkeypatch): + ref_user = 'mittens' + ref_authorized = ['twilightsparkle', 'fluttershy'] + + stub_authorized = MagicMock(return_value=ref_authorized) + monkeypatch.setattr(rbac, '_get_playbook_authorized_users', stub_authorized) + + test_can_run = rbac.can_run(self.ref_acl, ref_user, 'group/unicorns.yml') assert not test_can_run +class TestGetAuthorizedUsers(object): + + def setup_method(self, method): + self.ref_acl = {'group/unicorns.yml': ['sysadmin-unicorn', 'sysadmin-pony'], + 'pony.yml': ['sysadmin-pony'] + } + + def test_get_single_group_users(self, monkeypatch): + ref_authorized_groups = ['sysadmin-pony'] + ref_group_members = ['twilightsparkle', 'fluttershy'] + ref_members = set(ref_group_members) + + stub_group_members = MagicMock(return_value=ref_group_members) + monkeypatch.setattr(rbac, '_get_group_members', stub_group_members) + + test_group_members = rbac._get_playbook_authorized_users(ref_authorized_groups) + + assert test_group_members == ref_members + + def test_get_multiple_group_users(self, monkeypatch): + ref_authorized_groups = ['sysadmin-pony', 'sysadmin-unicorn'] + ref_group_members = [['twilightsparkle', 'fluttershy'], ['charlie', 'buttercup']] + ref_members = set(ref_group_members[0] + ref_group_members[1]) + + stub_group_members = MagicMock(side_effect=ref_group_members) + monkeypatch.setattr(rbac, '_get_group_members', stub_group_members) + + test_group_members = rbac._get_playbook_authorized_users(ref_authorized_groups) + + assert test_group_members == ref_members + + def test_get_multiple_group_with_overlap(self, monkeypatch): + ref_authorized_groups = ['sysadmin-pony', 'sysadmin-unicorn'] + ref_group_members = [['twilightsparkle', 'fluttershy'], ['charlie', 'buttercup', 'twilightsparkle']] + ref_members = set(ref_group_members[0] + ref_group_members[1]) + + stub_group_members = MagicMock(side_effect=ref_group_members) + monkeypatch.setattr(rbac, '_get_group_members', stub_group_members) + + test_group_members = rbac._get_playbook_authorized_users(ref_authorized_groups) + + assert test_group_members == ref_members + +class TestGetGroupMembers(object): + + def setup_method(self, method): + self.ref_groupname = 'sysadmin-ponies' + + def test_valid_group(self, monkeypatch): + ref_members = ['twilightsparkle', 'fluttershy'] + + stub_getgrnam = Mock(gr_mem=ref_members) + stub_grp = MagicMock(return_value=stub_getgrnam) + monkeypatch.setattr(grp, 'getgrnam', stub_grp) + + test_group_members = rbac._get_group_members(self.ref_groupname) + + assert test_group_members == ref_members + + def test_invalid_group(self, monkeypatch): + stub_grp = MagicMock(side_effect=KeyError) + monkeypatch.setattr(grp, 'getgrnam', stub_grp) + + test_group_members = rbac._get_group_members(self.ref_groupname) + + assert test_group_members == [] class TestConfigMerge(object):
Attachment:
pgpWcEogRX_uH.pgp
Description: OpenPGP digital signature
_______________________________________________ infrastructure mailing list infrastructure@xxxxxxxxxxxxxxxxxxxxxxx http://lists.fedoraproject.org/postorius/infrastructure@xxxxxxxxxxxxxxxxxxxxxxx