rbac_playbook fix for RHEL7

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Long story short, when the batcave upgrade happened on Friday, we found
out that rbac_playbook doesn't work on el7 due to an issue with
nss-altfiles.

I figured out how to sidestep the issue by changing the approach that
rbac_playbook takes. It used to get all the groups for the user running
the script and check for an intersection between those groups and the
required groups for the playbook being run.

The new version looks at the groups required for the playbook being
run, gathers all the users in those groups and verifies that the user
running rbac_playbook is in that list before proceeding.

I've included the changes below for security review before updating
anything on batcave01.

Tim


diff --git a/ansible_utils.spec b/ansible_utils.spec
index 2e26a70..f097020 100644
--- a/ansible_utils.spec
+++ b/ansible_utils.spec
@@ -20,12 +20,14 @@ Requires:       PyYAML
 BuildRequires:  python2-devel
 BuildRequires:  python-setuptools
 BuildRequires:  python-kitchen
+BuildRequires:  python-mock
 
 # python-dingus isn't built for el7, can't run tests
 %if 0%{?rhel} > 6
 BuildRequires:  pytest
 BuildRequires:  PyYAML
 BuildRequires:  python-dingus
+BuildRequires:  python-mock
 %endif
 
 # the cli uses argparse, which is not part of the standard libaray in
python2.6 diff --git a/ansible_utils/rbac_playbook.py
b/ansible_utils/rbac_playbook.py index 42be23f..c6a952c 100755
--- a/ansible_utils/rbac_playbook.py
+++ b/ansible_utils/rbac_playbook.py
@@ -140,7 +140,7 @@ def generate_message(result, username,
playbook_name, args, checksum): :return: rendered string summarizing
rbac activity """
     subject = "[rbac-playbook] {0} {1} ran {2}".format(result,
-
username.pw_name,
+                                                       username,
                                                        playbook_name)
     body = ['Details:']
     body.extend(['{0}: {1}'.format(key, value) for key, value in
args.iteritems()]) @@ -188,10 +188,13 @@ def
run_sudo_command(playbook_file, playbook_args): full_filename =
os.path.abspath(os.path.join(config['config']['PLAYBOOKS_DIR'],
playbook_file)) 
+    python_args = ['cd', config['config']['ANSIBLE_DIR'], ';',
+                   '/usr/bin/python2',
config['config']['ANSIBLE_PLAYBOOK'],
+                   full_filename]
+    python_args.extend(playbook_args)
     executable = '/usr/bin/sudo'
-    args = ['-i', '/usr/bin/python2',
-            config['config']['ANSIBLE_PLAYBOOK'], full_filename]
-    args.extend(playbook_args)
+    args = ['-i', '/bin/bash', '-i', '-c',
+            ' '.join(python_args)]
 
     # Note: (1) The function used here has to pass the environment to
     # ansible-playbook so that it can connect to the ssh-agent
correctly @@ -206,36 +209,56 @@ def run_sudo_command(playbook_file,
playbook_args): print "EXECV: %s %s" % (executable, ' '.join(args))
         os.execv(executable, args)
 
+def _get_username():
+    """Retrieve the username for the user which started execution of
rbac_playbook""" 
-def can_run(acl, groups, playbook_file):
+    user = os.getlogin()
+    username = pwd.getpwnam(user)
+    return username.pw_name
+
+def _get_group_members(groupname):
+    """Find the members of a specific group
+
+    :param groupname: name of group to find members of
+    :return: list of usernames for members of the given group, empty
list if the group does not exist""" +
+    group_data = None
+    try:
+        group_data = grp.getgrnam(groupname)
+    except KeyError:
+        return []
+
+    return group_data.gr_mem
+
+def _get_playbook_authorized_users(grouplist):
+    """Retrieve a set of all users who are members of one or more
groups +
+    :param grouplist: list of one or more group names
+    :return: set of usernames corresponding to the union of members
for each group in the grouplist""" +
+    userlist = []
+    for groupname in grouplist:
+        userlist.extend(_get_group_members(groupname))
+
+    return set(userlist)
+
+def can_run(acl, username, playbook_file):
     """ determines whether the current user is allowed to run a
