<snip> > Why the change here? Why the addition of bash? No change, just a slightly botched diff. A new, properly done diff is at the end of this email, but that should be the only discrepancy. > > +def can_run(acl, username, playbook_file): > > """ determines whether the current user is allowed to run a > > playbook > > :param acl: dictionary of acl information > > - :param groups: groups of which the user is a member > > + :param username: username of user running the utility > > :param playbook_file: playbook file that is being run > > :return: True if the user is authorized, False if unauthorized > > """ > > - # exact match quick route > > + > > + authorized_groups = acl[playbook_file] > > + > > if playbook_file in acl: > > - pb_groups = frozenset(acl[playbook_file]) > > - if groups.intersection(pb_groups): > > + pb_authorized = > > _get_playbook_authorized_users(authorized_groups) > > + if username in pb_authorized: > > return True > > return False > > It looks like this function used to return False if an invalid > playbook_file was passed to it, but now it will raise a KeyError. Is > that intentional? I see there is a test case for it, but the > `rbac_playbook` function doesn't handle it. The KeyError is now handled and a test case has been added, thanks. 
Tim diff --git a/ansible_utils.spec b/ansible_utils.spec index a340832..ba45a3d 100644 --- a/ansible_utils.spec +++ b/ansible_utils.spec @@ -20,12 +20,14 @@ Requires: PyYAML BuildRequires: python2-devel BuildRequires: python-setuptools BuildRequires: python-kitchen +BuildRequires: python-mock # python-dingus isn't built for el7, can't run tests %if 0%{?rhel} > 6 BuildRequires: pytest BuildRequires: PyYAML BuildRequires: python-dingus +BuildRequires: python-mock %endif # the cli uses argparse, which is not part of the standard libaray in python2.6 diff --git a/ansible_utils/rbac_playbook.py b/ansible_utils/rbac_playbook.py index c850b94..553225b 100755 --- a/ansible_utils/rbac_playbook.py +++ b/ansible_utils/rbac_playbook.py @@ -140,7 +140,7 @@ def generate_message(result, username, playbook_name, args, checksum): :return: rendered string summarizing rbac activity """ subject = "[rbac-playbook] {0} {1} ran {2}".format(result, - username.pw_name, + username, playbook_name) body = ['Details:'] body.extend(['{0}: {1}'.format(key, value) for key, value in args.iteritems()]) @@ -209,36 +209,60 @@ def run_sudo_command(playbook_file, playbook_args): print "EXECV: %s %s" % (executable, ' '.join(args)) os.execv(executable, args) +def _get_username(): + """Retrieve the username for the user which started execution of rbac_playbook""" -def can_run(acl, groups, playbook_file): + user = os.getlogin() + username = pwd.getpwnam(user) + return username.pw_name + +def _get_group_members(groupname): + """Find the members of a specific group + + :param groupname: name of group to find members of + :return: list of usernames for members of the given group, empty list if the group does not exist""" + + group_data = None + try: + group_data = grp.getgrnam(groupname) + except KeyError: + return [] + + return group_data.gr_mem + +def _get_playbook_authorized_users(grouplist): + """Retrieve a set of all users who are members of one or more groups + + :param grouplist: list of one or more 
group names + :return: set of usernames corresponding to the union of members for each group in the grouplist""" + + userlist = [] + for groupname in grouplist: + userlist.extend(_get_group_members(groupname)) + + return set(userlist) + +def can_run(acl, username, playbook_file): """ determines whether the current user is allowed to run a playbook :param acl: dictionary of acl information - :param groups: groups of which the user is a member + :param username: username of user running the utility :param playbook_file: playbook file that is being run - :return: True if the user is authorized, False if unauthorized + :return: True if the user is authorized, False if unauthorized or the + playbook filename is not in the acl information """ - # exact match quick route + + try: + authorized_groups = acl[playbook_file] + except KeyError: + return False + if playbook_file in acl: - pb_groups = frozenset(acl[playbook_file]) - if groups.intersection(pb_groups): + pb_authorized = _get_playbook_authorized_users(authorized_groups) + if username in pb_authorized: return True return False - -def get_groups(): - """ retrieve the groups of which the current user is currently a member - :return: (username,groups) where groups is a set of groups which the current user is a member - """ - username = os.getlogin() - user = pwd.getpwnam(username) - groups = set(g.gr_name for g in grp.getgrall() if username in g.gr_mem) - groups.add(grp.getgrgid(user.pw_gid).gr_name) - groups = frozenset(groups) - - return user, groups - - def generate_args(options): """ Generate ansible-playbook compatible args representing the information passed into rbac @@ -275,11 +299,11 @@ def rbac_playbook(playbook_name, options): :param playbook_name: name of playbook file to run :param options: dictionary of options """ - username, groups = get_groups() + username = _get_username() checksum = get_checksum(playbook_name) # raise exception if not allowed - if not can_run(config['acls'], groups, playbook_name): + 
if not can_run(config['acls'], username, playbook_name): notify(generate_message('FAILURE', username, playbook_name, options, checksum)) msg ="user {0} is not authorized to run {1}".format(username.pw_name, playbook_name) raise RbacAuthException(msg) diff --git a/tests/test_rbac_playbook.py b/tests/test_rbac_playbook.py index 765997d..665281e 100644 --- a/tests/test_rbac_playbook.py +++ b/tests/test_rbac_playbook.py @@ -1,6 +1,8 @@ import copy +import grp from dingus import Dingus +from mock import MagicMock, Mock from ansible_utils import rbac_playbook as rbac @@ -83,7 +85,6 @@ class TestProcessArgs(object): class TestGeneratePlaybookArgs(object): def test_simple_limit(self): - ref_playbook = 'unicorn.yml' ref_limit = ['pink.unicorn.com', 'white.unicorn.com', 'blue.unicorn.com', @@ -93,7 +94,7 @@ class TestGeneratePlaybookArgs(object): test_args = rbac.generate_args(ref_options) assert test_args == ['-l', ':'.join(ref_limit)] - def test_simple_limit(self): + def test_simple_tag(self): ref_tags = ['tag', 'tagg', 'tog', 'togg'] ref_options = {'tags': ref_tags} @@ -102,8 +103,6 @@ class TestGeneratePlaybookArgs(object): assert test_args == ['-t', ','.join(ref_tags)] def test_all(self): - - ref_playbook = 'unicorn.yml' ref_limit = ['pink.unicorn.com', 'white.unicorn.com', 'blue.unicorn.com', @@ -123,25 +122,109 @@ class TestGeneratePlaybookArgs(object): '--start-at-task="some task"'] -class TestDetermineCanRun(object): +class TestCanRun(object): def setup_method(self, method): self.ref_acl = {'group/unicorns.yml': ['sysadmin-unicorn', 'sysadmin-pony'], 'pony.yml': ['sysadmin-pony'] } - def test_allow_one_of_multiple(self, monkeypatch): - ref_groups = set(['sysadmin-unicorn']) - test_can_run = rbac.can_run(self.ref_acl, ref_groups, 'group/unicorns.yml') + def test_allow(self, monkeypatch): + ref_user = 'twilightsparkle' + ref_authorized = ['twilightsparkle', 'fluttershy'] + + stub_authorized = MagicMock(return_value=ref_authorized) + monkeypatch.setattr(rbac, 
'_get_playbook_authorized_users', stub_authorized) + + test_can_run = rbac.can_run(self.ref_acl, ref_user, 'group/unicorns.yml') assert test_can_run - def test_allow_one_of_multiple(self, monkeypatch): - ref_groups = set(['sysadmin-kittycat']) - test_can_run = rbac.can_run(self.ref_acl, ref_groups, 'group/unicorns.yml') + def test_disallow(self, monkeypatch): + ref_user = 'mittens' + ref_authorized = ['twilightsparkle', 'fluttershy'] + + stub_authorized = MagicMock(return_value=ref_authorized) + monkeypatch.setattr(rbac, '_get_playbook_authorized_users', stub_authorized) + + test_can_run = rbac.can_run(self.ref_acl, ref_user, 'group/unicorns.yml') assert not test_can_run + def should_return_false_invalid_file(self, monkeypatch): + + ref_user = 'twilightsparkle' + + test_can_run = rbac.can_run({}, ref_user, 'group/unicorns.yml') + + assert not test_can_run + +class TestGetAuthorizedUsers(object): + + def setup_method(self, method): + self.ref_acl = {'group/unicorns.yml': ['sysadmin-unicorn', 'sysadmin-pony'], + 'pony.yml': ['sysadmin-pony'] + } + + def test_get_single_group_users(self, monkeypatch): + ref_authorized_groups = ['sysadmin-pony'] + ref_group_members = ['twilightsparkle', 'fluttershy'] + ref_members = set(ref_group_members) + + stub_group_members = MagicMock(return_value=ref_group_members) + monkeypatch.setattr(rbac, '_get_group_members', stub_group_members) + + test_group_members = rbac._get_playbook_authorized_users(ref_authorized_groups) + + assert test_group_members == ref_members + + def test_get_multiple_group_users(self, monkeypatch): + ref_authorized_groups = ['sysadmin-pony', 'sysadmin-unicorn'] + ref_group_members = [['twilightsparkle', 'fluttershy'], ['charlie', 'buttercup']] + ref_members = set(ref_group_members[0] + ref_group_members[1]) + + stub_group_members = MagicMock(side_effect=ref_group_members) + monkeypatch.setattr(rbac, '_get_group_members', stub_group_members) + + test_group_members = 
rbac._get_playbook_authorized_users(ref_authorized_groups) + + assert test_group_members == ref_members + + def test_get_multiple_group_with_overlap(self, monkeypatch): + ref_authorized_groups = ['sysadmin-pony', 'sysadmin-unicorn'] + ref_group_members = [['twilightsparkle', 'fluttershy'], ['charlie', 'buttercup', 'twilightsparkle']] + ref_members = set(ref_group_members[0] + ref_group_members[1]) + + stub_group_members = MagicMock(side_effect=ref_group_members) + monkeypatch.setattr(rbac, '_get_group_members', stub_group_members) + + test_group_members = rbac._get_playbook_authorized_users(ref_authorized_groups) + + assert test_group_members == ref_members + +class TestGetGroupMembers(object): + + def setup_method(self, method): + self.ref_groupname = 'sysadmin-ponies' + + def test_valid_group(self, monkeypatch): + ref_members = ['twilightsparkle', 'fluttershy'] + + stub_getgrnam = Mock(gr_mem=ref_members) + stub_grp = MagicMock(return_value=stub_getgrnam) + monkeypatch.setattr(grp, 'getgrnam', stub_grp) + + test_group_members = rbac._get_group_members(self.ref_groupname) + + assert test_group_members == ref_members + + def test_invalid_group(self, monkeypatch): + stub_grp = MagicMock(side_effect=KeyError) + monkeypatch.setattr(grp, 'getgrnam', stub_grp) + + test_group_members = rbac._get_group_members(self.ref_groupname) + + assert test_group_members == [] class TestConfigMerge(object):
Attachment:
pgpwaszbb1TJh.pgp
Description: OpenPGP digital signature
_______________________________________________ infrastructure mailing list infrastructure@xxxxxxxxxxxxxxxxxxxxxxx http://lists.fedoraproject.org/postorius/infrastructure@xxxxxxxxxxxxxxxxxxxxxxx