diff mbox series

[v7,1/3] llist: Add a lock-less list variant terminated by a sentinel node

Message ID 20221003154459.207538-2-longman@redhat.com (mailing list archive)
State New, archived
Headers show
Series blk-cgroup: Optimize blkcg_rstat_flush() | expand

Commit Message

Waiman Long Oct. 3, 2022, 3:44 p.m. UTC
The lock-less list API is useful for dealing with list in a lock-less
manner. However, one of the drawback of the current API is that there
is not an easy way to determine if an entry has already been put into a
lock-less list. This has to be tracked externally and the tracking will
not be atomic unless some external synchronization logic is in place.

This patch introduces a new variant of the lock-less list terminated
by a sentinel node instead of by NULL. The function names start with
"sllist" instead of "llist". The advantage of this scheme is that we
can atomically determine if an entry has been put into a lock-less
list by looking at the next pointer of the llist_node. Of course, the
callers must clear the next pointer when an entry is removed from the
lockless list. This is done automatically when the sllist_for_each_safe
or sllist_for_each_entry_safe iteraters are used. The non-safe versions
of sllist iterator are not provided.

Signed-off-by: Waiman Long <longman@redhat.com>
---
 include/linux/llist.h | 132 +++++++++++++++++++++++++++++++++++++++++-
 lib/llist.c           |  79 ++++++++++++++++++++++++-
 2 files changed, 209 insertions(+), 2 deletions(-)

Comments

Tejun Heo Oct. 3, 2022, 4:40 p.m. UTC | #1
Hello, Waiman.

On Mon, Oct 03, 2022 at 11:44:57AM -0400, Waiman Long wrote:
> The lock-less list API is useful for dealing with list in a lock-less
> manner. However, one of the drawback of the current API is that there
> is not an easy way to determine if an entry has already been put into a
> lock-less list. This has to be tracked externally and the tracking will
> not be atomic unless some external synchronization logic is in place.
> 
> This patch introduces a new variant of the lock-less list terminated
> by a sentinel node instead of by NULL. The function names start with
> "sllist" instead of "llist". The advantage of this scheme is that we
> can atomically determine if an entry has been put into a lock-less
> list by looking at the next pointer of the llist_node. Of course, the
> callers must clear the next pointer when an entry is removed from the
> lockless list. This is done automatically when the sllist_for_each_safe
> or sllist_for_each_entry_safe iteraters are used. The non-safe versions
> of sllist iterator are not provided.

Any chance we can add sentinel to the existing llist instead of creating a
new variant? There's no real downside to always using sentinel, right?

Thanks.
Waiman Long Oct. 3, 2022, 4:55 p.m. UTC | #2
On 10/3/22 12:40, Tejun Heo wrote:
> Hello, Waiman.
>
> On Mon, Oct 03, 2022 at 11:44:57AM -0400, Waiman Long wrote:
>> The lock-less list API is useful for dealing with list in a lock-less
>> manner. However, one of the drawback of the current API is that there
>> is not an easy way to determine if an entry has already been put into a
>> lock-less list. This has to be tracked externally and the tracking will
>> not be atomic unless some external synchronization logic is in place.
>>
>> This patch introduces a new variant of the lock-less list terminated
>> by a sentinel node instead of by NULL. The function names start with
>> "sllist" instead of "llist". The advantage of this scheme is that we
>> can atomically determine if an entry has been put into a lock-less
>> list by looking at the next pointer of the llist_node. Of course, the
>> callers must clear the next pointer when an entry is removed from the
>> lockless list. This is done automatically when the sllist_for_each_safe
>> or sllist_for_each_entry_safe iteraters are used. The non-safe versions
>> of sllist iterator are not provided.
> Any chance we can add sentinel to the existing llist instead of creating a
> new variant? There's no real downside to always using sentinel, right?

That was my original plan. However, after looking at some existing users 
of lockless list, they have coded in the dependency on the fact that a 
lockless list is empty if it is NULL. I guess I can make this true also 
for the new lockless list with sentinel at the expense of a bit more 
overhead in the entry insertion path and deletion path. I will take a 
further look at that.

Cheers,
Longman
Tejun Heo Oct. 3, 2022, 4:57 p.m. UTC | #3
On Mon, Oct 03, 2022 at 12:55:24PM -0400, Waiman Long wrote:
> That was my original plan. However, after looking at some existing users of
> lockless list, they have coded in the dependency on the fact that a lockless
> list is empty if it is NULL. I guess I can make this true also for the new
> lockless list with sentinel at the expense of a bit more overhead in the
> entry insertion path and deletion path. I will take a further look at that.

There aren't that many users of llist. Maybe it'd be easier / cleaner to
introduce a macro to test whether a llist is empty and replace the NULL
tests?

