I have finished the patch that allows configuring users and groups on an IPA server. The patch is attached to this message.
>
> 14.05.2015, 17:08, "Мартынов Александр" <m--a-s@xxxxxxxxx>:
>> Hello. I am still working on the patch for system-config-users, and I would still like the changes to be included in system-config-users.
>>
>> 02.04.2015, 10:55, "Мартынов Александр" <m--a-s@xxxxxxxxx>:
>>> I am writing a patch for system-config-users that allows configuring users and groups from an IPA server with the Python library ipalib from the ipa-python package. I would like the changes to be included in system-config-users. What is the best way to implement this from an architectural point of view? I am currently implementing an API that is similar to the libuser API.
diff --git a/src/ipa_libuser.py b/src/ipa_libuser.py
new file mode 100644
index 0000000..ead1750
--- /dev/null
+++ b/src/ipa_libuser.py
@@ -0,0 +1,421 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import os
+import subprocess
+
+import libuser
+
+import ipalib
+
+def connectToIPA(**kwargs):
+ api_ipa = ipalib.api
+ api_ipa.bootstrap(context = "cli")
+ api_ipa.finalize()
+ try:
+ api_ipa.Backend.xmlclient.connect()
+ return True
+ except:
+ try:
+ api_ipa.Backend.xmlclient.connect()
+ return True
+ except:
+ return False
+ pass
+
+class User:
+ def __getattr__(self, attr):
+ return self.data[attr]
+ def __getitem__(self, attr):
+ return self.data[attr]
+ def __repr__(self):
+ return "<User data = {0}>".format(self.data)
+ def __str__(self):
+ return self.__repr__()
+ def __lt__(self, other):
+ return self.data[libuser.USERNAME][0] < other.data[libuser.USERNAME][0]
+ def __eq__(self, other):
+ if other == None:
+ return False
+ elif type(other) == unicode:
+ return self.data[libuser.USERNAME][0] == other
+ elif type(other) == str:
+ return self.data[libuser.USERNAME][0] == other
+ else:
+ return self.data[libuser.USERNAME][0] == other.data[libuser.USERNAME][0]
+ def __ne__(self, other):
+ return not self.__eq__(other)
+ def __nonzero__(self):
+ return self.data.has_key(libuser.USERNAME)
+ def __init__(self, _ipadata, **kwargs):
+ self.translate = {
+ libuser.USERNAME: 'uid', 'pw_uid': 'uidnumber', 'pw_gid': 'gidnumber',
+ 'givenname': 'givenname', 'sn': 'sn',
+ 'pw_dir': 'homedirectory', 'pw_shell': 'loginshell',
+ 'sp_expire': 'krbpasswordexpiration', 'memberof_group': 'memberof_group', 'nsaccountlock': 'nsaccountlock'}
+ self.default = {'pw_passwd': 'x', 'sp_pwdp' : '', 'sp_lstchg': -1, 'sp_min': 0, 'sp_max': 99999, 'sp_warn': 7, 'sp_inact': -1, 'sp_flag': -1, libuser.GECOS: ''}
+ self.data = {}
+ for k in self.translate:
+ if _ipadata.has_key(self.translate[k]):
+ if type(_ipadata[self.translate[k]]) == tuple:
+ self.data[k] = [el for el in _ipadata[self.translate[k]]]
+ else:
+ self.data[k] = [_ipadata[self.translate[k]]]
+ for k in self.default:
+ self.data[k] = [self.default[k]]
+ if self.data.has_key('pw_uid'):
+ self.data['pw_uid'] = [int(self.data['pw_uid'][0])]
+ self.data['pw_gid'] = [0]
+ if self.data.has_key('givenname') or self.data.has_key('sn'):
+ self.data['pw_gecos'] = [str(self.data.get('givenname', [''])[0] + " " + self.data.get('sn', [''])[0])]
+ else:
+ self.data['pw_gecos'] = [""]
+ def modules(self):
+ return ["ldap"]
+ def get(self, attr, *args):
+ if self.data.has_key(attr):
+ return self.data[attr]
+ return None
+ else:
+ return None
+ def set(self, attr, value):
+ if attr == libuser.USERNAME:
+ return
+ self.data[attr] = value
+ def isLocked(self):
+ return self.data['nsaccountlock'][0]
+ def groupList(self):
+ if self.data.has_key('memberof_group'):
+ return self.data['memberof_group']
+ else:
+ return []
+ def ipadata(self):
+ ret = {}
+ for k in self.translate:
+ if self.data.has_key(k):
+ if type(self.data[k]) == str:
+ ret[self.translate[k]] = unicode(self.data[k])
+ elif type(self.data[k]) == unicode:
+ ret[self.translate[k]] = self.data[k]
+ elif type(self.data[k]) != list:
+ ret[self.translate[k]] = self.data[k]
+ elif len(self.data[k]) > 1:
+ ret[self.translate[k]] = [unicode(el) for el in self.data[k]]
+ else:
+ ret[self.translate[k]] = unicode(self.data[k][0])
+ if self.data.has_key(libuser.GECOS):
+ if type(self.data[libuser.GECOS]) == list:
+ gecos = self.data[libuser.GECOS][0]
+ else:
+ gecos = self.data[libuser.GECOS]
+ if len(gecos) != 0:
+ l = gecos.split(' ')
+ ret['givenname'] = unicode(l[0])
+ if len(l) > 1:
+ ret['sn'] = unicode(l[1])
+ return ret
+ def name(self):
+ return self.data[libuser.USERNAME]
+ def group(self):
+ return self.data.get('memberof_group', [])
+ def id(self):
+ return self.data[libuser.UIDNUMBER]
+ def isIncluded(self, group):
+ if not type(group) in [str, unicode]:
+ group = group.name()[0]
+ if self.data.has_key('memberof_group'):
+ return group in self.data['memberof_group']
+ else:
+ return False
+ def isAdmin(self, _admin):
+ return (self.name()[0] in _admin.get('member_user', [])) or set(self.group()).intersection(set(_admin.get('member_group', []))) != set()
+
+class Group:
+ def __init__(self, _ipadata):
+ self.translate = {libuser.GROUPNAME: 'cn', 'pw_gid': 'gidnumber' , 'gr_mem': 'member_user', 'memberof_group': 'memberof_group'}
+ self.default = { 'gr_passwd': '', 'sp_pwdp': '' }
+ self.data = {}
+ for k in self.translate:
+ if _ipadata.has_key(self.translate[k]):
+ if 0 < len(_ipadata[self.translate[k]]):
+ self.data[k] = [el for el in _ipadata[self.translate[k]]]
+ else:
+ self.data[k] = [_ipadata[self.translate[k]][0]]
+ for k in self.default:
+ self.data[k] = [self.default[k]]
+ if self.data.has_key('pw_gid'):
+ self.data['pw_gid'] = [int(self.data['pw_gid'][0])]
+ else:
+ self.data['pw_gid'] = [0]
+ if not self.data.has_key(libuser.GROUPNAME):
+ self.data[libuser.GROUPNAME] = ['']
+ def __getattr__(self, attr):
+ return self.data[attr]
+ def __getitem__(self, attr):
+ return self.data[attr]
+ def __repr__(self):
+ return "<Group data = {0}>".format(self.data)
+ def __str__(self):
+ return self.name()[0]
+ def __lt__(self, other):
+ return self.data[libuser.GROUPNAME][0] < other.data[libuser.GROUPNAME][0]
+ def __eq__(self, other):
+ if other == None:
+ return False
+ elif type(other) == unicode:
+ return self.data[libuser.GROUPNAME][0] == other
+ elif type(other) == str:
+ return self.data[libuser.GROUPNAME][0] == other
+ else:
+ return self.data[libuser.GROUPNAME][0] == other.data[libuser.GROUPNAME][0]
+ def __ne__(self, other):
+ return not self.__eq__(other)
+ def __nonzero__(self):
+ return self.data.has_key(libuser.GROUPNAME)
+ def get(self, attr):
+ if self.data.has_key(attr):
+ return self.data[attr]
+ else:
+ return []
+ def set(self, attr, value):
+ if attr == libuser.MEMBERNAME and type(value) != list:
+ self.data[attr] = [value]
+ else:
+ self.data[attr] = value
+ def ipadata(self):
+ ret = {}
+ for k in self.translate:
+ if self.data.has_key(k):
+ if type(self.data[k]) == str:
+ ret[self.translate[k]] = unicode(self.data[k])
+ elif type(self.data[k]) == unicode:
+ ret[self.translate[k]] = self.data[k]
+ elif type(self.data[k]) != list:
+ ret[self.translate[k]] = self.data[k]
+ elif len(self.data[k]) > 0 and k in [libuser.GROUPNAME]:
+ ret[self.translate[k]] = unicode(self.data[k][0])
+ elif type(self.data[k]) == list:
+ ret[self.translate[k]] = [unicode(el) for el in self.data[k]]
+ else:
+ ret[self.translate[k]] = unicode(self.data[k][0])
+ return ret
+ def name(self):
+ return self.data[libuser.GROUPNAME]
+ def id(self):
+ return self.data['pw_gid']
+ def isReal(self):
+ return 'primary_groups' in self.data.get('memberof_group', [])
+ def isIncluded(self, group):
+ if type(group) in [list, tuple]:
+ group = group.name()[0]
+ if self.data.has_key('memberof_group'):
+ return group in self.data['memberof_group']
+ else:
+ return False
+
+class IPAAdmin:
+ __cached = False
+ def __user_list(self):
+ return ipalib.api.Command.user_find(all = True)['result']
+ def __update_users(self):
+ self.__users = self.__user_list()
+ def _user_list(self):
+ if not self.__cached:
+ __users = self.__user_list()
+ else:
+ __users = self.__users
+ _groups = self._group_list()
+ _users = map(lambda u: User(u, groups = _groups), __users)
+ return _users;
+ def _user_list_by_uid(self, uid, **kwargs):
+ records = ipalib.api.Command.user_find(all = True)['result']
+ return map(lambda r: User(r), filter(lambda u: uid in u['uid'], records));
+ def _user_list_by_id(self, id, **kwargs):
+ _users = filter(lambda u: id in u.id(), self._user_list())
+ return _users
+ def _add_user(self, **kwargs):
+ kwargs['givenname'] = kwargs.get('givenname', kwargs.get('uid', ""))
+ kwargs['sn'] = kwargs.get('sn', kwargs.get('uid', ""))
+ kwargs['uid'] = kwargs.get('uid', "")
+ gidnumber = kwargs.pop('gidnumber')
+ try:
+ ipalib.api.Command.user_add(**kwargs)
+ except ipalib.errors.ManagedGroupExistsError:
+ print "ipalib.errors.ManagedGroupExistsError"
+ pass
+ self.__update_users()
+ self.__update_groups()
+ def _del_user(self, username):
+ ipalib.api.Command.user_del(uid = username)
+ self.__update_users()
+ def _mod_user(self, **kwargs):
+ _ipadata2 = kwargs
+ uid = kwargs['uid']
+ if kwargs.has_key('krbpasswordexpiration'):
+ krbpasswordexpiration = kwargs.pop('krbpasswordexpiration')
+ _users = self._user_list_by_uid(uid)
+ _user = None
+ if len(_users) > 0:
+ _user = _users[0]
+ else:
+ print "ERROR: user '{0} is not found'".format(uid)
+ _ipadata1 = _user.ipadata()
+ kwargs1 = {}
+ for k in _ipadata2:
+ if _ipadata2[k] != _ipadata1[k]:
+ kwargs1[k] = _ipadata2[k]
+ if kwargs1.has_key('memberof_group'):
+ memberof_group0 = kwargs1.pop('memberof_group')
+ memberof_group1 = _user.ipadata().get('memberof_group', []) if _user else []
+ if kwargs.has_key('memberof_group2'):
+ memberof_group2 = kwargs.pop('memberof_group')
+ else:
+ memberof_group2 = []
+ _groups = [el for el in memberof_group2 if not el in memberof_group1]
+ if kwargs1.has_key('gidnumber'):
+ gidnumber = kwargs1.pop('gidnumber')
+ if 0 < len(kwargs1):
+ ipalib.api.Command.user_mod(uid, **kwargs1)
+ self.__update_users()
+ self.__update_groups()
+ def _set_pass(self, username, password):
+ ipalib.api.Command.passwd(unicode(username[0]), unicode(password))
+ self.__update_users()
+ def __group_list(self):
+ return ipalib.api.Command.group_find(all = True)['result']
+ def __group_list_by_cn(self, cn):
+ return filter(lambda g: cn in g['cn'], self.__group_list());
+ def __update_groups(self):
+ self.__groups = self.__group_list()
+ def _group_list(self):
+ if not self.__cached:
+ __groups = self.__group_list()
+ else:
+ __groups = self.__groups
+ _groups = map(lambda g: Group(g), __groups)
+ return _groups
+ def _group_list_by_name(self, name, **kwargs):
+ if type(name) == list:
+ name = name[0]
+ return filter(lambda g: name in g.name(), self._group_list());
+ def _group_list_by_id(self, id, **kwargs):
+ _groups = filter(lambda g: id in g.id(), self._group_list())
+ return _groups
+ def _add_group(self, **kwargs):
+ cn = kwargs.get('cn', '')
+ description = kwargs.get('description', cn)
+ ipalib.api.Command.group_add(cn = cn, description = description)
+ self.__update_groups()
+ def _del_group(self, groupname):
+ ipalib.api.Command.group_del(groupname)
+ self.__update_groups()
+ self.__update_users()
+ def _mod_group(self, **kwargs):
+ _ipadata2 = kwargs
+ cn = kwargs.pop('cn')
+ if type(cn) == list:
+ cn = cn[0]
+ _group = self._group_list_by_name(cn)[0]
+ _ipadata1 = _group.ipadata()
+ memberof_group1 = _group.ipadata().get('member_user', [])
+ memberof_group2 = kwargs.pop('member_user')
+ kwargs1 = {}
+ for k in _ipadata2:
+ if _ipadata2[k] != _ipadata1[k]:
+ kwargs1[k] = _ipadata2[k]
+ kwargs2 = {'user': [el for el in memberof_group2 if not el in memberof_group1]}
+ kwargs3 = {'user': [el for el in memberof_group1 if not el in memberof_group2]}
+ changed = False
+ if 0 < len(kwargs1):
+ ipalib.api.Command.group_mod(cn, **kwargs1)
+ changed = True
+ if 0 < len(kwargs2['user']):
+ ipalib.api.Command.group_add_member(cn, **kwargs2)
+ changed = True
+ if 0 < len(kwargs3['user']):
+ ipalib.api.Command.group_remove_member(cn, **kwargs3)
+ changed = True
+ if changed:
+ self.__update_users()
+ self.__update_groups()
+ def __init__(self):
+ if connectToIPA():
+ print "connect to IPA"
+ self.__update_users()
+ self.__update_groups()
+ def enumerateUsersFull(self, *args, **kwargs):
+ pattern = args[0].replace("*", "") if len(args) > 0 else ""
+ _users = self._user_list()
+ return filter(lambda u: pattern in u.name()[0], _users)
+ def enumerateUsers(self):
+ _users = self._user_list()
+ return map(lambda u: u['pw_name'][0], _users)
+ def enumerateUsersByGroup(self, groupname, **kwargs):
+ _users = filter(lambda u: u.isIncluded(groupname), self._user_list())
+ return map(lambda u: u['pw_name'][0], _users)
+ def lookupUserByName(self, uid):
+ _users = self._user_list_by_uid(uid)
+ return (_users[0] if 0 < len(_users) else None)
+ def enumerateGroupsFull(self, *args):
+ _groups = self._group_list()
+ pattern = args[0].replace("*", "") if len(args) > 0 else ""
+ return filter(lambda g: pattern in g.name()[0], _groups)
+ def enumerateGroups(self):
+ _groups = self._group_list()
+ return map(lambda u: u['gr_name'][0], _groups)
+ def enumerateGroupsByUser(self, username):
+ _users = self._user_list_by_uid(username)
+ ret = []
+ if len(_users) > 0:
+ ret = _users[0].groupList()
+ return ret
+ def lookupGroupById(self, id):
+ if id == None:
+ return None
+ if id == 0:
+ _groups = self._group_list_by_name("ipausers")
+ ret = (_groups[0] if 0 < len(_groups) else None)
+ return ret
+ _groups = self._group_list_by_id(id)
+ ret = (_groups[0] if 0 < len(_groups) else None)
+ return ret
+ def lookupGroupByName(self, groupname):
+ _groups = self._group_list_by_name(groupname)
+ return (_groups[0] if 0 < len(_groups) else None)
+ def initUser(self, username):
+ _users = self._user_list_by_uid(username)
+ ret = None
+ if len(_users) > 0:
+ ret = _users[0]
+ else:
+ ret = User({'uid': (username, )})
+ return ret
+ def addUser(self, userEnt, **kwargs):
+ self._add_user(**userEnt.ipadata())
+ def deleteUser(self, userEnt):
+ self._del_user(userEnt.get(libuser.USERNAME)[0])
+ def modifyUser(self, userEnt):
+ self._mod_user(**userEnt.ipadata())
+ def lockUser(self, userEnt):
+ self._mod_user(uid = unicode(userEnt.name()), nsaccountlock = True)
+ def unlockUser(self, userEnt, **kwargs):
+ self._mod_user(uid = unicode(userEnt.name()), nsaccountlock = False)
+ def userIsLocked(self, userEnt):
+ return userEnt.isLocked()
+ def setpassUser(self, userEnt, password, number):
+ self._set_pass(userEnt.name(), password);
+ def initGroup(self, groupname):
+ ret = Group({'cn': (groupname, )})
+ return ret
+ def addGroup(self, groupEnt):
+ self._add_group(cn = unicode(groupEnt.get(libuser.GROUPNAME)[0]))
+ def deleteGroup(self, groupEnt):
+ self._del_group(groupEnt.get(libuser.GROUPNAME)[0])
+ pass
+ def modifyGroup(self, groupEnt):
+ self._mod_group(**groupEnt.ipadata())
+ def getUserShells(self):
+ return libuser.getUserShells()
+
+def ADMIN():
+ return IPAAdmin()
diff --git a/src/mainWindow.py b/src/mainWindow.py
index da21676..e3487b3 100755
--- a/src/mainWindow.py
+++ b/src/mainWindow.py
@@ -30,6 +30,7 @@ import rpm
import shutil
import libuser
+import ipa_libuser
from constants import uid_min, gid_min
@@ -143,7 +144,7 @@ class mainWindow:
self.delete_menu = xml.get_widget("delete_menu")
self.toplevel.connect("destroy", self.destroy)
- self.ADMIN = libuser.admin()
+ self.ADMIN = ipa_libuser.ADMIN()
self.userStore = gtk.ListStore(gobject.TYPE_STRING, gobject.TYPE_UINT64, gobject.TYPE_STRING,
gobject.TYPE_STRING, gobject.TYPE_STRING, gobject.TYPE_STRING,
--
devel mailing list
devel@xxxxxxxxxxxxxxxxxxxxxxx
https://admin.fedoraproject.org/mailman/listinfo/devel
Fedora Code of Conduct: http://fedoraproject.org/code-of-conduct