Re: [PATCH net-next v2 5/7] virtio-net, xsk: realize the function of xsk packet sending

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 




On 2021/1/18 下午8:03, Xuan Zhuo wrote:
On Mon, 18 Jan 2021 17:10:24 +0800, Jason Wang <jasowang@xxxxxxxxxx> wrote:
On 2021/1/16 上午10:59, Xuan Zhuo wrote:
virtnet_xsk_run will be called in the tx interrupt handling function
virtnet_poll_tx.

The sending process gets desc from the xsk tx queue, and assembles it to
send the data.

Compared with other drivers, a special place is that the page of the
data in xsk is used here instead of the dma address. Because the virtio
interface does not use the dma address.

Signed-off-by: Xuan Zhuo <xuanzhuo@xxxxxxxxxxxxxxxxx>
---
   drivers/net/virtio_net.c | 200 ++++++++++++++++++++++++++++++++++++++++++++++-
   1 file changed, 197 insertions(+), 3 deletions(-)

diff --git a/drivers/net/virtio_net.c b/drivers/net/virtio_net.c
index a62d456..42aa9ad 100644
--- a/drivers/net/virtio_net.c
+++ b/drivers/net/virtio_net.c
@@ -119,6 +119,8 @@ struct virtnet_xsk_hdr {
   	u32 len;
   };

+#define VIRTNET_STATE_XSK_WAKEUP 1
+
   #define VIRTNET_SQ_STAT(m)	offsetof(struct virtnet_sq_stats, m)
   #define VIRTNET_RQ_STAT(m)	offsetof(struct virtnet_rq_stats, m)

@@ -163,9 +165,12 @@ struct send_queue {
   		struct xsk_buff_pool   __rcu *pool;
   		struct virtnet_xsk_hdr __rcu *hdr;

+		unsigned long          state;
   		u64                    hdr_con;
   		u64                    hdr_pro;
   		u64                    hdr_n;
+		struct xdp_desc        last_desc;
+		bool                   wait_slot;
   	} xsk;
   };

@@ -284,6 +289,8 @@ static void __free_old_xmit_ptr(struct send_queue *sq, bool in_napi,
   				bool xsk_wakeup,
   				unsigned int *_packets, unsigned int *_bytes);
   static void free_old_xmit_skbs(struct send_queue *sq, bool in_napi);
+static int virtnet_xsk_run(struct send_queue *sq,
+			   struct xsk_buff_pool *pool, int budget);

   static bool is_xdp_frame(void *ptr)
   {
@@ -1590,6 +1597,8 @@ static int virtnet_poll_tx(struct napi_struct *napi, int budget)
   	struct virtnet_info *vi = sq->vq->vdev->priv;
   	unsigned int index = vq2txq(sq->vq);
   	struct netdev_queue *txq;
+	struct xsk_buff_pool *pool;
+	int work = 0;

   	if (unlikely(is_xdp_raw_buffer_queue(vi, index))) {
   		/* We don't need to enable cb for XDP */
@@ -1599,15 +1608,26 @@ static int virtnet_poll_tx(struct napi_struct *napi, int budget)

   	txq = netdev_get_tx_queue(vi->dev, index);
   	__netif_tx_lock(txq, raw_smp_processor_id());
-	free_old_xmit_skbs(sq, true);
+
+	rcu_read_lock();
+	pool = rcu_dereference(sq->xsk.pool);
+	if (pool) {
+		work = virtnet_xsk_run(sq, pool, budget);
+		rcu_read_unlock();
+	} else {
+		rcu_read_unlock();
+		free_old_xmit_skbs(sq, true);
+	}
+
   	__netif_tx_unlock(txq);

-	virtqueue_napi_complete(napi, sq->vq, 0);
+	if (work < budget)
+		virtqueue_napi_complete(napi, sq->vq, 0);

   	if (sq->vq->num_free >= 2 + MAX_SKB_FRAGS)
   		netif_tx_wake_queue(txq);

-	return 0;
+	return work;
   }

   static int xmit_skb(struct send_queue *sq, struct sk_buff *skb)
@@ -2647,6 +2667,180 @@ static int virtnet_xdp(struct net_device *dev, struct netdev_bpf *xdp)
   	}
   }

