[389-devel] [lib389] Review only, aci parsing utilities

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]



I have been working on some aci parsing utilities to help with testing and
potential tools.

I have attached a patch for initial review, but it's not ready for merge.

Still todo:

* Comment cleanup.
* Add helpers to create acis.
* Allow an entry's ACIs to be saved back via the Entry.
* Improve tests to check for the default entries and make sure they were parsed.
* Add pydoc strings.

Feedback is appreciated.


William Brown <william@xxxxxxxxxxxxxxxx>
From 76fe2176e893c538cee6fe5ab0b05161eaf161a0 Mon Sep 17 00:00:00 2001
From: William Brown <william@xxxxxxxxxxxxxxxx>
Date: Tue, 4 Aug 2015 13:30:24 +0930
Subject: [PATCH] Add aci parsing utilities, which will return an EntryAci. Not
 ready for merge, review only.

 lib389/__init__.py      |  23 +++++---
 lib389/_entry.py        | 138 ++++++++++++++++++++++++++++++++++++++++++++++++
 lib389/aci.py           |  29 ++++++++++
 tests/aci_parse_test.py |  53 +++++++++++++++++++
 4 files changed, 236 insertions(+), 7 deletions(-)
 create mode 100644 lib389/aci.py
 create mode 100644 tests/aci_parse_test.py

diff --git a/lib389/__init__.py b/lib389/__init__.py
index 10bf0df..72cb7b8 100644
--- a/lib389/__init__.py
+++ b/lib389/__init__.py
@@ -135,16 +135,22 @@ def wrapper(f, name):
             # print data
             if data:
                 if isinstance(data, tuple):
+                    # Probably should do the same aci vs entry check here
                     return objtype, Entry(data)
                 elif isinstance(data, list):
                     # AD sends back these search references