playbook 
     :param acl: dictionary of acl information
-    :param groups: groups of which the user is a member
+    :param username: username of user running the utility
     :param playbook_file: playbook file that is being run
     :return: True if the user is authorized, False if unauthorized
     """
-    # exact match quick route
+
+    authorized_groups = acl[playbook_file]
+
     if playbook_file in acl:
-        pb_groups = frozenset(acl[playbook_file])
-        if groups.intersection(pb_groups):
+        pb_authorized =
_get_playbook_authorized_users(authorized_groups)
+        if username in pb_authorized:
             return True
     return False
 
-
-def get_groups():
-    """ retrieve the groups of which the current user is currently a
member
-    :return: (username,groups) where groups is a set of groups which
the current user is a member
-    """
-    username = os.getlogin()
-    user = pwd.getpwnam(username)
-    groups = set(g.gr_name for g in grp.getgrall() if username in
g.gr_mem)
-    groups.add(grp.getgrgid(user.pw_gid).gr_name)
-    groups = frozenset(groups)
-
-    return user, groups
-
-
 def generate_args(options):
     """ Generate ansible-playbook compatible args representing the
information passed into rbac
@@ -272,11 +295,11 @@ def rbac_playbook(playbook_name, options):
     :param playbook_name: name of playbook file to run
     :param options: dictionary of options
     """
-    username, groups = get_groups()
+    username = _get_username()
     checksum = get_checksum(playbook_name)
 
     # raise exception if not allowed
-    if not can_run(config['acls'], groups, playbook_name):
+    if not can_run(config['acls'], username, playbook_name):
         notify(generate_message('FAILURE', username, playbook_name,
options, checksum)) msg ="user {0} is not authorized to run
{1}".format(username.pw_name, playbook_name) raise
RbacAuthException(msg) diff --git a/tests/test_rbac_playbook.py
b/tests/test_rbac_playbook.py index 765997d..0b76825 100644
--- a/tests/test_rbac_playbook.py
+++ b/tests/test_rbac_playbook.py
@@ -1,6 +1,8 @@
 import copy
+import grp
 
 from dingus import Dingus
+from mock import MagicMock, Mock
 
 from ansible_utils import rbac_playbook as rbac
 
@@ -93,7 +95,7 @@ class TestGeneratePlaybookArgs(object):
         test_args = rbac.generate_args(ref_options)
         assert test_args == ['-l', ':'.join(ref_limit)]
 
-    def test_simple_limit(self):
+    def test_simple_tag(self):
         ref_tags = ['tag', 'tagg', 'tog', 'togg']
 
         ref_options = {'tags': ref_tags}
@@ -123,25 +125,101 @@ class TestGeneratePlaybookArgs(object):
                              '--start-at-task="some task"']
 
 
-class TestDetermineCanRun(object):
+class TestCanRun(object):
 
     def setup_method(self, method):
         self.ref_acl = {'group/unicorns.yml': ['sysadmin-unicorn',
'sysadmin-pony'], 'pony.yml': ['sysadmin-pony']
                         }
 
-    def test_allow_one_of_multiple(self, monkeypatch):
-        ref_groups = set(['sysadmin-unicorn'])
-        test_can_run = rbac.can_run(self.ref_acl, ref_groups,
'group/unicorns.yml')
+    def test_allow(self, monkeypatch):
+        ref_user = 'twilightsparkle'
+        ref_authorized = ['twilightsparkle', 'fluttershy']
+
+        stub_authorized = MagicMock(return_value=ref_authorized)
+        monkeypatch.setattr(rbac, '_get_playbook_authorized_users',
stub_authorized) +
+        test_can_run = rbac.can_run(self.ref_acl, ref_user,
'group/unicorns.yml') 
         assert test_can_run
 
-    def test_allow_one_of_multiple(self, monkeypatch):
-        ref_groups = set(['sysadmin-kittycat'])
-        test_can_run = rbac.can_run(self.ref_acl, ref_groups,
'group/unicorns.yml')
+    def test_disallow(self, monkeypatch):
+        ref_user = 'mittens'
+        ref_authorized = ['twilightsparkle', 'fluttershy']
+
+        stub_authorized = MagicMock(return_value=ref_authorized)
+        monkeypatch.setattr(rbac, '_get_playbook_authorized_users',
stub_authorized) +
+        test_can_run = rbac.can_run(self.ref_acl, ref_user,
'group/unicorns.yml') 
         assert not test_can_run
 
+class TestGetAuthorizedUsers(object):
+
+    def setup_method(self, method):
+        self.ref_acl = {'group/unicorns.yml': ['sysadmin-unicorn',
'sysadmin-pony'],
+                        'pony.yml': ['sysadmin-pony']
+                        }
+
+    def test_get_single_group_users(self, monkeypatch):
+        ref_authorized_groups = ['sysadmin-pony']
+        ref_group_members = ['twilightsparkle', 'fluttershy']
+        ref_members = set(ref_group_members)
+
+        stub_group_members = MagicMock(return_value=ref_group_members)
+        monkeypatch.setattr(rbac, '_get_group_members',
stub_group_members) +
+        test_group_members =
rbac._get_playbook_authorized_users(ref_authorized_groups) +
+        assert test_group_members == ref_members
+
+    def test_get_multiple_group_users(self, monkeypatch):
+        ref_authorized_groups = ['sysadmin-pony', 'sysadmin-unicorn']
+        ref_group_members = [['twilightsparkle', 'fluttershy'],
['charlie', 'buttercup']]
+        ref_members = set(ref_group_members[0] + ref_group_members[1])
+
+        stub_group_members = MagicMock(side_effect=ref_group_members)
+        monkeypatch.setattr(rbac, '_get_group_members',
stub_group_members) +
+        test_group_members =
rbac._get_playbook_authorized_users(ref_authorized_groups) +
+        assert test_group_members == ref_members
+
+    def test_get_multiple_group_with_overlap(self, monkeypatch):
+        ref_authorized_groups = ['sysadmin-pony', 'sysadmin-unicorn']
+        ref_group_members = [['twilightsparkle', 'fluttershy'],
['charlie', 'buttercup', 'twilightsparkle']]
+        ref_members = set(ref_group_members[0] + ref_group_members[1])
+
+        stub_group_members = MagicMock(side_effect=ref_group_members)
+        monkeypatch.setattr(rbac, '_get_group_members',
stub_group_members) +
+        test_group_members =
rbac._get_playbook_authorized_users(ref_authorized_groups) +
+        assert test_group_members == ref_members
+
+class TestGetGroupMembers(object):
+
+    def setup_method(self, method):
+        self.ref_groupname = 'sysadmin-ponies'
+
+    def test_valid_group(self, monkeypatch):
+        ref_members = ['twilightsparkle', 'fluttershy']
+
+        stub_getgrnam = Mock(gr_mem=ref_members)
+        stub_grp = MagicMock(return_value=stub_getgrnam)
+        monkeypatch.setattr(grp, 'getgrnam', stub_grp)
+
+        test_group_members =
rbac._get_group_members(self.ref_groupname) +
+        assert test_group_members == ref_members
+
+    def test_invalid_group(self, monkeypatch):
+        stub_grp = MagicMock(side_effect=KeyError)
+        monkeypatch.setattr(grp, 'getgrnam', stub_grp)
+
+        test_group_members =
rbac._get_group_members(self.ref_groupname) +
+        assert test_group_members == []
 
 class TestConfigMerge(object):
 

Attachment: pgpWcEogRX_uH.pgp
Description: OpenPGP digital signature

_______________________________________________
infrastructure mailing list
infrastructure@xxxxxxxxxxxxxxxxxxxxxxx
http://lists.fedoraproject.org/postorius/infrastructure@xxxxxxxxxxxxxxxxxxxxxxx

[Index of Archives]     [Fedora Development]     [Fedora Users]     [Fedora Desktop]     [Fedora SELinux]     [Yosemite News]     [KDE Users]

  Powered by Linux