Thanks.
Waiman Long Oct. 3, 2022, 5:32 p.m. UTC | #4
On 10/3/22 12:57, Tejun Heo wrote:
> On Mon, Oct 03, 2022 at 12:55:24PM -0400, Waiman Long wrote:
>> That was my original plan. However, after looking at some existing users of
>> lockless list, they have coded in the dependency on the fact that a lockless
>> list is empty if it is NULL. I guess I can make this true also for the new
>> lockless list with sentinel at the expense of a bit more overhead in the
>> entry insertion path and deletion path. I will take a further look at that.
> There aren't that many users of llist. Maybe it'd be easier / cleaner to
> introduce a macro to test whether a llist is empty and replace the NULL
> tests?

What my current thinking is to make llist works with both NULL and 
sentinel terminated lockless list. Users who wish to use the sentinel 
terminated version will have to use special sentinel version of 
LLIST_HEAD() macro and llist_del_all() and __llist_del_all() functions. 
In this way, I don't need to touch an existing users of llist while 
minimizing code redundancy. What do you think?

Cheers,
Longman
Tejun Heo Oct. 3, 2022, 5:36 p.m. UTC | #5
Hello,

On Mon, Oct 03, 2022 at 01:32:49PM -0400, Waiman Long wrote:
> What my current thinking is to make llist works with both NULL and sentinel
> terminated lockless list. Users who wish to use the sentinel terminated
> version will have to use special sentinel version of LLIST_HEAD() macro and
> llist_del_all() and __llist_del_all() functions. In this way, I don't need
> to touch an existing users of llist while minimizing code redundancy. What
> do you think?

Wouldn't that be more error-prone in the long term? I'd just bite the bullet
and convert the empty tests. It is a hassle to find them but given that it's
just the head node testing, it hopefully wouldn't be too bad.

Thanks.
Waiman Long Oct. 3, 2022, 5:40 p.m. UTC | #6
On 10/3/22 13:36, Tejun Heo wrote:
> Hello,
>
> On Mon, Oct 03, 2022 at 01:32:49PM -0400, Waiman Long wrote:
>> What my current thinking is to make llist works with both NULL and sentinel
>> terminated lockless list. Users who wish to use the sentinel terminated
>> version will have to use special sentinel version of LLIST_HEAD() macro and
>> llist_del_all() and __llist_del_all() functions. In this way, I don't need
>> to touch an existing users of llist while minimizing code redundancy. What
>> do you think?
> Wouldn't that be more error-prone in the long term? I'd just bite the bullet
> and convert the empty tests. It is a hassle to find them but given that it's
> just the head node testing, it hopefully wouldn't be too bad.

OK, I will take a further look at what changes will be needed by the 
existing llist users.

Thanks,
Longman
Waiman Long Oct. 3, 2022, 7:39 p.m. UTC | #7
On 10/3/22 13:40, Waiman Long wrote:
>
> On 10/3/22 13:36, Tejun Heo wrote:
>> Hello,
>>
>> On Mon, Oct 03, 2022 at 01:32:49PM -0400, Waiman Long wrote:
>>> What my current thinking is to make llist works with both NULL and 
>>> sentinel
>>> terminated lockless list. Users who wish to use the sentinel terminated
>>> version will have to use special sentinel version of LLIST_HEAD() 
>>> macro and
>>> llist_del_all() and __llist_del_all() functions. In this way, I 
>>> don't need
>>> to touch an existing users of llist while minimizing code 
>>> redundancy. What
>>> do you think?
>> Wouldn't that be more error-prone in the long term? I'd just bite the 
>> bullet
>> and convert the empty tests. It is a hassle to find them but given 
>> that it's
>> just the head node testing, it hopefully wouldn't be too bad.
>
> OK, I will take a further look at what changes will be needed by the 
> existing llist users.

After a further look, I think the task of making sentinel llist the 
default will be more time consuming that I initially thought. For example,

1) arch/powerpc/include/asm/kvm_book3s_64.h:
    It has its own llist iterator for_each_nest_rmap_safe.

2) kprobe use llist but not the full set of APIs and the
    various arch code put NULL in their llist_node to communicate
    with kprobe.

3) drivers/vhost/scsi.c uses llist but it doesn't use LLIST_HEAD
    nor init_llist_head to initialize the llist_head. I suspect that
    it may relies on NULL being the initial value.

There are 123 instances where llist_head is referenced in arch, driver, 
filesystem and kernel code. Going through all these to make sure that it 
will all work will be a major effort. I think it will be safer to allow 
both NULL and the sentinel node as the initializers and gradually 
convert them to use the proper llist APIs over time to complete the 
conversion. I am sorry that I can't spend that much time upfront for 
this conversion effort.

Regards,
Longman
Tejun Heo Oct. 3, 2022, 8:15 p.m. UTC | #8
Hello,

