This is usable only on python >= 3.4 (or 3.3 with out-of-tree asyncio), however it should be harmless for anyone with older python versions. In simplest case, to have the callbacks queued on the default loop: >>> import libvirtaio >>> libvirtaio.virEventRegisterAsyncIOImpl() The function is not present on non-compatible platforms. Signed-off-by: Wojtek Porczyk <woju@xxxxxxxxxxxxxxxxxxxxxx> --- libvirt-python.spec.in | 1 + libvirtaio.py | 401 +++++++++++++++++++++++++++++++++++++++++++++++++ sanitytest.py | 2 +- setup.py | 12 ++ 4 files changed, 415 insertions(+), 1 deletion(-) create mode 100644 libvirtaio.py diff --git a/libvirt-python.spec.in b/libvirt-python.spec.in index 3021ebd..0ee535e 100644 --- a/libvirt-python.spec.in +++ b/libvirt-python.spec.in @@ -86,6 +86,7 @@ rm -f %{buildroot}%{_libdir}/python*/site-packages/*egg-info %defattr(-,root,root) %doc ChangeLog AUTHORS NEWS README COPYING COPYING.LESSER examples/ %{_libdir}/python3*/site-packages/libvirt.py* +%{_libdir}/python3*/site-packages/libvirtaio.py* %{_libdir}/python3*/site-packages/libvirt_qemu.py* %{_libdir}/python3*/site-packages/libvirt_lxc.py* %{_libdir}/python3*/site-packages/__pycache__/libvirt.cpython-*.py* diff --git a/libvirtaio.py b/libvirtaio.py new file mode 100644 index 0000000..8428f71 --- /dev/null +++ b/libvirtaio.py @@ -0,0 +1,401 @@ +# +# libvirtaio -- asyncio adapter for libvirt +# Copyright (C) 2017 Wojtek Porczyk <woju@xxxxxxxxxxxxxxxxxxxxxx> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, see +# <http://www.gnu.org/licenses/>. +# + +'''Libvirt event loop implementation using asyncio + +Register the implementation of default loop: + + >>> import libvirtaio + >>> libvirtaio.virEventRegisterAsyncIOImpl() + +.. seealso:: + https://libvirt.org/html/libvirt-libvirt-event.html +''' + +__author__ = 'Wojtek Porczyk <woju@xxxxxxxxxxxxxxxxxxxxxx>' +__license__ = 'LGPL-2.1+' +__all__ = ['virEventAsyncIOImpl', 'virEventRegisterAsyncIOImpl'] + +import asyncio +import itertools +import logging +import warnings + +import libvirt + +try: + from asyncio import ensure_future +except ImportError: + from asyncio import async as ensure_future + + +class Callback(object): + '''Base class for holding callback + + :param virEventAsyncIOImpl impl: the implementation in which we run + :param cb: the callback itself + :param opaque: the opaque tuple passed by libvirt + ''' + # pylint: disable=too-few-public-methods + + _iden_counter = itertools.count() + + def __init__(self, impl, cb, opaque, *args, **kwargs): + super().__init__(*args, **kwargs) + self.iden = next(self._iden_counter) + self.impl = impl + self.cb = cb + self.opaque = opaque + + assert self.iden not in self.impl.callbacks, \ + 'found {} callback: {!r}'.format( + self.iden, self.impl.callbacks[self.iden]) + self.impl.callbacks[self.iden] = self + + def __repr__(self): + return '<{} iden={}>'.format(self.__clas__.__name__, self.iden) + + def close(self): + '''Schedule *ff* callback''' + self.impl.log.debug('callback %d close(), scheduling ff', self.iden) + self.impl.schedule_ff_callback(self.opaque) + +# +# file descriptors +# + +class Descriptor(object): + '''Manager of one file descriptor + + :param virEventAsyncIOImpl impl: the implementation in which we run + :param int fd: the file descriptor + ''' + def __init__(self, impl, fd): + self.impl = impl + self.fd = fd + self.callbacks = {} + + def _handle(self, event): + '''Dispatch the event to the descriptors + + :param int event: The event (from libvirt's constants) being dispatched + ''' + for callback in self.callbacks.values(): + if callback.event is not None and callback.event & event: + callback.cb(callback.iden, self.fd, event, callback.opaque) + + def update(self): + '''Register or unregister callbacks at event loop + + This should be called after change of any ``.event`` in callbacks. + ''' + # It seems like loop.add_{reader,writer} can be run multiple times + # and will still register the callback only once. Likewise, + # remove_{reader,writer} may be run even if the reader/writer + # is not registered (and will just return False). + + # For the edge case of empty callbacks, any() returns False. + if any(callback.event & ~( + libvirt.VIR_EVENT_HANDLE_READABLE | + libvirt.VIR_EVENT_HANDLE_WRITABLE) + for callback in self.callbacks.values()): + warnings.warn( + 'The only event supported are VIR_EVENT_HANDLE_READABLE ' + 'and VIR_EVENT_HANDLE_WRITABLE', + UserWarning) + + if any(callback.event & libvirt.VIR_EVENT_HANDLE_READABLE + for callback in self.callbacks.values()): + self.impl.loop.add_reader( + self.fd, self._handle, libvirt.VIR_EVENT_HANDLE_READABLE) + else: + self.impl.loop.remove_reader(self.fd) + + if any(callback.event & libvirt.VIR_EVENT_HANDLE_WRITABLE + for callback in self.callbacks.values()): + self.impl.loop.add_writer( + self.fd, self._handle, libvirt.VIR_EVENT_HANDLE_WRITABLE) + else: + self.impl.loop.remove_writer(self.fd) + + def add_handle(self, callback): + '''Add a callback to the descriptor + + :param FDCallback callback: the callback to add + :rtype: None + + After adding the callback, it is immediately watched. + ''' + self.callbacks[callback.iden] = callback + self.update() + + def remove_handle(self, iden): + '''Remove a callback from the descriptor + + :param int iden: the identifier of the callback + :returns: the callback + :rtype: FDCallback + + After removing the callback, the descriptor may be unwatched, if there + are no more handles for it. + ''' + callback = self.callbacks.pop(iden) + self.update() + return callback + + def close(self): + '''''' + self.callbacks.clear() + self.update() + +class DescriptorDict(dict): + '''Descriptors collection + + This is used internally by virEventAsyncIOImpl to hold descriptors. + ''' + def __init__(self, impl): + super().__init__() + self.impl = impl + + def __missing__(self, fd): + descriptor = Descriptor(self.impl, fd) + self[fd] = descriptor + return descriptor + +class FDCallback(Callback): + '''Callback for file descriptor (watcher) + + :param Descriptor descriptor: the descriptor manager + :param int event: bitset of events on which to fire the callback + ''' + # pylint: disable=too-few-public-methods + + def __init__(self, *args, descriptor, event, **kwargs): + super().__init__(*args, **kwargs) + self.descriptor = descriptor + self.event = event + + def __repr__(self): + return '<{} iden={} fd={} event={}>'.format( + self.__class__.__name__, self.iden, self.descriptor.fd, self.event) + + def update(self, *, event): + '''Update the callback and fix descriptor's watchers''' + self.event = event + self.descriptor.update() + +# +# timeouts +# + +class TimeoutCallback(Callback): + '''Callback for timer''' + def __init__(self, *args, timeout, **kwargs): + super().__init__(*args, **kwargs) + self.timeout = timeout + self._task = None + + def __repr__(self): + return '<{} iden={} timeout={}>'.format( + self.__class__.__name__, self.iden, self.timeout) + + @asyncio.coroutine + def _timer(self): + '''An actual timer running on the event loop. + + This is a coroutine. + ''' + while True: + assert self.timeout >= 0, \ + 'invalid timeout {} for running timer'.format(self.timeout) + + try: + if self.timeout > 0: + timeout = self.timeout * 1e-3 + self.impl.log.debug('sleeping %r', timeout) + yield from asyncio.sleep(timeout) + else: + # scheduling timeout for next loop iteration + yield + + except asyncio.CancelledError: + self.impl.log.debug('timer %d cancelled', self.iden) + break + + self.cb(self.iden, self.opaque) + self.impl.log.debug('timer %r callback ended', self.iden) + + def update(self, *, timeout=None): + '''Start or the timer, possibly updating timeout''' + if timeout is not None: + self.timeout = timeout + + if self.timeout >= 0 and self._task is None: + self.impl.log.debug('timer %r start', self.iden) + self._task = ensure_future(self._timer(), + loop=self.impl.loop) + + elif self.timeout < 0 and self._task is not None: + self.impl.log.debug('timer %r stop', self.iden) + self._task.cancel() # pylint: disable=no-member + self._task = None + + def close(self): + '''Stop the timer and call ff callback''' + self.timeout = -1 + self.update() + super().close() + +# +# main implementation +# + +class virEventAsyncIOImpl(object): + '''Libvirt event adapter to asyncio. + + :param loop: asyncio's event loop + + If *loop* is not specified, the current (or default) event loop is used. + ''' + + def __init__(self, *, loop=None): + self.loop = loop or asyncio.get_event_loop() + self.callbacks = {} + self.descriptors = DescriptorDict(self) + self.log = logging.getLogger(self.__class__.__name__) + + def register(self): + '''Register this instance as event loop implementation''' + # pylint: disable=bad-whitespace + self.log.debug('register()') + libvirt.virEventRegisterImpl( + self._add_handle, self._update_handle, self._remove_handle, + self._add_timeout, self._update_timeout, self._remove_timeout) + return self + + def schedule_ff_callback(self, opaque): + '''Schedule a ff callback from one of the handles or timers''' + self.loop.call_soon(libvirt.virEventExecuteFFCallback, opaque) + + def is_idle(self): + '''Returns False if there are leftovers from a connection + + Those may happen if there are sematical problems while closing + a connection. For example, not deregistered events before .close(). + ''' + return not self.callbacks + + def _add_handle(self, fd, event, cb, opaque): + '''Register a callback for monitoring file handle events + + :param int fd: file descriptor to listen on + :param int event: bitset of events on which to fire the callback + :param cb: the callback to be called when an event occurrs + :param opaque: user data to pass to the callback + :rtype: int + :returns: handle watch number to be used for updating and unregistering for events + + .. seealso:: + https://libvirt.org/html/libvirt-libvirt-event.html#virEventAddHandleFuncFunc + ''' + self.log.debug('add_handle(fd=%d, event=%d, cb=%r, opaque=%r)', + fd, event, cb, opaque) + callback = FDCallback(self, cb, opaque, + descriptor=self.descriptors[fd], event=event) + self.callbacks[callback.iden] = callback + self.descriptors[fd].add_handle(callback) + return callback.iden + + def _update_handle(self, watch, event): + '''Change event set for a monitored file handle + + :param int watch: file descriptor watch to modify + :param int event: new events to listen on + + .. seealso:: + https://libvirt.org/html/libvirt-libvirt-event.html#virEventUpdateHandleFunc + ''' + self.log.debug('update_handle(watch=%d, event=%d)', watch, event) + return self.callbacks[watch].update(event=event) + + def _remove_handle(self, watch): + '''Unregister a callback from a file handle. + + :param int watch: file descriptor watch to stop listening on + :returns: None (see source for explanation) + + .. seealso:: + https://libvirt.org/html/libvirt-libvirt-event.html#virEventRemoveHandleFunc + ''' + self.log.debug('remove_handle(watch=%d)', watch) + callback = self.callbacks.pop(watch) + assert callback is self.descriptors.remove_handle(watch) + callback.close() + + def _add_timeout(self, timeout, cb, opaque): + '''Register a callback for a timer event + + :param int timeout: the timeout to monitor + :param cb: the callback to call when timeout has expired + :param opaque: user data to pass to the callback + :rtype: int + :returns: a timer value + + .. seealso:: + https://libvirt.org/html/libvirt-libvirt-event.html#virEventAddTimeoutFunc + ''' + self.log.debug('add_timeout(timeout=%d, cb=%r, opaque=%r)', + timeout, cb, opaque) + callback = TimeoutCallback(self, cb, opaque, timeout=timeout) + self.callbacks[callback.iden] = callback + callback.update() + return callback.iden + + def _update_timeout(self, timer, timeout): + '''Change frequency for a timer + + :param int timer: the timer to modify + :param int timeout: the new timeout value in ms + + .. seealso:: + https://libvirt.org/html/libvirt-libvirt-event.html#virEventUpdateTimeoutFunc + ''' + self.log.debug('update_timeout(timer=%d, timeout=%d)', timer, timeout) + return self.callbacks[timer].update(timeout=timeout) + + def _remove_timeout(self, timer): + '''Unregister a callback for a timer + + :param int timer: the timer to remove + :returns: None (see source for explanation) + + .. seealso:: + https://libvirt.org/html/libvirt-libvirt-event.html#virEventRemoveTimeoutFunc + ''' + self.log.debug('remove_timeout(timer=%d)', timer) + callback = self.callbacks.pop(timer) + callback.close() + +def virEventRegisterAsyncIOImpl(*, loop=None): + '''Arrange for libvirt's callbacks to be dispatched via asyncio event loop + + The implementation object is returned, but in normal usage it can safely be + discarded. + ''' + return virEventAsyncIOImpl(loop=loop).register() diff --git a/sanitytest.py b/sanitytest.py index 6548831..53a739f 100644 --- a/sanitytest.py +++ b/sanitytest.py @@ -350,7 +350,7 @@ for klass in gotfunctions: for func in sorted(gotfunctions[klass]): # These are pure python methods with no C APi if func in ["connect", "getConnect", "domain", "getDomain", - "virEventRegisterAsyncIOImpl"]: + "virEventRegisterAsyncIOImpl", "virEventExecuteFFCallback"]: continue key = "%s.%s" % (klass, func) diff --git a/setup.py b/setup.py index 120ddd5..bac9010 100755 --- a/setup.py +++ b/setup.py @@ -14,6 +14,7 @@ import sys import os import os.path import re +import shutil import time MIN_LIBVIRT = "0.9.11" @@ -50,6 +51,12 @@ def have_libvirt_lxc(): except DistutilsExecError: return False +def have_libvirtaio(): + # This depends on asyncio, which in turn depends on "yield from" syntax. + # The asyncio module itself is in standard library since 3.4, but there is + # an out-of-tree version compatible with 3.3. + return sys.version_info >= (3, 3) + def get_pkgconfig_data(args, mod, required=True): """Run pkg-config to and return content associated with it""" f = os.popen("%s %s %s" % (get_pkgcfg(), " ".join(args), mod)) @@ -124,6 +131,9 @@ def get_module_lists(): c_modules.append(modulelxc) py_modules.append("libvirt_lxc") + if have_libvirtaio(): + py_modules.append("libvirtaio") + return c_modules, py_modules @@ -141,6 +151,8 @@ class my_build(build): self.spawn([sys.executable, "generator.py", "libvirt-qemu", apis[1]]) if have_libvirt_lxc(): self.spawn([sys.executable, "generator.py", "libvirt-lxc", apis[2]]) + if have_libvirtaio(): + shutil.copy('libvirtaio.py', 'build') build.run(self) -- 2.5.5
Attachment:
signature.asc
Description: Digital signature
-- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list