[RFC,05/17] pifomap: Add queue rotation for continuously increasing rank mode

Message ID 20220713111430.134810-6-toke@redhat.com (mailing list archive)
State RFC
Delegated to: BPF
Series xdp: Add packet queueing and scheduling capabilities

Checks

Context Check Description
bpf/vmtest-bpf-next-PR pending PR summary
bpf/vmtest-bpf-next-VM_Test-2 pending Logs for Kernel LATEST on ubuntu-latest with llvm-15
bpf/vmtest-bpf-next-VM_Test-3 pending Logs for Kernel LATEST on z15 with gcc
bpf/vmtest-bpf-next-VM_Test-1 fail Logs for Kernel LATEST on ubuntu-latest with gcc
netdev/tree_selection success Guessed tree name to be net-next, async
netdev/fixes_present success Fixes tag not required for -next series
netdev/subject_prefix success Link
netdev/cover_letter success Series has a cover letter
netdev/patch_count fail Series longer than 15 patches (and no cover letter)
netdev/header_inline success No static functions without inline keyword in header files
netdev/build_32bit success Errors and warnings before: 2 this patch: 2
netdev/cc_maintainers success CCed 12 of 12 maintainers
netdev/build_clang success Errors and warnings before: 6 this patch: 6
netdev/module_param success Was 0 now: 0
netdev/verify_signedoff success Signed-off-by tag matches author and committer
netdev/check_selftest success No net selftest shell script
netdev/verify_fixes success No Fixes tag
netdev/build_allmodconfig_warn success Errors and warnings before: 2 this patch: 2
netdev/checkpatch warning WARNING: line length of 86 exceeds 80 columns; WARNING: line length of 97 exceeds 80 columns
netdev/kdoc success Errors and warnings before: 0 this patch: 0
netdev/source_inline success Was 0 now: 0

Commit Message

Toke Høiland-Jørgensen July 13, 2022, 11:14 a.m. UTC
Amend the PIFO map so it can operate in a mode where the supported rank
range increases continuously. This works by allocating two underlying
queues and queueing entries into the secondary queue when they overflow
the range of the primary one. When the primary queue runs empty and the
secondary queue holds entries, the two queues are swapped, and the
operating range of the new secondary queue is shifted to start just
after the range of the (new) primary queue. This way the map can support
a continuously increasing rank, for instance when indexing by
timestamps.

Signed-off-by: Toke Høiland-Jørgensen <toke@redhat.com>
---
 kernel/bpf/pifomap.c | 96 ++++++++++++++++++++++++++++++++++----------
 1 file changed, 75 insertions(+), 21 deletions(-)
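
To make the rotation concrete, here is a minimal userspace sketch of the
swap-and-shift step that the patch performs in __pifo_map_dequeue(). The
struct toy_queue type, the rotate_if_empty() helper and all values below
are invented for illustration; they are not part of the patch or of the
map's real API.

	/* Minimal userspace model of the two-queue rotation; illustrative only */
	#include <stdio.h>

	struct toy_queue {
		unsigned int min_rank;	/* lowest rank this queue covers */
		unsigned int range;	/* number of buckets */
		unsigned int entries;	/* items currently queued */
	};

	/* Model of the swap on dequeue: when the primary runs empty while
	 * the secondary holds entries, swap the two and move the new
	 * secondary's range to start right after the new primary's.
	 */
	static void rotate_if_empty(struct toy_queue **primary,
				    struct toy_queue **secondary)
	{
		struct toy_queue *tmp;

		if ((*primary)->entries || !(*secondary)->entries)
			return;

		tmp = *primary;
		*primary = *secondary;
		*secondary = tmp;
		(*secondary)->min_rank = (*primary)->min_rank +
					 (*primary)->range;
	}

	int main(void)
	{
		struct toy_queue a = { .min_rank = 0, .range = 1024 };
		struct toy_queue b = { .min_rank = 1024, .range = 1024,
				       .entries = 3 };
		struct toy_queue *primary = &a, *secondary = &b;

		rotate_if_empty(&primary, &secondary);

		/* Prints: primary [1024, 2048), secondary [2048, 3072) */
		printf("primary [%u, %u), secondary [%u, %u)\n",
		       primary->min_rank,
		       primary->min_rank + primary->range,
		       secondary->min_rank,
		       secondary->min_rank + secondary->range);
		return 0;
	}

After the rotation, entries that previously overflowed into the
secondary queue are dequeued first, and ranks beyond the rotated window
once again land in the (new) secondary queue.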

Patch

diff --git a/kernel/bpf/pifomap.c b/kernel/bpf/pifomap.c
index 5040f532e5d8..62633c2c7419 100644
--- a/kernel/bpf/pifomap.c
+++ b/kernel/bpf/pifomap.c
@@ -6,6 +6,7 @@ 
 #include <linux/bpf.h>
 #include <linux/bitops.h>
 #include <linux/btf_ids.h>
+#include <linux/minmax.h>
 #include <net/xdp.h>
 #include <linux/filter.h>
 #include <trace/events/xdp.h>
@@ -44,7 +45,8 @@  struct bpf_pifo_queue {
 
 struct bpf_pifo_map {
 	struct bpf_map map;
-	struct bpf_pifo_queue *queue;
+	struct bpf_pifo_queue *q_primary;
+	struct bpf_pifo_queue *q_secondary;
 	unsigned long num_queued;
 	spinlock_t lock; /* protects enqueue / dequeue */
 
@@ -71,6 +73,12 @@  static bool pifo_map_is_full(struct bpf_pifo_map *pifo)
 	return pifo->num_queued >= pifo->map.max_entries;
 }
 
+static bool pifo_queue_is_empty(struct bpf_pifo_queue *queue)
+{
+	/* the first word in the bitmap is the top level of the hierarchy,
+	 * so if it is zero the whole queue is empty
+	 */
+	return !queue->bitmap[0];
+}
+
 static void pifo_queue_free(struct bpf_pifo_queue *q)
 {
 	bpf_map_area_free(q->buckets);
@@ -79,7 +87,7 @@  static void pifo_queue_free(struct bpf_pifo_queue *q)
 	kfree(q);
 }
 
-static struct bpf_pifo_queue *pifo_queue_alloc(u32 range, int numa_node)
+static struct bpf_pifo_queue *pifo_queue_alloc(u32 range, u32 min_rank, int numa_node)
 {
 	u32 num_longs = 0, offset = 0, i, lvl, levels;
 	struct bpf_pifo_queue *q;
@@ -112,6 +120,7 @@  static struct bpf_pifo_queue *pifo_queue_alloc(u32 range, int numa_node)
 	}
 	q->levels = levels;
 	q->range = range;
+	q->min_rank = min_rank;
 	return q;
 
 err:
@@ -131,10 +140,14 @@  static int pifo_map_init_map(struct bpf_pifo_map *pifo, union bpf_attr *attr,
 
 	bpf_map_init_from_attr(&pifo->map, attr);
 
-	pifo->queue = pifo_queue_alloc(range, pifo->map.numa_node);
-	if (!pifo->queue)
+	pifo->q_primary = pifo_queue_alloc(range, 0, pifo->map.numa_node);
+	if (!pifo->q_primary)
 		return -ENOMEM;
 
+	pifo->q_secondary = pifo_queue_alloc(range, range, pifo->map.numa_node);
+	if (!pifo->q_secondary)
+		goto err_queue;
+
 	if (attr->map_type == BPF_MAP_TYPE_PIFO_GENERIC) {
 		size_t cache_size;
 		int i;
@@ -144,7 +157,7 @@  static int pifo_map_init_map(struct bpf_pifo_map *pifo, union bpf_attr *attr,
 		pifo->elem_cache = bpf_map_area_alloc(cache_size,
 						      pifo->map.numa_node);
 		if (!pifo->elem_cache)
-			goto err_queue;
+			goto err;
 
 		for (i = 0; i < attr->max_entries; i++)
 			pifo->elem_cache->elements[i] = (void *)&pifo->elements[i * elem_size];
@@ -153,8 +166,10 @@  static int pifo_map_init_map(struct bpf_pifo_map *pifo, union bpf_attr *attr,
 
 	return 0;
 
+err:
+	pifo_queue_free(pifo->q_secondary);
 err_queue:
-	pifo_queue_free(pifo->queue);
+	pifo_queue_free(pifo->q_primary);
 	return err;
 }
 
@@ -238,9 +253,12 @@  static void pifo_map_free(struct bpf_map *map)
 
 	synchronize_rcu();
 
-	if (map->map_type == BPF_MAP_TYPE_PIFO_XDP)
-		pifo_queue_flush(pifo->queue);
-	pifo_queue_free(pifo->queue);
+	if (map->map_type == BPF_MAP_TYPE_PIFO_XDP) {
+		pifo_queue_flush(pifo->q_primary);
+		pifo_queue_flush(pifo->q_secondary);
+	}
+	pifo_queue_free(pifo->q_primary);
+	pifo_queue_free(pifo->q_secondary);
 	bpf_map_area_free(pifo->elem_cache);
 	bpf_map_area_free(pifo);
 }
@@ -249,7 +267,7 @@  static int pifo_map_get_next_key(struct bpf_map *map, void *key, void *next_key)
 {
 	struct bpf_pifo_map *pifo = container_of(map, struct bpf_pifo_map, map);
 	u32 index = key ? *(u32 *)key : U32_MAX, offset;
-	struct bpf_pifo_queue *queue = pifo->queue;
+	struct bpf_pifo_queue *queue = pifo->q_primary;
 	unsigned long idx, flags;
 	u32 *next = next_key;
 	int ret = -ENOENT;
@@ -261,15 +279,27 @@  static int pifo_map_get_next_key(struct bpf_map *map, void *key, void *next_key)
 	else
 		offset = index - queue->min_rank + 1;
 
-	if (offset >= queue->range)
-		goto out;
+	if (offset >= queue->range) {
+		offset -= queue->range;
+		queue = pifo->q_secondary;
+
+		if (offset >= queue->range)
+			goto out;
+	}
 
+search:
 	idx = find_next_bit(queue->lvl_bitmap[queue->levels - 1],
 			    queue->range, offset);
-	if (idx == queue->range)
+	if (idx == queue->range) {
+		if (queue == pifo->q_primary) {
+			queue = pifo->q_secondary;
+			offset = 0;
+			goto search;
+		}
 		goto out;
+	}
 
-	*next = idx;
+	*next = idx + queue->min_rank;
 	ret = 0;
 out:
 	spin_unlock_irqrestore(&pifo->lock, flags);
@@ -316,7 +346,7 @@  static void pifo_item_set_next(union bpf_pifo_item *item, void *next, bool xdp)
 static int __pifo_map_enqueue(struct bpf_pifo_map *pifo, union bpf_pifo_item *item,
 			      u64 rank, bool xdp)
 {
-	struct bpf_pifo_queue *queue = pifo->queue;
+	struct bpf_pifo_queue *queue = pifo->q_primary;
 	struct bpf_pifo_bucket *bucket;
 	u64 q_index;
 
@@ -331,8 +361,16 @@  static int __pifo_map_enqueue(struct bpf_pifo_map *pifo, union bpf_pifo_item *it
 	pifo_item_set_next(item, NULL, xdp);
 
 	q_index = rank - queue->min_rank;
-	if (unlikely(q_index >= queue->range))
-		q_index = queue->range - 1;
+	if (unlikely(q_index >= queue->range)) {
+		/* If we overflow the primary queue, enqueue into the secondary,
+		 * and if we overflow that too, enqueue as the last item
+		 */
+		q_index -= queue->range;
+		queue = pifo->q_secondary;
+
+		if (q_index >= queue->range)
+			q_index = queue->range - 1;
+	}
 
 	bucket = &queue->buckets[q_index];
 	if (likely(!bucket->head)) {
@@ -380,7 +418,7 @@  static unsigned long pifo_find_first_bucket(struct bpf_pifo_queue *queue)
 static union bpf_pifo_item *__pifo_map_dequeue(struct bpf_pifo_map *pifo,
 					       u64 flags, u64 *rank, bool xdp)
 {
-	struct bpf_pifo_queue *queue = pifo->queue;
+	struct bpf_pifo_queue *queue = pifo->q_primary;
 	struct bpf_pifo_bucket *bucket;
 	union bpf_pifo_item *item;
 	unsigned long bucket_idx;
@@ -392,6 +430,17 @@  static union bpf_pifo_item *__pifo_map_dequeue(struct bpf_pifo_map *pifo,
 		return NULL;
 	}
 
+	if (!pifo->num_queued) {
+		*rank = -ENOENT;
+		return NULL;
+	}
+
+	if (unlikely(pifo_queue_is_empty(queue))) {
+		swap(pifo->q_primary, pifo->q_secondary);
+		pifo->q_secondary->min_rank = pifo->q_primary->min_rank + pifo->q_primary->range;
+		queue = pifo->q_primary;
+	}
+
 	bucket_idx = pifo_find_first_bucket(queue);
 	if (bucket_idx == -1) {
 		*rank = -ENOENT;
@@ -437,7 +486,7 @@  struct xdp_frame *pifo_map_dequeue(struct bpf_map *map, u64 flags, u64 *rank)
 static void *pifo_map_lookup_elem(struct bpf_map *map, void *key)
 {
 	struct bpf_pifo_map *pifo = container_of(map, struct bpf_pifo_map, map);
-	struct bpf_pifo_queue *queue = pifo->queue;
+	struct bpf_pifo_queue *queue = pifo->q_primary;
 	struct bpf_pifo_bucket *bucket;
 	u32 rank =  *(u32 *)key, idx;
 
@@ -445,8 +494,13 @@  static void *pifo_map_lookup_elem(struct bpf_map *map, void *key)
 		return NULL;
 
 	idx = rank - queue->min_rank;
-	if (idx >= queue->range)
-		return NULL;
+	if (idx >= queue->range) {
+		idx -= queue->range;
+		queue = pifo->q_secondary;
+
+		if (idx >= queue->range)
+			return NULL;
+	}
 
 	bucket = &queue->buckets[idx];
 	/* FIXME: what happens if this changes while userspace is reading the