Func Users Module v0.5

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Hey guys, it's time for another release of the Func Users & Groups management module.

The additions for 0.5 are mostly polish:

- register_method_args has been implemented.
- unit tests have been added to test_client.py for the methods contained in the users module.

It seems to me like all the intended functionality now exists in a working state in the current version of the module, and that it's about ready for inclusion into the source tree. What do you guys think?

The relevant files are attached.
###############################################################################
#
# Func Users and Group management module.
# Author: Gregory Masseau <gjmasseau@xxxxxxxxxxxxxxxxxxx>
#
###############################################################################
#
# Legal: 
#
# This software may be freely redistributed under the terms of the GNU
# general public license.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
#
###############################################################################
#
# Changelog:
#
# 0.5:
# - (Feature)  Implemented register_method_args.
# - (External) Unit tests added to test_client.py.
#
# 0.4:
# - (Feature)  Password alteration method added.
# - (Feature)  Pluralized methods for all applicable singular methods.
# - (Misc)     General API cleanup.
#
# 0.3:
# - (Feature)  Methods added to create, delete, and modify groups.
# - (Feature)  Methods added to create, delete, and modify users.
# - (Feature)  Manage the memberships of users within groups
#
# 0.2:
# - (Feature)  Most informational methods are complete and working for both 
#              users and groups at this point.
#
# 0.1:
# - (Feature)  Initial release, supporting only some informational query 
#              messages regarding user accounts on the target system.
#
###############################################################################
"""
User management module for func.
"""

from func.minion.modules import func_module
from func.minion import sub_process
import pwd
import grp
from os import system

class UsersModule(func_module.FuncModule):
  version     = "0,5"
  api_version = "0,5"
  description = "Nearly complete."

# INTERNALLY USED METHODS #####################################################
  def __command(self,*list):
    """
    This method is used internally by this module when invoking external commands. It should remain private.
    This method should probably be improved to check the elems for suspicious characters like ';','&&','||'.
    """
    cmd = ''
    for elem in list:
      if elem == '':
        pass
      else:
        cmd = cmd + " '" + str(elem.replace("'","") if (type(elem) == str) else elem) + "'"
#    print "\nCmd: [%s]"%cmd
    return False if system(cmd+" 2>/dev/null 1>/dev/null") else True

  def __plural(self,f):
    return (lambda xs: map(f,xs))

# GROUPADD METHODS ############################################################
  def __groupadd(self,group,*switches):
    """Constructs the proper argument sequence for self.__command and returns it's result."""
    if self.group_exists(group):
      return False
    else:
      return self.__command("/usr/sbin/groupadd",group,*switches)

  def group_add(self,group,*gid):
     """Adds a group on the target system(s)."""
     if gid:
       if self.gid_exists(gid[0]):
         return False
       else:
         print str(gid[0]) + "<-"
         return self.__groupadd(group,"-g",gid[0])
     else:
       return self.__groupadd(group)

  def groups_add(self,*groups):
    """Adds a series of groups on the target system(s)."""
    return self.__plural(self.group_add)(groups)

  def group_add_non_unique(self,group,*gid):
    """Adds a group on the target system(s)."""
    if gid:
      if self.gid_exists(gid[0]):
        return False
      else:
        return self.__groupadd(group,"-o","-g",gid[0])
    else:
      return self.__groupadd(group,"-o")

# GROUPDEL METHODS ############################################################
  def __groupdel(self,group,*switches):
    """Constructs the proper argument sequence for self.__command and returns it's result."""
    if self.group_exists(group):
      return self.__command("/usr/sbin/groupdel",group,*switches)
    else:
      return False

  def group_del(self,group):
    """Deletes a group on the target system(s)."""
    return self.__groupdel(group)
 
  def groups_del(self,*groups):
    """Adds a series of groups."""
    return self.__plural(self.group_del)(groups)

# GROUPMOD METHODS ############################################################
  def __groupmod(self,group,*switches):
    """Constructs the proper argument sequence for self.__command and returns it's result."""
    if self.group_exists(group):
      if switches:
        return self.__command("/usr/sbin/groupmod",group,*switches)
      else:
        return self.__command("/usr/sbin/groupmod",group)
    else:
      return False

  def group_set_gid_non_unique(self,group,gid):
    """Changes the GID of the specified group on the target system(s), allowing non-unique GID."""
    return self.__groupmod(group,"-o","-g",gid)

  def group_set_gid(self,group,gid):
    """Changes the GID of the specified group on the target system(s)."""
    if self.gid_exists(gid):
      return False
    else:
      return self.__groupmod(group,"-g",gid)

  def group_set_groupname(self,group,groupname):
    """Changes the name of the specified group on the target system(s)."""
    if self.group_exists(groupname):
      return False
    else:
      return self.__groupmod(group,"-n",groupname)

# USERADD METHODS #############################################################
  def __useradd(self,user,*switches):
    """Constructs the proper argument sequence for self.__command and returns it's result."""
    if self.user_exists(user):
      return False
    else:
      return self.__command("/usr/sbin/useradd",user)

  def user_add(self,user):
    """Adds a user on the target system(s)."""
    return self.__useradd(user) 

  def users_add(self,*users):
    """Adds a series of users on the target system(s)."""
    return self.__plural(self.user_add)(users)

# USERDEL METHODS #############################################################
  def __userdel(self,user,*switches):
    """Constructs the proper argument sequence for self.__command and returns it's result."""
    if self.user_exists(user):
      return self.__command("/usr/sbin/userdel",user,*switches)
    else:
      return False

  def user_del(self,user,*options):
    """Deletes a user on the target system(s)."""
    switches=[]
    if options:
      for option in options:
        if   option == 'force':
          switches.append('-f')
        elif option == 'remove':
          switches.append('-r')
        else: 
          return False
    return self.__userdel(user,*switches) 

  def users_del(self,*users):
    """Deletes a series of users on the target system(s)."""
    return self.__plural(self.user_del)(users)

# USERMOD METHODS #############################################################
  def __usermod(self,user,*switches):
    """Constructs the proper argument sequence for self.__command and returns it's result."""
    if self.user_exists(user):
      command = []
      if switches:
        command = list(switches)
      command.append(user)
      return self.__command("/usr/sbin/usermod",*command)
    else:
      return False

  def user_lock(self,user):
    """Locks a user account on the target system(s)."""
    return self.__usermod(user,"-L")

  def users_lock(self,*users):
    """Locks a series of user accounts on the target system(s)."""
    return self.__plural(self.user_lock)(users)

  def user_set_shell(self,user,shell):
    """Set a specified user's shell on the target system(s)."""
    return self.__usermod(user,"-s",shell)

  def users_set_shell(self,shell,*users):
    """Set a specified list of users' shell on the target system(s)."""
    return self.__plural(lambda u: self.user_set_shell(u,shell))(users)

  def user_set_home(self,user,home):
    """Change (but don't move the contents of) a user's home folder on the target system(s)."""
    return self.__usermod(user,"-d",home)

  def user_set_loginname(self,user,loginname):
    """Change a user's login name on the target system(s)."""
    return self.__usermod(user,"-l",loginname)

  def user_set_comment(self,user,comment):
    """Change the value of a user's GECOS field -- maybe replace this with a field sensitive version?"""
    return self.__usermod(user,"-c",comment)

  def user_set_expiredate(self,user,expiredate):
    """Set the expity date for a specified user on the target system(s)."""
    return self.__usermod(user,"-e",expiredate)

  def users_set_expiredate(self,expiredate,*users):
    """Set a specified list of users' expiry date on the target system(s)."""
    return self.__plural(lambda u: self.user_set_expiredate(u,expiredate))(users)

  def user_set_uid_non_unique(self,user,uid):
    """Change a user's UID, allowing non-unique UIDs on the target system(s)."""
    return self.__usermod(user,"-u",uid,"-o")

  def user_set_uid(self,user,uid):
    """Change a user's UID on the target system(s)."""
    return self.__usermod(user,"-u",uid)

  def user_set_inactive(self,user,inactive):
    """Set the inactivity timer on a user on the target system(s)."""
    return self.__usermod(user,"-f",inactive)

  def users_set_inactive(self,inactive,*users):
    """Set the inactivity timer on a series of users on the target system(s)."""
    return self.__plural(lambda u: self.user_set_inactive(u,inactive))(users)

  def user_set_gid(self,user,gid):
    """Change a users primary group by GID on the target system(s)."""
    if self.gid_exists(gid):
      return self.__usermod(user,"-g",gid)
    else:
      return False

  def users_set_gid(self,gid,*users):
    """Set a specified list of users' primary GID on the target system(s)."""
    return self.__plural(lambda u: self.user_set_gid(u,gid))(users)

  def user_move_home(self,user,home):
    """Changes and moves a users home folder on the target system(s)."""
    return self.__usermod(user,"-d",home,"-m")

  def user_unlock(self,user):
    """Unlocks a specified user account on the target system(s)."""
    return self.__usermod(user,"-U")

  def users_unlock(self,*users):
    """Unlocks a specified list of users' accounts on the target system(s)."""
    return self.__plural(self.user_unlock)(users)

  def user_add_to_group(self,user,group):
    """Appends the user to a specified group on the target system(s)."""
    if self.group_exists(group):
      return self.__usermod(user,"-aG",group)
    else:
      return False

  def users_add_to_group(self,group,*users):
    """Appends the list of users to a specified group on the target system(s)."""
    return self.__plural(lambda u: self.user_add_to_group(u,group))(users)

  def user_set_group(self,user,group):
    """Changes a users primary group on the target system(s)."""
    if self.group_exists(group):
      gid = self.group_to_gid(group)
      return self.__usermod(user,"-g",gid)
    else:
      return False

  def users_set_group(self,group,*users):
    """Changes a series of users' primary group on the target system(s)."""
    return self.__plural(lambda u: self.user_set_group(u,group))(users)

