[RFC PATCH 05/17] pifomap: Add queue rotation for continuously increasing rank mode

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Amend the PIFO map so it can operate in a mode where the rank range
increases continuously. This works by allocating two underlying queues and
queueing entries into the second one if they overflow the range of the
first. When the primary queue runs empty and there are entries in the
secondary queue, swap the two queues and shift the operating range of the
new secondary queue to start right after that of the (new) primary. This
way the map can support a continuously increasing rank, for instance when
indexing by timestamps.

Signed-off-by: Toke Høiland-Jørgensen <toke@xxxxxxxxxx>
---
 kernel/bpf/pifomap.c | 96 ++++++++++++++++++++++++++++++++++----------
 1 file changed, 75 insertions(+), 21 deletions(-)

diff --git a/kernel/bpf/pifomap.c b/kernel/bpf/pifomap.c
index 5040f532e5d8..62633c2c7419 100644
--- a/kernel/bpf/pifomap.c
+++ b/kernel/bpf/pifomap.c
@@ -6,6 +6,7 @@
 #include <linux/bpf.h>
 #include <linux/bitops.h>
 #include <linux/btf_ids.h>
+#include <linux/minmax.h>
 #include <net/xdp.h>
 #include <linux/filter.h>
 #include <trace/events/xdp.h>
