diff mbox series

[27/27] aio: add support for pre-mapped user IO buffers

Message ID 20181130165646.27341-28-axboe@kernel.dk (mailing list archive)
State New, archived
Headers show
Series [01/27] aio: fix failure to put the file pointer | expand

Commit Message

Jens Axboe Nov. 30, 2018, 4:56 p.m. UTC
If we have fixed user buffers, we can map them into the kernel when we
setup the io_context. That avoids the need to do get_user_pages() for
each and every IO.

To utilize this feature, the application must set both
IOCTX_FLAG_USERIOCB, to provide iocb's in userspace, and then
IOCTX_FLAG_FIXEDBUFS. The latter tells aio that the iocbs that are
mapped already contain valid destination and sizes. These buffers can
then be mapped into the kernel for the lifetime of the io_context, as
opposed to just the duration of each single IO.

Only works with non-vectored read/write commands for now, not with
PREADV/PWRITEV.

A limit of 4M is imposed as the largest buffer we currently support.
There's nothing preventing us from going larger, but we need some cap,
and 4M seemed like it would definitely be big enough.

See the fio change for how to utilize this feature:

http://git.kernel.dk/cgit/fio/commit/?id=2041bd343da1c1e955253f62374588718c64f0f3

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/aio.c                     | 185 +++++++++++++++++++++++++++++++----
 include/uapi/linux/aio_abi.h |   1 +
 2 files changed, 169 insertions(+), 17 deletions(-)

Comments

Jeff Moyer Nov. 30, 2018, 9:44 p.m. UTC | #1
Hi, Jens,

Jens Axboe <axboe@kernel.dk> writes:

> If we have fixed user buffers, we can map them into the kernel when we
> setup the io_context. That avoids the need to do get_user_pages() for
> each and every IO.
>
> To utilize this feature, the application must set both
> IOCTX_FLAG_USERIOCB, to provide iocb's in userspace, and then
> IOCTX_FLAG_FIXEDBUFS. The latter tells aio that the iocbs that are
> mapped already contain valid destination and sizes. These buffers can
> then be mapped into the kernel for the lifetime of the io_context, as
> opposed to just the duration of each single IO.
>
> Only works with non-vectored read/write commands for now, not with
> PREADV/PWRITEV.
>
> A limit of 4M is imposed as the largest buffer we currently support.
> There's nothing preventing us from going larger, but we need some cap,
> and 4M seemed like it would definitely be big enough.

Doesn't this mean that a user can pin a bunch of memory?  Something like
4MB * aio_max_nr?

$ sysctl fs.aio-max-nr
fs.aio-max-nr = 1048576

If so, it may be a good idea to account the memory under RLIMIT_MEMLOCK.

I'm not sure how close you are to proposing this patch set for realz.
If it's soon (now?), then CC-ing linux-api and writing man pages would
be a good idea.  I can help out with the libaio bits if you'd like.  I
haven't yet had time to take this stuff for a spin, sorry.  I'll try to
get to that soonish.

The speedups are pretty impressive!

Cheers,
Jeff


