diff mbox series

[RFC,bpf-next,v5,1/2] bpf: Only reuse after one RCU GP in bpf memory allocator

Message ID 20230619143231.222536-2-houtao@huaweicloud.com (mailing list archive)
State RFC
Delegated to: BPF
Headers show
Series Handle immediate reuse in bpf memory allocator | expand

Checks

Context Check Description
bpf/vmtest-bpf-next-PR success PR summary
bpf/vmtest-bpf-next-VM_Test-1 success Logs for ShellCheck
bpf/vmtest-bpf-next-VM_Test-2 success Logs for build for aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-4 success Logs for build for x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-5 success Logs for build for x86_64 with llvm-16
bpf/vmtest-bpf-next-VM_Test-6 success Logs for set-matrix
bpf/vmtest-bpf-next-VM_Test-3 success Logs for build for s390x with gcc
bpf/vmtest-bpf-next-VM_Test-27 success Logs for test_verifier on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-7 success Logs for test_maps on aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-9 success Logs for test_maps on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-10 success Logs for test_maps on x86_64 with llvm-16
bpf/vmtest-bpf-next-VM_Test-11 success Logs for test_progs on aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-13 success Logs for test_progs on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-14 success Logs for test_progs on x86_64 with llvm-16
bpf/vmtest-bpf-next-VM_Test-15 success Logs for test_progs_no_alu32 on aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-17 success Logs for test_progs_no_alu32 on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-18 success Logs for test_progs_no_alu32 on x86_64 with llvm-16
bpf/vmtest-bpf-next-VM_Test-19 success Logs for test_progs_no_alu32_parallel on aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-20 success Logs for test_progs_no_alu32_parallel on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-21 success Logs for test_progs_no_alu32_parallel on x86_64 with llvm-16
bpf/vmtest-bpf-next-VM_Test-22 success Logs for test_progs_parallel on aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-23 success Logs for test_progs_parallel on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-24 success Logs for test_progs_parallel on x86_64 with llvm-16
bpf/vmtest-bpf-next-VM_Test-25 success Logs for test_verifier on aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-26 success Logs for test_verifier on s390x with gcc
bpf/vmtest-bpf-next-VM_Test-28 success Logs for test_verifier on x86_64 with llvm-16
bpf/vmtest-bpf-next-VM_Test-29 success Logs for veristat
netdev/series_format success Posting correctly formatted
netdev/tree_selection success Clearly marked for bpf-next
netdev/fixes_present success Fixes tag not required for -next series
netdev/header_inline success No static functions without inline keyword in header files
netdev/build_32bit success Errors and warnings before: 13 this patch: 13
netdev/cc_maintainers success CCed 12 of 12 maintainers
netdev/build_clang success Errors and warnings before: 8 this patch: 8
netdev/verify_signedoff success Signed-off-by tag matches author and committer
netdev/deprecated_api success None detected
netdev/check_selftest success No net selftest shell script
netdev/verify_fixes success No Fixes tag
netdev/build_allmodconfig_warn success Errors and warnings before: 13 this patch: 13
netdev/checkpatch warning WARNING: line length of 81 exceeds 80 columns WARNING: line length of 82 exceeds 80 columns WARNING: line length of 83 exceeds 80 columns WARNING: line length of 84 exceeds 80 columns WARNING: line length of 86 exceeds 80 columns WARNING: line length of 87 exceeds 80 columns WARNING: line length of 89 exceeds 80 columns WARNING: line length of 90 exceeds 80 columns WARNING: line length of 95 exceeds 80 columns
netdev/kdoc success Errors and warnings before: 0 this patch: 0
netdev/source_inline success Was 0 now: 0
bpf/vmtest-bpf-next-VM_Test-12 success Logs for test_progs on s390x with gcc
bpf/vmtest-bpf-next-VM_Test-16 success Logs for test_progs_no_alu32 on s390x with gcc
bpf/vmtest-bpf-next-VM_Test-8 success Logs for test_maps on s390x with gcc

Commit Message

Hou Tao June 19, 2023, 2:32 p.m. UTC
From: Hou Tao <houtao1@huawei.com>

Currently the freed objects in bpf memory allocator may be reused
immediately by new allocation, it introduces use-after-bpf-ma-free
problem for non-preallocated hash map and makes lookup procedure
return incorrect result. The immediate reuse also makes introducing
new use case more difficult (e.g. qp-trie).