# PASSWD/CHPASWD METHODS  #####################################################
  def passwd(self,user,passwd):
    """Changes a user's password on the target system(s)."""
    if self.user_exists(user):
      if system("echo "+passwd+" | passwd --stdin "+user):
        return False
      else:
        return True
    else:
      return False

# INFORMATIONAL METHODS #######################################################
# EXISTANCE TEST METHODS
  def user_exists(self,user):
    """Checks to see if a given user exists on the target system(s)."""
    try:
      if pwd.getpwnam(user):
        return True
    except KeyError:
      return False

  def users_exist(self,*users):
    """Checks to see if a series of users exists on the target system(s)."""
    return self.__plural(self.user_exists)(users)

  def uid_exists(self,uid):
    """Checks to see if a given UID exists on the target system(s)."""
    try:
      if pwd.getpwuid(int(uid)):
        return True
    except KeyError:
      return False

  def uids_exist(self,*uids):
    """Checks to see if a series of UIDs exists on the target system(s)."""
    return self.__plural(self.uid_exists)(uids)

  def group_exists(self,group):
    """Checks to see if a given group exists on the target system(s)."""
    try:
      if grp.getgrnam(group):
        return True
    except KeyError:
      return False

  def groups_exist(self,*groups):
    """Checks to see if a series of groups exist on the target system(s)."""
    return self.__plural(self.group_exists)(groups)

  def gid_exists(self,gid):
    """Checks to see if a given GID exists on the target system(s)."""
    try:
      if grp.getgrgid(int(gid)):
        return True
    except KeyError:
      return False

  def gids_exist(self,*gids):
    """Checks to see if a series of GIDs exist on the target system(s)."""
    return self.__plural(self.gid_exists)(gids)

# LISTING METHODS
  def user_list(self):
    """Lists all users on the target system(s)."""
    users = []
    for user in pwd.getpwall():
      users.append(user[0])
    return users

  def users_list(self):
    """Lists all users on the target system(s)."""
    return self.user_list()

  def uid_list(self):
    """Lists all UIDs on the target system(s)."""
    uids = []
    for user in pwd.getpwall():
      uids.append(user[2] if user[2] < 4294967294 else True)
    return uids

  def uids_list(self):
    """Lists all UIDs on the target system(s)."""
    return self.uid_list()

  def group_list(self):
    """Lists all groups on the target system(s)."""
    groups = []
    for group in grp.getgrall():
      groups.append(group[0])
    return groups

  def groups_list(self):
    """Lists all groups on the target system(s)."""
    return self.group_list()

  def gid_list(self):
    """Lists all GIDs on the target system(s)."""
    gids = []
    for group in grp.getgrall():
      gids.append(group[2] if group[2] < 4294967294 else True)
    return gids

  def gids_list(self):
    """Lists all GIDs on the target system(s)."""
    return self.gid_list()

# INFO METHODS
  def user_info(self,user):
    """Returns user info or false for a specified user on the target system(s)."""
    try:
      if pwd.getpwnam(user):
         info = pwd.getpwnam(user)
         return list(info) # I'm not sure why this has to be listed but the method fails to work if it isn't.
    except KeyError:
      return False

  def users_info(self,*users):
    """Returns a list of (group info or False) for a series of users on the target system(s)."""
    return self.__plural(self.user_info)(users)

  def uid_info(self,uid):
    """Returns user info or false for a specified user (by UID) on the target system(s)."""
    try:
      if pwd.getpwuid(uid):
        info = pwd.getpwuid(int(uid))
        return list(info)
    except KeyError:
      return False

  def uids_info(self,*uids):
    """Returns a list (group info or False) for a series of users (by UID) on the target system(s)."""
    return self.__plural(self.uid_info)(uids)

  def group_info(self,group):
    """Returns group info or false for a specified group on the target system(s)."""
    try:
      if grp.getgrnam(group):
        info = grp.getgrnam(group)
        return list(info) #for some reason this needs to be list-ed
    except KeyError:
      return False

  def groups_info(self,*groups):
    """Returns a list (group info or False) for a series of groups on the target system(s)."""
    return self.__plural(self.group_info)(groups)

  def gid_info(self,gid):
    """Returns group info or false for a specified group (by GID) on the target system(s)."""
    try:
      if grp.getgrgid(int(gid)):
        info = grp.getgrgid(int(gid))
        return list(info)
    except KeyError:
      return False

  def gids_info(self,*gids):
    """Returns a list (group info or False) for a series of groups (by GID) on the target system(s)."""
    return self.__plural(self.gid_info)(gids)

# INVENTORY METHODS
  def user_inventory(self):
    """Returns user info for all users on the target system(s)."""
    return pwd.getpwall()

  def users_inventory(self):
    """Returns user info for all users on the target system(s)."""
    return self.users_inventory()

  def group_inventory(self):
    """Returns group info for all users on the target system(s)."""
    return grp.getgrall()

  def groups_inventory(self):
    """Returns group info for all users on the target system(s)."""
    return self.groups_inventory()

# CONVERSION METHODS
  def user_to_uid(self,user):
    """Takes a user name and converts it to the matching UID."""
    try:
      username = pwd.getpwnam(user)[2]
      return username
    except KeyError:
      return False

  def users_to_uids(self,*users):
    """Takes a series of usernames and converts it to a list of matching UIDs."""
    return self.__plural(self.user_to_uid)(users)

  def uid_to_user(self,uid):
    """Takes a UID and converts it to the matching user name."""
    try:
      user = pwd.getpwuid(int(uid))[0]
      return user
    except KeyError:
      return False

  def uids_to_users(self,*uids):
    """Takes a series of UIDs and converts it to a list of matching user names."""
    return self.__plural(self.uid_to_user)(uids)

  def group_to_gid(self,group):
    """Takes a group name and converts it to the matching GID."""
    try:
      groupname = grp.getgrnam(group)[2]
      return groupname
    except KeyError:
      return False

  def groups_to_gids(self,*groups):
    """Takes a series of group names and converts it to a list of matching GIDs."""
    return self.__plural(self.group_to_gid)(groups)

  def gid_to_group(self,gid):
    """Takes a GID and converts it to the matching group name."""
    try:
      group = grp.getgrgid(int(gid))[0]
      return group
    except KeyError:
      return False

  def gids_to_groups(self,*gids):
    """Takes a series of GIDs and converts it to a list of matching group names."""
    return self.__plural(self.gid_to_groups)(gids)

######

  def register_method_args(self):
    password = {
      'type':'string',
      'optional':False,
      'description':'A password.'
    }
    cmdopt = {
      'type':'string',
      'optional':False,
      'description':'An option to the command.'
    }
    cmdopts = {
      'type':'list*',
      'optional':False,
      'description':'An series of options to the command.'
    }
    username = {
      'type':'string',
      'optional':False,
      'description':'A username.',
    }
    usernames = {
      'type':'list*',
      'optional':False,
      'description':'A series of usernames.',
    }
    group = {
      'type':'string',
      'optional':False,
      'description':'A group name.'
    }
    groups = {
      'type':'list*',
      'optional':False,
      'description':'A series of group names.'
    }
    gid = {
      'type':'int',
      'optional':False,
      'description':'A gid.'
    }
    gids = {
      'type':'list*',
      'optional':False,
      'description':'A series of gids.'
    }
    ogid = {
      'type':'list*',
      'optional':False,
      'description':'An optional gid.'
    }
    ouid = {
      'type':'list*',
      'optional':False,
      'description':'An optional uid.'
    }
    uid = {
      'type':'int',
      'optional':False,
      'description':'A uid.'
    }
    uids = {
      'type':'list*',
      'optional':False,
      'description':'A series of uids.'
    }

    return {
      #GROUPADD METHODS
      'group_add':{
        'args':{
          'group':group,
          'gid':ogid
        },
      'description':"Create a group."
      },

      'groups_add':{
        'args':{
          'groups':groups
        },
      'description':"Create series of groups."
      },

      'group_add_non_unique':{
        'args':{
          'group':group,
          'gid':ogid
        },
      'description':"Create a group."
      },

      #GROUPDEL METHODS
      'group_del':{
        'args':{
          'group':group
        },
      'description':"Delete a group."
      },

      'groups_del':{
        'args':{
          'groups':groups
        },
      'description':"Delete a series of groups."
      },

      #GROUPMOD METHODS
      'group_set_gid_non_unique':{
        'args':{
          'group':group,
          'gid':gid
        },
      'description':"Allows a groups gid to be non-unique."
      },

      'group_set_gid':{
        'args':{
          'group':group,
          'gid':gid
        },
      'description':"Set a group's gid."
      },

      'group_set_groupname':{
        'args':{
          'group':group,
          'groupname':group
        },
      'description':"Set a group's groupname."
      },

      #USERADD METHODS
      'user_add':{
        'args':{
          'user':username
        },
      'description':"Create a user."
      },

      'users_add':{
        'args':{
          'users':usernames
        },
      'description':"Create series of users."
      },

      #USERDEL METHODS
      'user_del':{
        'args':{
          'user':username,
          'options':cmdopts,
        },
      'description':"Delete a user's account."
      },

      'users_del':{
        'args':{
          'users':usernames,
        },
      'description':"Delete a series of users' accounts."
      },

      #USERMOD METHODS
      'user_lock':{
        'args':{
          'user':username,
        },
      'description':"Lock a user's account."
      },

      'users_lock':{
        'args':{
          'users':usernames,
        },
      'description':"Lock a series of users' accounts."
      },

      'user_set_shell':{
        'args':{
          'user':username,
          'shell':{
             'type':'string',
              'optional':False,
              'description':"A path to a shell."
          }
        },
      'description':"Set a user's shell."
      },

      'users_set_shell':{
        'args':{
          'users':usernames,
          'shell':{
             'type':'string',
              'optional':False,
              'description':"A path to a shell."
          }
        },
      'description':"Set a series of users' shell."
      },

      'user_set_home':{
        'args':{
          'user':username,
          'home':{
             'type':'string',
              'optional':False,
              'description':"A directory."
          }
        },
      'description':"Set a user's home folder."
      },

      'user_set_loginname':{
        'args':{
          'user':username,
          'loginname':username
        },
      'description':"Set a user's GECOS field."
      },

      'user_set_comment':{
        'args':{
          'user':username,
          'comment':cmdopt
        },
      'description':"Set a user's GECOS field."
      },

      'user_set_expiredate':{
        'args':{
          'user':username,
          'expiredate':cmdopt
        },
      'description':"Set a user's account's expiry date."
      },

      'users_set_expiredate':{
        'args':{
          'expiredate':cmdopt,
          'users':usernames
        },
      'description':"Set a series of users' accounts' expiry date."
      },

      'user_set_uid_non_unique':{
        'args':{
          'user':username,
          'uid':uid
        },
      'description':"Set a user's uid."
      },

      'user_set_uid':{
        'args':{
          'user':username,
          'uid':uid
        },
      'description':"Set a user's uid."
      },

      'user_set_inactive':{
        'args':{
          'user':username,
          'inactive':cmdopt
        },
      'description':"Set a user's inactivity timer."
      },

      'users_set_inactive':{
        'args':{
          'inactive':cmdopt,
          'users':usernames
        },
      'description':"Set a series of users' inactivity timer."
      },

      'user_set_gid':{
        'args':{
          'user':username,
          'gid':gid
        },
      'description':"Set a user's gid."
      },

      'users_set_gid':{
        'args':{
          'gid':gid,
          'users':usernames
        },
      'description':"Set a series of users' gids."
      },

      'user_move_home':{
        'args':{
          'user':username,
          'home':{
             'type':'string',
              'optional':False,
              'description':"A directory."
          }
        },
      'description':"Set a user's home folder and move the contents to the new folder."
      },

      'user_unlock':{
        'args':{
          'user':username,
        },
      'description':"Unlock a user's account."
      },

      'users_unlock':{
        'args':{
          'users':usernames,
        },
      'description':"Unlock a series of users' account."
      },

      'user_add_to_group':{
        'args':{
          'group':group,
          'user':username,
        },
      'description':"Append a user to a group."
      },

      'users_add_to_group':{
        'args':{
          'group':group,
          'users':usernames,
        },
      'description':"Append a series of users to a group."
      },

      'user_set_group':{
        'args':{
          'group':group,
          'user':username,
        },
      'description':"Set a user's group."
      },

      'users_set_group':{
        'args':{
          'group':group,
          'users':usernames,
        },
      'description':"Set a series of users' group."
      },

      #PASSWD METHODS
      'passwd':{
        'args':{
          'user':username,
          'passwd':password
        },
      'description':"Change a user's password."
      },

      #EXISTANCE TEST METHODS
      'uid_exists':{
        'args':{
          'uid':uid
        },
      'description':'Test the existance of a uids.'
      },

      'uids_exist':{
        'args':{
          'uids':uids
        },
      'description':'Test the existance of a series of uids.'
      },

      'gid_exists':{
        'args':{
          'gid':gid
        },
      'description':'Test the existance of a gids.'
      },

      'gids_exist':{
        'args':{
          'gids':gids
        },
      'description':'Test the existance of a series of groups.'
      },

      'user_exists':{
        'args':{
          'user':username
        },
      'description':'Test the existance of a users.'
      },

      'users_exist':{
        'args':{
          'users':usernames
        },
      'description':'Test the existance of a series of users.'
      },

      'group_exists':{
        'args':{
          'group':group
        },
      'description':'Test the existance of a groups.'
      },

      'groups_exist':{
        'args':{
          'groups':groups
        },
      'description':'Test the existance of a series of groups.'
      },

      #LISTING METHODS
      'uid_list':{
        'args':{},
      'description':'Get a list of all uids.'
      },

      'uids_list':{
        'args':{},
      'description':'Get a list of all uids.'
      },

      'gid_list':{
        'args':{},
      'description':'Get a list of all gids.'
      },

      'gids_list':{
        'args':{},
      'description':'Get a list of all groups.'
      },

      'user_list':{
        'args':{},
      'description':'Get a list of all users.'
      },

      'users_list':{
        'args':{},
      'description':'Get a list of all users.'
      },

      'group_list':{
        'args':{},
      'description':'Get a list of all groups.'
      },

      'groups_list':{
        'args':{},
      'description':'Get a list of all groups.'
      },

      #INFO METHODS
      'user_info':{
        'args':{
          'user':username
        },
      'description':'Fetch info for a specified user.'
      },

      'users_info':{
        'args':{
          'users':usernames
        },
      'description':'Fetch info for a specified series of users.'
      },

      'uid_info':{
        'args':{
          'uid':uid
        },
      'description':'Fetch info for a specified uid.'
      },

      'uids_info':{
        'args':{
          'uids':uids
        },
      'description':'Fetch info for a specified series of uids.'
      },

      'group_info':{
        'args':{
          'group':group
        },
      'description':'Fetch info for a specified group.'
      },

      'groups_info':{
        'args':{
          'groups':groups
        },
      'description':'Fetch info for a specified series of groups.'
      },

      'gid_info':{
        'args':{
          'gid':gid
        },
      'description':'Fetch info for a specified gid.'
      },

      'gids_info':{
        'args':{
          'gids':gids
        },
      'description':'Fetch info for a specified series of gids.'
      },

      #INVENTORY METHODS
      'user_inventory':{
        'args':{},
      'description':'Get user info for all users.'
      },

      'users_inventory':{
        'args':{},
      'description':'Get user info for all users.'
      },

      'group_inventory':{
        'args':{},
      'description':'Get group info for all groups.'
      },

      'groups_inventory':{
        'args':{},
      'description':'Get group info for all groups.'
      },

      #CONVERSION METHODS
      'user_to_uid':{
        'args':{
          'user':username
        },
      'description':'Convert a username to a matching uid.'
      },

      'users_to_uids':{
        'args':{
          'users':usernames
        },
      'description':'Convert a series of usernames to a list of matching uids.'
      },

      'uid_to_user':{
        'args':{
          'uid':uid
        },
      'description':'Convert a uid to a username.'
      },

      'uids_to_users':{
        'args':{
          'uids':uids
        },
      'description':'Convert a series of uids to a list of matching usernames.'
      },

      'group_to_gid':{
        'args':{
          'group':group
        },
      'description':'Convert a group to a matching gid.'
      },

      'groups_to_gids':{
        'args':{
          'groups':groups
        },
      'description':'Converts a series of groups to a list of matching gids.'
      },

      'gid_to_group':{
        'args':{
          'gid':gid
        },
      'description':'Converts a gids to a matching groupname.'
      },

      'gids_to_groups':{
        'args':{
          'gids':gids
        },
      'description':'Converts a series of gids to a list of matching groupnames.'
      }

    }


  def ree(self):
    password = {
      'type':'string',
      'optional':False,
      'description':'A password.'
    }
    cmdopt = {
      'type':'string',
      'optional':False,
      'description':'An option to the command.'
    }
    cmdopts = {
      'type':'list*',
      'optional':False,
      'description':'An series of options to the command.'
    }
    username = {
      'type':'string',
      'optional':False,
      'description':'A username.',
    }
    usernames = {
      'type':'list*',
      'optional':False,
      'description':'A series of usernames.',
    }
    group = {
      'type':'string',
      'optional':False,
      'description':'A group name.'
    }
    groups = {
      'type':'list*',
      'optional':False,
      'description':'A series of group names.'
    }
    gid = {
      'type':'int',
      'optional':False,
      'description':'A gid.'
    }
    gids = {
      'type':'list*',
      'optional':False,
      'description':'A series of gids.'
    }
    ogid = {
      'type':'list*',
      'optional':False,
      'description':'An optional gid.'
    }
    ouid = {
      'type':'list*',
      'optional':False,
      'description':'An optional uid.'
    }
    uid = {
      'type':'int',
      'optional':False,
      'description':'A uid.'
    }
    uids = {
      'type':'list*',
      'optional':False,
      'description':'A series of uids.'
    }

    return {
      #GROUPADD METHODS
      'group_add':{
        'args':{
          'group':group,
          'gid':ogid
        },
      'description':"Create a group."
      },

      'groups_add':{
        'args':{
          'groups':groups
        },
      'description':"Create series of groups."
      },

      'group_add_non_unique':{
        'args':{
          'group':group,
          'gid':ogid
        },
      'description':"Create a group."
      },

      #GROUPDEL METHODS
      'group_del':{
        'args':{
          'group':group
        },
      'description':"Delete a group."
      },

      'groups_del':{
        'args':{
          'groups':groups
        },
      'description':"Delete a series of groups."
      },

      #GROUPMOD METHODS
      'group_set_gid_non_unique':{
        'args':{
          'group':group,
          'gid':gid
        },
      'description':"Allows a groups gid to be non-unique."
      },

      'group_set_gid':{
        'args':{
          'group':group,
          'gid':gid
        },
      'description':"Set a group's gid."
      },

      'group_set_groupname':{
        'args':{
          'group':group,
          'groupname':group
        },
      'description':"Set a group's groupname."
      },

      #USERADD METHODS
      'user_add':{
        'args':{
          'user':username
        },
      'description':"Create a user."
      },

      'users_add':{
        'args':{
          'users':usernames
        },
      'description':"Create series of users."
      },

      #USERDEL METHODS
      'user_del':{
        'args':{
          'user':username,
          'options':cmdopts,
        },
      'description':"Delete a user's account."
      },

      'users_del':{
        'args':{
          'users':usernames,
          'options':cmdopts,
        },
      'description':"Delete a series of users' accounts."
      },

      #USERMOD METHODS
      'user_lock':{
        'args':{
          'user':username,
        },
      'description':"Lock a user's account."
      },

      'users_lock':{
        'args':{
          'users':usernames,
        },
      'description':"Lock a series of users' accounts."
      },

      'user_set_shell':{
        'args':{
          'user':username,
          'shell':{
             'type':'string',
              'optional':False,
              'description':"A path to a shell."
          }
        },
      'description':"Set a user's shell."
      },

      'users_set_shell':{
        'args':{
          'users':usernames,
          'shell':{
             'type':'string',
              'optional':False,
              'description':"A path to a shell."
          }
        },
      'description':"Set a series of users' shell."
      },

      'user_set_home':{
        'args':{
          'user':username,
          'home':{
             'type':'string',
              'optional':False,
              'description':"A directory."
          }
        },
      'description':"Set a user's home folder."
      },

      'user_set_loginname':{
        'args':{
          'user':username,
          'loginname':username
        },
      'description':"Set a user's GECOS field."
      },

      'user_set_comment':{
        'args':{
          'user':username,
          'comment':cmdopt
        },
      'description':"Set a user's GECOS field."
      },

      'user_set_expiredate':{
        'args':{
          'user':username,
          'expiredate':cmdopt
        },
      'description':"Set a user's account's expiry date."
      },

      'users_set_expiredate':{
        'args':{
          'expiredate':cmdopt,
          'users':usernames
        },
      'description':"Set a series of users' accounts' expiry date."
      },

      'user_set_uid_non_unique':{
        'args':{
          'user':username
        },
      'description':"Set a user's uid."
      },

      'user_set_uid':{
        'args':{
          'user':username,
          'uid':uid
        },
      'description':"Set a user's uid."
      },

      'user_set_inactive':{
        'args':{
          'user':username,
          'inactive':cmdopt
        },
      'description':"Set a user's inactivity timer."
      },

      'users_set_inactive':{
        'args':{
          'inactive':cmdopt,
          'users':usernames
        },
      'description':"Set a series of users' inactivity timer."
      },

      'user_set_gid':{
        'args':{
          'user':username,
          'gid':gid
        },
      'description':"Set a user's gid."
      },

      'users_set_gid':{
        'args':{
          'gid':gid,
          'users':usernames
        },
      'description':"Set a series of users' gids."
      },

      'user_move_home':{
        'args':{
          'user':username,
          'home':{
             'type':'string',
              'optional':False,
              'description':"A directory."
          }
        },
      'description':"Set a user's home folder and move the contents to the new folder."
      },

      'user_unlock':{
        'args':{
          'user':username,
        },
      'description':"Unlock a user's account."
      },

      'users_unlock':{
        'args':{
          'users':usernames,
        },
      'description':"Unlock a series of users' account."
      },

      'user_add_to_group':{
        'args':{
          'group':group,
          'user':username,
        },
      'description':"Append a user to a group."
      },

      'users_add_to_group':{
        'args':{
          'group':group,
          'users':usernames,
        },
      'description':"Append a series of users to a group."
      },

      'user_set_group':{
        'args':{
          'group':group,
          'user':username,
        },
      'description':"Set a user's group."
      },

      'users_set_group':{
        'args':{
          'group':group,
          'users':usernames,
        },
      'description':"Set a series of users' group."
      },

      #EXISTANCE TEST METHODS
      'uid_exists':{
        'args':{
          'uid':uid
        },
      'description':'Test the existance of a uids.'
      },

      'uids_exist':{
        'args':{
          'uids':uids
        },
      'description':'Test the existance of a series of uids.'
      },

      'gid_exists':{
        'args':{
          'gid':gid
        },
      'description':'Test the existance of a gids.'
      },

      'gids_exist':{
        'args':{
          'gids':gids
        },
      'description':'Test the existance of a series of groups.'
      },

      'user_exists':{
        'args':{
          'user':username
        },
      'description':'Test the existance of a users.'
      },

      'users_exist':{
        'args':{
          'users':usernames
        },
      'description':'Test the existance of a series of users.'
      },

      'group_exists':{
        'args':{
          'group':group
        },
      'description':'Test the existance of a groups.'
      },

      'groups_exist':{
        'args':{
          'groups':groups
        },
      'description':'Test the existance of a series of groups.'
      },

      #LISTING METHODS
      'uid_list':{
        'args':{},
      'description':'Get a list of all uids.'
      },

      'uids_list':{
        'args':{},
      'description':'Get a list of all uids.'
      },

      'gid_list':{
        'args':{},
      'description':'Get a list of all gids.'
      },

      'gids_list':{
        'args':{},
      'description':'Get a list of all groups.'
      },

      'user_list':{
        'args':{},
      'description':'Get a list of all users.'
      },

      'users_list':{
        'args':{},
      'description':'Get a list of all users.'
      },

      'group_list':{
        'args':{},
      'description':'Get a list of all groups.'
      },

      'groups_list':{
        'args':{},
      'description':'Get a list of all groups.'
      },

      #INFO METHODS
      'user_info':{
        'args':{
          'user':username
        },
      'description':'Fetch info for a specified user.'
      },

      'users_info':{
        'args':{
          'users':usernames
        },
      'description':'Fetch info for a specified series of users.'
      },

      'uid_info':{
        'args':{
          'uid':uid
        },
      'description':'Fetch info for a specified uid.'
      },

      'uids_info':{
        'args':{
          'uids':uids
        },
      'description':'Fetch info for a specified series of uids.'
      },

      'group_info':{
        'args':{
          'group':group
        },
      'description':'Fetch info for a specified group.'
      },

      'groups_info':{
        'args':{
          'groups':groups
        },
      'description':'Fetch info for a specified series of groups.'
      },

      'gid_info':{
        'args':{
          'gid':gid
        },
      'description':'Fetch info for a specified gid.'
      },

      'gids_info':{
        'args':{
          'gids':gids
        },
      'description':'Fetch info for a specified series of gids.'
      },

      #INVENTORY METHODS
      'user_inventory':{
        'args':{},
      'description':'Get user info for all users.'
      },

      'users_inventory':{
        'args':{},
      'description':'Get user info for all users.'
      },

      'group_inventory':{
        'args':{},
      'description':'Get group info for all groups.'
      },

      'groups_inventory':{
        'args':{},
      'description':'Get group info for all groups.'
      },

      #CONVERSION METHODS
      'user_to_uid':{
        'args':{
          'user':username
        },
      'description':'Convert a username to a matching uid.'
      },

      'users_to_uids':{
        'args':{
          'users':usernames
        },
      'description':'Convert a series of usernames to a list of matching uids.'
      },

      'uid_to_user':{
        'args':{
          'uid':uid
        },
      'description':'Convert a uid to a username.'
      },

      'uids_to_users':{
        'args':{
          'uids':uids
        },
      'description':'Convert a series of uids to a list of matching usernames.'
      },

      'group_to_gid':{
        'args':{
          'groups':groups
        },
      'description':'Convert a group to a matching gid.'
      },

      'groups_to_gids':{
        'args':{
          'groups':groups
        },
      'description':'Converts a series of groups to a list of matching gids.'
      },

      'gid_to_group':{
        'args':{
          'gid':gid
        },
      'description':'Converts a gids to a matching groupname.'
      },

      'gids_to_groups':{
        'args':{
          'gids':gids
        },
      'description':'Converts a series of gids to a list of matching groupnames.'
      }

    }
#!/usr/bin/python


## Copyright 2008, Various
## Adrian Likins <alikins@xxxxxxxxxx>
##
## This software may be freely redistributed under the terms of the GNU
## general public license.
##

import os
import socket
import unittest
import xmlrpclib

import func.overlord.client as fc
import func.utils
import socket

class BaseTest:
    # assume we are talking to localhost
    # th = socket.gethostname()
    #th = socket.getfqdn()
    th = 'liberia'
    nforks=1
    async=False

    def __init__(self):
        pass

    def setUp(self):
        self.overlord = fc.Overlord(self.th,
                                    nforks=self.nforks,
                                    async=self.async)

    def test_module_version(self):
        mod = getattr(self.overlord, self.module)
        result = mod.module_version()
        self.assert_on_fault(result)

    def test_module_api_version(self):
        mod = getattr(self.overlord, self.module)
        result = mod.module_api_version()        
        self.assert_on_fault(result)

    def test_module_description(self):
        mod = getattr(self.overlord, self.module)
        result = mod.module_description()
        self.assert_on_fault(result)

    def test_module_list_methods(self):
        mod = getattr(self.overlord, self.module)
        result = mod.list_methods()
        self.assert_on_fault(result)

    def test_module_get_method_args(self):
        mod = getattr(self.overlord,self.module)
        arg_result=mod.get_method_args()
        self.assert_on_fault(arg_result)

    def test_module_inventory(self):
        mod = getattr(self.overlord, self.module)
        result = mod.list_methods()
        res = result[self.th]

        # not all modules have an inventory, so check for it
        # FIXME: not real happy about having to call list method for
        #        every module, but it works for now -akl
        if "inventory" in res:
            result = mod.inventory()
        self.assert_on_fault(result)


    # we do this all over the place...
    def assert_on_fault(self, result):
        assert func.utils.is_error(result[self.th]) == False
