On Wed, 2024-02-28 at 19:05 +0800, Yunjian Wang wrote:
> @@ -2661,6 +2776,54 @@ static int tun_ptr_peek_len(void *ptr)
>  	}
>  }
>  
> +static void tun_peek_xsk(struct tun_file *tfile)
> +{
> +	struct xsk_buff_pool *pool;
> +	u32 i, batch, budget;
> +	void *frame;
> +
> +	if (!ptr_ring_empty(&tfile->tx_ring))
> +		return;
> +
> +	spin_lock(&tfile->pool_lock);
> +	pool = tfile->xsk_pool;
> +	if (!pool) {
> +		spin_unlock(&tfile->pool_lock);
> +		return;
> +	}
> +
> +	if (tfile->nb_descs) {
> +		xsk_tx_completed(pool, tfile->nb_descs);
> +		if (xsk_uses_need_wakeup(pool))
> +			xsk_set_tx_need_wakeup(pool);
> +	}
> +
> +	spin_lock(&tfile->tx_ring.producer_lock);
> +	budget = min_t(u32, tfile->tx_ring.size, TUN_XDP_BATCH);
> +
> +	batch = xsk_tx_peek_release_desc_batch(pool, budget);
> +	if (!batch) {

This branch looks like an unneeded "optimization". The generic loop
below should have the same effect with no measurable perf delta - and
smaller code. Just remove this.

> +		tfile->nb_descs = 0;
> +		spin_unlock(&tfile->tx_ring.producer_lock);
> +		spin_unlock(&tfile->pool_lock);
> +		return;
> +	}
> +
> +	tfile->nb_descs = batch;
> +	for (i = 0; i < batch; i++) {
> +		/* Encode the XDP DESC flag into lowest bit for consumer to differ
> +		 * XDP desc from XDP buffer and sk_buff.
> +		 */
> +		frame = tun_xdp_desc_to_ptr(&pool->tx_descs[i]);
> +		/* The budget must be less than or equal to tx_ring.size,
> +		 * so enqueuing will not fail.
> +		 */
> +		__ptr_ring_produce(&tfile->tx_ring, frame);
> +	}
> +	spin_unlock(&tfile->tx_ring.producer_lock);
> +	spin_unlock(&tfile->pool_lock);

More related to the general design: it looks wrong. What if
get_rx_bufs() fails (ENOBUF) after a successful peek? With no more
incoming packets, a later peek will return 0, and it looks like the
half-processed packets will stay in the ring forever?

I think the 'ring produce' part should be moved into tun_do_read().

Cheers,

Paolo
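
For illustration, a minimal sketch of the restructuring Paolo suggests,
assuming the fields and helpers introduced by the quoted patch
(tfile->pool_lock, tfile->xsk_pool, tfile->nb_descs, TUN_XDP_BATCH,
tun_xdp_desc_to_ptr()): the xsk batch peek and the ptr_ring produce run
from the read path, only when a reader is about to consume, and the
batch == 0 special case is dropped as suggested above. tun_xsk_refill()
and tun_ring_recv_xsk() are hypothetical names, not part of the patch or
of mainline tun.

/* Relevant headers, on top of tun.c's existing includes. */
#include <linux/ptr_ring.h>
#include <net/xdp_sock_drv.h>

/*
 * Hypothetical helper: refill the tx_ring from the xsk pool at read time.
 * Field names (pool_lock, xsk_pool, nb_descs, tx_ring) and TUN_XDP_BATCH /
 * tun_xdp_desc_to_ptr() follow the quoted patch, not mainline.
 */
static void tun_xsk_refill(struct tun_file *tfile)
{
	struct xsk_buff_pool *pool;
	u32 i, batch, budget;

	spin_lock(&tfile->pool_lock);
	pool = tfile->xsk_pool;
	if (!pool)
		goto out;

	if (tfile->nb_descs) {
		/* Complete the batch handed out by the previous refill. */
		xsk_tx_completed(pool, tfile->nb_descs);
		if (xsk_uses_need_wakeup(pool))
			xsk_set_tx_need_wakeup(pool);
	}

	spin_lock(&tfile->tx_ring.producer_lock);
	budget = min_t(u32, tfile->tx_ring.size, TUN_XDP_BATCH);
	/* No special case for batch == 0: the loop simply does nothing. */
	batch = xsk_tx_peek_release_desc_batch(pool, budget);
	tfile->nb_descs = batch;
	for (i = 0; i < batch; i++)
		/* budget <= tx_ring.size, so the produce cannot fail. */
		__ptr_ring_produce(&tfile->tx_ring,
				   tun_xdp_desc_to_ptr(&pool->tx_descs[i]));
	spin_unlock(&tfile->tx_ring.producer_lock);
out:
	spin_unlock(&tfile->pool_lock);
}

/* Called from the read path (e.g. tun_do_read()) instead of the peek path. */
static void *tun_ring_recv_xsk(struct tun_file *tfile)
{
	if (ptr_ring_empty(&tfile->tx_ring))
		tun_xsk_refill(tfile);

	return ptr_ring_consume(&tfile->tx_ring);
}

The intent of the sketch is that descriptors are no longer parked in the
tx_ring by the peek path, so an ENOBUF from get_rx_bufs() cannot strand
them there; how the peek path then reports a pending length is a separate
question this sketch does not address.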