-#                     if objtype == ldap.RES_SEARCH_RESULT and \
-#                        isinstance(data[-1],tuple) and \
-#                        not data[-1][0]:
-#                         print "Received search reference: "
-#                         pprint.pprint(data[-1][1])
-#                         data.pop() # remove the last non-entry element
+                    # if objtype == ldap.RES_SEARCH_RESULT and \
+                    #    isinstance(data[-1],tuple) and \
+                    #    not data[-1][0]:
+                    #     print "Received search reference: "
+                    #     pprint.pprint(data[-1][1])
+                    #     data.pop() # remove the last non-entry element
+                    for x in data:
+                        print(x)
+                        # So here we need to check if the data contains aci, if so
+                        #  we return an AciEntry instead.
+                        # There may be a better way to achieve this however.....
                     return objtype, [Entry(x) for x in data]
                     raise TypeError("unknown data type %s returned by result" %
@@ -157,6 +163,7 @@ def wrapper(f, name):
             # We need to convert the Entry into the format used by
             # python-ldap
             ent = args[0]
+            # Make this an "or" AciEntry
             if isinstance(ent, Entry):
                 return f(ent.dn, ent.toTupleList(), *args[2:])
@@ -335,6 +342,7 @@ class DirSrv(SimpleLDAPObject):
         from lib389.plugins     import Plugins
         from lib389.tasks       import Tasks
         from lib389.index       import Index
+        from lib389.aci         import Aci
         self.agreement   = Agreement(self)
         self.replica     = Replica(self)
@@ -347,6 +355,7 @@ class DirSrv(SimpleLDAPObject):
         self.schema      = Schema(self)
         self.plugins     = Plugins(self)
         self.tasks       = Tasks(self)
+        self.aci         = Aci(self)
     def __init__(self, verbose=False, timeout=10):
diff --git a/lib389/_entry.py b/lib389/_entry.py
index b97b6e7..1d55273 100644
--- a/lib389/_entry.py
+++ b/lib389/_entry.py
@@ -212,3 +212,141 @@ class Entry(object):
         except MissingEntryError:
             log.exception("This entry should exist!")
+    def getAcis(self):
+        """Return this entry's 'aci' values wrapped as EntryAci objects.
+        Returns an empty list when the entry has no 'aci' attribute.
+        Otherwise the parsed list is also stored on self.acis.
+        """
+        if not self.hasAttr('aci'):
+            # TODO: there may be a better way to do this, e.g. searching
+            # for the aci attribute on this entry's own DN.
+            return []
+        # Build a real list: on Python 3 map() yields a one-shot iterator,
+        # which would leave self.acis empty after its first traversal.
+        self.acis = [EntryAci(self, a) for a in self.getValues('aci')]
+        return self.acis
+class EntryAci(object):
+    """One raw 'aci' attribute value of an entry, parsed into a dict.
+    The parsed form is kept in self._acidata; see _parse_aci for its keys.
+    """
+    # Keys whose values have the first 'ldap:///' removed when parsed.
+    _urlkeys = [ 'targattrfilters',
+                'targetfilter',
+                'targetattrfilters',
+                'target',
+                'userdn',
+                'groupdn',
+                'roledn',
+              ]
+    # Term prefixes recognised by _parse_aci. They are tried in order with
+    # startswith(), so a longer name must stay ahead of any key that is a
+    # prefix of it (e.g. 'targetattr' before 'target').
+    _keys = [
+             'targetscope',
+             'targetattrfilters',
+             'targattrfilters',
+             'targetfilter',
+             'targetattr',
+             'target',
+             'version 3.0;',
+             'userdn',
+             'groupdn',
+             'roledn',
+           ]
+    # Keywords handled inside the 'version 3.0;' term.
+    _innerkeys = [
+             'allow',
+             'acl',
+             'deny',
+            ]
+    def __init__(self, entry, rawaci):
+        """Store the owning entry and raw ACI string, then parse it."""
+        self.entry = entry
+        self._rawaci = rawaci
+        self._acidata = self._parse_aci(self._rawaci)
+    def _find_terms(self, aci):
+        """Return the text inside each outermost '(...)' pair of aci.
+        Nested parentheses stay part of the enclosing term.
+        """
+        lbr_list = []
+        rbr_list = []
+        depth = 0
+        for i, char in enumerate(aci):
+            if char == '(':
+                if depth == 0:
+                    lbr_list.append(i)
+                depth += 1
+            elif char == ')':
+                if depth == 1:
+                    rbr_list.append(i)
+                depth -= 1
+        return [aci[lb + 1:rb] for lb, rb in zip(lbr_list, rbr_list)]
+    def _parse_version_3_0(self, rawacipart, data):
+        """Parse the body of a 'version 3.0;' term.
+        The acl name and the allow/deny rights are appended to data in
+        place. The bind-rule terms that follow each permission are returned
+        so the caller can parse them like any other term.
+        """
+        terms = []
+        # This part is ';' separated rather than '(...)' wrapped, so it
+        # cannot go through _find_terms directly.
+        for iwork in [x.strip() for x in rawacipart.split(';')]:
+            if iwork.startswith('acl'):
+                t = iwork.split(' ', 1)[1]
+                data['acl'].append(t.replace('"', ''))
+            for perm in ('allow', 'deny'):
+                if not iwork.startswith(perm):
+                    continue
+                # The rights are the first bracketed group.
+                first = iwork.index('(') + 1
+                second = iwork.index(')', first)
+                data[perm].append(iwork[first:second])
+                # The brackets around the bind rule after the rights are
+                # optional, e.g.
+                #   allow (read, search, compare) userdn="ldap:///anyone"
+                # _find_terms only sees bracketed terms, so an unbracketed
+                # remainder is treated as a single term.
+                subterm = iwork[second + 1:]
+                if '(' not in subterm and ')' not in subterm:
+                    terms.append(subterm.strip())
+                else:
+                    terms += self._find_terms(subterm)
+        return terms
+    def _parse_aci(self, rawaci):
+        """Parse rawaci into a dict.
+        The result holds 'rawaci', 'targetattreq' (False for 'targetattr !=',
+        True for 'targetattr =', None when there is no targetattr term) and
+        one list of stripped strings for every name in _keys and _innerkeys.
+        """
+        data = {}
+        for k in self._keys + self._innerkeys:
+            data[k] = []
+        terms = self._find_terms(rawaci)
+        while terms:
+            work = terms.pop()
+            for k in self._keys:
+                if not work.startswith(k):
+                    continue
+                rest = work.replace(k, '', 1)
+                if k == 'version 3.0;':
+                    # This queues more terms; they are parsed on later
+                    # passes of the while loop.
+                    terms += self._parse_version_3_0(rest, data)
+                    break
+                # Nearly all terms are '=' separated.
+                pre, val = rest.split('=', 1)
+                val = val.replace('"', '')
+                if k in self._urlkeys:
+                    data[k].append(val.replace('ldap:///', '', 1))
+                elif k == 'targetattr':
+                    # A '!' left before the '=' means the term was '!='.
+                    data['targetattreq'] = pre.strip() != '!'
+                    data[k] += val.split('||')
+                else:
+                    data[k].append(val)
+                break  # So a shorter key does not also match this term.
+        result = {
+            'rawaci': rawaci,
+            'targetattreq': data.get('targetattreq', None),
+            }
+        for k in self._keys + self._innerkeys:
+            # Real lists, not map(): on Python 3 map() is a lazy iterator.
+            result[k] = [x.strip() for x in data[k]]
+        return result
diff --git a/lib389/aci.py b/lib389/aci.py
new file mode 100644
index 0000000..40b0a7a
--- /dev/null
+++ b/lib389/aci.py
@@ -0,0 +1,29 @@
+"""Aci class to help parse and create ACIs.
+You will access this via the Entry Class.
+"""
+import ldap
+from lib389._constants import *
+from lib389 import Entry, InvalidArgumentError
+# Add a helper for aci listing. 
+class Aci(object):
+    """Helper bound to a connection for listing parsed ACIs."""
+    def __init__(self, conn):
+        """Keep the connection and reuse its logger."""
+        self.conn, self.log = conn, conn.log
+    def list(self, basedn):
+        """Return the parsed ACIs of every entry under basedn.
+        Runs a subtree search requesting only the 'aci' attribute and
+        concatenates each returned entry's getAcis() result.
+        """
+        found = self.conn.search_s(basedn, ldap.SCOPE_SUBTREE, 'objectClass=*', ['aci'])
+        parsed = []
+        for ent in found:
+            parsed.extend(ent.getAcis())
+        return parsed
diff --git a/tests/aci_parse_test.py b/tests/aci_parse_test.py
new file mode 100644
index 0000000..4a6049d
--- /dev/null
+++ b/tests/aci_parse_test.py
@@ -0,0 +1,53 @@
+'''
+Created on Aug 3, 2015
+@author: William Brown
+'''
+from lib389._constants import *
+# The module added by this patch is lib389/aci.py, so import from
+# lib389.aci; lib389._aci is not created here and fails at import.
+from lib389.aci import Aci
+from lib389 import DirSrv,Entry
+import ldap
+# Port and server id for this test's throwaway instance.
+# NOTE(review): INSTANCE_SERVERID is not passed to allocate() in setUp;
+# confirm whether it should be.
+INSTANCE_PORT     = 54321
+INSTANCE_SERVERID = 'aciparseds'
+class Test_schema():
+    """Smoke test that lists and prints the ACIs under the default suffix."""
+    def setUp(self):
+        """Allocate, create and open an instance, deleting any existing one."""
+        ds = DirSrv(verbose=False)
+        ds.log.debug("Instance allocated")
+        ds.allocate({SER_HOST: LOCALHOST, SER_PORT: INSTANCE_PORT})
+        if ds.exists():
+            ds.delete()
+        ds.create()
+        ds.open()
+        self.instance = ds
+    def tearDown(self):
+        """Delete the instance if it still exists."""
+        # The optional ldif export and filesystem backup steps are disabled.
+        if not self.instance.exists():
+            return
+        self.instance.delete()
+    def test_aci(self):
+        """Print every parsed ACI as 'key : value' lines."""
+        for aci in self.instance.aci.list(DEFAULT_SUFFIX):
+            print('---------')
+            for key, value in aci._acidata.items():
+                print('%s : %s' % (key, value))
+        # TODO: add real assertions that the default ACIs were returned
+        # and parsed.
+if __name__ == "__main__":
+    test = Test_schema()
+    # setUp stays outside the try: if it fails, test.instance may not be
+    # set and tearDown would fail on the missing attribute.
+    test.setUp()
+    try:
+        test.test_aci()
+    finally:
+        # Always remove the instance, even when the test raises.
+        test.tearDown()

389-devel mailing list

[Index of Archives]     [Fedora Directory Announce]     [Fedora Users]     [Older Fedora Users Mail]     [Fedora Advisory Board]     [Fedora Security]     [Fedora Devel Java]     [Fedora Desktop]     [ATA RAID]     [Fedora Marketing]     [Fedora Mentors]     [Fedora Package Review]     [Fedora Art]     [Fedora Music]     [Fedora Packaging]     [CentOS]     [Fedora SELinux]     [Big List of Linux Books]     [KDE Users]     [Fedora Art]     [Fedora Docs]

  Powered by Linux