> The following is a draft patch which implements atomic workqueues and
> converts dm-crypt to use them instead of tasklets. It's an early draft and
> very lightly tested, but it seems to work more or less. It's on top of
> wq/for6.9 + a pending patchset. The following git branch can be used for
> testing.
>
> git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq.git wq-atomic-draft
>
> I'll go over it to make sure all the pieces work. While it adds some
> complications, it doesn't seem too bad, and conversion from tasklet should
> be straightforward too.
>
> - It hooks into tasklet[_hi] for now, but if we get to update all of the
>   tasklet users, we can just repurpose the tasklet softirq slots directly.
>
> - I thought about allowing busy-waits for flushes and cancels, but it
>   didn't seem necessary. Keeping them blocking has the benefit of avoiding
>   possible nasty deadlocks. We can revisit if there's a need.
>
> - Compared to tasklet, each work item goes through a bit more management
>   code, because I wanted to keep the code as unified as possible with
>   regular threaded workqueues. That said, it's not a huge amount, and my
>   bet is that the difference is unlikely to be noticeable.
>
> Thanks.
>
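For other tasklet users following along, the conversion implied here seems
to map one-to-one onto the existing tasklet API. A minimal before/after
sketch (mine, not from the patch; "md" and the mydrv_* callbacks are
invented for illustration):

        /* before: deferring to softirq context via a tasklet */
        tasklet_setup(&md->tasklet, mydrv_tasklet_fn);
        tasklet_schedule(&md->tasklet);          /* or tasklet_hi_schedule() */
        tasklet_kill(&md->tasklet);              /* on teardown */

        /* after: same softirq-context execution via an atomic workqueue */
        INIT_WORK(&md->work, mydrv_work_fn);
        queue_work(system_atomic_wq, &md->work); /* or system_atomic_highpri_wq */
        cancel_work_sync(&md->work);             /* on teardown; blocks */

The teardown call blocking rather than busy-waiting is the one behavioral
point to keep in mind, per the second bullet above.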
> From 8224d2602ef454ca164f4added765dc4dddd5e16 Mon Sep 17 00:00:00 2001
> From: Tejun Heo <tj@xxxxxxxxxx>
> Date: Fri, 26 Jan 2024 13:21:42 -1000
> Subject: [PATCH] workqueue: DRAFT: Implement atomic workqueue and convert
>  dmcrypt to use it
>
> ---
>  drivers/md/dm-crypt.c       |  36 +-----
>  include/linux/workqueue.h   |   6 +
>  kernel/workqueue.c          | 234 +++++++++++++++++++++++++++---------
>  kernel/workqueue_internal.h |   3 +
>  4 files changed, 186 insertions(+), 93 deletions(-)
>
> diff --git a/drivers/md/dm-crypt.c b/drivers/md/dm-crypt.c
> index 855b482cbff1..d375285db202 100644
> --- a/drivers/md/dm-crypt.c
> +++ b/drivers/md/dm-crypt.c
> @@ -73,11 +73,8 @@ struct dm_crypt_io {
>          struct bio *base_bio;
>          u8 *integrity_metadata;
>          bool integrity_metadata_from_pool:1;
> -        bool in_tasklet:1;
>
>          struct work_struct work;
> -        struct tasklet_struct tasklet;
> -
>          struct convert_context ctx;
>
>          atomic_t io_pending;
>
> @@ -1762,7 +1759,6 @@ static void crypt_io_init(struct dm_crypt_io *io, struct crypt_config *cc,
>          io->ctx.r.req = NULL;
>          io->integrity_metadata = NULL;
>          io->integrity_metadata_from_pool = false;
> -        io->in_tasklet = false;
>          atomic_set(&io->io_pending, 0);
>  }
>
> @@ -1771,13 +1767,6 @@ static void crypt_inc_pending(struct dm_crypt_io *io)
>          atomic_inc(&io->io_pending);
>  }
>
> -static void kcryptd_io_bio_endio(struct work_struct *work)
> -{
> -        struct dm_crypt_io *io = container_of(work, struct dm_crypt_io, work);
> -
> -        bio_endio(io->base_bio);
> -}
> -
>  /*
>   * One of the bios was finished. Check for completion of
>   * the whole request and correctly clean up the buffer.
>   */
> @@ -1800,21 +1789,6 @@ static void crypt_dec_pending(struct dm_crypt_io *io)
>          kfree(io->integrity_metadata);
>
>          base_bio->bi_status = error;
> -
> -        /*
> -         * If we are running this function from our tasklet,
> -         * we can't call bio_endio() here, because it will call
> -         * clone_endio() from dm.c, which in turn will
> -         * free the current struct dm_crypt_io structure with
> -         * our tasklet. In this case we need to delay bio_endio()
> -         * execution to after the tasklet is done and dequeued.
> -         */
> -        if (io->in_tasklet) {
> -                INIT_WORK(&io->work, kcryptd_io_bio_endio);
> -                queue_work(cc->io_queue, &io->work);
> -                return;
> -        }
> -
>          bio_endio(base_bio);
>  }
>
> @@ -2246,11 +2220,6 @@ static void kcryptd_crypt(struct work_struct *work)
>                  kcryptd_crypt_write_convert(io);
>  }
>
> -static void kcryptd_crypt_tasklet(unsigned long work)
> -{
> -        kcryptd_crypt((struct work_struct *)work);
> -}
> -
>  static void kcryptd_queue_crypt(struct dm_crypt_io *io)
>  {
>          struct crypt_config *cc = io->cc;
> @@ -2263,9 +2232,8 @@ static void kcryptd_queue_crypt(struct dm_crypt_io *io)
>           * it is being executed with irqs disabled.
>           */
>          if (in_hardirq() || irqs_disabled()) {
> -                io->in_tasklet = true;
> -                tasklet_init(&io->tasklet, kcryptd_crypt_tasklet, (unsigned long)&io->work);
> -                tasklet_schedule(&io->tasklet);
> +                INIT_WORK(&io->work, kcryptd_crypt);
> +                queue_work(system_atomic_wq, &io->work);
>                  return;
>          }
>
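The kcryptd_queue_crypt() hunk above captures the usage pattern nicely:
when the caller may be in hard-irq context or have irqs disabled, punt to
system_atomic_wq and the work item runs in softirq context, IIUC on the
same CPU it was queued from, just as a tasklet would. The same pattern in a
plain interrupt handler would look something like this (sketch only; the
"mydrv" names are invented):

        static irqreturn_t mydrv_irq(int irq, void *data)
        {
                struct mydrv *md = data;

                /* can't block here; defer the heavy lifting to softirq context */
                queue_work(system_atomic_wq, &md->done_work);
                return IRQ_HANDLED;
        }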
> * > @@ -441,8 +450,13 @@ static bool wq_debug_force_rr_cpu = false; > #endif > module_param_named(debug_force_rr_cpu, wq_debug_force_rr_cpu, bool, 0644); > > +/* the atomic worker pools */ > +static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS], > + atomic_worker_pools); > + > /* the per-cpu worker pools */ > -static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS], cpu_worker_pools); > +static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS], > + cpu_worker_pools); > > static DEFINE_IDR(worker_pool_idr); /* PR: idr of all pools */ > > @@ -476,8 +490,13 @@ struct workqueue_struct *system_power_efficient_wq __ro_after_init; > EXPORT_SYMBOL_GPL(system_power_efficient_wq); > struct workqueue_struct *system_freezable_power_efficient_wq __ro_after_init; > EXPORT_SYMBOL_GPL(system_freezable_power_efficient_wq); > +struct workqueue_struct *system_atomic_wq; > +EXPORT_SYMBOL_GPL(system_atomic_wq); > +struct workqueue_struct *system_atomic_highpri_wq; > +EXPORT_SYMBOL_GPL(system_atomic_highpri_wq); > > static int worker_thread(void *__worker); > +static void atomic_worker_taskletfn(struct tasklet_struct *tasklet); > static void workqueue_sysfs_unregister(struct workqueue_struct *wq); > static void show_pwq(struct pool_workqueue *pwq); > static void show_one_worker_pool(struct worker_pool *pool); > @@ -496,6 +515,11 @@ static void show_one_worker_pool(struct worker_pool *pool); > !lockdep_is_held(&wq_pool_mutex), \ > "RCU, wq->mutex or wq_pool_mutex should be held") > > +#define for_each_atomic_worker_pool(pool, cpu) \ > + for ((pool) = &per_cpu(atomic_worker_pools, cpu)[0]; \ > + (pool) < &per_cpu(atomic_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \ > + (pool)++) > + > #define for_each_cpu_worker_pool(pool, cpu) \ > for ((pool) = &per_cpu(cpu_worker_pools, cpu)[0]; \ > (pool) < &per_cpu(cpu_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \ > @@ -1184,6 +1208,14 @@ static bool kick_pool(struct worker_pool *pool) > if (!need_more_worker(pool) || !worker) > return false; > > + if (pool->flags & POOL_ATOMIC) { > + if (pool->attrs->nice == HIGHPRI_NICE_LEVEL) > + tasklet_hi_schedule(&worker->atomic_tasklet); > + else > + tasklet_schedule(&worker->atomic_tasklet); > + return true; > + } > + > p = worker->task; Tejun, I rushed to reply to the draft patch you sent, I should have looked harder. My apologies. The idea that I have been working on is to completely move away from using tasklets. Essentially, "get rid of tasklets entirely in the kernel". So, the use of tasklet_schedule() & tasklet_hi_schedule() will have to go. I have a very hacky draft that is still wip. I am going to borrow many bits from your patch which makes the work I have better. Perhaps we should start a separate thread, thoughts? Thanks. > > #ifdef CONFIG_SMP > @@ -1663,8 +1695,15 @@ static bool pwq_tryinc_nr_active(struct pool_workqueue *pwq, bool fill) > lockdep_assert_held(&pool->lock); > > if (!nna) { > - /* per-cpu workqueue, pwq->nr_active is sufficient */ > - obtained = pwq->nr_active < READ_ONCE(wq->max_active); > + /* > + * An atomic workqueue always have a single worker per-cpu and > + * doesn't impose additional max_active limit. For a per-cpu > + * workqueue, checking pwq->nr_active is sufficient. 
>
>  #ifdef CONFIG_SMP
> @@ -1663,8 +1695,15 @@ static bool pwq_tryinc_nr_active(struct pool_workqueue *pwq, bool fill)
>          lockdep_assert_held(&pool->lock);
>
>          if (!nna) {
> -                /* per-cpu workqueue, pwq->nr_active is sufficient */
> -                obtained = pwq->nr_active < READ_ONCE(wq->max_active);
> +                /*
> +                 * An atomic workqueue always has a single worker per CPU
> +                 * and doesn't impose an additional max_active limit. For a
> +                 * per-cpu workqueue, checking pwq->nr_active is sufficient.
> +                 */
> +                if (wq->flags & WQ_ATOMIC)
> +                        obtained = true;
> +                else
> +                        obtained = pwq->nr_active < READ_ONCE(wq->max_active);
>                  goto out;
>          }
>
> @@ -2591,27 +2630,31 @@ static struct worker *create_worker(struct worker_pool *pool)
>
>          worker->id = id;
>
> -        if (pool->cpu >= 0)
> -                snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
> -                         pool->attrs->nice < 0 ? "H" : "");
> -        else
> -                snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);
> -
> -        worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
> -                                              "kworker/%s", id_buf);
> -        if (IS_ERR(worker->task)) {
> -                if (PTR_ERR(worker->task) == -EINTR) {
> -                        pr_err("workqueue: Interrupted when creating a worker thread \"kworker/%s\"\n",
> -                               id_buf);
> -                } else {
> -                        pr_err_once("workqueue: Failed to create a worker thread: %pe",
> -                                    worker->task);
> +        if (pool->flags & POOL_ATOMIC) {
> +                tasklet_setup(&worker->atomic_tasklet, atomic_worker_taskletfn);
> +        } else {
> +                if (pool->cpu >= 0)
> +                        snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
> +                                 pool->attrs->nice < 0 ? "H" : "");
> +                else
> +                        snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);
> +
> +                worker->task = kthread_create_on_node(worker_thread, worker,
> +                                                      pool->node, "kworker/%s", id_buf);
> +                if (IS_ERR(worker->task)) {
> +                        if (PTR_ERR(worker->task) == -EINTR) {
> +                                pr_err("workqueue: Interrupted when creating a worker thread \"kworker/%s\"\n",
> +                                       id_buf);
> +                        } else {
> +                                pr_err_once("workqueue: Failed to create a worker thread: %pe",
> +                                            worker->task);
> +                        }
> +                        goto fail;
>                  }
> -                goto fail;
> -        }
>
> -        set_user_nice(worker->task, pool->attrs->nice);
> -        kthread_bind_mask(worker->task, pool_allowed_cpus(pool));
> +                set_user_nice(worker->task, pool->attrs->nice);
> +                kthread_bind_mask(worker->task, pool_allowed_cpus(pool));
> +        }
>
>          /* successful, attach the worker to the pool */
>          worker_attach_to_pool(worker, pool);
> @@ -2627,7 +2670,8 @@ static struct worker *create_worker(struct worker_pool *pool)
>           * check if not woken up soon. As kick_pool() is noop if @pool is empty,
>           * wake it up explicitly.
>           */
> -        wake_up_process(worker->task);
> +        if (worker->task)
> +                wake_up_process(worker->task);
>
>          raw_spin_unlock_irq(&pool->lock);
>
> @@ -3043,25 +3087,35 @@ __acquires(&pool->lock)
>          lock_map_release(&lockdep_map);
>          lock_map_release(&pwq->wq->lockdep_map);
>
> -        if (unlikely(in_atomic() || lockdep_depth(current) > 0 ||
> -                     rcu_preempt_depth() > 0)) {
> -                pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d/%d\n"
> -                       "      last function: %ps\n",
> -                       current->comm, preempt_count(), rcu_preempt_depth(),
> -                       task_pid_nr(current), worker->current_func);
> -                debug_show_held_locks(current);
> -                dump_stack();
> -        }
> +        if (worker->task) {
> +                if (unlikely(in_atomic() || lockdep_depth(current) > 0 ||
> +                             rcu_preempt_depth() > 0)) {
> +                        pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d/%d\n"
> +                               "      last function: %ps\n",
> +                               current->comm, preempt_count(),
> +                               rcu_preempt_depth(), task_pid_nr(current),
> +                               worker->current_func);
> +                        debug_show_held_locks(current);
> +                        dump_stack();
> +                }
>
> -        /*
> -         * The following prevents a kworker from hogging CPU on !PREEMPTION
> -         * kernels, where a requeueing work item waiting for something to
> -         * happen could deadlock with stop_machine as such work item could
> -         * indefinitely requeue itself while all other CPUs are trapped in
> -         * stop_machine. At the same time, report a quiescent RCU state so
> -         * the same condition doesn't freeze RCU.
> -         */
> -        cond_resched();
> +                /*
> +                 * The following prevents a kworker from hogging CPU on
> +                 * !PREEMPTION kernels, where a requeueing work item waiting for
> +                 * something to happen could deadlock with stop_machine as such
> +                 * work item could indefinitely requeue itself while all other
> +                 * CPUs are trapped in stop_machine. At the same time, report a
> +                 * quiescent RCU state so the same condition doesn't freeze RCU.
> +                 */
> +                if (worker->task)
> +                        cond_resched();
> +        } else {
> +                if (unlikely(lockdep_depth(current) > 0)) {
> +                        pr_err("BUG: atomic workqueue leaked lock: last function: %ps\n",
> +                               worker->current_func);
> +                        debug_show_held_locks(current);
> +                }
> +        }
>
>          raw_spin_lock_irq(&pool->lock);
>
> @@ -3344,6 +3398,44 @@ static int rescuer_thread(void *__rescuer)
>          goto repeat;
>  }
>
> +void atomic_worker_taskletfn(struct tasklet_struct *tasklet)
> +{
> +        struct worker *worker =
> +                container_of(tasklet, struct worker, atomic_tasklet);
> +        struct worker_pool *pool = worker->pool;
> +        int nr_restarts = ATOMIC_WORKER_RESTARTS;
> +        unsigned long end = jiffies + ATOMIC_WORKER_JIFFIES;
> +
> +        raw_spin_lock_irq(&pool->lock);
> +        worker_leave_idle(worker);
> +
> +        /*
> +         * This function follows the structure of worker_thread(). See there for
> +         * explanations on each step.
> +         */
> +        if (!need_more_worker(pool))
> +                goto done;
> +
> +        WARN_ON_ONCE(!list_empty(&worker->scheduled));
> +        worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);
> +
> +        do {
> +                struct work_struct *work =
> +                        list_first_entry(&pool->worklist,
> +                                         struct work_struct, entry);
> +
> +                if (assign_work(work, worker, NULL))
> +                        process_scheduled_works(worker);
> +        } while (--nr_restarts && time_before(jiffies, end) &&
> +                 keep_working(pool));
> +
> +        worker_set_flags(worker, WORKER_PREP);
> +done:
> +        worker_enter_idle(worker);
> +        kick_pool(pool);
> +        raw_spin_unlock_irq(&pool->lock);
> +}
> +
>  /**
>   * check_flush_dependency - check for flush dependency sanity
>   * @target_wq: workqueue being flushed
> @@ -5149,6 +5241,13 @@ struct workqueue_struct *alloc_workqueue(const char *fmt,
>          size_t wq_size;
>          int name_len;
>
> +        if (flags & WQ_ATOMIC) {
> +                if (WARN_ON_ONCE(flags & ~__WQ_ATOMIC_ALLOWS))
> +                        return NULL;
> +                if (WARN_ON_ONCE(max_active))
> +                        return NULL;
> +        }
> +
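These checks also document the contract for anyone allocating a dedicated
atomic workqueue rather than using the system ones: no flags beyond
WQ_HIGHPRI, and max_active must be 0. If I am reading the draft right,
usage would look like this ("mydrv_atomic" is an invented name):

        struct workqueue_struct *wq;

        wq = alloc_workqueue("mydrv_atomic", WQ_ATOMIC | WQ_HIGHPRI, 0);
        if (!wq)
                return -ENOMEM;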
>          /*
>           * Unbound && max_active == 1 used to imply ordered, which is no longer
>           * the case on many machines due to per-pod pools. While
> @@ -7094,6 +7193,22 @@ static void __init restrict_unbound_cpumask(const char *name, const struct cpuma
>          cpumask_and(wq_unbound_cpumask, wq_unbound_cpumask, mask);
>  }
>
> +static void __init init_cpu_worker_pool(struct worker_pool *pool, int cpu, int nice)
> +{
> +        BUG_ON(init_worker_pool(pool));
> +        pool->cpu = cpu;
> +        cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
> +        cpumask_copy(pool->attrs->__pod_cpumask, cpumask_of(cpu));
> +        pool->attrs->nice = nice;
> +        pool->attrs->affn_strict = true;
> +        pool->node = cpu_to_node(cpu);
> +
> +        /* alloc pool ID */
> +        mutex_lock(&wq_pool_mutex);
> +        BUG_ON(worker_pool_assign_id(pool));
> +        mutex_unlock(&wq_pool_mutex);
> +}
> +
>  /**
>   * workqueue_init_early - early init for workqueue subsystem
>   *
> @@ -7149,25 +7264,19 @@ void __init workqueue_init_early(void)
>                  pt->pod_node[0] = NUMA_NO_NODE;
>                  pt->cpu_pod[0] = 0;
>
> -        /* initialize CPU pools */
> +        /* initialize atomic and CPU pools */
>          for_each_possible_cpu(cpu) {
>                  struct worker_pool *pool;
>
>                  i = 0;
> -                for_each_cpu_worker_pool(pool, cpu) {
> -                        BUG_ON(init_worker_pool(pool));
> -                        pool->cpu = cpu;
> -                        cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
> -                        cpumask_copy(pool->attrs->__pod_cpumask, cpumask_of(cpu));
> -                        pool->attrs->nice = std_nice[i++];
> -                        pool->attrs->affn_strict = true;
> -                        pool->node = cpu_to_node(cpu);
> -
> -                        /* alloc pool ID */
> -                        mutex_lock(&wq_pool_mutex);
> -                        BUG_ON(worker_pool_assign_id(pool));
> -                        mutex_unlock(&wq_pool_mutex);
> +                for_each_atomic_worker_pool(pool, cpu) {
> +                        init_cpu_worker_pool(pool, cpu, std_nice[i++]);
> +                        pool->flags |= POOL_ATOMIC;
>                  }
> +
> +                i = 0;
> +                for_each_cpu_worker_pool(pool, cpu)
> +                        init_cpu_worker_pool(pool, cpu, std_nice[i++]);
>          }
>
>          /* create default unbound and ordered wq attrs */
> @@ -7200,10 +7309,14 @@ void __init workqueue_init_early(void)
>          system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_pwr_efficient",
>                                                                WQ_FREEZABLE | WQ_POWER_EFFICIENT,
>                                                                0);
> +        system_atomic_wq = alloc_workqueue("system_atomic_wq", WQ_ATOMIC, 0);
> +        system_atomic_highpri_wq = alloc_workqueue("system_atomic_highpri_wq",
> +                                                   WQ_ATOMIC | WQ_HIGHPRI, 0);
>          BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq ||
>                 !system_unbound_wq || !system_freezable_wq ||
>                 !system_power_efficient_wq ||
> -               !system_freezable_power_efficient_wq);
> +               !system_freezable_power_efficient_wq ||
> +               !system_atomic_wq || !system_atomic_highpri_wq);
>  }
>
>  static void __init wq_cpu_intensive_thresh_init(void)
> @@ -7269,9 +7382,10 @@ void __init workqueue_init(void)
>           * up. Also, create a rescuer for workqueues that requested it.
>           */
>          for_each_possible_cpu(cpu) {
> -                for_each_cpu_worker_pool(pool, cpu) {
> +                for_each_atomic_worker_pool(pool, cpu)
> +                        pool->node = cpu_to_node(cpu);
> +                for_each_cpu_worker_pool(pool, cpu)
>                          pool->node = cpu_to_node(cpu);
> -                }
>          }
>
>          list_for_each_entry(wq, &workqueues, list) {
> @@ -7284,6 +7398,8 @@ void __init workqueue_init(void)
>
>          /* create the initial workers */
>          for_each_online_cpu(cpu) {
> +                for_each_atomic_worker_pool(pool, cpu)
> +                        BUG_ON(!create_worker(pool));
>                  for_each_cpu_worker_pool(pool, cpu) {
>                          pool->flags &= ~POOL_DISASSOCIATED;
>                          BUG_ON(!create_worker(pool));
> diff --git a/kernel/workqueue_internal.h b/kernel/workqueue_internal.h
> index f6275944ada7..f65f204f38ea 100644
> --- a/kernel/workqueue_internal.h
> +++ b/kernel/workqueue_internal.h
> @@ -10,6 +10,7 @@
>
>  #include <linux/workqueue.h>
>  #include <linux/kthread.h>
> +#include <linux/interrupt.h>
>  #include <linux/preempt.h>
>
>  struct worker_pool;
> @@ -42,6 +43,8 @@ struct worker {
>          struct list_head        scheduled;      /* L: scheduled works */
>
>          struct task_struct      *task;          /* I: worker task */
> +        struct tasklet_struct   atomic_tasklet; /* I: tasklet for atomic pool */
> +
>          struct worker_pool      *pool;          /* A: the associated pool */
>                                                  /* L: for rescuers */
>          struct list_head        node;           /* A: anchored at pool->workers */
> --
> 2.43.0
>
>