Re: [PATCH] softirq: fix memory corruption when freeing tasklet_struct

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



>
> The following is a draft patch which implements atomic workqueues and
> convert dm-crypt to use it instead of tasklet. It's an early draft and very
> lightly tested but seems to work more or less. It's on top of wq/for6.9 + a
> pending patchset. The following git branch can be used for testing.
>
>   git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq.git wq-atomic-draft
>
> I'll go over it to make sure all the pieces work. While it adds some
> complications, it doesn't seem too bad and conversion from tasklet should be
> straightforward too.
>
> - It hooks into tasklet[_hi] for now but if we get to update all of tasklet
>   users, we can just repurpose the tasklet softirq slots directly.
>
> - I thought about allowing busy-waits for flushes and cancels but it didn't
>   seem necessary. Keeping them blocking has the benefit of avoiding possible
>   nasty deadlocks. We can revisit if there's need.
>
> - Compared to tasklet, each work item goes through a bit more management
>   code because I wanted to keep the code as unified as possible to regular
>   threaded workqueues. That said, it's not a huge amount and my bet is that
>   the difference is unlikely to be noticeable.
>

Tejun,

  Thank you very much. I really like the approach you have taken in
this patchset.
My design was very similar to your idea: to create a
new workqueue specifically for bottom-half execution.
I missed the WQ_ATOMIC | WQ_HIGHPRI combination and many other cases that
you have covered in your patch.

  Here's a partial diff of the idea:

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 2cc0a9606175..3eaa80826b8d 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -348,6 +348,7 @@ static inline unsigned int work_static(struct
work_struct *work) { return 0; }
  * Documentation/core-api/workqueue.rst.
  */
 enum {
+       WQ_IRQ                  = 1 << 0, /* for bottom half */
        WQ_UNBOUND              = 1 << 1, /* not bound to any cpu */
        WQ_FREEZABLE            = 1 << 2, /* freeze during suspend */
        WQ_MEM_RECLAIM          = 1 << 3, /* may be used for memory reclaim */
@@ -426,6 +427,7 @@ extern struct workqueue_struct *system_highpri_wq;
 extern struct workqueue_struct *system_long_wq;
 extern struct workqueue_struct *system_unbound_wq;
 extern struct workqueue_struct *system_freezable_wq;
+extern struct workqueue_struct *system_irq_wq;
 extern struct workqueue_struct *system_power_efficient_wq;
 extern struct workqueue_struct *system_freezable_power_efficient_wq;

diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 76e60faed892..64db4fc0d0c7 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -436,6 +436,8 @@ struct workqueue_struct *system_unbound_wq __ro_after_init;
 EXPORT_SYMBOL_GPL(system_unbound_wq);
 struct workqueue_struct *system_freezable_wq __ro_after_init;
 EXPORT_SYMBOL_GPL(system_freezable_wq);
+struct workqueue_struct *system_irq_wq __ro_after_init;
+EXPORT_SYMBOL_GPL(irq_bh_wq);
 struct workqueue_struct *system_power_efficient_wq __ro_after_init;
 EXPORT_SYMBOL_GPL(system_power_efficient_wq);
 struct workqueue_struct *system_freezable_power_efficient_wq __ro_after_init;
@@ -6689,13 +6691,15 @@ void __init workqueue_init_early(void)
                                            WQ_MAX_ACTIVE);
        system_freezable_wq = alloc_workqueue("events_freezable",
                                              WQ_FREEZABLE, 0);
+       system_irq_wq = alloc_workqueue("events_irq",
+                                       WQ_IRQ, 0);
        system_power_efficient_wq = alloc_workqueue("events_power_efficient",
                                              WQ_POWER_EFFICIENT, 0);
        system_freezable_power_efficient_wq =
alloc_workqueue("events_freezable_power_efficient",
                                              WQ_FREEZABLE | WQ_POWER_EFFICIENT,
                                              0);
        BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq ||
