[RFC bpf-next 5/7] net: add busy-poll support for XDP sockets

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



This patch adds busy-poll support for XDP sockets (AF_XDP). With
busy-poll, the driver is executed in process context by calling the
poll() syscall. The main advantage with this is that all processing
occurs on a single core. This eliminates the core-to-core cache
transfers that occur between the application and the softirqd
processing on another core, that occurs without busy-poll. From a
systems point of view, it also provides an advatage that we do not
have to provision extra cores in the system to handle
ksoftirqd/softirq processing, as all processing is done on the single
core that executes the application. The drawback of busy-poll is that
max throughput seen from a single application will be lower (due to
the syscall), but on a per core basis it will often be higher as the
normal mode runs on two cores and busy-poll on a single one.

The semantics of busy-poll from the application point of view are the
following:

* The application is required to call poll() to drive rx and tx
  processing. There is no guarantee that softirq and interrupts will
  do this for you.

* It should be enabled on a per socket basis. No global enablement, i.e.
  the XDP socket busy-poll will not care about the current
  /proc/sys/net/core/busy_poll and busy_read global enablement
  mechanisms.

* The batch size (how many packets that are processed every time the
  napi function in the driver is called, i.e. the weight parameter)
  should be configurable. Currently, the busy-poll size of AF_INET
  sockets is set to 8, but for AF_XDP sockets this is too small as the
  amount of processing per packet is much smaller with AF_XDP. This
  should be configurable on a per socket basis.

* If you put multiple AF_XDP busy-poll enabled sockets into a poll()
  call the napi contexts of all of them should be executed. This is in
  contrast to the AF_INET busy-poll that quits after the fist one that
  finds any packets. We need all napi contexts to be executed due to
  the first requirement in this list. The behaviour we want is much more
  like regular sockets in that all of them are checked in the poll
  call.

* Should be possible to mix AF_XDP busy-poll sockets with any other
  sockets including busy-poll AF_INET ones in a single poll() call
  without any change to semantics or the behaviour of any of those
  socket types.

* As suggested by Maxim Mikityanskiy, poll() will in the busy-poll
  mode return POLLERR if the fill ring is empty or the completion
  queue is full.

Busy-poll support is enabled by calling a new setsockopt called
XDP_BUSY_POLL_BATCH_SIZE that takes batch size as an argument. A value
between 1 and NAPI_WEIGHT (64) will turn it on, 0 will turn it off and
any other value will return an error.

A typical packet processing rxdrop loop with busy-poll will look something
like this:

for (i = 0; i < num_socks; i++) {
    fds[i].fd = xsk_socket__fd(xsks[i]->xsk);
    fds[i].events = POLLIN;
}

for (;;) {
    ret = poll(fds, num_socks, 0);
    if (ret <= 0)
            continue;

    for (i = 0; i < num_socks; i++)
        rx_drop(xsks[i], fds); /* The actual application */
}

Signed-off-by: Magnus Karlsson <magnus.karlsson@xxxxxxxxx>
---
 include/net/xdp_sock.h |   3 ++
 net/xdp/Kconfig        |   1 +
 net/xdp/xsk.c          | 122 ++++++++++++++++++++++++++++++++++++++++++++++++-
 net/xdp/xsk_queue.h    |  18 ++++++--
 4 files changed, 138 insertions(+), 6 deletions(-)

