Re: [PATCH] softirq: fix memory corruption when freeing tasklet_struct

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Hello,

The following is a draft patch which implements atomic workqueues and
converts dm-crypt to use them instead of tasklets. It's an early draft and
very lightly tested but seems to work more or less. It's on top of
wq/for-6.9 + a pending patchset. The following git branch can be used for
testing.

  git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq.git wq-atomic-draft

I'll go over it to make sure all the pieces work. While it adds some
complications, it doesn't seem too bad and conversion from tasklet should be
straightforward too.

- It hooks into tasklet[_hi] for now but if we get to update all tasklet
  users, we can just repurpose the tasklet softirq slots directly.

- I thought about allowing busy-waits for flushes and cancels but it didn't
  seem necessary. Keeping them blocking has the benefit of avoiding possible
  nasty deadlocks. We can revisit if there's a need.

- Compared to tasklets, each work item goes through a bit more management
  code because I wanted to keep the code as unified as possible with regular
  threaded workqueues. That said, it's not a huge amount and my bet is that
  the difference is unlikely to be noticeable.

Thanks.

From 8224d2602ef454ca164f4added765dc4dddd5e16 Mon Sep 17 00:00:00 2001
From: Tejun Heo <tj@xxxxxxxxxx>
Date: Fri, 26 Jan 2024 13:21:42 -1000
Subject: [PATCH] workqueue: DRAFT: Implement atomic workqueue and convert
 dmcrypt to use it

---
 drivers/md/dm-crypt.c       |  36 +-----
 include/linux/workqueue.h   |   6 +
 kernel/workqueue.c          | 234 +++++++++++++++++++++++++++---------
 kernel/workqueue_internal.h |   3 +
 4 files changed, 186 insertions(+), 93 deletions(-)

diff --git a/drivers/md/dm-crypt.c b/drivers/md/dm-crypt.c
index 855b482cbff1..d375285db202 100644
--- a/drivers/md/dm-crypt.c
+++ b/drivers/md/dm-crypt.c
@@ -73,11 +73,8 @@ struct dm_crypt_io {
 	struct bio *base_bio;
 	u8 *integrity_metadata;
 	bool integrity_metadata_from_pool:1;
-	bool in_tasklet:1;
 
 	struct work_struct work;
-	struct tasklet_struct tasklet;
-
 	struct convert_context ctx;
 
 	atomic_t io_pending;
@@ -1762,7 +1759,6 @@ static void crypt_io_init(struct dm_crypt_io *io, struct crypt_config *cc,
 	io->ctx.r.req = NULL;
 	io->integrity_metadata = NULL;
 	io->integrity_metadata_from_pool = false;
-	io->in_tasklet = false;
 	atomic_set(&io->io_pending, 0);
 }
 
@@ -1771,13 +1767,6 @@ static void crypt_inc_pending(struct dm_crypt_io *io)
 	atomic_inc(&io->io_pending);
 }
 
-static void kcryptd_io_bio_endio(struct work_struct *work)
-{
-	struct dm_crypt_io *io = container_of(work, struct dm_crypt_io, work);
-
-	bio_endio(io->base_bio);
-}
-
 /*
  * One of the bios was finished. Check for completion of
  * the whole request and correctly clean up the buffer.
@@ -1800,21 +1789,6 @@ static void crypt_dec_pending(struct dm_crypt_io *io)
 		kfree(io->integrity_metadata);
 
 	base_bio->bi_status = error;
-
-	/*
-	 * If we are running this function from our tasklet,
-	 * we can't call bio_endio() here, because it will call
-	 * clone_endio() from dm.c, which in turn will
-	 * free the current struct dm_crypt_io structure with
-	 * our tasklet. In this case we need to delay bio_endio()
-	 * execution to after the tasklet is done and dequeued.
-	 */
-	if (io->in_tasklet) {
-		INIT_WORK(&io->work, kcryptd_io_bio_endio);
-		queue_work(cc->io_queue, &io->work);
-		return;
-	}
-
 	bio_endio(base_bio);
 }
 
@@ -2246,11 +2220,6 @@ static void kcryptd_crypt(struct work_struct *work)
 		kcryptd_crypt_write_convert(io);
 }
 
-static void kcryptd_crypt_tasklet(unsigned long work)
-{
-	kcryptd_crypt((struct work_struct *)work);
-}
-
 static void kcryptd_queue_crypt(struct dm_crypt_io *io)
 {
 	struct crypt_config *cc = io->cc;
@@ -2263,9 +2232,8 @@ static void kcryptd_queue_crypt(struct dm_crypt_io *io)
 		 * it is being executed with irqs disabled.
 		 */
 		if (in_hardirq() || irqs_disabled()) {
-			io->in_tasklet = true;
-			tasklet_init(&io->tasklet, kcryptd_crypt_tasklet, (unsigned long)&io->work);
-			tasklet_schedule(&io->tasklet);
+			INIT_WORK(&io->work, kcryptd_crypt);
+			queue_work(system_atomic_wq, &io->work);
 			return;
 		}
 
diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 232baea90a1d..1e4938b5b176 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -353,6 +353,7 @@ static inline unsigned int work_static(struct work_struct *work) { return 0; }
  * Documentation/core-api/workqueue.rst.
  */
 enum wq_flags {
+	WQ_ATOMIC		= 1 << 0, /* execute in softirq context */
 	WQ_UNBOUND		= 1 << 1, /* not bound to any cpu */
 	WQ_FREEZABLE		= 1 << 2, /* freeze during suspend */
 	WQ_MEM_RECLAIM		= 1 << 3, /* may be used for memory reclaim */
@@ -392,6 +393,9 @@ enum wq_flags {
 	__WQ_ORDERED		= 1 << 17, /* internal: workqueue is ordered */
 	__WQ_LEGACY		= 1 << 18, /* internal: create*_workqueue() */
 	__WQ_ORDERED_EXPLICIT	= 1 << 19, /* internal: alloc_ordered_workqueue() */
+
+	/* atomic wq only allows the following flags */
+	__WQ_ATOMIC_ALLOWS	= WQ_ATOMIC | WQ_HIGHPRI,
 };
 
 enum wq_consts {
@@ -442,6 +446,8 @@ extern struct workqueue_struct *system_unbound_wq;
 extern struct workqueue_struct *system_freezable_wq;
 extern struct workqueue_struct *system_power_efficient_wq;
 extern struct workqueue_struct *system_freezable_power_efficient_wq;
+extern struct workqueue_struct *system_atomic_wq;
+extern struct workqueue_struct *system_atomic_highpri_wq;
 
 /**
  * alloc_workqueue - allocate a workqueue
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 23740c9ed57a..2a8f21494676 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -73,7 +73,8 @@ enum worker_pool_flags {
 	 * wq_pool_attach_mutex to avoid changing binding state while
 	 * worker_attach_to_pool() is in progress.
 	 */
-	POOL_MANAGER_ACTIVE	= 1 << 0,	/* being managed */
+	POOL_ATOMIC		= 1 << 0,	/* is an atomic pool */
+	POOL_MANAGER_ACTIVE	= 1 << 1,	/* being managed */
 	POOL_DISASSOCIATED	= 1 << 2,	/* cpu can't serve workers */
 };
 
@@ -115,6 +116,14 @@ enum wq_internal_consts {
 	WQ_NAME_LEN		= 32,
 };
 
+/*
+ * We don't want to trap softirq for too long. See MAX_SOFTIRQ_TIME and
+ * MAX_SOFTIRQ_RESTART in kernel/softirq.c. These are macros because
+ * msecs_to_jiffies() can't be an initializer.
+ */
+#define ATOMIC_WORKER_JIFFIES	msecs_to_jiffies(2)
+#define ATOMIC_WORKER_RESTARTS	10
+
 /*
  * Structure fields follow one of the following exclusion rules.
  *
@@ -441,8 +450,13 @@ static bool wq_debug_force_rr_cpu = false;
 #endif
 module_param_named(debug_force_rr_cpu, wq_debug_force_rr_cpu, bool, 0644);
 
+/* the atomic worker pools */
+static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
+				     atomic_worker_pools);
+
 /* the per-cpu worker pools */
-static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS], cpu_worker_pools);
+static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
+				     cpu_worker_pools);
 
 static DEFINE_IDR(worker_pool_idr);	/* PR: idr of all pools */
 
@@ -476,8 +490,13 @@ struct workqueue_struct *system_power_efficient_wq __ro_after_init;
 EXPORT_SYMBOL_GPL(system_power_efficient_wq);
 struct workqueue_struct *system_freezable_power_efficient_wq __ro_after_init;
 EXPORT_SYMBOL_GPL(system_freezable_power_efficient_wq);
+struct workqueue_struct *system_atomic_wq;
+EXPORT_SYMBOL_GPL(system_atomic_wq);
+struct workqueue_struct *system_atomic_highpri_wq;
+EXPORT_SYMBOL_GPL(system_atomic_highpri_wq);
 
 static int worker_thread(void *__worker);
+static void atomic_worker_taskletfn(struct tasklet_struct *tasklet);
 static void workqueue_sysfs_unregister(struct workqueue_struct *wq);
 static void show_pwq(struct pool_workqueue *pwq);
 static void show_one_worker_pool(struct worker_pool *pool);
@@ -496,6 +515,11 @@ static void show_one_worker_pool(struct worker_pool *pool);
 			 !lockdep_is_held(&wq_pool_mutex),		\
 			 "RCU, wq->mutex or wq_pool_mutex should be held")
 
+#define for_each_atomic_worker_pool(pool, cpu)				\
+	for ((pool) = &per_cpu(atomic_worker_pools, cpu)[0];		\
+	     (pool) < &per_cpu(atomic_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \
+	     (pool)++)
+
 #define for_each_cpu_worker_pool(pool, cpu)				\
 	for ((pool) = &per_cpu(cpu_worker_pools, cpu)[0];		\
 	     (pool) < &per_cpu(cpu_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \
@@ -1184,6 +1208,14 @@ static bool kick_pool(struct worker_pool *pool)
 	if (!need_more_worker(pool) || !worker)
 		return false;
 
+	if (pool->flags & POOL_ATOMIC) {
+		if (pool->attrs->nice == HIGHPRI_NICE_LEVEL)
+			tasklet_hi_schedule(&worker->atomic_tasklet);
+		else
+			tasklet_schedule(&worker->atomic_tasklet);
+		return true;
+	}
+
 	p = worker->task;
 
 #ifdef CONFIG_SMP
@@ -1663,8 +1695,15 @@ static bool pwq_tryinc_nr_active(struct pool_workqueue *pwq, bool fill)
 	lockdep_assert_held(&pool->lock);
 
 	if (!nna) {
-		/* per-cpu workqueue, pwq->nr_active is sufficient */
-		obtained = pwq->nr_active < READ_ONCE(wq->max_active);
+		/*
+		 * An atomic workqueue always has a single worker per-cpu and
+		 * doesn't impose additional max_active limit. For a per-cpu
+		 * workqueue, checking pwq->nr_active is sufficient.
+		 */
+		if (wq->flags & WQ_ATOMIC)
+			obtained = true;
+		else
+			obtained = pwq->nr_active < READ_ONCE(wq->max_active);
 		goto out;
 	}
 
@@ -2591,27 +2630,31 @@ static struct worker *create_worker(struct worker_pool *pool)
 
 	worker->id = id;
 
-	if (pool->cpu >= 0)
-		snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
-			 pool->attrs->nice < 0  ? "H" : "");
-	else
-		snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);
-
-	worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
-					      "kworker/%s", id_buf);
-	if (IS_ERR(worker->task)) {
-		if (PTR_ERR(worker->task) == -EINTR) {
-			pr_err("workqueue: Interrupted when creating a worker thread \"kworker/%s\"\n",
-			       id_buf);
-		} else {
-			pr_err_once("workqueue: Failed to create a worker thread: %pe",
-				    worker->task);
+	if (pool->flags & POOL_ATOMIC) {
+		tasklet_setup(&worker->atomic_tasklet, atomic_worker_taskletfn);
+	} else {
+		if (pool->cpu >= 0)
+			snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
+				 pool->attrs->nice < 0  ? "H" : "");
+		else
+			snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);
+
+		worker->task = kthread_create_on_node(worker_thread, worker,
+					pool->node, "kworker/%s", id_buf);
+		if (IS_ERR(worker->task)) {
+			if (PTR_ERR(worker->task) == -EINTR) {
+				pr_err("workqueue: Interrupted when creating a worker thread \"kworker/%s\"\n",
+				       id_buf);
+			} else {
+				pr_err_once("workqueue: Failed to create a worker thread: %pe",
+					    worker->task);
+			}
+			goto fail;
 		}
-		goto fail;
-	}
 
