[RFC,bpf-next,1/2] bpf, cpumap: Use queue_rcu_work() to remove unnecessary rcu_barrier()

Message ID 20230728023030.1906124-2-houtao@huaweicloud.com (mailing list archive)
State RFC
Delegated to: BPF
Series Remove unnecessary synchronizations in cpumap

Checks

Context Check Description
bpf/vmtest-bpf-next-PR success PR summary
bpf/vmtest-bpf-next-VM_Test-1 success Logs for ShellCheck
bpf/vmtest-bpf-next-VM_Test-6 success Logs for set-matrix
bpf/vmtest-bpf-next-VM_Test-2 success Logs for build for aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-4 success Logs for build for x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-5 success Logs for build for x86_64 with llvm-16
bpf/vmtest-bpf-next-VM_Test-3 success Logs for build for s390x with gcc
bpf/vmtest-bpf-next-VM_Test-7 success Logs for test_maps on aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-8 pending Logs for test_maps on s390x with gcc
bpf/vmtest-bpf-next-VM_Test-9 success Logs for test_maps on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-10 success Logs for test_maps on x86_64 with llvm-16
bpf/vmtest-bpf-next-VM_Test-11 pending Logs for test_progs on aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-12 pending Logs for test_progs on s390x with gcc
bpf/vmtest-bpf-next-VM_Test-13 pending Logs for test_progs on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-14 success Logs for test_progs on x86_64 with llvm-16
bpf/vmtest-bpf-next-VM_Test-15 success Logs for test_progs_no_alu32 on aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-16 pending Logs for test_progs_no_alu32 on s390x with gcc
bpf/vmtest-bpf-next-VM_Test-17 success Logs for test_progs_no_alu32 on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-18 success Logs for test_progs_no_alu32 on x86_64 with llvm-16
bpf/vmtest-bpf-next-VM_Test-19 success Logs for test_progs_no_alu32_parallel on aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-20 success Logs for test_progs_no_alu32_parallel on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-21 success Logs for test_progs_no_alu32_parallel on x86_64 with llvm-16
bpf/vmtest-bpf-next-VM_Test-22 success Logs for test_progs_parallel on aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-23 success Logs for test_progs_parallel on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-24 success Logs for test_progs_parallel on x86_64 with llvm-16
bpf/vmtest-bpf-next-VM_Test-25 success Logs for test_verifier on aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-26 pending Logs for test_verifier on s390x with gcc
bpf/vmtest-bpf-next-VM_Test-27 success Logs for test_verifier on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-28 success Logs for test_verifier on x86_64 with llvm-16
bpf/vmtest-bpf-next-VM_Test-29 success Logs for veristat
netdev/series_format success Posting correctly formatted
netdev/tree_selection success Clearly marked for bpf-next
netdev/fixes_present success Fixes tag not required for -next series
netdev/header_inline success No static functions without inline keyword in header files
netdev/build_32bit success Errors and warnings before: 1344 this patch: 1344
netdev/cc_maintainers success CCed 16 of 16 maintainers
netdev/build_clang success Errors and warnings before: 1365 this patch: 1365
netdev/verify_signedoff success Signed-off-by tag matches author and committer
netdev/deprecated_api success None detected
netdev/check_selftest success No net selftest shell script
netdev/verify_fixes success No Fixes tag
netdev/build_allmodconfig_warn success Errors and warnings before: 1367 this patch: 1367
netdev/checkpatch warning WARNING: line length of 84 exceeds 80 columns
netdev/kdoc success Errors and warnings before: 0 this patch: 0
netdev/source_inline success Was 0 now: 0

Commit Message

Hou Tao July 28, 2023, 2:30 a.m. UTC
From: Hou Tao <houtao1@huawei.com>

Currently __cpu_map_entry_replace() uses call_rcu() to wait for the
in-flight xdp program and the NAPI poll to exit the RCU read-side
critical section, and then launches the kworker cpu_map_kthread_stop()
to call kthread_stop() and handle all pending xdp frames or skbs.

But it is unnecessary to use rcu_barrier() in cpu_map_kthread_stop() to
wait for the completion of __cpu_map_entry_free(): rcu_barrier() waits
for all pending RCU callbacks, while cpu_map_kthread_stop() only needs
to wait for the completion of one specific __cpu_map_entry_free().

So use queue_rcu_work() to replace call_rcu(), schedule_work() and
rcu_barrier(). queue_rcu_work() queues the __cpu_map_entry_free()
kworker after an RCU grace period. Because __cpu_map_entry_free() runs
in a kworker context, it is OK to do all of the freeing procedures,
including kthread_stop(), in it.

After the update, there is no need to do reference-counting for
bpf_cpu_map_entry, because bpf_cpu_map_entry is freed directly in
__cpu_map_entry_free(), so just remove it.

Signed-off-by: Hou Tao <houtao1@huawei.com>
---
 kernel/bpf/cpumap.c | 93 +++++++++++----------------------------------
 1 file changed, 23 insertions(+), 70 deletions(-)

Comments

Toke Høiland-Jørgensen Aug. 10, 2023, 10:16 a.m. UTC | #1
Hou Tao <houtao@huaweicloud.com> writes:

> From: Hou Tao <houtao1@huawei.com>
>
> As for now __cpu_map_entry_replace() uses call_rcu() to wait for the
> inflight xdp program and NAPI poll to exit the RCU read critical
> section, and then launch kworker cpu_map_kthread_stop() to call
> kthread_stop() to handle all pending xdp frames or skbs.
>
> But it is unnecessary to use rcu_barrier() in cpu_map_kthread_stop() to
> wait for the completion of __cpu_map_entry_free(), because rcu_barrier()
> will wait for all pending RCU callbacks and cpu_map_kthread_stop() only
> needs to wait for the completion of a specific __cpu_map_entry_free().
>
> So use queue_rcu_work() to replace call_rcu(), schedule_work() and
> rcu_barrier(). queue_rcu_work() will queue a __cpu_map_entry_free()
> kworker after a RCU grace period. Because __cpu_map_entry_free() is
> running in a kworker context, so it is OK to do all of these freeing
> procedures include kthread_stop() in it.
>
> After the update, there is no need to do reference-counting for
> bpf_cpu_map_entry, because bpf_cpu_map_entry is freed directly in
> __cpu_map_entry_free(), so just remove it.
>
> Signed-off-by: Hou Tao <houtao1@huawei.com>

I think your analysis is correct, and this is a nice cleanup of what is
really a bit of an over-complicated cleanup flow - well done!

I have a few nits below, but with those feel free to resend as non-RFC
and add my:

Reviewed-by: Toke Høiland-Jørgensen <toke@redhat.com>

> ---
>  kernel/bpf/cpumap.c | 93 +++++++++++----------------------------------
>  1 file changed, 23 insertions(+), 70 deletions(-)
>
> diff --git a/kernel/bpf/cpumap.c b/kernel/bpf/cpumap.c
> index 0a16e30b16ef..24f39c37526f 100644
> --- a/kernel/bpf/cpumap.c
> +++ b/kernel/bpf/cpumap.c
> @@ -67,10 +67,7 @@ struct bpf_cpu_map_entry {
>  	struct bpf_cpumap_val value;
>  	struct bpf_prog *prog;
>  
> -	atomic_t refcnt; /* Control when this struct can be free'ed */
> -	struct rcu_head rcu;
> -
> -	struct work_struct kthread_stop_wq;
> +	struct rcu_work free_work;
>  };
>  
>  struct bpf_cpu_map {
> @@ -115,11 +112,6 @@ static struct bpf_map *cpu_map_alloc(union bpf_attr *attr)
>  	return &cmap->map;
>  }
>  
> -static void get_cpu_map_entry(struct bpf_cpu_map_entry *rcpu)
> -{
> -	atomic_inc(&rcpu->refcnt);
> -}
> -
>  static void __cpu_map_ring_cleanup(struct ptr_ring *ring)
>  {
>  	/* The tear-down procedure should have made sure that queue is
> @@ -134,43 +126,6 @@ static void __cpu_map_ring_cleanup(struct ptr_ring *ring)
>  			xdp_return_frame(xdpf);
>  }
>  
> -static void put_cpu_map_entry(struct bpf_cpu_map_entry *rcpu)
> -{
> -	if (atomic_dec_and_test(&rcpu->refcnt)) {
> -		if (rcpu->prog)
> -			bpf_prog_put(rcpu->prog);
> -		/* The queue should be empty at this point */
> -		__cpu_map_ring_cleanup(rcpu->queue);
> -		ptr_ring_cleanup(rcpu->queue, NULL);
> -		kfree(rcpu->queue);
> -		kfree(rcpu);
> -	}
> -}
> -
> -/* called from workqueue, to workaround syscall using preempt_disable */
> -static void cpu_map_kthread_stop(struct work_struct *work)
> -{
> -	struct bpf_cpu_map_entry *rcpu;
> -	int err;
> -
> -	rcpu = container_of(work, struct bpf_cpu_map_entry, kthread_stop_wq);
> -
> -	/* Wait for flush in __cpu_map_entry_free(), via full RCU barrier,
> -	 * as it waits until all in-flight call_rcu() callbacks complete.
> -	 */
> -	rcu_barrier();
> -
> -	/* kthread_stop will wake_up_process and wait for it to complete */
> -	err = kthread_stop(rcpu->kthread);
> -	if (err) {
> -		/* kthread_stop may be called before cpu_map_kthread_run
> -		 * is executed, so we need to release the memory related
> -		 * to rcpu.
> -		 */
> -		put_cpu_map_entry(rcpu);
> -	}
> -}
> -
>  static void cpu_map_bpf_prog_run_skb(struct bpf_cpu_map_entry *rcpu,
>  				     struct list_head *listp,
>  				     struct xdp_cpumap_stats *stats)
> @@ -395,7 +350,6 @@ static int cpu_map_kthread_run(void *data)
>  	}
>  	__set_current_state(TASK_RUNNING);
>  
> -	put_cpu_map_entry(rcpu);
>  	return 0;
>  }
>  
> @@ -471,9 +425,6 @@ __cpu_map_entry_alloc(struct bpf_map *map, struct bpf_cpumap_val *value,
>  	if (IS_ERR(rcpu->kthread))
>  		goto free_prog;
>  
> -	get_cpu_map_entry(rcpu); /* 1-refcnt for being in cmap->cpu_map[] */
> -	get_cpu_map_entry(rcpu); /* 1-refcnt for kthread */
> -
>  	/* Make sure kthread runs on a single CPU */
>  	kthread_bind(rcpu->kthread, cpu);
>  	wake_up_process(rcpu->kthread);
> @@ -494,7 +445,7 @@ __cpu_map_entry_alloc(struct bpf_map *map, struct bpf_cpumap_val *value,
>  	return NULL;
>  }
>  
> -static void __cpu_map_entry_free(struct rcu_head *rcu)
> +static void __cpu_map_entry_free(struct work_struct *work)
>  {
>  	struct bpf_cpu_map_entry *rcpu;
>  
> @@ -503,30 +454,33 @@ static void __cpu_map_entry_free(struct rcu_head *rcu)
>  	 * new packets and cannot change/set flush_needed that can
>  	 * find this entry.
>  	 */
> -	rcpu = container_of(rcu, struct bpf_cpu_map_entry, rcu);
> +	rcpu = container_of(to_rcu_work(work), struct bpf_cpu_map_entry, free_work);
>  
>  	free_percpu(rcpu->bulkq);

Let's move this free down to the end along with the others.

> -	/* Cannot kthread_stop() here, last put free rcpu resources */
> -	put_cpu_map_entry(rcpu);
> +
> +	/* kthread_stop will wake_up_process and wait for it to complete */

Suggest adding to this comment: "cpu_map_kthread_run() makes sure the
pointer ring is empty before exiting."

> +	kthread_stop(rcpu->kthread);
> +
> +	if (rcpu->prog)
> +		bpf_prog_put(rcpu->prog);
> +	/* The queue should be empty at this point */
> +	__cpu_map_ring_cleanup(rcpu->queue);
> +	ptr_ring_cleanup(rcpu->queue, NULL);
> +	kfree(rcpu->queue);
> +	kfree(rcpu);
>  }
>  
>  /* After xchg pointer to bpf_cpu_map_entry, use the call_rcu() to
> - * ensure any driver rcu critical sections have completed, but this
> - * does not guarantee a flush has happened yet. Because driver side
> - * rcu_read_lock/unlock only protects the running XDP program.  The
> - * atomic xchg and NULL-ptr check in __cpu_map_flush() makes sure a
> - * pending flush op doesn't fail.
> + * ensure both any driver rcu critical sections and xdp_do_flush()
> + * have completed.
>   *
>   * The bpf_cpu_map_entry is still used by the kthread, and there can
> - * still be pending packets (in queue and percpu bulkq).  A refcnt
> - * makes sure to last user (kthread_stop vs. call_rcu) free memory
> - * resources.
> + * still be pending packets (in queue and percpu bulkq).
>   *
> - * The rcu callback __cpu_map_entry_free flush remaining packets in
> - * percpu bulkq to queue.  Due to caller map_delete_elem() disable
> - * preemption, cannot call kthread_stop() to make sure queue is empty.
> - * Instead a work_queue is started for stopping kthread,
> - * cpu_map_kthread_stop, which waits for an RCU grace period before
> + * Due to caller map_delete_elem() is in RCU read critical section,
> + * cannot call kthread_stop() to make sure queue is empty. Instead
> + * a work_struct is started for stopping kthread,
> + * __cpu_map_entry_free, which waits for a RCU grace period before
>   * stopping kthread, emptying the queue.
>   */

I think the above comment is a bit too convoluted, still. I'd suggest
just replacing the whole thing with this:

/* After the xchg of the bpf_cpu_map_entry pointer, we need to make sure the old
 * entry is no longer in use before freeing. We use queue_rcu_work() to call
 * __cpu_map_entry_free() in a separate workqueue after waiting for an RCU grace
 * period. This means that (a) all pending enqueue and flush operations have
 * completed (because of the RCU callback), and (b) we are in a workqueue
 * context where we can stop the kthread and wait for it to exit before freeing
 * everything.
 */

>  static void __cpu_map_entry_replace(struct bpf_cpu_map *cmap,
> @@ -536,9 +490,8 @@ static void __cpu_map_entry_replace(struct bpf_cpu_map *cmap,
>  
>  	old_rcpu = unrcu_pointer(xchg(&cmap->cpu_map[key_cpu], RCU_INITIALIZER(rcpu)));
>  	if (old_rcpu) {
> -		call_rcu(&old_rcpu->rcu, __cpu_map_entry_free);
> -		INIT_WORK(&old_rcpu->kthread_stop_wq, cpu_map_kthread_stop);
> -		schedule_work(&old_rcpu->kthread_stop_wq);
> +		INIT_RCU_WORK(&old_rcpu->free_work, __cpu_map_entry_free);
> +		queue_rcu_work(system_wq, &old_rcpu->free_work);
>  	}
>  }
>  
> -- 
> 2.29.2
Hou Tao Aug. 11, 2023, 10:22 a.m. UTC | #2
Hi,

On 8/10/2023 6:16 PM, Toke Høiland-Jørgensen wrote:
> Hou Tao <houtao@huaweicloud.com> writes:
>
>> From: Hou Tao <houtao1@huawei.com>
>>
>> As for now __cpu_map_entry_replace() uses call_rcu() to wait for the
>> inflight xdp program and NAPI poll to exit the RCU read critical
>> section, and then launch kworker cpu_map_kthread_stop() to call
>> kthread_stop() to handle all pending xdp frames or skbs.
>>
>> But it is unnecessary to use rcu_barrier() in cpu_map_kthread_stop() to
>> wait for the completion of __cpu_map_entry_free(), because rcu_barrier()
>> will wait for all pending RCU callbacks and cpu_map_kthread_stop() only
>> needs to wait for the completion of a specific __cpu_map_entry_free().
>>
>> So use queue_rcu_work() to replace call_rcu(), schedule_work() and
>> rcu_barrier(). queue_rcu_work() will queue a __cpu_map_entry_free()
>> kworker after a RCU grace period. Because __cpu_map_entry_free() is
>> running in a kworker context, so it is OK to do all of these freeing
>> procedures include kthread_stop() in it.
>>
>> After the update, there is no need to do reference-counting for
>> bpf_cpu_map_entry, because bpf_cpu_map_entry is freed directly in
>> __cpu_map_entry_free(), so just remove it.
>>
>> Signed-off-by: Hou Tao <houtao1@huawei.com>
> I think your analysis is correct, and this is a nice cleanup of what is
> really a bit of an over-complicated cleanup flow - well done!
>
> I have a few nits below, but with those feel free to resend as non-RFC
> and add my:
>
> Reviewed-by: Toke Høiland-Jørgensen <toke@redhat.com>

Thanks for the review.
>
>> ---
>>  kernel/bpf/cpumap.c | 93 +++++++++++----------------------------------
>>  1 file changed, 23 insertions(+), 70 deletions(-)
>>
SNIP
>> -static void __cpu_map_entry_free(struct rcu_head *rcu)
>> +static void __cpu_map_entry_free(struct work_struct *work)
>>  {
>>  	struct bpf_cpu_map_entry *rcpu;
>>  
>> @@ -503,30 +454,33 @@ static void __cpu_map_entry_free(struct rcu_head *rcu)
>>  	 * new packets and cannot change/set flush_needed that can
>>  	 * find this entry.
>>  	 */
>> -	rcpu = container_of(rcu, struct bpf_cpu_map_entry, rcu);
>> +	rcpu = container_of(to_rcu_work(work), struct bpf_cpu_map_entry, free_work);
>>  
>>  	free_percpu(rcpu->bulkq);
> Let's move this free down to the end along with the others.

Will do in v1.
>
>> -	/* Cannot kthread_stop() here, last put free rcpu resources */
>> -	put_cpu_map_entry(rcpu);
>> +
>> +	/* kthread_stop will wake_up_process and wait for it to complete */
> Suggest adding to this comment: "cpu_map_kthread_run() makes sure the
> pointer ring is empty before exiting."

Will do in v1.
>
>> +	kthread_stop(rcpu->kthread);
>> +
>> +	if (rcpu->prog)
>> +		bpf_prog_put(rcpu->prog);
>> +	/* The queue should be empty at this point */
>> +	__cpu_map_ring_cleanup(rcpu->queue);
>> +	ptr_ring_cleanup(rcpu->queue, NULL);
>> +	kfree(rcpu->queue);
>> +	kfree(rcpu);
>>  }
>>  
>>  /* After xchg pointer to bpf_cpu_map_entry, use the call_rcu() to
>> - * ensure any driver rcu critical sections have completed, but this
>> - * does not guarantee a flush has happened yet. Because driver side
>> - * rcu_read_lock/unlock only protects the running XDP program.  The
>> - * atomic xchg and NULL-ptr check in __cpu_map_flush() makes sure a
>> - * pending flush op doesn't fail.
>> + * ensure both any driver rcu critical sections and xdp_do_flush()
>> + * have completed.
>>   *
>>   * The bpf_cpu_map_entry is still used by the kthread, and there can
>> - * still be pending packets (in queue and percpu bulkq).  A refcnt
>> - * makes sure to last user (kthread_stop vs. call_rcu) free memory
>> - * resources.
>> + * still be pending packets (in queue and percpu bulkq).
>>   *
>> - * The rcu callback __cpu_map_entry_free flush remaining packets in
>> - * percpu bulkq to queue.  Due to caller map_delete_elem() disable
>> - * preemption, cannot call kthread_stop() to make sure queue is empty.
>> - * Instead a work_queue is started for stopping kthread,
>> - * cpu_map_kthread_stop, which waits for an RCU grace period before
>> + * Due to caller map_delete_elem() is in RCU read critical section,
>> + * cannot call kthread_stop() to make sure queue is empty. Instead
>> + * a work_struct is started for stopping kthread,
>> + * __cpu_map_entry_free, which waits for a RCU grace period before
>>   * stopping kthread, emptying the queue.
>>   */
> I think the above comment is a bit too convoluted, still. I'd suggest
> just replacing the whole thing with this:
>
> /* After the xchg of the bpf_cpu_map_entry pointer, we need to make sure the old
>  * entry is no longer in use before freeing. We use queue_rcu_work() to call
>  * __cpu_map_entry_free() in a separate workqueue after waiting for an RCU grace
>  * period. This means that (a) all pending enqueue and flush operations have
>  * completed (because or the RCU callback), and (b) we are in a workqueue
>  * context where we can stop the kthread and wait for it to exit before freeing
>  * everything.
>  */
Much better. Thanks for the rephrasing.  Will update it in v1.
>>  static void __cpu_map_entry_replace(struct bpf_cpu_map *cmap,
>> @@ -536,9 +490,8 @@ static void __cpu_map_entry_replace(struct bpf_cpu_map *cmap,
>>  
>>  	old_rcpu = unrcu_pointer(xchg(&cmap->cpu_map[key_cpu], RCU_INITIALIZER(rcpu)));
>>  	if (old_rcpu) {
>> -		call_rcu(&old_rcpu->rcu, __cpu_map_entry_free);
>> -		INIT_WORK(&old_rcpu->kthread_stop_wq, cpu_map_kthread_stop);
>> -		schedule_work(&old_rcpu->kthread_stop_wq);
>> +		INIT_RCU_WORK(&old_rcpu->free_work, __cpu_map_entry_free);
>> +		queue_rcu_work(system_wq, &old_rcpu->free_work);
>>  	}
>>  }
>>  
>> -- 
>> 2.29.2
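
For reference, with both of the nits above applied (free_percpu() moved
down with the other frees, and the kthread_stop() comment extended), the
reworked __cpu_map_entry_free() could look roughly like the sketch below;
the actual v1 posting may differ:

static void __cpu_map_entry_free(struct work_struct *work)
{
	struct bpf_cpu_map_entry *rcpu;

	rcpu = container_of(to_rcu_work(work), struct bpf_cpu_map_entry, free_work);

	/* kthread_stop() will wake_up_process() and wait for it to complete;
	 * cpu_map_kthread_run() makes sure the pointer ring is empty before
	 * exiting.
	 */
	kthread_stop(rcpu->kthread);

	if (rcpu->prog)
		bpf_prog_put(rcpu->prog);
	/* The queue should be empty at this point */
	__cpu_map_ring_cleanup(rcpu->queue);
	ptr_ring_cleanup(rcpu->queue, NULL);
	kfree(rcpu->queue);
	free_percpu(rcpu->bulkq);	/* moved down with the other frees */
	kfree(rcpu);
}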

Patch

diff --git a/kernel/bpf/cpumap.c b/kernel/bpf/cpumap.c
index 0a16e30b16ef..24f39c37526f 100644
--- a/kernel/bpf/cpumap.c
+++ b/kernel/bpf/cpumap.c
@@ -67,10 +67,7 @@  struct bpf_cpu_map_entry {
 	struct bpf_cpumap_val value;
 	struct bpf_prog *prog;
 
-	atomic_t refcnt; /* Control when this struct can be free'ed */
-	struct rcu_head rcu;
-
-	struct work_struct kthread_stop_wq;
+	struct rcu_work free_work;
 };
 
 struct bpf_cpu_map {
@@ -115,11 +112,6 @@  static struct bpf_map *cpu_map_alloc(union bpf_attr *attr)
 	return &cmap->map;
 }
 
-static void get_cpu_map_entry(struct bpf_cpu_map_entry *rcpu)
-{
-	atomic_inc(&rcpu->refcnt);
-}
-
 static void __cpu_map_ring_cleanup(struct ptr_ring *ring)
 {
 	/* The tear-down procedure should have made sure that queue is
@@ -134,43 +126,6 @@  static void __cpu_map_ring_cleanup(struct ptr_ring *ring)
 			xdp_return_frame(xdpf);
 }
 
-static void put_cpu_map_entry(struct bpf_cpu_map_entry *rcpu)
-{
-	if (atomic_dec_and_test(&rcpu->refcnt)) {
-		if (rcpu->prog)
-			bpf_prog_put(rcpu->prog);
-		/* The queue should be empty at this point */
-		__cpu_map_ring_cleanup(rcpu->queue);
-		ptr_ring_cleanup(rcpu->queue, NULL);
-		kfree(rcpu->queue);
-		kfree(rcpu);
-	}
-}
-
-/* called from workqueue, to workaround syscall using preempt_disable */
-static void cpu_map_kthread_stop(struct work_struct *work)
-{
-	struct bpf_cpu_map_entry *rcpu;
-	int err;
-
-	rcpu = container_of(work, struct bpf_cpu_map_entry, kthread_stop_wq);
-
-	/* Wait for flush in __cpu_map_entry_free(), via full RCU barrier,
-	 * as it waits until all in-flight call_rcu() callbacks complete.
-	 */
-	rcu_barrier();
-
-	/* kthread_stop will wake_up_process and wait for it to complete */
-	err = kthread_stop(rcpu->kthread);
-	if (err) {
-		/* kthread_stop may be called before cpu_map_kthread_run
-		 * is executed, so we need to release the memory related
-		 * to rcpu.
-		 */
-		put_cpu_map_entry(rcpu);
-	}
-}
-
 static void cpu_map_bpf_prog_run_skb(struct bpf_cpu_map_entry *rcpu,
 				     struct list_head *listp,
 				     struct xdp_cpumap_stats *stats)
@@ -395,7 +350,6 @@  static int cpu_map_kthread_run(void *data)
 	}
 	__set_current_state(TASK_RUNNING);
 
-	put_cpu_map_entry(rcpu);
 	return 0;
 }
 
@@ -471,9 +425,6 @@  __cpu_map_entry_alloc(struct bpf_map *map, struct bpf_cpumap_val *value,
 	if (IS_ERR(rcpu->kthread))
 		goto free_prog;
 
-	get_cpu_map_entry(rcpu); /* 1-refcnt for being in cmap->cpu_map[] */
-	get_cpu_map_entry(rcpu); /* 1-refcnt for kthread */
-
 	/* Make sure kthread runs on a single CPU */
 	kthread_bind(rcpu->kthread, cpu);
 	wake_up_process(rcpu->kthread);
@@ -494,7 +445,7 @@  __cpu_map_entry_alloc(struct bpf_map *map, struct bpf_cpumap_val *value,
 	return NULL;
 }
 
-static void __cpu_map_entry_free(struct rcu_head *rcu)
+static void __cpu_map_entry_free(struct work_struct *work)
 {
 	struct bpf_cpu_map_entry *rcpu;
 
@@ -503,30 +454,33 @@  static void __cpu_map_entry_free(struct rcu_head *rcu)
 	 * new packets and cannot change/set flush_needed that can
 	 * find this entry.
 	 */
-	rcpu = container_of(rcu, struct bpf_cpu_map_entry, rcu);
+	rcpu = container_of(to_rcu_work(work), struct bpf_cpu_map_entry, free_work);
 
 	free_percpu(rcpu->bulkq);
-	/* Cannot kthread_stop() here, last put free rcpu resources */
-	put_cpu_map_entry(rcpu);
+
+	/* kthread_stop will wake_up_process and wait for it to complete */
+	kthread_stop(rcpu->kthread);
+
+	if (rcpu->prog)
+		bpf_prog_put(rcpu->prog);
+	/* The queue should be empty at this point */
+	__cpu_map_ring_cleanup(rcpu->queue);
+	ptr_ring_cleanup(rcpu->queue, NULL);
+	kfree(rcpu->queue);
+	kfree(rcpu);
 }
 
 /* After xchg pointer to bpf_cpu_map_entry, use the call_rcu() to
- * ensure any driver rcu critical sections have completed, but this
- * does not guarantee a flush has happened yet. Because driver side
- * rcu_read_lock/unlock only protects the running XDP program.  The
- * atomic xchg and NULL-ptr check in __cpu_map_flush() makes sure a
- * pending flush op doesn't fail.
+ * ensure both any driver rcu critical sections and xdp_do_flush()
+ * have completed.
  *
  * The bpf_cpu_map_entry is still used by the kthread, and there can
- * still be pending packets (in queue and percpu bulkq).  A refcnt
- * makes sure to last user (kthread_stop vs. call_rcu) free memory
- * resources.
+ * still be pending packets (in queue and percpu bulkq).
  *
- * The rcu callback __cpu_map_entry_free flush remaining packets in
- * percpu bulkq to queue.  Due to caller map_delete_elem() disable
- * preemption, cannot call kthread_stop() to make sure queue is empty.
- * Instead a work_queue is started for stopping kthread,
- * cpu_map_kthread_stop, which waits for an RCU grace period before
+ * Due to caller map_delete_elem() is in RCU read critical section,
+ * cannot call kthread_stop() to make sure queue is empty. Instead
+ * a work_struct is started for stopping kthread,
+ * __cpu_map_entry_free, which waits for a RCU grace period before
  * stopping kthread, emptying the queue.
  */
 static void __cpu_map_entry_replace(struct bpf_cpu_map *cmap,
@@ -536,9 +490,8 @@  static void __cpu_map_entry_replace(struct bpf_cpu_map *cmap,
 
 	old_rcpu = unrcu_pointer(xchg(&cmap->cpu_map[key_cpu], RCU_INITIALIZER(rcpu)));
 	if (old_rcpu) {
-		call_rcu(&old_rcpu->rcu, __cpu_map_entry_free);
-		INIT_WORK(&old_rcpu->kthread_stop_wq, cpu_map_kthread_stop);
-		schedule_work(&old_rcpu->kthread_stop_wq);
+		INIT_RCU_WORK(&old_rcpu->free_work, __cpu_map_entry_free);
+		queue_rcu_work(system_wq, &old_rcpu->free_work);
 	}
 }