This patch is for information only and should not be comitted. Signed-off-by: Adam Litke <agl@xxxxxxxxxx> --- blockPull-test.py | 281 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 281 insertions(+), 0 deletions(-) create mode 100644 blockPull-test.py diff --git a/blockPull-test.py b/blockPull-test.py new file mode 100644 index 0000000..a75e0d9 --- /dev/null +++ b/blockPull-test.py @@ -0,0 +1,281 @@ +#!/usr/bin/env python + +import sys +import subprocess +import time +import unittest +import re +import threading +import libvirt + +qemu_img_bin = "/home/aglitke/src/qemu/qemu-img" +virsh_bin = "/home/aglitke/src/libvirt/tools/virsh" + +dom_xml = """ +<domain type='kvm'> + <name>blockPull-test</name> + <memory>131072</memory> + <currentMemory>131072</currentMemory> + <vcpu>1</vcpu> + <os> + <type arch='x86_64' machine='pc-0.13'>hvm</type> + <boot dev='hd'/> + </os> + <features> + <acpi/> + <apic/> + <pae/> + </features> + <clock offset='utc'/> + <on_poweroff>destroy</on_poweroff> + <on_reboot>restart</on_reboot> + <on_crash>restart</on_crash> + <devices> + <emulator>/home/aglitke/src/qemu/x86_64-softmmu/qemu-system-x86_64</emulator> + <disk type='file' device='disk'> + <driver name='qemu' type='qed'/> + <source file='/tmp/disk1.qed' /> + <target dev='vda' bus='virtio'/> + </disk> + <disk type='file' device='disk'> + <driver name='qemu' type='qed'/> + <source file='/tmp/disk2.qed' /> + <target dev='vdb' bus='virtio'/> + </disk> + <disk type='file' device='disk'> + <driver name='qemu' type='raw'/> + <source file='/tmp/disk3.raw' /> + <target dev='vdc' bus='virtio'/> + </disk> + <graphics type='vnc' port='-1' autoport='yes'/> + </devices> +</domain> +""" + +def qemu_img(*args): + global qemu_img_bin + + devnull = open('/dev/null', 'r+') + return subprocess.call([qemu_img_bin] + list(args), stdin=devnull, stdout=devnull) + +def virsh(*args): + global virsh_bin + + devnull = open('/dev/null', 'r+') + return subprocess.Popen([virsh_bin] + list(args), + stdout=subprocess.PIPE).communicate()[0] + #return subprocess.call([virsh_bin] + list(args), + # stdin=devnull, stdout=devnull, stderr=devnull) + +def make_baseimage(name, size_mb): + devnull = open('/dev/null', 'r+') + return subprocess.call(['dd', 'if=/dev/zero', "of=%s" % name, 'bs=1M', + 'count=%i' % size_mb], stdin=devnull, stdout=devnull, stderr=devnull) + +def has_backing_file(path): + global qemu_img_bin + p1 = subprocess.Popen([qemu_img_bin, "info", path], + stdout=subprocess.PIPE).communicate()[0] + matches = re.findall("^backing file:", p1, re.M) + if len(matches) > 0: + return True + return False + +class BlockPullTestCase(unittest.TestCase): + def _error_handler(self, ctx, error, dummy=None): + pass + + def create_disks(self, sparse): + self.disks = [ '/tmp/disk1.qed', '/tmp/disk2.qed', '/tmp/disk3.raw' ] + if sparse: + qemu_img('create', '-f', 'raw', '/tmp/backing1.img', '100M') + qemu_img('create', '-f', 'raw', '/tmp/backing2.img', '100M') + else: + make_baseimage('/tmp/backing1.img', 100) + make_baseimage('/tmp/backing2.img', 100) + qemu_img('create', '-f', 'qed', '-o', 'backing_file=/tmp/backing1.img', self.disks[0]) + qemu_img('create', '-f', 'qed', '-o', 'backing_file=/tmp/backing2.img', self.disks[1]) + qemu_img('create', '-f', 'raw', self.disks[2], '100M') + + def begin(self, sparse=True): + global dom_xml + + libvirt.registerErrorHandler(self._error_handler, None) + self.create_disks(sparse) + self.conn = libvirt.open('qemu:///system') + self.dom = self.conn.createXML(dom_xml, 0) + + def end(self): + self.dom.destroy() + self.conn.close() + +class TestBasicErrors(BlockPullTestCase): + def setUp(self): + self.begin() + + def tearDown(self): + self.end() + + def test_bad_path(self): + try: + self.dom.blockPull('/dev/null', 0, 0) + except libvirt.libvirtError, e: + self.assertEqual(libvirt.VIR_ERR_INVALID_ARG, e.get_error_code()) + else: + e = self.conn.virConnGetLastError() + self.assertEqual(libvirt.VIR_ERR_INVALID_ARG, e[0]) + + def test_abort_no_stream(self): + try: + self.dom.blockJobAbort(self.disks[0], 0) + except libvirt.libvirtError, e: + self.assertEqual(libvirt.VIR_ERR_OPERATION_INVALID, e.get_error_code()) + else: + e = self.conn.virConnGetLastError() + self.assertEqual(libvirt.VIR_ERR_OPERATION_INVALID, e[0]) + + def test_start_same_twice(self): + self.dom.blockPull(self.disks[0], 0, 0) + try: + self.dom.blockPull(self.disks[0], 0, 0) + except libvirt.libvirtError, e: + self.assertEqual(libvirt.VIR_ERR_OPERATION_FAILED, e.get_error_code()) + else: + e = self.conn.virConnGetLastError() + self.assertEqual(libvirt.VIR_ERR_OPERATION_FAILED, e[0]) + + def test_unsupported_disk(self): + try: + self.dom.blockPull(self.disks[2], 0, 0) + except libvirt.libvirtError, e: + self.assertEqual(libvirt.VIR_ERR_OPERATION_INVALID, e.get_error_code()) + else: + e = self.conn.virConnGetLastError() + self.assertEqual(libvirt.VIR_ERR_OPERATION_INVALID, e[0]) + +class TestBasicCommands(BlockPullTestCase): + def setUp(self): + pass + + def tearDown(self): + self.end() + + def test_start_stop(self): + self.begin(sparse=False) + self.dom.blockPull(self.disks[0], 0, 0) + time.sleep(1) + info = self.dom.blockJobInfo(self.disks[0], 0) + self.assertIsNot(None, info) + self.assertEqual(info['type'], libvirt.VIR_DOMAIN_BLOCK_JOB_TYPE_PULL) + self.dom.blockJobAbort(self.disks[0], 0) + time.sleep(1) + self.assertIs(None, self.dom.blockJobInfo(self.disks[0], 0)) + + def test_whole_disk(self): + self.begin() + self.assertTrue(has_backing_file(self.disks[0])) + self.dom.blockPull(self.disks[0], 0, 0) + for i in xrange(1, 5): + if self.dom.blockJobInfo(self.disks[0], 0) is None: + break + time.sleep(1) + self.assertFalse(has_backing_file(self.disks[0])) + + def test_two_disks_at_once(self): + self.begin() + disk_list = range(2) + for d in disk_list: + self.dom.blockPull(self.disks[d], 0, 0) + + for i in xrange(5): + for d in disk_list: + info = self.dom.blockJobInfo(self.disks[d], 0) + if info is None: + disk_list.remove(d) + if len(disk_list) == 0: + break + time.sleep(1) + for d in range(2): + self.assertFalse(has_backing_file(self.disks[d])) + +class TestEvents(BlockPullTestCase): + def eventLoopRun(self): + while self.do_events: + libvirt.virEventRunDefaultImpl() + + def eventLoopStart(self): + libvirt.virEventRegisterDefaultImpl() + self.eventLoopThread = threading.Thread(target=self.eventLoopRun, name="libvirtEventLoop") + self.eventLoopThread.setDaemon(True) + self.do_events = True + self.eventLoopThread.start() + + def eventLoopStop(self): + self.do_events = False + + def setUp(self): + self.eventLoopStart() + + def tearDown(self): + self.end() + + @staticmethod + def recordBlockJobEvent(conn, dom, path, type, status, inst): + inst.event = (dom, path, type, status) + + def test_event_complete(self): + self.begin() + self.event = None + self.conn.domainEventRegisterAny(self.dom, libvirt.VIR_DOMAIN_EVENT_ID_BLOCK_JOB, + TestEvents.recordBlockJobEvent, self) + self.dom.blockPull(self.disks[0], 0, 0) + for i in xrange(1, 5): + if self.event is not None: + break + time.sleep(1) + self.eventLoopStop() + self.assertIsNot(None, self.event) + self.assertFalse(has_backing_file(self.disks[0])) + self.assertEqual(self.event[1], self.disks[0]) + self.assertEqual(self.event[2], libvirt.VIR_DOMAIN_BLOCK_JOB_TYPE_PULL) + self.assertEqual(self.event[3], libvirt.VIR_DOMAIN_BLOCK_JOB_COMPLETED) + +class TestVirsh(BlockPullTestCase): + def setUp(self): + pass + + def tearDown(self): + self.end() + + def test_blockpull(self): + self.begin() + virsh('blockpull', self.dom.name(), self.disks[0]) + for i in xrange(1, 5): + if self.dom.blockJobInfo(self.disks[0], 0) is None: + break + time.sleep(1) + self.assertFalse(has_backing_file(self.disks[0])) + + def test_job_abort(self): + self.begin(sparse=False) + self.dom.blockPull(self.disks[0], 0, 0) + time.sleep(1) + self.assertIsNot(None, self.dom.blockJobInfo(self.disks[0], 0)) + virsh('blockjob', self.dom.name(), '--abort', self.disks[0]) + time.sleep(2) + self.assertIs(None, self.dom.blockJobInfo(self.disks[0], 0)) + self.assertTrue(has_backing_file(self.disks[0])) + + def test_job_info(self): + self.begin(sparse=False) + virsh('blockpull', self.dom.name(), self.disks[0]) + for i in xrange(1, 10): + output = virsh('blockjob', self.dom.name(), '--info', self.disks[0]) + matches = re.findall("^Block Pull:", output, re.M) + if len(matches) > 0: + break + time.sleep(1) + self.assertFalse(has_backing_file(self.disks[0])) + +if __name__ == '__main__': + unittest.main() -- 1.7.3 -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list