[07/18] io_uring: support for IO polling

Message ID 20190129192702.3605-8-axboe@kernel.dk (mailing list archive)
State New, archived
Series [01/18] fs: add an iopoll method to struct file_operations

Commit Message

Jens Axboe Jan. 29, 2019, 7:26 p.m. UTC
Add support for a polled io_uring context. When a read or write is
submitted to a polled context, the application must poll for completions
on the CQ ring through io_uring_enter(2). Polled IO may not generate
IRQ completions, so completions must be actively found by the
application itself.

To use polling, io_uring_setup() must be used with the
IORING_SETUP_IOPOLL flag being set. It is illegal to mix and match
polled and non-polled IO on an io_uring.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c                 | 243 ++++++++++++++++++++++++++++++++--
 include/uapi/linux/io_uring.h |   5 +
 2 files changed, 240 insertions(+), 8 deletions(-)
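
For illustration only (not part of the patch), a minimal userspace sketch
of the intended flow, assuming hypothetical raw-syscall wrappers
io_uring_setup()/io_uring_enter() and omitting the SQ/CQ ring mmap and
SQE/CQE handling that the rest of the series covers:

#include <signal.h>
#include <stddef.h>
#include <linux/io_uring.h>

/* Hypothetical wrappers for the new syscalls (names assumed). */
extern int io_uring_setup(unsigned entries, struct io_uring_params *p);
extern int io_uring_enter(int fd, unsigned to_submit, unsigned min_complete,
			  unsigned flags, sigset_t *sig, size_t sigsz);

int setup_polled_ring(unsigned entries)
{
	struct io_uring_params p = { 0 };

	/* Every read/write on this ring will be polled, not IRQ driven. */
	p.flags = IORING_SETUP_IOPOLL;
	return io_uring_setup(entries, &p);
}

void reap_completions(int ring_fd, unsigned min_complete)
{
	/*
	 * Polled IO posts no IRQ completions; the application must enter
	 * the kernel so completions can be actively found on the CQ ring.
	 */
	io_uring_enter(ring_fd, 0, min_complete, IORING_ENTER_GETEVENTS,
		       NULL, 0);
}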

Comments

Jann Horn Jan. 29, 2019, 8:47 p.m. UTC | #1
On Tue, Jan 29, 2019 at 8:27 PM Jens Axboe <axboe@kernel.dk> wrote:
> Add support for a polled io_uring context. When a read or write is
> submitted to a polled context, the application must poll for completions
> on the CQ ring through io_uring_enter(2). Polled IO may not generate
> IRQ completions, hence they need to be actively found by the application
> itself.
>
> To use polling, io_uring_setup() must be used with the
> IORING_SETUP_IOPOLL flag being set. It is illegal to mix and match
> polled and non-polled IO on an io_uring.
>
> Signed-off-by: Jens Axboe <axboe@kernel.dk>
[...]
> @@ -102,6 +102,8 @@ struct io_ring_ctx {
>
>         struct {
>                 spinlock_t              completion_lock;
> +               bool                    poll_multi_file;
> +               struct list_head        poll_list;

Please add a comment explaining what protects poll_list against
concurrent modification, and ideally also put lockdep asserts in the
functions that access the list to allow the kernel to sanity-check the
locking at runtime.
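
(For illustration, not part of the patch: the kind of assert being
suggested might look roughly like this, assuming ->uring_lock is what
serializes the list on the non-SQPOLL path; the SQPOLL thread case would
need separate handling.)

	/*
	 * Sketch of the suggested sanity check, at the top of each function
	 * that walks ctx->poll_list; the non-SQPOLL path holds ->uring_lock.
	 */
	lockdep_assert_held(&ctx->uring_lock);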

As far as I understand:
Elements are added by io_iopoll_req_issued(). io_iopoll_req_issued()
can't race with itself because, depending on IORING_SETUP_SQPOLL,
either you have to come through sys_io_uring_enter() (which takes the
uring_lock), or you have to come from the single-threaded
io_sq_thread().
io_do_iopoll() iterates over the list and removes completed items.
io_do_iopoll() is called through io_iopoll_getevents(), which can be
invoked in two ways during normal operation:
 - sys_io_uring_enter -> __io_uring_enter -> io_iopoll_check
->io_iopoll_getevents; this is only protected by the uring_lock
 - io_sq_thread -> io_iopoll_check ->io_iopoll_getevents; this doesn't
hold any locks
Additionally, the following exit paths:
 - io_sq_thread -> io_iopoll_reap_events -> io_iopoll_getevents
 - io_uring_release -> io_ring_ctx_wait_and_kill ->
io_iopoll_reap_events -> io_iopoll_getevents
 - io_uring_release -> io_ring_ctx_wait_and_kill -> io_ring_ctx_free
-> io_iopoll_reap_events -> io_iopoll_getevents

So as far as I can tell, you can have various races around access to
the poll_list.

[...]
> +static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr)
> +{
> +       if (*nr) {
> +               kmem_cache_free_bulk(req_cachep, *nr, reqs);
> +               io_ring_drop_ctx_refs(ctx, *nr);
> +               *nr = 0;
> +       }
> +}
[...]
> +static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
> +                              struct list_head *done)
> +{
> +       void *reqs[IO_IOPOLL_BATCH];
> +       struct io_kiocb *req;
> +       int to_free = 0;
> +
> +       while (!list_empty(done)) {
> +               req = list_first_entry(done, struct io_kiocb, list);
> +               list_del(&req->list);
> +
> +               io_cqring_fill_event(ctx, req->user_data, req->error, 0);
> +
> +               reqs[to_free++] = req;
> +               (*nr_events)++;
> +
> +               fput(req->rw.ki_filp);
> +               if (to_free == ARRAY_SIZE(reqs))
> +                       io_free_req_many(ctx, reqs, &to_free);
> +       }
> +       io_commit_cqring(ctx);
> +
> +       if (to_free)
> +               io_free_req_many(ctx, reqs, &to_free);

Nit: You check here whether to_free==0, and then io_free_req_many()
does that again. You can delete one of those checks; I'd probably
delete this one.

> +}
[...]
Jens Axboe Jan. 29, 2019, 8:56 p.m. UTC | #2
On 1/29/19 1:47 PM, Jann Horn wrote:
> On Tue, Jan 29, 2019 at 8:27 PM Jens Axboe <axboe@kernel.dk> wrote:
>> Add support for a polled io_uring context. When a read or write is
>> submitted to a polled context, the application must poll for completions
>> on the CQ ring through io_uring_enter(2). Polled IO may not generate
>> IRQ completions, hence they need to be actively found by the application
>> itself.
>>
>> To use polling, io_uring_setup() must be used with the
>> IORING_SETUP_IOPOLL flag being set. It is illegal to mix and match
>> polled and non-polled IO on an io_uring.
>>
>> Signed-off-by: Jens Axboe <axboe@kernel.dk>
> [...]
>> @@ -102,6 +102,8 @@ struct io_ring_ctx {
>>
>>         struct {
>>                 spinlock_t              completion_lock;
>> +               bool                    poll_multi_file;
>> +               struct list_head        poll_list;
> 
> Please add a comment explaining what protects poll_list against
> concurrent modification, and ideally also put lockdep asserts in the
> functions that access the list to allow the kernel to sanity-check the
> locking at runtime.

Not sure that's needed, and it would be a bit difficult with the SQPOLL
thread and non-thread being different cases.

But comments I can definitely add.

> As far as I understand:
> Elements are added by io_iopoll_req_issued(). io_iopoll_req_issued()
> can't race with itself because, depending on IORING_SETUP_SQPOLL,
> either you have to come through sys_io_uring_enter() (which takes the
> uring_lock), or you have to come from the single-threaded
> io_sq_thread().
> io_do_iopoll() iterates over the list and removes completed items.
> io_do_iopoll() is called through io_iopoll_getevents(), which can be
> invoked in two ways during normal operation:
>  - sys_io_uring_enter -> __io_uring_enter -> io_iopoll_check
> ->io_iopoll_getevents; this is only protected by the uring_lock
>  - io_sq_thread -> io_iopoll_check ->io_iopoll_getevents; this doesn't
> hold any locks
> Additionally, the following exit paths:
>  - io_sq_thread -> io_iopoll_reap_events -> io_iopoll_getevents
>  - io_uring_release -> io_ring_ctx_wait_and_kill ->
> io_iopoll_reap_events -> io_iopoll_getevents
>  - io_uring_release -> io_ring_ctx_wait_and_kill -> io_ring_ctx_free
> -> io_iopoll_reap_events -> io_iopoll_getevents

Yes, your understanding is correct. But importantly, those two cases
don't co-exist. If you are using SQPOLL, then only the thread itself
modifies the list. The only valid use of io_uring_enter(2) is to wake
up the thread; the task itself will NOT be issuing any IO. If you are
NOT using SQPOLL, then any access is inside the ->uring_lock.

For the reap cases, we don't enter those at shutdown for SQPOLL; we
expect the thread to do it. Hence we wait for the thread to exit before
we do our final release.
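
(An illustrative sketch of the kind of comment that explanation could
turn into, on the fields themselves:)

	struct {
		spinlock_t		completion_lock;
		/*
		 * poll_list is only manipulated under ->uring_lock (requests
		 * issued and reaped via io_uring_enter(2)), or by the single
		 * io_sq_thread() when SQPOLL is in use; the two modes are
		 * never mixed on the same ring, and the shutdown reap waits
		 * for the SQPOLL thread to exit first.
		 */
		bool			poll_multi_file;
		struct list_head	poll_list;
	} ____cacheline_aligned_in_smp;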

> So as far as I can tell, you can have various races around access to
> the poll_list.

How did you make that leap?

>> +static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr)
>> +{
>> +       if (*nr) {
>> +               kmem_cache_free_bulk(req_cachep, *nr, reqs);
>> +               io_ring_drop_ctx_refs(ctx, *nr);
>> +               *nr = 0;
>> +       }
>> +}
> [...]
>> +static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
>> +                              struct list_head *done)
>> +{
>> +       void *reqs[IO_IOPOLL_BATCH];
>> +       struct io_kiocb *req;
>> +       int to_free = 0;
>> +
>> +       while (!list_empty(done)) {
>> +               req = list_first_entry(done, struct io_kiocb, list);
>> +               list_del(&req->list);
>> +
>> +               io_cqring_fill_event(ctx, req->user_data, req->error, 0);
>> +
>> +               reqs[to_free++] = req;
>> +               (*nr_events)++;
>> +
>> +               fput(req->rw.ki_filp);
>> +               if (to_free == ARRAY_SIZE(reqs))
>> +                       io_free_req_many(ctx, reqs, &to_free);
>> +       }
>> +       io_commit_cqring(ctx);
>> +
>> +       if (to_free)
>> +               io_free_req_many(ctx, reqs, &to_free);
> 
> Nit: You check here whether to_free==0, and then io_free_req_many()
> does that again. You can delete one of those checks; I'd probably
> delete this one.

Agree, I'll kill it.
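
(i.e., the tail of io_iopoll_complete() would presumably just be:)

	io_commit_cqring(ctx);
	io_free_req_many(ctx, reqs, &to_free);
}
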
Jann Horn Jan. 29, 2019, 9:10 p.m. UTC | #3
On Tue, Jan 29, 2019 at 9:56 PM Jens Axboe <axboe@kernel.dk> wrote:
> On 1/29/19 1:47 PM, Jann Horn wrote:
> > On Tue, Jan 29, 2019 at 8:27 PM Jens Axboe <axboe@kernel.dk> wrote:
> >> Add support for a polled io_uring context. When a read or write is
> >> submitted to a polled context, the application must poll for completions
> >> on the CQ ring through io_uring_enter(2). Polled IO may not generate
> >> IRQ completions, hence they need to be actively found by the application
> >> itself.
> >>
> >> To use polling, io_uring_setup() must be used with the
> >> IORING_SETUP_IOPOLL flag being set. It is illegal to mix and match
> >> polled and non-polled IO on an io_uring.
> >>
> >> Signed-off-by: Jens Axboe <axboe@kernel.dk>
> > [...]
> >> @@ -102,6 +102,8 @@ struct io_ring_ctx {
> >>
> >>         struct {
> >>                 spinlock_t              completion_lock;
> >> +               bool                    poll_multi_file;
> >> +               struct list_head        poll_list;
> >
> > Please add a comment explaining what protects poll_list against
> > concurrent modification, and ideally also put lockdep asserts in the
> > functions that access the list to allow the kernel to sanity-check the
> > locking at runtime.
>
> Not sure that's needed, and it would be a bit difficult with the SQPOLL
> thread and non-thread being different cases.
>
> But comments I can definitely add.
>
> > As far as I understand:
> > Elements are added by io_iopoll_req_issued(). io_iopoll_req_issued()
> > can't race with itself because, depending on IORING_SETUP_SQPOLL,
> > either you have to come through sys_io_uring_enter() (which takes the
> > uring_lock), or you have to come from the single-threaded
> > io_sq_thread().
> > io_do_iopoll() iterates over the list and removes completed items.
> > io_do_iopoll() is called through io_iopoll_getevents(), which can be
> > invoked in two ways during normal operation:
> >  - sys_io_uring_enter -> __io_uring_enter -> io_iopoll_check
> > ->io_iopoll_getevents; this is only protected by the uring_lock
> >  - io_sq_thread -> io_iopoll_check ->io_iopoll_getevents; this doesn't
> > hold any locks
> > Additionally, the following exit paths:
> >  - io_sq_thread -> io_iopoll_reap_events -> io_iopoll_getevents
> >  - io_uring_release -> io_ring_ctx_wait_and_kill ->
> > io_iopoll_reap_events -> io_iopoll_getevents
> >  - io_uring_release -> io_ring_ctx_wait_and_kill -> io_ring_ctx_free
> > -> io_iopoll_reap_events -> io_iopoll_getevents
>
> Yes, your understanding is correct. But of important note, those two
> cases don't co-exist. If you are using SQPOLL, then only the thread
> itself is the one that modifies the list. The only valid call of
> io_uring_enter(2) is to wakeup the thread, the task itself will NOT be
> doing any issues. If you are NOT using SQPOLL, then any access is inside
> the ->uring_lock.
>
> For the reap cases, we don't enter those at shutdown for SQPOLL, we
> expect the thread to do it. Hence we wait for the thread to exit before
> we do our final release.
>
> > So as far as I can tell, you can have various races around access to
> > the poll_list.
>
> How did you make that leap?

Ah, you're right, I missed a check when going through
__io_uring_enter(), never mind.
Jens Axboe Jan. 29, 2019, 9:33 p.m. UTC | #4
On 1/29/19 2:10 PM, Jann Horn wrote:
> On Tue, Jan 29, 2019 at 9:56 PM Jens Axboe <axboe@kernel.dk> wrote:
>> On 1/29/19 1:47 PM, Jann Horn wrote:
>>> On Tue, Jan 29, 2019 at 8:27 PM Jens Axboe <axboe@kernel.dk> wrote:
>>>> Add support for a polled io_uring context. When a read or write is
>>>> submitted to a polled context, the application must poll for completions
>>>> on the CQ ring through io_uring_enter(2). Polled IO may not generate
>>>> IRQ completions, hence they need to be actively found by the application
>>>> itself.
>>>>
>>>> To use polling, io_uring_setup() must be used with the
>>>> IORING_SETUP_IOPOLL flag being set. It is illegal to mix and match
>>>> polled and non-polled IO on an io_uring.
>>>>
>>>> Signed-off-by: Jens Axboe <axboe@kernel.dk>
>>> [...]
>>>> @@ -102,6 +102,8 @@ struct io_ring_ctx {
>>>>
>>>>         struct {
>>>>                 spinlock_t              completion_lock;
>>>> +               bool                    poll_multi_file;
>>>> +               struct list_head        poll_list;
>>>
>>> Please add a comment explaining what protects poll_list against
>>> concurrent modification, and ideally also put lockdep asserts in the
>>> functions that access the list to allow the kernel to sanity-check the
>>> locking at runtime.
>>
>> Not sure that's needed, and it would be a bit difficult with the SQPOLL
>> thread and non-thread being different cases.
>>
>> But comments I can definitely add.
>>
>>> As far as I understand:
>>> Elements are added by io_iopoll_req_issued(). io_iopoll_req_issued()
>>> can't race with itself because, depending on IORING_SETUP_SQPOLL,
>>> either you have to come through sys_io_uring_enter() (which takes the
>>> uring_lock), or you have to come from the single-threaded
>>> io_sq_thread().
>>> io_do_iopoll() iterates over the list and removes completed items.
>>> io_do_iopoll() is called through io_iopoll_getevents(), which can be
>>> invoked in two ways during normal operation:
>>>  - sys_io_uring_enter -> __io_uring_enter -> io_iopoll_check
>>> ->io_iopoll_getevents; this is only protected by the uring_lock
>>>  - io_sq_thread -> io_iopoll_check ->io_iopoll_getevents; this doesn't
>>> hold any locks
>>> Additionally, the following exit paths:
>>>  - io_sq_thread -> io_iopoll_reap_events -> io_iopoll_getevents
>>>  - io_uring_release -> io_ring_ctx_wait_and_kill ->
>>> io_iopoll_reap_events -> io_iopoll_getevents
>>>  - io_uring_release -> io_ring_ctx_wait_and_kill -> io_ring_ctx_free
>>> -> io_iopoll_reap_events -> io_iopoll_getevents
>>
>> Yes, your understanding is correct. But of important note, those two
>> cases don't co-exist. If you are using SQPOLL, then only the thread
>> itself is the one that modifies the list. The only valid call of
>> io_uring_enter(2) is to wakeup the thread, the task itself will NOT be
>> doing any issues. If you are NOT using SQPOLL, then any access is inside
>> the ->uring_lock.
>>
>> For the reap cases, we don't enter those at shutdown for SQPOLL, we
>> expect the thread to do it. Hence we wait for the thread to exit before
>> we do our final release.
>>
>>> So as far as I can tell, you can have various races around access to
>>> the poll_list.
>>
>> How did you make that leap?
> 
> Ah, you're right, I missed a check when going through
> __io_uring_enter(), never mind.

OK good, thanks for confirming, was afraid I was starting to lose my
mind.

Patch

diff --git a/fs/io_uring.c b/fs/io_uring.c
index c75a3e197ed5..a4f4d75609d5 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -102,6 +102,8 @@  struct io_ring_ctx {
 
 	struct {
 		spinlock_t		completion_lock;
+		bool			poll_multi_file;
+		struct list_head	poll_list;
 	} ____cacheline_aligned_in_smp;
 };
 
@@ -120,12 +122,15 @@  struct io_kiocb {
 	struct list_head	list;
 	unsigned int		flags;
 #define REQ_F_FORCE_NONBLOCK	1	/* inline submission attempt */
+#define REQ_F_IOPOLL_COMPLETED	2	/* polled IO has completed */
 	u64			user_data;
+	u64			error;
 
 	struct work_struct	work;
 };
 
 #define IO_PLUG_THRESHOLD		2
+#define IO_IOPOLL_BATCH			8
 
 static struct kmem_cache *req_cachep;
 
@@ -157,6 +162,7 @@  static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 	mutex_init(&ctx->uring_lock);
 	init_waitqueue_head(&ctx->wait);
 	spin_lock_init(&ctx->completion_lock);
+	INIT_LIST_HEAD(&ctx->poll_list);
 	return ctx;
 }
 
@@ -251,12 +257,154 @@  static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx)
 	return NULL;
 }
 
+static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr)
+{
+	if (*nr) {
+		kmem_cache_free_bulk(req_cachep, *nr, reqs);
+		io_ring_drop_ctx_refs(ctx, *nr);
+		*nr = 0;
+	}
+}
+
 static void io_free_req(struct io_kiocb *req)
 {
 	io_ring_drop_ctx_refs(req->ctx, 1);
 	kmem_cache_free(req_cachep, req);
 }
 
+/*
+ * Find and free completed poll iocbs
+ */
+static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
+			       struct list_head *done)
+{
+	void *reqs[IO_IOPOLL_BATCH];
+	struct io_kiocb *req;
+	int to_free = 0;
+
+	while (!list_empty(done)) {
+		req = list_first_entry(done, struct io_kiocb, list);
+		list_del(&req->list);
+
+		io_cqring_fill_event(ctx, req->user_data, req->error, 0);
+
+		reqs[to_free++] = req;
+		(*nr_events)++;
+
+		fput(req->rw.ki_filp);
+		if (to_free == ARRAY_SIZE(reqs))
+			io_free_req_many(ctx, reqs, &to_free);
+	}
+	io_commit_cqring(ctx);
+
+	if (to_free)
+		io_free_req_many(ctx, reqs, &to_free);
+}
+
+static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
+			long min)
+{
+	struct io_kiocb *req, *tmp;
+	LIST_HEAD(done);
+	bool spin;
+	int ret;
+
+	/*
+	 * Only spin for completions if we don't have multiple devices hanging
+	 * off our complete list, and we're under the requested amount.
+	 */
+	spin = !ctx->poll_multi_file && *nr_events < min;
+
+	ret = 0;
+	list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
+		struct kiocb *kiocb = &req->rw;
+
+		/*
+		 * Move completed entries to our local list. If we find a
+		 * request that requires polling, break out and complete
+		 * the done list first, if we have entries there.
+		 */
+		if (req->flags & REQ_F_IOPOLL_COMPLETED) {
+			list_move_tail(&req->list, &done);
+			continue;
+		}
+		if (!list_empty(&done))
+			break;
+
+		ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
+		if (ret < 0)
+			break;
+
+		if (ret && spin)
+			spin = false;
+		ret = 0;
+	}
+
+	if (!list_empty(&done))
+		io_iopoll_complete(ctx, nr_events, &done);
+
+	return ret;
+}
+
+/*
+ * Poll for a minimum of 'min' events. Note that if min == 0 we consider that a
+ * non-spinning poll check - we'll still enter the driver poll loop, but only
+ * as a non-spinning completion check.
+ */
+static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
+				long min)
+{
+	while (!list_empty(&ctx->poll_list)) {
+		int ret;
+
+		ret = io_do_iopoll(ctx, nr_events, min);
+		if (ret < 0)
+			return ret;
+		if (!min || *nr_events >= min)
+			return 0;
+	}
+
+	return 1;
+}
+
+/*
+ * We can't just wait for polled events to come to us, we have to actively
+ * find and complete them.
+ */
+static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
+{
+	if (!(ctx->flags & IORING_SETUP_IOPOLL))
+		return;
+
+	mutex_lock(&ctx->uring_lock);
+	while (!list_empty(&ctx->poll_list)) {
+		unsigned int nr_events = 0;
+
+		io_iopoll_getevents(ctx, &nr_events, 1);
+	}
+	mutex_unlock(&ctx->uring_lock);
+}
+
+static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
+			   long min)
+{
+	int ret = 0;
+
+	do {
+		int tmin = 0;
+
+		if (*nr_events < min)
+			tmin = min - *nr_events;
+
+		ret = io_iopoll_getevents(ctx, nr_events, tmin);
+		if (ret <= 0)
+			break;
+		ret = 0;
+	} while (!*nr_events || !need_resched());
+
+	return ret;
+}
+
 static void kiocb_end_write(struct kiocb *kiocb)
 {
 	if (kiocb->ki_flags & IOCB_WRITE) {
@@ -283,9 +431,57 @@  static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
 	io_free_req(req);
 }
 
+static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
+{
+	struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
+
+	kiocb_end_write(kiocb);
+
+	req->error = res;
+	if (res != -EAGAIN)
+		req->flags |= REQ_F_IOPOLL_COMPLETED;
+}
+
+/*
+ * After the iocb has been issued, it's safe to be found on the poll list.
+ * Adding the kiocb to the list AFTER submission ensures that we don't
+ * find it from an io_iopoll_getevents() thread before the issuer is done
+ * accessing the kiocb cookie.
+ */
+static void io_iopoll_req_issued(struct io_kiocb *req)
+{
+	struct io_ring_ctx *ctx = req->ctx;
+
+	/*
+	 * Track whether we have multiple files in our lists. This will impact
+	 * how we do polling eventually, not spinning if we're on potentially
+	 * different devices.
+	 */
+	if (list_empty(&ctx->poll_list)) {
+		ctx->poll_multi_file = false;
+	} else if (!ctx->poll_multi_file) {
+		struct io_kiocb *list_req;
+
+		list_req = list_first_entry(&ctx->poll_list, struct io_kiocb,
+						list);
+		if (list_req->rw.ki_filp != req->rw.ki_filp)
+			ctx->poll_multi_file = true;
+	}
+
+	/*
+	 * For fast devices, IO may have already completed. If it has, add
+	 * it to the front so we find it first.
+	 */
+	if (req->flags & REQ_F_IOPOLL_COMPLETED)
+		list_add(&req->list, &ctx->poll_list);
+	else
+		list_add_tail(&req->list, &ctx->poll_list);
+}
+
 static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 		      bool force_nonblock)
 {
+	struct io_ring_ctx *ctx = req->ctx;
 	struct kiocb *kiocb = &req->rw;
 	unsigned ioprio;
 	int fd, ret;
@@ -315,12 +511,21 @@  static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 		kiocb->ki_flags |= IOCB_NOWAIT;
 		req->flags |= REQ_F_FORCE_NONBLOCK;
 	}
-	if (kiocb->ki_flags & IOCB_HIPRI) {
-		ret = -EINVAL;
-		goto out_fput;
-	}
+	if (ctx->flags & IORING_SETUP_IOPOLL) {
+		ret = -EOPNOTSUPP;
+		if (!(kiocb->ki_flags & IOCB_DIRECT) ||
+		    !kiocb->ki_filp->f_op->iopoll)
+			goto out_fput;
 
-	kiocb->ki_complete = io_complete_rw;
+		kiocb->ki_flags |= IOCB_HIPRI;
+		kiocb->ki_complete = io_complete_rw_iopoll;
+	} else {
+		if (kiocb->ki_flags & IOCB_HIPRI) {
+			ret = -EINVAL;
+			goto out_fput;
+		}
+		kiocb->ki_complete = io_complete_rw;
+	}
 	return 0;
 out_fput:
 	fput(kiocb->ki_filp);
@@ -469,6 +674,9 @@  static int io_nop(struct io_kiocb *req, u64 user_data)
 {
 	struct io_ring_ctx *ctx = req->ctx;
 
+	if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
+		return -EINVAL;
+
 	io_cqring_add_event(ctx, user_data, 0, 0);
 	io_free_req(req);
 	return 0;
@@ -489,6 +697,8 @@  static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 	if (force_nonblock)
 		return -EAGAIN;
 
+	if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
+		return -EINVAL;
 	if (unlikely(sqe->addr || sqe->ioprio))
 		return -EINVAL;
 
@@ -540,7 +750,16 @@  static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
 		break;
 	}
 
-	return ret;
+	if (ret)
+		return ret;
+
+	if (ctx->flags & IORING_SETUP_IOPOLL) {
+		if (req->error == -EAGAIN)
+			return -EAGAIN;
+		io_iopoll_req_issued(req);
+	}
+
+	return 0;
 }
 
 static void io_sq_wq_submit_work(struct work_struct *work)
@@ -763,6 +982,8 @@  static int __io_uring_enter(struct io_ring_ctx *ctx, unsigned to_submit,
 			return submitted;
 	}
 	if (flags & IORING_ENTER_GETEVENTS) {
+		unsigned nr_events = 0;
+
 		/*
 		 * The application could have included the 'to_submit' count
 		 * in how many events it wanted to wait for. If we failed to
@@ -772,7 +993,10 @@  static int __io_uring_enter(struct io_ring_ctx *ctx, unsigned to_submit,
 		if (submitted < to_submit)
 			min_complete = min_t(unsigned, submitted, min_complete);
 
-		ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
+		if (ctx->flags & IORING_SETUP_IOPOLL)
+			ret = io_iopoll_check(ctx, &nr_events, min_complete);
+		else
+			ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
 	}
 
 	return submitted ? submitted : ret;
@@ -873,6 +1097,8 @@  static void io_ring_ctx_free(struct io_ring_ctx *ctx)
 	mmdrop(ctx->sqo_mm);
 	put_files_struct(ctx->sqo_files);
 
+	io_iopoll_reap_events(ctx);
+
 	io_mem_free(ctx->sq_ring);
 	io_mem_free(ctx->sq_sqes);
 	io_mem_free(ctx->cq_ring);
@@ -910,6 +1136,7 @@  static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
 	percpu_ref_kill(&ctx->refs);
 	mutex_unlock(&ctx->uring_lock);
 
+	io_iopoll_reap_events(ctx);
 	wait_for_completion(&ctx->ctx_done);
 	io_ring_ctx_free(ctx);
 }
@@ -1131,7 +1358,7 @@  static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
 			return -EINVAL;
 	}
 
-	if (p.flags)
+	if (p.flags & ~IORING_SETUP_IOPOLL)
 		return -EINVAL;
 
 	ret = io_uring_create(entries, &p);
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index 0fca46f8fc37..4952fc921866 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -30,6 +30,11 @@  struct io_uring_sqe {
 	__u64	__pad2[3];
 };
 
+/*
+ * io_uring_setup() flags
+ */
+#define IORING_SETUP_IOPOLL	(1U << 0)	/* io_context is polled */
+
 #define IORING_OP_NOP		0
 #define IORING_OP_READV		1
 #define IORING_OP_WRITEV	2