> See the fio change for how to utilize this feature:
>
> http://git.kernel.dk/cgit/fio/commit/?id=2041bd343da1c1e955253f62374588718c64f0f3
>
> Signed-off-by: Jens Axboe <axboe@kernel.dk>
> ---
>  fs/aio.c                     | 185 +++++++++++++++++++++++++++++++----
>  include/uapi/linux/aio_abi.h |   1 +
>  2 files changed, 169 insertions(+), 17 deletions(-)
>
> diff --git a/fs/aio.c b/fs/aio.c
> index 426939f1dae9..f735967488a5 100644
> --- a/fs/aio.c
> +++ b/fs/aio.c
> @@ -42,6 +42,7 @@
>  #include <linux/ramfs.h>
>  #include <linux/percpu-refcount.h>
>  #include <linux/mount.h>
> +#include <linux/sizes.h>
>  
>  #include <asm/kmap_types.h>
>  #include <linux/uaccess.h>
> @@ -86,6 +87,11 @@ struct ctx_rq_wait {
>  	atomic_t count;
>  };
>  
> +struct aio_mapped_ubuf {
> +	struct kvec *kvec;
> +	unsigned int nr_kvecs;
> +};
> +
>  struct kioctx {
>  	struct percpu_ref	users;
>  	atomic_t		dead;
> @@ -124,6 +130,8 @@ struct kioctx {
>  	struct page		**iocb_pages;
>  	long			iocb_nr_pages;
>  
> +	struct aio_mapped_ubuf	*user_bufs;
> +
>  	struct rcu_work		free_rwork;	/* see free_ioctx() */
>  
>  	/*
> @@ -290,6 +298,7 @@ static const bool aio_use_state_req_list = false;
>  #endif
>  
>  static void aio_useriocb_free(struct kioctx *);
> +static void aio_iocb_buffer_unmap(struct kioctx *);
>  static void aio_iopoll_reap_events(struct kioctx *);
>  
>  static struct file *aio_private_file(struct kioctx *ctx, loff_t nr_pages)
> @@ -652,6 +661,7 @@ static void free_ioctx(struct work_struct *work)
>  					  free_rwork);
>  	pr_debug("freeing %p\n", ctx);
>  
> +	aio_iocb_buffer_unmap(ctx);
>  	aio_useriocb_free(ctx);
>  	aio_free_ring(ctx);
>  	free_percpu(ctx->cpu);
> @@ -1597,6 +1607,115 @@ static struct iocb *aio_iocb_from_index(struct kioctx *ctx, int index)
>  	return iocb + index;
>  }
>  
> +static void aio_iocb_buffer_unmap(struct kioctx *ctx)
> +{
> +	int i, j;
> +
> +	if (!ctx->user_bufs)
> +		return;
> +
> +	for (i = 0; i < ctx->max_reqs; i++) {
> +		struct aio_mapped_ubuf *amu = &ctx->user_bufs[i];
> +
> +		for (j = 0; j < amu->nr_kvecs; j++) {
> +			struct page *page;
> +
> +			page = virt_to_page(amu->kvec[j].iov_base);
> +			put_page(page);
> +		}
> +		kfree(amu->kvec);
> +		amu->nr_kvecs = 0;
> +	}
> +
> +	kfree(ctx->user_bufs);
> +	ctx->user_bufs = NULL;
> +}
> +
> +static int aio_iocb_buffer_map(struct kioctx *ctx)
> +{
> +	struct page **pages = NULL;
> +	int i, j, got_pages = 0;
> +	struct iocb *iocb;
> +	int ret = -EINVAL;
> +
> +	ctx->user_bufs = kzalloc(ctx->max_reqs * sizeof(struct aio_mapped_ubuf),
> +					GFP_KERNEL);
> +	if (!ctx->user_bufs)
> +		return -ENOMEM;
> +
> +	for (i = 0; i < ctx->max_reqs; i++) {
> +		struct aio_mapped_ubuf *amu = &ctx->user_bufs[i];
> +		unsigned long off, start, end, ubuf;
> +		int pret, nr_pages;
> +		size_t size;
> +
> +		iocb = aio_iocb_from_index(ctx, i);
> +
> +		/*
> +		 * Don't impose further limits on the size and buffer
> +		 * constraints here, we'll -EINVAL later when IO is
> +		 * submitted if they are wrong.
> +		 */
> +		ret = -EFAULT;
> +		if (!iocb->aio_buf)
> +			goto err;
> +
> +		/* arbitrary limit, but we need something */
> +		if (iocb->aio_nbytes > SZ_4M)
> +			goto err;
> +
> +		ubuf = iocb->aio_buf;
> +		end = (ubuf + iocb->aio_nbytes + PAGE_SIZE - 1) >> PAGE_SHIFT;
> +		start = ubuf >> PAGE_SHIFT;
> +		nr_pages = end - start;
> +
> +		if (!pages || nr_pages > got_pages) {
> +			kfree(pages);
> +			pages = kmalloc(nr_pages * sizeof(struct page *),
> +					GFP_KERNEL);
> +			if (!pages) {
> +				ret = -ENOMEM;
> +				goto err;
> +			}
> +			got_pages = nr_pages;
> +		}
> +
> +		amu->kvec = kmalloc(nr_pages * sizeof(struct kvec), GFP_KERNEL);
> +		if (!amu->kvec)
> +			goto err;
> +
> +		down_write(&current->mm->mmap_sem);
> +		pret = get_user_pages((unsigned long) iocb->aio_buf, nr_pages,
> +					1, pages, NULL);
> +		up_write(&current->mm->mmap_sem);
> +
> +		if (pret < nr_pages) {
> +			if (pret < 0)
> +				ret = pret;
> +			goto err;
> +		}
> +
> +		off = ubuf & ~PAGE_MASK;
> +		size = iocb->aio_nbytes;
> +		for (j = 0; j < nr_pages; j++) {
> +			size_t vec_len;
> +
> +			vec_len = min_t(size_t, size, PAGE_SIZE - off);
> +			amu->kvec[j].iov_base = page_address(pages[j]) + off;
> +			amu->kvec[j].iov_len = vec_len;
> +			off = 0;
> +			size -= vec_len;
> +		}
> +		amu->nr_kvecs = nr_pages;
> +	}
> +	kfree(pages);
> +	return 0;
> +err:
> +	kfree(pages);
> +	aio_iocb_buffer_unmap(ctx);
> +	return ret;
> +}
> +
>  static void aio_useriocb_free(struct kioctx *ctx)
>  {
>  	int i;
> @@ -1647,7 +1766,8 @@ SYSCALL_DEFINE4(io_setup2, u32, nr_events, u32, flags, struct iocb * __user,
>  	unsigned long ctx;
>  	long ret;
>  
> -	if (flags & ~(IOCTX_FLAG_USERIOCB | IOCTX_FLAG_IOPOLL))
> +	if (flags & ~(IOCTX_FLAG_USERIOCB | IOCTX_FLAG_IOPOLL |
> +		      IOCTX_FLAG_FIXEDBUFS))
>  		return -EINVAL;
>  
>  	ret = get_user(ctx, ctxp);
> @@ -1663,6 +1783,15 @@ SYSCALL_DEFINE4(io_setup2, u32, nr_events, u32, flags, struct iocb * __user,
>  		ret = aio_useriocb_map(ioctx, iocbs);
>  		if (ret)
>  			goto err;
> +		if (flags & IOCTX_FLAG_FIXEDBUFS) {
> +			ret = aio_iocb_buffer_map(ioctx);
> +			if (ret)
> +				goto err;
> +		}
> +	} else if (flags & IOCTX_FLAG_FIXEDBUFS) {
> +		/* can only support fixed bufs with user mapped iocbs */
> +		ret = -EINVAL;
> +		goto err;
>  	}
>  
>  	ret = put_user(ioctx->user_id, ctxp);
> @@ -1939,23 +2068,38 @@ static int aio_prep_rw(struct aio_kiocb *kiocb, const struct iocb *iocb,
>  	return ret;
>  }
>  
> -static int aio_setup_rw(int rw, const struct iocb *iocb, struct iovec **iovec,
> -		bool vectored, bool compat, struct iov_iter *iter)
> +static int aio_setup_rw(int rw, struct aio_kiocb *kiocb,
> +		const struct iocb *iocb, struct iovec **iovec, bool vectored,
> +		bool compat, bool kvecs, struct iov_iter *iter)
>  {
> -	void __user *buf = (void __user *)(uintptr_t)iocb->aio_buf;
> +	void __user *ubuf = (void __user *)(uintptr_t)iocb->aio_buf;
>  	size_t len = iocb->aio_nbytes;
>  
>  	if (!vectored) {
> -		ssize_t ret = import_single_range(rw, buf, len, *iovec, iter);
> +		ssize_t ret;
> +
> +		if (!kvecs) {
> +			ret = import_single_range(rw, ubuf, len, *iovec, iter);
> +		} else {
> +			long index = (long) kiocb->ki_user_iocb;
> +			struct aio_mapped_ubuf *amu;
> +
> +			/* __io_submit_one() already validated the index */
> +			amu = &kiocb->ki_ctx->user_bufs[index];
> +			ret = import_kvec(rw, amu->kvec, amu->nr_kvecs,
> +						len, iter);
> +		}
>  		*iovec = NULL;
>  		return ret;
>  	}
> +	if (kvecs)
> +		return -EINVAL;
>  #ifdef CONFIG_COMPAT
>  	if (compat)
> -		return compat_import_iovec(rw, buf, len, UIO_FASTIOV, iovec,
> +		return compat_import_iovec(rw, ubuf, len, UIO_FASTIOV, iovec,
>  				iter);
>  #endif
> -	return import_iovec(rw, buf, len, UIO_FASTIOV, iovec, iter);
> +	return import_iovec(rw, ubuf, len, UIO_FASTIOV, iovec, iter);
>  }
>  
>  static inline void aio_rw_done(struct kiocb *req, ssize_t ret)
> @@ -2028,7 +2172,7 @@ static void aio_iopoll_iocb_issued(struct aio_submit_state *state,
>  
>  static ssize_t aio_read(struct aio_kiocb *kiocb, const struct iocb *iocb,
>  			struct aio_submit_state *state, bool vectored,
> -			bool compat)
> +			bool compat, bool kvecs)
>  {
>  	struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
>  	struct kiocb *req = &kiocb->rw;
> @@ -2048,9 +2192,11 @@ static ssize_t aio_read(struct aio_kiocb *kiocb, const struct iocb *iocb,
>  	if (unlikely(!file->f_op->read_iter))
>  		goto out_fput;
>  
> -	ret = aio_setup_rw(READ, iocb, &iovec, vectored, compat, &iter);
> +	ret = aio_setup_rw(READ, kiocb, iocb, &iovec, vectored, compat, kvecs,
> +				&iter);
>  	if (ret)
>  		goto out_fput;
> +
>  	ret = rw_verify_area(READ, file, &req->ki_pos, iov_iter_count(&iter));
>  	if (!ret)
>  		aio_rw_done(req, call_read_iter(file, req, &iter));
> @@ -2063,7 +2209,7 @@ static ssize_t aio_read(struct aio_kiocb *kiocb, const struct iocb *iocb,
>  
>  static ssize_t aio_write(struct aio_kiocb *kiocb, const struct iocb *iocb,
>  			 struct aio_submit_state *state, bool vectored,
> -			 bool compat)
> +			 bool compat, bool kvecs)
>  {
>  	struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
>  	struct kiocb *req = &kiocb->rw;
> @@ -2083,7 +2229,8 @@ static ssize_t aio_write(struct aio_kiocb *kiocb, const struct iocb *iocb,
>  	if (unlikely(!file->f_op->write_iter))
>  		goto out_fput;
>  
> -	ret = aio_setup_rw(WRITE, iocb, &iovec, vectored, compat, &iter);
> +	ret = aio_setup_rw(WRITE, kiocb, iocb, &iovec, vectored, compat, kvecs,
> +				&iter);
>  	if (ret)
>  		goto out_fput;
>  	ret = rw_verify_area(WRITE, file, &req->ki_pos, iov_iter_count(&iter));
> @@ -2322,7 +2469,8 @@ static ssize_t aio_poll(struct aio_kiocb *aiocb, const struct iocb *iocb)
>  
>  static int __io_submit_one(struct kioctx *ctx, const struct iocb *iocb,
>  			   struct iocb __user *user_iocb,
> -			   struct aio_submit_state *state, bool compat)
> +			   struct aio_submit_state *state, bool compat,
> +			   bool kvecs)
>  {
>  	struct aio_kiocb *req;
>  	ssize_t ret;
> @@ -2382,16 +2530,16 @@ static int __io_submit_one(struct kioctx *ctx, const struct iocb *iocb,
>  	ret = -EINVAL;
>  	switch (iocb->aio_lio_opcode) {
>  	case IOCB_CMD_PREAD:
> -		ret = aio_read(req, iocb, state, false, compat);
> +		ret = aio_read(req, iocb, state, false, compat, kvecs);
>  		break;
>  	case IOCB_CMD_PWRITE:
> -		ret = aio_write(req, iocb, state, false, compat);
> +		ret = aio_write(req, iocb, state, false, compat, kvecs);
>  		break;
>  	case IOCB_CMD_PREADV:
> -		ret = aio_read(req, iocb, state, true, compat);
> +		ret = aio_read(req, iocb, state, true, compat, kvecs);
>  		break;
>  	case IOCB_CMD_PWRITEV:
> -		ret = aio_write(req, iocb, state, true, compat);
> +		ret = aio_write(req, iocb, state, true, compat, kvecs);
>  		break;
>  	case IOCB_CMD_FSYNC:
>  		if (ctx->flags & IOCTX_FLAG_IOPOLL)
> @@ -2443,6 +2591,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
>  			 struct aio_submit_state *state, bool compat)
>  {
>  	struct iocb iocb, *iocbp;
> +	bool kvecs;
>  
>  	if (ctx->flags & IOCTX_FLAG_USERIOCB) {
>  		unsigned long iocb_index = (unsigned long) user_iocb;
> @@ -2450,14 +2599,16 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
>  		if (iocb_index >= ctx->max_reqs)
>  			return -EINVAL;
>  
> +		kvecs = (ctx->flags & IOCTX_FLAG_FIXEDBUFS) != 0;
>  		iocbp = aio_iocb_from_index(ctx, iocb_index);
>  	} else {
>  		if (unlikely(copy_from_user(&iocb, user_iocb, sizeof(iocb))))
>  			return -EFAULT;
> +		kvecs = false;
>  		iocbp = &iocb;
>  	}
>  
> -	return __io_submit_one(ctx, iocbp, user_iocb, state, compat);
> +	return __io_submit_one(ctx, iocbp, user_iocb, state, compat, kvecs);
>  }
>  
>  #ifdef CONFIG_BLOCK
> diff --git a/include/uapi/linux/aio_abi.h b/include/uapi/linux/aio_abi.h
> index ea0b9a19f4df..05d72cf86bd3 100644
> --- a/include/uapi/linux/aio_abi.h
> +++ b/include/uapi/linux/aio_abi.h
> @@ -110,6 +110,7 @@ struct iocb {
>  
>  #define IOCTX_FLAG_USERIOCB	(1 << 0)	/* iocbs are user mapped */
>  #define IOCTX_FLAG_IOPOLL	(1 << 1)	/* io_context is polled */
> +#define IOCTX_FLAG_FIXEDBUFS	(1 << 2)	/* IO buffers are fixed */
>  
>  #undef IFBIG
>  #undef IFLITTLE
Jens Axboe Nov. 30, 2018, 9:57 p.m. UTC | #2
On 11/30/18 2:44 PM, Jeff Moyer wrote:
> Hi, Jens,
> 
> Jens Axboe <axboe@kernel.dk> writes:
> 
>> If we have fixed user buffers, we can map them into the kernel when we
>> setup the io_context. That avoids the need to do get_user_pages() for
>> each and every IO.
>>
>> To utilize this feature, the application must set both
>> IOCTX_FLAG_USERIOCB, to provide iocb's in userspace, and then
>> IOCTX_FLAG_FIXEDBUFS. The latter tells aio that the iocbs that are
>> mapped already contain valid destination and sizes. These buffers can
>> then be mapped into the kernel for the lifetime of the io_context, as
>> opposed to just the duration of each single IO.
>>
>> Only works with non-vectored read/write commands for now, not with
>> PREADV/PWRITEV.
>>
>> A limit of 4M is imposed as the largest buffer we currently support.
>> There's nothing preventing us from going larger, but we need some cap,
>> and 4M seemed like it would definitely be big enough.
> 
> Doesn't this mean that a user can pin a bunch of memory?  Something like
> 4MB * aio_max_nr?
> 
> $ sysctl fs.aio-max-nr
> fs.aio-max-nr = 1048576
> 
> If so, it may be a good idea to account the memory under RLIMIT_MEMLOCK.

Yes, it'll need some kind of limiting, right now the limit would indeed
be aio-max-nr * 4MB. 4G isn't terrible, but...

RLIMIT_MEMLOCK isn't a bad idea.

> I'm not sure how close you are to proposing this patch set for realz.
> If it's soon (now?), then CC-ing linux-api and writing man pages would
> be a good idea.  I can help out with the libaio bits if you'd like.  I
> haven't yet had time to take this stuff for a spin, sorry.  I'll try to
> get to that soonish.

I am proposing it for real, not sure how long it'll take to get it
reviewed and moved forward. Unless I get lucky. 4.22 seems like a more
viable version than 4.21.

I'll take any help I can get on the API/man page parts. And/or testing!

> The speedups are pretty impressive!

That's why I put them in there, maybe that'd get people's attention :-)
Jeff Moyer Nov. 30, 2018, 10:04 p.m. UTC | #3
Jens Axboe <axboe@kernel.dk> writes:

>>> A limit of 4M is imposed as the largest buffer we currently support.
>>> There's nothing preventing us from going larger, but we need some cap,
>>> and 4M seemed like it would definitely be big enough.
>> 
>> Doesn't this mean that a user can pin a bunch of memory?  Something like
>> 4MB * aio_max_nr?
>> 
>> $ sysctl fs.aio-max-nr
>> fs.aio-max-nr = 1048576
>> 
>> If so, it may be a good idea to account the memory under RLIMIT_MEMLOCK.
>
> Yes, it'll need some kind of limiting, right now the limit would indeed
> be aio-max-nr * 4MB. 4G isn't terrible, but...

Unless my math's wrong, that's 4TiB on my system.  ;-)

> RLIMIT_MEMLOCK isn't a bad idea.
>
>> I'm not sure how close you are to proposing this patch set for realz.
>> If it's soon (now?), then CC-ing linux-api and writing man pages would
>> be a good idea.  I can help out with the libaio bits if you'd like.  I
>> haven't yet had time to take this stuff for a spin, sorry.  I'll try to
>> get to that soonish.
>
> I am proposing it for real, not sure how long it'll take to get it
> reviewed and moved forward. Unless I get lucky. 4.22 seems like a more
> viable version than 4.21.
>
> I'll take any help I can get on the API/man page parts. And/or testing!

OK, I'll add libaio support (including unit tests), write the man page,
and I'll definitely do some testing.  I'll start on all that probably in
the latter half of next week.

>> The speedups are pretty impressive!
>
> That's why I put them in there, maybe that'd get people's attention :-)

Indeed.  :)

Cheers,
Jeff
Jens Axboe Nov. 30, 2018, 10:11 p.m. UTC | #4
On 11/30/18 3:04 PM, Jeff Moyer wrote:
> Jens Axboe <axboe@kernel.dk> writes:
> 
>>>> A limit of 4M is imposed as the largest buffer we currently support.
>>>> There's nothing preventing us from going larger, but we need some cap,
>>>> and 4M seemed like it would definitely be big enough.
>>>
>>> Doesn't this mean that a user can pin a bunch of memory?  Something like
>>> 4MB * aio_max_nr?
>>>
>>> $ sysctl fs.aio-max-nr
>>> fs.aio-max-nr = 1048576
>>>
>>> If so, it may be a good idea to account the memory under RLIMIT_MEMLOCK.
>>
>> Yes, it'll need some kind of limiting, right now the limit would indeed
>> be aio-max-nr * 4MB. 4G isn't terrible, but...
> 
> Unless my math's wrong, that's 4TiB on my system.  ;-)

I guess that's a little more terrible ;-)

>> RLIMIT_MEMLOCK isn't a bad idea.
>>
>>> I'm not sure how close you are to proposing this patch set for realz.
>>> If it's soon (now?), then CC-ing linux-api and writing man pages would
>>> be a good idea.  I can help out with the libaio bits if you'd like.  I
>>> haven't yet had time to take this stuff for a spin, sorry.  I'll try to
>>> get to that soonish.
>>
>> I am proposing it for real, not sure how long it'll take to get it
>> reviewed and moved forward. Unless I get lucky. 4.22 seems like a more
>> viable version than 4.21.
>>
>> I'll take any help I can get on the API/man page parts. And/or testing!
> 
> OK, I'll add libaio support (including unit tests), write the man page,
> and I'll definitely do some testing.  I'll start on all that probably in
> the latter half of next week.

Awesome, that's much appreciated!
diff mbox series

Patch

diff --git a/fs/aio.c b/fs/aio.c
index 426939f1dae9..f735967488a5 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -42,6 +42,7 @@ 
 #include <linux/ramfs.h>
 #include <linux/percpu-refcount.h>
 #include <linux/mount.h>
+#include <linux/sizes.h>
 
 #include <asm/kmap_types.h>
 #include <linux/uaccess.h>
@@ -86,6 +87,11 @@  struct ctx_rq_wait {
 	atomic_t count;
 };
 
+struct aio_mapped_ubuf {
+	struct kvec *kvec;
+	unsigned int nr_kvecs;
+};
+
 struct kioctx {
 	struct percpu_ref	users;
 	atomic_t		dead;
@@ -124,6 +130,8 @@  struct kioctx {
 	struct page		**iocb_pages;
 	long			iocb_nr_pages;
 
+	struct aio_mapped_ubuf	*user_bufs;
+
 	struct rcu_work		free_rwork;	/* see free_ioctx() */
 
 	/*
@@ -290,6 +298,7 @@  static const bool aio_use_state_req_list = false;
 #endif
 
 static void aio_useriocb_free(struct kioctx *);
+static void aio_iocb_buffer_unmap(struct kioctx *);
 static void aio_iopoll_reap_events(struct kioctx *);
 
 static struct file *aio_private_file(struct kioctx *ctx, loff_t nr_pages)
@@ -652,6 +661,7 @@  static void free_ioctx(struct work_struct *work)
 					  free_rwork);
 	pr_debug("freeing %p\n", ctx);
 
+	aio_iocb_buffer_unmap(ctx);
 	aio_useriocb_free(ctx);
 	aio_free_ring(ctx);
 	free_percpu(ctx->cpu);
@@ -1597,6 +1607,115 @@  static struct iocb *aio_iocb_from_index(struct kioctx *ctx, int index)
 	return iocb + index;
 }
 
+static void aio_iocb_buffer_unmap(struct kioctx *ctx)
+{
+	int i, j;
+
+	if (!ctx->user_bufs)
+		return;
+
+	for (i = 0; i < ctx->max_reqs; i++) {
+		struct aio_mapped_ubuf *amu = &ctx->user_bufs[i];
+
+		for (j = 0; j < amu->nr_kvecs; j++) {
+			struct page *page;
+
+			page = virt_to_page(amu->kvec[j].iov_base);
+			put_page(page);
+		}
+		kfree(amu->kvec);
+		amu->nr_kvecs = 0;
+	}
+
+	kfree(ctx->user_bufs);
+	ctx->user_bufs = NULL;
+}
+
+static int aio_iocb_buffer_map(struct kioctx *ctx)
+{
+	struct page **pages = NULL;
+	int i, j, got_pages = 0;
+	struct iocb *iocb;
+	int ret = -EINVAL;
+
+	ctx->user_bufs = kzalloc(ctx->max_reqs * sizeof(struct aio_mapped_ubuf),
+					GFP_KERNEL);
+	if (!ctx->user_bufs)
+		return -ENOMEM;
+
+	for (i = 0; i < ctx->max_reqs; i++) {
+		struct aio_mapped_ubuf *amu = &ctx->user_bufs[i];
+		unsigned long off, start, end, ubuf;
+		int pret, nr_pages;
+		size_t size;
+
+		iocb = aio_iocb_from_index(ctx, i);
+
+		/*
+		 * Don't impose further limits on the size and buffer
+		 * constraints here, we'll -EINVAL later when IO is
+		 * submitted if they are wrong.
+		 */
+		ret = -EFAULT;
+		if (!iocb->aio_buf)
+			goto err;
+
+		/* arbitrary limit, but we need something */
+		if (iocb->aio_nbytes > SZ_4M)
+			goto err;
+
+		ubuf = iocb->aio_buf;
+		end = (ubuf + iocb->aio_nbytes + PAGE_SIZE - 1) >> PAGE_SHIFT;
+		start = ubuf >> PAGE_SHIFT;
+		nr_pages = end - start;
+
+		if (!pages || nr_pages > got_pages) {
+			kfree(pages);
+			pages = kmalloc(nr_pages * sizeof(struct page *),
+					GFP_KERNEL);
+			if (!pages) {
+				ret = -ENOMEM;
+				goto err;
+			}
+			got_pages = nr_pages;
+		}
+
+		amu->kvec = kmalloc(nr_pages * sizeof(struct kvec), GFP_KERNEL);
+		if (!amu->kvec)
+			goto err;
+
+		down_write(&current->mm->mmap_sem);
+		pret = get_user_pages((unsigned long) iocb->aio_buf, nr_pages,
+					1, pages, NULL);
+		up_write(&current->mm->mmap_sem);
+
+		if (pret < nr_pages) {
+			if (pret < 0)
+				ret = pret;
+			goto err;
+		}
+
+		off = ubuf & ~PAGE_MASK;
+		size = iocb->aio_nbytes;
+		for (j = 0; j < nr_pages; j++) {
+			size_t vec_len;
+
+			vec_len = min_t(size_t, size, PAGE_SIZE - off);
+			amu->kvec[j].iov_base = page_address(pages[j]) + off;
+			amu->kvec[j].iov_len = vec_len;
+			off = 0;
+			size -= vec_len;
+		}
+		amu->nr_kvecs = nr_pages;
+	}
+	kfree(pages);
+	return 0;
+err:
+	kfree(pages);
+	aio_iocb_buffer_unmap(ctx);
+	return ret;
+}
+
 static void aio_useriocb_free(struct kioctx *ctx)
 {
 	int i;
@@ -1647,7 +1766,8 @@  SYSCALL_DEFINE4(io_setup2, u32, nr_events, u32, flags, struct iocb * __user,
 	unsigned long ctx;
 	long ret;
 
-	if (flags & ~(IOCTX_FLAG_USERIOCB | IOCTX_FLAG_IOPOLL))
+	if (flags & ~(IOCTX_FLAG_USERIOCB | IOCTX_FLAG_IOPOLL |
+		      IOCTX_FLAG_FIXEDBUFS))
 		return -EINVAL;
 
 	ret = get_user(ctx, ctxp);
@@ -1663,6 +1783,15 @@  SYSCALL_DEFINE4(io_setup2, u32, nr_events, u32, flags, struct iocb * __user,
 		ret = aio_useriocb_map(ioctx, iocbs);
 		if (ret)
 			goto err;
+		if (flags & IOCTX_FLAG_FIXEDBUFS) {
+			ret = aio_iocb_buffer_map(ioctx);
+			if (ret)
+				goto err;
+		}
+	} else if (flags & IOCTX_FLAG_FIXEDBUFS) {
+		/* can only support fixed bufs with user mapped iocbs */
+		ret = -EINVAL;
+		goto err;
 	}
 
 	ret = put_user(ioctx->user_id, ctxp);
@@ -1939,23 +2068,38 @@  static int aio_prep_rw(struct aio_kiocb *kiocb, const struct iocb *iocb,
 	return ret;
 }
 
-static int aio_setup_rw(int rw, const struct iocb *iocb, struct iovec **iovec,
-		bool vectored, bool compat, struct iov_iter *iter)
+static int aio_setup_rw(int rw, struct aio_kiocb *kiocb,
+		const struct iocb *iocb, struct iovec **iovec, bool vectored,
+		bool compat, bool kvecs, struct iov_iter *iter)
 {
-	void __user *buf = (void __user *)(uintptr_t)iocb->aio_buf;
+	void __user *ubuf = (void __user *)(uintptr_t)iocb->aio_buf;
 	size_t len = iocb->aio_nbytes;
 
 	if (!vectored) {
-		ssize_t ret = import_single_range(rw, buf, len, *iovec, iter);
+		ssize_t ret;
+
+		if (!kvecs) {
+			ret = import_single_range(rw, ubuf, len, *iovec, iter);
+		} else {
+			long index = (long) kiocb->ki_user_iocb;
+			struct aio_mapped_ubuf *amu;
+
+			/* __io_submit_one() already validated the index */
+			amu = &kiocb->ki_ctx->user_bufs[index];
+			ret = import_kvec(rw, amu->kvec, amu->nr_kvecs,
+						len, iter);
+		}
 		*iovec = NULL;
 		return ret;
 	}
+	if (kvecs)
+		return -EINVAL;
 #ifdef CONFIG_COMPAT
 	if (compat)
-		return compat_import_iovec(rw, buf, len, UIO_FASTIOV, iovec,
+		return compat_import_iovec(rw, ubuf, len, UIO_FASTIOV, iovec,
 				iter);
 #endif
-	return import_iovec(rw, buf, len, UIO_FASTIOV, iovec, iter);
+	return import_iovec(rw, ubuf, len, UIO_FASTIOV, iovec, iter);
 }
 
 static inline void aio_rw_done(struct kiocb *req, ssize_t ret)
@@ -2028,7 +2172,7 @@  static void aio_iopoll_iocb_issued(struct aio_submit_state *state,
 
 static ssize_t aio_read(struct aio_kiocb *kiocb, const struct iocb *iocb,
 			struct aio_submit_state *state, bool vectored,
-			bool compat)
+			bool compat, bool kvecs)
 {
 	struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
 	struct kiocb *req = &kiocb->rw;
@@ -2048,9 +2192,11 @@  static ssize_t aio_read(struct aio_kiocb *kiocb, const struct iocb *iocb,
 	if (unlikely(!file->f_op->read_iter))
 		goto out_fput;
 
-	ret = aio_setup_rw(READ, iocb, &iovec, vectored, compat, &iter);
+	ret = aio_setup_rw(READ, kiocb, iocb, &iovec, vectored, compat, kvecs,
+				&iter);
 	if (ret)
 		goto out_fput;
+
 	ret = rw_verify_area(READ, file, &req->ki_pos, iov_iter_count(&iter));
 	if (!ret)
 		aio_rw_done(req, call_read_iter(file, req, &iter));
@@ -2063,7 +2209,7 @@  static ssize_t aio_read(struct aio_kiocb *kiocb, const struct iocb *iocb,
 
 static ssize_t aio_write(struct aio_kiocb *kiocb, const struct iocb *iocb,
 			 struct aio_submit_state *state, bool vectored,
-			 bool compat)
+			 bool compat, bool kvecs)
 {
 	struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
 	struct kiocb *req = &kiocb->rw;
@@ -2083,7 +2229,8 @@  static ssize_t aio_write(struct aio_kiocb *kiocb, const struct iocb *iocb,
 	if (unlikely(!file->f_op->write_iter))
 		goto out_fput;
 
-	ret = aio_setup_rw(WRITE, iocb, &iovec, vectored, compat, &iter);
+	ret = aio_setup_rw(WRITE, kiocb, iocb, &iovec, vectored, compat, kvecs,
+				&iter);
 	if (ret)
 		goto out_fput;
 	ret = rw_verify_area(WRITE, file, &req->ki_pos, iov_iter_count(&iter));
@@ -2322,7 +2469,8 @@  static ssize_t aio_poll(struct aio_kiocb *aiocb, const struct iocb *iocb)
 
 static int __io_submit_one(struct kioctx *ctx, const struct iocb *iocb,
 			   struct iocb __user *user_iocb,
-			   struct aio_submit_state *state, bool compat)
+			   struct aio_submit_state *state, bool compat,
+			   bool kvecs)
 {
 	struct aio_kiocb *req;
 	ssize_t ret;
@@ -2382,16 +2530,16 @@  static int __io_submit_one(struct kioctx *ctx, const struct iocb *iocb,
 	ret = -EINVAL;
 	switch (iocb->aio_lio_opcode) {
 	case IOCB_CMD_PREAD:
-		ret = aio_read(req, iocb, state, false, compat);
+		ret = aio_read(req, iocb, state, false, compat, kvecs);
 		break;
 	case IOCB_CMD_PWRITE:
-		ret = aio_write(req, iocb, state, false, compat);
+		ret = aio_write(req, iocb, state, false, compat, kvecs);
 		break;
 	case IOCB_CMD_PREADV:
-		ret = aio_read(req, iocb, state, true, compat);
+		ret = aio_read(req, iocb, state, true, compat, kvecs);
 		break;
 	case IOCB_CMD_PWRITEV:
-		ret = aio_write(req, iocb, state, true, compat);
+		ret = aio_write(req, iocb, state, true, compat, kvecs);
 		break;
 	case IOCB_CMD_FSYNC:
 		if (ctx->flags & IOCTX_FLAG_IOPOLL)
@@ -2443,6 +2591,7 @@  static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 			 struct aio_submit_state *state, bool compat)
 {
 	struct iocb iocb, *iocbp;
+	bool kvecs;
 
 	if (ctx->flags & IOCTX_FLAG_USERIOCB) {
 		unsigned long iocb_index = (unsigned long) user_iocb;
@@ -2450,14 +2599,16 @@  static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 		if (iocb_index >= ctx->max_reqs)
 			return -EINVAL;
 
+		kvecs = (ctx->flags & IOCTX_FLAG_FIXEDBUFS) != 0;
 		iocbp = aio_iocb_from_index(ctx, iocb_index);
 	} else {
 		if (unlikely(copy_from_user(&iocb, user_iocb, sizeof(iocb))))
 			return -EFAULT;
+		kvecs = false;
 		iocbp = &iocb;
 	}
 
-	return __io_submit_one(ctx, iocbp, user_iocb, state, compat);
+	return __io_submit_one(ctx, iocbp, user_iocb, state, compat, kvecs);
 }
 
 #ifdef CONFIG_BLOCK
diff --git a/include/uapi/linux/aio_abi.h b/include/uapi/linux/aio_abi.h
index ea0b9a19f4df..05d72cf86bd3 100644
--- a/include/uapi/linux/aio_abi.h
+++ b/include/uapi/linux/aio_abi.h
@@ -110,6 +110,7 @@  struct iocb {
 
 #define IOCTX_FLAG_USERIOCB	(1 << 0)	/* iocbs are user mapped */
 #define IOCTX_FLAG_IOPOLL	(1 << 1)	/* io_context is polled */
+#define IOCTX_FLAG_FIXEDBUFS	(1 << 2)	/* IO buffers are fixed */
 
 #undef IFBIG
 #undef IFLITTLE