Use a worker pool for processing the udev events and the initialization instead of a separate initThread and a mdevctl-thread. This has the large advantage that we can leverage the job API and now this thread pool is responsible to do all the "costly-work" and the libvirt nodedev event creation. TODOs: + IMO, it's better practice for all functions called by the virThreadPool's worker thread to pass the driver via parameter and not global variables. Easier to test and cleaner. + how many worker threads should we have at maximum? + there are still TODO's in the code + improve error reporting + improve naming - e.g. rename more udevXXX functions? Signed-off-by: Marc Hartmayer <mhartmay@xxxxxxxxxxxxx> --- src/node_device/node_device_driver.h | 2 +- src/node_device/node_device_driver.c | 6 +- src/node_device/node_device_udev.c | 295 +++++++++++++++++++-------- 3 files changed, 209 insertions(+), 94 deletions(-) diff --git a/src/node_device/node_device_driver.h b/src/node_device/node_device_driver.h index f195cfef9d49..2781ad136d68 100644 --- a/src/node_device/node_device_driver.h +++ b/src/node_device/node_device_driver.h @@ -147,7 +147,7 @@ nodeDeviceParseMdevctlJSON(const char *jsonstring, bool defined); int -nodeDeviceUpdateMediatedDevices(void); +nodeDeviceUpdateMediatedDevices(virNodeDeviceDriverState *driver); void nodeDeviceGenerateName(virNodeDeviceDef *def, diff --git a/src/node_device/node_device_driver.c b/src/node_device/node_device_driver.c index f623339dc973..59c5f9b417a4 100644 --- a/src/node_device/node_device_driver.c +++ b/src/node_device/node_device_driver.c @@ -1887,7 +1887,7 @@ removeMissingPersistentMdev(virNodeDeviceObj *obj, int -nodeDeviceUpdateMediatedDevices(void) +nodeDeviceUpdateMediatedDevices(virNodeDeviceDriverState *node_driver) { g_autofree virNodeDeviceDef **defs = NULL; g_autofree virNodeDeviceDef **act_defs = NULL; @@ -1911,7 +1911,7 @@ nodeDeviceUpdateMediatedDevices(void) /* Any mdevs that were previously defined but were not returned in the * latest mdevctl query should be removed from the device list */ data.defs = defs; - virNodeDeviceObjListForEachRemove(driver->devs, + virNodeDeviceObjListForEachRemove(node_driver->devs, removeMissingPersistentMdev, &data); for (i = 0; i < data.ndefs; i++) @@ -2374,7 +2374,7 @@ nodeDeviceUpdate(virNodeDevice *device, cleanup: virNodeDeviceObjEndAPI(&obj); if (updated) - nodeDeviceUpdateMediatedDevices(); + nodeDeviceUpdateMediatedDevices(driver); return ret; } diff --git a/src/node_device/node_device_udev.c b/src/node_device/node_device_udev.c index cec7d837c43e..2a252d8fe62b 100644 --- a/src/node_device/node_device_udev.c +++ b/src/node_device/node_device_udev.c @@ -43,6 +43,7 @@ #include "virnetdev.h" #include "virmdev.h" #include "virutil.h" +#include "virthreadpool.h" #include "configmake.h" @@ -69,14 +70,14 @@ struct _udevEventData { bool udevThreadQuit; bool udevDataReady; - /* init thread */ - virThread *initThread; - /* Protects @mdevctlMonitors and must be taken when `mdevctl` command is * called to make sure only one thread can query mdevctl at a time. */ virMutex mdevctlLock; GList *mdevctlMonitors; int mdevctlTimeout; + + /* Immutable pointer, self-locking APIs */ + virThreadPool *workerPool; }; static virClass *udevEventDataClass; @@ -146,6 +147,79 @@ udevEventDataNew(void) return ret; } +typedef enum { + NODE_DEVICE_EVENT_INIT = 0, + NODE_DEVICE_EVENT_ADD, + NODE_DEVICE_EVENT_REMOVE, + NODE_DEVICE_EVENT_CHANGE, + NODE_DEVICE_EVENT_MOVE, + NODE_DEVICE_EVENT_UPDATE, + + NODE_DEVICE_EVENT_LAST +} nodeDeviceEventType; + +struct _nodeDeviceEvent { + nodeDeviceEventType eventType; + void *data; +}; +typedef struct _nodeDeviceEvent nodeDeviceEvent; + +static void +nodeDeviceEventFree(nodeDeviceEvent *event) +{ + if (!event) + return; + + switch (event->eventType) { + case NODE_DEVICE_EVENT_INIT: + udev_unref(event->data); + break; + case NODE_DEVICE_EVENT_ADD: + case NODE_DEVICE_EVENT_CHANGE: + case NODE_DEVICE_EVENT_MOVE: + case NODE_DEVICE_EVENT_REMOVE: + udev_device_unref(event->data); + break; + case NODE_DEVICE_EVENT_UPDATE: + break; + case NODE_DEVICE_EVENT_LAST: + // TODO Bug! + g_abort(); + break; + } + g_free(event); +} +G_DEFINE_AUTOPTR_CLEANUP_FUNC(nodeDeviceEvent, nodeDeviceEventFree); + + /** + * nodeDeviceEventSubmit: + * @eventType: the event to be processed + * @data: additional data for the event processor (the pointer is stolen and it + * will be properly freed) + * + * Submits @eventType to be processed by the asynchronous event handling + * thread. + */ +static int nodeDeviceEventSubmit(nodeDeviceEventType eventType, void *data) +{ + nodeDeviceEvent *event = g_new0(nodeDeviceEvent, 1); + udevEventData *priv = NULL; + + /* BUG */ + if (!driver) + g_abort(); + + priv = driver->privateData; + + event->eventType = eventType; + event->data = data; + if (virThreadPoolSendJob(priv->workerPool, 0, event) < 0) { + nodeDeviceEventFree(event); + return -1; + } + return 0; +} + static bool udevHasDeviceProperty(struct udev_device *dev, @@ -364,7 +438,7 @@ udevTranslatePCIIds(unsigned int vendor, static int -udevProcessPCI(struct udev_device *device, +udevProcessPCI(virNodeDeviceDriverState *driver_state, struct udev_device *device, virNodeDeviceDef *def) { virNodeDevCapPCIDev *pci_dev = &def->caps->data.pci_dev; @@ -375,8 +449,8 @@ udevProcessPCI(struct udev_device *device, char *p; bool privileged = false; - VIR_WITH_MUTEX_LOCK_GUARD(&driver->lock) { - privileged = driver->privileged; + VIR_WITH_MUTEX_LOCK_GUARD(&driver_state->lock) { + privileged = driver_state->privileged; } pci_dev->klass = -1; @@ -1394,12 +1468,12 @@ udevGetDeviceType(struct udev_device *device, static int -udevGetDeviceDetails(struct udev_device *device, +udevGetDeviceDetails(virNodeDeviceDriverState *driver_state, struct udev_device *device, virNodeDeviceDef *def) { switch (def->caps->data.type) { case VIR_NODE_DEV_CAP_PCI_DEV: - return udevProcessPCI(device, def); + return udevProcessPCI(driver_state, device, def); case VIR_NODE_DEV_CAP_USB_DEV: return udevProcessUSBDevice(device, def); case VIR_NODE_DEV_CAP_USB_INTERFACE: @@ -1450,13 +1524,13 @@ static void scheduleMdevctlUpdate(udevEventData *data, bool force); static int -udevRemoveOneDeviceSysPath(const char *path) +processNodeDeviceRemoveEvent(virNodeDeviceDriverState *driver_state, const char *path) { virNodeDeviceObj *obj = NULL; virNodeDeviceDef *def; virObjectEvent *event = NULL; - if (!(obj = virNodeDeviceObjListFindBySysfsPath(driver->devs, path))) { + if (!(obj = virNodeDeviceObjListFindBySysfsPath(driver_state->devs, path))) { VIR_DEBUG("Failed to find device to remove that has udev path '%s'", path); return -1; @@ -1477,21 +1551,21 @@ udevRemoveOneDeviceSysPath(const char *path) } else { VIR_DEBUG("Removing device '%s' with sysfs path '%s'", def->name, path); - virNodeDeviceObjListRemove(driver->devs, obj); + virNodeDeviceObjListRemove(driver_state->devs, obj); } virNodeDeviceObjEndAPI(&obj); /* cannot check for mdev_types since they have already been removed */ VIR_WITH_OBJECT_LOCK_GUARD(driver->privateData) { - scheduleMdevctlUpdate(driver->privateData, false); + scheduleMdevctlUpdate(driver_state->privateData, false); } - virObjectEventStateQueue(driver->nodeDeviceEventState, event); + virObjectEventStateQueue(driver_state->nodeDeviceEventState, event); return 0; } static int -udevSetParent(struct udev_device *device, +udevSetParent(virNodeDeviceDriverState *driver_state, struct udev_device *device, virNodeDeviceDef *def) { struct udev_device *parent_device = NULL; @@ -1514,7 +1588,7 @@ udevSetParent(struct udev_device *device, return -1; } - if ((obj = virNodeDeviceObjListFindBySysfsPath(driver->devs, + if ((obj = virNodeDeviceObjListFindBySysfsPath(driver_state->devs, parent_sysfs_path))) { objdef = virNodeDeviceObjGetDef(obj); def->parent = g_strdup(objdef->name); @@ -1532,7 +1606,7 @@ udevSetParent(struct udev_device *device, } static int -udevAddOneDevice(struct udev_device *device) +processNodeDeviceAddAndChangeEvent(virNodeDeviceDriverState *driver_state, struct udev_device *device) { g_autofree char *sysfs_path = NULL; virNodeDeviceDef *def = NULL; @@ -1563,15 +1637,15 @@ udevAddOneDevice(struct udev_device *device) if (udevGetDeviceNodes(device, def) != 0) goto cleanup; - if (udevGetDeviceDetails(device, def) != 0) + if (udevGetDeviceDetails(driver_state, device, def) != 0) goto cleanup; - if (udevSetParent(device, def) != 0) + if (udevSetParent(driver_state, device, def) != 0) goto cleanup; is_mdev = def->caps->data.type == VIR_NODE_DEV_CAP_MDEV; - if ((obj = virNodeDeviceObjListFindByName(driver->devs, def->name))) { + if ((obj = virNodeDeviceObjListFindByName(driver_state->devs, def->name))) { objdef = virNodeDeviceObjGetDef(obj); if (is_mdev) @@ -1589,7 +1663,7 @@ udevAddOneDevice(struct udev_device *device) /* If this is a device change, the old definition will be freed * and the current definition will take its place. */ - if (!(obj = virNodeDeviceObjListAssignDef(driver->devs, def))) + if (!(obj = virNodeDeviceObjListAssignDef(driver_state->devs, def))) goto cleanup; /* @def is now owned by @obj */ def = NULL; @@ -1609,14 +1683,14 @@ udevAddOneDevice(struct udev_device *device) virNodeDeviceObjEndAPI(&obj); if (has_mdev_types) { - VIR_WITH_OBJECT_LOCK_GUARD(driver->privateData) { - scheduleMdevctlUpdate(driver->privateData, false); + VIR_WITH_OBJECT_LOCK_GUARD(driver_state->privateData) { + scheduleMdevctlUpdate(driver_state->privateData, false); } } /* The added mdev needs an immediate active config update before * the event is issued to allow sane API usage. */ - if (is_mdev && (nodeDeviceUpdateMediatedDevices() < 0)) { + if (is_mdev && (nodeDeviceUpdateMediatedDevices(driver_state) < 0)) { VIR_WARN("Update of mediated device %s failed", NULLSTR_EMPTY(sysfs_path)); } @@ -1624,7 +1698,7 @@ udevAddOneDevice(struct udev_device *device) ret = 0; cleanup: - virObjectEventStateQueue(driver->nodeDeviceEventState, event); + virObjectEventStateQueue(driver_state->nodeDeviceEventState, event); if (ret != 0) { VIR_DEBUG("Discarding device %d %p %s", ret, def, @@ -1637,7 +1711,7 @@ udevAddOneDevice(struct udev_device *device) static int -udevProcessDeviceListEntry(struct udev *udev, +udevProcessDeviceListEntry(virNodeDeviceDriverState *driver_state, struct udev *udev, struct udev_list_entry *list_entry) { struct udev_device *device; @@ -1649,7 +1723,7 @@ udevProcessDeviceListEntry(struct udev *udev, device = udev_device_new_from_syspath(udev, name); if (device != NULL) { - if (udevAddOneDevice(device) != 0) { + if (processNodeDeviceAddAndChangeEvent(driver_state, device) != 0) { VIR_DEBUG("Failed to create node device for udev device '%s'", name); } @@ -1687,7 +1761,7 @@ udevEnumerateAddMatches(struct udev_enumerate *udev_enumerate) static int -udevEnumerateDevices(struct udev *udev) +udevEnumerateDevices(virNodeDeviceDriverState *driver_state, struct udev *udev) { struct udev_enumerate *udev_enumerate = NULL; struct udev_list_entry *list_entry = NULL; @@ -1703,7 +1777,7 @@ udevEnumerateDevices(struct udev *udev) udev_list_entry_foreach(list_entry, udev_enumerate_get_list_entry(udev_enumerate)) { - udevProcessDeviceListEntry(udev, list_entry); + udevProcessDeviceListEntry(driver_state, udev, list_entry); } ret = 0; @@ -1736,8 +1810,8 @@ nodeStateCleanup(void) priv = driver->privateData; if (priv) { - g_clear_pointer(&priv->initThread, g_free); g_clear_pointer(&priv->udevThread, g_free); + virThreadPoolFree(priv->workerPool); } virObjectUnref(priv); @@ -1765,26 +1839,19 @@ udevHandleOneDevice(struct udev_device *device) VIR_DEBUG("udev action: '%s': %s", action, udev_device_get_syspath(device)); - if (STREQ(action, "add") || STREQ(action, "change")) - return udevAddOneDevice(device); - - if (STREQ(action, "remove")) { - const char *path = udev_device_get_syspath(device); - - return udevRemoveOneDeviceSysPath(path); - } - - if (STREQ(action, "move")) { - const char *devpath_old = udevGetDeviceProperty(device, "DEVPATH_OLD"); - - if (devpath_old) { - g_autofree char *devpath_old_fixed = g_strdup_printf("/sys%s", devpath_old); - - udevRemoveOneDeviceSysPath(devpath_old_fixed); - } - - return udevAddOneDevice(device); + /* Reference is either released via workerpool logic or at the end of this + * function. */ + device = udev_device_ref(device); + if (STREQ(action, "add")) { + return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_ADD, device); + } else if (STREQ(action, "change")) { + return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_CHANGE, device); + } else if (STREQ(action, "remove")) { + return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_REMOVE, device); + } else if (STREQ(action, "move")) { + return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_MOVE, device); } + udev_device_unref(device); return 0; } @@ -2003,23 +2070,22 @@ udevSetupSystemDev(void) static void -nodeStateInitializeEnumerate(void *opaque) +processNodeStateInitializeEnumerate(virNodeDeviceDriverState *event_driver, struct udev *udev) { - struct udev *udev = opaque; - udevEventData *priv = driver->privateData; + udevEventData *priv = event_driver->privateData; /* Populate with known devices */ - if (udevEnumerateDevices(udev) != 0) + if (udevEnumerateDevices(event_driver, udev) != 0) goto error; /* Load persistent mdevs (which might not be activated yet) and additional * information about active mediated devices from mdevctl */ - if (nodeDeviceUpdateMediatedDevices() != 0) + if (nodeDeviceUpdateMediatedDevices(event_driver) != 0) goto error; cleanup: - VIR_WITH_MUTEX_LOCK_GUARD(&driver->lock) { - driver->initialized = true; - virCondBroadcast(&driver->initCond); + VIR_WITH_MUTEX_LOCK_GUARD(&event_driver->lock) { + event_driver->initialized = true; + virCondBroadcast(&event_driver->initCond); } return; @@ -2059,35 +2125,17 @@ udevPCITranslateInit(bool privileged G_GNUC_UNUSED) return 0; } - static void -mdevctlUpdateThreadFunc(void *opaque G_GNUC_UNUSED) -{ - udevEventData *priv = driver->privateData; - /* ensure only a single thread can query mdevctl at a time */ - VIR_LOCK_GUARD lock = virLockGuardLock(&priv->mdevctlLock); - - if (nodeDeviceUpdateMediatedDevices() < 0) - VIR_WARN("mdevctl failed to update mediated devices"); -} - - -static void -launchMdevctlUpdateThread(int timer G_GNUC_UNUSED, void *opaque) +submitMdevctlUpdate(int timer G_GNUC_UNUSED, void *opaque) { udevEventData *priv = opaque; - virThread thread; if (priv->mdevctlTimeout != -1) { virEventRemoveTimeout(priv->mdevctlTimeout); priv->mdevctlTimeout = -1; } - if (virThreadCreateFull(&thread, false, mdevctlUpdateThreadFunc, - "mdevctl-thread", false, NULL) < 0) { - virReportSystemError(errno, "%s", - _("failed to create mdevctl thread")); - } + nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UPDATE, NULL); } @@ -2182,7 +2230,7 @@ mdevctlEnableMonitor(udevEventData *priv) /* Schedules an mdevctl update for 100ms in the future, canceling any existing * timeout that may have been set. In this way, multiple update requests in * quick succession can be collapsed into a single update. if @force is true, - * an update thread will be spawned immediately. */ + * the worker job is submitted immediately. */ static void scheduleMdevctlUpdate(udevEventData *data, bool force) @@ -2190,12 +2238,12 @@ scheduleMdevctlUpdate(udevEventData *data, if (!force) { if (data->mdevctlTimeout != -1) virEventRemoveTimeout(data->mdevctlTimeout); - data->mdevctlTimeout = virEventAddTimeout(100, launchMdevctlUpdateThread, + data->mdevctlTimeout = virEventAddTimeout(100, submitMdevctlUpdate, data, NULL); return; } - launchMdevctlUpdateThread(-1, data); + submitMdevctlUpdate(-1, data); } @@ -2235,6 +2283,67 @@ mdevctlEventHandleCallback(GFileMonitor *monitor G_GNUC_UNUSED, } +static void nodeDeviceEventHandler(void *data, void *opaque) +{ + virNodeDeviceDriverState *driver_state = opaque; + g_autoptr(nodeDeviceEvent) processEvent = data; + + switch (processEvent->eventType) { + case NODE_DEVICE_EVENT_INIT: + { + struct udev *udev = processEvent->data; + + processNodeStateInitializeEnumerate(driver_state, udev); + } + break; + case NODE_DEVICE_EVENT_ADD: + case NODE_DEVICE_EVENT_CHANGE: + { + struct udev_device *device = processEvent->data; + + processNodeDeviceAddAndChangeEvent(driver_state, device); + } + break; + case NODE_DEVICE_EVENT_REMOVE: + { + struct udev_device *device = processEvent->data; + const char *path = udev_device_get_syspath(device); + + processNodeDeviceRemoveEvent(driver_state, path); + } + break; + case NODE_DEVICE_EVENT_MOVE: + { + struct udev_device *device = processEvent->data; + const char *devpath_old = udevGetDeviceProperty(device, "DEVPATH_OLD"); + + if (devpath_old) { + g_autofree char *devpath_old_fixed = g_strdup_printf("/sys%s", devpath_old); + + processNodeDeviceRemoveEvent(driver_state, devpath_old_fixed); + } + + processNodeDeviceAddAndChangeEvent(driver_state, device); + } + break; + case NODE_DEVICE_EVENT_UPDATE: + { + udevEventData *priv = driver_state->privateData; + /* ensure only a single thread can query mdevctl at a time */ + VIR_LOCK_GUARD lock = virLockGuardLock(&priv->mdevctlLock); + + if (nodeDeviceUpdateMediatedDevices(driver_state) < 0) + VIR_WARN("mdevctl failed to update mediated devices"); + } + break; + case NODE_DEVICE_EVENT_LAST: + // TODO Bug! + g_abort(); + break; + } +} + + static int nodeStateInitialize(bool privileged, const char *root, @@ -2301,6 +2410,16 @@ nodeStateInitialize(bool privileged, driver->parserCallbacks.postParse = nodeDeviceDefPostParse; driver->parserCallbacks.validate = nodeDeviceDefValidate; + /* must be initialized before trying to reconnect to all the running mdevs + * since there might occur some mdevctl monitor events that will be + * dispatched to the worker pool */ + priv->workerPool = virThreadPoolNewFull(1, 10, 0, nodeDeviceEventHandler, + "nodev-device-event", + NULL, + driver); + if (!priv->workerPool) + goto unlock; + if (udevPCITranslateInit(privileged) < 0) goto unlock; @@ -2359,14 +2478,7 @@ nodeStateInitialize(bool privileged, if (udevSetupSystemDev() != 0) goto cleanup; - priv->initThread = g_new0(virThread, 1); - if (virThreadCreateFull(priv->initThread, true, nodeStateInitializeEnumerate, - "nodedev-init", false, udev) < 0) { - virReportSystemError(errno, "%s", - _("failed to create udev enumerate thread")); - g_clear_pointer(&priv->initThread, g_free); - goto cleanup; - } + nodeDeviceEventSubmit(NODE_DEVICE_EVENT_INIT, udev_ref(udev)); return VIR_DRV_STATE_INIT_COMPLETE; @@ -2446,6 +2558,9 @@ nodeStateShutdownPrepare(void) priv->udevThreadQuit = true; virCondSignal(&priv->udevThreadCond); } + + if (priv->workerPool) + virThreadPoolStop(priv->workerPool); return 0; } @@ -2461,11 +2576,11 @@ nodeStateShutdownWait(void) if (!priv) return 0; - if (priv->initThread) - virThreadJoin(priv->initThread); - if (priv->udevThread) virThreadJoin(priv->udevThread); + + if (priv->workerPool) + virThreadPoolDrain(priv->workerPool); return 0; } -- 2.34.1 _______________________________________________ Devel mailing list -- devel@xxxxxxxxxxxxxxxxx To unsubscribe send an email to devel-leave@xxxxxxxxxxxxxxxxx