diff --git a/include/net/xdp_sock.h b/include/net/xdp_sock.h
index d074b6d..2e956b37 100644
--- a/include/net/xdp_sock.h
+++ b/include/net/xdp_sock.h
@@ -57,7 +57,10 @@ struct xdp_sock {
 	struct net_device *dev;
 	struct xdp_umem *umem;
 	struct list_head flush_node;
+	unsigned int napi_id_rx;
+	unsigned int napi_id_tx;
 	u16 queue_id;
+	u16 bp_batch_size;
 	struct xsk_queue *tx ____cacheline_aligned_in_smp;
 	struct list_head list;
 	bool zc;
diff --git a/net/xdp/Kconfig b/net/xdp/Kconfig
index 0255b33..219baaa 100644
--- a/net/xdp/Kconfig
+++ b/net/xdp/Kconfig
@@ -1,6 +1,7 @@
 config XDP_SOCKETS
 	bool "XDP sockets"
 	depends on BPF_SYSCALL
+	select NET_RX_BUSY_POLL
 	default n
 	help
 	  XDP sockets allows a channel between XDP programs and
diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index a14e886..bd3d0fe 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -22,6 +22,7 @@
 #include <linux/net.h>
 #include <linux/netdevice.h>
 #include <linux/rculist.h>
+#include <net/busy_poll.h>
 #include <net/xdp_sock.h>
 #include <net/xdp.h>
 
@@ -302,16 +303,107 @@ static int xsk_sendmsg(struct socket *sock, struct msghdr *m, size_t total_len)
 	return (xs->zc) ? xsk_zc_xmit(sk) : xsk_generic_xmit(sk, m, total_len);
 }
 
+static unsigned int xsk_check_rx_poll_err(struct xdp_sock *xs)
+{
+	return xskq_consumer_empty(xs->umem->fq) ? POLLERR : 0;
+}
+
+static unsigned int xsk_check_tx_poll_err(struct xdp_sock *xs)
+{
+	return xskq_producer_full(xs->umem->cq) ? POLLERR : 0;
+}
+
+static bool xsk_busy_loop_end(void *p, unsigned long start_time)
+{
+	return true;
+}
+
+static unsigned int xsk_get_napi_id_rx(struct xdp_sock *xs)
+{
+	if (xs->napi_id_rx)
+		return xs->napi_id_rx;
+	if (xs->dev->_rx[xs->queue_id].xdp_rxq.napi_id) {
+		xs->napi_id_rx = xdp_rxq_info_get(xs->dev,
+						  xs->queue_id)->napi_id;
+		return xs->napi_id_rx;
+	}
+
+	WARN_ON_ONCE(true);
+	return 0;
+}
+
+static unsigned int xsk_get_napi_id_tx(struct xdp_sock *xs)
+{
+	if (xs->napi_id_tx)
+		return xs->napi_id_tx;
+	if (xs->dev->_tx[xs->queue_id].xdp_txq.napi_id) {
+		xs->napi_id_tx = xdp_txq_info_get(xs->dev,
+						  xs->queue_id)->napi_id;
+		return xs->napi_id_tx;
+	}
+
+	WARN_ON_ONCE(true);
+	return 0;
+}
+
+static void xsk_exec_poll_generic(struct sock *sk, struct xdp_sock *xs,
+				  __poll_t events)
+{
+	if (events & (POLLIN | POLLRDNORM))
+		/* NAPI id filled in by the generic XDP code */
+		napi_busy_loop(xsk_get_napi_id_rx(xs), xsk_busy_loop_end, NULL,
+			       xs->bp_batch_size);
+	if (events & (POLLOUT | POLLWRNORM))
+		/* Use the regular send path as we do not have any
+		 * NAPI id for the Tx path. It is only in the driver
+		 * and not communicated upwards in the skb case.
+		 */
+		xsk_generic_xmit(sk, NULL, 0);
+}
+
+static void xsk_exec_poll_zc(struct xdp_sock *xs, __poll_t events)
+{
+	unsigned int napi_id_rx = xsk_get_napi_id_rx(xs);
+	unsigned int napi_id_tx = xsk_get_napi_id_tx(xs);
+
+	if (events & (POLLIN | POLLRDNORM))
+		napi_busy_loop(xs->napi_id_rx, xsk_busy_loop_end, NULL,
+			       xs->bp_batch_size);
+	if (napi_id_rx != napi_id_tx)
+		if (events & (POLLOUT | POLLWRNORM))
+			/* Tx has its own napi so we need to call it too */
+			napi_busy_loop(xs->napi_id_tx, xsk_busy_loop_end, NULL,
+				       xs->bp_batch_size);
+}
+
 static unsigned int xsk_poll(struct file *file, struct socket *sock,
 			     struct poll_table_struct *wait)
 {
 	unsigned int mask = datagram_poll(file, sock, wait);
 	struct sock *sk = sock->sk;
 	struct xdp_sock *xs = xdp_sk(sk);
+	__poll_t events = poll_requested_events(wait);
+
+	if (xs->bp_batch_size) {
+		if (xs->zc)
+			xsk_exec_poll_zc(xs, events);
+		else
+			xsk_exec_poll_generic(sk, xs, events);
+
+		if (events & (POLLIN | POLLRDNORM))
+			mask |= xsk_check_rx_poll_err(xs);
+		if (events & (POLLOUT | POLLWRNORM))
+			mask |= xsk_check_tx_poll_err(xs);
+
+		/* Clear the busy_loop flag so that any further fds in
+		 * the pollfd struct will have their napis scheduled.
+		 */
+		mask &= ~POLL_BUSY_LOOP;
+	}
 
-	if (xs->rx && !xskq_empty_desc(xs->rx))
+	if (xs->rx && !xskq_producer_empty(xs->rx))
 		mask |= POLLIN | POLLRDNORM;
-	if (xs->tx && !xskq_full_desc(xs->tx))
+	if (xs->tx && !xskq_consumer_full(xs->tx))
 		mask |= POLLOUT | POLLWRNORM;
 
 	return mask;
@@ -572,6 +664,21 @@ static int xsk_setsockopt(struct socket *sock, int level, int optname,
 		mutex_unlock(&xs->mutex);
 		return err;
 	}
+	case XDP_BUSY_POLL_BATCH_SIZE:
+	{
+		u16 batch_size;
+
+		if (copy_from_user(&batch_size, optval, sizeof(batch_size)))
+			return -EFAULT;
+
+		if (batch_size == 0 || batch_size > NAPI_POLL_WEIGHT)
+			return -EINVAL;
+
+		mutex_lock(&xs->mutex);
+		xs->bp_batch_size = batch_size;
+		mutex_unlock(&xs->mutex);
+		return 0;
+	}
 	default:
 		break;
 	}