+static int virtnet_xsk_xmit(struct send_queue *sq, struct xsk_buff_pool *pool,
+			    struct xdp_desc *desc)
+{
+	struct virtnet_info *vi = sq->vq->vdev->priv;
+	void *data, *ptr;
+	struct page *page;
+	struct virtnet_xsk_hdr *xskhdr;
+	u32 idx, offset, n, i, copy, copied;
+	u64 addr;
+	int err, m;
+
+	addr = desc->addr;
+
+	data = xsk_buff_raw_get_data(pool, addr);
+	offset = offset_in_page(data);
+
+	/* one for hdr, one for the first page */
+	n = 2;
+	m = desc->len - (PAGE_SIZE - offset);
+	if (m > 0) {
+		n += m >> PAGE_SHIFT;
+		if (m & PAGE_MASK)
+			++n;
+
+		n = min_t(u32, n, ARRAY_SIZE(sq->sg));
+	}
+
+	idx = sq->xsk.hdr_con % sq->xsk.hdr_n;

I don't understand the reason of the hdr array. It looks to me all of
them are zero and read only from device.

Any reason for not reusing a single hdr for all xdp descriptors? Or
maybe it's time to introduce VIRTIO_NET_F_NO_HDR.
Yes, You are right.
Before supporting functions like csum, here it is indeed possible to achieve it
with only one hdr.


So let's drop the array logic for now since it will give unnecessary stress on the cache.



+	xskhdr = &sq->xsk.hdr[idx];
+
+	/* xskhdr->hdr has been memset to zero, so not need to clear again */
+
+	sg_init_table(sq->sg, n);
+	sg_set_buf(sq->sg, &xskhdr->hdr, vi->hdr_len);
+
+	copied = 0;
+	for (i = 1; i < n; ++i) {
+		copy = min_t(int, desc->len - copied, PAGE_SIZE - offset);
+
+		page = xsk_buff_raw_get_page(pool, addr + copied);
+
+		sg_set_page(sq->sg + i, page, copy, offset);
+		copied += copy;
+		if (offset)
+			offset = 0;
+	}

It looks to me we need to terminate the sg:

**
   * virtqueue_add_outbuf - expose output buffers to other end
   * @vq: the struct virtqueue we're talking about.
   * @sg: scatterlist (must be well-formed and terminated!)

sg_init_table will call sg_init_table -> sg_init_table to do that.


Oh right, I miss the sg_init_table().



+
+	xskhdr->len = desc->len;
+	ptr = xdp_to_ptr(&xskhdr->type);
+
+	err = virtqueue_add_outbuf(sq->vq, sq->sg, n, ptr, GFP_ATOMIC);
+	if (unlikely(err))
+		sq->xsk.last_desc = *desc;
+	else
+		sq->xsk.hdr_con++;
+
+	return err;
+}
+
+static bool virtnet_xsk_dev_is_full(struct send_queue *sq)
+{
+	if (sq->vq->num_free < 2 + MAX_SKB_FRAGS)
+		return true;
+
+	if (sq->xsk.hdr_con == sq->xsk.hdr_pro)
+		return true;

Can we really reach here?
If another program is sending a large number of packages, then num_free may be
<= 0, and xsk has no chance to send packages.


I meant if num_free <= 2 + MAX_SKB_FRAGS we've already return ture in the first check. If num_free > MAX_SKB_FRAGS, hdr array should be avaialbe slots for the second check is unnecessary.

Btw, how num_free can be less than zero? The virtio_ring.c will decrease num_free only if we had sufficient descriptors:

    if (vq->vq.num_free < descs_used) {
        pr_debug("Can't add buf len %i - avail = %i\n",



+
+	return false;
+}
+
+static int virtnet_xsk_xmit_zc(struct send_queue *sq,
+			       struct xsk_buff_pool *pool, unsigned int budget)
+{
+	struct xdp_desc desc;
+	int err, packet = 0;
+	int ret = -EAGAIN;
+
+	if (sq->xsk.last_desc.addr) {
+		err = virtnet_xsk_xmit(sq, pool, &sq->xsk.last_desc);
+		if (unlikely(err))
+			return -EBUSY;
+
+		++packet;
+		sq->xsk.last_desc.addr = 0;
+	}
+
+	while (budget-- > 0) {
+		if (virtnet_xsk_dev_is_full(sq)) {
+			ret = -EBUSY;
+			break;
+		}

It looks to me we will always hit this if userspace is fast. E.g we
don't kick until the virtqueue is full ...

Yes, if the user is extremely fast. A kick will be called after the budget here
is completed. I think this should be enough.


I think we need do some benchmark:

1) always kick (the kick might not happen actually since the device can disable notification)
2) kick for every several packets
3) kick for the last

I suspect 3 is not the best solution.

Thanks



+
+		if (!xsk_tx_peek_desc(pool, &desc)) {
+			/* done */
+			ret = 0;
+			break;
+		}
+
+		err = virtnet_xsk_xmit(sq, pool, &desc);
+		if (unlikely(err)) {
+			ret = -EBUSY;
+			break;
+		}
+
+		++packet;
+	}
+
+	if (packet) {
+		xsk_tx_release(pool);
+
+		if (virtqueue_kick_prepare(sq->vq) && virtqueue_notify(sq->vq)) {
+			u64_stats_update_begin(&sq->stats.syncp);
+			sq->stats.kicks++;
+			u64_stats_update_end(&sq->stats.syncp);
+		}
+	}
+
+	return ret;
+}
+
+static int virtnet_xsk_run(struct send_queue *sq,
+			   struct xsk_buff_pool *pool, int budget)
+{
+	int err, ret = 0;
+	unsigned int _packets = 0;
+	unsigned int _bytes = 0;
+
+	sq->xsk.wait_slot = false;
+
+	__free_old_xmit_ptr(sq, true, false, &_packets, &_bytes);
+
+	err = virtnet_xsk_xmit_zc(sq, pool, xsk_budget);
+	if (!err) {
+		struct xdp_desc desc;
+
+		clear_bit(VIRTNET_STATE_XSK_WAKEUP, &sq->xsk.state);
+		xsk_set_tx_need_wakeup(pool);
+
+		/* Race breaker. If new is coming after last xmit
+		 * but before flag change
+		 */
+
+		if (!xsk_tx_peek_desc(pool, &desc))
+			goto end;
+
+		set_bit(VIRTNET_STATE_XSK_WAKEUP, &sq->xsk.state);
+		xsk_clear_tx_need_wakeup(pool);

How memory ordering is going to work here? Or we don't need to care
about that?

It's my fault.

+
+		sq->xsk.last_desc = desc;
+		ret = budget;
+		goto end;
+	}
+
+	xsk_clear_tx_need_wakeup(pool);
+
+	if (err == -EAGAIN) {
+		ret = budget;
+		goto end;
+	}
+
+	__free_old_xmit_ptr(sq, true, false, &_packets, &_bytes);
+
+	if (!virtnet_xsk_dev_is_full(sq)) {
+		ret = budget;
+		goto end;
+	}
+
+	sq->xsk.wait_slot = true;
+
+	virtnet_sq_stop_check(sq, true);
+end:
+	return ret;
+}
+
   static int virtnet_get_phys_port_name(struct net_device *dev, char *buf,
   				      size_t len)
   {




[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux