diff mbox

[RFC,1/3] lib/list_batch: A simple list insertion/deletion batching facility

Message ID 1453824219-51437-2-git-send-email-Waiman.Long@hpe.com (mailing list archive)
State New, archived
Headers show

Commit Message

Waiman Long Jan. 26, 2016, 4:03 p.m. UTC
Linked list insertion or deletion under lock is a very common activity
in the Linux kernel. If this is the only activity under lock, the
locking overhead can be pretty large compared with the actual time
spent on the insertion or deletion operation itself especially on a
large system with many CPUs.

This patch introduces a simple list insertion/deletion batching
facility where a group of list insertion and deletion operations are
grouped together in a single batch under lock. This can reduce the
locking overhead and improve overall system performance.

The fast path of this batching facility will be similar in performance
to the "lock; listop; unlock;" sequence of the existing code. If
the lock is not available, it will enter slowpath where the batching
happens.

A new config option LIST_BATCHING is added so that we can control on
which architecture do we want to have this facility enabled.

Signed-off-by: Waiman Long <Waiman.Long@hpe.com>
---
 include/linux/list_batch.h |  120 ++++++++++++++++++++++++++++++++++++++++++++
 lib/Kconfig                |    7 +++
 lib/Makefile               |    1 +
 lib/list_batch.c           |  117 ++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 245 insertions(+), 0 deletions(-)
 create mode 100644 include/linux/list_batch.h
 create mode 100644 lib/list_batch.c

Comments

Peter Zijlstra Jan. 27, 2016, 4:34 p.m. UTC | #1
On Tue, Jan 26, 2016 at 11:03:37AM -0500, Waiman Long wrote:
> +static __always_inline void _list_batch_cmd(enum list_batch_cmd cmd,
> +					    struct list_head *head,
> +					    struct list_head *entry)
> +{
> +	if (cmd == lb_cmd_add)
> +		list_add(entry, head);
> +	else if (cmd == lb_cmd_del)
> +		list_del(entry);
> +	else /* cmd == lb_cmd_del_init */
> +		list_del_init(entry);

Maybe use switch(), GCC has fancy warns with enums and switch().

> +}

> +static inline void do_list_batch(enum list_batch_cmd cmd, spinlock_t *lock,
> +				   struct list_batch *batch,
> +				   struct list_head *entry)
> +{
> +	/*
> +	 * Fast path
> +	 */
> +	if (spin_trylock(lock)) {
> +		_list_batch_cmd(cmd, batch->list, entry);
> +		spin_unlock(lock);

This is still quite a lot of code for an inline function

> +		return;
> +	}
> +	do_list_batch_slowpath(cmd, lock, batch, entry);
> +}



