Hey guys, it's time for another release of the Func Users & Groups
management module.
The additions for 0.5 are mostly polish:
- register_method_args has been implemented.
- unit tests have been added to test_client.py for the methods contained
in the users module.
It seems to me like all the intended functionality now exists in a
working state in the current version of the module, and that it's about
ready for inclusion into the source tree. What do you guys think?
The relevant files are attached.
###############################################################################
#
# Func Users and Group management module.
# Author: Gregory Masseau <gjmasseau@xxxxxxxxxxxxxxxxxxx>
#
###############################################################################
#
# Legal:
#
# This software may be freely redistributed under the terms of the GNU
# general public license.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
#
###############################################################################
#
# Changelog:
#
# 0.5:
# - (Feature) Implemented register_method_args.
# - (External) Unit tests added to test_client.py.
#
# 0.4:
# - (Feature) Password alteration method added.
# - (Feature) Pluralized methods for all applicable singular methods.
# - (Misc) General API cleanup.
#
# 0.3:
# - (Feature) Methods added to create, delete, and modify groups.
# - (Feature) Methods added to create, delete, and modify users.
# - (Feature) Manage the memberships of users within groups
#
# 0.2:
# - (Feature) Most informational methods are complete and working for both
# users and groups at this point.
#
# 0.1:
# - (Feature) Initial release, supporting only some informational query
# messages regarding user accounts on the target system.
#
###############################################################################
"""
User management module for func.
"""
from func.minion.modules import func_module
from func.minion import sub_process
import pwd
import grp
from os import system
class UsersModule(func_module.FuncModule):
    """Func minion module for managing users and groups on the target system."""

    # Dotted version strings matching the 0.5 changelog entry in the file
    # header (the previous "0,5" used a comma by mistake).
    version = "0.5"
    api_version = "0.5"
    # Free-form status blurb for the module.
    description = "Nearly complete."
# INTERNALLY USED METHODS #####################################################
def __command(self, *args):
    """Run an external command and report whether it succeeded.

    This method is used internally by this module when invoking external
    commands. It should remain private.

    The first positional argument is the program to run and the rest are its
    arguments. Empty-string arguments are skipped and every remaining
    argument is converted with ``str()``. The command is executed directly
    from an argument vector (no shell), so characters such as ``;``, ``&&``,
    ``||`` or quotes in an argument are passed through verbatim instead of
    being interpreted.

    Returns True when the program exits with status 0, and False when it
    exits non-zero, cannot be started, or no program was given.
    """
    import os
    import subprocess

    argv = [str(elem) for elem in args if elem != '']
    if not argv:
        # Nothing to execute.
        return False
    # Output of the external tool is discarded, as before.
    devnull = open(os.devnull, 'w')
    try:
        status = subprocess.call(argv, stdout=devnull, stderr=devnull)
    except OSError:
        # Program missing or not executable.
        return False
    finally:
        devnull.close()
    return status == 0
def __plural(self, f):
    """Lift a one-argument callable into one that is mapped over a sequence.

    Returns a callable that takes a sequence ``xs`` and returns
    ``map(f, xs)``.
    """
    def mapped(xs):
        return map(f, xs)
    return mapped
# GROUPADD METHODS ############################################################
def __groupadd(self, group, *switches):
    """Run /usr/sbin/groupadd for `group` with the given switches.

    Returns False without running anything when the group already exists;
    otherwise returns the result of self.__command.
    """
    if not self.group_exists(group):
        return self.__command("/usr/sbin/groupadd", group, *switches)
    return False
def group_add(self, group, *gid):
    """Adds a group on the target system(s).

    `gid` is optional; only its first value is used. When a gid is given
    and it is already in use, nothing is created and False is returned.
    Otherwise returns the result of the underlying groupadd invocation.
    """
    if not gid:
        return self.__groupadd(group)
    requested = gid[0]
    if self.gid_exists(requested):
        return False
    # The stray debug print of the gid was removed: it wrote to stdout on
    # every call.
    return self.__groupadd(group, "-g", requested)
def groups_add(self, *groups):
    """Adds each of the named groups; returns one group_add result per name."""
    add_each = self.__plural(self.group_add)
    return add_each(groups)
def group_add_non_unique(self, group, *gid):
    """Adds a group on the target system(s), allowing a non-unique GID.

    `gid` is optional; only its first value is used. When a gid is given the
    group is created with ``-o -g <gid>`` even if that gid is already in
    use (that is the purpose of ``-o``). When no gid is given there is
    nothing to make non-unique, so a plain groupadd is performed; groupadd
    only accepts ``-o`` together with ``-g``.

    Returns the result of the underlying groupadd invocation (False when the
    group name already exists).
    """
    if gid:
        return self.__groupadd(group, "-o", "-g", gid[0])
    return self.__groupadd(group)
# GROUPDEL METHODS ############################################################
def __groupdel(self, group, *switches):
    """Run /usr/sbin/groupdel for `group` with the given switches.

    Returns False without running anything when the group does not exist;
    otherwise returns the result of self.__command.
    """
    if not self.group_exists(group):
        return False
    return self.__command("/usr/sbin/groupdel", group, *switches)
def group_del(self, group):
    """Deletes the named group on the target system(s); False if it is absent."""
    outcome = self.__groupdel(group)
    return outcome
def groups_del(self, *groups):
    """Deletes each of the named groups; returns one group_del result per name."""
    delete_each = self.__plural(self.group_del)
    return delete_each(groups)
# GROUPMOD METHODS ############################################################
def __groupmod(self, group, *switches):
    """Run /usr/sbin/groupmod for `group` with the given switches.

    Returns False without running anything when the group does not exist;
    otherwise returns the result of self.__command.
    """
    if not self.group_exists(group):
        return False
    # Unpacking an empty `switches` is the same as passing none at all.
    return self.__command("/usr/sbin/groupmod", group, *switches)
def group_set_gid_non_unique(self, group, gid):
    """Changes a group's GID via ``groupmod -o -g``, permitting a GID already in use."""
    switches = ("-o", "-g", gid)
    return self.__groupmod(group, *switches)
def group_set_gid(self, group, gid):
    """Changes a group's GID; returns False when the GID is already taken."""
    if not self.gid_exists(gid):
        return self.__groupmod(group, "-g", gid)
    return False
def group_set_groupname(self, group, groupname):
    """Renames `group` to `groupname`; returns False when the new name is taken."""
    if not self.group_exists(groupname):
        return self.__groupmod(group, "-n", groupname)
    return False
# USERADD METHODS #############################################################
def __useradd(self, user, *switches):
    """Run /usr/sbin/useradd for `user` with the given switches.

    Returns False without running anything when the user already exists;
    otherwise returns the result of self.__command. The switches are now
    forwarded to useradd (they were previously accepted but dropped),
    matching the sibling __groupadd/__userdel helpers.
    """
    if self.user_exists(user):
        return False
    return self.__command("/usr/sbin/useradd", user, *switches)
def user_add(self, user):
    """Adds the named user on the target system(s); False if it already exists."""
    outcome = self.__useradd(user)
    return outcome
def users_add(self, *users):
    """Adds each of the named users; returns one user_add result per name."""
    add_each = self.__plural(self.user_add)
    return add_each(users)
# USERDEL METHODS #############################################################
def __userdel(self, user, *switches):
    """Run /usr/sbin/userdel for `user` with the given switches.

    Returns False without running anything when the user does not exist;
    otherwise returns the result of self.__command.
    """
    if not self.user_exists(user):
        return False
    return self.__command("/usr/sbin/userdel", user, *switches)
def user_del(self, user, *options):
    """Deletes a user on the target system(s).

    Each option must be 'force' (passed to userdel as -f) or 'remove'
    (passed as -r). Any other option aborts with False before userdel runs.
    """
    switches = []
    for option in options:
        if option == 'force':
            flag = '-f'
        elif option == 'remove':
            flag = '-r'
        else:
            # Unknown option: refuse rather than guess.
            return False
        switches.append(flag)
    return self.__userdel(user, *switches)
def users_del(self, *users):
    """Deletes each of the named users; returns one user_del result per name."""
    delete_each = self.__plural(self.user_del)
    return delete_each(users)
# USERMOD METHODS #############################################################
def __usermod(self, user, *switches):
    """Run /usr/sbin/usermod with the given switches followed by `user`.

    Returns False without running anything when the user does not exist;
    otherwise returns the result of self.__command.
    """
    if not self.user_exists(user):
        return False
    # usermod is invoked as: usermod <switches...> <user>
    argv = list(switches) + [user]
    return self.__command("/usr/sbin/usermod", *argv)
def user_lock(self, user):
    """Locks a user account on the target system(s) via ``usermod -L``."""
    switches = ("-L",)
    return self.__usermod(user, *switches)
