Re: [RFC PATCH v8 18/20] selftests: Add a bpf fq qdisc to selftest

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On 5/10/24 12:24 PM, Amery Hung wrote:
This test implements a more sophisticated qdisc using bpf. The bpf fair-
queueing (fq) qdisc gives each flow an equal chance to transmit data. It
also respects the timestamp of skb for rate limiting. The implementation
does not prevent hash collision of flows nor does it recycle flows.

Does it hit some issue to handle the flow collision (just curious if there are missing pieces to do this)?

The bpf fq also takes the chance to communicate packet drop information
with a bpf clsact EDT rate limiter using bpf maps. With the info, the
rate limiter can compenstate the delay caused by packet drops in qdisc
to maintain the throughput.


diff --git a/tools/testing/selftests/bpf/progs/bpf_qdisc_fq.c b/tools/testing/selftests/bpf/progs/bpf_qdisc_fq.c
new file mode 100644
index 000000000000..5118237da9e4
--- /dev/null
+++ b/tools/testing/selftests/bpf/progs/bpf_qdisc_fq.c
@@ -0,0 +1,660 @@
+#include <vmlinux.h>
+#include <bpf/bpf_helpers.h>
+#include "bpf_experimental.h"
+#include "bpf_qdisc_common.h"
+
+char _license[] SEC("license") = "GPL";
+
+#define NSEC_PER_USEC 1000L
+#define NSEC_PER_SEC 1000000000L
+#define PSCHED_MTU (64 * 1024 + 14)
+
+#define NUM_QUEUE_LOG 10
+#define NUM_QUEUE (1 << NUM_QUEUE_LOG)
+#define PRIO_QUEUE (NUM_QUEUE + 1)
+#define COMP_DROP_PKT_DELAY 1
+#define THROTTLED 0xffffffffffffffff
+
+/* fq configuration */
+__u64 q_flow_refill_delay = 40 * 10000; //40us
+__u64 q_horizon = 10ULL * NSEC_PER_SEC;
+__u32 q_initial_quantum = 10 * PSCHED_MTU;
+__u32 q_quantum = 2 * PSCHED_MTU;
+__u32 q_orphan_mask = 1023;
+__u32 q_flow_plimit = 100;
+__u32 q_plimit = 10000;
+__u32 q_timer_slack = 10 * NSEC_PER_USEC;
+bool q_horizon_drop = true;
+
+bool q_compensate_tstamp;
+bool q_random_drop;
+
+unsigned long time_next_delayed_flow = ~0ULL;
+unsigned long unthrottle_latency_ns = 0ULL;
+unsigned long ktime_cache = 0;
+unsigned long dequeue_now;
+unsigned int fq_qlen = 0;

I suspect some of these globals may be more natural if it is stored private to an individual Qdisc instance. i.e. qdisc_priv(). e.g. in the sch_mq setup.

A high level idea is to allow the SEC(".struct_ops.link") to specify its own Qdisc_ops.priv_size.

The bpf prog could use it as a simple u8 array memory area to write anything but the verifier can't learn a lot from it. It will be more useful if it can work like map_value(s) to the verifier such that the verifier can also see the bpf_rb_root/bpf_list_head/bpf_spin_lock...etc.

+
+struct fq_flow_node {
+	u32 hash;
+	int credit;
+	u32 qlen;
+	u32 socket_hash;
+	u64 age;
+	u64 time_next_packet;
+	struct bpf_list_node list_node;
+	struct bpf_rb_node rb_node;
+	struct bpf_rb_root queue __contains_kptr(sk_buff, bpf_rbnode);
+	struct bpf_spin_lock lock;
+	struct bpf_refcount refcount;
+};
+
+struct dequeue_nonprio_ctx {
+	bool dequeued;
+	u64 expire;
+};
+
+struct fq_stashed_flow {
+	struct fq_flow_node __kptr *flow;
+};
+
+struct stashed_skb {
+	struct sk_buff __kptr *skb;
+};
+
+/* [NUM_QUEUE] for TC_PRIO_CONTROL
+ * [0, NUM_QUEUE - 1] for other flows
+ */
+struct {
+	__uint(type, BPF_MAP_TYPE_ARRAY);
+	__type(key, __u32);
+	__type(value, struct fq_stashed_flow);
+	__uint(max_entries, NUM_QUEUE + 1);
+} fq_stashed_flows SEC(".maps");
+
+struct {
+	__uint(type, BPF_MAP_TYPE_HASH);
+	__type(key, __u32);
+	__type(value, __u64);
+	__uint(pinning, LIBBPF_PIN_BY_NAME);
+	__uint(max_entries, 16);
+} rate_map SEC(".maps");
+
+struct {
+	__uint(type, BPF_MAP_TYPE_HASH);
+	__type(key, __u32);
+	__type(value, __u64);
+	__uint(pinning, LIBBPF_PIN_BY_NAME);
+	__uint(max_entries, 16);
+} comp_map SEC(".maps");
+
+#define private(name) SEC(".data." #name) __hidden __attribute__((aligned(8)))
+
+private(A) struct bpf_spin_lock fq_delayed_lock;
+private(A) struct bpf_rb_root fq_delayed __contains(fq_flow_node, rb_node);
+
+private(B) struct bpf_spin_lock fq_new_flows_lock;
+private(B) struct bpf_list_head fq_new_flows __contains(fq_flow_node, list_node);
+
+private(C) struct bpf_spin_lock fq_old_flows_lock;
+private(C) struct bpf_list_head fq_old_flows __contains(fq_flow_node, list_node);
+
+private(D) struct bpf_spin_lock fq_stashed_skb_lock;
+private(D) struct bpf_list_head fq_stashed_skb __contains_kptr(sk_buff, bpf_list);

[ ... ]

+SEC("struct_ops/bpf_fq_enqueue")
+int BPF_PROG(bpf_fq_enqueue, struct sk_buff *skb, struct Qdisc *sch,
+	     struct bpf_sk_buff_ptr *to_free)
+{
+	struct iphdr *iph = (void *)(long)skb->data + sizeof(struct ethhdr);
+	u64 time_to_send, jiffies, delay_ns, *comp_ns, *rate;
+	struct fq_flow_node *flow = NULL, *flow_copy;
+	struct fq_stashed_flow *sflow;
+	u32 hash, daddr, sk_hash;
+	bool connected;
+
+	if (q_random_drop & (bpf_get_prandom_u32() > ~0U * 0.90))
+		goto drop;
+
+	if (fq_qlen >= q_plimit)
+		goto drop;
+
+	if (!skb->tstamp) {
+		time_to_send = ktime_cache = bpf_ktime_get_ns();
+	} else {
+		if (fq_packet_beyond_horizon(skb)) {
+			ktime_cache = bpf_ktime_get_ns();
+			if (fq_packet_beyond_horizon(skb)) {
+				if (q_horizon_drop)
+					goto drop;
+
+				skb->tstamp = ktime_cache + q_horizon;
+			}
+		}
+		time_to_send = skb->tstamp;
+	}
+
+	if (fq_classify(skb, &hash, &sflow, &connected, &sk_hash) < 0)
+		goto drop;
+
+	flow = bpf_kptr_xchg(&sflow->flow, flow);
+	if (!flow)
+		goto drop; //unexpected
+
+	if (hash != PRIO_QUEUE) {
+		if (connected && flow->socket_hash != sk_hash) {
+			flow->credit = q_initial_quantum;
+			flow->socket_hash = sk_hash;
+			if (fq_flow_is_throttled(flow)) {
+				/* mark the flow as undetached. The reference to the
+				 * throttled flow in fq_delayed will be removed later.
+				 */
+				flow_copy = bpf_refcount_acquire(flow);
+				flow_copy->age = 0;
+				fq_flows_add_tail(&fq_old_flows, &fq_old_flows_lock, flow_copy);
+			}
+			flow->time_next_packet = 0ULL;
+		}
+
+		if (flow->qlen >= q_flow_plimit) {
+			bpf_kptr_xchg_back(&sflow->flow, flow);
+			goto drop;
+		}
+
+		if (fq_flow_is_detached(flow)) {
+			if (connected)
+				flow->socket_hash = sk_hash;
+
+			flow_copy = bpf_refcount_acquire(flow);
+
+			jiffies = bpf_jiffies64();
+			if ((s64)(jiffies - (flow_copy->age + q_flow_refill_delay)) > 0) {
+				if (flow_copy->credit < q_quantum)
+					flow_copy->credit = q_quantum;
+			}
+			flow_copy->age = 0;
+			fq_flows_add_tail(&fq_new_flows, &fq_new_flows_lock, flow_copy);
+		}
+	}
+
+	skb->tstamp = time_to_send;
+
+	bpf_spin_lock(&flow->lock);
+	bpf_rbtree_excl_add(&flow->queue, &skb->bpf_rbnode, skb_tstamp_less);
+	bpf_spin_unlock(&flow->lock);
+
+	flow->qlen++;
+	bpf_kptr_xchg_back(&sflow->flow, flow);
+
+	fq_qlen++;
+	return NET_XMIT_SUCCESS;
+
+drop:
+	if (q_compensate_tstamp) {
+		bpf_probe_read_kernel(&daddr, sizeof(daddr), &iph->daddr);
+		rate = bpf_map_lookup_elem(&rate_map, &daddr);
+		comp_ns = bpf_map_lookup_elem(&comp_map, &daddr);
+		if (rate && comp_ns) {
+			delay_ns = (u64)qdisc_skb_cb(skb)->pkt_len * NSEC_PER_SEC / (*rate);
+			__sync_fetch_and_add(comp_ns, delay_ns);
+		}
+	}
+	bpf_qdisc_skb_drop(skb, to_free);
+	return NET_XMIT_DROP;
+}

[ ... ]

+SEC("struct_ops/bpf_fq_dequeue")
+struct sk_buff *BPF_PROG(bpf_fq_dequeue, struct Qdisc *sch)
+{
+	struct dequeue_nonprio_ctx cb_ctx = {};
+	struct sk_buff *skb = NULL;
+
+	skb = fq_dequeue_prio();
+	if (skb) {
+		bpf_skb_set_dev(skb, sch);
+		return skb;
+	}
+
+	ktime_cache = dequeue_now = bpf_ktime_get_ns();
+	fq_check_throttled();
+	bpf_loop(q_plimit, fq_dequeue_nonprio_flows, &cb_ctx, 0);
+
+	skb = get_stashed_skb();
+
+	if (skb) {
+		bpf_skb_set_dev(skb, sch);
+		return skb;
+	}
+
+	if (cb_ctx.expire)
+		bpf_qdisc_watchdog_schedule(sch, cb_ctx.expire, q_timer_slack);
+
+	return NULL;
+}

The enqueue and dequeue are using the bpf map (e.g. arraymap) or global var (also an arraymap). Potentially, the map can be shared by different qdisc instances (sch) and they could be attached to different net devices also. Not sure if there is potentail issue? e.g. the bpf_fq_reset below.
or a bpf prog dequeue a skb with a different skb->dev.

+
+static int
+fq_reset_flows(u32 index, void *ctx)
+{
+	struct bpf_list_node *node;
+	struct fq_flow_node *flow;
+
+	bpf_spin_lock(&fq_new_flows_lock);
+	node = bpf_list_pop_front(&fq_new_flows);
+	bpf_spin_unlock(&fq_new_flows_lock);
+	if (!node) {
+		bpf_spin_lock(&fq_old_flows_lock);
+		node = bpf_list_pop_front(&fq_old_flows);
+		bpf_spin_unlock(&fq_old_flows_lock);
+		if (!node)
+			return 1;
+	}
+
+	flow = container_of(node, struct fq_flow_node, list_node);
+	bpf_obj_drop(flow);
+
+	return 0;
+}
+
+static int
+fq_reset_stashed_flows(u32 index, void *ctx)
+{
+	struct fq_flow_node *flow = NULL;
+	struct fq_stashed_flow *sflow;
+
+	sflow = bpf_map_lookup_elem(&fq_stashed_flows, &index);
+	if (!sflow)
+		return 0;
+
+	flow = bpf_kptr_xchg(&sflow->flow, flow);
+	if (flow)
+		bpf_obj_drop(flow);
+
+	return 0;
+}
+
+SEC("struct_ops/bpf_fq_reset")
+void BPF_PROG(bpf_fq_reset, struct Qdisc *sch)
+{
+	bool unset_all = true;
+	fq_qlen = 0;
+	bpf_loop(NUM_QUEUE + 1, fq_reset_stashed_flows, NULL, 0);
+	bpf_loop(NUM_QUEUE, fq_reset_flows, NULL, 0);
+	bpf_loop(NUM_QUEUE, fq_unset_throttled_flows, &unset_all, 0);

I am not sure if it can depend on a bpf prog to do cleanup/reset. What if it missed to drop some skb which potentially could hold up resources like sk/dev/netns?

A quick thought is that the struct_ops knows all the bpf progs and each prog tracks the used map in prog->aux->used_maps. The kernel can clean it up. However, the map may still be used by other Qdisc instances.

It may be easier if the skb can only be enqueued somewhere in the qdisc_priv() and then cleans up its own qdisc_priv during reset.

+	return;
+}
+
+SEC(".struct_ops")
+struct Qdisc_ops fq = {
+	.enqueue   = (void *)bpf_fq_enqueue,
+	.dequeue   = (void *)bpf_fq_dequeue,
+	.reset     = (void *)bpf_fq_reset,
+	.id        = "bpf_fq",
+};





[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux