
[V3] blk-mq: avoid to starve tag allocation after allocation process migrates

Message ID: 20180524055615.GF12533@vader

Commit Message

Omar Sandoval May 24, 2018, 5:56 a.m. UTC
On Thu, May 24, 2018 at 11:04:40AM +0800, Ming Lei wrote:
> When the allocation process is scheduled back and the mapped hw queue is
> changed, fake one extra wake up on previous queue for compensating wake up
> miss, so other allocations on the previous queue won't be starved.
> 
> This patch fixes one request allocation hang issue, which can be
> triggered easily in case of very low nr_request.

Thanks, Ming, this looks better. One comment below.
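
For anyone following the scenario in the commit message, a rough sketch of
the sequence (hctx0/hctx1 are just illustrative labels, and the snippets
are trimmed down from blk_mq_get_tag()):

	/*
	 * Allocator task, currently mapped to hctx0, runs out of tags and
	 * queues itself on hctx0's sbitmap wait queue.
	 */
	ws = bt_wait_ptr(bt, data->hctx);		/* hctx0 */
	prepare_to_wait_exclusive(&ws->wait, &wait, TASK_UNINTERRUPTIBLE);
	io_schedule();

	/*
	 * A freed tag on hctx0 wakes the task, but it migrated to another
	 * CPU while asleep and is now mapped to hctx1.
	 */
	data->hctx = blk_mq_map_queue(data->q, data->ctx->cpu);	/* hctx1 */
	bt = &blk_mq_tags_from_data(data)->bitmap_tags;
	finish_wait(&ws->wait, &wait);

The wake-up this task consumed from hctx0's wake batch is never passed on
to the waiters still queued on hctx0, so with a very small nr_requests they
can hang; the new sbitmap_queue_wake_up(bt_prev) call after finish_wait()
compensates for exactly that.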

> Cc: <stable@vger.kernel.org>
> Cc: Omar Sandoval <osandov@fb.com>
> Signed-off-by: Ming Lei <ming.lei@redhat.com>
> ---
> V3:
> 	- fix comments as suggested by Jens
> 	- remove the wrapper as suggested by Omar
> V2:
> 	- fix build failure
> 
> 
>  block/blk-mq-tag.c      | 12 ++++++++++++
>  include/linux/sbitmap.h |  7 +++++++
>  lib/sbitmap.c           | 22 ++++++++++++----------
>  3 files changed, 31 insertions(+), 10 deletions(-)
> 
> diff --git a/block/blk-mq-tag.c b/block/blk-mq-tag.c
> index 336dde07b230..a4e58fc28a06 100644
> --- a/block/blk-mq-tag.c
> +++ b/block/blk-mq-tag.c
> @@ -134,6 +134,8 @@ unsigned int blk_mq_get_tag(struct blk_mq_alloc_data *data)
>  	ws = bt_wait_ptr(bt, data->hctx);
>  	drop_ctx = data->ctx == NULL;
>  	do {
> +		struct sbitmap_queue *bt_prev;
> +
>  		/*
>  		 * We're out of tags on this hardware queue, kick any
>  		 * pending IO submits before going to sleep waiting for
> @@ -159,6 +161,7 @@ unsigned int blk_mq_get_tag(struct blk_mq_alloc_data *data)
>  		if (data->ctx)
>  			blk_mq_put_ctx(data->ctx);
>  
> +		bt_prev = bt;
>  		io_schedule();
>  
>  		data->ctx = blk_mq_get_ctx(data->q);
> @@ -170,6 +173,15 @@ unsigned int blk_mq_get_tag(struct blk_mq_alloc_data *data)
>  			bt = &tags->bitmap_tags;
>  
>  		finish_wait(&ws->wait, &wait);
> +
> +		/*
> +		 * If destination hw queue is changed, fake wake up on
> +		 * previous queue for compensating the wake up miss, so
> +		 * other allocations on previous queue won't be starved.
> +		 */
> +		if (bt != bt_prev)
> +			sbitmap_queue_wake_up(bt_prev);
> +
>  		ws = bt_wait_ptr(bt, data->hctx);
>  	} while (1);
>  
> diff --git a/include/linux/sbitmap.h b/include/linux/sbitmap.h
> index 841585f6e5f2..bba9d80191b7 100644
> --- a/include/linux/sbitmap.h
> +++ b/include/linux/sbitmap.h
> @@ -484,6 +484,13 @@ static inline struct sbq_wait_state *sbq_wait_ptr(struct sbitmap_queue *sbq,
>  void sbitmap_queue_wake_all(struct sbitmap_queue *sbq);
>  
>  /**
> + * sbitmap_queue_wake_up() - Wake up some of waiters in one waitqueue
> + * on a &struct sbitmap_queue.
> + * @sbq: Bitmap queue to wake up.
> + */
> +void sbitmap_queue_wake_up(struct sbitmap_queue *sbq);
> +
> +/**
>   * sbitmap_queue_show() - Dump &struct sbitmap_queue information to a &struct
>   * seq_file.
>   * @sbq: Bitmap queue to show.
> diff --git a/lib/sbitmap.c b/lib/sbitmap.c
> index e6a9c06ec70c..14e027a33ffa 100644
> --- a/lib/sbitmap.c
> +++ b/lib/sbitmap.c
> @@ -335,8 +335,9 @@ void sbitmap_queue_resize(struct sbitmap_queue *sbq, unsigned int depth)
>  	if (sbq->wake_batch != wake_batch) {
>  		WRITE_ONCE(sbq->wake_batch, wake_batch);
>  		/*
> -		 * Pairs with the memory barrier in sbq_wake_up() to ensure that
> -		 * the batch size is updated before the wait counts.
> +		 * Pairs with the memory barrier in sbitmap_queue_wake_up()
> +		 * to ensure that the batch size is updated before the wait
> +		 * counts.
>  		 */
>  		smp_mb__before_atomic();
>  		for (i = 0; i < SBQ_WAIT_QUEUES; i++)
> @@ -425,7 +426,7 @@ static struct sbq_wait_state *sbq_wake_ptr(struct sbitmap_queue *sbq)
>  	return NULL;
>  }
>  
> -static void sbq_wake_up(struct sbitmap_queue *sbq)
> +void sbitmap_queue_wake_up(struct sbitmap_queue *sbq)
>  {
>  	struct sbq_wait_state *ws;
>  	unsigned int wake_batch;

Just after this is:

	/*
	 * Pairs with the memory barrier in set_current_state() to ensure the
	 * proper ordering of clear_bit()/waitqueue_active() in the waker and
	 * test_and_set_bit_lock()/prepare_to_wait()/finish_wait() in the
	 * waiter. See the comment on waitqueue_active(). This is __after_atomic
	 * because we just did clear_bit_unlock() in the caller.
	 */
	smp_mb__after_atomic();

But there's no atomic operation before this point in blk_mq_get_tag(). I
don't think this memory barrier is necessary for the blk_mq_get_tag()
case, so let's move it into sbitmap_queue_clear() (see the diff under
"Patch" at the end of this message).

That comment also mentions clear_bit(), but we do clear_bit_unlock() now,
so we can update that at the same time.
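
To make the pairing concrete, roughly what the two sides look like once the
barrier lives in sbitmap_queue_clear() (just a sketch, trimmed from the
real functions):

	/* waker side: sbitmap_queue_clear() */
	sbitmap_clear_bit_unlock(&sbq->sb, nr);
	smp_mb__after_atomic();		/* order the clear before the check */
	sbitmap_queue_wake_up(sbq);	/* sbq_wake_ptr() uses waitqueue_active() */

	/* waiter side: blk_mq_get_tag() */
	prepare_to_wait_exclusive(&ws->wait, &wait, TASK_UNINTERRUPTIBLE);
	tag = __blk_mq_get_tag(data, bt);	/* re-check after queueing */
	if (tag == -1)
		io_schedule();

Without the barrier on the waker's side, the waitqueue_active() check could
be ordered before the cleared bit is visible, and a waiter that queued
itself right after re-checking the bitmap could miss the wake-up;
set_current_state() inside prepare_to_wait_exclusive() provides the
matching barrier on the waiter's side.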

> @@ -454,23 +455,24 @@ static void sbq_wake_up(struct sbitmap_queue *sbq)
>  		 */
>  		smp_mb__before_atomic();
>  		/*
> -		 * If there are concurrent callers to sbq_wake_up(), the last
> -		 * one to decrement the wait count below zero will bump it back
> -		 * up. If there is a concurrent resize, the count reset will
> -		 * either cause the cmpxchg to fail or overwrite after the
> -		 * cmpxchg.
> +		 * If there are concurrent callers to sbitmap_queue_wake_up(),
> +		 * the last one to decrement the wait count below zero will
> +		 * bump it back up. If there is a concurrent resize, the count
> +		 * reset will either cause the cmpxchg to fail or overwrite
> +		 * after the cmpxchg.
>  		 */
>  		atomic_cmpxchg(&ws->wait_cnt, wait_cnt, wait_cnt + wake_batch);
>  		sbq_index_atomic_inc(&sbq->wake_index);
>  		wake_up_nr(&ws->wait, wake_batch);
>  	}
>  }
> +EXPORT_SYMBOL_GPL(sbitmap_queue_wake_up);
>  
>  void sbitmap_queue_clear(struct sbitmap_queue *sbq, unsigned int nr,
>  			 unsigned int cpu)
>  {
>  	sbitmap_clear_bit_unlock(&sbq->sb, nr);
> -	sbq_wake_up(sbq);
> +	sbitmap_queue_wake_up(sbq);
>  	if (likely(!sbq->round_robin && nr < sbq->sb.depth))
>  		*per_cpu_ptr(sbq->alloc_hint, cpu) = nr;
>  }
> @@ -482,7 +484,7 @@ void sbitmap_queue_wake_all(struct sbitmap_queue *sbq)
>  
>  	/*
>  	 * Pairs with the memory barrier in set_current_state() like in
> -	 * sbq_wake_up().
> +	 * sbitmap_queue_wake_up().
>  	 */
>  	smp_mb();
>  	wake_index = atomic_read(&sbq->wake_index);
> -- 
> 2.9.5
>

Patch

diff --git a/lib/sbitmap.c b/lib/sbitmap.c
index 14e027a33ffa..537328a98c06 100644
--- a/lib/sbitmap.c
+++ b/lib/sbitmap.c
@@ -432,15 +432,6 @@ void sbitmap_queue_wake_up(struct sbitmap_queue *sbq)
 	unsigned int wake_batch;
 	int wait_cnt;
 
-	/*
-	 * Pairs with the memory barrier in set_current_state() to ensure the
-	 * proper ordering of clear_bit()/waitqueue_active() in the waker and
-	 * test_and_set_bit_lock()/prepare_to_wait()/finish_wait() in the
-	 * waiter. See the comment on waitqueue_active(). This is __after_atomic
-	 * because we just did clear_bit_unlock() in the caller.
-	 */
-	smp_mb__after_atomic();
-
 	ws = sbq_wake_ptr(sbq);
 	if (!ws)
 		return;
@@ -472,6 +463,13 @@ void sbitmap_queue_clear(struct sbitmap_queue *sbq, unsigned int nr,
 			 unsigned int cpu)
 {
 	sbitmap_clear_bit_unlock(&sbq->sb, nr);
+	/*
+	 * Pairs with the memory barrier in set_current_state() to ensure the
+	 * proper ordering of clear_bit_unlock()/waitqueue_active() in the waker
+	 * and test_and_set_bit_lock()/prepare_to_wait()/finish_wait() in the
+	 * waiter. See the comment on waitqueue_active().
+	 */
+	smp_mb__after_atomic();
 	sbitmap_queue_wake_up(sbq);
 	if (likely(!sbq->round_robin && nr < sbq->sb.depth))
 		*per_cpu_ptr(sbq->alloc_hint, cpu) = nr;