@@ -644,6 +751,17 @@ static int xsk_getsockopt(struct socket *sock, int level, int optname,
 
 		return 0;
 	}
+	case XDP_BUSY_POLL_BATCH_SIZE:
+		if (len < sizeof(xs->bp_batch_size))
+			return -EINVAL;
+
+		if (copy_to_user(optval, &xs->bp_batch_size,
+				 sizeof(xs->bp_batch_size)))
+			return -EFAULT;
+		if (put_user(sizeof(xs->bp_batch_size), optlen))
+			return -EFAULT;
+
+		return 0;
 	default:
 		break;
 	}
diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index 88b9ae2..ebbd996 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -292,14 +292,24 @@ static inline void xskq_produce_flush_desc(struct xsk_queue *q)
 	WRITE_ONCE(q->ring->producer, q->prod_tail);
 }
 
-static inline bool xskq_full_desc(struct xsk_queue *q)
+static inline bool xskq_consumer_full(struct xsk_queue *q)
 {
-	return xskq_nb_avail(q, q->nentries) == q->nentries;
+	return READ_ONCE(q->ring->producer) - q->cons_tail == q->nentries;
 }
 
-static inline bool xskq_empty_desc(struct xsk_queue *q)
+static inline bool xskq_producer_empty(struct xsk_queue *q)
 {
-	return xskq_nb_free(q, q->prod_tail, q->nentries) == q->nentries;
+	return READ_ONCE(q->ring->consumer) == q->prod_tail;
+}
+
+static inline bool xskq_consumer_empty(struct xsk_queue *q)
+{
+	return READ_ONCE(q->ring->producer) == q->cons_tail;
+}
+
+static inline bool xskq_producer_full(struct xsk_queue *q)
+{
+	return q->prod_tail - READ_ONCE(q->ring->consumer) == q->nentries;
 }
 
 void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask);
-- 
2.7.4




[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux