Use a worker pool for processing the events (e.g. udev, mdevctl config changes) and the initialization instead of a separate initThread and an mdevctl-thread. This has the major advantage that we can leverage the job API, and this thread pool is now responsible for doing all the costly work and for emitting the libvirt nodedev events. Reviewed-by: Jonathon Jongsma <jjongsma@xxxxxxxxxx> Reviewed-by: Boris Fiuczynski <fiuczy@xxxxxxxxxxxxx> Signed-off-by: Marc Hartmayer <mhartmay@xxxxxxxxxxxxx> --- src/node_device/node_device_driver.c | 9 +- src/node_device/node_device_udev.c | 241 +++++++++++++++++++-------- src/test/test_driver.c | 8 +- 3 files changed, 185 insertions(+), 73 deletions(-) diff --git a/src/node_device/node_device_driver.c b/src/node_device/node_device_driver.c index 59c5f9b417a4..a51537d87ceb 100644 --- a/src/node_device/node_device_driver.c +++ b/src/node_device/node_device_driver.c @@ -1421,10 +1421,11 @@ nodeDeviceDestroy(virNodeDevicePtr device) goto cleanup; /* Because we're about to release the lock and thus run into a race - * possibility (however improbable) with a udevAddOneDevice change - * event which would essentially free the existing @def (obj->def) and - * replace it with something new, we need to grab the parent field - * and then find the parent obj in order to manage the vport */ + * possibility (however improbable) with a + * processNodeDeviceAddAndChangeEvent change event which would + * essentially free the existing @def (obj->def) and replace it with + * something new, we need to grab the parent field and then find the + * parent obj in order to manage the vport */ parent = g_strdup(def->parent); virNodeDeviceObjEndAPI(&obj); diff --git a/src/node_device/node_device_udev.c b/src/node_device/node_device_udev.c index 4f8dae3f85c8..1f7123a5fafa 100644 --- a/src/node_device/node_device_udev.c +++ b/src/node_device/node_device_udev.c @@ -43,6 +43,7 @@ #include "virnetdev.h" #include "virmdev.h" #include "virutil.h" +#include "virthreadpool.h"
#include "configmake.h" @@ -69,13 +70,13 @@ struct _udevEventData { bool udevThreadQuit; bool udevDataReady; - /* init thread */ - virThread *initThread; - /* Protects @mdevctlMonitors */ virMutex mdevctlLock; GList *mdevctlMonitors; int mdevctlTimeout; + + /* Immutable pointer, self-locking APIs */ + virThreadPool *workerPool; }; static virClass *udevEventDataClass; @@ -86,8 +87,6 @@ udevEventDataDispose(void *obj) struct udev *udev = NULL; udevEventData *priv = obj; - g_clear_pointer(&priv->initThread, g_free); - VIR_WITH_MUTEX_LOCK_GUARD(&priv->mdevctlLock) { g_list_free_full(g_steal_pointer(&priv->mdevctlMonitors), g_object_unref); } @@ -100,6 +99,8 @@ udevEventDataDispose(void *obj) udev_unref(udev); } + g_clear_pointer(&priv->workerPool, virThreadPoolFree); + virMutexDestroy(&priv->mdevctlLock); virCondDestroy(&priv->udevThreadCond); @@ -143,6 +144,66 @@ udevEventDataNew(void) return ret; } +typedef enum { + NODE_DEVICE_EVENT_INIT = 0, + NODE_DEVICE_EVENT_UDEV_ADD, + NODE_DEVICE_EVENT_UDEV_REMOVE, + NODE_DEVICE_EVENT_UDEV_CHANGE, + NODE_DEVICE_EVENT_UDEV_MOVE, + NODE_DEVICE_EVENT_MDEVCTL_CONFIG_CHANGED, + + NODE_DEVICE_EVENT_LAST +} nodeDeviceEventType; + +struct _nodeDeviceEvent { + nodeDeviceEventType eventType; + void *data; + virFreeCallback dataFreeFunc; +}; +typedef struct _nodeDeviceEvent nodeDeviceEvent; + +static void +nodeDeviceEventFree(nodeDeviceEvent *event) +{ + if (!event) + return; + + if (event->dataFreeFunc) + event->dataFreeFunc(event->data); + g_free(event); +} +G_DEFINE_AUTOPTR_CLEANUP_FUNC(nodeDeviceEvent, nodeDeviceEventFree); + + /** + * nodeDeviceEventSubmit: + * @eventType: the event to be processed + * @data: additional data for the event processor (the pointer is stolen and it + * will be properly freed using @dataFreeFunc) + * @dataFreeFunc: callback to free @data + * + * Submits @eventType to be processed by the asynchronous event handling + * thread. 
+ */ +static int nodeDeviceEventSubmit(nodeDeviceEventType eventType, void *data, virFreeCallback dataFreeFunc) +{ + nodeDeviceEvent *event = g_new0(nodeDeviceEvent, 1); + udevEventData *priv = NULL; + + if (!driver) + return -1; + + priv = driver->privateData; + + event->eventType = eventType; + event->data = data; + event->dataFreeFunc = dataFreeFunc; + if (virThreadPoolSendJob(priv->workerPool, 0, event) < 0) { + nodeDeviceEventFree(event); + return -1; + } + return 0; +} + static bool udevHasDeviceProperty(struct udev_device *dev, @@ -1446,8 +1507,8 @@ udevGetDeviceDetails(virNodeDeviceDriverState *driver_state, static int -udevRemoveOneDeviceSysPath(virNodeDeviceDriverState *driver_state, - const char *path) +processNodeDeviceRemoveEvent(virNodeDeviceDriverState *driver_state, + const char *path) { virNodeDeviceObj *obj = NULL; virNodeDeviceDef *def; @@ -1529,8 +1590,8 @@ udevSetParent(virNodeDeviceDriverState *driver_state, } static int -udevAddOneDevice(virNodeDeviceDriverState *driver_state, - struct udev_device *device) +processNodeDeviceAddAndChangeEvent(virNodeDeviceDriverState *driver_state, + struct udev_device *device) { g_autofree char *sysfs_path = NULL; virNodeDeviceDef *def = NULL; @@ -1643,7 +1704,7 @@ udevProcessDeviceListEntry(virNodeDeviceDriverState *driver_state, device = udev_device_new_from_syspath(udev, name); if (device != NULL) { - if (udevAddOneDevice(driver_state, device) != 0) { + if (processNodeDeviceAddAndChangeEvent(driver_state, device) != 0) { VIR_DEBUG("Failed to create node device for udev device '%s'", name); } @@ -1752,26 +1813,23 @@ udevHandleOneDevice(struct udev_device *device) VIR_DEBUG("udev action: '%s': %s", action, udev_device_get_syspath(device)); - if (STREQ(action, "add") || STREQ(action, "change")) - return udevAddOneDevice(driver, device); - - if (STREQ(action, "remove")) { - const char *path = udev_device_get_syspath(device); - - return udevRemoveOneDeviceSysPath(driver, path); - } - - if (STREQ(action, 
"move")) { - const char *devpath_old = udevGetDeviceProperty(device, "DEVPATH_OLD"); - - if (devpath_old) { - g_autofree char *devpath_old_fixed = g_strdup_printf("/sys%s", devpath_old); - - udevRemoveOneDeviceSysPath(driver, devpath_old_fixed); - } - - return udevAddOneDevice(driver, device); + /* Reference is either released via workerpool logic or at the end of this + * function. */ + device = udev_device_ref(device); + if (STREQ(action, "add")) { + return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_ADD, device, + (virFreeCallback)udev_device_unref); + } else if (STREQ(action, "change")) { + return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_CHANGE, device, + (virFreeCallback)udev_device_unref); + } else if (STREQ(action, "remove")) { + return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_REMOVE, device, + (virFreeCallback)udev_device_unref); + } else if (STREQ(action, "move")) { + return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_MOVE, device, + (virFreeCallback)udev_device_unref); } + udev_device_unref(device); return 0; } @@ -1990,23 +2048,24 @@ udevSetupSystemDev(void) static void -nodeStateInitializeEnumerate(void *opaque) +processNodeStateInitializeEnumerate(virNodeDeviceDriverState *driver_state, + void *opaque) { struct udev *udev = opaque; - udevEventData *priv = driver->privateData; + udevEventData *priv = driver_state->privateData; /* Populate with known devices */ - if (udevEnumerateDevices(driver, udev) != 0) + if (udevEnumerateDevices(driver_state, udev) != 0) goto error; /* Load persistent mdevs (which might not be activated yet) and additional * information about active mediated devices from mdevctl */ - if (nodeDeviceUpdateMediatedDevices(driver) != 0) + if (nodeDeviceUpdateMediatedDevices(driver_state) != 0) goto error; cleanup: - VIR_WITH_MUTEX_LOCK_GUARD(&driver->lock) { - driver->initialized = true; - virCondBroadcast(&driver->initCond); + VIR_WITH_MUTEX_LOCK_GUARD(&driver_state->lock) { + driver_state->initialized = true; + 
virCondBroadcast(&driver_state->initCond); } return; @@ -2048,31 +2107,16 @@ udevPCITranslateInit(bool privileged G_GNUC_UNUSED) static void -mdevctlUpdateThreadFunc(void *opaque) -{ - virNodeDeviceDriverState *driver_state = opaque; - - if (nodeDeviceUpdateMediatedDevices(driver_state) < 0) - VIR_WARN("mdevctl failed to update mediated devices"); -} - - -static void -launchMdevctlUpdateThread(int timer G_GNUC_UNUSED, void *opaque) +submitMdevctlUpdate(int timer G_GNUC_UNUSED, void *opaque) { udevEventData *priv = opaque; - virThread thread; if (priv->mdevctlTimeout != -1) { virEventRemoveTimeout(priv->mdevctlTimeout); priv->mdevctlTimeout = -1; } - if (virThreadCreateFull(&thread, false, mdevctlUpdateThreadFunc, - "mdevctl-thread", false, driver) < 0) { - virReportSystemError(errno, "%s", - _("failed to create mdevctl thread")); - } + nodeDeviceEventSubmit(NODE_DEVICE_EVENT_MDEVCTL_CONFIG_CHANGED, NULL, NULL); } @@ -2167,7 +2211,7 @@ mdevctlEnableMonitor(udevEventData *priv) /* Schedules an mdevctl update for 100ms in the future, canceling any existing * timeout that may have been set. In this way, multiple update requests in * quick succession can be collapsed into a single update. if @force is true, - * an update thread will be spawned immediately. */ + * the worker job is submitted immediately. 
*/ static void scheduleMdevctlUpdate(udevEventData *data, bool force) @@ -2175,12 +2219,12 @@ scheduleMdevctlUpdate(udevEventData *data, if (!force) { if (data->mdevctlTimeout != -1) virEventRemoveTimeout(data->mdevctlTimeout); - data->mdevctlTimeout = virEventAddTimeout(100, launchMdevctlUpdateThread, + data->mdevctlTimeout = virEventAddTimeout(100, submitMdevctlUpdate, data, NULL); return; } - launchMdevctlUpdateThread(-1, data); + submitMdevctlUpdate(-1, data); } @@ -2220,6 +2264,62 @@ mdevctlEventHandleCallback(GFileMonitor *monitor G_GNUC_UNUSED, } +static void nodeDeviceEventHandler(void *data, void *opaque) +{ + virNodeDeviceDriverState *driver_state = opaque; + g_autoptr(nodeDeviceEvent) processEvent = data; + + switch (processEvent->eventType) { + case NODE_DEVICE_EVENT_INIT: + { + struct udev *udev = processEvent->data; + + processNodeStateInitializeEnumerate(driver_state, udev); + } + break; + case NODE_DEVICE_EVENT_UDEV_ADD: + case NODE_DEVICE_EVENT_UDEV_CHANGE: + { + struct udev_device *device = processEvent->data; + + processNodeDeviceAddAndChangeEvent(driver_state, device); + } + break; + case NODE_DEVICE_EVENT_UDEV_REMOVE: + { + struct udev_device *device = processEvent->data; + const char *path = udev_device_get_syspath(device); + + processNodeDeviceRemoveEvent(driver_state, path); + } + break; + case NODE_DEVICE_EVENT_UDEV_MOVE: + { + struct udev_device *device = processEvent->data; + const char *devpath_old = udevGetDeviceProperty(device, "DEVPATH_OLD"); + + if (devpath_old) { + g_autofree char *devpath_old_fixed = g_strdup_printf("/sys%s", devpath_old); + + processNodeDeviceRemoveEvent(driver_state, devpath_old_fixed); + } + + processNodeDeviceAddAndChangeEvent(driver_state, device); + } + break; + case NODE_DEVICE_EVENT_MDEVCTL_CONFIG_CHANGED: + { + if (nodeDeviceUpdateMediatedDevices(driver_state) < 0) + VIR_WARN("mdevctl failed to update mediated devices"); + } + break; + case NODE_DEVICE_EVENT_LAST: + g_assert_not_reached(); + break; + } +} 
+ + /* Note: It must be safe to call this function even if the driver was not * successfully initialized. This must be considered when changing this * function. */ @@ -2255,6 +2355,9 @@ nodeStateShutdownPrepare(void) priv->udevThreadQuit = true; virCondSignal(&priv->udevThreadCond); } + + if (priv->workerPool) + virThreadPoolStop(priv->workerPool); return 0; } @@ -2275,11 +2378,12 @@ nodeStateShutdownWait(void) return 0; VIR_WITH_OBJECT_LOCK_GUARD(priv) { - if (priv->initThread) - virThreadJoin(priv->initThread); if (priv->udevThread) virThreadJoin(priv->udevThread); } + + if (priv->workerPool) + virThreadPoolDrain(priv->workerPool); return 0; } @@ -2350,6 +2454,19 @@ nodeStateInitialize(bool privileged, driver->parserCallbacks.postParse = nodeDeviceDefPostParse; driver->parserCallbacks.validate = nodeDeviceDefValidate; + /* With the current design, we can only have exactly *one* worker thread as + * otherwise we cannot guarantee that the 'order(udev_events) == + * order(nodedev_events)' is preserved. The worker pool must be initialized + * before trying to reconnect to all the running mdevs since there might + * occur some mdevctl monitor events that will be dispatched to the worker + * pool. 
*/ + priv->workerPool = virThreadPoolNewFull(1, 1, 0, nodeDeviceEventHandler, + "nodev-device-event", + NULL, + driver); + if (!priv->workerPool) + goto unlock; + if (udevPCITranslateInit(privileged) < 0) goto unlock; @@ -2407,13 +2524,7 @@ nodeStateInitialize(bool privileged, if (udevSetupSystemDev() != 0) goto cleanup; - priv->initThread = g_new0(virThread, 1); - if (virThreadCreateFull(priv->initThread, true, nodeStateInitializeEnumerate, - "nodedev-init", false, udev) < 0) { - virReportSystemError(errno, "%s", - _("failed to create udev enumerate thread")); - goto cleanup; - } + nodeDeviceEventSubmit(NODE_DEVICE_EVENT_INIT, udev_ref(udev), (virFreeCallback)udev_unref); return VIR_DRV_STATE_INIT_COMPLETE; diff --git a/src/test/test_driver.c b/src/test/test_driver.c index 81b1ba4294bd..76f89a224f21 100644 --- a/src/test/test_driver.c +++ b/src/test/test_driver.c @@ -7750,10 +7750,10 @@ testNodeDeviceDestroy(virNodeDevicePtr dev) if (virNodeDeviceGetWWNs(def, &wwnn, &wwpn) == -1) goto cleanup; - /* Unlike the real code we cannot run into the udevAddOneDevice race - * which would replace obj->def, so no need to save off the parent, - * but do need to drop the @obj lock so that the FindByName code doesn't - * deadlock on ourselves */ + /* Unlike the real code we cannot run into the + * processNodeDeviceAddAndChangeEvent race which would replace obj->def, so + * no need to save off the parent, but do need to drop the @obj lock so that + * the FindByName code doesn't deadlock on ourselves */ virObjectUnlock(obj); /* We do this just for basic validation and throw away the parentobj -- 2.34.1 _______________________________________________ Devel mailing list -- devel@xxxxxxxxxxxxxxxxx To unsubscribe send an email to devel-leave@xxxxxxxxxxxxxxxxx