-              !system_unbound_wq || !system_freezable_wq ||
+              !system_unbound_wq || !system_freezable_wq || !system_irq_wq ||
               !system_power_efficient_wq ||
               !system_freezable_power_efficient_wq);


 It makes a lot more sense to use this patchset as the base for the
patches I have that move away
from tasklets. I will rebase my work on top of your branch and test it.

Thanks.

> Thanks.
>
> From 8224d2602ef454ca164f4added765dc4dddd5e16 Mon Sep 17 00:00:00 2001
> From: Tejun Heo <tj@xxxxxxxxxx>
> Date: Fri, 26 Jan 2024 13:21:42 -1000
> Subject: [PATCH] workqueue: DRAFT: Implement atomic workqueue and convert
>  dmcrypt to use it
>
> ---
>  drivers/md/dm-crypt.c       |  36 +-----
>  include/linux/workqueue.h   |   6 +
>  kernel/workqueue.c          | 234 +++++++++++++++++++++++++++---------
>  kernel/workqueue_internal.h |   3 +
>  4 files changed, 186 insertions(+), 93 deletions(-)
>
> diff --git a/drivers/md/dm-crypt.c b/drivers/md/dm-crypt.c
> index 855b482cbff1..d375285db202 100644
> --- a/drivers/md/dm-crypt.c
> +++ b/drivers/md/dm-crypt.c
> @@ -73,11 +73,8 @@ struct dm_crypt_io {
>         struct bio *base_bio;
>         u8 *integrity_metadata;
>         bool integrity_metadata_from_pool:1;
> -       bool in_tasklet:1;
>
>         struct work_struct work;
> -       struct tasklet_struct tasklet;
> -
>         struct convert_context ctx;
>
>         atomic_t io_pending;
> @@ -1762,7 +1759,6 @@ static void crypt_io_init(struct dm_crypt_io *io, struct crypt_config *cc,
>         io->ctx.r.req = NULL;
>         io->integrity_metadata = NULL;
>         io->integrity_metadata_from_pool = false;
> -       io->in_tasklet = false;
>         atomic_set(&io->io_pending, 0);
>  }
>
> @@ -1771,13 +1767,6 @@ static void crypt_inc_pending(struct dm_crypt_io *io)
>         atomic_inc(&io->io_pending);
>  }
>
> -static void kcryptd_io_bio_endio(struct work_struct *work)
> -{
> -       struct dm_crypt_io *io = container_of(work, struct dm_crypt_io, work);
> -
> -       bio_endio(io->base_bio);
> -}
> -
>  /*
>   * One of the bios was finished. Check for completion of
>   * the whole request and correctly clean up the buffer.
> @@ -1800,21 +1789,6 @@ static void crypt_dec_pending(struct dm_crypt_io *io)
>                 kfree(io->integrity_metadata);
>
>         base_bio->bi_status = error;
> -
> -       /*
> -        * If we are running this function from our tasklet,
> -        * we can't call bio_endio() here, because it will call
> -        * clone_endio() from dm.c, which in turn will
> -        * free the current struct dm_crypt_io structure with
> -        * our tasklet. In this case we need to delay bio_endio()
> -        * execution to after the tasklet is done and dequeued.
> -        */
> -       if (io->in_tasklet) {
> -               INIT_WORK(&io->work, kcryptd_io_bio_endio);
> -               queue_work(cc->io_queue, &io->work);
> -               return;
> -       }
> -
>         bio_endio(base_bio);
>  }
>
> @@ -2246,11 +2220,6 @@ static void kcryptd_crypt(struct work_struct *work)
>                 kcryptd_crypt_write_convert(io);
>  }
>
> -static void kcryptd_crypt_tasklet(unsigned long work)
> -{
> -       kcryptd_crypt((struct work_struct *)work);
> -}
> -
>  static void kcryptd_queue_crypt(struct dm_crypt_io *io)
>  {
>         struct crypt_config *cc = io->cc;
> @@ -2263,9 +2232,8 @@ static void kcryptd_queue_crypt(struct dm_crypt_io *io)
>                  * it is being executed with irqs disabled.
>                  */
>                 if (in_hardirq() || irqs_disabled()) {
> -                       io->in_tasklet = true;
> -                       tasklet_init(&io->tasklet, kcryptd_crypt_tasklet, (unsigned long)&io->work);
> -                       tasklet_schedule(&io->tasklet);
> +                       INIT_WORK(&io->work, kcryptd_crypt);
> +                       queue_work(system_atomic_wq, &io->work);
>                         return;
>                 }
>
> diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
> index 232baea90a1d..1e4938b5b176 100644
> --- a/include/linux/workqueue.h
> +++ b/include/linux/workqueue.h
> @@ -353,6 +353,7 @@ static inline unsigned int work_static(struct work_struct *work) { return 0; }
>   * Documentation/core-api/workqueue.rst.
>   */
>  enum wq_flags {
> +       WQ_ATOMIC               = 1 << 0, /* execute in softirq context */
>         WQ_UNBOUND              = 1 << 1, /* not bound to any cpu */
>         WQ_FREEZABLE            = 1 << 2, /* freeze during suspend */
>         WQ_MEM_RECLAIM          = 1 << 3, /* may be used for memory reclaim */
> @@ -392,6 +393,9 @@ enum wq_flags {
>         __WQ_ORDERED            = 1 << 17, /* internal: workqueue is ordered */
>         __WQ_LEGACY             = 1 << 18, /* internal: create*_workqueue() */
>         __WQ_ORDERED_EXPLICIT   = 1 << 19, /* internal: alloc_ordered_workqueue() */
> +
> +       /* atomic wq only allows the following flags */
> +       __WQ_ATOMIC_ALLOWS      = WQ_ATOMIC | WQ_HIGHPRI,
>  };
>
>  enum wq_consts {
> @@ -442,6 +446,8 @@ extern struct workqueue_struct *system_unbound_wq;
>  extern struct workqueue_struct *system_freezable_wq;
>  extern struct workqueue_struct *system_power_efficient_wq;
>  extern struct workqueue_struct *system_freezable_power_efficient_wq;
> +extern struct workqueue_struct *system_atomic_wq;
> +extern struct workqueue_struct *system_atomic_highpri_wq;
>
>  /**
>   * alloc_workqueue - allocate a workqueue
> diff --git a/kernel/workqueue.c b/kernel/workqueue.c
> index 23740c9ed57a..2a8f21494676 100644
> --- a/kernel/workqueue.c
> +++ b/kernel/workqueue.c
> @@ -73,7 +73,8 @@ enum worker_pool_flags {
>          * wq_pool_attach_mutex to avoid changing binding state while
>          * worker_attach_to_pool() is in progress.
>          */
> -       POOL_MANAGER_ACTIVE     = 1 << 0,       /* being managed */
> +       POOL_ATOMIC             = 1 << 0,       /* is an atomic pool */
> +       POOL_MANAGER_ACTIVE     = 1 << 1,       /* being managed */
>         POOL_DISASSOCIATED      = 1 << 2,       /* cpu can't serve workers */
>  };
>
> @@ -115,6 +116,14 @@ enum wq_internal_consts {
>         WQ_NAME_LEN             = 32,
>  };
>
> +/*
> + * We don't want to trap softirq for too long. See MAX_SOFTIRQ_TIME and
> + * MAX_SOFTIRQ_RESTART in kernel/softirq.c. These are macros because
> + * msecs_to_jiffies() can't be an initializer.
> + */
> +#define ATOMIC_WORKER_JIFFIES  msecs_to_jiffies(2)
> +#define ATOMIC_WORKER_RESTARTS 10
> +
>  /*
>   * Structure fields follow one of the following exclusion rules.
>   *
> @@ -441,8 +450,13 @@ static bool wq_debug_force_rr_cpu = false;
>  #endif
>  module_param_named(debug_force_rr_cpu, wq_debug_force_rr_cpu, bool, 0644);
>
> +/* the atomic worker pools */
> +static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
> +                                    atomic_worker_pools);
> +
>  /* the per-cpu worker pools */
> -static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS], cpu_worker_pools);
> +static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
> +                                    cpu_worker_pools);
>
>  static DEFINE_IDR(worker_pool_idr);    /* PR: idr of all pools */
>
> @@ -476,8 +490,13 @@ struct workqueue_struct *system_power_efficient_wq __ro_after_init;
>  EXPORT_SYMBOL_GPL(system_power_efficient_wq);
>  struct workqueue_struct *system_freezable_power_efficient_wq __ro_after_init;
>  EXPORT_SYMBOL_GPL(system_freezable_power_efficient_wq);
> +struct workqueue_struct *system_atomic_wq;
> +EXPORT_SYMBOL_GPL(system_atomic_wq);
> +struct workqueue_struct *system_atomic_highpri_wq;
> +EXPORT_SYMBOL_GPL(system_atomic_highpri_wq);
>
>  static int worker_thread(void *__worker);
> +static void atomic_worker_taskletfn(struct tasklet_struct *tasklet);
>  static void workqueue_sysfs_unregister(struct workqueue_struct *wq);
>  static void show_pwq(struct pool_workqueue *pwq);
>  static void show_one_worker_pool(struct worker_pool *pool);
> @@ -496,6 +515,11 @@ static void show_one_worker_pool(struct worker_pool *pool);
>                          !lockdep_is_held(&wq_pool_mutex),              \
>                          "RCU, wq->mutex or wq_pool_mutex should be held")
>
> +#define for_each_atomic_worker_pool(pool, cpu)                         \
> +       for ((pool) = &per_cpu(atomic_worker_pools, cpu)[0];            \
> +            (pool) < &per_cpu(atomic_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \
> +            (pool)++)
> +
>  #define for_each_cpu_worker_pool(pool, cpu)                            \
>         for ((pool) = &per_cpu(cpu_worker_pools, cpu)[0];               \
>              (pool) < &per_cpu(cpu_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \
> @@ -1184,6 +1208,14 @@ static bool kick_pool(struct worker_pool *pool)
>         if (!need_more_worker(pool) || !worker)
>                 return false;
>
> +       if (pool->flags & POOL_ATOMIC) {
> +               if (pool->attrs->nice == HIGHPRI_NICE_LEVEL)
> +                       tasklet_hi_schedule(&worker->atomic_tasklet);
> +               else
> +                       tasklet_schedule(&worker->atomic_tasklet);
> +               return true;
> +       }
> +
>         p = worker->task;
>
>  #ifdef CONFIG_SMP
> @@ -1663,8 +1695,15 @@ static bool pwq_tryinc_nr_active(struct pool_workqueue *pwq, bool fill)
>         lockdep_assert_held(&pool->lock);
>
>         if (!nna) {
> -               /* per-cpu workqueue, pwq->nr_active is sufficient */
> -               obtained = pwq->nr_active < READ_ONCE(wq->max_active);
> +               /*
> +                * An atomic workqueue always have a single worker per-cpu and
> +                * doesn't impose additional max_active limit. For a per-cpu
> +                * workqueue, checking pwq->nr_active is sufficient.
> +                */
> +               if (wq->flags & WQ_ATOMIC)
> +                       obtained = true;
> +               else
> +                       obtained = pwq->nr_active < READ_ONCE(wq->max_active);
>                 goto out;
>         }
>
> @@ -2591,27 +2630,31 @@ static struct worker *create_worker(struct worker_pool *pool)
>
>         worker->id = id;
>
> -       if (pool->cpu >= 0)
> -               snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
> -                        pool->attrs->nice < 0  ? "H" : "");
> -       else
> -               snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);
> -
> -       worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
> -                                             "kworker/%s", id_buf);
> -       if (IS_ERR(worker->task)) {
> -               if (PTR_ERR(worker->task) == -EINTR) {
> -                       pr_err("workqueue: Interrupted when creating a worker thread \"kworker/%s\"\n",
> -                              id_buf);
> -               } else {
> -                       pr_err_once("workqueue: Failed to create a worker thread: %pe",
> -                                   worker->task);
> +       if (pool->flags & POOL_ATOMIC) {
> +               tasklet_setup(&worker->atomic_tasklet, atomic_worker_taskletfn);
> +       } else {
> +               if (pool->cpu >= 0)
> +                       snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
> +                                pool->attrs->nice < 0  ? "H" : "");
> +               else
> +                       snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);
> +
> +               worker->task = kthread_create_on_node(worker_thread, worker,
> +                                       pool->node, "kworker/%s", id_buf);
> +               if (IS_ERR(worker->task)) {
> +                       if (PTR_ERR(worker->task) == -EINTR) {
> +                               pr_err("workqueue: Interrupted when creating a worker thread \"kworker/%s\"\n",
> +                                      id_buf);
> +                       } else {
> +                               pr_err_once("workqueue: Failed to create a worker thread: %pe",
> +                                           worker->task);
> +                       }
> +                       goto fail;
>                 }
> -               goto fail;
> -       }
>
> -       set_user_nice(worker->task, pool->attrs->nice);
> -       kthread_bind_mask(worker->task, pool_allowed_cpus(pool));
> +               set_user_nice(worker->task, pool->attrs->nice);
> +               kthread_bind_mask(worker->task, pool_allowed_cpus(pool));
> +       }
>
>         /* successful, attach the worker to the pool */
>         worker_attach_to_pool(worker, pool);
> @@ -2627,7 +2670,8 @@ static struct worker *create_worker(struct worker_pool *pool)
>          * check if not woken up soon. As kick_pool() is noop if @pool is empty,
>          * wake it up explicitly.
>          */
> -       wake_up_process(worker->task);
> +       if (worker->task)
> +               wake_up_process(worker->task);
>
>         raw_spin_unlock_irq(&pool->lock);
>
> @@ -3043,25 +3087,35 @@ __acquires(&pool->lock)
>         lock_map_release(&lockdep_map);
>         lock_map_release(&pwq->wq->lockdep_map);
>
> -       if (unlikely(in_atomic() || lockdep_depth(current) > 0 ||
> -                    rcu_preempt_depth() > 0)) {
> -               pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d/%d\n"
> -                      "     last function: %ps\n",
> -                      current->comm, preempt_count(), rcu_preempt_depth(),
> -                      task_pid_nr(current), worker->current_func);
> -               debug_show_held_locks(current);
> -               dump_stack();
> -       }
> +       if (worker->task) {
> +               if (unlikely(in_atomic() || lockdep_depth(current) > 0 ||
> +                            rcu_preempt_depth() > 0)) {
> +                       pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d/%d\n"
> +                              "     last function: %ps\n",
> +                              current->comm, preempt_count(),
> +                              rcu_preempt_depth(), task_pid_nr(current),
> +                              worker->current_func);
> +                       debug_show_held_locks(current);
> +                       dump_stack();
> +               }
>
> -       /*
> -        * The following prevents a kworker from hogging CPU on !PREEMPTION
> -        * kernels, where a requeueing work item waiting for something to
> -        * happen could deadlock with stop_machine as such work item could
> -        * indefinitely requeue itself while all other CPUs are trapped in
> -        * stop_machine. At the same time, report a quiescent RCU state so
> -        * the same condition doesn't freeze RCU.
> -        */
> -       cond_resched();
> +               /*
> +                * The following prevents a kworker from hogging CPU on
> +                * !PREEMPTION kernels, where a requeueing work item waiting for
> +                * something to happen could deadlock with stop_machine as such
> +                * work item could indefinitely requeue itself while all other
> +                * CPUs are trapped in stop_machine. At the same time, report a
> +                * quiescent RCU state so the same condition doesn't freeze RCU.
> +                */
> +               if (worker->task)
> +                       cond_resched();
> +       } else {
> +               if (unlikely(lockdep_depth(current) > 0)) {
> +                       pr_err("BUG: atomic workqueue leaked lock: last function: %ps\n",
> +                              worker->current_func);
> +                       debug_show_held_locks(current);
> +               }
> +       }
>
>         raw_spin_lock_irq(&pool->lock);
>
> @@ -3344,6 +3398,44 @@ static int rescuer_thread(void *__rescuer)
>         goto repeat;
>  }
>
> +void atomic_worker_taskletfn(struct tasklet_struct *tasklet)
> +{
> +       struct worker *worker =
> +               container_of(tasklet, struct worker, atomic_tasklet);
> +       struct worker_pool *pool = worker->pool;
> +       int nr_restarts = ATOMIC_WORKER_RESTARTS;
> +       unsigned long end = jiffies + ATOMIC_WORKER_JIFFIES;
> +
> +       raw_spin_lock_irq(&pool->lock);
> +       worker_leave_idle(worker);
> +
> +       /*
> +        * This function follows the structure of worker_thread(). See there for
> +        * explanations on each step.
> +        */
> +       if (need_more_worker(pool))
> +               goto done;
> +
> +       WARN_ON_ONCE(!list_empty(&worker->scheduled));
> +       worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);
> +
> +       do {
> +               struct work_struct *work =
> +                       list_first_entry(&pool->worklist,
> +                                        struct work_struct, entry);
> +
> +               if (assign_work(work, worker, NULL))
> +                       process_scheduled_works(worker);
> +       } while (--nr_restarts && time_before(jiffies, end) &&
> +                keep_working(pool));
> +
> +       worker_set_flags(worker, WORKER_PREP);
> +done:
> +       worker_enter_idle(worker);
> +       kick_pool(pool);
> +       raw_spin_unlock_irq(&pool->lock);
> +}
> +
>  /**
>   * check_flush_dependency - check for flush dependency sanity
>   * @target_wq: workqueue being flushed
> @@ -5149,6 +5241,13 @@ struct workqueue_struct *alloc_workqueue(const char *fmt,
>         size_t wq_size;
>         int name_len;
>
> +       if (flags & WQ_ATOMIC) {
> +               if (WARN_ON_ONCE(flags & ~__WQ_ATOMIC_ALLOWS))
> +                       return NULL;
> +               if (WARN_ON_ONCE(max_active))
> +                       return NULL;
> +       }
> +
>         /*
>          * Unbound && max_active == 1 used to imply ordered, which is no longer
>          * the case on many machines due to per-pod pools. While
> @@ -7094,6 +7193,22 @@ static void __init restrict_unbound_cpumask(const char *name, const struct cpuma
>         cpumask_and(wq_unbound_cpumask, wq_unbound_cpumask, mask);
>  }
>
> +static void __init init_cpu_worker_pool(struct worker_pool *pool, int cpu, int nice)
> +{
> +       BUG_ON(init_worker_pool(pool));
> +       pool->cpu = cpu;
> +       cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
> +       cpumask_copy(pool->attrs->__pod_cpumask, cpumask_of(cpu));
> +       pool->attrs->nice = nice;
> +       pool->attrs->affn_strict = true;
> +       pool->node = cpu_to_node(cpu);
> +
> +       /* alloc pool ID */
> +       mutex_lock(&wq_pool_mutex);
> +       BUG_ON(worker_pool_assign_id(pool));
> +       mutex_unlock(&wq_pool_mutex);
> +}
> +
>  /**
>   * workqueue_init_early - early init for workqueue subsystem
>   *
> @@ -7149,25 +7264,19 @@ void __init workqueue_init_early(void)
>         pt->pod_node[0] = NUMA_NO_NODE;
>         pt->cpu_pod[0] = 0;
>
> -       /* initialize CPU pools */
> +       /* initialize atomic and CPU pools */
>         for_each_possible_cpu(cpu) {
>                 struct worker_pool *pool;
>
>                 i = 0;
> -               for_each_cpu_worker_pool(pool, cpu) {
> -                       BUG_ON(init_worker_pool(pool));
> -                       pool->cpu = cpu;
> -                       cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
> -                       cpumask_copy(pool->attrs->__pod_cpumask, cpumask_of(cpu));
> -                       pool->attrs->nice = std_nice[i++];
> -                       pool->attrs->affn_strict = true;
> -                       pool->node = cpu_to_node(cpu);
> -
> -                       /* alloc pool ID */
> -                       mutex_lock(&wq_pool_mutex);
> -                       BUG_ON(worker_pool_assign_id(pool));
> -                       mutex_unlock(&wq_pool_mutex);
> +               for_each_atomic_worker_pool(pool, cpu) {
> +                       init_cpu_worker_pool(pool, cpu, std_nice[i++]);
> +                       pool->flags |= POOL_ATOMIC;
>                 }
> +
> +               i = 0;
> +               for_each_cpu_worker_pool(pool, cpu)
> +                       init_cpu_worker_pool(pool, cpu, std_nice[i++]);
>         }
>
>         /* create default unbound and ordered wq attrs */
> @@ -7200,10 +7309,14 @@ void __init workqueue_init_early(void)
>         system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_pwr_efficient",
>                                               WQ_FREEZABLE | WQ_POWER_EFFICIENT,
>                                               0);
> +       system_atomic_wq = alloc_workqueue("system_atomic_wq", WQ_ATOMIC, 0);
> +       system_atomic_highpri_wq = alloc_workqueue("system_atomic_highpri_wq",
> +                                                  WQ_ATOMIC | WQ_HIGHPRI, 0);
>         BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq ||
>                !system_unbound_wq || !system_freezable_wq ||
>                !system_power_efficient_wq ||
> -              !system_freezable_power_efficient_wq);
> +              !system_freezable_power_efficient_wq ||
> +              !system_atomic_wq || !system_atomic_highpri_wq);
>  }
>
>  static void __init wq_cpu_intensive_thresh_init(void)
> @@ -7269,9 +7382,10 @@ void __init workqueue_init(void)
>          * up. Also, create a rescuer for workqueues that requested it.
>          */
>         for_each_possible_cpu(cpu) {
> -               for_each_cpu_worker_pool(pool, cpu) {
> +               for_each_atomic_worker_pool(pool, cpu)
> +                       pool->node = cpu_to_node(cpu);
> +               for_each_cpu_worker_pool(pool, cpu)
>                         pool->node = cpu_to_node(cpu);
> -               }
>         }
>
>         list_for_each_entry(wq, &workqueues, list) {
> @@ -7284,6 +7398,8 @@ void __init workqueue_init(void)
>
>         /* create the initial workers */
>         for_each_online_cpu(cpu) {
> +               for_each_atomic_worker_pool(pool, cpu)
> +                       BUG_ON(!create_worker(pool));
>                 for_each_cpu_worker_pool(pool, cpu) {
>                         pool->flags &= ~POOL_DISASSOCIATED;
>                         BUG_ON(!create_worker(pool));
> diff --git a/kernel/workqueue_internal.h b/kernel/workqueue_internal.h
> index f6275944ada7..f65f204f38ea 100644
> --- a/kernel/workqueue_internal.h
> +++ b/kernel/workqueue_internal.h
> @@ -10,6 +10,7 @@
>
>  #include <linux/workqueue.h>
>  #include <linux/kthread.h>
> +#include <linux/interrupt.h>
>  #include <linux/preempt.h>
>
>  struct worker_pool;
> @@ -42,6 +43,8 @@ struct worker {
>         struct list_head        scheduled;      /* L: scheduled works */
>
>         struct task_struct      *task;          /* I: worker task */
> +       struct tasklet_struct   atomic_tasklet; /* I: tasklet for atomic pool */
> +
>         struct worker_pool      *pool;          /* A: the associated pool */
>                                                 /* L: for rescuers */
>         struct list_head        node;           /* A: anchored at pool->workers */
> --
> 2.43.0
>
>


-- 
       - Allen




[Index of Archives]     [DM Crypt]     [Fedora Desktop]     [ATA RAID]     [Fedora Marketing]     [Fedora Packaging]     [Fedora SELinux]     [Yosemite Discussion]     [KDE Users]     [Fedora Docs]

  Powered by Linux