def users_lock(self, *users):
    """Locks each of the named accounts; returns one user_lock result per name."""
    lock_each = self.__plural(self.user_lock)
    return lock_each(users)
def user_set_shell(self, user, shell):
    """Sets the login shell of `user` via ``usermod -s``."""
    switches = ("-s", shell)
    return self.__usermod(user, *switches)
def users_set_shell(self, shell, *users):
    """Sets the same login shell for each named user; one result per user."""
    def set_for(user):
        return self.user_set_shell(user, shell)
    return self.__plural(set_for)(users)
def user_set_home(self, user, home):
    """Changes a user's home folder via ``usermod -d`` (contents are not moved)."""
    switches = ("-d", home)
    return self.__usermod(user, *switches)
def user_set_loginname(self, user, loginname):
    """Changes a user's login name via ``usermod -l``."""
    switches = ("-l", loginname)
    return self.__usermod(user, *switches)
def user_set_comment(self, user, comment):
    """Replaces a user's whole GECOS (comment) field via ``usermod -c``."""
    switches = ("-c", comment)
    return self.__usermod(user, *switches)
def user_set_expiredate(self, user, expiredate):
    """Sets the account expiry date of `user` via ``usermod -e``."""
    switches = ("-e", expiredate)
    return self.__usermod(user, *switches)
def users_set_expiredate(self, expiredate, *users):
    """Sets the same expiry date for each named user; one result per user."""
    def set_for(user):
        return self.user_set_expiredate(user, expiredate)
    return self.__plural(set_for)(users)
def user_set_uid_non_unique(self, user, uid):
    """Changes a user's UID via ``usermod -u <uid> -o``, permitting a UID already in use."""
    switches = ("-u", uid, "-o")
    return self.__usermod(user, *switches)
def user_set_uid(self, user, uid):
    """Changes a user's UID via ``usermod -u``."""
    switches = ("-u", uid)
    return self.__usermod(user, *switches)
def user_set_inactive(self, user, inactive):
    """Sets the inactivity timer of `user` via ``usermod -f``."""
    switches = ("-f", inactive)
    return self.__usermod(user, *switches)
def users_set_inactive(self, inactive, *users):
    """Sets the same inactivity timer for each named user; one result per user."""
    def set_for(user):
        return self.user_set_inactive(user, inactive)
    return self.__plural(set_for)(users)
def user_set_gid(self, user, gid):
    """Changes a user's primary group by GID; False when the GID does not exist."""
    if not self.gid_exists(gid):
        return False
    return self.__usermod(user, "-g", gid)
def users_set_gid(self, gid, *users):
    """Sets the same primary GID for each named user; one result per user."""
    def set_for(user):
        return self.user_set_gid(user, gid)
    return self.__plural(set_for)(users)
def user_move_home(self, user, home):
    """Changes a user's home folder and moves its contents (``usermod -d <home> -m``)."""
    switches = ("-d", home, "-m")
    return self.__usermod(user, *switches)
def user_unlock(self, user):
    """Unlocks a user account on the target system(s) via ``usermod -U``."""
    switches = ("-U",)
    return self.__usermod(user, *switches)
def users_unlock(self, *users):
    """Unlocks each of the named accounts; returns one user_unlock result per name."""
    unlock_each = self.__plural(self.user_unlock)
    return unlock_each(users)
def user_add_to_group(self, user, group):
    """Appends `user` to `group` (``usermod -aG``); False when the group does not exist."""
    if not self.group_exists(group):
        return False
    return self.__usermod(user, "-aG", group)
def users_add_to_group(self, group, *users):
    """Appends each named user to `group`; one result per user."""
    def add_one(user):
        return self.user_add_to_group(user, group)
    return self.__plural(add_one)(users)
def user_set_group(self, user, group):
    """Changes a user's primary group by group name.

    The name is resolved to its GID first; returns False when the group
    does not exist.
    """
    if not self.group_exists(group):
        return False
    return self.__usermod(user, "-g", self.group_to_gid(group))
def users_set_group(self, group, *users):
    """Sets the same primary group for each named user; one result per user."""
    def set_for(user):
        return self.user_set_group(user, group)
    return self.__plural(set_for)(users)
# PASSWD/CHPASSWD METHODS ####################################################
def passwd(self, user, passwd):
    """Changes a user's password on the target system(s).

    The new password is written to the standard input of
    ``passwd --stdin <user>`` rather than being spliced into a shell
    command line, so shell metacharacters in the password are neither
    interpreted nor exposed in the process list.

    Returns False when the user does not exist, when the passwd program
    cannot be started, or when it exits non-zero; True otherwise.
    """
    import subprocess

    if not self.user_exists(user):
        return False
    try:
        proc = subprocess.Popen(
            ["passwd", "--stdin", user],
            stdin=subprocess.PIPE,
            universal_newlines=True,
        )
    except OSError:
        return False
    # A trailing newline terminates the password, as `echo` did before.
    proc.communicate(passwd + "\n")
    return proc.returncode == 0
# INFORMATIONAL METHODS #######################################################
# EXISTENCE TEST METHODS
def user_exists(self, user):
    """Returns True when the password database has an entry named `user`, else False."""
    try:
        pwd.getpwnam(user)
    except KeyError:
        return False
    else:
        return True
def users_exist(self, *users):
    """Checks each named user; returns one user_exists result per name."""
    check_each = self.__plural(self.user_exists)
    return check_each(users)
def uid_exists(self, uid):
    """Returns True when the password database has an entry with this UID, else False.

    `uid` is converted with int() before the lookup.
    """
    try:
        pwd.getpwuid(int(uid))
    except KeyError:
        return False
    else:
        return True
def uids_exist(self, *uids):
    """Checks each UID; returns one uid_exists result per UID."""
    check_each = self.__plural(self.uid_exists)
    return check_each(uids)
def group_exists(self, group):
    """Returns True when the group database has an entry named `group`, else False."""
    try:
        grp.getgrnam(group)
    except KeyError:
        return False
    else:
        return True
def groups_exist(self, *groups):
    """Checks each named group; returns one group_exists result per name."""
    check_each = self.__plural(self.group_exists)
    return check_each(groups)
def gid_exists(self, gid):
    """Returns True when the group database has an entry with this GID, else False.

    `gid` is converted with int() before the lookup.
    """
    try:
        grp.getgrgid(int(gid))
    except KeyError:
        return False
    else:
        return True
def gids_exist(self, *gids):
    """Checks each GID; returns one gid_exists result per GID."""
    check_each = self.__plural(self.gid_exists)
    return check_each(gids)
# LISTING METHODS
def user_list(self):
    """Returns the login names of all entries in the password database."""
    return [entry[0] for entry in pwd.getpwall()]
def users_list(self):
    """Plural alias of user_list: lists all users on the target system(s)."""
    names = self.user_list()
    return names
def uid_list(self):
    """Returns the UIDs of all entries in the password database.

    UIDs of 4294967294 or above are reported as True instead of their
    numeric value.
    """
    limit = 4294967294
    result = []
    for entry in pwd.getpwall():
        uid = entry[2]
        if uid < limit:
            result.append(uid)
        else:
            result.append(True)
    return result
def uids_list(self):
    """Plural alias of uid_list: lists all UIDs on the target system(s)."""
    uids = self.uid_list()
    return uids
def group_list(self):
    """Returns the names of all entries in the group database."""
    return [entry[0] for entry in grp.getgrall()]
def groups_list(self):
    """Plural alias of group_list: lists all groups on the target system(s)."""
    names = self.group_list()
    return names
def gid_list(self):
    """Returns the GIDs of all entries in the group database.

    GIDs of 4294967294 or above are reported as True instead of their
    numeric value.
    """
    limit = 4294967294
    result = []
    for entry in grp.getgrall():
        gid = entry[2]
        if gid < limit:
            result.append(gid)
        else:
            result.append(True)
    return result
def gids_list(self):
    """Plural alias of gid_list: lists all GIDs on the target system(s)."""
    gids = self.gid_list()
    return gids
# INFO METHODS
def user_info(self, user):
    """Returns the password-database entry for `user` as a list, or False if absent."""
    try:
        entry = pwd.getpwnam(user)
    except KeyError:
        return False
    # The struct returned by pwd is converted to a plain list before being
    # handed back; the method is reported not to work otherwise.
    return list(entry)
def users_info(self, *users):
    """Returns a list of (user info or False), one per named user."""
    info_for_each = self.__plural(self.user_info)
    return info_for_each(users)
def uid_info(self, uid):
    """Returns the password-database entry for a UID as a list, or False if absent.

    `uid` is converted with int() before the lookup, so numeric strings are
    accepted just as they are by uid_exists and gid_info. (Previously the
    first lookup used the raw value and raised TypeError for a string.)
    """
    try:
        entry = pwd.getpwuid(int(uid))
    except KeyError:
        return False
    # Converted to a plain list, like user_info.
    return list(entry)
