
[16/22] aio: add support for submission/completion rings

Message ID 20181221192236.12866-17-axboe@kernel.dk (mailing list archive)
State New, archived
Series [01/22] fs: add an iopoll method to struct file_operations

Commit Message

Jens Axboe Dec. 21, 2018, 7:22 p.m. UTC
The submission queue (SQ) and completion queue (CQ) rings are shared
between the application and the kernel. This eliminates the need to
copy data back and forth to submit and complete IO. We use the same
structures as the old aio interface. The SQ ring entries are indexes
into a struct iocb array, like the iocbs we would submit through
io_submit(), and the CQ ring entries are struct io_events, like those
we would copy back from io_getevents().

A new system call is added for this, io_ring_enter(). This system call
submits IO that is stored in the SQ ring, and/or completes IO and stores
the results in the CQ ring. Hence it's possible to both complete and
submit IO in a single system call.

For IRQ driven IO, an application only needs to enter the kernel for
completions if it wants to wait for them to occur.
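
A rough sketch of the intended userspace flow, pieced together from the
uapi and syscall additions in this patch: set up a context with
IOCTX_FLAG_SCQRING, publish one iocb index in the SQ ring, then submit
and wait in a single io_ring_enter() call. The ring structs below are
userspace mirrors of the aio_abi.h additions, the syscall numbers are
the x86-64 ones wired up in this series, and memory barriers plus all
error handling are omitted. The sample application linked below remains
the complete example.

#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/uio.h>
#include <sys/syscall.h>
#include <linux/aio_abi.h>

#define __NR_io_setup2		335
#define __NR_io_ring_enter	336
#define IOCTX_FLAG_SCQRING	(1 << 1)
#define IORING_FLAG_SUBMIT	(1 << 0)
#define IORING_FLAG_GETEVENTS	(1 << 1)

/* userspace mirrors of the aio_sq_ring/aio_cq_ring uapi structs */
struct app_sq_ring {
	union {
		struct { uint32_t head, tail, nr_events; uint64_t iocbs; };
		uint32_t pad[16];
	};
	uint32_t array[];		/* indexes into the iocb array */
};

struct app_cq_ring {
	union {
		struct { uint32_t head, tail, nr_events; };
		struct io_event pad;
	};
	struct io_event events[];	/* completions, filled in by the kernel */
};

int main(void)
{
	int depth = 32, fd = open("/etc/hostname", O_RDONLY);
	struct app_sq_ring *sq = aligned_alloc(4096, 4096);
	struct app_cq_ring *cq = aligned_alloc(4096, 2 * 4096);
	struct iocb *iocbs = aligned_alloc(4096, 4096);
	char buf[4096];
	struct iovec iov = { buf, sizeof(buf) };
	aio_context_t ctx = 0;

	/* rings must be page aligned and sized before io_setup2() */
	memset(sq, 0, 4096);
	memset(cq, 0, 2 * 4096);
	sq->nr_events = depth;
	cq->nr_events = 2 * depth;
	sq->iocbs = (uintptr_t) iocbs;
	syscall(__NR_io_setup2, depth, IOCTX_FLAG_SCQRING, sq, cq, &ctx);

	/* fill iocb slot 0, publish its index at the SQ tail */
	memset(&iocbs[0], 0, sizeof(iocbs[0]));
	iocbs[0].aio_fildes = fd;
	iocbs[0].aio_lio_opcode = IOCB_CMD_PREADV;
	iocbs[0].aio_buf = (uintptr_t) &iov;
	iocbs[0].aio_nbytes = 1;
	sq->array[sq->tail] = 0;
	sq->tail = (sq->tail + 1) % sq->nr_events;

	/* submit one iocb and wait for one completion in a single syscall */
	syscall(__NR_io_ring_enter, ctx, 1, 1,
		IORING_FLAG_SUBMIT | IORING_FLAG_GETEVENTS);

	/* completions land in the CQ ring; consume at head */
	printf("res=%lld\n", (long long) cq->events[cq->head].res);
	cq->head = (cq->head + 1) % cq->nr_events;
	return 0;
}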

Sample application: http://git.kernel.dk/cgit/fio/plain/t/aio-ring.c

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 arch/x86/entry/syscalls/syscall_64.tbl |   1 +
 fs/aio.c                               | 485 +++++++++++++++++++++++--
 include/linux/syscalls.h               |   4 +-
 include/uapi/linux/aio_abi.h           |  29 ++
 kernel/sys_ni.c                        |   1 +
 5 files changed, 494 insertions(+), 26 deletions(-)

Comments

Christoph Hellwig Dec. 27, 2018, 1:47 p.m. UTC | #1
On Fri, Dec 21, 2018 at 12:22:30PM -0700, Jens Axboe wrote:
> The submission queue (SQ) and completion queue (CQ) rings are shared
> between the application and the kernel. This eliminates the need to
> copy data back and forth to submit and complete IO. We use the same
> structures as the old aio interface. The SQ rings are indexes into a
> struct iocb array, like we would submit through io_submit(), and the
> CQ rings are struct io_event, like we would pass in (and copy back)
> from io_getevents().
> 
> A new system call is added for this, io_ring_enter(). This system call
> submits IO that is stored in the SQ ring, and/or completes IO and stores
> the results in the CQ ring. Hence it's possible to both complete and
> submit IO in a single system call.

So this still reuses the aio interface, but I think that is a really
bad idea.  For one, the aio_context_t, which really is a chunk of memory
mapped into the user address space, makes no sense at all here.

We'd much rather allocate a file descriptor using anon_inode_getfd
and operate on that.  That also means we can just close that fd
instead of needing the magic io_destroy, and save all the checks for
which kind of FD we operate on.
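
Roughly this direction, as a sketch only (none of this is in the series;
io_ring_release()/io_ring_mmap() are placeholder names):

static int io_ring_release(struct inode *inode, struct file *file);
static int io_ring_mmap(struct file *file, struct vm_area_struct *vma);

static const struct file_operations io_ring_fops = {
	.owner		= THIS_MODULE,
	.release	= io_ring_release,	/* unmap rings, drop the kioctx */
	.mmap		= io_ring_mmap,		/* expose SQ/CQ rings at fixed offsets */
};

static int io_ring_getfd(struct kioctx *ctx)
{
	/* O_CLOEXEC so the ring fd is not leaked across exec() */
	return anon_inode_getfd("[aio-ring]", &io_ring_fops, ctx,
				O_RDWR | O_CLOEXEC);
}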

The big question to me is if we really need the polled version of
'classic aio'.  If not we could save io_setup2 and would just need
io_ring_setup and io_ring_enter as the new syscalls, and basically
avoid touching the old aio code entirely.

Also as another potential interface enhancement I think we should
consider pre-registering the files we want to do I/O to in the
io_ring_setup system call, thus avoiding fget/fput entirely
in io_ring_enter.  In general the set of files used for aio-like
operations is rather static and should be known at setup time;
in the worst case we might need a version of the setup
call that can modify the registered set of files (similar to how,
say, epoll works).
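
Purely as a hypothetical illustration of that idea (nothing below exists
in this patch; user_files/nr_user_files are invented kioctx fields, and
error unwinding is omitted): take one long-lived reference per fd at
setup and let iocbs refer to files by index.

static int io_register_files(struct kioctx *ctx, int __user *fds,
			     unsigned int nr_files)
{
	unsigned int i;

	ctx->user_files = kcalloc(nr_files, sizeof(struct file *), GFP_KERNEL);
	if (!ctx->user_files)
		return -ENOMEM;

	for (i = 0; i < nr_files; i++) {
		int fd;

		if (get_user(fd, &fds[i]))
			return -EFAULT;
		ctx->user_files[i] = fget(fd);	/* reference held until teardown */
		if (!ctx->user_files[i])
			return -EBADF;
	}
	ctx->nr_user_files = nr_files;
	return 0;
}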

Also this whole API needs a better description, and a CC to the
linux-api list.

> +static void aio_commit_cqring(struct kioctx *ctx, unsigned next_tail)
> +{
> +	struct aio_cq_ring *ring = page_address(ctx->cq_ring.pages[0]);

I don't think we can use page_address here as the memory might be
highmem.  Same for all the other uses of page_address.
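
One way to cope with highmem ring pages, sketched against the quoted
function (illustration only, not a drop-in fix for every caller):

static void aio_commit_cqring(struct kioctx *ctx, unsigned next_tail)
{
	struct aio_cq_ring *ring;

	/* map the page around the access instead of relying on page_address() */
	ring = kmap_atomic(ctx->cq_ring.pages[0]);
	if (next_tail != ring->tail) {
		ring->tail = next_tail;
		smp_wmb();
	}
	kunmap_atomic(ring);
}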

> +	range->pages = kzalloc(nr_pages * sizeof(struct page *), GFP_KERNEL);

This should use kcalloc.  Same for a few other instances.
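
I.e. something like:

	/* kcalloc guards the nr_pages * sizeof() multiplication against overflow */
	range->pages = kcalloc(nr_pages, sizeof(struct page *), GFP_KERNEL);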

> +static int __io_ring_enter(struct kioctx *ctx, unsigned int to_submit,
> +			   unsigned int min_complete, unsigned int flags)
> +{
> +	int ret = 0;
> +
> +	if (flags & IORING_FLAG_SUBMIT) {
> +		ret = aio_ring_submit(ctx, to_submit);
> +		if (ret < 0)
> +			return ret;
> +	}

I don't think we need the IORING_FLAG_SUBMIT flag - a non-zero
to_submit argument should be a good enough indicator.

Also this interface will need some cache flushing help, otherwise
it won't work at all for architectures with VIVT caches.
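
Roughly the kind of thing meant, sketched against the completion path in
this patch (the kernel would likewise need to flush/invalidate before
reading SQ ring pages that userspace wrote):

	ev = aio_peek_cqring(ctx, &tail);
	if (ev) {
		aio_fill_event(ev, iocb, res, res2);
		/* write back the CPU store so an aliasing user mapping sees it */
		flush_dcache_page(virt_to_page(ev));
		aio_commit_cqring(ctx, tail);
	}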

Jens Axboe Jan. 2, 2019, 4:28 p.m. UTC | #2
On 12/27/18 6:47 AM, Christoph Hellwig wrote:
> On Fri, Dec 21, 2018 at 12:22:30PM -0700, Jens Axboe wrote:
>> The submission queue (SQ) and completion queue (CQ) rings are shared
>> between the application and the kernel. This eliminates the need to
>> copy data back and forth to submit and complete IO. We use the same
>> structures as the old aio interface. The SQ rings are indexes into a
>> struct iocb array, like we would submit through io_submit(), and the
>> CQ rings are struct io_event, like we would pass in (and copy back)
>> from io_getevents().
>>
>> A new system call is added for this, io_ring_enter(). This system call
>> submits IO that is stored in the SQ ring, and/or completes IO and stores
>> the results in the CQ ring. Hence it's possible to both complete and
>> submit IO in a single system call.
> 
> So this still reuses the aio interface, but I think that is a really
> bad idea.  For one the aio_context_t which really is a chunk of memory
> mapped into the user address space really make no sense at all here.

I don't think that's a big deal at all, the new ring interface just ends
up being a subset of it. We don't map the old user ring, for instance.

> We'd much rather allocate a file descriptor using anon_inode_getfd
> and operate on that.  That also means we can just close that fd
> instead of needing the magic io_destroy, and save all the checks for
> which kind of FD we operate on.

I'm not against doing something else for setup, but we still need some
way of passing in information about what we want. Things like ring
sizing. The actual rings we could mmap from known offsets if we went the
anon route. 

> The big question to me is if we really need the polled version of
> 'classic aio'.  If not we could save io_setup2 and would just need
> io_ring_setup and io_ring_enter as the new syscalls, and basically
> avoid touching the old aio code entirely.

Don't feel strongly about that part. I do like to continue using the aio
infrastructure instead of rewriting everything. And that means that
regular aio gets polling for free, essentially. So seems kind of silly
NOT to offer it as an option.

> Also as another potential interface enhancement I think we should
> consider pre-registering the files we want to do I/O to in the
> io_ring_setup system call, thus avoiding fget/fput entirely
> in io_ring_enter.  In general the set of files used for aio-like
> operations is rather static and should be known at setup time,
> in the worst case we might have to have a version of the setup
> call that can modify the set up files (similar to how say
> epoll works).

I agree, that would be a nice improvement to not have to get/put files
all the time.

> Also this whole API needs a better description, and a CC to the
> linux-api list.

Yep

>> +static void aio_commit_cqring(struct kioctx *ctx, unsigned next_tail)
>> +{
>> +	struct aio_cq_ring *ring = page_address(ctx->cq_ring.pages[0]);
> 
> I don't think we can use page_address here as the memory might be
> highmem.  Same for all the other uses of page_address.

Made changes to support that.

>> +	range->pages = kzalloc(nr_pages * sizeof(struct page *), GFP_KERNEL);
> 
> This should use kcalloc.  Same for a few other instances.

Done

>> +static int __io_ring_enter(struct kioctx *ctx, unsigned int to_submit,
>> +			   unsigned int min_complete, unsigned int flags)
>> +{
>> +	int ret = 0;
>> +
>> +	if (flags & IORING_FLAG_SUBMIT) {
>> +		ret = aio_ring_submit(ctx, to_submit);
>> +		if (ret < 0)
>> +			return ret;
>> +	}
> 
> I don't think we need the IORING_FLAG_SUBMIT flag - a non-zero
> to_submit argument should be a good enough indicator.

Made that change.

> Also this interface will need some cache flushing help, othewise
> it won't work at all for architectures with VIVT caches.

Fixed this up too.

Jeff Moyer Jan. 2, 2019, 7:11 p.m. UTC | #3
Jens Axboe <axboe@kernel.dk> writes:

> The submission queue (SQ) and completion queue (CQ) rings are shared
> between the application and the kernel. This eliminates the need to
> copy data back and forth to submit and complete IO. We use the same
> structures as the old aio interface. The SQ rings are indexes into a
> struct iocb array, like we would submit through io_submit(), and the
> CQ rings are struct io_event, like we would pass in (and copy back)
> from io_getevents().
>
> A new system call is added for this, io_ring_enter(). This system call
> submits IO that is stored in the SQ ring, and/or completes IO and stores
> the results in the CQ ring. Hence it's possible to both complete and
> submit IO in a single system call.
>
> For IRQ driven IO, an application only needs to enter the kernel for
> completions if it wants to wait for them to occur.

It looks like the memory for the submission and completion queues is
not migratable (scq rings are non-migratory!  [1]).  Maybe you should use
the aio pseudofs for allocations, as is done with the original aio
ring.  I realize that would change the API...
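
For reference, this is how the existing aio ring stays migratable
(condensed from aio_setup_ring() in mainline fs/aio.c, error handling
trimmed): the pages live in the mapping of a file on the aio pseudofs,
whose address_space_operations supply a migratepage hook, and userspace
mmap()s that file.

	file = aio_private_file(ctx, nr_pages);
	ctx->aio_ring_file = file;
	for (i = 0; i < nr_pages; i++) {
		struct page *page;

		page = find_or_create_page(file->f_mapping, i,
					   GFP_HIGHUSER | __GFP_ZERO);
		if (!page)
			break;
		unlock_page(page);
		ctx->ring_pages[i] = page;
	}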

-Jeff

[1] https://www.youtube.com/watch?v=liIlW-ovx0Y

Jens Axboe Jan. 2, 2019, 8:32 p.m. UTC | #4
On 1/2/19 9:28 AM, Jens Axboe wrote:
>> We'd much rather allocate a file descriptor using anon_inode_getfd
>> and operate on that.  That also means we can just close that fd
>> instead of needing the magic io_destroy, and save all the checks for
>> which kind of FD we operate on.
> 
> I'm not against doing something else for setup, but we still need some
> way of passing in information about what we want. Things like ring
> sizing. The actual rings we could mmap from known offsets if we went the
> anon route. 

OK, so we could go the anon_inode_getfd() route and make it ioctls. Move
the gup stuff to mmap off that inode. I'll try and play with it a bit.
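
From the userspace side that could look something like the below --
names and offsets are invented here purely for illustration, nothing of
the sort is in this series:

	/* hypothetical fixed offsets into the anon inode's mapping */
	#define RING_OFF_SQ	0ULL
	#define RING_OFF_CQ	0x8000000ULL

	sq = mmap(NULL, sq_bytes, PROT_READ | PROT_WRITE,
		  MAP_SHARED | MAP_POPULATE, ring_fd, RING_OFF_SQ);
	cq = mmap(NULL, cq_bytes, PROT_READ | PROT_WRITE,
		  MAP_SHARED | MAP_POPULATE, ring_fd, RING_OFF_CQ);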

Patch

diff --git a/arch/x86/entry/syscalls/syscall_64.tbl b/arch/x86/entry/syscalls/syscall_64.tbl
index 67c357225fb0..55a26700a637 100644
--- a/arch/x86/entry/syscalls/syscall_64.tbl
+++ b/arch/x86/entry/syscalls/syscall_64.tbl
@@ -344,6 +344,7 @@ 
 333	common	io_pgetevents		__x64_sys_io_pgetevents
 334	common	rseq			__x64_sys_rseq
 335	common	io_setup2		__x64_sys_io_setup2
+336	common	io_ring_enter		__x64_sys_io_ring_enter
 
 #
 # x32-specific system call numbers start at 512 to avoid cache impact
diff --git a/fs/aio.c b/fs/aio.c
index 9e9b49fe9a8b..a49109e69334 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -95,6 +95,18 @@  struct ctx_rq_wait {
 	atomic_t count;
 };
 
+struct aio_mapped_range {
+	struct page **pages;
+	long nr_pages;
+};
+
+struct aio_iocb_ring {
+	struct aio_mapped_range ring_range;	/* maps user SQ ring */
+	struct aio_sq_ring *ring;
+
+	struct aio_mapped_range iocb_range;	/* maps user iocbs */
+};
+
 struct kioctx {
 	struct percpu_ref	users;
 	atomic_t		dead;
@@ -130,6 +142,11 @@  struct kioctx {
 	struct page		**ring_pages;
 	long			nr_pages;
 
+	/* if used, completion and submission rings */
+	struct aio_iocb_ring	sq_ring;
+	struct aio_mapped_range cq_ring;
+	int			cq_ring_overflow;
+
 	struct rcu_work		free_rwork;	/* see free_ioctx() */
 
 	/*
@@ -285,6 +302,14 @@  static struct vfsmount *aio_mnt;
 static const struct file_operations aio_ring_fops;
 static const struct address_space_operations aio_ctx_aops;
 
+static const unsigned int array_page_shift =
+				ilog2(PAGE_SIZE / sizeof(u32));
+static const unsigned int iocb_page_shift =
+				ilog2(PAGE_SIZE / sizeof(struct iocb));
+static const unsigned int event_page_shift =
+				ilog2(PAGE_SIZE / sizeof(struct io_event));
+
+static void aio_scqring_unmap(struct kioctx *);
 static void aio_iopoll_reap_events(struct kioctx *);
 
 static struct file *aio_private_file(struct kioctx *ctx, loff_t nr_pages)
@@ -515,6 +540,12 @@  static const struct address_space_operations aio_ctx_aops = {
 #endif
 };
 
+/* Polled IO or SQ/CQ rings don't use the old ring */
+static bool aio_ctx_old_ring(struct kioctx *ctx)
+{
+	return !(ctx->flags & (IOCTX_FLAG_IOPOLL | IOCTX_FLAG_SCQRING));
+}
+
 static int aio_setup_ring(struct kioctx *ctx, unsigned int nr_events)
 {
 	struct aio_ring *ring;
@@ -529,7 +560,7 @@  static int aio_setup_ring(struct kioctx *ctx, unsigned int nr_events)
 	 * IO polling doesn't require any io event entries
 	 */
 	size = sizeof(struct aio_ring);
-	if (!(ctx->flags & IOCTX_FLAG_IOPOLL)) {
+	if (aio_ctx_old_ring(ctx)) {
 		nr_events += 2;	/* 1 is required, 2 for good luck */
 		size += sizeof(struct io_event) * nr_events;
 	}
@@ -621,7 +652,7 @@  static int aio_setup_ring(struct kioctx *ctx, unsigned int nr_events)
  */
 static bool aio_ctx_supports_cancel(struct kioctx *ctx)
 {
-	return (ctx->flags & IOCTX_FLAG_IOPOLL) == 0;
+	return (ctx->flags & (IOCTX_FLAG_IOPOLL | IOCTX_FLAG_SCQRING)) == 0;
 }
 
 #define AIO_EVENTS_PER_PAGE	(PAGE_SIZE / sizeof(struct io_event))
@@ -657,6 +688,7 @@  static void free_ioctx(struct work_struct *work)
 					  free_rwork);
 	pr_debug("freeing %p\n", ctx);
 
+	aio_scqring_unmap(ctx);
 	aio_free_ring(ctx);
 	free_percpu(ctx->cpu);
 	percpu_ref_exit(&ctx->reqs);
@@ -1202,6 +1234,39 @@  static void aio_fill_event(struct io_event *ev, struct aio_kiocb *iocb,
 	ev->res2 = res2;
 }
 
+static void aio_commit_cqring(struct kioctx *ctx, unsigned next_tail)
+{
+	struct aio_cq_ring *ring = page_address(ctx->cq_ring.pages[0]);
+
+	if (next_tail != ring->tail) {
+		ring->tail = next_tail;
+		smp_wmb();
+	}
+}
+
+static struct io_event *aio_peek_cqring(struct kioctx *ctx, unsigned *ntail)
+{
+	struct aio_cq_ring *ring;
+	struct io_event *ev;
+	unsigned tail;
+
+	ring = page_address(ctx->cq_ring.pages[0]);
+
+	smp_rmb();
+	tail = READ_ONCE(ring->tail);
+	*ntail = tail + 1;
+	if (*ntail == ring->nr_events)
+		*ntail = 0;
+	if (*ntail == READ_ONCE(ring->head))
+		return NULL;
+
+	/* io_event array starts offset one into the mapped range */
+	tail++;
+	ev = page_address(ctx->cq_ring.pages[tail >> event_page_shift]);
+	tail &= ((1 << event_page_shift) - 1);
+	return ev + tail;
+}
+
 static void aio_ring_complete(struct kioctx *ctx, struct aio_kiocb *iocb,
 			      long res, long res2)
 {
@@ -1263,7 +1328,36 @@  static void aio_complete(struct aio_kiocb *iocb, long res, long res2)
 {
 	struct kioctx *ctx = iocb->ki_ctx;
 
-	aio_ring_complete(ctx, iocb, res, res2);
+	if (ctx->flags & IOCTX_FLAG_SCQRING) {
+		unsigned long flags;
+		struct io_event *ev;
+		unsigned int tail;
+
+		/*
+		 * If we can't get a cq entry, userspace overflowed the
+		 * submission (by quite a lot). Flag it as an overflow
+		 * condition, and next io_ring_enter(2) call will return
+		 * -EOVERFLOW.
+		 */
+		spin_lock_irqsave(&ctx->completion_lock, flags);
+		ev = aio_peek_cqring(ctx, &tail);
+		if (ev) {
+			aio_fill_event(ev, iocb, res, res2);
+			aio_commit_cqring(ctx, tail);
+		} else
+			ctx->cq_ring_overflow = 1;
+		spin_unlock_irqrestore(&ctx->completion_lock, flags);
+	} else {
+		aio_ring_complete(ctx, iocb, res, res2);
+
+		/*
+		 * We have to order our ring_info tail store above and test
+		 * of the wait list below outside the wait lock.  This is
+		 * like in wake_up_bit() where clearing a bit has to be
+		 * ordered with the unlocked test.
+		 */
+		smp_mb();
+	}
 
 	/*
 	 * Check if the user asked us to deliver the result through an
@@ -1275,14 +1369,6 @@  static void aio_complete(struct aio_kiocb *iocb, long res, long res2)
 		eventfd_ctx_put(iocb->ki_eventfd);
 	}
 
-	/*
-	 * We have to order our ring_info tail store above and test
-	 * of the wait list below outside the wait lock.  This is
-	 * like in wake_up_bit() where clearing a bit has to be
-	 * ordered with the unlocked test.
-	 */
-	smp_mb();
-
 	if (waitqueue_active(&ctx->wait))
 		wake_up(&ctx->wait);
 	iocb_put(iocb);
@@ -1405,6 +1491,9 @@  static long aio_iopoll_reap(struct kioctx *ctx, struct io_event __user *evs,
 		return 0;
 
 	list_for_each_entry_safe(iocb, n, &ctx->poll_completing, ki_list) {
+		struct io_event *ev = NULL;
+		unsigned int next_tail;
+
 		if (*nr_events == max)
 			break;
 		if (!test_bit(KIOCB_F_POLL_COMPLETED, &iocb->ki_flags))
@@ -1412,6 +1501,14 @@  static long aio_iopoll_reap(struct kioctx *ctx, struct io_event __user *evs,
 		if (to_free == AIO_IOPOLL_BATCH)
 			iocb_put_many(ctx, iocbs, &to_free);
 
+		/* Will only happen if the application over-commits */
+		ret = -EAGAIN;
+		if (ctx->flags & IOCTX_FLAG_SCQRING) {
+			ev = aio_peek_cqring(ctx, &next_tail);
+			if (!ev)
+				break;
+		}
+
 		list_del(&iocb->ki_list);
 		iocbs[to_free++] = iocb;
 
@@ -1430,8 +1527,11 @@  static long aio_iopoll_reap(struct kioctx *ctx, struct io_event __user *evs,
 			file_count = 1;
 		}
 
-		if (evs && copy_to_user(evs + *nr_events, &iocb->ki_ev,
-		    sizeof(iocb->ki_ev))) {
+		if (ev) {
+			memcpy(ev, &iocb->ki_ev, sizeof(*ev));
+			aio_commit_cqring(ctx, next_tail);
+		} else if (evs && copy_to_user(evs + *nr_events, &iocb->ki_ev,
+				sizeof(iocb->ki_ev))) {
 			ret = -EFAULT;
 			break;
 		}
@@ -1612,24 +1712,139 @@  static long read_events(struct kioctx *ctx, long min_nr, long nr,
 	return ret;
 }
 
+static void aio_unmap_range(struct aio_mapped_range *range)
+{
+	int i;
+
+	if (!range->nr_pages)
+		return;
+
+	for (i = 0; i < range->nr_pages; i++)
+		put_page(range->pages[i]);
+
+	kfree(range->pages);
+	range->pages = NULL;
+	range->nr_pages = 0;
+}
+
+static int aio_map_range(struct aio_mapped_range *range, void __user *uaddr,
+			 size_t size, int gup_flags)
+{
+	int nr_pages, ret;
+
+	if ((unsigned long) uaddr & ~PAGE_MASK)
+		return -EINVAL;
+
+	nr_pages = (size + PAGE_SIZE - 1) >> PAGE_SHIFT;
+
+	range->pages = kzalloc(nr_pages * sizeof(struct page *), GFP_KERNEL);
+	if (!range->pages)
+		return -ENOMEM;
+
+	down_write(&current->mm->mmap_sem);
+	ret = get_user_pages((unsigned long) uaddr, nr_pages, gup_flags,
+				range->pages, NULL);
+	up_write(&current->mm->mmap_sem);
+
+	if (ret < nr_pages) {
+		kfree(range->pages);
+		return -ENOMEM;
+	}
+
+	range->nr_pages = nr_pages;
+	return 0;
+}
+
+static void aio_scqring_unmap(struct kioctx *ctx)
+{
+	aio_unmap_range(&ctx->sq_ring.ring_range);
+	aio_unmap_range(&ctx->sq_ring.iocb_range);
+	aio_unmap_range(&ctx->cq_ring);
+}
+
+static int aio_scqring_map(struct kioctx *ctx,
+			   struct aio_sq_ring __user *sq_ring,
+			   struct aio_cq_ring __user *cq_ring)
+{
+	int ret, sq_ring_size, cq_ring_size;
+	struct aio_cq_ring *kcq_ring;
+	void __user *uptr;
+	size_t size;
+
+	/* Two is the minimum size we can support. */
+	if (ctx->max_reqs < 2)
+		return -EINVAL;
+
+	/*
+	 * The CQ ring size is QD + 1, so we don't have to track full condition
+	 * for head == tail. The SQ ring we make twice that in size, to make
+	 * room for having more inflight than the QD.
+	 */
+	sq_ring_size = ctx->max_reqs;
+	cq_ring_size = 2 * ctx->max_reqs;
+
+	/* Map SQ ring and iocbs */
+	size = sizeof(struct aio_sq_ring) + sq_ring_size * sizeof(u32);
+	ret = aio_map_range(&ctx->sq_ring.ring_range, sq_ring, size, FOLL_WRITE);
+	if (ret)
+		return ret;
+
+	ctx->sq_ring.ring = page_address(ctx->sq_ring.ring_range.pages[0]);
+	if (ctx->sq_ring.ring->nr_events < sq_ring_size) {
+		ret = -EFAULT;
+		goto err;
+	}
+	ctx->sq_ring.ring->nr_events = sq_ring_size;
+	ctx->sq_ring.ring->head = ctx->sq_ring.ring->tail = 0;
+
+	size = sizeof(struct iocb) * sq_ring_size;
+	uptr = (void __user *) (unsigned long) ctx->sq_ring.ring->iocbs;
+	ret = aio_map_range(&ctx->sq_ring.iocb_range, uptr, size, 0);
+	if (ret)
+		goto err;
+
+	/* Map CQ ring and io_events */
+	size = sizeof(struct aio_cq_ring) +
+			cq_ring_size * sizeof(struct io_event);
+	ret = aio_map_range(&ctx->cq_ring, cq_ring, size, FOLL_WRITE);
+	if (ret)
+		goto err;
+
+	kcq_ring = page_address(ctx->cq_ring.pages[0]);
+	if (kcq_ring->nr_events < cq_ring_size) {
+		ret = -EFAULT;
+		goto err;
+	}
+	kcq_ring->nr_events = cq_ring_size;
+	kcq_ring->head = kcq_ring->tail = 0;
+
+err:
+	if (ret) {
+		aio_unmap_range(&ctx->sq_ring.ring_range);
+		aio_unmap_range(&ctx->sq_ring.iocb_range);
+		aio_unmap_range(&ctx->cq_ring);
+	}
+	return ret;
+}
+
 /* sys_io_setup2:
  *	Like sys_io_setup(), except that it takes a set of flags
  *	(IOCTX_FLAG_*), and some pointers to user structures:
  *
- *	*user1 - reserved for future use
+ *	*sq_ring - pointer to the userspace SQ ring, if used.
  *
- *	*user2 - reserved for future use.
+ *	*cq_ring - pointer to the userspace CQ ring, if used.
  */
-SYSCALL_DEFINE5(io_setup2, u32, nr_events, u32, flags, void __user *, user1,
-		void __user *, user2, aio_context_t __user *, ctxp)
+SYSCALL_DEFINE5(io_setup2, u32, nr_events, u32, flags,
+		struct aio_sq_ring __user *, sq_ring,
+		struct aio_cq_ring __user *, cq_ring,
+		aio_context_t __user *, ctxp)
 {
 	struct kioctx *ioctx;
 	unsigned long ctx;
 	long ret;
 
-	if (user1 || user2)
-		return -EINVAL;
-	if (flags & ~IOCTX_FLAG_IOPOLL)
+	if (flags & ~(IOCTX_FLAG_IOPOLL | IOCTX_FLAG_SCQRING))
 		return -EINVAL;
 
 	ret = get_user(ctx, ctxp);
@@ -1641,9 +1856,17 @@  SYSCALL_DEFINE5(io_setup2, u32, nr_events, u32, flags, void __user *, user1,
 	if (IS_ERR(ioctx))
 		goto out;
 
+	if (flags & IOCTX_FLAG_SCQRING) {
+		ret = aio_scqring_map(ioctx, sq_ring, cq_ring);
+		if (ret)
+			goto err;
+	}
+
 	ret = put_user(ioctx->user_id, ctxp);
-	if (ret)
+	if (ret) {
+err:
 		kill_ioctx(current->mm, ioctx, NULL);
+	}
 	percpu_ref_put(&ioctx->users);
 out:
 	return ret;
@@ -2325,8 +2548,7 @@  static int __io_submit_one(struct kioctx *ctx, const struct iocb *iocb,
 		return -EINVAL;
 	}
 
-	/* Poll IO doesn't need ring reservations */
-	if (!(ctx->flags & IOCTX_FLAG_IOPOLL) && !get_reqs_available(ctx))
+	if (aio_ctx_old_ring(ctx) && !get_reqs_available(ctx))
 		return -EAGAIN;
 
 	ret = -EAGAIN;
@@ -2418,7 +2640,7 @@  static int __io_submit_one(struct kioctx *ctx, const struct iocb *iocb,
 		eventfd_ctx_put(req->ki_eventfd);
 	iocb_put(req);
 out_put_reqs_available:
-	if (!(ctx->flags & IOCTX_FLAG_IOPOLL))
+	if (aio_ctx_old_ring(ctx))
 		put_reqs_available(ctx, 1);
 	return ret;
 }
@@ -2479,6 +2701,212 @@  static void aio_submit_state_start(struct aio_submit_state *state,
 #endif
 }
 
+static const struct iocb *aio_iocb_from_index(struct kioctx *ctx, unsigned idx)
+{
+	struct aio_mapped_range *range = &ctx->sq_ring.iocb_range;
+	const struct iocb *iocb;
+
+	iocb = page_address(range->pages[idx >> iocb_page_shift]);
+	idx &= ((1 << iocb_page_shift) - 1);
+	return iocb + idx;
+}
+
+static void aio_commit_sqring(struct kioctx *ctx, unsigned next_head)
+{
+	struct aio_sq_ring *ring = ctx->sq_ring.ring;
+
+	if (ring->head != next_head) {
+		ring->head = next_head;
+		smp_wmb();
+	}
+}
+
+static const struct iocb *aio_peek_sqring(struct kioctx *ctx,
+					  unsigned *iocb_index, unsigned *nhead)
+{
+	struct aio_mapped_range *range = &ctx->sq_ring.ring_range;
+	struct aio_sq_ring *ring = ctx->sq_ring.ring;
+	unsigned head;
+	u32 *array;
+
+	smp_rmb();
+	head = READ_ONCE(ring->head);
+	if (head == READ_ONCE(ring->tail))
+		return NULL;
+
+	*nhead = head + 1;
+	if (*nhead == ring->nr_events)
+		*nhead = 0;
+
+	/*
+	 * No guarantee the array is in the first page, so we can't just
+	 * index ring->array. Find the map and offset from the head.
+	 */
+	head += offsetof(struct aio_sq_ring, array) >> 2;
+	array = page_address(range->pages[head >> array_page_shift]);
+	head &= ((1 << array_page_shift) - 1);
+	*iocb_index = array[head];
+
+	if (*iocb_index < ring->nr_events)
+		return aio_iocb_from_index(ctx, *iocb_index);
+
+	/* drop invalid entries */
+	aio_commit_sqring(ctx, *nhead);
+	return NULL;
+}
+
+static int aio_ring_submit(struct kioctx *ctx, unsigned int to_submit)
+{
+	struct aio_submit_state state, *statep = NULL;
+	int i, ret = 0, submit = 0;
+
+	if (to_submit > AIO_PLUG_THRESHOLD) {
+		aio_submit_state_start(&state, ctx, to_submit);
+		statep = &state;
+	}
+
+	for (i = 0; i < to_submit; i++) {
+		unsigned next_head, iocb_index;
+		const struct iocb *iocb;
+
+		iocb = aio_peek_sqring(ctx, &iocb_index, &next_head);
+		if (!iocb)
+			break;
+
+		ret = __io_submit_one(ctx, iocb, iocb_index, statep, false);
+		if (ret)
+			break;
+
+		submit++;
+		aio_commit_sqring(ctx, next_head);
+	}
+
+	if (statep)
+		aio_submit_state_end(statep);
+
+	return submit ? submit : ret;
+}
+
+/*
+ * Wait until events become available, if we don't already have some. The
+ * application must reap them itself, as they reside on the shared cq ring.
+ */
+static int aio_cqring_wait(struct kioctx *ctx, int min_events)
+{
+	struct aio_cq_ring *ring = page_address(ctx->cq_ring.pages[0]);
+	DEFINE_WAIT(wait);
+	int ret = 0;
+
+	smp_rmb();
+	if (ring->head != ring->tail)
+		return 0;
+	if (!min_events)
+		return 0;
+
+	do {
+		prepare_to_wait(&ctx->wait, &wait, TASK_INTERRUPTIBLE);
+
+		ret = 0;
+		smp_rmb();
+		if (ring->head != ring->tail)
+			break;
+
+		schedule();
+
+		ret = -EINVAL;
+		if (atomic_read(&ctx->dead))
+			break;
+		ret = -EINTR;
+		if (signal_pending(current))
+			break;
+	} while (1);
+
+	finish_wait(&ctx->wait, &wait);
+	return ret;
+}
+
+static int __io_ring_enter(struct kioctx *ctx, unsigned int to_submit,
+			   unsigned int min_complete, unsigned int flags)
+{
+	int ret = 0;
+
+	if (flags & IORING_FLAG_SUBMIT) {
+		ret = aio_ring_submit(ctx, to_submit);
+		if (ret < 0)
+			return ret;
+	}
+	if (flags & IORING_FLAG_GETEVENTS) {
+		unsigned int nr_events = 0;
+		int get_ret;
+
+		if (!ret && to_submit)
+			min_complete = 0;
+
+		if (ctx->flags & IOCTX_FLAG_IOPOLL)
+			get_ret = __aio_iopoll_check(ctx, NULL, &nr_events,
+							min_complete, -1U);
+		else
+			get_ret = aio_cqring_wait(ctx, min_complete);
+
+		if (get_ret < 0 && !ret)
+			ret = get_ret;
+	}
+
+	return ret;
+}
+
+/* sys_io_ring_enter:
+ *	Alternative way to both submit and complete IO, instead of using
+ *	io_submit(2) and io_getevents(2). Requires the use of the SQ/CQ
+ *	ring interface, hence the io_context must be setup with
+ *	io_setup2() and IOCTX_FLAG_SCQRING must be specified (and the
+ *	sq_ring/cq_ring passed in).
+ *
+ *	Returns the number of IOs submitted, if IORING_FLAG_SUBMIT
+ *	is used, otherwise returns 0 for IORING_FLAG_GETEVENTS success,
+ *	but not the number of events, as those will have to be found
+ *	by the application by reading the CQ ring anyway.
+ *
+ *	Apart from that, the error returns are much like io_submit()
+ *	and io_getevents(), since a lot of the same error conditions
+ *	are shared.
+ */
+SYSCALL_DEFINE4(io_ring_enter, aio_context_t, ctx_id, u32, to_submit,
+		u32, min_complete, u32, flags)
+{
+	struct kioctx *ctx;
+	long ret;
+
+	ctx = lookup_ioctx(ctx_id);
+	if (!ctx) {
+		pr_debug("EINVAL: invalid context id\n");
+		return -EINVAL;
+	}
+
+	ret = -EBUSY;
+	if (!mutex_trylock(&ctx->getevents_lock))
+		goto err;
+
+	ret = -EOVERFLOW;
+	if (ctx->cq_ring_overflow) {
+		ctx->cq_ring_overflow = 0;
+		goto err_unlock;
+	}
+
+	ret = -EINVAL;
+	if (unlikely(atomic_read(&ctx->dead)))
+		goto err_unlock;
+
+	if (ctx->flags & IOCTX_FLAG_SCQRING)
+		ret = __io_ring_enter(ctx, to_submit, min_complete, flags);
+
+err_unlock:
+	mutex_unlock(&ctx->getevents_lock);
+err:
+	percpu_ref_put(&ctx->users);
+	return ret;
+}
+
 /* sys_io_submit:
  *	Queue the nr iocbs pointed to by iocbpp for processing.  Returns
  *	the number of iocbs queued.  May return -EINVAL if the aio_context
@@ -2508,6 +2936,10 @@  SYSCALL_DEFINE3(io_submit, aio_context_t, ctx_id, long, nr,
 		return -EINVAL;
 	}
 
+	/* SCQRING must use io_ring_enter() */
+	if (ctx->flags & IOCTX_FLAG_SCQRING)
+		return -EINVAL;
+
 	if (nr > ctx->nr_events)
 		nr = ctx->nr_events;
 
@@ -2659,7 +3091,10 @@  static long do_io_getevents(aio_context_t ctx_id,
 	long ret = -EINVAL;
 
 	if (likely(ioctx)) {
-		if (likely(min_nr <= nr && min_nr >= 0)) {
+		/* SCQRING must use io_ring_enter() */
+		if (ioctx->flags & IOCTX_FLAG_SCQRING)
+			ret = -EINVAL;
+		else if (min_nr <= nr && min_nr >= 0) {
 			if (ioctx->flags & IOCTX_FLAG_IOPOLL)
 				ret = aio_iopoll_check(ioctx, min_nr, nr, events);
 			else
diff --git a/include/linux/syscalls.h b/include/linux/syscalls.h
index 67b7f03aa9fc..ebcc73d8a6ad 100644
--- a/include/linux/syscalls.h
+++ b/include/linux/syscalls.h
@@ -287,8 +287,10 @@  static inline void addr_limit_user_check(void)
  */
 #ifndef CONFIG_ARCH_HAS_SYSCALL_WRAPPER
 asmlinkage long sys_io_setup(unsigned nr_reqs, aio_context_t __user *ctx);
-asmlinkage long sys_io_setup2(unsigned, unsigned, void __user *, void __user *,
+asmlinkage long sys_io_setup2(unsigned, unsigned, struct aio_sq_ring __user *,
+				struct aio_cq_ring __user *,
 				aio_context_t __user *);
+asmlinkage long sys_io_ring_enter(aio_context_t, unsigned, unsigned, unsigned);
 asmlinkage long sys_io_destroy(aio_context_t ctx);
 asmlinkage long sys_io_submit(aio_context_t, long,
 			struct iocb __user * __user *);
diff --git a/include/uapi/linux/aio_abi.h b/include/uapi/linux/aio_abi.h
index a6829bae9ada..5d3ada40ce15 100644
--- a/include/uapi/linux/aio_abi.h
+++ b/include/uapi/linux/aio_abi.h
@@ -109,6 +109,35 @@  struct iocb {
 }; /* 64 bytes */
 
 #define IOCTX_FLAG_IOPOLL	(1 << 0)	/* io_context is polled */
+#define IOCTX_FLAG_SCQRING	(1 << 1)	/* Use SQ/CQ rings */
+
+struct aio_sq_ring {
+	union {
+		struct {
+			u32 head;	/* kernel consumer head */
+			u32 tail;	/* app producer tail */
+			u32 nr_events;	/* max events in ring */
+			u64 iocbs;	/* setup pointer to app iocbs */
+		};
+		u32 pad[16];
+	};
+	u32 array[0];			/* actual ring, index to iocbs */
+};
+
+struct aio_cq_ring {
+	union {
+		struct {
+			u32 head;	/* app consumer head */
+			u32 tail;	/* kernel producer tail */
+			u32 nr_events;	/* max events in ring */
+		};
+		struct io_event pad;
+	};
+	struct io_event events[0];	/* ring, array of io_events */
+};
+
+#define IORING_FLAG_SUBMIT	(1 << 0)
+#define IORING_FLAG_GETEVENTS	(1 << 1)
 
 #undef IFBIG
 #undef IFLITTLE
diff --git a/kernel/sys_ni.c b/kernel/sys_ni.c
index 17c8b4393669..a32b7ea93838 100644
--- a/kernel/sys_ni.c
+++ b/kernel/sys_ni.c
@@ -38,6 +38,7 @@  asmlinkage long sys_ni_syscall(void)
 
 COND_SYSCALL(io_setup);
 COND_SYSCALL(io_setup2);
+COND_SYSCALL(io_ring_enter);
 COND_SYSCALL_COMPAT(io_setup);
 COND_SYSCALL(io_destroy);
 COND_SYSCALL(io_submit);