#        assert type(result[self.th]) != xmlrpclib.Fault

    def assert_on_no_fault(self, results):
        assert func.utils.is_error(results[self.th]) == True

    # attrs set so we can skip these via nosetest
    test_module_version.intro = True
    test_module_api_version.intro = True
    test_module_description.intro = True
    test_module_list_methods.intro = True
    test_module_inventory.intro = True
    test_module_get_method_args.intro = True

class TestTest(BaseTest):
    module = "test"
    def test_add(self):
        result = self.overlord.test.add(1,5)
        self.assert_on_fault(result)
        assert result[self.th] == 6

    def test_add_string(self):
        result = self.overlord.test.add("foo", "bar")
        self.assert_on_fault(result)
        assert result[self.th] == "foobar"

    def test_sleep(self):
        result = self.overlord.test.sleep(1)
        self.assert_on_fault(result)

    def test_explode(self):
        results = self.overlord.test.explode()
        print results
        self.assert_on_no_fault(results)


    def test_explode_no_string(self):
        results = self.overlord.test.explode_no_string()
        print results
        self.assert_on_no_fault(results)


    def _echo_test(self, data):
        result = self.overlord.test.echo(data)
        self.assert_on_fault(result)
        assert result[self.th] == data
        
    # this tests are basically just to test the basic
    # marshalling/demarshaling bits
    def test_echo_int(self):
        self._echo_test(1)

    def test_echo_string(self):
        self._echo_test("caneatcheese")

    def test_echo_array(self):
        self._echo_test([1, 2, "three", "fore", "V",])

    def test_echo_hash(self):
        self._echo_test({"one":1, "too":2, "III":3, "many":8, "lots":12312})

    def test_echo_bool_false(self):
        self._echo_test(False)

    def test_echo_bool_true(self):
        self._echo_test(True)

    def test_echo_float(self):
        self._echo_test(123.456)

    def test_echo_big_float(self):
        self._echo_test(123121232.23)

    def test_echo_bigger_float(self):
        self._echo_test(234234234234234234234.234234234234234)

    def test_echo_little_float(self):
        self._echo_test(0.000000000000000000000000000000037)

    def test_echo_binary(self):
        blob = "348dshke354ts0d9urgk"
        import xmlrpclib
        data = xmlrpclib.Binary(blob)
        self._echo_test(data)

    def test_echo_date(self):
        import datetime
        dt = datetime.datetime(1974, 1, 5, 11, 59 ,0,0, None)
        import xmlrpclib
        data = xmlrpclib.DateTime(dt)
        self._echo_test(data)

    def test_config(self):
        result = self.overlord.test.configfoo()
        config = result[self.th]
        self.assert_on_fault(result)

    def test_config_write(self):
        result = self.overlord.test.config_save()
        config = result[self.th]
        self.assert_on_fault(result)

    def test_config_set(self):
        result = self.overlord.test.config_set("example", 5000)
        config = result[self.th]
        self.assert_on_fault(result)

    def test_config_get_example(self):
        result = self.overlord.test.config_get("example")
        config = result[self.th]
        self.assert_on_fault(result)

    def test_config_get_string(self):
	result = self.overlord.test.config_get("string_option")
        option = result[self.th]
        print option
        assert(type(option) == type("foo"))
	self.assert_on_fault(result)

    def test_config_get_int(self):
        result = self.overlord.test.config_get("int_option")
        option = result[self.th]
        assert(type(option) == type(1))
        self.assert_on_fault(result)

    def test_config_get_bool(self):
        result = self.overlord.test.config_get("bool_option")
        option = result[self.th]
        assert(type(option) == type(True))
        self.assert_on_fault(result)

    def test_config_get_float(self):
        result = self.overlord.test.config_get("float_option")
        option = result[self.th]
        assert(type(option) == type(2.71828183))
        self.assert_on_fault(result)

    def test_config_get_test(self):
	result = self.overlord.test.config_get_test()
	self.assert_on_fault(result)




class TestCommand(BaseTest):
    module = "command"
    def test_echo(self):
        result = self.overlord.command.run("echo -n foo")

        self.assert_on_fault(result)
        assert result[self.th][1] == "foo"

    def test_rpm(self):
        # looksing for some package that should be there, rh specific
        # ish at the moment
        result = self.overlord.command.run("rpm -q filesystem")

        self.assert_on_fault(result)
        assert result[self.th][1].split("-")[0] == "filesystem"


    def test_env(self):
	result = self.overlord.command.run("env",
				           {'BLIPPYFOO':'awesome'})
	self.assert_on_fault(result)
	assert result[self.th][1].find("BLIPPYFOO=awesome") != -1

#    def test_sleep_long(self):
#        result = self.overlord.command.run("sleep 256")
#        self.assert_on_fault(result)

    def test_shell_globs(self):
        result = self.overlord.command.run("ls /etc/cron*")
        self.assert_on_fault(result)

    def test_shell_pipe(self):
        result = self.overlord.command.run("echo 'foobar' | grep foo")
        self.assert_on_fault(result)
        assert result[self.th][1] == "foobar\n"


    def test_shell_redirect(self):
        result = self.overlord.command.run("mktemp")
        tmpfile = result[self.th][1].strip()
        result = self.overlord.command.run("echo foobar >> %s; cat %s" % (tmpfile, tmpfile))
        self.assert_on_fault(result)
        assert result[self.th][1] == "foobar\n"

    def test_shell_multiple_commands(self):
        result = self.overlord.command.run("cal; date; uptime; ls;")
        self.assert_on_fault(result)


class TestCopyfile(BaseTest):
    fn = "/tmp/func_test_file"
    dest_fn = "/tmp/func_test_file_dest"
    content = "this is a func test file"
    module = "copyfile"
    def create_a_file(self, size=1):
        
        f = open(self.fn, "w")
        f.write(self.content*size)
        f.close()

#    def test_local_copyfile(self):
#        result = self.overlord.local.copyfile.send(self.fn, self.dest_fn)
#        print result
#        self.assert_on_fault(result)


    def test_copyfile(self, size=1):
        self.create_a_file(size=size)
        fb = open(self.fn,"r").read()
        data = xmlrpclib.Binary(fb)
        result = self.overlord.copyfile.copyfile(self.dest_fn, data)
        self.assert_on_fault(result)
        assert result[self.th]  == 0
        
 #   def test_copyfile_big(self):
 #       # make a file in the ~70 meg range
 #       self.test_copyfile(size=100)
        
 
    def test_checksum(self):
        self.create_a_file()
        fb = open(self.fn,"r").read()
        data = xmlrpclib.Binary(fb)
        result = self.overlord.copyfile.copyfile(self.dest_fn, data)
        result = self.overlord.copyfile.checksum(self.dest_fn)
        self.assert_on_fault(result)
        assert result[self.th] == "b36a8040e44c16605d7784cdf1b3d9ed04ea7f55"
        

class TestHardware(BaseTest):
    module = "hardware"
    def test_inventory(self):
        result = self.overlord.hardware.inventory()
        self.assert_on_fault(result)

    def test_halinfo(self):
        result = self.overlord.hardware.hal_info()
        self.assert_on_fault(result)

    def test_info(self):
        result = self.overlord.hardware.info()
        self.assert_on_fault(result)


    def test_info_no_devices(self):
        result = self.overlord.hardware.info(False)
        self.assert_on_fault(result)

class TestFileTracker(BaseTest):
    fn = "/etc/hosts"
    fn_glob = "/etc/init.d/*"
    fn_list = ["/etc/hosts", "/etc/func/minion.conf"]
    fn_glob_list = ["/etc/hosts", "/etc/func/*"]
    module = "filetracker"
    def test_track(self):
        result = self.overlord.filetracker.track(self.fn)
        assert result[self.th] == 1
        self.assert_on_fault(result)

    def test_track_glob(self):
        result = self.overlord.filetracker.track(self.fn)
        assert result[self.th] == 1
        self.assert_on_fault(result)

    def test_untrack_glob(self):
        result = self.overlord.filetracker.track(self.fn_glob)
        result = self.overlord.filetracker.untrack(self.fn_glob)
        self.assert_on_fault(result)

    def test_track_fn_list(self):
        result = self.overlord.filetracker.track(self.fn_list)
        assert result[self.th] == 1
        self.assert_on_fault(result)

    def test_untrack_fn_list(self):
        result = self.overlord.filetracker.track(self.fn_list)
        result = self.overlord.filetracker.untrack(self.fn_list)
        self.assert_on_fault(result)

    def test_track_fn_glob_list(self):
        result = self.overlord.filetracker.track(self.fn_glob_list)
        assert result[self.th] == 1
        self.assert_on_fault(result)

    def test_untrack_fn_glob_list(self):
        result = self.overlord.filetracker.track(self.fn_glob_list)
        result = self.overlord.filetracker.untrack(self.fn_glob_list)
        self.assert_on_fault(result)


    def test_inventory(self):
        result = self.overlord.filetracker.track(self.fn)
        result = self.overlord.filetracker.inventory(False)
        self.assert_on_fault(result)
        assert self.fn in result[self.th][0]
#        assert result[self.th][0][3] == 0

    def test_untrack(self):
        result = self.overlord.filetracker.track(self.fn)
        result = self.overlord.filetracker.untrack(self.fn)
        self.assert_on_fault(result)
        result_inv = self.overlord.filetracker.inventory(False)
        tracked_files = result_inv[self.th]
        for i in tracked_files:
            if i[0] == self.fn:
                assert "%s was not properly untracked" % self.fn


class TestMount(BaseTest):
    module = "mount"
    def test_mount_list(self):
        result = self.overlord.mount.list()
        self.assert_on_fault(result)

    # INSERT some clever way to test mount here


class TestNetworkTest(BaseTest):
    module = "networktest"
    def test_ping(self):
        result = self.overlord.networktest.ping(self.th, "-c", "2")
        self.assert_on_fault(result)

    def test_ping_bad_arg(self):
         result = self.overlord.networktest.ping(self.th)
         # this should give us a FuncException
         foo = func.utils.is_error(result[self.th]) 
         
    def test_netstat(self):
        result = self.overlord.networktest.netstat("-n")
        self.assert_on_fault(result)

    def test_traceroute(self):
        result = self.overlord.networktest.traceroute(self.th)
        self.assert_on_fault(result)

    def test_dig(self):
        result = self.overlord.networktest.dig("redhat.com")
        self.assert_on_fault(result)

    def test_isportopen_closed_port(self):
        result = self.overlord.networktest.isportopen(self.th, 34251)
        self.assert_on_fault(result)

    def test_isportopen_open_port(self):
        result = self.overlord.networktest.isportopen(self.th, 51234)
        self.assert_on_fault(result)


class TestProcess(BaseTest):
    module = "process"
    def test_info(self):
        result = self.overlord.process.info()
        self.assert_on_fault(result)

    def test_mem(self):
        result = self.overlord.process.mem()
        self.assert_on_fault(result)

    # FIXME: how to test kill/pkill? start a process with
    #        command and then kill it?


class TestService(BaseTest):
    module = "service"
    def test_inventory(self):
        result = self.overlord.service.inventory()
        self.assert_on_fault(result)
    
    def test_get_enabled(self):
        result = self.overlord.service.get_enabled()
        self.assert_on_fault(result)

    def test_get_running(self):
        result = self.overlord.service.get_running()
        self.assert_on_fault(result)

    def test_get_status(self):
        running_data = self.overlord.service.get_running()[self.th]
        result = self.overlord.service.status(running_data[0][0])
        self.assert_on_fault(result)

        #FIXME: whats a good way to test starting/stoping services without
        #       doing bad things? -akl

class TestRpm(BaseTest):
    module = "rpms"
    def test_inventory(self):
        result = self.overlord.rpms.inventory()
        self.assert_on_fault(result)

    def test_glob(self):
        # if func is running, there should at least be python installed ;->
        result = self.overlord.rpms.glob("python*", False)
        self.assert_on_fault(result)

    def test_glob_flatten(self):
        result = self.overlord.rpms.glob("python*", True)
        self.assert_on_fault(result)

    def test_glob_nomatch(self):
        # shouldn't be any rpms called "-" ;->
        result = self.overlord.rpms.glob("-*")
        self.assert_on_fault(result)

    def test_glob_gpg_pubkey(self):
        # gpg-pubkey packages are weird rpm virtual packages, and tend to do
        # weird things, so try that too
        result = self.overlord.rpms.glob("gpg-pubkey*")
        self.assert_on_fault(result)

    def test_glob_gpg_pubkey_no_flat(self):
        # gpg-pubkey packages are weird rpm virtual packages, and tend to do
        # weird things, so try that too
        result = self.overlord.rpms.glob("gpg-pubkey*", False)
        self.assert_on_fault(result)

    def test_glob_match_all(self):
        result = self.overlord.rpms.glob("*", False)
        self.assert_on_fault(result)



class TestSmart(BaseTest):
    module = "smart"
    def test_info(self):
        result = self.overlord.smart.info()
        self.assert_on_fault(result)
    

class TestSysctl(BaseTest):
    module = "sysctl"
    def test_list(self):
        result = self.overlord.sysctl.list()
        self.assert_on_fault(result)

    def test_get(self):
        result = self.overlord.sysctl.get("kernel.max_lock_depth")
        self.assert_on_fault(result)

class TestYum(BaseTest):
    module = "yumcmd"
    def test_check_update(self):
        result = self.overlord.yumcmd.check_update()
        self.assert_on_fault(result)

    def test_check_update_empty_filter(self):
        results = self.overlord.yumcmd.check_update([])
        self.assert_on_fault(results)
        results_no_filter = self.overlord.yumcmd.check_update()
        assert results == results_no_filter

    def test_check_update_splat_filter(self):
        results = self.overlord.yumcmd.check_update(['*'])
        self.assert_on_fault(results)
        results_no_filter = self.overlord.yumcmd.check_update()
        assert results == results_no_filter

# this fails on fc6, need to test on newer yum to see whats up
#    def test_update_non_existent_package(self):
#        result = self.overlord.yumcmd.update("thisisapackage-_-that_should==never+exist234234234")
#        self.assert_on_fault(result)
#        # hmm, that method always returns True... not much to test there... -akl

class TestIptables(BaseTest):
    module = "iptables"

    def test_dump(self):
        result = self.overlord.iptables.dump()

        # at the moment, we dont set anything up
        # to verify, so this is just a basic
        # "does it crash" test

    def test_policy(self):
        result = self.overlord.iptables.policy()


class TestIptablesPort(BaseTest):
    module = "iptables.port"

    def test_inventory(self):
        results = self.overlord.iptables.port.inventory()
        # doesnt have an inventory, so er... -akl

        
class TestEchoTest(BaseTest):
    module = "echo"

    def test_run_string(self):
        result=self.overlord.echo.run_string("heyman")
        self.assert_on_fault(result)
        
    def test_run_int(self):
        result=self.overlord.echo.run_int(12)
        self.assert_on_fault(result)
        
    def test_run_float(self):
        result=self.overlord.echo.run_float(12.0)
        self.assert_on_fault(result)
        
    def test_run_options(self):
        result=self.overlord.echo.run_options("hehehh")
        self.assert_on_fault(result)

    def test_run_list(self):
        result=self.overlord.echo.run_list(['one','two','three'])
        self.assert_on_fault(result)

    def test_run_hash(self):
        result=self.overlord.echo.run_hash({'one':1,'two':2})
        self.assert_on_fault(result)

    def test_run_boolean(self):
        result=self.overlord.echo.run_hash(True)
        self.assert_on_fault(result)





class TestSystem(BaseTest):
    module = "system"
    def test_list_methods(self):
        result = self.overlord.system.list_methods()
        self.assert_on_fault(result)

    
    def test_listMethods(self):
        result = self.overlord.system.listMethods()
        self.assert_on_fault(result)
    
    def test_list_modules(self):
        result = self.overlord.system.list_modules()
        self.assert_on_fault(result)


    #FIXME: we really should just implement these for the system stuff
    #       as well
    def test_module_version(self):
        pass

    def test_module_api_version(self):
        pass

    def test_module_description(self):
        pass

    def test_module_get_method_args(self):
        pass


#import time
#class TestAsyncTest(BaseTest):
#    module = "async.test"
#    nforks=1
#    async=True
#    def test_sleep_async(self):
#        job_id = self.overlord.test.sleep(5)
#        print "job_id", job_id
#        time.sleep(5)
#        (return_code, results) = self.overlord.job_status(job_id)
#        print "return_code", return_code
#        print "results", results
#
#    def test_add_async(self):
#        job_id = self.overlord.test.add(1,5)
#        print "job_id", job_id
#        time.sleep(6)
#        (return_code, results) = self.overlord.job_status(job_id)
#        print "return_code", return_code
       # print "results", results

class TestUsers(BaseTest):
    import random as r
    module    = "users"
    testuser1 = "fni_testuser1"
    testuser2 = "fni_testuser2"
    testgroup1 = "fni_testgroup1"
    testgroup2 = "fni_testgroup2"

    def __unused_gid__(self):
      potentialgid = self.r.randint(2,65535)
      return potentialgid if self.overlord.users.gid_exists(potentialgid) else self.__unused_gid__()

    def __unused_uid__(self):
      potentialuid = self.r.randint(2,65535)
      return potentialuid if self.overlord.users.uid_exists(potentialuid) else self.__unused_uid__()

    def __make_test_user__(self):
      return self.overlord.users.user_add(TestUsers.testuser1)

    def __make_test_user2__(self):
      return self.overlord.users.user_add(TestUsers.testuser2)

    def __make_test_group__(self):
      return self.overlord.users.group_add(TestUsers.testgroup1)

    def __make_test_group2__(self):
      self.overlord.users.group_add(TestUsers.testgroup2)

    def __cleanup__(self):
      self.overlord.users.user_del(TestUsers.testuser1) if self.overlord.users.user_exists(TestUsers.testuser1) else True
      self.overlord.users.user_del(TestUsers.testuser2) if self.overlord.users.user_exists(TestUsers.testuser2) else True
      self.overlord.users.group_del(TestUsers.testgroup1) if self.overlord.users.group_exists(TestUsers.testgroup1) else True
      self.overlord.users.group_del(TestUsers.testgroup2) if self.overlord.users.group_exists(TestUsers.testgroup2) else True

    def test_user_add(self):
      try: 
        result = self.__make_test_user__()
        self.assert_on_fault(result)
        assert result[self.th] == True
      finally: 
        self.__cleanup__()

    def test_user_del(self):
      try: 
        self.__make_test_user__()
        result = self.overlord.users.user_del(TestUsers.testuser1)
        self.assert_on_fault(result)
        assert result[self.th] == True
      finally:
        self.__cleanup__()
 
    def test_group_add(self):
      try:
        result = self.__make_test_group__()
        self.assert_on_fault(result)
        assert result[self.th] == True
      finally:
        self.__cleanup__()

    def test_group_del(self):
      try:
        self.__make_test_group__()
        result = self.overlord.users.group_del(TestUsers.testgroup1)
        self.assert_on_fault(result)
        assert result[self.th] == True
      finally:
        self.__cleanup__()

    def test_groups_add(self):
      try:
        result = self.overlord.users.groups_add(TestUsers.testgroup1,TestUsers.testgroup2)
        self.assert_on_fault(result)
        assert result[self.th] == [True,True]
      finally:
        self.__cleanup__()

    def test_users_add(self):
      try:
        result = self.overlord.users.users_add(TestUsers.testuser1,TestUsers.testuser2)
        self.assert_on_fault(result)
        assert result[self.th] == [True,True]
      finally:
        self.__cleanup__()

    def test_groups_del(self):
      try:
        self.__make_test_group__()
        self.__make_test_group2__()
        result = self.overlord.users.groups_del(TestUsers.testgroup1,TestUsers.testgroup2)
        self.assert_on_fault(result)
        assert result[self.th] == [True,True]
      finally:
        self.__cleanup__()

    def test_users_del(self):
      try:
        self.__make_test_user__()
        self.__make_test_user2__()
        result = self.overlord.users.users_del(TestUsers.testuser1,TestUsers.testuser2)
        self.assert_on_fault(result)
        assert result[self.th] == [True,True]
      finally:
        self.__cleanup__()

    def test_group_set_gid_non_unique(self):
      try:
        self.__make_test_group__()
        result = self.overlord.users.group_set_gid_non_unique(TestUsers.testgroup1,self.__unused_gid__())
        self.assert_on_fault(result)
        assert result[self.th] == True
      finally:
        self.__cleanup__()

    def test_group_set_gid(self):
      try:
        self.__make_test_group__()
        result = self.overlord.users.group_set_gid(TestUsers.testgroup1,self.__unused_gid__())
        self.assert_on_fault(result)
        assert result[self.th] == True
      finally:
        self.__cleanup__()
    
    def test_group_set_groupname(self):
      try:
        self.__make_test_group__()
        result = self.overlord.users.group_set_groupname(TestUsers.testgroup1,TestUsers.testgroup2)
        self.assert_on_fault(result)
        assert result[self.th] == True
      finally:
        self.__cleanup__()
    
    def test_group_add_non_unique(self):
      try:
        result = self.overlord.users.group_add_non_unique(TestUsers.testgroup1,self.__unused_gid__())
        self.assert_on_fault(result)
        assert result[self.th] == True
      finally:
        self.__cleanup__()

    def test_user_lock(self):
      try:
        self.__make_test_user__()  
        result = self.overlord.users.user_lock(TestUsers.testuser1)
        self.assert_on_fault(result)
        assert result[self.th] == True
      finally:
        self.__cleanup__()

    def test_users_lock(self):
      try:
        self.__make_test_user__()  
        self.__make_test_user2__()  
        result = self.overlord.users.users_lock(TestUsers.testuser1,TestUsers.testuser2)
        self.assert_on_fault(result)
        assert result[self.th] == [True,True]
      finally:
        self.__cleanup__()

    def test_user_set_shell(self):
      try:
        self.__make_test_user__()  
        result = self.overlord.users.user_set_shell(TestUsers.testuser1,"/bin/false")
        self.assert_on_fault(result)
        assert result[self.th] == True
      finally:
        self.__cleanup__()

    def test_users_set_shell(self):
      try:
        self.__make_test_user__()  
        self.__make_test_user2__()  
        result = self.overlord.users.users_set_shell("/bin/false",TestUsers.testuser1,TestUsers.testuser2)
        self.assert_on_fault(result)
        assert result[self.th] == [True,True]
      finally:
        self.__cleanup__()

    def test_user_set_home(self):
      try:
        self.__make_test_user__()  
        result = self.overlord.users.user_set_home(TestUsers.testuser1,"/home/"+TestUsers.testuser1)
        self.assert_on_fault(result)
        assert result[self.th] == True
      finally:
        self.__cleanup__()

    def test_user_set_loginname(self):
      try:
        self.__make_test_user__()  
        self.overlord.users.user_set_loginname(TestUsers.testuser1,TestUsers.testuser2)
        result = self.overlord.users.user_set_loginname(TestUsers.testuser2,TestUsers.testuser1)
        self.assert_on_fault(result)
        assert result[self.th] == True
      finally:
        self.__cleanup__()

    def test_user_set_comment(self):
      try:
        self.__make_test_user__()  
        result = self.overlord.users.user_set_comment(TestUsers.testuser1,"zyxxyz")
        self.assert_on_fault(result)
        assert result[self.th] == True
      finally:
        self.__cleanup__()

    def test_user_set_expiredate(self):
      try:
        self.__make_test_user__()  
        result = self.overlord.users.user_set_expiredate(TestUsers.testuser1,999999)
        self.assert_on_fault(result)
        assert result[self.th] == True
      finally:
        self.__cleanup__()

    def test_users_set_expiredate(self):
      try:
        self.__make_test_user__()  
        self.__make_test_user2__()  
        result = self.overlord.users.users_set_expiredate(999999,TestUsers.testuser1,TestUsers.testuser1)
        self.assert_on_fault(result)
        assert result[self.th] == [True,True]
      finally:
        self.__cleanup__()

    def test_user_set_uid_non_unique(self):
      try:
        self.__make_test_user__()  
        result = self.overlord.users.user_set_uid_non_unique(TestUsers.testuser1,self.__unused_uid__())
        self.assert_on_fault(result)
        assert result[self.th] == True
      finally:
        self.__cleanup__()

    def test_user_set_uid(self):
      try:
        self.__make_test_user__()  
        result = self.overlord.users.user_set_uid(TestUsers.testuser1,self.__unused_uid__())
        self.assert_on_fault(result)
        assert result[self.th] == True
      finally:
        self.__cleanup__()

    def test_user_set_inactive(self):
      try:
        self.__make_test_user__()  
        result = self.overlord.users.user_set_inactive(TestUsers.testuser1,999999)
        self.assert_on_fault(result)
        assert result[self.th] == True
      finally:
        self.__cleanup__()

    def test_users_set_inactive(self):
      try:
        self.__make_test_user__()  
        self.__make_test_user2__()  
        result = self.overlord.users.users_set_inactive(999999,TestUsers.testuser1,TestUsers.testuser1)
        self.assert_on_fault(result)
        assert result[self.th] == [True,True]
      finally:
        self.__cleanup__()

    def test_user_move_home(self):
      try:
        self.__make_test_user__()  
        result = self.overlord.users.user_move_home(TestUsers.testuser1,"/home/"+TestUsers.testuser1)
        self.assert_on_fault(result)
        assert result[self.th] == True
      finally:
        self.__cleanup__()

    def test_user_unlock(self):
      try:
        self.__make_test_user__()  
        result = self.overlord.users.user_unlock(TestUsers.testuser1)
        self.assert_on_fault(result)
        assert result[self.th] == True
      finally:
        self.__cleanup__()

    def test_users_unlock(self):
      try:
        self.__make_test_user__()  
        self.__make_test_user2__()  
        result = self.overlord.users.users_unlock(TestUsers.testuser1,TestUsers.testuser2)
        self.assert_on_fault(result)
        assert result[self.th] == [True,True]
      finally:
        self.__cleanup__()

    def test_users_add_to_group(self):
      try:
        self.__make_test_user__()  
        self.__make_test_user2__()  
        self.__make_test_group__()
        result = self.overlord.users.users_add_to_group(TestUsers.testgroup1,TestUsers.testuser1,TestUsers.testuser2)
        self.assert_on_fault(result)
        assert result[self.th] == [True,True]
      finally:
        self.__cleanup__()

    def test_user_add_to_group(self):
      try:
        self.__make_test_user__()  
        self.__make_test_group__()
        result = self.overlord.users.user_add_to_group(TestUsers.testuser1,TestUsers.testgroup1)
        self.assert_on_fault(result)
        assert result[self.th] == True
      finally:
        self.__cleanup__()

    def test_users_set_gid(self):
      try:
        self.__make_test_user__()  
        self.__make_test_user2__()  
        self.__make_test_group__()
        result = self.overlord.users.users_set_gid(self.overlord.users.group_to_gid(TestUsers.testgroup1)[self.th],TestUsers.testuser1,TestUsers.testuser2)
        self.assert_on_fault(result)
        assert result[self.th] == [True,True]
      finally:
        self.__cleanup__()

    def test_user_set_gid(self):
      try:
        self.__make_test_user__()  
        self.__make_test_group__()
        result = self.overlord.users.user_set_gid(TestUsers.testuser1,self.overlord.users.group_to_gid(TestUsers.testgroup1)[self.th])
        self.assert_on_fault(result)
        assert result[self.th] == True
      finally:
        self.__cleanup__()

    def test_users_set_group(self):
      try:
        self.__make_test_user__()  
        self.__make_test_user2__()  
        self.__make_test_group__()
        result = self.overlord.users.users_set_group(TestUsers.testgroup1,TestUsers.testuser1,TestUsers.testuser2)
        self.assert_on_fault(result)
        assert result[self.th] == [True,True]
      finally:
        self.__cleanup__()

    def test_user_set_group(self):
      try:
        self.__make_test_user__()  
        self.__make_test_group__()
        result = self.overlord.users.user_set_group(TestUsers.testuser1,TestUsers.testgroup1)
        self.assert_on_fault(result)
        assert result[self.th] == True
      finally:
        self.__cleanup__()

    def test_passwd(self):
      try:
        self.__make_test_user__()  
        self.__make_test_group__()
        result = self.overlord.users.passwd(TestUsers.testuser1,"".join(self.r.sample([chr(x) for x in range(48,58)+range(65,91)+range(97,123)],8)))
        self.assert_on_fault(result)
        assert result[self.th] == True
      finally:
        self.__cleanup__()

    def test_user_exists(self):
      try:
        self.__make_test_user__()  
        result = self.overlord.users.user_exists(TestUsers.testuser1)
        self.assert_on_fault(result)
        assert result[self.th] == True
      finally:
        self.__cleanup__()

    def test_uid_exists(self):
      try:
        self.__make_test_user__()  
        result = self.overlord.users.uid_exists(self.overlord.users.user_to_uid(TestUsers.testuser1)[self.th])
        self.assert_on_fault(result)
        assert result[self.th] == True
      finally:
        self.__cleanup__()

    def test_group_exists(self):
      try:
        self.__make_test_group__()
        result = self.overlord.users.group_exists(TestUsers.testgroup1)
        self.assert_on_fault(result)
        assert result[self.th] == True
      finally:
        self.__cleanup__()

    def test_gid_exists(self):
      try:
        self.__make_test_group__()
        result = self.overlord.users.gid_exists(self.overlord.users.group_to_gid(TestUsers.testgroup1)[self.th])
        self.assert_on_fault(result)
        assert result[self.th] == True
      finally:
        self.__cleanup__()

    def test_users_exist(self):
      try:
        self.__make_test_user__()  
        self.__make_test_user2__()  
        result = self.overlord.users.users_exist(TestUsers.testuser1,TestUsers.testuser2)
        self.assert_on_fault(result)
        assert result[self.th] == [True,True]
      finally:
        self.__cleanup__()

    def test_uids_exist(self):
      try:
        self.__make_test_user__()  
        self.__make_test_user2__()  
        result = self.overlord.users.uids_exist(self.overlord.users.user_to_uid(TestUsers.testuser1)[self.th],self.overlord.users.user_to_uid(TestUsers.testuser2)[self.th])
        self.assert_on_fault(result)
        assert result[self.th] == [True,True]
      finally:
        self.__cleanup__()

    def test_groups_exist(self):
      try:
        self.__make_test_group__()
        self.__make_test_group2__()
        result = self.overlord.users.groups_exist(TestUsers.testgroup1,TestUsers.testgroup2)
        self.assert_on_fault(result)
        assert result[self.th] == [True,True]
      finally:
        self.__cleanup__()

    def test_gids_exist(self):
      try:
        self.__make_test_group__()
        result = self.overlord.users.gids_exist(self.overlord.users.group_to_gid(TestUsers.testgroup1)[self.th],self.overlord.users.group_to_gid(TestUsers.testgroup2)[self.th])
        self.assert_on_fault(result)
        assert result[self.th] == [True,True]
      finally:
        self.__cleanup__()
    
    def __trivial_test__(self,function):
      def list_test():
        try:
          result = function()
          self.assert_on_fault(result)
        finally:
          self.__cleanup__()
      return list_test

    def test_user_list(self):    self.__trivial_test__(self.overlord.users.user_list)()
    def test_users_list(self):   self.__trivial_test__(self.overlord.users.users_list)()
    def test_group_list(self):   self.__trivial_test__(self.overlord.users.group_list)()
    def test_groups_list(self):  self.__trivial_test__(self.overlord.users.groups_list)()
    def test_uid_list(self):     self.__trivial_test__(self.overlord.users.uid_list)()
    def test_uids_list(self):    self.__trivial_test__(self.overlord.users.uids_list)()
    def test_gid_list(self):     self.__trivial_test__(self.overlord.users.gid_list)()
    def test_gids_list(self):    self.__trivial_test__(self.overlord.users.gids_list)()

    def __info_test__(self,function):
      def info_test(target):
        try:
          self.__make_test_user__()
          self.__make_test_user2__()
          self.__make_test_group__()
          self.__make_test_group2__()
          result = function(target)
          self.assert_on_fault(result)
        finally:
          self.__cleanup__()
      return info_test

    def test_user_info(self):  self.__info_test__(self.overlord.users.user_info)(TestUsers.testuser1)
    def test_group_info(self): self.__info_test__(self.overlord.users.group_info)(TestUsers.testgroup1)
    def test_uid_info(self):   self.__info_test__(self.overlord.users.uid_info)(self.overlord.users.user_to_uid(TestUsers.testuser1)[self.th])
    def test_gid_info(self):   self.__info_test__(self.overlord.users.gid_info)(self.overlord.users.group_to_gid(TestUsers.testgroup1)[self.th])

    def __plural_info_test__(self,function):
      def plural_info_test(*targets):
        try:
          self.__make_test_user__()
          self.__make_test_user2__()
          self.__make_test_group__()
          self.__make_test_group2__()
          result = function(*targets)
          self.assert_on_fault(result)
        finally:
          self.__cleanup__()
      return plural_info_test

    def test_users_info(self):  self.__plural_info_test__(self.overlord.users.users_info)(TestUsers.testuser1,TestUsers.testuser2)
    def test_groups_info(self): self.__plural_info_test__(self.overlord.users.groups_info)(TestUsers.testgroup1,TestUsers.testgroup2)
    def test_uids_info(self):   self.__plural_info_test__(self.overlord.users.uids_info)(self.overlord.users.user_to_uid(TestUsers.testuser1)[self.th],self.overlord.users.user_to_uid(TestUsers.testuser2)[self.th])
    def test_gids_info(self):   self.__plural_info_test__(self.overlord.users.gids_info)(self.overlord.users.group_to_gid(TestUsers.testgroup1)[self.th],self.overlord.users.group_to_gid(TestUsers.testgroup2)[self.th])

    def user_to_uid(self):      self.__info_test__(self.overlord.users.user_to_uid)(TestUsers.testuser1)
    def group_to_guid(self):    self.__info_test__(self.overlord.users.group_to_guid)(TestUsers.testgroup1)
    def users_to_uids(self):    self.__plural_info_test__(self.overlord.users.users_to_uids)(TestUsers.testuser1,TestUsers.testuser2)
    def groups_to_gids(self):   self.__plural_info_test__(self.overlord.users.groups_to_gids)(TestUsers.testgroup1,TestUsers.testgroup2)
_______________________________________________
Func-list mailing list
Func-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/func-list

[Index of Archives]     [Fedora Users]     [Linux Networking]     [Fedora Legacy List]     [Fedora Desktop]     [Fedora SELinux]     [Big List of Linux Books]     [Yosemite News]     [KDE Users]

  Powered by Linux