So implement reuse-after-RCU-GP to solve these problems. For
reuse-after-RCU-GP, these freed objects are reused only after one RCU
grace period and may be returned back to slab system after another
RCU-tasks-trace grace period. So for bpf programs which care about reuse
problem, these programs can use bpf_rcu_read_{lock,unlock}() to access
these freed objects safely and for those which doesn't care, there will
be safely use-after-bpf-ma-free because these objects have not been
freed by bpf memory allocator.

To make these freed elements being reusab quickly, allocate memory
dynamically to create inflight RCU callbacks to mark these freed objects
being reusab. These allocated memories will be freed when reuse_rcu
callbacks completes. When no memory is available, it will use per-cpu
rcu_head in bpf_mem_cache to mark these objects as being reusable. Part
of these reusable objects will be freed through RCU-tasks-trace grace
period to reduce the risk of OOM and these freeing objects will be also
available for reuse. To reduce the lock contention and keep numa-aware
attribute as much as possbile, per-cpu list is used for reusable objects
and freeing objects.

As shown in the following benchmark results, the memory usage increases
a lot and the performance of overwrite and batch_op case is also
degraded after applying the patch. The benchmark is conducted on a
KVM-VM with 8-CPUs and 16GB memory.

The command line for htab-mem-benchmark is:

  ./bench htab-mem --use-case $name -w 3 -d 10 -a -p 8

And the command line for map_perf_test benchmark is:

  ./map_perf_test 4 8 16384

htab-mem-benchmark (before):
overwrite           per-prod-op 115.90 ± 0.04k/s, avg mem  2.14 ± 0.12MiB, peak mem  2.73MiB
batch_add_batch_del per-prod-op 113.40 ± 0.35k/s, avg mem  2.32 ± 0.20MiB, peak mem  2.75MiB
add_del_on_diff_cpu per-prod-op  18.43 ± 0.08k/s, avg mem 18.17 ± 3.84MiB, peak mem 31.20MiB

map_perf_test (before):
0:hash_map_perf kmalloc 302508 events per sec
1:hash_map_perf kmalloc 161805 events per sec
2:hash_map_perf kmalloc 161138 events per sec
3:hash_map_perf kmalloc 161138 events per sec
4:hash_map_perf kmalloc 158535 events per sec
5:hash_map_perf kmalloc 161456 events per sec
6:hash_map_perf kmalloc 160277 events per sec
7:hash_map_perf kmalloc 159522 events per sec

htab-mem-benchmark (after):
overwrite           per-prod-op 49.11 ± 1.30k/s, avg mem 313.09 ± 80.36MiB, peak mem 509.09MiB
batch_add_batch_del per-prod-op 76.06 ± 2.38k/s, avg mem 287.97 ± 63.59MiB, peak mem 496.81MiB
add_del_on_diff_cpu per-prod-op 18.75 ± 0.09k/s, avg mem  27.71 ±  4.92MiB, peak mem  44.54MiB

map_perf_test (after)
2:hash_map_perf kmalloc 167714 events per sec
0:hash_map_perf kmalloc 118467 events per sec
4:hash_map_perf kmalloc 116437 events per sec
1:hash_map_perf kmalloc 115299 events per sec
5:hash_map_perf kmalloc 116043 events per sec
7:hash_map_perf kmalloc 115735 events per sec
6:hash_map_perf kmalloc 115219 events per sec
3:hash_map_perf kmalloc 113883 events per sec

Signed-off-by: Hou Tao <houtao1@huawei.com>
---
 kernel/bpf/memalloc.c | 333 ++++++++++++++++++++++++++----------------
 1 file changed, 208 insertions(+), 125 deletions(-)
diff mbox series

Patch

diff --git a/kernel/bpf/memalloc.c b/kernel/bpf/memalloc.c
index 0668bcd7c926..9b31c53fd285 100644
--- a/kernel/bpf/memalloc.c
+++ b/kernel/bpf/memalloc.c
@@ -96,19 +96,33 @@  struct bpf_mem_cache {
 	int unit_size;
 	/* count of objects in free_llist */
 	int free_cnt;
-	int low_watermark, high_watermark, batch;
+	int prepare_reuse_cnt;
+	int watermark, batch;
 	int percpu_size;
+	bool direct_free;
+	raw_spinlock_t lock;
 
-	struct rcu_head rcu;
+	struct rcu_head reuse_rh;
+	struct rcu_head free_rh;
 	struct llist_head free_by_rcu;
 	struct llist_head waiting_for_gp;
-	atomic_t call_rcu_in_progress;
+	struct llist_head reuse_ready;
+	struct llist_head wait_for_free;
+	atomic_t reuse_rcu_in_progress;
+	atomic_t free_rcu_in_progress;
+	atomic_t dyn_reuse_rcu_cnt;
 };
 
 struct bpf_mem_caches {
 	struct bpf_mem_cache cache[NUM_CACHES];
 };
 
+struct bpf_reuse_batch {
+	struct bpf_mem_cache *c;
+	struct llist_node *head;
+	struct rcu_head rcu;
+};
+
 static struct llist_node notrace *__llist_del_first(struct llist_head *head)
 {
 	struct llist_node *entry, *next;
@@ -153,6 +167,46 @@  static struct mem_cgroup *get_memcg(const struct bpf_mem_cache *c)
 #endif
 }
 
+static int bpf_ma_get_reusable_obj(struct bpf_mem_cache *c, int cnt)
+{
+	struct llist_node *head = NULL, *tail = NULL, *obj;
+	unsigned long flags;
+	int alloc = 0;
+
+	if (llist_empty(&c->reuse_ready) && llist_empty(&c->wait_for_free))
+		return 0;
+
+	raw_spin_lock_irqsave(&c->lock, flags);
+	while (alloc < cnt) {
+		obj = __llist_del_first(&c->reuse_ready);
+		if (!obj) {
+			obj = __llist_del_first(&c->wait_for_free);
+			if (!obj)
+				break;
+		}
+		if (!tail)
+			tail = obj;
+		obj->next = head;
+		head = obj;
+		alloc++;
+	}
+	raw_spin_unlock_irqrestore(&c->lock, flags);
+
+	if (!alloc)
+		goto out;
+
+	if (IS_ENABLED(CONFIG_PREEMPT_RT))
+		local_irq_save(flags);
+	WARN_ON_ONCE(local_inc_return(&c->active) != 1);
+	__llist_add_batch(head, tail, &c->free_llist);
+	c->free_cnt += alloc;
+	local_dec(&c->active);
+	if (IS_ENABLED(CONFIG_PREEMPT_RT))
+		local_irq_restore(flags);
+out:
+	return alloc;
+}
+
 /* Mostly runs from irq_work except __init phase. */
 static void alloc_bulk(struct bpf_mem_cache *c, int cnt, int node)
 {
@@ -161,32 +215,21 @@  static void alloc_bulk(struct bpf_mem_cache *c, int cnt, int node)
 	void *obj;
 	int i;
 
+	i = bpf_ma_get_reusable_obj(c, cnt);
+	if (i >= cnt)
+		return;
+
 	memcg = get_memcg(c);
 	old_memcg = set_active_memcg(memcg);
-	for (i = 0; i < cnt; i++) {
-		/*
-		 * free_by_rcu is only manipulated by irq work refill_work().
-		 * IRQ works on the same CPU are called sequentially, so it is
-		 * safe to use __llist_del_first() here. If alloc_bulk() is
-		 * invoked by the initial prefill, there will be no running
-		 * refill_work(), so __llist_del_first() is fine as well.
-		 *
-		 * In most cases, objects on free_by_rcu are from the same CPU.
-		 * If some objects come from other CPUs, it doesn't incur any
-		 * harm because NUMA_NO_NODE means the preference for current
-		 * numa node and it is not a guarantee.
+	for (; i < cnt; i++) {
+		/* Allocate, but don't deplete atomic reserves that typical
+		 * GFP_ATOMIC would do. irq_work runs on this cpu and kmalloc
+		 * will allocate from the current numa node which is what we
+		 * want here.
 		 */
-		obj = __llist_del_first(&c->free_by_rcu);
-		if (!obj) {
-			/* Allocate, but don't deplete atomic reserves that typical
-			 * GFP_ATOMIC would do. irq_work runs on this cpu and kmalloc
-			 * will allocate from the current numa node which is what we
-			 * want here.
-			 */
-			obj = __alloc(c, node, GFP_NOWAIT | __GFP_NOWARN | __GFP_ACCOUNT);
-			if (!obj)
-				break;
-		}
+		obj = __alloc(c, node, GFP_NOWAIT | __GFP_NOWARN | __GFP_ACCOUNT);
+		if (!obj)
+			break;
 		if (IS_ENABLED(CONFIG_PREEMPT_RT))
 			/* In RT irq_work runs in per-cpu kthread, so disable
 			 * interrupts to avoid preemption and interrupts and
@@ -230,100 +273,131 @@  static void free_all(struct llist_node *llnode, bool percpu)
 		free_one(pos, percpu);
 }
 
-static void __free_rcu(struct rcu_head *head)
+static void free_rcu(struct rcu_head *rcu)
 {
-	struct bpf_mem_cache *c = container_of(head, struct bpf_mem_cache, rcu);
+	struct bpf_mem_cache *c = container_of(rcu, struct bpf_mem_cache, free_rh);
+	struct llist_node *head;
+	unsigned long flags;
 
-	free_all(llist_del_all(&c->waiting_for_gp), !!c->percpu_size);
-	atomic_set(&c->call_rcu_in_progress, 0);
-}
+	/* Draining or alloc_bulk() may be in progress */
+	raw_spin_lock_irqsave(&c->lock, flags);
+	head = __llist_del_all(&c->wait_for_free);
+	raw_spin_unlock_irqrestore(&c->lock, flags);
 
-static void __free_rcu_tasks_trace(struct rcu_head *head)
-{
-	/* If RCU Tasks Trace grace period implies RCU grace period,
-	 * there is no need to invoke call_rcu().
-	 */
-	if (rcu_trace_implies_rcu_gp())
-		__free_rcu(head);
-	else
-		call_rcu(head, __free_rcu);
+	free_all(head, !!c->percpu_size);
+	atomic_set(&c->free_rcu_in_progress, 0);
 }
 
-static void enque_to_free(struct bpf_mem_cache *c, void *obj)
+static void bpf_ma_add_to_reuse_ready_or_free(struct bpf_mem_cache *c, struct llist_node *head)
 {
-	struct llist_node *llnode = obj;
+	bool direct_free = false;
+	struct llist_node *tail;
+	unsigned long flags;
 
-	/* bpf_mem_cache is a per-cpu object. Freeing happens in irq_work.
-	 * Nothing races to add to free_by_rcu list.
+	tail = head;
+	while (tail->next)
+		tail = tail->next;
+
+	raw_spin_lock_irqsave(&c->lock, flags);
+	/* Don't move these objects to reuse_ready list and free
+	 * these objects directly.
 	 */
-	__llist_add(llnode, &c->free_by_rcu);
+	if (c->direct_free) {
+		direct_free = true;
+		goto unlock;
+	}
+
+	__llist_add_batch(head, tail, &c->reuse_ready);
+
+	if (atomic_xchg(&c->free_rcu_in_progress, 1))
+		goto unlock;
+
+	WARN_ON_ONCE(!llist_empty(&c->wait_for_free));
+	c->wait_for_free.first = __llist_del_all(&c->reuse_ready);
+	raw_spin_unlock_irqrestore(&c->lock, flags);
+	call_rcu_tasks_trace(&c->free_rh, free_rcu);
+	return;
+
+unlock:
+	raw_spin_unlock_irqrestore(&c->lock, flags);
+	if (direct_free)
+		free_all(head, !!c->percpu_size);
+	return;
 }
 
-static void do_call_rcu(struct bpf_mem_cache *c)
+static void reuse_rcu(struct rcu_head *rcu)
 {
-	struct llist_node *llnode, *t;
+	struct bpf_mem_cache *c = container_of(rcu, struct bpf_mem_cache, reuse_rh);
+	struct llist_node *head;
+
+	head = llist_del_all(&c->waiting_for_gp);
+	/* Draining is in progress ? */
+	if (head)
+		bpf_ma_add_to_reuse_ready_or_free(c, head);
+	atomic_set(&c->reuse_rcu_in_progress, 0);
+}
 
-	if (atomic_xchg(&c->call_rcu_in_progress, 1))
-		return;
+static void dyn_reuse_rcu(struct rcu_head *rcu)
+{
+	struct bpf_reuse_batch *batch = container_of(rcu, struct bpf_reuse_batch, rcu);
+	struct bpf_mem_cache *c = batch->c;
 
-	WARN_ON_ONCE(!llist_empty(&c->waiting_for_gp));
-	llist_for_each_safe(llnode, t, __llist_del_all(&c->free_by_rcu))
-		/* There is no concurrent __llist_add(waiting_for_gp) access.
-		 * It doesn't race with llist_del_all either.
-		 * But there could be two concurrent llist_del_all(waiting_for_gp):
-		 * from __free_rcu() and from drain_mem_cache().
-		 */
-		__llist_add(llnode, &c->waiting_for_gp);
-	/* Use call_rcu_tasks_trace() to wait for sleepable progs to finish.
-	 * If RCU Tasks Trace grace period implies RCU grace period, free
-	 * these elements directly, else use call_rcu() to wait for normal
-	 * progs to finish and finally do free_one() on each element.
-	 */
-	call_rcu_tasks_trace(&c->rcu, __free_rcu_tasks_trace);
+	bpf_ma_add_to_reuse_ready_or_free(c, batch->head);
+	atomic_dec(&c->dyn_reuse_rcu_cnt);
+	kfree(batch);
 }
 
-static void free_bulk(struct bpf_mem_cache *c)
+static void reuse_bulk(struct bpf_mem_cache *c)
 {
-	struct llist_node *llnode, *t;
+	struct llist_node *head, *tail;
+	struct bpf_reuse_batch *batch;
 	unsigned long flags;
-	int cnt;
 
-	do {
-		if (IS_ENABLED(CONFIG_PREEMPT_RT))
-			local_irq_save(flags);
-		WARN_ON_ONCE(local_inc_return(&c->active) != 1);
-		llnode = __llist_del_first(&c->free_llist);
-		if (llnode)
-			cnt = --c->free_cnt;
-		else
-			cnt = 0;
-		local_dec(&c->active);
-		if (IS_ENABLED(CONFIG_PREEMPT_RT))
-			local_irq_restore(flags);
-		if (llnode)
-			enque_to_free(c, llnode);
-	} while (cnt > (c->high_watermark + c->low_watermark) / 2);
+	head = llist_del_all(&c->free_llist_extra);
+	tail = head;
+	while (tail && tail->next)
+		tail = tail->next;
+
+	if (IS_ENABLED(CONFIG_PREEMPT_RT))
+		local_irq_save(flags);
+	WARN_ON_ONCE(local_inc_return(&c->active) != 1);
+	if (head)
+		__llist_add_batch(head, tail, &c->free_by_rcu);
+	c->prepare_reuse_cnt = 0;
+	local_dec(&c->active);
+	if (IS_ENABLED(CONFIG_PREEMPT_RT))
+		local_irq_restore(flags);
+
+	batch = kmalloc(sizeof(*batch), GFP_NOWAIT | __GFP_NOWARN);
+	if (batch) {
+		batch->c = c;
+		batch->head = __llist_del_all(&c->free_by_rcu);
+		atomic_inc(&c->dyn_reuse_rcu_cnt);
+		call_rcu(&batch->rcu, dyn_reuse_rcu);
+		return;
+	}
+
+	if (atomic_xchg(&c->reuse_rcu_in_progress, 1))
+		return;
 
-	/* and drain free_llist_extra */
-	llist_for_each_safe(llnode, t, llist_del_all(&c->free_llist_extra))
-		enque_to_free(c, llnode);
-	do_call_rcu(c);
+	WARN_ON_ONCE(!llist_empty(&c->waiting_for_gp));
+	c->waiting_for_gp.first = __llist_del_all(&c->free_by_rcu);
+	call_rcu(&c->reuse_rh, reuse_rcu);
 }
 
 static void bpf_mem_refill(struct irq_work *work)
 {
 	struct bpf_mem_cache *c = container_of(work, struct bpf_mem_cache, refill_work);
-	int cnt;
 
 	/* Racy access to free_cnt. It doesn't need to be 100% accurate */
-	cnt = c->free_cnt;
-	if (cnt < c->low_watermark)
+	if (c->free_cnt <= c->watermark)
 		/* irq_work runs on this cpu and kmalloc will allocate
 		 * from the current numa node which is what we want here.
 		 */
 		alloc_bulk(c, c->batch, NUMA_NO_NODE);
-	else if (cnt > c->high_watermark)
-		free_bulk(c);
+
+	if (c->prepare_reuse_cnt >= c->watermark)
+		reuse_bulk(c);
 }
 
 static void notrace irq_work_raise(struct bpf_mem_cache *c)
@@ -335,7 +409,7 @@  static void notrace irq_work_raise(struct bpf_mem_cache *c)
  * the freelist cache will be elem_size * 64 (or less) on each cpu.
  *
  * For bpf programs that don't have statically known allocation sizes and
- * assuming (low_mark + high_mark) / 2 as an average number of elements per
+ * assuming watermark * 2 as an average number of elements per
  * bucket and all buckets are used the total amount of memory in freelists
  * on each cpu will be:
  * 64*16 + 64*32 + 64*64 + 64*96 + 64*128 + 64*196 + 64*256 + 32*512 + 16*1024 + 8*2048 + 4*4096
@@ -349,19 +423,15 @@  static void notrace irq_work_raise(struct bpf_mem_cache *c)
 static void prefill_mem_cache(struct bpf_mem_cache *c, int cpu)
 {
 	init_irq_work(&c->refill_work, bpf_mem_refill);
-	if (c->unit_size <= 256) {
-		c->low_watermark = 32;
-		c->high_watermark = 96;
-	} else {
-		/* When page_size == 4k, order-0 cache will have low_mark == 2
-		 * and high_mark == 6 with batch alloc of 3 individual pages at
-		 * a time.
-		 * 8k allocs and above low == 1, high == 3, batch == 1.
+	if (c->unit_size <= 256)
+		c->watermark = 32;
+	else
+		/* When page_size == 4k, order-0 cache will have mark == 2
+		 * with batch alloc of 2 individual pages at a time.
+		 * 8k allocs and above low == 1, batch == 1.
 		 */
-		c->low_watermark = max(32 * 256 / c->unit_size, 1);
-		c->high_watermark = max(96 * 256 / c->unit_size, 3);
-	}
-	c->batch = max((c->high_watermark - c->low_watermark) / 4 * 3, 1);
+		c->watermark = max(32 * 256 / c->unit_size, 1);
+	c->batch = c->watermark;
 
 	/* To avoid consuming memory assume that 1st run of bpf
 	 * prog won't be doing more than 4 map_update_elem from
@@ -406,6 +476,7 @@  int bpf_mem_alloc_init(struct bpf_mem_alloc *ma, int size, bool percpu)
 			c->unit_size = unit_size;
 			c->objcg = objcg;
 			c->percpu_size = percpu_size;
+			raw_spin_lock_init(&c->lock);
 			prefill_mem_cache(c, cpu);
 		}
 		ma->cache = pc;
@@ -428,6 +499,7 @@  int bpf_mem_alloc_init(struct bpf_mem_alloc *ma, int size, bool percpu)
 			c = &cc->cache[i];
 			c->unit_size = sizes[i];
 			c->objcg = objcg;
+			raw_spin_lock_init(&c->lock);
 			prefill_mem_cache(c, cpu);
 		}
 	}
@@ -438,16 +510,28 @@  int bpf_mem_alloc_init(struct bpf_mem_alloc *ma, int size, bool percpu)
 static void drain_mem_cache(struct bpf_mem_cache *c)
 {
 	bool percpu = !!c->percpu_size;
+	struct llist_node *head[2];
+	unsigned long flags;
 
 	/* No progs are using this bpf_mem_cache, but htab_map_free() called
 	 * bpf_mem_cache_free() for all remaining elements and they can be in
 	 * free_by_rcu or in waiting_for_gp lists, so drain those lists now.
 	 *
-	 * Except for waiting_for_gp list, there are no concurrent operations
-	 * on these lists, so it is safe to use __llist_del_all().
+	 * Except for waiting_for_gp, reuse_ready and wait_for_free list,
+	 * there are no concurrent operations on these lists, so it is safe
+	 * to use __llist_del_all().
 	 */
 	free_all(__llist_del_all(&c->free_by_rcu), percpu);
 	free_all(llist_del_all(&c->waiting_for_gp), percpu);
+
+	raw_spin_lock_irqsave(&c->lock, flags);
+	c->direct_free = true;
+	head[0] = __llist_del_all(&c->reuse_ready);
+	head[1] = __llist_del_all(&c->wait_for_free);
+	raw_spin_unlock_irqrestore(&c->lock, flags);
+	free_all(head[0], percpu);
+	free_all(head[1], percpu);
+
 	free_all(__llist_del_all(&c->free_llist), percpu);
 	free_all(__llist_del_all(&c->free_llist_extra), percpu);
 }
@@ -462,19 +546,13 @@  static void free_mem_alloc_no_barrier(struct bpf_mem_alloc *ma)
 
 static void free_mem_alloc(struct bpf_mem_alloc *ma)
 {
-	/* waiting_for_gp lists was drained, but __free_rcu might
-	 * still execute. Wait for it now before we freeing percpu caches.
-	 *
-	 * rcu_barrier_tasks_trace() doesn't imply synchronize_rcu_tasks_trace(),
-	 * but rcu_barrier_tasks_trace() and rcu_barrier() below are only used
-	 * to wait for the pending __free_rcu_tasks_trace() and __free_rcu(),
-	 * so if call_rcu(head, __free_rcu) is skipped due to
-	 * rcu_trace_implies_rcu_gp(), it will be OK to skip rcu_barrier() by
-	 * using rcu_trace_implies_rcu_gp() as well.
+	/* Use rcu_barrier() to wait for the pending reuse_rcu() and use
+	 * rcu_barrier_tasks_trace() to wait for the pending free_rcu().
+	 * direct_free has already been set to prevent reuse_rcu() from
+	 * calling freee_rcu() again.
 	 */
+	rcu_barrier();
 	rcu_barrier_tasks_trace();
-	if (!rcu_trace_implies_rcu_gp())
-		rcu_barrier();
 	free_mem_alloc_no_barrier(ma);
 }
 
@@ -535,7 +613,9 @@  void bpf_mem_alloc_destroy(struct bpf_mem_alloc *ma)
 			 */
 			irq_work_sync(&c->refill_work);
 			drain_mem_cache(c);
-			rcu_in_progress += atomic_read(&c->call_rcu_in_progress);
+			rcu_in_progress += atomic_read(&c->reuse_rcu_in_progress);
+			rcu_in_progress += atomic_read(&c->free_rcu_in_progress);
+			rcu_in_progress += atomic_read(&c->dyn_reuse_rcu_cnt);
 		}
 		/* objcg is the same across cpus */
 		if (c->objcg)
@@ -550,7 +630,9 @@  void bpf_mem_alloc_destroy(struct bpf_mem_alloc *ma)
 				c = &cc->cache[i];
 				irq_work_sync(&c->refill_work);
 				drain_mem_cache(c);
-				rcu_in_progress += atomic_read(&c->call_rcu_in_progress);
+				rcu_in_progress += atomic_read(&c->reuse_rcu_in_progress);
+				rcu_in_progress += atomic_read(&c->free_rcu_in_progress);
+				rcu_in_progress += atomic_read(&c->dyn_reuse_rcu_cnt);
 			}
 		}
 		if (c->objcg)