> +void do_list_batch_slowpath(enum list_batch_cmd cmd, spinlock_t *lock,
> +			    struct list_batch *batch, struct list_head *entry)
> +{
> +	struct list_batch_qnode node, *prev, *next, *nptr;
> +	int loop;
> +
> +	/*
> +	 * Put itself into the list_batch queue
> +	 */
> +	node.next  = NULL;
> +	node.entry = entry;
> +	node.cmd   = cmd;
> +	node.state = lb_state_waiting;
> +

Here we rely on the release barrier implied by xchg() to ensure the node
initialization is complete before the xchg() publishes the thing.

But do we also need the acquire part of this barrier? From what I could
tell, the primitive as a whole does not imply any ordering.

> +	prev = xchg(&batch->tail, &node);
> +
> +	if (prev) {
> +		WRITE_ONCE(prev->next, &node);
> +		while (READ_ONCE(node.state) == lb_state_waiting)
> +			cpu_relax();
> +		if (node.state == lb_state_done)
> +			return;
> +		WARN_ON(node.state != lb_state_batch);
> +	}
> +
> +	/*
> +	 * We are now the queue head, we shold now acquire the lock and
> +	 * process a batch of qnodes.
> +	 */
> +	loop = LB_BATCH_SIZE;

Have you tried different sizes?

> +	next = &node;
> +	spin_lock(lock);
> +
> +do_list_again:
> +	do {
> +		nptr = next;
> +		_list_batch_cmd(nptr->cmd, batch->list, nptr->entry);
> +		next = READ_ONCE(nptr->next);
> +		/*
> +		 * As soon as the state is marked lb_state_done, we
> +		 * can no longer assume the content of *nptr as valid.
> +		 * So we have to hold off marking it done until we no
> +		 * longer need its content.
> +		 *
> +		 * The release barrier here is to make sure that we
> +		 * won't access its content after marking it done.
> +		 */
> +		if (next)
> +			smp_store_release(&nptr->state, lb_state_done);
> +	} while (--loop && next);
> +	if (!next) {
> +		/*
> +		 * The queue tail should equal to nptr, so clear it to
> +		 * mark the queue as empty.
> +		 */
> +		if (cmpxchg(&batch->tail, nptr, NULL) != nptr) {
> +			/*
> +			 * Queue not empty, wait until the next pointer is
> +			 * initialized.
> +			 */
> +			while (!(next = READ_ONCE(nptr->next)))
> +				cpu_relax();
> +		}
> +		/* The above cmpxchg acts as a memory barrier */

for what? :-)

Also, if that cmpxchg() fails, it very much does _not_ act as one.

I suspect you want smp_store_release() setting the state_done, just as
above, and then use cmpxchg_relaxed().

> +		WRITE_ONCE(nptr->state, lb_state_done);
> +	}
> +	if (next) {
> +		if (loop)
> +			goto do_list_again;	/* More qnodes to process */
> +		/*
> +		 * Mark the next qnode as head to process the next batch
> +		 * of qnodes. The new queue head cannot proceed until we
> +		 * release the lock.
> +		 */
> +		WRITE_ONCE(next->state, lb_state_batch);
> +	}
> +	spin_unlock(lock);
> +}
> +EXPORT_SYMBOL_GPL(do_list_batch_slowpath);
--
To unsubscribe from this list: send the line "unsubscribe linux-fsdevel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Waiman Long Jan. 27, 2016, 8:22 p.m. UTC | #2
On 01/27/2016 11:34 AM, Peter Zijlstra wrote:
> On Tue, Jan 26, 2016 at 11:03:37AM -0500, Waiman Long wrote:
>> +static __always_inline void _list_batch_cmd(enum list_batch_cmd cmd,
>> +					    struct list_head *head,
>> +					    struct list_head *entry)
>> +{
>> +	if (cmd == lb_cmd_add)
>> +		list_add(entry, head);
>> +	else if (cmd == lb_cmd_del)
>> +		list_del(entry);
>> +	else /* cmd == lb_cmd_del_init */
>> +		list_del_init(entry);
> Maybe use switch(), GCC has fancy warns with enums and switch().

OK, I will look at the generated code to see if there is any difference.

>
>> +}
>> +static inline void do_list_batch(enum list_batch_cmd cmd, spinlock_t *lock,
>> +				   struct list_batch *batch,
>> +				   struct list_head *entry)
>> +{
>> +	/*
>> +	 * Fast path
>> +	 */
>> +	if (spin_trylock(lock)) {
>> +		_list_batch_cmd(cmd, batch->list, entry);
>> +		spin_unlock(lock);
>> _list_batch_cmd
> This is still quite a lot of code for an inline function

I expect the callers will call it with a constant cmd, thus optimizing 
out all the if conditional checks in _list_batch_cmd(). Taking the 
inline out will probably stop that optimization.

>> +		return;
>> +	}
>> +	do_list_batch_slowpath(cmd, lock, batch, entry);
>> +}
>
>
>> +void do_list_batch_slowpath(enum list_batch_cmd cmd, spinlock_t *lock,
>> +			    struct list_batch *batch, struct list_head *entry)
>> +{
>> +	struct list_batch_qnode node, *prev, *next, *nptr;
>> +	int loop;
>> +
>> +	/*
>> +	 * Put itself into the list_batch queue
>> +	 */
>> +	node.next  = NULL;
>> +	node.entry = entry;
>> +	node.cmd   = cmd;
>> +	node.state = lb_state_waiting;
>> +
> Here we rely on the release barrier implied by xchg() to ensure the node
> initialization is complete before the xchg() publishes the thing.
>
> But do we also need the acquire part of this barrier? From what I could
> tell, the primitive as a whole does not imply any ordering.

I think we probably won't need the acquire part, but I don't have a 
non-x86 machine that can really test out the more relaxed versions of 
the atomic ops. That is why I use the strict versions. We can always 
relax it later on with additional patches.

>
>> +	prev = xchg(&batch->tail,&node);
>> +
>> +	if (prev) {
>> +		WRITE_ONCE(prev->next,&node);
>> +		while (READ_ONCE(node.state) == lb_state_waiting)
>> +			cpu_relax();
>> +		if (node.state == lb_state_done)
>> +			return;
>> +		WARN_ON(node.state != lb_state_batch);
>> +	}
>> +
>> +	/*
>> +	 * We are now the queue head, we shold now acquire the lock and
>> +	 * process a batch of qnodes.
>> +	 */
>> +	loop = LB_BATCH_SIZE;
> Have you tried different sizes?

I have tried 64 and 128. Using 128 seems to give a bit better 
performance number.

>> +	next =&node;
>> +	spin_lock(lock);
>> +
>> +do_list_again:
>> +	do {
>> +		nptr = next;
>> +		_list_batch_cmd(nptr->cmd, batch->list, nptr->entry);
>> +		next = READ_ONCE(nptr->next);
>> +		/*
>> +		 * As soon as the state is marked lb_state_done, we
>> +		 * can no longer assume the content of *nptr as valid.
>> +		 * So we have to hold off marking it done until we no
>> +		 * longer need its content.
>> +		 *
>> +		 * The release barrier here is to make sure that we
>> +		 * won't access its content after marking it done.
>> +		 */
>> +		if (next)
>> +			smp_store_release(&nptr->state, lb_state_done);
>> +	} while (--loop&&  next);
>> +	if (!next) {
>> +		/*
>> +		 * The queue tail should equal to nptr, so clear it to
>> +		 * mark the queue as empty.
>> +		 */
>> +		if (cmpxchg(&batch->tail, nptr, NULL) != nptr) {
>> +			/*
>> +			 * Queue not empty, wait until the next pointer is
>> +			 * initialized.
>> +			 */
>> +			while (!(next = READ_ONCE(nptr->next)))
>> +				cpu_relax();
>> +		}
>> +		/* The above cmpxchg acts as a memory barrier */
> for what? :-)
>
> Also, if that cmpxchg() fails, it very much does _not_ act as one.
>
> I suspect you want smp_store_release() setting the state_done, just as
> above, and then use cmpxchg_relaxed().

You are right. I did forgot about there was no memory barrier guarantee 
when cmpxchg() fails. However, in that case, the READ_ONCE() and 
WRITE_ONCE() macros should still provide the necessary ordering, IMO. I 
can certainly change it to use cmpxchg_relaxed() and smp_store_release() 
instead.

>
>> +		WRITE_ONCE(nptr->state, lb_state_done);
>> +	}
>> +	if (next) {
>> +		if (loop)
>> +			goto do_list_again;	/* More qnodes to process */
>> +		/*
>> +		 * Mark the next qnode as head to process the next batch
>> +		 * of qnodes. The new queue head cannot proceed until we
>> +		 * release the lock.
>> +		 */
>> +		WRITE_ONCE(next->state, lb_state_batch);
>> +	}
>> +	spin_unlock(lock);
>> +}
>> +EXPORT_SYMBOL_GPL(do_list_batch_slowpath);