@@ -44,7 +45,8 @@ struct bpf_pifo_queue {
 
 struct bpf_pifo_map {
 	struct bpf_map map;
-	struct bpf_pifo_queue *queue;
+	struct bpf_pifo_queue *q_primary;
+	struct bpf_pifo_queue *q_secondary;
 	unsigned long num_queued;
 	spinlock_t lock; /* protects enqueue / dequeue */
 
@@ -71,6 +73,12 @@ static bool pifo_map_is_full(struct bpf_pifo_map *pifo)
 	return pifo->num_queued >= pifo->map.max_entries;
 }
 
+static bool pifo_queue_is_empty(struct bpf_pifo_queue *queue)
+{
+	/* bitmap[0] is the top-level map; zero is treated as "queue empty" */
+	return !queue->bitmap[0];
+}
+
 static void pifo_queue_free(struct bpf_pifo_queue *q)
 {
 	bpf_map_area_free(q->buckets);
@@ -79,7 +87,7 @@ static void pifo_queue_free(struct bpf_pifo_queue *q)
 	kfree(q);
 }
 
-static struct bpf_pifo_queue *pifo_queue_alloc(u32 range, int numa_node)
+static struct bpf_pifo_queue *pifo_queue_alloc(u32 range, u32 min_rank, int numa_node)
 {
 	u32 num_longs = 0, offset = 0, i, lvl, levels;
 	struct bpf_pifo_queue *q;
@@ -112,6 +120,7 @@ static struct bpf_pifo_queue *pifo_queue_alloc(u32 range, int numa_node)
 	}
 	q->levels = levels;
 	q->range = range;
+	q->min_rank = min_rank;
 	return q;
 
 err:
@@ -131,10 +140,14 @@ static int pifo_map_init_map(struct bpf_pifo_map *pifo, union bpf_attr *attr,
 
 	bpf_map_init_from_attr(&pifo->map, attr);
 
-	pifo->queue = pifo_queue_alloc(range, pifo->map.numa_node);
-	if (!pifo->queue)
+	pifo->q_primary = pifo_queue_alloc(range, 0, pifo->map.numa_node);
+	if (!pifo->q_primary)
 		return -ENOMEM;
 
+	pifo->q_secondary = pifo_queue_alloc(range, range, pifo->map.numa_node);
+	if (!pifo->q_secondary)
+		goto err_queue;
+
 	if (attr->map_type == BPF_MAP_TYPE_PIFO_GENERIC) {
 		size_t cache_size;
 		int i;
@@ -144,7 +157,7 @@ static int pifo_map_init_map(struct bpf_pifo_map *pifo, union bpf_attr *attr,
 		pifo->elem_cache = bpf_map_area_alloc(cache_size,
 						      pifo->map.numa_node);
 		if (!pifo->elem_cache)
-			goto err_queue;
+			goto err;
 
 		for (i = 0; i < attr->max_entries; i++)
 			pifo->elem_cache->elements[i] = (void *)&pifo->elements[i * elem_size];
@@ -153,8 +166,10 @@ static int pifo_map_init_map(struct bpf_pifo_map *pifo, union bpf_attr *attr,
 
 	return 0;
 
+err:
+	pifo_queue_free(pifo->q_secondary);
 err_queue:
-	pifo_queue_free(pifo->queue);
+	pifo_queue_free(pifo->q_primary);
 	return err;
 }
 
@@ -238,9 +253,12 @@ static void pifo_map_free(struct bpf_map *map)
 
 	synchronize_rcu();
 
-	if (map->map_type == BPF_MAP_TYPE_PIFO_XDP)
-		pifo_queue_flush(pifo->queue);
-	pifo_queue_free(pifo->queue);
+	if (map->map_type == BPF_MAP_TYPE_PIFO_XDP) {
+		pifo_queue_flush(pifo->q_primary);
+		pifo_queue_flush(pifo->q_secondary);
+	}
+	pifo_queue_free(pifo->q_primary);
+	pifo_queue_free(pifo->q_secondary);
 	bpf_map_area_free(pifo->elem_cache);
 	bpf_map_area_free(pifo);
 }
@@ -249,7 +267,7 @@ static int pifo_map_get_next_key(struct bpf_map *map, void *key, void *next_key)
 {
 	struct bpf_pifo_map *pifo = container_of(map, struct bpf_pifo_map, map);
 	u32 index = key ? *(u32 *)key : U32_MAX, offset;
-	struct bpf_pifo_queue *queue = pifo->queue;
+	struct bpf_pifo_queue *queue = pifo->q_primary;
 	unsigned long idx, flags;
 	u32 *next = next_key;
 	int ret = -ENOENT;
@@ -261,15 +279,27 @@ static int pifo_map_get_next_key(struct bpf_map *map, void *key, void *next_key)
 	else
 		offset = index - queue->min_rank + 1;
 
-	if (offset >= queue->range)
-		goto out;
+	if (offset >= queue->range) {
+		offset -= queue->range;
+		queue = pifo->q_secondary;
+
+		if (offset >= queue->range)
+			goto out;
+	}
 
+search:
 	idx = find_next_bit(queue->lvl_bitmap[queue->levels - 1],
 			    queue->range, offset);
-	if (idx == queue->range)
+	if (idx == queue->range) {
+		if (queue == pifo->q_primary) {
+			queue = pifo->q_secondary;
+			offset = 0;
+			goto search;
+		}
 		goto out;
+	}
 
-	*next = idx;
+	*next = idx + queue->min_rank;
 	ret = 0;
 out:
 	spin_unlock_irqrestore(&pifo->lock, flags);
@@ -316,7 +346,7 @@ static void pifo_item_set_next(union bpf_pifo_item *item, void *next, bool xdp)
 static int __pifo_map_enqueue(struct bpf_pifo_map *pifo, union bpf_pifo_item *item,
 			      u64 rank, bool xdp)
 {
-	struct bpf_pifo_queue *queue = pifo->queue;
+	struct bpf_pifo_queue *queue = pifo->q_primary;
 	struct bpf_pifo_bucket *bucket;
 	u64 q_index;
 
@@ -331,8 +361,16 @@ static int __pifo_map_enqueue(struct bpf_pifo_map *pifo, union bpf_pifo_item *it
 	pifo_item_set_next(item, NULL, xdp);
 
 	q_index = rank - queue->min_rank;
-	if (unlikely(q_index >= queue->range))
-		q_index = queue->range - 1;
+	if (unlikely(q_index >= queue->range)) {
+		/* If the rank overflows the primary queue, enqueue into the
+		 * secondary; if it overflows that too, use its last bucket
+		 */
+		q_index -= queue->range;
+		queue = pifo->q_secondary;
+
+		if (q_index >= queue->range)
+			q_index = queue->range - 1;
+	}
 
 	bucket = &queue->buckets[q_index];
 	if (likely(!bucket->head)) {
@@ -380,7 +418,7 @@ static unsigned long pifo_find_first_bucket(struct bpf_pifo_queue *queue)
 static union bpf_pifo_item *__pifo_map_dequeue(struct bpf_pifo_map *pifo,
 					       u64 flags, u64 *rank, bool xdp)
 {
-	struct bpf_pifo_queue *queue = pifo->queue;
+	struct bpf_pifo_queue *queue = pifo->q_primary;
 	struct bpf_pifo_bucket *bucket;
 	union bpf_pifo_item *item;
 	unsigned long bucket_idx;
@@ -392,6 +430,17 @@ static union bpf_pifo_item *__pifo_map_dequeue(struct bpf_pifo_map *pifo,
 		return NULL;
 	}
 
+	if (!pifo->num_queued) {
+		*rank = -ENOENT;
+		return NULL;
+	}
+
+	if (unlikely(pifo_queue_is_empty(queue))) {
+		swap(pifo->q_primary, pifo->q_secondary);
+		pifo->q_secondary->min_rank = pifo->q_primary->min_rank + pifo->q_primary->range;
+		queue = pifo->q_primary;
+	}
+
 	bucket_idx = pifo_find_first_bucket(queue);
 	if (bucket_idx == -1) {
 		*rank = -ENOENT;
@@ -437,7 +486,7 @@ struct xdp_frame *pifo_map_dequeue(struct bpf_map *map, u64 flags, u64 *rank)
 static void *pifo_map_lookup_elem(struct bpf_map *map, void *key)
 {
 	struct bpf_pifo_map *pifo = container_of(map, struct bpf_pifo_map, map);
-	struct bpf_pifo_queue *queue = pifo->queue;
+	struct bpf_pifo_queue *queue = pifo->q_primary;
 	struct bpf_pifo_bucket *bucket;
 	u32 rank =  *(u32 *)key, idx;
 
@@ -445,8 +494,13 @@ static void *pifo_map_lookup_elem(struct bpf_map *map, void *key)
 		return NULL;
 
 	idx = rank - queue->min_rank;
-	if (idx >= queue->range)
-		return NULL;
+	if (idx >= queue->range) {
+		idx -= queue->range;
+		queue = pifo->q_secondary;
+
+		if (idx >= queue->range)
+			return NULL;
+	}
 
 	bucket = &queue->buckets[idx];
 	/* FIXME: what happens if this changes while userspace is reading the
-- 
2.37.0




[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux