On Fri, Oct 09, 2009 at 09:58:14AM +0100, Daniel P. Berrange wrote: > On Fri, Oct 09, 2009 at 10:19:08AM +0200, Dan Kenigsberg wrote: > > On Wed, Oct 07, 2009 at 12:59:39PM +0100, Daniel P. Berrange wrote: > > > On Tue, Oct 06, 2009 at 07:04:29PM +0200, Dan Kenigsberg wrote: > > > > Would someone help me have a shrink-wrapped solution for obtaining libvirt > > > > events in python? > > > > > > I decided to re-write the demo program so that it shows a serious > > > production kwalitee event loop implementation that can be used in > > > real world applications. I think you'll find this much nicer :-) > > > > It sure looks nicer (though I still don't get the hows and whys). > > However, it seems that you have an issue with python's time.time() > > measured in seconds, not milliseconds. > > Here's a version with more docs & the time in milliseconds > > #!/usr/bin/python -u > # > # > # > ################################################################################# > # Start off by implementing a general purpose event loop for anyone's use > ################################################################################# > > import sys > import getopt > import os > import libvirt > import select > import errno > import time > import threading > > # > # This general purpose event loop will support waiting for file handle > # I/O and error events, as well as scheduling repeatable timers with > # a fixed interval. 
> # > # It is a pure python implementation based around the poll() API > # > class virEventLoopPure: > # This class contains the data we need to track for a > # single file handle > class virEventLoopPureHandle: > def __init__(self, handle, fd, events, cb, opaque): > self.handle = handle > self.fd = fd > self.events = events > self.cb = cb > self.opaque = opaque > > def get_id(self): > return self.handle > > def get_fd(self): > return self.fd > > def get_events(self): > return self.events > > def set_events(self, events): > self.events = events > > def dispatch(self, events): > self.cb(self.handle, > self.fd, > events, > self.opaque[0], > self.opaque[1]) > > # This class contains the data we need to track for a > # single periodic timer > class virEventLoopPureTimer: > def __init__(self, timer, interval, cb, opaque): > self.timer = timer > self.interval = interval > self.cb = cb > self.opaque = opaque > self.lastfired = 0 > > def get_id(self): > return self.timer > > def get_interval(self): > return self.interval > > def set_interval(self, interval): > self.interval = interval > > def get_last_fired(self): > return self.lastfired > > def set_last_fired(self, now): > self.lastfired = now > > def dispatch(self): > self.cb(self.timer, > self.opaque[0], > self.opaque[1]) > > > def __init__(self, debug=False): > self.debugOn = debug > self.poll = select.poll() > self.pipetrick = os.pipe() > self.nextHandleID = 1 > self.nextTimerID = 1 > self.handles = [] > self.timers = [] > self.quit = False > > # The event loop can be used from multiple threads at once. > # Specifically while the main thread is sleeping in poll() > # waiting for events to occur, another thread may come along > # and add/update/remove a file handle, or timer. When this > # happens we need to interrupt the poll() sleep in the other > # thread, so that it'll see the file handle / timer changes. > # > # Using OS level signals for this is very unreliable and > # hard to implement correctly. 
Thus we use the real classic > # "self pipe" trick. An anonymous pipe, with one end registered > # with the event loop for input events. When we need to force > # the main thread out of a poll() sleep, we simply write a > # single byte of data to the other end of the pipe. > self.debug("Self pipe watch %d write %d" %(self.pipetrick[0], self.pipetrick[1])) > self.poll.register(self.pipetrick[0], select.POLLIN) > > def debug(self, msg): > if self.debugOn: > print msg > > > # Calculate when the next timeout is due to occur, returning > # the absolute timestamp for the next timeout, or 0 if there is > # no timeout due > def next_timeout(self): > next = 0 > for t in self.timers: > last = t.get_last_fired() > interval = t.get_interval() > if interval < 0: > continue > if next == 0 or (last + interval) < next: > next = last + interval > > return next > > # Lookup a virEventLoopPureHandle object based on file descriptor > def get_handle_by_fd(self, fd): > for h in self.handles: > if h.get_fd() == fd: > return h > return None > > # Lookup a virEventLoopPureHandle object based on its event loop ID > def get_handle_by_id(self, handleID): > for h in self.handles: > if h.get_id() == handleID: > return h > return None > > > # This is the heart of the event loop, performing one single > # iteration. It asks when the next timeout is due, and then > # calculates the maximum amount of time it is able to sleep > # for in poll() pending file handle events. > # > # It then goes into the poll() sleep. > # > # When poll() returns, there will be zero or more file handle > # events which need to be dispatched to registered callbacks. > # It may also be time to fire some periodic timers. > # > # Due to the coarse granularity of scheduler timeslices, if > # we ask for a sleep of 500ms in order to satisfy a timer, we > # may return up to 1 scheduler timeslice early. So even though > # our sleep timeout was reached, the registered timer may not > # technically be at its expiry point. 
This leads to us going > # back around the loop with a crazy 5ms sleep. So when checking > # if timeouts are due, we allow a margin of 20ms, to avoid > # these pointless repeated tiny sleeps. > def run_once(self): > sleep = -1 > next = self.next_timeout() > self.debug("Next timeout due at %d" % next) > if next > 0: > now = int(time.time() * 1000) > if now >= next: > sleep = 0 > else: > sleep = next - now > > self.debug("Poll with a sleep of %d" % sleep) > events = self.poll.poll(sleep) Thanks! But there's still a units problem here (it should be sleep/1000.0): | poll(...) | poll([timeout=-1[, maxevents=-1]]) -> [(fd, events), (...)] | | Wait for events on the epoll file descriptor for a maximum time of timeout | in seconds (as float). -1 makes poll wait indefinitely. | Up to maxevents are returned to the caller. > > # Dispatch any file handle events that occurred > for (fd, revents) in events: > # See if the event was from the self-pipe > # telling us to wake up. If so, then discard > # the data and just continue > if fd == self.pipetrick[0]: > data = os.read(fd, 1) > continue > > h = self.get_handle_by_fd(fd) > if h: > self.debug("Dispatch fd %d handle %d events %d" % (fd, h.get_id(), revents)) > h.dispatch(self.events_from_poll(revents)) > > now = int(time.time() * 1000) > for t in self.timers: > interval = t.get_interval() > if interval < 0: > continue > > want = t.get_last_fired() + interval > # Deduct 20ms, since scheduler timeslice > # means we could be ever so slightly early > if now >= (want-20): > self.debug("Dispatch timer %d now %s want %s" % (t.get_id(), str(now), str(want))) > t.set_last_fired(now) > t.dispatch() > > > # Actually run the event loop forever > def run_loop(self): > self.quit = False > while not self.quit: > self.run_once() > > def interrupt(self): > os.write(self.pipetrick[1], 'c') > > > # Registers a new file handle 'fd', monitoring for 'events' (libvirt > # event constants), firing the callback cb() when an event occurs. 
> # Returns a unique integer identifier for this handle, that should be > # used to later update/remove it > def add_handle(self, fd, events, cb, opaque): > handleID = self.nextHandleID + 1 > self.nextHandleID = self.nextHandleID + 1 > > h = self.virEventLoopPureHandle(handleID, fd, events, cb, opaque) > self.handles.append(h) > > self.poll.register(fd, self.events_to_poll(events)) > self.interrupt() > > self.debug("Add handle %d fd %d events %d" % (handleID, fd, events)) > > return handleID > > # Registers a new timer with periodic expiry at 'interval' ms, > # firing cb() each time the timer expires. If 'interval' is -1, > # then the timer is registered, but not enabled > # Returns a unique integer identifier for this timer, that should be > # used to later update/remove it > def add_timer(self, interval, cb, opaque): > timerID = self.nextTimerID + 1 > self.nextTimerID = self.nextTimerID + 1 > > h = self.virEventLoopPureTimer(timerID, interval, cb, opaque) > self.timers.append(h) > self.interrupt() > > self.debug("Add timer %d interval %d" % (timerID, interval)) > > return timerID > > # Change the set of events to be monitored on the file handle > def update_handle(self, handleID, events): > h = self.get_handle_by_id(handleID) > if h: > h.set_events(events) > self.poll.unregister(h.get_fd()) > self.poll.register(h.get_fd(), self.events_to_poll(events)) > self.interrupt() > > self.debug("Update handle %d fd %d events %d" % (handleID, h.get_fd(), events)) > > # Change the periodic frequency of the timer > def update_timer(self, timerID, interval): > for h in self.timers: > if h.get_id() == timerID: > h.set_interval(interval); > self.interrupt() > > self.debug("Update timer %d interval %d" % (timerID, interval)) > break > > # Stop monitoring for events on the file handle > def remove_handle(self, handleID): > handles = [] > for h in self.handles: > if h.get_id() == handleID: > self.poll.unregister(h.get_fd()) > self.debug("Remove handle %d fd %d" % (handleID, 
h.get_fd())) > else: > handles.append(h) > self.handles = handles > self.interrupt() > > # Stop firing the periodic timer > def remove_timer(self, timerID): > timers = [] > for h in self.timers: > if h.get_id() != timerID: > timers.append(h) > self.debug("Remove timer %d" % timerID) > self.timers = timers > self.interrupt() > > # Convert from libvirt event constants, to poll() event constants > def events_to_poll(self, events): > ret = 0 > if events & libvirt.VIR_EVENT_HANDLE_READABLE: > ret |= select.POLLIN > if events & libvirt.VIR_EVENT_HANDLE_WRITABLE: > ret |= select.POLLOUT > if events & libvirt.VIR_EVENT_HANDLE_ERROR: > ret |= select.POLLERR; > if events & libvirt.VIR_EVENT_HANDLE_HANGUP: > ret |= select.POLLHUP; > return ret > > # Convert from poll() event constants, to libvirt event constants > def events_from_poll(self, events): > ret = 0; > if events & select.POLLIN: > ret |= libvirt.VIR_EVENT_HANDLE_READABLE; > if events & select.POLLOUT: > ret |= libvirt.VIR_EVENT_HANDLE_WRITABLE; > if events & select.POLLNVAL: > ret |= libvirt.VIR_EVENT_HANDLE_ERROR; > if events & select.POLLERR: > ret |= libvirt.VIR_EVENT_HANDLE_ERROR; > if events & select.POLLHUP: > ret |= libvirt.VIR_EVENT_HANDLE_HANGUP; > return ret; > > > ########################################################################### > # Now glue an instance of the general event loop into libvirt's event loop > ########################################################################### > > # This single global instance of the event loop will be used for > # monitoring libvirt events > eventLoop = virEventLoopPure(debug=False) > > # This keeps track of what thread is running the event loop, > # (if it is run in a background thread) > eventLoopThread = None > > > # This next set of 6 methods are the glue between the official > # libvirt events API, and our particular impl of the event loop > # > # There is no reason why the 'virEventLoopPure' has to be used. 
> # An application could easily make these 6 glue methods hook into > # another event loop such as GLib's, or something like the python > # Twisted event framework. > > def virEventAddHandleImpl(fd, events, cb, opaque): > global eventLoop > return eventLoop.add_handle(fd, events, cb, opaque) > > def virEventUpdateHandleImpl(handleID, events): > global eventLoop > return eventLoop.update_handle(handleID, events) > > def virEventRemoveHandleImpl(handleID): > global eventLoop > return eventLoop.remove_handle(handleID) > > def virEventAddTimerImpl(interval, cb, opaque): > global eventLoop > return eventLoop.add_timer(interval, cb, opaque) > > def virEventUpdateTimerImpl(timerID, interval): > global eventLoop > return eventLoop.update_timer(timerID, interval) > > def virEventRemoveTimerImpl(timerID): > global eventLoop > return eventLoop.remove_timer(timerID) > > # This tells libvirt what event loop implementation it > # should use > def virEventLoopPureRegister(): > libvirt.virEventRegisterImpl(virEventAddHandleImpl, > virEventUpdateHandleImpl, > virEventRemoveHandleImpl, > virEventAddTimerImpl, > virEventUpdateTimerImpl, > virEventRemoveTimerImpl) > > # Directly run the event loop in the current thread > def virEventLoopPureRun(): > global eventLoop > eventLoop.run_loop() > > # Spawn a background thread to run the event loop > def virEventLoopPureStart(): > global eventLoopThread > virEventLoopPureRegister() > eventLoopThread = threading.Thread(target=virEventLoopPureRun, name="libvirtEventLoop") > eventLoopThread.setDaemon(True) > eventLoopThread.start() > > > ########################################################################## > # Everything that now follows is a simple demo of domain lifecycle events > ########################################################################## > def eventToString(event): > eventStrings = ( "Added", > "Removed", > "Started", > "Suspended", > "Resumed", > "Stopped", > "Saved", > "Restored" ); > return eventStrings[event]; > > def 
myDomainEventCallback1 (conn, dom, event, detail, opaque): > print "myDomainEventCallback1 EVENT: Domain %s(%s) %s %d" % (dom.name(), dom.ID(), eventToString(event), detail) > > def myDomainEventCallback2 (conn, dom, event, detail, opaque): > print "myDomainEventCallback2 EVENT: Domain %s(%s) %s %d" % (dom.name(), dom.ID(), eventToString(event), detail) > > def usage(): > print "usage: "+os.path.basename(sys.argv[0])+" [uri]" > print " uri will default to qemu:///system" > > def main(): > try: > opts, args = getopt.getopt(sys.argv[1:], "h", ["help"] ) > except getopt.GetoptError, err: > # print help information and exit: > print str(err) # will print something like "option -a not recognized" > usage() > sys.exit(2) > for o, a in opts: > if o in ("-h", "--help"): > usage() > sys.exit() > > if len(sys.argv) > 1: > uri = sys.argv[1] > else: > uri = "qemu:///system" > > print "Using uri:" + uri > > # Run a background thread with the event loop > virEventLoopPureStart() > > vc = libvirt.open(uri) > > # Close connection on exit (to test cleanup paths) > old_exitfunc = getattr(sys, 'exitfunc', None) > def exit(): > print "Closing " + str(vc) > vc.close() > if (old_exitfunc): old_exitfunc() > sys.exitfunc = exit > > #Add 2 callbacks to prove this works with more than just one > vc.domainEventRegister(myDomainEventCallback1,None) > vc.domainEventRegister(myDomainEventCallback2,None) > > # The rest of your app would go here normally, but for sake > # of demo we'll just go to sleep. The other option is to > # run the event loop in your main thread if your app is > # totally event based. 
> while 1: > time.sleep(1) > > > if __name__ == "__main__": > main() > > -- > |: Red Hat, Engineering, London -o- http://people.redhat.com/berrange/ :| > |: http://libvirt.org -o- http://virt-manager.org -o- http://ovirt.org :| > |: http://autobuild.org -o- http://search.cpan.org/~danberr/ :| > |: GnuPG: 7D3B9505 -o- F3C9 553F A1DA 4AC2 5648 23C1 B3DF F742 7D3B 9505 :| -- Libvir-list mailing list Libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list