def uids_info(self, *uids):
    """Returns a list of (user info or False), one per UID."""
    info_for_each = self.__plural(self.uid_info)
    return info_for_each(uids)
def group_info(self, group):
    """Returns the group-database entry for `group` as a list, or False if absent."""
    try:
        entry = grp.getgrnam(group)
    except KeyError:
        return False
    # Converted from the grp struct to a plain list before returning.
    return list(entry)
def groups_info(self, *groups):
    """Returns a list of (group info or False), one per named group."""
    info_for_each = self.__plural(self.group_info)
    return info_for_each(groups)
def gid_info(self, gid):
    """Returns the group-database entry for a GID as a list, or False if absent.

    `gid` is converted with int() before the lookup.
    """
    try:
        entry = grp.getgrgid(int(gid))
    except KeyError:
        return False
    return list(entry)
def gids_info(self, *gids):
    """Returns a list of (group info or False), one per GID."""
    info_for_each = self.__plural(self.gid_info)
    return info_for_each(gids)
# INVENTORY METHODS
def user_inventory(self):
    """Returns user info for all users on the target system(s).

    Each password-database entry is converted to a plain list, the same
    conversion user_info applies to a single entry, instead of returning
    the raw pwd struct objects.
    """
    return [list(entry) for entry in pwd.getpwall()]
def users_inventory(self):
    """Plural alias of user_inventory: user info for all users.

    Delegates to the singular method; it previously called itself and
    recursed without end.
    """
    return self.user_inventory()
def group_inventory(self):
    """Returns group info for all groups on the target system(s).

    Each group-database entry is converted to a plain list, the same
    conversion group_info applies to a single entry, instead of returning
    the raw grp struct objects.
    """
    return [list(entry) for entry in grp.getgrall()]
def groups_inventory(self):
    """Plural alias of group_inventory: group info for all groups.

    Delegates to the singular method; it previously called itself and
    recursed without end.
    """
    return self.group_inventory()
# CONVERSION METHODS
def user_to_uid(self, user):
    """Takes a user name and returns the matching UID, or False if there is none."""
    try:
        return pwd.getpwnam(user)[2]
    except KeyError:
        return False
def users_to_uids(self, *users):
    """Converts each user name to its UID (or False); one result per name."""
    convert_each = self.__plural(self.user_to_uid)
    return convert_each(users)
def uid_to_user(self, uid):
    """Takes a UID and returns the matching user name, or False if there is none.

    `uid` is converted with int() before the lookup.
    """
    try:
        return pwd.getpwuid(int(uid))[0]
    except KeyError:
        return False
def uids_to_users(self, *uids):
    """Converts each UID to its user name (or False); one result per UID."""
    convert_each = self.__plural(self.uid_to_user)
    return convert_each(uids)
def group_to_gid(self, group):
    """Takes a group name and returns the matching GID, or False if there is none."""
    try:
        return grp.getgrnam(group)[2]
    except KeyError:
        return False
def groups_to_gids(self, *groups):
    """Converts each group name to its GID (or False); one result per name."""
    convert_each = self.__plural(self.group_to_gid)
    return convert_each(groups)
def gid_to_group(self, gid):
    """Takes a GID and returns the matching group name, or False if there is none.

    `gid` is converted with int() before the lookup.
    """
    try:
        return grp.getgrgid(int(gid))[0]
    except KeyError:
        return False
def gids_to_groups(self, *gids):
    """Takes a series of GIDs and converts it to a list of matching group names.

    Each element is the result of gid_to_group (a name, or False when the
    GID has no entry). The method previously referenced a non-existent
    ``gid_to_groups`` attribute and failed with AttributeError.
    """
    convert_each = self.__plural(self.gid_to_group)
    return convert_each(gids)
######
def register_method_args(self):
    """Describe the arguments of every public method of this module.

    Returns a dict keyed by method name. Each value is a dict with an
    'args' mapping (argument name -> descriptor with 'type', 'optional'
    and 'description' keys) and a human-readable 'description'.

    Changes relative to the previous hand-expanded table:
    - the "optional gid" descriptor is now actually flagged optional,
      matching the ``*gid`` signature of group_add/group_add_non_unique;
    - user_set_loginname, gids_exist and gids_list no longer carry
      descriptions copied from neighbouring entries;
    - the unused "optional uid" descriptor was dropped.
    """
    def arg(kind, description, optional=False):
        # One argument descriptor.
        return {'type': kind, 'optional': optional, 'description': description}

    def method(description, **args):
        # One method entry; keyword names are the method's argument names.
        return {'args': args, 'description': description}

    password = arg('string', 'A password.')
    cmdopt = arg('string', 'An option to the command.')
    cmdopts = arg('list*', 'An series of options to the command.')
    username = arg('string', 'A username.')
    usernames = arg('list*', 'A series of usernames.')
    group = arg('string', 'A group name.')
    groups = arg('list*', 'A series of group names.')
    gid = arg('int', 'A gid.')
    gids = arg('list*', 'A series of gids.')
    ogid = arg('list*', 'An optional gid.', optional=True)
    uid = arg('int', 'A uid.')
    uids = arg('list*', 'A series of uids.')
    shell = arg('string', "A path to a shell.")
    home = arg('string', "A directory.")

    return {
        # GROUPADD METHODS
        'group_add': method("Create a group.", group=group, gid=ogid),
        'groups_add': method("Create series of groups.", groups=groups),
        'group_add_non_unique': method("Create a group.", group=group, gid=ogid),
        # GROUPDEL METHODS
        'group_del': method("Delete a group.", group=group),
        'groups_del': method("Delete a series of groups.", groups=groups),
        # GROUPMOD METHODS
        'group_set_gid_non_unique': method(
            "Allows a groups gid to be non-unique.", group=group, gid=gid),
        'group_set_gid': method("Set a group's gid.", group=group, gid=gid),
        'group_set_groupname': method(
            "Set a group's groupname.", group=group, groupname=group),
        # USERADD METHODS
        'user_add': method("Create a user.", user=username),
        'users_add': method("Create series of users.", users=usernames),
        # USERDEL METHODS
        'user_del': method(
            "Delete a user's account.", user=username, options=cmdopts),
        'users_del': method(
            "Delete a series of users' accounts.", users=usernames),
        # USERMOD METHODS
        'user_lock': method("Lock a user's account.", user=username),
        'users_lock': method(
            "Lock a series of users' accounts.", users=usernames),
        'user_set_shell': method(
            "Set a user's shell.", user=username, shell=shell),
        'users_set_shell': method(
            "Set a series of users' shell.", users=usernames, shell=shell),
        'user_set_home': method(
            "Set a user's home folder.", user=username, home=home),
        'user_set_loginname': method(
            "Set a user's login name.", user=username, loginname=username),
        'user_set_comment': method(
            "Set a user's GECOS field.", user=username, comment=cmdopt),
        'user_set_expiredate': method(
            "Set a user's account's expiry date.",
            user=username, expiredate=cmdopt),
        'users_set_expiredate': method(
            "Set a series of users' accounts' expiry date.",
            expiredate=cmdopt, users=usernames),
        'user_set_uid_non_unique': method(
            "Set a user's uid.", user=username, uid=uid),
        'user_set_uid': method("Set a user's uid.", user=username, uid=uid),
        'user_set_inactive': method(
            "Set a user's inactivity timer.", user=username, inactive=cmdopt),
        'users_set_inactive': method(
            "Set a series of users' inactivity timer.",
            inactive=cmdopt, users=usernames),
        'user_set_gid': method("Set a user's gid.", user=username, gid=gid),
        'users_set_gid': method(
            "Set a series of users' gids.", gid=gid, users=usernames),
        'user_move_home': method(
            "Set a user's home folder and move the contents to the new folder.",
            user=username, home=home),
        'user_unlock': method("Unlock a user's account.", user=username),
        'users_unlock': method(
            "Unlock a series of users' account.", users=usernames),
        'user_add_to_group': method(
            "Append a user to a group.", group=group, user=username),
        'users_add_to_group': method(
            "Append a series of users to a group.",
            group=group, users=usernames),
        'user_set_group': method(
            "Set a user's group.", group=group, user=username),
        'users_set_group': method(
            "Set a series of users' group.", group=group, users=usernames),
        # PASSWD METHODS
        'passwd': method(
            "Change a user's password.", user=username, passwd=password),
        # EXISTANCE TEST METHODS
        'uid_exists': method('Test the existance of a uids.', uid=uid),
        'uids_exist': method(
            'Test the existance of a series of uids.', uids=uids),
        'gid_exists': method('Test the existance of a gids.', gid=gid),
        'gids_exist': method(
            'Test the existance of a series of gids.', gids=gids),
        'user_exists': method('Test the existance of a users.', user=username),
        'users_exist': method(
            'Test the existance of a series of users.', users=usernames),
        'group_exists': method('Test the existance of a groups.', group=group),
        'groups_exist': method(
            'Test the existance of a series of groups.', groups=groups),
        # LISTING METHODS
        'uid_list': method('Get a list of all uids.'),
        'uids_list': method('Get a list of all uids.'),
        'gid_list': method('Get a list of all gids.'),
        'gids_list': method('Get a list of all gids.'),
        'user_list': method('Get a list of all users.'),
        'users_list': method('Get a list of all users.'),
        'group_list': method('Get a list of all groups.'),
        'groups_list': method('Get a list of all groups.'),
        # INFO METHODS
        'user_info': method('Fetch info for a specified user.', user=username),
        'users_info': method(
            'Fetch info for a specified series of users.', users=usernames),
        'uid_info': method('Fetch info for a specified uid.', uid=uid),
        'uids_info': method(
            'Fetch info for a specified series of uids.', uids=uids),
        'group_info': method('Fetch info for a specified group.', group=group),
        'groups_info': method(
            'Fetch info for a specified series of groups.', groups=groups),
        'gid_info': method('Fetch info for a specified gid.', gid=gid),
        'gids_info': method(
            'Fetch info for a specified series of gids.', gids=gids),
        # INVENTORY METHODS
        'user_inventory': method('Get user info for all users.'),
        'users_inventory': method('Get user info for all users.'),
        'group_inventory': method('Get group info for all groups.'),
        'groups_inventory': method('Get group info for all groups.'),
        # CONVERSION METHODS
        'user_to_uid': method(
            'Convert a username to a matching uid.', user=username),
        'users_to_uids': method(
            'Convert a series of usernames to a list of matching uids.',
            users=usernames),
        'uid_to_user': method('Convert a uid to a username.', uid=uid),
        'uids_to_users': method(
            'Convert a series of uids to a list of matching usernames.',
            uids=uids),
        'group_to_gid': method(
            'Convert a group to a matching gid.', group=group),
        'groups_to_gids': method(
            'Converts a series of groups to a list of matching gids.',
            groups=groups),
        'gid_to_group': method(
            'Converts a gids to a matching groupname.', gid=gid),
        'gids_to_groups': method(
            'Converts a series of gids to a list of matching groupnames.',
            gids=gids),
    }