-	set_user_nice(worker->task, pool->attrs->nice);
-	kthread_bind_mask(worker->task, pool_allowed_cpus(pool));
+		set_user_nice(worker->task, pool->attrs->nice);
+		kthread_bind_mask(worker->task, pool_allowed_cpus(pool));
+	}
 
 	/* successful, attach the worker to the pool */
 	worker_attach_to_pool(worker, pool);
@@ -2627,7 +2670,8 @@ static struct worker *create_worker(struct worker_pool *pool)
 	 * check if not woken up soon. As kick_pool() is noop if @pool is empty,
 	 * wake it up explicitly.
 	 */
-	wake_up_process(worker->task);
+	if (worker->task)
+		wake_up_process(worker->task);
 
 	raw_spin_unlock_irq(&pool->lock);
 
@@ -3043,25 +3087,35 @@ __acquires(&pool->lock)
 	lock_map_release(&lockdep_map);
 	lock_map_release(&pwq->wq->lockdep_map);
 
-	if (unlikely(in_atomic() || lockdep_depth(current) > 0 ||
-		     rcu_preempt_depth() > 0)) {
-		pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d/%d\n"
-		       "     last function: %ps\n",
-		       current->comm, preempt_count(), rcu_preempt_depth(),
-		       task_pid_nr(current), worker->current_func);
-		debug_show_held_locks(current);
-		dump_stack();
-	}
+	if (worker->task) {
+		if (unlikely(in_atomic() || lockdep_depth(current) > 0 ||
+			     rcu_preempt_depth() > 0)) {
+			pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d/%d\n"
+			       "     last function: %ps\n",
+			       current->comm, preempt_count(),
+			       rcu_preempt_depth(), task_pid_nr(current),
+			       worker->current_func);
+			debug_show_held_locks(current);
+			dump_stack();
+		}
 
-	/*
-	 * The following prevents a kworker from hogging CPU on !PREEMPTION
-	 * kernels, where a requeueing work item waiting for something to
-	 * happen could deadlock with stop_machine as such work item could
-	 * indefinitely requeue itself while all other CPUs are trapped in
-	 * stop_machine. At the same time, report a quiescent RCU state so
-	 * the same condition doesn't freeze RCU.
-	 */
-	cond_resched();
+		/*
+		 * The following prevents a kworker from hogging CPU on
+		 * !PREEMPTION kernels, where a requeueing work item waiting for
+		 * something to happen could deadlock with stop_machine as such
+		 * work item could indefinitely requeue itself while all other
+		 * CPUs are trapped in stop_machine. At the same time, report a
+		 * quiescent RCU state so the same condition doesn't freeze RCU.
+		 */
+		if (worker->task)
+			cond_resched();
+	} else {
+		if (unlikely(lockdep_depth(current) > 0)) {
+			pr_err("BUG: atomic workqueue leaked lock: last function: %ps\n",
+			       worker->current_func);
+			debug_show_held_locks(current);
+		}
+	}
 
 	raw_spin_lock_irq(&pool->lock);
 
@@ -3344,6 +3398,44 @@ static int rescuer_thread(void *__rescuer)
 	goto repeat;
 }
 
+static void atomic_worker_taskletfn(struct tasklet_struct *tasklet)
+{
+	struct worker *worker =
+		container_of(tasklet, struct worker, atomic_tasklet);
+	struct worker_pool *pool = worker->pool;
+	int nr_restarts = ATOMIC_WORKER_RESTARTS;
+	unsigned long end = jiffies + ATOMIC_WORKER_JIFFIES;
+
+	raw_spin_lock_irq(&pool->lock);
+	worker_leave_idle(worker);
+
+	/*
+	 * This function follows the structure of worker_thread(). See there for
+	 * explanations on each step. As there, skip straight to idling when
+	 * need_more_worker() is false instead of touching the worklist.
+	 */
+	if (!need_more_worker(pool))
+		goto done;
+
+	WARN_ON_ONCE(!list_empty(&worker->scheduled));
+	worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);
+
+	do {
+		struct work_struct *work = list_first_entry(&pool->worklist,
+						struct work_struct, entry);
+
+		if (assign_work(work, worker, NULL))
+			process_scheduled_works(worker);
+	} while (--nr_restarts && time_before(jiffies, end) &&
+		 keep_working(pool));
+
+	worker_set_flags(worker, WORKER_PREP);
+done:
+	worker_enter_idle(worker);
+	kick_pool(pool);
+	raw_spin_unlock_irq(&pool->lock);
+}
+
 /**
  * check_flush_dependency - check for flush dependency sanity
  * @target_wq: workqueue being flushed
@@ -5149,6 +5241,13 @@ struct workqueue_struct *alloc_workqueue(const char *fmt,
 	size_t wq_size;
 	int name_len;
 
+	if (flags & WQ_ATOMIC) {
+		if (WARN_ON_ONCE(flags & ~__WQ_ATOMIC_ALLOWS))
+			return NULL;
+		if (WARN_ON_ONCE(max_active))
+			return NULL;
+	}
+
 	/*
 	 * Unbound && max_active == 1 used to imply ordered, which is no longer
 	 * the case on many machines due to per-pod pools. While
@@ -7094,6 +7193,22 @@ static void __init restrict_unbound_cpumask(const char *name, const struct cpuma
 	cpumask_and(wq_unbound_cpumask, wq_unbound_cpumask, mask);
 }
 
+static void __init init_cpu_worker_pool(struct worker_pool *pool, int cpu, int nice)
+{
+	BUG_ON(init_worker_pool(pool));
+	pool->cpu = cpu;
+	cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
+	cpumask_copy(pool->attrs->__pod_cpumask, cpumask_of(cpu));
+	pool->attrs->nice = nice;
+	pool->attrs->affn_strict = true;
+	pool->node = cpu_to_node(cpu);
+
+	/* alloc pool ID */
+	mutex_lock(&wq_pool_mutex);
+	BUG_ON(worker_pool_assign_id(pool));
+	mutex_unlock(&wq_pool_mutex);
+}
+
 /**
  * workqueue_init_early - early init for workqueue subsystem
  *
@@ -7149,25 +7264,19 @@ void __init workqueue_init_early(void)
 	pt->pod_node[0] = NUMA_NO_NODE;
 	pt->cpu_pod[0] = 0;
 
-	/* initialize CPU pools */
+	/* initialize atomic and CPU pools */
 	for_each_possible_cpu(cpu) {
 		struct worker_pool *pool;
 
 		i = 0;
-		for_each_cpu_worker_pool(pool, cpu) {
-			BUG_ON(init_worker_pool(pool));
-			pool->cpu = cpu;
-			cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
-			cpumask_copy(pool->attrs->__pod_cpumask, cpumask_of(cpu));
-			pool->attrs->nice = std_nice[i++];
-			pool->attrs->affn_strict = true;
-			pool->node = cpu_to_node(cpu);
-
-			/* alloc pool ID */
-			mutex_lock(&wq_pool_mutex);
-			BUG_ON(worker_pool_assign_id(pool));
-			mutex_unlock(&wq_pool_mutex);
+		for_each_atomic_worker_pool(pool, cpu) {
+			init_cpu_worker_pool(pool, cpu, std_nice[i++]);
+			pool->flags |= POOL_ATOMIC;
 		}
+
+		i = 0;
+		for_each_cpu_worker_pool(pool, cpu)
+			init_cpu_worker_pool(pool, cpu, std_nice[i++]);
 	}
 
 	/* create default unbound and ordered wq attrs */
@@ -7200,10 +7309,14 @@ void __init workqueue_init_early(void)
 	system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_pwr_efficient",
 					      WQ_FREEZABLE | WQ_POWER_EFFICIENT,
 					      0);
