The virEvent implementation is tied to a particular loop. When spinning up another loop, the callbacks have to be moved to another implementation so that they still have a chance to be invoked if they are scheduled. Otherwise, file descriptors will leak. Signed-off-by: Wojtek Porczyk <woju@xxxxxxxxxxxxxxxxxxxxxx> --- libvirtaio.py | 64 +++++++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 51 insertions(+), 13 deletions(-) diff --git a/libvirtaio.py b/libvirtaio.py index fc868bd..d161cd1 100644 --- a/libvirtaio.py +++ b/libvirtaio.py @@ -195,9 +195,10 @@ class FDCallback(Callback): return '<{} iden={} fd={} event={}>'.format( self.__class__.__name__, self.iden, self.descriptor.fd, self.event) - def update(self, event): + def update(self, event=None): '''Update the callback and fix descriptor's watchers''' - self.event = event + if event is not None: + self.event = event self.descriptor.update() # @@ -238,20 +239,21 @@ class TimeoutCallback(Callback): self.cb(self.iden, self.opaque) self.impl.log.debug('timer %r callback ended', self.iden) - def update(self, timeout): + def update(self, timeout=None): '''Start or the timer, possibly updating timeout''' - self.timeout = timeout - - if self.timeout >= 0 and self._task is None: - self.impl.log.debug('timer %r start', self.iden) - self._task = ensure_future(self._timer(), - loop=self.impl.loop) + if timeout is not None: + self.timeout = timeout - elif self.timeout < 0 and self._task is not None: + if self._task is not None: self.impl.log.debug('timer %r stop', self.iden) self._task.cancel() # pylint: disable=no-member self._task = None + if self.timeout >= 0: + self.impl.log.debug('timer %r start', self.iden) + self._task = ensure_future(self._timer(), + loop=self.impl.loop) + def close(self): '''Stop the timer and call ff callback''' super(TimeoutCallback, self).close() @@ -274,6 +276,7 @@ class virEventAsyncIOImpl(object): self.callbacks = {} self.descriptors = DescriptorDict(self) self.log = 
logging.getLogger(self.__class__.__name__) + self.pending_tasks = set() def register(self): '''Register this instance as event loop implementation''' @@ -284,9 +287,30 @@ class virEventAsyncIOImpl(object): self._add_timeout, self._update_timeout, self._remove_timeout) return self + def takeover(self, other): + '''Take over other implementation, probably registered on another loop + + :param virEventAsyncIOImpl other: other implementation to be taken over + ''' + self.log.warning('%r taking over %r', self, other) + + while other.callbacks: + iden, callback = other.callbacks.popitem() + self.log.debug(' takeover %d %r', iden, callback) + assert callback.iden == iden + callback.impl = self + self.callbacks[iden] = callback + + if isinstance(callback, FDCallback): + fd = callback.descriptor.fd + assert callback is other.descriptors[fd].remove_handle(iden) + self.descriptors[fd].add_handle(callback) + def schedule_ff_callback(self, iden, opaque): '''Schedule a ff callback from one of the handles or timers''' - ensure_future(self._ff_callback(iden, opaque), loop=self.loop) + fut = ensure_future(self._ff_callback(iden, opaque), loop=self.loop) + self.pending_tasks.add(fut) + fut.add_done_callback(self.pending_tasks.remove) @asyncio.coroutine def _ff_callback(self, iden, opaque): @@ -297,13 +321,19 @@ class virEventAsyncIOImpl(object): self.log.debug('ff_callback(iden=%d, opaque=...)', iden) return libvirt.virEventInvokeFreeCallback(opaque) + @asyncio.coroutine + def drain(self): + self.log.debug('drain()') + if self.pending_tasks: + yield from asyncio.wait(self.pending_tasks, loop=self.loop) + def is_idle(self): '''Returns False if there are leftovers from a connection Those may happen if there are sematical problems while closing a connection. For example, not deregistered events before .close(). 
''' - return not self.callbacks + return not self.callbacks and not self.pending_tasks def _add_handle(self, fd, event, cb, opaque): '''Register a callback for monitoring file handle events @@ -403,10 +433,18 @@ class virEventAsyncIOImpl(object): callback = self.callbacks.pop(timer) callback.close() + +_current_impl = None def virEventRegisterAsyncIOImpl(loop=None): '''Arrange for libvirt's callbacks to be dispatched via asyncio event loop The implementation object is returned, but in normal usage it can safely be discarded. ''' - return virEventAsyncIOImpl(loop=loop).register() + global _current_impl + impl = virEventAsyncIOImpl(loop=loop) + impl.register() + if _current_impl is not None: + impl.takeover(_current_impl) + _current_impl = impl + return impl -- 2.9.4
Attachment:
signature.asc
Description: Digital signature
-- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list