def ree(self):
    """Return the same argument table as register_method_args.

    This method used to carry its own hand-copied version of that table,
    which had drifted out of date: it lacked the 'passwd' entry, declared
    'groups' for group_to_gid, declared 'options' for users_del and
    omitted 'uid' for user_set_uid_non_unique. It now delegates to
    register_method_args so there is a single source of truth.
    """
    return self.register_method_args()
#!/usr/bin/python
## Copyright 2008, Various
## Adrian Likins <alikins@xxxxxxxxxx>
##
## This software may be freely redistributed under the terms of the GNU
## general public license.
##
import os
import socket
import unittest
import xmlrpclib
import func.overlord.client as fc
import func.utils
import socket
class BaseTest:
# assume we are talking to localhost
# th = socket.gethostname()
#th = socket.getfqdn()
th = 'liberia'
nforks=1
async=False
def __init__(self):
pass
def setUp(self):
self.overlord = fc.Overlord(self.th,
nforks=self.nforks,
async=self.async)
def test_module_version(self):
mod = getattr(self.overlord, self.module)
result = mod.module_version()
self.assert_on_fault(result)
def test_module_api_version(self):
mod = getattr(self.overlord, self.module)
result = mod.module_api_version()
self.assert_on_fault(result)
def test_module_description(self):
mod = getattr(self.overlord, self.module)
result = mod.module_description()
self.assert_on_fault(result)
def test_module_list_methods(self):
mod = getattr(self.overlord, self.module)
result = mod.list_methods()
self.assert_on_fault(result)
def test_module_get_method_args(self):
mod = getattr(self.overlord,self.module)
arg_result=mod.get_method_args()
self.assert_on_fault(arg_result)
def test_module_inventory(self):
mod = getattr(self.overlord, self.module)
result = mod.list_methods()
res = result[self.th]
# not all modules have an inventory, so check for it
# FIXME: not real happy about having to call list method for
# every module, but it works for now -akl
if "inventory" in res:
result = mod.inventory()
self.assert_on_fault(result)
# we do this all over the place...
def assert_on_fault(self, result):
assert func.utils.is_error(result[self.th]) == False
# assert type(result[self.th]) != xmlrpclib.Fault
def assert_on_no_fault(self, results):
assert func.utils.is_error(results[self.th]) == True
# attrs set so we can skip these via nosetest
test_module_version.intro = True
test_module_api_version.intro = True
test_module_description.intro = True
test_module_list_methods.intro = True
test_module_inventory.intro = True
test_module_get_method_args.intro = True
class TestTest(BaseTest):
    """Checks for the "test" module.

    Covers addition, sleep, the deliberately failing calls, echo round-trips
    for each marshalled value type, and the config accessors.
    """
    module = "test"

    def test_add(self):
        reply = self.overlord.test.add(1, 5)
        self.assert_on_fault(reply)
        assert reply[self.th] == 6

    def test_add_string(self):
        reply = self.overlord.test.add("foo", "bar")
        self.assert_on_fault(reply)
        assert reply[self.th] == "foobar"

    def test_sleep(self):
        self.assert_on_fault(self.overlord.test.sleep(1))

    def test_explode(self):
        # explode() is expected to come back flagged as an error
        reply = self.overlord.test.explode()
        print(reply)
        self.assert_on_no_fault(reply)

    def test_explode_no_string(self):
        reply = self.overlord.test.explode_no_string()
        print(reply)
        self.assert_on_no_fault(reply)

    def _echo_test(self, data):
        # Send `data` through echo() and require the same value back.
        reply = self.overlord.test.echo(data)
        self.assert_on_fault(reply)
        assert reply[self.th] == data

    # these tests are basically just to test the basic
    # marshalling/demarshalling bits
    def test_echo_int(self):
        self._echo_test(1)

    def test_echo_string(self):
        self._echo_test("caneatcheese")

    def test_echo_array(self):
        self._echo_test([1, 2, "three", "fore", "V"])

    def test_echo_hash(self):
        self._echo_test({"one": 1, "too": 2, "III": 3, "many": 8, "lots": 12312})

    def test_echo_bool_false(self):
        self._echo_test(False)

    def test_echo_bool_true(self):
        self._echo_test(True)

    def test_echo_float(self):
        self._echo_test(123.456)

    def test_echo_big_float(self):
        self._echo_test(123121232.23)

    def test_echo_bigger_float(self):
        self._echo_test(234234234234234234234.234234234234234)

    def test_echo_little_float(self):
        self._echo_test(0.000000000000000000000000000000037)

    def test_echo_binary(self):
        self._echo_test(xmlrpclib.Binary("348dshke354ts0d9urgk"))

    def test_echo_date(self):
        import datetime
        moment = datetime.datetime(1974, 1, 5, 11, 59, 0, 0, None)
        self._echo_test(xmlrpclib.DateTime(moment))

    def test_config(self):
        reply = self.overlord.test.configfoo()
        host_entry = reply[self.th]
        self.assert_on_fault(reply)

    def test_config_write(self):
        reply = self.overlord.test.config_save()
        host_entry = reply[self.th]
        self.assert_on_fault(reply)

    def test_config_set(self):
        reply = self.overlord.test.config_set("example", 5000)
        host_entry = reply[self.th]
        self.assert_on_fault(reply)

    def test_config_get_example(self):
        reply = self.overlord.test.config_get("example")
        host_entry = reply[self.th]
        self.assert_on_fault(reply)

    def _check_option_type(self, name, expected_type, echo=False):
        # Fetch config option `name` and require its exact type; the type
        # check deliberately runs before the fault check, as it always has.
        reply = self.overlord.test.config_get(name)
        value = reply[self.th]
        if echo:
            print(value)
        assert type(value) is expected_type
        self.assert_on_fault(reply)

    def test_config_get_string(self):
        self._check_option_type("string_option", str, echo=True)

    def test_config_get_int(self):
        self._check_option_type("int_option", int)

    def test_config_get_bool(self):
        self._check_option_type("bool_option", bool)

    def test_config_get_float(self):
        self._check_option_type("float_option", float)

    def test_config_get_test(self):
        self.assert_on_fault(self.overlord.test.config_get_test())