+	system_atomic_wq = alloc_workqueue("system_atomic_wq", WQ_ATOMIC, 0);
+	system_atomic_highpri_wq = alloc_workqueue("system_atomic_highpri_wq",
+						   WQ_ATOMIC | WQ_HIGHPRI, 0);
 	BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq ||
 	       !system_unbound_wq || !system_freezable_wq ||
 	       !system_power_efficient_wq ||
-	       !system_freezable_power_efficient_wq);
+	       !system_freezable_power_efficient_wq ||
+	       !system_atomic_wq || !system_atomic_highpri_wq);
 }
 
 static void __init wq_cpu_intensive_thresh_init(void)
@@ -7269,9 +7382,10 @@ void __init workqueue_init(void)
 	 * up. Also, create a rescuer for workqueues that requested it.
 	 */
 	for_each_possible_cpu(cpu) {
-		for_each_cpu_worker_pool(pool, cpu) {
+		for_each_atomic_worker_pool(pool, cpu)
+			pool->node = cpu_to_node(cpu);
+		for_each_cpu_worker_pool(pool, cpu)
 			pool->node = cpu_to_node(cpu);
-		}
 	}
 
 	list_for_each_entry(wq, &workqueues, list) {
@@ -7284,6 +7398,8 @@ void __init workqueue_init(void)
 
 	/* create the initial workers */
 	for_each_online_cpu(cpu) {
+		for_each_atomic_worker_pool(pool, cpu)
+			BUG_ON(!create_worker(pool));
 		for_each_cpu_worker_pool(pool, cpu) {
 			pool->flags &= ~POOL_DISASSOCIATED;
 			BUG_ON(!create_worker(pool));
diff --git a/kernel/workqueue_internal.h b/kernel/workqueue_internal.h
index f6275944ada7..f65f204f38ea 100644
--- a/kernel/workqueue_internal.h
+++ b/kernel/workqueue_internal.h
@@ -10,6 +10,7 @@
 
 #include <linux/workqueue.h>
 #include <linux/kthread.h>
+#include <linux/interrupt.h>
 #include <linux/preempt.h>
 
 struct worker_pool;
@@ -42,6 +43,8 @@ struct worker {
 	struct list_head	scheduled;	/* L: scheduled works */
 
 	struct task_struct	*task;		/* I: worker task */
+	struct tasklet_struct	atomic_tasklet;	/* I: tasklet for atomic pool */
+
 	struct worker_pool	*pool;		/* A: the associated pool */
 						/* L: for rescuers */
 	struct list_head	node;		/* A: anchored at pool->workers */
-- 
2.43.0





[Index of Archives]     [DM Crypt]     [Fedora Desktop]     [ATA RAID]     [Fedora Marketing]     [Fedora Packaging]     [Fedora SELinux]     [Yosemite Discussion]     [KDE Users]     [Fedora Docs]

  Powered by Linux