Cheers,
Longman
--
To unsubscribe from this list: send the line "unsubscribe linux-fsdevel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Peter Zijlstra Jan. 27, 2016, 8:54 p.m. UTC | #3
On Wed, Jan 27, 2016 at 03:22:19PM -0500, Waiman Long wrote:

> >>+	/*
> >>+	 * Put itself into the list_batch queue
> >>+	 */
> >>+	node.next  = NULL;
> >>+	node.entry = entry;
> >>+	node.cmd   = cmd;
> >>+	node.state = lb_state_waiting;
> >>+
> >Here we rely on the release barrier implied by xchg() to ensure the node
> >initialization is complete before the xchg() publishes the thing.
> >
> >But do we also need the acquire part of this barrier? From what I could
> >tell, the primitive as a whole does not imply any ordering.
> 
> I think we probably won't need the acquire part, but I don't have a non-x86
> machine that can really test out the more relaxed versions of the atomic
> ops. That is why I use the strict versions. We can always relax it later on
> with additional patches.

Yeah, I have no hardware either. But at least we should comment the bits
we do know to rely upon.


> >>+	if (!next) {
> >>+		/*
> >>+		 * The queue tail should equal to nptr, so clear it to
> >>+		 * mark the queue as empty.
> >>+		 */
> >>+		if (cmpxchg(&batch->tail, nptr, NULL) != nptr) {
> >>+			/*
> >>+			 * Queue not empty, wait until the next pointer is
> >>+			 * initialized.
> >>+			 */
> >>+			while (!(next = READ_ONCE(nptr->next)))
> >>+				cpu_relax();
> >>+		}
> >>+		/* The above cmpxchg acts as a memory barrier */
> >for what? :-)
> >
> >Also, if that cmpxchg() fails, it very much does _not_ act as one.
> >
> >I suspect you want smp_store_release() setting the state_done, just as
> >above, and then use cmpxchg_relaxed().
> 
> You are right. I did forgot about there was no memory barrier guarantee when
> cmpxchg() fails. 

