
[3/3] io_uring: add support for backlogged CQ ring

Message ID 20191107160043.31725-4-axboe@kernel.dk (mailing list archive)
State New, archived
Series io_uring CQ ring backpressure

Commit Message

Jens Axboe Nov. 7, 2019, 4 p.m. UTC
Currently we drop completion events, if the CQ ring is full. That's fine
for requests with bounded completion times, but it may make it harder to
use io_uring with networked IO where request completion times are
generally unbounded. Or with POLL, for example, which is also unbounded.

This patch adds IORING_SETUP_CQ_NODROP, which changes the behavior a bit
for CQ ring overflows. First of all, it doesn't overflow the ring, it
simply stores a backlog of completions that we weren't able to put into
the CQ ring. To prevent the backlog from growing indefinitely, if the
backlog is non-empty, we apply back pressure on IO submissions. Any
attempt to submit new IO with a non-empty backlog will get an -EBUSY
return from the kernel. This is a signal to the application that it has
backlogged CQ events, and that it must reap those before being allowed
to submit more IO.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c                 | 103 ++++++++++++++++++++++++++++------
 include/uapi/linux/io_uring.h |   1 +
 2 files changed, 87 insertions(+), 17 deletions(-)
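
To illustrate the application-side contract described in the commit message, here is a minimal userspace sketch of the -EBUSY handling. It assumes a kernel carrying this patch and uses liburing for the setup/submit wrappers; the fallback define, the retry helper, the NOP request and the ring size are illustrative only, not part of the patch.

/*
 * Submit helper for an IORING_SETUP_CQ_NODROP ring: on -EBUSY the kernel
 * has a CQ overflow backlog, so drain the visible CQEs and retry. Sketch
 * only, assuming liburing and a kernel with this patch applied.
 */
#include <liburing.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>

#ifndef IORING_SETUP_CQ_NODROP
#define IORING_SETUP_CQ_NODROP	(1U << 4)	/* value from this patch's uapi change */
#endif

static int submit_with_backpressure(struct io_uring *ring)
{
	struct io_uring_cqe *cqe;
	int ret;

	for (;;) {
		ret = io_uring_submit(ring);
		if (ret != -EBUSY)
			return ret;
		/* backlogged completions must be reaped before new IO is accepted */
		while (io_uring_peek_cqe(ring, &cqe) == 0) {
			/* consume cqe->user_data / cqe->res here */
			io_uring_cqe_seen(ring, cqe);
		}
	}
}

int main(void)
{
	struct io_uring ring;
	struct io_uring_sqe *sqe;
	int ret;

	ret = io_uring_queue_init(8, &ring, IORING_SETUP_CQ_NODROP);
	if (ret < 0) {
		fprintf(stderr, "queue_init: %s\n", strerror(-ret));
		return 1;
	}

	sqe = io_uring_get_sqe(&ring);
	io_uring_prep_nop(sqe);
	ret = submit_with_backpressure(&ring);
	fprintf(stderr, "submitted %d\n", ret);

	io_uring_queue_exit(&ring);
	return 0;
}

The point of the retry loop is that -EBUSY is not an error to surface to the caller; it only signals that the CQ backlog must be reaped before the kernel will accept more submissions.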

Comments

Pavel Begunkov Nov. 9, 2019, 12:25 p.m. UTC | #1
On 11/7/2019 7:00 PM, Jens Axboe wrote:
> Currently we drop completion events, if the CQ ring is full. That's fine
> for requests with bounded completion times, but it may make it harder to
> use io_uring with networked IO where request completion times are
> generally unbounded. Or with POLL, for example, which is also unbounded.
> 
> This patch adds IORING_SETUP_CQ_NODROP, which changes the behavior a bit
> for CQ ring overflows. First of all, it doesn't overflow the ring, it
> simply stores a backlog of completions that we weren't able to put into
> the CQ ring. To prevent the backlog from growing indefinitely, if the
> backlog is non-empty, we apply back pressure on IO submissions. Any
> attempt to submit new IO with a non-empty backlog will get an -EBUSY
> return from the kernel. This is a signal to the application that it has
> backlogged CQ events, and that it must reap those before being allowed
> to submit more IO.
> 
> Signed-off-by: Jens Axboe <axboe@kernel.dk>
> ---
>  fs/io_uring.c                 | 103 ++++++++++++++++++++++++++++------
>  include/uapi/linux/io_uring.h |   1 +
>  2 files changed, 87 insertions(+), 17 deletions(-)
> 
> diff --git a/fs/io_uring.c b/fs/io_uring.c
> index f69d9794ce17..ff0f79a57f7b 100644
> --- a/fs/io_uring.c
> +++ b/fs/io_uring.c
> @@ -207,6 +207,7 @@ struct io_ring_ctx {
>  
>  		struct list_head	defer_list;
>  		struct list_head	timeout_list;
> +		struct list_head	cq_overflow_list;
>  
>  		wait_queue_head_t	inflight_wait;
>  	} ____cacheline_aligned_in_smp;
> @@ -414,6 +415,7 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
>  
>  	ctx->flags = p->flags;
>  	init_waitqueue_head(&ctx->cq_wait);
> +	INIT_LIST_HEAD(&ctx->cq_overflow_list);
>  	init_completion(&ctx->ctx_done);
>  	init_completion(&ctx->sqo_thread_started);
>  	mutex_init(&ctx->uring_lock);
> @@ -588,6 +590,72 @@ static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
>  	return &rings->cqes[tail & ctx->cq_mask];
>  }
>  
> +static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
> +{
> +	if (waitqueue_active(&ctx->wait))
> +		wake_up(&ctx->wait);
> +	if (waitqueue_active(&ctx->sqo_wait))
> +		wake_up(&ctx->sqo_wait);
> +	if (ctx->cq_ev_fd)
> +		eventfd_signal(ctx->cq_ev_fd, 1);
> +}
> +
> +static void io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
> +{
> +	struct io_rings *rings = ctx->rings;
> +	struct io_uring_cqe *cqe;
> +	struct io_kiocb *req;
> +	unsigned long flags;
> +	LIST_HEAD(list);
> +
> +	if (list_empty_careful(&ctx->cq_overflow_list))
> +		return;
> +	if (ctx->cached_cq_tail - READ_ONCE(rings->cq.head) ==
> +	    rings->cq_ring_entries)
> +		return;
> +
> +	spin_lock_irqsave(&ctx->completion_lock, flags);
> +
> +	while (!list_empty(&ctx->cq_overflow_list)) {
> +		cqe = io_get_cqring(ctx);
> +		if (!cqe && !force)
> +			break;
> +
> +		req = list_first_entry(&ctx->cq_overflow_list, struct io_kiocb,
> +						list);
> +		list_move(&req->list, &list);
> +		if (cqe) {
> +			WRITE_ONCE(cqe->user_data, req->user_data);
> +			WRITE_ONCE(cqe->res, req->result);
> +			WRITE_ONCE(cqe->flags, 0);
> +		}

Hmm, second thought. We should account overflow here.

> +	}
> +
> +	io_commit_cqring(ctx);
> +	spin_unlock_irqrestore(&ctx->completion_lock, flags);
> +	io_cqring_ev_posted(ctx);
> +
> +	while (!list_empty(&list)) {
> +		req = list_first_entry(&list, struct io_kiocb, list);
> +		list_del(&req->list);
> +		io_put_req(req, NULL);
> +	}
> +}
> +
> +static void io_cqring_overflow(struct io_ring_ctx *ctx, struct io_kiocb *req,
> +			       long res)
> +	__must_hold(&ctx->completion_lock)
> +{
> +	if (!(ctx->flags & IORING_SETUP_CQ_NODROP)) {
> +		WRITE_ONCE(ctx->rings->cq_overflow,
> +				atomic_inc_return(&ctx->cached_cq_overflow));
> +	} else {
> +		refcount_inc(&req->refs);
> +		req->result = res;
> +		list_add_tail(&req->list, &ctx->cq_overflow_list);
> +	}
> +}
> +
>  static void io_cqring_fill_event(struct io_kiocb *req, long res)
>  {
>  	struct io_ring_ctx *ctx = req->ctx;
> @@ -601,26 +669,15 @@ static void io_cqring_fill_event(struct io_kiocb *req, long res)
>  	 * the ring.
>  	 */
>  	cqe = io_get_cqring(ctx);
> -	if (cqe) {
> +	if (likely(cqe)) {
>  		WRITE_ONCE(cqe->user_data, req->user_data);
>  		WRITE_ONCE(cqe->res, res);
>  		WRITE_ONCE(cqe->flags, 0);
>  	} else {
> -		WRITE_ONCE(ctx->rings->cq_overflow,
> -				atomic_inc_return(&ctx->cached_cq_overflow));
> +		io_cqring_overflow(ctx, req, res);
>  	}
>  }
>  
> -static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
> -{
> -	if (waitqueue_active(&ctx->wait))
> -		wake_up(&ctx->wait);
> -	if (waitqueue_active(&ctx->sqo_wait))
> -		wake_up(&ctx->sqo_wait);
> -	if (ctx->cq_ev_fd)
> -		eventfd_signal(ctx->cq_ev_fd, 1);
> -}
> -
>  static void io_cqring_add_event(struct io_kiocb *req, long res)
>  {
>  	struct io_ring_ctx *ctx = req->ctx;
> @@ -877,6 +934,9 @@ static unsigned io_cqring_events(struct io_ring_ctx *ctx)
>  {
>  	struct io_rings *rings = ctx->rings;
>  
> +	if (ctx->flags & IORING_SETUP_CQ_NODROP)
> +		io_cqring_overflow_flush(ctx, false);
> +
>  	/* See comment at the top of this file */
>  	smp_rmb();
>  	return READ_ONCE(rings->cq.tail) - READ_ONCE(rings->cq.head);
> @@ -2876,6 +2936,10 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
>  	int i, submitted = 0;
>  	bool mm_fault = false;
>  
> +	if ((ctx->flags & IORING_SETUP_CQ_NODROP) &&
> +	    !list_empty(&ctx->cq_overflow_list))
> +		return -EBUSY;
> +
>  	if (nr > IO_PLUG_THRESHOLD) {
>  		io_submit_state_start(&state, ctx, nr);
>  		statep = &state;
> @@ -2967,6 +3031,7 @@ static int io_sq_thread(void *data)
>  	timeout = inflight = 0;
>  	while (!kthread_should_park()) {
>  		unsigned int to_submit;
> +		int ret;
>  
>  		if (inflight) {
>  			unsigned nr_events = 0;
> @@ -3051,8 +3116,9 @@ static int io_sq_thread(void *data)
>  		}
>  
>  		to_submit = min(to_submit, ctx->sq_entries);
> -		inflight += io_submit_sqes(ctx, to_submit, NULL, -1, &cur_mm,
> -					   true);
> +		ret = io_submit_sqes(ctx, to_submit, NULL, -1, &cur_mm, true);
> +		if (ret > 0)
> +			inflight += ret;
>  	}
>  
>  	set_fs(old_fs);
> @@ -4116,8 +4182,10 @@ static int io_uring_flush(struct file *file, void *data)
>  	struct io_ring_ctx *ctx = file->private_data;
>  
>  	io_uring_cancel_files(ctx, data);
> -	if (fatal_signal_pending(current) || (current->flags & PF_EXITING))
> +	if (fatal_signal_pending(current) || (current->flags & PF_EXITING)) {
> +		io_cqring_overflow_flush(ctx, true);
>  		io_wq_cancel_all(ctx->io_wq);
> +	}
>  	return 0;
>  }
>  
> @@ -4418,7 +4486,8 @@ static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
>  	}
>  
>  	if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
> -			IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE))
> +			IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE |
> +			IORING_SETUP_CQ_NODROP))
>  		return -EINVAL;
>  
>  	ret = io_uring_create(entries, &p);
> diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
> index f1a118b01d18..3d8517eb376e 100644
> --- a/include/uapi/linux/io_uring.h
> +++ b/include/uapi/linux/io_uring.h
> @@ -56,6 +56,7 @@ struct io_uring_sqe {
>  #define IORING_SETUP_SQPOLL	(1U << 1)	/* SQ poll thread */
>  #define IORING_SETUP_SQ_AFF	(1U << 2)	/* sq_thread_cpu is valid */
>  #define IORING_SETUP_CQSIZE	(1U << 3)	/* app defines CQ size */
> +#define IORING_SETUP_CQ_NODROP	(1U << 4)	/* no CQ drops */
>  
>  #define IORING_OP_NOP		0
>  #define IORING_OP_READV		1
>
Pavel Begunkov Nov. 9, 2019, 12:33 p.m. UTC | #2
On 11/9/2019 3:25 PM, Pavel Begunkov wrote:
> On 11/7/2019 7:00 PM, Jens Axboe wrote:
>> Currently we drop completion events, if the CQ ring is full. That's fine
>> for requests with bounded completion times, but it may make it harder to
>> use io_uring with networked IO where request completion times are
>> generally unbounded. Or with POLL, for example, which is also unbounded.
>>
>> This patch adds IORING_SETUP_CQ_NODROP, which changes the behavior a bit
>> for CQ ring overflows. First of all, it doesn't overflow the ring, it
>> simply stores a backlog of completions that we weren't able to put into
>> the CQ ring. To prevent the backlog from growing indefinitely, if the
>> backlog is non-empty, we apply back pressure on IO submissions. Any
>> attempt to submit new IO with a non-empty backlog will get an -EBUSY
>> return from the kernel. This is a signal to the application that it has
>> backlogged CQ events, and that it must reap those before being allowed
>> to submit more IO.
>>
>> Signed-off-by: Jens Axboe <axboe@kernel.dk>
>> ---
>>  fs/io_uring.c                 | 103 ++++++++++++++++++++++++++++------
>>  include/uapi/linux/io_uring.h |   1 +
>>  2 files changed, 87 insertions(+), 17 deletions(-)
>>
>> +static void io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
>> +{
>> +	struct io_rings *rings = ctx->rings;
>> +	struct io_uring_cqe *cqe;
>> +	struct io_kiocb *req;
>> +	unsigned long flags;
>> +	LIST_HEAD(list);
>> +
>> +	if (list_empty_careful(&ctx->cq_overflow_list))
>> +		return;
>> +	if (ctx->cached_cq_tail - READ_ONCE(rings->cq.head) ==
>> +	    rings->cq_ring_entries)
>> +		return;
>> +
>> +	spin_lock_irqsave(&ctx->completion_lock, flags);
>> +
>> +	while (!list_empty(&ctx->cq_overflow_list)) {
>> +		cqe = io_get_cqring(ctx);
>> +		if (!cqe && !force)
>> +			break;
>> +
>> +		req = list_first_entry(&ctx->cq_overflow_list, struct io_kiocb,
>> +						list);
>> +		list_move(&req->list, &list);
>> +		if (cqe) {
>> +			WRITE_ONCE(cqe->user_data, req->user_data);
>> +			WRITE_ONCE(cqe->res, req->result);
>> +			WRITE_ONCE(cqe->flags, 0);
>> +		}
> 
> Hmm, second thought. We should account overflow here.
> 
Clarification: We should account overflow in case of (!cqe).

i.e.
if (!cqe) { // else
	WRITE_ONCE(ctx->rings->cq_overflow,
			atomic_inc_return(&ctx->cached_cq_overflow));
}

>> +	}
>> +
>> +	io_commit_cqring(ctx);
>> +	spin_unlock_irqrestore(&ctx->completion_lock, flags);
>> +	io_cqring_ev_posted(ctx);
>> +
>> +	while (!list_empty(&list)) {
>> +		req = list_first_entry(&list, struct io_kiocb, list);
>> +		list_del(&req->list);
>> +		io_put_req(req, NULL);
>> +	}
>> +}
>> +
Jens Axboe Nov. 9, 2019, 2:14 p.m. UTC | #3
On 11/9/19 5:33 AM, Pavel Begunkov wrote:
> On 11/9/2019 3:25 PM, Pavel Begunkov wrote:
>> On 11/7/2019 7:00 PM, Jens Axboe wrote:
>>> Currently we drop completion events, if the CQ ring is full. That's fine
>>> for requests with bounded completion times, but it may make it harder to
>>> use io_uring with networked IO where request completion times are
>>> generally unbounded. Or with POLL, for example, which is also unbounded.
>>>
>>> This patch adds IORING_SETUP_CQ_NODROP, which changes the behavior a bit
>>> for CQ ring overflows. First of all, it doesn't overflow the ring, it
>>> simply stores a backlog of completions that we weren't able to put into
>>> the CQ ring. To prevent the backlog from growing indefinitely, if the
>>> backlog is non-empty, we apply back pressure on IO submissions. Any
>>> attempt to submit new IO with a non-empty backlog will get an -EBUSY
>>> return from the kernel. This is a signal to the application that it has
>>> backlogged CQ events, and that it must reap those before being allowed
>>> to submit more IO.
>>>
>>> Signed-off-by: Jens Axboe <axboe@kernel.dk>
>>> ---
>>>   fs/io_uring.c                 | 103 ++++++++++++++++++++++++++++------
>>>   include/uapi/linux/io_uring.h |   1 +
>>>   2 files changed, 87 insertions(+), 17 deletions(-)
>>>
>>> +static void io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
>>> +{
>>> +	struct io_rings *rings = ctx->rings;
>>> +	struct io_uring_cqe *cqe;
>>> +	struct io_kiocb *req;
>>> +	unsigned long flags;
>>> +	LIST_HEAD(list);
>>> +
>>> +	if (list_empty_careful(&ctx->cq_overflow_list))
>>> +		return;
>>> +	if (ctx->cached_cq_tail - READ_ONCE(rings->cq.head) ==
>>> +	    rings->cq_ring_entries)
>>> +		return;
>>> +
>>> +	spin_lock_irqsave(&ctx->completion_lock, flags);
>>> +
>>> +	while (!list_empty(&ctx->cq_overflow_list)) {
>>> +		cqe = io_get_cqring(ctx);
>>> +		if (!cqe && !force)
>>> +			break;
>>> +
>>> +		req = list_first_entry(&ctx->cq_overflow_list, struct io_kiocb,
>>> +						list);
>>> +		list_move(&req->list, &list);
>>> +		if (cqe) {
>>> +			WRITE_ONCE(cqe->user_data, req->user_data);
>>> +			WRITE_ONCE(cqe->res, req->result);
>>> +			WRITE_ONCE(cqe->flags, 0);
>>> +		}
>>
>> Hmm, second thought. We should account overflow here.
>>
> Clarification: We should account overflow in case of (!cqe).
> 
> i.e.
> if (!cqe) { // else
> 	WRITE_ONCE(ctx->rings->cq_overflow,
> 			atomic_inc_return(&ctx->cached_cq_overflow));
> }

Ah yes, we should, even if this is only the flush path. I'll send out
a patch for that, unless you beat me to it.
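
Folding that accounting into the flush loop would look roughly like the sketch below; the else branch is Pavel's snippet, and this is only an illustration, not the follow-up patch Jens refers to:

	while (!list_empty(&ctx->cq_overflow_list)) {
		cqe = io_get_cqring(ctx);
		if (!cqe && !force)
			break;

		req = list_first_entry(&ctx->cq_overflow_list, struct io_kiocb,
						list);
		list_move(&req->list, &list);
		if (cqe) {
			WRITE_ONCE(cqe->user_data, req->user_data);
			WRITE_ONCE(cqe->res, req->result);
			WRITE_ONCE(cqe->flags, 0);
		} else {
			/* force-flushed without a free CQE: account the drop */
			WRITE_ONCE(ctx->rings->cq_overflow,
					atomic_inc_return(&ctx->cached_cq_overflow));
		}
	}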

Patch

diff --git a/fs/io_uring.c b/fs/io_uring.c
index f69d9794ce17..ff0f79a57f7b 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -207,6 +207,7 @@  struct io_ring_ctx {
 
 		struct list_head	defer_list;
 		struct list_head	timeout_list;
+		struct list_head	cq_overflow_list;
 
 		wait_queue_head_t	inflight_wait;
 	} ____cacheline_aligned_in_smp;
@@ -414,6 +415,7 @@  static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 
 	ctx->flags = p->flags;
 	init_waitqueue_head(&ctx->cq_wait);
+	INIT_LIST_HEAD(&ctx->cq_overflow_list);
 	init_completion(&ctx->ctx_done);
 	init_completion(&ctx->sqo_thread_started);
 	mutex_init(&ctx->uring_lock);
@@ -588,6 +590,72 @@  static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
 	return &rings->cqes[tail & ctx->cq_mask];
 }
 
+static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
+{
+	if (waitqueue_active(&ctx->wait))
+		wake_up(&ctx->wait);
+	if (waitqueue_active(&ctx->sqo_wait))
+		wake_up(&ctx->sqo_wait);
+	if (ctx->cq_ev_fd)
+		eventfd_signal(ctx->cq_ev_fd, 1);
+}
+
+static void io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
+{
+	struct io_rings *rings = ctx->rings;
+	struct io_uring_cqe *cqe;
+	struct io_kiocb *req;
+	unsigned long flags;
+	LIST_HEAD(list);
+
+	if (list_empty_careful(&ctx->cq_overflow_list))
+		return;
+	if (ctx->cached_cq_tail - READ_ONCE(rings->cq.head) ==
+	    rings->cq_ring_entries)
+		return;
+
+	spin_lock_irqsave(&ctx->completion_lock, flags);
+
+	while (!list_empty(&ctx->cq_overflow_list)) {
+		cqe = io_get_cqring(ctx);
+		if (!cqe && !force)
+			break;
+
+		req = list_first_entry(&ctx->cq_overflow_list, struct io_kiocb,
+						list);
+		list_move(&req->list, &list);
+		if (cqe) {
+			WRITE_ONCE(cqe->user_data, req->user_data);
+			WRITE_ONCE(cqe->res, req->result);
+			WRITE_ONCE(cqe->flags, 0);
+		}
+	}
+
+	io_commit_cqring(ctx);
+	spin_unlock_irqrestore(&ctx->completion_lock, flags);
+	io_cqring_ev_posted(ctx);
+
+	while (!list_empty(&list)) {
+		req = list_first_entry(&list, struct io_kiocb, list);
+		list_del(&req->list);
+		io_put_req(req, NULL);
+	}
+}
+
+static void io_cqring_overflow(struct io_ring_ctx *ctx, struct io_kiocb *req,
+			       long res)
+	__must_hold(&ctx->completion_lock)
+{
+	if (!(ctx->flags & IORING_SETUP_CQ_NODROP)) {
+		WRITE_ONCE(ctx->rings->cq_overflow,
+				atomic_inc_return(&ctx->cached_cq_overflow));
+	} else {
+		refcount_inc(&req->refs);
+		req->result = res;
+		list_add_tail(&req->list, &ctx->cq_overflow_list);
+	}
+}
+
 static void io_cqring_fill_event(struct io_kiocb *req, long res)
 {
 	struct io_ring_ctx *ctx = req->ctx;
@@ -601,26 +669,15 @@  static void io_cqring_fill_event(struct io_kiocb *req, long res)
 	 * the ring.
 	 */
 	cqe = io_get_cqring(ctx);
-	if (cqe) {
+	if (likely(cqe)) {
 		WRITE_ONCE(cqe->user_data, req->user_data);
 		WRITE_ONCE(cqe->res, res);
 		WRITE_ONCE(cqe->flags, 0);
 	} else {
-		WRITE_ONCE(ctx->rings->cq_overflow,
-				atomic_inc_return(&ctx->cached_cq_overflow));
+		io_cqring_overflow(ctx, req, res);
 	}
 }
 
-static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
-{
-	if (waitqueue_active(&ctx->wait))
-		wake_up(&ctx->wait);
-	if (waitqueue_active(&ctx->sqo_wait))
-		wake_up(&ctx->sqo_wait);
-	if (ctx->cq_ev_fd)
-		eventfd_signal(ctx->cq_ev_fd, 1);
-}
-
 static void io_cqring_add_event(struct io_kiocb *req, long res)
 {
 	struct io_ring_ctx *ctx = req->ctx;
@@ -877,6 +934,9 @@  static unsigned io_cqring_events(struct io_ring_ctx *ctx)
 {
 	struct io_rings *rings = ctx->rings;
 
+	if (ctx->flags & IORING_SETUP_CQ_NODROP)
+		io_cqring_overflow_flush(ctx, false);
+
 	/* See comment at the top of this file */
 	smp_rmb();
 	return READ_ONCE(rings->cq.tail) - READ_ONCE(rings->cq.head);
@@ -2876,6 +2936,10 @@  static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
 	int i, submitted = 0;
 	bool mm_fault = false;
 
+	if ((ctx->flags & IORING_SETUP_CQ_NODROP) &&
+	    !list_empty(&ctx->cq_overflow_list))
+		return -EBUSY;
+
 	if (nr > IO_PLUG_THRESHOLD) {
 		io_submit_state_start(&state, ctx, nr);
 		statep = &state;
@@ -2967,6 +3031,7 @@  static int io_sq_thread(void *data)
 	timeout = inflight = 0;
 	while (!kthread_should_park()) {
 		unsigned int to_submit;
+		int ret;
 
 		if (inflight) {
 			unsigned nr_events = 0;
@@ -3051,8 +3116,9 @@  static int io_sq_thread(void *data)
 		}
 
 		to_submit = min(to_submit, ctx->sq_entries);
-		inflight += io_submit_sqes(ctx, to_submit, NULL, -1, &cur_mm,
-					   true);
+		ret = io_submit_sqes(ctx, to_submit, NULL, -1, &cur_mm, true);
+		if (ret > 0)
+			inflight += ret;
 	}
 
 	set_fs(old_fs);
@@ -4116,8 +4182,10 @@  static int io_uring_flush(struct file *file, void *data)
 	struct io_ring_ctx *ctx = file->private_data;
 
 	io_uring_cancel_files(ctx, data);
-	if (fatal_signal_pending(current) || (current->flags & PF_EXITING))
+	if (fatal_signal_pending(current) || (current->flags & PF_EXITING)) {
+		io_cqring_overflow_flush(ctx, true);
 		io_wq_cancel_all(ctx->io_wq);
+	}
 	return 0;
 }
 
@@ -4418,7 +4486,8 @@  static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
 	}
 
 	if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
-			IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE))
+			IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE |
+			IORING_SETUP_CQ_NODROP))
 		return -EINVAL;
 
 	ret = io_uring_create(entries, &p);
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index f1a118b01d18..3d8517eb376e 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -56,6 +56,7 @@  struct io_uring_sqe {
 #define IORING_SETUP_SQPOLL	(1U << 1)	/* SQ poll thread */
 #define IORING_SETUP_SQ_AFF	(1U << 2)	/* sq_thread_cpu is valid */
 #define IORING_SETUP_CQSIZE	(1U << 3)	/* app defines CQ size */
+#define IORING_SETUP_CQ_NODROP	(1U << 4)	/* no CQ drops */
 
 #define IORING_OP_NOP		0
 #define IORING_OP_READV		1
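
Since io_uring_setup() rejects unknown flags with -EINVAL (per the flag mask check in the hunk above), an application can probe at startup whether the running kernel understands IORING_SETUP_CQ_NODROP. A minimal sketch, again assuming liburing; the helper name and fallback define are illustrative, and the flag value is only meaningful on kernels carrying this patch:

/*
 * Probe for IORING_SETUP_CQ_NODROP: io_uring_setup() fails with -EINVAL
 * for flags it does not recognize, so a clean init with only this flag
 * set implies the kernel supports it. Sketch only.
 */
#include <liburing.h>
#include <errno.h>
#include <stdbool.h>

#ifndef IORING_SETUP_CQ_NODROP
#define IORING_SETUP_CQ_NODROP	(1U << 4)	/* value from this patch's uapi change */
#endif

static bool cq_nodrop_supported(void)
{
	struct io_uring ring;
	int ret;

	ret = io_uring_queue_init(2, &ring, IORING_SETUP_CQ_NODROP);
	if (ret == -EINVAL)
		return false;	/* kernel rejects the flag */
	if (ret == 0)
		io_uring_queue_exit(&ring);
	/* other errors (e.g. -ENOMEM) say nothing about the flag itself */
	return ret == 0;
}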