On Thu, Aug 31, 2017 at 09:40:23PM +0200, Wojtek Porczyk wrote: > The virEvent implementation is tied to a particular loop. When spinning > another loop, the callbacks have to be moved to another implementation, > so they will have a chance to be invoked, should they be scheduled. If > not, file descriptors will be leaking. > > Signed-off-by: Wojtek Porczyk <woju@xxxxxxxxxxxxxxxxxxxxxx> > --- > libvirtaio.py | 64 +++++++++++++++++++++++++++++++++++++++++++++++------------ > 1 file changed, 51 insertions(+), 13 deletions(-) > > diff --git a/libvirtaio.py b/libvirtaio.py > index fc868bd..d161cd1 100644 > --- a/libvirtaio.py > +++ b/libvirtaio.py > @@ -195,9 +195,10 @@ class FDCallback(Callback): > return '<{} iden={} fd={} event={}>'.format( > self.__class__.__name__, self.iden, self.descriptor.fd, self.event) > > - def update(self, event): > + def update(self, event=None): > '''Update the callback and fix descriptor's watchers''' > - self.event = event > + if event is not None: > + self.event = event > self.descriptor.update() > > # > @@ -238,20 +239,21 @@ class TimeoutCallback(Callback): > self.cb(self.iden, self.opaque) > self.impl.log.debug('timer %r callback ended', self.iden) > > - def update(self, timeout): > + def update(self, timeout=None): > '''Start or the timer, possibly updating timeout''' > - self.timeout = timeout > - > - if self.timeout >= 0 and self._task is None: > - self.impl.log.debug('timer %r start', self.iden) > - self._task = ensure_future(self._timer(), > - loop=self.impl.loop) > + if timeout is not None: > + self.timeout = timeout > > - elif self.timeout < 0 and self._task is not None: > + if self._task is not None: > self.impl.log.debug('timer %r stop', self.iden) > self._task.cancel() # pylint: disable=no-member > self._task = None > > + if self.timeout >= 0: > + self.impl.log.debug('timer %r start', self.iden) > + self._task = ensure_future(self._timer(), > + loop=self.impl.loop) > + > def close(self): > '''Stop the timer and call ff 
callback''' > super(TimeoutCallback, self).close() > @@ -274,6 +276,7 @@ class virEventAsyncIOImpl(object): > self.callbacks = {} > self.descriptors = DescriptorDict(self) > self.log = logging.getLogger(self.__class__.__name__) > + self.pending_tasks = set() > > def register(self): > '''Register this instance as event loop implementation''' > @@ -284,9 +287,30 @@ class virEventAsyncIOImpl(object): > self._add_timeout, self._update_timeout, self._remove_timeout) > return self > > + def takeover(self, other): > + '''Take over other implementation, probably registered on another loop > + > + :param virEventAsyncIOImpl other: other implementation to be taken over > + ''' > + self.log.warning('%r taking over %r', self, other) > + > + while other.callbacks: > + iden, callback = other.callbacks.popitem() > + self.log.debug(' takeover %d %r', iden, callback) > + assert callback.iden == iden > + callback.impl = self > + self.callbacks[iden] = callback > + > + if isinstance(callback, FDCallback): > + fd = callback.descriptor.fd > + assert callback is other.descriptors[fd].remove_handle(iden) > + self.descriptors[fd].add_handle(callback) > + > def schedule_ff_callback(self, iden, opaque): > '''Schedule a ff callback from one of the handles or timers''' > - ensure_future(self._ff_callback(iden, opaque), loop=self.loop) > + fut = ensure_future(self._ff_callback(iden, opaque), loop=self.loop) > + self.pending_tasks.add(fut) > + fut.add_done_callback(self.pending_tasks.remove) > > @asyncio.coroutine > def _ff_callback(self, iden, opaque): > @@ -297,13 +321,19 @@ class virEventAsyncIOImpl(object): > self.log.debug('ff_callback(iden=%d, opaque=...)', iden) > return libvirt.virEventInvokeFreeCallback(opaque) > > + @asyncio.coroutine > + def drain(self): > + self.log.debug('drain()') > + if self.pending_tasks: > + yield from asyncio.wait(self.pending_tasks, loop=self.loop) > + > def is_idle(self): > '''Returns False if there are leftovers from a connection > > Those may happen if 
there are sematical problems while closing > a connection. For example, not deregistered events before .close(). > ''' > - return not self.callbacks > + return not self.callbacks and not self.pending_tasks > > def _add_handle(self, fd, event, cb, opaque): > '''Register a callback for monitoring file handle events > @@ -403,10 +433,18 @@ class virEventAsyncIOImpl(object): > callback = self.callbacks.pop(timer) > callback.close() > > + > +_current_impl = None > def virEventRegisterAsyncIOImpl(loop=None): > '''Arrange for libvirt's callbacks to be dispatched via asyncio event loop > > The implementation object is returned, but in normal usage it can safely be > discarded. > ''' > - return virEventAsyncIOImpl(loop=loop).register() > + global _current_impl > + impl = virEventAsyncIOImpl(loop=loop) > + impl.register() > + if _current_impl is not None: > + impl.takeover(_current_impl) > + _current_impl = impl > + return impl If I understand correctly, you are trying to make it possible to register multiple event loop implementations. This is *not* a supported usage of libvirt. You must call 'virEventRegisterImpl' before opening any connection, and once it has been called you are forbidden to call it again. Regards, Daniel -- |: https://berrange.com -o- https://www.flickr.com/photos/dberrange :| |: https://libvirt.org -o- https://fstop138.berrange.com :| |: https://entangle-photo.org -o- https://www.instagram.com/dberrange :| -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list