
[15/26] aio: support for IO polling

Message ID 20181204233729.26776-16-axboe@kernel.dk (mailing list archive)
State New, archived
Series [01/26] fs: add an iopoll method to struct file_operations

Commit Message

Jens Axboe Dec. 4, 2018, 11:37 p.m. UTC
Add polled variants of PREAD/PREADV and PWRITE/PWRITEV. These act
like their non-polled counterparts, except that we expect to poll for
their completion. The polling happens at io_getevents() time, and
otherwise works just like non-polled IO.

To set up an io_context for polled IO, the application must call
io_setup2() with IOCTX_FLAG_IOPOLL as one of the flags. It is illegal
to mix and match polled and non-polled IO on an io_context.

Polled IO doesn't support the user-mapped completion ring. Events
must be reaped through the io_getevents() system call. For non-IRQ-
driven polled devices, there's no way to support completion reaping
from userspace by just looking at the ring; the application itself
has to pull the completion entries.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/aio.c                     | 393 +++++++++++++++++++++++++++++++----
 include/uapi/linux/aio_abi.h |   3 +
 2 files changed, 360 insertions(+), 36 deletions(-)
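
For illustration, a minimal userspace sketch of the flow described above.
It assumes the io_setup2() syscall wiring added earlier in this series;
__NR_io_setup2, its argument order, and the /dev/nvme0n1 path are
assumptions for the example rather than a tested interface.
IOCTX_FLAG_IOPOLL and IOCB_FLAG_HIPRI are the new flags from this patch's
aio_abi.h hunk.

	#define _GNU_SOURCE		/* O_DIRECT */
	#include <fcntl.h>
	#include <stdio.h>
	#include <string.h>
	#include <unistd.h>
	#include <sys/syscall.h>
	#include <linux/aio_abi.h>	/* patched header: IOCTX_FLAG_IOPOLL, IOCB_FLAG_HIPRI */

	int main(void)
	{
		aio_context_t ctx = 0;
		struct iocb iocb, *iocbs[1] = { &iocb };
		struct io_event ev;
		static char buf[4096] __attribute__((aligned(4096)));
		int fd;

		/* polled IO requires O_DIRECT and an ->iopoll capable device */
		fd = open("/dev/nvme0n1", O_RDONLY | O_DIRECT);
		if (fd < 0)
			return 1;

		/*
		 * __NR_io_setup2 comes from earlier in this series; the
		 * argument order here is an assumption based on the
		 * SYSCALL_DEFINE6() in the diff.
		 */
		if (syscall(__NR_io_setup2, 1, IOCTX_FLAG_IOPOLL, NULL, NULL,
			    NULL, &ctx) < 0)
			return 1;

		memset(&iocb, 0, sizeof(iocb));
		iocb.aio_fildes = fd;
		iocb.aio_lio_opcode = IOCB_CMD_PREAD;
		iocb.aio_buf = (__u64)(unsigned long) buf;
		iocb.aio_nbytes = sizeof(buf);
		iocb.aio_offset = 0;
		iocb.aio_flags = IOCB_FLAG_HIPRI;	/* poll for completion */

		if (syscall(__NR_io_submit, ctx, 1, iocbs) != 1)
			return 1;

		/* completion is found by actively polling inside io_getevents() */
		if (syscall(__NR_io_getevents, ctx, 1, 1, &ev, NULL) != 1)
			return 1;

		printf("res=%lld\n", (long long) ev.res);
		return 0;
	}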

Comments

Benny Halevy Dec. 5, 2018, 9:56 a.m. UTC | #1
On Tue, 2018-12-04 at 16:37 -0700, Jens Axboe wrote:
> Add polled variants of PREAD/PREADV and PWRITE/PWRITEV. These act
> like their non-polled counterparts, except we expect to poll for
> completion of them. The polling happens at io_getevent() time, and
> works just like non-polled IO.
> 
> To setup an io_context for polled IO, the application must call
> io_setup2() with IOCTX_FLAG_IOPOLL as one of the flags. It is illegal
> to mix and match polled and non-polled IO on an io_context.
> 
> Polled IO doesn't support the user mapped completion ring. Events
> must be reaped through the io_getevents() system call. For non-irq
> driven poll devices, there's no way to support completion reaping
> from userspace by just looking at the ring. The application itself
> is the one that pulls completion entries.
> 
> Signed-off-by: Jens Axboe <axboe@kernel.dk>
> ---
>  fs/aio.c                     | 393 +++++++++++++++++++++++++++++++----
>  include/uapi/linux/aio_abi.h |   3 +
>  2 files changed, 360 insertions(+), 36 deletions(-)
> 
> diff --git a/fs/aio.c b/fs/aio.c
> index bb6f07ca6940..5d34317c2929 100644
> --- a/fs/aio.c
> +++ b/fs/aio.c
> @@ -153,6 +153,18 @@ struct kioctx {
>  		atomic_t	reqs_available;
>  	} ____cacheline_aligned_in_smp;
>  
> +	/* iopoll submission state */
> +	struct {
> +		spinlock_t poll_lock;
> +		struct list_head poll_submitted;
> +	} ____cacheline_aligned_in_smp;
> +
> +	/* iopoll completion state */
> +	struct {
> +		struct list_head poll_completing;
> +		struct mutex getevents_lock;
> +	} ____cacheline_aligned_in_smp;
> +
>  	struct {
>  		spinlock_t	ctx_lock;
>  		struct list_head active_reqs;	/* used for cancellation */
> @@ -205,14 +217,27 @@ struct aio_kiocb {
>  	__u64			ki_user_data;	/* user's data for completion */
>  
>  	struct list_head	ki_list;	/* the aio core uses this
> -						 * for cancellation */
> +						 * for cancellation, or for
> +						 * polled IO */
> +
> +	unsigned long		ki_flags;
> +#define IOCB_POLL_COMPLETED	0	/* polled IO has completed */
> +#define IOCB_POLL_EAGAIN	1	/* polled submission got EAGAIN */
> +
>  	refcount_t		ki_refcnt;
>  
> -	/*
> -	 * If the aio_resfd field of the userspace iocb is not zero,
> -	 * this is the underlying eventfd context to deliver events to.
> -	 */
> -	struct eventfd_ctx	*ki_eventfd;
> +	union {
> +		/*
> +		 * If the aio_resfd field of the userspace iocb is not zero,
> +		 * this is the underlying eventfd context to deliver events to.
> +		 */
> +		struct eventfd_ctx	*ki_eventfd;
> +
> +		/*
> +		 * For polled IO, stash completion info here
> +		 */
> +		struct io_event		ki_ev;
> +	};
>  };
>  
>  /*------ sysctl variables----*/
> @@ -233,6 +258,7 @@ static const unsigned int iocb_page_shift =
>  				ilog2(PAGE_SIZE / sizeof(struct iocb));
>  
>  static void aio_useriocb_unmap(struct kioctx *);
> +static void aio_iopoll_reap_events(struct kioctx *);
>  
>  static struct file *aio_private_file(struct kioctx *ctx, loff_t nr_pages)
>  {
> @@ -471,11 +497,15 @@ static int aio_setup_ring(struct kioctx *ctx, unsigned int nr_events)
>  	int i;
>  	struct file *file;
>  
> -	/* Compensate for the ring buffer's head/tail overlap entry */
> -	nr_events += 2;	/* 1 is required, 2 for good luck */
> -
> +	/*
> +	 * Compensate for the ring buffer's head/tail overlap entry.
> +	 * IO polling doesn't require any io event entries
> +	 */
>  	size = sizeof(struct aio_ring);
> -	size += sizeof(struct io_event) * nr_events;
> +	if (!(ctx->flags & IOCTX_FLAG_IOPOLL)) {
> +		nr_events += 2;	/* 1 is required, 2 for good luck */
> +		size += sizeof(struct io_event) * nr_events;
> +	}
>  
>  	nr_pages = PFN_UP(size);
>  	if (nr_pages < 0)
> @@ -758,6 +788,11 @@ static struct kioctx *io_setup_flags(unsigned long ctxid,
>  
>  	INIT_LIST_HEAD(&ctx->active_reqs);
>  
> +	spin_lock_init(&ctx->poll_lock);
> +	INIT_LIST_HEAD(&ctx->poll_submitted);
> +	INIT_LIST_HEAD(&ctx->poll_completing);
> +	mutex_init(&ctx->getevents_lock);
> +
>  	if (percpu_ref_init(&ctx->users, free_ioctx_users, 0, GFP_KERNEL))
>  		goto err;
>  
> @@ -829,11 +864,15 @@ static int kill_ioctx(struct mm_struct *mm, struct kioctx *ctx,
>  {
>  	struct kioctx_table *table;
>  
> +	mutex_lock(&ctx->getevents_lock);
>  	spin_lock(&mm->ioctx_lock);
>  	if (atomic_xchg(&ctx->dead, 1)) {
>  		spin_unlock(&mm->ioctx_lock);
> +		mutex_unlock(&ctx->getevents_lock);
>  		return -EINVAL;
>  	}
> +	aio_iopoll_reap_events(ctx);
> +	mutex_unlock(&ctx->getevents_lock);
>  
>  	table = rcu_dereference_raw(mm->ioctx_table);
>  	WARN_ON(ctx != rcu_access_pointer(table->table[ctx->id]));
> @@ -1042,6 +1081,7 @@ static inline struct aio_kiocb *aio_get_req(struct kioctx *ctx)
>  	percpu_ref_get(&ctx->reqs);
>  	req->ki_ctx = ctx;
>  	INIT_LIST_HEAD(&req->ki_list);
> +	req->ki_flags = 0;
>  	refcount_set(&req->ki_refcnt, 0);
>  	req->ki_eventfd = NULL;
>  	return req;
> @@ -1083,6 +1123,15 @@ static inline void iocb_put(struct aio_kiocb *iocb)
>  	}
>  }
>  
> +static void iocb_put_many(struct kioctx *ctx, void **iocbs, int *nr)
> +{
> +	if (*nr) {
> +		percpu_ref_put_many(&ctx->reqs, *nr);
> +		kmem_cache_free_bulk(kiocb_cachep, *nr, iocbs);
> +		*nr = 0;
> +	}
> +}
> +
>  static void aio_fill_event(struct io_event *ev, struct aio_kiocb *iocb,
>  			   long res, long res2)
>  {
> @@ -1272,6 +1321,182 @@ static bool aio_read_events(struct kioctx *ctx, long min_nr, long nr,
>  	return ret < 0 || *i >= min_nr;
>  }
>  
> +#define AIO_IOPOLL_BATCH	8
> +
> +/*
> + * Process completed iocb iopoll entries, copying the result to userspace.
> + */
> +static long aio_iopoll_reap(struct kioctx *ctx, struct io_event __user *evs,
> +			    unsigned int *nr_events, long max)
> +{
> +	void *iocbs[AIO_IOPOLL_BATCH];
> +	struct aio_kiocb *iocb, *n;
> +	int to_free = 0, ret = 0;
> +
> +	/* Shouldn't happen... */
> +	if (*nr_events >= max)
> +		return 0;
> +
> +	list_for_each_entry_safe(iocb, n, &ctx->poll_completing, ki_list) {
> +		if (*nr_events == max)
> +			break;
> +		if (!test_bit(IOCB_POLL_COMPLETED, &iocb->ki_flags))
> +			continue;
> +		if (to_free == AIO_IOPOLL_BATCH)
> +			iocb_put_many(ctx, iocbs, &to_free);
> +
> +		list_del(&iocb->ki_list);
> +		iocbs[to_free++] = iocb;
> +
> +		fput(iocb->rw.ki_filp);
> +
> +		if (evs && copy_to_user(evs + *nr_events, &iocb->ki_ev,
> +		    sizeof(iocb->ki_ev))) {
> +			ret = -EFAULT;
> +			break;
> +		}
> +		(*nr_events)++;
> +	}
> +
> +	if (to_free)
> +		iocb_put_many(ctx, iocbs, &to_free);
> +
> +	return ret;
> +}
> +
> +static int aio_iopoll_getevents(struct kioctx *ctx,
> +				struct io_event __user *event,
> +				unsigned int *nr_events, long min, long max)
> +{
> +	struct aio_kiocb *iocb;
> +	int to_poll, polled, ret;
> +
> +	/*
> +	 * Check if we already have done events that satisfy what we need
> +	 */
> +	if (!list_empty(&ctx->poll_completing)) {
> +		ret = aio_iopoll_reap(ctx, event, nr_events, max);
> +		if (ret < 0)
> +			return ret;
> +		/* if min is zero, still go through a poll check */

A function-level comment about that would be more visible.

> +		if (min && *nr_events >= min)
> 
> +			return 0;

if ((min && *nr_events >= min) || *nr_events >= max) ?

Otherwise, if poll_completing or poll_submitted are not empty,
besides getting to the "Shouldn't happen" case in aio_iopoll_reap,
it looks like we're gonna waste some cycles and never call f_op->iopoll

> +	}
> +
> +	/*
> +	 * Take in a new working set from the submitted list, if possible.
> +	 */
> +	if (!list_empty_careful(&ctx->poll_submitted)) {
> +		spin_lock(&ctx->poll_lock);
> +		list_splice_init(&ctx->poll_submitted, &ctx->poll_completing);
> +		spin_unlock(&ctx->poll_lock);
> +	}
> +
> +	if (list_empty(&ctx->poll_completing))
> +		return 0;
> +
> +	/*
> +	 * Check again now that we have a new batch.
> +	 */
> +	ret = aio_iopoll_reap(ctx, event, nr_events, max);
> +	if (ret < 0)
> +		return ret;
> +	/* if min is zero, still go through a poll check */
> +	if (min && *nr_events >= min)
> +		return 0;

ditto

> +
> +	/*
> +	 * Find up to 'max' worth of events to poll for, including the
> +	 * events we already successfully polled
> +	 */
> +	polled = to_poll = 0;
> +	list_for_each_entry(iocb, &ctx->poll_completing, ki_list) {
> +		/*
> +		 * Poll for needed events with spin == true, anything after
> +		 * that we just check if we have more, up to max.
> +		 */
> +		bool spin = polled + *nr_events >= min;

I'm confused. Shouldn't spin == true if polled + *nr_events < min?

> +		struct kiocb *kiocb = &iocb->rw;
> +
> +		if (test_bit(IOCB_POLL_COMPLETED, &iocb->ki_flags))
> +			break;
> +		if (++to_poll + *nr_events > max)
> +			break;
> +
> +		ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
> +		if (ret < 0)
> +			return ret;
> +
> +		polled += ret;
> +		if (polled + *nr_events >= max)
> +			break;
> +	}
> +
> +	ret = aio_iopoll_reap(ctx, event, nr_events, max);
> +	if (ret < 0)
> +		return ret;
> +	if (*nr_events >= min)
> +		return 0;
> +	return to_poll;
> +}
> +
> +/*
> + * We can't just wait for polled events to come to us, we have to actively
> + * find and complete them.
> + */
> +static void aio_iopoll_reap_events(struct kioctx *ctx)
> +{
> +	if (!(ctx->flags & IOCTX_FLAG_IOPOLL))
> +		return;
> +
> +	while (!list_empty_careful(&ctx->poll_submitted) ||
> +	       !list_empty(&ctx->poll_completing)) {
> +		unsigned int nr_events = 0;
> +
> +		aio_iopoll_getevents(ctx, NULL, &nr_events, 1, UINT_MAX);
> +	}
> +}
> +
> +static int __aio_iopoll_check(struct kioctx *ctx, struct io_event __user *event,
> +			      unsigned int *nr_events, long min_nr, long max_nr)
> +{
> +	int ret = 0;
> +
> +	while (!*nr_events || !need_resched()) {
> +		int tmin = 0;
> +
> +		if (*nr_events < min_nr)
> +			tmin = min_nr - *nr_events;

Otherwise, shouldn't the function just return 0?

Or perhaps you explicitly want to go through the tmin==0 case
in aio_iopoll_getevents if *nr_events == min_nr (or min_nr==0)?

> +
> +		ret = aio_iopoll_getevents(ctx, event, nr_events, tmin, max_nr);
> +		if (ret <= 0)
> +			break;
> +		ret = 0;
> +	}
> +
> +	return ret;
> +}
> +
> +static int aio_iopoll_check(struct kioctx *ctx, long min_nr, long nr,
> +			    struct io_event __user *event)
> +{
> +	unsigned int nr_events = 0;
> +	int ret;
> +
> +	/* Only allow one thread polling at a time */
> +	if (!mutex_trylock(&ctx->getevents_lock))
> +		return -EBUSY;
> +	if (unlikely(atomic_read(&ctx->dead))) {
> +		ret = -EINVAL;
> +		goto err;
> +	}
> +
> +	ret = __aio_iopoll_check(ctx, event, &nr_events, min_nr, nr);
> +err:
> +	mutex_unlock(&ctx->getevents_lock);
> +	return nr_events ? nr_events : ret;
> +}
> +
>  static long read_events(struct kioctx *ctx, long min_nr, long nr,
>  			struct io_event __user *event,
>  			ktime_t until)
> @@ -1375,7 +1600,7 @@ SYSCALL_DEFINE6(io_setup2, u32, nr_events, u32, flags, struct iocb __user *,
>  
>  	if (user1 || user2)
>  		return -EINVAL;
> -	if (flags & ~IOCTX_FLAG_USERIOCB)
> +	if (flags & ~(IOCTX_FLAG_USERIOCB | IOCTX_FLAG_IOPOLL))
>  		return -EINVAL;
>  
>  	ret = get_user(ctx, ctxp);
> @@ -1509,13 +1734,8 @@ static void aio_remove_iocb(struct aio_kiocb *iocb)
>  	spin_unlock_irqrestore(&ctx->ctx_lock, flags);
>  }
>  
> -static void aio_complete_rw(struct kiocb *kiocb, long res, long res2)
> +static void kiocb_end_write(struct kiocb *kiocb)
>  {
> -	struct aio_kiocb *iocb = container_of(kiocb, struct aio_kiocb, rw);
> -
> -	if (!list_empty_careful(&iocb->ki_list))
> -		aio_remove_iocb(iocb);
> -
>  	if (kiocb->ki_flags & IOCB_WRITE) {
>  		struct inode *inode = file_inode(kiocb->ki_filp);
>  
> @@ -1527,19 +1747,48 @@ static void aio_complete_rw(struct kiocb *kiocb, long res, long res2)
>  			__sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
>  		file_end_write(kiocb->ki_filp);
>  	}
> +}
> +
> +static void aio_complete_rw(struct kiocb *kiocb, long res, long res2)
> +{
> +	struct aio_kiocb *iocb = container_of(kiocb, struct aio_kiocb, rw);
> +
> +	if (!list_empty_careful(&iocb->ki_list))
> +		aio_remove_iocb(iocb);
> +
> +	kiocb_end_write(kiocb);
>  
>  	fput(kiocb->ki_filp);
>  	aio_complete(iocb, res, res2);
>  }
>  
> -static int aio_prep_rw(struct kiocb *req, const struct iocb *iocb)
> +static void aio_complete_rw_poll(struct kiocb *kiocb, long res, long res2)
>  {
> +	struct aio_kiocb *iocb = container_of(kiocb, struct aio_kiocb, rw);
> +
> +	kiocb_end_write(kiocb);
> +
> +	/*
> +	 * Handle EAGAIN from resource limits with polled IO inline, don't
> +	 * pass the event back to userspace.
> +	 */
> +	if (unlikely(res == -EAGAIN))
> +		set_bit(IOCB_POLL_EAGAIN, &iocb->ki_flags);
> +	else {
> +		aio_fill_event(&iocb->ki_ev, iocb, res, res2);
> +		set_bit(IOCB_POLL_COMPLETED, &iocb->ki_flags);
> +	}
> +}
> +
> +static int aio_prep_rw(struct aio_kiocb *kiocb, const struct iocb *iocb)
> +{
> +	struct kioctx *ctx = kiocb->ki_ctx;
> +	struct kiocb *req = &kiocb->rw;
>  	int ret;
>  
>  	req->ki_filp = fget(iocb->aio_fildes);
>  	if (unlikely(!req->ki_filp))
>  		return -EBADF;
> -	req->ki_complete = aio_complete_rw;
>  	req->ki_pos = iocb->aio_offset;
>  	req->ki_flags = iocb_flags(req->ki_filp);
>  	if (iocb->aio_flags & IOCB_FLAG_RESFD)
> @@ -1565,9 +1814,35 @@ static int aio_prep_rw(struct kiocb *req, const struct iocb *iocb)
>  	if (unlikely(ret))
>  		goto out_fput;
>  
> -	req->ki_flags &= ~IOCB_HIPRI; /* no one is going to poll for this I/O */
> -	return 0;
> +	if (iocb->aio_flags & IOCB_FLAG_HIPRI) {
> +		/* shares space in the union, and is rather pointless.. */
> +		ret = -EINVAL;
> +		if (iocb->aio_flags & IOCB_FLAG_RESFD)
> +			goto out_fput;
> +
> +		/* can't submit polled IO to a non-polled ctx */
> +		if (!(ctx->flags & IOCTX_FLAG_IOPOLL))
> +			goto out_fput;
> +
> +		ret = -EOPNOTSUPP;
> +		if (!(req->ki_flags & IOCB_DIRECT) ||
> +		    !req->ki_filp->f_op->iopoll)
> +			goto out_fput;
> +
> +		req->ki_flags |= IOCB_HIPRI;
> +		req->ki_complete = aio_complete_rw_poll;
> +	} else {
> +		/* can't submit non-polled IO to a polled ctx */
> +		ret = -EINVAL;
> +		if (ctx->flags & IOCTX_FLAG_IOPOLL)
> +			goto out_fput;
>  
> +		/* no one is going to poll for this I/O */
> +		req->ki_flags &= ~IOCB_HIPRI;
> +		req->ki_complete = aio_complete_rw;
> +	}
> +
> +	return 0;
>  out_fput:
>  	fput(req->ki_filp);
>  	return ret;
> @@ -1612,15 +1887,40 @@ static inline void aio_rw_done(struct kiocb *req, ssize_t ret)
>  	}
>  }
>  
> -static ssize_t aio_read(struct kiocb *req, const struct iocb *iocb,
> +/*
> + * After the iocb has been issued, it's safe to be found on the poll list.
> + * Adding the kiocb to the list AFTER submission ensures that we don't
> + * find it from a io_getevents() thread before the issuer is done accessing
> + * the kiocb cookie.
> + */
> +static void aio_iopoll_iocb_issued(struct aio_kiocb *kiocb)
> +{
> +	/*
> +	 * For fast devices, IO may have already completed. If it has, add
> +	 * it to the front so we find it first. We can't add to the poll_done
> +	 * list as that's unlocked from the completion side.
> +	 */
> +	const int front_add = test_bit(IOCB_POLL_COMPLETED, &kiocb->ki_flags);
> +	struct kioctx *ctx = kiocb->ki_ctx;
> +
> +	spin_lock(&ctx->poll_lock);
> +	if (front_add)
> +		list_add(&kiocb->ki_list, &ctx->poll_submitted);
> +	else
> +		list_add_tail(&kiocb->ki_list, &ctx->poll_submitted);
> +	spin_unlock(&ctx->poll_lock);
> +}
> +
> +static ssize_t aio_read(struct aio_kiocb *kiocb, const struct iocb *iocb,
>  			bool vectored, bool compat)
>  {
>  	struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
> +	struct kiocb *req = &kiocb->rw;
>  	struct iov_iter iter;
>  	struct file *file;
>  	ssize_t ret;
>  
> -	ret = aio_prep_rw(req, iocb);
> +	ret = aio_prep_rw(kiocb, iocb);
>  	if (ret)
>  		return ret;
>  	file = req->ki_filp;
> @@ -1645,15 +1945,16 @@ static ssize_t aio_read(struct kiocb *req, const struct iocb *iocb,
>  	return ret;
>  }
>  
> -static ssize_t aio_write(struct kiocb *req, const struct iocb *iocb,
> +static ssize_t aio_write(struct aio_kiocb *kiocb, const struct iocb *iocb,
>  			 bool vectored, bool compat)
>  {
>  	struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
> +	struct kiocb *req = &kiocb->rw;
>  	struct iov_iter iter;
>  	struct file *file;
>  	ssize_t ret;
>  
> -	ret = aio_prep_rw(req, iocb);
> +	ret = aio_prep_rw(kiocb, iocb);
>  	if (ret)
>  		return ret;
>  	file = req->ki_filp;
> @@ -1924,7 +2225,8 @@ static int __io_submit_one(struct kioctx *ctx, const struct iocb *iocb,
>  		return -EINVAL;
>  	}
>  
> -	if (!get_reqs_available(ctx))
> +	/* Poll IO doesn't need ring reservations */
> +	if (!(ctx->flags & IOCTX_FLAG_IOPOLL) && !get_reqs_available(ctx))
>  		return -EAGAIN;
>  
>  	ret = -EAGAIN;
> @@ -1947,8 +2249,8 @@ static int __io_submit_one(struct kioctx *ctx, const struct iocb *iocb,
>  		}
>  	}
>  
> -	/* Don't support cancel on user mapped iocbs */
> -	if (!(ctx->flags & IOCTX_FLAG_USERIOCB)) {
> +	/* Don't support cancel on user mapped iocbs or polled context */
> +	if (!(ctx->flags & (IOCTX_FLAG_USERIOCB | IOCTX_FLAG_IOPOLL))) {
>  		ret = put_user(KIOCB_KEY, &user_iocb->aio_key);
>  		if (unlikely(ret)) {
>  			pr_debug("EFAULT: aio_key\n");
> @@ -1959,26 +2261,33 @@ static int __io_submit_one(struct kioctx *ctx, const struct iocb *iocb,
>  	req->ki_user_iocb = user_iocb;
>  	req->ki_user_data = iocb->aio_data;
>  
> +	ret = -EINVAL;
>  	switch (iocb->aio_lio_opcode) {
>  	case IOCB_CMD_PREAD:
> -		ret = aio_read(&req->rw, iocb, false, compat);
> +		ret = aio_read(req, iocb, false, compat);
>  		break;
>  	case IOCB_CMD_PWRITE:
> -		ret = aio_write(&req->rw, iocb, false, compat);
> +		ret = aio_write(req, iocb, false, compat);
>  		break;
>  	case IOCB_CMD_PREADV:
> -		ret = aio_read(&req->rw, iocb, true, compat);
> +		ret = aio_read(req, iocb, true, compat);
>  		break;
>  	case IOCB_CMD_PWRITEV:
> -		ret = aio_write(&req->rw, iocb, true, compat);
> +		ret = aio_write(req, iocb, true, compat);
>  		break;
>  	case IOCB_CMD_FSYNC:
> +		if (ctx->flags & IOCTX_FLAG_IOPOLL)
> +			break;
>  		ret = aio_fsync(&req->fsync, iocb, false);
>  		break;
>  	case IOCB_CMD_FDSYNC:
> +		if (ctx->flags & IOCTX_FLAG_IOPOLL)
> +			break;
>  		ret = aio_fsync(&req->fsync, iocb, true);
>  		break;
>  	case IOCB_CMD_POLL:
> +		if (ctx->flags & IOCTX_FLAG_IOPOLL)
> +			break;
>  		ret = aio_poll(req, iocb);
>  		break;
>  	default:
> @@ -1994,13 +2303,21 @@ static int __io_submit_one(struct kioctx *ctx, const struct iocb *iocb,
>  	 */
>  	if (ret)
>  		goto out_put_req;
> +	if (ctx->flags & IOCTX_FLAG_IOPOLL) {
> +		if (test_bit(IOCB_POLL_EAGAIN, &req->ki_flags)) {
> +			ret = -EAGAIN;
> +			goto out_put_req;
> +		}
> +		aio_iopoll_iocb_issued(req);
> +	}
>  	return 0;
>  out_put_req:
>  	if (req->ki_eventfd)
>  		eventfd_ctx_put(req->ki_eventfd);
>  	iocb_put(req);
>  out_put_reqs_available:
> -	put_reqs_available(ctx, 1);
> +	if (!(ctx->flags & IOCTX_FLAG_IOPOLL))
> +		put_reqs_available(ctx, 1);
>  	return ret;
>  }
>  
> @@ -2166,7 +2483,7 @@ SYSCALL_DEFINE3(io_cancel, aio_context_t, ctx_id, struct iocb __user *, iocb,
>  	if (unlikely(!ctx))
>  		return -EINVAL;
>  
> -	if (ctx->flags & IOCTX_FLAG_USERIOCB)
> +	if (ctx->flags & (IOCTX_FLAG_USERIOCB | IOCTX_FLAG_IOPOLL))
>  		goto err;
>  
>  	spin_lock_irq(&ctx->ctx_lock);
> @@ -2201,8 +2518,12 @@ static long do_io_getevents(aio_context_t ctx_id,
>  	long ret = -EINVAL;
>  
>  	if (likely(ioctx)) {
> -		if (likely(min_nr <= nr && min_nr >= 0))
> -			ret = read_events(ioctx, min_nr, nr, events, until);
> +		if (likely(min_nr <= nr && min_nr >= 0)) {
> +			if (ioctx->flags & IOCTX_FLAG_IOPOLL)
> +				ret = aio_iopoll_check(ioctx, min_nr, nr, events);
> +			else
> +				ret = read_events(ioctx, min_nr, nr, events, until);
> +		}
>  		percpu_ref_put(&ioctx->users);
>  	}
>  
> diff --git a/include/uapi/linux/aio_abi.h b/include/uapi/linux/aio_abi.h
> index 814e6606c413..ea0b9a19f4df 100644
> --- a/include/uapi/linux/aio_abi.h
> +++ b/include/uapi/linux/aio_abi.h
> @@ -52,9 +52,11 @@ enum {
>   *                   is valid.
>   * IOCB_FLAG_IOPRIO - Set if the "aio_reqprio" member of the "struct iocb"
>   *                    is valid.
> + * IOCB_FLAG_HIPRI - Use IO completion polling
>   */
>  #define IOCB_FLAG_RESFD		(1 << 0)
>  #define IOCB_FLAG_IOPRIO	(1 << 1)
> +#define IOCB_FLAG_HIPRI		(1 << 2)
>  
>  /* read() from /dev/aio returns these structures. */
>  struct io_event {
> @@ -107,6 +109,7 @@ struct iocb {
>  }; /* 64 bytes */
>  
>  #define IOCTX_FLAG_USERIOCB	(1 << 0)	/* iocbs are user mapped */
> +#define IOCTX_FLAG_IOPOLL	(1 << 1)	/* io_context is polled */
>  
>  #undef IFBIG
>  #undef IFLITTLE
Jens Axboe Dec. 5, 2018, 1:10 p.m. UTC | #2
On 12/5/18 2:56 AM, Benny Halevy wrote:
>> +static int aio_iopoll_getevents(struct kioctx *ctx,
>> +				struct io_event __user *event,
>> +				unsigned int *nr_events, long min, long max)
>> +{
>> +	struct aio_kiocb *iocb;
>> +	int to_poll, polled, ret;
>> +
>> +	/*
>> +	 * Check if we already have done events that satisfy what we need
>> +	 */
>> +	if (!list_empty(&ctx->poll_completing)) {
>> +		ret = aio_iopoll_reap(ctx, event, nr_events, max);
>> +		if (ret < 0)
>> +			return ret;
>> +		/* if min is zero, still go through a poll check */
> 
> A function-level comment about that would be more visible.

Yes, that might be a better idea.

>> +		if (min && *nr_events >= min)
>>
>> +			return 0;
> 
> if ((min && *nr_events >= min) || *nr_events >= max) ?
> 
> Otherwise, if poll_completing or poll_submitted are not empty,
> besides getting to the "Shouldn't happen" case in aio_iopoll_reap,
> it looks like we're gonna waste some cycles and never call f_op->iopoll

Agree, that would be a better place to catch it. I'll make those two
changes, thanks!
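
IOW, the early exit in aio_iopoll_getevents() presumably ends up looking
something like this (sketch of the agreed change, not necessarily what
gets committed):

	/* stop once the caller's min (if any) or max has been satisfied */
	if ((min && *nr_events >= min) || *nr_events >= max)
		return 0;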

>> +
>> +	/*
>> +	 * Find up to 'max' worth of events to poll for, including the
>> +	 * events we already successfully polled
>> +	 */
>> +	polled = to_poll = 0;
>> +	list_for_each_entry(iocb, &ctx->poll_completing, ki_list) {
>> +		/*
>> +		 * Poll for needed events with spin == true, anything after
>> +		 * that we just check if we have more, up to max.
>> +		 */
>> +		bool spin = polled + *nr_events >= min;
> 
> I'm confused. Shouldn't spin == true if polled + *nr_events < min?

Heh, that does look off, and it is. I think the correct condition is:

bool spin = !polled || *nr_events < min;

and I'm not sure why that got broken. I just tested it, slight
performance win as expected correcting this. It ties in with the above
nr_events check - it's perfectly valid to pass min_nr == 0 and just do a
poll check. Conversely, if we already spun, just do non-spin checks on
followup poll checks.
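
With that condition the polling loop would presumably read (sketch only,
not necessarily what gets committed):

	list_for_each_entry(iocb, &ctx->poll_completing, ki_list) {
		/*
		 * Spin while we haven't found any completion yet, or while
		 * we're still short of min; after that, only do
		 * opportunistic non-spin checks up to max.
		 */
		bool spin = !polled || *nr_events < min;
		struct kiocb *kiocb = &iocb->rw;

		if (test_bit(IOCB_POLL_COMPLETED, &iocb->ki_flags))
			break;
		if (++to_poll + *nr_events > max)
			break;

		ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
		if (ret < 0)
			return ret;

		polled += ret;
		if (polled + *nr_events >= max)
			break;
	}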

>> +static int __aio_iopoll_check(struct kioctx *ctx, struct io_event __user *event,
>> +			      unsigned int *nr_events, long min_nr, long max_nr)
>> +{
>> +	int ret = 0;
>> +
>> +	while (!*nr_events || !need_resched()) {
>> +		int tmin = 0;
>> +
>> +		if (*nr_events < min_nr)
>> +			tmin = min_nr - *nr_events;
> 
> Otherwise, shouldn't the function just return 0?
> 
> Or perhaps you explicitly want to go through the tmin==0 case
> in aio_iopoll_getevents if *nr_events == min_nr (or min_nr==0)?

No, we need to go through poll, if min_nr == 0, but only if min_nr == 0
and !nr_events. But we only get here for nr_events != 0, so should be
fine as-is.
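
To spell that out against the loop in question (annotated copy of the
code from this patch, no new logic):

	while (!*nr_events || !need_resched()) {
		int tmin = 0;

		/* still short of min_nr: ask getevents to find the rest */
		if (*nr_events < min_nr)
			tmin = min_nr - *nr_events;

		/*
		 * tmin == 0 (min_nr == 0, or min_nr already met) still does
		 * one poll pass rather than returning early.
		 */
		ret = aio_iopoll_getevents(ctx, event, nr_events, tmin, max_nr);
		if (ret <= 0)
			break;
		ret = 0;
	}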

Thanks for your review, very useful! I'll make the above changes right
now.
Benny Halevy Dec. 6, 2018, 9:04 a.m. UTC | #3
On Wed, 2018-12-05 at 06:10 -0700, Jens Axboe wrote:
> On 12/5/18 2:56 AM, Benny Halevy wrote:
> > > +static int aio_iopoll_getevents(struct kioctx *ctx,
> > > +				struct io_event __user *event,
> > > +				unsigned int *nr_events, long min, long max)
> > > +{
> > > +	struct aio_kiocb *iocb;
> > > +	int to_poll, polled, ret;
> > > +
> > > +	/*
> > > +	 * Check if we already have done events that satisfy what we need
> > > +	 */
> > > +	if (!list_empty(&ctx->poll_completing)) {
> > > +		ret = aio_iopoll_reap(ctx, event, nr_events, max);
> > > +		if (ret < 0)
> > > +			return ret;
> > > +		/* if min is zero, still go through a poll check */
> > 
> > A function-level comment about that would be more visible.
> 
> Yes, that might be a better idea.
> 
> > > +		if (min && *nr_events >= min)
> > > 
> > > +			return 0;
> > 
> > if ((min && *nr_events >= min) || *nr_events >= max) ?
> > 
> > Otherwise, if poll_completing or poll_submitted are not empty,
> > besides getting to the "Shouldn't happen" case in aio_iopoll_reap,
> > it looks like we're gonna waste some cycles and never call f_op->iopoll
> 
> Agree, that would be a better place to catch it. I'll make those two
> changes, thanks!
> 
> > > +
> > > +	/*
> > > +	 * Find up to 'max' worth of events to poll for, including the
> > > +	 * events we already successfully polled
> > > +	 */
> > > +	polled = to_poll = 0;
> > > +	list_for_each_entry(iocb, &ctx->poll_completing, ki_list) {
> > > +		/*
> > > +		 * Poll for needed events with spin == true, anything after
> > > +		 * that we just check if we have more, up to max.
> > > +		 */
> > > +		bool spin = polled + *nr_events >= min;
> > 
> > I'm confused. Shouldn't spin == true if polled + *nr_events < min?
> 
> Heh, that does look off, and it is. I think the correct condition is:
> 
> bool spin = !polled || *nr_events < min;
> 
> and I'm not sure why that got broken. I just tested it, slight
> performance win as expected correcting this. It ties in with the above
> nr_events check - it's perfectly valid to pass min_nr == 0 and just do a
> poll check. Conversely, if we already spun, just do non-spin checks on
> followup poll checks.
> 

Yep, that makes sense.

> > > +static int __aio_iopoll_check(struct kioctx *ctx, struct io_event __user *event,
> > > +			      unsigned int *nr_events, long min_nr, long max_nr)
> > > +{
> > > +	int ret = 0;
> > > +
> > > +	while (!*nr_events || !need_resched()) {
> > > +		int tmin = 0;
> > > +
> > > +		if (*nr_events < min_nr)
> > > +			tmin = min_nr - *nr_events;
> > 
> > Otherwise, shouldn't the function just return 0?
> > 
> > Or perhaps you explicitly want to go through the tmin==0 case
> > in aio_iopoll_getevents if *nr_events == min_nr (or min_nr==0)?
> 
> No, we need to go through poll, if min_nr == 0, but only if min_nr == 0
> and !nr_events. But we only get here for nr_events != 0, so should be
> fine as-is.
> 
> Thanks for your review, very useful! I'll make the above changes right
> now.
> 

Thanks!

Patch

diff --git a/fs/aio.c b/fs/aio.c
index bb6f07ca6940..5d34317c2929 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -153,6 +153,18 @@  struct kioctx {
 		atomic_t	reqs_available;
 	} ____cacheline_aligned_in_smp;
 
+	/* iopoll submission state */
+	struct {
+		spinlock_t poll_lock;
+		struct list_head poll_submitted;
+	} ____cacheline_aligned_in_smp;
+
+	/* iopoll completion state */
+	struct {
+		struct list_head poll_completing;
+		struct mutex getevents_lock;
+	} ____cacheline_aligned_in_smp;
+
 	struct {
 		spinlock_t	ctx_lock;
 		struct list_head active_reqs;	/* used for cancellation */
@@ -205,14 +217,27 @@  struct aio_kiocb {
 	__u64			ki_user_data;	/* user's data for completion */
 
 	struct list_head	ki_list;	/* the aio core uses this
-						 * for cancellation */
+						 * for cancellation, or for
+						 * polled IO */
+
+	unsigned long		ki_flags;
+#define IOCB_POLL_COMPLETED	0	/* polled IO has completed */
+#define IOCB_POLL_EAGAIN	1	/* polled submission got EAGAIN */
+
 	refcount_t		ki_refcnt;
 
-	/*
-	 * If the aio_resfd field of the userspace iocb is not zero,
-	 * this is the underlying eventfd context to deliver events to.
-	 */
-	struct eventfd_ctx	*ki_eventfd;
+	union {
+		/*
+		 * If the aio_resfd field of the userspace iocb is not zero,
+		 * this is the underlying eventfd context to deliver events to.
+		 */
+		struct eventfd_ctx	*ki_eventfd;
+
+		/*
+		 * For polled IO, stash completion info here
+		 */
+		struct io_event		ki_ev;
+	};
 };
 
 /*------ sysctl variables----*/
@@ -233,6 +258,7 @@  static const unsigned int iocb_page_shift =
 				ilog2(PAGE_SIZE / sizeof(struct iocb));
 
 static void aio_useriocb_unmap(struct kioctx *);
+static void aio_iopoll_reap_events(struct kioctx *);
 
 static struct file *aio_private_file(struct kioctx *ctx, loff_t nr_pages)
 {
@@ -471,11 +497,15 @@  static int aio_setup_ring(struct kioctx *ctx, unsigned int nr_events)
 	int i;
 	struct file *file;
 
-	/* Compensate for the ring buffer's head/tail overlap entry */
-	nr_events += 2;	/* 1 is required, 2 for good luck */
-
+	/*
+	 * Compensate for the ring buffer's head/tail overlap entry.
+	 * IO polling doesn't require any io event entries
+	 */
 	size = sizeof(struct aio_ring);
-	size += sizeof(struct io_event) * nr_events;
+	if (!(ctx->flags & IOCTX_FLAG_IOPOLL)) {
+		nr_events += 2;	/* 1 is required, 2 for good luck */
+		size += sizeof(struct io_event) * nr_events;
+	}
 
 	nr_pages = PFN_UP(size);
 	if (nr_pages < 0)
@@ -758,6 +788,11 @@  static struct kioctx *io_setup_flags(unsigned long ctxid,
 
 	INIT_LIST_HEAD(&ctx->active_reqs);
 
+	spin_lock_init(&ctx->poll_lock);
+	INIT_LIST_HEAD(&ctx->poll_submitted);
+	INIT_LIST_HEAD(&ctx->poll_completing);
+	mutex_init(&ctx->getevents_lock);
+
 	if (percpu_ref_init(&ctx->users, free_ioctx_users, 0, GFP_KERNEL))
 		goto err;
 
@@ -829,11 +864,15 @@  static int kill_ioctx(struct mm_struct *mm, struct kioctx *ctx,
 {
 	struct kioctx_table *table;
 
+	mutex_lock(&ctx->getevents_lock);
 	spin_lock(&mm->ioctx_lock);
 	if (atomic_xchg(&ctx->dead, 1)) {
 		spin_unlock(&mm->ioctx_lock);
+		mutex_unlock(&ctx->getevents_lock);
 		return -EINVAL;
 	}
+	aio_iopoll_reap_events(ctx);
+	mutex_unlock(&ctx->getevents_lock);
 
 	table = rcu_dereference_raw(mm->ioctx_table);
 	WARN_ON(ctx != rcu_access_pointer(table->table[ctx->id]));
@@ -1042,6 +1081,7 @@  static inline struct aio_kiocb *aio_get_req(struct kioctx *ctx)
 	percpu_ref_get(&ctx->reqs);
 	req->ki_ctx = ctx;
 	INIT_LIST_HEAD(&req->ki_list);
+	req->ki_flags = 0;
 	refcount_set(&req->ki_refcnt, 0);
 	req->ki_eventfd = NULL;
 	return req;
@@ -1083,6 +1123,15 @@  static inline void iocb_put(struct aio_kiocb *iocb)
 	}
 }
 
+static void iocb_put_many(struct kioctx *ctx, void **iocbs, int *nr)
+{
+	if (*nr) {
+		percpu_ref_put_many(&ctx->reqs, *nr);
+		kmem_cache_free_bulk(kiocb_cachep, *nr, iocbs);
+		*nr = 0;
+	}
+}
+
 static void aio_fill_event(struct io_event *ev, struct aio_kiocb *iocb,
 			   long res, long res2)
 {
@@ -1272,6 +1321,182 @@  static bool aio_read_events(struct kioctx *ctx, long min_nr, long nr,
 	return ret < 0 || *i >= min_nr;
 }
 
+#define AIO_IOPOLL_BATCH	8
+
+/*
+ * Process completed iocb iopoll entries, copying the result to userspace.
+ */
+static long aio_iopoll_reap(struct kioctx *ctx, struct io_event __user *evs,
+			    unsigned int *nr_events, long max)
+{
+	void *iocbs[AIO_IOPOLL_BATCH];
+	struct aio_kiocb *iocb, *n;
+	int to_free = 0, ret = 0;
+
+	/* Shouldn't happen... */
+	if (*nr_events >= max)
+		return 0;
+
+	list_for_each_entry_safe(iocb, n, &ctx->poll_completing, ki_list) {
+		if (*nr_events == max)
+			break;
+		if (!test_bit(IOCB_POLL_COMPLETED, &iocb->ki_flags))
+			continue;
+		if (to_free == AIO_IOPOLL_BATCH)
+			iocb_put_many(ctx, iocbs, &to_free);
+
+		list_del(&iocb->ki_list);
+		iocbs[to_free++] = iocb;
+
+		fput(iocb->rw.ki_filp);
+
+		if (evs && copy_to_user(evs + *nr_events, &iocb->ki_ev,
+		    sizeof(iocb->ki_ev))) {
+			ret = -EFAULT;
+			break;
+		}
+		(*nr_events)++;
+	}
+
+	if (to_free)
+		iocb_put_many(ctx, iocbs, &to_free);
+
+	return ret;
+}
+
+static int aio_iopoll_getevents(struct kioctx *ctx,
+				struct io_event __user *event,
+				unsigned int *nr_events, long min, long max)
+{
+	struct aio_kiocb *iocb;
+	int to_poll, polled, ret;
+
+	/*
+	 * Check if we already have done events that satisfy what we need
+	 */
+	if (!list_empty(&ctx->poll_completing)) {
+		ret = aio_iopoll_reap(ctx, event, nr_events, max);
+		if (ret < 0)
+			return ret;
+		/* if min is zero, still go through a poll check */
+		if (min && *nr_events >= min)
+			return 0;
+	}
+
+	/*
+	 * Take in a new working set from the submitted list, if possible.
+	 */
+	if (!list_empty_careful(&ctx->poll_submitted)) {
+		spin_lock(&ctx->poll_lock);
+		list_splice_init(&ctx->poll_submitted, &ctx->poll_completing);
+		spin_unlock(&ctx->poll_lock);
+	}
+
+	if (list_empty(&ctx->poll_completing))
+		return 0;
+
+	/*
+	 * Check again now that we have a new batch.
+	 */
+	ret = aio_iopoll_reap(ctx, event, nr_events, max);
+	if (ret < 0)
+		return ret;
+	/* if min is zero, still go through a poll check */
+	if (min && *nr_events >= min)
+		return 0;
+
+	/*
+	 * Find up to 'max' worth of events to poll for, including the
+	 * events we already successfully polled
+	 */
+	polled = to_poll = 0;
+	list_for_each_entry(iocb, &ctx->poll_completing, ki_list) {
+		/*
+		 * Poll for needed events with spin == true, anything after
+		 * that we just check if we have more, up to max.
+		 */
+		bool spin = polled + *nr_events >= min;
+		struct kiocb *kiocb = &iocb->rw;
+
+		if (test_bit(IOCB_POLL_COMPLETED, &iocb->ki_flags))
+			break;
+		if (++to_poll + *nr_events > max)
+			break;
+
+		ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
+		if (ret < 0)
+			return ret;
+
+		polled += ret;
+		if (polled + *nr_events >= max)
+			break;
+	}
+
+	ret = aio_iopoll_reap(ctx, event, nr_events, max);
+	if (ret < 0)
+		return ret;
+	if (*nr_events >= min)
+		return 0;
+	return to_poll;
+}
+
+/*
+ * We can't just wait for polled events to come to us, we have to actively
+ * find and complete them.
+ */
+static void aio_iopoll_reap_events(struct kioctx *ctx)
+{
+	if (!(ctx->flags & IOCTX_FLAG_IOPOLL))
+		return;
+
+	while (!list_empty_careful(&ctx->poll_submitted) ||
+	       !list_empty(&ctx->poll_completing)) {
+		unsigned int nr_events = 0;
+
+		aio_iopoll_getevents(ctx, NULL, &nr_events, 1, UINT_MAX);
+	}
+}
+
+static int __aio_iopoll_check(struct kioctx *ctx, struct io_event __user *event,
+			      unsigned int *nr_events, long min_nr, long max_nr)
+{
+	int ret = 0;
+
+	while (!*nr_events || !need_resched()) {
+		int tmin = 0;
+
+		if (*nr_events < min_nr)
+			tmin = min_nr - *nr_events;
+
+		ret = aio_iopoll_getevents(ctx, event, nr_events, tmin, max_nr);
+		if (ret <= 0)
+			break;
+		ret = 0;
+	}
+
+	return ret;
+}
+
+static int aio_iopoll_check(struct kioctx *ctx, long min_nr, long nr,
+			    struct io_event __user *event)
+{
+	unsigned int nr_events = 0;
+	int ret;
+
+	/* Only allow one thread polling at a time */
+	if (!mutex_trylock(&ctx->getevents_lock))
+		return -EBUSY;
+	if (unlikely(atomic_read(&ctx->dead))) {
+		ret = -EINVAL;
+		goto err;
+	}
+
+	ret = __aio_iopoll_check(ctx, event, &nr_events, min_nr, nr);
+err:
+	mutex_unlock(&ctx->getevents_lock);
+	return nr_events ? nr_events : ret;
+}
+
 static long read_events(struct kioctx *ctx, long min_nr, long nr,
 			struct io_event __user *event,
 			ktime_t until)
@@ -1375,7 +1600,7 @@  SYSCALL_DEFINE6(io_setup2, u32, nr_events, u32, flags, struct iocb __user *,
 
 	if (user1 || user2)
 		return -EINVAL;
-	if (flags & ~IOCTX_FLAG_USERIOCB)
+	if (flags & ~(IOCTX_FLAG_USERIOCB | IOCTX_FLAG_IOPOLL))
 		return -EINVAL;
 
 	ret = get_user(ctx, ctxp);
@@ -1509,13 +1734,8 @@  static void aio_remove_iocb(struct aio_kiocb *iocb)
 	spin_unlock_irqrestore(&ctx->ctx_lock, flags);
 }
 
-static void aio_complete_rw(struct kiocb *kiocb, long res, long res2)
+static void kiocb_end_write(struct kiocb *kiocb)
 {
-	struct aio_kiocb *iocb = container_of(kiocb, struct aio_kiocb, rw);
-
-	if (!list_empty_careful(&iocb->ki_list))
-		aio_remove_iocb(iocb);
-
 	if (kiocb->ki_flags & IOCB_WRITE) {
 		struct inode *inode = file_inode(kiocb->ki_filp);
 
@@ -1527,19 +1747,48 @@  static void aio_complete_rw(struct kiocb *kiocb, long res, long res2)
 			__sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
 		file_end_write(kiocb->ki_filp);
 	}
+}
+
+static void aio_complete_rw(struct kiocb *kiocb, long res, long res2)
+{
+	struct aio_kiocb *iocb = container_of(kiocb, struct aio_kiocb, rw);
+
+	if (!list_empty_careful(&iocb->ki_list))
+		aio_remove_iocb(iocb);
+
+	kiocb_end_write(kiocb);
 
 	fput(kiocb->ki_filp);
 	aio_complete(iocb, res, res2);
 }
 
-static int aio_prep_rw(struct kiocb *req, const struct iocb *iocb)
+static void aio_complete_rw_poll(struct kiocb *kiocb, long res, long res2)
 {
+	struct aio_kiocb *iocb = container_of(kiocb, struct aio_kiocb, rw);
+
+	kiocb_end_write(kiocb);
+
+	/*
+	 * Handle EAGAIN from resource limits with polled IO inline, don't
+	 * pass the event back to userspace.
+	 */
+	if (unlikely(res == -EAGAIN))
+		set_bit(IOCB_POLL_EAGAIN, &iocb->ki_flags);
+	else {
+		aio_fill_event(&iocb->ki_ev, iocb, res, res2);
+		set_bit(IOCB_POLL_COMPLETED, &iocb->ki_flags);
+	}
+}
+
+static int aio_prep_rw(struct aio_kiocb *kiocb, const struct iocb *iocb)
+{
+	struct kioctx *ctx = kiocb->ki_ctx;
+	struct kiocb *req = &kiocb->rw;
 	int ret;
 
 	req->ki_filp = fget(iocb->aio_fildes);
 	if (unlikely(!req->ki_filp))
 		return -EBADF;
-	req->ki_complete = aio_complete_rw;
 	req->ki_pos = iocb->aio_offset;
 	req->ki_flags = iocb_flags(req->ki_filp);
 	if (iocb->aio_flags & IOCB_FLAG_RESFD)
@@ -1565,9 +1814,35 @@  static int aio_prep_rw(struct kiocb *req, const struct iocb *iocb)
 	if (unlikely(ret))
 		goto out_fput;
 
-	req->ki_flags &= ~IOCB_HIPRI; /* no one is going to poll for this I/O */
-	return 0;
+	if (iocb->aio_flags & IOCB_FLAG_HIPRI) {
+		/* shares space in the union, and is rather pointless.. */
+		ret = -EINVAL;
+		if (iocb->aio_flags & IOCB_FLAG_RESFD)
+			goto out_fput;
+
+		/* can't submit polled IO to a non-polled ctx */
+		if (!(ctx->flags & IOCTX_FLAG_IOPOLL))
+			goto out_fput;
+
+		ret = -EOPNOTSUPP;
+		if (!(req->ki_flags & IOCB_DIRECT) ||
+		    !req->ki_filp->f_op->iopoll)
+			goto out_fput;
+
+		req->ki_flags |= IOCB_HIPRI;
+		req->ki_complete = aio_complete_rw_poll;
+	} else {
+		/* can't submit non-polled IO to a polled ctx */
+		ret = -EINVAL;
+		if (ctx->flags & IOCTX_FLAG_IOPOLL)
+			goto out_fput;
 
+		/* no one is going to poll for this I/O */
+		req->ki_flags &= ~IOCB_HIPRI;
+		req->ki_complete = aio_complete_rw;
+	}
+
+	return 0;
 out_fput:
 	fput(req->ki_filp);
 	return ret;
@@ -1612,15 +1887,40 @@  static inline void aio_rw_done(struct kiocb *req, ssize_t ret)
 	}
 }
 
-static ssize_t aio_read(struct kiocb *req, const struct iocb *iocb,
+/*
+ * After the iocb has been issued, it's safe to be found on the poll list.
+ * Adding the kiocb to the list AFTER submission ensures that we don't
+ * find it from a io_getevents() thread before the issuer is done accessing
+ * the kiocb cookie.
+ */
+static void aio_iopoll_iocb_issued(struct aio_kiocb *kiocb)
+{
+	/*
+	 * For fast devices, IO may have already completed. If it has, add
+	 * it to the front so we find it first. We can't add to the poll_done
+	 * list as that's unlocked from the completion side.
+	 */
+	const int front_add = test_bit(IOCB_POLL_COMPLETED, &kiocb->ki_flags);
+	struct kioctx *ctx = kiocb->ki_ctx;
+
+	spin_lock(&ctx->poll_lock);
+	if (front_add)
+		list_add(&kiocb->ki_list, &ctx->poll_submitted);
+	else
+		list_add_tail(&kiocb->ki_list, &ctx->poll_submitted);
+	spin_unlock(&ctx->poll_lock);
+}
+
+static ssize_t aio_read(struct aio_kiocb *kiocb, const struct iocb *iocb,
 			bool vectored, bool compat)
 {
 	struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
+	struct kiocb *req = &kiocb->rw;
 	struct iov_iter iter;
 	struct file *file;
 	ssize_t ret;
 
-	ret = aio_prep_rw(req, iocb);
+	ret = aio_prep_rw(kiocb, iocb);
 	if (ret)
 		return ret;
 	file = req->ki_filp;
@@ -1645,15 +1945,16 @@  static ssize_t aio_read(struct kiocb *req, const struct iocb *iocb,
 	return ret;
 }
 
-static ssize_t aio_write(struct kiocb *req, const struct iocb *iocb,
+static ssize_t aio_write(struct aio_kiocb *kiocb, const struct iocb *iocb,
 			 bool vectored, bool compat)
 {
 	struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
+	struct kiocb *req = &kiocb->rw;
 	struct iov_iter iter;
 	struct file *file;
 	ssize_t ret;
 
-	ret = aio_prep_rw(req, iocb);
+	ret = aio_prep_rw(kiocb, iocb);
 	if (ret)
 		return ret;
 	file = req->ki_filp;
@@ -1924,7 +2225,8 @@  static int __io_submit_one(struct kioctx *ctx, const struct iocb *iocb,
 		return -EINVAL;
 	}
 
-	if (!get_reqs_available(ctx))
+	/* Poll IO doesn't need ring reservations */
+	if (!(ctx->flags & IOCTX_FLAG_IOPOLL) && !get_reqs_available(ctx))
 		return -EAGAIN;
 
 	ret = -EAGAIN;
@@ -1947,8 +2249,8 @@  static int __io_submit_one(struct kioctx *ctx, const struct iocb *iocb,
 		}
 	}
 
-	/* Don't support cancel on user mapped iocbs */
-	if (!(ctx->flags & IOCTX_FLAG_USERIOCB)) {
+	/* Don't support cancel on user mapped iocbs or polled context */
+	if (!(ctx->flags & (IOCTX_FLAG_USERIOCB | IOCTX_FLAG_IOPOLL))) {
 		ret = put_user(KIOCB_KEY, &user_iocb->aio_key);
 		if (unlikely(ret)) {
 			pr_debug("EFAULT: aio_key\n");
@@ -1959,26 +2261,33 @@  static int __io_submit_one(struct kioctx *ctx, const struct iocb *iocb,
 	req->ki_user_iocb = user_iocb;
 	req->ki_user_data = iocb->aio_data;
 
+	ret = -EINVAL;
 	switch (iocb->aio_lio_opcode) {
 	case IOCB_CMD_PREAD:
-		ret = aio_read(&req->rw, iocb, false, compat);
+		ret = aio_read(req, iocb, false, compat);
 		break;
 	case IOCB_CMD_PWRITE:
-		ret = aio_write(&req->rw, iocb, false, compat);
+		ret = aio_write(req, iocb, false, compat);
 		break;
 	case IOCB_CMD_PREADV:
-		ret = aio_read(&req->rw, iocb, true, compat);
+		ret = aio_read(req, iocb, true, compat);
 		break;
 	case IOCB_CMD_PWRITEV:
-		ret = aio_write(&req->rw, iocb, true, compat);
+		ret = aio_write(req, iocb, true, compat);
 		break;
 	case IOCB_CMD_FSYNC:
+		if (ctx->flags & IOCTX_FLAG_IOPOLL)
+			break;
 		ret = aio_fsync(&req->fsync, iocb, false);
 		break;
 	case IOCB_CMD_FDSYNC:
+		if (ctx->flags & IOCTX_FLAG_IOPOLL)
+			break;
 		ret = aio_fsync(&req->fsync, iocb, true);
 		break;
 	case IOCB_CMD_POLL:
+		if (ctx->flags & IOCTX_FLAG_IOPOLL)
+			break;
 		ret = aio_poll(req, iocb);
 		break;
 	default:
@@ -1994,13 +2303,21 @@  static int __io_submit_one(struct kioctx *ctx, const struct iocb *iocb,
 	 */
 	if (ret)
 		goto out_put_req;
+	if (ctx->flags & IOCTX_FLAG_IOPOLL) {
+		if (test_bit(IOCB_POLL_EAGAIN, &req->ki_flags)) {
+			ret = -EAGAIN;
+			goto out_put_req;
+		}
+		aio_iopoll_iocb_issued(req);
+	}
 	return 0;
 out_put_req:
 	if (req->ki_eventfd)
 		eventfd_ctx_put(req->ki_eventfd);
 	iocb_put(req);
 out_put_reqs_available:
-	put_reqs_available(ctx, 1);
+	if (!(ctx->flags & IOCTX_FLAG_IOPOLL))
+		put_reqs_available(ctx, 1);
 	return ret;
 }
 
@@ -2166,7 +2483,7 @@  SYSCALL_DEFINE3(io_cancel, aio_context_t, ctx_id, struct iocb __user *, iocb,
 	if (unlikely(!ctx))
 		return -EINVAL;
 
-	if (ctx->flags & IOCTX_FLAG_USERIOCB)
+	if (ctx->flags & (IOCTX_FLAG_USERIOCB | IOCTX_FLAG_IOPOLL))
 		goto err;
 
 	spin_lock_irq(&ctx->ctx_lock);
@@ -2201,8 +2518,12 @@  static long do_io_getevents(aio_context_t ctx_id,
 	long ret = -EINVAL;
 
 	if (likely(ioctx)) {
-		if (likely(min_nr <= nr && min_nr >= 0))
-			ret = read_events(ioctx, min_nr, nr, events, until);
+		if (likely(min_nr <= nr && min_nr >= 0)) {
+			if (ioctx->flags & IOCTX_FLAG_IOPOLL)
+				ret = aio_iopoll_check(ioctx, min_nr, nr, events);
+			else
+				ret = read_events(ioctx, min_nr, nr, events, until);
+		}
 		percpu_ref_put(&ioctx->users);
 	}
 
diff --git a/include/uapi/linux/aio_abi.h b/include/uapi/linux/aio_abi.h
index 814e6606c413..ea0b9a19f4df 100644
--- a/include/uapi/linux/aio_abi.h
+++ b/include/uapi/linux/aio_abi.h
@@ -52,9 +52,11 @@  enum {
  *                   is valid.
  * IOCB_FLAG_IOPRIO - Set if the "aio_reqprio" member of the "struct iocb"
  *                    is valid.
+ * IOCB_FLAG_HIPRI - Use IO completion polling
  */
 #define IOCB_FLAG_RESFD		(1 << 0)
 #define IOCB_FLAG_IOPRIO	(1 << 1)
+#define IOCB_FLAG_HIPRI		(1 << 2)
 
 /* read() from /dev/aio returns these structures. */
 struct io_event {
@@ -107,6 +109,7 @@  struct iocb {
 }; /* 64 bytes */
 
 #define IOCTX_FLAG_USERIOCB	(1 << 0)	/* iocbs are user mapped */
+#define IOCTX_FLAG_IOPOLL	(1 << 1)	/* io_context is polled */
 
 #undef IFBIG
 #undef IFLITTLE