> However, in that case, the READ_ONCE() and WRITE_ONCE()
> macros should still provide the necessary ordering, IMO.

READ/WRITE_ONCE() provide _no_ order what so ever. And the issue here is
that we must not do any other stores to nptr after the state_done.

> I can certainly
> change it to use cmpxchg_relaxed() and smp_store_release() instead.

That seems a safe combination and would still generate the exact same
code on x86.
--
To unsubscribe from this list: send the line "unsubscribe linux-fsdevel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Waiman Long Jan. 28, 2016, 4:45 p.m. UTC | #4
On 01/27/2016 03:54 PM, Peter Zijlstra wrote:
> On Wed, Jan 27, 2016 at 03:22:19PM -0500, Waiman Long wrote:
>
>>>> +	/*
>>>> +	 * Put itself into the list_batch queue
>>>> +	 */
>>>> +	node.next  = NULL;
>>>> +	node.entry = entry;
>>>> +	node.cmd   = cmd;
>>>> +	node.state = lb_state_waiting;
>>>> +
>>> Here we rely on the release barrier implied by xchg() to ensure the node
>>> initialization is complete before the xchg() publishes the thing.
>>>
>>> But do we also need the acquire part of this barrier? From what I could
>>> tell, the primitive as a whole does not imply any ordering.
>> I think we probably won't need the acquire part, but I don't have a non-x86
>> machine that can really test out the more relaxed versions of the atomic
>> ops. That is why I use the strict versions. We can always relax it later on
>> with additional patches.
> Yeah, I have no hardware either. But at least we should comment the bits
> we do know to rely upon.
>

Using xchg_release() looks OK to me. As this feature is enabled on x86 
only for this patch, we can make the change and whoever enabling it for 
other architectures that have a real release function will have to test it.

>>>> +	if (!next) {
>>>> +		/*
>>>> +		 * The queue tail should equal to nptr, so clear it to
>>>> +		 * mark the queue as empty.
>>>> +		 */
>>>> +		if (cmpxchg(&batch->tail, nptr, NULL) != nptr) {
>>>> +			/*
>>>> +			 * Queue not empty, wait until the next pointer is
>>>> +			 * initialized.
>>>> +			 */
>>>> +			while (!(next = READ_ONCE(nptr->next)))
>>>> +				cpu_relax();
>>>> +		}
>>>> +		/* The above cmpxchg acts as a memory barrier */
>>> for what? :-)
>>>
>>> Also, if that cmpxchg() fails, it very much does _not_ act as one.
>>>
>>> I suspect you want smp_store_release() setting the state_done, just as
>>> above, and then use cmpxchg_relaxed().
>> You are right. I did forgot about there was no memory barrier guarantee when
>> cmpxchg() fails.
>> However, in that case, the READ_ONCE() and WRITE_ONCE()
>> macros should still provide the necessary ordering, IMO.
> READ/WRITE_ONCE() provide _no_ order what so ever. And the issue here is
> that we must not do any other stores to nptr after the state_done.
>

I thought if those macros are accessing the same cacheline, the compiler 
won't change the ordering and the hardware will take care of the proper 
ordering. Anyway, I do intended to change to use smp_store_release() for 
safety.

>> I can certainly
>> change it to use cmpxchg_relaxed() and smp_store_release() instead.
> That seems a safe combination and would still generate the exact same
> code on x86.

Cheers,
Longman
--
To unsubscribe from this list: send the line "unsubscribe linux-fsdevel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Peter Zijlstra Jan. 28, 2016, 6:35 p.m. UTC | #5
On Thu, Jan 28, 2016 at 11:45:40AM -0500, Waiman Long wrote:
> Using xchg_release() looks OK to me. As this feature is enabled on x86 only
> for this patch, we can make the change and whoever enabling it for other
> architectures that have a real release function will have to test it.

Ah, I was more thinking about:

	/*
	 * We rely on the memory barrier implied by xchg() below to
	 * ensure the node initialization is complete before its
	 * published.
	 */

And then use xchg() like you already do.


> >READ/WRITE_ONCE() provide _no_ order what so ever. And the issue here is
> >that we must not do any other stores to nptr after the state_done.
> >
> 
> I thought if those macros are accessing the same cacheline, the compiler
> won't change the ordering and the hardware will take care of the proper
> ordering. Anyway, I do intended to change to use smp_store_release() for
> safety.

The macros use a volatile cast, and that ensures the compiler must emit
the store and must emit it as a single store. I'm not 100% sure on the
rules of the compiler reordering volatile accesses, they are not a
compiler barrier.
--
To unsubscribe from this list: send the line "unsubscribe linux-fsdevel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
diff mbox

Patch

diff --git a/include/linux/list_batch.h b/include/linux/list_batch.h
new file mode 100644
index 0000000..b8583e7
--- /dev/null
+++ b/include/linux/list_batch.h
@@ -0,0 +1,120 @@ 
+/*
+ * List insertion/deletion batching facility
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * (C) Copyright 2016 Hewlett-Packard Enterprise Development LP
+ *
+ * Authors: Waiman Long <waiman.long@hpe.com>
+ */
+#ifndef __LINUX_LIST_BATCH_H
+#define __LINUX_LIST_BATCH_H
+
+#include <linux/spinlock.h>
+#include <linux/list.h>
+
+/*
+ * include/linux/list_batch.h
+ *
+ * Inserting or deleting an entry from a linked list under a spinlock is a
+ * very common operation in the Linux kernel. If many CPUs are trying to
+ * grab the lock and manipulate the linked list, it can lead to significant
+ * lock contention and slow operation.
+ *
+ * This list operation batching facility is used to batch multiple list
+ * operations under one lock/unlock critical section, thus reducing the
+ * locking overhead and improving overall performance.
+ */
+enum list_batch_cmd {
+	lb_cmd_add,
+	lb_cmd_del,
+	lb_cmd_del_init
+};
+
+enum list_batch_state {
+	lb_state_waiting,	/* Node is waiting */
+	lb_state_batch,		/* Queue head to perform batch processing */
+	lb_state_done		/* Job is done */
+};
+
+struct list_batch_qnode {
+	struct list_batch_qnode	*next;
+	struct list_head	*entry;
+	enum list_batch_cmd	cmd;
+	enum list_batch_state	state;
+};
+
+struct list_batch {
+	struct list_head	*list;
+	struct list_batch_qnode *tail;
+};
+
+#define LIST_BATCH_INIT(_list)	\
+	{			\
+		.list = _list,	\
+		.tail = NULL	\
+	}
+
+static inline void list_batch_init(struct list_batch *batch,
+				   struct list_head *list)
+{
+	batch->list = list;
+	batch->tail = NULL;
+}
+
+static __always_inline void _list_batch_cmd(enum list_batch_cmd cmd,
+					    struct list_head *head,
+					    struct list_head *entry)
+{
+	if (cmd == lb_cmd_add)
+		list_add(entry, head);
+	else if (cmd == lb_cmd_del)
+		list_del(entry);
+	else /* cmd == lb_cmd_del_init */
+		list_del_init(entry);
+}
+
+#ifdef CONFIG_LIST_BATCHING
+
+extern void do_list_batch_slowpath(enum list_batch_cmd cmd, spinlock_t *lock,
+				   struct list_batch *batch,
+				   struct list_head *entry);
+
+static inline void do_list_batch(enum list_batch_cmd cmd, spinlock_t *lock,
+				   struct list_batch *batch,
+				   struct list_head *entry)
+{
+	/*
+	 * Fast path
+	 */
+	if (spin_trylock(lock)) {
+		_list_batch_cmd(cmd, batch->list, entry);
+		spin_unlock(lock);
+		return;
+	}
+	do_list_batch_slowpath(cmd, lock, batch, entry);
+}
+
+
+#else /* CONFIG_LIST_BATCHING */
+
+static inline void do_list_batch(enum list_batch_cmd cmd, spinlock_t *lock,
+				   struct list_batch *batch,
+				   struct list_head *entry)
+{
+	spin_lock(lock);
+	_list_batch_cmd(cmd, batch->list, entry);
+	spin_unlock(lock);
+}
+
+#endif /* CONFIG_LIST_BATCHING */
+
+#endif /* __LINUX_LIST_BATCH_H */
diff --git a/lib/Kconfig b/lib/Kconfig
index 133ebc0..d75ce19 100644
--- a/lib/Kconfig
+++ b/lib/Kconfig
@@ -514,6 +514,13 @@  config OID_REGISTRY
 config UCS2_STRING
         tristate
 
+config LIST_BATCHING
+	def_bool y if ARCH_USE_LIST_BATCHING
+	depends on SMP
+
+config ARCH_USE_LIST_BATCHING
+	bool
+
 source "lib/fonts/Kconfig"
 
 config SG_SPLIT
diff --git a/lib/Makefile b/lib/Makefile
index a7c26a4..2791262 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -210,6 +210,7 @@  quiet_cmd_build_OID_registry = GEN     $@
 clean-files	+= oid_registry_data.c
 
 obj-$(CONFIG_UCS2_STRING) += ucs2_string.o
+obj-$(CONFIG_LIST_BATCHING) += list_batch.o
 obj-$(CONFIG_UBSAN) += ubsan.o
 
 UBSAN_SANITIZE_ubsan.o := n
diff --git a/lib/list_batch.c b/lib/list_batch.c
new file mode 100644
index 0000000..ac51d49
--- /dev/null
+++ b/lib/list_batch.c
@@ -0,0 +1,117 @@ 
+/*
+ * List insertion/deletion batching facility
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * (C) Copyright 2016 Hewlett-Packard Enterprise Development LP
+ *
+ * Authors: Waiman Long <waiman.long@hpe.com>
+ */
+#include <linux/list_batch.h>
+
+/*
+ * List processing batch size = 128
+ *
+ * The batch size shouldn't be too large. Otherwise, it will be too unfair
+ * to the task doing the batch processing. It shouldn't be too small neither
+ * as the performance benefit will be reduced.
+ */
+#define LB_BATCH_SIZE	(1 << 7)
+
+/*
+ * Inserting or deleting an entry from a linked list under a spinlock is a
+ * very common operation in the Linux kernel. If many CPUs are trying to
+ * grab the lock and manipulate the linked list, it can lead to significant
+ * lock contention and slow operation.
+ *
+ * This list operation batching facility is used to batch multiple list
+ * operations under one lock/unlock critical section, thus reducing the
+ * locking overhead and improving overall performance.
+ */
+void do_list_batch_slowpath(enum list_batch_cmd cmd, spinlock_t *lock,
+			    struct list_batch *batch, struct list_head *entry)
+{
+	struct list_batch_qnode node, *prev, *next, *nptr;
+	int loop;
+
+	/*
+	 * Put itself into the list_batch queue
+	 */
+	node.next  = NULL;
+	node.entry = entry;
+	node.cmd   = cmd;
+	node.state = lb_state_waiting;
+
+	prev = xchg(&batch->tail, &node);
+
+	if (prev) {
+		WRITE_ONCE(prev->next, &node);
+		while (READ_ONCE(node.state) == lb_state_waiting)
+			cpu_relax();
+		if (node.state == lb_state_done)
+			return;
+		WARN_ON(node.state != lb_state_batch);
+	}
+
+	/*
+	 * We are now the queue head, we shold now acquire the lock and
+	 * process a batch of qnodes.
+	 */
+	loop = LB_BATCH_SIZE;
+	next = &node;
+	spin_lock(lock);
+
+do_list_again:
+	do {
+		nptr = next;
+		_list_batch_cmd(nptr->cmd, batch->list, nptr->entry);
+		next = READ_ONCE(nptr->next);
+		/*
+		 * As soon as the state is marked lb_state_done, we
+		 * can no longer assume the content of *nptr as valid.
+		 * So we have to hold off marking it done until we no
+		 * longer need its content.
+		 *
+		 * The release barrier here is to make sure that we
+		 * won't access its content after marking it done.
+		 */
+		if (next)
+			smp_store_release(&nptr->state, lb_state_done);
+	} while (--loop && next);
+	if (!next) {
+		/*
+		 * The queue tail should equal to nptr, so clear it to
+		 * mark the queue as empty.
+		 */
+		if (cmpxchg(&batch->tail, nptr, NULL) != nptr) {
+			/*
+			 * Queue not empty, wait until the next pointer is
+			 * initialized.
+			 */
+			while (!(next = READ_ONCE(nptr->next)))
+				cpu_relax();
+		}
+		/* The above cmpxchg acts as a memory barrier */
+		WRITE_ONCE(nptr->state, lb_state_done);
+	}
+	if (next) {
+		if (loop)
+			goto do_list_again;	/* More qnodes to process */
+		/*
+		 * Mark the next qnode as head to process the next batch
+		 * of qnodes. The new queue head cannot proceed until we
+		 * release the lock.
+		 */
+		WRITE_ONCE(next->state, lb_state_batch);
+	}
+	spin_unlock(lock);
+}
+EXPORT_SYMBOL_GPL(do_list_batch_slowpath);