--- storage/devicelibs/lvm.py | 33 +++-- storage/devicelibs/swap.py | 4 +- tests/storage/devicelibs/baseclass.py | 43 ++++++ tests/storage/devicelibs/lvm.py | 229 +++++++++++++++++++++++++++++++++ tests/storage/devicelibs/swap.py | 63 +++++++++ 5 files changed, 357 insertions(+), 15 deletions(-) create mode 100644 tests/storage/devicelibs/baseclass.py create mode 100644 tests/storage/devicelibs/lvm.py create mode 100644 tests/storage/devicelibs/swap.py diff --git a/storage/devicelibs/lvm.py b/storage/devicelibs/lvm.py index cd39707..bd39fc9 100644 --- a/storage/devicelibs/lvm.py +++ b/storage/devicelibs/lvm.py @@ -159,18 +159,25 @@ def pvinfo(device): if not vals: raise LVMError("pvinfo failed for %s" % device) - info = {'pv_name': vals[0], - 'vg_name': vals[2], - 'vg_uuid': vals[3]} + # don't raise an exception if pv is not a part of any vg + pv_name = vals[0] + try: + vg_name, vg_uuid = vals[2], vals[3] + except IndexError: + vg_name, vg_uuid = "", "" + + info = {'pv_name': pv_name, + 'vg_name': vg_name, + 'vg_uuid': vg_uuid} + return info -def vgcreate(vg_name, pvs, pe_size): +def vgcreate(vg_name, pv_list, pe_size): argv = ["vgcreate"] if pe_size: argv.extend(["-s", "%dM" % pe_size]) - pv_list = " ".join(pvs) argv.append(vg_name) - argv.append(pv_list) + argv.extend(pv_list) rc = iutil.execWithRedirect("lvm", argv, stdout = "/dev/tty5", @@ -190,7 +197,7 @@ def vgremove(vg_name): raise LVMError("vgremove failed for %s" % vg_name) def vgactivate(vg_name): - rc = iutil.execWithRedirect("lvm", ["vgchange" "-a", "y", vg_name], + rc = iutil.execWithRedirect("lvm", ["vgchange", "-a", "y", vg_name], stdout = "/dev/tty5", stderr = "/dev/tty5", searchPath=1) @@ -207,8 +214,7 @@ def vgdeactivate(vg_name): raise LVMError("vgdeactivate failed for %s" % vg_name) def vgreduce(vg_name, pv_list): - pvs = " ".join(pv_list) - rc = iutil.execWithRedirect("lvm", ["vgreduce", vg_name, pvs], + rc = iutil.execWithRedirect("lvm", ["vgreduce", vg_name] + pv_list, stdout = "/dev/tty5", stderr = "/dev/tty5", searchPath=1) @@ -235,10 +241,9 @@ def lvs(vg_name): buf = iutil.execWithCapture("lvm", ["lvs", "--noheadings", "--nosuffix", "--units", "m", "-o", - "lv_name,lv_uuid,lv_size"], + "lv_name,lv_uuid,lv_size", vg_name], stderr="/dev/tty5") - lvs = {} for line in buf.splitlines(): line = line.strip() @@ -247,6 +252,10 @@ def lvs(vg_name): (name, uuid, size) = line.split() lvs[name] = {"size": size, "uuid": uuid} + + if not lvs: + raise LVMError(_("lvs failed for %s" % vg_name)) + return lvs def lvcreate(vg_name, lv_name, size): @@ -307,5 +316,3 @@ def lvdeactivate(vg_name, lv_name): if rc: raise LVMError("lvdeactivate failed for %s" % lv_path) - - diff --git a/storage/devicelibs/swap.py b/storage/devicelibs/swap.py index 8ee5b5b..38000eb 100644 --- a/storage/devicelibs/swap.py +++ b/storage/devicelibs/swap.py @@ -23,6 +23,7 @@ import resource import iutil +import resource from ..errors import * @@ -71,7 +72,7 @@ def swapon(device, priority=None): argv = [] if isinstance(priority, int) and 0 <= priority <= 32767: - argv.extend(["-p", priority]) + argv.extend(["-p", "%d" % priority]) argv.append(device) rc = iutil.execWithRedirect("swapon", @@ -105,4 +106,3 @@ def swapstatus(device): return status - diff --git a/tests/storage/devicelibs/baseclass.py b/tests/storage/devicelibs/baseclass.py new file mode 100644 index 0000000..d1264f5 --- /dev/null +++ b/tests/storage/devicelibs/baseclass.py @@ -0,0 +1,43 @@ +import unittest +import os +import subprocess + +class TestDevicelibs(unittest.TestCase): + + _LOOP_DEVICES = (("/dev/loop0", "/tmp/test-virtdev0"), + ("/dev/loop1", "/tmp/test-virtdev1")) + + ((_LOOP_DEV0, _LOOP_FILE0), (_LOOP_DEV1, _LOOP_FILE1)) = _LOOP_DEVICES + + def setUp(self): + for dev, file in self._LOOP_DEVICES: + proc = subprocess.Popen(["dd", "if=/dev/zero", "of=%s" % file, "bs=1024", "count=102400"]) + while True: + proc.communicate() + if proc.returncode is not None: + rc = proc.returncode + break + if rc: + raise OSError, "dd failed creating the file %s" % file + + proc = subprocess.Popen(["losetup", dev, file]) + while True: + proc.communicate() + if proc.returncode is not None: + rc = proc.returncode + break + if rc: + raise OSError, "losetup failed setting up the loop device %s" % dev + + def tearDown(self): + for dev, file in self._LOOP_DEVICES: + proc = subprocess.Popen(["losetup", "-d", dev]) + while True: + proc.communicate() + if proc.returncode is not None: + rc = proc.returncode + break + if rc: + raise OSError, "losetup failed removing the loop device %s" % dev + + os.remove(file) diff --git a/tests/storage/devicelibs/lvm.py b/tests/storage/devicelibs/lvm.py new file mode 100644 index 0000000..210ad9d --- /dev/null +++ b/tests/storage/devicelibs/lvm.py @@ -0,0 +1,229 @@ +import baseclass +import unittest +import storage.devicelibs.lvm as lvm + +class TestLVM(baseclass.TestDevicelibs): + + def testLVMStuff(self): + ## + ## pvcreate + ## + # pass + for dev, file in self._LOOP_DEVICES: + self.assertEqual(lvm.pvcreate(dev), None) + + # fail + self.assertRaises(lvm.LVMError, lvm.pvcreate, "/not/existing/device") + + ## + ## pvresize + ## + # pass + for dev, file in self._LOOP_DEVICES: + self.assertEqual(lvm.pvresize(dev, 50), None) + self.assertEqual(lvm.pvresize(dev, 100), None) + + # fail + self.assertRaises(lvm.LVMError, lvm.pvresize, "/not/existing/device", 50) + + ## + ## vgcreate + ## + # pass + self.assertEqual(lvm.vgcreate("test-vg", [self._LOOP_DEV0, self._LOOP_DEV1], 4), None) + + # fail + self.assertRaises(lvm.LVMError, lvm.vgcreate, "another-vg", ["/not/existing/device"], 4) + # vg already exists + self.assertRaises(lvm.LVMError, lvm.vgcreate, "test-vg", [self._LOOP_DEV0], 4) + # pe size must be power of 2 + self.assertRaises(lvm.LVMError, lvm.vgcreate, "another-vg", [self._LOOP_DEV0], 5) + + ## + ## pvremove + ## + # fail + # cannot remove pv now with vg created + self.assertRaises(lvm.LVMError, lvm.pvremove, self._LOOP_DEV0) + + ## + ## vgdeactivate + ## + # pass + self.assertEqual(lvm.vgdeactivate("test-vg"), None) + + # fail + self.assertRaises(lvm.LVMError, lvm.vgdeactivate, "wrong-vg-name") + + ## + ## vgreduce + ## + # pass + self.assertEqual(lvm.vgreduce("test-vg", [self._LOOP_DEV1]), None) + + # fail + self.assertRaises(lvm.LVMError, lvm.vgreduce, "wrong-vg-name", [self._LOOP_DEV1]) + self.assertRaises(lvm.LVMError, lvm.vgreduce, "test-vg", ["/not/existing/device"]) + + ## + ## vgactivate + ## + # pass + self.assertEqual(lvm.vgactivate("test-vg"), None) + + # fail + self.assertRaises(lvm.LVMError, lvm.vgactivate, "wrong-vg-name") + + ## + ## pvinfo + ## + # pass + self.assertEqual(lvm.pvinfo(self._LOOP_DEV0)["pv_name"], self._LOOP_DEV0) + # no vg + self.assertEqual(lvm.pvinfo(self._LOOP_DEV1)["pv_name"], self._LOOP_DEV1) + + # fail + self.assertRaises(lvm.LVMError, lvm.pvinfo, "/not/existing/device") + + ## + ## vginfo + ## + # pass + self.assertEqual(lvm.vginfo("test-vg")["pe_size"], "4.00") + + # fail + self.assertRaises(lvm.LVMError, lvm.vginfo, "wrong-vg-name") + + ## + ## lvcreate + ## + # pass + self.assertEqual(lvm.lvcreate("test-vg", "test-lv", 10), None) + + # fail + self.assertRaises(lvm.LVMError, lvm.lvcreate, "wrong-vg-name", "another-lv", 10) + + ## + ## lvdeactivate + ## + # pass + self.assertEqual(lvm.lvdeactivate("test-vg", "test-lv"), None) + + # fail + self.assertRaises(lvm.LVMError, lvm.lvdeactivate, "test-vg", "wrong-lv-name") + self.assertRaises(lvm.LVMError, lvm.lvdeactivate, "wrong-vg-name", "test-lv") + self.assertRaises(lvm.LVMError, lvm.lvdeactivate, "wrong-vg-name", "wrong-lv-name") + + ## + ## lvresize + ## + # pass + self.assertEqual(lvm.lvresize("test-vg", "test-lv", 60), None) + + # fail + self.assertRaises(lvm.LVMError, lvm.lvresize, "test-vg", "wrong-lv-name", 80) + self.assertRaises(lvm.LVMError, lvm.lvresize, "wrong-vg-name", "test-lv", 80) + self.assertRaises(lvm.LVMError, lvm.lvresize, "wrong-vg-name", "wrong-lv-name", 80) + # changing to same size + self.assertRaises(lvm.LVMError, lvm.lvresize, "test-vg", "test-lv", 60) + + ## + ## lvactivate + ## + # pass + self.assertEqual(lvm.lvactivate("test-vg", "test-lv"), None) + + # fail + self.assertRaises(lvm.LVMError, lvm.lvactivate, "test-vg", "wrong-lv-name") + self.assertRaises(lvm.LVMError, lvm.lvactivate, "wrong-vg-name", "test-lv") + self.assertRaises(lvm.LVMError, lvm.lvactivate, "wrong-vg-name", "wrong-lv-name") + + ## + ## lvs + ## + # pass + self.assertEqual(lvm.lvs("test-vg")["test-lv"]["size"], "60.00") + + # fail + self.assertRaises(lvm.LVMError, lvm.lvs, "wrong-vg-name") + + ## + ## has_lvm + ## + # pass + self.assertEqual(lvm.has_lvm(), True) + + # fail + #TODO + + ## + ## lvremove + ## + # pass + self.assertEqual(lvm.lvdeactivate("test-vg", "test-lv"), None) # is deactivation needed? + self.assertEqual(lvm.lvremove("test-vg", "test-lv"), None) + + # fail + self.assertRaises(lvm.LVMError, lvm.lvremove, "test-vg", "wrong-lv-name") + self.assertRaises(lvm.LVMError, lvm.lvremove, "wrong-vg-name", "test-lv") + self.assertRaises(lvm.LVMError, lvm.lvremove, "wrong-vg-name", "wrong-lv-name") + # lv already removed + self.assertRaises(lvm.LVMError, lvm.lvremove, "test-vg", "test-lv") + + ## + ## vgremove + ## + # pass + self.assertEqual(lvm.vgremove("test-vg"), None) + + # fail + self.assertRaises(lvm.LVMError, lvm.vgremove, "wrong-vg-name") + # vg already removed + self.assertRaises(lvm.LVMError, lvm.vgremove, "test-vg") + + ## + ## pvremove + ## + # pass + for dev, file in self._LOOP_DEVICES: + self.assertEqual(lvm.pvremove(dev), None) + + # fail + self.assertRaises(lvm.LVMError, lvm.pvremove, "/not/existing/device") + # pv already removed + self.assertRaises(lvm.LVMError, lvm.pvremove, self._LOOP_DEV0) + + #def testGetPossiblePhysicalExtents(self): + # pass + self.assertEqual(lvm.getPossiblePhysicalExtents(4), + filter(lambda pe: pe > 4, map(lambda power: 2**power, xrange(3, 25)))) + self.assertEqual(lvm.getPossiblePhysicalExtents(100000), + filter(lambda pe: pe > 100000, map(lambda power: 2**power, xrange(3, 25)))) + + #def testGetMaxLVSize(self): + # pass + # why do we specify the PE ? not needed... + self.assertEqual(lvm.getMaxLVSize(4), 16*1024**2) + + #def testSafeLVMName(self): + # pass + self.assertEqual(lvm.safeLvmName("/strange/lv*name5"), "strange_lvname5") + + #def testClampSize(self): + # pass + self.assertEqual(lvm.clampSize(10, 4), 8L) + self.assertEqual(lvm.clampSize(10, 4, True), 12L) + + #def testVGUsedSpace(self): + #TODO + pass + + #def testVGFreeSpace(self): + #TODO + pass + + +if __name__ == "__main__": + suite = unittest.TestLoader().loadTestsFromTestCase(TestLVM) + unittest.TextTestRunner(verbosity=2).run(suite) + diff --git a/tests/storage/devicelibs/swap.py b/tests/storage/devicelibs/swap.py new file mode 100644 index 0000000..b1e9bd3 --- /dev/null +++ b/tests/storage/devicelibs/swap.py @@ -0,0 +1,63 @@ +import baseclass +import unittest +import storage.devicelibs.swap as swap + +class TestSwap(baseclass.TestDevicelibs): + + def runTest(self): + ## + ## mkswap + ## + # pass + self.assertEqual(swap.mkswap(self._LOOP_DEV0, "swap"), None) + + # fail + self.assertRaises(swap.SwapError, swap.mkswap, "/not/existing/device") + + ## + ## swapon + ## + # pass + self.assertEqual(swap.swapon(self._LOOP_DEV0, 1), None) + + # fail + self.assertRaises(swap.SwapError, swap.swapon, "/not/existing/device") + # not a swap partition + self.assertRaises(swap.SwapError, swap.swapon, self._LOOP_DEV1) + + # pass + # make another swap + self.assertEqual(swap.mkswap(self._LOOP_DEV1, "another-swap"), None) + self.assertEqual(swap.swapon(self._LOOP_DEV1), None) + + ## + ## swapstatus + ## + # pass + self.assertEqual(swap.swapstatus(self._LOOP_DEV0), True) + self.assertEqual(swap.swapstatus(self._LOOP_DEV1), True) + + # does not fail + self.assertEqual(swap.swapstatus("/not/existing/device"), False) + + ## + ## swapoff + ## + # pass + self.assertEqual(swap.swapoff(self._LOOP_DEV1), None) + + # check status + self.assertEqual(swap.swapstatus(self._LOOP_DEV0), True) + self.assertEqual(swap.swapstatus(self._LOOP_DEV1), False) + + self.assertEqual(swap.swapoff(self._LOOP_DEV0), None) + + # fail + self.assertRaises(swap.SwapError, swap.swapoff, "/not/existing/device") + # already off + self.assertRaises(swap.SwapError, swap.swapoff, self._LOOP_DEV0) + + +if __name__ == "__main__": + suite = unittest.TestLoader().loadTestsFromTestCase(TestSwap) + unittest.TextTestRunner(verbosity=2).run(suite) -- 1.6.0.6 _______________________________________________ Anaconda-devel-list mailing list Anaconda-devel-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/anaconda-devel-list