diff mbox

[v8,1/6] lib/dlock-list: Distributed and lock-protected lists

Message ID 20171103133420.pngmrsfmtimataz4@linux-n805 (mailing list archive)
State New, archived
Headers show

Commit Message

Davidlohr Bueso Nov. 3, 2017, 1:34 p.m. UTC
On Thu, 02 Nov 2017, Waiman Long wrote:

>> Instead of the current O(N) implementation; at the cost
>> of adding an atomic counter. We also need to add a heads
>> pointer to the node structure such that we can unaccount
>> a thread doing list_del().
>>
>
>The counter will then become the single contention point for all
>concurrent updates to the dlock-list. So it will have a big impact on
>performance. On the other hand, instead of being a counter of # of
>items, we can make that a counter of # of non-empty lists. So its value
>will only be changed when a list go from empty to non-empty and vice
>versa. That will greatly reduce the number of updates to that counter.
>
>
>> Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
>> ---
>> include/linux/dlock-list.h |  2 ++
>> lib/dlock-list.c           | 40 ++++++++++++++++++++++++++++------------
>> 2 files changed, 30 insertions(+), 12 deletions(-)
>>
>> diff --git a/include/linux/dlock-list.h b/include/linux/dlock-list.h
>> index c00c7f92ada4..dd73d5787885 100644
>> --- a/include/linux/dlock-list.h
>> +++ b/include/linux/dlock-list.h
>> @@ -36,6 +36,7 @@ struct dlock_list_head {
>>
>> struct dlock_list_heads {
>>     struct dlock_list_head *heads;
>> +    atomic_t waiters;
>> };
>>
>> /*
>> @@ -44,6 +45,7 @@ struct dlock_list_heads {
>> struct dlock_list_node {
>>     struct list_head list;
>>     struct dlock_list_head *head;
>> +    struct dlock_list_heads *heads;
>> };
>>
>
>I don't want to add a new data item into dlock_list_node as there can be
>thousands or even of them in the system. Instead, I prefer increasing the
>size of dlock_list_head which only have a limited number of them and
>they have unused space because they are cacheline aligned.

Both are good points. Thanks.

----8<--------------------------------------------------------
Subject: [PATCH] [PATCH v2] lib/dlock-list: Scale dlock_lists_empty()

Instead of the current O(N) implementation, at the cost
of adding an atomic counter, we can convert the call to
an atomic_read(). The counter only serves for accounting
empty to non-empty transitions, and vice versa; therefore
only modified twice for each of the lists, during the
lifetime of the dlock -- thus 2*nr_dlock_lists.

In addition, to be able to unaccount a list_del(), we
add a dlist pointer to each head, thus minimizing the
overall memory footprint.

Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
---
 include/linux/dlock-list.h |  2 ++
 lib/dlock-list.c           | 56 +++++++++++++++++++++++++++++++++++-----------
 2 files changed, 45 insertions(+), 13 deletions(-)
diff mbox

Patch

diff --git a/include/linux/dlock-list.h b/include/linux/dlock-list.h
index c00c7f92ada4..d176a2d00cd1 100644
--- a/include/linux/dlock-list.h
+++ b/include/linux/dlock-list.h
@@ -32,10 +32,12 @@ 
 struct dlock_list_head {
 	struct list_head list;
 	spinlock_t lock;
+	struct dlock_list_heads *dlist;
 } ____cacheline_aligned_in_smp;
 
 struct dlock_list_heads {
 	struct dlock_list_head *heads;
+	atomic_t waiters;
 };
 
 /*
diff --git a/lib/dlock-list.c b/lib/dlock-list.c
index a4ddecc01b12..a84f42e800d5 100644
--- a/lib/dlock-list.c
+++ b/lib/dlock-list.c
@@ -122,8 +122,11 @@  int __alloc_dlock_list_heads(struct dlock_list_heads *dlist,
 
 		INIT_LIST_HEAD(&head->list);
 		head->lock = __SPIN_LOCK_UNLOCKED(&head->lock);
+		head->dlist = dlist;
 		lockdep_set_class(&head->lock, key);
 	}
+
+	atomic_set(&dlist->waiters, 0);
 	return 0;
 }
 EXPORT_SYMBOL(__alloc_dlock_list_heads);
@@ -138,30 +141,36 @@  EXPORT_SYMBOL(__alloc_dlock_list_heads);
 void free_dlock_list_heads(struct dlock_list_heads *dlist)
 {
 	kfree(dlist->heads);
-	dlist->heads = NULL;
+	atomic_set(&dlist->waiters, 0);
 }
 EXPORT_SYMBOL(free_dlock_list_heads);
 
 /**
  * dlock_lists_empty - Check if all the dlock lists are empty
  * @dlist: Pointer to the dlock_list_heads structure
- * Return: true if list is empty, false otherwise.
  *
- * This can be a pretty expensive function call. If this function is required
- * in a performance critical path, we may have to maintain a global count
- * of the list entries in the global dlock_list_heads structure instead.
+ * Return: true if all dlock lists are empty, false otherwise.
  */
 bool dlock_lists_empty(struct dlock_list_heads *dlist)
 {
-	int idx;
-
 	/* Shouldn't be called before nr_dlock_lists is initialized */
 	WARN_ON_ONCE(!nr_dlock_lists);
 
-	for (idx = 0; idx < nr_dlock_lists; idx++)
-		if (!list_empty(&dlist->heads[idx].list))
-			return false;
-	return true;
+	/*
+	 * Serialize dlist->waiters such that a 0->1 transition is not missed
+	 * by another thread checking if any of the dlock lists are used.
+	 *
+	 * CPU0				    CPU1
+	 * dlock_list_add()                 dlock_lists_empty()
+	 *   [S] atomic_inc(waiters);
+	 *       smp_mb__after_atomic();
+	 *					  smp_mb__before_atomic();
+	 *				      [L] atomic_read(waiters)
+	 *       list_add()
+	 *
+	 */
+	smp_mb__before_atomic();
+	return !atomic_read(&dlist->waiters);
 }
 EXPORT_SYMBOL(dlock_lists_empty);
 
@@ -179,6 +188,16 @@  void dlock_lists_add(struct dlock_list_node *node,
 	struct dlock_list_head *head = &dlist->heads[this_cpu_read(cpu2idx)];
 
 	/*
+	 * Bump the waiters counter _before_ taking the head->lock
+	 * such that we don't miss a thread adding itself to a list
+	 * while spinning for the lock.
+	 */
+	if (list_empty_careful(&head->list)) {
+		atomic_inc(&dlist->waiters);
+		smp_mb__after_atomic();
+	}
+
+	/*
 	 * There is no need to disable preemption
 	 */
 	spin_lock(&head->lock);
@@ -199,8 +218,7 @@  EXPORT_SYMBOL(dlock_lists_add);
  * a bug.
  */
 void dlock_lists_del(struct dlock_list_node *node)
-{
-	struct dlock_list_head *head;
+{	struct dlock_list_head *head;
 	bool retry;
 
 	do {
@@ -212,6 +230,18 @@  void dlock_lists_del(struct dlock_list_node *node)
 		spin_lock(&head->lock);
 		if (likely(head == node->head)) {
 			list_del_init(&node->list);
+			/*
+			 * We still hold the head->lock, a normal list_empty()
+			 * check will do.
+			 */
+			if (list_empty(&head->list)) {
+				struct dlock_list_heads *dlist;
+				dlist = node->head->dlist;
+
+				atomic_dec(&dlist->waiters);
+				smp_mb__after_atomic();
+			}
+
 			node->head = NULL;
 			retry = false;
 		} else {