Use a worker pool for processing the events (e.g. udev, mdevctl config changes) and the initialization instead of a separate initThread and a mdevctl-thread. This has the large advantage that we can leverage the job API and now this thread pool is responsible to do all the "costly-work" and emitting the libvirt nodedev events. Signed-off-by: Marc Hartmayer <mhartmay@xxxxxxxxxxxxx> --- src/node_device/node_device_udev.c | 244 +++++++++++++++++++++-------- 1 file changed, 179 insertions(+), 65 deletions(-) diff --git a/src/node_device/node_device_udev.c b/src/node_device/node_device_udev.c index e4b1532dc385..67a8b5cd7132 100644 --- a/src/node_device/node_device_udev.c +++ b/src/node_device/node_device_udev.c @@ -43,6 +43,7 @@ #include "virnetdev.h" #include "virmdev.h" #include "virutil.h" +#include "virthreadpool.h" #include "configmake.h" @@ -69,13 +70,13 @@ struct _udevEventData { bool udevThreadQuit; bool udevDataReady; - /* init thread */ - virThread *initThread; - /* Protects @mdevctlMonitors */ virMutex mdevctlLock; GList *mdevctlMonitors; int mdevctlTimeout; + + /* Immutable pointer, self-locking APIs */ + virThreadPool *workerPool; }; static virClass *udevEventDataClass; @@ -86,8 +87,6 @@ udevEventDataDispose(void *obj) struct udev *udev = NULL; udevEventData *priv = obj; - g_clear_pointer(&priv->initThread, g_free); - VIR_WITH_MUTEX_LOCK_GUARD(&priv->mdevctlLock) { g_list_free_full(g_steal_pointer(&priv->mdevctlMonitors), g_object_unref); } @@ -100,6 +99,8 @@ udevEventDataDispose(void *obj) udev_unref(udev); } + g_clear_pointer(&priv->workerPool, virThreadPoolFree); + virMutexDestroy(&priv->mdevctlLock); virCondDestroy(&priv->udevThreadCond); @@ -143,6 +144,66 @@ udevEventDataNew(void) return ret; } +typedef enum { + NODE_DEVICE_EVENT_INIT = 0, + NODE_DEVICE_EVENT_UDEV_ADD, + NODE_DEVICE_EVENT_UDEV_REMOVE, + NODE_DEVICE_EVENT_UDEV_CHANGE, + NODE_DEVICE_EVENT_UDEV_MOVE, + NODE_DEVICE_EVENT_MDEVCTL_CONFIG_CHANGED, + + NODE_DEVICE_EVENT_LAST +} nodeDeviceEventType; + +struct _nodeDeviceEvent { + nodeDeviceEventType eventType; + void *data; + virFreeCallback dataFreeFunc; +}; +typedef struct _nodeDeviceEvent nodeDeviceEvent; + +static void +nodeDeviceEventFree(nodeDeviceEvent *event) +{ + if (!event) + return; + + if (event->dataFreeFunc) + event->dataFreeFunc(event->data); + g_free(event); +} +G_DEFINE_AUTOPTR_CLEANUP_FUNC(nodeDeviceEvent, nodeDeviceEventFree); + + /** + * nodeDeviceEventSubmit: + * @eventType: the event to be processed + * @data: additional data for the event processor (the pointer is stolen and it + * will be properly freed using @dataFreeFunc) + * @dataFreeFunc: callback to free @data + * + * Submits @eventType to be processed by the asynchronous event handling + * thread. + */ +static int nodeDeviceEventSubmit(nodeDeviceEventType eventType, void *data, virFreeCallback dataFreeFunc) +{ + nodeDeviceEvent *event = g_new0(nodeDeviceEvent, 1); + udevEventData *priv = NULL; + + if (!driver) + return -1; + + priv = driver->privateData; + + event->eventType = eventType; + event->data = data; + event->dataFreeFunc = dataFreeFunc; + if (virThreadPoolSendJob(priv->workerPool, 0, event) < 0) { + nodeDeviceEventFree(event); + return -1; + } + return 0; +} + static bool udevHasDeviceProperty(struct udev_device *dev, @@ -1447,7 +1508,7 @@ static void scheduleMdevctlUpdate(udevEventData *data, bool force); static int -udevRemoveOneDeviceSysPath(virNodeDeviceDriverState *driver_state, const char *path) +processNodeDeviceRemoveEvent(virNodeDeviceDriverState *driver_state, const char *path) { virNodeDeviceObj *obj = NULL; virNodeDeviceDef *def; @@ -1529,7 +1590,7 @@ udevSetParent(virNodeDeviceDriverState *driver_state, struct udev_device *device } static int -udevAddOneDevice(virNodeDeviceDriverState *driver_state, struct udev_device *device) +processNodeDeviceAddAndChangeEvent(virNodeDeviceDriverState *driver_state, struct udev_device *device) { g_autofree char *sysfs_path = NULL; virNodeDeviceDef *def = NULL; @@ -1647,7 +1708,7 @@ udevProcessDeviceListEntry(virNodeDeviceDriverState *driver_state, struct udev * device = udev_device_new_from_syspath(udev, name); if (device != NULL) { - if (udevAddOneDevice(driver_state, device) != 0) { + if (processNodeDeviceAddAndChangeEvent(driver_state, device) != 0) { VIR_DEBUG("Failed to create node device for udev device '%s'", name); } @@ -1755,26 +1816,23 @@ udevHandleOneDevice(struct udev_device *device) VIR_DEBUG("udev action: '%s': %s", action, udev_device_get_syspath(device)); - if (STREQ(action, "add") || STREQ(action, "change")) - return udevAddOneDevice(driver, device); - - if (STREQ(action, "remove")) { - const char *path = udev_device_get_syspath(device); - - return udevRemoveOneDeviceSysPath(driver, path); - } - - if (STREQ(action, "move")) { - const char *devpath_old = udevGetDeviceProperty(device, "DEVPATH_OLD"); - - if (devpath_old) { - g_autofree char *devpath_old_fixed = g_strdup_printf("/sys%s", devpath_old); - - udevRemoveOneDeviceSysPath(driver, devpath_old_fixed); - } - - return udevAddOneDevice(driver, device); + /* Reference is either released via workerpool logic or at the end of this + * function. */ + device = udev_device_ref(device); + if (STREQ(action, "add")) { + return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_ADD, device, + (virFreeCallback)udev_device_unref); + } else if (STREQ(action, "change")) { + return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_CHANGE, device, + (virFreeCallback)udev_device_unref); + } else if (STREQ(action, "remove")) { + return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_REMOVE, device, + (virFreeCallback)udev_device_unref); + } else if (STREQ(action, "move")) { + return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_MOVE, device, + (virFreeCallback)udev_device_unref); } + udev_device_unref(device); return 0; } @@ -1993,23 +2051,23 @@ udevSetupSystemDev(void) static void -nodeStateInitializeEnumerate(void *opaque) +processNodeStateInitializeEnumerate(virNodeDeviceDriverState *driver_state, void *opaque) { - udevEventData *priv = driver->privateData; + udevEventData *priv = driver_state->privateData; struct udev *udev = opaque; /* Populate with known devices */ - if (udevEnumerateDevices(driver, udev) != 0) + if (udevEnumerateDevices(driver_state, udev) != 0) goto error; /* Load persistent mdevs (which might not be activated yet) and additional * information about active mediated devices from mdevctl */ - if (nodeDeviceUpdateMediatedDevices(driver) != 0) + if (nodeDeviceUpdateMediatedDevices(driver_state) != 0) goto error; cleanup: - VIR_WITH_MUTEX_LOCK_GUARD(&driver->lock) { - driver->initialized = true; - virCondBroadcast(&driver->initCond); + VIR_WITH_MUTEX_LOCK_GUARD(&driver_state->lock) { + driver_state->initialized = true; + virCondBroadcast(&driver_state->initCond); } return; @@ -2051,31 +2109,16 @@ udevPCITranslateInit(bool privileged G_GNUC_UNUSED) static void -mdevctlUpdateThreadFunc(void *opaque) -{ - virNodeDeviceDriverState *driver_state = opaque; - - if (nodeDeviceUpdateMediatedDevices(driver_state) < 0) - VIR_WARN("mdevctl failed to update mediated devices"); -} - - -static void -launchMdevctlUpdateThread(int timer G_GNUC_UNUSED, void *opaque) +submitMdevctlUpdate(int timer G_GNUC_UNUSED, void *opaque) { udevEventData *priv = opaque; - virThread thread; if (priv->mdevctlTimeout != -1) { virEventRemoveTimeout(priv->mdevctlTimeout); priv->mdevctlTimeout = -1; } - if (virThreadCreateFull(&thread, false, mdevctlUpdateThreadFunc, - "mdevctl-thread", false, driver) < 0) { - virReportSystemError(errno, "%s", - _("failed to create mdevctl thread")); - } + nodeDeviceEventSubmit(NODE_DEVICE_EVENT_MDEVCTL_CONFIG_CHANGED, NULL, NULL); } @@ -2170,7 +2213,7 @@ mdevctlEnableMonitor(udevEventData *priv) /* Schedules an mdevctl update for 100ms in the future, canceling any existing * timeout that may have been set. In this way, multiple update requests in * quick succession can be collapsed into a single update. if @force is true, - * an update thread will be spawned immediately. */ + * the worker job is submitted immediately. */ static void scheduleMdevctlUpdate(udevEventData *data, bool force) @@ -2178,12 +2221,12 @@ scheduleMdevctlUpdate(udevEventData *data, if (!force) { if (data->mdevctlTimeout != -1) virEventRemoveTimeout(data->mdevctlTimeout); - data->mdevctlTimeout = virEventAddTimeout(100, launchMdevctlUpdateThread, + data->mdevctlTimeout = virEventAddTimeout(100, submitMdevctlUpdate, data, NULL); return; } - launchMdevctlUpdateThread(-1, data); + submitMdevctlUpdate(-1, data); } @@ -2223,6 +2266,62 @@ mdevctlEventHandleCallback(GFileMonitor *monitor G_GNUC_UNUSED, } +static void nodeDeviceEventHandler(void *data, void *opaque) +{ + virNodeDeviceDriverState *driver_state = opaque; + g_autoptr(nodeDeviceEvent) processEvent = data; + + switch (processEvent->eventType) { + case NODE_DEVICE_EVENT_INIT: + { + struct udev *udev = processEvent->data; + + processNodeStateInitializeEnumerate(driver_state, udev); + } + break; + case NODE_DEVICE_EVENT_UDEV_ADD: + case NODE_DEVICE_EVENT_UDEV_CHANGE: + { + struct udev_device *device = processEvent->data; + + processNodeDeviceAddAndChangeEvent(driver_state, device); + } + break; + case NODE_DEVICE_EVENT_UDEV_REMOVE: + { + struct udev_device *device = processEvent->data; + const char *path = udev_device_get_syspath(device); + + processNodeDeviceRemoveEvent(driver_state, path); + } + break; + case NODE_DEVICE_EVENT_UDEV_MOVE: + { + struct udev_device *device = processEvent->data; + const char *devpath_old = udevGetDeviceProperty(device, "DEVPATH_OLD"); + + if (devpath_old) { + g_autofree char *devpath_old_fixed = g_strdup_printf("/sys%s", devpath_old); + + processNodeDeviceRemoveEvent(driver_state, devpath_old_fixed); + } + + processNodeDeviceAddAndChangeEvent(driver_state, device); + } + break; + case NODE_DEVICE_EVENT_MDEVCTL_CONFIG_CHANGED: + { + if (nodeDeviceUpdateMediatedDevices(driver_state) < 0) + VIR_WARN("mdevctl failed to update mediated devices"); + } + break; + case NODE_DEVICE_EVENT_LAST: + g_assert_not_reached(); + break; + } +} + + /* Note: It must be safe to call this function even if the driver was not * successfully initialized. This must be considered when changing this * function. */ @@ -2258,6 +2357,9 @@ nodeStateShutdownPrepare(void) priv->udevThreadQuit = true; virCondSignal(&priv->udevThreadCond); } + + if (priv->workerPool) + virThreadPoolStop(priv->workerPool); return 0; } @@ -2278,11 +2380,19 @@ nodeStateShutdownWait(void) return 0; VIR_WITH_OBJECT_LOCK_GUARD(priv) { - if (priv->initThread) - virThreadJoin(priv->initThread); - if (priv->udevThread) - virThreadJoin(priv->udevThread); + if (priv->mdevctlTimeout != -1) { + virEventRemoveTimeout(priv->mdevctlTimeout); + priv->mdevctlTimeout = -1; + } + + if (priv->watch) { + virEventRemoveHandle(priv->watch); + priv->watch = -1; + } } + + if (priv->workerPool) + virThreadPoolDrain(priv->workerPool); return 0; } @@ -2353,6 +2463,16 @@ nodeStateInitialize(bool privileged, driver->parserCallbacks.postParse = nodeDeviceDefPostParse; driver->parserCallbacks.validate = nodeDeviceDefValidate; + /* must be initialized before trying to reconnect to all the running mdevs + * since there might occur some mdevctl monitor events that will be + * dispatched to the worker pool */ + priv->workerPool = virThreadPoolNewFull(1, 1, 0, nodeDeviceEventHandler, + "nodev-device-event", + NULL, + driver); + if (!priv->workerPool) + goto unlock; + if (udevPCITranslateInit(privileged) < 0) goto unlock; @@ -2410,13 +2530,7 @@ nodeStateInitialize(bool privileged, if (udevSetupSystemDev() != 0) goto cleanup; - priv->initThread = g_new0(virThread, 1); - if (virThreadCreateFull(priv->initThread, true, nodeStateInitializeEnumerate, - "nodedev-init", false, udev) < 0) { - virReportSystemError(errno, "%s", - _("failed to create udev enumerate thread")); - goto cleanup; - } + nodeDeviceEventSubmit(NODE_DEVICE_EVENT_INIT, udev_ref(udev), (virFreeCallback)udev_unref); return VIR_DRV_STATE_INIT_COMPLETE; -- 2.34.1 _______________________________________________ Devel mailing list -- devel@xxxxxxxxxxxxxxxxx To unsubscribe send an email to devel-leave@xxxxxxxxxxxxxxxxx