
[net-next,v2,3/8] bpf: cpumap: switch to GRO from netif_receive_skb_list()

Message ID 20250107152940.26530-4-aleksander.lobakin@intel.com (mailing list archive)
State New
Delegated to: Netdev Maintainers
Series bpf: cpumap: enable GRO for XDP_PASS frames

Checks

Context Check Description
netdev/series_format success Posting correctly formatted
netdev/tree_selection success Clearly marked for net-next, async
netdev/ynl success Generated files up to date; no warnings/errors; no diff in generated;
netdev/fixes_present success Fixes tag not required for -next series
netdev/header_inline success No static functions without inline keyword in header files
netdev/build_32bit success Errors and warnings before: 1 this patch: 1
netdev/build_tools success No tools touched, skip
netdev/cc_maintainers warning 7 maintainers not CCed: yonghong.song@linux.dev jolsa@kernel.org eddyz87@gmail.com song@kernel.org sdf@fomichev.me kpsingh@kernel.org haoluo@google.com
netdev/build_clang success Errors and warnings before: 3 this patch: 3
netdev/verify_signedoff success Signed-off-by tag matches author and committer
netdev/deprecated_api success None detected
netdev/check_selftest success No net selftest shell script
netdev/verify_fixes success No Fixes tag
netdev/build_allmodconfig_warn success Errors and warnings before: 1 this patch: 1
netdev/checkpatch fail ERROR: do not use assignment in if condition
netdev/build_clang_rust success No Rust files in patch. Skipping build
netdev/kdoc success Errors and warnings before: 0 this patch: 0
netdev/source_inline success Was 0 now: 0
netdev/contest success net-next-2025-01-07--21-00 (tests: 881)

Commit Message

Alexander Lobakin Jan. 7, 2025, 3:29 p.m. UTC
cpumap has its own BH context based on a kthread. It has a sane batch
size of 8 frames per cycle.

GRO can be used here on its own. Adjust cpumap calls to the upper stack
to use the GRO API instead of netif_receive_skb_list(), which processes
skbs in batches but doesn't involve the GRO layer at all.

In plenty of tests, GRO performs better than listified receiving even
though it has to calculate full frame checksums on the CPU.

As GRO passes the skbs to the upper stack in batches of
@gro_normal_batch, i.e. 8 by default, and skb->dev points to the
device the frame came from, it is enough to disable the GRO netdev
feature on that device to completely restore the original behaviour:
untouched frames will be bulked and passed to the upper stack in
batches of 8, as they were with netif_receive_skb_list().
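
A rough sketch of why that is enough (illustration only, not the
in-tree code; the gro_node field names assume the conversion done
earlier in this series): when NETIF_F_GRO is cleared on skb->dev, an
skb handed to gro_receive_skb() is not coalesced at all, it is only
batched and passed to the upper stack every @gro_normal_batch skbs,
i.e. the pre-patch behaviour:

	/* illustrative sketch, assumes gro_node::{rx_list,rx_count} */
	static void gro_elided_sketch(struct gro_node *gro,
				      struct sk_buff *skb)
	{
		/* no coalescing attempt, only batching for the stack */
		list_add_tail(&skb->list, &gro->rx_list);
		gro->rx_count++;

		/* pass the batch up every 8 skbs (gro_normal_batch) */
		if (gro->rx_count >= 8)
			gro_normal_list(gro);
	}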

Signed-off-by: Alexander Lobakin <aleksander.lobakin@intel.com>
Tested-by: Daniel Xu <dxu@dxuuu.xyz>
---
 kernel/bpf/cpumap.c | 45 ++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 42 insertions(+), 3 deletions(-)

Patch

diff --git a/kernel/bpf/cpumap.c b/kernel/bpf/cpumap.c
index 774accbd4a22..10d062dddb6f 100644
--- a/kernel/bpf/cpumap.c
+++ b/kernel/bpf/cpumap.c
@@ -33,8 +33,8 @@ 
 #include <trace/events/xdp.h>
 #include <linux/btf_ids.h>
 
-#include <linux/netdevice.h>   /* netif_receive_skb_list */
-#include <linux/etherdevice.h> /* eth_type_trans */
+#include <linux/netdevice.h>
+#include <net/gro.h>
 
 /* General idea: XDP packets getting XDP redirected to another CPU,
  * will maximum be stored/queued for one driver ->poll() call.  It is
@@ -68,6 +68,7 @@  struct bpf_cpu_map_entry {
 
 	struct bpf_cpumap_val value;
 	struct bpf_prog *prog;
+	struct gro_node gro;
 
 	struct completion kthread_running;
 	struct rcu_work free_work;
@@ -261,10 +262,36 @@  static int cpu_map_bpf_prog_run(struct bpf_cpu_map_entry *rcpu, void **frames,
 	return nframes;
 }
 
+static void cpu_map_gro_receive(struct bpf_cpu_map_entry *rcpu,
+				struct list_head *list)
+{
+	struct sk_buff *skb, *tmp;
+
+	list_for_each_entry_safe(skb, tmp, list, list) {
+		skb_list_del_init(skb);
+		gro_receive_skb(&rcpu->gro, skb);
+	}
+}
+
+static void cpu_map_gro_flush(struct bpf_cpu_map_entry *rcpu, bool empty)
+{
+	/*
+	 * If the ring is not empty, there'll be a new iteration soon, and we
+	 * only need to do a full flush if a tick is long (> 1 ms).
+	 * If the ring is empty, to not hold GRO packets in the stack for too
+	 * long, do a full flush.
+	 * This is equivalent to how NAPI decides whether to perform a full
+	 * flush.
+	 */
+	gro_flush(&rcpu->gro, !empty && HZ >= 1000);
+	gro_normal_list(&rcpu->gro);
+}
+
 static int cpu_map_kthread_run(void *data)
 {
 	struct bpf_cpu_map_entry *rcpu = data;
 	unsigned long last_qs = jiffies;
+	u32 packets = 0;
 
 	complete(&rcpu->kthread_running);
 	set_current_state(TASK_INTERRUPTIBLE);
@@ -282,6 +309,7 @@  static int cpu_map_kthread_run(void *data)
 		void *frames[CPUMAP_BATCH];
 		void *skbs[CPUMAP_BATCH];
 		LIST_HEAD(list);
+		bool empty;
 
 		/* Release CPU reschedule checks */
 		if (__ptr_ring_empty(rcpu->queue)) {
@@ -361,7 +389,15 @@  static int cpu_map_kthread_run(void *data)
 		trace_xdp_cpumap_kthread(rcpu->map_id, n, kmem_alloc_drops,
 					 sched, &stats);
 
-		netif_receive_skb_list(&list);
+		cpu_map_gro_receive(rcpu, &list);
+
+		/* Flush either every 64 packets or in case of empty ring */
+		empty = __ptr_ring_empty(rcpu->queue);
+		if (packets += n >= NAPI_POLL_WEIGHT || empty) {
+			cpu_map_gro_flush(rcpu, empty);
+			packets = 0;
+		}
+
 		local_bh_enable(); /* resched point, may call do_softirq() */
 	}
 	__set_current_state(TASK_RUNNING);
@@ -430,6 +466,7 @@  __cpu_map_entry_alloc(struct bpf_map *map, struct bpf_cpumap_val *value,
 	rcpu->cpu    = cpu;
 	rcpu->map_id = map->id;
 	rcpu->value.qsize  = value->qsize;
+	gro_init(&rcpu->gro);
 
 	if (fd > 0 && __cpu_map_load_bpf_program(rcpu, map, fd))
 		goto free_ptr_ring;
@@ -458,6 +495,7 @@  __cpu_map_entry_alloc(struct bpf_map *map, struct bpf_cpumap_val *value,
 	if (rcpu->prog)
 		bpf_prog_put(rcpu->prog);
 free_ptr_ring:
+	gro_cleanup(&rcpu->gro);
 	ptr_ring_cleanup(rcpu->queue, NULL);
 free_queue:
 	kfree(rcpu->queue);
@@ -487,6 +525,7 @@  static void __cpu_map_entry_free(struct work_struct *work)
 
 	if (rcpu->prog)
 		bpf_prog_put(rcpu->prog);
+	gro_cleanup(&rcpu->gro);
 	/* The queue should be empty at this point */
 	__cpu_map_ring_cleanup(rcpu->queue);
 	ptr_ring_cleanup(rcpu->queue, NULL);
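
For reference, the flush policy the kthread loop above implements can
be written out as a standalone helper with the accumulation separated
from the test (the helper name is invented for illustration and is not
part of the patch):

	static u32 cpu_map_flush_sketch(struct bpf_cpu_map_entry *rcpu,
					u32 packets, u32 n)
	{
		bool empty = __ptr_ring_empty(rcpu->queue);

		packets += n;

		/* flush every 64 packets or as soon as the ring is empty */
		if (packets >= NAPI_POLL_WEIGHT || empty) {
			cpu_map_gro_flush(rcpu, empty);
			packets = 0;
		}

		return packets;
	}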