class TestCommand(BaseTest):
    """Runs shell commands through the command module and checks the reply."""
    module = "command"

    def test_echo(self):
        reply = self.overlord.command.run("echo -n foo")
        self.assert_on_fault(reply)
        # element 1 of the host's reply is compared with the echoed text
        output = reply[self.th][1]
        assert output == "foo"

    def test_rpm(self):
        # looking for some package that should be there, rh specific
        # ish at the moment
        reply = self.overlord.command.run("rpm -q filesystem")
        self.assert_on_fault(reply)
        package_name = reply[self.th][1].partition("-")[0]
        assert package_name == "filesystem"

    def test_env(self):
        extra_env = {'BLIPPYFOO': 'awesome'}
        reply = self.overlord.command.run("env", extra_env)
        self.assert_on_fault(reply)
        assert "BLIPPYFOO=awesome" in reply[self.th][1]

    # def test_sleep_long(self):
    #     result = self.overlord.command.run("sleep 256")
    #     self.assert_on_fault(result)

    def test_shell_globs(self):
        self.assert_on_fault(self.overlord.command.run("ls /etc/cron*"))

    def test_shell_pipe(self):
        reply = self.overlord.command.run("echo 'foobar' | grep foo")
        self.assert_on_fault(reply)
        assert reply[self.th][1] == "foobar\n"

    def test_shell_redirect(self):
        # have the target create a scratch file, then append to it and read
        # it back in one shell invocation
        scratch = self.overlord.command.run("mktemp")[self.th][1].strip()
        command = "echo foobar >> %s; cat %s" % (scratch, scratch)
        reply = self.overlord.command.run(command)
        self.assert_on_fault(reply)
        assert reply[self.th][1] == "foobar\n"

    def test_shell_multiple_commands(self):
        self.assert_on_fault(
            self.overlord.command.run("cal; date; uptime; ls;"))
class TestCopyfile(BaseTest):
    """Pushes a locally generated file through the copyfile module."""
    fn = "/tmp/func_test_file"
    dest_fn = "/tmp/func_test_file_dest"
    content = "this is a func test file"
    module = "copyfile"

    def create_a_file(self, size=1):
        # Write `content` repeated `size` times to the local source file.
        # try/finally guarantees the handle is closed even if the write fails.
        f = open(self.fn, "w")
        try:
            f.write(self.content * size)
        finally:
            f.close()

    def _read_source_file(self):
        # Return the full contents of the local source file, closing the
        # handle instead of leaving it to the garbage collector.
        f = open(self.fn, "r")
        try:
            return f.read()
        finally:
            f.close()

    # def test_local_copyfile(self):
    #     result = self.overlord.local.copyfile.send(self.fn, self.dest_fn)
    #     print result
    #     self.assert_on_fault(result)

    def test_copyfile(self, size=1):
        self.create_a_file(size=size)
        data = xmlrpclib.Binary(self._read_source_file())
        result = self.overlord.copyfile.copyfile(self.dest_fn, data)
        self.assert_on_fault(result)
        assert result[self.th] == 0

    # def test_copyfile_big(self):
    #     # make a file in the ~70 meg range
    #     self.test_copyfile(size=100)

    def test_checksum(self):
        self.create_a_file()
        data = xmlrpclib.Binary(self._read_source_file())
        # copy first so there is a destination file to checksum
        self.overlord.copyfile.copyfile(self.dest_fn, data)
        result = self.overlord.copyfile.checksum(self.dest_fn)
        self.assert_on_fault(result)
        assert result[self.th] == "b36a8040e44c16605d7784cdf1b3d9ed04ea7f55"
class TestHardware(BaseTest):
    """Smoke tests for the hardware module: each call must not fault."""
    module = "hardware"

    def test_inventory(self):
        self.assert_on_fault(self.overlord.hardware.inventory())

    def test_halinfo(self):
        self.assert_on_fault(self.overlord.hardware.hal_info())

    def test_info(self):
        self.assert_on_fault(self.overlord.hardware.info())

    def test_info_no_devices(self):
        # same query, with False passed as the single positional argument
        self.assert_on_fault(self.overlord.hardware.info(False))
class TestFileTracker(BaseTest):
    """Tracks and untracks single files, globs and lists via filetracker."""
    fn = "/etc/hosts"
    fn_glob = "/etc/init.d/*"
    fn_list = ["/etc/hosts", "/etc/func/minion.conf"]
    fn_glob_list = ["/etc/hosts", "/etc/func/*"]
    module = "filetracker"

    def test_track(self):
        result = self.overlord.filetracker.track(self.fn)
        assert result[self.th] == 1
        self.assert_on_fault(result)

    def test_track_glob(self):
        # track the glob pattern; tracking self.fn here would only repeat
        # test_track and leave glob tracking untested
        result = self.overlord.filetracker.track(self.fn_glob)
        assert result[self.th] == 1
        self.assert_on_fault(result)

    def test_untrack_glob(self):
        self.overlord.filetracker.track(self.fn_glob)
        result = self.overlord.filetracker.untrack(self.fn_glob)
        self.assert_on_fault(result)

    def test_track_fn_list(self):
        result = self.overlord.filetracker.track(self.fn_list)
        assert result[self.th] == 1
        self.assert_on_fault(result)

    def test_untrack_fn_list(self):
        self.overlord.filetracker.track(self.fn_list)
        result = self.overlord.filetracker.untrack(self.fn_list)
        self.assert_on_fault(result)

    def test_track_fn_glob_list(self):
        result = self.overlord.filetracker.track(self.fn_glob_list)
        assert result[self.th] == 1
        self.assert_on_fault(result)

    def test_untrack_fn_glob_list(self):
        self.overlord.filetracker.track(self.fn_glob_list)
        result = self.overlord.filetracker.untrack(self.fn_glob_list)
        self.assert_on_fault(result)

    def test_inventory(self):
        self.overlord.filetracker.track(self.fn)
        result = self.overlord.filetracker.inventory(False)
        self.assert_on_fault(result)
        assert self.fn in result[self.th][0]
        # assert result[self.th][0][3] == 0

    def test_untrack(self):
        self.overlord.filetracker.track(self.fn)
        result = self.overlord.filetracker.untrack(self.fn)
        self.assert_on_fault(result)
        result_inv = self.overlord.filetracker.inventory(False)
        tracked_files = result_inv[self.th]
        for entry in tracked_files:
            # Asserting a bare message string always passes; assert the
            # condition and attach the message so a leftover entry fails.
            assert entry[0] != self.fn, \
                "%s was not properly untracked" % self.fn
class TestMount(BaseTest):
    """Smoke test for the mount module."""
    module = "mount"

    def test_mount_list(self):
        self.assert_on_fault(self.overlord.mount.list())

    # INSERT some clever way to test mount here
class TestNetworkTest(BaseTest):
    """Checks for the networktest module (ping, netstat, traceroute, ...)."""
    module = "networktest"

    def test_ping(self):
        result = self.overlord.networktest.ping(self.th, "-c", "2")
        self.assert_on_fault(result)

    def test_ping_bad_arg(self):
        result = self.overlord.networktest.ping(self.th)
        # this should give us a FuncException, so actually require the
        # error instead of computing the flag and discarding it
        self.assert_on_no_fault(result)

    def test_netstat(self):
        result = self.overlord.networktest.netstat("-n")
        self.assert_on_fault(result)

    def test_traceroute(self):
        result = self.overlord.networktest.traceroute(self.th)
        self.assert_on_fault(result)

    def test_dig(self):
        result = self.overlord.networktest.dig("redhat.com")
        self.assert_on_fault(result)

    def test_isportopen_closed_port(self):
        result = self.overlord.networktest.isportopen(self.th, 34251)
        self.assert_on_fault(result)

    def test_isportopen_open_port(self):
        result = self.overlord.networktest.isportopen(self.th, 51234)
        self.assert_on_fault(result)
class TestProcess(BaseTest):
    """Smoke tests for the process module."""
    module = "process"

    def test_info(self):
        self.assert_on_fault(self.overlord.process.info())

    def test_mem(self):
        self.assert_on_fault(self.overlord.process.mem())

    # FIXME: how to test kill/pkill? start a process with
    # command and then kill it?
class TestService(BaseTest):
    """Read-only checks for the service module."""
    module = "service"

    def test_inventory(self):
        self.assert_on_fault(self.overlord.service.inventory())

    def test_get_enabled(self):
        self.assert_on_fault(self.overlord.service.get_enabled())

    def test_get_running(self):
        self.assert_on_fault(self.overlord.service.get_running())

    def test_get_status(self):
        # take the first field of the first get_running() entry and ask for
        # its status
        running = self.overlord.service.get_running()[self.th]
        first_entry = running[0]
        status = self.overlord.service.status(first_entry[0])
        self.assert_on_fault(status)

    # FIXME: whats a good way to test starting/stoping services without
    # doing bad things? -akl
class TestRpm(BaseTest):
    """Inventory and glob queries against the rpms module."""
    module = "rpms"

    def test_inventory(self):
        self.assert_on_fault(self.overlord.rpms.inventory())

    def test_glob(self):
        # if func is running, there should at least be python installed ;->
        matches = self.overlord.rpms.glob("python*", False)
        self.assert_on_fault(matches)

    def test_glob_flatten(self):
        matches = self.overlord.rpms.glob("python*", True)
        self.assert_on_fault(matches)

    def test_glob_nomatch(self):
        # shouldn't be any rpms called "-" ;->
        matches = self.overlord.rpms.glob("-*")
        self.assert_on_fault(matches)

    def test_glob_gpg_pubkey(self):
        # gpg-pubkey packages are weird rpm virtual packages, and tend to do
        # weird things, so try that too
        matches = self.overlord.rpms.glob("gpg-pubkey*")
        self.assert_on_fault(matches)

    def test_glob_gpg_pubkey_no_flat(self):
        # same virtual packages, with False as the second argument
        matches = self.overlord.rpms.glob("gpg-pubkey*", False)
        self.assert_on_fault(matches)

    def test_glob_match_all(self):
        matches = self.overlord.rpms.glob("*", False)
        self.assert_on_fault(matches)
class TestSmart(BaseTest):
    """Smoke test for the smart module."""
    module = "smart"

    def test_info(self):
        self.assert_on_fault(self.overlord.smart.info())
class TestSysctl(BaseTest):
    """Read-only checks for the sysctl module."""
    module = "sysctl"

    def test_list(self):
        self.assert_on_fault(self.overlord.sysctl.list())

    def test_get(self):
        setting = self.overlord.sysctl.get("kernel.max_lock_depth")
        self.assert_on_fault(setting)
class TestYum(BaseTest):
    """check_update queries against the yumcmd module."""
    module = "yumcmd"

    def test_check_update(self):
        self.assert_on_fault(self.overlord.yumcmd.check_update())

    def _assert_filter_matches_unfiltered(self, package_filter):
        # A filtered check_update must not fault and must equal the reply of
        # a second, unfiltered check_update.
        filtered = self.overlord.yumcmd.check_update(package_filter)
        self.assert_on_fault(filtered)
        unfiltered = self.overlord.yumcmd.check_update()
        assert filtered == unfiltered

    def test_check_update_empty_filter(self):
        self._assert_filter_matches_unfiltered([])

    def test_check_update_splat_filter(self):
        self._assert_filter_matches_unfiltered(['*'])

    # this fails on fc6, need to test on newer yum to see whats up
    # def test_update_non_existent_package(self):
    #     result = self.overlord.yumcmd.update("thisisapackage-_-that_should==never+exist234234234")
    #     self.assert_on_fault(result)
    # # hmm, that method always returns True... not much to test there... -akl
class TestIptables(BaseTest):
    """"Does it crash" checks for the iptables module."""
    module = "iptables"

    def test_dump(self):
        # at the moment, we dont set anything up
        # to verify, so this is just a basic
        # "does it crash" test
        self.overlord.iptables.dump()

    def test_policy(self):
        # likewise only checks that the call itself completes
        self.overlord.iptables.policy()
class TestIptablesPort(BaseTest):
    """Call-only check for the iptables.port sub-module."""
    module = "iptables.port"

    def test_inventory(self):
        # doesnt have an inventory, so er... -akl
        self.overlord.iptables.port.inventory()
class TestEchoTest(BaseTest):
    """Calls each typed run_* method of the echo module once."""
    module = "echo"

    def test_run_string(self):
        result = self.overlord.echo.run_string("heyman")
        self.assert_on_fault(result)

    def test_run_int(self):
        result = self.overlord.echo.run_int(12)
        self.assert_on_fault(result)

    def test_run_float(self):
        result = self.overlord.echo.run_float(12.0)
        self.assert_on_fault(result)

    def test_run_options(self):
        result = self.overlord.echo.run_options("hehehh")
        self.assert_on_fault(result)

    def test_run_list(self):
        result = self.overlord.echo.run_list(['one', 'two', 'three'])
        self.assert_on_fault(result)

    def test_run_hash(self):
        result = self.overlord.echo.run_hash({'one': 1, 'two': 2})
        self.assert_on_fault(result)

    def test_run_boolean(self):
        # exercise run_boolean itself; calling run_hash(True) here left the
        # boolean method untested
        result = self.overlord.echo.run_boolean(True)
        self.assert_on_fault(result)
class TestSystem(BaseTest):
    """Listing calls on the built-in system module."""
    module = "system"

    def test_list_methods(self):
        self.assert_on_fault(self.overlord.system.list_methods())

    def test_listMethods(self):
        self.assert_on_fault(self.overlord.system.listMethods())

    def test_list_modules(self):
        self.assert_on_fault(self.overlord.system.list_modules())

    # FIXME: we really should just implement these for the system stuff
    # as well
    # Until then the inherited introspection checks below are overridden
    # with no-ops.
    def test_module_version(self):
        pass

    def test_module_api_version(self):
        pass

    def test_module_description(self):
        pass

    def test_module_get_method_args(self):
        pass
#import time
#class TestAsyncTest(BaseTest):
# module = "async.test"
# nforks=1
# async=True
# def test_sleep_async(self):
# job_id = self.overlord.test.sleep(5)
# print "job_id", job_id
# time.sleep(5)
# (return_code, results) = self.overlord.job_status(job_id)
# print "return_code", return_code
# print "results", results
#
# def test_add_async(self):
# job_id = self.overlord.test.add(1,5)
# print "job_id", job_id
# time.sleep(6)
# (return_code, results) = self.overlord.job_status(job_id)
# print "return_code", return_code
# print "results", results
class TestUsers(BaseTest):
    """Tests for the users module: account and group creation, changes, queries.

    Each test creates the fixture accounts it needs on the target host and
    removes them again in a ``finally`` clause through ``__cleanup__``, so a
    failing assertion does not leave test accounts behind.

    Helper names start with an underscore so that they are not picked up as
    tests themselves.
    """
    import random as r

    module = "users"
    testuser1 = "fni_testuser1"
    testuser2 = "fni_testuser2"
    testgroup1 = "fni_testgroup1"
    testgroup2 = "fni_testgroup2"

    # ------------------------------------------------------------------
    # helpers
    # ------------------------------------------------------------------
    def __unused_gid__(self):
        # Draw random gids until one is found that the target does not use.
        # gid_exists() answers with a {host: bool} mapping, so the host's
        # entry must be inspected; the mapping itself is always truthy.
        while True:
            candidate = self.r.randint(2, 65535)
            if not self.overlord.users.gid_exists(candidate)[self.th]:
                return candidate

    def __unused_uid__(self):
        # Same as __unused_gid__, for user ids.
        while True:
            candidate = self.r.randint(2, 65535)
            if not self.overlord.users.uid_exists(candidate)[self.th]:
                return candidate

    def __uid_of__(self, user):
        # uid the target host reports for `user`.
        return self.overlord.users.user_to_uid(user)[self.th]

    def __gid_of__(self, group):
        # gid the target host reports for `group`.
        return self.overlord.users.group_to_gid(group)[self.th]

    def __make_test_user__(self):
        return self.overlord.users.user_add(TestUsers.testuser1)

    def __make_test_user2__(self):
        return self.overlord.users.user_add(TestUsers.testuser2)

    def __make_test_group__(self):
        return self.overlord.users.group_add(TestUsers.testgroup1)

    def __make_test_group2__(self):
        # returns the reply like the other fixture builders do
        return self.overlord.users.group_add(TestUsers.testgroup2)

    def __cleanup__(self):
        # Remove whichever fixture accounts currently exist on the target.
        # The *_exists replies are {host: bool} mappings; index by host so
        # the delete is only issued for accounts that are really there.
        for user in (TestUsers.testuser1, TestUsers.testuser2):
            if self.overlord.users.user_exists(user)[self.th]:
                self.overlord.users.user_del(user)
        for group in (TestUsers.testgroup1, TestUsers.testgroup2):
            if self.overlord.users.group_exists(group)[self.th]:
                self.overlord.users.group_del(group)

    # ------------------------------------------------------------------
    # creation / deletion
    # ------------------------------------------------------------------
    def test_user_add(self):
        try:
            result = self.__make_test_user__()
            self.assert_on_fault(result)
            assert result[self.th] == True
        finally:
            self.__cleanup__()

    def test_user_del(self):
        try:
            self.__make_test_user__()
            result = self.overlord.users.user_del(TestUsers.testuser1)
            self.assert_on_fault(result)
            assert result[self.th] == True
        finally:
            self.__cleanup__()

    def test_group_add(self):
        try:
            result = self.__make_test_group__()
            self.assert_on_fault(result)
            assert result[self.th] == True
        finally:
            self.__cleanup__()

    def test_group_del(self):
        try:
            self.__make_test_group__()
            result = self.overlord.users.group_del(TestUsers.testgroup1)
            self.assert_on_fault(result)
            assert result[self.th] == True
        finally:
            self.__cleanup__()

    def test_groups_add(self):
        try:
            result = self.overlord.users.groups_add(
                TestUsers.testgroup1, TestUsers.testgroup2)
            self.assert_on_fault(result)
            assert result[self.th] == [True, True]
        finally:
            self.__cleanup__()

    def test_users_add(self):
        try:
            result = self.overlord.users.users_add(
                TestUsers.testuser1, TestUsers.testuser2)
            self.assert_on_fault(result)
            assert result[self.th] == [True, True]
        finally:
            self.__cleanup__()

    def test_groups_del(self):
        try:
            self.__make_test_group__()
            self.__make_test_group2__()
            result = self.overlord.users.groups_del(
                TestUsers.testgroup1, TestUsers.testgroup2)
            self.assert_on_fault(result)
            assert result[self.th] == [True, True]
        finally:
            self.__cleanup__()

    def test_users_del(self):
        try:
            self.__make_test_user__()
            self.__make_test_user2__()
            result = self.overlord.users.users_del(
                TestUsers.testuser1, TestUsers.testuser2)
            self.assert_on_fault(result)
            assert result[self.th] == [True, True]
        finally:
            self.__cleanup__()

    # ------------------------------------------------------------------
    # group modification
    # ------------------------------------------------------------------
    def test_group_set_gid_non_unique(self):
        try:
            self.__make_test_group__()
            result = self.overlord.users.group_set_gid_non_unique(
                TestUsers.testgroup1, self.__unused_gid__())
            self.assert_on_fault(result)
            assert result[self.th] == True
        finally:
            self.__cleanup__()

    def test_group_set_gid(self):
        try:
            self.__make_test_group__()
            result = self.overlord.users.group_set_gid(
                TestUsers.testgroup1, self.__unused_gid__())
            self.assert_on_fault(result)
            assert result[self.th] == True
        finally:
            self.__cleanup__()

    def test_group_set_groupname(self):
        try:
            self.__make_test_group__()
            # renamed to testgroup2, which __cleanup__ also removes
            result = self.overlord.users.group_set_groupname(
                TestUsers.testgroup1, TestUsers.testgroup2)
            self.assert_on_fault(result)
            assert result[self.th] == True
        finally:
            self.__cleanup__()

    def test_group_add_non_unique(self):
        try:
            result = self.overlord.users.group_add_non_unique(
                TestUsers.testgroup1, self.__unused_gid__())
            self.assert_on_fault(result)
            assert result[self.th] == True
        finally:
            self.__cleanup__()

    # ------------------------------------------------------------------
    # user modification
    # ------------------------------------------------------------------
    def test_user_lock(self):
        try:
            self.__make_test_user__()
            result = self.overlord.users.user_lock(TestUsers.testuser1)
            self.assert_on_fault(result)
            assert result[self.th] == True
        finally:
            self.__cleanup__()

    def test_users_lock(self):
        try:
            self.__make_test_user__()
            self.__make_test_user2__()
            result = self.overlord.users.users_lock(
                TestUsers.testuser1, TestUsers.testuser2)
            self.assert_on_fault(result)
            assert result[self.th] == [True, True]
        finally:
            self.__cleanup__()

    def test_user_set_shell(self):
        try:
            self.__make_test_user__()
            result = self.overlord.users.user_set_shell(
                TestUsers.testuser1, "/bin/false")
            self.assert_on_fault(result)
            assert result[self.th] == True
        finally:
            self.__cleanup__()

    def test_users_set_shell(self):
        try:
            self.__make_test_user__()
            self.__make_test_user2__()
            result = self.overlord.users.users_set_shell(
                "/bin/false", TestUsers.testuser1, TestUsers.testuser2)
            self.assert_on_fault(result)
            assert result[self.th] == [True, True]
        finally:
            self.__cleanup__()

    def test_user_set_home(self):
        try:
            self.__make_test_user__()
            result = self.overlord.users.user_set_home(
                TestUsers.testuser1, "/home/" + TestUsers.testuser1)
            self.assert_on_fault(result)
            assert result[self.th] == True
        finally:
            self.__cleanup__()

    def test_user_set_loginname(self):
        try:
            self.__make_test_user__()
            # rename away and back again; the second reply is the one checked
            self.overlord.users.user_set_loginname(
                TestUsers.testuser1, TestUsers.testuser2)
            result = self.overlord.users.user_set_loginname(
                TestUsers.testuser2, TestUsers.testuser1)
            self.assert_on_fault(result)
            assert result[self.th] == True
        finally:
            self.__cleanup__()

    def test_user_set_comment(self):
        try:
            self.__make_test_user__()
            result = self.overlord.users.user_set_comment(
                TestUsers.testuser1, "zyxxyz")
            self.assert_on_fault(result)
            assert result[self.th] == True
        finally:
            self.__cleanup__()

    def test_user_set_expiredate(self):
        try:
            self.__make_test_user__()
            result = self.overlord.users.user_set_expiredate(
                TestUsers.testuser1, 999999)
            self.assert_on_fault(result)
            assert result[self.th] == True
        finally:
            self.__cleanup__()

    def test_users_set_expiredate(self):
        try:
            self.__make_test_user__()
            self.__make_test_user2__()
            # both fixture users, not the first one twice
            result = self.overlord.users.users_set_expiredate(
                999999, TestUsers.testuser1, TestUsers.testuser2)
            self.assert_on_fault(result)
            assert result[self.th] == [True, True]
        finally:
            self.__cleanup__()

    def test_user_set_uid_non_unique(self):
        try:
            self.__make_test_user__()
            result = self.overlord.users.user_set_uid_non_unique(
                TestUsers.testuser1, self.__unused_uid__())
            self.assert_on_fault(result)
            assert result[self.th] == True
        finally:
            self.__cleanup__()

    def test_user_set_uid(self):
        try:
            self.__make_test_user__()
            result = self.overlord.users.user_set_uid(
                TestUsers.testuser1, self.__unused_uid__())
            self.assert_on_fault(result)
            assert result[self.th] == True
        finally:
            self.__cleanup__()

    def test_user_set_inactive(self):
        try:
            self.__make_test_user__()
            result = self.overlord.users.user_set_inactive(
                TestUsers.testuser1, 999999)
            self.assert_on_fault(result)
            assert result[self.th] == True
        finally:
            self.__cleanup__()

    def test_users_set_inactive(self):
        try:
            self.__make_test_user__()
            self.__make_test_user2__()
            # both fixture users, not the first one twice
            result = self.overlord.users.users_set_inactive(
                999999, TestUsers.testuser1, TestUsers.testuser2)
            self.assert_on_fault(result)
            assert result[self.th] == [True, True]
        finally:
            self.__cleanup__()

    def test_user_move_home(self):
        try:
            self.__make_test_user__()
            result = self.overlord.users.user_move_home(
                TestUsers.testuser1, "/home/" + TestUsers.testuser1)
            self.assert_on_fault(result)
            assert result[self.th] == True
        finally:
            self.__cleanup__()

    def test_user_unlock(self):
        try:
            self.__make_test_user__()
            result = self.overlord.users.user_unlock(TestUsers.testuser1)
            self.assert_on_fault(result)
            assert result[self.th] == True
        finally:
            self.__cleanup__()

    def test_users_unlock(self):
        try:
            self.__make_test_user__()
            self.__make_test_user2__()
            result = self.overlord.users.users_unlock(
                TestUsers.testuser1, TestUsers.testuser2)
            self.assert_on_fault(result)
            assert result[self.th] == [True, True]
        finally:
            self.__cleanup__()

    # ------------------------------------------------------------------
    # group membership
    # ------------------------------------------------------------------
    def test_users_add_to_group(self):
        try:
            self.__make_test_user__()
            self.__make_test_user2__()
            self.__make_test_group__()
            result = self.overlord.users.users_add_to_group(
                TestUsers.testgroup1,
                TestUsers.testuser1, TestUsers.testuser2)
            self.assert_on_fault(result)
            assert result[self.th] == [True, True]
        finally:
            self.__cleanup__()

    def test_user_add_to_group(self):
        try:
            self.__make_test_user__()
            self.__make_test_group__()
            result = self.overlord.users.user_add_to_group(
                TestUsers.testuser1, TestUsers.testgroup1)
            self.assert_on_fault(result)
            assert result[self.th] == True
        finally:
            self.__cleanup__()

    def test_users_set_gid(self):
        try:
            self.__make_test_user__()
            self.__make_test_user2__()
            self.__make_test_group__()
            result = self.overlord.users.users_set_gid(
                self.__gid_of__(TestUsers.testgroup1),
                TestUsers.testuser1, TestUsers.testuser2)
            self.assert_on_fault(result)
            assert result[self.th] == [True, True]
        finally:
            self.__cleanup__()

    def test_user_set_gid(self):
        try:
            self.__make_test_user__()
            self.__make_test_group__()
            result = self.overlord.users.user_set_gid(
                TestUsers.testuser1, self.__gid_of__(TestUsers.testgroup1))
            self.assert_on_fault(result)
            assert result[self.th] == True
        finally:
            self.__cleanup__()

    def test_users_set_group(self):
        try:
            self.__make_test_user__()
            self.__make_test_user2__()
            self.__make_test_group__()
            result = self.overlord.users.users_set_group(
                TestUsers.testgroup1,
                TestUsers.testuser1, TestUsers.testuser2)
            self.assert_on_fault(result)
            assert result[self.th] == [True, True]
        finally:
            self.__cleanup__()

    def test_user_set_group(self):
        try:
            self.__make_test_user__()
            self.__make_test_group__()
            result = self.overlord.users.user_set_group(
                TestUsers.testuser1, TestUsers.testgroup1)
            self.assert_on_fault(result)
            assert result[self.th] == True
        finally:
            self.__cleanup__()

    def test_passwd(self):
        import string
        try:
            self.__make_test_user__()
            self.__make_test_group__()
            # throwaway 8-character password drawn without repetition from
            # digits, upper-case and lower-case ASCII letters
            alphabet = (string.digits + string.ascii_uppercase
                        + string.ascii_lowercase)
            password = "".join(self.r.sample(alphabet, 8))
            result = self.overlord.users.passwd(TestUsers.testuser1, password)
            self.assert_on_fault(result)
            assert result[self.th] == True
        finally:
            self.__cleanup__()

    # ------------------------------------------------------------------
    # existence tests
    # ------------------------------------------------------------------
    def test_user_exists(self):
        try:
            self.__make_test_user__()
            result = self.overlord.users.user_exists(TestUsers.testuser1)
            self.assert_on_fault(result)
            assert result[self.th] == True
        finally:
            self.__cleanup__()

    def test_uid_exists(self):
        try:
            self.__make_test_user__()
            result = self.overlord.users.uid_exists(
                self.__uid_of__(TestUsers.testuser1))
            self.assert_on_fault(result)
            assert result[self.th] == True
        finally:
            self.__cleanup__()

    def test_group_exists(self):
        try:
            self.__make_test_group__()
            result = self.overlord.users.group_exists(TestUsers.testgroup1)
            self.assert_on_fault(result)
            assert result[self.th] == True
        finally:
            self.__cleanup__()

    def test_gid_exists(self):
        try:
            self.__make_test_group__()
            result = self.overlord.users.gid_exists(
                self.__gid_of__(TestUsers.testgroup1))
            self.assert_on_fault(result)
            assert result[self.th] == True
        finally:
            self.__cleanup__()

    def test_users_exist(self):
        try:
            self.__make_test_user__()
            self.__make_test_user2__()
            result = self.overlord.users.users_exist(
                TestUsers.testuser1, TestUsers.testuser2)
            self.assert_on_fault(result)
            assert result[self.th] == [True, True]
        finally:
            self.__cleanup__()

    def test_uids_exist(self):
        try:
            self.__make_test_user__()
            self.__make_test_user2__()
            result = self.overlord.users.uids_exist(
                self.__uid_of__(TestUsers.testuser1),
                self.__uid_of__(TestUsers.testuser2))
            self.assert_on_fault(result)
            assert result[self.th] == [True, True]
        finally:
            self.__cleanup__()

    def test_groups_exist(self):
        try:
            self.__make_test_group__()
            self.__make_test_group2__()
            result = self.overlord.users.groups_exist(
                TestUsers.testgroup1, TestUsers.testgroup2)
            self.assert_on_fault(result)
            assert result[self.th] == [True, True]
        finally:
            self.__cleanup__()

    def test_gids_exist(self):
        try:
            self.__make_test_group__()
            # the second group must exist too before its gid is looked up
            self.__make_test_group2__()
            result = self.overlord.users.gids_exist(
                self.__gid_of__(TestUsers.testgroup1),
                self.__gid_of__(TestUsers.testgroup2))
            self.assert_on_fault(result)
            assert result[self.th] == [True, True]
        finally:
            self.__cleanup__()

    # ------------------------------------------------------------------
    # listing methods
    # ------------------------------------------------------------------
    def __trivial_test__(self, function):
        # Wrap a no-argument query in a "must not fault" check.
        def list_test():
            try:
                result = function()
                self.assert_on_fault(result)
            finally:
                self.__cleanup__()
        return list_test

    def test_user_list(self):
        self.__trivial_test__(self.overlord.users.user_list)()

    def test_users_list(self):
        self.__trivial_test__(self.overlord.users.users_list)()

    def test_group_list(self):
        self.__trivial_test__(self.overlord.users.group_list)()

    def test_groups_list(self):
        self.__trivial_test__(self.overlord.users.groups_list)()

    def test_uid_list(self):
        self.__trivial_test__(self.overlord.users.uid_list)()

    def test_uids_list(self):
        self.__trivial_test__(self.overlord.users.uids_list)()

    def test_gid_list(self):
        self.__trivial_test__(self.overlord.users.gid_list)()

    def test_gids_list(self):
        self.__trivial_test__(self.overlord.users.gids_list)()

    # ------------------------------------------------------------------
    # info methods
    # ------------------------------------------------------------------
    def __info_test__(self, function):
        # Wrap a single-target query in a fixture-creating "must not fault"
        # check.  The target may be a plain value or a zero-argument
        # callable; a callable is only invoked once the fixtures exist, so
        # id lookups see the freshly created accounts.
        def info_test(target):
            try:
                self.__make_test_user__()
                self.__make_test_user2__()
                self.__make_test_group__()
                self.__make_test_group2__()
                if callable(target):
                    target = target()
                result = function(target)
                self.assert_on_fault(result)
            finally:
                self.__cleanup__()
        return info_test

    def test_user_info(self):
        self.__info_test__(self.overlord.users.user_info)(TestUsers.testuser1)

    def test_group_info(self):
        self.__info_test__(self.overlord.users.group_info)(TestUsers.testgroup1)

    def test_uid_info(self):
        self.__info_test__(self.overlord.users.uid_info)(
            lambda: self.__uid_of__(TestUsers.testuser1))

    def test_gid_info(self):
        self.__info_test__(self.overlord.users.gid_info)(
            lambda: self.__gid_of__(TestUsers.testgroup1))

    def __plural_info_test__(self, function):
        # Multi-target variant of __info_test__; each target may again be a
        # plain value or a zero-argument callable resolved after the
        # fixtures have been created.
        def plural_info_test(*targets):
            try:
                self.__make_test_user__()
                self.__make_test_user2__()
                self.__make_test_group__()
                self.__make_test_group2__()
                resolved = []
                for target in targets:
                    if callable(target):
                        target = target()
                    resolved.append(target)
                result = function(*resolved)
                self.assert_on_fault(result)
            finally:
                self.__cleanup__()
        return plural_info_test

    def test_users_info(self):
        self.__plural_info_test__(self.overlord.users.users_info)(
            TestUsers.testuser1, TestUsers.testuser2)

    def test_groups_info(self):
        self.__plural_info_test__(self.overlord.users.groups_info)(
            TestUsers.testgroup1, TestUsers.testgroup2)

    def test_uids_info(self):
        self.__plural_info_test__(self.overlord.users.uids_info)(
            lambda: self.__uid_of__(TestUsers.testuser1),
            lambda: self.__uid_of__(TestUsers.testuser2))

    def test_gids_info(self):
        self.__plural_info_test__(self.overlord.users.gids_info)(
            lambda: self.__gid_of__(TestUsers.testgroup1),
            lambda: self.__gid_of__(TestUsers.testgroup2))

    # ------------------------------------------------------------------
    # conversion methods
    # ------------------------------------------------------------------
    def test_user_to_uid(self):
        self.__info_test__(self.overlord.users.user_to_uid)(TestUsers.testuser1)

    def test_group_to_gid(self):
        self.__info_test__(self.overlord.users.group_to_gid)(TestUsers.testgroup1)

    def test_users_to_uids(self):
        self.__plural_info_test__(self.overlord.users.users_to_uids)(
            TestUsers.testuser1, TestUsers.testuser2)

    def test_groups_to_gids(self):
        self.__plural_info_test__(self.overlord.users.groups_to_gids)(
            TestUsers.testgroup1, TestUsers.testgroup2)

    # The conversion checks used to be defined without the test_ prefix (one
    # of them also called a misspelled remote method), so they never ran.
    # The old attribute names are kept as aliases for backward compatibility.
    user_to_uid = test_user_to_uid
    group_to_guid = test_group_to_gid
    users_to_uids = test_users_to_uids
    groups_to_gids = test_groups_to_gids
_______________________________________________
Func-list mailing list
Func-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/func-list