[RFC PATCH bpf-next 7/8] i40e: introduce batched XDP rx descriptor processing

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Introduce batched processing of XDP frames in the i40e driver. The batch
size is fixed at 64.

First, the driver performs a lookahead in the rx ring to determine if
there are 64 contiguous descriptors available to be processed. If so,
and if the action returned from the bpf program run for each of the 64
descriptors is XDP_REDIRECT_XSK, the new xsk_rcv_batch API is used to
push the batch to the XDP socket Rx ring.

Logic to fall back to scalar processing is included for situations where
batch processing is not possible, e.g. not enough descriptors, ring wrap,
different actions returned from the bpf program, etc.

Signed-off-by: Ciara Loftus <ciara.loftus@xxxxxxxxx>
Signed-off-by: Cristian Dumitrescu <cristian.dumitrescu@xxxxxxxxx>
---
 drivers/net/ethernet/intel/i40e/i40e_xsk.c | 219 +++++++++++++++++++--
 1 file changed, 200 insertions(+), 19 deletions(-)

diff --git a/drivers/net/ethernet/intel/i40e/i40e_xsk.c b/drivers/net/ethernet/intel/i40e/i40e_xsk.c
index c994b4d9c38a..a578bb7b3b99 100644
--- a/drivers/net/ethernet/intel/i40e/i40e_xsk.c
+++ b/drivers/net/ethernet/intel/i40e/i40e_xsk.c
@@ -10,6 +10,9 @@
 #include "i40e_txrx_common.h"
 #include "i40e_xsk.h"
 
