Hi, I have been working on some aci parsing utilities to help with testing and potential tools. I have attached a patch for initial review, but it's not ready for merge. Still todo: * Comment cleanup. * Add helpers to create acis. * Allow entry aci's to be saved back via the Entry. * Improve tests to check for the default entries and make sure they were parsed. * Add pydoc strings. Feedback is appreciated. Sincerely, -- William Brown <william@xxxxxxxxxxxxxxxx>
From 76fe2176e893c538cee6fe5ab0b05161eaf161a0 Mon Sep 17 00:00:00 2001 From: William Brown <william@xxxxxxxxxxxxxxxx> Date: Tue, 4 Aug 2015 13:30:24 +0930 Subject: [PATCH] Add aci parsing utilities, which will return an EntryAci. Not ready for merge, review only. --- lib389/__init__.py | 23 +++++--- lib389/_entry.py | 138 ++++++++++++++++++++++++++++++++++++++++++++++++ lib389/aci.py | 29 ++++++++++ tests/aci_parse_test.py | 53 +++++++++++++++++++ 4 files changed, 236 insertions(+), 7 deletions(-) create mode 100644 lib389/aci.py create mode 100644 tests/aci_parse_test.py diff --git a/lib389/__init__.py b/lib389/__init__.py index 10bf0df..72cb7b8 100644 --- a/lib389/__init__.py +++ b/lib389/__init__.py @@ -135,16 +135,22 @@ def wrapper(f, name): # print data if data: if isinstance(data, tuple): + # Probably should do the same aci vs entry check here return objtype, Entry(data) elif isinstance(data, list): # AD sends back these search references -# if objtype == ldap.RES_SEARCH_RESULT and \ -# isinstance(data[-1],tuple) and \ -# not data[-1][0]: -# print "Received search reference: " -# pprint.pprint(data[-1][1]) -# data.pop() # remove the last non-entry element - + # if objtype == ldap.RES_SEARCH_RESULT and \ + # isinstance(data[-1],tuple) and \ + # not data[-1][0]: + # print "Received search reference: " + # pprint.pprint(data[-1][1]) + # data.pop() # remove the last non-entry element + + for x in data: + print(x) + # So here we need to check if the data contains aci, if so + # we return an AciEntry instead. + # There may be a better way to achieve this however..... return objtype, [Entry(x) for x in data] else: raise TypeError("unknown data type %s returned by result" % @@ -157,6 +163,7 @@ def wrapper(f, name): # We need to convert the Entry into the format used by # python-ldap ent = args[0] + # Make this an "or" AciEntry if isinstance(ent, Entry): return f(ent.dn, ent.toTupleList(), *args[2:]) else: @@ -335,6 +342,7 @@ class DirSrv(SimpleLDAPObject): from lib389.plugins import Plugins from lib389.tasks import Tasks from lib389.index import Index + from lib389.aci import Aci self.agreement = Agreement(self) self.replica = Replica(self) @@ -347,6 +355,7 @@ class DirSrv(SimpleLDAPObject): self.schema = Schema(self) self.plugins = Plugins(self) self.tasks = Tasks(self) + self.aci = Aci(self) def __init__(self, verbose=False, timeout=10): """ diff --git a/lib389/_entry.py b/lib389/_entry.py index b97b6e7..1d55273 100644 --- a/lib389/_entry.py +++ b/lib389/_entry.py @@ -212,3 +212,141 @@ class Entry(object): except MissingEntryError: log.exception("This entry should exist!") raise + + # GetAcis + def getAcis(self): + if not self.hasAttr('aci'): + # There should be a better way to do this? Perhaps + # self search for the aci attr? + return [] + self.acis = map(lambda a: EntryAci(self, a), self.getValues('aci')) + return self.acis + +class EntryAci(object): + + _urlkeys = [ 'targattrfilters', + 'targetfilter', + 'targetattrfilters', + 'target', + 'userdn', + 'groupdn', + 'roledn', + ] + _keys = [ + 'targetscope', + 'targetattrfilters', + 'targattrfilters', + 'targetfilter', + 'targetattr', + 'target', + 'version 3.0;', + 'userdn', + 'groupdn', + 'roledn', + ] + _innerkeys = [ + 'allow', + 'acl', + ] + + def __init__(self, entry, rawaci): + self.entry = entry + self._rawaci = rawaci + self._acidata = self._parse_aci(self._rawaci) + + def _find_terms(self, aci): + lbr_list = [] + rbr_list = [] + depth = 0 + for i, char in enumerate(aci): + if char == '(' and depth == 0: + lbr_list.append(i) + if char == '(': + depth += 1 + if char == ')' and depth == 1: + rbr_list.append(i) + if char == ')': + depth -= 1 + # Now build a set of terms. + terms = [] + for lb, rb in zip(lbr_list, rbr_list): + terms.append(aci[lb + 1:rb]) + return terms + + def _parse_version_3_0(self, rawacipart, data): + terms = [] + # We may need to treat this as a special case. + interms = rawacipart.split(';') + interms = map(lambda x: x.strip(), interms) + # We have to do this because it's not the same + # as other term formats. + for iwork in interms: + for j in self._innerkeys: + if iwork.startswith(j) and j == 'acl': + t = iwork.split(' ', 1)[1] + t = t.replace('"', '') + data[j].append(t) + if iwork.startswith(j) and j == 'allow': + #First we need to get the bits. + first = iwork.index('(') + 1 + second = iwork.index(')', first) + data[j].append(iwork[first:second]) + # This wouldn't parse properly: + ### (version 3.0; acl "Enable anonymous access"; allow (read, search, compare) userdn="ldap:///anyone";) + # it's because _find_terms is expecting userdn wrapped in braces. + # but the term that follows an allow, the () is optional. + # so as a result, it upsets the _find_terms parser. + # So, if there are no () after the allow, it must be a single term, so we just append it. + subterm = iwork[second + 1:] + if '(' not in subterm and ')' not in subterm: + terms.append(subterm.strip()) + else: + terms += self._find_terms(subterm) + return terms + + def _parse_aci(self, rawaci): + #hostdn, aci = entry + aci = rawaci + depth = 0 + data = {} + #data['dn'] = entry.dn + for k in self._keys + self._innerkeys: + data[k] = [] + # We need to get a list of all the depth 0 ( and ) + terms = self._find_terms(aci) + + while len(terms) > 0: + work = terms.pop() + for k in self._keys: + if work.startswith(k): + aci = work.replace(k, '', 1) + if k == 'version 3.0;': + #We pop more terms out, but we don't need to parse them "now" + terms += self._parse_version_3_0(aci, data) + continue + # Nearly all terms are = seperated + pre, val = aci.split('=', 1) + val = val.replace('"', '') + ### / We should replace ldap:/// in some attrs + if k in self._urlkeys: + data[k].append(val.replace('ldap:///','', 1)) + elif k == 'targetattr': + if pre.strip() == '!': + data['targetattreq'] = False + else: + data['targetattreq'] = True + data[k] += val.split('||') + else: + data[k].append(val) + break # So we don't double trigger the checkc + + result = { + 'rawaci': rawaci, + 'targetattreq' : data.get('targetattreq', None), + } + + + for k in self._keys + self._innerkeys: + data[k] = map(lambda x: x.strip(), data[k]) + result[k] = data[k] + return result diff --git a/lib389/aci.py b/lib389/aci.py new file mode 100644 index 0000000..40b0a7a --- /dev/null +++ b/lib389/aci.py @@ -0,0 +1,29 @@ +"""Aci class to help parse and create ACIs. + +You will access this via the Entry Class. +""" + +import ldap + +from lib389._constants import * +from lib389 import Entry, InvalidArgumentError + +# Add a helper for aci listing. + +class Aci(object): + + + def __init__(self, conn): + """ + """ + self.conn = conn + self.log = conn.log + + def list(self, basedn): + acis = [] + rawacientries = self.conn.search_s(basedn, ldap.SCOPE_SUBTREE, 'objectClass=*', ['aci']) + for rawacientry in rawacientries: + acis += rawacientry.getAcis() + return acis + + diff --git a/tests/aci_parse_test.py b/tests/aci_parse_test.py new file mode 100644 index 0000000..4a6049d --- /dev/null +++ b/tests/aci_parse_test.py @@ -0,0 +1,53 @@ +''' +Created on Aug 3, 2015 + +@author: William Brown +''' +from lib389._constants import * +from lib389._aci import Aci +from lib389 import DirSrv,Entry +import ldap + +INSTANCE_PORT = 54321 +INSTANCE_SERVERID = 'aciparseds' +INSTANCE_PREFIX = None + +class Test_schema(): + def setUp(self): + instance = DirSrv(verbose=False) + instance.log.debug("Instance allocated") + args = {SER_HOST: LOCALHOST, + SER_PORT: INSTANCE_PORT, + SER_DEPLOYED_DIR: INSTANCE_PREFIX, + SER_SERVERID_PROP: INSTANCE_SERVERID + } + instance.allocate(args) + if instance.exists(): + instance.delete() + instance.create() + instance.open() + self.instance = instance + + def tearDown(self): + if self.instance.exists(): + #self.instance.db2ldif(bename='userRoot', suffixes=[DEFAULT_SUFFIX], excludeSuffixes=[], encrypt=False, repl_data=False, outputfile='%s/ldif/%s.ldif' % (self.instance.dbdir,INSTANCE_SERVERID )) + #self.instance.clearBackupFS() + #self.instance.backupFS() + self.instance.delete() + + def test_aci(self): + acis = self.instance.aci.list(DEFAULT_SUFFIX) + # Return the list + for aci in acis: + print('---------') + for key in aci._acidata: + print('%s : %s' % (key, aci._acidata[key])) + # Add some better tests that actually check we got the + # default aci's back. + +if __name__ == "__main__": + test = Test_schema() + test.setUp() + test.test_aci() + test.tearDown() + -- 2.4.3
-- 389-devel mailing list 389-devel@xxxxxxxxxxxxxxxxxxxxxxx https://admin.fedoraproject.org/mailman/listinfo/389-devel