I do a lot of live image making, which means the hard-coded loopback devices these tests use (/dev/loop0 and /dev/loop1) are sometimes not available. Luckily, losetup will tell us the name of the next free device, so use that instead. Also fix up some imports so they're in the right place, while I'm digging around in these files. --- tests/storage_test/devicelibs_test/baseclass.py | 41 +++++++++++++++---- tests/storage_test/devicelibs_test/crypto_test.py | 46 ++++++++++---------- tests/storage_test/devicelibs_test/lvm_test.py | 31 +++++++------- tests/storage_test/devicelibs_test/mdraid_test.py | 19 +++++---- tests/storage_test/devicelibs_test/mpath_test.py | 4 +- tests/storage_test/devicelibs_test/swap_test.py | 31 +++++++------- 6 files changed, 99 insertions(+), 73 deletions(-) mode change 100644 => 100755 tests/storage_test/devicelibs_test/crypto_test.py mode change 100644 => 100755 tests/storage_test/devicelibs_test/lvm_test.py mode change 100644 => 100755 tests/storage_test/devicelibs_test/mdraid_test.py mode change 100644 => 100755 tests/storage_test/devicelibs_test/mpath_test.py mode change 100644 => 100755 tests/storage_test/devicelibs_test/swap_test.py diff --git a/tests/storage_test/devicelibs_test/baseclass.py b/tests/storage_test/devicelibs_test/baseclass.py index c19bfc3..65c74e8 100644 --- a/tests/storage_test/devicelibs_test/baseclass.py +++ b/tests/storage_test/devicelibs_test/baseclass.py @@ -8,7 +8,7 @@ def makeLoopDev(device_name, file_name): "bs=1024", "count=102400"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) while True: - proc.communicate() + (out, err) = proc.communicate() if proc.returncode is not None: rc = proc.returncode break @@ -18,7 +18,7 @@ def makeLoopDev(device_name, file_name): proc = subprocess.Popen(["losetup", device_name, file_name], stdout=subprocess.PIPE, stderr=subprocess.PIPE) while True: - proc.communicate() + (out, err) = proc.communicate() if proc.returncode is not None: rc = proc.returncode break @@ -29,7 +29,7 @@ def removeLoopDev(device_name, file_name): proc =
subprocess.Popen(["losetup", "-d", device_name], stdout=subprocess.PIPE, stderr=subprocess.PIPE) while True: - proc.communicate() + (out, err) = proc.communicate() if proc.returncode is not None: rc = proc.returncode break @@ -38,18 +38,43 @@ def removeLoopDev(device_name, file_name): os.unlink(file_name) +def getFreeLoopDev(): + # There's a race condition here where another process could grab the loop + # device losetup gives us before we have time to set it up, but that's just + # a chance we'll have to take. + proc = subprocess.Popen(["losetup", "-f"], + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + out = None + + while True: + (out, err) = proc.communicate() + if proc.returncode is not None: + rc = proc.returncode + out = out.strip() + break + + if rc: + raise OSError, "losetup failed to find a free device" + + return out class DevicelibsTestCase(unittest.TestCase): - _LOOP_DEVICES = (("/dev/loop0", "/tmp/test-virtdev0"), - ("/dev/loop1", "/tmp/test-virtdev1")) + _LOOP_DEVICES = ["/tmp/test-virtdev0", "/tmp/test-virtdev1"] + + def __init__(self, *args, **kwargs): + import anaconda_log + anaconda_log.init() - ((_LOOP_DEV0, _LOOP_FILE0), (_LOOP_DEV1, _LOOP_FILE1)) = _LOOP_DEVICES + unittest.TestCase.__init__(self, *args, **kwargs) + self._loopMap = {} def setUp(self): - for dev, file in self._LOOP_DEVICES: + for file in self._LOOP_DEVICES: + dev = getFreeLoopDev() makeLoopDev(dev, file) + self._loopMap[file] = dev def tearDown(self): - for dev, file in self._LOOP_DEVICES: + for (file, dev) in self._loopMap.iteritems(): removeLoopDev(dev, file) diff --git a/tests/storage_test/devicelibs_test/crypto_test.py b/tests/storage_test/devicelibs_test/crypto_test.py old mode 100644 new mode 100755 index 434fdcb..dd05af2 --- a/tests/storage_test/devicelibs_test/crypto_test.py +++ b/tests/storage_test/devicelibs_test/crypto_test.py @@ -7,24 +7,24 @@ import os class CryptoTestCase(baseclass.DevicelibsTestCase): - def setUp(self): - baseclass.DevicelibsTestCase.setUp(self) 
- import storage.devicelibs.crypto as crypto + def testCrypto(self): + _LOOP_DEV0 = self._loopMap[self._LOOP_DEVICES[0]] + _LOOP_DEV1 = self._loopMap[self._LOOP_DEVICES[1]] + import storage.devicelibs.crypto as crypto - def testCrypto(self): ## ## is_luks ## # pass - self.assertEqual(crypto.is_luks(self._LOOP_DEV0), -22) + self.assertEqual(crypto.is_luks(_LOOP_DEV0), -22) self.assertEqual(crypto.is_luks("/not/existing/device"), -22) ## ## luks_format ## # pass - self.assertEqual(crypto.luks_format(self._LOOP_DEV0, passphrase="secret", cipher="aes-cbc-essiv:sha256", key_size=256), None) + self.assertEqual(crypto.luks_format(_LOOP_DEV0, passphrase="secret", cipher="aes-cbc-essiv:sha256", key_size=256), None) # make a key file handle, keyfile = tempfile.mkstemp(prefix="key", text=False) @@ -32,25 +32,25 @@ class CryptoTestCase(baseclass.DevicelibsTestCase): os.close(handle) # format with key file - self.assertEqual(crypto.luks_format(self._LOOP_DEV1, key_file=keyfile), None) + self.assertEqual(crypto.luks_format(_LOOP_DEV1, key_file=keyfile), None) # fail self.assertRaises(crypto.CryptoError, crypto.luks_format, "/not/existing/device", passphrase="secret", cipher="aes-cbc-essiv:sha256", key_size=256) # no passhprase or key file - self.assertRaises(ValueError, crypto.luks_format, self._LOOP_DEV1, cipher="aes-cbc-essiv:sha256", key_size=256) + self.assertRaises(ValueError, crypto.luks_format, _LOOP_DEV1, cipher="aes-cbc-essiv:sha256", key_size=256) ## ## is_luks ## # pass - self.assertEqual(crypto.is_luks(self._LOOP_DEV0), 0) # 0 = is luks - self.assertEqual(crypto.is_luks(self._LOOP_DEV1), 0) + self.assertEqual(crypto.is_luks(_LOOP_DEV0), 0) # 0 = is luks + self.assertEqual(crypto.is_luks(_LOOP_DEV1), 0) ## ## luks_add_key ## # pass - self.assertEqual(crypto.luks_add_key(self._LOOP_DEV0, new_passphrase="another-secret", passphrase="secret"), None) + self.assertEqual(crypto.luks_add_key(_LOOP_DEV0, new_passphrase="another-secret", passphrase="secret"), None) # make 
another key file handle, new_keyfile = tempfile.mkstemp(prefix="key", text=False) @@ -58,35 +58,35 @@ class CryptoTestCase(baseclass.DevicelibsTestCase): os.close(handle) # add new key file - self.assertEqual(crypto.luks_add_key(self._LOOP_DEV1, new_key_file=new_keyfile, key_file=keyfile), None) + self.assertEqual(crypto.luks_add_key(_LOOP_DEV1, new_key_file=new_keyfile, key_file=keyfile), None) # fail - self.assertRaises(RuntimeError, crypto.luks_add_key, self._LOOP_DEV0, new_passphrase="another-secret", passphrase="wrong-passphrase") + self.assertRaises(crypto.CryptoError, crypto.luks_add_key, _LOOP_DEV0, new_passphrase="another-secret", passphrase="wrong-passphrase") ## ## luks_remove_key ## # fail - self.assertRaises(RuntimeError, crypto.luks_remove_key, self._LOOP_DEV0, del_passphrase="another-secret", passphrase="wrong-pasphrase") + self.assertRaises(RuntimeError, crypto.luks_remove_key, _LOOP_DEV0, del_passphrase="another-secret", passphrase="wrong-pasphrase") # pass - self.assertEqual(crypto.luks_remove_key(self._LOOP_DEV0, del_passphrase="another-secret", passphrase="secret"), None) + self.assertEqual(crypto.luks_remove_key(_LOOP_DEV0, del_passphrase="another-secret", passphrase="secret"), None) # remove key file - self.assertEqual(crypto.luks_remove_key(self._LOOP_DEV1, del_key_file=new_keyfile, key_file=keyfile), None) + self.assertEqual(crypto.luks_remove_key(_LOOP_DEV1, del_key_file=new_keyfile, key_file=keyfile), None) ## ## luks_open ## # pass - self.assertEqual(crypto.luks_open(self._LOOP_DEV0, "crypted", passphrase="secret"), None) - self.assertEqual(crypto.luks_open(self._LOOP_DEV1, "encrypted", key_file=keyfile), None) + self.assertEqual(crypto.luks_open(_LOOP_DEV0, "crypted", passphrase="secret"), None) + self.assertEqual(crypto.luks_open(_LOOP_DEV1, "encrypted", key_file=keyfile), None) # fail self.assertRaises(crypto.CryptoError, crypto.luks_open, "/not/existing/device", "another-crypted", passphrase="secret")
self.assertRaises(crypto.CryptoError, crypto.luks_open, "/not/existing/device", "another-crypted", key_file=keyfile) # no passhprase or key file - self.assertRaises(ValueError, crypto.luks_open, self._LOOP_DEV1, "another-crypted") + self.assertRaises(ValueError, crypto.luks_open, _LOOP_DEV1, "another-crypted") ## ## luks_status @@ -100,10 +100,10 @@ class CryptoTestCase(baseclass.DevicelibsTestCase): ## luks_uuid ## # pass - uuid = crypto.luks_uuid(self._LOOP_DEV0) - self.assertEqual(crypto.luks_uuid(self._LOOP_DEV0), uuid) - uuid = crypto.luks_uuid(self._LOOP_DEV1) - self.assertEqual(crypto.luks_uuid(self._LOOP_DEV1), uuid) + uuid = crypto.luks_uuid(_LOOP_DEV0) + self.assertEqual(crypto.luks_uuid(_LOOP_DEV0), uuid) + uuid = crypto.luks_uuid(_LOOP_DEV1) + self.assertEqual(crypto.luks_uuid(_LOOP_DEV1), uuid) ## ## luks_close diff --git a/tests/storage_test/devicelibs_test/lvm_test.py b/tests/storage_test/devicelibs_test/lvm_test.py old mode 100644 new mode 100755 index 734644d..e81e529 --- a/tests/storage_test/devicelibs_test/lvm_test.py +++ b/tests/storage_test/devicelibs_test/lvm_test.py @@ -4,16 +4,17 @@ import unittest class LVMTestCase(baseclass.DevicelibsTestCase): - def setUp(self): - baseclass.DevicelibsTestCase.setUp(self) + def testLVM(self): + _LOOP_DEV0 = self._loopMap[self._LOOP_DEVICES[0]] + _LOOP_DEV1 = self._loopMap[self._LOOP_DEVICES[1]] + import storage.devicelibs.lvm as lvm - def testLVM(self): ## ## pvcreate ## # pass - for dev, file in self._LOOP_DEVICES: + for file, dev in self._loopMap.iteritems(): self.assertEqual(lvm.pvcreate(dev), None) # fail @@ -23,7 +24,7 @@ class LVMTestCase(baseclass.DevicelibsTestCase): ## pvresize ## # pass - for dev, file in self._LOOP_DEVICES: + for file, dev in self._loopMap.iteritems(): self.assertEqual(lvm.pvresize(dev, 50), None) self.assertEqual(lvm.pvresize(dev, 100), None) @@ -34,21 +35,21 @@ class LVMTestCase(baseclass.DevicelibsTestCase): ## vgcreate ## # pass - self.assertEqual(lvm.vgcreate("test-vg",
[self._LOOP_DEV0, self._LOOP_DEV1], 4), None) + self.assertEqual(lvm.vgcreate("test-vg", [_LOOP_DEV0, _LOOP_DEV1], 4), None) # fail self.assertRaises(lvm.LVMError, lvm.vgcreate, "another-vg", ["/not/existing/device"], 4) # vg already exists - self.assertRaises(lvm.LVMError, lvm.vgcreate, "test-vg", [self._LOOP_DEV0], 4) + self.assertRaises(lvm.LVMError, lvm.vgcreate, "test-vg", [_LOOP_DEV0], 4) # pe size must be power of 2 - self.assertRaises(lvm.LVMError, lvm.vgcreate, "another-vg", [self._LOOP_DEV0], 5) + self.assertRaises(lvm.LVMError, lvm.vgcreate, "another-vg", [_LOOP_DEV0], 5) ## ## pvremove ## # fail # cannot remove pv now with vg created - self.assertRaises(lvm.LVMError, lvm.pvremove, self._LOOP_DEV0) + self.assertRaises(lvm.LVMError, lvm.pvremove, _LOOP_DEV0) ## ## vgdeactivate @@ -63,10 +64,10 @@ class LVMTestCase(baseclass.DevicelibsTestCase): ## vgreduce ## # pass - self.assertEqual(lvm.vgreduce("test-vg", [self._LOOP_DEV1]), None) + self.assertEqual(lvm.vgreduce("test-vg", [_LOOP_DEV1]), None) # fail - self.assertRaises(lvm.LVMError, lvm.vgreduce, "wrong-vg-name", [self._LOOP_DEV1]) + self.assertRaises(lvm.LVMError, lvm.vgreduce, "wrong-vg-name", [_LOOP_DEV1]) self.assertRaises(lvm.LVMError, lvm.vgreduce, "test-vg", ["/not/existing/device"]) ## @@ -82,9 +83,9 @@ class LVMTestCase(baseclass.DevicelibsTestCase): ## pvinfo ## # pass - self.assertEqual(lvm.pvinfo(self._LOOP_DEV0)["pv_name"], self._LOOP_DEV0) + self.assertEqual(lvm.pvinfo(_LOOP_DEV0)["pv_name"], _LOOP_DEV0) # no vg - self.assertEqual(lvm.pvinfo(self._LOOP_DEV1)["pv_name"], self._LOOP_DEV1) + self.assertEqual(lvm.pvinfo(_LOOP_DEV1)["pv_name"], _LOOP_DEV1) # fail self.assertRaises(lvm.LVMError, lvm.pvinfo, "/not/existing/device") @@ -189,13 +190,13 @@ class LVMTestCase(baseclass.DevicelibsTestCase): ## pvremove ## # pass - for dev, file in self._LOOP_DEVICES: + for file, dev in self._loopMap.iteritems(): self.assertEqual(lvm.pvremove(dev), None) # fail self.assertRaises(lvm.LVMError,
lvm.pvremove, "/not/existing/device") # pv already removed - self.assertRaises(lvm.LVMError, lvm.pvremove, self._LOOP_DEV0) + self.assertRaises(lvm.LVMError, lvm.pvremove, _LOOP_DEV0) #def testGetPossiblePhysicalExtents(self): # pass diff --git a/tests/storage_test/devicelibs_test/mdraid_test.py b/tests/storage_test/devicelibs_test/mdraid_test.py old mode 100644 new mode 100755 index 9dde22d..1f7849e --- a/tests/storage_test/devicelibs_test/mdraid_test.py +++ b/tests/storage_test/devicelibs_test/mdraid_test.py @@ -5,11 +5,12 @@ import time class MDRaidTestCase(baseclass.DevicelibsTestCase): - def setUp(self): - baseclass.DevicelibsTestCase.setUp(self) + def testMDRaid(self): + _LOOP_DEV0 = self._loopMap[self._LOOP_DEVICES[0]] + _LOOP_DEV1 = self._loopMap[self._LOOP_DEVICES[1]] + import storage.devicelibs.mdraid as mdraid - def testMDRaid(self): ## ## getRaidLevels ## @@ -28,7 +29,7 @@ class MDRaidTestCase(baseclass.DevicelibsTestCase): # fail # unsupported raid - self.assertRaises(ValueError, mdraid.get_raid_min_members, 4) + self.assertRaises(ValueError, mdraid.get_raid_min_members, 8) ## ## get_raid_max_spares @@ -42,13 +43,13 @@ class MDRaidTestCase(baseclass.DevicelibsTestCase): # fail # unsupported raid - self.assertRaises(ValueError, mdraid.get_raid_max_spares, 4, 5) + self.assertRaises(ValueError, mdraid.get_raid_max_spares, 8, 5) ## ## mdcreate ## # pass - self.assertEqual(mdraid.mdcreate("/dev/md0", 1, [self._LOOP_DEV0, self._LOOP_DEV1]), None) + self.assertEqual(mdraid.mdcreate("/dev/md0", 1, [_LOOP_DEV0, _LOOP_DEV1]), None) # wait for raid to settle time.sleep(2) @@ -77,7 +78,7 @@ class MDRaidTestCase(baseclass.DevicelibsTestCase): ## mdactivate ## # pass - self.assertEqual(mdraid.mdactivate("/dev/md0", [self._LOOP_DEV0, self._LOOP_DEV1], super_minor=0), None) + self.assertEqual(mdraid.mdactivate("/dev/md0", [_LOOP_DEV0, _LOOP_DEV1], super_minor=0), None) # wait for raid to settle time.sleep(2) @@ -93,8 +94,8 @@ class 
MDRaidTestCase(baseclass.DevicelibsTestCase): # deactivate first self.assertEqual(mdraid.mddeactivate("/dev/md0"), None) - self.assertEqual(mdraid.mddestroy(self._LOOP_DEV0), None) - self.assertEqual(mdraid.mddestroy(self._LOOP_DEV1), None) + self.assertEqual(mdraid.mddestroy(_LOOP_DEV0), None) + self.assertEqual(mdraid.mddestroy(_LOOP_DEV1), None) # fail # not a component diff --git a/tests/storage_test/devicelibs_test/mpath_test.py b/tests/storage_test/devicelibs_test/mpath_test.py old mode 100644 new mode 100755 index 813a516..377b50d --- a/tests/storage_test/devicelibs_test/mpath_test.py +++ b/tests/storage_test/devicelibs_test/mpath_test.py @@ -3,11 +3,9 @@ import baseclass import unittest class MPathTestCase(baseclass.DevicelibsTestCase): - def setUp(self): - baseclass.DevicelibsTestCase.setUp(self) + def testMPath(self): import storage.devicelibs.mpath as mpath - def testMPath(self): ## ## parseMultipathOutput ## diff --git a/tests/storage_test/devicelibs_test/swap_test.py b/tests/storage_test/devicelibs_test/swap_test.py old mode 100644 new mode 100755 index ddf8d21..68f778d --- a/tests/storage_test/devicelibs_test/swap_test.py +++ b/tests/storage_test/devicelibs_test/swap_test.py @@ -4,16 +4,17 @@ import unittest class SwapTestCase(baseclass.DevicelibsTestCase): - def setUp(self): - baseclass.DevicelibsTestCase.setUp(self) + def testSwap(self): + _LOOP_DEV0 = self._loopMap[self._LOOP_DEVICES[0]] + _LOOP_DEV1 = self._loopMap[self._LOOP_DEVICES[1]] + import storage.devicelibs.swap as swap - def testSwap(self): ## ## mkswap ## # pass - self.assertEqual(swap.mkswap(self._LOOP_DEV0, "swap"), None) + self.assertEqual(swap.mkswap(_LOOP_DEV0, "swap"), None) # fail self.assertRaises(swap.SwapError, swap.mkswap, "/not/existing/device") @@ -22,24 +23,24 @@ class SwapTestCase(baseclass.DevicelibsTestCase): ## swapon ## # pass - self.assertEqual(swap.swapon(self._LOOP_DEV0, 1), None) + self.assertEqual(swap.swapon(_LOOP_DEV0, 1), None) # fail 
self.assertRaises(swap.SwapError, swap.swapon, "/not/existing/device") # not a swap partition - self.assertRaises(swap.SwapError, swap.swapon, self._LOOP_DEV1) + self.assertRaises(swap.SwapError, swap.swapon, _LOOP_DEV1) # pass # make another swap - self.assertEqual(swap.mkswap(self._LOOP_DEV1, "another-swap"), None) - self.assertEqual(swap.swapon(self._LOOP_DEV1), None) + self.assertEqual(swap.mkswap(_LOOP_DEV1, "another-swap"), None) + self.assertEqual(swap.swapon(_LOOP_DEV1), None) ## ## swapstatus ## # pass - self.assertEqual(swap.swapstatus(self._LOOP_DEV0), True) - self.assertEqual(swap.swapstatus(self._LOOP_DEV1), True) + self.assertEqual(swap.swapstatus(_LOOP_DEV0), True) + self.assertEqual(swap.swapstatus(_LOOP_DEV1), True) # does not fail self.assertEqual(swap.swapstatus("/not/existing/device"), False) @@ -48,18 +49,18 @@ class SwapTestCase(baseclass.DevicelibsTestCase): ## swapoff ## # pass - self.assertEqual(swap.swapoff(self._LOOP_DEV1), None) + self.assertEqual(swap.swapoff(_LOOP_DEV1), None) # check status - self.assertEqual(swap.swapstatus(self._LOOP_DEV0), True) - self.assertEqual(swap.swapstatus(self._LOOP_DEV1), False) + self.assertEqual(swap.swapstatus(_LOOP_DEV0), True) + self.assertEqual(swap.swapstatus(_LOOP_DEV1), False) - self.assertEqual(swap.swapoff(self._LOOP_DEV0), None) + self.assertEqual(swap.swapoff(_LOOP_DEV0), None) # fail self.assertRaises(swap.SwapError, swap.swapoff, "/not/existing/device") # already off - self.assertRaises(swap.SwapError, swap.swapoff, self._LOOP_DEV0) + self.assertRaises(swap.SwapError, swap.swapoff, _LOOP_DEV0) def suite(): -- 1.7.0.1 _______________________________________________ Anaconda-devel-list mailing list Anaconda-devel-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/anaconda-devel-list