On Mon, Oct 03, 2022 at 03:39:02PM -0400, Waiman Long wrote:
> There are 123 instances where llist_head is referenced in arch, driver,
> filesystem and kernel code. Going through all these to make sure that it
> will all work will be a major effort. I think it will be safer to allow both
> NULL and the sentinel node as the initializers and gradually convert them to
> use the proper llist APIs over time to complete the conversion. I am sorry
> that I can't spend that much time upfront for this conversion effort.

I see. Oh well, thanks for taking a look.
Huang, Ying Oct. 8, 2022, 2:15 a.m. UTC | #9
Waiman Long <longman@redhat.com> writes:

> The lock-less list API is useful for dealing with list in a lock-less
> manner. However, one of the drawback of the current API is that there
> is not an easy way to determine if an entry has already been put into a
> lock-less list. This has to be tracked externally and the tracking will
> not be atomic unless some external synchronization logic is in place.
>
> This patch introduces a new variant of the lock-less list terminated
> by a sentinel node instead of by NULL. The function names start with
> "sllist" instead of "llist". The advantage of this scheme is that we
> can atomically determine if an entry has been put into a lock-less
> list by looking at the next pointer of the llist_node.

I guess that in the previous solution we use
test_and_set_bit()/clear_bit() on another member of the type containing
llist_node to track whether the node is in the llist?  After your patch,
we can use cmpxchg()/WRITE_ONCE(, NULL) on llist_node->next for that?

> Of course, the
> callers must clear the next pointer when an entry is removed from the
> lockless list. This is done automatically when the sllist_for_each_safe
> or sllist_for_each_entry_safe

Per my understanding, other xxx_for_each_safe() variants will not change
the list by itself.  So how about rename the functions?

> iteraters are used. The non-safe versions
> of sllist iterator are not provided.
>
> Signed-off-by: Waiman Long <longman@redhat.com>
> ---
>  include/linux/llist.h | 132 +++++++++++++++++++++++++++++++++++++++++-
>  lib/llist.c           |  79 ++++++++++++++++++++++++-
>  2 files changed, 209 insertions(+), 2 deletions(-)
>
> diff --git a/include/linux/llist.h b/include/linux/llist.h
> index 85bda2d02d65..d9a921656adb 100644
> --- a/include/linux/llist.h
> +++ b/include/linux/llist.h
> @@ -2,7 +2,8 @@
>  #ifndef LLIST_H
>  #define LLIST_H
>  /*
> - * Lock-less NULL terminated single linked list
> + * Lock-less NULL terminated singly linked list
> + * --------------------------------------------
>   *
>   * Cases where locking is not needed:
>   * If there are multiple producers and multiple consumers, llist_add can be
> @@ -46,6 +47,16 @@
>   *
>   * Copyright 2010,2011 Intel Corp.
>   *   Author: Huang Ying <ying.huang@intel.com>
> + *
> + * Lock-less sentinel node terminated singly linked list
> + * -----------------------------------------------------
> + *
> + * This is a variant of the generic lock-less list where the end of the list
> + * is terminated by a sentinel node instead of NULL. The advantage of this
> + * scheme is that we can use the next pointer of the llist_node to determine
> + * if it has been put into a lock-less list. However, the next pointer of
> + * the llist_node should be cleared ASAP after it has been removed from a
> + * lock-less list.
>   */
>  
>  #include <linux/atomic.h>
> @@ -64,6 +75,13 @@ struct llist_node {
>  #define LLIST_HEAD_INIT(name)	{ NULL }
>  #define LLIST_HEAD(name)	struct llist_head name = LLIST_HEAD_INIT(name)
>  
> +/*
> + * Sentinel lock-less list
> + */
> +extern struct llist_node	__llist_end;
> +#define SLLIST_HEAD_INIT(name)	{ &__llist_end }
> +#define SLLIST_HEAD(name)	struct llist_head name = SLLIST_HEAD_INIT(name)
> +
>  /**
>   * init_llist_head - initialize lock-less list head
>   * @head:	the head for your lock-less list
> @@ -73,6 +91,15 @@ static inline void init_llist_head(struct llist_head *list)
>  	list->first = NULL;
>  }
>  
> +/**
> + * init_sllist_head - initialize sentinel lock-less list head
> + * @head:	the head for your sentinel lock-less list
> + */
> +static inline void init_sllist_head(struct llist_head *list)
> +{
> +	list->first = &__llist_end;
> +}
> +
>  /**
>   * llist_entry - get the struct of this entry
>   * @ptr:	the &struct llist_node pointer.
> @@ -99,6 +126,15 @@ static inline void init_llist_head(struct llist_head *list)
>  #define member_address_is_nonnull(ptr, member)	\
>  	((uintptr_t)(ptr) + offsetof(typeof(*(ptr)), member) != 0)
>  
> +/**
> + * member_address_is_notsentinel - check whether member address is not sentinel
> + * @ptr:	the object pointer (struct type * that contains the llist_node)
> + * @member:	the name of the llist_node within the struct.
> + */
> +#define member_address_is_notsentinel(ptr, member)	\
> +	((uintptr_t)(ptr) + offsetof(typeof(*(ptr)), member)	\
> +		!= (uintptr_t)&__llist_end)
> +
>  /**
>   * llist_for_each - iterate over some deleted entries of a lock-less list
>   * @pos:	the &struct llist_node to use as a loop cursor
> @@ -135,6 +171,18 @@ static inline void init_llist_head(struct llist_head *list)
>  #define llist_for_each_safe(pos, n, node)			\
>  	for ((pos) = (node); (pos) && ((n) = (pos)->next, true); (pos) = (n))
>  
> +/**
> + * sllist_for_each_safe - iterate over entries of a sentinel lock-less list
> + *			  safe against removal of list entry
> + * @pos:	the &struct llist_node to use as a loop cursor
> + * @n:		another &struct llist_node to use as temporary storage
> + * @node:	the first entry of deleted list entries
> + */
> +#define sllist_for_each_safe(pos, n, node)			\
> +	for ((pos) = (node); ((pos) !=  &__llist_end) &&	\
> +	     ((n) = (pos)->next,				\
> +	     ({ WRITE_ONCE((pos)->next, NULL); }), true); (pos) = (n))
> +
>  /**
>   * llist_for_each_entry - iterate over some deleted entries of lock-less list of given type
>   * @pos:	the type * to use as a loop cursor.
> @@ -178,6 +226,21 @@ static inline void init_llist_head(struct llist_head *list)
>  	        (n = llist_entry(pos->member.next, typeof(*n), member), true); \
>  	     pos = n)
>  
> +/**
> + * sllist_for_each_entry_safe - iterate over sentinel entries of lock-less list
> + *				of given type safe against removal of list entry
> + * @pos:	the type * to use as a loop cursor.
> + * @n:		another type * to use as temporary storage
> + * @node:	the first entry of deleted list entries.
> + * @member:	the name of the llist_node with the struct.
> + */
> +#define sllist_for_each_entry_safe(pos, n, node, member)		       \
> +	for (pos = llist_entry((node), typeof(*(pos)), member);		       \
> +	     member_address_is_notsentinel(pos, member) &&		       \
> +		(n = llist_entry((pos)->member.next, typeof(*(n)), member),    \
> +		({ WRITE_ONCE((pos)->member.next, NULL); }), true);	       \
> +	     pos = n)
> +
>  /**
>   * llist_empty - tests whether a lock-less list is empty
>   * @head:	the list to test
> @@ -191,15 +254,35 @@ static inline bool llist_empty(const struct llist_head *head)
>  	return READ_ONCE(head->first) == NULL;
>  }
>  
> +/**
> + * sllist_empty - tests whether a lock-less list is empty
> + * @head:	the list to test
> + */
> +static inline bool sllist_empty(const struct llist_head *head)
> +{
> +	return READ_ONCE(head->first) == &__llist_end;
> +}
> +
>  static inline struct llist_node *llist_next(struct llist_node *node)
>  {
>  	return node->next;
>  }
>  
> +static inline struct llist_node *sllist_next(struct llist_node *node)
> +{
> +	struct llist_node *next = node->next;
> +
> +	return (next == &__llist_end) ? NULL : next;
> +}
> +
>  extern bool llist_add_batch(struct llist_node *new_first,
>  			    struct llist_node *new_last,
>  			    struct llist_head *head);
>  
> +extern bool sllist_add_batch(struct llist_node *new_first,
> +			     struct llist_node *new_last,
> +			     struct llist_head *head);
> +
>  static inline bool __llist_add_batch(struct llist_node *new_first,
>  				     struct llist_node *new_last,
>  				     struct llist_head *head)
> @@ -209,6 +292,15 @@ static inline bool __llist_add_batch(struct llist_node *new_first,
>  	return new_last->next == NULL;
>  }
>  
> +static inline bool __sllist_add_batch(struct llist_node *new_first,
> +				      struct llist_node *new_last,
> +				      struct llist_head *head)
> +{
> +	new_last->next = head->first;
> +	head->first = new_first;
> +	return new_last->next == &__llist_end;
> +}
> +
>  /**
>   * llist_add - add a new entry
>   * @new:	new entry to be added
> @@ -221,11 +313,28 @@ static inline bool llist_add(struct llist_node *new, struct llist_head *head)
>  	return llist_add_batch(new, new, head);
>  }
>  
> +/**
> + * sllist_add - add a new entry
> + * @new:	new entry to be added
> + * @head:	the head for your lock-less list
> + *
> + * Returns true if the list was empty prior to adding this entry.
> + */
> +static inline bool sllist_add(struct llist_node *new, struct llist_head *head)
> +{
> +	return sllist_add_batch(new, new, head);
> +}
> +
>  static inline bool __llist_add(struct llist_node *new, struct llist_head *head)
>  {
>  	return __llist_add_batch(new, new, head);
>  }
>  
> +static inline bool __sllist_add(struct llist_node *new, struct llist_head *head)
> +{
> +	return __sllist_add_batch(new, new, head);
> +}
> +
>  /**
>   * llist_del_all - delete all entries from lock-less list
>   * @head:	the head of lock-less list to delete all entries
> @@ -239,6 +348,17 @@ static inline struct llist_node *llist_del_all(struct llist_head *head)
>  	return xchg(&head->first, NULL);
>  }
>  
> +/**
> + * sllist_del_all - delete all entries from sentinel lock-less list
> + * @head:	the head of lock-less list to delete all entries
> + */
> +static inline struct llist_node *sllist_del_all(struct llist_head *head)
> +{
> +	struct llist_node *first = xchg(&head->first, &__llist_end);
> +
> +	return (first == &__llist_end) ? NULL : first;
> +}
> +
>  static inline struct llist_node *__llist_del_all(struct llist_head *head)
>  {
>  	struct llist_node *first = head->first;
> @@ -247,8 +367,18 @@ static inline struct llist_node *__llist_del_all(struct llist_head *head)
>  	return first;
>  }
>  
> +static inline struct llist_node *__sllist_del_all(struct llist_head *head)
> +{
> +	struct llist_node *first = head->first;
> +
> +	head->first = &__llist_end;
> +	return (first == &__llist_end) ? NULL : first;
> +}
> +
>  extern struct llist_node *llist_del_first(struct llist_head *head);
> +extern struct llist_node *sllist_del_first(struct llist_head *head);
>  
>  struct llist_node *llist_reverse_order(struct llist_node *head);
> +struct llist_node *sllist_reverse_order(struct llist_node *head);
>  
>  #endif /* LLIST_H */
> diff --git a/lib/llist.c b/lib/llist.c
> index 611ce4881a87..418327394735 100644
> --- a/lib/llist.c
> +++ b/lib/llist.c
> @@ -1,6 +1,6 @@
>  // SPDX-License-Identifier: GPL-2.0-only
>  /*
> - * Lock-less NULL terminated single linked list
> + * Lock-less NULL and sentinel node terminated singly linked lists
>   *
>   * The basic atomic operation of this list is cmpxchg on long.  On
>   * architectures that don't have NMI-safe cmpxchg implementation, the
> @@ -12,8 +12,11 @@
>   */
>  #include <linux/kernel.h>
>  #include <linux/export.h>
> +#include <linux/cache.h>
>  #include <linux/llist.h>
>  
> +struct llist_node __llist_end __ro_after_init;
> +EXPORT_SYMBOL_GPL(__llist_end);
>  
>  /**
>   * llist_add_batch - add several linked entries in batch
> @@ -36,6 +39,27 @@ bool llist_add_batch(struct llist_node *new_first, struct llist_node *new_last,
>  }
>  EXPORT_SYMBOL_GPL(llist_add_batch);
>  
> +/**
> + * sllist_add_batch - add several linked entries in batch
> + * @new_first:	first entry in batch to be added
> + * @new_last:	last entry in batch to be added
> + * @head:	the head for your lock-less list
> + *
> + * Return whether list is empty before adding.
> + */
> +bool sllist_add_batch(struct llist_node *new_first, struct llist_node *new_last,
> +		      struct llist_head *head)
> +{
> +	struct llist_node *first;
> +
> +	do {
> +		new_last->next = first = READ_ONCE(head->first);

Here new_last->next may be changed from NULL to non-NULL?  Do we need
to do this atomically?  Even cmpxchg() to guarantee it is NULL before
put into list?

> +	} while (cmpxchg(&head->first, first, new_first) != first);
> +
> +	return first == &__llist_end;
> +}
> +EXPORT_SYMBOL_GPL(sllist_add_batch);
> +
>  /**
>   * llist_del_first - delete the first entry of lock-less list
>   * @head:	the head for your lock-less list
> @@ -69,6 +93,33 @@ struct llist_node *llist_del_first(struct llist_head *head)
>  }
>  EXPORT_SYMBOL_GPL(llist_del_first);
>  
> +/**
> + * sllist_del_first - delete the first entry of sentinel lock-less list
> + * @head:	the head for your lock-less list
> + *
> + * If list is empty, return NULL, otherwise, return the first entry
> + * deleted, this is the newest added one.
> + */
> +struct llist_node *sllist_del_first(struct llist_head *head)
> +{
> +	struct llist_node *entry, *old_entry, *next;
> +
> +	entry = smp_load_acquire(&head->first);
> +	for (;;) {
> +		if (entry == &__llist_end)
> +			return NULL;
> +		old_entry = entry;
> +		next = READ_ONCE(entry->next);
> +		entry = cmpxchg(&head->first, old_entry, next);
> +		if (entry == old_entry)
> +			break;
> +	}
> +	WRITE_ONCE(entry->next, NULL);
> +
> +	return entry;
> +}
> +EXPORT_SYMBOL_GPL(sllist_del_first);
> +
>  /**
>   * llist_reverse_order - reverse order of a llist chain
>   * @head:	first item of the list to be reversed
> @@ -90,3 +141,29 @@ struct llist_node *llist_reverse_order(struct llist_node *head)
>  	return new_head;
>  }
>  EXPORT_SYMBOL_GPL(llist_reverse_order);
> +
> +/**
> + * sllist_reverse_order - reverse order of a llist chain
> + * @head:	first item of the list to be reversed
> + *
> + * Reverse the order of a chain of llist entries and return the
> + * new first entry.
> + */
> +struct llist_node *sllist_reverse_order(struct llist_node *head)
> +{
> +	struct llist_node *new_head = &__llist_end;
> +
> +	if (!head || (head == &__llist_end))
> +		return NULL;
> +
> +	while (head != &__llist_end) {
> +		struct llist_node *tmp = head;
> +
> +		head = head->next;
> +		tmp->next = new_head;
> +		new_head = tmp;
> +	}
> +
> +	return new_head;
> +}
> +EXPORT_SYMBOL_GPL(sllist_reverse_order);

Best Regards,
Huang, Ying
diff mbox series

Patch

diff --git a/include/linux/llist.h b/include/linux/llist.h
index 85bda2d02d65..d9a921656adb 100644
--- a/include/linux/llist.h
+++ b/include/linux/llist.h
@@ -2,7 +2,8 @@ 
 #ifndef LLIST_H
 #define LLIST_H
 /*
- * Lock-less NULL terminated single linked list
+ * Lock-less NULL terminated singly linked list
+ * --------------------------------------------
  *
  * Cases where locking is not needed:
  * If there are multiple producers and multiple consumers, llist_add can be
@@ -46,6 +47,16 @@ 
  *
  * Copyright 2010,2011 Intel Corp.
  *   Author: Huang Ying <ying.huang@intel.com>
+ *
+ * Lock-less sentinel node terminated singly linked list
+ * -----------------------------------------------------
+ *
+ * This is a variant of the generic lock-less list where the end of the list
+ * is terminated by a sentinel node instead of NULL. The advantage of this
+ * scheme is that we can use the next pointer of the llist_node to determine
+ * if it has been put into a lock-less list. However, the next pointer of
+ * the llist_node should be cleared ASAP after it has been removed from a
+ * lock-less list.
  */
 
 #include <linux/atomic.h>
@@ -64,6 +75,13 @@  struct llist_node {
 #define LLIST_HEAD_INIT(name)	{ NULL }
 #define LLIST_HEAD(name)	struct llist_head name = LLIST_HEAD_INIT(name)
 
+/*
+ * Sentinel lock-less list
+ */
+extern struct llist_node	__llist_end;
+#define SLLIST_HEAD_INIT(name)	{ &__llist_end }
+#define SLLIST_HEAD(name)	struct llist_head name = SLLIST_HEAD_INIT(name)
+
 /**
  * init_llist_head - initialize lock-less list head
  * @head:	the head for your lock-less list
@@ -73,6 +91,15 @@  static inline void init_llist_head(struct llist_head *list)
 	list->first = NULL;
 }
 
+/**
+ * init_sllist_head - initialize sentinel lock-less list head
+ * @head:	the head for your sentinel lock-less list
+ */
+static inline void init_sllist_head(struct llist_head *list)
+{
+	list->first = &__llist_end;
+}
+
 /**
  * llist_entry - get the struct of this entry
  * @ptr:	the &struct llist_node pointer.
@@ -99,6 +126,15 @@  static inline void init_llist_head(struct llist_head *list)
 #define member_address_is_nonnull(ptr, member)	\
 	((uintptr_t)(ptr) + offsetof(typeof(*(ptr)), member) != 0)
 
+/**
+ * member_address_is_notsentinel - check whether member address is not sentinel
+ * @ptr:	the object pointer (struct type * that contains the llist_node)
+ * @member:	the name of the llist_node within the struct.
+ */
+#define member_address_is_notsentinel(ptr, member)	\
+	((uintptr_t)(ptr) + offsetof(typeof(*(ptr)), member)	\
+		!= (uintptr_t)&__llist_end)
+
 /**
  * llist_for_each - iterate over some deleted entries of a lock-less list
  * @pos:	the &struct llist_node to use as a loop cursor
@@ -135,6 +171,18 @@  static inline void init_llist_head(struct llist_head *list)
 #define llist_for_each_safe(pos, n, node)			\
 	for ((pos) = (node); (pos) && ((n) = (pos)->next, true); (pos) = (n))
 
+/**
+ * sllist_for_each_safe - iterate over entries of a sentinel lock-less list
+ *			  safe against removal of list entry
+ * @pos:	the &struct llist_node to use as a loop cursor
+ * @n:		another &struct llist_node to use as temporary storage
+ * @node:	the first entry of deleted list entries
+ */
+#define sllist_for_each_safe(pos, n, node)			\
+	for ((pos) = (node); ((pos) !=  &__llist_end) &&	\
+	     ((n) = (pos)->next,				\
+	     ({ WRITE_ONCE((pos)->next, NULL); }), true); (pos) = (n))
+
 /**
  * llist_for_each_entry - iterate over some deleted entries of lock-less list of given type
  * @pos:	the type * to use as a loop cursor.
@@ -178,6 +226,21 @@  static inline void init_llist_head(struct llist_head *list)
 	        (n = llist_entry(pos->member.next, typeof(*n), member), true); \
 	     pos = n)
 
+/**
+ * sllist_for_each_entry_safe - iterate over sentinel entries of lock-less list
+ *				of given type safe against removal of list entry
+ * @pos:	the type * to use as a loop cursor.
+ * @n:		another type * to use as temporary storage
+ * @node:	the first entry of deleted list entries.
+ * @member:	the name of the llist_node with the struct.
+ */
+#define sllist_for_each_entry_safe(pos, n, node, member)		       \
+	for (pos = llist_entry((node), typeof(*(pos)), member);		       \
+	     member_address_is_notsentinel(pos, member) &&		       \
+		(n = llist_entry((pos)->member.next, typeof(*(n)), member),    \
+		({ WRITE_ONCE((pos)->member.next, NULL); }), true);	       \
+	     pos = n)
+
 /**
  * llist_empty - tests whether a lock-less list is empty
  * @head:	the list to test
@@ -191,15 +254,35 @@  static inline bool llist_empty(const struct llist_head *head)
 	return READ_ONCE(head->first) == NULL;
 }
 
+/**
+ * sllist_empty - tests whether a lock-less list is empty
+ * @head:	the list to test
+ */
+static inline bool sllist_empty(const struct llist_head *head)
+{
+	return READ_ONCE(head->first) == &__llist_end;
+}
+
 static inline struct llist_node *llist_next(struct llist_node *node)
 {
 	return node->next;
 }
 
+static inline struct llist_node *sllist_next(struct llist_node *node)
+{
+	struct llist_node *next = node->next;
+
+	return (next == &__llist_end) ? NULL : next;
+}
+
 extern bool llist_add_batch(struct llist_node *new_first,
 			    struct llist_node *new_last,
 			    struct llist_head *head);
 
+extern bool sllist_add_batch(struct llist_node *new_first,
+			     struct llist_node *new_last,
+			     struct llist_head *head);
+
 static inline bool __llist_add_batch(struct llist_node *new_first,
 				     struct llist_node *new_last,
 				     struct llist_head *head)
@@ -209,6 +292,15 @@  static inline bool __llist_add_batch(struct llist_node *new_first,
 	return new_last->next == NULL;
 }
 
+static inline bool __sllist_add_batch(struct llist_node *new_first,
+				      struct llist_node *new_last,
+				      struct llist_head *head)
+{
+	new_last->next = head->first;
+	head->first = new_first;
+	return new_last->next == &__llist_end;
+}
+
 /**
  * llist_add - add a new entry
  * @new:	new entry to be added
@@ -221,11 +313,28 @@  static inline bool llist_add(struct llist_node *new, struct llist_head *head)
 	return llist_add_batch(new, new, head);
 }
 
+/**
+ * sllist_add - add a new entry
+ * @new:	new entry to be added
+ * @head:	the head for your lock-less list
+ *
+ * Returns true if the list was empty prior to adding this entry.
+ */
+static inline bool sllist_add(struct llist_node *new, struct llist_head *head)
+{
+	return sllist_add_batch(new, new, head);
+}
+
 static inline bool __llist_add(struct llist_node *new, struct llist_head *head)
 {
 	return __llist_add_batch(new, new, head);
 }
 
+static inline bool __sllist_add(struct llist_node *new, struct llist_head *head)
+{
+	return __sllist_add_batch(new, new, head);
+}
+
 /**
  * llist_del_all - delete all entries from lock-less list
  * @head:	the head of lock-less list to delete all entries
@@ -239,6 +348,17 @@  static inline struct llist_node *llist_del_all(struct llist_head *head)
 	return xchg(&head->first, NULL);
 }
 
+/**
+ * sllist_del_all - delete all entries from sentinel lock-less list
+ * @head:	the head of lock-less list to delete all entries
+ */
+static inline struct llist_node *sllist_del_all(struct llist_head *head)
+{
+	struct llist_node *first = xchg(&head->first, &__llist_end);
+
+	return (first == &__llist_end) ? NULL : first;
+}
+
 static inline struct llist_node *__llist_del_all(struct llist_head *head)
 {
 	struct llist_node *first = head->first;
@@ -247,8 +367,18 @@  static inline struct llist_node *__llist_del_all(struct llist_head *head)
 	return first;
 }
 
+static inline struct llist_node *__sllist_del_all(struct llist_head *head)
+{
+	struct llist_node *first = head->first;
+
+	head->first = &__llist_end;
+	return (first == &__llist_end) ? NULL : first;
+}
+
 extern struct llist_node *llist_del_first(struct llist_head *head);
+extern struct llist_node *sllist_del_first(struct llist_head *head);
 
 struct llist_node *llist_reverse_order(struct llist_node *head);
+struct llist_node *sllist_reverse_order(struct llist_node *head);
 
 #endif /* LLIST_H */
diff --git a/lib/llist.c b/lib/llist.c
index 611ce4881a87..418327394735 100644
--- a/lib/llist.c
+++ b/lib/llist.c
@@ -1,6 +1,6 @@ 
 // SPDX-License-Identifier: GPL-2.0-only
 /*
- * Lock-less NULL terminated single linked list
+ * Lock-less NULL and sentinel node terminated singly linked lists
  *
  * The basic atomic operation of this list is cmpxchg on long.  On
  * architectures that don't have NMI-safe cmpxchg implementation, the
@@ -12,8 +12,11 @@ 
  */
 #include <linux/kernel.h>
 #include <linux/export.h>
+#include <linux/cache.h>
 #include <linux/llist.h>
 
+struct llist_node __llist_end __ro_after_init;
+EXPORT_SYMBOL_GPL(__llist_end);
 
 /**
  * llist_add_batch - add several linked entries in batch
@@ -36,6 +39,27 @@  bool llist_add_batch(struct llist_node *new_first, struct llist_node *new_last,
 }
 EXPORT_SYMBOL_GPL(llist_add_batch);
 
+/**
+ * sllist_add_batch - add several linked entries in batch
+ * @new_first:	first entry in batch to be added
+ * @new_last:	last entry in batch to be added
+ * @head:	the head for your lock-less list
+ *
+ * Return whether list is empty before adding.
+ */
+bool sllist_add_batch(struct llist_node *new_first, struct llist_node *new_last,
+		      struct llist_head *head)
+{
+	struct llist_node *first;
+
+	do {
+		new_last->next = first = READ_ONCE(head->first);
+	} while (cmpxchg(&head->first, first, new_first) != first);
+
+	return first == &__llist_end;
+}
+EXPORT_SYMBOL_GPL(sllist_add_batch);
+
 /**
  * llist_del_first - delete the first entry of lock-less list
  * @head:	the head for your lock-less list
@@ -69,6 +93,33 @@  struct llist_node *llist_del_first(struct llist_head *head)
 }
 EXPORT_SYMBOL_GPL(llist_del_first);
 
+/**
+ * sllist_del_first - delete the first entry of sentinel lock-less list
+ * @head:	the head for your lock-less list
+ *
+ * If list is empty, return NULL, otherwise, return the first entry
+ * deleted, this is the newest added one.
+ */
+struct llist_node *sllist_del_first(struct llist_head *head)
+{
+	struct llist_node *entry, *old_entry, *next;
+
+	entry = smp_load_acquire(&head->first);
+	for (;;) {
+		if (entry == &__llist_end)
+			return NULL;
+		old_entry = entry;
+		next = READ_ONCE(entry->next);
+		entry = cmpxchg(&head->first, old_entry, next);
+		if (entry == old_entry)
+			break;
+	}
+	WRITE_ONCE(entry->next, NULL);
+
+	return entry;
+}
+EXPORT_SYMBOL_GPL(sllist_del_first);
+
 /**
  * llist_reverse_order - reverse order of a llist chain
  * @head:	first item of the list to be reversed
@@ -90,3 +141,29 @@  struct llist_node *llist_reverse_order(struct llist_node *head)
 	return new_head;
 }
 EXPORT_SYMBOL_GPL(llist_reverse_order);
+
+/**
+ * sllist_reverse_order - reverse order of a llist chain
+ * @head:	first item of the list to be reversed
+ *
+ * Reverse the order of a chain of llist entries and return the
+ * new first entry.
+ */
+struct llist_node *sllist_reverse_order(struct llist_node *head)
+{
+	struct llist_node *new_head = &__llist_end;
+
+	if (!head || (head == &__llist_end))
+		return NULL;
+
+	while (head != &__llist_end) {
+		struct llist_node *tmp = head;
+
+		head = head->next;
+		tmp->next = new_head;
+		new_head = tmp;
+	}
+
+	return new_head;
+}
+EXPORT_SYMBOL_GPL(sllist_reverse_order);