@@ -589,14 +671,14 @@  static void notrace *unit_alloc(struct bpf_mem_cache *c)
 
 	WARN_ON(cnt < 0);
 
-	if (cnt < c->low_watermark)
+	if (cnt <= c->watermark)
 		irq_work_raise(c);
 	return llnode;
 }
 
 /* Though 'ptr' object could have been allocated on a different cpu
- * add it to the free_llist of the current cpu.
- * Let kfree() logic deal with it when it's later called from irq_work.
+ * add it to the free_by_rcu list of the current cpu.
+ * Let kfree() logic deal with it when it's later called from RCU cb.
  */
 static void notrace unit_free(struct bpf_mem_cache *c, void *ptr)
 {
@@ -607,12 +689,13 @@  static void notrace unit_free(struct bpf_mem_cache *c, void *ptr)
 	BUILD_BUG_ON(LLIST_NODE_SZ > 8);
 
 	local_irq_save(flags);
+	/* In case a NMI-context bpf program is also freeing object. */
 	if (local_inc_return(&c->active) == 1) {
-		__llist_add(llnode, &c->free_llist);
-		cnt = ++c->free_cnt;
+		__llist_add(llnode, &c->free_by_rcu);
+		cnt = ++c->prepare_reuse_cnt;
 	} else {
 		/* unit_free() cannot fail. Therefore add an object to atomic
-		 * llist. free_bulk() will drain it. Though free_llist_extra is
+		 * llist. reuse_bulk() will drain it. Though free_llist_extra is
 		 * a per-cpu list we have to use atomic llist_add here, since
 		 * it also can be interrupted by bpf nmi prog that does another
 		 * unit_free() into the same free_llist_extra.
@@ -622,7 +705,7 @@  static void notrace unit_free(struct bpf_mem_cache *c, void *ptr)
 	local_dec(&c->active);
 	local_irq_restore(flags);
 
-	if (cnt > c->high_watermark)
+	if (cnt >= c->watermark)
 		/* free few objects from current cpu into global kmalloc pool */
 		irq_work_raise(c);
 }