Disabling interrupts and in the RPS case locking input_pkt_queue is split into local_irq_disable() and optional spin_lock(). This breaks on PREEMPT_RT because the spinlock_t typed lock can not be acquired with disabled interrupts. The sections in which the lock is acquired is usually short in a sense that it is not causing long und unbounded latiencies. One exception is the skb_flow_limit() invocation which may invoke a BPF program (and may require sleeping locks). By moving local_irq_disable() + spin_lock() into rps_lock(), we can keep interrupts disabled on !PREEMPT_RT and enabled on PREEMPT_RT kernels. Without RPS on a PREEMPT_RT kernel, the needed synchronisation happens as part of local_bh_disable() on the local CPU. Since interrupts remain enabled, enqueue_to_backlog() needs to disable interrupts for ____napi_schedule(). Signed-off-by: Sebastian Andrzej Siewior <bigeasy@xxxxxxxxxxxxx> --- net/core/dev.c | 72 ++++++++++++++++++++++++++++++-------------------- 1 file changed, 44 insertions(+), 28 deletions(-) diff --git a/net/core/dev.c b/net/core/dev.c index f43d0580fa11d..e9ea56daee2f0 100644 --- a/net/core/dev.c +++ b/net/core/dev.c @@ -216,18 +216,38 @@ static inline struct hlist_head *dev_index_hash(struct net *net, int ifindex) return &net->dev_index_head[ifindex & (NETDEV_HASHENTRIES - 1)]; } -static inline void rps_lock(struct softnet_data *sd) +static inline void rps_lock_irqsave(struct softnet_data *sd, + unsigned long *flags) { -#ifdef CONFIG_RPS - spin_lock(&sd->input_pkt_queue.lock); -#endif + if (IS_ENABLED(CONFIG_RPS)) + spin_lock_irqsave(&sd->input_pkt_queue.lock, *flags); + else if (!IS_ENABLED(CONFIG_PREEMPT_RT)) + local_irq_save(*flags); } -static inline void rps_unlock(struct softnet_data *sd) +static inline void rps_lock_irq_disable(struct softnet_data *sd) { -#ifdef CONFIG_RPS - spin_unlock(&sd->input_pkt_queue.lock); -#endif + if (IS_ENABLED(CONFIG_RPS)) + spin_lock_irq(&sd->input_pkt_queue.lock); + else if (!IS_ENABLED(CONFIG_PREEMPT_RT)) + local_irq_disable(); +} + +static inline void rps_unlock_irq_restore(struct softnet_data *sd, + unsigned long *flags) +{ + if (IS_ENABLED(CONFIG_RPS)) + spin_unlock_irqrestore(&sd->input_pkt_queue.lock, *flags); + else if (!IS_ENABLED(CONFIG_PREEMPT_RT)) + local_irq_restore(*flags); +} + +static inline void rps_unlock_irq_enable(struct softnet_data *sd) +{ + if (IS_ENABLED(CONFIG_RPS)) + spin_unlock_irq(&sd->input_pkt_queue.lock); + else if (!IS_ENABLED(CONFIG_PREEMPT_RT)) + local_irq_enable(); } static struct netdev_name_node *netdev_name_node_alloc(struct net_device *dev, @@ -4525,9 +4545,7 @@ static int enqueue_to_backlog(struct sk_buff *skb, int cpu, sd = &per_cpu(softnet_data, cpu); - local_irq_save(flags); - - rps_lock(sd); + rps_lock_irqsave(sd, &flags); if (!netif_running(skb->dev)) goto drop; qlen = skb_queue_len(&sd->input_pkt_queue); @@ -4536,26 +4554,30 @@ static int enqueue_to_backlog(struct sk_buff *skb, int cpu, enqueue: __skb_queue_tail(&sd->input_pkt_queue, skb); input_queue_tail_incr_save(sd, qtail); - rps_unlock(sd); - local_irq_restore(flags); + rps_unlock_irq_restore(sd, &flags); return NET_RX_SUCCESS; } /* Schedule NAPI for backlog device * We can use non atomic operation since we own the queue lock + * PREEMPT_RT needs to disable interrupts here for + * synchronisation needed in napi_schedule. */ + if (IS_ENABLED(CONFIG_PREEMPT_RT)) + local_irq_disable(); + if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) { if (!rps_ipi_queued(sd)) ____napi_schedule(sd, &sd->backlog); } + if (IS_ENABLED(CONFIG_PREEMPT_RT)) + local_irq_enable(); goto enqueue; } drop: sd->dropped++; - rps_unlock(sd); - - local_irq_restore(flags); + rps_unlock_irq_restore(sd, &flags); atomic_long_inc(&skb->dev->rx_dropped); kfree_skb(skb); @@ -5617,8 +5639,7 @@ static void flush_backlog(struct work_struct *work) local_bh_disable(); sd = this_cpu_ptr(&softnet_data); - local_irq_disable(); - rps_lock(sd); + rps_lock_irq_disable(sd); skb_queue_walk_safe(&sd->input_pkt_queue, skb, tmp) { if (skb->dev->reg_state == NETREG_UNREGISTERING) { __skb_unlink(skb, &sd->input_pkt_queue); @@ -5626,8 +5647,7 @@ static void flush_backlog(struct work_struct *work) input_queue_head_incr(sd); } } - rps_unlock(sd); - local_irq_enable(); + rps_unlock_irq_enable(sd); skb_queue_walk_safe(&sd->process_queue, skb, tmp) { if (skb->dev->reg_state == NETREG_UNREGISTERING) { @@ -5645,16 +5665,14 @@ static bool flush_required(int cpu) struct softnet_data *sd = &per_cpu(softnet_data, cpu); bool do_flush; - local_irq_disable(); - rps_lock(sd); + rps_lock_irq_disable(sd); /* as insertion into process_queue happens with the rps lock held, * process_queue access may race only with dequeue */ do_flush = !skb_queue_empty(&sd->input_pkt_queue) || !skb_queue_empty_lockless(&sd->process_queue); - rps_unlock(sd); - local_irq_enable(); + rps_unlock_irq_enable(sd); return do_flush; #endif @@ -5769,8 +5787,7 @@ static int process_backlog(struct napi_struct *napi, int quota) } - local_irq_disable(); - rps_lock(sd); + rps_lock_irq_disable(sd); if (skb_queue_empty(&sd->input_pkt_queue)) { /* * Inline a custom version of __napi_complete(). @@ -5786,8 +5803,7 @@ static int process_backlog(struct napi_struct *napi, int quota) skb_queue_splice_tail_init(&sd->input_pkt_queue, &sd->process_queue); } - rps_unlock(sd); - local_irq_enable(); + rps_unlock_irq_enable(sd); } return work; -- 2.34.1