+#define I40E_DESCS_PER_BATCH 64 /* batch size; must stay a power of two for the mask */
+#define I40E_XSK_BATCH_MASK (~(I40E_DESCS_PER_BATCH - 1)) /* rounds down to a batch multiple */
+
 int i40e_alloc_rx_bi_zc(struct i40e_ring *rx_ring)
 {
 	unsigned long sz = sizeof(*rx_ring->rx_bi_zc) * rx_ring->count;
@@ -139,26 +142,12 @@ int i40e_xsk_pool_setup(struct i40e_vsi *vsi, struct xsk_buff_pool *pool,
 		i40e_xsk_pool_disable(vsi, qid);
 }
 
-/**
- * i40e_run_xdp_zc - Executes an XDP program on an xdp_buff
- * @rx_ring: Rx ring
- * @xdp: xdp_buff used as input to the XDP program
- *
- * Returns any of I40E_XDP_{PASS, CONSUMED, TX, REDIR, REDIR_XSK}
- **/
-static int i40e_run_xdp_zc(struct i40e_ring *rx_ring, struct xdp_buff *xdp)
+static int i40e_handle_xdp_action(struct i40e_ring *rx_ring, struct xdp_buff *xdp,
+				  struct bpf_prog *xdp_prog, u32 act)
 {
 	int err, result = I40E_XDP_PASS;
 	struct i40e_ring *xdp_ring;
-	struct bpf_prog *xdp_prog;
 	struct xdp_sock *xs;
-	u32 act;
-
-	/* NB! xdp_prog will always be !NULL, due to the fact that
-	 * this path is enabled by setting an XDP program.
-	 */
-	xdp_prog = READ_ONCE(rx_ring->xdp_prog);
-	act = bpf_prog_run_xdp(xdp_prog, xdp);
 
 	if (likely(act == XDP_REDIRECT_XSK)) {
 		xs = xsk_get_redirect_xsk(&rx_ring->netdev->_rx[xdp->rxq->queue_index]);
@@ -197,6 +186,21 @@ static int i40e_run_xdp_zc(struct i40e_ring *rx_ring, struct xdp_buff *xdp)
 	return result;
 }
 
+/**
+ * i40e_run_xdp_zc - Run an XDP program on one xdp_buff and act on its verdict
+ * @rx_ring: Rx ring
+ * @xdp: xdp_buff used as input to the XDP program
+ * @xdp_prog: XDP program to execute
+ *
+ * Returns any of I40E_XDP_{PASS, CONSUMED, TX, REDIR, REDIR_XSK}
+ **/
+static int i40e_run_xdp_zc(struct i40e_ring *rx_ring, struct xdp_buff *xdp,
+			   struct bpf_prog *xdp_prog)
+{
+	return i40e_handle_xdp_action(rx_ring, xdp, xdp_prog,
+				      bpf_prog_run_xdp(xdp_prog, xdp));
+}
+
 bool i40e_alloc_rx_buffers_zc(struct i40e_ring *rx_ring, u16 count)
 {
 	u16 ntu = rx_ring->next_to_use;
@@ -218,6 +222,7 @@ bool i40e_alloc_rx_buffers_zc(struct i40e_ring *rx_ring, u16 count)
 		dma = xsk_buff_xdp_get_dma(*xdp);
 		rx_desc->read.pkt_addr = cpu_to_le64(dma);
 		rx_desc->read.hdr_addr = 0;
+		rx_desc->wb.qword1.status_error_len = 0;
 
 		rx_desc++;
 		xdp++;
@@ -324,6 +329,7 @@ static void i40e_handle_xdp_result_zc(struct i40e_ring *rx_ring,
 }
 
 static inline void i40e_clean_rx_desc_zc(struct i40e_ring *rx_ring,
+					 struct bpf_prog *xdp_prog,
 					 unsigned int *stat_rx_packets,
 					 unsigned int *stat_rx_bytes,
 					 unsigned int *xmit,
@@ -370,7 +376,7 @@ static inline void i40e_clean_rx_desc_zc(struct i40e_ring *rx_ring,
 		xsk_buff_set_size(bi, size);
 		xsk_buff_dma_sync_for_cpu(bi, rx_ring->xsk_pool);
 
-		xdp_res = i40e_run_xdp_zc(rx_ring, bi);
+		xdp_res = i40e_run_xdp_zc(rx_ring, bi, xdp_prog);
 		i40e_handle_xdp_result_zc(rx_ring, bi, rx_desc, &rx_packets,
 					  &rx_bytes, size, xdp_res);
 		total_rx_packets += rx_packets;
@@ -385,6 +391,172 @@ static inline void i40e_clean_rx_desc_zc(struct i40e_ring *rx_ring,
 	*xmit = xdp_xmit;
 }
 
+/**
+ * i40_rx_ring_lookahead - check for new descriptors in the rx ring
+ * @rx_ring: Rx ring
+ * @budget: NAPI budget, rounded down here to a multiple of I40E_DESCS_PER_BATCH
+ *
+ * Returns the number of descriptors, starting at next_to_clean, that can be
+ * batch processed in contiguous memory, i.e. without a ring wrap, or 0 if none.
+ * Only the last descriptor of each candidate range is tested for a non-zero size.
+ **/
+static inline unsigned int i40_rx_ring_lookahead(struct i40e_ring *rx_ring,
+						 unsigned int budget)
+{
+	u32 used = (rx_ring->next_to_clean - rx_ring->next_to_use - 1) & (rx_ring->count - 1);
+	union i40e_rx_desc *rx_desc0 = (union i40e_rx_desc *)rx_ring->desc, *rx_desc;
+	u32 next_to_clean = rx_ring->next_to_clean;
+	u32 potential = rx_ring->count - used;
+	u16 count_mask = rx_ring->count - 1;
+	unsigned int size;
+	u64 qword;
+
+	budget &= I40E_XSK_BATCH_MASK;
+	/* Try the largest candidate first and halve it after each failed check */
+	while (budget) {
+		if (budget > potential)
+			goto next;
+		rx_desc = rx_desc0 + ((next_to_clean + budget - 1) & count_mask);
+		qword = le64_to_cpu(rx_desc->wb.qword1.status_error_len);
+		dma_rmb();
+
+		size = (qword & I40E_RXD_QW1_LENGTH_PBUF_MASK) >>
+			I40E_RXD_QW1_LENGTH_PBUF_SHIFT;
+		if (size && ((next_to_clean + budget) <= count_mask))
+			return budget;
+
+next:
+		budget >>= 1;
+		budget &= I40E_XSK_BATCH_MASK;
+	}
+
+	return 0;
+}
+
+/**
+ * i40e_run_xdp_zc_batch - Executes an XDP program on an array of xdp_buffs
+ * @rx_ring: Rx ring
+ * @bufs: array of I40E_DESCS_PER_BATCH xdp_buffs used as input to the XDP program
+ * @xdp_prog: XDP program to execute on each xdp_buff
+ * @res: array of ints with result for each buf if an error occurs or slow path taken.
+ *
+ * Returns zero if all xdp_buffs successfully took the fast path (XDP_REDIRECT_XSK).
+ * Otherwise returns -1 and sets individual results for each buf in the array *res.
+ * Individual results are one of I40E_XDP_{PASS, CONSUMED, TX, REDIR, REDIR_XSK}
+ **/
+static int i40e_run_xdp_zc_batch(struct i40e_ring *rx_ring, struct xdp_buff **bufs,
+				 struct bpf_prog *xdp_prog, int *res)
+{
+	u32 last_act = XDP_REDIRECT_XSK;
+	int runs = 0, ret = 0, err, i;
+
+	while ((runs < I40E_DESCS_PER_BATCH) && (last_act == XDP_REDIRECT_XSK))
+		last_act = bpf_prog_run_xdp(xdp_prog, bufs[runs++]);
+
+	if (likely(runs == I40E_DESCS_PER_BATCH && last_act == XDP_REDIRECT_XSK)) {
+		struct xdp_sock *xs =
+			xsk_get_redirect_xsk(&rx_ring->netdev->_rx[bufs[0]->rxq->queue_index]);
+
+		err = xsk_rcv_batch(xs, bufs, I40E_DESCS_PER_BATCH);
+		if (unlikely(err)) {
+			ret = -1;
+			for (i = 0; i < I40E_DESCS_PER_BATCH; i++)
+				res[i] = I40E_XDP_PASS;
+		}
+	} else {
+		/* Handle the result of each program run individually */
+		u32 act;
+
+		ret = -1;
+		for (i = 0; i < I40E_DESCS_PER_BATCH; i++) {
+			struct xdp_buff *xdp = bufs[i];
+
+			/* The first runs-1 programs returned XDP_REDIRECT_XSK and the
+			 * last program run (on buf runs-1) returned last_act, which can
+			 * be a non-XSK action even when all bufs have been run. Any
+			 * remaining bufs have not had the program executed, so do it now.
+			 */
+			if (i < runs - 1)
+				act = XDP_REDIRECT_XSK;
+			else if (i == runs - 1)
+				act = last_act;
+			else
+				act = bpf_prog_run_xdp(xdp_prog, xdp);
+
+			res[i] = i40e_handle_xdp_action(rx_ring, xdp, xdp_prog, act);
+		}
+	}
+
+	return ret;
+}
+
+/* Clean one batch of exactly I40E_DESCS_PER_BATCH Rx descriptors starting at
+ * next_to_clean. Packets and bytes handled are added to *total_rx_packets and
+ * *total_rx_bytes, and the I40E_XDP_{TX, REDIR, REDIR_XSK} result bits seen are
+ * OR'ed into *xdp_xmit. next_to_clean is advanced without a ring-wrap check.
+ */
+static inline void i40e_clean_rx_desc_zc_batch(struct i40e_ring *rx_ring,
+					       struct bpf_prog *xdp_prog,
+					       unsigned int *total_rx_packets,
+					       unsigned int *total_rx_bytes,
+					       unsigned int *xdp_xmit)
+{
+	u16 next_to_clean = rx_ring->next_to_clean;
+	int xdp_res[I40E_DESCS_PER_BATCH];	/* int, as i40e_run_xdp_zc_batch() takes int * */
+	unsigned int size[I40E_DESCS_PER_BATCH];
+	unsigned int rx_packets = 0, rx_bytes = 0;
+	union i40e_rx_desc *rx_desc;
+	struct xdp_buff **bufs;
+	int j, ret;
+	u64 qword;
+
+	rx_desc = I40E_RX_DESC(rx_ring, next_to_clean);
+	prefetch(rx_desc + I40E_DESCS_PER_BATCH);
+
+	for (j = 0; j < I40E_DESCS_PER_BATCH; j++) {
+		qword = le64_to_cpu((rx_desc + j)->wb.qword1.status_error_len);
+		size[j] = (qword & I40E_RXD_QW1_LENGTH_PBUF_MASK) >>
+			I40E_RXD_QW1_LENGTH_PBUF_SHIFT;
+	}
+
+	/* This memory barrier is needed to keep us from reading any other fields
+	 * out of the rx_descs until we have verified they have been written back.
+	 */
+	dma_rmb();
+
+	bufs = i40e_rx_bi(rx_ring, next_to_clean);
+
+	for (j = 0; j < I40E_DESCS_PER_BATCH; j++)
+		xsk_buff_set_size(*(bufs + j), size[j]);
+
+	xsk_buff_dma_sync_for_cpu_batch(bufs, rx_ring->xsk_pool, I40E_DESCS_PER_BATCH);
+
+	ret = i40e_run_xdp_zc_batch(rx_ring, bufs, xdp_prog, xdp_res);
+
+	if (unlikely(ret)) {
+		unsigned int err_rx_packets = 0, err_rx_bytes = 0;
+
+		for (j = 0; j < I40E_DESCS_PER_BATCH; j++) {
+			i40e_handle_xdp_result_zc(rx_ring, *(bufs + j), rx_desc + j,
+						  &err_rx_packets, &err_rx_bytes, size[j],
+						  xdp_res[j]);
+			*xdp_xmit |= (xdp_res[j] & (I40E_XDP_TX | I40E_XDP_REDIR |
+						    I40E_XDP_REDIR_XSK));
+			rx_packets += err_rx_packets;
+			rx_bytes += err_rx_bytes;
+		}
+	} else {
+		rx_packets = I40E_DESCS_PER_BATCH;
+		for (j = 0; j < I40E_DESCS_PER_BATCH; j++)
+			rx_bytes += size[j];
+		*xdp_xmit |= I40E_XDP_REDIR_XSK;
+	}
+
+	rx_ring->next_to_clean += I40E_DESCS_PER_BATCH;
+	*total_rx_packets += rx_packets;
+	*total_rx_bytes += rx_bytes;
+}
+
 /**
  * i40e_clean_rx_irq_zc - Consumes Rx packets from the hardware ring
  * @rx_ring: Rx ring
@@ -394,17 +566,26 @@ static inline void i40e_clean_rx_desc_zc(struct i40e_ring *rx_ring,
  **/
 int i40e_clean_rx_irq_zc(struct i40e_ring *rx_ring, int budget)
 {
+	int batch_budget = i40_rx_ring_lookahead(rx_ring, (unsigned int)budget);
+	struct bpf_prog *xdp_prog = READ_ONCE(rx_ring->xdp_prog);
 	unsigned int total_rx_bytes = 0, total_rx_packets = 0;
 	u16 count_mask = rx_ring->count - 1;
 	unsigned int xdp_xmit = 0;
 	bool failure = false;
 	u16 cleaned_count;
+	int i;
+
+	for (i = 0; i < batch_budget; i += I40E_DESCS_PER_BATCH)
+		i40e_clean_rx_desc_zc_batch(rx_ring, xdp_prog,
+					    &total_rx_packets,
+					    &total_rx_bytes,
+					    &xdp_xmit);
 
-	i40e_clean_rx_desc_zc(rx_ring,
+	i40e_clean_rx_desc_zc(rx_ring, xdp_prog,
 			      &total_rx_packets,
 			      &total_rx_bytes,
 			      &xdp_xmit,
-			      budget);
+			      (unsigned int)budget - total_rx_packets);
 
 	cleaned_count = (rx_ring->next_to_clean - rx_ring->next_to_use - 1) & count_mask;
 
-- 
2.17.1




[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux